From: "Paul E. McKenney" <paulmck@linux.vnet.ibm.com>
To: linux-kernel@vger.kernel.org
Cc: mingo@kernel.org, jiangshanlai@gmail.com, dipankar@in.ibm.com,
akpm@linux-foundation.org, mathieu.desnoyers@efficios.com,
josh@joshtriplett.org, tglx@linutronix.de, peterz@infradead.org,
rostedt@goodmis.org, dhowells@redhat.com, edumazet@google.com,
fweisbec@gmail.com, oleg@redhat.com,
"Paul E. McKenney" <paulmck@linux.vnet.ibm.com>
Subject: [PATCH tip/core/rcu 02/15] rcu: Use timer as backstop for NOCB deferred wakeups
Date: Mon, 24 Jul 2017 14:44:31 -0700
Message-ID: <1500932684-10469-2-git-send-email-paulmck@linux.vnet.ibm.com>
In-Reply-To: <20170724214425.GA9665@linux.vnet.ibm.com>
The handling of RCU's no-CBs CPUs has a maintenance headache, namely
that if call_rcu() is invoked with interrupts disabled, the rcuo kthread
wakeup must be deferred to a point where we can be sure that scheduler
locks are not held. Of course, there are a lot of code paths leading
from an interrupts-disabled invocation of call_rcu(), and missing any
one of these can result in excessive callback-invocation latency, and
potentially even system hangs.
This commit therefore uses a timer to guarantee that the wakeup will
eventually occur. If one of the deferred-wakeup points kicks in, then
the timer is simply cancelled.
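The timer-as-backstop idea described above can be modeled in a few lines of user-space C. This is only a sketch under stated assumptions: the clock is a simulated tick counter, and every name here (struct backstop, defer_wake(), deferred_wake_point(), tick()) is hypothetical and does not appear in the kernel. It shows the two properties the paragraph relies on: the timer is armed only on the first deferral, and whichever of the safe point or the timer runs first delivers the wakeup and leaves nothing for the other to do.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical user-space model; none of these names come from the kernel. */
enum wake_state { WAKE_NOT, WAKE, WAKE_FORCE };

struct backstop {
	enum wake_state deferred;	/* Pending deferred wakeup, if any. */
	bool timer_armed;		/* Models a pending one-tick timer. */
	unsigned long expires;		/* Tick at which the timer fires. */
	unsigned long now;		/* Simulated clock, in ticks. */
	int wakeups;			/* Number of wakeups delivered. */
};

static void backstop_init(struct backstop *b)
{
	b->deferred = WAKE_NOT;
	b->timer_armed = false;
	b->expires = 0;
	b->now = 0;
	b->wakeups = 0;
}

/* Deliver the wakeup and cancel the backstop timer. */
static void do_wake(struct backstop *b)
{
	b->deferred = WAKE_NOT;
	b->timer_armed = false;
	b->wakeups++;
}

/* Record a deferred wakeup, arming the timer only on the first deferral. */
static void defer_wake(struct backstop *b, enum wake_state type)
{
	if (b->deferred == WAKE_NOT) {
		b->timer_armed = true;
		b->expires = b->now + 1;
	}
	b->deferred = type;
}

/* A point at which waking is known to be safe. */
static void deferred_wake_point(struct backstop *b)
{
	if (b->deferred != WAKE_NOT)
		do_wake(b);
}

/* Advance the clock by one tick and run the timer handler if it is due. */
static void tick(struct backstop *b)
{
	b->now++;
	if (b->timer_armed && b->now >= b->expires) {
		b->timer_armed = false;
		deferred_wake_point(b);
	}
}
```

In this model, calling defer_wake() and then deferred_wake_point() delivers one wakeup and disarms the timer, so a later tick() does nothing. Calling defer_wake() and then only tick() delivers the wakeup from the timer path instead, which is the case the backstop exists for.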
This commit also fixes up an incomplete removal of commits that were
intended to plug remaining exit paths, which should have the added
benefit of reducing the overhead of RCU's context-switch hooks. In
addition, it simplifies leader-to-follower callback-list handoff by
introducing locking. The call_rcu()-to-leader handoff continues to
use atomic operations in order to maintain good real-time latency for
common-case use of call_rcu().
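As an illustration of the locked leader-to-follower handoff mentioned above, here is a minimal user-space sketch. It is a model under assumptions, not the kernel code: a C11 atomic_flag spinlock stands in for the raw spinlock, and the names struct handoff, handoff_append(), and handoff_take_all() are hypothetical. The list is singly linked with a tail pointer that points at the head when the list is empty, so appending a batch and detaching the whole list are each a few plain stores under the lock.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical user-space model; these names are not the kernel's. */
struct cb {
	struct cb *next;
};

struct handoff {
	atomic_flag lock;	/* Stand-in for a raw spinlock. */
	struct cb *head;	/* First ready callback, or NULL. */
	struct cb **tail;	/* &head when empty, else last ->next. */
};

static void handoff_lock(struct handoff *h)
{
	while (atomic_flag_test_and_set_explicit(&h->lock,
						 memory_order_acquire))
		continue;	/* Spin until the lock is released. */
}

static void handoff_unlock(struct handoff *h)
{
	atomic_flag_clear_explicit(&h->lock, memory_order_release);
}

static void handoff_init(struct handoff *h)
{
	atomic_flag_clear(&h->lock);
	h->head = NULL;
	h->tail = &h->head;
}

/*
 * Leader side: append a batch whose first element is "first" and whose
 * last element's ->next field is at "batch_tail".  Returns true if the
 * list was empty beforehand, in which case the consumer needs a wakeup.
 */
static bool handoff_append(struct handoff *h, struct cb *first,
			   struct cb **batch_tail)
{
	struct cb **old_tail;

	handoff_lock(h);
	old_tail = h->tail;
	h->tail = batch_tail;
	*old_tail = first;
	handoff_unlock(h);
	return old_tail == &h->head;
}

/* Follower side: detach the entire list and reset it to empty. */
static struct cb *handoff_take_all(struct handoff *h)
{
	struct cb *list;

	handoff_lock(h);
	list = h->head;
	h->head = NULL;
	h->tail = &h->head;
	handoff_unlock(h);
	return list;
}
```

Because both sides update the head and tail inside one critical section, neither has to reason about a window in which the tail has moved but the new batch is not yet linked in. That window is what a lockless exchange-based version would have to cover with explicit memory barriers.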
Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
[ paulmck: Dan Carpenter fix for mod_timer() usage bug found by smatch. ]
---
kernel/rcu/tree.h | 2 +
kernel/rcu/tree_plugin.h | 179 +++++++++++++++++++++++++++++------------------
2 files changed, 111 insertions(+), 70 deletions(-)
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 9af0f31d6847..fe83f684ddcd 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -268,7 +268,9 @@ struct rcu_data {
struct rcu_head **nocb_follower_tail;
struct swait_queue_head nocb_wq; /* For nocb kthreads to sleep on. */
struct task_struct *nocb_kthread;
+ raw_spinlock_t nocb_lock; /* Guard following pair of fields. */
int nocb_defer_wakeup; /* Defer wakeup of nocb_kthread. */
+ struct timer_list nocb_timer; /* Enforce finite deferral. */
/* The following fields are used by the leader, hence own cacheline. */
struct rcu_head *nocb_gp_head ____cacheline_internodealigned_in_smp;
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index 908b309d60d7..bb9e6e43130f 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -1788,23 +1788,62 @@ bool rcu_is_nocb_cpu(int cpu)
}
/*
- * Kick the leader kthread for this NOCB group.
+ * Kick the leader kthread for this NOCB group. Caller holds ->nocb_lock
+ * and this function releases it.
*/
-static void wake_nocb_leader(struct rcu_data *rdp, bool force)
+static void __wake_nocb_leader(struct rcu_data *rdp, bool force,
+ unsigned long flags)
+ __releases(rdp->nocb_lock)
{
struct rcu_data *rdp_leader = rdp->nocb_leader;
- if (!READ_ONCE(rdp_leader->nocb_kthread))
+ lockdep_assert_held(&rdp->nocb_lock);
+ if (!READ_ONCE(rdp_leader->nocb_kthread)) {
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
return;
- if (READ_ONCE(rdp_leader->nocb_leader_sleep) || force) {
+ }
+ if (rdp_leader->nocb_leader_sleep || force) {
/* Prior smp_mb__after_atomic() orders against prior enqueue. */
WRITE_ONCE(rdp_leader->nocb_leader_sleep, false);
+ del_timer(&rdp->nocb_timer);
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
smp_mb(); /* ->nocb_leader_sleep before swake_up(). */
swake_up(&rdp_leader->nocb_wq);
+ } else {
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
}
}
/*
+ * Kick the leader kthread for this NOCB group, but caller has not
+ * acquired locks.
+ */
+static void wake_nocb_leader(struct rcu_data *rdp, bool force)
+{
+ unsigned long flags;
+
+ raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+ __wake_nocb_leader(rdp, force, flags);
+}
+
+/*
+ * Arrange to wake the leader kthread for this NOCB group at some
+ * future time when it is safe to do so.
+ */
+static void wake_nocb_leader_defer(struct rcu_data *rdp, int waketype,
+ const char *reason)
+{
+ unsigned long flags;
+
+ raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+ if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT)
+ mod_timer(&rdp->nocb_timer, jiffies + 1);
+ WRITE_ONCE(rdp->nocb_defer_wakeup, waketype);
+ trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, reason);
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+}
+
+/*
* Does the specified CPU need an RCU callback for the specified flavor
* of rcu_barrier()?
*/
@@ -1891,11 +1930,8 @@ static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
TPS("WakeEmpty"));
} else {
- WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE);
- /* Store ->nocb_defer_wakeup before ->rcu_urgent_qs. */
- smp_store_release(this_cpu_ptr(&rcu_dynticks.rcu_urgent_qs), true);
- trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
- TPS("WakeEmptyIsDeferred"));
+ wake_nocb_leader_defer(rdp, RCU_NOCB_WAKE,
+ TPS("WakeEmptyIsDeferred"));
}
rdp->qlen_last_fqs_check = 0;
} else if (len > rdp->qlen_last_fqs_check + qhimark) {
@@ -1905,11 +1941,8 @@ static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
TPS("WakeOvf"));
} else {
- WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_FORCE);
- /* Store ->nocb_defer_wakeup before ->rcu_urgent_qs. */
- smp_store_release(this_cpu_ptr(&rcu_dynticks.rcu_urgent_qs), true);
- trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
- TPS("WakeOvfIsDeferred"));
+ wake_nocb_leader_defer(rdp, RCU_NOCB_WAKE,
+ TPS("WakeOvfIsDeferred"));
}
rdp->qlen_last_fqs_check = LONG_MAX / 2;
} else {
@@ -2031,6 +2064,7 @@ static void rcu_nocb_wait_gp(struct rcu_data *rdp)
static void nocb_leader_wait(struct rcu_data *my_rdp)
{
bool firsttime = true;
+ unsigned long flags;
bool gotcbs;
struct rcu_data *rdp;
struct rcu_head **tail;
@@ -2042,7 +2076,11 @@ static void nocb_leader_wait(struct rcu_data *my_rdp)
trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "Sleep");
swait_event_interruptible(my_rdp->nocb_wq,
!READ_ONCE(my_rdp->nocb_leader_sleep));
- /* Memory barrier handled by smp_mb() calls below and repoll. */
+ raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags);
+ my_rdp->nocb_leader_sleep = true;
+ WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
+ del_timer(&my_rdp->nocb_timer);
+ raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags);
} else if (firsttime) {
firsttime = false; /* Don't drown trace log with "Poll"! */
trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "Poll");
@@ -2054,7 +2092,7 @@ static void nocb_leader_wait(struct rcu_data *my_rdp)
* nocb_gp_head, where they await a grace period.
*/
gotcbs = false;
- smp_mb(); /* wakeup before ->nocb_head reads. */
+ smp_mb(); /* wakeup and _sleep before ->nocb_head reads. */
for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
rdp->nocb_gp_head = READ_ONCE(rdp->nocb_head);
if (!rdp->nocb_gp_head)
@@ -2066,56 +2104,41 @@ static void nocb_leader_wait(struct rcu_data *my_rdp)
gotcbs = true;
}
- /*
- * If there were no callbacks, sleep a bit, rescan after a
- * memory barrier, and go retry.
- */
+ /* No callbacks? Sleep a bit if polling, and go retry. */
if (unlikely(!gotcbs)) {
- if (!rcu_nocb_poll)
+ WARN_ON(signal_pending(current));
+ if (rcu_nocb_poll) {
+ schedule_timeout_interruptible(1);
+ } else {
trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu,
"WokeEmpty");
- WARN_ON(signal_pending(current));
- schedule_timeout_interruptible(1);
-
- /* Rescan in case we were a victim of memory ordering. */
- my_rdp->nocb_leader_sleep = true;
- smp_mb(); /* Ensure _sleep true before scan. */
- for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower)
- if (READ_ONCE(rdp->nocb_head)) {
- /* Found CB, so short-circuit next wait. */
- my_rdp->nocb_leader_sleep = false;
- break;
- }
+ }
goto wait_again;
}
/* Wait for one grace period. */
rcu_nocb_wait_gp(my_rdp);
- /*
- * We left ->nocb_leader_sleep unset to reduce cache thrashing.
- * We set it now, but recheck for new callbacks while
- * traversing our follower list.
- */
- my_rdp->nocb_leader_sleep = true;
- smp_mb(); /* Ensure _sleep true before scan of ->nocb_head. */
-
/* Each pass through the following loop wakes a follower, if needed. */
for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
- if (READ_ONCE(rdp->nocb_head))
+ if (!rcu_nocb_poll &&
+ READ_ONCE(rdp->nocb_head) &&
+ READ_ONCE(my_rdp->nocb_leader_sleep)) {
+ raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags);
my_rdp->nocb_leader_sleep = false;/* No need to sleep.*/
+ raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags);
+ }
if (!rdp->nocb_gp_head)
continue; /* No CBs, so no need to wake follower. */
/* Append callbacks to follower's "done" list. */
- tail = xchg(&rdp->nocb_follower_tail, rdp->nocb_gp_tail);
+ raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+ tail = rdp->nocb_follower_tail;
+ rdp->nocb_follower_tail = rdp->nocb_gp_tail;
*tail = rdp->nocb_gp_head;
- smp_mb__after_atomic(); /* Store *tail before wakeup. */
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
if (rdp != my_rdp && tail == &rdp->nocb_follower_head) {
- /*
- * List was empty, wake up the follower.
- * Memory barriers supplied by atomic_long_add().
- */
+ /* List was empty, so wake up the follower. */
swake_up(&rdp->nocb_wq);
}
}
@@ -2131,28 +2154,16 @@ static void nocb_leader_wait(struct rcu_data *my_rdp)
*/
static void nocb_follower_wait(struct rcu_data *rdp)
{
- bool firsttime = true;
-
for (;;) {
- if (!rcu_nocb_poll) {
- trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
- "FollowerSleep");
- swait_event_interruptible(rdp->nocb_wq,
- READ_ONCE(rdp->nocb_follower_head));
- } else if (firsttime) {
- /* Don't drown trace log with "Poll"! */
- firsttime = false;
- trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "Poll");
- }
+ trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "FollowerSleep");
+ swait_event_interruptible(rdp->nocb_wq,
+ READ_ONCE(rdp->nocb_follower_head));
if (smp_load_acquire(&rdp->nocb_follower_head)) {
/* ^^^ Ensure CB invocation follows _head test. */
return;
}
- if (!rcu_nocb_poll)
- trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
- "WokeEmpty");
WARN_ON(signal_pending(current));
- schedule_timeout_interruptible(1);
+ trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "WokeEmpty");
}
}
@@ -2165,6 +2176,7 @@ static void nocb_follower_wait(struct rcu_data *rdp)
static int rcu_nocb_kthread(void *arg)
{
int c, cl;
+ unsigned long flags;
struct rcu_head *list;
struct rcu_head *next;
struct rcu_head **tail;
@@ -2179,11 +2191,14 @@ static int rcu_nocb_kthread(void *arg)
nocb_follower_wait(rdp);
/* Pull the ready-to-invoke callbacks onto local list. */
- list = READ_ONCE(rdp->nocb_follower_head);
+ raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+ list = rdp->nocb_follower_head;
+ rdp->nocb_follower_head = NULL;
+ tail = rdp->nocb_follower_tail;
+ rdp->nocb_follower_tail = &rdp->nocb_follower_head;
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
BUG_ON(!list);
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "WokeNonEmpty");
- WRITE_ONCE(rdp->nocb_follower_head, NULL);
- tail = xchg(&rdp->nocb_follower_tail, &rdp->nocb_follower_head);
/* Each pass through the following loop invokes a callback. */
trace_rcu_batch_start(rdp->rsp->name,
@@ -2226,18 +2241,39 @@ static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp)
}
/* Do a deferred wakeup of rcu_nocb_kthread(). */
-static void do_nocb_deferred_wakeup(struct rcu_data *rdp)
+static void do_nocb_deferred_wakeup_common(struct rcu_data *rdp)
{
+ unsigned long flags;
int ndw;
- if (!rcu_nocb_need_deferred_wakeup(rdp))
+ raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+ if (!rcu_nocb_need_deferred_wakeup(rdp)) {
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
return;
+ }
ndw = READ_ONCE(rdp->nocb_defer_wakeup);
WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
- wake_nocb_leader(rdp, ndw == RCU_NOCB_WAKE_FORCE);
+ __wake_nocb_leader(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("DeferredWake"));
}
+/* Do a deferred wakeup of rcu_nocb_kthread() from a timer handler. */
+static void do_nocb_deferred_wakeup_timer(unsigned long x)
+{
+ do_nocb_deferred_wakeup_common((struct rcu_data *)x);
+}
+
+/*
+ * Do a deferred wakeup of rcu_nocb_kthread() from fastpath.
+ * This means we do an inexact common-case check. Note that if
+ * we miss, ->nocb_timer will eventually clean things up.
+ */
+static void do_nocb_deferred_wakeup(struct rcu_data *rdp)
+{
+ if (rcu_nocb_need_deferred_wakeup(rdp))
+ do_nocb_deferred_wakeup_common(rdp);
+}
+
void __init rcu_init_nohz(void)
{
int cpu;
@@ -2287,6 +2323,9 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
rdp->nocb_tail = &rdp->nocb_head;
init_swait_queue_head(&rdp->nocb_wq);
rdp->nocb_follower_tail = &rdp->nocb_follower_head;
+ raw_spin_lock_init(&rdp->nocb_lock);
+ setup_timer(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer,
+ (unsigned long)rdp);
}
/*
--
2.5.2