Merge tag 'nfs-for-4.7-1' of git://git.linux-nfs.org/projects/anna/linux-nfs
author Linus Torvalds <torvalds@linux-foundation.org>
Thu, 26 May 2016 17:33:33 +0000 (10:33 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
Thu, 26 May 2016 17:33:33 +0000 (10:33 -0700)
Pull NFS client updates from Anna Schumaker:
 "Highlights include:

  Features:
   - Add support for the NFS v4.2 COPY operation
   - Add support for NFS/RDMA over IPv6

  Bugfixes and cleanups:
   - Avoid race that crashes nfs_init_commit()
   - Fix oops in callback path
   - Fix LOCK/OPEN race when unlinking an open file
   - Choose correct stateids when using delegations in setattr, read and
     write
   - Don't send empty SETATTR after OPEN_CREATE
   - xprtrdma: Prevent the server from writing a reply into memory the
     client has released
   - xprtrdma: Support using Read list and Reply chunk in one RPC call"

* tag 'nfs-for-4.7-1' of git://git.linux-nfs.org/projects/anna/linux-nfs: (61 commits)
  pnfs: pnfs_update_layout needs to consider if strict iomode checking is on
  nfs/flexfiles: Use the layout segment for reading unless it is an IOMODE_RW and reading is disabled
  nfs/flexfiles: Helper function to detect FF_FLAGS_NO_READ_IO
  nfs: avoid race that crashes nfs_init_commit
  NFS: checking for NULL instead of IS_ERR() in nfs_commit_file()
  pnfs: make pnfs_layout_process more robust
  pnfs: rework LAYOUTGET retry handling
  pnfs: lift retry logic from send_layoutget to pnfs_update_layout
  pnfs: fix bad error handling in send_layoutget
  flexfiles: add kerneldoc header to nfs4_ff_layout_prepare_ds
  flexfiles: remove pointless setting of NFS_LAYOUT_RETURN_REQUESTED
  pnfs: only tear down lsegs that precede seqid in LAYOUTRETURN args
  pnfs: keep track of the return sequence number in pnfs_layout_hdr
  pnfs: record sequence in pnfs_layout_segment when it's created
  pnfs: don't merge new ff lsegs with ones that have LAYOUTRETURN bit set
  pNFS/flexfiles: When initing reads or writes, we might have to retry connecting to DSes
  pNFS/flexfiles: When checking for available DSes, conditionally check for MDS io
  pNFS/flexfile: Fix erroneous fall back to read/write through the MDS
  NFS: Reclaim writes via writepage are opportunistic
  NFSv4: Use the right stateid for delegations in setattr, read and write
  ...

fs/nfs/direct.c
fs/nfs/nfs42proc.c
fs/nfs/nfs4proc.c
include/linux/nfs_xdr.h
net/sunrpc/xprtrdma/frwr_ops.c
net/sunrpc/xprtsock.c
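
A quick illustration of the headline COPY feature before the diffs: on an
NFS v4.2 mount, this work lets copy_file_range(2) be serviced by a single
server-side COPY instead of round-tripping the data through the client,
with the VFS falling back to an ordinary read/write copy when the server
lacks COPY support. A minimal userspace sketch (assuming a libc that
exposes the copy_file_range() wrapper; older libcs need syscall(2)):

#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(int argc, char **argv)
{
	int src, dst;
	ssize_t n;

	if (argc != 3) {
		fprintf(stderr, "usage: %s SRC DST\n", argv[0]);
		return 1;
	}
	src = open(argv[1], O_RDONLY);
	dst = open(argv[2], O_WRONLY | O_CREAT | O_TRUNC, 0644);
	if (src < 0 || dst < 0) {
		perror("open");
		return 1;
	}
	/* NULL offsets: copy from the current file positions, 1 MiB per
	 * call, until EOF. On NFS v4.2 each call may become one COPY
	 * operation on the wire. */
	do {
		n = copy_file_range(src, NULL, dst, NULL, 1 << 20, 0);
	} while (n > 0);
	if (n < 0) {
		perror("copy_file_range");
		return 1;
	}
	return 0;
}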

diff --combined fs/nfs/direct.c
@@@ -87,6 -87,7 +87,7 @@@ struct nfs_direct_req 
        int                     mirror_count;
  
        ssize_t                 count,          /* bytes actually processed */
+                               max_count,      /* max expected count */
                                bytes_left,     /* bytes left to be sent */
                                io_start,       /* start of IO */
                                error;          /* any reported error */
@@@ -123,6 -124,8 +124,8 @@@ nfs_direct_good_bytes(struct nfs_direct
        int i;
        ssize_t count;
  
+       WARN_ON_ONCE(dreq->count >= dreq->max_count);
        if (dreq->mirror_count == 1) {
                dreq->mirrors[hdr->pgio_mirror_idx].count += hdr->good_bytes;
                dreq->count += hdr->good_bytes;
@@@ -250,7 -253,7 +253,7 @@@ static int nfs_direct_cmp_commit_data_v
   * shunt off direct read and write requests before the VFS gets them,
   * so this method is only ever called for swap.
   */
 -ssize_t nfs_direct_IO(struct kiocb *iocb, struct iov_iter *iter, loff_t pos)
 +ssize_t nfs_direct_IO(struct kiocb *iocb, struct iov_iter *iter)
  {
        struct inode *inode = iocb->ki_filp->f_mapping->host;
  
        VM_BUG_ON(iov_iter_count(iter) != PAGE_SIZE);
  
        if (iov_iter_rw(iter) == READ)
 -              return nfs_file_direct_read(iocb, iter, pos);
 +              return nfs_file_direct_read(iocb, iter);
        return nfs_file_direct_write(iocb, iter);
  }
  
@@@ -275,7 -278,7 +278,7 @@@ static void nfs_direct_release_pages(st
  void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
                              struct nfs_direct_req *dreq)
  {
-       cinfo->lock = &dreq->inode->i_lock;
+       cinfo->inode = dreq->inode;
        cinfo->mds = &dreq->mds_cinfo;
        cinfo->ds = &dreq->ds_cinfo;
        cinfo->dreq = dreq;
@@@ -396,7 -399,7 +399,7 @@@ static void nfs_direct_complete(struct 
  static void nfs_direct_readpage_release(struct nfs_page *req)
  {
        dprintk("NFS: direct read done (%s/%llu %d@%lld)\n",
 -              d_inode(req->wb_context->dentry)->i_sb->s_id,
 +              req->wb_context->dentry->d_sb->s_id,
                (unsigned long long)NFS_FILEID(d_inode(req->wb_context->dentry)),
                req->wb_bytes,
                (long long)req_offset(req));
@@@ -545,6 -548,7 +548,6 @@@ static ssize_t nfs_direct_read_schedule
   * nfs_file_direct_read - file direct read operation for NFS files
   * @iocb: target I/O control block
   * @iter: vector of user buffers into which to read data
 - * @pos: byte offset in file where reading starts
   *
   * We use this function for direct reads instead of calling
   * generic_file_aio_read() in order to avoid gfar's check to see if
   * client must read the updated atime from the server back into its
   * cache.
   */
 -ssize_t nfs_file_direct_read(struct kiocb *iocb, struct iov_iter *iter,
 -                              loff_t pos)
 +ssize_t nfs_file_direct_read(struct kiocb *iocb, struct iov_iter *iter)
  {
        struct file *file = iocb->ki_filp;
        struct address_space *mapping = file->f_mapping;
        nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);
  
        dfprintk(FILE, "NFS: direct read(%pD2, %zd@%Ld)\n",
 -              file, count, (long long) pos);
 +              file, count, (long long) iocb->ki_pos);
  
        result = 0;
        if (!count)
                goto out_unlock;
  
        dreq->inode = inode;
-       dreq->bytes_left = count;
+       dreq->bytes_left = dreq->max_count = count;
 -      dreq->io_start = pos;
 +      dreq->io_start = iocb->ki_pos;
        dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
        l_ctx = nfs_get_lock_context(dreq->ctx);
        if (IS_ERR(l_ctx)) {
                dreq->iocb = iocb;
  
        NFS_I(inode)->read_io += count;
 -      result = nfs_direct_read_schedule_iovec(dreq, iter, pos);
 +      result = nfs_direct_read_schedule_iovec(dreq, iter, iocb->ki_pos);
  
        inode_unlock(inode);
  
        if (!result) {
                result = nfs_direct_wait(dreq);
                if (result > 0)
 -                      iocb->ki_pos = pos + result;
 +                      iocb->ki_pos += result;
        }
  
        nfs_direct_req_release(dreq);
@@@ -630,13 -635,13 +633,13 @@@ nfs_direct_write_scan_commit_list(struc
                                  struct list_head *list,
                                  struct nfs_commit_info *cinfo)
  {
-       spin_lock(cinfo->lock);
+       spin_lock(&cinfo->inode->i_lock);
  #ifdef CONFIG_NFS_V4_1
        if (cinfo->ds != NULL && cinfo->ds->nwritten != 0)
                NFS_SERVER(inode)->pnfs_curr_ld->recover_commit_reqs(list, cinfo);
  #endif
        nfs_scan_commit_list(&cinfo->mds->list, list, cinfo, 0);
-       spin_unlock(cinfo->lock);
+       spin_unlock(&cinfo->inode->i_lock);
  }
  
  static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
                if (!nfs_pageio_add_request(&desc, req)) {
                        nfs_list_remove_request(req);
                        nfs_list_add_request(req, &failed);
-                       spin_lock(cinfo.lock);
+                       spin_lock(&cinfo.inode->i_lock);
                        dreq->flags = 0;
                        if (desc.pg_error < 0)
                                dreq->error = desc.pg_error;
                        else
                                dreq->error = -EIO;
-                       spin_unlock(cinfo.lock);
+                       spin_unlock(&cinfo.inode->i_lock);
                }
                nfs_release_request(req);
        }
@@@ -967,6 -972,7 +970,6 @@@ static ssize_t nfs_direct_write_schedul
   * nfs_file_direct_write - file direct write operation for NFS files
   * @iocb: target I/O control block
   * @iter: vector of user buffers from which to write data
 - * @pos: byte offset in file where writing starts
   *
   * We use this function for direct writes instead of calling
   * generic_file_aio_write() in order to avoid taking the inode
@@@ -1023,7 -1029,7 +1026,7 @@@ ssize_t nfs_file_direct_write(struct ki
                goto out_unlock;
  
        dreq->inode = inode;
-       dreq->bytes_left = iov_iter_count(iter);
+       dreq->bytes_left = dreq->max_count = iov_iter_count(iter);
        dreq->io_start = pos;
        dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
        l_ctx = nfs_get_lock_context(dreq->ctx);
                        if (i_size_read(inode) < iocb->ki_pos)
                                i_size_write(inode, iocb->ki_pos);
                        spin_unlock(&inode->i_lock);
 -                      generic_write_sync(file, pos, result);
 +
 +                      /* XXX: should check the generic_write_sync retval */
 +                      generic_write_sync(iocb, result);
                }
        }
        nfs_direct_req_release(dreq);
diff --combined fs/nfs/nfs42proc.c
@@@ -126,6 -126,111 +126,111 @@@ int nfs42_proc_deallocate(struct file *
        return err;
  }
  
 -              mutex_lock(&file_inode(dst)->i_mutex);
+ static ssize_t _nfs42_proc_copy(struct file *src, loff_t pos_src,
+                               struct nfs_lock_context *src_lock,
+                               struct file *dst, loff_t pos_dst,
+                               struct nfs_lock_context *dst_lock,
+                               size_t count)
+ {
+       struct nfs42_copy_args args = {
+               .src_fh         = NFS_FH(file_inode(src)),
+               .src_pos        = pos_src,
+               .dst_fh         = NFS_FH(file_inode(dst)),
+               .dst_pos        = pos_dst,
+               .count          = count,
+       };
+       struct nfs42_copy_res res;
+       struct rpc_message msg = {
+               .rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_COPY],
+               .rpc_argp = &args,
+               .rpc_resp = &res,
+       };
+       struct inode *dst_inode = file_inode(dst);
+       struct nfs_server *server = NFS_SERVER(dst_inode);
+       int status;
+       status = nfs4_set_rw_stateid(&args.src_stateid, src_lock->open_context,
+                                    src_lock, FMODE_READ);
+       if (status)
+               return status;
+       status = nfs4_set_rw_stateid(&args.dst_stateid, dst_lock->open_context,
+                                    dst_lock, FMODE_WRITE);
+       if (status)
+               return status;
+       status = nfs4_call_sync(server->client, server, &msg,
+                               &args.seq_args, &res.seq_res, 0);
+       if (status == -ENOTSUPP)
+               server->caps &= ~NFS_CAP_COPY;
+       if (status)
+               return status;
+       if (res.write_res.verifier.committed != NFS_FILE_SYNC) {
+               status = nfs_commit_file(dst, &res.write_res.verifier.verifier);
+               if (status)
+                       return status;
+       }
+       truncate_pagecache_range(dst_inode, pos_dst,
+                                pos_dst + res.write_res.count);
+       return res.write_res.count;
+ }
+ 
+ ssize_t nfs42_proc_copy(struct file *src, loff_t pos_src,
+                       struct file *dst, loff_t pos_dst,
+                       size_t count)
+ {
+       struct nfs_server *server = NFS_SERVER(file_inode(dst));
+       struct nfs_lock_context *src_lock;
+       struct nfs_lock_context *dst_lock;
+       struct nfs4_exception src_exception = { };
+       struct nfs4_exception dst_exception = { };
+       ssize_t err, err2;
+       if (!nfs_server_capable(file_inode(dst), NFS_CAP_COPY))
+               return -EOPNOTSUPP;
+       src_lock = nfs_get_lock_context(nfs_file_open_context(src));
+       if (IS_ERR(src_lock))
+               return PTR_ERR(src_lock);
+       src_exception.inode = file_inode(src);
+       src_exception.state = src_lock->open_context->state;
+       dst_lock = nfs_get_lock_context(nfs_file_open_context(dst));
+       if (IS_ERR(dst_lock)) {
+               err = PTR_ERR(dst_lock);
+               goto out_put_src_lock;
+       }
+       dst_exception.inode = file_inode(dst);
+       dst_exception.state = dst_lock->open_context->state;
+       do {
 -              mutex_unlock(&file_inode(dst)->i_mutex);
++              inode_lock(file_inode(dst));
+               err = _nfs42_proc_copy(src, pos_src, src_lock,
+                                      dst, pos_dst, dst_lock, count);
++              inode_unlock(file_inode(dst));
+               if (err == -ENOTSUPP) {
+                       err = -EOPNOTSUPP;
+                       break;
+               }
+               err2 = nfs4_handle_exception(server, err, &src_exception);
+               err  = nfs4_handle_exception(server, err, &dst_exception);
+               if (!err)
+                       err = err2;
+       } while (src_exception.retry || dst_exception.retry);
+       nfs_put_lock_context(dst_lock);
+ out_put_src_lock:
+       nfs_put_lock_context(src_lock);
+       return err;
+ }
+ 
  static loff_t _nfs42_proc_llseek(struct file *filep,
                struct nfs_lock_context *lock, loff_t offset, int whence)
  {
@@@ -232,7 -337,7 +337,7 @@@ nfs42_layoutstat_done(struct rpc_task *
                         * with the current stateid.
                         */
                        set_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags);
-                       pnfs_mark_matching_lsegs_invalid(lo, &head, NULL);
+                       pnfs_mark_matching_lsegs_invalid(lo, &head, NULL, 0);
                        spin_unlock(&inode->i_lock);
                        pnfs_free_lseg_list(&head);
                } else
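
One detail of the copy path above worth restating: a server may perform
the COPY unstably, in which case the client has to COMMIT the copied
range using the write verifier returned in the COPY reply before the
result can be trusted. A hedged sketch of that decision (the helper name
is illustrative, not a kernel symbol):

/* Illustrative only -- mirrors the check in _nfs42_proc_copy() above. */
static int example_commit_if_unstable(struct file *dst,
				      struct nfs42_write_res *wres)
{
	/* Unstable data must be committed with the verifier the server
	 * handed back in the COPY response. */
	if (wres->verifier.committed != NFS_FILE_SYNC)
		return nfs_commit_file(dst, &wres->verifier.verifier);
	return 0;
}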
diff --combined fs/nfs/nfs4proc.c
  #define NFS4_POLL_RETRY_MIN   (HZ/10)
  #define NFS4_POLL_RETRY_MAX   (15*HZ)
  
+ /* file attributes which can be mapped to nfs attributes */
+ #define NFS4_VALID_ATTRS (ATTR_MODE \
+       | ATTR_UID \
+       | ATTR_GID \
+       | ATTR_SIZE \
+       | ATTR_ATIME \
+       | ATTR_MTIME \
+       | ATTR_CTIME \
+       | ATTR_ATIME_SET \
+       | ATTR_MTIME_SET)
+ 
  struct nfs4_opendata;
  static int _nfs4_proc_open(struct nfs4_opendata *data);
  static int _nfs4_recover_proc_open(struct nfs4_opendata *data);
@@@ -416,6 -427,7 +427,7 @@@ static int nfs4_do_handle_exception(str
                case -NFS4ERR_DELAY:
                        nfs_inc_server_stats(server, NFSIOS_DELAY);
                case -NFS4ERR_GRACE:
+               case -NFS4ERR_RECALLCONFLICT:
                        exception->delay = 1;
                        return 0;
  
@@@ -2558,15 -2570,20 +2570,20 @@@ static int _nfs4_do_open(struct inode *
        if ((opendata->o_arg.open_flags & (O_CREAT|O_EXCL)) == (O_CREAT|O_EXCL) &&
            (opendata->o_arg.createmode != NFS4_CREATE_GUARDED)) {
                nfs4_exclusive_attrset(opendata, sattr, &label);
-               nfs_fattr_init(opendata->o_res.f_attr);
-               status = nfs4_do_setattr(state->inode, cred,
-                               opendata->o_res.f_attr, sattr,
-                               state, label, olabel);
-               if (status == 0) {
-                       nfs_setattr_update_inode(state->inode, sattr,
-                                       opendata->o_res.f_attr);
-                       nfs_setsecurity(state->inode, opendata->o_res.f_attr, olabel);
+               /*
+                * send create attributes which was not set by open
+                * with an extra setattr.
+                */
+               if (sattr->ia_valid & NFS4_VALID_ATTRS) {
+                       nfs_fattr_init(opendata->o_res.f_attr);
+                       status = nfs4_do_setattr(state->inode, cred,
+                                       opendata->o_res.f_attr, sattr,
+                                       state, label, olabel);
+                       if (status == 0) {
+                               nfs_setattr_update_inode(state->inode, sattr,
+                                               opendata->o_res.f_attr);
+                               nfs_setsecurity(state->inode, opendata->o_res.f_attr, olabel);
+                       }
                }
        }
        if (opened && opendata->file_created)
@@@ -2676,6 -2693,7 +2693,7 @@@ static int _nfs4_do_setattr(struct inod
                .rpc_resp       = &res,
                .rpc_cred       = cred,
          };
+       struct rpc_cred *delegation_cred = NULL;
        unsigned long timestamp = jiffies;
        fmode_t fmode;
        bool truncate;
        truncate = (sattr->ia_valid & ATTR_SIZE) ? true : false;
        fmode = truncate ? FMODE_WRITE : FMODE_READ;
  
-       if (nfs4_copy_delegation_stateid(&arg.stateid, inode, fmode)) {
+       if (nfs4_copy_delegation_stateid(inode, fmode, &arg.stateid, &delegation_cred)) {
                /* Use that stateid */
        } else if (truncate && state != NULL) {
                struct nfs_lockowner lockowner = {
                };
                if (!nfs4_valid_open_stateid(state))
                        return -EBADF;
-               if (nfs4_select_rw_stateid(&arg.stateid, state, FMODE_WRITE,
-                               &lockowner) == -EIO)
+               if (nfs4_select_rw_stateid(state, FMODE_WRITE, &lockowner,
+                               &arg.stateid, &delegation_cred) == -EIO)
                        return -EBADF;
        } else
                nfs4_stateid_copy(&arg.stateid, &zero_stateid);
+       if (delegation_cred)
+               msg.rpc_cred = delegation_cred;
  
        status = nfs4_call_sync(server->client, server, &msg, &arg.seq_args, &res.seq_res, 1);
+       put_rpccred(delegation_cred);
        if (status == 0 && state != NULL)
                renew_lease(server, timestamp);
        trace_nfs4_setattr(inode, &arg.stateid, status);
@@@ -3777,7 -3799,7 +3799,7 @@@ static void nfs4_proc_unlink_setup(stru
  
  static void nfs4_proc_unlink_rpc_prepare(struct rpc_task *task, struct nfs_unlinkdata *data)
  {
 -      nfs4_setup_sequence(NFS_SERVER(data->dir),
 +      nfs4_setup_sequence(NFS_SB(data->dentry->d_sb),
                        &data->args.seq_args,
                        &data->res.seq_res,
                        task);
@@@ -4285,7 -4307,7 +4307,7 @@@ int nfs4_set_rw_stateid(nfs4_stateid *s
  
        if (l_ctx != NULL)
                lockowner = &l_ctx->lockowner;
-       return nfs4_select_rw_stateid(stateid, ctx->state, fmode, lockowner);
+       return nfs4_select_rw_stateid(ctx->state, fmode, lockowner, stateid, NULL);
  }
  EXPORT_SYMBOL_GPL(nfs4_set_rw_stateid);
  
@@@ -6054,6 -6076,7 +6076,7 @@@ static int nfs41_lock_expired(struct nf
  static int _nfs4_proc_setlk(struct nfs4_state *state, int cmd, struct file_lock *request)
  {
        struct nfs_inode *nfsi = NFS_I(state->inode);
+       struct nfs4_state_owner *sp = state->owner;
        unsigned char fl_flags = request->fl_flags;
        int status = -ENOLCK;
  
        status = do_vfs_lock(state->inode, request);
        if (status < 0)
                goto out;
+       mutex_lock(&sp->so_delegreturn_mutex);
        down_read(&nfsi->rwsem);
        if (test_bit(NFS_DELEGATED_STATE, &state->flags)) {
                /* Yes: cache locks! */
                request->fl_flags = fl_flags & ~FL_SLEEP;
                status = do_vfs_lock(state->inode, request);
                up_read(&nfsi->rwsem);
+               mutex_unlock(&sp->so_delegreturn_mutex);
                goto out;
        }
        up_read(&nfsi->rwsem);
+       mutex_unlock(&sp->so_delegreturn_mutex);
        status = _nfs4_do_setlk(state, cmd, request, NFS_LOCK_NEW);
  out:
        request->fl_flags = fl_flags;
@@@ -6263,10 -6289,10 +6289,10 @@@ static int nfs4_xattr_set_nfs4_acl(cons
  }
  
  static int nfs4_xattr_get_nfs4_acl(const struct xattr_handler *handler,
 -                                 struct dentry *dentry, const char *key,
 -                                 void *buf, size_t buflen)
 +                                 struct dentry *unused, struct inode *inode,
 +                                 const char *key, void *buf, size_t buflen)
  {
 -      return nfs4_proc_get_acl(d_inode(dentry), buf, buflen);
 +      return nfs4_proc_get_acl(inode, buf, buflen);
  }
  
  static bool nfs4_xattr_list_nfs4_acl(struct dentry *dentry)
@@@ -6288,11 -6314,11 +6314,11 @@@ static int nfs4_xattr_set_nfs4_label(co
  }
  
  static int nfs4_xattr_get_nfs4_label(const struct xattr_handler *handler,
 -                                   struct dentry *dentry, const char *key,
 -                                   void *buf, size_t buflen)
 +                                   struct dentry *unused, struct inode *inode,
 +                                   const char *key, void *buf, size_t buflen)
  {
        if (security_ismaclabel(key))
 -              return nfs4_get_security_label(d_inode(dentry), buf, buflen);
 +              return nfs4_get_security_label(inode, buf, buflen);
        return -EOPNOTSUPP;
  }
  
@@@ -7351,9 -7377,11 +7377,11 @@@ int nfs4_proc_get_lease_time(struct nfs
   * always set csa_cachethis to FALSE because the current implementation
   * of the back channel DRC only supports caching the CB_SEQUENCE operation.
   */
- static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args)
+ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args,
+                                   struct rpc_clnt *clnt)
  {
        unsigned int max_rqst_sz, max_resp_sz;
+       unsigned int max_bc_payload = rpc_max_bc_payload(clnt);
  
        max_rqst_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxwrite_overhead;
        max_resp_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxread_overhead;
                args->fc_attrs.max_ops, args->fc_attrs.max_reqs);
  
        /* Back channel attributes */
-       args->bc_attrs.max_rqst_sz = PAGE_SIZE;
-       args->bc_attrs.max_resp_sz = PAGE_SIZE;
+       args->bc_attrs.max_rqst_sz = max_bc_payload;
+       args->bc_attrs.max_resp_sz = max_bc_payload;
        args->bc_attrs.max_resp_sz_cached = 0;
        args->bc_attrs.max_ops = NFS4_MAX_BACK_CHANNEL_OPS;
        args->bc_attrs.max_reqs = NFS41_BC_MAX_CALLBACKS;
@@@ -7476,7 -7504,7 +7504,7 @@@ static int _nfs4_proc_create_session(st
        };
        int status;
  
-       nfs4_init_channel_attrs(&args);
+       nfs4_init_channel_attrs(&args, clp->cl_rpcclient);
        args.flags = (SESSION4_PERSIST | SESSION4_BACK_CHAN);
  
        status = rpc_call_sync(session->clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT);
@@@ -7820,40 -7848,34 +7848,34 @@@ nfs4_layoutget_prepare(struct rpc_task 
        struct nfs4_layoutget *lgp = calldata;
        struct nfs_server *server = NFS_SERVER(lgp->args.inode);
        struct nfs4_session *session = nfs4_get_session(server);
-       int ret;
  
        dprintk("--> %s\n", __func__);
-       /* Note the is a race here, where a CB_LAYOUTRECALL can come in
-        * right now covering the LAYOUTGET we are about to send.
-        * However, that is not so catastrophic, and there seems
-        * to be no way to prevent it completely.
-        */
-       if (nfs41_setup_sequence(session, &lgp->args.seq_args,
-                               &lgp->res.seq_res, task))
-               return;
-       ret = pnfs_choose_layoutget_stateid(&lgp->args.stateid,
-                                         NFS_I(lgp->args.inode)->layout,
-                                         &lgp->args.range,
-                                         lgp->args.ctx->state);
-       if (ret < 0)
-               rpc_exit(task, ret);
+       nfs41_setup_sequence(session, &lgp->args.seq_args,
+                               &lgp->res.seq_res, task);
+       dprintk("<-- %s\n", __func__);
  }
  
  static void nfs4_layoutget_done(struct rpc_task *task, void *calldata)
  {
        struct nfs4_layoutget *lgp = calldata;
+       dprintk("--> %s\n", __func__);
+       nfs41_sequence_done(task, &lgp->res.seq_res);
+       dprintk("<-- %s\n", __func__);
+ }
+ 
+ static int
+ nfs4_layoutget_handle_exception(struct rpc_task *task,
+               struct nfs4_layoutget *lgp, struct nfs4_exception *exception)
+ {
        struct inode *inode = lgp->args.inode;
        struct nfs_server *server = NFS_SERVER(inode);
        struct pnfs_layout_hdr *lo;
-       struct nfs4_state *state = NULL;
-       unsigned long timeo, now, giveup;
+       int status = task->tk_status;
  
        dprintk("--> %s tk_status => %d\n", __func__, -task->tk_status);
  
-       if (!nfs41_sequence_done(task, &lgp->res.seq_res))
-               goto out;
-       switch (task->tk_status) {
+       switch (status) {
        case 0:
                goto out;
  
         * retry go inband.
         */
        case -NFS4ERR_LAYOUTUNAVAILABLE:
-               task->tk_status = -ENODATA;
+               status = -ENODATA;
                goto out;
        /*
         * NFS4ERR_BADLAYOUT means the MDS cannot return a layout of
         * length lgp->args.minlength != 0 (see RFC5661 section 18.43.3).
         */
        case -NFS4ERR_BADLAYOUT:
-               goto out_overflow;
+               status = -EOVERFLOW;
+               goto out;
        /*
         * NFS4ERR_LAYOUTTRYLATER is a conflict with another client
         * (or clients) writing to the same RAID stripe except when
         * the minlength argument is 0 (see RFC5661 section 18.43.3).
+        *
+        * Treat it like we would RECALLCONFLICT -- we retry for a little
+        * while, and then eventually give up.
         */
        case -NFS4ERR_LAYOUTTRYLATER:
-               if (lgp->args.minlength == 0)
-                       goto out_overflow;
-       /*
-        * NFS4ERR_RECALLCONFLICT is when conflict with self (must recall
-        * existing layout before getting a new one).
-        */
-       case -NFS4ERR_RECALLCONFLICT:
-               timeo = rpc_get_timeout(task->tk_client);
-               giveup = lgp->args.timestamp + timeo;
-               now = jiffies;
-               if (time_after(giveup, now)) {
-                       unsigned long delay;
-                       /* Delay for:
-                        * - Not less then NFS4_POLL_RETRY_MIN.
-                        * - One last time a jiffie before we give up
-                        * - exponential backoff (time_now minus start_attempt)
-                        */
-                       delay = max_t(unsigned long, NFS4_POLL_RETRY_MIN,
-                                   min((giveup - now - 1),
-                                       now - lgp->args.timestamp));
-                       dprintk("%s: NFS4ERR_RECALLCONFLICT waiting %lu\n",
-                               __func__, delay);
-                       rpc_delay(task, delay);
-                       /* Do not call nfs4_async_handle_error() */
-                       goto out_restart;
+               if (lgp->args.minlength == 0) {
+                       status = -EOVERFLOW;
+                       goto out;
                }
-               break;
+               /* Fallthrough */
+       case -NFS4ERR_RECALLCONFLICT:
+               nfs4_handle_exception(server, -NFS4ERR_RECALLCONFLICT,
+                                       exception);
+               status = -ERECALLCONFLICT;
+               goto out;
        case -NFS4ERR_EXPIRED:
        case -NFS4ERR_BAD_STATEID:
+               exception->timeout = 0;
                spin_lock(&inode->i_lock);
                if (nfs4_stateid_match(&lgp->args.stateid,
                                        &lgp->args.ctx->state->stateid)) {
                        spin_unlock(&inode->i_lock);
                        /* If the open stateid was bad, then recover it. */
-                       state = lgp->args.ctx->state;
+                       exception->state = lgp->args.ctx->state;
                        break;
                }
                lo = NFS_I(inode)->layout;
                         * with the current stateid.
                         */
                        set_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags);
-                       pnfs_mark_matching_lsegs_invalid(lo, &head, NULL);
+                       pnfs_mark_matching_lsegs_invalid(lo, &head, NULL, 0);
                        spin_unlock(&inode->i_lock);
                        pnfs_free_lseg_list(&head);
                } else
                        spin_unlock(&inode->i_lock);
-               goto out_restart;
+               status = -EAGAIN;
+               goto out;
        }
-       if (nfs4_async_handle_error(task, server, state, &lgp->timeout) == -EAGAIN)
-               goto out_restart;
+       status = nfs4_handle_exception(server, status, exception);
+       if (exception->retry)
+               status = -EAGAIN;
  out:
        dprintk("<-- %s\n", __func__);
-       return;
- out_restart:
-       task->tk_status = 0;
-       rpc_restart_call_prepare(task);
-       return;
- out_overflow:
-       task->tk_status = -EOVERFLOW;
-       goto out;
+       return status;
  }
  
  static size_t max_response_pages(struct nfs_server *server)
@@@ -8013,7 -8017,7 +8017,7 @@@ static const struct rpc_call_ops nfs4_l
  };
  
  struct pnfs_layout_segment *
- nfs4_proc_layoutget(struct nfs4_layoutget *lgp, gfp_t gfp_flags)
+ nfs4_proc_layoutget(struct nfs4_layoutget *lgp, long *timeout, gfp_t gfp_flags)
  {
        struct inode *inode = lgp->args.inode;
        struct nfs_server *server = NFS_SERVER(inode);
                .flags = RPC_TASK_ASYNC,
        };
        struct pnfs_layout_segment *lseg = NULL;
+       struct nfs4_exception exception = { .timeout = *timeout };
        int status = 0;
  
        dprintk("--> %s\n", __func__);
                return ERR_PTR(-ENOMEM);
        }
        lgp->args.layout.pglen = max_pages * PAGE_SIZE;
-       lgp->args.timestamp = jiffies;
  
        lgp->res.layoutp = &lgp->args.layout;
        lgp->res.seq_res.sr_slot = NULL;
        if (IS_ERR(task))
                return ERR_CAST(task);
        status = nfs4_wait_for_completion_rpc_task(task);
-       if (status == 0)
-               status = task->tk_status;
+       if (status == 0) {
+               status = nfs4_layoutget_handle_exception(task, lgp, &exception);
+               *timeout = exception.timeout;
+       }
        trace_nfs4_layoutget(lgp->args.ctx,
                        &lgp->args.range,
                        &lgp->res.range,
                        &lgp->res.stateid,
                        status);
        /* if layoutp->len is 0, nfs4_layoutget_prepare called rpc_exit */
        if (status == 0 && lgp->res.layoutp->len)
                lseg = pnfs_layout_process(lgp);
@@@ -8118,7 -8126,8 +8126,8 @@@ static void nfs4_layoutreturn_release(v
  
        dprintk("--> %s\n", __func__);
        spin_lock(&lo->plh_inode->i_lock);
-       pnfs_mark_matching_lsegs_invalid(lo, &freeme, &lrp->args.range);
+       pnfs_mark_matching_lsegs_invalid(lo, &freeme, &lrp->args.range,
+                       be32_to_cpu(lrp->args.stateid.seqid));
        pnfs_mark_layout_returned_if_empty(lo);
        if (lrp->res.lrs_present)
                pnfs_set_layout_stateid(lo, &lrp->res.stateid, true);
@@@ -8653,6 -8662,9 +8662,9 @@@ nfs41_free_lock_state(struct nfs_serve
  static bool nfs41_match_stateid(const nfs4_stateid *s1,
                const nfs4_stateid *s2)
  {
+       if (s1->type != s2->type)
+               return false;
        if (memcmp(s1->other, s2->other, sizeof(s1->other)) != 0)
                return false;
  
@@@ -8793,6 -8805,7 +8805,7 @@@ static const struct nfs4_minor_version_
                | NFS_CAP_STATEID_NFSV41
                | NFS_CAP_ATOMIC_OPEN_V1
                | NFS_CAP_ALLOCATE
+               | NFS_CAP_COPY
                | NFS_CAP_DEALLOCATE
                | NFS_CAP_SEEK
                | NFS_CAP_LAYOUTSTATS
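
One of the bugfix highlights ("Don't send empty SETATTR after
OPEN_CREATE") reduces to the NFS4_VALID_ATTRS mask introduced earlier in
this file: the post-create SETATTR is issued only when the iattr carries
at least one flag that maps onto an NFSv4 attribute, so a bare ATTR_FORCE
no longer costs an empty round trip. A minimal sketch of the predicate
(the helper name is hypothetical):

/* Illustrative helper -- restates the check added in _nfs4_do_open(). */
static bool example_needs_post_create_setattr(const struct iattr *sattr)
{
	return (sattr->ia_valid & NFS4_VALID_ATTRS) != 0;
}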
diff --combined include/linux/nfs_xdr.h
@@@ -233,7 -233,6 +233,6 @@@ struct nfs4_layoutget_args 
        struct inode *inode;
        struct nfs_open_context *ctx;
        nfs4_stateid stateid;
-       unsigned long timestamp;
        struct nfs4_layoutdriver_data layout;
  };
  
@@@ -251,7 -250,6 +250,6 @@@ struct nfs4_layoutget 
        struct nfs4_layoutget_res res;
        struct rpc_cred *cred;
        gfp_t gfp_flags;
-       long timeout;
  };
  
  struct nfs4_getdeviceinfo_args {
@@@ -1343,6 -1341,32 +1341,32 @@@ struct nfs42_falloc_res 
        const struct nfs_server         *falloc_server;
  };
  
+ struct nfs42_copy_args {
+       struct nfs4_sequence_args       seq_args;
+       struct nfs_fh                   *src_fh;
+       nfs4_stateid                    src_stateid;
+       u64                             src_pos;
+       struct nfs_fh                   *dst_fh;
+       nfs4_stateid                    dst_stateid;
+       u64                             dst_pos;
+       u64                             count;
+ };
+ 
+ struct nfs42_write_res {
+       u64                     count;
+       struct nfs_writeverf    verifier;
+ };
+ 
+ struct nfs42_copy_res {
+       struct nfs4_sequence_res        seq_res;
+       struct nfs42_write_res          write_res;
+       bool                            consecutive;
+       bool                            synchronous;
+ };
+ 
  struct nfs42_seek_args {
        struct nfs4_sequence_args       seq_args;
  
@@@ -1431,7 -1455,7 +1455,7 @@@ struct nfs_commit_completion_ops 
  };
  
  struct nfs_commit_info {
-       spinlock_t                      *lock;  /* inode->i_lock */
+       struct inode                    *inode; /* Needed for inode->i_lock */
        struct nfs_mds_commit_info      *mds;
        struct pnfs_ds_commit_info      *ds;
        struct nfs_direct_req           *dreq;  /* O_DIRECT request */
@@@ -1468,10 -1492,10 +1492,10 @@@ struct nfs_pgio_completion_ops 
  };
  
  struct nfs_unlinkdata {
 -      struct hlist_node list;
        struct nfs_removeargs args;
        struct nfs_removeres res;
 -      struct inode *dir;
 +      struct dentry *dentry;
 +      wait_queue_head_t wq;
        struct rpc_cred *cred;
        struct nfs_fattr dir_attr;
        long timeout;
diff --combined net/sunrpc/xprtrdma/frwr_ops.c
@@@ -98,6 -98,47 +98,47 @@@ frwr_destroy_recovery_wq(void
        destroy_workqueue(wq);
  }
  
+ static int
+ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
+ {
+       struct rpcrdma_frmr *f = &r->frmr;
+       int rc;
+       rc = ib_dereg_mr(f->fr_mr);
+       if (rc) {
+               pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n",
+                       rc, r);
+               return rc;
+       }
+       f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG,
+                              ia->ri_max_frmr_depth);
+       if (IS_ERR(f->fr_mr)) {
+               pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n",
+                       PTR_ERR(f->fr_mr), r);
+               return PTR_ERR(f->fr_mr);
+       }
+       dprintk("RPC:       %s: recovered FRMR %p\n", __func__, r);
+       f->fr_state = FRMR_IS_INVALID;
+       return 0;
+ }
+ 
+ static void
+ __frwr_reset_and_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
+ {
+       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+       struct rpcrdma_frmr *f = &mw->frmr;
+       int rc;
+       rc = __frwr_reset_mr(ia, mw);
+       ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, f->fr_dir);
+       if (rc)
+               return;
+       rpcrdma_put_mw(r_xprt, mw);
+ }
+ 
  /* Deferred reset of a single FRMR. Generate a fresh rkey by
   * replacing the MR.
   *
@@@ -109,26 -150,10 +150,10 @@@ static voi
  __frwr_recovery_worker(struct work_struct *work)
  {
        struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
-                                           frmr.fr_work);
-       struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt;
-       unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
-       struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
-       if (ib_dereg_mr(r->frmr.fr_mr))
-               goto out_fail;
+                                           mw_work);
  
-       r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
-       if (IS_ERR(r->frmr.fr_mr))
-               goto out_fail;
-       dprintk("RPC:       %s: recovered FRMR %p\n", __func__, r);
-       r->frmr.fr_state = FRMR_IS_INVALID;
-       rpcrdma_put_mw(r_xprt, r);
+       __frwr_reset_and_unmap(r->mw_xprt, r);
        return;
- out_fail:
-       pr_warn("RPC:       %s: FRMR %p unrecovered\n",
-               __func__, r);
  }
  
  /* A broken MR was discovered in a context that can't sleep.
   * Defer recovery to the recovery worker.
   */
  static void
  __frwr_queue_recovery(struct rpcrdma_mw *r)
  {
-       INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker);
-       queue_work(frwr_recovery_wq, &r->frmr.fr_work);
+       INIT_WORK(&r->mw_work, __frwr_recovery_worker);
+       queue_work(frwr_recovery_wq, &r->mw_work);
  }
  
  static int
@@@ -152,11 -177,11 +177,11 @@@ __frwr_init(struct rpcrdma_mw *r, struc
        if (IS_ERR(f->fr_mr))
                goto out_mr_err;
  
-       f->sg = kcalloc(depth, sizeof(*f->sg), GFP_KERNEL);
-       if (!f->sg)
+       f->fr_sg = kcalloc(depth, sizeof(*f->fr_sg), GFP_KERNEL);
+       if (!f->fr_sg)
                goto out_list_err;
  
-       sg_init_table(f->sg, depth);
+       sg_init_table(f->fr_sg, depth);
  
        init_completion(&f->fr_linv_done);
  
@@@ -185,7 -210,7 +210,7 @@@ __frwr_release(struct rpcrdma_mw *r
        if (rc)
                dprintk("RPC:       %s: ib_dereg_mr status %i\n",
                        __func__, rc);
-       kfree(r->frmr.sg);
+       kfree(r->frmr.fr_sg);
  }
  
  static int
@@@ -231,6 -256,9 +256,9 @@@ frwr_op_open(struct rpcrdma_ia *ia, str
                                               depth;
        }
  
+       rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
+                                                     RPCRDMA_MAX_DATA_SEGS /
+                                                     ia->ri_max_frmr_depth));
        return 0;
  }
  
@@@ -243,7 -271,7 +271,7 @@@ frwr_op_maxpages(struct rpcrdma_xprt *r
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
  
        return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
-                    rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth);
+                    RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frmr_depth);
  }
  
  static void
@@@ -350,9 -378,9 +378,9 @@@ frwr_op_init(struct rpcrdma_xprt *r_xpr
                        return rc;
                }
  
+               r->mw_xprt = r_xprt;
                list_add(&r->mw_list, &buf->rb_mws);
                list_add(&r->mw_all, &buf->rb_all);
-               r->frmr.fr_xprt = r_xprt;
        }
  
        return 0;
@@@ -396,12 -424,12 +424,12 @@@ frwr_op_map(struct rpcrdma_xprt *r_xprt
  
        for (i = 0; i < nsegs;) {
                if (seg->mr_page)
-                       sg_set_page(&frmr->sg[i],
+                       sg_set_page(&frmr->fr_sg[i],
                                    seg->mr_page,
                                    seg->mr_len,
                                    offset_in_page(seg->mr_offset));
                else
-                       sg_set_buf(&frmr->sg[i], seg->mr_offset,
+                       sg_set_buf(&frmr->fr_sg[i], seg->mr_offset,
                                   seg->mr_len);
  
                ++seg;
                    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
                        break;
        }
-       frmr->sg_nents = i;
+       frmr->fr_nents = i;
+       frmr->fr_dir = direction;
  
-       dma_nents = ib_dma_map_sg(device, frmr->sg, frmr->sg_nents, direction);
+       dma_nents = ib_dma_map_sg(device, frmr->fr_sg, frmr->fr_nents, direction);
        if (!dma_nents) {
                pr_err("RPC:       %s: failed to dma map sg %p sg_nents %u\n",
-                      __func__, frmr->sg, frmr->sg_nents);
+                      __func__, frmr->fr_sg, frmr->fr_nents);
                return -ENOMEM;
        }
  
-       n = ib_map_mr_sg(mr, frmr->sg, frmr->sg_nents, NULL, PAGE_SIZE);
-       if (unlikely(n != frmr->sg_nents)) {
 -      n = ib_map_mr_sg(mr, frmr->fr_sg, frmr->fr_nents, PAGE_SIZE);
++      n = ib_map_mr_sg(mr, frmr->fr_sg, frmr->fr_nents, NULL, PAGE_SIZE);
+       if (unlikely(n != frmr->fr_nents)) {
                pr_err("RPC:       %s: failed to map mr %p (%u/%u)\n",
-                      __func__, frmr->fr_mr, n, frmr->sg_nents);
+                      __func__, frmr->fr_mr, n, frmr->fr_nents);
                rc = n < 0 ? n : -EINVAL;
                goto out_senderr;
        }
  
        dprintk("RPC:       %s: Using frmr %p to map %u segments (%u bytes)\n",
-               __func__, mw, frmr->sg_nents, mr->length);
+               __func__, mw, frmr->fr_nents, mr->length);
  
        key = (u8)(mr->rkey & 0x000000FF);
        ib_update_fast_reg_key(mr, ++key);
        if (rc)
                goto out_senderr;
  
-       seg1->mr_dir = direction;
        seg1->rl_mw = mw;
        seg1->mr_rkey = mr->rkey;
        seg1->mr_base = mr->iova;
-       seg1->mr_nsegs = frmr->sg_nents;
+       seg1->mr_nsegs = frmr->fr_nents;
        seg1->mr_len = mr->length;
  
-       return frmr->sg_nents;
+       return frmr->fr_nents;
  
  out_senderr:
        dprintk("RPC:       %s: ib_post_send status %i\n", __func__, rc);
-       ib_dma_unmap_sg(device, frmr->sg, dma_nents, direction);
        __frwr_queue_recovery(mw);
        return rc;
  }
@@@ -487,24 -514,6 +514,6 @@@ __frwr_prepare_linv_wr(struct rpcrdma_m
        return invalidate_wr;
  }
  
- static void
- __frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
-                int rc)
- {
-       struct ib_device *device = r_xprt->rx_ia.ri_device;
-       struct rpcrdma_mw *mw = seg->rl_mw;
-       struct rpcrdma_frmr *f = &mw->frmr;
-       seg->rl_mw = NULL;
-       ib_dma_unmap_sg(device, f->sg, f->sg_nents, seg->mr_dir);
-       if (!rc)
-               rpcrdma_put_mw(r_xprt, mw);
-       else
-               __frwr_queue_recovery(mw);
- }
  /* Invalidate all memory regions that were registered for "req".
   *
   * Sleeps until it is safe for the host CPU to access the
@@@ -518,6 -527,7 +527,7 @@@ frwr_op_unmap_sync(struct rpcrdma_xprt 
        struct rpcrdma_mr_seg *seg;
        unsigned int i, nchunks;
        struct rpcrdma_frmr *f;
+       struct rpcrdma_mw *mw;
        int rc;
  
        dprintk("RPC:       %s: req %p\n", __func__, req);
         * unless ri_id->qp is a valid pointer.
         */
        rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
-       if (rc) {
-               pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
-               rdma_disconnect(ia->ri_id);
-               goto unmap;
-       }
+       if (rc)
+               goto reset_mrs;
  
        wait_for_completion(&f->fr_linv_done);
  
  unmap:
        for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
                seg = &req->rl_segments[i];
+               mw = seg->rl_mw;
+               seg->rl_mw = NULL;
  
-               __frwr_dma_unmap(r_xprt, seg, rc);
+               ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents,
+                               f->fr_dir);
+               rpcrdma_put_mw(r_xprt, mw);
  
                i += seg->mr_nsegs;
                seg->mr_nsegs = 0;
        }
  
        req->rl_nchunks = 0;
- }
+       return;
  
- /* Post a LOCAL_INV Work Request to prevent further remote access
-  * via RDMA READ or RDMA WRITE.
-  */
- static int
- frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
- {
-       struct rpcrdma_mr_seg *seg1 = seg;
-       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-       struct rpcrdma_mw *mw = seg1->rl_mw;
-       struct rpcrdma_frmr *frmr = &mw->frmr;
-       struct ib_send_wr *invalidate_wr, *bad_wr;
-       int rc, nsegs = seg->mr_nsegs;
+ reset_mrs:
+       pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
  
-       dprintk("RPC:       %s: FRMR %p\n", __func__, mw);
+       /* Find and reset the MRs in the LOCAL_INV WRs that did not
+        * get posted. This is synchronous, and slow.
+        */
+       for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
+               seg = &req->rl_segments[i];
+               mw = seg->rl_mw;
+               f = &mw->frmr;
  
-       seg1->rl_mw = NULL;
-       frmr->fr_state = FRMR_IS_INVALID;
-       invalidate_wr = &mw->frmr.fr_invwr;
+               if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) {
+                       __frwr_reset_mr(ia, mw);
+                       bad_wr = bad_wr->next;
+               }
  
-       memset(invalidate_wr, 0, sizeof(*invalidate_wr));
-       frmr->fr_cqe.done = frwr_wc_localinv;
-       invalidate_wr->wr_cqe = &frmr->fr_cqe;
-       invalidate_wr->opcode = IB_WR_LOCAL_INV;
-       invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey;
-       DECR_CQCOUNT(&r_xprt->rx_ep);
+               i += seg->mr_nsegs;
+       }
+       goto unmap;
+ }
  
-       ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir);
-       read_lock(&ia->ri_qplock);
-       rc = ib_post_send(ia->ri_id->qp, invalidate_wr, &bad_wr);
-       read_unlock(&ia->ri_qplock);
-       if (rc)
-               goto out_err;
+ /* Use a slow, safe mechanism to invalidate all memory regions
+  * that were registered for "req".
+  */
+ static void
+ frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+                  bool sync)
+ {
+       struct rpcrdma_mr_seg *seg;
+       struct rpcrdma_mw *mw;
+       unsigned int i;
  
-       rpcrdma_put_mw(r_xprt, mw);
-       return nsegs;
+       for (i = 0; req->rl_nchunks; req->rl_nchunks--) {
+               seg = &req->rl_segments[i];
+               mw = seg->rl_mw;
  
- out_err:
-       dprintk("RPC:       %s: ib_post_send status %i\n", __func__, rc);
-       __frwr_queue_recovery(mw);
-       return nsegs;
+               if (sync)
+                       __frwr_reset_and_unmap(r_xprt, mw);
+               else
+                       __frwr_queue_recovery(mw);
+               i += seg->mr_nsegs;
+               seg->mr_nsegs = 0;
+               seg->rl_mw = NULL;
+       }
  }
  
  static void
@@@ -643,7 -659,7 +659,7 @@@ frwr_op_destroy(struct rpcrdma_buffer *
  const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
        .ro_map                         = frwr_op_map,
        .ro_unmap_sync                  = frwr_op_unmap_sync,
-       .ro_unmap                       = frwr_op_unmap,
+       .ro_unmap_safe                  = frwr_op_unmap_safe,
        .ro_open                        = frwr_op_open,
        .ro_maxpages                    = frwr_op_maxpages,
        .ro_init                        = frwr_op_init,
diff --combined net/sunrpc/xprtsock.c
@@@ -995,14 -995,15 +995,14 @@@ static void xs_udp_data_read_skb(struc
        u32 _xid;
        __be32 *xp;
  
 -      repsize = skb->len - sizeof(struct udphdr);
 +      repsize = skb->len;
        if (repsize < 4) {
                dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
                return;
        }
  
        /* Copy the XID from the skb... */
 -      xp = skb_header_pointer(skb, sizeof(struct udphdr),
 -                              sizeof(_xid), &_xid);
 +      xp = skb_header_pointer(skb, 0, sizeof(_xid), &_xid);
        if (xp == NULL)
                return;
  
  
        /* Suck it into the iovec, verify checksum if not done by hw. */
        if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
 -              UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
 +              __UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
                goto out_unlock;
        }
  
 -      UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
 +      __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
  
        xprt_adjust_cwnd(xprt, task, copied);
        xprt_complete_rqst(task, copied);
@@@ -1364,6 -1365,11 +1364,11 @@@ static int xs_tcp_bc_up(struct svc_ser
                return ret;
        return 0;
  }
+ 
+ static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt)
+ {
+       return PAGE_SIZE;
+ }
  #else
  static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
                                        struct xdr_skb_reader *desc)
@@@ -1880,7 -1886,8 +1885,7 @@@ static inline void xs_reclassify_socket
  
  static inline void xs_reclassify_socket(int family, struct socket *sock)
  {
 -      WARN_ON_ONCE(sock_owned_by_user(sock->sk));
 -      if (sock_owned_by_user(sock->sk))
 +      if (WARN_ON_ONCE(!sock_allow_reclassification(sock->sk)))
                return;
  
        switch (family) {
@@@ -1950,7 -1957,6 +1955,7 @@@ static int xs_local_finish_connecting(s
                sk->sk_user_data = xprt;
                sk->sk_data_ready = xs_data_ready;
                sk->sk_write_space = xs_udp_write_space;
 +              sock_set_flag(sk, SOCK_FASYNC);
                sk->sk_error_report = xs_error_report;
                sk->sk_allocation = GFP_NOIO;
  
@@@ -2137,7 -2143,6 +2142,7 @@@ static void xs_udp_finish_connecting(st
                sk->sk_user_data = xprt;
                sk->sk_data_ready = xs_data_ready;
                sk->sk_write_space = xs_udp_write_space;
 +              sock_set_flag(sk, SOCK_FASYNC);
                sk->sk_allocation = GFP_NOIO;
  
                xprt_set_connected(xprt);
@@@ -2239,7 -2244,6 +2244,7 @@@ static int xs_tcp_finish_connecting(str
                sk->sk_data_ready = xs_tcp_data_ready;
                sk->sk_state_change = xs_tcp_state_change;
                sk->sk_write_space = xs_tcp_write_space;
 +              sock_set_flag(sk, SOCK_FASYNC);
                sk->sk_error_report = xs_error_report;
                sk->sk_allocation = GFP_NOIO;
  
@@@ -2661,6 -2665,7 +2666,7 @@@ static struct rpc_xprt_ops xs_tcp_ops 
  #ifdef CONFIG_SUNRPC_BACKCHANNEL
        .bc_setup               = xprt_setup_bc,
        .bc_up                  = xs_tcp_bc_up,
+       .bc_maxpayload          = xs_tcp_bc_maxpayload,
        .bc_free_rqst           = xprt_free_bc_rqst,
        .bc_destroy             = xprt_destroy_bc,
  #endif