Merge tag 'nfs-for-4.9-1' of git://git.linux-nfs.org/projects/anna/linux-nfs
[cascardo/linux.git] / net / sunrpc / xprtrdma / rpc_rdma.c
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * rpc_rdma.c
42  *
43  * This file contains the guts of the RPC RDMA protocol, and
44  * does marshaling/unmarshaling, etc. It is also where interfacing
45  * to the Linux RPC framework lives.
46  */
47
48 #include "xprt_rdma.h"
49
50 #include <linux/highmem.h>
51
52 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
53 # define RPCDBG_FACILITY        RPCDBG_TRANS
54 #endif
55
56 static const char transfertypes[][12] = {
57         "inline",       /* no chunks */
58         "read list",    /* some argument via rdma read */
59         "*read list",   /* entire request via rdma read */
60         "write list",   /* some result via rdma write */
61         "reply chunk"   /* entire reply via rdma write */
62 };
63
64 /* Returns size of largest RPC-over-RDMA header in a Call message
65  *
66  * The largest Call header contains a full-size Read list and a
67  * minimal Reply chunk.
68  */
69 static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
70 {
71         unsigned int size;
72
73         /* Fixed header fields and list discriminators */
74         size = RPCRDMA_HDRLEN_MIN;
75
76         /* Maximum Read list size */
77         maxsegs += 2;   /* segment for head and tail buffers */
78         size = maxsegs * sizeof(struct rpcrdma_read_chunk);
79
80         /* Minimal Read chunk size */
81         size += sizeof(__be32); /* segment count */
82         size += sizeof(struct rpcrdma_segment);
83         size += sizeof(__be32); /* list discriminator */
84
85         dprintk("RPC:       %s: max call header size = %u\n",
86                 __func__, size);
87         return size;
88 }
89
90 /* Returns size of largest RPC-over-RDMA header in a Reply message
91  *
92  * There is only one Write list or one Reply chunk per Reply
93  * message.  The larger list is the Write list.
94  */
95 static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
96 {
97         unsigned int size;
98
99         /* Fixed header fields and list discriminators */
100         size = RPCRDMA_HDRLEN_MIN;
101
102         /* Maximum Write list size */
103         maxsegs += 2;   /* segment for head and tail buffers */
104         size = sizeof(__be32);          /* segment count */
105         size += maxsegs * sizeof(struct rpcrdma_segment);
106         size += sizeof(__be32); /* list discriminator */
107
108         dprintk("RPC:       %s: max reply header size = %u\n",
109                 __func__, size);
110         return size;
111 }
112
113 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
114 {
115         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
116         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
117         unsigned int maxsegs = ia->ri_max_segs;
118
119         ia->ri_max_inline_write = cdata->inline_wsize -
120                                   rpcrdma_max_call_header_size(maxsegs);
121         ia->ri_max_inline_read = cdata->inline_rsize -
122                                  rpcrdma_max_reply_header_size(maxsegs);
123 }
124
125 /* The client can send a request inline as long as the RPCRDMA header
126  * plus the RPC call fit under the transport's inline limit. If the
127  * combined call message size exceeds that limit, the client must use
128  * the read chunk list for this operation.
129  */
130 static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
131                                 struct rpc_rqst *rqst)
132 {
133         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
134
135         return rqst->rq_snd_buf.len <= ia->ri_max_inline_write;
136 }
137
138 /* The client can't know how large the actual reply will be. Thus it
139  * plans for the largest possible reply for that particular ULP
140  * operation. If the maximum combined reply message size exceeds that
141  * limit, the client must provide a write list or a reply chunk for
142  * this request.
143  */
144 static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
145                                    struct rpc_rqst *rqst)
146 {
147         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
148
149         return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
150 }
151
152 /* Split "vec" on page boundaries into segments. FMR registers pages,
153  * not a byte range. Other modes coalesce these segments into a single
154  * MR when they can.
155  */
156 static int
157 rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)
158 {
159         size_t page_offset;
160         u32 remaining;
161         char *base;
162
163         base = vec->iov_base;
164         page_offset = offset_in_page(base);
165         remaining = vec->iov_len;
166         while (remaining && n < RPCRDMA_MAX_SEGS) {
167                 seg[n].mr_page = NULL;
168                 seg[n].mr_offset = base;
169                 seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
170                 remaining -= seg[n].mr_len;
171                 base += seg[n].mr_len;
172                 ++n;
173                 page_offset = 0;
174         }
175         return n;
176 }
177
178 /*
179  * Chunk assembly from upper layer xdr_buf.
180  *
181  * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
182  * elements. Segments are then coalesced when registered, if possible
183  * within the selected memreg mode.
184  *
185  * Returns positive number of segments converted, or a negative errno.
186  */
187
188 static int
189 rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
190         enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg,
191         bool reminv_expected)
192 {
193         int len, n, p, page_base;
194         struct page **ppages;
195
196         n = 0;
197         if (pos == 0) {
198                 n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n);
199                 if (n == RPCRDMA_MAX_SEGS)
200                         goto out_overflow;
201         }
202
203         len = xdrbuf->page_len;
204         ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
205         page_base = xdrbuf->page_base & ~PAGE_MASK;
206         p = 0;
207         while (len && n < RPCRDMA_MAX_SEGS) {
208                 if (!ppages[p]) {
209                         /* alloc the pagelist for receiving buffer */
210                         ppages[p] = alloc_page(GFP_ATOMIC);
211                         if (!ppages[p])
212                                 return -EAGAIN;
213                 }
214                 seg[n].mr_page = ppages[p];
215                 seg[n].mr_offset = (void *)(unsigned long) page_base;
216                 seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
217                 if (seg[n].mr_len > PAGE_SIZE)
218                         goto out_overflow;
219                 len -= seg[n].mr_len;
220                 ++n;
221                 ++p;
222                 page_base = 0;  /* page offset only applies to first page */
223         }
224
225         /* Message overflows the seg array */
226         if (len && n == RPCRDMA_MAX_SEGS)
227                 goto out_overflow;
228
229         /* When encoding the read list, the tail is always sent inline */
230         if (type == rpcrdma_readch)
231                 return n;
232
233         /* When encoding the Write list, some servers need to see an extra
234          * segment for odd-length Write chunks. The upper layer provides
235          * space in the tail iovec for this purpose.
236          */
237         if (type == rpcrdma_writech && reminv_expected)
238                 return n;
239
240         if (xdrbuf->tail[0].iov_len) {
241                 /* the rpcrdma protocol allows us to omit any trailing
242                  * xdr pad bytes, saving the server an RDMA operation. */
243                 if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
244                         return n;
245                 n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n);
246                 if (n == RPCRDMA_MAX_SEGS)
247                         goto out_overflow;
248         }
249
250         return n;
251
252 out_overflow:
253         pr_err("rpcrdma: segment array overflow\n");
254         return -EIO;
255 }
256
257 static inline __be32 *
258 xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
259 {
260         *iptr++ = cpu_to_be32(mw->mw_handle);
261         *iptr++ = cpu_to_be32(mw->mw_length);
262         return xdr_encode_hyper(iptr, mw->mw_offset);
263 }
264
265 /* XDR-encode the Read list. Supports encoding a list of read
266  * segments that belong to a single read chunk.
267  *
268  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
269  *
270  *  Read chunklist (a linked list):
271  *   N elements, position P (same P for all chunks of same arg!):
272  *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
273  *
274  * Returns a pointer to the XDR word in the RDMA header following
275  * the end of the Read list, or an error pointer.
276  */
277 static __be32 *
278 rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
279                          struct rpcrdma_req *req, struct rpc_rqst *rqst,
280                          __be32 *iptr, enum rpcrdma_chunktype rtype)
281 {
282         struct rpcrdma_mr_seg *seg;
283         struct rpcrdma_mw *mw;
284         unsigned int pos;
285         int n, nsegs;
286
287         if (rtype == rpcrdma_noch) {
288                 *iptr++ = xdr_zero;     /* item not present */
289                 return iptr;
290         }
291
292         pos = rqst->rq_snd_buf.head[0].iov_len;
293         if (rtype == rpcrdma_areadch)
294                 pos = 0;
295         seg = req->rl_segments;
296         nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg, false);
297         if (nsegs < 0)
298                 return ERR_PTR(nsegs);
299
300         do {
301                 n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
302                                                  false, &mw);
303                 if (n < 0)
304                         return ERR_PTR(n);
305                 list_add(&mw->mw_list, &req->rl_registered);
306
307                 *iptr++ = xdr_one;      /* item present */
308
309                 /* All read segments in this chunk
310                  * have the same "position".
311                  */
312                 *iptr++ = cpu_to_be32(pos);
313                 iptr = xdr_encode_rdma_segment(iptr, mw);
314
315                 dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n",
316                         rqst->rq_task->tk_pid, __func__, pos,
317                         mw->mw_length, (unsigned long long)mw->mw_offset,
318                         mw->mw_handle, n < nsegs ? "more" : "last");
319
320                 r_xprt->rx_stats.read_chunk_count++;
321                 seg += n;
322                 nsegs -= n;
323         } while (nsegs);
324
325         /* Finish Read list */
326         *iptr++ = xdr_zero;     /* Next item not present */
327         return iptr;
328 }
329
330 /* XDR-encode the Write list. Supports encoding a list containing
331  * one array of plain segments that belong to a single write chunk.
332  *
333  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
334  *
335  *  Write chunklist (a list of (one) counted array):
336  *   N elements:
337  *    1 - N - HLOO - HLOO - ... - HLOO - 0
338  *
339  * Returns a pointer to the XDR word in the RDMA header following
340  * the end of the Write list, or an error pointer.
341  */
342 static __be32 *
343 rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
344                           struct rpc_rqst *rqst, __be32 *iptr,
345                           enum rpcrdma_chunktype wtype)
346 {
347         struct rpcrdma_mr_seg *seg;
348         struct rpcrdma_mw *mw;
349         int n, nsegs, nchunks;
350         __be32 *segcount;
351
352         if (wtype != rpcrdma_writech) {
353                 *iptr++ = xdr_zero;     /* no Write list present */
354                 return iptr;
355         }
356
357         seg = req->rl_segments;
358         nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
359                                      rqst->rq_rcv_buf.head[0].iov_len,
360                                      wtype, seg,
361                                      r_xprt->rx_ia.ri_reminv_expected);
362         if (nsegs < 0)
363                 return ERR_PTR(nsegs);
364
365         *iptr++ = xdr_one;      /* Write list present */
366         segcount = iptr++;      /* save location of segment count */
367
368         nchunks = 0;
369         do {
370                 n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
371                                                  true, &mw);
372                 if (n < 0)
373                         return ERR_PTR(n);
374                 list_add(&mw->mw_list, &req->rl_registered);
375
376                 iptr = xdr_encode_rdma_segment(iptr, mw);
377
378                 dprintk("RPC: %5u %s: %u@0x016%llx:0x%08x (%s)\n",
379                         rqst->rq_task->tk_pid, __func__,
380                         mw->mw_length, (unsigned long long)mw->mw_offset,
381                         mw->mw_handle, n < nsegs ? "more" : "last");
382
383                 r_xprt->rx_stats.write_chunk_count++;
384                 r_xprt->rx_stats.total_rdma_request += seg->mr_len;
385                 nchunks++;
386                 seg   += n;
387                 nsegs -= n;
388         } while (nsegs);
389
390         /* Update count of segments in this Write chunk */
391         *segcount = cpu_to_be32(nchunks);
392
393         /* Finish Write list */
394         *iptr++ = xdr_zero;     /* Next item not present */
395         return iptr;
396 }
397
398 /* XDR-encode the Reply chunk. Supports encoding an array of plain
399  * segments that belong to a single write (reply) chunk.
400  *
401  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
402  *
403  *  Reply chunk (a counted array):
404  *   N elements:
405  *    1 - N - HLOO - HLOO - ... - HLOO
406  *
407  * Returns a pointer to the XDR word in the RDMA header following
408  * the end of the Reply chunk, or an error pointer.
409  */
410 static __be32 *
411 rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
412                            struct rpcrdma_req *req, struct rpc_rqst *rqst,
413                            __be32 *iptr, enum rpcrdma_chunktype wtype)
414 {
415         struct rpcrdma_mr_seg *seg;
416         struct rpcrdma_mw *mw;
417         int n, nsegs, nchunks;
418         __be32 *segcount;
419
420         if (wtype != rpcrdma_replych) {
421                 *iptr++ = xdr_zero;     /* no Reply chunk present */
422                 return iptr;
423         }
424
425         seg = req->rl_segments;
426         nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
427                                      r_xprt->rx_ia.ri_reminv_expected);
428         if (nsegs < 0)
429                 return ERR_PTR(nsegs);
430
431         *iptr++ = xdr_one;      /* Reply chunk present */
432         segcount = iptr++;      /* save location of segment count */
433
434         nchunks = 0;
435         do {
436                 n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
437                                                  true, &mw);
438                 if (n < 0)
439                         return ERR_PTR(n);
440                 list_add(&mw->mw_list, &req->rl_registered);
441
442                 iptr = xdr_encode_rdma_segment(iptr, mw);
443
444                 dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
445                         rqst->rq_task->tk_pid, __func__,
446                         mw->mw_length, (unsigned long long)mw->mw_offset,
447                         mw->mw_handle, n < nsegs ? "more" : "last");
448
449                 r_xprt->rx_stats.reply_chunk_count++;
450                 r_xprt->rx_stats.total_rdma_request += seg->mr_len;
451                 nchunks++;
452                 seg   += n;
453                 nsegs -= n;
454         } while (nsegs);
455
456         /* Update count of segments in the Reply chunk */
457         *segcount = cpu_to_be32(nchunks);
458
459         return iptr;
460 }
461
462 /* Prepare the RPC-over-RDMA header SGE.
463  */
464 static bool
465 rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
466                         u32 len)
467 {
468         struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
469         struct ib_sge *sge = &req->rl_send_sge[0];
470
471         if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) {
472                 if (!__rpcrdma_dma_map_regbuf(ia, rb))
473                         return false;
474                 sge->addr = rdmab_addr(rb);
475                 sge->lkey = rdmab_lkey(rb);
476         }
477         sge->length = len;
478
479         ib_dma_sync_single_for_device(ia->ri_device, sge->addr,
480                                       sge->length, DMA_TO_DEVICE);
481         req->rl_send_wr.num_sge++;
482         return true;
483 }
484
485 /* Prepare the Send SGEs. The head and tail iovec, and each entry
486  * in the page list, gets its own SGE.
487  */
488 static bool
489 rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
490                          struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
491 {
492         unsigned int sge_no, page_base, len, remaining;
493         struct rpcrdma_regbuf *rb = req->rl_sendbuf;
494         struct ib_device *device = ia->ri_device;
495         struct ib_sge *sge = req->rl_send_sge;
496         u32 lkey = ia->ri_pd->local_dma_lkey;
497         struct page *page, **ppages;
498
499         /* The head iovec is straightforward, as it is already
500          * DMA-mapped. Sync the content that has changed.
501          */
502         if (!rpcrdma_dma_map_regbuf(ia, rb))
503                 return false;
504         sge_no = 1;
505         sge[sge_no].addr = rdmab_addr(rb);
506         sge[sge_no].length = xdr->head[0].iov_len;
507         sge[sge_no].lkey = rdmab_lkey(rb);
508         ib_dma_sync_single_for_device(device, sge[sge_no].addr,
509                                       sge[sge_no].length, DMA_TO_DEVICE);
510
511         /* If there is a Read chunk, the page list is being handled
512          * via explicit RDMA, and thus is skipped here. However, the
513          * tail iovec may include an XDR pad for the page list, as
514          * well as additional content, and may not reside in the
515          * same page as the head iovec.
516          */
517         if (rtype == rpcrdma_readch) {
518                 len = xdr->tail[0].iov_len;
519
520                 /* Do not include the tail if it is only an XDR pad */
521                 if (len < 4)
522                         goto out;
523
524                 page = virt_to_page(xdr->tail[0].iov_base);
525                 page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
526
527                 /* If the content in the page list is an odd length,
528                  * xdr_write_pages() has added a pad at the beginning
529                  * of the tail iovec. Force the tail's non-pad content
530                  * to land at the next XDR position in the Send message.
531                  */
532                 page_base += len & 3;
533                 len -= len & 3;
534                 goto map_tail;
535         }
536
537         /* If there is a page list present, temporarily DMA map
538          * and prepare an SGE for each page to be sent.
539          */
540         if (xdr->page_len) {
541                 ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
542                 page_base = xdr->page_base & ~PAGE_MASK;
543                 remaining = xdr->page_len;
544                 while (remaining) {
545                         sge_no++;
546                         if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
547                                 goto out_mapping_overflow;
548
549                         len = min_t(u32, PAGE_SIZE - page_base, remaining);
550                         sge[sge_no].addr = ib_dma_map_page(device, *ppages,
551                                                            page_base, len,
552                                                            DMA_TO_DEVICE);
553                         if (ib_dma_mapping_error(device, sge[sge_no].addr))
554                                 goto out_mapping_err;
555                         sge[sge_no].length = len;
556                         sge[sge_no].lkey = lkey;
557
558                         req->rl_mapped_sges++;
559                         ppages++;
560                         remaining -= len;
561                         page_base = 0;
562                 }
563         }
564
565         /* The tail iovec is not always constructed in the same
566          * page where the head iovec resides (see, for example,
567          * gss_wrap_req_priv). To neatly accommodate that case,
568          * DMA map it separately.
569          */
570         if (xdr->tail[0].iov_len) {
571                 page = virt_to_page(xdr->tail[0].iov_base);
572                 page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
573                 len = xdr->tail[0].iov_len;
574
575 map_tail:
576                 sge_no++;
577                 sge[sge_no].addr = ib_dma_map_page(device, page,
578                                                    page_base, len,
579                                                    DMA_TO_DEVICE);
580                 if (ib_dma_mapping_error(device, sge[sge_no].addr))
581                         goto out_mapping_err;
582                 sge[sge_no].length = len;
583                 sge[sge_no].lkey = lkey;
584                 req->rl_mapped_sges++;
585         }
586
587 out:
588         req->rl_send_wr.num_sge = sge_no + 1;
589         return true;
590
591 out_mapping_overflow:
592         pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
593         return false;
594
595 out_mapping_err:
596         pr_err("rpcrdma: Send mapping error\n");
597         return false;
598 }
599
600 bool
601 rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
602                           u32 hdrlen, struct xdr_buf *xdr,
603                           enum rpcrdma_chunktype rtype)
604 {
605         req->rl_send_wr.num_sge = 0;
606         req->rl_mapped_sges = 0;
607
608         if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen))
609                 goto out_map;
610
611         if (rtype != rpcrdma_areadch)
612                 if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype))
613                         goto out_map;
614
615         return true;
616
617 out_map:
618         pr_err("rpcrdma: failed to DMA map a Send buffer\n");
619         return false;
620 }
621
622 void
623 rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
624 {
625         struct ib_device *device = ia->ri_device;
626         struct ib_sge *sge;
627         int count;
628
629         sge = &req->rl_send_sge[2];
630         for (count = req->rl_mapped_sges; count--; sge++)
631                 ib_dma_unmap_page(device, sge->addr, sge->length,
632                                   DMA_TO_DEVICE);
633         req->rl_mapped_sges = 0;
634 }
635
636 /*
637  * Marshal a request: the primary job of this routine is to choose
638  * the transfer modes. See comments below.
639  *
640  * Returns zero on success, otherwise a negative errno.
641  */
642
643 int
644 rpcrdma_marshal_req(struct rpc_rqst *rqst)
645 {
646         struct rpc_xprt *xprt = rqst->rq_xprt;
647         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
648         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
649         enum rpcrdma_chunktype rtype, wtype;
650         struct rpcrdma_msg *headerp;
651         bool ddp_allowed;
652         ssize_t hdrlen;
653         size_t rpclen;
654         __be32 *iptr;
655
656 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
657         if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
658                 return rpcrdma_bc_marshal_reply(rqst);
659 #endif
660
661         headerp = rdmab_to_msg(req->rl_rdmabuf);
662         /* don't byte-swap XID, it's already done in request */
663         headerp->rm_xid = rqst->rq_xid;
664         headerp->rm_vers = rpcrdma_version;
665         headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
666         headerp->rm_type = rdma_msg;
667
668         /* When the ULP employs a GSS flavor that guarantees integrity
669          * or privacy, direct data placement of individual data items
670          * is not allowed.
671          */
672         ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
673                                                 RPCAUTH_AUTH_DATATOUCH);
674
675         /*
676          * Chunks needed for results?
677          *
678          * o If the expected result is under the inline threshold, all ops
679          *   return as inline.
680          * o Large read ops return data as write chunk(s), header as
681          *   inline.
682          * o Large non-read ops return as a single reply chunk.
683          */
684         if (rpcrdma_results_inline(r_xprt, rqst))
685                 wtype = rpcrdma_noch;
686         else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
687                 wtype = rpcrdma_writech;
688         else
689                 wtype = rpcrdma_replych;
690
691         /*
692          * Chunks needed for arguments?
693          *
694          * o If the total request is under the inline threshold, all ops
695          *   are sent as inline.
696          * o Large write ops transmit data as read chunk(s), header as
697          *   inline.
698          * o Large non-write ops are sent with the entire message as a
699          *   single read chunk (protocol 0-position special case).
700          *
701          * This assumes that the upper layer does not present a request
702          * that both has a data payload, and whose non-data arguments
703          * by themselves are larger than the inline threshold.
704          */
705         if (rpcrdma_args_inline(r_xprt, rqst)) {
706                 rtype = rpcrdma_noch;
707                 rpclen = rqst->rq_snd_buf.len;
708         } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
709                 rtype = rpcrdma_readch;
710                 rpclen = rqst->rq_snd_buf.head[0].iov_len +
711                          rqst->rq_snd_buf.tail[0].iov_len;
712         } else {
713                 r_xprt->rx_stats.nomsg_call_count++;
714                 headerp->rm_type = htonl(RDMA_NOMSG);
715                 rtype = rpcrdma_areadch;
716                 rpclen = 0;
717         }
718
719         /* This implementation supports the following combinations
720          * of chunk lists in one RPC-over-RDMA Call message:
721          *
722          *   - Read list
723          *   - Write list
724          *   - Reply chunk
725          *   - Read list + Reply chunk
726          *
727          * It might not yet support the following combinations:
728          *
729          *   - Read list + Write list
730          *
731          * It does not support the following combinations:
732          *
733          *   - Write list + Reply chunk
734          *   - Read list + Write list + Reply chunk
735          *
736          * This implementation supports only a single chunk in each
737          * Read or Write list. Thus for example the client cannot
738          * send a Call message with a Position Zero Read chunk and a
739          * regular Read chunk at the same time.
740          */
741         iptr = headerp->rm_body.rm_chunks;
742         iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
743         if (IS_ERR(iptr))
744                 goto out_unmap;
745         iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
746         if (IS_ERR(iptr))
747                 goto out_unmap;
748         iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
749         if (IS_ERR(iptr))
750                 goto out_unmap;
751         hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
752
753         dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
754                 rqst->rq_task->tk_pid, __func__,
755                 transfertypes[rtype], transfertypes[wtype],
756                 hdrlen, rpclen);
757
758         if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, hdrlen,
759                                        &rqst->rq_snd_buf, rtype)) {
760                 iptr = ERR_PTR(-EIO);
761                 goto out_unmap;
762         }
763         return 0;
764
765 out_unmap:
766         r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
767         return PTR_ERR(iptr);
768 }
769
770 /*
771  * Chase down a received write or reply chunklist to get length
772  * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
773  */
774 static int
775 rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp)
776 {
777         unsigned int i, total_len;
778         struct rpcrdma_write_chunk *cur_wchunk;
779         char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);
780
781         i = be32_to_cpu(**iptrp);
782         cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
783         total_len = 0;
784         while (i--) {
785                 struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
786                 ifdebug(FACILITY) {
787                         u64 off;
788                         xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
789                         dprintk("RPC:       %s: chunk %d@0x%llx:0x%x\n",
790                                 __func__,
791                                 be32_to_cpu(seg->rs_length),
792                                 (unsigned long long)off,
793                                 be32_to_cpu(seg->rs_handle));
794                 }
795                 total_len += be32_to_cpu(seg->rs_length);
796                 ++cur_wchunk;
797         }
798         /* check and adjust for properly terminated write chunk */
799         if (wrchunk) {
800                 __be32 *w = (__be32 *) cur_wchunk;
801                 if (*w++ != xdr_zero)
802                         return -1;
803                 cur_wchunk = (struct rpcrdma_write_chunk *) w;
804         }
805         if ((char *)cur_wchunk > base + rep->rr_len)
806                 return -1;
807
808         *iptrp = (__be32 *) cur_wchunk;
809         return total_len;
810 }
811
812 /**
813  * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
814  * @rqst: controlling RPC request
815  * @srcp: points to RPC message payload in receive buffer
816  * @copy_len: remaining length of receive buffer content
817  * @pad: Write chunk pad bytes needed (zero for pure inline)
818  *
819  * The upper layer has set the maximum number of bytes it can
820  * receive in each component of rq_rcv_buf. These values are set in
821  * the head.iov_len, page_len, tail.iov_len, and buflen fields.
822  *
823  * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
824  * many cases this function simply updates iov_base pointers in
825  * rq_rcv_buf to point directly to the received reply data, to
826  * avoid copying reply data.
827  *
828  * Returns the count of bytes which had to be memcopied.
829  */
830 static unsigned long
831 rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
832 {
833         unsigned long fixup_copy_count;
834         int i, npages, curlen;
835         char *destp;
836         struct page **ppages;
837         int page_base;
838
839         /* The head iovec is redirected to the RPC reply message
840          * in the receive buffer, to avoid a memcopy.
841          */
842         rqst->rq_rcv_buf.head[0].iov_base = srcp;
843         rqst->rq_private_buf.head[0].iov_base = srcp;
844
845         /* The contents of the receive buffer that follow
846          * head.iov_len bytes are copied into the page list.
847          */
848         curlen = rqst->rq_rcv_buf.head[0].iov_len;
849         if (curlen > copy_len)
850                 curlen = copy_len;
851         dprintk("RPC:       %s: srcp 0x%p len %d hdrlen %d\n",
852                 __func__, srcp, copy_len, curlen);
853         srcp += curlen;
854         copy_len -= curlen;
855
856         page_base = rqst->rq_rcv_buf.page_base;
857         ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
858         page_base &= ~PAGE_MASK;
859         fixup_copy_count = 0;
860         if (copy_len && rqst->rq_rcv_buf.page_len) {
861                 int pagelist_len;
862
863                 pagelist_len = rqst->rq_rcv_buf.page_len;
864                 if (pagelist_len > copy_len)
865                         pagelist_len = copy_len;
866                 npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
867                 for (i = 0; i < npages; i++) {
868                         curlen = PAGE_SIZE - page_base;
869                         if (curlen > pagelist_len)
870                                 curlen = pagelist_len;
871
872                         dprintk("RPC:       %s: page %d"
873                                 " srcp 0x%p len %d curlen %d\n",
874                                 __func__, i, srcp, copy_len, curlen);
875                         destp = kmap_atomic(ppages[i]);
876                         memcpy(destp + page_base, srcp, curlen);
877                         flush_dcache_page(ppages[i]);
878                         kunmap_atomic(destp);
879                         srcp += curlen;
880                         copy_len -= curlen;
881                         fixup_copy_count += curlen;
882                         pagelist_len -= curlen;
883                         if (!pagelist_len)
884                                 break;
885                         page_base = 0;
886                 }
887
888                 /* Implicit padding for the last segment in a Write
889                  * chunk is inserted inline at the front of the tail
890                  * iovec. The upper layer ignores the content of
891                  * the pad. Simply ensure inline content in the tail
892                  * that follows the Write chunk is properly aligned.
893                  */
894                 if (pad)
895                         srcp -= pad;
896         }
897
898         /* The tail iovec is redirected to the remaining data
899          * in the receive buffer, to avoid a memcopy.
900          */
901         if (copy_len || pad) {
902                 rqst->rq_rcv_buf.tail[0].iov_base = srcp;
903                 rqst->rq_private_buf.tail[0].iov_base = srcp;
904         }
905
906         return fixup_copy_count;
907 }
908
909 void
910 rpcrdma_connect_worker(struct work_struct *work)
911 {
912         struct rpcrdma_ep *ep =
913                 container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
914         struct rpcrdma_xprt *r_xprt =
915                 container_of(ep, struct rpcrdma_xprt, rx_ep);
916         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
917
918         spin_lock_bh(&xprt->transport_lock);
919         if (++xprt->connect_cookie == 0)        /* maintain a reserved value */
920                 ++xprt->connect_cookie;
921         if (ep->rep_connected > 0) {
922                 if (!xprt_test_and_set_connected(xprt))
923                         xprt_wake_pending_tasks(xprt, 0);
924         } else {
925                 if (xprt_test_and_clear_connected(xprt))
926                         xprt_wake_pending_tasks(xprt, -ENOTCONN);
927         }
928         spin_unlock_bh(&xprt->transport_lock);
929 }
930
931 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
932 /* By convention, backchannel calls arrive via rdma_msg type
933  * messages, and never populate the chunk lists. This makes
934  * the RPC/RDMA header small and fixed in size, so it is
935  * straightforward to check the RPC header's direction field.
936  */
937 static bool
938 rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
939 {
940         __be32 *p = (__be32 *)headerp;
941
942         if (headerp->rm_type != rdma_msg)
943                 return false;
944         if (headerp->rm_body.rm_chunks[0] != xdr_zero)
945                 return false;
946         if (headerp->rm_body.rm_chunks[1] != xdr_zero)
947                 return false;
948         if (headerp->rm_body.rm_chunks[2] != xdr_zero)
949                 return false;
950
951         /* sanity */
952         if (p[7] != headerp->rm_xid)
953                 return false;
954         /* call direction */
955         if (p[8] != cpu_to_be32(RPC_CALL))
956                 return false;
957
958         return true;
959 }
960 #endif  /* CONFIG_SUNRPC_BACKCHANNEL */
961
962 /*
963  * This function is called when an async event is posted to
964  * the connection which changes the connection state. All it
965  * does at this point is mark the connection up/down, the rpc
966  * timers do the rest.
967  */
968 void
969 rpcrdma_conn_func(struct rpcrdma_ep *ep)
970 {
971         schedule_delayed_work(&ep->rep_connect_worker, 0);
972 }
973
974 /* Process received RPC/RDMA messages.
975  *
976  * Errors must result in the RPC task either being awakened, or
977  * allowed to timeout, to discover the errors at that time.
978  */
979 void
980 rpcrdma_reply_handler(struct work_struct *work)
981 {
982         struct rpcrdma_rep *rep =
983                         container_of(work, struct rpcrdma_rep, rr_work);
984         struct rpcrdma_msg *headerp;
985         struct rpcrdma_req *req;
986         struct rpc_rqst *rqst;
987         struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
988         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
989         __be32 *iptr;
990         int rdmalen, status, rmerr;
991         unsigned long cwnd;
992
993         dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
994
995         if (rep->rr_len == RPCRDMA_BAD_LEN)
996                 goto out_badstatus;
997         if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
998                 goto out_shortreply;
999
1000         headerp = rdmab_to_msg(rep->rr_rdmabuf);
1001 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1002         if (rpcrdma_is_bcall(headerp))
1003                 goto out_bcall;
1004 #endif
1005
1006         /* Match incoming rpcrdma_rep to an rpcrdma_req to
1007          * get context for handling any incoming chunks.
1008          */
1009         spin_lock_bh(&xprt->transport_lock);
1010         rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
1011         if (!rqst)
1012                 goto out_nomatch;
1013
1014         req = rpcr_to_rdmar(rqst);
1015         if (req->rl_reply)
1016                 goto out_duplicate;
1017
1018         /* Sanity checking has passed. We are now committed
1019          * to complete this transaction.
1020          */
1021         list_del_init(&rqst->rq_list);
1022         spin_unlock_bh(&xprt->transport_lock);
1023         dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
1024                 __func__, rep, req, be32_to_cpu(headerp->rm_xid));
1025
1026         /* from here on, the reply is no longer an orphan */
1027         req->rl_reply = rep;
1028         xprt->reestablish_timeout = 0;
1029
1030         if (headerp->rm_vers != rpcrdma_version)
1031                 goto out_badversion;
1032
1033         /* check for expected message types */
1034         /* The order of some of these tests is important. */
1035         switch (headerp->rm_type) {
1036         case rdma_msg:
1037                 /* never expect read chunks */
1038                 /* never expect reply chunks (two ways to check) */
1039                 /* never expect write chunks without having offered RDMA */
1040                 if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
1041                     (headerp->rm_body.rm_chunks[1] == xdr_zero &&
1042                      headerp->rm_body.rm_chunks[2] != xdr_zero) ||
1043                     (headerp->rm_body.rm_chunks[1] != xdr_zero &&
1044                      list_empty(&req->rl_registered)))
1045                         goto badheader;
1046                 if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
1047                         /* count any expected write chunks in read reply */
1048                         /* start at write chunk array count */
1049                         iptr = &headerp->rm_body.rm_chunks[2];
1050                         rdmalen = rpcrdma_count_chunks(rep, 1, &iptr);
1051                         /* check for validity, and no reply chunk after */
1052                         if (rdmalen < 0 || *iptr++ != xdr_zero)
1053                                 goto badheader;
1054                         rep->rr_len -=
1055                             ((unsigned char *)iptr - (unsigned char *)headerp);
1056                         status = rep->rr_len + rdmalen;
1057                         r_xprt->rx_stats.total_rdma_reply += rdmalen;
1058                         /* special case - last chunk may omit padding */
1059                         if (rdmalen &= 3) {
1060                                 rdmalen = 4 - rdmalen;
1061                                 status += rdmalen;
1062                         }
1063                 } else {
1064                         /* else ordinary inline */
1065                         rdmalen = 0;
1066                         iptr = (__be32 *)((unsigned char *)headerp +
1067                                                         RPCRDMA_HDRLEN_MIN);
1068                         rep->rr_len -= RPCRDMA_HDRLEN_MIN;
1069                         status = rep->rr_len;
1070                 }
1071
1072                 r_xprt->rx_stats.fixup_copy_count +=
1073                         rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len,
1074                                              rdmalen);
1075                 break;
1076
1077         case rdma_nomsg:
1078                 /* never expect read or write chunks, always reply chunks */
1079                 if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
1080                     headerp->rm_body.rm_chunks[1] != xdr_zero ||
1081                     headerp->rm_body.rm_chunks[2] != xdr_one ||
1082                     list_empty(&req->rl_registered))
1083                         goto badheader;
1084                 iptr = (__be32 *)((unsigned char *)headerp +
1085                                                         RPCRDMA_HDRLEN_MIN);
1086                 rdmalen = rpcrdma_count_chunks(rep, 0, &iptr);
1087                 if (rdmalen < 0)
1088                         goto badheader;
1089                 r_xprt->rx_stats.total_rdma_reply += rdmalen;
1090                 /* Reply chunk buffer already is the reply vector - no fixup. */
1091                 status = rdmalen;
1092                 break;
1093
1094         case rdma_error:
1095                 goto out_rdmaerr;
1096
1097 badheader:
1098         default:
1099                 dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
1100                         rqst->rq_task->tk_pid, __func__,
1101                         be32_to_cpu(headerp->rm_type));
1102                 status = -EIO;
1103                 r_xprt->rx_stats.bad_reply_count++;
1104                 break;
1105         }
1106
1107 out:
1108         /* Invalidate and flush the data payloads before waking the
1109          * waiting application. This guarantees the memory region is
1110          * properly fenced from the server before the application
1111          * accesses the data. It also ensures proper send flow
1112          * control: waking the next RPC waits until this RPC has
1113          * relinquished all its Send Queue entries.
1114          */
1115         if (!list_empty(&req->rl_registered))
1116                 r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
1117
1118         spin_lock_bh(&xprt->transport_lock);
1119         cwnd = xprt->cwnd;
1120         xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
1121         if (xprt->cwnd > cwnd)
1122                 xprt_release_rqst_cong(rqst->rq_task);
1123
1124         xprt_complete_rqst(rqst->rq_task, status);
1125         spin_unlock_bh(&xprt->transport_lock);
1126         dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
1127                         __func__, xprt, rqst, status);
1128         return;
1129
1130 out_badstatus:
1131         rpcrdma_recv_buffer_put(rep);
1132         if (r_xprt->rx_ep.rep_connected == 1) {
1133                 r_xprt->rx_ep.rep_connected = -EIO;
1134                 rpcrdma_conn_func(&r_xprt->rx_ep);
1135         }
1136         return;
1137
1138 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1139 out_bcall:
1140         rpcrdma_bc_receive_call(r_xprt, rep);
1141         return;
1142 #endif
1143
1144 /* If the incoming reply terminated a pending RPC, the next
1145  * RPC call will post a replacement receive buffer as it is
1146  * being marshaled.
1147  */
1148 out_badversion:
1149         dprintk("RPC:       %s: invalid version %d\n",
1150                 __func__, be32_to_cpu(headerp->rm_vers));
1151         status = -EIO;
1152         r_xprt->rx_stats.bad_reply_count++;
1153         goto out;
1154
1155 out_rdmaerr:
1156         rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
1157         switch (rmerr) {
1158         case ERR_VERS:
1159                 pr_err("%s: server reports header version error (%u-%u)\n",
1160                        __func__,
1161                        be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
1162                        be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
1163                 break;
1164         case ERR_CHUNK:
1165                 pr_err("%s: server reports header decoding error\n",
1166                        __func__);
1167                 break;
1168         default:
1169                 pr_err("%s: server reports unknown error %d\n",
1170                        __func__, rmerr);
1171         }
1172         status = -EREMOTEIO;
1173         r_xprt->rx_stats.bad_reply_count++;
1174         goto out;
1175
1176 /* If no pending RPC transaction was matched, post a replacement
1177  * receive buffer before returning.
1178  */
1179 out_shortreply:
1180         dprintk("RPC:       %s: short/invalid reply\n", __func__);
1181         goto repost;
1182
1183 out_nomatch:
1184         spin_unlock_bh(&xprt->transport_lock);
1185         dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
1186                 __func__, be32_to_cpu(headerp->rm_xid),
1187                 rep->rr_len);
1188         goto repost;
1189
1190 out_duplicate:
1191         spin_unlock_bh(&xprt->transport_lock);
1192         dprintk("RPC:       %s: "
1193                 "duplicate reply %p to RPC request %p: xid 0x%08x\n",
1194                 __func__, rep, req, be32_to_cpu(headerp->rm_xid));
1195
1196 repost:
1197         r_xprt->rx_stats.bad_reply_count++;
1198         if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
1199                 rpcrdma_recv_buffer_put(rep);
1200 }