tipc: split link outqueue
[cascardo/linux.git] / net / tipc / link.c
index a4cf364..7e0036f 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * net/tipc/link.c: TIPC link code
  *
- * Copyright (c) 1996-2007, 2012-2014, Ericsson AB
+ * Copyright (c) 1996-2007, 2012-2015, Ericsson AB
  * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
  * All rights reserved.
  *
@@ -194,10 +194,10 @@ static void link_timeout(unsigned long data)
        tipc_node_lock(l_ptr->owner);
 
        /* update counters used in statistical profiling of send traffic */
-       l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
+       l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq);
        l_ptr->stats.queue_sz_counts++;
 
-       skb = skb_peek(&l_ptr->outqueue);
+       skb = skb_peek(&l_ptr->transmq);
        if (skb) {
                struct tipc_msg *msg = buf_msg(skb);
                u32 length = msg_size(msg);
@@ -229,7 +229,7 @@ static void link_timeout(unsigned long data)
        /* do all other link processing performed on a periodic basis */
        link_state_event(l_ptr, TIMEOUT_EVT);
 
-       if (l_ptr->next_out)
+       if (skb_queue_len(&l_ptr->backlogq))
                tipc_link_push_packets(l_ptr);
 
        tipc_node_unlock(l_ptr->owner);
@@ -313,8 +313,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
        link_init_max_pkt(l_ptr);
 
        l_ptr->next_out_no = 1;
-       __skb_queue_head_init(&l_ptr->outqueue);
-       __skb_queue_head_init(&l_ptr->deferred_queue);
+       __skb_queue_head_init(&l_ptr->transmq);
+       __skb_queue_head_init(&l_ptr->backlogq);
+       __skb_queue_head_init(&l_ptr->deferdq);
        skb_queue_head_init(&l_ptr->wakeupq);
        skb_queue_head_init(&l_ptr->inputq);
        skb_queue_head_init(&l_ptr->namedq);
@@ -344,6 +345,7 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id,
        struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_link *link;
        struct tipc_node *node;
+       bool del_link;
 
        rcu_read_lock();
        list_for_each_entry_rcu(node, &tn->node_list, list) {
@@ -353,12 +355,13 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id,
                        tipc_node_unlock(node);
                        continue;
                }
+               del_link = !tipc_link_is_up(link) && !link->exp_msg_count;
                tipc_link_reset(link);
                if (del_timer(&link->timer))
                        tipc_link_put(link);
                link->flags |= LINK_STOPPED;
                /* Delete link now, or when failover is finished: */
-               if (shutting_down || !tipc_node_is_up(node))
+               if (shutting_down || !tipc_node_is_up(node) || del_link)
                        tipc_link_delete(link);
                tipc_node_unlock(node);
        }
@@ -398,7 +401,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
  */
 void link_prepare_wakeup(struct tipc_link *link)
 {
-       uint pend_qsz = skb_queue_len(&link->outqueue);
+       uint pend_qsz = skb_queue_len(&link->backlogq);
        struct sk_buff *skb, *tmp;
 
        skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
@@ -428,8 +431,9 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
  */
 void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
-       __skb_queue_purge(&l_ptr->deferred_queue);
-       __skb_queue_purge(&l_ptr->outqueue);
+       __skb_queue_purge(&l_ptr->deferdq);
+       __skb_queue_purge(&l_ptr->transmq);
+       __skb_queue_purge(&l_ptr->backlogq);
        tipc_link_reset_fragments(l_ptr);
 }
 
@@ -462,14 +466,15 @@ void tipc_link_reset(struct tipc_link *l_ptr)
        }
 
        /* Clean up all queues, except inputq: */
-       __skb_queue_purge(&l_ptr->outqueue);
-       __skb_queue_purge(&l_ptr->deferred_queue);
-       skb_queue_splice_init(&l_ptr->wakeupq, &l_ptr->inputq);
-       if (!skb_queue_empty(&l_ptr->inputq))
+       __skb_queue_purge(&l_ptr->transmq);
+       __skb_queue_purge(&l_ptr->backlogq);
+       __skb_queue_purge(&l_ptr->deferdq);
+       if (!owner->inputq)
+               owner->inputq = &l_ptr->inputq;
+       skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
+       if (!skb_queue_empty(owner->inputq))
                owner->action_flags |= TIPC_MSG_EVT;
-       owner->inputq = &l_ptr->inputq;
-       l_ptr->next_out = NULL;
-       l_ptr->unacked_window = 0;
+       l_ptr->rcv_unacked = 0;
        l_ptr->checkpoint = 1;
        l_ptr->next_out_no = 1;
        l_ptr->fsm_msg_cnt = 0;
@@ -739,54 +744,51 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
                     struct sk_buff_head *list)
 {
        struct tipc_msg *msg = buf_msg(skb_peek(list));
-       uint psz = msg_size(msg);
-       uint sndlim = link->queue_limit[0];
+       unsigned int maxwin = link->window;
        uint imp = tipc_msg_tot_importance(msg);
        uint mtu = link->max_pkt;
        uint ack = mod(link->next_in_no - 1);
        uint seqno = link->next_out_no;
        uint bc_last_in = link->owner->bclink.last_in;
        struct tipc_media_addr *addr = &link->media_addr;
-       struct sk_buff_head *outqueue = &link->outqueue;
+       struct sk_buff_head *transmq = &link->transmq;
+       struct sk_buff_head *backlogq = &link->backlogq;
        struct sk_buff *skb, *tmp;
 
        /* Match queue limits against msg importance: */
-       if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
+       if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp]))
                return tipc_link_cong(link, list);
 
        /* Has valid packet limit been used ? */
-       if (unlikely(psz > mtu)) {
+       if (unlikely(msg_size(msg) > mtu)) {
                __skb_queue_purge(list);
                return -EMSGSIZE;
        }
 
-       /* Prepare each packet for sending, and add to outqueue: */
+       /* Prepare each packet for sending, and add to relevant queue: */
        skb_queue_walk_safe(list, skb, tmp) {
                __skb_unlink(skb, list);
                msg = buf_msg(skb);
-               msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
+               msg_set_seqno(msg, seqno);
+               msg_set_ack(msg, ack);
                msg_set_bcast_ack(msg, bc_last_in);
 
-               if (skb_queue_len(outqueue) < sndlim) {
-                       __skb_queue_tail(outqueue, skb);
-                       tipc_bearer_send(net, link->bearer_id,
-                                        skb, addr);
-                       link->next_out = NULL;
-                       link->unacked_window = 0;
-               } else if (tipc_msg_bundle(outqueue, skb, mtu)) {
+               if (likely(skb_queue_len(transmq) < maxwin)) {
+                       __skb_queue_tail(transmq, skb);
+                       tipc_bearer_send(net, link->bearer_id, skb, addr);
+                       link->rcv_unacked = 0;
+                       seqno++;
+                       continue;
+               }
+               if (tipc_msg_bundle(skb_peek_tail(backlogq), skb, mtu)) {
                        link->stats.sent_bundled++;
                        continue;
-               } else if (tipc_msg_make_bundle(outqueue, skb, mtu,
-                                               link->addr)) {
+               }
+               if (tipc_msg_make_bundle(&skb, mtu, link->addr)) {
                        link->stats.sent_bundled++;
                        link->stats.sent_bundles++;
-                       if (!link->next_out)
-                               link->next_out = skb_peek_tail(outqueue);
-               } else {
-                       __skb_queue_tail(outqueue, skb);
-                       if (!link->next_out)
-                               link->next_out = skb;
                }
+               __skb_queue_tail(backlogq, skb);
                seqno++;
        }
        link->next_out_no = seqno;
@@ -892,14 +894,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
        kfree_skb(buf);
 }
 
-struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
-                                   const struct sk_buff *skb)
-{
-       if (skb_queue_is_last(list, skb))
-               return NULL;
-       return skb->next;
-}
-
 /*
  * tipc_link_push_packets - push unsent packets to bearer
  *
@@ -908,30 +902,23 @@ struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
  *
  * Called with node locked
  */
-void tipc_link_push_packets(struct tipc_link *l_ptr)
+void tipc_link_push_packets(struct tipc_link *link)
 {
-       struct sk_buff_head *outqueue = &l_ptr->outqueue;
-       struct sk_buff *skb = l_ptr->next_out;
+       struct sk_buff *skb;
        struct tipc_msg *msg;
-       u32 next, first;
+       unsigned int ack = mod(link->next_in_no - 1);
 
-       skb_queue_walk_from(outqueue, skb) {
-               msg = buf_msg(skb);
-               next = msg_seqno(msg);
-               first = buf_seqno(skb_peek(outqueue));
-
-               if (mod(next - first) < l_ptr->queue_limit[0]) {
-                       msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
-                       msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-                       if (msg_user(msg) == MSG_BUNDLER)
-                               TIPC_SKB_CB(skb)->bundling = false;
-                       tipc_bearer_send(l_ptr->owner->net,
-                                        l_ptr->bearer_id, skb,
-                                        &l_ptr->media_addr);
-                       l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
-               } else {
+       while (skb_queue_len(&link->transmq) < link->window) {
+               skb = __skb_dequeue(&link->backlogq);
+               if (!skb)
                        break;
-               }
+               msg = buf_msg(skb);
+               msg_set_ack(msg, ack);
+               msg_set_bcast_ack(msg, link->owner->bclink.last_in);
+               link->rcv_unacked = 0;
+               __skb_queue_tail(&link->transmq, skb);
+               tipc_bearer_send(link->owner->net, link->bearer_id,
+                                skb, &link->media_addr);
        }
 }
 
@@ -1018,8 +1005,8 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
                l_ptr->stale_count = 1;
        }
 
-       skb_queue_walk_from(&l_ptr->outqueue, skb) {
-               if (!retransmits || skb == l_ptr->next_out)
+       skb_queue_walk_from(&l_ptr->transmq, skb) {
+               if (!retransmits)
                        break;
                msg = buf_msg(skb);
                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
@@ -1036,67 +1023,12 @@ static void link_retrieve_defq(struct tipc_link *link,
 {
        u32 seq_no;
 
-       if (skb_queue_empty(&link->deferred_queue))
+       if (skb_queue_empty(&link->deferdq))
                return;
 
-       seq_no = buf_seqno(skb_peek(&link->deferred_queue));
+       seq_no = buf_seqno(skb_peek(&link->deferdq));
        if (seq_no == mod(link->next_in_no))
-               skb_queue_splice_tail_init(&link->deferred_queue, list);
-}
-
-/**
- * link_recv_buf_validate - validate basic format of received message
- *
- * This routine ensures a TIPC message has an acceptable header, and at least
- * as much data as the header indicates it should.  The routine also ensures
- * that the entire message header is stored in the main fragment of the message
- * buffer, to simplify future access to message header fields.
- *
- * Note: Having extra info present in the message header or data areas is OK.
- * TIPC will ignore the excess, under the assumption that it is optional info
- * introduced by a later release of the protocol.
- */
-static int link_recv_buf_validate(struct sk_buff *buf)
-{
-       static u32 min_data_hdr_size[8] = {
-               SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE,
-               MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
-               };
-
-       struct tipc_msg *msg;
-       u32 tipc_hdr[2];
-       u32 size;
-       u32 hdr_size;
-       u32 min_hdr_size;
-
-       /* If this packet comes from the defer queue, the skb has already
-        * been validated
-        */
-       if (unlikely(TIPC_SKB_CB(buf)->deferred))
-               return 1;
-
-       if (unlikely(buf->len < MIN_H_SIZE))
-               return 0;
-
-       msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
-       if (msg == NULL)
-               return 0;
-
-       if (unlikely(msg_version(msg) != TIPC_VERSION))
-               return 0;
-
-       size = msg_size(msg);
-       hdr_size = msg_hdr_sz(msg);
-       min_hdr_size = msg_isdata(msg) ?
-               min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
-
-       if (unlikely((hdr_size < min_hdr_size) ||
-                    (size < hdr_size) ||
-                    (buf->len < size) ||
-                    (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
-               return 0;
-
-       return pskb_may_pull(buf, hdr_size);
+               skb_queue_splice_tail_init(&link->deferdq, list);
 }
 
 /**
@@ -1124,16 +1056,11 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 
        while ((skb = __skb_dequeue(&head))) {
                /* Ensure message is well-formed */
-               if (unlikely(!link_recv_buf_validate(skb)))
-                       goto discard;
-
-               /* Ensure message data is a single contiguous unit */
-               if (unlikely(skb_linearize(skb)))
+               if (unlikely(!tipc_msg_validate(skb)))
                        goto discard;
 
                /* Handle arrival of a non-unicast link message */
                msg = buf_msg(skb);
-
                if (unlikely(msg_non_seq(msg))) {
                        if (msg_user(msg) ==  LINK_CONFIG)
                                tipc_disc_rcv(net, skb, b_ptr);
@@ -1174,21 +1101,20 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
                ackd = msg_ack(msg);
 
                /* Release acked messages */
-               if (n_ptr->bclink.recv_permitted)
+               if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg)))
                        tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
 
                released = 0;
-               skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
-                       if (skb1 == l_ptr->next_out ||
-                           more(buf_seqno(skb1), ackd))
+               skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) {
+                       if (more(buf_seqno(skb1), ackd))
                                break;
-                        __skb_unlink(skb1, &l_ptr->outqueue);
+                        __skb_unlink(skb1, &l_ptr->transmq);
                         kfree_skb(skb1);
                         released = 1;
                }
 
                /* Try sending any messages link endpoint has pending */
-               if (unlikely(l_ptr->next_out))
+               if (unlikely(skb_queue_len(&l_ptr->backlogq)))
                        tipc_link_push_packets(l_ptr);
 
                if (released && !skb_queue_empty(&l_ptr->wakeupq))
@@ -1223,10 +1149,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
                        goto unlock;
                }
                l_ptr->next_in_no++;
-               if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
+               if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
                        link_retrieve_defq(l_ptr, &head);
-
-               if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
+               if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
                        l_ptr->stats.sent_acks++;
                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
                }
@@ -1393,10 +1318,9 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
                return;
        }
 
-       if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
+       if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {
                l_ptr->stats.deferred_recv++;
-               TIPC_SKB_CB(buf)->deferred = true;
-               if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
+               if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1)
                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
        } else {
                l_ptr->stats.duplicates++;
@@ -1433,11 +1357,11 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
 
                if (!tipc_link_is_up(l_ptr))
                        return;
-               if (l_ptr->next_out)
-                       next_sent = buf_seqno(l_ptr->next_out);
+               if (skb_queue_len(&l_ptr->backlogq))
+                       next_sent = buf_seqno(skb_peek(&l_ptr->backlogq));
                msg_set_next_sent(msg, next_sent);
-               if (!skb_queue_empty(&l_ptr->deferred_queue)) {
-                       u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue));
+               if (!skb_queue_empty(&l_ptr->deferdq)) {
+                       u32 rec = buf_seqno(skb_peek(&l_ptr->deferdq));
                        gap = mod(rec - mod(l_ptr->next_in_no));
                }
                msg_set_seq_gap(msg, gap);
@@ -1489,10 +1413,9 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
 
        skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
        buf->priority = TC_PRIO_CONTROL;
-
        tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf,
                         &l_ptr->media_addr);
-       l_ptr->unacked_window = 0;
+       l_ptr->rcv_unacked = 0;
        kfree_skb(buf);
 }
 
@@ -1627,7 +1550,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
                }
                if (msg_seq_gap(msg)) {
                        l_ptr->stats.recv_nacks++;
-                       tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
+                       tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
                                             msg_seq_gap(msg));
                }
                break;
@@ -1674,7 +1597,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
  */
 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 {
-       u32 msgcount = skb_queue_len(&l_ptr->outqueue);
+       int msgcount;
        struct tipc_link *tunnel = l_ptr->owner->active_links[0];
        struct tipc_msg tunnel_hdr;
        struct sk_buff *skb;
@@ -1685,10 +1608,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 
        tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
                      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
+       skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
+       msgcount = skb_queue_len(&l_ptr->transmq);
        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
        msg_set_msgcnt(&tunnel_hdr, msgcount);
 
-       if (skb_queue_empty(&l_ptr->outqueue)) {
+       if (skb_queue_empty(&l_ptr->transmq)) {
                skb = tipc_buf_acquire(INT_H_SIZE);
                if (skb) {
                        skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
@@ -1704,7 +1629,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
        split_bundles = (l_ptr->owner->active_links[0] !=
                         l_ptr->owner->active_links[1]);
 
-       skb_queue_walk(&l_ptr->outqueue, skb) {
+       skb_queue_walk(&l_ptr->transmq, skb) {
                struct tipc_msg *msg = buf_msg(skb);
 
                if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
@@ -1735,80 +1660,66 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
  * and sequence order is preserved per sender/receiver socket pair.
  * Owner node is locked.
  */
-void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
-                             struct tipc_link *tunnel)
+void tipc_link_dup_queue_xmit(struct tipc_link *link,
+                             struct tipc_link *tnl)
 {
        struct sk_buff *skb;
-       struct tipc_msg tunnel_hdr;
-
-       tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
-                     DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
-       msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
-       msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
-       skb_queue_walk(&l_ptr->outqueue, skb) {
+       struct tipc_msg tnl_hdr;
+       struct sk_buff_head *queue = &link->transmq;
+       int mcnt;
+
+       tipc_msg_init(link_own_addr(link), &tnl_hdr, CHANGEOVER_PROTOCOL,
+                     DUPLICATE_MSG, INT_H_SIZE, link->addr);
+       mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq);
+       msg_set_msgcnt(&tnl_hdr, mcnt);
+       msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id);
+
+tunnel_queue:
+       skb_queue_walk(queue, skb) {
                struct sk_buff *outskb;
                struct tipc_msg *msg = buf_msg(skb);
-               u32 length = msg_size(msg);
+               u32 len = msg_size(msg);
 
-               if (msg_user(msg) == MSG_BUNDLER)
-                       msg_set_type(msg, CLOSED_MSG);
-               msg_set_ack(msg, mod(l_ptr->next_in_no - 1));   /* Update */
-               msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-               msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
-               outskb = tipc_buf_acquire(length + INT_H_SIZE);
+               msg_set_ack(msg, mod(link->next_in_no - 1));
+               msg_set_bcast_ack(msg, link->owner->bclink.last_in);
+               msg_set_size(&tnl_hdr, len + INT_H_SIZE);
+               outskb = tipc_buf_acquire(len + INT_H_SIZE);
                if (outskb == NULL) {
                        pr_warn("%sunable to send duplicate msg\n",
                                link_co_err);
                        return;
                }
-               skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
-               skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
-                                              length);
-               __tipc_link_xmit_skb(tunnel, outskb);
-               if (!tipc_link_is_up(l_ptr))
+               skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE);
+               skb_copy_to_linear_data_offset(outskb, INT_H_SIZE,
+                                              skb->data, len);
+               __tipc_link_xmit_skb(tnl, outskb);
+               if (!tipc_link_is_up(link))
                        return;
        }
-}
-
-/**
- * buf_extract - extracts embedded TIPC message from another message
- * @skb: encapsulating message buffer
- * @from_pos: offset to extract from
- *
- * Returns a new message buffer containing an embedded message.  The
- * encapsulating buffer is left unchanged.
- */
-static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
-{
-       struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
-       u32 size = msg_size(msg);
-       struct sk_buff *eb;
-
-       eb = tipc_buf_acquire(size);
-       if (eb)
-               skb_copy_to_linear_data(eb, msg, size);
-       return eb;
+       if (queue == &link->backlogq)
+               return;
+       queue = &link->backlogq;
+       goto tunnel_queue;
 }
 
 /* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
  * Owner node is locked.
  */
-static void tipc_link_dup_rcv(struct tipc_link *l_ptr,
-                             struct sk_buff *t_buf)
+static void tipc_link_dup_rcv(struct tipc_link *link,
+                             struct sk_buff *skb)
 {
-       struct sk_buff *buf;
+       struct sk_buff *iskb;
+       int pos = 0;
 
-       if (!tipc_link_is_up(l_ptr))
+       if (!tipc_link_is_up(link))
                return;
 
-       buf = buf_extract(t_buf, INT_H_SIZE);
-       if (buf == NULL) {
+       if (!tipc_msg_extract(skb, &iskb, &pos)) {
                pr_warn("%sfailed to extract inner dup pkt\n", link_co_err);
                return;
        }
-
-       /* Add buffer to deferred queue, if applicable: */
-       link_handle_out_of_seq_msg(l_ptr, buf);
+       /* Append buffer to deferred queue, if applicable: */
+       link_handle_out_of_seq_msg(link, iskb);
 }
 
 /*  tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
@@ -1820,6 +1731,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
        struct tipc_msg *t_msg = buf_msg(t_buf);
        struct sk_buff *buf = NULL;
        struct tipc_msg *msg;
+       int pos = 0;
 
        if (tipc_link_is_up(l_ptr))
                tipc_link_reset(l_ptr);
@@ -1831,8 +1743,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
        /* Should there be an inner packet? */
        if (l_ptr->exp_msg_count) {
                l_ptr->exp_msg_count--;
-               buf = buf_extract(t_buf, INT_H_SIZE);
-               if (buf == NULL) {
+               if (!tipc_msg_extract(t_buf, &buf, &pos)) {
                        pr_warn("%sno inner failover pkt\n", link_co_err);
                        goto exit;
                }
@@ -1902,6 +1813,8 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
 
 void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
 {
+       l_ptr->window = window;
+
        /* Data messages from this node, inclusive FIRST_FRAGM */
        l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
        l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;