[wq/for-6.9,v4,2/4] workqueue: Enable unbound cpumask update on ordered workqueues

Message ID 20240207011911.975608-3-longman@redhat.com
State New
Headers
Series workqueue: Enable unbound cpumask update on ordered workqueues |

Commit Message

Waiman Long Feb. 7, 2024, 1:19 a.m. UTC
  Ordered workqueues do not currently follow changes made to the
global unbound cpumask because per-pool workqueue changes may break
the ordering guarantee. IOW, a work function in an ordered workqueue
may run on an isolated CPU.

This patch enables ordered workqueues to follow changes made to the
global unbound cpumask by temporarily plugging (suspending) the newly
allocated pool_workqueue so that it does not execute newly queued work
items until the old pwq has been properly drained. For ordered workqueues,
there should only be one pwq that is unplugged; the rest should be plugged.

This enables ordered workqueues to follow the unbound cpumask changes
like other unbound workqueues at the expense of some delay in execution
of work functions during the transition period.

Signed-off-by: Waiman Long <longman@redhat.com>
---
 kernel/workqueue.c | 102 ++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 92 insertions(+), 10 deletions(-)
  

Comments

Tejun Heo Feb. 7, 2024, 5:25 p.m. UTC | #1
Hello, Waiman.

On Tue, Feb 06, 2024 at 08:19:09PM -0500, Waiman Long wrote:
..
> + * The unplugging is done either in apply_wqattrs_cleanup() [fast path] when
> + * the workqueue was idle or in pwq_release_workfn() [slow path] when the
> + * workqueue was busy.

I'm not sure the distinction between fast and slow paths is all that useful
here. Both are really cold paths.

> +static void unplug_oldest_pwq(struct workqueue_struct *wq,
> +			      struct pool_workqueue *exlude_pwq)
> +{
> +	struct pool_workqueue *pwq;
> +	unsigned long flags;
> +	bool found = false;
> +
> +	for_each_pwq(pwq, wq) {
> +		if (pwq == exlude_pwq)
> +			continue;
> +		if (!pwq->plugged)
> +			return;	/* No unplug needed */
> +		found = true;
> +		break;
> +	}
> +	if (WARN_ON_ONCE(!found))
> +		return;
> +
> +	raw_spin_lock_irqsave(&pwq->pool->lock, flags);
> +	if (!pwq->plugged)
> +		goto out_unlock;
> +	pwq->plugged = false;
> +	if (pwq_activate_first_inactive(pwq, true))
> +		kick_pool(pwq->pool);
> +out_unlock:
> +	raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
> +}

I don't quite understand why this needs iteration and @exclude_pwq.
Shouldn't something like the following be enough?

static void unplug_oldest_pwq(struct workqueue_struct *wq)
{
	struct pool_workqueue *pwq;

	raw_spin_lock_irq(&pwq->pool->lock);
	pwq = list_first_entry_or_null(&pwq->pwqs, ...);
	if (pwq)
		pwq->plugged = false;
	raw_spin_unlock_irq(&pwq->pool->lock);
}

> @@ -4740,6 +4796,13 @@ static void pwq_release_workfn(struct kthread_work *work)
>  		mutex_lock(&wq->mutex);
>  		list_del_rcu(&pwq->pwqs_node);
>  		is_last = list_empty(&wq->pwqs);
> +
> +		/*
> +		 * For ordered workqueue with a plugged dfl_pwq, restart it now.
> +		 */
> +		if (!is_last && (wq->flags & __WQ_ORDERED))
> +			unplug_oldest_pwq(wq, NULL);

This makes sense.

> @@ -4906,8 +4969,26 @@ static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
..
> +		/*
> +		 * It is possible that ctx->dfl_pwq (previous wq->dfl_pwq)
> +		 * may not be the oldest one with the plugged flag still set.
> +		 * unplug_oldest_pwq() will still do the right thing to allow
> +		 * only one unplugged pwq in the workqueue.
> +		 */
> +		if ((ctx->wq->flags & __WQ_ORDERED) &&
> +		     ctx->dfl_pwq && !ctx->dfl_pwq->refcnt)
> +			unplug_oldest_pwq(ctx->wq, ctx->dfl_pwq);
> +		rcu_read_unlock();

But why do we need this? Isn't all that needed to call unplug_oldest during
workqueue initialization and chaining unplugging from pwq release from there
on?

Thanks.
  
Waiman Long Feb. 7, 2024, 8:59 p.m. UTC | #2
On 2/7/24 12:25, Tejun Heo wrote:
> Hello, Waiman.
>
> On Tue, Feb 06, 2024 at 08:19:09PM -0500, Waiman Long wrote:
> ...
>> + * The unplugging is done either in apply_wqattrs_cleanup() [fast path] when
>> + * the workqueue was idle or in pwq_release_workfn() [slow path] when the
>> + * workqueue was busy.
> I'm not sure the distinction between fast and slow paths is all that useful
> here. Both are really cold paths.
Yes, both are cold paths. Maybe a more accurate description is with 
respect to the latency that a new work item may experience since 
apply_wqattrs_cleanup() should be executed earlier than 
pwq_release_workfn().
>
>> +static void unplug_oldest_pwq(struct workqueue_struct *wq,
>> +			      struct pool_workqueue *exlude_pwq)
>> +{
>> +	struct pool_workqueue *pwq;
>> +	unsigned long flags;
>> +	bool found = false;
>> +
>> +	for_each_pwq(pwq, wq) {
>> +		if (pwq == exlude_pwq)
>> +			continue;
>> +		if (!pwq->plugged)
>> +			return;	/* No unplug needed */
>> +		found = true;
>> +		break;
>> +	}
>> +	if (WARN_ON_ONCE(!found))
>> +		return;
>> +
>> +	raw_spin_lock_irqsave(&pwq->pool->lock, flags);
>> +	if (!pwq->plugged)
>> +		goto out_unlock;
>> +	pwq->plugged = false;
>> +	if (pwq_activate_first_inactive(pwq, true))
>> +		kick_pool(pwq->pool);
>> +out_unlock:
>> +	raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
>> +}
> I don't quite understand why this needs iteration and @exclude_pwq.
> Shouldn't something like the following be enough?
>
> static void unplug_oldest_pwq(struct workqueue_struct *wq)
> {
> 	struct pool_workqueue *pwq;
>
> 	raw_spin_lock_irq(&pwq->pool->lock);
> 	pwq = list_first_entry_or_null(&pwq->pwqs, ...);
> 	if (pwq)
> 		pwq->plugged = false;
> 	raw_spin_unlock_irq(&pwq->pool->lock);
> }
>
It is because this function can be called from apply_wqattrs_cleanup() 
where I need to exclude ctx->dfl_pwq from being considered.
>> @@ -4740,6 +4796,13 @@ static void pwq_release_workfn(struct kthread_work *work)
>>   		mutex_lock(&wq->mutex);
>>   		list_del_rcu(&pwq->pwqs_node);
>>   		is_last = list_empty(&wq->pwqs);
>> +
>> +		/*
>> +		 * For ordered workqueue with a plugged dfl_pwq, restart it now.
>> +		 */
>> +		if (!is_last && (wq->flags & __WQ_ORDERED))
>> +			unplug_oldest_pwq(wq, NULL);
> This makes sense.
>
>> @@ -4906,8 +4969,26 @@ static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
> ...
>> +		/*
>> +		 * It is possible that ctx->dfl_pwq (previous wq->dfl_pwq)
>> +		 * may not be the oldest one with the plugged flag still set.
>> +		 * unplug_oldest_pwq() will still do the right thing to allow
>> +		 * only one unplugged pwq in the workqueue.
>> +		 */
>> +		if ((ctx->wq->flags & __WQ_ORDERED) &&
>> +		     ctx->dfl_pwq && !ctx->dfl_pwq->refcnt)
>> +			unplug_oldest_pwq(ctx->wq, ctx->dfl_pwq);
>> +		rcu_read_unlock();
> But why do we need this? Isn't all that needed to call unplug_oldest during
> workqueue initialization and chaining unplugging from pwq release from there
> on?

Yes, it is possible to just do unplug_oldest_pwq() in 
pwq_release_workfn() and not do it in apply_wqattrs_cleanup(). As said 
above, I just want to reduce the latency when the old pwq to be retired 
is idle. I can certainly update the patch to just do it in 
pwq_release_workfn() if you don't think that it is necessary to do that 
too in apply_wqattrs_cleanup(). That will eliminate the need for the 
extra argument and simplify unplug_oldest_pwq().

Cheers,
Longman
  
Tejun Heo Feb. 7, 2024, 9:18 p.m. UTC | #3
Hello, Waiman.

On Wed, Feb 07, 2024 at 03:59:06PM -0500, Waiman Long wrote:
> > But why do we need this? Isn't all that needed to call unplug_oldest during
> > workqueue initialization and chaining unplugging from pwq release from there
> > on?
> 
> Yes, it is possible to just do unplug_oldest_pwq() in pwq_release_workfn()
> and don't do it in apply_wqattrs_cleanup(). As said above, I just want to
> reduce the latency when the old pwq to be retired is idle. I can certainly

They should retire as soon as all the work items are done.

> update the patch to just do it in pwq_release_workfn() if you don't think
> that it is necessary to do that too in apply_wqattrs_cleanup(). That will
> eliminate the need for the extra argument and simplify unplug_oldest_pwq().

So, yeah, let's please keep it simple for now. We can add optimizations
later if this becomes a problem, right?

Thanks.
  

Patch

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index fa7bd3b34f52..49fe082fe328 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -255,6 +255,7 @@  struct pool_workqueue {
 	int			refcnt;		/* L: reference count */
 	int			nr_in_flight[WORK_NR_COLORS];
 						/* L: nr of in_flight works */
+	bool			plugged;	/* L: execution suspended */
 
 	/*
 	 * nr_active management and WORK_STRUCT_INACTIVE:
@@ -1708,6 +1709,9 @@  static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq, bool fill)
 		goto out;
 	}
 
+	if (unlikely(pwq->plugged))
+		return false;
+
 	/*
 	 * Unbound workqueue uses per-node shared nr_active $nna. If @pwq is
 	 * already waiting on $nna, pwq_dec_nr_active() will maintain the
@@ -1782,6 +1786,58 @@  static bool pwq_activate_first_inactive(struct pool_workqueue *pwq, bool fill)
 	}
 }
 
+/**
+ * unplug_oldest_pwq - restart the oldest plugged pool_workqueue
+ * @wq: workqueue_struct to be restarted
+ * @exlude_pwq: pwq to be excluded from the search, or NULL to exclude none
+ *
+ * pwq's are linked into wq->pwqs with the oldest first. For ordered
+ * workqueues, only the oldest pwq is unplugged, the others are plugged to
+ * suspend execution until the oldest one is drained. When this happens, the
+ * next oldest one (first plugged pwq in iteration) will be unplugged to
+ * restart work item execution to ensure proper work item ordering.
+ *
+ *    dfl_pwq --------------+     [P] - plugged
+ *                          |
+ *                          v
+ *    pwqs -> A -> B [P] -> C [P] (newest)
+ *            |    |        |
+ *            1    3        5
+ *            |    |        |
+ *            2    4        6
+ *
+ * The unplugging is done either in apply_wqattrs_cleanup() [fast path] when
+ * the workqueue was idle or in pwq_release_workfn() [slow path] when the
+ * workqueue was busy.
+ */
+static void unplug_oldest_pwq(struct workqueue_struct *wq,
+			      struct pool_workqueue *exlude_pwq)
+{
+	struct pool_workqueue *pwq;
+	unsigned long flags;
+	bool found = false;
+
+	for_each_pwq(pwq, wq) {
+		if (pwq == exlude_pwq)
+			continue;
+		if (!pwq->plugged)
+			return;	/* No unplug needed */
+		found = true;
+		break;
+	}
+	if (WARN_ON_ONCE(!found))
+		return;
+
+	raw_spin_lock_irqsave(&pwq->pool->lock, flags);
+	if (!pwq->plugged)
+		goto out_unlock;
+	pwq->plugged = false;
+	if (pwq_activate_first_inactive(pwq, true))
+		kick_pool(pwq->pool);
+out_unlock:
+	raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
+}
+
 /**
  * node_activate_pending_pwq - Activate a pending pwq on a wq_node_nr_active
  * @nna: wq_node_nr_active to activate a pending pwq for
@@ -4740,6 +4796,13 @@  static void pwq_release_workfn(struct kthread_work *work)
 		mutex_lock(&wq->mutex);
 		list_del_rcu(&pwq->pwqs_node);
 		is_last = list_empty(&wq->pwqs);
+
+		/*
+		 * For ordered workqueue with a plugged dfl_pwq, restart it now.
+		 */
+		if (!is_last && (wq->flags & __WQ_ORDERED))
+			unplug_oldest_pwq(wq, NULL);
+
 		mutex_unlock(&wq->mutex);
 	}
 
@@ -4906,8 +4969,26 @@  static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
 
 		for_each_possible_cpu(cpu)
 			put_pwq_unlocked(ctx->pwq_tbl[cpu]);
+
+		/*
+		 * Acquire rcu_read_lock() before refcnt can become 0 to
+		 * ensure that ctx->dfl_pwq won't be freed & we can
+		 * iterate wq->pwqs.
+		 */
+		rcu_read_lock();
 		put_pwq_unlocked(ctx->dfl_pwq);
 
+		/*
+		 * It is possible that ctx->dfl_pwq (previous wq->dfl_pwq)
+		 * may not be the oldest one with the plugged flag still set.
+		 * unplug_oldest_pwq() will still do the right thing to allow
+		 * only one unplugged pwq in the workqueue.
+		 */
+		if ((ctx->wq->flags & __WQ_ORDERED) &&
+		     ctx->dfl_pwq && !ctx->dfl_pwq->refcnt)
+			unplug_oldest_pwq(ctx->wq, ctx->dfl_pwq);
+		rcu_read_unlock();
+
 		free_workqueue_attrs(ctx->attrs);
 
 		kfree(ctx);
@@ -4966,6 +5047,15 @@  apply_wqattrs_prepare(struct workqueue_struct *wq,
 	cpumask_copy(new_attrs->__pod_cpumask, new_attrs->cpumask);
 	ctx->attrs = new_attrs;
 
+	/*
+	 * For initialized ordered workqueues, there is only one pwq (dfl_pwq).
+	 * Set the plugged flag of ctx->dfl_pwq to suspend execution of newly
+	 * queued work items until execution of older work items in the old
+	 * pwq's have completed.
+	 */
+	if (!list_empty(&wq->pwqs) && (wq->flags & __WQ_ORDERED))
+		ctx->dfl_pwq->plugged = true;
+
 	ctx->wq = wq;
 	return ctx;
 
@@ -5006,10 +5096,6 @@  static int apply_workqueue_attrs_locked(struct workqueue_struct *wq,
 	if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
 		return -EINVAL;
 
-	/* creating multiple pwqs breaks ordering guarantee */
-	if (!list_empty(&wq->pwqs) && WARN_ON(wq->flags & __WQ_ORDERED))
-		return -EINVAL;
-
 	ctx = apply_wqattrs_prepare(wq, attrs, wq_unbound_cpumask);
 	if (IS_ERR(ctx))
 		return PTR_ERR(ctx);
@@ -6489,9 +6575,6 @@  static int workqueue_apply_unbound_cpumask(const cpumask_var_t unbound_cpumask)
 	list_for_each_entry(wq, &workqueues, list) {
 		if (!(wq->flags & WQ_UNBOUND) || (wq->flags & __WQ_DESTROYING))
 			continue;
-		/* creating multiple pwqs breaks ordering guarantee */
-		if (wq->flags & __WQ_ORDERED)
-			continue;
 
 		ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs, unbound_cpumask);
 		if (IS_ERR(ctx)) {
@@ -7006,9 +7089,8 @@  int workqueue_sysfs_register(struct workqueue_struct *wq)
 	int ret;
 
 	/*
-	 * Adjusting max_active or creating new pwqs by applying
-	 * attributes breaks ordering guarantee.  Disallow exposing ordered
-	 * workqueues.
+	 * Adjusting max_active breaks ordering guarantee.  Disallow exposing
+	 * ordered workqueues.
 	 */
 	if (WARN_ON(wq->flags & __WQ_ORDERED))
 		return -EINVAL;