rxrpc: Use the peer record to distribute network errors
[cascardo/linux.git] / net / rxrpc / call_object.c
1 /* RxRPC individual remote procedure call handling
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11
12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
13
14 #include <linux/slab.h>
15 #include <linux/module.h>
16 #include <linux/circ_buf.h>
17 #include <linux/hashtable.h>
18 #include <linux/spinlock_types.h>
19 #include <net/sock.h>
20 #include <net/af_rxrpc.h>
21 #include "ar-internal.h"
22
23 /*
24  * Maximum lifetime of a call (in jiffies).
25  */
26 unsigned int rxrpc_max_call_lifetime = 60 * HZ;
27
28 /*
29  * Time till dead call expires after last use (in jiffies).
30  */
31 unsigned int rxrpc_dead_call_expiry = 2 * HZ;
32
33 const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
34         [RXRPC_CALL_CLIENT_SEND_REQUEST]        = "ClSndReq",
35         [RXRPC_CALL_CLIENT_AWAIT_REPLY]         = "ClAwtRpl",
36         [RXRPC_CALL_CLIENT_RECV_REPLY]          = "ClRcvRpl",
37         [RXRPC_CALL_CLIENT_FINAL_ACK]           = "ClFnlACK",
38         [RXRPC_CALL_SERVER_SECURING]            = "SvSecure",
39         [RXRPC_CALL_SERVER_ACCEPTING]           = "SvAccept",
40         [RXRPC_CALL_SERVER_RECV_REQUEST]        = "SvRcvReq",
41         [RXRPC_CALL_SERVER_ACK_REQUEST]         = "SvAckReq",
42         [RXRPC_CALL_SERVER_SEND_REPLY]          = "SvSndRpl",
43         [RXRPC_CALL_SERVER_AWAIT_ACK]           = "SvAwtACK",
44         [RXRPC_CALL_COMPLETE]                   = "Complete",
45         [RXRPC_CALL_SERVER_BUSY]                = "SvBusy  ",
46         [RXRPC_CALL_REMOTELY_ABORTED]           = "RmtAbort",
47         [RXRPC_CALL_LOCALLY_ABORTED]            = "LocAbort",
48         [RXRPC_CALL_NETWORK_ERROR]              = "NetError",
49         [RXRPC_CALL_DEAD]                       = "Dead    ",
50 };
51
52 struct kmem_cache *rxrpc_call_jar;
53 LIST_HEAD(rxrpc_calls);
54 DEFINE_RWLOCK(rxrpc_call_lock);
55
56 static void rxrpc_destroy_call(struct work_struct *work);
57 static void rxrpc_call_life_expired(unsigned long _call);
58 static void rxrpc_dead_call_expired(unsigned long _call);
59 static void rxrpc_ack_time_expired(unsigned long _call);
60 static void rxrpc_resend_time_expired(unsigned long _call);
61
62 static DEFINE_SPINLOCK(rxrpc_call_hash_lock);
63 static DEFINE_HASHTABLE(rxrpc_call_hash, 10);
64
65 /*
66  * Hash function for rxrpc_call_hash
67  */
68 static unsigned long rxrpc_call_hashfunc(
69         u8              in_clientflag,
70         u32             cid,
71         u32             call_id,
72         u32             epoch,
73         u16             service_id,
74         sa_family_t     proto,
75         void            *localptr,
76         unsigned int    addr_size,
77         const u8        *peer_addr)
78 {
79         const u16 *p;
80         unsigned int i;
81         unsigned long key;
82
83         _enter("");
84
85         key = (unsigned long)localptr;
86         /* We just want to add up the __be32 values, so forcing the
87          * cast should be okay.
88          */
89         key += epoch;
90         key += service_id;
91         key += call_id;
92         key += (cid & RXRPC_CIDMASK) >> RXRPC_CIDSHIFT;
93         key += cid & RXRPC_CHANNELMASK;
94         key += in_clientflag;
95         key += proto;
96         /* Step through the peer address in 16-bit portions for speed */
97         for (i = 0, p = (const u16 *)peer_addr; i < addr_size >> 1; i++, p++)
98                 key += *p;
99         _leave(" key = 0x%lx", key);
100         return key;
101 }
102
103 /*
104  * Add a call to the hashtable
105  */
106 static void rxrpc_call_hash_add(struct rxrpc_call *call)
107 {
108         unsigned long key;
109         unsigned int addr_size = 0;
110
111         _enter("");
112         switch (call->proto) {
113         case AF_INET:
114                 addr_size = sizeof(call->peer_ip.ipv4_addr);
115                 break;
116         case AF_INET6:
117                 addr_size = sizeof(call->peer_ip.ipv6_addr);
118                 break;
119         default:
120                 break;
121         }
122         key = rxrpc_call_hashfunc(call->in_clientflag, call->cid,
123                                   call->call_id, call->epoch,
124                                   call->service_id, call->proto,
125                                   call->conn->trans->local, addr_size,
126                                   call->peer_ip.ipv6_addr);
127         /* Store the full key in the call */
128         call->hash_key = key;
129         spin_lock(&rxrpc_call_hash_lock);
130         hash_add_rcu(rxrpc_call_hash, &call->hash_node, key);
131         spin_unlock(&rxrpc_call_hash_lock);
132         _leave("");
133 }
134
135 /*
136  * Remove a call from the hashtable
137  */
138 static void rxrpc_call_hash_del(struct rxrpc_call *call)
139 {
140         _enter("");
141         spin_lock(&rxrpc_call_hash_lock);
142         hash_del_rcu(&call->hash_node);
143         spin_unlock(&rxrpc_call_hash_lock);
144         _leave("");
145 }
146
147 /*
148  * Find a call in the hashtable and return it, or NULL if it
149  * isn't there.
150  */
151 struct rxrpc_call *rxrpc_find_call_hash(
152         struct rxrpc_host_header *hdr,
153         void            *localptr,
154         sa_family_t     proto,
155         const void      *peer_addr)
156 {
157         unsigned long key;
158         unsigned int addr_size = 0;
159         struct rxrpc_call *call = NULL;
160         struct rxrpc_call *ret = NULL;
161         u8 in_clientflag = hdr->flags & RXRPC_CLIENT_INITIATED;
162
163         _enter("");
164         switch (proto) {
165         case AF_INET:
166                 addr_size = sizeof(call->peer_ip.ipv4_addr);
167                 break;
168         case AF_INET6:
169                 addr_size = sizeof(call->peer_ip.ipv6_addr);
170                 break;
171         default:
172                 break;
173         }
174
175         key = rxrpc_call_hashfunc(in_clientflag, hdr->cid, hdr->callNumber,
176                                   hdr->epoch, hdr->serviceId,
177                                   proto, localptr, addr_size,
178                                   peer_addr);
179         hash_for_each_possible_rcu(rxrpc_call_hash, call, hash_node, key) {
180                 if (call->hash_key == key &&
181                     call->call_id == hdr->callNumber &&
182                     call->cid == hdr->cid &&
183                     call->in_clientflag == in_clientflag &&
184                     call->service_id == hdr->serviceId &&
185                     call->proto == proto &&
186                     call->local == localptr &&
187                     memcmp(call->peer_ip.ipv6_addr, peer_addr,
188                            addr_size) == 0 &&
189                     call->epoch == hdr->epoch) {
190                         ret = call;
191                         break;
192                 }
193         }
194         _leave(" = %p", ret);
195         return ret;
196 }
197
198 /*
199  * find an extant server call
200  * - called in process context with IRQs enabled
201  */
202 struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *rx,
203                                               unsigned long user_call_ID)
204 {
205         struct rxrpc_call *call;
206         struct rb_node *p;
207
208         _enter("%p,%lx", rx, user_call_ID);
209
210         read_lock(&rx->call_lock);
211
212         p = rx->calls.rb_node;
213         while (p) {
214                 call = rb_entry(p, struct rxrpc_call, sock_node);
215
216                 if (user_call_ID < call->user_call_ID)
217                         p = p->rb_left;
218                 else if (user_call_ID > call->user_call_ID)
219                         p = p->rb_right;
220                 else
221                         goto found_extant_call;
222         }
223
224         read_unlock(&rx->call_lock);
225         _leave(" = NULL");
226         return NULL;
227
228 found_extant_call:
229         rxrpc_get_call(call);
230         read_unlock(&rx->call_lock);
231         _leave(" = %p [%d]", call, atomic_read(&call->usage));
232         return call;
233 }
234
235 /*
236  * allocate a new call
237  */
238 static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
239 {
240         struct rxrpc_call *call;
241
242         call = kmem_cache_zalloc(rxrpc_call_jar, gfp);
243         if (!call)
244                 return NULL;
245
246         call->acks_winsz = 16;
247         call->acks_window = kmalloc(call->acks_winsz * sizeof(unsigned long),
248                                     gfp);
249         if (!call->acks_window) {
250                 kmem_cache_free(rxrpc_call_jar, call);
251                 return NULL;
252         }
253
254         setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
255                     (unsigned long) call);
256         setup_timer(&call->deadspan, &rxrpc_dead_call_expired,
257                     (unsigned long) call);
258         setup_timer(&call->ack_timer, &rxrpc_ack_time_expired,
259                     (unsigned long) call);
260         setup_timer(&call->resend_timer, &rxrpc_resend_time_expired,
261                     (unsigned long) call);
262         INIT_WORK(&call->destroyer, &rxrpc_destroy_call);
263         INIT_WORK(&call->processor, &rxrpc_process_call);
264         INIT_LIST_HEAD(&call->accept_link);
265         skb_queue_head_init(&call->rx_queue);
266         skb_queue_head_init(&call->rx_oos_queue);
267         init_waitqueue_head(&call->tx_waitq);
268         spin_lock_init(&call->lock);
269         rwlock_init(&call->state_lock);
270         atomic_set(&call->usage, 1);
271         call->debug_id = atomic_inc_return(&rxrpc_debug_id);
272         call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
273
274         memset(&call->sock_node, 0xed, sizeof(call->sock_node));
275
276         call->rx_data_expect = 1;
277         call->rx_data_eaten = 0;
278         call->rx_first_oos = 0;
279         call->ackr_win_top = call->rx_data_eaten + 1 + rxrpc_rx_window_size;
280         call->creation_jif = jiffies;
281         return call;
282 }
283
284 /*
285  * allocate a new client call and attempt to get a connection slot for it
286  */
287 static struct rxrpc_call *rxrpc_alloc_client_call(
288         struct rxrpc_sock *rx,
289         struct rxrpc_transport *trans,
290         struct rxrpc_conn_bundle *bundle,
291         gfp_t gfp)
292 {
293         struct rxrpc_call *call;
294         int ret;
295
296         _enter("");
297
298         ASSERT(rx != NULL);
299         ASSERT(trans != NULL);
300         ASSERT(bundle != NULL);
301
302         call = rxrpc_alloc_call(gfp);
303         if (!call)
304                 return ERR_PTR(-ENOMEM);
305
306         sock_hold(&rx->sk);
307         call->socket = rx;
308         call->rx_data_post = 1;
309
310         ret = rxrpc_connect_call(rx, trans, bundle, call, gfp);
311         if (ret < 0) {
312                 kmem_cache_free(rxrpc_call_jar, call);
313                 return ERR_PTR(ret);
314         }
315
316         /* Record copies of information for hashtable lookup */
317         call->proto = rx->proto;
318         call->local = trans->local;
319         switch (call->proto) {
320         case AF_INET:
321                 call->peer_ip.ipv4_addr =
322                         trans->peer->srx.transport.sin.sin_addr.s_addr;
323                 break;
324         case AF_INET6:
325                 memcpy(call->peer_ip.ipv6_addr,
326                        trans->peer->srx.transport.sin6.sin6_addr.in6_u.u6_addr8,
327                        sizeof(call->peer_ip.ipv6_addr));
328                 break;
329         }
330         call->epoch = call->conn->epoch;
331         call->service_id = call->conn->service_id;
332         call->in_clientflag = call->conn->in_clientflag;
333         /* Add the new call to the hashtable */
334         rxrpc_call_hash_add(call);
335
336         spin_lock(&call->conn->trans->peer->lock);
337         hlist_add_head(&call->error_link, &call->conn->trans->peer->error_targets);
338         spin_unlock(&call->conn->trans->peer->lock);
339
340         call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
341         add_timer(&call->lifetimer);
342
343         _leave(" = %p", call);
344         return call;
345 }
346
347 /*
348  * set up a call for the given data
349  * - called in process context with IRQs enabled
350  */
351 struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
352                                          struct rxrpc_transport *trans,
353                                          struct rxrpc_conn_bundle *bundle,
354                                          unsigned long user_call_ID,
355                                          gfp_t gfp)
356 {
357         struct rxrpc_call *call, *xcall;
358         struct rb_node *parent, **pp;
359
360         _enter("%p,%d,%d,%lx",
361                rx, trans->debug_id, bundle ? bundle->debug_id : -1,
362                user_call_ID);
363
364         call = rxrpc_alloc_client_call(rx, trans, bundle, gfp);
365         if (IS_ERR(call)) {
366                 _leave(" = %ld", PTR_ERR(call));
367                 return call;
368         }
369
370         call->user_call_ID = user_call_ID;
371         __set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
372
373         write_lock(&rx->call_lock);
374
375         pp = &rx->calls.rb_node;
376         parent = NULL;
377         while (*pp) {
378                 parent = *pp;
379                 xcall = rb_entry(parent, struct rxrpc_call, sock_node);
380
381                 if (user_call_ID < xcall->user_call_ID)
382                         pp = &(*pp)->rb_left;
383                 else if (user_call_ID > xcall->user_call_ID)
384                         pp = &(*pp)->rb_right;
385                 else
386                         goto found_user_ID_now_present;
387         }
388
389         rxrpc_get_call(call);
390
391         rb_link_node(&call->sock_node, parent, pp);
392         rb_insert_color(&call->sock_node, &rx->calls);
393         write_unlock(&rx->call_lock);
394
395         write_lock_bh(&rxrpc_call_lock);
396         list_add_tail(&call->link, &rxrpc_calls);
397         write_unlock_bh(&rxrpc_call_lock);
398
399         _net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);
400
401         _leave(" = %p [new]", call);
402         return call;
403
404         /* We unexpectedly found the user ID in the list after taking
405          * the call_lock.  This shouldn't happen unless the user races
406          * with itself and tries to add the same user ID twice at the
407          * same time in different threads.
408          */
409 found_user_ID_now_present:
410         write_unlock(&rx->call_lock);
411         rxrpc_put_call(call);
412         _leave(" = -EEXIST [%p]", call);
413         return ERR_PTR(-EEXIST);
414 }
415
416 /*
417  * set up an incoming call
418  * - called in process context with IRQs enabled
419  */
420 struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
421                                        struct rxrpc_connection *conn,
422                                        struct rxrpc_host_header *hdr)
423 {
424         struct rxrpc_call *call, *candidate;
425         struct rb_node **p, *parent;
426         u32 call_id;
427
428         _enter(",%d", conn->debug_id);
429
430         ASSERT(rx != NULL);
431
432         candidate = rxrpc_alloc_call(GFP_NOIO);
433         if (!candidate)
434                 return ERR_PTR(-EBUSY);
435
436         candidate->socket = rx;
437         candidate->conn = conn;
438         candidate->cid = hdr->cid;
439         candidate->call_id = hdr->callNumber;
440         candidate->channel = hdr->cid & RXRPC_CHANNELMASK;
441         candidate->rx_data_post = 0;
442         candidate->state = RXRPC_CALL_SERVER_ACCEPTING;
443         if (conn->security_ix > 0)
444                 candidate->state = RXRPC_CALL_SERVER_SECURING;
445
446         write_lock_bh(&conn->lock);
447
448         /* set the channel for this call */
449         call = conn->channels[candidate->channel];
450         _debug("channel[%u] is %p", candidate->channel, call);
451         if (call && call->call_id == hdr->callNumber) {
452                 /* already set; must've been a duplicate packet */
453                 _debug("extant call [%d]", call->state);
454                 ASSERTCMP(call->conn, ==, conn);
455
456                 read_lock(&call->state_lock);
457                 switch (call->state) {
458                 case RXRPC_CALL_LOCALLY_ABORTED:
459                         if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events))
460                                 rxrpc_queue_call(call);
461                 case RXRPC_CALL_REMOTELY_ABORTED:
462                         read_unlock(&call->state_lock);
463                         goto aborted_call;
464                 default:
465                         rxrpc_get_call(call);
466                         read_unlock(&call->state_lock);
467                         goto extant_call;
468                 }
469         }
470
471         if (call) {
472                 /* it seems the channel is still in use from the previous call
473                  * - ditch the old binding if its call is now complete */
474                 _debug("CALL: %u { %s }",
475                        call->debug_id, rxrpc_call_states[call->state]);
476
477                 if (call->state >= RXRPC_CALL_COMPLETE) {
478                         conn->channels[call->channel] = NULL;
479                 } else {
480                         write_unlock_bh(&conn->lock);
481                         kmem_cache_free(rxrpc_call_jar, candidate);
482                         _leave(" = -EBUSY");
483                         return ERR_PTR(-EBUSY);
484                 }
485         }
486
487         /* check the call number isn't duplicate */
488         _debug("check dup");
489         call_id = hdr->callNumber;
490         p = &conn->calls.rb_node;
491         parent = NULL;
492         while (*p) {
493                 parent = *p;
494                 call = rb_entry(parent, struct rxrpc_call, conn_node);
495
496                 /* The tree is sorted in order of the __be32 value without
497                  * turning it into host order.
498                  */
499                 if (call_id < call->call_id)
500                         p = &(*p)->rb_left;
501                 else if (call_id > call->call_id)
502                         p = &(*p)->rb_right;
503                 else
504                         goto old_call;
505         }
506
507         /* make the call available */
508         _debug("new call");
509         call = candidate;
510         candidate = NULL;
511         rb_link_node(&call->conn_node, parent, p);
512         rb_insert_color(&call->conn_node, &conn->calls);
513         conn->channels[call->channel] = call;
514         sock_hold(&rx->sk);
515         atomic_inc(&conn->usage);
516         write_unlock_bh(&conn->lock);
517
518         spin_lock(&conn->trans->peer->lock);
519         hlist_add_head(&call->error_link, &conn->trans->peer->error_targets);
520         spin_unlock(&conn->trans->peer->lock);
521
522         write_lock_bh(&rxrpc_call_lock);
523         list_add_tail(&call->link, &rxrpc_calls);
524         write_unlock_bh(&rxrpc_call_lock);
525
526         /* Record copies of information for hashtable lookup */
527         call->proto = rx->proto;
528         call->local = conn->trans->local;
529         switch (call->proto) {
530         case AF_INET:
531                 call->peer_ip.ipv4_addr =
532                         conn->trans->peer->srx.transport.sin.sin_addr.s_addr;
533                 break;
534         case AF_INET6:
535                 memcpy(call->peer_ip.ipv6_addr,
536                        conn->trans->peer->srx.transport.sin6.sin6_addr.in6_u.u6_addr8,
537                        sizeof(call->peer_ip.ipv6_addr));
538                 break;
539         default:
540                 break;
541         }
542         call->epoch = conn->epoch;
543         call->service_id = conn->service_id;
544         call->in_clientflag = conn->in_clientflag;
545         /* Add the new call to the hashtable */
546         rxrpc_call_hash_add(call);
547
548         _net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);
549
550         call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
551         add_timer(&call->lifetimer);
552         _leave(" = %p {%d} [new]", call, call->debug_id);
553         return call;
554
555 extant_call:
556         write_unlock_bh(&conn->lock);
557         kmem_cache_free(rxrpc_call_jar, candidate);
558         _leave(" = %p {%d} [extant]", call, call ? call->debug_id : -1);
559         return call;
560
561 aborted_call:
562         write_unlock_bh(&conn->lock);
563         kmem_cache_free(rxrpc_call_jar, candidate);
564         _leave(" = -ECONNABORTED");
565         return ERR_PTR(-ECONNABORTED);
566
567 old_call:
568         write_unlock_bh(&conn->lock);
569         kmem_cache_free(rxrpc_call_jar, candidate);
570         _leave(" = -ECONNRESET [old]");
571         return ERR_PTR(-ECONNRESET);
572 }
573
574 /*
575  * detach a call from a socket and set up for release
576  */
577 void rxrpc_release_call(struct rxrpc_call *call)
578 {
579         struct rxrpc_connection *conn = call->conn;
580         struct rxrpc_sock *rx = call->socket;
581
582         _enter("{%d,%d,%d,%d}",
583                call->debug_id, atomic_read(&call->usage),
584                atomic_read(&call->ackr_not_idle),
585                call->rx_first_oos);
586
587         spin_lock_bh(&call->lock);
588         if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
589                 BUG();
590         spin_unlock_bh(&call->lock);
591
592         /* dissociate from the socket
593          * - the socket's ref on the call is passed to the death timer
594          */
595         _debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
596
597         write_lock_bh(&rx->call_lock);
598         if (!list_empty(&call->accept_link)) {
599                 _debug("unlinking once-pending call %p { e=%lx f=%lx }",
600                        call, call->events, call->flags);
601                 ASSERT(!test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
602                 list_del_init(&call->accept_link);
603                 sk_acceptq_removed(&rx->sk);
604         } else if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
605                 rb_erase(&call->sock_node, &rx->calls);
606                 memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
607                 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
608         }
609         write_unlock_bh(&rx->call_lock);
610
611         /* free up the channel for reuse */
612         spin_lock(&conn->trans->client_lock);
613         write_lock_bh(&conn->lock);
614         write_lock(&call->state_lock);
615
616         if (conn->channels[call->channel] == call)
617                 conn->channels[call->channel] = NULL;
618
619         if (conn->out_clientflag && conn->bundle) {
620                 conn->avail_calls++;
621                 switch (conn->avail_calls) {
622                 case 1:
623                         list_move_tail(&conn->bundle_link,
624                                        &conn->bundle->avail_conns);
625                 case 2 ... RXRPC_MAXCALLS - 1:
626                         ASSERT(conn->channels[0] == NULL ||
627                                conn->channels[1] == NULL ||
628                                conn->channels[2] == NULL ||
629                                conn->channels[3] == NULL);
630                         break;
631                 case RXRPC_MAXCALLS:
632                         list_move_tail(&conn->bundle_link,
633                                        &conn->bundle->unused_conns);
634                         ASSERT(conn->channels[0] == NULL &&
635                                conn->channels[1] == NULL &&
636                                conn->channels[2] == NULL &&
637                                conn->channels[3] == NULL);
638                         break;
639                 default:
640                         pr_err("conn->avail_calls=%d\n", conn->avail_calls);
641                         BUG();
642                 }
643         }
644
645         spin_unlock(&conn->trans->client_lock);
646
647         if (call->state < RXRPC_CALL_COMPLETE &&
648             call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
649                 _debug("+++ ABORTING STATE %d +++\n", call->state);
650                 call->state = RXRPC_CALL_LOCALLY_ABORTED;
651                 call->local_abort = RX_CALL_DEAD;
652                 set_bit(RXRPC_CALL_EV_ABORT, &call->events);
653                 rxrpc_queue_call(call);
654         }
655         write_unlock(&call->state_lock);
656         write_unlock_bh(&conn->lock);
657
658         /* clean up the Rx queue */
659         if (!skb_queue_empty(&call->rx_queue) ||
660             !skb_queue_empty(&call->rx_oos_queue)) {
661                 struct rxrpc_skb_priv *sp;
662                 struct sk_buff *skb;
663
664                 _debug("purge Rx queues");
665
666                 spin_lock_bh(&call->lock);
667                 while ((skb = skb_dequeue(&call->rx_queue)) ||
668                        (skb = skb_dequeue(&call->rx_oos_queue))) {
669                         sp = rxrpc_skb(skb);
670                         if (sp->call) {
671                                 ASSERTCMP(sp->call, ==, call);
672                                 rxrpc_put_call(call);
673                                 sp->call = NULL;
674                         }
675                         skb->destructor = NULL;
676                         spin_unlock_bh(&call->lock);
677
678                         _debug("- zap %s %%%u #%u",
679                                rxrpc_pkts[sp->hdr.type],
680                                sp->hdr.serial, sp->hdr.seq);
681                         rxrpc_free_skb(skb);
682                         spin_lock_bh(&call->lock);
683                 }
684                 spin_unlock_bh(&call->lock);
685
686                 ASSERTCMP(call->state, !=, RXRPC_CALL_COMPLETE);
687         }
688
689         del_timer_sync(&call->resend_timer);
690         del_timer_sync(&call->ack_timer);
691         del_timer_sync(&call->lifetimer);
692         call->deadspan.expires = jiffies + rxrpc_dead_call_expiry;
693         add_timer(&call->deadspan);
694
695         _leave("");
696 }
697
698 /*
699  * handle a dead call being ready for reaping
700  */
701 static void rxrpc_dead_call_expired(unsigned long _call)
702 {
703         struct rxrpc_call *call = (struct rxrpc_call *) _call;
704
705         _enter("{%d}", call->debug_id);
706
707         write_lock_bh(&call->state_lock);
708         call->state = RXRPC_CALL_DEAD;
709         write_unlock_bh(&call->state_lock);
710         rxrpc_put_call(call);
711 }
712
713 /*
714  * mark a call as to be released, aborting it if it's still in progress
715  * - called with softirqs disabled
716  */
717 static void rxrpc_mark_call_released(struct rxrpc_call *call)
718 {
719         bool sched;
720
721         write_lock(&call->state_lock);
722         if (call->state < RXRPC_CALL_DEAD) {
723                 sched = false;
724                 if (call->state < RXRPC_CALL_COMPLETE) {
725                         _debug("abort call %p", call);
726                         call->state = RXRPC_CALL_LOCALLY_ABORTED;
727                         call->local_abort = RX_CALL_DEAD;
728                         if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events))
729                                 sched = true;
730                 }
731                 if (!test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
732                         sched = true;
733                 if (sched)
734                         rxrpc_queue_call(call);
735         }
736         write_unlock(&call->state_lock);
737 }
738
739 /*
740  * release all the calls associated with a socket
741  */
742 void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
743 {
744         struct rxrpc_call *call;
745         struct rb_node *p;
746
747         _enter("%p", rx);
748
749         read_lock_bh(&rx->call_lock);
750
751         /* mark all the calls as no longer wanting incoming packets */
752         for (p = rb_first(&rx->calls); p; p = rb_next(p)) {
753                 call = rb_entry(p, struct rxrpc_call, sock_node);
754                 rxrpc_mark_call_released(call);
755         }
756
757         /* kill the not-yet-accepted incoming calls */
758         list_for_each_entry(call, &rx->secureq, accept_link) {
759                 rxrpc_mark_call_released(call);
760         }
761
762         list_for_each_entry(call, &rx->acceptq, accept_link) {
763                 rxrpc_mark_call_released(call);
764         }
765
766         read_unlock_bh(&rx->call_lock);
767         _leave("");
768 }
769
770 /*
771  * release a call
772  */
773 void __rxrpc_put_call(struct rxrpc_call *call)
774 {
775         ASSERT(call != NULL);
776
777         _enter("%p{u=%d}", call, atomic_read(&call->usage));
778
779         ASSERTCMP(atomic_read(&call->usage), >, 0);
780
781         if (atomic_dec_and_test(&call->usage)) {
782                 _debug("call %d dead", call->debug_id);
783                 ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
784                 rxrpc_queue_work(&call->destroyer);
785         }
786         _leave("");
787 }
788
789 /*
790  * clean up a call
791  */
792 static void rxrpc_cleanup_call(struct rxrpc_call *call)
793 {
794         _net("DESTROY CALL %d", call->debug_id);
795
796         ASSERT(call->socket);
797
798         memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
799
800         del_timer_sync(&call->lifetimer);
801         del_timer_sync(&call->deadspan);
802         del_timer_sync(&call->ack_timer);
803         del_timer_sync(&call->resend_timer);
804
805         ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
806         ASSERTCMP(call->events, ==, 0);
807         if (work_pending(&call->processor)) {
808                 _debug("defer destroy");
809                 rxrpc_queue_work(&call->destroyer);
810                 return;
811         }
812
813         if (call->conn) {
814                 spin_lock(&call->conn->trans->peer->lock);
815                 hlist_del_init(&call->error_link);
816                 spin_unlock(&call->conn->trans->peer->lock);
817
818                 write_lock_bh(&call->conn->lock);
819                 rb_erase(&call->conn_node, &call->conn->calls);
820                 write_unlock_bh(&call->conn->lock);
821                 rxrpc_put_connection(call->conn);
822         }
823
824         /* Remove the call from the hash */
825         rxrpc_call_hash_del(call);
826
827         if (call->acks_window) {
828                 _debug("kill Tx window %d",
829                        CIRC_CNT(call->acks_head, call->acks_tail,
830                                 call->acks_winsz));
831                 smp_mb();
832                 while (CIRC_CNT(call->acks_head, call->acks_tail,
833                                 call->acks_winsz) > 0) {
834                         struct rxrpc_skb_priv *sp;
835                         unsigned long _skb;
836
837                         _skb = call->acks_window[call->acks_tail] & ~1;
838                         sp = rxrpc_skb((struct sk_buff *)_skb);
839                         _debug("+++ clear Tx %u", sp->hdr.seq);
840                         rxrpc_free_skb((struct sk_buff *)_skb);
841                         call->acks_tail =
842                                 (call->acks_tail + 1) & (call->acks_winsz - 1);
843                 }
844
845                 kfree(call->acks_window);
846         }
847
848         rxrpc_free_skb(call->tx_pending);
849
850         rxrpc_purge_queue(&call->rx_queue);
851         ASSERT(skb_queue_empty(&call->rx_oos_queue));
852         sock_put(&call->socket->sk);
853         kmem_cache_free(rxrpc_call_jar, call);
854 }
855
856 /*
857  * destroy a call
858  */
859 static void rxrpc_destroy_call(struct work_struct *work)
860 {
861         struct rxrpc_call *call =
862                 container_of(work, struct rxrpc_call, destroyer);
863
864         _enter("%p{%d,%d,%p}",
865                call, atomic_read(&call->usage), call->channel, call->conn);
866
867         ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
868
869         write_lock_bh(&rxrpc_call_lock);
870         list_del_init(&call->link);
871         write_unlock_bh(&rxrpc_call_lock);
872
873         rxrpc_cleanup_call(call);
874         _leave("");
875 }
876
877 /*
878  * preemptively destroy all the call records from a transport endpoint rather
879  * than waiting for them to time out
880  */
881 void __exit rxrpc_destroy_all_calls(void)
882 {
883         struct rxrpc_call *call;
884
885         _enter("");
886         write_lock_bh(&rxrpc_call_lock);
887
888         while (!list_empty(&rxrpc_calls)) {
889                 call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
890                 _debug("Zapping call %p", call);
891
892                 list_del_init(&call->link);
893
894                 switch (atomic_read(&call->usage)) {
895                 case 0:
896                         ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
897                         break;
898                 case 1:
899                         if (del_timer_sync(&call->deadspan) != 0 &&
900                             call->state != RXRPC_CALL_DEAD)
901                                 rxrpc_dead_call_expired((unsigned long) call);
902                         if (call->state != RXRPC_CALL_DEAD)
903                                 break;
904                 default:
905                         pr_err("Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
906                                call, atomic_read(&call->usage),
907                                atomic_read(&call->ackr_not_idle),
908                                rxrpc_call_states[call->state],
909                                call->flags, call->events);
910                         if (!skb_queue_empty(&call->rx_queue))
911                                 pr_err("Rx queue occupied\n");
912                         if (!skb_queue_empty(&call->rx_oos_queue))
913                                 pr_err("OOS queue occupied\n");
914                         break;
915                 }
916
917                 write_unlock_bh(&rxrpc_call_lock);
918                 cond_resched();
919                 write_lock_bh(&rxrpc_call_lock);
920         }
921
922         write_unlock_bh(&rxrpc_call_lock);
923         _leave("");
924 }
925
926 /*
927  * handle call lifetime being exceeded
928  */
929 static void rxrpc_call_life_expired(unsigned long _call)
930 {
931         struct rxrpc_call *call = (struct rxrpc_call *) _call;
932
933         if (call->state >= RXRPC_CALL_COMPLETE)
934                 return;
935
936         _enter("{%d}", call->debug_id);
937         read_lock_bh(&call->state_lock);
938         if (call->state < RXRPC_CALL_COMPLETE) {
939                 set_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events);
940                 rxrpc_queue_call(call);
941         }
942         read_unlock_bh(&call->state_lock);
943 }
944
945 /*
946  * handle resend timer expiry
947  * - may not take call->state_lock as this can deadlock against del_timer_sync()
948  */
949 static void rxrpc_resend_time_expired(unsigned long _call)
950 {
951         struct rxrpc_call *call = (struct rxrpc_call *) _call;
952
953         _enter("{%d}", call->debug_id);
954
955         if (call->state >= RXRPC_CALL_COMPLETE)
956                 return;
957
958         clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
959         if (!test_and_set_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events))
960                 rxrpc_queue_call(call);
961 }
962
963 /*
964  * handle ACK timer expiry
965  */
966 static void rxrpc_ack_time_expired(unsigned long _call)
967 {
968         struct rxrpc_call *call = (struct rxrpc_call *) _call;
969
970         _enter("{%d}", call->debug_id);
971
972         if (call->state >= RXRPC_CALL_COMPLETE)
973                 return;
974
975         read_lock_bh(&call->state_lock);
976         if (call->state < RXRPC_CALL_COMPLETE &&
977             !test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
978                 rxrpc_queue_call(call);
979         read_unlock_bh(&call->state_lock);
980 }