iommu/amd: No need to wait iommu completion if no dte irq entry change
[cascardo/linux.git] / fs / afs / rxrpc.c
1 /* Maintain an RxRPC server socket to do AFS communications through
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11
12 #include <linux/slab.h>
13 #include <net/sock.h>
14 #include <net/af_rxrpc.h>
15 #include <rxrpc/packet.h>
16 #include "internal.h"
17 #include "afs_cm.h"
18
19 static struct socket *afs_socket; /* my RxRPC socket */
20 static struct workqueue_struct *afs_async_calls;
21 static atomic_t afs_outstanding_calls;
22 static atomic_t afs_outstanding_skbs;
23
24 static void afs_wake_up_call_waiter(struct afs_call *);
25 static int afs_wait_for_call_to_complete(struct afs_call *);
26 static void afs_wake_up_async_call(struct afs_call *);
27 static int afs_dont_wait_for_call_to_complete(struct afs_call *);
28 static void afs_process_async_call(struct afs_call *);
29 static void afs_rx_interceptor(struct sock *, unsigned long, struct sk_buff *);
30 static int afs_deliver_cm_op_id(struct afs_call *, struct sk_buff *, bool);
31
32 /* synchronous call management */
33 const struct afs_wait_mode afs_sync_call = {
34         .rx_wakeup      = afs_wake_up_call_waiter,
35         .wait           = afs_wait_for_call_to_complete,
36 };
37
38 /* asynchronous call management */
39 const struct afs_wait_mode afs_async_call = {
40         .rx_wakeup      = afs_wake_up_async_call,
41         .wait           = afs_dont_wait_for_call_to_complete,
42 };
43
44 /* asynchronous incoming call management */
45 static const struct afs_wait_mode afs_async_incoming_call = {
46         .rx_wakeup      = afs_wake_up_async_call,
47 };
48
49 /* asynchronous incoming call initial processing */
50 static const struct afs_call_type afs_RXCMxxxx = {
51         .name           = "CB.xxxx",
52         .deliver        = afs_deliver_cm_op_id,
53         .abort_to_error = afs_abort_to_error,
54 };
55
56 static void afs_collect_incoming_call(struct work_struct *);
57
58 static struct sk_buff_head afs_incoming_calls;
59 static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call);
60
61 static void afs_async_workfn(struct work_struct *work)
62 {
63         struct afs_call *call = container_of(work, struct afs_call, async_work);
64
65         call->async_workfn(call);
66 }
67
68 static int afs_wait_atomic_t(atomic_t *p)
69 {
70         schedule();
71         return 0;
72 }
73
74 /*
75  * open an RxRPC socket and bind it to be a server for callback notifications
76  * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
77  */
78 int afs_open_socket(void)
79 {
80         struct sockaddr_rxrpc srx;
81         struct socket *socket;
82         int ret;
83
84         _enter("");
85
86         skb_queue_head_init(&afs_incoming_calls);
87
88         ret = -ENOMEM;
89         afs_async_calls = create_singlethread_workqueue("kafsd");
90         if (!afs_async_calls)
91                 goto error_0;
92
93         ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET, &socket);
94         if (ret < 0)
95                 goto error_1;
96
97         socket->sk->sk_allocation = GFP_NOFS;
98
99         /* bind the callback manager's address to make this a server socket */
100         srx.srx_family                  = AF_RXRPC;
101         srx.srx_service                 = CM_SERVICE;
102         srx.transport_type              = SOCK_DGRAM;
103         srx.transport_len               = sizeof(srx.transport.sin);
104         srx.transport.sin.sin_family    = AF_INET;
105         srx.transport.sin.sin_port      = htons(AFS_CM_PORT);
106         memset(&srx.transport.sin.sin_addr, 0,
107                sizeof(srx.transport.sin.sin_addr));
108
109         ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
110         if (ret < 0)
111                 goto error_2;
112
113         ret = kernel_listen(socket, INT_MAX);
114         if (ret < 0)
115                 goto error_2;
116
117         rxrpc_kernel_intercept_rx_messages(socket, afs_rx_interceptor);
118
119         afs_socket = socket;
120         _leave(" = 0");
121         return 0;
122
123 error_2:
124         sock_release(socket);
125 error_1:
126         destroy_workqueue(afs_async_calls);
127 error_0:
128         _leave(" = %d", ret);
129         return ret;
130 }
131
132 /*
133  * close the RxRPC socket AFS was using
134  */
135 void afs_close_socket(void)
136 {
137         _enter("");
138
139         wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t,
140                          TASK_UNINTERRUPTIBLE);
141         _debug("no outstanding calls");
142
143         sock_release(afs_socket);
144
145         _debug("dework");
146         destroy_workqueue(afs_async_calls);
147
148         ASSERTCMP(atomic_read(&afs_outstanding_skbs), ==, 0);
149         _leave("");
150 }
151
152 /*
153  * note that the data in a socket buffer is now delivered and that the buffer
154  * should be freed
155  */
156 static void afs_data_delivered(struct sk_buff *skb)
157 {
158         if (!skb) {
159                 _debug("DLVR NULL [%d]", atomic_read(&afs_outstanding_skbs));
160                 dump_stack();
161         } else {
162                 _debug("DLVR %p{%u} [%d]",
163                        skb, skb->mark, atomic_read(&afs_outstanding_skbs));
164                 if (atomic_dec_return(&afs_outstanding_skbs) == -1)
165                         BUG();
166                 rxrpc_kernel_data_delivered(skb);
167         }
168 }
169
170 /*
171  * free a socket buffer
172  */
173 static void afs_free_skb(struct sk_buff *skb)
174 {
175         if (!skb) {
176                 _debug("FREE NULL [%d]", atomic_read(&afs_outstanding_skbs));
177                 dump_stack();
178         } else {
179                 _debug("FREE %p{%u} [%d]",
180                        skb, skb->mark, atomic_read(&afs_outstanding_skbs));
181                 if (atomic_dec_return(&afs_outstanding_skbs) == -1)
182                         BUG();
183                 rxrpc_kernel_free_skb(skb);
184         }
185 }
186
187 /*
188  * free a call
189  */
190 static void afs_free_call(struct afs_call *call)
191 {
192         _debug("DONE %p{%s} [%d]",
193                call, call->type->name, atomic_read(&afs_outstanding_calls));
194
195         ASSERTCMP(call->rxcall, ==, NULL);
196         ASSERT(!work_pending(&call->async_work));
197         ASSERT(skb_queue_empty(&call->rx_queue));
198         ASSERT(call->type->name != NULL);
199
200         kfree(call->request);
201         kfree(call);
202
203         if (atomic_dec_and_test(&afs_outstanding_calls))
204                 wake_up_atomic_t(&afs_outstanding_calls);
205 }
206
207 /*
208  * End a call but do not free it
209  */
210 static void afs_end_call_nofree(struct afs_call *call)
211 {
212         if (call->rxcall) {
213                 rxrpc_kernel_end_call(call->rxcall);
214                 call->rxcall = NULL;
215         }
216         if (call->type->destructor)
217                 call->type->destructor(call);
218 }
219
220 /*
221  * End a call and free it
222  */
223 static void afs_end_call(struct afs_call *call)
224 {
225         afs_end_call_nofree(call);
226         afs_free_call(call);
227 }
228
229 /*
230  * allocate a call with flat request and reply buffers
231  */
232 struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
233                                      size_t request_size, size_t reply_size)
234 {
235         struct afs_call *call;
236
237         call = kzalloc(sizeof(*call), GFP_NOFS);
238         if (!call)
239                 goto nomem_call;
240
241         _debug("CALL %p{%s} [%d]",
242                call, type->name, atomic_read(&afs_outstanding_calls));
243         atomic_inc(&afs_outstanding_calls);
244
245         call->type = type;
246         call->request_size = request_size;
247         call->reply_max = reply_size;
248
249         if (request_size) {
250                 call->request = kmalloc(request_size, GFP_NOFS);
251                 if (!call->request)
252                         goto nomem_free;
253         }
254
255         if (reply_size) {
256                 call->buffer = kmalloc(reply_size, GFP_NOFS);
257                 if (!call->buffer)
258                         goto nomem_free;
259         }
260
261         init_waitqueue_head(&call->waitq);
262         skb_queue_head_init(&call->rx_queue);
263         return call;
264
265 nomem_free:
266         afs_free_call(call);
267 nomem_call:
268         return NULL;
269 }
270
271 /*
272  * clean up a call with flat buffer
273  */
274 void afs_flat_call_destructor(struct afs_call *call)
275 {
276         _enter("");
277
278         kfree(call->request);
279         call->request = NULL;
280         kfree(call->buffer);
281         call->buffer = NULL;
282 }
283
284 /*
285  * attach the data from a bunch of pages on an inode to a call
286  */
287 static int afs_send_pages(struct afs_call *call, struct msghdr *msg,
288                           struct kvec *iov)
289 {
290         struct page *pages[8];
291         unsigned count, n, loop, offset, to;
292         pgoff_t first = call->first, last = call->last;
293         int ret;
294
295         _enter("");
296
297         offset = call->first_offset;
298         call->first_offset = 0;
299
300         do {
301                 _debug("attach %lx-%lx", first, last);
302
303                 count = last - first + 1;
304                 if (count > ARRAY_SIZE(pages))
305                         count = ARRAY_SIZE(pages);
306                 n = find_get_pages_contig(call->mapping, first, count, pages);
307                 ASSERTCMP(n, ==, count);
308
309                 loop = 0;
310                 do {
311                         msg->msg_flags = 0;
312                         to = PAGE_SIZE;
313                         if (first + loop >= last)
314                                 to = call->last_to;
315                         else
316                                 msg->msg_flags = MSG_MORE;
317                         iov->iov_base = kmap(pages[loop]) + offset;
318                         iov->iov_len = to - offset;
319                         offset = 0;
320
321                         _debug("- range %u-%u%s",
322                                offset, to, msg->msg_flags ? " [more]" : "");
323                         iov_iter_kvec(&msg->msg_iter, WRITE | ITER_KVEC,
324                                       iov, 1, to - offset);
325
326                         /* have to change the state *before* sending the last
327                          * packet as RxRPC might give us the reply before it
328                          * returns from sending the request */
329                         if (first + loop >= last)
330                                 call->state = AFS_CALL_AWAIT_REPLY;
331                         ret = rxrpc_kernel_send_data(call->rxcall, msg,
332                                                      to - offset);
333                         kunmap(pages[loop]);
334                         if (ret < 0)
335                                 break;
336                 } while (++loop < count);
337                 first += count;
338
339                 for (loop = 0; loop < count; loop++)
340                         put_page(pages[loop]);
341                 if (ret < 0)
342                         break;
343         } while (first <= last);
344
345         _leave(" = %d", ret);
346         return ret;
347 }
348
349 /*
350  * initiate a call
351  */
352 int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
353                   const struct afs_wait_mode *wait_mode)
354 {
355         struct sockaddr_rxrpc srx;
356         struct rxrpc_call *rxcall;
357         struct msghdr msg;
358         struct kvec iov[1];
359         int ret;
360         struct sk_buff *skb;
361
362         _enter("%x,{%d},", addr->s_addr, ntohs(call->port));
363
364         ASSERT(call->type != NULL);
365         ASSERT(call->type->name != NULL);
366
367         _debug("____MAKE %p{%s,%x} [%d]____",
368                call, call->type->name, key_serial(call->key),
369                atomic_read(&afs_outstanding_calls));
370
371         call->wait_mode = wait_mode;
372         call->async_workfn = afs_process_async_call;
373         INIT_WORK(&call->async_work, afs_async_workfn);
374
375         memset(&srx, 0, sizeof(srx));
376         srx.srx_family = AF_RXRPC;
377         srx.srx_service = call->service_id;
378         srx.transport_type = SOCK_DGRAM;
379         srx.transport_len = sizeof(srx.transport.sin);
380         srx.transport.sin.sin_family = AF_INET;
381         srx.transport.sin.sin_port = call->port;
382         memcpy(&srx.transport.sin.sin_addr, addr, 4);
383
384         /* create a call */
385         rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key,
386                                          (unsigned long) call, gfp);
387         call->key = NULL;
388         if (IS_ERR(rxcall)) {
389                 ret = PTR_ERR(rxcall);
390                 goto error_kill_call;
391         }
392
393         call->rxcall = rxcall;
394
395         /* send the request */
396         iov[0].iov_base = call->request;
397         iov[0].iov_len  = call->request_size;
398
399         msg.msg_name            = NULL;
400         msg.msg_namelen         = 0;
401         iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1,
402                       call->request_size);
403         msg.msg_control         = NULL;
404         msg.msg_controllen      = 0;
405         msg.msg_flags           = (call->send_pages ? MSG_MORE : 0);
406
407         /* have to change the state *before* sending the last packet as RxRPC
408          * might give us the reply before it returns from sending the
409          * request */
410         if (!call->send_pages)
411                 call->state = AFS_CALL_AWAIT_REPLY;
412         ret = rxrpc_kernel_send_data(rxcall, &msg, call->request_size);
413         if (ret < 0)
414                 goto error_do_abort;
415
416         if (call->send_pages) {
417                 ret = afs_send_pages(call, &msg, iov);
418                 if (ret < 0)
419                         goto error_do_abort;
420         }
421
422         /* at this point, an async call may no longer exist as it may have
423          * already completed */
424         return wait_mode->wait(call);
425
426 error_do_abort:
427         rxrpc_kernel_abort_call(rxcall, RX_USER_ABORT);
428         while ((skb = skb_dequeue(&call->rx_queue)))
429                 afs_free_skb(skb);
430 error_kill_call:
431         afs_end_call(call);
432         _leave(" = %d", ret);
433         return ret;
434 }
435
436 /*
437  * Handles intercepted messages that were arriving in the socket's Rx queue.
438  *
439  * Called from the AF_RXRPC call processor in waitqueue process context.  For
440  * each call, it is guaranteed this will be called in order of packet to be
441  * delivered.
442  */
443 static void afs_rx_interceptor(struct sock *sk, unsigned long user_call_ID,
444                                struct sk_buff *skb)
445 {
446         struct afs_call *call = (struct afs_call *) user_call_ID;
447
448         _enter("%p,,%u", call, skb->mark);
449
450         _debug("ICPT %p{%u} [%d]",
451                skb, skb->mark, atomic_read(&afs_outstanding_skbs));
452
453         ASSERTCMP(sk, ==, afs_socket->sk);
454         atomic_inc(&afs_outstanding_skbs);
455
456         if (!call) {
457                 /* its an incoming call for our callback service */
458                 skb_queue_tail(&afs_incoming_calls, skb);
459                 queue_work(afs_wq, &afs_collect_incoming_call_work);
460         } else {
461                 /* route the messages directly to the appropriate call */
462                 skb_queue_tail(&call->rx_queue, skb);
463                 call->wait_mode->rx_wakeup(call);
464         }
465
466         _leave("");
467 }
468
469 /*
470  * deliver messages to a call
471  */
472 static void afs_deliver_to_call(struct afs_call *call)
473 {
474         struct sk_buff *skb;
475         bool last;
476         u32 abort_code;
477         int ret;
478
479         _enter("");
480
481         while ((call->state == AFS_CALL_AWAIT_REPLY ||
482                 call->state == AFS_CALL_AWAIT_OP_ID ||
483                 call->state == AFS_CALL_AWAIT_REQUEST ||
484                 call->state == AFS_CALL_AWAIT_ACK) &&
485                (skb = skb_dequeue(&call->rx_queue))) {
486                 switch (skb->mark) {
487                 case RXRPC_SKB_MARK_DATA:
488                         _debug("Rcv DATA");
489                         last = rxrpc_kernel_is_data_last(skb);
490                         ret = call->type->deliver(call, skb, last);
491                         switch (ret) {
492                         case 0:
493                                 if (last &&
494                                     call->state == AFS_CALL_AWAIT_REPLY)
495                                         call->state = AFS_CALL_COMPLETE;
496                                 break;
497                         case -ENOTCONN:
498                                 abort_code = RX_CALL_DEAD;
499                                 goto do_abort;
500                         case -ENOTSUPP:
501                                 abort_code = RX_INVALID_OPERATION;
502                                 goto do_abort;
503                         default:
504                                 abort_code = RXGEN_CC_UNMARSHAL;
505                                 if (call->state != AFS_CALL_AWAIT_REPLY)
506                                         abort_code = RXGEN_SS_UNMARSHAL;
507                         do_abort:
508                                 rxrpc_kernel_abort_call(call->rxcall,
509                                                         abort_code);
510                                 call->error = ret;
511                                 call->state = AFS_CALL_ERROR;
512                                 break;
513                         }
514                         afs_data_delivered(skb);
515                         skb = NULL;
516                         continue;
517                 case RXRPC_SKB_MARK_FINAL_ACK:
518                         _debug("Rcv ACK");
519                         call->state = AFS_CALL_COMPLETE;
520                         break;
521                 case RXRPC_SKB_MARK_BUSY:
522                         _debug("Rcv BUSY");
523                         call->error = -EBUSY;
524                         call->state = AFS_CALL_BUSY;
525                         break;
526                 case RXRPC_SKB_MARK_REMOTE_ABORT:
527                         abort_code = rxrpc_kernel_get_abort_code(skb);
528                         call->error = call->type->abort_to_error(abort_code);
529                         call->state = AFS_CALL_ABORTED;
530                         _debug("Rcv ABORT %u -> %d", abort_code, call->error);
531                         break;
532                 case RXRPC_SKB_MARK_LOCAL_ABORT:
533                         abort_code = rxrpc_kernel_get_abort_code(skb);
534                         call->error = call->type->abort_to_error(abort_code);
535                         call->state = AFS_CALL_ABORTED;
536                         _debug("Loc ABORT %u -> %d", abort_code, call->error);
537                         break;
538                 case RXRPC_SKB_MARK_NET_ERROR:
539                         call->error = -rxrpc_kernel_get_error_number(skb);
540                         call->state = AFS_CALL_ERROR;
541                         _debug("Rcv NET ERROR %d", call->error);
542                         break;
543                 case RXRPC_SKB_MARK_LOCAL_ERROR:
544                         call->error = -rxrpc_kernel_get_error_number(skb);
545                         call->state = AFS_CALL_ERROR;
546                         _debug("Rcv LOCAL ERROR %d", call->error);
547                         break;
548                 default:
549                         BUG();
550                         break;
551                 }
552
553                 afs_free_skb(skb);
554         }
555
556         /* make sure the queue is empty if the call is done with (we might have
557          * aborted the call early because of an unmarshalling error) */
558         if (call->state >= AFS_CALL_COMPLETE) {
559                 while ((skb = skb_dequeue(&call->rx_queue)))
560                         afs_free_skb(skb);
561                 if (call->incoming)
562                         afs_end_call(call);
563         }
564
565         _leave("");
566 }
567
568 /*
569  * wait synchronously for a call to complete
570  */
571 static int afs_wait_for_call_to_complete(struct afs_call *call)
572 {
573         struct sk_buff *skb;
574         int ret;
575
576         DECLARE_WAITQUEUE(myself, current);
577
578         _enter("");
579
580         add_wait_queue(&call->waitq, &myself);
581         for (;;) {
582                 set_current_state(TASK_INTERRUPTIBLE);
583
584                 /* deliver any messages that are in the queue */
585                 if (!skb_queue_empty(&call->rx_queue)) {
586                         __set_current_state(TASK_RUNNING);
587                         afs_deliver_to_call(call);
588                         continue;
589                 }
590
591                 ret = call->error;
592                 if (call->state >= AFS_CALL_COMPLETE)
593                         break;
594                 ret = -EINTR;
595                 if (signal_pending(current))
596                         break;
597                 schedule();
598         }
599
600         remove_wait_queue(&call->waitq, &myself);
601         __set_current_state(TASK_RUNNING);
602
603         /* kill the call */
604         if (call->state < AFS_CALL_COMPLETE) {
605                 _debug("call incomplete");
606                 rxrpc_kernel_abort_call(call->rxcall, RX_CALL_DEAD);
607                 while ((skb = skb_dequeue(&call->rx_queue)))
608                         afs_free_skb(skb);
609         }
610
611         _debug("call complete");
612         afs_end_call(call);
613         _leave(" = %d", ret);
614         return ret;
615 }
616
617 /*
618  * wake up a waiting call
619  */
620 static void afs_wake_up_call_waiter(struct afs_call *call)
621 {
622         wake_up(&call->waitq);
623 }
624
625 /*
626  * wake up an asynchronous call
627  */
628 static void afs_wake_up_async_call(struct afs_call *call)
629 {
630         _enter("");
631         queue_work(afs_async_calls, &call->async_work);
632 }
633
634 /*
635  * put a call into asynchronous mode
636  * - mustn't touch the call descriptor as the call my have completed by the
637  *   time we get here
638  */
639 static int afs_dont_wait_for_call_to_complete(struct afs_call *call)
640 {
641         _enter("");
642         return -EINPROGRESS;
643 }
644
645 /*
646  * delete an asynchronous call
647  */
648 static void afs_delete_async_call(struct afs_call *call)
649 {
650         _enter("");
651
652         afs_free_call(call);
653
654         _leave("");
655 }
656
657 /*
658  * perform processing on an asynchronous call
659  * - on a multiple-thread workqueue this work item may try to run on several
660  *   CPUs at the same time
661  */
662 static void afs_process_async_call(struct afs_call *call)
663 {
664         _enter("");
665
666         if (!skb_queue_empty(&call->rx_queue))
667                 afs_deliver_to_call(call);
668
669         if (call->state >= AFS_CALL_COMPLETE && call->wait_mode) {
670                 if (call->wait_mode->async_complete)
671                         call->wait_mode->async_complete(call->reply,
672                                                         call->error);
673                 call->reply = NULL;
674
675                 /* kill the call */
676                 afs_end_call_nofree(call);
677
678                 /* we can't just delete the call because the work item may be
679                  * queued */
680                 call->async_workfn = afs_delete_async_call;
681                 queue_work(afs_async_calls, &call->async_work);
682         }
683
684         _leave("");
685 }
686
687 /*
688  * empty a socket buffer into a flat reply buffer
689  */
690 void afs_transfer_reply(struct afs_call *call, struct sk_buff *skb)
691 {
692         size_t len = skb->len;
693
694         if (skb_copy_bits(skb, 0, call->buffer + call->reply_size, len) < 0)
695                 BUG();
696         call->reply_size += len;
697 }
698
699 /*
700  * accept the backlog of incoming calls
701  */
702 static void afs_collect_incoming_call(struct work_struct *work)
703 {
704         struct rxrpc_call *rxcall;
705         struct afs_call *call = NULL;
706         struct sk_buff *skb;
707
708         while ((skb = skb_dequeue(&afs_incoming_calls))) {
709                 _debug("new call");
710
711                 /* don't need the notification */
712                 afs_free_skb(skb);
713
714                 if (!call) {
715                         call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
716                         if (!call) {
717                                 rxrpc_kernel_reject_call(afs_socket);
718                                 return;
719                         }
720
721                         call->async_workfn = afs_process_async_call;
722                         INIT_WORK(&call->async_work, afs_async_workfn);
723                         call->wait_mode = &afs_async_incoming_call;
724                         call->type = &afs_RXCMxxxx;
725                         init_waitqueue_head(&call->waitq);
726                         skb_queue_head_init(&call->rx_queue);
727                         call->state = AFS_CALL_AWAIT_OP_ID;
728
729                         _debug("CALL %p{%s} [%d]",
730                                call, call->type->name,
731                                atomic_read(&afs_outstanding_calls));
732                         atomic_inc(&afs_outstanding_calls);
733                 }
734
735                 rxcall = rxrpc_kernel_accept_call(afs_socket,
736                                                   (unsigned long) call);
737                 if (!IS_ERR(rxcall)) {
738                         call->rxcall = rxcall;
739                         call = NULL;
740                 }
741         }
742
743         if (call)
744                 afs_free_call(call);
745 }
746
747 /*
748  * grab the operation ID from an incoming cache manager call
749  */
750 static int afs_deliver_cm_op_id(struct afs_call *call, struct sk_buff *skb,
751                                 bool last)
752 {
753         size_t len = skb->len;
754         void *oibuf = (void *) &call->operation_ID;
755
756         _enter("{%u},{%zu},%d", call->offset, len, last);
757
758         ASSERTCMP(call->offset, <, 4);
759
760         /* the operation ID forms the first four bytes of the request data */
761         len = min_t(size_t, len, 4 - call->offset);
762         if (skb_copy_bits(skb, 0, oibuf + call->offset, len) < 0)
763                 BUG();
764         if (!pskb_pull(skb, len))
765                 BUG();
766         call->offset += len;
767
768         if (call->offset < 4) {
769                 if (last) {
770                         _leave(" = -EBADMSG [op ID short]");
771                         return -EBADMSG;
772                 }
773                 _leave(" = 0 [incomplete]");
774                 return 0;
775         }
776
777         call->state = AFS_CALL_AWAIT_REQUEST;
778
779         /* ask the cache manager to route the call (it'll change the call type
780          * if successful) */
781         if (!afs_cm_incoming_call(call))
782                 return -ENOTSUPP;
783
784         /* pass responsibility for the remainer of this message off to the
785          * cache manager op */
786         return call->type->deliver(call, skb, last);
787 }
788
789 /*
790  * send an empty reply
791  */
792 void afs_send_empty_reply(struct afs_call *call)
793 {
794         struct msghdr msg;
795
796         _enter("");
797
798         msg.msg_name            = NULL;
799         msg.msg_namelen         = 0;
800         iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, NULL, 0, 0);
801         msg.msg_control         = NULL;
802         msg.msg_controllen      = 0;
803         msg.msg_flags           = 0;
804
805         call->state = AFS_CALL_AWAIT_ACK;
806         switch (rxrpc_kernel_send_data(call->rxcall, &msg, 0)) {
807         case 0:
808                 _leave(" [replied]");
809                 return;
810
811         case -ENOMEM:
812                 _debug("oom");
813                 rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
814         default:
815                 afs_end_call(call);
816                 _leave(" [error]");
817                 return;
818         }
819 }
820
821 /*
822  * send a simple reply
823  */
824 void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
825 {
826         struct msghdr msg;
827         struct kvec iov[1];
828         int n;
829
830         _enter("");
831
832         iov[0].iov_base         = (void *) buf;
833         iov[0].iov_len          = len;
834         msg.msg_name            = NULL;
835         msg.msg_namelen         = 0;
836         iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1, len);
837         msg.msg_control         = NULL;
838         msg.msg_controllen      = 0;
839         msg.msg_flags           = 0;
840
841         call->state = AFS_CALL_AWAIT_ACK;
842         n = rxrpc_kernel_send_data(call->rxcall, &msg, len);
843         if (n >= 0) {
844                 /* Success */
845                 _leave(" [replied]");
846                 return;
847         }
848
849         if (n == -ENOMEM) {
850                 _debug("oom");
851                 rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
852         }
853         afs_end_call(call);
854         _leave(" [error]");
855 }
856
857 /*
858  * extract a piece of data from the received data socket buffers
859  */
860 int afs_extract_data(struct afs_call *call, struct sk_buff *skb,
861                      bool last, void *buf, size_t count)
862 {
863         size_t len = skb->len;
864
865         _enter("{%u},{%zu},%d,,%zu", call->offset, len, last, count);
866
867         ASSERTCMP(call->offset, <, count);
868
869         len = min_t(size_t, len, count - call->offset);
870         if (skb_copy_bits(skb, 0, buf + call->offset, len) < 0 ||
871             !pskb_pull(skb, len))
872                 BUG();
873         call->offset += len;
874
875         if (call->offset < count) {
876                 if (last) {
877                         _leave(" = -EBADMSG [%d < %zu]", call->offset, count);
878                         return -EBADMSG;
879                 }
880                 _leave(" = -EAGAIN");
881                 return -EAGAIN;
882         }
883         return 0;
884 }