* [PATCH RFC] rcu: Make rcu_barrier() less disruptive
From: Paul E. McKenney @ 2012-03-15 16:48 UTC (permalink / raw)
To: linux-kernel, srivatsa.bhat
Cc: mingo, laijs, dipankar, akpm, mathieu.desnoyers, josh, niv, tglx,
peterz, rostedt, Valdis.Kletnieks, dhowells, eric.dumazet, darren,
fweisbec, patches
The rcu_barrier() primitive interrupts each and every CPU, registering
a callback on every CPU. Once all of these callbacks have been invoked,
rcu_barrier() knows that every callback that was registered before
the call to rcu_barrier() has also been invoked.
However, there is no point in registering a callback on a CPU that
currently has no callbacks, most especially if that CPU is in a
deep idle state. This commit therefore makes rcu_barrier() avoid
interrupting CPUs that have no callbacks. Doing this requires reworking
the handling of orphaned callbacks; otherwise, callbacks could slip through
rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had
not yet interrupted to a CPU that rcu_barrier() had already interrupted.
This reworking was needed anyway to take a first step towards weaning
RCU from the CPU_DYING notifier's use of stop_cpu().
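In outline, the reworked _rcu_barrier() proceeds as follows (rough
pseudo-code of the implementation below; locking details, memory
barriers, and callback counts are elided):

        mutex_lock(&rcu_barrier_mutex);
        rsp->rcu_barrier_in_progress = current; /* claim the orphanage */
        for_each_possible_cpu(cpu) {
                rdp = per_cpu_ptr(rsp->rda, cpu);
                if (cpu_is_offline(cpu)) {
                        /* Wait for its callbacks to reach the orphanage. */
                } else if (ACCESS_ONCE(rdp->qlen)) {
                        /* IPI this CPU to post an rcu_barrier_callback(). */
                }
                /* CPUs with no callbacks are left undisturbed. */
        }
        /* Let in-flight hotplug finish, adopt orphans, post one more CB. */
        /* Wait for all rcu_barrier_callback()s, re-adopting new orphans. */
        rsp->rcu_barrier_in_progress = NULL;
        mutex_unlock(&rcu_barrier_mutex);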
Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org>
Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index 403306b..8269656 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -75,6 +75,8 @@ static struct lock_class_key rcu_node_class[NUM_RCU_LVLS];
.gpnum = -300, \
.completed = -300, \
.onofflock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.onofflock), \
+ .orphan_nxttail = &structname##_state.orphan_nxtlist, \
+ .orphan_donetail = &structname##_state.orphan_donelist, \
.fqslock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.fqslock), \
.n_force_qs = 0, \
.n_force_qs_ngp = 0, \
@@ -145,6 +147,13 @@ static void invoke_rcu_callbacks(struct rcu_state *rsp, struct rcu_data *rdp);
unsigned long rcutorture_testseq;
unsigned long rcutorture_vernum;
+/* State information for rcu_barrier() and friends. */
+
+static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
+static atomic_t rcu_barrier_cpu_count;
+static DEFINE_MUTEX(rcu_barrier_mutex);
+static DECLARE_WAIT_QUEUE_HEAD(rcu_barrier_wq);
+
/*
* Return true if an RCU grace period is in progress. The ACCESS_ONCE()s
* permit this function to be invoked without holding the root rcu_node
@@ -1311,7 +1320,60 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
#ifdef CONFIG_HOTPLUG_CPU
/*
- * Move a dying CPU's RCU callbacks to online CPU's callback list.
+ * Adopt the RCU callbacks from the specified rcu_state structure's
+ * orphanage. The caller must hold the ->onofflock.
+ */
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+{
+ int i;
+ struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
+
+ /*
+ * If there is an rcu_barrier() operation in progress, then
+ * only the task doing that operation is permitted to adopt
+ * callbacks. To do otherwise breaks rcu_barrier() and friends
+ * by causing them to fail to wait for the callbacks in the
+ * orphanage.
+ */
+ if (rsp->rcu_barrier_in_progress &&
+ rsp->rcu_barrier_in_progress != current)
+ return;
+
+ /* Do the accounting first. */
+ rdp->qlen_lazy += rsp->qlen_lazy;
+ rdp->qlen += rsp->qlen;
+ rdp->n_cbs_adopted += rsp->qlen;
+ rsp->qlen_lazy = 0;
+ rsp->qlen = 0;
+
+ /*
+ * We do not need a memory barrier here because the only way we
+ * can get here if there is an rcu_barrier() in flight is if
+ * we are the task doing the rcu_barrier().
+ */
+
+ /* First adopt the ready-to-invoke callbacks. */
+ if (rsp->orphan_donelist != NULL) {
+ *rsp->orphan_donetail = *rdp->nxttail[RCU_DONE_TAIL];
+ *rdp->nxttail[RCU_DONE_TAIL] = rsp->orphan_donelist;
+ for (i = RCU_NEXT_SIZE - 1; i >= RCU_DONE_TAIL; i--)
+ if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])
+ rdp->nxttail[i] = rsp->orphan_donetail;
+ rsp->orphan_donelist = NULL;
+ rsp->orphan_donetail = &rsp->orphan_donelist;
+ }
+
+ /* And then adopt the callbacks that still need a grace period. */
+ if (rsp->orphan_nxtlist != NULL) {
+ *rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxtlist;
+ rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxttail;
+ rsp->orphan_nxtlist = NULL;
+ rsp->orphan_nxttail = &rsp->orphan_nxtlist;
+ }
+}
+
+/*
+ * Move a dying CPU's RCU callbacks to the rcu_state structure's orphanage.
* Also record a quiescent state for this CPU for the current grace period.
* Synchronization and interrupt disabling are not required because
* this function executes in stop_machine() context. Therefore, cleanup
@@ -1325,64 +1387,67 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
{
int i;
+ unsigned long flags;
unsigned long mask;
- int receive_cpu = cpumask_any(cpu_online_mask);
+ bool orphaned = 0;
struct rcu_data *rdp = this_cpu_ptr(rsp->rda);
- struct rcu_data *receive_rdp = per_cpu_ptr(rsp->rda, receive_cpu);
RCU_TRACE(struct rcu_node *rnp = rdp->mynode); /* For dying CPU. */
- /* First, adjust the counts. */
+ /* Move the callbacks to the orphanage under ->onofflock protection. */
+ raw_spin_lock_irqsave(&rsp->onofflock, flags);
+
+ /* First adjust the counts. */
if (rdp->nxtlist != NULL) {
- receive_rdp->qlen_lazy += rdp->qlen_lazy;
- receive_rdp->qlen += rdp->qlen;
+ rsp->qlen_lazy += rdp->qlen_lazy;
+ rsp->qlen += rdp->qlen;
+ rdp->n_cbs_orphaned += rdp->qlen;
rdp->qlen_lazy = 0;
rdp->qlen = 0;
+ orphaned = 1;
}
/*
- * Next, move ready-to-invoke callbacks to be invoked on some
- * other CPU. These will not be required to pass through another
- * grace period: They are done, regardless of CPU.
+ * Next, move those callbacks still needing a grace period to
+ * the orphanage, where some other CPU will pick them up.
+ * Some of the callbacks might have gone partway through a grace
+ * period, but that is too bad. They get to start over because we
+ * cannot assume that grace periods are synchronized across CPUs.
+ * We don't bother updating the ->nxttail[] array yet; instead,
+ * we just reset the whole thing later on.
*/
- if (rdp->nxtlist != NULL &&
- rdp->nxttail[RCU_DONE_TAIL] != &rdp->nxtlist) {
- struct rcu_head *oldhead;
- struct rcu_head **oldtail;
- struct rcu_head **newtail;
-
- oldhead = rdp->nxtlist;
- oldtail = receive_rdp->nxttail[RCU_DONE_TAIL];
- rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];
- *rdp->nxttail[RCU_DONE_TAIL] = *oldtail;
- *receive_rdp->nxttail[RCU_DONE_TAIL] = oldhead;
- newtail = rdp->nxttail[RCU_DONE_TAIL];
- for (i = RCU_DONE_TAIL; i < RCU_NEXT_SIZE; i++) {
- if (receive_rdp->nxttail[i] == oldtail)
- receive_rdp->nxttail[i] = newtail;
- if (rdp->nxttail[i] == newtail)
- rdp->nxttail[i] = &rdp->nxtlist;
- }
+ if (*rdp->nxttail[RCU_DONE_TAIL] != NULL) {
+ *rsp->orphan_nxttail = *rdp->nxttail[RCU_DONE_TAIL];
+ rsp->orphan_nxttail = rdp->nxttail[RCU_NEXT_TAIL];
+ *rdp->nxttail[RCU_DONE_TAIL] = NULL;
}
/*
- * Finally, put the rest of the callbacks at the end of the list.
- * The ones that made it partway through get to start over: We
- * cannot assume that grace periods are synchronized across CPUs.
- * (We could splice RCU_WAIT_TAIL into RCU_NEXT_READY_TAIL, but
- * this does not seem compelling. Not yet, anyway.)
+ * Then move the ready-to-invoke callbacks to the orphanage,
+ * where some other CPU will pick them up. These will not be
+ * required to pass through another grace period: They are done.
*/
if (rdp->nxtlist != NULL) {
- *receive_rdp->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
- receive_rdp->nxttail[RCU_NEXT_TAIL] =
- rdp->nxttail[RCU_NEXT_TAIL];
- receive_rdp->n_cbs_adopted += rdp->qlen;
- rdp->n_cbs_orphaned += rdp->qlen;
-
- rdp->nxtlist = NULL;
- for (i = 0; i < RCU_NEXT_SIZE; i++)
- rdp->nxttail[i] = &rdp->nxtlist;
+ *rsp->orphan_donetail = rdp->nxtlist;
+ rsp->orphan_donetail = rdp->nxttail[RCU_DONE_TAIL];
}
+ /* Finally, initialize the rcu_data structure's list to empty. */
+ rdp->nxtlist = NULL;
+ for (i = 0; i < RCU_NEXT_SIZE; i++)
+ rdp->nxttail[i] = &rdp->nxtlist;
+
+ /*
+ * Wake up the rcu_barrier() task if there is one and if we
+ * actually sent anything to the orphanage. Except that we
+ * must delay the wakeup until ->onofflock is released to
+ * avoid deadlock.
+ */
+ if (!rsp->rcu_barrier_in_progress)
+ orphaned = 0;
+ raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+ if (orphaned)
+ wake_up(&rcu_barrier_wq);
+
/*
* Record a quiescent state for the dying CPU. This is safe
* only because we have already cleared out the callbacks.
@@ -1415,11 +1480,14 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
rcu_stop_cpu_kthread(cpu);
rcu_node_kthread_setaffinity(rnp, -1);
- /* Remove the dying CPU from the bitmasks in the rcu_node hierarchy. */
+ /* Remove the dead CPU from the bitmasks in the rcu_node hierarchy. */
/* Exclude any attempts to start a new grace period. */
raw_spin_lock_irqsave(&rsp->onofflock, flags);
+ /* Collect the dead CPU's callbacks. */
+ rcu_adopt_orphan_cbs(rsp);
+
/* Remove the outgoing CPU from the masks in the rcu_node hierarchy. */
mask = rdp->grpmask; /* rnp->grplo is constant. */
do {
@@ -1456,6 +1524,10 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
#else /* #ifdef CONFIG_HOTPLUG_CPU */
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+{
+}
+
static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
{
}
@@ -1524,9 +1596,6 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
rcu_is_callbacks_kthread());
/* Update count, and requeue any remaining callbacks. */
- rdp->qlen_lazy -= count_lazy;
- rdp->qlen -= count;
- rdp->n_cbs_invoked += count;
if (list != NULL) {
*tail = rdp->nxtlist;
rdp->nxtlist = list;
@@ -1536,6 +1605,10 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
else
break;
}
+ smp_mb(); /* List handling before counting for rcu_barrier(). */
+ rdp->qlen_lazy -= count_lazy;
+ rdp->qlen -= count;
+ rdp->n_cbs_invoked += count;
/* Reinstate batch limit if we have worked down the excess. */
if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark)
@@ -1824,13 +1897,14 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
rdp = this_cpu_ptr(rsp->rda);
/* Add the callback to our list. */
- *rdp->nxttail[RCU_NEXT_TAIL] = head;
- rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
rdp->qlen++;
if (lazy)
rdp->qlen_lazy++;
else
rcu_idle_count_callbacks_posted();
+ smp_mb(); /* Count before adding callback for rcu_barrier(). */
+ *rdp->nxttail[RCU_NEXT_TAIL] = head;
+ rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
if (__is_kfree_rcu_offset((unsigned long)func))
trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,
@@ -2169,15 +2243,10 @@ static int rcu_cpu_has_callbacks(int cpu)
rcu_preempt_cpu_has_callbacks(cpu);
}
-static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
-static atomic_t rcu_barrier_cpu_count;
-static DEFINE_MUTEX(rcu_barrier_mutex);
-static struct completion rcu_barrier_completion;
-
static void rcu_barrier_callback(struct rcu_head *notused)
{
if (atomic_dec_and_test(&rcu_barrier_cpu_count))
- complete(&rcu_barrier_completion);
+ wake_up(&rcu_barrier_wq);
}
/*
@@ -2203,27 +2272,121 @@ static void _rcu_barrier(struct rcu_state *rsp,
void (*call_rcu_func)(struct rcu_head *head,
void (*func)(struct rcu_head *head)))
{
- BUG_ON(in_interrupt());
+ int cpu;
+ unsigned long flags;
+ struct rcu_data *rdp;
+ struct rcu_head rh;
+
+ init_rcu_head_on_stack(&rh);
+
/* Take mutex to serialize concurrent rcu_barrier() requests. */
mutex_lock(&rcu_barrier_mutex);
- init_completion(&rcu_barrier_completion);
+
+ smp_mb(); /* Prevent any prior operations from leaking in. */
+
/*
- * Initialize rcu_barrier_cpu_count to 1, then invoke
- * rcu_barrier_func() on each CPU, so that each CPU also has
- * incremented rcu_barrier_cpu_count. Only then is it safe to
- * decrement rcu_barrier_cpu_count -- otherwise the first CPU
- * might complete its grace period before all of the other CPUs
- * did their increment, causing this function to return too
- * early. Note that on_each_cpu() disables irqs, which prevents
- * any CPUs from coming online or going offline until each online
- * CPU has queued its RCU-barrier callback.
+ * Initialize the count to one rather than to zero in order to
+ * avoid a too-soon return to zero in case of a short grace period
+ * (or preemption of this task). Also flag this task as doing
+ * an rcu_barrier(). This will prevent anyone else from adopting
+ * orphaned callbacks, which could otherwise cause failure if a
+ * CPU went offline and quickly came back online. To see this,
+ * consider the following sequence of events:
+ *
+ * 1. We cause CPU 0 to post an rcu_barrier_callback() callback.
+ * 2. CPU 1 goes offline, orphaning its callbacks.
+ * 3. CPU 0 adopts CPU 1's orphaned callbacks.
+ * 4. CPU 1 comes back online.
+ * 5. We cause CPU 1 to post an rcu_barrier_callback() callback.
+ * 6. Both rcu_barrier_callback() callbacks are invoked, awakening
+ * us -- but before CPU 1's orphaned callbacks are invoked!!!
*/
atomic_set(&rcu_barrier_cpu_count, 1);
- on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
- if (atomic_dec_and_test(&rcu_barrier_cpu_count))
- complete(&rcu_barrier_completion);
- wait_for_completion(&rcu_barrier_completion);
+ raw_spin_lock_irqsave(&rsp->onofflock, flags);
+ rsp->rcu_barrier_in_progress = current;
+ raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+
+ /*
+ * Force every CPU with callbacks to register a new callback
+ * that will tell us when all the preceding callbacks have
+ * been invoked. If an offline CPU has callbacks, wait for
+ * it to either come back online or to finish orphaning those
+ * callbacks.
+ */
+ for_each_possible_cpu(cpu) {
+ preempt_disable();
+ rdp = per_cpu_ptr(rsp->rda, cpu);
+ if (cpu_is_offline(cpu)) {
+ preempt_enable();
+ while (cpu_is_offline(cpu) &&
+ ACCESS_ONCE(rdp->qlen))
+ schedule_timeout_interruptible(1);
+ } else if (ACCESS_ONCE(rdp->qlen)) {
+ smp_call_function_single(cpu, rcu_barrier_func,
+ (void *)call_rcu_func, 1);
+ preempt_enable();
+ } else {
+ preempt_enable();
+ }
+ }
+
+ /*
+ * Force any ongoing CPU-hotplug operations to complete,
+ * so that any callbacks from the outgoing CPUs are now in
+ * the orphanage.
+ */
+ cpu_maps_update_begin();
+ cpu_maps_update_done();
+
+ /*
+ * Now that all online CPUs with callbacks have rcu_barrier_callback()
+ * callbacks posted, we can adopt all of the orphaned callbacks and place
+ * an rcu_barrier_callback() callback after them. When that is done,
+ * we are guaranteed to have an rcu_barrier_callback() callback
+ * following every callback that could possibly have been
+ * registered before _rcu_barrier() was called.
+ */
+ raw_spin_lock_irqsave(&rsp->onofflock, flags);
+ rcu_adopt_orphan_cbs(rsp);
+ atomic_inc(&rcu_barrier_cpu_count);
+ call_rcu_func(&rh, rcu_barrier_callback);
+ raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+
+ /*
+ * Now that we have an rcu_barrier_callback() callback on each
+ * CPU that had callbacks, and thus each counted, remove the
+ * initial count.
+ */
+ atomic_dec(&rcu_barrier_cpu_count);
+ smp_mb__after_atomic_dec();
+
+ /*
+ * Loop waiting for all rcu_barrier_callback() callbacks to be
+ * invoked. Adopt any orphaned callbacks in the meantime, just
+ * in case one of the rcu_barrier_callback() callbacks is orphaned.
+ */
+ while (atomic_read(&rcu_barrier_cpu_count) > 0) {
+ wait_event(rcu_barrier_wq,
+ atomic_read(&rcu_barrier_cpu_count) == 0 ||
+ ACCESS_ONCE(rsp->qlen));
+ if (ACCESS_ONCE(rsp->qlen)) {
+ raw_spin_lock_irqsave(&rsp->onofflock, flags);
+ rcu_adopt_orphan_cbs(rsp);
+ raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+ }
+ }
+
+ /*
+ * Done, so let others adopt orphaned callbacks. But avoid
+ * indefinite postponement of any additional orphans by adopting
+ * one more time.
+ */
+ raw_spin_lock_irqsave(&rsp->onofflock, flags);
+ rcu_adopt_orphan_cbs(rsp);
+ rsp->rcu_barrier_in_progress = NULL;
+ raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+
+ /* Other rcu_barrier() invocations can now safely proceed. */
mutex_unlock(&rcu_barrier_mutex);
+
+ destroy_rcu_head_on_stack(&rh);
}
/**
diff --git a/kernel/rcutree.h b/kernel/rcutree.h
index 36ca28e..1e49c56 100644
--- a/kernel/rcutree.h
+++ b/kernel/rcutree.h
@@ -371,6 +371,17 @@ struct rcu_state {
raw_spinlock_t onofflock; /* exclude on/offline and */
/* starting new GP. */
+ struct rcu_head *orphan_nxtlist; /* Orphaned callbacks that */
+ /* need a grace period. */
+ struct rcu_head **orphan_nxttail; /* Tail of above. */
+ struct rcu_head *orphan_donelist; /* Orphaned callbacks that */
+ /* are ready to invoke. */
+ struct rcu_head **orphan_donetail; /* Tail of above. */
+ long qlen_lazy; /* Number of lazy callbacks. */
+ long qlen; /* Total number of callbacks. */
+ struct task_struct *rcu_barrier_in_progress;
+ /* Task doing rcu_barrier(), */
+ /* or NULL if no barrier. */
raw_spinlock_t fqslock; /* Only one task forcing */
/* quiescent states. */
unsigned long jiffies_force_qs; /* Time at which to invoke */
diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
index ed459ed..d4bc16d 100644
--- a/kernel/rcutree_trace.c
+++ b/kernel/rcutree_trace.c
@@ -271,13 +271,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
gpnum = rsp->gpnum;
seq_printf(m, "c=%lu g=%lu s=%d jfq=%ld j=%x "
- "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
+ "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld/%ld\n",
rsp->completed, gpnum, rsp->fqs_state,
(long)(rsp->jiffies_force_qs - jiffies),
(int)(jiffies & 0xffff),
rsp->n_force_qs, rsp->n_force_qs_ngp,
rsp->n_force_qs - rsp->n_force_qs_ngp,
- rsp->n_force_qs_lh);
+ rsp->n_force_qs_lh, rsp->qlen_lazy, rsp->qlen);
for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
if (rnp->level != level) {
seq_puts(m, "\n");
* Re: [PATCH RFC] rcu: Make rcu_barrier() less disruptive
From: Mathieu Desnoyers @ 2012-03-15 17:45 UTC (permalink / raw)
To: Paul E. McKenney
Cc: linux-kernel, srivatsa.bhat, mingo, laijs, dipankar, akpm, josh,
niv, tglx, peterz, rostedt, Valdis.Kletnieks, dhowells,
eric.dumazet, darren, fweisbec, patches
* Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> The rcu_barrier() primitive interrupts each and every CPU, registering
> a callback on every CPU. Once all of these callbacks have been invoked,
> rcu_barrier() knows that every callback that was registered before
> the call to rcu_barrier() has also been invoked.
>
> However, there is no point in registering a callback on a CPU that
> currently has no callbacks, most especially if that CPU is in a
> deep idle state. This commit therefore makes rcu_barrier() avoid
> interrupting CPUs that have no callbacks. Doing this requires reworking
> the handling of orphaned callbacks; otherwise, callbacks could slip through
> rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had
> not yet interrupted to a CPU that rcu_barrier() had already interrupted.
> This reworking was needed anyway to take a first step towards weaning
> RCU from the CPU_DYING notifier's use of stop_cpu().
Quoting Documentation/RCU/rcubarrier.txt:
"We instead need the rcu_barrier() primitive. This primitive is similar
to synchronize_rcu(), but instead of waiting solely for a grace
period to elapse, it also waits for all outstanding RCU callbacks to
complete. Pseudo-code using rcu_barrier() is as follows:"
The patch you propose seems like a good approach to making rcu_barrier()
less disruptive, but everyone needs to be aware that rcu_barrier() would
no longer have the side-effect of doing the equivalent of
"synchronize_rcu()": within this new approach, in the case where there
are no pending callbacks, rcu_barrier() could, AFAIU, return without
waiting for the current grace period to complete.
Any use of rcu_barrier() that assumes a synchronize_rcu() is implicit
in the rcu_barrier() execution would be a bug anyway, but such bugs
might only show up after this patch is applied. I would therefore
recommend auditing all rcu_barrier() users to ensure that none expects
rcu_barrier() to act as a synchronize_rcu() before pushing this change.
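To make the failure mode concrete, a hypothetical caller along the
following lines (all names invented for illustration) would be broken
if it counted on that side-effect:

        struct foo { int a; };
        static struct foo __rcu *global_foo;

        static void my_cleanup(void)    /* hypothetical example */
        {
                struct foo *p = rcu_dereference_raw(global_foo);

                RCU_INIT_POINTER(global_foo, NULL);
                rcu_barrier();  /* BUG if assumed to imply synchronize_rcu() */
                kfree(p);       /* pre-existing readers may still hold p */
        }

If no callbacks happen to be pending anywhere, the rcu_barrier() above
could now return before pre-existing readers finish, so such a caller
would need an explicit synchronize_rcu() before the kfree(), with
rcu_barrier() reserved for flushing call_rcu() callbacks (e.g., ahead
of kmem_cache_destroy()).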
Thanks,
Mathieu
> [ . . . ]
--
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com
* Re: [PATCH RFC] rcu: Make rcu_barrier() less disruptive
From: Paul E. McKenney @ 2012-03-15 18:21 UTC (permalink / raw)
To: Mathieu Desnoyers
Cc: linux-kernel, srivatsa.bhat, mingo, laijs, dipankar, akpm, josh,
niv, tglx, peterz, rostedt, Valdis.Kletnieks, dhowells,
eric.dumazet, darren, fweisbec, patches
On Thu, Mar 15, 2012 at 01:45:27PM -0400, Mathieu Desnoyers wrote:
> * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> > The rcu_barrier() primitive interrupts each and every CPU, registering
> > a callback on every CPU. Once all of these callbacks have been invoked,
> > rcu_barrier() knows that every callback that was registered before
> > the call to rcu_barrier() has also been invoked.
> >
> > However, there is no point in registering a callback on a CPU that
> > currently has no callbacks, most especially if that CPU is in a
> > deep idle state. This commit therefore makes rcu_barrier() avoid
> > interrupting CPUs that have no callbacks. Doing this requires reworking
> > the handling of orphaned callbacks, otherwise callbacks could slip through
> > rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had
> > not yet interrupted to a CPU that rcu_barrier() had already interrupted.
> > This reworking was needed anyway to take a first step towards weaning
> > RCU from the CPU_DYING notifier's use of stop_cpu().
>
> Quoting Documentation/RCU/rcubarrier.txt:
>
> "We instead need the rcu_barrier() primitive. This primitive is similar
> to synchronize_rcu(), but instead of waiting solely for a grace
> period to elapse, it also waits for all outstanding RCU callbacks to
> complete. Pseudo-code using rcu_barrier() is as follows:"
>
> The patch you propose seems like a good approach to making rcu_barrier()
> less disruptive, but everyone needs to be aware that rcu_barrier() would
> no longer have the side-effect of doing the equivalent of
> "synchronize_rcu()": within this new approach, in the case where there
> are no pending callbacks, rcu_barrier() could, AFAIU, return without
> waiting for the current grace period to complete.
>
> Any use of rcu_barrier() that assumes a synchronize_rcu() is implicit
> in the rcu_barrier() execution would be a bug anyway, but such bugs
> might only show up after this patch is applied. I would therefore
> recommend auditing all rcu_barrier() users to ensure that none expects
> rcu_barrier() to act as a synchronize_rcu() before pushing this change.
Good catch!
I am going to chicken out and explicitly wait for a grace period if there
were no callbacks. Having rcu_barrier() very rarely be a quick no-op does
sound like a standing invitation for subtle non-reproducible bugs. ;-)
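Something like the following would do the trick (a hypothetical sketch
of that plan, not the actual follow-up patch; the "found_cbs" flag,
which would be set during the per-CPU scan whenever a CPU had callbacks
pending, is invented for illustration):

        /*
         * Hypothetical sketch: if the scan found no callbacks anywhere,
         * explicitly wait for a grace period so that rcu_barrier()
         * retains its synchronize_rcu()-like side-effect.
         */
        if (!found_cbs)
                wait_rcu_gp(call_rcu_func);

Here wait_rcu_gp() posts a callback via call_rcu_func and blocks until
that callback is invoked, which cannot happen before a full grace
period elapses.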
Thanx, Paul
> Thanks,
>
> Mathieu
> [ . . . ]
* Re: [PATCH RFC] rcu: Make rcu_barrier() less disruptive
2012-03-15 18:21 ` Paul E. McKenney
@ 2012-03-15 18:31 ` Paul E. McKenney
2012-03-15 18:50 ` Mathieu Desnoyers
0 siblings, 1 reply; 6+ messages in thread
From: Paul E. McKenney @ 2012-03-15 18:31 UTC (permalink / raw)
To: Mathieu Desnoyers
Cc: linux-kernel, srivatsa.bhat, mingo, laijs, dipankar, akpm, josh,
niv, tglx, peterz, rostedt, Valdis.Kletnieks, dhowells,
eric.dumazet, darren, fweisbec, patches
On Thu, Mar 15, 2012 at 11:21:59AM -0700, Paul E. McKenney wrote:
> On Thu, Mar 15, 2012 at 01:45:27PM -0400, Mathieu Desnoyers wrote:
> > * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> > > The rcu_barrier() primitive interrupts each and every CPU, registering
> > > a callback on every CPU. Once all of these callbacks have been invoked,
> > > rcu_barrier() knows that every callback that was registered before
> > > the call to rcu_barrier() has also been invoked.
> > >
> > > However, there is no point in registering a callback on a CPU that
> > > currently has no callbacks, most especially if that CPU is in a
> > > deep idle state. This commit therefore makes rcu_barrier() avoid
> > > interrupting CPUs that have no callbacks. Doing this requires reworking
> > > the handling of orphaned callbacks, otherwise callbacks could slip through
> > > rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had
> > > not yet interrupted to a CPU that rcu_barrier() had already interrupted.
> > > This reworking was needed anyway to take a first step towards weaning
> > > RCU from the CPU_DYING notifier's use of stop_cpu().
> >
> > Quoting Documentation/RCU/rcubarrier.txt:
> >
> > "We instead need the rcu_barrier() primitive. This primitive is similar
> > to synchronize_rcu(), but instead of waiting solely for a grace
> > period to elapse, it also waits for all outstanding RCU callbacks to
> > complete. Pseudo-code using rcu_barrier() is as follows:"
> >
> > The patch you propose seems like a good approach to make rcu_barrier
> > less disruptive, but everyone needs to be aware that rcu_barrier() would
> > no longer have the side effect of doing the equivalent of
> > "synchronize_rcu()": within this new approach, in the case
> > where there are no pending callbacks, rcu_barrier() could, AFAIU, return
> > without waiting for the current grace period to complete.
> >
> > Any use of rcu_barrier() that assumes a synchronize_rcu() is
> > implicit in the rcu_barrier() execution would be a bug anyway, but
> > those might only show up after this patch is applied. I would therefore
> > recommend auditing all rcu_barrier() users to ensure none expects
> > rcu_barrier() to act as a synchronize_rcu() before pushing this change.
>
> Good catch!
>
> I am going to chicken out and explicitly wait for a grace period if there
> were no callbacks. Having rcu_barrier() very rarely be a quick no-op does
> sound like a standing invitation for subtle non-reproducible bugs. ;-)
I take it back...
After adopting callbacks (rcu_adopt_orphan_cbs()), _rcu_barrier()
unconditionally posts a callback on the current CPU and waits for it.
So _rcu_barrier() actually does always wait for a grace period.
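In condensed form, with the locking and the initial-count handling
elided, the tail in question from the patch is just:
	raw_spin_lock_irqsave(&rsp->onofflock, flags);
	rcu_adopt_orphan_cbs(rsp);	/* adopt any orphaned callbacks */
	atomic_inc(&rcu_barrier_cpu_count);
	call_rcu_func(&rh, rcu_barrier_callback); /* posted unconditionally */
	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
Because that call_rcu_func() happens on every invocation, the final wait
cannot complete before at least one full grace period has elapsed.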
Yes, I could be more dainty and make rcu_adopt_orphan_cbs() return an
indication of whether there were any callbacks, and then post the callback
only if either some callbacks were adopted or there were no calls
to smp_call_function_single(). But that adds complexity for almost no
benefit -- and no one can accuse _rcu_barrier() of being a fastpath! ;-)
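For the record, that daintier variant would look something like the
following sketch (hypothetical and untested; the "adopted" return value
and the "n_ipis" counter are invented for illustration):
	/* rcu_adopt_orphan_cbs() changed to return true if it adopted any. */
	n_ipis = 0;
	for_each_possible_cpu(cpu) {
		...
		n_ipis++;	/* once per smp_call_function_single() */
		...
	}
	raw_spin_lock_irqsave(&rsp->onofflock, flags);
	adopted = rcu_adopt_orphan_cbs(rsp);
	if (adopted || n_ipis == 0) {
		/* Post the extra callback only when actually needed. */
		atomic_inc(&rcu_barrier_cpu_count);
		call_rcu_func(&rh, rcu_barrier_callback);
	}
	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);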
Or am I missing something here?
Thanx, Paul
* Re: [PATCH RFC] rcu: Make rcu_barrier() less disruptive
2012-03-15 18:31 ` Paul E. McKenney
@ 2012-03-15 18:50 ` Mathieu Desnoyers
0 siblings, 0 replies; 6+ messages in thread
From: Mathieu Desnoyers @ 2012-03-15 18:50 UTC (permalink / raw)
To: Paul E. McKenney
Cc: linux-kernel, srivatsa.bhat, mingo, laijs, dipankar, akpm, josh,
niv, tglx, peterz, rostedt, Valdis.Kletnieks, dhowells,
eric.dumazet, darren, fweisbec, patches
* Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> On Thu, Mar 15, 2012 at 11:21:59AM -0700, Paul E. McKenney wrote:
> > On Thu, Mar 15, 2012 at 01:45:27PM -0400, Mathieu Desnoyers wrote:
> > > * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> > > > The rcu_barrier() primitive interrupts each and every CPU, registering
> > > > a callback on every CPU. Once all of these callbacks have been invoked,
> > > > rcu_barrier() knows that every callback that was registered before
> > > > the call to rcu_barrier() has also been invoked.
> > > >
> > > > However, there is no point in registering a callback on a CPU that
> > > > currently has no callbacks, most especially if that CPU is in a
> > > > deep idle state. This commit therefore makes rcu_barrier() avoid
> > > > interrupting CPUs that have no callbacks. Doing this requires reworking
> > > > the handling of orphaned callbacks, otherwise callbacks could slip through
> > > > rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had
> > > > not yet interrupted to a CPU that rcu_barrier() had already interrupted.
> > > > This reworking was needed anyway to take a first step towards weaning
> > > > RCU from the CPU_DYING notifier's use of stop_cpu().
> > >
> > > Quoting Documentation/RCU/rcubarrier.txt:
> > >
> > > "We instead need the rcu_barrier() primitive. This primitive is similar
> > > to synchronize_rcu(), but instead of waiting solely for a grace
> > > period to elapse, it also waits for all outstanding RCU callbacks to
> > > complete. Pseudo-code using rcu_barrier() is as follows:"
> > >
> > > The patch you propose seems like a good approach to make rcu_barrier
> > > less disruptive, but everyone needs to be aware that rcu_barrier() would
> > > no longer have the side effect of doing the equivalent of
> > > "synchronize_rcu()": within this new approach, in the case
> > > where there are no pending callbacks, rcu_barrier() could, AFAIU, return
> > > without waiting for the current grace period to complete.
> > >
> > > Any use of rcu_barrier() that assumes a synchronize_rcu() is
> > > implicit in the rcu_barrier() execution would be a bug anyway, but
> > > those might only show up after this patch is applied. I would therefore
> > > recommend auditing all rcu_barrier() users to ensure none expects
> > > rcu_barrier() to act as a synchronize_rcu() before pushing this change.
> >
> > Good catch!
> >
> > I am going to chicken out and explicitly wait for a grace period if there
> > were no callbacks. Having rcu_barrier() very rarely be a quick no-op does
> > sound like a standing invitation for subtle non-reproducible bugs. ;-)
>
> I take it back...
>
> After adopting callbacks (rcu_adopt_orphan_cbs()), _rcu_barrier()
> unconditionally posts a callback on the current CPU and waits for it.
> So _rcu_barrier() actually does always wait for a grace period.
Ah ok, that should handle it then.
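For completeness, the sort of already-buggy usage I wanted an audit to
catch would look something like this hypothetical sketch:
	struct foo *global_ptr;	/* hypothetical RCU-protected pointer */

	void buggy_teardown(void)
	{
		struct foo *old = global_ptr;

		rcu_assign_pointer(global_ptr, NULL);
		rcu_barrier();	/* BUG: relies on an implied grace period;
				 * synchronize_rcu() is what is needed to
				 * wait out readers before the kfree(). */
		kfree(old);
	}
Given that the patch posts its callback unconditionally, even code like
this keeps getting a full grace period, so no such breakage should show
up.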
>
> Yes, I could be more dainty and make rcu_adopt_orphan_cbs() return an
> indication of whether there were any callbacks, and then post the callback
> only if either some callbacks were adopted or there were no calls
> to smp_call_function_single(). But that adds complexity for almost no
> benefit -- and no one can accuse _rcu_barrier() of being a fastpath! ;-)
>
> Or am I missing something here?
Nope, I think it all makes sense.
Thanks,
Mathieu
>
> Thanx, Paul
>
--
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com
* Re: [PATCH RFC] rcu: Make rcu_barrier() less disruptive
2012-03-15 16:48 [PATCH RFC] rcu: Make rcu_barrier() less disruptive Paul E. McKenney
2012-03-15 17:45 ` Mathieu Desnoyers
@ 2012-03-19 14:23 ` Srivatsa S. Bhat
1 sibling, 0 replies; 6+ messages in thread
From: Srivatsa S. Bhat @ 2012-03-19 14:23 UTC (permalink / raw)
To: paulmck
Cc: linux-kernel, mingo, laijs, dipankar, akpm, mathieu.desnoyers,
josh, niv, tglx, peterz, rostedt, Valdis.Kletnieks, dhowells,
eric.dumazet, darren, fweisbec, patches
Hi Paul,
On 03/15/2012 10:18 PM, Paul E. McKenney wrote:
> The rcu_barrier() primitive interrupts each and every CPU, registering
> a callback on every CPU. Once all of these callbacks have been invoked,
> rcu_barrier() knows that every callback that was registered before
> the call to rcu_barrier() has also been invoked.
>
> However, there is no point in registering a callback on a CPU that
> currently has no callbacks, most especially if that CPU is in a
> deep idle state. This commit therefore makes rcu_barrier() avoid
> interrupting CPUs that have no callbacks. Doing this requires reworking
> the handling of orphaned callbacks, otherwise callbacks could slip through
> rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had
> not yet interrupted to a CPU that rcu_barrier() had already interrupted.
> This reworking was needed anyway to take a first step towards weaning
> RCU from the CPU_DYING notifier's use of stop_cpu().
>
s/stop_cpu()/stop_machine()/ ?
Thank you for doing this! :-)
> Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org>
> Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
>
Regards,
Srivatsa S. Bhat
IBM Linux Technology Center