23bcc1132365128de97ee2de41c9465b1b5d03ff
[cascardo/linux.git] / net / tipc / link.c
1 /*
2  * net/tipc/link.c: TIPC link code
3  *
4  * Copyright (c) 1996-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "link.h"
39 #include "bcast.h"
40 #include "socket.h"
41 #include "name_distr.h"
42 #include "discover.h"
43 #include "config.h"
44 #include "netlink.h"
45
46 #include <linux/pkt_sched.h>
47
/*
 * Error message prefixes
 *
 * link_rst_msg and link_unk_evt are inserted through a leading "%s" in the
 * pr_info()/pr_warn()/pr_err() calls of this file.
 */
static const char *link_co_err = "Link changeover error, ";
static const char *link_rst_msg = "Resetting link ";
static const char *link_unk_evt = "Unknown link event ";
54
/*
 * Netlink attribute policy for link attributes, indexed by TIPC_NLA_LINK_*.
 * The name attribute is the only entry that also carries a length
 * (TIPC_MAX_LINK_NAME); PROP and STATS are nested attribute sets.
 */
static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
        [TIPC_NLA_LINK_UNSPEC]          = { .type = NLA_UNSPEC },
        [TIPC_NLA_LINK_NAME] = {
                .type = NLA_STRING,
                .len = TIPC_MAX_LINK_NAME
        },
        [TIPC_NLA_LINK_MTU]             = { .type = NLA_U32 },
        [TIPC_NLA_LINK_BROADCAST]       = { .type = NLA_FLAG },
        [TIPC_NLA_LINK_UP]              = { .type = NLA_FLAG },
        [TIPC_NLA_LINK_ACTIVE]          = { .type = NLA_FLAG },
        [TIPC_NLA_LINK_PROP]            = { .type = NLA_NESTED },
        [TIPC_NLA_LINK_STATS]           = { .type = NLA_NESTED },
        [TIPC_NLA_LINK_RX]              = { .type = NLA_U32 },
        [TIPC_NLA_LINK_TX]              = { .type = NLA_U32 }
};
70
/*
 * Properties valid for media, bearer and link.
 * Indexed by TIPC_NLA_PROP_*; priority, tolerance and window are all u32.
 */
static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
        [TIPC_NLA_PROP_UNSPEC]          = { .type = NLA_UNSPEC },
        [TIPC_NLA_PROP_PRIO]            = { .type = NLA_U32 },
        [TIPC_NLA_PROP_TOL]             = { .type = NLA_U32 },
        [TIPC_NLA_PROP_WIN]             = { .type = NLA_U32 }
};
78
/*
 * Out-of-range value for link session numbers.
 * Session numbers are masked with 0xffff wherever this file sets them, so
 * 0x10000 can never match a real one.
 */
#define INVALID_SESSION 0x10000

/*
 * Link state events:
 * These share the 'switch (event)' statements in link_state_event() with
 * the RESET_MSG/ACTIVATE_MSG message types, so the values must not collide
 * with those.
 */
#define  STARTING_EVT    856384768      /* link processing trigger */
#define  TRAFFIC_MSG_EVT 560815u        /* rx'd ??? */
#define  TIMEOUT_EVT     560817u        /* link timer expired */

/*
 * The following two 'message types' are really just implementation
 * data conveniently stored in the message header.
 * They must not be considered part of the protocol
 */
#define OPEN_MSG   0
#define CLOSED_MSG 1

/*
 * State value stored in 'exp_msg_count'
 * (assigned by tipc_link_reset() when an active link goes down while the
 * node still has active links)
 */
#define START_CHANGEOVER 100000u
103
/*
 * Forward declarations of file-local (static) functions, so they can be
 * called before their definitions appear.
 */
static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
                                       struct sk_buff *buf);
static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf);
static int  tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
                                 struct sk_buff **buf);
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
static void link_state_event(struct tipc_link *l_ptr, u32 event);
static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
static void tipc_link_sync_xmit(struct tipc_link *l);
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf);
static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf);
117
118 /*
119  *  Simple link routines
120  */
/* Round @i up to the nearest multiple of four (unsigned wrap-around applies) */
static unsigned int align(unsigned int i)
{
        const unsigned int bumped = i + 3u;

        return bumped - (bumped % 4u);
}
125
126 static void link_init_max_pkt(struct tipc_link *l_ptr)
127 {
128         struct tipc_bearer *b_ptr;
129         u32 max_pkt;
130
131         rcu_read_lock();
132         b_ptr = rcu_dereference_rtnl(bearer_list[l_ptr->bearer_id]);
133         if (!b_ptr) {
134                 rcu_read_unlock();
135                 return;
136         }
137         max_pkt = (b_ptr->mtu & ~3);
138         rcu_read_unlock();
139
140         if (max_pkt > MAX_MSG_SIZE)
141                 max_pkt = MAX_MSG_SIZE;
142
143         l_ptr->max_pkt_target = max_pkt;
144         if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
145                 l_ptr->max_pkt = l_ptr->max_pkt_target;
146         else
147                 l_ptr->max_pkt = MAX_PKT_DEFAULT;
148
149         l_ptr->max_pkt_probes = 0;
150 }
151
152 /*
153  *  Simple non-static link routines (i.e. referenced outside this file)
154  */
/**
 * tipc_link_is_up - tell whether a link is in one of the working states
 * @l_ptr: pointer to link, may be NULL
 *
 * Returns 1 if the link is in WORKING_WORKING or WORKING_UNKNOWN state,
 * 0 otherwise (including when @l_ptr is NULL).
 */
int tipc_link_is_up(struct tipc_link *l_ptr)
{
        if (l_ptr &&
            (link_working_working(l_ptr) || link_working_unknown(l_ptr)))
                return 1;

        return 0;
}
161
162 int tipc_link_is_active(struct tipc_link *l_ptr)
163 {
164         return  (l_ptr->owner->active_links[0] == l_ptr) ||
165                 (l_ptr->owner->active_links[1] == l_ptr);
166 }
167
168 /**
169  * link_timeout - handle expiration of link timer
170  * @l_ptr: pointer to link
171  */
172 static void link_timeout(struct tipc_link *l_ptr)
173 {
174         struct sk_buff *skb;
175
176         tipc_node_lock(l_ptr->owner);
177
178         /* update counters used in statistical profiling of send traffic */
179         l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
180         l_ptr->stats.queue_sz_counts++;
181
182         skb = skb_peek(&l_ptr->outqueue);
183         if (skb) {
184                 struct tipc_msg *msg = buf_msg(skb);
185                 u32 length = msg_size(msg);
186
187                 if ((msg_user(msg) == MSG_FRAGMENTER) &&
188                     (msg_type(msg) == FIRST_FRAGMENT)) {
189                         length = msg_size(msg_get_wrapped(msg));
190                 }
191                 if (length) {
192                         l_ptr->stats.msg_lengths_total += length;
193                         l_ptr->stats.msg_length_counts++;
194                         if (length <= 64)
195                                 l_ptr->stats.msg_length_profile[0]++;
196                         else if (length <= 256)
197                                 l_ptr->stats.msg_length_profile[1]++;
198                         else if (length <= 1024)
199                                 l_ptr->stats.msg_length_profile[2]++;
200                         else if (length <= 4096)
201                                 l_ptr->stats.msg_length_profile[3]++;
202                         else if (length <= 16384)
203                                 l_ptr->stats.msg_length_profile[4]++;
204                         else if (length <= 32768)
205                                 l_ptr->stats.msg_length_profile[5]++;
206                         else
207                                 l_ptr->stats.msg_length_profile[6]++;
208                 }
209         }
210
211         /* do all other link processing performed on a periodic basis */
212         link_state_event(l_ptr, TIMEOUT_EVT);
213
214         if (l_ptr->next_out)
215                 tipc_link_push_packets(l_ptr);
216
217         tipc_node_unlock(l_ptr->owner);
218 }
219
220 static void link_set_timer(struct tipc_link *l_ptr, u32 time)
221 {
222         k_start_timer(&l_ptr->timer, time);
223 }
224
/**
 * tipc_link_create - create a new link
 * @n_ptr: pointer to associated node
 * @b_ptr: pointer to associated bearer
 * @media_addr: media address to use when sending messages over link
 *
 * Allocates a zeroed link with GFP_ATOMIC, initialises it in state
 * RESET_UNKNOWN, attaches it to @n_ptr, sets up its timer and finally feeds
 * STARTING_EVT to the link state machine.
 *
 * Returns pointer to link, or NULL if the node already has MAX_BEARERS
 * links, already has a link on this bearer, or the allocation fails.
 */
struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
                                   struct tipc_bearer *b_ptr,
                                   const struct tipc_media_addr *media_addr)
{
        struct tipc_link *l_ptr;
        struct tipc_msg *msg;
        char *if_name;
        char addr_string[16];
        u32 peer = n_ptr->addr;

        /* Enforce the per-node limit on the number of links */
        if (n_ptr->link_cnt >= MAX_BEARERS) {
                tipc_addr_string_fill(addr_string, n_ptr->addr);
                pr_err("Attempt to establish %uth link to %s. Max %u allowed.\n",
                        n_ptr->link_cnt, addr_string, MAX_BEARERS);
                return NULL;
        }

        /* Only one link per bearer towards the same node */
        if (n_ptr->links[b_ptr->identity]) {
                tipc_addr_string_fill(addr_string, n_ptr->addr);
                pr_err("Attempt to establish second link on <%s> to %s\n",
                       b_ptr->name, addr_string);
                return NULL;
        }

        l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
        if (!l_ptr) {
                pr_warn("Link creation failed, no memory\n");
                return NULL;
        }

        l_ptr->addr = peer;
        /*
         * Interface name is whatever follows the first ':' of the bearer name.
         * NOTE(review): the strchr() result is not checked, so this relies on
         * the bearer name always containing ':' - confirm it is validated
         * where bearers are created.
         */
        if_name = strchr(b_ptr->name, ':') + 1;
        /*
         * NOTE(review): sprintf() is unbounded; presumably l_ptr->name is
         * sized for the longest possible link name - verify.
         */
        sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
                tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
                tipc_node(tipc_own_addr),
                if_name,
                tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
                /* note: peer i/f name is updated by reset/activate message */
        memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
        l_ptr->owner = n_ptr;
        l_ptr->checkpoint = 1;
        l_ptr->peer_session = INVALID_SESSION;
        l_ptr->bearer_id = b_ptr->identity;
        link_set_supervision_props(l_ptr, b_ptr->tolerance);
        l_ptr->state = RESET_UNKNOWN;

        /*
         * Pre-build the link protocol message kept inside the link itself:
         * a RESET_MSG carrying a 16-bit session number taken from
         * tipc_random, the bearer identity and the local interface name.
         */
        l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
        msg = l_ptr->pmsg;
        tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
        msg_set_size(msg, sizeof(l_ptr->proto_msg));
        msg_set_session(msg, (tipc_random & 0xffff));
        msg_set_bearer_id(msg, b_ptr->identity);
        /* NOTE(review): unbounded copy - assumes proto_msg has room for if_name */
        strcpy((char *)msg_data(msg), if_name);

        /* Inherit priority, window and network plane from the bearer */
        l_ptr->priority = b_ptr->priority;
        tipc_link_set_queue_limits(l_ptr, b_ptr->window);

        l_ptr->net_plane = b_ptr->net_plane;
        link_init_max_pkt(l_ptr);

        l_ptr->next_out_no = 1;
        __skb_queue_head_init(&l_ptr->outqueue);
        __skb_queue_head_init(&l_ptr->deferred_queue);
        skb_queue_head_init(&l_ptr->waiting_sks);

        link_reset_statistics(l_ptr);

        tipc_node_attach_link(n_ptr, l_ptr);

        /* link_timeout() receives the link pointer as its timer argument */
        k_init_timer(&l_ptr->timer, (Handler)link_timeout,
                     (unsigned long)l_ptr);

        /* Sets LINK_STARTED and sends the first RESET_MSG (see link_state_event) */
        link_state_event(l_ptr, STARTING_EVT);

        return l_ptr;
}
309
310 void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
311 {
312         struct tipc_link *l_ptr;
313         struct tipc_node *n_ptr;
314
315         rcu_read_lock();
316         list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
317                 tipc_node_lock(n_ptr);
318                 l_ptr = n_ptr->links[bearer_id];
319                 if (l_ptr) {
320                         tipc_link_reset(l_ptr);
321                         if (shutting_down || !tipc_node_is_up(n_ptr)) {
322                                 tipc_node_detach_link(l_ptr->owner, l_ptr);
323                                 tipc_link_reset_fragments(l_ptr);
324                                 tipc_node_unlock(n_ptr);
325
326                                 /* Nobody else can access this link now: */
327                                 del_timer_sync(&l_ptr->timer);
328                                 kfree(l_ptr);
329                         } else {
330                                 /* Detach/delete when failover is finished: */
331                                 l_ptr->flags |= LINK_STOPPED;
332                                 tipc_node_unlock(n_ptr);
333                                 del_timer_sync(&l_ptr->timer);
334                         }
335                         continue;
336                 }
337                 tipc_node_unlock(n_ptr);
338         }
339         rcu_read_unlock();
340 }
341
342 /**
343  * link_schedule_user - schedule user for wakeup after congestion
344  * @link: congested link
345  * @oport: sending port
346  * @chain_sz: size of buffer chain that was attempted sent
347  * @imp: importance of message attempted sent
348  * Create pseudo msg to send back to user when congestion abates
349  */
350 static bool link_schedule_user(struct tipc_link *link, u32 oport,
351                                uint chain_sz, uint imp)
352 {
353         struct sk_buff *buf;
354
355         buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, tipc_own_addr,
356                               tipc_own_addr, oport, 0, 0);
357         if (!buf)
358                 return false;
359         TIPC_SKB_CB(buf)->chain_sz = chain_sz;
360         TIPC_SKB_CB(buf)->chain_imp = imp;
361         skb_queue_tail(&link->waiting_sks, buf);
362         link->stats.link_congs++;
363         return true;
364 }
365
366 /**
367  * link_prepare_wakeup - prepare users for wakeup after congestion
368  * @link: congested link
369  * Move a number of waiting users, as permitted by available space in
370  * the send queue, from link wait queue to node wait queue for wakeup
371  */
372 static void link_prepare_wakeup(struct tipc_link *link)
373 {
374         uint pend_qsz = skb_queue_len(&link->outqueue);
375         struct sk_buff *skb, *tmp;
376
377         skb_queue_walk_safe(&link->waiting_sks, skb, tmp) {
378                 if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])
379                         break;
380                 pend_qsz += TIPC_SKB_CB(skb)->chain_sz;
381                 skb_unlink(skb, &link->waiting_sks);
382                 skb_queue_tail(&link->owner->waiting_sks, skb);
383         }
384 }
385
386 /**
387  * tipc_link_reset_fragments - purge link's inbound message fragments queue
388  * @l_ptr: pointer to link
389  */
390 void tipc_link_reset_fragments(struct tipc_link *l_ptr)
391 {
392         kfree_skb(l_ptr->reasm_buf);
393         l_ptr->reasm_buf = NULL;
394 }
395
396 /**
397  * tipc_link_purge_queues - purge all pkt queues associated with link
398  * @l_ptr: pointer to link
399  */
400 void tipc_link_purge_queues(struct tipc_link *l_ptr)
401 {
402         __skb_queue_purge(&l_ptr->deferred_queue);
403         __skb_queue_purge(&l_ptr->outqueue);
404         tipc_link_reset_fragments(l_ptr);
405 }
406
407 void tipc_link_reset(struct tipc_link *l_ptr)
408 {
409         u32 prev_state = l_ptr->state;
410         u32 checkpoint = l_ptr->next_in_no;
411         int was_active_link = tipc_link_is_active(l_ptr);
412         struct tipc_node *owner = l_ptr->owner;
413
414         msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
415
416         /* Link is down, accept any session */
417         l_ptr->peer_session = INVALID_SESSION;
418
419         /* Prepare for max packet size negotiation */
420         link_init_max_pkt(l_ptr);
421
422         l_ptr->state = RESET_UNKNOWN;
423
424         if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
425                 return;
426
427         tipc_node_link_down(l_ptr->owner, l_ptr);
428         tipc_bearer_remove_dest(l_ptr->bearer_id, l_ptr->addr);
429
430         if (was_active_link && tipc_node_active_links(l_ptr->owner)) {
431                 l_ptr->reset_checkpoint = checkpoint;
432                 l_ptr->exp_msg_count = START_CHANGEOVER;
433         }
434
435         /* Clean up all queues: */
436         __skb_queue_purge(&l_ptr->outqueue);
437         __skb_queue_purge(&l_ptr->deferred_queue);
438         if (!skb_queue_empty(&l_ptr->waiting_sks)) {
439                 skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
440                 owner->action_flags |= TIPC_WAKEUP_USERS;
441         }
442         l_ptr->next_out = NULL;
443         l_ptr->unacked_window = 0;
444         l_ptr->checkpoint = 1;
445         l_ptr->next_out_no = 1;
446         l_ptr->fsm_msg_cnt = 0;
447         l_ptr->stale_count = 0;
448         link_reset_statistics(l_ptr);
449 }
450
451 void tipc_link_reset_list(unsigned int bearer_id)
452 {
453         struct tipc_link *l_ptr;
454         struct tipc_node *n_ptr;
455
456         rcu_read_lock();
457         list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
458                 tipc_node_lock(n_ptr);
459                 l_ptr = n_ptr->links[bearer_id];
460                 if (l_ptr)
461                         tipc_link_reset(l_ptr);
462                 tipc_node_unlock(n_ptr);
463         }
464         rcu_read_unlock();
465 }
466
467 static void link_activate(struct tipc_link *l_ptr)
468 {
469         l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
470         tipc_node_link_up(l_ptr->owner, l_ptr);
471         tipc_bearer_add_dest(l_ptr->bearer_id, l_ptr->addr);
472 }
473
/**
 * link_state_event - link finite state machine
 * @l_ptr: pointer to link
 * @event: state machine event to process
 *
 * @event is one of the local events (STARTING_EVT, TRAFFIC_MSG_EVT,
 * TIMEOUT_EVT) or a protocol message type (RESET_MSG, ACTIVATE_MSG).
 * The link moves between WORKING_WORKING, WORKING_UNKNOWN, RESET_UNKNOWN
 * and RESET_RESET.  Every protocol message sent from here is counted in
 * fsm_msg_cnt, and the link timer is re-armed on every path that keeps the
 * state machine running.
 *
 * NOTE(review): the third argument of tipc_link_proto_xmit() is passed as
 * 1 wherever the link is checking on its peer - presumably a probe flag;
 * confirm against that function's definition.
 */
static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
{
        struct tipc_link *other;
        u32 cont_intv = l_ptr->continuity_interval;

        /* A stopped link ignores every event */
        if (l_ptr->flags & LINK_STOPPED)
                return;

        if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))
                return;         /* Not yet. */

        /* Check whether changeover is going on */
        if (l_ptr->exp_msg_count) {
                /* Keep the timer alive, but do no state processing */
                if (event == TIMEOUT_EVT)
                        link_set_timer(l_ptr, cont_intv);
                return;
        }

        switch (l_ptr->state) {
        case WORKING_WORKING:
                switch (event) {
                case TRAFFIC_MSG_EVT:
                case ACTIVATE_MSG:
                        break;
                case TIMEOUT_EVT:
                        /* Something was received since the last checkpoint */
                        if (l_ptr->next_in_no != l_ptr->checkpoint) {
                                l_ptr->checkpoint = l_ptr->next_in_no;
                                if (tipc_bclink_acks_missing(l_ptr->owner)) {
                                        tipc_link_proto_xmit(l_ptr, STATE_MSG,
                                                             0, 0, 0, 0, 0);
                                        l_ptr->fsm_msg_cnt++;
                                } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
                                        /* Packet size not yet at its target */
                                        tipc_link_proto_xmit(l_ptr, STATE_MSG,
                                                             1, 0, 0, 0, 0);
                                        l_ptr->fsm_msg_cnt++;
                                }
                                link_set_timer(l_ptr, cont_intv);
                                break;
                        }
                        /* Nothing received: start checking at 1/4 interval */
                        l_ptr->state = WORKING_UNKNOWN;
                        l_ptr->fsm_msg_cnt = 0;
                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        link_set_timer(l_ptr, cont_intv / 4);
                        break;
                case RESET_MSG:
                        pr_info("%s<%s>, requested by peer\n", link_rst_msg,
                                l_ptr->name);
                        tipc_link_reset(l_ptr);
                        /* tipc_link_reset() left RESET_UNKNOWN; override it */
                        l_ptr->state = RESET_RESET;
                        l_ptr->fsm_msg_cnt = 0;
                        tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
                                             0, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        link_set_timer(l_ptr, cont_intv);
                        break;
                default:
                        pr_err("%s%u in WW state\n", link_unk_evt, event);
                }
                break;
        case WORKING_UNKNOWN:
                switch (event) {
                case TRAFFIC_MSG_EVT:
                case ACTIVATE_MSG:
                        /* Peer is alive again */
                        l_ptr->state = WORKING_WORKING;
                        l_ptr->fsm_msg_cnt = 0;
                        link_set_timer(l_ptr, cont_intv);
                        break;
                case RESET_MSG:
                        pr_info("%s<%s>, requested by peer while probing\n",
                                link_rst_msg, l_ptr->name);
                        tipc_link_reset(l_ptr);
                        l_ptr->state = RESET_RESET;
                        l_ptr->fsm_msg_cnt = 0;
                        tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
                                             0, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        link_set_timer(l_ptr, cont_intv);
                        break;
                case TIMEOUT_EVT:
                        if (l_ptr->next_in_no != l_ptr->checkpoint) {
                                /* Received something meanwhile: back to WW */
                                l_ptr->state = WORKING_WORKING;
                                l_ptr->fsm_msg_cnt = 0;
                                l_ptr->checkpoint = l_ptr->next_in_no;
                                if (tipc_bclink_acks_missing(l_ptr->owner)) {
                                        tipc_link_proto_xmit(l_ptr, STATE_MSG,
                                                             0, 0, 0, 0, 0);
                                        l_ptr->fsm_msg_cnt++;
                                }
                                link_set_timer(l_ptr, cont_intv);
                        } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
                                /* Still silent, abort limit not reached: retry */
                                tipc_link_proto_xmit(l_ptr, STATE_MSG,
                                                     1, 0, 0, 0, 0);
                                l_ptr->fsm_msg_cnt++;
                                link_set_timer(l_ptr, cont_intv / 4);
                        } else {        /* Link has failed */
                                pr_warn("%s<%s>, peer not responding\n",
                                        link_rst_msg, l_ptr->name);
                                tipc_link_reset(l_ptr);
                                l_ptr->state = RESET_UNKNOWN;
                                l_ptr->fsm_msg_cnt = 0;
                                tipc_link_proto_xmit(l_ptr, RESET_MSG,
                                                     0, 0, 0, 0, 0);
                                l_ptr->fsm_msg_cnt++;
                                link_set_timer(l_ptr, cont_intv);
                        }
                        break;
                default:
                        pr_err("%s%u in WU state\n", link_unk_evt, event);
                }
                break;
        case RESET_UNKNOWN:
                switch (event) {
                case TRAFFIC_MSG_EVT:
                        break;
                case ACTIVATE_MSG:
                        /*
                         * Do not come up while the node's first active link
                         * is itself in WORKING_UNKNOWN state.
                         */
                        other = l_ptr->owner->active_links[0];
                        if (other && link_working_unknown(other))
                                break;
                        l_ptr->state = WORKING_WORKING;
                        l_ptr->fsm_msg_cnt = 0;
                        link_activate(l_ptr);
                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        /* Only working link of the node: send a sync message */
                        if (l_ptr->owner->working_links == 1)
                                tipc_link_sync_xmit(l_ptr);
                        link_set_timer(l_ptr, cont_intv);
                        break;
                case RESET_MSG:
                        l_ptr->state = RESET_RESET;
                        l_ptr->fsm_msg_cnt = 0;
                        tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
                                             1, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        link_set_timer(l_ptr, cont_intv);
                        break;
                case STARTING_EVT:
                        l_ptr->flags |= LINK_STARTED;
                        /* fall through */
                case TIMEOUT_EVT:
                        /* Keep announcing ourselves with RESET_MSG */
                        tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        link_set_timer(l_ptr, cont_intv);
                        break;
                default:
                        pr_err("%s%u in RU state\n", link_unk_evt, event);
                }
                break;
        case RESET_RESET:
                switch (event) {
                case TRAFFIC_MSG_EVT:
                case ACTIVATE_MSG:
                        /* Same gating on the first active link as in RU state */
                        other = l_ptr->owner->active_links[0];
                        if (other && link_working_unknown(other))
                                break;
                        l_ptr->state = WORKING_WORKING;
                        l_ptr->fsm_msg_cnt = 0;
                        link_activate(l_ptr);
                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        if (l_ptr->owner->working_links == 1)
                                tipc_link_sync_xmit(l_ptr);
                        link_set_timer(l_ptr, cont_intv);
                        break;
                case RESET_MSG:
                        break;
                case TIMEOUT_EVT:
                        /* Keep answering with ACTIVATE_MSG */
                        tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
                                             0, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        link_set_timer(l_ptr, cont_intv);
                        break;
                default:
                        pr_err("%s%u in RR state\n", link_unk_evt, event);
                }
                break;
        default:
                pr_err("Unknown link state %u/%u\n", l_ptr->state, event);
        }
}
659
660 /* tipc_link_cong: determine return value and how to treat the
661  * sent buffer during link congestion.
662  * - For plain, errorless user data messages we keep the buffer and
663  *   return -ELINKONG.
664  * - For all other messages we discard the buffer and return -EHOSTUNREACH
665  * - For TIPC internal messages we also reset the link
666  */
667 static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
668 {
669         struct sk_buff *skb = skb_peek(list);
670         struct tipc_msg *msg = buf_msg(skb);
671         uint imp = tipc_msg_tot_importance(msg);
672         u32 oport = msg_tot_origport(msg);
673
674         if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
675                 pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
676                 tipc_link_reset(link);
677                 goto drop;
678         }
679         if (unlikely(msg_errcode(msg)))
680                 goto drop;
681         if (unlikely(msg_reroute_cnt(msg)))
682                 goto drop;
683         if (TIPC_SKB_CB(skb)->wakeup_pending)
684                 return -ELINKCONG;
685         if (link_schedule_user(link, oport, skb_queue_len(list), imp))
686                 return -ELINKCONG;
687 drop:
688         __skb_queue_purge(list);
689         return -EHOSTUNREACH;
690 }
691
/**
 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
 * @link: link to use
 * @list: chain of buffers containing message
 *
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
 * user data messages) or -EHOSTUNREACH (all other messages/senders)
 * Only the socket functions tipc_send_stream() and tipc_send_packet() need
 * to act on the return value, since they may need to do more send attempts.
 *
 * The size and importance checks are made on the first buffer of @list
 * only.  NOTE(review): @list is assumed to be non-empty; the result of
 * skb_peek() is used without a NULL check.
 */
int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list)
{
        struct tipc_msg *msg = buf_msg(skb_peek(list));
        uint psz = msg_size(msg);
        /* Below this out queue length packets are handed to the bearer at once */
        uint sndlim = link->queue_limit[0];
        uint imp = tipc_msg_tot_importance(msg);
        uint mtu = link->max_pkt;
        uint ack = mod(link->next_in_no - 1);
        uint seqno = link->next_out_no;
        uint bc_last_in = link->owner->bclink.last_in;
        struct tipc_media_addr *addr = &link->media_addr;
        struct sk_buff_head *outqueue = &link->outqueue;
        struct sk_buff *skb, *tmp;

        /* Match queue limits against msg importance: */
        if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
                return tipc_link_cong(link, list);

        /* Has valid packet limit been used ? */
        if (unlikely(psz > mtu)) {
                __skb_queue_purge(list);
                return -EMSGSIZE;
        }

        /* Prepare each packet for sending, and add to outqueue: */
        skb_queue_walk_safe(list, skb, tmp) {
                __skb_unlink(skb, list);
                msg = buf_msg(skb);
                /* Header word 2: ack in the upper half, seqno in the lower */
                msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
                msg_set_bcast_ack(msg, bc_last_in);

                if (skb_queue_len(outqueue) < sndlim) {
                        /* Room to send right away */
                        __skb_queue_tail(outqueue, skb);
                        tipc_bearer_send(link->bearer_id, skb, addr);
                        link->next_out = NULL;
                        link->unacked_window = 0;
                } else if (tipc_msg_bundle(outqueue, skb, mtu)) {
                        /* Bundled: no sequence number is consumed */
                        link->stats.sent_bundled++;
                        continue;
                } else if (tipc_msg_make_bundle(outqueue, skb, mtu,
                                                link->addr)) {
                        /* A new bundle was made; it is the tail of outqueue */
                        link->stats.sent_bundled++;
                        link->stats.sent_bundles++;
                        if (!link->next_out)
                                link->next_out = skb_peek_tail(outqueue);
                } else {
                        /* Queue unsent; remember the first packet still to go */
                        __skb_queue_tail(outqueue, skb);
                        if (!link->next_out)
                                link->next_out = skb;
                }
                seqno++;
        }
        link->next_out_no = seqno;
        return 0;
}
758
/* skb2list - initialise @list and make @skb its only element */
static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
{
	__skb_queue_head_init(list);
	__skb_queue_tail(list, skb);
}
764
765 static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
766 {
767         struct sk_buff_head head;
768
769         skb2list(skb, &head);
770         return __tipc_link_xmit(link, &head);
771 }
772
773 int tipc_link_xmit_skb(struct sk_buff *skb, u32 dnode, u32 selector)
774 {
775         struct sk_buff_head head;
776
777         skb2list(skb, &head);
778         return tipc_link_xmit(&head, dnode, selector);
779 }
780
781 /**
782  * tipc_link_xmit() is the general link level function for message sending
783  * @list: chain of buffers containing message
784  * @dsz: amount of user data to be sent
785  * @dnode: address of destination node
786  * @selector: a number used for deterministic link selection
787  * Consumes the buffer chain, except when returning -ELINKCONG
788  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
789  */
790 int tipc_link_xmit(struct sk_buff_head *list, u32 dnode, u32 selector)
791 {
792         struct tipc_link *link = NULL;
793         struct tipc_node *node;
794         int rc = -EHOSTUNREACH;
795
796         node = tipc_node_find(dnode);
797         if (node) {
798                 tipc_node_lock(node);
799                 link = node->active_links[selector & 1];
800                 if (link)
801                         rc = __tipc_link_xmit(link, list);
802                 tipc_node_unlock(node);
803         }
804
805         if (link)
806                 return rc;
807
808         if (likely(in_own_node(dnode))) {
809                 /* As a node local message chain never contains more than one
810                  * buffer, we just need to dequeue one SKB buffer from the
811                  * head list.
812                  */
813                 return tipc_sk_rcv(__skb_dequeue(list));
814         }
815         __skb_queue_purge(list);
816
817         return rc;
818 }
819
820 /*
821  * tipc_link_sync_xmit - synchronize broadcast link endpoints.
822  *
823  * Give a newly added peer node the sequence number where it should
824  * start receiving and acking broadcast packets.
825  *
826  * Called with node locked
827  */
828 static void tipc_link_sync_xmit(struct tipc_link *link)
829 {
830         struct sk_buff *skb;
831         struct tipc_msg *msg;
832
833         skb = tipc_buf_acquire(INT_H_SIZE);
834         if (!skb)
835                 return;
836
837         msg = buf_msg(skb);
838         tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
839         msg_set_last_bcast(msg, link->owner->bclink.acked);
840         __tipc_link_xmit_skb(link, skb);
841 }
842
843 /*
844  * tipc_link_sync_rcv - synchronize broadcast link endpoints.
845  * Receive the sequence number where we should start receiving and
846  * acking broadcast packets from a newly added peer node, and open
847  * up for reception of such packets.
848  *
849  * Called with node locked
850  */
851 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
852 {
853         struct tipc_msg *msg = buf_msg(buf);
854
855         n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
856         n->bclink.recv_permitted = true;
857         kfree_skb(buf);
858 }
859
860 struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
861                                     const struct sk_buff *skb)
862 {
863         if (skb_queue_is_last(list, skb))
864                 return NULL;
865         return skb->next;
866 }
867
868 /*
869  * tipc_link_push_packets - push unsent packets to bearer
870  *
871  * Push out the unsent messages of a link where congestion
872  * has abated. Node is locked.
873  *
874  * Called with node locked
875  */
876 void tipc_link_push_packets(struct tipc_link *l_ptr)
877 {
878         struct sk_buff_head *outqueue = &l_ptr->outqueue;
879         struct sk_buff *skb = l_ptr->next_out;
880         struct tipc_msg *msg;
881         u32 next, first;
882
883         skb_queue_walk_from(outqueue, skb) {
884                 msg = buf_msg(skb);
885                 next = msg_seqno(msg);
886                 first = buf_seqno(skb_peek(outqueue));
887
888                 if (mod(next - first) < l_ptr->queue_limit[0]) {
889                         msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
890                         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
891                         if (msg_user(msg) == MSG_BUNDLER)
892                                 TIPC_SKB_CB(skb)->bundling = false;
893                         tipc_bearer_send(l_ptr->bearer_id, skb,
894                                          &l_ptr->media_addr);
895                         l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
896                 } else {
897                         break;
898                 }
899         }
900 }
901
902 void tipc_link_reset_all(struct tipc_node *node)
903 {
904         char addr_string[16];
905         u32 i;
906
907         tipc_node_lock(node);
908
909         pr_warn("Resetting all links to %s\n",
910                 tipc_addr_string_fill(addr_string, node->addr));
911
912         for (i = 0; i < MAX_BEARERS; i++) {
913                 if (node->links[i]) {
914                         link_print(node->links[i], "Resetting link\n");
915                         tipc_link_reset(node->links[i]);
916                 }
917         }
918
919         tipc_node_unlock(node);
920 }
921
922 static void link_retransmit_failure(struct tipc_link *l_ptr,
923                                     struct sk_buff *buf)
924 {
925         struct tipc_msg *msg = buf_msg(buf);
926
927         pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);
928
929         if (l_ptr->addr) {
930                 /* Handle failure on standard link */
931                 link_print(l_ptr, "Resetting link\n");
932                 tipc_link_reset(l_ptr);
933
934         } else {
935                 /* Handle failure on broadcast link */
936                 struct tipc_node *n_ptr;
937                 char addr_string[16];
938
939                 pr_info("Msg seq number: %u,  ", msg_seqno(msg));
940                 pr_cont("Outstanding acks: %lu\n",
941                         (unsigned long) TIPC_SKB_CB(buf)->handle);
942
943                 n_ptr = tipc_bclink_retransmit_to();
944                 tipc_node_lock(n_ptr);
945
946                 tipc_addr_string_fill(addr_string, n_ptr->addr);
947                 pr_info("Broadcast link info for %s\n", addr_string);
948                 pr_info("Reception permitted: %d,  Acked: %u\n",
949                         n_ptr->bclink.recv_permitted,
950                         n_ptr->bclink.acked);
951                 pr_info("Last in: %u,  Oos state: %u,  Last sent: %u\n",
952                         n_ptr->bclink.last_in,
953                         n_ptr->bclink.oos_state,
954                         n_ptr->bclink.last_sent);
955
956                 tipc_node_unlock(n_ptr);
957
958                 tipc_bclink_set_flags(TIPC_BCLINK_RESET);
959                 l_ptr->stale_count = 0;
960         }
961 }
962
963 void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
964                           u32 retransmits)
965 {
966         struct tipc_msg *msg;
967
968         if (!skb)
969                 return;
970
971         msg = buf_msg(skb);
972
973         /* Detect repeated retransmit failures */
974         if (l_ptr->last_retransmitted == msg_seqno(msg)) {
975                 if (++l_ptr->stale_count > 100) {
976                         link_retransmit_failure(l_ptr, skb);
977                         return;
978                 }
979         } else {
980                 l_ptr->last_retransmitted = msg_seqno(msg);
981                 l_ptr->stale_count = 1;
982         }
983
984         skb_queue_walk_from(&l_ptr->outqueue, skb) {
985                 if (!retransmits || skb == l_ptr->next_out)
986                         break;
987                 msg = buf_msg(skb);
988                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
989                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
990                 tipc_bearer_send(l_ptr->bearer_id, skb, &l_ptr->media_addr);
991                 retransmits--;
992                 l_ptr->stats.retransmitted++;
993         }
994 }
995
996 static void link_retrieve_defq(struct tipc_link *link,
997                                struct sk_buff_head *list)
998 {
999         u32 seq_no;
1000
1001         if (skb_queue_empty(&link->deferred_queue))
1002                 return;
1003
1004         seq_no = buf_seqno(skb_peek(&link->deferred_queue));
1005         if (seq_no == mod(link->next_in_no))
1006                 skb_queue_splice_tail_init(&link->deferred_queue, list);
1007 }
1008
1009 /**
1010  * link_recv_buf_validate - validate basic format of received message
1011  *
1012  * This routine ensures a TIPC message has an acceptable header, and at least
1013  * as much data as the header indicates it should.  The routine also ensures
1014  * that the entire message header is stored in the main fragment of the message
1015  * buffer, to simplify future access to message header fields.
1016  *
1017  * Note: Having extra info present in the message header or data areas is OK.
1018  * TIPC will ignore the excess, under the assumption that it is optional info
1019  * introduced by a later release of the protocol.
1020  */
1021 static int link_recv_buf_validate(struct sk_buff *buf)
1022 {
1023         static u32 min_data_hdr_size[8] = {
1024                 SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE,
1025                 MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1026                 };
1027
1028         struct tipc_msg *msg;
1029         u32 tipc_hdr[2];
1030         u32 size;
1031         u32 hdr_size;
1032         u32 min_hdr_size;
1033
1034         /* If this packet comes from the defer queue, the skb has already
1035          * been validated
1036          */
1037         if (unlikely(TIPC_SKB_CB(buf)->deferred))
1038                 return 1;
1039
1040         if (unlikely(buf->len < MIN_H_SIZE))
1041                 return 0;
1042
1043         msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1044         if (msg == NULL)
1045                 return 0;
1046
1047         if (unlikely(msg_version(msg) != TIPC_VERSION))
1048                 return 0;
1049
1050         size = msg_size(msg);
1051         hdr_size = msg_hdr_sz(msg);
1052         min_hdr_size = msg_isdata(msg) ?
1053                 min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1054
1055         if (unlikely((hdr_size < min_hdr_size) ||
1056                      (size < hdr_size) ||
1057                      (buf->len < size) ||
1058                      (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1059                 return 0;
1060
1061         return pskb_may_pull(buf, hdr_size);
1062 }
1063
1064 /**
1065  * tipc_rcv - process TIPC packets/messages arriving from off-node
1066  * @skb: TIPC packet
1067  * @b_ptr: pointer to bearer message arrived on
1068  *
1069  * Invoked with no locks held.  Bearer pointer must point to a valid bearer
1070  * structure (i.e. cannot be NULL), but bearer can be inactive.
1071  */
1072 void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *b_ptr)
1073 {
1074         struct sk_buff_head head;
1075         struct tipc_node *n_ptr;
1076         struct tipc_link *l_ptr;
1077         struct sk_buff *skb1, *tmp;
1078         struct tipc_msg *msg;
1079         u32 seq_no;
1080         u32 ackd;
1081         u32 released;
1082
1083         skb2list(skb, &head);
1084
1085         while ((skb = __skb_dequeue(&head))) {
1086                 /* Ensure message is well-formed */
1087                 if (unlikely(!link_recv_buf_validate(skb)))
1088                         goto discard;
1089
1090                 /* Ensure message data is a single contiguous unit */
1091                 if (unlikely(skb_linearize(skb)))
1092                         goto discard;
1093
1094                 /* Handle arrival of a non-unicast link message */
1095                 msg = buf_msg(skb);
1096
1097                 if (unlikely(msg_non_seq(msg))) {
1098                         if (msg_user(msg) ==  LINK_CONFIG)
1099                                 tipc_disc_rcv(skb, b_ptr);
1100                         else
1101                                 tipc_bclink_rcv(skb);
1102                         continue;
1103                 }
1104
1105                 /* Discard unicast link messages destined for another node */
1106                 if (unlikely(!msg_short(msg) &&
1107                              (msg_destnode(msg) != tipc_own_addr)))
1108                         goto discard;
1109
1110                 /* Locate neighboring node that sent message */
1111                 n_ptr = tipc_node_find(msg_prevnode(msg));
1112                 if (unlikely(!n_ptr))
1113                         goto discard;
1114                 tipc_node_lock(n_ptr);
1115
1116                 /* Locate unicast link endpoint that should handle message */
1117                 l_ptr = n_ptr->links[b_ptr->identity];
1118                 if (unlikely(!l_ptr))
1119                         goto unlock_discard;
1120
1121                 /* Verify that communication with node is currently allowed */
1122                 if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
1123                     msg_user(msg) == LINK_PROTOCOL &&
1124                     (msg_type(msg) == RESET_MSG ||
1125                     msg_type(msg) == ACTIVATE_MSG) &&
1126                     !msg_redundant_link(msg))
1127                         n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
1128
1129                 if (tipc_node_blocked(n_ptr))
1130                         goto unlock_discard;
1131
1132                 /* Validate message sequence number info */
1133                 seq_no = msg_seqno(msg);
1134                 ackd = msg_ack(msg);
1135
1136                 /* Release acked messages */
1137                 if (n_ptr->bclink.recv_permitted)
1138                         tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1139
1140                 released = 0;
1141                 skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
1142                         if (skb1 == l_ptr->next_out ||
1143                             more(buf_seqno(skb1), ackd))
1144                                 break;
1145                          __skb_unlink(skb1, &l_ptr->outqueue);
1146                          kfree_skb(skb1);
1147                          released = 1;
1148                 }
1149
1150                 /* Try sending any messages link endpoint has pending */
1151                 if (unlikely(l_ptr->next_out))
1152                         tipc_link_push_packets(l_ptr);
1153
1154                 if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
1155                         link_prepare_wakeup(l_ptr);
1156                         l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
1157                 }
1158
1159                 /* Process the incoming packet */
1160                 if (unlikely(!link_working_working(l_ptr))) {
1161                         if (msg_user(msg) == LINK_PROTOCOL) {
1162                                 tipc_link_proto_rcv(l_ptr, skb);
1163                                 link_retrieve_defq(l_ptr, &head);
1164                                 tipc_node_unlock(n_ptr);
1165                                 continue;
1166                         }
1167
1168                         /* Traffic message. Conditionally activate link */
1169                         link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1170
1171                         if (link_working_working(l_ptr)) {
1172                                 /* Re-insert buffer in front of queue */
1173                                 __skb_queue_head(&head, skb);
1174                                 tipc_node_unlock(n_ptr);
1175                                 continue;
1176                         }
1177                         goto unlock_discard;
1178                 }
1179
1180                 /* Link is now in state WORKING_WORKING */
1181                 if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
1182                         link_handle_out_of_seq_msg(l_ptr, skb);
1183                         link_retrieve_defq(l_ptr, &head);
1184                         tipc_node_unlock(n_ptr);
1185                         continue;
1186                 }
1187                 l_ptr->next_in_no++;
1188                 if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
1189                         link_retrieve_defq(l_ptr, &head);
1190
1191                 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1192                         l_ptr->stats.sent_acks++;
1193                         tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1194                 }
1195
1196                 if (tipc_link_prepare_input(l_ptr, &skb)) {
1197                         tipc_node_unlock(n_ptr);
1198                         continue;
1199                 }
1200                 tipc_node_unlock(n_ptr);
1201
1202                 if (tipc_link_input(l_ptr, skb) != 0)
1203                         goto discard;
1204                 continue;
1205 unlock_discard:
1206                 tipc_node_unlock(n_ptr);
1207 discard:
1208                 kfree_skb(skb);
1209         }
1210 }
1211
1212 /**
1213  * tipc_link_prepare_input - process TIPC link messages
1214  *
1215  * returns nonzero if the message was consumed
1216  *
1217  * Node lock must be held
1218  */
1219 static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf)
1220 {
1221         struct tipc_node *n;
1222         struct tipc_msg *msg;
1223         int res = -EINVAL;
1224
1225         n = l->owner;
1226         msg = buf_msg(*buf);
1227         switch (msg_user(msg)) {
1228         case CHANGEOVER_PROTOCOL:
1229                 if (tipc_link_tunnel_rcv(n, buf))
1230                         res = 0;
1231                 break;
1232         case MSG_FRAGMENTER:
1233                 l->stats.recv_fragments++;
1234                 if (tipc_buf_append(&l->reasm_buf, buf)) {
1235                         l->stats.recv_fragmented++;
1236                         res = 0;
1237                 } else if (!l->reasm_buf) {
1238                         tipc_link_reset(l);
1239                 }
1240                 break;
1241         case MSG_BUNDLER:
1242                 l->stats.recv_bundles++;
1243                 l->stats.recv_bundled += msg_msgcnt(msg);
1244                 res = 0;
1245                 break;
1246         case NAME_DISTRIBUTOR:
1247                 n->bclink.recv_permitted = true;
1248                 res = 0;
1249                 break;
1250         case BCAST_PROTOCOL:
1251                 tipc_link_sync_rcv(n, *buf);
1252                 break;
1253         default:
1254                 res = 0;
1255         }
1256         return res;
1257 }
1258 /**
1259  * tipc_link_input - Deliver message too higher layers
1260  */
1261 static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
1262 {
1263         struct tipc_msg *msg = buf_msg(buf);
1264         int res = 0;
1265
1266         switch (msg_user(msg)) {
1267         case TIPC_LOW_IMPORTANCE:
1268         case TIPC_MEDIUM_IMPORTANCE:
1269         case TIPC_HIGH_IMPORTANCE:
1270         case TIPC_CRITICAL_IMPORTANCE:
1271         case CONN_MANAGER:
1272                 tipc_sk_rcv(buf);
1273                 break;
1274         case NAME_DISTRIBUTOR:
1275                 tipc_named_rcv(buf);
1276                 break;
1277         case MSG_BUNDLER:
1278                 tipc_link_bundle_rcv(buf);
1279                 break;
1280         default:
1281                 res = -EINVAL;
1282         }
1283         return res;
1284 }
1285
1286 /**
1287  * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1288  *
1289  * Returns increase in queue length (i.e. 0 or 1)
1290  */
1291 u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
1292 {
1293         struct sk_buff *skb1;
1294         u32 seq_no = buf_seqno(skb);
1295
1296         /* Empty queue ? */
1297         if (skb_queue_empty(list)) {
1298                 __skb_queue_tail(list, skb);
1299                 return 1;
1300         }
1301
1302         /* Last ? */
1303         if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
1304                 __skb_queue_tail(list, skb);
1305                 return 1;
1306         }
1307
1308         /* Locate insertion point in queue, then insert; discard if duplicate */
1309         skb_queue_walk(list, skb1) {
1310                 u32 curr_seqno = buf_seqno(skb1);
1311
1312                 if (seq_no == curr_seqno) {
1313                         kfree_skb(skb);
1314                         return 0;
1315                 }
1316
1317                 if (less(seq_no, curr_seqno))
1318                         break;
1319         }
1320
1321         __skb_queue_before(list, skb1, skb);
1322         return 1;
1323 }
1324
1325 /*
1326  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
1327  */
1328 static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
1329                                        struct sk_buff *buf)
1330 {
1331         u32 seq_no = buf_seqno(buf);
1332
1333         if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
1334                 tipc_link_proto_rcv(l_ptr, buf);
1335                 return;
1336         }
1337
1338         /* Record OOS packet arrival (force mismatch on next timeout) */
1339         l_ptr->checkpoint--;
1340
1341         /*
1342          * Discard packet if a duplicate; otherwise add it to deferred queue
1343          * and notify peer of gap as per protocol specification
1344          */
1345         if (less(seq_no, mod(l_ptr->next_in_no))) {
1346                 l_ptr->stats.duplicates++;
1347                 kfree_skb(buf);
1348                 return;
1349         }
1350
1351         if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
1352                 l_ptr->stats.deferred_recv++;
1353                 TIPC_SKB_CB(buf)->deferred = true;
1354                 if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
1355                         tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1356         } else {
1357                 l_ptr->stats.duplicates++;
1358         }
1359 }
1360
1361 /*
1362  * Send protocol message to the other endpoint.
1363  */
1364 void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
1365                           u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
1366 {
1367         struct sk_buff *buf = NULL;
1368         struct tipc_msg *msg = l_ptr->pmsg;
1369         u32 msg_size = sizeof(l_ptr->proto_msg);
1370         int r_flag;
1371
1372         /* Don't send protocol message during link changeover */
1373         if (l_ptr->exp_msg_count)
1374                 return;
1375
1376         /* Abort non-RESET send if communication with node is prohibited */
1377         if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG))
1378                 return;
1379
1380         /* Create protocol message with "out-of-sequence" sequence number */
1381         msg_set_type(msg, msg_typ);
1382         msg_set_net_plane(msg, l_ptr->net_plane);
1383         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1384         msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
1385
1386         if (msg_typ == STATE_MSG) {
1387                 u32 next_sent = mod(l_ptr->next_out_no);
1388
1389                 if (!tipc_link_is_up(l_ptr))
1390                         return;
1391                 if (l_ptr->next_out)
1392                         next_sent = buf_seqno(l_ptr->next_out);
1393                 msg_set_next_sent(msg, next_sent);
1394                 if (!skb_queue_empty(&l_ptr->deferred_queue)) {
1395                         u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue));
1396                         gap = mod(rec - mod(l_ptr->next_in_no));
1397                 }
1398                 msg_set_seq_gap(msg, gap);
1399                 if (gap)
1400                         l_ptr->stats.sent_nacks++;
1401                 msg_set_link_tolerance(msg, tolerance);
1402                 msg_set_linkprio(msg, priority);
1403                 msg_set_max_pkt(msg, ack_mtu);
1404                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1405                 msg_set_probe(msg, probe_msg != 0);
1406                 if (probe_msg) {
1407                         u32 mtu = l_ptr->max_pkt;
1408
1409                         if ((mtu < l_ptr->max_pkt_target) &&
1410                             link_working_working(l_ptr) &&
1411                             l_ptr->fsm_msg_cnt) {
1412                                 msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1413                                 if (l_ptr->max_pkt_probes == 10) {
1414                                         l_ptr->max_pkt_target = (msg_size - 4);
1415                                         l_ptr->max_pkt_probes = 0;
1416                                         msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1417                                 }
1418                                 l_ptr->max_pkt_probes++;
1419                         }
1420
1421                         l_ptr->stats.sent_probes++;
1422                 }
1423                 l_ptr->stats.sent_states++;
1424         } else {                /* RESET_MSG or ACTIVATE_MSG */
1425                 msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
1426                 msg_set_seq_gap(msg, 0);
1427                 msg_set_next_sent(msg, 1);
1428                 msg_set_probe(msg, 0);
1429                 msg_set_link_tolerance(msg, l_ptr->tolerance);
1430                 msg_set_linkprio(msg, l_ptr->priority);
1431                 msg_set_max_pkt(msg, l_ptr->max_pkt_target);
1432         }
1433
1434         r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
1435         msg_set_redundant_link(msg, r_flag);
1436         msg_set_linkprio(msg, l_ptr->priority);
1437         msg_set_size(msg, msg_size);
1438
1439         msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
1440
1441         buf = tipc_buf_acquire(msg_size);
1442         if (!buf)
1443                 return;
1444
1445         skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
1446         buf->priority = TC_PRIO_CONTROL;
1447
1448         tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
1449         l_ptr->unacked_window = 0;
1450         kfree_skb(buf);
1451 }
1452
1453 /*
1454  * Receive protocol message :
1455  * Note that network plane id propagates through the network, and may
1456  * change at any time. The node with lowest address rules
1457  */
1458 static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
1459 {
1460         u32 rec_gap = 0;
1461         u32 max_pkt_info;
1462         u32 max_pkt_ack;
1463         u32 msg_tol;
1464         struct tipc_msg *msg = buf_msg(buf);
1465
1466         /* Discard protocol message during link changeover */
1467         if (l_ptr->exp_msg_count)
1468                 goto exit;
1469
1470         if (l_ptr->net_plane != msg_net_plane(msg))
1471                 if (tipc_own_addr > msg_prevnode(msg))
1472                         l_ptr->net_plane = msg_net_plane(msg);
1473
1474         switch (msg_type(msg)) {
1475
1476         case RESET_MSG:
1477                 if (!link_working_unknown(l_ptr) &&
1478                     (l_ptr->peer_session != INVALID_SESSION)) {
1479                         if (less_eq(msg_session(msg), l_ptr->peer_session))
1480                                 break; /* duplicate or old reset: ignore */
1481                 }
1482
1483                 if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
1484                                 link_working_unknown(l_ptr))) {
1485                         /*
1486                          * peer has lost contact -- don't allow peer's links
1487                          * to reactivate before we recognize loss & clean up
1488                          */
1489                         l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN;
1490                 }
1491
1492                 link_state_event(l_ptr, RESET_MSG);
1493
1494                 /* fall thru' */
1495         case ACTIVATE_MSG:
1496                 /* Update link settings according other endpoint's values */
1497                 strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
1498
1499                 msg_tol = msg_link_tolerance(msg);
1500                 if (msg_tol > l_ptr->tolerance)
1501                         link_set_supervision_props(l_ptr, msg_tol);
1502
1503                 if (msg_linkprio(msg) > l_ptr->priority)
1504                         l_ptr->priority = msg_linkprio(msg);
1505
1506                 max_pkt_info = msg_max_pkt(msg);
1507                 if (max_pkt_info) {
1508                         if (max_pkt_info < l_ptr->max_pkt_target)
1509                                 l_ptr->max_pkt_target = max_pkt_info;
1510                         if (l_ptr->max_pkt > l_ptr->max_pkt_target)
1511                                 l_ptr->max_pkt = l_ptr->max_pkt_target;
1512                 } else {
1513                         l_ptr->max_pkt = l_ptr->max_pkt_target;
1514                 }
1515
1516                 /* Synchronize broadcast link info, if not done previously */
1517                 if (!tipc_node_is_up(l_ptr->owner)) {
1518                         l_ptr->owner->bclink.last_sent =
1519                                 l_ptr->owner->bclink.last_in =
1520                                 msg_last_bcast(msg);
1521                         l_ptr->owner->bclink.oos_state = 0;
1522                 }
1523
1524                 l_ptr->peer_session = msg_session(msg);
1525                 l_ptr->peer_bearer_id = msg_bearer_id(msg);
1526
1527                 if (msg_type(msg) == ACTIVATE_MSG)
1528                         link_state_event(l_ptr, ACTIVATE_MSG);
1529                 break;
1530         case STATE_MSG:
1531
1532                 msg_tol = msg_link_tolerance(msg);
1533                 if (msg_tol)
1534                         link_set_supervision_props(l_ptr, msg_tol);
1535
1536                 if (msg_linkprio(msg) &&
1537                     (msg_linkprio(msg) != l_ptr->priority)) {
1538                         pr_warn("%s<%s>, priority change %u->%u\n",
1539                                 link_rst_msg, l_ptr->name, l_ptr->priority,
1540                                 msg_linkprio(msg));
1541                         l_ptr->priority = msg_linkprio(msg);
1542                         tipc_link_reset(l_ptr); /* Enforce change to take effect */
1543                         break;
1544                 }
1545
1546                 /* Record reception; force mismatch at next timeout: */
1547                 l_ptr->checkpoint--;
1548
1549                 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1550                 l_ptr->stats.recv_states++;
1551                 if (link_reset_unknown(l_ptr))
1552                         break;
1553
1554                 if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
1555                         rec_gap = mod(msg_next_sent(msg) -
1556                                       mod(l_ptr->next_in_no));
1557                 }
1558
1559                 max_pkt_ack = msg_max_pkt(msg);
1560                 if (max_pkt_ack > l_ptr->max_pkt) {
1561                         l_ptr->max_pkt = max_pkt_ack;
1562                         l_ptr->max_pkt_probes = 0;
1563                 }
1564
1565                 max_pkt_ack = 0;
1566                 if (msg_probe(msg)) {
1567                         l_ptr->stats.recv_probes++;
1568                         if (msg_size(msg) > sizeof(l_ptr->proto_msg))
1569                                 max_pkt_ack = msg_size(msg);
1570                 }
1571
1572                 /* Protocol message before retransmits, reduce loss risk */
1573                 if (l_ptr->owner->bclink.recv_permitted)
1574                         tipc_bclink_update_link_state(l_ptr->owner,
1575                                                       msg_last_bcast(msg));
1576
1577                 if (rec_gap || (msg_probe(msg))) {
1578                         tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, rec_gap, 0,
1579                                              0, max_pkt_ack);
1580                 }
1581                 if (msg_seq_gap(msg)) {
1582                         l_ptr->stats.recv_nacks++;
1583                         tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
1584                                              msg_seq_gap(msg));
1585                 }
1586                 break;
1587         }
1588 exit:
1589         kfree_skb(buf);
1590 }
1591
1592
1593 /* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
1594  * a different bearer. Owner node is locked.
1595  */
1596 static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1597                                   struct tipc_msg *tunnel_hdr,
1598                                   struct tipc_msg *msg,
1599                                   u32 selector)
1600 {
1601         struct tipc_link *tunnel;
1602         struct sk_buff *skb;
1603         u32 length = msg_size(msg);
1604
1605         tunnel = l_ptr->owner->active_links[selector & 1];
1606         if (!tipc_link_is_up(tunnel)) {
1607                 pr_warn("%stunnel link no longer available\n", link_co_err);
1608                 return;
1609         }
1610         msg_set_size(tunnel_hdr, length + INT_H_SIZE);
1611         skb = tipc_buf_acquire(length + INT_H_SIZE);
1612         if (!skb) {
1613                 pr_warn("%sunable to send tunnel msg\n", link_co_err);
1614                 return;
1615         }
1616         skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
1617         skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
1618         __tipc_link_xmit_skb(tunnel, skb);
1619 }
1620
1621
1622 /* tipc_link_failover_send_queue(): A link has gone down, but a second
1623  * link is still active. We can do failover. Tunnel the failing link's
1624  * whole send queue via the remaining link. This way, we don't lose
1625  * any packets, and sequence order is preserved for subsequent traffic
1626  * sent over the remaining link. Owner node is locked.
1627  */
1628 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1629 {
1630         u32 msgcount = skb_queue_len(&l_ptr->outqueue);
1631         struct tipc_link *tunnel = l_ptr->owner->active_links[0];
1632         struct tipc_msg tunnel_hdr;
1633         struct sk_buff *skb;
1634         int split_bundles;
1635
1636         if (!tunnel)
1637                 return;
1638
1639         tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
1640                  ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
1641         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1642         msg_set_msgcnt(&tunnel_hdr, msgcount);
1643
1644         if (skb_queue_empty(&l_ptr->outqueue)) {
1645                 skb = tipc_buf_acquire(INT_H_SIZE);
1646                 if (skb) {
1647                         skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
1648                         msg_set_size(&tunnel_hdr, INT_H_SIZE);
1649                         __tipc_link_xmit_skb(tunnel, skb);
1650                 } else {
1651                         pr_warn("%sunable to send changeover msg\n",
1652                                 link_co_err);
1653                 }
1654                 return;
1655         }
1656
1657         split_bundles = (l_ptr->owner->active_links[0] !=
1658                          l_ptr->owner->active_links[1]);
1659
1660         skb_queue_walk(&l_ptr->outqueue, skb) {
1661                 struct tipc_msg *msg = buf_msg(skb);
1662
1663                 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
1664                         struct tipc_msg *m = msg_get_wrapped(msg);
1665                         unchar *pos = (unchar *)m;
1666
1667                         msgcount = msg_msgcnt(msg);
1668                         while (msgcount--) {
1669                                 msg_set_seqno(m, msg_seqno(msg));
1670                                 tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m,
1671                                                       msg_link_selector(m));
1672                                 pos += align(msg_size(m));
1673                                 m = (struct tipc_msg *)pos;
1674                         }
1675                 } else {
1676                         tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
1677                                               msg_link_selector(msg));
1678                 }
1679         }
1680 }
1681
1682 /* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a
1683  * duplicate of the first link's send queue via the new link. This way, we
1684  * are guaranteed that currently queued packets from a socket are delivered
1685  * before future traffic from the same socket, even if this is using the
1686  * new link. The last arriving copy of each duplicate packet is dropped at
1687  * the receiving end by the regular protocol check, so packet cardinality
1688  * and sequence order is preserved per sender/receiver socket pair.
1689  * Owner node is locked.
1690  */
1691 void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
1692                               struct tipc_link *tunnel)
1693 {
1694         struct sk_buff *skb;
1695         struct tipc_msg tunnel_hdr;
1696
1697         tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
1698                  DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
1699         msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
1700         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1701         skb_queue_walk(&l_ptr->outqueue, skb) {
1702                 struct sk_buff *outskb;
1703                 struct tipc_msg *msg = buf_msg(skb);
1704                 u32 length = msg_size(msg);
1705
1706                 if (msg_user(msg) == MSG_BUNDLER)
1707                         msg_set_type(msg, CLOSED_MSG);
1708                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));   /* Update */
1709                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1710                 msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
1711                 outskb = tipc_buf_acquire(length + INT_H_SIZE);
1712                 if (outskb == NULL) {
1713                         pr_warn("%sunable to send duplicate msg\n",
1714                                 link_co_err);
1715                         return;
1716                 }
1717                 skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
1718                 skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
1719                                                length);
1720                 __tipc_link_xmit_skb(tunnel, outskb);
1721                 if (!tipc_link_is_up(l_ptr))
1722                         return;
1723         }
1724 }
1725
1726 /**
1727  * buf_extract - extracts embedded TIPC message from another message
1728  * @skb: encapsulating message buffer
1729  * @from_pos: offset to extract from
1730  *
1731  * Returns a new message buffer containing an embedded message.  The
1732  * encapsulating message itself is left unchanged.
1733  */
1734 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
1735 {
1736         struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
1737         u32 size = msg_size(msg);
1738         struct sk_buff *eb;
1739
1740         eb = tipc_buf_acquire(size);
1741         if (eb)
1742                 skb_copy_to_linear_data(eb, msg, size);
1743         return eb;
1744 }
1745
1746
1747
1748 /* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
1749  * Owner node is locked.
1750  */
1751 static void tipc_link_dup_rcv(struct tipc_link *l_ptr,
1752                               struct sk_buff *t_buf)
1753 {
1754         struct sk_buff *buf;
1755
1756         if (!tipc_link_is_up(l_ptr))
1757                 return;
1758
1759         buf = buf_extract(t_buf, INT_H_SIZE);
1760         if (buf == NULL) {
1761                 pr_warn("%sfailed to extract inner dup pkt\n", link_co_err);
1762                 return;
1763         }
1764
1765         /* Add buffer to deferred queue, if applicable: */
1766         link_handle_out_of_seq_msg(l_ptr, buf);
1767 }
1768
1769 /*  tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
1770  *  Owner node is locked.
1771  */
1772 static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
1773                                               struct sk_buff *t_buf)
1774 {
1775         struct tipc_msg *t_msg = buf_msg(t_buf);
1776         struct sk_buff *buf = NULL;
1777         struct tipc_msg *msg;
1778
1779         if (tipc_link_is_up(l_ptr))
1780                 tipc_link_reset(l_ptr);
1781
1782         /* First failover packet? */
1783         if (l_ptr->exp_msg_count == START_CHANGEOVER)
1784                 l_ptr->exp_msg_count = msg_msgcnt(t_msg);
1785
1786         /* Should there be an inner packet? */
1787         if (l_ptr->exp_msg_count) {
1788                 l_ptr->exp_msg_count--;
1789                 buf = buf_extract(t_buf, INT_H_SIZE);
1790                 if (buf == NULL) {
1791                         pr_warn("%sno inner failover pkt\n", link_co_err);
1792                         goto exit;
1793                 }
1794                 msg = buf_msg(buf);
1795
1796                 if (less(msg_seqno(msg), l_ptr->reset_checkpoint)) {
1797                         kfree_skb(buf);
1798                         buf = NULL;
1799                         goto exit;
1800                 }
1801                 if (msg_user(msg) == MSG_FRAGMENTER) {
1802                         l_ptr->stats.recv_fragments++;
1803                         tipc_buf_append(&l_ptr->reasm_buf, &buf);
1804                 }
1805         }
1806 exit:
1807         if ((l_ptr->exp_msg_count == 0) && (l_ptr->flags & LINK_STOPPED)) {
1808                 tipc_node_detach_link(l_ptr->owner, l_ptr);
1809                 kfree(l_ptr);
1810         }
1811         return buf;
1812 }
1813
1814 /*  tipc_link_tunnel_rcv(): Receive a tunnelled packet, sent
1815  *  via other link as result of a failover (ORIGINAL_MSG) or
1816  *  a new active link (DUPLICATE_MSG). Failover packets are
1817  *  returned to the active link for delivery upwards.
1818  *  Owner node is locked.
1819  */
1820 static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
1821                                 struct sk_buff **buf)
1822 {
1823         struct sk_buff *t_buf = *buf;
1824         struct tipc_link *l_ptr;
1825         struct tipc_msg *t_msg = buf_msg(t_buf);
1826         u32 bearer_id = msg_bearer_id(t_msg);
1827
1828         *buf = NULL;
1829
1830         if (bearer_id >= MAX_BEARERS)
1831                 goto exit;
1832
1833         l_ptr = n_ptr->links[bearer_id];
1834         if (!l_ptr)
1835                 goto exit;
1836
1837         if (msg_type(t_msg) == DUPLICATE_MSG)
1838                 tipc_link_dup_rcv(l_ptr, t_buf);
1839         else if (msg_type(t_msg) == ORIGINAL_MSG)
1840                 *buf = tipc_link_failover_rcv(l_ptr, t_buf);
1841         else
1842                 pr_warn("%sunknown tunnel pkt received\n", link_co_err);
1843 exit:
1844         kfree_skb(t_buf);
1845         return *buf != NULL;
1846 }
1847
1848 /*
1849  *  Bundler functionality:
1850  */
1851 void tipc_link_bundle_rcv(struct sk_buff *buf)
1852 {
1853         u32 msgcount = msg_msgcnt(buf_msg(buf));
1854         u32 pos = INT_H_SIZE;
1855         struct sk_buff *obuf;
1856         struct tipc_msg *omsg;
1857
1858         while (msgcount--) {
1859                 obuf = buf_extract(buf, pos);
1860                 if (obuf == NULL) {
1861                         pr_warn("Link unable to unbundle message(s)\n");
1862                         break;
1863                 }
1864                 omsg = buf_msg(obuf);
1865                 pos += align(msg_size(omsg));
1866                 if (msg_isdata(omsg)) {
1867                         if (unlikely(msg_type(omsg) == TIPC_MCAST_MSG))
1868                                 tipc_sk_mcast_rcv(obuf);
1869                         else
1870                                 tipc_sk_rcv(obuf);
1871                 } else if (msg_user(omsg) == CONN_MANAGER) {
1872                         tipc_sk_rcv(obuf);
1873                 } else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
1874                         tipc_named_rcv(obuf);
1875                 } else {
1876                         pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
1877                         kfree_skb(obuf);
1878                 }
1879         }
1880         kfree_skb(buf);
1881 }
1882
1883 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
1884 {
1885         if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
1886                 return;
1887
1888         l_ptr->tolerance = tolerance;
1889         l_ptr->continuity_interval =
1890                 ((tolerance / 4) > 500) ? 500 : tolerance / 4;
1891         l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
1892 }
1893
1894 void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
1895 {
1896         /* Data messages from this node, inclusive FIRST_FRAGM */
1897         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
1898         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
1899         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
1900         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
1901         /* Transiting data messages,inclusive FIRST_FRAGM */
1902         l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
1903         l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
1904         l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
1905         l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
1906         l_ptr->queue_limit[CONN_MANAGER] = 1200;
1907         l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
1908         l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
1909         /* FRAGMENT and LAST_FRAGMENT packets */
1910         l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
1911 }
1912
1913 /* tipc_link_find_owner - locate owner node of link by link's name
1914  * @name: pointer to link name string
1915  * @bearer_id: pointer to index in 'node->links' array where the link was found.
1916  *
1917  * Returns pointer to node owning the link, or 0 if no matching link is found.
1918  */
1919 static struct tipc_node *tipc_link_find_owner(const char *link_name,
1920                                               unsigned int *bearer_id)
1921 {
1922         struct tipc_link *l_ptr;
1923         struct tipc_node *n_ptr;
1924         struct tipc_node *found_node = 0;
1925         int i;
1926
1927         *bearer_id = 0;
1928         rcu_read_lock();
1929         list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
1930                 tipc_node_lock(n_ptr);
1931                 for (i = 0; i < MAX_BEARERS; i++) {
1932                         l_ptr = n_ptr->links[i];
1933                         if (l_ptr && !strcmp(l_ptr->name, link_name)) {
1934                                 *bearer_id = i;
1935                                 found_node = n_ptr;
1936                                 break;
1937                         }
1938                 }
1939                 tipc_node_unlock(n_ptr);
1940                 if (found_node)
1941                         break;
1942         }
1943         rcu_read_unlock();
1944
1945         return found_node;
1946 }
1947
1948 /**
1949  * link_value_is_valid -- validate proposed link tolerance/priority/window
1950  *
1951  * @cmd: value type (TIPC_CMD_SET_LINK_*)
1952  * @new_value: the new value
1953  *
1954  * Returns 1 if value is within range, 0 if not.
1955  */
1956 static int link_value_is_valid(u16 cmd, u32 new_value)
1957 {
1958         switch (cmd) {
1959         case TIPC_CMD_SET_LINK_TOL:
1960                 return (new_value >= TIPC_MIN_LINK_TOL) &&
1961                         (new_value <= TIPC_MAX_LINK_TOL);
1962         case TIPC_CMD_SET_LINK_PRI:
1963                 return (new_value <= TIPC_MAX_LINK_PRI);
1964         case TIPC_CMD_SET_LINK_WINDOW:
1965                 return (new_value >= TIPC_MIN_LINK_WIN) &&
1966                         (new_value <= TIPC_MAX_LINK_WIN);
1967         }
1968         return 0;
1969 }
1970
1971 /**
1972  * link_cmd_set_value - change priority/tolerance/window for link/bearer/media
1973  * @name: ptr to link, bearer, or media name
1974  * @new_value: new value of link, bearer, or media setting
1975  * @cmd: which link, bearer, or media attribute to set (TIPC_CMD_SET_LINK_*)
1976  *
1977  * Caller must hold RTNL lock to ensure link/bearer/media is not deleted.
1978  *
1979  * Returns 0 if value updated and negative value on error.
1980  */
1981 static int link_cmd_set_value(const char *name, u32 new_value, u16 cmd)
1982 {
1983         struct tipc_node *node;
1984         struct tipc_link *l_ptr;
1985         struct tipc_bearer *b_ptr;
1986         struct tipc_media *m_ptr;
1987         int bearer_id;
1988         int res = 0;
1989
1990         node = tipc_link_find_owner(name, &bearer_id);
1991         if (node) {
1992                 tipc_node_lock(node);
1993                 l_ptr = node->links[bearer_id];
1994
1995                 if (l_ptr) {
1996                         switch (cmd) {
1997                         case TIPC_CMD_SET_LINK_TOL:
1998                                 link_set_supervision_props(l_ptr, new_value);
1999                                 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0,
2000                                                      new_value, 0, 0);
2001                                 break;
2002                         case TIPC_CMD_SET_LINK_PRI:
2003                                 l_ptr->priority = new_value;
2004                                 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0,
2005                                                      0, new_value, 0);
2006                                 break;
2007                         case TIPC_CMD_SET_LINK_WINDOW:
2008                                 tipc_link_set_queue_limits(l_ptr, new_value);
2009                                 break;
2010                         default:
2011                                 res = -EINVAL;
2012                                 break;
2013                         }
2014                 }
2015                 tipc_node_unlock(node);
2016                 return res;
2017         }
2018
2019         b_ptr = tipc_bearer_find(name);
2020         if (b_ptr) {
2021                 switch (cmd) {
2022                 case TIPC_CMD_SET_LINK_TOL:
2023                         b_ptr->tolerance = new_value;
2024                         break;
2025                 case TIPC_CMD_SET_LINK_PRI:
2026                         b_ptr->priority = new_value;
2027                         break;
2028                 case TIPC_CMD_SET_LINK_WINDOW:
2029                         b_ptr->window = new_value;
2030                         break;
2031                 default:
2032                         res = -EINVAL;
2033                         break;
2034                 }
2035                 return res;
2036         }
2037
2038         m_ptr = tipc_media_find(name);
2039         if (!m_ptr)
2040                 return -ENODEV;
2041         switch (cmd) {
2042         case TIPC_CMD_SET_LINK_TOL:
2043                 m_ptr->tolerance = new_value;
2044                 break;
2045         case TIPC_CMD_SET_LINK_PRI:
2046                 m_ptr->priority = new_value;
2047                 break;
2048         case TIPC_CMD_SET_LINK_WINDOW:
2049                 m_ptr->window = new_value;
2050                 break;
2051         default:
2052                 res = -EINVAL;
2053                 break;
2054         }
2055         return res;
2056 }
2057
2058 struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
2059                                      u16 cmd)
2060 {
2061         struct tipc_link_config *args;
2062         u32 new_value;
2063         int res;
2064
2065         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
2066                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2067
2068         args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
2069         new_value = ntohl(args->value);
2070
2071         if (!link_value_is_valid(cmd, new_value))
2072                 return tipc_cfg_reply_error_string(
2073                         "cannot change, value invalid");
2074
2075         if (!strcmp(args->name, tipc_bclink_name)) {
2076                 if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
2077                     (tipc_bclink_set_queue_limits(new_value) == 0))
2078                         return tipc_cfg_reply_none();
2079                 return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
2080                                                    " (cannot change setting on broadcast link)");
2081         }
2082
2083         res = link_cmd_set_value(args->name, new_value, cmd);
2084         if (res)
2085                 return tipc_cfg_reply_error_string("cannot change link setting");
2086
2087         return tipc_cfg_reply_none();
2088 }
2089
2090 /**
2091  * link_reset_statistics - reset link statistics
2092  * @l_ptr: pointer to link
2093  */
2094 static void link_reset_statistics(struct tipc_link *l_ptr)
2095 {
2096         memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
2097         l_ptr->stats.sent_info = l_ptr->next_out_no;
2098         l_ptr->stats.recv_info = l_ptr->next_in_no;
2099 }
2100
2101 struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
2102 {
2103         char *link_name;
2104         struct tipc_link *l_ptr;
2105         struct tipc_node *node;
2106         unsigned int bearer_id;
2107
2108         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2109                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2110
2111         link_name = (char *)TLV_DATA(req_tlv_area);
2112         if (!strcmp(link_name, tipc_bclink_name)) {
2113                 if (tipc_bclink_reset_stats())
2114                         return tipc_cfg_reply_error_string("link not found");
2115                 return tipc_cfg_reply_none();
2116         }
2117         node = tipc_link_find_owner(link_name, &bearer_id);
2118         if (!node)
2119                 return tipc_cfg_reply_error_string("link not found");
2120
2121         tipc_node_lock(node);
2122         l_ptr = node->links[bearer_id];
2123         if (!l_ptr) {
2124                 tipc_node_unlock(node);
2125                 return tipc_cfg_reply_error_string("link not found");
2126         }
2127         link_reset_statistics(l_ptr);
2128         tipc_node_unlock(node);
2129         return tipc_cfg_reply_none();
2130 }
2131
2132 /**
2133  * percent - convert count to a percentage of total (rounding up or down)
2134  */
2135 static u32 percent(u32 count, u32 total)
2136 {
2137         return (count * 100 + (total / 2)) / total;
2138 }
2139
2140 /**
2141  * tipc_link_stats - print link statistics
2142  * @name: link name
2143  * @buf: print buffer area
2144  * @buf_size: size of print buffer area
2145  *
2146  * Returns length of print buffer data string (or 0 if error)
2147  */
2148 static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
2149 {
2150         struct tipc_link *l;
2151         struct tipc_stats *s;
2152         struct tipc_node *node;
2153         char *status;
2154         u32 profile_total = 0;
2155         unsigned int bearer_id;
2156         int ret;
2157
2158         if (!strcmp(name, tipc_bclink_name))
2159                 return tipc_bclink_stats(buf, buf_size);
2160
2161         node = tipc_link_find_owner(name, &bearer_id);
2162         if (!node)
2163                 return 0;
2164
2165         tipc_node_lock(node);
2166
2167         l = node->links[bearer_id];
2168         if (!l) {
2169                 tipc_node_unlock(node);
2170                 return 0;
2171         }
2172
2173         s = &l->stats;
2174
2175         if (tipc_link_is_active(l))
2176                 status = "ACTIVE";
2177         else if (tipc_link_is_up(l))
2178                 status = "STANDBY";
2179         else
2180                 status = "DEFUNCT";
2181
2182         ret = tipc_snprintf(buf, buf_size, "Link <%s>\n"
2183                             "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
2184                             "  Window:%u packets\n",
2185                             l->name, status, l->max_pkt, l->priority,
2186                             l->tolerance, l->queue_limit[0]);
2187
2188         ret += tipc_snprintf(buf + ret, buf_size - ret,
2189                              "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
2190                              l->next_in_no - s->recv_info, s->recv_fragments,
2191                              s->recv_fragmented, s->recv_bundles,
2192                              s->recv_bundled);
2193
2194         ret += tipc_snprintf(buf + ret, buf_size - ret,
2195                              "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
2196                              l->next_out_no - s->sent_info, s->sent_fragments,
2197                              s->sent_fragmented, s->sent_bundles,
2198                              s->sent_bundled);
2199
2200         profile_total = s->msg_length_counts;
2201         if (!profile_total)
2202                 profile_total = 1;
2203
2204         ret += tipc_snprintf(buf + ret, buf_size - ret,
2205                              "  TX profile sample:%u packets  average:%u octets\n"
2206                              "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
2207                              "-16384:%u%% -32768:%u%% -66000:%u%%\n",
2208                              s->msg_length_counts,
2209                              s->msg_lengths_total / profile_total,
2210                              percent(s->msg_length_profile[0], profile_total),
2211                              percent(s->msg_length_profile[1], profile_total),
2212                              percent(s->msg_length_profile[2], profile_total),
2213                              percent(s->msg_length_profile[3], profile_total),
2214                              percent(s->msg_length_profile[4], profile_total),
2215                              percent(s->msg_length_profile[5], profile_total),
2216                              percent(s->msg_length_profile[6], profile_total));
2217
2218         ret += tipc_snprintf(buf + ret, buf_size - ret,
2219                              "  RX states:%u probes:%u naks:%u defs:%u"
2220                              " dups:%u\n", s->recv_states, s->recv_probes,
2221                              s->recv_nacks, s->deferred_recv, s->duplicates);
2222
2223         ret += tipc_snprintf(buf + ret, buf_size - ret,
2224                              "  TX states:%u probes:%u naks:%u acks:%u"
2225                              " dups:%u\n", s->sent_states, s->sent_probes,
2226                              s->sent_nacks, s->sent_acks, s->retransmitted);
2227
2228         ret += tipc_snprintf(buf + ret, buf_size - ret,
2229                              "  Congestion link:%u  Send queue"
2230                              " max:%u avg:%u\n", s->link_congs,
2231                              s->max_queue_sz, s->queue_sz_counts ?
2232                              (s->accu_queue_sz / s->queue_sz_counts) : 0);
2233
2234         tipc_node_unlock(node);
2235         return ret;
2236 }
2237
2238 struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
2239 {
2240         struct sk_buff *buf;
2241         struct tlv_desc *rep_tlv;
2242         int str_len;
2243         int pb_len;
2244         char *pb;
2245
2246         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2247                 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2248
2249         buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
2250         if (!buf)
2251                 return NULL;
2252
2253         rep_tlv = (struct tlv_desc *)buf->data;
2254         pb = TLV_DATA(rep_tlv);
2255         pb_len = ULTRA_STRING_MAX_LEN;
2256         str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
2257                                   pb, pb_len);
2258         if (!str_len) {
2259                 kfree_skb(buf);
2260                 return tipc_cfg_reply_error_string("link not found");
2261         }
2262         str_len += 1;   /* for "\0" */
2263         skb_put(buf, TLV_SPACE(str_len));
2264         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
2265
2266         return buf;
2267 }
2268
2269 /**
2270  * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
2271  * @dest: network address of destination node
2272  * @selector: used to select from set of active links
2273  *
2274  * If no active link can be found, uses default maximum packet size.
2275  */
2276 u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
2277 {
2278         struct tipc_node *n_ptr;
2279         struct tipc_link *l_ptr;
2280         u32 res = MAX_PKT_DEFAULT;
2281
2282         if (dest == tipc_own_addr)
2283                 return MAX_MSG_SIZE;
2284
2285         n_ptr = tipc_node_find(dest);
2286         if (n_ptr) {
2287                 tipc_node_lock(n_ptr);
2288                 l_ptr = n_ptr->active_links[selector & 1];
2289                 if (l_ptr)
2290                         res = l_ptr->max_pkt;
2291                 tipc_node_unlock(n_ptr);
2292         }
2293         return res;
2294 }
2295
2296 static void link_print(struct tipc_link *l_ptr, const char *str)
2297 {
2298         struct tipc_bearer *b_ptr;
2299
2300         rcu_read_lock();
2301         b_ptr = rcu_dereference_rtnl(bearer_list[l_ptr->bearer_id]);
2302         if (b_ptr)
2303                 pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
2304         rcu_read_unlock();
2305
2306         if (link_working_unknown(l_ptr))
2307                 pr_cont(":WU\n");
2308         else if (link_reset_reset(l_ptr))
2309                 pr_cont(":RR\n");
2310         else if (link_reset_unknown(l_ptr))
2311                 pr_cont(":RU\n");
2312         else if (link_working_working(l_ptr))
2313                 pr_cont(":WW\n");
2314         else
2315                 pr_cont("\n");
2316 }
2317
2318 /* Parse and validate nested (link) properties valid for media, bearer and link
2319  */
2320 int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[])
2321 {
2322         int err;
2323
2324         err = nla_parse_nested(props, TIPC_NLA_PROP_MAX, prop,
2325                                tipc_nl_prop_policy);
2326         if (err)
2327                 return err;
2328
2329         if (props[TIPC_NLA_PROP_PRIO]) {
2330                 u32 prio;
2331
2332                 prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
2333                 if (prio > TIPC_MAX_LINK_PRI)
2334                         return -EINVAL;
2335         }
2336
2337         if (props[TIPC_NLA_PROP_TOL]) {
2338                 u32 tol;
2339
2340                 tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
2341                 if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
2342                         return -EINVAL;
2343         }
2344
2345         if (props[TIPC_NLA_PROP_WIN]) {
2346                 u32 win;
2347
2348                 win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
2349                 if ((win < TIPC_MIN_LINK_WIN) || (win > TIPC_MAX_LINK_WIN))
2350                         return -EINVAL;
2351         }
2352
2353         return 0;
2354 }
2355
2356 int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
2357 {
2358         int err;
2359         int res = 0;
2360         int bearer_id;
2361         char *name;
2362         struct tipc_link *link;
2363         struct tipc_node *node;
2364         struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
2365
2366         if (!info->attrs[TIPC_NLA_LINK])
2367                 return -EINVAL;
2368
2369         err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
2370                                info->attrs[TIPC_NLA_LINK],
2371                                tipc_nl_link_policy);
2372         if (err)
2373                 return err;
2374
2375         if (!attrs[TIPC_NLA_LINK_NAME])
2376                 return -EINVAL;
2377
2378         name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
2379
2380         node = tipc_link_find_owner(name, &bearer_id);
2381         if (!node)
2382                 return -EINVAL;
2383
2384         tipc_node_lock(node);
2385
2386         link = node->links[bearer_id];
2387         if (!link) {
2388                 res = -EINVAL;
2389                 goto out;
2390         }
2391
2392         if (attrs[TIPC_NLA_LINK_PROP]) {
2393                 struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
2394
2395                 err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP],
2396                                               props);
2397                 if (err) {
2398                         res = err;
2399                         goto out;
2400                 }
2401
2402                 if (props[TIPC_NLA_PROP_TOL]) {
2403                         u32 tol;
2404
2405                         tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
2406                         link_set_supervision_props(link, tol);
2407                         tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0, 0);
2408                 }
2409                 if (props[TIPC_NLA_PROP_PRIO]) {
2410                         u32 prio;
2411
2412                         prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
2413                         link->priority = prio;
2414                         tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio, 0);
2415                 }
2416                 if (props[TIPC_NLA_PROP_WIN]) {
2417                         u32 win;
2418
2419                         win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
2420                         tipc_link_set_queue_limits(link, win);
2421                 }
2422         }
2423
2424 out:
2425         tipc_node_unlock(node);
2426
2427         return res;
2428 }
2429
2430 static int __tipc_nl_add_stats(struct sk_buff *skb, struct tipc_stats *s)
2431 {
2432         int i;
2433         struct nlattr *stats;
2434
2435         struct nla_map {
2436                 u32 key;
2437                 u32 val;
2438         };
2439
2440         struct nla_map map[] = {
2441                 {TIPC_NLA_STATS_RX_INFO, s->recv_info},
2442                 {TIPC_NLA_STATS_RX_FRAGMENTS, s->recv_fragments},
2443                 {TIPC_NLA_STATS_RX_FRAGMENTED, s->recv_fragmented},
2444                 {TIPC_NLA_STATS_RX_BUNDLES, s->recv_bundles},
2445                 {TIPC_NLA_STATS_RX_BUNDLED, s->recv_bundled},
2446                 {TIPC_NLA_STATS_TX_INFO, s->sent_info},
2447                 {TIPC_NLA_STATS_TX_FRAGMENTS, s->sent_fragments},
2448                 {TIPC_NLA_STATS_TX_FRAGMENTED, s->sent_fragmented},
2449                 {TIPC_NLA_STATS_TX_BUNDLES, s->sent_bundles},
2450                 {TIPC_NLA_STATS_TX_BUNDLED, s->sent_bundled},
2451                 {TIPC_NLA_STATS_MSG_PROF_TOT, (s->msg_length_counts) ?
2452                         s->msg_length_counts : 1},
2453                 {TIPC_NLA_STATS_MSG_LEN_CNT, s->msg_length_counts},
2454                 {TIPC_NLA_STATS_MSG_LEN_TOT, s->msg_lengths_total},
2455                 {TIPC_NLA_STATS_MSG_LEN_P0, s->msg_length_profile[0]},
2456                 {TIPC_NLA_STATS_MSG_LEN_P1, s->msg_length_profile[1]},
2457                 {TIPC_NLA_STATS_MSG_LEN_P2, s->msg_length_profile[2]},
2458                 {TIPC_NLA_STATS_MSG_LEN_P3, s->msg_length_profile[3]},
2459                 {TIPC_NLA_STATS_MSG_LEN_P4, s->msg_length_profile[4]},
2460                 {TIPC_NLA_STATS_MSG_LEN_P5, s->msg_length_profile[5]},
2461                 {TIPC_NLA_STATS_MSG_LEN_P6, s->msg_length_profile[6]},
2462                 {TIPC_NLA_STATS_RX_STATES, s->recv_states},
2463                 {TIPC_NLA_STATS_RX_PROBES, s->recv_probes},
2464                 {TIPC_NLA_STATS_RX_NACKS, s->recv_nacks},
2465                 {TIPC_NLA_STATS_RX_DEFERRED, s->deferred_recv},
2466                 {TIPC_NLA_STATS_TX_STATES, s->sent_states},
2467                 {TIPC_NLA_STATS_TX_PROBES, s->sent_probes},
2468                 {TIPC_NLA_STATS_TX_NACKS, s->sent_nacks},
2469                 {TIPC_NLA_STATS_TX_ACKS, s->sent_acks},
2470                 {TIPC_NLA_STATS_RETRANSMITTED, s->retransmitted},
2471                 {TIPC_NLA_STATS_DUPLICATES, s->duplicates},
2472                 {TIPC_NLA_STATS_LINK_CONGS, s->link_congs},
2473                 {TIPC_NLA_STATS_MAX_QUEUE, s->max_queue_sz},
2474                 {TIPC_NLA_STATS_AVG_QUEUE, s->queue_sz_counts ?
2475                         (s->accu_queue_sz / s->queue_sz_counts) : 0}
2476         };
2477
2478         stats = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
2479         if (!stats)
2480                 return -EMSGSIZE;
2481
2482         for (i = 0; i <  ARRAY_SIZE(map); i++)
2483                 if (nla_put_u32(skb, map[i].key, map[i].val))
2484                         goto msg_full;
2485
2486         nla_nest_end(skb, stats);
2487
2488         return 0;
2489 msg_full:
2490         nla_nest_cancel(skb, stats);
2491
2492         return -EMSGSIZE;
2493 }
2494
2495 /* Caller should hold appropriate locks to protect the link */
2496 static int __tipc_nl_add_link(struct tipc_nl_msg *msg, struct tipc_link *link)
2497 {
2498         int err;
2499         void *hdr;
2500         struct nlattr *attrs;
2501         struct nlattr *prop;
2502
2503         hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family,
2504                           NLM_F_MULTI, TIPC_NL_LINK_GET);
2505         if (!hdr)
2506                 return -EMSGSIZE;
2507
2508         attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
2509         if (!attrs)
2510                 goto msg_full;
2511
2512         if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, link->name))
2513                 goto attr_msg_full;
2514         if (nla_put_u32(msg->skb, TIPC_NLA_LINK_DEST,
2515                         tipc_cluster_mask(tipc_own_addr)))
2516                 goto attr_msg_full;
2517         if (nla_put_u32(msg->skb, TIPC_NLA_LINK_MTU, link->max_pkt))
2518                 goto attr_msg_full;
2519         if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, link->next_in_no))
2520                 goto attr_msg_full;
2521         if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, link->next_out_no))
2522                 goto attr_msg_full;
2523
2524         if (tipc_link_is_up(link))
2525                 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
2526                         goto attr_msg_full;
2527         if (tipc_link_is_active(link))
2528                 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE))
2529                         goto attr_msg_full;
2530
2531         prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
2532         if (!prop)
2533                 goto attr_msg_full;
2534         if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
2535                 goto prop_msg_full;
2536         if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance))
2537                 goto prop_msg_full;
2538         if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN,
2539                         link->queue_limit[TIPC_LOW_IMPORTANCE]))
2540                 goto prop_msg_full;
2541         if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
2542                 goto prop_msg_full;
2543         nla_nest_end(msg->skb, prop);
2544
2545         err = __tipc_nl_add_stats(msg->skb, &link->stats);
2546         if (err)
2547                 goto attr_msg_full;
2548
2549         nla_nest_end(msg->skb, attrs);
2550         genlmsg_end(msg->skb, hdr);
2551
2552         return 0;
2553
2554 prop_msg_full:
2555         nla_nest_cancel(msg->skb, prop);
2556 attr_msg_full:
2557         nla_nest_cancel(msg->skb, attrs);
2558 msg_full:
2559         genlmsg_cancel(msg->skb, hdr);
2560
2561         return -EMSGSIZE;
2562 }
2563
2564 /* Caller should hold node lock  */
2565 static int __tipc_nl_add_node_links(struct tipc_nl_msg *msg,
2566                                     struct tipc_node *node,
2567                                     u32 *prev_link)
2568 {
2569         u32 i;
2570         int err;
2571
2572         for (i = *prev_link; i < MAX_BEARERS; i++) {
2573                 *prev_link = i;
2574
2575                 if (!node->links[i])
2576                         continue;
2577
2578                 err = __tipc_nl_add_link(msg, node->links[i]);
2579                 if (err)
2580                         return err;
2581         }
2582         *prev_link = 0;
2583
2584         return 0;
2585 }
2586
2587 int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
2588 {
2589         struct tipc_node *node;
2590         struct tipc_nl_msg msg;
2591         u32 prev_node = cb->args[0];
2592         u32 prev_link = cb->args[1];
2593         int done = cb->args[2];
2594         int err;
2595
2596         if (done)
2597                 return 0;
2598
2599         msg.skb = skb;
2600         msg.portid = NETLINK_CB(cb->skb).portid;
2601         msg.seq = cb->nlh->nlmsg_seq;
2602
2603         rcu_read_lock();
2604
2605         if (prev_node) {
2606                 node = tipc_node_find(prev_node);
2607                 if (!node) {
2608                         /* We never set seq or call nl_dump_check_consistent()
2609                          * this means that setting prev_seq here will cause the
2610                          * consistence check to fail in the netlink callback
2611                          * handler. Resulting in the last NLMSG_DONE message
2612                          * having the NLM_F_DUMP_INTR flag set.
2613                          */
2614                         cb->prev_seq = 1;
2615                         goto out;
2616                 }
2617
2618                 list_for_each_entry_continue_rcu(node, &tipc_node_list, list) {
2619                         tipc_node_lock(node);
2620                         err = __tipc_nl_add_node_links(&msg, node, &prev_link);
2621                         tipc_node_unlock(node);
2622                         if (err)
2623                                 goto out;
2624
2625                         prev_node = node->addr;
2626                 }
2627         } else {
2628                 err = tipc_nl_add_bc_link(&msg);
2629                 if (err)
2630                         goto out;
2631
2632                 list_for_each_entry_rcu(node, &tipc_node_list, list) {
2633                         tipc_node_lock(node);
2634                         err = __tipc_nl_add_node_links(&msg, node, &prev_link);
2635                         tipc_node_unlock(node);
2636                         if (err)
2637                                 goto out;
2638
2639                         prev_node = node->addr;
2640                 }
2641         }
2642         done = 1;
2643 out:
2644         rcu_read_unlock();
2645
2646         cb->args[0] = prev_node;
2647         cb->args[1] = prev_link;
2648         cb->args[2] = done;
2649
2650         return skb->len;
2651 }
2652
2653 int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
2654 {
2655         struct sk_buff *ans_skb;
2656         struct tipc_nl_msg msg;
2657         struct tipc_link *link;
2658         struct tipc_node *node;
2659         char *name;
2660         int bearer_id;
2661         int err;
2662
2663         if (!info->attrs[TIPC_NLA_LINK_NAME])
2664                 return -EINVAL;
2665
2666         name = nla_data(info->attrs[TIPC_NLA_LINK_NAME]);
2667         node = tipc_link_find_owner(name, &bearer_id);
2668         if (!node)
2669                 return -EINVAL;
2670
2671         ans_skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
2672         if (!ans_skb)
2673                 return -ENOMEM;
2674
2675         msg.skb = ans_skb;
2676         msg.portid = info->snd_portid;
2677         msg.seq = info->snd_seq;
2678
2679         tipc_node_lock(node);
2680         link = node->links[bearer_id];
2681         if (!link) {
2682                 err = -EINVAL;
2683                 goto err_out;
2684         }
2685
2686         err = __tipc_nl_add_link(&msg, link);
2687         if (err)
2688                 goto err_out;
2689
2690         tipc_node_unlock(node);
2691
2692         return genlmsg_reply(ans_skb, info);
2693
2694 err_out:
2695         tipc_node_unlock(node);
2696         nlmsg_free(ans_skb);
2697
2698         return err;
2699 }
2700
2701 int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
2702 {
2703         int err;
2704         char *link_name;
2705         unsigned int bearer_id;
2706         struct tipc_link *link;
2707         struct tipc_node *node;
2708         struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
2709
2710         if (!info->attrs[TIPC_NLA_LINK])
2711                 return -EINVAL;
2712
2713         err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
2714                                info->attrs[TIPC_NLA_LINK],
2715                                tipc_nl_link_policy);
2716         if (err)
2717                 return err;
2718
2719         if (!attrs[TIPC_NLA_LINK_NAME])
2720                 return -EINVAL;
2721
2722         link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
2723
2724         if (strcmp(link_name, tipc_bclink_name) == 0) {
2725                 err = tipc_bclink_reset_stats();
2726                 if (err)
2727                         return err;
2728                 return 0;
2729         }
2730
2731         node = tipc_link_find_owner(link_name, &bearer_id);
2732         if (!node)
2733                 return -EINVAL;
2734
2735         tipc_node_lock(node);
2736
2737         link = node->links[bearer_id];
2738         if (!link) {
2739                 tipc_node_unlock(node);
2740                 return -EINVAL;
2741         }
2742
2743         link_reset_statistics(link);
2744
2745         tipc_node_unlock(node);
2746
2747         return 0;
2748 }