public inbox for linux-kernel@vger.kernel.org
 help / color / mirror / Atom feed
From: Waiman Long <longman@redhat.com>
To: Peter Zijlstra <peterz@infradead.org>,
	Ingo Molnar <mingo@redhat.com>,
	Thomas Gleixner <tglx@linutronix.de>,
	"H. Peter Anvin" <hpa@zytor.com>
Cc: linux-kernel@vger.kernel.org, Waiman Long <longman@redhat.com>
Subject: [RFC PATCH 5/7] locking/rtqspinlock: Handle priority boosting
Date: Tue,  3 Jan 2017 13:00:28 -0500	[thread overview]
Message-ID: <1483466430-8028-6-git-send-email-longman@redhat.com> (raw)
In-Reply-To: <1483466430-8028-1-git-send-email-longman@redhat.com>

As the priority of a task may get boosted due to an acquired rtmutex,
we will need to periodically check the task priority to see if it
gets boosted. For an originally non-RT task, that means unqueuing from
the MCS wait queue before doing an RT spinning. So the unqueuing code
from osq_lock is borrowed to handle the unqueuing aspect of it.

Signed-off-by: Waiman Long <longman@redhat.com>
---
 kernel/locking/qspinlock.c    |  25 ++++-
 kernel/locking/qspinlock_rt.h | 235 +++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 253 insertions(+), 7 deletions(-)

diff --git a/kernel/locking/qspinlock.c b/kernel/locking/qspinlock.c
index cb5c2e5..b25ad58 100644
--- a/kernel/locking/qspinlock.c
+++ b/kernel/locking/qspinlock.c
@@ -270,10 +270,18 @@ static __always_inline u32  __pv_wait_head_or_lock(struct qspinlock *lock,
 /*
  * Realtime queued spinlock is mutual exclusive with native and PV spinlocks.
  */
+#define RT_RETRY	((u32)-1)
+
 #ifdef CONFIG_REALTIME_QUEUED_SPINLOCKS
 #include "qspinlock_rt.h"
 #else
-static __always_inline u32 rt_wait_head_or_retry(struct qspinlock *lock)
+static __always_inline void rt_init_node(struct mcs_spinlock *node, u32 tail) {}
+static __always_inline bool rt_wait_node_or_unqueue(struct qspinlock *lock,
+						    struct mcs_spinlock *node,
+						    struct mcs_spinlock *prev)
+						{ return false; }
+static __always_inline u32  rt_spin_lock_or_retry(struct qspinlock *lock,
+						  struct mcs_spinlock *node)
 						{ return 0; }
 #define rt_pending(v)		0
 #define rt_enabled()		false
@@ -514,6 +522,7 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 	node->locked = 0;
 	node->next = NULL;
 	pv_init_node(node);
+	rt_init_node(node, tail);
 
 	/*
 	 * We touched a (possibly) cold cacheline in the per-cpu queue node;
@@ -552,6 +561,10 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 		WRITE_ONCE(prev->next, node);
 
 		pv_wait_node(node, prev);
+
+		if (rt_wait_node_or_unqueue(lock, node, prev))
+			goto queue;	/* Retry RT locking */
+
 		arch_mcs_spin_lock_contended(&node->locked);
 
 		/*
@@ -591,10 +604,14 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 
 	/*
 	 * The RT rt_wait_head_or_lock function, if active, will acquire the
-	 * lock and return a non-zero value.
+	 * lock, release the MCS lock and return a non-zero value. We need to
+	 * retry with RT spinning when RT_RETRY is returned.
 	 */
-	if ((val = rt_wait_head_or_retry(lock)))
-		goto locked;
+	val = rt_spin_lock_or_retry(lock, node);
+	if (val == RT_RETRY)
+		goto queue;
+	else if (val)
+		return;
 
 	val = smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_PENDING_MASK));
 
diff --git a/kernel/locking/qspinlock_rt.h b/kernel/locking/qspinlock_rt.h
index c2f2722..0c4d051 100644
--- a/kernel/locking/qspinlock_rt.h
+++ b/kernel/locking/qspinlock_rt.h
@@ -37,6 +37,12 @@
  * doing so, we can query the highest priority task that is waiting on the
  * outer lock and adjust our waiting priority accordingly. To speed up nested
  * spinlock calls, they will have a minimum RT priority of 1 to begin with.
+ *
+ * To handle priority boosting due to an acquired rt-mutex, The task->prio
+ * field is queried in each iteration of the loop. For originally non-RT tasks,
+ * it will have to break out of the MCS wait queue just like what is done
+ * in the OSQ lock. Then it has to retry RT spinning if it has been boosted
+ * to RT priority.
  */
 #include <linux/sched.h>
 
@@ -45,9 +51,56 @@
 #endif
 
 /*
+ * For proper unqueuing from the MCS wait queue, we need to store the encoded
+ * tail code as well the previous node pointer into the extra MCS node. Since
+ * CPUs in interrupt context won't use the per-CPU MCS nodes anymore. So only
+ * one is needed for process context CPUs. As a result, we can use the
+ * additional nodes for data storage. Here, we allow 2 nodes per cpu in case
+ * we want to put softIRQ CPUs into the queue as well.
+ */
+struct rt_node {
+	struct mcs_spinlock	mcs;
+	struct mcs_spinlock	__reserved;
+	struct mcs_spinlock	*prev;
+	u32			tail;
+};
+
+/*
  * =========================[ Helper Functions ]=========================
  */
 
+static u32 cmpxchg_tail_acquire(struct qspinlock *lock, u32 old, u32 new)
+{
+	struct __qspinlock *l = (void *)lock;
+
+	return cmpxchg_acquire(&l->tail, old >> _Q_TAIL_OFFSET,
+			       new >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
+}
+
+static u32 cmpxchg_tail_release(struct qspinlock *lock, u32 old, u32 new)
+{
+	struct __qspinlock *l = (void *)lock;
+
+	return cmpxchg_release(&l->tail, old >> _Q_TAIL_OFFSET,
+			       new >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
+}
+
+static inline void rt_write_prev(struct mcs_spinlock *node,
+				 struct mcs_spinlock *prev)
+{
+	WRITE_ONCE(((struct rt_node *)node)->prev, prev);
+}
+
+static inline u32 rt_read_tail(struct mcs_spinlock *node)
+{
+	return READ_ONCE(((struct rt_node *)node)->tail);
+}
+
+static inline struct mcs_spinlock *rt_read_prev(struct mcs_spinlock *node)
+{
+	return READ_ONCE(((struct rt_node *)node)->prev);
+}
+
 /*
  * Translate the priority of a task to an equivalent RT priority
  */
@@ -141,6 +194,56 @@ static bool __rt_spin_trylock(struct qspinlock *lock,
 }
 
 /*
+ * MCS wait queue unqueuing code, borrow mostly from osq_lock.c.
+ */
+static struct mcs_spinlock *
+mcsq_wait_next(struct qspinlock *lock, struct mcs_spinlock *node,
+	       struct mcs_spinlock *prev)
+{
+	 struct mcs_spinlock *next = NULL;
+	 u32 tail = rt_read_tail(node);
+	 u32 old;
+
+	/*
+	 * If there is a prev node in queue, the 'old' value will be
+	 * the prev node's tail value. Otherwise, it's set to 0 since if
+	 * we're the only one in queue, the queue will then become empty.
+	 */
+	old = prev ? rt_read_tail(prev) : 0;
+
+	for (;;) {
+		if ((atomic_read(&lock->val) & _Q_TAIL_MASK) == tail &&
+		    cmpxchg_tail_acquire(lock, tail, old) == tail) {
+			/*
+			 * We are at the queue tail, we moved the @lock back.
+			 * @prev will now observe @lock and will complete its
+			 * unlock()/unqueue().
+			 */
+			break;
+		}
+
+		/*
+		 * We must xchg() the @node->next value, because if we were to
+		 * leave it in, a concurrent unlock()/unqueue() from
+		 * @node->next might complete Step-A and think its @prev is
+		 * still valid.
+		 *
+		 * If the concurrent unlock()/unqueue() wins the race, we'll
+		 * wait for either @lock to point to us, through its Step-B, or
+		 * wait for a new @node->next from its Step-C.
+		 */
+		if (node->next) {
+			next = xchg(&node->next, NULL);
+			if (next)
+				break;
+		}
+
+		cpu_relax();
+	}
+	return next;
+}
+
+/*
  * ====================[ Functions Used by qspinlock.c ]====================
  */
 
@@ -158,6 +261,17 @@ static inline int rt_pending(int val)
 }
 
 /*
+ * Initialize the RT fields of a MCS node.
+ */
+static inline void rt_init_node(struct mcs_spinlock *node, u32 tail)
+{
+	struct rt_node *n = (struct rt_node *)node;
+
+	n->prev = NULL;
+	n->tail = tail;
+}
+
+/*
  * Return: true if locked acquired
  *	   false if queuing in the MCS wait queue is needed.
  */
@@ -167,16 +281,98 @@ static inline bool rt_spin_trylock(struct qspinlock *lock)
 }
 
 /*
+ * Return: true if it has been unqueued and need to retry locking.
+ *	   false if it becomes the wait queue head & proceed to next step.
+ */
+static bool rt_wait_node_or_unqueue(struct qspinlock *lock,
+				    struct mcs_spinlock *node,
+				    struct mcs_spinlock *prev)
+{
+	struct mcs_spinlock *next;
+
+	rt_write_prev(node, prev);	/* Save previous node pointer */
+
+	while (!READ_ONCE(node->locked)) {
+		if (rt_task_priority(current, 0))
+			goto unqueue;
+		cpu_relax();
+	}
+	return false;
+
+unqueue:
+	/*
+	 * Step - A  -- stabilize @prev
+	 *
+	 * Undo our @prev->next assignment; this will make @prev's
+	 * unlock()/unqueue() wait for a next pointer since @lock points
+	 * to us (or later).
+	 */
+	for (;;) {
+		if (prev->next == node &&
+		    cmpxchg(&prev->next, node, NULL) == node)
+			break;
+
+		/*
+		 * We can only fail the cmpxchg() racing against an unlock(),
+		 * in which case we should observe @node->locked becoming
+		 * true.
+		 */
+		if (smp_load_acquire(&node->locked))
+			return false;
+
+		cpu_relax();
+
+		/*
+		 * Or we race against a concurrent unqueue()'s step-B, in which
+		 * case its step-C will write us a new @node->prev pointer.
+		 */
+		prev = rt_read_prev(node);
+	}
+
+	/*
+	 * Step - B -- stabilize @next
+	 *
+	 * Similar to unlock(), wait for @node->next or move @lock from @node
+	 * back to @prev.
+	 */
+	next = mcsq_wait_next(lock, node, prev);
+
+	/*
+	 * Step - C -- unlink
+	 *
+	 * @prev is stable because its still waiting for a new @prev->next
+	 * pointer, @next is stable because our @node->next pointer is NULL and
+	 * it will wait in Step-A.
+	 */
+	if (next) {
+		rt_write_prev(next, prev);
+		WRITE_ONCE(prev->next, next);
+	}
+
+	/*
+	 * Release the node.
+	 */
+	__this_cpu_dec(mcs_nodes[0].count);
+
+	return true;	/* Need to retry RT spinning */
+}
+
+/*
  * We need to make the non-RT tasks wait longer if RT tasks are spinning for
  * the lock. This is done to reduce the chance that a non-RT task may
  * accidently grab the lock away from the RT tasks in the short interval
  * where the pending priority may be reset after an RT task acquires the lock.
  *
- * Return: Current value of the lock.
+ * Return: RT_RETRY if it needs to retry locking.
+ *	   1 if lock acquired.
  */
-static u32 rt_wait_head_or_retry(struct qspinlock *lock)
+static u32 rt_spin_lock_or_retry(struct qspinlock *lock,
+				 struct mcs_spinlock *node)
 {
 	struct __qspinlock *l = (void *)lock;
+	struct mcs_spinlock *next;
+	bool retry = false;
+	u32 tail;
 
 	for (;;) {
 		u16 lockpend = READ_ONCE(l->locked_pending);
@@ -187,6 +383,14 @@ static u32 rt_wait_head_or_retry(struct qspinlock *lock)
 			if (!lockpend)
 				break;
 		}
+		/*
+		 * We need to break out of the non-RT wait queue and do
+		 * RT spinnning if we become an RT task.
+		 */
+		if (rt_task_priority(current, 0)) {
+			retry = true;
+			goto unlock;
+		}
 
 		/*
 		 * 4 cpu_relax's if RT tasks present.
@@ -198,7 +402,32 @@ static u32 rt_wait_head_or_retry(struct qspinlock *lock)
 		}
 		cpu_relax();
 	}
-	return atomic_read(&lock->val);
+
+unlock:
+	/*
+	 * Remove itself from the MCS wait queue (unlock).
+	 */
+	tail = rt_read_tail(node);
+	if (cmpxchg_tail_release(lock, tail, 0) == tail)
+		goto release;
+
+	/*
+	 * Second case.
+	 */
+	next = xchg(&node->next, NULL);
+	if (!next)
+		next = mcsq_wait_next(lock, node, NULL);
+
+	if (next)
+		WRITE_ONCE(next->locked, 1);
+
+release:
+	/*
+	 * Release the node.
+	 */
+	__this_cpu_dec(mcs_nodes[0].count);
+
+	return retry ? RT_RETRY : 1;
 }
 
 /*
-- 
1.8.3.1

  parent reply	other threads:[~2017-01-03 18:01 UTC|newest]

Thread overview: 24+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2017-01-03 18:00 [RFC PATCH 0/7] locking/rtqspinlock: Realtime queued spinlocks Waiman Long
2017-01-03 18:00 ` [RFC PATCH 1/7] locking/spinlock: Remove the unused spin_lock_bh_nested API Waiman Long
2017-01-03 18:00 ` [RFC PATCH 2/7] locking/rtqspinlock: Introduce realtime queued spinlocks Waiman Long
2017-01-03 18:00 ` [RFC PATCH 3/7] locking/rtqspinlock: Use static RT priority when in interrupt context Waiman Long
2017-01-03 18:00 ` [RFC PATCH 4/7] locking/rtqspinlock: Override spin_lock_nested with special RT variants Waiman Long
2017-01-03 18:00 ` Waiman Long [this message]
2017-01-03 18:00 ` [RFC PATCH 6/7] locking/rtqspinlock: Voluntarily yield CPU when need_sched() Waiman Long
2017-01-04 10:07   ` Boqun Feng
2017-01-04 21:57     ` Waiman Long
2017-01-05 10:16   ` Daniel Bristot de Oliveira
2017-01-03 18:00 ` [RFC PATCH 7/7] locking/rtqspinlock: Enable collection of event counts Waiman Long
2017-01-04 12:49 ` [RFC PATCH 0/7] locking/rtqspinlock: Realtime queued spinlocks Peter Zijlstra
2017-01-04 15:25   ` Waiman Long
2017-01-04 15:55     ` Steven Rostedt
2017-01-04 20:02       ` Waiman Long
2017-01-05 18:43         ` Steven Rostedt
2017-01-05  9:26     ` Daniel Bristot de Oliveira
2017-01-05  9:44     ` Peter Zijlstra
2017-01-05 15:55       ` Waiman Long
2017-01-05 16:08         ` Peter Zijlstra
2017-01-05 17:07           ` Waiman Long
2017-01-05 18:50             ` Steven Rostedt
2017-01-05 19:24               ` Waiman Long
2017-01-05 18:05           ` Daniel Bristot de Oliveira

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1483466430-8028-6-git-send-email-longman@redhat.com \
    --to=longman@redhat.com \
    --cc=hpa@zytor.com \
    --cc=linux-kernel@vger.kernel.org \
    --cc=mingo@redhat.com \
    --cc=peterz@infradead.org \
    --cc=tglx@linutronix.de \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox