ovsdb-monitor: add transaction ids
authorAndy Zhou <azhou@nicira.com>
Fri, 13 Mar 2015 06:15:56 +0000 (23:15 -0700)
committerAndy Zhou <azhou@nicira.com>
Sat, 30 May 2015 00:39:50 +0000 (17:39 -0700)
With N:1 mappings, multiple jsonrpc server may be servicing the rpc
connection at a different pace. ovsdb-monitor thus needs to maintain
different change sets, depends on connection speed of each rpc
connections. Connections servicing at the same speed can share the
same change set.

Transaction ID is an concept added to describe the change set. One
possible view of the database state is a sequence of changes, more
precisely, commits be applied to it in order, starting from an
initial state, with commit 0. The logic can also be applied to the
jsonrpc monitor; each change it pushes corresponds to commits between
two transaction IDs.

This patch introduces transaction IDs. For ovsdb-monitor, it maintains
n_transactions, starting from 0. Each commit add 1 to the number.
Jsonrpc maintains and 'unflushed' transaction number, corresponding to
the next commit the remote has not seen. jsonrpc's job is simply to
notice there are changes in the ovsdb-monitor that it is interested in,
i.e.  'n_transactions' >= 'unflushed', get the changes in json format,
and push them to the remote site.

Signed-off-by: Andy Zhou <azhou@nicira.com>
Acked-by: Ben Pfaff <blp@nicira.com>
ovsdb/jsonrpc-server.c
ovsdb/monitor.c
ovsdb/monitor.h

index 93d5977..84e1bb4 100644 (file)
@@ -90,7 +90,7 @@ static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
 static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *);
 static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
 static struct json *ovsdb_jsonrpc_monitor_compose_table_update(
-    const struct ovsdb_jsonrpc_monitor *monitor, bool initial);
+    struct ovsdb_jsonrpc_monitor *monitor, bool initial);
 
 \f
 /* JSON-RPC database server. */
@@ -1037,6 +1037,8 @@ struct ovsdb_jsonrpc_monitor {
     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
     struct json *monitor_id;
     struct ovsdb_monitor *dbmon;
+    uint64_t unflushed;         /* The first transaction that has not been
+                                       flushed to the jsonrpc remote client. */
 };
 
 static struct ovsdb_jsonrpc_monitor *
@@ -1181,6 +1183,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
     m->session = s;
     m->db = db;
     m->dbmon = ovsdb_monitor_create(db, m);
+    m->unflushed = 0;
     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
     m->monitor_id = json_clone(monitor_id);
 
@@ -1280,9 +1283,10 @@ ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
 
 static struct json *
 ovsdb_jsonrpc_monitor_compose_table_update(
-    const struct ovsdb_jsonrpc_monitor *monitor, bool initial)
+    struct ovsdb_jsonrpc_monitor *monitor, bool initial)
 {
-    return ovsdb_monitor_compose_table_update(monitor->dbmon, initial);
+    return ovsdb_monitor_compose_table_update(monitor->dbmon, initial,
+                                      &monitor->unflushed);
 }
 
 static bool
@@ -1291,7 +1295,7 @@ ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
     struct ovsdb_jsonrpc_monitor *m;
 
     HMAP_FOR_EACH (m, node, &s->monitors) {
-        if (ovsdb_monitor_needs_flush(m->dbmon)) {
+        if (ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
             return true;
         }
     }
index db47ccd..769ea36 100644 (file)
@@ -49,6 +49,7 @@ struct ovsdb_monitor {
     struct shash tables;     /* Holds "struct ovsdb_monitor_table"s. */
     struct ovs_list jsonrpc_monitors;  /* Contains "jsonrpc_monitor_node"s. */
     struct ovsdb *db;
+    uint64_t n_transactions;      /* Count number of committed transactions. */
 };
 
 struct jsonrpc_monitor_node {
@@ -214,6 +215,7 @@ ovsdb_monitor_create(struct ovsdb *db,
     ovsdb_add_replica(db, &dbmon->replica);
     list_init(&dbmon->jsonrpc_monitors);
     dbmon->db = db;
+    dbmon->n_transactions = 0;
     shash_init(&dbmon->tables);
 
     jm = xzalloc(sizeof *jm);
@@ -376,14 +378,16 @@ ovsdb_monitor_compose_row_update(
  * be used as part of the initial reply to a "monitor" request, false if it is
  * going to be used as part of an "update" notification. */
 struct json *
-ovsdb_monitor_compose_table_update(
-    const struct ovsdb_monitor *dbmon, bool initial)
+ovsdb_monitor_compose_table_update(const struct ovsdb_monitor *dbmon,
+                                   bool initial, uint64_t *unflushed)
 {
     struct shash_node *node;
     unsigned long int *changed;
     struct json *json;
     size_t max_columns;
 
+    *unflushed = dbmon->n_transactions + 1;
+
     max_columns = 0;
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
@@ -432,18 +436,11 @@ ovsdb_monitor_compose_table_update(
 }
 
 bool
-ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon)
+ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
+                          uint64_t next_transaction)
 {
-    struct shash_node *node;
-
-    SHASH_FOR_EACH (node, &dbmon->tables) {
-        struct ovsdb_monitor_table *mt = node->data;
-
-        if (!hmap_is_empty(&mt->changes)) {
-            return true;
-        }
-    }
-    return false;
+    ovs_assert(next_transaction <= dbmon->n_transactions + 1);
+    return (next_transaction <= dbmon->n_transactions);
 }
 
 void
@@ -596,6 +593,7 @@ ovsdb_monitor_commit(struct ovsdb_replica *replica,
 
     ovsdb_monitor_init_aux(&aux, m);
     ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
+    m->n_transactions++;
 
     return NULL;
 }
index a432432..4db36b2 100644 (file)
@@ -46,14 +46,15 @@ const char * OVS_WARN_UNUSED_RESULT
 ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *,
                           const struct ovsdb_table *);
 
-struct json *ovsdb_monitor_compose_table_update(
-    const struct ovsdb_monitor *dbmon, bool initial);
+struct json *ovsdb_monitor_compose_table_update(const struct ovsdb_monitor *dbmon,
+                          bool initial, uint64_t *unflushed_transaction);
 
 void ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
                                     const struct ovsdb_table *table,
                                     enum ovsdb_monitor_selection select);
 
-bool ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon);
+bool ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
+                               uint64_t next_transaction);
 
 void ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon);
 #endif