workqueue: introduce NR_WORKER_POOLS and for_each_worker_pool()
author Tejun Heo <tj@kernel.org>
Sat, 14 Jul 2012 05:16:44 +0000 (22:16 -0700)
committer Tejun Heo <tj@kernel.org>
Sat, 14 Jul 2012 05:16:44 +0000 (22:16 -0700)
Introduce NR_WORKER_POOLS and for_each_worker_pool() and convert code
paths which need to manipulate all pools in a gcwq to use them.
NR_WORKER_POOLS is currently one and for_each_worker_pool() iterates
over only @gcwq->pool.

Note that nr_running is a per-pool property; it is converted to an array
with NR_WORKER_POOLS elements and renamed to pool_nr_running.  Note
that get_pool_nr_running() currently assumes index 0.  The next patch
will make use of non-zero indices.

The changes in this patch are mechanical and don't cause any
functional difference.  This is to prepare for multiple pools per
gcwq.

v2: nr_running indexing bug in get_pool_nr_running() fixed.

v3: Pointer to array is stupid.  Don't use it in get_pool_nr_running()
    as suggested by Linus.

Signed-off-by: Tejun Heo <tj@kernel.org>
Cc: Tony Luck <tony.luck@intel.com>
Cc: Fengguang Wu <fengguang.wu@intel.com>
Cc: Linus Torvalds <torvalds@linux-foundation.org>
kernel/workqueue.c

index 7a98bae..b0daaea 100644 (file)
@@ -74,6 +74,8 @@ enum {
        TRUSTEE_RELEASE         = 3,            /* release workers */
        TRUSTEE_DONE            = 4,            /* trustee is done */
 
+       NR_WORKER_POOLS         = 1,            /* # worker pools per gcwq */
+
        BUSY_WORKER_HASH_ORDER  = 6,            /* 64 pointers */
        BUSY_WORKER_HASH_SIZE   = 1 << BUSY_WORKER_HASH_ORDER,
        BUSY_WORKER_HASH_MASK   = BUSY_WORKER_HASH_SIZE - 1,
@@ -274,6 +276,9 @@ EXPORT_SYMBOL_GPL(system_nrt_freezable_wq);
 #define CREATE_TRACE_POINTS
 #include <trace/events/workqueue.h>
 
+#define for_each_worker_pool(pool, gcwq)                               \
+       for ((pool) = &(gcwq)->pool; (pool); (pool) = NULL)
+
 #define for_each_busy_worker(worker, i, pos, gcwq)                     \
        for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)                     \
                hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
@@ -454,7 +459,7 @@ static bool workqueue_freezing;             /* W: have wqs started freezing? */
  * try_to_wake_up().  Put it in a separate cacheline.
  */
 static DEFINE_PER_CPU(struct global_cwq, global_cwq);
-static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running);
+static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, pool_nr_running[NR_WORKER_POOLS]);
 
 /*
  * Global cpu workqueue and nr_running counter for unbound gcwq.  The
@@ -462,7 +467,9 @@ static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running);
  * workers have WORKER_UNBOUND set.
  */
 static struct global_cwq unbound_global_cwq;
-static atomic_t unbound_gcwq_nr_running = ATOMIC_INIT(0);      /* always 0 */
+static atomic_t unbound_pool_nr_running[NR_WORKER_POOLS] = {
+       [0 ... NR_WORKER_POOLS - 1]     = ATOMIC_INIT(0),       /* always 0 */
+};
 
 static int worker_thread(void *__worker);
 
@@ -477,11 +484,12 @@ static struct global_cwq *get_gcwq(unsigned int cpu)
 static atomic_t *get_pool_nr_running(struct worker_pool *pool)
 {
        int cpu = pool->gcwq->cpu;
+       int idx = 0;
 
        if (cpu != WORK_CPU_UNBOUND)
-               return &per_cpu(gcwq_nr_running, cpu);
+               return &per_cpu(pool_nr_running, cpu)[idx];
        else
-               return &unbound_gcwq_nr_running;
+               return &unbound_pool_nr_running[idx];
 }
 
 static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
@@ -3345,9 +3353,30 @@ EXPORT_SYMBOL_GPL(work_busy);
        __ret1 < 0 ? -1 : 0;                                            \
 })
 
+static bool gcwq_is_managing_workers(struct global_cwq *gcwq)
+{
+       struct worker_pool *pool;
+
+       for_each_worker_pool(pool, gcwq)
+               if (pool->flags & POOL_MANAGING_WORKERS)
+                       return true;
+       return false;
+}
+
+static bool gcwq_has_idle_workers(struct global_cwq *gcwq)
+{
+       struct worker_pool *pool;
+
+       for_each_worker_pool(pool, gcwq)
+               if (!list_empty(&pool->idle_list))
+                       return true;
+       return false;
+}
+
 static int __cpuinit trustee_thread(void *__gcwq)
 {
        struct global_cwq *gcwq = __gcwq;
+       struct worker_pool *pool;
        struct worker *worker;
        struct work_struct *work;
        struct hlist_node *pos;
@@ -3363,13 +3392,15 @@ static int __cpuinit trustee_thread(void *__gcwq)
         * cancelled.
         */
        BUG_ON(gcwq->cpu != smp_processor_id());
-       rc = trustee_wait_event(!(gcwq->pool.flags & POOL_MANAGING_WORKERS));
+       rc = trustee_wait_event(!gcwq_is_managing_workers(gcwq));
        BUG_ON(rc < 0);
 
-       gcwq->pool.flags |= POOL_MANAGING_WORKERS;
+       for_each_worker_pool(pool, gcwq) {
+               pool->flags |= POOL_MANAGING_WORKERS;
 
-       list_for_each_entry(worker, &gcwq->pool.idle_list, entry)
-               worker->flags |= WORKER_ROGUE;
+               list_for_each_entry(worker, &pool->idle_list, entry)
+                       worker->flags |= WORKER_ROGUE;
+       }
 
        for_each_busy_worker(worker, i, pos, gcwq)
                worker->flags |= WORKER_ROGUE;
@@ -3390,10 +3421,12 @@ static int __cpuinit trustee_thread(void *__gcwq)
         * keep_working() are always true as long as the worklist is
         * not empty.
         */
-       atomic_set(get_pool_nr_running(&gcwq->pool), 0);
+       for_each_worker_pool(pool, gcwq)
+               atomic_set(get_pool_nr_running(pool), 0);
 
        spin_unlock_irq(&gcwq->lock);
-       del_timer_sync(&gcwq->pool.idle_timer);
+       for_each_worker_pool(pool, gcwq)
+               del_timer_sync(&pool->idle_timer);
        spin_lock_irq(&gcwq->lock);
 
        /*
@@ -3415,29 +3448,38 @@ static int __cpuinit trustee_thread(void *__gcwq)
         * may be frozen works in freezable cwqs.  Don't declare
         * completion while frozen.
         */
-       while (gcwq->pool.nr_workers != gcwq->pool.nr_idle ||
-              gcwq->flags & GCWQ_FREEZING ||
-              gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
-               int nr_works = 0;
+       while (true) {
+               bool busy = false;
 
-               list_for_each_entry(work, &gcwq->pool.worklist, entry) {
-                       send_mayday(work);
-                       nr_works++;
-               }
+               for_each_worker_pool(pool, gcwq)
+                       busy |= pool->nr_workers != pool->nr_idle;
 
-               list_for_each_entry(worker, &gcwq->pool.idle_list, entry) {
-                       if (!nr_works--)
-                               break;
-                       wake_up_process(worker->task);
-               }
+               if (!busy && !(gcwq->flags & GCWQ_FREEZING) &&
+                   gcwq->trustee_state != TRUSTEE_IN_CHARGE)
+                       break;
 
-               if (need_to_create_worker(&gcwq->pool)) {
-                       spin_unlock_irq(&gcwq->lock);
-                       worker = create_worker(&gcwq->pool, false);
-                       spin_lock_irq(&gcwq->lock);
-                       if (worker) {
-                               worker->flags |= WORKER_ROGUE;
-                               start_worker(worker);
+               for_each_worker_pool(pool, gcwq) {
+                       int nr_works = 0;
+
+                       list_for_each_entry(work, &pool->worklist, entry) {
+                               send_mayday(work);
+                               nr_works++;
+                       }
+
+                       list_for_each_entry(worker, &pool->idle_list, entry) {
+                               if (!nr_works--)
+                                       break;
+                               wake_up_process(worker->task);
+                       }
+
+                       if (need_to_create_worker(pool)) {
+                               spin_unlock_irq(&gcwq->lock);
+                               worker = create_worker(pool, false);
+                               spin_lock_irq(&gcwq->lock);
+                               if (worker) {
+                                       worker->flags |= WORKER_ROGUE;
+                                       start_worker(worker);
+                               }
                        }
                }
 
@@ -3452,11 +3494,18 @@ static int __cpuinit trustee_thread(void *__gcwq)
         * all workers till we're canceled.
         */
        do {
-               rc = trustee_wait_event(!list_empty(&gcwq->pool.idle_list));
-               while (!list_empty(&gcwq->pool.idle_list))
-                       destroy_worker(list_first_entry(&gcwq->pool.idle_list,
-                                                       struct worker, entry));
-       } while (gcwq->pool.nr_workers && rc >= 0);
+               rc = trustee_wait_event(gcwq_has_idle_workers(gcwq));
+
+               i = 0;
+               for_each_worker_pool(pool, gcwq) {
+                       while (!list_empty(&pool->idle_list)) {
+                               worker = list_first_entry(&pool->idle_list,
+                                                         struct worker, entry);
+                               destroy_worker(worker);
+                       }
+                       i |= pool->nr_workers;
+               }
+       } while (i && rc >= 0);
 
        /*
         * At this point, either draining has completed and no worker
@@ -3465,7 +3514,8 @@ static int __cpuinit trustee_thread(void *__gcwq)
         * Tell the remaining busy ones to rebind once it finishes the
         * currently scheduled works by scheduling the rebind_work.
         */
-       WARN_ON(!list_empty(&gcwq->pool.idle_list));
+       for_each_worker_pool(pool, gcwq)
+               WARN_ON(!list_empty(&pool->idle_list));
 
        for_each_busy_worker(worker, i, pos, gcwq) {
                struct work_struct *rebind_work = &worker->rebind_work;
@@ -3490,7 +3540,8 @@ static int __cpuinit trustee_thread(void *__gcwq)
        }
 
        /* relinquish manager role */
-       gcwq->pool.flags &= ~POOL_MANAGING_WORKERS;
+       for_each_worker_pool(pool, gcwq)
+               pool->flags &= ~POOL_MANAGING_WORKERS;
 
        /* notify completion */
        gcwq->trustee = NULL;
@@ -3532,8 +3583,10 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
        unsigned int cpu = (unsigned long)hcpu;
        struct global_cwq *gcwq = get_gcwq(cpu);
        struct task_struct *new_trustee = NULL;
-       struct worker *uninitialized_var(new_worker);
+       struct worker *new_workers[NR_WORKER_POOLS] = { };
+       struct worker_pool *pool;
        unsigned long flags;
+       int i;
 
        action &= ~CPU_TASKS_FROZEN;
 
@@ -3546,12 +3599,12 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
                kthread_bind(new_trustee, cpu);
                /* fall through */
        case CPU_UP_PREPARE:
-               BUG_ON(gcwq->pool.first_idle);
-               new_worker = create_worker(&gcwq->pool, false);
-               if (!new_worker) {
-                       if (new_trustee)
-                               kthread_stop(new_trustee);
-                       return NOTIFY_BAD;
+               i = 0;
+               for_each_worker_pool(pool, gcwq) {
+                       BUG_ON(pool->first_idle);
+                       new_workers[i] = create_worker(pool, false);
+                       if (!new_workers[i++])
+                               goto err_destroy;
                }
        }
 
@@ -3568,8 +3621,11 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
                wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
                /* fall through */
        case CPU_UP_PREPARE:
-               BUG_ON(gcwq->pool.first_idle);
-               gcwq->pool.first_idle = new_worker;
+               i = 0;
+               for_each_worker_pool(pool, gcwq) {
+                       BUG_ON(pool->first_idle);
+                       pool->first_idle = new_workers[i++];
+               }
                break;
 
        case CPU_DYING:
@@ -3586,8 +3642,10 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
                gcwq->trustee_state = TRUSTEE_BUTCHER;
                /* fall through */
        case CPU_UP_CANCELED:
-               destroy_worker(gcwq->pool.first_idle);
-               gcwq->pool.first_idle = NULL;
+               for_each_worker_pool(pool, gcwq) {
+                       destroy_worker(pool->first_idle);
+                       pool->first_idle = NULL;
+               }
                break;
 
        case CPU_DOWN_FAILED:
@@ -3604,18 +3662,32 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
                 * Put the first_idle in and request a real manager to
                 * take a look.
                 */
-               spin_unlock_irq(&gcwq->lock);
-               kthread_bind(gcwq->pool.first_idle->task, cpu);
-               spin_lock_irq(&gcwq->lock);
-               gcwq->pool.flags |= POOL_MANAGE_WORKERS;
-               start_worker(gcwq->pool.first_idle);
-               gcwq->pool.first_idle = NULL;
+               for_each_worker_pool(pool, gcwq) {
+                       spin_unlock_irq(&gcwq->lock);
+                       kthread_bind(pool->first_idle->task, cpu);
+                       spin_lock_irq(&gcwq->lock);
+                       pool->flags |= POOL_MANAGE_WORKERS;
+                       start_worker(pool->first_idle);
+                       pool->first_idle = NULL;
+               }
                break;
        }
 
        spin_unlock_irqrestore(&gcwq->lock, flags);
 
        return notifier_from_errno(0);
+
+err_destroy:
+       if (new_trustee)
+               kthread_stop(new_trustee);
+
+       spin_lock_irqsave(&gcwq->lock, flags);
+       for (i = 0; i < NR_WORKER_POOLS; i++)
+               if (new_workers[i])
+                       destroy_worker(new_workers[i]);
+       spin_unlock_irqrestore(&gcwq->lock, flags);
+
+       return NOTIFY_BAD;
 }
 
 #ifdef CONFIG_SMP
@@ -3774,6 +3846,7 @@ void thaw_workqueues(void)
 
        for_each_gcwq_cpu(cpu) {
                struct global_cwq *gcwq = get_gcwq(cpu);
+               struct worker_pool *pool;
                struct workqueue_struct *wq;
 
                spin_lock_irq(&gcwq->lock);
@@ -3795,7 +3868,8 @@ void thaw_workqueues(void)
                                cwq_activate_first_delayed(cwq);
                }
 
-               wake_up_worker(&gcwq->pool);
+               for_each_worker_pool(pool, gcwq)
+                       wake_up_worker(pool);
 
                spin_unlock_irq(&gcwq->lock);
        }
@@ -3816,25 +3890,29 @@ static int __init init_workqueues(void)
        /* initialize gcwqs */
        for_each_gcwq_cpu(cpu) {
                struct global_cwq *gcwq = get_gcwq(cpu);
+               struct worker_pool *pool;
 
                spin_lock_init(&gcwq->lock);
-               gcwq->pool.gcwq = gcwq;
-               INIT_LIST_HEAD(&gcwq->pool.worklist);
                gcwq->cpu = cpu;
                gcwq->flags |= GCWQ_DISASSOCIATED;
 
-               INIT_LIST_HEAD(&gcwq->pool.idle_list);
                for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
                        INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
 
-               init_timer_deferrable(&gcwq->pool.idle_timer);
-               gcwq->pool.idle_timer.function = idle_worker_timeout;
-               gcwq->pool.idle_timer.data = (unsigned long)&gcwq->pool;
+               for_each_worker_pool(pool, gcwq) {
+                       pool->gcwq = gcwq;
+                       INIT_LIST_HEAD(&pool->worklist);
+                       INIT_LIST_HEAD(&pool->idle_list);
 
-               setup_timer(&gcwq->pool.mayday_timer, gcwq_mayday_timeout,
-                           (unsigned long)&gcwq->pool);
+                       init_timer_deferrable(&pool->idle_timer);
+                       pool->idle_timer.function = idle_worker_timeout;
+                       pool->idle_timer.data = (unsigned long)pool;
 
-               ida_init(&gcwq->pool.worker_ida);
+                       setup_timer(&pool->mayday_timer, gcwq_mayday_timeout,
+                                   (unsigned long)pool);
+
+                       ida_init(&pool->worker_ida);
+               }
 
                gcwq->trustee_state = TRUSTEE_DONE;
                init_waitqueue_head(&gcwq->trustee_wait);
@@ -3843,15 +3921,20 @@ static int __init init_workqueues(void)
        /* create the initial worker */
        for_each_online_gcwq_cpu(cpu) {
                struct global_cwq *gcwq = get_gcwq(cpu);
-               struct worker *worker;
+               struct worker_pool *pool;
 
                if (cpu != WORK_CPU_UNBOUND)
                        gcwq->flags &= ~GCWQ_DISASSOCIATED;
-               worker = create_worker(&gcwq->pool, true);
-               BUG_ON(!worker);
-               spin_lock_irq(&gcwq->lock);
-               start_worker(worker);
-               spin_unlock_irq(&gcwq->lock);
+
+               for_each_worker_pool(pool, gcwq) {
+                       struct worker *worker;
+
+                       worker = create_worker(pool, true);
+                       BUG_ON(!worker);
+                       spin_lock_irq(&gcwq->lock);
+                       start_worker(worker);
+                       spin_unlock_irq(&gcwq->lock);
+               }
        }
 
        system_wq = alloc_workqueue("events", 0, 0);