[RFC,2/4] workqueue: support holding a mutex for each work

Message ID 20230510175846.cc21c84b0e6b.I9d3df459c43a78530d9c2046724bb45626402d5f@changeid
State New
Headers
Series wifi locking simplification start |

Commit Message

Johannes Berg May 10, 2023, 4:04 p.m. UTC
  From: Johannes Berg <johannes.berg@intel.com>

Add a bit of new infrastructure so that on ordered
workqueues, work will be executed with a specified
mutex held. This can be used to simplify subsystem
implementations, and together with the pause() and
resume() APIs can significantly simplify things in
subsystems using both.

The alternative to this is to manually lock in each
work item, but there can be many of them and this
may require special locking API inside the work
items vs. outside, since the outside might need to
also pause the workqueue.

For example, in wifi, I imagine using this for all
control paths so that wiphy_lock() will also pause
the workqueue, and all works will execute with the
same mutex that is used to implement wiphy_lock()
held. Then, work structs only need to be removed,
without _sync(), removing many potential causes of
locking problems.

Signed-off-by: Johannes Berg <johannes.berg@intel.com>
---
 include/linux/workqueue.h | 30 +++++++++++++++++++++++++++---
 kernel/workqueue.c        | 30 +++++++++++++++++++++++-------
 2 files changed, 50 insertions(+), 10 deletions(-)
  

Comments

Tejun Heo May 10, 2023, 6:34 p.m. UTC | #1
On Wed, May 10, 2023 at 06:04:26PM +0200, Johannes Berg wrote:
> @@ -2387,7 +2389,13 @@ __acquires(&pool->lock)
>  	 */
>  	lockdep_invariant_state(true);
>  	trace_workqueue_execute_start(work);
> -	worker->current_func(work);
> +	if (unlikely(pwq->wq->work_mutex)) {
> +		mutex_lock(pwq->wq->work_mutex);
> +		worker->current_func(work);
> +		mutex_unlock(pwq->wq->work_mutex);
> +	} else {
> +		worker->current_func(work);
> +	}

Ah, I don't know about this. This can't be that difficult to do from the
callee side, right?

Thanks.
  
Johannes Berg May 10, 2023, 7:16 p.m. UTC | #2
On Wed, 2023-05-10 at 08:34 -1000, Tejun Heo wrote:
> On Wed, May 10, 2023 at 06:04:26PM +0200, Johannes Berg wrote:
> > @@ -2387,7 +2389,13 @@ __acquires(&pool->lock)
> >  	 */
> >  	lockdep_invariant_state(true);
> >  	trace_workqueue_execute_start(work);
> > -	worker->current_func(work);
> > +	if (unlikely(pwq->wq->work_mutex)) {
> > +		mutex_lock(pwq->wq->work_mutex);
> > +		worker->current_func(work);
> > +		mutex_unlock(pwq->wq->work_mutex);
> > +	} else {
> > +		worker->current_func(work);
> > +	}
> 
> Ah, I don't know about this. This can't be that difficult to do from the
> callee side, right?
> 

Yeah I thought you'd say that :)

It isn't difficult, the issue is just that in the case I'm envisioning,
you can't just call wiphy_lock() since that would attempt to pause the
workqueue, which can't work from on the workqueue itself. So you need
wiphy_lock_from_work()/wiphy_unlock_from_work() or remember to use the
mutex directly there, which all seemed more error-prone and harder to
maintain.

But anyway I could easily implement _both_ of these in cfg80211
directly, with just a linked list of works and a single struct
work_struct to execute things on the list, with the right locking. That
might be easier overall, just at the expense of more churn while
converting, but that's not even necessarily _bad_, it would really
guarantee that we can tell immediately the work is properly done...

I'll play with that idea some, I guess. Would you still want the
pause/resume patch anyway, even if I end up not using it then?

johannes
  
Tejun Heo May 10, 2023, 7:28 p.m. UTC | #3
Hello,

On Wed, May 10, 2023 at 09:16:09PM +0200, Johannes Berg wrote:
> Yeah I thought you'd say that :)

Sorry about being so predictable. :)

> It isn't difficult, the issue is just that in the case I'm envisioning,
> you can't just call wiphy_lock() since that would attempt to pause the
> workqueue, which can't work from on the workqueue itself. So you need
> wiphy_lock_from_work()/wiphy_unlock_from_work() or remember to use the
> mutex directly there, which all seemed more error-prone and harder to
> maintain.
> 
> But anyway I could easily implement _both_ of these in cfg80211
> directly, with just a linked list of works and a single struct
> work_struct to execute things on the list, with the right locking. That
> might be easier overall, just at the expense of more churn while
> converting, but that's not even necessarily _bad_, it would really
> guarantee that we can tell immediately the work is properly done...
> 
> I'll play with that idea some, I guess. Would you still want the
> pause/resume patch anyway, even if I end up not using it then?

I think it's something inherently useful (along with the ability to do the
same thing to a work time - ie. cancel and inhibit a work item to be
queued0); however, it's probably not a good idea to merge without an in-tree
user. Would you mind posting a fixed patch nonetheless for future reference
if it's not too much hassle?

Thanks.
  

Patch

diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 5e2413017a89..9d0a1bf4d5f7 100644
--- a/include/linux/workqueue.h
+++ b/include/linux/workqueue.h
@@ -387,12 +387,16 @@  extern struct workqueue_struct *system_freezable_wq;
 extern struct workqueue_struct *system_power_efficient_wq;
 extern struct workqueue_struct *system_freezable_power_efficient_wq;
 
+__printf(1, 5) struct workqueue_struct *
+__alloc_workqueue(const char *fmt, unsigned int flags, int max_active,
+		  struct mutex *work_mutex, ...);
+
 /**
  * alloc_workqueue - allocate a workqueue
  * @fmt: printf format for the name of the workqueue
  * @flags: WQ_* flags
  * @max_active: max in-flight work items, 0 for default
- * remaining args: args for @fmt
+ * args: args for @fmt
  *
  * Allocate a workqueue with the specified parameters.  For detailed
  * information on WQ_* flags, please refer to
@@ -401,8 +405,8 @@  extern struct workqueue_struct *system_freezable_power_efficient_wq;
  * RETURNS:
  * Pointer to the allocated workqueue on success, %NULL on failure.
  */
-__printf(1, 4) struct workqueue_struct *
-alloc_workqueue(const char *fmt, unsigned int flags, int max_active, ...);
+#define alloc_workqueue(fmt, flags, max_active, args...) \
+	__alloc_workqueue(fmt, flags, max_active, NULL, ##args)
 
 /**
  * alloc_ordered_workqueue - allocate an ordered workqueue
@@ -421,6 +425,26 @@  alloc_workqueue(const char *fmt, unsigned int flags, int max_active, ...);
 	alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED |		\
 			__WQ_ORDERED_EXPLICIT | (flags), 1, ##args)
 
+/**
+ * alloc_ordered_workqueue_mtx - allocate an ordered workqueue with work mutex
+ * @fmt: printf format for the name of the workqueue
+ * @flags: WQ_* flags (only WQ_FREEZABLE and WQ_MEM_RECLAIM are meaningful)
+ * @work_mutex: mutex to hold for each work execution
+ * @args: args for @fmt
+ *
+ * Allocate an ordered workqueue.  An ordered workqueue executes at
+ * most one work item at any given time in the queued order.
+ *
+ * The work mutex will be held for each work execution.
+ *
+ * RETURNS:
+ * Pointer to the allocated workqueue on success, %NULL on failure.
+ */
+#define alloc_ordered_workqueue_mtx(fmt, flags, work_mutex, args...)	\
+	__alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED |		\
+			  __WQ_ORDERED_EXPLICIT | (flags), 1,		\
+			  work_mutex, ##args)
+
 #define create_workqueue(name)						\
 	alloc_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM, 1, (name))
 #define create_freezable_workqueue(name)				\
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index 418d99ff8325..2c573e25690c 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -278,6 +278,8 @@  struct workqueue_struct {
 	struct workqueue_attrs	*unbound_attrs;	/* PW: only for unbound wqs */
 	struct pool_workqueue	*dfl_pwq;	/* PW: only for unbound wqs */
 
+	struct mutex		*work_mutex;	/* user mutex held for work */
+
 #ifdef CONFIG_SYSFS
 	struct wq_device	*wq_dev;	/* I: for sysfs interface */
 #endif
@@ -2387,7 +2389,13 @@  __acquires(&pool->lock)
 	 */
 	lockdep_invariant_state(true);
 	trace_workqueue_execute_start(work);
-	worker->current_func(work);
+	if (unlikely(pwq->wq->work_mutex)) {
+		mutex_lock(pwq->wq->work_mutex);
+		worker->current_func(work);
+		mutex_unlock(pwq->wq->work_mutex);
+	} else {
+		worker->current_func(work);
+	}
 	/*
 	 * While we must be careful to not use "work" after this, the trace
 	 * point will only record its address.
@@ -4404,10 +4412,12 @@  static int init_rescuer(struct workqueue_struct *wq)
 	return 0;
 }
 
-__printf(1, 4)
-struct workqueue_struct *alloc_workqueue(const char *fmt,
-					 unsigned int flags,
-					 int max_active, ...)
+__printf(1, 5)
+struct workqueue_struct *__alloc_workqueue(const char *fmt,
+					   unsigned int flags,
+					   int max_active,
+					   struct mutex *work_mutex,
+					   ...)
 {
 	size_t tbl_size = 0;
 	va_list args;
@@ -4432,6 +4442,10 @@  struct workqueue_struct *alloc_workqueue(const char *fmt,
 	if (flags & WQ_UNBOUND)
 		tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
 
+	/* can only reach this by calling the internal API */
+	if (WARN_ON(!(flags & __WQ_ORDERED_EXPLICIT) && work_mutex))
+		return NULL;
+
 	wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
 	if (!wq)
 		return NULL;
@@ -4442,7 +4456,9 @@  struct workqueue_struct *alloc_workqueue(const char *fmt,
 			goto err_free_wq;
 	}
 
-	va_start(args, max_active);
+	wq->work_mutex = work_mutex;
+
+	va_start(args, work_mutex);
 	vsnprintf(wq->name, sizeof(wq->name), fmt, args);
 	va_end(args);
 
@@ -4500,7 +4516,7 @@  struct workqueue_struct *alloc_workqueue(const char *fmt,
 	destroy_workqueue(wq);
 	return NULL;
 }
-EXPORT_SYMBOL_GPL(alloc_workqueue);
+EXPORT_SYMBOL_GPL(__alloc_workqueue);
 
 static bool pwq_busy(struct pool_workqueue *pwq)
 {