#include "bitmap.h"
#include "column.h"
#include "dynamic-string.h"
+#include "monitor.h"
#include "json.h"
#include "jsonrpc.h"
#include "ovsdb-error.h"
#include "timeval.h"
#include "transaction.h"
#include "trigger.h"
-#include "monitor.h"
#include "openvswitch/vlog.h"
VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
struct ovsdb_jsonrpc_remote;
struct ovsdb_jsonrpc_session;
+/* Set false to defeature monitor2, causing jsonrpc to respond to monitor2
+ * method with an error. */
+static bool monitor2_enable__ = true;
+
/* Message rate-limiting. */
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
/* Monitors. */
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
- const struct json *request_id);
+ enum ovsdb_monitor_version, const struct json *request_id);
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
struct ovsdb_jsonrpc_session *,
struct json_array *params,
struct ovsdb_jsonrpc_server {
struct ovsdb_server up;
- unsigned int n_sessions, max_sessions;
+ unsigned int n_sessions;
struct shash remotes; /* Contains "struct ovsdb_jsonrpc_remote *"s. */
};
{
struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
ovsdb_server_init(&server->up);
- server->max_sessions = 330; /* Random limit. */
shash_init(&server->remotes);
return server;
}
struct ovsdb_jsonrpc_remote *remote = node->data;
if (remote->listener) {
- if (svr->n_sessions < svr->max_sessions) {
- struct stream *stream;
- int error;
-
- error = pstream_accept(remote->listener, &stream);
- if (!error) {
- struct jsonrpc_session *js;
- js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
- remote->dscp);
- ovsdb_jsonrpc_session_create(remote, js);
- } else if (error != EAGAIN) {
- VLOG_WARN_RL(&rl, "%s: accept failed: %s",
- pstream_get_name(remote->listener),
- ovs_strerror(error));
- }
- } else {
- VLOG_WARN_RL(&rl, "%s: connection exceeded maximum (%d)",
+ struct stream *stream;
+ int error;
+
+ error = pstream_accept(remote->listener, &stream);
+ if (!error) {
+ struct jsonrpc_session *js;
+ js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
+ remote->dscp);
+ ovsdb_jsonrpc_session_create(remote, js);
+ } else if (error != EAGAIN) {
+ VLOG_WARN_RL(&rl, "%s: accept failed: %s",
pstream_get_name(remote->listener),
- svr->max_sessions);
+ ovs_strerror(error));
}
}
SHASH_FOR_EACH (node, &svr->remotes) {
struct ovsdb_jsonrpc_remote *remote = node->data;
- if (remote->listener && svr->n_sessions < svr->max_sessions) {
+ if (remote->listener) {
pstream_wait(remote->listener);
}
struct simap *usage)
{
simap_increase(usage, "triggers", hmap_count(&s->triggers));
- simap_increase(usage, "monitors", hmap_count(&s->monitors));
simap_increase(usage, "backlog", jsonrpc_session_get_backlog(s->js));
}
if (!reply) {
reply = execute_transaction(s, db, request);
}
- } else if (!strcmp(request->method, "monitor")) {
+ } else if (!strcmp(request->method, "monitor") ||
+ (monitor2_enable__ && !strcmp(request->method, "monitor2"))) {
struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
if (!reply) {
+ int l = strlen(request->method) - strlen("monitor");
+ enum ovsdb_monitor_version version = l ? OVSDB_MONITOR_V2
+ : OVSDB_MONITOR_V1;
reply = ovsdb_jsonrpc_monitor_create(s, db, request->params,
- request->id);
+ version, request->id);
}
} else if (!strcmp(request->method, "monitor_cancel")) {
reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
struct ovsdb_monitor *dbmon;
uint64_t unflushed; /* The first transaction that has not been
flushed to the jsonrpc remote client. */
+ enum ovsdb_monitor_version version;
};
static struct ovsdb_jsonrpc_monitor *
static struct jsonrpc_msg *
ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
struct json *params,
+ enum ovsdb_monitor_version version,
const struct json *request_id)
{
struct ovsdb_jsonrpc_monitor *m = NULL;
m->db = db;
m->dbmon = ovsdb_monitor_create(db, m);
m->unflushed = 0;
+ m->version = version;
hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
m->monitor_id = json_clone(monitor_id);
ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
bool initial)
{
- return ovsdb_monitor_compose_update(m->dbmon, initial, &m->unflushed);
+ if (!ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
+ return NULL;
+ }
+
+ return ovsdb_monitor_get_update(m->dbmon, initial, &m->unflushed,
+ m->version);
}
static bool
free(m);
}
+static struct jsonrpc_msg *
+ovsdb_jsonrpc_create_notify(const struct ovsdb_jsonrpc_monitor *m,
+ struct json *params)
+{
+ const char *method;
+
+ switch(m->version) {
+ case OVSDB_MONITOR_V1:
+ method = "update";
+ break;
+ case OVSDB_MONITOR_V2:
+ method = "update2";
+ break;
+ case OVSDB_MONITOR_VERSION_MAX:
+ default:
+ OVS_NOT_REACHED();
+ }
+
+ return jsonrpc_create_notify(method, params);
+}
+
static void
ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
{
struct json *params;
params = json_array_create_2(json_clone(m->monitor_id), json);
- msg = jsonrpc_create_notify("update", params);
+ msg = ovsdb_jsonrpc_create_notify(m, params);
jsonrpc_session_send(s->js, msg);
}
}
}
+
+void
+ovsdb_jsonrpc_disable_monitor2(void)
+{
+ /* Once disabled, it is not possible to re-enable it. */
+ monitor2_enable__ = false;
+}