vhost: lockless enqueuing
authorJason Wang <jasowang@redhat.com>
Tue, 26 Apr 2016 02:14:33 +0000 (22:14 -0400)
committerMichael S. Tsirkin <mst@redhat.com>
Mon, 1 Aug 2016 18:44:51 +0000 (21:44 +0300)
We use spinlock to synchronize the work list now which may cause
unnecessary contentions. So this patch switch to use llist to remove
this contention. Pktgen tests shows about 5% improvement:

Before:
~1300000 pps
After:
~1370000 pps

Signed-off-by: Jason Wang <jasowang@redhat.com>
Reviewed-by: Michael S. Tsirkin <mst@redhat.com>
Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
drivers/vhost/vhost.c
drivers/vhost/vhost.h

index 73dd16d..0061a7b 100644 (file)
@@ -168,7 +168,7 @@ static int vhost_poll_wakeup(wait_queue_t *wait, unsigned mode, int sync,
 
 void vhost_work_init(struct vhost_work *work, vhost_work_fn_t fn)
 {
-       INIT_LIST_HEAD(&work->node);
+       clear_bit(VHOST_WORK_QUEUED, &work->flags);
        work->fn = fn;
        init_waitqueue_head(&work->done);
 }
@@ -246,15 +246,16 @@ EXPORT_SYMBOL_GPL(vhost_poll_flush);
 
 void vhost_work_queue(struct vhost_dev *dev, struct vhost_work *work)
 {
-       unsigned long flags;
+       if (!dev->worker)
+               return;
 
-       spin_lock_irqsave(&dev->work_lock, flags);
-       if (list_empty(&work->node)) {
-               list_add_tail(&work->node, &dev->work_list);
-               spin_unlock_irqrestore(&dev->work_lock, flags);
+       if (!test_and_set_bit(VHOST_WORK_QUEUED, &work->flags)) {
+               /* We can only add the work to the list after we're
+                * sure it was not in the list.
+                */
+               smp_mb();
+               llist_add(&work->node, &dev->work_list);
                wake_up_process(dev->worker);
-       } else {
-               spin_unlock_irqrestore(&dev->work_lock, flags);
        }
 }
 EXPORT_SYMBOL_GPL(vhost_work_queue);
@@ -262,7 +263,7 @@ EXPORT_SYMBOL_GPL(vhost_work_queue);
 /* A lockless hint for busy polling code to exit the loop */
 bool vhost_has_work(struct vhost_dev *dev)
 {
-       return !list_empty(&dev->work_list);
+       return !llist_empty(&dev->work_list);
 }
 EXPORT_SYMBOL_GPL(vhost_has_work);
 
@@ -305,7 +306,8 @@ static void vhost_vq_reset(struct vhost_dev *dev,
 static int vhost_worker(void *data)
 {
        struct vhost_dev *dev = data;
-       struct vhost_work *work = NULL;
+       struct vhost_work *work, *work_next;
+       struct llist_node *node;
        mm_segment_t oldfs = get_fs();
 
        set_fs(USER_DS);
@@ -315,29 +317,25 @@ static int vhost_worker(void *data)
                /* mb paired w/ kthread_stop */
                set_current_state(TASK_INTERRUPTIBLE);
 
-               spin_lock_irq(&dev->work_lock);
-
                if (kthread_should_stop()) {
-                       spin_unlock_irq(&dev->work_lock);
                        __set_current_state(TASK_RUNNING);
                        break;
                }
-               if (!list_empty(&dev->work_list)) {
-                       work = list_first_entry(&dev->work_list,
-                                               struct vhost_work, node);
-                       list_del_init(&work->node);
-               } else
-                       work = NULL;
-               spin_unlock_irq(&dev->work_lock);
 
-               if (work) {
+               node = llist_del_all(&dev->work_list);
+               if (!node)
+                       schedule();
+
+               node = llist_reverse_order(node);
+               /* make sure flag is seen after deletion */
+               smp_wmb();
+               llist_for_each_entry_safe(work, work_next, node, node) {
+                       clear_bit(VHOST_WORK_QUEUED, &work->flags);
                        __set_current_state(TASK_RUNNING);
                        work->fn(work);
                        if (need_resched())
                                schedule();
-               } else
-                       schedule();
-
+               }
        }
        unuse_mm(dev->mm);
        set_fs(oldfs);
@@ -398,9 +396,9 @@ void vhost_dev_init(struct vhost_dev *dev,
        dev->log_file = NULL;
        dev->memory = NULL;
        dev->mm = NULL;
-       spin_lock_init(&dev->work_lock);
-       INIT_LIST_HEAD(&dev->work_list);
        dev->worker = NULL;
+       init_llist_head(&dev->work_list);
+
 
        for (i = 0; i < dev->nvqs; ++i) {
                vq = dev->vqs[i];
@@ -566,7 +564,7 @@ void vhost_dev_cleanup(struct vhost_dev *dev, bool locked)
        /* No one will access memory at this point */
        kvfree(dev->memory);
        dev->memory = NULL;
-       WARN_ON(!list_empty(&dev->work_list));
+       WARN_ON(!llist_empty(&dev->work_list));
        if (dev->worker) {
                kthread_stop(dev->worker);
                dev->worker = NULL;
index d36d8be..6690e64 100644 (file)
 struct vhost_work;
 typedef void (*vhost_work_fn_t)(struct vhost_work *work);
 
+#define VHOST_WORK_QUEUED 1
 struct vhost_work {
-       struct list_head          node;
+       struct llist_node         node;
        vhost_work_fn_t           fn;
        wait_queue_head_t         done;
        int                       flushing;
        unsigned                  queue_seq;
        unsigned                  done_seq;
+       unsigned long             flags;
 };
 
 /* Poll a file (eventfd or socket) */
@@ -126,8 +128,7 @@ struct vhost_dev {
        int nvqs;
        struct file *log_file;
        struct eventfd_ctx *log_ctx;
-       spinlock_t work_lock;
-       struct list_head work_list;
+       struct llist_head work_list;
        struct task_struct *worker;
 };