public inbox for linux-kernel@vger.kernel.org
* [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work()
@ 2024-02-27 17:28 Tejun Heo
  2024-02-27 17:28 ` [PATCH 1/6] workqueue: Preserve OFFQ bits in cancel[_sync] paths Tejun Heo
                   ` (6 more replies)
  0 siblings, 7 replies; 13+ messages in thread
From: Tejun Heo @ 2024-02-27 17:28 UTC (permalink / raw)
  To: jiangshanlai
  Cc: torvalds, linux-kernel, allen.lkml, kernel-team, boqun.feng, tglx,
	peterz, romain.perier, mingo

Hello,

Changes since v2 (http://lkml.kernel.org/r/20240221174333.700197-1-tj@kernel.org):

- enable_work() was incorrectly using local_irq_enable() instead of
  local_irq_restore(). Fixed. (Lai, Boqun).

- In __flush_work(), instead of using the pool returned from
  start_flush_work() and testing it to tell whether the work item being
  flushed is BH, use the work item's WORK_OFFQ_BH bit. This is safe because
  the test is only needed when @from_cancel is %true and we know that the
  work item must be off queue and thus its OFFQ bits are valid at that point.
  This makes the code a bit simpler and makes
  0005-workqueue-Update-how-start_flush_work-is-called.patch unnecessary.
  (Lai)

- There were discussions on whether we want to name the syncing variant
  disable_work() and async one disable_work_no_sync() instead of naming them
  disable_work_sync() and disable_work() respectively. While there are clear
  benefits in doing so (continuity from tasklet interface, shorter name for
  something used more widely), there is also the clear downside of breaking
  consistency with cancel_work_sync() and cancel_work(), which the disable
  functionality directly builds upon. I'm still leaning towards keeping
  things consistent with existing workqueue interface but open to listening
  to arguments, so if anyone has strong opinions, please pipe up.

Changes since v1 (http://lkml.kernel.org/r/20240216180559.208276-1-tj@kernel.org):

- Added missing disabled checks to queue_work_node() and
  mod_delayed_work_on(). (Lai)

- __flush_work() was dereferencing an RCU protected pool pointer outside the
  rcu_read_lock() section. Dereference and test it inside the section and
  remember the result.
  (Lai)

- queue_rcu_work() doesn't need a disabled check as rcu_work doesn't support
  canceling and disabling; however, in case users reach into rcu_work and
  invoke disable on the embedded work item, add disabled check to
  queue_rcu_work() and trigger warning if disabled. (Lai)

- The first ten cleanup patches have been applied to wq/for-6.9 and dropped
  from this series.

4cb1ef64609f ("workqueue: Implement BH workqueues to eventually replace
tasklets") implemented workqueues that execute work items in the BH context
with the goal of eventually replacing tasklets.

While the existing workqueue API covers the basic queueing and canceling
operations, tasklet also has tasklet_disable*() which blocks the execution
of the tasklet until it's re-enabled with tasklet_enable(). The interface is
fairly widely used and workqueue currently doesn't have a counterpart.

This patchset implements disable/enable_work() and the delayed_work
counterparts to address the gap. The ability to block future executions is
something which some users asked for in the past, and, while not essential
as the caller can and often has to shutdown the queuer anyway, it's a nice
convenience to have. Also, timer grew a similar feature recently with
timer_shutdown().

- tasklet_disable() keeps disable depth so that a tasklet can be disabled
  and re-enabled by multiple parties at the same time. Workqueue is updated
  to carry 16bit disable count in work->data while the work item is not
  queued. When non-zero, attempts to queue the work item fail.

- The cancel_work_sync() path used WORK_OFFQ_CANCELING to synchronize
  multiple cancel_sync attempts. This added a completely separate wait
  mechanism which wasn't very congruent with the rest of workqueue. This was
  because the canceling state was carried in a way which couldn't
  accommodate multiple concurrent uses. This mechanism is replaced by
  disable - cancel_sync now simply disables the work item, flushes and
  re-enables it.

- There is a wart in how tasklet_disable/enable() works. When a tasklet is
  disabled, if the tasklet is queued, it keeps the softirq constantly raised
  until the tasklet is re-enabled and executed. This makes disabling
  unnecessarily expensive and brittle. The only busy looping that
  workqueue's implementation does is in the party that's trying to
  cancel_sync or disable_sync, which waits for the completion of the
  currently executing instance. This should be safe as long as the wait is
  from process or BH context.

- A disabled tasklet remembers whether it was queued while disabled and
  starts executing when re-enabled. It turns out doing this with work items
  is challenging as there is a lot more to remember and the synchronization
  becomes more complicated too. Looking at the use cases and given the
  continuity from how cancel_work_sync() works, it seems better to just
  ignore queueings which happen while a work item is disabled and require
  the users to explicitly queue the work item after re-enabling as
  necessary. Most users should be able to re-enqueue unconditionally after
  enabling.
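The queueing semantics above can be sketched as a small standalone userspace
model (illustrative only, not kernel code; the names are made up and the
saturating overflow behavior mirrors the description, not the exact
implementation):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Userspace model of the proposed semantics: a work item carries a 16bit
 * disable count while not queued, queueing fails while the count is
 * non-zero, and enabling only "takes" once the count drops back to zero.
 */
struct work_model {
	uint16_t disable;	/* models the OFFQ disable depth */
	bool pending;		/* models WORK_STRUCT_PENDING */
};

/* queue_work(): fails while disabled or already pending */
static bool model_queue_work(struct work_model *w)
{
	if (w->disable || w->pending)
		return false;
	w->pending = true;
	return true;
}

/* disable_work(): bump the depth (saturating; the kernel WARNs on
 * overflow) and cancel if currently pending */
static bool model_disable_work(struct work_model *w)
{
	bool was_pending = w->pending;

	if (w->disable < UINT16_MAX)
		w->disable++;
	w->pending = false;
	return was_pending;
}

/* enable_work(): drop the depth; returns true once the count reaches 0 */
static bool model_enable_work(struct work_model *w)
{
	if (w->disable)		/* the kernel WARNs on underflow */
		w->disable--;
	return w->disable == 0;
}
```

Matching the behavior chosen above, a queueing attempt made while disabled is
simply dropped; re-enabling does not replay it, so the caller requeues
explicitly after the enable reports that the count reached zero.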

After the changes in this patchset, BH workqueues cover most of the
functionalities provided by tasklets. The following is how the two APIs map:

- tasklet_setup/init()		-> INIT_WORK()
- tasklet_schedule()		-> queue_work(system_bh_wq, ...)
- tasklet_hi_schedule()		-> queue_work(system_bh_highpri_wq, ...)
- tasklet_disable_nosync()	-> disable_work()
- tasklet_disable[_in_atomic]()	-> disable_work_sync()
- tasklet_enable()		-> enable_work() + queue_work()
- tasklet_kill()		-> cancel_work_sync()

Note that unlike tasklet_enable(), enable_work() doesn't queue the work item
automatically according to whether the work item was queued while disabled.
While the caller can track this separately, unconditionally scheduling the
work item after enable_work() returns %true should work for most users.

This patchset contains the following 6 patches:

 0001-workqueue-Preserve-OFFQ-bits-in-cancel-_sync-paths.patch
 0002-workqueue-Implement-disable-enable-for-delayed-work-.patch
 0003-workqueue-Remove-WORK_OFFQ_CANCELING.patch
 0004-workqueue-Remember-whether-a-work-item-was-on-a-BH-w.patch
 0005-workqueue-Allow-cancel_work_sync-and-disable_work-fr.patch
 0006-r8152-Convert-from-tasklet-to-BH-workqueue.patch

Given how many invasive workqueue changes are already queued for v6.9 and
the subtle nature of these patches, I think it'd be best to defer this
patchset to v6.10 so that we can use v6.9 as an intermediate verification
point.

0001-0002 implement disable_work() and enable_work(). At this stage, all
disable[_sync]_work and enable_work operations might_sleep(): disable_work()
and enable_work() because of the CANCELING synchronization described above,
and disable_work_sync() because it also needs to wait for the in-flight work
item to finish, which requires blocking.

0003 replaces CANCELING with internal use of disable/enable. This removes one
ugliness from workqueue code and allows disable_work() and enable_work() to
be used from atomic contexts.

0004-0005 implement busy-wait for BH work items when they're being canceled
thus allowing cancel_work_sync() and disable_work_sync() to be called from
atomic contexts for them. This makes the workqueue interface a superset of
tasklet's and also makes BH workqueues easier to live with.

0006 converts drivers/net/r8152.c from tasklet to BH workqueue as a
demonstration. It seems to work fine.

The patchset is on top of wq/for-6.9 (ccdec92198df ("workqueue: Control
intensive warning threshold through cmdline")) and also available in the
following git branch:

 git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git disable_work-v3

diffstat follows. Thanks.

 drivers/net/usb/r8152.c   |   44 ++++----
 include/linux/workqueue.h |   23 +++-
 kernel/workqueue.c        |  384 ++++++++++++++++++++++++++++++++++++++++++++++---------------------------

--
tejun

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [PATCH 1/6] workqueue: Preserve OFFQ bits in cancel[_sync] paths
  2024-02-27 17:28 [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work() Tejun Heo
@ 2024-02-27 17:28 ` Tejun Heo
  2024-02-27 17:28 ` [PATCH 2/6] workqueue: Implement disable/enable for (delayed) work items Tejun Heo
                   ` (5 subsequent siblings)
  6 siblings, 0 replies; 13+ messages in thread
From: Tejun Heo @ 2024-02-27 17:28 UTC (permalink / raw)
  To: jiangshanlai
  Cc: torvalds, linux-kernel, allen.lkml, kernel-team, boqun.feng, tglx,
	peterz, romain.perier, mingo, Tejun Heo

The cancel[_sync] paths acquire and release WORK_STRUCT_PENDING, and
manipulate WORK_OFFQ_CANCELING. However, they assume that all the OFFQ bit
values except for the pool ID are statically known and don't preserve them,
which is not wrong in the current code as the pool ID and CANCELING are the
only information carried. However, the planned disable/enable support will
add more fields and need them to be preserved.

This patch updates work data handling so that only the bits which need
updating are updated.

- struct work_offq_data is added along with work_offqd_unpack() and
  work_offqd_pack_flags() to help manipulating multiple fields contained in
  work->data. Note that the helpers look a bit silly right now as there
  isn't that much to pack. The next patch will add more.

- mark_work_canceling() which is used only by __cancel_work_sync() is
  replaced by open-coded usage of work_offq_data and
  set_work_pool_and_keep_pending() in __cancel_work_sync().

- __cancel_work[_sync]() uses offq_data helpers to preserve other OFFQ bits
  when clearing WORK_STRUCT_PENDING and WORK_OFFQ_CANCELING at the end.

- This removes all users of get_work_pool_id() which is dropped. Note that
  get_work_pool_id() could handle both WORK_STRUCT_PWQ and !WORK_STRUCT_PWQ
  cases; however, it was only being called after try_to_grab_pending()
  succeeded, in which case WORK_STRUCT_PWQ is never set and thus it's safe
  to use work_offqd_unpack() instead.

No behavior changes intended.

Signed-off-by: Tejun Heo <tj@kernel.org>
---
 include/linux/workqueue.h |  1 +
 kernel/workqueue.c        | 51 ++++++++++++++++++++++++---------------
 2 files changed, 32 insertions(+), 20 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 0ad534fe6673..e15fc77bf2e2 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -97,6 +97,7 @@ enum wq_misc_consts {
 
 /* Convenience constants - of type 'unsigned long', not 'enum'! */
 #define WORK_OFFQ_CANCELING	(1ul << WORK_OFFQ_CANCELING_BIT)
+#define WORK_OFFQ_FLAG_MASK	(((1ul << WORK_OFFQ_FLAG_BITS) - 1) << WORK_OFFQ_FLAG_SHIFT)
 #define WORK_OFFQ_POOL_NONE	((1ul << WORK_OFFQ_POOL_BITS) - 1)
 #define WORK_STRUCT_NO_POOL	(WORK_OFFQ_POOL_NONE << WORK_OFFQ_POOL_SHIFT)
 #define WORK_STRUCT_PWQ_MASK	(~((1ul << WORK_STRUCT_PWQ_SHIFT) - 1))
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 38783e3a60bb..ecd46fbed60b 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -391,6 +391,11 @@ struct wq_pod_type {
 	int			*cpu_pod;	/* cpu -> pod */
 };
 
+struct work_offq_data {
+	u32			pool_id;
+	u32			flags;
+};
+
 static const char *wq_affn_names[WQ_AFFN_NR_TYPES] = {
 	[WQ_AFFN_DFL]		= "default",
 	[WQ_AFFN_CPU]		= "cpu",
@@ -891,29 +896,23 @@ static struct worker_pool *get_work_pool(struct work_struct *work)
 	return idr_find(&worker_pool_idr, pool_id);
 }
 
-/**
- * get_work_pool_id - return the worker pool ID a given work is associated with
- * @work: the work item of interest
- *
- * Return: The worker_pool ID @work was last associated with.
- * %WORK_OFFQ_POOL_NONE if none.
- */
-static int get_work_pool_id(struct work_struct *work)
+static unsigned long shift_and_mask(unsigned long v, u32 shift, u32 bits)
 {
-	unsigned long data = atomic_long_read(&work->data);
+	return (v >> shift) & ((1 << bits) - 1);
+}
 
-	if (data & WORK_STRUCT_PWQ)
-		return work_struct_pwq(data)->pool->id;
+static void work_offqd_unpack(struct work_offq_data *offqd, unsigned long data)
+{
+	WARN_ON_ONCE(data & WORK_STRUCT_PWQ);
 
-	return data >> WORK_OFFQ_POOL_SHIFT;
+	offqd->pool_id = shift_and_mask(data, WORK_OFFQ_POOL_SHIFT,
+					WORK_OFFQ_POOL_BITS);
+	offqd->flags = data & WORK_OFFQ_FLAG_MASK;
 }
 
-static void mark_work_canceling(struct work_struct *work)
+static unsigned long work_offqd_pack_flags(struct work_offq_data *offqd)
 {
-	unsigned long pool_id = get_work_pool_id(work);
-
-	pool_id <<= WORK_OFFQ_POOL_SHIFT;
-	set_work_data(work, pool_id | WORK_STRUCT_PENDING | WORK_OFFQ_CANCELING);
+	return (unsigned long)offqd->flags;
 }
 
 static bool work_is_canceling(struct work_struct *work)
@@ -4186,6 +4185,7 @@ EXPORT_SYMBOL(flush_rcu_work);
 
 static bool __cancel_work(struct work_struct *work, u32 cflags)
 {
+	struct work_offq_data offqd;
 	unsigned long irq_flags;
 	int ret;
 
@@ -4196,19 +4196,26 @@ static bool __cancel_work(struct work_struct *work, u32 cflags)
 	if (unlikely(ret < 0))
 		return false;
 
-	set_work_pool_and_clear_pending(work, get_work_pool_id(work), 0);
+	work_offqd_unpack(&offqd, *work_data_bits(work));
+	set_work_pool_and_clear_pending(work, offqd.pool_id,
+					work_offqd_pack_flags(&offqd));
 	local_irq_restore(irq_flags);
 	return ret;
 }
 
 static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
 {
+	struct work_offq_data offqd;
 	unsigned long irq_flags;
 	bool ret;
 
 	/* claim @work and tell other tasks trying to grab @work to back off */
 	ret = work_grab_pending(work, cflags, &irq_flags);
-	mark_work_canceling(work);
+
+	work_offqd_unpack(&offqd, *work_data_bits(work));
+	offqd.flags |= WORK_OFFQ_CANCELING;
+	set_work_pool_and_keep_pending(work, offqd.pool_id,
+				       work_offqd_pack_flags(&offqd));
 	local_irq_restore(irq_flags);
 
 	/*
@@ -4218,12 +4225,16 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
 	if (wq_online)
 		__flush_work(work, true);
 
+	work_offqd_unpack(&offqd, *work_data_bits(work));
+
 	/*
 	 * smp_mb() at the end of set_work_pool_and_clear_pending() is paired
 	 * with prepare_to_wait() above so that either waitqueue_active() is
 	 * visible here or !work_is_canceling() is visible there.
 	 */
-	set_work_pool_and_clear_pending(work, WORK_OFFQ_POOL_NONE, 0);
+	offqd.flags &= ~WORK_OFFQ_CANCELING;
+	set_work_pool_and_clear_pending(work, WORK_OFFQ_POOL_NONE,
+					work_offqd_pack_flags(&offqd));
 
 	if (waitqueue_active(&wq_cancel_waitq))
 		__wake_up(&wq_cancel_waitq, TASK_NORMAL, 1, work);
-- 
2.43.2



* [PATCH 2/6] workqueue: Implement disable/enable for (delayed) work items
  2024-02-27 17:28 [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work() Tejun Heo
  2024-02-27 17:28 ` [PATCH 1/6] workqueue: Preserve OFFQ bits in cancel[_sync] paths Tejun Heo
@ 2024-02-27 17:28 ` Tejun Heo
  2024-02-27 17:28 ` [PATCH 3/6] workqueue: Remove WORK_OFFQ_CANCELING Tejun Heo
                   ` (4 subsequent siblings)
  6 siblings, 0 replies; 13+ messages in thread
From: Tejun Heo @ 2024-02-27 17:28 UTC (permalink / raw)
  To: jiangshanlai
  Cc: torvalds, linux-kernel, allen.lkml, kernel-team, boqun.feng, tglx,
	peterz, romain.perier, mingo, Tejun Heo

While (delayed) work items could be flushed and canceled, there was no way
to prevent them from being queued in the future. While this didn't lead to
functional deficiencies, it sometimes required a bit more effort from the
workqueue users to e.g. sequence shutdown steps with more care.

Workqueue is currently in the process of replacing tasklets, which do
support disabling and enabling. The feature is used relatively widely to,
for example, temporarily suppress the main path while a control plane
operation (reset or config change) is in progress.

To enable easy conversion of tasklet users and as it seems like an inherently
useful feature, this patch implements disabling and enabling of work items.

- A work item carries 16bit disable count in work->data while not queued.
  The access to the count is synchronized by the PENDING bit like all other
  parts of work->data.

- If the count is non-zero, the work item cannot be queued. Any attempt to
  queue the work item fails and returns %false.

- disable_work[_sync](), enable_work(), disable_delayed_work[_sync]() and
  enable_delayed_work() are added.
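The layout above can be sanity-checked with a standalone userspace model of
the pack/unpack helpers in this patch (assumptions: WORK_STRUCT_FLAG_BITS ==
4 where the bit-layout comment allows 4 or 5, a 64bit long, and a 31-bit
pool ID; the MODEL_* names are made up for the sketch):

```c
#include <assert.h>
#include <stdint.h>

/*
 * Userspace model of the off-queue word after this patch, assuming
 * WORK_STRUCT_FLAG_BITS == 4:
 *
 *   [ pool ID ] [ disable depth ] [ OFFQ flags ] [ STRUCT flags ]
 *                    16 bits          1 bit          4 bits
 */
enum {
	MODEL_STRUCT_FLAG_BITS	 = 4,	/* assumed value */
	MODEL_OFFQ_FLAG_SHIFT	 = MODEL_STRUCT_FLAG_BITS,
	MODEL_OFFQ_FLAG_BITS	 = 1,	/* just CANCELING at this point */
	MODEL_OFFQ_DISABLE_SHIFT = MODEL_OFFQ_FLAG_SHIFT + MODEL_OFFQ_FLAG_BITS,
	MODEL_OFFQ_DISABLE_BITS	 = 16,
	MODEL_OFFQ_POOL_SHIFT	 = MODEL_OFFQ_DISABLE_SHIFT + MODEL_OFFQ_DISABLE_BITS,
};

#define MODEL_OFFQ_FLAG_MASK \
	(((1ul << MODEL_OFFQ_FLAG_BITS) - 1) << MODEL_OFFQ_FLAG_SHIFT)

struct model_offq_data {
	uint32_t pool_id;
	uint32_t disable;
	uint32_t flags;
};

/* mirrors shift_and_mask() */
static unsigned long model_shift_and_mask(unsigned long v, uint32_t shift,
					  uint32_t bits)
{
	return (v >> shift) & ((1ul << bits) - 1);
}

/* mirrors work_offqd_unpack() */
static void model_unpack(struct model_offq_data *d, unsigned long data)
{
	d->pool_id = model_shift_and_mask(data, MODEL_OFFQ_POOL_SHIFT, 31);
	d->disable = model_shift_and_mask(data, MODEL_OFFQ_DISABLE_SHIFT,
					  MODEL_OFFQ_DISABLE_BITS);
	d->flags = data & MODEL_OFFQ_FLAG_MASK;
}

/* mirrors work_offqd_pack_flags(): repacks everything below the pool ID */
static unsigned long model_pack_flags(const struct model_offq_data *d)
{
	return ((unsigned long)d->disable << MODEL_OFFQ_DISABLE_SHIFT) |
		(unsigned long)d->flags;
}
```

A pack/unpack round trip keeps the disable depth and OFFQ flags independent
of the pool ID, which is what lets the cancel paths preserve fields they
don't own.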

v3: enable_work() was using local_irq_enable() instead of
    local_irq_restore() to undo IRQ-disable by work_grab_pending(). This is
    awkward now and will become incorrect as enable_work() will later be
    used from IRQ context too. (Lai)

v2: Lai noticed that queue_work_node() wasn't checking the disable count.
    Fixed. queue_rcu_work() is updated to trigger warning if the inner work
    item is disabled.

Signed-off-by: Tejun Heo <tj@kernel.org>
Cc: Lai Jiangshan <jiangshanlai@gmail.com>
---
 include/linux/workqueue.h |  18 +++-
 kernel/workqueue.c        | 177 +++++++++++++++++++++++++++++++++++---
 2 files changed, 182 insertions(+), 13 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index e15fc77bf2e2..f25915e47efb 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -51,20 +51,23 @@ enum work_bits {
 	 * data contains off-queue information when !WORK_STRUCT_PWQ.
 	 *
 	 * MSB
-	 * [ pool ID ] [ OFFQ flags ] [ STRUCT flags ]
-	 *                 1 bit        4 or 5 bits
+	 * [ pool ID ] [ disable depth ] [ OFFQ flags ] [ STRUCT flags ]
+	 *                  16 bits          1 bit        4 or 5 bits
 	 */
 	WORK_OFFQ_FLAG_SHIFT	= WORK_STRUCT_FLAG_BITS,
 	WORK_OFFQ_CANCELING_BIT = WORK_OFFQ_FLAG_SHIFT,
 	WORK_OFFQ_FLAG_END,
 	WORK_OFFQ_FLAG_BITS	= WORK_OFFQ_FLAG_END - WORK_OFFQ_FLAG_SHIFT,
 
+	WORK_OFFQ_DISABLE_SHIFT	= WORK_OFFQ_FLAG_SHIFT + WORK_OFFQ_FLAG_BITS,
+	WORK_OFFQ_DISABLE_BITS	= 16,
+
 	/*
 	 * When a work item is off queue, the high bits encode off-queue flags
 	 * and the last pool it was on. Cap pool ID to 31 bits and use the
 	 * highest number to indicate that no pool is associated.
 	 */
-	WORK_OFFQ_POOL_SHIFT	= WORK_OFFQ_FLAG_SHIFT + WORK_OFFQ_FLAG_BITS,
+	WORK_OFFQ_POOL_SHIFT	= WORK_OFFQ_DISABLE_SHIFT + WORK_OFFQ_DISABLE_BITS,
 	WORK_OFFQ_LEFT		= BITS_PER_LONG - WORK_OFFQ_POOL_SHIFT,
 	WORK_OFFQ_POOL_BITS	= WORK_OFFQ_LEFT <= 31 ? WORK_OFFQ_LEFT : 31,
 };
@@ -98,6 +101,7 @@ enum wq_misc_consts {
 /* Convenience constants - of type 'unsigned long', not 'enum'! */
 #define WORK_OFFQ_CANCELING	(1ul << WORK_OFFQ_CANCELING_BIT)
 #define WORK_OFFQ_FLAG_MASK	(((1ul << WORK_OFFQ_FLAG_BITS) - 1) << WORK_OFFQ_FLAG_SHIFT)
+#define WORK_OFFQ_DISABLE_MASK	(((1ul << WORK_OFFQ_DISABLE_BITS) - 1) << WORK_OFFQ_DISABLE_SHIFT)
 #define WORK_OFFQ_POOL_NONE	((1ul << WORK_OFFQ_POOL_BITS) - 1)
 #define WORK_STRUCT_NO_POOL	(WORK_OFFQ_POOL_NONE << WORK_OFFQ_POOL_SHIFT)
 #define WORK_STRUCT_PWQ_MASK	(~((1ul << WORK_STRUCT_PWQ_SHIFT) - 1))
@@ -556,6 +560,14 @@ extern bool flush_delayed_work(struct delayed_work *dwork);
 extern bool cancel_delayed_work(struct delayed_work *dwork);
 extern bool cancel_delayed_work_sync(struct delayed_work *dwork);
 
+extern bool disable_work(struct work_struct *work);
+extern bool disable_work_sync(struct work_struct *work);
+extern bool enable_work(struct work_struct *work);
+
+extern bool disable_delayed_work(struct delayed_work *dwork);
+extern bool disable_delayed_work_sync(struct delayed_work *dwork);
+extern bool enable_delayed_work(struct delayed_work *dwork);
+
 extern bool flush_rcu_work(struct rcu_work *rwork);
 
 extern void workqueue_set_max_active(struct workqueue_struct *wq,
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index ecd46fbed60b..a2f2847d464b 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -98,6 +98,7 @@ enum worker_flags {
 
 enum work_cancel_flags {
 	WORK_CANCEL_DELAYED	= 1 << 0,	/* canceling a delayed_work */
+	WORK_CANCEL_DISABLE	= 1 << 1,	/* canceling to disable */
 };
 
 enum wq_internal_consts {
@@ -393,6 +394,7 @@ struct wq_pod_type {
 
 struct work_offq_data {
 	u32			pool_id;
+	u32			disable;
 	u32			flags;
 };
 
@@ -907,12 +909,15 @@ static void work_offqd_unpack(struct work_offq_data *offqd, unsigned long data)
 
 	offqd->pool_id = shift_and_mask(data, WORK_OFFQ_POOL_SHIFT,
 					WORK_OFFQ_POOL_BITS);
+	offqd->disable = shift_and_mask(data, WORK_OFFQ_DISABLE_SHIFT,
+					WORK_OFFQ_DISABLE_BITS);
 	offqd->flags = data & WORK_OFFQ_FLAG_MASK;
 }
 
 static unsigned long work_offqd_pack_flags(struct work_offq_data *offqd)
 {
-	return (unsigned long)offqd->flags;
+	return ((unsigned long)offqd->disable << WORK_OFFQ_DISABLE_SHIFT) |
+		((unsigned long)offqd->flags);
 }
 
 static bool work_is_canceling(struct work_struct *work)
@@ -2405,6 +2410,21 @@ static void __queue_work(int cpu, struct workqueue_struct *wq,
 	rcu_read_unlock();
 }
 
+static bool clear_pending_if_disabled(struct work_struct *work)
+{
+	unsigned long data = *work_data_bits(work);
+	struct work_offq_data offqd;
+
+	if (likely((data & WORK_STRUCT_PWQ) ||
+		   !(data & WORK_OFFQ_DISABLE_MASK)))
+		return false;
+
+	work_offqd_unpack(&offqd, data);
+	set_work_pool_and_clear_pending(work, offqd.pool_id,
+					work_offqd_pack_flags(&offqd));
+	return true;
+}
+
 /**
  * queue_work_on - queue work on specific cpu
  * @cpu: CPU number to execute work on
@@ -2427,7 +2447,8 @@ bool queue_work_on(int cpu, struct workqueue_struct *wq,
 
 	local_irq_save(irq_flags);
 
-	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
+	    !clear_pending_if_disabled(work)) {
 		__queue_work(cpu, wq, work);
 		ret = true;
 	}
@@ -2505,7 +2526,8 @@ bool queue_work_node(int node, struct workqueue_struct *wq,
 
 	local_irq_save(irq_flags);
 
-	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
+	    !clear_pending_if_disabled(work)) {
 		int cpu = select_numa_node_cpu(node);
 
 		__queue_work(cpu, wq, work);
@@ -2587,7 +2609,8 @@ bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 	/* read the comment in __queue_work() */
 	local_irq_save(irq_flags);
 
-	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
+	    !clear_pending_if_disabled(work)) {
 		__queue_delayed_work(cpu, wq, dwork, delay);
 		ret = true;
 	}
@@ -2660,7 +2683,12 @@ bool queue_rcu_work(struct workqueue_struct *wq, struct rcu_work *rwork)
 {
 	struct work_struct *work = &rwork->work;
 
-	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+	/*
+	 * rcu_work can't be canceled or disabled. Warn if the user reached
+	 * inside @rwork and disabled the inner work.
+	 */
+	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
+	    !WARN_ON_ONCE(clear_pending_if_disabled(work))) {
 		rwork->wq = wq;
 		call_rcu_hurry(&rwork->rcu, rcu_work_rcufn);
 		return true;
@@ -4183,20 +4211,46 @@ bool flush_rcu_work(struct rcu_work *rwork)
 }
 EXPORT_SYMBOL(flush_rcu_work);
 
+static void work_offqd_disable(struct work_offq_data *offqd)
+{
+	const unsigned long max = (1lu << WORK_OFFQ_DISABLE_BITS) - 1;
+
+	if (likely(offqd->disable < max))
+		offqd->disable++;
+	else
+		WARN_ONCE(true, "workqueue: work disable count overflowed\n");
+}
+
+static void work_offqd_enable(struct work_offq_data *offqd)
+{
+	if (likely(offqd->disable > 0))
+		offqd->disable--;
+	else
+		WARN_ONCE(true, "workqueue: work disable count underflowed\n");
+}
+
 static bool __cancel_work(struct work_struct *work, u32 cflags)
 {
 	struct work_offq_data offqd;
 	unsigned long irq_flags;
 	int ret;
 
-	do {
-		ret = try_to_grab_pending(work, cflags, &irq_flags);
-	} while (unlikely(ret == -EAGAIN));
+	if (cflags & WORK_CANCEL_DISABLE) {
+		ret = work_grab_pending(work, cflags, &irq_flags);
+	} else {
+		do {
+			ret = try_to_grab_pending(work, cflags, &irq_flags);
+		} while (unlikely(ret == -EAGAIN));
 
-	if (unlikely(ret < 0))
-		return false;
+		if (unlikely(ret < 0))
+			return false;
+	}
 
 	work_offqd_unpack(&offqd, *work_data_bits(work));
+
+	if (cflags & WORK_CANCEL_DISABLE)
+		work_offqd_disable(&offqd);
+
 	set_work_pool_and_clear_pending(work, offqd.pool_id,
 					work_offqd_pack_flags(&offqd));
 	local_irq_restore(irq_flags);
@@ -4213,6 +4267,10 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
 	ret = work_grab_pending(work, cflags, &irq_flags);
 
 	work_offqd_unpack(&offqd, *work_data_bits(work));
+
+	if (cflags & WORK_CANCEL_DISABLE)
+		work_offqd_disable(&offqd);
+
 	offqd.flags |= WORK_OFFQ_CANCELING;
 	set_work_pool_and_keep_pending(work, offqd.pool_id,
 				       work_offqd_pack_flags(&offqd));
@@ -4312,6 +4370,105 @@ bool cancel_delayed_work_sync(struct delayed_work *dwork)
 }
 EXPORT_SYMBOL(cancel_delayed_work_sync);
 
+/**
+ * disable_work - Disable and cancel a work item
+ * @work: work item to disable
+ *
+ * Disable @work by incrementing its disable count and cancel it if currently
+ * pending. As long as the disable count is non-zero, any attempt to queue @work
+ * will fail and return %false. The maximum supported disable depth is 2 to the
+ * power of %WORK_OFFQ_DISABLE_BITS, currently 65536.
+ *
+ * Must be called from a sleepable context. Returns %true if @work was pending,
+ * %false otherwise.
+ */
+bool disable_work(struct work_struct *work)
+{
+	return __cancel_work(work, WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_work);
+
+/**
+ * disable_work_sync - Disable, cancel and drain a work item
+ * @work: work item to disable
+ *
+ * Similar to disable_work() but also wait for @work to finish if currently
+ * executing.
+ *
+ * Must be called from a sleepable context. Returns %true if @work was pending,
+ * %false otherwise.
+ */
+bool disable_work_sync(struct work_struct *work)
+{
+	return __cancel_work_sync(work, WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_work_sync);
+
+/**
+ * enable_work - Enable a work item
+ * @work: work item to enable
+ *
+ * Undo disable_work[_sync]() by decrementing @work's disable count. @work can
+ * only be queued if its disable count is 0.
+ *
+ * Must be called from a sleepable context. Returns %true if the disable count
+ * reached 0. Otherwise, %false.
+ */
+bool enable_work(struct work_struct *work)
+{
+	struct work_offq_data offqd;
+	unsigned long irq_flags;
+
+	work_grab_pending(work, 0, &irq_flags);
+
+	work_offqd_unpack(&offqd, *work_data_bits(work));
+	work_offqd_enable(&offqd);
+	set_work_pool_and_clear_pending(work, offqd.pool_id,
+					work_offqd_pack_flags(&offqd));
+	local_irq_restore(irq_flags);
+
+	return !offqd.disable;
+}
+EXPORT_SYMBOL_GPL(enable_work);
+
+/**
+ * disable_delayed_work - Disable and cancel a delayed work item
+ * @dwork: delayed work item to disable
+ *
+ * disable_work() for delayed work items.
+ */
+bool disable_delayed_work(struct delayed_work *dwork)
+{
+	return __cancel_work(&dwork->work,
+			     WORK_CANCEL_DELAYED | WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_delayed_work);
+
+/**
+ * disable_delayed_work_sync - Disable, cancel and drain a delayed work item
+ * @dwork: delayed work item to disable
+ *
+ * disable_work_sync() for delayed work items.
+ */
+bool disable_delayed_work_sync(struct delayed_work *dwork)
+{
+	return __cancel_work_sync(&dwork->work,
+				  WORK_CANCEL_DELAYED | WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_delayed_work_sync);
+
+/**
+ * enable_delayed_work - Enable a delayed work item
+ * @dwork: delayed work item to enable
+ *
+ * enable_work() for delayed work items.
+ */
+bool enable_delayed_work(struct delayed_work *dwork)
+{
+	return enable_work(&dwork->work);
+}
+EXPORT_SYMBOL_GPL(enable_delayed_work);
+
 /**
  * schedule_on_each_cpu - execute a function synchronously on each online CPU
  * @func: the function to call
-- 
2.43.2



* [PATCH 3/6] workqueue: Remove WORK_OFFQ_CANCELING
  2024-02-27 17:28 [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work() Tejun Heo
  2024-02-27 17:28 ` [PATCH 1/6] workqueue: Preserve OFFQ bits in cancel[_sync] paths Tejun Heo
  2024-02-27 17:28 ` [PATCH 2/6] workqueue: Implement disable/enable for (delayed) work items Tejun Heo
@ 2024-02-27 17:28 ` Tejun Heo
  2024-02-27 17:28 ` [PATCH 4/6] workqueue: Remember whether a work item was on a BH workqueue Tejun Heo
                   ` (3 subsequent siblings)
  6 siblings, 0 replies; 13+ messages in thread
From: Tejun Heo @ 2024-02-27 17:28 UTC (permalink / raw)
  To: jiangshanlai
  Cc: torvalds, linux-kernel, allen.lkml, kernel-team, boqun.feng, tglx,
	peterz, romain.perier, mingo, Tejun Heo

cancel[_delayed]_work_sync() guarantees that it can shut down
self-requeueing work items. To achieve that, it grabs and then holds the
WORK_STRUCT_PENDING bit while flushing the currently executing instance.
As the PENDING bit is set, all queueing attempts including the
self-requeueing ones fail and once the currently executing instance is
flushed, the work item should be idle as long as someone else isn't actively
queueing it.

This means that the cancel_work_sync path may hold the PENDING bit set while
flushing the target work item. This isn't a problem for the queueing path -
it can just fail which is the desired effect. It doesn't affect flush. It
doesn't matter to cancel_work either as it can just report that the work
item has been successfully canceled. However, if there's another
cancel_work_sync attempt on the work item, it can't simply fail or report
success as that would breach the guarantee it must provide. cancel_work_sync
has to wait for and grab that PENDING bit and go through the motions.

WORK_OFFQ_CANCELING and wq_cancel_waitq implement this
cancel_work_sync-to-cancel_work_sync wait mechanism. When a work item is
being canceled, WORK_OFFQ_CANCELING is also set on it and other
cancel_work_sync attempts wait on the bit to be cleared using the wait
queue.

While this works, it's an isolated wart which doesn't jibe with the rest of
the flush and cancel mechanisms and forces enable_work() and disable_work() to
require a sleepable context, which hampers their usability.

Now that a work item can be disabled, we can use that to block queueing
while cancel_work_sync is in progress. Instead of holding the PENDING bit,
it can temporarily disable the work item, flush and then re-enable it as
that'd achieve the same end result of blocking queueings while canceling and
thus enable canceling of self-requeueing work items.

- WORK_OFFQ_CANCELING and the surrounding mechanisms are removed.

- work_grab_pending() is now simpler, no longer has to wait for a blocking
  operation and thus can be called from any context.

- With work_grab_pending() simplified, no need to use try_to_grab_pending()
  directly. All users are converted to use work_grab_pending().

- __cancel_work_sync() is updated to __cancel_work() with
  WORK_CANCEL_DISABLE to cancel and plug racing queueing attempts. It then
  flushes and re-enables the work item if necessary.

- These changes allow disable_work() and enable_work() to be called from any
  context.
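The disable-based cancel_sync flow described above can be sketched as a
standalone userspace model (illustrative only; the cw_* names are made up
and the real code manipulates work->data under the PENDING bit):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Minimal model of a work item for sketching the cancel_sync flow. */
struct cw_model {
	uint16_t disable;	/* disable depth */
	bool pending;		/* queued */
	bool running;		/* currently executing */
};

/*
 * Model of the new __cancel_work_sync() shape: disable to plug racing
 * (including self-requeueing) queue attempts, flush the in-flight
 * instance, then re-enable.
 */
static bool cw_cancel_work_sync(struct cw_model *w)
{
	bool was_pending = w->pending;

	/* grab PENDING and disable (WORK_CANCEL_DISABLE) */
	w->pending = false;
	w->disable++;

	/* __flush_work(): wait for the executing instance to finish */
	w->running = false;

	/* re-enable; queueings attempted in the window stay dropped */
	w->disable--;
	return was_pending;
}
```

Because the window is guarded by the ordinary disable count rather than a
dedicated CANCELING bit, concurrent cancel_sync callers just stack their
disable depths instead of needing a separate wait queue.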

v2: Lai pointed out that mod_delayed_work_on() needs to check the disable
    count before queueing the delayed work item. Added
    clear_pending_if_disabled() call.

Signed-off-by: Tejun Heo <tj@kernel.org>
Cc: Lai Jiangshan <jiangshanlai@gmail.com>
---
 include/linux/workqueue.h |   4 +-
 kernel/workqueue.c        | 140 ++++++--------------------------------
 2 files changed, 20 insertions(+), 124 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index f25915e47efb..86483743ad28 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -52,10 +52,9 @@ enum work_bits {
 	 *
 	 * MSB
 	 * [ pool ID ] [ disable depth ] [ OFFQ flags ] [ STRUCT flags ]
-	 *                  16 bits          1 bit        4 or 5 bits
+	 *                  16 bits          0 bits       4 or 5 bits
 	 */
 	WORK_OFFQ_FLAG_SHIFT	= WORK_STRUCT_FLAG_BITS,
-	WORK_OFFQ_CANCELING_BIT = WORK_OFFQ_FLAG_SHIFT,
 	WORK_OFFQ_FLAG_END,
 	WORK_OFFQ_FLAG_BITS	= WORK_OFFQ_FLAG_END - WORK_OFFQ_FLAG_SHIFT,
 
@@ -99,7 +98,6 @@ enum wq_misc_consts {
 };
 
 /* Convenience constants - of type 'unsigned long', not 'enum'! */
-#define WORK_OFFQ_CANCELING	(1ul << WORK_OFFQ_CANCELING_BIT)
 #define WORK_OFFQ_FLAG_MASK	(((1ul << WORK_OFFQ_FLAG_BITS) - 1) << WORK_OFFQ_FLAG_SHIFT)
 #define WORK_OFFQ_DISABLE_MASK	(((1ul << WORK_OFFQ_DISABLE_BITS) - 1) << WORK_OFFQ_DISABLE_SHIFT)
 #define WORK_OFFQ_POOL_NONE	((1ul << WORK_OFFQ_POOL_BITS) - 1)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index a2f2847d464b..07e77130227c 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -495,12 +495,6 @@ static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
 /* I: attributes used when instantiating ordered pools on demand */
 static struct workqueue_attrs *ordered_wq_attrs[NR_STD_WORKER_POOLS];
 
-/*
- * Used to synchronize multiple cancel_sync attempts on the same work item. See
- * work_grab_pending() and __cancel_work_sync().
- */
-static DECLARE_WAIT_QUEUE_HEAD(wq_cancel_waitq);
-
 /*
  * I: kthread_worker to release pwq's. pwq release needs to be bounced to a
  * process context while holding a pool lock. Bounce to a dedicated kthread
@@ -782,11 +776,6 @@ static int work_next_color(int color)
  * corresponding to a work.  Pool is available once the work has been
  * queued anywhere after initialization until it is sync canceled.  pwq is
  * available only while the work item is queued.
- *
- * %WORK_OFFQ_CANCELING is used to mark a work item which is being
- * canceled.  While being canceled, a work item may have its PENDING set
- * but stay off timer and worklist for arbitrarily long and nobody should
- * try to steal the PENDING bit.
  */
 static inline void set_work_data(struct work_struct *work, unsigned long data)
 {
@@ -920,13 +909,6 @@ static unsigned long work_offqd_pack_flags(struct work_offq_data *offqd)
 		((unsigned long)offqd->flags);
 }
 
-static bool work_is_canceling(struct work_struct *work)
-{
-	unsigned long data = atomic_long_read(&work->data);
-
-	return !(data & WORK_STRUCT_PWQ) && (data & WORK_OFFQ_CANCELING);
-}
-
 /*
  * Policy functions.  These define the policies on how the global worker
  * pools are managed.  Unless noted otherwise, these functions assume that
@@ -2055,8 +2037,6 @@ static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, unsigned long work_
  *  1		if @work was pending and we successfully stole PENDING
  *  0		if @work was idle and we claimed PENDING
  *  -EAGAIN	if PENDING couldn't be grabbed at the moment, safe to busy-retry
- *  -ENOENT	if someone else is canceling @work, this state may persist
- *		for arbitrarily long
  *  ========	================================================================
  *
  * Note:
@@ -2152,26 +2132,9 @@ static int try_to_grab_pending(struct work_struct *work, u32 cflags,
 fail:
 	rcu_read_unlock();
 	local_irq_restore(*irq_flags);
-	if (work_is_canceling(work))
-		return -ENOENT;
-	cpu_relax();
 	return -EAGAIN;
 }
 
-struct cwt_wait {
-	wait_queue_entry_t	wait;
-	struct work_struct	*work;
-};
-
-static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
-{
-	struct cwt_wait *cwait = container_of(wait, struct cwt_wait, wait);
-
-	if (cwait->work != key)
-		return 0;
-	return autoremove_wake_function(wait, mode, sync, key);
-}
-
 /**
  * work_grab_pending - steal work item from worklist and disable irq
  * @work: work item to steal
@@ -2181,7 +2144,7 @@ static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *k
  * Grab PENDING bit of @work. @work can be in any stable state - idle, on timer
  * or on worklist.
  *
- * Must be called in process context. IRQ is disabled on return with IRQ state
+ * Can be called from any context. IRQ is disabled on return with IRQ state
  * stored in *@irq_flags. The caller is responsible for re-enabling it using
  * local_irq_restore().
  *
@@ -2190,41 +2153,14 @@ static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *k
 static bool work_grab_pending(struct work_struct *work, u32 cflags,
 			      unsigned long *irq_flags)
 {
-	struct cwt_wait cwait;
 	int ret;
 
-	might_sleep();
-repeat:
-	ret = try_to_grab_pending(work, cflags, irq_flags);
-	if (likely(ret >= 0))
-		return ret;
-	if (ret != -ENOENT)
-		goto repeat;
-
-	/*
-	 * Someone is already canceling. Wait for it to finish. flush_work()
-	 * doesn't work for PREEMPT_NONE because we may get woken up between
-	 * @work's completion and the other canceling task resuming and clearing
-	 * CANCELING - flush_work() will return false immediately as @work is no
-	 * longer busy, try_to_grab_pending() will return -ENOENT as @work is
-	 * still being canceled and the other canceling task won't be able to
-	 * clear CANCELING as we're hogging the CPU.
-	 *
-	 * Let's wait for completion using a waitqueue. As this may lead to the
-	 * thundering herd problem, use a custom wake function which matches
-	 * @work along with exclusive wait and wakeup.
-	 */
-	init_wait(&cwait.wait);
-	cwait.wait.func = cwt_wakefn;
-	cwait.work = work;
-
-	prepare_to_wait_exclusive(&wq_cancel_waitq, &cwait.wait,
-				  TASK_UNINTERRUPTIBLE);
-	if (work_is_canceling(work))
-		schedule();
-	finish_wait(&wq_cancel_waitq, &cwait.wait);
-
-	goto repeat;
+	while (true) {
+		ret = try_to_grab_pending(work, cflags, irq_flags);
+		if (ret >= 0)
+			return ret;
+		cpu_relax();
+	}
 }
 
 /**
@@ -2642,19 +2578,14 @@ bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
 			 struct delayed_work *dwork, unsigned long delay)
 {
 	unsigned long irq_flags;
-	int ret;
+	bool ret;
 
-	do {
-		ret = try_to_grab_pending(&dwork->work, WORK_CANCEL_DELAYED,
-					  &irq_flags);
-	} while (unlikely(ret == -EAGAIN));
+	ret = work_grab_pending(&dwork->work, WORK_CANCEL_DELAYED, &irq_flags);
 
-	if (likely(ret >= 0)) {
+	if (!clear_pending_if_disabled(&dwork->work))
 		__queue_delayed_work(cpu, wq, dwork, delay);
-		local_irq_restore(irq_flags);
-	}
 
-	/* -ENOENT from try_to_grab_pending() becomes %true */
+	local_irq_restore(irq_flags);
 	return ret;
 }
 EXPORT_SYMBOL_GPL(mod_delayed_work_on);
@@ -4235,16 +4166,7 @@ static bool __cancel_work(struct work_struct *work, u32 cflags)
 	unsigned long irq_flags;
 	int ret;
 
-	if (cflags & WORK_CANCEL_DISABLE) {
-		ret = work_grab_pending(work, cflags, &irq_flags);
-	} else {
-		do {
-			ret = try_to_grab_pending(work, cflags, &irq_flags);
-		} while (unlikely(ret == -EAGAIN));
-
-		if (unlikely(ret < 0))
-			return false;
-	}
+	ret = work_grab_pending(work, cflags, &irq_flags);
 
 	work_offqd_unpack(&offqd, *work_data_bits(work));
 
@@ -4259,22 +4181,9 @@ static bool __cancel_work(struct work_struct *work, u32 cflags)
 
 static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
 {
-	struct work_offq_data offqd;
-	unsigned long irq_flags;
 	bool ret;
 
-	/* claim @work and tell other tasks trying to grab @work to back off */
-	ret = work_grab_pending(work, cflags, &irq_flags);
-
-	work_offqd_unpack(&offqd, *work_data_bits(work));
-
-	if (cflags & WORK_CANCEL_DISABLE)
-		work_offqd_disable(&offqd);
-
-	offqd.flags |= WORK_OFFQ_CANCELING;
-	set_work_pool_and_keep_pending(work, offqd.pool_id,
-				       work_offqd_pack_flags(&offqd));
-	local_irq_restore(irq_flags);
+	ret = __cancel_work(work, cflags | WORK_CANCEL_DISABLE);
 
 	/*
 	 * Skip __flush_work() during early boot when we know that @work isn't
@@ -4283,19 +4192,8 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
 	if (wq_online)
 		__flush_work(work, true);
 
-	work_offqd_unpack(&offqd, *work_data_bits(work));
-
-	/*
-	 * smp_mb() at the end of set_work_pool_and_clear_pending() is paired
-	 * with prepare_to_wait() above so that either waitqueue_active() is
-	 * visible here or !work_is_canceling() is visible there.
-	 */
-	offqd.flags &= ~WORK_OFFQ_CANCELING;
-	set_work_pool_and_clear_pending(work, WORK_OFFQ_POOL_NONE,
-					work_offqd_pack_flags(&offqd));
-
-	if (waitqueue_active(&wq_cancel_waitq))
-		__wake_up(&wq_cancel_waitq, TASK_NORMAL, 1, work);
+	if (!(cflags & WORK_CANCEL_DISABLE))
+		enable_work(work);
 
 	return ret;
 }
@@ -4379,8 +4277,8 @@ EXPORT_SYMBOL(cancel_delayed_work_sync);
  * will fail and return %false. The maximum supported disable depth is 2 to the
  * power of %WORK_OFFQ_DISABLE_BITS, currently 65536.
  *
- * Must be called from a sleepable context. Returns %true if @work was pending,
- * %false otherwise.
+ * Can be called from any context. Returns %true if @work was pending, %false
+ * otherwise.
  */
 bool disable_work(struct work_struct *work)
 {
@@ -4411,8 +4309,8 @@ EXPORT_SYMBOL_GPL(disable_work_sync);
  * Undo disable_work[_sync]() by decrementing @work's disable count. @work can
  * only be queued if its disable count is 0.
  *
- * Must be called from a sleepable context. Returns %true if the disable count
- * reached 0. Otherwise, %false.
+ * Can be called from any context. Returns %true if the disable count reached 0.
+ * Otherwise, %false.
  */
 bool enable_work(struct work_struct *work)
 {
-- 
2.43.2


^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCH 4/6] workqueue: Remember whether a work item was on a BH workqueue
  2024-02-27 17:28 [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work() Tejun Heo
                   ` (2 preceding siblings ...)
  2024-02-27 17:28 ` [PATCH 3/6] workqueue: Remove WORK_OFFQ_CANCELING Tejun Heo
@ 2024-02-27 17:28 ` Tejun Heo
  2024-02-27 17:28 ` [PATCH 5/6] workqueue: Allow cancel_work_sync() and disable_work() from atomic contexts on BH work items Tejun Heo
                   ` (2 subsequent siblings)
  6 siblings, 0 replies; 13+ messages in thread
From: Tejun Heo @ 2024-02-27 17:28 UTC (permalink / raw)
  To: jiangshanlai
  Cc: torvalds, linux-kernel, allen.lkml, kernel-team, boqun.feng, tglx,
	peterz, romain.perier, mingo, Tejun Heo

Add an off-queue flag, WORK_OFFQ_BH, that indicates whether the last
workqueue the work item was on was a BH one. This will be used to test
whether a work item is BH in cancel_sync path to implement atomic
cancel_sync'ing for BH work items.

Signed-off-by: Tejun Heo <tj@kernel.org>
---
 include/linux/workqueue.h |  4 +++-
 kernel/workqueue.c        | 10 ++++++++--
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 86483743ad28..7710cd52f7f0 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -52,9 +52,10 @@ enum work_bits {
 	 *
 	 * MSB
 	 * [ pool ID ] [ disable depth ] [ OFFQ flags ] [ STRUCT flags ]
-	 *                  16 bits          0 bits       4 or 5 bits
+	 *                  16 bits          1 bit        4 or 5 bits
 	 */
 	WORK_OFFQ_FLAG_SHIFT	= WORK_STRUCT_FLAG_BITS,
+	WORK_OFFQ_BH_BIT	= WORK_OFFQ_FLAG_SHIFT,
 	WORK_OFFQ_FLAG_END,
 	WORK_OFFQ_FLAG_BITS	= WORK_OFFQ_FLAG_END - WORK_OFFQ_FLAG_SHIFT,
 
@@ -98,6 +99,7 @@ enum wq_misc_consts {
 };
 
 /* Convenience constants - of type 'unsigned long', not 'enum'! */
+#define WORK_OFFQ_BH		(1ul << WORK_OFFQ_BH_BIT)
 #define WORK_OFFQ_FLAG_MASK	(((1ul << WORK_OFFQ_FLAG_BITS) - 1) << WORK_OFFQ_FLAG_SHIFT)
 #define WORK_OFFQ_DISABLE_MASK	(((1ul << WORK_OFFQ_DISABLE_BITS) - 1) << WORK_OFFQ_DISABLE_SHIFT)
 #define WORK_OFFQ_POOL_NONE	((1ul << WORK_OFFQ_POOL_BITS) - 1)
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 07e77130227c..5c71fbd9d854 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -763,6 +763,11 @@ static int work_next_color(int color)
 	return (color + 1) % WORK_NR_COLORS;
 }
 
+static unsigned long pool_offq_flags(struct worker_pool *pool)
+{
+	return (pool->flags & POOL_BH) ? WORK_OFFQ_BH : 0;
+}
+
 /*
  * While queued, %WORK_STRUCT_PWQ is set and non flag bits of a work's data
  * contain the pointer to the queued pwq.  Once execution starts, the flag
@@ -2119,7 +2124,8 @@ static int try_to_grab_pending(struct work_struct *work, u32 cflags,
 		 * this destroys work->data needed by the next step, stash it.
 		 */
 		work_data = *work_data_bits(work);
-		set_work_pool_and_keep_pending(work, pool->id, 0);
+		set_work_pool_and_keep_pending(work, pool->id,
+					       pool_offq_flags(pool));
 
 		/* must be the last step, see the function comment */
 		pwq_dec_nr_in_flight(pwq, work_data);
@@ -3171,7 +3177,7 @@ __acquires(&pool->lock)
 	 * PENDING and queued state changes happen together while IRQ is
 	 * disabled.
 	 */
-	set_work_pool_and_clear_pending(work, pool->id, 0);
+	set_work_pool_and_clear_pending(work, pool->id, pool_offq_flags(pool));
 
 	pwq->stats[PWQ_STAT_STARTED]++;
 	raw_spin_unlock_irq(&pool->lock);
-- 
2.43.2



* [PATCH 5/6] workqueue: Allow cancel_work_sync() and disable_work() from atomic contexts on BH work items
  2024-02-27 17:28 [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work() Tejun Heo
                   ` (3 preceding siblings ...)
  2024-02-27 17:28 ` [PATCH 4/6] workqueue: Remember whether a work item was on a BH workqueue Tejun Heo
@ 2024-02-27 17:28 ` Tejun Heo
  2024-02-27 17:28 ` [PATCH 6/6] r8152: Convert from tasklet to BH workqueue Tejun Heo
  2024-02-28 10:00 ` [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work() Lai Jiangshan
  6 siblings, 0 replies; 13+ messages in thread
From: Tejun Heo @ 2024-02-27 17:28 UTC (permalink / raw)
  To: jiangshanlai
  Cc: torvalds, linux-kernel, allen.lkml, kernel-team, boqun.feng, tglx,
	peterz, romain.perier, mingo, Tejun Heo

Now that work_grab_pending() can always grab the PENDING bit without
sleeping, the only thing that prevents allowing cancel_work_sync() of a BH
work item from an atomic context is the flushing of the in-flight instance.

When we're flushing a BH work item for cancel_work_sync(), we know that the
work item is not queued and must be executing in a BH context, which means
that it's safe to busy-wait for its completion from a non-hardirq atomic
context.

This patch updates __flush_work() so that it busy-waits when flushing a BH
work item for cancel_work_sync(). might_sleep() is pushed from
start_flush_work() to its callers - when operating on a BH work item,
__cancel_work_sync() now enforces !in_hardirq() instead of might_sleep().

This allows cancel_work_sync() and disable_work() to be called from
non-hardirq atomic contexts on BH work items.

v3: In __flush_work(), test WORK_OFFQ_BH to tell whether a work item being
    canceled can be busy waited instead of making start_flush_work() return
    the pool. (Lai)

v2: Lai pointed out that __flush_work() was accessing pool->flags outside
    the RCU critical section protecting the pool pointer. Fix it by testing
    and remembering the result inside the RCU critical section.

Signed-off-by: Tejun Heo <tj@kernel.org>
Cc: Lai Jiangshan <jiangshanlai@gmail.com>
---
 kernel/workqueue.c | 74 ++++++++++++++++++++++++++++++++++------------
 1 file changed, 55 insertions(+), 19 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 5c71fbd9d854..7d8eaca294c9 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -4020,8 +4020,6 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
 	struct pool_workqueue *pwq;
 	struct workqueue_struct *wq;
 
-	might_sleep();
-
 	rcu_read_lock();
 	pool = get_work_pool(work);
 	if (!pool) {
@@ -4073,6 +4071,7 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
 static bool __flush_work(struct work_struct *work, bool from_cancel)
 {
 	struct wq_barrier barr;
+	unsigned long data;
 
 	if (WARN_ON(!wq_online))
 		return false;
@@ -4080,13 +4079,41 @@ static bool __flush_work(struct work_struct *work, bool from_cancel)
 	if (WARN_ON(!work->func))
 		return false;
 
-	if (start_flush_work(work, &barr, from_cancel)) {
-		wait_for_completion(&barr.done);
-		destroy_work_on_stack(&barr.work);
-		return true;
-	} else {
+	if (!start_flush_work(work, &barr, from_cancel))
 		return false;
+
+	/*
+	 * start_flush_work() returned %true. If @from_cancel is set, we know
+	 * that @work must have been executing during start_flush_work() and
+	 * can't currently be queued. Its data must contain OFFQ bits. If @work
+	 * was queued on a BH workqueue, we also know that it was running in the
+	 * BH context and thus can be busy-waited.
+	 */
+	data = *work_data_bits(work);
+	if (from_cancel &&
+	    !WARN_ON_ONCE(data & WORK_STRUCT_PWQ) && (data & WORK_OFFQ_BH)) {
+		/*
+		 * On RT, prevent a live lock when %current preempted soft
+		 * interrupt processing or prevents ksoftirqd from running by
+		 * keeping flipping BH. If the BH work item runs on a different
+		 * CPU then this has no effect other than doing the BH
+		 * disable/enable dance for nothing. This is copied from
+		 * kernel/softirq.c::tasklet_unlock_spin_wait().
+		 */
+		while (!try_wait_for_completion(&barr.done)) {
+			if (IS_ENABLED(CONFIG_PREEMPT_RT)) {
+				local_bh_disable();
+				local_bh_enable();
+			} else {
+				cpu_relax();
+			}
+		}
+	} else {
+		wait_for_completion(&barr.done);
 	}
+
+	destroy_work_on_stack(&barr.work);
+	return true;
 }
 
 /**
@@ -4102,6 +4129,7 @@ static bool __flush_work(struct work_struct *work, bool from_cancel)
  */
 bool flush_work(struct work_struct *work)
 {
+	might_sleep();
 	return __flush_work(work, false);
 }
 EXPORT_SYMBOL_GPL(flush_work);
@@ -4191,6 +4219,11 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
 
 	ret = __cancel_work(work, cflags | WORK_CANCEL_DISABLE);
 
+	if (*work_data_bits(work) & WORK_OFFQ_BH)
+		WARN_ON_ONCE(in_hardirq());
+	else
+		might_sleep();
+
 	/*
 	 * Skip __flush_work() during early boot when we know that @work isn't
 	 * executing. This allows canceling during early boot.
@@ -4217,19 +4250,19 @@ EXPORT_SYMBOL(cancel_work);
  * cancel_work_sync - cancel a work and wait for it to finish
  * @work: the work to cancel
  *
- * Cancel @work and wait for its execution to finish.  This function
- * can be used even if the work re-queues itself or migrates to
- * another workqueue.  On return from this function, @work is
- * guaranteed to be not pending or executing on any CPU.
+ * Cancel @work and wait for its execution to finish. This function can be used
+ * even if the work re-queues itself or migrates to another workqueue. On return
+ * from this function, @work is guaranteed to be not pending or executing on any
+ * CPU as long as there aren't racing enqueues.
  *
- * cancel_work_sync(&delayed_work->work) must not be used for
- * delayed_work's.  Use cancel_delayed_work_sync() instead.
+ * cancel_work_sync(&delayed_work->work) must not be used for delayed_work's.
+ * Use cancel_delayed_work_sync() instead.
  *
- * The caller must ensure that the workqueue on which @work was last
- * queued can't be destroyed before this function returns.
+ * Must be called from a sleepable context if @work was last queued on a non-BH
+ * workqueue. Can also be called from non-hardirq atomic contexts including BH
+ * if @work was last queued on a BH workqueue.
  *
- * Return:
- * %true if @work was pending, %false otherwise.
+ * Returns %true if @work was pending, %false otherwise.
  */
 bool cancel_work_sync(struct work_struct *work)
 {
@@ -4299,8 +4332,11 @@ EXPORT_SYMBOL_GPL(disable_work);
  * Similar to disable_work() but also wait for @work to finish if currently
  * executing.
  *
- * Must be called from a sleepable context. Returns %true if @work was pending,
- * %false otherwise.
+ * Must be called from a sleepable context if @work was last queued on a non-BH
+ * workqueue. Can also be called from non-hardirq atomic contexts including BH
+ * if @work was last queued on a BH workqueue.
+ *
+ * Returns %true if @work was pending, %false otherwise.
  */
 bool disable_work_sync(struct work_struct *work)
 {
-- 
2.43.2



* [PATCH 6/6] r8152: Convert from tasklet to BH workqueue
  2024-02-27 17:28 [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work() Tejun Heo
                   ` (4 preceding siblings ...)
  2024-02-27 17:28 ` [PATCH 5/6] workqueue: Allow cancel_work_sync() and disable_work() from atomic contexts on BH work items Tejun Heo
@ 2024-02-27 17:28 ` Tejun Heo
  2024-02-28 17:33   ` Allen
  2024-02-28 10:00 ` [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work() Lai Jiangshan
  6 siblings, 1 reply; 13+ messages in thread
From: Tejun Heo @ 2024-02-27 17:28 UTC (permalink / raw)
  To: jiangshanlai
  Cc: torvalds, linux-kernel, allen.lkml, kernel-team, boqun.feng, tglx,
	peterz, romain.perier, mingo, Tejun Heo

tasklet is being replaced by BH workqueue. No noticeable behavior or
performance changes are expected. The following is how the two APIs map:

- tasklet_setup/init()		-> INIT_WORK()
- tasklet_schedule()		-> queue_work(system_bh_wq, ...)
- tasklet_hi_schedule()		-> queue_work(system_bh_highpri_wq, ...)
- tasklet_disable_nosync()	-> disable_work()
- tasklet_disable[_in_atomic]()	-> disable_work_sync()
- tasklet_enable()		-> enable_work() + queue_work()
- tasklet_kill()		-> cancel_work_sync()

Note that unlike tasklet_enable(), enable_work() doesn't queue the work item
automatically according to whether the work item was queued while disabled.
While the caller can track this separately, unconditionally scheduling the
work item after enable_work() returns %true should work for most users.

r8152 conversion has been tested by repeatedly forcing the device to go
through resets using usbreset under iperf3 generated traffic.

Signed-off-by: Tejun Heo <tj@kernel.org>
---
 drivers/net/usb/r8152.c | 44 ++++++++++++++++++++++-------------------
 1 file changed, 24 insertions(+), 20 deletions(-)

diff --git a/drivers/net/usb/r8152.c b/drivers/net/usb/r8152.c
index 9bf2140fd0a1..24e284b9eb38 100644
--- a/drivers/net/usb/r8152.c
+++ b/drivers/net/usb/r8152.c
@@ -882,7 +882,7 @@ struct r8152 {
 #ifdef CONFIG_PM_SLEEP
 	struct notifier_block pm_notifier;
 #endif
-	struct tasklet_struct tx_tl;
+	struct work_struct tx_work;
 
 	struct rtl_ops {
 		void (*init)(struct r8152 *tp);
@@ -1948,7 +1948,7 @@ static void write_bulk_callback(struct urb *urb)
 		return;
 
 	if (!skb_queue_empty(&tp->tx_queue))
-		tasklet_schedule(&tp->tx_tl);
+		queue_work(system_bh_wq, &tp->tx_work);
 }
 
 static void intr_callback(struct urb *urb)
@@ -2746,9 +2746,9 @@ static void tx_bottom(struct r8152 *tp)
 	} while (res == 0);
 }
 
-static void bottom_half(struct tasklet_struct *t)
+static void bottom_half(struct work_struct *work)
 {
-	struct r8152 *tp = from_tasklet(tp, t, tx_tl);
+	struct r8152 *tp = container_of(work, struct r8152, tx_work);
 
 	if (test_bit(RTL8152_INACCESSIBLE, &tp->flags))
 		return;
@@ -2942,7 +2942,7 @@ static netdev_tx_t rtl8152_start_xmit(struct sk_buff *skb,
 			schedule_delayed_work(&tp->schedule, 0);
 		} else {
 			usb_mark_last_busy(tp->udev);
-			tasklet_schedule(&tp->tx_tl);
+			queue_work(system_bh_wq, &tp->tx_work);
 		}
 	} else if (skb_queue_len(&tp->tx_queue) > tp->tx_qlen) {
 		netif_stop_queue(netdev);
@@ -6824,11 +6824,12 @@ static void set_carrier(struct r8152 *tp)
 	} else {
 		if (netif_carrier_ok(netdev)) {
 			netif_carrier_off(netdev);
-			tasklet_disable(&tp->tx_tl);
+			disable_work_sync(&tp->tx_work);
 			napi_disable(napi);
 			tp->rtl_ops.disable(tp);
 			napi_enable(napi);
-			tasklet_enable(&tp->tx_tl);
+			enable_work(&tp->tx_work);
+			queue_work(system_bh_wq, &tp->tx_work);
 			netif_info(tp, link, netdev, "carrier off\n");
 		}
 	}
@@ -6864,7 +6865,7 @@ static void rtl_work_func_t(struct work_struct *work)
 	/* don't schedule tasket before linking */
 	if (test_and_clear_bit(SCHEDULE_TASKLET, &tp->flags) &&
 	    netif_carrier_ok(tp->netdev))
-		tasklet_schedule(&tp->tx_tl);
+		queue_work(system_bh_wq, &tp->tx_work);
 
 	if (test_and_clear_bit(RX_EPROTO, &tp->flags) &&
 	    !list_empty(&tp->rx_done))
@@ -6971,7 +6972,7 @@ static int rtl8152_open(struct net_device *netdev)
 		goto out_unlock;
 	}
 	napi_enable(&tp->napi);
-	tasklet_enable(&tp->tx_tl);
+	enable_work(&tp->tx_work);
 
 	mutex_unlock(&tp->control);
 
@@ -6999,7 +7000,7 @@ static int rtl8152_close(struct net_device *netdev)
 #ifdef CONFIG_PM_SLEEP
 	unregister_pm_notifier(&tp->pm_notifier);
 #endif
-	tasklet_disable(&tp->tx_tl);
+	disable_work_sync(&tp->tx_work);
 	clear_bit(WORK_ENABLE, &tp->flags);
 	usb_kill_urb(tp->intr_urb);
 	cancel_delayed_work_sync(&tp->schedule);
@@ -8421,7 +8422,7 @@ static int rtl8152_pre_reset(struct usb_interface *intf)
 		return 0;
 
 	netif_stop_queue(netdev);
-	tasklet_disable(&tp->tx_tl);
+	disable_work_sync(&tp->tx_work);
 	clear_bit(WORK_ENABLE, &tp->flags);
 	usb_kill_urb(tp->intr_urb);
 	cancel_delayed_work_sync(&tp->schedule);
@@ -8466,7 +8467,8 @@ static int rtl8152_post_reset(struct usb_interface *intf)
 	}
 
 	napi_enable(&tp->napi);
-	tasklet_enable(&tp->tx_tl);
+	enable_work(&tp->tx_work);
+	queue_work(system_bh_wq, &tp->tx_work);
 	netif_wake_queue(netdev);
 	usb_submit_urb(tp->intr_urb, GFP_KERNEL);
 
@@ -8625,12 +8627,13 @@ static int rtl8152_system_suspend(struct r8152 *tp)
 
 		clear_bit(WORK_ENABLE, &tp->flags);
 		usb_kill_urb(tp->intr_urb);
-		tasklet_disable(&tp->tx_tl);
+		disable_work_sync(&tp->tx_work);
 		napi_disable(napi);
 		cancel_delayed_work_sync(&tp->schedule);
 		tp->rtl_ops.down(tp);
 		napi_enable(napi);
-		tasklet_enable(&tp->tx_tl);
+		enable_work(&tp->tx_work);
+		queue_work(system_bh_wq, &tp->tx_work);
 	}
 
 	return 0;
@@ -9387,11 +9390,12 @@ static int rtl8152_change_mtu(struct net_device *dev, int new_mtu)
 		if (netif_carrier_ok(dev)) {
 			netif_stop_queue(dev);
 			napi_disable(&tp->napi);
-			tasklet_disable(&tp->tx_tl);
+			disable_work_sync(&tp->tx_work);
 			tp->rtl_ops.disable(tp);
 			tp->rtl_ops.enable(tp);
 			rtl_start_rx(tp);
-			tasklet_enable(&tp->tx_tl);
+			enable_work(&tp->tx_work);
+			queue_work(system_bh_wq, &tp->tx_work);
 			napi_enable(&tp->napi);
 			rtl8152_set_rx_mode(dev);
 			netif_wake_queue(dev);
@@ -9819,8 +9823,8 @@ static int rtl8152_probe_once(struct usb_interface *intf,
 	mutex_init(&tp->control);
 	INIT_DELAYED_WORK(&tp->schedule, rtl_work_func_t);
 	INIT_DELAYED_WORK(&tp->hw_phy_work, rtl_hw_phy_work_func_t);
-	tasklet_setup(&tp->tx_tl, bottom_half);
-	tasklet_disable(&tp->tx_tl);
+	INIT_WORK(&tp->tx_work, bottom_half);
+	disable_work(&tp->tx_work);
 
 	netdev->netdev_ops = &rtl8152_netdev_ops;
 	netdev->watchdog_timeo = RTL8152_TX_TIMEOUT;
@@ -9954,7 +9958,7 @@ static int rtl8152_probe_once(struct usb_interface *intf,
 	unregister_netdev(netdev);
 
 out1:
-	tasklet_kill(&tp->tx_tl);
+	cancel_work_sync(&tp->tx_work);
 	cancel_delayed_work_sync(&tp->hw_phy_work);
 	if (tp->rtl_ops.unload)
 		tp->rtl_ops.unload(tp);
@@ -10010,7 +10014,7 @@ static void rtl8152_disconnect(struct usb_interface *intf)
 		rtl_set_unplug(tp);
 
 		unregister_netdev(tp->netdev);
-		tasklet_kill(&tp->tx_tl);
+		cancel_work_sync(&tp->tx_work);
 		cancel_delayed_work_sync(&tp->hw_phy_work);
 		if (tp->rtl_ops.unload)
 			tp->rtl_ops.unload(tp);
-- 
2.43.2



* Re: [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work()
  2024-02-27 17:28 [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work() Tejun Heo
                   ` (5 preceding siblings ...)
  2024-02-27 17:28 ` [PATCH 6/6] r8152: Convert from tasklet to BH workqueue Tejun Heo
@ 2024-02-28 10:00 ` Lai Jiangshan
  2024-03-25 17:31   ` Tejun Heo
  6 siblings, 1 reply; 13+ messages in thread
From: Lai Jiangshan @ 2024-02-28 10:00 UTC (permalink / raw)
  To: Tejun Heo
  Cc: torvalds, linux-kernel, allen.lkml, kernel-team, boqun.feng, tglx,
	peterz, romain.perier, mingo

Hello

For patch 1-5:

Reviewed-by: Lai Jiangshan <jiangshanlai@gmail.com>

thanks
Lai

On Wed, Feb 28, 2024 at 1:28 AM Tejun Heo <tj@kernel.org> wrote:

>
>  git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq.git disable_work-v3
>
> diffstat follows. Thanks.
>
>  drivers/net/usb/r8152.c   |   44 ++++----
>  include/linux/workqueue.h |   23 +++-
>  kernel/workqueue.c        |  384 ++++++++++++++++++++++++++++++++++++++++++++++---------------------------
>


* Re: [PATCH 6/6] r8152: Convert from tasklet to BH workqueue
  2024-02-27 17:28 ` [PATCH 6/6] r8152: Convert from tasklet to BH workqueue Tejun Heo
@ 2024-02-28 17:33   ` Allen
  2024-02-28 17:49     ` Tejun Heo
  0 siblings, 1 reply; 13+ messages in thread
From: Allen @ 2024-02-28 17:33 UTC (permalink / raw)
  To: Tejun Heo
  Cc: jiangshanlai, torvalds, linux-kernel, kernel-team, boqun.feng,
	tglx, peterz, romain.perier, mingo

> tasklet is being replaced by BH workqueue. No noticeable behavior or
> performance changes are expected. The following is how the two APIs map:
>
> - tasklet_setup/init()          -> INIT_WORK()
> - tasklet_schedule()            -> queue_work(system_bh_wq, ...)
> - tasklet_hi_schedule()         -> queue_work(system_bh_highpri_wq, ...)
> - tasklet_disable_nosync()      -> disable_work()
> - tasklet_disable[_in_atomic]() -> disable_work_sync()
> - tasklet_enable()              -> enable_work() + queue_work()
> - tasklet_kill()                -> cancel_work_sync()
>
> Note that unlike tasklet_enable(), enable_work() doesn't queue the work item
> automatically according to whether the work item was queued while disabled.
> While the caller can track this separately, unconditionally scheduling the
> work item after enable_work() returns %true should work for most users.
>
> r8152 conversion has been tested by repeatedly forcing the device to go
> through resets using usbreset under iperf3 generated traffic.
>
> Signed-off-by: Tejun Heo <tj@kernel.org>
> ---
>  drivers/net/usb/r8152.c | 44 ++++++++++++++++++++++-------------------
>  1 file changed, 24 insertions(+), 20 deletions(-)
>
> diff --git a/drivers/net/usb/r8152.c b/drivers/net/usb/r8152.c
> index 9bf2140fd0a1..24e284b9eb38 100644
> --- a/drivers/net/usb/r8152.c
> +++ b/drivers/net/usb/r8152.c
> @@ -882,7 +882,7 @@ struct r8152 {
>  #ifdef CONFIG_PM_SLEEP
>         struct notifier_block pm_notifier;
>  #endif
> -       struct tasklet_struct tx_tl;
> +       struct work_struct tx_work;
>
>         struct rtl_ops {
>                 void (*init)(struct r8152 *tp);
> @@ -1948,7 +1948,7 @@ static void write_bulk_callback(struct urb *urb)
>                 return;
>
>         if (!skb_queue_empty(&tp->tx_queue))
> -               tasklet_schedule(&tp->tx_tl);
> +               queue_work(system_bh_wq, &tp->tx_work);
>  }
>
>  static void intr_callback(struct urb *urb)
> @@ -2746,9 +2746,9 @@ static void tx_bottom(struct r8152 *tp)
>         } while (res == 0);
>  }
>
> -static void bottom_half(struct tasklet_struct *t)
> +static void bottom_half(struct work_struct *work)
>  {
> -       struct r8152 *tp = from_tasklet(tp, t, tx_tl);
> +       struct r8152 *tp = container_of(work, struct r8152, tx_work);
>
>         if (test_bit(RTL8152_INACCESSIBLE, &tp->flags))
>                 return;
> @@ -2942,7 +2942,7 @@ static netdev_tx_t rtl8152_start_xmit(struct sk_buff *skb,
>                         schedule_delayed_work(&tp->schedule, 0);
>                 } else {
>                         usb_mark_last_busy(tp->udev);
> -                       tasklet_schedule(&tp->tx_tl);
> +                       queue_work(system_bh_wq, &tp->tx_work);
>                 }
>         } else if (skb_queue_len(&tp->tx_queue) > tp->tx_qlen) {
>                 netif_stop_queue(netdev);
> @@ -6824,11 +6824,12 @@ static void set_carrier(struct r8152 *tp)
>         } else {
>                 if (netif_carrier_ok(netdev)) {
>                         netif_carrier_off(netdev);
> -                       tasklet_disable(&tp->tx_tl);
> +                       disable_work_sync(&tp->tx_work);
>                         napi_disable(napi);
>                         tp->rtl_ops.disable(tp);
>                         napi_enable(napi);
> -                       tasklet_enable(&tp->tx_tl);
> +                       enable_work(&tp->tx_work);
> +                       queue_work(system_bh_wq, &tp->tx_work);
>                         netif_info(tp, link, netdev, "carrier off\n");
>                 }
>         }
> @@ -6864,7 +6865,7 @@ static void rtl_work_func_t(struct work_struct *work)
>         /* don't schedule tasklet before linking */
>         if (test_and_clear_bit(SCHEDULE_TASKLET, &tp->flags) &&
>             netif_carrier_ok(tp->netdev))
> -               tasklet_schedule(&tp->tx_tl);
> +               queue_work(system_bh_wq, &tp->tx_work);
>
>         if (test_and_clear_bit(RX_EPROTO, &tp->flags) &&
>             !list_empty(&tp->rx_done))
> @@ -6971,7 +6972,7 @@ static int rtl8152_open(struct net_device *netdev)
>                 goto out_unlock;
>         }
>         napi_enable(&tp->napi);
> -       tasklet_enable(&tp->tx_tl);
> +       enable_work(&tp->tx_work);

  I think we are missing queue_work() above, right?

   To avoid such situations, could we combine enable_work() + queue_work()
into a single API?
Perhaps something like:

static inline bool enable_and_queue_work(struct workqueue_struct *wq,
					 struct work_struct *work)
{
	if (enable_work(work)) {
		queue_work(wq, work);
		return true;
	}
	return false;
}


Thanks,
Allen


>         mutex_unlock(&tp->control);
>
> @@ -6999,7 +7000,7 @@ static int rtl8152_close(struct net_device *netdev)
>  #ifdef CONFIG_PM_SLEEP
>         unregister_pm_notifier(&tp->pm_notifier);
>  #endif
> -       tasklet_disable(&tp->tx_tl);
> +       disable_work_sync(&tp->tx_work);
>         clear_bit(WORK_ENABLE, &tp->flags);
>         usb_kill_urb(tp->intr_urb);
>         cancel_delayed_work_sync(&tp->schedule);
> @@ -8421,7 +8422,7 @@ static int rtl8152_pre_reset(struct usb_interface *intf)
>                 return 0;
>
>         netif_stop_queue(netdev);
> -       tasklet_disable(&tp->tx_tl);
> +       disable_work_sync(&tp->tx_work);
>         clear_bit(WORK_ENABLE, &tp->flags);
>         usb_kill_urb(tp->intr_urb);
>         cancel_delayed_work_sync(&tp->schedule);
> @@ -8466,7 +8467,8 @@ static int rtl8152_post_reset(struct usb_interface *intf)
>         }
>
>         napi_enable(&tp->napi);
> -       tasklet_enable(&tp->tx_tl);
> +       enable_work(&tp->tx_work);
> +       queue_work(system_bh_wq, &tp->tx_work);
>         netif_wake_queue(netdev);
>         usb_submit_urb(tp->intr_urb, GFP_KERNEL);
>
> @@ -8625,12 +8627,13 @@ static int rtl8152_system_suspend(struct r8152 *tp)
>
>                 clear_bit(WORK_ENABLE, &tp->flags);
>                 usb_kill_urb(tp->intr_urb);
> -               tasklet_disable(&tp->tx_tl);
> +               disable_work_sync(&tp->tx_work);
>                 napi_disable(napi);
>                 cancel_delayed_work_sync(&tp->schedule);
>                 tp->rtl_ops.down(tp);
>                 napi_enable(napi);
> -               tasklet_enable(&tp->tx_tl);
> +               enable_work(&tp->tx_work);
> +               queue_work(system_bh_wq, &tp->tx_work);
>         }
>
>         return 0;
> @@ -9387,11 +9390,12 @@ static int rtl8152_change_mtu(struct net_device *dev, int new_mtu)
>                 if (netif_carrier_ok(dev)) {
>                         netif_stop_queue(dev);
>                         napi_disable(&tp->napi);
> -                       tasklet_disable(&tp->tx_tl);
> +                       disable_work_sync(&tp->tx_work);
>                         tp->rtl_ops.disable(tp);
>                         tp->rtl_ops.enable(tp);
>                         rtl_start_rx(tp);
> -                       tasklet_enable(&tp->tx_tl);
> +                       enable_work(&tp->tx_work);
> +                       queue_work(system_bh_wq, &tp->tx_work);
>                         napi_enable(&tp->napi);
>                         rtl8152_set_rx_mode(dev);
>                         netif_wake_queue(dev);
> @@ -9819,8 +9823,8 @@ static int rtl8152_probe_once(struct usb_interface *intf,
>         mutex_init(&tp->control);
>         INIT_DELAYED_WORK(&tp->schedule, rtl_work_func_t);
>         INIT_DELAYED_WORK(&tp->hw_phy_work, rtl_hw_phy_work_func_t);
> -       tasklet_setup(&tp->tx_tl, bottom_half);
> -       tasklet_disable(&tp->tx_tl);
> +       INIT_WORK(&tp->tx_work, bottom_half);
> +       disable_work(&tp->tx_work);
>
>         netdev->netdev_ops = &rtl8152_netdev_ops;
>         netdev->watchdog_timeo = RTL8152_TX_TIMEOUT;
> @@ -9954,7 +9958,7 @@ static int rtl8152_probe_once(struct usb_interface *intf,
>         unregister_netdev(netdev);
>
>  out1:
> -       tasklet_kill(&tp->tx_tl);
> +       cancel_work_sync(&tp->tx_work);
>         cancel_delayed_work_sync(&tp->hw_phy_work);
>         if (tp->rtl_ops.unload)
>                 tp->rtl_ops.unload(tp);
> @@ -10010,7 +10014,7 @@ static void rtl8152_disconnect(struct usb_interface *intf)
>                 rtl_set_unplug(tp);
>
>                 unregister_netdev(tp->netdev);
> -               tasklet_kill(&tp->tx_tl);
> +               cancel_work_sync(&tp->tx_work);
>                 cancel_delayed_work_sync(&tp->hw_phy_work);
>                 if (tp->rtl_ops.unload)
>                         tp->rtl_ops.unload(tp);
> --
> 2.43.2
>


-- 
       - Allen

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [PATCH 6/6] r8152: Convert from tasklet to BH workqueue
  2024-02-28 17:33   ` Allen
@ 2024-02-28 17:49     ` Tejun Heo
  2024-02-28 18:00       ` Allen
  0 siblings, 1 reply; 13+ messages in thread
From: Tejun Heo @ 2024-02-28 17:49 UTC (permalink / raw)
  To: Allen
  Cc: jiangshanlai, torvalds, linux-kernel, kernel-team, boqun.feng,
	tglx, peterz, romain.perier, mingo

Hello,

On Wed, Feb 28, 2024 at 09:33:40AM -0800, Allen wrote:
> > @@ -6971,7 +6972,7 @@ static int rtl8152_open(struct net_device *netdev)
> >                 goto out_unlock;
> >         }
> >         napi_enable(&tp->napi);
> > -       tasklet_enable(&tp->tx_tl);
> > +       enable_work(&tp->tx_work);
> 
>   I think we are missing queue_work() above, right?
> 
>    To avoid such situations, could we combine enable_work() + queue_work(),
> into a single API?

Here, the device is newly being opened and the work item is just disabled
from the init. So, it doesn't need queueing.

> Perhaps, something like:
> 
> static inline bool enable_and_queue_work(struct workqueue_struct *wq,
> struct work_struct *work)
> {
>        if (enable_work(work))
>        {
>              queue_work(wq, work);
>              return true;
>        }
>        return false;
> }

That said, this may still be nice to have.

Thanks.

-- 
tejun


* Re: [PATCH 6/6] r8152: Convert from tasklet to BH workqueue
  2024-02-28 17:49     ` Tejun Heo
@ 2024-02-28 18:00       ` Allen
  2024-02-28 18:01         ` Tejun Heo
  0 siblings, 1 reply; 13+ messages in thread
From: Allen @ 2024-02-28 18:00 UTC (permalink / raw)
  To: Tejun Heo
  Cc: jiangshanlai, torvalds, linux-kernel, kernel-team, boqun.feng,
	tglx, peterz, romain.perier, mingo

> > > @@ -6971,7 +6972,7 @@ static int rtl8152_open(struct net_device *netdev)
> > >                 goto out_unlock;
> > >         }
> > >         napi_enable(&tp->napi);
> > > -       tasklet_enable(&tp->tx_tl);
> > > +       enable_work(&tp->tx_work);
> >
> >   I think we are missing queue_work() above, right?
> >
> >    To avoid such situations, could we combine enable_work() + queue_work(),
> > into a single API?
>
> Here, the device is newly being opened and the work item is just disabled
> from the init. So, it doesn't need queueing.
>

Ah, my bad. Thanks for pointing it out.

> > Perhaps, something like:
> >
> > static inline bool enable_and_queue_work(struct workqueue_struct *wq,
> > struct work_struct *work)
> > {
> >        if (enable_work(work))
> >        {
> >              queue_work(wq, work);
> >              return true;
> >        }
> >        return false;
> > }
>
> That said, this may still be nice to have.
>

 If the above is okay, I could send out a patch. Please let me know.

Thanks.


* Re: [PATCH 6/6] r8152: Convert from tasklet to BH workqueue
  2024-02-28 18:00       ` Allen
@ 2024-02-28 18:01         ` Tejun Heo
  0 siblings, 0 replies; 13+ messages in thread
From: Tejun Heo @ 2024-02-28 18:01 UTC (permalink / raw)
  To: Allen
  Cc: jiangshanlai, torvalds, linux-kernel, kernel-team, boqun.feng,
	tglx, peterz, romain.perier, mingo

On Wed, Feb 28, 2024 at 10:00:08AM -0800, Allen wrote:
> > > static inline bool enable_and_queue_work(struct workqueue_struct *wq,
> > > struct work_struct *work)
> > > {
> > >        if (enable_work(work))
> > >        {
> > >              queue_work(wq, work);
> > >              return true;
> > >        }
> > >        return false;
> > > }
> >
> > That said, this may still be nice to have.
> 
>  If the above is okay, I could send out a patch. Please let me know.

Yes, please.

Thanks.

-- 
tejun


* Re: [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work()
  2024-02-28 10:00 ` [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work() Lai Jiangshan
@ 2024-03-25 17:31   ` Tejun Heo
  0 siblings, 0 replies; 13+ messages in thread
From: Tejun Heo @ 2024-03-25 17:31 UTC (permalink / raw)
  To: Lai Jiangshan
  Cc: torvalds, linux-kernel, allen.lkml, kernel-team, boqun.feng, tglx,
	peterz, romain.perier, mingo

On Wed, Feb 28, 2024 at 06:00:22PM +0800, Lai Jiangshan wrote:
> Hello
> 
> For patch 1-5:
> 
> Reviewed-by: Lai Jiangshan <jiangshanlai@gmail.com>

Applied 1-5 to wq/for-6.10.

Thanks.

-- 
tejun


end of thread, other threads:[~2024-03-25 17:31 UTC | newest]

Thread overview: 13+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-02-27 17:28 [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work() Tejun Heo
2024-02-27 17:28 ` [PATCH 1/6] workqueue: Preserve OFFQ bits in cancel[_sync] paths Tejun Heo
2024-02-27 17:28 ` [PATCH 2/6] workqueue: Implement disable/enable for (delayed) work items Tejun Heo
2024-02-27 17:28 ` [PATCH 3/6] workqueue: Remove WORK_OFFQ_CANCELING Tejun Heo
2024-02-27 17:28 ` [PATCH 4/6] workqueue: Remember whether a work item was on a BH workqueue Tejun Heo
2024-02-27 17:28 ` [PATCH 5/6] workqueue: Allow cancel_work_sync() and disable_work() from atomic contexts on BH work items Tejun Heo
2024-02-27 17:28 ` [PATCH 6/6] r8152: Convert from tasklet to BH workqueue Tejun Heo
2024-02-28 17:33   ` Allen
2024-02-28 17:49     ` Tejun Heo
2024-02-28 18:00       ` Allen
2024-02-28 18:01         ` Tejun Heo
2024-02-28 10:00 ` [PATCHSET v3 wq/6.10] workqueue: Implement disable/enable_work() Lai Jiangshan
2024-03-25 17:31   ` Tejun Heo

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox