tipc: eliminate port_connect()/port_disconnect() functions
[cascardo/linux.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "port.h"
39 #include "name_table.h"
40 #include "node.h"
41 #include "link.h"
42 #include <linux/export.h>
43
44 #define SS_LISTENING    -1      /* socket is listening */
45 #define SS_READY        -2      /* socket is connectionless */
46
47 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
48 #define CONN_PROBING_INTERVAL 3600000   /* [ms] => 1 h */
49 #define TIPC_FWD_MSG            1
50
51 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
52 static void tipc_data_ready(struct sock *sk);
53 static void tipc_write_space(struct sock *sk);
54 static int tipc_release(struct socket *sock);
55 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
56 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
57 static void tipc_sk_timeout(unsigned long ref);
58
59 static const struct proto_ops packet_ops;
60 static const struct proto_ops stream_ops;
61 static const struct proto_ops msg_ops;
62
63 static struct proto tipc_proto;
64 static struct proto tipc_proto_kern;
65
66 /*
67  * Revised TIPC socket locking policy:
68  *
69  * Most socket operations take the standard socket lock when they start
70  * and hold it until they finish (or until they need to sleep).  Acquiring
71  * this lock grants the owner exclusive access to the fields of the socket
72  * data structures, with the exception of the backlog queue.  A few socket
73  * operations can be done without taking the socket lock because they only
74  * read socket information that never changes during the life of the socket.
75  *
76  * Socket operations may acquire the lock for the associated TIPC port if they
77  * need to perform an operation on the port.  If any routine needs to acquire
78  * both the socket lock and the port lock it must take the socket lock first
79  * to avoid the risk of deadlock.
80  *
81  * The dispatcher handling incoming messages cannot grab the socket lock in
82  * the standard fashion, since invoked it runs at the BH level and cannot block.
83  * Instead, it checks to see if the socket lock is currently owned by someone,
84  * and either handles the message itself or adds it to the socket's backlog
85  * queue; in the latter case the queued message is processed once the process
86  * owning the socket lock releases it.
87  *
88  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
89  * the problem of a blocked socket operation preventing any other operations
90  * from occurring.  However, applications must be careful if they have
91  * multiple threads trying to send (or receive) on the same socket, as these
92  * operations might interfere with each other.  For example, doing a connect
93  * and a receive at the same time might allow the receive to consume the
94  * ACK message meant for the connect.  While additional work could be done
95  * to try and overcome this, it doesn't seem to be worthwhile at the present.
96  *
97  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
98  * that another operation that must be performed in a non-blocking manner is
99  * not delayed for very long because the lock has already been taken.
100  *
101  * NOTE: This code assumes that certain fields of a port/socket pair are
102  * constant over its lifetime; such fields can be examined without taking
103  * the socket lock and/or port lock, and do not need to be re-read even
104  * after resuming processing after waiting.  These fields include:
105  *   - socket type
106  *   - pointer to socket sk structure (aka tipc_sock structure)
107  *   - pointer to port structure
108  *   - port reference
109  */
110
111 #include "socket.h"
112
113 /**
114  * advance_rx_queue - discard first buffer in socket receive queue
115  *
116  * Caller must hold socket lock
117  */
118 static void advance_rx_queue(struct sock *sk)
119 {
120         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
121 }
122
123 /**
124  * reject_rx_queue - reject all buffers in socket receive queue
125  *
126  * Caller must hold socket lock
127  */
128 static void reject_rx_queue(struct sock *sk)
129 {
130         struct sk_buff *buf;
131         u32 dnode;
132
133         while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
134                 if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
135                         tipc_link_xmit(buf, dnode, 0);
136         }
137 }
138
139 /**
140  * tipc_sk_create - create a TIPC socket
141  * @net: network namespace (must be default network)
142  * @sock: pre-allocated socket structure
143  * @protocol: protocol indicator (must be 0)
144  * @kern: caused by kernel or by userspace?
145  *
146  * This routine creates additional data structures used by the TIPC socket,
147  * initializes them, and links them together.
148  *
149  * Returns 0 on success, errno otherwise
150  */
151 static int tipc_sk_create(struct net *net, struct socket *sock,
152                           int protocol, int kern)
153 {
154         const struct proto_ops *ops;
155         socket_state state;
156         struct sock *sk;
157         struct tipc_sock *tsk;
158         struct tipc_port *port;
159         u32 ref;
160
161         /* Validate arguments */
162         if (unlikely(protocol != 0))
163                 return -EPROTONOSUPPORT;
164
165         switch (sock->type) {
166         case SOCK_STREAM:
167                 ops = &stream_ops;
168                 state = SS_UNCONNECTED;
169                 break;
170         case SOCK_SEQPACKET:
171                 ops = &packet_ops;
172                 state = SS_UNCONNECTED;
173                 break;
174         case SOCK_DGRAM:
175         case SOCK_RDM:
176                 ops = &msg_ops;
177                 state = SS_READY;
178                 break;
179         default:
180                 return -EPROTOTYPE;
181         }
182
183         /* Allocate socket's protocol area */
184         if (!kern)
185                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
186         else
187                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
188
189         if (sk == NULL)
190                 return -ENOMEM;
191
192         tsk = tipc_sk(sk);
193         port = &tsk->port;
194
195         ref = tipc_port_init(port, TIPC_LOW_IMPORTANCE);
196         if (!ref) {
197                 pr_warn("Socket registration failed, ref. table exhausted\n");
198                 sk_free(sk);
199                 return -ENOMEM;
200         }
201
202         /* Finish initializing socket data structures */
203         sock->ops = ops;
204         sock->state = state;
205
206         sock_init_data(sock, sk);
207         k_init_timer(&port->timer, (Handler)tipc_sk_timeout, ref);
208         sk->sk_backlog_rcv = tipc_backlog_rcv;
209         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
210         sk->sk_data_ready = tipc_data_ready;
211         sk->sk_write_space = tipc_write_space;
212         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
213         tsk->sent_unacked = 0;
214         atomic_set(&tsk->dupl_rcvcnt, 0);
215         tipc_port_unlock(port);
216
217         if (sock->state == SS_READY) {
218                 tipc_port_set_unreturnable(port, true);
219                 if (sock->type == SOCK_DGRAM)
220                         tipc_port_set_unreliable(port, true);
221         }
222         return 0;
223 }
224
225 /**
226  * tipc_sock_create_local - create TIPC socket from inside TIPC module
227  * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
228  *
229  * We cannot use sock_creat_kern here because it bumps module user count.
230  * Since socket owner and creator is the same module we must make sure
231  * that module count remains zero for module local sockets, otherwise
232  * we cannot do rmmod.
233  *
234  * Returns 0 on success, errno otherwise
235  */
236 int tipc_sock_create_local(int type, struct socket **res)
237 {
238         int rc;
239
240         rc = sock_create_lite(AF_TIPC, type, 0, res);
241         if (rc < 0) {
242                 pr_err("Failed to create kernel socket\n");
243                 return rc;
244         }
245         tipc_sk_create(&init_net, *res, 0, 1);
246
247         return 0;
248 }
249
250 /**
251  * tipc_sock_release_local - release socket created by tipc_sock_create_local
252  * @sock: the socket to be released.
253  *
254  * Module reference count is not incremented when such sockets are created,
255  * so we must keep it from being decremented when they are released.
256  */
257 void tipc_sock_release_local(struct socket *sock)
258 {
259         tipc_release(sock);
260         sock->ops = NULL;
261         sock_release(sock);
262 }
263
264 /**
265  * tipc_sock_accept_local - accept a connection on a socket created
266  * with tipc_sock_create_local. Use this function to avoid that
267  * module reference count is inadvertently incremented.
268  *
269  * @sock:    the accepting socket
270  * @newsock: reference to the new socket to be created
271  * @flags:   socket flags
272  */
273
274 int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
275                            int flags)
276 {
277         struct sock *sk = sock->sk;
278         int ret;
279
280         ret = sock_create_lite(sk->sk_family, sk->sk_type,
281                                sk->sk_protocol, newsock);
282         if (ret < 0)
283                 return ret;
284
285         ret = tipc_accept(sock, *newsock, flags);
286         if (ret < 0) {
287                 sock_release(*newsock);
288                 return ret;
289         }
290         (*newsock)->ops = sock->ops;
291         return ret;
292 }
293
294 /**
295  * tipc_release - destroy a TIPC socket
296  * @sock: socket to destroy
297  *
298  * This routine cleans up any messages that are still queued on the socket.
299  * For DGRAM and RDM socket types, all queued messages are rejected.
300  * For SEQPACKET and STREAM socket types, the first message is rejected
301  * and any others are discarded.  (If the first message on a STREAM socket
302  * is partially-read, it is discarded and the next one is rejected instead.)
303  *
304  * NOTE: Rejected messages are not necessarily returned to the sender!  They
305  * are returned or discarded according to the "destination droppable" setting
306  * specified for the message by the sender.
307  *
308  * Returns 0 on success, errno otherwise
309  */
310 static int tipc_release(struct socket *sock)
311 {
312         struct sock *sk = sock->sk;
313         struct tipc_sock *tsk;
314         struct tipc_port *port;
315         struct sk_buff *buf;
316         u32 dnode;
317
318         /*
319          * Exit if socket isn't fully initialized (occurs when a failed accept()
320          * releases a pre-allocated child socket that was never used)
321          */
322         if (sk == NULL)
323                 return 0;
324
325         tsk = tipc_sk(sk);
326         port = &tsk->port;
327         lock_sock(sk);
328
329         /*
330          * Reject all unreceived messages, except on an active connection
331          * (which disconnects locally & sends a 'FIN+' to peer)
332          */
333         while (sock->state != SS_DISCONNECTING) {
334                 buf = __skb_dequeue(&sk->sk_receive_queue);
335                 if (buf == NULL)
336                         break;
337                 if (TIPC_SKB_CB(buf)->handle != NULL)
338                         kfree_skb(buf);
339                 else {
340                         if ((sock->state == SS_CONNECTING) ||
341                             (sock->state == SS_CONNECTED)) {
342                                 sock->state = SS_DISCONNECTING;
343                                 port->connected = 0;
344                                 tipc_node_remove_conn(tipc_port_peernode(port),
345                                                       port->ref);
346                         }
347                         if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
348                                 tipc_link_xmit(buf, dnode, 0);
349                 }
350         }
351
352         /* Destroy TIPC port; also disconnects an active connection and
353          * sends a 'FIN-' to peer.
354          */
355         tipc_port_destroy(port);
356
357         /* Discard any remaining (connection-based) messages in receive queue */
358         __skb_queue_purge(&sk->sk_receive_queue);
359
360         /* Reject any messages that accumulated in backlog queue */
361         sock->state = SS_DISCONNECTING;
362         release_sock(sk);
363
364         sock_put(sk);
365         sock->sk = NULL;
366
367         return 0;
368 }
369
370 /**
371  * tipc_bind - associate or disassocate TIPC name(s) with a socket
372  * @sock: socket structure
373  * @uaddr: socket address describing name(s) and desired operation
374  * @uaddr_len: size of socket address data structure
375  *
376  * Name and name sequence binding is indicated using a positive scope value;
377  * a negative scope value unbinds the specified name.  Specifying no name
378  * (i.e. a socket address length of 0) unbinds all names from the socket.
379  *
380  * Returns 0 on success, errno otherwise
381  *
382  * NOTE: This routine doesn't need to take the socket lock since it doesn't
383  *       access any non-constant socket information.
384  */
385 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
386                      int uaddr_len)
387 {
388         struct sock *sk = sock->sk;
389         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
390         struct tipc_sock *tsk = tipc_sk(sk);
391         int res = -EINVAL;
392
393         lock_sock(sk);
394         if (unlikely(!uaddr_len)) {
395                 res = tipc_withdraw(&tsk->port, 0, NULL);
396                 goto exit;
397         }
398
399         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
400                 res = -EINVAL;
401                 goto exit;
402         }
403         if (addr->family != AF_TIPC) {
404                 res = -EAFNOSUPPORT;
405                 goto exit;
406         }
407
408         if (addr->addrtype == TIPC_ADDR_NAME)
409                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
410         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
411                 res = -EAFNOSUPPORT;
412                 goto exit;
413         }
414
415         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
416             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
417             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
418                 res = -EACCES;
419                 goto exit;
420         }
421
422         res = (addr->scope > 0) ?
423                 tipc_publish(&tsk->port, addr->scope, &addr->addr.nameseq) :
424                 tipc_withdraw(&tsk->port, -addr->scope, &addr->addr.nameseq);
425 exit:
426         release_sock(sk);
427         return res;
428 }
429
430 /**
431  * tipc_getname - get port ID of socket or peer socket
432  * @sock: socket structure
433  * @uaddr: area for returned socket address
434  * @uaddr_len: area for returned length of socket address
435  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
436  *
437  * Returns 0 on success, errno otherwise
438  *
439  * NOTE: This routine doesn't need to take the socket lock since it only
440  *       accesses socket information that is unchanging (or which changes in
441  *       a completely predictable manner).
442  */
443 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
444                         int *uaddr_len, int peer)
445 {
446         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
447         struct tipc_sock *tsk = tipc_sk(sock->sk);
448
449         memset(addr, 0, sizeof(*addr));
450         if (peer) {
451                 if ((sock->state != SS_CONNECTED) &&
452                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
453                         return -ENOTCONN;
454                 addr->addr.id.ref = tipc_port_peerport(&tsk->port);
455                 addr->addr.id.node = tipc_port_peernode(&tsk->port);
456         } else {
457                 addr->addr.id.ref = tsk->port.ref;
458                 addr->addr.id.node = tipc_own_addr;
459         }
460
461         *uaddr_len = sizeof(*addr);
462         addr->addrtype = TIPC_ADDR_ID;
463         addr->family = AF_TIPC;
464         addr->scope = 0;
465         addr->addr.name.domain = 0;
466
467         return 0;
468 }
469
470 /**
471  * tipc_poll - read and possibly block on pollmask
472  * @file: file structure associated with the socket
473  * @sock: socket for which to calculate the poll bits
474  * @wait: ???
475  *
476  * Returns pollmask value
477  *
478  * COMMENTARY:
479  * It appears that the usual socket locking mechanisms are not useful here
480  * since the pollmask info is potentially out-of-date the moment this routine
481  * exits.  TCP and other protocols seem to rely on higher level poll routines
482  * to handle any preventable race conditions, so TIPC will do the same ...
483  *
484  * TIPC sets the returned events as follows:
485  *
486  * socket state         flags set
487  * ------------         ---------
488  * unconnected          no read flags
489  *                      POLLOUT if port is not congested
490  *
491  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
492  *                      no write flags
493  *
494  * connected            POLLIN/POLLRDNORM if data in rx queue
495  *                      POLLOUT if port is not congested
496  *
497  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
498  *                      no write flags
499  *
500  * listening            POLLIN if SYN in rx queue
501  *                      no write flags
502  *
503  * ready                POLLIN/POLLRDNORM if data in rx queue
504  * [connectionless]     POLLOUT (since port cannot be congested)
505  *
506  * IMPORTANT: The fact that a read or write operation is indicated does NOT
507  * imply that the operation will succeed, merely that it should be performed
508  * and will not block.
509  */
510 static unsigned int tipc_poll(struct file *file, struct socket *sock,
511                               poll_table *wait)
512 {
513         struct sock *sk = sock->sk;
514         struct tipc_sock *tsk = tipc_sk(sk);
515         u32 mask = 0;
516
517         sock_poll_wait(file, sk_sleep(sk), wait);
518
519         switch ((int)sock->state) {
520         case SS_UNCONNECTED:
521                 if (!tsk->link_cong)
522                         mask |= POLLOUT;
523                 break;
524         case SS_READY:
525         case SS_CONNECTED:
526                 if (!tsk->link_cong && !tipc_sk_conn_cong(tsk))
527                         mask |= POLLOUT;
528                 /* fall thru' */
529         case SS_CONNECTING:
530         case SS_LISTENING:
531                 if (!skb_queue_empty(&sk->sk_receive_queue))
532                         mask |= (POLLIN | POLLRDNORM);
533                 break;
534         case SS_DISCONNECTING:
535                 mask = (POLLIN | POLLRDNORM | POLLHUP);
536                 break;
537         }
538
539         return mask;
540 }
541
542 /**
543  * tipc_sendmcast - send multicast message
544  * @sock: socket structure
545  * @seq: destination address
546  * @iov: message data to send
547  * @dsz: total length of message data
548  * @timeo: timeout to wait for wakeup
549  *
550  * Called from function tipc_sendmsg(), which has done all sanity checks
551  * Returns the number of bytes sent on success, or errno
552  */
553 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
554                           struct iovec *iov, size_t dsz, long timeo)
555 {
556         struct sock *sk = sock->sk;
557         struct tipc_msg *mhdr = &tipc_sk(sk)->port.phdr;
558         struct sk_buff *buf;
559         uint mtu;
560         int rc;
561
562         msg_set_type(mhdr, TIPC_MCAST_MSG);
563         msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
564         msg_set_destport(mhdr, 0);
565         msg_set_destnode(mhdr, 0);
566         msg_set_nametype(mhdr, seq->type);
567         msg_set_namelower(mhdr, seq->lower);
568         msg_set_nameupper(mhdr, seq->upper);
569         msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
570
571 new_mtu:
572         mtu = tipc_bclink_get_mtu();
573         rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf);
574         if (unlikely(rc < 0))
575                 return rc;
576
577         do {
578                 rc = tipc_bclink_xmit(buf);
579                 if (likely(rc >= 0)) {
580                         rc = dsz;
581                         break;
582                 }
583                 if (rc == -EMSGSIZE)
584                         goto new_mtu;
585                 if (rc != -ELINKCONG)
586                         break;
587                 tipc_sk(sk)->link_cong = 1;
588                 rc = tipc_wait_for_sndmsg(sock, &timeo);
589                 if (rc)
590                         kfree_skb_list(buf);
591         } while (!rc);
592         return rc;
593 }
594
595 /* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
596  */
597 void tipc_sk_mcast_rcv(struct sk_buff *buf)
598 {
599         struct tipc_msg *msg = buf_msg(buf);
600         struct tipc_port_list dports = {0, NULL, };
601         struct tipc_port_list *item;
602         struct sk_buff *b;
603         uint i, last, dst = 0;
604         u32 scope = TIPC_CLUSTER_SCOPE;
605
606         if (in_own_node(msg_orignode(msg)))
607                 scope = TIPC_NODE_SCOPE;
608
609         /* Create destination port list: */
610         tipc_nametbl_mc_translate(msg_nametype(msg),
611                                   msg_namelower(msg),
612                                   msg_nameupper(msg),
613                                   scope,
614                                   &dports);
615         last = dports.count;
616         if (!last) {
617                 kfree_skb(buf);
618                 return;
619         }
620
621         for (item = &dports; item; item = item->next) {
622                 for (i = 0; i < PLSIZE && ++dst <= last; i++) {
623                         b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
624                         if (!b) {
625                                 pr_warn("Failed do clone mcast rcv buffer\n");
626                                 continue;
627                         }
628                         msg_set_destport(msg, item->ports[i]);
629                         tipc_sk_rcv(b);
630                 }
631         }
632         tipc_port_list_free(&dports);
633 }
634
635 /**
636  * tipc_sk_proto_rcv - receive a connection mng protocol message
637  * @tsk: receiving socket
638  * @dnode: node to send response message to, if any
639  * @buf: buffer containing protocol message
640  * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if
641  * (CONN_PROBE_REPLY) message should be forwarded.
642  */
643 static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
644                              struct sk_buff *buf)
645 {
646         struct tipc_msg *msg = buf_msg(buf);
647         struct tipc_port *port = &tsk->port;
648         int conn_cong;
649
650         /* Ignore if connection cannot be validated: */
651         if (!port->connected || !tipc_port_peer_msg(port, msg))
652                 goto exit;
653
654         port->probing_state = TIPC_CONN_OK;
655
656         if (msg_type(msg) == CONN_ACK) {
657                 conn_cong = tipc_sk_conn_cong(tsk);
658                 tsk->sent_unacked -= msg_msgcnt(msg);
659                 if (conn_cong)
660                         tsk->sk.sk_write_space(&tsk->sk);
661         } else if (msg_type(msg) == CONN_PROBE) {
662                 if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
663                         return TIPC_OK;
664                 msg_set_type(msg, CONN_PROBE_REPLY);
665                 return TIPC_FWD_MSG;
666         }
667         /* Do nothing if msg_type() == CONN_PROBE_REPLY */
668 exit:
669         kfree_skb(buf);
670         return TIPC_OK;
671 }
672
673 /**
674  * dest_name_check - verify user is permitted to send to specified port name
675  * @dest: destination address
676  * @m: descriptor for message to be sent
677  *
678  * Prevents restricted configuration commands from being issued by
679  * unauthorized users.
680  *
681  * Returns 0 if permission is granted, otherwise errno
682  */
683 static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
684 {
685         struct tipc_cfg_msg_hdr hdr;
686
687         if (unlikely(dest->addrtype == TIPC_ADDR_ID))
688                 return 0;
689         if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
690                 return 0;
691         if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
692                 return 0;
693         if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
694                 return -EACCES;
695
696         if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
697                 return -EMSGSIZE;
698         if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
699                 return -EFAULT;
700         if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
701                 return -EACCES;
702
703         return 0;
704 }
705
706 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
707 {
708         struct sock *sk = sock->sk;
709         struct tipc_sock *tsk = tipc_sk(sk);
710         DEFINE_WAIT(wait);
711         int done;
712
713         do {
714                 int err = sock_error(sk);
715                 if (err)
716                         return err;
717                 if (sock->state == SS_DISCONNECTING)
718                         return -EPIPE;
719                 if (!*timeo_p)
720                         return -EAGAIN;
721                 if (signal_pending(current))
722                         return sock_intr_errno(*timeo_p);
723
724                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
725                 done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
726                 finish_wait(sk_sleep(sk), &wait);
727         } while (!done);
728         return 0;
729 }
730
731 /**
732  * tipc_sendmsg - send message in connectionless manner
733  * @iocb: if NULL, indicates that socket lock is already held
734  * @sock: socket structure
735  * @m: message to send
736  * @dsz: amount of user data to be sent
737  *
738  * Message must have an destination specified explicitly.
739  * Used for SOCK_RDM and SOCK_DGRAM messages,
740  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
741  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
742  *
743  * Returns the number of bytes sent on success, or errno otherwise
744  */
745 static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
746                         struct msghdr *m, size_t dsz)
747 {
748         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
749         struct sock *sk = sock->sk;
750         struct tipc_sock *tsk = tipc_sk(sk);
751         struct tipc_port *port = &tsk->port;
752         struct tipc_msg *mhdr = &port->phdr;
753         struct iovec *iov = m->msg_iov;
754         u32 dnode, dport;
755         struct sk_buff *buf;
756         struct tipc_name_seq *seq = &dest->addr.nameseq;
757         u32 mtu;
758         long timeo;
759         int rc = -EINVAL;
760
761         if (unlikely(!dest))
762                 return -EDESTADDRREQ;
763
764         if (unlikely((m->msg_namelen < sizeof(*dest)) ||
765                      (dest->family != AF_TIPC)))
766                 return -EINVAL;
767
768         if (dsz > TIPC_MAX_USER_MSG_SIZE)
769                 return -EMSGSIZE;
770
771         if (iocb)
772                 lock_sock(sk);
773
774         if (unlikely(sock->state != SS_READY)) {
775                 if (sock->state == SS_LISTENING) {
776                         rc = -EPIPE;
777                         goto exit;
778                 }
779                 if (sock->state != SS_UNCONNECTED) {
780                         rc = -EISCONN;
781                         goto exit;
782                 }
783                 if (tsk->port.published) {
784                         rc = -EOPNOTSUPP;
785                         goto exit;
786                 }
787                 if (dest->addrtype == TIPC_ADDR_NAME) {
788                         tsk->port.conn_type = dest->addr.name.name.type;
789                         tsk->port.conn_instance = dest->addr.name.name.instance;
790                 }
791         }
792         rc = dest_name_check(dest, m);
793         if (rc)
794                 goto exit;
795
796         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
797
798         if (dest->addrtype == TIPC_ADDR_MCAST) {
799                 rc = tipc_sendmcast(sock, seq, iov, dsz, timeo);
800                 goto exit;
801         } else if (dest->addrtype == TIPC_ADDR_NAME) {
802                 u32 type = dest->addr.name.name.type;
803                 u32 inst = dest->addr.name.name.instance;
804                 u32 domain = dest->addr.name.domain;
805
806                 dnode = domain;
807                 msg_set_type(mhdr, TIPC_NAMED_MSG);
808                 msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
809                 msg_set_nametype(mhdr, type);
810                 msg_set_nameinst(mhdr, inst);
811                 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
812                 dport = tipc_nametbl_translate(type, inst, &dnode);
813                 msg_set_destnode(mhdr, dnode);
814                 msg_set_destport(mhdr, dport);
815                 if (unlikely(!dport && !dnode)) {
816                         rc = -EHOSTUNREACH;
817                         goto exit;
818                 }
819         } else if (dest->addrtype == TIPC_ADDR_ID) {
820                 dnode = dest->addr.id.node;
821                 msg_set_type(mhdr, TIPC_DIRECT_MSG);
822                 msg_set_lookup_scope(mhdr, 0);
823                 msg_set_destnode(mhdr, dnode);
824                 msg_set_destport(mhdr, dest->addr.id.ref);
825                 msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
826         }
827
828 new_mtu:
829         mtu = tipc_node_get_mtu(dnode, tsk->port.ref);
830         rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf);
831         if (rc < 0)
832                 goto exit;
833
834         do {
835                 TIPC_SKB_CB(buf)->wakeup_pending = tsk->link_cong;
836                 rc = tipc_link_xmit(buf, dnode, tsk->port.ref);
837                 if (likely(rc >= 0)) {
838                         if (sock->state != SS_READY)
839                                 sock->state = SS_CONNECTING;
840                         rc = dsz;
841                         break;
842                 }
843                 if (rc == -EMSGSIZE)
844                         goto new_mtu;
845                 if (rc != -ELINKCONG)
846                         break;
847                 tsk->link_cong = 1;
848                 rc = tipc_wait_for_sndmsg(sock, &timeo);
849                 if (rc)
850                         kfree_skb_list(buf);
851         } while (!rc);
852 exit:
853         if (iocb)
854                 release_sock(sk);
855
856         return rc;
857 }
858
859 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
860 {
861         struct sock *sk = sock->sk;
862         struct tipc_sock *tsk = tipc_sk(sk);
863         DEFINE_WAIT(wait);
864         int done;
865
866         do {
867                 int err = sock_error(sk);
868                 if (err)
869                         return err;
870                 if (sock->state == SS_DISCONNECTING)
871                         return -EPIPE;
872                 else if (sock->state != SS_CONNECTED)
873                         return -ENOTCONN;
874                 if (!*timeo_p)
875                         return -EAGAIN;
876                 if (signal_pending(current))
877                         return sock_intr_errno(*timeo_p);
878
879                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
880                 done = sk_wait_event(sk, timeo_p,
881                                      (!tsk->link_cong &&
882                                       !tipc_sk_conn_cong(tsk)) ||
883                                      !tsk->port.connected);
884                 finish_wait(sk_sleep(sk), &wait);
885         } while (!done);
886         return 0;
887 }
888
889 /**
890  * tipc_send_stream - send stream-oriented data
891  * @iocb: (unused)
892  * @sock: socket structure
893  * @m: data to send
894  * @dsz: total length of data to be transmitted
895  *
896  * Used for SOCK_STREAM data.
897  *
898  * Returns the number of bytes sent on success (or partial success),
899  * or errno if no data sent
900  */
901 static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
902                             struct msghdr *m, size_t dsz)
903 {
904         struct sock *sk = sock->sk;
905         struct tipc_sock *tsk = tipc_sk(sk);
906         struct tipc_port *port = &tsk->port;
907         struct tipc_msg *mhdr = &port->phdr;
908         struct sk_buff *buf;
909         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
910         u32 ref = port->ref;
911         int rc = -EINVAL;
912         long timeo;
913         u32 dnode;
914         uint mtu, send, sent = 0;
915
916         /* Handle implied connection establishment */
917         if (unlikely(dest)) {
918                 rc = tipc_sendmsg(iocb, sock, m, dsz);
919                 if (dsz && (dsz == rc))
920                         tsk->sent_unacked = 1;
921                 return rc;
922         }
923         if (dsz > (uint)INT_MAX)
924                 return -EMSGSIZE;
925
926         if (iocb)
927                 lock_sock(sk);
928
929         if (unlikely(sock->state != SS_CONNECTED)) {
930                 if (sock->state == SS_DISCONNECTING)
931                         rc = -EPIPE;
932                 else
933                         rc = -ENOTCONN;
934                 goto exit;
935         }
936
937         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
938         dnode = tipc_port_peernode(port);
939
940 next:
941         mtu = port->max_pkt;
942         send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
943         rc = tipc_msg_build(mhdr, m->msg_iov, sent, send, mtu, &buf);
944         if (unlikely(rc < 0))
945                 goto exit;
946         do {
947                 if (likely(!tipc_sk_conn_cong(tsk))) {
948                         rc = tipc_link_xmit(buf, dnode, ref);
949                         if (likely(!rc)) {
950                                 tsk->sent_unacked++;
951                                 sent += send;
952                                 if (sent == dsz)
953                                         break;
954                                 goto next;
955                         }
956                         if (rc == -EMSGSIZE) {
957                                 port->max_pkt = tipc_node_get_mtu(dnode, ref);
958                                 goto next;
959                         }
960                         if (rc != -ELINKCONG)
961                                 break;
962                         tsk->link_cong = 1;
963                 }
964                 rc = tipc_wait_for_sndpkt(sock, &timeo);
965                 if (rc)
966                         kfree_skb_list(buf);
967         } while (!rc);
968 exit:
969         if (iocb)
970                 release_sock(sk);
971         return sent ? sent : rc;
972 }
973
974 /**
975  * tipc_send_packet - send a connection-oriented message
976  * @iocb: if NULL, indicates that socket lock is already held
977  * @sock: socket structure
978  * @m: message to send
979  * @dsz: length of data to be transmitted
980  *
981  * Used for SOCK_SEQPACKET messages.
982  *
983  * Returns the number of bytes sent on success, or errno otherwise
984  */
985 static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
986                             struct msghdr *m, size_t dsz)
987 {
988         if (dsz > TIPC_MAX_USER_MSG_SIZE)
989                 return -EMSGSIZE;
990
991         return tipc_send_stream(iocb, sock, m, dsz);
992 }
993
994 /* tipc_sk_finish_conn - complete the setup of a connection
995  */
996 static void tipc_sk_finish_conn(struct tipc_port *port, u32 peer_port,
997                                 u32 peer_node)
998 {
999         struct tipc_msg *msg = &port->phdr;
1000
1001         msg_set_destnode(msg, peer_node);
1002         msg_set_destport(msg, peer_port);
1003         msg_set_type(msg, TIPC_CONN_MSG);
1004         msg_set_lookup_scope(msg, 0);
1005         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1006
1007         port->probing_interval = CONN_PROBING_INTERVAL;
1008         port->probing_state = TIPC_CONN_OK;
1009         port->connected = 1;
1010         k_start_timer(&port->timer, port->probing_interval);
1011         tipc_node_add_conn(peer_node, port->ref, peer_port);
1012         port->max_pkt = tipc_node_get_mtu(peer_node, port->ref);
1013 }
1014
1015 /**
1016  * set_orig_addr - capture sender's address for received message
1017  * @m: descriptor for message info
1018  * @msg: received message header
1019  *
1020  * Note: Address is not captured if not requested by receiver.
1021  */
1022 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
1023 {
1024         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
1025
1026         if (addr) {
1027                 addr->family = AF_TIPC;
1028                 addr->addrtype = TIPC_ADDR_ID;
1029                 memset(&addr->addr, 0, sizeof(addr->addr));
1030                 addr->addr.id.ref = msg_origport(msg);
1031                 addr->addr.id.node = msg_orignode(msg);
1032                 addr->addr.name.domain = 0;     /* could leave uninitialized */
1033                 addr->scope = 0;                /* could leave uninitialized */
1034                 m->msg_namelen = sizeof(struct sockaddr_tipc);
1035         }
1036 }
1037
1038 /**
1039  * anc_data_recv - optionally capture ancillary data for received message
1040  * @m: descriptor for message info
1041  * @msg: received message header
1042  * @tport: TIPC port associated with message
1043  *
1044  * Note: Ancillary data is not captured if not requested by receiver.
1045  *
1046  * Returns 0 if successful, otherwise errno
1047  */
1048 static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1049                          struct tipc_port *tport)
1050 {
1051         u32 anc_data[3];
1052         u32 err;
1053         u32 dest_type;
1054         int has_name;
1055         int res;
1056
1057         if (likely(m->msg_controllen == 0))
1058                 return 0;
1059
1060         /* Optionally capture errored message object(s) */
1061         err = msg ? msg_errcode(msg) : 0;
1062         if (unlikely(err)) {
1063                 anc_data[0] = err;
1064                 anc_data[1] = msg_data_sz(msg);
1065                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1066                 if (res)
1067                         return res;
1068                 if (anc_data[1]) {
1069                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1070                                        msg_data(msg));
1071                         if (res)
1072                                 return res;
1073                 }
1074         }
1075
1076         /* Optionally capture message destination object */
1077         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1078         switch (dest_type) {
1079         case TIPC_NAMED_MSG:
1080                 has_name = 1;
1081                 anc_data[0] = msg_nametype(msg);
1082                 anc_data[1] = msg_namelower(msg);
1083                 anc_data[2] = msg_namelower(msg);
1084                 break;
1085         case TIPC_MCAST_MSG:
1086                 has_name = 1;
1087                 anc_data[0] = msg_nametype(msg);
1088                 anc_data[1] = msg_namelower(msg);
1089                 anc_data[2] = msg_nameupper(msg);
1090                 break;
1091         case TIPC_CONN_MSG:
1092                 has_name = (tport->conn_type != 0);
1093                 anc_data[0] = tport->conn_type;
1094                 anc_data[1] = tport->conn_instance;
1095                 anc_data[2] = tport->conn_instance;
1096                 break;
1097         default:
1098                 has_name = 0;
1099         }
1100         if (has_name) {
1101                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1102                 if (res)
1103                         return res;
1104         }
1105
1106         return 0;
1107 }
1108
1109 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1110 {
1111         struct sock *sk = sock->sk;
1112         DEFINE_WAIT(wait);
1113         long timeo = *timeop;
1114         int err;
1115
1116         for (;;) {
1117                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1118                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1119                         if (sock->state == SS_DISCONNECTING) {
1120                                 err = -ENOTCONN;
1121                                 break;
1122                         }
1123                         release_sock(sk);
1124                         timeo = schedule_timeout(timeo);
1125                         lock_sock(sk);
1126                 }
1127                 err = 0;
1128                 if (!skb_queue_empty(&sk->sk_receive_queue))
1129                         break;
1130                 err = sock_intr_errno(timeo);
1131                 if (signal_pending(current))
1132                         break;
1133                 err = -EAGAIN;
1134                 if (!timeo)
1135                         break;
1136         }
1137         finish_wait(sk_sleep(sk), &wait);
1138         *timeop = timeo;
1139         return err;
1140 }
1141
1142 /**
1143  * tipc_recvmsg - receive packet-oriented message
1144  * @iocb: (unused)
1145  * @m: descriptor for message info
1146  * @buf_len: total size of user buffer area
1147  * @flags: receive flags
1148  *
1149  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1150  * If the complete message doesn't fit in user area, truncate it.
1151  *
1152  * Returns size of returned message data, errno otherwise
1153  */
1154 static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
1155                         struct msghdr *m, size_t buf_len, int flags)
1156 {
1157         struct sock *sk = sock->sk;
1158         struct tipc_sock *tsk = tipc_sk(sk);
1159         struct tipc_port *port = &tsk->port;
1160         struct sk_buff *buf;
1161         struct tipc_msg *msg;
1162         long timeo;
1163         unsigned int sz;
1164         u32 err;
1165         int res;
1166
1167         /* Catch invalid receive requests */
1168         if (unlikely(!buf_len))
1169                 return -EINVAL;
1170
1171         lock_sock(sk);
1172
1173         if (unlikely(sock->state == SS_UNCONNECTED)) {
1174                 res = -ENOTCONN;
1175                 goto exit;
1176         }
1177
1178         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1179 restart:
1180
1181         /* Look for a message in receive queue; wait if necessary */
1182         res = tipc_wait_for_rcvmsg(sock, &timeo);
1183         if (res)
1184                 goto exit;
1185
1186         /* Look at first message in receive queue */
1187         buf = skb_peek(&sk->sk_receive_queue);
1188         msg = buf_msg(buf);
1189         sz = msg_data_sz(msg);
1190         err = msg_errcode(msg);
1191
1192         /* Discard an empty non-errored message & try again */
1193         if ((!sz) && (!err)) {
1194                 advance_rx_queue(sk);
1195                 goto restart;
1196         }
1197
1198         /* Capture sender's address (optional) */
1199         set_orig_addr(m, msg);
1200
1201         /* Capture ancillary data (optional) */
1202         res = anc_data_recv(m, msg, port);
1203         if (res)
1204                 goto exit;
1205
1206         /* Capture message data (if valid) & compute return value (always) */
1207         if (!err) {
1208                 if (unlikely(buf_len < sz)) {
1209                         sz = buf_len;
1210                         m->msg_flags |= MSG_TRUNC;
1211                 }
1212                 res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
1213                                               m->msg_iov, sz);
1214                 if (res)
1215                         goto exit;
1216                 res = sz;
1217         } else {
1218                 if ((sock->state == SS_READY) ||
1219                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1220                         res = 0;
1221                 else
1222                         res = -ECONNRESET;
1223         }
1224
1225         /* Consume received message (optional) */
1226         if (likely(!(flags & MSG_PEEK))) {
1227                 if ((sock->state != SS_READY) &&
1228                     (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1229                         tipc_acknowledge(port->ref, tsk->rcv_unacked);
1230                         tsk->rcv_unacked = 0;
1231                 }
1232                 advance_rx_queue(sk);
1233         }
1234 exit:
1235         release_sock(sk);
1236         return res;
1237 }
1238
1239 /**
1240  * tipc_recv_stream - receive stream-oriented data
1241  * @iocb: (unused)
1242  * @m: descriptor for message info
1243  * @buf_len: total size of user buffer area
1244  * @flags: receive flags
1245  *
1246  * Used for SOCK_STREAM messages only.  If not enough data is available
1247  * will optionally wait for more; never truncates data.
1248  *
1249  * Returns size of returned message data, errno otherwise
1250  */
1251 static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
1252                             struct msghdr *m, size_t buf_len, int flags)
1253 {
1254         struct sock *sk = sock->sk;
1255         struct tipc_sock *tsk = tipc_sk(sk);
1256         struct tipc_port *port = &tsk->port;
1257         struct sk_buff *buf;
1258         struct tipc_msg *msg;
1259         long timeo;
1260         unsigned int sz;
1261         int sz_to_copy, target, needed;
1262         int sz_copied = 0;
1263         u32 err;
1264         int res = 0;
1265
1266         /* Catch invalid receive attempts */
1267         if (unlikely(!buf_len))
1268                 return -EINVAL;
1269
1270         lock_sock(sk);
1271
1272         if (unlikely(sock->state == SS_UNCONNECTED)) {
1273                 res = -ENOTCONN;
1274                 goto exit;
1275         }
1276
1277         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1278         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1279
1280 restart:
1281         /* Look for a message in receive queue; wait if necessary */
1282         res = tipc_wait_for_rcvmsg(sock, &timeo);
1283         if (res)
1284                 goto exit;
1285
1286         /* Look at first message in receive queue */
1287         buf = skb_peek(&sk->sk_receive_queue);
1288         msg = buf_msg(buf);
1289         sz = msg_data_sz(msg);
1290         err = msg_errcode(msg);
1291
1292         /* Discard an empty non-errored message & try again */
1293         if ((!sz) && (!err)) {
1294                 advance_rx_queue(sk);
1295                 goto restart;
1296         }
1297
1298         /* Optionally capture sender's address & ancillary data of first msg */
1299         if (sz_copied == 0) {
1300                 set_orig_addr(m, msg);
1301                 res = anc_data_recv(m, msg, port);
1302                 if (res)
1303                         goto exit;
1304         }
1305
1306         /* Capture message data (if valid) & compute return value (always) */
1307         if (!err) {
1308                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1309
1310                 sz -= offset;
1311                 needed = (buf_len - sz_copied);
1312                 sz_to_copy = (sz <= needed) ? sz : needed;
1313
1314                 res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
1315                                               m->msg_iov, sz_to_copy);
1316                 if (res)
1317                         goto exit;
1318
1319                 sz_copied += sz_to_copy;
1320
1321                 if (sz_to_copy < sz) {
1322                         if (!(flags & MSG_PEEK))
1323                                 TIPC_SKB_CB(buf)->handle =
1324                                 (void *)(unsigned long)(offset + sz_to_copy);
1325                         goto exit;
1326                 }
1327         } else {
1328                 if (sz_copied != 0)
1329                         goto exit; /* can't add error msg to valid data */
1330
1331                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1332                         res = 0;
1333                 else
1334                         res = -ECONNRESET;
1335         }
1336
1337         /* Consume received message (optional) */
1338         if (likely(!(flags & MSG_PEEK))) {
1339                 if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1340                         tipc_acknowledge(port->ref, tsk->rcv_unacked);
1341                         tsk->rcv_unacked = 0;
1342                 }
1343                 advance_rx_queue(sk);
1344         }
1345
1346         /* Loop around if more data is required */
1347         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1348             (!skb_queue_empty(&sk->sk_receive_queue) ||
1349             (sz_copied < target)) &&    /* and more is ready or required */
1350             (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1351             (!err))                     /* and haven't reached a FIN */
1352                 goto restart;
1353
1354 exit:
1355         release_sock(sk);
1356         return sz_copied ? sz_copied : res;
1357 }
1358
1359 /**
1360  * tipc_write_space - wake up thread if port congestion is released
1361  * @sk: socket
1362  */
1363 static void tipc_write_space(struct sock *sk)
1364 {
1365         struct socket_wq *wq;
1366
1367         rcu_read_lock();
1368         wq = rcu_dereference(sk->sk_wq);
1369         if (wq_has_sleeper(wq))
1370                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1371                                                 POLLWRNORM | POLLWRBAND);
1372         rcu_read_unlock();
1373 }
1374
1375 /**
1376  * tipc_data_ready - wake up threads to indicate messages have been received
1377  * @sk: socket
1378  * @len: the length of messages
1379  */
1380 static void tipc_data_ready(struct sock *sk)
1381 {
1382         struct socket_wq *wq;
1383
1384         rcu_read_lock();
1385         wq = rcu_dereference(sk->sk_wq);
1386         if (wq_has_sleeper(wq))
1387                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1388                                                 POLLRDNORM | POLLRDBAND);
1389         rcu_read_unlock();
1390 }
1391
1392 /**
1393  * filter_connect - Handle all incoming messages for a connection-based socket
1394  * @tsk: TIPC socket
1395  * @msg: message
1396  *
1397  * Returns 0 (TIPC_OK) if everyting ok, -TIPC_ERR_NO_PORT otherwise
1398  */
1399 static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1400 {
1401         struct sock *sk = &tsk->sk;
1402         struct tipc_port *port = &tsk->port;
1403         struct socket *sock = sk->sk_socket;
1404         struct tipc_msg *msg = buf_msg(*buf);
1405
1406         int retval = -TIPC_ERR_NO_PORT;
1407
1408         if (msg_mcast(msg))
1409                 return retval;
1410
1411         switch ((int)sock->state) {
1412         case SS_CONNECTED:
1413                 /* Accept only connection-based messages sent by peer */
1414                 if (msg_connected(msg) && tipc_port_peer_msg(port, msg)) {
1415                         if (unlikely(msg_errcode(msg))) {
1416                                 sock->state = SS_DISCONNECTING;
1417                                 port->connected = 0;
1418                                 /* let timer expire on it's own */
1419                                 tipc_node_remove_conn(tipc_port_peernode(port),
1420                                                       port->ref);
1421                         }
1422                         retval = TIPC_OK;
1423                 }
1424                 break;
1425         case SS_CONNECTING:
1426                 /* Accept only ACK or NACK message */
1427
1428                 if (unlikely(!msg_connected(msg)))
1429                         break;
1430
1431                 if (unlikely(msg_errcode(msg))) {
1432                         sock->state = SS_DISCONNECTING;
1433                         sk->sk_err = ECONNREFUSED;
1434                         retval = TIPC_OK;
1435                         break;
1436                 }
1437
1438                 if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) {
1439                         sock->state = SS_DISCONNECTING;
1440                         sk->sk_err = EINVAL;
1441                         retval = TIPC_OK;
1442                         break;
1443                 }
1444
1445                 tipc_sk_finish_conn(port, msg_origport(msg), msg_orignode(msg));
1446                 msg_set_importance(&port->phdr, msg_importance(msg));
1447                 sock->state = SS_CONNECTED;
1448
1449                 /* If an incoming message is an 'ACK-', it should be
1450                  * discarded here because it doesn't contain useful
1451                  * data. In addition, we should try to wake up
1452                  * connect() routine if sleeping.
1453                  */
1454                 if (msg_data_sz(msg) == 0) {
1455                         kfree_skb(*buf);
1456                         *buf = NULL;
1457                         if (waitqueue_active(sk_sleep(sk)))
1458                                 wake_up_interruptible(sk_sleep(sk));
1459                 }
1460                 retval = TIPC_OK;
1461                 break;
1462         case SS_LISTENING:
1463         case SS_UNCONNECTED:
1464                 /* Accept only SYN message */
1465                 if (!msg_connected(msg) && !(msg_errcode(msg)))
1466                         retval = TIPC_OK;
1467                 break;
1468         case SS_DISCONNECTING:
1469                 break;
1470         default:
1471                 pr_err("Unknown socket state %u\n", sock->state);
1472         }
1473         return retval;
1474 }
1475
1476 /**
1477  * rcvbuf_limit - get proper overload limit of socket receive queue
1478  * @sk: socket
1479  * @buf: message
1480  *
1481  * For all connection oriented messages, irrespective of importance,
1482  * the default overload value (i.e. 67MB) is set as limit.
1483  *
1484  * For all connectionless messages, by default new queue limits are
1485  * as belows:
1486  *
1487  * TIPC_LOW_IMPORTANCE       (4 MB)
1488  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1489  * TIPC_HIGH_IMPORTANCE      (16 MB)
1490  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1491  *
1492  * Returns overload limit according to corresponding message importance
1493  */
1494 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1495 {
1496         struct tipc_msg *msg = buf_msg(buf);
1497
1498         if (msg_connected(msg))
1499                 return sysctl_tipc_rmem[2];
1500
1501         return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1502                 msg_importance(msg);
1503 }
1504
1505 /**
1506  * filter_rcv - validate incoming message
1507  * @sk: socket
1508  * @buf: message
1509  *
1510  * Enqueues message on receive queue if acceptable; optionally handles
1511  * disconnect indication for a connected socket.
1512  *
1513  * Called with socket lock already taken; port lock may also be taken.
1514  *
1515  * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message
1516  * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded
1517  */
1518 static int filter_rcv(struct sock *sk, struct sk_buff *buf)
1519 {
1520         struct socket *sock = sk->sk_socket;
1521         struct tipc_sock *tsk = tipc_sk(sk);
1522         struct tipc_msg *msg = buf_msg(buf);
1523         unsigned int limit = rcvbuf_limit(sk, buf);
1524         u32 onode;
1525         int rc = TIPC_OK;
1526
1527         if (unlikely(msg_user(msg) == CONN_MANAGER))
1528                 return tipc_sk_proto_rcv(tsk, &onode, buf);
1529
1530         if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
1531                 kfree_skb(buf);
1532                 tsk->link_cong = 0;
1533                 sk->sk_write_space(sk);
1534                 return TIPC_OK;
1535         }
1536
1537         /* Reject message if it is wrong sort of message for socket */
1538         if (msg_type(msg) > TIPC_DIRECT_MSG)
1539                 return -TIPC_ERR_NO_PORT;
1540
1541         if (sock->state == SS_READY) {
1542                 if (msg_connected(msg))
1543                         return -TIPC_ERR_NO_PORT;
1544         } else {
1545                 rc = filter_connect(tsk, &buf);
1546                 if (rc != TIPC_OK || buf == NULL)
1547                         return rc;
1548         }
1549
1550         /* Reject message if there isn't room to queue it */
1551         if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1552                 return -TIPC_ERR_OVERLOAD;
1553
1554         /* Enqueue message */
1555         TIPC_SKB_CB(buf)->handle = NULL;
1556         __skb_queue_tail(&sk->sk_receive_queue, buf);
1557         skb_set_owner_r(buf, sk);
1558
1559         sk->sk_data_ready(sk);
1560         return TIPC_OK;
1561 }
1562
1563 /**
1564  * tipc_backlog_rcv - handle incoming message from backlog queue
1565  * @sk: socket
1566  * @buf: message
1567  *
1568  * Caller must hold socket lock, but not port lock.
1569  *
1570  * Returns 0
1571  */
1572 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
1573 {
1574         int rc;
1575         u32 onode;
1576         struct tipc_sock *tsk = tipc_sk(sk);
1577         uint truesize = buf->truesize;
1578
1579         rc = filter_rcv(sk, buf);
1580
1581         if (likely(!rc)) {
1582                 if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1583                         atomic_add(truesize, &tsk->dupl_rcvcnt);
1584                 return 0;
1585         }
1586
1587         if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc))
1588                 return 0;
1589
1590         tipc_link_xmit(buf, onode, 0);
1591
1592         return 0;
1593 }
1594
1595 /**
1596  * tipc_sk_rcv - handle incoming message
1597  * @buf: buffer containing arriving message
1598  * Consumes buffer
1599  * Returns 0 if success, or errno: -EHOSTUNREACH
1600  */
1601 int tipc_sk_rcv(struct sk_buff *buf)
1602 {
1603         struct tipc_sock *tsk;
1604         struct tipc_port *port;
1605         struct sock *sk;
1606         u32 dport = msg_destport(buf_msg(buf));
1607         int rc = TIPC_OK;
1608         uint limit;
1609         u32 dnode;
1610
1611         /* Validate destination and message */
1612         port = tipc_port_lock(dport);
1613         if (unlikely(!port)) {
1614                 rc = tipc_msg_eval(buf, &dnode);
1615                 goto exit;
1616         }
1617
1618         tsk = tipc_port_to_sock(port);
1619         sk = &tsk->sk;
1620
1621         /* Queue message */
1622         bh_lock_sock(sk);
1623
1624         if (!sock_owned_by_user(sk)) {
1625                 rc = filter_rcv(sk, buf);
1626         } else {
1627                 if (sk->sk_backlog.len == 0)
1628                         atomic_set(&tsk->dupl_rcvcnt, 0);
1629                 limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt);
1630                 if (sk_add_backlog(sk, buf, limit))
1631                         rc = -TIPC_ERR_OVERLOAD;
1632         }
1633         bh_unlock_sock(sk);
1634         tipc_port_unlock(port);
1635
1636         if (likely(!rc))
1637                 return 0;
1638 exit:
1639         if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc))
1640                 return -EHOSTUNREACH;
1641
1642         tipc_link_xmit(buf, dnode, 0);
1643         return (rc < 0) ? -EHOSTUNREACH : 0;
1644 }
1645
1646 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1647 {
1648         struct sock *sk = sock->sk;
1649         DEFINE_WAIT(wait);
1650         int done;
1651
1652         do {
1653                 int err = sock_error(sk);
1654                 if (err)
1655                         return err;
1656                 if (!*timeo_p)
1657                         return -ETIMEDOUT;
1658                 if (signal_pending(current))
1659                         return sock_intr_errno(*timeo_p);
1660
1661                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1662                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1663                 finish_wait(sk_sleep(sk), &wait);
1664         } while (!done);
1665         return 0;
1666 }
1667
1668 /**
1669  * tipc_connect - establish a connection to another TIPC port
1670  * @sock: socket structure
1671  * @dest: socket address for destination port
1672  * @destlen: size of socket address data structure
1673  * @flags: file-related flags associated with socket
1674  *
1675  * Returns 0 on success, errno otherwise
1676  */
1677 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1678                         int destlen, int flags)
1679 {
1680         struct sock *sk = sock->sk;
1681         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1682         struct msghdr m = {NULL,};
1683         long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1684         socket_state previous;
1685         int res;
1686
1687         lock_sock(sk);
1688
1689         /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1690         if (sock->state == SS_READY) {
1691                 res = -EOPNOTSUPP;
1692                 goto exit;
1693         }
1694
1695         /*
1696          * Reject connection attempt using multicast address
1697          *
1698          * Note: send_msg() validates the rest of the address fields,
1699          *       so there's no need to do it here
1700          */
1701         if (dst->addrtype == TIPC_ADDR_MCAST) {
1702                 res = -EINVAL;
1703                 goto exit;
1704         }
1705
1706         previous = sock->state;
1707         switch (sock->state) {
1708         case SS_UNCONNECTED:
1709                 /* Send a 'SYN-' to destination */
1710                 m.msg_name = dest;
1711                 m.msg_namelen = destlen;
1712
1713                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1714                  * indicate send_msg() is never blocked.
1715                  */
1716                 if (!timeout)
1717                         m.msg_flags = MSG_DONTWAIT;
1718
1719                 res = tipc_sendmsg(NULL, sock, &m, 0);
1720                 if ((res < 0) && (res != -EWOULDBLOCK))
1721                         goto exit;
1722
1723                 /* Just entered SS_CONNECTING state; the only
1724                  * difference is that return value in non-blocking
1725                  * case is EINPROGRESS, rather than EALREADY.
1726                  */
1727                 res = -EINPROGRESS;
1728         case SS_CONNECTING:
1729                 if (previous == SS_CONNECTING)
1730                         res = -EALREADY;
1731                 if (!timeout)
1732                         goto exit;
1733                 timeout = msecs_to_jiffies(timeout);
1734                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1735                 res = tipc_wait_for_connect(sock, &timeout);
1736                 break;
1737         case SS_CONNECTED:
1738                 res = -EISCONN;
1739                 break;
1740         default:
1741                 res = -EINVAL;
1742                 break;
1743         }
1744 exit:
1745         release_sock(sk);
1746         return res;
1747 }
1748
1749 /**
1750  * tipc_listen - allow socket to listen for incoming connections
1751  * @sock: socket structure
1752  * @len: (unused)
1753  *
1754  * Returns 0 on success, errno otherwise
1755  */
1756 static int tipc_listen(struct socket *sock, int len)
1757 {
1758         struct sock *sk = sock->sk;
1759         int res;
1760
1761         lock_sock(sk);
1762
1763         if (sock->state != SS_UNCONNECTED)
1764                 res = -EINVAL;
1765         else {
1766                 sock->state = SS_LISTENING;
1767                 res = 0;
1768         }
1769
1770         release_sock(sk);
1771         return res;
1772 }
1773
1774 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1775 {
1776         struct sock *sk = sock->sk;
1777         DEFINE_WAIT(wait);
1778         int err;
1779
1780         /* True wake-one mechanism for incoming connections: only
1781          * one process gets woken up, not the 'whole herd'.
1782          * Since we do not 'race & poll' for established sockets
1783          * anymore, the common case will execute the loop only once.
1784         */
1785         for (;;) {
1786                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1787                                           TASK_INTERRUPTIBLE);
1788                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1789                         release_sock(sk);
1790                         timeo = schedule_timeout(timeo);
1791                         lock_sock(sk);
1792                 }
1793                 err = 0;
1794                 if (!skb_queue_empty(&sk->sk_receive_queue))
1795                         break;
1796                 err = -EINVAL;
1797                 if (sock->state != SS_LISTENING)
1798                         break;
1799                 err = sock_intr_errno(timeo);
1800                 if (signal_pending(current))
1801                         break;
1802                 err = -EAGAIN;
1803                 if (!timeo)
1804                         break;
1805         }
1806         finish_wait(sk_sleep(sk), &wait);
1807         return err;
1808 }
1809
1810 /**
1811  * tipc_accept - wait for connection request
1812  * @sock: listening socket
1813  * @newsock: new socket that is to be connected
1814  * @flags: file-related flags associated with socket
1815  *
1816  * Returns 0 on success, errno otherwise
1817  */
1818 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1819 {
1820         struct sock *new_sk, *sk = sock->sk;
1821         struct sk_buff *buf;
1822         struct tipc_port *new_port;
1823         struct tipc_msg *msg;
1824         long timeo;
1825         int res;
1826
1827         lock_sock(sk);
1828
1829         if (sock->state != SS_LISTENING) {
1830                 res = -EINVAL;
1831                 goto exit;
1832         }
1833         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
1834         res = tipc_wait_for_accept(sock, timeo);
1835         if (res)
1836                 goto exit;
1837
1838         buf = skb_peek(&sk->sk_receive_queue);
1839
1840         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
1841         if (res)
1842                 goto exit;
1843
1844         new_sk = new_sock->sk;
1845         new_port = &tipc_sk(new_sk)->port;
1846         msg = buf_msg(buf);
1847
1848         /* we lock on new_sk; but lockdep sees the lock on sk */
1849         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1850
1851         /*
1852          * Reject any stray messages received by new socket
1853          * before the socket lock was taken (very, very unlikely)
1854          */
1855         reject_rx_queue(new_sk);
1856
1857         /* Connect new socket to it's peer */
1858         tipc_sk_finish_conn(new_port, msg_origport(msg), msg_orignode(msg));
1859         new_sock->state = SS_CONNECTED;
1860
1861         tipc_port_set_importance(new_port, msg_importance(msg));
1862         if (msg_named(msg)) {
1863                 new_port->conn_type = msg_nametype(msg);
1864                 new_port->conn_instance = msg_nameinst(msg);
1865         }
1866
1867         /*
1868          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1869          * Respond to 'SYN+' by queuing it on new socket.
1870          */
1871         if (!msg_data_sz(msg)) {
1872                 struct msghdr m = {NULL,};
1873
1874                 advance_rx_queue(sk);
1875                 tipc_send_packet(NULL, new_sock, &m, 0);
1876         } else {
1877                 __skb_dequeue(&sk->sk_receive_queue);
1878                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
1879                 skb_set_owner_r(buf, new_sk);
1880         }
1881         release_sock(new_sk);
1882 exit:
1883         release_sock(sk);
1884         return res;
1885 }
1886
1887 /**
1888  * tipc_shutdown - shutdown socket connection
1889  * @sock: socket structure
1890  * @how: direction to close (must be SHUT_RDWR)
1891  *
1892  * Terminates connection (if necessary), then purges socket's receive queue.
1893  *
1894  * Returns 0 on success, errno otherwise
1895  */
1896 static int tipc_shutdown(struct socket *sock, int how)
1897 {
1898         struct sock *sk = sock->sk;
1899         struct tipc_sock *tsk = tipc_sk(sk);
1900         struct tipc_port *port = &tsk->port;
1901         struct sk_buff *buf;
1902         u32 dnode;
1903         int res;
1904
1905         if (how != SHUT_RDWR)
1906                 return -EINVAL;
1907
1908         lock_sock(sk);
1909
1910         switch (sock->state) {
1911         case SS_CONNECTING:
1912         case SS_CONNECTED:
1913
1914 restart:
1915                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1916                 buf = __skb_dequeue(&sk->sk_receive_queue);
1917                 if (buf) {
1918                         if (TIPC_SKB_CB(buf)->handle != NULL) {
1919                                 kfree_skb(buf);
1920                                 goto restart;
1921                         }
1922                         if (tipc_msg_reverse(buf, &dnode, TIPC_CONN_SHUTDOWN))
1923                                 tipc_link_xmit(buf, dnode, port->ref);
1924                         tipc_node_remove_conn(dnode, port->ref);
1925                 } else {
1926                         dnode = tipc_port_peernode(port);
1927                         buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
1928                                               TIPC_CONN_MSG, SHORT_H_SIZE,
1929                                               0, dnode, tipc_own_addr,
1930                                               tipc_port_peerport(port),
1931                                               port->ref, TIPC_CONN_SHUTDOWN);
1932                         tipc_link_xmit(buf, dnode, port->ref);
1933                 }
1934                 port->connected = 0;
1935                 sock->state = SS_DISCONNECTING;
1936                 tipc_node_remove_conn(dnode, port->ref);
1937                 /* fall through */
1938
1939         case SS_DISCONNECTING:
1940
1941                 /* Discard any unreceived messages */
1942                 __skb_queue_purge(&sk->sk_receive_queue);
1943
1944                 /* Wake up anyone sleeping in poll */
1945                 sk->sk_state_change(sk);
1946                 res = 0;
1947                 break;
1948
1949         default:
1950                 res = -ENOTCONN;
1951         }
1952
1953         release_sock(sk);
1954         return res;
1955 }
1956
1957 static void tipc_sk_timeout(unsigned long ref)
1958 {
1959         struct tipc_port *port = tipc_port_lock(ref);
1960         struct tipc_sock *tsk;
1961         struct sock *sk;
1962         struct sk_buff *buf = NULL;
1963         struct tipc_msg *msg = NULL;
1964         u32 peer_port, peer_node;
1965
1966         if (!port)
1967                 return;
1968
1969         if (!port->connected) {
1970                 tipc_port_unlock(port);
1971                 return;
1972         }
1973         tsk = tipc_port_to_sock(port);
1974         sk = &tsk->sk;
1975         bh_lock_sock(sk);
1976         peer_port = tipc_port_peerport(port);
1977         peer_node = tipc_port_peernode(port);
1978
1979         if (port->probing_state == TIPC_CONN_PROBING) {
1980                 /* Previous probe not answered -> self abort */
1981                 buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
1982                                       SHORT_H_SIZE, 0, tipc_own_addr,
1983                                       peer_node, ref, peer_port,
1984                                       TIPC_ERR_NO_PORT);
1985         } else {
1986                 buf = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
1987                                       0, peer_node, tipc_own_addr,
1988                                       peer_port, ref, TIPC_OK);
1989                 port->probing_state = TIPC_CONN_PROBING;
1990                 k_start_timer(&port->timer, port->probing_interval);
1991         }
1992         bh_unlock_sock(sk);
1993         tipc_port_unlock(port);
1994         if (!buf)
1995                 return;
1996
1997         msg = buf_msg(buf);
1998         tipc_link_xmit(buf, msg_destnode(msg),  msg_link_selector(msg));
1999 }
2000
2001 /**
2002  * tipc_setsockopt - set socket option
2003  * @sock: socket structure
2004  * @lvl: option level
2005  * @opt: option identifier
2006  * @ov: pointer to new option value
2007  * @ol: length of option value
2008  *
2009  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2010  * (to ease compatibility).
2011  *
2012  * Returns 0 on success, errno otherwise
2013  */
2014 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2015                            char __user *ov, unsigned int ol)
2016 {
2017         struct sock *sk = sock->sk;
2018         struct tipc_sock *tsk = tipc_sk(sk);
2019         struct tipc_port *port = &tsk->port;
2020         u32 value;
2021         int res;
2022
2023         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2024                 return 0;
2025         if (lvl != SOL_TIPC)
2026                 return -ENOPROTOOPT;
2027         if (ol < sizeof(value))
2028                 return -EINVAL;
2029         res = get_user(value, (u32 __user *)ov);
2030         if (res)
2031                 return res;
2032
2033         lock_sock(sk);
2034
2035         switch (opt) {
2036         case TIPC_IMPORTANCE:
2037                 res = tipc_port_set_importance(port, value);
2038                 break;
2039         case TIPC_SRC_DROPPABLE:
2040                 if (sock->type != SOCK_STREAM)
2041                         tipc_port_set_unreliable(port, value);
2042                 else
2043                         res = -ENOPROTOOPT;
2044                 break;
2045         case TIPC_DEST_DROPPABLE:
2046                 tipc_port_set_unreturnable(port, value);
2047                 break;
2048         case TIPC_CONN_TIMEOUT:
2049                 tipc_sk(sk)->conn_timeout = value;
2050                 /* no need to set "res", since already 0 at this point */
2051                 break;
2052         default:
2053                 res = -EINVAL;
2054         }
2055
2056         release_sock(sk);
2057
2058         return res;
2059 }
2060
2061 /**
2062  * tipc_getsockopt - get socket option
2063  * @sock: socket structure
2064  * @lvl: option level
2065  * @opt: option identifier
2066  * @ov: receptacle for option value
2067  * @ol: receptacle for length of option value
2068  *
2069  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2070  * (to ease compatibility).
2071  *
2072  * Returns 0 on success, errno otherwise
2073  */
2074 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2075                            char __user *ov, int __user *ol)
2076 {
2077         struct sock *sk = sock->sk;
2078         struct tipc_sock *tsk = tipc_sk(sk);
2079         struct tipc_port *port = &tsk->port;
2080         int len;
2081         u32 value;
2082         int res;
2083
2084         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2085                 return put_user(0, ol);
2086         if (lvl != SOL_TIPC)
2087                 return -ENOPROTOOPT;
2088         res = get_user(len, ol);
2089         if (res)
2090                 return res;
2091
2092         lock_sock(sk);
2093
2094         switch (opt) {
2095         case TIPC_IMPORTANCE:
2096                 value = tipc_port_importance(port);
2097                 break;
2098         case TIPC_SRC_DROPPABLE:
2099                 value = tipc_port_unreliable(port);
2100                 break;
2101         case TIPC_DEST_DROPPABLE:
2102                 value = tipc_port_unreturnable(port);
2103                 break;
2104         case TIPC_CONN_TIMEOUT:
2105                 value = tipc_sk(sk)->conn_timeout;
2106                 /* no need to set "res", since already 0 at this point */
2107                 break;
2108         case TIPC_NODE_RECVQ_DEPTH:
2109                 value = 0; /* was tipc_queue_size, now obsolete */
2110                 break;
2111         case TIPC_SOCK_RECVQ_DEPTH:
2112                 value = skb_queue_len(&sk->sk_receive_queue);
2113                 break;
2114         default:
2115                 res = -EINVAL;
2116         }
2117
2118         release_sock(sk);
2119
2120         if (res)
2121                 return res;     /* "get" failed */
2122
2123         if (len < sizeof(value))
2124                 return -EINVAL;
2125
2126         if (copy_to_user(ov, &value, sizeof(value)))
2127                 return -EFAULT;
2128
2129         return put_user(sizeof(value), ol);
2130 }
2131
2132 static int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)
2133 {
2134         struct tipc_sioc_ln_req lnr;
2135         void __user *argp = (void __user *)arg;
2136
2137         switch (cmd) {
2138         case SIOCGETLINKNAME:
2139                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
2140                         return -EFAULT;
2141                 if (!tipc_node_get_linkname(lnr.bearer_id, lnr.peer,
2142                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
2143                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
2144                                 return -EFAULT;
2145                         return 0;
2146                 }
2147                 return -EADDRNOTAVAIL;
2148         default:
2149                 return -ENOIOCTLCMD;
2150         }
2151 }
2152
2153 /* Protocol switches for the various types of TIPC sockets */
2154
2155 static const struct proto_ops msg_ops = {
2156         .owner          = THIS_MODULE,
2157         .family         = AF_TIPC,
2158         .release        = tipc_release,
2159         .bind           = tipc_bind,
2160         .connect        = tipc_connect,
2161         .socketpair     = sock_no_socketpair,
2162         .accept         = sock_no_accept,
2163         .getname        = tipc_getname,
2164         .poll           = tipc_poll,
2165         .ioctl          = tipc_ioctl,
2166         .listen         = sock_no_listen,
2167         .shutdown       = tipc_shutdown,
2168         .setsockopt     = tipc_setsockopt,
2169         .getsockopt     = tipc_getsockopt,
2170         .sendmsg        = tipc_sendmsg,
2171         .recvmsg        = tipc_recvmsg,
2172         .mmap           = sock_no_mmap,
2173         .sendpage       = sock_no_sendpage
2174 };
2175
2176 static const struct proto_ops packet_ops = {
2177         .owner          = THIS_MODULE,
2178         .family         = AF_TIPC,
2179         .release        = tipc_release,
2180         .bind           = tipc_bind,
2181         .connect        = tipc_connect,
2182         .socketpair     = sock_no_socketpair,
2183         .accept         = tipc_accept,
2184         .getname        = tipc_getname,
2185         .poll           = tipc_poll,
2186         .ioctl          = tipc_ioctl,
2187         .listen         = tipc_listen,
2188         .shutdown       = tipc_shutdown,
2189         .setsockopt     = tipc_setsockopt,
2190         .getsockopt     = tipc_getsockopt,
2191         .sendmsg        = tipc_send_packet,
2192         .recvmsg        = tipc_recvmsg,
2193         .mmap           = sock_no_mmap,
2194         .sendpage       = sock_no_sendpage
2195 };
2196
2197 static const struct proto_ops stream_ops = {
2198         .owner          = THIS_MODULE,
2199         .family         = AF_TIPC,
2200         .release        = tipc_release,
2201         .bind           = tipc_bind,
2202         .connect        = tipc_connect,
2203         .socketpair     = sock_no_socketpair,
2204         .accept         = tipc_accept,
2205         .getname        = tipc_getname,
2206         .poll           = tipc_poll,
2207         .ioctl          = tipc_ioctl,
2208         .listen         = tipc_listen,
2209         .shutdown       = tipc_shutdown,
2210         .setsockopt     = tipc_setsockopt,
2211         .getsockopt     = tipc_getsockopt,
2212         .sendmsg        = tipc_send_stream,
2213         .recvmsg        = tipc_recv_stream,
2214         .mmap           = sock_no_mmap,
2215         .sendpage       = sock_no_sendpage
2216 };
2217
2218 static const struct net_proto_family tipc_family_ops = {
2219         .owner          = THIS_MODULE,
2220         .family         = AF_TIPC,
2221         .create         = tipc_sk_create
2222 };
2223
2224 static struct proto tipc_proto = {
2225         .name           = "TIPC",
2226         .owner          = THIS_MODULE,
2227         .obj_size       = sizeof(struct tipc_sock),
2228         .sysctl_rmem    = sysctl_tipc_rmem
2229 };
2230
2231 static struct proto tipc_proto_kern = {
2232         .name           = "TIPC",
2233         .obj_size       = sizeof(struct tipc_sock),
2234         .sysctl_rmem    = sysctl_tipc_rmem
2235 };
2236
2237 /**
2238  * tipc_socket_init - initialize TIPC socket interface
2239  *
2240  * Returns 0 on success, errno otherwise
2241  */
2242 int tipc_socket_init(void)
2243 {
2244         int res;
2245
2246         res = proto_register(&tipc_proto, 1);
2247         if (res) {
2248                 pr_err("Failed to register TIPC protocol type\n");
2249                 goto out;
2250         }
2251
2252         res = sock_register(&tipc_family_ops);
2253         if (res) {
2254                 pr_err("Failed to register TIPC socket type\n");
2255                 proto_unregister(&tipc_proto);
2256                 goto out;
2257         }
2258  out:
2259         return res;
2260 }
2261
2262 /**
2263  * tipc_socket_stop - stop TIPC socket interface
2264  */
2265 void tipc_socket_stop(void)
2266 {
2267         sock_unregister(tipc_family_ops.family);
2268         proto_unregister(&tipc_proto);
2269 }