[1/3] rcu: Reduce synchronize_rcu() waiting time

Message ID 20231025140915.590390-2-urezki@gmail.com
State New
Series reduce latency of synchronize_rcu()

Commit Message

Uladzislau Rezki Oct. 25, 2023, 2:09 p.m. UTC
The time a synchronize_rcu() call takes to complete can be reduced.
Many workloads are affected by this latency, especially those that
call this API in their time-critical sections.

For example, if CONFIG_RCU_NOCB_CPU is set, the wakeme_after_rcu()
callback can be delayed, and the delay depends on:

- where in the nocb list it is located;
- how quickly the preceding callbacks complete.

1. On our Android devices I can easily trigger a scenario in which
it is the last in a list of ~3600 callbacks:

<snip>
  <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt CBs=3613 bl=28
...
  <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
  <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
  <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
  <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
  <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
  <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
  <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-invoked=3612 idle=....
<snip>

2. We use cpuset/cgroup to classify tasks and assign them to
different cgroups. For example, the "background" group binds tasks
only to little CPUs, while "foreground" makes use of all CPUs.
Tasks can be migrated between groups on request when a speed-up
is needed.

See below an example of how the "surfaceflinger" task gets migrated.
Initially it is located in the "system-background" cgroup, which
allows it to run only on little cores. To speed it up, it
can be temporarily moved into the "foreground" cgroup, which allows
it to use big/all CPUs:

cgroup_attach_task():
 -> cgroup_migrate_execute()
   -> cpuset_can_attach()
     -> percpu_down_write()
       -> rcu_sync_enter()
         -> synchronize_rcu()
   -> now move tasks to the new cgroup.
 -> cgroup_migrate_finish()

<snip>
         rcuop/1-29      [000] .....  7030.528570: rcu_invoke_callback: rcu_preempt rhp=00000000461605e0 func=wakeme_after_rcu.cfi_jt
    PERFD-SERVER-1855    [000] d..1.  7030.530293: cgroup_attach_task: dst_root=3 dst_id=22 dst_level=1 dst_path=/foreground pid=1900 comm=surfaceflinger
    PERFD-SERVER-1855    [000] d..1.  7030.530383: cgroup_attach_task: dst_root=3 dst_id=22 dst_level=1 dst_path=/foreground pid=1900 comm=surfaceflinger
   TimerDispatch-2768    [002] d..5.  7030.537542: sched_migrate_task: comm=surfaceflinger pid=1900 prio=98 orig_cpu=0 dest_cpu=4
<snip>

"A moving time" depends on how fast synchronize_rcu() completes. See
the first trace line. The migration does not occur until the sync
has completed. Please note that the number of other callbacks to be
invoked can be in the thousands.

3. To address this drawback, maintain a separate track that consists
of synchronize_rcu() callers only. The GP-kthread, which drives a GP,
either wakes up a worker to drain the whole list or directly wakes up
the end user if it is the only one in the drain list.

4. This patch improves the performance of synchronize_rcu() by roughly
30% on synthetic tests. The real test case, camera launch time, shows
the figures below (time is in milliseconds):

542 vs 489 diff: 9%
540 vs 466 diff: 13%
518 vs 468 diff: 9%
531 vs 457 diff: 13%
548 vs 475 diff: 13%
509 vs 484 diff: 4%

Synthetic test:

Hardware: x86_64 64 CPUs, 64GB of memory

- 60,000 tasks (simultaneous);
- each task does (1000 loops):
     synchronize_rcu();
     kfree(p);

default: CONFIG_RCU_NOCB_CPU: takes 323 seconds to complete all users;
patch: CONFIG_RCU_NOCB_CPU: takes 240 seconds to complete all users.

Please note that this functionality is OFF by default and the old way is
still used. To activate it, run:

echo 1 > /sys/module/rcutree/parameters/rcu_normal_wake_from_gp

Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
---
 kernel/rcu/tree.c     | 150 +++++++++++++++++++++++++++++++++++++++++-
 kernel/rcu/tree_exp.h |   2 +-
 2 files changed, 150 insertions(+), 2 deletions(-)
  

Comments

Frederic Weisbecker Oct. 25, 2023, 3:13 p.m. UTC | #1
On Wed, Oct 25, 2023 at 04:09:13PM +0200, Uladzislau Rezki (Sony) wrote:
> +/*
> + * Helper function for rcu_gp_cleanup().
> + */
> +static void rcu_sr_normal_gp_cleanup(void)
> +{
> +	struct llist_node *head, *tail, *pos;
> +	int i = 0;
> +
> +	tail = READ_ONCE(sr.wait_tail);
> +	head = llist_del_all(&sr.wait);

This could do an llist_empty() check first, as a quick and
cheap test, and then use __llist_del_all() here, because
it appears that nothing other than the GP kthread can touch sr.wait.

> +
> +	llist_for_each_safe(pos, head, head) {

Two times head intended here? There should be some
temporary storage in the middle.

> +		rcu_sr_normal_complete(pos);
> +
> +		if (++i == MAX_SR_WAKE_FROM_GP) {
> +			/* If last, process it also. */
> +			if (head && !head->next)
> +				continue;
> +			break;
> +		}
> +	}
> +
> +	if (head) {
> +		/* Can be not empty. */
> +		llist_add_batch(head, tail, &sr.done);
> +		queue_work(system_highpri_wq, &sr_normal_gp_cleanup);

So you can have:

* Queue to sr.curr is atomic fully ordered
* Check and move from sr.curr to sr.wait is atomic fully ordered
* Check from sr.wait can have a quick unatomic unordered
  llist_empty() check. Then extract unatomic unordered as well.
* If too many, move atomic/ordered to sr.done.

Am I missing something?
  
Boqun Feng Oct. 25, 2023, 5:17 p.m. UTC | #2
On Wed, Oct 25, 2023 at 04:09:13PM +0200, Uladzislau Rezki (Sony) wrote:
> The time a synchronize_rcu() call takes to complete can be reduced.
> Many workloads are affected by this latency, especially those that
> call this API in their time-critical sections.
> 
> For example, if CONFIG_RCU_NOCB_CPU is set, the wakeme_after_rcu()
> callback can be delayed, and the delay depends on:
> 
> - where in the nocb list it is located;
> - how quickly the preceding callbacks complete.
> 
> 1. On our Android devices I can easily trigger a scenario in which
> it is the last in a list of ~3600 callbacks:
> 

I wonder how many of the callbacks are queued via call_rcu_hurry()? If
not a lot, I wonder whether we can resolve the problem differently, see
below:

> [...]
> 
> 3. To address this drawback, maintain a separate track that consists
> of synchronize_rcu() callers only. The GP-kthread, which drives a GP,
> either wakes up a worker to drain the whole list or directly wakes up
> the end user if it is the only one in the drain list.
> 

Late to the party, but I kinda wonder whether we can resolve it by:

1) either introducing a separate seglist that only contains callbacks
queued by call_rcu_hurry(), so that after a GP, once callbacks are
ready, the call_rcu_hurry() callbacks are invoked first.

2) or making call_rcu_hurry() callbacks always get inserted at the head
of the NEXT list instead of the tail, e.g. (untested code):

diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
index f71fac422c8f..89a875f8ecc7 100644
--- a/kernel/rcu/rcu_segcblist.c
+++ b/kernel/rcu/rcu_segcblist.c
@@ -338,13 +338,21 @@ bool rcu_segcblist_nextgp(struct rcu_segcblist *rsclp, unsigned long *lp)
  * absolutely not OK for it to ever miss posting a callback.
  */
 void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp,
-			   struct rcu_head *rhp)
+			   struct rcu_head *rhp,
+			   bool is_lazy)
 {
 	rcu_segcblist_inc_len(rsclp);
 	rcu_segcblist_inc_seglen(rsclp, RCU_NEXT_TAIL);
-	rhp->next = NULL;
-	WRITE_ONCE(*rsclp->tails[RCU_NEXT_TAIL], rhp);
-	WRITE_ONCE(rsclp->tails[RCU_NEXT_TAIL], &rhp->next);
+	/* If hurry and the list is not empty, put it in the front */
+	if (!is_lazy && rcu_segcblist_get_seglen(rsclp, RCU_NEXT_TAIL) > 1) {
+		// hurry callback, queued at front
+		rhp->next = READ_ONCE(*rsclp->tails[RCU_NEXT_READY_TAIL]);
+		WRITE_ONCE(*rsclp->tails[RCU_NEXT_READY_TAIL], rhp);
+	} else {
+		rhp->next = NULL;
+		WRITE_ONCE(*rsclp->tails[RCU_NEXT_TAIL], rhp);
+		WRITE_ONCE(rsclp->tails[RCU_NEXT_TAIL], &rhp->next);
+	}
 }
 
 /*
diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
index 4fe877f5f654..459475bb8df9 100644
--- a/kernel/rcu/rcu_segcblist.h
+++ b/kernel/rcu/rcu_segcblist.h
@@ -136,7 +136,8 @@ struct rcu_head *rcu_segcblist_first_cb(struct rcu_segcblist *rsclp);
 struct rcu_head *rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp);
 bool rcu_segcblist_nextgp(struct rcu_segcblist *rsclp, unsigned long *lp);
 void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp,
-			   struct rcu_head *rhp);
+			   struct rcu_head *rhp,
+			   bool is_lazy);
 bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp,
 			   struct rcu_head *rhp);
 void rcu_segcblist_extract_done_cbs(struct rcu_segcblist *rsclp,
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
index 20d7a238d675..53adf5ab9c9f 100644
--- a/kernel/rcu/srcutree.c
+++ b/kernel/rcu/srcutree.c
@@ -1241,7 +1241,7 @@ static unsigned long srcu_gp_start_if_needed(struct srcu_struct *ssp,
 		sdp = raw_cpu_ptr(ssp->sda);
 	spin_lock_irqsave_sdp_contention(sdp, &flags);
 	if (rhp)
-		rcu_segcblist_enqueue(&sdp->srcu_cblist, rhp);
+		rcu_segcblist_enqueue(&sdp->srcu_cblist, rhp, true);
 	rcu_segcblist_advance(&sdp->srcu_cblist,
 			      rcu_seq_current(&ssp->srcu_sup->srcu_gp_seq));
 	s = rcu_seq_snap(&ssp->srcu_sup->srcu_gp_seq);
diff --git a/kernel/rcu/tasks.h b/kernel/rcu/tasks.h
index 8d65f7d576a3..7dec7c68f88f 100644
--- a/kernel/rcu/tasks.h
+++ b/kernel/rcu/tasks.h
@@ -362,7 +362,7 @@ static void call_rcu_tasks_generic(struct rcu_head *rhp, rcu_callback_t func,
 	}
 	if (needwake)
 		rtpcp->urgent_gp = 3;
-	rcu_segcblist_enqueue(&rtpcp->cblist, rhp);
+	rcu_segcblist_enqueue(&rtpcp->cblist, rhp, true);
 	raw_spin_unlock_irqrestore_rcu_node(rtpcp, flags);
 	if (unlikely(needadjust)) {
 		raw_spin_lock_irqsave(&rtp->cbs_gbl_lock, flags);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index cb1caefa8bd0..e05cbff40dc7 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2670,7 +2670,7 @@ __call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy_in)
 	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy))
 		return; // Enqueued onto ->nocb_bypass, so just leave.
 	// If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
-	rcu_segcblist_enqueue(&rdp->cblist, head);
+	rcu_segcblist_enqueue(&rdp->cblist, head, lazy_in);
 	if (__is_kvfree_rcu_offset((unsigned long)func))
 		trace_rcu_kvfree_callback(rcu_state.name, head,
 					 (unsigned long)func,

Sure, there may be some corner cases I'm missing, but I think overall
this is better than (sorta) duplicating the logic of seglist (the llist
in sr_normal_state) or the logic of wake_rcu_gp()
(synchronize_rcu_normal).
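
For readers following option 2), here is a minimal user-space sketch of the ordering it would produce. It is not the kernel's rcu_segcblist: struct cb, struct seglist, seglist_init() and seglist_enqueue() are hypothetical names used only for illustration, and only the enqueue path of the untested diff above is modelled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for rcu_head and rcu_segcblist (illustration only). */
struct cb {
	struct cb *next;
	int id;
};

enum { DONE_TAIL, WAIT_TAIL, NEXT_READY_TAIL, NEXT_TAIL, NR_SEGS };

struct seglist {
	struct cb *head;
	struct cb **tails[NR_SEGS];	/* tails[i]: the ->next slot that ends segment i */
	long next_len;			/* number of callbacks in the NEXT segment */
};

static void seglist_init(struct seglist *l)
{
	assert(l != NULL);
	l->head = NULL;
	for (int i = 0; i < NR_SEGS; i++)
		l->tails[i] = &l->head;	/* all segments start out empty */
	l->next_len = 0;
}

/* Mirrors the enqueue logic of the diff: hurry callbacks jump the NEXT queue. */
static void seglist_enqueue(struct seglist *l, struct cb *cb, bool is_lazy)
{
	assert(l != NULL && cb != NULL);
	l->next_len++;
	if (!is_lazy && l->next_len > 1) {
		/* NEXT segment is not empty: link in right after NEXT_READY. */
		cb->next = *l->tails[NEXT_READY_TAIL];
		*l->tails[NEXT_READY_TAIL] = cb;
	} else {
		/* Regular insertion at the tail of the NEXT segment. */
		cb->next = NULL;
		*l->tails[NEXT_TAIL] = cb;
		l->tails[NEXT_TAIL] = &cb->next;
	}
}
```

In this model, enqueueing callbacks 1 and 2 as lazy and then 3 and 4 as hurry leaves the list in the order 4, 3, 1, 2: hurry callbacks overtake the lazy ones, but among themselves they end up in reverse order of arrival.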

Anyway, these are just if-you-have-time-to-try options ;-)

Regards,
Boqun

[...]
  
Uladzislau Rezki Oct. 26, 2023, 1 p.m. UTC | #3
On Wed, Oct 25, 2023 at 05:13:27PM +0200, Frederic Weisbecker wrote:
> On Wed, Oct 25, 2023 at 04:09:13PM +0200, Uladzislau Rezki (Sony) wrote:
> > +/*
> > + * Helper function for rcu_gp_cleanup().
> > + */
> > +static void rcu_sr_normal_gp_cleanup(void)
> > +{
> > +	struct llist_node *head, *tail, *pos;
> > +	int i = 0;
> > +
> > +	tail = READ_ONCE(sr.wait_tail);
> > +	head = llist_del_all(&sr.wait);
> 
> This could do an llist_empty() check first, as a quick and
> cheap test, and then use __llist_del_all() here, because
> it appears that nothing other than the GP kthread can touch sr.wait.
> 
No problem, I can fix it. Initially I had that check first!

> > +
> > +	llist_for_each_safe(pos, head, head) {
> 
> Two times head intended here? There should be some
> temporary storage in the middle.
> 
Yes, it is done intentionally. The head is updated, i.e. shifted to the
next node, because we process users directly from the GP kthread. Their
number is limited to 5; all the rest are deferred.
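
To make the double use of head concrete, here is a small user-space sketch of the loop under discussion. The macro has the same shape as the kernel's llist_for_each_safe(); struct drain_result, drain() and MAX_NODES are hypothetical helpers for this demo, and a counter stands in for rcu_sr_normal_complete().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_SR_WAKE_FROM_GP 5	/* the limit of 5 mentioned above */
#define MAX_NODES 64		/* hypothetical cap, for this demo only */

struct llist_node {
	struct llist_node *next;
};

/* Same shape as the kernel's llist_for_each_safe() macro. */
#define llist_for_each_safe(pos, n, node) \
	for ((pos) = (node); (pos) && ((n) = (pos)->next, true); (pos) = (n))

struct drain_result {
	int processed;	/* nodes handled directly inside the loop */
	int deferred;	/* nodes still reachable from head afterwards */
};

/* Hypothetical helper: build a list of n nodes and run the loop over it. */
static struct drain_result drain(int n)
{
	static struct llist_node nodes[MAX_NODES];
	struct llist_node *head, *pos;
	struct drain_result res = { 0, 0 };
	int i = 0;

	assert(n >= 0 && n <= MAX_NODES);
	for (int k = 0; k < n; k++)
		nodes[k].next = (k + 1 < n) ? &nodes[k + 1] : NULL;
	head = n ? &nodes[0] : NULL;

	/* head doubles as the "next" cursor, so it always points past pos. */
	llist_for_each_safe(pos, head, head) {
		res.processed++;	/* stands in for rcu_sr_normal_complete(pos) */

		if (++i == MAX_SR_WAKE_FROM_GP) {
			/* If exactly one node is left, process it as well. */
			if (head && !head->next)
				continue;
			break;
		}
	}

	/* Whatever head still points to would be handed over to sr.done. */
	for (pos = head; pos; pos = pos->next)
		res.deferred++;
	return res;
}
```

With this model, a list of 6 nodes is fully processed in the loop (the sixth is the "last one" special case), while a list of 7 leaves 2 nodes behind head for the deferred path.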

> > +		rcu_sr_normal_complete(pos);
> > +
> > +		if (++i == MAX_SR_WAKE_FROM_GP) {
> > +			/* If last, process it also. */
> > +			if (head && !head->next)
> > +				continue;
> > +			break;
> > +		}
> > +	}
> > +
> > +	if (head) {
> > +		/* Can be not empty. */
> > +		llist_add_batch(head, tail, &sr.done);
> > +		queue_work(system_highpri_wq, &sr_normal_gp_cleanup);
> 
> So you can have:
> 
> * Queue to sr.curr is atomic fully ordered
> * Check and move from sr.curr to sr.wait is atomic fully ordered
> * Check from sr.wait can have a quick unatomic unordered
>   llist_empty() check. Then extract unatomic unordered as well.
> * If too many, move atomic/ordered to sr.done.
> 
> Am I missing something?
>
If there are too many, move them to sr.done and kick the helper. The
sr.wait list cannot be touched until rcu_sr_normal_gp_cleanup() has
completed, i.e.:

<snip>
GP-kthread (one and the same task context):
    rcu_sr_normal_gp_cleanup();
    wait for a grace period;
    rcu_sr_normal_gp_cleanup();
<snip>

Am I missing your point?

--
Uladzislau Rezki
  
Uladzislau Rezki Oct. 26, 2023, 1:09 p.m. UTC | #4
On Wed, Oct 25, 2023 at 10:17:22AM -0700, Boqun Feng wrote:
> On Wed, Oct 25, 2023 at 04:09:13PM +0200, Uladzislau Rezki (Sony) wrote:
> > The time a synchronize_rcu() call takes to complete can be reduced.
> > Many workloads are affected by this latency, especially those that
> > call this API in their time-critical sections.
> > 
> > For example, if CONFIG_RCU_NOCB_CPU is set, the wakeme_after_rcu()
> > callback can be delayed, and the delay depends on:
> > 
> > - where in the nocb list it is located;
> > - how quickly the preceding callbacks complete.
> > 
> > 1. On our Android devices I can easily trigger a scenario in which
> > it is the last in a list of ~3600 callbacks:
> > 
> 
> I wonder how many of the callbacks are queued via call_rcu_hurry()? If
> not a lot, I wonder whether we can resolve the problem differently, see
> below:
> 
There might be many. call_rcu_hurry() is still a new API. I expect its
usage to increase, especially in places where memory has to be
reclaimed ASAP, for example on small devices that suffer
from the OOM killer.

> > [...]
> 
> Late to the party, but I kinda wonder whether we can resolve it by:
> 
> 1) either introducing a separate seglist that only contains callbacks
> queued by call_rcu_hurry(), so that after a GP, once callbacks are
> ready, the call_rcu_hurry() callbacks are invoked first.
> 
> 2) or making call_rcu_hurry() callbacks always get inserted at the head
> of the NEXT list instead of the tail, e.g. (untested code):
> 
> [...]
> 
> Sure, there may be some corner cases I'm missing, but I think overall
> this is better than (sorta) duplicating the logic of seglist (the llist
> in sr_normal_state) or the logic of wake_rcu_gp()
> (synchronize_rcu_normal).
> 
> Anyway, these are just if-you-have-time-to-try options ;-)
> 
Hm.. You still mix callbacks, and there is a dependency on the order
of execution. The callback processing time may also vary from
one callback to another.

If you have many *_hurry() calls, we end up in the same situation. Apart
from that, we also have the !CONFIG_RCU_NOCB_CPU path, which is also
covered by the patch in question.

--
Uladzislau Rezki
  
Frederic Weisbecker Oct. 26, 2023, 2:22 p.m. UTC | #5
On Thu, Oct 26, 2023 at 03:00:25PM +0200, Uladzislau Rezki wrote:
> On Wed, Oct 25, 2023 at 05:13:27PM +0200, Frederic Weisbecker wrote:
> > > +	llist_for_each_safe(pos, head, head) {
> > 
> > Two times head intended here? There should be some
> > temporary storage in the middle.
> > 
> Yes, it is done intentionally. The head is updated, i.e. shifted to the
> next node, because we process users directly from the GP kthread. Their
> number is limited to 5; all the rest are deferred.

Ah ok.

> > So you can have:
> > 
> > * Queue to sr.curr is atomic fully ordered
> > * Check and move from sr.curr to sr.wait is atomic fully ordered
> > * Check from sr.wait can have a quick unatomic unordered
> >   llist_empty() check. Then extract unatomic unordered as well.
> > * If too many, move atomic/ordered to sr.done.
> > 
> > Am I missing something?
> >
> If there are too many, move them to sr.done and kick the helper. The
> sr.wait list cannot be touched until rcu_sr_normal_gp_cleanup() has
> completed, i.e.:
> 
> <snip>
> GP-kthread (one and the same task context):
>     rcu_sr_normal_gp_cleanup();
>     wait for a grace period;
>     rcu_sr_normal_gp_cleanup();
> <snip>
> 
> Am I missing your point?

Yeah got it. My point was just that any manipulation of sr.wait can be
done without atomic/ordered operations. Such as using __list_empty() and
__llist_del_all().

Ah there is also the line:

   llist_add_batch(head, tail, &sr.wait);

in rcu_sr_normal_gp_init() that can be turned into __llist_add_batch()
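
As a user-space illustration of the distinction being drawn here, the sketch below contrasts compare-and-swap/exchange based list operations with plain single-owner ones, using C11 atomics. It is only an analogue of the kernel's llist_add()/llist_del_all() versus __llist_add()/__llist_del_all(); struct lhead and the four function names are hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

struct node {
	struct node *next;
};

struct lhead {
	_Atomic(struct node *) first;
};

/* Safe with concurrent producers: retry until the compare-and-swap succeeds. */
static void push_atomic(struct lhead *h, struct node *n)
{
	struct node *old;

	assert(h != NULL && n != NULL);
	old = atomic_load(&h->first);
	do {
		n->next = old;
	} while (!atomic_compare_exchange_weak(&h->first, &old, n));
}

/* Safe with concurrent producers: take the whole list in one exchange. */
static struct node *del_all_atomic(struct lhead *h)
{
	return atomic_exchange(&h->first, (struct node *)NULL);
}

/* Single-owner variants: valid only if no other context touches the list. */
static void push_plain(struct lhead *h, struct node *n)
{
	assert(h != NULL && n != NULL);
	n->next = atomic_load_explicit(&h->first, memory_order_relaxed);
	atomic_store_explicit(&h->first, n, memory_order_relaxed);
}

static struct node *del_all_plain(struct lhead *h)
{
	struct node *first = atomic_load_explicit(&h->first, memory_order_relaxed);

	atomic_store_explicit(&h->first, (struct node *)NULL, memory_order_relaxed);
	return first;
}
```

The plain variants avoid the read-modify-write instructions, which is the saving at stake for sr.wait if only the GP kthread ever touches it.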

Thanks.
  
Uladzislau Rezki Oct. 26, 2023, 3:14 p.m. UTC | #6
On Thu, Oct 26, 2023 at 04:22:17PM +0200, Frederic Weisbecker wrote:
> On Thu, Oct 26, 2023 at 03:00:25PM +0200, Uladzislau Rezki wrote:
> > On Wed, Oct 25, 2023 at 05:13:27PM +0200, Frederic Weisbecker wrote:
> > > > +	llist_for_each_safe(pos, head, head) {
> > > 
> > > Two times head intended here? There should be some
> > > temporary storage in the middle.
> > > 
> > Yes, it is done intentionally. The head is updated, i.e. shifted to the
> > next node, because we process users directly from the GP kthread. Their
> > number is limited to 5; all the rest are deferred.
> 
> Ah ok.
> 
> > > So you can have:
> > > 
> > > * Queue to sr.curr is atomic fully ordered
> > > * Check and move from sr.curr to sr.wait is atomic fully ordered
> > > * Check from sr.wait can have a quick unatomic unordered
> > >   llist_empty() check. Then extract unatomic unordered as well.
> > > * If too many, move atomic/ordered to sr.done.
> > > 
> > > Am I missing something?
> > >
> > If there are too many, move them to sr.done and kick the helper. The
> > sr.wait list cannot be touched until rcu_sr_normal_gp_cleanup() has
> > completed, i.e.:
> > 
> > <snip>
> > GP-kthread (one and the same task context):
> >     rcu_sr_normal_gp_cleanup();
> >     wait for a grace period;
> >     rcu_sr_normal_gp_cleanup();
> > <snip>
> > 
> > Am I missing your point?
> 
> Yeah got it. My point was just that any manipulation of sr.wait can be
> done without atomic/ordered operations. Such as using __list_empty() and
> __llist_del_all().
> 
> Ah there is also the line:
> 
>    llist_add_batch(head, tail, &sr.wait);
> 
> in rcu_sr_normal_gp_init() that can be turned into __llist_add_batch()
> 
Thank you for the good input. Indeed, we can manipulate sr.wait using
the __llist* functions. I will update it accordingly. So, I see your point.

I appreciate your review!

--
Uladzislau Rezki
  
Boqun Feng Oct. 29, 2023, 6:21 p.m. UTC | #7
On Thu, Oct 26, 2023 at 03:09:02PM +0200, Uladzislau Rezki wrote:
[...]
> > Late to the party, but I kinda wonder whether we can resolve it by:
> > 
> > 1) either introducing a separate seglist that only contains callbacks
> > queued by call_rcu_hurry(), so that after a GP, once callbacks are
> > ready, the call_rcu_hurry() callbacks are invoked first.
> > 
> > 2) or making call_rcu_hurry() callbacks always get inserted at the head
> > of the NEXT list instead of the tail, e.g. (untested code):
> > 
> > [...]
> > 

Surprisingly, this survives a whole rcutorture run ;-)

> > Sure, there may be some corner cases I'm missing, but I think overall
> > this is better than (sorta) duplicating the logic of seglist (the llist
> > in sr_normal_state) or the logic of wake_rcu_gp()
> > (synchronize_rcu_normal).
> > 
> > Anyway, these are just if-you-have-time-to-try options ;-)
> > 
> Hm.. You still mix callbacks and there is a dependency in order
> of execution. The callback process time also might be varied from
> one callback to another.
> 
> If you have many *_hurry() calls we end in the same situation. Apart

I plan to resolve that by putting only a call_rcu_hurry(wakeme_after_rcu)
at the front of the list.

> of that we also have !CONFIG_RCU_NOCB_CPU path that is also covered
> by the patch that is in question.

I don't see why the above approach doesn't work for
!CONFIG_RCU_NOCB_CPU, but maybe I am missing something here.

Do you have a benchmark I can try out to see if my diff can achieve a
similar result? Thanks!

Regards,
Boqun

> 
> --
> Uladzislau Rezki
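
To make the "insert at the head of the NEXT segment" idea from the diff
above easier to follow, here is a minimal user-space sketch. It is only a
model under stated assumptions: the names seglist, cb, seglist_enqueue()
and seglist_nth_id() are hypothetical, there is no locking and no
READ_ONCE()/WRITE_ONCE(), and only the tail-pointer manipulation of the
proposal is reproduced.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Segment indices, in the same order rcu_segcblist uses. */
enum { SEG_DONE, SEG_WAIT, SEG_NEXT_READY, SEG_NEXT, NSEGS };

struct cb {
	struct cb *next;
	int id;		/* hypothetical tag, used only to inspect ordering */
};

struct seglist {
	struct cb *head;
	struct cb **tails[NSEGS];	/* tails[i]: ->next slot that ends segment i */
	long seglen[NSEGS];
};

static void seglist_init(struct seglist *sl)
{
	sl->head = NULL;
	for (int i = 0; i < NSEGS; i++) {
		sl->tails[i] = &sl->head;
		sl->seglen[i] = 0;
	}
}

/*
 * Model of the proposed enqueue: a non-lazy ("hurry") callback is linked
 * right after the NEXT_READY tail, i.e. at the front of the NEXT segment.
 * If the NEXT segment was empty, both tails are the same slot, so the
 * ordinary tail append is used and tails[SEG_NEXT] stays correct.
 */
static void seglist_enqueue(struct seglist *sl, struct cb *cb, bool is_lazy)
{
	assert(cb != NULL);
	sl->seglen[SEG_NEXT]++;
	if (!is_lazy && sl->seglen[SEG_NEXT] > 1) {
		cb->next = *sl->tails[SEG_NEXT_READY];
		*sl->tails[SEG_NEXT_READY] = cb;
	} else {
		cb->next = NULL;
		*sl->tails[SEG_NEXT] = cb;
		sl->tails[SEG_NEXT] = &cb->next;
	}
}

/* Return the id of the n-th callback (0-based), or -1 if there is none. */
static int seglist_nth_id(const struct seglist *sl, int n)
{
	const struct cb *cb = sl->head;

	while (cb && n-- > 0)
		cb = cb->next;
	return cb ? cb->id : -1;
}
```

Queueing lazy callbacks 1 and 2, then a hurry callback 3, then a lazy
callback 4 gives the order 3, 1, 2, 4 in this model. Two hurry callbacks
queued back to back end up in reverse order, which is the "many *_hurry()
calls" concern raised in the thread.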
  
Uladzislau Rezki Nov. 1, 2023, 10:33 a.m. UTC | #8
On Sun, Oct 29, 2023 at 11:21:11AM -0700, Boqun Feng wrote:
> On Thu, Oct 26, 2023 at 03:09:02PM +0200, Uladzislau Rezki wrote:
> [...]
> > > Late to the party, but I kinda wonder whether we can resolve it by:
> > > 
> > > 1) either introduce a separate seglist that only contains callbacks
> > > queued by call_rcu_hurry(), and whenever after an GP and callbacks are
> > > ready, call_rcu_hurry() callbacks will be called first.
> > > 
> > > 2) or make call_rcu_hurry() callbacks always inserted at the head of the
> > > NEXT list instead of the tail, e.g. (untested code):
> > > 
> > > > [...]
> 
> Surprisingly, this survives from a whole rcutorture run ;-)
> 
> > > Sure, there may be some corner cases I'm missing, but I think overall
> > > this is better than (sorta) duplicating the logic of seglist (the llist
> > > in sr_normal_state) or the logic of wake_rcu_gp()
> > > (synchronize_rcu_normal).
> > > 
> > > Anyway, these are just if-you-have-time-to-try options ;-)
> > > 
> > Hm.. You still mix callbacks and there is a dependency in order
> > of execution. The callback process time also might be varied from
> > one callback to another.
> > 
> > If you have many *_hurry() calls we end in the same situation. Apart
> 
> I plan to resolve that by only puting a call_rcu_hurry(wakeme_after_gp)
> in the front of the list.
> 
> > of that we also have !CONFIG_RCU_NOCB_CPU path that is also covered
> > by the patch that is in question.
> 
> I don't see why the above approach doesn't work for
> !CONFIG_RCU_NOCB_CPU, but I maybe miss something here.
> 
Basically it does not work, because you do not fix the mixing "issue".
I have been working on it and we agreed to separate it, because it
just makes sense. The reason and the problem I see are described in
the commit message of v2.

>
> Do you have a benchmark I can try out to see if my diff can achieve the
> similar result? Thanks!
> 
There is no good benchmark, but you can certainly write one. I tested
three scenarios:

- Running a camera app on our Android devices and measuring the app
  launch time in milliseconds;
- Doing synchronize_rcu() and kfree(ptr) simultaneously from 10K/etc
  workers. It is an important test case because we fall back to this
  scenario in our kvfree_rcu_mightsleep() API;
- Measuring the time delta of loading 100 kernel modules.

Those were my main test cases.

--
Uladzislau Rezki
  
Uladzislau Rezki Nov. 1, 2023, 3:35 p.m. UTC | #9
On Wed, Nov 01, 2023 at 11:33:35AM +0100, Uladzislau Rezki wrote:
[...]
> There is no a good benchmark. But you can write it for sure. I tested
> three scenarios:
> 
> - Run a camera app on our Android devices. Measuring app launch in
>   milliseconds;
> - Doing synchronize_rcu() and kfree(ptr) simultaneously by 10K/etc
>   workers. It is important test case because we have a fallback to
>   this scenario for our kvfree_rcu_mightsleep() API.
> - I had a look at time delta of loading 100 kernel modules.
> 
> That were my main test cases.
> 
I will provide the patches and test steps so you can try them out.
I will send them tomorrow!

--
Uladzislau Rezki
  
Boqun Feng Nov. 2, 2023, 1:04 a.m. UTC | #10
On Wed, Nov 01, 2023 at 04:35:05PM +0100, Uladzislau Rezki wrote:
[...]
> > Basically it does not work, because you do not fix the mixing "issue".
> > I have been working on it and we agreed to separate it. Because it is
> > just makes sense. The reason and the problem i see, i described in the
> > commit message of v2.

As I understand it, your point is "if we want synchronize_rcu() to be
faster in all cases, then a separate list makes a lot of sense since it
won't be affected by different configs, etc.", right? If so, then
understood.

I have no problem with that: your patch does a good job of making
synchronize_rcu() faster, and actually I don't think my proposal
necessarily blocks your patch. However, I wonder: why is
synchronize_rcu() more special than other call_rcu_hurry() cases? Sure,
synchronize_rcu() means blocking, and unblocking earlier is always
desirable, but call_rcu_hurry() could also queue a callback that wakes
up another thread, right? By making synchronize_rcu() faster, do we end
up making other call_rcu_hurry() users slower? So in my proposal, as you
can see, I tried to be fair among all call_rcu_hurry() users, and I hope
that's a better solution from the whole-kernel viewpoint.

Also, another fear I have is the following story:

(Let's say your improvement gets merged. And 6 months later)

Someone shows up and says "hi, the synchronize_rcu() latency reduction
work is great, but we have 1024 CPUs, so the single list in
sr_normal_state becomes a bottleneck: synchronize_rcu() on some CPUs
gets delayed by other CPUs' synchronize_rcu(). Can we make the list
percpu?"

(And 6 months later)

Someone shows up and says "hi, the percpu list for low-latency
synchronize_rcu() is great, however, we want to save some CPU power by
putting CPUs into groups and naming one leader of each group to handle
synchronize_rcu() wakeups for the whole group, let's use kconfig
CONFIG_RCU_NOSR_CPU to control that feature"

Well, it's a story, so it may not happen, but I don't think the
possibility of totally re-inventing RCU callback lists and NOCB is 0 ;-)

Anyway, I should stop being annoying here. I will use your test steps to
check my idea, and will let you know!

> > 
> > >
> > > Do you have a benchmark I can try out to see if my diff can achieve the
> > > similar result? Thanks!
> > > 
> > There is no a good benchmark. But you can write it for sure. I tested
> > three scenarios:
> > 
> > - Run a camera app on our Android devices. Measuring app launch in
> >   milliseconds;
> > - Doing synchronize_rcu() and kfree(ptr) simultaneously by 10K/etc
> >   workers. It is important test case because we have a fallback to
> >   this scenario for our kvfree_rcu_mightsleep() API.
> > - I had a look at time delta of loading 100 kernel modules.
> > 
> > That were my main test cases.
> > 
> I will provide the patches and test steps, so you can try on.
> Tomorrow i will send it!
> 

Thanks!

Regards,
Boqun

> --
> Uladzislau Rezki
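
For readers following the sr_normal_state discussion, the three-list flow
of the patch under review (curr -> wait -> done, with a bounded number of
waiters woken directly at the end of a grace period) can be modelled in
user space roughly as below. This is a sketch under assumptions, not the
kernel code: plain single-threaded lists stand in for llist, a "completed"
flag stands in for complete(), and the names sr_state, req and the sr_*()
helpers are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_WAKE_FROM_GP 5	/* mirrors MAX_SR_WAKE_FROM_GP in the patch */

struct req {
	struct req *next;
	bool completed;		/* stands in for complete(&rs->completion) */
};

struct sr_state {
	struct req *curr;	/* new requests, newest first */
	struct req *wait;	/* waiting for the current grace period */
	struct req *done;	/* grace period passed, wakeup deferred */
};

/* A synchronize_rcu() caller queues itself on the curr list. */
static void sr_add_req(struct sr_state *sr, struct req *r)
{
	r->completed = false;
	r->next = sr->curr;
	sr->curr = r;
}

/* Grace-period start: move curr to wait, oldest request first. */
static void sr_gp_init(struct sr_state *sr)
{
	struct req *r = sr->curr, *rev = NULL;

	sr->curr = NULL;
	while (r) {
		struct req *next = r->next;

		r->next = rev;
		rev = r;
		r = next;
	}
	if (!rev)
		return;
	assert(sr->wait == NULL);	/* previous cleanup must have drained it */
	sr->wait = rev;
}

/* Grace-period end: wake a few waiters directly, defer the rest. */
static int sr_gp_cleanup(struct sr_state *sr)
{
	struct req *pos = sr->wait;
	int woken = 0;

	sr->wait = NULL;
	while (pos) {
		struct req *next = pos->next;

		pos->completed = true;
		pos = next;
		if (++woken == MAX_WAKE_FROM_GP) {
			/* Exactly one left: finish it here as well. */
			if (pos && !pos->next)
				continue;
			break;
		}
	}
	if (pos) {
		/* Hand the remainder over to the deferred worker. */
		struct req *tail = pos;

		while (tail->next)
			tail = tail->next;
		tail->next = sr->done;
		sr->done = pos;
	}
	return woken;
}

/* Deferred worker: wake everything left on the done list. */
static int sr_cleanup_work(struct sr_state *sr)
{
	struct req *pos = sr->done;
	int woken = 0;

	sr->done = NULL;
	while (pos) {
		struct req *next = pos->next;

		pos->completed = true;
		woken++;
		pos = next;
	}
	return woken;
}
```

With eight queued requests, sr_gp_cleanup() wakes the five oldest and
leaves three for sr_cleanup_work(). With six, all six are woken directly,
which matches the "might be +1 extra" remark in the patch comment.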
  
Uladzislau Rezki Nov. 2, 2023, 12:35 p.m. UTC | #11
On Wed, Nov 01, 2023 at 06:04:40PM -0700, Boqun Feng wrote:
> On Wed, Nov 01, 2023 at 04:35:05PM +0100, Uladzislau Rezki wrote:
> [...]
> > > Basically it does not work, because you do not fix the mixing "issue".
> > > I have been working on it and we agreed to separate it. Because it is
> > > just makes sense. The reason and the problem i see, i described in the
> > > commit message of v2.
> 
> As I understand it, your point is "if we want synchronize_rcu() faster
> in all the cases, then a separate list makes a lot of sense since it
> won't be affected by different configs and etc.", right? If so, then
> understood.
> 
Indeed, that was the reason. Since the call blocks, a waiter cannot
make progress until it is done!

> I don't have any problem on that your patch does a good work on making
> synchronize_rcu() faster, and actually I don't think my proposal
> necessarily blocks your patch. However, I wonder: why synchronize_rcu()
> is more special than other call_rcu_hurry() cases? Sure,
> synchronize_rcu() means blocking and unblocking ealier is always
> desirable, but call_rcu_hurry() could also queue a callback that wake
> up other thread, right? By making synchronize_rcu() faster, do we end up
> making other call_rcu_hurry() slow? So in my proposal, as you can see,
> I tried to be fair among all call_rcu_hurry() users, and hope that's
> a better solution from the whole kernel viewpoint.
> 
The sync call blocks the caller, as you noted. I agree with you that
your proposal does not block my work, and that is good; otherwise it
would be pointless to do one thing separately. As for your proposal,
hurry callbacks are also important for the LAZY configuration, because
for !LAZY all of them are hurry.

For LAZY it makes sense to me to start by executing the ones which are
hurry! It would be good to analyze it more.

So I agree with you.

> Also another fear I have is the following story:
> 
> (Let say your improvement gets merged. And 6 months later)
> 
> Someone shows up and say "hi, the synchronize_rcu() latency reduce work
> is great, but we have 1024 CPUs, so the single list in sr_normal_state
> becomes a bottleneck, synchronize_rcu() on some CPUs gets delayed by
> other CPU's synchronize_rcu(), can we make the list percpu?"
> 
> (And 6 months later)
> 
> Someone shows up and say "hi, the percpu list for low latency
> synchronize_rcu() is great, however, we want to save some CPU power by
> putting CPUs into groups and naming one leader of each group to handle
> synchronize_rcu() wakeups for the whole group, let's use kconfig
> CONFIG_RCU_NOSR_CPU to control that feature"
> 
> Well, it's a story, so it may not happen, but I don't think the
> possiblity of totally re-inventing RCU callback lists and NOCB is 0 ;-)
> 
I have tested 60K doing synchronize_rcu() and it still wins. That is
just data. But to be honest, I do not expect such a scenario to ever
happen. I do not want to speculate here.

>
> Anyway, I should stop being annoying here, I will use your test steps to
> check my idea, and will let you know!
>
Well, no problem. Let's focus on testing :)

1. The first test case is to check the kvfree_rcu_mightsleep() slow path and improve it:

<snip>
...
	/*
	 * Inline kvfree() after synchronize_rcu(). We can do
	 * it from might_sleep() context only, so the current
	 * CPU can pass the QS state.
	 */
	if (!success) {
		debug_rcu_head_unqueue((struct rcu_head *) ptr);
		synchronize_rcu();
		kvfree(ptr);
	}
}
EXPORT_SYMBOL_GPL(kvfree_call_rcu);
<snip>

as we might hit it under a low-memory condition. Here are the steps I use to test it.

Apply the patch below:

<snip>
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 189975f57e78..92a5cf6c0441 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3492,8 +3492,11 @@ void kvfree_call_rcu(struct rcu_head *head, void *ptr)
 	 * only. For other places please embed an rcu_head to
 	 * your data.
 	 */
-	if (!head)
+	if (!head) {
 		might_sleep();
+		success = false;
+		goto sync_kvfree;
+	}
 
 	// Queue the object but don't yet schedule the batch.
 	if (debug_rcu_head_queue(ptr)) {
@@ -3531,6 +3534,7 @@ void kvfree_call_rcu(struct rcu_head *head, void *ptr)
 unlock_return:
 	krc_this_cpu_unlock(krcp, flags);
 
+sync_kvfree:
 	/*
 	 * Inline kvfree() after synchronize_rcu(). We can do
 	 * it from might_sleep() context only, so the current
<snip>

so that we always fall back to the slow path. To test it I use the test_vmalloc.sh
test suite, as it contains single/double argument tests. It is located in
./coding/linux.git/tools/testing/selftests/vm/.

<snip>
urezki@pc638:~$ sudo ./test_vmalloc.sh
...
parm:           use_huge:Use vmalloc_huge in fix_size_alloc_test (bool)
parm:           run_test_mask:Set tests specified in the mask.

                id: 1,    name: fix_size_alloc_test
                id: 2,    name: full_fit_alloc_test
                id: 4,    name: long_busy_list_alloc_test
                id: 8,    name: random_size_alloc_test
                id: 16,   name: fix_align_alloc_test
                id: 32,   name: random_size_align_alloc_test
                id: 64,   name: align_shift_alloc_test
                id: 128,  name: pcpu_alloc_test
                id: 256,  name: kvfree_rcu_1_arg_vmalloc_test
                id: 512,  name: kvfree_rcu_2_arg_vmalloc_test
                id: 1024, name: vm_map_ram_test
...
<snip>

256 is what we need, so run it. Example:

sudo ./test_vmalloc.sh run_test_mask=256 nr_threads=20 test_loop_count=1000

Here run_test_mask selects only our test case, nr_threads is the number of
workers doing single-argument kvfree_rcu, and each worker does it 1000 times.

Once it has completed, run "dmesg" to see the results. You will see
how many CPU cycles the test took.

2. You can also measure the latency/duration of synchronize_rcu() using ftrace:

<snip>
root@pc638:/sys/kernel/debug/tracing# cat available_tracers 
blk mmiotrace function_graph function nop
root@pc638:/sys/kernel/debug/tracing# echo function_graph > current_tracer 
root@pc638:/sys/kernel/debug/tracing# cat current_tracer 
function_graph
root@pc638:/sys/kernel/debug/tracing# echo synchronize_rcu_normal > ./set_ftrace_filter 
root@pc638:/sys/kernel/debug/tracing# cat set_ftrace_filter 
synchronize_rcu_normal
root@pc638:/sys/kernel/debug/tracing# echo synchronize_rcu_normal > set_graph_function
root@pc638:/sys/kernel/debug/tracing#
<snip>

Run the above test to see the durations. You will get a lot of data, which
I usually plot for visualization.

3. I also test loading kernel modules, the first 100:

<snip>
urezki@pc638:~$ cat load_modules.sh
#!/bin/bash

# Exclude test_vmalloc.ko
MODULES_LIST=(`find /lib/modules/$(uname -r) -type f \
        \( -iname "*.ko" -not -iname "test_vmalloc*" \) | awk -F"/" '{print $NF}' | sed 's/.ko//'`)

function moduleExist(){
        MODULE="$1"
        if lsmod | grep "$MODULE" &> /dev/null ; then
                return 0
        else
                return 1
        fi
}

i=0

for module_name in ${MODULES_LIST[@]}; do
        sudo modprobe $module_name

        if moduleExist ${module_name}; then
                ((i=i+1))
        fi

        if [ $i -eq 100 ]; then
                echo "Done 100 loading!"
                exit 0
        fi
done
urezki@pc638:~$
<snip>

Run "time ./load_modules.sh" to compare. I run it under qemu, and I usually run Debian there.

4. The camera use case is something I also check, since its launch time is
important for the end user. I think you need some extra setup to perform it, so I skip it.

--
Uladzislau Rezki
  

Patch

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 78554e7181dd..63e46a8f3e60 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -1384,6 +1384,121 @@  static void rcu_poll_gp_seq_end_unlocked(unsigned long *snap)
 		raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 }
 
+/*
+ * There are three lists for handling synchronize_rcu() users.
+ * A first list corresponds to new coming users, second for users
+ * which wait for a grace period and third is for which a grace
+ * period is passed.
+ */
+static struct sr_normal_state {
+	struct llist_head curr;	/* request a GP users. */
+	struct llist_head wait;	/* wait for GP users. */
+	struct llist_head done;	/* ready for GP users. */
+
+	/*
+	 * In order to quickly add a batch of nodes to already
+	 * existing list, which is done, a tail is maintained.
+	 */
+	struct llist_node *wait_tail;
+} sr;
+
+/* Disabled by default. */
+static int rcu_normal_wake_from_gp;
+module_param(rcu_normal_wake_from_gp, int, 0644);
+
+static void rcu_sr_normal_complete(struct llist_node *node)
+{
+	struct rcu_synchronize *rs = container_of(
+		(struct rcu_head *) node, struct rcu_synchronize, head);
+	unsigned long oldstate = (unsigned long) rs->head.func;
+
+	WARN_ONCE(!poll_state_synchronize_rcu(oldstate),
+		"A full grace period is not passed yet: %lu",
+		rcu_seq_diff(get_state_synchronize_rcu(), oldstate));
+
+	/* Finally. */
+	complete(&rs->completion);
+}
+
+static void rcu_sr_normal_gp_cleanup_work(struct work_struct *work)
+{
+	struct llist_node *done, *rcu, *next;
+
+	done = llist_del_all(&sr.done);
+	if (!done)
+		return;
+
+	llist_for_each_safe(rcu, next, done)
+		rcu_sr_normal_complete(rcu);
+}
+static DECLARE_WORK(sr_normal_gp_cleanup, rcu_sr_normal_gp_cleanup_work);
+
+/*
+ * This is hard-coded and it is a maximum number of
+ * synchronize_rcu() users(might be +1 extra), which
+ * are awaken directly by the rcu_gp_kthread(). The
+ * reset is deferred to a dedicated worker.
+ */
+#define MAX_SR_WAKE_FROM_GP 5
+
+/*
+ * Helper function for rcu_gp_cleanup().
+ */
+static void rcu_sr_normal_gp_cleanup(void)
+{
+	struct llist_node *head, *tail, *pos;
+	int i = 0;
+
+	tail = READ_ONCE(sr.wait_tail);
+	head = llist_del_all(&sr.wait);
+
+	llist_for_each_safe(pos, head, head) {
+		rcu_sr_normal_complete(pos);
+
+		if (++i == MAX_SR_WAKE_FROM_GP) {
+			/* If last, process it also. */
+			if (head && !head->next)
+				continue;
+			break;
+		}
+	}
+
+	if (head) {
+		/* Can be not empty. */
+		llist_add_batch(head, tail, &sr.done);
+		queue_work(system_highpri_wq, &sr_normal_gp_cleanup);
+	}
+}
+
+/*
+ * Helper function for rcu_gp_init().
+ */
+static void rcu_sr_normal_gp_init(void)
+{
+	struct llist_node *head, *tail;
+
+	tail = llist_del_all(&sr.curr);
+	head = llist_reverse_order(tail);
+
+	if (!head)
+		return;
+
+	/*
+	 * A waiting list of GP should be empty on this step,
+	 * since a GP-kthread, rcu_gp_init() -> gp_cleanup(),
+	 * rolls it over. If not, it is a BUG, warn a user.
+	 */
+	WARN_ON_ONCE(!llist_empty(&sr.wait));
+
+	WRITE_ONCE(sr.wait_tail, tail);
+	llist_add_batch(head, tail, &sr.wait);
+}
+
+static void rcu_sr_normal_add_req(struct rcu_synchronize *rs)
+{
+	llist_add((struct llist_node *) &rs->head, &sr.curr);
+}
+
 /*
  * Initialize a new grace period.  Return false if no grace period required.
  */
@@ -1418,6 +1533,7 @@  static noinline_for_stack bool rcu_gp_init(void)
 	/* Record GP times before starting GP, hence rcu_seq_start(). */
 	rcu_seq_start(&rcu_state.gp_seq);
 	ASSERT_EXCLUSIVE_WRITER(rcu_state.gp_seq);
+	rcu_sr_normal_gp_init();
 	trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq, TPS("start"));
 	rcu_poll_gp_seq_start(&rcu_state.gp_seq_polled_snap);
 	raw_spin_unlock_irq_rcu_node(rnp);
@@ -1787,6 +1903,9 @@  static noinline void rcu_gp_cleanup(void)
 	}
 	raw_spin_unlock_irq_rcu_node(rnp);
 
+	// Make synchronize_rcu() users aware of the end of old grace period.
+	rcu_sr_normal_gp_cleanup();
+
 	// If strict, make all CPUs aware of the end of the old grace period.
 	if (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD))
 		on_each_cpu(rcu_strict_gp_boundary, NULL, 0);
@@ -3500,6 +3619,35 @@  static int rcu_blocking_is_gp(void)
 	return true;
 }
 
+/*
+ * Helper function for the synchronize_rcu() API.
+ */
+static void synchronize_rcu_normal(void)
+{
+	struct rcu_synchronize rs;
+
+	if (READ_ONCE(rcu_normal_wake_from_gp)) {
+		init_rcu_head_on_stack(&rs.head);
+		init_completion(&rs.completion);
+
+		/*
+		 * This code might be preempted, therefore take a GP
+		 * snapshot before adding a request.
+		 */
+		rs.head.func = (void *) get_state_synchronize_rcu();
+		rcu_sr_normal_add_req(&rs);
+
+		/* Kick a GP and start waiting. */
+		(void) start_poll_synchronize_rcu();
+
+		/* Now we can wait. */
+		wait_for_completion(&rs.completion);
+		destroy_rcu_head_on_stack(&rs.head);
+	} else {
+		wait_rcu_gp(call_rcu_hurry);
+	}
+}
+
 /**
  * synchronize_rcu - wait until a grace period has elapsed.
  *
@@ -3551,7 +3699,7 @@  void synchronize_rcu(void)
 		if (rcu_gp_is_expedited())
 			synchronize_rcu_expedited();
 		else
-			wait_rcu_gp(call_rcu_hurry);
+			synchronize_rcu_normal();
 		return;
 	}
 
diff --git a/kernel/rcu/tree_exp.h b/kernel/rcu/tree_exp.h
index 6d7cea5d591f..279a37beb05a 100644
--- a/kernel/rcu/tree_exp.h
+++ b/kernel/rcu/tree_exp.h
@@ -987,7 +987,7 @@  void synchronize_rcu_expedited(void)
 
 	/* If expedited grace periods are prohibited, fall back to normal. */
 	if (rcu_gp_is_normal()) {
-		wait_rcu_gp(call_rcu_hurry);
+		synchronize_rcu_normal();
 		return;
 	}