[v11,2/5] ring-buffer: Introducing ring-buffer mapping functions

Message ID 20240111161712.1480333-3-vdonnefort@google.com
State New
Series Introducing trace buffer mapping by user-space

Commit Message

Vincent Donnefort Jan. 11, 2024, 4:17 p.m. UTC
  In preparation for allowing the user-space to map a ring-buffer, add
a set of mapping functions:

  ring_buffer_{map,unmap}()
  ring_buffer_map_fault()

And controls on the ring-buffer:

  ring_buffer_map_get_reader()  /* swap reader and head */

Mapping the ring-buffer also involves:

  A unique ID for each subbuf of the ring-buffer. Currently, subbufs are
  only identified through their in-kernel VA.

  A meta-page, which stores ring-buffer statistics and a description of
  the current reader.

The linear mapping exposes the meta-page, and each subbuf of the
ring-buffer, ordered following their unique ID, assigned during the
first mapping.

Once mapped, no subbuf can get in or out of the ring-buffer: the buffer
size will remain unmodified and the splice enabling functions will in
reality simply memcpy the data instead of swapping subbufs.

Signed-off-by: Vincent Donnefort <vdonnefort@google.com>
  

Comments

Mathieu Desnoyers Jan. 11, 2024, 4:34 p.m. UTC | #1
On 2024-01-11 11:17, Vincent Donnefort wrote:
> In preparation for allowing the user-space to map a ring-buffer, add
> a set of mapping functions:
> 
>    ring_buffer_{map,unmap}()
>    ring_buffer_map_fault()
> 
> And controls on the ring-buffer:
> 
>    ring_buffer_map_get_reader()  /* swap reader and head */
> 
> Mapping the ring-buffer also involves:
> 
>    A unique ID for each subbuf of the ring-buffer, currently they are
>    only identified through their in-kernel VA.
> 
>    A meta-page, where are stored ring-buffer statistics and a
>    description for the current reader
> 

Hi Vincent,

The LTTng kernel tracer has supported mmap'd buffers for nearly 15 years [1],
and has a lot of similarities with this patch series.

LTTng has the notion of "subbuffer id" to allow atomically exchanging a
"reader" extra subbuffer with the subbuffer to be read. It implements
"get subbuffer" / "put subbuffer" ioctls to allow the consumer (reader)
to move the currently read subbuffer position. [2]

It would not hurt to compare your approach to LTTng and highlight
similarities/differences, and the rationale for the differences.

Especially when it comes to designing kernel ABIs, it's good to make sure
that all bases are covered, because those choices will have lasting impacts.

Thanks,

Mathieu

[1] https://git.lttng.org/?p=lttng-modules.git;a=blob;f=src/lib/ringbuffer/ring_buffer_mmap.c
[2] https://git.lttng.org/?p=lttng-modules.git;a=blob;f=src/lib/ringbuffer/ring_buffer_vfs.c
  
Steven Rostedt Jan. 11, 2024, 11:23 p.m. UTC | #2
On Thu, 11 Jan 2024 11:34:58 -0500
Mathieu Desnoyers <mathieu.desnoyers@efficios.com> wrote:


> The LTTng kernel tracer has supported mmap'd buffers for nearly 15 years [1],
> and has a lot of similarities with this patch series.
> 
> LTTng has the notion of "subbuffer id" to allow atomically exchanging a
> "reader" extra subbuffer with the subbuffer to be read. It implements
> "get subbuffer" / "put subbuffer" ioctls to allow the consumer (reader)
> to move the currently read subbuffer position. [2]
> 
> It would not hurt to compare your approach to LTTng and highlight
> similarities/differences, and the rationale for the differences.
> 
> Especially when it comes to designing kernel ABIs, it's good to make sure
> that all bases are covered, because those choices will have lasting impacts.
> 
> Thanks,
> 
> Mathieu
> 
> [1] https://git.lttng.org/?p=lttng-modules.git;a=blob;f=src/lib/ringbuffer/ring_buffer_mmap.c
> [2] https://git.lttng.org/?p=lttng-modules.git;a=blob;f=src/lib/ringbuffer/ring_buffer_vfs.c
> 

Hi Mathieu,

Thanks for sharing!

As we discussed a little in the tracing meeting, we do some things
differently, but a lot is very similar too ;-)

The similarity is that all the sub-buffers are mapped. You have a
reader-sub-buffer as well.

The main difference is that you use an ioctl() that returns where to find
the reader-sub-buffer, where our ioctl() is just "I'm done, get me a new
reader-sub-buffer". Ours will update the meta page.

Our meta page looks like this:

> +struct trace_buffer_meta {
> +	unsigned long	entries;
> +	unsigned long	overrun;
> +	unsigned long	read;

If you start tracing with "trace-cmd start -e sched_switch" and do:

 ~ cat /sys/kernel/tracing/per_cpu/cpu0/stats 
entries: 14
overrun: 0
commit overrun: 0
bytes: 992
oldest event ts: 84844.825372
now ts: 84847.102075
dropped events: 0
read events: 0

You'll see something similar to the above.

 entries = entries
 overrun = overrun
 read = read

The above "read" is the total number of events read.

Pretty straightforward ;-)


> +
> +	unsigned long	subbufs_touched;
> +	unsigned long	subbufs_lost;
> +	unsigned long	subbufs_read;

Now I'm thinking we may not want this exported, as I'm not sure it's useful.

Vincent, talking with Mathieu, he was suggesting that we only export what
we really need, and I don't think we need the above. Especially since they
are not exposed in the stats file.


> +
> +	struct {
> +		unsigned long	lost_events;
> +		__u32		id;
> +		__u32		read;
> +	} reader;

The above is definitely needed, as all of it is used to read the
reader-page of the sub-buffer.

lost_events is the number of lost events that happened before this
sub-buffer was swapped out.

Hmm, Vincent, I just noticed that you update the lost_events as:

> +static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> +	struct trace_buffer_meta *meta = cpu_buffer->meta_page;
> +
> +	WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries));
> +	WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun));
> +	WRITE_ONCE(meta->read, cpu_buffer->read);
> +
> +	WRITE_ONCE(meta->subbufs_touched, local_read(&cpu_buffer->pages_touched));
> +	WRITE_ONCE(meta->subbufs_lost, local_read(&cpu_buffer->pages_lost));
> +	WRITE_ONCE(meta->subbufs_read, local_read(&cpu_buffer->pages_read));
> +
> +	WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read);
> +	WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id);
> +	WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events);
> +}

The lost_events may need to be handled differently, as it doesn't always
get cleared. So it may be stale data.


> +
> +	__u32		subbuf_size;
> +	__u32		nr_subbufs;

This is the information needed to read the mapped ring buffer.

> +
> +	__u32		meta_page_size;
> +	__u32		meta_struct_len;

The ring buffer gets mapped after "meta_page_size" bytes, and this structure
is "meta_struct_len" bytes long, which will change if we add new data to it.

> +};
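
To make the layout concrete, here is a minimal user-space sketch. It assumes
the sub-buffers follow the meta page back to back, in ID order, each taking
"subbuf_size" bytes. "struct meta_view", mapping_len() and subbuf_offset()
are hypothetical names that mirror only the four layout fields; this is not
the uapi header.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical user-space mirror of the layout fields in the meta page. */
struct meta_view {
	uint32_t subbuf_size;     /* size of one sub-buffer, header included */
	uint32_t nr_subbufs;      /* number of sub-buffers, reader included */
	uint32_t meta_page_size;  /* sub-buffers start this many bytes in */
	uint32_t meta_struct_len; /* length of the structure actually exported */
};

/* Total length to mmap for the meta page plus every sub-buffer. */
static inline size_t mapping_len(const struct meta_view *m)
{
	assert(m != NULL);
	return (size_t)m->meta_page_size +
	       (size_t)m->nr_subbufs * m->subbuf_size;
}

/*
 * Byte offset of sub-buffer @id from the start of the mapping,
 * or (size_t)-1 if @id is out of range.
 */
static inline size_t subbuf_offset(const struct meta_view *m, uint32_t id)
{
	assert(m != NULL);
	if (id >= m->nr_subbufs)
		return (size_t)-1;
	return (size_t)m->meta_page_size + (size_t)id * m->subbuf_size;
}
```

A reader would call subbuf_offset() with "reader.id" from the meta page to
find the sub-buffer it is allowed to consume.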

-- Steve
  
Vincent Donnefort Jan. 12, 2024, 9:13 a.m. UTC | #3
On Thu, Jan 11, 2024 at 06:23:20PM -0500, Steven Rostedt wrote:
> On Thu, 11 Jan 2024 11:34:58 -0500
> Mathieu Desnoyers <mathieu.desnoyers@efficios.com> wrote:
> 
> 
> > The LTTng kernel tracer has supported mmap'd buffers for nearly 15 years [1],
> > and has a lot of similarities with this patch series.
> > 
> > LTTng has the notion of "subbuffer id" to allow atomically exchanging a
> > "reader" extra subbuffer with the subbuffer to be read. It implements
> > "get subbuffer" / "put subbuffer" ioctls to allow the consumer (reader)
> > to move the currently read subbuffer position. [2]
> > 
> > It would not hurt to compare your approach to LTTng and highlight
> > similarities/differences, and the rationale for the differences.
> > 
> > Especially when it comes to designing kernel ABIs, it's good to make sure
> > that all bases are covered, because those choices will have lasting impacts.
> > 
> > Thanks,
> > 
> > Mathieu
> > 
> > [1] https://git.lttng.org/?p=lttng-modules.git;a=blob;f=src/lib/ringbuffer/ring_buffer_mmap.c
> > [2] https://git.lttng.org/?p=lttng-modules.git;a=blob;f=src/lib/ringbuffer/ring_buffer_vfs.c
> > 
> 
> Hi Mathieu,
> 
> Thanks for sharing!
> 
> As we discussed a little bit in the tracing meeting we do somethings
> differently but very similar too ;-)
> 
> The similarities as that all the sub-buffers are mapped. You have a
> reader-sub-buffer as well.
> 
> The main difference is that you use an ioctl() that returns where to find
> the reader-sub-buffer, where our ioctl() is just "I'm done, get me a new
> reader-sub-buffer". Ours will update the meta page.
> 
> Our meta page looks like this:
> 
> > +struct trace_buffer_meta {
> > +	unsigned long	entries;
> > +	unsigned long	overrun;
> > +	unsigned long	read;
> 
> If start tracing: trace-cmd start -e sched_switch and do:
> 
>  ~ cat /sys/kernel/tracing/per_cpu/cpu0/stats 
> entries: 14
> overrun: 0
> commit overrun: 0
> bytes: 992
> oldest event ts: 84844.825372
> now ts: 84847.102075
> dropped events: 0
> read events: 0
> 
> You'll see similar to the above.
> 
>  entries = entries
>  overrun = overrun
>  read = read
> 
> The above "read" is total number of events read.
> 
> Pretty staight forward ;-)
> 
> 
> > +
> > +	unsigned long	subbufs_touched;
> > +	unsigned long	subbufs_lost;
> > +	unsigned long	subbufs_read;
> 
> Now I'm thinking we may not want this exported, as I'm not sure it's useful.

touched and lost are not useful now, but they will be for my hypervisor
tracing support, which is why I added them already.

subbufs_read could probably go away though as even in that case I can track that
in the reader.

> 
> Vincent, talking with Mathieu, he was suggesting that we only export what
> we really need, and I don't think we need the above. Especially since they
> are not exposed in the stats file.
> 
> 
> > +
> > +	struct {
> > +		unsigned long	lost_events;
> > +		__u32		id;
> > +		__u32		read;
> > +	} reader;
> 
> The above is definitely needed, as all of it is used to read the
> reader-page of the sub-buffer.
> 
> lost_events is the number of lost events that happened before this
> sub-buffer was swapped out.
> 
> Hmm, Vincent, I just notice that you update the lost_events as:
> 
> > +static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> > +{
> > +	struct trace_buffer_meta *meta = cpu_buffer->meta_page;
> > +
> > +	WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries));
> > +	WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun));
> > +	WRITE_ONCE(meta->read, cpu_buffer->read);
> > +
> > +	WRITE_ONCE(meta->subbufs_touched, local_read(&cpu_buffer->pages_touched));
> > +	WRITE_ONCE(meta->subbufs_lost, local_read(&cpu_buffer->pages_lost));
> > +	WRITE_ONCE(meta->subbufs_read, local_read(&cpu_buffer->pages_read));
> > +
> > +	WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read);
> > +	WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id);
> > +	WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events);
> > +}
> 
> The lost_events may need to be handled differently, as it doesn't always
> get cleared. So it may be stale data.

My idea was to check this value after the ioctl(). If > 0 then events were lost
between that ioctl() and the previous swap.

But now, if with "[PATCH] ring-buffer: Have mmapped ring buffer keep track of
missed events" we put this information in the page itself, we can get rid of
this field.

> 
> 
> > +
> > +	__u32		subbuf_size;
> > +	__u32		nr_subbufs;
> 
> This gets is the information needed to read the mapped ring buffer.
> 
> > +
> > +	__u32		meta_page_size;
> > +	__u32		meta_struct_len;
> 
> The ring buffer gets mapped after "meta_page_size" and this structure is
> "meta_struct_len" which will change if we add new data to it.
> 
> > +};
> 
> -- Steve
  
Steven Rostedt Jan. 12, 2024, 3:06 p.m. UTC | #4
On Fri, 12 Jan 2024 09:13:02 +0000
Vincent Donnefort <vdonnefort@google.com> wrote:

> > > +
> > > +	unsigned long	subbufs_touched;
> > > +	unsigned long	subbufs_lost;
> > > +	unsigned long	subbufs_read;  
> > 
> > Now I'm thinking we may not want this exported, as I'm not sure it's useful.  
> 
> touched and lost are not useful now, but it'll be for my support of the
> hypervisor tracing, that's why I added them already.
> 
> subbufs_read could probably go away though as even in that case I can track that
> in the reader.

Yeah, but I think we can leave it off for now.

This is something we could extend the structure with. In fact, it can be
different for different buffers!

That is, since they are pretty meaningless for the normal ring buffer, I
don't think we need to export it. But if it's useful for your hypervisor
buffer, it can export it. It would be a good test to know if the extension
works. Of course, that means if the normal ring buffer needs more info, it
must also include this as well, as it will already be part of the extended
structure.


> 
> > 
> > Vincent, talking with Mathieu, he was suggesting that we only export what
> > we really need, and I don't think we need the above. Especially since they
> > are not exposed in the stats file.
> > 
> >   
> > > +
> > > +	struct {
> > > +		unsigned long	lost_events;
> > > +		__u32		id;
> > > +		__u32		read;
> > > +	} reader;  
> > 
> > The above is definitely needed, as all of it is used to read the
> > reader-page of the sub-buffer.
> > 
> > lost_events is the number of lost events that happened before this
> > sub-buffer was swapped out.
> > 
> > Hmm, Vincent, I just notice that you update the lost_events as:
> >   
> > > +static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> > > +{
> > > +	struct trace_buffer_meta *meta = cpu_buffer->meta_page;
> > > +
> > > +	WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries));
> > > +	WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun));
> > > +	WRITE_ONCE(meta->read, cpu_buffer->read);
> > > +
> > > +	WRITE_ONCE(meta->subbufs_touched, local_read(&cpu_buffer->pages_touched));
> > > +	WRITE_ONCE(meta->subbufs_lost, local_read(&cpu_buffer->pages_lost));
> > > +	WRITE_ONCE(meta->subbufs_read, local_read(&cpu_buffer->pages_read));
> > > +
> > > +	WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read);
> > > +	WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id);
> > > +	WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events);
> > > +}  
> > 
> > The lost_events may need to be handled differently, as it doesn't always
> > get cleared. So it may be stale data.  
> 
> My idea was to check this value after the ioctl(). If > 0 then events were lost
> between the that ioctl() and the previous swap.
> 
> But now if with "[PATCH] ring-buffer: Have mmapped ring buffer keep track of
> missed events" we put this information in the page itself, we can get rid of
> this field.
> 

I'm thinking both may be good, as the number of dropped events is not
added if there's no room to put it at the end of the sub-buffer. When
there's no room, it just sets a flag that there were missed events but
doesn't give how many events were missed.

If anything, it should be in both locations: in the sub-buffer header, to
be consistent with the read/splice version, and in the meta page, where the
count can still be accessed if there was no room to add it to the
sub-buffer.
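
One way a reader could combine the two sources, sketched under assumptions:
MISSED_EVENTS and MISSED_STORED are hypothetical flag names standing in for
whatever the sub-buffer header uses to say "events were missed" and "the
count was stored", and lost_events_for_subbuf() is an illustrative helper,
not something from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical flags carried in the sub-buffer header. */
#define MISSED_EVENTS (1u << 31) /* events were dropped before this sub-buffer */
#define MISSED_STORED (1u << 30) /* the drop count was stored in the sub-buffer */

/*
 * Returns true if events were lost before this sub-buffer and writes the
 * count to *count. The in-buffer count is preferred; the meta page value is
 * the fallback when there was no room to store the count in the sub-buffer.
 */
static inline bool lost_events_for_subbuf(uint32_t hdr_flags,
					  uint64_t stored_count,
					  uint64_t meta_lost_events,
					  uint64_t *count)
{
	assert(count != NULL);
	if (!(hdr_flags & MISSED_EVENTS))
		return false;
	*count = (hdr_flags & MISSED_STORED) ? stored_count : meta_lost_events;
	return true;
}
```

The meta page value is only consulted when the flag says events were missed,
so a stale count left over from an earlier swap is not reported on its own.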

-- Steve
  
Steven Rostedt Jan. 12, 2024, 3:58 p.m. UTC | #5
On Fri, 12 Jan 2024 10:06:41 -0500
Steven Rostedt <rostedt@goodmis.org> wrote:

> I'm thinking both may be good, as the number of dropped events are not
> added if there's no room to put it at the end of the ring buffer. When
> there's no room, it just sets a flag that there was missed events but
> doesn't give how many events were missed.
> 
> If anything, it should be in both locations. In the sub-buffer header, to
> be consistent with the read/splice version, and in the meta page were if
> there's no room to add the count, it can be accessed in the meta page.

I think the meta data page should look like this:

struct trace_buffer_meta {
	__u32		meta_page_size;
	__u32		meta_struct_len;
 
	__u32		subbuf_size;
	__u32		nr_subbufs;
 
	struct {
		__u64		lost_events;
		__u32		id;
		__u32		read;
	} reader;
 
	__u64	entries;
	__u64	overrun;
	__u64	read;

};

1) meta_page_size

      The size of this meta page. It's the first thing the application
      needs to know how much to mmap.

2) meta_struct_len

      The actual length of the structure. It's the second thing the
      application will look at to know what version it is dealing with.

3) subbuf_size
4) nr_subbufs

      These two are what the application needs for its next step, which is
      to memory map all the sub-buffers. With 1) and these, it knows how to
      do that.

The first four are needed for setup, and never change once mapped. The
rest get updated during the trace.

5) reader
  5a) lost_events
  5b) id
  5c) read

      This is actively needed during tracing. It is what is used to know
      where and how to read the reader sub-buffer.

6) entries
7) overrun
8) read

      These are useful statistics, but probably seldom really looked at.
      They should be at the end.

Also notice that I converted all "unsigned long" to __u64. This is because
it is possible to have a 32 bit userspace running this on top of a 64 bit
kernel. If we were to use "long" it would be inconsistent and buggy.
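
A quick way to see the point about fixed-width types is to pin the layout
down with compile-time checks. This is only a sketch: "struct meta_sketch" is
a stand-in that uses uint32_t/uint64_t in place of __u32/__u64, and the
offsets asserted below are what the proposed field order works out to, not
values taken from any header.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for the proposed layout, using fixed-width types only. */
struct meta_sketch {
	uint32_t meta_page_size;
	uint32_t meta_struct_len;

	uint32_t subbuf_size;
	uint32_t nr_subbufs;

	struct {
		uint64_t lost_events;
		uint32_t id;
		uint32_t read;
	} reader;

	uint64_t entries;
	uint64_t overrun;
	uint64_t read;
};

/*
 * With fixed-width fields, these hold for 32 bit and 64 bit builds alike.
 * With "unsigned long" they would differ between the two.
 */
_Static_assert(offsetof(struct meta_sketch, reader) == 16, "reader offset");
_Static_assert(offsetof(struct meta_sketch, entries) == 32, "entries offset");
_Static_assert(sizeof(struct meta_sketch) == 56, "struct size");
```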

Now, if you need subbufs_touched and subbufs_lost: when that buffer gets
opened, it would update meta_struct_len accordingly, and the structure would
look like:

struct trace_buffer_meta {
	__u32		meta_page_size;
	__u32		meta_struct_len;
 
	__u32		subbuf_size;
	__u32		nr_subbufs;
 
	struct {
		__u64		lost_events;
		__u32		id;
		__u32		read;
	} reader;
 
	__u64	entries;
	__u64	overrun;
	__u64	read;

	__u64	subbufs_touched;
	__u64	subbufs_lost;
};
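
On the user-space side, the extension could be detected by comparing
meta_struct_len against the end of the field of interest. A minimal sketch,
assuming fields are only ever appended: "struct meta_ext_sketch",
meta_covers() and META_HAS_FIELD() are hypothetical names, and
uint32_t/uint64_t stand in for __u32/__u64.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for the extended layout shown above. */
struct meta_ext_sketch {
	uint32_t meta_page_size;
	uint32_t meta_struct_len;

	uint32_t subbuf_size;
	uint32_t nr_subbufs;

	struct {
		uint64_t lost_events;
		uint32_t id;
		uint32_t read;
	} reader;

	uint64_t entries;
	uint64_t overrun;
	uint64_t read;

	uint64_t subbufs_touched;
	uint64_t subbufs_lost;
};

/* True if the exported structure covers the bytes [off, off + len). */
static inline bool meta_covers(const struct meta_ext_sketch *m,
			       size_t off, size_t len)
{
	assert(m != NULL);
	return m->meta_struct_len >= off + len;
}

/* True if @field lies entirely inside what the kernel exported. */
#define META_HAS_FIELD(m, field)					\
	meta_covers((m), offsetof(struct meta_ext_sketch, field),	\
		    sizeof((m)->field))
```

An application built against the longer structure would check
META_HAS_FIELD(meta, subbufs_touched) before reading it, and ignore the field
on a buffer that exports only the shorter structure.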


Make sense?

-- Steve
  
Masami Hiramatsu (Google) Jan. 15, 2024, 4:43 a.m. UTC | #6
On Thu, 11 Jan 2024 16:17:09 +0000
Vincent Donnefort <vdonnefort@google.com> wrote:

> In preparation for allowing the user-space to map a ring-buffer, add
> a set of mapping functions:
> 
>   ring_buffer_{map,unmap}()
>   ring_buffer_map_fault()
> 
> And controls on the ring-buffer:
> 
>   ring_buffer_map_get_reader()  /* swap reader and head */
> 
> Mapping the ring-buffer also involves:
> 
>   A unique ID for each subbuf of the ring-buffer, currently they are
>   only identified through their in-kernel VA.
> 
>   A meta-page, where are stored ring-buffer statistics and a
>   description for the current reader
> 
> The linear mapping exposes the meta-page, and each subbuf of the
> ring-buffer, ordered following their unique ID, assigned during the
> first mapping.
> 
> Once mapped, no subbuf can get in or out of the ring-buffer: the buffer
> size will remain unmodified and the splice enabling functions will in
> reality simply memcpy the data instead of swapping subbufs.
> 
> Signed-off-by: Vincent Donnefort <vdonnefort@google.com>
> 
> diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
> index fa802db216f9..0841ba8bab14 100644
> --- a/include/linux/ring_buffer.h
> +++ b/include/linux/ring_buffer.h
> @@ -6,6 +6,8 @@
>  #include <linux/seq_file.h>
>  #include <linux/poll.h>
>  
> +#include <uapi/linux/trace_mmap.h>
> +
>  struct trace_buffer;
>  struct ring_buffer_iter;
>  
> @@ -221,4 +223,9 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node);
>  #define trace_rb_cpu_prepare	NULL
>  #endif
>  
> +int ring_buffer_map(struct trace_buffer *buffer, int cpu);
> +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu);
> +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
> +				   unsigned long pgoff);
> +int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu);
>  #endif /* _LINUX_RING_BUFFER_H */
> diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h
> new file mode 100644
> index 000000000000..bde39a73ce65
> --- /dev/null
> +++ b/include/uapi/linux/trace_mmap.h
> @@ -0,0 +1,45 @@
> +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
> +#ifndef _TRACE_MMAP_H_
> +#define _TRACE_MMAP_H_
> +
> +#include <linux/types.h>
> +
> +/**
> + * struct trace_buffer_meta - Ring-buffer Meta-page description
> + * @entries:		Number of entries in the ring-buffer.
> + * @overrun:		Number of entries lost in the ring-buffer.
> + * @read:		Number of entries that have been read.
> + * @subbufs_touched:	Number of subbufs that have been filled.
> + * @subbufs_lost:	Number of subbufs lost to overrun.
> + * @subbufs_read:	Number of subbufs that have been read.
> + * @reader.lost_events:	Number of events lost at the time of the reader swap.
> + * @reader.id:		subbuf ID of the current reader. From 0 to @nr_subbufs - 1
> + * @reader.read:	Number of bytes read on the reader subbuf.
> + * @subbuf_size:	Size of each subbuf, including the header.
> + * @nr_subbufs:		Number of subbfs in the ring-buffer.
> + * @meta_page_size:	Size of this meta-page.
> + * @meta_struct_len:	Size of this structure.
> + */
> +struct trace_buffer_meta {
> +	unsigned long	entries;
> +	unsigned long	overrun;
> +	unsigned long	read;
> +
> +	unsigned long	subbufs_touched;
> +	unsigned long	subbufs_lost;
> +	unsigned long	subbufs_read;
> +
> +	struct {
> +		unsigned long	lost_events;
> +		__u32		id;
> +		__u32		read;
> +	} reader;
> +
> +	__u32		subbuf_size;
> +	__u32		nr_subbufs;
> +
> +	__u32		meta_page_size;
> +	__u32		meta_struct_len;
> +};
> +
> +#endif /* _TRACE_MMAP_H_ */
> diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> index db73e326fa04..e9ff1c95e896 100644
> --- a/kernel/trace/ring_buffer.c
> +++ b/kernel/trace/ring_buffer.c
> @@ -338,6 +338,7 @@ struct buffer_page {
>  	local_t		 entries;	/* entries on this page */
>  	unsigned long	 real_end;	/* real end of data */
>  	unsigned	 order;		/* order of the page */
> +	u32		 id;		/* ID for external mapping */
>  	struct buffer_data_page *page;	/* Actual data page */
>  };
>  
> @@ -484,6 +485,12 @@ struct ring_buffer_per_cpu {
>  	u64				read_stamp;
>  	/* pages removed since last reset */
>  	unsigned long			pages_removed;
> +
> +	int				mapped;
> +	struct mutex			mapping_lock;
> +	unsigned long			*subbuf_ids;	/* ID to addr */
> +	struct trace_buffer_meta	*meta_page;
> +
>  	/* ring buffer pages to update, > 0 to add, < 0 to remove */
>  	long				nr_pages_to_update;
>  	struct list_head		new_pages; /* new pages to add */
> @@ -1542,6 +1549,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
>  	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
>  	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
>  	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
> +	mutex_init(&cpu_buffer->mapping_lock);
>  
>  	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
>  			    GFP_KERNEL, cpu_to_node(cpu));
> @@ -5160,6 +5168,23 @@ static void rb_clear_buffer_page(struct buffer_page *page)
>  	page->read = 0;
>  }
>  
> +static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> +	struct trace_buffer_meta *meta = cpu_buffer->meta_page;
> +
> +	WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries));
> +	WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun));
> +	WRITE_ONCE(meta->read, cpu_buffer->read);
> +
> +	WRITE_ONCE(meta->subbufs_touched, local_read(&cpu_buffer->pages_touched));
> +	WRITE_ONCE(meta->subbufs_lost, local_read(&cpu_buffer->pages_lost));
> +	WRITE_ONCE(meta->subbufs_read, local_read(&cpu_buffer->pages_read));
> +
> +	WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read);
> +	WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id);
> +	WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events);
> +}
> +
>  static void
>  rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
>  {
> @@ -5204,6 +5229,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
>  	cpu_buffer->lost_events = 0;
>  	cpu_buffer->last_overrun = 0;
>  
> +	if (cpu_buffer->mapped)
> +		rb_update_meta_page(cpu_buffer);
> +
>  	rb_head_page_activate(cpu_buffer);
>  	cpu_buffer->pages_removed = 0;
>  }
> @@ -5418,6 +5446,11 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
>  	cpu_buffer_a = buffer_a->buffers[cpu];
>  	cpu_buffer_b = buffer_b->buffers[cpu];
>  
> +	if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) {
> +		ret = -EBUSY;
> +		goto out;
> +	}

Ah, this is not enough to stop the snapshot. update_max_tr()@kernel/trace/trace.c
is used for swapping all CPU buffers and it does not use this function.
(but it should use this instead of accessing the buffer directly...)

Maybe we need a ring_buffer_swap() that checks that none of the cpu_buffers
is mapped.
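
A rough model of that check, written as plain C rather than kernel code.
"struct cpu_buf_model" and swap_allowed() are hypothetical; a real helper
would walk buffer->buffers[] under the appropriate locking and read the
mapped counter the same way ring_buffer_swap_cpu() does.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical stand-in for the per-CPU state that matters here. */
struct cpu_buf_model {
	int mapped; /* number of user-space mappings, 0 when unmapped */
};

/*
 * Returns 0 if no per-CPU buffer in either set is mapped, -EBUSY otherwise.
 * A whole-buffer swap would only proceed when this returns 0.
 */
static inline int swap_allowed(const struct cpu_buf_model *a,
			       const struct cpu_buf_model *b,
			       size_t nr_cpus)
{
	assert(a != NULL && b != NULL);
	for (size_t cpu = 0; cpu < nr_cpus; cpu++) {
		if (a[cpu].mapped || b[cpu].mapped)
			return -EBUSY;
	}
	return 0;
}
```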

Thank you,

> +
>  	/* At least make sure the two buffers are somewhat the same */
>  	if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
>  		goto out;
> @@ -5682,7 +5715,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
>  	 * Otherwise, we can simply swap the page with the one passed in.
>  	 */
>  	if (read || (len < (commit - read)) ||
> -	    cpu_buffer->reader_page == cpu_buffer->commit_page) {
> +	    cpu_buffer->reader_page == cpu_buffer->commit_page ||
> +	    cpu_buffer->mapped) {
>  		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
>  		unsigned int rpos = read;
>  		unsigned int pos = 0;
> @@ -5901,6 +5935,11 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
>  
>  		cpu_buffer = buffer->buffers[cpu];
>  
> +		if (cpu_buffer->mapped) {
> +			err = -EBUSY;
> +			goto error;
> +		}
> +
>  		/* Update the number of pages to match the new size */
>  		nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
>  		nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);
> @@ -6002,6 +6041,295 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
>  }
>  EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set);
>  
> +#define subbuf_page(off, start) \
> +	virt_to_page((void *)(start + (off << PAGE_SHIFT)))
> +
> +#define foreach_subbuf_page(sub_order, start, page)		\
> +	page = subbuf_page(0, (start));				\
> +	for (int __off = 0; __off < (1 << (sub_order));		\
> +	     __off++, page = subbuf_page(__off, (start)))
> +
> +static inline void subbuf_map_prepare(unsigned long subbuf_start, int order)
> +{
> +	struct page *page;
> +
> +	/*
> +	 * When allocating order > 0 pages, only the first struct page has a
> +	 * refcount > 1. Increasing the refcount here ensures none of the struct
> +	 * page composing the sub-buffer is freeed when the mapping is closed.
> +	 */
> +	foreach_subbuf_page(order, subbuf_start, page)
> +		page_ref_inc(page);
> +}
> +
> +static inline void subbuf_unmap(unsigned long subbuf_start, int order)
> +{
> +	struct page *page;
> +
> +	foreach_subbuf_page(order, subbuf_start, page) {
> +		page_ref_dec(page);
> +		page->mapping = NULL;
> +	}
> +}
> +
> +static void rb_free_subbuf_ids(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> +	int sub_id;
> +
> +	for (sub_id = 0; sub_id < cpu_buffer->nr_pages + 1; sub_id++)
> +		subbuf_unmap(cpu_buffer->subbuf_ids[sub_id],
> +			     cpu_buffer->buffer->subbuf_order);
> +
> +	kfree(cpu_buffer->subbuf_ids);
> +	cpu_buffer->subbuf_ids = NULL;
> +}
> +
> +static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> +	if (cpu_buffer->meta_page)
> +		return 0;
> +
> +	cpu_buffer->meta_page = page_to_virt(alloc_page(GFP_USER | __GFP_ZERO));
> +	if (!cpu_buffer->meta_page)
> +		return -ENOMEM;
> +
> +	return 0;
> +}
> +
> +static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> +	unsigned long addr = (unsigned long)cpu_buffer->meta_page;
> +
> +	virt_to_page((void *)addr)->mapping = NULL;
> +	free_page(addr);
> +	cpu_buffer->meta_page = NULL;
> +}
> +
> +static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
> +				   unsigned long *subbuf_ids)
> +{
> +	struct trace_buffer_meta *meta = cpu_buffer->meta_page;
> +	unsigned int nr_subbufs = cpu_buffer->nr_pages + 1;
> +	struct buffer_page *first_subbuf, *subbuf;
> +	int id = 0;
> +
> +	subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
> +	subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order);
> +	cpu_buffer->reader_page->id = id++;
> +
> +	first_subbuf = subbuf = rb_set_head_page(cpu_buffer);
> +	do {
> +		if (id >= nr_subbufs) {
> +			WARN_ON(1);
> +			break;
> +		}
> +
> +		subbuf_ids[id] = (unsigned long)subbuf->page;
> +		subbuf->id = id;
> +		subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order);
> +
> +		rb_inc_page(&subbuf);
> +		id++;
> +	} while (subbuf != first_subbuf);
> +
> +	/* install subbuf ID to kern VA translation */
> +	cpu_buffer->subbuf_ids = subbuf_ids;
> +
> +	meta->meta_page_size = PAGE_SIZE;
> +	meta->meta_struct_len = sizeof(*meta);
> +	meta->nr_subbufs = nr_subbufs;
> +	meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
> +
> +	rb_update_meta_page(cpu_buffer);
> +}
> +
> +static inline struct ring_buffer_per_cpu *
> +rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
> +{
> +	struct ring_buffer_per_cpu *cpu_buffer;
> +
> +	if (!cpumask_test_cpu(cpu, buffer->cpumask))
> +		return ERR_PTR(-EINVAL);
> +
> +	cpu_buffer = buffer->buffers[cpu];
> +
> +	mutex_lock(&cpu_buffer->mapping_lock);
> +
> +	if (!cpu_buffer->mapped) {
> +		mutex_unlock(&cpu_buffer->mapping_lock);
> +		return ERR_PTR(-ENODEV);
> +	}
> +
> +	return cpu_buffer;
> +}
> +
> +static inline void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> +	mutex_unlock(&cpu_buffer->mapping_lock);
> +}
> +
> +int ring_buffer_map(struct trace_buffer *buffer, int cpu)
> +{
> +	struct ring_buffer_per_cpu *cpu_buffer;
> +	unsigned long flags, *subbuf_ids;
> +	int err = 0;
> +
> +	if (!cpumask_test_cpu(cpu, buffer->cpumask))
> +		return -EINVAL;
> +
> +	cpu_buffer = buffer->buffers[cpu];
> +
> +	mutex_lock(&cpu_buffer->mapping_lock);
> +
> +	if (cpu_buffer->mapped) {
> +		if (cpu_buffer->mapped == INT_MAX)
> +			err = -EBUSY;
> +		else
> +			WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1);
> +		mutex_unlock(&cpu_buffer->mapping_lock);
> +		return err;
> +	}
> +
> +	/* prevent another thread from changing buffer sizes */
> +	mutex_lock(&buffer->mutex);
> +
> +	err = rb_alloc_meta_page(cpu_buffer);
> +	if (err)
> +		goto unlock;
> +
> +	/* subbuf_ids include the reader while nr_pages does not */
> +	subbuf_ids = kzalloc(sizeof(*subbuf_ids) * (cpu_buffer->nr_pages + 1),
> +			   GFP_KERNEL);
> +	if (!subbuf_ids) {
> +		rb_free_meta_page(cpu_buffer);
> +		err = -ENOMEM;
> +		goto unlock;
> +	}
> +
> +	atomic_inc(&cpu_buffer->resize_disabled);
> +
> +	/*
> +	 * Lock all readers to block any subbuf swap until the subbuf IDs are
> +	 * assigned.
> +	 */
> +	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> +
> +	rb_setup_ids_meta_page(cpu_buffer, subbuf_ids);
> +
> +	WRITE_ONCE(cpu_buffer->mapped, 1);
> +
> +	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
> +unlock:
> +	mutex_unlock(&buffer->mutex);
> +	mutex_unlock(&cpu_buffer->mapping_lock);
> +
> +	return err;
> +}
> +
> +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
> +{
> +	struct ring_buffer_per_cpu *cpu_buffer;
> +	int err = 0;
> +
> +	if (!cpumask_test_cpu(cpu, buffer->cpumask))
> +		return -EINVAL;
> +
> +	cpu_buffer = buffer->buffers[cpu];
> +
> +	mutex_lock(&cpu_buffer->mapping_lock);
> +
> +	if (!cpu_buffer->mapped) {
> +		err = -ENODEV;
> +		goto unlock;
> +	}
> +
> +	WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1);
> +	if (!cpu_buffer->mapped) {
> +		/* Wait for the writer and readers to observe !mapped */
> +		synchronize_rcu();
> +
> +		rb_free_subbuf_ids(cpu_buffer);
> +		rb_free_meta_page(cpu_buffer);
> +		atomic_dec(&cpu_buffer->resize_disabled);
> +	}
> +unlock:
> +	mutex_unlock(&cpu_buffer->mapping_lock);
> +
> +	return err;
> +}
> +
> +/*
> + *   +--------------+  pgoff == 0
> + *   |   meta page  |
> + *   +--------------+  pgoff == 1
> + *   | subbuffer 0  |
> + *   +--------------+  pgoff == 1 + (1 << subbuf_order)
> + *   | subbuffer 1  |
> + *         ...
> + */
> +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
> +				   unsigned long pgoff)
> +{
> +	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
> +	unsigned long subbuf_id, subbuf_offset, addr;
> +	struct page *page;
> +
> +	if (!pgoff)
> +		return virt_to_page((void *)cpu_buffer->meta_page);
> +
> +	pgoff--;
> +
> +	subbuf_id = pgoff >> buffer->subbuf_order;
> +	if (subbuf_id > cpu_buffer->nr_pages)
> +		return NULL;
> +
> +	subbuf_offset = pgoff & ((1UL << buffer->subbuf_order) - 1);
> +	addr = cpu_buffer->subbuf_ids[subbuf_id] + (subbuf_offset * PAGE_SIZE);
> +	page = virt_to_page((void *)addr);
> +
> +	return page;
> +}
> +
> +int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu)
> +{
> +	struct ring_buffer_per_cpu *cpu_buffer;
> +	unsigned long reader_size;
> +	unsigned long flags;
> +
> +	cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
> +	if (IS_ERR(cpu_buffer))
> +		return (int)PTR_ERR(cpu_buffer);
> +
> +	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> +consume:
> +	if (rb_per_cpu_empty(cpu_buffer))
> +		goto out;
> +
> +	reader_size = rb_page_size(cpu_buffer->reader_page);
> +
> +	/*
> +	 * There are data to be read on the current reader page, we can
> +	 * return to the caller. But before that, we assume the latter will read
> +	 * everything. Let's update the kernel reader accordingly.
> +	 */
> +	if (cpu_buffer->reader_page->read < reader_size) {
> +		while (cpu_buffer->reader_page->read < reader_size)
> +			rb_advance_reader(cpu_buffer);
> +		goto out;
> +	}
> +
> +	if (WARN_ON(!rb_get_reader_page(cpu_buffer)))
> +		goto out;
> +
> +	goto consume;
> +out:
> +	rb_update_meta_page(cpu_buffer);
> +	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
> +	rb_put_mapped_buffer(cpu_buffer);
> +
> +	return 0;
> +}
> +
>  /*
>   * We only allocate new buffers, never free them if the CPU goes down.
>   * If we were to free the buffer, then the user would lose any trace that was in
> -- 
> 2.43.0.275.g3460e3d667-goog
>
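
The pgoff layout documented above ring_buffer_map_fault() can be checked with a small user-space sketch. It only reproduces the index arithmetic from the quoted function. pgoff_to_subbuf() and struct subbuf_pos are names made up for this illustration and are not part of the patch.

```c
/*
 * Hypothetical user-space mirror of the index arithmetic in
 * ring_buffer_map_fault(). Nothing here touches real pages.
 */
struct subbuf_pos {
	unsigned long id;	/* sub-buffer ID, 0 .. nr_pages (reader included) */
	unsigned long offset;	/* page index inside that sub-buffer */
};

/*
 * Returns 1 when @pgoff is the meta-page, 0 when @pos was filled for a
 * data page, and -1 when @pgoff lies past the last sub-buffer.
 */
static int pgoff_to_subbuf(unsigned long pgoff, unsigned int subbuf_order,
			   unsigned long nr_pages, struct subbuf_pos *pos)
{
	if (!pgoff)
		return 1;

	/* Skip the meta-page so that data pages start at index 0 */
	pgoff--;

	pos->id = pgoff >> subbuf_order;
	/* IDs run from 0 to nr_pages because the reader page has one too */
	if (pos->id > nr_pages)
		return -1;

	pos->offset = pgoff & ((1UL << subbuf_order) - 1);

	return 0;
}
```

With subbuf_order == 2 (four pages per sub-buffer) and nr_pages == 3, pgoff 6 lands on page 1 of sub-buffer 1, pgoff 16 is the last valid page, and pgoff 17 is rejected.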
  
Vincent Donnefort Jan. 15, 2024, 3:37 p.m. UTC | #7
On Mon, Jan 15, 2024 at 01:43:03PM +0900, Masami Hiramatsu wrote:
> On Thu, 11 Jan 2024 16:17:09 +0000
> Vincent Donnefort <vdonnefort@google.com> wrote:
> 
> > In preparation for allowing the user-space to map a ring-buffer, add
> > a set of mapping functions:
> > 
> >   ring_buffer_{map,unmap}()
> >   ring_buffer_map_fault()
> > 
> > And controls on the ring-buffer:
> > 
> >   ring_buffer_map_get_reader()  /* swap reader and head */
> > 
> > Mapping the ring-buffer also involves:
> > 
> >   A unique ID for each subbuf of the ring-buffer, currently they are
> >   only identified through their in-kernel VA.
> > 
> >   A meta-page, where are stored ring-buffer statistics and a
> >   description for the current reader
> > 
> > The linear mapping exposes the meta-page, and each subbuf of the
> > ring-buffer, ordered following their unique ID, assigned during the
> > first mapping.
> > 
> > Once mapped, no subbuf can get in or out of the ring-buffer: the buffer
> > size will remain unmodified and the splice enabling functions will in
> > reality simply memcpy the data instead of swapping subbufs.
> > 
> > Signed-off-by: Vincent Donnefort <vdonnefort@google.com>
> > 
> > diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
> > index fa802db216f9..0841ba8bab14 100644
> > --- a/include/linux/ring_buffer.h
> > +++ b/include/linux/ring_buffer.h
> > @@ -6,6 +6,8 @@
> >  #include <linux/seq_file.h>
> >  #include <linux/poll.h>
> >  
> > +#include <uapi/linux/trace_mmap.h>
> > +
> >  struct trace_buffer;
> >  struct ring_buffer_iter;
> >  
> > @@ -221,4 +223,9 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node);
> >  #define trace_rb_cpu_prepare	NULL
> >  #endif
> >  
> > +int ring_buffer_map(struct trace_buffer *buffer, int cpu);
> > +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu);
> > +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
> > +				   unsigned long pgoff);
> > +int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu);
> >  #endif /* _LINUX_RING_BUFFER_H */
> > diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h
> > new file mode 100644
> > index 000000000000..bde39a73ce65
> > --- /dev/null
> > +++ b/include/uapi/linux/trace_mmap.h
> > @@ -0,0 +1,45 @@
> > +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
> > +#ifndef _TRACE_MMAP_H_
> > +#define _TRACE_MMAP_H_
> > +
> > +#include <linux/types.h>
> > +
> > +/**
> > + * struct trace_buffer_meta - Ring-buffer Meta-page description
> > + * @entries:		Number of entries in the ring-buffer.
> > + * @overrun:		Number of entries lost in the ring-buffer.
> > + * @read:		Number of entries that have been read.
> > + * @subbufs_touched:	Number of subbufs that have been filled.
> > + * @subbufs_lost:	Number of subbufs lost to overrun.
> > + * @subbufs_read:	Number of subbufs that have been read.
> > + * @reader.lost_events:	Number of events lost at the time of the reader swap.
> > + * @reader.id:		subbuf ID of the current reader. From 0 to @nr_subbufs - 1
> > + * @reader.read:	Number of bytes read on the reader subbuf.
> > + * @subbuf_size:	Size of each subbuf, including the header.
> > + * @nr_subbufs:		Number of subbfs in the ring-buffer.
> > + * @meta_page_size:	Size of this meta-page.
> > + * @meta_struct_len:	Size of this structure.
> > + */
> > +struct trace_buffer_meta {
> > +	unsigned long	entries;
> > +	unsigned long	overrun;
> > +	unsigned long	read;
> > +
> > +	unsigned long	subbufs_touched;
> > +	unsigned long	subbufs_lost;
> > +	unsigned long	subbufs_read;
> > +
> > +	struct {
> > +		unsigned long	lost_events;
> > +		__u32		id;
> > +		__u32		read;
> > +	} reader;
> > +
> > +	__u32		subbuf_size;
> > +	__u32		nr_subbufs;
> > +
> > +	__u32		meta_page_size;
> > +	__u32		meta_struct_len;
> > +};
> > +
> > +#endif /* _TRACE_MMAP_H_ */
> > diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> > index db73e326fa04..e9ff1c95e896 100644
> > --- a/kernel/trace/ring_buffer.c
> > +++ b/kernel/trace/ring_buffer.c
> > @@ -338,6 +338,7 @@ struct buffer_page {
> >  	local_t		 entries;	/* entries on this page */
> >  	unsigned long	 real_end;	/* real end of data */
> >  	unsigned	 order;		/* order of the page */
> > +	u32		 id;		/* ID for external mapping */
> >  	struct buffer_data_page *page;	/* Actual data page */
> >  };
> >  
> > @@ -484,6 +485,12 @@ struct ring_buffer_per_cpu {
> >  	u64				read_stamp;
> >  	/* pages removed since last reset */
> >  	unsigned long			pages_removed;
> > +
> > +	int				mapped;
> > +	struct mutex			mapping_lock;
> > +	unsigned long			*subbuf_ids;	/* ID to addr */
> > +	struct trace_buffer_meta	*meta_page;
> > +
> >  	/* ring buffer pages to update, > 0 to add, < 0 to remove */
> >  	long				nr_pages_to_update;
> >  	struct list_head		new_pages; /* new pages to add */
> > @@ -1542,6 +1549,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
> >  	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
> >  	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
> >  	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
> > +	mutex_init(&cpu_buffer->mapping_lock);
> >  
> >  	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
> >  			    GFP_KERNEL, cpu_to_node(cpu));
> > @@ -5160,6 +5168,23 @@ static void rb_clear_buffer_page(struct buffer_page *page)
> >  	page->read = 0;
> >  }
> >  
> > +static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> > +{
> > +	struct trace_buffer_meta *meta = cpu_buffer->meta_page;
> > +
> > +	WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries));
> > +	WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun));
> > +	WRITE_ONCE(meta->read, cpu_buffer->read);
> > +
> > +	WRITE_ONCE(meta->subbufs_touched, local_read(&cpu_buffer->pages_touched));
> > +	WRITE_ONCE(meta->subbufs_lost, local_read(&cpu_buffer->pages_lost));
> > +	WRITE_ONCE(meta->subbufs_read, local_read(&cpu_buffer->pages_read));
> > +
> > +	WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read);
> > +	WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id);
> > +	WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events);
> > +}
> > +
> >  static void
> >  rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
> >  {
> > @@ -5204,6 +5229,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
> >  	cpu_buffer->lost_events = 0;
> >  	cpu_buffer->last_overrun = 0;
> >  
> > +	if (cpu_buffer->mapped)
> > +		rb_update_meta_page(cpu_buffer);
> > +
> >  	rb_head_page_activate(cpu_buffer);
> >  	cpu_buffer->pages_removed = 0;
> >  }
> > @@ -5418,6 +5446,11 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
> >  	cpu_buffer_a = buffer_a->buffers[cpu];
> >  	cpu_buffer_b = buffer_b->buffers[cpu];
> >  
> > +	if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) {
> > +		ret = -EBUSY;
> > +		goto out;
> > +	}
> 
> Ah, this is not enough to stop snapshot. update_max_tr()@kernel/trace/trace.c
> is used for swapping all CPU buffer and it does not use this function.
> (but that should use this instead of accessing buffer directly...)
> 
> Maybe we need ring_buffer_swap() and it checks all cpu_buffer does not set
> mapping bits.
> 
> Thank you,

How about, instead of having ring_buffer_swap_cpu() return -EBUSY when mapped,
we add two functions that trace.c could call to block any mapping?

 int rb_swap_prepare(struct ring_buffer_per_cpu *cpu_buffer)
 {
     mutex_lock(&cpu_buffer->mapping_lock);

     /* On success, mapping_lock stays held until rb_swap_done() */
     if (!cpu_buffer->mapped)
         return 0;

     mutex_unlock(&cpu_buffer->mapping_lock);

     return -EBUSY;
 }

 void rb_swap_done(struct ring_buffer_per_cpu *cpu_buffer)
 {
     mutex_unlock(&cpu_buffer->mapping_lock);
 }

 int ring_buffer_swap_prepare(struct trace_buffer *buffer, int cpu)
 {
 ...

     if (cpu == RING_BUFFER_ALL_CPUS) {
         int cpu_i;

         for_each_buffer_cpu(buffer, cpu_i) {
             err = rb_swap_prepare(buffer->buffers[cpu_i]);
             if (err)
                 break;
         }

         if (err) {
             /* Rollback ... */
         }

         return err;
     }

     return rb_swap_prepare(buffer->buffers[cpu]);
 }

 void ring_buffer_swap_done(struct trace_buffer *buffer, int cpu)
 {
     if (cpu == RING_BUFFER_ALL_CPUS) {
         int cpu_i;

         for_each_buffer_cpu(buffer, cpu_i)
             rb_swap_done(buffer->buffers[cpu_i]);
         return;
     }

     rb_swap_done(buffer->buffers[cpu]);
 }

[...]
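
One way the rollback left open in ring_buffer_swap_prepare() could look, purely as an illustration. This is a single-threaded user-space sketch: struct cpu_buf is a hypothetical stand-in for the per-CPU buffer, the mapping_lock mutex is modelled by a plain flag, and -EBUSY is an assumed error code.

```c
#include <errno.h>
#include <stdbool.h>

/*
 * Hypothetical stand-in for ring_buffer_per_cpu. The mapping_lock mutex is
 * modelled by a flag so the sketch stays single-threaded.
 */
struct cpu_buf {
	bool lock_held;
	int mapped;
};

/* On success the "lock" stays held, so no new mapping could start. */
static int swap_prepare(struct cpu_buf *b)
{
	b->lock_held = true;
	if (!b->mapped)
		return 0;

	b->lock_held = false;
	return -EBUSY;
}

static void swap_done(struct cpu_buf *b)
{
	b->lock_held = false;
}

static int swap_prepare_all(struct cpu_buf *bufs, int nr)
{
	int i, err = 0;

	for (i = 0; i < nr; i++) {
		err = swap_prepare(&bufs[i]);
		if (err)
			break;
	}

	/* bufs[i] failed and holds nothing: release only 0 .. i - 1 */
	if (err)
		while (--i >= 0)
			swap_done(&bufs[i]);

	return err;
}

static void swap_done_all(struct cpu_buf *bufs, int nr)
{
	for (int i = 0; i < nr; i++)
		swap_done(&bufs[i]);
}
```

The point of the sketch is the ordering: a failure on buffer i must only release the locks taken for buffers before i, otherwise a lock that was never taken would be dropped.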
  
Steven Rostedt Jan. 15, 2024, 4:09 p.m. UTC | #8
On Mon, 15 Jan 2024 15:37:31 +0000
Vincent Donnefort <vdonnefort@google.com> wrote:

> > > @@ -5418,6 +5446,11 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
> > >  	cpu_buffer_a = buffer_a->buffers[cpu];
> > >  	cpu_buffer_b = buffer_b->buffers[cpu];
> > >  
> > > +	if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) {
> > > +		ret = -EBUSY;
> > > +		goto out;
> > > +	}  
> > 
> > Ah, this is not enough to stop snapshot. update_max_tr()@kernel/trace/trace.c
> > is used for swapping all CPU buffer and it does not use this function.
> > (but that should use this instead of accessing buffer directly...)
> > 
> > Maybe we need ring_buffer_swap() and it checks all cpu_buffer does not set
> > mapping bits.
> > 
> > Thank you,  
> 
> How about instead of having ring_buffer_swap_cpu() returning -EBUSY when mapped
> we have two functions to block any mapping that trace.c could call?
> 

No. The ring buffer logic should not care whether its user is swapping
the entire ring buffer or not. It only cares whether parts of the ring
buffer are being swapped. A whole-buffer swap is not the level of scope
it should care about. If we do not want a swap to happen in
update_max_tr(), that's not ring_buffer.c's problem. The code to prevent
that from happening should be 100% in trace.c.

The trace.c code knows what's being mapped or not. The accounting to
keep a mapped buffer from being swapped should stay in trace.c, and not
add unnecessary implementation coupling between that and ring_buffer.c.

-- Steve
  
Steven Rostedt Jan. 15, 2024, 4:23 p.m. UTC | #9
On Mon, 15 Jan 2024 11:09:38 -0500
Steven Rostedt <rostedt@goodmis.org> wrote:

> No. The ring buffer logic should not care if the user of it is swapping
> the entire ring buffer or not. It only cares if parts of the ring
> buffer is being swapped or not. That's not the level of scope it should
> care about. If we do not want a swap to happen in update_max_tr()
> that's not ring_buffer.c's problem. The code to prevent that from
> happening should be 100% in trace.c.

What needs to be done, and feel free to add this as a separate patch,
is to have checks where snapshot is used.

  (All errors return -EBUSY)

Before allowing mapping, check to see if:

 1) the current tracer has "use_max_tr" set.
 2) any event has a "snapshot" trigger set
 3) any function has a "snapshot" command set

Fail if any of the above is true.

Also in reverse, if the buffer is mapped, then fail:

 1) a tracer being set that has "use_max_tr" set.
 2) a "snapshot" command being set on a function
 3) a "snapshot" trigger being set on an event.

For the last two, we may be able to get away with just the check below,
which relies on a new tr->flags bit. We could also add a tr->snapshot
count to keep track of everything that is using a snapshot, and if that
count is non-zero, mapping fails.

diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
index 2a7c6fd934e9..f534f74ae80f 100644
--- a/kernel/trace/trace.c
+++ b/kernel/trace/trace.c
@@ -1175,6 +1175,12 @@ static void tracing_snapshot_instance_cond(struct trace_array *tr,
 		return;
 	}
 
+	if (tr->flags & TRACE_ARRAY_FL_MAPPED) {
+		trace_array_puts(tr, "*** BUFFER IS MEMORY MAPPED ***\n");
+		trace_array_puts(tr, "*** Can not use snapshot (sorry) ***\n");
+		return;
+	}
+
 	local_irq_save(flags);
 	update_max_tr(tr, current, smp_processor_id(), cond_data);
 	local_irq_restore(flags);


-- Steve
  
Vincent Donnefort Jan. 15, 2024, 5:29 p.m. UTC | #10
On Mon, Jan 15, 2024 at 11:23:59AM -0500, Steven Rostedt wrote:
> On Mon, 15 Jan 2024 11:09:38 -0500
> Steven Rostedt <rostedt@goodmis.org> wrote:
> 
> > No. The ring buffer logic should not care if the user of it is swapping
> > the entire ring buffer or not. It only cares if parts of the ring
> > buffer is being swapped or not. That's not the level of scope it should
> > care about. If we do not want a swap to happen in update_max_tr()
> > that's not ring_buffer.c's problem. The code to prevent that from
> > happening should be 100% in trace.c.
> 
> What needs to be done, and feel free to add this as a separate patch,
> is to have checks where snapshot is used.
> 
>   (All errors return -EBUSY)
> 
> Before allowing mapping, check to see if:
> 
>  1) the current tracer has "use_max_tr" set.
>  2) any event has a "snapshot" trigger set
>  3) Any function has a "snapshot" command set

Could we sum this up with a single check on allocate_snapshot? If it is
allocated, it's probably because we'll be using it?

That would simply add the requirement to echo 0 > snapshot before starting the
memory map?

The opposite could be to let tracing_alloc_snapshot_instance() fail whenever a
mapping is in place?

> 
> Fail if any of the above is true.
> 
> Also in reverse, if the buffer is mapped, then fail:
> 
>  1) a tracer being set that has "use_max_tr" set.
>  2) a "snapshot" command being set on a function
>  3) a "snapshot" trigger being set on an event.
> 
> For the last two, we may be able to get away with just a below as well.
> Adding the tr->flags bit. We could also add a tr->snapshot count to
> keep track of everything that is using a snapshot, and if that count is
> non-zero, mapping fails.
> 
> diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
> index 2a7c6fd934e9..f534f74ae80f 100644
> --- a/kernel/trace/trace.c
> +++ b/kernel/trace/trace.c
> @@ -1175,6 +1175,12 @@ static void tracing_snapshot_instance_cond(struct trace_array *tr,
>  		return;
>  	}
>  
> +	if (tr->flags & TRACE_ARRAY_FL_MAPPED) {
> +		trace_array_puts(tr, "*** BUFFER IS MEMORY MAPPED ***\n");
> +		trace_array_puts(tr, "*** Can not use snapshot (sorry) ***\n");
> +		return;
> +	}
> +
>  	local_irq_save(flags);
>  	update_max_tr(tr, current, smp_processor_id(), cond_data);
>  	local_irq_restore(flags);
> 
> 
> -- Steve
> 
  
Steven Rostedt Jan. 15, 2024, 6:03 p.m. UTC | #11
On Mon, 15 Jan 2024 17:29:09 +0000
Vincent Donnefort <vdonnefort@google.com> wrote:

> > 
> > What needs to be done, and feel free to add this as a separate patch,
> > is to have checks where snapshot is used.
> > 
> >   (All errors return -EBUSY)
> > 
> > Before allowing mapping, check to see if:
> > 
> >  1) the current tracer has "use_max_tr" set.
> >  2) any event has a "snapshot" trigger set
> >  3) Any function has a "snapshot" command set  
> 
> Could we sum-up this with a single check to allocate_snapshot? If that is
> allocated it's probably because we'll be using it?

Not always. It can be allocated at any time and never used.

I'd like to keep the allocation of the snapshot buffer independent of
whether the buffer is mapped, especially since it can be allocated at
boot up and never used. Several of my boxes have "alloc_snapshot" on
the command line even though I don't always use it. But we could update
the output of reading the "snapshot" file to:

 ~# cat /sys/kernel/tracing/snapshot 
# tracer: nop
#
#
# ** Snapshot disabled due to the buffer being memory mapped **
#
# * Snapshot is freed *
#
# Snapshot commands:
# echo 0 > snapshot : Clears and frees snapshot buffer
# echo 1 > snapshot : Allocates snapshot buffer, if not already allocated.
#                      Takes a snapshot of the main buffer.
# echo 2 > snapshot : Clears snapshot buffer (but does not allocate or free)
#                      (Doesn't have to be '2' works with any number that
#                       is not a '0' or '1')

If it is allocated:

 ~# cat /sys/kernel/tracing/snapshot
# tracer: nop
#
# ** Snapshot disabled due to the buffer being memory mapped **
#
# * Snapshot is allocated *
#
# Snapshot commands:
# echo 0 > snapshot : Clears and frees snapshot buffer
# echo 1 > snapshot : Allocates snapshot buffer, if not already allocated.
#                      Takes a snapshot of the main buffer.
# echo 2 > snapshot : Clears snapshot buffer (but does not allocate or free)
#                      (Doesn't have to be '2' works with any number that
#                       is not a '0' or '1')


> 
> That would simply add the requirement to echo 0 > snapshot before starting the
> memory map?

I'd rather not depend on that. It just makes it harder to work out why
mapping failed. If you get -EBUSY, it will be hard to know why.

> 
> The opposite could be to let tracing_alloc_snapshot_instance() fail whenever a
> mapping is in place?

Yes, that would fail if mapping is in place.

Because the snapshot being allocated can be something people want, as
it allows the snapshot to happen immediately when needed, I don't want
the fact that it is allocated to prevent mapping, especially as mapping
may only happen for a short period of time while an application is
running.

-- Steve
  
Masami Hiramatsu (Google) Jan. 15, 2024, 11:48 p.m. UTC | #12
On Mon, 15 Jan 2024 11:23:59 -0500
Steven Rostedt <rostedt@goodmis.org> wrote:

> On Mon, 15 Jan 2024 11:09:38 -0500
> Steven Rostedt <rostedt@goodmis.org> wrote:
> 
> > No. The ring buffer logic should not care if the user of it is swapping
> > the entire ring buffer or not. It only cares if parts of the ring
> > buffer is being swapped or not. That's not the level of scope it should
> > care about. If we do not want a swap to happen in update_max_tr()
> > that's not ring_buffer.c's problem. The code to prevent that from
> > happening should be 100% in trace.c.
> 
> What needs to be done, and feel free to add this as a separate patch,
> is to have checks where snapshot is used.
> 
>   (All errors return -EBUSY)
> 
> Before allowing mapping, check to see if:
> 
>  1) the current tracer has "use_max_tr" set.
>  2) any event has a "snapshot" trigger set
>  3) Any function has a "snapshot" command set
> 
> Fail if any of the above is true.
> 
> Also in reverse, if the buffer is mapped, then fail:
> 
>  1) a tracer being set that has "use_max_tr" set.
>  2) a "snapshot" command being set on a function
>  3) a "snapshot" trigger being set on an event.
> 
> For the last two, we may be able to get away with just a below as well.
> Adding the tr->flags bit. We could also add a tr->snapshot count to
> keep track of everything that is using a snapshot, and if that count is
> non-zero, mapping fails.

BTW, if we allow mapping per-CPU ring buffers, we may need a "tr->mapped"
counter in addition to the per-CPU mapped bit. Then we can just check
tr->snapshot when mapping, and tr->mapped when preparing a snapshot.

Thank you,
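
A rough sketch of that accounting, assuming a tr->snapshot count as Steven suggested and a tr->mapped count next to it. struct tr_sketch and the tr_*() helpers are hypothetical names for this illustration, -EBUSY follows the "all errors return -EBUSY" rule from the thread, and real code would have to serialize both checks with a lock.

```c
#include <errno.h>

/* Hypothetical reduction of struct trace_array to the two counters. */
struct tr_sketch {
	unsigned int snapshot;	/* snapshot users: tracer, triggers, commands */
	unsigned int mapped;	/* per-CPU buffers mapped by user space */
};

/* Called when user space maps a per-CPU buffer. */
static int tr_map_get(struct tr_sketch *tr)
{
	if (tr->snapshot)
		return -EBUSY;

	tr->mapped++;
	return 0;
}

static void tr_map_put(struct tr_sketch *tr)
{
	if (tr->mapped)
		tr->mapped--;
}

/* Called when a tracer, trigger or command starts using the snapshot. */
static int tr_snapshot_get(struct tr_sketch *tr)
{
	if (tr->mapped)
		return -EBUSY;

	tr->snapshot++;
	return 0;
}

static void tr_snapshot_put(struct tr_sketch *tr)
{
	if (tr->snapshot)
		tr->snapshot--;
}
```

Whichever side takes its reference first wins, and the other side keeps failing until the last reference is dropped. Allocating the snapshot buffer does not touch either counter, which matches the wish to keep allocation independent of mapping.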

> 
> diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
> index 2a7c6fd934e9..f534f74ae80f 100644
> --- a/kernel/trace/trace.c
> +++ b/kernel/trace/trace.c
> @@ -1175,6 +1175,12 @@ static void tracing_snapshot_instance_cond(struct trace_array *tr,
>  		return;
>  	}
>  
> +	if (tr->flags & TRACE_ARRAY_FL_MAPPED) {
> +		trace_array_puts(tr, "*** BUFFER IS MEMORY MAPPED ***\n");
> +		trace_array_puts(tr, "*** Can not use snapshot (sorry) ***\n");
> +		return;
> +	}
> +
>  	local_irq_save(flags);
>  	update_max_tr(tr, current, smp_processor_id(), cond_data);
>  	local_irq_restore(flags);
> 
> 
> -- Steve
  

Patch

diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
index fa802db216f9..0841ba8bab14 100644
--- a/include/linux/ring_buffer.h
+++ b/include/linux/ring_buffer.h
@@ -6,6 +6,8 @@ 
 #include <linux/seq_file.h>
 #include <linux/poll.h>
 
+#include <uapi/linux/trace_mmap.h>
+
 struct trace_buffer;
 struct ring_buffer_iter;
 
@@ -221,4 +223,9 @@  int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node);
 #define trace_rb_cpu_prepare	NULL
 #endif
 
+int ring_buffer_map(struct trace_buffer *buffer, int cpu);
+int ring_buffer_unmap(struct trace_buffer *buffer, int cpu);
+struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
+				   unsigned long pgoff);
+int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu);
 #endif /* _LINUX_RING_BUFFER_H */
diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h
new file mode 100644
index 000000000000..bde39a73ce65
--- /dev/null
+++ b/include/uapi/linux/trace_mmap.h
@@ -0,0 +1,45 @@ 
+/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
+#ifndef _TRACE_MMAP_H_
+#define _TRACE_MMAP_H_
+
+#include <linux/types.h>
+
+/**
+ * struct trace_buffer_meta - Ring-buffer Meta-page description
+ * @entries:		Number of entries in the ring-buffer.
+ * @overrun:		Number of entries lost in the ring-buffer.
+ * @read:		Number of entries that have been read.
+ * @subbufs_touched:	Number of subbufs that have been filled.
+ * @subbufs_lost:	Number of subbufs lost to overrun.
+ * @subbufs_read:	Number of subbufs that have been read.
+ * @reader.lost_events:	Number of events lost at the time of the reader swap.
+ * @reader.id:		subbuf ID of the current reader. From 0 to @nr_subbufs - 1
+ * @reader.read:	Number of bytes read on the reader subbuf.
+ * @subbuf_size:	Size of each subbuf, including the header.
+ * @nr_subbufs:		Number of subbufs in the ring-buffer.
+ * @meta_page_size:	Size of this meta-page.
+ * @meta_struct_len:	Size of this structure.
+ */
+struct trace_buffer_meta {
+	unsigned long	entries;
+	unsigned long	overrun;
+	unsigned long	read;
+
+	unsigned long	subbufs_touched;
+	unsigned long	subbufs_lost;
+	unsigned long	subbufs_read;
+
+	struct {
+		unsigned long	lost_events;
+		__u32		id;
+		__u32		read;
+	} reader;
+
+	__u32		subbuf_size;
+	__u32		nr_subbufs;
+
+	__u32		meta_page_size;
+	__u32		meta_struct_len;
+};
+
+#endif /* _TRACE_MMAP_H_ */
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index db73e326fa04..e9ff1c95e896 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -338,6 +338,7 @@  struct buffer_page {
 	local_t		 entries;	/* entries on this page */
 	unsigned long	 real_end;	/* real end of data */
 	unsigned	 order;		/* order of the page */
+	u32		 id;		/* ID for external mapping */
 	struct buffer_data_page *page;	/* Actual data page */
 };
 
@@ -484,6 +485,12 @@  struct ring_buffer_per_cpu {
 	u64				read_stamp;
 	/* pages removed since last reset */
 	unsigned long			pages_removed;
+
+	int				mapped;
+	struct mutex			mapping_lock;
+	unsigned long			*subbuf_ids;	/* ID to addr */
+	struct trace_buffer_meta	*meta_page;
+
 	/* ring buffer pages to update, > 0 to add, < 0 to remove */
 	long				nr_pages_to_update;
 	struct list_head		new_pages; /* new pages to add */
@@ -1542,6 +1549,7 @@  rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
 	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
 	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
 	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
+	mutex_init(&cpu_buffer->mapping_lock);
 
 	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
 			    GFP_KERNEL, cpu_to_node(cpu));
@@ -5160,6 +5168,23 @@  static void rb_clear_buffer_page(struct buffer_page *page)
 	page->read = 0;
 }
 
+static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	struct trace_buffer_meta *meta = cpu_buffer->meta_page;
+
+	WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries));
+	WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun));
+	WRITE_ONCE(meta->read, cpu_buffer->read);
+
+	WRITE_ONCE(meta->subbufs_touched, local_read(&cpu_buffer->pages_touched));
+	WRITE_ONCE(meta->subbufs_lost, local_read(&cpu_buffer->pages_lost));
+	WRITE_ONCE(meta->subbufs_read, local_read(&cpu_buffer->pages_read));
+
+	WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read);
+	WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id);
+	WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events);
+}
+
 static void
 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
 {
@@ -5204,6 +5229,9 @@  rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
 	cpu_buffer->lost_events = 0;
 	cpu_buffer->last_overrun = 0;
 
+	if (cpu_buffer->mapped)
+		rb_update_meta_page(cpu_buffer);
+
 	rb_head_page_activate(cpu_buffer);
 	cpu_buffer->pages_removed = 0;
 }
@@ -5418,6 +5446,11 @@  int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
 	cpu_buffer_a = buffer_a->buffers[cpu];
 	cpu_buffer_b = buffer_b->buffers[cpu];
 
+	if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) {
+		ret = -EBUSY;
+		goto out;
+	}
+
 	/* At least make sure the two buffers are somewhat the same */
 	if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
 		goto out;
@@ -5682,7 +5715,8 @@  int ring_buffer_read_page(struct trace_buffer *buffer,
 	 * Otherwise, we can simply swap the page with the one passed in.
 	 */
 	if (read || (len < (commit - read)) ||
-	    cpu_buffer->reader_page == cpu_buffer->commit_page) {
+	    cpu_buffer->reader_page == cpu_buffer->commit_page ||
+	    cpu_buffer->mapped) {
 		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
 		unsigned int rpos = read;
 		unsigned int pos = 0;
@@ -5901,6 +5935,11 @@  int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
 
 		cpu_buffer = buffer->buffers[cpu];
 
+		if (cpu_buffer->mapped) {
+			err = -EBUSY;
+			goto error;
+		}
+
 		/* Update the number of pages to match the new size */
 		nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
 		nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);
@@ -6002,6 +6041,295 @@  int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
 }
 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set);
 
+#define subbuf_page(off, start) \
+	virt_to_page((void *)(start + (off << PAGE_SHIFT)))
+
+#define foreach_subbuf_page(sub_order, start, page)		\
+	page = subbuf_page(0, (start));				\
+	for (int __off = 0; __off < (1 << (sub_order));		\
+	     __off++, page = subbuf_page(__off, (start)))
+
+static inline void subbuf_map_prepare(unsigned long subbuf_start, int order)
+{
+	struct page *page;
+
+	/*
+	 * When allocating order > 0 pages, only the first struct page has a
+	 * refcount > 1. Increasing the refcount here ensures none of the struct
+	 * pages composing the sub-buffer is freed when the mapping is closed.
+	 */
+	foreach_subbuf_page(order, subbuf_start, page)
+		page_ref_inc(page);
+}
+
+static inline void subbuf_unmap(unsigned long subbuf_start, int order)
+{
+	struct page *page;
+
+	foreach_subbuf_page(order, subbuf_start, page) {
+		page_ref_dec(page);
+		page->mapping = NULL;
+	}
+}
+
+static void rb_free_subbuf_ids(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	int sub_id;
+
+	for (sub_id = 0; sub_id < cpu_buffer->nr_pages + 1; sub_id++)
+		subbuf_unmap(cpu_buffer->subbuf_ids[sub_id],
+			     cpu_buffer->buffer->subbuf_order);
+
+	kfree(cpu_buffer->subbuf_ids);
+	cpu_buffer->subbuf_ids = NULL;
+}
+
+static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	if (cpu_buffer->meta_page)
+		return 0;
+
+	cpu_buffer->meta_page = page_to_virt(alloc_page(GFP_USER | __GFP_ZERO));
+	if (!cpu_buffer->meta_page)
+		return -ENOMEM;
+
+	return 0;
+}
+
+static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	unsigned long addr = (unsigned long)cpu_buffer->meta_page;
+
+	virt_to_page((void *)addr)->mapping = NULL;
+	free_page(addr);
+	cpu_buffer->meta_page = NULL;
+}
+
+static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
+				   unsigned long *subbuf_ids)
+{
+	struct trace_buffer_meta *meta = cpu_buffer->meta_page;
+	unsigned int nr_subbufs = cpu_buffer->nr_pages + 1;
+	struct buffer_page *first_subbuf, *subbuf;
+	int id = 0;
+
+	subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
+	subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order);
+	cpu_buffer->reader_page->id = id++;
+
+	first_subbuf = subbuf = rb_set_head_page(cpu_buffer);
+	do {
+		if (id >= nr_subbufs) {
+			WARN_ON(1);
+			break;
+		}
+
+		subbuf_ids[id] = (unsigned long)subbuf->page;
+		subbuf->id = id;
+		subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order);
+
+		rb_inc_page(&subbuf);
+		id++;
+	} while (subbuf != first_subbuf);
+
+	/* install subbuf ID to kern VA translation */
+	cpu_buffer->subbuf_ids = subbuf_ids;
+
+	meta->meta_page_size = PAGE_SIZE;
+	meta->meta_struct_len = sizeof(*meta);
+	meta->nr_subbufs = nr_subbufs;
+	meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
+
+	rb_update_meta_page(cpu_buffer);
+}
+
+static inline struct ring_buffer_per_cpu *
+rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return ERR_PTR(-EINVAL);
+
+	cpu_buffer = buffer->buffers[cpu];
+
+	mutex_lock(&cpu_buffer->mapping_lock);
+
+	if (!cpu_buffer->mapped) {
+		mutex_unlock(&cpu_buffer->mapping_lock);
+		return ERR_PTR(-ENODEV);
+	}
+
+	return cpu_buffer;
+}
+
+static inline void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	mutex_unlock(&cpu_buffer->mapping_lock);
+}
+
+int ring_buffer_map(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	unsigned long flags, *subbuf_ids;
+	int err = 0;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return -EINVAL;
+
+	cpu_buffer = buffer->buffers[cpu];
+
+	mutex_lock(&cpu_buffer->mapping_lock);
+
+	if (cpu_buffer->mapped) {
+		if (cpu_buffer->mapped == INT_MAX)
+			err = -EBUSY;
+		else
+			WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1);
+		mutex_unlock(&cpu_buffer->mapping_lock);
+		return err;
+	}
+
+	/* prevent another thread from changing buffer sizes */
+	mutex_lock(&buffer->mutex);
+
+	err = rb_alloc_meta_page(cpu_buffer);
+	if (err)
+		goto unlock;
+
+	/* subbuf_ids include the reader while nr_pages does not */
+	subbuf_ids = kzalloc(sizeof(*subbuf_ids) * (cpu_buffer->nr_pages + 1),
+			   GFP_KERNEL);
+	if (!subbuf_ids) {
+		rb_free_meta_page(cpu_buffer);
+		err = -ENOMEM;
+		goto unlock;
+	}
+
+	atomic_inc(&cpu_buffer->resize_disabled);
+
+	/*
+	 * Lock all readers to block any subbuf swap until the subbuf IDs are
+	 * assigned.
+	 */
+	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+
+	rb_setup_ids_meta_page(cpu_buffer, subbuf_ids);
+
+	WRITE_ONCE(cpu_buffer->mapped, 1);
+
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+unlock:
+	mutex_unlock(&buffer->mutex);
+	mutex_unlock(&cpu_buffer->mapping_lock);
+
+	return err;
+}
+
+int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	int err = 0;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return -EINVAL;
+
+	cpu_buffer = buffer->buffers[cpu];
+
+	mutex_lock(&cpu_buffer->mapping_lock);
+
+	if (!cpu_buffer->mapped) {
+		err = -ENODEV;
+		goto unlock;
+	}
+
+	WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1);
+	if (!cpu_buffer->mapped) {
+		/* Wait for the writer and readers to observe !mapped */
+		synchronize_rcu();
+
+		rb_free_subbuf_ids(cpu_buffer);
+		rb_free_meta_page(cpu_buffer);
+		atomic_dec(&cpu_buffer->resize_disabled);
+	}
+unlock:
+	mutex_unlock(&cpu_buffer->mapping_lock);
+
+	return err;
+}
+
+/*
+ *   +--------------+  pgoff == 0
+ *   |   meta page  |
+ *   +--------------+  pgoff == 1
+ *   | subbuffer 0  |
+ *   +--------------+  pgoff == 1 + (1 << subbuf_order)
+ *   | subbuffer 1  |
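+ *         ...
+ *
+ * Each subbuffer spans (1 << subbuf_order) pages. E.g. with
+ * subbuf_order == 1, pgoff 1-2 map subbuffer 0 and pgoff 3-4 map subbuffer 1.
+ */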
+struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
+				   unsigned long pgoff)
+{
+	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
+	unsigned long subbuf_id, subbuf_offset, addr;
+	struct page *page;
+
+	if (!pgoff)
+		return virt_to_page((void *)cpu_buffer->meta_page);
+
+	pgoff--;
+
+	subbuf_id = pgoff >> buffer->subbuf_order;
+	if (subbuf_id > cpu_buffer->nr_pages)
+		return NULL;
+
+	subbuf_offset = pgoff & ((1UL << buffer->subbuf_order) - 1);
+	addr = cpu_buffer->subbuf_ids[subbuf_id] + (subbuf_offset * PAGE_SIZE);
+	page = virt_to_page((void *)addr);
+
+	return page;
+}
+
+int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	unsigned long reader_size;
+	unsigned long flags;
+
+	cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
+	if (IS_ERR(cpu_buffer))
+		return (int)PTR_ERR(cpu_buffer);
+
+	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+consume:
+	if (rb_per_cpu_empty(cpu_buffer))
+		goto out;
+
+	reader_size = rb_page_size(cpu_buffer->reader_page);
+
+	/*
+	 * There is data to read on the current reader page, so we can return
+	 * to the caller. Before that, assume the caller will read everything
+	 * and update the kernel reader accordingly.
+	 */
+	if (cpu_buffer->reader_page->read < reader_size) {
+		while (cpu_buffer->reader_page->read < reader_size)
+			rb_advance_reader(cpu_buffer);
+		goto out;
+	}
+
+	if (WARN_ON(!rb_get_reader_page(cpu_buffer)))
+		goto out;
+
+	goto consume;
+out:
+	rb_update_meta_page(cpu_buffer);
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+	rb_put_mapped_buffer(cpu_buffer);
+
+	return 0;
+}
+
 /*
  * We only allocate new buffers, never free them if the CPU goes down.
  * If we were to free the buffer, then the user would lose any trace that was in