rxrpc: Improve the call tracking tracepoint
[cascardo/linux.git] / net / rxrpc / call_object.c
1 /* RxRPC individual remote procedure call handling
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11
12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
13
14 #include <linux/slab.h>
15 #include <linux/module.h>
16 #include <linux/circ_buf.h>
17 #include <linux/spinlock_types.h>
18 #include <net/sock.h>
19 #include <net/af_rxrpc.h>
20 #include "ar-internal.h"
21
22 /*
23  * Maximum lifetime of a call (in jiffies).
24  */
25 unsigned int rxrpc_max_call_lifetime = 60 * HZ;
26
27 /*
28  * Time till dead call expires after last use (in jiffies).
29  */
30 unsigned int rxrpc_dead_call_expiry = 2 * HZ;
31
32 const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
33         [RXRPC_CALL_UNINITIALISED]              = "Uninit  ",
34         [RXRPC_CALL_CLIENT_AWAIT_CONN]          = "ClWtConn",
35         [RXRPC_CALL_CLIENT_SEND_REQUEST]        = "ClSndReq",
36         [RXRPC_CALL_CLIENT_AWAIT_REPLY]         = "ClAwtRpl",
37         [RXRPC_CALL_CLIENT_RECV_REPLY]          = "ClRcvRpl",
38         [RXRPC_CALL_CLIENT_FINAL_ACK]           = "ClFnlACK",
39         [RXRPC_CALL_SERVER_SECURING]            = "SvSecure",
40         [RXRPC_CALL_SERVER_ACCEPTING]           = "SvAccept",
41         [RXRPC_CALL_SERVER_RECV_REQUEST]        = "SvRcvReq",
42         [RXRPC_CALL_SERVER_ACK_REQUEST]         = "SvAckReq",
43         [RXRPC_CALL_SERVER_SEND_REPLY]          = "SvSndRpl",
44         [RXRPC_CALL_SERVER_AWAIT_ACK]           = "SvAwtACK",
45         [RXRPC_CALL_COMPLETE]                   = "Complete",
46         [RXRPC_CALL_DEAD]                       = "Dead    ",
47 };
48
49 const char *const rxrpc_call_completions[NR__RXRPC_CALL_COMPLETIONS] = {
50         [RXRPC_CALL_SUCCEEDED]                  = "Complete",
51         [RXRPC_CALL_SERVER_BUSY]                = "SvBusy  ",
52         [RXRPC_CALL_REMOTELY_ABORTED]           = "RmtAbort",
53         [RXRPC_CALL_LOCALLY_ABORTED]            = "LocAbort",
54         [RXRPC_CALL_LOCAL_ERROR]                = "LocError",
55         [RXRPC_CALL_NETWORK_ERROR]              = "NetError",
56 };
57
58 const char rxrpc_call_traces[rxrpc_call__nr_trace][4] = {
59         [rxrpc_call_new_client]         = "NWc",
60         [rxrpc_call_new_service]        = "NWs",
61         [rxrpc_call_queued]             = "QUE",
62         [rxrpc_call_queued_ref]         = "QUR",
63         [rxrpc_call_seen]               = "SEE",
64         [rxrpc_call_got]                = "GOT",
65         [rxrpc_call_got_skb]            = "Gsk",
66         [rxrpc_call_got_userid]         = "Gus",
67         [rxrpc_call_put]                = "PUT",
68         [rxrpc_call_put_skb]            = "Psk",
69         [rxrpc_call_put_userid]         = "Pus",
70         [rxrpc_call_put_noqueue]        = "PNQ",
71 };
72
73 struct kmem_cache *rxrpc_call_jar;
74 LIST_HEAD(rxrpc_calls);
75 DEFINE_RWLOCK(rxrpc_call_lock);
76
77 static void rxrpc_destroy_call(struct work_struct *work);
78 static void rxrpc_call_life_expired(unsigned long _call);
79 static void rxrpc_dead_call_expired(unsigned long _call);
80 static void rxrpc_ack_time_expired(unsigned long _call);
81 static void rxrpc_resend_time_expired(unsigned long _call);
82
83 /*
84  * find an extant server call
85  * - called in process context with IRQs enabled
86  */
87 struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *rx,
88                                               unsigned long user_call_ID)
89 {
90         struct rxrpc_call *call;
91         struct rb_node *p;
92
93         _enter("%p,%lx", rx, user_call_ID);
94
95         read_lock(&rx->call_lock);
96
97         p = rx->calls.rb_node;
98         while (p) {
99                 call = rb_entry(p, struct rxrpc_call, sock_node);
100
101                 if (user_call_ID < call->user_call_ID)
102                         p = p->rb_left;
103                 else if (user_call_ID > call->user_call_ID)
104                         p = p->rb_right;
105                 else
106                         goto found_extant_call;
107         }
108
109         read_unlock(&rx->call_lock);
110         _leave(" = NULL");
111         return NULL;
112
113 found_extant_call:
114         rxrpc_get_call(call, rxrpc_call_got);
115         read_unlock(&rx->call_lock);
116         _leave(" = %p [%d]", call, atomic_read(&call->usage));
117         return call;
118 }
119
120 /*
121  * allocate a new call
122  */
123 static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
124 {
125         struct rxrpc_call *call;
126
127         call = kmem_cache_zalloc(rxrpc_call_jar, gfp);
128         if (!call)
129                 return NULL;
130
131         call->acks_winsz = 16;
132         call->acks_window = kmalloc(call->acks_winsz * sizeof(unsigned long),
133                                     gfp);
134         if (!call->acks_window) {
135                 kmem_cache_free(rxrpc_call_jar, call);
136                 return NULL;
137         }
138
139         setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
140                     (unsigned long) call);
141         setup_timer(&call->deadspan, &rxrpc_dead_call_expired,
142                     (unsigned long) call);
143         setup_timer(&call->ack_timer, &rxrpc_ack_time_expired,
144                     (unsigned long) call);
145         setup_timer(&call->resend_timer, &rxrpc_resend_time_expired,
146                     (unsigned long) call);
147         INIT_WORK(&call->destroyer, &rxrpc_destroy_call);
148         INIT_WORK(&call->processor, &rxrpc_process_call);
149         INIT_LIST_HEAD(&call->link);
150         INIT_LIST_HEAD(&call->chan_wait_link);
151         INIT_LIST_HEAD(&call->accept_link);
152         skb_queue_head_init(&call->rx_queue);
153         skb_queue_head_init(&call->rx_oos_queue);
154         skb_queue_head_init(&call->knlrecv_queue);
155         init_waitqueue_head(&call->waitq);
156         spin_lock_init(&call->lock);
157         rwlock_init(&call->state_lock);
158         atomic_set(&call->usage, 1);
159         call->debug_id = atomic_inc_return(&rxrpc_debug_id);
160
161         memset(&call->sock_node, 0xed, sizeof(call->sock_node));
162
163         call->rx_data_expect = 1;
164         call->rx_data_eaten = 0;
165         call->rx_first_oos = 0;
166         call->ackr_win_top = call->rx_data_eaten + 1 + rxrpc_rx_window_size;
167         call->creation_jif = jiffies;
168         return call;
169 }
170
171 /*
172  * Allocate a new client call.
173  */
174 static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx,
175                                                   struct sockaddr_rxrpc *srx,
176                                                   gfp_t gfp)
177 {
178         struct rxrpc_call *call;
179
180         _enter("");
181
182         ASSERT(rx->local != NULL);
183
184         call = rxrpc_alloc_call(gfp);
185         if (!call)
186                 return ERR_PTR(-ENOMEM);
187         call->state = RXRPC_CALL_CLIENT_AWAIT_CONN;
188
189         sock_hold(&rx->sk);
190         call->socket = rx;
191         call->rx_data_post = 1;
192         call->service_id = srx->srx_service;
193
194         _leave(" = %p", call);
195         return call;
196 }
197
198 /*
199  * Begin client call.
200  */
201 static int rxrpc_begin_client_call(struct rxrpc_call *call,
202                                    struct rxrpc_conn_parameters *cp,
203                                    struct sockaddr_rxrpc *srx,
204                                    gfp_t gfp)
205 {
206         int ret;
207
208         /* Set up or get a connection record and set the protocol parameters,
209          * including channel number and call ID.
210          */
211         ret = rxrpc_connect_call(call, cp, srx, gfp);
212         if (ret < 0)
213                 return ret;
214
215         spin_lock(&call->conn->params.peer->lock);
216         hlist_add_head(&call->error_link, &call->conn->params.peer->error_targets);
217         spin_unlock(&call->conn->params.peer->lock);
218
219         call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
220         add_timer(&call->lifetimer);
221         return 0;
222 }
223
224 /*
225  * set up a call for the given data
226  * - called in process context with IRQs enabled
227  */
228 struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
229                                          struct rxrpc_conn_parameters *cp,
230                                          struct sockaddr_rxrpc *srx,
231                                          unsigned long user_call_ID,
232                                          gfp_t gfp)
233 {
234         struct rxrpc_call *call, *xcall;
235         struct rb_node *parent, **pp;
236         const void *here = __builtin_return_address(0);
237         int ret;
238
239         _enter("%p,%lx", rx, user_call_ID);
240
241         call = rxrpc_alloc_client_call(rx, srx, gfp);
242         if (IS_ERR(call)) {
243                 _leave(" = %ld", PTR_ERR(call));
244                 return call;
245         }
246
247         trace_rxrpc_call(call, 0, atomic_read(&call->usage), 0, here,
248                          (const void *)user_call_ID);
249
250         /* Publish the call, even though it is incompletely set up as yet */
251         call->user_call_ID = user_call_ID;
252         __set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
253
254         write_lock(&rx->call_lock);
255
256         pp = &rx->calls.rb_node;
257         parent = NULL;
258         while (*pp) {
259                 parent = *pp;
260                 xcall = rb_entry(parent, struct rxrpc_call, sock_node);
261
262                 if (user_call_ID < xcall->user_call_ID)
263                         pp = &(*pp)->rb_left;
264                 else if (user_call_ID > xcall->user_call_ID)
265                         pp = &(*pp)->rb_right;
266                 else
267                         goto found_user_ID_now_present;
268         }
269
270         rxrpc_get_call(call, rxrpc_call_got_userid);
271         rb_link_node(&call->sock_node, parent, pp);
272         rb_insert_color(&call->sock_node, &rx->calls);
273         write_unlock(&rx->call_lock);
274
275         write_lock_bh(&rxrpc_call_lock);
276         list_add_tail(&call->link, &rxrpc_calls);
277         write_unlock_bh(&rxrpc_call_lock);
278
279         ret = rxrpc_begin_client_call(call, cp, srx, gfp);
280         if (ret < 0)
281                 goto error;
282
283         _net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);
284
285         _leave(" = %p [new]", call);
286         return call;
287
288 error:
289         write_lock(&rx->call_lock);
290         rb_erase(&call->sock_node, &rx->calls);
291         write_unlock(&rx->call_lock);
292         rxrpc_put_call(call, rxrpc_call_put_userid);
293
294         write_lock_bh(&rxrpc_call_lock);
295         list_del_init(&call->link);
296         write_unlock_bh(&rxrpc_call_lock);
297
298         set_bit(RXRPC_CALL_RELEASED, &call->flags);
299         call->state = RXRPC_CALL_DEAD;
300         rxrpc_put_call(call, rxrpc_call_put);
301         _leave(" = %d", ret);
302         return ERR_PTR(ret);
303
304         /* We unexpectedly found the user ID in the list after taking
305          * the call_lock.  This shouldn't happen unless the user races
306          * with itself and tries to add the same user ID twice at the
307          * same time in different threads.
308          */
309 found_user_ID_now_present:
310         write_unlock(&rx->call_lock);
311         set_bit(RXRPC_CALL_RELEASED, &call->flags);
312         call->state = RXRPC_CALL_DEAD;
313         rxrpc_put_call(call, rxrpc_call_put);
314         _leave(" = -EEXIST [%p]", call);
315         return ERR_PTR(-EEXIST);
316 }
317
318 /*
319  * set up an incoming call
320  * - called in process context with IRQs enabled
321  */
322 struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
323                                        struct rxrpc_connection *conn,
324                                        struct sk_buff *skb)
325 {
326         struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
327         struct rxrpc_call *call, *candidate;
328         const void *here = __builtin_return_address(0);
329         u32 call_id, chan;
330
331         _enter(",%d", conn->debug_id);
332
333         ASSERT(rx != NULL);
334
335         candidate = rxrpc_alloc_call(GFP_NOIO);
336         if (!candidate)
337                 return ERR_PTR(-EBUSY);
338
339         trace_rxrpc_call(candidate, rxrpc_call_new_service,
340                          atomic_read(&candidate->usage), 0, here, NULL);
341
342         chan = sp->hdr.cid & RXRPC_CHANNELMASK;
343         candidate->socket       = rx;
344         candidate->conn         = conn;
345         candidate->peer         = conn->params.peer;
346         candidate->cid          = sp->hdr.cid;
347         candidate->call_id      = sp->hdr.callNumber;
348         candidate->rx_data_post = 0;
349         candidate->state        = RXRPC_CALL_SERVER_ACCEPTING;
350         candidate->flags        |= (1 << RXRPC_CALL_IS_SERVICE);
351         if (conn->security_ix > 0)
352                 candidate->state = RXRPC_CALL_SERVER_SECURING;
353
354         spin_lock(&conn->channel_lock);
355
356         /* set the channel for this call */
357         call = rcu_dereference_protected(conn->channels[chan].call,
358                                          lockdep_is_held(&conn->channel_lock));
359
360         _debug("channel[%u] is %p", candidate->cid & RXRPC_CHANNELMASK, call);
361         if (call && call->call_id == sp->hdr.callNumber) {
362                 /* already set; must've been a duplicate packet */
363                 _debug("extant call [%d]", call->state);
364                 ASSERTCMP(call->conn, ==, conn);
365
366                 read_lock(&call->state_lock);
367                 switch (call->state) {
368                 case RXRPC_CALL_LOCALLY_ABORTED:
369                         if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events))
370                                 rxrpc_queue_call(call);
371                 case RXRPC_CALL_REMOTELY_ABORTED:
372                         read_unlock(&call->state_lock);
373                         goto aborted_call;
374                 default:
375                         rxrpc_get_call(call, rxrpc_call_got);
376                         read_unlock(&call->state_lock);
377                         goto extant_call;
378                 }
379         }
380
381         if (call) {
382                 /* it seems the channel is still in use from the previous call
383                  * - ditch the old binding if its call is now complete */
384                 _debug("CALL: %u { %s }",
385                        call->debug_id, rxrpc_call_states[call->state]);
386
387                 if (call->state == RXRPC_CALL_COMPLETE) {
388                         __rxrpc_disconnect_call(conn, call);
389                 } else {
390                         spin_unlock(&conn->channel_lock);
391                         kmem_cache_free(rxrpc_call_jar, candidate);
392                         _leave(" = -EBUSY");
393                         return ERR_PTR(-EBUSY);
394                 }
395         }
396
397         /* check the call number isn't duplicate */
398         _debug("check dup");
399         call_id = sp->hdr.callNumber;
400
401         /* We just ignore calls prior to the current call ID.  Terminated calls
402          * are handled via the connection.
403          */
404         if (call_id <= conn->channels[chan].call_counter)
405                 goto old_call; /* TODO: Just drop packet */
406
407         /* make the call available */
408         _debug("new call");
409         call = candidate;
410         candidate = NULL;
411         conn->channels[chan].call_counter = call_id;
412         rcu_assign_pointer(conn->channels[chan].call, call);
413         sock_hold(&rx->sk);
414         rxrpc_get_connection(conn);
415         rxrpc_get_peer(call->peer);
416         spin_unlock(&conn->channel_lock);
417
418         spin_lock(&conn->params.peer->lock);
419         hlist_add_head(&call->error_link, &conn->params.peer->error_targets);
420         spin_unlock(&conn->params.peer->lock);
421
422         write_lock_bh(&rxrpc_call_lock);
423         list_add_tail(&call->link, &rxrpc_calls);
424         write_unlock_bh(&rxrpc_call_lock);
425
426         call->service_id = conn->params.service_id;
427
428         _net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);
429
430         call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
431         add_timer(&call->lifetimer);
432         _leave(" = %p {%d} [new]", call, call->debug_id);
433         return call;
434
435 extant_call:
436         spin_unlock(&conn->channel_lock);
437         kmem_cache_free(rxrpc_call_jar, candidate);
438         _leave(" = %p {%d} [extant]", call, call ? call->debug_id : -1);
439         return call;
440
441 aborted_call:
442         spin_unlock(&conn->channel_lock);
443         kmem_cache_free(rxrpc_call_jar, candidate);
444         _leave(" = -ECONNABORTED");
445         return ERR_PTR(-ECONNABORTED);
446
447 old_call:
448         spin_unlock(&conn->channel_lock);
449         kmem_cache_free(rxrpc_call_jar, candidate);
450         _leave(" = -ECONNRESET [old]");
451         return ERR_PTR(-ECONNRESET);
452 }
453
454 /*
455  * Note the re-emergence of a call.
456  */
457 void rxrpc_see_call(struct rxrpc_call *call)
458 {
459         const void *here = __builtin_return_address(0);
460         if (call) {
461                 int n = atomic_read(&call->usage);
462                 int m = atomic_read(&call->skb_count);
463
464                 trace_rxrpc_call(call, rxrpc_call_seen, n, m, here, NULL);
465         }
466 }
467
468 /*
469  * Note the addition of a ref on a call.
470  */
471 void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
472 {
473         const void *here = __builtin_return_address(0);
474         int n = atomic_inc_return(&call->usage);
475         int m = atomic_read(&call->skb_count);
476
477         trace_rxrpc_call(call, op, n, m, here, NULL);
478 }
479
480 /*
481  * Note the addition of a ref on a call for a socket buffer.
482  */
483 void rxrpc_get_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
484 {
485         const void *here = __builtin_return_address(0);
486         int n = atomic_inc_return(&call->usage);
487         int m = atomic_inc_return(&call->skb_count);
488
489         trace_rxrpc_call(call, rxrpc_call_got_skb, n, m, here, skb);
490 }
491
492 /*
493  * detach a call from a socket and set up for release
494  */
495 void rxrpc_release_call(struct rxrpc_call *call)
496 {
497         struct rxrpc_connection *conn = call->conn;
498         struct rxrpc_sock *rx = call->socket;
499
500         _enter("{%d,%d,%d,%d}",
501                call->debug_id, atomic_read(&call->usage),
502                atomic_read(&call->ackr_not_idle),
503                call->rx_first_oos);
504
505         rxrpc_see_call(call);
506
507         spin_lock_bh(&call->lock);
508         if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
509                 BUG();
510         spin_unlock_bh(&call->lock);
511
512         /* dissociate from the socket
513          * - the socket's ref on the call is passed to the death timer
514          */
515         _debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
516
517         spin_lock(&conn->params.peer->lock);
518         hlist_del_init(&call->error_link);
519         spin_unlock(&conn->params.peer->lock);
520
521         write_lock_bh(&rx->call_lock);
522         if (!list_empty(&call->accept_link)) {
523                 _debug("unlinking once-pending call %p { e=%lx f=%lx }",
524                        call, call->events, call->flags);
525                 ASSERT(!test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
526                 list_del_init(&call->accept_link);
527                 sk_acceptq_removed(&rx->sk);
528         } else if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
529                 rb_erase(&call->sock_node, &rx->calls);
530                 memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
531                 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
532         }
533         write_unlock_bh(&rx->call_lock);
534
535         /* free up the channel for reuse */
536         write_lock_bh(&call->state_lock);
537
538         if (call->state < RXRPC_CALL_COMPLETE &&
539             call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
540                 _debug("+++ ABORTING STATE %d +++\n", call->state);
541                 __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
542         }
543         write_unlock_bh(&call->state_lock);
544
545         rxrpc_disconnect_call(call);
546
547         /* clean up the Rx queue */
548         if (!skb_queue_empty(&call->rx_queue) ||
549             !skb_queue_empty(&call->rx_oos_queue)) {
550                 struct rxrpc_skb_priv *sp;
551                 struct sk_buff *skb;
552
553                 _debug("purge Rx queues");
554
555                 spin_lock_bh(&call->lock);
556                 while ((skb = skb_dequeue(&call->rx_queue)) ||
557                        (skb = skb_dequeue(&call->rx_oos_queue))) {
558                         spin_unlock_bh(&call->lock);
559
560                         sp = rxrpc_skb(skb);
561                         _debug("- zap %s %%%u #%u",
562                                rxrpc_pkts[sp->hdr.type],
563                                sp->hdr.serial, sp->hdr.seq);
564                         rxrpc_free_skb(skb);
565                         spin_lock_bh(&call->lock);
566                 }
567                 spin_unlock_bh(&call->lock);
568         }
569
570         del_timer_sync(&call->resend_timer);
571         del_timer_sync(&call->ack_timer);
572         del_timer_sync(&call->lifetimer);
573         call->deadspan.expires = jiffies + rxrpc_dead_call_expiry;
574         add_timer(&call->deadspan);
575
576         _leave("");
577 }
578
579 /*
580  * handle a dead call being ready for reaping
581  */
582 static void rxrpc_dead_call_expired(unsigned long _call)
583 {
584         struct rxrpc_call *call = (struct rxrpc_call *) _call;
585
586         _enter("{%d}", call->debug_id);
587
588         rxrpc_see_call(call);
589         write_lock_bh(&call->state_lock);
590         call->state = RXRPC_CALL_DEAD;
591         write_unlock_bh(&call->state_lock);
592         rxrpc_put_call(call, rxrpc_call_put);
593 }
594
595 /*
596  * mark a call as to be released, aborting it if it's still in progress
597  * - called with softirqs disabled
598  */
599 static void rxrpc_mark_call_released(struct rxrpc_call *call)
600 {
601         bool sched = false;
602
603         rxrpc_see_call(call);
604         write_lock(&call->state_lock);
605         if (call->state < RXRPC_CALL_DEAD) {
606                 sched = __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
607                 if (!test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
608                         sched = true;
609         }
610         write_unlock(&call->state_lock);
611         if (sched)
612                 rxrpc_queue_call(call);
613 }
614
615 /*
616  * release all the calls associated with a socket
617  */
618 void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
619 {
620         struct rxrpc_call *call;
621         struct rb_node *p;
622
623         _enter("%p", rx);
624
625         read_lock_bh(&rx->call_lock);
626
627         /* kill the not-yet-accepted incoming calls */
628         list_for_each_entry(call, &rx->secureq, accept_link) {
629                 rxrpc_mark_call_released(call);
630         }
631
632         list_for_each_entry(call, &rx->acceptq, accept_link) {
633                 rxrpc_mark_call_released(call);
634         }
635
636         /* mark all the calls as no longer wanting incoming packets */
637         for (p = rb_first(&rx->calls); p; p = rb_next(p)) {
638                 call = rb_entry(p, struct rxrpc_call, sock_node);
639                 rxrpc_mark_call_released(call);
640         }
641
642         read_unlock_bh(&rx->call_lock);
643         _leave("");
644 }
645
646 /*
647  * release a call
648  */
649 void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
650 {
651         const void *here = __builtin_return_address(0);
652         int n, m;
653
654         ASSERT(call != NULL);
655
656         n = atomic_dec_return(&call->usage);
657         m = atomic_read(&call->skb_count);
658         trace_rxrpc_call(call, op, n, m, here, NULL);
659         ASSERTCMP(n, >=, 0);
660         if (n == 0) {
661                 _debug("call %d dead", call->debug_id);
662                 WARN_ON(m != 0);
663                 ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
664                 rxrpc_queue_work(&call->destroyer);
665         }
666 }
667
668 /*
669  * Release a call ref held by a socket buffer.
670  */
671 void rxrpc_put_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
672 {
673         const void *here = __builtin_return_address(0);
674         int n, m;
675
676         n = atomic_dec_return(&call->usage);
677         m = atomic_dec_return(&call->skb_count);
678         trace_rxrpc_call(call, rxrpc_call_put_skb, n, m, here, skb);
679         ASSERTCMP(n, >=, 0);
680         if (n == 0) {
681                 _debug("call %d dead", call->debug_id);
682                 WARN_ON(m != 0);
683                 ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
684                 rxrpc_queue_work(&call->destroyer);
685         }
686 }
687
688 /*
689  * Final call destruction under RCU.
690  */
691 static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
692 {
693         struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
694
695         rxrpc_purge_queue(&call->rx_queue);
696         rxrpc_purge_queue(&call->knlrecv_queue);
697         rxrpc_put_peer(call->peer);
698         kmem_cache_free(rxrpc_call_jar, call);
699 }
700
701 /*
702  * clean up a call
703  */
704 static void rxrpc_cleanup_call(struct rxrpc_call *call)
705 {
706         _net("DESTROY CALL %d", call->debug_id);
707
708         ASSERT(call->socket);
709
710         memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
711
712         del_timer_sync(&call->lifetimer);
713         del_timer_sync(&call->deadspan);
714         del_timer_sync(&call->ack_timer);
715         del_timer_sync(&call->resend_timer);
716
717         ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
718         ASSERTCMP(call->events, ==, 0);
719         if (work_pending(&call->processor)) {
720                 _debug("defer destroy");
721                 rxrpc_queue_work(&call->destroyer);
722                 return;
723         }
724
725         ASSERTCMP(call->conn, ==, NULL);
726
727         if (call->acks_window) {
728                 _debug("kill Tx window %d",
729                        CIRC_CNT(call->acks_head, call->acks_tail,
730                                 call->acks_winsz));
731                 smp_mb();
732                 while (CIRC_CNT(call->acks_head, call->acks_tail,
733                                 call->acks_winsz) > 0) {
734                         struct rxrpc_skb_priv *sp;
735                         unsigned long _skb;
736
737                         _skb = call->acks_window[call->acks_tail] & ~1;
738                         sp = rxrpc_skb((struct sk_buff *)_skb);
739                         _debug("+++ clear Tx %u", sp->hdr.seq);
740                         rxrpc_free_skb((struct sk_buff *)_skb);
741                         call->acks_tail =
742                                 (call->acks_tail + 1) & (call->acks_winsz - 1);
743                 }
744
745                 kfree(call->acks_window);
746         }
747
748         rxrpc_free_skb(call->tx_pending);
749
750         rxrpc_purge_queue(&call->rx_queue);
751         ASSERT(skb_queue_empty(&call->rx_oos_queue));
752         rxrpc_purge_queue(&call->knlrecv_queue);
753         sock_put(&call->socket->sk);
754         call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
755 }
756
757 /*
758  * destroy a call
759  */
760 static void rxrpc_destroy_call(struct work_struct *work)
761 {
762         struct rxrpc_call *call =
763                 container_of(work, struct rxrpc_call, destroyer);
764
765         _enter("%p{%d,%x,%p}",
766                call, atomic_read(&call->usage), call->cid, call->conn);
767
768         ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
769
770         write_lock_bh(&rxrpc_call_lock);
771         list_del_init(&call->link);
772         write_unlock_bh(&rxrpc_call_lock);
773
774         rxrpc_cleanup_call(call);
775         _leave("");
776 }
777
778 /*
779  * preemptively destroy all the call records from a transport endpoint rather
780  * than waiting for them to time out
781  */
782 void __exit rxrpc_destroy_all_calls(void)
783 {
784         struct rxrpc_call *call;
785
786         _enter("");
787         write_lock_bh(&rxrpc_call_lock);
788
789         while (!list_empty(&rxrpc_calls)) {
790                 call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
791                 _debug("Zapping call %p", call);
792
793                 rxrpc_see_call(call);
794                 list_del_init(&call->link);
795
796                 switch (atomic_read(&call->usage)) {
797                 case 0:
798                         ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
799                         break;
800                 case 1:
801                         if (del_timer_sync(&call->deadspan) != 0 &&
802                             call->state != RXRPC_CALL_DEAD)
803                                 rxrpc_dead_call_expired((unsigned long) call);
804                         if (call->state != RXRPC_CALL_DEAD)
805                                 break;
806                 default:
807                         pr_err("Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
808                                call, atomic_read(&call->usage),
809                                atomic_read(&call->ackr_not_idle),
810                                rxrpc_call_states[call->state],
811                                call->flags, call->events);
812                         if (!skb_queue_empty(&call->rx_queue))
813                                 pr_err("Rx queue occupied\n");
814                         if (!skb_queue_empty(&call->rx_oos_queue))
815                                 pr_err("OOS queue occupied\n");
816                         break;
817                 }
818
819                 write_unlock_bh(&rxrpc_call_lock);
820                 cond_resched();
821                 write_lock_bh(&rxrpc_call_lock);
822         }
823
824         write_unlock_bh(&rxrpc_call_lock);
825         _leave("");
826 }
827
828 /*
829  * handle call lifetime being exceeded
830  */
831 static void rxrpc_call_life_expired(unsigned long _call)
832 {
833         struct rxrpc_call *call = (struct rxrpc_call *) _call;
834
835         _enter("{%d}", call->debug_id);
836
837         rxrpc_see_call(call);
838         if (call->state >= RXRPC_CALL_COMPLETE)
839                 return;
840
841         set_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events);
842         rxrpc_queue_call(call);
843 }
844
845 /*
846  * handle resend timer expiry
847  * - may not take call->state_lock as this can deadlock against del_timer_sync()
848  */
849 static void rxrpc_resend_time_expired(unsigned long _call)
850 {
851         struct rxrpc_call *call = (struct rxrpc_call *) _call;
852
853         _enter("{%d}", call->debug_id);
854
855         rxrpc_see_call(call);
856         if (call->state >= RXRPC_CALL_COMPLETE)
857                 return;
858
859         clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
860         if (!test_and_set_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events))
861                 rxrpc_queue_call(call);
862 }
863
864 /*
865  * handle ACK timer expiry
866  */
867 static void rxrpc_ack_time_expired(unsigned long _call)
868 {
869         struct rxrpc_call *call = (struct rxrpc_call *) _call;
870
871         _enter("{%d}", call->debug_id);
872
873         rxrpc_see_call(call);
874         if (call->state >= RXRPC_CALL_COMPLETE)
875                 return;
876
877         if (!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
878                 rxrpc_queue_call(call);
879 }