All of lore.kernel.org
 help / color / mirror / Atom feed
From: Peter Zijlstra <peterz@infradead.org>
To: Waiman Long <longman@redhat.com>
Cc: Ingo Molnar <mingo@redhat.com>, Will Deacon <will@kernel.org>,
	linux-kernel@vger.kernel.org, Davidlohr Bueso <dave@stgolabs.net>,
	mazhenhua <mazhenhua@xiaomi.com>, Hillf Danton <hdanton@sina.com>,
	Maria Yu <quic_aiquny@quicinc.com>
Subject: Re: [PATCH v4] locking/rwsem: Make handoff bit handling more consistent
Date: Fri, 12 Nov 2021 13:10:59 +0100	[thread overview]
Message-ID: <YY5Z009P2jJ4X484@hirez.programming.kicks-ass.net> (raw)
In-Reply-To: <20211112040753.389380-1-longman@redhat.com>

On Thu, Nov 11, 2021 at 11:07:53PM -0500, Waiman Long wrote:
> @@ -889,6 +892,20 @@ rwsem_spin_on_owner(struct rw_semaphore *sem)
>  }
>  #endif
>  
> +/*
> + * Common code to handle rwsem flags in out_nolock path with wait_lock held.
> + * If there is more than one waiter in the queue and the HANDOFF bit is set,
> + * the next waiter will inherit it if the first waiter is removed.
> + */
> +static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
> +						struct rwsem_waiter *waiter)

I'm going to rename that; it doesn't just clear flags, it also dequeues the
waiter.

Argh, rwsem_mark_wake() doesn't clear HANDOFF when list_empty(), and
write_slowpath() is *far* too clever about all of this.


> +{
> +	list_del(&waiter->list);
	if (list_empty(&sem->wait_list)) {
> +		atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
> +				   &sem->count);
	}
> +}



> @@ -1098,7 +1110,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
>  		 * In this case, we attempt to acquire the lock again
>  		 * without sleeping.
>  		 */
> -		if (wstate == WRITER_HANDOFF) {
> +		if (waiter.handoff_set) {

I'm thinking this wants to be something like:

		if (rwsem_first_waiter(sem) == &waiter && waiter.handoff_set) {

>  			enum owner_state owner_state;
>  
>  			preempt_disable();

How's this (on top) then?

---
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -104,11 +104,10 @@
  * atomic_long_fetch_add() is used to obtain reader lock, whereas
  * atomic_long_cmpxchg() will be used to obtain writer lock.
  *
- * There are four places where the lock handoff bit may be set or cleared.
- * 1) rwsem_mark_wake() for readers            -- set, clear
- * 2) rwsem_try_write_lock() for writers       -- set, clear
- * 3) Error path of rwsem_down_write_slowpath() -- clear
- * 4) Error path of rwsem_down_read_slowpath()  -- clear
+ * There are three places where the lock handoff bit may be set or cleared.
+ * 1) rwsem_mark_wake() for readers		-- set, clear
+ * 2) rwsem_try_write_lock() for writers	-- set, clear
+ * 3) rwsem_del_waiter()			-- clear
  *
  * For all the above cases, wait_lock will be held. A writer must also
  * be the first one in the wait_list to be eligible for setting the handoff
@@ -363,6 +362,31 @@ enum rwsem_wake_type {
  */
 #define MAX_READERS_WAKEUP	0x100
 
+static inline void
+rwsem_add_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
+{
+	lockdep_assert_held(&sem->wait_lock);
+	list_add_tail(&waiter->list, &sem->wait_list);
+	/* caller will set RWSEM_FLAG_WAITERS */
+}
+
+/*
+ * Remove a waiter from the wait_list and clear flags.
+ *
+ * Both rwsem_mark_wake() and rwsem_try_write_lock() contain a full 'copy' of
+ * this function. Modify with care.
+ */
+static inline void
+rwsem_del_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
+{
+	lockdep_assert_held(&sem->wait_lock);
+	list_del(&waiter->list);
+	if (likely(!list_empty(&sem->wait_list)))
+		return;
+
+	atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS, &sem->count);
+}
+
 /*
  * handle the lock release when processes blocked on it that can now run
  * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
@@ -374,6 +398,8 @@ enum rwsem_wake_type {
  *   preferably when the wait_lock is released
  * - woken process blocks are discarded from the list after having task zeroed
  * - writers are only marked woken if downgrading is false
+ *
+ * Implies rwsem_del_waiter() for all woken readers.
  */
 static void rwsem_mark_wake(struct rw_semaphore *sem,
 			    enum rwsem_wake_type wake_type,
@@ -488,18 +514,25 @@ static void rwsem_mark_wake(struct rw_se
 
 	adjustment = woken * RWSEM_READER_BIAS - adjustment;
 	lockevent_cond_inc(rwsem_wake_reader, woken);
+
+	oldcount = atomic_long_read(&sem->count);
 	if (list_empty(&sem->wait_list)) {
-		/* hit end of list above */
+		/*
+		 * Combined with list_move_tail() above, this implies
+		 * rwsem_del_waiter().
+		 */
 		adjustment -= RWSEM_FLAG_WAITERS;
+		if (oldcount & RWSEM_FLAG_HANDOFF)
+			adjustment -= RWSEM_FLAG_HANDOFF;
+	} else if (woken) {
+		/*
+		 * When we've woken a reader, we no longer need to force
+		 * writers to give up the lock and we can clear HANDOFF.
+		 */
+		if (oldcount & RWSEM_FLAG_HANDOFF)
+			adjustment -= RWSEM_FLAG_HANDOFF;
 	}
 
-	/*
-	 * When we've woken a reader, we no longer need to force writers
-	 * to give up the lock and we can clear HANDOFF.
-	 */
-	if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))
-		adjustment -= RWSEM_FLAG_HANDOFF;
-
 	if (adjustment)
 		atomic_long_add(adjustment, &sem->count);
 
@@ -529,6 +562,8 @@ static void rwsem_mark_wake(struct rw_se
  * This function must be called with the sem->wait_lock held to prevent
  * race conditions between checking the rwsem wait list and setting the
  * sem->count accordingly.
+ *
+ * Implies rwsem_del_waiter() on success.
  */
 static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
 					struct rwsem_waiter *waiter)
@@ -575,6 +610,11 @@ static inline bool rwsem_try_write_lock(
 		return false;
 	}
 
+	/*
+	 * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
+	 * success.
+	 */
+	list_del(&waiter->list);
 	rwsem_set_owner(sem);
 	return true;
 }
@@ -893,20 +933,6 @@ rwsem_spin_on_owner(struct rw_semaphore
 #endif
 
 /*
- * Common code to handle rwsem flags in out_nolock path with wait_lock held.
- * If there is more than one waiter in the queue and the HANDOFF bit is set,
- * the next waiter will inherit it if the first waiter is removed.
- */
-static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
-						struct rwsem_waiter *waiter)
-{
-	list_del(&waiter->list);
-	if (list_empty(&sem->wait_list))
-		atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
-				   &sem->count);
-}
-
-/*
  * Wait for the read lock to be granted
  */
 static struct rw_semaphore __sched *
@@ -973,7 +999,7 @@ rwsem_down_read_slowpath(struct rw_semap
 		}
 		adjustment += RWSEM_FLAG_WAITERS;
 	}
-	list_add_tail(&waiter.list, &sem->wait_list);
+	rwsem_add_waiter(sem, &waiter);
 
 	/* we're now waiting on the lock, but no longer actively locking */
 	count = atomic_long_add_return(adjustment, &sem->count);
@@ -1019,7 +1045,7 @@ rwsem_down_read_slowpath(struct rw_semap
 	return sem;
 
 out_nolock:
-	rwsem_out_nolock_clear_flags(sem, &waiter);
+	rwsem_del_waiter(sem, &waiter);
 	raw_spin_unlock_irq(&sem->wait_lock);
 	__set_current_state(TASK_RUNNING);
 	lockevent_inc(rwsem_rlock_fail);
@@ -1034,7 +1060,6 @@ rwsem_down_write_slowpath(struct rw_sema
 {
 	long count;
 	struct rwsem_waiter waiter;
-	struct rw_semaphore *ret = sem;
 	DEFINE_WAKE_Q(wake_q);
 
 	/* do optimistic spinning and steal lock if possible */
@@ -1053,7 +1078,7 @@ rwsem_down_write_slowpath(struct rw_sema
 	waiter.handoff_set = false;
 
 	raw_spin_lock_irq(&sem->wait_lock);
-	list_add_tail(&waiter.list, &sem->wait_list);
+	rwsem_add_waiter(sem, &waiter);
 
 	/* we're now waiting on the lock */
 	if (rwsem_first_waiter(sem) != &waiter) {
@@ -1110,7 +1135,7 @@ rwsem_down_write_slowpath(struct rw_sema
 		 * In this case, we attempt to acquire the lock again
 		 * without sleeping.
 		 */
-		if (waiter.handoff_set) {
+		if (rwsem_first_waiter(sem) == &waiter && waiter.handoff_set) {
 			enum owner_state owner_state;
 
 			preempt_disable();
@@ -1128,16 +1153,14 @@ rwsem_down_write_slowpath(struct rw_sema
 		raw_spin_lock_irq(&sem->wait_lock);
 	}
 	__set_current_state(TASK_RUNNING);
-	list_del(&waiter.list);
 	raw_spin_unlock_irq(&sem->wait_lock);
 	lockevent_inc(rwsem_wlock);
-
-	return ret;
+	return sem;
 
 out_nolock:
 	__set_current_state(TASK_RUNNING);
 	raw_spin_lock_irq(&sem->wait_lock);
-	rwsem_out_nolock_clear_flags(sem, &waiter);
+	rwsem_del_waiter(sem, &waiter);
 	if (!list_empty(&sem->wait_list))
 		rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
 	raw_spin_unlock_irq(&sem->wait_lock);

  reply	other threads:[~2021-11-12 12:11 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-11-12  4:07 [PATCH v4] locking/rwsem: Make handoff bit handling more consistent Waiman Long
2021-11-12 12:10 ` Peter Zijlstra [this message]
2021-11-15  3:38   ` Waiman Long
2021-11-15 16:01     ` Peter Zijlstra
2021-11-15 22:29       ` Waiman Long
2021-11-15 23:07         ` Peter Zijlstra
2021-11-16  0:09           ` Waiman Long
2021-11-15 15:45   ` Aiqun(Maria) Yu
2021-11-15 16:58     ` Waiman Long

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=YY5Z009P2jJ4X484@hirez.programming.kicks-ass.net \
    --to=peterz@infradead.org \
    --cc=dave@stgolabs.net \
    --cc=hdanton@sina.com \
    --cc=linux-kernel@vger.kernel.org \
    --cc=longman@redhat.com \
    --cc=mazhenhua@xiaomi.com \
    --cc=mingo@redhat.com \
    --cc=quic_aiquny@quicinc.com \
    --cc=will@kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.