[1/1] Reduce synchronize_rcu() waiting time

Message ID 20230321102748.127923-1-urezki@gmail.com
State New
Series [1/1] Reduce synchronize_rcu() waiting time

Commit Message

Uladzislau Rezki March 21, 2023, 10:27 a.m. UTC
  A call to synchronize_rcu() can be expensive from a latency point of
view. Different workloads can be affected by this, especially the ones
which use this API in their time-critical sections.

For example, in the NOCB scenario the wakeme_after_rcu() callback
invocation depends on where in the nocb-list it is located. Below is an
example where it was the last out of ~3600 callbacks:

<snip>
  <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt CBs=3613 bl=28
...
  <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
  <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
  <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
  <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
  <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
  <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
  <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-invoked=3612 idle=....
<snip>
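
The waiter is tied to a callback because synchronize_rcu() queues a regular
RCU callback and sleeps on a completion until that callback is invoked. A
simplified sketch of that path (not the exact kernel code, for illustration
only):

<snip>
struct rcu_synchronize {
	struct rcu_head head;
	struct completion completion;
};

static void wakeme_after_rcu(struct rcu_head *head)
{
	struct rcu_synchronize *rs = container_of(head, struct rcu_synchronize, head);

	/* Runs only when its turn in the callback list comes. */
	complete(&rs->completion);
}

void synchronize_rcu(void)
{
	struct rcu_synchronize rs;

	init_completion(&rs.completion);

	/* Queued behind whatever callbacks are already on the (nocb) list. */
	call_rcu(&rs.head, wakeme_after_rcu);

	/* Sleeps until wakeme_after_rcu() above has run. */
	wait_for_completion(&rs.completion);
}
<snip>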

As for performance results, I will provide them once the patch is ready.

Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
---
 kernel/rcu/tree.c | 104 +++++++++++++++++++++++++++++++++++++++++++++-
 kernel/rcu/tree.h |   9 ++++
 2 files changed, 112 insertions(+), 1 deletion(-)
  

Comments

Qiuxu Zhuo March 21, 2023, 2:03 p.m. UTC | #1
> From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> Sent: Tuesday, March 21, 2023 6:28 PM
> [...]
> Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> 
> A call to a synchronize_rcu() can be expensive from time point of view.
> Different workloads can be affected by this especially the ones which use this
> API in its time critical sections.
> 

This is interesting and meaningful research. ;-)

> For example in case of NOCB scenario the wakeme_after_rcu() callback
> invocation depends on where in a nocb-list it is located. Below is an example
> when it was the last out of ~3600 callbacks:
> 
> <snip>
>   <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
> CBs=3613 bl=28
> ...
>   <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
> rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
>   <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
> rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
>   <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
> rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
>   <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
> rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
>   <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
> rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
>   <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
> rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
>   <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
> invoked=3612 idle=....
> <snip>
>

Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?

If possible, may I know the steps, commands, and related parameters to produce the results above?
Thank you!

- Qiuxu
 
> As performance results, i will provide it once we are ready with a patch.
> 
> Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
> ---
>  kernel/rcu/tree.c | 104
> +++++++++++++++++++++++++++++++++++++++++++++-
[...]
  
Uladzislau Rezki March 21, 2023, 3:15 p.m. UTC | #2
On Tue, Mar 21, 2023 at 02:03:19PM +0000, Zhuo, Qiuxu wrote:
> > From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > Sent: Tuesday, March 21, 2023 6:28 PM
> > [...]
> > Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > 
> > A call to a synchronize_rcu() can be expensive from time point of view.
> > Different workloads can be affected by this especially the ones which use this
> > API in its time critical sections.
> > 
> 
> This is interesting and meaningful research. ;-)
> 
> > For example in case of NOCB scenario the wakeme_after_rcu() callback
> > invocation depends on where in a nocb-list it is located. Below is an example
> > when it was the last out of ~3600 callbacks:
> > 
> > <snip>
> >   <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
> > CBs=3613 bl=28
> > ...
> >   <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
> > rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
> >   <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
> > rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
> >   <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
> > rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
> >   <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
> > rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
> >   <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
> > rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
> >   <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
> > rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
> >   <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
> > invoked=3612 idle=....
> > <snip>
> >
> 
> Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?
> 
Yes.
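That window is ~7.3 ms (21950.152625 - 21950.145313 ≈ 7312 usec) to invoke
~3600 callbacks, and the wakeme_after_rcu() callback in question only runs
near the very end of it.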

>
> If possible, may I know the steps, commands, and related parameters to produce the results above?
> Thank you!
> 
Build the kernel with the CONFIG_RCU_TRACE option enabled. Update your "set_event"
file with the appropriate trace events:

<snip>
XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event

XQ-DQ54:/sys/kernel/tracing # cat set_event
rcu:rcu_batch_start
rcu:rcu_invoke_callback
rcu:rcu_batch_end
XQ-DQ54:/sys/kernel/tracing #
<snip>

Collect as many traces as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
The next problem is how to parse them. Of course you will not be able to parse
megabytes of traces by hand. For that purpose I use a special C trace parser.
If you need an example, please let me know and I can show it here.
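
(The collected trace can be dumped to a file for offline parsing, for example
with "cat trace > app_launch_rcu_trace.txt" from the same tracing directory;
the file name is arbitrary, it just matches the one used further below.)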

--
Uladzislau Rezki
  
Qiuxu Zhuo March 22, 2023, 1:49 a.m. UTC | #3
Hi Rezki,

> From: Uladzislau Rezki <urezki@gmail.com>
> Sent: Tuesday, March 21, 2023 11:16 PM
> [...]
> >
> >
> > If possible, may I know the steps, commands, and related parameters to
> produce the results above?
> > Thank you!
> >
> Build the kernel with CONFIG_RCU_TRACE configuration. Update your
> "set_event"
> file with appropriate traces:
> 
> <snip>
> XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end
> rcu:rcu_invoke_callback > set_event
> 
> XQ-DQ54:/sys/kernel/tracing # cat set_event rcu:rcu_batch_start
> rcu:rcu_invoke_callback rcu:rcu_batch_end XQ-DQ54:/sys/kernel/tracing #
> <snip>
> 
> Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 >
> tracing_on; sleep 10; echo 0 > tracing_on Next problem is how to parse it. Of

Thanks for the detailed steps to collect the related testing results.

> course you will not be able to parse megabytes of traces. For that purpose i
> use a special C trace parser.
> If you need an example please let me know i can show here.
>

Yes, your example parser should save me from a huge amount of traces. ;-)
Thanks so much for your sharing.

- Qiuxu

> --
> Uladzislau Rezki
  
Uladzislau Rezki March 22, 2023, 6:50 a.m. UTC | #4
On Wed, Mar 22, 2023 at 01:49:50AM +0000, Zhuo, Qiuxu wrote:
> Hi Rezki,
> 
> > From: Uladzislau Rezki <urezki@gmail.com>
> > Sent: Tuesday, March 21, 2023 11:16 PM
> > [...]
> > >
> > >
> > > If possible, may I know the steps, commands, and related parameters to
> > produce the results above?
> > > Thank you!
> > >
> > Build the kernel with CONFIG_RCU_TRACE configuration. Update your
> > "set_event"
> > file with appropriate traces:
> > 
> > <snip>
> > XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end
> > rcu:rcu_invoke_callback > set_event
> > 
> > XQ-DQ54:/sys/kernel/tracing # cat set_event rcu:rcu_batch_start
> > rcu:rcu_invoke_callback rcu:rcu_batch_end XQ-DQ54:/sys/kernel/tracing #
> > <snip>
> > 
> > Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 >
> > tracing_on; sleep 10; echo 0 > tracing_on Next problem is how to parse it. Of
> 
> Thanks for the detailed steps to collect the related testing results.
> 
> > course you will not be able to parse megabytes of traces. For that purpose i
> > use a special C trace parser.
> > If you need an example please let me know i can show here.
> >
> 
> Yes, your example parser should save me from a huge amount of traces. ;-)
> Thanks so much for your sharing.
> 
See the C program below. It is ugly, but it does what you need. Please have
a look at the main() function:

<snip>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <ctype.h>
#include <string.h>

/*
 * Splay-tree implementation to store data: key,value
 * See https://en.wikipedia.org/wiki/Splay_tree
 */
#define offsetof(TYPE, MEMBER)	((unsigned long)&((TYPE *)0)->MEMBER)
#define container_of(ptr, type, member)			\
({												\
	void *__mptr = (void *)(ptr);					\
	((type *)(__mptr - offsetof(type, member)));	\
})

#define SP_INIT_NODE(node)									\
	((node)->left = (node)->right = (node)->parent = NULL)

struct splay_node {
	struct splay_node *left;
	struct splay_node *right;
	struct splay_node *parent;
	unsigned long val;
};

struct splay_root {
	struct splay_node *sp_root;
};

static inline void
set_parent(struct splay_node *n, struct splay_node *p)
{
	if (n)
		n->parent = p;
}

static inline void
change_child(struct splay_node *p,
	struct splay_node *old, struct splay_node *new)
{
	if (p) {
		if (p->left == old)
			p->left = new;
		else
			p->right = new;
	}
}

/*
 * left rotation of node (r), (rc) is (r)'s right child
 */
static inline struct splay_node *
left_pivot(struct splay_node *r)
{
	struct splay_node *rc;

	/*
	 * set (rc) to be the new root
	 */
	rc = r->right;

	/*
	 * point parent to new left/right child
	 */
	rc->parent = r->parent;

	/*
	 * change child of the p->parent.
	 */
	change_child(r->parent, r, rc);

	/*
	 * set (r)'s right child to be (rc)'s left child
	 */
	r->right = rc->left;

	/*
	 * change parent of rc's left child
	 */
	set_parent(rc->left, r);

	/*
	 * set new parent of rotated node
	 */
	r->parent = rc;

	/*
	 * set (rc)'s left child to be (r)
	 */
	rc->left = r;

	/*
	 * return the new root
	 */
	return rc;
}

/*
 * right rotation of node (r), (lc) is (r)'s left child
 */
static inline struct splay_node *
right_pivot(struct splay_node *r)
{
	struct splay_node *lc;

	/*
	 * set (lc) to be the new root
	 */
	lc = r->left;

	/*
	 * point parent to new left/right child
	 */
	lc->parent = r->parent;

	/*
	 * change child of the p->parent.
	 */
	change_child(r->parent, r, lc);

	/*
	 * set (r)'s left child to be (lc)'s right child
	 */
	r->left = lc->right;

	/*
	 * change parent of lc's right child
	 */
	set_parent(lc->right, r);

	/*
	 * set new parent of rotated node
	 */
	r->parent = lc;

	/*
	 * set (lc)'s right child to be (r)
	 */
	lc->right = r;

	/*
	 * return the new root
	 */
	return lc;
}

static struct splay_node *
top_down_splay(unsigned long vstart,
	struct splay_node *root, struct splay_root *sp_root)
{
	/*
	 * During the splitting process two temporary trees are formed.
	 * "l" contains all keys less than the search key/vstart and "r"
	 * contains all keys greater than the search key/vstart.
	 */
	struct splay_node head, *ltree_max, *rtree_max;
	struct splay_node *ltree_prev, *rtree_prev;

	if (root == NULL)
		return NULL;

	SP_INIT_NODE(&head);
	ltree_max = rtree_max = &head;
	ltree_prev = rtree_prev = NULL;

	while (1) {
		if (vstart < root->val && root->left) {
			if (vstart < root->left->val) {
				root = right_pivot(root);

				if (root->left == NULL)
					break;
			}

			/*
			 * Build right subtree.
			 */
			rtree_max->left = root;
			rtree_max->left->parent = rtree_prev;
			rtree_max = rtree_max->left;
			rtree_prev = root;
			root = root->left;
		} else if (vstart > root->val && root->right) {
			if (vstart > root->right->val) {
				root = left_pivot(root);

				if (root->right == NULL)
					break;
			}

			/*
			 * Build left subtree.
			 */
			ltree_max->right = root;
			ltree_max->right->parent = ltree_prev;
			ltree_max = ltree_max->right;
			ltree_prev = root;
			root = root->right;
		} else {
			break;
		}
	}

	/*
	 * Assemble the tree.
	 */
	ltree_max->right = root->left;
	rtree_max->left = root->right;
	root->left = head.right;
	root->right = head.left;

	set_parent(ltree_max->right, ltree_max);
	set_parent(rtree_max->left, rtree_max);
	set_parent(root->left, root);
	set_parent(root->right, root);
	root->parent = NULL;

	/*
	 * Set new root. Please note it might be the same.
	 */
	sp_root->sp_root = root;
	return sp_root->sp_root;
}

struct splay_node *
splay_search(unsigned long key, struct splay_root *root)
{
	struct splay_node *n;

	n = top_down_splay(key, root->sp_root, root);
	if (n && n->val == key)
		return n;

	return NULL;
}

static bool
splay_insert(struct splay_node *n, struct splay_root *sp_root)
{
	struct splay_node *r;

	SP_INIT_NODE(n);

	r = top_down_splay(n->val, sp_root->sp_root, sp_root);
	if (r == NULL) {
		/* First element in the tree */
		sp_root->sp_root = n;
		return false;
	}

	if (n->val < r->val) {
		n->left = r->left;
		n->right = r;

		set_parent(r->left, n);
		r->parent = n;
		r->left = NULL;
	} else if (n->val > r->val) {
		n->right = r->right;
		n->left = r;

		set_parent(r->right, n);
		r->parent = n;
		r->right = NULL;
	} else {
		/*
		 * Same, indicate as not success insertion.
		 */
		return false;
	}

	sp_root->sp_root = n;
	return true;
}

static bool
splay_delete_init(struct splay_node *n, struct splay_root *sp_root)
{
	struct splay_node *subtree[2];
	unsigned long val = n->val;

	/* 1. Splay the node to the root. */
	n = top_down_splay(n->val, sp_root->sp_root, sp_root);
	if (n == NULL || n->val != val)
		return false;

	/* 2. Save left/right sub-trees. */
	subtree[0] = n->left;
	subtree[1] = n->right;

	/* 3. Now remove the node. */
	SP_INIT_NODE(n);

	if (subtree[0]) {
		/* 4. Splay the largest node in left sub-tree to the root. */
		top_down_splay(val, subtree[0], sp_root);

		/* 5. Attach the right sub-tree as the right child of the left sub-tree. */
		sp_root->sp_root->right = subtree[1];

		/* 6. Update the parent of right sub-tree */
		set_parent(subtree[1], sp_root->sp_root);
	} else {
		/* 7. Left sub-tree is NULL, just point to right one. */
		sp_root->sp_root = subtree[1];
	}

	/* 8. Set parent of root node to NULL. */
	if (sp_root->sp_root)
		sp_root->sp_root->parent = NULL;

	return true;
}

static FILE *
open_perf_script_file(const char *path)
{
	FILE *f = NULL;

	if (path == NULL)
		goto out;

	f = fopen(path, "r");
	if (!f)
		goto out;

out:
	return f;
}

static int
get_one_line(FILE *file, char *buf, size_t len)
{
	int i = 0;

	memset(buf, '\0', len);

	for (i = 0; i < len - 1; i++) {
		int c = fgetc(file);

		if (c == EOF)
			return EOF;

		if (c == '\n')
			break;

		if (c != '\r')
			buf[i] = c;
	}

	return i;
}

static int
read_string_till_string(char *buf, char *out, size_t out_len, char *in, size_t in_len)
{
	int i, j;

	memset(out, '\0', out_len);

	for (i = 0; i < out_len; i++) {
		if (buf[i] != in[0]) {
			out[i] = buf[i];
			continue;
		}

		for (j = 0; j < in_len; j++) {
			if (buf[i + j] != in[j])
				break;
		}

		/* Found. */
		if (j == in_len)
			return 1;
	}

	return 0;
}

/*
 * find pattern is  "something [003] 8640.034785: something"
 */
static inline void
get_cpu_sec_usec_in_string(const char *s, int *cpu, int *sec, int *usec)
{
	char usec_buf[32] = {'\0'};
	char sec_buf[32] = {'\0'};
	char cpu_buf[32] = {'\0'};
	bool found_sec = false;
	bool found_usec = false;
	bool found_cpu = false;
	int i, j, dot;

	*cpu = *sec = *usec = -1;

	for (i = 0, j = 0; s[i] != '\0'; i++) {
		if (s[i] == '.') {
			dot = i++;

			/* take microseconds */
			for (j = 0; j < sizeof(usec_buf); j++) {
				if (isdigit(s[i])) {
					usec_buf[j] = s[i];
				} else {
					if (s[i] == ':' && j > 0)
						found_usec = true;
					else
						found_usec = false;

					/* Terminate here. */
					break;
				}

				i++;
			}

			if (found_usec) {
				/* roll back */
				while (s[i] != ' ' && i > 0)
					i--;

				/* take seconds */
				for (j = 0; j < sizeof(sec_buf); j++) {
					if (isdigit(s[++i])) {
						sec_buf[j] = s[i];
					} else {
						if (s[i] == '.' && j > 0)
							found_sec = true;
						else
							found_sec = false;
						
						/* Terminate here. */
						break;
					}
				}
			}

			if (found_sec && found_usec) {
				/* roll back */
				while (s[i] != '[' && i > 0)
					i--;

				/* take the cpu number */
				for (j = 0; j < sizeof(cpu_buf); j++) {
					if (isdigit(s[++i])) {
						cpu_buf[j] = s[i];
					} else {
						if (s[i] == ']' && j > 0)
							found_cpu = true;
						else
							found_cpu = false;
						
						/* Terminate here. */
						break;
					}
				}

				if (found_cpu && found_sec && found_usec) {
					*sec = atoi(sec_buf);
					*usec = atoi(usec_buf);
					*cpu = atoi(cpu_buf);
					return;
				}
			}

			/*
			 * Check next dot pattern.
			 */
			found_sec = false;
			found_usec = false;
			found_cpu = false;
			i = dot;
		}
	}
}

/*
 * find pattern is  "something comm=foo android thr1 pid=123 something"
 */
static inline int
get_comm_pid_in_string(const char *buf, char *comm, ssize_t len, int *pid)
{
	char *sc, *sp;
	int rv, i;

	memset(comm, '\0', len);

	sc = strstr(buf, "comm=");
	if (sc)
		sp = strstr(sc, " pid=");

	if (!sc || !sp)
		return -1;

	for (i = 0, sc += 5; sc != sp; i++) {
		if (i < len) {
			if (*sc == ' ')
				comm[i] = '-';
			else
				comm[i] = *sc;

			sc++;
		}
	}

	/* Read pid. */
	rv = sscanf(sp, " pid=%d", pid);
	if (rv != 1)
		return -1;

	return 1;
}

static void
perf_script_softirq_delay(FILE *file, int delay_usec)
{
	char buf[4096] = { '\0' };
	char buf_1[4096] = { '\0' };
	long offset;
	char *s;
	int rv;

	while (1) {
		rv = get_one_line(file, buf, sizeof(buf));
		offset = ftell(file);

		if (rv != EOF) {
			s = strstr(buf, "irq:softirq_raise:");
			if (s) {
				char extra[512] = {'\0'};
				int sec_0, usec_0;
				int sec_1, usec_1;
				int handle_vector;
				int rise_vector;
				int cpu_0;
				int cpu_1;

				/*
				 * swapper     0    [000]  6010.619854:  irq:softirq_raise: vec=7 [action=SCHED]
				 * android.bg  3052 [001]  6000.076212:  irq:softirq_entry: vec=9 [action=RCU]
				 */
				(void) sscanf(s, "%s vec=%d", extra, &rise_vector);
				get_cpu_sec_usec_in_string(buf, &cpu_0, &sec_0, &usec_0);

				while (1) {
					rv = get_one_line(file, buf_1, sizeof(buf_1));
					if (rv == EOF)
						break;

					s = strstr(buf_1, "irq:softirq_entry:");
					if (s) {
						(void) sscanf(s, "%s vec=%d", extra, &handle_vector);
						get_cpu_sec_usec_in_string(buf_1, &cpu_1, &sec_1, &usec_1);

						if (cpu_0 == cpu_1 && rise_vector == handle_vector) {
							int delta_time_usec = (sec_1 - sec_0) * 1000000 + (usec_1 - usec_0);

							if (delta_time_usec > delay_usec)
								fprintf(stdout, "{\n%s\n%s\n} diff %d usec\n", buf, buf_1, delta_time_usec);
							break;
						}
					}
				}
			}

			rv = fseek(file, offset, SEEK_SET);
			if (rv)
				fprintf(stdout, "fseek error !!!\n");
		} else {
			break;
		}
	}
}

static void
perf_script_softirq_duration(FILE *file, int duration_usec)
{
	char buf[4096] = { '\0' };
	char buf_1[4096] = { '\0' };
	long offset;
	char *s;
	int rv;

	while (1) {
		rv = get_one_line(file, buf, sizeof(buf));
		offset = ftell(file);

		if (rv != EOF) {
			s = strstr(buf, "irq:softirq_entry:");
			if (s) {
				char extra[512] = {'\0'};
				int sec_0, usec_0;
				int sec_1, usec_1;
				int handle_vector;
				int rise_vector;
				int cpu_0;
				int cpu_1;

				/*
				 * swapper     0    [000]  6010.619854:  irq:softirq_entry: vec=7 [action=SCHED]
				 * android.bg  3052 [001]  6000.076212:  irq:softirq_exit: vec=9 [action=RCU]
				 */
				(void) sscanf(s, "%s vec=%d", extra, &rise_vector);
				get_cpu_sec_usec_in_string(buf, &cpu_0, &sec_0, &usec_0);

				while (1) {
					rv = get_one_line(file, buf_1, sizeof(buf_1));
					if (rv == EOF)
						break;

					s = strstr(buf_1, "irq:softirq_exit:");
					if (s) {
						(void) sscanf(s, "%s vec=%d", extra, &handle_vector);
						get_cpu_sec_usec_in_string(buf_1, &cpu_1, &sec_1, &usec_1);

						if (cpu_0 == cpu_1 && rise_vector == handle_vector) {
							int delta_time_usec = (sec_1 - sec_0) * 1000000 + (usec_1 - usec_0);

							if (delta_time_usec > duration_usec)
								fprintf(stdout, "{\n%s\n%s\n} diff %d usec\n", buf, buf_1, delta_time_usec);
							break;
						}
					}
				}
			}

			rv = fseek(file, offset, SEEK_SET);
			if (rv)
				fprintf(stdout, "fseek error !!!\n");
		} else {
			break;
		}
	}
}

static void
perf_script_hardirq_duration(FILE *file, int duration_msec)
{
	char buf[4096] = { '\0' };
	char buf_1[4096] = { '\0' };
	long offset;
	char *s;
	int rv;

	while (1) {
		rv = get_one_line(file, buf, sizeof(buf));
		offset = ftell(file);

		if (rv != EOF) {
			s = strstr(buf, "irq:irq_handler_entry:");
			if (s) {
				char extra[512] = {'\0'};
				int sec_0, usec_0;
				int sec_1, usec_1;
				int handle_vector;
				int rise_vector;
				int cpu_0;
				int cpu_1;

				/*
				 * swapper     0 [002]  6205.804133: irq:irq_handler_entry: irq=11 name=arch_timer
				 * swapper     0 [002]  6205.804228:  irq:irq_handler_exit: irq=11 ret=handled
				 */
				(void) sscanf(s, "%s irq=%d", extra, &rise_vector);
				get_cpu_sec_usec_in_string(buf, &cpu_0, &sec_0, &usec_0);

				while (1) {
					rv = get_one_line(file, buf_1, sizeof(buf_1));
					if (rv == EOF)
						break;

					s = strstr(buf_1, "irq:irq_handler_exit:");
					if (s) {
						(void) sscanf(s, "%s irq=%d", extra, &handle_vector);
						get_cpu_sec_usec_in_string(buf_1, &cpu_1, &sec_1, &usec_1);

						if (cpu_0 == cpu_1 && rise_vector == handle_vector) {
							int delta_time_usec = (sec_1 - sec_0) * 1000000 + (usec_1 - usec_0);

							if (delta_time_usec > duration_msec)
								fprintf(stdout, "{\n%s\n%s\n} diff %d usec\n", buf, buf_1, delta_time_usec);
							break;
						}
					}
				}
			}

			rv = fseek(file, offset, SEEK_SET);
			if (rv)
				fprintf(stdout, "fseek error !!!\n");
		} else {
			break;
		}
	}
}

struct irq_stat {
	int irq;
	int count;
	char irq_name[512];

	int min_interval;
	int max_interval;
	int avg_interval;

	unsigned int time_stamp_usec;
	struct splay_node node;
};

static struct irq_stat *
new_irq_node_init(int irq, char *irq_name)
{
	struct irq_stat *n = calloc(1, sizeof(*n));

	if (n) {
		n->irq = irq;
		(void) strncpy(n->irq_name, irq_name, sizeof(n->irq_name));
		n->node.val = irq;
	}

	return n;
}

static void
perf_script_hardirq_stat(FILE *file)
{
	struct splay_root sproot = { NULL };
	struct irq_stat *node;
	char buf[4096] = { '\0' };
	char extra[256] = {'\0'};
	char irq_name[256] = {'\0'};
	unsigned int time_stamp_usec;
	int cpu, sec, usec;
	int rv, irq;
	char *s;

	while (1) {
		rv = get_one_line(file, buf, sizeof(buf));
		if (rv == EOF)
			break;

		 s = strstr(buf, "irq:irq_handler_entry:");
		 if (s == NULL)
			 continue;

		 /*
		  * format is as follow one:
		  * sleep  1418 [003]  8780.957112:             irq:irq_handler_entry: irq=11 name=arch_timer
		  */
		 rv = sscanf(s, "%s irq=%d name=%s", extra, &irq, irq_name);
	 	 if (rv != 3)
	 		 continue;

		 get_cpu_sec_usec_in_string(buf, &cpu, &sec, &usec);
		 time_stamp_usec = (sec * 1000000) + usec;

		 if (sproot.sp_root == NULL) {
			 node = new_irq_node_init(irq, irq_name);
			 if (node)
				 splay_insert(&node->node, &sproot);
		 }

		 top_down_splay(irq, sproot.sp_root, &sproot);
		 node = container_of(sproot.sp_root, struct irq_stat, node);

		 /* Found the entry in the tree. */
		 if (node->irq == irq) {
			 if (node->time_stamp_usec) {
				 unsigned int delta = time_stamp_usec - node->time_stamp_usec;

				 if (delta < node->min_interval || !node->min_interval)
					 node->min_interval = delta;

				 if (delta > node->max_interval)
					 node->max_interval = delta;

				 node->avg_interval += delta;
			 }

			 /* Save the last time for this IRQ entry. */
			 node->time_stamp_usec = time_stamp_usec;
		 } else {
			 /* Allocate a new record and place it to the tree. */
			 node = new_irq_node_init(irq, irq_name);
			 if (node)
				 splay_insert(&node->node, &sproot);

		 }

		 /* Update the timestamp for this entry. */
		 node->time_stamp_usec = time_stamp_usec;
		 node->count++;
	}

	/* Dump the tree. */
	while (sproot.sp_root) {
		node = container_of(sproot.sp_root, struct irq_stat, node);

		fprintf(stdout, "irq: %5d name: %30s count: %7d, min: %10d, max: %10d, avg: %10d\n",
				node->irq, node->irq_name, node->count,
				node->min_interval, node->max_interval, node->avg_interval / node->count);

		splay_delete_init(&node->node, &sproot);
		free(node);
	}

	fprintf(stdout, "\tRun './a.out ./perf.script | sort -nk 6' to sort by column 6.\n");
}

struct sched_waking {
	unsigned int wakeup_nr;
	char comm[4096];
	int pid;

	int min;
	int max;
	int avg;

	unsigned int time_stamp_usec;
	struct splay_node node;
};

static struct sched_waking *
new_sched_waking_node_init(int pid, char *comm)
{
	struct sched_waking *n = calloc(1, sizeof(*n));

	if (n) {
		n->pid = pid;
		(void) strncpy(n->comm, comm, sizeof(n->comm));
		n->node.val = pid;
	}

	return n;
}

/*
 * How many times a task is awaken + min/max/avg stat.
 */
static void
perf_script_sched_waking_stat(const char *name, FILE *file, const char *script)
{
	struct splay_root sroot = { NULL };
	struct sched_waking *n;
	char buf[4096] = { '\0' };
	char comm[256] = {'\0'};
	unsigned int time_stamp_usec;
	unsigned int total_waking = 0;
	int cpu, sec, usec;
	int rv, pid;
	char *s;
	
	while (1) {
		rv = get_one_line(file, buf, sizeof(buf));
		if (rv == EOF)
			break;
		/*
		 * format is as follow one:
		 * foo[1] 7521 [002] 10.431216: sched_waking: comm=tr pid=2 prio=120 target_cpu=006
		 */
		s = strstr(buf, "sched_waking:");
		if (s == NULL)
			continue;

		rv = get_comm_pid_in_string(s, comm, sizeof(comm), &pid);
		if (rv < 0) {
			printf("ERROR: skip entry...\n");
			continue;
		}

		get_cpu_sec_usec_in_string(buf, &cpu, &sec, &usec);
		time_stamp_usec = (sec * 1000000) + usec;

		if (sroot.sp_root == NULL) {
			n = new_sched_waking_node_init(pid, comm);
			if (n)
				splay_insert(&n->node, &sroot);
		}

		top_down_splay(pid, sroot.sp_root, &sroot);
		n = container_of(sroot.sp_root, struct sched_waking, node);

		/* Found the entry in the tree. */
		if (n->pid == pid) {
			if (n->time_stamp_usec) {
				unsigned int delta = time_stamp_usec - n->time_stamp_usec;

				if (delta < n->min || !n->min)
					n->min = delta;

				if (delta > n->max)
					n->max = delta;

				n->avg += delta;
			}

			/* Save the last time for this wake-up entry. */
			n->time_stamp_usec = time_stamp_usec;
		} else {
			/* Allocate a new record and place it to the tree. */
			n = new_sched_waking_node_init(pid, comm);
			if (n)
				splay_insert(&n->node, &sroot);
		}

		/* Update the timestamp for this entry. */
		n->time_stamp_usec = time_stamp_usec;
		n->wakeup_nr++;
	}

	/* Dump the Splay-tree. */
	while (sroot.sp_root) {
		n = container_of(sroot.sp_root, struct sched_waking, node);
		fprintf(stdout, "name: %30s pid: %10d woken-up %5d\tinterval: min %5d\tmax %5d\tavg %5d\n",
			n->comm, n->pid, n->wakeup_nr,
			n->min, n->max, n->avg / n->wakeup_nr);

		total_waking += n->wakeup_nr;
		splay_delete_init(&n->node, &sroot);
		free(n);
	}

	fprintf(stdout, "=== Total: %u ===\n", total_waking);
	fprintf(stdout, "\tRun './%s %s | sort -nk 6' to sort by column 6.\n", name, script);
}

/*
 * Latency of try_to_wake_up path + select a CPU + placing a task into run-queue.
 */
static void
perf_script_sched_wakeup_latency(const char *name, FILE *file, const char *script)
{
	struct splay_root sroot = { NULL };
	struct splay_node *node;
	struct sched_waking *n;
	char buf[4096] = { '\0' };
	char comm[256] = {'\0'};
	unsigned int time_stamp_usec;
	unsigned int wakeup_latency_usec;
	unsigned int total_waking = 0;
	int cpu, sec, usec;
	int rv, pid;
	char *sched_waking_wakeup;
	bool sched_waking_event;

	while (1) {
		rv = get_one_line(file, buf, sizeof(buf));
		if (rv == EOF)
			break;

		/*
		 * format is as follow one:
		 * foo[1] 7521 [002] 10.431216: sched_waking: comm=tr pid=2 prio=120 target_cpu=006
		 */
		sched_waking_wakeup = strstr(buf, "sched_waking:");
		sched_waking_event = !!sched_waking_wakeup;

		if (!sched_waking_event) {
			/*
			 * format is as follow one:
			 * foo[1] 7521 [002] 10.431216: sched_wakeup: comm=tr pid=2 prio=120 target_cpu=006
			 */
			sched_waking_wakeup = strstr(buf, "sched_wakeup:");
			if (sched_waking_wakeup == NULL)
				continue;
		}

		rv = get_comm_pid_in_string(sched_waking_wakeup, comm, sizeof(comm), &pid);
		if (rv < 0) {
			printf("ERROR: skip entry...\n");
			continue;
		}

		get_cpu_sec_usec_in_string(buf, &cpu, &sec, &usec);
		time_stamp_usec = (sec * 1000000) + usec;

		/*
		 * Let's check if it exists, if so update it
		 * otherwise create a new node and insert.
		 */
		if (sched_waking_event) {
			node = top_down_splay(pid, sroot.sp_root, &sroot);
			if (node == NULL) {
				n = new_sched_waking_node_init(pid, comm);
				splay_insert(&n->node, &sroot);
			} else {
				n = container_of(node, struct sched_waking, node);
				if (n->pid != pid) {
					n = new_sched_waking_node_init(pid, comm);
					splay_insert(&n->node, &sroot);
				}
			}

			n->time_stamp_usec = time_stamp_usec;
			continue;
		}

		node = top_down_splay(pid, sroot.sp_root, &sroot);
		if (node == NULL) {
			fprintf(stdout, "error: no sched_waking event for %d pid yet.\n", pid);
			continue;
		}

		n = container_of(node, struct sched_waking, node);
		if (n->pid != pid) {
			fprintf(stdout, "error: sched_wakeup event does not match with any sched_waking event.\n");
			continue;
		}

		wakeup_latency_usec = time_stamp_usec - n->time_stamp_usec;

		if (wakeup_latency_usec < n->min || !n->min)
			n->min = wakeup_latency_usec;

		if (wakeup_latency_usec > n->max)
			n->max = wakeup_latency_usec;

		if (n->avg + wakeup_latency_usec < n->avg)
			fprintf(stdout, "error: counter is overflowed...\n");

		fprintf(stdout, "%s: %d wake-up latency: %u waken on %d CPU\n", comm, pid, wakeup_latency_usec, cpu);

		n->avg += wakeup_latency_usec;
		n->wakeup_nr++;
	}

	/* Dump the Splay-tree. */
	while (sroot.sp_root) {
		n = container_of(sroot.sp_root, struct sched_waking, node);
		/* fprintf(stdout, "name: %30s pid: %10d woken-up %5d\twakeup-latency: min %5d\tmax %5d\tavg %5d\n", */
		/* 	n->comm, n->pid, n->wakeup_nr, */
		/* 	n->min, n->max, n->avg / n->wakeup_nr); */

		total_waking += n->wakeup_nr;
		splay_delete_init(&n->node, &sroot);
		free(n);
	}

	/* fprintf(stdout, "=== Total: %u ===\n", total_waking); */
	/* fprintf(stdout, "\tRun '%s %s | sort -nk 6' to sort by column 6.\n", name, script); */
}


/*
 * Requires:
 *   rcu:rcu_batch_start
 *   rcu:rcu_invoke_callback
 *   rcu:rcu_batch_end
 */
static void
perf_script_synchronize_rcu_latency(const char *name, FILE *file, const char *script)
{
	char buf[4096] = { '\0' };
	char buf_1[4096] = { '\0' };
	long offset;
	char *s;
	int rv;

	while (1) {
		rv = get_one_line(file, buf, sizeof(buf));
		offset = ftell(file);

		if (rv != EOF) {
			s = strstr(buf, "rcu_batch_start:");
			if (s) {
				int delta_time_usec;
				int sec_0, usec_0;
				int sec_1, usec_1;
				int pid_0, pid_1;
				int extra;

				/*
				 * rcuop/5-58      [007] d..1.  6111.808748: rcu_batch_start: rcu_preempt CBs=23 bl=10
				 */
				(void) sscanf(buf, "%*[^0-9]%d-%d", &pid_0, &pid_0);
				get_cpu_sec_usec_in_string(buf, &extra, &sec_0, &usec_0);

				while (1) {
					rv = get_one_line(file, buf_1, sizeof(buf_1));
					if (rv == EOF)
						break;

					/* Only the pid of the newly read line matters at this step, whatever its format is. */
					(void) sscanf(buf_1, "%*[^0-9]%d-%d", &pid_1, &pid_1);

					/*
					 * rcuop/4-51      [007] d..1.  6111.816758: rcu_batch_end: rcu_preempt CBs-invoked=1 idle=....
					 */
					s = strstr(buf_1, "rcu_batch_end:");
					if (s && pid_0 == pid_1)
						break;

					/*
					 * rcuop/1-29      [005] .....  6111.808745: rcu_invoke_callback: rcu_preempt rhp=0000000093881c60 func=file_free_rcu.cfi_jt
					 */
					s = strstr(buf_1, "rcu_invoke_callback:");
					if (!s || pid_0 != pid_1)
						continue;

					s = strstr(buf_1, "wakeme_after_rcu");
					if (s) {
						get_cpu_sec_usec_in_string(buf_1, &extra, &sec_1, &usec_1);
						delta_time_usec = (sec_1 - sec_0) * 1000000 + (usec_1 - usec_0);
						fprintf(stdout, "{\n%s\n%s\n} latency %d usec\n", buf, buf_1, delta_time_usec);
						break;
					}
				}
			}

			rv = fseek(file, offset, SEEK_SET);
			if (rv)
				fprintf(stdout, "fseek error !!!\n");
		} else {
			break;
		}
	}

}

int main(int argc, char **argv)
{
	FILE *file;

	file = open_perf_script_file(argv[1]);
	if (file == NULL) {
		fprintf(stdout, "%s:%d failed: specify a perf script file\n", __func__, __LINE__);
		exit(-1);
	}

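	/*
	 * Pick the analysis pass you need. Only the synchronize_rcu()
	 * latency pass is enabled below; the others are commented out.
	 */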
	/* perf_script_softirq_delay(file, 1000); */
	/* perf_script_softirq_duration(file, 500); */
	/* perf_script_hardirq_duration(file, 500); */
	/* perf_script_hardirq_stat(file); */
	/* perf_script_sched_waking_stat(argv[0], file, argv[1]); */
	/* perf_script_sched_wakeup_latency(argv[0], file, argv[1]); */
	perf_script_synchronize_rcu_latency(argv[0], file, argv[1]);

	return 0;
}
<snip>

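Assuming the listing above is saved as, say, rcu_trace_parser.c (any file name
works), it builds with a plain C compiler, e.g. "gcc -O2 -o a.out rcu_trace_parser.c".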
Running it as "./a.out app_launch_rcu_trace.txt" will produce the results below:

<snip>
...
{
    <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt CBs=3613 bl=28
    <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
} latency 7270 usec
{
    rcuop/5-58      [005] d..1. 21951.352767: rcu_batch_start: rcu_preempt CBs=3 bl=10
    rcuop/5-58      [005] ..... 21951.352768: rcu_invoke_callback: rcu_preempt rhp=00000000b995fac1 func=wakeme_after_rcu.cfi_jt
} latency 1 usec
{
    rcuop/6-65      [007] d..1. 21951.804768: rcu_batch_start: rcu_preempt CBs=5 bl=10
    rcuop/6-65      [007] ..... 21951.804769: rcu_invoke_callback: rcu_preempt rhp=00000000b995fac1 func=wakeme_after_rcu.cfi_jt
} latency 1 usec
{
    rcuop/7-72      [006] d..1. 21951.884774: rcu_batch_start: rcu_preempt CBs=3517 bl=27
    rcuop/7-72      [006] ..... 21951.885979: rcu_invoke_callback: rcu_preempt rhp=000000005119eccc func=wakeme_after_rcu.cfi_jt
} latency 1205 usec
{
    rcuop/5-58      [007] d..1. 21951.912853: rcu_batch_start: rcu_preempt CBs=193 bl=10
    rcuop/5-58      [007] ..... 21951.912975: rcu_invoke_callback: rcu_preempt rhp=00000000b995fac1 func=wakeme_after_rcu.cfi_jt
} latency 122 usec
...
<snip>

Now you have a pair for each synchronize_rcu() call. The first pair above is the
same ~3600-callback batch as in the commit message: the waiter is woken up only
7270 usec after the batch has started.

--
Uladzislau Rezki
  
Qiuxu Zhuo March 22, 2023, 11:21 a.m. UTC | #5
Hi Rezki,

Thank you so much for sharing this super cool parser! ;-)
It will help me/us to speed up parsing the tracing logs. 

-Qiuxu

> From: Uladzislau Rezki <urezki@gmail.com>
> Sent: Wednesday, March 22, 2023 2:50 PM
> To: Zhuo, Qiuxu <qiuxu.zhuo@intel.com>
> Cc: Uladzislau Rezki <urezki@gmail.com>; Paul E . McKenney
> <paulmck@kernel.org>; RCU <rcu@vger.kernel.org>;
> quic_neeraju@quicinc.com; Boqun Feng <boqun.feng@gmail.com>; Joel
> Fernandes <joel@joelfernandes.org>; LKML <linux-kernel@vger.kernel.org>;
> Oleksiy Avramchenko <oleksiy.avramchenko@sony.com>; Steven Rostedt
> <rostedt@goodmis.org>; Frederic Weisbecker <frederic@kernel.org>
> Subject: Re: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> 
> On Wed, Mar 22, 2023 at 01:49:50AM +0000, Zhuo, Qiuxu wrote:
> > Hi Rezki,
> >
> > > From: Uladzislau Rezki <urezki@gmail.com>
> > > Sent: Tuesday, March 21, 2023 11:16 PM [...]
> > > >
> > > >
> > > > If possible, may I know the steps, commands, and related
> > > > parameters to
> > > produce the results above?
> > > > Thank you!
> > > >
> > > Build the kernel with CONFIG_RCU_TRACE configuration. Update your
> > > "set_event"
> > > file with appropriate traces:
> > >
> > > <snip>
> > > XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start
> > > rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event
> > >
> > > XQ-DQ54:/sys/kernel/tracing # cat set_event rcu:rcu_batch_start
> > > rcu:rcu_invoke_callback rcu:rcu_batch_end
> > > XQ-DQ54:/sys/kernel/tracing # <snip>
> > >
> > > Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing #
> > > echo 1 > tracing_on; sleep 10; echo 0 > tracing_on Next problem is
> > > how to parse it. Of
> >
> > Thanks for the detailed steps to collect the related testing results.
> >
> > > course you will not be able to parse megabytes of traces. For that
> > > purpose i use a special C trace parser.
> > > If you need an example please let me know i can show here.
> > >
> >
> > Yes, your example parser should save me from a huge amount of traces.
> > ;-) Thanks so much for your sharing.
> >
> See below the C program. It is ugly but it does what you need. Please have a
> look at main function:
> 
> <snip>
> #include <stdio.h>
> #include <stdlib.h>
> #include <stdbool.h>
> #include <ctype.h>
> #include <string.h>
> 
> /*
>  * Splay-tree implementation to store data: key,value
>  * See https://en.wikipedia.org/wiki/Splay_tree
>  */
> #define offsetof(TYPE, MEMBER)	((unsigned long)&((TYPE *)0)-
> >MEMBER)
> #define container_of(ptr, type, member)			\
> ({
> 			\
> 	void *__mptr = (void *)(ptr);					\
> 	((type *)(__mptr - offsetof(type, member)));	\
> })
> 
> #define SP_INIT_NODE(node)
> 			\
> 	((node)->left = (node)->right = (node)->parent = NULL)
> 
> struct splay_node {
> 	struct splay_node *left;
> 	struct splay_node *right;
> 	struct splay_node *parent;
> 	unsigned long val;
> };
> 
> struct splay_root {
> 	struct splay_node *sp_root;
> };
> 
> static inline void
> set_parent(struct splay_node *n, struct splay_node *p) {
> 	if (n)
> 		n->parent = p;
> }
> 
> static inline void
> change_child(struct splay_node *p,
> 	struct splay_node *old, struct splay_node *new) {
> 	if (p) {
> 		if (p->left == old)
> 			p->left = new;
> 		else
> 			p->right = new;
> 	}
> }
> 
> /*
>  * left rotation of node (r), (rc) is (r)'s right child  */ static inline struct
> splay_node * left_pivot(struct splay_node *r) {
> 	struct splay_node *rc;
> 
> 	/*
> 	 * set (rc) to be the new root
> 	 */
> 	rc = r->right;
> 
> 	/*
> 	 * point parent to new left/right child
> 	 */
> 	rc->parent = r->parent;
> 
> 	/*
> 	 * change child of the p->parent.
> 	 */
> 	change_child(r->parent, r, rc);
> 
> 	/*
> 	 * set (r)'s right child to be (rc)'s left child
> 	 */
> 	r->right = rc->left;
> 
> 	/*
> 	 * change parent of rc's left child
> 	 */
> 	set_parent(rc->left, r);
> 
> 	/*
> 	 * set new parent of rotated node
> 	 */
> 	r->parent = rc;
> 
> 	/*
> 	 * set (rc)'s left child to be (r)
> 	 */
> 	rc->left = r;
> 
> 	/*
> 	 * return the new root
> 	 */
> 	return rc;
> }
> 
> /*
>  * right rotation of node (r), (lc) is (r)'s left child  */ static inline struct
> splay_node * right_pivot(struct splay_node *r) {
> 	struct splay_node *lc;
> 
> 	/*
> 	 * set (lc) to be the new root
> 	 */
> 	lc = r->left;
> 
> 	/*
> 	 * point parent to new left/right child
> 	 */
> 	lc->parent = r->parent;
> 
> 	/*
> 	 * change child of the p->parent.
> 	 */
> 	change_child(r->parent, r, lc);
> 
> 	/*
> 	 * set (r)'s left child to be (lc)'s right child
> 	 */
> 	r->left = lc->right;
> 
> 	/*
> 	 * change parent of lc's right child
> 	 */
> 	set_parent(lc->right, r);
> 
> 	/*
> 	 * set new parent of rotated node
> 	 */
> 	r->parent = lc;
> 
> 	/*
> 	 * set (lc)'s right child to be (r)
> 	 */
> 	lc->right = r;
> 
> 	/*
> 	 * return the new root
> 	 */
> 	return lc;
> }
> 
> static struct splay_node *
> top_down_splay(unsigned long vstart,
> 	struct splay_node *root, struct splay_root *sp_root) {
> 	/*
> 	 * During the splitting process two temporary trees are formed.
> 	 * "l" contains all keys less than the search key/vstart and "r"
> 	 * contains all keys greater than the search key/vstart.
> 	 */
> 	struct splay_node head, *ltree_max, *rtree_max;
> 	struct splay_node *ltree_prev, *rtree_prev;
> 
> 	if (root == NULL)
> 		return NULL;
> 
> 	SP_INIT_NODE(&head);
> 	ltree_max = rtree_max = &head;
> 	ltree_prev = rtree_prev = NULL;
> 
> 	while (1) {
> 		if (vstart < root->val && root->left) {
> 			if (vstart < root->left->val) {
> 				root = right_pivot(root);
> 
> 				if (root->left == NULL)
> 					break;
> 			}
> 
> 			/*
> 			 * Build right subtree.
> 			 */
> 			rtree_max->left = root;
> 			rtree_max->left->parent = rtree_prev;
> 			rtree_max = rtree_max->left;
> 			rtree_prev = root;
> 			root = root->left;
> 		} else if (vstart > root->val && root->right) {
> 			if (vstart > root->right->val) {
> 				root = left_pivot(root);
> 
> 				if (root->right == NULL)
> 					break;
> 			}
> 
> 			/*
> 			 * Build left subtree.
> 			 */
> 			ltree_max->right = root;
> 			ltree_max->right->parent = ltree_prev;
> 			ltree_max = ltree_max->right;
> 			ltree_prev = root;
> 			root = root->right;
> 		} else {
> 			break;
> 		}
> 	}
> 
> 	/*
> 	 * Assemble the tree.
> 	 */
> 	ltree_max->right = root->left;
> 	rtree_max->left = root->right;
> 	root->left = head.right;
> 	root->right = head.left;
> 
> 	set_parent(ltree_max->right, ltree_max);
> 	set_parent(rtree_max->left, rtree_max);
> 	set_parent(root->left, root);
> 	set_parent(root->right, root);
> 	root->parent = NULL;
> 
> 	/*
> 	 * Set new root. Please note it might be the same.
> 	 */
> 	sp_root->sp_root = root;
> 	return sp_root->sp_root;
> }
> 
> struct splay_node *
> splay_search(unsigned long key, struct splay_root *root) {
> 	struct splay_node *n;
> 
> 	n = top_down_splay(key, root->sp_root, root);
> 	if (n && n->val == key)
> 		return n;
> 
> 	return NULL;
> }
> 
> static bool
> splay_insert(struct splay_node *n, struct splay_root *sp_root) {
> 	struct splay_node *r;
> 
> 	SP_INIT_NODE(n);
> 
> 	r = top_down_splay(n->val, sp_root->sp_root, sp_root);
> 	if (r == NULL) {
> 		/* First element in the tree */
> 		sp_root->sp_root = n;
> 		return false;
> 	}
> 
> 	if (n->val < r->val) {
> 		n->left = r->left;
> 		n->right = r;
> 
> 		set_parent(r->left, n);
> 		r->parent = n;
> 		r->left = NULL;
> 	} else if (n->val > r->val) {
> 		n->right = r->right;
> 		n->left = r;
> 
> 		set_parent(r->right, n);
> 		r->parent = n;
> 		r->right = NULL;
> 	} else {
> 		/*
> 		 * Same, indicate as not success insertion.
> 		 */
> 		return false;
> 	}
> 
> 	sp_root->sp_root = n;
> 	return true;
> }
> 
> static bool
> splay_delete_init(struct splay_node *n, struct splay_root *sp_root) {
> 	struct splay_node *subtree[2];
> 	unsigned long val = n->val;
> 
> 	/* 1. Splay the node to the root. */
> 	n = top_down_splay(n->val, sp_root->sp_root, sp_root);
> 	if (n == NULL || n->val != val)
> 		return false;
> 
> 	/* 2. Save left/right sub-trees. */
> 	subtree[0] = n->left;
> 	subtree[1] = n->right;
> 
> 	/* 3. Now remove the node. */
> 	SP_INIT_NODE(n);
> 
> 	if (subtree[0]) {
> 		/* 4. Splay the largest node in left sub-tree to the root. */
> 		top_down_splay(val, subtree[0], sp_root);
> 
> 		/* 5. Attach the right sub-tree as the right child of the left
> sub-tree. */
> 		sp_root->sp_root->right = subtree[1];
> 
> 		/* 6. Update the parent of right sub-tree */
> 		set_parent(subtree[1], sp_root->sp_root);
> 	} else {
> 		/* 7. Left sub-tree is NULL, just point to right one. */
> 		sp_root->sp_root = subtree[1];
> 	}
> 
> 	/* 8. Set parent of root node to NULL. */
> 	if (sp_root->sp_root)
> 		sp_root->sp_root->parent = NULL;
> 
> 	return true;
> }
> 
> static FILE *
> open_perf_script_file(const char *path)
> {
> 	FILE *f = NULL;
> 
> 	if (path == NULL)
> 		goto out;
> 
> 	f = fopen(path, "r");
> 	if (!f)
> 		goto out;
> 
> out:
> 	return f;
> }
> 
> static int
> get_one_line(FILE *file, char *buf, size_t len) {
> 	int i = 0;
> 
> 	memset(buf, '\0', len);
> 
> 	for (i = 0; i < len - 1; i++) {
> 		int c = fgetc(file);
> 
> 		if (c == EOF)
> 			return EOF;
> 
> 		if (c == '\n')
> 			break;
> 
> 		if (c != '\r')
> 			buf[i] = c;
> 	}
> 
> 	return i;
> }
> 
> static int
> read_string_till_string(char *buf, char *out, size_t out_len, char *in, size_t
> in_len) {
> 	int i, j;
> 
> 	memset(out, '\0', out_len);
> 
> 	for (i = 0; i < out_len; i++) {
> 		if (buf[i] != in[0]) {
> 			out[i] = buf[i];
> 			continue;
> 		}
> 
> 		for (j = 0; j < in_len; j++) {
> 			if (buf[i + j] != in[j])
> 				break;
> 		}
> 
> 		/* Found. */
> 		if (j == in_len)
> 			return 1;
> 	}
> 
> 	return 0;
> }
> 
> /*
>  * find pattern is  "something [003] 8640.034785: something"
>  */
> static inline void
> get_cpu_sec_usec_in_string(const char *s, int *cpu, int *sec, int *usec) {
> 	char usec_buf[32] = {'\0'};
> 	char sec_buf[32] = {'\0'};
> 	char cpu_buf[32] = {'\0'};
> 	bool found_sec = false;
> 	bool found_usec = false;
> 	bool found_cpu = false;
> 	int i, j, dot;
> 
> 	*cpu = *sec = *usec = -1;
> 
> 	for (i = 0, j = 0; s[i] != '\0'; i++) {
> 		if (s[i] == '.') {
> 			dot = i++;
> 
> 			/* take microseconds */
> 			for (j = 0; j < sizeof(usec_buf); j++) {
> 				if (isdigit(s[i])) {
> 					usec_buf[j] = s[i];
> 				} else {
> 					if (s[i] == ':' && j > 0)
> 						found_usec = true;
> 					else
> 						found_usec = false;
> 
> 					/* Terminate here. */
> 					break;
> 				}
> 
> 				i++;
> 			}
> 
> 			if (found_usec) {
> 				/* roll back */
> 				while (s[i] != ' ' && i > 0)
> 					i--;
> 
> 				/* take seconds */
> 				for (j = 0; j < sizeof(sec_buf); j++) {
> 					if (isdigit(s[++i])) {
> 						sec_buf[j] = s[i];
> 					} else {
> 						if (s[i] == '.' && j > 0)
> 							found_sec = true;
> 						else
> 							found_sec = false;
> 
> 						/* Terminate here. */
> 						break;
> 					}
> 				}
> 			}
> 
> 			if (found_sec && found_usec) {
> 				/* roll back */
> 				while (s[i] != '[' && i > 0)
> 					i--;
> 
> 				/* take seconds */
> 				for (j = 0; j < sizeof(cpu_buf); j++) {
> 					if (isdigit(s[++i])) {
> 						cpu_buf[j] = s[i];
> 					} else {
> 						if (s[i] == ']' && j > 0)
> 							found_cpu = true;
> 						else
> 							found_cpu = false;
> 
> 						/* Terminate here. */
> 						break;
> 					}
> 				}
> 
> 				if (found_cpu && found_sec && found_usec)
> {
> 					*sec = atoi(sec_buf);
> 					*usec = atoi(usec_buf);
> 					*cpu = atoi(cpu_buf);
> 					return;
> 				}
> 			}
> 
> 			/*
> 			 * Check next dot pattern.
> 			 */
> 			found_sec = false;
> 			found_usec = false;
> 			found_cpu = false;
> 			i = dot;
> 		}
> 	}
> }
> 
> /*
>  * find pattern is  "something comm=foo android thr1 pid=123 something"
>  */
> static inline int
> get_comm_pid_in_string(const char *buf, char *comm, ssize_t len, int *pid) {
> 	char *sc, *sp;
> 	int rv, i;
> 
> 	memset(comm, '\0', len);
> 
> 	sc = strstr(buf, "comm=");
> 	if (sc)
> 		sp = strstr(sc, " pid=");
> 
> 	if (!sc || !sp)
> 		return -1;
> 
> 	for (i = 0, sc += 5; sc != sp; i++) {
> 		if (i < len) {
> 			if (*sc == ' ')
> 				comm[i] = '-';
> 			else
> 				comm[i] = *sc;
> 
> 			sc++;
> 		}
> 	}
> 
> 	/* Read pid. */
> 	rv = sscanf(sp, " pid=%d", pid);
> 	if (rv != 1)
> 		return -1;
> 
> 	return 1;
> }
> 
> static void
> perf_script_softirq_delay(FILE *file, int delay_usec) {
> 	char buf[4096] = { '\0' };
> 	char buf_1[4096] = { '\0' };
> 	long offset;
> 	char *s;
> 	int rv;
> 
> 	while (1) {
> 		rv = get_one_line(file, buf, sizeof(buf));
> 		offset = ftell(file);
> 
> 		if (rv != EOF) {
> 			s = strstr(buf, "irq:softirq_raise:");
> 			if (s) {
> 				char extra[512] = {'\0'};
> 				int sec_0, usec_0;
> 				int sec_1, usec_1;
> 				int handle_vector;
> 				int rise_vector;
> 				int cpu_0;
> 				int cpu_1;
> 
> 				/*
> 				 * swapper     0    [000]  6010.619854:
> irq:softirq_raise: vec=7 [action=SCHED]
> 				 * android.bg  3052 [001]  6000.076212:
> irq:softirq_entry: vec=9 [action=RCU]
> 				 */
> 				(void) sscanf(s, "%s vec=%d", extra,
> &rise_vector);
> 				get_cpu_sec_usec_in_string(buf, &cpu_0,
> &sec_0, &usec_0);
> 
> 				while (1) {
> 					rv = get_one_line(file, buf_1,
> sizeof(buf_1));
> 					if (rv == EOF)
> 						break;
> 
> 					s = strstr(buf_1, "irq:softirq_entry:");
> 					if (s) {
> 						(void) sscanf(s, "%s vec=%d",
> extra, &handle_vector);
> 
> 	get_cpu_sec_usec_in_string(buf_1, &cpu_1, &sec_1, &usec_1);
> 
> 						if (cpu_0 == cpu_1 &&
> rise_vector == handle_vector) {
> 							int delta_time_usec =
> (sec_1 - sec_0) * 1000000 + (usec_1 - usec_0);
> 
> 							if (delta_time_usec >
> delay_usec)
> 								fprintf(stdout,
> "{\n%s\n%s\n} diff %d usec\n", buf, buf_1, delta_time_usec);
> 							break;
> 						}
> 					}
> 				}
> 			}
> 
> 			rv = fseek(file, offset, SEEK_SET);
> 			if (rv)
> 				fprintf(stdout, "fseek error !!!\n");
> 		} else {
> 			break;
> 		}
> 	}
> }
> 
> static void
> perf_script_softirq_duration(FILE *file, int duration_usec) {
> 	char buf[4096] = { '\0' };
> 	char buf_1[4096] = { '\0' };
> 	long offset;
> 	char *s;
> 	int rv;
> 
> 	while (1) {
> 		rv = get_one_line(file, buf, sizeof(buf));
> 		offset = ftell(file);
> 
> 		if (rv != EOF) {
> 			s = strstr(buf, "irq:softirq_entry:");
> 			if (s) {
> 				char extra[512] = {'\0'};
> 				int sec_0, usec_0;
> 				int sec_1, usec_1;
> 				int handle_vector;
> 				int rise_vector;
> 				int cpu_0;
> 				int cpu_1;
> 
> 				/*
> 				 * swapper     0    [000]  6010.619854:
> irq:softirq_entry: vec=7 [action=SCHED]
> 				 * android.bg  3052 [001]  6000.076212:
> irq:softirq_exit: vec=9 [action=RCU]
> 				 */
> 				(void) sscanf(s, "%s vec=%d", extra,
> &rise_vector);
> 				get_cpu_sec_usec_in_string(buf, &cpu_0,
> &sec_0, &usec_0);
> 
> 				while (1) {
> 					rv = get_one_line(file, buf_1,
> sizeof(buf_1));
> 					if (rv == EOF)
> 						break;
> 
> 					s = strstr(buf_1, "irq:softirq_exit:");
> 					if (s) {
> 						(void) sscanf(s, "%s vec=%d",
> extra, &handle_vector);
> 
> 	get_cpu_sec_usec_in_string(buf_1, &cpu_1, &sec_1, &usec_1);
> 
> 						if (cpu_0 == cpu_1 &&
> rise_vector == handle_vector) {
> 							int delta_time_usec =
> (sec_1 - sec_0) * 1000000 + (usec_1 - usec_0);
> 
> 							if (delta_time_usec >
> duration_usec)
> 								fprintf(stdout,
> "{\n%s\n%s\n} diff %d usec\n", buf, buf_1, delta_time_usec);
> 							break;
> 						}
> 					}
> 				}
> 			}
> 
> 			rv = fseek(file, offset, SEEK_SET);
> 			if (rv)
> 				fprintf(stdout, "fseek error !!!\n");
> 		} else {
> 			break;
> 		}
> 	}
> }
> 
> static void
> perf_script_hardirq_duration(FILE *file, int duration_msec) {
> 	char buf[4096] = { '\0' };
> 	char buf_1[4096] = { '\0' };
> 	long offset;
> 	char *s;
> 	int rv;
> 
> 	while (1) {
> 		rv = get_one_line(file, buf, sizeof(buf));
> 		offset = ftell(file);
> 
> 		if (rv != EOF) {
> 			s = strstr(buf, "irq:irq_handler_entry:");
> 			if (s) {
> 				char extra[512] = {'\0'};
> 				int sec_0, usec_0;
> 				int sec_1, usec_1;
> 				int handle_vector;
> 				int rise_vector;
> 				int cpu_0;
> 				int cpu_1;
> 
> 				/*
> 				 * swapper     0 [002]  6205.804133:
> irq:irq_handler_entry: irq=11 name=arch_timer
> 				 * swapper     0 [002]  6205.804228:
> irq:irq_handler_exit: irq=11 ret=handled
> 				 */
> 				(void) sscanf(s, "%s irq=%d", extra,
> &rise_vector);
> 				get_cpu_sec_usec_in_string(buf, &cpu_0,
> &sec_0, &usec_0);
> 
> 				while (1) {
> 					rv = get_one_line(file, buf_1,
> sizeof(buf_1));
> 					if (rv == EOF)
> 						break;
> 
> 					s = strstr(buf_1,
> "irq:irq_handler_exit:");
> 					if (s) {
> 						(void) sscanf(s, "%s irq=%d",
> extra, &handle_vector);
> 
> 	get_cpu_sec_usec_in_string(buf_1, &cpu_1, &sec_1, &usec_1);
> 
> 						if (cpu_0 == cpu_1 &&
> rise_vector == handle_vector) {
> 							int delta_time_usec =
> (sec_1 - sec_0) * 1000000 + (usec_1 - usec_0);
> 
> 							if (delta_time_usec >
> duration_msec)
> 								fprintf(stdout,
> "{\n%s\n%s\n} diff %d usec\n", buf, buf_1, delta_time_usec);
> 							break;
> 						}
> 					}
> 				}
> 			}
> 
> 			rv = fseek(file, offset, SEEK_SET);
> 			if (rv)
> 				fprintf(stdout, "fseek error !!!\n");
> 		} else {
> 			break;
> 		}
> 	}
> }
> 
> struct irq_stat {
> 	int irq;
> 	int count;
> 	char irq_name[512];
> 
> 	int min_interval;
> 	int max_interval;
> 	int avg_interval;
> 
> 	unsigned int time_stamp_usec;
> 	struct splay_node node;
> };
> 
> static struct irq_stat *
> new_irq_node_init(int irq, char *irq_name) {
> 	struct irq_stat *n = calloc(1, sizeof(*n));
> 
> 	if (n) {
> 		n->irq = irq;
> 		(void) strncpy(n->irq_name, irq_name, sizeof(n->irq_name));
> 		n->node.val = irq;
> 	}
> 
> 	return n;
> }
> 
> static void
> perf_script_hardirq_stat(FILE *file)
> {
> 	struct splay_root sproot = { NULL };
> 	struct irq_stat *node;
> 	char buf[4096] = { '\0' };
> 	char extra[256] = {'\0'};
> 	char irq_name[256] = {'\0'};
> 	unsigned int time_stamp_usec;
> 	int cpu, sec, usec;
> 	int rv, irq;
> 	char *s;
> 
> 	while (1) {
> 		rv = get_one_line(file, buf, sizeof(buf));
> 		if (rv == EOF)
> 			break;
> 
> 		 s = strstr(buf, "irq:irq_handler_entry:");
> 		 if (s == NULL)
> 			 continue;
> 
> 		 /*
> 		  * format is as follow one:
> 		  * sleep  1418 [003]  8780.957112:
> irq:irq_handler_entry: irq=11 name=arch_timer
> 		  */
> 		 rv = sscanf(s, "%s irq=%d name=%s", extra, &irq, irq_name);
> 	 	 if (rv != 3)
> 	 		 continue;
> 
> 		 get_cpu_sec_usec_in_string(buf, &cpu, &sec, &usec);
> 		 time_stamp_usec = (sec * 1000000) + usec;
> 
> 		 if (sproot.sp_root == NULL) {
> 			 node = new_irq_node_init(irq, irq_name);
> 			 if (node)
> 				 splay_insert(&node->node, &sproot);
> 		 }
> 
> 		 top_down_splay(irq, sproot.sp_root, &sproot);
> 		 node = container_of(sproot.sp_root, struct irq_stat, node);
> 
> 		 /* Found the entry in the tree. */
> 		 if (node->irq == irq) {
> 			 if (node->time_stamp_usec) {
> 				 unsigned int delta = time_stamp_usec -
> node->time_stamp_usec;
> 
> 				 if (delta < node->min_interval || !node-
> >min_interval)
> 					 node->min_interval = delta;
> 
> 				 if (delta > node->max_interval)
> 					 node->max_interval = delta;
> 
> 				 node->avg_interval += delta;
> 			 }
> 
> 			 /* Save the last time for this IRQ entry. */
> 			 node->time_stamp_usec = time_stamp_usec;
> 		 } else {
> 			 /* Allocate a new record and place it to the tree. */
> 			 node = new_irq_node_init(irq, irq_name);
> 			 if (node)
> 				 splay_insert(&node->node, &sproot);
> 
> 		 }
> 
> 		 /* Update the timestamp for this entry. */
> 		 node->time_stamp_usec = time_stamp_usec;
> 		 node->count++;
> 	}
> 
> 	/* Dump the tree. */
> 	while (sproot.sp_root) {
> 		node = container_of(sproot.sp_root, struct irq_stat, node);
> 
> 		fprintf(stdout, "irq: %5d name: %30s count: %7d, min: %10d,
> max: %10d, avg: %10d\n",
> 				node->irq, node->irq_name, node->count,
> 				node->min_interval, node->max_interval,
> node->avg_interval / node->count);
> 
> 		splay_delete_init(&node->node, &sproot);
> 		free(node);
> 	}
> 
> 	fprintf(stdout, "\tRun './a.out ./perf.script | sort -nk 6' to sort by
> column 6.\n"); }
> 
> struct sched_waking {
> 	unsigned int wakeup_nr;
> 	char comm[4096];
> 	int pid;
> 
> 	int min;
> 	int max;
> 	int avg;
> 
> 	unsigned int time_stamp_usec;
> 	struct splay_node node;
> };
> 
> static struct sched_waking *
> new_sched_waking_node_init(int pid, char *comm) {
> 	struct sched_waking *n = calloc(1, sizeof(*n));
> 
> 	if (n) {
> 		n->pid = pid;
> 		(void) strncpy(n->comm, comm, sizeof(n->comm));
> 		n->node.val = pid;
> 	}
> 
> 	return n;
> }
> 
> /*
>  * How many times a task is awaken + min/max/avg stat.
>  */
> static void
> perf_script_sched_waking_stat(const char *name, FILE *file, const char
> *script) {
> 	struct splay_root sroot = { NULL };
> 	struct sched_waking *n;
> 	char buf[4096] = { '\0' };
> 	char comm[256] = {'\0'};
> 	unsigned int time_stamp_usec;
> 	unsigned int total_waking = 0;
> 	int cpu, sec, usec;
> 	int rv, pid;
> 	char *s;
> 
> 	while (1) {
> 		rv = get_one_line(file, buf, sizeof(buf));
> 		if (rv == EOF)
> 			break;
> 		/*
> 		 * format is as follow one:
> 		 * foo[1] 7521 [002] 10.431216: sched_waking: comm=tr
> pid=2 prio=120 target_cpu=006
> 		 */
> 		s = strstr(buf, "sched_waking:");
> 		if (s == NULL)
> 			continue;
> 
> 		rv = get_comm_pid_in_string(s, comm, sizeof(comm), &pid);
> 		if (rv < 0) {
> 			printf("ERROR: skip entry...\n");
> 			continue;
> 		}
> 
> 		get_cpu_sec_usec_in_string(buf, &cpu, &sec, &usec);
> 		time_stamp_usec = (sec * 1000000) + usec;
> 
> 		if (sroot.sp_root == NULL) {
> 			n = new_sched_waking_node_init(pid, comm);
> 			if (n)
> 				splay_insert(&n->node, &sroot);
> 		}
> 
> 		top_down_splay(pid, sroot.sp_root, &sroot);
> 		n = container_of(sroot.sp_root, struct sched_waking, node);
> 
> 		/* Found the entry in the tree. */
> 		if (n->pid == pid) {
> 			if (n->time_stamp_usec) {
> 				unsigned int delta = time_stamp_usec - n->time_stamp_usec;
> 
> 				if (delta < n->min || !n->min)
> 					n->min = delta;
> 
> 				if (delta > n->max)
> 					n->max = delta;
> 
> 				n->avg += delta;
> 			}
> 
> 			/* Save the last time for this wake-up entry. */
> 			n->time_stamp_usec = time_stamp_usec;
> 		} else {
> 			/* Allocate a new record and place it to the tree. */
> 			n = new_sched_waking_node_init(pid, comm);
> 			if (n)
> 				splay_insert(&n->node, &sroot);
> 		}
> 
> 		/* Update the timestamp for this entry. */
> 		n->time_stamp_usec = time_stamp_usec;
> 		n->wakeup_nr++;
> 	}
> 
> 	/* Dump the Splay-tree. */
> 	while (sroot.sp_root) {
> 		n = container_of(sroot.sp_root, struct sched_waking, node);
> 		fprintf(stdout, "name: %30s pid: %10d woken-up %5d\tinterval: min %5d\tmax %5d\tavg %5d\n",
> 			n->comm, n->pid, n->wakeup_nr,
> 			n->min, n->max, n->avg / n->wakeup_nr);
> 
> 		total_waking += n->wakeup_nr;
> 		splay_delete_init(&n->node, &sroot);
> 		free(n);
> 	}
> 
> 	fprintf(stdout, "=== Total: %u ===\n", total_waking);
> 	fprintf(stdout, "\tRun './%s %s | sort -nk 6' to sort by column 6.\n", name, script);
> }
> 
> /*
>  * Latency of try_to_wake_up path + select a CPU + placing a task into run-queue.
>  */
> static void
> perf_script_sched_wakeup_latency(const char *name, FILE *file, const char *script)
> {
> 	struct splay_root sroot = { NULL };
> 	struct splay_node *node;
> 	struct sched_waking *n;
> 	char buf[4096] = { '\0' };
> 	char comm[256] = {'\0'};
> 	unsigned int time_stamp_usec;
> 	unsigned int wakeup_latency_usec;
> 	unsigned int total_waking = 0;
> 	int cpu, sec, usec;
> 	int rv, pid;
> 	char *sched_waking_wakeup;
> 	bool sched_waking_event;
> 
> 	while (1) {
> 		rv = get_one_line(file, buf, sizeof(buf));
> 		if (rv == EOF)
> 			break;
> 
> 		/*
> 		 * format is as follow one:
> 		 * foo[1] 7521 [002] 10.431216: sched_waking: comm=tr pid=2 prio=120 target_cpu=006
> 		 */
> 		sched_waking_wakeup = strstr(buf, "sched_waking:");
> 		sched_waking_event = !!sched_waking_wakeup;
> 
> 		if (!sched_waking_event) {
> 			/*
> 			 * format is as follow one:
> 			 * foo[1] 7521 [002] 10.431216: sched_wakeup: comm=tr pid=2 prio=120 target_cpu=006
> 			 */
> 			sched_waking_wakeup = strstr(buf, "sched_wakeup:");
> 			if (sched_waking_wakeup == NULL)
> 				continue;
> 		}
> 
> 		rv = get_comm_pid_in_string(sched_waking_wakeup, comm, sizeof(comm), &pid);
> 		if (rv < 0) {
> 			printf("ERROR: skip entry...\n");
> 			continue;
> 		}
> 
> 		get_cpu_sec_usec_in_string(buf, &cpu, &sec, &usec);
> 		time_stamp_usec = (sec * 1000000) + usec;
> 
> 		/*
> 		 * Let's check if it exists, if so update it
> 		 * otherwise create a new node and insert.
> 		 */
> 		if (sched_waking_event) {
> 			node = top_down_splay(pid, sroot.sp_root, &sroot);
> 			if (node == NULL) {
> 				n = new_sched_waking_node_init(pid, comm);
> 				splay_insert(&n->node, &sroot);
> 			} else {
> 				n = container_of(node, struct sched_waking, node);
> 				if (n->pid != pid) {
> 					n = new_sched_waking_node_init(pid, comm);
> 					splay_insert(&n->node, &sroot);
> 				}
> 			}
> 
> 			n->time_stamp_usec = time_stamp_usec;
> 			continue;
> 		}
> 
> 		node = top_down_splay(pid, sroot.sp_root, &sroot);
> 		if (node == NULL) {
> 			fprintf(stdout, "error: no sched_waking event for %d pid yet.\n", pid);
> 			continue;
> 		}
> 
> 		n = container_of(node, struct sched_waking, node);
> 		if (n->pid != pid) {
> 			fprintf(stdout, "error: sched_wakeup event does not match with any sched_waking event.\n");
> 			continue;
> 		}
> 
> 		wakeup_latency_usec = time_stamp_usec - n->time_stamp_usec;
> 
> 		if (wakeup_latency_usec < n->min || !n->min)
> 			n->min = wakeup_latency_usec;
> 
> 		if (wakeup_latency_usec > n->max)
> 			n->max = wakeup_latency_usec;
> 
> 		if (n->avg + wakeup_latency_usec < n->avg)
> 			fprintf(stdout, "error: counter is overflowed...\n");
> 
> 		fprintf(stdout, "%s: %d wake-up latency: %u waken on %d CPU\n", comm, pid, wakeup_latency_usec, cpu);
> 
> 		n->avg += wakeup_latency_usec;
> 		n->wakeup_nr++;
> 	}
> 
> 	/* Dump the Splay-tree. */
> 	while (sroot.sp_root) {
> 		n = container_of(sroot.sp_root, struct sched_waking, node);
> 		/* fprintf(stdout, "name: %30s pid: %10d woken-up %5d\twakeup-latency: min %5d\tmax %5d\tavg %5d\n", */
> 		/* 	n->comm, n->pid, n->wakeup_nr, */
> 		/* 	n->min, n->max, n->avg / n->wakeup_nr); */
> 
> 		total_waking += n->wakeup_nr;
> 		splay_delete_init(&n->node, &sroot);
> 		free(n);
> 	}
> 
> 	/* fprintf(stdout, "=== Total: %u ===\n", total_waking); */
> 	/* fprintf(stdout, "\tRun '%s %s | sort -nk 6' to sort by column 6.\n", name, script); */
> }
> 
> 
> /*
>  * Requires:
>  *   rcu:rcu_batch_start
>  *   rcu:rcu_invoke_callback
>  *   rcu:rcu_batch_end
>  */
> static void
> perf_script_synchronize_rcu_latency(const char *name, FILE *file, const char *script)
> {
> 	char buf[4096] = { '\0' };
> 	char buf_1[4096] = { '\0' };
> 	long offset;
> 	char *s;
> 	int rv;
> 
> 	while (1) {
> 		rv = get_one_line(file, buf, sizeof(buf));
> 		offset = ftell(file);
> 
> 		if (rv != EOF) {
> 			s = strstr(buf, "rcu_batch_start:");
> 			if (s) {
> 				int delta_time_usec;
> 				int sec_0, usec_0;
> 				int sec_1, usec_1;
> 				int pid_0, pid_1;
> 				int extra;
> 
> 				/*
> 				 * rcuop/5-58      [007] d..1.  6111.808748: rcu_batch_start: rcu_preempt CBs=23 bl=10
> 				 */
> 				(void) sscanf(buf, "%*[^0-9]%d-%d", &pid_0, &pid_0);
> 				get_cpu_sec_usec_in_string(buf, &extra, &sec_0, &usec_0);
> 
> 				while (1) {
> 					rv = get_one_line(file, buf_1, sizeof(buf_1));
> 					if (rv == EOF)
> 						break;
> 
> 					/* Do not care what a string format is on this step. */
> 					(void) sscanf(buf, "%*[^0-9]%d-%d", &pid_1, &pid_1);
> 
> 					/*
> 					 * rcuop/4-51      [007] d..1.  6111.816758: rcu_batch_end: rcu_preempt CBs-invoked=1 idle=....
> 					 */
> 					s = strstr(buf_1, "rcu_batch_end:");
> 					if (s && pid_0 == pid_1)
> 						break;
> 
> 					/*
> 					 * rcuop/1-29      [005] ..... 6111.808745: rcu_invoke_callback: rcu_preempt rhp=0000000093881c60 func=file_free_rcu.cfi_jt
> 					 */
> 					s = strstr(buf_1, "rcu_invoke_callback:");
> 					if (!s || pid_0 != pid_1)
> 						continue;
> 
> 					s = strstr(buf_1, "wakeme_after_rcu");
> 					if (s) {
> 						get_cpu_sec_usec_in_string(buf_1, &extra, &sec_1, &usec_1);
> 						delta_time_usec = (sec_1 - sec_0) * 1000000 + (usec_1 - usec_0);
> 						fprintf(stdout, "{\n%s\n%s\n} latency %d usec\n", buf, buf_1, delta_time_usec);
> 						break;
> 					}
> 				}
> 			}
> 
> 			rv = fseek(file, offset, SEEK_SET);
> 			if (rv)
> 				fprintf(stdout, "fseek error !!!\n");
> 		} else {
> 			break;
> 		}
> 	}
> 
> }
> 
> int main(int argc, char **argv)
> {
> 	FILE *file;
> 
> 	file = open_perf_script_file(argv[1]);
> 	if (file == NULL) {
> 		fprintf(stdout, "%s:%d failed: specify a perf script file\n", __func__, __LINE__);
> 		exit(-1);
> 	}
> 
> 	/* perf_script_softirq_delay(file, 1000); */
> 	/* perf_script_softirq_duration(file, 500); */
> 	/* perf_script_hardirq_duration(file, 500); */
> 	/* perf_script_hardirq_stat(file); */
> 	/* perf_script_sched_waking_stat(argv[0], file, argv[1]); */
> 	/* perf_script_sched_wakeup_latency(argv[0], file, argv[1]); */
> 	perf_script_synchronize_rcu_latency(argv[0], file, argv[1]);
> 
> 	return 0;
> }
> <snip>
> 
> Running it as "./a.out app_launch_rcu_trace.txt" will produce below results:

Thanks for the usage example.

> <snip>
> ...
> {
>     <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt CBs=3613 bl=28
>     <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
> } latency 7270 usec
> {
>     rcuop/5-58      [005] d..1. 21951.352767: rcu_batch_start: rcu_preempt CBs=3 bl=10
>     rcuop/5-58      [005] ..... 21951.352768: rcu_invoke_callback: rcu_preempt rhp=00000000b995fac1 func=wakeme_after_rcu.cfi_jt
> } latency 1 usec
> {
>     rcuop/6-65      [007] d..1. 21951.804768: rcu_batch_start: rcu_preempt CBs=5 bl=10
>     rcuop/6-65      [007] ..... 21951.804769: rcu_invoke_callback: rcu_preempt rhp=00000000b995fac1 func=wakeme_after_rcu.cfi_jt
> } latency 1 usec
> {
>     rcuop/7-72      [006] d..1. 21951.884774: rcu_batch_start: rcu_preempt CBs=3517 bl=27
>     rcuop/7-72      [006] ..... 21951.885979: rcu_invoke_callback: rcu_preempt rhp=000000005119eccc func=wakeme_after_rcu.cfi_jt
> } latency 1205 usec
> {
>     rcuop/5-58      [007] d..1. 21951.912853: rcu_batch_start: rcu_preempt CBs=193 bl=10
>     rcuop/5-58      [007] ..... 21951.912975: rcu_invoke_callback: rcu_preempt rhp=00000000b995fac1 func=wakeme_after_rcu.cfi_jt
> } latency 122 usec
> ...
> <snip>
> 
> now you have a pair.
> 
> --
> Uladzislau Rezki
  
Zqiang March 27, 2023, 11:21 a.m. UTC | #6
> > From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > Sent: Tuesday, March 21, 2023 6:28 PM
> > [...]
> > Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > 
> > A call to a synchronize_rcu() can be expensive from time point of view.
> > Different workloads can be affected by this especially the ones which use this
> > API in its time critical sections.
> > 
> 
> This is interesting and meaningful research. ;-)
> 
> > For example in case of NOCB scenario the wakeme_after_rcu() callback
> > invocation depends on where in a nocb-list it is located. Below is an example
> > when it was the last out of ~3600 callbacks:
>



Can it be implemented separately as follows? It seems that the code is simpler
(only a personal opinion)  😊.

But I didn't test whether this reduces synchronize_rcu() waiting time.

+static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
+{
+       unsigned long gp_snap;
+
+       gp_snap = start_poll_synchronize_rcu();
+       while (!poll_state_synchronize_rcu(gp_snap))
+               schedule_timeout_idle(1);
+}
+
+void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func);
+DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
+                 "RCU Poll");
+void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func)
+{
+       call_rcu_tasks_generic(rhp, func, &rcu_poll);
+}
+EXPORT_SYMBOL_GPL(call_rcu_poll);
+
+void synchronize_rcu_poll(void)
+{
+       synchronize_rcu_tasks_generic(&rcu_poll);
+}
+EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
+
+static int __init rcu_spawn_poll_kthread(void)
+{
+       cblist_init_generic(&rcu_poll);
+       rcu_poll.gp_sleep = HZ / 10;
+       rcu_spawn_tasks_kthread_generic(&rcu_poll);
+       return 0;
+}

Thanks
Zqiang


> > 
> > <snip>
> >   <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
> > CBs=3613 bl=28
> > ...
> >   <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
> > rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
> >   <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
> > rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
> >   <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
> > rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
> >   <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
> > rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
> >   <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
> > rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
> >   <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
> > rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
> >   <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
> > invoked=3612 idle=....
> > <snip>
> >
> 
> Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?
> 
>Yes.
>
>
> If possible, may I know the steps, commands, and related parameters to produce the results above?
> Thank you!
> 
>Build the kernel with CONFIG_RCU_TRACE configuration. Update your "set_event"
>file with appropriate traces:
>
><snip>
>XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event
>
>XQ-DQ54:/sys/kernel/tracing # cat set_event
>rcu:rcu_batch_start
>rcu:rcu_invoke_callback
>rcu:rcu_batch_end
>XQ-DQ54:/sys/kernel/tracing #
><snip>
>
>Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
>Next problem is how to parse it. Of course you will not be able to parse
>megabytes of traces. For that purpose i use a special C trace parser.
>If you need an example please let me know i can show here.
>
>--
>Uladzislau Rezki
  
Uladzislau Rezki March 27, 2023, 5:47 p.m. UTC | #7
On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
> > > From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > Sent: Tuesday, March 21, 2023 6:28 PM
> > > [...]
> > > Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > > 
> > > A call to a synchronize_rcu() can be expensive from time point of view.
> > > Different workloads can be affected by this especially the ones which use this
> > > API in its time critical sections.
> > > 
> > 
> > This is interesting and meaningful research. ;-)
> > 
> > > For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > invocation depends on where in a nocb-list it is located. Below is an example
> > > when it was the last out of ~3600 callbacks:
> >
> 
> 
> 
> Can it be implemented separately as follows?  it seems that the code is simpler
> (only personal opinion)  😊.
> 
> But I didn't test whether this reduce synchronize_rcu() waiting time
> 
> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> +{
> +       unsigned long gp_snap;
> +
> +       gp_snap = start_poll_synchronize_rcu();
> +       while (!poll_state_synchronize_rcu(gp_snap))
> +               schedule_timeout_idle(1);
> +}
> +
> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func);
> +DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
> +                 "RCU Poll");
> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func)
> +{
> +       call_rcu_tasks_generic(rhp, func, &rcu_poll);
> +}
> +EXPORT_SYMBOL_GPL(call_rcu_poll);
> +
> +void synchronize_rcu_poll(void)
> +{
> +       synchronize_rcu_tasks_generic(&rcu_poll);
> +}
> +EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
> +
> +static int __init rcu_spawn_poll_kthread(void)
> +{
> +       cblist_init_generic(&rcu_poll);
> +       rcu_poll.gp_sleep = HZ / 10;
> +       rcu_spawn_tasks_kthread_generic(&rcu_poll);
> +       return 0;
> +}
> 
Uh.. I am working on v2 of the original patch where I need to add a Kconfig
parameter. You are inventing a new API :)

--
Uladzislau Rezki
  
Joel Fernandes March 27, 2023, 9:23 p.m. UTC | #8
On Mon, Mar 27, 2023 at 7:21 AM Zhang, Qiang1 <qiang1.zhang@intel.com> wrote:
>
> > > From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > Sent: Tuesday, March 21, 2023 6:28 PM
> > > [...]
> > > Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > >
> > > A call to a synchronize_rcu() can be expensive from time point of view.
> > > Different workloads can be affected by this especially the ones which use this
> > > API in its time critical sections.
> > >
> >
> > This is interesting and meaningful research. ;-)
> >
> > > For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > invocation depends on where in a nocb-list it is located. Below is an example
> > > when it was the last out of ~3600 callbacks:
> >
>
>
>
> Can it be implemented separately as follows?  it seems that the code is simpler
> (only personal opinion)
>
> But I didn't test whether this reduce synchronize_rcu() waiting time

Isn't it broken because you are trying to implement synchronize_rcu()
on top of a different RCU implementation: tasks-RCU? Or did I miss
something?

Also, I think Vlad is trying to improve the existing synchronize_rcu()
by shortcutting the wake up of the sleeper (instead of going through
an async callback which in turn did a wakeup). So he's not changing
the RCU implementation, he is just making it faster.
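
For context, here is a much-simplified sketch of what that existing wake-up
path looks like (my paraphrase of the wakeme_after_rcu()/wait_rcu_gp()
machinery in kernel/rcu/update.c, not the patch under discussion): the waker
is just one more callback queued behind whatever else is already on the
CPU's (nocb) callback list, which is why it can end up behind ~3600 other
callbacks in Vlad's trace.

<snip>
struct rcu_synchronize {
	struct rcu_head head;
	struct completion completion;
};

/* Runs like any other callback, possibly after thousands of others. */
void wakeme_after_rcu(struct rcu_head *head)
{
	struct rcu_synchronize *rcu =
		container_of(head, struct rcu_synchronize, head);

	complete(&rcu->completion);
}

static void sketch_synchronize_rcu(void)
{
	struct rcu_synchronize rcu;

	init_rcu_head_on_stack(&rcu.head);
	init_completion(&rcu.completion);
	call_rcu(&rcu.head, wakeme_after_rcu);	/* queued at the tail of the CB list */
	wait_for_completion(&rcu.completion);	/* sleeps until that CB finally runs */
	destroy_rcu_head_on_stack(&rcu.head);
}
<snip>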

thanks,

 - Joel


> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> +{
> +       unsigned long gp_snap;
> +
> +       gp_snap = start_poll_synchronize_rcu();
> +       while (!poll_state_synchronize_rcu(gp_snap))
> +               schedule_timeout_idle(1);
> +}
> +
> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func);
> +DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
> +                 "RCU Poll");
> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func)
> +{
> +       call_rcu_tasks_generic(rhp, func, &rcu_poll);
> +}
> +EXPORT_SYMBOL_GPL(call_rcu_poll);
> +
> +void synchronize_rcu_poll(void)
> +{
> +       synchronize_rcu_tasks_generic(&rcu_poll);
> +}
> +EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
> +
> +static int __init rcu_spawn_poll_kthread(void)
> +{
> +       cblist_init_generic(&rcu_poll);
> +       rcu_poll.gp_sleep = HZ / 10;
> +       rcu_spawn_tasks_kthread_generic(&rcu_poll);
> +       return 0;
> +}
>
> Thanks
> Zqiang
>
>
> > >
> > > <snip>
> > >   <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
> > > CBs=3613 bl=28
> > > ...
> > >   <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
> > > rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
> > > rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
> > > rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
> > > rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
> > > rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
> > > rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
> > >   <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
> > > invoked=3612 idle=....
> > > <snip>
> > >
> >
> > Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?
> >
> >Yes.
> >
> >
> > If possible, may I know the steps, commands, and related parameters to produce the results above?
> > Thank you!
> >
> >Build the kernel with CONFIG_RCU_TRACE configuration. Update your "set_event"
> >file with appropriate traces:
> >
> ><snip>
> >XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event
> >
> >XQ-DQ54:/sys/kernel/tracing # cat set_event
> >rcu:rcu_batch_start
> >rcu:rcu_invoke_callback
> >rcu:rcu_batch_end
> >XQ-DQ54:/sys/kernel/tracing #
> ><snip>
> >
> >Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
> >Next problem is how to parse it. Of course you will not be able to parse
> >megabytes of traces. For that purpose i use a special C trace parser.
> >If you need an example please let me know i can show here.
> >
> >--
> >Uladzislau Rezki
  
Steven Rostedt March 27, 2023, 9:48 p.m. UTC | #9
On Tue, 21 Mar 2023 16:15:53 +0100
Uladzislau Rezki <urezki@gmail.com> wrote:

> Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
> Next problem is how to parse it. Of course you will not be able to parse
> megabytes of traces. For that purpose i use a special C trace parser.
> If you need an example please let me know i can show here.

Not sure if you are familiar with trace-cmd, but the above could have been:

 # trace-cmd record -e rcu sleep 10

and then you get the trace.dat file, which reports as:

 # trace-cmd report

If you need special parsing, there's a libtracecmd library that lets you do
all that!

  https://www.trace-cmd.org/Documentation/libtracecmd/

And for parsing events:

  https://www.trace-cmd.org/Documentation/libtraceevent/


Basically have:

struct my_info {
	/* store state info here */
	unsigned long long start_timestamp;
};

int main(...) {
	struct tracecmd_input *handle;
	struct my_info info;
	char *file = argv[1];

	handle = tracecmd_open(file);

	tracecmd_follow_event(handle, "rcu", "rcu_batch_start",
			batch_start, &info);

	tracecmd_follow_event(handle, "rcu", "rcu_batch_end",
			batch_end, &info);

	tracecmd_follow_event(handle, "rcu", "rcu_invoke_callback",
			invoke_callback, &info);

	tracecmd_iterate_events(handle, NULL, 0, NULL, NULL);

	tracecmd_close(handle);
}

Where it will iterate over the "trace.dat" file passed to it, and every time it
hits an event registered by follow_event it will call that function:

static int batch_start(struct tracecmd_input *handle, struct tep_event *event,
		struct tep_record *record, int cpu, void *data)
{
	struct my_info *info = data;

	info->start_timestamp = record->ts;

	return 0;
}

static int batch_end(struct tracecmd_input *handle, struct tep_event *event,
		struct tep_record *record, int cpu, void *data)
{
	struct my_info *info = data;


	printf("time = %lld -> %lld\n", info->start_timestapm,
				record->ts);
	return 0;
}

static int invoke_callback(struct tracecmd_input *handle, struct tep_event *event,
		struct tep_record *record, int cpu, void *data)
{
	struct my_info *info = data;
	struct tep_handle *tep = tracecmd_get_tep(handle);
	static struct tep_format_field *ip_field;
	unsigned long long ip;
	const char *func;
	int pid;

	if (!ip_field)
		ip_field = tep_find_field(event, "func");

	tep_read_number_field(ip_field, record->data, &ip);
	func = tep_find_function(tep, ip);

	pid = tep_data_pid(tep, record);

	if (func)
		printf("Processing function %s for pid %d\n", func, pid);
	else
		printf("Processing address 0x%llx for pid %d\n", ip, pid);

	return 0;
}


And much more ;-)
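
(To actually compile a sketch like the above, something along these lines
should work -- assuming the distro ships the usual pkg-config files for these
libraries, and with "rcu-batch.c" being whatever the file was named:

  gcc -o rcu-batch rcu-batch.c $(pkg-config --cflags --libs libtracecmd libtraceevent)

but check your packaging, of course.)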

Oh, and if you just want to read the live trace without recording, you
could always use libtracefs:

 https://www.trace-cmd.org/Documentation/libtracefs/

And instead of using tracecmd_follow_event() with
tracecmd_iterate_events(), you can use:

	const char *systems[] = { "rcu", NULL };

	tep = tracefs_local_events_systems(NULL, systems);

	tracefs_follow_event(tep, NULL, "rcu", "rcu_invoke_callback",
			invoke_callback, &info);

and iterate the live events with:

	tracefs_iterate_raw_events(tep, NULL, NULL, 0, NULL, NULL);


With the callback for this being (very similar):

static int invoke_callback(struct tep_event *event,
		struct tep_record *record, int cpu, void *data)
{
	struct my_info *info = data;
	struct tep_handle *tep = event->tep;
	static struct tep_format_field *ip_field;
	unsigned long long ip;
	const char *func;
	int pid;

	if (!ip_field)
		ip_field = tep_find_field(event, "func");

	tep_read_number_field(ip_field, record->data, &ip);
	func = tep_find_function(tep, ip);

	pid = tep_data_pid(tep, record);

	if (func)
		printf("Processing function %s for pid %d\n", func, pid);
	else
		printf("Processing address 0x%llx for pid %d\n", ip, pid);

	return 0;
}


-- Steve
  
Steven Rostedt March 27, 2023, 9:50 p.m. UTC | #10
On Mon, 27 Mar 2023 17:48:44 -0400
Steven Rostedt <rostedt@goodmis.org> wrote:

> struct my_info {
> 	/* store state info here */
> };
> 
> int main(...) {
> 	struct tracecmd_input *handle;
> 	struct my_info info;
> 	char *file = argv[1];
> 
> 	handle = tracecmd_open(file);
> 
> 	tracecmd_follow_event(handle, "rcu", "rcu_batch_start",
> 			batch_start, &info);
> 
> 	tracecmd_follow_event(handle, "rcu", "rcu_batch_end",
> 			batch_end, &info);
> 
> 	tracecmd_follow_event(handle, "rcu", "rcu_invoke_callback",
> 			invoke_callback, &info);
> 
> 	tracecmd_iterate_events(handle, NULL, 0, NULL, NULL);
> 
> 	tracecmd_close(handle);
> }

BTW, none of this code was actually tested, so I may have some syntax
errors. I just did this straight from memory, as it's so easy ;-)

-- Steve
  
Zqiang March 28, 2023, 12:12 a.m. UTC | #11
>
> > > From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > Sent: Tuesday, March 21, 2023 6:28 PM
> > > [...]
> > > Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > >
> > > A call to a synchronize_rcu() can be expensive from time point of view.
> > > Different workloads can be affected by this especially the ones which use this
> > > API in its time critical sections.
> > >
> >
> > This is interesting and meaningful research. ;-)
> >
> > > For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > invocation depends on where in a nocb-list it is located. Below is an example
> > > when it was the last out of ~3600 callbacks:
> >
>
>
>
> Can it be implemented separately as follows?  it seems that the code is simpler
> (only personal opinion)
>
> But I didn't test whether this reduce synchronize_rcu() waiting time
>
>Isn't it broken because you are trying to implement synchronize_rcu()
>on top of a different RCU implementation: tasks-RCU? Or did I miss
>something?
>
>
>Also, I think Vlad is trying to improve the existing synchronize_rcu()
>by shortcutting the wake up of the sleeper (instead of going through
>an async callback which in turn did a wakeup). So he's not changing
>the RCU implementation, he is just making it faster.

Agree, this improves the existing synchronize_rcu(): it takes synchronize_rcu()
out of nocb control, and the wake-up for synchronize_rcu() is realized separately.

Thanks
Zqiang

>
>thanks,
>
> - Joel
>
>
> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> +{
> +       unsigned long gp_snap;
> +
> +       gp_snap = start_poll_synchronize_rcu();
> +       while (!poll_state_synchronize_rcu(gp_snap))
> +               schedule_timeout_idle(1);
> +}
> +
> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func);
> +DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
> +                 "RCU Poll");
> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func)
> +{
> +       call_rcu_tasks_generic(rhp, func, &rcu_poll);
> +}
> +EXPORT_SYMBOL_GPL(call_rcu_poll);
> +
> +void synchronize_rcu_poll(void)
> +{
> +       synchronize_rcu_tasks_generic(&rcu_poll);
> +}
> +EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
> +
> +static int __init rcu_spawn_poll_kthread(void)
> +{
> +       cblist_init_generic(&rcu_poll);
> +       rcu_poll.gp_sleep = HZ / 10;
> +       rcu_spawn_tasks_kthread_generic(&rcu_poll);
> +       return 0;
> +}
>
> Thanks
> Zqiang
>
>
> > >
> > > <snip>
> > >   <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
> > > CBs=3613 bl=28
> > > ...
> > >   <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
> > > rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
> > > rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
> > > rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
> > > rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
> > > rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
> > > rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
> > >   <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
> > > invoked=3612 idle=....
> > > <snip>
> > >
> >
> > Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?
> >
> >Yes.
> >
> >
> > If possible, may I know the steps, commands, and related parameters to produce the results above?
> > Thank you!
> >
> >Build the kernel with CONFIG_RCU_TRACE configuration. Update your "set_event"
> >file with appropriate traces:
> >
> ><snip>
> >XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event
> >
> >XQ-DQ54:/sys/kernel/tracing # cat set_event
> >rcu:rcu_batch_start
> >rcu:rcu_invoke_callback
> >rcu:rcu_batch_end
> >XQ-DQ54:/sys/kernel/tracing #
> ><snip>
> >
> >Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
> >Next problem is how to parse it. Of course you will not be able to parse
> >megabytes of traces. For that purpose i use a special C trace parser.
> >If you need an example please let me know i can show here.
> >
> >--
> >Uladzislau Rezki
  
Zqiang March 28, 2023, 12:14 a.m. UTC | #12
> > > From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > Sent: Tuesday, March 21, 2023 6:28 PM [...]
> > > Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > > 
> > > A call to a synchronize_rcu() can be expensive from time point of view.
> > > Different workloads can be affected by this especially the ones 
> > > which use this API in its time critical sections.
> > > 
> > 
> > This is interesting and meaningful research. ;-)
> > 
> > > For example in case of NOCB scenario the wakeme_after_rcu() 
> > > callback invocation depends on where in a nocb-list it is located. 
> > > Below is an example when it was the last out of ~3600 callbacks:
> >
> 
> 
> 
> Can it be implemented separately as follows?  it seems that the code 
> is simpler (only personal opinion)  😊.
> 
> But I didn't test whether this reduce synchronize_rcu() waiting time
> 
> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp) {
> +       unsigned long gp_snap;
> +
> +       gp_snap = start_poll_synchronize_rcu();
> +       while (!poll_state_synchronize_rcu(gp_snap))
> +               schedule_timeout_idle(1); }
> +
> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func); 
> +DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
> +                 "RCU Poll");
> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func) {
> +       call_rcu_tasks_generic(rhp, func, &rcu_poll); } 
> +EXPORT_SYMBOL_GPL(call_rcu_poll);
> +
> +void synchronize_rcu_poll(void)
> +{
> +       synchronize_rcu_tasks_generic(&rcu_poll);
> +}
> +EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
> +
> +static int __init rcu_spawn_poll_kthread(void) {
> +       cblist_init_generic(&rcu_poll);
> +       rcu_poll.gp_sleep = HZ / 10;
> +       rcu_spawn_tasks_kthread_generic(&rcu_poll);
> +       return 0;
> +}
> 
>Uh.. I am working on v2 of original patch where i need to add a Kconfig parameter. You are inventing a new API :)

Looking forward to V2 😊.

Thanks
Zqiang

>
>--
>Uladzislau Rezki
  
Paul E. McKenney March 28, 2023, 1:06 a.m. UTC | #13
On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
> > > From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > Sent: Tuesday, March 21, 2023 6:28 PM
> > > [...]
> > > Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > > 
> > > A call to a synchronize_rcu() can be expensive from time point of view.
> > > Different workloads can be affected by this especially the ones which use this
> > > API in its time critical sections.
> > > 
> > 
> > This is interesting and meaningful research. ;-)
> > 
> > > For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > invocation depends on where in a nocb-list it is located. Below is an example
> > > when it was the last out of ~3600 callbacks:
> >
> 
> 
> 
> Can it be implemented separately as follows?  it seems that the code is simpler
> (only personal opinion)  😊.
> 
> But I didn't test whether this reduce synchronize_rcu() waiting time
> 
> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> +{
> +       unsigned long gp_snap;
> +
> +       gp_snap = start_poll_synchronize_rcu();
> +       while (!poll_state_synchronize_rcu(gp_snap))
> +               schedule_timeout_idle(1);

I could be wrong, but my guess is that the guys working with
battery-powered devices are not going to be very happy with this loop.

All those wakeups by all tasks waiting for a grace period end up
consuming a surprisingly large amount of energy.
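
Back-of-the-envelope (my numbers, not measured): with HZ=1000 and a normal
grace period lasting a few tens of milliseconds, that schedule_timeout_idle(1)
loop wakes each waiter a few dozen times per synchronize_rcu() instead of
exactly once when the grace period ends.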

							Thanx, Paul

> +}
> +
> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func);
> +DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
> +                 "RCU Poll");
> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func)
> +{
> +       call_rcu_tasks_generic(rhp, func, &rcu_poll);
> +}
> +EXPORT_SYMBOL_GPL(call_rcu_poll);
> +
> +void synchronize_rcu_poll(void)
> +{
> +       synchronize_rcu_tasks_generic(&rcu_poll);
> +}
> +EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
> +
> +static int __init rcu_spawn_poll_kthread(void)
> +{
> +       cblist_init_generic(&rcu_poll);
> +       rcu_poll.gp_sleep = HZ / 10;
> +       rcu_spawn_tasks_kthread_generic(&rcu_poll);
> +       return 0;
> +}
> 
> Thanks
> Zqiang
> 
> 
> > > 
> > > <snip>
> > >   <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
> > > CBs=3613 bl=28
> > > ...
> > >   <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
> > > rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
> > > rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
> > > rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
> > > rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
> > > rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
> > > rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
> > >   <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
> > > invoked=3612 idle=....
> > > <snip>
> > >
> > 
> > Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?
> > 
> >Yes.
> >
> >
> > If possible, may I know the steps, commands, and related parameters to produce the results above?
> > Thank you!
> > 
> >Build the kernel with CONFIG_RCU_TRACE configuration. Update your "set_event"
> >file with appropriate traces:
> >
> ><snip>
> >XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event
> >
> >XQ-DQ54:/sys/kernel/tracing # cat set_event
> >rcu:rcu_batch_start
> >rcu:rcu_invoke_callback
> >rcu:rcu_batch_end
> >XQ-DQ54:/sys/kernel/tracing #
> ><snip>
> >
> >Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
> >Next problem is how to parse it. Of course you will not be able to parse
> >megabytes of traces. For that purpose i use a special C trace parser.
> >If you need an example please let me know i can show here.
> >
> >--
> >Uladzislau Rezki
  
Qiuxu Zhuo March 28, 2023, 1:28 a.m. UTC | #14
> From: Steven Rostedt <rostedt@goodmis.org>
> Sent: Tuesday, March 28, 2023 5:51 AM
> [...]
> Subject: Re: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> 
> On Mon, 27 Mar 2023 17:48:44 -0400
> Steven Rostedt <rostedt@goodmis.org> wrote:
> 
> > struct my_info {
> > 	/* store state info here */
> > };
> >
> > int main(...) {
> > 	struct tracecmd_input *handle;
> > 	struct my_info info;
> > 	char *file = argv[1];
> >
> > 	handle = tracecmd_open(file);
> >
> > 	tracecmd_follow_event(handle, "rcu", "rcu_batch_start",
> > 			batch_start, &info);
> >
> > 	tracecmd_follow_event(handle, "rcu", "rcu_batch_end",
> > 			batch_end, &info);
> >
> > 	tracecmd_follow_event(handle, "rcu", "rcu_invoke_callback",
> > 			invoke_callback, &info);
> >
> > 	tracecmd_iterate_events(handle, NULL, 0, NULL, NULL);
> >
> > 	tracecmd_close(handle);
> > }
> 
> BTW, none of this code was actually tested, so I may have some syntax
> errors. I just did this straight from memory, as it's so easy ;-)

Thank you, Steven, for sharing the libtracecmd-based and libtraceevent-based
methods for trace parsing. Good to know about them. They should simplify the
code for parsing trace logs. Pretty useful for me/us ;-)

- Qiuxu

> 
> -- Steve
  
Zqiang March 28, 2023, 1:32 a.m. UTC | #15
> > > From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > Sent: Tuesday, March 21, 2023 6:28 PM
> > > [...]
> > > Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > > 
> > > A call to a synchronize_rcu() can be expensive from time point of view.
> > > Different workloads can be affected by this especially the ones which use this
> > > API in its time critical sections.
> > > 
> > 
> > This is interesting and meaningful research. ;-)
> > 
> > > For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > invocation depends on where in a nocb-list it is located. Below is an example
> > > when it was the last out of ~3600 callbacks:
> >
> 
> 
> 
> Can it be implemented separately as follows?  it seems that the code is simpler
> (only personal opinion)  😊.
> 
> But I didn't test whether this reduce synchronize_rcu() waiting time
> 
> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> +{
> +       unsigned long gp_snap;
> +
> +       gp_snap = start_poll_synchronize_rcu();
> +       while (!poll_state_synchronize_rcu(gp_snap))
> +               schedule_timeout_idle(1);
>
>I could be wrong, but my guess is that the guys working with
>battery-powered devices are not going to be very happy with this loop.
>
>All those wakeups by all tasks waiting for a grace period end up
>consuming a surprisingly large amount of energy.


Agree, maybe Uladzislau's patch will have similar problems.

Thanks
Zqiang


>
>							Thanx, Paul
>
> +}
> +
> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func);
> +DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
> +                 "RCU Poll");
> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func)
> +{
> +       call_rcu_tasks_generic(rhp, func, &rcu_poll);
> +}
> +EXPORT_SYMBOL_GPL(call_rcu_poll);
> +
> +void synchronize_rcu_poll(void)
> +{
> +       synchronize_rcu_tasks_generic(&rcu_poll);
> +}
> +EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
> +
> +static int __init rcu_spawn_poll_kthread(void)
> +{
> +       cblist_init_generic(&rcu_poll);
> +       rcu_poll.gp_sleep = HZ / 10;
> +       rcu_spawn_tasks_kthread_generic(&rcu_poll);
> +       return 0;
> +}
> 
> Thanks
> Zqiang
> 
> 
> > > 
> > > <snip>
> > >   <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
> > > CBs=3613 bl=28
> > > ...
> > >   <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
> > > rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
> > > rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
> > > rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
> > > rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
> > > rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
> > >   <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
> > > rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
> > >   <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
> > > invoked=3612 idle=....
> > > <snip>
> > >
> > 
> > Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?
> > 
> >Yes.
> >
> >
> > If possible, may I know the steps, commands, and related parameters to produce the results above?
> > Thank you!
> > 
> >Build the kernel with CONFIG_RCU_TRACE configuration. Update your "set_event"
> >file with appropriate traces:
> >
> ><snip>
> >XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event
> >
> >XQ-DQ54:/sys/kernel/tracing # cat set_event
> >rcu:rcu_batch_start
> >rcu:rcu_invoke_callback
> >rcu:rcu_batch_end
> >XQ-DQ54:/sys/kernel/tracing #
> ><snip>
> >
> >Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
> >Next problem is how to parse it. Of course you will not be able to parse
> >megabytes of traces. For that purpose i use a special C trace parser.
> >If you need an example please let me know i can show here.
> >
> >--
> >Uladzislau Rezki
  
Paul E. McKenney March 28, 2023, 1:59 a.m. UTC | #16
On Tue, Mar 28, 2023 at 01:32:56AM +0000, Zhang, Qiang1 wrote:
> > > > From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > > Sent: Tuesday, March 21, 2023 6:28 PM
> > > > [...]
> > > > Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > > > 
> > > > A call to a synchronize_rcu() can be expensive from time point of view.
> > > > Different workloads can be affected by this especially the ones which use this
> > > > API in its time critical sections.
> > > > 
> > > 
> > > This is interesting and meaningful research. ;-)
> > > 
> > > > For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > > invocation depends on where in a nocb-list it is located. Below is an example
> > > > when it was the last out of ~3600 callbacks:
> > >
> > 
> > 
> > 
> > Can it be implemented separately as follows?  it seems that the code is simpler
> > (only personal opinion)  😊.
> > 
> > But I didn't test whether this reduce synchronize_rcu() waiting time
> > 
> > +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> > +{
> > +       unsigned long gp_snap;
> > +
> > +       gp_snap = start_poll_synchronize_rcu();
> > +       while (!poll_state_synchronize_rcu(gp_snap))
> > +               schedule_timeout_idle(1);
> >
> >I could be wrong, but my guess is that the guys working with
> >battery-powered devices are not going to be very happy with this loop.
> >
> >All those wakeups by all tasks waiting for a grace period end up
> >consuming a surprisingly large amount of energy.
> 
> Agree,  maybe Uladzislau 's patch will have similar problems.

We will soon find out.  ;-)

							Thanx, Paul

> Thanks
> Zqiang
> 
> 
> >
> >							Thanx, Paul
> >
> > +}
> > +
> > +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func);
> > +DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
> > +                 "RCU Poll");
> > +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func)
> > +{
> > +       call_rcu_tasks_generic(rhp, func, &rcu_poll);
> > +}
> > +EXPORT_SYMBOL_GPL(call_rcu_poll);
> > +
> > +void synchronize_rcu_poll(void)
> > +{
> > +       synchronize_rcu_tasks_generic(&rcu_poll);
> > +}
> > +EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
> > +
> > +static int __init rcu_spawn_poll_kthread(void)
> > +{
> > +       cblist_init_generic(&rcu_poll);
> > +       rcu_poll.gp_sleep = HZ / 10;
> > +       rcu_spawn_tasks_kthread_generic(&rcu_poll);
> > +       return 0;
> > +}
> > 
> > Thanks
> > Zqiang
> > 
> > 
> > > > 
> > > > <snip>
> > > >   <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
> > > > CBs=3613 bl=28
> > > > ...
> > > >   <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
> > > > rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
> > > >   <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
> > > > rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
> > > >   <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
> > > > rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
> > > >   <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
> > > > rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
> > > >   <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
> > > > rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
> > > >   <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
> > > > rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
> > > >   <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
> > > > invoked=3612 idle=....
> > > > <snip>
> > > >
> > > 
> > > Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?
> > > 
> > >Yes.
> > >
> > >
> > > If possible, may I know the steps, commands, and related parameters to produce the results above?
> > > Thank you!
> > > 
> > >Build the kernel with CONFIG_RCU_TRACE configuration. Update your "set_event"
> > >file with appropriate traces:
> > >
> > ><snip>
> > >XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event
> > >
> > >XQ-DQ54:/sys/kernel/tracing # cat set_event
> > >rcu:rcu_batch_start
> > >rcu:rcu_invoke_callback
> > >rcu:rcu_batch_end
> > >XQ-DQ54:/sys/kernel/tracing #
> > ><snip>
> > >
> > >Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
> > >Next problem is how to parse it. Of course you will not be able to parse
> > >megabytes of traces. For that purpose i use a special C trace parser.
> > >If you need an example please let me know i can show here.
> > >
> > >--
> > >Uladzislau Rezki
  
Joel Fernandes March 28, 2023, 2:29 a.m. UTC | #17
Hello,

> On Mar 27, 2023, at 9:06 PM, Paul E. McKenney <paulmck@kernel.org> wrote:
> 
> On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
>>>> From: Uladzislau Rezki (Sony) <urezki@gmail.com>
>>>> Sent: Tuesday, March 21, 2023 6:28 PM
>>>> [...]
>>>> Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
>>>> 
>>>> A call to a synchronize_rcu() can be expensive from time point of view.
>>>> Different workloads can be affected by this especially the ones which use this
>>>> API in its time critical sections.
>>>> 
>>> 
>>> This is interesting and meaningful research. ;-)
>>> 
>>>> For example in case of NOCB scenario the wakeme_after_rcu() callback
>>>> invocation depends on where in a nocb-list it is located. Below is an example
>>>> when it was the last out of ~3600 callbacks:
>>> 
>> 
>> 
>> 
>> Can it be implemented separately as follows?  it seems that the code is simpler
>> (only personal opinion)  😊.
>> 
>> But I didn't test whether this reduce synchronize_rcu() waiting time
>> 
>> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
>> +{
>> +       unsigned long gp_snap;
>> +
>> +       gp_snap = start_poll_synchronize_rcu();
>> +       while (!poll_state_synchronize_rcu(gp_snap))
>> +               schedule_timeout_idle(1);
> 
> I could be wrong, but my guess is that the guys working with
> battery-powered devices are not going to be very happy with this loop.
> 
> All those wakeups by all tasks waiting for a grace period end up
> consuming a surprisingly large amount of energy.

Is that really the common case? On the general topic of wake-ups: most of the
time there should be only one task waiting synchronously on a GP to end. If
that is true, then it feels like waking up nocb kthreads, which indirectly
wake other threads, is doing more work than usual?

I am curious to measure how much Vlad's patch reduces wakeups in the common case.

I was also wondering how Vlad's patch affects RCU-barrier ordering. I guess
we want the wake-up to happen in the order of the other callbacks that are
also waiting.

One last note, most battery powered systems are perhaps already using expedited RCU ;-)
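
(For anyone wanting to check or force that on a given device: if I remember
correctly, expedited grace periods can be enabled system-wide with the
rcupdate.rcu_expedited=1 boot parameter or by writing 1 to
/sys/kernel/rcu_expedited.)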

Thoughts?

 - Joel 

> 
>                            Thanx, Paul
> 
>> +}
>> +
>> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func);
>> +DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
>> +                 "RCU Poll");
>> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func)
>> +{
>> +       call_rcu_tasks_generic(rhp, func, &rcu_poll);
>> +}
>> +EXPORT_SYMBOL_GPL(call_rcu_poll);
>> +
>> +void synchronize_rcu_poll(void)
>> +{
>> +       synchronize_rcu_tasks_generic(&rcu_poll);
>> +}
>> +EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
>> +
>> +static int __init rcu_spawn_poll_kthread(void)
>> +{
>> +       cblist_init_generic(&rcu_poll);
>> +       rcu_poll.gp_sleep = HZ / 10;
>> +       rcu_spawn_tasks_kthread_generic(&rcu_poll);
>> +       return 0;
>> +}
>> 
>> Thanks
>> Zqiang
>> 
>> 
>>>> 
>>>> <snip>
>>>>  <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
>>>> CBs=3613 bl=28
>>>> ...
>>>>  <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
>>>> rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
>>>>  <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
>>>> rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
>>>>  <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
>>>> rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
>>>>  <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
>>>> rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
>>>>  <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
>>>> rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
>>>>  <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
>>>> rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
>>>>  <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
>>>> invoked=3612 idle=....
>>>> <snip>
>>>> 
>>> 
>>> Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?
>>> 
>>> Yes.
>>> 
>>> 
>>> If possible, may I know the steps, commands, and related parameters to produce the results above?
>>> Thank you!
>>> 
>>> Build the kernel with CONFIG_RCU_TRACE configuration. Update your "set_event"
>>> file with appropriate traces:
>>> 
>>> <snip>
>>> XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event
>>> 
>>> XQ-DQ54:/sys/kernel/tracing # cat set_event
>>> rcu:rcu_batch_start
>>> rcu:rcu_invoke_callback
>>> rcu:rcu_batch_end
>>> XQ-DQ54:/sys/kernel/tracing #
>>> <snip>
>>> 
>>> Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
>>> Next problem is how to parse it. Of course you will not be able to parse
>>> megabytes of traces. For that purpose i use a special C trace parser.
>>> If you need an example please let me know i can show here.
>>> 
>>> --
>>> Uladzislau Rezki
  
Paul E. McKenney March 28, 2023, 3:26 p.m. UTC | #18
On Mon, Mar 27, 2023 at 10:29:31PM -0400, Joel Fernandes wrote:
> Hello,
> 
> > On Mar 27, 2023, at 9:06 PM, Paul E. McKenney <paulmck@kernel.org> wrote:
> > 
> > On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
> >>>> From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> >>>> Sent: Tuesday, March 21, 2023 6:28 PM
> >>>> [...]
> >>>> Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> >>>> 
> >>>> A call to a synchronize_rcu() can be expensive from time point of view.
> >>>> Different workloads can be affected by this especially the ones which use this
> >>>> API in its time critical sections.
> >>>> 
> >>> 
> >>> This is interesting and meaningful research. ;-)
> >>> 
> >>>> For example in case of NOCB scenario the wakeme_after_rcu() callback
> >>>> invocation depends on where in a nocb-list it is located. Below is an example
> >>>> when it was the last out of ~3600 callbacks:
> >>> 
> >> 
> >> 
> >> 
> >> Can it be implemented separately as follows?  it seems that the code is simpler
> >> (only personal opinion)  😊.
> >> 
> >> But I didn't test whether this reduce synchronize_rcu() waiting time
> >> 
> >> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> >> +{
> >> +       unsigned long gp_snap;
> >> +
> >> +       gp_snap = start_poll_synchronize_rcu();
> >> +       while (!poll_state_synchronize_rcu(gp_snap))
> >> +               schedule_timeout_idle(1);
> > 
> > I could be wrong, but my guess is that the guys working with
> > battery-powered devices are not going to be very happy with this loop.
> > 
> > All those wakeups by all tasks waiting for a grace period end up
> > consuming a surprisingly large amount of energy.
> 
> Is that really the common case? On the general topic of wake-ups:
> Most of the time there should be only one
> task waiting synchronously on a GP to end. If that is
> true, then it feels like waking
> up nocb Kthreads which indirectly wake other threads is doing more work than usual?

A good question, and the number of outstanding synchronize_rcu()
calls will of course be limited by the number of tasks in the system.
But I myself have raised the ire of battery-powered embedded folks with
a rather small number of wakeups, so...

And on larger systems there can be a tradeoff between contention on
the one hand and number of wakeups on the other.

The original nocb implementation in fact had the grace-period kthread
waking up all of what are now called rcuoc kthreads.  The indirect scheme
reduced the total number of wakeups by up to 50% and also reduced the
CPU consumption of the grace-period kthread, which otherwise would have
become a bottleneck on large systems.

And also, a scheme that directly wakes tasks waiting in synchronize_rcu()
might well use the same ->nocb_gp_wq[] waitqueues that are used by the
rcuog kthreads, if that is what you were getting at.

> I am curious to measure how much does Vlad patch reduce wakeups in the common case.

Sounds like a good thing to measure!

> I was also wondering how Vlad patch effects RCU-barrier ordering. I guess
> we want the wake up to happen in the order of
> other callbacks also waiting.

OK, I will bite.  Why would rcu_barrier() need to care about the
synchronize_rcu() invocations if they no longer used call_rcu()?

> One last note, most battery powered systems are perhaps already using expedited RCU ;-)

Good point.  And that does raise the question of exactly what workloads
and systems want faster wakeups from synchronize_rcu() and cannot get
this effect from expedited grace periods.

							Thanx, Paul

> Thoughts?
> 
>  - Joel 
> 
> > 
> >                            Thanx, Paul
> > 
> >> +}
> >> +
> >> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func);
> >> +DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
> >> +                 "RCU Poll");
> >> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func)
> >> +{
> >> +       call_rcu_tasks_generic(rhp, func, &rcu_poll);
> >> +}
> >> +EXPORT_SYMBOL_GPL(call_rcu_poll);
> >> +
> >> +void synchronize_rcu_poll(void)
> >> +{
> >> +       synchronize_rcu_tasks_generic(&rcu_poll);
> >> +}
> >> +EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
> >> +
> >> +static int __init rcu_spawn_poll_kthread(void)
> >> +{
> >> +       cblist_init_generic(&rcu_poll);
> >> +       rcu_poll.gp_sleep = HZ / 10;
> >> +       rcu_spawn_tasks_kthread_generic(&rcu_poll);
> >> +       return 0;
> >> +}
> >> 
> >> Thanks
> >> Zqiang
> >> 
> >> 
> >>>> 
> >>>> <snip>
> >>>>  <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
> >>>> CBs=3613 bl=28
> >>>> ...
> >>>>  <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
> >>>> rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
> >>>>  <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
> >>>> rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
> >>>>  <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
> >>>> rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
> >>>>  <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
> >>>> rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
> >>>>  <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
> >>>> rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
> >>>>  <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
> >>>> rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
> >>>>  <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
> >>>> invoked=3612 idle=....
> >>>> <snip>
> >>>> 
> >>> 
> >>> Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?
> >>> 
> >>> Yes.
> >>> 
> >>> 
> >>> If possible, may I know the steps, commands, and related parameters to produce the results above?
> >>> Thank you!
> >>> 
> >>> Build the kernel with CONFIG_RCU_TRACE configuration. Update your "set_event"
> >>> file with appropriate traces:
> >>> 
> >>> <snip>
> >>> XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event
> >>> 
> >>> XQ-DQ54:/sys/kernel/tracing # cat set_event
> >>> rcu:rcu_batch_start
> >>> rcu:rcu_invoke_callback
> >>> rcu:rcu_batch_end
> >>> XQ-DQ54:/sys/kernel/tracing #
> >>> <snip>
> >>> 
> >>> Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
> >>> Next problem is how to parse it. Of course you will not be able to parse
> >>> megabytes of traces. For that purpose i use a special C trace parser.
> >>> If you need an example please let me know i can show here.
> >>> 
> >>> --
> >>> Uladzislau Rezki
  
Paul E. McKenney March 28, 2023, 10:14 p.m. UTC | #19
On Tue, Mar 28, 2023 at 08:26:13AM -0700, Paul E. McKenney wrote:
> On Mon, Mar 27, 2023 at 10:29:31PM -0400, Joel Fernandes wrote:
> > Hello,
> > 
> > > On Mar 27, 2023, at 9:06 PM, Paul E. McKenney <paulmck@kernel.org> wrote:
> > > 
> > > On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
> > >>>> From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > >>>> Sent: Tuesday, March 21, 2023 6:28 PM
> > >>>> [...]
> > >>>> Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > >>>> 
> > >>>> A call to a synchronize_rcu() can be expensive from time point of view.
> > >>>> Different workloads can be affected by this especially the ones which use this
> > >>>> API in its time critical sections.
> > >>>> 
> > >>> 
> > >>> This is interesting and meaningful research. ;-)
> > >>> 
> > >>>> For example in case of NOCB scenario the wakeme_after_rcu() callback
> > >>>> invocation depends on where in a nocb-list it is located. Below is an example
> > >>>> when it was the last out of ~3600 callbacks:
> > >>> 
> > >> 
> > >> 
> > >> 
> > >> Can it be implemented separately as follows?  it seems that the code is simpler
> > >> (only personal opinion)  😊.
> > >> 
> > >> But I didn't test whether this reduce synchronize_rcu() waiting time
> > >> 
> > >> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> > >> +{
> > >> +       unsigned long gp_snap;
> > >> +
> > >> +       gp_snap = start_poll_synchronize_rcu();
> > >> +       while (!poll_state_synchronize_rcu(gp_snap))
> > >> +               schedule_timeout_idle(1);
> > > 
> > > I could be wrong, but my guess is that the guys working with
> > > battery-powered devices are not going to be very happy with this loop.
> > > 
> > > All those wakeups by all tasks waiting for a grace period end up
> > > consuming a surprisingly large amount of energy.
> > 
> > Is that really the common case? On the general topic of wake-ups:
> > Most of the time there should be only one
> > task waiting synchronously on a GP to end. If that is
> > true, then it feels like waking
> > up nocb Kthreads which indirectly wake other threads is doing more work than usual?
> 
> A good question, and the number of outstanding synchronize_rcu()
> calls will of course be limited by the number of tasks in the system.
> But I myself have raised the ire of battery-powered embedded folks with
> a rather small number of wakeups, so...
> 
> And on larger systems there can be a tradeoff between contention on
> the one hand and number of wakeups on the other.
> 
> The original nocb implementation in fact had the grace-period kthread
> waking up all of what are now called rcuoc kthreads.  The indirect scheme
> reduced the total number of wakeups by up to 50% and also reduced the
> CPU consumption of the grace-period kthread, which otherwise would have
> become a bottleneck on large systems.
> 
> And also, a scheme that directly wakes tasks waiting in synchronize_rcu()
> might well use the same ->nocb_gp_wq[] waitqueues that are used by the
> rcuog kthreads, if that is what you were getting at.

And on small systems, you might of course have the rcuog kthread directly
invoke callbacks if there are not very many of them.  This would of
course need to be done quite carefully to avoid any number of races
with the rcuoc kthreads.  You could do the same thing on a large system,
but on a per-rcuog basis.
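
Purely as an illustration of that idea (not a proposal of the actual
plumbing), the decision point might look like the sketch below; the
threshold is a made-up number and all of the careful racing against the
rcuoc kthreads mentioned above is deliberately left out:

<snip>
#include <linux/types.h>
#include <linux/rcupdate.h>

#define SR_DIRECT_INVOKE_LIMIT 16      /* made-up small-batch threshold */

/*
 * Return true if the (short) ready list was invoked in place, false if
 * the caller should fall back to waking its rcuoc kthread as usual.
 */
static bool sr_rcuog_maybe_invoke(struct rcu_head *list, long n_cbs)
{
        struct rcu_head *rhp, *next;

        if (n_cbs > SR_DIRECT_INVOKE_LIMIT)
                return false;

        for (rhp = list; rhp; rhp = next) {
                next = rhp->next;
                rhp->func(rhp);
        }
        return true;
}
<snip>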

I vaguely recall discussing this in one of our sessions, but who knows?

Would this really be of benefit?  Or did you have something else in mind?

							Thanx, Paul

> > I am curious to measure how much does Vlad patch reduce wakeups in the common case.
> 
> Sounds like a good thing to measure!
> 
> > I was also wondering how Vlad patch effects RCU-barrier ordering. I guess
> > we want the wake up to happen in the order of
> > other callbacks also waiting.
> 
> OK, I will bite.  Why would rcu_barrier() need to care about the
> synchronize_rcu() invocations if they no longer used call_rcu()?
> 
> > One last note, most battery powered systems are perhaps already using expedited RCU ;-)
> 
> Good point.  And that does raise the question of exactly what workloads
> and systems want faster wakeups from synchronize_rcu() and cannot get
> this effect from expedited grace periods.
> 
> 							Thanx, Paul
> 
> > Thoughts?
> > 
> >  - Joel 
> > 
> > > 
> > >                            Thanx, Paul
> > > 
> > >> +}
> > >> +
> > >> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func);
> > >> +DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
> > >> +                 "RCU Poll");
> > >> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func)
> > >> +{
> > >> +       call_rcu_tasks_generic(rhp, func, &rcu_poll);
> > >> +}
> > >> +EXPORT_SYMBOL_GPL(call_rcu_poll);
> > >> +
> > >> +void synchronize_rcu_poll(void)
> > >> +{
> > >> +       synchronize_rcu_tasks_generic(&rcu_poll);
> > >> +}
> > >> +EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
> > >> +
> > >> +static int __init rcu_spawn_poll_kthread(void)
> > >> +{
> > >> +       cblist_init_generic(&rcu_poll);
> > >> +       rcu_poll.gp_sleep = HZ / 10;
> > >> +       rcu_spawn_tasks_kthread_generic(&rcu_poll);
> > >> +       return 0;
> > >> +}
> > >> 
> > >> Thanks
> > >> Zqiang
> > >> 
> > >> 
> > >>>> 
> > >>>> <snip>
> > >>>>  <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
> > >>>> CBs=3613 bl=28
> > >>>> ...
> > >>>>  <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
> > >>>> rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
> > >>>>  <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
> > >>>> rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
> > >>>>  <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
> > >>>> rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
> > >>>>  <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
> > >>>> rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
> > >>>>  <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
> > >>>> rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
> > >>>>  <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
> > >>>> rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
> > >>>>  <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
> > >>>> invoked=3612 idle=....
> > >>>> <snip>
> > >>>> 
> > >>> 
> > >>> Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?
> > >>> 
> > >>> Yes.
> > >>> 
> > >>> 
> > >>> If possible, may I know the steps, commands, and related parameters to produce the results above?
> > >>> Thank you!
> > >>> 
> > >>> Build the kernel with CONFIG_RCU_TRACE configuration. Update your "set_event"
> > >>> file with appropriate traces:
> > >>> 
> > >>> <snip>
> > >>> XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event
> > >>> 
> > >>> XQ-DQ54:/sys/kernel/tracing # cat set_event
> > >>> rcu:rcu_batch_start
> > >>> rcu:rcu_invoke_callback
> > >>> rcu:rcu_batch_end
> > >>> XQ-DQ54:/sys/kernel/tracing #
> > >>> <snip>
> > >>> 
> > >>> Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
> > >>> Next problem is how to parse it. Of course you will not be able to parse
> > >>> megabytes of traces. For that purpose i use a special C trace parser.
> > >>> If you need an example please let me know i can show here.
> > >>> 
> > >>> --
> > >>> Uladzislau Rezki
  
Joel Fernandes March 30, 2023, 3:09 p.m. UTC | #20
On Tue, Mar 28, 2023 at 08:26:13AM -0700, Paul E. McKenney wrote:
> On Mon, Mar 27, 2023 at 10:29:31PM -0400, Joel Fernandes wrote:
> > Hello,
> > 
> > > On Mar 27, 2023, at 9:06 PM, Paul E. McKenney <paulmck@kernel.org> wrote:
> > > 
> > > On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
> > >>>> From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > >>>> Sent: Tuesday, March 21, 2023 6:28 PM
> > >>>> [...]
> > >>>> Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > >>>> 
> > >>>> A call to a synchronize_rcu() can be expensive from time point of view.
> > >>>> Different workloads can be affected by this especially the ones which use this
> > >>>> API in its time critical sections.
> > >>>> 
> > >>> 
> > >>> This is interesting and meaningful research. ;-)
> > >>> 
> > >>>> For example in case of NOCB scenario the wakeme_after_rcu() callback
> > >>>> invocation depends on where in a nocb-list it is located. Below is an example
> > >>>> when it was the last out of ~3600 callbacks:
> > >>> 
> > >> 
> > >> 
> > >> 
> > >> Can it be implemented separately as follows?  it seems that the code is simpler
> > >> (only personal opinion)  😊.
> > >> 
> > >> But I didn't test whether this reduce synchronize_rcu() waiting time
> > >> 
> > >> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> > >> +{
> > >> +       unsigned long gp_snap;
> > >> +
> > >> +       gp_snap = start_poll_synchronize_rcu();
> > >> +       while (!poll_state_synchronize_rcu(gp_snap))
> > >> +               schedule_timeout_idle(1);
> > > 
> > > I could be wrong, but my guess is that the guys working with
> > > battery-powered devices are not going to be very happy with this loop.
> > > 
> > > All those wakeups by all tasks waiting for a grace period end up
> > > consuming a surprisingly large amount of energy.
> > 
> > Is that really the common case? On the general topic of wake-ups:
> > Most of the time there should be only one
> > task waiting synchronously on a GP to end. If that is
> > true, then it feels like waking
> > up nocb Kthreads which indirectly wake other threads is doing more work than usual?
> 
> A good question, and the number of outstanding synchronize_rcu()
> calls will of course be limited by the number of tasks in the system.
> But I myself have raised the ire of battery-powered embedded folks with
> a rather small number of wakeups, so...

But unless I am missing something, even if there is a single synchronize_rcu(),
you have a flurry of potential wakeups right now, instead of the bare minimum
I think. I have not measured how many wake ups, but I'd love to when I get
time. Maybe Vlad has some numbers.

> And on larger systems there can be a tradeoff between contention on
> the one hand and number of wakeups on the other.
> 
> The original nocb implementation in fact had the grace-period kthread
> waking up all of what are now called rcuoc kthreads.  The indirect scheme
> reduced the total number of wakeups by up to 50% and also reduced the
> CPU consumption of the grace-period kthread, which otherwise would have
> become a bottleneck on large systems.

Thanks for the background.

> And also, a scheme that directly wakes tasks waiting in synchronize_rcu()
> might well use the same ->nocb_gp_wq[] waitqueues that are used by the
> rcuog kthreads, if that is what you were getting at.

Yes that's what I was getting at. I thought Vlad was going for doing direct
wake-ups from the main RCU GP thread that orchestrates RCU grace period
cycles.

> > I am curious to measure how much does Vlad patch reduce wakeups in the common case.
> 
> Sounds like a good thing to measure!

Ok. At the moment I am preparing 2 talks I am giving at OSPM for real-time and
timers. Plus preparing the PR, so I'm fully booked. :(  [and the LWN article..].
> 
> > I was also wondering how Vlad patch effects RCU-barrier ordering. I guess
> > we want the wake up to happen in the order of
> > other callbacks also waiting.
> 
> OK, I will bite.  Why would rcu_barrier() need to care about the
> synchronize_rcu() invocations if they no longer used call_rcu()?

Hm, I was just going for the fact that it is a behavioral change. Not
implying that it would certainly cause an issue. As we know, Linux kernel
developers have interesting ways of using RCU APIs. :-)

But yes, it may not be an issue considering expedited synchronize_rcu() also
has such behavior anyway, if I'm not mistaken.

> > One last note, most battery powered systems are perhaps already using expedited RCU ;-)
> 
> Good point.  And that does raise the question of exactly what workloads
> and systems want faster wakeups from synchronize_rcu() and cannot get
> this effect from expedited grace periods.

Maybe the kind of workloads that don't need GP completion very quickly, but
just want to reduce wakeups. The wakeups do have a cost: the scheduler can
wake up several idle CPUs to "spread the awakened load" and waste power.
They can also contend on locks during the wake-up.

thanks,

 - Joel


> >  - Joel 
> > 
> > > 
> > >                            Thanx, Paul
> > > 
> > >> +}
> > >> +
> > >> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func);
> > >> +DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
> > >> +                 "RCU Poll");
> > >> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func)
> > >> +{
> > >> +       call_rcu_tasks_generic(rhp, func, &rcu_poll);
> > >> +}
> > >> +EXPORT_SYMBOL_GPL(call_rcu_poll);
> > >> +
> > >> +void synchronize_rcu_poll(void)
> > >> +{
> > >> +       synchronize_rcu_tasks_generic(&rcu_poll);
> > >> +}
> > >> +EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
> > >> +
> > >> +static int __init rcu_spawn_poll_kthread(void)
> > >> +{
> > >> +       cblist_init_generic(&rcu_poll);
> > >> +       rcu_poll.gp_sleep = HZ / 10;
> > >> +       rcu_spawn_tasks_kthread_generic(&rcu_poll);
> > >> +       return 0;
> > >> +}
> > >> 
> > >> Thanks
> > >> Zqiang
> > >> 
> > >> 
> > >>>> 
> > >>>> <snip>
> > >>>>  <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
> > >>>> CBs=3613 bl=28
> > >>>> ...
> > >>>>  <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
> > >>>> rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
> > >>>>  <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
> > >>>> rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
> > >>>>  <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
> > >>>> rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
> > >>>>  <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
> > >>>> rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
> > >>>>  <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
> > >>>> rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
> > >>>>  <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
> > >>>> rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
> > >>>>  <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
> > >>>> invoked=3612 idle=....
> > >>>> <snip>
> > >>>> 
> > >>> 
> > >>> Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?
> > >>> 
> > >>> Yes.
> > >>> 
> > >>> 
> > >>> If possible, may I know the steps, commands, and related parameters to produce the results above?
> > >>> Thank you!
> > >>> 
> > >>> Build the kernel with CONFIG_RCU_TRACE configuration. Update your "set_event"
> > >>> file with appropriate traces:
> > >>> 
> > >>> <snip>
> > >>> XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event
> > >>> 
> > >>> XQ-DQ54:/sys/kernel/tracing # cat set_event
> > >>> rcu:rcu_batch_start
> > >>> rcu:rcu_invoke_callback
> > >>> rcu:rcu_batch_end
> > >>> XQ-DQ54:/sys/kernel/tracing #
> > >>> <snip>
> > >>> 
> > >>> Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
> > >>> Next problem is how to parse it. Of course you will not be able to parse
> > >>> megabytes of traces. For that purpose i use a special C trace parser.
> > >>> If you need an example please let me know i can show here.
> > >>> 
> > >>> --
> > >>> Uladzislau Rezki
  
Joel Fernandes March 30, 2023, 3:11 p.m. UTC | #21
On Tue, Mar 28, 2023 at 03:14:32PM -0700, Paul E. McKenney wrote:
> On Tue, Mar 28, 2023 at 08:26:13AM -0700, Paul E. McKenney wrote:
> > On Mon, Mar 27, 2023 at 10:29:31PM -0400, Joel Fernandes wrote:
> > > Hello,
> > > 
> > > > On Mar 27, 2023, at 9:06 PM, Paul E. McKenney <paulmck@kernel.org> wrote:
> > > > 
> > > > On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
> > > >>>> From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > >>>> Sent: Tuesday, March 21, 2023 6:28 PM
> > > >>>> [...]
> > > >>>> Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > > >>>> 
> > > >>>> A call to a synchronize_rcu() can be expensive from time point of view.
> > > >>>> Different workloads can be affected by this especially the ones which use this
> > > >>>> API in its time critical sections.
> > > >>>> 
> > > >>> 
> > > >>> This is interesting and meaningful research. ;-)
> > > >>> 
> > > >>>> For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > >>>> invocation depends on where in a nocb-list it is located. Below is an example
> > > >>>> when it was the last out of ~3600 callbacks:
> > > >>> 
> > > >> 
> > > >> 
> > > >> 
> > > >> Can it be implemented separately as follows?  it seems that the code is simpler
> > > >> (only personal opinion)  😊.
> > > >> 
> > > >> But I didn't test whether this reduce synchronize_rcu() waiting time
> > > >> 
> > > >> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> > > >> +{
> > > >> +       unsigned long gp_snap;
> > > >> +
> > > >> +       gp_snap = start_poll_synchronize_rcu();
> > > >> +       while (!poll_state_synchronize_rcu(gp_snap))
> > > >> +               schedule_timeout_idle(1);
> > > > 
> > > > I could be wrong, but my guess is that the guys working with
> > > > battery-powered devices are not going to be very happy with this loop.
> > > > 
> > > > All those wakeups by all tasks waiting for a grace period end up
> > > > consuming a surprisingly large amount of energy.
> > > 
> > > Is that really the common case? On the general topic of wake-ups:
> > > Most of the time there should be only one
> > > task waiting synchronously on a GP to end. If that is
> > > true, then it feels like waking
> > > up nocb Kthreads which indirectly wake other threads is doing more work than usual?
> > 
> > A good question, and the number of outstanding synchronize_rcu()
> > calls will of course be limited by the number of tasks in the system.
> > But I myself have raised the ire of battery-powered embedded folks with
> > a rather small number of wakeups, so...
> > 
> > And on larger systems there can be a tradeoff between contention on
> > the one hand and number of wakeups on the other.
> > 
> > The original nocb implementation in fact had the grace-period kthread
> > waking up all of what are now called rcuoc kthreads.  The indirect scheme
> > reduced the total number of wakeups by up to 50% and also reduced the
> > CPU consumption of the grace-period kthread, which otherwise would have
> > become a bottleneck on large systems.
> > 
> > And also, a scheme that directly wakes tasks waiting in synchronize_rcu()
> > might well use the same ->nocb_gp_wq[] waitqueues that are used by the
> > rcuog kthreads, if that is what you were getting at.
> 
> And on small systems, you might of course have the rcuog kthread directly
> invoke callbacks if there are not very many of them.  This would of
> course need to be done quite carefully to avoid any number of races
> with the rcuoc kthreads.  You could do the same thing on a large system,
> but on a per-rcuog basis.
> 
> I vaguely recall discussing this in one of our sessions, but who knows?
> 
> Would this really be of benefit?  Or did you have something else in mind?

Yes, this is what I was also referring to.

Not sure about benefit, depends on workloads and measurement.

thanks,

 - Joel


> 
> 							Thanx, Paul
> 
> > > I am curious to measure how much does Vlad patch reduce wakeups in the common case.
> > 
> > Sounds like a good thing to measure!
> > 
> > > I was also wondering how Vlad patch effects RCU-barrier ordering. I guess
> > > we want the wake up to happen in the order of
> > > other callbacks also waiting.
> > 
> > OK, I will bite.  Why would rcu_barrier() need to care about the
> > synchronize_rcu() invocations if they no longer used call_rcu()?
> > 
> > > One last note, most battery powered systems are perhaps already using expedited RCU ;-)
> > 
> > Good point.  And that does raise the question of exactly what workloads
> > and systems want faster wakeups from synchronize_rcu() and cannot get
> > this effect from expedited grace periods.
> > 
> > 							Thanx, Paul
> > 
> > > Thoughts?
> > > 
> > >  - Joel 
> > > 
> > > > 
> > > >                            Thanx, Paul
> > > > 
> > > >> +}
> > > >> +
> > > >> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func);
> > > >> +DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
> > > >> +                 "RCU Poll");
> > > >> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func)
> > > >> +{
> > > >> +       call_rcu_tasks_generic(rhp, func, &rcu_poll);
> > > >> +}
> > > >> +EXPORT_SYMBOL_GPL(call_rcu_poll);
> > > >> +
> > > >> +void synchronize_rcu_poll(void)
> > > >> +{
> > > >> +       synchronize_rcu_tasks_generic(&rcu_poll);
> > > >> +}
> > > >> +EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
> > > >> +
> > > >> +static int __init rcu_spawn_poll_kthread(void)
> > > >> +{
> > > >> +       cblist_init_generic(&rcu_poll);
> > > >> +       rcu_poll.gp_sleep = HZ / 10;
> > > >> +       rcu_spawn_tasks_kthread_generic(&rcu_poll);
> > > >> +       return 0;
> > > >> +}
> > > >> 
> > > >> Thanks
> > > >> Zqiang
> > > >> 
> > > >> 
> > > >>>> 
> > > >>>> <snip>
> > > >>>>  <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
> > > >>>> CBs=3613 bl=28
> > > >>>> ...
> > > >>>>  <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
> > > >>>> rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
> > > >>>>  <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
> > > >>>> rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
> > > >>>>  <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
> > > >>>> rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
> > > >>>>  <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
> > > >>>> rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
> > > >>>>  <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
> > > >>>> rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
> > > >>>>  <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
> > > >>>> rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
> > > >>>>  <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
> > > >>>> invoked=3612 idle=....
> > > >>>> <snip>
> > > >>>> 
> > > >>> 
> > > >>> Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?
> > > >>> 
> > > >>> Yes.
> > > >>> 
> > > >>> 
> > > >>> If possible, may I know the steps, commands, and related parameters to produce the results above?
> > > >>> Thank you!
> > > >>> 
> > > >>> Build the kernel with CONFIG_RCU_TRACE configuration. Update your "set_event"
> > > >>> file with appropriate traces:
> > > >>> 
> > > >>> <snip>
> > > >>> XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event
> > > >>> 
> > > >>> XQ-DQ54:/sys/kernel/tracing # cat set_event
> > > >>> rcu:rcu_batch_start
> > > >>> rcu:rcu_invoke_callback
> > > >>> rcu:rcu_batch_end
> > > >>> XQ-DQ54:/sys/kernel/tracing #
> > > >>> <snip>
> > > >>> 
> > > >>> Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
> > > >>> Next problem is how to parse it. Of course you will not be able to parse
> > > >>> megabytes of traces. For that purpose i use a special C trace parser.
> > > >>> If you need an example please let me know i can show here.
> > > >>> 
> > > >>> --
> > > >>> Uladzislau Rezki
  
Uladzislau Rezki March 30, 2023, 3:43 p.m. UTC | #22
On Thu, Mar 30, 2023 at 03:09:33PM +0000, Joel Fernandes wrote:
> On Tue, Mar 28, 2023 at 08:26:13AM -0700, Paul E. McKenney wrote:
> > On Mon, Mar 27, 2023 at 10:29:31PM -0400, Joel Fernandes wrote:
> > > Hello,
> > > 
> > > > On Mar 27, 2023, at 9:06 PM, Paul E. McKenney <paulmck@kernel.org> wrote:
> > > > 
> > > > On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
> > > >>>> From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > >>>> Sent: Tuesday, March 21, 2023 6:28 PM
> > > >>>> [...]
> > > >>>> Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > > >>>> 
> > > >>>> A call to a synchronize_rcu() can be expensive from time point of view.
> > > >>>> Different workloads can be affected by this especially the ones which use this
> > > >>>> API in its time critical sections.
> > > >>>> 
> > > >>> 
> > > >>> This is interesting and meaningful research. ;-)
> > > >>> 
> > > >>>> For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > >>>> invocation depends on where in a nocb-list it is located. Below is an example
> > > >>>> when it was the last out of ~3600 callbacks:
> > > >>> 
> > > >> 
> > > >> 
> > > >> 
> > > >> Can it be implemented separately as follows?  it seems that the code is simpler
> > > >> (only personal opinion)  😊.
> > > >> 
> > > >> But I didn't test whether this reduce synchronize_rcu() waiting time
> > > >> 
> > > >> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> > > >> +{
> > > >> +       unsigned long gp_snap;
> > > >> +
> > > >> +       gp_snap = start_poll_synchronize_rcu();
> > > >> +       while (!poll_state_synchronize_rcu(gp_snap))
> > > >> +               schedule_timeout_idle(1);
> > > > 
> > > > I could be wrong, but my guess is that the guys working with
> > > > battery-powered devices are not going to be very happy with this loop.
> > > > 
> > > > All those wakeups by all tasks waiting for a grace period end up
> > > > consuming a surprisingly large amount of energy.
> > > 
> > > Is that really the common case? On the general topic of wake-ups:
> > > Most of the time there should be only one
> > > task waiting synchronously on a GP to end. If that is
> > > true, then it feels like waking
> > > up nocb Kthreads which indirectly wake other threads is doing more work than usual?
> > 
> > A good question, and the number of outstanding synchronize_rcu()
> > calls will of course be limited by the number of tasks in the system.
> > But I myself have raised the ire of battery-powered embedded folks with
> > a rather small number of wakeups, so...
> 
> But unless I am missing something, even if there is single synchronize_rcu(),
> you have a flurry of potential wakeups right now, instead of the bare minimum
> I think. I have not measured how many wake ups, but I'd love to when I get
> time. Maybe Vlad has some numbers.
> 
I will measure and have a look at wake-ups. But what we have for now is:
if there are two callers of synchronize_rcu() on different CPUs, I guess
two nocb-kthreads have to handle it, thus two nocb-kthreads have to be
awakened to do the work. This patch needs only one wake-up to serve all
users.
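
For reference, the path being contrasted here is roughly the following
simplified model of today's wakeme_after_rcu() scheme (the sr_* names are
made up): each caller posts its own callback, so each caller's wakeup rides
on whichever rcuoc kthread serves that CPU's nocb list.

<snip>
#include <linux/kernel.h>
#include <linux/completion.h>
#include <linux/rcupdate.h>

struct sr_waiter {
        struct rcu_head head;
        struct completion done;
};

static void sr_wakeme_after_rcu(struct rcu_head *head)
{
        struct sr_waiter *w = container_of(head, struct sr_waiter, head);

        complete(&w->done);
}

/* Roughly what a synchronize_rcu() caller does today. */
static void sr_synchronize_today(void)
{
        struct sr_waiter w;

        init_completion(&w.done);
        call_rcu(&w.head, sr_wakeme_after_rcu); /* queued on this CPU's cblist */
        wait_for_completion(&w.done);
}
<snip>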

Anyway, I will provide some data and analysis of it.

--
Uladzislau Rezki
  
Paul E. McKenney March 30, 2023, 6:57 p.m. UTC | #23
On Thu, Mar 30, 2023 at 03:09:33PM +0000, Joel Fernandes wrote:
> On Tue, Mar 28, 2023 at 08:26:13AM -0700, Paul E. McKenney wrote:
> > On Mon, Mar 27, 2023 at 10:29:31PM -0400, Joel Fernandes wrote:
> > > Hello,
> > > 
> > > > On Mar 27, 2023, at 9:06 PM, Paul E. McKenney <paulmck@kernel.org> wrote:
> > > > 
> > > > On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
> > > >>>> From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > >>>> Sent: Tuesday, March 21, 2023 6:28 PM
> > > >>>> [...]
> > > >>>> Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > > >>>> 
> > > >>>> A call to a synchronize_rcu() can be expensive from time point of view.
> > > >>>> Different workloads can be affected by this especially the ones which use this
> > > >>>> API in its time critical sections.
> > > >>>> 
> > > >>> 
> > > >>> This is interesting and meaningful research. ;-)
> > > >>> 
> > > >>>> For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > >>>> invocation depends on where in a nocb-list it is located. Below is an example
> > > >>>> when it was the last out of ~3600 callbacks:
> > > >>> 
> > > >> 
> > > >> 
> > > >> 
> > > >> Can it be implemented separately as follows?  it seems that the code is simpler
> > > >> (only personal opinion)  😊.
> > > >> 
> > > >> But I didn't test whether this reduce synchronize_rcu() waiting time
> > > >> 
> > > >> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> > > >> +{
> > > >> +       unsigned long gp_snap;
> > > >> +
> > > >> +       gp_snap = start_poll_synchronize_rcu();
> > > >> +       while (!poll_state_synchronize_rcu(gp_snap))
> > > >> +               schedule_timeout_idle(1);
> > > > 
> > > > I could be wrong, but my guess is that the guys working with
> > > > battery-powered devices are not going to be very happy with this loop.
> > > > 
> > > > All those wakeups by all tasks waiting for a grace period end up
> > > > consuming a surprisingly large amount of energy.
> > > 
> > > Is that really the common case? On the general topic of wake-ups:
> > > Most of the time there should be only one
> > > task waiting synchronously on a GP to end. If that is
> > > true, then it feels like waking
> > > up nocb Kthreads which indirectly wake other threads is doing more work than usual?
> > 
> > A good question, and the number of outstanding synchronize_rcu()
> > calls will of course be limited by the number of tasks in the system.
> > But I myself have raised the ire of battery-powered embedded folks with
> > a rather small number of wakeups, so...
> 
> But unless I am missing something, even if there is a single synchronize_rcu(),
> you have a flurry of potential wakeups right now, instead of the bare minimum
> I think. I have not measured how many wake ups, but I'd love to when I get
> time. Maybe Vlad has some numbers.

It is complicated.  ;-)

Perhaps you are thinking in terms of an isolated synchronize_rcu()
call in an otherwise idle system with all RCU callbacks offloaded,
but booted with rcutree.use_softirq=1.

In that case, there would be three wakeups, one for the rcuog
kthread, one for the rcuoc kthread, and one for the task that invoked
synchronize_rcu().

As above, but with rcutree.use_softirq=0, there would be some additional
wakeups of the rcuc kthreads.  For rcutree.use_softirq=1, there might
also be ksoftirqd wakeups, but probably not on almost-idle systems.

As the system gets busier, the rcuog (and rcuc, if any) kthread wakeups
get amortized over increasing numbers of synchronize_rcu() calls, as to
a lesser extent do the rcuoc kthread wakeups.

On the other hand, a nearly idle system without RCU callback offloading
that is booted with rcutree.use_softirq=1 will just have the one wakeup
in the common case.  Everything else will happen in softirq context,
and ksoftirqd won't get involved in systems that are nearly idle.
Which is yet another reason softirq is so hard to kill.  ;-)

Going back to the offloaded-callbacks case, as I understand it, wakeups
that cause IPIs to be sent to non-idle CPUs are not all that bad for
battery lifetime, so that would figure in as well.  Of course, in the
worst case, the rcuog, rcuoc, and synchronize_rcu() tasks might all be
on different CPUs.  Though perhaps an energy-aware scheduler might have
something to say about that situation, at least in the absence of manual
pinning of tasks to CPUs.

So this will depend on many things, and actual measurements will be
needed.  Also, the default needs to be to opt out of this, at least
initially.  Focusing the debugging risk on the people who actively want
this feature helps keep the larger user community more calm.  ;-)

> > And on larger systems there can be a tradeoff between contention on
> > the one hand and number of wakeups on the other.
> > 
> > The original nocb implementation in fact had the grace-period kthread
> > waking up all of what are now called rcuoc kthreads.  The indirect scheme
> > reduced the total number of wakeups by up to 50% and also reduced the
> > CPU consumption of the grace-period kthread, which otherwise would have
> > become a bottleneck on large systems.
> 
> Thanks for the background.
> 
> > And also, a scheme that directly wakes tasks waiting in synchronize_rcu()
> > might well use the same ->nocb_gp_wq[] waitqueues that are used by the
> > rcuog kthreads, if that is what you were getting at.
> 
> Yes that's what I was getting at. I thought Vlad was going for doing direct
> wake-ups from the main RCU GP thread that orchestrates RCU grace period
> cycles.

Maybe he was wanting to have something that worked independently of
rcu_nocbs CPUs?  Though if so, one way to make that work would be
to make those waitqueues available on all CPUs.

> > > I am curious to measure how much does Vlad patch reduce wakeups in the common case.
> > 
> > Sounds like a good thing to measure!
> 
> Ok. At the moment I am preparing 2 talks I am giving at OSPM for real-time and
> timers. Plus preparing the PR, so I'm fully booked. :(  [and the LWN article..].

Maybe someone like Vlad will do the measuring, and if not, it will still
be around after OSPM and the upcoming merge window.  Your guys' schedule
is my schedule on this one.

> > > I was also wondering how Vlad patch effects RCU-barrier ordering. I guess
> > > we want the wake up to happen in the order of
> > > other callbacks also waiting.
> > 
> > OK, I will bite.  Why would rcu_barrier() need to care about the
> > synchronize_rcu() invocations if they no longer used call_rcu()?
> 
> Hm, I was just going for the fact that it is a behavioral change. Not
> implying that it would certainly cause an issue. As we know, Linux kernel
> developers have interesting ways of using RCU APIs. :-)
> 
> But yes, it may not be an issue considering expedited synchronize_rcu() also
> has such behavior anyway, if I'm not mistaken.

All fair points, and this is part of why this needs to be opt-in.

But my rationale is that wakeup delays mean that there is no way for
the caller to determine whether or not rcu_barrier() did or did not wait
for in-flight synchronize_rcu() calls.

Which might well be a failure of imagination on my part.

> > > One last note, most battery powered systems are perhaps already using expedited RCU ;-)
> > 
> > Good point.  And that does raise the question of exactly what workloads
> > and systems want faster wakeups from synchronize_rcu() and cannot get
> > this effect from expedited grace periods.
> 
> Maybe the kind of workloads that don't need GP completion very quickly, but
> just want to reduce wakeups. The wakeups do have a cost: the scheduler can
> wake up several idle CPUs to "spread the awakened load" and waste power.
> They can also contend on locks during the wake-up.

In that case, the commit log needs to clearly call this out, and at
least some of the measurements need to focus on this case.

I might have misread it, but v1 of Uladzislau's patch seemed to focus
on latency rather than on wakeup minimization.

							Thanx, Paul

> thanks,
> 
>  - Joel
> 
> 
> > >  - Joel 
> > > 
> > > > 
> > > >                            Thanx, Paul
> > > > 
> > > >> +}
> > > >> +
> > > >> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func);
> > > >> +DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
> > > >> +                 "RCU Poll");
> > > >> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func)
> > > >> +{
> > > >> +       call_rcu_tasks_generic(rhp, func, &rcu_poll);
> > > >> +}
> > > >> +EXPORT_SYMBOL_GPL(call_rcu_poll);
> > > >> +
> > > >> +void synchronize_rcu_poll(void)
> > > >> +{
> > > >> +       synchronize_rcu_tasks_generic(&rcu_poll);
> > > >> +}
> > > >> +EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
> > > >> +
> > > >> +static int __init rcu_spawn_poll_kthread(void)
> > > >> +{
> > > >> +       cblist_init_generic(&rcu_poll);
> > > >> +       rcu_poll.gp_sleep = HZ / 10;
> > > >> +       rcu_spawn_tasks_kthread_generic(&rcu_poll);
> > > >> +       return 0;
> > > >> +}
> > > >> 
> > > >> Thanks
> > > >> Zqiang
> > > >> 
> > > >> 
> > > >>>> 
> > > >>>> <snip>
> > > >>>>  <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
> > > >>>> CBs=3613 bl=28
> > > >>>> ...
> > > >>>>  <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
> > > >>>> rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
> > > >>>>  <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
> > > >>>> rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
> > > >>>>  <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
> > > >>>> rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
> > > >>>>  <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
> > > >>>> rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
> > > >>>>  <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
> > > >>>> rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
> > > >>>>  <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
> > > >>>> rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
> > > >>>>  <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
> > > >>>> invoked=3612 idle=....
> > > >>>> <snip>
> > > >>>> 
> > > >>> 
> > > >>> Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?
> > > >>> 
> > > >>> Yes.
> > > >>> 
> > > >>> 
> > > >>> If possible, may I know the steps, commands, and related parameters to produce the results above?
> > > >>> Thank you!
> > > >>> 
> > > >>> Build the kernel with CONFIG_RCU_TRACE configuration. Update your "set_event"
> > > >>> file with appropriate traces:
> > > >>> 
> > > >>> <snip>
> > > >>> XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event
> > > >>> 
> > > >>> XQ-DQ54:/sys/kernel/tracing # cat set_event
> > > >>> rcu:rcu_batch_start
> > > >>> rcu:rcu_invoke_callback
> > > >>> rcu:rcu_batch_end
> > > >>> XQ-DQ54:/sys/kernel/tracing #
> > > >>> <snip>
> > > >>> 
> > > >>> Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
> > > >>> Next problem is how to parse it. Of course you will not be able to parse
> > > >>> megabytes of traces. For that purpose i use a special C trace parser.
> > > >>> If you need an example please let me know i can show here.
> > > >>> 
> > > >>> --
> > > >>> Uladzislau Rezki
  
Paul E. McKenney March 30, 2023, 6:58 p.m. UTC | #24
On Thu, Mar 30, 2023 at 05:43:15PM +0200, Uladzislau Rezki wrote:
> On Thu, Mar 30, 2023 at 03:09:33PM +0000, Joel Fernandes wrote:
> > On Tue, Mar 28, 2023 at 08:26:13AM -0700, Paul E. McKenney wrote:
> > > On Mon, Mar 27, 2023 at 10:29:31PM -0400, Joel Fernandes wrote:
> > > > Hello,
> > > > 
> > > > > On Mar 27, 2023, at 9:06 PM, Paul E. McKenney <paulmck@kernel.org> wrote:
> > > > > 
> > > > > On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
> > > > >>>> From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > > >>>> Sent: Tuesday, March 21, 2023 6:28 PM
> > > > >>>> [...]
> > > > >>>> Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > > > >>>> 
> > > > >>>> A call to a synchronize_rcu() can be expensive from time point of view.
> > > > >>>> Different workloads can be affected by this especially the ones which use this
> > > > >>>> API in its time critical sections.
> > > > >>>> 
> > > > >>> 
> > > > >>> This is interesting and meaningful research. ;-)
> > > > >>> 
> > > > >>>> For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > > >>>> invocation depends on where in a nocb-list it is located. Below is an example
> > > > >>>> when it was the last out of ~3600 callbacks:
> > > > >>> 
> > > > >> 
> > > > >> 
> > > > >> 
> > > > >> Can it be implemented separately as follows?  it seems that the code is simpler
> > > > >> (only personal opinion)  😊.
> > > > >> 
> > > > >> But I didn't test whether this reduce synchronize_rcu() waiting time
> > > > >> 
> > > > >> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> > > > >> +{
> > > > >> +       unsigned long gp_snap;
> > > > >> +
> > > > >> +       gp_snap = start_poll_synchronize_rcu();
> > > > >> +       while (!poll_state_synchronize_rcu(gp_snap))
> > > > >> +               schedule_timeout_idle(1);
> > > > > 
> > > > > I could be wrong, but my guess is that the guys working with
> > > > > battery-powered devices are not going to be very happy with this loop.
> > > > > 
> > > > > All those wakeups by all tasks waiting for a grace period end up
> > > > > consuming a surprisingly large amount of energy.
> > > > 
> > > > Is that really the common case? On the general topic of wake-ups:
> > > > Most of the time there should be only one
> > > > task waiting synchronously on a GP to end. If that is
> > > > true, then it feels like waking
> > > > up nocb Kthreads which indirectly wake other threads is doing more work than usual?
> > > 
> > > A good question, and the number of outstanding synchronize_rcu()
> > > calls will of course be limited by the number of tasks in the system.
> > > But I myself have raised the ire of battery-powered embedded folks with
> > > a rather small number of wakeups, so...
> > 
> > But unless I am missing something, even if there is a single synchronize_rcu(),
> > you have a flurry of potential wakeups right now, instead of the bare minimum
> > I think. I have not measured how many wake ups, but I'd love to when I get
> > time. Maybe Vlad has some numbers.
> > 
> I will measure and have a look at wake-ups. But what we have for now is:
> if there are two callers of synchronize_rcu() on different CPUs, I guess
> two nocb-kthreads have to handle it, thus two nocb-kthreads have to be
> awakened to do the work. This patch needs only one wake-up to serve all
> users.

One wakeup per synchronize_rcu(), right?

> Anyway, I will provide some data and analysis of it.

Looking forward to seeing it!

							Thanx, Paul
  
Paul E. McKenney March 30, 2023, 7:01 p.m. UTC | #25
On Thu, Mar 30, 2023 at 03:11:15PM +0000, Joel Fernandes wrote:
> On Tue, Mar 28, 2023 at 03:14:32PM -0700, Paul E. McKenney wrote:
> > On Tue, Mar 28, 2023 at 08:26:13AM -0700, Paul E. McKenney wrote:
> > > On Mon, Mar 27, 2023 at 10:29:31PM -0400, Joel Fernandes wrote:
> > > > Hello,
> > > > 
> > > > > On Mar 27, 2023, at 9:06 PM, Paul E. McKenney <paulmck@kernel.org> wrote:
> > > > > 
> > > > > On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
> > > > >>>> From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > > >>>> Sent: Tuesday, March 21, 2023 6:28 PM
> > > > >>>> [...]
> > > > >>>> Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > > > >>>> 
> > > > >>>> A call to a synchronize_rcu() can be expensive from time point of view.
> > > > >>>> Different workloads can be affected by this especially the ones which use this
> > > > >>>> API in its time critical sections.
> > > > >>>> 
> > > > >>> 
> > > > >>> This is interesting and meaningful research. ;-)
> > > > >>> 
> > > > >>>> For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > > >>>> invocation depends on where in a nocb-list it is located. Below is an example
> > > > >>>> when it was the last out of ~3600 callbacks:
> > > > >>> 
> > > > >> 
> > > > >> 
> > > > >> 
> > > > >> Can it be implemented separately as follows?  it seems that the code is simpler
> > > > >> (only personal opinion)  😊.
> > > > >> 
> > > > >> But I didn't test whether this reduce synchronize_rcu() waiting time
> > > > >> 
> > > > >> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> > > > >> +{
> > > > >> +       unsigned long gp_snap;
> > > > >> +
> > > > >> +       gp_snap = start_poll_synchronize_rcu();
> > > > >> +       while (!poll_state_synchronize_rcu(gp_snap))
> > > > >> +               schedule_timeout_idle(1);
> > > > > 
> > > > > I could be wrong, but my guess is that the guys working with
> > > > > battery-powered devices are not going to be very happy with this loop.
> > > > > 
> > > > > All those wakeups by all tasks waiting for a grace period end up
> > > > > consuming a surprisingly large amount of energy.
> > > > 
> > > > Is that really the common case? On the general topic of wake-ups:
> > > > Most of the time there should be only one
> > > > task waiting synchronously on a GP to end. If that is
> > > > true, then it feels like waking
> > > > up nocb Kthreads which indirectly wake other threads is doing more work than usual?
> > > 
> > > A good question, and the number of outstanding synchronize_rcu()
> > > calls will of course be limited by the number of tasks in the system.
> > > But I myself have raised the ire of battery-powered embedded folks with
> > > a rather small number of wakeups, so...
> > > 
> > > And on larger systems there can be a tradeoff between contention on
> > > the one hand and number of wakeups on the other.
> > > 
> > > The original nocb implementation in fact had the grace-period kthread
> > > waking up all of what are now called rcuoc kthreads.  The indirect scheme
> > > reduced the total number of wakeups by up to 50% and also reduced the
> > > CPU consumption of the grace-period kthread, which otherwise would have
> > > become a bottleneck on large systems.
> > > 
> > > And also, a scheme that directly wakes tasks waiting in synchronize_rcu()
> > > might well use the same ->nocb_gp_wq[] waitqueues that are used by the
> > > rcuog kthreads, if that is what you were getting at.
> > 
> > And on small systems, you might of course have the rcuog kthread directly
> > invoke callbacks if there are not very many of them.  This would of
> > course need to be done quite carefully to avoid any number of races
> > with the rcuoc kthreads.  You could do the same thing on a large system,
> > but on a per-rcuog basis.
> > 
> > I vaguely recall discussing this in one of our sessions, but who knows?
> > 
> > Would this really be of benefit?  Or did you have something else in mind?
> 
> Yes, this is what I was also referring to.
> 
> Not sure about benefit, depends on workloads and measurement.

There are of course potential downsides, including slower handling of
callback floods and tuning difficulties due to there being no good way
thus far to estimate how much time a given RCU callback will consume.

But if there are significant benefits to battery-powered systems, then
enabling this sort of thing only on those systems could make a lot
of sense.

							Thanx, Paul

> thanks,
> 
>  - Joel
> 
> 
> > 
> > 							Thanx, Paul
> > 
> > > > I am curious to measure how much does Vlad patch reduce wakeups in the common case.
> > > 
> > > Sounds like a good thing to measure!
> > > 
> > > > I was also wondering how Vlad patch effects RCU-barrier ordering. I guess
> > > > we want the wake up to happen in the order of
> > > > other callbacks also waiting.
> > > 
> > > OK, I will bite.  Why would rcu_barrier() need to care about the
> > > synchronize_rcu() invocations if they no longer used call_rcu()?
> > > 
> > > > One last note, most battery powered systems are perhaps already using expedited RCU ;-)
> > > 
> > > Good point.  And that does raise the question of exactly what workloads
> > > and systems want faster wakeups from synchronize_rcu() and cannot get
> > > this effect from expedited grace periods.
> > > 
> > > 							Thanx, Paul
> > > 
> > > > Thoughts?
> > > > 
> > > >  - Joel 
> > > > 
> > > > > 
> > > > >                            Thanx, Paul
> > > > > 
> > > > >> +}
> > > > >> +
> > > > >> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func);
> > > > >> +DEFINE_RCU_TASKS(rcu_poll, rcu_poll_wait_gp, call_rcu_poll,
> > > > >> +                 "RCU Poll");
> > > > >> +void call_rcu_poll(struct rcu_head *rhp, rcu_callback_t func)
> > > > >> +{
> > > > >> +       call_rcu_tasks_generic(rhp, func, &rcu_poll);
> > > > >> +}
> > > > >> +EXPORT_SYMBOL_GPL(call_rcu_poll);
> > > > >> +
> > > > >> +void synchronize_rcu_poll(void)
> > > > >> +{
> > > > >> +       synchronize_rcu_tasks_generic(&rcu_poll);
> > > > >> +}
> > > > >> +EXPORT_SYMBOL_GPL(synchronize_rcu_poll);
> > > > >> +
> > > > >> +static int __init rcu_spawn_poll_kthread(void)
> > > > >> +{
> > > > >> +       cblist_init_generic(&rcu_poll);
> > > > >> +       rcu_poll.gp_sleep = HZ / 10;
> > > > >> +       rcu_spawn_tasks_kthread_generic(&rcu_poll);
> > > > >> +       return 0;
> > > > >> +}
> > > > >> 
> > > > >> Thanks
> > > > >> Zqiang
> > > > >> 
> > > > >> 
> > > > >>>> 
> > > > >>>> <snip>
> > > > >>>>  <...>-29      [001] d..1. 21950.145313: rcu_batch_start: rcu_preempt
> > > > >>>> CBs=3613 bl=28
> > > > >>>> ...
> > > > >>>>  <...>-29      [001] ..... 21950.152578: rcu_invoke_callback: rcu_preempt
> > > > >>>> rhp=00000000b2d6dee8 func=__free_vm_area_struct.cfi_jt
> > > > >>>>  <...>-29      [001] ..... 21950.152579: rcu_invoke_callback: rcu_preempt
> > > > >>>> rhp=00000000a446f607 func=__free_vm_area_struct.cfi_jt
> > > > >>>>  <...>-29      [001] ..... 21950.152580: rcu_invoke_callback: rcu_preempt
> > > > >>>> rhp=00000000a5cab03b func=__free_vm_area_struct.cfi_jt
> > > > >>>>  <...>-29      [001] ..... 21950.152581: rcu_invoke_callback: rcu_preempt
> > > > >>>> rhp=0000000013b7e5ee func=__free_vm_area_struct.cfi_jt
> > > > >>>>  <...>-29      [001] ..... 21950.152582: rcu_invoke_callback: rcu_preempt
> > > > >>>> rhp=000000000a8ca6f9 func=__free_vm_area_struct.cfi_jt
> > > > >>>>  <...>-29      [001] ..... 21950.152583: rcu_invoke_callback: rcu_preempt
> > > > >>>> rhp=000000008f162ca8 func=wakeme_after_rcu.cfi_jt
> > > > >>>>  <...>-29      [001] d..1. 21950.152625: rcu_batch_end: rcu_preempt CBs-
> > > > >>>> invoked=3612 idle=....
> > > > >>>> <snip>
> > > > >>>> 
> > > > >>> 
> > > > >>> Did the results above tell us that CBs-invoked=3612 during the time 21950.145313 ~ 21950.152625?
> > > > >>> 
> > > > >>> Yes.
> > > > >>> 
> > > > >>> 
> > > > >>> If possible, may I know the steps, commands, and related parameters to produce the results above?
> > > > >>> Thank you!
> > > > >>> 
> > > > >>> Build the kernel with CONFIG_RCU_TRACE configuration. Update your "set_event"
> > > > >>> file with appropriate traces:
> > > > >>> 
> > > > >>> <snip>
> > > > >>> XQ-DQ54:/sys/kernel/tracing # echo rcu:rcu_batch_start rcu:rcu_batch_end rcu:rcu_invoke_callback > set_event
> > > > >>> 
> > > > >>> XQ-DQ54:/sys/kernel/tracing # cat set_event
> > > > >>> rcu:rcu_batch_start
> > > > >>> rcu:rcu_invoke_callback
> > > > >>> rcu:rcu_batch_end
> > > > >>> XQ-DQ54:/sys/kernel/tracing #
> > > > >>> <snip>
> > > > >>> 
> > > > >>> Collect traces as much as you want: XQ-DQ54:/sys/kernel/tracing # echo 1 > tracing_on; sleep 10; echo 0 > tracing_on
> > > > >>> Next problem is how to parse it. Of course you will not be able to parse
> > > > >>> megabytes of traces. For that purpose i use a special C trace parser.
> > > > >>> If you need an example please let me know i can show here.
> > > > >>> 
> > > > >>> --
> > > > >>> Uladzislau Rezki
  
Uladzislau Rezki March 30, 2023, 7:18 p.m. UTC | #26
On Thu, Mar 30, 2023 at 11:58:41AM -0700, Paul E. McKenney wrote:
> On Thu, Mar 30, 2023 at 05:43:15PM +0200, Uladzislau Rezki wrote:
> > On Thu, Mar 30, 2023 at 03:09:33PM +0000, Joel Fernandes wrote:
> > > On Tue, Mar 28, 2023 at 08:26:13AM -0700, Paul E. McKenney wrote:
> > > > On Mon, Mar 27, 2023 at 10:29:31PM -0400, Joel Fernandes wrote:
> > > > > Hello,
> > > > > 
> > > > > > On Mar 27, 2023, at 9:06 PM, Paul E. McKenney <paulmck@kernel.org> wrote:
> > > > > > 
> > > > > > On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
> > > > > >>>> From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > > > >>>> Sent: Tuesday, March 21, 2023 6:28 PM
> > > > > >>>> [...]
> > > > > >>>> Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > > > > >>>> 
> > > > > >>>> A call to a synchronize_rcu() can be expensive from time point of view.
> > > > > >>>> Different workloads can be affected by this especially the ones which use this
> > > > > >>>> API in its time critical sections.
> > > > > >>>> 
> > > > > >>> 
> > > > > >>> This is interesting and meaningful research. ;-)
> > > > > >>> 
> > > > > >>>> For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > > > >>>> invocation depends on where in a nocb-list it is located. Below is an example
> > > > > >>>> when it was the last out of ~3600 callbacks:
> > > > > >>> 
> > > > > >> 
> > > > > >> 
> > > > > >> 
> > > > > >> Can it be implemented separately as follows?  it seems that the code is simpler
> > > > > >> (only personal opinion)  😊.
> > > > > >> 
> > > > > >> But I didn't test whether this reduce synchronize_rcu() waiting time
> > > > > >> 
> > > > > >> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> > > > > >> +{
> > > > > >> +       unsigned long gp_snap;
> > > > > >> +
> > > > > >> +       gp_snap = start_poll_synchronize_rcu();
> > > > > >> +       while (!poll_state_synchronize_rcu(gp_snap))
> > > > > >> +               schedule_timeout_idle(1);
> > > > > > 
> > > > > > I could be wrong, but my guess is that the guys working with
> > > > > > battery-powered devices are not going to be very happy with this loop.
> > > > > > 
> > > > > > All those wakeups by all tasks waiting for a grace period end up
> > > > > > consuming a surprisingly large amount of energy.
> > > > > 
> > > > > Is that really the common case? On the general topic of wake-ups:
> > > > > Most of the time there should be only one
> > > > > task waiting synchronously on a GP to end. If that is
> > > > > true, then it feels like waking
> > > > > up nocb Kthreads which indirectly wake other threads is doing more work than usual?
> > > > 
> > > > A good question, and the number of outstanding synchronize_rcu()
> > > > calls will of course be limited by the number of tasks in the system.
> > > > But I myself have raised the ire of battery-powered embedded folks with
> > > > a rather small number of wakeups, so...
> > > 
> > > But unless I am missing something, even if there is a single synchronize_rcu(),
> > > you have a flurry of potential wakeups right now, instead of the bare minimum
> > > I think. I have not measured how many wake ups, but I'd love to when I get
> > > time. Maybe Vlad has some numbers.
> > > 
> > I will measure and have a look at wake-ups. But what we have for now is:
> > if there are two callers of synchronize_rcu() on different CPUs, I guess
> > two nocb-kthreads have to handle it, thus two nocb-kthreads have to be
> > awakened to do the work. This patch needs only one wake-up to serve all
> > users.
> 
> One wakeup per synchronize_rcu(), right?
> 
The gp-kthread wakes up only one worker; in its turn, that worker wakes up
all registered users of synchronize_rcu() for which a GP has passed. How many
users of synchronize_rcu() are awakened by one worker depends on how many were
registered before the gp-kthread initiated a new GP.
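
A rough sketch of that flow (not the code from the patch; the sr_* names
are made up and the handshake that snapshots waiters at grace-period start
is only hinted at in a comment):

<snip>
#include <linux/llist.h>
#include <linux/completion.h>
#include <linux/workqueue.h>

struct sr_wait_node {
        struct llist_node node;
        struct completion done;
};

static LLIST_HEAD(sr_waiters);

/* One worker empties the list and completes every registered waiter. */
static void sr_wake_all_fn(struct work_struct *unused)
{
        struct llist_node *list = llist_del_all(&sr_waiters);
        struct sr_wait_node *w, *tmp;

        llist_for_each_entry_safe(w, tmp, list, node)
                complete(&w->done);
}
static DECLARE_WORK(sr_wake_all_work, sr_wake_all_fn);

/* Called from the gp-kthread once a grace period has ended. */
static void sr_gp_ended(void)
{
        if (!llist_empty(&sr_waiters))
                queue_work(system_highpri_wq, &sr_wake_all_work);
}

/* Caller side: register and sleep; no call_rcu(), no rcuoc involvement. */
static void sr_synchronize_new(void)
{
        struct sr_wait_node w;

        init_completion(&w.done);
        llist_add(&w.node, &sr_waiters);
        /* The gp-kthread is assumed to pick up new waiters before each GP. */
        wait_for_completion(&w.done);
}
<snip>

With something along these lines, the gp-kthread does a single queue_work()
and the worker then completes each waiter, regardless of which CPUs the
waiters came from.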

> > Anyway, I will provide some data and analysis of it.
> 
> Looking forward to seeing it!
> 
Good. I will switch fully to it soon. I need to sort out some performance
issues at work.

--
Uladzislau Rezki
  
Paul E. McKenney March 30, 2023, 9:16 p.m. UTC | #27
On Thu, Mar 30, 2023 at 09:18:44PM +0200, Uladzislau Rezki wrote:
> On Thu, Mar 30, 2023 at 11:58:41AM -0700, Paul E. McKenney wrote:
> > On Thu, Mar 30, 2023 at 05:43:15PM +0200, Uladzislau Rezki wrote:
> > > On Thu, Mar 30, 2023 at 03:09:33PM +0000, Joel Fernandes wrote:
> > > > On Tue, Mar 28, 2023 at 08:26:13AM -0700, Paul E. McKenney wrote:
> > > > > On Mon, Mar 27, 2023 at 10:29:31PM -0400, Joel Fernandes wrote:
> > > > > > Hello,
> > > > > > 
> > > > > > > On Mar 27, 2023, at 9:06 PM, Paul E. McKenney <paulmck@kernel.org> wrote:
> > > > > > > 
> > > > > > > On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
> > > > > > >>>> From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > > > > >>>> Sent: Tuesday, March 21, 2023 6:28 PM
> > > > > > >>>> [...]
> > > > > > >>>> Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > > > > > >>>> 
> > > > > > >>>> A call to a synchronize_rcu() can be expensive from time point of view.
> > > > > > >>>> Different workloads can be affected by this especially the ones which use this
> > > > > > >>>> API in its time critical sections.
> > > > > > >>>> 
> > > > > > >>> 
> > > > > > >>> This is interesting and meaningful research. ;-)
> > > > > > >>> 
> > > > > > >>>> For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > > > > >>>> invocation depends on where in a nocb-list it is located. Below is an example
> > > > > > >>>> when it was the last out of ~3600 callbacks:
> > > > > > >>> 
> > > > > > >> 
> > > > > > >> 
> > > > > > >> 
> > > > > > >> Can it be implemented separately as follows?  it seems that the code is simpler
> > > > > > >> (only personal opinion)  😊.
> > > > > > >> 
> > > > > > >> But I didn't test whether this reduce synchronize_rcu() waiting time
> > > > > > >> 
> > > > > > >> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> > > > > > >> +{
> > > > > > >> +       unsigned long gp_snap;
> > > > > > >> +
> > > > > > >> +       gp_snap = start_poll_synchronize_rcu();
> > > > > > >> +       while (!poll_state_synchronize_rcu(gp_snap))
> > > > > > >> +               schedule_timeout_idle(1);
> > > > > > > 
> > > > > > > I could be wrong, but my guess is that the guys working with
> > > > > > > battery-powered devices are not going to be very happy with this loop.
> > > > > > > 
> > > > > > > All those wakeups by all tasks waiting for a grace period end up
> > > > > > > consuming a surprisingly large amount of energy.
> > > > > > 
> > > > > > Is that really the common case? On the general topic of wake-ups:
> > > > > > Most of the time there should be only one
> > > > > > task waiting synchronously on a GP to end. If that is
> > > > > > true, then it feels like waking
> > > > > > up nocb Kthreads which indirectly wake other threads is doing more work than usual?
> > > > > 
> > > > > A good question, and the number of outstanding synchronize_rcu()
> > > > > calls will of course be limited by the number of tasks in the system.
> > > > > But I myself have raised the ire of battery-powered embedded folks with
> > > > > a rather small number of wakeups, so...
> > > > 
> > > > But unless I am missing something, even if there is single synchronize_rcu(),
> > > > you have a flurry of potential wakeups right now, instead of the bare minimum
> > > > I think. I have not measured how many wake ups, but I'd love to when I get
> > > > time. Maybe Vlad has some numbers.
> > > > 
> > > I will measure and have a look at wake-ups. But, what we have for now is
> > > if there are two callers of synchronize_rcu() on different CPUs, i guess
> > > two nocb-kthreads have to handle it, thus two nocb-kthreads have to be
> > > awaken to do the work. This patch needs only one wake-up to serve all
> > > users.
> > 
> > One wakeup per synchronize_rcu(), right?
> > 
> The gp-kthread wake-ups only one work, in its turn a worker wake-ups all
> registered users of synchronize_rcu() for which a gp was passed. How many
> users of synchonize_rcu() awaken by one worker depends on how many were
> registered before initiating a new GP by the gp-kthread.
> 
> > > Anyway, i will provide some data and analysis of it.
> > 
> > Looking forward to seeing it!
> > 
> Good. I will switch fully on it soon. I need to sort out some perf.
> issues at work.

And if you are looking for reduced wakeups instead of lower latency for
synchronize_rcu(), I could see where the extra workqueue wakeup might
be a problem for you.

Assuming that this is all default-off, you could keep a count of the
number of required wakeups for each grace period (indexed as usual by
the bottom few bits of the grace-period counter without the low-order
state bits), and do the wakeups directly from the grace-period kthread
if there are not all that many of them.
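
As a rough, untested sketch of the sort of thing I have in mind (all of
the names, the number of buckets, and the threshold below are made up,
"struct sr_waiter" is a stand-in for however the waiters end up being
queued, the enqueue side and bucket initialization are omitted, and the
fallback simply reuses the sr_normal_gp_cleanup work item from your
patch):

<snip>
#define SR_NUM_GP_BUCKETS	4
#define SR_DIRECT_WAKE_MAX	4

struct sr_waiter {
	struct list_head entry;
	struct completion done;
};

struct sr_gp_bucket {
	struct list_head waiters;	/* synchronize_rcu() waiters. */
	int nwaiters;			/* Required wakeups for this GP. */
};

static struct sr_gp_bucket sr_gp_buckets[SR_NUM_GP_BUCKETS];

/* Index by the GP counter with the low-order state bits stripped off. */
static struct sr_gp_bucket *sr_gp_bucket(unsigned long gp_seq)
{
	return &sr_gp_buckets[(gp_seq >> RCU_SEQ_CTR_SHIFT) % SR_NUM_GP_BUCKETS];
}

/* Invoked from the grace-period kthread once the grace period has ended. */
static void sr_gp_cleanup(unsigned long gp_seq)
{
	struct sr_gp_bucket *b = sr_gp_bucket(gp_seq);
	struct sr_waiter *w, *tmp;

	if (b->nwaiters > SR_DIRECT_WAKE_MAX) {
		/* Lots of wakeups, so push them out to the workqueue. */
		queue_work(system_highpri_wq, &sr_normal_gp_cleanup);
		return;
	}

	/* Only a few waiters, so complete() them right here. */
	list_for_each_entry_safe(w, tmp, &b->waiters, entry) {
		list_del(&w->entry);
		complete(&w->done);
	}
	b->nwaiters = 0;
}
<snip>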

Except that, given that workqueues try hard to make the handler be on the
same CPU as the one that did the corresponding schedule_work() invocation,
it is not clear that this particular wakeup is really costing you enough
to notice.  (That CPU is not idle, after all.)  But there is nothing
quite like measuring the actual energy consumption on real hardware!

							Thanx, Paul
  
Uladzislau Rezki March 31, 2023, 10:55 a.m. UTC | #28
On Thu, Mar 30, 2023 at 02:16:36PM -0700, Paul E. McKenney wrote:
> On Thu, Mar 30, 2023 at 09:18:44PM +0200, Uladzislau Rezki wrote:
> > On Thu, Mar 30, 2023 at 11:58:41AM -0700, Paul E. McKenney wrote:
> > > On Thu, Mar 30, 2023 at 05:43:15PM +0200, Uladzislau Rezki wrote:
> > > > On Thu, Mar 30, 2023 at 03:09:33PM +0000, Joel Fernandes wrote:
> > > > > On Tue, Mar 28, 2023 at 08:26:13AM -0700, Paul E. McKenney wrote:
> > > > > > On Mon, Mar 27, 2023 at 10:29:31PM -0400, Joel Fernandes wrote:
> > > > > > > Hello,
> > > > > > > 
> > > > > > > > On Mar 27, 2023, at 9:06 PM, Paul E. McKenney <paulmck@kernel.org> wrote:
> > > > > > > > 
> > > > > > > > On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
> > > > > > > >>>> From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > > > > > >>>> Sent: Tuesday, March 21, 2023 6:28 PM
> > > > > > > >>>> [...]
> > > > > > > >>>> Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > > > > > > >>>> 
> > > > > > > >>>> A call to a synchronize_rcu() can be expensive from time point of view.
> > > > > > > >>>> Different workloads can be affected by this especially the ones which use this
> > > > > > > >>>> API in its time critical sections.
> > > > > > > >>>> 
> > > > > > > >>> 
> > > > > > > >>> This is interesting and meaningful research. ;-)
> > > > > > > >>> 
> > > > > > > >>>> For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > > > > > >>>> invocation depends on where in a nocb-list it is located. Below is an example
> > > > > > > >>>> when it was the last out of ~3600 callbacks:
> > > > > > > >>> 
> > > > > > > >> 
> > > > > > > >> 
> > > > > > > >> 
> > > > > > > >> Can it be implemented separately as follows?  it seems that the code is simpler
> > > > > > > >> (only personal opinion)  😊.
> > > > > > > >> 
> > > > > > > >> But I didn't test whether this reduce synchronize_rcu() waiting time
> > > > > > > >> 
> > > > > > > >> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> > > > > > > >> +{
> > > > > > > >> +       unsigned long gp_snap;
> > > > > > > >> +
> > > > > > > >> +       gp_snap = start_poll_synchronize_rcu();
> > > > > > > >> +       while (!poll_state_synchronize_rcu(gp_snap))
> > > > > > > >> +               schedule_timeout_idle(1);
> > > > > > > > 
> > > > > > > > I could be wrong, but my guess is that the guys working with
> > > > > > > > battery-powered devices are not going to be very happy with this loop.
> > > > > > > > 
> > > > > > > > All those wakeups by all tasks waiting for a grace period end up
> > > > > > > > consuming a surprisingly large amount of energy.
> > > > > > > 
> > > > > > > Is that really the common case? On the general topic of wake-ups:
> > > > > > > Most of the time there should be only one
> > > > > > > task waiting synchronously on a GP to end. If that is
> > > > > > > true, then it feels like waking
> > > > > > > up nocb Kthreads which indirectly wake other threads is doing more work than usual?
> > > > > > 
> > > > > > A good question, and the number of outstanding synchronize_rcu()
> > > > > > calls will of course be limited by the number of tasks in the system.
> > > > > > But I myself have raised the ire of battery-powered embedded folks with
> > > > > > a rather small number of wakeups, so...
> > > > > 
> > > > > But unless I am missing something, even if there is single synchronize_rcu(),
> > > > > you have a flurry of potential wakeups right now, instead of the bare minimum
> > > > > I think. I have not measured how many wake ups, but I'd love to when I get
> > > > > time. Maybe Vlad has some numbers.
> > > > > 
> > > > I will measure and have a look at wake-ups. But, what we have for now is
> > > > if there are two callers of synchronize_rcu() on different CPUs, i guess
> > > > two nocb-kthreads have to handle it, thus two nocb-kthreads have to be
> > > > awaken to do the work. This patch needs only one wake-up to serve all
> > > > users.
> > > 
> > > One wakeup per synchronize_rcu(), right?
> > > 
> > The gp-kthread wake-ups only one work, in its turn a worker wake-ups all
> > registered users of synchronize_rcu() for which a gp was passed. How many
> > users of synchonize_rcu() awaken by one worker depends on how many were
> > registered before initiating a new GP by the gp-kthread.
> > 
> > > > Anyway, i will provide some data and analysis of it.
> > > 
> > > Looking forward to seeing it!
> > > 
> > Good. I will switch fully on it soon. I need to sort out some perf.
> > issues at work.
> 
> And if you are looking for reduced wakeups instead of lower latency for
> synchronize_rcu(), I could see where the extra workqueue wakeup might
> be a problem for you.
> 
> Assuming that this is all default-off, you could keep a count of the
> number of required wakeups for each grace period (indexed as usual by
> the bottom few bits of the grace-period counter without the low-order
> state bits), and do the wakeups directly from the grace-period kthread
> if there are not all that many of them.
> 
At least if there is only one user of synchronize_rcu(), we can wake
it directly, i.e. invoke complete() from the gp-kthread (see the rough
sketch below). I think we should split such parts into different patches.
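
Something like below is what i mean. It is a rough, untested sketch that
only illustrates the "direct" path; it reuses the made-up
sr_waiter/sr_wait_list/sr_wait_lock names from my sketch earlier in the
thread, while sr_normal_gp_cleanup is the work item from the patch.

<snip>
static void rcu_sr_normal_gp_cleanup_direct(void)
{
	struct sr_waiter *w = NULL;
	bool defer = false;

	spin_lock(&sr_wait_lock);
	if (list_is_singular(&sr_wait_list)) {
		/* Exactly one waiter, dequeue it for a direct wake-up. */
		w = list_first_entry(&sr_wait_list, struct sr_waiter, entry);
		list_del(&w->entry);
	} else if (!list_empty(&sr_wait_list)) {
		defer = true;
	}
	spin_unlock(&sr_wait_lock);

	if (w)
		/* Single user: complete() it right from the gp-kthread. */
		complete(&w->done);
	else if (defer)
		/* More than one user: keep the workqueue path. */
		queue_work(system_highpri_wq, &sr_normal_gp_cleanup);
}
<snip>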

>
> Except that, given that workqueues try hard to make the handler be on the
> same CPU as the one that did the corresponding schedule_work() invocation,
> it is not clear that this particular wakeup is really costing you enough
> to notice.  (That CPU is not idle, after all.)  But there is nothing
> quite like measuring the actual energy consumption on real hardware!
> 
AFAICR, schedule_work() indeed prefers the current CPU. For a few users
it might be OK to complete() them directly. Energy-wise it is +1 wake-up
of our worker to kick all users, if the "direct" option is not taken. I
do not think energy is a problem here.

--
Uladzislau Rezki
  
Paul E. McKenney March 31, 2023, 6:39 p.m. UTC | #29
On Fri, Mar 31, 2023 at 12:55:36PM +0200, Uladzislau Rezki wrote:
> On Thu, Mar 30, 2023 at 02:16:36PM -0700, Paul E. McKenney wrote:
> > On Thu, Mar 30, 2023 at 09:18:44PM +0200, Uladzislau Rezki wrote:
> > > On Thu, Mar 30, 2023 at 11:58:41AM -0700, Paul E. McKenney wrote:
> > > > On Thu, Mar 30, 2023 at 05:43:15PM +0200, Uladzislau Rezki wrote:
> > > > > On Thu, Mar 30, 2023 at 03:09:33PM +0000, Joel Fernandes wrote:
> > > > > > On Tue, Mar 28, 2023 at 08:26:13AM -0700, Paul E. McKenney wrote:
> > > > > > > On Mon, Mar 27, 2023 at 10:29:31PM -0400, Joel Fernandes wrote:
> > > > > > > > Hello,
> > > > > > > > 
> > > > > > > > > On Mar 27, 2023, at 9:06 PM, Paul E. McKenney <paulmck@kernel.org> wrote:
> > > > > > > > > 
> > > > > > > > > On Mon, Mar 27, 2023 at 11:21:23AM +0000, Zhang, Qiang1 wrote:
> > > > > > > > >>>> From: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > > > > > > >>>> Sent: Tuesday, March 21, 2023 6:28 PM
> > > > > > > > >>>> [...]
> > > > > > > > >>>> Subject: [PATCH 1/1] Reduce synchronize_rcu() waiting time
> > > > > > > > >>>> 
> > > > > > > > >>>> A call to a synchronize_rcu() can be expensive from time point of view.
> > > > > > > > >>>> Different workloads can be affected by this especially the ones which use this
> > > > > > > > >>>> API in its time critical sections.
> > > > > > > > >>>> 
> > > > > > > > >>> 
> > > > > > > > >>> This is interesting and meaningful research. ;-)
> > > > > > > > >>> 
> > > > > > > > >>>> For example in case of NOCB scenario the wakeme_after_rcu() callback
> > > > > > > > >>>> invocation depends on where in a nocb-list it is located. Below is an example
> > > > > > > > >>>> when it was the last out of ~3600 callbacks:
> > > > > > > > >>> 
> > > > > > > > >> 
> > > > > > > > >> 
> > > > > > > > >> 
> > > > > > > > >> Can it be implemented separately as follows?  it seems that the code is simpler
> > > > > > > > >> (only personal opinion)  😊.
> > > > > > > > >> 
> > > > > > > > >> But I didn't test whether this reduce synchronize_rcu() waiting time
> > > > > > > > >> 
> > > > > > > > >> +static void rcu_poll_wait_gp(struct rcu_tasks *rtp)
> > > > > > > > >> +{
> > > > > > > > >> +       unsigned long gp_snap;
> > > > > > > > >> +
> > > > > > > > >> +       gp_snap = start_poll_synchronize_rcu();
> > > > > > > > >> +       while (!poll_state_synchronize_rcu(gp_snap))
> > > > > > > > >> +               schedule_timeout_idle(1);
> > > > > > > > > 
> > > > > > > > > I could be wrong, but my guess is that the guys working with
> > > > > > > > > battery-powered devices are not going to be very happy with this loop.
> > > > > > > > > 
> > > > > > > > > All those wakeups by all tasks waiting for a grace period end up
> > > > > > > > > consuming a surprisingly large amount of energy.
> > > > > > > > 
> > > > > > > > Is that really the common case? On the general topic of wake-ups:
> > > > > > > > Most of the time there should be only one
> > > > > > > > task waiting synchronously on a GP to end. If that is
> > > > > > > > true, then it feels like waking
> > > > > > > > up nocb Kthreads which indirectly wake other threads is doing more work than usual?
> > > > > > > 
> > > > > > > A good question, and the number of outstanding synchronize_rcu()
> > > > > > > calls will of course be limited by the number of tasks in the system.
> > > > > > > But I myself have raised the ire of battery-powered embedded folks with
> > > > > > > a rather small number of wakeups, so...
> > > > > > 
> > > > > > But unless I am missing something, even if there is single synchronize_rcu(),
> > > > > > you have a flurry of potential wakeups right now, instead of the bare minimum
> > > > > > I think. I have not measured how many wake ups, but I'd love to when I get
> > > > > > time. Maybe Vlad has some numbers.
> > > > > > 
> > > > > I will measure and have a look at wake-ups. But, what we have for now is
> > > > > if there are two callers of synchronize_rcu() on different CPUs, i guess
> > > > > two nocb-kthreads have to handle it, thus two nocb-kthreads have to be
> > > > > awaken to do the work. This patch needs only one wake-up to serve all
> > > > > users.
> > > > 
> > > > One wakeup per synchronize_rcu(), right?
> > > > 
> > > The gp-kthread wake-ups only one work, in its turn a worker wake-ups all
> > > registered users of synchronize_rcu() for which a gp was passed. How many
> > > users of synchonize_rcu() awaken by one worker depends on how many were
> > > registered before initiating a new GP by the gp-kthread.
> > > 
> > > > > Anyway, i will provide some data and analysis of it.
> > > > 
> > > > Looking forward to seeing it!
> > > > 
> > > Good. I will switch fully on it soon. I need to sort out some perf.
> > > issues at work.
> > 
> > And if you are looking for reduced wakeups instead of lower latency for
> > synchronize_rcu(), I could see where the extra workqueue wakeup might
> > be a problem for you.
> > 
> > Assuming that this is all default-off, you could keep a count of the
> > number of required wakeups for each grace period (indexed as usual by
> > the bottom few bits of the grace-period counter without the low-order
> > state bits), and do the wakeups directly from the grace-period kthread
> > if there are not all that many of them.
> > 
> At least if there is only one user of synchronize_rcu(), we can wake
> it directly, i mean to invoke comlete() from the gp-kthread. I think
> we should split such parts into different patches.

Agreed, especially given that some experimentation will be required
to work out which techniques actually help.

> > Except that, given that workqueues try hard to make the handler be on the
> > same CPU as the one that did the corresponding schedule_work() invocation,
> > it is not clear that this particular wakeup is really costing you enough
> > to notice.  (That CPU is not idle, after all.)  But there is nothing
> > quite like measuring the actual energy consumption on real hardware!
> > 
> AFAICR the schedule_work() wants to use current CPU, indeed. For few
> users it might be OK to comlete() them directly. Energy wise, +1 wake-up
> of our worker to kick all users, if not "direct" option. I do not think
> energy is a problem here.

Agreed, estimating/modeling energy consumption is, as far as I know, still
more of an art than a science.  I did something like eight or nine rewrites
of the old NO_HZ_FULL code learning that one.  ;-)

							Thanx, Paul
  

Patch

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index ee27a03d7576..349200a058da 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -1380,6 +1380,78 @@  static void rcu_poll_gp_seq_end_unlocked(unsigned long *snap)
 		raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 }
 
+static void rcu_sr_normal_add_req_this_cpu(struct list_head *list)
+{
+	struct rcu_data *rdp = raw_cpu_ptr(&rcu_data);
+
+	raw_spin_lock(&rdp->sr_array_lock);
+	list_add(list, &rdp->sr_array[0]);
+	raw_spin_unlock(&rdp->sr_array_lock);
+}
+
+static void rcu_sr_normal_gp_cleanup_work(struct work_struct *work)
+{
+	struct list_head gp_ready_list, *it, *next;
+	struct rcu_synchronize *rs;
+	struct rcu_data *rdp;
+	int i;
+
+	for_each_possible_cpu(i) {
+		rdp = per_cpu_ptr(&rcu_data, i);
+		if (list_empty(&rdp->sr_array[2]))
+			continue;
+
+		raw_spin_lock(&rdp->sr_array_lock);
+		list_replace_init(&rdp->sr_array[2], &gp_ready_list);
+		raw_spin_unlock(&rdp->sr_array_lock);
+
+		list_for_each_safe(it, next, &gp_ready_list) {
+			rs = container_of((struct rcu_head *) it, typeof(*rs), head);
+			complete(&rs->completion);
+		}
+	}
+}
+static DECLARE_WORK(sr_normal_gp_cleanup, rcu_sr_normal_gp_cleanup_work);
+
+static void rcu_sr_normal_gp_cleanup(void)
+{
+	bool need_sr_cleanup = false;
+	struct rcu_data *rdp;
+	int i;
+
+	for_each_possible_cpu(i) {
+		rdp = per_cpu_ptr(&rcu_data, i);
+		if (list_empty(&rdp->sr_array[1]))
+			continue;
+
+		raw_spin_lock(&rdp->sr_array_lock);
+		list_splice_init(&rdp->sr_array[1], &rdp->sr_array[2]);
+		raw_spin_unlock(&rdp->sr_array_lock);
+
+		/* OK, there are users that are ready to go. */
+		need_sr_cleanup = true;
+	}
+
+	if (need_sr_cleanup)
+		queue_work(system_highpri_wq, &sr_normal_gp_cleanup);
+}
+
+static void rcu_sr_normal_gp_init(void)
+{
+	struct rcu_data *rdp;
+	int i;
+
+	for_each_possible_cpu(i) {
+		rdp = per_cpu_ptr(&rcu_data, i);
+		if (list_empty(&rdp->sr_array[0]))
+			continue;
+
+		raw_spin_lock(&rdp->sr_array_lock);
+		list_splice_init(&rdp->sr_array[0], &rdp->sr_array[1]);
+		raw_spin_unlock(&rdp->sr_array_lock);
+	}
+}
+
 /*
  * Initialize a new grace period.  Return false if no grace period required.
  */
@@ -1477,6 +1549,8 @@  static noinline_for_stack bool rcu_gp_init(void)
 		arch_spin_unlock(&rcu_state.ofl_lock);
 		local_irq_restore(flags);
 	}
+
+	rcu_sr_normal_gp_init();		/* Prepare synchronize_rcu() users. */
 	rcu_gp_slow(gp_preinit_delay); /* Races with CPU hotplug. */
 
 	/*
@@ -1771,6 +1845,9 @@  static noinline void rcu_gp_cleanup(void)
 	}
 	raw_spin_unlock_irq_rcu_node(rnp);
 
+	// Make synchronize_rcu() users aware of the end of the old grace period.
+	rcu_sr_normal_gp_cleanup();
+
 	// If strict, make all CPUs aware of the end of the old grace period.
 	if (IS_ENABLED(CONFIG_RCU_STRICT_GRACE_PERIOD))
 		on_each_cpu(rcu_strict_gp_boundary, NULL, 0);
@@ -3434,6 +3511,25 @@  static int rcu_blocking_is_gp(void)
 	return true;
 }
 
+/*
+ * Helper function for the synchronize_rcu() API.
+ */
+static void synchronize_rcu_normal(void)
+{
+	struct rcu_synchronize rs;
+
+	init_rcu_head_on_stack(&rs.head);
+	init_completion(&rs.completion);
+	rcu_sr_normal_add_req_this_cpu((struct list_head *) &rs.head);
+
+	/* Kick a grace period if there is a need. */
+	(void) start_poll_synchronize_rcu();
+
+	/* Now we can wait. */
+	wait_for_completion(&rs.completion);
+	destroy_rcu_head_on_stack(&rs.head);
+}
+
 /**
  * synchronize_rcu - wait until a grace period has elapsed.
  *
@@ -3485,7 +3581,7 @@  void synchronize_rcu(void)
 		if (rcu_gp_is_expedited())
 			synchronize_rcu_expedited();
 		else
-			wait_rcu_gp(call_rcu_hurry);
+			synchronize_rcu_normal();
 		return;
 	}
 
@@ -4215,6 +4311,7 @@  rcu_boot_init_percpu_data(int cpu)
 {
 	struct context_tracking *ct = this_cpu_ptr(&context_tracking);
 	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+	int i;
 
 	/* Set up local state, ensuring consistent view of global state. */
 	rdp->grpmask = leaf_node_cpu_bit(rdp->mynode, cpu);
@@ -4229,6 +4326,11 @@  rcu_boot_init_percpu_data(int cpu)
 	rdp->last_sched_clock = jiffies;
 	rdp->cpu = cpu;
 	rcu_boot_init_nocb_percpu_data(rdp);
+
+	for (i = 0; i < ARRAY_SIZE(rdp->sr_array); i++)
+		INIT_LIST_HEAD(&rdp->sr_array[i]);
+
+	raw_spin_lock_init(&rdp->sr_array_lock);
 }
 
 /*
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 192536916f9a..f09969bf7a8d 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -284,6 +284,15 @@  struct rcu_data {
 
 	long lazy_len;			/* Length of buffered lazy callbacks. */
 	int cpu;
+
+	/*
+	 * This array of lists is for handling synchronize_rcu() users.
+	 * Index 0 corresponds to new users, index 1 to users waiting
+	 * for a grace period, and index 2 to users whose grace period
+	 * has passed.
+	 */
+	struct list_head sr_array[3];
+	raw_spinlock_t sr_array_lock;
 };
 
 /* Values for nocb_defer_wakeup field in struct rcu_data. */