[3/6] locking/rwsem: Rework writer wakeup

Message ID 20230223123319.487908155@infradead.org
State New
Series locking/rwsem: Rework writer wakeup and handoff

Commit Message

Peter Zijlstra Feb. 23, 2023, 12:26 p.m. UTC
  Currently readers and writers have distinctly different wait/wake
methods. For readers the ->count adjustment happens on the wakeup
side, while for writers the ->count adjustment happens on the wait
side.

This asymmetry is unfortunate since the wake side has an additional
guarantee -- specifically, the wake side has observed the unlocked
state, and thus it knows that speculative READER_BIAS perturbations
on ->count are just that: they will be undone.

Additionally, unifying the wait/wake methods allows sharing code.

As such, do a straightforward transform of the writer wakeup into the
wake side.

Signed-off-by: Peter Zijlstra (Intel) <peterz@infradead.org>
---
 kernel/locking/rwsem.c |  253 ++++++++++++++++++++++---------------------------
 1 file changed, 115 insertions(+), 138 deletions(-)
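
For illustration, a minimal userspace sketch of the wake-side grant in the
spirit of the rwsem_writer_wake() added below (the flag values and the
writer_grant() helper are illustrative assumptions, the wait_lock is elided
and the handoff/timeout handling is left out entirely):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define WRITER_LOCKED   0x01UL
#define FLAG_WAITERS    0x02UL
#define READER_MASK     (~0xffUL)               /* reader count in the high bits */
#define LOCK_MASK       (WRITER_LOCKED | READER_MASK)

static _Atomic unsigned long count = FLAG_WAITERS;      /* unlocked, one writer queued */

/* in the kernel this runs with sem->wait_lock held */
static bool writer_grant(bool only_waiter)
{
        unsigned long c = atomic_load(&count), new;

        do {
                if (c & LOCK_MASK)
                        return false;           /* still read- or write-locked */
                new = c | WRITER_LOCKED;
                if (only_waiter)
                        new &= ~FLAG_WAITERS;   /* wait list becomes empty */
        } while (!atomic_compare_exchange_weak(&count, &c, new));

        return true;                            /* grant done; wake the waiter */
}

int main(void)
{
        printf("granted=%d count=%#lx\n", writer_grant(true), atomic_load(&count));
        return 0;
}

On success the kernel version also removes the waiter from the wait list,
clears waiter->task with a release store and queues the task on a wake_q.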
  

Comments

Waiman Long Feb. 23, 2023, 9:38 p.m. UTC | #1
On 2/23/23 07:26, Peter Zijlstra wrote:
> Currently readers and writers have distinctly different wait/wake
> methods. For readers the ->count adjustment happens on the wakeup
> side, while for writers the ->count adjustment happens on the wait
> side.
>
> This asymmetry is unfortunate since the wake side has an additional
> guarantee -- specifically, the wake side has observed the unlocked
> state, and thus it knows that speculative READER_BIAS perturbations
> on ->count are just that: they will be undone.
>
> Additionally, unifying the wait/wake methods allows sharing code.
>
> As such, do a straightforward transform of the writer wakeup into the
> wake side.
>
> Signed-off-by: Peter Zijlstra (Intel) <peterz@infradead.org>
> ---
>   kernel/locking/rwsem.c |  253 ++++++++++++++++++++++---------------------------
>   1 file changed, 115 insertions(+), 138 deletions(-)
>
> --- a/kernel/locking/rwsem.c
> +++ b/kernel/locking/rwsem.c
> @@ -107,7 +107,7 @@
>    *
>    * There are three places where the lock handoff bit may be set or cleared.
>    * 1) rwsem_mark_wake() for readers		-- set, clear
> - * 2) rwsem_try_write_lock() for writers	-- set, clear
> + * 2) rwsem_writer_wake() for writers	-- set, clear
>    * 3) rwsem_del_waiter()			-- clear
>    *
>    * For all the above cases, wait_lock will be held. A writer must also
> @@ -377,7 +377,7 @@ rwsem_add_waiter(struct rw_semaphore *se
>   /*
>    * Remove a waiter from the wait_list and clear flags.
>    *
> - * Both rwsem_mark_wake() and rwsem_try_write_lock() contain a full 'copy' of
> + * Both rwsem_mark_wake() and rwsem_writer_wake() contain a full 'copy' of
>    * this function. Modify with care.
>    *
>    * Return: true if wait_list isn't empty and false otherwise
> @@ -394,6 +394,100 @@ rwsem_del_waiter(struct rw_semaphore *se
>   	return false;
>   }
>   
> +static inline void
> +rwsem_waiter_wake(struct rwsem_waiter *waiter, struct wake_q_head *wake_q)
> +{
> +	struct task_struct *tsk;
> +
> +	tsk = waiter->task;
> +	get_task_struct(tsk);
> +
> +	/*
> +	 * Ensure calling get_task_struct() before setting the reader
> +	 * waiter to nil such that rwsem_down_read_slowpath() cannot
> +	 * race with do_exit() by always holding a reference count
> +	 * to the task to wakeup.
> +	 */
> +	smp_store_release(&waiter->task, NULL);
> +	/*
> +	 * Ensure issuing the wakeup (either by us or someone else)
> +	 * after setting the reader waiter to nil.
> +	 */
> +	wake_q_add_safe(wake_q, tsk);
> +}
> +
> +/*
> + * This function must be called with the sem->wait_lock held to prevent
> + * race conditions between checking the rwsem wait list and setting the
> + * sem->count accordingly.
> + *
> + * Implies rwsem_del_waiter() on success.
> + */
> +static void rwsem_writer_wake(struct rw_semaphore *sem,
> +			      struct rwsem_waiter *waiter,
> +			      struct wake_q_head *wake_q)
> +{
> +	struct rwsem_waiter *first = rwsem_first_waiter(sem);
> +	long count, new;
> +
> +	lockdep_assert_held(&sem->wait_lock);
> +
> +	count = atomic_long_read(&sem->count);
> +	do {
> +		bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
> +
> +		if (has_handoff) {
> +			/*
> +			 * Honor handoff bit and yield only when the first
> +			 * waiter is the one that set it. Otherwisee, we
> +			 * still try to acquire the rwsem.
> +			 */
> +			if (first->handoff_set && (waiter != first))
> +				return;
> +		}
This "if" statement if for a non-first waiter that somehow got woken up 
to have a chance to steal the lock. Now the handoff is done in the wake 
side for the first waiter, this "if" statement is not applicable and can 
be removed.
> +
> +		new = count;
> +
> +		if (count & RWSEM_LOCK_MASK) {
> +			/*
> +			 * A waiter (first or not) can set the handoff bit
> +			 * if it is an RT task or wait in the wait queue
> +			 * for too long.
> +			 */
> +			if (has_handoff || (!rt_task(waiter->task) &&
> +					    !time_after(jiffies, waiter->timeout)))
> +				return;
> +
> +			new |= RWSEM_FLAG_HANDOFF;
> +		} else {
> +			new |= RWSEM_WRITER_LOCKED;
> +			new &= ~RWSEM_FLAG_HANDOFF;
> +
> +			if (list_is_singular(&sem->wait_list))
> +				new &= ~RWSEM_FLAG_WAITERS;
> +		}
> +	} while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
> +
> +	/*
> +	 * We have either acquired the lock with handoff bit cleared or set
> +	 * the handoff bit. Only the first waiter can have its handoff_set
> +	 * set here to enable optimistic spinning in slowpath loop.
> +	 */
> +	if (new & RWSEM_FLAG_HANDOFF) {
> +		first->handoff_set = true;
> +		lockevent_inc(rwsem_wlock_handoff);
> +		return;
> +	}
> +
> +	/*
> +	 * Have rwsem_writer_wake() fully imply rwsem_del_waiter() on
> +	 * success.
> +	 */
> +	list_del(&waiter->list);
> +	rwsem_set_owner(sem);
> +	rwsem_waiter_wake(waiter, wake_q);
> +}
> +
>   /*
>    * handle the lock release when processes blocked on it that can now run
>    * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
> @@ -424,23 +518,12 @@ static void rwsem_mark_wake(struct rw_se
>   	 */
>   	waiter = rwsem_first_waiter(sem);
>   
> -	if (waiter->type != RWSEM_WAITING_FOR_WRITE)
> -		goto wake_readers;
> -
> -	if (wake_type == RWSEM_WAKE_ANY) {
> -		/*
> -		 * Mark writer at the front of the queue for wakeup.
> -		 * Until the task is actually later awoken later by
> -		 * the caller, other writers are able to steal it.
> -		 * Readers, on the other hand, will block as they
> -		 * will notice the queued writer.
> -		 */
> -		wake_q_add(wake_q, waiter->task);
> -		lockevent_inc(rwsem_wake_writer);
> +	if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
> +		if (wake_type == RWSEM_WAKE_ANY)
> +			rwsem_writer_wake(sem, waiter, wake_q);
> +		return;
>   	}
> -	return;
>   
> -wake_readers:
>   	/*
>   	 * No reader wakeup if there are too many of them already.
>   	 */
> @@ -547,25 +630,8 @@ static void rwsem_mark_wake(struct rw_se
>   		atomic_long_add(adjustment, &sem->count);
>   
>   	/* 2nd pass */
> -	list_for_each_entry_safe(waiter, tmp, &wlist, list) {
> -		struct task_struct *tsk;
> -
> -		tsk = waiter->task;
> -		get_task_struct(tsk);
> -
> -		/*
> -		 * Ensure calling get_task_struct() before setting the reader
> -		 * waiter to nil such that rwsem_down_read_slowpath() cannot
> -		 * race with do_exit() by always holding a reference count
> -		 * to the task to wakeup.
> -		 */
> -		smp_store_release(&waiter->task, NULL);
> -		/*
> -		 * Ensure issuing the wakeup (either by us or someone else)
> -		 * after setting the reader waiter to nil.
> -		 */
> -		wake_q_add_safe(wake_q, tsk);
> -	}
> +	list_for_each_entry_safe(waiter, tmp, &wlist, list)
> +		rwsem_waiter_wake(waiter, wake_q);
>   }
>   
>   /*
> @@ -596,77 +662,6 @@ rwsem_del_wake_waiter(struct rw_semaphor
>   }
>   
>   /*
> - * This function must be called with the sem->wait_lock held to prevent
> - * race conditions between checking the rwsem wait list and setting the
> - * sem->count accordingly.
> - *
> - * Implies rwsem_del_waiter() on success.
> - */
> -static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
> -					struct rwsem_waiter *waiter)
> -{
> -	struct rwsem_waiter *first = rwsem_first_waiter(sem);
> -	long count, new;
> -
> -	lockdep_assert_held(&sem->wait_lock);
> -
> -	count = atomic_long_read(&sem->count);
> -	do {
> -		bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
> -
> -		if (has_handoff) {
> -			/*
> -			 * Honor handoff bit and yield only when the first
> -			 * waiter is the one that set it. Otherwisee, we
> -			 * still try to acquire the rwsem.
> -			 */
> -			if (first->handoff_set && (waiter != first))
> -				return false;
> -		}
> -
> -		new = count;
> -
> -		if (count & RWSEM_LOCK_MASK) {
> -			/*
> -			 * A waiter (first or not) can set the handoff bit
> -			 * if it is an RT task or wait in the wait queue
> -			 * for too long.
> -			 */
> -			if (has_handoff || (!rt_task(waiter->task) &&
> -					    !time_after(jiffies, waiter->timeout)))
> -				return false;
> -
> -			new |= RWSEM_FLAG_HANDOFF;
> -		} else {
> -			new |= RWSEM_WRITER_LOCKED;
> -			new &= ~RWSEM_FLAG_HANDOFF;
> -
> -			if (list_is_singular(&sem->wait_list))
> -				new &= ~RWSEM_FLAG_WAITERS;
> -		}
> -	} while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
> -
> -	/*
> -	 * We have either acquired the lock with handoff bit cleared or set
> -	 * the handoff bit. Only the first waiter can have its handoff_set
> -	 * set here to enable optimistic spinning in slowpath loop.
> -	 */
> -	if (new & RWSEM_FLAG_HANDOFF) {
> -		first->handoff_set = true;
> -		lockevent_inc(rwsem_wlock_handoff);
> -		return false;
> -	}
> -
> -	/*
> -	 * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
> -	 * success.
> -	 */
> -	list_del(&waiter->list);
> -	rwsem_set_owner(sem);
> -	return true;
> -}
> -
> -/*
>    * The rwsem_spin_on_owner() function returns the following 4 values
>    * depending on the lock owner state.
>    *   OWNER_NULL  : owner is currently NULL
> @@ -1072,7 +1067,7 @@ rwsem_down_read_slowpath(struct rw_semap
>   	for (;;) {
>   		set_current_state(state);
>   		if (!smp_load_acquire(&waiter.task)) {
> -			/* Matches rwsem_mark_wake()'s smp_store_release(). */
> +			/* Matches rwsem_waiter_wake()'s smp_store_release(). */
>   			break;
>   		}
>   		if (signal_pending_state(state, current)) {
> @@ -1143,54 +1138,36 @@ rwsem_down_write_slowpath(struct rw_sema
>   	} else {
>   		atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
>   	}
> +	raw_spin_unlock_irq(&sem->wait_lock);
>   
>   	/* wait until we successfully acquire the lock */
> -	set_current_state(state);
>   	trace_contention_begin(sem, LCB_F_WRITE);
>   
>   	for (;;) {
> -		if (rwsem_try_write_lock(sem, &waiter)) {
> -			/* rwsem_try_write_lock() implies ACQUIRE on success */
> +		set_current_state(state);
> +		if (!smp_load_acquire(&waiter.task)) {
> +			/* Matches rwsem_waiter_wake()'s smp_store_release(). */
>   			break;
>   		}
> -
> -		raw_spin_unlock_irq(&sem->wait_lock);
> -
> -		if (signal_pending_state(state, current))
> -			goto out_nolock;
> -
> -		/*
> -		 * After setting the handoff bit and failing to acquire
> -		 * the lock, attempt to spin on owner to accelerate lock
> -		 * transfer. If the previous owner is a on-cpu writer and it
> -		 * has just released the lock, OWNER_NULL will be returned.
> -		 * In this case, we attempt to acquire the lock again
> -		 * without sleeping.
> -		 */
> -		if (waiter.handoff_set) {
> -			enum owner_state owner_state;
> -
> -			owner_state = rwsem_spin_on_owner(sem);
> -			if (owner_state == OWNER_NULL)
> -				goto trylock_again;
> +		if (signal_pending_state(state, current)) {
> +			raw_spin_lock_irq(&sem->wait_lock);
> +			if (waiter.task)
> +				goto out_nolock;
> +			raw_spin_unlock_irq(&sem->wait_lock);
> +			/* Ordered by sem->wait_lock against rwsem_mark_wake(). */
> +			break;
>   		}
> -
>   		schedule_preempt_disabled();
>   		lockevent_inc(rwsem_sleep_writer);
> -		set_current_state(state);
> -trylock_again:
> -		raw_spin_lock_irq(&sem->wait_lock);
>   	}
>   	__set_current_state(TASK_RUNNING);
> -	raw_spin_unlock_irq(&sem->wait_lock);
>   	lockevent_inc(rwsem_wlock);
>   	trace_contention_end(sem, 0);
>   	return sem;
>   
>   out_nolock:
> -	__set_current_state(TASK_RUNNING);
> -	raw_spin_lock_irq(&sem->wait_lock);
>   	rwsem_del_wake_waiter(sem, &waiter, &wake_q);
> +	__set_current_state(TASK_RUNNING);
>   	lockevent_inc(rwsem_wlock_fail);
>   	trace_contention_end(sem, -EINTR);
>   	return ERR_PTR(-EINTR);

I believe it is better to change state inside the wait_lock critical 
section to provide a release barrier for free.

Cheers,
Longman
  
Peter Zijlstra Feb. 26, 2023, 11:58 a.m. UTC | #2
On Thu, Feb 23, 2023 at 04:38:08PM -0500, Waiman Long wrote:

> > +static void rwsem_writer_wake(struct rw_semaphore *sem,
> > +			      struct rwsem_waiter *waiter,
> > +			      struct wake_q_head *wake_q)
> > +{
> > +	struct rwsem_waiter *first = rwsem_first_waiter(sem);
> > +	long count, new;
> > +
> > +	lockdep_assert_held(&sem->wait_lock);
> > +
> > +	count = atomic_long_read(&sem->count);
> > +	do {
> > +		bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
> > +
> > +		if (has_handoff) {
> > +			/*
> > +			 * Honor handoff bit and yield only when the first
> > +			 * waiter is the one that set it. Otherwisee, we
> > +			 * still try to acquire the rwsem.
> > +			 */
> > +			if (first->handoff_set && (waiter != first))
> > +				return;
> > +		}
> This "if" statement if for a non-first waiter that somehow got woken up to
> have a chance to steal the lock. Now the handoff is done in the wake side
> for the first waiter, this "if" statement is not applicable and can be
> removed.

Yeah, that can be cleaned up, something like the below. But that doesn't
appear to be the cause of the issues.

--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -427,25 +427,12 @@ static void rwsem_writer_wake(struct rw_
 			      struct rwsem_waiter *waiter,
 			      struct wake_q_head *wake_q)
 {
-	struct rwsem_waiter *first = rwsem_first_waiter(sem);
 	long count, new;
 
 	lockdep_assert_held(&sem->wait_lock);
 
 	count = atomic_long_read(&sem->count);
 	do {
-		bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
-
-		if (has_handoff) {
-			/*
-			 * Honor handoff bit and yield only when the first
-			 * waiter is the one that set it. Otherwisee, we
-			 * still try to acquire the rwsem.
-			 */
-			if (first->handoff_set && (waiter != first))
-				return;
-		}
-
 		new = count;
 
 		if (count & RWSEM_LOCK_MASK) {
@@ -454,8 +441,9 @@ static void rwsem_writer_wake(struct rw_
 			 * if it is an RT task or wait in the wait queue
 			 * for too long.
 			 */
-			if (has_handoff || (!rt_task(waiter->task) &&
-					    !time_after(jiffies, waiter->timeout)))
+			if ((count & RWSEM_FLAG_HANDOFF) ||
+			    (!rt_task(waiter->task) &&
+			     !time_after(jiffies, waiter->timeout)))
 				return;
 
 			new |= RWSEM_FLAG_HANDOFF;
@@ -474,7 +462,7 @@ static void rwsem_writer_wake(struct rw_
 	 * set here to enable optimistic spinning in slowpath loop.
 	 */
 	if (new & RWSEM_FLAG_HANDOFF) {
-		first->handoff_set = true;
+		waiter->handoff_set = true;
 		lockevent_inc(rwsem_wlock_handoff);
 		return;
 	}
  
Peter Zijlstra Feb. 26, 2023, 11:59 a.m. UTC | #3
On Thu, Feb 23, 2023 at 01:26:45PM +0100, Peter Zijlstra wrote:
> +/*
> + * This function must be called with the sem->wait_lock held to prevent
> + * race conditions between checking the rwsem wait list and setting the
> + * sem->count accordingly.
> + *
> + * Implies rwsem_del_waiter() on success.
> + */
> +static void rwsem_writer_wake(struct rw_semaphore *sem,
> +			      struct rwsem_waiter *waiter,
> +			      struct wake_q_head *wake_q)
> +{
> +	struct rwsem_waiter *first = rwsem_first_waiter(sem);
> +	long count, new;
> +
> +	lockdep_assert_held(&sem->wait_lock);
> +
> +	count = atomic_long_read(&sem->count);
> +	do {
> +		bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
> +
> +		if (has_handoff) {
> +			/*
> +			 * Honor handoff bit and yield only when the first
> +			 * waiter is the one that set it. Otherwisee, we
> +			 * still try to acquire the rwsem.
> +			 */
> +			if (first->handoff_set && (waiter != first))
> +				return;
> +		}
> +
> +		new = count;
> +
> +		if (count & RWSEM_LOCK_MASK) {
> +			/*
> +			 * A waiter (first or not) can set the handoff bit
> +			 * if it is an RT task or wait in the wait queue
> +			 * for too long.
> +			 */
> +			if (has_handoff || (!rt_task(waiter->task) &&
> +					    !time_after(jiffies, waiter->timeout)))
> +				return;
> +
> +			new |= RWSEM_FLAG_HANDOFF;
> +		} else {
> +			new |= RWSEM_WRITER_LOCKED;
> +			new &= ~RWSEM_FLAG_HANDOFF;
> +
> +			if (list_is_singular(&sem->wait_list))
> +				new &= ~RWSEM_FLAG_WAITERS;
> +		}
> +	} while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
> +
> +	/*
> +	 * We have either acquired the lock with handoff bit cleared or set
> +	 * the handoff bit. Only the first waiter can have its handoff_set
> +	 * set here to enable optimistic spinning in slowpath loop.
> +	 */
> +	if (new & RWSEM_FLAG_HANDOFF) {
> +		first->handoff_set = true;
> +		lockevent_inc(rwsem_wlock_handoff);
> +		return;
> +	}
> +
> +	/*
> +	 * Have rwsem_writer_wake() fully imply rwsem_del_waiter() on
> +	 * success.
> +	 */
> +	list_del(&waiter->list);
> +	rwsem_set_owner(sem);

At the very least this needs to be:

	atomic_long_set(&sem->owner, (long)waiter->task);

> +	rwsem_waiter_wake(waiter, wake_q);
> +}
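
For context, rwsem_set_owner() records the calling task -- current -- as the
owner, and on this wake-side path the caller is the waker rather than the
waiter being granted the lock, hence the suggested explicit store of
waiter->task. The helper is roughly the following (modulo assertions):

static inline void rwsem_set_owner(struct rw_semaphore *sem)
{
        atomic_long_set(&sem->owner, (long)current);
}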
  
Peter Zijlstra Feb. 26, 2023, noon UTC | #4
On Thu, Feb 23, 2023 at 04:38:08PM -0500, Waiman Long wrote:

> > @@ -1143,54 +1138,36 @@ rwsem_down_write_slowpath(struct rw_sema
> >   	} else {
> >   		atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
> >   	}
> > +	raw_spin_unlock_irq(&sem->wait_lock);
> >   	/* wait until we successfully acquire the lock */
> > -	set_current_state(state);
> >   	trace_contention_begin(sem, LCB_F_WRITE);
> >   	for (;;) {
> > -		if (rwsem_try_write_lock(sem, &waiter)) {
> > -			/* rwsem_try_write_lock() implies ACQUIRE on success */
> > +		set_current_state(state);
> > +		if (!smp_load_acquire(&waiter.task)) {
> > +			/* Matches rwsem_waiter_wake()'s smp_store_release(). */
> >   			break;
> >   		}
> > -
> > -		raw_spin_unlock_irq(&sem->wait_lock);
> > -
> > -		if (signal_pending_state(state, current))
> > -			goto out_nolock;
> > -
> > -		/*
> > -		 * After setting the handoff bit and failing to acquire
> > -		 * the lock, attempt to spin on owner to accelerate lock
> > -		 * transfer. If the previous owner is a on-cpu writer and it
> > -		 * has just released the lock, OWNER_NULL will be returned.
> > -		 * In this case, we attempt to acquire the lock again
> > -		 * without sleeping.
> > -		 */
> > -		if (waiter.handoff_set) {
> > -			enum owner_state owner_state;
> > -
> > -			owner_state = rwsem_spin_on_owner(sem);
> > -			if (owner_state == OWNER_NULL)
> > -				goto trylock_again;
> > +		if (signal_pending_state(state, current)) {
> > +			raw_spin_lock_irq(&sem->wait_lock);
> > +			if (waiter.task)
> > +				goto out_nolock;
> > +			raw_spin_unlock_irq(&sem->wait_lock);
> > +			/* Ordered by sem->wait_lock against rwsem_mark_wake(). */
> > +			break;
> >   		}
> > -
> >   		schedule_preempt_disabled();
> >   		lockevent_inc(rwsem_sleep_writer);
> > -		set_current_state(state);
> > -trylock_again:
> > -		raw_spin_lock_irq(&sem->wait_lock);
> >   	}
> >   	__set_current_state(TASK_RUNNING);
> > -	raw_spin_unlock_irq(&sem->wait_lock);
> >   	lockevent_inc(rwsem_wlock);
> >   	trace_contention_end(sem, 0);
> >   	return sem;
> >   out_nolock:
> > -	__set_current_state(TASK_RUNNING);
> > -	raw_spin_lock_irq(&sem->wait_lock);
> >   	rwsem_del_wake_waiter(sem, &waiter, &wake_q);
> > +	__set_current_state(TASK_RUNNING);
> >   	lockevent_inc(rwsem_wlock_fail);
> >   	trace_contention_end(sem, -EINTR);
> >   	return ERR_PTR(-EINTR);
> 
> I believe it is better to change state inside the wait_lock critical section
> to provide a release barrier for free.

I can't follow... a release for what? Note that the reader slowpath has
this exact form already.
  
Peter Zijlstra Feb. 26, 2023, 3:04 p.m. UTC | #5
On Thu, Feb 23, 2023 at 01:26:45PM +0100, Peter Zijlstra wrote:
> @@ -1072,7 +1067,7 @@ rwsem_down_read_slowpath(struct rw_semap
>  	for (;;) {
>  		set_current_state(state);
>  		if (!smp_load_acquire(&waiter.task)) {
> -			/* Matches rwsem_mark_wake()'s smp_store_release(). */
> +			/* Matches rwsem_waiter_wake()'s smp_store_release(). */
>  			break;
>  		}
>  		if (signal_pending_state(state, current)) {
> @@ -1143,54 +1138,36 @@ rwsem_down_write_slowpath(struct rw_sema
>  	} else {
>  		atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);

Found it; if we remove the try_write_lock below, then at least this
new-waiter path needs to still do a trylock.

Let me go test the other patches on top of all this and push out a fresh
set if that all still works.

>  	}
> +	raw_spin_unlock_irq(&sem->wait_lock);
>  
>  	/* wait until we successfully acquire the lock */
> -	set_current_state(state);
>  	trace_contention_begin(sem, LCB_F_WRITE);
>  
>  	for (;;) {
> -		if (rwsem_try_write_lock(sem, &waiter)) {
> -			/* rwsem_try_write_lock() implies ACQUIRE on success */
> +		set_current_state(state);
> +		if (!smp_load_acquire(&waiter.task)) {
> +			/* Matches rwsem_waiter_wake()'s smp_store_release(). */
>  			break;
>  		}
> -
> -		raw_spin_unlock_irq(&sem->wait_lock);
> -
> -		if (signal_pending_state(state, current))
> -			goto out_nolock;
> -
> -		/*
> -		 * After setting the handoff bit and failing to acquire
> -		 * the lock, attempt to spin on owner to accelerate lock
> -		 * transfer. If the previous owner is a on-cpu writer and it
> -		 * has just released the lock, OWNER_NULL will be returned.
> -		 * In this case, we attempt to acquire the lock again
> -		 * without sleeping.
> -		 */
> -		if (waiter.handoff_set) {
> -			enum owner_state owner_state;
> -
> -			owner_state = rwsem_spin_on_owner(sem);
> -			if (owner_state == OWNER_NULL)
> -				goto trylock_again;
> +		if (signal_pending_state(state, current)) {
> +			raw_spin_lock_irq(&sem->wait_lock);
> +			if (waiter.task)
> +				goto out_nolock;
> +			raw_spin_unlock_irq(&sem->wait_lock);
> +			/* Ordered by sem->wait_lock against rwsem_mark_wake(). */
> +			break;
>  		}
> -
>  		schedule_preempt_disabled();
>  		lockevent_inc(rwsem_sleep_writer);
> -		set_current_state(state);
> -trylock_again:
> -		raw_spin_lock_irq(&sem->wait_lock);
>  	}
>  	__set_current_state(TASK_RUNNING);
> -	raw_spin_unlock_irq(&sem->wait_lock);
>  	lockevent_inc(rwsem_wlock);
>  	trace_contention_end(sem, 0);
>  	return sem;
>  
>  out_nolock:
> -	__set_current_state(TASK_RUNNING);
> -	raw_spin_lock_irq(&sem->wait_lock);
>  	rwsem_del_wake_waiter(sem, &waiter, &wake_q);
> +	__set_current_state(TASK_RUNNING);
>  	lockevent_inc(rwsem_wlock_fail);
>  	trace_contention_end(sem, -EINTR);
>  	return ERR_PTR(-EINTR);
> 
>
  
Peter Zijlstra Feb. 26, 2023, 4:51 p.m. UTC | #6
On Sun, Feb 26, 2023 at 04:04:35PM +0100, Peter Zijlstra wrote:
> On Thu, Feb 23, 2023 at 01:26:45PM +0100, Peter Zijlstra wrote:
> > @@ -1072,7 +1067,7 @@ rwsem_down_read_slowpath(struct rw_semap
> >  	for (;;) {
> >  		set_current_state(state);
> >  		if (!smp_load_acquire(&waiter.task)) {
> > -			/* Matches rwsem_mark_wake()'s smp_store_release(). */
> > +			/* Matches rwsem_waiter_wake()'s smp_store_release(). */
> >  			break;
> >  		}
> >  		if (signal_pending_state(state, current)) {
> > @@ -1143,54 +1138,36 @@ rwsem_down_write_slowpath(struct rw_sema
> >  	} else {
> >  		atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
> 
> Found it; if we remove the try_write_lock below, then at least this
> new-waiter path needs to still do a trylock.
> 
> Let me go test the other patches on top of all this and push out a fresh
> set if that all still works.

queue.git locking/core

We'll see what the robots make of it.
  
Waiman Long Feb. 26, 2023, 9:31 p.m. UTC | #7
On 2/26/23 07:00, Peter Zijlstra wrote:
> On Thu, Feb 23, 2023 at 04:38:08PM -0500, Waiman Long wrote:
>
>>> @@ -1143,54 +1138,36 @@ rwsem_down_write_slowpath(struct rw_sema
>>>    	} else {
>>>    		atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
>>>    	}
>>> +	raw_spin_unlock_irq(&sem->wait_lock);
>>>    	/* wait until we successfully acquire the lock */
>>> -	set_current_state(state);
>>>    	trace_contention_begin(sem, LCB_F_WRITE);
>>>    	for (;;) {
>>> -		if (rwsem_try_write_lock(sem, &waiter)) {
>>> -			/* rwsem_try_write_lock() implies ACQUIRE on success */
>>> +		set_current_state(state);
>>> +		if (!smp_load_acquire(&waiter.task)) {
>>> +			/* Matches rwsem_waiter_wake()'s smp_store_release(). */
>>>    			break;
>>>    		}
>>> -
>>> -		raw_spin_unlock_irq(&sem->wait_lock);
>>> -
>>> -		if (signal_pending_state(state, current))
>>> -			goto out_nolock;
>>> -
>>> -		/*
>>> -		 * After setting the handoff bit and failing to acquire
>>> -		 * the lock, attempt to spin on owner to accelerate lock
>>> -		 * transfer. If the previous owner is a on-cpu writer and it
>>> -		 * has just released the lock, OWNER_NULL will be returned.
>>> -		 * In this case, we attempt to acquire the lock again
>>> -		 * without sleeping.
>>> -		 */
>>> -		if (waiter.handoff_set) {
>>> -			enum owner_state owner_state;
>>> -
>>> -			owner_state = rwsem_spin_on_owner(sem);
>>> -			if (owner_state == OWNER_NULL)
>>> -				goto trylock_again;
>>> +		if (signal_pending_state(state, current)) {
>>> +			raw_spin_lock_irq(&sem->wait_lock);
>>> +			if (waiter.task)
>>> +				goto out_nolock;
>>> +			raw_spin_unlock_irq(&sem->wait_lock);
>>> +			/* Ordered by sem->wait_lock against rwsem_mark_wake(). */
>>> +			break;
>>>    		}
>>> -
>>>    		schedule_preempt_disabled();
>>>    		lockevent_inc(rwsem_sleep_writer);
>>> -		set_current_state(state);
>>> -trylock_again:
>>> -		raw_spin_lock_irq(&sem->wait_lock);
>>>    	}
>>>    	__set_current_state(TASK_RUNNING);
>>> -	raw_spin_unlock_irq(&sem->wait_lock);
>>>    	lockevent_inc(rwsem_wlock);
>>>    	trace_contention_end(sem, 0);
>>>    	return sem;
>>>    out_nolock:
>>> -	__set_current_state(TASK_RUNNING);
>>> -	raw_spin_lock_irq(&sem->wait_lock);
>>>    	rwsem_del_wake_waiter(sem, &waiter, &wake_q);
>>> +	__set_current_state(TASK_RUNNING);
>>>    	lockevent_inc(rwsem_wlock_fail);
>>>    	trace_contention_end(sem, -EINTR);
>>>    	return ERR_PTR(-EINTR);
>> I believe it is better to change state inside the wait_lock critical section
>> to provide a release barrier for free.
> I can't follow... a release for what? Note that the reader slowpath has
> this exact form already.

You are right. I forgot that we don't need synchronization when setting 
state to TASK_RUNNING.

Cheers,
Longman
  
Waiman Long Feb. 27, 2023, 12:22 a.m. UTC | #8
On 2/26/23 11:51, Peter Zijlstra wrote:
> On Sun, Feb 26, 2023 at 04:04:35PM +0100, Peter Zijlstra wrote:
>> On Thu, Feb 23, 2023 at 01:26:45PM +0100, Peter Zijlstra wrote:
>>> @@ -1072,7 +1067,7 @@ rwsem_down_read_slowpath(struct rw_semap
>>>   	for (;;) {
>>>   		set_current_state(state);
>>>   		if (!smp_load_acquire(&waiter.task)) {
>>> -			/* Matches rwsem_mark_wake()'s smp_store_release(). */
>>> +			/* Matches rwsem_waiter_wake()'s smp_store_release(). */
>>>   			break;
>>>   		}
>>>   		if (signal_pending_state(state, current)) {
>>> @@ -1143,54 +1138,36 @@ rwsem_down_write_slowpath(struct rw_sema
>>>   	} else {
>>>   		atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
>> Found it; if we remove the try_write_lock below, then at least this
>> new-waiter path needs to still do a trylock.
>>
>> Let me go test the other patches on top of all this and push out a fresh
>> set if that all still works.
> queue.git locking/core
>
> We'll see what the robots make of it.

From your new patch 3:

@@ -1151,55 +1154,39 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
                 }
         } else {
                 atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
+               if (rwsem_try_write_lock(sem, &waiter))
+                       waiter.task = NULL;
         }
+       raw_spin_unlock_irq(&sem->wait_lock);

         /* wait until we successfully acquire the lock */
-       set_current_state(state);
         trace_contention_begin(sem, LCB_F_WRITE);

         for (;;) {
-               if (rwsem_try_write_lock(sem, &waiter)) {
-                       /* rwsem_try_write_lock() implies ACQUIRE on success */
+               set_current_state(state);
+               if (!smp_load_acquire(&waiter.task)) {
+                       /* Matches rwsem_waiter_wake()'s smp_store_release(). */
                         break;
                 }
-

The additional rwsem_try_write_lock() call seems to address the missed 
wakeup problem AFAICT.

I do have some concern that early lock transfer to a lock owner that has
not been woken up yet may suppress writer lock stealing from optimistic
spinning, causing some performance regression in some cases. Let's see if
the test robot reports anything.

Cheers,
Longman
  
Peter Zijlstra Feb. 27, 2023, 10:31 a.m. UTC | #9
On Sun, Feb 26, 2023 at 07:22:47PM -0500, Waiman Long wrote:

> @@ -1151,55 +1154,39 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
>                 }
>         } else {
>                 atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
> +               if (rwsem_try_write_lock(sem, &waiter))
> +                       waiter.task = NULL;
>         }
> +       raw_spin_unlock_irq(&sem->wait_lock);
> 
>         /* wait until we successfully acquire the lock */
> -       set_current_state(state);
>         trace_contention_begin(sem, LCB_F_WRITE);
> 
>         for (;;) {
> -               if (rwsem_try_write_lock(sem, &waiter)) {
> -                       /* rwsem_try_write_lock() implies ACQUIRE on success */
> +               set_current_state(state);
> +               if (!smp_load_acquire(&waiter.task)) {
> +                       /* Matches rwsem_waiter_wake()'s smp_store_release(). */
>                         break;
>                 }
> -
> 
> The additional rwsem_try_write_lock() call seems to address the missed
> wakeup problem AFAICT.

Indeed, prior to this I could readily reproduce the lockup.

So when thinking about missed wakeups I noticed this race on WAITERS:
if we queue but the unlock has not yet observed WAITERS, the unlock does
not go into the slow path and the wakeup gets lost.

The reader side fixes this with rwsem_cond_wake_waiter(), but I could not
convince myself that is correct for the writer side -- perhaps it is; I
will need to think more on that.
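
To make that window concrete, a deterministic single-threaded sketch of the
interleaving (the flag names mirror the kernel's, but the values, helpers and
forced ordering are illustrative assumptions; do_trylock models the extra
rwsem_try_write_lock() in the new-waiter path):

#include <stdbool.h>
#include <stdio.h>

#define WRITER_LOCKED   0x1UL
#define FLAG_WAITERS    0x2UL

static unsigned long count;
static bool woken;

/* writer unlock fast path: only wakes if it observed FLAG_WAITERS */
static void write_unlock(void)
{
        unsigned long old = count;

        count &= ~WRITER_LOCKED;
        if (old & FLAG_WAITERS)
                woken = true;                   /* models rwsem_wake() */
}

/* the unlock is forced to run after we decide to queue, before we set WAITERS */
static bool writer_slowpath(bool do_trylock)
{
        if (!(count & WRITER_LOCKED))           /* 1: lock held, so we will queue */
                return true;

        write_unlock();                         /* 2: owner unlocks, sees no waiters */

        count |= FLAG_WAITERS;                  /* 3: too late for the unlocker to notice */

        if (do_trylock && !(count & WRITER_LOCKED)) {
                count = WRITER_LOCKED;          /* acquired, no need to sleep */
                return true;
        }

        return woken;                           /* would sleep here; no wakeup pending */
}

int main(void)
{
        count = WRITER_LOCKED; woken = false;
        printf("without trylock: %s\n",
               writer_slowpath(false) ? "acquired/woken" : "lost wakeup");

        count = WRITER_LOCKED; woken = false;
        printf("with trylock:    %s\n",
               writer_slowpath(true) ? "acquired/woken" : "lost wakeup");
        return 0;
}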

> I do have some concern that early lock transfer to a lock owner that has not
> been woken up yet may suppress writer lock stealing from optimistic spinning
> causing some performance regression in some cases. Let's see if the test
> robot report anything.

Ah yes, I suppose that is indeed a possibility. Given this is all under
wait_lock and the spinner is not, I was hoping it would still have
sufficient time to win. But yes, robots will tell us.
  
Waiman Long Feb. 27, 2023, 8:16 p.m. UTC | #10
On 2/27/23 05:31, Peter Zijlstra wrote:
>> I do have some concern that early lock transfer to a lock owner that has not
>> been woken up yet may suppress writer lock stealing from optimistic spinning
>> causing some performance regression in some cases. Let's see if the test
>> robot report anything.
> Ah yes, I suppose that is indeed a possibility. Given this is all under
> wait_lock and the spinner is not, I was hoping it would still have
> sufficient time to win. But yes, robots will tell us.
>
I run my rwsem locking microbenchmark on a 2-socket 96-thread x86-64
system with lock event turned on for 15 secs.

Before this patchset:

Running locktest with rwsem [runtime = 15s, r% = 50%, load = 100]
Threads = 96, Min/Mean/Max = 74,506/91,260/112,409
Threads = 96, Total Rate = 584,091 op/s; Percpu Rate = 6,084 op/s

rwsem_opt_fail=127305
rwsem_opt_lock=4252147
rwsem_opt_nospin=28920
rwsem_rlock=2713129
rwsem_rlock_fail=0
rwsem_rlock_fast=5
rwsem_rlock_handoff=280
rwsem_rlock_steal=1486617
rwsem_sleep_reader=2713085
rwsem_sleep_writer=4313369
rwsem_wake_reader=29876
rwsem_wake_writer=5829160
rwsem_wlock=127305
rwsem_wlock_fail=0
rwsem_wlock_handoff=2515

After this patchset:

Running locktest with rwsem [runtime = 15s, r% = 50%, load = 100]
Threads = 96, Min/Mean/Max = 26,573/26,749/26,833
Threads = 96, Total Rate = 171,184 op/s; Percpu Rate = 1,783 op/s

rwsem_opt_fail=1265481
rwsem_opt_lock=17939
rwsem_rlock=1266157
rwsem_rlock_fail=0
rwsem_rlock_fast=0
rwsem_rlock_handoff=0
rwsem_rlock_steal=551
rwsem_sleep_reader=1266157
rwsem_sleep_writer=1265481
rwsem_wake_reader=26612
rwsem_wake_writer=0
rwsem_wlock=1265481
rwsem_wlock_ehandoff=94
rwsem_wlock_fail=0
rwsem_wlock_handoff=94

So the locking rate is reduced to just 29.3% of the original. Looking at
the number of successful writer lock stealings from optimistic spinning
(rwsem_opt_lock), it is reduced from 4252147 to 17939. It is just about
0.4% of the original.

So for workloads that have a lot of writer contention, there will be
performance regressions. Do you mind if we try to keep the original
logic of my patchset, allowing write lock acquisition in the writer slow
path, but transfer the lock ownership in the wakeup path when handoff
is required? We can do this with some minor code changes on top of your
current patchset.

Regards,
Longman
  
Peter Zijlstra March 20, 2023, 8:12 a.m. UTC | #11
On Mon, Feb 27, 2023 at 03:16:25PM -0500, Waiman Long wrote:
> On 2/27/23 05:31, Peter Zijlstra wrote:
> > > I do have some concern that early lock transfer to a lock owner that has not
> > > been woken up yet may suppress writer lock stealing from optimistic spinning
> > > causing some performance regression in some cases. Let's see if the test
> > > robot report anything.
> > Ah yes, I suppose that is indeed a possibility. Given this is all under
> > wait_lock and the spinner is not, I was hoping it would still have
> > sufficient time to win. But yes, robots will tell us.
> > 
> I run my rwsem locking microbenchmark on a 2-socket 96-thread x86-64
> system with lock event turned on for 15 secs.
> 
> Before this patchset:
> 
> Running locktest with rwsem [runtime = 15s, r% = 50%, load = 100]
> Threads = 96, Min/Mean/Max = 74,506/91,260/112,409
> Threads = 96, Total Rate = 584,091 op/s; Percpu Rate = 6,084 op/s
> 
> rwsem_opt_fail=127305
> rwsem_opt_lock=4252147
> rwsem_opt_nospin=28920
> rwsem_rlock=2713129
> rwsem_rlock_fail=0
> rwsem_rlock_fast=5
> rwsem_rlock_handoff=280
> rwsem_rlock_steal=1486617
> rwsem_sleep_reader=2713085
> rwsem_sleep_writer=4313369
> rwsem_wake_reader=29876
> rwsem_wake_writer=5829160
> rwsem_wlock=127305
> rwsem_wlock_fail=0
> rwsem_wlock_handoff=2515
> 
> After this patchset:
> 
> Running locktest with rwsem [runtime = 15s, r% = 50%, load = 100]
> Threads = 96, Min/Mean/Max = 26,573/26,749/26,833
> Threads = 96, Total Rate = 171,184 op/s; Percpu Rate = 1,783 op/s
> 
> rwsem_opt_fail=1265481
> rwsem_opt_lock=17939
> rwsem_rlock=1266157
> rwsem_rlock_fail=0
> rwsem_rlock_fast=0
> rwsem_rlock_handoff=0
> rwsem_rlock_steal=551
> rwsem_sleep_reader=1266157
> rwsem_sleep_writer=1265481
> rwsem_wake_reader=26612
> rwsem_wake_writer=0
> rwsem_wlock=1265481
> rwsem_wlock_ehandoff=94
> rwsem_wlock_fail=0
> rwsem_wlock_handoff=94
> 
> So the locking rate is reduced to just 29.3% of the original. Looking at
> the number of successful writer lock stealings from optimistic spinning
> (rwsem_opt_lock), it is reduced from 4252147 to 17939. It is just about
> 0.4% of the original.
> 
> So for workloads that have a lot of writer contention, there will be
> performance regressions. Do you mind if we try to keep the original
> logic of my patchset to allow write lock acquisition in writer slow
> path, but transfer the lock ownership in the wakeup path when handoff
> is required. We can do this with some minor code changes on top of your
> current patchset.

Urgh, sorry, I seem to have lost sight of this... those results,..
sadness :/

Yeah, I suppose there's nothing for it but to live with that mess;
be very sure to add comments elucidating, for any future poor sod reading
it, why the code is the way it is.
  
Waiman Long March 20, 2023, 5:36 p.m. UTC | #12
On 3/20/23 04:12, Peter Zijlstra wrote:
> On Mon, Feb 27, 2023 at 03:16:25PM -0500, Waiman Long wrote:
>> On 2/27/23 05:31, Peter Zijlstra wrote:
>>>> I do have some concern that early lock transfer to a lock owner that has not
>>>> been woken up yet may suppress writer lock stealing from optimistic spinning
>>>> causing some performance regression in some cases. Let's see if the test
>>>> robot report anything.
>>> Ah yes, I suppose that is indeed a possibility. Given this is all under
>>> wait_lock and the spinner is not, I was hoping it would still have
>>> sufficient time to win. But yes, robots will tell us.
>>>
>> I run my rwsem locking microbenchmark on a 2-socket 96-thread x86-64
>> system with lock event turned on for 15 secs.
>>
>> Before this patchset:
>>
>> Running locktest with rwsem [runtime = 15s, r% = 50%, load = 100]
>> Threads = 96, Min/Mean/Max = 74,506/91,260/112,409
>> Threads = 96, Total Rate = 584,091 op/s; Percpu Rate = 6,084 op/s
>>
>> rwsem_opt_fail=127305
>> rwsem_opt_lock=4252147
>> rwsem_opt_nospin=28920
>> rwsem_rlock=2713129
>> rwsem_rlock_fail=0
>> rwsem_rlock_fast=5
>> rwsem_rlock_handoff=280
>> rwsem_rlock_steal=1486617
>> rwsem_sleep_reader=2713085
>> rwsem_sleep_writer=4313369
>> rwsem_wake_reader=29876
>> rwsem_wake_writer=5829160
>> rwsem_wlock=127305
>> rwsem_wlock_fail=0
>> rwsem_wlock_handoff=2515
>>
>> After this patchset:
>>
>> Running locktest with rwsem [runtime = 15s, r% = 50%, load = 100]
>> Threads = 96, Min/Mean/Max = 26,573/26,749/26,833
>> Threads = 96, Total Rate = 171,184 op/s; Percpu Rate = 1,783 op/s
>>
>> rwsem_opt_fail=1265481
>> rwsem_opt_lock=17939
>> rwsem_rlock=1266157
>> rwsem_rlock_fail=0
>> rwsem_rlock_fast=0
>> rwsem_rlock_handoff=0
>> rwsem_rlock_steal=551
>> rwsem_sleep_reader=1266157
>> rwsem_sleep_writer=1265481
>> rwsem_wake_reader=26612
>> rwsem_wake_writer=0
>> rwsem_wlock=1265481
>> rwsem_wlock_ehandoff=94
>> rwsem_wlock_fail=0
>> rwsem_wlock_handoff=94
>>
>> So the locking rate is reduced to just 29.3% of the original. Looking at
>> the number of successful writer lock stealings from optimistic spinning
>> (rwsem_opt_lock), it is reduced from 4252147 to 17939. It is just about
>> 0.4% of the original.
>>
>> So for workloads that have a lot of writer contention, there will be
>> performance regressions. Do you mind if we try to keep the original
>> logic of my patchset to allow write lock acquisition in writer slow
>> path, but transfer the lock ownership in the wakeup path when handoff
>> is required. We can do this with some minor code changes on top of your
>> current patchset.
> Urgh, sorry, I seem to have lost sight of this... those results,..
> sadness :/
>
> Yeah, I suppose there's nothing for it but to live with that mess;
> be very sure to add comments elucidating, for any future poor sod reading
> it, why the code is the way it is.

OK, I will add additional patches to your series to remediate the
performance degradation. I am hoping to get it done either by the end of
the week or early next week.

Thanks,
Longman
  

Patch

--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -107,7 +107,7 @@ 
  *
  * There are three places where the lock handoff bit may be set or cleared.
  * 1) rwsem_mark_wake() for readers		-- set, clear
- * 2) rwsem_try_write_lock() for writers	-- set, clear
+ * 2) rwsem_writer_wake() for writers	-- set, clear
  * 3) rwsem_del_waiter()			-- clear
  *
  * For all the above cases, wait_lock will be held. A writer must also
@@ -377,7 +377,7 @@  rwsem_add_waiter(struct rw_semaphore *se
 /*
  * Remove a waiter from the wait_list and clear flags.
  *
- * Both rwsem_mark_wake() and rwsem_try_write_lock() contain a full 'copy' of
+ * Both rwsem_mark_wake() and rwsem_writer_wake() contain a full 'copy' of
  * this function. Modify with care.
  *
  * Return: true if wait_list isn't empty and false otherwise
@@ -394,6 +394,100 @@  rwsem_del_waiter(struct rw_semaphore *se
 	return false;
 }
 
+static inline void
+rwsem_waiter_wake(struct rwsem_waiter *waiter, struct wake_q_head *wake_q)
+{
+	struct task_struct *tsk;
+
+	tsk = waiter->task;
+	get_task_struct(tsk);
+
+	/*
+	 * Ensure calling get_task_struct() before setting the reader
+	 * waiter to nil such that rwsem_down_read_slowpath() cannot
+	 * race with do_exit() by always holding a reference count
+	 * to the task to wakeup.
+	 */
+	smp_store_release(&waiter->task, NULL);
+	/*
+	 * Ensure issuing the wakeup (either by us or someone else)
+	 * after setting the reader waiter to nil.
+	 */
+	wake_q_add_safe(wake_q, tsk);
+}
+
+/*
+ * This function must be called with the sem->wait_lock held to prevent
+ * race conditions between checking the rwsem wait list and setting the
+ * sem->count accordingly.
+ *
+ * Implies rwsem_del_waiter() on success.
+ */
+static void rwsem_writer_wake(struct rw_semaphore *sem,
+			      struct rwsem_waiter *waiter,
+			      struct wake_q_head *wake_q)
+{
+	struct rwsem_waiter *first = rwsem_first_waiter(sem);
+	long count, new;
+
+	lockdep_assert_held(&sem->wait_lock);
+
+	count = atomic_long_read(&sem->count);
+	do {
+		bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
+
+		if (has_handoff) {
+			/*
+			 * Honor handoff bit and yield only when the first
+			 * waiter is the one that set it. Otherwisee, we
+			 * still try to acquire the rwsem.
+			 */
+			if (first->handoff_set && (waiter != first))
+				return;
+		}
+
+		new = count;
+
+		if (count & RWSEM_LOCK_MASK) {
+			/*
+			 * A waiter (first or not) can set the handoff bit
+			 * if it is an RT task or wait in the wait queue
+			 * for too long.
+			 */
+			if (has_handoff || (!rt_task(waiter->task) &&
+					    !time_after(jiffies, waiter->timeout)))
+				return;
+
+			new |= RWSEM_FLAG_HANDOFF;
+		} else {
+			new |= RWSEM_WRITER_LOCKED;
+			new &= ~RWSEM_FLAG_HANDOFF;
+
+			if (list_is_singular(&sem->wait_list))
+				new &= ~RWSEM_FLAG_WAITERS;
+		}
+	} while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
+
+	/*
+	 * We have either acquired the lock with handoff bit cleared or set
+	 * the handoff bit. Only the first waiter can have its handoff_set
+	 * set here to enable optimistic spinning in slowpath loop.
+	 */
+	if (new & RWSEM_FLAG_HANDOFF) {
+		first->handoff_set = true;
+		lockevent_inc(rwsem_wlock_handoff);
+		return;
+	}
+
+	/*
+	 * Have rwsem_writer_wake() fully imply rwsem_del_waiter() on
+	 * success.
+	 */
+	list_del(&waiter->list);
+	rwsem_set_owner(sem);
+	rwsem_waiter_wake(waiter, wake_q);
+}
+
 /*
  * handle the lock release when processes blocked on it that can now run
  * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
@@ -424,23 +518,12 @@  static void rwsem_mark_wake(struct rw_se
 	 */
 	waiter = rwsem_first_waiter(sem);
 
-	if (waiter->type != RWSEM_WAITING_FOR_WRITE)
-		goto wake_readers;
-
-	if (wake_type == RWSEM_WAKE_ANY) {
-		/*
-		 * Mark writer at the front of the queue for wakeup.
-		 * Until the task is actually later awoken later by
-		 * the caller, other writers are able to steal it.
-		 * Readers, on the other hand, will block as they
-		 * will notice the queued writer.
-		 */
-		wake_q_add(wake_q, waiter->task);
-		lockevent_inc(rwsem_wake_writer);
+	if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
+		if (wake_type == RWSEM_WAKE_ANY)
+			rwsem_writer_wake(sem, waiter, wake_q);
+		return;
 	}
-	return;
 
-wake_readers:
 	/*
 	 * No reader wakeup if there are too many of them already.
 	 */
@@ -547,25 +630,8 @@  static void rwsem_mark_wake(struct rw_se
 		atomic_long_add(adjustment, &sem->count);
 
 	/* 2nd pass */
-	list_for_each_entry_safe(waiter, tmp, &wlist, list) {
-		struct task_struct *tsk;
-
-		tsk = waiter->task;
-		get_task_struct(tsk);
-
-		/*
-		 * Ensure calling get_task_struct() before setting the reader
-		 * waiter to nil such that rwsem_down_read_slowpath() cannot
-		 * race with do_exit() by always holding a reference count
-		 * to the task to wakeup.
-		 */
-		smp_store_release(&waiter->task, NULL);
-		/*
-		 * Ensure issuing the wakeup (either by us or someone else)
-		 * after setting the reader waiter to nil.
-		 */
-		wake_q_add_safe(wake_q, tsk);
-	}
+	list_for_each_entry_safe(waiter, tmp, &wlist, list)
+		rwsem_waiter_wake(waiter, wake_q);
 }
 
 /*
@@ -596,77 +662,6 @@  rwsem_del_wake_waiter(struct rw_semaphor
 }
 
 /*
- * This function must be called with the sem->wait_lock held to prevent
- * race conditions between checking the rwsem wait list and setting the
- * sem->count accordingly.
- *
- * Implies rwsem_del_waiter() on success.
- */
-static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
-					struct rwsem_waiter *waiter)
-{
-	struct rwsem_waiter *first = rwsem_first_waiter(sem);
-	long count, new;
-
-	lockdep_assert_held(&sem->wait_lock);
-
-	count = atomic_long_read(&sem->count);
-	do {
-		bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
-
-		if (has_handoff) {
-			/*
-			 * Honor handoff bit and yield only when the first
-			 * waiter is the one that set it. Otherwisee, we
-			 * still try to acquire the rwsem.
-			 */
-			if (first->handoff_set && (waiter != first))
-				return false;
-		}
-
-		new = count;
-
-		if (count & RWSEM_LOCK_MASK) {
-			/*
-			 * A waiter (first or not) can set the handoff bit
-			 * if it is an RT task or wait in the wait queue
-			 * for too long.
-			 */
-			if (has_handoff || (!rt_task(waiter->task) &&
-					    !time_after(jiffies, waiter->timeout)))
-				return false;
-
-			new |= RWSEM_FLAG_HANDOFF;
-		} else {
-			new |= RWSEM_WRITER_LOCKED;
-			new &= ~RWSEM_FLAG_HANDOFF;
-
-			if (list_is_singular(&sem->wait_list))
-				new &= ~RWSEM_FLAG_WAITERS;
-		}
-	} while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
-
-	/*
-	 * We have either acquired the lock with handoff bit cleared or set
-	 * the handoff bit. Only the first waiter can have its handoff_set
-	 * set here to enable optimistic spinning in slowpath loop.
-	 */
-	if (new & RWSEM_FLAG_HANDOFF) {
-		first->handoff_set = true;
-		lockevent_inc(rwsem_wlock_handoff);
-		return false;
-	}
-
-	/*
-	 * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
-	 * success.
-	 */
-	list_del(&waiter->list);
-	rwsem_set_owner(sem);
-	return true;
-}
-
-/*
  * The rwsem_spin_on_owner() function returns the following 4 values
  * depending on the lock owner state.
  *   OWNER_NULL  : owner is currently NULL
@@ -1072,7 +1067,7 @@  rwsem_down_read_slowpath(struct rw_semap
 	for (;;) {
 		set_current_state(state);
 		if (!smp_load_acquire(&waiter.task)) {
-			/* Matches rwsem_mark_wake()'s smp_store_release(). */
+			/* Matches rwsem_waiter_wake()'s smp_store_release(). */
 			break;
 		}
 		if (signal_pending_state(state, current)) {
@@ -1143,54 +1138,36 @@  rwsem_down_write_slowpath(struct rw_sema
 	} else {
 		atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
 	}
+	raw_spin_unlock_irq(&sem->wait_lock);
 
 	/* wait until we successfully acquire the lock */
-	set_current_state(state);
 	trace_contention_begin(sem, LCB_F_WRITE);
 
 	for (;;) {
-		if (rwsem_try_write_lock(sem, &waiter)) {
-			/* rwsem_try_write_lock() implies ACQUIRE on success */
+		set_current_state(state);
+		if (!smp_load_acquire(&waiter.task)) {
+			/* Matches rwsem_waiter_wake()'s smp_store_release(). */
 			break;
 		}
-
-		raw_spin_unlock_irq(&sem->wait_lock);
-
-		if (signal_pending_state(state, current))
-			goto out_nolock;
-
-		/*
-		 * After setting the handoff bit and failing to acquire
-		 * the lock, attempt to spin on owner to accelerate lock
-		 * transfer. If the previous owner is a on-cpu writer and it
-		 * has just released the lock, OWNER_NULL will be returned.
-		 * In this case, we attempt to acquire the lock again
-		 * without sleeping.
-		 */
-		if (waiter.handoff_set) {
-			enum owner_state owner_state;
-
-			owner_state = rwsem_spin_on_owner(sem);
-			if (owner_state == OWNER_NULL)
-				goto trylock_again;
+		if (signal_pending_state(state, current)) {
+			raw_spin_lock_irq(&sem->wait_lock);
+			if (waiter.task)
+				goto out_nolock;
+			raw_spin_unlock_irq(&sem->wait_lock);
+			/* Ordered by sem->wait_lock against rwsem_mark_wake(). */
+			break;
 		}
-
 		schedule_preempt_disabled();
 		lockevent_inc(rwsem_sleep_writer);
-		set_current_state(state);
-trylock_again:
-		raw_spin_lock_irq(&sem->wait_lock);
 	}
 	__set_current_state(TASK_RUNNING);
-	raw_spin_unlock_irq(&sem->wait_lock);
 	lockevent_inc(rwsem_wlock);
 	trace_contention_end(sem, 0);
 	return sem;
 
 out_nolock:
-	__set_current_state(TASK_RUNNING);
-	raw_spin_lock_irq(&sem->wait_lock);
 	rwsem_del_wake_waiter(sem, &waiter, &wake_q);
+	__set_current_state(TASK_RUNNING);
 	lockevent_inc(rwsem_wlock_fail);
 	trace_contention_end(sem, -EINTR);
 	return ERR_PTR(-EINTR);