Merge tag 'nfsd-4.9' of git://linux-nfs.org/~bfields/linux
author Linus Torvalds <torvalds@linux-foundation.org>
Fri, 14 Oct 2016 04:04:42 +0000 (21:04 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
Fri, 14 Oct 2016 04:04:42 +0000 (21:04 -0700)
Pull nfsd updates from Bruce Fields:
 "Some RDMA work and some good bugfixes, and two new features that could
  benefit from user testing:

   - Anna Schumaker contributed a simple NFSv4.2 COPY implementation.
     COPY is already supported on the client side, so a call to
     copy_file_range() on a recent client should now result in a
     server-side copy that doesn't require all the data to make a round
     trip to the client and back.

   - Jeff Layton implemented callbacks to notify clients when contended
     locks become available, which should reduce latency on workloads
     with contended locks"
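
As a rough illustration of the COPY item above (not taken from this series;
the file names and mount point are made up), a plain copy_file_range() loop
is all a userspace program needs on an NFSv4.2 mount -- the client is then
free to turn it into a single server-side COPY:

    /* Hedged sketch: assumes Linux 4.5+ for copy_file_range(2) and a libc
     * that exposes the wrapper (older libcs need syscall(__NR_copy_file_range, ...)).
     */
    #define _GNU_SOURCE
    #include <fcntl.h>
    #include <stdio.h>
    #include <unistd.h>

    int main(void)
    {
            int in = open("/mnt/nfs/src.img", O_RDONLY);
            int out = open("/mnt/nfs/dst.img", O_WRONLY | O_CREAT | O_TRUNC, 0644);
            loff_t off_in = 0, off_out = 0;
            off_t len;

            if (in < 0 || out < 0) {
                    perror("open");
                    return 1;
            }
            len = lseek(in, 0, SEEK_END);

            while (len > 0) {
                    /* On an NFSv4.2 mount the client may issue COPY here, so the
                     * payload never has to cross the wire to the client and back. */
                    ssize_t n = copy_file_range(in, &off_in, out, &off_out, len, 0);
                    if (n <= 0) {
                            perror("copy_file_range");
                            return 1;
                    }
                    len -= n;
            }
            return 0;
    }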

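Likewise for the lock-notification item (again only a sketch, with an invented
path): the workload that benefits is an ordinary blocking POSIX lock over
NFSv4.1+.  Per the shortlog below, the server can now set
NFS4_OPEN_RESULT_MAY_NOTIFY_LOCK in the OPEN reply and send CB_NOTIFY_LOCK
when the conflicting lock goes away, so the fcntl() below should return
sooner than a purely polling client would manage:

    #include <fcntl.h>
    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>

    int main(void)
    {
            struct flock fl;
            int fd = open("/mnt/nfs/shared.lock", O_RDWR);

            if (fd < 0) {
                    perror("open");
                    return 1;
            }
            memset(&fl, 0, sizeof(fl));
            fl.l_type = F_WRLCK;            /* whole-file write lock */
            fl.l_whence = SEEK_SET;

            /* F_SETLKW blocks until the lock can be granted; clients must still
             * poll (RFC 5661, sec. 9.6), but a CB_NOTIFY_LOCK callback lets the
             * client retry right away instead of waiting out its backoff timer. */
            if (fcntl(fd, F_SETLKW, &fl) == -1) {
                    perror("fcntl(F_SETLKW)");
                    return 1;
            }

            /* ... do work under the lock ... */

            fl.l_type = F_UNLCK;
            fcntl(fd, F_SETLK, &fl);
            close(fd);
            return 0;
    }
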
* tag 'nfsd-4.9' of git://linux-nfs.org/~bfields/linux:
  NFSD: Implement the COPY call
  nfsd: handle EUCLEAN
  nfsd: only WARN once on unmapped errors
  exportfs: be careful to only return expected errors.
  nfsd4: setclientid_confirm with unmatched verifier should fail
  nfsd: randomize SETCLIENTID reply to help distinguish servers
  nfsd: set the MAY_NOTIFY_LOCK flag in OPEN replies
  nfs: add a new NFS4_OPEN_RESULT_MAY_NOTIFY_LOCK constant
  nfsd: add a LRU list for blocked locks
  nfsd: have nfsd4_lock use blocking locks for v4.1+ locks
  nfsd: plumb in a CB_NOTIFY_LOCK operation
  NFSD: fix corruption in notifier registration
  svcrdma: support Remote Invalidation
  svcrdma: Server-side support for rpcrdma_connect_private
  rpcrdma: RDMA/CM private message data structure
  svcrdma: Skip put_page() when send_reply() fails
  svcrdma: Tail iovec leaves an orphaned DMA mapping
  nfsd: fix dprintk in nfsd4_encode_getdeviceinfo
  nfsd: eliminate cb_minorversion field
  nfsd: don't set a FL_LAYOUT lease for flexfiles layouts

fs/nfsd/nfs4state.c
fs/nfsd/nfsproc.c
net/sunrpc/xprtrdma/svc_rdma_transport.c

diff --combined fs/nfsd/nfs4state.c
@@@ -99,6 -99,7 +99,7 @@@ static struct kmem_cache *odstate_slab
  static void free_session(struct nfsd4_session *);
  
  static const struct nfsd4_callback_ops nfsd4_cb_recall_ops;
+ static const struct nfsd4_callback_ops nfsd4_cb_notify_lock_ops;
  
  static bool is_session_dead(struct nfsd4_session *ses)
  {
@@@ -210,6 -211,85 +211,85 @@@ static void nfsd4_put_session(struct nf
        spin_unlock(&nn->client_lock);
  }
  
+ static struct nfsd4_blocked_lock *
+ find_blocked_lock(struct nfs4_lockowner *lo, struct knfsd_fh *fh,
+                       struct nfsd_net *nn)
+ {
+       struct nfsd4_blocked_lock *cur, *found = NULL;
+       spin_lock(&nn->client_lock);
+       list_for_each_entry(cur, &lo->lo_blocked, nbl_list) {
+               if (fh_match(fh, &cur->nbl_fh)) {
+                       list_del_init(&cur->nbl_list);
+                       list_del_init(&cur->nbl_lru);
+                       found = cur;
+                       break;
+               }
+       }
+       spin_unlock(&nn->client_lock);
+       if (found)
+               posix_unblock_lock(&found->nbl_lock);
+       return found;
+ }
+
+ static struct nfsd4_blocked_lock *
+ find_or_allocate_block(struct nfs4_lockowner *lo, struct knfsd_fh *fh,
+                       struct nfsd_net *nn)
+ {
+       struct nfsd4_blocked_lock *nbl;
+       nbl = find_blocked_lock(lo, fh, nn);
+       if (!nbl) {
+               nbl = kmalloc(sizeof(*nbl), GFP_KERNEL);
+               if (nbl) {
+                       fh_copy_shallow(&nbl->nbl_fh, fh);
+                       locks_init_lock(&nbl->nbl_lock);
+                       nfsd4_init_cb(&nbl->nbl_cb, lo->lo_owner.so_client,
+                                       &nfsd4_cb_notify_lock_ops,
+                                       NFSPROC4_CLNT_CB_NOTIFY_LOCK);
+               }
+       }
+       return nbl;
+ }
+
+ static void
+ free_blocked_lock(struct nfsd4_blocked_lock *nbl)
+ {
+       locks_release_private(&nbl->nbl_lock);
+       kfree(nbl);
+ }
+
+ static int
+ nfsd4_cb_notify_lock_done(struct nfsd4_callback *cb, struct rpc_task *task)
+ {
+       /*
+        * Since this is just an optimization, we don't try very hard if it
+        * turns out not to succeed. We'll requeue it on NFS4ERR_DELAY, and
+        * just quit trying on anything else.
+        */
+       switch (task->tk_status) {
+       case -NFS4ERR_DELAY:
+               rpc_delay(task, 1 * HZ);
+               return 0;
+       default:
+               return 1;
+       }
+ }
+
+ static void
+ nfsd4_cb_notify_lock_release(struct nfsd4_callback *cb)
+ {
+       struct nfsd4_blocked_lock       *nbl = container_of(cb,
+                                               struct nfsd4_blocked_lock, nbl_cb);
+       free_blocked_lock(nbl);
+ }
+
+ static const struct nfsd4_callback_ops nfsd4_cb_notify_lock_ops = {
+       .done           = nfsd4_cb_notify_lock_done,
+       .release        = nfsd4_cb_notify_lock_release,
+ };
+
  static inline struct nfs4_stateowner *
  nfs4_get_stateowner(struct nfs4_stateowner *sop)
  {
@@@ -1903,7 -1983,7 +1983,7 @@@ static bool groups_equal(struct group_i
        if (g1->ngroups != g2->ngroups)
                return false;
        for (i=0; i<g1->ngroups; i++)
 -              if (!gid_eq(GROUP_AT(g1, i), GROUP_AT(g2, i)))
 +              if (!gid_eq(g1->gid[i], g2->gid[i]))
                        return false;
        return true;
  }
@@@ -3224,9 -3304,10 +3304,10 @@@ nfsd4_setclientid_confirm(struct svc_rq
                goto out;
        /* cases below refer to rfc 3530 section 14.2.34: */
        if (!unconf || !same_verf(&confirm, &unconf->cl_confirm)) {
-               if (conf && !unconf) /* case 2: probable retransmit */
+               if (conf && same_verf(&confirm, &conf->cl_confirm)) {
+                       /* case 2: probable retransmit */
                        status = nfs_ok;
-               else /* case 4: client hasn't noticed we rebooted yet? */
+               } else /* case 4: client hasn't noticed we rebooted yet? */
                        status = nfserr_stale_clientid;
                goto out;
        }
        * To finish the open response, we just need to set the rflags.
        */
        open->op_rflags = NFS4_OPEN_RESULT_LOCKTYPE_POSIX;
-       if (!(open->op_openowner->oo_flags & NFS4_OO_CONFIRMED) &&
-           !nfsd4_has_session(&resp->cstate))
+       if (nfsd4_has_session(&resp->cstate))
+               open->op_rflags |= NFS4_OPEN_RESULT_MAY_NOTIFY_LOCK;
+       else if (!(open->op_openowner->oo_flags & NFS4_OO_CONFIRMED))
                open->op_rflags |= NFS4_OPEN_RESULT_CONFIRM;
        if (dp)
                nfs4_put_stid(&dp->dl_stid);
        if (stp)
@@@ -4501,6 -4584,7 +4584,7 @@@ nfs4_laundromat(struct nfsd_net *nn
        struct nfs4_openowner *oo;
        struct nfs4_delegation *dp;
        struct nfs4_ol_stateid *stp;
+       struct nfsd4_blocked_lock *nbl;
        struct list_head *pos, *next, reaplist;
        time_t cutoff = get_seconds() - nn->nfsd4_lease;
        time_t t, new_timeo = nn->nfsd4_lease;
        }
        spin_unlock(&nn->client_lock);
  
+       /*
+        * It's possible for a client to try and acquire an already held lock
+        * that is being held for a long time, and then lose interest in it.
+        * So, we clean out any un-revisited request after a lease period
+        * under the assumption that the client is no longer interested.
+        *
+        * RFC5661, sec. 9.6 states that the client must not rely on getting
+        * notifications and must continue to poll for locks, even when the
+        * server supports them. Thus this shouldn't lead to clients blocking
+        * indefinitely once the lock does become free.
+        */
+       BUG_ON(!list_empty(&reaplist));
+       spin_lock(&nn->client_lock);
+       while (!list_empty(&nn->blocked_locks_lru)) {
+               nbl = list_first_entry(&nn->blocked_locks_lru,
+                                       struct nfsd4_blocked_lock, nbl_lru);
+               if (time_after((unsigned long)nbl->nbl_time,
+                              (unsigned long)cutoff)) {
+                       t = nbl->nbl_time - cutoff;
+                       new_timeo = min(new_timeo, t);
+                       break;
+               }
+               list_move(&nbl->nbl_lru, &reaplist);
+               list_del_init(&nbl->nbl_list);
+       }
+       spin_unlock(&nn->client_lock);
+       while (!list_empty(&reaplist)) {
+               nbl = list_first_entry(&reaplist,
+                                       struct nfsd4_blocked_lock, nbl_lru);
+               list_del_init(&nbl->nbl_lru);
+               posix_unblock_lock(&nbl->nbl_lock);
+               free_blocked_lock(nbl);
+       }
        new_timeo = max_t(time_t, new_timeo, NFSD_LAUNDROMAT_MINTIMEOUT);
        return new_timeo;
  }
@@@ -5309,7 -5428,31 +5428,31 @@@ nfsd4_fl_put_owner(fl_owner_t owner
                nfs4_put_stateowner(&lo->lo_owner);
  }
  
+ static void
+ nfsd4_lm_notify(struct file_lock *fl)
+ {
+       struct nfs4_lockowner           *lo = (struct nfs4_lockowner *)fl->fl_owner;
+       struct net                      *net = lo->lo_owner.so_client->net;
+       struct nfsd_net                 *nn = net_generic(net, nfsd_net_id);
+       struct nfsd4_blocked_lock       *nbl = container_of(fl,
+                                               struct nfsd4_blocked_lock, nbl_lock);
+       bool queue = false;
+       /* An empty list means that something else is going to be using it */
+       spin_lock(&nn->client_lock);
+       if (!list_empty(&nbl->nbl_list)) {
+               list_del_init(&nbl->nbl_list);
+               list_del_init(&nbl->nbl_lru);
+               queue = true;
+       }
+       spin_unlock(&nn->client_lock);
+       if (queue)
+               nfsd4_run_cb(&nbl->nbl_cb);
+ }
+
  static const struct lock_manager_operations nfsd_posix_mng_ops  = {
+       .lm_notify = nfsd4_lm_notify,
        .lm_get_owner = nfsd4_fl_get_owner,
        .lm_put_owner = nfsd4_fl_put_owner,
  };
@@@ -5407,6 -5550,7 +5550,7 @@@ alloc_init_lock_stateowner(unsigned in
        lo = alloc_stateowner(lockowner_slab, &lock->lk_new_owner, clp);
        if (!lo)
                return NULL;
+       INIT_LIST_HEAD(&lo->lo_blocked);
        INIT_LIST_HEAD(&lo->lo_owner.so_stateids);
        lo->lo_owner.so_is_open_owner = 0;
        lo->lo_owner.so_seqid = lock->lk_new_lock_seqid;
@@@ -5588,12 -5732,15 +5732,15 @@@ nfsd4_lock(struct svc_rqst *rqstp, stru
        struct nfs4_ol_stateid *open_stp = NULL;
        struct nfs4_file *fp;
        struct file *filp = NULL;
+       struct nfsd4_blocked_lock *nbl = NULL;
        struct file_lock *file_lock = NULL;
        struct file_lock *conflock = NULL;
        __be32 status = 0;
        int lkflg;
        int err;
        bool new = false;
+       unsigned char fl_type;
+       unsigned int fl_flags = FL_POSIX;
        struct net *net = SVC_NET(rqstp);
        struct nfsd_net *nn = net_generic(net, nfsd_net_id);
  
        if (!locks_in_grace(net) && lock->lk_reclaim)
                goto out;
  
-       file_lock = locks_alloc_lock();
-       if (!file_lock) {
-               dprintk("NFSD: %s: unable to allocate lock!\n", __func__);
-               status = nfserr_jukebox;
-               goto out;
-       }
        fp = lock_stp->st_stid.sc_file;
        switch (lock->lk_type) {
-               case NFS4_READ_LT:
                case NFS4_READW_LT:
+                       if (nfsd4_has_session(cstate))
+                               fl_flags |= FL_SLEEP;
+                       /* Fallthrough */
+               case NFS4_READ_LT:
                        spin_lock(&fp->fi_lock);
                        filp = find_readable_file_locked(fp);
                        if (filp)
                                get_lock_access(lock_stp, NFS4_SHARE_ACCESS_READ);
                        spin_unlock(&fp->fi_lock);
-                       file_lock->fl_type = F_RDLCK;
+                       fl_type = F_RDLCK;
                        break;
-               case NFS4_WRITE_LT:
                case NFS4_WRITEW_LT:
+                       if (nfsd4_has_session(cstate))
+                               fl_flags |= FL_SLEEP;
+                       /* Fallthrough */
+               case NFS4_WRITE_LT:
                        spin_lock(&fp->fi_lock);
                        filp = find_writeable_file_locked(fp);
                        if (filp)
                                get_lock_access(lock_stp, NFS4_SHARE_ACCESS_WRITE);
                        spin_unlock(&fp->fi_lock);
-                       file_lock->fl_type = F_WRLCK;
+                       fl_type = F_WRLCK;
                        break;
                default:
                        status = nfserr_inval;
                goto out;
        }
        if (!filp) {
                status = nfserr_openmode;
                goto out;
        }
  
+       nbl = find_or_allocate_block(lock_sop, &fp->fi_fhandle, nn);
+       if (!nbl) {
+               dprintk("NFSD: %s: unable to allocate block!\n", __func__);
+               status = nfserr_jukebox;
+               goto out;
+       }
+       file_lock = &nbl->nbl_lock;
+       file_lock->fl_type = fl_type;
        file_lock->fl_owner = (fl_owner_t)lockowner(nfs4_get_stateowner(&lock_sop->lo_owner));
        file_lock->fl_pid = current->tgid;
        file_lock->fl_file = filp;
-       file_lock->fl_flags = FL_POSIX;
+       file_lock->fl_flags = fl_flags;
        file_lock->fl_lmops = &nfsd_posix_mng_ops;
        file_lock->fl_start = lock->lk_offset;
        file_lock->fl_end = last_byte_offset(lock->lk_offset, lock->lk_length);
                goto out;
        }
  
+       if (fl_flags & FL_SLEEP) {
+               nbl->nbl_time = jiffies;
+               spin_lock(&nn->client_lock);
+               list_add_tail(&nbl->nbl_list, &lock_sop->lo_blocked);
+               list_add_tail(&nbl->nbl_lru, &nn->blocked_locks_lru);
+               spin_unlock(&nn->client_lock);
+       }
        err = vfs_lock_file(filp, F_SETLK, file_lock, conflock);
-       switch (-err) {
+       switch (err) {
        case 0: /* success! */
                nfs4_inc_and_copy_stateid(&lock->lk_resp_stateid, &lock_stp->st_stid);
                status = 0;
                break;
-       case (EAGAIN):          /* conflock holds conflicting lock */
+       case FILE_LOCK_DEFERRED:
+               nbl = NULL;
+               /* Fallthrough */
+       case -EAGAIN:           /* conflock holds conflicting lock */
                status = nfserr_denied;
                dprintk("NFSD: nfsd4_lock: conflicting lock found!\n");
                nfs4_set_lock_denied(conflock, &lock->lk_denied);
                break;
-       case (EDEADLK):
+       case -EDEADLK:
                status = nfserr_deadlock;
                break;
        default:
                break;
        }
  out:
+       if (nbl) {
+               /* dequeue it if we queued it before */
+               if (fl_flags & FL_SLEEP) {
+                       spin_lock(&nn->client_lock);
+                       list_del_init(&nbl->nbl_list);
+                       list_del_init(&nbl->nbl_lru);
+                       spin_unlock(&nn->client_lock);
+               }
+               free_blocked_lock(nbl);
+       }
        if (filp)
                fput(filp);
        if (lock_stp) {
        if (open_stp)
                nfs4_put_stid(&open_stp->st_stid);
        nfsd4_bump_seqid(cstate, status);
-       if (file_lock)
-               locks_free_lock(file_lock);
        if (conflock)
                locks_free_lock(conflock);
        return status;
@@@ -6768,6 -6943,7 +6943,7 @@@ static int nfs4_state_create_net(struc
        INIT_LIST_HEAD(&nn->client_lru);
        INIT_LIST_HEAD(&nn->close_lru);
        INIT_LIST_HEAD(&nn->del_recall_lru);
+       INIT_LIST_HEAD(&nn->blocked_locks_lru);
        spin_lock_init(&nn->client_lock);
  
        INIT_DELAYED_WORK(&nn->laundromat_work, laundromat_main);
@@@ -6865,6 -7041,7 +7041,7 @@@ nfs4_state_shutdown_net(struct net *net
        struct nfs4_delegation *dp = NULL;
        struct list_head *pos, *next, reaplist;
        struct nfsd_net *nn = net_generic(net, nfsd_net_id);
+       struct nfsd4_blocked_lock *nbl;
  
        cancel_delayed_work_sync(&nn->laundromat_work);
        locks_end_grace(&nn->nfsd4_manager);
                nfs4_put_stid(&dp->dl_stid);
        }
  
+       BUG_ON(!list_empty(&reaplist));
+       spin_lock(&nn->client_lock);
+       while (!list_empty(&nn->blocked_locks_lru)) {
+               nbl = list_first_entry(&nn->blocked_locks_lru,
+                                       struct nfsd4_blocked_lock, nbl_lru);
+               list_move(&nbl->nbl_lru, &reaplist);
+               list_del_init(&nbl->nbl_list);
+       }
+       spin_unlock(&nn->client_lock);
+       while (!list_empty(&reaplist)) {
+               nbl = list_first_entry(&reaplist,
+                                       struct nfsd4_blocked_lock, nbl_lru);
+               list_del_init(&nbl->nbl_lru);
+               posix_unblock_lock(&nbl->nbl_lock);
+               free_blocked_lock(nbl);
+       }
        nfsd4_client_tracking_exit(net);
        nfs4_state_destroy_net(net);
  }
diff --combined fs/nfsd/nfsproc.c
@@@ -74,10 -74,10 +74,10 @@@ nfsd_proc_setattr(struct svc_rqst *rqst
         * which only requires access, and "set-[ac]time-to-X" which
         * requires ownership.
         * So if it looks like it might be "set both to the same time which
 -       * is close to now", and if inode_change_ok fails, then we
 +       * is close to now", and if setattr_prepare fails, then we
         * convert to "set to now" instead of "set to explicit time"
         *
 -       * We only call inode_change_ok as the last test as technically
 +       * We only call setattr_prepare as the last test as technically
         * it is not an interface that we should be using.
         */
  #define BOTH_TIME_SET (ATTR_ATIME_SET | ATTR_MTIME_SET)
                 * request is.  We require it be within 30 minutes of now.
                 */
                time_t delta = iap->ia_atime.tv_sec - get_seconds();
 -              struct inode *inode;
  
                nfserr = fh_verify(rqstp, fhp, 0, NFSD_MAY_NOP);
                if (nfserr)
                        goto done;
 -              inode = d_inode(fhp->fh_dentry);
  
                if (delta < 0)
                        delta = -delta;
                if (delta < MAX_TOUCH_TIME_ERROR &&
 -                  inode_change_ok(inode, iap) != 0) {
 +                  setattr_prepare(fhp->fh_dentry, iap) != 0) {
                        /*
                         * Turn off ATTR_[AM]TIME_SET but leave ATTR_[AM]TIME.
                         * This will cause notify_change to set these times
@@@ -789,6 -791,7 +789,7 @@@ nfserrno (int errno
                { nfserr_toosmall, -ETOOSMALL },
                { nfserr_serverfault, -ESERVERFAULT },
                { nfserr_serverfault, -ENFILE },
+               { nfserr_io, -EUCLEAN },
        };
        int     i;
  
                if (nfs_errtbl[i].syserr == errno)
                        return nfs_errtbl[i].nfserr;
        }
-       WARN(1, "nfsd: non-standard errno: %d\n", errno);
+       WARN_ONCE(1, "nfsd: non-standard errno: %d\n", errno);
        return nfserr_io;
  }
  
diff --combined net/sunrpc/xprtrdma/svc_rdma_transport.c
@@@ -198,6 -198,7 +198,7 @@@ struct svc_rdma_op_ctxt *svc_rdma_get_c
  
  out:
        ctxt->count = 0;
+       ctxt->mapped_sges = 0;
        ctxt->frmr = NULL;
        return ctxt;
  
@@@ -221,22 -222,27 +222,27 @@@ out_empty
  void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
  {
        struct svcxprt_rdma *xprt = ctxt->xprt;
-       int i;
-       for (i = 0; i < ctxt->count && ctxt->sge[i].length; i++) {
+       struct ib_device *device = xprt->sc_cm_id->device;
+       u32 lkey = xprt->sc_pd->local_dma_lkey;
+       unsigned int i, count;
+       for (count = 0, i = 0; i < ctxt->mapped_sges; i++) {
                /*
                 * Unmap the DMA addr in the SGE if the lkey matches
                 * the local_dma_lkey, otherwise, ignore it since it is
                 * an FRMR lkey and will be unmapped later when the
                 * last WR that uses it completes.
                 */
-               if (ctxt->sge[i].lkey == xprt->sc_pd->local_dma_lkey) {
-                       atomic_dec(&xprt->sc_dma_used);
-                       ib_dma_unmap_page(xprt->sc_cm_id->device,
+               if (ctxt->sge[i].lkey == lkey) {
+                       count++;
+                       ib_dma_unmap_page(device,
                                            ctxt->sge[i].addr,
                                            ctxt->sge[i].length,
                                            ctxt->direction);
                }
        }
+       ctxt->mapped_sges = 0;
+       atomic_sub(count, &xprt->sc_dma_used);
  }
  
  void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
@@@ -600,7 -606,7 +606,7 @@@ int svc_rdma_post_recv(struct svcxprt_r
                                     DMA_FROM_DEVICE);
                if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
                        goto err_put_ctxt;
-               atomic_inc(&xprt->sc_dma_used);
+               svc_rdma_count_mappings(xprt, ctxt);
                ctxt->sge[sge_no].addr = pa;
                ctxt->sge[sge_no].length = PAGE_SIZE;
                ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
@@@ -642,6 -648,26 +648,26 @@@ int svc_rdma_repost_recv(struct svcxprt
        return ret;
  }
  
+ static void
+ svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
+                              struct rdma_conn_param *param)
+ {
+       const struct rpcrdma_connect_private *pmsg = param->private_data;
+       if (pmsg &&
+           pmsg->cp_magic == rpcrdma_cmp_magic &&
+           pmsg->cp_version == RPCRDMA_CMP_VERSION) {
+               newxprt->sc_snd_w_inv = pmsg->cp_flags &
+                                       RPCRDMA_CMP_F_SND_W_INV_OK;
+               dprintk("svcrdma: client send_size %u, recv_size %u "
+                       "remote inv %ssupported\n",
+                       rpcrdma_decode_buffer_size(pmsg->cp_send_size),
+                       rpcrdma_decode_buffer_size(pmsg->cp_recv_size),
+                       newxprt->sc_snd_w_inv ? "" : "un");
+       }
+ }
+
  /*
   * This function handles the CONNECT_REQUEST event on a listening
   * endpoint. It is passed the cma_id for the _new_ connection. The context in
   * will call the recvfrom method on the listen xprt which will accept the new
   * connection.
   */
- static void handle_connect_req(struct rdma_cm_id *new_cma_id, size_t client_ird)
+ static void handle_connect_req(struct rdma_cm_id *new_cma_id,
+                              struct rdma_conn_param *param)
  {
        struct svcxprt_rdma *listen_xprt = new_cma_id->context;
        struct svcxprt_rdma *newxprt;
        new_cma_id->context = newxprt;
        dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
                newxprt, newxprt->sc_cm_id, listen_xprt);
+       svc_rdma_parse_connect_private(newxprt, param);
  
        /* Save client advertised inbound read limit for use later in accept. */
-       newxprt->sc_ord = client_ird;
+       newxprt->sc_ord = param->initiator_depth;
  
        /* Set the local and remote addresses in the transport */
        sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
@@@ -706,8 -734,7 +734,7 @@@ static int rdma_listen_handler(struct r
                dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
                        "event = %s (%d)\n", cma_id, cma_id->context,
                        rdma_event_msg(event->event), event->event);
-               handle_connect_req(cma_id,
-                                  event->param.conn.initiator_depth);
+               handle_connect_req(cma_id, &event->param.conn);
                break;
  
        case RDMA_CM_EVENT_ESTABLISHED:
@@@ -941,6 -968,7 +968,7 @@@ static struct svc_xprt *svc_rdma_accept
        struct svcxprt_rdma *listen_rdma;
        struct svcxprt_rdma *newxprt = NULL;
        struct rdma_conn_param conn_param;
+       struct rpcrdma_connect_private pmsg;
        struct ib_qp_init_attr qp_attr;
        struct ib_device *dev;
        unsigned int i;
        newxprt->sc_ord = min_t(size_t, dev->attrs.max_qp_rd_atom, newxprt->sc_ord);
        newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord);
  
 -      newxprt->sc_pd = ib_alloc_pd(dev);
 +      newxprt->sc_pd = ib_alloc_pd(dev, 0);
        if (IS_ERR(newxprt->sc_pd)) {
                dprintk("svcrdma: error creating PD for connect request\n");
                goto errout;
                        dev->attrs.max_fast_reg_page_list_len;
                newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
                newxprt->sc_reader = rdma_read_chunk_frmr;
-       }
+       } else
+               newxprt->sc_snd_w_inv = false;
  
        /*
         * Determine if a DMA MR is required and if so, what privs are required
        /* Swap out the handler */
        newxprt->sc_cm_id->event_handler = rdma_cma_handler;
  
+       /* Construct RDMA-CM private message */
+       pmsg.cp_magic = rpcrdma_cmp_magic;
+       pmsg.cp_version = RPCRDMA_CMP_VERSION;
+       pmsg.cp_flags = 0;
+       pmsg.cp_send_size = pmsg.cp_recv_size =
+               rpcrdma_encode_buffer_size(newxprt->sc_max_req_size);
        /* Accept Connection */
        set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
        memset(&conn_param, 0, sizeof conn_param);
        conn_param.responder_resources = 0;
        conn_param.initiator_depth = newxprt->sc_ord;
+       conn_param.private_data = &pmsg;
+       conn_param.private_data_len = sizeof(pmsg);
        ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
        if (ret) {
                dprintk("svcrdma: failed to accept new connection, ret=%d\n",