[printk,v1,10/18] printk: nobkl: Add emit function and callback functions for atomic printing

Message ID 20230302195618.156940-11-john.ogness@linutronix.de
State New
Headers
Series threaded/atomic console support |

Commit Message

John Ogness March 2, 2023, 7:56 p.m. UTC
  From: Thomas Gleixner <tglx@linutronix.de>

Implement an emit function for non-BKL consoles to output printk
messages. It utilizes the lockless printk_get_next_message() and
console_prepend_dropped() functions to retrieve/build the output
message. The emit function includes the required safety points to
check for handover/takeover and calls a new write_atomic callback
of the console driver to output the message. It also includes proper
handling for updating the non-BKL console sequence number.

Co-developed-by: John Ogness <john.ogness@linutronix.de>
Signed-off-by: John Ogness <john.ogness@linutronix.de>
Signed-off-by: Thomas Gleixner (Intel) <tglx@linutronix.de>
---
 include/linux/console.h      |   8 +++
 kernel/printk/internal.h     |   9 +++
 kernel/printk/printk.c       |  12 ++--
 kernel/printk/printk_nobkl.c | 121 ++++++++++++++++++++++++++++++++++-
 4 files changed, 141 insertions(+), 9 deletions(-)
  

Comments

kernel test robot March 3, 2023, 12:19 a.m. UTC | #1
Hi John,

I love your patch! Perhaps something to improve:

[auto build test WARNING on 10d639febe5629687dac17c4a7500a96537ce11a]

url:    https://github.com/intel-lab-lkp/linux/commits/John-Ogness/kdb-do-not-assume-write-callback-available/20230303-040039
base:   10d639febe5629687dac17c4a7500a96537ce11a
patch link:    https://lore.kernel.org/r/20230302195618.156940-11-john.ogness%40linutronix.de
patch subject: [PATCH printk v1 10/18] printk: nobkl: Add emit function and callback functions for atomic printing
config: nios2-buildonly-randconfig-r004-20230302 (https://download.01.org/0day-ci/archive/20230303/202303030859.j7DLimWU-lkp@intel.com/config)
compiler: nios2-linux-gcc (GCC) 12.1.0
reproduce (this is a W=1 build):
        wget https://raw.githubusercontent.com/intel/lkp-tests/master/sbin/make.cross -O ~/bin/make.cross
        chmod +x ~/bin/make.cross
        # https://github.com/intel-lab-lkp/linux/commit/cae46beabb2dfe79a4c4c602601fa538a8d840f7
        git remote add linux-review https://github.com/intel-lab-lkp/linux
        git fetch --no-tags linux-review John-Ogness/kdb-do-not-assume-write-callback-available/20230303-040039
        git checkout cae46beabb2dfe79a4c4c602601fa538a8d840f7
        # save the config file
        mkdir build_dir && cp config build_dir/.config
        COMPILER_INSTALL_PATH=$HOME/0day COMPILER=gcc-12.1.0 make.cross W=1 O=build_dir ARCH=nios2 olddefconfig
        COMPILER_INSTALL_PATH=$HOME/0day COMPILER=gcc-12.1.0 make.cross W=1 O=build_dir ARCH=nios2 SHELL=/bin/bash kernel/printk/

If you fix the issue, kindly add following tag where applicable
| Reported-by: kernel test robot <lkp@intel.com>
| Link: https://lore.kernel.org/oe-kbuild-all/202303030859.j7DLimWU-lkp@intel.com/

All warnings (new ones prefixed by >>):

>> kernel/printk/printk.c:2841:6: warning: no previous prototype for 'printk_get_next_message' [-Wmissing-prototypes]
    2841 | bool printk_get_next_message(struct printk_message *pmsg, u64 seq,
         |      ^~~~~~~~~~~~~~~~~~~~~~~


vim +/printk_get_next_message +2841 kernel/printk/printk.c

  2821	
  2822	/*
  2823	 * Read and format the specified record (or a later record if the specified
  2824	 * record is not available).
  2825	 *
  2826	 * @pmsg will contain the formatted result. @pmsg->pbufs must point to a
  2827	 * struct printk_buffers.
  2828	 *
  2829	 * @seq is the record to read and format. If it is not available, the next
  2830	 * valid record is read.
  2831	 *
  2832	 * @is_extended specifies if the message should be formatted for extended
  2833	 * console output.
  2834	 *
  2835	 * @may_supress specifies if records may be skipped based on loglevel.
  2836	 *
  2837	 * Returns false if no record is available. Otherwise true and all fields
  2838	 * of @pmsg are valid. (See the documentation of struct printk_message
  2839	 * for information about the @pmsg fields.)
  2840	 */
> 2841	bool printk_get_next_message(struct printk_message *pmsg, u64 seq,
  2842				     bool is_extended, bool may_suppress)
  2843	{
  2844		static int panic_console_dropped;
  2845	
  2846		struct printk_buffers *pbufs = pmsg->pbufs;
  2847		const size_t scratchbuf_sz = sizeof(pbufs->scratchbuf);
  2848		const size_t outbuf_sz = sizeof(pbufs->outbuf);
  2849		char *scratchbuf = &pbufs->scratchbuf[0];
  2850		char *outbuf = &pbufs->outbuf[0];
  2851		struct printk_info info;
  2852		struct printk_record r;
  2853		size_t len = 0;
  2854	
  2855		/*
  2856		 * Formatting extended messages requires a separate buffer, so use the
  2857		 * scratch buffer to read in the ringbuffer text.
  2858		 *
  2859		 * Formatting normal messages is done in-place, so read the ringbuffer
  2860		 * text directly into the output buffer.
  2861		 */
  2862		if (is_extended)
  2863			prb_rec_init_rd(&r, &info, scratchbuf, scratchbuf_sz);
  2864		else
  2865			prb_rec_init_rd(&r, &info, outbuf, outbuf_sz);
  2866	
  2867		if (!prb_read_valid(prb, seq, &r))
  2868			return false;
  2869	
  2870		pmsg->seq = r.info->seq;
  2871		pmsg->dropped = r.info->seq - seq;
  2872	
  2873		/*
  2874		 * Check for dropped messages in panic here so that printk
  2875		 * suppression can occur as early as possible if necessary.
  2876		 */
  2877		if (pmsg->dropped &&
  2878		    panic_in_progress() &&
  2879		    panic_console_dropped++ > 10) {
  2880			suppress_panic_printk = 1;
  2881			pr_warn_once("Too many dropped messages. Suppress messages on non-panic CPUs to prevent livelock.\n");
  2882		}
  2883	
  2884		/* Skip record that has level above the console loglevel. */
  2885		if (may_suppress && suppress_message_printing(r.info->level))
  2886			goto out;
  2887	
  2888		if (is_extended) {
  2889			len = info_print_ext_header(outbuf, outbuf_sz, r.info);
  2890			len += msg_print_ext_body(outbuf + len, outbuf_sz - len,
  2891						  &r.text_buf[0], r.info->text_len, &r.info->dev_info);
  2892		} else {
  2893			len = record_print_text(&r, console_msg_format & MSG_FORMAT_SYSLOG, printk_time);
  2894		}
  2895	out:
  2896		pmsg->outbuf_len = len;
  2897		return true;
  2898	}
  2899
  
John Ogness March 3, 2023, 10:55 a.m. UTC | #2
On 2023-03-03, kernel test robot <lkp@intel.com> wrote:
>>> kernel/printk/printk.c:2841:6: warning: no previous prototype for 'printk_get_next_message' [-Wmissing-prototypes]
>     2841 | bool printk_get_next_message(struct printk_message *pmsg, u64 seq,
>          |      ^~~~~~~~~~~~~~~~~~~~~~~

This function needs to be declared for !CONFIG_PRINTK as well.

diff --git a/kernel/printk/internal.h b/kernel/printk/internal.h
index 8856beed65da..60d6bf18247e 100644
--- a/kernel/printk/internal.h
+++ b/kernel/printk/internal.h
@@ -188,10 +188,11 @@ struct cons_context_data {
 	struct printk_buffers		pbufs;
 };
 
-#ifdef CONFIG_PRINTK
-
 bool printk_get_next_message(struct printk_message *pmsg, u64 seq,
 			     bool is_extended, bool may_supress);
+
+#ifdef CONFIG_PRINTK
+
 void console_prepend_dropped(struct printk_message *pmsg,
 			     unsigned long dropped);
  
Petr Mladek March 31, 2023, 10:29 a.m. UTC | #3
On Thu 2023-03-02 21:02:10, John Ogness wrote:
> From: Thomas Gleixner <tglx@linutronix.de>
> 
> Implement an emit function for non-BKL consoles to output printk
> messages. It utilizes the lockless printk_get_next_message() and
> console_prepend_dropped() functions to retrieve/build the output
> message. The emit function includes the required safety points to
> check for handover/takeover and calls a new write_atomic callback
> of the console driver to output the message. It also includes proper
> handling for updating the non-BKL console sequence number.
> 
> --- a/kernel/printk/printk_nobkl.c
> +++ b/kernel/printk/printk_nobkl.c
> @@ -1086,6 +1086,123 @@ bool console_exit_unsafe(struct cons_write_context *wctxt)
>  	return __console_update_unsafe(wctxt, false);
>  }
>  
> +/**
> + * cons_get_record - Fill the buffer with the next pending ringbuffer record
> + * @wctxt:	The write context which will be handed to the write function
> + *
> + * Returns:	True if there are records available. If the next record should
> + *		be printed, the output buffer is filled and @wctxt->outbuf
> + *		points to the text to print. If @wctxt->outbuf is NULL after
> + *		the call, the record should not be printed but the caller must
> + *		still update the console sequence number.
> + *
> + *		False means that there are no pending records anymore and the
> + *		printing can stop.
> + */
> +static bool cons_get_record(struct cons_write_context *wctxt)
> +{
> +	struct cons_context *ctxt = &ACCESS_PRIVATE(wctxt, ctxt);
> +	struct console *con = ctxt->console;
> +	bool is_extended = console_srcu_read_flags(con) & CON_EXTENDED;
> +	struct printk_message pmsg = {
> +		.pbufs = ctxt->pbufs,
> +	};
> +
> +	if (!printk_get_next_message(&pmsg, ctxt->newseq, is_extended, true))
> +		return false;
> +
> +	ctxt->newseq = pmsg.seq;
> +	ctxt->dropped += pmsg.dropped;
> +
> +	if (pmsg.outbuf_len == 0) {
> +		wctxt->outbuf = NULL;
> +	} else {
> +		if (ctxt->dropped && !is_extended)
> +			console_prepend_dropped(&pmsg, ctxt->dropped);
> +		wctxt->outbuf = &pmsg.pbufs->outbuf[0];
> +	}
> +
> +	wctxt->len = pmsg.outbuf_len;

This function seems to be needed only because we duplicate the information
in both struct printk_message and struct cons_write_context.

I think that we will not need this function at all if we bundle
struct printk_message into struct cons_context. I mean to replace:

struct cons_context {
	[...]
	struct printk_buffers	*pbufs;
	u64			newseq;
	unsigned long		dropped;
	[...]
}

with

struct cons_context {
	[...]
	struct printk_message pmsg;
	[...]
}

> +
> +	return true;
> +}
> +
> +/**
> + * cons_emit_record - Emit record in the acquired context
> + * @wctxt:	The write context that will be handed to the write function
> + *
> + * Returns:	False if the operation was aborted (takeover or handover).
> + *		True otherwise
> + *
> + * When false is returned, the caller is not allowed to touch console state.
> + * The console is owned by someone else. If the caller wants to print more
> + * it has to reacquire the console first.
> + *
> + * When true is returned, @wctxt->ctxt.backlog indicates whether there are
> + * still records pending in the ringbuffer,
> + */
> +static int __maybe_unused cons_emit_record(struct cons_write_context *wctxt)
> +{
> +	struct cons_context *ctxt = &ACCESS_PRIVATE(wctxt, ctxt);
> +	struct console *con = ctxt->console;
> +	bool done = false;
> +
> +	/*
> +	 * @con->dropped is not protected in case of hostile takeovers so
> +	 * the update below is racy. Annotate it accordingly.
> +	 */
> +	ctxt->dropped = data_race(READ_ONCE(con->dropped));
> +
> +	/* Fill the output buffer with the next record */
> +	ctxt->backlog = cons_get_record(wctxt);
> +	if (!ctxt->backlog)
> +		return true;
> +
> +	/* Safety point. Don't touch state in case of takeover */
> +	if (!console_can_proceed(wctxt))
> +		return false;
> +
> +	/* Counterpart to the read above */
> +	WRITE_ONCE(con->dropped, ctxt->dropped);

These racy hacks with ctxt-> dropped won't be needed if we bundle
strcut printk_message into struct cons_context.

> +
> +	/*
> +	 * In case of skipped records, Update sequence state in @con.
> +	 */
> +	if (!wctxt->outbuf)
> +		goto update;
> +
> +	/* Tell the driver about potential unsafe state */
> +	wctxt->unsafe = ctxt->state.unsafe;
> +
> +	if (!ctxt->thread && con->write_atomic) {
> +		done = con->write_atomic(con, wctxt);
> +	} else {
> +		cons_release(ctxt);
> +		WARN_ON_ONCE(1);
> +		return false;
> +	}
> +
> +	/* If not done, the write was aborted due to takeover */
> +	if (!done)
> +		return false;
> +
> +	/* If there was a dropped message, it has now been output. */
> +	if (ctxt->dropped) {
> +		ctxt->dropped = 0;
> +		/* Counterpart to the read above */
> +		WRITE_ONCE(con->dropped, ctxt->dropped);

I suggest to use atomic_t for con->dropped and use

		atomic_sub(ctxt->dropped, &con->dropped);

> +	}
> +update:
> +	ctxt->newseq++;
> +	/*
> +	 * The sequence update attempt is not part of console_release()
> +	 * because in panic situations the console is not released by
> +	 * the panic CPU until all records are written. On 32bit the
> +	 * sequence is separate from state anyway.
> +	 */
> +	return cons_seq_try_update(ctxt);
> +}

Best Regards,
Petr
  
Petr Mladek March 31, 2023, 10:36 a.m. UTC | #4
On Thu 2023-03-02 21:02:10, John Ogness wrote:
> From: Thomas Gleixner <tglx@linutronix.de>
> 
> Implement an emit function for non-BKL consoles to output printk
> messages. It utilizes the lockless printk_get_next_message() and
> console_prepend_dropped() functions to retrieve/build the output
> message. The emit function includes the required safety points to
> check for handover/takeover and calls a new write_atomic callback
> of the console driver to output the message. It also includes proper
> handling for updating the non-BKL console sequence number.
> 
> --- a/kernel/printk/printk_nobkl.c
> +++ b/kernel/printk/printk_nobkl.c
> +/**
> + * cons_emit_record - Emit record in the acquired context
> + * @wctxt:	The write context that will be handed to the write function
> + *
> + * Returns:	False if the operation was aborted (takeover or handover).
> + *		True otherwise
> + *
> + * When false is returned, the caller is not allowed to touch console state.
> + * The console is owned by someone else. If the caller wants to print more
> + * it has to reacquire the console first.
> + *
> + * When true is returned, @wctxt->ctxt.backlog indicates whether there are
> + * still records pending in the ringbuffer,

This is inconsistent and a bit confusing. This seems to be the only
function returning "true" when there is no pending output.

All the other functions cons_get_record(), console_emit_next_record(),
and printk_get_next_message() return false in this case.

It has to distinguish 3 different return states anyway, same as
console_emit_next_record(). I suggest to use the same semantic
and distinguish "no pending records" and "handed over lock"
via a "handover" flag. Or maybe the caller should just check
if it still owns the lock.

> + */
> +static int __maybe_unused cons_emit_record(struct cons_write_context *wctxt)
> +{
> +	struct cons_context *ctxt = &ACCESS_PRIVATE(wctxt, ctxt);
> +	struct console *con = ctxt->console;
> +	bool done = false;
> +
> +	/*
> +	 * @con->dropped is not protected in case of hostile takeovers so
> +	 * the update below is racy. Annotate it accordingly.
> +	 */
> +	ctxt->dropped = data_race(READ_ONCE(con->dropped));
> +
> +	/* Fill the output buffer with the next record */
> +	ctxt->backlog = cons_get_record(wctxt);
> +	if (!ctxt->backlog)
> +		return true;
> +
> +	/* Safety point. Don't touch state in case of takeover */
> +	if (!console_can_proceed(wctxt))
> +		return false;
> +
> +	/* Counterpart to the read above */
> +	WRITE_ONCE(con->dropped, ctxt->dropped);
> +
> +	/*
> +	 * In case of skipped records, Update sequence state in @con.
> +	 */
> +	if (!wctxt->outbuf)
> +		goto update;
> +
> +	/* Tell the driver about potential unsafe state */
> +	wctxt->unsafe = ctxt->state.unsafe;
> +
> +	if (!ctxt->thread && con->write_atomic) {

I would expect this check in console_is_usable(), same as for legacy
consoles.

And what is actually the difference between con->write_atomic()
and con->write_thread(), where write_thread() is added later
in 11th patch?

I guess that the motivation is that the kthread variant
might sleep. But I do not see it described anywhere.

Do we really need two callbacks? I would expect that the code
would be basically the same.

Maybe, the callback could call cond_resched() when running
in kthread but this information might be passed via a flag.

Or is this a preparation for tty code where the implementation
would be really different?

> +		done = con->write_atomic(con, wctxt);
> +	} else {
> +		cons_release(ctxt);
> +		WARN_ON_ONCE(1);
> +		return false;
> +	}

Best Regards,
Petr
  
Petr Mladek April 4, 2023, 2:09 p.m. UTC | #5
On Mon 2023-04-03 21:17:26, John Ogness wrote:
> On 2023-04-03, Petr Mladek <pmladek@suse.com> wrote:
> >> The main difference is that the kthread variant is invoked _only_
> >> from the kthread printer. It may or may not mean that the callback
> >> can sleep. (That depends on how the console implements the
> >> port_lock() callback.) But the function can be certain that it wasn't
> >> called from any bizarre interrupt/nmi/scheduler contexts.
> >> 
> >> The atomic callback can be called from anywhere! Including when it
> >> was already inside the atomic callback function! That probably
> >> requires much more careful coding than in the kthread case.
> >
> > Is it just about coding? Or is it also about making write_khtread()
> > better schedulable, please?
> 
> For UARTs there probably isn't much of a difference because most disable
> interrupts anyway. A diff in the latest version [0] of the 8250 nbcon
> console between serial8250_console_write_atomic() and
> serial8250_console_write_thread() shows no significant difference in the
> two except that the atomic variant may prefix with a newline if it
> interrupted a printing context.
> 
> But for vtcon and netconsole I expect there would be a very significant
> difference. For vtcon (and possibly netconsole) I expect there will be
> no atomic implementation at all.

Then these consoles might need another solution for the panic()
situation, like blocking the kthread and switching to the legacy
mode.

OK, so it might really make sense to have a separate callback
for the kthread and emergency/panic contexts.


> > Hmm, it is very questional if the callbacks should be optional.
> >
> > Do we really want to allow introducing non-blocking consoles without
> > the way to print emergency and panic messages? Such a console would
> > not be fully-flegged replacement of the legacy mode.
> 
> Not necessarily. For vtcon we are "planning" on a BSoD, probably based
> on the kmsg_dump interface. For netconsole it could be similar.
> 
> We are trying to give drivers an opportunity to implement some safety
> and control to maximize the chances of getting a dump out without
> jeopardizing other panic functions.
> 
> As a quick implementation a UART driver could simply set @unsafe upon
> entrace of write_thread() and clear it on exit. Then its write_atomic()
> would only be called at the very end of panic() if the panic happened
> during printing. For its write_atomic() implementation it could be a
> NOOP if !oops_in_progress. All locking is handled with the port_lock()
> callback, which is only called from the kthread context. It isn't
> particularly pretty, but it most likely would be more reliable than what
> we have now.

If I get it correctly, the above scenario requires both
write_kthread() and write_atomic(). Otherwise, it would not be
able to print in panic() at all. Right, please?


> > What about making write_atomic() mandatory and write_kthread()
> > optional?
> 
> I doubt there will ever be a write_atomic() for vtcon. BSoD based on
> kmsg_dump is a simpler approach.

But we would need to add an infrastructure for the BSoD(). For example,
call yet another callback at the end of panic(). Also it does not mean
that we might completely give up on printk() messages in panic().

Anyway, this might be solved later. I would really like to enforce
having both callbacks and good enough solution for now. It might later
be updated to another good enough solution using another panic() mode.


> > write_atomic() would be used in the kthread when write_kthread()
> > is not defined. write_kthread() would allow to create an alternative
> > implementation that might work better in the well defined kthread
> > context.
> 
> write_atomic() is the difficult callback to implement. It certainly
> could be used in the write_thread() case if no write_thread() was
> provided. But I still think there are valid cases to have a
> write_thread() implementation without a write_atomic().

The proposed framework does not provide a solution for consoles
that can't implement write_atomic(). And ignoring the messages
in panic() is not acceptable.

Either we need to enforce another good enough solution for these
consoles. Or we must not allow them now. We could update the logic
later when we see how the BSoD really looks and works.

Best Regards,
Petr
  

Patch

diff --git a/include/linux/console.h b/include/linux/console.h
index 0779757cb917..15f71ccfcd9d 100644
--- a/include/linux/console.h
+++ b/include/linux/console.h
@@ -250,10 +250,12 @@  struct printk_buffers;
  * @newseq:		The sequence number for progress
  * @prio:		Priority of the context
  * @pbufs:		Pointer to the text buffer for this context
+ * @dropped:		Dropped counter for the current context
  * @thread:		The acquire is printk thread context
  * @hostile:		Hostile takeover requested. Cleared on normal
  *			acquire or friendly handover
  * @spinwait:		Spinwait on acquire if possible
+ * @backlog:		Ringbuffer has pending records
  */
 struct cons_context {
 	struct console		*console;
@@ -266,9 +268,11 @@  struct cons_context {
 	unsigned int		spinwait_max_us;
 	enum cons_prio		prio;
 	struct printk_buffers	*pbufs;
+	unsigned long		dropped;
 	unsigned int		thread		: 1;
 	unsigned int		hostile		: 1;
 	unsigned int		spinwait	: 1;
+	unsigned int		backlog		: 1;
 };
 
 /**
@@ -310,6 +314,7 @@  struct cons_context_data;
  * @atomic_state:	State array for NOBKL consoles; real and handover
  * @atomic_seq:		Sequence for record tracking (32bit only)
  * @thread_pbufs:	Pointer to thread private buffer
+ * @write_atomic:	Write callback for atomic context
  * @pcpu_data:		Pointer to percpu context data
  */
 struct console {
@@ -337,6 +342,9 @@  struct console {
 	atomic_t		__private atomic_seq;
 #endif
 	struct printk_buffers	*thread_pbufs;
+
+	bool (*write_atomic)(struct console *con, struct cons_write_context *wctxt);
+
 	struct cons_context_data	__percpu *pcpu_data;
 };
 
diff --git a/kernel/printk/internal.h b/kernel/printk/internal.h
index 15a412065327..13dd0ce23c37 100644
--- a/kernel/printk/internal.h
+++ b/kernel/printk/internal.h
@@ -133,3 +133,12 @@  struct printk_message {
 struct cons_context_data {
 	struct printk_buffers		pbufs;
 };
+
+#ifdef CONFIG_PRINTK
+
+bool printk_get_next_message(struct printk_message *pmsg, u64 seq,
+			     bool is_extended, bool may_supress);
+void console_prepend_dropped(struct printk_message *pmsg,
+			     unsigned long dropped);
+
+#endif
diff --git a/kernel/printk/printk.c b/kernel/printk/printk.c
index 21b31183ff2b..eab0358baa6f 100644
--- a/kernel/printk/printk.c
+++ b/kernel/printk/printk.c
@@ -715,9 +715,6 @@  static ssize_t msg_print_ext_body(char *buf, size_t size,
 	return len;
 }
 
-static bool printk_get_next_message(struct printk_message *pmsg, u64 seq,
-				    bool is_extended, bool may_supress);
-
 /* /dev/kmsg - userspace message inject/listen interface */
 struct devkmsg_user {
 	atomic64_t seq;
@@ -2786,7 +2783,7 @@  static void __console_unlock(void)
  * If @pmsg->pbufs->outbuf is modified, @pmsg->outbuf_len is updated.
  */
 #ifdef CONFIG_PRINTK
-static void console_prepend_dropped(struct printk_message *pmsg, unsigned long dropped)
+void console_prepend_dropped(struct printk_message *pmsg, unsigned long dropped)
 {
 	struct printk_buffers *pbufs = pmsg->pbufs;
 	const size_t scratchbuf_sz = sizeof(pbufs->scratchbuf);
@@ -2818,7 +2815,8 @@  static void console_prepend_dropped(struct printk_message *pmsg, unsigned long d
 	pmsg->outbuf_len += len;
 }
 #else
-#define console_prepend_dropped(pmsg, dropped)
+static inline void console_prepend_dropped(struct printk_message *pmsg,
+					   unsigned long dropped) { }
 #endif /* CONFIG_PRINTK */
 
 /*
@@ -2840,8 +2838,8 @@  static void console_prepend_dropped(struct printk_message *pmsg, unsigned long d
  * of @pmsg are valid. (See the documentation of struct printk_message
  * for information about the @pmsg fields.)
  */
-static bool printk_get_next_message(struct printk_message *pmsg, u64 seq,
-				    bool is_extended, bool may_suppress)
+bool printk_get_next_message(struct printk_message *pmsg, u64 seq,
+			     bool is_extended, bool may_suppress)
 {
 	static int panic_console_dropped;
 
diff --git a/kernel/printk/printk_nobkl.c b/kernel/printk/printk_nobkl.c
index 3318a79a150a..5c591bced1be 100644
--- a/kernel/printk/printk_nobkl.c
+++ b/kernel/printk/printk_nobkl.c
@@ -317,7 +317,7 @@  static void cons_context_set_seq(struct cons_context *ctxt)
  * invalid. Caller has to reacquire the console.
  */
 #ifdef CONFIG_64BIT
-static bool __maybe_unused cons_seq_try_update(struct cons_context *ctxt)
+static bool cons_seq_try_update(struct cons_context *ctxt)
 {
 	struct console *con = ctxt->console;
 	struct cons_state old;
@@ -346,7 +346,7 @@  static bool __maybe_unused cons_seq_try_update(struct cons_context *ctxt)
 }
 #else
 static bool cons_release(struct cons_context *ctxt);
-static bool __maybe_unused cons_seq_try_update(struct cons_context *ctxt)
+static bool cons_seq_try_update(struct cons_context *ctxt)
 {
 	struct console *con = ctxt->console;
 	struct cons_state state;
@@ -1086,6 +1086,123 @@  bool console_exit_unsafe(struct cons_write_context *wctxt)
 	return __console_update_unsafe(wctxt, false);
 }
 
+/**
+ * cons_get_record - Fill the buffer with the next pending ringbuffer record
+ * @wctxt:	The write context which will be handed to the write function
+ *
+ * Returns:	True if there are records available. If the next record should
+ *		be printed, the output buffer is filled and @wctxt->outbuf
+ *		points to the text to print. If @wctxt->outbuf is NULL after
+ *		the call, the record should not be printed but the caller must
+ *		still update the console sequence number.
+ *
+ *		False means that there are no pending records anymore and the
+ *		printing can stop.
+ */
+static bool cons_get_record(struct cons_write_context *wctxt)
+{
+	struct cons_context *ctxt = &ACCESS_PRIVATE(wctxt, ctxt);
+	struct console *con = ctxt->console;
+	bool is_extended = console_srcu_read_flags(con) & CON_EXTENDED;
+	struct printk_message pmsg = {
+		.pbufs = ctxt->pbufs,
+	};
+
+	if (!printk_get_next_message(&pmsg, ctxt->newseq, is_extended, true))
+		return false;
+
+	ctxt->newseq = pmsg.seq;
+	ctxt->dropped += pmsg.dropped;
+
+	if (pmsg.outbuf_len == 0) {
+		wctxt->outbuf = NULL;
+	} else {
+		if (ctxt->dropped && !is_extended)
+			console_prepend_dropped(&pmsg, ctxt->dropped);
+		wctxt->outbuf = &pmsg.pbufs->outbuf[0];
+	}
+
+	wctxt->len = pmsg.outbuf_len;
+
+	return true;
+}
+
+/**
+ * cons_emit_record - Emit record in the acquired context
+ * @wctxt:	The write context that will be handed to the write function
+ *
+ * Returns:	False if the operation was aborted (takeover or handover).
+ *		True otherwise
+ *
+ * When false is returned, the caller is not allowed to touch console state.
+ * The console is owned by someone else. If the caller wants to print more
+ * it has to reacquire the console first.
+ *
+ * When true is returned, @wctxt->ctxt.backlog indicates whether there are
+ * still records pending in the ringbuffer,
+ */
+static int __maybe_unused cons_emit_record(struct cons_write_context *wctxt)
+{
+	struct cons_context *ctxt = &ACCESS_PRIVATE(wctxt, ctxt);
+	struct console *con = ctxt->console;
+	bool done = false;
+
+	/*
+	 * @con->dropped is not protected in case of hostile takeovers so
+	 * the update below is racy. Annotate it accordingly.
+	 */
+	ctxt->dropped = data_race(READ_ONCE(con->dropped));
+
+	/* Fill the output buffer with the next record */
+	ctxt->backlog = cons_get_record(wctxt);
+	if (!ctxt->backlog)
+		return true;
+
+	/* Safety point. Don't touch state in case of takeover */
+	if (!console_can_proceed(wctxt))
+		return false;
+
+	/* Counterpart to the read above */
+	WRITE_ONCE(con->dropped, ctxt->dropped);
+
+	/*
+	 * In case of skipped records, Update sequence state in @con.
+	 */
+	if (!wctxt->outbuf)
+		goto update;
+
+	/* Tell the driver about potential unsafe state */
+	wctxt->unsafe = ctxt->state.unsafe;
+
+	if (!ctxt->thread && con->write_atomic) {
+		done = con->write_atomic(con, wctxt);
+	} else {
+		cons_release(ctxt);
+		WARN_ON_ONCE(1);
+		return false;
+	}
+
+	/* If not done, the write was aborted due to takeover */
+	if (!done)
+		return false;
+
+	/* If there was a dropped message, it has now been output. */
+	if (ctxt->dropped) {
+		ctxt->dropped = 0;
+		/* Counterpart to the read above */
+		WRITE_ONCE(con->dropped, ctxt->dropped);
+	}
+update:
+	ctxt->newseq++;
+	/*
+	 * The sequence update attempt is not part of console_release()
+	 * because in panic situations the console is not released by
+	 * the panic CPU until all records are written. On 32bit the
+	 * sequence is separate from state anyway.
+	 */
+	return cons_seq_try_update(ctxt);
+}
+
 /**
  * cons_nobkl_init - Initialize the NOBKL console specific data
  * @con:	Console to initialize