#include <linux/lockdep.h>
#include <linux/idr.h>
+#include "workqueue_sched.h"
+
enum {
/* global_cwq flags */
+ GCWQ_MANAGE_WORKERS = 1 << 0, /* need to manage workers */
+ GCWQ_MANAGING_WORKERS = 1 << 1, /* managing workers */
+ GCWQ_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */
GCWQ_FREEZING = 1 << 3, /* freeze in progress */
+ GCWQ_HIGHPRI_PENDING = 1 << 4, /* highpri works on queue */
/* worker flags */
WORKER_STARTED = 1 << 0, /* started */
WORKER_DIE = 1 << 1, /* die die die */
WORKER_IDLE = 1 << 2, /* is idle */
+ WORKER_PREP = 1 << 3, /* preparing to run works */
WORKER_ROGUE = 1 << 4, /* not bound to any cpu */
+ WORKER_REBIND = 1 << 5, /* mom is home, come back */
+ WORKER_CPU_INTENSIVE = 1 << 6, /* cpu intensive */
+
+ WORKER_NOT_RUNNING = WORKER_PREP | WORKER_ROGUE | WORKER_REBIND |
+ WORKER_CPU_INTENSIVE,
/* gcwq->trustee_state */
TRUSTEE_START = 0, /* start */
BUSY_WORKER_HASH_SIZE = 1 << BUSY_WORKER_HASH_ORDER,
BUSY_WORKER_HASH_MASK = BUSY_WORKER_HASH_SIZE - 1,
+ MAX_IDLE_WORKERS_RATIO = 4, /* 1/4 of busy can be idle */
+ IDLE_WORKER_TIMEOUT = 300 * HZ, /* keep idle ones for 5 mins */
+
+ MAYDAY_INITIAL_TIMEOUT = HZ / 100, /* call for help after 10ms */
+ MAYDAY_INTERVAL = HZ / 10, /* and then every 100ms */
+ CREATE_COOLDOWN = HZ, /* time to breath after fail */
TRUSTEE_COOLDOWN = HZ / 10, /* for trustee draining */
+
+ /*
+ * Rescue workers are used only on emergencies and shared by
+ * all cpus. Give -20.
+ */
+ RESCUER_NICE_LEVEL = -20,
};
/*
*
* I: Set during initialization and read-only afterwards.
*
+ * P: Preemption protected. Disabling preemption is enough and should
+ * only be modified and accessed from the local cpu.
+ *
* L: gcwq->lock protected. Access with gcwq->lock held.
*
+ * X: During normal operation, modification requires gcwq->lock and
+ * should be done only from local cpu. Either disabling preemption
+ * on local cpu or grabbing gcwq->lock is enough for read access.
+ * While trustee is in charge, it's identical to L.
+ *
* F: wq->flush_mutex protected.
*
* W: workqueue_lock protected.
*/
struct global_cwq;
-struct cpu_workqueue_struct;
+/*
+ * The poor guys doing the actual heavy lifting. All on-duty workers
+ * are either serving the manager role, on idle list or on busy hash.
+ */
struct worker {
/* on idle list while idle, on busy hash table while busy */
union {
struct list_head scheduled; /* L: scheduled works */
struct task_struct *task; /* I: worker task */
struct global_cwq *gcwq; /* I: the associated gcwq */
- struct cpu_workqueue_struct *cwq; /* I: the associated cwq */
- unsigned int flags; /* L: flags */
+ /* 64 bytes boundary on 64bit, 32 on 32bit */
+ unsigned long last_active; /* L: last active timestamp */
+ unsigned int flags; /* X: flags */
int id; /* I: worker id */
+ struct work_struct rebind_work; /* L: rebind worker to cpu */
};
/*
- * Global per-cpu workqueue.
+ * Global per-cpu workqueue. There's one and only one for each cpu
+ * and all works are queued and processed here regardless of their
+ * target workqueues.
*/
struct global_cwq {
spinlock_t lock; /* the gcwq lock */
+ struct list_head worklist; /* L: list of pending works */
unsigned int cpu; /* I: the associated cpu */
unsigned int flags; /* L: GCWQ_* flags */
int nr_idle; /* L: currently idle ones */
/* workers are chained either in the idle_list or busy_hash */
- struct list_head idle_list; /* L: list of idle workers */
+ struct list_head idle_list; /* X: list of idle workers */
struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];
/* L: hash of busy workers */
+ struct timer_list idle_timer; /* L: worker idle timeout */
+ struct timer_list mayday_timer; /* L: SOS timer for dworkers */
+
struct ida worker_ida; /* L: for worker IDs */
struct task_struct *trustee; /* L: for gcwq shutdown */
unsigned int trustee_state; /* L: trustee state */
wait_queue_head_t trustee_wait; /* trustee wait */
+ struct worker *first_idle; /* L: first idle worker */
} ____cacheline_aligned_in_smp;
/*
*/
struct cpu_workqueue_struct {
struct global_cwq *gcwq; /* I: the associated gcwq */
- struct list_head worklist;
- struct worker *worker;
struct workqueue_struct *wq; /* I: the owning workqueue */
int work_color; /* L: current color */
int flush_color; /* L: flushing color */
unsigned long single_cpu; /* cpu for single cpu wq */
- int saved_max_active; /* I: saved cwq max_active */
+ cpumask_var_t mayday_mask; /* cpus requesting rescue */
+ struct worker *rescuer; /* I: rescue worker */
+
+ int saved_max_active; /* W: saved cwq max_active */
const char *name; /* I: workqueue name */
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
+struct workqueue_struct *system_wq __read_mostly;
+struct workqueue_struct *system_long_wq __read_mostly;
+struct workqueue_struct *system_nrt_wq __read_mostly;
+EXPORT_SYMBOL_GPL(system_wq);
+EXPORT_SYMBOL_GPL(system_long_wq);
+EXPORT_SYMBOL_GPL(system_nrt_wq);
+
#define for_each_busy_worker(worker, i, pos, gcwq) \
for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) \
hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
static LIST_HEAD(workqueues);
static bool workqueue_freezing; /* W: have wqs started freezing? */
+/*
+ * The almighty global cpu workqueues. nr_running is the only field
+ * which is expected to be used frequently by other cpus via
+ * try_to_wake_up(). Put it in a separate cacheline.
+ */
static DEFINE_PER_CPU(struct global_cwq, global_cwq);
+static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, gcwq_nr_running);
static int worker_thread(void *__worker);
return &per_cpu(global_cwq, cpu);
}
+static atomic_t *get_gcwq_nr_running(unsigned int cpu)
+{
+ return &per_cpu(gcwq_nr_running, cpu);
+}
+
static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
struct workqueue_struct *wq)
{
if (cpu == NR_CPUS)
return NULL;
- BUG_ON(cpu >= num_possible_cpus());
+ BUG_ON(cpu >= nr_cpu_ids);
return get_gcwq(cpu);
}
+/*
+ * Policy functions. These define the policies on how the global
+ * worker pool is managed. Unless noted otherwise, these functions
+ * assume that they're being called with gcwq->lock held.
+ */
+
+static bool __need_more_worker(struct global_cwq *gcwq)
+{
+ return !atomic_read(get_gcwq_nr_running(gcwq->cpu)) ||
+ gcwq->flags & GCWQ_HIGHPRI_PENDING;
+}
+
+/*
+ * Need to wake up a worker? Called from anything but currently
+ * running workers.
+ */
+static bool need_more_worker(struct global_cwq *gcwq)
+{
+ return !list_empty(&gcwq->worklist) && __need_more_worker(gcwq);
+}
+
+/* Can I start working? Called from busy but !running workers. */
+static bool may_start_working(struct global_cwq *gcwq)
+{
+ return gcwq->nr_idle;
+}
+
+/* Do I need to keep working? Called from currently running workers. */
+static bool keep_working(struct global_cwq *gcwq)
+{
+ atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
+
+ return !list_empty(&gcwq->worklist) && atomic_read(nr_running) <= 1;
+}
+
+/* Do we need a new worker? Called from manager. */
+static bool need_to_create_worker(struct global_cwq *gcwq)
+{
+ return need_more_worker(gcwq) && !may_start_working(gcwq);
+}
+
+/* Do I need to be the manager? */
+static bool need_to_manage_workers(struct global_cwq *gcwq)
+{
+ return need_to_create_worker(gcwq) || gcwq->flags & GCWQ_MANAGE_WORKERS;
+}
+
+/* Do we have too many workers and should some go away? */
+static bool too_many_workers(struct global_cwq *gcwq)
+{
+ bool managing = gcwq->flags & GCWQ_MANAGING_WORKERS;
+ int nr_idle = gcwq->nr_idle + managing; /* manager is considered idle */
+ int nr_busy = gcwq->nr_workers - nr_idle;
+
+ return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
+}
+
+/*
+ * Wake up functions.
+ */
+
+/* Return the first worker. Safe with preemption disabled */
+static struct worker *first_worker(struct global_cwq *gcwq)
+{
+ if (unlikely(list_empty(&gcwq->idle_list)))
+ return NULL;
+
+ return list_first_entry(&gcwq->idle_list, struct worker, entry);
+}
+
+/**
+ * wake_up_worker - wake up an idle worker
+ * @gcwq: gcwq to wake worker for
+ *
+ * Wake up the first idle worker of @gcwq.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void wake_up_worker(struct global_cwq *gcwq)
+{
+ struct worker *worker = first_worker(gcwq);
+
+ if (likely(worker))
+ wake_up_process(worker->task);
+}
+
+/**
+ * wq_worker_waking_up - a worker is waking up
+ * @task: task waking up
+ * @cpu: CPU @task is waking up to
+ *
+ * This function is called during try_to_wake_up() when a worker is
+ * being awoken.
+ *
+ * CONTEXT:
+ * spin_lock_irq(rq->lock)
+ */
+void wq_worker_waking_up(struct task_struct *task, unsigned int cpu)
+{
+ struct worker *worker = kthread_data(task);
+
+ if (likely(!(worker->flags & WORKER_NOT_RUNNING)))
+ atomic_inc(get_gcwq_nr_running(cpu));
+}
+
+/**
+ * wq_worker_sleeping - a worker is going to sleep
+ * @task: task going to sleep
+ * @cpu: CPU in question, must be the current CPU number
+ *
+ * This function is called during schedule() when a busy worker is
+ * going to sleep. Worker on the same cpu can be woken up by
+ * returning pointer to its task.
+ *
+ * CONTEXT:
+ * spin_lock_irq(rq->lock)
+ *
+ * RETURNS:
+ * Worker task on @cpu to wake up, %NULL if none.
+ */
+struct task_struct *wq_worker_sleeping(struct task_struct *task,
+ unsigned int cpu)
+{
+ struct worker *worker = kthread_data(task), *to_wakeup = NULL;
+ struct global_cwq *gcwq = get_gcwq(cpu);
+ atomic_t *nr_running = get_gcwq_nr_running(cpu);
+
+ if (unlikely(worker->flags & WORKER_NOT_RUNNING))
+ return NULL;
+
+ /* this can only happen on the local cpu */
+ BUG_ON(cpu != raw_smp_processor_id());
+
+ /*
+ * The counterpart of the following dec_and_test, implied mb,
+ * worklist not empty test sequence is in insert_work().
+ * Please read comment there.
+ *
+ * NOT_RUNNING is clear. This means that trustee is not in
+ * charge and we're running on the local cpu w/ rq lock held
+ * and preemption disabled, which in turn means that none else
+ * could be manipulating idle_list, so dereferencing idle_list
+ * without gcwq lock is safe.
+ */
+ if (atomic_dec_and_test(nr_running) && !list_empty(&gcwq->worklist))
+ to_wakeup = first_worker(gcwq);
+ return to_wakeup ? to_wakeup->task : NULL;
+}
+
+/**
+ * worker_set_flags - set worker flags and adjust nr_running accordingly
+ * @worker: self
+ * @flags: flags to set
+ * @wakeup: wakeup an idle worker if necessary
+ *
+ * Set @flags in @worker->flags and adjust nr_running accordingly. If
+ * nr_running becomes zero and @wakeup is %true, an idle worker is
+ * woken up.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock)
+ */
+static inline void worker_set_flags(struct worker *worker, unsigned int flags,
+ bool wakeup)
+{
+ struct global_cwq *gcwq = worker->gcwq;
+
+ WARN_ON_ONCE(worker->task != current);
+
+ /*
+ * If transitioning into NOT_RUNNING, adjust nr_running and
+ * wake up an idle worker as necessary if requested by
+ * @wakeup.
+ */
+ if ((flags & WORKER_NOT_RUNNING) &&
+ !(worker->flags & WORKER_NOT_RUNNING)) {
+ atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
+
+ if (wakeup) {
+ if (atomic_dec_and_test(nr_running) &&
+ !list_empty(&gcwq->worklist))
+ wake_up_worker(gcwq);
+ } else
+ atomic_dec(nr_running);
+ }
+
+ worker->flags |= flags;
+}
+
+/**
+ * worker_clr_flags - clear worker flags and adjust nr_running accordingly
+ * @worker: self
+ * @flags: flags to clear
+ *
+ * Clear @flags in @worker->flags and adjust nr_running accordingly.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock)
+ */
+static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
+{
+ struct global_cwq *gcwq = worker->gcwq;
+ unsigned int oflags = worker->flags;
+
+ WARN_ON_ONCE(worker->task != current);
+
+ worker->flags &= ~flags;
+
+ /* if transitioning out of NOT_RUNNING, increment nr_running */
+ if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
+ if (!(worker->flags & WORKER_NOT_RUNNING))
+ atomic_inc(get_gcwq_nr_running(gcwq->cpu));
+}
+
/**
* busy_worker_head - return the busy hash head for a work
* @gcwq: gcwq of interest
}
/**
- * insert_work - insert a work into cwq
+ * gcwq_determine_ins_pos - find insertion position
+ * @gcwq: gcwq of interest
+ * @cwq: cwq a work is being queued for
+ *
+ * A work for @cwq is about to be queued on @gcwq, determine insertion
+ * position for the work. If @cwq is for HIGHPRI wq, the work is
+ * queued at the head of the queue but in FIFO order with respect to
+ * other HIGHPRI works; otherwise, at the end of the queue. This
+ * function also sets GCWQ_HIGHPRI_PENDING flag to hint @gcwq that
+ * there are HIGHPRI works pending.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ *
+ * RETURNS:
+ * Pointer to inserstion position.
+ */
+static inline struct list_head *gcwq_determine_ins_pos(struct global_cwq *gcwq,
+ struct cpu_workqueue_struct *cwq)
+{
+ struct work_struct *twork;
+
+ if (likely(!(cwq->wq->flags & WQ_HIGHPRI)))
+ return &gcwq->worklist;
+
+ list_for_each_entry(twork, &gcwq->worklist, entry) {
+ struct cpu_workqueue_struct *tcwq = get_work_cwq(twork);
+
+ if (!(tcwq->wq->flags & WQ_HIGHPRI))
+ break;
+ }
+
+ gcwq->flags |= GCWQ_HIGHPRI_PENDING;
+ return &twork->entry;
+}
+
+/**
+ * insert_work - insert a work into gcwq
* @cwq: cwq @work belongs to
* @work: work to insert
* @head: insertion point
* @extra_flags: extra WORK_STRUCT_* flags to set
*
- * Insert @work into @cwq after @head.
+ * Insert @work which belongs to @cwq into @gcwq after @head.
+ * @extra_flags is or'd to work_struct flags.
*
* CONTEXT:
* spin_lock_irq(gcwq->lock).
struct work_struct *work, struct list_head *head,
unsigned int extra_flags)
{
+ struct global_cwq *gcwq = cwq->gcwq;
+
/* we own @work, set data and link */
set_work_cwq(work, cwq, extra_flags);
smp_wmb();
list_add_tail(&work->entry, head);
- wake_up_process(cwq->worker->task);
+
+ /*
+ * Ensure either worker_sched_deactivated() sees the above
+ * list_add_tail() or we see zero nr_running to avoid workers
+ * lying around lazily while there are works to be processed.
+ */
+ smp_mb();
+
+ if (__need_more_worker(gcwq))
+ wake_up_worker(gcwq);
}
/**
debug_work_activate(work);
- /* determine gcwq to use */
+ /*
+ * Determine gcwq to use. SINGLE_CPU is inherently
+ * NON_REENTRANT, so test it first.
+ */
if (!(wq->flags & WQ_SINGLE_CPU)) {
- /* just use the requested cpu for multicpu workqueues */
+ struct global_cwq *last_gcwq;
+
+ /*
+ * It's multi cpu. If @wq is non-reentrant and @work
+ * was previously on a different cpu, it might still
+ * be running there, in which case the work needs to
+ * be queued on that cpu to guarantee non-reentrance.
+ */
gcwq = get_gcwq(cpu);
- spin_lock_irqsave(&gcwq->lock, flags);
+ if (wq->flags & WQ_NON_REENTRANT &&
+ (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {
+ struct worker *worker;
+
+ spin_lock_irqsave(&last_gcwq->lock, flags);
+
+ worker = find_worker_executing_work(last_gcwq, work);
+
+ if (worker && worker->current_cwq->wq == wq)
+ gcwq = last_gcwq;
+ else {
+ /* meh... not running there, queue here */
+ spin_unlock_irqrestore(&last_gcwq->lock, flags);
+ spin_lock_irqsave(&gcwq->lock, flags);
+ }
+ } else
+ spin_lock_irqsave(&gcwq->lock, flags);
} else {
unsigned int req_cpu = cpu;
if (likely(cwq->nr_active < cwq->max_active)) {
cwq->nr_active++;
- worklist = &cwq->worklist;
+ worklist = gcwq_determine_ins_pos(gcwq, cwq);
} else
worklist = &cwq->delayed_works;
BUG_ON(!list_empty(&worker->entry) &&
(worker->hentry.next || worker->hentry.pprev));
+ /* can't use worker_set_flags(), also called from start_worker() */
worker->flags |= WORKER_IDLE;
gcwq->nr_idle++;
+ worker->last_active = jiffies;
/* idle_list is LIFO */
list_add(&worker->entry, &gcwq->idle_list);
- if (unlikely(worker->flags & WORKER_ROGUE))
+ if (likely(!(worker->flags & WORKER_ROGUE))) {
+ if (too_many_workers(gcwq) && !timer_pending(&gcwq->idle_timer))
+ mod_timer(&gcwq->idle_timer,
+ jiffies + IDLE_WORKER_TIMEOUT);
+ } else
wake_up_all(&gcwq->trustee_wait);
+
+ /* sanity check nr_running */
+ WARN_ON_ONCE(gcwq->nr_workers == gcwq->nr_idle &&
+ atomic_read(get_gcwq_nr_running(gcwq->cpu)));
}
/**
struct global_cwq *gcwq = worker->gcwq;
BUG_ON(!(worker->flags & WORKER_IDLE));
- worker->flags &= ~WORKER_IDLE;
+ worker_clr_flags(worker, WORKER_IDLE);
gcwq->nr_idle--;
list_del_init(&worker->entry);
}
+/**
+ * worker_maybe_bind_and_lock - bind worker to its cpu if possible and lock gcwq
+ * @worker: self
+ *
+ * Works which are scheduled while the cpu is online must at least be
+ * scheduled to a worker which is bound to the cpu so that if they are
+ * flushed from cpu callbacks while cpu is going down, they are
+ * guaranteed to execute on the cpu.
+ *
+ * This function is to be used by rogue workers and rescuers to bind
+ * themselves to the target cpu and may race with cpu going down or
+ * coming online. kthread_bind() can't be used because it may put the
+ * worker to already dead cpu and set_cpus_allowed_ptr() can't be used
+ * verbatim as it's best effort and blocking and gcwq may be
+ * [dis]associated in the meantime.
+ *
+ * This function tries set_cpus_allowed() and locks gcwq and verifies
+ * the binding against GCWQ_DISASSOCIATED which is set during
+ * CPU_DYING and cleared during CPU_ONLINE, so if the worker enters
+ * idle state or fetches works without dropping lock, it can guarantee
+ * the scheduling requirement described in the first paragraph.
+ *
+ * CONTEXT:
+ * Might sleep. Called without any lock but returns with gcwq->lock
+ * held.
+ *
+ * RETURNS:
+ * %true if the associated gcwq is online (@worker is successfully
+ * bound), %false if offline.
+ */
+static bool worker_maybe_bind_and_lock(struct worker *worker)
+{
+ struct global_cwq *gcwq = worker->gcwq;
+ struct task_struct *task = worker->task;
+
+ while (true) {
+ /*
+ * The following call may fail, succeed or succeed
+ * without actually migrating the task to the cpu if
+ * it races with cpu hotunplug operation. Verify
+ * against GCWQ_DISASSOCIATED.
+ */
+ set_cpus_allowed_ptr(task, get_cpu_mask(gcwq->cpu));
+
+ spin_lock_irq(&gcwq->lock);
+ if (gcwq->flags & GCWQ_DISASSOCIATED)
+ return false;
+ if (task_cpu(task) == gcwq->cpu &&
+ cpumask_equal(¤t->cpus_allowed,
+ get_cpu_mask(gcwq->cpu)))
+ return true;
+ spin_unlock_irq(&gcwq->lock);
+
+ /* CPU has come up inbetween, retry migration */
+ cpu_relax();
+ }
+}
+
+/*
+ * Function for worker->rebind_work used to rebind rogue busy workers
+ * to the associated cpu which is coming back online. This is
+ * scheduled by cpu up but can race with other cpu hotplug operations
+ * and may be executed twice without intervening cpu down.
+ */
+static void worker_rebind_fn(struct work_struct *work)
+{
+ struct worker *worker = container_of(work, struct worker, rebind_work);
+ struct global_cwq *gcwq = worker->gcwq;
+
+ if (worker_maybe_bind_and_lock(worker))
+ worker_clr_flags(worker, WORKER_REBIND);
+
+ spin_unlock_irq(&gcwq->lock);
+}
+
static struct worker *alloc_worker(void)
{
struct worker *worker;
if (worker) {
INIT_LIST_HEAD(&worker->entry);
INIT_LIST_HEAD(&worker->scheduled);
+ INIT_WORK(&worker->rebind_work, worker_rebind_fn);
+ /* on creation a worker is in !idle && prep state */
+ worker->flags = WORKER_PREP;
}
return worker;
}
/**
* create_worker - create a new workqueue worker
- * @cwq: cwq the new worker will belong to
+ * @gcwq: gcwq the new worker will belong to
* @bind: whether to set affinity to @cpu or not
*
- * Create a new worker which is bound to @cwq. The returned worker
+ * Create a new worker which is bound to @gcwq. The returned worker
* can be started by calling start_worker() or destroyed using
* destroy_worker().
*
* RETURNS:
* Pointer to the newly created worker.
*/
-static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
+static struct worker *create_worker(struct global_cwq *gcwq, bool bind)
{
- struct global_cwq *gcwq = cwq->gcwq;
int id = -1;
struct worker *worker = NULL;
goto fail;
worker->gcwq = gcwq;
- worker->cwq = cwq;
worker->id = id;
worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
}
/**
- * destroy_worker - destroy a workqueue worker
- * @worker: worker to be destroyed
+ * destroy_worker - destroy a workqueue worker
+ * @worker: worker to be destroyed
+ *
+ * Destroy @worker and adjust @gcwq stats accordingly.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which is released and regrabbed.
+ */
+static void destroy_worker(struct worker *worker)
+{
+ struct global_cwq *gcwq = worker->gcwq;
+ int id = worker->id;
+
+ /* sanity check frenzy */
+ BUG_ON(worker->current_work);
+ BUG_ON(!list_empty(&worker->scheduled));
+
+ if (worker->flags & WORKER_STARTED)
+ gcwq->nr_workers--;
+ if (worker->flags & WORKER_IDLE)
+ gcwq->nr_idle--;
+
+ list_del_init(&worker->entry);
+ worker->flags |= WORKER_DIE;
+
+ spin_unlock_irq(&gcwq->lock);
+
+ kthread_stop(worker->task);
+ kfree(worker);
+
+ spin_lock_irq(&gcwq->lock);
+ ida_remove(&gcwq->worker_ida, id);
+}
+
+static void idle_worker_timeout(unsigned long __gcwq)
+{
+ struct global_cwq *gcwq = (void *)__gcwq;
+
+ spin_lock_irq(&gcwq->lock);
+
+ if (too_many_workers(gcwq)) {
+ struct worker *worker;
+ unsigned long expires;
+
+ /* idle_list is kept in LIFO order, check the last one */
+ worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
+ expires = worker->last_active + IDLE_WORKER_TIMEOUT;
+
+ if (time_before(jiffies, expires))
+ mod_timer(&gcwq->idle_timer, expires);
+ else {
+ /* it's been idle for too long, wake up manager */
+ gcwq->flags |= GCWQ_MANAGE_WORKERS;
+ wake_up_worker(gcwq);
+ }
+ }
+
+ spin_unlock_irq(&gcwq->lock);
+}
+
+static bool send_mayday(struct work_struct *work)
+{
+ struct cpu_workqueue_struct *cwq = get_work_cwq(work);
+ struct workqueue_struct *wq = cwq->wq;
+
+ if (!(wq->flags & WQ_RESCUER))
+ return false;
+
+ /* mayday mayday mayday */
+ if (!cpumask_test_and_set_cpu(cwq->gcwq->cpu, wq->mayday_mask))
+ wake_up_process(wq->rescuer->task);
+ return true;
+}
+
+static void gcwq_mayday_timeout(unsigned long __gcwq)
+{
+ struct global_cwq *gcwq = (void *)__gcwq;
+ struct work_struct *work;
+
+ spin_lock_irq(&gcwq->lock);
+
+ if (need_to_create_worker(gcwq)) {
+ /*
+ * We've been trying to create a new worker but
+ * haven't been successful. We might be hitting an
+ * allocation deadlock. Send distress signals to
+ * rescuers.
+ */
+ list_for_each_entry(work, &gcwq->worklist, entry)
+ send_mayday(work);
+ }
+
+ spin_unlock_irq(&gcwq->lock);
+
+ mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INTERVAL);
+}
+
+/**
+ * maybe_create_worker - create a new worker if necessary
+ * @gcwq: gcwq to create a new worker for
+ *
+ * Create a new worker for @gcwq if necessary. @gcwq is guaranteed to
+ * have at least one idle worker on return from this function. If
+ * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
+ * sent to all rescuers with works scheduled on @gcwq to resolve
+ * possible allocation deadlock.
+ *
+ * On return, need_to_create_worker() is guaranteed to be false and
+ * may_start_working() true.
+ *
+ * LOCKING:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times. Does GFP_KERNEL allocations. Called only from
+ * manager.
+ *
+ * RETURNS:
+ * false if no action was taken and gcwq->lock stayed locked, true
+ * otherwise.
+ */
+static bool maybe_create_worker(struct global_cwq *gcwq)
+{
+ if (!need_to_create_worker(gcwq))
+ return false;
+restart:
+ /* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
+ mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);
+
+ while (true) {
+ struct worker *worker;
+
+ spin_unlock_irq(&gcwq->lock);
+
+ worker = create_worker(gcwq, true);
+ if (worker) {
+ del_timer_sync(&gcwq->mayday_timer);
+ spin_lock_irq(&gcwq->lock);
+ start_worker(worker);
+ BUG_ON(need_to_create_worker(gcwq));
+ return true;
+ }
+
+ if (!need_to_create_worker(gcwq))
+ break;
+
+ spin_unlock_irq(&gcwq->lock);
+ __set_current_state(TASK_INTERRUPTIBLE);
+ schedule_timeout(CREATE_COOLDOWN);
+ spin_lock_irq(&gcwq->lock);
+ if (!need_to_create_worker(gcwq))
+ break;
+ }
+
+ spin_unlock_irq(&gcwq->lock);
+ del_timer_sync(&gcwq->mayday_timer);
+ spin_lock_irq(&gcwq->lock);
+ if (need_to_create_worker(gcwq))
+ goto restart;
+ return true;
+}
+
+/**
+ * maybe_destroy_worker - destroy workers which have been idle for a while
+ * @gcwq: gcwq to destroy workers for
+ *
+ * Destroy @gcwq workers which have been idle for longer than
+ * IDLE_WORKER_TIMEOUT.
+ *
+ * LOCKING:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times. Called only from manager.
+ *
+ * RETURNS:
+ * false if no action was taken and gcwq->lock stayed locked, true
+ * otherwise.
+ */
+static bool maybe_destroy_workers(struct global_cwq *gcwq)
+{
+ bool ret = false;
+
+ while (too_many_workers(gcwq)) {
+ struct worker *worker;
+ unsigned long expires;
+
+ worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
+ expires = worker->last_active + IDLE_WORKER_TIMEOUT;
+
+ if (time_before(jiffies, expires)) {
+ mod_timer(&gcwq->idle_timer, expires);
+ break;
+ }
+
+ destroy_worker(worker);
+ ret = true;
+ }
+
+ return ret;
+}
+
+/**
+ * manage_workers - manage worker pool
+ * @worker: self
*
- * Destroy @worker and adjust @gcwq stats accordingly.
+ * Assume the manager role and manage gcwq worker pool @worker belongs
+ * to. At any given time, there can be only zero or one manager per
+ * gcwq. The exclusion is handled automatically by this function.
+ *
+ * The caller can safely start processing works on false return. On
+ * true return, it's guaranteed that need_to_create_worker() is false
+ * and may_start_working() is true.
*
* CONTEXT:
- * spin_lock_irq(gcwq->lock) which is released and regrabbed.
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times. Does GFP_KERNEL allocations.
+ *
+ * RETURNS:
+ * false if no action was taken and gcwq->lock stayed locked, true if
+ * some action was taken.
*/
-static void destroy_worker(struct worker *worker)
+static bool manage_workers(struct worker *worker)
{
struct global_cwq *gcwq = worker->gcwq;
- int id = worker->id;
+ bool ret = false;
- /* sanity check frenzy */
- BUG_ON(worker->current_work);
- BUG_ON(!list_empty(&worker->scheduled));
+ if (gcwq->flags & GCWQ_MANAGING_WORKERS)
+ return ret;
- if (worker->flags & WORKER_STARTED)
- gcwq->nr_workers--;
- if (worker->flags & WORKER_IDLE)
- gcwq->nr_idle--;
+ gcwq->flags &= ~GCWQ_MANAGE_WORKERS;
+ gcwq->flags |= GCWQ_MANAGING_WORKERS;
- list_del_init(&worker->entry);
- worker->flags |= WORKER_DIE;
+ /*
+ * Destroy and then create so that may_start_working() is true
+ * on return.
+ */
+ ret |= maybe_destroy_workers(gcwq);
+ ret |= maybe_create_worker(gcwq);
- spin_unlock_irq(&gcwq->lock);
+ gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
- kthread_stop(worker->task);
- kfree(worker);
+ /*
+ * The trustee might be waiting to take over the manager
+ * position, tell it we're done.
+ */
+ if (unlikely(gcwq->trustee))
+ wake_up_all(&gcwq->trustee_wait);
- spin_lock_irq(&gcwq->lock);
- ida_remove(&gcwq->worker_ida, id);
+ return ret;
}
/**
{
struct work_struct *work = list_first_entry(&cwq->delayed_works,
struct work_struct, entry);
+ struct list_head *pos = gcwq_determine_ins_pos(cwq->gcwq, cwq);
- move_linked_works(work, &cwq->worklist, NULL);
+ move_linked_works(work, pos, NULL);
cwq->nr_active++;
}
*/
static void process_one_work(struct worker *worker, struct work_struct *work)
{
- struct cpu_workqueue_struct *cwq = worker->cwq;
+ struct cpu_workqueue_struct *cwq = get_work_cwq(work);
struct global_cwq *gcwq = cwq->gcwq;
struct hlist_head *bwh = busy_worker_head(gcwq, work);
+ bool cpu_intensive = cwq->wq->flags & WQ_CPU_INTENSIVE;
work_func_t f = work->func;
int work_color;
+ struct worker *collision;
#ifdef CONFIG_LOCKDEP
/*
* It is permissible to free the struct work_struct from
*/
struct lockdep_map lockdep_map = work->lockdep_map;
#endif
+ /*
+ * A single work shouldn't be executed concurrently by
+ * multiple workers on a single cpu. Check whether anyone is
+ * already processing the work. If so, defer the work to the
+ * currently executing one.
+ */
+ collision = __find_worker_executing_work(gcwq, bwh, work);
+ if (unlikely(collision)) {
+ move_linked_works(work, &collision->scheduled, NULL);
+ return;
+ }
+
/* claim and process */
debug_work_deactivate(work);
hlist_add_head(&worker->hentry, bwh);
worker->current_cwq = cwq;
work_color = get_work_color(work);
- BUG_ON(get_work_cwq(work) != cwq);
/* record the current cpu number in the work data and dequeue */
set_work_cpu(work, gcwq->cpu);
list_del_init(&work->entry);
+ /*
+ * If HIGHPRI_PENDING, check the next work, and, if HIGHPRI,
+ * wake up another worker; otherwise, clear HIGHPRI_PENDING.
+ */
+ if (unlikely(gcwq->flags & GCWQ_HIGHPRI_PENDING)) {
+ struct work_struct *nwork = list_first_entry(&gcwq->worklist,
+ struct work_struct, entry);
+
+ if (!list_empty(&gcwq->worklist) &&
+ get_work_cwq(nwork)->wq->flags & WQ_HIGHPRI)
+ wake_up_worker(gcwq);
+ else
+ gcwq->flags &= ~GCWQ_HIGHPRI_PENDING;
+ }
+
+ /*
+ * CPU intensive works don't participate in concurrency
+ * management. They're the scheduler's responsibility.
+ */
+ if (unlikely(cpu_intensive))
+ worker_set_flags(worker, WORKER_CPU_INTENSIVE, true);
+
spin_unlock_irq(&gcwq->lock);
work_clear_pending(work);
spin_lock_irq(&gcwq->lock);
+ /* clear cpu intensive status */
+ if (unlikely(cpu_intensive))
+ worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
+
/* we're done with it, release */
hlist_del_init(&worker->hentry);
worker->current_work = NULL;
* worker_thread - the worker thread function
* @__worker: self
*
- * The cwq worker thread function.
+ * The gcwq worker thread function. There's a single dynamic pool of
+ * these per each cpu. These workers process all works regardless of
+ * their specific target workqueue. The only exception is works which
+ * belong to workqueues with a rescuer which will be explained in
+ * rescuer_thread().
*/
static int worker_thread(void *__worker)
{
struct worker *worker = __worker;
struct global_cwq *gcwq = worker->gcwq;
- struct cpu_workqueue_struct *cwq = worker->cwq;
+ /* tell the scheduler that this is a workqueue worker */
+ worker->task->flags |= PF_WQ_WORKER;
woke_up:
spin_lock_irq(&gcwq->lock);
/* DIE can be set only while we're idle, checking here is enough */
if (worker->flags & WORKER_DIE) {
spin_unlock_irq(&gcwq->lock);
+ worker->task->flags &= ~PF_WQ_WORKER;
return 0;
}
worker_leave_idle(worker);
recheck:
+ /* no more worker necessary? */
+ if (!need_more_worker(gcwq))
+ goto sleep;
+
+ /* do we need to manage? */
+ if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))
+ goto recheck;
+
/*
* ->scheduled list can only be filled while a worker is
* preparing to process a work or actually processing it.
*/
BUG_ON(!list_empty(&worker->scheduled));
- while (!list_empty(&cwq->worklist)) {
+ /*
+ * When control reaches this point, we're guaranteed to have
+ * at least one idle worker or that someone else has already
+ * assumed the manager role.
+ */
+ worker_clr_flags(worker, WORKER_PREP);
+
+ do {
struct work_struct *work =
- list_first_entry(&cwq->worklist,
+ list_first_entry(&gcwq->worklist,
struct work_struct, entry);
- /*
- * The following is a rather inefficient way to close
- * race window against cpu hotplug operations. Will
- * be replaced soon.
- */
- if (unlikely(!(worker->flags & WORKER_ROGUE) &&
- !cpumask_equal(&worker->task->cpus_allowed,
- get_cpu_mask(gcwq->cpu)))) {
- spin_unlock_irq(&gcwq->lock);
- set_cpus_allowed_ptr(worker->task,
- get_cpu_mask(gcwq->cpu));
- cpu_relax();
- spin_lock_irq(&gcwq->lock);
- goto recheck;
- }
-
if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
/* optimization path, not strictly necessary */
process_one_work(worker, work);
move_linked_works(work, &worker->scheduled, NULL);
process_scheduled_works(worker);
}
- }
+ } while (keep_working(gcwq));
+
+ worker_set_flags(worker, WORKER_PREP, false);
+sleep:
+ if (unlikely(need_to_manage_workers(gcwq)) && manage_workers(worker))
+ goto recheck;
/*
- * gcwq->lock is held and there's no work to process, sleep.
- * Workers are woken up only while holding gcwq->lock, so
- * setting the current state before releasing gcwq->lock is
- * enough to prevent losing any event.
+ * gcwq->lock is held and there's no work to process and no
+ * need to manage, sleep. Workers are woken up only while
+ * holding gcwq->lock or from local cpu, so setting the
+ * current state before releasing gcwq->lock is enough to
+ * prevent losing any event.
*/
worker_enter_idle(worker);
__set_current_state(TASK_INTERRUPTIBLE);
goto woke_up;
}
+/**
+ * rescuer_thread - the rescuer thread function
+ * @__wq: the associated workqueue
+ *
+ * Workqueue rescuer thread function. There's one rescuer for each
+ * workqueue which has WQ_RESCUER set.
+ *
+ * Regular work processing on a gcwq may block trying to create a new
+ * worker which uses GFP_KERNEL allocation which has slight chance of
+ * developing into deadlock if some works currently on the same queue
+ * need to be processed to satisfy the GFP_KERNEL allocation. This is
+ * the problem rescuer solves.
+ *
+ * When such condition is possible, the gcwq summons rescuers of all
+ * workqueues which have works queued on the gcwq and let them process
+ * those works so that forward progress can be guaranteed.
+ *
+ * This should happen rarely.
+ */
+static int rescuer_thread(void *__wq)
+{
+ struct workqueue_struct *wq = __wq;
+ struct worker *rescuer = wq->rescuer;
+ struct list_head *scheduled = &rescuer->scheduled;
+ unsigned int cpu;
+
+ set_user_nice(current, RESCUER_NICE_LEVEL);
+repeat:
+ set_current_state(TASK_INTERRUPTIBLE);
+
+ if (kthread_should_stop())
+ return 0;
+
+ for_each_cpu(cpu, wq->mayday_mask) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+ struct global_cwq *gcwq = cwq->gcwq;
+ struct work_struct *work, *n;
+
+ __set_current_state(TASK_RUNNING);
+ cpumask_clear_cpu(cpu, wq->mayday_mask);
+
+ /* migrate to the target cpu if possible */
+ rescuer->gcwq = gcwq;
+ worker_maybe_bind_and_lock(rescuer);
+
+ /*
+ * Slurp in all works issued via this workqueue and
+ * process'em.
+ */
+ BUG_ON(!list_empty(&rescuer->scheduled));
+ list_for_each_entry_safe(work, n, &gcwq->worklist, entry)
+ if (get_work_cwq(work) == cwq)
+ move_linked_works(work, scheduled, &n);
+
+ process_scheduled_works(rescuer);
+ spin_unlock_irq(&gcwq->lock);
+ }
+
+ schedule();
+ goto repeat;
+}
+
struct wq_barrier {
struct work_struct work;
struct completion done;
mutex_lock(&wq->flush_mutex);
+ /* we might have raced, check again with mutex held */
+ if (wq->first_flusher != &this_flusher)
+ goto out_unlock;
+
wq->first_flusher = NULL;
BUG_ON(!list_empty(&this_flusher.list));
}
EXPORT_SYMBOL(cancel_delayed_work_sync);
-static struct workqueue_struct *keventd_wq __read_mostly;
-
/**
* schedule_work - put work task in global workqueue
* @work: job to be done
*/
int schedule_work(struct work_struct *work)
{
- return queue_work(keventd_wq, work);
+ return queue_work(system_wq, work);
}
EXPORT_SYMBOL(schedule_work);
*/
int schedule_work_on(int cpu, struct work_struct *work)
{
- return queue_work_on(cpu, keventd_wq, work);
+ return queue_work_on(cpu, system_wq, work);
}
EXPORT_SYMBOL(schedule_work_on);
int schedule_delayed_work(struct delayed_work *dwork,
unsigned long delay)
{
- return queue_delayed_work(keventd_wq, dwork, delay);
+ return queue_delayed_work(system_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);
int schedule_delayed_work_on(int cpu,
struct delayed_work *dwork, unsigned long delay)
{
- return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
+ return queue_delayed_work_on(cpu, system_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);
int schedule_on_each_cpu(work_func_t func)
{
int cpu;
- int orig = -1;
struct work_struct *works;
works = alloc_percpu(struct work_struct);
get_online_cpus();
- /*
- * When running in keventd don't schedule a work item on
- * itself. Can just call directly because the work queue is
- * already bound. This also is faster.
- */
- if (current_is_keventd())
- orig = raw_smp_processor_id();
-
for_each_online_cpu(cpu) {
struct work_struct *work = per_cpu_ptr(works, cpu);
INIT_WORK(work, func);
- if (cpu != orig)
- schedule_work_on(cpu, work);
+ schedule_work_on(cpu, work);
}
- if (orig >= 0)
- func(per_cpu_ptr(works, orig));
for_each_online_cpu(cpu)
flush_work(per_cpu_ptr(works, cpu));
*/
void flush_scheduled_work(void)
{
- flush_workqueue(keventd_wq);
+ flush_workqueue(system_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);
int keventd_up(void)
{
- return keventd_wq != NULL;
-}
-
-int current_is_keventd(void)
-{
- struct cpu_workqueue_struct *cwq;
- int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
- int ret = 0;
-
- BUG_ON(!keventd_wq);
-
- cwq = get_cwq(cpu, keventd_wq);
- if (current == cwq->worker->task)
- ret = 1;
-
- return ret;
-
+ return system_wq != NULL;
}
static struct cpu_workqueue_struct *alloc_cwqs(void)
#endif
}
-struct workqueue_struct *__create_workqueue_key(const char *name,
- unsigned int flags,
- int max_active,
- struct lock_class_key *key,
- const char *lock_name)
+static int wq_clamp_max_active(int max_active, const char *name)
+{
+ if (max_active < 1 || max_active > WQ_MAX_ACTIVE)
+ printk(KERN_WARNING "workqueue: max_active %d requested for %s "
+ "is out of range, clamping between %d and %d\n",
+ max_active, name, 1, WQ_MAX_ACTIVE);
+
+ return clamp_val(max_active, 1, WQ_MAX_ACTIVE);
+}
+
+struct workqueue_struct *__alloc_workqueue_key(const char *name,
+ unsigned int flags,
+ int max_active,
+ struct lock_class_key *key,
+ const char *lock_name)
{
struct workqueue_struct *wq;
- bool failed = false;
unsigned int cpu;
- max_active = clamp_val(max_active, 1, INT_MAX);
+ max_active = max_active ?: WQ_DFL_ACTIVE;
+ max_active = wq_clamp_max_active(max_active, name);
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
INIT_LIST_HEAD(&wq->list);
- cpu_maps_update_begin();
- /*
- * We must initialize cwqs for each possible cpu even if we
- * are going to call destroy_workqueue() finally. Otherwise
- * cpu_up() can hit the uninitialized cwq once we drop the
- * lock.
- */
for_each_possible_cpu(cpu) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
struct global_cwq *gcwq = get_gcwq(cpu);
cwq->wq = wq;
cwq->flush_color = -1;
cwq->max_active = max_active;
- INIT_LIST_HEAD(&cwq->worklist);
INIT_LIST_HEAD(&cwq->delayed_works);
+ }
- if (failed)
- continue;
- cwq->worker = create_worker(cwq, cpu_online(cpu));
- if (cwq->worker)
- start_worker(cwq->worker);
- else
- failed = true;
+ if (flags & WQ_RESCUER) {
+ struct worker *rescuer;
+
+ if (!alloc_cpumask_var(&wq->mayday_mask, GFP_KERNEL))
+ goto err;
+
+ wq->rescuer = rescuer = alloc_worker();
+ if (!rescuer)
+ goto err;
+
+ rescuer->task = kthread_create(rescuer_thread, wq, "%s", name);
+ if (IS_ERR(rescuer->task))
+ goto err;
+
+ wq->rescuer = rescuer;
+ rescuer->task->flags |= PF_THREAD_BOUND;
+ wake_up_process(rescuer->task);
}
/*
spin_unlock(&workqueue_lock);
- cpu_maps_update_done();
-
- if (failed) {
- destroy_workqueue(wq);
- wq = NULL;
- }
return wq;
err:
if (wq) {
free_cwqs(wq->cpu_wq);
+ free_cpumask_var(wq->mayday_mask);
+ kfree(wq->rescuer);
kfree(wq);
}
return NULL;
}
-EXPORT_SYMBOL_GPL(__create_workqueue_key);
+EXPORT_SYMBOL_GPL(__alloc_workqueue_key);
/**
* destroy_workqueue - safely terminate a workqueue
* wq list is used to freeze wq, remove from list after
* flushing is complete in case freeze races us.
*/
- cpu_maps_update_begin();
spin_lock(&workqueue_lock);
list_del(&wq->list);
spin_unlock(&workqueue_lock);
- cpu_maps_update_done();
+ /* sanity check */
for_each_possible_cpu(cpu) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
int i;
- if (cwq->worker) {
- spin_lock_irq(&cwq->gcwq->lock);
- destroy_worker(cwq->worker);
- cwq->worker = NULL;
- spin_unlock_irq(&cwq->gcwq->lock);
- }
-
for (i = 0; i < WORK_NR_COLORS; i++)
BUG_ON(cwq->nr_in_flight[i]);
BUG_ON(cwq->nr_active);
BUG_ON(!list_empty(&cwq->delayed_works));
}
+ if (wq->flags & WQ_RESCUER) {
+ kthread_stop(wq->rescuer->task);
+ free_cpumask_var(wq->mayday_mask);
+ }
+
free_cwqs(wq->cpu_wq);
kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);
+/**
+ * workqueue_set_max_active - adjust max_active of a workqueue
+ * @wq: target workqueue
+ * @max_active: new max_active value.
+ *
+ * Set max_active of @wq to @max_active.
+ *
+ * CONTEXT:
+ * Don't call from IRQ context.
+ */
+void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
+{
+ unsigned int cpu;
+
+ max_active = wq_clamp_max_active(max_active, wq->name);
+
+ spin_lock(&workqueue_lock);
+
+ wq->saved_max_active = max_active;
+
+ for_each_possible_cpu(cpu) {
+ struct global_cwq *gcwq = get_gcwq(cpu);
+
+ spin_lock_irq(&gcwq->lock);
+
+ if (!(wq->flags & WQ_FREEZEABLE) ||
+ !(gcwq->flags & GCWQ_FREEZING))
+ get_cwq(gcwq->cpu, wq)->max_active = max_active;
+
+ spin_unlock_irq(&gcwq->lock);
+ }
+
+ spin_unlock(&workqueue_lock);
+}
+EXPORT_SYMBOL_GPL(workqueue_set_max_active);
+
+/**
+ * workqueue_congested - test whether a workqueue is congested
+ * @cpu: CPU in question
+ * @wq: target workqueue
+ *
+ * Test whether @wq's cpu workqueue for @cpu is congested. There is
+ * no synchronization around this function and the test result is
+ * unreliable and only useful as advisory hints or for debugging.
+ *
+ * RETURNS:
+ * %true if congested, %false otherwise.
+ */
+bool workqueue_congested(unsigned int cpu, struct workqueue_struct *wq)
+{
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+ return !list_empty(&cwq->delayed_works);
+}
+EXPORT_SYMBOL_GPL(workqueue_congested);
+
+/**
+ * work_cpu - return the last known associated cpu for @work
+ * @work: the work of interest
+ *
+ * RETURNS:
+ * CPU number if @work was ever queued. NR_CPUS otherwise.
+ */
+unsigned int work_cpu(struct work_struct *work)
+{
+ struct global_cwq *gcwq = get_work_gcwq(work);
+
+ return gcwq ? gcwq->cpu : NR_CPUS;
+}
+EXPORT_SYMBOL_GPL(work_cpu);
+
+/**
+ * work_busy - test whether a work is currently pending or running
+ * @work: the work to be tested
+ *
+ * Test whether @work is currently pending or running. There is no
+ * synchronization around this function and the test result is
+ * unreliable and only useful as advisory hints or for debugging.
+ * Especially for reentrant wqs, the pending state might hide the
+ * running state.
+ *
+ * RETURNS:
+ * OR'd bitmask of WORK_BUSY_* bits.
+ */
+unsigned int work_busy(struct work_struct *work)
+{
+ struct global_cwq *gcwq = get_work_gcwq(work);
+ unsigned long flags;
+ unsigned int ret = 0;
+
+ if (!gcwq)
+ return false;
+
+ spin_lock_irqsave(&gcwq->lock, flags);
+
+ if (work_pending(work))
+ ret |= WORK_BUSY_PENDING;
+ if (find_worker_executing_work(gcwq, work))
+ ret |= WORK_BUSY_RUNNING;
+
+ spin_unlock_irqrestore(&gcwq->lock, flags);
+
+ return ret;
+}
+EXPORT_SYMBOL_GPL(work_busy);
+
/*
* CPU hotplug.
*
- * CPU hotplug is implemented by allowing cwqs to be detached from
- * CPU, running with unbound workers and allowing them to be
- * reattached later if the cpu comes back online. A separate thread
- * is created to govern cwqs in such state and is called the trustee.
+ * There are two challenges in supporting CPU hotplug. Firstly, there
+ * are a lot of assumptions on strong associations among work, cwq and
+ * gcwq which make migrating pending and scheduled works very
+ * difficult to implement without impacting hot paths. Secondly,
+ * gcwqs serve mix of short, long and very long running works making
+ * blocked draining impractical.
+ *
+ * This is solved by allowing a gcwq to be detached from CPU, running
+ * it with unbound (rogue) workers and allowing it to be reattached
+ * later if the cpu comes back online. A separate thread is created
+ * to govern a gcwq in such state and is called the trustee of the
+ * gcwq.
*
* Trustee states and their descriptions.
*
* new trustee is started with this state.
*
* IN_CHARGE Once started, trustee will enter this state after
- * making all existing workers rogue. DOWN_PREPARE waits
- * for trustee to enter this state. After reaching
- * IN_CHARGE, trustee tries to execute the pending
- * worklist until it's empty and the state is set to
- * BUTCHER, or the state is set to RELEASE.
+ * assuming the manager role and making all existing
+ * workers rogue. DOWN_PREPARE waits for trustee to
+ * enter this state. After reaching IN_CHARGE, trustee
+ * tries to execute the pending worklist until it's empty
+ * and the state is set to BUTCHER, or the state is set
+ * to RELEASE.
*
* BUTCHER Command state which is set by the cpu callback after
* the cpu has went down. Once this state is set trustee
* RELEASE Command state which is set by the cpu callback if the
* cpu down has been canceled or it has come online
* again. After recognizing this state, trustee stops
- * trying to drain or butcher and transits to DONE.
+ * trying to drain or butcher and clears ROGUE, rebinds
+ * all remaining workers back to the cpu and releases
+ * manager role.
*
* DONE Trustee will enter this state after BUTCHER or RELEASE
* is complete.
{
struct global_cwq *gcwq = __gcwq;
struct worker *worker;
+ struct work_struct *work;
struct hlist_node *pos;
+ long rc;
int i;
BUG_ON(gcwq->cpu != smp_processor_id());
spin_lock_irq(&gcwq->lock);
/*
- * Make all workers rogue. Trustee must be bound to the
- * target cpu and can't be cancelled.
+ * Claim the manager position and make all workers rogue.
+ * Trustee must be bound to the target cpu and can't be
+ * cancelled.
*/
BUG_ON(gcwq->cpu != smp_processor_id());
+ rc = trustee_wait_event(!(gcwq->flags & GCWQ_MANAGING_WORKERS));
+ BUG_ON(rc < 0);
+
+ gcwq->flags |= GCWQ_MANAGING_WORKERS;
list_for_each_entry(worker, &gcwq->idle_list, entry)
worker->flags |= WORKER_ROGUE;
for_each_busy_worker(worker, i, pos, gcwq)
worker->flags |= WORKER_ROGUE;
+ /*
+ * Call schedule() so that we cross rq->lock and thus can
+ * guarantee sched callbacks see the rogue flag. This is
+ * necessary as scheduler callbacks may be invoked from other
+ * cpus.
+ */
+ spin_unlock_irq(&gcwq->lock);
+ schedule();
+ spin_lock_irq(&gcwq->lock);
+
+ /*
+ * Sched callbacks are disabled now. Zap nr_running. After
+ * this, nr_running stays zero and need_more_worker() and
+ * keep_working() are always true as long as the worklist is
+ * not empty.
+ */
+ atomic_set(get_gcwq_nr_running(gcwq->cpu), 0);
+
+ spin_unlock_irq(&gcwq->lock);
+ del_timer_sync(&gcwq->idle_timer);
+ spin_lock_irq(&gcwq->lock);
+
/*
* We're now in charge. Notify and proceed to drain. We need
* to keep the gcwq running during the whole CPU down
/*
* The original cpu is in the process of dying and may go away
* anytime now. When that happens, we and all workers would
- * be migrated to other cpus. Try draining any left work.
- * Note that if the gcwq is frozen, there may be frozen works
- * in freezeable cwqs. Don't declare completion while frozen.
+ * be migrated to other cpus. Try draining any left work. We
+ * want to get it over with ASAP - spam rescuers, wake up as
+ * many idlers as necessary and create new ones till the
+ * worklist is empty. Note that if the gcwq is frozen, there
+ * may be frozen works in freezeable cwqs. Don't declare
+ * completion while frozen.
*/
while (gcwq->nr_workers != gcwq->nr_idle ||
gcwq->flags & GCWQ_FREEZING ||
gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
+ int nr_works = 0;
+
+ list_for_each_entry(work, &gcwq->worklist, entry) {
+ send_mayday(work);
+ nr_works++;
+ }
+
+ list_for_each_entry(worker, &gcwq->idle_list, entry) {
+ if (!nr_works--)
+ break;
+ wake_up_process(worker->task);
+ }
+
+ if (need_to_create_worker(gcwq)) {
+ spin_unlock_irq(&gcwq->lock);
+ worker = create_worker(gcwq, false);
+ spin_lock_irq(&gcwq->lock);
+ if (worker) {
+ worker->flags |= WORKER_ROGUE;
+ start_worker(worker);
+ }
+ }
+
/* give a breather */
if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0)
break;
}
+ /*
+ * Either all works have been scheduled and cpu is down, or
+ * cpu down has already been canceled. Wait for and butcher
+ * all workers till we're canceled.
+ */
+ do {
+ rc = trustee_wait_event(!list_empty(&gcwq->idle_list));
+ while (!list_empty(&gcwq->idle_list))
+ destroy_worker(list_first_entry(&gcwq->idle_list,
+ struct worker, entry));
+ } while (gcwq->nr_workers && rc >= 0);
+
+ /*
+ * At this point, either draining has completed and no worker
+ * is left, or cpu down has been canceled or the cpu is being
+ * brought back up. There shouldn't be any idle one left.
+ * Tell the remaining busy ones to rebind once it finishes the
+ * currently scheduled works by scheduling the rebind_work.
+ */
+ WARN_ON(!list_empty(&gcwq->idle_list));
+
+ for_each_busy_worker(worker, i, pos, gcwq) {
+ struct work_struct *rebind_work = &worker->rebind_work;
+
+ /*
+ * Rebind_work may race with future cpu hotplug
+ * operations. Use a separate flag to mark that
+ * rebinding is scheduled.
+ */
+ worker->flags |= WORKER_REBIND;
+ worker->flags &= ~WORKER_ROGUE;
+
+ /* queue rebind_work, wq doesn't matter, use the default one */
+ if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
+ work_data_bits(rebind_work)))
+ continue;
+
+ debug_work_activate(rebind_work);
+ insert_work(get_cwq(gcwq->cpu, system_wq), rebind_work,
+ worker->scheduled.next,
+ work_color_to_flags(WORK_NO_COLOR));
+ }
+
+ /* relinquish manager role */
+ gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
+
/* notify completion */
gcwq->trustee = NULL;
gcwq->trustee_state = TRUSTEE_DONE;
unsigned int cpu = (unsigned long)hcpu;
struct global_cwq *gcwq = get_gcwq(cpu);
struct task_struct *new_trustee = NULL;
- struct worker *worker;
- struct hlist_node *pos;
+ struct worker *uninitialized_var(new_worker);
unsigned long flags;
- int i;
action &= ~CPU_TASKS_FROZEN;
if (IS_ERR(new_trustee))
return notifier_from_errno(PTR_ERR(new_trustee));
kthread_bind(new_trustee, cpu);
+ /* fall through */
+ case CPU_UP_PREPARE:
+ BUG_ON(gcwq->first_idle);
+ new_worker = create_worker(gcwq, false);
+ if (!new_worker) {
+ if (new_trustee)
+ kthread_stop(new_trustee);
+ return NOTIFY_BAD;
+ }
}
/* some are called w/ irq disabled, don't disturb irq status */
gcwq->trustee_state = TRUSTEE_START;
wake_up_process(gcwq->trustee);
wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
+ /* fall through */
+ case CPU_UP_PREPARE:
+ BUG_ON(gcwq->first_idle);
+ gcwq->first_idle = new_worker;
+ break;
+
+ case CPU_DYING:
+ /*
+ * Before this, the trustee and all workers except for
+ * the ones which are still executing works from
+ * before the last CPU down must be on the cpu. After
+ * this, they'll all be diasporas.
+ */
+ gcwq->flags |= GCWQ_DISASSOCIATED;
break;
case CPU_POST_DEAD:
gcwq->trustee_state = TRUSTEE_BUTCHER;
+ /* fall through */
+ case CPU_UP_CANCELED:
+ destroy_worker(gcwq->first_idle);
+ gcwq->first_idle = NULL;
break;
case CPU_DOWN_FAILED:
case CPU_ONLINE:
+ gcwq->flags &= ~GCWQ_DISASSOCIATED;
if (gcwq->trustee_state != TRUSTEE_DONE) {
gcwq->trustee_state = TRUSTEE_RELEASE;
wake_up_process(gcwq->trustee);
wait_trustee_state(gcwq, TRUSTEE_DONE);
}
- /* clear ROGUE from all workers */
- list_for_each_entry(worker, &gcwq->idle_list, entry)
- worker->flags &= ~WORKER_ROGUE;
-
- for_each_busy_worker(worker, i, pos, gcwq)
- worker->flags &= ~WORKER_ROGUE;
+ /*
+ * Trustee is done and there might be no worker left.
+ * Put the first_idle in and request a real manager to
+ * take a look.
+ */
+ spin_unlock_irq(&gcwq->lock);
+ kthread_bind(gcwq->first_idle->task, cpu);
+ spin_lock_irq(&gcwq->lock);
+ gcwq->flags |= GCWQ_MANAGE_WORKERS;
+ start_worker(gcwq->first_idle);
+ gcwq->first_idle = NULL;
break;
}
*
* Start freezing workqueues. After this function returns, all
* freezeable workqueues will queue new works to their frozen_works
- * list instead of the cwq ones.
+ * list instead of gcwq->worklist.
*
* CONTEXT:
* Grabs and releases workqueue_lock and gcwq->lock's.
* thaw_workqueues - thaw workqueues
*
* Thaw workqueues. Normal queueing is restored and all collected
- * frozen works are transferred to their respective cwq worklists.
+ * frozen works are transferred to their respective gcwq worklists.
*
* CONTEXT:
* Grabs and releases workqueue_lock and gcwq->lock's.
if (wq->single_cpu == gcwq->cpu &&
!cwq->nr_active && list_empty(&cwq->delayed_works))
cwq_unbind_single_cpu(cwq);
-
- wake_up_process(cwq->worker->task);
}
+ wake_up_worker(gcwq);
+
spin_unlock_irq(&gcwq->lock);
}
struct global_cwq *gcwq = get_gcwq(cpu);
spin_lock_init(&gcwq->lock);
+ INIT_LIST_HEAD(&gcwq->worklist);
gcwq->cpu = cpu;
INIT_LIST_HEAD(&gcwq->idle_list);
for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
+ init_timer_deferrable(&gcwq->idle_timer);
+ gcwq->idle_timer.function = idle_worker_timeout;
+ gcwq->idle_timer.data = (unsigned long)gcwq;
+
+ setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout,
+ (unsigned long)gcwq);
+
ida_init(&gcwq->worker_ida);
gcwq->trustee_state = TRUSTEE_DONE;
init_waitqueue_head(&gcwq->trustee_wait);
}
- keventd_wq = create_workqueue("events");
- BUG_ON(!keventd_wq);
+ /* create the initial worker */
+ for_each_online_cpu(cpu) {
+ struct global_cwq *gcwq = get_gcwq(cpu);
+ struct worker *worker;
+
+ worker = create_worker(gcwq, true);
+ BUG_ON(!worker);
+ spin_lock_irq(&gcwq->lock);
+ start_worker(worker);
+ spin_unlock_irq(&gcwq->lock);
+ }
+
+ system_wq = alloc_workqueue("events", 0, 0);
+ system_long_wq = alloc_workqueue("events_long", 0, 0);
+ system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
+ BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq);
}