Merge tag 'rxrpc-rewrite-20160907-2' of git://git.kernel.org/pub/scm/linux/kernel...
[cascardo/linux.git] / net / rxrpc / call_object.c
1 /* RxRPC individual remote procedure call handling
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11
12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
13
14 #include <linux/slab.h>
15 #include <linux/module.h>
16 #include <linux/circ_buf.h>
17 #include <linux/spinlock_types.h>
18 #include <net/sock.h>
19 #include <net/af_rxrpc.h>
20 #include "ar-internal.h"
21
22 /*
23  * Maximum lifetime of a call (in jiffies).
24  */
25 unsigned int rxrpc_max_call_lifetime = 60 * HZ;
26
/* Human-readable names for the call states, indexed by call->state.
 * Each string is padded to eight characters so state columns line up in
 * debug/proc output (see the users of rxrpc_call_states[] elsewhere in
 * this file, e.g. rxrpc_incoming_call() and rxrpc_destroy_all_calls()).
 */
const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
	[RXRPC_CALL_UNINITIALISED]		= "Uninit  ",
	[RXRPC_CALL_CLIENT_AWAIT_CONN]		= "ClWtConn",
	[RXRPC_CALL_CLIENT_SEND_REQUEST]	= "ClSndReq",
	[RXRPC_CALL_CLIENT_AWAIT_REPLY]		= "ClAwtRpl",
	[RXRPC_CALL_CLIENT_RECV_REPLY]		= "ClRcvRpl",
	[RXRPC_CALL_CLIENT_FINAL_ACK]		= "ClFnlACK",
	[RXRPC_CALL_SERVER_SECURING]		= "SvSecure",
	[RXRPC_CALL_SERVER_ACCEPTING]		= "SvAccept",
	[RXRPC_CALL_SERVER_RECV_REQUEST]	= "SvRcvReq",
	[RXRPC_CALL_SERVER_ACK_REQUEST]		= "SvAckReq",
	[RXRPC_CALL_SERVER_SEND_REPLY]		= "SvSndRpl",
	[RXRPC_CALL_SERVER_AWAIT_ACK]		= "SvAwtACK",
	[RXRPC_CALL_COMPLETE]			= "Complete",
};
42
/* Human-readable names for the reasons a call reached completion, indexed
 * by the completion enum; padded to eight characters like the state names
 * above.
 */
const char *const rxrpc_call_completions[NR__RXRPC_CALL_COMPLETIONS] = {
	[RXRPC_CALL_SUCCEEDED]			= "Complete",
	[RXRPC_CALL_SERVER_BUSY]		= "SvBusy  ",
	[RXRPC_CALL_REMOTELY_ABORTED]		= "RmtAbort",
	[RXRPC_CALL_LOCALLY_ABORTED]		= "LocAbort",
	[RXRPC_CALL_LOCAL_ERROR]		= "LocError",
	[RXRPC_CALL_NETWORK_ERROR]		= "NetError",
};
51
/* Three-letter tags emitted by trace_rxrpc_call() to identify which
 * refcount-tracking operation occurred, indexed by enum rxrpc_call_trace.
 */
const char rxrpc_call_traces[rxrpc_call__nr_trace][4] = {
	[rxrpc_call_new_client]		= "NWc",
	[rxrpc_call_new_service]	= "NWs",
	[rxrpc_call_queued]		= "QUE",
	[rxrpc_call_queued_ref]		= "QUR",
	[rxrpc_call_seen]		= "SEE",
	[rxrpc_call_got]		= "GOT",
	[rxrpc_call_got_skb]		= "Gsk",
	[rxrpc_call_got_userid]		= "Gus",
	[rxrpc_call_put]		= "PUT",
	[rxrpc_call_put_skb]		= "Psk",
	[rxrpc_call_put_userid]		= "Pus",
	[rxrpc_call_put_noqueue]	= "PNQ",
};
66
struct kmem_cache *rxrpc_call_jar;	/* slab cache for struct rxrpc_call */
LIST_HEAD(rxrpc_calls);			/* global list of all extant calls */
DEFINE_RWLOCK(rxrpc_call_lock);		/* guards rxrpc_calls */

/* Timer expiry handlers and final cleanup, defined later in this file. */
static void rxrpc_call_life_expired(unsigned long _call);
static void rxrpc_ack_time_expired(unsigned long _call);
static void rxrpc_resend_time_expired(unsigned long _call);
static void rxrpc_cleanup_call(struct rxrpc_call *call);
75
76 /*
77  * find an extant server call
78  * - called in process context with IRQs enabled
79  */
80 struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *rx,
81                                               unsigned long user_call_ID)
82 {
83         struct rxrpc_call *call;
84         struct rb_node *p;
85
86         _enter("%p,%lx", rx, user_call_ID);
87
88         read_lock(&rx->call_lock);
89
90         p = rx->calls.rb_node;
91         while (p) {
92                 call = rb_entry(p, struct rxrpc_call, sock_node);
93
94                 if (user_call_ID < call->user_call_ID)
95                         p = p->rb_left;
96                 else if (user_call_ID > call->user_call_ID)
97                         p = p->rb_right;
98                 else
99                         goto found_extant_call;
100         }
101
102         read_unlock(&rx->call_lock);
103         _leave(" = NULL");
104         return NULL;
105
106 found_extant_call:
107         rxrpc_get_call(call, rxrpc_call_got);
108         read_unlock(&rx->call_lock);
109         _leave(" = %p [%d]", call, atomic_read(&call->usage));
110         return call;
111 }
112
113 /*
114  * allocate a new call
115  */
/*
 * Allocate a new call object.
 *
 * Returns a zeroed struct rxrpc_call with one usage reference held, its
 * timers, work item, lists, queues and locks initialised and an ACK window
 * buffer attached; or NULL on allocation failure.  NOTE: the acks_window
 * buffer is a separate kmalloc allocation — anyone discarding the call must
 * free it as well as the call struct itself (rxrpc_cleanup_call() does so).
 */
static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
{
	struct rxrpc_call *call;

	call = kmem_cache_zalloc(rxrpc_call_jar, gfp);
	if (!call)
		return NULL;

	/* Tx ACK window: winsz must remain a power of two — the tail index
	 * is advanced with "& (acks_winsz - 1)" in rxrpc_cleanup_call().
	 */
	call->acks_winsz = 16;
	call->acks_window = kmalloc(call->acks_winsz * sizeof(unsigned long),
				    gfp);
	if (!call->acks_window) {
		kmem_cache_free(rxrpc_call_jar, call);
		return NULL;
	}

	/* Timers are only initialised here; they get armed later (e.g. the
	 * lifetimer in rxrpc_begin_client_call() / rxrpc_incoming_call()).
	 */
	setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
		    (unsigned long) call);
	setup_timer(&call->ack_timer, &rxrpc_ack_time_expired,
		    (unsigned long) call);
	setup_timer(&call->resend_timer, &rxrpc_resend_time_expired,
		    (unsigned long) call);
	INIT_WORK(&call->processor, &rxrpc_process_call);
	INIT_LIST_HEAD(&call->link);
	INIT_LIST_HEAD(&call->chan_wait_link);
	INIT_LIST_HEAD(&call->accept_link);
	skb_queue_head_init(&call->rx_queue);
	skb_queue_head_init(&call->rx_oos_queue);
	skb_queue_head_init(&call->knlrecv_queue);
	init_waitqueue_head(&call->waitq);
	spin_lock_init(&call->lock);
	rwlock_init(&call->state_lock);
	atomic_set(&call->usage, 1);	/* the caller's reference */
	call->debug_id = atomic_inc_return(&rxrpc_debug_id);

	/* Poison the sock_node so misuse before rb-tree insertion shows up */
	memset(&call->sock_node, 0xed, sizeof(call->sock_node));

	/* Rx sequence numbering starts at 1 */
	call->rx_data_expect = 1;
	call->rx_data_eaten = 0;
	call->rx_first_oos = 0;
	call->ackr_win_top = call->rx_data_eaten + 1 + rxrpc_rx_window_size;
	call->creation_jif = jiffies;
	return call;
}
160
161 /*
162  * Allocate a new client call.
163  */
164 static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx,
165                                                   struct sockaddr_rxrpc *srx,
166                                                   gfp_t gfp)
167 {
168         struct rxrpc_call *call;
169
170         _enter("");
171
172         ASSERT(rx->local != NULL);
173
174         call = rxrpc_alloc_call(gfp);
175         if (!call)
176                 return ERR_PTR(-ENOMEM);
177         call->state = RXRPC_CALL_CLIENT_AWAIT_CONN;
178         call->rx_data_post = 1;
179         call->service_id = srx->srx_service;
180         rcu_assign_pointer(call->socket, rx);
181
182         _leave(" = %p", call);
183         return call;
184 }
185
186 /*
187  * Begin client call.
188  */
189 static int rxrpc_begin_client_call(struct rxrpc_call *call,
190                                    struct rxrpc_conn_parameters *cp,
191                                    struct sockaddr_rxrpc *srx,
192                                    gfp_t gfp)
193 {
194         int ret;
195
196         /* Set up or get a connection record and set the protocol parameters,
197          * including channel number and call ID.
198          */
199         ret = rxrpc_connect_call(call, cp, srx, gfp);
200         if (ret < 0)
201                 return ret;
202
203         spin_lock(&call->conn->params.peer->lock);
204         hlist_add_head(&call->error_link, &call->conn->params.peer->error_targets);
205         spin_unlock(&call->conn->params.peer->lock);
206
207         call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
208         add_timer(&call->lifetimer);
209         return 0;
210 }
211
212 /*
213  * set up a call for the given data
214  * - called in process context with IRQs enabled
215  */
/*
 * Set up a call for the given data.
 * - called in process context with IRQs enabled
 * - publishes the call in the socket's rb-tree under user_call_ID and on
 *   the global rxrpc_calls list, then connects it
 * - returns the call with the caller's ref held, or an ERR_PTR:
 *   -EEXIST if the user ID is already in use, or whatever
 *   rxrpc_begin_client_call() failed with
 */
struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
					 struct rxrpc_conn_parameters *cp,
					 struct sockaddr_rxrpc *srx,
					 unsigned long user_call_ID,
					 gfp_t gfp)
{
	struct rxrpc_call *call, *xcall;
	struct rb_node *parent, **pp;
	const void *here = __builtin_return_address(0);
	int ret;

	_enter("%p,%lx", rx, user_call_ID);

	call = rxrpc_alloc_client_call(rx, srx, gfp);
	if (IS_ERR(call)) {
		_leave(" = %ld", PTR_ERR(call));
		return call;
	}

	trace_rxrpc_call(call, rxrpc_call_new_client,
			 atomic_read(&call->usage), 0,
			 here, (const void *)user_call_ID);

	/* Publish the call, even though it is incompletely set up as yet */
	call->user_call_ID = user_call_ID;
	__set_bit(RXRPC_CALL_HAS_USERID, &call->flags);

	write_lock(&rx->call_lock);

	/* Find the insertion point in the rb-tree, bailing out if another
	 * call already owns this user ID.
	 */
	pp = &rx->calls.rb_node;
	parent = NULL;
	while (*pp) {
		parent = *pp;
		xcall = rb_entry(parent, struct rxrpc_call, sock_node);

		if (user_call_ID < xcall->user_call_ID)
			pp = &(*pp)->rb_left;
		else if (user_call_ID > xcall->user_call_ID)
			pp = &(*pp)->rb_right;
		else
			goto found_user_ID_now_present;
	}

	/* The tree holds its own ref on the call (dropped via
	 * rxrpc_call_put_userid on removal).
	 */
	rxrpc_get_call(call, rxrpc_call_got_userid);
	rb_link_node(&call->sock_node, parent, pp);
	rb_insert_color(&call->sock_node, &rx->calls);
	write_unlock(&rx->call_lock);

	write_lock_bh(&rxrpc_call_lock);
	list_add_tail(&call->link, &rxrpc_calls);
	write_unlock_bh(&rxrpc_call_lock);

	ret = rxrpc_begin_client_call(call, cp, srx, gfp);
	if (ret < 0)
		goto error;

	_net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);

	_leave(" = %p [new]", call);
	return call;

	/* Unwind the publication: drop the tree's ref and the global-list
	 * linkage before completing the call as locally errored.
	 */
error:
	write_lock(&rx->call_lock);
	rb_erase(&call->sock_node, &rx->calls);
	write_unlock(&rx->call_lock);
	rxrpc_put_call(call, rxrpc_call_put_userid);

	write_lock_bh(&rxrpc_call_lock);
	list_del_init(&call->link);
	write_unlock_bh(&rxrpc_call_lock);

	/* This final put releases the caller's ref from allocation. */
error_out:
	__rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
				    RX_CALL_DEAD, ret);
	set_bit(RXRPC_CALL_RELEASED, &call->flags);
	rxrpc_put_call(call, rxrpc_call_put);
	_leave(" = %d", ret);
	return ERR_PTR(ret);

	/* We unexpectedly found the user ID in the list after taking
	 * the call_lock.  This shouldn't happen unless the user races
	 * with itself and tries to add the same user ID twice at the
	 * same time in different threads.
	 */
found_user_ID_now_present:
	write_unlock(&rx->call_lock);
	ret = -EEXIST;
	goto error_out;
}
305
306 /*
307  * set up an incoming call
308  * - called in process context with IRQs enabled
309  */
310 struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
311                                        struct rxrpc_connection *conn,
312                                        struct sk_buff *skb)
313 {
314         struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
315         struct rxrpc_call *call, *candidate;
316         const void *here = __builtin_return_address(0);
317         u32 call_id, chan;
318
319         _enter(",%d", conn->debug_id);
320
321         ASSERT(rx != NULL);
322
323         candidate = rxrpc_alloc_call(GFP_NOIO);
324         if (!candidate)
325                 return ERR_PTR(-EBUSY);
326
327         trace_rxrpc_call(candidate, rxrpc_call_new_service,
328                          atomic_read(&candidate->usage), 0, here, NULL);
329
330         chan = sp->hdr.cid & RXRPC_CHANNELMASK;
331         candidate->conn         = conn;
332         candidate->peer         = conn->params.peer;
333         candidate->cid          = sp->hdr.cid;
334         candidate->call_id      = sp->hdr.callNumber;
335         candidate->security_ix  = sp->hdr.securityIndex;
336         candidate->rx_data_post = 0;
337         candidate->state        = RXRPC_CALL_SERVER_ACCEPTING;
338         candidate->flags        |= (1 << RXRPC_CALL_IS_SERVICE);
339         if (conn->security_ix > 0)
340                 candidate->state = RXRPC_CALL_SERVER_SECURING;
341         rcu_assign_pointer(candidate->socket, rx);
342
343         spin_lock(&conn->channel_lock);
344
345         /* set the channel for this call */
346         call = rcu_dereference_protected(conn->channels[chan].call,
347                                          lockdep_is_held(&conn->channel_lock));
348
349         _debug("channel[%u] is %p", candidate->cid & RXRPC_CHANNELMASK, call);
350         if (call && call->call_id == sp->hdr.callNumber) {
351                 /* already set; must've been a duplicate packet */
352                 _debug("extant call [%d]", call->state);
353                 ASSERTCMP(call->conn, ==, conn);
354
355                 read_lock(&call->state_lock);
356                 switch (call->state) {
357                 case RXRPC_CALL_LOCALLY_ABORTED:
358                         if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events))
359                                 rxrpc_queue_call(call);
360                 case RXRPC_CALL_REMOTELY_ABORTED:
361                         read_unlock(&call->state_lock);
362                         goto aborted_call;
363                 default:
364                         rxrpc_get_call(call, rxrpc_call_got);
365                         read_unlock(&call->state_lock);
366                         goto extant_call;
367                 }
368         }
369
370         if (call) {
371                 /* it seems the channel is still in use from the previous call
372                  * - ditch the old binding if its call is now complete */
373                 _debug("CALL: %u { %s }",
374                        call->debug_id, rxrpc_call_states[call->state]);
375
376                 if (call->state == RXRPC_CALL_COMPLETE) {
377                         __rxrpc_disconnect_call(conn, call);
378                 } else {
379                         spin_unlock(&conn->channel_lock);
380                         kmem_cache_free(rxrpc_call_jar, candidate);
381                         _leave(" = -EBUSY");
382                         return ERR_PTR(-EBUSY);
383                 }
384         }
385
386         /* check the call number isn't duplicate */
387         _debug("check dup");
388         call_id = sp->hdr.callNumber;
389
390         /* We just ignore calls prior to the current call ID.  Terminated calls
391          * are handled via the connection.
392          */
393         if (call_id <= conn->channels[chan].call_counter)
394                 goto old_call; /* TODO: Just drop packet */
395
396         /* make the call available */
397         _debug("new call");
398         call = candidate;
399         candidate = NULL;
400         conn->channels[chan].call_counter = call_id;
401         rcu_assign_pointer(conn->channels[chan].call, call);
402         rxrpc_get_connection(conn);
403         rxrpc_get_peer(call->peer);
404         spin_unlock(&conn->channel_lock);
405
406         spin_lock(&conn->params.peer->lock);
407         hlist_add_head(&call->error_link, &conn->params.peer->error_targets);
408         spin_unlock(&conn->params.peer->lock);
409
410         write_lock_bh(&rxrpc_call_lock);
411         list_add_tail(&call->link, &rxrpc_calls);
412         write_unlock_bh(&rxrpc_call_lock);
413
414         call->service_id = conn->params.service_id;
415
416         _net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);
417
418         call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
419         add_timer(&call->lifetimer);
420         _leave(" = %p {%d} [new]", call, call->debug_id);
421         return call;
422
423 extant_call:
424         spin_unlock(&conn->channel_lock);
425         kmem_cache_free(rxrpc_call_jar, candidate);
426         _leave(" = %p {%d} [extant]", call, call ? call->debug_id : -1);
427         return call;
428
429 aborted_call:
430         spin_unlock(&conn->channel_lock);
431         kmem_cache_free(rxrpc_call_jar, candidate);
432         _leave(" = -ECONNABORTED");
433         return ERR_PTR(-ECONNABORTED);
434
435 old_call:
436         spin_unlock(&conn->channel_lock);
437         kmem_cache_free(rxrpc_call_jar, candidate);
438         _leave(" = -ECONNRESET [old]");
439         return ERR_PTR(-ECONNRESET);
440 }
441
442 /*
443  * Queue a call's work processor, getting a ref to pass to the work queue.
444  */
445 bool rxrpc_queue_call(struct rxrpc_call *call)
446 {
447         const void *here = __builtin_return_address(0);
448         int n = __atomic_add_unless(&call->usage, 1, 0);
449         int m = atomic_read(&call->skb_count);
450         if (n == 0)
451                 return false;
452         if (rxrpc_queue_work(&call->processor))
453                 trace_rxrpc_call(call, rxrpc_call_queued, n + 1, m, here, NULL);
454         else
455                 rxrpc_put_call(call, rxrpc_call_put_noqueue);
456         return true;
457 }
458
459 /*
460  * Queue a call's work processor, passing the callers ref to the work queue.
461  */
462 bool __rxrpc_queue_call(struct rxrpc_call *call)
463 {
464         const void *here = __builtin_return_address(0);
465         int n = atomic_read(&call->usage);
466         int m = atomic_read(&call->skb_count);
467         ASSERTCMP(n, >=, 1);
468         if (rxrpc_queue_work(&call->processor))
469                 trace_rxrpc_call(call, rxrpc_call_queued_ref, n, m, here, NULL);
470         else
471                 rxrpc_put_call(call, rxrpc_call_put_noqueue);
472         return true;
473 }
474
475 /*
476  * Note the re-emergence of a call.
477  */
478 void rxrpc_see_call(struct rxrpc_call *call)
479 {
480         const void *here = __builtin_return_address(0);
481         if (call) {
482                 int n = atomic_read(&call->usage);
483                 int m = atomic_read(&call->skb_count);
484
485                 trace_rxrpc_call(call, rxrpc_call_seen, n, m, here, NULL);
486         }
487 }
488
489 /*
490  * Note the addition of a ref on a call.
491  */
492 void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
493 {
494         const void *here = __builtin_return_address(0);
495         int n = atomic_inc_return(&call->usage);
496         int m = atomic_read(&call->skb_count);
497
498         trace_rxrpc_call(call, op, n, m, here, NULL);
499 }
500
501 /*
502  * Note the addition of a ref on a call for a socket buffer.
503  */
504 void rxrpc_get_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
505 {
506         const void *here = __builtin_return_address(0);
507         int n = atomic_inc_return(&call->usage);
508         int m = atomic_inc_return(&call->skb_count);
509
510         trace_rxrpc_call(call, rxrpc_call_got_skb, n, m, here, skb);
511 }
512
513 /*
514  * detach a call from a socket and set up for release
515  */
/*
 * Detach a call from a socket and set up for release.
 * - marks the call RELEASED (BUGs if already released), unlinks it from the
 *   peer's error list and from the socket's accept queue or rb-tree, aborts
 *   or final-ACKs it as appropriate, disconnects it from its connection,
 *   purges the Rx queues and stops all three timers
 * - takes rx->call_lock for writing; callers must not hold it
 */
void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
{
	_enter("{%d,%d,%d,%d}",
	       call->debug_id, atomic_read(&call->usage),
	       atomic_read(&call->ackr_not_idle),
	       call->rx_first_oos);

	rxrpc_see_call(call);

	/* A call may only be released once. */
	spin_lock_bh(&call->lock);
	if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
		BUG();
	spin_unlock_bh(&call->lock);

	/* dissociate from the socket
	 * - the socket's ref on the call is passed to the death timer
	 */
	_debug("RELEASE CALL %p (%d)", call, call->debug_id);

	if (call->peer) {
		spin_lock(&call->peer->lock);
		hlist_del_init(&call->error_link);
		spin_unlock(&call->peer->lock);
	}

	/* The call is either still awaiting acceptance (on an accept queue)
	 * or has a user ID (in the socket's rb-tree) — not both.
	 */
	write_lock_bh(&rx->call_lock);
	if (!list_empty(&call->accept_link)) {
		_debug("unlinking once-pending call %p { e=%lx f=%lx }",
		       call, call->events, call->flags);
		ASSERT(!test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
		list_del_init(&call->accept_link);
		sk_acceptq_removed(&rx->sk);
	} else if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		rb_erase(&call->sock_node, &rx->calls);
		/* Poison the node so stale use shows up in a crash dump */
		memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
		clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
		/* Drop the ref the rb-tree held on the call */
		rxrpc_put_call(call, rxrpc_call_put_userid);
	}
	write_unlock_bh(&rx->call_lock);

	/* free up the channel for reuse */
	if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK) {
		clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
		rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ACK);
		rxrpc_call_completed(call);
	} else {
		write_lock_bh(&call->state_lock);

		/* Any call not yet complete is aborted with RX_CALL_DEAD */
		if (call->state < RXRPC_CALL_COMPLETE) {
			_debug("+++ ABORTING STATE %d +++\n", call->state);
			__rxrpc_abort_call("SKT", call, 0, RX_CALL_DEAD, ECONNRESET);
			clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
			rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ABORT);
		}

		write_unlock_bh(&call->state_lock);
	}

	if (call->conn)
		rxrpc_disconnect_call(call);

	/* clean up the Rx queue */
	if (!skb_queue_empty(&call->rx_queue) ||
	    !skb_queue_empty(&call->rx_oos_queue)) {
		struct rxrpc_skb_priv *sp;
		struct sk_buff *skb;

		_debug("purge Rx queues");

		/* Drop call->lock around each free so we don't hold it
		 * across rxrpc_free_skb().
		 */
		spin_lock_bh(&call->lock);
		while ((skb = skb_dequeue(&call->rx_queue)) ||
		       (skb = skb_dequeue(&call->rx_oos_queue))) {
			spin_unlock_bh(&call->lock);

			sp = rxrpc_skb(skb);
			_debug("- zap %s %%%u #%u",
			       rxrpc_pkts[sp->hdr.type],
			       sp->hdr.serial, sp->hdr.seq);
			rxrpc_free_skb(skb);
			spin_lock_bh(&call->lock);
		}
		spin_unlock_bh(&call->lock);
	}
	rxrpc_purge_queue(&call->knlrecv_queue);

	del_timer_sync(&call->resend_timer);
	del_timer_sync(&call->ack_timer);
	del_timer_sync(&call->lifetimer);

	_leave("");
}
607
608 /*
609  * release all the calls associated with a socket
610  */
611 void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
612 {
613         struct rxrpc_call *call;
614         struct rb_node *p;
615
616         _enter("%p", rx);
617
618         read_lock_bh(&rx->call_lock);
619
620         /* kill the not-yet-accepted incoming calls */
621         list_for_each_entry(call, &rx->secureq, accept_link) {
622                 rxrpc_release_call(rx, call);
623         }
624
625         list_for_each_entry(call, &rx->acceptq, accept_link) {
626                 rxrpc_release_call(rx, call);
627         }
628
629         /* mark all the calls as no longer wanting incoming packets */
630         for (p = rb_first(&rx->calls); p; p = rb_next(p)) {
631                 call = rb_entry(p, struct rxrpc_call, sock_node);
632                 rxrpc_release_call(rx, call);
633         }
634
635         read_unlock_bh(&rx->call_lock);
636         _leave("");
637 }
638
639 /*
640  * release a call
641  */
642 void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
643 {
644         const void *here = __builtin_return_address(0);
645         int n, m;
646
647         ASSERT(call != NULL);
648
649         n = atomic_dec_return(&call->usage);
650         m = atomic_read(&call->skb_count);
651         trace_rxrpc_call(call, op, n, m, here, NULL);
652         ASSERTCMP(n, >=, 0);
653         if (n == 0) {
654                 _debug("call %d dead", call->debug_id);
655                 WARN_ON(m != 0);
656                 rxrpc_cleanup_call(call);
657         }
658 }
659
660 /*
661  * Release a call ref held by a socket buffer.
662  */
663 void rxrpc_put_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
664 {
665         const void *here = __builtin_return_address(0);
666         int n, m;
667
668         n = atomic_dec_return(&call->usage);
669         m = atomic_dec_return(&call->skb_count);
670         trace_rxrpc_call(call, rxrpc_call_put_skb, n, m, here, skb);
671         ASSERTCMP(n, >=, 0);
672         if (n == 0) {
673                 _debug("call %d dead", call->debug_id);
674                 WARN_ON(m != 0);
675                 rxrpc_cleanup_call(call);
676         }
677 }
678
679 /*
680  * Final call destruction under RCU.
681  */
/*
 * Final call destruction under RCU.
 * - runs after a grace period following rxrpc_cleanup_call()'s call_rcu(),
 *   so no RCU readers can still hold the call
 * - the queues must be purged and the peer ref dropped before the call
 *   struct itself (and with it call->acks_window's owner) is freed
 */
static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
{
	struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);

	rxrpc_purge_queue(&call->rx_queue);
	rxrpc_purge_queue(&call->knlrecv_queue);
	rxrpc_put_peer(call->peer);
	kmem_cache_free(rxrpc_call_jar, call);
}
691
692 /*
693  * clean up a call
694  */
/*
 * Clean up a call when its last ref has been dropped.
 * - the call must already be COMPLETE, RELEASED and disconnected (asserted
 *   below); called from rxrpc_put_call()/rxrpc_put_call_for_skb()
 * - drains the Tx ACK window, frees pending/queued skbs and defers the
 *   final free to RCU via rxrpc_rcu_destroy_call()
 */
static void rxrpc_cleanup_call(struct rxrpc_call *call)
{
	_net("DESTROY CALL %d", call->debug_id);

	/* Remove from the global list of calls */
	write_lock_bh(&rxrpc_call_lock);
	list_del_init(&call->link);
	write_unlock_bh(&rxrpc_call_lock);

	/* Poison the rb-tree node so stale use shows up in a crash dump */
	memset(&call->sock_node, 0xcd, sizeof(call->sock_node));

	del_timer_sync(&call->lifetimer);
	del_timer_sync(&call->ack_timer);
	del_timer_sync(&call->resend_timer);

	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
	ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
	ASSERT(!work_pending(&call->processor));
	ASSERTCMP(call->conn, ==, NULL);

	/* Free any transmitted-but-unacked skbs still in the ACK window.
	 * The window is a power-of-two ring buffer of pointer-sized slots;
	 * the bottom bit of each slot is a flag and must be masked off.
	 */
	if (call->acks_window) {
		_debug("kill Tx window %d",
		       CIRC_CNT(call->acks_head, call->acks_tail,
				call->acks_winsz));
		smp_mb();
		while (CIRC_CNT(call->acks_head, call->acks_tail,
				call->acks_winsz) > 0) {
			struct rxrpc_skb_priv *sp;
			unsigned long _skb;

			_skb = call->acks_window[call->acks_tail] & ~1;
			sp = rxrpc_skb((struct sk_buff *)_skb);
			_debug("+++ clear Tx %u", sp->hdr.seq);
			rxrpc_free_skb((struct sk_buff *)_skb);
			call->acks_tail =
				(call->acks_tail + 1) & (call->acks_winsz - 1);
		}

		kfree(call->acks_window);
	}

	rxrpc_free_skb(call->tx_pending);

	rxrpc_purge_queue(&call->rx_queue);
	ASSERT(skb_queue_empty(&call->rx_oos_queue));
	rxrpc_purge_queue(&call->knlrecv_queue);
	/* The struct itself is freed after an RCU grace period */
	call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
}
742
743 /*
744  * Make sure that all calls are gone.
745  */
/*
 * Make sure that all calls are gone at module unload.
 * - any call still on the global list at this point is leaked state: it is
 *   reported, unlinked and skipped (not freed here)
 */
void __exit rxrpc_destroy_all_calls(void)
{
	struct rxrpc_call *call;

	_enter("");

	/* NOTE(review): this emptiness check runs without rxrpc_call_lock;
	 * presumably safe because nothing else can run at __exit time —
	 * confirm.
	 */
	if (list_empty(&rxrpc_calls))
		return;
	
	write_lock_bh(&rxrpc_call_lock);

	while (!list_empty(&rxrpc_calls)) {
		call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
		_debug("Zapping call %p", call);

		rxrpc_see_call(call);
		list_del_init(&call->link);

		pr_err("Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
		       call, atomic_read(&call->usage),
		       atomic_read(&call->ackr_not_idle),
		       rxrpc_call_states[call->state],
		       call->flags, call->events);
		if (!skb_queue_empty(&call->rx_queue))
			pr_err("Rx queue occupied\n");
		if (!skb_queue_empty(&call->rx_oos_queue))
			pr_err("OOS queue occupied\n");

		/* Drop the lock briefly so we don't hog the CPU while
		 * churning through a long list.
		 */
		write_unlock_bh(&rxrpc_call_lock);
		cond_resched();
		write_lock_bh(&rxrpc_call_lock);
	}

	write_unlock_bh(&rxrpc_call_lock);
	_leave("");
}
782
783 /*
784  * handle call lifetime being exceeded
785  */
786 static void rxrpc_call_life_expired(unsigned long _call)
787 {
788         struct rxrpc_call *call = (struct rxrpc_call *) _call;
789
790         _enter("{%d}", call->debug_id);
791
792         rxrpc_see_call(call);
793         if (call->state >= RXRPC_CALL_COMPLETE)
794                 return;
795
796         set_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events);
797         rxrpc_queue_call(call);
798 }
799
800 /*
801  * handle resend timer expiry
802  * - may not take call->state_lock as this can deadlock against del_timer_sync()
803  */
804 static void rxrpc_resend_time_expired(unsigned long _call)
805 {
806         struct rxrpc_call *call = (struct rxrpc_call *) _call;
807
808         _enter("{%d}", call->debug_id);
809
810         rxrpc_see_call(call);
811         if (call->state >= RXRPC_CALL_COMPLETE)
812                 return;
813
814         clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
815         if (!test_and_set_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events))
816                 rxrpc_queue_call(call);
817 }
818
819 /*
820  * handle ACK timer expiry
821  */
822 static void rxrpc_ack_time_expired(unsigned long _call)
823 {
824         struct rxrpc_call *call = (struct rxrpc_call *) _call;
825
826         _enter("{%d}", call->debug_id);
827
828         rxrpc_see_call(call);
829         if (call->state >= RXRPC_CALL_COMPLETE)
830                 return;
831
832         if (!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
833                 rxrpc_queue_call(call);
834 }