From: Tejun Heo <tj@kernel.org>
To: linux-kernel@vger.kernel.org, laijs@cn.fujitsu.com
Cc: axboe@kernel.dk, jmoyer@redhat.com, zab@redhat.com,
Tejun Heo <tj@kernel.org>
Subject: [PATCH 17/31] workqueue: implement attribute-based unbound worker_pool management
Date: Fri, 1 Mar 2013 19:24:08 -0800 [thread overview]
Message-ID: <1362194662-2344-18-git-send-email-tj@kernel.org> (raw)
In-Reply-To: <1362194662-2344-1-git-send-email-tj@kernel.org>
This patch makes unbound worker_pools reference counted and
dynamically created and destroyed as workqueues needing them come and
go. All unbound worker_pools are hashed on unbound_pool_hash which is
keyed by the content of worker_pool->attrs.
When an unbound workqueue is allocated, get_unbound_pool() is called
with the attributes of the workqueue. If there already is a matching
worker_pool, the reference count is bumped and the pool is returned.
If not, a new worker_pool with matching attributes is created and
returned.
When an unbound workqueue is destroyed, put_unbound_pool() is called
which decrements the reference count of the associated worker_pool.
If the refcnt reaches zero, the worker_pool is destroyed in a
sched-RCU safe way.
Note that the standard unbound worker_pools - normal and highpri ones
with no specific cpumask affinity - are no longer created explicitly
during init_workqueues(). init_workqueues() only initializes
workqueue_attrs to be used for standard unbound pools -
unbound_std_wq_attrs[]. The pools are spawned on demand as workqueues
are created.
Signed-off-by: Tejun Heo <tj@kernel.org>
---
kernel/workqueue.c | 230 ++++++++++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 218 insertions(+), 12 deletions(-)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 7eba824..fb91b67 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -41,6 +41,7 @@
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#include <linux/idr.h>
+#include <linux/jhash.h>
#include <linux/hashtable.h>
#include <linux/rculist.h>
@@ -80,6 +81,7 @@ enum {
NR_STD_WORKER_POOLS = 2, /* # standard pools per cpu */
+ UNBOUND_POOL_HASH_ORDER = 6, /* hashed by pool->attrs */
BUSY_WORKER_HASH_ORDER = 6, /* 64 pointers */
MAX_IDLE_WORKERS_RATIO = 4, /* 1/4 of busy can be idle */
@@ -149,6 +151,8 @@ struct worker_pool {
struct ida worker_ida; /* L: for worker IDs */
struct workqueue_attrs *attrs; /* I: worker attributes */
+ struct hlist_node hash_node; /* R: unbound_pool_hash node */
+ atomic_t refcnt; /* refcnt for unbound pools */
/*
* The current concurrency level. As it's likely to be accessed
@@ -156,6 +160,12 @@ struct worker_pool {
* cacheline.
*/
atomic_t nr_running ____cacheline_aligned_in_smp;
+
+ /*
+ * Destruction of pool is sched-RCU protected to allow dereferences
+ * from get_work_pool().
+ */
+ struct rcu_head rcu;
} ____cacheline_aligned_in_smp;
/*
@@ -218,6 +228,11 @@ struct workqueue_struct {
static struct kmem_cache *pwq_cache;
+/* hash of all unbound pools keyed by pool->attrs */
+static DEFINE_HASHTABLE(unbound_pool_hash, UNBOUND_POOL_HASH_ORDER);
+
+static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
+
struct workqueue_struct *system_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_wq);
struct workqueue_struct *system_highpri_wq __read_mostly;
@@ -1740,7 +1755,7 @@ static struct worker *create_worker(struct worker_pool *pool)
worker->pool = pool;
worker->id = id;
- if (pool->cpu != WORK_CPU_UNBOUND)
+ if (pool->cpu >= 0)
worker->task = kthread_create_on_node(worker_thread,
worker, cpu_to_node(pool->cpu),
"kworker/%d:%d%s", pool->cpu, id, pri);
@@ -3159,6 +3174,54 @@ fail:
return NULL;
}
+static void copy_workqueue_attrs(struct workqueue_attrs *to,
+ const struct workqueue_attrs *from)
+{
+ to->nice = from->nice;
+ cpumask_copy(to->cpumask, from->cpumask);
+}
+
+/*
+ * Hacky implementation of jhash of bitmaps which only considers the
+ * specified number of bits. We probably want a proper implementation in
+ * include/linux/jhash.h.
+ */
+static u32 jhash_bitmap(const unsigned long *bitmap, int bits, u32 hash)
+{
+ int nr_longs = bits / BITS_PER_LONG;
+ int nr_leftover = bits % BITS_PER_LONG;
+ unsigned long leftover = 0;
+
+ if (nr_longs)
+ hash = jhash(bitmap, nr_longs * sizeof(long), hash);
+ if (nr_leftover) {
+ bitmap_copy(&leftover, bitmap + nr_longs, nr_leftover);
+ hash = jhash(&leftover, sizeof(long), hash);
+ }
+ return hash;
+}
+
+/* hash value of the content of @attrs */
+static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
+{
+ u32 hash = 0;
+
+ hash = jhash_1word(attrs->nice, hash);
+ hash = jhash_bitmap(cpumask_bits(attrs->cpumask), nr_cpu_ids, hash);
+ return hash;
+}
+
+/* content equality test */
+static bool wqattrs_equal(const struct workqueue_attrs *a,
+ const struct workqueue_attrs *b)
+{
+ if (a->nice != b->nice)
+ return false;
+ if (!cpumask_equal(a->cpumask, b->cpumask))
+ return false;
+ return true;
+}
+
/**
* init_worker_pool - initialize a newly zalloc'd worker_pool
* @pool: worker_pool to initialize
@@ -3169,6 +3232,8 @@ fail:
static int init_worker_pool(struct worker_pool *pool)
{
spin_lock_init(&pool->lock);
+ pool->id = -1;
+ pool->cpu = -1;
pool->flags |= POOL_DISASSOCIATED;
INIT_LIST_HEAD(&pool->worklist);
INIT_LIST_HEAD(&pool->idle_list);
@@ -3185,12 +3250,133 @@ static int init_worker_pool(struct worker_pool *pool)
mutex_init(&pool->assoc_mutex);
ida_init(&pool->worker_ida);
+ INIT_HLIST_NODE(&pool->hash_node);
+ atomic_set(&pool->refcnt, 1);
pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
if (!pool->attrs)
return -ENOMEM;
return 0;
}
+static void rcu_free_pool(struct rcu_head *rcu)
+{
+ struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu);
+
+ ida_destroy(&pool->worker_ida);
+ free_workqueue_attrs(pool->attrs);
+ kfree(pool);
+}
+
+/**
+ * put_unbound_pool - put a worker_pool
+ * @pool: worker_pool to put
+ *
+ * Put @pool. If its refcnt reaches zero, it gets destroyed in a sched-RCU
+ * safe manner.
+ */
+static void put_unbound_pool(struct worker_pool *pool)
+{
+ struct worker *worker;
+
+ if (!atomic_dec_and_test(&pool->refcnt))
+ return;
+
+ /* sanity checks */
+ if (WARN_ON(!(pool->flags & POOL_DISASSOCIATED)))
+ return;
+ if (WARN_ON(pool->nr_workers != pool->nr_idle))
+ return;
+ if (WARN_ON(!list_empty(&pool->worklist)))
+ return;
+
+ /* release id and unhash */
+ spin_lock_irq(&workqueue_lock);
+ if (pool->id >= 0)
+ idr_remove(&worker_pool_idr, pool->id);
+ hash_del(&pool->hash_node);
+ spin_unlock_irq(&workqueue_lock);
+
+ /* lock out manager and destroy all workers */
+ mutex_lock(&pool->manager_mutex);
+ spin_lock_irq(&pool->lock);
+
+ while ((worker = first_worker(pool)))
+ destroy_worker(worker);
+ WARN_ON(pool->nr_workers || pool->nr_idle);
+
+ spin_unlock_irq(&pool->lock);
+ mutex_unlock(&pool->manager_mutex);
+
+ /* shut down the timers */
+ del_timer_sync(&pool->idle_timer);
+ del_timer_sync(&pool->mayday_timer);
+
+ /* sched-RCU protected to allow dereferences from get_work_pool() */
+ call_rcu_sched(&pool->rcu, rcu_free_pool);
+}
+
+/**
+ * get_unbound_pool - get a worker_pool with the specified attributes
+ * @attrs: the attributes of the worker_pool to get
+ *
+ * Obtain a worker_pool which has the same attributes as @attrs, bump the
+ * reference count and return it. If there already is a matching
+ * worker_pool, it will be used; otherwise, this function attempts to
+ * create a new one. On failure, returns NULL.
+ */
+static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
+{
+ static DEFINE_MUTEX(create_mutex);
+ u32 hash = wqattrs_hash(attrs);
+ struct worker_pool *pool;
+ struct hlist_node *tmp;
+ struct worker *worker;
+
+ mutex_lock(&create_mutex);
+
+ /* do we already have a matching pool? */
+ spin_lock_irq(&workqueue_lock);
+ hash_for_each_possible(unbound_pool_hash, pool, tmp, hash_node, hash) {
+ if (wqattrs_equal(pool->attrs, attrs)) {
+ atomic_inc(&pool->refcnt);
+ goto out_unlock;
+ }
+ }
+ spin_unlock_irq(&workqueue_lock);
+
+ /* nope, create a new one */
+ pool = kzalloc(sizeof(*pool), GFP_KERNEL);
+ if (!pool || init_worker_pool(pool) < 0)
+ goto fail;
+
+ copy_workqueue_attrs(pool->attrs, attrs);
+
+ if (worker_pool_assign_id(pool) < 0)
+ goto fail;
+
+ /* create and start the initial worker */
+ worker = create_worker(pool);
+ if (!worker)
+ goto fail;
+
+ spin_lock_irq(&pool->lock);
+ start_worker(worker);
+ spin_unlock_irq(&pool->lock);
+
+ /* install */
+ spin_lock_irq(&workqueue_lock);
+ hash_add(unbound_pool_hash, &pool->hash_node, hash);
+out_unlock:
+ spin_unlock_irq(&workqueue_lock);
+ mutex_unlock(&create_mutex);
+ return pool;
+fail:
+ mutex_unlock(&create_mutex);
+ if (pool)
+ put_unbound_pool(pool);
+ return NULL;
+}
+
static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
bool highpri = wq->flags & WQ_HIGHPRI;
@@ -3215,7 +3401,12 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
if (!pwq)
return -ENOMEM;
- pwq->pool = get_std_worker_pool(WORK_CPU_UNBOUND, highpri);
+ pwq->pool = get_unbound_pool(unbound_std_wq_attrs[highpri]);
+ if (!pwq->pool) {
+ kmem_cache_free(pwq_cache, pwq);
+ return -ENOMEM;
+ }
+
list_add_tail_rcu(&pwq->pwqs_node, &wq->pwqs);
}
@@ -3393,6 +3584,15 @@ void destroy_workqueue(struct workqueue_struct *wq)
kfree(wq->rescuer);
}
+ /*
+ * We're the sole accessor of @wq at this point. Directly access
+ * the first pwq and put its pool.
+ */
+ if (wq->flags & WQ_UNBOUND) {
+ pwq = list_first_entry(&wq->pwqs, struct pool_workqueue,
+ pwqs_node);
+ put_unbound_pool(pwq->pool);
+ }
free_pwqs(wq);
kfree(wq);
}
@@ -3856,19 +4056,14 @@ static int __init init_workqueues(void)
hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN);
/* initialize CPU pools */
- for_each_wq_cpu(cpu) {
+ for_each_possible_cpu(cpu) {
struct worker_pool *pool;
i = 0;
for_each_std_worker_pool(pool, cpu) {
BUG_ON(init_worker_pool(pool));
pool->cpu = cpu;
-
- if (cpu != WORK_CPU_UNBOUND)
- cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
- else
- cpumask_setall(pool->attrs->cpumask);
-
+ cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
pool->attrs->nice = std_nice[i++];
/* alloc pool ID */
@@ -3877,14 +4072,13 @@ static int __init init_workqueues(void)
}
/* create the initial worker */
- for_each_online_wq_cpu(cpu) {
+ for_each_online_cpu(cpu) {
struct worker_pool *pool;
for_each_std_worker_pool(pool, cpu) {
struct worker *worker;
- if (cpu != WORK_CPU_UNBOUND)
- pool->flags &= ~POOL_DISASSOCIATED;
+ pool->flags &= ~POOL_DISASSOCIATED;
worker = create_worker(pool);
BUG_ON(!worker);
@@ -3894,6 +4088,18 @@ static int __init init_workqueues(void)
}
}
+ /* create default unbound wq attrs */
+ for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
+ struct workqueue_attrs *attrs;
+
+ BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
+
+ attrs->nice = std_nice[i];
+ cpumask_setall(attrs->cpumask);
+
+ unbound_std_wq_attrs[i] = attrs;
+ }
+
system_wq = alloc_workqueue("events", 0, 0);
system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
system_long_wq = alloc_workqueue("events_long", 0, 0);
--
1.8.1.2
next prev parent reply other threads:[~2013-03-02 3:30 UTC|newest]
Thread overview: 77+ messages / expand[flat|nested] mbox.gz Atom feed top
2013-03-02 3:23 [PATCHSET wq/for-3.10-tmp] workqueue: implement workqueue with custom worker attributes Tejun Heo
2013-03-02 3:23 ` [PATCH 01/31] workqueue: make sanity checks less punshing using WARN_ON[_ONCE]()s Tejun Heo
2013-03-02 3:23 ` [PATCH 02/31] workqueue: make workqueue_lock irq-safe Tejun Heo
2013-03-02 3:23 ` [PATCH 03/31] workqueue: introduce kmem_cache for pool_workqueues Tejun Heo
2013-03-02 3:23 ` [PATCH 04/31] workqueue: add workqueue_struct->pwqs list Tejun Heo
2013-03-02 3:23 ` [PATCH 05/31] workqueue: replace for_each_pwq_cpu() with for_each_pwq() Tejun Heo
2013-03-02 3:23 ` [PATCH 06/31] workqueue: introduce for_each_pool() Tejun Heo
2013-03-02 3:23 ` [PATCH 07/31] workqueue: restructure pool / pool_workqueue iterations in freeze/thaw functions Tejun Heo
2013-03-10 10:09 ` Lai Jiangshan
2013-03-10 12:34 ` Tejun Heo
2013-03-02 3:23 ` [PATCH 08/31] workqueue: add wokrqueue_struct->maydays list to replace mayday cpu iterators Tejun Heo
2013-03-02 3:24 ` [PATCH 09/31] workqueue: consistently use int for @cpu variables Tejun Heo
2013-03-02 3:24 ` [PATCH 10/31] workqueue: remove workqueue_struct->pool_wq.single Tejun Heo
2013-03-02 3:24 ` [PATCH 11/31] workqueue: replace get_pwq() with explicit per_cpu_ptr() accesses and first_pwq() Tejun Heo
2013-03-02 3:24 ` [PATCH 12/31] workqueue: update synchronization rules on workqueue->pwqs Tejun Heo
2013-03-10 10:09 ` Lai Jiangshan
2013-03-10 12:38 ` Tejun Heo
2013-03-12 18:20 ` [PATCH v2 " Tejun Heo
2013-03-02 3:24 ` [PATCH 13/31] workqueue: update synchronization rules on worker_pool_idr Tejun Heo
2013-03-12 18:20 ` [PATCH v2 " Tejun Heo
2013-03-02 3:24 ` [PATCH 14/31] workqueue: replace POOL_MANAGING_WORKERS flag with worker_pool->manager_mutex Tejun Heo
2013-03-10 10:09 ` Lai Jiangshan
2013-03-10 12:46 ` Tejun Heo
2013-03-12 18:19 ` [PATCH v2 " Tejun Heo
2013-03-02 3:24 ` [PATCH 15/31] workqueue: separate out init_worker_pool() from init_workqueues() Tejun Heo
2013-03-02 3:24 ` [PATCH 16/31] workqueue: introduce workqueue_attrs Tejun Heo
2013-03-04 18:37 ` [PATCH v2 " Tejun Heo
2013-03-05 22:29 ` Ryan Mallon
2013-03-05 22:33 ` Tejun Heo
2013-03-05 22:34 ` Tejun Heo
2013-03-05 22:40 ` Ryan Mallon
2013-03-05 22:44 ` Tejun Heo
2013-03-05 23:20 ` Ryan Mallon
2013-03-05 23:28 ` Tejun Heo
2013-03-02 3:24 ` Tejun Heo [this message]
2013-03-10 10:08 ` [PATCH 17/31] workqueue: implement attribute-based unbound worker_pool management Lai Jiangshan
2013-03-10 12:58 ` Tejun Heo
2013-03-10 18:36 ` Tejun Heo
2013-03-12 18:21 ` [PATCH v2 " Tejun Heo
2013-03-02 3:24 ` [PATCH 18/31] workqueue: remove unbound_std_worker_pools[] and related helpers Tejun Heo
2013-03-02 3:24 ` [PATCH 19/31] workqueue: drop "std" from cpu_std_worker_pools and for_each_std_worker_pool() Tejun Heo
2013-03-02 3:24 ` [PATCH 20/31] workqueue: add pool ID to the names of unbound kworkers Tejun Heo
2013-03-02 3:24 ` [PATCH 21/31] workqueue: drop WQ_RESCUER and test workqueue->rescuer for NULL instead Tejun Heo
2013-03-02 3:24 ` [PATCH 22/31] workqueue: restructure __alloc_workqueue_key() Tejun Heo
2013-03-02 3:24 ` [PATCH 23/31] workqueue: implement get/put_pwq() Tejun Heo
2013-03-02 3:24 ` [PATCH 24/31] workqueue: prepare flush_workqueue() for dynamic creation and destrucion of unbound pool_workqueues Tejun Heo
2013-03-02 3:24 ` [PATCH 25/31] workqueue: perform non-reentrancy test when queueing to unbound workqueues too Tejun Heo
2013-03-02 3:24 ` [PATCH 26/31] workqueue: implement apply_workqueue_attrs() Tejun Heo
2013-03-02 3:24 ` [PATCH 27/31] workqueue: make it clear that WQ_DRAINING is an internal flag Tejun Heo
2013-03-02 3:24 ` [PATCH 28/31] workqueue: reject increasing max_active for ordered workqueues Tejun Heo
2013-03-04 18:30 ` [PATCH UPDATED 28/31] workqueue: reject adjusting max_active or applying attrs to " Tejun Heo
2013-03-02 3:24 ` [PATCH 29/31] cpumask: implement cpumask_parse() Tejun Heo
2013-03-02 3:24 ` [PATCH 30/31] driver/base: implement subsys_virtual_register() Tejun Heo
2013-03-02 18:17 ` Greg Kroah-Hartman
2013-03-02 20:26 ` Tejun Heo
2013-03-03 6:42 ` Kay Sievers
2013-03-05 20:43 ` Tejun Heo
2013-03-07 23:31 ` Greg Kroah-Hartman
2013-03-08 0:04 ` Kay Sievers
2013-03-10 11:57 ` Tejun Heo
2013-03-10 16:45 ` Greg Kroah-Hartman
2013-03-10 17:00 ` Kay Sievers
2013-03-10 17:24 ` Greg Kroah-Hartman
2013-03-10 17:50 ` Kay Sievers
2013-03-10 18:34 ` Tejun Heo
2013-03-12 18:40 ` Tejun Heo
2013-03-02 3:24 ` [PATCH 31/31] workqueue: implement sysfs interface for workqueues Tejun Heo
2013-03-04 18:30 ` [PATCH v2 " Tejun Heo
2013-03-05 20:41 ` [PATCHSET wq/for-3.10-tmp] workqueue: implement workqueue with custom worker attributes Tejun Heo
2013-03-10 10:34 ` Lai Jiangshan
2013-03-10 12:01 ` Tejun Heo
2013-03-11 15:24 ` Tejun Heo
2013-03-11 15:40 ` Lai Jiangshan
2013-03-11 15:42 ` Lai Jiangshan
2013-03-11 15:43 ` Tejun Heo
2013-03-12 18:10 ` Tejun Heo
2013-03-12 18:34 ` Tejun Heo
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1362194662-2344-18-git-send-email-tj@kernel.org \
--to=tj@kernel.org \
--cc=axboe@kernel.dk \
--cc=jmoyer@redhat.com \
--cc=laijs@cn.fujitsu.com \
--cc=linux-kernel@vger.kernel.org \
--cc=zab@redhat.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.