public inbox for linux-kernel@vger.kernel.org
 help / color / mirror / Atom feed
From: Tejun Heo <tj@kernel.org>
To: torvalds@linux-foundation.org, awalls@radix.net,
	linux-kernel@vger.kernel.org, jeff@garzik.org, mingo@elte.hu,
	akpm@linux-foundation.org, jens.axboe@oracle.com,
	rusty@rustcorp.com.au, cl@linux-foundation.org,
	dhowells@redhat.com, arjan@linux.intel.com, avi@redhat.com,
	peterz@infradead.org, johannes@sipsolutions.net,
	andi@firstfloor.org
Cc: Tejun Heo <tj@kernel.org>
Subject: [PATCH 17/27] workqueue: introduce worker
Date: Fri, 18 Dec 2009 21:57:58 +0900	[thread overview]
Message-ID: <1261141088-2014-18-git-send-email-tj@kernel.org> (raw)
In-Reply-To: <1261141088-2014-1-git-send-email-tj@kernel.org>

Separate out worker thread related information from struct
cpu_workqueue_struct into struct worker and implement helper functions
to deal with the new struct worker.  The only externally visible change
is that workqueue workers are now all named "kworker/CPUID:WORKERID",
where WORKERID is allocated from a per-cpu ida.

This is in preparation for the concurrency managed workqueue, where
multiple shared workers will be available per cpu.

Signed-off-by: Tejun Heo <tj@kernel.org>
---
 kernel/workqueue.c |  211 +++++++++++++++++++++++++++++++++++++---------------
 1 files changed, 151 insertions(+), 60 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 9e55c80..17d270b 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -33,6 +33,7 @@
 #include <linux/kallsyms.h>
 #include <linux/debug_locks.h>
 #include <linux/lockdep.h>
+#include <linux/idr.h>
 
 /*
  * Structure fields follow one of the following exclusion rules.
@@ -46,6 +47,15 @@
  * W: workqueue_lock protected.
  */
 
+struct cpu_workqueue_struct;
+
+struct worker {
+	struct work_struct	*current_work;	/* L: work being processed */
+	struct task_struct	*task;		/* I: worker task */
+	struct cpu_workqueue_struct *cwq;	/* I: the associated cwq */
+	int			id;		/* I: worker id */
+};
+
 /*
  * The per-CPU workqueue (if single thread, we always use the first
  * possible cpu).  The lower WORK_STRUCT_FLAG_BITS of
@@ -58,14 +68,14 @@ struct cpu_workqueue_struct {
 
 	struct list_head worklist;
 	wait_queue_head_t more_work;
-	struct work_struct *current_work;
+	unsigned int		cpu;
+	struct worker		*worker;
 
 	struct workqueue_struct *wq;		/* I: the owning workqueue */
 	int			work_color;	/* L: current color */
 	int			flush_color;	/* L: flushing color */
 	int			nr_in_flight[WORK_NR_COLORS];
 						/* L: nr of in_flight works */
-	struct task_struct	*thread;
 };
 
 /*
@@ -213,6 +223,9 @@ static inline void debug_work_deactivate(struct work_struct *work) { }
 /* Serializes the accesses to the list of workqueues. */
 static DEFINE_SPINLOCK(workqueue_lock);
 static LIST_HEAD(workqueues);
+static DEFINE_PER_CPU(struct ida, worker_ida);
+
+static int worker_thread(void *__worker);
 
 static int singlethread_cpu __read_mostly;
 
@@ -420,6 +433,105 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 }
 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
 
+static struct worker *alloc_worker(void)
+{
+	struct worker *worker;
+
+	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
+	return worker;
+}
+
+/**
+ * create_worker - create a new workqueue worker
+ * @cwq: cwq the new worker will belong to
+ * @bind: whether to set affinity to @cwq->cpu or not
+ *
+ * Create a new worker which is bound to @cwq.  The returned worker
+ * can be started by calling start_worker() or destroyed using
+ * destroy_worker().
+ *
+ * CONTEXT:
+ * Might sleep.  Does GFP_KERNEL allocations.
+ *
+ * RETURNS:
+ * Pointer to the newly created worker, or NULL on failure.
+ */
+static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
+{
+	int id = -1;
+	struct worker *worker = NULL;
+
+	spin_lock(&workqueue_lock);
+	while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
+		spin_unlock(&workqueue_lock);
+		if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
+			goto fail;
+		spin_lock(&workqueue_lock);
+	}
+	spin_unlock(&workqueue_lock);
+
+	worker = alloc_worker();
+	if (!worker)
+		goto fail;
+
+	worker->cwq = cwq;
+	worker->id = id;
+
+	worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
+				      cwq->cpu, id);
+	if (IS_ERR(worker->task))
+		goto fail;
+
+	if (bind)
+		kthread_bind(worker->task, cwq->cpu);
+
+	return worker;
+fail:
+	if (id >= 0) {
+		spin_lock(&workqueue_lock);
+		ida_remove(&per_cpu(worker_ida, cwq->cpu), id);
+		spin_unlock(&workqueue_lock);
+	}
+	kfree(worker);
+	return NULL;
+}
+
+/**
+ * start_worker - start a newly created worker
+ * @worker: worker to start
+ *
+ * Start @worker.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
+static void start_worker(struct worker *worker)
+{
+	wake_up_process(worker->task);
+}
+
+/**
+ * destroy_worker - destroy a workqueue worker
+ * @worker: worker to be destroyed
+ *
+ * Destroy @worker.
+ */
+static void destroy_worker(struct worker *worker)
+{
+	int cpu = worker->cwq->cpu;
+	int id = worker->id;
+
+	/* sanity check frenzy */
+	BUG_ON(worker->current_work);
+
+	kthread_stop(worker->task);
+	kfree(worker);
+
+	spin_lock(&workqueue_lock);
+	ida_remove(&per_cpu(worker_ida, cpu), id);
+	spin_unlock(&workqueue_lock);
+}
+
 /**
  * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
  * @cwq: cwq of interest
@@ -460,7 +572,7 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
 
 /**
  * process_one_work - process single work
- * @cwq: cwq to process work for
+ * @worker: self
  * @work: work to process
  *
  * Process @work.  This function contains all the logics necessary to
@@ -472,9 +584,9 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
  * CONTEXT:
  * spin_lock_irq(cwq->lock) which is released and regrabbed.
  */
-static void process_one_work(struct cpu_workqueue_struct *cwq,
-			     struct work_struct *work)
+static void process_one_work(struct worker *worker, struct work_struct *work)
 {
+	struct cpu_workqueue_struct *cwq = worker->cwq;
 	work_func_t f = work->func;
 	int work_color;
 #ifdef CONFIG_LOCKDEP
@@ -489,7 +601,7 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
 #endif
 	/* claim and process */
 	debug_work_deactivate(work);
-	cwq->current_work = work;
+	worker->current_work = work;
 	work_color = work_flags_to_color(*work_data_bits(work));
 	list_del_init(&work->entry);
 
@@ -516,30 +628,33 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
 	spin_lock_irq(&cwq->lock);
 
 	/* we're done with it, release */
-	cwq->current_work = NULL;
+	worker->current_work = NULL;
 	cwq_dec_nr_in_flight(cwq, work_color);
 }
 
-static void run_workqueue(struct cpu_workqueue_struct *cwq)
+static void run_workqueue(struct worker *worker)
 {
+	struct cpu_workqueue_struct *cwq = worker->cwq;
+
 	spin_lock_irq(&cwq->lock);
 	while (!list_empty(&cwq->worklist)) {
 		struct work_struct *work = list_entry(cwq->worklist.next,
 						struct work_struct, entry);
-		process_one_work(cwq, work);
+		process_one_work(worker, work);
 	}
 	spin_unlock_irq(&cwq->lock);
 }
 
 /**
  * worker_thread - the worker thread function
- * @__cwq: cwq to serve
+ * @__worker: self
  *
  * The cwq worker thread function.
  */
-static int worker_thread(void *__cwq)
+static int worker_thread(void *__worker)
 {
-	struct cpu_workqueue_struct *cwq = __cwq;
+	struct worker *worker = __worker;
+	struct cpu_workqueue_struct *cwq = worker->cwq;
 	DEFINE_WAIT(wait);
 
 	if (cwq->wq->flags & WQ_FREEZEABLE)
@@ -558,7 +673,7 @@ static int worker_thread(void *__cwq)
 		if (kthread_should_stop())
 			break;
 
-		run_workqueue(cwq);
+		run_workqueue(worker);
 	}
 
 	return 0;
@@ -856,7 +971,7 @@ int flush_work(struct work_struct *work)
 			goto already_gone;
 		prev = &work->entry;
 	} else {
-		if (cwq->current_work != work)
+		if (!cwq->worker || cwq->worker->current_work != work)
 			goto already_gone;
 		prev = &cwq->worklist;
 	}
@@ -921,7 +1036,7 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
 	int running = 0;
 
 	spin_lock_irq(&cwq->lock);
-	if (unlikely(cwq->current_work == work)) {
+	if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
 		insert_wq_barrier(cwq, &barr, cwq->worklist.next);
 		running = 1;
 	}
@@ -1184,7 +1299,7 @@ int current_is_keventd(void)
 	BUG_ON(!keventd_wq);
 
 	cwq = get_cwq(cpu, keventd_wq);
-	if (current == cwq->thread)
+	if (current == cwq->worker->task)
 		ret = 1;
 
 	return ret;
@@ -1229,38 +1344,6 @@ static void free_cwqs(struct cpu_workqueue_struct *cwqs)
 #endif
 }
 
-static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
-{
-	struct workqueue_struct *wq = cwq->wq;
-	struct task_struct *p;
-
-	p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
-	/*
-	 * Nobody can add the work_struct to this cwq,
-	 *	if (caller is __create_workqueue)
-	 *		nobody should see this wq
-	 *	else // caller is CPU_UP_PREPARE
-	 *		cpu is not on cpu_online_map
-	 * so we can abort safely.
-	 */
-	if (IS_ERR(p))
-		return PTR_ERR(p);
-	cwq->thread = p;
-
-	return 0;
-}
-
-static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
-{
-	struct task_struct *p = cwq->thread;
-
-	if (p != NULL) {
-		if (cpu >= 0)
-			kthread_bind(p, cpu);
-		wake_up_process(p);
-	}
-}
-
 struct workqueue_struct *__create_workqueue_key(const char *name,
 						unsigned int flags,
 						struct lock_class_key *key,
@@ -1268,7 +1351,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 {
 	bool singlethread = flags & WQ_SINGLE_THREAD;
 	struct workqueue_struct *wq;
-	int err = 0, cpu;
+	bool failed = false;
+	unsigned int cpu;
 
 	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
 	if (!wq)
@@ -1298,23 +1382,25 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 		struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
 
 		BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
+		cwq->cpu = cpu;
 		cwq->wq = wq;
 		cwq->flush_color = -1;
 		spin_lock_init(&cwq->lock);
 		INIT_LIST_HEAD(&cwq->worklist);
 		init_waitqueue_head(&cwq->more_work);
 
-		if (err)
+		if (failed)
 			continue;
-		err = create_workqueue_thread(cwq, cpu);
-		if (cpu_online(cpu) && !singlethread)
-			start_workqueue_thread(cwq, cpu);
+		cwq->worker = create_worker(cwq,
+					    cpu_online(cpu) && !singlethread);
+		if (cwq->worker)
+			start_worker(cwq->worker);
 		else
-			start_workqueue_thread(cwq, -1);
+			failed = true;
 	}
 	cpu_maps_update_done();
 
-	if (err) {
+	if (failed) {
 		destroy_workqueue(wq);
 		wq = NULL;
 	}
@@ -1353,9 +1439,9 @@ void destroy_workqueue(struct workqueue_struct *wq)
 		struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
 		int i;
 
-		if (cwq->thread) {
-			kthread_stop(cwq->thread);
-			cwq->thread = NULL;
+		if (cwq->worker) {
+			destroy_worker(cwq->worker);
+			cwq->worker = NULL;
 		}
 
 		for (i = 0; i < WORK_NR_COLORS; i++)
@@ -1385,8 +1471,8 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 
 		switch (action) {
 		case CPU_ONLINE:
-			__set_cpus_allowed(cwq->thread, get_cpu_mask(cpu),
-					   true);
+			__set_cpus_allowed(cwq->worker->task,
+					   get_cpu_mask(cpu), true);
 			break;
 
 		case CPU_POST_DEAD:
@@ -1447,6 +1533,8 @@ EXPORT_SYMBOL_GPL(work_on_cpu);
 
 void __init init_workqueues(void)
 {
+	unsigned int cpu;
+
 	/*
 	 * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
 	 * Make sure that the alignment isn't lower than that of
@@ -1455,6 +1543,9 @@ void __init init_workqueues(void)
 	BUILD_BUG_ON(__alignof__(struct cpu_workqueue_struct) <
 		     __alignof__(unsigned long long));
 
+	for_each_possible_cpu(cpu)
+		ida_init(&per_cpu(worker_ida, cpu));
+
 	singlethread_cpu = cpumask_first(cpu_possible_mask);
 	hotcpu_notifier(workqueue_cpu_callback, 0);
 	keventd_wq = create_workqueue("events");
-- 
1.6.4.2


  parent reply	other threads:[~2009-12-18 13:03 UTC|newest]

Thread overview: 104+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2009-12-18 12:57 Tejun Heo
2009-12-18 12:57 ` [PATCH 01/27] sched: rename preempt_notifiers to sched_notifiers and refactor implementation Tejun Heo
2009-12-18 12:57 ` [PATCH 02/27] sched: refactor try_to_wake_up() Tejun Heo
2009-12-18 12:57 ` [PATCH 03/27] sched: implement __set_cpus_allowed() Tejun Heo
2009-12-18 12:57 ` [PATCH 04/27] sched: make sched_notifiers unconditional Tejun Heo
2009-12-18 12:57 ` [PATCH 05/27] sched: add wakeup/sleep sched_notifiers and allow NULL notifier ops Tejun Heo
2009-12-18 12:57 ` [PATCH 06/27] sched: implement try_to_wake_up_local() Tejun Heo
2009-12-18 12:57 ` [PATCH 07/27] acpi: use queue_work_on() instead of binding workqueue worker to cpu0 Tejun Heo
2009-12-18 12:57 ` [PATCH 08/27] stop_machine: reimplement without using workqueue Tejun Heo
2009-12-18 12:57 ` [PATCH 09/27] workqueue: misc/cosmetic updates Tejun Heo
2009-12-18 12:57 ` [PATCH 10/27] workqueue: merge feature parameters into flags Tejun Heo
2009-12-18 12:57 ` [PATCH 11/27] workqueue: define both bit position and mask for work flags Tejun Heo
2009-12-18 12:57 ` [PATCH 12/27] workqueue: separate out process_one_work() Tejun Heo
2009-12-18 12:57 ` [PATCH 13/27] workqueue: temporarily disable workqueue tracing Tejun Heo
2009-12-18 12:57 ` [PATCH 14/27] workqueue: kill cpu_populated_map Tejun Heo
2009-12-18 12:57 ` [PATCH 15/27] workqueue: update cwq alignement Tejun Heo
2009-12-18 12:57 ` [PATCH 16/27] workqueue: reimplement workqueue flushing using color coded works Tejun Heo
2009-12-18 12:57 ` Tejun Heo [this message]
2009-12-18 12:57 ` [PATCH 18/27] workqueue: reimplement work flushing using linked works Tejun Heo
2009-12-18 12:58 ` [PATCH 19/27] workqueue: implement per-cwq active work limit Tejun Heo
2009-12-18 12:58 ` [PATCH 20/27] workqueue: reimplement workqueue freeze using max_active Tejun Heo
2009-12-18 12:58 ` [PATCH 21/27] workqueue: introduce global cwq and unify cwq locks Tejun Heo
2009-12-18 12:58 ` [PATCH 22/27] workqueue: implement worker states Tejun Heo
2009-12-18 12:58 ` [PATCH 23/27] workqueue: reimplement CPU hotplugging support using trustee Tejun Heo
2009-12-18 12:58 ` [PATCH 24/27] workqueue: make single thread workqueue shared worker pool friendly Tejun Heo
2009-12-18 12:58 ` [PATCH 25/27] workqueue: use shared worklist and pool all workers per cpu Tejun Heo
2009-12-18 12:58 ` [PATCH 26/27] workqueue: implement concurrency managed dynamic worker pool Tejun Heo
2009-12-18 12:58 ` [PATCH 27/27] workqueue: increase max_active of keventd and kill current_is_keventd() Tejun Heo
2009-12-18 13:00 ` SUBJ: [RFC PATCHSET] concurrency managed workqueue, take#2 Tejun Heo
2009-12-18 13:03 ` Tejun Heo
2009-12-18 13:45 ` workqueue thing Peter Zijlstra
2009-12-18 13:50   ` Andi Kleen
2009-12-18 15:01     ` Arjan van de Ven
2009-12-21  3:19       ` Tejun Heo
2009-12-21  9:17       ` Jens Axboe
2009-12-21 10:35         ` Peter Zijlstra
2009-12-21 11:09         ` Andi Kleen
2009-12-21 11:17           ` Arjan van de Ven
2009-12-21 11:33             ` Andi Kleen
2009-12-21 13:18             ` Tejun Heo
2009-12-21 11:11         ` Arjan van de Ven
2009-12-21 13:22           ` Tejun Heo
2009-12-21 13:53             ` Arjan van de Ven
2009-12-21 14:19               ` Tejun Heo
2009-12-21 15:19                 ` Arjan van de Ven
2009-12-22  0:00                   ` Tejun Heo
2009-12-22 11:10                     ` Peter Zijlstra
2009-12-22 17:20                       ` Linus Torvalds
2009-12-22 17:47                         ` Peter Zijlstra
2009-12-22 18:07                           ` Andi Kleen
2009-12-22 18:20                             ` Peter Zijlstra
2009-12-23  8:17                             ` Stijn Devriendt
2009-12-23  8:43                               ` Peter Zijlstra
2009-12-23  9:01                                 ` Stijn Devriendt
2009-12-22 18:28                           ` Linus Torvalds
2009-12-23  8:06                             ` Johannes Berg
2009-12-23  3:37                           ` Tejun Heo
2009-12-23  6:52                             ` Herbert Xu
2009-12-23  8:00                               ` Steffen Klassert
2009-12-23  8:01                                 ` [PATCH 0/2] Parallel crypto/IPsec v7 Steffen Klassert
2009-12-23  8:03                                   ` [PATCH 1/2] padata: generic parallelization/serialization interface Steffen Klassert
2009-12-23  8:04                                   ` [PATCH 2/2] crypto: pcrypt - Add pcrypt crypto parallelization wrapper Steffen Klassert
2010-01-07  5:39                                   ` [PATCH 0/2] Parallel crypto/IPsec v7 Herbert Xu
2010-01-16  9:44                                     ` David Miller
2009-12-18 15:30   ` workqueue thing Linus Torvalds
2009-12-18 15:39     ` Ingo Molnar
2009-12-18 15:39     ` Peter Zijlstra
2009-12-18 15:47       ` Linus Torvalds
2009-12-18 15:53         ` Peter Zijlstra
2009-12-21  3:04   ` Tejun Heo
2009-12-21  9:22     ` Peter Zijlstra
2009-12-21 13:30       ` Tejun Heo
2009-12-21 14:26         ` Peter Zijlstra
2009-12-21 23:50           ` Tejun Heo
2009-12-22 11:00             ` Peter Zijlstra
2009-12-22 11:03             ` Peter Zijlstra
2009-12-23  3:43               ` Tejun Heo
2009-12-22 11:04             ` Peter Zijlstra
2009-12-23  3:48               ` Tejun Heo
2009-12-22 11:06             ` Peter Zijlstra
2009-12-23  4:18               ` Tejun Heo
2009-12-23  4:42                 ` Linus Torvalds
2009-12-23  6:02                   ` Ingo Molnar
2009-12-23  6:13                     ` Jeff Garzik
2009-12-23  7:53                       ` Ingo Molnar
2009-12-23  8:41                       ` Peter Zijlstra
2009-12-23 10:25                         ` Jeff Garzik
2009-12-23 13:33                           ` Stefan Richter
2009-12-23 14:20                           ` Mark Brown
2009-12-23  7:09                     ` Tejun Heo
2009-12-23  8:01                       ` Ingo Molnar
2009-12-23  8:12                         ` Ingo Molnar
2009-12-23  8:32                           ` Tejun Heo
2009-12-23  8:42                             ` Ingo Molnar
2009-12-23  8:27                         ` Tejun Heo
2009-12-23  8:37                           ` Ingo Molnar
2009-12-23  8:49                             ` Tejun Heo
2009-12-23  8:49                               ` Ingo Molnar
2009-12-23  9:03                                 ` Tejun Heo
2009-12-23 13:40                             ` Stefan Richter
2009-12-23 13:43                               ` Stefan Richter
2009-12-23  8:25                       ` Arjan van de Ven
2009-12-23 13:00                     ` Stefan Richter
2009-12-23  8:31             ` Stijn Devriendt

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1261141088-2014-18-git-send-email-tj@kernel.org \
    --to=tj@kernel.org \
    --cc=akpm@linux-foundation.org \
    --cc=andi@firstfloor.org \
    --cc=arjan@linux.intel.com \
    --cc=avi@redhat.com \
    --cc=awalls@radix.net \
    --cc=cl@linux-foundation.org \
    --cc=dhowells@redhat.com \
    --cc=jeff@garzik.org \
    --cc=jens.axboe@oracle.com \
    --cc=johannes@sipsolutions.net \
    --cc=linux-kernel@vger.kernel.org \
    --cc=mingo@elte.hu \
    --cc=peterz@infradead.org \
    --cc=rusty@rustcorp.com.au \
    --cc=torvalds@linux-foundation.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox