Merge tag 'nfs-for-4.9-1' of git://git.linux-nfs.org/projects/anna/linux-nfs
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index a47f170..d987c2d 100644
 # define RPCDBG_FACILITY       RPCDBG_TRANS
 #endif
 
-enum rpcrdma_chunktype {
-       rpcrdma_noch = 0,
-       rpcrdma_readch,
-       rpcrdma_areadch,
-       rpcrdma_writech,
-       rpcrdma_replych
-};
-
 static const char transfertypes[][12] = {
        "inline",       /* no chunks */
        "read list",    /* some argument via rdma read */
@@ -118,10 +110,12 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
        return size;
 }
 
-void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia,
-                                 struct rpcrdma_create_data_internal *cdata,
-                                 unsigned int maxsegs)
+void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
 {
+       struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+       unsigned int maxsegs = ia->ri_max_segs;
+
        ia->ri_max_inline_write = cdata->inline_wsize -
                                  rpcrdma_max_call_header_size(maxsegs);
        ia->ri_max_inline_read = cdata->inline_rsize -
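
The refactored helper now pulls everything it needs from r_xprt, deriving both inline maximums from the negotiated inline sizes minus the worst-case chunk-list header. For context, a minimal sketch of the send-direction check that consumes ri_max_inline_write; it is the counterpart of rpcrdma_results_inline() in the next hunk and is called from rpcrdma_marshal_req() below, but its body is not part of this diff, so treat the following as an illustration rather than the patched source:

/* Illustrative counterpart of rpcrdma_results_inline(): a Call is
 * eligible to be sent inline when the whole marshaled send buffer
 * fits under the threshold computed in rpcrdma_set_max_header_sizes().
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_snd_buf.len <= ia->ri_max_inline_write;
}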
@@ -155,42 +149,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
        return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
 }
 
-static int
-rpcrdma_tail_pullup(struct xdr_buf *buf)
-{
-       size_t tlen = buf->tail[0].iov_len;
-       size_t skip = tlen & 3;
-
-       /* Do not include the tail if it is only an XDR pad */
-       if (tlen < 4)
-               return 0;
-
-       /* xdr_write_pages() adds a pad at the beginning of the tail
-        * if the content in "buf->pages" is unaligned. Force the
-        * tail's actual content to land at the next XDR position
-        * after the head instead.
-        */
-       if (skip) {
-               unsigned char *src, *dst;
-               unsigned int count;
-
-               src = buf->tail[0].iov_base;
-               dst = buf->head[0].iov_base;
-               dst += buf->head[0].iov_len;
-
-               src += skip;
-               tlen -= skip;
-
-               dprintk("RPC:       %s: skip=%zu, memmove(%p, %p, %zu)\n",
-                       __func__, skip, dst, src, tlen);
-
-               for (count = tlen; count; count--)
-                       *dst++ = *src++;
-       }
-
-       return tlen;
-}
-
 /* Split "vec" on page boundaries into segments. FMR registers pages,
  * not a byte range. Other modes coalesce these segments into a single
  * MR when they can.
@@ -229,7 +187,8 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)
 
 static int
 rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
-       enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg)
+       enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg,
+       bool reminv_expected)
 {
        int len, n, p, page_base;
        struct page **ppages;
@@ -271,6 +230,13 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
        if (type == rpcrdma_readch)
                return n;
 
+       /* When encoding the Write list, some servers need to see an extra
+        * segment for odd-length Write chunks. The upper layer provides
+        * space in the tail iovec for this purpose.
+        */
+       if (type == rpcrdma_writech && reminv_expected)
+               return n;
+
        if (xdrbuf->tail[0].iov_len) {
                /* the rpcrdma protocol allows us to omit any trailing
                 * xdr pad bytes, saving the server an RDMA operation. */
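
The "trailing xdr pad bytes" mentioned in the comment above come from XDR's 4-byte alignment rule: an item whose length is not a multiple of four is followed by up to three zero bytes, and for odd-length page-list content that pad lands in the tail iovec. A worked sketch of the arithmetic (the helper name is illustrative, not part of this patch):

/* Illustration only: number of XDR pad bytes following a payload of
 * the given length. A 13-byte payload is padded with 3 zero bytes,
 * a 16-byte payload with none. These pad bytes are what the sender
 * is permitted to omit from the chunk.
 */
static inline size_t example_xdr_pad_bytes(size_t len)
{
	return (4 - (len & 3)) & 3;
}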
@@ -327,7 +293,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
        if (rtype == rpcrdma_areadch)
                pos = 0;
        seg = req->rl_segments;
-       nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg);
+       nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg, false);
        if (nsegs < 0)
                return ERR_PTR(nsegs);
 
@@ -391,7 +357,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
        seg = req->rl_segments;
        nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
                                     rqst->rq_rcv_buf.head[0].iov_len,
-                                    wtype, seg);
+                                    wtype, seg,
+                                    r_xprt->rx_ia.ri_reminv_expected);
        if (nsegs < 0)
                return ERR_PTR(nsegs);
 
@@ -456,7 +423,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
        }
 
        seg = req->rl_segments;
-       nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg);
+       nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
+                                    r_xprt->rx_ia.ri_reminv_expected);
        if (nsegs < 0)
                return ERR_PTR(nsegs);
 
@@ -491,74 +459,184 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
        return iptr;
 }
 
-/*
- * Copy write data inline.
- * This function is used for "small" requests. Data which is passed
- * to RPC via iovecs (or page list) is copied directly into the
- * pre-registered memory buffer for this request. For small amounts
- * of data, this is efficient. The cutoff value is tunable.
+/* Prepare the RPC-over-RDMA header SGE.
  */
-static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
+static bool
+rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+                       u32 len)
 {
-       int i, npages, curlen;
-       int copy_len;
-       unsigned char *srcp, *destp;
-       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
-       int page_base;
-       struct page **ppages;
+       struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
+       struct ib_sge *sge = &req->rl_send_sge[0];
+
+       if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) {
+               if (!__rpcrdma_dma_map_regbuf(ia, rb))
+                       return false;
+               sge->addr = rdmab_addr(rb);
+               sge->lkey = rdmab_lkey(rb);
+       }
+       sge->length = len;
+
+       ib_dma_sync_single_for_device(ia->ri_device, sge->addr,
+                                     sge->length, DMA_TO_DEVICE);
+       req->rl_send_wr.num_sge++;
+       return true;
+}
 
-       destp = rqst->rq_svec[0].iov_base;
-       curlen = rqst->rq_svec[0].iov_len;
-       destp += curlen;
+/* Prepare the Send SGEs. The head and tail iovec, and each entry
+ * in the page list, gets its own SGE.
+ */
+static bool
+rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+                        struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+{
+       unsigned int sge_no, page_base, len, remaining;
+       struct rpcrdma_regbuf *rb = req->rl_sendbuf;
+       struct ib_device *device = ia->ri_device;
+       struct ib_sge *sge = req->rl_send_sge;
+       u32 lkey = ia->ri_pd->local_dma_lkey;
+       struct page *page, **ppages;
+
+       /* The head iovec is straightforward, as it is already
+        * DMA-mapped. Sync the content that has changed.
+        */
+       if (!rpcrdma_dma_map_regbuf(ia, rb))
+               return false;
+       sge_no = 1;
+       sge[sge_no].addr = rdmab_addr(rb);
+       sge[sge_no].length = xdr->head[0].iov_len;
+       sge[sge_no].lkey = rdmab_lkey(rb);
+       ib_dma_sync_single_for_device(device, sge[sge_no].addr,
+                                     sge[sge_no].length, DMA_TO_DEVICE);
+
+       /* If there is a Read chunk, the page list is being handled
+        * via explicit RDMA, and thus is skipped here. However, the
+        * tail iovec may include an XDR pad for the page list, as
+        * well as additional content, and may not reside in the
+        * same page as the head iovec.
+        */
+       if (rtype == rpcrdma_readch) {
+               len = xdr->tail[0].iov_len;
 
-       dprintk("RPC:       %s: destp 0x%p len %d hdrlen %d\n",
-               __func__, destp, rqst->rq_slen, curlen);
+               /* Do not include the tail if it is only an XDR pad */
+               if (len < 4)
+                       goto out;
 
-       copy_len = rqst->rq_snd_buf.page_len;
+               page = virt_to_page(xdr->tail[0].iov_base);
+               page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
 
-       if (rqst->rq_snd_buf.tail[0].iov_len) {
-               curlen = rqst->rq_snd_buf.tail[0].iov_len;
-               if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
-                       memmove(destp + copy_len,
-                               rqst->rq_snd_buf.tail[0].iov_base, curlen);
-                       r_xprt->rx_stats.pullup_copy_count += curlen;
+               /* If the content in the page list is an odd length,
+                * xdr_write_pages() has added a pad at the beginning
+                * of the tail iovec. Force the tail's non-pad content
+                * to land at the next XDR position in the Send message.
+                */
+               page_base += len & 3;
+               len -= len & 3;
+               goto map_tail;
+       }
+
+       /* If there is a page list present, temporarily DMA map
+        * and prepare an SGE for each page to be sent.
+        */
+       if (xdr->page_len) {
+               ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+               page_base = xdr->page_base & ~PAGE_MASK;
+               remaining = xdr->page_len;
+               while (remaining) {
+                       sge_no++;
+                       if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
+                               goto out_mapping_overflow;
+
+                       len = min_t(u32, PAGE_SIZE - page_base, remaining);
+                       sge[sge_no].addr = ib_dma_map_page(device, *ppages,
+                                                          page_base, len,
+                                                          DMA_TO_DEVICE);
+                       if (ib_dma_mapping_error(device, sge[sge_no].addr))
+                               goto out_mapping_err;
+                       sge[sge_no].length = len;
+                       sge[sge_no].lkey = lkey;
+
+                       req->rl_mapped_sges++;
+                       ppages++;
+                       remaining -= len;
+                       page_base = 0;
                }
-               dprintk("RPC:       %s: tail destp 0x%p len %d\n",
-                       __func__, destp + copy_len, curlen);
-               rqst->rq_svec[0].iov_len += curlen;
        }
-       r_xprt->rx_stats.pullup_copy_count += copy_len;
 
-       page_base = rqst->rq_snd_buf.page_base;
-       ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
-       page_base &= ~PAGE_MASK;
-       npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT;
-       for (i = 0; copy_len && i < npages; i++) {
-               curlen = PAGE_SIZE - page_base;
-               if (curlen > copy_len)
-                       curlen = copy_len;
-               dprintk("RPC:       %s: page %d destp 0x%p len %d curlen %d\n",
-                       __func__, i, destp, copy_len, curlen);
-               srcp = kmap_atomic(ppages[i]);
-               memcpy(destp, srcp+page_base, curlen);
-               kunmap_atomic(srcp);
-               rqst->rq_svec[0].iov_len += curlen;
-               destp += curlen;
-               copy_len -= curlen;
-               page_base = 0;
+       /* The tail iovec is not always constructed in the same
+        * page where the head iovec resides (see, for example,
+        * gss_wrap_req_priv). To neatly accommodate that case,
+        * DMA map it separately.
+        */
+       if (xdr->tail[0].iov_len) {
+               page = virt_to_page(xdr->tail[0].iov_base);
+               page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+               len = xdr->tail[0].iov_len;
+
+map_tail:
+               sge_no++;
+               sge[sge_no].addr = ib_dma_map_page(device, page,
+                                                  page_base, len,
+                                                  DMA_TO_DEVICE);
+               if (ib_dma_mapping_error(device, sge[sge_no].addr))
+                       goto out_mapping_err;
+               sge[sge_no].length = len;
+               sge[sge_no].lkey = lkey;
+               req->rl_mapped_sges++;
        }
-       /* header now contains entire send message */
+
+out:
+       req->rl_send_wr.num_sge = sge_no + 1;
+       return true;
+
+out_mapping_overflow:
+       pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
+       return false;
+
+out_mapping_err:
+       pr_err("rpcrdma: Send mapping error\n");
+       return false;
+}
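
The comments above describe the gathered-Send layout: SGE 0 carries the transport header, SGE 1 the head iovec, then one SGE per page spanned by the page list, and finally one SGE for the tail iovec when it is present. A rough sketch, not part of this patch, of how many Send SGEs a given xdr_buf consumes under that layout when no Read chunk is in play (compare with the RPCRDMA_MAX_SEND_SGES - 2 check above):

/* Sketch only: worst-case Send SGE count for an inline message:
 * transport header + head iovec + one SGE per page spanned by the
 * page list + an optional tail iovec SGE.
 */
static unsigned int example_count_send_sges(const struct xdr_buf *xdr)
{
	unsigned int first, last, nsges = 2;	/* header + head */

	if (xdr->page_len) {
		first = xdr->page_base >> PAGE_SHIFT;
		last = (xdr->page_base + xdr->page_len - 1) >> PAGE_SHIFT;
		nsges += last - first + 1;
	}
	if (xdr->tail[0].iov_len)
		nsges++;
	return nsges;
}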
+
+bool
+rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+                         u32 hdrlen, struct xdr_buf *xdr,
+                         enum rpcrdma_chunktype rtype)
+{
+       req->rl_send_wr.num_sge = 0;
+       req->rl_mapped_sges = 0;
+
+       if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen))
+               goto out_map;
+
+       if (rtype != rpcrdma_areadch)
+               if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype))
+                       goto out_map;
+
+       return true;
+
+out_map:
+       pr_err("rpcrdma: failed to DMA map a Send buffer\n");
+       return false;
+}
+
+void
+rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+{
+       struct ib_device *device = ia->ri_device;
+       struct ib_sge *sge;
+       int count;
+
+       sge = &req->rl_send_sge[2];
+       for (count = req->rl_mapped_sges; count--; sge++)
+               ib_dma_unmap_page(device, sge->addr, sge->length,
+                                 DMA_TO_DEVICE);
+       req->rl_mapped_sges = 0;
 }
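
The page mappings taken in rpcrdma_prepare_msg_sges() are temporary and must be released once the Send is no longer in flight, which is what rpcrdma_unmap_sges() above provides. A hypothetical caller, purely for illustration (the function name and its placement are assumptions, not taken from this diff):

/* Hypothetical, for illustration only: when an rpcrdma_req is retired
 * after its Send has completed, drop the per-Send page mappings before
 * the request is reused.
 */
static void example_retire_req(struct rpcrdma_xprt *r_xprt,
			       struct rpcrdma_req *req)
{
	rpcrdma_unmap_sges(&r_xprt->rx_ia, req);
	/* ... return req to the transport's buffer pool ... */
}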
 
 /*
  * Marshal a request: the primary job of this routine is to choose
  * the transfer modes. See comments below.
  *
- * Prepares up to two IOVs per Call message:
- *
- *  [0] -- RPC RDMA header
- *  [1] -- the RPC header/data
- *
  * Returns zero on success, otherwise a negative errno.
  */
 
@@ -626,12 +704,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
         */
        if (rpcrdma_args_inline(r_xprt, rqst)) {
                rtype = rpcrdma_noch;
-               rpcrdma_inline_pullup(rqst);
-               rpclen = rqst->rq_svec[0].iov_len;
+               rpclen = rqst->rq_snd_buf.len;
        } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
                rtype = rpcrdma_readch;
-               rpclen = rqst->rq_svec[0].iov_len;
-               rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
+               rpclen = rqst->rq_snd_buf.head[0].iov_len +
+                        rqst->rq_snd_buf.tail[0].iov_len;
        } else {
                r_xprt->rx_stats.nomsg_call_count++;
                headerp->rm_type = htonl(RDMA_NOMSG);
@@ -673,34 +750,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
                goto out_unmap;
        hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
 
-       if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
-               goto out_overflow;
-
        dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
                rqst->rq_task->tk_pid, __func__,
                transfertypes[rtype], transfertypes[wtype],
                hdrlen, rpclen);
 
-       req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
-       req->rl_send_iov[0].length = hdrlen;
-       req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
-
-       req->rl_niovs = 1;
-       if (rtype == rpcrdma_areadch)
-               return 0;
-
-       req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
-       req->rl_send_iov[1].length = rpclen;
-       req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
-
-       req->rl_niovs = 2;
+       if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, hdrlen,
+                                      &rqst->rq_snd_buf, rtype)) {
+               iptr = ERR_PTR(-EIO);
+               goto out_unmap;
+       }
        return 0;
 
-out_overflow:
-       pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
-               hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
-       iptr = ERR_PTR(-EIO);
-
 out_unmap:
        r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
        return PTR_ERR(iptr);
@@ -916,8 +977,10 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
  * allowed to timeout, to discover the errors at that time.
  */
 void
-rpcrdma_reply_handler(struct rpcrdma_rep *rep)
+rpcrdma_reply_handler(struct work_struct *work)
 {
+       struct rpcrdma_rep *rep =
+                       container_of(work, struct rpcrdma_rep, rr_work);
        struct rpcrdma_msg *headerp;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
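
With this change the reply handler is a workqueue function, so the Receive completion path only needs to hand it the rr_work member embedded in the rpcrdma_rep. A hedged sketch of the queueing side (the workqueue pointer and the exact call site are assumptions; only rr_work and the new handler signature appear in this diff):

/* Sketch only: rr_work is initialized once when the rep is created,
 * and the Receive completion handler later defers reply processing
 * to process context by queueing it.
 */
static void example_init_and_queue_reply(struct rpcrdma_rep *rep,
					 struct workqueue_struct *wq)
{
	INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);

	/* later, from the Receive completion handler: */
	queue_work(wq, &rep->rr_work);
}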
@@ -1132,6 +1195,6 @@ out_duplicate:
 
 repost:
        r_xprt->rx_stats.bad_reply_count++;
-       if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+       if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
                rpcrdma_recv_buffer_put(rep);
 }