1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <linux/prefetch.h>
53 #include <asm/bitops.h>
54
55 #include "xprt_rdma.h"
56
57 /*
58  * Globals/Macros
59  */
60
61 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
62 # define RPCDBG_FACILITY        RPCDBG_TRANS
63 #endif
64
65 static void rpcrdma_reset_frmrs(struct rpcrdma_ia *);
66 static void rpcrdma_reset_fmrs(struct rpcrdma_ia *);
67
68 /*
69  * internal functions
70  */
71
72 /*
73  * Handle replies in tasklet context, using a single, global list.
74  * The RDMA tasklet function simply walks the list and invokes each
75  * reply's registered function (or returns the recv buffer if none).
76  */
77
78 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
79 static LIST_HEAD(rpcrdma_tasklets_g);
80
81 static void
82 rpcrdma_run_tasklet(unsigned long data)
83 {
84         struct rpcrdma_rep *rep;
85         void (*func)(struct rpcrdma_rep *);
86         unsigned long flags;
87
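        /* The tasklet's data argument is unused; the self-assignment
         * below merely references it to mark it as intentionally ignored.
         */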
88         data = data;
89         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
90         while (!list_empty(&rpcrdma_tasklets_g)) {
91                 rep = list_entry(rpcrdma_tasklets_g.next,
92                                  struct rpcrdma_rep, rr_list);
93                 list_del(&rep->rr_list);
94                 func = rep->rr_func;
95                 rep->rr_func = NULL;
96                 spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
97
98                 if (func)
99                         func(rep);
100                 else
101                         rpcrdma_recv_buffer_put(rep);
102
103                 spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
104         }
105         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
106 }
107
108 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
109
110 static const char * const async_event[] = {
111         "CQ error",
112         "QP fatal error",
113         "QP request error",
114         "QP access error",
115         "communication established",
116         "send queue drained",
117         "path migration successful",
118         "path mig error",
119         "device fatal error",
120         "port active",
121         "port error",
122         "LID change",
123         "P_key change",
124         "SM change",
125         "SRQ error",
126         "SRQ limit reached",
127         "last WQE reached",
128         "client reregister",
129         "GID change",
130 };
131
132 #define ASYNC_MSG(status)                                       \
133         ((status) < ARRAY_SIZE(async_event) ?                   \
134                 async_event[(status)] : "unknown async error")
135
136 static void
137 rpcrdma_schedule_tasklet(struct list_head *sched_list)
138 {
139         unsigned long flags;
140
141         spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
142         list_splice_tail(sched_list, &rpcrdma_tasklets_g);
143         spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
144         tasklet_schedule(&rpcrdma_tasklet_g);
145 }
146
147 static void
148 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
149 {
150         struct rpcrdma_ep *ep = context;
151
152         pr_err("RPC:       %s: %s on device %s ep %p\n",
153                __func__, ASYNC_MSG(event->event),
154                 event->device->name, context);
155         if (ep->rep_connected == 1) {
156                 ep->rep_connected = -EIO;
157                 rpcrdma_conn_func(ep);
158                 wake_up_all(&ep->rep_connect_wait);
159         }
160 }
161
162 static void
163 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
164 {
165         struct rpcrdma_ep *ep = context;
166
167         pr_err("RPC:       %s: %s on device %s ep %p\n",
168                __func__, ASYNC_MSG(event->event),
169                 event->device->name, context);
170         if (ep->rep_connected == 1) {
171                 ep->rep_connected = -EIO;
172                 rpcrdma_conn_func(ep);
173                 wake_up_all(&ep->rep_connect_wait);
174         }
175 }
176
177 static const char * const wc_status[] = {
178         "success",
179         "local length error",
180         "local QP operation error",
181         "local EE context operation error",
182         "local protection error",
183         "WR flushed",
184         "memory management operation error",
185         "bad response error",
186         "local access error",
187         "remote invalid request error",
188         "remote access error",
189         "remote operation error",
190         "transport retry counter exceeded",
191         "RNR retry counter exceeded",
192         "local RDD violation error",
193         "remote invalid RD request",
194         "operation aborted",
195         "invalid EE context number",
196         "invalid EE context state",
197         "fatal error",
198         "response timeout error",
199         "general error",
200 };
201
202 #define COMPLETION_MSG(status)                                  \
203         ((status) < ARRAY_SIZE(wc_status) ?                     \
204                 wc_status[(status)] : "unexpected completion error")
205
206 static void
207 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
208 {
209         if (likely(wc->status == IB_WC_SUCCESS))
210                 return;
211
212         /* WARNING: Only wr_id and status are reliable at this point */
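        /* A zero wr_id is an ordinary SEND completion.  A non-zero wr_id
         * carries the rpcrdma_mw whose FAST_REG_MR or LOCAL_INV request
         * failed; that FRMR is marked stale below.
         */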
213         if (wc->wr_id == 0ULL) {
214                 if (wc->status != IB_WC_WR_FLUSH_ERR)
215                         pr_err("RPC:       %s: SEND: %s\n",
216                                __func__, COMPLETION_MSG(wc->status));
217         } else {
218                 struct rpcrdma_mw *r;
219
220                 r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
221                 r->r.frmr.fr_state = FRMR_IS_STALE;
222                 pr_err("RPC:       %s: frmr %p (stale): %s\n",
223                        __func__, r, COMPLETION_MSG(wc->status));
224         }
225 }
226
227 static int
228 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
229 {
230         struct ib_wc *wcs;
231         int budget, count, rc;
232
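        /* Poll in batches of RPCRDMA_POLLSIZE completions, up to roughly
         * RPCRDMA_WC_BUDGET in total, so a busy CQ cannot monopolize
         * this upcall.
         */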
233         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
234         do {
235                 wcs = ep->rep_send_wcs;
236
237                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
238                 if (rc <= 0)
239                         return rc;
240
241                 count = rc;
242                 while (count-- > 0)
243                         rpcrdma_sendcq_process_wc(wcs++);
244         } while (rc == RPCRDMA_POLLSIZE && --budget);
245         return 0;
246 }
247
248 /*
249  * Handle send, fast_reg_mr, and local_inv completions.
250  *
251  * Send events are typically suppressed and thus do not result
252  * in an upcall. Occasionally one is signaled, however. This
253  * prevents the provider's completion queue from wrapping and
254  * losing a completion.
255  */
256 static void
257 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
258 {
259         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
260         int rc;
261
262         rc = rpcrdma_sendcq_poll(cq, ep);
263         if (rc) {
264                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
265                         __func__, rc);
266                 return;
267         }
268
269         rc = ib_req_notify_cq(cq,
270                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
271         if (rc == 0)
272                 return;
273         if (rc < 0) {
274                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
275                         __func__, rc);
276                 return;
277         }
278
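        /* A positive return from ib_req_notify_cq() means completions
         * arrived between the last poll and re-arming the CQ; drain them.
         */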
279         rpcrdma_sendcq_poll(cq, ep);
280 }
281
282 static void
283 rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
284 {
285         struct rpcrdma_rep *rep =
286                         (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
287
288         /* WARNING: Only wr_id and status are reliable at this point */
289         if (wc->status != IB_WC_SUCCESS)
290                 goto out_fail;
291
292         /* status == SUCCESS means all fields in wc are trustworthy */
293         if (wc->opcode != IB_WC_RECV)
294                 return;
295
296         dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
297                 __func__, rep, wc->byte_len);
298
299         rep->rr_len = wc->byte_len;
300         ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
301                                    rdmab_addr(rep->rr_rdmabuf),
302                                    rep->rr_len, DMA_FROM_DEVICE);
303         prefetch(rdmab_to_msg(rep->rr_rdmabuf));
304
305 out_schedule:
306         list_add_tail(&rep->rr_list, sched_list);
307         return;
308 out_fail:
309         if (wc->status != IB_WC_WR_FLUSH_ERR)
310                 pr_err("RPC:       %s: rep %p: %s\n",
311                        __func__, rep, COMPLETION_MSG(wc->status));
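        /* Mark the reply length invalid so that later processing can
         * tell this receive failed.
         */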
312         rep->rr_len = ~0U;
313         goto out_schedule;
314 }
315
316 static int
317 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
318 {
319         struct list_head sched_list;
320         struct ib_wc *wcs;
321         int budget, count, rc;
322
323         INIT_LIST_HEAD(&sched_list);
324         budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
325         do {
326                 wcs = ep->rep_recv_wcs;
327
328                 rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
329                 if (rc <= 0)
330                         goto out_schedule;
331
332                 count = rc;
333                 while (count-- > 0)
334                         rpcrdma_recvcq_process_wc(wcs++, &sched_list);
335         } while (rc == RPCRDMA_POLLSIZE && --budget);
336         rc = 0;
337
338 out_schedule:
339         rpcrdma_schedule_tasklet(&sched_list);
340         return rc;
341 }
342
343 /*
344  * Handle receive completions.
345  *
346  * It is reentrant, but it processes events one at a time to preserve
347  * the ordering of receives on which server credit accounting depends.
348  *
349  * It is the responsibility of the scheduled tasklet to return
350  * recv buffers to the pool. NOTE: this affects synchronization of
351  * connection shutdown. That is, the structures required for
352  * the completion of the reply handler must remain intact until
353  * all memory has been reclaimed.
354  */
355 static void
356 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
357 {
358         struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
359         int rc;
360
361         rc = rpcrdma_recvcq_poll(cq, ep);
362         if (rc) {
363                 dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
364                         __func__, rc);
365                 return;
366         }
367
368         rc = ib_req_notify_cq(cq,
369                         IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
370         if (rc == 0)
371                 return;
372         if (rc < 0) {
373                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
374                         __func__, rc);
375                 return;
376         }
377
378         rpcrdma_recvcq_poll(cq, ep);
379 }
380
381 static void
382 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
383 {
384         struct ib_wc wc;
385         LIST_HEAD(sched_list);
386
387         while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
388                 rpcrdma_recvcq_process_wc(&wc, &sched_list);
389         if (!list_empty(&sched_list))
390                 rpcrdma_schedule_tasklet(&sched_list);
391         while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
392                 rpcrdma_sendcq_process_wc(&wc);
393 }
394
395 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
396 static const char * const conn[] = {
397         "address resolved",
398         "address error",
399         "route resolved",
400         "route error",
401         "connect request",
402         "connect response",
403         "connect error",
404         "unreachable",
405         "rejected",
406         "established",
407         "disconnected",
408         "device removal",
409         "multicast join",
410         "multicast error",
411         "address change",
412         "timewait exit",
413 };
414
415 #define CONNECTION_MSG(status)                                          \
416         ((status) < ARRAY_SIZE(conn) ?                                  \
417                 conn[(status)] : "unrecognized connection error")
418 #endif
419
420 static int
421 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
422 {
423         struct rpcrdma_xprt *xprt = id->context;
424         struct rpcrdma_ia *ia = &xprt->rx_ia;
425         struct rpcrdma_ep *ep = &xprt->rx_ep;
426 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
427         struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
428 #endif
429         struct ib_qp_attr *attr = &ia->ri_qp_attr;
430         struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
431         int connstate = 0;
432
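        /* connstate: 1 means the connection is now established; a negative
         * errno means it failed or was lost; 0 means no change to report.
         */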
433         switch (event->event) {
434         case RDMA_CM_EVENT_ADDR_RESOLVED:
435         case RDMA_CM_EVENT_ROUTE_RESOLVED:
436                 ia->ri_async_rc = 0;
437                 complete(&ia->ri_done);
438                 break;
439         case RDMA_CM_EVENT_ADDR_ERROR:
440                 ia->ri_async_rc = -EHOSTUNREACH;
441                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
442                         __func__, ep);
443                 complete(&ia->ri_done);
444                 break;
445         case RDMA_CM_EVENT_ROUTE_ERROR:
446                 ia->ri_async_rc = -ENETUNREACH;
447                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
448                         __func__, ep);
449                 complete(&ia->ri_done);
450                 break;
451         case RDMA_CM_EVENT_ESTABLISHED:
452                 connstate = 1;
453                 ib_query_qp(ia->ri_id->qp, attr,
454                             IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
455                             iattr);
456                 dprintk("RPC:       %s: %d responder resources"
457                         " (%d initiator)\n",
458                         __func__, attr->max_dest_rd_atomic,
459                         attr->max_rd_atomic);
460                 goto connected;
461         case RDMA_CM_EVENT_CONNECT_ERROR:
462                 connstate = -ENOTCONN;
463                 goto connected;
464         case RDMA_CM_EVENT_UNREACHABLE:
465                 connstate = -ENETDOWN;
466                 goto connected;
467         case RDMA_CM_EVENT_REJECTED:
468                 connstate = -ECONNREFUSED;
469                 goto connected;
470         case RDMA_CM_EVENT_DISCONNECTED:
471                 connstate = -ECONNABORTED;
472                 goto connected;
473         case RDMA_CM_EVENT_DEVICE_REMOVAL:
474                 connstate = -ENODEV;
475 connected:
476                 dprintk("RPC:       %s: %sconnected\n",
477                                         __func__, connstate > 0 ? "" : "dis");
478                 ep->rep_connected = connstate;
479                 rpcrdma_conn_func(ep);
480                 wake_up_all(&ep->rep_connect_wait);
481                 /*FALLTHROUGH*/
482         default:
483                 dprintk("RPC:       %s: %pI4:%u (ep 0x%p): %s\n",
484                         __func__, &addr->sin_addr.s_addr,
485                         ntohs(addr->sin_port), ep,
486                         CONNECTION_MSG(event->event));
487                 break;
488         }
489
490 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
491         if (connstate == 1) {
492                 int ird = attr->max_dest_rd_atomic;
493                 int tird = ep->rep_remote_cma.responder_resources;
494                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
495                         "on %s, memreg %d slots %d ird %d%s\n",
496                         &addr->sin_addr.s_addr,
497                         ntohs(addr->sin_port),
498                         ia->ri_id->device->name,
499                         ia->ri_memreg_strategy,
500                         xprt->rx_buf.rb_max_requests,
501                         ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
502         } else if (connstate < 0) {
503                 printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
504                         &addr->sin_addr.s_addr,
505                         ntohs(addr->sin_port),
506                         connstate);
507         }
508 #endif
509
510         return 0;
511 }
512
513 static struct rdma_cm_id *
514 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
515                         struct rpcrdma_ia *ia, struct sockaddr *addr)
516 {
517         struct rdma_cm_id *id;
518         int rc;
519
520         init_completion(&ia->ri_done);
521
522         id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
523         if (IS_ERR(id)) {
524                 rc = PTR_ERR(id);
525                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
526                         __func__, rc);
527                 return id;
528         }
529
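        /* ri_async_rc defaults to -ETIMEDOUT before each resolution step;
         * rpcrdma_conn_upcall overwrites it and completes ri_done when
         * address (and, below, route) resolution finishes or fails.
         */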
530         ia->ri_async_rc = -ETIMEDOUT;
531         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
532         if (rc) {
533                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
534                         __func__, rc);
535                 goto out;
536         }
537         wait_for_completion_interruptible_timeout(&ia->ri_done,
538                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
539         rc = ia->ri_async_rc;
540         if (rc)
541                 goto out;
542
543         ia->ri_async_rc = -ETIMEDOUT;
544         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
545         if (rc) {
546                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
547                         __func__, rc);
548                 goto out;
549         }
550         wait_for_completion_interruptible_timeout(&ia->ri_done,
551                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
552         rc = ia->ri_async_rc;
553         if (rc)
554                 goto out;
555
556         return id;
557
558 out:
559         rdma_destroy_id(id);
560         return ERR_PTR(rc);
561 }
562
563 /*
564  * Drain any cq, prior to teardown.
565  */
566 static void
567 rpcrdma_clean_cq(struct ib_cq *cq)
568 {
569         struct ib_wc wc;
570         int count = 0;
571
572         while (1 == ib_poll_cq(cq, 1, &wc))
573                 ++count;
574
575         if (count)
576                 dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
577                         __func__, count, wc.opcode);
578 }
579
580 /*
581  * Exported functions.
582  */
583
584 /*
585  * Open and initialize an Interface Adapter.
586  *  o initializes fields of struct rpcrdma_ia, including
587  *    interface and provider attributes and protection domain.
588  */
589 int
590 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
591 {
592         int rc, mem_priv;
593         struct rpcrdma_ia *ia = &xprt->rx_ia;
594         struct ib_device_attr *devattr = &ia->ri_devattr;
595
596         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
597         if (IS_ERR(ia->ri_id)) {
598                 rc = PTR_ERR(ia->ri_id);
599                 goto out1;
600         }
601
602         ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
603         if (IS_ERR(ia->ri_pd)) {
604                 rc = PTR_ERR(ia->ri_pd);
605                 dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
606                         __func__, rc);
607                 goto out2;
608         }
609
610         rc = ib_query_device(ia->ri_id->device, devattr);
611         if (rc) {
612                 dprintk("RPC:       %s: ib_query_device failed %d\n",
613                         __func__, rc);
614                 goto out3;
615         }
616
617         if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
618                 ia->ri_have_dma_lkey = 1;
619                 ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
620         }
621
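        /* If the device cannot support the requested registration mode,
         * fall back: FRMR -> MTHCAFMR -> ALLPHYSICAL.
         */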
622         if (memreg == RPCRDMA_FRMR) {
623                 /* Requires both frmr reg and local dma lkey */
624                 if ((devattr->device_cap_flags &
625                      (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
626                     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
627                         dprintk("RPC:       %s: FRMR registration "
628                                 "not supported by HCA\n", __func__);
629                         memreg = RPCRDMA_MTHCAFMR;
630                 } else {
631                         /* Mind the ia limit on FRMR page list depth */
632                         ia->ri_max_frmr_depth = min_t(unsigned int,
633                                 RPCRDMA_MAX_DATA_SEGS,
634                                 devattr->max_fast_reg_page_list_len);
635                 }
636         }
637         if (memreg == RPCRDMA_MTHCAFMR) {
638                 if (!ia->ri_id->device->alloc_fmr) {
639                         dprintk("RPC:       %s: MTHCAFMR registration "
640                                 "not supported by HCA\n", __func__);
641                         memreg = RPCRDMA_ALLPHYSICAL;
642                 }
643         }
644
645         /*
646          * Optionally obtain an underlying physical identity mapping in
647          * order to do a memory window-based bind. This base registration
648  * is protected from remote access; remote access is enabled only by
649  * binding the specific bytes targeted during each RPC operation, and
650          * revoked after the corresponding completion similar to a storage
651          * adapter.
652          */
653         switch (memreg) {
654         case RPCRDMA_FRMR:
655                 break;
656         case RPCRDMA_ALLPHYSICAL:
657                 mem_priv = IB_ACCESS_LOCAL_WRITE |
658                                 IB_ACCESS_REMOTE_WRITE |
659                                 IB_ACCESS_REMOTE_READ;
660                 goto register_setup;
661         case RPCRDMA_MTHCAFMR:
662                 if (ia->ri_have_dma_lkey)
663                         break;
664                 mem_priv = IB_ACCESS_LOCAL_WRITE;
665         register_setup:
666                 ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
667                 if (IS_ERR(ia->ri_bind_mem)) {
668                         printk(KERN_ALERT "%s: ib_get_dma_mr for "
669                                 "phys register failed with %lX\n",
670                                 __func__, PTR_ERR(ia->ri_bind_mem));
671                         rc = -ENOMEM;
672                         goto out3;
673                 }
674                 break;
675         default:
676                 printk(KERN_ERR "RPC: Unsupported memory "
677                                 "registration mode: %d\n", memreg);
678                 rc = -ENOMEM;
679                 goto out3;
680         }
681         dprintk("RPC:       %s: memory registration strategy is %d\n",
682                 __func__, memreg);
683
684         /* Else will do memory reg/dereg for each chunk */
685         ia->ri_memreg_strategy = memreg;
686
687         rwlock_init(&ia->ri_qplock);
688         return 0;
689
690 out3:
691         ib_dealloc_pd(ia->ri_pd);
692         ia->ri_pd = NULL;
693 out2:
694         rdma_destroy_id(ia->ri_id);
695         ia->ri_id = NULL;
696 out1:
697         return rc;
698 }
699
700 /*
701  * Clean up/close an IA.
702  *   o if event handles and PD have been initialized, free them.
703  *   o close the IA
704  */
705 void
706 rpcrdma_ia_close(struct rpcrdma_ia *ia)
707 {
708         int rc;
709
710         dprintk("RPC:       %s: entering\n", __func__);
711         if (ia->ri_bind_mem != NULL) {
712                 rc = ib_dereg_mr(ia->ri_bind_mem);
713                 dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
714                         __func__, rc);
715         }
716         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
717                 if (ia->ri_id->qp)
718                         rdma_destroy_qp(ia->ri_id);
719                 rdma_destroy_id(ia->ri_id);
720                 ia->ri_id = NULL;
721         }
722         if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
723                 rc = ib_dealloc_pd(ia->ri_pd);
724                 dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
725                         __func__, rc);
726         }
727 }
728
729 /*
730  * Create unconnected endpoint.
731  */
732 int
733 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
734                                 struct rpcrdma_create_data_internal *cdata)
735 {
736         struct ib_device_attr *devattr = &ia->ri_devattr;
737         struct ib_cq *sendcq, *recvcq;
738         int rc, err;
739
740         /* check provider's send/recv wr limits */
741         if (cdata->max_requests > devattr->max_qp_wr)
742                 cdata->max_requests = devattr->max_qp_wr;
743
744         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
745         ep->rep_attr.qp_context = ep;
746         /* send_cq and recv_cq initialized below */
747         ep->rep_attr.srq = NULL;
748         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
749         switch (ia->ri_memreg_strategy) {
750         case RPCRDMA_FRMR: {
751                 int depth = 7;
752
753                 /* Add room for frmr register and invalidate WRs.
754                  * 1. FRMR reg WR for head
755                  * 2. FRMR invalidate WR for head
756                  * 3. N FRMR reg WRs for pagelist
757                  * 4. N FRMR invalidate WRs for pagelist
758                  * 5. FRMR reg WR for tail
759                  * 6. FRMR invalidate WR for tail
760                  * 7. The RDMA_SEND WR
761                  */
762
763                 /* Calculate N if the device max FRMR depth is smaller than
764                  * RPCRDMA_MAX_DATA_SEGS.
765                  */
766                 if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
767                         int delta = RPCRDMA_MAX_DATA_SEGS -
768                                     ia->ri_max_frmr_depth;
769
770                         do {
771                                 depth += 2; /* FRMR reg + invalidate */
772                                 delta -= ia->ri_max_frmr_depth;
773                         } while (delta > 0);
774
775                 }
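                /* Example with hypothetical values: if RPCRDMA_MAX_DATA_SEGS
                 * were 64 and ia->ri_max_frmr_depth were 16, delta would start
                 * at 48 and the loop above would run three times, giving
                 * depth = 7 + 3 * 2 = 13 WRs per RPC.
                 */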
776                 ep->rep_attr.cap.max_send_wr *= depth;
777                 if (ep->rep_attr.cap.max_send_wr > devattr->max_qp_wr) {
778                         cdata->max_requests = devattr->max_qp_wr / depth;
779                         if (!cdata->max_requests)
780                                 return -EINVAL;
781                         ep->rep_attr.cap.max_send_wr = cdata->max_requests *
782                                                        depth;
783                 }
784                 break;
785         }
786         default:
787                 break;
788         }
789         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
790         ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
791         ep->rep_attr.cap.max_recv_sge = 1;
792         ep->rep_attr.cap.max_inline_data = 0;
793         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
794         ep->rep_attr.qp_type = IB_QPT_RC;
795         ep->rep_attr.port_num = ~0;
796
797         if (cdata->padding) {
798                 ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
799                                                       GFP_KERNEL);
800                 if (IS_ERR(ep->rep_padbuf))
801                         return PTR_ERR(ep->rep_padbuf);
802         } else
803                 ep->rep_padbuf = NULL;
804
805         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
806                 "iovs: send %d recv %d\n",
807                 __func__,
808                 ep->rep_attr.cap.max_send_wr,
809                 ep->rep_attr.cap.max_recv_wr,
810                 ep->rep_attr.cap.max_send_sge,
811                 ep->rep_attr.cap.max_recv_sge);
812
813         /* set trigger for requesting send completion */
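        /* Roughly, rep_cqinit is the number of sends that may be posted
         * before a completion must be signalled; a value of 0 effectively
         * signals every send.  This keeps the provider's send CQ from
         * overflowing (see the comment above rpcrdma_sendcq_upcall).
         */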
814         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
815         if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
816                 ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
817         else if (ep->rep_cqinit <= 2)
818                 ep->rep_cqinit = 0;
819         INIT_CQCOUNT(ep);
820         init_waitqueue_head(&ep->rep_connect_wait);
821         INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
822
823         sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
824                                   rpcrdma_cq_async_error_upcall, ep,
825                                   ep->rep_attr.cap.max_send_wr + 1, 0);
826         if (IS_ERR(sendcq)) {
827                 rc = PTR_ERR(sendcq);
828                 dprintk("RPC:       %s: failed to create send CQ: %i\n",
829                         __func__, rc);
830                 goto out1;
831         }
832
833         rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
834         if (rc) {
835                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
836                         __func__, rc);
837                 goto out2;
838         }
839
840         recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
841                                   rpcrdma_cq_async_error_upcall, ep,
842                                   ep->rep_attr.cap.max_recv_wr + 1, 0);
843         if (IS_ERR(recvcq)) {
844                 rc = PTR_ERR(recvcq);
845                 dprintk("RPC:       %s: failed to create recv CQ: %i\n",
846                         __func__, rc);
847                 goto out2;
848         }
849
850         rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
851         if (rc) {
852                 dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
853                         __func__, rc);
854                 ib_destroy_cq(recvcq);
855                 goto out2;
856         }
857
858         ep->rep_attr.send_cq = sendcq;
859         ep->rep_attr.recv_cq = recvcq;
860
861         /* Initialize cma parameters */
862
863         /* RPC/RDMA does not use private data */
864         ep->rep_remote_cma.private_data = NULL;
865         ep->rep_remote_cma.private_data_len = 0;
866
867         /* Client offers RDMA Read but does not initiate */
868         ep->rep_remote_cma.initiator_depth = 0;
869         if (devattr->max_qp_rd_atom > 32)       /* arbitrary but <= 255 */
870                 ep->rep_remote_cma.responder_resources = 32;
871         else
872                 ep->rep_remote_cma.responder_resources =
873                                                 devattr->max_qp_rd_atom;
874
875         ep->rep_remote_cma.retry_count = 7;
876         ep->rep_remote_cma.flow_control = 0;
877         ep->rep_remote_cma.rnr_retry_count = 0;
878
879         return 0;
880
881 out2:
882         err = ib_destroy_cq(sendcq);
883         if (err)
884                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
885                         __func__, err);
886 out1:
887         rpcrdma_free_regbuf(ia, ep->rep_padbuf);
888         return rc;
889 }
890
891 /*
892  * rpcrdma_ep_destroy
893  *
894  * Disconnect and destroy endpoint. After this, the only
895  * valid operations on the ep are to free it (if dynamically
896  * allocated) or re-create it.
897  */
898 void
899 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
900 {
901         int rc;
902
903         dprintk("RPC:       %s: entering, connected is %d\n",
904                 __func__, ep->rep_connected);
905
906         cancel_delayed_work_sync(&ep->rep_connect_worker);
907
908         if (ia->ri_id->qp) {
909                 rpcrdma_ep_disconnect(ep, ia);
910                 rdma_destroy_qp(ia->ri_id);
911                 ia->ri_id->qp = NULL;
912         }
913
914         rpcrdma_free_regbuf(ia, ep->rep_padbuf);
915
916         rpcrdma_clean_cq(ep->rep_attr.recv_cq);
917         rc = ib_destroy_cq(ep->rep_attr.recv_cq);
918         if (rc)
919                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
920                         __func__, rc);
921
922         rpcrdma_clean_cq(ep->rep_attr.send_cq);
923         rc = ib_destroy_cq(ep->rep_attr.send_cq);
924         if (rc)
925                 dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
926                         __func__, rc);
927 }
928
929 /*
930  * Connect unconnected endpoint.
931  */
932 int
933 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
934 {
935         struct rdma_cm_id *id, *old;
936         int rc = 0;
937         int retry_count = 0;
938
939         if (ep->rep_connected != 0) {
940                 struct rpcrdma_xprt *xprt;
941 retry:
942                 dprintk("RPC:       %s: reconnecting...\n", __func__);
943
944                 rpcrdma_ep_disconnect(ep, ia);
945                 rpcrdma_flush_cqs(ep);
946
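                /* The disconnect flushed any outstanding work requests,
                 * which can leave MRs in an indeterminate state; reset
                 * them before reconnecting (see rpcrdma_reset_frmrs and
                 * rpcrdma_reset_fmrs below).
                 */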
947                 switch (ia->ri_memreg_strategy) {
948                 case RPCRDMA_FRMR:
949                         rpcrdma_reset_frmrs(ia);
950                         break;
951                 case RPCRDMA_MTHCAFMR:
952                         rpcrdma_reset_fmrs(ia);
953                         break;
954                 case RPCRDMA_ALLPHYSICAL:
955                         break;
956                 default:
957                         rc = -EIO;
958                         goto out;
959                 }
960
961                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
962                 id = rpcrdma_create_id(xprt, ia,
963                                 (struct sockaddr *)&xprt->rx_data.addr);
964                 if (IS_ERR(id)) {
965                         rc = -EHOSTUNREACH;
966                         goto out;
967                 }
968                 /* TEMP TEMP TEMP - fail if new device:
969                  * Deregister/remarshal *all* requests!
970                  * Close and recreate adapter, pd, etc!
971                  * Re-determine all attributes still sane!
972                  * More stuff I haven't thought of!
973                  * Rrrgh!
974                  */
975                 if (ia->ri_id->device != id->device) {
976                         printk("RPC:       %s: can't reconnect on "
977                                 "different device!\n", __func__);
978                         rdma_destroy_id(id);
979                         rc = -ENETUNREACH;
980                         goto out;
981                 }
982                 /* END TEMP */
983                 rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
984                 if (rc) {
985                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
986                                 __func__, rc);
987                         rdma_destroy_id(id);
988                         rc = -ENETUNREACH;
989                         goto out;
990                 }
991
992                 write_lock(&ia->ri_qplock);
993                 old = ia->ri_id;
994                 ia->ri_id = id;
995                 write_unlock(&ia->ri_qplock);
996
997                 rdma_destroy_qp(old);
998                 rdma_destroy_id(old);
999         } else {
1000                 dprintk("RPC:       %s: connecting...\n", __func__);
1001                 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
1002                 if (rc) {
1003                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
1004                                 __func__, rc);
1005                         /* do not update ep->rep_connected */
1006                         return -ENETUNREACH;
1007                 }
1008         }
1009
1010         ep->rep_connected = 0;
1011
1012         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
1013         if (rc) {
1014                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
1015                                 __func__, rc);
1016                 goto out;
1017         }
1018
1019         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
1020
1021         /*
1022          * Check state. A non-peer reject indicates no listener
1023          * (ECONNREFUSED), which may be a transient state. All
1024          * others indicate a transport condition for which a best-effort
1025          * recovery attempt has already been made.
1026          */
1027         if (ep->rep_connected == -ECONNREFUSED &&
1028             ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
1029                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
1030                 goto retry;
1031         }
1032         if (ep->rep_connected <= 0) {
1033                 /* Sometimes, the only way to reliably connect to remote
1034                  * CMs is to use the same nonzero values for ORD and IRD. */
1035                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
1036                     (ep->rep_remote_cma.responder_resources == 0 ||
1037                      ep->rep_remote_cma.initiator_depth !=
1038                                 ep->rep_remote_cma.responder_resources)) {
1039                         if (ep->rep_remote_cma.responder_resources == 0)
1040                                 ep->rep_remote_cma.responder_resources = 1;
1041                         ep->rep_remote_cma.initiator_depth =
1042                                 ep->rep_remote_cma.responder_resources;
1043                         goto retry;
1044                 }
1045                 rc = ep->rep_connected;
1046         } else {
1047                 dprintk("RPC:       %s: connected\n", __func__);
1048         }
1049
1050 out:
1051         if (rc)
1052                 ep->rep_connected = rc;
1053         return rc;
1054 }
1055
1056 /*
1057  * rpcrdma_ep_disconnect
1058  *
1059  * This is kept separate from destroy so that the transport can
1060  * reconnect without recreating the endpoint.
1061  *
1062  * This call is not reentrant, and must not be made in parallel
1063  * on the same endpoint.
1064  */
1065 void
1066 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
1067 {
1068         int rc;
1069
1070         rpcrdma_flush_cqs(ep);
1071         rc = rdma_disconnect(ia->ri_id);
1072         if (!rc) {
1073                 /* returns without wait if not connected */
1074                 wait_event_interruptible(ep->rep_connect_wait,
1075                                                         ep->rep_connected != 1);
1076                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
1077                         (ep->rep_connected == 1) ? "still " : "dis");
1078         } else {
1079                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
1080                 ep->rep_connected = rc;
1081         }
1082 }
1083
1084 static struct rpcrdma_req *
1085 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
1086 {
1087         struct rpcrdma_req *req;
1088
1089         req = kzalloc(sizeof(*req), GFP_KERNEL);
1090         if (req == NULL)
1091                 return ERR_PTR(-ENOMEM);
1092
1093         req->rl_buffer = &r_xprt->rx_buf;
1094         return req;
1095 }
1096
1097 static struct rpcrdma_rep *
1098 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
1099 {
1100         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1101         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1102         struct rpcrdma_rep *rep;
1103         int rc;
1104
1105         rc = -ENOMEM;
1106         rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1107         if (rep == NULL)
1108                 goto out;
1109
1110         rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
1111                                                GFP_KERNEL);
1112         if (IS_ERR(rep->rr_rdmabuf)) {
1113                 rc = PTR_ERR(rep->rr_rdmabuf);
1114                 goto out_free;
1115         }
1116
1117         rep->rr_buffer = &r_xprt->rx_buf;
1118         return rep;
1119
1120 out_free:
1121         kfree(rep);
1122 out:
1123         return ERR_PTR(rc);
1124 }
1125
1126 static int
1127 rpcrdma_init_fmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
1128 {
1129         int mr_access_flags = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ;
1130         struct ib_fmr_attr fmr_attr = {
1131                 .max_pages      = RPCRDMA_MAX_DATA_SEGS,
1132                 .max_maps       = 1,
1133                 .page_shift     = PAGE_SHIFT
1134         };
1135         struct rpcrdma_mw *r;
1136         int i, rc;
1137
1138         i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
1139         dprintk("RPC:       %s: initializing %d FMRs\n", __func__, i);
1140
1141         while (i--) {
1142                 r = kzalloc(sizeof(*r), GFP_KERNEL);
1143                 if (r == NULL)
1144                         return -ENOMEM;
1145
1146                 r->r.fmr = ib_alloc_fmr(ia->ri_pd, mr_access_flags, &fmr_attr);
1147                 if (IS_ERR(r->r.fmr)) {
1148                         rc = PTR_ERR(r->r.fmr);
1149                         dprintk("RPC:       %s: ib_alloc_fmr failed %i\n",
1150                                 __func__, rc);
1151                         goto out_free;
1152                 }
1153
1154                 list_add(&r->mw_list, &buf->rb_mws);
1155                 list_add(&r->mw_all, &buf->rb_all);
1156         }
1157         return 0;
1158
1159 out_free:
1160         kfree(r);
1161         return rc;
1162 }
1163
1164 static int
1165 rpcrdma_init_frmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
1166 {
1167         struct rpcrdma_frmr *f;
1168         struct rpcrdma_mw *r;
1169         int i, rc;
1170
1171         i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
1172         dprintk("RPC:       %s: initializing %d FRMRs\n", __func__, i);
1173
1174         while (i--) {
1175                 r = kzalloc(sizeof(*r), GFP_KERNEL);
1176                 if (r == NULL)
1177                         return -ENOMEM;
1178                 f = &r->r.frmr;
1179
1180                 f->fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1181                                                 ia->ri_max_frmr_depth);
1182                 if (IS_ERR(f->fr_mr)) {
1183                         rc = PTR_ERR(f->fr_mr);
1184                         dprintk("RPC:       %s: ib_alloc_fast_reg_mr "
1185                                 "failed %i\n", __func__, rc);
1186                         goto out_free;
1187                 }
1188
1189                 f->fr_pgl = ib_alloc_fast_reg_page_list(ia->ri_id->device,
1190                                                         ia->ri_max_frmr_depth);
1191                 if (IS_ERR(f->fr_pgl)) {
1192                         rc = PTR_ERR(f->fr_pgl);
1193                         dprintk("RPC:       %s: ib_alloc_fast_reg_page_list "
1194                                 "failed %i\n", __func__, rc);
1195
1196                         ib_dereg_mr(f->fr_mr);
1197                         goto out_free;
1198                 }
1199
1200                 list_add(&r->mw_list, &buf->rb_mws);
1201                 list_add(&r->mw_all, &buf->rb_all);
1202         }
1203
1204         return 0;
1205
1206 out_free:
1207         kfree(r);
1208         return rc;
1209 }
1210
1211 int
1212 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1213 {
1214         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1215         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1216         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1217         char *p;
1218         size_t len;
1219         int i, rc;
1220
1221         buf->rb_max_requests = cdata->max_requests;
1222         spin_lock_init(&buf->rb_lock);
1223
1224         /* Need to allocate:
1225          *   1.  arrays for send and recv pointers
1226          *   2.  arrays of struct rpcrdma_req to fill in pointers
1227          *   3.  array of struct rpcrdma_rep for replies
1228          * Send/recv buffers in req/rep need to be registered
1229          */
1230         len = buf->rb_max_requests *
1231                 (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1232
1233         p = kzalloc(len, GFP_KERNEL);
1234         if (p == NULL) {
1235                 dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1236                         __func__, len);
1237                 rc = -ENOMEM;
1238                 goto out;
1239         }
1240         buf->rb_pool = p;       /* for freeing it later */
1241
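        /* rb_pool layout: rb_send_bufs[0..max_requests-1] is followed
         * immediately by rb_recv_bufs[0..max_requests-1].
         */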
1242         buf->rb_send_bufs = (struct rpcrdma_req **) p;
1243         p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1244         buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1245         p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1246
1247         INIT_LIST_HEAD(&buf->rb_mws);
1248         INIT_LIST_HEAD(&buf->rb_all);
1249         switch (ia->ri_memreg_strategy) {
1250         case RPCRDMA_FRMR:
1251                 rc = rpcrdma_init_frmrs(ia, buf);
1252                 if (rc)
1253                         goto out;
1254                 break;
1255         case RPCRDMA_MTHCAFMR:
1256                 rc = rpcrdma_init_fmrs(ia, buf);
1257                 if (rc)
1258                         goto out;
1259                 break;
1260         default:
1261                 break;
1262         }
1263
1264         for (i = 0; i < buf->rb_max_requests; i++) {
1265                 struct rpcrdma_req *req;
1266                 struct rpcrdma_rep *rep;
1267
1268                 req = rpcrdma_create_req(r_xprt);
1269                 if (IS_ERR(req)) {
1270                         dprintk("RPC:       %s: request buffer %d alloc"
1271                                 " failed\n", __func__, i);
1272                         rc = PTR_ERR(req);
1273                         goto out;
1274                 }
1275                 buf->rb_send_bufs[i] = req;
1276
1277                 rep = rpcrdma_create_rep(r_xprt);
1278                 if (IS_ERR(rep)) {
1279                         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1280                                 __func__, i);
1281                         rc = PTR_ERR(rep);
1282                         goto out;
1283                 }
1284                 buf->rb_recv_bufs[i] = rep;
1285         }
1286
1287         return 0;
1288 out:
1289         rpcrdma_buffer_destroy(buf);
1290         return rc;
1291 }
1292
1293 static void
1294 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
1295 {
1296         if (!rep)
1297                 return;
1298
1299         rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
1300         kfree(rep);
1301 }
1302
1303 static void
1304 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
1305 {
1306         if (!req)
1307                 return;
1308
1309         rpcrdma_free_regbuf(ia, req->rl_sendbuf);
1310         rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
1311         kfree(req);
1312 }
1313
1314 static void
1315 rpcrdma_destroy_fmrs(struct rpcrdma_buffer *buf)
1316 {
1317         struct rpcrdma_mw *r;
1318         int rc;
1319
1320         while (!list_empty(&buf->rb_all)) {
1321                 r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1322                 list_del(&r->mw_all);
1323                 list_del(&r->mw_list);
1324
1325                 rc = ib_dealloc_fmr(r->r.fmr);
1326                 if (rc)
1327                         dprintk("RPC:       %s: ib_dealloc_fmr failed %i\n",
1328                                 __func__, rc);
1329
1330                 kfree(r);
1331         }
1332 }
1333
1334 static void
1335 rpcrdma_destroy_frmrs(struct rpcrdma_buffer *buf)
1336 {
1337         struct rpcrdma_mw *r;
1338         int rc;
1339
1340         while (!list_empty(&buf->rb_all)) {
1341                 r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1342                 list_del(&r->mw_all);
1343                 list_del(&r->mw_list);
1344
1345                 rc = ib_dereg_mr(r->r.frmr.fr_mr);
1346                 if (rc)
1347                         dprintk("RPC:       %s: ib_dereg_mr failed %i\n",
1348                                 __func__, rc);
1349                 ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1350
1351                 kfree(r);
1352         }
1353 }
1354
1355 void
1356 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1357 {
1358         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1359         int i;
1360
1361         /* clean up in reverse order from create
1362          *   1.  recv mr memory (mr free, then kfree)
1363          *   2.  send mr memory (mr free, then kfree)
1364          *   3.  MWs
1365          */
1366         dprintk("RPC:       %s: entering\n", __func__);
1367
1368         for (i = 0; i < buf->rb_max_requests; i++) {
1369                 if (buf->rb_recv_bufs)
1370                         rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
1371                 if (buf->rb_send_bufs)
1372                         rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
1373         }
1374
1375         switch (ia->ri_memreg_strategy) {
1376         case RPCRDMA_FRMR:
1377                 rpcrdma_destroy_frmrs(buf);
1378                 break;
1379         case RPCRDMA_MTHCAFMR:
1380                 rpcrdma_destroy_fmrs(buf);
1381                 break;
1382         default:
1383                 break;
1384         }
1385
1386         kfree(buf->rb_pool);
1387 }
1388
1389 /* After a disconnect, unmap all FMRs.
1390  *
1391  * This is invoked only in the transport connect worker in order
1392  * to serialize with rpcrdma_register_fmr_external().
1393  */
1394 static void
1395 rpcrdma_reset_fmrs(struct rpcrdma_ia *ia)
1396 {
1397         struct rpcrdma_xprt *r_xprt =
1398                                 container_of(ia, struct rpcrdma_xprt, rx_ia);
1399         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1400         struct list_head *pos;
1401         struct rpcrdma_mw *r;
1402         LIST_HEAD(l);
1403         int rc;
1404
1405         list_for_each(pos, &buf->rb_all) {
1406                 r = list_entry(pos, struct rpcrdma_mw, mw_all);
1407
1408                 INIT_LIST_HEAD(&l);
1409                 list_add(&r->r.fmr->list, &l);
1410                 rc = ib_unmap_fmr(&l);
1411                 if (rc)
1412                         dprintk("RPC:       %s: ib_unmap_fmr failed %i\n",
1413                                 __func__, rc);
1414         }
1415 }
1416
1417 /* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
1418  * an unusable state. Find FRMRs in this state and deregister and
1419  * re-register each.  FRMRs that are VALID and attached to an
1420  * rpcrdma_req are also torn down.
1421  *
1422  * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
1423  *
1424  * This is invoked only in the transport connect worker in order
1425  * to serialize with rpcrdma_register_frmr_external().
1426  */
1427 static void
1428 rpcrdma_reset_frmrs(struct rpcrdma_ia *ia)
1429 {
1430         struct rpcrdma_xprt *r_xprt =
1431                                 container_of(ia, struct rpcrdma_xprt, rx_ia);
1432         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1433         struct list_head *pos;
1434         struct rpcrdma_mw *r;
1435         int rc;
1436
1437         list_for_each(pos, &buf->rb_all) {
1438                 r = list_entry(pos, struct rpcrdma_mw, mw_all);
1439
1440                 if (r->r.frmr.fr_state == FRMR_IS_INVALID)
1441                         continue;
1442
1443                 rc = ib_dereg_mr(r->r.frmr.fr_mr);
1444                 if (rc)
1445                         dprintk("RPC:       %s: ib_dereg_mr failed %i\n",
1446                                 __func__, rc);
1447                 ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1448
1449                 r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1450                                         ia->ri_max_frmr_depth);
1451                 if (IS_ERR(r->r.frmr.fr_mr)) {
1452                         rc = PTR_ERR(r->r.frmr.fr_mr);
1453                         dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1454                                 " failed %i\n", __func__, rc);
1455                         continue;
1456                 }
1457                 r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1458                                         ia->ri_id->device,
1459                                         ia->ri_max_frmr_depth);
1460                 if (IS_ERR(r->r.frmr.fr_pgl)) {
1461                         rc = PTR_ERR(r->r.frmr.fr_pgl);
1462                         dprintk("RPC:       %s: "
1463                                 "ib_alloc_fast_reg_page_list "
1464                                 "failed %i\n", __func__, rc);
1465
1466                         ib_dereg_mr(r->r.frmr.fr_mr);
1467                         continue;
1468                 }
1469                 r->r.frmr.fr_state = FRMR_IS_INVALID;
1470         }
1471 }
1472
1473 /* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1474  * some req segments uninitialized.
1475  */
1476 static void
1477 rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1478 {
1479         if (*mw) {
1480                 list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1481                 *mw = NULL;
1482         }
1483 }
1484
1485 /* Cycle mw's back in reverse order, and "spin" them.
1486  * This delays and scrambles reuse as much as possible.
1487  */
1488 static void
1489 rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1490 {
1491         struct rpcrdma_mr_seg *seg = req->rl_segments;
1492         struct rpcrdma_mr_seg *seg1 = seg;
1493         int i;
1494
1495         for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1496                 rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
1497         rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
1498 }
1499
1500 static void
1501 rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1502 {
1503         buf->rb_send_bufs[--buf->rb_send_index] = req;
1504         req->rl_niovs = 0;
1505         if (req->rl_reply) {
1506                 buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1507                 req->rl_reply->rr_func = NULL;
1508                 req->rl_reply = NULL;
1509         }
1510 }
1511
1512 /* rpcrdma_unmap_one() was already done by rpcrdma_deregister_frmr_external().
1513  * Redo only the ib_post_send().
1514  */
1515 static void
1516 rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
1517 {
1518         struct rpcrdma_xprt *r_xprt =
1519                                 container_of(ia, struct rpcrdma_xprt, rx_ia);
1520         struct ib_send_wr invalidate_wr, *bad_wr;
1521         int rc;
1522
1523         dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);
1524
1525         /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
1526         r->r.frmr.fr_state = FRMR_IS_INVALID;
1527
1528         memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1529         invalidate_wr.wr_id = (unsigned long)(void *)r;
1530         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1531         invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
1532         DECR_CQCOUNT(&r_xprt->rx_ep);
1533
1534         dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
1535                 __func__, r, r->r.frmr.fr_mr->rkey);
1536
1537         read_lock(&ia->ri_qplock);
1538         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1539         read_unlock(&ia->ri_qplock);
1540         if (rc) {
1541                 /* Force rpcrdma_buffer_get() to retry */
1542                 r->r.frmr.fr_state = FRMR_IS_STALE;
1543                 dprintk("RPC:       %s: ib_post_send failed, %i\n",
1544                         __func__, rc);
1545         }
1546 }
1547
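/* Post a fresh LOCAL_INV for each stale FRMR collected by
 * rpcrdma_buffer_get_frmrs(), then splice them back onto rb_mws.
 */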
1548 static void
1549 rpcrdma_retry_flushed_linv(struct list_head *stale,
1550                            struct rpcrdma_buffer *buf)
1551 {
1552         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1553         struct list_head *pos;
1554         struct rpcrdma_mw *r;
1555         unsigned long flags;
1556
1557         list_for_each(pos, stale) {
1558                 r = list_entry(pos, struct rpcrdma_mw, mw_list);
1559                 rpcrdma_retry_local_inv(r, ia);
1560         }
1561
1562         spin_lock_irqsave(&buf->rb_lock, flags);
1563         list_splice_tail(stale, &buf->rb_mws);
1564         spin_unlock_irqrestore(&buf->rb_lock, flags);
1565 }
1566
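/* Attach FRMRs from rb_mws to req->rl_segments[]. Stale FRMRs are
 * diverted onto the "stale" list for recovery. If rb_mws runs out of
 * usable entries, the request and its MRs are returned to the pool
 * and NULL is returned.
 */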
1567 static struct rpcrdma_req *
1568 rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
1569                          struct list_head *stale)
1570 {
1571         struct rpcrdma_mw *r;
1572         int i;
1573
1574         i = RPCRDMA_MAX_SEGS - 1;
1575         while (!list_empty(&buf->rb_mws)) {
1576                 r = list_entry(buf->rb_mws.next,
1577                                struct rpcrdma_mw, mw_list);
1578                 list_del(&r->mw_list);
1579                 if (r->r.frmr.fr_state == FRMR_IS_STALE) {
1580                         list_add(&r->mw_list, stale);
1581                         continue;
1582                 }
1583                 req->rl_segments[i].rl_mw = r;
1584                 if (unlikely(i-- == 0))
1585                         return req;     /* Success */
1586         }
1587
1588         /* Not enough entries on rb_mws for this req */
1589         rpcrdma_buffer_put_sendbuf(req, buf);
1590         rpcrdma_buffer_put_mrs(req, buf);
1591         return NULL;
1592 }
1593
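/* As above, but for FMRs, which have no stale state to check. */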
1594 static struct rpcrdma_req *
1595 rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1596 {
1597         struct rpcrdma_mw *r;
1598         int i;
1599
1600         i = RPCRDMA_MAX_SEGS - 1;
1601         while (!list_empty(&buf->rb_mws)) {
1602                 r = list_entry(buf->rb_mws.next,
1603                                struct rpcrdma_mw, mw_list);
1604                 list_del(&r->mw_list);
1605                 req->rl_segments[i].rl_mw = r;
1606                 if (unlikely(i-- == 0))
1607                         return req;     /* Success */
1608         }
1609
1610         /* Not enough entries on rb_mws for this req */
1611         rpcrdma_buffer_put_sendbuf(req, buf);
1612         rpcrdma_buffer_put_mrs(req, buf);
1613         return NULL;
1614 }
1615
1616 /*
1617  * Get a set of request/reply buffers.
1618  *
1619  * Reply buffer (if needed) is attached to send buffer upon return.
1620  * Rule:
1621  *    rb_send_index and rb_recv_index MUST always be pointing to the
1622  *    *next* available buffer (non-NULL). They are incremented after
1623  *    removing buffers, and decremented *before* returning them.
1624  */
1625 struct rpcrdma_req *
1626 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1627 {
1628         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1629         struct list_head stale;
1630         struct rpcrdma_req *req;
1631         unsigned long flags;
1632
1633         spin_lock_irqsave(&buffers->rb_lock, flags);
1634         if (buffers->rb_send_index == buffers->rb_max_requests) {
1635                 spin_unlock_irqrestore(&buffers->rb_lock, flags);
1636                 dprintk("RPC:       %s: out of request buffers\n", __func__);
1637                 return NULL;
1638         }
1639
1640         req = buffers->rb_send_bufs[buffers->rb_send_index];
1641         if (buffers->rb_send_index < buffers->rb_recv_index) {
1642                 dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1643                         __func__,
1644                         buffers->rb_recv_index - buffers->rb_send_index);
1645                 req->rl_reply = NULL;
1646         } else {
1647                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1648                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1649         }
1650         buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1651
1652         INIT_LIST_HEAD(&stale);
1653         switch (ia->ri_memreg_strategy) {
1654         case RPCRDMA_FRMR:
1655                 req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
1656                 break;
1657         case RPCRDMA_MTHCAFMR:
1658                 req = rpcrdma_buffer_get_fmrs(req, buffers);
1659                 break;
1660         default:
1661                 break;
1662         }
1663         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1664         if (!list_empty(&stale))
1665                 rpcrdma_retry_flushed_linv(&stale, buffers);
1666         return req;
1667 }
1668
1669 /*
1670  * Put request/reply buffers back into pool.
1671  * Pre-decrement counter/array index.
1672  */
1673 void
1674 rpcrdma_buffer_put(struct rpcrdma_req *req)
1675 {
1676         struct rpcrdma_buffer *buffers = req->rl_buffer;
1677         struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1678         unsigned long flags;
1679
1680         spin_lock_irqsave(&buffers->rb_lock, flags);
1681         rpcrdma_buffer_put_sendbuf(req, buffers);
1682         switch (ia->ri_memreg_strategy) {
1683         case RPCRDMA_FRMR:
1684         case RPCRDMA_MTHCAFMR:
1685                 rpcrdma_buffer_put_mrs(req, buffers);
1686                 break;
1687         default:
1688                 break;
1689         }
1690         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1691 }
1692
1693 /*
1694  * Recover reply buffers from pool.
1695  * This happens when recovering from error conditions.
1696  * Post-increment counter/array index.
1697  */
1698 void
1699 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1700 {
1701         struct rpcrdma_buffer *buffers = req->rl_buffer;
1702         unsigned long flags;
1703
1704         spin_lock_irqsave(&buffers->rb_lock, flags);
1705         if (buffers->rb_recv_index < buffers->rb_max_requests) {
1706                 req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1707                 buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1708         }
1709         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1710 }
1711
1712 /*
1713  * Put reply buffers back into pool when not attached to
1714  * request. This happens in error conditions.
1715  */
1716 void
1717 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1718 {
1719         struct rpcrdma_buffer *buffers = rep->rr_buffer;
1720         unsigned long flags;
1721
1722         rep->rr_func = NULL;
1723         spin_lock_irqsave(&buffers->rb_lock, flags);
1724         buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1725         spin_unlock_irqrestore(&buffers->rb_lock, flags);
1726 }
1727
1728 /*
1729  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1730  */
1731
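/* DMA-map a kmalloc'd buffer and fill in @iov with an lkey for it.
 * The device's local DMA lkey or the transport's existing ri_bind_mem
 * MR is used when available; otherwise a one-entry physical MR is
 * registered with ib_reg_phys_mr().
 */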
1732 static int
1733 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1734                                 struct ib_mr **mrp, struct ib_sge *iov)
1735 {
1736         struct ib_phys_buf ipb;
1737         struct ib_mr *mr;
1738         int rc;
1739
1740         /*
1741          * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1742          */
1743         iov->addr = ib_dma_map_single(ia->ri_id->device,
1744                         va, len, DMA_BIDIRECTIONAL);
1745         if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1746                 return -ENOMEM;
1747
1748         iov->length = len;
1749
1750         if (ia->ri_have_dma_lkey) {
1751                 *mrp = NULL;
1752                 iov->lkey = ia->ri_dma_lkey;
1753                 return 0;
1754         } else if (ia->ri_bind_mem != NULL) {
1755                 *mrp = NULL;
1756                 iov->lkey = ia->ri_bind_mem->lkey;
1757                 return 0;
1758         }
1759
1760         ipb.addr = iov->addr;
1761         ipb.size = iov->length;
1762         mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1763                         IB_ACCESS_LOCAL_WRITE, &iov->addr);
1764
1765         dprintk("RPC:       %s: phys convert: 0x%llx "
1766                         "registered 0x%llx length %d\n",
1767                         __func__, (unsigned long long)ipb.addr,
1768                         (unsigned long long)iov->addr, len);
1769
1770         if (IS_ERR(mr)) {
1771                 *mrp = NULL;
1772                 rc = PTR_ERR(mr);
1773                 dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1774         } else {
1775                 *mrp = mr;
1776                 iov->lkey = mr->lkey;
1777                 rc = 0;
1778         }
1779
1780         return rc;
1781 }
1782
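/* Undo rpcrdma_register_internal(): DMA-unmap the buffer and release
 * the MR, if one was allocated.
 */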
1783 static int
1784 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1785                                 struct ib_mr *mr, struct ib_sge *iov)
1786 {
1787         int rc;
1788
1789         ib_dma_unmap_single(ia->ri_id->device,
1790                         iov->addr, iov->length, DMA_BIDIRECTIONAL);
1791
1792         if (mr == NULL)
1793                 return 0;
1794
1795         rc = ib_dereg_mr(mr);
1796         if (rc)
1797                 dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1798         return rc;
1799 }
1800
1801 /**
1802  * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1803  * @ia: controlling rpcrdma_ia
1804  * @size: size of buffer to be allocated, in bytes
1805  * @flags: GFP flags
1806  *
1807  * Returns pointer to private header of an area of internally
1808  * registered memory, or an ERR_PTR. The registered buffer follows
1809  * the end of the private header.
1810  *
1811  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1812  * receiving the payload of RDMA RECV operations. regbufs are not
1813  * used for RDMA READ/WRITE operations, thus are registered only for
1814  * LOCAL access.
1815  */
1816 struct rpcrdma_regbuf *
1817 rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1818 {
1819         struct rpcrdma_regbuf *rb;
1820         int rc;
1821
1822         rc = -ENOMEM;
1823         rb = kmalloc(sizeof(*rb) + size, flags);
1824         if (rb == NULL)
1825                 goto out;
1826
1827         rb->rg_size = size;
1828         rb->rg_owner = NULL;
1829         rc = rpcrdma_register_internal(ia, rb->rg_base, size,
1830                                        &rb->rg_mr, &rb->rg_iov);
1831         if (rc)
1832                 goto out_free;
1833
1834         return rb;
1835
1836 out_free:
1837         kfree(rb);
1838 out:
1839         return ERR_PTR(rc);
1840 }
1841
1842 /**
1843  * rpcrdma_free_regbuf - deregister and free registered buffer
1844  * @ia: controlling rpcrdma_ia
1845  * @rb: regbuf to be deregistered and freed
1846  */
1847 void
1848 rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1849 {
1850         if (rb) {
1851                 rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
1852                 kfree(rb);
1853         }
1854 }
1855
1856 /*
1857  * Wrappers for chunk registration, shared by read/write chunk code.
1858  */
1859
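/* DMA-map a single chunk segment, which may describe either a page
 * or a kmalloc'd buffer. A mapping failure is only reported via
 * dprintk; callers do not see an error.
 */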
1860 static void
1861 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1862 {
1863         seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1864         seg->mr_dmalen = seg->mr_len;
1865         if (seg->mr_page)
1866                 seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1867                                 seg->mr_page, offset_in_page(seg->mr_offset),
1868                                 seg->mr_dmalen, seg->mr_dir);
1869         else
1870                 seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1871                                 seg->mr_offset,
1872                                 seg->mr_dmalen, seg->mr_dir);
1873         if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1874                 dprintk("RPC:       %s: dma mapping failed: mr_dma 0x%llx mr_offset %p mr_dmalen %zu\n",
1875                         __func__,
1876                         (unsigned long long)seg->mr_dma,
1877                         seg->mr_offset, seg->mr_dmalen);
1878         }
1879 }
1880
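/* Reverse the mapping done by rpcrdma_map_one(). */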
1881 static void
1882 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1883 {
1884         if (seg->mr_page)
1885                 ib_dma_unmap_page(ia->ri_id->device,
1886                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1887         else
1888                 ib_dma_unmap_single(ia->ri_id->device,
1889                                 seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1890 }
1891
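/* Register a chunk with a FAST_REG_MR Work Request. Segments are
 * gathered into one page list until a page-alignment hole is found
 * or ri_max_frmr_depth segments have been gathered. On failure the
 * mapped segments are unwound and the FRMR is marked invalid.
 */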
1892 static int
1893 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1894                         int *nsegs, int writing, struct rpcrdma_ia *ia,
1895                         struct rpcrdma_xprt *r_xprt)
1896 {
1897         struct rpcrdma_mr_seg *seg1 = seg;
1898         struct rpcrdma_mw *mw = seg1->rl_mw;
1899         struct rpcrdma_frmr *frmr = &mw->r.frmr;
1900         struct ib_mr *mr = frmr->fr_mr;
1901         struct ib_send_wr fastreg_wr, *bad_wr;
1902         u8 key;
1903         int len, pageoff;
1904         int i, rc;
1905         int seg_len;
1906         u64 pa;
1907         int page_no;
1908
1909         pageoff = offset_in_page(seg1->mr_offset);
1910         seg1->mr_offset -= pageoff;     /* start of page */
1911         seg1->mr_len += pageoff;
1912         len = -pageoff;
1913         if (*nsegs > ia->ri_max_frmr_depth)
1914                 *nsegs = ia->ri_max_frmr_depth;
1915         for (page_no = i = 0; i < *nsegs;) {
1916                 rpcrdma_map_one(ia, seg, writing);
1917                 pa = seg->mr_dma;
1918                 for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1919                         frmr->fr_pgl->page_list[page_no++] = pa;
1920                         pa += PAGE_SIZE;
1921                 }
1922                 len += seg->mr_len;
1923                 ++seg;
1924                 ++i;
1925                 /* Check for holes */
1926                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1927                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1928                         break;
1929         }
1930         dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1931                 __func__, mw, i);
1932
1933         frmr->fr_state = FRMR_IS_VALID;
1934
1935         memset(&fastreg_wr, 0, sizeof(fastreg_wr));
1936         fastreg_wr.wr_id = (unsigned long)(void *)mw;
1937         fastreg_wr.opcode = IB_WR_FAST_REG_MR;
1938         fastreg_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1939         fastreg_wr.wr.fast_reg.page_list = frmr->fr_pgl;
1940         fastreg_wr.wr.fast_reg.page_list_len = page_no;
1941         fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1942         fastreg_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1943         if (fastreg_wr.wr.fast_reg.length < len) {
1944                 rc = -EIO;
1945                 goto out_err;
1946         }
1947
1948         /* Bump the key */
1949         key = (u8)(mr->rkey & 0x000000FF);
1950         ib_update_fast_reg_key(mr, ++key);
1951
1952         fastreg_wr.wr.fast_reg.access_flags = (writing ?
1953                                 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1954                                 IB_ACCESS_REMOTE_READ);
1955         fastreg_wr.wr.fast_reg.rkey = mr->rkey;
1956         DECR_CQCOUNT(&r_xprt->rx_ep);
1957
1958         rc = ib_post_send(ia->ri_id->qp, &fastreg_wr, &bad_wr);
1959         if (rc) {
1960                 dprintk("RPC:       %s: failed ib_post_send for register,"
1961                         " status %i\n", __func__, rc);
1962                 ib_update_fast_reg_key(mr, --key);
1963                 goto out_err;
1964         } else {
1965                 seg1->mr_rkey = mr->rkey;
1966                 seg1->mr_base = seg1->mr_dma + pageoff;
1967                 seg1->mr_nsegs = i;
1968                 seg1->mr_len = len;
1969         }
1970         *nsegs = i;
1971         return 0;
1972 out_err:
1973         frmr->fr_state = FRMR_IS_INVALID;
1974         while (i--)
1975                 rpcrdma_unmap_one(ia, --seg);
1976         return rc;
1977 }
1978
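/* Invalidate an FRMR with a LOCAL_INV Work Request and DMA-unmap its
 * segments. If the post fails, the FRMR is marked stale so that
 * rpcrdma_buffer_get() will recover it later.
 */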
1979 static int
1980 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1981                         struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1982 {
1983         struct rpcrdma_mr_seg *seg1 = seg;
1984         struct ib_send_wr invalidate_wr, *bad_wr;
1985         int rc;
1986
1987         seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
1988
1989         memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1990         invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw;
1991         invalidate_wr.opcode = IB_WR_LOCAL_INV;
1992         invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey;
1993         DECR_CQCOUNT(&r_xprt->rx_ep);
1994
1995         read_lock(&ia->ri_qplock);
1996         while (seg1->mr_nsegs--)
1997                 rpcrdma_unmap_one(ia, seg++);
1998         rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1999         read_unlock(&ia->ri_qplock);
2000         if (rc) {
2001                 /* Force rpcrdma_buffer_get() to retry */
2002                 seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
2003                 dprintk("RPC:       %s: failed ib_post_send for invalidate,"
2004                         " status %i\n", __func__, rc);
2005         }
2006         return rc;
2007 }
2008
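/* Register a chunk with ib_map_phys_fmr(). Segments are gathered
 * until a page-alignment hole is found or RPCRDMA_MAX_DATA_SEGS
 * segments have been collected.
 */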
2009 static int
2010 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
2011                         int *nsegs, int writing, struct rpcrdma_ia *ia)
2012 {
2013         struct rpcrdma_mr_seg *seg1 = seg;
2014         u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
2015         int len, pageoff, i, rc;
2016
2017         pageoff = offset_in_page(seg1->mr_offset);
2018         seg1->mr_offset -= pageoff;     /* start of page */
2019         seg1->mr_len += pageoff;
2020         len = -pageoff;
2021         if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
2022                 *nsegs = RPCRDMA_MAX_DATA_SEGS;
2023         for (i = 0; i < *nsegs;) {
2024                 rpcrdma_map_one(ia, seg, writing);
2025                 physaddrs[i] = seg->mr_dma;
2026                 len += seg->mr_len;
2027                 ++seg;
2028                 ++i;
2029                 /* Check for holes */
2030                 if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
2031                     offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
2032                         break;
2033         }
2034         rc = ib_map_phys_fmr(seg1->rl_mw->r.fmr, physaddrs, i, seg1->mr_dma);
2035         if (rc) {
2036                 dprintk("RPC:       %s: failed ib_map_phys_fmr "
2037                         "%u@0x%llx+%i (%d)... status %i\n", __func__,
2038                         len, (unsigned long long)seg1->mr_dma,
2039                         pageoff, i, rc);
2040                 while (i--)
2041                         rpcrdma_unmap_one(ia, --seg);
2042         } else {
2043                 seg1->mr_rkey = seg1->rl_mw->r.fmr->rkey;
2044                 seg1->mr_base = seg1->mr_dma + pageoff;
2045                 seg1->mr_nsegs = i;
2046                 seg1->mr_len = len;
2047         }
2048         *nsegs = i;
2049         return rc;
2050 }
2051
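/* Unmap an FMR with ib_unmap_fmr() and DMA-unmap the segments it
 * covered.
 */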
2052 static int
2053 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
2054                         struct rpcrdma_ia *ia)
2055 {
2056         struct rpcrdma_mr_seg *seg1 = seg;
2057         LIST_HEAD(l);
2058         int rc;
2059
2060         list_add(&seg1->rl_mw->r.fmr->list, &l);
2061         rc = ib_unmap_fmr(&l);
2062         read_lock(&ia->ri_qplock);
2063         while (seg1->mr_nsegs--)
2064                 rpcrdma_unmap_one(ia, seg++);
2065         read_unlock(&ia->ri_qplock);
2066         if (rc)
2067                 dprintk("RPC:       %s: failed ib_unmap_fmr,"
2068                         " status %i\n", __func__, rc);
2069         return rc;
2070 }
2071
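/* Register a chunk of memory for RDMA according to the transport's
 * memory registration strategy. Returns the number of segments
 * actually registered, or a negative errno.
 */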
2072 int
2073 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
2074                         int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
2075 {
2076         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
2077         int rc = 0;
2078
2079         switch (ia->ri_memreg_strategy) {
2080
2081         case RPCRDMA_ALLPHYSICAL:
2082                 rpcrdma_map_one(ia, seg, writing);
2083                 seg->mr_rkey = ia->ri_bind_mem->rkey;
2084                 seg->mr_base = seg->mr_dma;
2085                 seg->mr_nsegs = 1;
2086                 nsegs = 1;
2087                 break;
2088
2089         /* Registration using frmr registration */
2090         case RPCRDMA_FRMR:
2091                 rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
2092                 break;
2093
2094         /* Registration using fmr memory registration */
2095         case RPCRDMA_MTHCAFMR:
2096                 rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
2097                 break;
2098
2099         default:
2100                 return -EIO;
2101         }
2102         if (rc)
2103                 return rc;
2104
2105         return nsegs;
2106 }
2107
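/* Release a chunk registered by rpcrdma_register_external().
 * Returns the number of segments the registration covered.
 */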
2108 int
2109 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
2110                 struct rpcrdma_xprt *r_xprt)
2111 {
2112         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
2113         int nsegs = seg->mr_nsegs, rc;
2114
2115         switch (ia->ri_memreg_strategy) {
2116
2117         case RPCRDMA_ALLPHYSICAL:
2118                 read_lock(&ia->ri_qplock);
2119                 rpcrdma_unmap_one(ia, seg);
2120                 read_unlock(&ia->ri_qplock);
2121                 break;
2122
2123         case RPCRDMA_FRMR:
2124                 rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
2125                 break;
2126
2127         case RPCRDMA_MTHCAFMR:
2128                 rc = rpcrdma_deregister_fmr_external(seg, ia);
2129                 break;
2130
2131         default:
2132                 break;
2133         }
2134         return nsegs;
2135 }
2136
2137 /*
2138  * Prepost any receive buffer, then post send.
2139  *
2140  * Receive buffer is donated to hardware, reclaimed upon recv completion.
2141  */
2142 int
2143 rpcrdma_ep_post(struct rpcrdma_ia *ia,
2144                 struct rpcrdma_ep *ep,
2145                 struct rpcrdma_req *req)
2146 {
2147         struct ib_send_wr send_wr, *send_wr_fail;
2148         struct rpcrdma_rep *rep = req->rl_reply;
2149         int rc;
2150
2151         if (rep) {
2152                 rc = rpcrdma_ep_post_recv(ia, ep, rep);
2153                 if (rc)
2154                         goto out;
2155                 req->rl_reply = NULL;
2156         }
2157
2158         send_wr.next = NULL;
2159         send_wr.wr_id = 0ULL;   /* no send cookie */
2160         send_wr.sg_list = req->rl_send_iov;
2161         send_wr.num_sge = req->rl_niovs;
2162         send_wr.opcode = IB_WR_SEND;
2163         if (send_wr.num_sge == 4)       /* no need to sync any pad (constant) */
2164                 ib_dma_sync_single_for_device(ia->ri_id->device,
2165                         req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
2166                         DMA_TO_DEVICE);
2167         ib_dma_sync_single_for_device(ia->ri_id->device,
2168                 req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
2169                 DMA_TO_DEVICE);
2170         ib_dma_sync_single_for_device(ia->ri_id->device,
2171                 req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
2172                 DMA_TO_DEVICE);
2173
2174         if (DECR_CQCOUNT(ep) > 0)
2175                 send_wr.send_flags = 0;
2176         else { /* Provider must take a send completion every now and then */
2177                 INIT_CQCOUNT(ep);
2178                 send_wr.send_flags = IB_SEND_SIGNALED;
2179         }
2180
2181         rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
2182         if (rc)
2183                 dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
2184                         rc);
2185 out:
2186         return rc;
2187 }
2188
2189 /*
2190  * (Re)post a receive buffer.
2191  */
2192 int
2193 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
2194                      struct rpcrdma_ep *ep,
2195                      struct rpcrdma_rep *rep)
2196 {
2197         struct ib_recv_wr recv_wr, *recv_wr_fail;
2198         int rc;
2199
2200         recv_wr.next = NULL;
2201         recv_wr.wr_id = (u64) (unsigned long) rep;
2202         recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
2203         recv_wr.num_sge = 1;
2204
2205         ib_dma_sync_single_for_cpu(ia->ri_id->device,
2206                                    rdmab_addr(rep->rr_rdmabuf),
2207                                    rdmab_length(rep->rr_rdmabuf),
2208                                    DMA_BIDIRECTIONAL);
2209
2210         rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
2211
2212         if (rc)
2213                 dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
2214                         rc);
2215         return rc;
2216 }
2217
2218 /* Physical mapping means one Read/Write list entry per page.
2219  * All list entries must fit within an inline buffer.
2220  *
2221  * NB: The server must return a Write list for NFS READ,
2222  *     which has the same constraint. Factor in the inline
2223  *     rsize as well.
2224  */
2225 static size_t
2226 rpcrdma_physical_max_payload(struct rpcrdma_xprt *r_xprt)
2227 {
2228         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
2229         unsigned int inline_size, pages;
2230
2231         inline_size = min_t(unsigned int,
2232                             cdata->inline_wsize, cdata->inline_rsize);
2233         inline_size -= RPCRDMA_HDRLEN_MIN;
2234         pages = inline_size / sizeof(struct rpcrdma_segment);
2235         return pages << PAGE_SHIFT;
2236 }
2237
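/* With FRMR or FMR registration, the payload is limited to
 * RPCRDMA_MAX_DATA_SEGS pages.
 */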
2238 static size_t
2239 rpcrdma_mr_max_payload(struct rpcrdma_xprt *r_xprt)
2240 {
2241         return RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;
2242 }
2243
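/**
 * rpcrdma_max_payload - largest RPC payload this transport can move
 * @r_xprt: controlling transport instance
 *
 * The limit depends on the memory registration strategy in use.
 */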
2244 size_t
2245 rpcrdma_max_payload(struct rpcrdma_xprt *r_xprt)
2246 {
2247         size_t result;
2248
2249         switch (r_xprt->rx_ia.ri_memreg_strategy) {
2250         case RPCRDMA_ALLPHYSICAL:
2251                 result = rpcrdma_physical_max_payload(r_xprt);
2252                 break;
2253         default:
2254                 result = rpcrdma_mr_max_payload(r_xprt);
2255         }
2256         return result;
2257 }