Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net
[cascardo/linux.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "name_table.h"
39 #include "node.h"
40 #include "link.h"
41 #include <linux/export.h>
42 #include "config.h"
43 #include "socket.h"
44
45 #define SS_LISTENING    -1      /* socket is listening */
46 #define SS_READY        -2      /* socket is connectionless */
47
48 #define CONN_TIMEOUT_DEFAULT  8000      /* default connect timeout = 8s */
49 #define CONN_PROBING_INTERVAL 3600000   /* [ms] => 1 h */
50 #define TIPC_FWD_MSG          1
51 #define TIPC_CONN_OK          0
52 #define TIPC_CONN_PROBING     1
53
54 /**
55  * struct tipc_sock - TIPC socket structure
56  * @sk: socket - interacts with 'port' and with user via the socket API
57  * @connected: non-zero if port is currently connected to a peer port
58  * @conn_type: TIPC type used when connection was established
59  * @conn_instance: TIPC instance used when connection was established
60  * @published: non-zero if port has one or more associated names
61  * @max_pkt: maximum packet size "hint" used when building messages sent by port
62  * @ref: unique reference to port in TIPC object registry
63  * @phdr: preformatted message header used when sending messages
64  * @port_list: adjacent ports in TIPC's global list of ports
65  * @publications: list of publications for port
66  * @pub_count: total # of publications port has made during its lifetime
67  * @probing_state:
68  * @probing_interval:
69  * @timer:
70  * @port: port - interacts with 'sk' and with the rest of the TIPC stack
71  * @peer_name: the peer of the connection, if any
72  * @conn_timeout: the time we can wait for an unresponded setup request
73  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
74  * @link_cong: non-zero if owner must sleep because of link congestion
75  * @sent_unacked: # messages sent by socket, and not yet acked by peer
76  * @rcv_unacked: # messages read by user, but not yet acked back to peer
77  */
78 struct tipc_sock {
79         struct sock sk;
80         int connected;
81         u32 conn_type;
82         u32 conn_instance;
83         int published;
84         u32 max_pkt;
85         u32 ref;
86         struct tipc_msg phdr;
87         struct list_head sock_list;
88         struct list_head publications;
89         u32 pub_count;
90         u32 probing_state;
91         u32 probing_interval;
92         struct timer_list timer;
93         uint conn_timeout;
94         atomic_t dupl_rcvcnt;
95         bool link_cong;
96         uint sent_unacked;
97         uint rcv_unacked;
98 };
99
100 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
101 static void tipc_data_ready(struct sock *sk);
102 static void tipc_write_space(struct sock *sk);
103 static int tipc_release(struct socket *sock);
104 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
105 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
106 static void tipc_sk_timeout(unsigned long ref);
107 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
108                            struct tipc_name_seq const *seq);
109 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
110                             struct tipc_name_seq const *seq);
111 static u32 tipc_sk_ref_acquire(struct tipc_sock *tsk);
112 static void tipc_sk_ref_discard(u32 ref);
113 static struct tipc_sock *tipc_sk_get(u32 ref);
114 static struct tipc_sock *tipc_sk_get_next(u32 *ref);
115 static void tipc_sk_put(struct tipc_sock *tsk);
116
117 static const struct proto_ops packet_ops;
118 static const struct proto_ops stream_ops;
119 static const struct proto_ops msg_ops;
120
121 static struct proto tipc_proto;
122 static struct proto tipc_proto_kern;
123
124 static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
125         [TIPC_NLA_SOCK_UNSPEC]          = { .type = NLA_UNSPEC },
126         [TIPC_NLA_SOCK_ADDR]            = { .type = NLA_U32 },
127         [TIPC_NLA_SOCK_REF]             = { .type = NLA_U32 },
128         [TIPC_NLA_SOCK_CON]             = { .type = NLA_NESTED },
129         [TIPC_NLA_SOCK_HAS_PUBL]        = { .type = NLA_FLAG }
130 };
131
132 /*
133  * Revised TIPC socket locking policy:
134  *
135  * Most socket operations take the standard socket lock when they start
136  * and hold it until they finish (or until they need to sleep).  Acquiring
137  * this lock grants the owner exclusive access to the fields of the socket
138  * data structures, with the exception of the backlog queue.  A few socket
139  * operations can be done without taking the socket lock because they only
140  * read socket information that never changes during the life of the socket.
141  *
142  * Socket operations may acquire the lock for the associated TIPC port if they
143  * need to perform an operation on the port.  If any routine needs to acquire
144  * both the socket lock and the port lock it must take the socket lock first
145  * to avoid the risk of deadlock.
146  *
147  * The dispatcher handling incoming messages cannot grab the socket lock in
148  * the standard fashion, since invoked it runs at the BH level and cannot block.
149  * Instead, it checks to see if the socket lock is currently owned by someone,
150  * and either handles the message itself or adds it to the socket's backlog
151  * queue; in the latter case the queued message is processed once the process
152  * owning the socket lock releases it.
153  *
154  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
155  * the problem of a blocked socket operation preventing any other operations
156  * from occurring.  However, applications must be careful if they have
157  * multiple threads trying to send (or receive) on the same socket, as these
158  * operations might interfere with each other.  For example, doing a connect
159  * and a receive at the same time might allow the receive to consume the
160  * ACK message meant for the connect.  While additional work could be done
161  * to try and overcome this, it doesn't seem to be worthwhile at the present.
162  *
163  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
164  * that another operation that must be performed in a non-blocking manner is
165  * not delayed for very long because the lock has already been taken.
166  *
167  * NOTE: This code assumes that certain fields of a port/socket pair are
168  * constant over its lifetime; such fields can be examined without taking
169  * the socket lock and/or port lock, and do not need to be re-read even
170  * after resuming processing after waiting.  These fields include:
171  *   - socket type
172  *   - pointer to socket sk structure (aka tipc_sock structure)
173  *   - pointer to port structure
174  *   - port reference
175  */
176
177 static u32 tsk_peer_node(struct tipc_sock *tsk)
178 {
179         return msg_destnode(&tsk->phdr);
180 }
181
182 static u32 tsk_peer_port(struct tipc_sock *tsk)
183 {
184         return msg_destport(&tsk->phdr);
185 }
186
187 static  bool tsk_unreliable(struct tipc_sock *tsk)
188 {
189         return msg_src_droppable(&tsk->phdr) != 0;
190 }
191
192 static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
193 {
194         msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
195 }
196
197 static bool tsk_unreturnable(struct tipc_sock *tsk)
198 {
199         return msg_dest_droppable(&tsk->phdr) != 0;
200 }
201
202 static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
203 {
204         msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
205 }
206
207 static int tsk_importance(struct tipc_sock *tsk)
208 {
209         return msg_importance(&tsk->phdr);
210 }
211
212 static int tsk_set_importance(struct tipc_sock *tsk, int imp)
213 {
214         if (imp > TIPC_CRITICAL_IMPORTANCE)
215                 return -EINVAL;
216         msg_set_importance(&tsk->phdr, (u32)imp);
217         return 0;
218 }
219
220 static struct tipc_sock *tipc_sk(const struct sock *sk)
221 {
222         return container_of(sk, struct tipc_sock, sk);
223 }
224
225 static int tsk_conn_cong(struct tipc_sock *tsk)
226 {
227         return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
228 }
229
230 /**
231  * tsk_advance_rx_queue - discard first buffer in socket receive queue
232  *
233  * Caller must hold socket lock
234  */
235 static void tsk_advance_rx_queue(struct sock *sk)
236 {
237         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
238 }
239
240 /**
241  * tsk_rej_rx_queue - reject all buffers in socket receive queue
242  *
243  * Caller must hold socket lock
244  */
245 static void tsk_rej_rx_queue(struct sock *sk)
246 {
247         struct sk_buff *skb;
248         u32 dnode;
249
250         while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
251                 if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
252                         tipc_link_xmit_skb(skb, dnode, 0);
253         }
254 }
255
256 /* tsk_peer_msg - verify if message was sent by connected port's peer
257  *
258  * Handles cases where the node's network address has changed from
259  * the default of <0.0.0> to its configured setting.
260  */
261 static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
262 {
263         u32 peer_port = tsk_peer_port(tsk);
264         u32 orig_node;
265         u32 peer_node;
266
267         if (unlikely(!tsk->connected))
268                 return false;
269
270         if (unlikely(msg_origport(msg) != peer_port))
271                 return false;
272
273         orig_node = msg_orignode(msg);
274         peer_node = tsk_peer_node(tsk);
275
276         if (likely(orig_node == peer_node))
277                 return true;
278
279         if (!orig_node && (peer_node == tipc_own_addr))
280                 return true;
281
282         if (!peer_node && (orig_node == tipc_own_addr))
283                 return true;
284
285         return false;
286 }
287
288 /**
289  * tipc_sk_create - create a TIPC socket
290  * @net: network namespace (must be default network)
291  * @sock: pre-allocated socket structure
292  * @protocol: protocol indicator (must be 0)
293  * @kern: caused by kernel or by userspace?
294  *
295  * This routine creates additional data structures used by the TIPC socket,
296  * initializes them, and links them together.
297  *
298  * Returns 0 on success, errno otherwise
299  */
300 static int tipc_sk_create(struct net *net, struct socket *sock,
301                           int protocol, int kern)
302 {
303         const struct proto_ops *ops;
304         socket_state state;
305         struct sock *sk;
306         struct tipc_sock *tsk;
307         struct tipc_msg *msg;
308         u32 ref;
309
310         /* Validate arguments */
311         if (unlikely(protocol != 0))
312                 return -EPROTONOSUPPORT;
313
314         switch (sock->type) {
315         case SOCK_STREAM:
316                 ops = &stream_ops;
317                 state = SS_UNCONNECTED;
318                 break;
319         case SOCK_SEQPACKET:
320                 ops = &packet_ops;
321                 state = SS_UNCONNECTED;
322                 break;
323         case SOCK_DGRAM:
324         case SOCK_RDM:
325                 ops = &msg_ops;
326                 state = SS_READY;
327                 break;
328         default:
329                 return -EPROTOTYPE;
330         }
331
332         /* Allocate socket's protocol area */
333         if (!kern)
334                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
335         else
336                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
337
338         if (sk == NULL)
339                 return -ENOMEM;
340
341         tsk = tipc_sk(sk);
342         ref = tipc_sk_ref_acquire(tsk);
343         if (!ref) {
344                 pr_warn("Socket create failed; reference table exhausted\n");
345                 return -ENOMEM;
346         }
347         tsk->max_pkt = MAX_PKT_DEFAULT;
348         tsk->ref = ref;
349         INIT_LIST_HEAD(&tsk->publications);
350         msg = &tsk->phdr;
351         tipc_msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
352                       NAMED_H_SIZE, 0);
353         msg_set_origport(msg, ref);
354
355         /* Finish initializing socket data structures */
356         sock->ops = ops;
357         sock->state = state;
358         sock_init_data(sock, sk);
359         k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, ref);
360         sk->sk_backlog_rcv = tipc_backlog_rcv;
361         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
362         sk->sk_data_ready = tipc_data_ready;
363         sk->sk_write_space = tipc_write_space;
364         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
365         tsk->sent_unacked = 0;
366         atomic_set(&tsk->dupl_rcvcnt, 0);
367
368         if (sock->state == SS_READY) {
369                 tsk_set_unreturnable(tsk, true);
370                 if (sock->type == SOCK_DGRAM)
371                         tsk_set_unreliable(tsk, true);
372         }
373         return 0;
374 }
375
376 /**
377  * tipc_sock_create_local - create TIPC socket from inside TIPC module
378  * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
379  *
380  * We cannot use sock_creat_kern here because it bumps module user count.
381  * Since socket owner and creator is the same module we must make sure
382  * that module count remains zero for module local sockets, otherwise
383  * we cannot do rmmod.
384  *
385  * Returns 0 on success, errno otherwise
386  */
387 int tipc_sock_create_local(int type, struct socket **res)
388 {
389         int rc;
390
391         rc = sock_create_lite(AF_TIPC, type, 0, res);
392         if (rc < 0) {
393                 pr_err("Failed to create kernel socket\n");
394                 return rc;
395         }
396         tipc_sk_create(&init_net, *res, 0, 1);
397
398         return 0;
399 }
400
401 /**
402  * tipc_sock_release_local - release socket created by tipc_sock_create_local
403  * @sock: the socket to be released.
404  *
405  * Module reference count is not incremented when such sockets are created,
406  * so we must keep it from being decremented when they are released.
407  */
408 void tipc_sock_release_local(struct socket *sock)
409 {
410         tipc_release(sock);
411         sock->ops = NULL;
412         sock_release(sock);
413 }
414
415 /**
416  * tipc_sock_accept_local - accept a connection on a socket created
417  * with tipc_sock_create_local. Use this function to avoid that
418  * module reference count is inadvertently incremented.
419  *
420  * @sock:    the accepting socket
421  * @newsock: reference to the new socket to be created
422  * @flags:   socket flags
423  */
424
425 int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
426                            int flags)
427 {
428         struct sock *sk = sock->sk;
429         int ret;
430
431         ret = sock_create_lite(sk->sk_family, sk->sk_type,
432                                sk->sk_protocol, newsock);
433         if (ret < 0)
434                 return ret;
435
436         ret = tipc_accept(sock, *newsock, flags);
437         if (ret < 0) {
438                 sock_release(*newsock);
439                 return ret;
440         }
441         (*newsock)->ops = sock->ops;
442         return ret;
443 }
444
445 /**
446  * tipc_release - destroy a TIPC socket
447  * @sock: socket to destroy
448  *
449  * This routine cleans up any messages that are still queued on the socket.
450  * For DGRAM and RDM socket types, all queued messages are rejected.
451  * For SEQPACKET and STREAM socket types, the first message is rejected
452  * and any others are discarded.  (If the first message on a STREAM socket
453  * is partially-read, it is discarded and the next one is rejected instead.)
454  *
455  * NOTE: Rejected messages are not necessarily returned to the sender!  They
456  * are returned or discarded according to the "destination droppable" setting
457  * specified for the message by the sender.
458  *
459  * Returns 0 on success, errno otherwise
460  */
461 static int tipc_release(struct socket *sock)
462 {
463         struct sock *sk = sock->sk;
464         struct tipc_sock *tsk;
465         struct sk_buff *skb;
466         u32 dnode;
467
468         /*
469          * Exit if socket isn't fully initialized (occurs when a failed accept()
470          * releases a pre-allocated child socket that was never used)
471          */
472         if (sk == NULL)
473                 return 0;
474
475         tsk = tipc_sk(sk);
476         lock_sock(sk);
477
478         /*
479          * Reject all unreceived messages, except on an active connection
480          * (which disconnects locally & sends a 'FIN+' to peer)
481          */
482         dnode = tsk_peer_node(tsk);
483         while (sock->state != SS_DISCONNECTING) {
484                 skb = __skb_dequeue(&sk->sk_receive_queue);
485                 if (skb == NULL)
486                         break;
487                 if (TIPC_SKB_CB(skb)->handle != NULL)
488                         kfree_skb(skb);
489                 else {
490                         if ((sock->state == SS_CONNECTING) ||
491                             (sock->state == SS_CONNECTED)) {
492                                 sock->state = SS_DISCONNECTING;
493                                 tsk->connected = 0;
494                                 tipc_node_remove_conn(dnode, tsk->ref);
495                         }
496                         if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
497                                 tipc_link_xmit_skb(skb, dnode, 0);
498                 }
499         }
500
501         tipc_sk_withdraw(tsk, 0, NULL);
502         tipc_sk_ref_discard(tsk->ref);
503         k_cancel_timer(&tsk->timer);
504         if (tsk->connected) {
505                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
506                                       SHORT_H_SIZE, 0, dnode, tipc_own_addr,
507                                       tsk_peer_port(tsk),
508                                       tsk->ref, TIPC_ERR_NO_PORT);
509                 if (skb)
510                         tipc_link_xmit_skb(skb, dnode, tsk->ref);
511                 tipc_node_remove_conn(dnode, tsk->ref);
512         }
513         k_term_timer(&tsk->timer);
514
515         /* Discard any remaining (connection-based) messages in receive queue */
516         __skb_queue_purge(&sk->sk_receive_queue);
517
518         /* Reject any messages that accumulated in backlog queue */
519         sock->state = SS_DISCONNECTING;
520         release_sock(sk);
521         sock_put(sk);
522         sock->sk = NULL;
523
524         return 0;
525 }
526
527 /**
528  * tipc_bind - associate or disassocate TIPC name(s) with a socket
529  * @sock: socket structure
530  * @uaddr: socket address describing name(s) and desired operation
531  * @uaddr_len: size of socket address data structure
532  *
533  * Name and name sequence binding is indicated using a positive scope value;
534  * a negative scope value unbinds the specified name.  Specifying no name
535  * (i.e. a socket address length of 0) unbinds all names from the socket.
536  *
537  * Returns 0 on success, errno otherwise
538  *
539  * NOTE: This routine doesn't need to take the socket lock since it doesn't
540  *       access any non-constant socket information.
541  */
542 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
543                      int uaddr_len)
544 {
545         struct sock *sk = sock->sk;
546         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
547         struct tipc_sock *tsk = tipc_sk(sk);
548         int res = -EINVAL;
549
550         lock_sock(sk);
551         if (unlikely(!uaddr_len)) {
552                 res = tipc_sk_withdraw(tsk, 0, NULL);
553                 goto exit;
554         }
555
556         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
557                 res = -EINVAL;
558                 goto exit;
559         }
560         if (addr->family != AF_TIPC) {
561                 res = -EAFNOSUPPORT;
562                 goto exit;
563         }
564
565         if (addr->addrtype == TIPC_ADDR_NAME)
566                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
567         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
568                 res = -EAFNOSUPPORT;
569                 goto exit;
570         }
571
572         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
573             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
574             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
575                 res = -EACCES;
576                 goto exit;
577         }
578
579         res = (addr->scope > 0) ?
580                 tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
581                 tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
582 exit:
583         release_sock(sk);
584         return res;
585 }
586
587 /**
588  * tipc_getname - get port ID of socket or peer socket
589  * @sock: socket structure
590  * @uaddr: area for returned socket address
591  * @uaddr_len: area for returned length of socket address
592  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
593  *
594  * Returns 0 on success, errno otherwise
595  *
596  * NOTE: This routine doesn't need to take the socket lock since it only
597  *       accesses socket information that is unchanging (or which changes in
598  *       a completely predictable manner).
599  */
600 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
601                         int *uaddr_len, int peer)
602 {
603         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
604         struct tipc_sock *tsk = tipc_sk(sock->sk);
605
606         memset(addr, 0, sizeof(*addr));
607         if (peer) {
608                 if ((sock->state != SS_CONNECTED) &&
609                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
610                         return -ENOTCONN;
611                 addr->addr.id.ref = tsk_peer_port(tsk);
612                 addr->addr.id.node = tsk_peer_node(tsk);
613         } else {
614                 addr->addr.id.ref = tsk->ref;
615                 addr->addr.id.node = tipc_own_addr;
616         }
617
618         *uaddr_len = sizeof(*addr);
619         addr->addrtype = TIPC_ADDR_ID;
620         addr->family = AF_TIPC;
621         addr->scope = 0;
622         addr->addr.name.domain = 0;
623
624         return 0;
625 }
626
627 /**
628  * tipc_poll - read and possibly block on pollmask
629  * @file: file structure associated with the socket
630  * @sock: socket for which to calculate the poll bits
631  * @wait: ???
632  *
633  * Returns pollmask value
634  *
635  * COMMENTARY:
636  * It appears that the usual socket locking mechanisms are not useful here
637  * since the pollmask info is potentially out-of-date the moment this routine
638  * exits.  TCP and other protocols seem to rely on higher level poll routines
639  * to handle any preventable race conditions, so TIPC will do the same ...
640  *
641  * TIPC sets the returned events as follows:
642  *
643  * socket state         flags set
644  * ------------         ---------
645  * unconnected          no read flags
646  *                      POLLOUT if port is not congested
647  *
648  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
649  *                      no write flags
650  *
651  * connected            POLLIN/POLLRDNORM if data in rx queue
652  *                      POLLOUT if port is not congested
653  *
654  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
655  *                      no write flags
656  *
657  * listening            POLLIN if SYN in rx queue
658  *                      no write flags
659  *
660  * ready                POLLIN/POLLRDNORM if data in rx queue
661  * [connectionless]     POLLOUT (since port cannot be congested)
662  *
663  * IMPORTANT: The fact that a read or write operation is indicated does NOT
664  * imply that the operation will succeed, merely that it should be performed
665  * and will not block.
666  */
667 static unsigned int tipc_poll(struct file *file, struct socket *sock,
668                               poll_table *wait)
669 {
670         struct sock *sk = sock->sk;
671         struct tipc_sock *tsk = tipc_sk(sk);
672         u32 mask = 0;
673
674         sock_poll_wait(file, sk_sleep(sk), wait);
675
676         switch ((int)sock->state) {
677         case SS_UNCONNECTED:
678                 if (!tsk->link_cong)
679                         mask |= POLLOUT;
680                 break;
681         case SS_READY:
682         case SS_CONNECTED:
683                 if (!tsk->link_cong && !tsk_conn_cong(tsk))
684                         mask |= POLLOUT;
685                 /* fall thru' */
686         case SS_CONNECTING:
687         case SS_LISTENING:
688                 if (!skb_queue_empty(&sk->sk_receive_queue))
689                         mask |= (POLLIN | POLLRDNORM);
690                 break;
691         case SS_DISCONNECTING:
692                 mask = (POLLIN | POLLRDNORM | POLLHUP);
693                 break;
694         }
695
696         return mask;
697 }
698
699 /**
700  * tipc_sendmcast - send multicast message
701  * @sock: socket structure
702  * @seq: destination address
703  * @msg: message to send
704  * @dsz: total length of message data
705  * @timeo: timeout to wait for wakeup
706  *
707  * Called from function tipc_sendmsg(), which has done all sanity checks
708  * Returns the number of bytes sent on success, or errno
709  */
710 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
711                           struct msghdr *msg, size_t dsz, long timeo)
712 {
713         struct sock *sk = sock->sk;
714         struct tipc_msg *mhdr = &tipc_sk(sk)->phdr;
715         struct sk_buff_head head;
716         uint mtu;
717         int rc;
718
719         msg_set_type(mhdr, TIPC_MCAST_MSG);
720         msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
721         msg_set_destport(mhdr, 0);
722         msg_set_destnode(mhdr, 0);
723         msg_set_nametype(mhdr, seq->type);
724         msg_set_namelower(mhdr, seq->lower);
725         msg_set_nameupper(mhdr, seq->upper);
726         msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
727
728 new_mtu:
729         mtu = tipc_bclink_get_mtu();
730         __skb_queue_head_init(&head);
731         rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &head);
732         if (unlikely(rc < 0))
733                 return rc;
734
735         do {
736                 rc = tipc_bclink_xmit(&head);
737                 if (likely(rc >= 0)) {
738                         rc = dsz;
739                         break;
740                 }
741                 if (rc == -EMSGSIZE)
742                         goto new_mtu;
743                 if (rc != -ELINKCONG)
744                         break;
745                 tipc_sk(sk)->link_cong = 1;
746                 rc = tipc_wait_for_sndmsg(sock, &timeo);
747                 if (rc)
748                         __skb_queue_purge(&head);
749         } while (!rc);
750         return rc;
751 }
752
753 /* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
754  */
755 void tipc_sk_mcast_rcv(struct sk_buff *buf)
756 {
757         struct tipc_msg *msg = buf_msg(buf);
758         struct tipc_port_list dports = {0, NULL, };
759         struct tipc_port_list *item;
760         struct sk_buff *b;
761         uint i, last, dst = 0;
762         u32 scope = TIPC_CLUSTER_SCOPE;
763
764         if (in_own_node(msg_orignode(msg)))
765                 scope = TIPC_NODE_SCOPE;
766
767         /* Create destination port list: */
768         tipc_nametbl_mc_translate(msg_nametype(msg),
769                                   msg_namelower(msg),
770                                   msg_nameupper(msg),
771                                   scope,
772                                   &dports);
773         last = dports.count;
774         if (!last) {
775                 kfree_skb(buf);
776                 return;
777         }
778
779         for (item = &dports; item; item = item->next) {
780                 for (i = 0; i < PLSIZE && ++dst <= last; i++) {
781                         b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
782                         if (!b) {
783                                 pr_warn("Failed do clone mcast rcv buffer\n");
784                                 continue;
785                         }
786                         msg_set_destport(msg, item->ports[i]);
787                         tipc_sk_rcv(b);
788                 }
789         }
790         tipc_port_list_free(&dports);
791 }
792
793 /**
794  * tipc_sk_proto_rcv - receive a connection mng protocol message
795  * @tsk: receiving socket
796  * @dnode: node to send response message to, if any
797  * @buf: buffer containing protocol message
798  * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if
799  * (CONN_PROBE_REPLY) message should be forwarded.
800  */
801 static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
802                              struct sk_buff *buf)
803 {
804         struct tipc_msg *msg = buf_msg(buf);
805         int conn_cong;
806
807         /* Ignore if connection cannot be validated: */
808         if (!tsk_peer_msg(tsk, msg))
809                 goto exit;
810
811         tsk->probing_state = TIPC_CONN_OK;
812
813         if (msg_type(msg) == CONN_ACK) {
814                 conn_cong = tsk_conn_cong(tsk);
815                 tsk->sent_unacked -= msg_msgcnt(msg);
816                 if (conn_cong)
817                         tsk->sk.sk_write_space(&tsk->sk);
818         } else if (msg_type(msg) == CONN_PROBE) {
819                 if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
820                         return TIPC_OK;
821                 msg_set_type(msg, CONN_PROBE_REPLY);
822                 return TIPC_FWD_MSG;
823         }
824         /* Do nothing if msg_type() == CONN_PROBE_REPLY */
825 exit:
826         kfree_skb(buf);
827         return TIPC_OK;
828 }
829
830 /**
831  * dest_name_check - verify user is permitted to send to specified port name
832  * @dest: destination address
833  * @m: descriptor for message to be sent
834  *
835  * Prevents restricted configuration commands from being issued by
836  * unauthorized users.
837  *
838  * Returns 0 if permission is granted, otherwise errno
839  */
840 static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
841 {
842         struct tipc_cfg_msg_hdr hdr;
843
844         if (unlikely(dest->addrtype == TIPC_ADDR_ID))
845                 return 0;
846         if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
847                 return 0;
848         if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
849                 return 0;
850         if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
851                 return -EACCES;
852
853         if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
854                 return -EMSGSIZE;
855         if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
856                 return -EFAULT;
857         if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
858                 return -EACCES;
859
860         return 0;
861 }
862
863 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
864 {
865         struct sock *sk = sock->sk;
866         struct tipc_sock *tsk = tipc_sk(sk);
867         DEFINE_WAIT(wait);
868         int done;
869
870         do {
871                 int err = sock_error(sk);
872                 if (err)
873                         return err;
874                 if (sock->state == SS_DISCONNECTING)
875                         return -EPIPE;
876                 if (!*timeo_p)
877                         return -EAGAIN;
878                 if (signal_pending(current))
879                         return sock_intr_errno(*timeo_p);
880
881                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
882                 done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
883                 finish_wait(sk_sleep(sk), &wait);
884         } while (!done);
885         return 0;
886 }
887
888 /**
889  * tipc_sendmsg - send message in connectionless manner
890  * @iocb: if NULL, indicates that socket lock is already held
891  * @sock: socket structure
892  * @m: message to send
893  * @dsz: amount of user data to be sent
894  *
895  * Message must have an destination specified explicitly.
896  * Used for SOCK_RDM and SOCK_DGRAM messages,
897  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
898  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
899  *
900  * Returns the number of bytes sent on success, or errno otherwise
901  */
902 static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
903                         struct msghdr *m, size_t dsz)
904 {
905         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
906         struct sock *sk = sock->sk;
907         struct tipc_sock *tsk = tipc_sk(sk);
908         struct tipc_msg *mhdr = &tsk->phdr;
909         u32 dnode, dport;
910         struct sk_buff_head head;
911         struct sk_buff *skb;
912         struct tipc_name_seq *seq = &dest->addr.nameseq;
913         u32 mtu;
914         long timeo;
915         int rc = -EINVAL;
916
917         if (unlikely(!dest))
918                 return -EDESTADDRREQ;
919
920         if (unlikely((m->msg_namelen < sizeof(*dest)) ||
921                      (dest->family != AF_TIPC)))
922                 return -EINVAL;
923
924         if (dsz > TIPC_MAX_USER_MSG_SIZE)
925                 return -EMSGSIZE;
926
927         if (iocb)
928                 lock_sock(sk);
929
930         if (unlikely(sock->state != SS_READY)) {
931                 if (sock->state == SS_LISTENING) {
932                         rc = -EPIPE;
933                         goto exit;
934                 }
935                 if (sock->state != SS_UNCONNECTED) {
936                         rc = -EISCONN;
937                         goto exit;
938                 }
939                 if (tsk->published) {
940                         rc = -EOPNOTSUPP;
941                         goto exit;
942                 }
943                 if (dest->addrtype == TIPC_ADDR_NAME) {
944                         tsk->conn_type = dest->addr.name.name.type;
945                         tsk->conn_instance = dest->addr.name.name.instance;
946                 }
947         }
948         rc = dest_name_check(dest, m);
949         if (rc)
950                 goto exit;
951
952         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
953
954         if (dest->addrtype == TIPC_ADDR_MCAST) {
955                 rc = tipc_sendmcast(sock, seq, m, dsz, timeo);
956                 goto exit;
957         } else if (dest->addrtype == TIPC_ADDR_NAME) {
958                 u32 type = dest->addr.name.name.type;
959                 u32 inst = dest->addr.name.name.instance;
960                 u32 domain = dest->addr.name.domain;
961
962                 dnode = domain;
963                 msg_set_type(mhdr, TIPC_NAMED_MSG);
964                 msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
965                 msg_set_nametype(mhdr, type);
966                 msg_set_nameinst(mhdr, inst);
967                 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
968                 dport = tipc_nametbl_translate(type, inst, &dnode);
969                 msg_set_destnode(mhdr, dnode);
970                 msg_set_destport(mhdr, dport);
971                 if (unlikely(!dport && !dnode)) {
972                         rc = -EHOSTUNREACH;
973                         goto exit;
974                 }
975         } else if (dest->addrtype == TIPC_ADDR_ID) {
976                 dnode = dest->addr.id.node;
977                 msg_set_type(mhdr, TIPC_DIRECT_MSG);
978                 msg_set_lookup_scope(mhdr, 0);
979                 msg_set_destnode(mhdr, dnode);
980                 msg_set_destport(mhdr, dest->addr.id.ref);
981                 msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
982         }
983
984 new_mtu:
985         mtu = tipc_node_get_mtu(dnode, tsk->ref);
986         __skb_queue_head_init(&head);
987         rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head);
988         if (rc < 0)
989                 goto exit;
990
991         do {
992                 skb = skb_peek(&head);
993                 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
994                 rc = tipc_link_xmit(&head, dnode, tsk->ref);
995                 if (likely(rc >= 0)) {
996                         if (sock->state != SS_READY)
997                                 sock->state = SS_CONNECTING;
998                         rc = dsz;
999                         break;
1000                 }
1001                 if (rc == -EMSGSIZE)
1002                         goto new_mtu;
1003                 if (rc != -ELINKCONG)
1004                         break;
1005                 tsk->link_cong = 1;
1006                 rc = tipc_wait_for_sndmsg(sock, &timeo);
1007                 if (rc)
1008                         __skb_queue_purge(&head);
1009         } while (!rc);
1010 exit:
1011         if (iocb)
1012                 release_sock(sk);
1013
1014         return rc;
1015 }
1016
1017 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
1018 {
1019         struct sock *sk = sock->sk;
1020         struct tipc_sock *tsk = tipc_sk(sk);
1021         DEFINE_WAIT(wait);
1022         int done;
1023
1024         do {
1025                 int err = sock_error(sk);
1026                 if (err)
1027                         return err;
1028                 if (sock->state == SS_DISCONNECTING)
1029                         return -EPIPE;
1030                 else if (sock->state != SS_CONNECTED)
1031                         return -ENOTCONN;
1032                 if (!*timeo_p)
1033                         return -EAGAIN;
1034                 if (signal_pending(current))
1035                         return sock_intr_errno(*timeo_p);
1036
1037                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1038                 done = sk_wait_event(sk, timeo_p,
1039                                      (!tsk->link_cong &&
1040                                       !tsk_conn_cong(tsk)) ||
1041                                      !tsk->connected);
1042                 finish_wait(sk_sleep(sk), &wait);
1043         } while (!done);
1044         return 0;
1045 }
1046
1047 /**
1048  * tipc_send_stream - send stream-oriented data
1049  * @iocb: (unused)
1050  * @sock: socket structure
1051  * @m: data to send
1052  * @dsz: total length of data to be transmitted
1053  *
1054  * Used for SOCK_STREAM data.
1055  *
1056  * Returns the number of bytes sent on success (or partial success),
1057  * or errno if no data sent
1058  */
1059 static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
1060                             struct msghdr *m, size_t dsz)
1061 {
1062         struct sock *sk = sock->sk;
1063         struct tipc_sock *tsk = tipc_sk(sk);
1064         struct tipc_msg *mhdr = &tsk->phdr;
1065         struct sk_buff_head head;
1066         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1067         u32 ref = tsk->ref;
1068         int rc = -EINVAL;
1069         long timeo;
1070         u32 dnode;
1071         uint mtu, send, sent = 0;
1072
1073         /* Handle implied connection establishment */
1074         if (unlikely(dest)) {
1075                 rc = tipc_sendmsg(iocb, sock, m, dsz);
1076                 if (dsz && (dsz == rc))
1077                         tsk->sent_unacked = 1;
1078                 return rc;
1079         }
1080         if (dsz > (uint)INT_MAX)
1081                 return -EMSGSIZE;
1082
1083         if (iocb)
1084                 lock_sock(sk);
1085
1086         if (unlikely(sock->state != SS_CONNECTED)) {
1087                 if (sock->state == SS_DISCONNECTING)
1088                         rc = -EPIPE;
1089                 else
1090                         rc = -ENOTCONN;
1091                 goto exit;
1092         }
1093
1094         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1095         dnode = tsk_peer_node(tsk);
1096
1097 next:
1098         mtu = tsk->max_pkt;
1099         send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1100         __skb_queue_head_init(&head);
1101         rc = tipc_msg_build(mhdr, m, sent, send, mtu, &head);
1102         if (unlikely(rc < 0))
1103                 goto exit;
1104         do {
1105                 if (likely(!tsk_conn_cong(tsk))) {
1106                         rc = tipc_link_xmit(&head, dnode, ref);
1107                         if (likely(!rc)) {
1108                                 tsk->sent_unacked++;
1109                                 sent += send;
1110                                 if (sent == dsz)
1111                                         break;
1112                                 goto next;
1113                         }
1114                         if (rc == -EMSGSIZE) {
1115                                 tsk->max_pkt = tipc_node_get_mtu(dnode, ref);
1116                                 goto next;
1117                         }
1118                         if (rc != -ELINKCONG)
1119                                 break;
1120                         tsk->link_cong = 1;
1121                 }
1122                 rc = tipc_wait_for_sndpkt(sock, &timeo);
1123                 if (rc)
1124                         __skb_queue_purge(&head);
1125         } while (!rc);
1126 exit:
1127         if (iocb)
1128                 release_sock(sk);
1129         return sent ? sent : rc;
1130 }
1131
1132 /**
1133  * tipc_send_packet - send a connection-oriented message
1134  * @iocb: if NULL, indicates that socket lock is already held
1135  * @sock: socket structure
1136  * @m: message to send
1137  * @dsz: length of data to be transmitted
1138  *
1139  * Used for SOCK_SEQPACKET messages.
1140  *
1141  * Returns the number of bytes sent on success, or errno otherwise
1142  */
1143 static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
1144                             struct msghdr *m, size_t dsz)
1145 {
1146         if (dsz > TIPC_MAX_USER_MSG_SIZE)
1147                 return -EMSGSIZE;
1148
1149         return tipc_send_stream(iocb, sock, m, dsz);
1150 }
1151
1152 /* tipc_sk_finish_conn - complete the setup of a connection
1153  */
1154 static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1155                                 u32 peer_node)
1156 {
1157         struct tipc_msg *msg = &tsk->phdr;
1158
1159         msg_set_destnode(msg, peer_node);
1160         msg_set_destport(msg, peer_port);
1161         msg_set_type(msg, TIPC_CONN_MSG);
1162         msg_set_lookup_scope(msg, 0);
1163         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1164
1165         tsk->probing_interval = CONN_PROBING_INTERVAL;
1166         tsk->probing_state = TIPC_CONN_OK;
1167         tsk->connected = 1;
1168         k_start_timer(&tsk->timer, tsk->probing_interval);
1169         tipc_node_add_conn(peer_node, tsk->ref, peer_port);
1170         tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->ref);
1171 }
1172
1173 /**
1174  * set_orig_addr - capture sender's address for received message
1175  * @m: descriptor for message info
1176  * @msg: received message header
1177  *
1178  * Note: Address is not captured if not requested by receiver.
1179  */
1180 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
1181 {
1182         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
1183
1184         if (addr) {
1185                 addr->family = AF_TIPC;
1186                 addr->addrtype = TIPC_ADDR_ID;
1187                 memset(&addr->addr, 0, sizeof(addr->addr));
1188                 addr->addr.id.ref = msg_origport(msg);
1189                 addr->addr.id.node = msg_orignode(msg);
1190                 addr->addr.name.domain = 0;     /* could leave uninitialized */
1191                 addr->scope = 0;                /* could leave uninitialized */
1192                 m->msg_namelen = sizeof(struct sockaddr_tipc);
1193         }
1194 }
1195
1196 /**
1197  * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1198  * @m: descriptor for message info
1199  * @msg: received message header
1200  * @tsk: TIPC port associated with message
1201  *
1202  * Note: Ancillary data is not captured if not requested by receiver.
1203  *
1204  * Returns 0 if successful, otherwise errno
1205  */
1206 static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1207                                  struct tipc_sock *tsk)
1208 {
1209         u32 anc_data[3];
1210         u32 err;
1211         u32 dest_type;
1212         int has_name;
1213         int res;
1214
1215         if (likely(m->msg_controllen == 0))
1216                 return 0;
1217
1218         /* Optionally capture errored message object(s) */
1219         err = msg ? msg_errcode(msg) : 0;
1220         if (unlikely(err)) {
1221                 anc_data[0] = err;
1222                 anc_data[1] = msg_data_sz(msg);
1223                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1224                 if (res)
1225                         return res;
1226                 if (anc_data[1]) {
1227                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1228                                        msg_data(msg));
1229                         if (res)
1230                                 return res;
1231                 }
1232         }
1233
1234         /* Optionally capture message destination object */
1235         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1236         switch (dest_type) {
1237         case TIPC_NAMED_MSG:
1238                 has_name = 1;
1239                 anc_data[0] = msg_nametype(msg);
1240                 anc_data[1] = msg_namelower(msg);
1241                 anc_data[2] = msg_namelower(msg);
1242                 break;
1243         case TIPC_MCAST_MSG:
1244                 has_name = 1;
1245                 anc_data[0] = msg_nametype(msg);
1246                 anc_data[1] = msg_namelower(msg);
1247                 anc_data[2] = msg_nameupper(msg);
1248                 break;
1249         case TIPC_CONN_MSG:
1250                 has_name = (tsk->conn_type != 0);
1251                 anc_data[0] = tsk->conn_type;
1252                 anc_data[1] = tsk->conn_instance;
1253                 anc_data[2] = tsk->conn_instance;
1254                 break;
1255         default:
1256                 has_name = 0;
1257         }
1258         if (has_name) {
1259                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1260                 if (res)
1261                         return res;
1262         }
1263
1264         return 0;
1265 }
1266
1267 static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1268 {
1269         struct sk_buff *skb = NULL;
1270         struct tipc_msg *msg;
1271         u32 peer_port = tsk_peer_port(tsk);
1272         u32 dnode = tsk_peer_node(tsk);
1273
1274         if (!tsk->connected)
1275                 return;
1276         skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode,
1277                               tipc_own_addr, peer_port, tsk->ref, TIPC_OK);
1278         if (!skb)
1279                 return;
1280         msg = buf_msg(skb);
1281         msg_set_msgcnt(msg, ack);
1282         tipc_link_xmit_skb(skb, dnode, msg_link_selector(msg));
1283 }
1284
1285 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1286 {
1287         struct sock *sk = sock->sk;
1288         DEFINE_WAIT(wait);
1289         long timeo = *timeop;
1290         int err;
1291
1292         for (;;) {
1293                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1294                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1295                         if (sock->state == SS_DISCONNECTING) {
1296                                 err = -ENOTCONN;
1297                                 break;
1298                         }
1299                         release_sock(sk);
1300                         timeo = schedule_timeout(timeo);
1301                         lock_sock(sk);
1302                 }
1303                 err = 0;
1304                 if (!skb_queue_empty(&sk->sk_receive_queue))
1305                         break;
1306                 err = sock_intr_errno(timeo);
1307                 if (signal_pending(current))
1308                         break;
1309                 err = -EAGAIN;
1310                 if (!timeo)
1311                         break;
1312         }
1313         finish_wait(sk_sleep(sk), &wait);
1314         *timeop = timeo;
1315         return err;
1316 }
1317
1318 /**
1319  * tipc_recvmsg - receive packet-oriented message
1320  * @iocb: (unused)
1321  * @m: descriptor for message info
1322  * @buf_len: total size of user buffer area
1323  * @flags: receive flags
1324  *
1325  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1326  * If the complete message doesn't fit in user area, truncate it.
1327  *
1328  * Returns size of returned message data, errno otherwise
1329  */
1330 static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
1331                         struct msghdr *m, size_t buf_len, int flags)
1332 {
1333         struct sock *sk = sock->sk;
1334         struct tipc_sock *tsk = tipc_sk(sk);
1335         struct sk_buff *buf;
1336         struct tipc_msg *msg;
1337         long timeo;
1338         unsigned int sz;
1339         u32 err;
1340         int res;
1341
1342         /* Catch invalid receive requests */
1343         if (unlikely(!buf_len))
1344                 return -EINVAL;
1345
1346         lock_sock(sk);
1347
1348         if (unlikely(sock->state == SS_UNCONNECTED)) {
1349                 res = -ENOTCONN;
1350                 goto exit;
1351         }
1352
1353         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1354 restart:
1355
1356         /* Look for a message in receive queue; wait if necessary */
1357         res = tipc_wait_for_rcvmsg(sock, &timeo);
1358         if (res)
1359                 goto exit;
1360
1361         /* Look at first message in receive queue */
1362         buf = skb_peek(&sk->sk_receive_queue);
1363         msg = buf_msg(buf);
1364         sz = msg_data_sz(msg);
1365         err = msg_errcode(msg);
1366
1367         /* Discard an empty non-errored message & try again */
1368         if ((!sz) && (!err)) {
1369                 tsk_advance_rx_queue(sk);
1370                 goto restart;
1371         }
1372
1373         /* Capture sender's address (optional) */
1374         set_orig_addr(m, msg);
1375
1376         /* Capture ancillary data (optional) */
1377         res = tipc_sk_anc_data_recv(m, msg, tsk);
1378         if (res)
1379                 goto exit;
1380
1381         /* Capture message data (if valid) & compute return value (always) */
1382         if (!err) {
1383                 if (unlikely(buf_len < sz)) {
1384                         sz = buf_len;
1385                         m->msg_flags |= MSG_TRUNC;
1386                 }
1387                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
1388                 if (res)
1389                         goto exit;
1390                 res = sz;
1391         } else {
1392                 if ((sock->state == SS_READY) ||
1393                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1394                         res = 0;
1395                 else
1396                         res = -ECONNRESET;
1397         }
1398
1399         /* Consume received message (optional) */
1400         if (likely(!(flags & MSG_PEEK))) {
1401                 if ((sock->state != SS_READY) &&
1402                     (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1403                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1404                         tsk->rcv_unacked = 0;
1405                 }
1406                 tsk_advance_rx_queue(sk);
1407         }
1408 exit:
1409         release_sock(sk);
1410         return res;
1411 }
1412
1413 /**
1414  * tipc_recv_stream - receive stream-oriented data
1415  * @iocb: (unused)
1416  * @m: descriptor for message info
1417  * @buf_len: total size of user buffer area
1418  * @flags: receive flags
1419  *
1420  * Used for SOCK_STREAM messages only.  If not enough data is available
1421  * will optionally wait for more; never truncates data.
1422  *
1423  * Returns size of returned message data, errno otherwise
1424  */
1425 static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
1426                             struct msghdr *m, size_t buf_len, int flags)
1427 {
1428         struct sock *sk = sock->sk;
1429         struct tipc_sock *tsk = tipc_sk(sk);
1430         struct sk_buff *buf;
1431         struct tipc_msg *msg;
1432         long timeo;
1433         unsigned int sz;
1434         int sz_to_copy, target, needed;
1435         int sz_copied = 0;
1436         u32 err;
1437         int res = 0;
1438
1439         /* Catch invalid receive attempts */
1440         if (unlikely(!buf_len))
1441                 return -EINVAL;
1442
1443         lock_sock(sk);
1444
1445         if (unlikely(sock->state == SS_UNCONNECTED)) {
1446                 res = -ENOTCONN;
1447                 goto exit;
1448         }
1449
1450         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1451         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1452
1453 restart:
1454         /* Look for a message in receive queue; wait if necessary */
1455         res = tipc_wait_for_rcvmsg(sock, &timeo);
1456         if (res)
1457                 goto exit;
1458
1459         /* Look at first message in receive queue */
1460         buf = skb_peek(&sk->sk_receive_queue);
1461         msg = buf_msg(buf);
1462         sz = msg_data_sz(msg);
1463         err = msg_errcode(msg);
1464
1465         /* Discard an empty non-errored message & try again */
1466         if ((!sz) && (!err)) {
1467                 tsk_advance_rx_queue(sk);
1468                 goto restart;
1469         }
1470
1471         /* Optionally capture sender's address & ancillary data of first msg */
1472         if (sz_copied == 0) {
1473                 set_orig_addr(m, msg);
1474                 res = tipc_sk_anc_data_recv(m, msg, tsk);
1475                 if (res)
1476                         goto exit;
1477         }
1478
1479         /* Capture message data (if valid) & compute return value (always) */
1480         if (!err) {
1481                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1482
1483                 sz -= offset;
1484                 needed = (buf_len - sz_copied);
1485                 sz_to_copy = (sz <= needed) ? sz : needed;
1486
1487                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
1488                                             m, sz_to_copy);
1489                 if (res)
1490                         goto exit;
1491
1492                 sz_copied += sz_to_copy;
1493
1494                 if (sz_to_copy < sz) {
1495                         if (!(flags & MSG_PEEK))
1496                                 TIPC_SKB_CB(buf)->handle =
1497                                 (void *)(unsigned long)(offset + sz_to_copy);
1498                         goto exit;
1499                 }
1500         } else {
1501                 if (sz_copied != 0)
1502                         goto exit; /* can't add error msg to valid data */
1503
1504                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1505                         res = 0;
1506                 else
1507                         res = -ECONNRESET;
1508         }
1509
1510         /* Consume received message (optional) */
1511         if (likely(!(flags & MSG_PEEK))) {
1512                 if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1513                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1514                         tsk->rcv_unacked = 0;
1515                 }
1516                 tsk_advance_rx_queue(sk);
1517         }
1518
1519         /* Loop around if more data is required */
1520         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1521             (!skb_queue_empty(&sk->sk_receive_queue) ||
1522             (sz_copied < target)) &&    /* and more is ready or required */
1523             (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1524             (!err))                     /* and haven't reached a FIN */
1525                 goto restart;
1526
1527 exit:
1528         release_sock(sk);
1529         return sz_copied ? sz_copied : res;
1530 }
1531
1532 /**
1533  * tipc_write_space - wake up thread if port congestion is released
1534  * @sk: socket
1535  */
1536 static void tipc_write_space(struct sock *sk)
1537 {
1538         struct socket_wq *wq;
1539
1540         rcu_read_lock();
1541         wq = rcu_dereference(sk->sk_wq);
1542         if (wq_has_sleeper(wq))
1543                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1544                                                 POLLWRNORM | POLLWRBAND);
1545         rcu_read_unlock();
1546 }
1547
1548 /**
1549  * tipc_data_ready - wake up threads to indicate messages have been received
1550  * @sk: socket
1551  * @len: the length of messages
1552  */
1553 static void tipc_data_ready(struct sock *sk)
1554 {
1555         struct socket_wq *wq;
1556
1557         rcu_read_lock();
1558         wq = rcu_dereference(sk->sk_wq);
1559         if (wq_has_sleeper(wq))
1560                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1561                                                 POLLRDNORM | POLLRDBAND);
1562         rcu_read_unlock();
1563 }
1564
1565 /**
1566  * filter_connect - Handle all incoming messages for a connection-based socket
1567  * @tsk: TIPC socket
1568  * @msg: message
1569  *
1570  * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
1571  */
1572 static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1573 {
1574         struct sock *sk = &tsk->sk;
1575         struct socket *sock = sk->sk_socket;
1576         struct tipc_msg *msg = buf_msg(*buf);
1577         int retval = -TIPC_ERR_NO_PORT;
1578
1579         if (msg_mcast(msg))
1580                 return retval;
1581
1582         switch ((int)sock->state) {
1583         case SS_CONNECTED:
1584                 /* Accept only connection-based messages sent by peer */
1585                 if (tsk_peer_msg(tsk, msg)) {
1586                         if (unlikely(msg_errcode(msg))) {
1587                                 sock->state = SS_DISCONNECTING;
1588                                 tsk->connected = 0;
1589                                 /* let timer expire on it's own */
1590                                 tipc_node_remove_conn(tsk_peer_node(tsk),
1591                                                       tsk->ref);
1592                         }
1593                         retval = TIPC_OK;
1594                 }
1595                 break;
1596         case SS_CONNECTING:
1597                 /* Accept only ACK or NACK message */
1598
1599                 if (unlikely(!msg_connected(msg)))
1600                         break;
1601
1602                 if (unlikely(msg_errcode(msg))) {
1603                         sock->state = SS_DISCONNECTING;
1604                         sk->sk_err = ECONNREFUSED;
1605                         retval = TIPC_OK;
1606                         break;
1607                 }
1608
1609                 if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) {
1610                         sock->state = SS_DISCONNECTING;
1611                         sk->sk_err = EINVAL;
1612                         retval = TIPC_OK;
1613                         break;
1614                 }
1615
1616                 tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg));
1617                 msg_set_importance(&tsk->phdr, msg_importance(msg));
1618                 sock->state = SS_CONNECTED;
1619
1620                 /* If an incoming message is an 'ACK-', it should be
1621                  * discarded here because it doesn't contain useful
1622                  * data. In addition, we should try to wake up
1623                  * connect() routine if sleeping.
1624                  */
1625                 if (msg_data_sz(msg) == 0) {
1626                         kfree_skb(*buf);
1627                         *buf = NULL;
1628                         if (waitqueue_active(sk_sleep(sk)))
1629                                 wake_up_interruptible(sk_sleep(sk));
1630                 }
1631                 retval = TIPC_OK;
1632                 break;
1633         case SS_LISTENING:
1634         case SS_UNCONNECTED:
1635                 /* Accept only SYN message */
1636                 if (!msg_connected(msg) && !(msg_errcode(msg)))
1637                         retval = TIPC_OK;
1638                 break;
1639         case SS_DISCONNECTING:
1640                 break;
1641         default:
1642                 pr_err("Unknown socket state %u\n", sock->state);
1643         }
1644         return retval;
1645 }
1646
1647 /**
1648  * rcvbuf_limit - get proper overload limit of socket receive queue
1649  * @sk: socket
1650  * @buf: message
1651  *
1652  * For all connection oriented messages, irrespective of importance,
1653  * the default overload value (i.e. 67MB) is set as limit.
1654  *
1655  * For all connectionless messages, by default new queue limits are
1656  * as belows:
1657  *
1658  * TIPC_LOW_IMPORTANCE       (4 MB)
1659  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1660  * TIPC_HIGH_IMPORTANCE      (16 MB)
1661  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1662  *
1663  * Returns overload limit according to corresponding message importance
1664  */
1665 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1666 {
1667         struct tipc_msg *msg = buf_msg(buf);
1668
1669         if (msg_connected(msg))
1670                 return sysctl_tipc_rmem[2];
1671
1672         return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1673                 msg_importance(msg);
1674 }
1675
1676 /**
1677  * filter_rcv - validate incoming message
1678  * @sk: socket
1679  * @buf: message
1680  *
1681  * Enqueues message on receive queue if acceptable; optionally handles
1682  * disconnect indication for a connected socket.
1683  *
1684  * Called with socket lock already taken; port lock may also be taken.
1685  *
1686  * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message
1687  * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded
1688  */
1689 static int filter_rcv(struct sock *sk, struct sk_buff *buf)
1690 {
1691         struct socket *sock = sk->sk_socket;
1692         struct tipc_sock *tsk = tipc_sk(sk);
1693         struct tipc_msg *msg = buf_msg(buf);
1694         unsigned int limit = rcvbuf_limit(sk, buf);
1695         u32 onode;
1696         int rc = TIPC_OK;
1697
1698         if (unlikely(msg_user(msg) == CONN_MANAGER))
1699                 return tipc_sk_proto_rcv(tsk, &onode, buf);
1700
1701         if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
1702                 kfree_skb(buf);
1703                 tsk->link_cong = 0;
1704                 sk->sk_write_space(sk);
1705                 return TIPC_OK;
1706         }
1707
1708         /* Reject message if it is wrong sort of message for socket */
1709         if (msg_type(msg) > TIPC_DIRECT_MSG)
1710                 return -TIPC_ERR_NO_PORT;
1711
1712         if (sock->state == SS_READY) {
1713                 if (msg_connected(msg))
1714                         return -TIPC_ERR_NO_PORT;
1715         } else {
1716                 rc = filter_connect(tsk, &buf);
1717                 if (rc != TIPC_OK || buf == NULL)
1718                         return rc;
1719         }
1720
1721         /* Reject message if there isn't room to queue it */
1722         if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1723                 return -TIPC_ERR_OVERLOAD;
1724
1725         /* Enqueue message */
1726         TIPC_SKB_CB(buf)->handle = NULL;
1727         __skb_queue_tail(&sk->sk_receive_queue, buf);
1728         skb_set_owner_r(buf, sk);
1729
1730         sk->sk_data_ready(sk);
1731         return TIPC_OK;
1732 }
1733
1734 /**
1735  * tipc_backlog_rcv - handle incoming message from backlog queue
1736  * @sk: socket
1737  * @skb: message
1738  *
1739  * Caller must hold socket lock, but not port lock.
1740  *
1741  * Returns 0
1742  */
1743 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1744 {
1745         int rc;
1746         u32 onode;
1747         struct tipc_sock *tsk = tipc_sk(sk);
1748         uint truesize = skb->truesize;
1749
1750         rc = filter_rcv(sk, skb);
1751
1752         if (likely(!rc)) {
1753                 if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1754                         atomic_add(truesize, &tsk->dupl_rcvcnt);
1755                 return 0;
1756         }
1757
1758         if ((rc < 0) && !tipc_msg_reverse(skb, &onode, -rc))
1759                 return 0;
1760
1761         tipc_link_xmit_skb(skb, onode, 0);
1762
1763         return 0;
1764 }
1765
1766 /**
1767  * tipc_sk_rcv - handle incoming message
1768  * @skb: buffer containing arriving message
1769  * Consumes buffer
1770  * Returns 0 if success, or errno: -EHOSTUNREACH
1771  */
1772 int tipc_sk_rcv(struct sk_buff *skb)
1773 {
1774         struct tipc_sock *tsk;
1775         struct sock *sk;
1776         u32 dport = msg_destport(buf_msg(skb));
1777         int rc = TIPC_OK;
1778         uint limit;
1779         u32 dnode;
1780
1781         /* Validate destination and message */
1782         tsk = tipc_sk_get(dport);
1783         if (unlikely(!tsk)) {
1784                 rc = tipc_msg_eval(skb, &dnode);
1785                 goto exit;
1786         }
1787         sk = &tsk->sk;
1788
1789         /* Queue message */
1790         spin_lock_bh(&sk->sk_lock.slock);
1791
1792         if (!sock_owned_by_user(sk)) {
1793                 rc = filter_rcv(sk, skb);
1794         } else {
1795                 if (sk->sk_backlog.len == 0)
1796                         atomic_set(&tsk->dupl_rcvcnt, 0);
1797                 limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt);
1798                 if (sk_add_backlog(sk, skb, limit))
1799                         rc = -TIPC_ERR_OVERLOAD;
1800         }
1801         spin_unlock_bh(&sk->sk_lock.slock);
1802         tipc_sk_put(tsk);
1803         if (likely(!rc))
1804                 return 0;
1805 exit:
1806         if ((rc < 0) && !tipc_msg_reverse(skb, &dnode, -rc))
1807                 return -EHOSTUNREACH;
1808
1809         tipc_link_xmit_skb(skb, dnode, 0);
1810         return (rc < 0) ? -EHOSTUNREACH : 0;
1811 }
1812
1813 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1814 {
1815         struct sock *sk = sock->sk;
1816         DEFINE_WAIT(wait);
1817         int done;
1818
1819         do {
1820                 int err = sock_error(sk);
1821                 if (err)
1822                         return err;
1823                 if (!*timeo_p)
1824                         return -ETIMEDOUT;
1825                 if (signal_pending(current))
1826                         return sock_intr_errno(*timeo_p);
1827
1828                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1829                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1830                 finish_wait(sk_sleep(sk), &wait);
1831         } while (!done);
1832         return 0;
1833 }
1834
1835 /**
1836  * tipc_connect - establish a connection to another TIPC port
1837  * @sock: socket structure
1838  * @dest: socket address for destination port
1839  * @destlen: size of socket address data structure
1840  * @flags: file-related flags associated with socket
1841  *
1842  * Returns 0 on success, errno otherwise
1843  */
1844 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1845                         int destlen, int flags)
1846 {
1847         struct sock *sk = sock->sk;
1848         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1849         struct msghdr m = {NULL,};
1850         long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1851         socket_state previous;
1852         int res;
1853
1854         lock_sock(sk);
1855
1856         /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1857         if (sock->state == SS_READY) {
1858                 res = -EOPNOTSUPP;
1859                 goto exit;
1860         }
1861
1862         /*
1863          * Reject connection attempt using multicast address
1864          *
1865          * Note: send_msg() validates the rest of the address fields,
1866          *       so there's no need to do it here
1867          */
1868         if (dst->addrtype == TIPC_ADDR_MCAST) {
1869                 res = -EINVAL;
1870                 goto exit;
1871         }
1872
1873         previous = sock->state;
1874         switch (sock->state) {
1875         case SS_UNCONNECTED:
1876                 /* Send a 'SYN-' to destination */
1877                 m.msg_name = dest;
1878                 m.msg_namelen = destlen;
1879
1880                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1881                  * indicate send_msg() is never blocked.
1882                  */
1883                 if (!timeout)
1884                         m.msg_flags = MSG_DONTWAIT;
1885
1886                 res = tipc_sendmsg(NULL, sock, &m, 0);
1887                 if ((res < 0) && (res != -EWOULDBLOCK))
1888                         goto exit;
1889
1890                 /* Just entered SS_CONNECTING state; the only
1891                  * difference is that return value in non-blocking
1892                  * case is EINPROGRESS, rather than EALREADY.
1893                  */
1894                 res = -EINPROGRESS;
1895         case SS_CONNECTING:
1896                 if (previous == SS_CONNECTING)
1897                         res = -EALREADY;
1898                 if (!timeout)
1899                         goto exit;
1900                 timeout = msecs_to_jiffies(timeout);
1901                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1902                 res = tipc_wait_for_connect(sock, &timeout);
1903                 break;
1904         case SS_CONNECTED:
1905                 res = -EISCONN;
1906                 break;
1907         default:
1908                 res = -EINVAL;
1909                 break;
1910         }
1911 exit:
1912         release_sock(sk);
1913         return res;
1914 }
1915
1916 /**
1917  * tipc_listen - allow socket to listen for incoming connections
1918  * @sock: socket structure
1919  * @len: (unused)
1920  *
1921  * Returns 0 on success, errno otherwise
1922  */
1923 static int tipc_listen(struct socket *sock, int len)
1924 {
1925         struct sock *sk = sock->sk;
1926         int res;
1927
1928         lock_sock(sk);
1929
1930         if (sock->state != SS_UNCONNECTED)
1931                 res = -EINVAL;
1932         else {
1933                 sock->state = SS_LISTENING;
1934                 res = 0;
1935         }
1936
1937         release_sock(sk);
1938         return res;
1939 }
1940
1941 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1942 {
1943         struct sock *sk = sock->sk;
1944         DEFINE_WAIT(wait);
1945         int err;
1946
1947         /* True wake-one mechanism for incoming connections: only
1948          * one process gets woken up, not the 'whole herd'.
1949          * Since we do not 'race & poll' for established sockets
1950          * anymore, the common case will execute the loop only once.
1951         */
1952         for (;;) {
1953                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1954                                           TASK_INTERRUPTIBLE);
1955                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1956                         release_sock(sk);
1957                         timeo = schedule_timeout(timeo);
1958                         lock_sock(sk);
1959                 }
1960                 err = 0;
1961                 if (!skb_queue_empty(&sk->sk_receive_queue))
1962                         break;
1963                 err = -EINVAL;
1964                 if (sock->state != SS_LISTENING)
1965                         break;
1966                 err = sock_intr_errno(timeo);
1967                 if (signal_pending(current))
1968                         break;
1969                 err = -EAGAIN;
1970                 if (!timeo)
1971                         break;
1972         }
1973         finish_wait(sk_sleep(sk), &wait);
1974         return err;
1975 }
1976
1977 /**
1978  * tipc_accept - wait for connection request
1979  * @sock: listening socket
1980  * @newsock: new socket that is to be connected
1981  * @flags: file-related flags associated with socket
1982  *
1983  * Returns 0 on success, errno otherwise
1984  */
1985 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1986 {
1987         struct sock *new_sk, *sk = sock->sk;
1988         struct sk_buff *buf;
1989         struct tipc_sock *new_tsock;
1990         struct tipc_msg *msg;
1991         long timeo;
1992         int res;
1993
1994         lock_sock(sk);
1995
1996         if (sock->state != SS_LISTENING) {
1997                 res = -EINVAL;
1998                 goto exit;
1999         }
2000         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
2001         res = tipc_wait_for_accept(sock, timeo);
2002         if (res)
2003                 goto exit;
2004
2005         buf = skb_peek(&sk->sk_receive_queue);
2006
2007         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
2008         if (res)
2009                 goto exit;
2010
2011         new_sk = new_sock->sk;
2012         new_tsock = tipc_sk(new_sk);
2013         msg = buf_msg(buf);
2014
2015         /* we lock on new_sk; but lockdep sees the lock on sk */
2016         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2017
2018         /*
2019          * Reject any stray messages received by new socket
2020          * before the socket lock was taken (very, very unlikely)
2021          */
2022         tsk_rej_rx_queue(new_sk);
2023
2024         /* Connect new socket to it's peer */
2025         tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2026         new_sock->state = SS_CONNECTED;
2027
2028         tsk_set_importance(new_tsock, msg_importance(msg));
2029         if (msg_named(msg)) {
2030                 new_tsock->conn_type = msg_nametype(msg);
2031                 new_tsock->conn_instance = msg_nameinst(msg);
2032         }
2033
2034         /*
2035          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2036          * Respond to 'SYN+' by queuing it on new socket.
2037          */
2038         if (!msg_data_sz(msg)) {
2039                 struct msghdr m = {NULL,};
2040
2041                 tsk_advance_rx_queue(sk);
2042                 tipc_send_packet(NULL, new_sock, &m, 0);
2043         } else {
2044                 __skb_dequeue(&sk->sk_receive_queue);
2045                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
2046                 skb_set_owner_r(buf, new_sk);
2047         }
2048         release_sock(new_sk);
2049 exit:
2050         release_sock(sk);
2051         return res;
2052 }
2053
2054 /**
2055  * tipc_shutdown - shutdown socket connection
2056  * @sock: socket structure
2057  * @how: direction to close (must be SHUT_RDWR)
2058  *
2059  * Terminates connection (if necessary), then purges socket's receive queue.
2060  *
2061  * Returns 0 on success, errno otherwise
2062  */
2063 static int tipc_shutdown(struct socket *sock, int how)
2064 {
2065         struct sock *sk = sock->sk;
2066         struct tipc_sock *tsk = tipc_sk(sk);
2067         struct sk_buff *skb;
2068         u32 dnode;
2069         int res;
2070
2071         if (how != SHUT_RDWR)
2072                 return -EINVAL;
2073
2074         lock_sock(sk);
2075
2076         switch (sock->state) {
2077         case SS_CONNECTING:
2078         case SS_CONNECTED:
2079
2080 restart:
2081                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
2082                 skb = __skb_dequeue(&sk->sk_receive_queue);
2083                 if (skb) {
2084                         if (TIPC_SKB_CB(skb)->handle != NULL) {
2085                                 kfree_skb(skb);
2086                                 goto restart;
2087                         }
2088                         if (tipc_msg_reverse(skb, &dnode, TIPC_CONN_SHUTDOWN))
2089                                 tipc_link_xmit_skb(skb, dnode, tsk->ref);
2090                         tipc_node_remove_conn(dnode, tsk->ref);
2091                 } else {
2092                         dnode = tsk_peer_node(tsk);
2093                         skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2094                                               TIPC_CONN_MSG, SHORT_H_SIZE,
2095                                               0, dnode, tipc_own_addr,
2096                                               tsk_peer_port(tsk),
2097                                               tsk->ref, TIPC_CONN_SHUTDOWN);
2098                         tipc_link_xmit_skb(skb, dnode, tsk->ref);
2099                 }
2100                 tsk->connected = 0;
2101                 sock->state = SS_DISCONNECTING;
2102                 tipc_node_remove_conn(dnode, tsk->ref);
2103                 /* fall through */
2104
2105         case SS_DISCONNECTING:
2106
2107                 /* Discard any unreceived messages */
2108                 __skb_queue_purge(&sk->sk_receive_queue);
2109
2110                 /* Wake up anyone sleeping in poll */
2111                 sk->sk_state_change(sk);
2112                 res = 0;
2113                 break;
2114
2115         default:
2116                 res = -ENOTCONN;
2117         }
2118
2119         release_sock(sk);
2120         return res;
2121 }
2122
2123 static void tipc_sk_timeout(unsigned long ref)
2124 {
2125         struct tipc_sock *tsk;
2126         struct sock *sk;
2127         struct sk_buff *skb = NULL;
2128         u32 peer_port, peer_node;
2129
2130         tsk = tipc_sk_get(ref);
2131         if (!tsk)
2132                 return;
2133
2134         sk = &tsk->sk;
2135         bh_lock_sock(sk);
2136         if (!tsk->connected) {
2137                 bh_unlock_sock(sk);
2138                 goto exit;
2139         }
2140         peer_port = tsk_peer_port(tsk);
2141         peer_node = tsk_peer_node(tsk);
2142
2143         if (tsk->probing_state == TIPC_CONN_PROBING) {
2144                 /* Previous probe not answered -> self abort */
2145                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
2146                                       SHORT_H_SIZE, 0, tipc_own_addr,
2147                                       peer_node, ref, peer_port,
2148                                       TIPC_ERR_NO_PORT);
2149         } else {
2150                 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
2151                                       0, peer_node, tipc_own_addr,
2152                                       peer_port, ref, TIPC_OK);
2153                 tsk->probing_state = TIPC_CONN_PROBING;
2154                 k_start_timer(&tsk->timer, tsk->probing_interval);
2155         }
2156         bh_unlock_sock(sk);
2157         if (skb)
2158                 tipc_link_xmit_skb(skb, peer_node, ref);
2159 exit:
2160         tipc_sk_put(tsk);
2161 }
2162
2163 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2164                            struct tipc_name_seq const *seq)
2165 {
2166         struct publication *publ;
2167         u32 key;
2168
2169         if (tsk->connected)
2170                 return -EINVAL;
2171         key = tsk->ref + tsk->pub_count + 1;
2172         if (key == tsk->ref)
2173                 return -EADDRINUSE;
2174
2175         publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
2176                                     scope, tsk->ref, key);
2177         if (unlikely(!publ))
2178                 return -EINVAL;
2179
2180         list_add(&publ->pport_list, &tsk->publications);
2181         tsk->pub_count++;
2182         tsk->published = 1;
2183         return 0;
2184 }
2185
2186 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2187                             struct tipc_name_seq const *seq)
2188 {
2189         struct publication *publ;
2190         struct publication *safe;
2191         int rc = -EINVAL;
2192
2193         list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
2194                 if (seq) {
2195                         if (publ->scope != scope)
2196                                 continue;
2197                         if (publ->type != seq->type)
2198                                 continue;
2199                         if (publ->lower != seq->lower)
2200                                 continue;
2201                         if (publ->upper != seq->upper)
2202                                 break;
2203                         tipc_nametbl_withdraw(publ->type, publ->lower,
2204                                               publ->ref, publ->key);
2205                         rc = 0;
2206                         break;
2207                 }
2208                 tipc_nametbl_withdraw(publ->type, publ->lower,
2209                                       publ->ref, publ->key);
2210                 rc = 0;
2211         }
2212         if (list_empty(&tsk->publications))
2213                 tsk->published = 0;
2214         return rc;
2215 }
2216
2217 static int tipc_sk_show(struct tipc_sock *tsk, char *buf,
2218                         int len, int full_id)
2219 {
2220         struct publication *publ;
2221         int ret;
2222
2223         if (full_id)
2224                 ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
2225                                     tipc_zone(tipc_own_addr),
2226                                     tipc_cluster(tipc_own_addr),
2227                                     tipc_node(tipc_own_addr), tsk->ref);
2228         else
2229                 ret = tipc_snprintf(buf, len, "%-10u:", tsk->ref);
2230
2231         if (tsk->connected) {
2232                 u32 dport = tsk_peer_port(tsk);
2233                 u32 destnode = tsk_peer_node(tsk);
2234
2235                 ret += tipc_snprintf(buf + ret, len - ret,
2236                                      " connected to <%u.%u.%u:%u>",
2237                                      tipc_zone(destnode),
2238                                      tipc_cluster(destnode),
2239                                      tipc_node(destnode), dport);
2240                 if (tsk->conn_type != 0)
2241                         ret += tipc_snprintf(buf + ret, len - ret,
2242                                              " via {%u,%u}", tsk->conn_type,
2243                                              tsk->conn_instance);
2244         } else if (tsk->published) {
2245                 ret += tipc_snprintf(buf + ret, len - ret, " bound to");
2246                 list_for_each_entry(publ, &tsk->publications, pport_list) {
2247                         if (publ->lower == publ->upper)
2248                                 ret += tipc_snprintf(buf + ret, len - ret,
2249                                                      " {%u,%u}", publ->type,
2250                                                      publ->lower);
2251                         else
2252                                 ret += tipc_snprintf(buf + ret, len - ret,
2253                                                      " {%u,%u,%u}", publ->type,
2254                                                      publ->lower, publ->upper);
2255                 }
2256         }
2257         ret += tipc_snprintf(buf + ret, len - ret, "\n");
2258         return ret;
2259 }
2260
2261 struct sk_buff *tipc_sk_socks_show(void)
2262 {
2263         struct sk_buff *buf;
2264         struct tlv_desc *rep_tlv;
2265         char *pb;
2266         int pb_len;
2267         struct tipc_sock *tsk;
2268         int str_len = 0;
2269         u32 ref = 0;
2270
2271         buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
2272         if (!buf)
2273                 return NULL;
2274         rep_tlv = (struct tlv_desc *)buf->data;
2275         pb = TLV_DATA(rep_tlv);
2276         pb_len = ULTRA_STRING_MAX_LEN;
2277
2278         tsk = tipc_sk_get_next(&ref);
2279         for (; tsk; tsk = tipc_sk_get_next(&ref)) {
2280                 lock_sock(&tsk->sk);
2281                 str_len += tipc_sk_show(tsk, pb + str_len,
2282                                         pb_len - str_len, 0);
2283                 release_sock(&tsk->sk);
2284                 tipc_sk_put(tsk);
2285         }
2286         str_len += 1;   /* for "\0" */
2287         skb_put(buf, TLV_SPACE(str_len));
2288         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
2289
2290         return buf;
2291 }
2292
2293 /* tipc_sk_reinit: set non-zero address in all existing sockets
2294  *                 when we go from standalone to network mode.
2295  */
2296 void tipc_sk_reinit(void)
2297 {
2298         struct tipc_msg *msg;
2299         u32 ref = 0;
2300         struct tipc_sock *tsk = tipc_sk_get_next(&ref);
2301
2302         for (; tsk; tsk = tipc_sk_get_next(&ref)) {
2303                 lock_sock(&tsk->sk);
2304                 msg = &tsk->phdr;
2305                 msg_set_prevnode(msg, tipc_own_addr);
2306                 msg_set_orignode(msg, tipc_own_addr);
2307                 release_sock(&tsk->sk);
2308                 tipc_sk_put(tsk);
2309         }
2310 }
2311
2312 /**
2313  * struct reference - TIPC socket reference entry
2314  * @tsk: pointer to socket associated with reference entry
2315  * @ref: reference value for socket (combines instance & array index info)
2316  */
2317 struct reference {
2318         struct tipc_sock *tsk;
2319         u32 ref;
2320 };
2321
2322 /**
2323  * struct tipc_ref_table - table of TIPC socket reference entries
2324  * @entries: pointer to array of reference entries
2325  * @capacity: array index of first unusable entry
2326  * @init_point: array index of first uninitialized entry
2327  * @first_free: array index of first unused socket reference entry
2328  * @last_free: array index of last unused socket reference entry
2329  * @index_mask: bitmask for array index portion of reference values
2330  * @start_mask: initial value for instance value portion of reference values
2331  */
2332 struct ref_table {
2333         struct reference *entries;
2334         u32 capacity;
2335         u32 init_point;
2336         u32 first_free;
2337         u32 last_free;
2338         u32 index_mask;
2339         u32 start_mask;
2340 };
2341
2342 /* Socket reference table consists of 2**N entries.
2343  *
2344  * State        Socket ptr      Reference
2345  * -----        ----------      ---------
2346  * In use        non-NULL       XXXX|own index
2347  *                              (XXXX changes each time entry is acquired)
2348  * Free            NULL         YYYY|next free index
2349  *                              (YYYY is one more than last used XXXX)
2350  * Uninitialized   NULL         0
2351  *
2352  * Entry 0 is not used; this allows index 0 to denote the end of the free list.
2353  *
2354  * Note that a reference value of 0 does not necessarily indicate that an
2355  * entry is uninitialized, since the last entry in the free list could also
2356  * have a reference value of 0 (although this is unlikely).
2357  */
2358
2359 static struct ref_table tipc_ref_table;
2360
2361 static DEFINE_RWLOCK(ref_table_lock);
2362
2363 /**
2364  * tipc_ref_table_init - create reference table for sockets
2365  */
2366 int tipc_sk_ref_table_init(u32 req_sz, u32 start)
2367 {
2368         struct reference *table;
2369         u32 actual_sz;
2370
2371         /* account for unused entry, then round up size to a power of 2 */
2372
2373         req_sz++;
2374         for (actual_sz = 16; actual_sz < req_sz; actual_sz <<= 1) {
2375                 /* do nothing */
2376         };
2377
2378         /* allocate table & mark all entries as uninitialized */
2379         table = vzalloc(actual_sz * sizeof(struct reference));
2380         if (table == NULL)
2381                 return -ENOMEM;
2382
2383         tipc_ref_table.entries = table;
2384         tipc_ref_table.capacity = req_sz;
2385         tipc_ref_table.init_point = 1;
2386         tipc_ref_table.first_free = 0;
2387         tipc_ref_table.last_free = 0;
2388         tipc_ref_table.index_mask = actual_sz - 1;
2389         tipc_ref_table.start_mask = start & ~tipc_ref_table.index_mask;
2390
2391         return 0;
2392 }
2393
2394 /**
2395  * tipc_ref_table_stop - destroy reference table for sockets
2396  */
2397 void tipc_sk_ref_table_stop(void)
2398 {
2399         if (!tipc_ref_table.entries)
2400                 return;
2401         vfree(tipc_ref_table.entries);
2402         tipc_ref_table.entries = NULL;
2403 }
2404
2405 /* tipc_ref_acquire - create reference to a socket
2406  *
2407  * Register an socket pointer in the reference table.
2408  * Returns a unique reference value that is used from then on to retrieve the
2409  * socket pointer, or to determine if the socket has been deregistered.
2410  */
2411 u32 tipc_sk_ref_acquire(struct tipc_sock *tsk)
2412 {
2413         u32 index;
2414         u32 index_mask;
2415         u32 next_plus_upper;
2416         u32 ref = 0;
2417         struct reference *entry;
2418
2419         if (unlikely(!tsk)) {
2420                 pr_err("Attempt to acquire ref. to non-existent obj\n");
2421                 return 0;
2422         }
2423         if (unlikely(!tipc_ref_table.entries)) {
2424                 pr_err("Ref. table not found in acquisition attempt\n");
2425                 return 0;
2426         }
2427
2428         /* Take a free entry, if available; otherwise initialize a new one */
2429         write_lock_bh(&ref_table_lock);
2430         index = tipc_ref_table.first_free;
2431         entry = &tipc_ref_table.entries[index];
2432
2433         if (likely(index)) {
2434                 index = tipc_ref_table.first_free;
2435                 entry = &tipc_ref_table.entries[index];
2436                 index_mask = tipc_ref_table.index_mask;
2437                 next_plus_upper = entry->ref;
2438                 tipc_ref_table.first_free = next_plus_upper & index_mask;
2439                 ref = (next_plus_upper & ~index_mask) + index;
2440                 entry->tsk = tsk;
2441         } else if (tipc_ref_table.init_point < tipc_ref_table.capacity) {
2442                 index = tipc_ref_table.init_point++;
2443                 entry = &tipc_ref_table.entries[index];
2444                 ref = tipc_ref_table.start_mask + index;
2445         }
2446
2447         if (ref) {
2448                 entry->ref = ref;
2449                 entry->tsk = tsk;
2450         }
2451         write_unlock_bh(&ref_table_lock);
2452         return ref;
2453 }
2454
2455 /* tipc_sk_ref_discard - invalidate reference to an socket
2456  *
2457  * Disallow future references to an socket and free up the entry for re-use.
2458  */
2459 void tipc_sk_ref_discard(u32 ref)
2460 {
2461         struct reference *entry;
2462         u32 index;
2463         u32 index_mask;
2464
2465         if (unlikely(!tipc_ref_table.entries)) {
2466                 pr_err("Ref. table not found during discard attempt\n");
2467                 return;
2468         }
2469
2470         index_mask = tipc_ref_table.index_mask;
2471         index = ref & index_mask;
2472         entry = &tipc_ref_table.entries[index];
2473
2474         write_lock_bh(&ref_table_lock);
2475
2476         if (unlikely(!entry->tsk)) {
2477                 pr_err("Attempt to discard ref. to non-existent socket\n");
2478                 goto exit;
2479         }
2480         if (unlikely(entry->ref != ref)) {
2481                 pr_err("Attempt to discard non-existent reference\n");
2482                 goto exit;
2483         }
2484
2485         /* Mark entry as unused; increment instance part of entry's
2486          *   reference to invalidate any subsequent references
2487          */
2488
2489         entry->tsk = NULL;
2490         entry->ref = (ref & ~index_mask) + (index_mask + 1);
2491
2492         /* Append entry to free entry list */
2493         if (unlikely(tipc_ref_table.first_free == 0))
2494                 tipc_ref_table.first_free = index;
2495         else
2496                 tipc_ref_table.entries[tipc_ref_table.last_free].ref |= index;
2497         tipc_ref_table.last_free = index;
2498 exit:
2499         write_unlock_bh(&ref_table_lock);
2500 }
2501
2502 /* tipc_sk_get - find referenced socket and return pointer to it
2503  */
2504 struct tipc_sock *tipc_sk_get(u32 ref)
2505 {
2506         struct reference *entry;
2507         struct tipc_sock *tsk;
2508
2509         if (unlikely(!tipc_ref_table.entries))
2510                 return NULL;
2511         read_lock_bh(&ref_table_lock);
2512         entry = &tipc_ref_table.entries[ref & tipc_ref_table.index_mask];
2513         tsk = entry->tsk;
2514         if (likely(tsk && (entry->ref == ref)))
2515                 sock_hold(&tsk->sk);
2516         else
2517                 tsk = NULL;
2518         read_unlock_bh(&ref_table_lock);
2519         return tsk;
2520 }
2521
2522 /* tipc_sk_get_next - lock & return next socket after referenced one
2523 */
2524 struct tipc_sock *tipc_sk_get_next(u32 *ref)
2525 {
2526         struct reference *entry;
2527         struct tipc_sock *tsk = NULL;
2528         uint index = *ref & tipc_ref_table.index_mask;
2529
2530         read_lock_bh(&ref_table_lock);
2531         while (++index < tipc_ref_table.capacity) {
2532                 entry = &tipc_ref_table.entries[index];
2533                 if (!entry->tsk)
2534                         continue;
2535                 tsk = entry->tsk;
2536                 sock_hold(&tsk->sk);
2537                 *ref = entry->ref;
2538                 break;
2539         }
2540         read_unlock_bh(&ref_table_lock);
2541         return tsk;
2542 }
2543
2544 static void tipc_sk_put(struct tipc_sock *tsk)
2545 {
2546         sock_put(&tsk->sk);
2547 }
2548
2549 /**
2550  * tipc_setsockopt - set socket option
2551  * @sock: socket structure
2552  * @lvl: option level
2553  * @opt: option identifier
2554  * @ov: pointer to new option value
2555  * @ol: length of option value
2556  *
2557  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2558  * (to ease compatibility).
2559  *
2560  * Returns 0 on success, errno otherwise
2561  */
2562 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2563                            char __user *ov, unsigned int ol)
2564 {
2565         struct sock *sk = sock->sk;
2566         struct tipc_sock *tsk = tipc_sk(sk);
2567         u32 value;
2568         int res;
2569
2570         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2571                 return 0;
2572         if (lvl != SOL_TIPC)
2573                 return -ENOPROTOOPT;
2574         if (ol < sizeof(value))
2575                 return -EINVAL;
2576         res = get_user(value, (u32 __user *)ov);
2577         if (res)
2578                 return res;
2579
2580         lock_sock(sk);
2581
2582         switch (opt) {
2583         case TIPC_IMPORTANCE:
2584                 res = tsk_set_importance(tsk, value);
2585                 break;
2586         case TIPC_SRC_DROPPABLE:
2587                 if (sock->type != SOCK_STREAM)
2588                         tsk_set_unreliable(tsk, value);
2589                 else
2590                         res = -ENOPROTOOPT;
2591                 break;
2592         case TIPC_DEST_DROPPABLE:
2593                 tsk_set_unreturnable(tsk, value);
2594                 break;
2595         case TIPC_CONN_TIMEOUT:
2596                 tipc_sk(sk)->conn_timeout = value;
2597                 /* no need to set "res", since already 0 at this point */
2598                 break;
2599         default:
2600                 res = -EINVAL;
2601         }
2602
2603         release_sock(sk);
2604
2605         return res;
2606 }
2607
2608 /**
2609  * tipc_getsockopt - get socket option
2610  * @sock: socket structure
2611  * @lvl: option level
2612  * @opt: option identifier
2613  * @ov: receptacle for option value
2614  * @ol: receptacle for length of option value
2615  *
2616  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2617  * (to ease compatibility).
2618  *
2619  * Returns 0 on success, errno otherwise
2620  */
2621 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2622                            char __user *ov, int __user *ol)
2623 {
2624         struct sock *sk = sock->sk;
2625         struct tipc_sock *tsk = tipc_sk(sk);
2626         int len;
2627         u32 value;
2628         int res;
2629
2630         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2631                 return put_user(0, ol);
2632         if (lvl != SOL_TIPC)
2633                 return -ENOPROTOOPT;
2634         res = get_user(len, ol);
2635         if (res)
2636                 return res;
2637
2638         lock_sock(sk);
2639
2640         switch (opt) {
2641         case TIPC_IMPORTANCE:
2642                 value = tsk_importance(tsk);
2643                 break;
2644         case TIPC_SRC_DROPPABLE:
2645                 value = tsk_unreliable(tsk);
2646                 break;
2647         case TIPC_DEST_DROPPABLE:
2648                 value = tsk_unreturnable(tsk);
2649                 break;
2650         case TIPC_CONN_TIMEOUT:
2651                 value = tsk->conn_timeout;
2652                 /* no need to set "res", since already 0 at this point */
2653                 break;
2654         case TIPC_NODE_RECVQ_DEPTH:
2655                 value = 0; /* was tipc_queue_size, now obsolete */
2656                 break;
2657         case TIPC_SOCK_RECVQ_DEPTH:
2658                 value = skb_queue_len(&sk->sk_receive_queue);
2659                 break;
2660         default:
2661                 res = -EINVAL;
2662         }
2663
2664         release_sock(sk);
2665
2666         if (res)
2667                 return res;     /* "get" failed */
2668
2669         if (len < sizeof(value))
2670                 return -EINVAL;
2671
2672         if (copy_to_user(ov, &value, sizeof(value)))
2673                 return -EFAULT;
2674
2675         return put_user(sizeof(value), ol);
2676 }
2677
2678 static int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)
2679 {
2680         struct tipc_sioc_ln_req lnr;
2681         void __user *argp = (void __user *)arg;
2682
2683         switch (cmd) {
2684         case SIOCGETLINKNAME:
2685                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
2686                         return -EFAULT;
2687                 if (!tipc_node_get_linkname(lnr.bearer_id & 0xffff, lnr.peer,
2688                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
2689                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
2690                                 return -EFAULT;
2691                         return 0;
2692                 }
2693                 return -EADDRNOTAVAIL;
2694         default:
2695                 return -ENOIOCTLCMD;
2696         }
2697 }
2698
2699 /* Protocol switches for the various types of TIPC sockets */
2700
2701 static const struct proto_ops msg_ops = {
2702         .owner          = THIS_MODULE,
2703         .family         = AF_TIPC,
2704         .release        = tipc_release,
2705         .bind           = tipc_bind,
2706         .connect        = tipc_connect,
2707         .socketpair     = sock_no_socketpair,
2708         .accept         = sock_no_accept,
2709         .getname        = tipc_getname,
2710         .poll           = tipc_poll,
2711         .ioctl          = tipc_ioctl,
2712         .listen         = sock_no_listen,
2713         .shutdown       = tipc_shutdown,
2714         .setsockopt     = tipc_setsockopt,
2715         .getsockopt     = tipc_getsockopt,
2716         .sendmsg        = tipc_sendmsg,
2717         .recvmsg        = tipc_recvmsg,
2718         .mmap           = sock_no_mmap,
2719         .sendpage       = sock_no_sendpage
2720 };
2721
2722 static const struct proto_ops packet_ops = {
2723         .owner          = THIS_MODULE,
2724         .family         = AF_TIPC,
2725         .release        = tipc_release,
2726         .bind           = tipc_bind,
2727         .connect        = tipc_connect,
2728         .socketpair     = sock_no_socketpair,
2729         .accept         = tipc_accept,
2730         .getname        = tipc_getname,
2731         .poll           = tipc_poll,
2732         .ioctl          = tipc_ioctl,
2733         .listen         = tipc_listen,
2734         .shutdown       = tipc_shutdown,
2735         .setsockopt     = tipc_setsockopt,
2736         .getsockopt     = tipc_getsockopt,
2737         .sendmsg        = tipc_send_packet,
2738         .recvmsg        = tipc_recvmsg,
2739         .mmap           = sock_no_mmap,
2740         .sendpage       = sock_no_sendpage
2741 };
2742
2743 static const struct proto_ops stream_ops = {
2744         .owner          = THIS_MODULE,
2745         .family         = AF_TIPC,
2746         .release        = tipc_release,
2747         .bind           = tipc_bind,
2748         .connect        = tipc_connect,
2749         .socketpair     = sock_no_socketpair,
2750         .accept         = tipc_accept,
2751         .getname        = tipc_getname,
2752         .poll           = tipc_poll,
2753         .ioctl          = tipc_ioctl,
2754         .listen         = tipc_listen,
2755         .shutdown       = tipc_shutdown,
2756         .setsockopt     = tipc_setsockopt,
2757         .getsockopt     = tipc_getsockopt,
2758         .sendmsg        = tipc_send_stream,
2759         .recvmsg        = tipc_recv_stream,
2760         .mmap           = sock_no_mmap,
2761         .sendpage       = sock_no_sendpage
2762 };
2763
2764 static const struct net_proto_family tipc_family_ops = {
2765         .owner          = THIS_MODULE,
2766         .family         = AF_TIPC,
2767         .create         = tipc_sk_create
2768 };
2769
2770 static struct proto tipc_proto = {
2771         .name           = "TIPC",
2772         .owner          = THIS_MODULE,
2773         .obj_size       = sizeof(struct tipc_sock),
2774         .sysctl_rmem    = sysctl_tipc_rmem
2775 };
2776
2777 static struct proto tipc_proto_kern = {
2778         .name           = "TIPC",
2779         .obj_size       = sizeof(struct tipc_sock),
2780         .sysctl_rmem    = sysctl_tipc_rmem
2781 };
2782
2783 /**
2784  * tipc_socket_init - initialize TIPC socket interface
2785  *
2786  * Returns 0 on success, errno otherwise
2787  */
2788 int tipc_socket_init(void)
2789 {
2790         int res;
2791
2792         res = proto_register(&tipc_proto, 1);
2793         if (res) {
2794                 pr_err("Failed to register TIPC protocol type\n");
2795                 goto out;
2796         }
2797
2798         res = sock_register(&tipc_family_ops);
2799         if (res) {
2800                 pr_err("Failed to register TIPC socket type\n");
2801                 proto_unregister(&tipc_proto);
2802                 goto out;
2803         }
2804  out:
2805         return res;
2806 }
2807
2808 /**
2809  * tipc_socket_stop - stop TIPC socket interface
2810  */
2811 void tipc_socket_stop(void)
2812 {
2813         sock_unregister(tipc_family_ops.family);
2814         proto_unregister(&tipc_proto);
2815 }
2816
2817 /* Caller should hold socket lock for the passed tipc socket. */
2818 static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
2819 {
2820         u32 peer_node;
2821         u32 peer_port;
2822         struct nlattr *nest;
2823
2824         peer_node = tsk_peer_node(tsk);
2825         peer_port = tsk_peer_port(tsk);
2826
2827         nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
2828
2829         if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
2830                 goto msg_full;
2831         if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
2832                 goto msg_full;
2833
2834         if (tsk->conn_type != 0) {
2835                 if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
2836                         goto msg_full;
2837                 if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
2838                         goto msg_full;
2839                 if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
2840                         goto msg_full;
2841         }
2842         nla_nest_end(skb, nest);
2843
2844         return 0;
2845
2846 msg_full:
2847         nla_nest_cancel(skb, nest);
2848
2849         return -EMSGSIZE;
2850 }
2851
2852 /* Caller should hold socket lock for the passed tipc socket. */
2853 static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2854                             struct tipc_sock *tsk)
2855 {
2856         int err;
2857         void *hdr;
2858         struct nlattr *attrs;
2859
2860         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2861                           &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2862         if (!hdr)
2863                 goto msg_cancel;
2864
2865         attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2866         if (!attrs)
2867                 goto genlmsg_cancel;
2868         if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->ref))
2869                 goto attr_msg_cancel;
2870         if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr))
2871                 goto attr_msg_cancel;
2872
2873         if (tsk->connected) {
2874                 err = __tipc_nl_add_sk_con(skb, tsk);
2875                 if (err)
2876                         goto attr_msg_cancel;
2877         } else if (!list_empty(&tsk->publications)) {
2878                 if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
2879                         goto attr_msg_cancel;
2880         }
2881         nla_nest_end(skb, attrs);
2882         genlmsg_end(skb, hdr);
2883
2884         return 0;
2885
2886 attr_msg_cancel:
2887         nla_nest_cancel(skb, attrs);
2888 genlmsg_cancel:
2889         genlmsg_cancel(skb, hdr);
2890 msg_cancel:
2891         return -EMSGSIZE;
2892 }
2893
2894 int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2895 {
2896         int err;
2897         struct tipc_sock *tsk;
2898         u32 prev_ref = cb->args[0];
2899         u32 ref = prev_ref;
2900
2901         tsk = tipc_sk_get_next(&ref);
2902         for (; tsk; tsk = tipc_sk_get_next(&ref)) {
2903                 lock_sock(&tsk->sk);
2904                 err = __tipc_nl_add_sk(skb, cb, tsk);
2905                 release_sock(&tsk->sk);
2906                 tipc_sk_put(tsk);
2907                 if (err)
2908                         break;
2909
2910                 prev_ref = ref;
2911         }
2912
2913         cb->args[0] = prev_ref;
2914
2915         return skb->len;
2916 }
2917
2918 /* Caller should hold socket lock for the passed tipc socket. */
2919 static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2920                                  struct netlink_callback *cb,
2921                                  struct publication *publ)
2922 {
2923         void *hdr;
2924         struct nlattr *attrs;
2925
2926         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2927                           &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2928         if (!hdr)
2929                 goto msg_cancel;
2930
2931         attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
2932         if (!attrs)
2933                 goto genlmsg_cancel;
2934
2935         if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
2936                 goto attr_msg_cancel;
2937         if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
2938                 goto attr_msg_cancel;
2939         if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
2940                 goto attr_msg_cancel;
2941         if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
2942                 goto attr_msg_cancel;
2943
2944         nla_nest_end(skb, attrs);
2945         genlmsg_end(skb, hdr);
2946
2947         return 0;
2948
2949 attr_msg_cancel:
2950         nla_nest_cancel(skb, attrs);
2951 genlmsg_cancel:
2952         genlmsg_cancel(skb, hdr);
2953 msg_cancel:
2954         return -EMSGSIZE;
2955 }
2956
2957 /* Caller should hold socket lock for the passed tipc socket. */
2958 static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2959                                   struct netlink_callback *cb,
2960                                   struct tipc_sock *tsk, u32 *last_publ)
2961 {
2962         int err;
2963         struct publication *p;
2964
2965         if (*last_publ) {
2966                 list_for_each_entry(p, &tsk->publications, pport_list) {
2967                         if (p->key == *last_publ)
2968                                 break;
2969                 }
2970                 if (p->key != *last_publ) {
2971                         /* We never set seq or call nl_dump_check_consistent()
2972                          * this means that setting prev_seq here will cause the
2973                          * consistence check to fail in the netlink callback
2974                          * handler. Resulting in the last NLMSG_DONE message
2975                          * having the NLM_F_DUMP_INTR flag set.
2976                          */
2977                         cb->prev_seq = 1;
2978                         *last_publ = 0;
2979                         return -EPIPE;
2980                 }
2981         } else {
2982                 p = list_first_entry(&tsk->publications, struct publication,
2983                                      pport_list);
2984         }
2985
2986         list_for_each_entry_from(p, &tsk->publications, pport_list) {
2987                 err = __tipc_nl_add_sk_publ(skb, cb, p);
2988                 if (err) {
2989                         *last_publ = p->key;
2990                         return err;
2991                 }
2992         }
2993         *last_publ = 0;
2994
2995         return 0;
2996 }
2997
2998 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2999 {
3000         int err;
3001         u32 tsk_ref = cb->args[0];
3002         u32 last_publ = cb->args[1];
3003         u32 done = cb->args[2];
3004         struct tipc_sock *tsk;
3005
3006         if (!tsk_ref) {
3007                 struct nlattr **attrs;
3008                 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
3009
3010                 err = tipc_nlmsg_parse(cb->nlh, &attrs);
3011                 if (err)
3012                         return err;
3013
3014                 err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
3015                                        attrs[TIPC_NLA_SOCK],
3016                                        tipc_nl_sock_policy);
3017                 if (err)
3018                         return err;
3019
3020                 if (!sock[TIPC_NLA_SOCK_REF])
3021                         return -EINVAL;
3022
3023                 tsk_ref = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
3024         }
3025
3026         if (done)
3027                 return 0;
3028
3029         tsk = tipc_sk_get(tsk_ref);
3030         if (!tsk)
3031                 return -EINVAL;
3032
3033         lock_sock(&tsk->sk);
3034         err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
3035         if (!err)
3036                 done = 1;
3037         release_sock(&tsk->sk);
3038         tipc_sk_put(tsk);
3039
3040         cb->args[0] = tsk_ref;
3041         cb->args[1] = last_publ;
3042         cb->args[2] = done;
3043
3044         return skb->len;
3045 }