net: remove deprecated syststamp timestamp
[cascardo/linux.git] / net / tipc / name_distr.c
index aff8041..dcc15bc 100644 (file)
 #include "link.h"
 #include "name_distr.h"
 
-#define ITEM_SIZE sizeof(struct distr_item)
-
-/**
- * struct distr_item - publication info distributed to other nodes
- * @type: name sequence type
- * @lower: name sequence lower bound
- * @upper: name sequence upper bound
- * @ref: publishing port reference
- * @key: publication key
- *
- * ===> All fields are stored in network byte order. <===
- *
- * First 3 fields identify (name or) name sequence being published.
- * Reference field uniquely identifies port that published name sequence.
- * Key field uniquely identifies publication, in the event a port has
- * multiple publications of the same name sequence.
- *
- * Note: There is no field that identifies the publishing node because it is
- * the same for all items contained within a publication message.
- */
-struct distr_item {
-       __be32 type;
-       __be32 lower;
-       __be32 upper;
-       __be32 ref;
-       __be32 key;
-};
-
 /**
  * struct publ_list - list of publications made by this node
  * @list: circular list of publications
@@ -127,26 +99,24 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
        return buf;
 }
 
-static void named_cluster_distribute(struct sk_buff *buf)
+void named_cluster_distribute(struct sk_buff *buf)
 {
-       struct sk_buff *buf_copy;
-       struct tipc_node *n_ptr;
-       struct tipc_link *l_ptr;
+       struct sk_buff *obuf;
+       struct tipc_node *node;
+       u32 dnode;
 
        rcu_read_lock();
-       list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
-               spin_lock_bh(&n_ptr->lock);
-               l_ptr = n_ptr->active_links[n_ptr->addr & 1];
-               if (l_ptr) {
-                       buf_copy = skb_copy(buf, GFP_ATOMIC);
-                       if (!buf_copy) {
-                               spin_unlock_bh(&n_ptr->lock);
-                               break;
-                       }
-                       msg_set_destnode(buf_msg(buf_copy), n_ptr->addr);
-                       __tipc_link_xmit(l_ptr, buf_copy);
-               }
-               spin_unlock_bh(&n_ptr->lock);
+       list_for_each_entry_rcu(node, &tipc_node_list, list) {
+               dnode = node->addr;
+               if (in_own_node(dnode))
+                       continue;
+               if (!tipc_node_active_links(node))
+                       continue;
+               obuf = skb_copy(buf, GFP_ATOMIC);
+               if (!obuf)
+                       break;
+               msg_set_destnode(buf_msg(obuf), dnode);
+               tipc_link_xmit(obuf, dnode, dnode);
        }
        rcu_read_unlock();
 
@@ -156,7 +126,7 @@ static void named_cluster_distribute(struct sk_buff *buf)
 /**
  * tipc_named_publish - tell other nodes about a new publication by this node
  */
-void tipc_named_publish(struct publication *publ)
+struct sk_buff *tipc_named_publish(struct publication *publ)
 {
        struct sk_buff *buf;
        struct distr_item *item;
@@ -165,23 +135,23 @@ void tipc_named_publish(struct publication *publ)
        publ_lists[publ->scope]->size++;
 
        if (publ->scope == TIPC_NODE_SCOPE)
-               return;
+               return NULL;
 
        buf = named_prepare_buf(PUBLICATION, ITEM_SIZE, 0);
        if (!buf) {
                pr_warn("Publication distribution failure\n");
-               return;
+               return NULL;
        }
 
        item = (struct distr_item *)msg_data(buf_msg(buf));
        publ_to_item(item, publ);
-       named_cluster_distribute(buf);
+       return buf;
 }
 
 /**
  * tipc_named_withdraw - tell other nodes about a withdrawn publication by this node
  */
-void tipc_named_withdraw(struct publication *publ)
+struct sk_buff *tipc_named_withdraw(struct publication *publ)
 {
        struct sk_buff *buf;
        struct distr_item *item;
@@ -190,47 +160,57 @@ void tipc_named_withdraw(struct publication *publ)
        publ_lists[publ->scope]->size--;
 
        if (publ->scope == TIPC_NODE_SCOPE)
-               return;
+               return NULL;
 
        buf = named_prepare_buf(WITHDRAWAL, ITEM_SIZE, 0);
        if (!buf) {
                pr_warn("Withdrawal distribution failure\n");
-               return;
+               return NULL;
        }
 
        item = (struct distr_item *)msg_data(buf_msg(buf));
        publ_to_item(item, publ);
-       named_cluster_distribute(buf);
+       return buf;
 }
 
-/*
+/**
  * named_distribute - prepare name info for bulk distribution to another node
+ * @msg_list: list of messages (buffers) to be returned from this function
+ * @dnode: node to be updated
+ * @pls: linked list of publication items to be packed into buffer chain
  */
-static void named_distribute(struct list_head *message_list, u32 node,
-                            struct publ_list *pls, u32 max_item_buf)
+static void named_distribute(struct list_head *msg_list, u32 dnode,
+                            struct publ_list *pls)
 {
        struct publication *publ;
        struct sk_buff *buf = NULL;
        struct distr_item *item = NULL;
-       u32 left = 0;
-       u32 rest = pls->size * ITEM_SIZE;
+       uint dsz = pls->size * ITEM_SIZE;
+       uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
+       uint rem = dsz;
+       uint msg_rem = 0;
 
        list_for_each_entry(publ, &pls->list, local_list) {
+               /* Prepare next buffer: */
                if (!buf) {
-                       left = (rest <= max_item_buf) ? rest : max_item_buf;
-                       rest -= left;
-                       buf = named_prepare_buf(PUBLICATION, left, node);
+                       msg_rem = min_t(uint, rem, msg_dsz);
+                       rem -= msg_rem;
+                       buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
                        if (!buf) {
                                pr_warn("Bulk publication failure\n");
                                return;
                        }
                        item = (struct distr_item *)msg_data(buf_msg(buf));
                }
+
+               /* Pack publication into message: */
                publ_to_item(item, publ);
                item++;
-               left -= ITEM_SIZE;
-               if (!left) {
-                       list_add_tail((struct list_head *)buf, message_list);
+               msg_rem -= ITEM_SIZE;
+
+               /* Append full buffer to list: */
+               if (!msg_rem) {
+                       list_add_tail((struct list_head *)buf, msg_list);
                        buf = NULL;
                }
        }
@@ -239,38 +219,20 @@ static void named_distribute(struct list_head *message_list, u32 node,
 /**
  * tipc_named_node_up - tell specified node about all publications by this node
  */
-void tipc_named_node_up(unsigned long nodearg)
+void tipc_named_node_up(u32 dnode)
 {
-       struct tipc_node *n_ptr;
-       struct tipc_link *l_ptr;
-       struct list_head message_list;
-       u32 node = (u32)nodearg;
-       u32 max_item_buf = 0;
-
-       /* compute maximum amount of publication data to send per message */
-       read_lock_bh(&tipc_net_lock);
-       n_ptr = tipc_node_find(node);
-       if (n_ptr) {
-               tipc_node_lock(n_ptr);
-               l_ptr = n_ptr->active_links[0];
-               if (l_ptr)
-                       max_item_buf = ((l_ptr->max_pkt - INT_H_SIZE) /
-                               ITEM_SIZE) * ITEM_SIZE;
-               tipc_node_unlock(n_ptr);
-       }
-       read_unlock_bh(&tipc_net_lock);
-       if (!max_item_buf)
-               return;
-
-       /* create list of publication messages, then send them as a unit */
-       INIT_LIST_HEAD(&message_list);
+       LIST_HEAD(msg_list);
+       struct sk_buff *buf_chain;
 
        read_lock_bh(&tipc_nametbl_lock);
-       named_distribute(&message_list, node, &publ_cluster, max_item_buf);
-       named_distribute(&message_list, node, &publ_zone, max_item_buf);
+       named_distribute(&msg_list, dnode, &publ_cluster);
+       named_distribute(&msg_list, dnode, &publ_zone);
        read_unlock_bh(&tipc_nametbl_lock);
 
-       tipc_link_names_xmit(&message_list, node);
+       /* Convert circular list to linear list and send: */
+       buf_chain = (struct sk_buff *)msg_list.next;
+       ((struct sk_buff *)msg_list.prev)->next = NULL;
+       tipc_link_xmit(buf_chain, dnode, dnode);
 }
 
 /**