IB/core: add support to create a unsafe global rkey to ib_create_pd
[cascardo/linux.git] / net / sunrpc / xprtrdma / svc_rdma_transport.c
1 /*
2  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
3  * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
4  *
5  * This software is available to you under a choice of one of two
6  * licenses.  You may choose to be licensed under the terms of the GNU
7  * General Public License (GPL) Version 2, available from the file
8  * COPYING in the main directory of this source tree, or the BSD-type
9  * license below:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions
13  * are met:
14  *
15  *      Redistributions of source code must retain the above copyright
16  *      notice, this list of conditions and the following disclaimer.
17  *
18  *      Redistributions in binary form must reproduce the above
19  *      copyright notice, this list of conditions and the following
20  *      disclaimer in the documentation and/or other materials provided
21  *      with the distribution.
22  *
23  *      Neither the name of the Network Appliance, Inc. nor the names of
24  *      its contributors may be used to endorse or promote products
25  *      derived from this software without specific prior written
26  *      permission.
27  *
28  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
29  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
31  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
32  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
33  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
34  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
35  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
36  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
37  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
38  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
39  *
40  * Author: Tom Tucker <tom@opengridcomputing.com>
41  */
42
43 #include <linux/sunrpc/svc_xprt.h>
44 #include <linux/sunrpc/debug.h>
45 #include <linux/sunrpc/rpc_rdma.h>
46 #include <linux/interrupt.h>
47 #include <linux/sched.h>
48 #include <linux/slab.h>
49 #include <linux/spinlock.h>
50 #include <linux/workqueue.h>
51 #include <rdma/ib_verbs.h>
52 #include <rdma/rdma_cm.h>
53 #include <linux/sunrpc/svc_rdma.h>
54 #include <linux/export.h>
55 #include "xprt_rdma.h"
56
57 #define RPCDBG_FACILITY RPCDBG_SVCXPRT
58
59 static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
60 static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
61                                         struct net *net,
62                                         struct sockaddr *sa, int salen,
63                                         int flags);
64 static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
65 static void svc_rdma_release_rqst(struct svc_rqst *);
66 static void svc_rdma_detach(struct svc_xprt *xprt);
67 static void svc_rdma_free(struct svc_xprt *xprt);
68 static int svc_rdma_has_wspace(struct svc_xprt *xprt);
69 static int svc_rdma_secure_port(struct svc_rqst *);
70
71 static struct svc_xprt_ops svc_rdma_ops = {
72         .xpo_create = svc_rdma_create,
73         .xpo_recvfrom = svc_rdma_recvfrom,
74         .xpo_sendto = svc_rdma_sendto,
75         .xpo_release_rqst = svc_rdma_release_rqst,
76         .xpo_detach = svc_rdma_detach,
77         .xpo_free = svc_rdma_free,
78         .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
79         .xpo_has_wspace = svc_rdma_has_wspace,
80         .xpo_accept = svc_rdma_accept,
81         .xpo_secure_port = svc_rdma_secure_port,
82 };
83
84 struct svc_xprt_class svc_rdma_class = {
85         .xcl_name = "rdma",
86         .xcl_owner = THIS_MODULE,
87         .xcl_ops = &svc_rdma_ops,
88         .xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
89         .xcl_ident = XPRT_TRANSPORT_RDMA,
90 };
91
92 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
93 static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
94                                            struct sockaddr *, int, int);
95 static void svc_rdma_bc_detach(struct svc_xprt *);
96 static void svc_rdma_bc_free(struct svc_xprt *);
97
98 static struct svc_xprt_ops svc_rdma_bc_ops = {
99         .xpo_create = svc_rdma_bc_create,
100         .xpo_detach = svc_rdma_bc_detach,
101         .xpo_free = svc_rdma_bc_free,
102         .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
103         .xpo_secure_port = svc_rdma_secure_port,
104 };
105
106 struct svc_xprt_class svc_rdma_bc_class = {
107         .xcl_name = "rdma-bc",
108         .xcl_owner = THIS_MODULE,
109         .xcl_ops = &svc_rdma_bc_ops,
110         .xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
111 };
112
113 static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
114                                            struct net *net,
115                                            struct sockaddr *sa, int salen,
116                                            int flags)
117 {
118         struct svcxprt_rdma *cma_xprt;
119         struct svc_xprt *xprt;
120
121         cma_xprt = rdma_create_xprt(serv, 0);
122         if (!cma_xprt)
123                 return ERR_PTR(-ENOMEM);
124         xprt = &cma_xprt->sc_xprt;
125
126         svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
127         serv->sv_bc_xprt = xprt;
128
129         dprintk("svcrdma: %s(%p)\n", __func__, xprt);
130         return xprt;
131 }
132
133 static void svc_rdma_bc_detach(struct svc_xprt *xprt)
134 {
135         dprintk("svcrdma: %s(%p)\n", __func__, xprt);
136 }
137
138 static void svc_rdma_bc_free(struct svc_xprt *xprt)
139 {
140         struct svcxprt_rdma *rdma =
141                 container_of(xprt, struct svcxprt_rdma, sc_xprt);
142
143         dprintk("svcrdma: %s(%p)\n", __func__, xprt);
144         if (xprt)
145                 kfree(rdma);
146 }
147 #endif  /* CONFIG_SUNRPC_BACKCHANNEL */
148
149 static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
150                                            gfp_t flags)
151 {
152         struct svc_rdma_op_ctxt *ctxt;
153
154         ctxt = kmalloc(sizeof(*ctxt), flags);
155         if (ctxt) {
156                 ctxt->xprt = xprt;
157                 INIT_LIST_HEAD(&ctxt->free);
158                 INIT_LIST_HEAD(&ctxt->dto_q);
159         }
160         return ctxt;
161 }
162
163 static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
164 {
165         unsigned int i;
166
167         /* Each RPC/RDMA credit can consume a number of send
168          * and receive WQEs. One ctxt is allocated for each.
169          */
170         i = xprt->sc_sq_depth + xprt->sc_rq_depth;
171
172         while (i--) {
173                 struct svc_rdma_op_ctxt *ctxt;
174
175                 ctxt = alloc_ctxt(xprt, GFP_KERNEL);
176                 if (!ctxt) {
177                         dprintk("svcrdma: No memory for RDMA ctxt\n");
178                         return false;
179                 }
180                 list_add(&ctxt->free, &xprt->sc_ctxts);
181         }
182         return true;
183 }
184
185 struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
186 {
187         struct svc_rdma_op_ctxt *ctxt = NULL;
188
189         spin_lock_bh(&xprt->sc_ctxt_lock);
190         xprt->sc_ctxt_used++;
191         if (list_empty(&xprt->sc_ctxts))
192                 goto out_empty;
193
194         ctxt = list_first_entry(&xprt->sc_ctxts,
195                                 struct svc_rdma_op_ctxt, free);
196         list_del_init(&ctxt->free);
197         spin_unlock_bh(&xprt->sc_ctxt_lock);
198
199 out:
200         ctxt->count = 0;
201         ctxt->frmr = NULL;
202         return ctxt;
203
204 out_empty:
205         /* Either pre-allocation missed the mark, or send
206          * queue accounting is broken.
207          */
208         spin_unlock_bh(&xprt->sc_ctxt_lock);
209
210         ctxt = alloc_ctxt(xprt, GFP_NOIO);
211         if (ctxt)
212                 goto out;
213
214         spin_lock_bh(&xprt->sc_ctxt_lock);
215         xprt->sc_ctxt_used--;
216         spin_unlock_bh(&xprt->sc_ctxt_lock);
217         WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
218         return NULL;
219 }
220
221 void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
222 {
223         struct svcxprt_rdma *xprt = ctxt->xprt;
224         int i;
225         for (i = 0; i < ctxt->count && ctxt->sge[i].length; i++) {
226                 /*
227                  * Unmap the DMA addr in the SGE if the lkey matches
228                  * the local_dma_lkey, otherwise, ignore it since it is
229                  * an FRMR lkey and will be unmapped later when the
230                  * last WR that uses it completes.
231                  */
232                 if (ctxt->sge[i].lkey == xprt->sc_pd->local_dma_lkey) {
233                         atomic_dec(&xprt->sc_dma_used);
234                         ib_dma_unmap_page(xprt->sc_cm_id->device,
235                                             ctxt->sge[i].addr,
236                                             ctxt->sge[i].length,
237                                             ctxt->direction);
238                 }
239         }
240 }
241
242 void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
243 {
244         struct svcxprt_rdma *xprt = ctxt->xprt;
245         int i;
246
247         if (free_pages)
248                 for (i = 0; i < ctxt->count; i++)
249                         put_page(ctxt->pages[i]);
250
251         spin_lock_bh(&xprt->sc_ctxt_lock);
252         xprt->sc_ctxt_used--;
253         list_add(&ctxt->free, &xprt->sc_ctxts);
254         spin_unlock_bh(&xprt->sc_ctxt_lock);
255 }
256
257 static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
258 {
259         while (!list_empty(&xprt->sc_ctxts)) {
260                 struct svc_rdma_op_ctxt *ctxt;
261
262                 ctxt = list_first_entry(&xprt->sc_ctxts,
263                                         struct svc_rdma_op_ctxt, free);
264                 list_del(&ctxt->free);
265                 kfree(ctxt);
266         }
267 }
268
269 static struct svc_rdma_req_map *alloc_req_map(gfp_t flags)
270 {
271         struct svc_rdma_req_map *map;
272
273         map = kmalloc(sizeof(*map), flags);
274         if (map)
275                 INIT_LIST_HEAD(&map->free);
276         return map;
277 }
278
279 static bool svc_rdma_prealloc_maps(struct svcxprt_rdma *xprt)
280 {
281         unsigned int i;
282
283         /* One for each receive buffer on this connection. */
284         i = xprt->sc_max_requests;
285
286         while (i--) {
287                 struct svc_rdma_req_map *map;
288
289                 map = alloc_req_map(GFP_KERNEL);
290                 if (!map) {
291                         dprintk("svcrdma: No memory for request map\n");
292                         return false;
293                 }
294                 list_add(&map->free, &xprt->sc_maps);
295         }
296         return true;
297 }
298
299 struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *xprt)
300 {
301         struct svc_rdma_req_map *map = NULL;
302
303         spin_lock(&xprt->sc_map_lock);
304         if (list_empty(&xprt->sc_maps))
305                 goto out_empty;
306
307         map = list_first_entry(&xprt->sc_maps,
308                                struct svc_rdma_req_map, free);
309         list_del_init(&map->free);
310         spin_unlock(&xprt->sc_map_lock);
311
312 out:
313         map->count = 0;
314         return map;
315
316 out_empty:
317         spin_unlock(&xprt->sc_map_lock);
318
319         /* Pre-allocation amount was incorrect */
320         map = alloc_req_map(GFP_NOIO);
321         if (map)
322                 goto out;
323
324         WARN_ONCE(1, "svcrdma: empty request map list?\n");
325         return NULL;
326 }
327
328 void svc_rdma_put_req_map(struct svcxprt_rdma *xprt,
329                           struct svc_rdma_req_map *map)
330 {
331         spin_lock(&xprt->sc_map_lock);
332         list_add(&map->free, &xprt->sc_maps);
333         spin_unlock(&xprt->sc_map_lock);
334 }
335
336 static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
337 {
338         while (!list_empty(&xprt->sc_maps)) {
339                 struct svc_rdma_req_map *map;
340
341                 map = list_first_entry(&xprt->sc_maps,
342                                        struct svc_rdma_req_map, free);
343                 list_del(&map->free);
344                 kfree(map);
345         }
346 }
347
348 /* QP event handler */
349 static void qp_event_handler(struct ib_event *event, void *context)
350 {
351         struct svc_xprt *xprt = context;
352
353         switch (event->event) {
354         /* These are considered benign events */
355         case IB_EVENT_PATH_MIG:
356         case IB_EVENT_COMM_EST:
357         case IB_EVENT_SQ_DRAINED:
358         case IB_EVENT_QP_LAST_WQE_REACHED:
359                 dprintk("svcrdma: QP event %s (%d) received for QP=%p\n",
360                         ib_event_msg(event->event), event->event,
361                         event->element.qp);
362                 break;
363         /* These are considered fatal events */
364         case IB_EVENT_PATH_MIG_ERR:
365         case IB_EVENT_QP_FATAL:
366         case IB_EVENT_QP_REQ_ERR:
367         case IB_EVENT_QP_ACCESS_ERR:
368         case IB_EVENT_DEVICE_FATAL:
369         default:
370                 dprintk("svcrdma: QP ERROR event %s (%d) received for QP=%p, "
371                         "closing transport\n",
372                         ib_event_msg(event->event), event->event,
373                         event->element.qp);
374                 set_bit(XPT_CLOSE, &xprt->xpt_flags);
375                 break;
376         }
377 }
378
379 /**
380  * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
381  * @cq:        completion queue
382  * @wc:        completed WR
383  *
384  */
385 static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
386 {
387         struct svcxprt_rdma *xprt = cq->cq_context;
388         struct ib_cqe *cqe = wc->wr_cqe;
389         struct svc_rdma_op_ctxt *ctxt;
390
391         /* WARNING: Only wc->wr_cqe and wc->status are reliable */
392         ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
393         ctxt->wc_status = wc->status;
394         svc_rdma_unmap_dma(ctxt);
395
396         if (wc->status != IB_WC_SUCCESS)
397                 goto flushed;
398
399         /* All wc fields are now known to be valid */
400         ctxt->byte_len = wc->byte_len;
401         spin_lock(&xprt->sc_rq_dto_lock);
402         list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
403         spin_unlock(&xprt->sc_rq_dto_lock);
404
405         set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
406         if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
407                 goto out;
408         svc_xprt_enqueue(&xprt->sc_xprt);
409         goto out;
410
411 flushed:
412         if (wc->status != IB_WC_WR_FLUSH_ERR)
413                 pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
414                         ib_wc_status_msg(wc->status),
415                         wc->status, wc->vendor_err);
416         set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
417         svc_rdma_put_context(ctxt, 1);
418
419 out:
420         svc_xprt_put(&xprt->sc_xprt);
421 }
422
423 static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
424                                     struct ib_wc *wc,
425                                     const char *opname)
426 {
427         if (wc->status != IB_WC_SUCCESS)
428                 goto err;
429
430 out:
431         atomic_dec(&xprt->sc_sq_count);
432         wake_up(&xprt->sc_send_wait);
433         return;
434
435 err:
436         set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
437         if (wc->status != IB_WC_WR_FLUSH_ERR)
438                 pr_err("svcrdma: %s: %s (%u/0x%x)\n",
439                        opname, ib_wc_status_msg(wc->status),
440                        wc->status, wc->vendor_err);
441         goto out;
442 }
443
444 static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
445                                         const char *opname)
446 {
447         struct svcxprt_rdma *xprt = cq->cq_context;
448
449         svc_rdma_send_wc_common(xprt, wc, opname);
450         svc_xprt_put(&xprt->sc_xprt);
451 }
452
453 /**
454  * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
455  * @cq:        completion queue
456  * @wc:        completed WR
457  *
458  */
459 void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
460 {
461         struct ib_cqe *cqe = wc->wr_cqe;
462         struct svc_rdma_op_ctxt *ctxt;
463
464         svc_rdma_send_wc_common_put(cq, wc, "send");
465
466         ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
467         svc_rdma_unmap_dma(ctxt);
468         svc_rdma_put_context(ctxt, 1);
469 }
470
471 /**
472  * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
473  * @cq:        completion queue
474  * @wc:        completed WR
475  *
476  */
477 void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
478 {
479         struct ib_cqe *cqe = wc->wr_cqe;
480         struct svc_rdma_op_ctxt *ctxt;
481
482         svc_rdma_send_wc_common_put(cq, wc, "write");
483
484         ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
485         svc_rdma_unmap_dma(ctxt);
486         svc_rdma_put_context(ctxt, 0);
487 }
488
489 /**
490  * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
491  * @cq:        completion queue
492  * @wc:        completed WR
493  *
494  */
495 void svc_rdma_wc_reg(struct ib_cq *cq, struct ib_wc *wc)
496 {
497         svc_rdma_send_wc_common_put(cq, wc, "fastreg");
498 }
499
500 /**
501  * svc_rdma_wc_read - Invoked by RDMA provider for each polled Read WC
502  * @cq:        completion queue
503  * @wc:        completed WR
504  *
505  */
506 void svc_rdma_wc_read(struct ib_cq *cq, struct ib_wc *wc)
507 {
508         struct svcxprt_rdma *xprt = cq->cq_context;
509         struct ib_cqe *cqe = wc->wr_cqe;
510         struct svc_rdma_op_ctxt *ctxt;
511
512         svc_rdma_send_wc_common(xprt, wc, "read");
513
514         ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
515         svc_rdma_unmap_dma(ctxt);
516         svc_rdma_put_frmr(xprt, ctxt->frmr);
517
518         if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
519                 struct svc_rdma_op_ctxt *read_hdr;
520
521                 read_hdr = ctxt->read_hdr;
522                 spin_lock(&xprt->sc_rq_dto_lock);
523                 list_add_tail(&read_hdr->dto_q,
524                               &xprt->sc_read_complete_q);
525                 spin_unlock(&xprt->sc_rq_dto_lock);
526
527                 set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
528                 svc_xprt_enqueue(&xprt->sc_xprt);
529         }
530
531         svc_rdma_put_context(ctxt, 0);
532         svc_xprt_put(&xprt->sc_xprt);
533 }
534
535 /**
536  * svc_rdma_wc_inv - Invoked by RDMA provider for each polled LOCAL_INV WC
537  * @cq:        completion queue
538  * @wc:        completed WR
539  *
540  */
541 void svc_rdma_wc_inv(struct ib_cq *cq, struct ib_wc *wc)
542 {
543         svc_rdma_send_wc_common_put(cq, wc, "localInv");
544 }
545
546 static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
547                                              int listener)
548 {
549         struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
550
551         if (!cma_xprt)
552                 return NULL;
553         svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
554         INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
555         INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
556         INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
557         INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
558         INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
559         INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
560         INIT_LIST_HEAD(&cma_xprt->sc_maps);
561         init_waitqueue_head(&cma_xprt->sc_send_wait);
562
563         spin_lock_init(&cma_xprt->sc_lock);
564         spin_lock_init(&cma_xprt->sc_rq_dto_lock);
565         spin_lock_init(&cma_xprt->sc_frmr_q_lock);
566         spin_lock_init(&cma_xprt->sc_ctxt_lock);
567         spin_lock_init(&cma_xprt->sc_map_lock);
568
569         if (listener)
570                 set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
571
572         return cma_xprt;
573 }
574
575 int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
576 {
577         struct ib_recv_wr recv_wr, *bad_recv_wr;
578         struct svc_rdma_op_ctxt *ctxt;
579         struct page *page;
580         dma_addr_t pa;
581         int sge_no;
582         int buflen;
583         int ret;
584
585         ctxt = svc_rdma_get_context(xprt);
586         buflen = 0;
587         ctxt->direction = DMA_FROM_DEVICE;
588         ctxt->cqe.done = svc_rdma_wc_receive;
589         for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
590                 if (sge_no >= xprt->sc_max_sge) {
591                         pr_err("svcrdma: Too many sges (%d)\n", sge_no);
592                         goto err_put_ctxt;
593                 }
594                 page = alloc_page(flags);
595                 if (!page)
596                         goto err_put_ctxt;
597                 ctxt->pages[sge_no] = page;
598                 pa = ib_dma_map_page(xprt->sc_cm_id->device,
599                                      page, 0, PAGE_SIZE,
600                                      DMA_FROM_DEVICE);
601                 if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
602                         goto err_put_ctxt;
603                 atomic_inc(&xprt->sc_dma_used);
604                 ctxt->sge[sge_no].addr = pa;
605                 ctxt->sge[sge_no].length = PAGE_SIZE;
606                 ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
607                 ctxt->count = sge_no + 1;
608                 buflen += PAGE_SIZE;
609         }
610         recv_wr.next = NULL;
611         recv_wr.sg_list = &ctxt->sge[0];
612         recv_wr.num_sge = ctxt->count;
613         recv_wr.wr_cqe = &ctxt->cqe;
614
615         svc_xprt_get(&xprt->sc_xprt);
616         ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
617         if (ret) {
618                 svc_rdma_unmap_dma(ctxt);
619                 svc_rdma_put_context(ctxt, 1);
620                 svc_xprt_put(&xprt->sc_xprt);
621         }
622         return ret;
623
624  err_put_ctxt:
625         svc_rdma_unmap_dma(ctxt);
626         svc_rdma_put_context(ctxt, 1);
627         return -ENOMEM;
628 }
629
630 int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
631 {
632         int ret = 0;
633
634         ret = svc_rdma_post_recv(xprt, flags);
635         if (ret) {
636                 pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
637                        ret);
638                 pr_err("svcrdma: closing transport %p.\n", xprt);
639                 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
640                 ret = -ENOTCONN;
641         }
642         return ret;
643 }
644
645 /*
646  * This function handles the CONNECT_REQUEST event on a listening
647  * endpoint. It is passed the cma_id for the _new_ connection. The context in
648  * this cma_id is inherited from the listening cma_id and is the svc_xprt
649  * structure for the listening endpoint.
650  *
651  * This function creates a new xprt for the new connection and enqueues it on
652  * the accept queue for the listent xprt. When the listen thread is kicked, it
653  * will call the recvfrom method on the listen xprt which will accept the new
654  * connection.
655  */
656 static void handle_connect_req(struct rdma_cm_id *new_cma_id, size_t client_ird)
657 {
658         struct svcxprt_rdma *listen_xprt = new_cma_id->context;
659         struct svcxprt_rdma *newxprt;
660         struct sockaddr *sa;
661
662         /* Create a new transport */
663         newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
664         if (!newxprt) {
665                 dprintk("svcrdma: failed to create new transport\n");
666                 return;
667         }
668         newxprt->sc_cm_id = new_cma_id;
669         new_cma_id->context = newxprt;
670         dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
671                 newxprt, newxprt->sc_cm_id, listen_xprt);
672
673         /* Save client advertised inbound read limit for use later in accept. */
674         newxprt->sc_ord = client_ird;
675
676         /* Set the local and remote addresses in the transport */
677         sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
678         svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
679         sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
680         svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
681
682         /*
683          * Enqueue the new transport on the accept queue of the listening
684          * transport
685          */
686         spin_lock_bh(&listen_xprt->sc_lock);
687         list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
688         spin_unlock_bh(&listen_xprt->sc_lock);
689
690         set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
691         svc_xprt_enqueue(&listen_xprt->sc_xprt);
692 }
693
694 /*
695  * Handles events generated on the listening endpoint. These events will be
696  * either be incoming connect requests or adapter removal  events.
697  */
698 static int rdma_listen_handler(struct rdma_cm_id *cma_id,
699                                struct rdma_cm_event *event)
700 {
701         struct svcxprt_rdma *xprt = cma_id->context;
702         int ret = 0;
703
704         switch (event->event) {
705         case RDMA_CM_EVENT_CONNECT_REQUEST:
706                 dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
707                         "event = %s (%d)\n", cma_id, cma_id->context,
708                         rdma_event_msg(event->event), event->event);
709                 handle_connect_req(cma_id,
710                                    event->param.conn.initiator_depth);
711                 break;
712
713         case RDMA_CM_EVENT_ESTABLISHED:
714                 /* Accept complete */
715                 dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
716                         "cm_id=%p\n", xprt, cma_id);
717                 break;
718
719         case RDMA_CM_EVENT_DEVICE_REMOVAL:
720                 dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
721                         xprt, cma_id);
722                 if (xprt)
723                         set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
724                 break;
725
726         default:
727                 dprintk("svcrdma: Unexpected event on listening endpoint %p, "
728                         "event = %s (%d)\n", cma_id,
729                         rdma_event_msg(event->event), event->event);
730                 break;
731         }
732
733         return ret;
734 }
735
736 static int rdma_cma_handler(struct rdma_cm_id *cma_id,
737                             struct rdma_cm_event *event)
738 {
739         struct svc_xprt *xprt = cma_id->context;
740         struct svcxprt_rdma *rdma =
741                 container_of(xprt, struct svcxprt_rdma, sc_xprt);
742         switch (event->event) {
743         case RDMA_CM_EVENT_ESTABLISHED:
744                 /* Accept complete */
745                 svc_xprt_get(xprt);
746                 dprintk("svcrdma: Connection completed on DTO xprt=%p, "
747                         "cm_id=%p\n", xprt, cma_id);
748                 clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
749                 svc_xprt_enqueue(xprt);
750                 break;
751         case RDMA_CM_EVENT_DISCONNECTED:
752                 dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
753                         xprt, cma_id);
754                 if (xprt) {
755                         set_bit(XPT_CLOSE, &xprt->xpt_flags);
756                         svc_xprt_enqueue(xprt);
757                         svc_xprt_put(xprt);
758                 }
759                 break;
760         case RDMA_CM_EVENT_DEVICE_REMOVAL:
761                 dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
762                         "event = %s (%d)\n", cma_id, xprt,
763                         rdma_event_msg(event->event), event->event);
764                 if (xprt) {
765                         set_bit(XPT_CLOSE, &xprt->xpt_flags);
766                         svc_xprt_enqueue(xprt);
767                         svc_xprt_put(xprt);
768                 }
769                 break;
770         default:
771                 dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
772                         "event = %s (%d)\n", cma_id,
773                         rdma_event_msg(event->event), event->event);
774                 break;
775         }
776         return 0;
777 }
778
779 /*
780  * Create a listening RDMA service endpoint.
781  */
782 static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
783                                         struct net *net,
784                                         struct sockaddr *sa, int salen,
785                                         int flags)
786 {
787         struct rdma_cm_id *listen_id;
788         struct svcxprt_rdma *cma_xprt;
789         int ret;
790
791         dprintk("svcrdma: Creating RDMA socket\n");
792         if ((sa->sa_family != AF_INET) && (sa->sa_family != AF_INET6)) {
793                 dprintk("svcrdma: Address family %d is not supported.\n", sa->sa_family);
794                 return ERR_PTR(-EAFNOSUPPORT);
795         }
796         cma_xprt = rdma_create_xprt(serv, 1);
797         if (!cma_xprt)
798                 return ERR_PTR(-ENOMEM);
799
800         listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt,
801                                    RDMA_PS_TCP, IB_QPT_RC);
802         if (IS_ERR(listen_id)) {
803                 ret = PTR_ERR(listen_id);
804                 dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
805                 goto err0;
806         }
807
808         /* Allow both IPv4 and IPv6 sockets to bind a single port
809          * at the same time.
810          */
811 #if IS_ENABLED(CONFIG_IPV6)
812         ret = rdma_set_afonly(listen_id, 1);
813         if (ret) {
814                 dprintk("svcrdma: rdma_set_afonly failed = %d\n", ret);
815                 goto err1;
816         }
817 #endif
818         ret = rdma_bind_addr(listen_id, sa);
819         if (ret) {
820                 dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
821                 goto err1;
822         }
823         cma_xprt->sc_cm_id = listen_id;
824
825         ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
826         if (ret) {
827                 dprintk("svcrdma: rdma_listen failed = %d\n", ret);
828                 goto err1;
829         }
830
831         /*
832          * We need to use the address from the cm_id in case the
833          * caller specified 0 for the port number.
834          */
835         sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
836         svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
837
838         return &cma_xprt->sc_xprt;
839
840  err1:
841         rdma_destroy_id(listen_id);
842  err0:
843         kfree(cma_xprt);
844         return ERR_PTR(ret);
845 }
846
847 static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
848 {
849         struct ib_mr *mr;
850         struct scatterlist *sg;
851         struct svc_rdma_fastreg_mr *frmr;
852         u32 num_sg;
853
854         frmr = kmalloc(sizeof(*frmr), GFP_KERNEL);
855         if (!frmr)
856                 goto err;
857
858         num_sg = min_t(u32, RPCSVC_MAXPAGES, xprt->sc_frmr_pg_list_len);
859         mr = ib_alloc_mr(xprt->sc_pd, IB_MR_TYPE_MEM_REG, num_sg);
860         if (IS_ERR(mr))
861                 goto err_free_frmr;
862
863         sg = kcalloc(RPCSVC_MAXPAGES, sizeof(*sg), GFP_KERNEL);
864         if (!sg)
865                 goto err_free_mr;
866
867         sg_init_table(sg, RPCSVC_MAXPAGES);
868
869         frmr->mr = mr;
870         frmr->sg = sg;
871         INIT_LIST_HEAD(&frmr->frmr_list);
872         return frmr;
873
874  err_free_mr:
875         ib_dereg_mr(mr);
876  err_free_frmr:
877         kfree(frmr);
878  err:
879         return ERR_PTR(-ENOMEM);
880 }
881
882 static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
883 {
884         struct svc_rdma_fastreg_mr *frmr;
885
886         while (!list_empty(&xprt->sc_frmr_q)) {
887                 frmr = list_entry(xprt->sc_frmr_q.next,
888                                   struct svc_rdma_fastreg_mr, frmr_list);
889                 list_del_init(&frmr->frmr_list);
890                 kfree(frmr->sg);
891                 ib_dereg_mr(frmr->mr);
892                 kfree(frmr);
893         }
894 }
895
896 struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
897 {
898         struct svc_rdma_fastreg_mr *frmr = NULL;
899
900         spin_lock_bh(&rdma->sc_frmr_q_lock);
901         if (!list_empty(&rdma->sc_frmr_q)) {
902                 frmr = list_entry(rdma->sc_frmr_q.next,
903                                   struct svc_rdma_fastreg_mr, frmr_list);
904                 list_del_init(&frmr->frmr_list);
905                 frmr->sg_nents = 0;
906         }
907         spin_unlock_bh(&rdma->sc_frmr_q_lock);
908         if (frmr)
909                 return frmr;
910
911         return rdma_alloc_frmr(rdma);
912 }
913
914 void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
915                        struct svc_rdma_fastreg_mr *frmr)
916 {
917         if (frmr) {
918                 ib_dma_unmap_sg(rdma->sc_cm_id->device,
919                                 frmr->sg, frmr->sg_nents, frmr->direction);
920                 atomic_dec(&rdma->sc_dma_used);
921                 spin_lock_bh(&rdma->sc_frmr_q_lock);
922                 WARN_ON_ONCE(!list_empty(&frmr->frmr_list));
923                 list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
924                 spin_unlock_bh(&rdma->sc_frmr_q_lock);
925         }
926 }
927
928 /*
929  * This is the xpo_recvfrom function for listening endpoints. Its
930  * purpose is to accept incoming connections. The CMA callback handler
931  * has already created a new transport and attached it to the new CMA
932  * ID.
933  *
934  * There is a queue of pending connections hung on the listening
935  * transport. This queue contains the new svc_xprt structure. This
936  * function takes svc_xprt structures off the accept_q and completes
937  * the connection.
938  */
939 static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
940 {
941         struct svcxprt_rdma *listen_rdma;
942         struct svcxprt_rdma *newxprt = NULL;
943         struct rdma_conn_param conn_param;
944         struct ib_qp_init_attr qp_attr;
945         struct ib_device *dev;
946         unsigned int i;
947         int ret = 0;
948
949         listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
950         clear_bit(XPT_CONN, &xprt->xpt_flags);
951         /* Get the next entry off the accept list */
952         spin_lock_bh(&listen_rdma->sc_lock);
953         if (!list_empty(&listen_rdma->sc_accept_q)) {
954                 newxprt = list_entry(listen_rdma->sc_accept_q.next,
955                                      struct svcxprt_rdma, sc_accept_q);
956                 list_del_init(&newxprt->sc_accept_q);
957         }
958         if (!list_empty(&listen_rdma->sc_accept_q))
959                 set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
960         spin_unlock_bh(&listen_rdma->sc_lock);
961         if (!newxprt)
962                 return NULL;
963
964         dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
965                 newxprt, newxprt->sc_cm_id);
966
967         dev = newxprt->sc_cm_id->device;
968
969         /* Qualify the transport resource defaults with the
970          * capabilities of this particular device */
971         newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge,
972                                   (size_t)RPCSVC_MAXPAGES);
973         newxprt->sc_max_sge_rd = min_t(size_t, dev->attrs.max_sge_rd,
974                                        RPCSVC_MAXPAGES);
975         newxprt->sc_max_req_size = svcrdma_max_req_size;
976         newxprt->sc_max_requests = min_t(u32, dev->attrs.max_qp_wr,
977                                          svcrdma_max_requests);
978         newxprt->sc_max_bc_requests = min_t(u32, dev->attrs.max_qp_wr,
979                                             svcrdma_max_bc_requests);
980         newxprt->sc_rq_depth = newxprt->sc_max_requests +
981                                newxprt->sc_max_bc_requests;
982         newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth;
983
984         if (!svc_rdma_prealloc_ctxts(newxprt))
985                 goto errout;
986         if (!svc_rdma_prealloc_maps(newxprt))
987                 goto errout;
988
989         /*
990          * Limit ORD based on client limit, local device limit, and
991          * configured svcrdma limit.
992          */
993         newxprt->sc_ord = min_t(size_t, dev->attrs.max_qp_rd_atom, newxprt->sc_ord);
994         newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord);
995
996         newxprt->sc_pd = ib_alloc_pd(dev, 0);
997         if (IS_ERR(newxprt->sc_pd)) {
998                 dprintk("svcrdma: error creating PD for connect request\n");
999                 goto errout;
1000         }
1001         newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
1002                                         0, IB_POLL_SOFTIRQ);
1003         if (IS_ERR(newxprt->sc_sq_cq)) {
1004                 dprintk("svcrdma: error creating SQ CQ for connect request\n");
1005                 goto errout;
1006         }
1007         newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
1008                                         0, IB_POLL_SOFTIRQ);
1009         if (IS_ERR(newxprt->sc_rq_cq)) {
1010                 dprintk("svcrdma: error creating RQ CQ for connect request\n");
1011                 goto errout;
1012         }
1013
1014         memset(&qp_attr, 0, sizeof qp_attr);
1015         qp_attr.event_handler = qp_event_handler;
1016         qp_attr.qp_context = &newxprt->sc_xprt;
1017         qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
1018         qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
1019         qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
1020         qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
1021         qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
1022         qp_attr.qp_type = IB_QPT_RC;
1023         qp_attr.send_cq = newxprt->sc_sq_cq;
1024         qp_attr.recv_cq = newxprt->sc_rq_cq;
1025         dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
1026                 "    cm_id->device=%p, sc_pd->device=%p\n"
1027                 "    cap.max_send_wr = %d\n"
1028                 "    cap.max_recv_wr = %d\n"
1029                 "    cap.max_send_sge = %d\n"
1030                 "    cap.max_recv_sge = %d\n",
1031                 newxprt->sc_cm_id, newxprt->sc_pd,
1032                 dev, newxprt->sc_pd->device,
1033                 qp_attr.cap.max_send_wr,
1034                 qp_attr.cap.max_recv_wr,
1035                 qp_attr.cap.max_send_sge,
1036                 qp_attr.cap.max_recv_sge);
1037
1038         ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
1039         if (ret) {
1040                 dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
1041                 goto errout;
1042         }
1043         newxprt->sc_qp = newxprt->sc_cm_id->qp;
1044
1045         /*
1046          * Use the most secure set of MR resources based on the
1047          * transport type and available memory management features in
1048          * the device. Here's the table implemented below:
1049          *
1050          *              Fast    Global  DMA     Remote WR
1051          *              Reg     LKEY    MR      Access
1052          *              Sup'd   Sup'd   Needed  Needed
1053          *
1054          * IWARP        N       N       Y       Y
1055          *              N       Y       Y       Y
1056          *              Y       N       Y       N
1057          *              Y       Y       N       -
1058          *
1059          * IB           N       N       Y       N
1060          *              N       Y       N       -
1061          *              Y       N       Y       N
1062          *              Y       Y       N       -
1063          *
1064          * NB:  iWARP requires remote write access for the data sink
1065          *      of an RDMA_READ. IB does not.
1066          */
1067         newxprt->sc_reader = rdma_read_chunk_lcl;
1068         if (dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
1069                 newxprt->sc_frmr_pg_list_len =
1070                         dev->attrs.max_fast_reg_page_list_len;
1071                 newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
1072                 newxprt->sc_reader = rdma_read_chunk_frmr;
1073         }
1074
1075         /*
1076          * Determine if a DMA MR is required and if so, what privs are required
1077          */
1078         if (!rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num) &&
1079             !rdma_ib_or_roce(dev, newxprt->sc_cm_id->port_num))
1080                 goto errout;
1081
1082         if (rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num))
1083                 newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;
1084
1085         /* Post receive buffers */
1086         for (i = 0; i < newxprt->sc_max_requests; i++) {
1087                 ret = svc_rdma_post_recv(newxprt, GFP_KERNEL);
1088                 if (ret) {
1089                         dprintk("svcrdma: failure posting receive buffers\n");
1090                         goto errout;
1091                 }
1092         }
1093
1094         /* Swap out the handler */
1095         newxprt->sc_cm_id->event_handler = rdma_cma_handler;
1096
1097         /* Accept Connection */
1098         set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
1099         memset(&conn_param, 0, sizeof conn_param);
1100         conn_param.responder_resources = 0;
1101         conn_param.initiator_depth = newxprt->sc_ord;
1102         ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
1103         if (ret) {
1104                 dprintk("svcrdma: failed to accept new connection, ret=%d\n",
1105                        ret);
1106                 goto errout;
1107         }
1108
1109         dprintk("svcrdma: new connection %p accepted with the following "
1110                 "attributes:\n"
1111                 "    local_ip        : %pI4\n"
1112                 "    local_port      : %d\n"
1113                 "    remote_ip       : %pI4\n"
1114                 "    remote_port     : %d\n"
1115                 "    max_sge         : %d\n"
1116                 "    max_sge_rd      : %d\n"
1117                 "    sq_depth        : %d\n"
1118                 "    max_requests    : %d\n"
1119                 "    ord             : %d\n",
1120                 newxprt,
1121                 &((struct sockaddr_in *)&newxprt->sc_cm_id->
1122                          route.addr.src_addr)->sin_addr.s_addr,
1123                 ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
1124                        route.addr.src_addr)->sin_port),
1125                 &((struct sockaddr_in *)&newxprt->sc_cm_id->
1126                          route.addr.dst_addr)->sin_addr.s_addr,
1127                 ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
1128                        route.addr.dst_addr)->sin_port),
1129                 newxprt->sc_max_sge,
1130                 newxprt->sc_max_sge_rd,
1131                 newxprt->sc_sq_depth,
1132                 newxprt->sc_max_requests,
1133                 newxprt->sc_ord);
1134
1135         return &newxprt->sc_xprt;
1136
1137  errout:
1138         dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
1139         /* Take a reference in case the DTO handler runs */
1140         svc_xprt_get(&newxprt->sc_xprt);
1141         if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
1142                 ib_destroy_qp(newxprt->sc_qp);
1143         rdma_destroy_id(newxprt->sc_cm_id);
1144         /* This call to put will destroy the transport */
1145         svc_xprt_put(&newxprt->sc_xprt);
1146         return NULL;
1147 }
1148
1149 static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
1150 {
1151 }
1152
1153 /*
1154  * When connected, an svc_xprt has at least two references:
1155  *
1156  * - A reference held by the cm_id between the ESTABLISHED and
1157  *   DISCONNECTED events. If the remote peer disconnected first, this
1158  *   reference could be gone.
1159  *
1160  * - A reference held by the svc_recv code that called this function
1161  *   as part of close processing.
1162  *
1163  * At a minimum one references should still be held.
1164  */
1165 static void svc_rdma_detach(struct svc_xprt *xprt)
1166 {
1167         struct svcxprt_rdma *rdma =
1168                 container_of(xprt, struct svcxprt_rdma, sc_xprt);
1169         dprintk("svc: svc_rdma_detach(%p)\n", xprt);
1170
1171         /* Disconnect and flush posted WQE */
1172         rdma_disconnect(rdma->sc_cm_id);
1173 }
1174
1175 static void __svc_rdma_free(struct work_struct *work)
1176 {
1177         struct svcxprt_rdma *rdma =
1178                 container_of(work, struct svcxprt_rdma, sc_work);
1179         struct svc_xprt *xprt = &rdma->sc_xprt;
1180
1181         dprintk("svcrdma: %s(%p)\n", __func__, rdma);
1182
1183         if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
1184                 ib_drain_qp(rdma->sc_qp);
1185
1186         /* We should only be called from kref_put */
1187         if (atomic_read(&xprt->xpt_ref.refcount) != 0)
1188                 pr_err("svcrdma: sc_xprt still in use? (%d)\n",
1189                        atomic_read(&xprt->xpt_ref.refcount));
1190
1191         /*
1192          * Destroy queued, but not processed read completions. Note
1193          * that this cleanup has to be done before destroying the
1194          * cm_id because the device ptr is needed to unmap the dma in
1195          * svc_rdma_put_context.
1196          */
1197         while (!list_empty(&rdma->sc_read_complete_q)) {
1198                 struct svc_rdma_op_ctxt *ctxt;
1199                 ctxt = list_entry(rdma->sc_read_complete_q.next,
1200                                   struct svc_rdma_op_ctxt,
1201                                   dto_q);
1202                 list_del_init(&ctxt->dto_q);
1203                 svc_rdma_put_context(ctxt, 1);
1204         }
1205
1206         /* Destroy queued, but not processed recv completions */
1207         while (!list_empty(&rdma->sc_rq_dto_q)) {
1208                 struct svc_rdma_op_ctxt *ctxt;
1209                 ctxt = list_entry(rdma->sc_rq_dto_q.next,
1210                                   struct svc_rdma_op_ctxt,
1211                                   dto_q);
1212                 list_del_init(&ctxt->dto_q);
1213                 svc_rdma_put_context(ctxt, 1);
1214         }
1215
1216         /* Warn if we leaked a resource or under-referenced */
1217         if (rdma->sc_ctxt_used != 0)
1218                 pr_err("svcrdma: ctxt still in use? (%d)\n",
1219                        rdma->sc_ctxt_used);
1220         if (atomic_read(&rdma->sc_dma_used) != 0)
1221                 pr_err("svcrdma: dma still in use? (%d)\n",
1222                        atomic_read(&rdma->sc_dma_used));
1223
1224         /* Final put of backchannel client transport */
1225         if (xprt->xpt_bc_xprt) {
1226                 xprt_put(xprt->xpt_bc_xprt);
1227                 xprt->xpt_bc_xprt = NULL;
1228         }
1229
1230         rdma_dealloc_frmr_q(rdma);
1231         svc_rdma_destroy_ctxts(rdma);
1232         svc_rdma_destroy_maps(rdma);
1233
1234         /* Destroy the QP if present (not a listener) */
1235         if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
1236                 ib_destroy_qp(rdma->sc_qp);
1237
1238         if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
1239                 ib_free_cq(rdma->sc_sq_cq);
1240
1241         if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
1242                 ib_free_cq(rdma->sc_rq_cq);
1243
1244         if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
1245                 ib_dealloc_pd(rdma->sc_pd);
1246
1247         /* Destroy the CM ID */
1248         rdma_destroy_id(rdma->sc_cm_id);
1249
1250         kfree(rdma);
1251 }
1252
1253 static void svc_rdma_free(struct svc_xprt *xprt)
1254 {
1255         struct svcxprt_rdma *rdma =
1256                 container_of(xprt, struct svcxprt_rdma, sc_xprt);
1257         INIT_WORK(&rdma->sc_work, __svc_rdma_free);
1258         queue_work(svc_rdma_wq, &rdma->sc_work);
1259 }
1260
1261 static int svc_rdma_has_wspace(struct svc_xprt *xprt)
1262 {
1263         struct svcxprt_rdma *rdma =
1264                 container_of(xprt, struct svcxprt_rdma, sc_xprt);
1265
1266         /*
1267          * If there are already waiters on the SQ,
1268          * return false.
1269          */
1270         if (waitqueue_active(&rdma->sc_send_wait))
1271                 return 0;
1272
1273         /* Otherwise return true. */
1274         return 1;
1275 }
1276
1277 static int svc_rdma_secure_port(struct svc_rqst *rqstp)
1278 {
1279         return 1;
1280 }
1281
1282 int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
1283 {
1284         struct ib_send_wr *bad_wr, *n_wr;
1285         int wr_count;
1286         int i;
1287         int ret;
1288
1289         if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
1290                 return -ENOTCONN;
1291
1292         wr_count = 1;
1293         for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
1294                 wr_count++;
1295
1296         /* If the SQ is full, wait until an SQ entry is available */
1297         while (1) {
1298                 spin_lock_bh(&xprt->sc_lock);
1299                 if (xprt->sc_sq_depth < atomic_read(&xprt->sc_sq_count) + wr_count) {
1300                         spin_unlock_bh(&xprt->sc_lock);
1301                         atomic_inc(&rdma_stat_sq_starve);
1302
1303                         /* Wait until SQ WR available if SQ still full */
1304                         wait_event(xprt->sc_send_wait,
1305                                    atomic_read(&xprt->sc_sq_count) <
1306                                    xprt->sc_sq_depth);
1307                         if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
1308                                 return -ENOTCONN;
1309                         continue;
1310                 }
1311                 /* Take a transport ref for each WR posted */
1312                 for (i = 0; i < wr_count; i++)
1313                         svc_xprt_get(&xprt->sc_xprt);
1314
1315                 /* Bump used SQ WR count and post */
1316                 atomic_add(wr_count, &xprt->sc_sq_count);
1317                 ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
1318                 if (ret) {
1319                         set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
1320                         atomic_sub(wr_count, &xprt->sc_sq_count);
1321                         for (i = 0; i < wr_count; i ++)
1322                                 svc_xprt_put(&xprt->sc_xprt);
1323                         dprintk("svcrdma: failed to post SQ WR rc=%d, "
1324                                "sc_sq_count=%d, sc_sq_depth=%d\n",
1325                                ret, atomic_read(&xprt->sc_sq_count),
1326                                xprt->sc_sq_depth);
1327                 }
1328                 spin_unlock_bh(&xprt->sc_lock);
1329                 if (ret)
1330                         wake_up(&xprt->sc_send_wait);
1331                 break;
1332         }
1333         return ret;
1334 }