From: Tejun Heo <tj@kernel.org>
To: linux-kernel@vger.kernel.org, jeff@garzik.org, mingo@elte.hu,
akpm@linux-foundation.org, jens.axboe@oracle.com,
rusty@rustcorp.com.au, cl@linux-foundation.org,
dhowells@redhat.com, arjan@linux.intel.com,
torvalds@linux-foundation.org, avi@redhat.com,
peterz@infradead.org, andi@firstfloor.org, fweisbec@gmail.com
Cc: Tejun Heo <tj@kernel.org>
Subject: [PATCH 19/21] workqueue: introduce worker
Date: Tue, 17 Nov 2009 02:15:24 +0900 [thread overview]
Message-ID: <1258391726-30264-20-git-send-email-tj@kernel.org> (raw)
In-Reply-To: <1258391726-30264-1-git-send-email-tj@kernel.org>
Separate out worker thread related information to struct worker from
struct cpu_workqueue_struct and implement helper functions to deal
with the new struct worker. The only change which is visible outside
is that now workqueue worker are all named "kworker/CPUID:WORKERID"
where WORKERID is allocated from per-cpu ida.
This is in preparation of concurrency managed workqueue where shared
multiple workers would be available per cpu.
Signed-off-by: Tejun Heo <tj@kernel.org>
---
kernel/workqueue.c | 219 +++++++++++++++++++++++++++++++++++++---------------
1 files changed, 157 insertions(+), 62 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index dce5ad5..6694b3e 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -34,6 +34,7 @@
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#include <linux/mutex.h>
+#include <linux/idr.h>
/*
* Structure fields follow one of the following exclusion rules.
@@ -47,6 +48,15 @@
* W: workqueue_lock protected.
*/
+struct cpu_workqueue_struct;
+
+struct worker {
+ struct work_struct *current_work; /* L: work being processed */
+ struct task_struct *task; /* I: worker task */
+ struct cpu_workqueue_struct *cwq; /* I: the associated cwq */
+ int id; /* I: worker id */
+};
+
/*
* The per-CPU workqueue (if single thread, we always use the first
* possible cpu). The lower WORK_STRUCT_FLAG_BITS of
@@ -59,14 +69,14 @@ struct cpu_workqueue_struct {
struct list_head worklist;
wait_queue_head_t more_work;
- struct work_struct *current_work;
+ unsigned int cpu;
+ struct worker *worker;
struct workqueue_struct *wq; /* I: the owning workqueue */
int work_color; /* L: current color */
int flush_color; /* L: flushing color */
int nr_in_flight[WORK_COLORS];
/* L: nr of in_flight works */
- struct task_struct *thread;
} __attribute__((aligned(1 << WORK_STRUCT_FLAG_BITS)));
/*
@@ -215,6 +225,9 @@ static inline void debug_work_deactivate(struct work_struct *work) { }
/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);
+static DEFINE_PER_CPU(struct ida, worker_ida);
+
+static int worker_thread(void *__worker);
static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
struct workqueue_struct *wq)
@@ -413,6 +426,105 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
+static struct worker *alloc_worker(void)
+{
+ struct worker *worker;
+
+ worker = kzalloc(sizeof(*worker), GFP_KERNEL);
+ return worker;
+}
+
+/**
+ * create_worker - create a new workqueue worker
+ * @cwq: cwq the new worker will belong to
+ * @bind: whether to set affinity to @cpu or not
+ *
+ * Create a new worker which is bound to @cwq. The returned worker
+ * can be started by calling start_worker() or destroyed using
+ * destroy_worker().
+ *
+ * CONTEXT:
+ * Might sleep. Does GFP_KERNEL allocations.
+ *
+ * RETURNS:
+ * Pointer to the newly created worker.
+ */
+static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
+{
+ int id = -1;
+ struct worker *worker = NULL;
+
+ spin_lock(&workqueue_lock);
+ while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
+ spin_unlock_irq(&workqueue_lock);
+ if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
+ goto fail;
+ spin_lock(&workqueue_lock);
+ }
+ spin_unlock_irq(&workqueue_lock);
+
+ worker = alloc_worker();
+ if (!worker)
+ goto fail;
+
+ worker->cwq = cwq;
+ worker->id = id;
+
+ worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
+ cwq->cpu, id);
+ if (IS_ERR(worker->task))
+ goto fail;
+
+ if (bind)
+ kthread_bind(worker->task, cwq->cpu);
+
+ return worker;
+fail:
+ if (id >= 0) {
+ spin_lock(&workqueue_lock);
+ ida_remove(&per_cpu(worker_ida, cwq->cpu), id);
+ spin_unlock_irq(&workqueue_lock);
+ }
+ kfree(worker);
+ return NULL;
+}
+
+/**
+ * start_worker - start a newly created worker
+ * @worker: worker to start
+ *
+ * Start @worker.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
+static void start_worker(struct worker *worker)
+{
+ wake_up_process(worker->task);
+}
+
+/**
+ * destroy_worker - destroy a workqueue worker
+ * @worker: worker to be destroyed
+ *
+ * Destroy @worker.
+ */
+static void destroy_worker(struct worker *worker)
+{
+ int cpu = worker->cwq->cpu;
+ int id = worker->id;
+
+ /* sanity check frenzy */
+ BUG_ON(worker->current_work);
+
+ kthread_stop(worker->task);
+ kfree(worker);
+
+ spin_lock_irq(&workqueue_lock);
+ ida_remove(&per_cpu(worker_ida, cpu), id);
+ spin_unlock_irq(&workqueue_lock);
+}
+
/**
* cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
* @cwq: cwq of interest
@@ -450,7 +562,7 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq,
/**
* process_one_work - process single work
- * @cwq: cwq to process work for
+ * @worker: self
* @work: work to process
*
* Process @work. This function contains all the logics necessary to
@@ -462,9 +574,9 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq,
* CONTEXT:
* spin_lock_irq(cwq->lock) which is released and regrabbed.
*/
-static void process_one_work(struct cpu_workqueue_struct *cwq,
- struct work_struct *work)
+static void process_one_work(struct worker *worker, struct work_struct *work)
{
+ struct cpu_workqueue_struct *cwq = worker->cwq;
struct workqueue_struct *wq = cwq->wq;
bool single_thread = wq->flags & WQ_SINGLE_THREAD;
work_func_t f = work->func;
@@ -481,7 +593,7 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
#endif
/* claim and process */
debug_work_deactivate(work);
- cwq->current_work = work;
+ worker->current_work = work;
work_color = work_flags_to_color(*work_data_bits(work));
list_del_init(&work->entry);
@@ -515,30 +627,33 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
spin_lock_irq(&cwq->lock);
/* we're done with it, release */
- cwq->current_work = NULL;
+ worker->current_work = NULL;
cwq_dec_nr_in_flight(cwq, work_color);
}
-static void run_workqueue(struct cpu_workqueue_struct *cwq)
+static void run_workqueue(struct worker *worker)
{
+ struct cpu_workqueue_struct *cwq = worker->cwq;
+
spin_lock_irq(&cwq->lock);
while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
- process_one_work(cwq, work);
+ process_one_work(worker, work);
}
spin_unlock_irq(&cwq->lock);
}
/**
* worker_thread - the worker thread function
- * @__cwq: cwq to serve
+ * @__worker: self
*
* The cwq worker thread function.
*/
-static int worker_thread(void *__cwq)
+static int worker_thread(void *__worker)
{
- struct cpu_workqueue_struct *cwq = __cwq;
+ struct worker *worker = __worker;
+ struct cpu_workqueue_struct *cwq = worker->cwq;
DEFINE_WAIT(wait);
if (cwq->wq->flags & WQ_FREEZEABLE)
@@ -557,7 +672,7 @@ static int worker_thread(void *__cwq)
if (kthread_should_stop())
break;
- run_workqueue(cwq);
+ run_workqueue(worker);
}
return 0;
@@ -855,7 +970,7 @@ int flush_work(struct work_struct *work)
goto already_gone;
prev = &work->entry;
} else {
- if (cwq->current_work != work)
+ if (!cwq->worker || cwq->worker->current_work != work)
goto already_gone;
prev = &cwq->worklist;
}
@@ -920,7 +1035,7 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
int running = 0;
spin_lock_irq(&cwq->lock);
- if (unlikely(cwq->current_work == work)) {
+ if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
insert_wq_barrier(cwq, &barr, cwq->worklist.next);
running = 1;
}
@@ -1183,52 +1298,21 @@ int current_is_keventd(void)
BUG_ON(!keventd_wq);
cwq = get_cwq(cpu, keventd_wq);
- if (current == cwq->thread)
+ if (current == cwq->worker->task)
ret = 1;
return ret;
}
-static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
-{
- struct workqueue_struct *wq = cwq->wq;
- struct task_struct *p;
-
- p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
- /*
- * Nobody can add the work_struct to this cwq,
- * if (caller is __create_workqueue)
- * nobody should see this wq
- * else // caller is CPU_UP_PREPARE
- * cpu is not on cpu_online_map
- * so we can abort safely.
- */
- if (IS_ERR(p))
- return PTR_ERR(p);
- cwq->thread = p;
-
- return 0;
-}
-
-static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
-{
- struct task_struct *p = cwq->thread;
-
- if (p != NULL) {
- if (cpu >= 0)
- kthread_bind(p, cpu);
- wake_up_process(p);
- }
-}
-
struct workqueue_struct *__create_workqueue_key(const char *name,
unsigned int flags,
struct lock_class_key *key,
const char *lock_name)
{
struct workqueue_struct *wq;
- int err = 0, cpu;
+ bool failed = false;
+ unsigned int cpu;
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
@@ -1267,20 +1351,24 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
for_each_possible_cpu(cpu) {
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+ cwq->cpu = cpu;
cwq->wq = wq;
cwq->flush_color = -1;
spin_lock_init(&cwq->lock);
INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work);
- if (err || !cpu_online(cpu))
+ if (failed || !cpu_online(cpu))
continue;
- err = create_workqueue_thread(cwq, cpu);
- start_workqueue_thread(cwq, cpu);
+ cwq->worker = create_worker(cwq, true);
+ if (cwq->worker)
+ start_worker(cwq->worker);
+ else
+ failed = true;
}
cpu_maps_update_done();
- if (err) {
+ if (failed) {
destroy_workqueue(wq);
wq = NULL;
}
@@ -1316,9 +1404,9 @@ void destroy_workqueue(struct workqueue_struct *wq)
int i;
/* cpu_add_remove_lock protects cwq->thread */
- if (cwq->thread) {
- kthread_stop(cwq->thread);
- cwq->thread = NULL;
+ if (cwq->worker) {
+ destroy_worker(cwq->worker);
+ cwq->worker = NULL;
}
for (i = 0; i < WORK_COLORS; i++)
@@ -1349,7 +1437,8 @@ undo:
switch (action) {
case CPU_UP_PREPARE:
- if (!create_workqueue_thread(cwq, cpu))
+ cwq->worker = create_worker(cwq, false);
+ if (cwq->worker)
break;
printk(KERN_ERR "workqueue [%s] for %i failed\n",
wq->name, cpu);
@@ -1358,17 +1447,18 @@ undo:
goto undo;
case CPU_ONLINE:
- start_workqueue_thread(cwq, cpu);
+ kthread_bind(cwq->worker->task, cpu);
+ start_worker(cwq->worker);
break;
case CPU_UP_CANCELED:
- start_workqueue_thread(cwq, -1);
+ start_worker(cwq->worker);
case CPU_POST_DEAD:
flush_workqueue(wq);
/* cpu_add_remove_lock protects cwq->thread */
- if (cwq->thread) {
- kthread_stop(cwq->thread);
- cwq->thread = NULL;
+ if (cwq->worker) {
+ destroy_worker(cwq->worker);
+ cwq->worker = NULL;
}
break;
}
@@ -1426,6 +1516,8 @@ EXPORT_SYMBOL_GPL(work_on_cpu);
void __init init_workqueues(void)
{
+ unsigned int cpu;
+
/*
* cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
* Make sure that the alignment isn't lower than that of
@@ -1435,6 +1527,9 @@ void __init init_workqueues(void)
BUILD_BUG_ON(__alignof__(struct cpu_workqueue_struct) <
__alignof__(unsigned long long));
+ for_each_possible_cpu(cpu)
+ ida_init(&per_cpu(worker_ida, cpu));
+
hotcpu_notifier(workqueue_cpu_callback, 0);
keventd_wq = create_workqueue("events");
BUG_ON(!keventd_wq);
--
1.6.4.2
next prev parent reply other threads:[~2009-11-16 17:18 UTC|newest]
Thread overview: 47+ messages / expand[flat|nested] mbox.gz Atom feed top
2009-11-16 17:15 [PATCHSET] workqueue: prepare for concurrency managed workqueue Tejun Heo
2009-11-16 17:15 ` [PATCH 01/21] workqueue: fix race condition in schedule_on_each_cpu() Tejun Heo
2009-11-16 23:04 ` Frederic Weisbecker
2009-11-17 0:08 ` Tejun Heo
2009-11-17 7:04 ` Avi Kivity
2009-11-17 16:16 ` Tejun Heo
2009-11-16 17:15 ` [PATCH 02/21] sched, kvm: fix race condition involving sched_in_preempt_notifers Tejun Heo
2009-11-16 17:15 ` [PATCH 03/21] workqueue: Add debugobjects support Tejun Heo
2009-11-16 17:15 ` [PATCH 04/21] sched: implement scheduler notifiers Tejun Heo
2009-11-16 18:31 ` Avi Kivity
2009-11-16 18:43 ` Tejun Heo
2009-11-16 18:41 ` Peter Zijlstra
2009-11-16 18:54 ` Tejun Heo
2009-11-16 20:29 ` Peter Zijlstra
2009-11-17 16:16 ` Tejun Heo
2009-11-16 17:15 ` [PATCH 05/21] kvm: convert kvm to use new " Tejun Heo
2009-11-16 17:15 ` [PATCH 06/21] sched: drop preempt notifiers Tejun Heo
2009-11-16 17:15 ` [PATCH 07/21] sched: implement sched_notifier_wake_up_process() Tejun Heo
2009-11-16 17:15 ` [PATCH 08/21] scheduler: implement force_cpus_allowed_ptr() Tejun Heo
2009-11-17 5:14 ` Rusty Russell
2009-11-17 5:19 ` Tejun Heo
2009-11-16 17:15 ` [PATCH 09/21] acpi: use queue_work_on() instead of binding workqueue worker to cpu0 Tejun Heo
2009-11-16 17:15 ` [PATCH 10/21] stop_machine: reimplement without using workqueue Tejun Heo
2009-11-16 17:15 ` [PATCH 11/21] workqueue: misc/cosmetic updates Tejun Heo
2009-11-16 17:15 ` [PATCH 12/21] workqueue: merge feature parametesr into flags Tejun Heo
2009-11-16 17:15 ` [PATCH 13/21] workqueue: update cwq alignement and make one more flag bit available Tejun Heo
2009-11-16 17:15 ` [PATCH 14/21] workqueue: define both bit position and mask for work flags Tejun Heo
2009-11-16 17:15 ` [PATCH 15/21] workqueue: separate out process_one_work() Tejun Heo
2009-11-16 17:15 ` [PATCH 16/21] workqueue: temporarily disable workqueue tracing Tejun Heo
2009-11-16 17:15 ` [PATCH 17/21] workqueue: simple reimplementation of SINGLE_THREAD workqueue Tejun Heo
2009-11-17 0:47 ` Andy Walls
2009-11-17 5:23 ` Tejun Heo
2009-11-17 12:05 ` Andy Walls
2009-11-17 16:21 ` Tejun Heo
2009-11-17 16:26 ` Hi ... I want to introduce myself :) Setiajie 余鴻昌
2009-11-17 15:05 ` [PATCH 17/21] workqueue: simple reimplementation of SINGLE_THREAD workqueue Linus Torvalds
2009-11-17 16:12 ` Tejun Heo
2009-11-17 19:01 ` Linus Torvalds
2009-11-17 14:03 ` Johannes Berg
2009-11-17 16:24 ` Tejun Heo
2009-11-16 17:15 ` [PATCH 18/21] workqueue: reimplement workqueue flushing using color coded works Tejun Heo
2009-11-16 17:15 ` Tejun Heo [this message]
2009-11-17 11:39 ` [PATCH 19/21] workqueue: introduce worker Louis Rilling
2009-11-17 11:51 ` Louis Rilling
2009-11-17 16:25 ` Tejun Heo
2009-11-16 17:15 ` [PATCH 20/21] workqueue: reimplement work flushing using linked works Tejun Heo
2009-11-16 17:15 ` [PATCH 21/21] workqueue: reimplement workqueue freeze using cwq->frozen_works queue Tejun Heo
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1258391726-30264-20-git-send-email-tj@kernel.org \
--to=tj@kernel.org \
--cc=akpm@linux-foundation.org \
--cc=andi@firstfloor.org \
--cc=arjan@linux.intel.com \
--cc=avi@redhat.com \
--cc=cl@linux-foundation.org \
--cc=dhowells@redhat.com \
--cc=fweisbec@gmail.com \
--cc=jeff@garzik.org \
--cc=jens.axboe@oracle.com \
--cc=linux-kernel@vger.kernel.org \
--cc=mingo@elte.hu \
--cc=peterz@infradead.org \
--cc=rusty@rustcorp.com.au \
--cc=torvalds@linux-foundation.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.