net: remove deprecated syststamp timestamp
[cascardo/linux.git] / net / tipc / link.c
index ad2c57f..fb1485d 100644 (file)
@@ -82,15 +82,13 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf);
 static int  tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
                                 struct sk_buff **buf);
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
-static int  tipc_link_iovec_long_xmit(struct tipc_port *sender,
-                                     struct iovec const *msg_sect,
-                                     unsigned int len, u32 destnode);
 static void link_state_event(struct tipc_link *l_ptr, u32 event);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
-static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
 static void tipc_link_sync_xmit(struct tipc_link *l);
 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
+static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf);
+static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf);
 
 /*
  *  Simple link routines
@@ -335,13 +333,15 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
 static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
 {
        struct tipc_port *p_ptr;
+       struct tipc_sock *tsk;
 
        spin_lock_bh(&tipc_port_list_lock);
        p_ptr = tipc_port_lock(origport);
        if (p_ptr) {
                if (!list_empty(&p_ptr->wait_list))
                        goto exit;
-               p_ptr->congested = 1;
+               tsk = tipc_port_to_sock(p_ptr);
+               tsk->link_cong = 1;
                p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
                list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
                l_ptr->stats.link_congs++;
@@ -355,6 +355,7 @@ exit:
 void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
 {
        struct tipc_port *p_ptr;
+       struct tipc_sock *tsk;
        struct tipc_port *temp_p_ptr;
        int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
 
@@ -370,10 +371,11 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
                                 wait_list) {
                if (win <= 0)
                        break;
+               tsk = tipc_port_to_sock(p_ptr);
                list_del_init(&p_ptr->wait_list);
                spin_lock_bh(p_ptr->lock);
-               p_ptr->congested = 0;
-               tipc_port_wakeup(p_ptr);
+               tsk->link_cong = 0;
+               tipc_sock_wakeup(tsk);
                win -= p_ptr->waiting_pkts;
                spin_unlock_bh(p_ptr->lock);
        }
@@ -676,178 +678,142 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
        }
 }
 
-/*
- * link_bundle_buf(): Append contents of a buffer to
- * the tail of an existing one.
+/* tipc_link_cong: determine return value and how to treat the
+ * sent buffer during link congestion.
+ * - For plain, errorless user data messages we keep the buffer and
+ *   return -ELINKONG.
+ * - For all other messages we discard the buffer and return -EHOSTUNREACH
+ * - For TIPC internal messages we also reset the link
  */
-static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler,
-                          struct sk_buff *buf)
+static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
 {
-       struct tipc_msg *bundler_msg = buf_msg(bundler);
        struct tipc_msg *msg = buf_msg(buf);
-       u32 size = msg_size(msg);
-       u32 bundle_size = msg_size(bundler_msg);
-       u32 to_pos = align(bundle_size);
-       u32 pad = to_pos - bundle_size;
-
-       if (msg_user(bundler_msg) != MSG_BUNDLER)
-               return 0;
-       if (msg_type(bundler_msg) != OPEN_MSG)
-               return 0;
-       if (skb_tailroom(bundler) < (pad + size))
-               return 0;
-       if (l_ptr->max_pkt < (to_pos + size))
-               return 0;
-
-       skb_put(bundler, pad + size);
-       skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
-       msg_set_size(bundler_msg, to_pos + size);
-       msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
-       kfree_skb(buf);
-       l_ptr->stats.sent_bundled++;
-       return 1;
-}
-
-static void link_add_to_outqueue(struct tipc_link *l_ptr,
-                                struct sk_buff *buf,
-                                struct tipc_msg *msg)
-{
-       u32 ack = mod(l_ptr->next_in_no - 1);
-       u32 seqno = mod(l_ptr->next_out_no++);
+       uint psz = msg_size(msg);
+       uint imp = tipc_msg_tot_importance(msg);
+       u32 oport = msg_tot_origport(msg);
 
-       msg_set_word(msg, 2, ((ack << 16) | seqno));
-       msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-       buf->next = NULL;
-       if (l_ptr->first_out) {
-               l_ptr->last_out->next = buf;
-               l_ptr->last_out = buf;
-       } else
-               l_ptr->first_out = l_ptr->last_out = buf;
-
-       l_ptr->out_queue_size++;
-       if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
-               l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
-}
-
-static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
-                                      struct sk_buff *buf_chain,
-                                      u32 long_msgno)
-{
-       struct sk_buff *buf;
-       struct tipc_msg *msg;
-
-       if (!l_ptr->next_out)
-               l_ptr->next_out = buf_chain;
-       while (buf_chain) {
-               buf = buf_chain;
-               buf_chain = buf_chain->next;
-
-               msg = buf_msg(buf);
-               msg_set_long_msgno(msg, long_msgno);
-               link_add_to_outqueue(l_ptr, buf, msg);
+       if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
+               if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
+                       link_schedule_port(link, oport, psz);
+                       return -ELINKCONG;
+               }
+       } else {
+               pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
+               tipc_link_reset(link);
        }
+       kfree_skb_list(buf);
+       return -EHOSTUNREACH;
 }
 
-/*
- * tipc_link_xmit() is the 'full path' for messages, called from
- * inside TIPC when the 'fast path' in tipc_send_xmit
- * has failed, and from link_send()
+/**
+ * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
+ * @link: link to use
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
+ * user data messages) or -EHOSTUNREACH (all other messages/senders)
+ * Only the socket functions tipc_send_stream() and tipc_send_packet() need
+ * to act on the return value, since they may need to do more send attempts.
  */
-int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
 {
        struct tipc_msg *msg = buf_msg(buf);
-       u32 size = msg_size(msg);
-       u32 dsz = msg_data_sz(msg);
-       u32 queue_size = l_ptr->out_queue_size;
-       u32 imp = tipc_msg_tot_importance(msg);
-       u32 queue_limit = l_ptr->queue_limit[imp];
-       u32 max_packet = l_ptr->max_pkt;
-
-       /* Match msg importance against queue limits: */
-       if (unlikely(queue_size >= queue_limit)) {
-               if (imp <= TIPC_CRITICAL_IMPORTANCE) {
-                       link_schedule_port(l_ptr, msg_origport(msg), size);
-                       kfree_skb(buf);
-                       return -ELINKCONG;
-               }
-               kfree_skb(buf);
-               if (imp > CONN_MANAGER) {
-                       pr_warn("%s<%s>, send queue full", link_rst_msg,
-                               l_ptr->name);
-                       tipc_link_reset(l_ptr);
-               }
-               return dsz;
+       uint psz = msg_size(msg);
+       uint qsz = link->out_queue_size;
+       uint sndlim = link->queue_limit[0];
+       uint imp = tipc_msg_tot_importance(msg);
+       uint mtu = link->max_pkt;
+       uint ack = mod(link->next_in_no - 1);
+       uint seqno = link->next_out_no;
+       uint bc_last_in = link->owner->bclink.last_in;
+       struct tipc_media_addr *addr = &link->media_addr;
+       struct sk_buff *next = buf->next;
+
+       /* Match queue limits against msg importance: */
+       if (unlikely(qsz >= link->queue_limit[imp]))
+               return tipc_link_cong(link, buf);
+
+       /* Has valid packet limit been used ? */
+       if (unlikely(psz > mtu)) {
+               kfree_skb_list(buf);
+               return -EMSGSIZE;
        }
 
-       /* Fragmentation needed ? */
-       if (size > max_packet)
-               return tipc_link_frag_xmit(l_ptr, buf);
-
-       /* Packet can be queued or sent. */
-       if (likely(!link_congested(l_ptr))) {
-               link_add_to_outqueue(l_ptr, buf, msg);
+       /* Prepare each packet for sending, and add to outqueue: */
+       while (buf) {
+               next = buf->next;
+               msg = buf_msg(buf);
+               msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
+               msg_set_bcast_ack(msg, bc_last_in);
+
+               if (!link->first_out) {
+                       link->first_out = buf;
+               } else if (qsz < sndlim) {
+                       link->last_out->next = buf;
+               } else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
+                       link->stats.sent_bundled++;
+                       buf = next;
+                       next = buf->next;
+                       continue;
+               } else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
+                       link->stats.sent_bundled++;
+                       link->stats.sent_bundles++;
+                       link->last_out->next = buf;
+                       if (!link->next_out)
+                               link->next_out = buf;
+               } else {
+                       link->last_out->next = buf;
+                       if (!link->next_out)
+                               link->next_out = buf;
+               }
 
-               tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
-               l_ptr->unacked_window = 0;
-               return dsz;
-       }
-       /* Congestion: can message be bundled ? */
-       if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
-           (msg_user(msg) != MSG_FRAGMENTER)) {
-
-               /* Try adding message to an existing bundle */
-               if (l_ptr->next_out &&
-                   link_bundle_buf(l_ptr, l_ptr->last_out, buf))
-                       return dsz;
-
-               /* Try creating a new bundle */
-               if (size <= max_packet * 2 / 3) {
-                       struct sk_buff *bundler = tipc_buf_acquire(max_packet);
-                       struct tipc_msg bundler_hdr;
-
-                       if (bundler) {
-                               tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
-                                        INT_H_SIZE, l_ptr->addr);
-                               skb_copy_to_linear_data(bundler, &bundler_hdr,
-                                                       INT_H_SIZE);
-                               skb_trim(bundler, INT_H_SIZE);
-                               link_bundle_buf(l_ptr, bundler, buf);
-                               buf = bundler;
-                               msg = buf_msg(buf);
-                               l_ptr->stats.sent_bundles++;
-                       }
+               /* Send packet if possible: */
+               if (likely(++qsz <= sndlim)) {
+                       tipc_bearer_send(link->bearer_id, buf, addr);
+                       link->next_out = next;
+                       link->unacked_window = 0;
                }
+               seqno++;
+               link->last_out = buf;
+               buf = next;
        }
-       if (!l_ptr->next_out)
-               l_ptr->next_out = buf;
-       link_add_to_outqueue(l_ptr, buf, msg);
-       return dsz;
+       link->next_out_no = seqno;
+       link->out_queue_size = qsz;
+       return 0;
 }
 
-/*
- * tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use
- * has not been selected yet, and the the owner node is not locked
- * Called by TIPC internal users, e.g. the name distributor
+/**
+ * tipc_link_xmit() is the general link level function for message sending
+ * @buf: chain of buffers containing message
+ * @dsz: amount of user data to be sent
+ * @dnode: address of destination node
+ * @selector: a number used for deterministic link selection
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
+int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
 {
-       struct tipc_link *l_ptr;
-       struct tipc_node *n_ptr;
-       int res = -ELINKCONG;
+       struct tipc_link *link = NULL;
+       struct tipc_node *node;
+       int rc = -EHOSTUNREACH;
 
-       n_ptr = tipc_node_find(dest);
-       if (n_ptr) {
-               tipc_node_lock(n_ptr);
-               l_ptr = n_ptr->active_links[selector & 1];
-               if (l_ptr)
-                       res = __tipc_link_xmit(l_ptr, buf);
-               else
-                       kfree_skb(buf);
-               tipc_node_unlock(n_ptr);
-       } else {
-               kfree_skb(buf);
+       node = tipc_node_find(dnode);
+       if (node) {
+               tipc_node_lock(node);
+               link = node->active_links[selector & 1];
+               if (link)
+                       rc = __tipc_link_xmit(link, buf);
+               tipc_node_unlock(node);
        }
-       return res;
+
+       if (link)
+               return rc;
+
+       if (likely(in_own_node(dnode)))
+               return tipc_sk_rcv(buf);
+
+       kfree_skb_list(buf);
+       return rc;
 }
 
 /*
@@ -858,7 +824,7 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
  *
  * Called with node locked
  */
-static void tipc_link_sync_xmit(struct tipc_link *l)
+static void tipc_link_sync_xmit(struct tipc_link *link)
 {
        struct sk_buff *buf;
        struct tipc_msg *msg;
@@ -868,10 +834,9 @@ static void tipc_link_sync_xmit(struct tipc_link *l)
                return;
 
        msg = buf_msg(buf);
-       tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
-       msg_set_last_bcast(msg, l->owner->bclink.acked);
-       link_add_chain_to_outqueue(l, buf, 0);
-       tipc_link_push_queue(l);
+       tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
+       msg_set_last_bcast(msg, link->owner->bclink.acked);
+       __tipc_link_xmit(link, buf);
 }
 
 /*
@@ -891,293 +856,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
        kfree_skb(buf);
 }
 
-/*
- * tipc_link_names_xmit - send name table entries to new neighbor
- *
- * Send routine for bulk delivery of name table messages when contact
- * with a new neighbor occurs. No link congestion checking is performed
- * because name table messages *must* be delivered. The messages must be
- * small enough not to require fragmentation.
- * Called without any locks held.
- */
-void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
-{
-       struct tipc_node *n_ptr;
-       struct tipc_link *l_ptr;
-       struct sk_buff *buf;
-       struct sk_buff *temp_buf;
-
-       if (list_empty(message_list))
-               return;
-
-       n_ptr = tipc_node_find(dest);
-       if (n_ptr) {
-               tipc_node_lock(n_ptr);
-               l_ptr = n_ptr->active_links[0];
-               if (l_ptr) {
-                       /* convert circular list to linear list */
-                       ((struct sk_buff *)message_list->prev)->next = NULL;
-                       link_add_chain_to_outqueue(l_ptr,
-                               (struct sk_buff *)message_list->next, 0);
-                       tipc_link_push_queue(l_ptr);
-                       INIT_LIST_HEAD(message_list);
-               }
-               tipc_node_unlock(n_ptr);
-       }
-
-       /* discard the messages if they couldn't be sent */
-       list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
-               list_del((struct list_head *)buf);
-               kfree_skb(buf);
-       }
-}
-
-/*
- * tipc_link_xmit_fast: Entry for data messages where the
- * destination link is known and the header is complete,
- * inclusive total message length. Very time critical.
- * Link is locked. Returns user data length.
- */
-static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
-                              u32 *used_max_pkt)
-{
-       struct tipc_msg *msg = buf_msg(buf);
-       int res = msg_data_sz(msg);
-
-       if (likely(!link_congested(l_ptr))) {
-               if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
-                       link_add_to_outqueue(l_ptr, buf, msg);
-                       tipc_bearer_send(l_ptr->bearer_id, buf,
-                                        &l_ptr->media_addr);
-                       l_ptr->unacked_window = 0;
-                       return res;
-               }
-               else
-                       *used_max_pkt = l_ptr->max_pkt;
-       }
-       return __tipc_link_xmit(l_ptr, buf);  /* All other cases */
-}
-
-/*
- * tipc_link_iovec_xmit_fast: Entry for messages where the
- * destination processor is known and the header is complete,
- * except for total message length.
- * Returns user data length or errno.
- */
-int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
-                             struct iovec const *msg_sect,
-                             unsigned int len, u32 destaddr)
-{
-       struct tipc_msg *hdr = &sender->phdr;
-       struct tipc_link *l_ptr;
-       struct sk_buff *buf;
-       struct tipc_node *node;
-       int res;
-       u32 selector = msg_origport(hdr) & 1;
-
-again:
-       /*
-        * Try building message using port's max_pkt hint.
-        * (Must not hold any locks while building message.)
-        */
-       res = tipc_msg_build(hdr, msg_sect, len, sender->max_pkt, &buf);
-       /* Exit if build request was invalid */
-       if (unlikely(res < 0))
-               return res;
-
-       node = tipc_node_find(destaddr);
-       if (likely(node)) {
-               tipc_node_lock(node);
-               l_ptr = node->active_links[selector];
-               if (likely(l_ptr)) {
-                       if (likely(buf)) {
-                               res = tipc_link_xmit_fast(l_ptr, buf,
-                                                         &sender->max_pkt);
-exit:
-                               tipc_node_unlock(node);
-                               return res;
-                       }
-
-                       /* Exit if link (or bearer) is congested */
-                       if (link_congested(l_ptr)) {
-                               res = link_schedule_port(l_ptr,
-                                                        sender->ref, res);
-                               goto exit;
-                       }
-
-                       /*
-                        * Message size exceeds max_pkt hint; update hint,
-                        * then re-try fast path or fragment the message
-                        */
-                       sender->max_pkt = l_ptr->max_pkt;
-                       tipc_node_unlock(node);
-
-
-                       if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
-                               goto again;
-
-                       return tipc_link_iovec_long_xmit(sender, msg_sect,
-                                                        len, destaddr);
-               }
-               tipc_node_unlock(node);
-       }
-
-       /* Couldn't find a link to the destination node */
-       kfree_skb(buf);
-       tipc_port_iovec_reject(sender, hdr, msg_sect, len, TIPC_ERR_NO_NODE);
-       return -ENETUNREACH;
-}
-
-/*
- * tipc_link_iovec_long_xmit(): Entry for long messages where the
- * destination node is known and the header is complete,
- * inclusive total message length.
- * Link and bearer congestion status have been checked to be ok,
- * and are ignored if they change.
- *
- * Note that fragments do not use the full link MTU so that they won't have
- * to undergo refragmentation if link changeover causes them to be sent
- * over another link with an additional tunnel header added as prefix.
- * (Refragmentation will still occur if the other link has a smaller MTU.)
- *
- * Returns user data length or errno.
- */
-static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
-                                    struct iovec const *msg_sect,
-                                    unsigned int len, u32 destaddr)
-{
-       struct tipc_link *l_ptr;
-       struct tipc_node *node;
-       struct tipc_msg *hdr = &sender->phdr;
-       u32 dsz = len;
-       u32 max_pkt, fragm_sz, rest;
-       struct tipc_msg fragm_hdr;
-       struct sk_buff *buf, *buf_chain, *prev;
-       u32 fragm_crs, fragm_rest, hsz, sect_rest;
-       const unchar __user *sect_crs;
-       int curr_sect;
-       u32 fragm_no;
-       int res = 0;
-
-again:
-       fragm_no = 1;
-       max_pkt = sender->max_pkt - INT_H_SIZE;
-               /* leave room for tunnel header in case of link changeover */
-       fragm_sz = max_pkt - INT_H_SIZE;
-               /* leave room for fragmentation header in each fragment */
-       rest = dsz;
-       fragm_crs = 0;
-       fragm_rest = 0;
-       sect_rest = 0;
-       sect_crs = NULL;
-       curr_sect = -1;
-
-       /* Prepare reusable fragment header */
-       tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
-                INT_H_SIZE, msg_destnode(hdr));
-       msg_set_size(&fragm_hdr, max_pkt);
-       msg_set_fragm_no(&fragm_hdr, 1);
-
-       /* Prepare header of first fragment */
-       buf_chain = buf = tipc_buf_acquire(max_pkt);
-       if (!buf)
-               return -ENOMEM;
-       buf->next = NULL;
-       skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
-       hsz = msg_hdr_sz(hdr);
-       skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
-
-       /* Chop up message */
-       fragm_crs = INT_H_SIZE + hsz;
-       fragm_rest = fragm_sz - hsz;
-
-       do {            /* For all sections */
-               u32 sz;
-
-               if (!sect_rest) {
-                       sect_rest = msg_sect[++curr_sect].iov_len;
-                       sect_crs = msg_sect[curr_sect].iov_base;
-               }
-
-               if (sect_rest < fragm_rest)
-                       sz = sect_rest;
-               else
-                       sz = fragm_rest;
-
-               if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
-                       res = -EFAULT;
-error:
-                       kfree_skb_list(buf_chain);
-                       return res;
-               }
-               sect_crs += sz;
-               sect_rest -= sz;
-               fragm_crs += sz;
-               fragm_rest -= sz;
-               rest -= sz;
-
-               if (!fragm_rest && rest) {
-
-                       /* Initiate new fragment: */
-                       if (rest <= fragm_sz) {
-                               fragm_sz = rest;
-                               msg_set_type(&fragm_hdr, LAST_FRAGMENT);
-                       } else {
-                               msg_set_type(&fragm_hdr, FRAGMENT);
-                       }
-                       msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
-                       msg_set_fragm_no(&fragm_hdr, ++fragm_no);
-                       prev = buf;
-                       buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
-                       if (!buf) {
-                               res = -ENOMEM;
-                               goto error;
-                       }
-
-                       buf->next = NULL;
-                       prev->next = buf;
-                       skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
-                       fragm_crs = INT_H_SIZE;
-                       fragm_rest = fragm_sz;
-               }
-       } while (rest > 0);
-
-       /*
-        * Now we have a buffer chain. Select a link and check
-        * that packet size is still OK
-        */
-       node = tipc_node_find(destaddr);
-       if (likely(node)) {
-               tipc_node_lock(node);
-               l_ptr = node->active_links[sender->ref & 1];
-               if (!l_ptr) {
-                       tipc_node_unlock(node);
-                       goto reject;
-               }
-               if (l_ptr->max_pkt < max_pkt) {
-                       sender->max_pkt = l_ptr->max_pkt;
-                       tipc_node_unlock(node);
-                       kfree_skb_list(buf_chain);
-                       goto again;
-               }
-       } else {
-reject:
-               kfree_skb_list(buf_chain);
-               tipc_port_iovec_reject(sender, hdr, msg_sect, len,
-                                      TIPC_ERR_NO_NODE);
-               return -ENETUNREACH;
-       }
-
-       /* Append chain of fragments to send queue & send them */
-       l_ptr->long_msg_seq_no++;
-       link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
-       l_ptr->stats.sent_fragments += fragm_no;
-       l_ptr->stats.sent_fragmented++;
-       tipc_link_push_queue(l_ptr);
-       tipc_node_unlock(node);
-       return dsz;
-}
-
 /*
  * tipc_link_push_packet: Push one unsent packet to the media
  */
@@ -1238,7 +916,7 @@ static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
                        tipc_bearer_send(l_ptr->bearer_id, buf,
                                         &l_ptr->media_addr);
                        if (msg_user(msg) == MSG_BUNDLER)
-                               msg_set_type(msg, CLOSED_MSG);
+                               msg_set_type(msg, BUNDLE_CLOSED);
                        l_ptr->next_out = buf->next;
                        return 0;
                }
@@ -1527,11 +1205,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
                if (unlikely(!list_empty(&l_ptr->waiting_ports)))
                        tipc_link_wakeup_ports(l_ptr, 0);
 
-               if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
-                       l_ptr->stats.sent_acks++;
-                       tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-               }
-
                /* Process the incoming packet */
                if (unlikely(!link_working_working(l_ptr))) {
                        if (msg_user(msg) == LINK_PROTOCOL) {
@@ -1565,57 +1238,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
                if (unlikely(l_ptr->oldest_deferred_in))
                        head = link_insert_deferred_queue(l_ptr, head);
 
-               /* Deliver packet/message to correct user: */
-               if (unlikely(msg_user(msg) ==  CHANGEOVER_PROTOCOL)) {
-                       if (!tipc_link_tunnel_rcv(n_ptr, &buf)) {
-                               tipc_node_unlock(n_ptr);
-                               continue;
-                       }
-                       msg = buf_msg(buf);
-               } else if (msg_user(msg) == MSG_FRAGMENTER) {
-                       l_ptr->stats.recv_fragments++;
-                       if (tipc_buf_append(&l_ptr->reasm_buf, &buf)) {
-                               l_ptr->stats.recv_fragmented++;
-                               msg = buf_msg(buf);
-                       } else {
-                               if (!l_ptr->reasm_buf)
-                                       tipc_link_reset(l_ptr);
-                               tipc_node_unlock(n_ptr);
-                               continue;
-                       }
+               if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
+                       l_ptr->stats.sent_acks++;
+                       tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
                }
 
-               switch (msg_user(msg)) {
-               case TIPC_LOW_IMPORTANCE:
-               case TIPC_MEDIUM_IMPORTANCE:
-               case TIPC_HIGH_IMPORTANCE:
-               case TIPC_CRITICAL_IMPORTANCE:
+               if (tipc_link_prepare_input(l_ptr, &buf)) {
                        tipc_node_unlock(n_ptr);
-                       tipc_sk_rcv(buf);
                        continue;
-               case MSG_BUNDLER:
-                       l_ptr->stats.recv_bundles++;
-                       l_ptr->stats.recv_bundled += msg_msgcnt(msg);
-                       tipc_node_unlock(n_ptr);
-                       tipc_link_bundle_rcv(buf);
-                       continue;
-               case NAME_DISTRIBUTOR:
-                       n_ptr->bclink.recv_permitted = true;
-                       tipc_node_unlock(n_ptr);
-                       tipc_named_rcv(buf);
-                       continue;
-               case CONN_MANAGER:
-                       tipc_node_unlock(n_ptr);
-                       tipc_port_proto_rcv(buf);
-                       continue;
-               case BCAST_PROTOCOL:
-                       tipc_link_sync_rcv(n_ptr, buf);
-                       break;
-               default:
-                       kfree_skb(buf);
-                       break;
                }
                tipc_node_unlock(n_ptr);
+               msg = buf_msg(buf);
+               if (tipc_link_input(l_ptr, buf) != 0)
+                       goto discard;
                continue;
 unlock_discard:
                tipc_node_unlock(n_ptr);
@@ -1624,6 +1259,80 @@ discard:
        }
 }
 
+/**
+ * tipc_link_prepare_input - process TIPC link messages
+ *
+ * returns nonzero if the message was consumed
+ *
+ * Node lock must be held
+ */
+static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf)
+{
+       struct tipc_node *n;
+       struct tipc_msg *msg;
+       int res = -EINVAL;
+
+       n = l->owner;
+       msg = buf_msg(*buf);
+       switch (msg_user(msg)) {
+       case CHANGEOVER_PROTOCOL:
+               if (tipc_link_tunnel_rcv(n, buf))
+                       res = 0;
+               break;
+       case MSG_FRAGMENTER:
+               l->stats.recv_fragments++;
+               if (tipc_buf_append(&l->reasm_buf, buf)) {
+                       l->stats.recv_fragmented++;
+                       res = 0;
+               } else if (!l->reasm_buf) {
+                       tipc_link_reset(l);
+               }
+               break;
+       case MSG_BUNDLER:
+               l->stats.recv_bundles++;
+               l->stats.recv_bundled += msg_msgcnt(msg);
+               res = 0;
+               break;
+       case NAME_DISTRIBUTOR:
+               n->bclink.recv_permitted = true;
+               res = 0;
+               break;
+       case BCAST_PROTOCOL:
+               tipc_link_sync_rcv(n, *buf);
+               break;
+       default:
+               res = 0;
+       }
+       return res;
+}
+/**
+ * tipc_link_input - Deliver message too higher layers
+ */
+static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
+{
+       struct tipc_msg *msg = buf_msg(buf);
+       int res = 0;
+
+       switch (msg_user(msg)) {
+       case TIPC_LOW_IMPORTANCE:
+       case TIPC_MEDIUM_IMPORTANCE:
+       case TIPC_HIGH_IMPORTANCE:
+       case TIPC_CRITICAL_IMPORTANCE:
+       case CONN_MANAGER:
+               tipc_sk_rcv(buf);
+               break;
+       case NAME_DISTRIBUTOR:
+               tipc_named_rcv(buf);
+               break;
+       case MSG_BUNDLER:
+               tipc_link_bundle_rcv(buf);
+               break;
+       default:
+               res = -EINVAL;
+       }
+       return res;
+}
+
 /**
  * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
  *
@@ -2217,6 +1926,7 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
        u32 msgcount = msg_msgcnt(buf_msg(buf));
        u32 pos = INT_H_SIZE;
        struct sk_buff *obuf;
+       struct tipc_msg *omsg;
 
        while (msgcount--) {
                obuf = buf_extract(buf, pos);
@@ -2224,82 +1934,18 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
                        pr_warn("Link unable to unbundle message(s)\n");
                        break;
                }
-               pos += align(msg_size(buf_msg(obuf)));
-               tipc_net_route_msg(obuf);
-       }
-       kfree_skb(buf);
-}
-
-/*
- *  Fragmentation/defragmentation:
- */
-
-/*
- * tipc_link_frag_xmit: Entry for buffers needing fragmentation.
- * The buffer is complete, inclusive total message length.
- * Returns user data length.
- */
-static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
-{
-       struct sk_buff *buf_chain = NULL;
-       struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
-       struct tipc_msg *inmsg = buf_msg(buf);
-       struct tipc_msg fragm_hdr;
-       u32 insize = msg_size(inmsg);
-       u32 dsz = msg_data_sz(inmsg);
-       unchar *crs = buf->data;
-       u32 rest = insize;
-       u32 pack_sz = l_ptr->max_pkt;
-       u32 fragm_sz = pack_sz - INT_H_SIZE;
-       u32 fragm_no = 0;
-       u32 destaddr;
-
-       if (msg_short(inmsg))
-               destaddr = l_ptr->addr;
-       else
-               destaddr = msg_destnode(inmsg);
-
-       /* Prepare reusable fragment header: */
-       tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
-                INT_H_SIZE, destaddr);
-
-       /* Chop up message: */
-       while (rest > 0) {
-               struct sk_buff *fragm;
-
-               if (rest <= fragm_sz) {
-                       fragm_sz = rest;
-                       msg_set_type(&fragm_hdr, LAST_FRAGMENT);
-               }
-               fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
-               if (fragm == NULL) {
-                       kfree_skb(buf);
-                       kfree_skb_list(buf_chain);
-                       return -ENOMEM;
+               omsg = buf_msg(obuf);
+               pos += align(msg_size(omsg));
+               if (msg_isdata(omsg) || (msg_user(omsg) == CONN_MANAGER)) {
+                       tipc_sk_rcv(obuf);
+               } else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
+                       tipc_named_rcv(obuf);
+               } else {
+                       pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
+                       kfree_skb(obuf);
                }
-               msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
-               fragm_no++;
-               msg_set_fragm_no(&fragm_hdr, fragm_no);
-               skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
-               skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
-                                              fragm_sz);
-               buf_chain_tail->next = fragm;
-               buf_chain_tail = fragm;
-
-               rest -= fragm_sz;
-               crs += fragm_sz;
-               msg_set_type(&fragm_hdr, FRAGMENT);
        }
        kfree_skb(buf);
-
-       /* Append chain of fragments to send queue & send them */
-       l_ptr->long_msg_seq_no++;
-       link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
-       l_ptr->stats.sent_fragments += fragm_no;
-       l_ptr->stats.sent_fragmented++;
-       tipc_link_push_queue(l_ptr);
-
-       return dsz;
 }
 
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)