[3/3] sbitmap: Try each queue to wake up at least one waiter

Message ID 20221115224553.23594-4-krisman@suse.de
State New
Series sbitmap: Fix two issues in the per-bitmap wakeup counter code

Commit Message

Gabriel Krisman Bertazi Nov. 15, 2022, 10:45 p.m. UTC
Jan reported that the new algorithm, as merged, might be problematic if
the queue being woken becomes empty between the waitqueue_active check
inside sbq_wake_ptr and the wake up.  If that happens, wake_up_nr will
not wake up any waiter and we lose too many wake ups.  In order to
guarantee progress, we need to wake up at least one waiter here, if
there are any.  This now requires trying to wake up from every queue.

Instead of walking through all the queues with sbq_wake_ptr, this patch
moves the wake up inside that function.  In a previous version of the
patch, I found that updating wake_index several times when walking
through queues had a measurable overhead.  This ensures we only update
it once, at the end.

Fixes: 4f8126bb2308 ("sbitmap: Use single per-bitmap counting to wake up queued tags")
Reported-by: Jan Kara <jack@suse.cz>
Signed-off-by: Gabriel Krisman Bertazi <krisman@suse.de>
---
 lib/sbitmap.c | 28 ++++++++++++----------------
 1 file changed, 12 insertions(+), 16 deletions(-)
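
For readers less familiar with the sbitmap wait-queue layout, the control
flow of the patched __sbitmap_queue_wake_up() can be illustrated outside
the kernel.  The sketch below is a minimal userspace analogue, not the
kernel API: every demo_* name is invented for illustration, and
demo_wake_nr() is a stand-in for the waitqueue_active() check plus
wake_up_nr().  The point is only to show the loop walking every queue,
stopping once at least one waiter is actually woken, and publishing
wake_index a single time after the loop.

/*
 * demo_sbq.c - userspace analogue of the patched __sbitmap_queue_wake_up()
 * control flow.  All demo_* names are invented for illustration; they are
 * not part of the kernel API.
 */
#include <stdio.h>

#define SBQ_WAIT_QUEUES 8

struct demo_queue {
	int nr_waiters;		/* stand-in for waitqueue_active() */
};

struct demo_sbq {
	struct demo_queue ws[SBQ_WAIT_QUEUES];
	int wake_index;
};

/* Stand-in for wake_up_nr(): wake up to @nr waiters, report how many woke. */
static int demo_wake_nr(struct demo_queue *q, int nr)
{
	int woken = nr < q->nr_waiters ? nr : q->nr_waiters;

	q->nr_waiters -= woken;
	return woken;
}

/*
 * Mirrors the patched loop: start at wake_index, advance through every
 * queue, stop as soon as at least one waiter was actually woken, and
 * write wake_index back only once, after the loop.
 */
static void demo_wake_up(struct demo_sbq *sbq, int nr)
{
	int wake_index = sbq->wake_index;
	int i;

	for (i = 0; i < SBQ_WAIT_QUEUES; i++) {
		struct demo_queue *q = &sbq->ws[wake_index];

		wake_index = (wake_index + 1) % SBQ_WAIT_QUEUES;

		/* Waking one waiter is enough for forward progress. */
		if (q->nr_waiters && demo_wake_nr(q, nr))
			break;
	}

	if (wake_index != sbq->wake_index)
		sbq->wake_index = wake_index;
}

int main(void)
{
	/* queues 0 and 1 are empty, queue 2 has three waiters */
	struct demo_sbq sbq = { .ws = { {0}, {0}, {3} }, .wake_index = 0 };

	demo_wake_up(&sbq, 4);
	printf("wake_index is now %d\n", sbq.wake_index);	/* prints 3 */
	return 0;
}

Note how, unlike the old sbq_wake_ptr() plus wake_up_nr() split, a queue
that turns out to be empty no longer swallows the wakeup: the loop simply
moves on to the next queue.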
  

Comments

Jan Kara Nov. 16, 2022, 11:09 a.m. UTC | #1
On Tue 15-11-22 17:45:53, Gabriel Krisman Bertazi wrote:
> Jan reported that the new algorithm, as merged, might be problematic if
> the queue being woken becomes empty between the waitqueue_active check
> inside sbq_wake_ptr and the wake up.  If that happens, wake_up_nr will
> not wake up any waiter and we lose too many wake ups.  In order to
> guarantee progress, we need to wake up at least one waiter here, if
> there are any.  This now requires trying to wake up from every queue.
> 
> Instead of walking through all the queues with sbq_wake_ptr, this patch
> moves the wake up inside that function.  In a previous version of the
> patch, I found that updating wake_index several times when walking
> through queues had a measurable overhead.  This ensures we only update
> it once, at the end.
> 
> Fixes: 4f8126bb2308 ("sbitmap: Use single per-bitmap counting to wake up queued tags")
> Reported-by: Jan Kara <jack@suse.cz>
> Signed-off-by: Gabriel Krisman Bertazi <krisman@suse.de>

Elegant and looks good to me! Feel free to add:

Reviewed-by: Jan Kara <jack@suse.cz>

								Honza

> ---
>  lib/sbitmap.c | 28 ++++++++++++----------------
>  1 file changed, 12 insertions(+), 16 deletions(-)
> 
> diff --git a/lib/sbitmap.c b/lib/sbitmap.c
> index bea7984f7987..586deb333237 100644
> --- a/lib/sbitmap.c
> +++ b/lib/sbitmap.c
> @@ -560,12 +560,12 @@ void sbitmap_queue_min_shallow_depth(struct sbitmap_queue *sbq,
>  }
>  EXPORT_SYMBOL_GPL(sbitmap_queue_min_shallow_depth);
>  
> -static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
> +static void __sbitmap_queue_wake_up(struct sbitmap_queue *sbq, int nr)
>  {
>  	int i, wake_index;
>  
>  	if (!atomic_read(&sbq->ws_active))
> -		return NULL;
> +		return;
>  
>  	wake_index = atomic_read(&sbq->wake_index);
>  	for (i = 0; i < SBQ_WAIT_QUEUES; i++) {
> @@ -579,20 +579,22 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
>  		 */
>  		wake_index = sbq_index_inc(wake_index);
>  
> -		if (waitqueue_active(&ws->wait)) {
> -			if (wake_index != atomic_read(&sbq->wake_index))
> -				atomic_set(&sbq->wake_index, wake_index);
> -			return ws;
> -		}
> +		/*
> +		 * It is sufficient to wake up at least one waiter to
> +		 * guarantee forward progress.
> +		 */
> +		if (waitqueue_active(&ws->wait) &&
> +		    wake_up_nr(&ws->wait, nr))
> +			break;
>  	}
>  
> -	return NULL;
> +	if (wake_index != atomic_read(&sbq->wake_index))
> +		atomic_set(&sbq->wake_index, wake_index);
>  }
>  
>  void sbitmap_queue_wake_up(struct sbitmap_queue *sbq, int nr)
>  {
>  	unsigned int wake_batch = READ_ONCE(sbq->wake_batch);
> -	struct sbq_wait_state *ws = NULL;
>  	unsigned int wakeups;
>  
>  	if (!atomic_read(&sbq->ws_active))
> @@ -604,16 +606,10 @@ void sbitmap_queue_wake_up(struct sbitmap_queue *sbq, int nr)
>  	do {
>  		if (atomic_read(&sbq->completion_cnt) - wakeups < wake_batch)
>  			return;
> -
> -		if (!ws) {
> -			ws = sbq_wake_ptr(sbq);
> -			if (!ws)
> -				return;
> -		}
>  	} while (!atomic_try_cmpxchg(&sbq->wakeup_cnt,
>  				     &wakeups, wakeups + wake_batch));
>  
> -	wake_up_nr(&ws->wait, wake_batch);
> +	__sbitmap_queue_wake_up(sbq, wake_batch);
>  }
>  EXPORT_SYMBOL_GPL(sbitmap_queue_wake_up);
>  
> -- 
> 2.35.3
>
  

Patch

diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index bea7984f7987..586deb333237 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -560,12 +560,12 @@ void sbitmap_queue_min_shallow_depth(struct sbitmap_queue *sbq,
 }
 EXPORT_SYMBOL_GPL(sbitmap_queue_min_shallow_depth);
 
-static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
+static void __sbitmap_queue_wake_up(struct sbitmap_queue *sbq, int nr)
 {
 	int i, wake_index;
 
 	if (!atomic_read(&sbq->ws_active))
-		return NULL;
+		return;
 
 	wake_index = atomic_read(&sbq->wake_index);
 	for (i = 0; i < SBQ_WAIT_QUEUES; i++) {
@@ -579,20 +579,22 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
 		 */
 		wake_index = sbq_index_inc(wake_index);
 
-		if (waitqueue_active(&ws->wait)) {
-			if (wake_index != atomic_read(&sbq->wake_index))
-				atomic_set(&sbq->wake_index, wake_index);
-			return ws;
-		}
+		/*
+		 * It is sufficient to wake up at least one waiter to
+		 * guarantee forward progress.
+		 */
+		if (waitqueue_active(&ws->wait) &&
+		    wake_up_nr(&ws->wait, nr))
+			break;
 	}
 
-	return NULL;
+	if (wake_index != atomic_read(&sbq->wake_index))
+		atomic_set(&sbq->wake_index, wake_index);
 }
 
 void sbitmap_queue_wake_up(struct sbitmap_queue *sbq, int nr)
 {
 	unsigned int wake_batch = READ_ONCE(sbq->wake_batch);
-	struct sbq_wait_state *ws = NULL;
 	unsigned int wakeups;
 
 	if (!atomic_read(&sbq->ws_active))
@@ -604,16 +606,10 @@ void sbitmap_queue_wake_up(struct sbitmap_queue *sbq, int nr)
 	do {
 		if (atomic_read(&sbq->completion_cnt) - wakeups < wake_batch)
 			return;
-
-		if (!ws) {
-			ws = sbq_wake_ptr(sbq);
-			if (!ws)
-				return;
-		}
 	} while (!atomic_try_cmpxchg(&sbq->wakeup_cnt,
 				     &wakeups, wakeups + wake_batch));
 
-	wake_up_nr(&ws->wait, wake_batch);
+	__sbitmap_queue_wake_up(sbq, wake_batch);
 }
 EXPORT_SYMBOL_GPL(sbitmap_queue_wake_up);