[PATCH-wq,v2,2/5] workqueue: Enable unbound cpumask update on ordered workqueues
Commit Message
Ordered workqueues do not currently follow changes made to the
global unbound cpumask because per-pool workqueue changes may break
the ordering guarantee. IOW, a work function in an ordered workqueue
may run on an isolated CPU.
This patch enables ordered workqueues to follow changes made to
the global unbound cpumask by temporarily freezing the newly allocated
pool_workqueue using the new frozen flag, which defers execution of
newly queued work items until the old pwq has been properly flushed.
This enables ordered workqueues to follow the unbound cpumask changes
like other unbound workqueues at the expense of some delay in execution
of work functions during the transition period.
Signed-off-by: Waiman Long <longman@redhat.com>
---
kernel/workqueue.c | 93 +++++++++++++++++++++++++++++++++++++++-------
1 file changed, 80 insertions(+), 13 deletions(-)
Comments
Hello, Waiman.
On Sat, Feb 03, 2024 at 10:43:31AM -0500, Waiman Long wrote:
> @@ -242,6 +242,7 @@ struct pool_workqueue {
> int refcnt; /* L: reference count */
> int nr_in_flight[WORK_NR_COLORS];
> /* L: nr of in_flight works */
> + int frozen; /* L: temporarily frozen */
Can we say "plugged" instead? This is a bit confusing because freeze/thaw
are used by PM.
> @@ -1667,6 +1668,9 @@ static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq, bool fill)
>
> lockdep_assert_held(&pool->lock);
>
> + if (pwq->frozen)
> + return false;
Given that this only applied to unbound workqueues, this can be after the if
(!nna) block, right? And also maybe add unlikely()?
> +/**
> + * thaw_pwq - thaw a frozen pool_workqueue
> + * @pwq: pool_workqueue to be thawed
> + */
> +static void thaw_pwq(struct pool_workqueue *pwq)
and "unplug".
> @@ -4595,6 +4614,14 @@ static void pwq_release_workfn(struct kthread_work *work)
> mutex_lock(&wq->mutex);
> list_del_rcu(&pwq->pwqs_node);
> is_last = list_empty(&wq->pwqs);
> +
> + /*
> + * For ordered workqueue with a frozen dfl_pwq, thaw it now.
> + */
> + if (!is_last && (wq->flags & __WQ_ORDERED_EXPLICIT) &&
> + wq->dfl_pwq->frozen)
> + thaw_pwq(wq->dfl_pwq);
It should thaw the last pwq in the wq->pwqs list, not the current dfl_pwq,
right? Imagine an ordered workqueue went through two unbound mask changes.
There will be three pwq's. Let's say there are work items queued on all
three. A, B, C are the pwqs in the oldest to newest order. 0, 1, 2..
are work items in the queueing order.
dfl_pwq --\
|
v
pwqs -> C -> B -> A (oldest)
| | |
3 2 0
|
1
dfl_pwq should point to the latest pwq as that's where the new work items
should be queued. But, execution should only be allowed in the oldest pwq to
maintain execution order.
I think maybe a simpler way express the logic is "always keep only the last
pwq in the pwq list unplugged".
Also, we likely want to test __WQ_ORDERED as this should also apply to
implicitly ordered workqueues (they should go away eventually but still with
us for now). The problem is that __WQ_ORDERED can go off and when it goes off
it'd need to unplug all the pwqs and so on.
> @@ -4758,10 +4785,30 @@ static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
> {
> if (ctx) {
> int cpu;
> + bool refcheck = false;
>
> for_each_possible_cpu(cpu)
> put_pwq_unlocked(ctx->pwq_tbl[cpu]);
> +
> + /*
> + * For ordered workqueue with a frozen dfl_pwq and a reference
> + * count of 1 in ctx->dfl_pwq, it is highly likely that the
> + * refcnt will become 0 after the final put_pwq(). Acquire
> + * wq->mutex to ensure that the pwq won't be freed by
> + * pwq_release_workfn() when we check pwq later.
> + */
> + if ((ctx->wq->flags & __WQ_ORDERED_EXPLICIT) &&
> + ctx->wq->dfl_pwq->frozen &&
> + (ctx->dfl_pwq->refcnt == 1)) {
> + mutex_lock(&ctx->wq->mutex);
> + refcheck = true;
> + }
> put_pwq_unlocked(ctx->dfl_pwq);
> + if (refcheck) {
> + if (!ctx->dfl_pwq->refcnt)
> + thaw_pwq(ctx->wq->dfl_pwq);
> + mutex_unlock(&ctx->wq->mutex);
> + }
I don't think it's a worthwhile optimization to not grab wq->mutex in
apply_wqattrs path. Can you please simplify the code?
> @@ -6316,11 +6367,28 @@ static int workqueue_apply_unbound_cpumask(const cpumask_var_t unbound_cpumask)
> if (!(wq->flags & WQ_UNBOUND) || (wq->flags & __WQ_DESTROYING))
> continue;
>
> - /* creating multiple pwqs breaks ordering guarantee */
> + /*
> + * We does not support changing cpumask of an ordered workqueue
^
do
> + * again before the previous cpumask change is completed.
Maybe explain why?
> + * Sleep up to 100ms in 10ms interval to allow previous
> + * operation to complete and skip it if not done by then.
> + */
> if (!list_empty(&wq->pwqs)) {
> - if (wq->flags & __WQ_ORDERED_EXPLICIT)
> - continue;
> - wq->flags &= ~__WQ_ORDERED;
> + struct pool_workqueue *pwq = wq->dfl_pwq;
> +
> + if (!(wq->flags & __WQ_ORDERED_EXPLICIT)) {
> + wq->flags &= ~__WQ_ORDERED;
> + } else if (pwq && pwq->frozen) {
> + int i;
> +
> + for (i = 0; i < 10; i++) {
> + msleep(10);
> + if (!pwq->frozen)
> + break;
> + }
> + if (WARN_ON_ONCE(pwq->frozen))
> + continue;
> + }
I don't understand this. Why do we need to block further changes?
Thanks.
@@ -242,6 +242,7 @@ struct pool_workqueue {
int refcnt; /* L: reference count */
int nr_in_flight[WORK_NR_COLORS];
/* L: nr of in_flight works */
+ int frozen; /* L: temporarily frozen */
/*
* nr_active management and WORK_STRUCT_INACTIVE:
@@ -1667,6 +1668,9 @@ static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq, bool fill)
lockdep_assert_held(&pool->lock);
+ if (pwq->frozen)
+ return false;
+
if (!nna) {
/* per-cpu workqueue, pwq->nr_active is sufficient */
obtained = pwq->nr_active < READ_ONCE(wq->max_active);
@@ -1747,6 +1751,21 @@ static bool pwq_activate_first_inactive(struct pool_workqueue *pwq, bool fill)
}
}
+/**
+ * thaw_pwq - thaw a frozen pool_workqueue
+ * @pwq: pool_workqueue to be thawed
+ */
+static void thaw_pwq(struct pool_workqueue *pwq)
+{
+ unsigned long flags;
+
+ raw_spin_lock_irqsave(&pwq->pool->lock, flags);
+ pwq->frozen = false;
+ if (pwq_activate_first_inactive(pwq, true))
+ kick_pool(pwq->pool);
+ raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
+}
+
/**
* node_activate_pending_pwq - Activate a pending pwq on a wq_node_nr_active
* @nna: wq_node_nr_active to activate a pending pwq for
@@ -4595,6 +4614,14 @@ static void pwq_release_workfn(struct kthread_work *work)
mutex_lock(&wq->mutex);
list_del_rcu(&pwq->pwqs_node);
is_last = list_empty(&wq->pwqs);
+
+ /*
+ * For ordered workqueue with a frozen dfl_pwq, thaw it now.
+ */
+ if (!is_last && (wq->flags & __WQ_ORDERED_EXPLICIT) &&
+ wq->dfl_pwq->frozen)
+ thaw_pwq(wq->dfl_pwq);
+
mutex_unlock(&wq->mutex);
}
@@ -4758,10 +4785,30 @@ static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
{
if (ctx) {
int cpu;
+ bool refcheck = false;
for_each_possible_cpu(cpu)
put_pwq_unlocked(ctx->pwq_tbl[cpu]);
+
+ /*
+ * For ordered workqueue with a frozen dfl_pwq and a reference
+ * count of 1 in ctx->dfl_pwq, it is highly likely that the
+ * refcnt will become 0 after the final put_pwq(). Acquire
+ * wq->mutex to ensure that the pwq won't be freed by
+ * pwq_release_workfn() when we check pwq later.
+ */
+ if ((ctx->wq->flags & __WQ_ORDERED_EXPLICIT) &&
+ ctx->wq->dfl_pwq->frozen &&
+ (ctx->dfl_pwq->refcnt == 1)) {
+ mutex_lock(&ctx->wq->mutex);
+ refcheck = true;
+ }
put_pwq_unlocked(ctx->dfl_pwq);
+ if (refcheck) {
+ if (!ctx->dfl_pwq->refcnt)
+ thaw_pwq(ctx->wq->dfl_pwq);
+ mutex_unlock(&ctx->wq->mutex);
+ }
free_workqueue_attrs(ctx->attrs);
@@ -4821,6 +4868,15 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
cpumask_copy(new_attrs->__pod_cpumask, new_attrs->cpumask);
ctx->attrs = new_attrs;
+ /*
+ * For initialized ordered workqueues, there is only one pwq (dfl_pwq).
+ * Temporarily the frozen flag of ctx->dfl_pwq to freeze the execution
+ * of newly queued work items until execution of older work items in
+ * the old pwq has completed.
+ */
+ if (!list_empty(&wq->pwqs) && (wq->flags & __WQ_ORDERED_EXPLICIT))
+ ctx->dfl_pwq->frozen = true;
+
ctx->wq = wq;
return ctx;
@@ -4861,13 +4917,8 @@ static int apply_workqueue_attrs_locked(struct workqueue_struct *wq,
if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
return -EINVAL;
- /* creating multiple pwqs breaks ordering guarantee */
- if (!list_empty(&wq->pwqs)) {
- if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
- return -EINVAL;
-
+ if (!list_empty(&wq->pwqs) && !(wq->flags & __WQ_ORDERED_EXPLICIT))
wq->flags &= ~__WQ_ORDERED;
- }
ctx = apply_wqattrs_prepare(wq, attrs, wq_unbound_cpumask);
if (IS_ERR(ctx))
@@ -6316,11 +6367,28 @@ static int workqueue_apply_unbound_cpumask(const cpumask_var_t unbound_cpumask)
if (!(wq->flags & WQ_UNBOUND) || (wq->flags & __WQ_DESTROYING))
continue;
- /* creating multiple pwqs breaks ordering guarantee */
+ /*
+ * We does not support changing cpumask of an ordered workqueue
+ * again before the previous cpumask change is completed.
+ * Sleep up to 100ms in 10ms interval to allow previous
+ * operation to complete and skip it if not done by then.
+ */
if (!list_empty(&wq->pwqs)) {
- if (wq->flags & __WQ_ORDERED_EXPLICIT)
- continue;
- wq->flags &= ~__WQ_ORDERED;
+ struct pool_workqueue *pwq = wq->dfl_pwq;
+
+ if (!(wq->flags & __WQ_ORDERED_EXPLICIT)) {
+ wq->flags &= ~__WQ_ORDERED;
+ } else if (pwq && pwq->frozen) {
+ int i;
+
+ for (i = 0; i < 10; i++) {
+ msleep(10);
+ if (!pwq->frozen)
+ break;
+ }
+ if (WARN_ON_ONCE(pwq->frozen))
+ continue;
+ }
}
ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs, unbound_cpumask);
@@ -6836,9 +6904,8 @@ int workqueue_sysfs_register(struct workqueue_struct *wq)
int ret;
/*
- * Adjusting max_active or creating new pwqs by applying
- * attributes breaks ordering guarantee. Disallow exposing ordered
- * workqueues.
+ * Adjusting max_active breaks ordering guarantee. Disallow exposing
+ * ordered workqueues.
*/
if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
return -EINVAL;