tipc: use standard printk shortcut macros (pr_err etc.)
[cascardo/linux.git] / net / tipc / port.c
1 /*
2  * net/tipc/port.c: TIPC port code
3  *
4  * Copyright (c) 1992-2007, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "config.h"
39 #include "port.h"
40 #include "name_table.h"
41
42 /* Connection management: */
43 #define PROBING_INTERVAL 3600000        /* [ms] => 1 h */
44 #define CONFIRMED 0
45 #define PROBING 1
46
47 #define MAX_REJECT_SIZE 1024
48
49 static struct sk_buff *msg_queue_head;
50 static struct sk_buff *msg_queue_tail;
51
52 DEFINE_SPINLOCK(tipc_port_list_lock);
53 static DEFINE_SPINLOCK(queue_lock);
54
55 static LIST_HEAD(ports);
56 static void port_handle_node_down(unsigned long ref);
57 static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);
58 static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);
59 static void port_timeout(unsigned long ref);
60
61
62 static u32 port_peernode(struct tipc_port *p_ptr)
63 {
64         return msg_destnode(&p_ptr->phdr);
65 }
66
67 static u32 port_peerport(struct tipc_port *p_ptr)
68 {
69         return msg_destport(&p_ptr->phdr);
70 }
71
72 /**
73  * tipc_port_peer_msg - verify message was sent by connected port's peer
74  *
75  * Handles cases where the node's network address has changed from
76  * the default of <0.0.0> to its configured setting.
77  */
78 int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg)
79 {
80         u32 peernode;
81         u32 orignode;
82
83         if (msg_origport(msg) != port_peerport(p_ptr))
84                 return 0;
85
86         orignode = msg_orignode(msg);
87         peernode = port_peernode(p_ptr);
88         return (orignode == peernode) ||
89                 (!orignode && (peernode == tipc_own_addr)) ||
90                 (!peernode && (orignode == tipc_own_addr));
91 }
92
93 /**
94  * tipc_multicast - send a multicast message to local and remote destinations
95  */
96 int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
97                    u32 num_sect, struct iovec const *msg_sect,
98                    unsigned int total_len)
99 {
100         struct tipc_msg *hdr;
101         struct sk_buff *buf;
102         struct sk_buff *ibuf = NULL;
103         struct tipc_port_list dports = {0, NULL, };
104         struct tipc_port *oport = tipc_port_deref(ref);
105         int ext_targets;
106         int res;
107
108         if (unlikely(!oport))
109                 return -EINVAL;
110
111         /* Create multicast message */
112         hdr = &oport->phdr;
113         msg_set_type(hdr, TIPC_MCAST_MSG);
114         msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
115         msg_set_destport(hdr, 0);
116         msg_set_destnode(hdr, 0);
117         msg_set_nametype(hdr, seq->type);
118         msg_set_namelower(hdr, seq->lower);
119         msg_set_nameupper(hdr, seq->upper);
120         msg_set_hdr_sz(hdr, MCAST_H_SIZE);
121         res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
122                         !oport->user_port, &buf);
123         if (unlikely(!buf))
124                 return res;
125
126         /* Figure out where to send multicast message */
127         ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
128                                                 TIPC_NODE_SCOPE, &dports);
129
130         /* Send message to destinations (duplicate it only if necessary) */
131         if (ext_targets) {
132                 if (dports.count != 0) {
133                         ibuf = skb_copy(buf, GFP_ATOMIC);
134                         if (ibuf == NULL) {
135                                 tipc_port_list_free(&dports);
136                                 kfree_skb(buf);
137                                 return -ENOMEM;
138                         }
139                 }
140                 res = tipc_bclink_send_msg(buf);
141                 if ((res < 0) && (dports.count != 0))
142                         kfree_skb(ibuf);
143         } else {
144                 ibuf = buf;
145         }
146
147         if (res >= 0) {
148                 if (ibuf)
149                         tipc_port_recv_mcast(ibuf, &dports);
150         } else {
151                 tipc_port_list_free(&dports);
152         }
153         return res;
154 }
155
156 /**
157  * tipc_port_recv_mcast - deliver multicast message to all destination ports
158  *
159  * If there is no port list, perform a lookup to create one
160  */
161 void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp)
162 {
163         struct tipc_msg *msg;
164         struct tipc_port_list dports = {0, NULL, };
165         struct tipc_port_list *item = dp;
166         int cnt = 0;
167
168         msg = buf_msg(buf);
169
170         /* Create destination port list, if one wasn't supplied */
171         if (dp == NULL) {
172                 tipc_nametbl_mc_translate(msg_nametype(msg),
173                                      msg_namelower(msg),
174                                      msg_nameupper(msg),
175                                      TIPC_CLUSTER_SCOPE,
176                                      &dports);
177                 item = dp = &dports;
178         }
179
180         /* Deliver a copy of message to each destination port */
181         if (dp->count != 0) {
182                 msg_set_destnode(msg, tipc_own_addr);
183                 if (dp->count == 1) {
184                         msg_set_destport(msg, dp->ports[0]);
185                         tipc_port_recv_msg(buf);
186                         tipc_port_list_free(dp);
187                         return;
188                 }
189                 for (; cnt < dp->count; cnt++) {
190                         int index = cnt % PLSIZE;
191                         struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
192
193                         if (b == NULL) {
194                                 pr_warn("Unable to deliver multicast message(s)\n");
195                                 goto exit;
196                         }
197                         if ((index == 0) && (cnt != 0))
198                                 item = item->next;
199                         msg_set_destport(buf_msg(b), item->ports[index]);
200                         tipc_port_recv_msg(b);
201                 }
202         }
203 exit:
204         kfree_skb(buf);
205         tipc_port_list_free(dp);
206 }
207
208 /**
209  * tipc_createport_raw - create a generic TIPC port
210  *
211  * Returns pointer to (locked) TIPC port, or NULL if unable to create it
212  */
213 struct tipc_port *tipc_createport_raw(void *usr_handle,
214                         u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
215                         void (*wakeup)(struct tipc_port *),
216                         const u32 importance)
217 {
218         struct tipc_port *p_ptr;
219         struct tipc_msg *msg;
220         u32 ref;
221
222         p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
223         if (!p_ptr) {
224                 pr_warn("Port creation failed, no memory\n");
225                 return NULL;
226         }
227         ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);
228         if (!ref) {
229                 pr_warn("Port creation failed, ref. table exhausted\n");
230                 kfree(p_ptr);
231                 return NULL;
232         }
233
234         p_ptr->usr_handle = usr_handle;
235         p_ptr->max_pkt = MAX_PKT_DEFAULT;
236         p_ptr->ref = ref;
237         INIT_LIST_HEAD(&p_ptr->wait_list);
238         INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
239         p_ptr->dispatcher = dispatcher;
240         p_ptr->wakeup = wakeup;
241         p_ptr->user_port = NULL;
242         k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
243         INIT_LIST_HEAD(&p_ptr->publications);
244         INIT_LIST_HEAD(&p_ptr->port_list);
245
246         /*
247          * Must hold port list lock while initializing message header template
248          * to ensure a change to node's own network address doesn't result
249          * in template containing out-dated network address information
250          */
251         spin_lock_bh(&tipc_port_list_lock);
252         msg = &p_ptr->phdr;
253         tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0);
254         msg_set_origport(msg, ref);
255         list_add_tail(&p_ptr->port_list, &ports);
256         spin_unlock_bh(&tipc_port_list_lock);
257         return p_ptr;
258 }
259
260 int tipc_deleteport(u32 ref)
261 {
262         struct tipc_port *p_ptr;
263         struct sk_buff *buf = NULL;
264
265         tipc_withdraw(ref, 0, NULL);
266         p_ptr = tipc_port_lock(ref);
267         if (!p_ptr)
268                 return -EINVAL;
269
270         tipc_ref_discard(ref);
271         tipc_port_unlock(p_ptr);
272
273         k_cancel_timer(&p_ptr->timer);
274         if (p_ptr->connected) {
275                 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
276                 tipc_nodesub_unsubscribe(&p_ptr->subscription);
277         }
278         kfree(p_ptr->user_port);
279
280         spin_lock_bh(&tipc_port_list_lock);
281         list_del(&p_ptr->port_list);
282         list_del(&p_ptr->wait_list);
283         spin_unlock_bh(&tipc_port_list_lock);
284         k_term_timer(&p_ptr->timer);
285         kfree(p_ptr);
286         tipc_net_route_msg(buf);
287         return 0;
288 }
289
290 static int port_unreliable(struct tipc_port *p_ptr)
291 {
292         return msg_src_droppable(&p_ptr->phdr);
293 }
294
295 int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
296 {
297         struct tipc_port *p_ptr;
298
299         p_ptr = tipc_port_lock(ref);
300         if (!p_ptr)
301                 return -EINVAL;
302         *isunreliable = port_unreliable(p_ptr);
303         tipc_port_unlock(p_ptr);
304         return 0;
305 }
306
307 int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
308 {
309         struct tipc_port *p_ptr;
310
311         p_ptr = tipc_port_lock(ref);
312         if (!p_ptr)
313                 return -EINVAL;
314         msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0));
315         tipc_port_unlock(p_ptr);
316         return 0;
317 }
318
319 static int port_unreturnable(struct tipc_port *p_ptr)
320 {
321         return msg_dest_droppable(&p_ptr->phdr);
322 }
323
324 int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
325 {
326         struct tipc_port *p_ptr;
327
328         p_ptr = tipc_port_lock(ref);
329         if (!p_ptr)
330                 return -EINVAL;
331         *isunrejectable = port_unreturnable(p_ptr);
332         tipc_port_unlock(p_ptr);
333         return 0;
334 }
335
336 int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
337 {
338         struct tipc_port *p_ptr;
339
340         p_ptr = tipc_port_lock(ref);
341         if (!p_ptr)
342                 return -EINVAL;
343         msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0));
344         tipc_port_unlock(p_ptr);
345         return 0;
346 }
347
348 /*
349  * port_build_proto_msg(): create connection protocol message for port
350  *
351  * On entry the port must be locked and connected.
352  */
353 static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
354                                             u32 type, u32 ack)
355 {
356         struct sk_buff *buf;
357         struct tipc_msg *msg;
358
359         buf = tipc_buf_acquire(INT_H_SIZE);
360         if (buf) {
361                 msg = buf_msg(buf);
362                 tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE,
363                               port_peernode(p_ptr));
364                 msg_set_destport(msg, port_peerport(p_ptr));
365                 msg_set_origport(msg, p_ptr->ref);
366                 msg_set_msgcnt(msg, ack);
367         }
368         return buf;
369 }
370
371 int tipc_reject_msg(struct sk_buff *buf, u32 err)
372 {
373         struct tipc_msg *msg = buf_msg(buf);
374         struct sk_buff *rbuf;
375         struct tipc_msg *rmsg;
376         int hdr_sz;
377         u32 imp;
378         u32 data_sz = msg_data_sz(msg);
379         u32 src_node;
380         u32 rmsg_sz;
381
382         /* discard rejected message if it shouldn't be returned to sender */
383         if (WARN(!msg_isdata(msg),
384                  "attempt to reject message with user=%u", msg_user(msg))) {
385                 dump_stack();
386                 goto exit;
387         }
388         if (msg_errcode(msg) || msg_dest_droppable(msg))
389                 goto exit;
390
391         /*
392          * construct returned message by copying rejected message header and
393          * data (or subset), then updating header fields that need adjusting
394          */
395         hdr_sz = msg_hdr_sz(msg);
396         rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
397
398         rbuf = tipc_buf_acquire(rmsg_sz);
399         if (rbuf == NULL)
400                 goto exit;
401
402         rmsg = buf_msg(rbuf);
403         skb_copy_to_linear_data(rbuf, msg, rmsg_sz);
404
405         if (msg_connected(rmsg)) {
406                 imp = msg_importance(rmsg);
407                 if (imp < TIPC_CRITICAL_IMPORTANCE)
408                         msg_set_importance(rmsg, ++imp);
409         }
410         msg_set_non_seq(rmsg, 0);
411         msg_set_size(rmsg, rmsg_sz);
412         msg_set_errcode(rmsg, err);
413         msg_set_prevnode(rmsg, tipc_own_addr);
414         msg_swap_words(rmsg, 4, 5);
415         if (!msg_short(rmsg))
416                 msg_swap_words(rmsg, 6, 7);
417
418         /* send self-abort message when rejecting on a connected port */
419         if (msg_connected(msg)) {
420                 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
421
422                 if (p_ptr) {
423                         struct sk_buff *abuf = NULL;
424
425                         if (p_ptr->connected)
426                                 abuf = port_build_self_abort_msg(p_ptr, err);
427                         tipc_port_unlock(p_ptr);
428                         tipc_net_route_msg(abuf);
429                 }
430         }
431
432         /* send returned message & dispose of rejected message */
433         src_node = msg_prevnode(msg);
434         if (in_own_node(src_node))
435                 tipc_port_recv_msg(rbuf);
436         else
437                 tipc_link_send(rbuf, src_node, msg_link_selector(rmsg));
438 exit:
439         kfree_skb(buf);
440         return data_sz;
441 }
442
443 int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
444                               struct iovec const *msg_sect, u32 num_sect,
445                               unsigned int total_len, int err)
446 {
447         struct sk_buff *buf;
448         int res;
449
450         res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
451                         !p_ptr->user_port, &buf);
452         if (!buf)
453                 return res;
454
455         return tipc_reject_msg(buf, err);
456 }
457
458 static void port_timeout(unsigned long ref)
459 {
460         struct tipc_port *p_ptr = tipc_port_lock(ref);
461         struct sk_buff *buf = NULL;
462
463         if (!p_ptr)
464                 return;
465
466         if (!p_ptr->connected) {
467                 tipc_port_unlock(p_ptr);
468                 return;
469         }
470
471         /* Last probe answered ? */
472         if (p_ptr->probing_state == PROBING) {
473                 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
474         } else {
475                 buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0);
476                 p_ptr->probing_state = PROBING;
477                 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
478         }
479         tipc_port_unlock(p_ptr);
480         tipc_net_route_msg(buf);
481 }
482
483
484 static void port_handle_node_down(unsigned long ref)
485 {
486         struct tipc_port *p_ptr = tipc_port_lock(ref);
487         struct sk_buff *buf = NULL;
488
489         if (!p_ptr)
490                 return;
491         buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
492         tipc_port_unlock(p_ptr);
493         tipc_net_route_msg(buf);
494 }
495
496
497 static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err)
498 {
499         struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err);
500
501         if (buf) {
502                 struct tipc_msg *msg = buf_msg(buf);
503                 msg_swap_words(msg, 4, 5);
504                 msg_swap_words(msg, 6, 7);
505         }
506         return buf;
507 }
508
509
510 static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err)
511 {
512         struct sk_buff *buf;
513         struct tipc_msg *msg;
514         u32 imp;
515
516         if (!p_ptr->connected)
517                 return NULL;
518
519         buf = tipc_buf_acquire(BASIC_H_SIZE);
520         if (buf) {
521                 msg = buf_msg(buf);
522                 memcpy(msg, &p_ptr->phdr, BASIC_H_SIZE);
523                 msg_set_hdr_sz(msg, BASIC_H_SIZE);
524                 msg_set_size(msg, BASIC_H_SIZE);
525                 imp = msg_importance(msg);
526                 if (imp < TIPC_CRITICAL_IMPORTANCE)
527                         msg_set_importance(msg, ++imp);
528                 msg_set_errcode(msg, err);
529         }
530         return buf;
531 }
532
533 void tipc_port_recv_proto_msg(struct sk_buff *buf)
534 {
535         struct tipc_msg *msg = buf_msg(buf);
536         struct tipc_port *p_ptr;
537         struct sk_buff *r_buf = NULL;
538         u32 destport = msg_destport(msg);
539         int wakeable;
540
541         /* Validate connection */
542         p_ptr = tipc_port_lock(destport);
543         if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) {
544                 r_buf = tipc_buf_acquire(BASIC_H_SIZE);
545                 if (r_buf) {
546                         msg = buf_msg(r_buf);
547                         tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG,
548                                       BASIC_H_SIZE, msg_orignode(msg));
549                         msg_set_errcode(msg, TIPC_ERR_NO_PORT);
550                         msg_set_origport(msg, destport);
551                         msg_set_destport(msg, msg_origport(msg));
552                 }
553                 if (p_ptr)
554                         tipc_port_unlock(p_ptr);
555                 goto exit;
556         }
557
558         /* Process protocol message sent by peer */
559         switch (msg_type(msg)) {
560         case CONN_ACK:
561                 wakeable = tipc_port_congested(p_ptr) && p_ptr->congested &&
562                         p_ptr->wakeup;
563                 p_ptr->acked += msg_msgcnt(msg);
564                 if (!tipc_port_congested(p_ptr)) {
565                         p_ptr->congested = 0;
566                         if (wakeable)
567                                 p_ptr->wakeup(p_ptr);
568                 }
569                 break;
570         case CONN_PROBE:
571                 r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0);
572                 break;
573         default:
574                 /* CONN_PROBE_REPLY or unrecognized - no action required */
575                 break;
576         }
577         p_ptr->probing_state = CONFIRMED;
578         tipc_port_unlock(p_ptr);
579 exit:
580         tipc_net_route_msg(r_buf);
581         kfree_skb(buf);
582 }
583
584 static void port_print(struct tipc_port *p_ptr, struct print_buf *buf, int full_id)
585 {
586         struct publication *publ;
587
588         if (full_id)
589                 tipc_printf(buf, "<%u.%u.%u:%u>:",
590                             tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
591                             tipc_node(tipc_own_addr), p_ptr->ref);
592         else
593                 tipc_printf(buf, "%-10u:", p_ptr->ref);
594
595         if (p_ptr->connected) {
596                 u32 dport = port_peerport(p_ptr);
597                 u32 destnode = port_peernode(p_ptr);
598
599                 tipc_printf(buf, " connected to <%u.%u.%u:%u>",
600                             tipc_zone(destnode), tipc_cluster(destnode),
601                             tipc_node(destnode), dport);
602                 if (p_ptr->conn_type != 0)
603                         tipc_printf(buf, " via {%u,%u}",
604                                     p_ptr->conn_type,
605                                     p_ptr->conn_instance);
606         } else if (p_ptr->published) {
607                 tipc_printf(buf, " bound to");
608                 list_for_each_entry(publ, &p_ptr->publications, pport_list) {
609                         if (publ->lower == publ->upper)
610                                 tipc_printf(buf, " {%u,%u}", publ->type,
611                                             publ->lower);
612                         else
613                                 tipc_printf(buf, " {%u,%u,%u}", publ->type,
614                                             publ->lower, publ->upper);
615                 }
616         }
617         tipc_printf(buf, "\n");
618 }
619
620 #define MAX_PORT_QUERY 32768
621
622 struct sk_buff *tipc_port_get_ports(void)
623 {
624         struct sk_buff *buf;
625         struct tlv_desc *rep_tlv;
626         struct print_buf pb;
627         struct tipc_port *p_ptr;
628         int str_len;
629
630         buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
631         if (!buf)
632                 return NULL;
633         rep_tlv = (struct tlv_desc *)buf->data;
634
635         tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
636         spin_lock_bh(&tipc_port_list_lock);
637         list_for_each_entry(p_ptr, &ports, port_list) {
638                 spin_lock_bh(p_ptr->lock);
639                 port_print(p_ptr, &pb, 0);
640                 spin_unlock_bh(p_ptr->lock);
641         }
642         spin_unlock_bh(&tipc_port_list_lock);
643         str_len = tipc_printbuf_validate(&pb);
644
645         skb_put(buf, TLV_SPACE(str_len));
646         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
647
648         return buf;
649 }
650
651 void tipc_port_reinit(void)
652 {
653         struct tipc_port *p_ptr;
654         struct tipc_msg *msg;
655
656         spin_lock_bh(&tipc_port_list_lock);
657         list_for_each_entry(p_ptr, &ports, port_list) {
658                 msg = &p_ptr->phdr;
659                 msg_set_prevnode(msg, tipc_own_addr);
660                 msg_set_orignode(msg, tipc_own_addr);
661         }
662         spin_unlock_bh(&tipc_port_list_lock);
663 }
664
665
666 /*
667  *  port_dispatcher_sigh(): Signal handler for messages destinated
668  *                          to the tipc_port interface.
669  */
670 static void port_dispatcher_sigh(void *dummy)
671 {
672         struct sk_buff *buf;
673
674         spin_lock_bh(&queue_lock);
675         buf = msg_queue_head;
676         msg_queue_head = NULL;
677         spin_unlock_bh(&queue_lock);
678
679         while (buf) {
680                 struct tipc_port *p_ptr;
681                 struct user_port *up_ptr;
682                 struct tipc_portid orig;
683                 struct tipc_name_seq dseq;
684                 void *usr_handle;
685                 int connected;
686                 int peer_invalid;
687                 int published;
688                 u32 message_type;
689
690                 struct sk_buff *next = buf->next;
691                 struct tipc_msg *msg = buf_msg(buf);
692                 u32 dref = msg_destport(msg);
693
694                 message_type = msg_type(msg);
695                 if (message_type > TIPC_DIRECT_MSG)
696                         goto reject;    /* Unsupported message type */
697
698                 p_ptr = tipc_port_lock(dref);
699                 if (!p_ptr)
700                         goto reject;    /* Port deleted while msg in queue */
701
702                 orig.ref = msg_origport(msg);
703                 orig.node = msg_orignode(msg);
704                 up_ptr = p_ptr->user_port;
705                 usr_handle = up_ptr->usr_handle;
706                 connected = p_ptr->connected;
707                 peer_invalid = connected && !tipc_port_peer_msg(p_ptr, msg);
708                 published = p_ptr->published;
709
710                 if (unlikely(msg_errcode(msg)))
711                         goto err;
712
713                 switch (message_type) {
714
715                 case TIPC_CONN_MSG:{
716                                 tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
717                                 u32 dsz;
718
719                                 tipc_port_unlock(p_ptr);
720                                 if (unlikely(!cb))
721                                         goto reject;
722                                 if (unlikely(!connected)) {
723                                         if (tipc_connect2port(dref, &orig))
724                                                 goto reject;
725                                 } else if (peer_invalid)
726                                         goto reject;
727                                 dsz = msg_data_sz(msg);
728                                 if (unlikely(dsz &&
729                                              (++p_ptr->conn_unacked >=
730                                               TIPC_FLOW_CONTROL_WIN)))
731                                         tipc_acknowledge(dref,
732                                                          p_ptr->conn_unacked);
733                                 skb_pull(buf, msg_hdr_sz(msg));
734                                 cb(usr_handle, dref, &buf, msg_data(msg), dsz);
735                                 break;
736                         }
737                 case TIPC_DIRECT_MSG:{
738                                 tipc_msg_event cb = up_ptr->msg_cb;
739
740                                 tipc_port_unlock(p_ptr);
741                                 if (unlikely(!cb || connected))
742                                         goto reject;
743                                 skb_pull(buf, msg_hdr_sz(msg));
744                                 cb(usr_handle, dref, &buf, msg_data(msg),
745                                    msg_data_sz(msg), msg_importance(msg),
746                                    &orig);
747                                 break;
748                         }
749                 case TIPC_MCAST_MSG:
750                 case TIPC_NAMED_MSG:{
751                                 tipc_named_msg_event cb = up_ptr->named_msg_cb;
752
753                                 tipc_port_unlock(p_ptr);
754                                 if (unlikely(!cb || connected || !published))
755                                         goto reject;
756                                 dseq.type =  msg_nametype(msg);
757                                 dseq.lower = msg_nameinst(msg);
758                                 dseq.upper = (message_type == TIPC_NAMED_MSG)
759                                         ? dseq.lower : msg_nameupper(msg);
760                                 skb_pull(buf, msg_hdr_sz(msg));
761                                 cb(usr_handle, dref, &buf, msg_data(msg),
762                                    msg_data_sz(msg), msg_importance(msg),
763                                    &orig, &dseq);
764                                 break;
765                         }
766                 }
767                 if (buf)
768                         kfree_skb(buf);
769                 buf = next;
770                 continue;
771 err:
772                 switch (message_type) {
773
774                 case TIPC_CONN_MSG:{
775                                 tipc_conn_shutdown_event cb =
776                                         up_ptr->conn_err_cb;
777
778                                 tipc_port_unlock(p_ptr);
779                                 if (!cb || !connected || peer_invalid)
780                                         break;
781                                 tipc_disconnect(dref);
782                                 skb_pull(buf, msg_hdr_sz(msg));
783                                 cb(usr_handle, dref, &buf, msg_data(msg),
784                                    msg_data_sz(msg), msg_errcode(msg));
785                                 break;
786                         }
787                 case TIPC_DIRECT_MSG:{
788                                 tipc_msg_err_event cb = up_ptr->err_cb;
789
790                                 tipc_port_unlock(p_ptr);
791                                 if (!cb || connected)
792                                         break;
793                                 skb_pull(buf, msg_hdr_sz(msg));
794                                 cb(usr_handle, dref, &buf, msg_data(msg),
795                                    msg_data_sz(msg), msg_errcode(msg), &orig);
796                                 break;
797                         }
798                 case TIPC_MCAST_MSG:
799                 case TIPC_NAMED_MSG:{
800                                 tipc_named_msg_err_event cb =
801                                         up_ptr->named_err_cb;
802
803                                 tipc_port_unlock(p_ptr);
804                                 if (!cb || connected)
805                                         break;
806                                 dseq.type =  msg_nametype(msg);
807                                 dseq.lower = msg_nameinst(msg);
808                                 dseq.upper = (message_type == TIPC_NAMED_MSG)
809                                         ? dseq.lower : msg_nameupper(msg);
810                                 skb_pull(buf, msg_hdr_sz(msg));
811                                 cb(usr_handle, dref, &buf, msg_data(msg),
812                                    msg_data_sz(msg), msg_errcode(msg), &dseq);
813                                 break;
814                         }
815                 }
816                 if (buf)
817                         kfree_skb(buf);
818                 buf = next;
819                 continue;
820 reject:
821                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
822                 buf = next;
823         }
824 }
825
826 /*
827  *  port_dispatcher(): Dispatcher for messages destinated
828  *  to the tipc_port interface. Called with port locked.
829  */
830 static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
831 {
832         buf->next = NULL;
833         spin_lock_bh(&queue_lock);
834         if (msg_queue_head) {
835                 msg_queue_tail->next = buf;
836                 msg_queue_tail = buf;
837         } else {
838                 msg_queue_tail = msg_queue_head = buf;
839                 tipc_k_signal((Handler)port_dispatcher_sigh, 0);
840         }
841         spin_unlock_bh(&queue_lock);
842         return 0;
843 }
844
845 /*
846  * Wake up port after congestion: Called with port locked
847  */
848 static void port_wakeup_sh(unsigned long ref)
849 {
850         struct tipc_port *p_ptr;
851         struct user_port *up_ptr;
852         tipc_continue_event cb = NULL;
853         void *uh = NULL;
854
855         p_ptr = tipc_port_lock(ref);
856         if (p_ptr) {
857                 up_ptr = p_ptr->user_port;
858                 if (up_ptr) {
859                         cb = up_ptr->continue_event_cb;
860                         uh = up_ptr->usr_handle;
861                 }
862                 tipc_port_unlock(p_ptr);
863         }
864         if (cb)
865                 cb(uh, ref);
866 }
867
868
869 static void port_wakeup(struct tipc_port *p_ptr)
870 {
871         tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref);
872 }
873
874 void tipc_acknowledge(u32 ref, u32 ack)
875 {
876         struct tipc_port *p_ptr;
877         struct sk_buff *buf = NULL;
878
879         p_ptr = tipc_port_lock(ref);
880         if (!p_ptr)
881                 return;
882         if (p_ptr->connected) {
883                 p_ptr->conn_unacked -= ack;
884                 buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
885         }
886         tipc_port_unlock(p_ptr);
887         tipc_net_route_msg(buf);
888 }
889
890 /*
891  * tipc_createport(): user level call.
892  */
893 int tipc_createport(void *usr_handle,
894                     unsigned int importance,
895                     tipc_msg_err_event error_cb,
896                     tipc_named_msg_err_event named_error_cb,
897                     tipc_conn_shutdown_event conn_error_cb,
898                     tipc_msg_event msg_cb,
899                     tipc_named_msg_event named_msg_cb,
900                     tipc_conn_msg_event conn_msg_cb,
901                     tipc_continue_event continue_event_cb, /* May be zero */
902                     u32 *portref)
903 {
904         struct user_port *up_ptr;
905         struct tipc_port *p_ptr;
906
907         up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
908         if (!up_ptr) {
909                 pr_warn("Port creation failed, no memory\n");
910                 return -ENOMEM;
911         }
912         p_ptr = tipc_createport_raw(NULL, port_dispatcher, port_wakeup,
913                                     importance);
914         if (!p_ptr) {
915                 kfree(up_ptr);
916                 return -ENOMEM;
917         }
918
919         p_ptr->user_port = up_ptr;
920         up_ptr->usr_handle = usr_handle;
921         up_ptr->ref = p_ptr->ref;
922         up_ptr->err_cb = error_cb;
923         up_ptr->named_err_cb = named_error_cb;
924         up_ptr->conn_err_cb = conn_error_cb;
925         up_ptr->msg_cb = msg_cb;
926         up_ptr->named_msg_cb = named_msg_cb;
927         up_ptr->conn_msg_cb = conn_msg_cb;
928         up_ptr->continue_event_cb = continue_event_cb;
929         *portref = p_ptr->ref;
930         tipc_port_unlock(p_ptr);
931         return 0;
932 }
933
934 int tipc_portimportance(u32 ref, unsigned int *importance)
935 {
936         struct tipc_port *p_ptr;
937
938         p_ptr = tipc_port_lock(ref);
939         if (!p_ptr)
940                 return -EINVAL;
941         *importance = (unsigned int)msg_importance(&p_ptr->phdr);
942         tipc_port_unlock(p_ptr);
943         return 0;
944 }
945
946 int tipc_set_portimportance(u32 ref, unsigned int imp)
947 {
948         struct tipc_port *p_ptr;
949
950         if (imp > TIPC_CRITICAL_IMPORTANCE)
951                 return -EINVAL;
952
953         p_ptr = tipc_port_lock(ref);
954         if (!p_ptr)
955                 return -EINVAL;
956         msg_set_importance(&p_ptr->phdr, (u32)imp);
957         tipc_port_unlock(p_ptr);
958         return 0;
959 }
960
961
962 int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
963 {
964         struct tipc_port *p_ptr;
965         struct publication *publ;
966         u32 key;
967         int res = -EINVAL;
968
969         p_ptr = tipc_port_lock(ref);
970         if (!p_ptr)
971                 return -EINVAL;
972
973         if (p_ptr->connected)
974                 goto exit;
975         key = ref + p_ptr->pub_count + 1;
976         if (key == ref) {
977                 res = -EADDRINUSE;
978                 goto exit;
979         }
980         publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
981                                     scope, p_ptr->ref, key);
982         if (publ) {
983                 list_add(&publ->pport_list, &p_ptr->publications);
984                 p_ptr->pub_count++;
985                 p_ptr->published = 1;
986                 res = 0;
987         }
988 exit:
989         tipc_port_unlock(p_ptr);
990         return res;
991 }
992
993 int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
994 {
995         struct tipc_port *p_ptr;
996         struct publication *publ;
997         struct publication *tpubl;
998         int res = -EINVAL;
999
1000         p_ptr = tipc_port_lock(ref);
1001         if (!p_ptr)
1002                 return -EINVAL;
1003         if (!seq) {
1004                 list_for_each_entry_safe(publ, tpubl,
1005                                          &p_ptr->publications, pport_list) {
1006                         tipc_nametbl_withdraw(publ->type, publ->lower,
1007                                               publ->ref, publ->key);
1008                 }
1009                 res = 0;
1010         } else {
1011                 list_for_each_entry_safe(publ, tpubl,
1012                                          &p_ptr->publications, pport_list) {
1013                         if (publ->scope != scope)
1014                                 continue;
1015                         if (publ->type != seq->type)
1016                                 continue;
1017                         if (publ->lower != seq->lower)
1018                                 continue;
1019                         if (publ->upper != seq->upper)
1020                                 break;
1021                         tipc_nametbl_withdraw(publ->type, publ->lower,
1022                                               publ->ref, publ->key);
1023                         res = 0;
1024                         break;
1025                 }
1026         }
1027         if (list_empty(&p_ptr->publications))
1028                 p_ptr->published = 0;
1029         tipc_port_unlock(p_ptr);
1030         return res;
1031 }
1032
1033 int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1034 {
1035         struct tipc_port *p_ptr;
1036         struct tipc_msg *msg;
1037         int res = -EINVAL;
1038
1039         p_ptr = tipc_port_lock(ref);
1040         if (!p_ptr)
1041                 return -EINVAL;
1042         if (p_ptr->published || p_ptr->connected)
1043                 goto exit;
1044         if (!peer->ref)
1045                 goto exit;
1046
1047         msg = &p_ptr->phdr;
1048         msg_set_destnode(msg, peer->node);
1049         msg_set_destport(msg, peer->ref);
1050         msg_set_type(msg, TIPC_CONN_MSG);
1051         msg_set_lookup_scope(msg, 0);
1052         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1053
1054         p_ptr->probing_interval = PROBING_INTERVAL;
1055         p_ptr->probing_state = CONFIRMED;
1056         p_ptr->connected = 1;
1057         k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1058
1059         tipc_nodesub_subscribe(&p_ptr->subscription, peer->node,
1060                           (void *)(unsigned long)ref,
1061                           (net_ev_handler)port_handle_node_down);
1062         res = 0;
1063 exit:
1064         tipc_port_unlock(p_ptr);
1065         p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
1066         return res;
1067 }
1068
1069 /**
1070  * tipc_disconnect_port - disconnect port from peer
1071  *
1072  * Port must be locked.
1073  */
1074 int tipc_disconnect_port(struct tipc_port *tp_ptr)
1075 {
1076         int res;
1077
1078         if (tp_ptr->connected) {
1079                 tp_ptr->connected = 0;
1080                 /* let timer expire on it's own to avoid deadlock! */
1081                 tipc_nodesub_unsubscribe(&tp_ptr->subscription);
1082                 res = 0;
1083         } else {
1084                 res = -ENOTCONN;
1085         }
1086         return res;
1087 }
1088
1089 /*
1090  * tipc_disconnect(): Disconnect port form peer.
1091  *                    This is a node local operation.
1092  */
1093 int tipc_disconnect(u32 ref)
1094 {
1095         struct tipc_port *p_ptr;
1096         int res;
1097
1098         p_ptr = tipc_port_lock(ref);
1099         if (!p_ptr)
1100                 return -EINVAL;
1101         res = tipc_disconnect_port(p_ptr);
1102         tipc_port_unlock(p_ptr);
1103         return res;
1104 }
1105
1106 /*
1107  * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1108  */
1109 int tipc_shutdown(u32 ref)
1110 {
1111         struct tipc_port *p_ptr;
1112         struct sk_buff *buf = NULL;
1113
1114         p_ptr = tipc_port_lock(ref);
1115         if (!p_ptr)
1116                 return -EINVAL;
1117
1118         buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
1119         tipc_port_unlock(p_ptr);
1120         tipc_net_route_msg(buf);
1121         return tipc_disconnect(ref);
1122 }
1123
1124 /**
1125  * tipc_port_recv_msg - receive message from lower layer and deliver to port user
1126  */
1127 int tipc_port_recv_msg(struct sk_buff *buf)
1128 {
1129         struct tipc_port *p_ptr;
1130         struct tipc_msg *msg = buf_msg(buf);
1131         u32 destport = msg_destport(msg);
1132         u32 dsz = msg_data_sz(msg);
1133         u32 err;
1134
1135         /* forward unresolved named message */
1136         if (unlikely(!destport)) {
1137                 tipc_net_route_msg(buf);
1138                 return dsz;
1139         }
1140
1141         /* validate destination & pass to port, otherwise reject message */
1142         p_ptr = tipc_port_lock(destport);
1143         if (likely(p_ptr)) {
1144                 err = p_ptr->dispatcher(p_ptr, buf);
1145                 tipc_port_unlock(p_ptr);
1146                 if (likely(!err))
1147                         return dsz;
1148         } else {
1149                 err = TIPC_ERR_NO_PORT;
1150         }
1151
1152         return tipc_reject_msg(buf, err);
1153 }
1154
1155 /*
1156  *  tipc_port_recv_sections(): Concatenate and deliver sectioned
1157  *                        message for this node.
1158  */
1159 static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect,
1160                                    struct iovec const *msg_sect,
1161                                    unsigned int total_len)
1162 {
1163         struct sk_buff *buf;
1164         int res;
1165
1166         res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len,
1167                         MAX_MSG_SIZE, !sender->user_port, &buf);
1168         if (likely(buf))
1169                 tipc_port_recv_msg(buf);
1170         return res;
1171 }
1172
1173 /**
1174  * tipc_send - send message sections on connection
1175  */
1176 int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect,
1177               unsigned int total_len)
1178 {
1179         struct tipc_port *p_ptr;
1180         u32 destnode;
1181         int res;
1182
1183         p_ptr = tipc_port_deref(ref);
1184         if (!p_ptr || !p_ptr->connected)
1185                 return -EINVAL;
1186
1187         p_ptr->congested = 1;
1188         if (!tipc_port_congested(p_ptr)) {
1189                 destnode = port_peernode(p_ptr);
1190                 if (likely(!in_own_node(destnode)))
1191                         res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1192                                                            total_len, destnode);
1193                 else
1194                         res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1195                                                       total_len);
1196
1197                 if (likely(res != -ELINKCONG)) {
1198                         p_ptr->congested = 0;
1199                         if (res > 0)
1200                                 p_ptr->sent++;
1201                         return res;
1202                 }
1203         }
1204         if (port_unreliable(p_ptr)) {
1205                 p_ptr->congested = 0;
1206                 return total_len;
1207         }
1208         return -ELINKCONG;
1209 }
1210
1211 /**
1212  * tipc_send2name - send message sections to port name
1213  */
1214 int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
1215                    unsigned int num_sect, struct iovec const *msg_sect,
1216                    unsigned int total_len)
1217 {
1218         struct tipc_port *p_ptr;
1219         struct tipc_msg *msg;
1220         u32 destnode = domain;
1221         u32 destport;
1222         int res;
1223
1224         p_ptr = tipc_port_deref(ref);
1225         if (!p_ptr || p_ptr->connected)
1226                 return -EINVAL;
1227
1228         msg = &p_ptr->phdr;
1229         msg_set_type(msg, TIPC_NAMED_MSG);
1230         msg_set_hdr_sz(msg, NAMED_H_SIZE);
1231         msg_set_nametype(msg, name->type);
1232         msg_set_nameinst(msg, name->instance);
1233         msg_set_lookup_scope(msg, tipc_addr_scope(domain));
1234         destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1235         msg_set_destnode(msg, destnode);
1236         msg_set_destport(msg, destport);
1237
1238         if (likely(destport || destnode)) {
1239                 if (likely(in_own_node(destnode)))
1240                         res = tipc_port_recv_sections(p_ptr, num_sect,
1241                                                       msg_sect, total_len);
1242                 else if (tipc_own_addr)
1243                         res = tipc_link_send_sections_fast(p_ptr, msg_sect,
1244                                                            num_sect, total_len,
1245                                                            destnode);
1246                 else
1247                         res = tipc_port_reject_sections(p_ptr, msg, msg_sect,
1248                                                         num_sect, total_len,
1249                                                         TIPC_ERR_NO_NODE);
1250                 if (likely(res != -ELINKCONG)) {
1251                         if (res > 0)
1252                                 p_ptr->sent++;
1253                         return res;
1254                 }
1255                 if (port_unreliable(p_ptr)) {
1256                         return total_len;
1257                 }
1258                 return -ELINKCONG;
1259         }
1260         return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1261                                          total_len, TIPC_ERR_NO_NAME);
1262 }
1263
1264 /**
1265  * tipc_send2port - send message sections to port identity
1266  */
1267 int tipc_send2port(u32 ref, struct tipc_portid const *dest,
1268                    unsigned int num_sect, struct iovec const *msg_sect,
1269                    unsigned int total_len)
1270 {
1271         struct tipc_port *p_ptr;
1272         struct tipc_msg *msg;
1273         int res;
1274
1275         p_ptr = tipc_port_deref(ref);
1276         if (!p_ptr || p_ptr->connected)
1277                 return -EINVAL;
1278
1279         msg = &p_ptr->phdr;
1280         msg_set_type(msg, TIPC_DIRECT_MSG);
1281         msg_set_lookup_scope(msg, 0);
1282         msg_set_destnode(msg, dest->node);
1283         msg_set_destport(msg, dest->ref);
1284         msg_set_hdr_sz(msg, BASIC_H_SIZE);
1285
1286         if (in_own_node(dest->node))
1287                 res =  tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1288                                                total_len);
1289         else if (tipc_own_addr)
1290                 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1291                                                    total_len, dest->node);
1292         else
1293                 res = tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1294                                                 total_len, TIPC_ERR_NO_NODE);
1295         if (likely(res != -ELINKCONG)) {
1296                 if (res > 0)
1297                         p_ptr->sent++;
1298                 return res;
1299         }
1300         if (port_unreliable(p_ptr)) {
1301                 return total_len;
1302         }
1303         return -ELINKCONG;
1304 }
1305
1306 /**
1307  * tipc_send_buf2port - send message buffer to port identity
1308  */
1309 int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
1310                struct sk_buff *buf, unsigned int dsz)
1311 {
1312         struct tipc_port *p_ptr;
1313         struct tipc_msg *msg;
1314         int res;
1315
1316         p_ptr = (struct tipc_port *)tipc_ref_deref(ref);
1317         if (!p_ptr || p_ptr->connected)
1318                 return -EINVAL;
1319
1320         msg = &p_ptr->phdr;
1321         msg_set_type(msg, TIPC_DIRECT_MSG);
1322         msg_set_destnode(msg, dest->node);
1323         msg_set_destport(msg, dest->ref);
1324         msg_set_hdr_sz(msg, BASIC_H_SIZE);
1325         msg_set_size(msg, BASIC_H_SIZE + dsz);
1326         if (skb_cow(buf, BASIC_H_SIZE))
1327                 return -ENOMEM;
1328
1329         skb_push(buf, BASIC_H_SIZE);
1330         skb_copy_to_linear_data(buf, msg, BASIC_H_SIZE);
1331
1332         if (in_own_node(dest->node))
1333                 res = tipc_port_recv_msg(buf);
1334         else
1335                 res = tipc_send_buf_fast(buf, dest->node);
1336         if (likely(res != -ELINKCONG)) {
1337                 if (res > 0)
1338                         p_ptr->sent++;
1339                 return res;
1340         }
1341         if (port_unreliable(p_ptr))
1342                 return dsz;
1343         return -ELINKCONG;
1344 }