[printk,v3,04/14] printk: ringbuffer: Do not skip non-finalized records with prb_next_seq()

Message ID 20231214214201.499426-5-john.ogness@linutronix.de
State New
Series fix console flushing

Commit Message

John Ogness Dec. 14, 2023, 9:41 p.m. UTC
  Commit f244b4dc53e5 ("printk: ringbuffer: Improve
prb_next_seq() performance") introduced an optimization for
prb_next_seq() by tracking recently finalized records on a
best-effort basis. However, the order of finalization does not
necessarily match the order of the records. The optimization
caused prb_next_seq() to return inconsistent results, possibly
yielding sequence numbers that are not available to readers
because they are preceded by non-finalized records or are not
yet visible to the reader CPU.

Rather than tracking recently finalized records on a
best-effort basis, force the committing writer to read records
and extend the last "contiguous block" of finalized records. In
order to do this, the sequence number must be stored instead of
the ID, because IDs cannot be directly compared.

A new memory barrier pair is introduced to guarantee that a
reader can always read the records up until the sequence number
returned by prb_next_seq() (unless the records have since
been overwritten in the ringbuffer).

This restores the original functionality of prb_next_seq()
while also keeping the optimization.

For 32bit systems, only the lower 32 bits of the sequence
number are stored. When reading the value, it is expanded to
the full 64bit sequence number using the 32bit seq macros,
which fold in the value returned by prb_first_seq().
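
A minimal sketch of how such folding can work (assuming a
prb_first_seq() helper that returns the current oldest 64bit
sequence number; this is an illustration, not necessarily the
exact helper from the series):

static u64 __ulseq_to_u64seq(struct printk_ringbuffer *rb, u32 ulseq)
{
	u64 rb_first_seq = prb_first_seq(rb);

	/*
	 * @ulseq carries only the lower 32 bits of a sequence number.
	 * The signed 32bit distance to the lower 32 bits of
	 * @rb_first_seq handles wraparound, assuming @ulseq is within
	 * 2^31 records of @rb_first_seq.
	 */
	return rb_first_seq - (s32)((u32)rb_first_seq - ulseq);
}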

Fixes: f244b4dc53e5 ("printk: ringbuffer: Improve prb_next_seq() performance")
Signed-off-by: John Ogness <john.ogness@linutronix.de>
---
 kernel/printk/printk_ringbuffer.c | 164 +++++++++++++++++++++++-------
 kernel/printk/printk_ringbuffer.h |   4 +-
 2 files changed, 127 insertions(+), 41 deletions(-)
  

Comments

Petr Mladek Jan. 12, 2024, 6:05 p.m. UTC | #1
On Thu 2023-12-14 22:47:51, John Ogness wrote:
> Commit f244b4dc53e5 ("printk: ringbuffer: Improve
> prb_next_seq() performance") introduced an optimization for
> prb_next_seq() by tracking recently finalized records on a
> best-effort basis. However, the order of finalization does not
> necessarily match the order of the records. The optimization
> caused prb_next_seq() to return inconsistent results, possibly
> yielding sequence numbers that are not available to readers
> because they are preceded by non-finalized records or are not
> yet visible to the reader CPU.
> 
> Rather than tracking recently finalized records on a
> best-effort basis, force the committing writer to read records
> and extend the last "contiguous block" of finalized records. In
> order to do this, the sequence number must be stored instead of
> the ID, because IDs cannot be directly compared.
> 
> A new memory barrier pair is introduced to guarantee that a
> reader can always read the records up until the sequence number
> returned by prb_next_seq() (unless the records have since
> been overwritten in the ringbuffer).
> 
> This restores the original functionality of prb_next_seq()
> while also keeping the optimization.
> 
> For 32bit systems, only the lower 32 bits of the sequence
> number are stored. When reading the value, it is expanded to
> the full 64bit sequence number using the 32bit seq macros,
> which fold in the value returned by prb_first_seq().
> 
> @@ -1441,20 +1445,118 @@ bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer
>  	return false;
>  }
>  
> +/*
> + * @last_finalized_seq value guarantees that all records up to and including
> + * this sequence number are finalized and can be read. The only exception are
> + * too old records which have already been overwritten.
> + *
> + * It is also guaranteed that @last_finalized_seq only increases.
> + *
> + * Be aware that finalized records following non-finalized records are not
> + * reported because they are not yet available to the reader. For example,
> + * a new record stored via printk() will not be available to a printer if
> + * it follows a record that has not been finalized yet. However, once that
> + * non-finalized record becomes finalized, @last_finalized_seq will be
> + * appropriately updated and the full set of finalized records will be
> + * available to the printer. And since each printk() caller will either
> + * directly print or trigger deferred printing of all available unprinted
> + * records, all printk() messages will get printed.
> + */
> +static u64 desc_last_finalized_seq(struct printk_ringbuffer *rb)
> +{
> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
> +	unsigned long ulseq;
> +
> +	/*
> +	 * Guarantee the sequence number is loaded before loading the
> +	 * associated record in order to guarantee that the record can be
> +	 * seen by this CPU. This pairs with desc_update_last_finalized:A.
> +	 */
> +	ulseq = atomic_long_read_acquire(&desc_ring->last_finalized_seq
> +					); /* LMM(desc_last_finalized_seq:A) */
> +
> +	return __ulseq_to_u64seq(rb, ulseq);
> +}
> +
> +static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
> +			    struct printk_record *r, unsigned int *line_count);
> +
> +/*
> + * Check if there are records directly following @last_finalized_seq that are
> + * finalized. If so, update @last_finalized_seq to the latest of these
> + * records. It is not allowed to skip over records that are not yet finalized.
> + */
> +static void desc_update_last_finalized(struct printk_ringbuffer *rb)
> +{
> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
> +	u64 old_seq = desc_last_finalized_seq(rb);
> +	unsigned long oldval;
> +	unsigned long newval;
> +	u64 finalized_seq;
> +	u64 try_seq;
> +
> +try_again:
> +	finalized_seq = old_seq;
> +	try_seq = finalized_seq + 1;
> +
> +	/* Try to find later finalized records. */
> +	while (_prb_read_valid(rb, &try_seq, NULL, NULL)) {
> +		finalized_seq = try_seq;
> +		try_seq++;
> +	}
> +
> +	/* No update needed if no later finalized record was found. */
> +	if (finalized_seq == old_seq)
> +		return;
> +
> +	oldval = __u64seq_to_ulseq(old_seq);
> +	newval = __u64seq_to_ulseq(finalized_seq);
> +
> +	/*
> +	 * Set the sequence number of a later finalized record that has been
> +	 * seen.
> +	 *
> +	 * Guarantee the record data is visible to other CPUs before storing
> +	 * its sequence number. This pairs with desc_last_finalized_seq:A.
> +	 *
> +	 * Memory barrier involvement:
> +	 *
> +	 * If desc_last_finalized_seq:A reads from
> +	 * desc_update_last_finalized:A, then desc_read:A reads from
> +	 * _prb_commit:B.
> +	 *
> +	 * Relies on:
> +	 *
> +	 * RELEASE from _prb_commit:B to desc_update_last_finalized:A
> +	 *    matching
> +	 * ACQUIRE from desc_last_finalized_seq:A to desc_read:A
> +	 *
> +	 * Note: _prb_commit:B and desc_update_last_finalized:A can be
> +	 *       different CPUs. However, the desc_update_last_finalized:A
> +	 *       CPU (which performs the release) must have previously seen
> +	 *       _prb_commit:B.
> +	 */
> +	if (!atomic_long_try_cmpxchg_release(&desc_ring->last_finalized_seq,
> +				&oldval, newval)) { /* LMM(desc_update_last_finalized:A) */
> +		old_seq = __ulseq_to_u64seq(rb, oldval);
> +		goto try_again;
> +	}
> +}
> +
>  /*
>   * Attempt to finalize a specified descriptor. If this fails, the descriptor
>   * is either already final or it will finalize itself when the writer commits.
>   */
> -static void desc_make_final(struct prb_desc_ring *desc_ring, unsigned long id)
> +static void desc_make_final(struct printk_ringbuffer *rb, unsigned long id)
>  {
> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
>  	unsigned long prev_state_val = DESC_SV(id, desc_committed);
>  	struct prb_desc *d = to_desc(desc_ring, id);
>  
> -	atomic_long_cmpxchg_relaxed(&d->state_var, prev_state_val,
> -			DESC_SV(id, desc_finalized)); /* LMM(desc_make_final:A) */
> -
> -	/* Best effort to remember the last finalized @id. */
> -	atomic_long_set(&desc_ring->last_finalized_id, id);
> +	if (atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
> +			DESC_SV(id, desc_finalized))) { /* LMM(desc_make_final:A) */
> +		desc_update_last_finalized(rb);
> +	}
>  }
> @@ -2008,7 +2107,9 @@ u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
>   * newest sequence number available to readers will be.
>   *
>   * This provides readers a sequence number to jump to if all currently
> - * available records should be skipped.
> + * available records should be skipped. It is guaranteed that all records
> + * previous to the returned value have been finalized and are (or were)
> + * available to the reader.
>   *
>   * Context: Any context.
>   * Return: The sequence number of the next newest (not yet available) record
> @@ -2016,34 +2117,19 @@ u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
>   */
>  u64 prb_next_seq(struct printk_ringbuffer *rb)
>  {
> -	struct prb_desc_ring *desc_ring = &rb->desc_ring;
> -	enum desc_state d_state;
> -	unsigned long id;
>  	u64 seq;
>  
> -	/* Check if the cached @id still points to a valid @seq. */
> -	id = atomic_long_read(&desc_ring->last_finalized_id);
> -	d_state = desc_read(desc_ring, id, NULL, &seq, NULL);
> +	seq = desc_last_finalized_seq(rb);

desc_last_finalized_seq() does internally:

	ulseq = atomic_long_read_acquire(&desc_ring->last_finalized_seq
					); /* LMM(desc_last_finalized_seq:A) */


It guarantees that this CPU would see the data which were seen by the
CPU which updated desc_ring->last_finalized_seq.

So far so good.

The problem is that I am somehow missing the counterpart.
Maybe it is not needed. It just looks strange.

> -	if (d_state == desc_finalized || d_state == desc_reusable) {
> -		/*
> -		 * Begin searching after the last finalized record.
> -		 *
> -		 * On 0, the search must begin at 0 because of hack#2
> -		 * of the bootstrapping phase it is not known if a
> -		 * record at index 0 exists.
> -		 */
> -		if (seq != 0)
> -			seq++;
> -	} else {
> -		/*
> -		 * The information about the last finalized sequence number
> -		 * has gone. It should happen only when there is a flood of
> -		 * new messages and the ringbuffer is rapidly recycled.
> -		 * Give up and start from the beginning.
> -		 */
> -		seq = 0;
> -	}
> +	/*
> +	 * Begin searching after the last finalized record.
> +	 *
> +	 * On 0, the search must begin at 0 because of hack#2
> +	 * of the bootstrapping phase it is not known if a
> +	 * record at index 0 exists.
> +	 */
> +	if (seq != 0)
> +		seq++;
>  
>  	/*
>  	 * The information about the last finalized @seq might be inaccurate.

Below is the code:

	while (_prb_read_valid(rb, &seq, NULL, NULL))
		seq++;

Maybe the release() should be here to make sure that the CPU which
would see this "seq" would also see the data.

Would it make sense, please?

Otherwise, it looks good.

Best Regards,
Petr
  
John Ogness Jan. 15, 2024, 11:55 a.m. UTC | #2
On 2024-01-12, Petr Mladek <pmladek@suse.com> wrote:
>>  u64 prb_next_seq(struct printk_ringbuffer *rb)
>>  {
>> -	struct prb_desc_ring *desc_ring = &rb->desc_ring;
>> -	enum desc_state d_state;
>> -	unsigned long id;
>>  	u64 seq;
>>  
>> -	/* Check if the cached @id still points to a valid @seq. */
>> -	id = atomic_long_read(&desc_ring->last_finalized_id);
>> -	d_state = desc_read(desc_ring, id, NULL, &seq, NULL);
>> +	seq = desc_last_finalized_seq(rb);
>
> desc_last_finalized_seq() does internally:
>
> 	ulseq = atomic_long_read_acquire(&desc_ring->last_finalized_seq
> 					); /* LMM(desc_last_finalized_seq:A) */
>
>
> It guarantees that this CPU would see the data which were seen by the
> CPU which updated desc_ring->last_finalized_seq.
>
> So far so good.
>
> The problem is that I am somehow missing the counterpart.
> Maybe it is not needed. It just looks strange.

As the comments in desc_last_finalized_seq() state: "This pairs with
desc_update_last_finalized:A."

desc_update_last_finalized() successfully reads a record and then uses a
cmpxchg_release() to set the new @last_finalized_seq value (of the
record it just read). That is the counterpart.

>> -	if (d_state == desc_finalized || d_state == desc_reusable) {
>> -		/*
>> -		 * Begin searching after the last finalized record.
>> -		 *
>> -		 * On 0, the search must begin at 0 because of hack#2
>> -		 * of the bootstrapping phase it is not known if a
>> -		 * record at index 0 exists.
>> -		 */
>> -		if (seq != 0)
>> -			seq++;
>> -	} else {
>> -		/*
>> -		 * The information about the last finalized sequence number
>> -		 * has gone. It should happen only when there is a flood of
>> -		 * new messages and the ringbuffer is rapidly recycled.
>> -		 * Give up and start from the beginning.
>> -		 */
>> -		seq = 0;
>> -	}
>> +	/*
>> +	 * Begin searching after the last finalized record.
>> +	 *
>> +	 * On 0, the search must begin at 0 because of hack#2
>> +	 * of the bootstrapping phase it is not known if a
>> +	 * record at index 0 exists.
>> +	 */
>> +	if (seq != 0)
>> +		seq++;
>>  
>>  	/*
>>  	 * The information about the last finalized @seq might be inaccurate.
>
> Below is the code:
>
> 	while (_prb_read_valid(rb, &seq, NULL, NULL))
> 		seq++;
>
> Maybe the release() should be here to make sure that the CPU which
> would see this "seq" would also see the data.

The acquire is with @last_finalized_seq. So the release must also be
with @last_finalized_seq. The important thing is that the CPU that
updates @last_finalized_seq has actually read the corresponding record
beforehand. That is exactly what desc_update_last_finalized() does.

John
  
Petr Mladek Jan. 15, 2024, 5 p.m. UTC | #3
On Mon 2024-01-15 13:01:36, John Ogness wrote:
> On 2024-01-12, Petr Mladek <pmladek@suse.com> wrote:
> >>  u64 prb_next_seq(struct printk_ringbuffer *rb)
> >>  {
> >> -	struct prb_desc_ring *desc_ring = &rb->desc_ring;
> >> -	enum desc_state d_state;
> >> -	unsigned long id;
> >>  	u64 seq;
> >>  
> >> -	/* Check if the cached @id still points to a valid @seq. */
> >> -	id = atomic_long_read(&desc_ring->last_finalized_id);
> >> -	d_state = desc_read(desc_ring, id, NULL, &seq, NULL);
> >> +	seq = desc_last_finalized_seq(rb);
> >
> > desc_last_finalized_seq() does internally:
> >
> > 	ulseq = atomic_long_read_acquire(&desc_ring->last_finalized_seq
> > 					); /* LMM(desc_last_finalized_seq:A) */
> >
> >
> > It guarantees that this CPU would see the data which were seen by the
> > CPU which updated desc_ring->last_finalized_seq.
> >
> > So far so good.
> >
> > The problem is that I am somehow missing the counterpart.
> > Maybe it is not needed. It just looks strange.
> 
> As the comments in desc_last_finalized_seq() state: "This pairs with
> desc_update_last_finalized:A."
> 
> desc_update_last_finalized() successfully reads a record and then uses a
> cmpxchg_release() to set the new @last_finalized_seq value (of the
> record it just read). That is the counterpart.
> 
> >> -	if (d_state == desc_finalized || d_state == desc_reusable) {
> >> -		/*
> >> -		 * Begin searching after the last finalized record.
> >> -		 *
> >> -		 * On 0, the search must begin at 0 because of hack#2
> >> -		 * of the bootstrapping phase it is not known if a
> >> -		 * record at index 0 exists.
> >> -		 */
> >> -		if (seq != 0)
> >> -			seq++;
> >> -	} else {
> >> -		/*
> >> -		 * The information about the last finalized sequence number
> >> -		 * has gone. It should happen only when there is a flood of
> >> -		 * new messages and the ringbuffer is rapidly recycled.
> >> -		 * Give up and start from the beginning.
> >> -		 */
> >> -		seq = 0;
> >> -	}
> >> +	/*
> >> +	 * Begin searching after the last finalized record.
> >> +	 *
> >> +	 * On 0, the search must begin at 0 because of hack#2
> >> +	 * of the bootstrapping phase it is not known if a
> >> +	 * record at index 0 exists.
> >> +	 */
> >> +	if (seq != 0)
> >> +		seq++;
> >>  
> >>  	/*
> >>  	 * The information about the last finalized @seq might be inaccurate.
> >
> > Below is the code:
> >
> > 	while (_prb_read_valid(rb, &seq, NULL, NULL))
> > 		seq++;
> >
> > Maybe the release() should be here to make sure that the CPU which
> > would see this "seq" would also see the data.
> 
> The acquire is with @last_finalized_seq. So the release must also be
> with @last_finalized_seq. The important thing is that the CPU that
> updates @last_finalized_seq has actually read the corresponding record
> beforehand. That is exactly what desc_update_last_finalized() does.

I probably did not describe it well. The CPU updating @last_finalized_seq
does the right thing. I was not sure about the CPU which reads
@last_finalized_seq via prb_next_seq().

To make it more clear:

u64 prb_next_seq(struct printk_ringbuffer *rb)
{
	u64 seq;

	seq = desc_last_finalized_seq(rb);
	      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
	      |
	      `-> This includes atomic_long_read_acquire(last_finalized_seq)


	if (seq != 0)
		seq++;

	while (_prb_read_valid(rb, &seq, NULL, NULL))
		seq++;

	return seq;
}

But where is the atomic_long_read_release(last_finalized_seq) in
this code path?

IMHO, the barrier provided by the acquire() is _important_ to make sure
that _prb_read_valid() would see the valid descriptor.

Now, I think that the related read_release(seq) is hidden in:

static int prb_read(struct printk_ringbuffer *rb, u64 seq,
		    struct printk_record *r, unsigned int *line_count)
{
	/* Get a local copy of the correct descriptor (if available). */
	err = desc_read_finalized_seq(desc_ring, id, seq, &desc);

	/* If requested, copy meta data. */
	if (r->info)
		memcpy(r->info, info, sizeof(*(r->info)));

	/* Copy text data. If it fails, this is a data-less record. */
	if (!copy_data(&rb->text_data_ring, &desc.text_blk_lpos, info->text_len,
		       r->text_buf, r->text_buf_size, line_count)) {
		return -ENOENT;
	}

	/* Ensure the record is still finalized and has the same @seq. */
	return desc_read_finalized_seq(desc_ring, id, seq, &desc);
	       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
	       |
	       `-> This includes a memory barrier /* LMM(desc_read:A) */
		   which makes sure that the data are read before
		   the desc/data could be reused.
}

I consider this /* LMM(desc_read:A) */ as a counterpart for that
acquire() in prb_next_seq().


Summary:

I saw atomic_long_read_acquire(last_finalized_seq) called from
prb_next_seq() code path. The barrier looked important to me.
But I saw neither the counter-part nor any comment. I wanted
to understand it because it might be important for reviewing
following patches which depend on prb_next_seq().

Does it make sense now, please?

Best Regards,
Petr
  
John Ogness Feb. 5, 2024, 11:33 a.m. UTC | #4
On 2024-01-15, Petr Mladek <pmladek@suse.com> wrote:
>> The acquire is with @last_finalized_seq. So the release must also be
>> with @last_finalized_seq. The important thing is that the CPU that
>> updates @last_finalized_seq has actually read the corresponding
>> record beforehand. That is exactly what desc_update_last_finalized()
>> does.
>
> I probably did not describe it well. The CPU updating
> @last_finalized_seq does the right thing. I was not sure about the CPU
> which reads @last_finalized_seq via prb_next_seq().
>
> To make it more clear:
>
> u64 prb_next_seq(struct printk_ringbuffer *rb)
> {
> 	u64 seq;
>
> 	seq = desc_last_finalized_seq(rb);
> 	      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
> 	      |
> 	      `-> This includes atomic_long_read_acquire(last_finalized_seq)
>
>
> 	if (seq != 0)
> 		seq++;
>
> 	while (_prb_read_valid(rb, &seq, NULL, NULL))
> 		seq++;
>
> 	return seq;
> }
>
> But where is the atomic_long_read_release(last_finalized_seq) in
> this code path?

read_release? The counterpart of this load_acquire is a
store_release. For example:

CPU0                     CPU1
====                     ====
load(varA)
store_release(varB)      load_acquire(varB)
                         load(varA)

If CPU1 reads the value in varB that CPU0 stored, then it is guaranteed
that CPU1 will read the value (or a later value) in varA that CPU0 read.

Translating the above example to this particular patch, we have:

CPU0: desc_update_last_finalized()       CPU1: prb_next_seq()
====                                     ====
_prb_read_valid(seq)
cmpxchg_release(last_finalized_seq,seq)  seq=read_acquire(last_finalized_seq)
                                         _prb_read_valid(seq)
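
The varA/varB example above can also be written as a small
self-contained kernel-C sketch (hypothetical cpu0()/cpu1()
functions and variables, not the ringbuffer code itself):

static int varA;
static atomic_t varB = ATOMIC_INIT(0);

/* CPU0 */
static int cpu0(void)
{
	int a = READ_ONCE(varA);	/* load(varA) */

	atomic_set_release(&varB, 1);	/* store_release(varB) */
	return a;
}

/* CPU1 */
static int cpu1(void)
{
	if (atomic_read_acquire(&varB) == 1) {	/* load_acquire(varB) */
		/* Guaranteed: the value cpu0() read (or a later one). */
		return READ_ONCE(varA);
	}
	return -1;
}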

> IMHO, the barrier provided by the acquire() is _important_ to make
> sure that _prb_read_valid() would see the valid descriptor.

Correct.

> Now, I think that the related read_release(seq) is hidden in:
>
> static int prb_read(struct printk_ringbuffer *rb, u64 seq,
> 		    struct printk_record *r, unsigned int *line_count)
> {
> 	/* Get a local copy of the correct descriptor (if available). */
> 	err = desc_read_finalized_seq(desc_ring, id, seq, &desc);
>
> 	/* If requested, copy meta data. */
> 	if (r->info)
> 		memcpy(r->info, info, sizeof(*(r->info)));
>
> 	/* Copy text data. If it fails, this is a data-less record. */
> 	if (!copy_data(&rb->text_data_ring, &desc.text_blk_lpos, info->text_len,
> 		       r->text_buf, r->text_buf_size, line_count)) {
> 		return -ENOENT;
> 	}
>
> 	/* Ensure the record is still finalized and has the same @seq. */
> 	return desc_read_finalized_seq(desc_ring, id, seq, &desc);
> 	       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
> 	       |
> 	       `-> This includes a memory barrier /* LMM(desc_read:A) */
> 		   which makes sure that the data are read before
> 		   the desc/data could be reused.
> }
>
> I consider this /* LMM(desc_read:A) */ as a counterpart for that
> acquire() in prb_next_seq().

desc_read:A is not a memory barrier. It only marks the load of the
descriptor state. This is a significant load because prb_next_seq() must
see at least the descriptor state that desc_update_last_finalized() saw.

The memory barrier comments in desc_update_last_finalized() state:

    * If desc_last_finalized_seq:A reads from
    * desc_update_last_finalized:A, then desc_read:A reads from
    * _prb_commit:B.

This is referring to a slightly different situation than the example I
used above because it is referencing where the descriptor state was
stored (_prb_commit:B). The same general picture is valid:

CPU0                              CPU1
====                              ====
_prb_commit:B
desc_update_last_finalized:A      desc_last_finalized_seq:A
                                  desc_read:A

desc_read:A is loading the descriptor state that _prb_commit:B stored.

The extra note in the comment clarifies that _prb_commit:B could also be
denoted as desc_read:A because desc_update_last_finalized() performs a
read (i.e. must have seen) _prb_commit:B.

    * Note: _prb_commit:B and desc_update_last_finalized:A can be
    *       different CPUs. However, the desc_update_last_finalized:A
    *       CPU (which performs the release) must have previously seen
    *       _prb_commit:B.

Normally the CPU committing the record will also update
last_finalized_seq. But it is possible that another CPU updates
last_finalized_seq before the committing CPU because it already sees the
finalized record. In that case the complete (maximally complex) picture
looks like this.

CPU0            CPU1                           CPU2
====            ====                           ====
_prb_commit:B   desc_read:A
                desc_update_last_finalized:A   desc_last_finalized_seq:A
                                               desc_read:A

Any memory barriers in _prb_commit() or desc_read() are irrelevant for
guaranteeing that a CPU reading a sequence value from
desc_last_finalized_seq() will always be able to read that record.

> Summary:
>
> I saw atomic_long_read_acquire(last_finalized_seq) called from
> prb_next_seq() code path. The barrier looked important to me.
> But I saw neither the counter-part nor any comment. I wanted
> to understand it because it might be important for reviewing
> following patches which depend on prb_next_seq().

desc_update_last_finalized:A is the counterpart to
desc_last_finalized_seq:A. IMHO there are plenty of comments that are
formally documenting these memory barriers. Including the new entry in
the summary of all memory barriers:

 *   desc_update_last_finalized:A / desc_last_finalized_seq:A
 *     store finalized record, then set new highest finalized sequence number

John
  
Petr Mladek Feb. 6, 2024, 5:27 p.m. UTC | #5
On Mon 2024-02-05 12:39:30, John Ogness wrote:
> On 2024-01-15, Petr Mladek <pmladek@suse.com> wrote:
> >> The acquire is with @last_finalized_seq. So the release must also be
> >> with @last_finalized_seq. The important thing is that the CPU that
> >> updates @last_finalized_seq has actually read the corresponding
> >> record beforehand. That is exactly what desc_update_last_finalized()
> >> does.
> >
> > I probably did not describe it well. The CPU updating
> > @last_finalized_seq does the right thing. I was not sure about the CPU
> > which reads @last_finalized_seq via prb_next_seq().
> >
> > To make it more clear:
> >
> > u64 prb_next_seq(struct printk_ringbuffer *rb)
> > {
> > 	u64 seq;
> >
> > 	seq = desc_last_finalized_seq(rb);
> > 	      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
> > 	      |
> > 	      `-> This includes atomic_long_read_acquire(last_finalized_seq)
> >
> >
> > 	if (seq != 0)
> > 		seq++;
> >
> > 	while (_prb_read_valid(rb, &seq, NULL, NULL))
> > 		seq++;
> >
> > 	return seq;
> > }
> >
> > But where is the atomic_long_read_release(last_finalized_seq) in
> > this code path?
> 
> read_release? The counterpart of this load_acquire is a
> store_release. For example:
> 
> CPU0                     CPU1
> ====                     ====
> load(varA)
> store_release(varB)      load_acquire(varB)
>                          load(varA)
> 
> If CPU1 reads the value in varB that CPU0 stored, then it is guaranteed
> that CPU1 will read the value (or a later value) in varA that CPU0 read.
> 
> Translating the above example to this particular patch, we have:
> 
> CPU0: desc_update_last_finalized()       CPU1: prb_next_seq()
> ====                                     ====
> _prb_read_valid(seq)
> cmpxchg_release(last_finalized_seq,seq)  seq=read_acquire(last_finalized_seq)
>                                          _prb_read_valid(seq)

I think that I was confused because I had acquire/release mentally
connected with lock/unlock, where the lock/unlock surrounds some
critical code section. I think that it is actually the most common
use case.

Our usage is not typical from my POV. There are two aspects:

   1. They do not surround a critical section, at least not
      an obvious one.

   2. In my mental model, this patch uses the release
      before the acquire. When I think about this code,
      I first imagine the write path (using release)
      and then the reader (using acquire).

I think that this code (mis)uses the acquire/release
semantics just for optimized memory barriers.

It might be worth a note that this is not a typical acquire/release
scenario.
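
For comparison, a minimal sketch of the two idioms (generic
kernel primitives with made-up variables, not the ringbuffer
code):

static DEFINE_SPINLOCK(lock);
static int shared;

/* Typical: acquire/release delimit a critical section. */
static void critical_section(void)
{
	spin_lock(&lock);	/* acquire */
	shared++;
	spin_unlock(&lock);	/* release */
}

static int payload;
static int flag;

/* Here: acquire/release only order a one-way producer/consumer handoff. */
static void writer(int value)
{
	WRITE_ONCE(payload, value);	/* make the data available... */
	smp_store_release(&flag, 1);	/* ...then release the flag */
}

static int reader(void)
{
	if (smp_load_acquire(&flag))		/* acquire the flag... */
		return READ_ONCE(payload);	/* ...data is visible */
	return 0;
}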

> > IMHO, the barrier provided by the acquire() is _important_ to make
> > sure that _prb_read_valid() would see the valid descriptor.
> 
> Correct.
> 
> > Now, I think that the related read_release(seq) is hidden in:
> >
> > static int prb_read(struct printk_ringbuffer *rb, u64 seq,
> > 		    struct printk_record *r, unsigned int *line_count)
> > {
> > 	/* Get a local copy of the correct descriptor (if available). */
> > 	err = desc_read_finalized_seq(desc_ring, id, seq, &desc);
> >
> > 	/* If requested, copy meta data. */
> > 	if (r->info)
> > 		memcpy(r->info, info, sizeof(*(r->info)));
> >
> > 	/* Copy text data. If it fails, this is a data-less record. */
> > 	if (!copy_data(&rb->text_data_ring, &desc.text_blk_lpos, info->text_len,
> > 		       r->text_buf, r->text_buf_size, line_count)) {
> > 		return -ENOENT;
> > 	}
> >
> > 	/* Ensure the record is still finalized and has the same @seq. */
> > 	return desc_read_finalized_seq(desc_ring, id, seq, &desc);
> > 	       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
> > 	       |
> > 	       `-> This includes a memory barrier /* LMM(desc_read:A) */
> > 		   which makes sure that the data are read before
> > 		   the desc/data could be reused.
> > }
> >
> > I consider this /* LMM(desc_read:A) */ as a counterpart for that
> > acquire() in prb_next_seq().
> 
> desc_read:A is not a memory barrier. It only marks the load of the
> descriptor state.

I see. The real read barriers are desc_read:B and desc_read:D.

> This is a significant load because prb_next_seq() must
> see at least the descriptor state that desc_update_last_finalized() saw.
> 
> The memory barrier comments in desc_update_last_finalized() state:
> 
>     * If desc_last_finalized_seq:A reads from
>     * desc_update_last_finalized:A, then desc_read:A reads from
>     * _prb_commit:B.
> 
> This is referring to a slightly different situation than the example I
> used above because it is referencing where the descriptor state was
> stored (_prb_commit:B). The same general picture is valid:
> 
> CPU0                              CPU1
> ====                              ====
> _prb_commit:B
> desc_update_last_finalized:A      desc_last_finalized_seq:A
>                                   desc_read:A
> 
> desc_read:A is loding the descriptor state that _prb_commit:B stored.
> 
> The extra note in the comment clarifies that _prb_commit:B could also be
> denoted as desc_read:A because desc_update_last_finalized() performs a
> read (i.e. must have seen) _prb_commit:B.
> 
>     * Note: _prb_commit:B and desc_update_last_finalized:A can be
>     *       different CPUs. However, the desc_update_last_finalized:A
>     *       CPU (which performs the release) must have previously seen
>     *       _prb_commit:B.
> 
> Normally the CPU committing the record will also update
> last_finalized_seq. But it is possible that another CPU updates
> last_finalized_seq before the committing CPU because it already sees the
> finalized record. In that case the complete (maximally complex) picture
> looks like this.
> 
> CPU0            CPU1                           CPU2
> ====            ====                           ====
> _prb_commit:B   desc_read:A
>                 desc_update_last_finalized:A   desc_last_finalized_seq:A
>                                                desc_read:A
>
> Any memory barriers in _prb_commit() or desc_read() are irrelevant for
> guaranteeing that a CPU reading a sequence value from
> desc_last_finalized_seq() will always be able to read that record.

I believe that they are relevant exactly because 3 CPUs might be
involved here. I believe that

  + _prb_commit:B makes sure that CPU0 stores all data before
    setting the state as finalized.

  + desc_read:B makes sure that CPU1 will see all data written
    when it reads the finalized state.

  + desc_update_last_finalized:A makes sure that the updating CPU
    saw the record with the given seq in the finalized state.

  + desc_last_finalized_seq:A makes sure that it sees the record
    associated with "last_finalized_seq" in the finalized state.

This is why the "Note:" is very important. And maybe desc_read:B
or desc_read:D should be mentioned in the note as well.

Also, all these dependencies are really hard to follow. This
is why I suggested adding another note in the 9th patch,
see https://lore.kernel.org/all/ZbowlkOVWiSgCxNX@alley/

Or maybe we should document that prb_read() and _prb_read_valid()
provide these guarantees and just reference it here.

In other words, IMHO the current "Note:" tries to
prove that _prb_read_valid() guarantees that all data
can be read when it sees the finalized state. IMHO,
we should document this above these functions and
just reference it here.

The current comments for desc_update_last_finalized:A
and prb_next_reserve_seq:A are really hard to follow
because they both try to explain these transitional
guarantees between prb_commit() and prb_read() APIs.
The comments mention many barriers even though the guarantees
should be there by design and should be mentioned
in the prb_read() API.

My motivation is that it took me a long time to understand this.
And I am still not sure if I understand it correctly.
IMHO, it might be better to describe some guarantees of
upper level API so that we do not need to look at
the low level barriers again and again.

Best Regards,
Petr
  

Patch

diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
index 49a82ccce8e9..04c26cca546f 100644
--- a/kernel/printk/printk_ringbuffer.c
+++ b/kernel/printk/printk_ringbuffer.c
@@ -6,6 +6,7 @@ 
 #include <linux/errno.h>
 #include <linux/bug.h>
 #include "printk_ringbuffer.h"
+#include "internal.h"
 
 /**
  * DOC: printk_ringbuffer overview
@@ -303,6 +304,9 @@ 
  *
  *   desc_push_tail:B / desc_reserve:D
  *     set descriptor reusable (state), then push descriptor tail (id)
+ *
+ *   desc_update_last_finalized:A / desc_last_finalized_seq:A
+ *     store finalized record, then set new highest finalized sequence number
  */
 
 #define DATA_SIZE(data_ring)		_DATA_SIZE((data_ring)->size_bits)
@@ -1441,20 +1445,118 @@  bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer
 	return false;
 }
 
+/*
+ * @last_finalized_seq value guarantees that all records up to and including
+ * this sequence number are finalized and can be read. The only exception are
+ * too old records which have already been overwritten.
+ *
+ * It is also guaranteed that @last_finalized_seq only increases.
+ *
+ * Be aware that finalized records following non-finalized records are not
+ * reported because they are not yet available to the reader. For example,
+ * a new record stored via printk() will not be available to a printer if
+ * it follows a record that has not been finalized yet. However, once that
+ * non-finalized record becomes finalized, @last_finalized_seq will be
+ * appropriately updated and the full set of finalized records will be
+ * available to the printer. And since each printk() caller will either
+ * directly print or trigger deferred printing of all available unprinted
+ * records, all printk() messages will get printed.
+ */
+static u64 desc_last_finalized_seq(struct printk_ringbuffer *rb)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	unsigned long ulseq;
+
+	/*
+	 * Guarantee the sequence number is loaded before loading the
+	 * associated record in order to guarantee that the record can be
+	 * seen by this CPU. This pairs with desc_update_last_finalized:A.
+	 */
+	ulseq = atomic_long_read_acquire(&desc_ring->last_finalized_seq
+					); /* LMM(desc_last_finalized_seq:A) */
+
+	return __ulseq_to_u64seq(rb, ulseq);
+}
+
+static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
+			    struct printk_record *r, unsigned int *line_count);
+
+/*
+ * Check if there are records directly following @last_finalized_seq that are
+ * finalized. If so, update @last_finalized_seq to the latest of these
+ * records. It is not allowed to skip over records that are not yet finalized.
+ */
+static void desc_update_last_finalized(struct printk_ringbuffer *rb)
+{
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
+	u64 old_seq = desc_last_finalized_seq(rb);
+	unsigned long oldval;
+	unsigned long newval;
+	u64 finalized_seq;
+	u64 try_seq;
+
+try_again:
+	finalized_seq = old_seq;
+	try_seq = finalized_seq + 1;
+
+	/* Try to find later finalized records. */
+	while (_prb_read_valid(rb, &try_seq, NULL, NULL)) {
+		finalized_seq = try_seq;
+		try_seq++;
+	}
+
+	/* No update needed if no later finalized record was found. */
+	if (finalized_seq == old_seq)
+		return;
+
+	oldval = __u64seq_to_ulseq(old_seq);
+	newval = __u64seq_to_ulseq(finalized_seq);
+
+	/*
+	 * Set the sequence number of a later finalized record that has been
+	 * seen.
+	 *
+	 * Guarantee the record data is visible to other CPUs before storing
+	 * its sequence number. This pairs with desc_last_finalized_seq:A.
+	 *
+	 * Memory barrier involvement:
+	 *
+	 * If desc_last_finalized_seq:A reads from
+	 * desc_update_last_finalized:A, then desc_read:A reads from
+	 * _prb_commit:B.
+	 *
+	 * Relies on:
+	 *
+	 * RELEASE from _prb_commit:B to desc_update_last_finalized:A
+	 *    matching
+	 * ACQUIRE from desc_last_finalized_seq:A to desc_read:A
+	 *
+	 * Note: _prb_commit:B and desc_update_last_finalized:A can be
+	 *       different CPUs. However, the desc_update_last_finalized:A
+	 *       CPU (which performs the release) must have previously seen
+	 *       _prb_commit:B.
+	 */
+	if (!atomic_long_try_cmpxchg_release(&desc_ring->last_finalized_seq,
+				&oldval, newval)) { /* LMM(desc_update_last_finalized:A) */
+		old_seq = __ulseq_to_u64seq(rb, oldval);
+		goto try_again;
+	}
+}
+
 /*
  * Attempt to finalize a specified descriptor. If this fails, the descriptor
  * is either already final or it will finalize itself when the writer commits.
  */
-static void desc_make_final(struct prb_desc_ring *desc_ring, unsigned long id)
+static void desc_make_final(struct printk_ringbuffer *rb, unsigned long id)
 {
+	struct prb_desc_ring *desc_ring = &rb->desc_ring;
 	unsigned long prev_state_val = DESC_SV(id, desc_committed);
 	struct prb_desc *d = to_desc(desc_ring, id);
 
-	atomic_long_cmpxchg_relaxed(&d->state_var, prev_state_val,
-			DESC_SV(id, desc_finalized)); /* LMM(desc_make_final:A) */
-
-	/* Best effort to remember the last finalized @id. */
-	atomic_long_set(&desc_ring->last_finalized_id, id);
+	if (atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
+			DESC_SV(id, desc_finalized))) { /* LMM(desc_make_final:A) */
+		desc_update_last_finalized(rb);
+	}
 }
 
 /**
@@ -1550,7 +1652,7 @@  bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
 	 * readers. (For seq==0 there is no previous descriptor.)
 	 */
 	if (info->seq > 0)
-		desc_make_final(desc_ring, DESC_ID(id - 1));
+		desc_make_final(rb, DESC_ID(id - 1));
 
 	r->text_buf = data_alloc(rb, r->text_buf_size, &d->text_blk_lpos, id);
 	/* If text data allocation fails, a data-less record is committed. */
@@ -1643,7 +1745,7 @@  void prb_commit(struct prb_reserved_entry *e)
 	 */
 	head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_commit:A) */
 	if (head_id != e->id)
-		desc_make_final(desc_ring, e->id);
+		desc_make_final(e->rb, e->id);
 }
 
 /**
@@ -1663,12 +1765,9 @@  void prb_commit(struct prb_reserved_entry *e)
  */
 void prb_final_commit(struct prb_reserved_entry *e)
 {
-	struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
-
 	_prb_commit(e, desc_finalized);
 
-	/* Best effort to remember the last finalized @id. */
-	atomic_long_set(&desc_ring->last_finalized_id, e->id);
+	desc_update_last_finalized(e->rb);
 }
 
 /*
@@ -2008,7 +2107,9 @@  u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
  * newest sequence number available to readers will be.
  *
  * This provides readers a sequence number to jump to if all currently
- * available records should be skipped.
+ * available records should be skipped. It is guaranteed that all records
+ * previous to the returned value have been finalized and are (or were)
+ * available to the reader.
  *
  * Context: Any context.
  * Return: The sequence number of the next newest (not yet available) record
@@ -2016,34 +2117,19 @@  u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
  */
 u64 prb_next_seq(struct printk_ringbuffer *rb)
 {
-	struct prb_desc_ring *desc_ring = &rb->desc_ring;
-	enum desc_state d_state;
-	unsigned long id;
 	u64 seq;
 
-	/* Check if the cached @id still points to a valid @seq. */
-	id = atomic_long_read(&desc_ring->last_finalized_id);
-	d_state = desc_read(desc_ring, id, NULL, &seq, NULL);
+	seq = desc_last_finalized_seq(rb);
 
-	if (d_state == desc_finalized || d_state == desc_reusable) {
-		/*
-		 * Begin searching after the last finalized record.
-		 *
-		 * On 0, the search must begin at 0 because of hack#2
-		 * of the bootstrapping phase it is not known if a
-		 * record at index 0 exists.
-		 */
-		if (seq != 0)
-			seq++;
-	} else {
-		/*
-		 * The information about the last finalized sequence number
-		 * has gone. It should happen only when there is a flood of
-		 * new messages and the ringbuffer is rapidly recycled.
-		 * Give up and start from the beginning.
-		 */
-		seq = 0;
-	}
+	/*
+	 * Begin searching after the last finalized record.
+	 *
+	 * On 0, the search must begin at 0 because of hack#2
+	 * of the bootstrapping phase it is not known if a
+	 * record at index 0 exists.
+	 */
+	if (seq != 0)
+		seq++;
 
 	/*
 	 * The information about the last finalized @seq might be inaccurate.
@@ -2085,7 +2171,7 @@  void prb_init(struct printk_ringbuffer *rb,
 	rb->desc_ring.infos = infos;
 	atomic_long_set(&rb->desc_ring.head_id, DESC0_ID(descbits));
 	atomic_long_set(&rb->desc_ring.tail_id, DESC0_ID(descbits));
-	atomic_long_set(&rb->desc_ring.last_finalized_id, DESC0_ID(descbits));
+	atomic_long_set(&rb->desc_ring.last_finalized_seq, 0);
 
 	rb->text_data_ring.size_bits = textbits;
 	rb->text_data_ring.data = text_buf;
diff --git a/kernel/printk/printk_ringbuffer.h b/kernel/printk/printk_ringbuffer.h
index ee294aaf4aeb..2d948cc82b5b 100644
--- a/kernel/printk/printk_ringbuffer.h
+++ b/kernel/printk/printk_ringbuffer.h
@@ -75,7 +75,7 @@  struct prb_desc_ring {
 	struct printk_info	*infos;
 	atomic_long_t		head_id;
 	atomic_long_t		tail_id;
-	atomic_long_t		last_finalized_id;
+	atomic_long_t		last_finalized_seq;
 };
 
 /*
@@ -259,7 +259,7 @@  static struct printk_ringbuffer name = {							\
 		.infos		= &_##name##_infos[0],						\
 		.head_id	= ATOMIC_INIT(DESC0_ID(descbits)),				\
 		.tail_id	= ATOMIC_INIT(DESC0_ID(descbits)),				\
-		.last_finalized_id = ATOMIC_INIT(DESC0_ID(descbits)),				\
+		.last_finalized_seq = ATOMIC_INIT(0),						\
 	},											\
 	.text_data_ring = {									\
 		.size_bits	= (avgtextbits) + (descbits),					\