SUNRPC: get rid of the request wait queue
authorTrond Myklebust <trond.myklebust@primarydata.com>
Sun, 3 Aug 2014 17:03:10 +0000 (13:03 -0400)
committerJ. Bruce Fields <bfields@redhat.com>
Sun, 17 Aug 2014 16:00:11 +0000 (12:00 -0400)
We're always _only_ waking up tasks from within the sp_threads list, so
we know that they are enqueued and alive. The rq_wait waitqueue is just
a distraction with extra atomic semantics.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
include/linux/sunrpc/svc.h
net/sunrpc/svc.c
net/sunrpc/svc_xprt.c

index cf61ecd..2167846 100644 (file)
@@ -280,7 +280,6 @@ struct svc_rqst {
        bool                    rq_splice_ok;   /* turned off in gss privacy
                                                 * to prevent encrypting page
                                                 * cache pages */
-       wait_queue_head_t       rq_wait;        /* synchronization */
        struct task_struct      *rq_task;       /* service thread */
 };
 
index 1db5007..ca8a795 100644 (file)
@@ -612,8 +612,6 @@ svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
        if (!rqstp)
                goto out_enomem;
 
-       init_waitqueue_head(&rqstp->rq_wait);
-
        serv->sv_nrthreads++;
        spin_lock_bh(&pool->sp_lock);
        pool->sp_nrthreads++;
index 08e49d1..faaf2b4 100644 (file)
@@ -348,8 +348,6 @@ static void svc_xprt_do_enqueue(struct svc_xprt *xprt)
 
        cpu = get_cpu();
        pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
-       put_cpu();
-
        spin_lock_bh(&pool->sp_lock);
 
        if (!list_empty(&pool->sp_threads) &&
@@ -382,10 +380,15 @@ static void svc_xprt_do_enqueue(struct svc_xprt *xprt)
                        printk(KERN_ERR
                                "svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
                                rqstp, rqstp->rq_xprt);
-               rqstp->rq_xprt = xprt;
+               /* Note the order of the following 3 lines:
+                * We want to assign xprt to rqstp->rq_xprt only _after_
+                * we've woken up the process, so that we don't race with
+                * the lockless check in svc_get_next_xprt().
+                */
                svc_xprt_get(xprt);
+               wake_up_process(rqstp->rq_task);
+               rqstp->rq_xprt = xprt;
                pool->sp_stats.threads_woken++;
-               wake_up(&rqstp->rq_wait);
        } else {
                dprintk("svc: transport %p put into queue\n", xprt);
                list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
@@ -394,6 +397,7 @@ static void svc_xprt_do_enqueue(struct svc_xprt *xprt)
 
 out_unlock:
        spin_unlock_bh(&pool->sp_lock);
+       put_cpu();
 }
 
 /*
@@ -509,7 +513,7 @@ void svc_wake_up(struct svc_serv *serv)
                        svc_thread_dequeue(pool, rqstp);
                        rqstp->rq_xprt = NULL;
                         */
-                       wake_up(&rqstp->rq_wait);
+                       wake_up_process(rqstp->rq_task);
                } else
                        pool->sp_task_pending = 1;
                spin_unlock_bh(&pool->sp_lock);
@@ -628,7 +632,6 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
 {
        struct svc_xprt *xprt;
        struct svc_pool         *pool = rqstp->rq_pool;
-       DECLARE_WAITQUEUE(wait, current);
        long                    time_left;
 
        /* Normally we will wait up to 5 seconds for any required
@@ -654,15 +657,15 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
                        xprt = ERR_PTR(-EAGAIN);
                        goto out;
                }
-               /* No data pending. Go to sleep */
-               svc_thread_enqueue(pool, rqstp);
-
                /*
                 * We have to be able to interrupt this wait
                 * to bring down the daemons ...
                 */
                set_current_state(TASK_INTERRUPTIBLE);
 
+               /* No data pending. Go to sleep */
+               svc_thread_enqueue(pool, rqstp);
+
                /*
                 * checking kthread_should_stop() here allows us to avoid
                 * locking and signalling when stopping kthreads that call
@@ -676,14 +679,13 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
                        goto out;
                }
 
-               add_wait_queue(&rqstp->rq_wait, &wait);
                spin_unlock_bh(&pool->sp_lock);
 
                time_left = schedule_timeout(timeout);
+               __set_current_state(TASK_RUNNING);
 
                try_to_freeze();
 
-               remove_wait_queue(&rqstp->rq_wait, &wait);
                xprt = rqstp->rq_xprt;
                if (xprt != NULL)
                        return xprt;
@@ -786,10 +788,10 @@ int svc_recv(struct svc_rqst *rqstp, long timeout)
                printk(KERN_ERR
                        "svc_recv: service %p, transport not NULL!\n",
                         rqstp);
-       if (waitqueue_active(&rqstp->rq_wait))
-               printk(KERN_ERR
-                       "svc_recv: service %p, wait queue active!\n",
-                        rqstp);
+
+       /* Make sure the task pointer is set! */
+       if (WARN_ON_ONCE(!rqstp->rq_task))
+               rqstp->rq_task = current_task;
 
        err = svc_alloc_arg(rqstp);
        if (err)