[v2,1/2] ring-buffer: Introducing ring-buffer mapping functions

Message ID 20230322102244.3239740-2-vdonnefort@google.com
State New
Headers
Series Introducing trace buffer mapping by user-space |

Commit Message

Vincent Donnefort March 22, 2023, 10:22 a.m. UTC
  In preparation for allowing the user-space to map a ring-buffer, add
a set of mapping functions:

  ring_buffer_{map,unmap}()
  ring_buffer_map_fault()

And controls on the ring-buffer:

  ring_buffer_get_reader_page()  /* swap reader and head */
  ring_buffer_update_meta_page()

Mapping the ring-buffer also involves:

  A unique ID for each page of the ring-buffer, as currently the pages
  are only identified through their in-kernel VA.

  A meta-page, where are stored statistics about the ring-buffer and
  a page IDs list, ordered. A field gives what page is the reader
  one and one to gives where the ring-buffer starts in the list of data
  pages.

The linear mapping exposes the meta-page, and each page of the
ring-buffer, ordered following their unique ID, assigned during the
first mapping.

Once mapped, no page can get in or out of the ring-buffer: the buffer
size will remain unmodified and the splice enabling functions will in
reality simply memcpy the data instead of swapping pages.

Signed-off-by: Vincent Donnefort <vdonnefort@google.com>
  

Comments

Steven Rostedt March 29, 2023, 2:44 a.m. UTC | #1
On Wed, 22 Mar 2023 10:22:43 +0000
Vincent Donnefort <vdonnefort@google.com> wrote:

> +#include <linux/types.h>
> +
> +struct ring_buffer_meta_page_header {
> +#if __BITS_PER_LONG == 64
> +	__u64	entries;
> +	__u64	overrun;
> +#else
> +	__u32	entries;
> +	__u32	overrun;
> +#endif
> +	__u32	pages_touched;
> +	__u32	meta_page_size;
> +	__u32	reader_page;	/* page ID for the reader page */
> +	__u32	nr_data_pages;	/* doesn't take into account the reader_page */
> +	__u32	data_page_head;	/* ring-buffer head as an offset from data_start */
> +	__u32	data_start;	/* offset within the meta page */
> +};
> +

I've been playing with this a bit, and I'm thinking, do we need the
data_pages[] array on the meta page?

I noticed that I'm not even using it.

Currently, we need to do a ioctl every time we finish with the reader page,
and that updates the reader_page in the meta data to point to the next page
to read. When do we need to look at the data_start section?

-- Steve
  
Vincent Donnefort March 29, 2023, 9:19 a.m. UTC | #2
On Tue, Mar 28, 2023 at 10:44:11PM -0400, Steven Rostedt wrote:
> On Wed, 22 Mar 2023 10:22:43 +0000
> Vincent Donnefort <vdonnefort@google.com> wrote:
> 
> > +#include <linux/types.h>
> > +
> > +struct ring_buffer_meta_page_header {
> > +#if __BITS_PER_LONG == 64
> > +	__u64	entries;
> > +	__u64	overrun;
> > +#else
> > +	__u32	entries;
> > +	__u32	overrun;
> > +#endif
> > +	__u32	pages_touched;
> > +	__u32	meta_page_size;
> > +	__u32	reader_page;	/* page ID for the reader page */
> > +	__u32	nr_data_pages;	/* doesn't take into account the reader_page */
> > +	__u32	data_page_head;	/* ring-buffer head as an offset from data_start */
> > +	__u32	data_start;	/* offset within the meta page */
> > +};
> > +
> 
> I've been playing with this a bit, and I'm thinking, do we need the
> data_pages[] array on the meta page?
> 
> I noticed that I'm not even using it.
> 
> Currently, we need to do a ioctl every time we finish with the reader page,
> and that updates the reader_page in the meta data to point to the next page
> to read. When do we need to look at the data_start section?

This is for non-consuming read, to get all the pages in order.

If we remove this section we would lose this ability ... but we'd also simplify
the code by a good order of magnitude (don't need the update ioctl anymore, no
need to keep those pages in order and everything can fit a 0-order meta-page).
And the non-consuming read doesn't bring much to the user over the pipe version.

This will although impact our hypervisor tracing which will only be able to
expose trace_pipe interfaces. But I don't think it is a problem, all userspace
tools only relying on consuming read anyway.

So if you're happy dropping this support, let's get rid of it.
  
Steven Rostedt March 29, 2023, 11:03 a.m. UTC | #3
On Wed, 29 Mar 2023 10:19:44 +0100
Vincent Donnefort <vdonnefort@google.com> wrote:

> > I've been playing with this a bit, and I'm thinking, do we need the
> > data_pages[] array on the meta page?
> > 
> > I noticed that I'm not even using it.
> > 
> > Currently, we need to do a ioctl every time we finish with the reader page,
> > and that updates the reader_page in the meta data to point to the next page
> > to read. When do we need to look at the data_start section?  
> 
> This is for non-consuming read, to get all the pages in order.

Yeah, I was trying to see how a non consuming read would work, and was
having issues figuring that out without the tail page being updated.

> 
> If we remove this section we would lose this ability ... but we'd also simplify
> the code by a good order of magnitude (don't need the update ioctl anymore, no
> need to keep those pages in order and everything can fit a 0-order meta-page).
> And the non-consuming read doesn't bring much to the user over the pipe version.
> 
> This will although impact our hypervisor tracing which will only be able to
> expose trace_pipe interfaces. But I don't think it is a problem, all userspace
> tools only relying on consuming read anyway.
> 
> So if you're happy dropping this support, let's get rid of it.

I don't really want to get rid of it, but perhaps break it up where we
don't have it in the first release, but add it in a second one. That will
also make sure that we can expand the API if necessary (one reason I wanted
the "data_start" in the first place).

Let's drop it for now, but be able to add it later, an have the current
structure be:

struct ring_buffer_meta_page_header {
#if __BITS_PER_LONG == 64
	__u64	entries;
	__u64	overrun;
#else
	__u32	entries;
	__u32	overrun;
#endif
	__u32	pages_touched;
	__u32	meta_page_size;
	__u32	reader_page;	/* page ID for the reader page */
	__u32	nr_data_pages;	/* doesn't take into account the reader_page */
};

BTW, shouldn't the nr_data_pages take into account the reader page? As it
is part of the array we traverse isn't it?

-- Steve
  
Steven Rostedt March 29, 2023, 12:07 p.m. UTC | #4
On Wed, 29 Mar 2023 07:03:53 -0400
Steven Rostedt <rostedt@goodmis.org> wrote:

> struct ring_buffer_meta_page_header {
> #if __BITS_PER_LONG == 64
> 	__u64	entries;
> 	__u64	overrun;
> #else
> 	__u32	entries;
> 	__u32	overrun;
> #endif
> 	__u32	pages_touched;
> 	__u32	meta_page_size;
> 	__u32	reader_page;	/* page ID for the reader page */
> 	__u32	nr_data_pages;	/* doesn't take into account the reader_page */
> };
> 
> BTW, shouldn't the nr_data_pages take into account the reader page? As it
> is part of the array we traverse isn't it?

Ah, I guess nr_data_pages is the length of the index mapping, not the
array of pages mapped?

-- Steve
  
Vincent Donnefort March 29, 2023, 12:23 p.m. UTC | #5
On Wed, Mar 29, 2023 at 07:03:53AM -0400, Steven Rostedt wrote:
> On Wed, 29 Mar 2023 10:19:44 +0100
> Vincent Donnefort <vdonnefort@google.com> wrote:
> 
> > > I've been playing with this a bit, and I'm thinking, do we need the
> > > data_pages[] array on the meta page?
> > > 
> > > I noticed that I'm not even using it.
> > > 
> > > Currently, we need to do a ioctl every time we finish with the reader page,
> > > and that updates the reader_page in the meta data to point to the next page
> > > to read. When do we need to look at the data_start section?  
> > 
> > This is for non-consuming read, to get all the pages in order.
> 
> Yeah, I was trying to see how a non consuming read would work, and was
> having issues figuring that out without the tail page being updated.

Would the userspace really need to know where is the tail page? It can just stop
whenever it finds out a page doesn't have any events, and make sure it does not
loop once back to the head?

> 
> > 
> > If we remove this section we would lose this ability ... but we'd also simplify
> > the code by a good order of magnitude (don't need the update ioctl anymore, no
> > need to keep those pages in order and everything can fit a 0-order meta-page).
> > And the non-consuming read doesn't bring much to the user over the pipe version.
> > 
> > This will although impact our hypervisor tracing which will only be able to
> > expose trace_pipe interfaces. But I don't think it is a problem, all userspace
> > tools only relying on consuming read anyway.
> > 
> > So if you're happy dropping this support, let's get rid of it.
> 
> I don't really want to get rid of it, but perhaps break it up where we
> don't have it in the first release, but add it in a second one. That will
> also make sure that we can expand the API if necessary (one reason I wanted
> the "data_start" in the first place).
> 
> Let's drop it for now, but be able to add it later, an have the current
> structure be:

Ok, I will prepare a V3 accordingly.

> 
> struct ring_buffer_meta_page_header {
> #if __BITS_PER_LONG == 64
> 	__u64	entries;
> 	__u64	overrun;
> #else
> 	__u32	entries;
> 	__u32	overrun;
> #endif
> 	__u32	pages_touched;
> 	__u32	meta_page_size;
> 	__u32	reader_page;	/* page ID for the reader page */
> 	__u32	nr_data_pages;	/* doesn't take into account the reader_page */
> };
> 
> BTW, shouldn't the nr_data_pages take into account the reader page? As it
> is part of the array we traverse isn't it?

It depends if the reader page has ever been swapped out. If yes, the reader
would have to start from reader_page and then switch to the data_pages.
Which sounds like a fiddly interface for the userspace.

So yeah, consuming-read only feels like a better start.

> 
> -- Steve
  
Vincent Donnefort March 29, 2023, 12:27 p.m. UTC | #6
On Wed, Mar 29, 2023 at 08:07:58AM -0400, Steven Rostedt wrote:
> On Wed, 29 Mar 2023 07:03:53 -0400
> Steven Rostedt <rostedt@goodmis.org> wrote:
> 
> > struct ring_buffer_meta_page_header {
> > #if __BITS_PER_LONG == 64
> > 	__u64	entries;
> > 	__u64	overrun;
> > #else
> > 	__u32	entries;
> > 	__u32	overrun;
> > #endif
> > 	__u32	pages_touched;
> > 	__u32	meta_page_size;
> > 	__u32	reader_page;	/* page ID for the reader page */
> > 	__u32	nr_data_pages;	/* doesn't take into account the reader_page */
> > };
> > 
> > BTW, shouldn't the nr_data_pages take into account the reader page? As it
> > is part of the array we traverse isn't it?
> 
> Ah, I guess nr_data_pages is the length of the index mapping, not the
> array of pages mapped?

Yes correct, data_pages[nr_data_pages] and the reader_page being excluded...
which might not be the easiest interface, as the size of the buffer to read
depends on if the reader_page has data to be read or not.

> 
> -- Steve
  
Steven Rostedt March 29, 2023, 12:47 p.m. UTC | #7
On Wed, 29 Mar 2023 13:23:01 +0100
Vincent Donnefort <vdonnefort@google.com> wrote:

> On Wed, Mar 29, 2023 at 07:03:53AM -0400, Steven Rostedt wrote:
> > On Wed, 29 Mar 2023 10:19:44 +0100
> > Vincent Donnefort <vdonnefort@google.com> wrote:
> >   
> > > > I've been playing with this a bit, and I'm thinking, do we need the
> > > > data_pages[] array on the meta page?
> > > > 
> > > > I noticed that I'm not even using it.
> > > > 
> > > > Currently, we need to do a ioctl every time we finish with the reader page,
> > > > and that updates the reader_page in the meta data to point to the next page
> > > > to read. When do we need to look at the data_start section?    
> > > 
> > > This is for non-consuming read, to get all the pages in order.  
> > 
> > Yeah, I was trying to see how a non consuming read would work, and was
> > having issues figuring that out without the tail page being updated.  
> 
> Would the userspace really need to know where is the tail page? It can just stop
> whenever it finds out a page doesn't have any events, and make sure it does not
> loop once back to the head?

I'm trying to come up with a possible algorithm that doesn't need
ioctls. It would need to know if the writer moved or not. Probably need
a counter that gets incremented every time the writer goes to a new page.

Having the tail page was just a convenient way to know where the end is.

> 
> >   
> > > 
> > > If we remove this section we would lose this ability ... but we'd also simplify
> > > the code by a good order of magnitude (don't need the update ioctl anymore, no
> > > need to keep those pages in order and everything can fit a 0-order meta-page).
> > > And the non-consuming read doesn't bring much to the user over the pipe version.
> > > 
> > > This will although impact our hypervisor tracing which will only be able to
> > > expose trace_pipe interfaces. But I don't think it is a problem, all userspace
> > > tools only relying on consuming read anyway.
> > > 
> > > So if you're happy dropping this support, let's get rid of it.  
> > 
> > I don't really want to get rid of it, but perhaps break it up where we
> > don't have it in the first release, but add it in a second one. That will
> > also make sure that we can expand the API if necessary (one reason I wanted
> > the "data_start" in the first place).
> > 
> > Let's drop it for now, but be able to add it later, an have the current
> > structure be:  
> 
> Ok, I will prepare a V3 accordingly.
> 
> > 
> > struct ring_buffer_meta_page_header {
> > #if __BITS_PER_LONG == 64
> > 	__u64	entries;
> > 	__u64	overrun;
> > #else
> > 	__u32	entries;
> > 	__u32	overrun;
> > #endif
> > 	__u32	pages_touched;
> > 	__u32	meta_page_size;
> > 	__u32	reader_page;	/* page ID for the reader page */
> > 	__u32	nr_data_pages;	/* doesn't take into account the reader_page */
> > };
> > 
> > BTW, shouldn't the nr_data_pages take into account the reader page? As it
> > is part of the array we traverse isn't it?  
> 
> It depends if the reader page has ever been swapped out. If yes, the reader
> would have to start from reader_page and then switch to the data_pages.
> Which sounds like a fiddly interface for the userspace.
> 
> So yeah, consuming-read only feels like a better start.
> 

I agree. I'd like to get something in that can be extended, but simple
enough that it's not too much of a barrier wrt getting the API correct.

-- Steve
  
Steven Rostedt March 29, 2023, 12:51 p.m. UTC | #8
On Wed, 29 Mar 2023 07:03:53 -0400
Steven Rostedt <rostedt@goodmis.org> wrote:

> struct ring_buffer_meta_page_header {
> #if __BITS_PER_LONG == 64
> 	__u64	entries;
> 	__u64	overrun;
> #else
> 	__u32	entries;
> 	__u32	overrun;
> #endif
> 	__u32	pages_touched;
> 	__u32	meta_page_size;
> 	__u32	reader_page;	/* page ID for the reader page */
> 	__u32	nr_data_pages;	/* doesn't take into account the reader_page */
> };

Oh, I guess we should also expose the amount read on the reader page,
that gets updated on the ioctl. That is, if the first time we read the
reader page and the page is not full and unmap the pages, and then new
events were added to the reader page, we should not re-read the events
that were read previously.

That is, expose cpu_buffer->reader_page->read

-- Steve
  
Vincent Donnefort March 29, 2023, 1:01 p.m. UTC | #9
On Wed, Mar 29, 2023 at 08:51:06AM -0400, Steven Rostedt wrote:
> On Wed, 29 Mar 2023 07:03:53 -0400
> Steven Rostedt <rostedt@goodmis.org> wrote:
> 
> > struct ring_buffer_meta_page_header {
> > #if __BITS_PER_LONG == 64
> > 	__u64	entries;
> > 	__u64	overrun;
> > #else
> > 	__u32	entries;
> > 	__u32	overrun;
> > #endif
> > 	__u32	pages_touched;
> > 	__u32	meta_page_size;
> > 	__u32	reader_page;	/* page ID for the reader page */
> > 	__u32	nr_data_pages;	/* doesn't take into account the reader_page */
> > };
> 
> Oh, I guess we should also expose the amount read on the reader page,
> that gets updated on the ioctl. That is, if the first time we read the
> reader page and the page is not full and unmap the pages, and then new
> events were added to the reader page, we should not re-read the events
> that were read previously.
> 
> That is, expose cpu_buffer->reader_page->read

Couldn't it be an issue of updating cpu_buffer->reader_page->read during the
ioctl? I guess we would write the value of the current written events on that
page, hopping for the userspace reader to read it all.

But then if new events are written, the reader doesn't need the ioctl to read
them, it can just check the meta->entries field or the commit field in the
reader_page header?

So it's much likely cpu_buffer->reader_page->read will go out of sync?

> 
> -- Steve
  
Vincent Donnefort March 29, 2023, 1:10 p.m. UTC | #10
On Wed, Mar 29, 2023 at 08:47:35AM -0400, Steven Rostedt wrote:
> On Wed, 29 Mar 2023 13:23:01 +0100
> Vincent Donnefort <vdonnefort@google.com> wrote:
> 
> > On Wed, Mar 29, 2023 at 07:03:53AM -0400, Steven Rostedt wrote:
> > > On Wed, 29 Mar 2023 10:19:44 +0100
> > > Vincent Donnefort <vdonnefort@google.com> wrote:
> > >   
> > > > > I've been playing with this a bit, and I'm thinking, do we need the
> > > > > data_pages[] array on the meta page?
> > > > > 
> > > > > I noticed that I'm not even using it.
> > > > > 
> > > > > Currently, we need to do a ioctl every time we finish with the reader page,
> > > > > and that updates the reader_page in the meta data to point to the next page
> > > > > to read. When do we need to look at the data_start section?    
> > > > 
> > > > This is for non-consuming read, to get all the pages in order.  
> > > 
> > > Yeah, I was trying to see how a non consuming read would work, and was
> > > having issues figuring that out without the tail page being updated.  
> > 
> > Would the userspace really need to know where is the tail page? It can just stop
> > whenever it finds out a page doesn't have any events, and make sure it does not
> > loop once back to the head?
> 
> I'm trying to come up with a possible algorithm that doesn't need
> ioctls. It would need to know if the writer moved or not. Probably need
> a counter that gets incremented every time the writer goes to a new page.

The v2 of this series only updates the head page in the update ioctl
(ring_buffer_update_meta_page()) Couldn't find a nice way around that as it can
be either updated by the reader or the writer. So the best solution seemed a
call to set_head_page().

[...]
  
Steven Rostedt March 29, 2023, 1:11 p.m. UTC | #11
On Wed, 29 Mar 2023 14:01:01 +0100
Vincent Donnefort <vdonnefort@google.com> wrote:

> > Oh, I guess we should also expose the amount read on the reader page,
> > that gets updated on the ioctl. That is, if the first time we read the
> > reader page and the page is not full and unmap the pages, and then new
> > events were added to the reader page, we should not re-read the events
> > that were read previously.
> > 
> > That is, expose cpu_buffer->reader_page->read  
> 
> Couldn't it be an issue of updating cpu_buffer->reader_page->read during the
> ioctl? I guess we would write the value of the current written events on that
> page, hopping for the userspace reader to read it all.
> 
> But then if new events are written, the reader doesn't need the ioctl to read
> them, it can just check the meta->entries field or the commit field in the
> reader_page header?
> 
> So it's much likely cpu_buffer->reader_page->read will go out of sync?

Here's the issue I found during testing:

write 10 events to ring buffer (all go into the reader page)

Run application that maps the pages, and reads the 10 events, and exits.

Write 10 more events to ring buffer (all are appended to the reader page)

Run application that maps the pages and reads 20 events, and exits.

It read the 10 previous events, but should not have. It should have
only read the last 10 that were not read previously.

-- Steve
  
Steven Rostedt March 29, 2023, 1:14 p.m. UTC | #12
On Wed, 29 Mar 2023 14:10:09 +0100
Vincent Donnefort <vdonnefort@google.com> wrote:

> The v2 of this series only updates the head page in the update ioctl
> (ring_buffer_update_meta_page()) Couldn't find a nice way around that as it can
> be either updated by the reader or the writer. So the best solution seemed a
> call to set_head_page().

Yes, the update will race between readers and writers. Let's not worry
about this at the moment and not expose it yet. I'm looking at other
ways around this too.

-- Steve
  
Vincent Donnefort March 29, 2023, 1:31 p.m. UTC | #13
On Wed, Mar 29, 2023 at 09:11:07AM -0400, Steven Rostedt wrote:
> On Wed, 29 Mar 2023 14:01:01 +0100
> Vincent Donnefort <vdonnefort@google.com> wrote:
> 
> > > Oh, I guess we should also expose the amount read on the reader page,
> > > that gets updated on the ioctl. That is, if the first time we read the
> > > reader page and the page is not full and unmap the pages, and then new
> > > events were added to the reader page, we should not re-read the events
> > > that were read previously.
> > > 
> > > That is, expose cpu_buffer->reader_page->read  
> > 
> > Couldn't it be an issue of updating cpu_buffer->reader_page->read during the
> > ioctl? I guess we would write the value of the current written events on that
> > page, hopping for the userspace reader to read it all.
> > 
> > But then if new events are written, the reader doesn't need the ioctl to read
> > them, it can just check the meta->entries field or the commit field in the
> > reader_page header?
> > 
> > So it's much likely cpu_buffer->reader_page->read will go out of sync?
> 
> Here's the issue I found during testing:
> 
> write 10 events to ring buffer (all go into the reader page)
> 
> Run application that maps the pages, and reads the 10 events, and exits.
> 
> Write 10 more events to ring buffer (all are appended to the reader page)
> 
> Run application that maps the pages and reads 20 events, and exits.
> 
> It read the 10 previous events, but should not have. It should have
> only read the last 10 that were not read previously.

I see.

We can say we update cpu_buffer->reader_page->read on the get_reader_page ioctl,
to the most recent value possible, which will have the consequence of actually
"flushing" those events?

If the reader decides to read events past this value then it just can't expect
them to not be duplicated?

I suppose it'd be down the reader to store meta->read somehwere?

   prev_read = meta->read
   ioctl(fd, TRACE_MMAP_IOCTL_GET_READER_PAGE)
   /* read events from prev_read to meta->read */


> 
> -- Steve
  
Steven Rostedt March 29, 2023, 1:36 p.m. UTC | #14
On Wed, 29 Mar 2023 14:31:07 +0100
Vincent Donnefort <vdonnefort@google.com> wrote:

> We can say we update cpu_buffer->reader_page->read on the get_reader_page ioctl,
> to the most recent value possible, which will have the consequence of actually
> "flushing" those events?

Yes. It should be no different than doing a normal read of the
trace_pipe_raw file, which does the same.

> 
> If the reader decides to read events past this value then it just can't expect
> them to not be duplicated?
> 
> I suppose it'd be down the reader to store meta->read somehwere?
> 
>    prev_read = meta->read
>    ioctl(fd, TRACE_MMAP_IOCTL_GET_READER_PAGE)
>    /* read events from prev_read to meta->read */

Yes, in fact it shouldn't need to call the ioctl until after it read it.

Maybe, we should have the ioctl take a parameter of how much was read?
To prevent races?

That is, it should pass in the page->commit that it used to to read the
pages.

-- Steve
  
Vincent Donnefort March 29, 2023, 1:55 p.m. UTC | #15
On Wed, Mar 29, 2023 at 09:36:46AM -0400, Steven Rostedt wrote:
> On Wed, 29 Mar 2023 14:31:07 +0100
> Vincent Donnefort <vdonnefort@google.com> wrote:
> 
> > We can say we update cpu_buffer->reader_page->read on the get_reader_page ioctl,
> > to the most recent value possible, which will have the consequence of actually
> > "flushing" those events?
> 
> Yes. It should be no different than doing a normal read of the
> trace_pipe_raw file, which does the same.
> 
> > 
> > If the reader decides to read events past this value then it just can't expect
> > them to not be duplicated?
> > 
> > I suppose it'd be down the reader to store meta->read somehwere?
> > 
> >    prev_read = meta->read
> >    ioctl(fd, TRACE_MMAP_IOCTL_GET_READER_PAGE)
> >    /* read events from prev_read to meta->read */
> 
> Yes, in fact it shouldn't need to call the ioctl until after it read it.
> 
> Maybe, we should have the ioctl take a parameter of how much was read?
> To prevent races?

Races would only be with other consuming readers. In that case we'd probably
have many other problems anyway as I suppose nothing would prevent another one
of swapping the page while our userspace reader is still processing it?

I don't know if this is worth splitting the ABI between the meta-page and the
ioctl parameters for this?

Or maybe we should say the meta-page contains things modified by the writer and
parameters modified by the reader are passed by the get_reader_page ioctl i.e.
the reader page ID and cpu_buffer->reader_page->read? (for the hyp tracing, we
have up to 4 registers for the HVC which would replace in our case the ioctl)

> 
> That is, it should pass in the page->commit that it used to to read the
> pages.
> 
> -- Steve
  
Vincent Donnefort March 29, 2023, 3:08 p.m. UTC | #16
On Wed, Mar 29, 2023 at 02:55:41PM +0100, Vincent Donnefort wrote:
> On Wed, Mar 29, 2023 at 09:36:46AM -0400, Steven Rostedt wrote:
> > On Wed, 29 Mar 2023 14:31:07 +0100
> > Vincent Donnefort <vdonnefort@google.com> wrote:
> > 
> > > We can say we update cpu_buffer->reader_page->read on the get_reader_page ioctl,
> > > to the most recent value possible, which will have the consequence of actually
> > > "flushing" those events?
> > 
> > Yes. It should be no different than doing a normal read of the
> > trace_pipe_raw file, which does the same.
> > 
> > > 
> > > If the reader decides to read events past this value then it just can't expect
> > > them to not be duplicated?
> > > 
> > > I suppose it'd be down the reader to store meta->read somehwere?
> > > 
> > >    prev_read = meta->read
> > >    ioctl(fd, TRACE_MMAP_IOCTL_GET_READER_PAGE)
> > >    /* read events from prev_read to meta->read */
> > 
> > Yes, in fact it shouldn't need to call the ioctl until after it read it.
> > 
> > Maybe, we should have the ioctl take a parameter of how much was read?
> > To prevent races?
> 
> Races would only be with other consuming readers. In that case we'd probably
> have many other problems anyway as I suppose nothing would prevent another one
> of swapping the page while our userspace reader is still processing it?
> 
> I don't know if this is worth splitting the ABI between the meta-page and the
> ioctl parameters for this?
> 
> Or maybe we should say the meta-page contains things modified by the writer and
> parameters modified by the reader are passed by the get_reader_page ioctl i.e.
> the reader page ID and cpu_buffer->reader_page->read? (for the hyp tracing, we
> have up to 4 registers for the HVC which would replace in our case the ioctl)

Or we can keep everything in the meta-page but update the "reader bits" only
during the get_reader_page ioctl.

I can prepare something around those lines for the v3.

> 
> > 
> > That is, it should pass in the page->commit that it used to to read the
> > pages.
> > 
> > -- Steve
  
Steven Rostedt March 29, 2023, 3:32 p.m. UTC | #17
On Wed, 29 Mar 2023 14:55:41 +0100
Vincent Donnefort <vdonnefort@google.com> wrote:

> > Yes, in fact it shouldn't need to call the ioctl until after it read it.
> > 
> > Maybe, we should have the ioctl take a parameter of how much was read?
> > To prevent races?  
> 
> Races would only be with other consuming readers. In that case we'd probably
> have many other problems anyway as I suppose nothing would prevent another one
> of swapping the page while our userspace reader is still processing it?

I'm not worried about user space readers. I'm worried about writers, as
the ioctl will update the reader_page->read = reader_page->commit. The time
that the reader last read and stopped and then called the ioctl, a writer
could fill the page, then the ioctl may even swap the page. By passing in
the read amount, the ioctl will know if it needs to keep the same page or
not.

> 
> I don't know if this is worth splitting the ABI between the meta-page and the
> ioctl parameters for this?
> 
> Or maybe we should say the meta-page contains things modified by the writer and
> parameters modified by the reader are passed by the get_reader_page ioctl i.e.
> the reader page ID and cpu_buffer->reader_page->read? (for the hyp tracing, we
> have up to 4 registers for the HVC which would replace in our case the ioctl)

I don't think we need the reader_page id, as that should never move without
reader involvement. If there's more than one reader, that's up to the
readers to keep track of each other, not the kernel.

Which BTW, the more I look at doing this without ioctls, I think we may
need to update things slightly different.

I would keep the current approach, but for clarification of terminology, we
have:

meta_data - the data that holds information that is shared between user and
	kernel space.

data_pages - this is a separate mapping that holds the mapped ring buffer
	pages. In user space, this is one contiguous array and also holds
	the reader page.

data_index - This is an array of what the writer sees. It maps the index
	into data_pages[] of where to find the mapped pages. It does not
	contain the reader page. We currently map this with the meta_data,
	but that's not a requirement (although we may continue to do so).

I'm thinking that we make the data_index[] elements into a structure:

struct trace_map_data_index {
	int		idx;	/* index into data_pages[] */
	int		cnt;	/* counter updated by writer */
};

The cnt is initialized to zero when initially mapped.

Instead of having the bpage->id = index into data_pages[], have it equal
the index into data_index[].

The cpu_buffer->reader_page->id = -1;

meta_data->reader_page = index into data_pages[] of reader page

The swapping of the header page would look something like this:

static inline void
rb_meta_page_head_swap(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_meta_page *meta = cpu_buffer->meta_page;
	int head_page;

	if (!READ_ONCE(cpu_buffer->mapped))
		return;

	head_page = meta->data_pages[meta->hdr.data_page_head];
	meta->data_pages[meta->hdr.data_page_head] = meta->hdr.reader_page;
	meta->hdr.reader_page = head_page;
	meta->data_pages[head_page]->id = -1;
}

As hdr.data_page_head would be an index into data_index[] and not
data_pages[].

The fact that bpage->id points to the data_index[] and not the data_pages[]
means that the writer can easily get to that index, and modify the count.
That way, in rb_tail_page_update() (between cmpxchgs) we can do something
like:

	if (cpu_buffer->mapped) {
		meta = cpu_buffer->meta_page;
		meta->data_index[next_page->id].cnt++;
	}

And this will allow the reader to know if the current page it is on just
got overwritten by the writer, by doing:

	prev_id = meta->data_index[this_page].cnt;
	smp_rmb();
	read event (copy it, whatever)
	smp_rmb();
	if (prev_id != meta->data_index[this_page].cnt)
		/* read data may be corrupted, abort it */


Does this make sense?

-- Steve
  
Vincent Donnefort March 30, 2023, 10:30 a.m. UTC | #18
On Wed, Mar 29, 2023 at 11:32:34AM -0400, Steven Rostedt wrote:
> On Wed, 29 Mar 2023 14:55:41 +0100
> Vincent Donnefort <vdonnefort@google.com> wrote:
> 
> > > Yes, in fact it shouldn't need to call the ioctl until after it read it.
> > > 
> > > Maybe, we should have the ioctl take a parameter of how much was read?
> > > To prevent races?  
> > 
> > Races would only be with other consuming readers. In that case we'd probably
> > have many other problems anyway as I suppose nothing would prevent another one
> > of swapping the page while our userspace reader is still processing it?
> 
> I'm not worried about user space readers. I'm worried about writers, as
> the ioctl will update the reader_page->read = reader_page->commit. The time
> that the reader last read and stopped and then called the ioctl, a writer
> could fill the page, then the ioctl may even swap the page. By passing in
> the read amount, the ioctl will know if it needs to keep the same page or
> not.

How about?

userspace:

  prev_read = meta->read;
  ioctl(TRACE_MMAP_IOCTL_GET_READER_PAGE)

kernel:
    ring_buffer_get_reader_page()
      rb_get_reader_page(cpu_buffer);
      cpu_buffer->reader_page->read = rb_page_size(reader);
      meta->read = cpu_buffer->reader_page->read;

userspace:
   /* if new page prev_read = 0 */
   /* read between prev_read and meta->read */

If the writer does anything in-between, wouldn't rb_get_reader_page() handle it
nicely by returning the same reader as more would be there to read?

It is similar to rb_advance_reader() except we'd be moving several events at
once?

> 
> > 
> > I don't know if this is worth splitting the ABI between the meta-page and the
> > ioctl parameters for this?
> > 
> > Or maybe we should say the meta-page contains things modified by the writer and
> > parameters modified by the reader are passed by the get_reader_page ioctl i.e.
> > the reader page ID and cpu_buffer->reader_page->read? (for the hyp tracing, we
> > have up to 4 registers for the HVC which would replace in our case the ioctl)
> 
> I don't think we need the reader_page id, as that should never move without
> reader involvement. If there's more than one reader, that's up to the
> readers to keep track of each other, not the kernel.
> 
> Which BTW, the more I look at doing this without ioctls, I think we may
> need to update things slightly different.
> 
> I would keep the current approach, but for clarification of terminology, we
> have:
> 
> meta_data - the data that holds information that is shared between user and
> 	kernel space.
> 
> data_pages - this is a separate mapping that holds the mapped ring buffer
> 	pages. In user space, this is one contiguous array and also holds
> 	the reader page.
> 
> data_index - This is an array of what the writer sees. It maps the index
> 	into data_pages[] of where to find the mapped pages. It does not
> 	contain the reader page. We currently map this with the meta_data,
> 	but that's not a requirement (although we may continue to do so).
> 
> I'm thinking that we make the data_index[] elements into a structure:
> 
> struct trace_map_data_index {
> 	int		idx;	/* index into data_pages[] */
> 	int		cnt;	/* counter updated by writer */
> };
> 
> The cnt is initialized to zero when initially mapped.
> 
> Instead of having the bpage->id = index into data_pages[], have it equal
> the index into data_index[].
> 
> The cpu_buffer->reader_page->id = -1;
> 
> meta_data->reader_page = index into data_pages[] of reader page
> 
> The swapping of the header page would look something like this:
> 
> static inline void
> rb_meta_page_head_swap(struct ring_buffer_per_cpu *cpu_buffer)
> {
> 	struct ring_buffer_meta_page *meta = cpu_buffer->meta_page;
> 	int head_page;
> 
> 	if (!READ_ONCE(cpu_buffer->mapped))
> 		return;
> 
> 	head_page = meta->data_pages[meta->hdr.data_page_head];
> 	meta->data_pages[meta->hdr.data_page_head] = meta->hdr.reader_page;
> 	meta->hdr.reader_page = head_page;
> 	meta->data_pages[head_page]->id = -1;
> }
> 
> As hdr.data_page_head would be an index into data_index[] and not
> data_pages[].
> 
> The fact that bpage->id points to the data_index[] and not the data_pages[]
> means that the writer can easily get to that index, and modify the count.
> That way, in rb_tail_page_update() (between cmpxchgs) we can do something
> like:
> 
> 	if (cpu_buffer->mapped) {
> 		meta = cpu_buffer->meta_page;
> 		meta->data_index[next_page->id].cnt++;
> 	}
> 
> And this will allow the reader to know if the current page it is on just
> got overwritten by the writer, by doing:
> 
> 	prev_id = meta->data_index[this_page].cnt;
> 	smp_rmb();
> 	read event (copy it, whatever)
> 	smp_rmb();
> 	if (prev_id != meta->data_index[this_page].cnt)
> 		/* read data may be corrupted, abort it */

Couldn't the reader just check for the page commit field? rb_iter_head_event()
does something like this to check if the writer is on its page.

> 
> 
> Does this make sense?
> 
> -- Steve
  
Vincent Donnefort March 30, 2023, 2:48 p.m. UTC | #19
[...]

> > > struct ring_buffer_meta_page_header {
> > > #if __BITS_PER_LONG == 64
> > > 	__u64	entries;
> > > 	__u64	overrun;
> > > #else
> > > 	__u32	entries;
> > > 	__u32	overrun;
> > > #endif
> > > 	__u32	pages_touched;
> > > 	__u32	meta_page_size;
> > > 	__u32	reader_page;	/* page ID for the reader page */
> > > 	__u32	nr_data_pages;	/* doesn't take into account the reader_page */
> > > };
> > > 
> > > BTW, shouldn't the nr_data_pages take into account the reader page? As it
> > > is part of the array we traverse isn't it?  
> > 
> > It depends if the reader page has ever been swapped out. If yes, the reader
> > would have to start from reader_page and then switch to the data_pages.
> > Which sounds like a fiddly interface for the userspace.
> > 
> > So yeah, consuming-read only feels like a better start.
> > 
> 
> I agree. I'd like to get something in that can be extended, but simple
> enough that it's not too much of a barrier wrt getting the API correct.
> 
> -- Steve

Something I just realized though. In the event of being able to upstream the
hypervisor tracing based on the ring_buffer_meta_page, without non-consumming
support, we wouldn't have the "trace" file which is used to reset the buffers.

I'd guess we'd have to either create one that is read-only (a bit strange) or
let trace_pipe reset the buffer(s).
  
Steven Rostedt March 30, 2023, 3:21 p.m. UTC | #20
On Thu, 30 Mar 2023 11:30:51 +0100
Vincent Donnefort <vdonnefort@google.com> wrote:

> How about?
> 
> userspace:
> 
>   prev_read = meta->read;
>   ioctl(TRACE_MMAP_IOCTL_GET_READER_PAGE)
> 
> kernel:
>     ring_buffer_get_reader_page()
>       rb_get_reader_page(cpu_buffer);
>       cpu_buffer->reader_page->read = rb_page_size(reader);
>       meta->read = cpu_buffer->reader_page->read;
> 
> userspace:
>    /* if new page prev_read = 0 */
>    /* read between prev_read and meta->read */
> 
> If the writer does anything in-between, wouldn't rb_get_reader_page() handle it
> nicely by returning the same reader as more would be there to read?
> 
> It is similar to rb_advance_reader() except we'd be moving several events at
> once?

Yeah, I think that can work. So we just need to make sure that the meta
page has the "read" variable passed through.

-- Steve
  

Patch

diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
index 782e14f62201..4897e17ebdde 100644
--- a/include/linux/ring_buffer.h
+++ b/include/linux/ring_buffer.h
@@ -6,6 +6,8 @@ 
 #include <linux/seq_file.h>
 #include <linux/poll.h>
 
+#include <uapi/linux/trace_mmap.h>
+
 struct trace_buffer;
 struct ring_buffer_iter;
 
@@ -211,4 +213,10 @@  int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node);
 #define trace_rb_cpu_prepare	NULL
 #endif
 
+int ring_buffer_map(struct trace_buffer *buffer, int cpu);
+int ring_buffer_unmap(struct trace_buffer *buffer, int cpu);
+struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
+				   unsigned long pgoff);
+int ring_buffer_get_reader_page(struct trace_buffer *buffer, int cpu);
+int ring_buffer_update_meta_page(struct trace_buffer *buffer, int cpu);
 #endif /* _LINUX_RING_BUFFER_H */
diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h
new file mode 100644
index 000000000000..7794314a80e9
--- /dev/null
+++ b/include/uapi/linux/trace_mmap.h
@@ -0,0 +1,25 @@ 
+/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
+#ifndef _UAPI_TRACE_MMAP_H_
+#define _UAPI_TRACE_MMAP_H_
+
+#include <asm/bitsperlong.h>
+
+#include <linux/types.h>
+
+struct ring_buffer_meta_page_header {
+#if __BITS_PER_LONG == 64
+	__u64	entries;
+	__u64	overrun;
+#else
+	__u32	entries;
+	__u32	overrun;
+#endif
+	__u32	pages_touched;
+	__u32	meta_page_size;
+	__u32	reader_page;	/* page ID for the reader page */
+	__u32	nr_data_pages;	/* doesn't take into account the reader_page */
+	__u32	data_page_head;	/* ring-buffer head as an offset from data_start */
+	__u32	data_start;	/* offset within the meta page */
+};
+
+#endif /* _UAPI_TRACE_MMAP_H_ */
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index af50d931b020..8b9e773fef2e 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -332,6 +332,7 @@  struct buffer_page {
 	local_t		 entries;	/* entries on this page */
 	unsigned long	 real_end;	/* real end of data */
 	struct buffer_data_page *page;	/* Actual data page */
+	u32		 id;		/* ID for external mapping */
 };
 
 /*
@@ -489,6 +490,11 @@  typedef struct rb_time_struct rb_time_t;
 
 #define MAX_NEST	5
 
+struct ring_buffer_meta_page {
+	struct ring_buffer_meta_page_header	hdr;
+	__u32					data_pages[];
+};
+
 /*
  * head_page == tail_page && head == tail then buffer is empty.
  */
@@ -529,6 +535,13 @@  struct ring_buffer_per_cpu {
 	rb_time_t			before_stamp;
 	u64				event_stamp[MAX_NEST];
 	u64				read_stamp;
+
+	int				mapped;
+	int				meta_order;
+	struct mutex			mapping_lock;
+	unsigned long			*page_ids;	/* ID to addr */
+	struct ring_buffer_meta_page	*meta_page;
+
 	/* ring buffer pages to update, > 0 to add, < 0 to remove */
 	long				nr_pages_to_update;
 	struct list_head		new_pages; /* new pages to add */
@@ -1452,12 +1465,38 @@  static inline void rb_inc_page(struct buffer_page **bpage)
 	*bpage = list_entry(p, struct buffer_page, list);
 }
 
+static inline void
+rb_meta_page_head_move(struct ring_buffer_per_cpu *cpu_buffer, unsigned long num)
+{
+	struct ring_buffer_meta_page *meta = cpu_buffer->meta_page;
+	unsigned long head_id;
+
+	if (!READ_ONCE(cpu_buffer->mapped))
+		return;
+
+	head_id = meta->hdr.data_page_head;
+	meta->hdr.data_page_head = (head_id + num) % cpu_buffer->nr_pages;
+}
+
+static inline void
+rb_meta_page_head_swap(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	struct ring_buffer_meta_page *meta = cpu_buffer->meta_page;
+
+	if (!READ_ONCE(cpu_buffer->mapped))
+		return;
+
+	meta->hdr.reader_page = cpu_buffer->head_page->id;
+	meta->data_pages[meta->hdr.data_page_head] = cpu_buffer->reader_page->id;
+}
+
 static struct buffer_page *
 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
 {
 	struct buffer_page *head;
 	struct buffer_page *page;
 	struct list_head *list;
+	unsigned long cnt = 0;
 	int i;
 
 	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
@@ -1479,9 +1518,12 @@  rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
 		do {
 			if (rb_is_head_page(page, page->list.prev)) {
 				cpu_buffer->head_page = page;
+				rb_meta_page_head_move(cpu_buffer, cnt);
+
 				return page;
 			}
 			rb_inc_page(&page);
+			cnt++;
 		} while (page != head);
 	}
 
@@ -1567,6 +1609,13 @@  static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
 		/* Again, either we update tail_page or an interrupt does */
 		(void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
 	}
+
+	if (READ_ONCE(cpu_buffer->mapped)) {
+		/* Ensure the meta_page is ready */
+		smp_rmb();
+		WRITE_ONCE(cpu_buffer->meta_page->hdr.pages_touched,
+			   local_read(&cpu_buffer->pages_touched));
+	}
 }
 
 static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
@@ -1735,6 +1784,7 @@  rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
 	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
 	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
 	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
+	mutex_init(&cpu_buffer->mapping_lock);
 
 	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
 			    GFP_KERNEL, cpu_to_node(cpu));
@@ -2173,7 +2223,6 @@  int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
 	/* prevent another thread from changing buffer sizes */
 	mutex_lock(&buffer->mutex);
 
-
 	if (cpu_id == RING_BUFFER_ALL_CPUS) {
 		/*
 		 * Don't succeed if resizing is disabled, as a reader might be
@@ -2523,6 +2572,13 @@  rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
 		local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
 		local_inc(&cpu_buffer->pages_lost);
 
+		if (READ_ONCE(cpu_buffer->mapped)) {
+			/* Ensure the meta_page is ready */
+			smp_rmb();
+			WRITE_ONCE(cpu_buffer->meta_page->hdr.overrun,
+				   local_read(&cpu_buffer->overrun));
+		}
+
 		/*
 		 * The entries will be zeroed out when we move the
 		 * tail page.
@@ -3179,6 +3235,14 @@  static inline void rb_event_discard(struct ring_buffer_event *event)
 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
 {
 	local_inc(&cpu_buffer->entries);
+
+	if (READ_ONCE(cpu_buffer->mapped)) {
+		/* Ensure the meta_page is ready */
+		smp_rmb();
+		WRITE_ONCE(cpu_buffer->meta_page->hdr.entries,
+			   local_read(&cpu_buffer->entries));
+	}
+
 	rb_end_commit(cpu_buffer);
 }
 
@@ -3482,7 +3546,7 @@  static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
 		return;
 
 	/*
-	 * If this interrupted another event, 
+	 * If this interrupted another event,
 	 */
 	if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
 		goto out;
@@ -4643,7 +4707,9 @@  rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 	 * Now make the new head point back to the reader page.
 	 */
 	rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
+	rb_meta_page_head_swap(cpu_buffer);
 	rb_inc_page(&cpu_buffer->head_page);
+	rb_meta_page_head_move(cpu_buffer, 1);
 
 	local_inc(&cpu_buffer->pages_read);
 
@@ -5285,6 +5351,12 @@  rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
 	cpu_buffer->lost_events = 0;
 	cpu_buffer->last_overrun = 0;
 
+	if (READ_ONCE(cpu_buffer->mapped)) {
+		WRITE_ONCE(cpu_buffer->meta_page->hdr.entries, 0);
+		WRITE_ONCE(cpu_buffer->meta_page->hdr.pages_touched, 0);
+		WRITE_ONCE(cpu_buffer->meta_page->hdr.overrun, 0);
+	}
+
 	rb_head_page_activate(cpu_buffer);
 }
 
@@ -5489,6 +5561,11 @@  int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
 	cpu_buffer_a = buffer_a->buffers[cpu];
 	cpu_buffer_b = buffer_b->buffers[cpu];
 
+	if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) {
+		ret = -EBUSY;
+		goto out;
+	}
+
 	/* At least make sure the two buffers are somewhat the same */
 	if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
 		goto out;
@@ -5722,7 +5799,8 @@  int ring_buffer_read_page(struct trace_buffer *buffer,
 	 * Otherwise, we can simply swap the page with the one passed in.
 	 */
 	if (read || (len < (commit - read)) ||
-	    cpu_buffer->reader_page == cpu_buffer->commit_page) {
+	    cpu_buffer->reader_page == cpu_buffer->commit_page ||
+	    READ_ONCE(cpu_buffer->mapped)) {
 		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
 		unsigned int rpos = read;
 		unsigned int pos = 0;
@@ -5839,6 +5917,306 @@  int ring_buffer_read_page(struct trace_buffer *buffer,
 }
 EXPORT_SYMBOL_GPL(ring_buffer_read_page);
 
+#define META_PAGE_MAX_PAGES \
+	((PAGE_SIZE - (offsetof(struct ring_buffer_meta_page, data_pages))) >> 2)
+
+static void unmap_page(unsigned long addr)
+{
+	struct page *page = virt_to_page(addr);
+
+	page->mapping = NULL;
+}
+
+static void rb_free_page_ids(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	int i;
+
+	for (i = 0; i < cpu_buffer->nr_pages; i++)
+		unmap_page(cpu_buffer->page_ids[i]);
+
+	kfree(cpu_buffer->page_ids);
+	cpu_buffer->page_ids = NULL;
+}
+
+static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	struct page *meta_pages;
+	int pages;
+	int order = 0;
+
+	if (cpu_buffer->meta_page)
+		return 0;
+
+	if (cpu_buffer->nr_pages > META_PAGE_MAX_PAGES) {
+		/* Calculate how many more pages we need to hold indexes */
+		pages = DIV_ROUND_UP(cpu_buffer->nr_pages - META_PAGE_MAX_PAGES,
+				     PAGE_SIZE / sizeof(u32));
+		/* Add back the meta_page itself */
+		pages++;
+		order = fls(pages) - 1;
+	}
+	meta_pages = alloc_pages(GFP_USER, order);
+	if (!meta_pages)
+		return -ENOMEM;
+
+	cpu_buffer->meta_page = page_to_virt(meta_pages);
+	cpu_buffer->meta_order = order;
+
+	return 0;
+}
+
+static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	unsigned long addr = (unsigned long)cpu_buffer->meta_page;
+	int i;
+
+	for (i = 0; i < (1 << cpu_buffer->meta_order); i++) {
+		unmap_page(addr);
+		addr += PAGE_SIZE;
+	}
+
+	free_pages((unsigned long)cpu_buffer->meta_page, cpu_buffer->meta_order);
+	cpu_buffer->meta_page = NULL;
+}
+
+static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
+				   unsigned long *page_ids)
+{
+	struct ring_buffer_meta_page *meta = cpu_buffer->meta_page;
+	struct buffer_page *first_page, *bpage;
+	int data_page_end;
+	int id = 0;
+
+	page_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
+	cpu_buffer->reader_page->id = id++;
+
+	/* Calculate the last index of data_pages[] */
+	data_page_end = (1 << (cpu_buffer->meta_order + PAGE_SHIFT)) -
+		offsetof(struct ring_buffer_meta_page, data_pages);
+	data_page_end /= sizeof(u32);
+
+	first_page = bpage = rb_set_head_page(cpu_buffer);
+	do {
+		if (id > data_page_end) {
+			WARN_ON(1);
+			break;
+		}
+
+		page_ids[id] = (unsigned long)bpage->page;
+		bpage->id = id;
+		meta->data_pages[id - 1] = id;
+
+		rb_inc_page(&bpage);
+		id++;
+	} while (bpage != first_page);
+
+	/* install page ID to kern VA translation */
+	cpu_buffer->page_ids = page_ids;
+
+	meta->hdr.entries = 0;
+	meta->hdr.overrun = 0;
+	meta->hdr.pages_touched = 0;
+	meta->hdr.reader_page = cpu_buffer->reader_page->id;
+	meta->hdr.nr_data_pages = cpu_buffer->nr_pages;
+	meta->hdr.meta_page_size = 1 << (cpu_buffer->meta_order + PAGE_SHIFT);
+	meta->hdr.data_page_head = 0;
+	meta->hdr.data_start = offsetof(struct ring_buffer_meta_page, data_pages);
+}
+
+static inline struct ring_buffer_per_cpu *
+rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return ERR_PTR(-EINVAL);
+
+	cpu_buffer = buffer->buffers[cpu];
+
+	mutex_lock(&cpu_buffer->mapping_lock);
+
+	if (!cpu_buffer->mapped) {
+		mutex_unlock(&cpu_buffer->mapping_lock);
+		return ERR_PTR(-ENODEV);
+	}
+
+	return cpu_buffer;
+}
+
+static inline void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	mutex_unlock(&cpu_buffer->mapping_lock);
+}
+
+int ring_buffer_map(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	unsigned long flags, *page_ids;
+	int err = 0;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return -EINVAL;
+
+	cpu_buffer = buffer->buffers[cpu];
+
+	mutex_lock(&cpu_buffer->mapping_lock);
+
+	if (cpu_buffer->mapped) {
+		WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1);
+		goto unlock;
+	}
+
+	/* prevent another thread from changing buffer sizes */
+	mutex_lock(&buffer->mutex);
+	atomic_inc(&cpu_buffer->resize_disabled);
+	mutex_unlock(&buffer->mutex);
+
+	err = rb_alloc_meta_page(cpu_buffer);
+	if (err) {
+		atomic_dec(&cpu_buffer->resize_disabled);
+		goto unlock;
+	}
+
+	/* page_ids include the reader page while nr_pages does not */
+	page_ids = kzalloc(sizeof(*page_ids) * (cpu_buffer->nr_pages + 1),
+			   GFP_KERNEL);
+	if (!page_ids) {
+		rb_free_meta_page(cpu_buffer);
+		atomic_dec(&cpu_buffer->resize_disabled);
+		err = -ENOMEM;
+		goto unlock;
+	}
+
+	/*
+	 * Lock all readers to block any page swap until the page IDs are
+	 * assigned.
+	 */
+	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+
+	rb_setup_ids_meta_page(cpu_buffer, page_ids);
+	/*
+	 * Ensure the writer will observe the meta-page before
+	 * cpu_buffer->mapped.
+	 */
+	smp_wmb();
+	WRITE_ONCE(cpu_buffer->mapped, 1);
+
+	/* Init meta_page values unless the writer did it already */
+	cmpxchg(&cpu_buffer->meta_page->hdr.entries, 0,
+		local_read(&cpu_buffer->entries));
+	cmpxchg(&cpu_buffer->meta_page->hdr.overrun, 0,
+		local_read(&cpu_buffer->overrun));
+	cmpxchg(&cpu_buffer->meta_page->hdr.pages_touched, 0,
+		local_read(&cpu_buffer->pages_touched));
+
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+unlock:
+	mutex_unlock(&cpu_buffer->mapping_lock);
+
+	return err;
+}
+
+int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	int err = 0;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return -EINVAL;
+
+	cpu_buffer = buffer->buffers[cpu];
+
+	mutex_lock(&cpu_buffer->mapping_lock);
+
+	if (!cpu_buffer->mapped) {
+		err = -ENODEV;
+		goto unlock;
+	}
+
+	WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1);
+	if (!cpu_buffer->mapped) {
+		/* Wait the writer and readers to observe !mapped */
+		synchronize_rcu();
+
+		rb_free_page_ids(cpu_buffer);
+		rb_free_meta_page(cpu_buffer);
+		atomic_dec(&cpu_buffer->resize_disabled);
+	}
+
+unlock:
+	mutex_unlock(&cpu_buffer->mapping_lock);
+
+	return err;
+}
+
+/*
+ *   +--------------+
+ *   |   meta page  |  pgoff=0
+ *   |     ...      |
+ *   |              |  pgoff=(1<<cpu_buffer->meta_order - 1)
+ *   +--------------+
+ *   |  data page1  |  page_ids=0
+ *   +--------------+
+ *   |  data page2  |  page_ids=1
+ *         ...
+ */
+struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
+				   unsigned long pgoff)
+{
+	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
+
+	if (pgoff < (1 << cpu_buffer->meta_order))
+		return virt_to_page((void *)cpu_buffer->meta_page + (pgoff << PAGE_SHIFT));
+
+	pgoff -= (1 << cpu_buffer->meta_order);
+
+	if (pgoff > cpu_buffer->nr_pages)
+		return NULL;
+
+	return virt_to_page(cpu_buffer->page_ids[pgoff]);
+}
+
+int ring_buffer_get_reader_page(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	struct buffer_page *reader;
+	unsigned long flags;
+
+	cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
+	if (IS_ERR(cpu_buffer))
+		return (int)PTR_ERR(cpu_buffer);
+
+	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+	reader = cpu_buffer->reader_page;
+	reader->read = rb_page_size(reader);
+	if (!rb_per_cpu_empty(cpu_buffer))
+		WARN_ON(!rb_get_reader_page(cpu_buffer));
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+
+	rb_put_mapped_buffer(cpu_buffer);
+
+	return 0;
+}
+
+int ring_buffer_update_meta_page(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	unsigned long flags;
+
+	cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
+	if (IS_ERR(cpu_buffer))
+		return PTR_ERR(cpu_buffer);
+
+	/* Update the head page if the writer moved it */
+	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+	rb_set_head_page(cpu_buffer);
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+
+	rb_put_mapped_buffer(cpu_buffer);
+
+	return 0;
+}
+
 /*
  * We only allocate new buffers, never free them if the CPU goes down.
  * If we were to free the buffer, then the user would lose any trace that was in