linux-ide.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [PATCH 0/3] Convert libata pio task to slow-work
@ 2009-08-27  9:08 Jens Axboe
  2009-08-27  9:08 ` [PATCH 1/3] slow-work: add delayed_slow_work support Jens Axboe
  2009-08-27 12:36 ` [PATCH 0/3] Convert libata pio task to slow-work Tejun Heo
  0 siblings, 2 replies; 13+ messages in thread
From: Jens Axboe @ 2009-08-27  9:08 UTC (permalink / raw)
  To: linux-kernel, linux-ide; +Cc: tj, alan, jeff, dhowells

Hi,

This patchset adds support for slow-work for delayed slow work and
for cancelling slow work. Note that these patches are totally
untested!

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 13+ messages in thread

* [PATCH 1/3] slow-work: add delayed_slow_work support
  2009-08-27  9:08 [PATCH 0/3] Convert libata pio task to slow-work Jens Axboe
@ 2009-08-27  9:08 ` Jens Axboe
  2009-08-27  9:08   ` [PATCH 2/3] slow-work: add support for cancellation of slow work Jens Axboe
  2009-08-27 12:36 ` [PATCH 0/3] Convert libata pio task to slow-work Tejun Heo
  1 sibling, 1 reply; 13+ messages in thread
From: Jens Axboe @ 2009-08-27  9:08 UTC (permalink / raw)
  To: linux-kernel, linux-ide; +Cc: tj, alan, jeff, dhowells, Jens Axboe

This adds support for starting slow work with a delay, similar
to the functionality we have for workqueues.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 include/linux/slow-work.h |   15 +++++++++++
 kernel/slow-work.c        |   59 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 74 insertions(+), 0 deletions(-)

diff --git a/include/linux/slow-work.h b/include/linux/slow-work.h
index b65c888..12827a8 100644
--- a/include/linux/slow-work.h
+++ b/include/linux/slow-work.h
@@ -51,6 +51,11 @@ struct slow_work {
 	struct list_head	link;	/* link in queue */
 };
 
+struct delayed_slow_work {
+	struct timer_list timer;
+	struct slow_work work;
+};
+
 /**
  * slow_work_init - Initialise a slow work item
  * @work: The work item to initialise
@@ -66,6 +71,13 @@ static inline void slow_work_init(struct slow_work *work,
 	INIT_LIST_HEAD(&work->link);
 }
 
+static inline void delayed_slow_work_init(struct delayed_slow_work *dwork,
+					  const struct slow_work_ops *ops)
+{
+	init_timer(&dwork->timer);
+	slow_work_init(&dwork->work, ops);
+}
+
 /**
  * vslow_work_init - Initialise a very slow work item
  * @work: The work item to initialise
@@ -87,6 +99,9 @@ extern int slow_work_enqueue(struct slow_work *work);
 extern int slow_work_register_user(void);
 extern void slow_work_unregister_user(void);
 
+extern int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
+					unsigned long delay);
+
 #ifdef CONFIG_SYSCTL
 extern ctl_table slow_work_sysctls[];
 #endif
diff --git a/kernel/slow-work.c b/kernel/slow-work.c
index 09d7519..1eeda59 100644
--- a/kernel/slow-work.c
+++ b/kernel/slow-work.c
@@ -318,6 +318,65 @@ cant_get_ref:
 }
 EXPORT_SYMBOL(slow_work_enqueue);
 
+static void delayed_slow_work_timer(unsigned long data)
+{
+	struct slow_work *work = (struct slow_work *) data;
+	unsigned long flags;
+
+	spin_lock_irqsave(&slow_work_queue_lock, flags);
+	if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
+		list_add_tail(&work->link, &vslow_work_queue);
+	else
+		list_add_tail(&work->link, &slow_work_queue);
+	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
+
+	wake_up(&slow_work_thread_wq);
+}
+
+/**
+ * delayed_slow_work_enqueue - Schedule a delayed slow work item for processing
+ * @dwork: The delayed work item to queue
+ * @delay: When to start executing the work
+ *
+ * See slow_work_enqueue(), this functions adds a delay before the work
+ * is actually started. The act of queuing the work is not delayed.
+ */
+int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
+			      unsigned long delay)
+{
+	struct slow_work *work = &dwork->work;
+	unsigned long flags;
+
+	BUG_ON(slow_work_user_count <= 0);
+	BUG_ON(!work);
+	BUG_ON(!work->ops);
+	BUG_ON(!work->ops->get_ref);
+
+	if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
+		spin_lock_irqsave(&slow_work_queue_lock, flags);
+
+		if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
+			set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
+		} else {
+			if (work->ops->get_ref(work) < 0)
+				goto cant_get_ref;
+		}
+
+		spin_unlock_irqrestore(&slow_work_queue_lock, flags);
+		dwork->timer.expires = jiffies + delay;
+		dwork->timer.data = (unsigned long) work;
+		dwork->timer.function = delayed_slow_work_timer;
+		add_timer(&dwork->timer);
+	}
+
+	return 0;
+
+cant_get_ref:
+	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
+	return -EAGAIN;
+}
+EXPORT_SYMBOL(delayed_slow_work_enqueue);
+
 /*
  * Schedule a cull of the thread pool at some time in the near future
  */
-- 
1.6.4.1.207.g68ea

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCH 2/3] slow-work: add support for cancellation of slow work
  2009-08-27  9:08 ` [PATCH 1/3] slow-work: add delayed_slow_work support Jens Axboe
@ 2009-08-27  9:08   ` Jens Axboe
  2009-08-27  9:08     ` [PATCH 3/3] libata: switch pio task from workqueue to slow-work Jens Axboe
  2009-08-27  9:14     ` [PATCH 2/3] slow-work: add support for cancellation of slow work Jens Axboe
  0 siblings, 2 replies; 13+ messages in thread
From: Jens Axboe @ 2009-08-27  9:08 UTC (permalink / raw)
  To: linux-kernel, linux-ide; +Cc: tj, alan, jeff, dhowells, Jens Axboe

This adds support for cancellation of queued slow work and delayed
slow work. If the work is already queued, it will not be executed.
If work is attempted queued while the cancellation is running,
it will fail.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 include/linux/slow-work.h |    3 +++
 kernel/slow-work.c        |   42 ++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/include/linux/slow-work.h b/include/linux/slow-work.h
index 12827a8..f2fd4e1 100644
--- a/include/linux/slow-work.h
+++ b/include/linux/slow-work.h
@@ -47,6 +47,7 @@ struct slow_work {
 #define SLOW_WORK_EXECUTING	1	/* item currently executing */
 #define SLOW_WORK_ENQ_DEFERRED	2	/* item enqueue deferred */
 #define SLOW_WORK_VERY_SLOW	3	/* item is very slow */
+#define SLOW_WORK_CANCEL	4	/* item is cancelled, don't enqueue */
 	const struct slow_work_ops *ops; /* operations table for this item */
 	struct list_head	link;	/* link in queue */
 };
@@ -96,11 +97,13 @@ static inline void vslow_work_init(struct slow_work *work,
 }
 
 extern int slow_work_enqueue(struct slow_work *work);
+extern void cancel_slow_work(struct slow_work *work);
 extern int slow_work_register_user(void);
 extern void slow_work_unregister_user(void);
 
 extern int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
 					unsigned long delay);
+extern void cancel_delayed_slow_work(struct delayed_slow_work *dwork);
 
 #ifdef CONFIG_SYSCTL
 extern ctl_table slow_work_sysctls[];
diff --git a/kernel/slow-work.c b/kernel/slow-work.c
index 1eeda59..9b62bdc 100644
--- a/kernel/slow-work.c
+++ b/kernel/slow-work.c
@@ -194,12 +194,21 @@ static bool slow_work_execute(void)
 	if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
 		BUG();
 
-	work->ops->execute(work);
+	/*
+	 * Don't execute if the work was cancelled after being added
+	 */
+	if (!test_bit(SLOW_WORK_CANCEL, &work->flags))
+		work->ops->execute(work);
 
 	if (very_slow)
 		atomic_dec(&vslow_work_executing_count);
 	clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags);
 
+	/*
+	 * Wake anyone waiting for this work to be done
+	 */
+	wake_up_bit(&work->flags, SLOW_WORK_EXECUTING);
+
 	/* if someone tried to enqueue the item whilst we were executing it,
 	 * then it'll be left unenqueued to avoid multiple threads trying to
 	 * execute it simultaneously
@@ -260,12 +269,16 @@ auto_requeue:
  * allowed to pick items to execute.  This ensures that very slow items won't
  * overly block ones that are just ordinarily slow.
  *
- * Returns 0 if successful, -EAGAIN if not.
+ * Returns 0 if successful, -EAGAIN if not (or -EBUSY if cancelled work is
+ * attempted queued)
  */
 int slow_work_enqueue(struct slow_work *work)
 {
 	unsigned long flags;
 
+	if (test_bit(SLOW_WORK_CANCEL, &work->flags))
+		return -EINVAL;
+
 	BUG_ON(slow_work_user_count <= 0);
 	BUG_ON(!work);
 	BUG_ON(!work->ops);
@@ -347,6 +360,9 @@ int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
 	struct slow_work *work = &dwork->work;
 	unsigned long flags;
 
+	if (test_bit(SLOW_WORK_CANCEL, &work->flags))
+		return -EINVAL;
+
 	BUG_ON(slow_work_user_count <= 0);
 	BUG_ON(!work);
 	BUG_ON(!work->ops);
@@ -377,6 +393,28 @@ cant_get_ref:
 }
 EXPORT_SYMBOL(delayed_slow_work_enqueue);
 
+static int slow_work_wait(void *word)
+{
+	schedule();
+	return 0;
+}
+
+void cancel_slow_work(struct slow_work *work)
+{
+	set_bit(SLOW_WORK_CANCEL, &work->flags);
+	wait_on_bit(&work->flags, SLOW_WORK_EXECUTING, slow_work_wait,
+				TASK_UNINTERRUPTIBLE);
+	clear_bit(SLOW_WORK_CANCEL, &work->flags);
+}
+EXPORT_SYMBOL(cancel_slow_work);
+
+void cancel_delayed_slow_work(struct delayed_slow_work *dwork)
+{
+	del_timer(&dwork->timer);
+	cancel_slow_work(&dwork->work);
+}
+EXPORT_SYMBOL(cancel_delayed_slow_work);
+
 /*
  * Schedule a cull of the thread pool at some time in the near future
  */
-- 
1.6.4.1.207.g68ea


^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCH 3/3] libata: switch pio task from workqueue to slow-work
  2009-08-27  9:08   ` [PATCH 2/3] slow-work: add support for cancellation of slow work Jens Axboe
@ 2009-08-27  9:08     ` Jens Axboe
  2009-08-27  9:14     ` [PATCH 2/3] slow-work: add support for cancellation of slow work Jens Axboe
  1 sibling, 0 replies; 13+ messages in thread
From: Jens Axboe @ 2009-08-27  9:08 UTC (permalink / raw)
  To: linux-kernel, linux-ide; +Cc: tj, alan, jeff, dhowells, Jens Axboe

A workqueue isn't a good fit for the pio task:

- It does not require per-CPU support, thus wasting many threads.
- The pio task would like to have more than one thread per CPU
  in some cases, for the single CPU case of having more than one
  pio device active.

So convert to slow-work instead, this is now possible with support
for delayed slow work and cancellation.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 drivers/ata/libata-core.c |   35 ++++++++++++++++++++++-------------
 drivers/ata/libata-sff.c  |    2 +-
 drivers/ata/libata.h      |    2 +-
 include/linux/libata.h    |    3 ++-
 4 files changed, 26 insertions(+), 16 deletions(-)

diff --git a/drivers/ata/libata-core.c b/drivers/ata/libata-core.c
index 072ba5e..e3f55a2 100644
--- a/drivers/ata/libata-core.c
+++ b/drivers/ata/libata-core.c
@@ -95,7 +95,6 @@ static void ata_dev_xfermask(struct ata_device *dev);
 static unsigned long ata_dev_blacklisted(const struct ata_device *dev);
 
 unsigned int ata_print_id = 1;
-static struct workqueue_struct *ata_wq;
 
 struct workqueue_struct *ata_aux_wq;
 
@@ -1700,7 +1699,7 @@ void ata_pio_queue_task(struct ata_port *ap, void *data, unsigned long delay)
 	ap->port_task_data = data;
 
 	/* may fail if ata_port_flush_task() in progress */
-	queue_delayed_work(ata_wq, &ap->port_task, msecs_to_jiffies(delay));
+	delayed_slow_work_enqueue(&ap->port_task, msecs_to_jiffies(delay));
 }
 
 /**
@@ -1717,7 +1716,7 @@ void ata_port_flush_task(struct ata_port *ap)
 {
 	DPRINTK("ENTER\n");
 
-	cancel_rearming_delayed_work(&ap->port_task);
+	cancel_delayed_slow_work(&ap->port_task);
 
 	if (ata_msg_ctl(ap))
 		ata_port_printk(ap, KERN_DEBUG, "%s: EXIT\n", __func__);
@@ -5600,6 +5599,20 @@ int sata_link_init_spd(struct ata_link *link)
 	return 0;
 }
 
+static int ata_slow_work_get(struct slow_work *work)
+{
+	return 0;
+}
+
+static const struct slow_work_ops ata_work_ops_sff = {
+	.get_ref	= ata_slow_work_get,
+	.execute	= ata_pio_task,
+};
+
+static const struct slow_work_ops ata_work_ops = {
+	.get_ref	= ata_slow_work_get,
+};
+
 /**
  *	ata_port_alloc - allocate and initialize basic ATA port resources
  *	@host: ATA host this allocated port belongs to
@@ -5641,9 +5654,9 @@ struct ata_port *ata_port_alloc(struct ata_host *host)
 #endif
 
 #ifdef CONFIG_ATA_SFF
-	INIT_DELAYED_WORK(&ap->port_task, ata_pio_task);
+	delayed_slow_work_init(&ap->port_task, &ata_work_ops_sff);
 #else
-	INIT_DELAYED_WORK(&ap->port_task, NULL);
+	delayed_slow_work_init(&ap->port_task, &ata_work_ops);
 #endif
 	INIT_DELAYED_WORK(&ap->hotplug_task, ata_scsi_hotplug);
 	INIT_WORK(&ap->scsi_rescan_task, ata_scsi_dev_rescan);
@@ -6580,19 +6593,15 @@ static int __init ata_init(void)
 {
 	ata_parse_force_param();
 
-	ata_wq = create_workqueue("ata");
-	if (!ata_wq)
-		goto free_force_tbl;
-
 	ata_aux_wq = create_singlethread_workqueue("ata_aux");
 	if (!ata_aux_wq)
-		goto free_wq;
+		goto free_force_tbl;
+
+	slow_work_register_user();
 
 	printk(KERN_DEBUG "libata version " DRV_VERSION " loaded.\n");
 	return 0;
 
-free_wq:
-	destroy_workqueue(ata_wq);
 free_force_tbl:
 	kfree(ata_force_tbl);
 	return -ENOMEM;
@@ -6601,8 +6610,8 @@ free_force_tbl:
 static void __exit ata_exit(void)
 {
 	kfree(ata_force_tbl);
-	destroy_workqueue(ata_wq);
 	destroy_workqueue(ata_aux_wq);
+	slow_work_unregister_user();
 }
 
 subsys_initcall(ata_init);
diff --git a/drivers/ata/libata-sff.c b/drivers/ata/libata-sff.c
index bbbb1fa..f795ab7 100644
--- a/drivers/ata/libata-sff.c
+++ b/drivers/ata/libata-sff.c
@@ -1454,7 +1454,7 @@ fsm_start:
 }
 EXPORT_SYMBOL_GPL(ata_sff_hsm_move);
 
-void ata_pio_task(struct work_struct *work)
+void ata_pio_task(struct slow_work *work)
 {
 	struct ata_port *ap =
 		container_of(work, struct ata_port, port_task.work);
diff --git a/drivers/ata/libata.h b/drivers/ata/libata.h
index 89a1e00..d48d14d 100644
--- a/drivers/ata/libata.h
+++ b/drivers/ata/libata.h
@@ -202,7 +202,7 @@ static inline int sata_pmp_attach(struct ata_device *dev)
 extern void ata_dev_select(struct ata_port *ap, unsigned int device,
                            unsigned int wait, unsigned int can_sleep);
 extern u8 ata_irq_on(struct ata_port *ap);
-extern void ata_pio_task(struct work_struct *work);
+extern void ata_pio_task(struct slow_work *work);
 #endif /* CONFIG_ATA_SFF */
 
 #endif /* __LIBATA_H__ */
diff --git a/include/linux/libata.h b/include/linux/libata.h
index e5b6e33..6830c82 100644
--- a/include/linux/libata.h
+++ b/include/linux/libata.h
@@ -34,6 +34,7 @@
 #include <linux/io.h>
 #include <linux/ata.h>
 #include <linux/workqueue.h>
+#include <linux/slow-work.h>
 #include <scsi/scsi_host.h>
 #include <linux/acpi.h>
 #include <linux/cdrom.h>
@@ -734,7 +735,7 @@ struct ata_port {
 	struct device 		*dev;
 
 	void			*port_task_data;
-	struct delayed_work	port_task;
+	struct delayed_slow_work	port_task;
 	struct delayed_work	hotplug_task;
 	struct work_struct	scsi_rescan_task;
 
-- 
1.6.4.1.207.g68ea


^ permalink raw reply related	[flat|nested] 13+ messages in thread

* Re: [PATCH 2/3] slow-work: add support for cancellation of slow work
  2009-08-27  9:08   ` [PATCH 2/3] slow-work: add support for cancellation of slow work Jens Axboe
  2009-08-27  9:08     ` [PATCH 3/3] libata: switch pio task from workqueue to slow-work Jens Axboe
@ 2009-08-27  9:14     ` Jens Axboe
  2009-08-27  9:18       ` [PATCH 2/3] slow-work: add support for cancellation of slow work (updated) Jens Axboe
  1 sibling, 1 reply; 13+ messages in thread
From: Jens Axboe @ 2009-08-27  9:14 UTC (permalink / raw)
  To: linux-kernel, linux-ide; +Cc: tj, alan, jeff, dhowells

On Thu, Aug 27 2009, Jens Axboe wrote:
> @@ -194,12 +194,21 @@ static bool slow_work_execute(void)
>  	if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
>  		BUG();
>  
> -	work->ops->execute(work);
> +	/*
> +	 * Don't execute if the work was cancelled after being added
> +	 */
> +	if (!test_bit(SLOW_WORK_CANCEL, &work->flags))
> +		work->ops->execute(work);
>  
>  	if (very_slow)
>  		atomic_dec(&vslow_work_executing_count);
>  	clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags);
>  
> +	/*
> +	 * Wake anyone waiting for this work to be done
> +	 */
> +	wake_up_bit(&work->flags, SLOW_WORK_EXECUTING);
> +
>  	/* if someone tried to enqueue the item whilst we were executing it,
>  	 * then it'll be left unenqueued to avoid multiple threads trying to
>  	 * execute it simultaneously

> +void cancel_slow_work(struct slow_work *work)
> +{
> +	set_bit(SLOW_WORK_CANCEL, &work->flags);
> +	wait_on_bit(&work->flags, SLOW_WORK_EXECUTING, slow_work_wait,
> +				TASK_UNINTERRUPTIBLE);
> +	clear_bit(SLOW_WORK_CANCEL, &work->flags);
> +}
> +EXPORT_SYMBOL(cancel_slow_work);

We want to use SLOW_WORK_PENDING for this logic, not SLOW_WORK_EXECUTING.
I'll update it.

-- 
Jens Axboe

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [PATCH 2/3] slow-work: add support for cancellation of slow work (updated)
  2009-08-27  9:14     ` [PATCH 2/3] slow-work: add support for cancellation of slow work Jens Axboe
@ 2009-08-27  9:18       ` Jens Axboe
  0 siblings, 0 replies; 13+ messages in thread
From: Jens Axboe @ 2009-08-27  9:18 UTC (permalink / raw)
  To: linux-kernel, linux-ide; +Cc: tj, alan, jeff, dhowells


>From 542bd594356125971a518a4c7a3e4ead21ea256a Mon Sep 17 00:00:00 2001
From: Jens Axboe <jens.axboe@oracle.com>
Date: Thu, 27 Aug 2009 11:17:04 +0200
Subject: [PATCH 1/2] slow-work: add support for cancellation of slow work

This adds support for cancellation of queued slow work and delayed
slow work. If the work is already queued, it will not be executed.
If work is attempted queued while the cancellation is running,
it will fail.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
---
 include/linux/slow-work.h |    3 +++
 kernel/slow-work.c        |   43 +++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/include/linux/slow-work.h b/include/linux/slow-work.h
index 12827a8..f2fd4e1 100644
--- a/include/linux/slow-work.h
+++ b/include/linux/slow-work.h
@@ -47,6 +47,7 @@ struct slow_work {
 #define SLOW_WORK_EXECUTING	1	/* item currently executing */
 #define SLOW_WORK_ENQ_DEFERRED	2	/* item enqueue deferred */
 #define SLOW_WORK_VERY_SLOW	3	/* item is very slow */
+#define SLOW_WORK_CANCEL	4	/* item is cancelled, don't enqueue */
 	const struct slow_work_ops *ops; /* operations table for this item */
 	struct list_head	link;	/* link in queue */
 };
@@ -96,11 +97,13 @@ static inline void vslow_work_init(struct slow_work *work,
 }
 
 extern int slow_work_enqueue(struct slow_work *work);
+extern void cancel_slow_work(struct slow_work *work);
 extern int slow_work_register_user(void);
 extern void slow_work_unregister_user(void);
 
 extern int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
 					unsigned long delay);
+extern void cancel_delayed_slow_work(struct delayed_slow_work *dwork);
 
 #ifdef CONFIG_SYSCTL
 extern ctl_table slow_work_sysctls[];
diff --git a/kernel/slow-work.c b/kernel/slow-work.c
index 1eeda59..a39778a 100644
--- a/kernel/slow-work.c
+++ b/kernel/slow-work.c
@@ -194,7 +194,17 @@ static bool slow_work_execute(void)
 	if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
 		BUG();
 
-	work->ops->execute(work);
+	/*
+	 * Wake anyone waiting for this work to not be pending anymore
+	 */
+	smp_mb__after_clear_bit();
+	wake_up_bit(&work->flags, SLOW_WORK_PENDING);
+
+	/*
+	 * Don't execute if the work was cancelled after being added
+	 */
+	if (!test_bit(SLOW_WORK_CANCEL, &work->flags))
+		work->ops->execute(work);
 
 	if (very_slow)
 		atomic_dec(&vslow_work_executing_count);
@@ -260,12 +270,16 @@ auto_requeue:
  * allowed to pick items to execute.  This ensures that very slow items won't
  * overly block ones that are just ordinarily slow.
  *
- * Returns 0 if successful, -EAGAIN if not.
+ * Returns 0 if successful, -EAGAIN if not (or -EBUSY if cancelled work is
+ * attempted queued)
  */
 int slow_work_enqueue(struct slow_work *work)
 {
 	unsigned long flags;
 
+	if (test_bit(SLOW_WORK_CANCEL, &work->flags))
+		return -EINVAL;
+
 	BUG_ON(slow_work_user_count <= 0);
 	BUG_ON(!work);
 	BUG_ON(!work->ops);
@@ -347,6 +361,9 @@ int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
 	struct slow_work *work = &dwork->work;
 	unsigned long flags;
 
+	if (test_bit(SLOW_WORK_CANCEL, &work->flags))
+		return -EINVAL;
+
 	BUG_ON(slow_work_user_count <= 0);
 	BUG_ON(!work);
 	BUG_ON(!work->ops);
@@ -377,6 +394,28 @@ cant_get_ref:
 }
 EXPORT_SYMBOL(delayed_slow_work_enqueue);
 
+static int slow_work_wait(void *word)
+{
+	schedule();
+	return 0;
+}
+
+void cancel_slow_work(struct slow_work *work)
+{
+	set_bit(SLOW_WORK_CANCEL, &work->flags);
+	wait_on_bit(&work->flags, SLOW_WORK_PENDING, slow_work_wait,
+				TASK_UNINTERRUPTIBLE);
+	clear_bit(SLOW_WORK_CANCEL, &work->flags);
+}
+EXPORT_SYMBOL(cancel_slow_work);
+
+void cancel_delayed_slow_work(struct delayed_slow_work *dwork)
+{
+	del_timer(&dwork->timer);
+	cancel_slow_work(&dwork->work);
+}
+EXPORT_SYMBOL(cancel_delayed_slow_work);
+
 /*
  * Schedule a cull of the thread pool at some time in the near future
  */
-- 
1.6.4.1.207.g68ea

-- 
Jens Axboe


^ permalink raw reply related	[flat|nested] 13+ messages in thread

* Re: [PATCH 0/3] Convert libata pio task to slow-work
  2009-08-27  9:08 [PATCH 0/3] Convert libata pio task to slow-work Jens Axboe
  2009-08-27  9:08 ` [PATCH 1/3] slow-work: add delayed_slow_work support Jens Axboe
@ 2009-08-27 12:36 ` Tejun Heo
  2009-08-27 12:49   ` Jens Axboe
  1 sibling, 1 reply; 13+ messages in thread
From: Tejun Heo @ 2009-08-27 12:36 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-kernel, linux-ide, alan, jeff, dhowells

Hello, Jens.

Jens Axboe wrote:
> Hi,
> 
> This patchset adds support for slow-work for delayed slow work and
> for cancelling slow work. Note that these patches are totally
> untested!

As what I'm currently working on is likely to collide with these
changes, here is a short summary of what's been going on.

/* excerpted from internal weekly work report and edited */

The principle is the same as I described before.  It hooks into the
scheduler using an alias scheduler class of sched_fair and gets
notifications of workqueue threads going into sleep, waking up and
getting preempted from which worker pool is managed automatically for
full concurrency with the least number of concurrent threads.

There's a global workqueue per-cpu and each actual workqueue is front
to the global one adding necessary attributes and/or defining a
flushing domain.  Each global workqueue can have multiple workers
(upto 128 in the current incarnation) and creates and kicks new ones
as necessary to keep the cpu occupied.

The diffcult part was teaching workqueue how to handle multiple
workers yet maintaining its exclusion properties, flushing rules and
forward progress guarantees - a single work can't be running
concurrently on the same cpu but can across different cpus,
flush_work() deals with single cpu flushing but others deal with all
the cpus and so on.  Because each work struct can't be accessed once
the work actually begins running, keeping track of things become
somewhat difficult as multiple workers now process works from a single
queue.  Anyways, after much head scratching, I think most problems
have been nailed down although I wouldn't know for sure till I get it
actually working.

There's slight more book keeping to do on each work-processing
iteration but overall I think it will be a win considering that it can
remove unnecessary task switchings, usage of different stacks (cache
foot-print) and cross-cpu work bouncing (for currently single threaded
workqueues).  If it really works as expected, it should be able to
replace async, [very]_slow_work and remove most of private workqueues
while losing no concurrency or forward-progress guarantees, which
would be pretty decent.

/**/

I finished first draft implementation and review pass yesterday and it
seems like there shouldn't be any major problem now but I haven't even
tried to compile it yet, so I'm not yet entirely sure how it would
eventually turn out and if I hit some major roadblock I might just
drop it.

It would be nice if merging of this series and the lazy work can be
held a bit but there's no harm in merging either.  If the concurrency
managed workqueue turns out to be a good idea, we can replace it then.

Thanks a lot.

-- 
tejun

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH 0/3] Convert libata pio task to slow-work
  2009-08-27 12:36 ` [PATCH 0/3] Convert libata pio task to slow-work Tejun Heo
@ 2009-08-27 12:49   ` Jens Axboe
  2009-08-27 12:58     ` Tejun Heo
  0 siblings, 1 reply; 13+ messages in thread
From: Jens Axboe @ 2009-08-27 12:49 UTC (permalink / raw)
  To: Tejun Heo; +Cc: linux-kernel, linux-ide, alan, jeff, dhowells

On Thu, Aug 27 2009, Tejun Heo wrote:
> Hello, Jens.
> 
> Jens Axboe wrote:
> > Hi,
> > 
> > This patchset adds support for slow-work for delayed slow work and
> > for cancelling slow work. Note that these patches are totally
> > untested!
> 
> As what I'm currently working on is likely to collide with these
> changes, here is a short summary of what's been going on.
> 
> /* excerpted from internal weekly work report and edited */
> 
> The principle is the same as I described before.  It hooks into the
> scheduler using an alias scheduler class of sched_fair and gets
> notifications of workqueue threads going into sleep, waking up and
> getting preempted from which worker pool is managed automatically for
> full concurrency with the least number of concurrent threads.
> 
> There's a global workqueue per-cpu and each actual workqueue is front
> to the global one adding necessary attributes and/or defining a
> flushing domain.  Each global workqueue can have multiple workers
> (upto 128 in the current incarnation) and creates and kicks new ones
> as necessary to keep the cpu occupied.
> 
> The diffcult part was teaching workqueue how to handle multiple
> workers yet maintaining its exclusion properties, flushing rules and
> forward progress guarantees - a single work can't be running
> concurrently on the same cpu but can across different cpus,
> flush_work() deals with single cpu flushing but others deal with all
> the cpus and so on.  Because each work struct can't be accessed once
> the work actually begins running, keeping track of things become
> somewhat difficult as multiple workers now process works from a single
> queue.  Anyways, after much head scratching, I think most problems
> have been nailed down although I wouldn't know for sure till I get it
> actually working.
> 
> There's slight more book keeping to do on each work-processing
> iteration but overall I think it will be a win considering that it can
> remove unnecessary task switchings, usage of different stacks (cache
> foot-print) and cross-cpu work bouncing (for currently single threaded
> workqueues).  If it really works as expected, it should be able to
> replace async, [very]_slow_work and remove most of private workqueues
> while losing no concurrency or forward-progress guarantees, which
> would be pretty decent.
> 
> /**/
> 
> I finished first draft implementation and review pass yesterday and it
> seems like there shouldn't be any major problem now but I haven't even
> tried to compile it yet, so I'm not yet entirely sure how it would
> eventually turn out and if I hit some major roadblock I might just
> drop it.
> 
> It would be nice if merging of this series and the lazy work can be
> held a bit but there's no harm in merging either.  If the concurrency
> managed workqueue turns out to be a good idea, we can replace it then.

It can wait, what you describe above sounds really cool and would
hopefully allow us to get rid of all workqueues (provided it scales well
and doesn't fall down on cache line contention with many different
instances pounding on it).

Care to post it? I know you don't think it's perfect yet, but it would
make a lot more sense to throw effort into this rather than waste time
on partial solutions.

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH 0/3] Convert libata pio task to slow-work
  2009-08-27 12:49   ` Jens Axboe
@ 2009-08-27 12:58     ` Tejun Heo
  2009-08-27 18:49       ` Jens Axboe
  0 siblings, 1 reply; 13+ messages in thread
From: Tejun Heo @ 2009-08-27 12:58 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-kernel, linux-ide, alan, jeff, dhowells

Hello, Jens.

Jens Axboe wrote:
>> It would be nice if merging of this series and the lazy work can be
>> held a bit but there's no harm in merging either.  If the concurrency
>> managed workqueue turns out to be a good idea, we can replace it then.
> 
> It can wait, what you describe above sounds really cool and would
> hopefully allow us to get rid of all workqueues (provided it scales well
> and doesn't fall down on cache line contention with many different
> instances pounding on it).

Almost all operations are per-cpu so cache lines shouldn't bounce too
much.  The only part I worry about is the part which checks whether a
work is currently executing on the current cpu which currently is
implemeted as a hash table.  The hash table is only 16 pointers long
and will be mostly empty so hopefully it doesn't add any significant
overhead.

> Care to post it? I know you don't think it's perfect yet, but it would
> make a lot more sense to throw effort into this rather than waste time
> on partial solutions.

I have this printed out code with full of red markings from proof
reading and flush implementation is mostly broken.  Please give me a
couple of days.  I'll post a rough unsplit version which at least
compiles with the planned changes applied by the end of the week.  :-)

Thanks.

-- 
tejun

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH 0/3] Convert libata pio task to slow-work
  2009-08-27 12:58     ` Tejun Heo
@ 2009-08-27 18:49       ` Jens Axboe
  2009-08-28  7:02         ` Tejun Heo
  0 siblings, 1 reply; 13+ messages in thread
From: Jens Axboe @ 2009-08-27 18:49 UTC (permalink / raw)
  To: Tejun Heo; +Cc: linux-kernel, linux-ide, alan, jeff, dhowells

On Thu, Aug 27 2009, Tejun Heo wrote:
> Hello, Jens.
> 
> Jens Axboe wrote:
> >> It would be nice if merging of this series and the lazy work can be
> >> held a bit but there's no harm in merging either.  If the concurrency
> >> managed workqueue turns out to be a good idea, we can replace it then.
> > 
> > It can wait, what you describe above sounds really cool and would
> > hopefully allow us to get rid of all workqueues (provided it scales well
> > and doesn't fall down on cache line contention with many different
> > instances pounding on it).
> 
> Almost all operations are per-cpu so cache lines shouldn't bounce too
> much.  The only part I worry about is the part which checks whether a
> work is currently executing on the current cpu which currently is
> implemeted as a hash table.  The hash table is only 16 pointers long
> and will be mostly empty so hopefully it doesn't add any significant
> overhead.

OK, we'll let time and experimentation be the judge.

> > Care to post it? I know you don't think it's perfect yet, but it would
> > make a lot more sense to throw effort into this rather than waste time
> > on partial solutions.
> 
> I have this printed out code with full of red markings from proof
> reading and flush implementation is mostly broken.  Please give me a
> couple of days.  I'll post a rough unsplit version which at least
> compiles with the planned changes applied by the end of the week.  :-)

Alright, fair enough.

One question - do the 'exposed' workqueues (the ones that drivers
allocate/create) sitting in front of the global cpu queue allow more
than one thread per cpu, or is that property retained for the global cpu
queue (where it is a necessity)?

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH 0/3] Convert libata pio task to slow-work
  2009-08-27 18:49       ` Jens Axboe
@ 2009-08-28  7:02         ` Tejun Heo
  2009-08-28  7:06           ` Jens Axboe
  0 siblings, 1 reply; 13+ messages in thread
From: Tejun Heo @ 2009-08-28  7:02 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-kernel, linux-ide, alan, jeff, dhowells

Hello, Jens.

Jens Axboe wrote:
>> Almost all operations are per-cpu so cache lines shouldn't bounce too
>> much.  The only part I worry about is the part which checks whether a
>> work is currently executing on the current cpu which currently is
>> implemeted as a hash table.  The hash table is only 16 pointers long
>> and will be mostly empty so hopefully it doesn't add any significant
>> overhead.
> 
> OK, we'll let time and experimentation be the judge.

Yeap.

>>> Care to post it? I know you don't think it's perfect yet, but it would
>>> make a lot more sense to throw effort into this rather than waste time
>>> on partial solutions.
>> I have this printed out code with full of red markings from proof
>> reading and flush implementation is mostly broken.  Please give me a
>> couple of days.  I'll post a rough unsplit version which at least
>> compiles with the planned changes applied by the end of the week.  :-)
> 
> Alright, fair enough.
> 
> One question - do the 'exposed' workqueues (the ones that drivers
> allocate/create) sitting in front of the global cpu queue allow more
> than one thread per cpu, or is that property retained for the global cpu
> queue (where it is a necessity)?

The exposed workqueues basically just play the gateway and don't have
threads associated with it, well, at least not the normal ones.  It
may have single dedicated thread which usually isn't used but only
gets summoned when a queue stall is detected (new thread needs to be
created but blocks on allocation kind of situation).  So, only the
global cpu queue has normal workers and there are multiple per cpu and
they're shared by all exported workqueues.

Thanks.

-- 
tejun

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH 0/3] Convert libata pio task to slow-work
  2009-08-28  7:02         ` Tejun Heo
@ 2009-08-28  7:06           ` Jens Axboe
  2009-08-31  3:24             ` [very-early-draft-unsplit PATCH] workqueue: implement global cpu workqueue Tejun Heo
  0 siblings, 1 reply; 13+ messages in thread
From: Jens Axboe @ 2009-08-28  7:06 UTC (permalink / raw)
  To: Tejun Heo; +Cc: linux-kernel, linux-ide, alan, jeff, dhowells

On Fri, Aug 28 2009, Tejun Heo wrote:
> >>> Care to post it? I know you don't think it's perfect yet, but it would
> >>> make a lot more sense to throw effort into this rather than waste time
> >>> on partial solutions.
> >> I have this printed out code with full of red markings from proof
> >> reading and flush implementation is mostly broken.  Please give me a
> >> couple of days.  I'll post a rough unsplit version which at least
> >> compiles with the planned changes applied by the end of the week.  :-)
> > 
> > Alright, fair enough.
> > 
> > One question - do the 'exposed' workqueues (the ones that drivers
> > allocate/create) sitting in front of the global cpu queue allow more
> > than one thread per cpu, or is that property retained for the global cpu
> > queue (where it is a necessity)?
> 
> The exposed workqueues basically just play the gateway and don't have
> threads associated with it, well, at least not the normal ones.  It
> may have single dedicated thread which usually isn't used but only
> gets summoned when a queue stall is detected (new thread needs to be
> created but blocks on allocation kind of situation).  So, only the
> global cpu queue has normal workers and there are multiple per cpu and
> they're shared by all exported workqueues.

Thanks for the clarification, it answers the question on what level of
functionality is observed by the exported workqueues. Sounds good!

-- 
Jens Axboe


^ permalink raw reply	[flat|nested] 13+ messages in thread

* [very-early-draft-unsplit PATCH] workqueue: implement global cpu workqueue
  2009-08-28  7:06           ` Jens Axboe
@ 2009-08-31  3:24             ` Tejun Heo
  0 siblings, 0 replies; 13+ messages in thread
From: Tejun Heo @ 2009-08-31  3:24 UTC (permalink / raw)
  To: Jens Axboe; +Cc: linux-kernel, linux-ide, alan, jeff, dhowells

Okay, here's the early version which finally builds.  It's a rather
huge patch and contains a large number of known and unknown bugs and
doesn't actually convert any users but it should be enough to show the
basic idea and how each area of the problem space is solved.

I'll post a proper cleaned-up and hopefully working version in several
days.

Thanks.

SERIOUSLY_BROKEN_AND_NOT_SIGNED_OFF_BY_ANYONE
---
 include/linux/kthread.h      |    1
 include/linux/sched.h        |    3
 include/linux/stop_machine.h |    6
 include/linux/workqueue.h    |   52 -
 init/main.c                  |    2
 kernel/kthread.c             |    7
 kernel/sched.c               |   14
 kernel/sched_fair.c          |   58 -
 kernel/sched_idletask.c      |    1
 kernel/sched_rt.c            |    1
 kernel/sched_workqueue.c     |   53 +
 kernel/sched_workqueue.h     |    5
 kernel/stop_machine.c        |  151 +++
 kernel/workqueue.c           | 1738 +++++++++++++++++++++++++++++++++----------
 14 files changed, 1655 insertions(+), 437 deletions(-)

Index: work/include/linux/stop_machine.h
===================================================================
--- work.orig/include/linux/stop_machine.h
+++ work/include/linux/stop_machine.h
@@ -53,6 +53,11 @@ int stop_machine_create(void);
  */
 void stop_machine_destroy(void);

+/**
+ * init_stop_machine: initialize stop_machine during boot
+ */
+void init_stop_machine(void);
+
 #else

 static inline int stop_machine(int (*fn)(void *), void *data,
@@ -67,6 +72,7 @@ static inline int stop_machine(int (*fn)

 static inline int stop_machine_create(void) { return 0; }
 static inline void stop_machine_destroy(void) { }
+static inline void init_stop_machine(void) { }

 #endif /* CONFIG_SMP */
 #endif /* _LINUX_STOP_MACHINE */
Index: work/init/main.c
===================================================================
--- work.orig/init/main.c
+++ work/init/main.c
@@ -35,6 +35,7 @@
 #include <linux/security.h>
 #include <linux/smp.h>
 #include <linux/workqueue.h>
+#include <linux/stop_machine.h>
 #include <linux/profile.h>
 #include <linux/rcupdate.h>
 #include <linux/moduleparam.h>
@@ -807,6 +808,7 @@ static void __init do_basic_setup(void)
 {
 	rcu_init_sched(); /* needed by module_init stage. */
 	init_workqueues();
+	init_stop_machine();
 	cpuset_init_smp();
 	usermodehelper_init();
 	driver_init();
Index: work/kernel/stop_machine.c
===================================================================
--- work.orig/kernel/stop_machine.c
+++ work/kernel/stop_machine.c
@@ -25,6 +25,8 @@ enum stopmachine_state {
 	STOPMACHINE_RUN,
 	/* Exit */
 	STOPMACHINE_EXIT,
+	/* Done */
+	STOPMACHINE_DONE,
 };
 static enum stopmachine_state state;

@@ -42,10 +44,9 @@ static DEFINE_MUTEX(lock);
 static DEFINE_MUTEX(setup_lock);
 /* Users of stop_machine. */
 static int refcount;
-static struct workqueue_struct *stop_machine_wq;
+static struct task_struct **stop_machine_threads;
 static struct stop_machine_data active, idle;
 static const struct cpumask *active_cpus;
-static void *stop_machine_work;

 static void set_state(enum stopmachine_state newstate)
 {
@@ -63,14 +64,31 @@ static void ack_state(void)
 }

 /* This is the actual function which stops the CPU. It runs
- * in the context of a dedicated stopmachine workqueue. */
-static void stop_cpu(struct work_struct *unused)
+ * on dedicated per-cpu kthreads. */
+static int stop_cpu(void *unused)
 {
 	enum stopmachine_state curstate = STOPMACHINE_NONE;
-	struct stop_machine_data *smdata = &idle;
+	struct stop_machine_data *smdata;
 	int cpu = smp_processor_id();
 	int err;

+repeat:
+	/* Wait for __stop_machine() to initiate */
+	while (true) {
+		set_current_state(TASK_INTERRUPTIBLE);
+		/* <- kthread_stop() and __stop_machine()::smp_wmb() */
+		if (kthread_should_stop()) {
+			__set_current_state(TASK_RUNNING);
+			return 0;
+		}
+		if (state == STOPMACHINE_PREPARE)
+			break;
+		schedule();
+	}
+	smp_rmb();	/* <- __stop_machine()::set_state() */
+
+	/* Okay, let's go */
+	smdata = &idle;
 	if (!active_cpus) {
 		if (cpu == cpumask_first(cpu_online_mask))
 			smdata = &active;
@@ -104,6 +122,7 @@ static void stop_cpu(struct work_struct
 	} while (curstate != STOPMACHINE_EXIT);

 	local_irq_enable();
+	goto repeat;
 }

 /* Callback for CPUs which aren't supposed to do anything. */
@@ -112,46 +131,122 @@ static int chill(void *unused)
 	return 0;
 }

+static int create_stop_machine_thread(unsigned int cpu)
+{
+	struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
+	struct task_struct **pp = per_cpu_ptr(stop_machine_threads, cpu);
+	struct task_struct *p;
+
+	if (*pp)
+		return -EBUSY;
+
+	p = kthread_create(stop_cpu, NULL, "kstop/%u", cpu);
+	if (IS_ERR(p))
+		return PTR_ERR(p);
+
+	sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
+	*pp = p;
+	return 0;
+}
+
+/* Should be called with cpu hotplug disabled and setup_lock held */
+static void kill_stop_machine_threads(void)
+{
+	unsigned int cpu;
+
+	if (!stop_machine_threads)
+		return;
+
+	for_each_online_cpu(cpu) {
+		struct task_struct *p = *per_cpu_ptr(stop_machine_threads, cpu);
+		if (p)
+			kthread_stop(p);
+	}
+	free_percpu(stop_machine_threads);
+	stop_machine_threads = NULL;
+}
+
 int stop_machine_create(void)
 {
+	unsigned int cpu;
+
+	get_online_cpus();
 	mutex_lock(&setup_lock);
 	if (refcount)
 		goto done;
-	stop_machine_wq = create_rt_workqueue("kstop");
-	if (!stop_machine_wq)
-		goto err_out;
-	stop_machine_work = alloc_percpu(struct work_struct);
-	if (!stop_machine_work)
+
+	stop_machine_threads = alloc_percpu(struct task_struct *);
+	if (!stop_machine_threads)
 		goto err_out;
+
+	/*
+	 * cpu hotplug is disabled, create only for online cpus,
+	 * cpu_callback() will handle cpu hot [un]plugs.
+	 */
+	for_each_online_cpu(cpu) {
+		if (create_stop_machine_thread(cpu))
+			goto err_out;
+		kthread_bind(*per_cpu_ptr(stop_machine_threads, cpu), cpu);
+	}
 done:
 	refcount++;
 	mutex_unlock(&setup_lock);
+	put_online_cpus();
 	return 0;

 err_out:
-	if (stop_machine_wq)
-		destroy_workqueue(stop_machine_wq);
+	kill_stop_machine_threads();
 	mutex_unlock(&setup_lock);
+	put_online_cpus();
 	return -ENOMEM;
 }
 EXPORT_SYMBOL_GPL(stop_machine_create);

 void stop_machine_destroy(void)
 {
+	get_online_cpus();
 	mutex_lock(&setup_lock);
-	refcount--;
-	if (refcount)
-		goto done;
-	destroy_workqueue(stop_machine_wq);
-	free_percpu(stop_machine_work);
-done:
+	if (!--refcount)
+		kill_stop_machine_threads();
 	mutex_unlock(&setup_lock);
+	put_online_cpus();
 }
 EXPORT_SYMBOL_GPL(stop_machine_destroy);

+static int __cpuinit stop_machine_cpu_callback(struct notifier_block *nfb,
+					       unsigned long action, void *hcpu)
+{
+	unsigned int cpu = (unsigned long)hcpu;
+	struct task_struct **pp = per_cpu_ptr(stop_machine_threads, cpu);
+
+	/* Hotplug exclusion is enough, no need to worry about setup_lock */
+	if (!stop_machine_threads)
+		return NOTIFY_OK;
+
+	switch (action & ~CPU_TASKS_FROZEN) {
+	case CPU_UP_PREPARE:
+		if (create_stop_machine_thread(cpu)) {
+			printk(KERN_ERR "failed to create stop machine "
+			       "thread for %u\n", cpu);
+			return NOTIFY_BAD;
+		}
+		break;
+
+	case CPU_ONLINE:
+		kthread_bind(*pp, cpu);
+		break;
+
+	case CPU_UP_CANCELED:
+	case CPU_POST_DEAD:
+		kthread_stop(*pp);
+		*pp = NULL;
+		break;
+	}
+	return NOTIFY_OK;
+}
+
 int __stop_machine(int (*fn)(void *), void *data, const struct cpumask *cpus)
 {
-	struct work_struct *sm_work;
 	int i, ret;

 	/* Set up initial state. */
@@ -164,19 +259,18 @@ int __stop_machine(int (*fn)(void *), vo
 	idle.fn = chill;
 	idle.data = NULL;

-	set_state(STOPMACHINE_PREPARE);
+	set_state(STOPMACHINE_PREPARE);	/* -> stop_cpu()::smp_rmb() */
+	smp_wmb();			/* -> stop_cpu()::set_current_state() */

 	/* Schedule the stop_cpu work on all cpus: hold this CPU so one
 	 * doesn't hit this CPU until we're ready. */
 	get_cpu();
-	for_each_online_cpu(i) {
-		sm_work = per_cpu_ptr(stop_machine_work, i);
-		INIT_WORK(sm_work, stop_cpu);
-		queue_work_on(i, stop_machine_wq, sm_work);
-	}
+	for_each_online_cpu(i)
+		wake_up_process(*per_cpu_ptr(stop_machine_threads, i));
 	/* This will release the thread on our CPU. */
 	put_cpu();
-	flush_workqueue(stop_machine_wq);
+	while (state < STOPMACHINE_DONE)
+		yield();
 	ret = active.fnret;
 	mutex_unlock(&lock);
 	return ret;
@@ -197,3 +291,8 @@ int stop_machine(int (*fn)(void *), void
 	return ret;
 }
 EXPORT_SYMBOL_GPL(stop_machine);
+
+void __init init_stop_machine(void)
+{
+	hotcpu_notifier(stop_machine_cpu_callback, 0);
+}
Index: work/include/linux/workqueue.h
===================================================================
--- work.orig/include/linux/workqueue.h
+++ work/include/linux/workqueue.h
@@ -22,11 +22,28 @@ typedef void (*work_func_t)(struct work_
  */
 #define work_data_bits(work) ((unsigned long *)(&(work)->data))

+enum {
+	WORK_STRUCT_PENDING_BIT	= 0,	/* work item is pending execution */
+	WORK_STRUCT_COLOR_BIT	= 1,	/* color for workqueue flushing */
+	WORK_STRUCT_LINKED_BIT	= 2,	/* next work is linked to this one */
+
+	WORK_STRUCT_PENDING	= 1 << WORK_STRUCT_PENDING_BIT,
+	WORK_STRUCT_COLOR	= 1 << WORK_STRUCT_COLOR_BIT,
+	WORK_STRUCT_LINKED	= 1 << WORK_STRUCT_LINKED_BIT,
+
+	/*
+	 * Reserve 3bits off of cwq pointer.  This is enough and
+	 * provides acceptable alignment on both 32 and 64bit
+	 * machines.
+	 */
+	WORK_STRUCT_FLAG_BITS	= 3,
+
+	WORK_STRUCT_FLAG_MASK	= (1UL << WORK_STRUCT_FLAG_BITS) - 1,
+	WORK_STRUCT_WQ_DATA_MASK = ~WORK_STRUCT_FLAG_MASK,
+};
+
 struct work_struct {
 	atomic_long_t data;
-#define WORK_STRUCT_PENDING 0		/* T if work item pending execution */
-#define WORK_STRUCT_FLAG_MASK (3UL)
-#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
 	struct list_head entry;
 	work_func_t func;
 #ifdef CONFIG_LOCKDEP
@@ -163,14 +180,17 @@ struct execute_work {
 #define work_clear_pending(work) \
 	clear_bit(WORK_STRUCT_PENDING, work_data_bits(work))

+enum {
+	WQ_FREEZEABLE		= 1 << 0, /* freeze during suspend */
+	WQ_EMERGENCY_WORKER	= 1 << 1, /* has an emergency worker */
+};

 extern struct workqueue_struct *
-__create_workqueue_key(const char *name, int singlethread,
-		       int freezeable, int rt, struct lock_class_key *key,
-		       const char *lock_name);
+__create_workqueue_key(const char *name, unsigned int flags,
+		       struct lock_class_key *key, const char *lock_name);

 #ifdef CONFIG_LOCKDEP
-#define __create_workqueue(name, singlethread, freezeable, rt)	\
+#define __create_workqueue(name, flags)				\
 ({								\
 	static struct lock_class_key __key;			\
 	const char *__lock_name;				\
@@ -180,20 +200,20 @@ __create_workqueue_key(const char *name,
 	else							\
 		__lock_name = #name;				\
 								\
-	__create_workqueue_key((name), (singlethread),		\
-			       (freezeable), (rt), &__key,	\
+	__create_workqueue_key((name), (flags), &__key,		\
 			       __lock_name);			\
 })
 #else
-#define __create_workqueue(name, singlethread, freezeable, rt)	\
-	__create_workqueue_key((name), (singlethread), (freezeable), (rt), \
-			       NULL, NULL)
+#define __create_workqueue(name, flags)				\
+	__create_workqueue_key((name), (flags), NULL, NULL)
 #endif

-#define create_workqueue(name) __create_workqueue((name), 0, 0, 0)
-#define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1)
-#define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0)
-#define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0)
+#define create_workqueue(name)					\
+	__create_workqueue((name), WQ_EMERGENCY_WORKER)
+#define create_freezeable_workqueue(name)			\
+	__create_workqueue((name), WQ_FREEZEABLE | WQ_EMERGENCY_WORKER)
+#define create_singlethread_workqueue(name)			\
+	__create_workqueue((name), WQ_EMERGENCY_WORKER)

 extern void destroy_workqueue(struct workqueue_struct *wq);

Index: work/kernel/workqueue.c
===================================================================
--- work.orig/kernel/workqueue.c
+++ work/kernel/workqueue.c
@@ -29,77 +29,176 @@
 #include <linux/kthread.h>
 #include <linux/hardirq.h>
 #include <linux/mempolicy.h>
-#include <linux/freezer.h>
+#include <linux/freezer.h>		// freezer not implemented yet
 #include <linux/kallsyms.h>
 #include <linux/debug_locks.h>
 #include <linux/lockdep.h>
+#include <linux/wait.h>
 #define CREATE_TRACE_POINTS
-#include <trace/events/workqueue.h>
+#include <trace/events/workqueue.h>	// tracer not implemented yet
+
+#include "sched_workqueue.h"
+
+enum {
+	/* worker state flags */
+	WORKER_STA_IDLE		= 1 << 0,	/* is idle */
+	WORKER_STA_RUNNING	= 1 << 1,	/* busy && TASK_RUNNING */
+	WORKER_STA_ROGUE	= 1 << 2,	/* don't try to track RUNNING */
+
+	/* worker request flags */
+	WORKER_REQ_DIE		= 1 << 1,	/* die die die */
+
+	/* global_cwq flags */
+	GCWQ_MANAGE_WORKERS	= 1 << 0,	/* need to manage workers */
+	GCWQ_MANAGING_WORKERS	= 1 << 1,	/* managing workers */
+
+	/* gcwq->trustee_state */
+	TRUSTEE_NONE		= 0,
+	TRUSTEE_IN_CHARGE	= 1,
+	TRUSTEE_DRAIN		= 2,
+	TRUSTEE_CANCEL		= 3,
+	TRUSTEE_RELEASE		= 4,
+	TRUSTEE_DONE		= 5,
+
+	MAX_CPU_WORKERS_ORDER	= 7,		/* 128 */
+	MAX_WORKERS_PER_CPU	= 1 << MAX_CPU_WORKERS_ORDER,
+
+	BUSY_WORKER_HASH_ORDER	= MAX_CPU_WORKERS_ORDER - 3, /* 16 pointers */
+	BUSY_WORKER_HASH_SIZE	= 1 << BUSY_WORKER_HASH_ORDER,
+	BUSY_WORKER_HASH_MASK	= BUSY_WORKER_HASH_SIZE - 1,
+
+	MAX_IDLE_WORKERS_RATIO	= 4,		/* 1/4 of busy can be idle */
+	IDLE_WORKER_TIMEOUT	= 180 * HZ,	/* keep idle ones for 3 mins */
+
+	MAYDAY_INTERVAL		= 2 * HZ,	/* call for help every 2 secs */
+	CREATE_COOLDOWN		= 5 * HZ,	/* time to breath after fail */
+
+	WORKER_NICE_LEVEL	= -5,		/* bump it up, I mean, down? */
+	EMERGENCY_NICE_LEVEL	= -20,		/* EMERGENCY! */
+};
+
+struct work_notifier {
+	struct list_head	entry;
+	struct completion	*notify;
+};

 /*
- * The per-CPU workqueue (if single thread, we always use the first
- * possible cpu).
+ * Structure fields follow one of the following exclusion rules.
+ *
+ * I: Set during initialization and read-only afterwards.
+ *
+ * P: Preemption protected.  Disabling preemption is enough and should
+ *    only be modified and accessed from the local cpu.
+ *
+ * L: gcwq->lock protected.  Access with gcw->lock held.
+ *
+ * M: Modification requires gcwq->lock and should be done only from
+ *    local cpu.  Disabling preemption is enough to read from local
+ *    cpu.
+ *
+ * D: Don't care.
  */
-struct cpu_workqueue_struct {

-	spinlock_t lock;
+struct global_cwq;
+
+/*
+ * The poor guys doing the actual heavy lifting.
+ */
+struct worker {
+	/* on idle list while idle, on busy hash table while busy */
+	union {
+		struct hlist_node	hentry;	/* L: while idle */
+		struct list_head	entry;	/* L: while busy */
+	};

-	struct list_head worklist;
-	wait_queue_head_t more_work;
-	struct work_struct *current_work;
+	struct work_struct	*current_work;	/* L: work being processed */
+	struct list_head	scheduled;	/* L: scheduled works */
+	struct task_struct	*task;		/* I: worker task */
+	struct global_cwq	*gcwq;		/* I: the associated gcwq */
+	unsigned int		state;		/* P: WORKER_STA_* flags */
+	unsigned int		req_flags;	/* L: requests from outside */
+	unsigned long		last_active;	/* L: last active timestamp */
+};

-	struct workqueue_struct *wq;
-	struct task_struct *thread;
-} ____cacheline_aligned;
+/*
+ * Global per-cpu workqueue.  There's one and only one for each cpu
+ * and all works are queued and processed here regardless of their
+ * target workqueues.
+ */
+struct global_cwq {
+	spinlock_t		lock;		/* the gcwq lock */
+	struct list_head	worklist;	/* L: list of pending works */
+	unsigned int		cpu;		/* I: the associated cpu */
+	unsigned int		flags;		/* L: GCWQ_* flags */
+
+	int			nr_workers;	/* L: total number of workers */
+	int			nr_idle;	/* L: currently idle ones */
+
+	/* track concurrency, used by scheduler callbacks */
+	int			nr_running;	/* P: currently running ones */
+
+	/* workers are chained either in the idle_list or busy_hash */
+	struct list_head	idle_list;	/* M: list of idle workers */
+	struct hlist_head	busy_hash[BUSY_WORKER_HASH_SIZE];
+						/* L: hash of busy workers */
+
+	struct timer_list	idle_timer;	/* L: worker idle timeout */
+	struct timer_list	mayday_timer;	/* L: SOS timer for dworkers */
+
+	struct task_struct	*trustee;	/* L: for gcwq shutdown */
+	int			trustee_state;	/* L: trustee state */
+	int			trustee_target;	/* L: trustee target state */
+	wait_queue_head_t	trustee_wait;	/* D: trustee wait */
+	struct work_struct	trustee_reap;	/* D: grim reaper for trustee */
+};
+
+/*
+ * The per-CPU workqueue.  The lower WORK_STRUCT_FLAG_BITS of
+ * work_struct->data are used for flags and thus cwqs need to be
+ * aligned at two's power of the bits.
+ */
+struct cpu_workqueue_struct {
+	struct global_cwq	*gcwq;		/* I: the associated gcwq */
+	int			nr_in_flight;	/* L: nr of in_flight works */
+	unsigned int		flush_color;	/* L: current flush color */
+	int			flush_cnt;	/* L: in-progress flush count */
+	struct workqueue_struct *wq;		/* I: the owning workqueue */
+} __attribute__((aligned(1 << WORK_STRUCT_FLAG_BITS)));

 /*
  * The externally visible workqueue abstraction is an array of
  * per-CPU workqueues:
  */
 struct workqueue_struct {
-	struct cpu_workqueue_struct *cpu_wq;
-	struct list_head list;
-	const char *name;
-	int singlethread;
-	int freezeable;		/* Freeze threads during suspend */
-	int rt;
+	unsigned int		flags;		/* I: WQ_* flags */
+	struct cpu_workqueue_struct *cpu_wq;	/* I: cwq's */
+
+	struct mutex		flush_mutex;	/* single flush at a time */
+	atomic_t		nr_cwqs_to_flush; /* flush in progress */
+	struct completion	*flush_done;	/* flush done */
+
+	cpumask_var_t		mayday_mask;	/* cpus requesting rescue */
+	struct worker		*emergency;	/* I: emergency worker */
+
+	const char		*name;		/* I: workqueue name */
 #ifdef CONFIG_LOCKDEP
-	struct lockdep_map lockdep_map;
+	struct lockdep_map	lockdep_map;
 #endif
 };

-/* Serializes the accesses to the list of workqueues. */
-static DEFINE_SPINLOCK(workqueue_lock);
-static LIST_HEAD(workqueues);
+/* the almighty global cpu workqueues */
+static DEFINE_PER_CPU(struct global_cwq, global_cwq);

-static int singlethread_cpu __read_mostly;
-static const struct cpumask *cpu_singlethread_map __read_mostly;
-/*
- * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
- * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
- * which comes in between can't use for_each_online_cpu(). We could
- * use cpu_possible_map, the cpumask below is more a documentation
- * than optimization.
- */
-static cpumask_var_t cpu_populated_map __read_mostly;
-
-/* If it's single threaded, it isn't in the list of workqueues. */
-static inline int is_wq_single_threaded(struct workqueue_struct *wq)
-{
-	return wq->singlethread;
-}
+static int worker_thread(void *__worker);

-static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
+static struct global_cwq *get_gcwq(unsigned int cpu)
 {
-	return is_wq_single_threaded(wq)
-		? cpu_singlethread_map : cpu_populated_map;
+	return &per_cpu(global_cwq, cpu);
 }

-static
-struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
+static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
+					    struct workqueue_struct *wq)
 {
-	if (unlikely(is_wq_single_threaded(wq)))
-		cpu = singlethread_cpu;
 	return per_cpu_ptr(wq->cpu_wq, cpu);
 }

@@ -108,46 +207,295 @@ struct cpu_workqueue_struct *wq_per_cpu(
  * - Must *only* be called if the pending flag is set
  */
 static inline void set_wq_data(struct work_struct *work,
-				struct cpu_workqueue_struct *cwq)
+			       struct cpu_workqueue_struct *cwq,
+			       unsigned long flags)
 {
-	unsigned long new;
-
 	BUG_ON(!work_pending(work));
+	BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);

-	new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
-	new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
-	atomic_long_set(&work->data, new);
+	atomic_long_set(&work->data, (unsigned long)cwq | flags);
 }

-static inline
-struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
+static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
 {
 	return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
 }

-static void insert_work(struct cpu_workqueue_struct *cwq,
-			struct work_struct *work, struct list_head *head)
+/*
+ * Policy functions.  The following functions defines the policies on
+ * how the global worker pool is managed.  Unless noted otherwise,
+ * these functions assume that they're being called with gcwq->lock
+ * held.
+ */
+
+/* Do we need a new worker?  Called from manager. */
+static bool need_new_worker(struct global_cwq *gcwq)
+{
+	return !list_empty(&gcwq->worklist) && !gcwq->nr_idle;
+}
+
+/* Do we have too many workers and some should go away? */
+static bool too_many_workers(struct global_cwq *gcwq)
+{
+	bool managing = gcwq->flags & GCWQ_MANAGING_WORKERS;
+	int nr_idle = gcwq->nr_idle + managing; /* manager is considered idle */
+	int nr_busy = gcwq->nr_workers - nr_idle;
+
+	return nr_idle > 1 && (nr_idle - 1) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
+}
+
+/* Do I need to be the manager?  Called from manager candidates. */
+static bool need_to_manage_workers(struct global_cwq *gcwq)
+{
+	return (need_new_worker(gcwq) || gcwq->flags & GCWQ_MANAGE_WORKERS) &&
+		!(gcwq->flags & GCWQ_MANAGING_WORKERS);
+}
+
+/*
+ * Does a worker need to keep working?  Called from workers, scheduler
+ * callbacks or someone queueing a work.  @max_running determines how
+ * many concurrent workers are allowed.
+ */
+static bool worker_keep_busy(struct global_cwq *gcwq, int max_running)
+{
+	/* keep busy if there's work and nothing else is running */
+	return !list_empty(&gcwq->worklist) && gcwq->nr_running <= max_running;
+}
+
+/*
+ * Wake up functions.
+ */
+
+/* Return the first worker.  Safe with preemption disabled */
+static struct worker *first_worker(struct global_cwq *gcwq)
+{
+	if (unlikely(list_empty(&gcwq->idle_list)))
+		return NULL;
+
+	return list_first_entry(&gcwq->idle_list, struct worker, entry);
+}
+
+/**
+ * wake_up_worker - wake up an idle worker
+ * @gcwq: gcwq to wake worker for
+ *
+ * Wake the first idle worker of @gcwq.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void wake_up_worker(struct global_cwq *gcwq)
+{
+	struct worker *worker = first_worker(gcwq);
+
+	if (likely(worker))
+		wake_up_process(worker->task);
+}
+
+/**
+ * sched_wake_up_worker - wake up an idle worker
+ * @gcwq: gcwq to wake worker for
+ *
+ * Wake the first idle worker of @gcwq.
+ *
+ * CONTEXT:
+ * Scheduler callback.  DO NOT call from anywhere else.
+ */
+static void sched_wake_up_worker(struct global_cwq *gcwq)
+{
+	struct worker *worker = first_worker(gcwq);
+
+	if (likely(worker))
+		sched_workqueue_wake_up_process(worker->task);
+}
+
+/*
+ * Scheduler callbacks.  These functions are called during schedule()
+ * with rq lock held.  Don't try to acquire any lock and only access
+ * fields which are safe with preemption disabled from local cpu.
+ */
+
+/* called when a worker task @task wakes up from sleep */
+void sched_workqueue_worker_wakeup(struct task_struct *task)
+{
+	struct worker *worker = kthread_data(task);
+	struct global_cwq *gcwq = worker->gcwq;
+
+	if (unlikely(worker->state & (WORKER_STA_IDLE | WORKER_STA_ROGUE)))
+		return;
+
+	if (likely(!(worker->state & WORKER_STA_RUNNING))) {
+		worker->state |= WORKER_STA_RUNNING;
+		gcwq->nr_running++;
+	}
+}
+
+/* called when a worker task @task goes into sleep */
+void sched_workqueue_worker_sleep(struct task_struct *task)
+{
+	struct worker *worker = kthread_data(task);
+	struct global_cwq *gcwq = worker->gcwq;
+
+	if (unlikely(worker->state & (WORKER_STA_IDLE | WORKER_STA_ROGUE)))
+		return;
+
+	if (likely(worker->state & WORKER_STA_RUNNING)) {
+		worker->state &= ~WORKER_STA_RUNNING;
+		gcwq->nr_running--;
+	}
+
+	if (worker_keep_busy(gcwq, 0))
+		sched_wake_up_worker(gcwq);
+}
+
+/* called when a worker task @task gets preempted */
+void sched_workqueue_worker_preempted(struct task_struct *task)
+{
+	struct worker *worker = kthread_data(task);
+	struct global_cwq *gcwq = worker->gcwq;
+
+	if (unlikely(worker->state & (WORKER_STA_IDLE | WORKER_STA_ROGUE)))
+		return;
+
+	/*
+	 * We're gonna be scheduled out but still accounted as
+	 * running.  Call worker_keep_busy() with @max_running of 1.
+	 * This will allow one extra worker to be scheduled on
+	 * preemption so that one cpu hog doesn't stall the whole
+	 * queue.  I'm not sure whether this is a worthy optimization
+	 * yet.  Maybe we're better off with just bumping up the
+	 * priority of workers.
+	 */
+	if (worker_keep_busy(gcwq, 1))
+		sched_wake_up_worker(gcwq);
+}
+
+/**
+ * busy_worker_head - return the busy hash head for a work
+ * @gcwq: gcwq of interest
+ * @work: work to be hashed
+ *
+ * Return hash head of @gcwq for @work.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ *
+ * RETURNS:
+ * Pointer to the hash head.
+ */
+static struct hlist_head *busy_worker_head(struct global_cwq *gcwq,
+					   struct work_struct *work)
+{
+	const int base_shift = ilog2(sizeof(struct work_struct));
+	unsigned long v = (unsigned long)work;
+
+	v >>= base_shift;
+	v += v >> BUSY_WORKER_HASH_ORDER;
+	v &= BUSY_WORKER_HASH_MASK;
+
+	return &gcwq->busy_hash[v];
+}
+
+/**
+ * __find_worker_executing_work - find worker which is executing a work
+ * @gcwq: gcwq of interest
+ * @bwh: hash head as returned by busy_worker_head()
+ * @work: work to find worker for
+ *
+ * Find a worker which is executing @work on @gcwq.  @bwh should be
+ * the hash head obtained by calling busy_worker_head() with the same
+ * work.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ *
+ * RETURNS:
+ * Pointer to worker which is executing @work if found, NULL
+ * otherwise.
+ */
+static struct worker *__find_worker_executing_work(struct global_cwq *gcwq,
+						   struct hlist_head *bwh,
+						   struct work_struct *work)
+{
+	struct worker *worker;
+	struct hlist_node *tmp;
+
+	hlist_for_each_entry(worker, tmp, bwh, hentry)
+		if (worker->current_work == work)
+			return worker;
+	return NULL;
+}
+
+/**
+ * find_worker_executing_work - find worker which is executing a work
+ * @gcwq: gcwq of interest
+ * @work: work to find worker for
+ *
+ * Find a worker which is executing @work on @gcwq.  This function is
+ * identical to __find_worker_executing_work() except that this
+ * function calculates @bwh itself.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ *
+ * RETURNS:
+ * Pointer to worker which is executing @work if found, NULL
+ * otherwise.
+ */
+static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
+						 struct work_struct *work)
 {
-	trace_workqueue_insertion(cwq->thread, work);
+	return __find_worker_executing_work(gcwq, busy_worker_head(gcwq, work),
+					    work);
+}
+
+/**
+ * insert_work - insert a work into gcwq
+ * @gcwq: target gcwq
+ * @cwq: cwq @work belongs to
+ * @work: work to insert
+ * @head: insertion point
+ * @extra_flags: extra WORK_STRUCT_* flags to set
+ *
+ * Insert @work which belongs to @cwq into @gcwq after @head.
+ * @extra_flags is ORd to WORK_STRUCT flags.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void insert_work(struct global_cwq *gcwq,
+			struct cpu_workqueue_struct *cwq,
+			struct work_struct *work, struct list_head *head,
+			unsigned int extra_flags)
+{
+	cwq->nr_in_flight++;
+
+	/* we own @work, set data and link */
+	set_wq_data(work, cwq,
+		    WORK_STRUCT_PENDING | cwq->flush_color | extra_flags);

-	set_wq_data(work, cwq);
 	/*
 	 * Ensure that we get the right work->data if we see the
 	 * result of list_add() below, see try_to_grab_pending().
 	 */
 	smp_wmb();
 	list_add_tail(&work->entry, head);
-	wake_up(&cwq->more_work);
+
+	if (worker_keep_busy(gcwq, 0))
+		wake_up_worker(gcwq);
 }

-static void __queue_work(struct cpu_workqueue_struct *cwq,
+static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
 			 struct work_struct *work)
 {
+	struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+	struct global_cwq *gcwq = cwq->gcwq;
 	unsigned long flags;

-	spin_lock_irqsave(&cwq->lock, flags);
-	insert_work(cwq, work, &cwq->worklist);
-	spin_unlock_irqrestore(&cwq->lock, flags);
+	spin_lock_irqsave(&gcwq->lock, flags);
+	BUG_ON(!list_empty(&work->entry));
+	insert_work(gcwq, cwq, work, &gcwq->worklist, 0);
+	spin_unlock_irqrestore(&gcwq->lock, flags);
 }

 /**
@@ -187,9 +535,8 @@ queue_work_on(int cpu, struct workqueue_
 {
 	int ret = 0;

-	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
-		BUG_ON(!list_empty(&work->entry));
-		__queue_work(wq_per_cpu(wq, cpu), work);
+	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+		__queue_work(cpu, wq, work);
 		ret = 1;
 	}
 	return ret;
@@ -200,9 +547,8 @@ static void delayed_work_timer_fn(unsign
 {
 	struct delayed_work *dwork = (struct delayed_work *)__data;
 	struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
-	struct workqueue_struct *wq = cwq->wq;

-	__queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);
+	__queue_work(smp_processor_id(), cwq->wq, &dwork->work);
 }

 /**
@@ -239,14 +585,15 @@ int queue_delayed_work_on(int cpu, struc
 	struct timer_list *timer = &dwork->timer;
 	struct work_struct *work = &dwork->work;

-	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
+	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
 		BUG_ON(timer_pending(timer));
 		BUG_ON(!list_empty(&work->entry));

 		timer_stats_timer_set_start_info(&dwork->timer);

 		/* This stores cwq for the moment, for the timer_fn */
-		set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
+		set_wq_data(work, get_cwq(raw_smp_processor_id(), wq),
+			    WORK_STRUCT_PENDING);
 		timer->expires = jiffies + delay;
 		timer->data = (unsigned long)dwork;
 		timer->function = delayed_work_timer_fn;
@@ -261,123 +608,560 @@ int queue_delayed_work_on(int cpu, struc
 }
 EXPORT_SYMBOL_GPL(queue_delayed_work_on);

-static void run_workqueue(struct cpu_workqueue_struct *cwq)
+/**
+ * worker_enter_idle - enter idle state
+ * @gcwq: gcwq worker belongs to
+ * @worker: worker which is entering idle state
+ *
+ * @worker is entering idle state.  Update stats and idle timer if
+ * necessary.
+ *
+ * LOCKING:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void worker_enter_idle(struct global_cwq *gcwq, struct worker *worker)
 {
-	spin_lock_irq(&cwq->lock);
-	while (!list_empty(&cwq->worklist)) {
-		struct work_struct *work = list_entry(cwq->worklist.next,
-						struct work_struct, entry);
-		work_func_t f = work->func;
-#ifdef CONFIG_LOCKDEP
-		/*
-		 * It is permissible to free the struct work_struct
-		 * from inside the function that is called from it,
-		 * this we need to take into account for lockdep too.
-		 * To avoid bogus "held lock freed" warnings as well
-		 * as problems when looking into work->lockdep_map,
-		 * make a copy and use that here.
-		 */
-		struct lockdep_map lockdep_map = work->lockdep_map;
-#endif
-		trace_workqueue_execution(cwq->thread, work);
-		cwq->current_work = work;
-		list_del_init(cwq->worklist.next);
-		spin_unlock_irq(&cwq->lock);
+	BUG_ON(worker->state & WORKER_STA_IDLE);
+	BUG_ON(!list_empty(&worker->entry));
+
+	if (worker->state & WORKER_STA_RUNNING) {
+		worker->state &= ~WORKER_STA_RUNNING;
+		gcwq->nr_running--;
+	}
+
+	worker->state |= WORKER_STA_IDLE;
+	gcwq->nr_idle++;
+	worker->last_active = jiffies;
+
+	/* idle_list is LIFO */
+	list_add(&worker->entry, &gcwq->idle_list);
+
+	if (likely(!(worker->state & WORKER_STA_ROGUE))) {
+		if (too_many_workers(gcwq) && !timer_pending(&gcwq->idle_timer))
+			mod_timer(&gcwq->idle_timer,
+				  jiffies + IDLE_WORKER_TIMEOUT);
+	} else
+		wake_up_all(&gcwq->trustee_wait);
+}
+
+/**
+ * worker_leave_idle - leave idle state
+ * @gcwq: gcwq worker belongs to
+ * @worker: worker which is leaving idle state
+ *
+ * @worker is leaving idle state.  Update stats.
+ *
+ * LOCKING:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void worker_leave_idle(struct global_cwq *gcwq, struct worker *worker)
+{
+	BUG_ON(!(worker->state & WORKER_STA_IDLE));
+	worker->state &= ~WORKER_STA_IDLE;
+	gcwq->nr_idle--;
+
+	if (likely(!(worker->state & WORKER_STA_ROGUE))) {
+		worker->state |= WORKER_STA_RUNNING;
+		gcwq->nr_running++;
+	}
+
+	list_del_init(&worker->entry);
+}
+
+static struct worker *alloc_worker(void)
+{
+	struct worker *worker;
+
+	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
+	if (!worker)
+		return NULL;
+
+	INIT_LIST_HEAD(&worker->entry);
+	INIT_LIST_HEAD(&worker->scheduled);
+	/* on creation a worker is not idle */
+	return worker;
+}
+
+/**
+ * create_worker - create a new workqueue worker
+ * @gcwq: gcwq the new worker will belong to
+ * @bind: whether to set affinity to @cpu or not
+ *
+ * Create a new worker which is bound to @gcwq.  Please note that this
+ * function doesn't adjust any stats.  Attaching it to its gcwq is the
+ * caller's responsibility.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which is released and regrabbed.  Does
+ * GFP_KERNEL allocations.
+ *
+ * RETURNS:
+ * Pointer to the newly created worker
+ */
+static struct worker *create_worker(struct global_cwq *gcwq, bool bind)
+{
+	struct worker *worker;
+
+	spin_unlock_irq(&gcwq->lock);
+
+	worker = alloc_worker();
+	if (!worker)
+		return NULL;
+
+	worker->gcwq = gcwq;
+
+	worker->task = kthread_create(worker_thread, worker, "kworker/%u",
+				      gcwq->cpu);
+	if (IS_ERR(worker->task)) {
+		kfree(worker);
+		return NULL;
+	}
+
+	if (bind)
+		kthread_bind(worker->task, gcwq->cpu);
+
+	spin_lock_irq(&gcwq->lock);
+	gcwq->nr_workers++;
+	worker_enter_idle(gcwq, worker);
+
+	return worker;
+}
+
+static bool send_mayday(struct work_struct *work)
+{
+	struct cpu_workqueue_struct *cwq = get_wq_data(work);
+	struct workqueue_struct *wq = cwq->wq;
+
+	if (!(wq->flags & WQ_EMERGENCY_WORKER))
+		return false;
+
+	/* mayday mayday mayday */
+	if (!cpumask_test_and_set_cpu(cwq->gcwq->cpu, wq->mayday_mask))
+		wake_up_process(wq->emergency->task);
+	return true;
+}

-		BUG_ON(get_wq_data(work) != cwq);
-		work_clear_pending(work);
-		lock_map_acquire(&cwq->wq->lockdep_map);
-		lock_map_acquire(&lockdep_map);
-		f(work);
-		lock_map_release(&lockdep_map);
-		lock_map_release(&cwq->wq->lockdep_map);
+/**
+ * destroy_worker - destroy a workqueue worker
+ * @worker: worker to be destroyed
+ *
+ * Destroy @worker and adjust @gcwq stats accordingly.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which is released and regrabbed.
+ */
+static void destroy_worker(struct worker *worker)
+{
+	struct global_cwq *gcwq = worker->gcwq;
+
+	/* sanity check frenzy */
+	BUG_ON(worker->current_work);
+	BUG_ON(!list_empty(&worker->scheduled));
+	BUG_ON(!(worker->state & WORKER_STA_IDLE));
+	BUG_ON(worker->state & WORKER_STA_RUNNING);
+	BUG_ON(worker->req_flags);
+
+	gcwq->nr_workers--;
+	gcwq->nr_idle--;
+	list_del_init(&worker->entry);
+	worker->req_flags |= WORKER_REQ_DIE;
+
+	spin_unlock_irq(&gcwq->lock);
+
+	kthread_stop(worker->task);
+	kfree(worker);
+
+	spin_lock_irq(&gcwq->lock);
+}
+
+static void idle_worker_timeout(unsigned long __gcwq)
+{
+	struct global_cwq *gcwq = (void *)__gcwq;
+
+	spin_lock_irq(&gcwq->lock);
+
+	if (too_many_workers(gcwq)) {
+		struct worker *worker;
+		unsigned long expires;

-		if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
-			printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
-					"%s/0x%08x/%d\n",
-					current->comm, preempt_count(),
-				       	task_pid_nr(current));
-			printk(KERN_ERR "    last function: ");
-			print_symbol("%s\n", (unsigned long)f);
-			debug_show_held_locks(current);
-			dump_stack();
+		/* idle_list is kept in LIFO order, check the last one */
+		worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
+		expires = worker->last_active + IDLE_WORKER_TIMEOUT;
+
+		if (time_before(jiffies, expires))
+			mod_timer(&gcwq->idle_timer, expires);
+		else {
+			/* it's been idle for too long, wake up manager */
+			gcwq->flags |= GCWQ_MANAGE_WORKERS;
+			wake_up_worker(gcwq);
 		}
+	}
+
+	spin_unlock_irq(&gcwq->lock);
+}

-		spin_lock_irq(&cwq->lock);
-		cwq->current_work = NULL;
+static void gcwq_mayday_timeout(unsigned long __gcwq)
+{
+	struct global_cwq *gcwq = (void *)__gcwq;
+	struct work_struct *work;
+
+	spin_lock_irq(&gcwq->lock);
+
+	if (need_new_worker(gcwq)) {
+		/*
+		 * We've been trying to create a new worker but
+		 * haven't been successful for more than
+		 * MAYDAY_INTERVAL.  We might be hitting an allocation
+		 * deadlock.  Send distress calls to emergency
+		 * workers.
+		 */
+		list_for_each_entry(work, &gcwq->worklist, entry)
+			send_mayday(work);
 	}
-	spin_unlock_irq(&cwq->lock);
+
+	spin_unlock_irq(&gcwq->lock);
+
+	mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INTERVAL);
 }

-static int worker_thread(void *__cwq)
+/**
+ * maybe_create_worker - create a new worker if necessary
+ * @gcwq: gcwq to create a new worker for
+ *
+ * Create a new worker for @gcwq if necessary.  @gcwq is guaranteed to
+ * have at least one idle worker on return from this function.  If
+ * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
+ * sent to all emergency workers with works scheduled on @gcwq to
+ * resolve possible allocation deadlock.
+ *
+ * LOCKING:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times.
+ */
+static void maybe_create_worker(struct global_cwq *gcwq)
 {
-	struct cpu_workqueue_struct *cwq = __cwq;
-	DEFINE_WAIT(wait);
+	if (!need_new_worker(gcwq))
+		return;

-	if (cwq->wq->freezeable)
-		set_freezable();
+	/* if we don't make any progress in MAYDAY_INTERVAL, call for help */
+	mod_timer(&gcwq->mayday_timer, jiffies + MAYDAY_INTERVAL);

-	set_user_nice(current, -5);
+	do {
+		if (gcwq->nr_workers >= MAX_WORKERS_PER_CPU) {
+			if (printk_ratelimit())
+				printk(KERN_WARNING "workqueue: too many "
+				       "workers (%d) on cpu %d, can't create "
+				       "new ones\n",
+				       gcwq->nr_workers, gcwq->cpu);
+			goto cooldown;
+		}
+
+		if (create_worker(gcwq, true)) {
+			BUG_ON(need_new_worker(gcwq));
+			break;
+		}

-	for (;;) {
-		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
-		if (!freezing(current) &&
-		    !kthread_should_stop() &&
-		    list_empty(&cwq->worklist))
-			schedule();
-		finish_wait(&cwq->more_work, &wait);
+		if (!need_new_worker(gcwq))
+			break;
+	cooldown:
+		spin_unlock_irq(&gcwq->lock);
+		schedule_timeout(CREATE_COOLDOWN);
+		spin_lock_irq(&gcwq->lock);
+	} while (need_new_worker(gcwq));

-		try_to_freeze();
+	del_timer_sync(&gcwq->mayday_timer);
+}

-		if (kthread_should_stop())
+/**
+ * maybe_destroy_worker - destroy workers which have been idle for a while
+ * @gcwq: gcwq to destroy workers for
+ *
+ * Destroy @gcwq workers which have been idle for longer than
+ * IDLE_WORKER_TIMEOUT.
+ *
+ * LOCKING:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times.
+ */
+static void maybe_destroy_workers(struct global_cwq *gcwq)
+{
+	while (too_many_workers(gcwq)) {
+		struct worker *worker;
+		unsigned long expires;
+
+		worker = list_entry(gcwq->idle_list.prev, struct worker, entry);
+		expires = worker->last_active + IDLE_WORKER_TIMEOUT;
+
+		if (time_before(jiffies, expires)) {
+			mod_timer(&gcwq->idle_timer, expires);
 			break;
+		}

-		run_workqueue(cwq);
+		destroy_worker(worker);
 	}
-
-	return 0;
 }

-struct wq_barrier {
-	struct work_struct	work;
-	struct completion	done;
-};
+static void manage_workers(struct global_cwq *gcwq)
+{
+	BUG_ON(gcwq->flags & GCWQ_MANAGING_WORKERS);

-static void wq_barrier_func(struct work_struct *work)
+	gcwq->flags &= ~GCWQ_MANAGE_WORKERS;
+	gcwq->flags |= GCWQ_MANAGING_WORKERS;
+
+	/*
+	 * Destroy and then create so that one idle worker is
+	 * guaranteed on return.
+	 */
+	maybe_destroy_workers(gcwq);
+	maybe_create_worker(gcwq);
+
+	gcwq->flags &= ~GCWQ_MANAGING_WORKERS;
+
+	if (unlikely(gcwq->trustee))
+		wake_up_all(&gcwq->trustee_wait);
+}
+
+static void schedule_work_to_worker(struct global_cwq *gcwq,
+				    struct worker *worker,
+				    struct work_struct *work,
+				    struct work_struct **nextp)
 {
-	struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
-	complete(&barr->done);
+	struct work_struct *n;
+
+ 	list_for_each_entry_safe_continue(work, n, &gcwq->worklist, entry) {
+		list_move_tail(&work->entry, &worker->scheduled);
+		if (!(*work_data_bits(work) & WORK_STRUCT_LINKED)) {
+			work = n;
+			break;
+		}
+	}
+
+	/*
+	 * If we're already inside safe list traversal and have moved
+	 * multiple works to the scheduled queue, the next position
+	 * needs to be updated.
+	 */
+	if (nextp)
+		*nextp = work;
 }

-static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
-			struct wq_barrier *barr, struct list_head *head)
+static void process_one_work(struct worker *worker, struct work_struct *work)
 {
-	INIT_WORK(&barr->work, wq_barrier_func);
-	__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));
+	struct cpu_workqueue_struct *cwq = get_wq_data(work);
+	struct global_cwq *gcwq = cwq->gcwq;
+	struct hlist_head *bwh = busy_worker_head(gcwq, work);
+	work_func_t f = work->func;
+	unsigned int work_color;
+	struct worker *collision;
+#ifdef CONFIG_LOCKDEP
+	/*
+	 * It is permissible to free the struct work_struct
+	 * from inside the function that is called from it,
+	 * this we need to take into account for lockdep too.
+	 * To avoid bogus "held lock freed" warnings as well
+	 * as problems when looking into work->lockdep_map,
+	 * make a copy and use that here.
+	 */
+	struct lockdep_map lockdep_map = work->lockdep_map;
+#endif
+	/*
+	 * A single work shouldn't be executed concurrently by
+	 * multiple workers on a single cpu.  Check whether anyone is
+	 * already processing the work.  If so, defer the work to the
+	 * currently executing one.
+	 */
+	collision = __find_worker_executing_work(gcwq, bwh, work);
+	if (unlikely(collision)) {
+		schedule_work_to_worker(gcwq, collision, work, NULL);
+		return;
+	}

-	init_completion(&barr->done);
+	/* claim and process */
+	hlist_add_head(&worker->hentry, bwh);
+	worker->current_work = work;
+	work_color = *work_data_bits(work) & WORK_STRUCT_COLOR;
+	list_del_init(&work->entry);
+
+	spin_unlock_irq(&gcwq->lock);

-	insert_work(cwq, &barr->work, head);
+	work_clear_pending(work);
+	lock_map_acquire(&cwq->wq->lockdep_map);
+	lock_map_acquire(&lockdep_map);
+	f(work);
+	lock_map_release(&lockdep_map);
+	lock_map_release(&cwq->wq->lockdep_map);
+
+	if (unlikely(in_atomic()) || lockdep_depth(current) > 0) {
+		printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
+		       "%s/0x%08x/%d\n",
+		       current->comm, preempt_count(),
+		       task_pid_nr(current));
+		printk(KERN_ERR "    last function: ");
+		print_symbol("%s\n", (unsigned long)f);
+		debug_show_held_locks(current);
+		dump_stack();
+	}
+
+	spin_lock_irq(&gcwq->lock);
+
+	/* we're done with it, release */
+	hlist_del_init(&worker->hentry);
+	worker->current_work = NULL;
+	cwq->nr_in_flight--;
+
+	if (unlikely(cwq->flush_cnt)) {
+		if (work_color ^ cwq->flush_color && !--cwq->flush_cnt &&
+		    atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
+			complete(cwq->wq->flush_done);
+	}
 }

-static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
+static void process_scheduled_works(struct global_cwq *gcwq,
+				    struct worker *worker)
 {
-	int active = 0;
-	struct wq_barrier barr;
+	while (!list_empty(&worker->scheduled)) {
+		struct work_struct *work = list_first_entry(&worker->scheduled,
+						struct work_struct, entry);
+		process_one_work(worker, work);
+	}
+}
+
+/**
+ * worker_thread - the worker thread function
+ * @__worker: self
+ *
+ * The gcwq worker thread function.  There's a single dynamic pool of
+ * these per each cpu.  These workers process all works regardless of
+ * their specific target workqueue.  The only exception is works which
+ * are issued to workqueues with an attached emergency worker which
+ * will be explained in emergency_thread().
+ */
+static int worker_thread(void *__worker)
+{
+	struct worker *worker = worker;
+	struct global_cwq *gcwq = worker->gcwq;
+	struct sched_param sched_param = { .sched_priority = 0 };
+
+	/* set workqueue scheduler */
+	worker->task->flags |= PF_WORKQUEUE;
+	sched_setscheduler_nocheck(worker->task, SCHED_NORMAL, &sched_param);
+
+	set_user_nice(current, WORKER_NICE_LEVEL);
+woke_up:
+	spin_lock_irq(&gcwq->lock);
+
+	/* DIE can be set only while we're idle, checking here is enough */
+	if (worker->req_flags & WORKER_REQ_DIE) {
+		spin_unlock_irq(&gcwq->lock);
+		return 0;
+	}
+
+	worker_leave_idle(gcwq, worker);
+repeat:
+	/*
+	 * We just left idle.  The first thing to do is making sure
+	 * the worker pool has at least one idle worker.  Play the
+	 * manager if necessary.
+	 */
+	if (unlikely(need_to_manage_workers(gcwq)))
+		manage_workers(gcwq);
+
+	/*
+	 * When control reaches this point, we're guaranteed to have
+	 * at least one idle worker or that someone else has already
+	 * assumed the manager role.
+	 */
+	while (worker_keep_busy(gcwq, 1)) {
+		struct work_struct *work = list_first_entry(&gcwq->worklist,
+						struct work_struct, entry);

-	WARN_ON(cwq->thread == current);
+		if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
+			/* optimization path, not strictly necessary */
+			BUG_ON(!list_empty(&worker->scheduled));
+			process_one_work(worker, work);
+		} else
+			schedule_work_to_worker(gcwq, worker, work, NULL);
+
+		if (unlikely(!list_empty(&worker->scheduled)))
+			process_scheduled_works(gcwq, worker);

-	spin_lock_irq(&cwq->lock);
-	if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
-		insert_wq_barrier(cwq, &barr, &cwq->worklist);
-		active = 1;
 	}
-	spin_unlock_irq(&cwq->lock);

-	if (active)
-		wait_for_completion(&barr.done);
+	/* this might have changed while we were running works */
+	if (unlikely(need_to_manage_workers(gcwq)))
+		goto repeat;
+
+	/*
+	 * gcwq->lock is held and there's no work to process and no
+	 * need to manage, sleep.  Workers are woken up only while
+	 * holding gcwq->lock or from local cpu, so setting the
+	 * current state before releasing gcwq->lock is enough to
+	 * prevent losing any event.
+	 */
+	worker_enter_idle(gcwq, worker);
+	__set_current_state(TASK_INTERRUPTIBLE);
+	spin_unlock_irq(&gcwq->lock);
+	schedule();
+	goto woke_up;
+}
+
+/**
+ * emergency_thread - the emergency worker thread function
+ * @__wq: the associated workqueue
+ *
+ * Workqueue emergency worker thread function.  There's one emergency
+ * thread for each workqueue which has WQ_EMERGENCY_WORKER set.
+ *
+ * Regular work processing on a gcwq may block trying to create a new
+ * worker which depends on GFP_KERNEL allocation which has slight
+ * chance of developing into deadlock if some works currently on the
+ * same queue need to be processed to finish the GFP_KERNEL
+ * allocation.  This is the problem emergency worker solves.
+ *
+ * When such condition is possible, the gcwq summons emergency workers
+ * of all workqueues which have works queued on the gcwq and let them
+ * process those works so that allocation can succeed and forward
+ * progress can be guaranteed.
+ *
+ * This should happen *VERY* rarely.
+ */
+static int emergency_thread(void *__wq)
+{
+	struct workqueue_struct *wq = __wq;
+	struct worker *worker = wq->emergency;
+	unsigned int cpu;

-	return active;
+	set_user_nice(current, EMERGENCY_NICE_LEVEL);
+repeat:
+	set_current_state(TASK_INTERRUPTIBLE);
+
+	if (kthread_should_stop())
+		return 0;
+
+	for_each_cpu(cpu, wq->mayday_mask) {
+		struct global_cwq *gcwq = get_gcwq(cpu);
+		struct work_struct *work, *n;
+
+		__set_current_state(TASK_RUNNING);
+		cpumask_clear_cpu(cpu, wq->mayday_mask);
+
+		spin_lock_irq(&gcwq->lock);
+
+		/* don't matter for emergency workers but set them anyway */
+		worker->state = WORKER_STA_RUNNING;
+
+		/* slurp in all works issued via this workqueue */
+		list_for_each_entry_safe(work, n, &gcwq->worklist, entry)
+			schedule_work_to_worker(gcwq, worker, work, &n);
+
+		process_scheduled_works(gcwq, worker);
+
+		worker->state = WORKER_STA_IDLE;
+		spin_unlock_irq(&gcwq->lock);
+	}
+
+	schedule();
+	goto repeat;
 }

 /**
@@ -395,17 +1179,98 @@ static int flush_cpu_workqueue(struct cp
  */
 void flush_workqueue(struct workqueue_struct *wq)
 {
-	const struct cpumask *cpu_map = wq_cpu_map(wq);
-	int cpu;
+	DECLARE_COMPLETION_ONSTACK(flush_done);
+	bool wait = false;
+	unsigned int cpu;

-	might_sleep();
 	lock_map_acquire(&wq->lockdep_map);
 	lock_map_release(&wq->lockdep_map);
-	for_each_cpu(cpu, cpu_map)
-		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
+
+	/* only single flush can be in progress at any given time */
+	mutex_lock(&wq->flush_mutex);
+
+	BUG_ON(atomic_read(&wq->nr_cwqs_to_flush) || wq->flush_done);
+
+	wq->flush_done = &flush_done;
+
+	for_each_possible_cpu(cpu) {
+		struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+		struct global_cwq *gcwq = cwq->gcwq;
+
+		spin_lock_irq(&gcwq->lock);
+
+		BUG_ON(cwq->flush_cnt);
+
+		cwq->flush_color ^= WORK_STRUCT_COLOR;
+		cwq->flush_cnt = cwq->nr_in_flight;
+
+		if (cwq->flush_cnt) {
+			atomic_inc(&wq->nr_cwqs_to_flush);
+			wait = true;
+		}
+
+		spin_unlock_irq(&gcwq->lock);
+	}
+
+	if (wait)
+		wait_for_completion(&flush_done);
+
+	wq->flush_done = NULL;
+
+	mutex_unlock(&wq->flush_mutex);
 }
 EXPORT_SYMBOL_GPL(flush_workqueue);

+struct wq_barrier {
+	struct work_struct	work;
+	struct completion	done;
+};
+
+static void wq_barrier_func(struct work_struct *work)
+{
+	struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
+	complete(&barr->done);
+}
+
+/**
+ * insert_wq_barrier - insert a barrier work
+ * @barr: wq_barrier to insert
+ * @target: target work to attach @barr to
+ * @worker: worker currently executing @target, NULL if @target is not executing
+ *
+ * @barr is linked to @target such that @barr is completed only after
+ * @target finishes execution.  Please note that the ordering
+ * guarantee is observed only with respect to @target and on the local
+ * cpu.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void insert_wq_barrier(struct wq_barrier *barr,
+			      struct work_struct *target, struct worker *worker)
+{
+	struct cpu_workqueue_struct *cwq = get_wq_data(target);
+	struct list_head *head;
+	unsigned int linked = 0;
+
+	INIT_WORK(&barr->work, wq_barrier_func);
+	init_completion(&barr->done);
+
+	/*
+	 * If @target is currently being executed, schedule the
+	 * barrier to the worker; otherwise, put it after @target.
+	 */
+	if (worker)
+		head = &worker->scheduled;
+	else {
+		head = target->entry.next;
+		/* there can already be other linked works, inherit the flag */
+		linked = *work_data_bits(target) & WORK_STRUCT_LINKED;
+	}
+
+	insert_work(cwq->gcwq, cwq, &barr->work, head, linked);
+}
+
 /**
  * flush_work - block until a work_struct's callback has terminated
  * @work: the work which is to be flushed
@@ -418,20 +1283,21 @@ EXPORT_SYMBOL_GPL(flush_workqueue);
  */
 int flush_work(struct work_struct *work)
 {
+	struct worker *worker = NULL;
 	struct cpu_workqueue_struct *cwq;
-	struct list_head *prev;
+	struct global_cwq *gcwq;
 	struct wq_barrier barr;

 	might_sleep();
 	cwq = get_wq_data(work);
 	if (!cwq)
 		return 0;
+	gcwq = cwq->gcwq;

 	lock_map_acquire(&cwq->wq->lockdep_map);
 	lock_map_release(&cwq->wq->lockdep_map);

-	prev = NULL;
-	spin_lock_irq(&cwq->lock);
+	spin_lock_irq(&gcwq->lock);
 	if (!list_empty(&work->entry)) {
 		/*
 		 * See the comment near try_to_grab_pending()->smp_rmb().
@@ -439,21 +1305,20 @@ int flush_work(struct work_struct *work)
 		 */
 		smp_rmb();
 		if (unlikely(cwq != get_wq_data(work)))
-			goto out;
-		prev = &work->entry;
+			goto already_gone;
 	} else {
-		if (cwq->current_work != work)
-			goto out;
-		prev = &cwq->worklist;
-	}
-	insert_wq_barrier(cwq, &barr, prev->next);
-out:
-	spin_unlock_irq(&cwq->lock);
-	if (!prev)
-		return 0;
+		worker = find_worker_executing_work(gcwq, work);
+		if (!worker)
+			goto already_gone;
+	}

+	insert_wq_barrier(&barr, work, worker);
+	spin_unlock_irq(&gcwq->lock);
 	wait_for_completion(&barr.done);
 	return 1;
+already_gone:
+	spin_unlock_irq(&gcwq->lock);
+	return 0;
 }
 EXPORT_SYMBOL_GPL(flush_work);

@@ -463,10 +1328,11 @@ EXPORT_SYMBOL_GPL(flush_work);
  */
 static int try_to_grab_pending(struct work_struct *work)
 {
+	struct global_cwq *gcwq;
 	struct cpu_workqueue_struct *cwq;
 	int ret = -1;

-	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
+	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
 		return 0;

 	/*
@@ -477,8 +1343,9 @@ static int try_to_grab_pending(struct wo
 	cwq = get_wq_data(work);
 	if (!cwq)
 		return ret;
+	gcwq = cwq->gcwq;

-	spin_lock_irq(&cwq->lock);
+	spin_lock_irq(&gcwq->lock);
 	if (!list_empty(&work->entry)) {
 		/*
 		 * This work is queued, but perhaps we locked the wrong cwq.
@@ -491,7 +1358,7 @@ static int try_to_grab_pending(struct wo
 			ret = 1;
 		}
 	}
-	spin_unlock_irq(&cwq->lock);
+	spin_unlock_irq(&gcwq->lock);

 	return ret;
 }
@@ -499,17 +1366,19 @@ static int try_to_grab_pending(struct wo
 static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
 				struct work_struct *work)
 {
+	struct global_cwq *gcwq = cwq->gcwq;
 	struct wq_barrier barr;
-	int running = 0;
+	struct worker *worker;

-	spin_lock_irq(&cwq->lock);
-	if (unlikely(cwq->current_work == work)) {
-		insert_wq_barrier(cwq, &barr, cwq->worklist.next);
-		running = 1;
-	}
-	spin_unlock_irq(&cwq->lock);
+	spin_lock_irq(&gcwq->lock);
+
+	worker = find_worker_executing_work(gcwq, work);
+	if (unlikely(worker))
+		insert_wq_barrier(&barr, work, worker);

-	if (unlikely(running))
+	spin_unlock_irq(&gcwq->lock);
+
+	if (unlikely(worker))
 		wait_for_completion(&barr.done);
 }

@@ -517,7 +1386,6 @@ static void wait_on_work(struct work_str
 {
 	struct cpu_workqueue_struct *cwq;
 	struct workqueue_struct *wq;
-	const struct cpumask *cpu_map;
 	int cpu;

 	might_sleep();
@@ -530,10 +1398,9 @@ static void wait_on_work(struct work_str
 		return;

 	wq = cwq->wq;
-	cpu_map = wq_cpu_map(wq);

-	for_each_cpu(cpu, cpu_map)
-		wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
+	for_each_possible_cpu(cpu)
+		wait_on_cpu_work(get_cwq(cpu, wq), work);
 }

 static int __cancel_work_timer(struct work_struct *work,
@@ -723,165 +1590,66 @@ int keventd_up(void)

 int current_is_keventd(void)
 {
-	struct cpu_workqueue_struct *cwq;
-	int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
-	int ret = 0;
-
-	BUG_ON(!keventd_wq);
-
-	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
-	if (current == cwq->thread)
-		ret = 1;
-
-	return ret;
-
-}
-
-static struct cpu_workqueue_struct *
-init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
-{
-	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-
-	cwq->wq = wq;
-	spin_lock_init(&cwq->lock);
-	INIT_LIST_HEAD(&cwq->worklist);
-	init_waitqueue_head(&cwq->more_work);
-
-	return cwq;
-}
-
-static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
-{
-	struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
-	struct workqueue_struct *wq = cwq->wq;
-	const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
-	struct task_struct *p;
-
-	p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
-	/*
-	 * Nobody can add the work_struct to this cwq,
-	 *	if (caller is __create_workqueue)
-	 *		nobody should see this wq
-	 *	else // caller is CPU_UP_PREPARE
-	 *		cpu is not on cpu_online_map
-	 * so we can abort safely.
-	 */
-	if (IS_ERR(p))
-		return PTR_ERR(p);
-	if (cwq->wq->rt)
-		sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
-	cwq->thread = p;
-
-	trace_workqueue_creation(cwq->thread, cpu);
-
-	return 0;
-}
-
-static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
-{
-	struct task_struct *p = cwq->thread;
-
-	if (p != NULL) {
-		if (cpu >= 0)
-			kthread_bind(p, cpu);
-		wake_up_process(p);
-	}
+	return (bool)(current->flags & PF_WORKQUEUE);
 }

 struct workqueue_struct *__create_workqueue_key(const char *name,
-						int singlethread,
-						int freezeable,
-						int rt,
+						unsigned int flags,
 						struct lock_class_key *key,
 						const char *lock_name)
 {
-	struct workqueue_struct *wq;
-	struct cpu_workqueue_struct *cwq;
-	int err = 0, cpu;
+	struct workqueue_struct *wq = NULL;
+	struct cpu_workqueue_struct *cwq = NULL;
+	struct worker *emergency = NULL;
+	unsigned int cpu;

 	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
 	if (!wq)
-		return NULL;
+		goto err;

 	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
-	if (!wq->cpu_wq) {
-		kfree(wq);
-		return NULL;
-	}
+	if (!wq->cpu_wq)
+		goto err;

+	wq->flags = flags;
+	mutex_init(&wq->flush_mutex);
+	atomic_set(&wq->nr_cwqs_to_flush, 0);
 	wq->name = name;
 	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
-	wq->singlethread = singlethread;
-	wq->freezeable = freezeable;
-	wq->rt = rt;
-	INIT_LIST_HEAD(&wq->list);
-
-	if (singlethread) {
-		cwq = init_cpu_workqueue(wq, singlethread_cpu);
-		err = create_workqueue_thread(cwq, singlethread_cpu);
-		start_workqueue_thread(cwq, -1);
-	} else {
-		cpu_maps_update_begin();
-		/*
-		 * We must place this wq on list even if the code below fails.
-		 * cpu_down(cpu) can remove cpu from cpu_populated_map before
-		 * destroy_workqueue() takes the lock, in that case we leak
-		 * cwq[cpu]->thread.
-		 */
-		spin_lock(&workqueue_lock);
-		list_add(&wq->list, &workqueues);
-		spin_unlock(&workqueue_lock);
-		/*
-		 * We must initialize cwqs for each possible cpu even if we
-		 * are going to call destroy_workqueue() finally. Otherwise
-		 * cpu_up() can hit the uninitialized cwq once we drop the
-		 * lock.
-		 */
-		for_each_possible_cpu(cpu) {
-			cwq = init_cpu_workqueue(wq, cpu);
-			if (err || !cpu_online(cpu))
-				continue;
-			err = create_workqueue_thread(cwq, cpu);
-			start_workqueue_thread(cwq, cpu);
-		}
-		cpu_maps_update_done();
-	}

-	if (err) {
-		destroy_workqueue(wq);
-		wq = NULL;
+	for_each_possible_cpu(cpu) {
+		struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+		cwq->gcwq = get_gcwq(cpu);
+		cwq->wq = wq;
 	}
-	return wq;
-}
-EXPORT_SYMBOL_GPL(__create_workqueue_key);

-static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
-{
-	/*
-	 * Our caller is either destroy_workqueue() or CPU_POST_DEAD,
-	 * cpu_add_remove_lock protects cwq->thread.
-	 */
-	if (cwq->thread == NULL)
-		return;
+	if (flags & WQ_EMERGENCY_WORKER) {
+		if (!alloc_cpumask_var(&wq->mayday_mask, GFP_KERNEL))
+			goto err;
+
+		emergency = alloc_worker();
+		if (!emergency)
+			goto err;
+
+		emergency->task = kthread_create(emergency_thread, wq,
+						 "%s", name);
+		if (IS_ERR(emergency->task))
+			goto err;

-	lock_map_acquire(&cwq->wq->lockdep_map);
-	lock_map_release(&cwq->wq->lockdep_map);
+		wq->emergency = emergency;
+	}

-	flush_cpu_workqueue(cwq);
-	/*
-	 * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
-	 * a concurrent flush_workqueue() can insert a barrier after us.
-	 * However, in that case run_workqueue() won't return and check
-	 * kthread_should_stop() until it flushes all work_struct's.
-	 * When ->worklist becomes empty it is safe to exit because no
-	 * more work_structs can be queued on this cwq: flush_workqueue
-	 * checks list_empty(), and a "normal" queue_work() can't use
-	 * a dead CPU.
-	 */
-	trace_workqueue_destruction(cwq->thread);
-	kthread_stop(cwq->thread);
-	cwq->thread = NULL;
+	return wq;
+err:
+	if (wq)
+		free_cpumask_var(wq->mayday_mask);
+	kfree(wq);
+	kfree(cwq);
+	kfree(emergency);
+	return NULL;
 }
+EXPORT_SYMBOL_GPL(__create_workqueue_key);

 /**
  * destroy_workqueue - safely terminate a workqueue
@@ -891,70 +1659,273 @@ static void cleanup_workqueue_thread(str
  */
 void destroy_workqueue(struct workqueue_struct *wq)
 {
-	const struct cpumask *cpu_map = wq_cpu_map(wq);
-	int cpu;
+	unsigned int cpu;
+
+	flush_workqueue(wq);

-	cpu_maps_update_begin();
-	spin_lock(&workqueue_lock);
-	list_del(&wq->list);
-	spin_unlock(&workqueue_lock);
-
-	for_each_cpu(cpu, cpu_map)
-		cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
- 	cpu_maps_update_done();
+	/* sanity check */
+	for_each_possible_cpu(cpu) {
+		struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+		BUG_ON(cwq->nr_in_flight);
+	}
+
+	if (wq->flags & WQ_EMERGENCY_WORKER) {
+		kthread_stop(wq->emergency->task);
+		free_cpumask_var(wq->mayday_mask);
+	}

 	free_percpu(wq->cpu_wq);
 	kfree(wq);
 }
 EXPORT_SYMBOL_GPL(destroy_workqueue);

+static void set_trustee_target_state(struct global_cwq *gcwq, int target_state)
+{
+	if (gcwq->trustee_target != target_state) {
+		gcwq->trustee_target = target_state;
+		wake_up_all(&gcwq->trustee_wait);
+	}
+}
+
+static void wait_trustee_state(struct global_cwq *gcwq, int target_state)
+{
+	set_trustee_target_state(gcwq, target_state);
+
+	if (gcwq->trustee_state != gcwq->trustee_target) {
+		spin_unlock_irq(&gcwq->lock);
+		__wait_event(gcwq->trustee_wait,
+			     gcwq->trustee_state == TRUSTEE_DONE ||
+			     gcwq->trustee_state == gcwq->trustee_target);
+		spin_lock_irq(&gcwq->lock);
+	}
+}
+
+#define trustee_wait_event_timeout(cond, timeout) ({			\
+	long __ret = (timeout);						\
+	while (!(cond) && gcwq->trustee_target != TRUSTEE_CANCEL) {	\
+		spin_unlock_irq(&gcwq->lock);				\
+		__wait_event_timeout(gcwq->trustee_wait, (cond) ||	\
+				gcwq->trustee_target == TRUSTEE_CANCEL,	\
+				__ret);					\
+		spin_lock_irq(&gcwq->lock);				\
+	}								\
+	gcwq->trustee_target == TRUSTEE_CANCEL ? -1 : (__ret);		\
+})
+
+#define trustee_wait_event(cond) ({					\
+	long __ret1;							\
+	__ret1 = trustee_wait_event_timeout(cond, MAX_SCHEDULE_TIMEOUT);\
+	__ret1 < 0 ? -1 : 0;						\
+})
+
+static int __devinit trustee_state_reached(struct global_cwq *gcwq, int state)
+{
+	gcwq->trustee_state = state;
+	wake_up_all(&gcwq->trustee_wait);
+	return trustee_wait_event(gcwq->trustee_state != gcwq->trustee_target);
+}
+
+static bool __devinit trustee_unset_rogue(struct worker *worker)
+{
+	struct global_cwq *gcwq = worker->gcwq;
+
+	if (!(worker->state & WORKER_STA_ROGUE))
+		return false;
+
+	spin_unlock_irq(&gcwq->lock);
+	BUG_ON(set_cpus_allowed_ptr(worker->task, get_cpu_mask(gcwq->cpu)));
+	spin_lock_irq(&gcwq->lock);
+	worker->state &= ~WORKER_STA_ROGUE;
+	return true;
+}
+
+static void __devinit trustee_reap_workfn(struct work_struct *work)
+{
+	struct global_cwq *gcwq =
+		container_of(work, struct global_cwq, trustee_reap);
+
+	kthread_stop(gcwq->trustee);
+}
+
+static int __devinit trustee_thread(void *__gcwq)
+{
+	struct global_cwq *gcwq = __gcwq;
+	struct worker *worker;
+	struct work_struct *work;
+	int next_state, i;
+
+	spin_lock_irq(&gcwq->lock);
+repeat:
+	next_state = gcwq->trustee_target;
+	switch (next_state) {
+	case TRUSTEE_IN_CHARGE:
+		/*
+		 * Claim the manager position.  Trustee can't be
+		 * cancelled at this point.
+		 */
+		BUG_ON(gcwq->cpu != smp_processor_id());
+		BUG_ON(trustee_wait_event(
+				!(gcwq->flags & GCWQ_MANAGING_WORKERS)) < 0);
+		gcwq->flags |= GCWQ_MANAGING_WORKERS;
+
+		/* make all workers ROGUE */
+		list_for_each_entry(worker, &gcwq->idle_list, entry)
+			worker->state |= WORKER_STA_ROGUE;
+
+		for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) {
+			struct hlist_head *head = &gcwq->busy_hash[i];
+			struct hlist_node *pos;
+
+			hlist_for_each_entry(worker, pos, head, hentry) {
+				if (worker->state & WORKER_STA_RUNNING) {
+					worker->state &= ~WORKER_STA_RUNNING;
+					gcwq->nr_running--;
+				}
+				worker->state |= WORKER_STA_ROGUE;
+			}
+		}
+		WARN_ON(gcwq->nr_running);
+		del_timer_sync(&gcwq->idle_timer);
+		break;
+
+	case TRUSTEE_DRAIN:
+		/* the original cpu is dead, try draining any left work */
+		while (!list_empty(&gcwq->worklist)) {
+			int nr_works = 0;
+
+			list_for_each_entry(work, &gcwq->worklist, entry) {
+				send_mayday(work);
+				nr_works++;
+			}
+
+			list_for_each_entry(worker, &gcwq->idle_list, entry) {
+				if (!nr_works--)
+					break;
+				wake_up_process(worker->task);
+			}
+
+			if (trustee_wait_event_timeout(false, HZ) < 0)
+				break;
+
+			if (need_new_worker(gcwq)) {
+				worker = create_worker(gcwq, false);
+				if (worker) {
+					worker->state |= WORKER_STA_ROGUE;
+					wake_up_process(worker->task);
+				}
+			}
+		}
+
+		/* clean up idle workers */
+		while (gcwq->nr_workers) {
+			while (!list_empty(&gcwq->idle_list)) {
+				worker = list_first_entry(&gcwq->idle_list,
+							  struct worker, entry);
+				destroy_worker(worker);
+			}
+
+			if (trustee_wait_event(
+					!list_empty(&gcwq->idle_list)) < 0)
+				break;
+		}
+
+		if (gcwq->nr_workers)
+			next_state = TRUSTEE_CANCEL;
+		else
+			next_state = TRUSTEE_DONE;
+		break;
+
+	case TRUSTEE_RELEASE:
+	recheck:
+		list_for_each_entry(worker, &gcwq->idle_list, entry)
+			if (trustee_unset_rogue(worker))
+				goto recheck;
+
+		for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) {
+			struct hlist_head *head = &gcwq->busy_hash[i];
+			struct hlist_node *pos;
+
+			hlist_for_each_entry(worker, pos, head, hentry)
+				if (trustee_unset_rogue(worker))
+					goto recheck;
+		}
+
+		next_state = TRUSTEE_DONE;
+		break;
+	}
+	if (gcwq->trustee_state != TRUSTEE_DONE) {
+		trustee_state_reached(gcwq, next_state);
+		goto repeat;
+	}
+
+	spin_unlock_irq(&gcwq->lock);
+	schedule_work(&gcwq->trustee_reap);
+	spin_lock_irq(&gcwq->lock);
+	trustee_state_reached(gcwq, TRUSTEE_DONE);
+	spin_unlock_irq(&gcwq->lock);
+
+	return 0;
+}
+
 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 						unsigned long action,
 						void *hcpu)
 {
 	unsigned int cpu = (unsigned long)hcpu;
-	struct cpu_workqueue_struct *cwq;
-	struct workqueue_struct *wq;
+	struct global_cwq *gcwq = get_gcwq(cpu);
+	struct task_struct *trustee = NULL;
 	int ret = NOTIFY_OK;

 	action &= ~CPU_TASKS_FROZEN;

-	switch (action) {
-	case CPU_UP_PREPARE:
-		cpumask_set_cpu(cpu, cpu_populated_map);
+	if (action == CPU_DOWN_PREPARE) {
+		trustee = kthread_create(trustee_thread, gcwq,
+					 "workqueue_trustee/%d\n", cpu);
+		if (IS_ERR(trustee))
+			return NOTIFY_BAD;
 	}
-undo:
-	list_for_each_entry(wq, &workqueues, list) {
-		cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-
-		switch (action) {
-		case CPU_UP_PREPARE:
-			if (!create_workqueue_thread(cwq, cpu))
-				break;
-			printk(KERN_ERR "workqueue [%s] for %i failed\n",
-				wq->name, cpu);
-			action = CPU_UP_CANCELED;
-			ret = NOTIFY_BAD;
-			goto undo;

-		case CPU_ONLINE:
-			start_workqueue_thread(cwq, cpu);
-			break;
+	spin_lock_irq(&gcwq->lock);

-		case CPU_UP_CANCELED:
-			start_workqueue_thread(cwq, -1);
-		case CPU_POST_DEAD:
-			cleanup_workqueue_thread(cwq);
-			break;
+	switch (action) {
+	case CPU_UP_PREPARE:
+		wait_trustee_state(gcwq, TRUSTEE_CANCEL);
+		if (gcwq->trustee_state == TRUSTEE_DONE) {
+			/* create the first worker */
+			BUG_ON(gcwq->nr_workers);
+			if (!create_worker(gcwq, true))
+				ret = NOTIFY_BAD;
 		}
-	}
+		break;

-	switch (action) {
 	case CPU_UP_CANCELED:
-	case CPU_POST_DEAD:
-		cpumask_clear_cpu(cpu, cpu_populated_map);
+		wait_trustee_state(gcwq, TRUSTEE_DRAIN);
+		break;
+
+	case CPU_ONLINE:
+		wait_trustee_state(gcwq, TRUSTEE_RELEASE);
+		wake_up_worker(gcwq);
+		break;
+
+	case CPU_DOWN_PREPARE:
+		BUG_ON(gcwq->trustee || gcwq->trustee_state != TRUSTEE_DONE);
+		gcwq->trustee = current;
+		gcwq->trustee_state = TRUSTEE_NONE;
+		gcwq->trustee_target = TRUSTEE_NONE;
+		wake_up_process(trustee);
+		wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
+		break;
+
+	case CPU_DOWN_FAILED:
+		wait_trustee_state(gcwq, TRUSTEE_RELEASE);
+		break;
+
+	case CPU_DEAD:
+		set_trustee_target_state(gcwq, TRUSTEE_DRAIN);
+		break;
 	}

+	spin_unlock_irq(&gcwq->lock);
 	return ret;
 }

@@ -1007,12 +1978,41 @@ EXPORT_SYMBOL_GPL(work_on_cpu);

 void __init init_workqueues(void)
 {
-	alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
+	unsigned int cpu;
+	int i;

-	cpumask_copy(cpu_populated_map, cpu_online_mask);
-	singlethread_cpu = cpumask_first(cpu_possible_mask);
-	cpu_singlethread_map = cpumask_of(singlethread_cpu);
 	hotcpu_notifier(workqueue_cpu_callback, 0);
+
+	/* initialize gcwqs */
+	for_each_possible_cpu(cpu) {
+		struct global_cwq *gcwq = get_gcwq(cpu);
+
+		spin_lock_init(&gcwq->lock);
+		INIT_LIST_HEAD(&gcwq->worklist);
+		gcwq->cpu = cpu;
+
+		INIT_LIST_HEAD(&gcwq->idle_list);
+		for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
+			INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
+
+		init_timer_deferrable(&gcwq->idle_timer);
+		gcwq->idle_timer.function = idle_worker_timeout;
+		gcwq->idle_timer.data = (unsigned long)gcwq;
+
+		setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout,
+			    (unsigned long)gcwq);
+
+		gcwq->trustee_state = TRUSTEE_DONE;
+		gcwq->trustee_target = TRUSTEE_DONE;
+		init_waitqueue_head(&gcwq->trustee_wait);
+		INIT_WORK(&gcwq->trustee_reap, trustee_reap_workfn);
+
+		/* create the first worker */
+		spin_lock_irq(&gcwq->lock);
+		BUG_ON(!create_worker(gcwq, true));
+		spin_unlock_irq(&gcwq->lock);
+	}
+
 	keventd_wq = create_workqueue("events");
 	BUG_ON(!keventd_wq);
 }
Index: work/kernel/sched.c
===================================================================
--- work.orig/kernel/sched.c
+++ work/kernel/sched.c
@@ -1732,10 +1732,13 @@ static void cfs_rq_set_shares(struct cfs

 static void calc_load_account_active(struct rq *this_rq);

+#define sched_class_equal(a, b)		((a)->identity == (b)->identity)
+
 #include "sched_stats.h"
 #include "sched_idletask.c"
 #include "sched_fair.c"
 #include "sched_rt.c"
+#include "sched_workqueue.c"
 #ifdef CONFIG_SCHED_DEBUG
 # include "sched_debug.c"
 #endif
@@ -1906,7 +1909,7 @@ static inline void check_class_changed(s
 				       const struct sched_class *prev_class,
 				       int oldprio, int running)
 {
-	if (prev_class != p->sched_class) {
+	if (!sched_class_equal(prev_class, p->sched_class)) {
 		if (prev_class->switched_from)
 			prev_class->switched_from(rq, p, running);
 		p->sched_class->switched_to(rq, p, running);
@@ -1938,7 +1941,7 @@ task_hot(struct task_struct *p, u64 now,
 			 &p->se == cfs_rq_of(&p->se)->last))
 		return 1;

-	if (p->sched_class != &fair_sched_class)
+	if (!sched_class_equal(p->sched_class, &fair_sched_class))
 		return 0;

 	if (sysctl_sched_migration_cost == -1)
@@ -6085,7 +6088,10 @@ __setscheduler(struct rq *rq, struct tas
 	case SCHED_NORMAL:
 	case SCHED_BATCH:
 	case SCHED_IDLE:
-		p->sched_class = &fair_sched_class;
+		if (p->flags & PF_WORKQUEUE)
+			p->sched_class = &workqueue_sched_class;
+		else
+			p->sched_class = &fair_sched_class;
 		break;
 	case SCHED_FIFO:
 	case SCHED_RR:
@@ -10230,7 +10236,7 @@ cpu_cgroup_can_attach(struct cgroup_subs
 		return -EINVAL;
 #else
 	/* We don't support RT-tasks being in separate groups */
-	if (tsk->sched_class != &fair_sched_class)
+	if (!sched_class_equal(tsk->sched_class, &fair_sched_class))
 		return -EINVAL;
 #endif

Index: work/kernel/sched_fair.c
===================================================================
--- work.orig/kernel/sched_fair.c
+++ work/kernel/sched_fair.c
@@ -934,7 +934,7 @@ static void hrtick_update(struct rq *rq)
 {
 	struct task_struct *curr = rq->curr;

-	if (curr->sched_class != &fair_sched_class)
+	if (!sched_class_equal(curr->sched_class, &fair_sched_class))
 		return;

 	if (cfs_rq_of(&curr->se)->nr_running < sched_nr_latency)
@@ -1450,7 +1450,7 @@ static void check_preempt_wakeup(struct
 		return;
 	}

-	if (unlikely(p->sched_class != &fair_sched_class))
+	if (unlikely(!sched_class_equal(p->sched_class, &fair_sched_class)))
 		return;

 	if (unlikely(se == pse))
@@ -1799,34 +1799,48 @@ static void moved_group_fair(struct task
 /*
  * All the scheduling class methods:
  */
-static const struct sched_class fair_sched_class = {
-	.next			= &idle_sched_class,
-	.enqueue_task		= enqueue_task_fair,
-	.dequeue_task		= dequeue_task_fair,
-	.yield_task		= yield_task_fair,
-
-	.check_preempt_curr	= check_preempt_wakeup,
-
-	.pick_next_task		= pick_next_task_fair,
-	.put_prev_task		= put_prev_task_fair,
+#define FAIR_SCHED_CLASS_INIT_BASE			\
+	.identity		= &fair_sched_class,	\
+	.next			= &idle_sched_class,	\
+	.enqueue_task		= enqueue_task_fair,	\
+	.dequeue_task		= dequeue_task_fair,	\
+	.yield_task		= yield_task_fair,	\
+							\
+	.check_preempt_curr	= check_preempt_wakeup,	\
+							\
+	.pick_next_task		= pick_next_task_fair,	\
+	.put_prev_task		= put_prev_task_fair,	\
+							\
+	.set_curr_task          = set_curr_task_fair,	\
+	.task_tick		= task_tick_fair,	\
+	.task_new		= task_new_fair,	\
+							\
+	.prio_changed		= prio_changed_fair,	\
+	.switched_to		= switched_to_fair,

 #ifdef CONFIG_SMP
-	.select_task_rq		= select_task_rq_fair,
-
-	.load_balance		= load_balance_fair,
+#define FAIR_SCHED_CLASS_INIT_SMP			\
+	.select_task_rq		= select_task_rq_fair,	\
+	.load_balance		= load_balance_fair,	\
 	.move_one_task		= move_one_task_fair,
+#else
+#define FAIR_SCHED_CLASS_INIT_SMP
 #endif

-	.set_curr_task          = set_curr_task_fair,
-	.task_tick		= task_tick_fair,
-	.task_new		= task_new_fair,
-
-	.prio_changed		= prio_changed_fair,
-	.switched_to		= switched_to_fair,
-
 #ifdef CONFIG_FAIR_GROUP_SCHED
+#define FAIR_SCHED_CLASS_INIT_GROUP			\
 	.moved_group		= moved_group_fair,
+#else
+#define FAIR_SCHED_CLASS_INIT_GROUP
 #endif
+
+#define FAIR_SCHED_CLASS_INIT				\
+	FAIR_SCHED_CLASS_INIT_BASE			\
+	FAIR_SCHED_CLASS_INIT_SMP			\
+	FAIR_SCHED_CLASS_INIT_GROUP
+
+static const struct sched_class fair_sched_class = {
+	FAIR_SCHED_CLASS_INIT
 };

 #ifdef CONFIG_SCHED_DEBUG
Index: work/include/linux/sched.h
===================================================================
--- work.orig/include/linux/sched.h
+++ work/include/linux/sched.h
@@ -1023,6 +1023,7 @@ struct sched_domain;

 struct sched_class {
 	const struct sched_class *next;
+	const struct sched_class *identity;

 	void (*enqueue_task) (struct rq *rq, struct task_struct *p, int wakeup);
 	void (*dequeue_task) (struct rq *rq, struct task_struct *p, int sleep);
@@ -1694,6 +1695,7 @@ extern cputime_t task_gtime(struct task_
 #define PF_SPREAD_PAGE	0x01000000	/* Spread page cache over cpuset */
 #define PF_SPREAD_SLAB	0x02000000	/* Spread some slab caches over cpuset */
 #define PF_THREAD_BOUND	0x04000000	/* Thread bound to specific cpu */
+#define PF_WORKQUEUE	0x08000000	/* I'm a workqueue worker */
 #define PF_MEMPOLICY	0x10000000	/* Non-default NUMA mempolicy */
 #define PF_MUTEX_TESTER	0x20000000	/* Thread belongs to the rt mutex tester */
 #define PF_FREEZER_SKIP	0x40000000	/* Freezer should not count it as freezeable */
@@ -1865,6 +1867,7 @@ extern int idle_cpu(int cpu);
 extern int sched_setscheduler(struct task_struct *, int, struct sched_param *);
 extern int sched_setscheduler_nocheck(struct task_struct *, int,
 				      struct sched_param *);
+extern void sched_setscheduler_workqueue(struct task_struct *p);
 extern struct task_struct *idle_task(int cpu);
 extern struct task_struct *curr_task(int cpu);
 extern void set_curr_task(int cpu, struct task_struct *p);
Index: work/kernel/sched_idletask.c
===================================================================
--- work.orig/kernel/sched_idletask.c
+++ work/kernel/sched_idletask.c
@@ -101,6 +101,7 @@ static void prio_changed_idle(struct rq
  * Simple, special scheduling class for the per-CPU idle tasks:
  */
 static const struct sched_class idle_sched_class = {
+	.identity		= &idle_sched_class,
 	/* .next is NULL */
 	/* no enqueue/yield_task for idle tasks */

Index: work/kernel/sched_rt.c
===================================================================
--- work.orig/kernel/sched_rt.c
+++ work/kernel/sched_rt.c
@@ -1739,6 +1739,7 @@ static void set_curr_task_rt(struct rq *
 }

 static const struct sched_class rt_sched_class = {
+	.identity		= &rt_sched_class,
 	.next			= &fair_sched_class,
 	.enqueue_task		= enqueue_task_rt,
 	.dequeue_task		= dequeue_task_rt,
Index: work/kernel/sched_workqueue.c
===================================================================
--- /dev/null
+++ work/kernel/sched_workqueue.c
@@ -0,0 +1,53 @@
+#include "sched_workqueue.h"
+
+static void enqueue_task_wq(struct rq *rq, struct task_struct *p, int wakeup)
+{
+	if (wakeup)
+		sched_workqueue_worker_wakeup(p);
+
+	return enqueue_task_fair(rq, p, wakeup);
+}
+
+static void dequeue_task_wq(struct rq *rq, struct task_struct *p, int sleep)
+{
+	if (sleep)
+		sched_workqueue_worker_sleep(p);
+
+	return dequeue_task_fair(rq, p, sleep);
+}
+
+static void put_prev_task_wq(struct rq *rq, struct task_struct *prev)
+{
+	if (prev->se.on_rq)
+		sched_workqueue_worker_preempted(prev);
+
+	return put_prev_task_fair(rq, prev);
+}
+
+static const struct sched_class workqueue_sched_class = {
+	FAIR_SCHED_CLASS_INIT
+	.enqueue_task		= enqueue_task_wq,
+	.dequeue_task		= dequeue_task_wq,
+	.put_prev_task		= put_prev_task_wq,
+};
+
+bool sched_workqueue_wake_up_process(struct task_struct *p)
+{
+	struct rq *rq = this_rq();
+	bool success = false;
+
+	if (!p->se.on_rq) {
+		schedstat_inc(p, se.nr_wakeups);
+		schedstat_inc(p, se.nr_wakeups_local);
+		activate_task(rq, p, 1);
+		success = true;
+	}
+
+	trace_sched_wakeup(rq, p, success);
+	p->state = TASK_RUNNING;
+#ifdef CONFIG_SMP
+	if (p->sched_class->task_wake_up)
+		p->sched_class->task_wake_up(rq, p);
+#endif
+	return success;
+}
Index: work/include/linux/kthread.h
===================================================================
--- work.orig/include/linux/kthread.h
+++ work/include/linux/kthread.h
@@ -30,6 +30,7 @@ struct task_struct *kthread_create(int (
 void kthread_bind(struct task_struct *k, unsigned int cpu);
 int kthread_stop(struct task_struct *k);
 int kthread_should_stop(void);
+void *kthread_data(struct task_struct *k);

 int kthreadd(void *unused);
 extern struct task_struct *kthreadd_task;
Index: work/kernel/kthread.c
===================================================================
--- work.orig/kernel/kthread.c
+++ work/kernel/kthread.c
@@ -37,6 +37,7 @@ struct kthread_create_info

 struct kthread {
 	int should_stop;
+	void *data;
 	struct completion exited;
 };

@@ -56,6 +57,11 @@ int kthread_should_stop(void)
 }
 EXPORT_SYMBOL(kthread_should_stop);

+void *kthread_data(struct task_struct *task)
+{
+	return to_kthread(current)->data;
+}
+
 static int kthread(void *_create)
 {
 	/* Copy data: it's on kthread's stack */
@@ -66,6 +72,7 @@ static int kthread(void *_create)
 	int ret;

 	self.should_stop = 0;
+	self.data = data;
 	init_completion(&self.exited);
 	current->vfork_done = &self.exited;

Index: work/kernel/sched_workqueue.h
===================================================================
--- /dev/null
+++ work/kernel/sched_workqueue.h
@@ -0,0 +1,5 @@
+void sched_workqueue_worker_wakeup(struct task_struct *task);
+void sched_workqueue_worker_sleep(struct task_struct *task);
+void sched_workqueue_worker_preempted(struct task_struct *task);
+
+bool sched_workqueue_wake_up_process(struct task_struct *p);

^ permalink raw reply	[flat|nested] 13+ messages in thread

end of thread, other threads:[~2009-08-31  3:26 UTC | newest]

Thread overview: 13+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2009-08-27  9:08 [PATCH 0/3] Convert libata pio task to slow-work Jens Axboe
2009-08-27  9:08 ` [PATCH 1/3] slow-work: add delayed_slow_work support Jens Axboe
2009-08-27  9:08   ` [PATCH 2/3] slow-work: add support for cancellation of slow work Jens Axboe
2009-08-27  9:08     ` [PATCH 3/3] libata: switch pio task from workqueue to slow-work Jens Axboe
2009-08-27  9:14     ` [PATCH 2/3] slow-work: add support for cancellation of slow work Jens Axboe
2009-08-27  9:18       ` [PATCH 2/3] slow-work: add support for cancellation of slow work (updated) Jens Axboe
2009-08-27 12:36 ` [PATCH 0/3] Convert libata pio task to slow-work Tejun Heo
2009-08-27 12:49   ` Jens Axboe
2009-08-27 12:58     ` Tejun Heo
2009-08-27 18:49       ` Jens Axboe
2009-08-28  7:02         ` Tejun Heo
2009-08-28  7:06           ` Jens Axboe
2009-08-31  3:24             ` [very-early-draft-unsplit PATCH] workqueue: implement global cpu workqueue Tejun Heo

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).