[12/17] workqueue: Implement disable/enable for (delayed) work items

Message ID 20240216180559.208276-13-tj@kernel.org
State New
Series [01/17] workqueue: Cosmetic changes

Commit Message

Tejun Heo Feb. 16, 2024, 6:04 p.m. UTC
While (delayed) work items could be flushed and canceled, there was no way
to prevent them from being queued in the future. While this didn't lead to
functional deficiencies, it sometimes required a bit more effort from
workqueue users, e.g. to sequence shutdown steps with extra care.

Workqueue is currently in the process of replacing tasklets, which do
support disabling and enabling. That feature is used relatively widely, for
example to temporarily suppress the main path while a control plane
operation (reset or config change) is in progress.

To enable easy conversion of tasklet users, and as it seems like an
inherently useful feature, this patch implements disabling and enabling of
work items (see the usage sketch after the list below).

- A work item carries a 16-bit disable count in work->data while not queued.
  Access to the count is synchronized by the PENDING bit like all other
  parts of work->data.

- If the count is non-zero, the work item cannot be queued. Any attempt to
  queue the work item fails and returns %false.

- disable_work[_sync](), enable_work(), disable_delayed_work[_sync]() and
  enable_delayed_work() are added.
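
For illustration (not part of this patch), a hypothetical driver could
bracket a control plane operation with the new interface; the struct, field
and function names below are made up:

	#include <linux/workqueue.h>

	/* hypothetical device, for illustration only */
	struct foo_dev {
		struct work_struct rx_work;
	};

	static void foo_reconfigure(struct foo_dev *dev)
	{
		/*
		 * Cancel rx_work if pending, wait for it if running, and
		 * block future queueing attempts (they return false).
		 */
		disable_work_sync(&dev->rx_work);

		/* ... apply the config change while rx_work can't run ... */

		/*
		 * Drop the disable count; rx_work can be queued again once
		 * the count reaches 0.
		 */
		enable_work(&dev->rx_work);
	}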

Signed-off-by: Tejun Heo <tj@kernel.org>
---
 include/linux/workqueue.h |  18 +++-
 kernel/workqueue.c        | 167 ++++++++++++++++++++++++++++++++++++--
 2 files changed, 174 insertions(+), 11 deletions(-)
  

Comments

Lai Jiangshan Feb. 20, 2024, 7:22 a.m. UTC | #1
Hello, Tejun

On Sat, Feb 17, 2024 at 2:06 AM Tejun Heo <tj@kernel.org> wrote:

> - A work item carries 10bit disable count in work->data while not queued.
>   The access to the count is synchronized by the PENDING bit like all other
>   parts of work->data.

It is a 16-bit disable count in the code.


> @@ -2417,7 +2437,8 @@ bool queue_work_on(int cpu, struct workqueue_struct *wq,
>
>         local_irq_save(irq_flags);
>
> -       if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
> +       if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
> +           !clear_pending_if_disabled(work)) {
>                 __queue_work(cpu, wq, work);
>                 ret = true;
>         }
> @@ -2577,7 +2598,8 @@ bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
>         /* read the comment in __queue_work() */
>         local_irq_save(irq_flags);
>
> -       if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
> +       if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
> +           !clear_pending_if_disabled(work)) {
>                 __queue_delayed_work(cpu, wq, dwork, delay);
>                 ret = true;
>         }

The same handling is missing at queue_work_node() and queue_rcu_work().

But it is a bad idea to cancel or disable an rcu_work since it can possibly
be on RCU's waiting list.

> @@ -4173,20 +4195,46 @@ bool flush_rcu_work(struct rcu_work *rwork)
>  }
>  EXPORT_SYMBOL(flush_rcu_work);
>

Thanks
Lai
  
Tejun Heo Feb. 20, 2024, 6:38 p.m. UTC | #2
Hello,

On Tue, Feb 20, 2024 at 03:22:26PM +0800, Lai Jiangshan wrote:
> > - A work item carries 10bit disable count in work->data while not queued.
> >   The access to the count is synchronized by the PENDING bit like all other
> >   parts of work->data.
> 
> It is 16bit disable count in the code.

Fixed.

> It misses the same handling  at queue_work_node() and queue_rcu_work().

Oops, fixed queue_work_node(), but I don't think the latter is an issue
given that calling work interface functions on the embedded work is not
supported and rcu_work can't even be canceled.

I'm not quite sure flush_delayed_work() is safe. Will think more about that.

> But it is bad idea to cancel or disable rcu work since it can be possibly
> in the rcu's waiting list.

Yeah, this isn't currently supported. If we want to add this, we'd have to
have a mechanism to shoot it down from RCU's pending list.

Thanks.
  
Lai Jiangshan Feb. 21, 2024, 2:54 a.m. UTC | #3
On Wed, Feb 21, 2024 at 2:38 AM Tejun Heo <tj@kernel.org> wrote:
>
> Hello,
>
> On Tue, Feb 20, 2024 at 03:22:26PM +0800, Lai Jiangshan wrote:
> > > - A work item carries 10bit disable count in work->data while not queued.
> > >   The access to the count is synchronized by the PENDING bit like all other
> > >   parts of work->data.
> >
> > It is 16bit disable count in the code.
>
> Fixed.
>
> > It misses the same handling  at queue_work_node() and queue_rcu_work().
>
> Oops, fixed queued_work_node() but I don't think the latter is an issue
> given that calling work interface functions in the embedded work is not
> supported and rcu_work can't even be canceled.

Hello, Tejun

I think it is better to have the same handling (checking the disable count)
in queue_rcu_work():

1) the code is consistent with the other queueing code
2) known state: no work item is queued with a disable count > 0
3) catch wrong usages: complaining code (e.g. a warning) can be added along
   with the check.

Adding checking and complaining in the code is as important as adding a
comment stating that rcu work is not allowed to be disabled/canceled.

>
> I'm not quite sure flush_delayed_work() is safe. Will think more about that.

I think code that successfully deletes the timer not only owns the pending
bit but also ensures the disable count is zero.

Thanks
Lai
  
Tejun Heo Feb. 21, 2024, 5:47 a.m. UTC | #4
Hello,

On Wed, Feb 21, 2024 at 10:54:46AM +0800, Lai Jiangshan wrote:
> I think it is better to have the same handling (checking disable count)
> in queue_rcu_work().
> 
> 1) code is consistent with other queuing code
> 2) known state: no work item is queued with disable count > 0
> 3) catch wrong usages: some complaining code can be added when adding the check.
> 
> Adding checking and complaining in the code is as important as
> adding a comment stating rcu work is not allowed to be disabled/canceled.

Sure, will add a WARN_ON_ONCE().
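
A minimal sketch of what that check might look like in queue_rcu_work(),
reusing clear_pending_if_disabled() from this patch (work here is
&rwork->work as in the existing function; the rest of the body is elided):

	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
	    !WARN_ON_ONCE(clear_pending_if_disabled(work))) {
		/* ... queue the RCU callback as before ... */
		return true;
	}

	return false;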

> > I'm not quite sure flush_delayed_work() is safe. Will think more about that.
> 
> I think the code successfully deleting the timer not only owns the pending bit
> but also ensures the disable count is zero.

Yeah, this should be fine.

Thanks.
  
Boqun Feng Feb. 26, 2024, 3:42 a.m. UTC | #5
On Fri, Feb 16, 2024 at 08:04:53AM -1000, Tejun Heo wrote:
[...]
> +/**
> + * enable_work - Enable a work item
> + * @work: work item to enable
> + *
> + * Undo disable_work[_sync]() by decrementing @work's disable count. @work can
> + * only be queued if its disable count is 0.
> + *
> + * Must be called from a sleepable context. Returns %true if the disable count
> + * reached 0. Otherwise, %false.
> + */
> +bool enable_work(struct work_struct *work)
> +{
> +	struct work_offq_data offqd;
> +	unsigned long irq_flags;
> +
> +	work_grab_pending(work, 0, &irq_flags);
> +
> +	work_offqd_unpack(&offqd, *work_data_bits(work));
> +	work_offqd_enable(&offqd);
> +	set_work_pool_and_clear_pending(work, offqd.pool_id,
> +					work_offqd_pack_flags(&offqd));
> +	local_irq_enable();

Maybe
	local_irq_restore(irq_flags);

?

Because in a later patch, you make this function callable in any
context, so it may be called with irqs disabled.

Regards,
Boqun

> +
> +	return !offqd.disable;
> +}
> +EXPORT_SYMBOL_GPL(enable_work);
[..]
  
Tejun Heo Feb. 26, 2024, 6:30 p.m. UTC | #6
Hello,

On Sun, Feb 25, 2024 at 07:42:27PM -0800, Boqun Feng wrote:
> > +bool enable_work(struct work_struct *work)
> > +{
> > +	struct work_offq_data offqd;
> > +	unsigned long irq_flags;
> > +
> > +	work_grab_pending(work, 0, &irq_flags);
> > +
> > +	work_offqd_unpack(&offqd, *work_data_bits(work));
> > +	work_offqd_enable(&offqd);
> > +	set_work_pool_and_clear_pending(work, offqd.pool_id,
> > +					work_offqd_pack_flags(&offqd));
> > +	local_irq_enable();
> 
> Maybe
> 	local_irq_restore(irq_flags);
> 
> ?
> 
> Because in a later patch, you make this function callable in any
> context, so it may be called with irqs disabled.

Yeah, Lai already pointed that out. Will send out an updated patchset later.
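
With that change applied, the tail of enable_work() from this patch would
restore the saved flags instead of unconditionally enabling interrupts
(sketch, otherwise unchanged):

	bool enable_work(struct work_struct *work)
	{
		struct work_offq_data offqd;
		unsigned long irq_flags;

		work_grab_pending(work, 0, &irq_flags);

		work_offqd_unpack(&offqd, *work_data_bits(work));
		work_offqd_enable(&offqd);
		set_work_pool_and_clear_pending(work, offqd.pool_id,
						work_offqd_pack_flags(&offqd));
		local_irq_restore(irq_flags);	/* was: local_irq_enable() */

		return !offqd.disable;
	}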

Thanks.
  

Patch

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index e15fc77bf2e2..f25915e47efb 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -51,20 +51,23 @@  enum work_bits {
 	 * data contains off-queue information when !WORK_STRUCT_PWQ.
 	 *
 	 * MSB
-	 * [ pool ID ] [ OFFQ flags ] [ STRUCT flags ]
-	 *                 1 bit        4 or 5 bits
+	 * [ pool ID ] [ disable depth ] [ OFFQ flags ] [ STRUCT flags ]
+	 *                  16 bits          1 bit        4 or 5 bits
 	 */
 	WORK_OFFQ_FLAG_SHIFT	= WORK_STRUCT_FLAG_BITS,
 	WORK_OFFQ_CANCELING_BIT = WORK_OFFQ_FLAG_SHIFT,
 	WORK_OFFQ_FLAG_END,
 	WORK_OFFQ_FLAG_BITS	= WORK_OFFQ_FLAG_END - WORK_OFFQ_FLAG_SHIFT,
 
+	WORK_OFFQ_DISABLE_SHIFT	= WORK_OFFQ_FLAG_SHIFT + WORK_OFFQ_FLAG_BITS,
+	WORK_OFFQ_DISABLE_BITS	= 16,
+
 	/*
 	 * When a work item is off queue, the high bits encode off-queue flags
 	 * and the last pool it was on. Cap pool ID to 31 bits and use the
 	 * highest number to indicate that no pool is associated.
 	 */
-	WORK_OFFQ_POOL_SHIFT	= WORK_OFFQ_FLAG_SHIFT + WORK_OFFQ_FLAG_BITS,
+	WORK_OFFQ_POOL_SHIFT	= WORK_OFFQ_DISABLE_SHIFT + WORK_OFFQ_DISABLE_BITS,
 	WORK_OFFQ_LEFT		= BITS_PER_LONG - WORK_OFFQ_POOL_SHIFT,
 	WORK_OFFQ_POOL_BITS	= WORK_OFFQ_LEFT <= 31 ? WORK_OFFQ_LEFT : 31,
 };
@@ -98,6 +101,7 @@  enum wq_misc_consts {
 /* Convenience constants - of type 'unsigned long', not 'enum'! */
 #define WORK_OFFQ_CANCELING	(1ul << WORK_OFFQ_CANCELING_BIT)
 #define WORK_OFFQ_FLAG_MASK	(((1ul << WORK_OFFQ_FLAG_BITS) - 1) << WORK_OFFQ_FLAG_SHIFT)
+#define WORK_OFFQ_DISABLE_MASK	(((1ul << WORK_OFFQ_DISABLE_BITS) - 1) << WORK_OFFQ_DISABLE_SHIFT)
 #define WORK_OFFQ_POOL_NONE	((1ul << WORK_OFFQ_POOL_BITS) - 1)
 #define WORK_STRUCT_NO_POOL	(WORK_OFFQ_POOL_NONE << WORK_OFFQ_POOL_SHIFT)
 #define WORK_STRUCT_PWQ_MASK	(~((1ul << WORK_STRUCT_PWQ_SHIFT) - 1))
@@ -556,6 +560,14 @@  extern bool flush_delayed_work(struct delayed_work *dwork);
 extern bool cancel_delayed_work(struct delayed_work *dwork);
 extern bool cancel_delayed_work_sync(struct delayed_work *dwork);
 
+extern bool disable_work(struct work_struct *work);
+extern bool disable_work_sync(struct work_struct *work);
+extern bool enable_work(struct work_struct *work);
+
+extern bool disable_delayed_work(struct delayed_work *dwork);
+extern bool disable_delayed_work_sync(struct delayed_work *dwork);
+extern bool enable_delayed_work(struct delayed_work *dwork);
+
 extern bool flush_rcu_work(struct rcu_work *rwork);
 
 extern void workqueue_set_max_active(struct workqueue_struct *wq,
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index e8f310aa16d6..87e40e6e14cb 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -98,6 +98,7 @@  enum worker_flags {
 
 enum work_cancel_flags {
 	WORK_CANCEL_DELAYED	= 1 << 0,	/* canceling a delayed_work */
+	WORK_CANCEL_DISABLE	= 1 << 1,	/* canceling to disable */
 };
 
 enum wq_internal_consts {
@@ -393,6 +394,7 @@  struct wq_pod_type {
 
 struct work_offq_data {
 	u32			pool_id;
+	u32			disable;
 	u32			flags;
 };
 
@@ -903,12 +905,15 @@  static void work_offqd_unpack(struct work_offq_data *offqd, unsigned long data)
 
 	offqd->pool_id = shift_and_mask(data, WORK_OFFQ_POOL_SHIFT,
 					WORK_OFFQ_POOL_BITS);
+	offqd->disable = shift_and_mask(data, WORK_OFFQ_DISABLE_SHIFT,
+					WORK_OFFQ_DISABLE_BITS);
 	offqd->flags = data & WORK_OFFQ_FLAG_MASK;
 }
 
 static unsigned long work_offqd_pack_flags(struct work_offq_data *offqd)
 {
-	return (unsigned long)offqd->flags;
+	return ((unsigned long)offqd->disable << WORK_OFFQ_DISABLE_SHIFT) |
+		((unsigned long)offqd->flags);
 }
 
 static bool work_is_canceling(struct work_struct *work)
@@ -2395,6 +2400,21 @@  static void __queue_work(int cpu, struct workqueue_struct *wq,
 	rcu_read_unlock();
 }
 
+static bool clear_pending_if_disabled(struct work_struct *work)
+{
+	unsigned long data = *work_data_bits(work);
+	struct work_offq_data offqd;
+
+	if (likely((data & WORK_STRUCT_PWQ) ||
+		   !(data & WORK_OFFQ_DISABLE_MASK)))
+		return false;
+
+	work_offqd_unpack(&offqd, data);
+	set_work_pool_and_clear_pending(work, offqd.pool_id,
+					work_offqd_pack_flags(&offqd));
+	return true;
+}
+
 /**
  * queue_work_on - queue work on specific cpu
  * @cpu: CPU number to execute work on
@@ -2417,7 +2437,8 @@  bool queue_work_on(int cpu, struct workqueue_struct *wq,
 
 	local_irq_save(irq_flags);
 
-	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
+	    !clear_pending_if_disabled(work)) {
 		__queue_work(cpu, wq, work);
 		ret = true;
 	}
@@ -2577,7 +2598,8 @@  bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 	/* read the comment in __queue_work() */
 	local_irq_save(irq_flags);
 
-	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
+	    !clear_pending_if_disabled(work)) {
 		__queue_delayed_work(cpu, wq, dwork, delay);
 		ret = true;
 	}
@@ -4173,20 +4195,46 @@  bool flush_rcu_work(struct rcu_work *rwork)
 }
 EXPORT_SYMBOL(flush_rcu_work);
 
+static void work_offqd_disable(struct work_offq_data *offqd)
+{
+	const unsigned long max = (1lu << WORK_OFFQ_DISABLE_BITS) - 1;
+
+	if (likely(offqd->disable < max))
+		offqd->disable++;
+	else
+		WARN_ONCE(true, "workqueue: work disable count overflowed\n");
+}
+
+static void work_offqd_enable(struct work_offq_data *offqd)
+{
+	if (likely(offqd->disable > 0))
+		offqd->disable--;
+	else
+		WARN_ONCE(true, "workqueue: work disable count underflowed\n");
+}
+
 static bool __cancel_work(struct work_struct *work, u32 cflags)
 {
 	struct work_offq_data offqd;
 	unsigned long irq_flags;
 	int ret;
 
-	do {
-		ret = try_to_grab_pending(work, cflags, &irq_flags);
-	} while (unlikely(ret == -EAGAIN));
+	if (cflags & WORK_CANCEL_DISABLE) {
+		ret = work_grab_pending(work, cflags, &irq_flags);
+	} else {
+		do {
+			ret = try_to_grab_pending(work, cflags, &irq_flags);
+		} while (unlikely(ret == -EAGAIN));
 
-	if (unlikely(ret < 0))
-		return false;
+		if (unlikely(ret < 0))
+			return false;
+	}
 
 	work_offqd_unpack(&offqd, *work_data_bits(work));
+
+	if (cflags & WORK_CANCEL_DISABLE)
+		work_offqd_disable(&offqd);
+
 	set_work_pool_and_clear_pending(work, offqd.pool_id,
 					work_offqd_pack_flags(&offqd));
 	local_irq_restore(irq_flags);
@@ -4203,6 +4251,10 @@  static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
 	ret = work_grab_pending(work, cflags, &irq_flags);
 
 	work_offqd_unpack(&offqd, *work_data_bits(work));
+
+	if (cflags & WORK_CANCEL_DISABLE)
+		work_offqd_disable(&offqd);
+
 	offqd.flags |= WORK_OFFQ_CANCELING;
 	set_work_pool_and_keep_pending(work, offqd.pool_id,
 				       work_offqd_pack_flags(&offqd));
@@ -4302,6 +4354,105 @@  bool cancel_delayed_work_sync(struct delayed_work *dwork)
 }
 EXPORT_SYMBOL(cancel_delayed_work_sync);
 
+/**
+ * disable_work - Disable and cancel a work item
+ * @work: work item to disable
+ *
+ * Disable @work by incrementing its disable count and cancel it if currently
+ * pending. As long as the disable count is non-zero, any attempt to queue @work
+ * will fail and return %false. The maximum supported disable depth is 2 to the
+ * power of %WORK_OFFQ_DISABLE_BITS, currently 65536.
+ *
+ * Must be called from a sleepable context. Returns %true if @work was pending,
+ * %false otherwise.
+ */
+bool disable_work(struct work_struct *work)
+{
+	return __cancel_work(work, WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_work);
+
+/**
+ * disable_work_sync - Disable, cancel and drain a work item
+ * @work: work item to disable
+ *
+ * Similar to disable_work() but also wait for @work to finish if currently
+ * executing.
+ *
+ * Must be called from a sleepable context. Returns %true if @work was pending,
+ * %false otherwise.
+ */
+bool disable_work_sync(struct work_struct *work)
+{
+	return __cancel_work_sync(work, WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_work_sync);
+
+/**
+ * enable_work - Enable a work item
+ * @work: work item to enable
+ *
+ * Undo disable_work[_sync]() by decrementing @work's disable count. @work can
+ * only be queued if its disable count is 0.
+ *
+ * Must be called from a sleepable context. Returns %true if the disable count
+ * reached 0. Otherwise, %false.
+ */
+bool enable_work(struct work_struct *work)
+{
+	struct work_offq_data offqd;
+	unsigned long irq_flags;
+
+	work_grab_pending(work, 0, &irq_flags);
+
+	work_offqd_unpack(&offqd, *work_data_bits(work));
+	work_offqd_enable(&offqd);
+	set_work_pool_and_clear_pending(work, offqd.pool_id,
+					work_offqd_pack_flags(&offqd));
+	local_irq_enable();
+
+	return !offqd.disable;
+}
+EXPORT_SYMBOL_GPL(enable_work);
+
+/**
+ * disable_delayed_work - Disable and cancel a delayed work item
+ * @dwork: delayed work item to disable
+ *
+ * disable_work() for delayed work items.
+ */
+bool disable_delayed_work(struct delayed_work *dwork)
+{
+	return __cancel_work(&dwork->work,
+			     WORK_CANCEL_DELAYED | WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_delayed_work);
+
+/**
+ * disable_delayed_work_sync - Disable, cancel and drain a delayed work item
+ * @dwork: delayed work item to disable
+ *
+ * disable_work_sync() for delayed work items.
+ */
+bool disable_delayed_work_sync(struct delayed_work *dwork)
+{
+	return __cancel_work_sync(&dwork->work,
+				  WORK_CANCEL_DELAYED | WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_delayed_work_sync);
+
+/**
+ * enable_delayed_work - Enable a delayed work item
+ * @dwork: delayed work item to enable
+ *
+ * enable_work() for delayed work items.
+ */
+bool enable_delayed_work(struct delayed_work *dwork)
+{
+	return enable_work(&dwork->work);
+}
+EXPORT_SYMBOL_GPL(enable_delayed_work);
+
 /**
  * schedule_on_each_cpu - execute a function synchronously on each online CPU
  * @func: the function to call