public inbox for linux-kernel@vger.kernel.org
 help / color / mirror / Atom feed
* [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU
@ 2006-07-26  0:17 Paul E. McKenney
  2006-07-27  1:39 ` Esben Nielsen
  0 siblings, 1 reply; 13+ messages in thread
From: Paul E. McKenney @ 2006-07-26  0:17 UTC (permalink / raw)
  To: linux-kernel
  Cc: nielsen.esben, compudj, billh, billh, rostedt, tglx, mingo,
	dipankar, rusty

Not for inclusion, should be viewed with great suspicion.

This patch provides an NMI-safe realtime RCU.  Hopefully, Mathieu can
make use of synchronize_sched() instead, since I have not yet figured
out a way to make this NMI-safe and still get rid of the interrupt
disabling.  ;-)

						Thanx, Paul

Signed-off-by: Paul E. McKenney <paulmck@us.ibm.com> (but please don't apply)

 include/linux/sched.h |    3 
 kernel/rcupreempt.c   |  577 ++++++++++++++++++++++++++++++++++++++++----------
 2 files changed, 470 insertions(+), 110 deletions(-)

diff -urpNa -X dontdiff linux-2.6.17-rt7/include/linux/sched.h linux-2.6.17-rt7-norrupt/include/linux/sched.h
--- linux-2.6.17-rt7/include/linux/sched.h	2006-07-19 01:43:09.000000000 -0700
+++ linux-2.6.17-rt7-norrupt/include/linux/sched.h	2006-07-22 15:44:36.000000000 -0700
@@ -868,8 +868,7 @@ struct task_struct {
 
 #ifdef CONFIG_PREEMPT_RCU
 	int rcu_read_lock_nesting;
-	atomic_t *rcu_flipctr1;
-	atomic_t *rcu_flipctr2;
+	int rcu_flipctr_idx;
 #endif
 #ifdef CONFIG_SCHEDSTATS
 	struct sched_info sched_info;
diff -urpNa -X dontdiff linux-2.6.17-rt7/kernel/rcupreempt.c linux-2.6.17-rt7-norrupt/kernel/rcupreempt.c
--- linux-2.6.17-rt7/kernel/rcupreempt.c	2006-07-19 01:43:09.000000000 -0700
+++ linux-2.6.17-rt7-norrupt/kernel/rcupreempt.c	2006-07-22 20:21:46.000000000 -0700
@@ -15,11 +15,13 @@
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  *
- * Copyright (C) IBM Corporation, 2001
+ * Copyright (C) IBM Corporation, 2006
  *
  * Authors: Paul E. McKenney <paulmck@us.ibm.com>
  *		With thanks to Esben Nielsen, Bill Huey, and Ingo Molnar
- *		for pushing me away from locks and towards counters.
+ *		for pushing me away from locks and towards counters, and
+ *		to Suparna Bhattacharya for pushing me completely away
+ *		from atomic instructions on the read side.
  *
  * Papers:  http://www.rdrop.com/users/paulmck/RCU
  *
@@ -73,12 +75,20 @@ struct rcu_data {
 	long		n_done_remove;
 	atomic_t	n_done_invoked;
 	long		n_rcu_check_callbacks;
-	atomic_t	n_rcu_try_flip1;
-	long		n_rcu_try_flip2;
-	long		n_rcu_try_flip3;
+	atomic_t	n_rcu_try_flip_1;
 	atomic_t	n_rcu_try_flip_e1;
-	long		n_rcu_try_flip_e2;
-	long		n_rcu_try_flip_e3;
+	long		n_rcu_try_flip_i1;
+	long		n_rcu_try_flip_ie1;
+	long		n_rcu_try_flip_g1;
+	long		n_rcu_try_flip_a1;
+	long		n_rcu_try_flip_ae1;
+	long		n_rcu_try_flip_a2;
+	long		n_rcu_try_flip_z1;
+	long		n_rcu_try_flip_ze1;
+	long		n_rcu_try_flip_z2;
+	long		n_rcu_try_flip_m1;
+	long		n_rcu_try_flip_me1;
+	long		n_rcu_try_flip_m2;
 #endif /* #ifdef CONFIG_RCU_STATS */
 };
 struct rcu_ctrlblk {
@@ -90,8 +100,51 @@ static struct rcu_ctrlblk rcu_ctrlblk = 
 	.fliplock = RAW_SPIN_LOCK_UNLOCKED,
 	.completed = 0,
 };
-static DEFINE_PER_CPU(atomic_t [2], rcu_flipctr) =
-	{ ATOMIC_INIT(0), ATOMIC_INIT(0) };
+static DEFINE_PER_CPU(int [2], rcu_flipctr) = { 0, 0 };
+
+/*
+ * States for rcu_try_flip() and friends.
+ */
+
+enum rcu_try_flip_states {
+	rcu_try_flip_idle_state,	/* "I" */
+	rcu_try_flip_in_gp_state,	/* "G" */
+	rcu_try_flip_waitack_state, 	/* "A" */
+	rcu_try_flip_waitzero_state,	/* "Z" */
+	rcu_try_flip_waitmb_state	/* "M" */
+};
+static enum rcu_try_flip_states rcu_try_flip_state = rcu_try_flip_idle_state;
+#ifdef CONFIG_RCU_STATS
+static char *rcu_try_flip_state_names[] =
+	{ "idle", "gp", "waitack", "waitzero", "waitmb" };
+#endif /* #ifdef CONFIG_RCU_STATS */
+
+/*
+ * Enum and per-CPU flag to determine when each CPU has seen
+ * the most recent counter flip.
+ */
+
+enum rcu_flip_flag_values {
+	rcu_flip_seen,		/* Steady/initial state, last flip seen. */
+				/* Only GP detector can update. */
+	rcu_flipped		/* Flip just completed, need confirmation. */
+				/* Only corresponding CPU can update. */
+};
+static DEFINE_PER_CPU(enum rcu_flip_flag_values, rcu_flip_flag) = rcu_flip_seen;
+
+/*
+ * Enum and per-CPU flag to determine when each CPU has executed the
+ * needed memory barrier to fence in memory references from its last RCU
+ * read-side critical section in the just-completed grace period.
+ */
+
+enum rcu_mb_flag_values {
+	rcu_mb_done,		/* Steady/initial state, no mb()s required. */
+				/* Only GP detector can update. */
+	rcu_mb_needed		/* Flip just completed, need an mb(). */
+				/* Only corresponding CPU can update. */
+};
+static DEFINE_PER_CPU(enum rcu_mb_flag_values, rcu_mb_flag) = rcu_mb_done;
 
 /*
  * Return the number of RCU batches processed thus far.  Useful
@@ -105,93 +158,182 @@ long rcu_batches_completed(void)
 void
 rcu_read_lock(void)
 {
-	int flipctr;
+	int idx;
+	int nesting;
 	unsigned long oldirq;
 
-	local_irq_save(oldirq);
-
+	raw_local_irq_save(oldirq);
 	trace_special(current->rcu_read_lock_nesting,
 		      (unsigned long) current->rcu_flipctr1,
 		      rcu_ctrlblk.completed);
-	if (current->rcu_read_lock_nesting++ == 0) {
+
+	nesting = current->rcu_read_lock_nesting;
+
+	/*
+	 * Any rcu_read_lock()s called from NMI handlers
+	 * at any point must have matching rcu_read_unlock()
+	 * calls in that same handler, so we will not see the
+	 * value of current->rcu_read_lock_nesting change.
+	 */
+
+	if (nesting != 0) {
 
 		/*
-		 * Outermost nesting of rcu_read_lock(), so atomically
+		 * There is an enclosing rcu_read_lock(), so all we
+		 * need to do is to increment the counter.
+		 */
+
+		current->rcu_read_lock_nesting = nesting + 1;
+
+	} else {
+
+		/*
+		 * Outermost nesting of rcu_read_lock(), so we must
 		 * increment the current counter for the current CPU.
+		 * This must be done carefully, because NMIs can
+		 * occur at any point in this code, and any rcu_read_lock()
+		 * and rcu_read_unlock() pairs in the NMI handlers
+		 * must interact non-destructively with this code.
+		 * Lots of barrier() calls, and -very- careful ordering.
+		 *
+		 * Changes to this code, including this one, must be
+		 * inspected, validated, and tested extremely carefully!!!
 		 */
 
-		flipctr = rcu_ctrlblk.completed & 0x1;
+		/*
+		 * First, pick up the index.  Enforce ordering for
+		 * both compilers and for DEC Alpha.
+		 */
+
+		idx = rcu_ctrlblk.completed & 0x1;
 		smp_read_barrier_depends();
-		current->rcu_flipctr1 = &(__get_cpu_var(rcu_flipctr)[flipctr]);
-		/* Can optimize to non-atomic on fastpath, but start simple. */
-		atomic_inc(current->rcu_flipctr1);
-		smp_mb__after_atomic_inc();  /* might optimize out... */
-		if (unlikely(flipctr != (rcu_ctrlblk.completed & 0x1))) {
-
-			/*
-			 * We raced with grace-period processing (flip).
-			 * Although we cannot be preempted here, there
-			 * could be interrupts, ECC errors and the like,
-			 * so just nail down both sides of the rcu_flipctr
-			 * array for the duration of our RCU read-side
-			 * critical section, preventing a second flip
-			 * from racing with us.  At some point, it would
-			 * be safe to decrement one of the counters, but
-			 * we have no way of knowing when that would be.
-			 * So just decrement them both in rcu_read_unlock().
-			 */
-
-			current->rcu_flipctr2 =
-				&(__get_cpu_var(rcu_flipctr)[!flipctr]);
-			/* Can again optimize to non-atomic on fastpath. */
-			atomic_inc(current->rcu_flipctr2);
-			smp_mb__after_atomic_inc();  /* might optimize out... */
-		}
+		barrier();
+
+		/*
+		 * Increment the per-CPU counter. NMI handlers
+		 * might increment it as well, but they had better
+		 * properly nest their rcu_read_lock()/rcu_read_unlock()
+		 * pairs so that the value is restored before the handler
+		 * returns to us.  Enforce ordering for compilers.
+		 */
+
+		__get_cpu_var(rcu_flipctr)[idx]++;
+		barrier();
+
+		/*
+		 * It is now safe to increment the task's nesting count.
+		 * NMIs that occur after this statement will route
+		 * their rcu_read_lock() calls through the "then" clause
+		 * of this "if" statement, and thus will no longer come
+		 * through this path.  Enforce ordering for compilers.
+		 */
+
+		current->rcu_read_lock_nesting = nesting + 1;
+		barrier();
+
+		/*
+		 * It is now safe to store the index into our task
+		 * structure.  Doing so earlier would have resulted
+		 * in it getting clobbered by NMI handlers.
+		 */
+
+		current->rcu_flipctr_idx = idx;
 	}
+
 	trace_special((unsigned long) current->rcu_flipctr1,
 		      (unsigned long) current->rcu_flipctr2,
 		      rcu_ctrlblk.completed);
-	local_irq_restore(oldirq);
+	raw_local_irq_restore(oldirq);
 }
 
 void
 rcu_read_unlock(void)
 {
+	int idx;
+	int nesting;
 	unsigned long oldirq;
 
-	local_irq_save(oldirq);
+	raw_local_irq_save(oldirq);
 	trace_special((unsigned long) current->rcu_flipctr1,
 		      (unsigned long) current->rcu_flipctr2,
 		      current->rcu_read_lock_nesting);
-	if (--current->rcu_read_lock_nesting == 0) {
+
+	nesting = current->rcu_read_lock_nesting;
+
+	/*
+	 * Any rcu_read_lock()s called from NMI handlers
+	 * at any point must have matching rcu_read_unlock()
+	 * calls in that same handler, so we will not see the
+	 * value of current->rcu_read_lock_nesting change.
+	 */
+
+	if (nesting > 1) {
 
 		/*
-		 * Just atomically decrement whatever we incremented.
-		 * Might later want to awaken some task waiting for the
-		 * grace period to complete, but keep it simple for the
-		 * moment.
+		 * There is an enclosing rcu_read_lock(), so all we
+		 * need to do is to decrement the counter.
 		 */
 
-		smp_mb__before_atomic_dec();
-		atomic_dec(current->rcu_flipctr1);
-		current->rcu_flipctr1 = NULL;
-		if (unlikely(current->rcu_flipctr2 != NULL)) {
-			atomic_dec(current->rcu_flipctr2);
-			current->rcu_flipctr2 = NULL;
-		}
+		current->rcu_read_lock_nesting = nesting - 1;
+
+	} else {
+
+		/*
+		 * Outermost nesting of rcu_read_unlock(), so we must
+		 * decrement the current counter for the current CPU.
+		 * This must be done carefully, because NMIs can
+		 * occur at any point in this code, and any rcu_read_lock()
+		 * and rcu_read_unlock() pairs in the NMI handlers
+		 * must interact non-destructively with this code.
+		 * Lots of barrier() calls, and -very- careful ordering.
+		 *
+		 * Changes to this code, including this one, must be
+		 * inspected, validated, and tested extremely carefully!!!
+		 */
+
+		/*
+		 * First, pick up the index.  Enforce ordering for
+		 * both compilers and for DEC Alpha.
+		 */
+
+		idx = current->rcu_flipctr_idx;
+		smp_read_barrier_depends();
+		barrier();
+
+		/*
+		 * It is now safe to decrement the task's nesting count.
+		 * NMIs that occur after this statement will route
+		 * their rcu_read_lock() calls through this "else" clause
+		 * of this "if" statement, and thus will start incrementing
+		 * the per-CPU counter on their own.  Enforce ordering for
+		 * compilers.
+		 */
+
+		current->rcu_read_lock_nesting = nesting - 1;
+		barrier();
+
+		/*
+		 * Decrement the per-CPU counter. NMI handlers
+		 * might increment it as well, but they had better
+		 * properly nest their rcu_read_lock()/rcu_read_unlock()
+		 * pairs so that the value is restored before the handler
+		 * returns to us.
+		 */
+
+		__get_cpu_var(rcu_flipctr)[idx]--;
 	}
 
 	trace_special((unsigned long)current->rcu_flipctr1,
 		      (unsigned long) current->rcu_flipctr2,
 		      current->rcu_read_lock_nesting);
-	local_irq_restore(oldirq);
+	raw_local_irq_restore(oldirq);
 }
 
 static void
 __rcu_advance_callbacks(void)
 {
 
-	if (rcu_data.completed != rcu_ctrlblk.completed) {
+	if ((rcu_data.completed >> 1) != (rcu_ctrlblk.completed >> 1)) {
 		if (rcu_data.waitlist != NULL) {
 			*rcu_data.donetail = rcu_data.waitlist;
 			rcu_data.donetail = rcu_data.waittail;
@@ -216,13 +358,186 @@ __rcu_advance_callbacks(void)
 			rcu_data.waittail = &rcu_data.waitlist;
 		}
 		rcu_data.completed = rcu_ctrlblk.completed;
+	} else if (rcu_data.completed != rcu_ctrlblk.completed) {
+		rcu_data.completed = rcu_ctrlblk.completed;
+	}
+}
+
+/*
+ * Get here when RCU is idle.  Decide whether we need to
+ * move out of idle state, and return zero if so.
+ * "Straightforward" approach for the moment, might later
+ * use callback-list lengths, grace-period duration, or
+ * some such to determine when to exit idle state.
+ * Might also need a pre-idle test that does not acquire
+ * the lock, but let's get the simple case working first...
+ */
+
+static int
+rcu_try_flip_idle(int flipctr)
+{
+#ifdef CONFIG_RCU_STATS
+	rcu_data.n_rcu_try_flip_i1++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+
+	if (!rcu_pending(smp_processor_id())) {
+#ifdef CONFIG_RCU_STATS
+		rcu_data.n_rcu_try_flip_ie1++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+		return 1;
 	}
+	return 0;
+}
+
+/*
+ * Flip processing up to and including the flip, as well as
+ * telling CPUs to acknowledge the flip.
+ */
+
+static int
+rcu_try_flip_in_gp(int flipctr)
+{
+	int cpu;
+
+#ifdef CONFIG_RCU_STATS
+	rcu_data.n_rcu_try_flip_g1++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+
+	/*
+	 * Do the flip.
+	 */
+
+	rcu_ctrlblk.completed++;  /* stands in for rcu_try_flip_g2 */
+
+	/*
+	 * Need a memory barrier so that other CPUs see the new
+	 * counter value before they see the subsequent change of all
+	 * the rcu_flip_flag instances to rcu_flipped.
+	 */
+
+	smp_mb();
+
+	/* Now ask each CPU for acknowledgement of the flip. */
+
+	for_each_cpu(cpu)
+		per_cpu(rcu_flip_flag, cpu) = rcu_flipped;
+
+	return 0;
+}
+
+/*
+ * Wait for CPUs to acknowledge the flip.
+ */
+
+static int
+rcu_try_flip_waitack(int flipctr)
+{
+	int cpu;
+
+#ifdef CONFIG_RCU_STATS
+	rcu_data.n_rcu_try_flip_a1++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+
+	for_each_cpu(cpu)
+		if (per_cpu(rcu_flip_flag, cpu) != rcu_flip_seen) {
+#ifdef CONFIG_RCU_STATS
+			rcu_data.n_rcu_try_flip_ae1++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+			return 1;
+		}
+
+	/*
+	 * Make sure our checks above don't bleed into subsequent
+	 * waiting for the sum of the counters to reach zero.
+	 */
+
+	smp_mb();
+
+#ifdef CONFIG_RCU_STATS
+	rcu_data.n_rcu_try_flip_a2++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+
+	return 0;
+}
+
+/*
+ * Wait for collective ``last'' counter to reach zero,
+ * then tell all CPUs to do an end-of-grace-period memory barrier.
+ */
+
+static int
+rcu_try_flip_waitzero(int flipctr)
+{
+	int cpu;
+	int lastidx = !(flipctr & 0x1);
+	int sum = 0;
+
+#ifdef CONFIG_RCU_STATS
+	rcu_data.n_rcu_try_flip_z1++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+
+	/* Check to see if the sum of the "last" counters is zero. */
+
+	for_each_cpu(cpu)
+		sum += per_cpu(rcu_flipctr, cpu)[lastidx];
+	if (sum != 0) {
+#ifdef CONFIG_RCU_STATS
+		rcu_data.n_rcu_try_flip_ze1++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+		return 1;
+	}
+
+#ifdef CONFIG_RCU_STATS
+	rcu_data.n_rcu_try_flip_z2++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+
+	/* Make sure we don't call for memory barriers before we see zero. */
+
+	smp_mb();
+
+	/* Call for a memory barrier from each CPU. */
+
+	for_each_cpu(cpu)
+		per_cpu(rcu_mb_flag, cpu) = rcu_mb_needed;
+
+	return 0;
+}
+
+/*
+ * Wait for all CPUs to do their end-of-grace-period memory barrier.
+ * Return 0 once all CPUs have done so.
+ */
+
+static int
+rcu_try_flip_waitmb(int flipctr)
+{
+	int cpu;
+
+#ifdef CONFIG_RCU_STATS
+	rcu_data.n_rcu_try_flip_m1++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+
+	for_each_cpu(cpu)
+		if (per_cpu(rcu_mb_flag, cpu) != rcu_mb_done) {
+#ifdef CONFIG_RCU_STATS
+			rcu_data.n_rcu_try_flip_me1++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+			return 1;
+		}
+
+	smp_mb(); /* Ensure that the above checks precede any following flip. */
+
+#ifdef CONFIG_RCU_STATS
+	rcu_data.n_rcu_try_flip_m2++;
+#endif /* #ifdef CONFIG_RCU_STATS */
+
+	return 0;
 }
 
 /*
  * Attempt a single flip of the counters.  Remember, a single flip does
  * -not- constitute a grace period.  Instead, the interval between
- * a pair of consecutive flips is a grace period.
+ * at least three consecutive flips is a grace period.
  *
  * If anyone is nuts enough to run this CONFIG_PREEMPT_RCU implementation
  * on a large SMP, they might want to use a hierarchical organization of
@@ -231,13 +546,11 @@ __rcu_advance_callbacks(void)
 static void
 rcu_try_flip(void)
 {
-	int cpu;
 	long flipctr;
 	unsigned long oldirq;
 
-	flipctr = rcu_ctrlblk.completed;
 #ifdef CONFIG_RCU_STATS
-	atomic_inc(&rcu_data.n_rcu_try_flip1);
+	atomic_inc(&rcu_data.n_rcu_try_flip_1);
 #endif /* #ifdef CONFIG_RCU_STATS */
 	if (unlikely(!spin_trylock_irqsave(&rcu_ctrlblk.fliplock, oldirq))) {
 #ifdef CONFIG_RCU_STATS
@@ -245,52 +558,82 @@ rcu_try_flip(void)
 #endif /* #ifdef CONFIG_RCU_STATS */
 		return;
 	}
-	if (unlikely(flipctr != rcu_ctrlblk.completed)) {
-
-		/* Our work is done!  ;-) */
-
-#ifdef CONFIG_RCU_STATS
-		rcu_data.n_rcu_try_flip_e2++;
-#endif /* #ifdef CONFIG_RCU_STATS */
-		spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
-		return;
-	}
-	flipctr &= 0x1;
 
 	/*
-	 * Check for completion of all RCU read-side critical sections
-	 * that started prior to the previous flip.
+	 * Take the next transition(s) through the RCU grace-period
+	 * flip-counter state machine.
 	 */
 
-#ifdef CONFIG_RCU_STATS
-	rcu_data.n_rcu_try_flip2++;
-#endif /* #ifdef CONFIG_RCU_STATS */
-	for_each_cpu(cpu) {
-		if (atomic_read(&per_cpu(rcu_flipctr, cpu)[!flipctr]) != 0) {
-#ifdef CONFIG_RCU_STATS
-			rcu_data.n_rcu_try_flip_e3++;
-#endif /* #ifdef CONFIG_RCU_STATS */
-			spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
-			return;
-		}
+	flipctr = rcu_ctrlblk.completed;
+	switch (rcu_try_flip_state) {
+	case rcu_try_flip_idle_state:
+		if (rcu_try_flip_idle(flipctr))
+			break;
+		rcu_try_flip_state = rcu_try_flip_in_gp_state;
+	case rcu_try_flip_in_gp_state:
+		if (rcu_try_flip_in_gp(flipctr))
+			break;
+		rcu_try_flip_state = rcu_try_flip_waitack_state;
+	case rcu_try_flip_waitack_state:
+		if (rcu_try_flip_waitack(flipctr))
+			break;
+		rcu_try_flip_state = rcu_try_flip_waitzero_state;
+	case rcu_try_flip_waitzero_state:
+		if (rcu_try_flip_waitzero(flipctr))
+			break;
+		rcu_try_flip_state = rcu_try_flip_waitmb_state;
+	case rcu_try_flip_waitmb_state:
+		if (rcu_try_flip_waitmb(flipctr))
+			break;
+		rcu_try_flip_state = rcu_try_flip_idle_state;
 	}
+	spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
+}
 
-	/* Do the flip. */
+/*
+ * Check to see if this CPU needs to report that it has seen the most
+ * recent counter flip, thereby declaring that all subsequent
+ * rcu_read_lock() invocations will respect this flip.
+ */
 
-	smp_mb();
-	rcu_ctrlblk.completed++;
+static void
+rcu_check_flipseen(int cpu)
+{
+	if (per_cpu(rcu_flip_flag, cpu) == rcu_flipped) {
+		smp_mb();  /* Subsequent counter acccesses must see new value */
+		per_cpu(rcu_flip_flag, cpu) = rcu_flip_seen;
+		smp_mb();  /* probably be implied by interrupt, but... */
+	}
+}
 
-#ifdef CONFIG_RCU_STATS
-	rcu_data.n_rcu_try_flip3++;
-#endif /* #ifdef CONFIG_RCU_STATS */
-	spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
+/*
+ * Check to see if this CPU needs to do a memory barrier in order to
+ * ensure that any prior RCU read-side critical sections have committed
+ * their counter manipulations and critical-section memory references
+ * before declaring the grace period to be completed.
+ */
+
+static void
+rcu_check_mb(int cpu)
+{
+	if (per_cpu(rcu_mb_flag, cpu) == rcu_mb_needed) {
+		smp_mb();
+		per_cpu(rcu_mb_flag, cpu) = rcu_mb_done;
+	}
 }
 
+/*
+ * This function is called periodically on each CPU in hardware interrupt
+ * context.
+ */
+
 void
 rcu_check_callbacks(int cpu, int user)
 {
 	unsigned long oldirq;
 
+	rcu_check_flipseen(cpu);
+	rcu_check_mb(cpu);
 	if (rcu_ctrlblk.completed == rcu_data.completed) {
 		rcu_try_flip();
 		if (rcu_ctrlblk.completed == rcu_data.completed) {
@@ -359,10 +702,10 @@ call_rcu(struct rcu_head *head,
 }
 
 /*
- * Crude hack, reduces but does not eliminate possibility of failure.
- * Needs to wait for all CPUs to pass through a -voluntary- context
- * switch to eliminate possibility of failure.  (Maybe just crank
- * priority down...)
+ * Wait until all currently running preempt_disable() code segments
+ * (including hardware-irq-disable segments) complete.  Note that
+ * in -rt this does -not- necessarily result in all currently executing
+ * interrupt -handlers- having completed.
  */
 void
 synchronize_sched(void)
@@ -390,7 +733,7 @@ rcu_pending(int cpu)
 
 void __init rcu_init(void)
 {
-/*&&&&*/printk("WARNING: experimental RCU implementation.\n");
+/*&&&&*/printk("WARNING: experimental non-atomic RCU implementation.\n");
 	spin_lock_init(&rcu_data.lock);
 	rcu_data.completed = 0;
 	rcu_data.nextlist = NULL;
@@ -416,7 +759,8 @@ int rcu_read_proc_data(char *page)
 	return sprintf(page,
 		       "ggp=%ld lgp=%ld rcc=%ld\n"
 		       "na=%ld nl=%ld wa=%ld wl=%ld da=%ld dl=%ld dr=%ld di=%d\n"
-		       "rtf1=%d rtf2=%ld rtf3=%ld rtfe1=%d rtfe2=%ld rtfe3=%ld\n",
+		       "1=%d e1=%d i1=%ld ie1=%ld g1=%ld a1=%ld ae1=%ld a2=%ld\n"
+		       "z1=%ld ze1=%ld z2=%ld m1=%ld me1=%ld m2=%ld\n",
 
 		       rcu_ctrlblk.completed,
 		       rcu_data.completed,
@@ -431,12 +775,20 @@ int rcu_read_proc_data(char *page)
 		       rcu_data.n_done_remove,
 		       atomic_read(&rcu_data.n_done_invoked),
 
-		       atomic_read(&rcu_data.n_rcu_try_flip1),
-		       rcu_data.n_rcu_try_flip2,
-		       rcu_data.n_rcu_try_flip3,
+		       atomic_read(&rcu_data.n_rcu_try_flip_1),
 		       atomic_read(&rcu_data.n_rcu_try_flip_e1),
-		       rcu_data.n_rcu_try_flip_e2,
-		       rcu_data.n_rcu_try_flip_e3);
+		       rcu_data.n_rcu_try_flip_i1,
+		       rcu_data.n_rcu_try_flip_ie1,
+		       rcu_data.n_rcu_try_flip_g1,
+		       rcu_data.n_rcu_try_flip_a1,
+		       rcu_data.n_rcu_try_flip_ae1,
+		       rcu_data.n_rcu_try_flip_a2,
+		       rcu_data.n_rcu_try_flip_z1,
+		       rcu_data.n_rcu_try_flip_ze1,
+		       rcu_data.n_rcu_try_flip_z2,
+		       rcu_data.n_rcu_try_flip_m1,
+		       rcu_data.n_rcu_try_flip_me1,
+		       rcu_data.n_rcu_try_flip_m2);
 }
 
 int rcu_read_proc_gp_data(char *page)
@@ -464,14 +816,23 @@ int rcu_read_proc_ctrs_data(char *page)
 	int cpu;
 	int f = rcu_data.completed & 0x1;
 
-	cnt += sprintf(&page[cnt], "CPU last cur\n");
+	cnt += sprintf(&page[cnt], "CPU last cur F M\n");
 	for_each_cpu(cpu) {
-		cnt += sprintf(&page[cnt], "%3d %4d %3d\n",
+		cnt += sprintf(&page[cnt], "%3d %4d %3d %d %d\n",
 			       cpu,
-			       atomic_read(&per_cpu(rcu_flipctr, cpu)[!f]),
-			       atomic_read(&per_cpu(rcu_flipctr, cpu)[f]));
-	}
-	cnt += sprintf(&page[cnt], "ggp = %ld\n", rcu_data.completed);
+			       per_cpu(rcu_flipctr, cpu)[!f],
+			       per_cpu(rcu_flipctr, cpu)[f],
+			       per_cpu(rcu_flip_flag, cpu),
+			       per_cpu(rcu_mb_flag, cpu));
+	}
+	cnt += sprintf(&page[cnt], "ggp = %ld, state = %d",
+		       rcu_data.completed, rcu_try_flip_state);
+	if ((0 <= rcu_try_flip_state) &&
+	    (rcu_try_flip_state <= sizeof(rcu_try_flip_state_names) /
+	    			   sizeof(rcu_try_flip_state_names[0])))
+		cnt += sprintf(&page[cnt], " (%s)",
+			       rcu_try_flip_state_names[rcu_try_flip_state]);
+	cnt += sprintf(&page[cnt], "\n");
 	return (cnt);
 }
 

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU
  2006-07-26  0:17 [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU Paul E. McKenney
@ 2006-07-27  1:39 ` Esben Nielsen
  2006-07-27  1:39   ` Paul E. McKenney
  0 siblings, 1 reply; 13+ messages in thread
From: Esben Nielsen @ 2006-07-27  1:39 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: linux-kernel, nielsen.esben, compudj, billh, billh, rostedt, tglx,
	mingo, dipankar, rusty



On Tue, 25 Jul 2006, Paul E. McKenney wrote:

> Not for inclusion, should be viewed with great suspicion.
>
> This patch provides an NMI-safe realtime RCU.  Hopefully, Mathieu can
> make use of synchronize_sched() instead, since I have not yet figured
> out a way to make this NMI-safe and still get rid of the interrupt
> disabling.  ;-)
>
> 						Thanx, Paul

I must say I don't understand all this. It looks very complicated. Is it 
really needed?

I have been thinking about the following design:

void rcu_read_lock()
{
 	if (!in_interrupt())
 		current->rcu_read_lock_count++;
}
void rcu_read_unlock()
{
 	if (!in_interrupt())
 		current->rcu_read_lock_count--;
}

Somewhere in schedule():

 	rq->rcu_read_lock_count += prev->rcu_read_lock_count;
 	if (!rq->rcu_read_lock_count)
 		forward_waiting_rcu_jobs();
 	rq->rcu_read_lock_count -= next->rcu_read_lock_count;

Now what should forward_waiting_rcu_jobs() do?

I imagine a circular data structure of all the CPUs. When a call_rcu() is 
run on a CPU it is first added a list of jobs for that CPU. When 
forward_waiting_rcu_jobs() is called all the pending jobs are forwarded to 
the next CPU. The next CPU will bring it along the next CPU in the circle 
along with it's own jobs. When jobs hit the original CPU they will be 
executed. Or rather, when the CPU just before calls 
forward_waiting_rcu_jobs(), it sends the jobs belonging to the next CPU to 
the RCU-task of the next CPU, where they will be executed, instead of to 
the scheduler (runqueue) of the next CPU, where it will just be sent out on a
new roundtrip along the circle.

If you use a structure like the plist then the forwarding procedure can be 
done in O(number of online CPUs) time worst case - much less in the usual 
case where the lists are almost empty.

Now the problem is: What happens if a task in a rcu read-side lock is 
migrated? Then the rcu_read_lock_count on the source CPU will stay in plus 
while on the target CPU it will go in minus. This ought to be simply 
fixable by adding task->rcu_read_lock_count to the target runqueue before 
migrating and subtracting it from the old runqueue after migrating. But 
there is another problem: RCU-jobs referring to data used by the task being 
migrated might have been forwarded from the target CPU. Thus the migration
task has to go back along the circle of CPUs and move all the relevant
RCU-jobs back to the target CPU to be forwarded again. This is also doable in
number of CPUs between source and target times O(<number of online CPUs>)
(see above) time.

To avoid a task in a read-side lock being starved for too long the 
following line can be added to normal_prio():
   if (p->rcu_read_lock_count)
 	p->prio = MAX_RT_PRIO;

I don't have time to code this nor a SMP machine to test it on. But I can 
give the idea to you anyways in the hope you might code it :-)

Esben


>
> Signed-off-by: Paul E. McKenney <paulmck@us.ibm.com> (but please don't apply)
>
> include/linux/sched.h |    3
> kernel/rcupreempt.c   |  577 ++++++++++++++++++++++++++++++++++++++++----------
> 2 files changed, 470 insertions(+), 110 deletions(-)
>
> diff -urpNa -X dontdiff linux-2.6.17-rt7/include/linux/sched.h linux-2.6.17-rt7-norrupt/include/linux/sched.h
> --- linux-2.6.17-rt7/include/linux/sched.h	2006-07-19 01:43:09.000000000 -0700
> +++ linux-2.6.17-rt7-norrupt/include/linux/sched.h	2006-07-22 15:44:36.000000000 -0700
> @@ -868,8 +868,7 @@ struct task_struct {
>
> #ifdef CONFIG_PREEMPT_RCU
> 	int rcu_read_lock_nesting;
> -	atomic_t *rcu_flipctr1;
> -	atomic_t *rcu_flipctr2;
> +	int rcu_flipctr_idx;
> #endif
> #ifdef CONFIG_SCHEDSTATS
> 	struct sched_info sched_info;
> diff -urpNa -X dontdiff linux-2.6.17-rt7/kernel/rcupreempt.c linux-2.6.17-rt7-norrupt/kernel/rcupreempt.c
> --- linux-2.6.17-rt7/kernel/rcupreempt.c	2006-07-19 01:43:09.000000000 -0700
> +++ linux-2.6.17-rt7-norrupt/kernel/rcupreempt.c	2006-07-22 20:21:46.000000000 -0700
> @@ -15,11 +15,13 @@
>  * along with this program; if not, write to the Free Software
>  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
>  *
> - * Copyright (C) IBM Corporation, 2001
> + * Copyright (C) IBM Corporation, 2006
>  *
>  * Authors: Paul E. McKenney <paulmck@us.ibm.com>
>  *		With thanks to Esben Nielsen, Bill Huey, and Ingo Molnar
> - *		for pushing me away from locks and towards counters.
> + *		for pushing me away from locks and towards counters, and
> + *		to Suparna Bhattacharya for pushing me completely away
> + *		from atomic instructions on the read side.
>  *
>  * Papers:  http://www.rdrop.com/users/paulmck/RCU
>  *
> @@ -73,12 +75,20 @@ struct rcu_data {
> 	long		n_done_remove;
> 	atomic_t	n_done_invoked;
> 	long		n_rcu_check_callbacks;
> -	atomic_t	n_rcu_try_flip1;
> -	long		n_rcu_try_flip2;
> -	long		n_rcu_try_flip3;
> +	atomic_t	n_rcu_try_flip_1;
> 	atomic_t	n_rcu_try_flip_e1;
> -	long		n_rcu_try_flip_e2;
> -	long		n_rcu_try_flip_e3;
> +	long		n_rcu_try_flip_i1;
> +	long		n_rcu_try_flip_ie1;
> +	long		n_rcu_try_flip_g1;
> +	long		n_rcu_try_flip_a1;
> +	long		n_rcu_try_flip_ae1;
> +	long		n_rcu_try_flip_a2;
> +	long		n_rcu_try_flip_z1;
> +	long		n_rcu_try_flip_ze1;
> +	long		n_rcu_try_flip_z2;
> +	long		n_rcu_try_flip_m1;
> +	long		n_rcu_try_flip_me1;
> +	long		n_rcu_try_flip_m2;
> #endif /* #ifdef CONFIG_RCU_STATS */
> };
> struct rcu_ctrlblk {
> @@ -90,8 +100,51 @@ static struct rcu_ctrlblk rcu_ctrlblk =
> 	.fliplock = RAW_SPIN_LOCK_UNLOCKED,
> 	.completed = 0,
> };
> -static DEFINE_PER_CPU(atomic_t [2], rcu_flipctr) =
> -	{ ATOMIC_INIT(0), ATOMIC_INIT(0) };
> +static DEFINE_PER_CPU(int [2], rcu_flipctr) = { 0, 0 };
> +
> +/*
> + * States for rcu_try_flip() and friends.
> + */
> +
> +enum rcu_try_flip_states {
> +	rcu_try_flip_idle_state,	/* "I" */
> +	rcu_try_flip_in_gp_state,	/* "G" */
> +	rcu_try_flip_waitack_state, 	/* "A" */
> +	rcu_try_flip_waitzero_state,	/* "Z" */
> +	rcu_try_flip_waitmb_state	/* "M" */
> +};
> +static enum rcu_try_flip_states rcu_try_flip_state = rcu_try_flip_idle_state;
> +#ifdef CONFIG_RCU_STATS
> +static char *rcu_try_flip_state_names[] =
> +	{ "idle", "gp", "waitack", "waitzero", "waitmb" };
> +#endif /* #ifdef CONFIG_RCU_STATS */
> +
> +/*
> + * Enum and per-CPU flag to determine when each CPU has seen
> + * the most recent counter flip.
> + */
> +
> +enum rcu_flip_flag_values {
> +	rcu_flip_seen,		/* Steady/initial state, last flip seen. */
> +				/* Only GP detector can update. */
> +	rcu_flipped		/* Flip just completed, need confirmation. */
> +				/* Only corresponding CPU can update. */
> +};
> +static DEFINE_PER_CPU(enum rcu_flip_flag_values, rcu_flip_flag) = rcu_flip_seen;
> +
> +/*
> + * Enum and per-CPU flag to determine when each CPU has executed the
> + * needed memory barrier to fence in memory references from its last RCU
> + * read-side critical section in the just-completed grace period.
> + */
> +
> +enum rcu_mb_flag_values {
> +	rcu_mb_done,		/* Steady/initial state, no mb()s required. */
> +				/* Only GP detector can update. */
> +	rcu_mb_needed		/* Flip just completed, need an mb(). */
> +				/* Only corresponding CPU can update. */
> +};
> +static DEFINE_PER_CPU(enum rcu_mb_flag_values, rcu_mb_flag) = rcu_mb_done;
>
> /*
>  * Return the number of RCU batches processed thus far.  Useful
> @@ -105,93 +158,182 @@ long rcu_batches_completed(void)
> void
> rcu_read_lock(void)
> {
> -	int flipctr;
> +	int idx;
> +	int nesting;
> 	unsigned long oldirq;
>
> -	local_irq_save(oldirq);
> -
> +	raw_local_irq_save(oldirq);
> 	trace_special(current->rcu_read_lock_nesting,
> 		      (unsigned long) current->rcu_flipctr1,
> 		      rcu_ctrlblk.completed);
> -	if (current->rcu_read_lock_nesting++ == 0) {
> +
> +	nesting = current->rcu_read_lock_nesting;
> +
> +	/*
> +	 * Any rcu_read_lock()s called from NMI handlers
> +	 * at any point must have matching rcu_read_unlock()
> +	 * calls in that same handler, so we will not see the
> +	 * value of current->rcu_read_lock_nesting change.
> +	 */
> +
> +	if (nesting != 0) {
>
> 		/*
> -		 * Outermost nesting of rcu_read_lock(), so atomically
> +		 * There is an enclosing rcu_read_lock(), so all we
> +		 * need to do is to increment the counter.
> +		 */
> +
> +		current->rcu_read_lock_nesting = nesting + 1;
> +
> +	} else {
> +
> +		/*
> +		 * Outermost nesting of rcu_read_lock(), so we must
> 		 * increment the current counter for the current CPU.
> +		 * This must be done carefully, because NMIs can
> +		 * occur at any point in this code, and any rcu_read_lock()
> +		 * and rcu_read_unlock() pairs in the NMI handlers
> +		 * must interact non-destructively with this code.
> +		 * Lots of barrier() calls, and -very- careful ordering.
> +		 *
> +		 * Changes to this code, including this one, must be
> +		 * inspected, validated, and tested extremely carefully!!!
> 		 */
>
> -		flipctr = rcu_ctrlblk.completed & 0x1;
> +		/*
> +		 * First, pick up the index.  Enforce ordering for
> +		 * both compilers and for DEC Alpha.
> +		 */
> +
> +		idx = rcu_ctrlblk.completed & 0x1;
> 		smp_read_barrier_depends();
> -		current->rcu_flipctr1 = &(__get_cpu_var(rcu_flipctr)[flipctr]);
> -		/* Can optimize to non-atomic on fastpath, but start simple. */
> -		atomic_inc(current->rcu_flipctr1);
> -		smp_mb__after_atomic_inc();  /* might optimize out... */
> -		if (unlikely(flipctr != (rcu_ctrlblk.completed & 0x1))) {
> -
> -			/*
> -			 * We raced with grace-period processing (flip).
> -			 * Although we cannot be preempted here, there
> -			 * could be interrupts, ECC errors and the like,
> -			 * so just nail down both sides of the rcu_flipctr
> -			 * array for the duration of our RCU read-side
> -			 * critical section, preventing a second flip
> -			 * from racing with us.  At some point, it would
> -			 * be safe to decrement one of the counters, but
> -			 * we have no way of knowing when that would be.
> -			 * So just decrement them both in rcu_read_unlock().
> -			 */
> -
> -			current->rcu_flipctr2 =
> -				&(__get_cpu_var(rcu_flipctr)[!flipctr]);
> -			/* Can again optimize to non-atomic on fastpath. */
> -			atomic_inc(current->rcu_flipctr2);
> -			smp_mb__after_atomic_inc();  /* might optimize out... */
> -		}
> +		barrier();
> +
> +		/*
> +		 * Increment the per-CPU counter. NMI handlers
> +		 * might increment it as well, but they had better
> +		 * properly nest their rcu_read_lock()/rcu_read_unlock()
> +		 * pairs so that the value is restored before the handler
> +		 * returns to us.  Enforce ordering for compilers.
> +		 */
> +
> +		__get_cpu_var(rcu_flipctr)[idx]++;
> +		barrier();
> +
> +		/*
> +		 * It is now safe to increment the task's nesting count.
> +		 * NMIs that occur after this statement will route
> +		 * their rcu_read_lock() calls through the "then" clause
> +		 * of this "if" statement, and thus will no longer come
> +		 * through this path.  Enforce ordering for compilers.
> +		 */
> +
> +		current->rcu_read_lock_nesting = nesting + 1;
> +		barrier();
> +
> +		/*
> +		 * It is now safe to store the index into our task
> +		 * structure.  Doing so earlier would have resulted
> +		 * in it getting clobbered by NMI handlers.
> +		 */
> +
> +		current->rcu_flipctr_idx = idx;
> 	}
> +
> 	trace_special((unsigned long) current->rcu_flipctr1,
> 		      (unsigned long) current->rcu_flipctr2,
> 		      rcu_ctrlblk.completed);
> -	local_irq_restore(oldirq);
> +	raw_local_irq_restore(oldirq);
> }
>
> void
> rcu_read_unlock(void)
> {
> +	int idx;
> +	int nesting;
> 	unsigned long oldirq;
>
> -	local_irq_save(oldirq);
> +	raw_local_irq_save(oldirq);
> 	trace_special((unsigned long) current->rcu_flipctr1,
> 		      (unsigned long) current->rcu_flipctr2,
> 		      current->rcu_read_lock_nesting);
> -	if (--current->rcu_read_lock_nesting == 0) {
> +
> +	nesting = current->rcu_read_lock_nesting;
> +
> +	/*
> +	 * Any rcu_read_lock()s called from NMI handlers
> +	 * at any point must have matching rcu_read_unlock()
> +	 * calls in that same handler, so we will not see the
> +	 * value of current->rcu_read_lock_nesting change.
> +	 */
> +
> +	if (nesting > 1) {
>
> 		/*
> -		 * Just atomically decrement whatever we incremented.
> -		 * Might later want to awaken some task waiting for the
> -		 * grace period to complete, but keep it simple for the
> -		 * moment.
> +		 * There is an enclosing rcu_read_lock(), so all we
> +		 * need to do is to decrement the counter.
> 		 */
>
> -		smp_mb__before_atomic_dec();
> -		atomic_dec(current->rcu_flipctr1);
> -		current->rcu_flipctr1 = NULL;
> -		if (unlikely(current->rcu_flipctr2 != NULL)) {
> -			atomic_dec(current->rcu_flipctr2);
> -			current->rcu_flipctr2 = NULL;
> -		}
> +		current->rcu_read_lock_nesting = nesting - 1;
> +
> +	} else {
> +
> +		/*
> +		 * Outermost nesting of rcu_read_unlock(), so we must
> +		 * decrement the current counter for the current CPU.
> +		 * This must be done carefully, because NMIs can
> +		 * occur at any point in this code, and any rcu_read_lock()
> +		 * and rcu_read_unlock() pairs in the NMI handlers
> +		 * must interact non-destructively with this code.
> +		 * Lots of barrier() calls, and -very- careful ordering.
> +		 *
> +		 * Changes to this code, including this one, must be
> +		 * inspected, validated, and tested extremely carefully!!!
> +		 */
> +
> +		/*
> +		 * First, pick up the index.  Enforce ordering for
> +		 * both compilers and for DEC Alpha.
> +		 */
> +
> +		idx = current->rcu_flipctr_idx;
> +		smp_read_barrier_depends();
> +		barrier();
> +
> +		/*
> +		 * It is now safe to decrement the task's nesting count.
> +		 * NMIs that occur after this statement will route
> +		 * their rcu_read_lock() calls through this "else" clause
> +		 * of this "if" statement, and thus will start incrementing
> +		 * the per-CPU counter on their own.  Enforce ordering for
> +		 * compilers.
> +		 */
> +
> +		current->rcu_read_lock_nesting = nesting - 1;
> +		barrier();
> +
> +		/*
> +		 * Decrement the per-CPU counter. NMI handlers
> +		 * might increment it as well, but they had better
> +		 * properly nest their rcu_read_lock()/rcu_read_unlock()
> +		 * pairs so that the value is restored before the handler
> +		 * returns to us.
> +		 */
> +
> +		__get_cpu_var(rcu_flipctr)[idx]--;
> 	}
>
> 	trace_special((unsigned long)current->rcu_flipctr1,
> 		      (unsigned long) current->rcu_flipctr2,
> 		      current->rcu_read_lock_nesting);
> -	local_irq_restore(oldirq);
> +	raw_local_irq_restore(oldirq);
> }
>
> static void
> __rcu_advance_callbacks(void)
> {
>
> -	if (rcu_data.completed != rcu_ctrlblk.completed) {
> +	if ((rcu_data.completed >> 1) != (rcu_ctrlblk.completed >> 1)) {
> 		if (rcu_data.waitlist != NULL) {
> 			*rcu_data.donetail = rcu_data.waitlist;
> 			rcu_data.donetail = rcu_data.waittail;
> @@ -216,13 +358,186 @@ __rcu_advance_callbacks(void)
> 			rcu_data.waittail = &rcu_data.waitlist;
> 		}
> 		rcu_data.completed = rcu_ctrlblk.completed;
> +	} else if (rcu_data.completed != rcu_ctrlblk.completed) {
> +		rcu_data.completed = rcu_ctrlblk.completed;
> +	}
> +}
> +
> +/*
> + * Get here when RCU is idle.  Decide whether we need to
> + * move out of idle state, and return zero if so.
> + * "Straightforward" approach for the moment, might later
> + * use callback-list lengths, grace-period duration, or
> + * some such to determine when to exit idle state.
> + * Might also need a pre-idle test that does not acquire
> + * the lock, but let's get the simple case working first...
> + */
> +
> +static int
> +rcu_try_flip_idle(int flipctr)
> +{
> +#ifdef CONFIG_RCU_STATS
> +	rcu_data.n_rcu_try_flip_i1++;
> +#endif /* #ifdef CONFIG_RCU_STATS */
> +
> +	if (!rcu_pending(smp_processor_id())) {
> +#ifdef CONFIG_RCU_STATS
> +		rcu_data.n_rcu_try_flip_ie1++;
> +#endif /* #ifdef CONFIG_RCU_STATS */
> +		return 1;
> 	}
> +	return 0;
> +}
> +
> +/*
> + * Flip processing up to and including the flip, as well as
> + * telling CPUs to acknowledge the flip.
> + */
> +
> +static int
> +rcu_try_flip_in_gp(int flipctr)
> +{
> +	int cpu;
> +
> +#ifdef CONFIG_RCU_STATS
> +	rcu_data.n_rcu_try_flip_g1++;
> +#endif /* #ifdef CONFIG_RCU_STATS */
> +
> +	/*
> +	 * Do the flip.
> +	 */
> +
> +	rcu_ctrlblk.completed++;  /* stands in for rcu_try_flip_g2 */
> +
> +	/*
> +	 * Need a memory barrier so that other CPUs see the new
> +	 * counter value before they see the subsequent change of all
> +	 * the rcu_flip_flag instances to rcu_flipped.
> +	 */
> +
> +	smp_mb();
> +
> +	/* Now ask each CPU for acknowledgement of the flip. */
> +
> +	for_each_cpu(cpu)
> +		per_cpu(rcu_flip_flag, cpu) = rcu_flipped;
> +
> +	return 0;
> +}
> +
> +/*
> + * Wait for CPUs to acknowledge the flip.
> + */
> +
> +static int
> +rcu_try_flip_waitack(int flipctr)
> +{
> +	int cpu;
> +
> +#ifdef CONFIG_RCU_STATS
> +	rcu_data.n_rcu_try_flip_a1++;
> +#endif /* #ifdef CONFIG_RCU_STATS */
> +
> +	for_each_cpu(cpu)
> +		if (per_cpu(rcu_flip_flag, cpu) != rcu_flip_seen) {
> +#ifdef CONFIG_RCU_STATS
> +			rcu_data.n_rcu_try_flip_ae1++;
> +#endif /* #ifdef CONFIG_RCU_STATS */
> +			return 1;
> +		}
> +
> +	/*
> +	 * Make sure our checks above don't bleed into subsequent
> +	 * waiting for the sum of the counters to reach zero.
> +	 */
> +
> +	smp_mb();
> +
> +#ifdef CONFIG_RCU_STATS
> +	rcu_data.n_rcu_try_flip_a2++;
> +#endif /* #ifdef CONFIG_RCU_STATS */
> +
> +	return 0;
> +}
> +
> +/*
> + * Wait for collective ``last'' counter to reach zero,
> + * then tell all CPUs to do an end-of-grace-period memory barrier.
> + */
> +
> +static int
> +rcu_try_flip_waitzero(int flipctr)
> +{
> +	int cpu;
> +	int lastidx = !(flipctr & 0x1);
> +	int sum = 0;
> +
> +#ifdef CONFIG_RCU_STATS
> +	rcu_data.n_rcu_try_flip_z1++;
> +#endif /* #ifdef CONFIG_RCU_STATS */
> +
> +	/* Check to see if the sum of the "last" counters is zero. */
> +
> +	for_each_cpu(cpu)
> +		sum += per_cpu(rcu_flipctr, cpu)[lastidx];
> +	if (sum != 0) {
> +#ifdef CONFIG_RCU_STATS
> +		rcu_data.n_rcu_try_flip_ze1++;
> +#endif /* #ifdef CONFIG_RCU_STATS */
> +		return 1;
> +	}
> +
> +#ifdef CONFIG_RCU_STATS
> +	rcu_data.n_rcu_try_flip_z2++;
> +#endif /* #ifdef CONFIG_RCU_STATS */
> +
> +	/* Make sure we don't call for memory barriers before we see zero. */
> +
> +	smp_mb();
> +
> +	/* Call for a memory barrier from each CPU. */
> +
> +	for_each_cpu(cpu)
> +		per_cpu(rcu_mb_flag, cpu) = rcu_mb_needed;
> +
> +	return 0;
> +}
> +
> +/*
> + * Wait for all CPUs to do their end-of-grace-period memory barrier.
> + * Return 0 once all CPUs have done so.
> + */
> +
> +static int
> +rcu_try_flip_waitmb(int flipctr)
> +{
> +	int cpu;
> +
> +#ifdef CONFIG_RCU_STATS
> +	rcu_data.n_rcu_try_flip_m1++;
> +#endif /* #ifdef CONFIG_RCU_STATS */
> +
> +	for_each_cpu(cpu)
> +		if (per_cpu(rcu_mb_flag, cpu) != rcu_mb_done) {
> +#ifdef CONFIG_RCU_STATS
> +			rcu_data.n_rcu_try_flip_me1++;
> +#endif /* #ifdef CONFIG_RCU_STATS */
> +			return 1;
> +		}
> +
> +	smp_mb(); /* Ensure that the above checks precede any following flip. */
> +
> +#ifdef CONFIG_RCU_STATS
> +	rcu_data.n_rcu_try_flip_m2++;
> +#endif /* #ifdef CONFIG_RCU_STATS */
> +
> +	return 0;
> }
>
> /*
>  * Attempt a single flip of the counters.  Remember, a single flip does
>  * -not- constitute a grace period.  Instead, the interval between
> - * a pair of consecutive flips is a grace period.
> + * at least three consecutive flips is a grace period.
>  *
>  * If anyone is nuts enough to run this CONFIG_PREEMPT_RCU implementation
>  * on a large SMP, they might want to use a hierarchical organization of
> @@ -231,13 +546,11 @@ __rcu_advance_callbacks(void)
> static void
> rcu_try_flip(void)
> {
> -	int cpu;
> 	long flipctr;
> 	unsigned long oldirq;
>
> -	flipctr = rcu_ctrlblk.completed;
> #ifdef CONFIG_RCU_STATS
> -	atomic_inc(&rcu_data.n_rcu_try_flip1);
> +	atomic_inc(&rcu_data.n_rcu_try_flip_1);
> #endif /* #ifdef CONFIG_RCU_STATS */
> 	if (unlikely(!spin_trylock_irqsave(&rcu_ctrlblk.fliplock, oldirq))) {
> #ifdef CONFIG_RCU_STATS
> @@ -245,52 +558,82 @@ rcu_try_flip(void)
> #endif /* #ifdef CONFIG_RCU_STATS */
> 		return;
> 	}
> -	if (unlikely(flipctr != rcu_ctrlblk.completed)) {
> -
> -		/* Our work is done!  ;-) */
> -
> -#ifdef CONFIG_RCU_STATS
> -		rcu_data.n_rcu_try_flip_e2++;
> -#endif /* #ifdef CONFIG_RCU_STATS */
> -		spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
> -		return;
> -	}
> -	flipctr &= 0x1;
>
> 	/*
> -	 * Check for completion of all RCU read-side critical sections
> -	 * that started prior to the previous flip.
> +	 * Take the next transition(s) through the RCU grace-period
> +	 * flip-counter state machine.
> 	 */
>
> -#ifdef CONFIG_RCU_STATS
> -	rcu_data.n_rcu_try_flip2++;
> -#endif /* #ifdef CONFIG_RCU_STATS */
> -	for_each_cpu(cpu) {
> -		if (atomic_read(&per_cpu(rcu_flipctr, cpu)[!flipctr]) != 0) {
> -#ifdef CONFIG_RCU_STATS
> -			rcu_data.n_rcu_try_flip_e3++;
> -#endif /* #ifdef CONFIG_RCU_STATS */
> -			spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
> -			return;
> -		}
> +	flipctr = rcu_ctrlblk.completed;
> +	switch (rcu_try_flip_state) {
> +	case rcu_try_flip_idle_state:
> +		if (rcu_try_flip_idle(flipctr))
> +			break;
> +		rcu_try_flip_state = rcu_try_flip_in_gp_state;
> +	case rcu_try_flip_in_gp_state:
> +		if (rcu_try_flip_in_gp(flipctr))
> +			break;
> +		rcu_try_flip_state = rcu_try_flip_waitack_state;
> +	case rcu_try_flip_waitack_state:
> +		if (rcu_try_flip_waitack(flipctr))
> +			break;
> +		rcu_try_flip_state = rcu_try_flip_waitzero_state;
> +	case rcu_try_flip_waitzero_state:
> +		if (rcu_try_flip_waitzero(flipctr))
> +			break;
> +		rcu_try_flip_state = rcu_try_flip_waitmb_state;
> +	case rcu_try_flip_waitmb_state:
> +		if (rcu_try_flip_waitmb(flipctr))
> +			break;
> +		rcu_try_flip_state = rcu_try_flip_idle_state;
> 	}
> +	spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
> +}
>
> -	/* Do the flip. */
> +/*
> + * Check to see if this CPU needs to report that it has seen the most
> + * recent counter flip, thereby declaring that all subsequent
> + * rcu_read_lock() invocations will respect this flip.
> + */
>
> -	smp_mb();
> -	rcu_ctrlblk.completed++;
> +static void
> +rcu_check_flipseen(int cpu)
> +{
> +	if (per_cpu(rcu_flip_flag, cpu) == rcu_flipped) {
> +		smp_mb();  /* Subsequent counter accesses must see new value */
> +		per_cpu(rcu_flip_flag, cpu) = rcu_flip_seen;
> +		smp_mb();  /* probably implied by interrupt, but... */
> +	}
> +}
>
> -#ifdef CONFIG_RCU_STATS
> -	rcu_data.n_rcu_try_flip3++;
> -#endif /* #ifdef CONFIG_RCU_STATS */
> -	spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
> +/*
> + * Check to see if this CPU needs to do a memory barrier in order to
> + * ensure that any prior RCU read-side critical sections have committed
> + * their counter manipulations and critical-section memory references
> + * before declaring the grace period to be completed.
> + */
> +
> +static void
> +rcu_check_mb(int cpu)
> +{
> +	if (per_cpu(rcu_mb_flag, cpu) == rcu_mb_needed) {
> +		smp_mb();
> +		per_cpu(rcu_mb_flag, cpu) = rcu_mb_done;
> +	}
> }
>
> +/*
> + * This function is called periodically on each CPU in hardware interrupt
> + * context.
> + */
> +
> void
> rcu_check_callbacks(int cpu, int user)
> {
> 	unsigned long oldirq;
>
> +	rcu_check_flipseen(cpu);
> +	rcu_check_mb(cpu);
> 	if (rcu_ctrlblk.completed == rcu_data.completed) {
> 		rcu_try_flip();
> 		if (rcu_ctrlblk.completed == rcu_data.completed) {
> @@ -359,10 +702,10 @@ call_rcu(struct rcu_head *head,
> }
>
> /*
> - * Crude hack, reduces but does not eliminate possibility of failure.
> - * Needs to wait for all CPUs to pass through a -voluntary- context
> - * switch to eliminate possibility of failure.  (Maybe just crank
> - * priority down...)
> + * Wait until all currently running preempt_disable() code segments
> + * (including hardware-irq-disable segments) complete.  Note that
> + * in -rt this does -not- necessarily result in all currently executing
> + * interrupt -handlers- having completed.
>  */
> void
> synchronize_sched(void)
> @@ -390,7 +733,7 @@ rcu_pending(int cpu)
>
> void __init rcu_init(void)
> {
> -/*&&&&*/printk("WARNING: experimental RCU implementation.\n");
> +/*&&&&*/printk("WARNING: experimental non-atomic RCU implementation.\n");
> 	spin_lock_init(&rcu_data.lock);
> 	rcu_data.completed = 0;
> 	rcu_data.nextlist = NULL;
> @@ -416,7 +759,8 @@ int rcu_read_proc_data(char *page)
> 	return sprintf(page,
> 		       "ggp=%ld lgp=%ld rcc=%ld\n"
> 		       "na=%ld nl=%ld wa=%ld wl=%ld da=%ld dl=%ld dr=%ld di=%d\n"
> -		       "rtf1=%d rtf2=%ld rtf3=%ld rtfe1=%d rtfe2=%ld rtfe3=%ld\n",
> +		       "1=%d e1=%d i1=%ld ie1=%ld g1=%ld a1=%ld ae1=%ld a2=%ld\n"
> +		       "z1=%ld ze1=%ld z2=%ld m1=%ld me1=%ld m2=%ld\n",
>
> 		       rcu_ctrlblk.completed,
> 		       rcu_data.completed,
> @@ -431,12 +775,20 @@ int rcu_read_proc_data(char *page)
> 		       rcu_data.n_done_remove,
> 		       atomic_read(&rcu_data.n_done_invoked),
>
> -		       atomic_read(&rcu_data.n_rcu_try_flip1),
> -		       rcu_data.n_rcu_try_flip2,
> -		       rcu_data.n_rcu_try_flip3,
> +		       atomic_read(&rcu_data.n_rcu_try_flip_1),
> 		       atomic_read(&rcu_data.n_rcu_try_flip_e1),
> -		       rcu_data.n_rcu_try_flip_e2,
> -		       rcu_data.n_rcu_try_flip_e3);
> +		       rcu_data.n_rcu_try_flip_i1,
> +		       rcu_data.n_rcu_try_flip_ie1,
> +		       rcu_data.n_rcu_try_flip_g1,
> +		       rcu_data.n_rcu_try_flip_a1,
> +		       rcu_data.n_rcu_try_flip_ae1,
> +		       rcu_data.n_rcu_try_flip_a2,
> +		       rcu_data.n_rcu_try_flip_z1,
> +		       rcu_data.n_rcu_try_flip_ze1,
> +		       rcu_data.n_rcu_try_flip_z2,
> +		       rcu_data.n_rcu_try_flip_m1,
> +		       rcu_data.n_rcu_try_flip_me1,
> +		       rcu_data.n_rcu_try_flip_m2);
> }
>
> int rcu_read_proc_gp_data(char *page)
> @@ -464,14 +816,23 @@ int rcu_read_proc_ctrs_data(char *page)
> 	int cpu;
> 	int f = rcu_data.completed & 0x1;
>
> -	cnt += sprintf(&page[cnt], "CPU last cur\n");
> +	cnt += sprintf(&page[cnt], "CPU last cur F M\n");
> 	for_each_cpu(cpu) {
> -		cnt += sprintf(&page[cnt], "%3d %4d %3d\n",
> +		cnt += sprintf(&page[cnt], "%3d %4d %3d %d %d\n",
> 			       cpu,
> -			       atomic_read(&per_cpu(rcu_flipctr, cpu)[!f]),
> -			       atomic_read(&per_cpu(rcu_flipctr, cpu)[f]));
> -	}
> -	cnt += sprintf(&page[cnt], "ggp = %ld\n", rcu_data.completed);
> +			       per_cpu(rcu_flipctr, cpu)[!f],
> +			       per_cpu(rcu_flipctr, cpu)[f],
> +			       per_cpu(rcu_flip_flag, cpu),
> +			       per_cpu(rcu_mb_flag, cpu));
> +	}
> +	cnt += sprintf(&page[cnt], "ggp = %ld, state = %d",
> +		       rcu_data.completed, rcu_try_flip_state);
> +	if ((0 <= rcu_try_flip_state) &&
> +	    (rcu_try_flip_state <= sizeof(rcu_try_flip_state_names) /
> +	    			   sizeof(rcu_try_flip_state_names[0])))
> +		cnt += sprintf(&page[cnt], " (%s)",
> +			       rcu_try_flip_state_names[rcu_try_flip_state]);
> +	cnt += sprintf(&page[cnt], "\n");
> 	return (cnt);
> }
>
>

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU
  2006-07-27  1:39 ` Esben Nielsen
@ 2006-07-27  1:39   ` Paul E. McKenney
  2006-07-27 11:00     ` Esben Nielsen
  0 siblings, 1 reply; 13+ messages in thread
From: Paul E. McKenney @ 2006-07-27  1:39 UTC (permalink / raw)
  To: Esben Nielsen
  Cc: linux-kernel, compudj, billh, rostedt, tglx, mingo, dipankar,
	rusty

On Thu, Jul 27, 2006 at 02:39:07AM +0100, Esben Nielsen wrote:
> 
> 
> On Tue, 25 Jul 2006, Paul E. McKenney wrote:
> 
> >Not for inclusion, should be viewed with great suspicion.
> >
> >This patch provides an NMI-safe realtime RCU.  Hopefully, Mathieu can
> >make use of synchronize_sched() instead, since I have not yet figured
> >out a way to make this NMI-safe and still get rid of the interrupt
> >disabling.  ;-)
> >
> >						Thanx, Paul
> 
> I must say I don't understand all this. It looks very complicated. Is it 
> really needed?
> 
> I have been thinking about the following design:
> 
> void rcu_read_lock()
> {
> 	if (!in_interrupt())
> 		current->rcu_read_lock_count++;
> }
> void rcu_read_unlock()
> {
> 	if (!in_interrupt())
> 		current->rcu_read_lock_count--;
> }
> 
> Somewhere in schedule():
> 
> 	rq->rcu_read_lock_count += prev->rcu_read_lock_count;
> 	if (!rq->rcu_read_lock_count)
> 		forward_waiting_rcu_jobs();
> 	rq->rcu_read_lock_count -= next->rcu_read_lock_count;

So rq->rcu_read_lock_count contains the sum of the counts of all
tasks that were scheduled away from this CPU.

What happens in face of the following sequence of events?
Assume that all tasks stay on CPU 0 for the moment.

o	Task A does rcu_read_lock().

o	Task A is preempted.  rq->rcu_read_lock_count is nonzero.

o	Task B runs and does rcu_read_lock().

o	Task B is preempted (perhaps because it dropped the lock
	that was causing it to have high priority).  Regardless of
	the reason for the preemption, rq->rcu_read_lock_count is
	nonzero.

o	Task C runs and does rcu_read_lock().

o	Task C is preempted.  rq->rcu_read_lock_count is nonzero.

o	Task A runs again, and does rcu_read_unlock().

o	Task A is preempted.  rq->rcu_read_lock_count is nonzero.

o	Task D runs and does rcu_read_lock().

o	Task D is preempted.  rq->rcu_read_lock_count is nonzero.

And so on.  As long as at least one of the preempted tasks is in an
RCU critical section, you never do your forward_waiting_rcu_jobs(),
and the grace period goes on forever.  Or at least until you OOM the
machine.

So what am I missing here?

I could imagine introducing a pair of counters per runqueue, but then
we end up with the same issues with counter-flip that we have now.

Another question -- what happens if a given CPU stays idle?  Wouldn't
the callbacks then just start piling up on that CPU?

> Now what should forward_waiting_rcu_jobs() do?
> 
> I imagine a circular data structure of all the CPUs. When a call_rcu() is 
> run on a CPU it is first added a list of jobs for that CPU. When 
> forward_waiting_rcu_jobs() is called all the pending jobs are forwarded to 
> the next CPU. The next CPU will bring it along the next CPU in the circle 
> along with its own jobs. When jobs hit the original CPU they will be 
> executed. Or rather, when the CPU just before calls 
> forward_waiting_rcu_jobs(), it sends the jobs belonging to the next CPU to 
> the RCU-task of the next CPU, where they will be executed, instead of to 
> the scheduler (runqueue) of the next CPU, where it will just be send out on 
> a
> new roundtrip along the circle.
> 
> If you use a structure like the plist then the forwarding procedure can be 
> done in O(number of online CPUs) time worst case - much less in the usual 
> case where the lists are almost empty.
> 
> Now the problem is: What happens if a task in a rcu read-side lock is 
> migrated? Then the rcu_read_lock_count on the source CPU will stay in plus 
> while on the target CPU it will go in minus. This ought to be simply 
> fixable by adding task->rcu_read_lock_count to the target runqueue before 
> migrating and subtracting it from the old runqueue after migrating. But 
> there is another problem: RCU-jobs refering to data used by the task being 
> migrated might have been forwarded from the target CPU. Thus the migration
> task have to go back along the circle of CPUs and move all the relevant
> RCU-jobs back to the target CPU to be forwarded again. This is also doable 
> in
> number of CPUs between source and target times O(<number of online CPUs>)
> (see above) time.

So if I have the right (or wrong) pattern of task migrations, the RCU
callbacks never get to their originating CPU?

Alternatively, if the task residing in the RCU read-side critical section
is forwarded around the loop of CPUs, callbacks circulating around this
loop might execute before the RCU read-side critical section completes.

> To avoid a task in a read-side lock being starved for too long the 
> following line can be added to normal_prio():
>   if (p->rcu_read_lock_count)
> 	p->prio = MAX_RT_PRIO;

But doesn't this have the same impact on latency as disabling preemption
in rcu_read_lock() and then re-enabling it in rcu_read_unlock()?

Also, doesn't this circular data structure need to handle CPU hotplug?

> I don't have time to code this nor a SMP machine to test it on. But I can 
> give the idea to you anyways in the hope you might code it :-)

I am beginning to think that it will not be at all simple by the time I
code up all the required fixups.  Or am I missing something?

							Thanx, Paul

> Esben
> 
> 
> >
> >Signed-off-by: Paul E. McKenney <paulmck@us.ibm.com> (but please don't 
> >apply)
> >
> >include/linux/sched.h |    3
> >kernel/rcupreempt.c   |  577 
> >++++++++++++++++++++++++++++++++++++++++----------
> >2 files changed, 470 insertions(+), 110 deletions(-)
> >
> >diff -urpNa -X dontdiff linux-2.6.17-rt7/include/linux/sched.h 
> >linux-2.6.17-rt7-norrupt/include/linux/sched.h
> >--- linux-2.6.17-rt7/include/linux/sched.h	2006-07-19 
> >01:43:09.000000000 -0700
> >+++ linux-2.6.17-rt7-norrupt/include/linux/sched.h	2006-07-22 
> >15:44:36.000000000 -0700
> >@@ -868,8 +868,7 @@ struct task_struct {
> >
> >#ifdef CONFIG_PREEMPT_RCU
> >	int rcu_read_lock_nesting;
> >-	atomic_t *rcu_flipctr1;
> >-	atomic_t *rcu_flipctr2;
> >+	int rcu_flipctr_idx;
> >#endif
> >#ifdef CONFIG_SCHEDSTATS
> >	struct sched_info sched_info;
> >diff -urpNa -X dontdiff linux-2.6.17-rt7/kernel/rcupreempt.c 
> >linux-2.6.17-rt7-norrupt/kernel/rcupreempt.c
> >--- linux-2.6.17-rt7/kernel/rcupreempt.c	2006-07-19 
> >01:43:09.000000000 -0700
> >+++ linux-2.6.17-rt7-norrupt/kernel/rcupreempt.c	2006-07-22 
> >20:21:46.000000000 -0700
> >@@ -15,11 +15,13 @@
> > * along with this program; if not, write to the Free Software
> > * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, 
> > USA.
> > *
> >- * Copyright (C) IBM Corporation, 2001
> >+ * Copyright (C) IBM Corporation, 2006
> > *
> > * Authors: Paul E. McKenney <paulmck@us.ibm.com>
> > *		With thanks to Esben Nielsen, Bill Huey, and Ingo Molnar
> >- *		for pushing me away from locks and towards counters.
> >+ *		for pushing me away from locks and towards counters, and
> >+ *		to Suparna Bhattacharya for pushing me completely away
> >+ *		from atomic instructions on the read side.
> > *
> > * Papers:  http://www.rdrop.com/users/paulmck/RCU
> > *
> >@@ -73,12 +75,20 @@ struct rcu_data {
> >	long		n_done_remove;
> >	atomic_t	n_done_invoked;
> >	long		n_rcu_check_callbacks;
> >-	atomic_t	n_rcu_try_flip1;
> >-	long		n_rcu_try_flip2;
> >-	long		n_rcu_try_flip3;
> >+	atomic_t	n_rcu_try_flip_1;
> >	atomic_t	n_rcu_try_flip_e1;
> >-	long		n_rcu_try_flip_e2;
> >-	long		n_rcu_try_flip_e3;
> >+	long		n_rcu_try_flip_i1;
> >+	long		n_rcu_try_flip_ie1;
> >+	long		n_rcu_try_flip_g1;
> >+	long		n_rcu_try_flip_a1;
> >+	long		n_rcu_try_flip_ae1;
> >+	long		n_rcu_try_flip_a2;
> >+	long		n_rcu_try_flip_z1;
> >+	long		n_rcu_try_flip_ze1;
> >+	long		n_rcu_try_flip_z2;
> >+	long		n_rcu_try_flip_m1;
> >+	long		n_rcu_try_flip_me1;
> >+	long		n_rcu_try_flip_m2;
> >#endif /* #ifdef CONFIG_RCU_STATS */
> >};
> >struct rcu_ctrlblk {
> >@@ -90,8 +100,51 @@ static struct rcu_ctrlblk rcu_ctrlblk =
> >	.fliplock = RAW_SPIN_LOCK_UNLOCKED,
> >	.completed = 0,
> >};
> >-static DEFINE_PER_CPU(atomic_t [2], rcu_flipctr) =
> >-	{ ATOMIC_INIT(0), ATOMIC_INIT(0) };
> >+static DEFINE_PER_CPU(int [2], rcu_flipctr) = { 0, 0 };
> >+
> >+/*
> >+ * States for rcu_try_flip() and friends.
> >+ */
> >+
> >+enum rcu_try_flip_states {
> >+	rcu_try_flip_idle_state,	/* "I" */
> >+	rcu_try_flip_in_gp_state,	/* "G" */
> >+	rcu_try_flip_waitack_state, 	/* "A" */
> >+	rcu_try_flip_waitzero_state,	/* "Z" */
> >+	rcu_try_flip_waitmb_state	/* "M" */
> >+};
> >+static enum rcu_try_flip_states rcu_try_flip_state = 
> >rcu_try_flip_idle_state;
> >+#ifdef CONFIG_RCU_STATS
> >+static char *rcu_try_flip_state_names[] =
> >+	{ "idle", "gp", "waitack", "waitzero", "waitmb" };
> >+#endif /* #ifdef CONFIG_RCU_STATS */
> >+
> >+/*
> >+ * Enum and per-CPU flag to determine when each CPU has seen
> >+ * the most recent counter flip.
> >+ */
> >+
> >+enum rcu_flip_flag_values {
> >+	rcu_flip_seen,		/* Steady/initial state, last flip seen. */
> >+				/* Only GP detector can update. */
> >+	rcu_flipped		/* Flip just completed, need confirmation. */
> >+				/* Only corresponding CPU can update. */
> >+};
> >+static DEFINE_PER_CPU(enum rcu_flip_flag_values, rcu_flip_flag) = 
> >rcu_flip_seen;
> >+
> >+/*
> >+ * Enum and per-CPU flag to determine when each CPU has executed the
> >+ * needed memory barrier to fence in memory references from its last RCU
> >+ * read-side critical section in the just-completed grace period.
> >+ */
> >+
> >+enum rcu_mb_flag_values {
> >+	rcu_mb_done,		/* Steady/initial state, no mb()s required. 
> >*/
> >+				/* Only GP detector can update. */
> >+	rcu_mb_needed		/* Flip just completed, need an mb(). */
> >+				/* Only corresponding CPU can update. */
> >+};
> >+static DEFINE_PER_CPU(enum rcu_mb_flag_values, rcu_mb_flag) = rcu_mb_done;
> >
> >/*
> > * Return the number of RCU batches processed thus far.  Useful
> >@@ -105,93 +158,182 @@ long rcu_batches_completed(void)
> >void
> >rcu_read_lock(void)
> >{
> >-	int flipctr;
> >+	int idx;
> >+	int nesting;
> >	unsigned long oldirq;
> >
> >-	local_irq_save(oldirq);
> >-
> >+	raw_local_irq_save(oldirq);
> >	trace_special(current->rcu_read_lock_nesting,
> >		      (unsigned long) current->rcu_flipctr1,
> >		      rcu_ctrlblk.completed);
> >-	if (current->rcu_read_lock_nesting++ == 0) {
> >+
> >+	nesting = current->rcu_read_lock_nesting;
> >+
> >+	/*
> >+	 * Any rcu_read_lock()s called from NMI handlers
> >+	 * at any point must have matching rcu_read_unlock()
> >+	 * calls in that same handler, so we will not see the
> >+	 * value of current->rcu_read_lock_nesting change.
> >+	 */
> >+
> >+	if (nesting != 0) {
> >
> >		/*
> >-		 * Outermost nesting of rcu_read_lock(), so atomically
> >+		 * There is an enclosing rcu_read_lock(), so all we
> >+		 * need to do is to increment the counter.
> >+		 */
> >+
> >+		current->rcu_read_lock_nesting = nesting + 1;
> >+
> >+	} else {
> >+
> >+		/*
> >+		 * Outermost nesting of rcu_read_lock(), so we must
> >		 * increment the current counter for the current CPU.
> >+		 * This must be done carefully, because NMIs can
> >+		 * occur at any point in this code, and any rcu_read_lock()
> >+		 * and rcu_read_unlock() pairs in the NMI handlers
> >+		 * must interact non-destructively with this code.
> >+		 * Lots of barrier() calls, and -very- careful ordering.
> >+		 *
> >+		 * Changes to this code, including this one, must be
> >+		 * inspected, validated, and tested extremely carefully!!!
> >		 */
> >
> >-		flipctr = rcu_ctrlblk.completed & 0x1;
> >+		/*
> >+		 * First, pick up the index.  Enforce ordering for
> >+		 * both compilers and for DEC Alpha.
> >+		 */
> >+
> >+		idx = rcu_ctrlblk.completed & 0x1;
> >		smp_read_barrier_depends();
> >-		current->rcu_flipctr1 = 
> >&(__get_cpu_var(rcu_flipctr)[flipctr]);
> >-		/* Can optimize to non-atomic on fastpath, but start simple. 
> >*/
> >-		atomic_inc(current->rcu_flipctr1);
> >-		smp_mb__after_atomic_inc();  /* might optimize out... */
> >-		if (unlikely(flipctr != (rcu_ctrlblk.completed & 0x1))) {
> >-
> >-			/*
> >-			 * We raced with grace-period processing (flip).
> >-			 * Although we cannot be preempted here, there
> >-			 * could be interrupts, ECC errors and the like,
> >-			 * so just nail down both sides of the rcu_flipctr
> >-			 * array for the duration of our RCU read-side
> >-			 * critical section, preventing a second flip
> >-			 * from racing with us.  At some point, it would
> >-			 * be safe to decrement one of the counters, but
> >-			 * we have no way of knowing when that would be.
> >-			 * So just decrement them both in rcu_read_unlock().
> >-			 */
> >-
> >-			current->rcu_flipctr2 =
> >-				&(__get_cpu_var(rcu_flipctr)[!flipctr]);
> >-			/* Can again optimize to non-atomic on fastpath. */
> >-			atomic_inc(current->rcu_flipctr2);
> >-			smp_mb__after_atomic_inc();  /* might optimize 
> >out... */
> >-		}
> >+		barrier();
> >+
> >+		/*
> >+		 * Increment the per-CPU counter. NMI handlers
> >+		 * might increment it as well, but they had better
> >+		 * properly nest their rcu_read_lock()/rcu_read_unlock()
> >+		 * pairs so that the value is restored before the handler
> >+		 * returns to us.  Enforce ordering for compilers.
> >+		 */
> >+
> >+		__get_cpu_var(rcu_flipctr)[idx]++;
> >+		barrier();
> >+
> >+		/*
> >+		 * It is now safe to increment the task's nesting count.
> >+		 * NMIs that occur after this statement will route
> >+		 * their rcu_read_lock() calls through the "then" clause
> >+		 * of this "if" statement, and thus will no longer come
> >+		 * through this path.  Enforce ordering for compilers.
> >+		 */
> >+
> >+		current->rcu_read_lock_nesting = nesting + 1;
> >+		barrier();
> >+
> >+		/*
> >+		 * It is now safe to store the index into our task
> >+		 * structure.  Doing so earlier would have resulted
> >+		 * in it getting clobbered by NMI handlers.
> >+		 */
> >+
> >+		current->rcu_flipctr_idx = idx;
> >	}
> >+
> >	trace_special((unsigned long) current->rcu_flipctr1,
> >		      (unsigned long) current->rcu_flipctr2,
> >		      rcu_ctrlblk.completed);
> >-	local_irq_restore(oldirq);
> >+	raw_local_irq_restore(oldirq);
> >}
> >
> >void
> >rcu_read_unlock(void)
> >{
> >+	int idx;
> >+	int nesting;
> >	unsigned long oldirq;
> >
> >-	local_irq_save(oldirq);
> >+	raw_local_irq_save(oldirq);
> >	trace_special((unsigned long) current->rcu_flipctr1,
> >		      (unsigned long) current->rcu_flipctr2,
> >		      current->rcu_read_lock_nesting);
> >-	if (--current->rcu_read_lock_nesting == 0) {
> >+
> >+	nesting = current->rcu_read_lock_nesting;
> >+
> >+	/*
> >+	 * Any rcu_read_lock()s called from NMI handlers
> >+	 * at any point must have matching rcu_read_unlock()
> >+	 * calls in that same handler, so we will not see the
> >+	 * value of current->rcu_read_lock_nesting change.
> >+	 */
> >+
> >+	if (nesting > 1) {
> >
> >		/*
> >-		 * Just atomically decrement whatever we incremented.
> >-		 * Might later want to awaken some task waiting for the
> >-		 * grace period to complete, but keep it simple for the
> >-		 * moment.
> >+		 * There is an enclosing rcu_read_lock(), so all we
> >+		 * need to do is to decrement the counter.
> >		 */
> >
> >-		smp_mb__before_atomic_dec();
> >-		atomic_dec(current->rcu_flipctr1);
> >-		current->rcu_flipctr1 = NULL;
> >-		if (unlikely(current->rcu_flipctr2 != NULL)) {
> >-			atomic_dec(current->rcu_flipctr2);
> >-			current->rcu_flipctr2 = NULL;
> >-		}
> >+		current->rcu_read_lock_nesting = nesting - 1;
> >+
> >+	} else {
> >+
> >+		/*
> >+		 * Outermost nesting of rcu_read_unlock(), so we must
> >+		 * decrement the current counter for the current CPU.
> >+		 * This must be done carefully, because NMIs can
> >+		 * occur at any point in this code, and any rcu_read_lock()
> >+		 * and rcu_read_unlock() pairs in the NMI handlers
> >+		 * must interact non-destructively with this code.
> >+		 * Lots of barrier() calls, and -very- careful ordering.
> >+		 *
> >+		 * Changes to this code, including this one, must be
> >+		 * inspected, validated, and tested extremely carefully!!!
> >+		 */
> >+
> >+		/*
> >+		 * First, pick up the index.  Enforce ordering for
> >+		 * both compilers and for DEC Alpha.
> >+		 */
> >+
> >+		idx = current->rcu_flipctr_idx;
> >+		smp_read_barrier_depends();
> >+		barrier();
> >+
> >+		/*
> >+		 * It is now safe to decrement the task's nesting count.
> >+		 * NMIs that occur after this statement will route
> >+		 * their rcu_read_lock() calls through this "else" clause
> >+		 * of this "if" statement, and thus will start incrementing
> >+		 * the per-CPU counter on their own.  Enforce ordering for
> >+		 * compilers.
> >+		 */
> >+
> >+		current->rcu_read_lock_nesting = nesting - 1;
> >+		barrier();
> >+
> >+		/*
> >+		 * Decrement the per-CPU counter. NMI handlers
> >+		 * might increment it as well, but they had better
> >+		 * properly nest their rcu_read_lock()/rcu_read_unlock()
> >+		 * pairs so that the value is restored before the handler
> >+		 * returns to us.
> >+		 */
> >+
> >+		__get_cpu_var(rcu_flipctr)[idx]--;
> >	}
> >
> >	trace_special((unsigned long)current->rcu_flipctr1,
> >		      (unsigned long) current->rcu_flipctr2,
> >		      current->rcu_read_lock_nesting);
> >-	local_irq_restore(oldirq);
> >+	raw_local_irq_restore(oldirq);
> >}
> >
> >static void
> >__rcu_advance_callbacks(void)
> >{
> >
> >-	if (rcu_data.completed != rcu_ctrlblk.completed) {
> >+	if ((rcu_data.completed >> 1) != (rcu_ctrlblk.completed >> 1)) {
> >		if (rcu_data.waitlist != NULL) {
> >			*rcu_data.donetail = rcu_data.waitlist;
> >			rcu_data.donetail = rcu_data.waittail;
> >@@ -216,13 +358,186 @@ __rcu_advance_callbacks(void)
> >			rcu_data.waittail = &rcu_data.waitlist;
> >		}
> >		rcu_data.completed = rcu_ctrlblk.completed;
> >+	} else if (rcu_data.completed != rcu_ctrlblk.completed) {
> >+		rcu_data.completed = rcu_ctrlblk.completed;
> >+	}
> >+}
> >+
> >+/*
> >+ * Get here when RCU is idle.  Decide whether we need to
> >+ * move out of idle state, and return zero if so.
> >+ * "Straightforward" approach for the moment, might later
> >+ * use callback-list lengths, grace-period duration, or
> >+ * some such to determine when to exit idle state.
> >+ * Might also need a pre-idle test that does not acquire
> >+ * the lock, but let's get the simple case working first...
> >+ */
> >+
> >+static int
> >+rcu_try_flip_idle(int flipctr)
> >+{
> >+#ifdef CONFIG_RCU_STATS
> >+	rcu_data.n_rcu_try_flip_i1++;
> >+#endif /* #ifdef CONFIG_RCU_STATS */
> >+
> >+	if (!rcu_pending(smp_processor_id())) {
> >+#ifdef CONFIG_RCU_STATS
> >+		rcu_data.n_rcu_try_flip_ie1++;
> >+#endif /* #ifdef CONFIG_RCU_STATS */
> >+		return 1;
> >	}
> >+	return 0;
> >+}
> >+
> >+/*
> >+ * Flip processing up to and including the flip, as well as
> >+ * telling CPUs to acknowledge the flip.
> >+ */
> >+
> >+static int
> >+rcu_try_flip_in_gp(int flipctr)
> >+{
> >+	int cpu;
> >+
> >+#ifdef CONFIG_RCU_STATS
> >+	rcu_data.n_rcu_try_flip_g1++;
> >+#endif /* #ifdef CONFIG_RCU_STATS */
> >+
> >+	/*
> >+	 * Do the flip.
> >+	 */
> >+
> >+	rcu_ctrlblk.completed++;  /* stands in for rcu_try_flip_g2 */
> >+
> >+	/*
> >+	 * Need a memory barrier so that other CPUs see the new
> >+	 * counter value before they see the subsequent change of all
> >+	 * the rcu_flip_flag instances to rcu_flipped.
> >+	 */
> >+
> >+	smp_mb();
> >+
> >+	/* Now ask each CPU for acknowledgement of the flip. */
> >+
> >+	for_each_cpu(cpu)
> >+		per_cpu(rcu_flip_flag, cpu) = rcu_flipped;
> >+
> >+	return 0;
> >+}
> >+
> >+/*
> >+ * Wait for CPUs to acknowledge the flip.
> >+ */
> >+
> >+static int
> >+rcu_try_flip_waitack(int flipctr)
> >+{
> >+	int cpu;
> >+
> >+#ifdef CONFIG_RCU_STATS
> >+	rcu_data.n_rcu_try_flip_a1++;
> >+#endif /* #ifdef CONFIG_RCU_STATS */
> >+
> >+	for_each_cpu(cpu)
> >+		if (per_cpu(rcu_flip_flag, cpu) != rcu_flip_seen) {
> >+#ifdef CONFIG_RCU_STATS
> >+			rcu_data.n_rcu_try_flip_ae1++;
> >+#endif /* #ifdef CONFIG_RCU_STATS */
> >+			return 1;
> >+		}
> >+
> >+	/*
> >+	 * Make sure our checks above don't bleed into subsequent
> >+	 * waiting for the sum of the counters to reach zero.
> >+	 */
> >+
> >+	smp_mb();
> >+
> >+#ifdef CONFIG_RCU_STATS
> >+	rcu_data.n_rcu_try_flip_a2++;
> >+#endif /* #ifdef CONFIG_RCU_STATS */
> >+
> >+	return 0;
> >+}
> >+
> >+/*
> >+ * Wait for collective ``last'' counter to reach zero,
> >+ * then tell all CPUs to do an end-of-grace-period memory barrier.
> >+ */
> >+
> >+static int
> >+rcu_try_flip_waitzero(int flipctr)
> >+{
> >+	int cpu;
> >+	int lastidx = !(flipctr & 0x1);
> >+	int sum = 0;
> >+
> >+#ifdef CONFIG_RCU_STATS
> >+	rcu_data.n_rcu_try_flip_z1++;
> >+#endif /* #ifdef CONFIG_RCU_STATS */
> >+
> >+	/* Check to see if the sum of the "last" counters is zero. */
> >+
> >+	for_each_cpu(cpu)
> >+		sum += per_cpu(rcu_flipctr, cpu)[lastidx];
> >+	if (sum != 0) {
> >+#ifdef CONFIG_RCU_STATS
> >+		rcu_data.n_rcu_try_flip_ze1++;
> >+#endif /* #ifdef CONFIG_RCU_STATS */
> >+		return 1;
> >+	}
> >+
> >+#ifdef CONFIG_RCU_STATS
> >+	rcu_data.n_rcu_try_flip_z2++;
> >+#endif /* #ifdef CONFIG_RCU_STATS */
> >+
> >+	/* Make sure we don't call for memory barriers before we see zero. */
> >+
> >+	smp_mb();
> >+
> >+	/* Call for a memory barrier from each CPU. */
> >+
> >+	for_each_cpu(cpu)
> >+		per_cpu(rcu_mb_flag, cpu) = rcu_mb_needed;
> >+
> >+	return 0;
> >+}
> >+
> >+/*
> >+ * Wait for all CPUs to do their end-of-grace-period memory barrier.
> >+ * Return 0 once all CPUs have done so.
> >+ */
> >+
> >+static int
> >+rcu_try_flip_waitmb(int flipctr)
> >+{
> >+	int cpu;
> >+
> >+#ifdef CONFIG_RCU_STATS
> >+	rcu_data.n_rcu_try_flip_m1++;
> >+#endif /* #ifdef CONFIG_RCU_STATS */
> >+
> >+	for_each_cpu(cpu)
> >+		if (per_cpu(rcu_mb_flag, cpu) != rcu_mb_done) {
> >+#ifdef CONFIG_RCU_STATS
> >+			rcu_data.n_rcu_try_flip_me1++;
> >+#endif /* #ifdef CONFIG_RCU_STATS */
> >+			return 1;
> >+		}
> >+
> >+	smp_mb(); /* Ensure that the above checks precede any following 
> >flip. */
> >+
> >+#ifdef CONFIG_RCU_STATS
> >+	rcu_data.n_rcu_try_flip_m2++;
> >+#endif /* #ifdef CONFIG_RCU_STATS */
> >+
> >+	return 0;
> >}
> >
> >/*
> > * Attempt a single flip of the counters.  Remember, a single flip does
> > * -not- constitute a grace period.  Instead, the interval between
> >- * a pair of consecutive flips is a grace period.
> >+ * at least three consecutive flips is a grace period.
> > *
> > * If anyone is nuts enough to run this CONFIG_PREEMPT_RCU implementation
> > * on a large SMP, they might want to use a hierarchical organization of
> >@@ -231,13 +546,11 @@ __rcu_advance_callbacks(void)
> >static void
> >rcu_try_flip(void)
> >{
> >-	int cpu;
> >	long flipctr;
> >	unsigned long oldirq;
> >
> >-	flipctr = rcu_ctrlblk.completed;
> >#ifdef CONFIG_RCU_STATS
> >-	atomic_inc(&rcu_data.n_rcu_try_flip1);
> >+	atomic_inc(&rcu_data.n_rcu_try_flip_1);
> >#endif /* #ifdef CONFIG_RCU_STATS */
> >	if (unlikely(!spin_trylock_irqsave(&rcu_ctrlblk.fliplock, oldirq))) {
> >#ifdef CONFIG_RCU_STATS
> >@@ -245,52 +558,82 @@ rcu_try_flip(void)
> >#endif /* #ifdef CONFIG_RCU_STATS */
> >		return;
> >	}
> >-	if (unlikely(flipctr != rcu_ctrlblk.completed)) {
> >-
> >-		/* Our work is done!  ;-) */
> >-
> >-#ifdef CONFIG_RCU_STATS
> >-		rcu_data.n_rcu_try_flip_e2++;
> >-#endif /* #ifdef CONFIG_RCU_STATS */
> >-		spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
> >-		return;
> >-	}
> >-	flipctr &= 0x1;
> >
> >	/*
> >-	 * Check for completion of all RCU read-side critical sections
> >-	 * that started prior to the previous flip.
> >+	 * Take the next transition(s) through the RCU grace-period
> >+	 * flip-counter state machine.
> >	 */
> >
> >-#ifdef CONFIG_RCU_STATS
> >-	rcu_data.n_rcu_try_flip2++;
> >-#endif /* #ifdef CONFIG_RCU_STATS */
> >-	for_each_cpu(cpu) {
> >-		if (atomic_read(&per_cpu(rcu_flipctr, cpu)[!flipctr]) != 0) {
> >-#ifdef CONFIG_RCU_STATS
> >-			rcu_data.n_rcu_try_flip_e3++;
> >-#endif /* #ifdef CONFIG_RCU_STATS */
> >-			spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, 
> >oldirq);
> >-			return;
> >-		}
> >+	flipctr = rcu_ctrlblk.completed;
> >+	switch (rcu_try_flip_state) {
> >+	case rcu_try_flip_idle_state:
> >+		if (rcu_try_flip_idle(flipctr))
> >+			break;
> >+		rcu_try_flip_state = rcu_try_flip_in_gp_state;
> >+	case rcu_try_flip_in_gp_state:
> >+		if (rcu_try_flip_in_gp(flipctr))
> >+			break;
> >+		rcu_try_flip_state = rcu_try_flip_waitack_state;
> >+	case rcu_try_flip_waitack_state:
> >+		if (rcu_try_flip_waitack(flipctr))
> >+			break;
> >+		rcu_try_flip_state = rcu_try_flip_waitzero_state;
> >+	case rcu_try_flip_waitzero_state:
> >+		if (rcu_try_flip_waitzero(flipctr))
> >+			break;
> >+		rcu_try_flip_state = rcu_try_flip_waitmb_state;
> >+	case rcu_try_flip_waitmb_state:
> >+		if (rcu_try_flip_waitmb(flipctr))
> >+			break;
> >+		rcu_try_flip_state = rcu_try_flip_idle_state;
> >	}
> >+	spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
> >+}
> >
> >-	/* Do the flip. */
> >+/*
> >+ * Check to see if this CPU needs to report that it has seen the most
> >+ * recent counter flip, thereby declaring that all subsequent
> >+ * rcu_read_lock() invocations will respect this flip.
> >+ */
> >
> >-	smp_mb();
> >-	rcu_ctrlblk.completed++;
> >+static void
> >+rcu_check_flipseen(int cpu)
> >+{
> >+	if (per_cpu(rcu_flip_flag, cpu) == rcu_flipped) {
> >+		smp_mb();  /* Subsequent counter acccesses must see new 
> >value */
> >+		per_cpu(rcu_flip_flag, cpu) = rcu_flip_seen;
> >+		smp_mb();  /* probably be implied by interrupt, but... */
> >+	}
> >+}
> >
> >-#ifdef CONFIG_RCU_STATS
> >-	rcu_data.n_rcu_try_flip3++;
> >-#endif /* #ifdef CONFIG_RCU_STATS */
> >-	spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
> >+/*
> >+ * Check to see if this CPU needs to do a memory barrier in order to
> >+ * ensure that any prior RCU read-side critical sections have committed
> >+ * their counter manipulations and critical-section memory references
> >+ * before declaring the grace period to be completed.
> >+ */
> >+
> >+static void
> >+rcu_check_mb(int cpu)
> >+{
> >+	if (per_cpu(rcu_mb_flag, cpu) == rcu_mb_needed) {
> >+		smp_mb();
> >+		per_cpu(rcu_mb_flag, cpu) = rcu_mb_done;
> >+	}
> >}
> >
> >+/*
> >+ * This function is called periodically on each CPU in hardware interrupt
> >+ * context.
> >+ */
> >+
> >void
> >rcu_check_callbacks(int cpu, int user)
> >{
> >	unsigned long oldirq;
> >
> >+	rcu_check_flipseen(cpu);
> >+	rcu_check_mb(cpu);
> >	if (rcu_ctrlblk.completed == rcu_data.completed) {
> >		rcu_try_flip();
> >		if (rcu_ctrlblk.completed == rcu_data.completed) {
> >@@ -359,10 +702,10 @@ call_rcu(struct rcu_head *head,
> >}
> >
> >/*
> >- * Crude hack, reduces but does not eliminate possibility of failure.
> >- * Needs to wait for all CPUs to pass through a -voluntary- context
> >- * switch to eliminate possibility of failure.  (Maybe just crank
> >- * priority down...)
> >+ * Wait until all currently running preempt_disable() code segments
> >+ * (including hardware-irq-disable segments) complete.  Note that
> >+ * in -rt this does -not- necessarily result in all currently executing
> >+ * interrupt -handlers- having completed.
> > */
> >void
> >synchronize_sched(void)
> >@@ -390,7 +733,7 @@ rcu_pending(int cpu)
> >
> >void __init rcu_init(void)
> >{
> >-/*&&&&*/printk("WARNING: experimental RCU implementation.\n");
> >+/*&&&&*/printk("WARNING: experimental non-atomic RCU implementation.\n");
> >	spin_lock_init(&rcu_data.lock);
> >	rcu_data.completed = 0;
> >	rcu_data.nextlist = NULL;
> >@@ -416,7 +759,8 @@ int rcu_read_proc_data(char *page)
> >	return sprintf(page,
> >		       "ggp=%ld lgp=%ld rcc=%ld\n"
> >		       "na=%ld nl=%ld wa=%ld wl=%ld da=%ld dl=%ld dr=%ld 
> >		       di=%d\n"
> >-		       "rtf1=%d rtf2=%ld rtf3=%ld rtfe1=%d rtfe2=%ld 
> >rtfe3=%ld\n",
> >+		       "1=%d e1=%d i1=%ld ie1=%ld g1=%ld a1=%ld ae1=%ld 
> >a2=%ld\n"
> >+		       "z1=%ld ze1=%ld z2=%ld m1=%ld me1=%ld m2=%ld\n",
> >
> >		       rcu_ctrlblk.completed,
> >		       rcu_data.completed,
> >@@ -431,12 +775,20 @@ int rcu_read_proc_data(char *page)
> >		       rcu_data.n_done_remove,
> >		       atomic_read(&rcu_data.n_done_invoked),
> >
> >-		       atomic_read(&rcu_data.n_rcu_try_flip1),
> >-		       rcu_data.n_rcu_try_flip2,
> >-		       rcu_data.n_rcu_try_flip3,
> >+		       atomic_read(&rcu_data.n_rcu_try_flip_1),
> >		       atomic_read(&rcu_data.n_rcu_try_flip_e1),
> >-		       rcu_data.n_rcu_try_flip_e2,
> >-		       rcu_data.n_rcu_try_flip_e3);
> >+		       rcu_data.n_rcu_try_flip_i1,
> >+		       rcu_data.n_rcu_try_flip_ie1,
> >+		       rcu_data.n_rcu_try_flip_g1,
> >+		       rcu_data.n_rcu_try_flip_a1,
> >+		       rcu_data.n_rcu_try_flip_ae1,
> >+		       rcu_data.n_rcu_try_flip_a2,
> >+		       rcu_data.n_rcu_try_flip_z1,
> >+		       rcu_data.n_rcu_try_flip_ze1,
> >+		       rcu_data.n_rcu_try_flip_z2,
> >+		       rcu_data.n_rcu_try_flip_m1,
> >+		       rcu_data.n_rcu_try_flip_me1,
> >+		       rcu_data.n_rcu_try_flip_m2);
> >}
> >
> >int rcu_read_proc_gp_data(char *page)
> >@@ -464,14 +816,23 @@ int rcu_read_proc_ctrs_data(char *page)
> >	int cpu;
> >	int f = rcu_data.completed & 0x1;
> >
> >-	cnt += sprintf(&page[cnt], "CPU last cur\n");
> >+	cnt += sprintf(&page[cnt], "CPU last cur F M\n");
> >	for_each_cpu(cpu) {
> >-		cnt += sprintf(&page[cnt], "%3d %4d %3d\n",
> >+		cnt += sprintf(&page[cnt], "%3d %4d %3d %d %d\n",
> >			       cpu,
> >-			       atomic_read(&per_cpu(rcu_flipctr, cpu)[!f]),
> >-			       atomic_read(&per_cpu(rcu_flipctr, cpu)[f]));
> >-	}
> >-	cnt += sprintf(&page[cnt], "ggp = %ld\n", rcu_data.completed);
> >+			       per_cpu(rcu_flipctr, cpu)[!f],
> >+			       per_cpu(rcu_flipctr, cpu)[f],
> >+			       per_cpu(rcu_flip_flag, cpu),
> >+			       per_cpu(rcu_mb_flag, cpu));
> >+	}
> >+	cnt += sprintf(&page[cnt], "ggp = %ld, state = %d",
> >+		       rcu_data.completed, rcu_try_flip_state);
> >+	if ((0 <= rcu_try_flip_state) &&
> >+	    (rcu_try_flip_state <= sizeof(rcu_try_flip_state_names) /
> >+	    			   sizeof(rcu_try_flip_state_names[0])))
> >+		cnt += sprintf(&page[cnt], " (%s)",
> >+			       rcu_try_flip_state_names[rcu_try_flip_state]);
> >+	cnt += sprintf(&page[cnt], "\n");
> >	return (cnt);
> >}
> >
> >

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU
  2006-07-27  1:39   ` Paul E. McKenney
@ 2006-07-27 11:00     ` Esben Nielsen
  2006-07-27 15:46       ` Paul E. McKenney
  0 siblings, 1 reply; 13+ messages in thread
From: Esben Nielsen @ 2006-07-27 11:00 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: Esben Nielsen, linux-kernel, compudj, billh, rostedt, tglx, mingo,
	dipankar, rusty

Hi Paul,
  Thanks for entering a discussion of my idea! Even though you are busy 
and are critical towards my idea, you take your time to answer! Thanks.


On Wed, 26 Jul 2006, Paul E. McKenney wrote:

> On Thu, Jul 27, 2006 at 02:39:07AM +0100, Esben Nielsen wrote:
>>
>>
>> On Tue, 25 Jul 2006, Paul E. McKenney wrote:
>>
>>> Not for inclusion, should be viewed with great suspicion.
>>>
>>> This patch provides an NMI-safe realtime RCU.  Hopefully, Mathieu can
>>> make use of synchronize_sched() instead, since I have not yet figured
>>> out a way to make this NMI-safe and still get rid of the interrupt
>>> disabling.  ;-)
>>>
>>> 						Thanx, Paul
>>
>> I must say I don't understand all this. It looks very complicated. Is it
>> really needed?
>>
>> I have been thinking about the following design:
>>
>> void rcu_read_lock()
>> {
>> 	if (!in_interrupt())
>> 		current->rcu_read_lock_count++;
>> }
>> void rcu_read_unlock()
>> {
>> 	if (!in_interrupt())
>> 		current->rcu_read_lock_count--;
>> }
>>
>> Somewhere in schedule():
>>
>> 	rq->rcu_read_lock_count += prev->rcu_read_lock_count;
>> 	if (!rq->rcu_read_lock_count)
>> 		forward_waiting_rcu_jobs();
>> 	rq->rcu_read_lock_count -= next->rcu_read_lock_count;
>
> So rq->rcu_read_lock_count contains the sum of the counts of all
> tasks that were scheduled away from this CPU.
>
> What happens in face of the following sequence of events?
> Assume that all tasks stay on CPU 0 for the moment.
>
> o	Task A does rcu_read_lock().
>
> o	Task A is preempted.  rq->rcu_read_lock_count is nonzero.
>
> o	Task B runs and does rcu_read_lock().
>
> o	Task B is preempted (perhaps because it dropped the lock
> 	that was causing it to have high priority).  Regardless of
> 	the reason for the preemption, rq->rcu_read_lock_count is
> 	nonzero.
>
> o	Task C runs and does rcu_read_lock().
>
> o	Task C is preempted.  rq->rcu_read_lock_count is nonzero.
>
> o	Task A runs again, and does rcu_read_unlock().
>
> o	Task A is preempted.  rq->rcu_read_lock_count is nonzero.
>
> o	Task D runs and does rcu_read_lock().
>
> o	Task D is preempted.  rq->rcu_read_lock_count is nonzero.
>
> And so on.  As long as at least one of the preempted tasks is in an
> RCU critical section, you never do your forward_waiting_rcu_jobs(),
> and the grace period goes on forever.  Or at least until you OOM the
> machine.
>
> So what am I missing here?

The boosting idea below. Then tasks A-D must be RT tasks for this to 
happen. And the machine must anyway run out of RT tasks or it will 
effectively lock up.

>
> I could imagine introducing a pair of counters per runqueue, but then
> we end up with the same issues with counter-flip that we have now.
>
> Another question -- what happens if a given CPU stays idle?  Wouldn't
> the callbacks then just start piling up on that CPU?

How can a CPU stay idle? There is a tick every 2.5 ms. Even without that 
the previous CPU can make it schedule if it sees the jobs piling up. Or if 
that is considered too expensive, it can take over and forward the 
jobs to the next CPU.

>
>> Now what should forward_waiting_rcu_jobs() do?
>>
>> I imagine a circular datastructur of all the CPUs. When a call_rcu() is
>> run on a CPU it is first added a list of jobs for that CPU. When
>> forward_waiting_rcu_jobs() is called all the pending jobs are forwarded to
>> the next CPU. The next CPU will bring it along the next CPU in the circle
>> along with it's own jobs. When jobs hit the original CPU they will be
>> executed. Or rather, when the CPU just before calls
>> forward_waiting_rcu_jobs(), it sends the jobs belonging to the next CPU to
>> the RCU-task of the next CPU, where they will be executed, instead of to
>> the scheduler (runqueue) of the next CPU, where it will just be send out on
>> a
>> new roundtrip along the circle.
>>
>> If you use a structure like the plist then the forwarding procedure can be
>> done in O(number of online CPUs) time worst case - much less in the usual
>> case where the lists are almost empty.
>>
>> Now the problem is: What happens if a task in a rcu read-side lock is
>> migrated? Then the rcu_read_lock_count on the source CPU will stay in plus
>> while on the target CPU it will go in minus. This ought to be simply
>> fixeable by adding task->rcu_read_lock_count to the target runqueue before
>> migrating and subtracting it from the old runqueue after migrating. But
>> there is another problem: RCU-jobs refering to data used by the task being
>> migrated might have been forwarded from the target CPU. Thus the migration
>> task have to go back along the circle of CPUs and move all the relevant
>> RCU-jobs back to the target CPU to be forwarded again. This is also doable
>> in
>> number of CPUs between source and target times O(<number of online CPUs>)
>> (see above) time.
>
> So if I have the right (or wrong) pattern of task migrations, the RCU
> callbacks never get to their originating CPU?
>

In principle, yes. But if the machine starts to migrate tasks that wildly 
it won't get any work done anyway, because all its time is spent doing 
migration.

> Alternatively, if the task residing in the RCU read-side critical section
> is forwarded around the loop of CPUs, callbacks circulating around this
> loop might execute before the RCU read-side critical section completes.
>

That is why some of the callbacks (those which have passed the target CPU 
but not yet the source CPU) have to be moved back to the target CPU.

I just came up with an even simpler solution:
Delay the subtraction of the task->rcu_read_lock_count from 
srcrq->rcu_read_lock_count until the task calls rcu_read_unlock(). That 
can be done by flagging the task (do task->rcu_read_lock_count |= 
0x80000000) and do a simple
 	if (unlikely(current->rcu_read_lock_count == 0x80000000))
 		fix_rcu_read_lock_count_on_old_cpu();
in rcu_read_unlock(). Now the task can be migrated again before calling 
fix_rcu_read_lock_count_on_old_cpu(). The relevant RCU jobs still can't 
get past the original CPU before the task has called 
fix_rcu_read_lock_count_on_old_cpu(), so all subsequent migrations can just
do the count down on the intermediate CPUs right away.

>> To avoid a task in a read-side lock being starved for too long the
>> following line can be added to normal_prio():
>>   if (p->rcu_read_lock_count)
>> 	p->prio = MAX_RT_PRIO;
>
> But doesn't this have the same impact on latency as disabling preemption
> in rcu_read_lock() and then re-enabling it in rcu_read_unlock()?
>

No, RT tasks can still preempt the RCU read-side lock. But SCHED_OTHER and 
SCHED_BATCH can't. You can also make the RCU read-side boosting priority 
dynamic and let the system adjust it, or just let the admin adjust it.

> Also, doesn't this circular data structure need to handle CPU hotplug?
>

Of course. I don't know about hotplug though. But it sounds simple to 
migrate the tasks away, take the CPU out of the circle and then forward 
the last RCU jobs from that CPU.

>> I don't have time to code this nor a SMP machine to test it on. But I can
>> give the idea to you anyways in the hope you might code it :-)
>
> I am beginning to think that it will not be at all simple by the time I
> code up all the required fixups.  Or am I missing something?

Of course, implementing something is always a lot harder than writing the 
idea down. Anyway, we already worked out some of the hardest details :-)

Esben

>
> 							Thanx, Paul
>
>> Esben
>>
>>
>>>
>>> Signed-off-by: Paul E. McKenney <paulmck@us.ibm.com> (but please don't
>>> apply)
>>>
>>> include/linux/sched.h |    3
>>> kernel/rcupreempt.c   |  577
>>> ++++++++++++++++++++++++++++++++++++++++----------
>>> 2 files changed, 470 insertions(+), 110 deletions(-)
>>>
>>> diff -urpNa -X dontdiff linux-2.6.17-rt7/include/linux/sched.h
>>> linux-2.6.17-rt7-norrupt/include/linux/sched.h
>>> --- linux-2.6.17-rt7/include/linux/sched.h	2006-07-19
>>> 01:43:09.000000000 -0700
>>> +++ linux-2.6.17-rt7-norrupt/include/linux/sched.h	2006-07-22
>>> 15:44:36.000000000 -0700
>>> @@ -868,8 +868,7 @@ struct task_struct {
>>>
>>> #ifdef CONFIG_PREEMPT_RCU
>>> 	int rcu_read_lock_nesting;
>>> -	atomic_t *rcu_flipctr1;
>>> -	atomic_t *rcu_flipctr2;
>>> +	int rcu_flipctr_idx;
>>> #endif
>>> #ifdef CONFIG_SCHEDSTATS
>>> 	struct sched_info sched_info;
>>> diff -urpNa -X dontdiff linux-2.6.17-rt7/kernel/rcupreempt.c
>>> linux-2.6.17-rt7-norrupt/kernel/rcupreempt.c
>>> --- linux-2.6.17-rt7/kernel/rcupreempt.c	2006-07-19
>>> 01:43:09.000000000 -0700
>>> +++ linux-2.6.17-rt7-norrupt/kernel/rcupreempt.c	2006-07-22
>>> 20:21:46.000000000 -0700
>>> @@ -15,11 +15,13 @@
>>> * along with this program; if not, write to the Free Software
>>> * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
>>> USA.
>>> *
>>> - * Copyright (C) IBM Corporation, 2001
>>> + * Copyright (C) IBM Corporation, 2006
>>> *
>>> * Authors: Paul E. McKenney <paulmck@us.ibm.com>
>>> *		With thanks to Esben Nielsen, Bill Huey, and Ingo Molnar
>>> - *		for pushing me away from locks and towards counters.
>>> + *		for pushing me away from locks and towards counters, and
>>> + *		to Suparna Bhattacharya for pushing me completely away
>>> + *		from atomic instructions on the read side.
>>> *
>>> * Papers:  http://www.rdrop.com/users/paulmck/RCU
>>> *
>>> @@ -73,12 +75,20 @@ struct rcu_data {
>>> 	long		n_done_remove;
>>> 	atomic_t	n_done_invoked;
>>> 	long		n_rcu_check_callbacks;
>>> -	atomic_t	n_rcu_try_flip1;
>>> -	long		n_rcu_try_flip2;
>>> -	long		n_rcu_try_flip3;
>>> +	atomic_t	n_rcu_try_flip_1;
>>> 	atomic_t	n_rcu_try_flip_e1;
>>> -	long		n_rcu_try_flip_e2;
>>> -	long		n_rcu_try_flip_e3;
>>> +	long		n_rcu_try_flip_i1;
>>> +	long		n_rcu_try_flip_ie1;
>>> +	long		n_rcu_try_flip_g1;
>>> +	long		n_rcu_try_flip_a1;
>>> +	long		n_rcu_try_flip_ae1;
>>> +	long		n_rcu_try_flip_a2;
>>> +	long		n_rcu_try_flip_z1;
>>> +	long		n_rcu_try_flip_ze1;
>>> +	long		n_rcu_try_flip_z2;
>>> +	long		n_rcu_try_flip_m1;
>>> +	long		n_rcu_try_flip_me1;
>>> +	long		n_rcu_try_flip_m2;
>>> #endif /* #ifdef CONFIG_RCU_STATS */
>>> };
>>> struct rcu_ctrlblk {
>>> @@ -90,8 +100,51 @@ static struct rcu_ctrlblk rcu_ctrlblk =
>>> 	.fliplock = RAW_SPIN_LOCK_UNLOCKED,
>>> 	.completed = 0,
>>> };
>>> -static DEFINE_PER_CPU(atomic_t [2], rcu_flipctr) =
>>> -	{ ATOMIC_INIT(0), ATOMIC_INIT(0) };
>>> +static DEFINE_PER_CPU(int [2], rcu_flipctr) = { 0, 0 };
>>> +
>>> +/*
>>> + * States for rcu_try_flip() and friends.
>>> + */
>>> +
>>> +enum rcu_try_flip_states {
>>> +	rcu_try_flip_idle_state,	/* "I" */
>>> +	rcu_try_flip_in_gp_state,	/* "G" */
>>> +	rcu_try_flip_waitack_state, 	/* "A" */
>>> +	rcu_try_flip_waitzero_state,	/* "Z" */
>>> +	rcu_try_flip_waitmb_state	/* "M" */
>>> +};
>>> +static enum rcu_try_flip_states rcu_try_flip_state =
>>> rcu_try_flip_idle_state;
>>> +#ifdef CONFIG_RCU_STATS
>>> +static char *rcu_try_flip_state_names[] =
>>> +	{ "idle", "gp", "waitack", "waitzero", "waitmb" };
>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>> +
>>> +/*
>>> + * Enum and per-CPU flag to determine when each CPU has seen
>>> + * the most recent counter flip.
>>> + */
>>> +
>>> +enum rcu_flip_flag_values {
>>> +	rcu_flip_seen,		/* Steady/initial state, last flip seen. */
>>> +				/* Only GP detector can update. */
>>> +	rcu_flipped		/* Flip just completed, need confirmation. */
>>> +				/* Only corresponding CPU can update. */
>>> +};
>>> +static DEFINE_PER_CPU(enum rcu_flip_flag_values, rcu_flip_flag) =
>>> rcu_flip_seen;
>>> +
>>> +/*
>>> + * Enum and per-CPU flag to determine when each CPU has executed the
>>> + * needed memory barrier to fence in memory references from its last RCU
>>> + * read-side critical section in the just-completed grace period.
>>> + */
>>> +
>>> +enum rcu_mb_flag_values {
>>> +	rcu_mb_done,		/* Steady/initial state, no mb()s required.
>>> */
>>> +				/* Only GP detector can update. */
>>> +	rcu_mb_needed		/* Flip just completed, need an mb(). */
>>> +				/* Only corresponding CPU can update. */
>>> +};
>>> +static DEFINE_PER_CPU(enum rcu_mb_flag_values, rcu_mb_flag) = rcu_mb_done;
>>>
>>> /*
>>> * Return the number of RCU batches processed thus far.  Useful
>>> @@ -105,93 +158,182 @@ long rcu_batches_completed(void)
>>> void
>>> rcu_read_lock(void)
>>> {
>>> -	int flipctr;
>>> +	int idx;
>>> +	int nesting;
>>> 	unsigned long oldirq;
>>>
>>> -	local_irq_save(oldirq);
>>> -
>>> +	raw_local_irq_save(oldirq);
>>> 	trace_special(current->rcu_read_lock_nesting,
>>> 		      (unsigned long) current->rcu_flipctr1,
>>> 		      rcu_ctrlblk.completed);
>>> -	if (current->rcu_read_lock_nesting++ == 0) {
>>> +
>>> +	nesting = current->rcu_read_lock_nesting;
>>> +
>>> +	/*
>>> +	 * Any rcu_read_lock()s called from NMI handlers
>>> +	 * at any point must have matching rcu_read_unlock()
>>> +	 * calls in that same handler, so we will not see the
>>> +	 * value of current->rcu_read_lock_nesting change.
>>> +	 */
>>> +
>>> +	if (nesting != 0) {
>>>
>>> 		/*
>>> -		 * Outermost nesting of rcu_read_lock(), so atomically
>>> +		 * There is an enclosing rcu_read_lock(), so all we
>>> +		 * need to do is to increment the counter.
>>> +		 */
>>> +
>>> +		current->rcu_read_lock_nesting = nesting + 1;
>>> +
>>> +	} else {
>>> +
>>> +		/*
>>> +		 * Outermost nesting of rcu_read_lock(), so we must
>>> 		 * increment the current counter for the current CPU.
>>> +		 * This must be done carefully, because NMIs can
>>> +		 * occur at any point in this code, and any rcu_read_lock()
>>> +		 * and rcu_read_unlock() pairs in the NMI handlers
>>> +		 * must interact non-destructively with this code.
>>> +		 * Lots of barrier() calls, and -very- careful ordering.
>>> +		 *
>>> +		 * Changes to this code, including this one, must be
>>> +		 * inspected, validated, and tested extremely carefully!!!
>>> 		 */
>>>
>>> -		flipctr = rcu_ctrlblk.completed & 0x1;
>>> +		/*
>>> +		 * First, pick up the index.  Enforce ordering for
>>> +		 * both compilers and for DEC Alpha.
>>> +		 */
>>> +
>>> +		idx = rcu_ctrlblk.completed & 0x1;
>>> 		smp_read_barrier_depends();
>>> -		current->rcu_flipctr1 =
>>> &(__get_cpu_var(rcu_flipctr)[flipctr]);
>>> -		/* Can optimize to non-atomic on fastpath, but start simple.
>>> */
>>> -		atomic_inc(current->rcu_flipctr1);
>>> -		smp_mb__after_atomic_inc();  /* might optimize out... */
>>> -		if (unlikely(flipctr != (rcu_ctrlblk.completed & 0x1))) {
>>> -
>>> -			/*
>>> -			 * We raced with grace-period processing (flip).
>>> -			 * Although we cannot be preempted here, there
>>> -			 * could be interrupts, ECC errors and the like,
>>> -			 * so just nail down both sides of the rcu_flipctr
>>> -			 * array for the duration of our RCU read-side
>>> -			 * critical section, preventing a second flip
>>> -			 * from racing with us.  At some point, it would
>>> -			 * be safe to decrement one of the counters, but
>>> -			 * we have no way of knowing when that would be.
>>> -			 * So just decrement them both in rcu_read_unlock().
>>> -			 */
>>> -
>>> -			current->rcu_flipctr2 =
>>> -				&(__get_cpu_var(rcu_flipctr)[!flipctr]);
>>> -			/* Can again optimize to non-atomic on fastpath. */
>>> -			atomic_inc(current->rcu_flipctr2);
>>> -			smp_mb__after_atomic_inc();  /* might optimize
>>> out... */
>>> -		}
>>> +		barrier();
>>> +
>>> +		/*
>>> +		 * Increment the per-CPU counter. NMI handlers
>>> +		 * might increment it as well, but they had better
>>> +		 * properly nest their rcu_read_lock()/rcu_read_unlock()
>>> +		 * pairs so that the value is restored before the handler
>>> +		 * returns to us.  Enforce ordering for compilers.
>>> +		 */
>>> +
>>> +		__get_cpu_var(rcu_flipctr)[idx]++;
>>> +		barrier();
>>> +
>>> +		/*
>>> +		 * It is now safe to increment the task's nesting count.
>>> +		 * NMIs that occur after this statement will route
>>> +		 * their rcu_read_lock() calls through the "then" clause
>>> +		 * of this "if" statement, and thus will no longer come
>>> +		 * through this path.  Enforce ordering for compilers.
>>> +		 */
>>> +
>>> +		current->rcu_read_lock_nesting = nesting + 1;
>>> +		barrier();
>>> +
>>> +		/*
>>> +		 * It is now safe to store the index into our task
>>> +		 * structure.  Doing so earlier would have resulted
>>> +		 * in it getting clobbered by NMI handlers.
>>> +		 */
>>> +
>>> +		current->rcu_flipctr_idx = idx;
>>> 	}
>>> +
>>> 	trace_special((unsigned long) current->rcu_flipctr1,
>>> 		      (unsigned long) current->rcu_flipctr2,
>>> 		      rcu_ctrlblk.completed);
>>> -	local_irq_restore(oldirq);
>>> +	raw_local_irq_restore(oldirq);
>>> }
>>>
>>> void
>>> rcu_read_unlock(void)
>>> {
>>> +	int idx;
>>> +	int nesting;
>>> 	unsigned long oldirq;
>>>
>>> -	local_irq_save(oldirq);
>>> +	raw_local_irq_save(oldirq);
>>> 	trace_special((unsigned long) current->rcu_flipctr1,
>>> 		      (unsigned long) current->rcu_flipctr2,
>>> 		      current->rcu_read_lock_nesting);
>>> -	if (--current->rcu_read_lock_nesting == 0) {
>>> +
>>> +	nesting = current->rcu_read_lock_nesting;
>>> +
>>> +	/*
>>> +	 * Any rcu_read_lock()s called from NMI handlers
>>> +	 * at any point must have matching rcu_read_unlock()
>>> +	 * calls in that same handler, so we will not see the
>>> +	 * value of current->rcu_read_lock_nesting change.
>>> +	 */
>>> +
>>> +	if (nesting > 1) {
>>>
>>> 		/*
>>> -		 * Just atomically decrement whatever we incremented.
>>> -		 * Might later want to awaken some task waiting for the
>>> -		 * grace period to complete, but keep it simple for the
>>> -		 * moment.
>>> +		 * There is an enclosing rcu_read_lock(), so all we
>>> +		 * need to do is to decrement the counter.
>>> 		 */
>>>
>>> -		smp_mb__before_atomic_dec();
>>> -		atomic_dec(current->rcu_flipctr1);
>>> -		current->rcu_flipctr1 = NULL;
>>> -		if (unlikely(current->rcu_flipctr2 != NULL)) {
>>> -			atomic_dec(current->rcu_flipctr2);
>>> -			current->rcu_flipctr2 = NULL;
>>> -		}
>>> +		current->rcu_read_lock_nesting = nesting - 1;
>>> +
>>> +	} else {
>>> +
>>> +		/*
>>> +		 * Outermost nesting of rcu_read_unlock(), so we must
>>> +		 * decrement the current counter for the current CPU.
>>> +		 * This must be done carefully, because NMIs can
>>> +		 * occur at any point in this code, and any rcu_read_lock()
>>> +		 * and rcu_read_unlock() pairs in the NMI handlers
>>> +		 * must interact non-destructively with this code.
>>> +		 * Lots of barrier() calls, and -very- careful ordering.
>>> +		 *
>>> +		 * Changes to this code, including this one, must be
>>> +		 * inspected, validated, and tested extremely carefully!!!
>>> +		 */
>>> +
>>> +		/*
>>> +		 * First, pick up the index.  Enforce ordering for
>>> +		 * both compilers and for DEC Alpha.
>>> +		 */
>>> +
>>> +		idx = current->rcu_flipctr_idx;
>>> +		smp_read_barrier_depends();
>>> +		barrier();
>>> +
>>> +		/*
>>> +		 * It is now safe to decrement the task's nesting count.
>>> +		 * NMIs that occur after this statement will route
>>> +		 * their rcu_read_lock() calls through this "else" clause
>>> +		 * of this "if" statement, and thus will start incrementing
>>> +		 * the per-CPU counter on their own.  Enforce ordering for
>>> +		 * compilers.
>>> +		 */
>>> +
>>> +		current->rcu_read_lock_nesting = nesting - 1;
>>> +		barrier();
>>> +
>>> +		/*
>>> +		 * Decrement the per-CPU counter. NMI handlers
>>> +		 * might increment it as well, but they had better
>>> +		 * properly nest their rcu_read_lock()/rcu_read_unlock()
>>> +		 * pairs so that the value is restored before the handler
>>> +		 * returns to us.
>>> +		 */
>>> +
>>> +		__get_cpu_var(rcu_flipctr)[idx]--;
>>> 	}
>>>
>>> 	trace_special((unsigned long)current->rcu_flipctr1,
>>> 		      (unsigned long) current->rcu_flipctr2,
>>> 		      current->rcu_read_lock_nesting);
>>> -	local_irq_restore(oldirq);
>>> +	raw_local_irq_restore(oldirq);
>>> }
>>>
>>> static void
>>> __rcu_advance_callbacks(void)
>>> {
>>>
>>> -	if (rcu_data.completed != rcu_ctrlblk.completed) {
>>> +	if ((rcu_data.completed >> 1) != (rcu_ctrlblk.completed >> 1)) {
>>> 		if (rcu_data.waitlist != NULL) {
>>> 			*rcu_data.donetail = rcu_data.waitlist;
>>> 			rcu_data.donetail = rcu_data.waittail;
>>> @@ -216,13 +358,186 @@ __rcu_advance_callbacks(void)
>>> 			rcu_data.waittail = &rcu_data.waitlist;
>>> 		}
>>> 		rcu_data.completed = rcu_ctrlblk.completed;
>>> +	} else if (rcu_data.completed != rcu_ctrlblk.completed) {
>>> +		rcu_data.completed = rcu_ctrlblk.completed;
>>> +	}
>>> +}
>>> +
>>> +/*
>>> + * Get here when RCU is idle.  Decide whether we need to
>>> + * move out of idle state, and return zero if so.
>>> + * "Straightforward" approach for the moment, might later
>>> + * use callback-list lengths, grace-period duration, or
>>> + * some such to determine when to exit idle state.
>>> + * Might also need a pre-idle test that does not acquire
>>> + * the lock, but let's get the simple case working first...
>>> + */
>>> +
>>> +static int
>>> +rcu_try_flip_idle(int flipctr)
>>> +{
>>> +#ifdef CONFIG_RCU_STATS
>>> +	rcu_data.n_rcu_try_flip_i1++;
>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>> +
>>> +	if (!rcu_pending(smp_processor_id())) {
>>> +#ifdef CONFIG_RCU_STATS
>>> +		rcu_data.n_rcu_try_flip_ie1++;
>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>> +		return 1;
>>> 	}
>>> +	return 0;
>>> +}
>>> +
>>> +/*
>>> + * Flip processing up to and including the flip, as well as
>>> + * telling CPUs to acknowledge the flip.
>>> + */
>>> +
>>> +static int
>>> +rcu_try_flip_in_gp(int flipctr)
>>> +{
>>> +	int cpu;
>>> +
>>> +#ifdef CONFIG_RCU_STATS
>>> +	rcu_data.n_rcu_try_flip_g1++;
>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>> +
>>> +	/*
>>> +	 * Do the flip.
>>> +	 */
>>> +
>>> +	rcu_ctrlblk.completed++;  /* stands in for rcu_try_flip_g2 */
>>> +
>>> +	/*
>>> +	 * Need a memory barrier so that other CPUs see the new
>>> +	 * counter value before they see the subsequent change of all
>>> +	 * the rcu_flip_flag instances to rcu_flipped.
>>> +	 */
>>> +
>>> +	smp_mb();
>>> +
>>> +	/* Now ask each CPU for acknowledgement of the flip. */
>>> +
>>> +	for_each_cpu(cpu)
>>> +		per_cpu(rcu_flip_flag, cpu) = rcu_flipped;
>>> +
>>> +	return 0;
>>> +}
>>> +
>>> +/*
>>> + * Wait for CPUs to acknowledge the flip.
>>> + */
>>> +
>>> +static int
>>> +rcu_try_flip_waitack(int flipctr)
>>> +{
>>> +	int cpu;
>>> +
>>> +#ifdef CONFIG_RCU_STATS
>>> +	rcu_data.n_rcu_try_flip_a1++;
>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>> +
>>> +	for_each_cpu(cpu)
>>> +		if (per_cpu(rcu_flip_flag, cpu) != rcu_flip_seen) {
>>> +#ifdef CONFIG_RCU_STATS
>>> +			rcu_data.n_rcu_try_flip_ae1++;
>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>> +			return 1;
>>> +		}
>>> +
>>> +	/*
>>> +	 * Make sure our checks above don't bleed into subsequent
>>> +	 * waiting for the sum of the counters to reach zero.
>>> +	 */
>>> +
>>> +	smp_mb();
>>> +
>>> +#ifdef CONFIG_RCU_STATS
>>> +	rcu_data.n_rcu_try_flip_a2++;
>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>> +
>>> +	return 0;
>>> +}
>>> +
>>> +/*
>>> + * Wait for collective ``last'' counter to reach zero,
>>> + * then tell all CPUs to do an end-of-grace-period memory barrier.
>>> + */
>>> +
>>> +static int
>>> +rcu_try_flip_waitzero(int flipctr)
>>> +{
>>> +	int cpu;
>>> +	int lastidx = !(flipctr & 0x1);
>>> +	int sum = 0;
>>> +
>>> +#ifdef CONFIG_RCU_STATS
>>> +	rcu_data.n_rcu_try_flip_z1++;
>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>> +
>>> +	/* Check to see if the sum of the "last" counters is zero. */
>>> +
>>> +	for_each_cpu(cpu)
>>> +		sum += per_cpu(rcu_flipctr, cpu)[lastidx];
>>> +	if (sum != 0) {
>>> +#ifdef CONFIG_RCU_STATS
>>> +		rcu_data.n_rcu_try_flip_ze1++;
>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>> +		return 1;
>>> +	}
>>> +
>>> +#ifdef CONFIG_RCU_STATS
>>> +	rcu_data.n_rcu_try_flip_z2++;
>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>> +
>>> +	/* Make sure we don't call for memory barriers before we see zero. */
>>> +
>>> +	smp_mb();
>>> +
>>> +	/* Call for a memory barrier from each CPU. */
>>> +
>>> +	for_each_cpu(cpu)
>>> +		per_cpu(rcu_mb_flag, cpu) = rcu_mb_needed;
>>> +
>>> +	return 0;
>>> +}
>>> +
>>> +/*
>>> + * Wait for all CPUs to do their end-of-grace-period memory barrier.
>>> + * Return 0 once all CPUs have done so.
>>> + */
>>> +
>>> +static int
>>> +rcu_try_flip_waitmb(int flipctr)
>>> +{
>>> +	int cpu;
>>> +
>>> +#ifdef CONFIG_RCU_STATS
>>> +	rcu_data.n_rcu_try_flip_m1++;
>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>> +
>>> +	for_each_cpu(cpu)
>>> +		if (per_cpu(rcu_mb_flag, cpu) != rcu_mb_done) {
>>> +#ifdef CONFIG_RCU_STATS
>>> +			rcu_data.n_rcu_try_flip_me1++;
>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>> +			return 1;
>>> +		}
>>> +
>>> +	smp_mb(); /* Ensure that the above checks precede any following
>>> flip. */
>>> +
>>> +#ifdef CONFIG_RCU_STATS
>>> +	rcu_data.n_rcu_try_flip_m2++;
>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>> +
>>> +	return 0;
>>> }
>>>
>>> /*
>>> * Attempt a single flip of the counters.  Remember, a single flip does
>>> * -not- constitute a grace period.  Instead, the interval between
>>> - * a pair of consecutive flips is a grace period.
>>> + * at least three consecutive flips is a grace period.
>>> *
>>> * If anyone is nuts enough to run this CONFIG_PREEMPT_RCU implementation
>>> * on a large SMP, they might want to use a hierarchical organization of
>>> @@ -231,13 +546,11 @@ __rcu_advance_callbacks(void)
>>> static void
>>> rcu_try_flip(void)
>>> {
>>> -	int cpu;
>>> 	long flipctr;
>>> 	unsigned long oldirq;
>>>
>>> -	flipctr = rcu_ctrlblk.completed;
>>> #ifdef CONFIG_RCU_STATS
>>> -	atomic_inc(&rcu_data.n_rcu_try_flip1);
>>> +	atomic_inc(&rcu_data.n_rcu_try_flip_1);
>>> #endif /* #ifdef CONFIG_RCU_STATS */
>>> 	if (unlikely(!spin_trylock_irqsave(&rcu_ctrlblk.fliplock, oldirq))) {
>>> #ifdef CONFIG_RCU_STATS
>>> @@ -245,52 +558,82 @@ rcu_try_flip(void)
>>> #endif /* #ifdef CONFIG_RCU_STATS */
>>> 		return;
>>> 	}
>>> -	if (unlikely(flipctr != rcu_ctrlblk.completed)) {
>>> -
>>> -		/* Our work is done!  ;-) */
>>> -
>>> -#ifdef CONFIG_RCU_STATS
>>> -		rcu_data.n_rcu_try_flip_e2++;
>>> -#endif /* #ifdef CONFIG_RCU_STATS */
>>> -		spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
>>> -		return;
>>> -	}
>>> -	flipctr &= 0x1;
>>>
>>> 	/*
>>> -	 * Check for completion of all RCU read-side critical sections
>>> -	 * that started prior to the previous flip.
>>> +	 * Take the next transition(s) through the RCU grace-period
>>> +	 * flip-counter state machine.
>>> 	 */
>>>
>>> -#ifdef CONFIG_RCU_STATS
>>> -	rcu_data.n_rcu_try_flip2++;
>>> -#endif /* #ifdef CONFIG_RCU_STATS */
>>> -	for_each_cpu(cpu) {
>>> -		if (atomic_read(&per_cpu(rcu_flipctr, cpu)[!flipctr]) != 0) {
>>> -#ifdef CONFIG_RCU_STATS
>>> -			rcu_data.n_rcu_try_flip_e3++;
>>> -#endif /* #ifdef CONFIG_RCU_STATS */
>>> -			spin_unlock_irqrestore(&rcu_ctrlblk.fliplock,
>>> oldirq);
>>> -			return;
>>> -		}
>>> +	flipctr = rcu_ctrlblk.completed;
>>> +	switch (rcu_try_flip_state) {
>>> +	case rcu_try_flip_idle_state:
>>> +		if (rcu_try_flip_idle(flipctr))
>>> +			break;
>>> +		rcu_try_flip_state = rcu_try_flip_in_gp_state;
>>> +	case rcu_try_flip_in_gp_state:
>>> +		if (rcu_try_flip_in_gp(flipctr))
>>> +			break;
>>> +		rcu_try_flip_state = rcu_try_flip_waitack_state;
>>> +	case rcu_try_flip_waitack_state:
>>> +		if (rcu_try_flip_waitack(flipctr))
>>> +			break;
>>> +		rcu_try_flip_state = rcu_try_flip_waitzero_state;
>>> +	case rcu_try_flip_waitzero_state:
>>> +		if (rcu_try_flip_waitzero(flipctr))
>>> +			break;
>>> +		rcu_try_flip_state = rcu_try_flip_waitmb_state;
>>> +	case rcu_try_flip_waitmb_state:
>>> +		if (rcu_try_flip_waitmb(flipctr))
>>> +			break;
>>> +		rcu_try_flip_state = rcu_try_flip_idle_state;
>>> 	}
>>> +	spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
>>> +}
>>>
>>> -	/* Do the flip. */
>>> +/*
>>> + * Check to see if this CPU needs to report that it has seen the most
>>> + * recent counter flip, thereby declaring that all subsequent
>>> + * rcu_read_lock() invocations will respect this flip.
>>> + */
>>>
>>> -	smp_mb();
>>> -	rcu_ctrlblk.completed++;
>>> +static void
>>> +rcu_check_flipseen(int cpu)
>>> +{
>>> +	if (per_cpu(rcu_flip_flag, cpu) == rcu_flipped) {
>>> +		smp_mb();  /* Subsequent counter accesses must see new
>>> value */
>>> +		per_cpu(rcu_flip_flag, cpu) = rcu_flip_seen;
>>> +		smp_mb();  /* probably be implied by interrupt, but... */
>>> +	}
>>> +}
>>>
>>> -#ifdef CONFIG_RCU_STATS
>>> -	rcu_data.n_rcu_try_flip3++;
>>> -#endif /* #ifdef CONFIG_RCU_STATS */
>>> -	spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
>>> +/*
>>> + * Check to see if this CPU needs to do a memory barrier in order to
>>> + * ensure that any prior RCU read-side critical sections have committed
>>> + * their counter manipulations and critical-section memory references
>>> + * before declaring the grace period to be completed.
>>> + */
>>> +
>>> +static void
>>> +rcu_check_mb(int cpu)
>>> +{
>>> +	if (per_cpu(rcu_mb_flag, cpu) == rcu_mb_needed) {
>>> +		smp_mb();
>>> +		per_cpu(rcu_mb_flag, cpu) = rcu_mb_done;
>>> +	}
>>> }
>>>
>>> +/*
>>> + * This function is called periodically on each CPU in hardware interrupt
>>> + * context.
>>> + */
>>> +
>>> void
>>> rcu_check_callbacks(int cpu, int user)
>>> {
>>> 	unsigned long oldirq;
>>>
>>> +	rcu_check_flipseen(cpu);
>>> +	rcu_check_mb(cpu);
>>> 	if (rcu_ctrlblk.completed == rcu_data.completed) {
>>> 		rcu_try_flip();
>>> 		if (rcu_ctrlblk.completed == rcu_data.completed) {
>>> @@ -359,10 +702,10 @@ call_rcu(struct rcu_head *head,
>>> }
>>>
>>> /*
>>> - * Crude hack, reduces but does not eliminate possibility of failure.
>>> - * Needs to wait for all CPUs to pass through a -voluntary- context
>>> - * switch to eliminate possibility of failure.  (Maybe just crank
>>> - * priority down...)
>>> + * Wait until all currently running preempt_disable() code segments
>>> + * (including hardware-irq-disable segments) complete.  Note that
>>> + * in -rt this does -not- necessarily result in all currently executing
>>> + * interrupt -handlers- having completed.
>>> */
>>> void
>>> synchronize_sched(void)
>>> @@ -390,7 +733,7 @@ rcu_pending(int cpu)
>>>
>>> void __init rcu_init(void)
>>> {
>>> -/*&&&&*/printk("WARNING: experimental RCU implementation.\n");
>>> +/*&&&&*/printk("WARNING: experimental non-atomic RCU implementation.\n");
>>> 	spin_lock_init(&rcu_data.lock);
>>> 	rcu_data.completed = 0;
>>> 	rcu_data.nextlist = NULL;
>>> @@ -416,7 +759,8 @@ int rcu_read_proc_data(char *page)
>>> 	return sprintf(page,
>>> 		       "ggp=%ld lgp=%ld rcc=%ld\n"
>>> 		       "na=%ld nl=%ld wa=%ld wl=%ld da=%ld dl=%ld dr=%ld
>>> 		       di=%d\n"
>>> -		       "rtf1=%d rtf2=%ld rtf3=%ld rtfe1=%d rtfe2=%ld
>>> rtfe3=%ld\n",
>>> +		       "1=%d e1=%d i1=%ld ie1=%ld g1=%ld a1=%ld ae1=%ld
>>> a2=%ld\n"
>>> +		       "z1=%ld ze1=%ld z2=%ld m1=%ld me1=%ld m2=%ld\n",
>>>
>>> 		       rcu_ctrlblk.completed,
>>> 		       rcu_data.completed,
>>> @@ -431,12 +775,20 @@ int rcu_read_proc_data(char *page)
>>> 		       rcu_data.n_done_remove,
>>> 		       atomic_read(&rcu_data.n_done_invoked),
>>>
>>> -		       atomic_read(&rcu_data.n_rcu_try_flip1),
>>> -		       rcu_data.n_rcu_try_flip2,
>>> -		       rcu_data.n_rcu_try_flip3,
>>> +		       atomic_read(&rcu_data.n_rcu_try_flip_1),
>>> 		       atomic_read(&rcu_data.n_rcu_try_flip_e1),
>>> -		       rcu_data.n_rcu_try_flip_e2,
>>> -		       rcu_data.n_rcu_try_flip_e3);
>>> +		       rcu_data.n_rcu_try_flip_i1,
>>> +		       rcu_data.n_rcu_try_flip_ie1,
>>> +		       rcu_data.n_rcu_try_flip_g1,
>>> +		       rcu_data.n_rcu_try_flip_a1,
>>> +		       rcu_data.n_rcu_try_flip_ae1,
>>> +		       rcu_data.n_rcu_try_flip_a2,
>>> +		       rcu_data.n_rcu_try_flip_z1,
>>> +		       rcu_data.n_rcu_try_flip_ze1,
>>> +		       rcu_data.n_rcu_try_flip_z2,
>>> +		       rcu_data.n_rcu_try_flip_m1,
>>> +		       rcu_data.n_rcu_try_flip_me1,
>>> +		       rcu_data.n_rcu_try_flip_m2);
>>> }
>>>
>>> int rcu_read_proc_gp_data(char *page)
>>> @@ -464,14 +816,23 @@ int rcu_read_proc_ctrs_data(char *page)
>>> 	int cpu;
>>> 	int f = rcu_data.completed & 0x1;
>>>
>>> -	cnt += sprintf(&page[cnt], "CPU last cur\n");
>>> +	cnt += sprintf(&page[cnt], "CPU last cur F M\n");
>>> 	for_each_cpu(cpu) {
>>> -		cnt += sprintf(&page[cnt], "%3d %4d %3d\n",
>>> +		cnt += sprintf(&page[cnt], "%3d %4d %3d %d %d\n",
>>> 			       cpu,
>>> -			       atomic_read(&per_cpu(rcu_flipctr, cpu)[!f]),
>>> -			       atomic_read(&per_cpu(rcu_flipctr, cpu)[f]));
>>> -	}
>>> -	cnt += sprintf(&page[cnt], "ggp = %ld\n", rcu_data.completed);
>>> +			       per_cpu(rcu_flipctr, cpu)[!f],
>>> +			       per_cpu(rcu_flipctr, cpu)[f],
>>> +			       per_cpu(rcu_flip_flag, cpu),
>>> +			       per_cpu(rcu_mb_flag, cpu));
>>> +	}
>>> +	cnt += sprintf(&page[cnt], "ggp = %ld, state = %d",
>>> +		       rcu_data.completed, rcu_try_flip_state);
>>> +	if ((0 <= rcu_try_flip_state) &&
>>> +	    (rcu_try_flip_state <= sizeof(rcu_try_flip_state_names) /
>>> +	    			   sizeof(rcu_try_flip_state_names[0])))
>>> +		cnt += sprintf(&page[cnt], " (%s)",
>>> +			       rcu_try_flip_state_names[rcu_try_flip_state]);
>>> +	cnt += sprintf(&page[cnt], "\n");
>>> 	return (cnt);
>>> }
>>>
>>>
>

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU
  2006-07-27 11:00     ` Esben Nielsen
@ 2006-07-27 15:46       ` Paul E. McKenney
  2006-07-27 19:53         ` Bill Huey
  2006-07-28  0:22         ` Esben Nielsen
  0 siblings, 2 replies; 13+ messages in thread
From: Paul E. McKenney @ 2006-07-27 15:46 UTC (permalink / raw)
  To: Esben Nielsen
  Cc: linux-kernel, compudj, billh, rostedt, tglx, mingo, dipankar,
	rusty

On Thu, Jul 27, 2006 at 12:00:13PM +0100, Esben Nielsen wrote:
> Hi Paul,
>  Thanks, for entering a discussion of my idea! Even though you are busy 
> and are critical towards my idea, you take your time to answer! Thanks.

I appreciate the time you took to write down your ideas, and for your
taking my comments in the spirit intended.

> On Wed, 26 Jul 2006, Paul E. McKenney wrote:
> 
> >On Thu, Jul 27, 2006 at 02:39:07AM +0100, Esben Nielsen wrote:
> >>
> >>
> >>On Tue, 25 Jul 2006, Paul E. McKenney wrote:
> >>
> >>>Not for inclusion, should be viewed with great suspicion.
> >>>
> >>>This patch provides an NMI-safe realtime RCU.  Hopefully, Mathieu can
> >>>make use of synchronize_sched() instead, since I have not yet figured
> >>>out a way to make this NMI-safe and still get rid of the interrupt
> >>>disabling.  ;-)
> >>>
> >>>						Thanx, Paul
> >>
> >>I must say I don't understand all this. It looks very complicated. Is it
> >>really needed?
> >>
> >>I have been thinking about the following design:
> >>
> >>void rcu_read_lock()
> >>{
> >>	if (!in_interrupt())
> >>		current->rcu_read_lock_count++;
> >>}
> >>void rcu_read_unlock()
> >>{
> >>	if (!in_interrupt())
> >>		current->rcu_read_lock_count--;
> >>}
> >>
> >>Somewhere in schedule():
> >>
> >>	rq->rcu_read_lock_count += prev->rcu_read_lock_count;
> >>	if (!rq->rcu_read_lock_count)
> >>		forward_waiting_rcu_jobs();
> >>	rq->rcu_read_lock_count -= next->rcu_read_lock_count;
> >
> >So rq->rcu_read_lock_count contains the sum of the counts of all
> >tasks that were scheduled away from this CPU.
> >
> >What happens in face of the following sequence of events?
> >Assume that all tasks stay on CPU 0 for the moment.
> >
> >o	Task A does rcu_read_lock().
> >
> >o	Task A is preempted.  rq->rcu_read_lock_count is nonzero.
> >
> >o	Task B runs and does rcu_read_lock().
> >
> >o	Task B is preempted (perhaps because it dropped the lock
> >	that was causing it to have high priority).  Regardless of
> >	the reason for the preemption, rq->rcu_read_lock_count is
> >	nonzero.
> >
> >o	Task C runs and does rcu_read_lock().
> >
> >o	Task C is preempted.  rq->rcu_read_lock_count is nonzero.
> >
> >o	Task A runs again, and does rcu_read_unlock().
> >
> >o	Task A is preempted.  rq->rcu_read_lock_count is nonzero.
> >
> >o	Task D runs and does rcu_read_lock().
> >
> >o	Task D is preempted.  rq->rcu_read_lock_count is nonzero.
> >
> >And so on.  As long as at least one of the preempted tasks is in an
> >RCU critical section, you never do your forward_waiting_rcu_jobs(),
> >and the grace period goes on forever.  Or at least until you OOM the
> >machine.
> >
> >So what am I missing here?
> 
> The boosting idea below. Then tasks A-D must be RT tasks for this to 
> happen. And the machine must anyway run out of RT tasks or it will 
> effectively lock up.

I agree that boosting would drive any particular task out of its
RCU read-side critical section, but I do not agree that you have
identified a valid induction argument that proves that we run out
of tasks (yet, anyway).  For one thing, tasks acquiring locks in
the RCU read-side critical sections could be priority boosted,
so that a small set of tasks could rise and fall in priority as
RT tasks attempted to acquire locks held by this small set of
tasks.  The RT tasks need not be running on the same CPU as the
non-realtime "victim" tasks running in RCU read-side critical sections;
therefore, the RT tasks need not be preempting these non-realtime
tasks.

To be more specific, imagine a four-CPU system with three RT tasks
running on three of the four CPUs and a large number of non-realtime
tasks running on all four CPUs.  Consider the code in __d_lookup(), which
acquires dentry->d_lock under rcu_read_lock().  Imagine a situation
where the three RT tasks are repeatedly traversing pathnames with a
long common path prefix, with this same prefix being traversed by the
non-realtime tasks.  This could result in the non-realtime tasks being
repeatedly priority-boosted and preempted (preempted both by each
other and by RT tasks).

Over to you!

> >I could imagine introducing a pair of counters per runqueue, but then
> >we end up with the same issues with counter-flip that we have now.
> >
> >Another question -- what happens if a given CPU stays idle?  Wouldn't
> >the callbacks then just start piling up on that CPU?
> 
> How can a CPU stay idle? There is a tick every 2.5 ms. Even without that 
> the previous CPU can make it schedule if it sees the jobs piling up. Or if 
> that is considered too expensive, it can take over and forward the 
> jobs to the next CPU.

Tickless idle, prized by embedded systems.  Here an idle CPU gets shut
down if there is nothing in particular for it to do.  Classic RCU does a
dance with CPUs entering tickless idle state in order to avoid expecting
those CPUs to participate in future grace periods.

This had its challenges for the bitmask-based Classic RCU, and would
have a few additional challenges for your circular linked-list setup.

Current -rt RCU avoids this problem by dint of being too stupid to
need to know what CPUs are alive or not.  My recent patch needs something
similar to the tickless-idle change to Classic RCU.

> >>Now what should forward_waiting_rcu_jobs() do?
> >>
> >>I imagine a circular datastructur of all the CPUs. When a call_rcu() is
> >>run on a CPU it is first added a list of jobs for that CPU. When
> >>forward_waiting_rcu_jobs() is called all the pending jobs are forwarded to
> >>the next CPU. The next CPU will bring it along the next CPU in the circle
> >>along with it's own jobs. When jobs hit the original CPU they will be
> >>executed. Or rather, when the CPU just before calls
> >>forward_waiting_rcu_jobs(), it sends the jobs belonging to the next CPU to
> >>the RCU-task of the next CPU, where they will be executed, instead of to
> >>the scheduler (runqueue) of the next CPU, where it will just be send out 
> >>on
> >>a
> >>new roundtrip along the circle.
> >>
> >>If you use a structure like the plist then the forwarding procedure can be
> >>done in O(number of online CPUs) time worst case - much less in the usual
> >>case where the lists are almost empty.
> >>
> >>Now the problem is: What happens if a task in a rcu read-side lock is
> >>migrated? Then the rcu_read_lock_count on the source CPU will stay in plus
> >>while on the target CPU it will go in minus. This ought to be simply
> >>fixeable by adding task->rcu_read_lock_count to the target runqueue before
> >>migrating and subtracting it from the old runqueue after migrating. But
> >>there is another problem: RCU-jobs refering to data used by the task being
> >>migrated might have been forwarded from the target CPU. Thus the migration
> >>task have to go back along the circle of CPUs and move all the relevant
> >>RCU-jobs back to the target CPU to be forwarded again. This is also doable
> >>in
> >>number of CPUs between source and target times O(<number of online CPUs>)
> >>(see above) time.
> >
> >So if I have the right (or wrong) pattern of task migrations, the RCU
> >callbacks never get to their originating CPU?
> 
> In principle, yes. But if the machine starts to migrate tasks that wildly 
> it won't get any work done anyway, because all its time is done doing 
> migration.

Since you are waiting for a context switch to move the RCU callbacks,
the migration does not have to be all that wild.  In an eight-CPU system,
if I migrate a preempted task a couple of times in the course of the
eight context switches required for the callbacks to circle the loop
of CPUs, we are in trouble, right?

> >Alternatively, if the task residing in the RCU read-side critical section
> >is forwarded around the loop of CPUs, callbacks circulating around this
> >loop might execute before the RCU read-side critical section completes.
> 
> That is why some of the callbacks (those which has parsed the target CPU 
> but not yet the source CPU) have to be moved back to the target CPU.

Yep.  Leaving them to follow their planned circuit indeed does not cut it.

> I just came up with an even simpler solution:
> Delay the subtraction of the task->rcu_read_lock_count from 
> srcrq->rcu_read_lock_count until the task calls rcu_read_unlock(). That 
> can be done by flagging the task (do task->rcu_read_lock_count |= 
> 0x80000000) and do a simple
> 	if (unlikely(current->rcu_read_lock_count == 0x80000000))
> 		fix_rcu_read_lock_count_on_old_cpu();
> in rcu_read_unlock(). Now the task can be migrated again before calling 
> fix_rcu_read_lock_count_on_old_cpu(). The relevant RCU jobs still can't 
> get past the original CPU before the task have called 
> fix_rcu_read_lock_count_on_old_cpu(), so all subsequent migrations can just
> do the count down on the intermediate CPUs right away.

But now you need atomic instructions in all rcu_read_lock() and
rcu_read_unlock() invocations to handle the possibility that some
migrated task will decrement the counter at the same time we are
incrementing it?  I cannot tell for sure because you have not supplied
sample code for fix_rcu_read_lock_count_on_old_cpu().

> >>To avoid a task in a read-side lock being starved for too long the
> >>following line can be added to normal_prio():
> >>  if (p->rcu_read_lock_count)
> >>	p->prio = MAX_RT_PRIO;
> >
> >But doesn't this have the same impact on latency as disabling preemption
> >in rcu_read_lock() and then re-enabling it in rcu_read_unlock()?
> 
> No, RT tasks can still preempt the RCU read side lock. But SCHED_OTHER and 
> SCHED_BATCH can't. You can also make the RCU read-side boosting priority 
> dynamic and let the system adjust it, or just let the admin adjust it.

Fair enough -- I misread MAX_RT_PRIO as MAX_PRIO.

This approach I can get behind -- my thought has been to boost to
either MAX_RT_PRIO or MAX_RT_PRIO-1 when preempt_schedule() sees that
it is preempting an RCU read-side critical section.

So I agree with you on at least one point!  ;-)

A possible elaboration would be to keep a linked list of tasks preempted
in their RCU read-side critical sections so that they can be further
boosted to the highest possible priority (numerical value of zero,
not sure what the proper symbol is) if the grace period takes too many
jiffies to complete.  Another piece is priority boosting when blocking
on a mutex from within an RCU read-side critical section.

> >Also, doesn't this circular data structure need to handle CPU hotplug?
> 
> Of course. I don't know about hotplug though. But it sounds simple to 
> migrate the tasks away, take the CPU out of the circle and then forward 
> the last RCU jobs from that CPU.

Doing it efficiently is the difficulty, particularly for tickless-idle
systems where CPUs need to be added and removed on a regular basis.
Also, what locking design would you use in order to avoid deadlock?
There is a hotplug mutex, but seems like you might need to acquire some
number of rq mutexes as well.

> >>I don't have time to code this nor a SMP machine to test it on. But I can
> >>give the idea to you anyways in the hope you might code it :-)
> >
> >I am beginning to think that it will not be at all simple by the time I
> >code up all the required fixups.  Or am I missing something?
> 
> Of course, implementing something is always a lot harder than writing the 
> idea down. Anyway, we already worked out some of the hardest details :-)

I certainly agree with your first sentence.  On your second sentence,
I would s/worked out/mentioned/  ;-)

Another approach I am looking at does not permit rcu_read_lock() in
NMI/SMI/hardirq, but is much simpler.  Its downside is that it cannot
serve as common code between CONFIG_PREEMPT_RT and CONFIG_PREEMPT.

							Thanx, Paul

> >>>Signed-off-by: Paul E. McKenney <paulmck@us.ibm.com> (but please don't
> >>>apply)
> >>>
> >>>include/linux/sched.h |    3
> >>>kernel/rcupreempt.c   |  577
> >>>++++++++++++++++++++++++++++++++++++++++----------
> >>>2 files changed, 470 insertions(+), 110 deletions(-)
> >>>
> >>>diff -urpNa -X dontdiff linux-2.6.17-rt7/include/linux/sched.h
> >>>linux-2.6.17-rt7-norrupt/include/linux/sched.h
> >>>--- linux-2.6.17-rt7/include/linux/sched.h	2006-07-19
> >>>01:43:09.000000000 -0700
> >>>+++ linux-2.6.17-rt7-norrupt/include/linux/sched.h	2006-07-22
> >>>15:44:36.000000000 -0700
> >>>@@ -868,8 +868,7 @@ struct task_struct {
> >>>
> >>>#ifdef CONFIG_PREEMPT_RCU
> >>>	int rcu_read_lock_nesting;
> >>>-	atomic_t *rcu_flipctr1;
> >>>-	atomic_t *rcu_flipctr2;
> >>>+	int rcu_flipctr_idx;
> >>>#endif
> >>>#ifdef CONFIG_SCHEDSTATS
> >>>	struct sched_info sched_info;
> >>>diff -urpNa -X dontdiff linux-2.6.17-rt7/kernel/rcupreempt.c
> >>>linux-2.6.17-rt7-norrupt/kernel/rcupreempt.c
> >>>--- linux-2.6.17-rt7/kernel/rcupreempt.c	2006-07-19
> >>>01:43:09.000000000 -0700
> >>>+++ linux-2.6.17-rt7-norrupt/kernel/rcupreempt.c	2006-07-22
> >>>20:21:46.000000000 -0700
> >>>@@ -15,11 +15,13 @@
> >>>* along with this program; if not, write to the Free Software
> >>>* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
> >>>USA.
> >>>*
> >>>- * Copyright (C) IBM Corporation, 2001
> >>>+ * Copyright (C) IBM Corporation, 2006
> >>>*
> >>>* Authors: Paul E. McKenney <paulmck@us.ibm.com>
> >>>*		With thanks to Esben Nielsen, Bill Huey, and Ingo Molnar
> >>>- *		for pushing me away from locks and towards counters.
> >>>+ *		for pushing me away from locks and towards counters, and
> >>>+ *		to Suparna Bhattacharya for pushing me completely away
> >>>+ *		from atomic instructions on the read side.
> >>>*
> >>>* Papers:  http://www.rdrop.com/users/paulmck/RCU
> >>>*
> >>>@@ -73,12 +75,20 @@ struct rcu_data {
> >>>	long		n_done_remove;
> >>>	atomic_t	n_done_invoked;
> >>>	long		n_rcu_check_callbacks;
> >>>-	atomic_t	n_rcu_try_flip1;
> >>>-	long		n_rcu_try_flip2;
> >>>-	long		n_rcu_try_flip3;
> >>>+	atomic_t	n_rcu_try_flip_1;
> >>>	atomic_t	n_rcu_try_flip_e1;
> >>>-	long		n_rcu_try_flip_e2;
> >>>-	long		n_rcu_try_flip_e3;
> >>>+	long		n_rcu_try_flip_i1;
> >>>+	long		n_rcu_try_flip_ie1;
> >>>+	long		n_rcu_try_flip_g1;
> >>>+	long		n_rcu_try_flip_a1;
> >>>+	long		n_rcu_try_flip_ae1;
> >>>+	long		n_rcu_try_flip_a2;
> >>>+	long		n_rcu_try_flip_z1;
> >>>+	long		n_rcu_try_flip_ze1;
> >>>+	long		n_rcu_try_flip_z2;
> >>>+	long		n_rcu_try_flip_m1;
> >>>+	long		n_rcu_try_flip_me1;
> >>>+	long		n_rcu_try_flip_m2;
> >>>#endif /* #ifdef CONFIG_RCU_STATS */
> >>>};
> >>>struct rcu_ctrlblk {
> >>>@@ -90,8 +100,51 @@ static struct rcu_ctrlblk rcu_ctrlblk =
> >>>	.fliplock = RAW_SPIN_LOCK_UNLOCKED,
> >>>	.completed = 0,
> >>>};
> >>>-static DEFINE_PER_CPU(atomic_t [2], rcu_flipctr) =
> >>>-	{ ATOMIC_INIT(0), ATOMIC_INIT(0) };
> >>>+static DEFINE_PER_CPU(int [2], rcu_flipctr) = { 0, 0 };
> >>>+
> >>>+/*
> >>>+ * States for rcu_try_flip() and friends.
> >>>+ */
> >>>+
> >>>+enum rcu_try_flip_states {
> >>>+	rcu_try_flip_idle_state,	/* "I" */
> >>>+	rcu_try_flip_in_gp_state,	/* "G" */
> >>>+	rcu_try_flip_waitack_state, 	/* "A" */
> >>>+	rcu_try_flip_waitzero_state,	/* "Z" */
> >>>+	rcu_try_flip_waitmb_state	/* "M" */
> >>>+};
> >>>+static enum rcu_try_flip_states rcu_try_flip_state =
> >>>rcu_try_flip_idle_state;
> >>>+#ifdef CONFIG_RCU_STATS
> >>>+static char *rcu_try_flip_state_names[] =
> >>>+	{ "idle", "gp", "waitack", "waitzero", "waitmb" };
> >>>+#endif /* #ifdef CONFIG_RCU_STATS */
> >>>+
> >>>+/*
> >>>+ * Enum and per-CPU flag to determine when each CPU has seen
> >>>+ * the most recent counter flip.
> >>>+ */
> >>>+
> >>>+enum rcu_flip_flag_values {
> >>>+	rcu_flip_seen,		/* Steady/initial state, last flip seen. */
> >>>+				/* Only GP detector can update. */
> >>>+	rcu_flipped		/* Flip just completed, need confirmation. */
> >>>+				/* Only corresponding CPU can update. */
> >>>+};
> >>>+static DEFINE_PER_CPU(enum rcu_flip_flag_values, rcu_flip_flag) =
> >>>rcu_flip_seen;
> >>>+
> >>>+/*
> >>>+ * Enum and per-CPU flag to determine when each CPU has executed the
> >>>+ * needed memory barrier to fence in memory references from its last RCU
> >>>+ * read-side critical section in the just-completed grace period.
> >>>+ */
> >>>+
> >>>+enum rcu_mb_flag_values {
> >>>+	rcu_mb_done,		/* Steady/initial state, no mb()s required.
> >>>*/
> >>>+				/* Only GP detector can update. */
> >>>+	rcu_mb_needed		/* Flip just completed, need an mb(). */
> >>>+				/* Only corresponding CPU can update. */
> >>>+};
> >>>+static DEFINE_PER_CPU(enum rcu_mb_flag_values, rcu_mb_flag) = 
> >>>rcu_mb_done;
> >>>
> >>>/*
> >>>* Return the number of RCU batches processed thus far.  Useful
> >>>@@ -105,93 +158,182 @@ long rcu_batches_completed(void)
> >>>void
> >>>rcu_read_lock(void)
> >>>{
> >>>-	int flipctr;
> >>>+	int idx;
> >>>+	int nesting;
> >>>	unsigned long oldirq;
> >>>
> >>>-	local_irq_save(oldirq);
> >>>-
> >>>+	raw_local_irq_save(oldirq);
> >>>	trace_special(current->rcu_read_lock_nesting,
> >>>		      (unsigned long) current->rcu_flipctr1,
> >>>		      rcu_ctrlblk.completed);
> >>>-	if (current->rcu_read_lock_nesting++ == 0) {
> >>>+
> >>>+	nesting = current->rcu_read_lock_nesting;
> >>>+
> >>>+	/*
> >>>+	 * Any rcu_read_lock()s called from NMI handlers
> >>>+	 * at any point must have matching rcu_read_unlock()
> >>>+	 * calls in that same handler, so we will not see the
> >>>+	 * value of current->rcu_read_lock_nesting change.
> >>>+	 */
> >>>+
> >>>+	if (nesting != 0) {
> >>>
> >>>		/*
> >>>-		 * Outermost nesting of rcu_read_lock(), so atomically
> >>>+		 * There is an enclosing rcu_read_lock(), so all we
> >>>+		 * need to do is to increment the counter.
> >>>+		 */
> >>>+
> >>>+		current->rcu_read_lock_nesting = nesting + 1;
> >>>+
> >>>+	} else {
> >>>+
> >>>+		/*
> >>>+		 * Outermost nesting of rcu_read_lock(), so we must
> >>>		 * increment the current counter for the current CPU.
> >>>+		 * This must be done carefully, because NMIs can
> >>>+		 * occur at any point in this code, and any rcu_read_lock()
> >>>+		 * and rcu_read_unlock() pairs in the NMI handlers
> >>>+		 * must interact non-destructively with this code.
> >>>+		 * Lots of barrier() calls, and -very- careful ordering.
> >>>+		 *
> >>>+		 * Changes to this code, including this one, must be
> >>>+		 * inspected, validated, and tested extremely carefully!!!
> >>>		 */
> >>>
> >>>-		flipctr = rcu_ctrlblk.completed & 0x1;
> >>>+		/*
> >>>+		 * First, pick up the index.  Enforce ordering for
> >>>+		 * both compilers and for DEC Alpha.
> >>>+		 */
> >>>+
> >>>+		idx = rcu_ctrlblk.completed & 0x1;
> >>>		smp_read_barrier_depends();
> >>>-		current->rcu_flipctr1 =
> >>>&(__get_cpu_var(rcu_flipctr)[flipctr]);
> >>>-		/* Can optimize to non-atomic on fastpath, but start simple.
> >>>*/
> >>>-		atomic_inc(current->rcu_flipctr1);
> >>>-		smp_mb__after_atomic_inc();  /* might optimize out... */
> >>>-		if (unlikely(flipctr != (rcu_ctrlblk.completed & 0x1))) {
> >>>-
> >>>-			/*
> >>>-			 * We raced with grace-period processing (flip).
> >>>-			 * Although we cannot be preempted here, there
> >>>-			 * could be interrupts, ECC errors and the like,
> >>>-			 * so just nail down both sides of the rcu_flipctr
> >>>-			 * array for the duration of our RCU read-side
> >>>-			 * critical section, preventing a second flip
> >>>-			 * from racing with us.  At some point, it would
> >>>-			 * be safe to decrement one of the counters, but
> >>>-			 * we have no way of knowing when that would be.
> >>>-			 * So just decrement them both in rcu_read_unlock().
> >>>-			 */
> >>>-
> >>>-			current->rcu_flipctr2 =
> >>>-				&(__get_cpu_var(rcu_flipctr)[!flipctr]);
> >>>-			/* Can again optimize to non-atomic on fastpath. */
> >>>-			atomic_inc(current->rcu_flipctr2);
> >>>-			smp_mb__after_atomic_inc();  /* might optimize
> >>>out... */
> >>>-		}
> >>>+		barrier();
> >>>+
> >>>+		/*
> >>>+		 * Increment the per-CPU counter. NMI handlers
> >>>+		 * might increment it as well, but they had better
> >>>+		 * properly nest their rcu_read_lock()/rcu_read_unlock()
> >>>+		 * pairs so that the value is restored before the handler
> >>>+		 * returns to us.  Enforce ordering for compilers.
> >>>+		 */
> >>>+
> >>>+		__get_cpu_var(rcu_flipctr)[idx]++;
> >>>+		barrier();
> >>>+
> >>>+		/*
> >>>+		 * It is now safe to increment the task's nesting count.
> >>>+		 * NMIs that occur after this statement will route
> >>>+		 * their rcu_read_lock() calls through the "then" clause
> >>>+		 * of this "if" statement, and thus will no longer come
> >>>+		 * through this path.  Enforce ordering for compilers.
> >>>+		 */
> >>>+
> >>>+		current->rcu_read_lock_nesting = nesting + 1;
> >>>+		barrier();
> >>>+
> >>>+		/*
> >>>+		 * It is now safe to store the index into our task
> >>>+		 * structure.  Doing so earlier would have resulted
> >>>+		 * in it getting clobbered by NMI handlers.
> >>>+		 */
> >>>+
> >>>+		current->rcu_flipctr_idx = idx;
> >>>	}
> >>>+
> >>>	trace_special((unsigned long) current->rcu_flipctr1,
> >>>		      (unsigned long) current->rcu_flipctr2,
> >>>		      rcu_ctrlblk.completed);
> >>>-	local_irq_restore(oldirq);
> >>>+	raw_local_irq_restore(oldirq);
> >>>}
> >>>
> >>>void
> >>>rcu_read_unlock(void)
> >>>{
> >>>+	int idx;
> >>>+	int nesting;
> >>>	unsigned long oldirq;
> >>>
> >>>-	local_irq_save(oldirq);
> >>>+	raw_local_irq_save(oldirq);
> >>>	trace_special((unsigned long) current->rcu_flipctr1,
> >>>		      (unsigned long) current->rcu_flipctr2,
> >>>		      current->rcu_read_lock_nesting);
> >>>-	if (--current->rcu_read_lock_nesting == 0) {
> >>>+
> >>>+	nesting = current->rcu_read_lock_nesting;
> >>>+
> >>>+	/*
> >>>+	 * Any rcu_read_lock()s called from NMI handlers
> >>>+	 * at any point must have matching rcu_read_unlock()
> >>>+	 * calls in that same handler, so we will not see the
> >>>+	 * value of current->rcu_read_lock_nesting change.
> >>>+	 */
> >>>+
> >>>+	if (nesting > 1) {
> >>>
> >>>		/*
> >>>-		 * Just atomically decrement whatever we incremented.
> >>>-		 * Might later want to awaken some task waiting for the
> >>>-		 * grace period to complete, but keep it simple for the
> >>>-		 * moment.
> >>>+		 * There is an enclosing rcu_read_lock(), so all we
> >>>+		 * need to do is to decrement the counter.
> >>>		 */
> >>>
> >>>-		smp_mb__before_atomic_dec();
> >>>-		atomic_dec(current->rcu_flipctr1);
> >>>-		current->rcu_flipctr1 = NULL;
> >>>-		if (unlikely(current->rcu_flipctr2 != NULL)) {
> >>>-			atomic_dec(current->rcu_flipctr2);
> >>>-			current->rcu_flipctr2 = NULL;
> >>>-		}
> >>>+		current->rcu_read_lock_nesting = nesting - 1;
> >>>+
> >>>+	} else {
> >>>+
> >>>+		/*
> >>>+		 * Outermost nesting of rcu_read_unlock(), so we must
> >>>+		 * decrement the current counter for the current CPU.
> >>>+		 * This must be done carefully, because NMIs can
> >>>+		 * occur at any point in this code, and any rcu_read_lock()
> >>>+		 * and rcu_read_unlock() pairs in the NMI handlers
> >>>+		 * must interact non-destructively with this code.
> >>>+		 * Lots of barrier() calls, and -very- careful ordering.
> >>>+		 *
> >>>+		 * Changes to this code, including this one, must be
> >>>+		 * inspected, validated, and tested extremely carefully!!!
> >>>+		 */
> >>>+
> >>>+		/*
> >>>+		 * First, pick up the index.  Enforce ordering for
> >>>+		 * both compilers and for DEC Alpha.
> >>>+		 */
> >>>+
> >>>+		idx = current->rcu_flipctr_idx;
> >>>+		smp_read_barrier_depends();
> >>>+		barrier();
> >>>+
> >>>+		/*
> >>>+		 * It is now safe to decrement the task's nesting count.
> >>>+		 * NMIs that occur after this statement will route
> >>>+		 * their rcu_read_lock() calls through this "else" clause
> >>>+		 * of this "if" statement, and thus will start incrementing
> >>>+		 * the per-CPU counter on their own.  Enforce ordering for
> >>>+		 * compilers.
> >>>+		 */
> >>>+
> >>>+		current->rcu_read_lock_nesting = nesting - 1;
> >>>+		barrier();
> >>>+
> >>>+		/*
> >>>+		 * Decrement the per-CPU counter. NMI handlers
> >>>+		 * might increment it as well, but they had better
> >>>+		 * properly nest their rcu_read_lock()/rcu_read_unlock()
> >>>+		 * pairs so that the value is restored before the handler
> >>>+		 * returns to us.
> >>>+		 */
> >>>+
> >>>+		__get_cpu_var(rcu_flipctr)[idx]--;
> >>>	}
> >>>
> >>>	trace_special((unsigned long)current->rcu_flipctr1,
> >>>		      (unsigned long) current->rcu_flipctr2,
> >>>		      current->rcu_read_lock_nesting);
> >>>-	local_irq_restore(oldirq);
> >>>+	raw_local_irq_restore(oldirq);
> >>>}
> >>>
> >>>static void
> >>>__rcu_advance_callbacks(void)
> >>>{
> >>>
> >>>-	if (rcu_data.completed != rcu_ctrlblk.completed) {
> >>>+	if ((rcu_data.completed >> 1) != (rcu_ctrlblk.completed >> 1)) {
> >>>		if (rcu_data.waitlist != NULL) {
> >>>			*rcu_data.donetail = rcu_data.waitlist;
> >>>			rcu_data.donetail = rcu_data.waittail;
> >>>@@ -216,13 +358,186 @@ __rcu_advance_callbacks(void)
> >>>			rcu_data.waittail = &rcu_data.waitlist;
> >>>		}
> >>>		rcu_data.completed = rcu_ctrlblk.completed;
> >>>+	} else if (rcu_data.completed != rcu_ctrlblk.completed) {
> >>>+		rcu_data.completed = rcu_ctrlblk.completed;
> >>>+	}
> >>>+}
> >>>+
> >>>+/*
> >>>+ * Get here when RCU is idle.  Decide whether we need to
> >>>+ * move out of idle state, and return zero if so.
> >>>+ * "Straightforward" approach for the moment, might later
> >>>+ * use callback-list lengths, grace-period duration, or
> >>>+ * some such to determine when to exit idle state.
> >>>+ * Might also need a pre-idle test that does not acquire
> >>>+ * the lock, but let's get the simple case working first...
> >>>+ */
> >>>+
> >>>+static int
> >>>+rcu_try_flip_idle(int flipctr)
> >>>+{
> >>>+#ifdef CONFIG_RCU_STATS
> >>>+	rcu_data.n_rcu_try_flip_i1++;
> >>>+#endif /* #ifdef CONFIG_RCU_STATS */
> >>>+
> >>>+	if (!rcu_pending(smp_processor_id())) {
> >>>+#ifdef CONFIG_RCU_STATS
> >>>+		rcu_data.n_rcu_try_flip_ie1++;
> >>>+#endif /* #ifdef CONFIG_RCU_STATS */
> >>>+		return 1;
> >>>	}
> >>>+	return 0;
> >>>+}
> >>>+
> >>>+/*
> >>>+ * Flip processing up to and including the flip, as well as
> >>>+ * telling CPUs to acknowledge the flip.
> >>>+ */
> >>>+
> >>>+static int
> >>>+rcu_try_flip_in_gp(int flipctr)
> >>>+{
> >>>+	int cpu;
> >>>+
> >>>+#ifdef CONFIG_RCU_STATS
> >>>+	rcu_data.n_rcu_try_flip_g1++;
> >>>+#endif /* #ifdef CONFIG_RCU_STATS */
> >>>+
> >>>+	/*
> >>>+	 * Do the flip.
> >>>+	 */
> >>>+
> >>>+	rcu_ctrlblk.completed++;  /* stands in for rcu_try_flip_g2 */
> >>>+
> >>>+	/*
> >>>+	 * Need a memory barrier so that other CPUs see the new
> >>>+	 * counter value before they see the subsequent change of all
> >>>+	 * the rcu_flip_flag instances to rcu_flipped.
> >>>+	 */
> >>>+
> >>>+	smp_mb();
> >>>+
> >>>+	/* Now ask each CPU for acknowledgement of the flip. */
> >>>+
> >>>+	for_each_cpu(cpu)
> >>>+		per_cpu(rcu_flip_flag, cpu) = rcu_flipped;
> >>>+
> >>>+	return 0;
> >>>+}
> >>>+
> >>>+/*
> >>>+ * Wait for CPUs to acknowledge the flip.
> >>>+ */
> >>>+
> >>>+static int
> >>>+rcu_try_flip_waitack(int flipctr)
> >>>+{
> >>>+	int cpu;
> >>>+
> >>>+#ifdef CONFIG_RCU_STATS
> >>>+	rcu_data.n_rcu_try_flip_a1++;
> >>>+#endif /* #ifdef CONFIG_RCU_STATS */
> >>>+
> >>>+	for_each_cpu(cpu)
> >>>+		if (per_cpu(rcu_flip_flag, cpu) != rcu_flip_seen) {
> >>>+#ifdef CONFIG_RCU_STATS
> >>>+			rcu_data.n_rcu_try_flip_ae1++;
> >>>+#endif /* #ifdef CONFIG_RCU_STATS */
> >>>+			return 1;
> >>>+		}
> >>>+
> >>>+	/*
> >>>+	 * Make sure our checks above don't bleed into subsequent
> >>>+	 * waiting for the sum of the counters to reach zero.
> >>>+	 */
> >>>+
> >>>+	smp_mb();
> >>>+
> >>>+#ifdef CONFIG_RCU_STATS
> >>>+	rcu_data.n_rcu_try_flip_a2++;
> >>>+#endif /* #ifdef CONFIG_RCU_STATS */
> >>>+
> >>>+	return 0;
> >>>+}
> >>>+
> >>>+/*
> >>>+ * Wait for collective ``last'' counter to reach zero,
> >>>+ * then tell all CPUs to do an end-of-grace-period memory barrier.
> >>>+ */
> >>>+
> >>>+static int
> >>>+rcu_try_flip_waitzero(int flipctr)
> >>>+{
> >>>+	int cpu;
> >>>+	int lastidx = !(flipctr & 0x1);
> >>>+	int sum = 0;
> >>>+
> >>>+#ifdef CONFIG_RCU_STATS
> >>>+	rcu_data.n_rcu_try_flip_z1++;
> >>>+#endif /* #ifdef CONFIG_RCU_STATS */
> >>>+
> >>>+	/* Check to see if the sum of the "last" counters is zero. */
> >>>+
> >>>+	for_each_cpu(cpu)
> >>>+		sum += per_cpu(rcu_flipctr, cpu)[lastidx];
> >>>+	if (sum != 0) {
> >>>+#ifdef CONFIG_RCU_STATS
> >>>+		rcu_data.n_rcu_try_flip_ze1++;
> >>>+#endif /* #ifdef CONFIG_RCU_STATS */
> >>>+		return 1;
> >>>+	}
> >>>+
> >>>+#ifdef CONFIG_RCU_STATS
> >>>+	rcu_data.n_rcu_try_flip_z2++;
> >>>+#endif /* #ifdef CONFIG_RCU_STATS */
> >>>+
> >>>+	/* Make sure we don't call for memory barriers before we see zero. */
> >>>+
> >>>+	smp_mb();
> >>>+
> >>>+	/* Call for a memory barrier from each CPU. */
> >>>+
> >>>+	for_each_cpu(cpu)
> >>>+		per_cpu(rcu_mb_flag, cpu) = rcu_mb_needed;
> >>>+
> >>>+	return 0;
> >>>+}
> >>>+
> >>>+/*
> >>>+ * Wait for all CPUs to do their end-of-grace-period memory barrier.
> >>>+ * Return 0 once all CPUs have done so.
> >>>+ */
> >>>+
> >>>+static int
> >>>+rcu_try_flip_waitmb(int flipctr)
> >>>+{
> >>>+	int cpu;
> >>>+
> >>>+#ifdef CONFIG_RCU_STATS
> >>>+	rcu_data.n_rcu_try_flip_m1++;
> >>>+#endif /* #ifdef CONFIG_RCU_STATS */
> >>>+
> >>>+	for_each_cpu(cpu)
> >>>+		if (per_cpu(rcu_mb_flag, cpu) != rcu_mb_done) {
> >>>+#ifdef CONFIG_RCU_STATS
> >>>+			rcu_data.n_rcu_try_flip_me1++;
> >>>+#endif /* #ifdef CONFIG_RCU_STATS */
> >>>+			return 1;
> >>>+		}
> >>>+
> >>>+	smp_mb(); /* Ensure that the above checks precede any following
> >>>flip. */
> >>>+
> >>>+#ifdef CONFIG_RCU_STATS
> >>>+	rcu_data.n_rcu_try_flip_m2++;
> >>>+#endif /* #ifdef CONFIG_RCU_STATS */
> >>>+
> >>>+	return 0;
> >>>}
> >>>
> >>>/*
> >>>* Attempt a single flip of the counters.  Remember, a single flip does
> >>>* -not- constitute a grace period.  Instead, the interval between
> >>>- * a pair of consecutive flips is a grace period.
> >>>+ * at least three consecutive flips is a grace period.
> >>>*
> >>>* If anyone is nuts enough to run this CONFIG_PREEMPT_RCU implementation
> >>>* on a large SMP, they might want to use a hierarchical organization of
> >>>@@ -231,13 +546,11 @@ __rcu_advance_callbacks(void)
> >>>static void
> >>>rcu_try_flip(void)
> >>>{
> >>>-	int cpu;
> >>>	long flipctr;
> >>>	unsigned long oldirq;
> >>>
> >>>-	flipctr = rcu_ctrlblk.completed;
> >>>#ifdef CONFIG_RCU_STATS
> >>>-	atomic_inc(&rcu_data.n_rcu_try_flip1);
> >>>+	atomic_inc(&rcu_data.n_rcu_try_flip_1);
> >>>#endif /* #ifdef CONFIG_RCU_STATS */
> >>>	if (unlikely(!spin_trylock_irqsave(&rcu_ctrlblk.fliplock, oldirq))) {
> >>>#ifdef CONFIG_RCU_STATS
> >>>@@ -245,52 +558,82 @@ rcu_try_flip(void)
> >>>#endif /* #ifdef CONFIG_RCU_STATS */
> >>>		return;
> >>>	}
> >>>-	if (unlikely(flipctr != rcu_ctrlblk.completed)) {
> >>>-
> >>>-		/* Our work is done!  ;-) */
> >>>-
> >>>-#ifdef CONFIG_RCU_STATS
> >>>-		rcu_data.n_rcu_try_flip_e2++;
> >>>-#endif /* #ifdef CONFIG_RCU_STATS */
> >>>-		spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
> >>>-		return;
> >>>-	}
> >>>-	flipctr &= 0x1;
> >>>
> >>>	/*
> >>>-	 * Check for completion of all RCU read-side critical sections
> >>>-	 * that started prior to the previous flip.
> >>>+	 * Take the next transition(s) through the RCU grace-period
> >>>+	 * flip-counter state machine.
> >>>	 */
> >>>
> >>>-#ifdef CONFIG_RCU_STATS
> >>>-	rcu_data.n_rcu_try_flip2++;
> >>>-#endif /* #ifdef CONFIG_RCU_STATS */
> >>>-	for_each_cpu(cpu) {
> >>>-		if (atomic_read(&per_cpu(rcu_flipctr, cpu)[!flipctr]) != 0) {
> >>>-#ifdef CONFIG_RCU_STATS
> >>>-			rcu_data.n_rcu_try_flip_e3++;
> >>>-#endif /* #ifdef CONFIG_RCU_STATS */
> >>>-			spin_unlock_irqrestore(&rcu_ctrlblk.fliplock,
> >>>oldirq);
> >>>-			return;
> >>>-		}
> >>>+	flipctr = rcu_ctrlblk.completed;
> >>>+	switch (rcu_try_flip_state) {
> >>>+	case rcu_try_flip_idle_state:
> >>>+		if (rcu_try_flip_idle(flipctr))
> >>>+			break;
> >>>+		rcu_try_flip_state = rcu_try_flip_in_gp_state;
> >>>+	case rcu_try_flip_in_gp_state:
> >>>+		if (rcu_try_flip_in_gp(flipctr))
> >>>+			break;
> >>>+		rcu_try_flip_state = rcu_try_flip_waitack_state;
> >>>+	case rcu_try_flip_waitack_state:
> >>>+		if (rcu_try_flip_waitack(flipctr))
> >>>+			break;
> >>>+		rcu_try_flip_state = rcu_try_flip_waitzero_state;
> >>>+	case rcu_try_flip_waitzero_state:
> >>>+		if (rcu_try_flip_waitzero(flipctr))
> >>>+			break;
> >>>+		rcu_try_flip_state = rcu_try_flip_waitmb_state;
> >>>+	case rcu_try_flip_waitmb_state:
> >>>+		if (rcu_try_flip_waitmb(flipctr))
> >>>+			break;
> >>>+		rcu_try_flip_state = rcu_try_flip_idle_state;
> >>>	}
> >>>+	spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
> >>>+}
> >>>
> >>>-	/* Do the flip. */
> >>>+/*
> >>>+ * Check to see if this CPU needs to report that it has seen the most
> >>>+ * recent counter flip, thereby declaring that all subsequent
> >>>+ * rcu_read_lock() invocations will respect this flip.
> >>>+ */
> >>>
> >>>-	smp_mb();
> >>>-	rcu_ctrlblk.completed++;
> >>>+static void
> >>>+rcu_check_flipseen(int cpu)
> >>>+{
> >>>+	if (per_cpu(rcu_flip_flag, cpu) == rcu_flipped) {
> >>>+		smp_mb();  /* Subsequent counter acccesses must see new
> >>>value */
> >>>+		per_cpu(rcu_flip_flag, cpu) = rcu_flip_seen;
> >>>+		smp_mb();  /* probably be implied by interrupt, but... */
> >>>+	}
> >>>+}
> >>>
> >>>-#ifdef CONFIG_RCU_STATS
> >>>-	rcu_data.n_rcu_try_flip3++;
> >>>-#endif /* #ifdef CONFIG_RCU_STATS */
> >>>-	spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
> >>>+/*
> >>>+ * Check to see if this CPU needs to do a memory barrier in order to
> >>>+ * ensure that any prior RCU read-side critical sections have committed
> >>>+ * their counter manipulations and critical-section memory references
> >>>+ * before declaring the grace period to be completed.
> >>>+ */
> >>>+
> >>>+static void
> >>>+rcu_check_mb(int cpu)
> >>>+{
> >>>+	if (per_cpu(rcu_mb_flag, cpu) == rcu_mb_needed) {
> >>>+		smp_mb();
> >>>+		per_cpu(rcu_mb_flag, cpu) = rcu_mb_done;
> >>>+	}
> >>>}
> >>>
> >>>+/*
> >>>+ * This function is called periodically on each CPU in hardware 
> >>>interrupt
> >>>+ * context.
> >>>+ */
> >>>+
> >>>void
> >>>rcu_check_callbacks(int cpu, int user)
> >>>{
> >>>	unsigned long oldirq;
> >>>
> >>>+	rcu_check_flipseen(cpu);
> >>>+	rcu_check_mb(cpu);
> >>>	if (rcu_ctrlblk.completed == rcu_data.completed) {
> >>>		rcu_try_flip();
> >>>		if (rcu_ctrlblk.completed == rcu_data.completed) {
> >>>@@ -359,10 +702,10 @@ call_rcu(struct rcu_head *head,
> >>>}
> >>>
> >>>/*
> >>>- * Crude hack, reduces but does not eliminate possibility of failure.
> >>>- * Needs to wait for all CPUs to pass through a -voluntary- context
> >>>- * switch to eliminate possibility of failure.  (Maybe just crank
> >>>- * priority down...)
> >>>+ * Wait until all currently running preempt_disable() code segments
> >>>+ * (including hardware-irq-disable segments) complete.  Note that
> >>>+ * in -rt this does -not- necessarily result in all currently executing
> >>>+ * interrupt -handlers- having completed.
> >>>*/
> >>>void
> >>>synchronize_sched(void)
> >>>@@ -390,7 +733,7 @@ rcu_pending(int cpu)
> >>>
> >>>void __init rcu_init(void)
> >>>{
> >>>-/*&&&&*/printk("WARNING: experimental RCU implementation.\n");
> >>>+/*&&&&*/printk("WARNING: experimental non-atomic RCU 
> >>>implementation.\n");
> >>>	spin_lock_init(&rcu_data.lock);
> >>>	rcu_data.completed = 0;
> >>>	rcu_data.nextlist = NULL;
> >>>@@ -416,7 +759,8 @@ int rcu_read_proc_data(char *page)
> >>>	return sprintf(page,
> >>>		       "ggp=%ld lgp=%ld rcc=%ld\n"
> >>>		       "na=%ld nl=%ld wa=%ld wl=%ld da=%ld dl=%ld dr=%ld
> >>>		       di=%d\n"
> >>>-		       "rtf1=%d rtf2=%ld rtf3=%ld rtfe1=%d rtfe2=%ld
> >>>rtfe3=%ld\n",
> >>>+		       "1=%d e1=%d i1=%ld ie1=%ld g1=%ld a1=%ld ae1=%ld
> >>>a2=%ld\n"
> >>>+		       "z1=%ld ze1=%ld z2=%ld m1=%ld me1=%ld m2=%ld\n",
> >>>
> >>>		       rcu_ctrlblk.completed,
> >>>		       rcu_data.completed,
> >>>@@ -431,12 +775,20 @@ int rcu_read_proc_data(char *page)
> >>>		       rcu_data.n_done_remove,
> >>>		       atomic_read(&rcu_data.n_done_invoked),
> >>>
> >>>-		       atomic_read(&rcu_data.n_rcu_try_flip1),
> >>>-		       rcu_data.n_rcu_try_flip2,
> >>>-		       rcu_data.n_rcu_try_flip3,
> >>>+		       atomic_read(&rcu_data.n_rcu_try_flip_1),
> >>>		       atomic_read(&rcu_data.n_rcu_try_flip_e1),
> >>>-		       rcu_data.n_rcu_try_flip_e2,
> >>>-		       rcu_data.n_rcu_try_flip_e3);
> >>>+		       rcu_data.n_rcu_try_flip_i1,
> >>>+		       rcu_data.n_rcu_try_flip_ie1,
> >>>+		       rcu_data.n_rcu_try_flip_g1,
> >>>+		       rcu_data.n_rcu_try_flip_a1,
> >>>+		       rcu_data.n_rcu_try_flip_ae1,
> >>>+		       rcu_data.n_rcu_try_flip_a2,
> >>>+		       rcu_data.n_rcu_try_flip_z1,
> >>>+		       rcu_data.n_rcu_try_flip_ze1,
> >>>+		       rcu_data.n_rcu_try_flip_z2,
> >>>+		       rcu_data.n_rcu_try_flip_m1,
> >>>+		       rcu_data.n_rcu_try_flip_me1,
> >>>+		       rcu_data.n_rcu_try_flip_m2);
> >>>}
> >>>
> >>>int rcu_read_proc_gp_data(char *page)
> >>>@@ -464,14 +816,23 @@ int rcu_read_proc_ctrs_data(char *page)
> >>>	int cpu;
> >>>	int f = rcu_data.completed & 0x1;
> >>>
> >>>-	cnt += sprintf(&page[cnt], "CPU last cur\n");
> >>>+	cnt += sprintf(&page[cnt], "CPU last cur F M\n");
> >>>	for_each_cpu(cpu) {
> >>>-		cnt += sprintf(&page[cnt], "%3d %4d %3d\n",
> >>>+		cnt += sprintf(&page[cnt], "%3d %4d %3d %d %d\n",
> >>>			       cpu,
> >>>-			       atomic_read(&per_cpu(rcu_flipctr, cpu)[!f]),
> >>>-			       atomic_read(&per_cpu(rcu_flipctr, cpu)[f]));
> >>>-	}
> >>>-	cnt += sprintf(&page[cnt], "ggp = %ld\n", rcu_data.completed);
> >>>+			       per_cpu(rcu_flipctr, cpu)[!f],
> >>>+			       per_cpu(rcu_flipctr, cpu)[f],
> >>>+			       per_cpu(rcu_flip_flag, cpu),
> >>>+			       per_cpu(rcu_mb_flag, cpu));
> >>>+	}
> >>>+	cnt += sprintf(&page[cnt], "ggp = %ld, state = %d",
> >>>+		       rcu_data.completed, rcu_try_flip_state);
> >>>+	if ((0 <= rcu_try_flip_state) &&
> >>>+	    (rcu_try_flip_state <= sizeof(rcu_try_flip_state_names) /
> >>>+	    			   sizeof(rcu_try_flip_state_names[0])))
> >>>+		cnt += sprintf(&page[cnt], " (%s)",
> >>>+			       rcu_try_flip_state_names[rcu_try_flip_state]);
> >>>+	cnt += sprintf(&page[cnt], "\n");
> >>>	return (cnt);
> >>>}
> >>>
> >>>
> >

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU
  2006-07-27 15:46       ` Paul E. McKenney
@ 2006-07-27 19:53         ` Bill Huey
  2006-07-28  0:02           ` Paul E. McKenney
  2006-07-28  0:22         ` Esben Nielsen
  1 sibling, 1 reply; 13+ messages in thread
From: Bill Huey @ 2006-07-27 19:53 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: Esben Nielsen, linux-kernel, compudj, rostedt, tglx, mingo,
	dipankar, rusty, Bill Huey (hui)

On Thu, Jul 27, 2006 at 08:46:37AM -0700, Paul E. McKenney wrote:
> On Thu, Jul 27, 2006 at 12:00:13PM +0100, Esben Nielsen wrote:
> > No, RT tasks can still preempt the RCU read side lock. But SCHED_OTHER and 
> > SCHED_BATCH can't. You can also the RCU read side boosting prioritiy 
> > dynamic and let the system adjust it or just let the admin adjust it.
> 
> Fair enough -- I misread MAX_RT_PRIO as MAX_PRIO.
> 
> This approach I can get behind -- my thought has been to boost to
> either MAX_RT_PRIO or MAX_RT_PRIO-1 when preempt_schedule() sees that
> it is preempting an RCU read-side critical section.
> 
> So I agree with you on at least one point!  ;-)

This is the approach that I suggested to you, Paul, at OLS after your talk.
Again, if you go about this path then you might think about extending the
scheduler to have an additional parameter regarding a preemption threshold
instead of doing this stuff indirectly with priority manipulations like
the above. It was something that I was considering when I was doing my
Linux kernel preemption project to fix the problems with RCU-ed read side
thread migrating to another CPU.

If folks go down this discussion track, it's going to open a can of
scheduling worms with possibilities (threshold, priority, irq-thread
priority, global run queue for SCHED_FIFO tasks) pushing into global run
queue logic stuff. It's a bit spooky for the Linux kernel. Some of the
thread migration pinning stuff with per CPU locks was rejected by Linus
way back.

> A possible elaboration would be to keep a linked list of tasks preempted
> in their RCU read-side critical sections so that they can be further
> boosted to the highest possible priority (numerical value of zero,
> not sure what the proper symbol is) if the grace period takes too many
> jiffies to complete.  Another piece is priority boosting when blocking
> on a mutex from within an RCU read-side critical section.

I'm not sure how folks feel about putting something like that in the
scheduler path since it's such a specialized case. Some of the scheduler
folks might come out against this.

> Doing it efficiently is the difficulty, particularly for tickless-idle
> systems where CPUs need to be added and removed on a regular basis.
> Also, what locking design would you use in order to avoid deadlock?
> There is a hotplug mutex, but seems like you might need to acquire some
> number of rq mutexes as well.

I'm not understanding exactly what you mean by tickless idle systems.

Are you talking about isolating a CPU for SCHED_FIFO and friends tasks
only, as in the CPU reservation stuff but with ticks off in many proprietary
RTOSes? Don't mean to be tangential here, I just need clarification.

> Another approach I am looking at does not permit rcu_read_lock() in
> NMI/SMI/hardirq, but is much simpler.  Its downside is that it cannot
> serve as common code between CONFIG_PREEMPT_RT and CONFIG_PREEMPT.

bill


^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU
  2006-07-27 19:53         ` Bill Huey
@ 2006-07-28  0:02           ` Paul E. McKenney
  2006-07-28  0:48             ` Bill Huey
  0 siblings, 1 reply; 13+ messages in thread
From: Paul E. McKenney @ 2006-07-28  0:02 UTC (permalink / raw)
  To: Bill Huey
  Cc: Esben Nielsen, linux-kernel, compudj, rostedt, tglx, mingo,
	dipankar, rusty

On Thu, Jul 27, 2006 at 12:53:56PM -0700, Bill Huey wrote:
> On Thu, Jul 27, 2006 at 08:46:37AM -0700, Paul E. McKenney wrote:
> > On Thu, Jul 27, 2006 at 12:00:13PM +0100, Esben Nielsen wrote:
> > > No, RT tasks can still preempt the RCU read side lock. But SCHED_OTHER and 
> > > SCHED_BATCH can't. You can also the RCU read side boosting prioritiy 
> > > dynamic and let the system adjust it or just let the admin adjust it.
> > 
> > Fair enough -- I misread MAX_RT_PRIO as MAX_PRIO.
> > 
> > This approach I can get behind -- my thought has been to boost to
> > either MAX_RT_PRIO or MAX_RT_PRIO-1 when preempt_schedule() sees that
> > it is preempting an RCU read-side critical section.
> > 
> > So I agree with you on at least one point!  ;-)
> 
> This is the approach that I suggested to you, Paul, at OLS after your talk.

Yep.  I have been messing with it for some time, see the RCUboost*
patches in http://www.rdrop.com/users/paulmck/patches/ if you wish
to see a large number of subtly broken ways of approaching this.  ;-)

> Again, if you go about this path then you might think about extending the
> scheduler to have an additional parameter regarding a preemption threshold
> instead of doing this stuff indirectly with priority manipulations like
> the above. It was something that I was considering when I was doing my
> Linux kernel preemption project to fix the problems with RCU-ed read side
> thread migrating to another CPU.
>
> If folks go down this discussion track, it's going to open a can of
> scheduling worms with possiblities (threshold, priority, irq-thread
> priority, global run queue for SCHED_FIFO tasks) pushing into global run
> queue logic stuff. It's a bit spooky for the Linux kernel. Some of the
> thread migration pinning stuff with per CPU locks was rejected by Linus
> way back.

I have been dealing with worms enough for me without the preemption
threshold.  I agree that it might well be useful, but I am getting
my full quota of worms and spookiness with my current approach.  ;-)

> > A possible elaboration would be to keep a linked list of tasks preempted
> > in their RCU read-side critical sections so that they can be further
> > boosted to the highest possible priority (numerical value of zero,
> > not sure what the proper symbol is) if the grace period takes too many
> > jiffies to complete.  Another piece is priority boosting when blocking
> > on a mutex from within an RCU read-side critical section.
> 
> I'm not sure how folks feel about putting something like that in the
> scheduler path since it's such a specialized cases. Some of the scheduler
> folks might come out against this.

They might well.  And the resulting discussion might reveal a better
way.  Or it might well turn out that the simple approach of boosting
to an intermediate level without the list will suffice.

> > Doing it efficiently is the difficulty, particularly for tickless-idle
> > systems where CPUs need to be added and removed on a regular basis.
> > Also, what locking design would you use in order to avoid deadlock?
> > There is a hotplug mutex, but seems like you might need to acquire some
> > number of rq mutexes as well.
> 
> I'm not understanding what you exactly mean by tickless idle systems.

Some architectures power CPUs down (including stopping the scheduling
clock interrupt) when the CPU goes idle for a sufficient time.  The
CPU has sort of been hotplug-removed, but not quite.  In any case, once
the CPU is in that state, RCU needs to count it out of the grace-period
detection.

One difference between the tickless idle and the hot-unplug state is that
a CPU can (and does) enter and exit tickless idle state quite frequently.

> Are you talking about isolating a CPU for SCHED_FIFO and friends tasks
> only as in the CPU reservation stuff but with ticks off in many proprietary
> RTOSes ? Don't mean to be tangental here, I just need clarification.

No.  Although that was something I was messing with a couple of years back,
I am now leaving that to others.

							Thanx, Paul

> > Another approach I am looking at does not permit rcu_read_lock() in
> > NMI/SMI/hardirq, but is much simpler.  Its downside is that it cannot
> > serve as common code between CONFIG_PREEMPT_RT and CONFIG_PREEMPT.
> 
> bill
> 

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU
  2006-07-27 15:46       ` Paul E. McKenney
  2006-07-27 19:53         ` Bill Huey
@ 2006-07-28  0:22         ` Esben Nielsen
  2006-07-28  0:47           ` Paul E. McKenney
  1 sibling, 1 reply; 13+ messages in thread
From: Esben Nielsen @ 2006-07-28  0:22 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: Esben Nielsen, linux-kernel, compudj, billh, rostedt, tglx, mingo,
	dipankar, rusty



On Thu, 27 Jul 2006, Paul E. McKenney wrote:

> On Thu, Jul 27, 2006 at 12:00:13PM +0100, Esben Nielsen wrote:
>> Hi Paul,
>>  Thanks, for entering a discussion of my idea! Even though you are busy
>> and are critical towards my idea, you take your time to answer! Thanks.
>
> I appreciate the time you took to write down your ideas, and for your
> taking my comments in the spirit intended.

Well, I just think it is fun :-)

>
>> On Wed, 26 Jul 2006, Paul E. McKenney wrote:
>>
>>> On Thu, Jul 27, 2006 at 02:39:07AM +0100, Esben Nielsen wrote:
>>>>
>>>>
>>>> On Tue, 25 Jul 2006, Paul E. McKenney wrote:
>>>>
>>>>> Not for inclusion, should be viewed with great suspicion.
>>>>>
>>>>> This patch provides an NMI-safe realtime RCU.  Hopefully, Mathieu can
>>>>> make use of synchronize_sched() instead, since I have not yet figured
>>>>> out a way to make this NMI-safe and still get rid of the interrupt
>>>>> disabling.  ;-)
>>>>>
>>>>> 						Thanx, Paul
>>>>
>>>> I must say I don't understand all this. It looks very complicated. Is it
>>>> really needed?
>>>>
>>>> I have been thinking about the following design:
>>>>
>>>> void rcu_read_lock()
>>>> {
>>>> 	if (!in_interrupt())
>>>> 		current->rcu_read_lock_count++;
>>>> }
>>>> void rcu_read_unlock()
>>>> {
>>>> 	if (!in_interrupt())
>>>> 		current->rcu_read_lock_count--;
>>>> }
>>>>
>>>> Somewhere in schedule():
>>>>
>>>> 	rq->rcu_read_lock_count += prev->rcu_read_lock_count;
>>>> 	if (!rq->rcu_read_lock_count)
>>>> 		forward_waiting_rcu_jobs();
>>>> 	rq->rcu_read_lock_count -= next->rcu_read_lock_count;
>>>
>>> So rq->rcu_read_lock_count contains the sum of the counts of all
>>> tasks that were scheduled away from this CPU.
>>>
>>> What happens in face of the following sequence of events?
>>> Assume that all tasks stay on CPU 0 for the moment.
>>>
>>> o	Task A does rcu_read_lock().
>>>
>>> o	Task A is preempted.  rq->rcu_read_lock_count is nonzero.
>>>
>>> o	Task B runs and does rcu_read_lock().
>>>
>>> o	Task B is preempted (perhaps because it dropped the lock
>>> 	that was causing it to have high priority).  Regardless of
>>> 	the reason for the preemption, rq->rcu_read_lock_count is
>>> 	nonzero.
>>>
>>> o	Task C runs and does rcu_read_lock().
>>>
>>> o	Task C is preempted.  rq->rcu_read_lock_count is nonzero.
>>>
>>> o	Task A runs again, and does rcu_read_unlock().
>>>
>>> o	Task A is preempted.  rq->rcu_read_lock_count is nonzero.
>>>
>>> o	Task D runs and does rcu_read_lock().
>>>
>>> o	Task D is preempted.  rq->rcu_read_lock_count is nonzero.
>>>
>>> And so on.  As long as at least one of the preempted tasks is in an
>>> RCU critical section, you never do your forward_waiting_rcu_jobs(),
>>> and the grace period goes on forever.  Or at least until you OOM the
>>> machine.
>>>
>>> So what am I missing here?
>>
>> The boosting idea below. Then tasks A-D must be RT tasks for this to
>> happen. And the machine must anyway run out of RT tasks or it will
>> effectively lock up.
>
> I agree that boosting would drive any particular task out of its
> RCU read-side critical section, but I do not agree that you have
> identified a valid induction argument that proves that we run out
> of tasks (yet, anyway).  For one thing, tasks acquiring locks in
> the RCU read-side critical sections could be priority boosted,
> so that a small set of tasks could rise and fall in priority as
> RT tasks attempted to acquire locks held by this small set of
> tasks.  The RT tasks need not be running on the same CPU as the
> non-realtime "victim" tasks running in RCU read-side critical sections;
> therefore, the RT tasks need not be preempting these non-realtime
> tasks.
>
> To be more specific, imagine a four-CPU system with three RT tasks
> running on three of the four CPUs and a large number of non-realtime
> tasks running on all four CPUs.  Consider the code in __d_lookup(), which
> acquires dentry->d_lock under rcu_read_lock().  Imagine a situation
> where the three RT tasks are repeatedly traversing pathnames with a
> long common path prefix, with this same prefix being traversed by the
> non-realtime tasks.  This could result in the non-realtime tasks being
> repeatedly priority-boosted and preempted (preempted both by each
> other and by RT tasks).
>
> Over to you!
>
The PI code will never lower the priority lower than task->normal_prio.
So if you somewhere in schedule() set normal_prio to something high PI 
will not destroy it.

>>> I could imagine introducing a pair of counters per runqueue, but then
>>> we end up with the same issues with counter-flip that we have now.
>>>
>>> Another question -- what happens if a given CPU stays idle?  Wouldn't
>>> the callbacks then just start piling up on that CPU?
>>
>> How can a CPU stay idle? There is a tick every 2.5 ms. Even without that
>> the previous CPU can make it schedule if it sees the jobs piling up. Or if
>> that is considered too expensive, it can take over and forward the
>> jobs to the next CPU.
>
> Tickless idle, prized by embedded systems.  Here an idle CPU gets shut
> down if there is nothing in particular for it to do.  Classic RCU does a
> dance with CPUs entering tickless idle state in order to avoid expecting
> those CPUs to participate in future grace periods.
>
> This had its challenges for the bitmask-based Classic RCU, and would
> have a few additional challenges for your circular linked-list setup.

Can't the previous CPU just see that when it wants to send the RCU-jobs 
along to it, and send them to the next CPU instead?
Or when a CPU goes idle it can take itself out of the circle to avoid the 
extra load on the previous CPU.

>
> Current -rt RCU avoids this problem by dint of being too stupid to
> need to know what CPUs are alive or not.  My recent patch needs something
> similar to the tickless-idle change to Classic RCU.
>
>>>> Now what should forward_waiting_rcu_jobs() do?
>>>>
>>>> I imagine a circular data structure of all the CPUs. When a call_rcu() is
>>>> run on a CPU it is first added a list of jobs for that CPU. When
>>>> forward_waiting_rcu_jobs() is called all the pending jobs are forwarded to
>>>> the next CPU. The next CPU will bring it along the next CPU in the circle
>>>> along with it's own jobs. When jobs hit the original CPU they will be
>>>> executed. Or rather, when the CPU just before calls
>>>> forward_waiting_rcu_jobs(), it sends the jobs belonging to the next CPU to
>>>> the RCU-task of the next CPU, where they will be executed, instead of to
>>>> the scheduler (runqueue) of the next CPU, where it will just be send out
>>>> on
>>>> a
>>>> new roundtrip along the circle.
>>>>
>>>> If you use a structure like the plist then the forwarding procedure can be
>>>> done in O(number of online CPUs) time worst case - much less in the usual
>>>> case where the lists are almost empty.
>>>>
>>>> Now the problem is: What happens if a task in a rcu read-side lock is
>>>> migrated? Then the rcu_read_lock_count on the source CPU will stay in plus
>>>> while on the target CPU it will go in minus. This ought to be simply
>>>> fixeable by adding task->rcu_read_lock_count to the target runqueue before
>>>> migrating and subtracting it from the old runqueue after migrating. But
>>>> there is another problem: RCU-jobs refering to data used by the task being
>>>> migrated might have been forwarded from the target CPU. Thus the migration
>>>> task have to go back along the circle of CPUs and move all the relevant
>>>> RCU-jobs back to the target CPU to be forwarded again. This is also doable
>>>> in
>>>> number of CPUs between source and target times O(<number of online CPUs>)
>>>> (see above) time.
>>>
>>> So if I have the right (or wrong) pattern of task migrations, the RCU
>>> callbacks never get to their originating CPU?
>>
>> In principle, yes. But if the machine starts to migrate tasks that wildly
>> it won't get any work done anyway, because all its time is spent doing
>> migration.
>
> Since you are waiting for a context switch to move the RCU callbacks,
> the migration does not have to be all that wild.  In an eight-CPU system,
> if I migrate a preempted task a couple of times in the course of the
> eight context switches required for the callbacks to circle the loop
> of CPUs, we are in trouble, right?
>

Could be. The other idea I came up with below doesn't have that problem.

>>> Alternatively, if the task residing in the RCU read-side critical section
>>> is forwarded around the loop of CPUs, callbacks circulating around this
>>> loop might execute before the RCU read-side critical section completes.
>>
>> That is why some of the callbacks (those which has parsed the target CPU
>> but not yet the source CPU) have to be moved back to the target CPU.
>
> Yep.  Leaving them to follow their planned circuit indeed does not cut it.
>
>> I just came up with an even simpler solution:
>> Delay the subtraction of the task->rcu_read_lock_count from
>> srcrq->rcu_read_lock_count until the task calls rcu_read_unlock(). That
>> can be done by flagging the task (do task->rcu_read_lock_count |=
>> 0x80000000) and do a simple
>> 	if (unlikely(current->rcu_read_lock_count == 0x80000000))
>> 		fix_rcu_read_lock_count_on_old_cpu();
>> in rcu_read_unlock(). Now the task can be migrated again before calling
>> fix_rcu_read_lock_count_on_old_cpu(). The relevant RCU jobs still can't
>> get past the original CPU before the task have called
>> fix_rcu_read_lock_count_on_old_cpu(), so all subsequent migrations can just
>> do the count down on the intermediate CPUs right away.
>
> But now you need atomic instructions in all rcu_read_lock() and
> rcu_read_unlock() invocations to handle the possibility that some
> migrated task will decrement the counter at the same time we are
> incrementing it?  I cannot tell for sure because you have not supplied
> sample code for fix_rcu_read_lock_count_on_old_cpu().

Now the atomic part only kicks in if the task was actually migrated 
while in an RCU read-side section, so it is rare. But consider this code:

void fix_rcu_read_lock_count_on_old_cpu()
{
 	int cpuid = current->old_cpuid;
 	int count = current->old_rcu_read_lock_count;

 	runqueue_t *rq = cpu_rq(cpuid);
         spin_lock_irqsave(&rq->lock, flags);
 	rq->rcu_read_lock_count -= count;
         spin_unlock_irqrestore(&rq->lock, flags);
}

Then there can be something to handle the case where the CPU was taken out 
meanwhile. If rq->rcu_read_lock_count is 0 after this, then it can go 
completely out of the RCU CPU circle now (see below).

>
>>>> To avoid a task in a read-side lock being starved for too long the
>>>> following line can be added to normal_prio():
>>>>  if (p->rcu_read_lock_count)
>>>> 	p->prio = MAX_RT_PRIO;
>>>
>>> But doesn't this have the same impact on latency as disabling preemption
>>> in rcu_read_lock() and then re-enabling it in rcu_read_unlock()?
>>
>> No, RT tasks can still preempt the RCU read side lock. But SCHED_OTHER and
>> SCHED_BATCH can't. You can also make the RCU read-side boosting priority
>> dynamic and let the system adjust it, or just let the admin adjust it.
>
> Fair enough -- I misread MAX_RT_PRIO as MAX_PRIO.
>
> This approach I can get behind -- my thought has been to boost to
> either MAX_RT_PRIO or MAX_RT_PRIO-1 when preempt_schedule() sees that
> it is preempting an RCU read-side critical section.
>
> So I agree with you on at least one point!  ;-)
>
> A possible elaboration would be to keep a linked list of tasks preempted
> in their RCU read-side critical sections so that they can be further
> boosted to the highest possible priority (numerical value of zero,
> not sure what the proper symbol is) if the grace period takes too many
> jiffies to complete.  Another piece is priority boosting when blocking
> on a mutex from within an RCU read-side critical section.
>
>>> Also, doesn't this circular data structure need to handle CPU hotplug?
>>
>> Of course. I don't know about hotplug though. But it sounds simple to
>> migrate the tasks away, take the CPU out of the circle and then forward
>> the last RCU jobs from that CPU.
>
> Doing it efficiently is the difficulty, particularly for tickless-idle
> systems where CPUs need to be added and removed on a regular basis.
> Also, what locking design would you use in order to avoid deadlock?
> There is a hotplug mutex, but seems like you might need to acquire some
> number of rq mutexes as well.

I think it can be done very effectively. When taking a CPU out: If, after 
migrating all the tasks off the CPU, the rq->rcu_read_lock_count is 0, 
alter the previous CPUs next CPU to this CPUs next CPU. Then forward the 
pending RCU jobs. If some of the migrated tasks were in an RCU read-side 
section, rq->rcu_read_lock_count > 0. Then the CPU has to be kept in the 
RCU circle until all are done. Either an event can be set up, or the 
previous CPU can check on it every time it tries to forward jobs to the 
CPU.
When putting the CPU in again: Simply add it to the circle when it is up 
and running.

>
>>>> I don't have time to code this nor a SMP machine to test it on. But I can
>>>> give the idea to you anyways in the hope you might code it :-)
>>>
>>> I am beginning to think that it will not be at all simple by the time I
>>> code up all the required fixups.  Or am I missing something?
>>
>> Of course, implementing something is always a lot harder than writing the
>> idea down. Anyway, we already worked out some of the hardest details :-)
>
> I certainly agree with your first sentence.  On your second sentence,
> I would s/worked out/mentioned/  ;-)
>
> Another approach I am looking at does not permit rcu_read_lock() in
> NMI/SMI/hardirq, but is much simpler.  Its downside is that it cannot
> serve as common code between CONFIG_PREEMPT_RT and CONFIG_PREEMPT.
>
Iack :-( That is not good. :-(

Esben

> 							Thanx, Paul
>
>>>>> Signed-off-by: Paul E. McKenney <paulmck@us.ibm.com> (but please don't
>>>>> apply)
>>>>>
>>>>> include/linux/sched.h |    3
>>>>> kernel/rcupreempt.c   |  577
>>>>> ++++++++++++++++++++++++++++++++++++++++----------
>>>>> 2 files changed, 470 insertions(+), 110 deletions(-)
>>>>>
>>>>> diff -urpNa -X dontdiff linux-2.6.17-rt7/include/linux/sched.h
>>>>> linux-2.6.17-rt7-norrupt/include/linux/sched.h
>>>>> --- linux-2.6.17-rt7/include/linux/sched.h	2006-07-19
>>>>> 01:43:09.000000000 -0700
>>>>> +++ linux-2.6.17-rt7-norrupt/include/linux/sched.h	2006-07-22
>>>>> 15:44:36.000000000 -0700
>>>>> @@ -868,8 +868,7 @@ struct task_struct {
>>>>>
>>>>> #ifdef CONFIG_PREEMPT_RCU
>>>>> 	int rcu_read_lock_nesting;
>>>>> -	atomic_t *rcu_flipctr1;
>>>>> -	atomic_t *rcu_flipctr2;
>>>>> +	int rcu_flipctr_idx;
>>>>> #endif
>>>>> #ifdef CONFIG_SCHEDSTATS
>>>>> 	struct sched_info sched_info;
>>>>> diff -urpNa -X dontdiff linux-2.6.17-rt7/kernel/rcupreempt.c
>>>>> linux-2.6.17-rt7-norrupt/kernel/rcupreempt.c
>>>>> --- linux-2.6.17-rt7/kernel/rcupreempt.c	2006-07-19
>>>>> 01:43:09.000000000 -0700
>>>>> +++ linux-2.6.17-rt7-norrupt/kernel/rcupreempt.c	2006-07-22
>>>>> 20:21:46.000000000 -0700
>>>>> @@ -15,11 +15,13 @@
>>>>> * along with this program; if not, write to the Free Software
>>>>> * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
>>>>> USA.
>>>>> *
>>>>> - * Copyright (C) IBM Corporation, 2001
>>>>> + * Copyright (C) IBM Corporation, 2006
>>>>> *
>>>>> * Authors: Paul E. McKenney <paulmck@us.ibm.com>
>>>>> *		With thanks to Esben Nielsen, Bill Huey, and Ingo Molnar
>>>>> - *		for pushing me away from locks and towards counters.
>>>>> + *		for pushing me away from locks and towards counters, and
>>>>> + *		to Suparna Bhattacharya for pushing me completely away
>>>>> + *		from atomic instructions on the read side.
>>>>> *
>>>>> * Papers:  http://www.rdrop.com/users/paulmck/RCU
>>>>> *
>>>>> @@ -73,12 +75,20 @@ struct rcu_data {
>>>>> 	long		n_done_remove;
>>>>> 	atomic_t	n_done_invoked;
>>>>> 	long		n_rcu_check_callbacks;
>>>>> -	atomic_t	n_rcu_try_flip1;
>>>>> -	long		n_rcu_try_flip2;
>>>>> -	long		n_rcu_try_flip3;
>>>>> +	atomic_t	n_rcu_try_flip_1;
>>>>> 	atomic_t	n_rcu_try_flip_e1;
>>>>> -	long		n_rcu_try_flip_e2;
>>>>> -	long		n_rcu_try_flip_e3;
>>>>> +	long		n_rcu_try_flip_i1;
>>>>> +	long		n_rcu_try_flip_ie1;
>>>>> +	long		n_rcu_try_flip_g1;
>>>>> +	long		n_rcu_try_flip_a1;
>>>>> +	long		n_rcu_try_flip_ae1;
>>>>> +	long		n_rcu_try_flip_a2;
>>>>> +	long		n_rcu_try_flip_z1;
>>>>> +	long		n_rcu_try_flip_ze1;
>>>>> +	long		n_rcu_try_flip_z2;
>>>>> +	long		n_rcu_try_flip_m1;
>>>>> +	long		n_rcu_try_flip_me1;
>>>>> +	long		n_rcu_try_flip_m2;
>>>>> #endif /* #ifdef CONFIG_RCU_STATS */
>>>>> };
>>>>> struct rcu_ctrlblk {
>>>>> @@ -90,8 +100,51 @@ static struct rcu_ctrlblk rcu_ctrlblk =
>>>>> 	.fliplock = RAW_SPIN_LOCK_UNLOCKED,
>>>>> 	.completed = 0,
>>>>> };
>>>>> -static DEFINE_PER_CPU(atomic_t [2], rcu_flipctr) =
>>>>> -	{ ATOMIC_INIT(0), ATOMIC_INIT(0) };
>>>>> +static DEFINE_PER_CPU(int [2], rcu_flipctr) = { 0, 0 };
>>>>> +
>>>>> +/*
>>>>> + * States for rcu_try_flip() and friends.
>>>>> + */
>>>>> +
>>>>> +enum rcu_try_flip_states {
>>>>> +	rcu_try_flip_idle_state,	/* "I" */
>>>>> +	rcu_try_flip_in_gp_state,	/* "G" */
>>>>> +	rcu_try_flip_waitack_state, 	/* "A" */
>>>>> +	rcu_try_flip_waitzero_state,	/* "Z" */
>>>>> +	rcu_try_flip_waitmb_state	/* "M" */
>>>>> +};
>>>>> +static enum rcu_try_flip_states rcu_try_flip_state =
>>>>> rcu_try_flip_idle_state;
>>>>> +#ifdef CONFIG_RCU_STATS
>>>>> +static char *rcu_try_flip_state_names[] =
>>>>> +	{ "idle", "gp", "waitack", "waitzero", "waitmb" };
>>>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> +
>>>>> +/*
>>>>> + * Enum and per-CPU flag to determine when each CPU has seen
>>>>> + * the most recent counter flip.
>>>>> + */
>>>>> +
>>>>> +enum rcu_flip_flag_values {
>>>>> +	rcu_flip_seen,		/* Steady/initial state, last flip seen. */
>>>>> +				/* Only GP detector can update. */
>>>>> +	rcu_flipped		/* Flip just completed, need confirmation. */
>>>>> +				/* Only corresponding CPU can update. */
>>>>> +};
>>>>> +static DEFINE_PER_CPU(enum rcu_flip_flag_values, rcu_flip_flag) =
>>>>> rcu_flip_seen;
>>>>> +
>>>>> +/*
>>>>> + * Enum and per-CPU flag to determine when each CPU has executed the
>>>>> + * needed memory barrier to fence in memory references from its last RCU
>>>>> + * read-side critical section in the just-completed grace period.
>>>>> + */
>>>>> +
>>>>> +enum rcu_mb_flag_values {
>>>>> +	rcu_mb_done,		/* Steady/initial state, no mb()s required.
>>>>> */
>>>>> +				/* Only GP detector can update. */
>>>>> +	rcu_mb_needed		/* Flip just completed, need an mb(). */
>>>>> +				/* Only corresponding CPU can update. */
>>>>> +};
>>>>> +static DEFINE_PER_CPU(enum rcu_mb_flag_values, rcu_mb_flag) =
>>>>> rcu_mb_done;
>>>>>
>>>>> /*
>>>>> * Return the number of RCU batches processed thus far.  Useful
>>>>> @@ -105,93 +158,182 @@ long rcu_batches_completed(void)
>>>>> void
>>>>> rcu_read_lock(void)
>>>>> {
>>>>> -	int flipctr;
>>>>> +	int idx;
>>>>> +	int nesting;
>>>>> 	unsigned long oldirq;
>>>>>
>>>>> -	local_irq_save(oldirq);
>>>>> -
>>>>> +	raw_local_irq_save(oldirq);
>>>>> 	trace_special(current->rcu_read_lock_nesting,
>>>>> 		      (unsigned long) current->rcu_flipctr1,
>>>>> 		      rcu_ctrlblk.completed);
>>>>> -	if (current->rcu_read_lock_nesting++ == 0) {
>>>>> +
>>>>> +	nesting = current->rcu_read_lock_nesting;
>>>>> +
>>>>> +	/*
>>>>> +	 * Any rcu_read_lock()s called from NMI handlers
>>>>> +	 * at any point must have matching rcu_read_unlock()
>>>>> +	 * calls in that same handler, so we will not see the
>>>>> +	 * value of current->rcu_read_lock_nesting change.
>>>>> +	 */
>>>>> +
>>>>> +	if (nesting != 0) {
>>>>>
>>>>> 		/*
>>>>> -		 * Outermost nesting of rcu_read_lock(), so atomically
>>>>> +		 * There is an enclosing rcu_read_lock(), so all we
>>>>> +		 * need to do is to increment the counter.
>>>>> +		 */
>>>>> +
>>>>> +		current->rcu_read_lock_nesting = nesting + 1;
>>>>> +
>>>>> +	} else {
>>>>> +
>>>>> +		/*
>>>>> +		 * Outermost nesting of rcu_read_lock(), so we must
>>>>> 		 * increment the current counter for the current CPU.
>>>>> +		 * This must be done carefully, because NMIs can
>>>>> +		 * occur at any point in this code, and any rcu_read_lock()
>>>>> +		 * and rcu_read_unlock() pairs in the NMI handlers
>>>>> +		 * must interact non-destructively with this code.
>>>>> +		 * Lots of barrier() calls, and -very- careful ordering.
>>>>> +		 *
>>>>> +		 * Changes to this code, including this one, must be
>>>>> +		 * inspected, validated, and tested extremely carefully!!!
>>>>> 		 */
>>>>>
>>>>> -		flipctr = rcu_ctrlblk.completed & 0x1;
>>>>> +		/*
>>>>> +		 * First, pick up the index.  Enforce ordering for
>>>>> +		 * both compilers and for DEC Alpha.
>>>>> +		 */
>>>>> +
>>>>> +		idx = rcu_ctrlblk.completed & 0x1;
>>>>> 		smp_read_barrier_depends();
>>>>> -		current->rcu_flipctr1 =
>>>>> &(__get_cpu_var(rcu_flipctr)[flipctr]);
>>>>> -		/* Can optimize to non-atomic on fastpath, but start simple.
>>>>> */
>>>>> -		atomic_inc(current->rcu_flipctr1);
>>>>> -		smp_mb__after_atomic_inc();  /* might optimize out... */
>>>>> -		if (unlikely(flipctr != (rcu_ctrlblk.completed & 0x1))) {
>>>>> -
>>>>> -			/*
>>>>> -			 * We raced with grace-period processing (flip).
>>>>> -			 * Although we cannot be preempted here, there
>>>>> -			 * could be interrupts, ECC errors and the like,
>>>>> -			 * so just nail down both sides of the rcu_flipctr
>>>>> -			 * array for the duration of our RCU read-side
>>>>> -			 * critical section, preventing a second flip
>>>>> -			 * from racing with us.  At some point, it would
>>>>> -			 * be safe to decrement one of the counters, but
>>>>> -			 * we have no way of knowing when that would be.
>>>>> -			 * So just decrement them both in rcu_read_unlock().
>>>>> -			 */
>>>>> -
>>>>> -			current->rcu_flipctr2 =
>>>>> -				&(__get_cpu_var(rcu_flipctr)[!flipctr]);
>>>>> -			/* Can again optimize to non-atomic on fastpath. */
>>>>> -			atomic_inc(current->rcu_flipctr2);
>>>>> -			smp_mb__after_atomic_inc();  /* might optimize
>>>>> out... */
>>>>> -		}
>>>>> +		barrier();
>>>>> +
>>>>> +		/*
>>>>> +		 * Increment the per-CPU counter. NMI handlers
>>>>> +		 * might increment it as well, but they had better
>>>>> +		 * properly nest their rcu_read_lock()/rcu_read_unlock()
>>>>> +		 * pairs so that the value is restored before the handler
>>>>> +		 * returns to us.  Enforce ordering for compilers.
>>>>> +		 */
>>>>> +
>>>>> +		__get_cpu_var(rcu_flipctr)[idx]++;
>>>>> +		barrier();
>>>>> +
>>>>> +		/*
>>>>> +		 * It is now safe to increment the task's nesting count.
>>>>> +		 * NMIs that occur after this statement will route
>>>>> +		 * their rcu_read_lock() calls through the "then" clause
>>>>> +		 * of this "if" statement, and thus will no longer come
>>>>> +		 * through this path.  Enforce ordering for compilers.
>>>>> +		 */
>>>>> +
>>>>> +		current->rcu_read_lock_nesting = nesting + 1;
>>>>> +		barrier();
>>>>> +
>>>>> +		/*
>>>>> +		 * It is now safe to store the index into our task
>>>>> +		 * structure.  Doing so earlier would have resulted
>>>>> +		 * in it getting clobbered by NMI handlers.
>>>>> +		 */
>>>>> +
>>>>> +		current->rcu_flipctr_idx = idx;
>>>>> 	}
>>>>> +
>>>>> 	trace_special((unsigned long) current->rcu_flipctr1,
>>>>> 		      (unsigned long) current->rcu_flipctr2,
>>>>> 		      rcu_ctrlblk.completed);
>>>>> -	local_irq_restore(oldirq);
>>>>> +	raw_local_irq_restore(oldirq);
>>>>> }
>>>>>
>>>>> void
>>>>> rcu_read_unlock(void)
>>>>> {
>>>>> +	int idx;
>>>>> +	int nesting;
>>>>> 	unsigned long oldirq;
>>>>>
>>>>> -	local_irq_save(oldirq);
>>>>> +	raw_local_irq_save(oldirq);
>>>>> 	trace_special((unsigned long) current->rcu_flipctr1,
>>>>> 		      (unsigned long) current->rcu_flipctr2,
>>>>> 		      current->rcu_read_lock_nesting);
>>>>> -	if (--current->rcu_read_lock_nesting == 0) {
>>>>> +
>>>>> +	nesting = current->rcu_read_lock_nesting;
>>>>> +
>>>>> +	/*
>>>>> +	 * Any rcu_read_lock()s called from NMI handlers
>>>>> +	 * at any point must have matching rcu_read_unlock()
>>>>> +	 * calls in that same handler, so we will not see the
>>>>> +	 * value of current->rcu_read_lock_nesting change.
>>>>> +	 */
>>>>> +
>>>>> +	if (nesting > 1) {
>>>>>
>>>>> 		/*
>>>>> -		 * Just atomically decrement whatever we incremented.
>>>>> -		 * Might later want to awaken some task waiting for the
>>>>> -		 * grace period to complete, but keep it simple for the
>>>>> -		 * moment.
>>>>> +		 * There is an enclosing rcu_read_lock(), so all we
>>>>> +		 * need to do is to decrement the counter.
>>>>> 		 */
>>>>>
>>>>> -		smp_mb__before_atomic_dec();
>>>>> -		atomic_dec(current->rcu_flipctr1);
>>>>> -		current->rcu_flipctr1 = NULL;
>>>>> -		if (unlikely(current->rcu_flipctr2 != NULL)) {
>>>>> -			atomic_dec(current->rcu_flipctr2);
>>>>> -			current->rcu_flipctr2 = NULL;
>>>>> -		}
>>>>> +		current->rcu_read_lock_nesting = nesting - 1;
>>>>> +
>>>>> +	} else {
>>>>> +
>>>>> +		/*
>>>>> +		 * Outermost nesting of rcu_read_unlock(), so we must
>>>>> +		 * decrement the current counter for the current CPU.
>>>>> +		 * This must be done carefully, because NMIs can
>>>>> +		 * occur at any point in this code, and any rcu_read_lock()
>>>>> +		 * and rcu_read_unlock() pairs in the NMI handlers
>>>>> +		 * must interact non-destructively with this code.
>>>>> +		 * Lots of barrier() calls, and -very- careful ordering.
>>>>> +		 *
>>>>> +		 * Changes to this code, including this one, must be
>>>>> +		 * inspected, validated, and tested extremely carefully!!!
>>>>> +		 */
>>>>> +
>>>>> +		/*
>>>>> +		 * First, pick up the index.  Enforce ordering for
>>>>> +		 * both compilers and for DEC Alpha.
>>>>> +		 */
>>>>> +
>>>>> +		idx = current->rcu_flipctr_idx;
>>>>> +		smp_read_barrier_depends();
>>>>> +		barrier();
>>>>> +
>>>>> +		/*
>>>>> +		 * It is now safe to decrement the task's nesting count.
>>>>> +		 * NMIs that occur after this statement will route
>>>>> +		 * their rcu_read_lock() calls through this "else" clause
>>>>> +		 * of this "if" statement, and thus will start incrementing
>>>>> +		 * the per-CPU counter on their own.  Enforce ordering for
>>>>> +		 * compilers.
>>>>> +		 */
>>>>> +
>>>>> +		current->rcu_read_lock_nesting = nesting - 1;
>>>>> +		barrier();
>>>>> +
>>>>> +		/*
>>>>> +		 * Decrement the per-CPU counter. NMI handlers
>>>>> +		 * might increment it as well, but they had better
>>>>> +		 * properly nest their rcu_read_lock()/rcu_read_unlock()
>>>>> +		 * pairs so that the value is restored before the handler
>>>>> +		 * returns to us.
>>>>> +		 */
>>>>> +
>>>>> +		__get_cpu_var(rcu_flipctr)[idx]--;
>>>>> 	}
>>>>>
>>>>> 	trace_special((unsigned long)current->rcu_flipctr1,
>>>>> 		      (unsigned long) current->rcu_flipctr2,
>>>>> 		      current->rcu_read_lock_nesting);
>>>>> -	local_irq_restore(oldirq);
>>>>> +	raw_local_irq_restore(oldirq);
>>>>> }
>>>>>
>>>>> static void
>>>>> __rcu_advance_callbacks(void)
>>>>> {
>>>>>
>>>>> -	if (rcu_data.completed != rcu_ctrlblk.completed) {
>>>>> +	if ((rcu_data.completed >> 1) != (rcu_ctrlblk.completed >> 1)) {
>>>>> 		if (rcu_data.waitlist != NULL) {
>>>>> 			*rcu_data.donetail = rcu_data.waitlist;
>>>>> 			rcu_data.donetail = rcu_data.waittail;
>>>>> @@ -216,13 +358,186 @@ __rcu_advance_callbacks(void)
>>>>> 			rcu_data.waittail = &rcu_data.waitlist;
>>>>> 		}
>>>>> 		rcu_data.completed = rcu_ctrlblk.completed;
>>>>> +	} else if (rcu_data.completed != rcu_ctrlblk.completed) {
>>>>> +		rcu_data.completed = rcu_ctrlblk.completed;
>>>>> +	}
>>>>> +}
>>>>> +
>>>>> +/*
>>>>> + * Get here when RCU is idle.  Decide whether we need to
>>>>> + * move out of idle state, and return zero if so.
>>>>> + * "Straightforward" approach for the moment, might later
>>>>> + * use callback-list lengths, grace-period duration, or
>>>>> + * some such to determine when to exit idle state.
>>>>> + * Might also need a pre-idle test that does not acquire
>>>>> + * the lock, but let's get the simple case working first...
>>>>> + */
>>>>> +
>>>>> +static int
>>>>> +rcu_try_flip_idle(int flipctr)
>>>>> +{
>>>>> +#ifdef CONFIG_RCU_STATS
>>>>> +	rcu_data.n_rcu_try_flip_i1++;
>>>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> +
>>>>> +	if (!rcu_pending(smp_processor_id())) {
>>>>> +#ifdef CONFIG_RCU_STATS
>>>>> +		rcu_data.n_rcu_try_flip_ie1++;
>>>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> +		return 1;
>>>>> 	}
>>>>> +	return 0;
>>>>> +}
>>>>> +
>>>>> +/*
>>>>> + * Flip processing up to and including the flip, as well as
>>>>> + * telling CPUs to acknowledge the flip.
>>>>> + */
>>>>> +
>>>>> +static int
>>>>> +rcu_try_flip_in_gp(int flipctr)
>>>>> +{
>>>>> +	int cpu;
>>>>> +
>>>>> +#ifdef CONFIG_RCU_STATS
>>>>> +	rcu_data.n_rcu_try_flip_g1++;
>>>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> +
>>>>> +	/*
>>>>> +	 * Do the flip.
>>>>> +	 */
>>>>> +
>>>>> +	rcu_ctrlblk.completed++;  /* stands in for rcu_try_flip_g2 */
>>>>> +
>>>>> +	/*
>>>>> +	 * Need a memory barrier so that other CPUs see the new
>>>>> +	 * counter value before they see the subsequent change of all
>>>>> +	 * the rcu_flip_flag instances to rcu_flipped.
>>>>> +	 */
>>>>> +
>>>>> +	smp_mb();
>>>>> +
>>>>> +	/* Now ask each CPU for acknowledgement of the flip. */
>>>>> +
>>>>> +	for_each_cpu(cpu)
>>>>> +		per_cpu(rcu_flip_flag, cpu) = rcu_flipped;
>>>>> +
>>>>> +	return 0;
>>>>> +}
>>>>> +
>>>>> +/*
>>>>> + * Wait for CPUs to acknowledge the flip.
>>>>> + */
>>>>> +
>>>>> +static int
>>>>> +rcu_try_flip_waitack(int flipctr)
>>>>> +{
>>>>> +	int cpu;
>>>>> +
>>>>> +#ifdef CONFIG_RCU_STATS
>>>>> +	rcu_data.n_rcu_try_flip_a1++;
>>>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> +
>>>>> +	for_each_cpu(cpu)
>>>>> +		if (per_cpu(rcu_flip_flag, cpu) != rcu_flip_seen) {
>>>>> +#ifdef CONFIG_RCU_STATS
>>>>> +			rcu_data.n_rcu_try_flip_ae1++;
>>>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> +			return 1;
>>>>> +		}
>>>>> +
>>>>> +	/*
>>>>> +	 * Make sure our checks above don't bleed into subsequent
>>>>> +	 * waiting for the sum of the counters to reach zero.
>>>>> +	 */
>>>>> +
>>>>> +	smp_mb();
>>>>> +
>>>>> +#ifdef CONFIG_RCU_STATS
>>>>> +	rcu_data.n_rcu_try_flip_a2++;
>>>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> +
>>>>> +	return 0;
>>>>> +}
>>>>> +
>>>>> +/*
>>>>> + * Wait for collective ``last'' counter to reach zero,
>>>>> + * then tell all CPUs to do an end-of-grace-period memory barrier.
>>>>> + */
>>>>> +
>>>>> +static int
>>>>> +rcu_try_flip_waitzero(int flipctr)
>>>>> +{
>>>>> +	int cpu;
>>>>> +	int lastidx = !(flipctr & 0x1);
>>>>> +	int sum = 0;
>>>>> +
>>>>> +#ifdef CONFIG_RCU_STATS
>>>>> +	rcu_data.n_rcu_try_flip_z1++;
>>>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> +
>>>>> +	/* Check to see if the sum of the "last" counters is zero. */
>>>>> +
>>>>> +	for_each_cpu(cpu)
>>>>> +		sum += per_cpu(rcu_flipctr, cpu)[lastidx];
>>>>> +	if (sum != 0) {
>>>>> +#ifdef CONFIG_RCU_STATS
>>>>> +		rcu_data.n_rcu_try_flip_ze1++;
>>>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> +		return 1;
>>>>> +	}
>>>>> +
>>>>> +#ifdef CONFIG_RCU_STATS
>>>>> +	rcu_data.n_rcu_try_flip_z2++;
>>>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> +
>>>>> +	/* Make sure we don't call for memory barriers before we see zero. */
>>>>> +
>>>>> +	smp_mb();
>>>>> +
>>>>> +	/* Call for a memory barrier from each CPU. */
>>>>> +
>>>>> +	for_each_cpu(cpu)
>>>>> +		per_cpu(rcu_mb_flag, cpu) = rcu_mb_needed;
>>>>> +
>>>>> +	return 0;
>>>>> +}
>>>>> +
>>>>> +/*
>>>>> + * Wait for all CPUs to do their end-of-grace-period memory barrier.
>>>>> + * Return 0 once all CPUs have done so.
>>>>> + */
>>>>> +
>>>>> +static int
>>>>> +rcu_try_flip_waitmb(int flipctr)
>>>>> +{
>>>>> +	int cpu;
>>>>> +
>>>>> +#ifdef CONFIG_RCU_STATS
>>>>> +	rcu_data.n_rcu_try_flip_m1++;
>>>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> +
>>>>> +	for_each_cpu(cpu)
>>>>> +		if (per_cpu(rcu_mb_flag, cpu) != rcu_mb_done) {
>>>>> +#ifdef CONFIG_RCU_STATS
>>>>> +			rcu_data.n_rcu_try_flip_me1++;
>>>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> +			return 1;
>>>>> +		}
>>>>> +
>>>>> +	smp_mb(); /* Ensure that the above checks precede any following
>>>>> flip. */
>>>>> +
>>>>> +#ifdef CONFIG_RCU_STATS
>>>>> +	rcu_data.n_rcu_try_flip_m2++;
>>>>> +#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> +
>>>>> +	return 0;
>>>>> }
>>>>>
>>>>> /*
>>>>> * Attempt a single flip of the counters.  Remember, a single flip does
>>>>> * -not- constitute a grace period.  Instead, the interval between
>>>>> - * a pair of consecutive flips is a grace period.
>>>>> + * at least three consecutive flips is a grace period.
>>>>> *
>>>>> * If anyone is nuts enough to run this CONFIG_PREEMPT_RCU implementation
>>>>> * on a large SMP, they might want to use a hierarchical organization of
>>>>> @@ -231,13 +546,11 @@ __rcu_advance_callbacks(void)
>>>>> static void
>>>>> rcu_try_flip(void)
>>>>> {
>>>>> -	int cpu;
>>>>> 	long flipctr;
>>>>> 	unsigned long oldirq;
>>>>>
>>>>> -	flipctr = rcu_ctrlblk.completed;
>>>>> #ifdef CONFIG_RCU_STATS
>>>>> -	atomic_inc(&rcu_data.n_rcu_try_flip1);
>>>>> +	atomic_inc(&rcu_data.n_rcu_try_flip_1);
>>>>> #endif /* #ifdef CONFIG_RCU_STATS */
>>>>> 	if (unlikely(!spin_trylock_irqsave(&rcu_ctrlblk.fliplock, oldirq))) {
>>>>> #ifdef CONFIG_RCU_STATS
>>>>> @@ -245,52 +558,82 @@ rcu_try_flip(void)
>>>>> #endif /* #ifdef CONFIG_RCU_STATS */
>>>>> 		return;
>>>>> 	}
>>>>> -	if (unlikely(flipctr != rcu_ctrlblk.completed)) {
>>>>> -
>>>>> -		/* Our work is done!  ;-) */
>>>>> -
>>>>> -#ifdef CONFIG_RCU_STATS
>>>>> -		rcu_data.n_rcu_try_flip_e2++;
>>>>> -#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> -		spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
>>>>> -		return;
>>>>> -	}
>>>>> -	flipctr &= 0x1;
>>>>>
>>>>> 	/*
>>>>> -	 * Check for completion of all RCU read-side critical sections
>>>>> -	 * that started prior to the previous flip.
>>>>> +	 * Take the next transition(s) through the RCU grace-period
>>>>> +	 * flip-counter state machine.
>>>>> 	 */
>>>>>
>>>>> -#ifdef CONFIG_RCU_STATS
>>>>> -	rcu_data.n_rcu_try_flip2++;
>>>>> -#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> -	for_each_cpu(cpu) {
>>>>> -		if (atomic_read(&per_cpu(rcu_flipctr, cpu)[!flipctr]) != 0) {
>>>>> -#ifdef CONFIG_RCU_STATS
>>>>> -			rcu_data.n_rcu_try_flip_e3++;
>>>>> -#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> -			spin_unlock_irqrestore(&rcu_ctrlblk.fliplock,
>>>>> oldirq);
>>>>> -			return;
>>>>> -		}
>>>>> +	flipctr = rcu_ctrlblk.completed;
>>>>> +	switch (rcu_try_flip_state) {
>>>>> +	case rcu_try_flip_idle_state:
>>>>> +		if (rcu_try_flip_idle(flipctr))
>>>>> +			break;
>>>>> +		rcu_try_flip_state = rcu_try_flip_in_gp_state;
>>>>> +	case rcu_try_flip_in_gp_state:
>>>>> +		if (rcu_try_flip_in_gp(flipctr))
>>>>> +			break;
>>>>> +		rcu_try_flip_state = rcu_try_flip_waitack_state;
>>>>> +	case rcu_try_flip_waitack_state:
>>>>> +		if (rcu_try_flip_waitack(flipctr))
>>>>> +			break;
>>>>> +		rcu_try_flip_state = rcu_try_flip_waitzero_state;
>>>>> +	case rcu_try_flip_waitzero_state:
>>>>> +		if (rcu_try_flip_waitzero(flipctr))
>>>>> +			break;
>>>>> +		rcu_try_flip_state = rcu_try_flip_waitmb_state;
>>>>> +	case rcu_try_flip_waitmb_state:
>>>>> +		if (rcu_try_flip_waitmb(flipctr))
>>>>> +			break;
>>>>> +		rcu_try_flip_state = rcu_try_flip_idle_state;
>>>>> 	}
>>>>> +	spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
>>>>> +}
>>>>>
>>>>> -	/* Do the flip. */
>>>>> +/*
>>>>> + * Check to see if this CPU needs to report that it has seen the most
>>>>> + * recent counter flip, thereby declaring that all subsequent
>>>>> + * rcu_read_lock() invocations will respect this flip.
>>>>> + */
>>>>>
>>>>> -	smp_mb();
>>>>> -	rcu_ctrlblk.completed++;
>>>>> +static void
>>>>> +rcu_check_flipseen(int cpu)
>>>>> +{
>>>>> +	if (per_cpu(rcu_flip_flag, cpu) == rcu_flipped) {
>>>>> +		smp_mb();  /* Subsequent counter accesses must see new
>>>>> value */
>>>>> +		per_cpu(rcu_flip_flag, cpu) = rcu_flip_seen;
>>>>> +		smp_mb();  /* probably be implied by interrupt, but... */
>>>>> +	}
>>>>> +}
>>>>>
>>>>> -#ifdef CONFIG_RCU_STATS
>>>>> -	rcu_data.n_rcu_try_flip3++;
>>>>> -#endif /* #ifdef CONFIG_RCU_STATS */
>>>>> -	spin_unlock_irqrestore(&rcu_ctrlblk.fliplock, oldirq);
>>>>> +/*
>>>>> + * Check to see if this CPU needs to do a memory barrier in order to
>>>>> + * ensure that any prior RCU read-side critical sections have committed
>>>>> + * their counter manipulations and critical-section memory references
>>>>> + * before declaring the grace period to be completed.
>>>>> + */
>>>>> +
>>>>> +static void
>>>>> +rcu_check_mb(int cpu)
>>>>> +{
>>>>> +	if (per_cpu(rcu_mb_flag, cpu) == rcu_mb_needed) {
>>>>> +		smp_mb();
>>>>> +		per_cpu(rcu_mb_flag, cpu) = rcu_mb_done;
>>>>> +	}
>>>>> }
>>>>>
>>>>> +/*
>>>>> + * This function is called periodically on each CPU in hardware
>>>>> interrupt
>>>>> + * context.
>>>>> + */
>>>>> +
>>>>> void
>>>>> rcu_check_callbacks(int cpu, int user)
>>>>> {
>>>>> 	unsigned long oldirq;
>>>>>
>>>>> +	rcu_check_flipseen(cpu);
>>>>> +	rcu_check_mb(cpu);
>>>>> 	if (rcu_ctrlblk.completed == rcu_data.completed) {
>>>>> 		rcu_try_flip();
>>>>> 		if (rcu_ctrlblk.completed == rcu_data.completed) {
>>>>> @@ -359,10 +702,10 @@ call_rcu(struct rcu_head *head,
>>>>> }
>>>>>
>>>>> /*
>>>>> - * Crude hack, reduces but does not eliminate possibility of failure.
>>>>> - * Needs to wait for all CPUs to pass through a -voluntary- context
>>>>> - * switch to eliminate possibility of failure.  (Maybe just crank
>>>>> - * priority down...)
>>>>> + * Wait until all currently running preempt_disable() code segments
>>>>> + * (including hardware-irq-disable segments) complete.  Note that
>>>>> + * in -rt this does -not- necessarily result in all currently executing
>>>>> + * interrupt -handlers- having completed.
>>>>> */
>>>>> void
>>>>> synchronize_sched(void)
>>>>> @@ -390,7 +733,7 @@ rcu_pending(int cpu)
>>>>>
>>>>> void __init rcu_init(void)
>>>>> {
>>>>> -/*&&&&*/printk("WARNING: experimental RCU implementation.\n");
>>>>> +/*&&&&*/printk("WARNING: experimental non-atomic RCU
>>>>> implementation.\n");
>>>>> 	spin_lock_init(&rcu_data.lock);
>>>>> 	rcu_data.completed = 0;
>>>>> 	rcu_data.nextlist = NULL;
>>>>> @@ -416,7 +759,8 @@ int rcu_read_proc_data(char *page)
>>>>> 	return sprintf(page,
>>>>> 		       "ggp=%ld lgp=%ld rcc=%ld\n"
>>>>> 		       "na=%ld nl=%ld wa=%ld wl=%ld da=%ld dl=%ld dr=%ld
>>>>> 		       di=%d\n"
>>>>> -		       "rtf1=%d rtf2=%ld rtf3=%ld rtfe1=%d rtfe2=%ld
>>>>> rtfe3=%ld\n",
>>>>> +		       "1=%d e1=%d i1=%ld ie1=%ld g1=%ld a1=%ld ae1=%ld
>>>>> a2=%ld\n"
>>>>> +		       "z1=%ld ze1=%ld z2=%ld m1=%ld me1=%ld m2=%ld\n",
>>>>>
>>>>> 		       rcu_ctrlblk.completed,
>>>>> 		       rcu_data.completed,
>>>>> @@ -431,12 +775,20 @@ int rcu_read_proc_data(char *page)
>>>>> 		       rcu_data.n_done_remove,
>>>>> 		       atomic_read(&rcu_data.n_done_invoked),
>>>>>
>>>>> -		       atomic_read(&rcu_data.n_rcu_try_flip1),
>>>>> -		       rcu_data.n_rcu_try_flip2,
>>>>> -		       rcu_data.n_rcu_try_flip3,
>>>>> +		       atomic_read(&rcu_data.n_rcu_try_flip_1),
>>>>> 		       atomic_read(&rcu_data.n_rcu_try_flip_e1),
>>>>> -		       rcu_data.n_rcu_try_flip_e2,
>>>>> -		       rcu_data.n_rcu_try_flip_e3);
>>>>> +		       rcu_data.n_rcu_try_flip_i1,
>>>>> +		       rcu_data.n_rcu_try_flip_ie1,
>>>>> +		       rcu_data.n_rcu_try_flip_g1,
>>>>> +		       rcu_data.n_rcu_try_flip_a1,
>>>>> +		       rcu_data.n_rcu_try_flip_ae1,
>>>>> +		       rcu_data.n_rcu_try_flip_a2,
>>>>> +		       rcu_data.n_rcu_try_flip_z1,
>>>>> +		       rcu_data.n_rcu_try_flip_ze1,
>>>>> +		       rcu_data.n_rcu_try_flip_z2,
>>>>> +		       rcu_data.n_rcu_try_flip_m1,
>>>>> +		       rcu_data.n_rcu_try_flip_me1,
>>>>> +		       rcu_data.n_rcu_try_flip_m2);
>>>>> }
>>>>>
>>>>> int rcu_read_proc_gp_data(char *page)
>>>>> @@ -464,14 +816,23 @@ int rcu_read_proc_ctrs_data(char *page)
>>>>> 	int cpu;
>>>>> 	int f = rcu_data.completed & 0x1;
>>>>>
>>>>> -	cnt += sprintf(&page[cnt], "CPU last cur\n");
>>>>> +	cnt += sprintf(&page[cnt], "CPU last cur F M\n");
>>>>> 	for_each_cpu(cpu) {
>>>>> -		cnt += sprintf(&page[cnt], "%3d %4d %3d\n",
>>>>> +		cnt += sprintf(&page[cnt], "%3d %4d %3d %d %d\n",
>>>>> 			       cpu,
>>>>> -			       atomic_read(&per_cpu(rcu_flipctr, cpu)[!f]),
>>>>> -			       atomic_read(&per_cpu(rcu_flipctr, cpu)[f]));
>>>>> -	}
>>>>> -	cnt += sprintf(&page[cnt], "ggp = %ld\n", rcu_data.completed);
>>>>> +			       per_cpu(rcu_flipctr, cpu)[!f],
>>>>> +			       per_cpu(rcu_flipctr, cpu)[f],
>>>>> +			       per_cpu(rcu_flip_flag, cpu),
>>>>> +			       per_cpu(rcu_mb_flag, cpu));
>>>>> +	}
>>>>> +	cnt += sprintf(&page[cnt], "ggp = %ld, state = %d",
>>>>> +		       rcu_data.completed, rcu_try_flip_state);
>>>>> +	if ((0 <= rcu_try_flip_state) &&
>>>>> +	    (rcu_try_flip_state <= sizeof(rcu_try_flip_state_names) /
>>>>> +	    			   sizeof(rcu_try_flip_state_names[0])))
>>>>> +		cnt += sprintf(&page[cnt], " (%s)",
>>>>> +			       rcu_try_flip_state_names[rcu_try_flip_state]);
>>>>> +	cnt += sprintf(&page[cnt], "\n");
>>>>> 	return (cnt);
>>>>> }
>>>>>
>>>>>
>>>
>

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU
  2006-07-28  0:22         ` Esben Nielsen
@ 2006-07-28  0:47           ` Paul E. McKenney
  0 siblings, 0 replies; 13+ messages in thread
From: Paul E. McKenney @ 2006-07-28  0:47 UTC (permalink / raw)
  To: Esben Nielsen
  Cc: linux-kernel, compudj, billh, rostedt, tglx, mingo, dipankar,
	rusty

On Fri, Jul 28, 2006 at 01:22:27AM +0100, Esben Nielsen wrote:
> 
> 
> On Thu, 27 Jul 2006, Paul E. McKenney wrote:
> 
> >On Thu, Jul 27, 2006 at 12:00:13PM +0100, Esben Nielsen wrote:
> >>Hi Paul,
> >> Thanks, for entering a discussion of my idea! Even though you are busy
> >>and are critical towards my idea, you take your time to answer! Thanks.
> >
> >I appreciate the time you took to write down your ideas, and for your
> >taking my comments in the spirit intended.
> 
> Well, I just think it is fun :-)

Hey, it is even more fun if you code it up and test it!  ;-)

> >>On Wed, 26 Jul 2006, Paul E. McKenney wrote:
> >>
> >>>On Thu, Jul 27, 2006 at 02:39:07AM +0100, Esben Nielsen wrote:
> >>>>
> >>>>
> >>>>On Tue, 25 Jul 2006, Paul E. McKenney wrote:
> >>>>
> >>>>>Not for inclusion, should be viewed with great suspicion.
> >>>>>
> >>>>>This patch provides an NMI-safe realtime RCU.  Hopefully, Mathieu can
> >>>>>make use of synchronize_sched() instead, since I have not yet figured
> >>>>>out a way to make this NMI-safe and still get rid of the interrupt
> >>>>>disabling.  ;-)
> >>>>>
> >>>>>						Thanx, Paul
> >>>>
> >>>>I must say I don't understand all this. It looks very complicated. Is it
> >>>>really needed?
> >>>>
> >>>>I have been thinking about the following design:
> >>>>
> >>>>void rcu_read_lock()
> >>>>{
> >>>>	if (!in_interrupt())
> >>>>		current->rcu_read_lock_count++;
> >>>>}
> >>>>void rcu_read_unlock()
> >>>>{
> >>>>	if (!in_interrupt())
> >>>>		current->rcu_read_lock_count--;
> >>>>}
> >>>>
> >>>>Somewhere in schedule():
> >>>>
> >>>>	rq->rcu_read_lock_count += prev->rcu_read_lock_count;
> >>>>	if (!rq->rcu_read_lock_count)
> >>>>		forward_waiting_rcu_jobs();
> >>>>	rq->rcu_read_lock_count -= next->rcu_read_lock_count;
> >>>
> >>>So rq->rcu_read_lock_count contains the sum of the counts of all
> >>>tasks that were scheduled away from this CPU.
> >>>
> >>>What happens in face of the following sequence of events?
> >>>Assume that all tasks stay on CPU 0 for the moment.
> >>>
> >>>o	Task A does rcu_read_lock().
> >>>
> >>>o	Task A is preempted.  rq->rcu_read_lock_count is nonzero.
> >>>
> >>>o	Task B runs and does rcu_read_lock().
> >>>
> >>>o	Task B is preempted (perhaps because it dropped the lock
> >>>	that was causing it to have high priority).  Regardless of
> >>>	the reason for the preemption, rq->rcu_read_lock_count is
> >>>	nonzero.
> >>>
> >>>o	Task C runs and does rcu_read_lock().
> >>>
> >>>o	Task C is preempted.  rq->rcu_read_lock_count is nonzero.
> >>>
> >>>o	Task A runs again, and does rcu_read_unlock().
> >>>
> >>>o	Task A is preempted.  rq->rcu_read_lock_count is nonzero.
> >>>
> >>>o	Task D runs and does rcu_read_lock().
> >>>
> >>>o	Task D is preempted.  rq->rcu_read_lock_count is nonzero.
> >>>
> >>>And so on.  As long as at least one of the preempted tasks is in an
> >>>RCU critical section, you never do your forward_waiting_rcu_jobs(),
> >>>and the grace period goes on forever.  Or at least until you OOM the
> >>>machine.
> >>>
> >>>So what am I missing here?
> >>
> >>The boosting idea below. Then tasks A-D must be RT tasks for this to
> >>happen. And the machine must anyway run out of RT tasks or it will
> >>effectively lock up.
> >
> >I agree that boosting would drive any particular task out of its
> >RCU read-side critical section, but I do not agree that you have
> >identified a valid induction argument that proves that we run out
> >of tasks (yet, anyway).  For one thing, tasks acquiring locks in
> >the RCU read-side critical sections could be priority boosted,
> >so that a small set of tasks could rise and fall in priority as
> >RT tasks attempted to acquire locks held by this small set of
> >tasks.  The RT tasks need not be running on the same CPU that the
> >non-realtime "victim" tasks running in RCU read-side critical sections,
> >therefore, the RT tasks need not be preempting these non-realtime
> >tasks.
> >
> >To be more specific, imagine a four-CPU system with three RT tasks
> >running on three of the four CPUs and a large number of non-realtime
> >tasks running on all four CPUs.  Consider the code in __d_lookup(), which
> >acquires dentry->d_lock under rcu_read_lock().  Imagine a situation
> >where the three RT tasks are repeatedly traversing pathnames with a
> >long common path prefix, with this same prefix being traversed by the
> >non-realtime tasks.  This could result in the non-realtime tasks being
> >repeatedly priority-boosted and preempted (preempted both by each
> >other and by RT tasks).
> >
> >Over to you!
>
> The PI code will never lower the priority lower than task->normal_prio.
> So if you somewhere in schedule() set normal_prio to something high PI 
> will not destroy it.

How does this interact with other priority boosting that might be going
on due to locks that this task holds?  If we exit the RCU read-side
critical section via rcu_read_unlock(), we need to drop the RCU-based
priority boosting, but still keep any lock-based boosting.

In short, I still don't see how you have eliminated the possibility of
a sequence of boosts, preemptions, and blocking on locks that keeps at
least one RCU read-side critical section alive on the CPU at all times,
even if any given RCU read-side critical section eventually gets done.

For a simpler example than the one I gave above, consider three RT mutexes
m[0], m[1], and m[2].  Suppose that there are a few tens of tasks running on a
given CPU, some of which repeatedly acquire and release these locks
in some random order, and others of which do the following repeatedly:

	rcu_read_lock();
	i = random() % 3;
	spin_lock(m[i]);
	/* do something for about 100 microseconds */
	spin_unlock(m[i]);
	/* do something for about 100 microseconds */
	i = random() % 3;
	spin_lock(m[i]);
	/* do something for about 100 microseconds */
	spin_unlock(m[i]);
	rcu_read_unlock();

This setup would easily give you a situation where your CPU has at least
one blocked task holding rcu_read_lock() at all times.

Yes, this is contrived.  If you don't like the fact that it is contrived,
please revisit my earlier dcache example, which has a not-dissimilar
structure, but with a lot more code to look at.  In fact, if anything,
the dcache example is more painful than my contrived example!

Your choice.  But if you want me to take your suggested algorithm
seriously, you need to take one of these two examples seriously.

> >>>I could imagine introducing a pair of counters per runqueue, but then
> >>>we end up with the same issues with counter-flip that we have now.
> >>>
> >>>Another question -- what happens if a given CPU stays idle?  Wouldn't
> >>>the callbacks then just start piling up on that CPU?
> >>
> >>How can a CPU stay idle? There is a tick every 2.5 ms. Even without that
> >>the previous CPU can make it schedule if it sees the jobs piling up. Or if
> >>that is considered too expensive, it can take over and forward the
> >>jobs to the next CPU.
> >
> >Tickless idle, prized by embedded systems.  Here an idle CPU gets shut
> >down if there is nothing in particular for it to do.  Classic RCU does a
> >dance with CPUs entering tickless idle state in order to avoid expecting
> >those CPUs to participate in future grace periods.
> >
> >This had its challenges for the bitmask-based Classic RCU, and would
> >have a few additional challenges for your circular linked-list setup.
> 
> Can't the previous CPU just see that when it wants to send the RCU-jobs 
> along to it and send it to the next CPU instead?
> Or when a CPU goes idle it can take itself out of the circle to avoid the 
> extra load on the previous CPU.

My concern is what happens when the previous CPU is trying to hand off
callbacks just when the next CPU is pulling itself out of the list.
It would be bad form to strand the callbacks on the now-hot-unplugged
runqueue.  It would be even worse form to skip over the CPU before it
got done with the last RCU read-side critical section in kernel code
just before hot-unplugging.

> >Current -rt RCU avoids this problem by dint of being too stupid to
> >need to know what CPUs are alive or not.  My recent patch needs something
> >similar to the tickless-idle change to Classic RCU.
> >
> >>>>Now what should forward_waiting_rcu_jobs() do?
> >>>>
> >>>>I imagine a circular data structure of all the CPUs. When a call_rcu() is
> >>>>run on a CPU it is first added a list of jobs for that CPU. When
> >>>>forward_waiting_rcu_jobs() is called all the pending jobs are forwarded 
> >>>>to
> >>>>the next CPU. The next CPU will bring it along the next CPU in the 
> >>>>circle
> >>>>along with it's own jobs. When jobs hit the original CPU they will be
> >>>>executed. Or rather, when the CPU just before calls
> >>>>forward_waiting_rcu_jobs(), it sends the jobs belonging to the next CPU 
> >>>>to
> >>>>the RCU-task of the next CPU, where they will be executed, instead of to
> >>>>the scheduler (runqueue) of the next CPU, where it will just be send out
> >>>>on
> >>>>a
> >>>>new roundtrip along the circle.
> >>>>
> >>>>If you use a structure like the plist then the forwarding procedure can 
> >>>>be
> >>>>done in O(number of online CPUs) time worst case - much less in the 
> >>>>usual
> >>>>case where the lists are almost empty.
> >>>>
> >>>>Now the problem is: What happens if a task in a rcu read-side lock is
> >>>>migrated? Then the rcu_read_lock_count on the source CPU will stay in 
> >>>>plus
> >>>>while on the target CPU it will go in minus. This ought to be simply
> >>>>fixeable by adding task->rcu_read_lock_count to the target runqueue 
> >>>>before
> >>>>migrating and subtracting it from the old runqueue after migrating. But
> >>>>there is another problem: RCU-jobs refering to data used by the task 
> >>>>being
> >>>>migrated might have been forwarded from the target CPU. Thus the 
> >>>>migration
> >>>>task have to go back along the circle of CPUs and move all the relevant
> >>>>RCU-jobs back to the target CPU to be forwarded again. This is also 
> >>>>doable
> >>>>in
> >>>>number of CPUs between source and target times O(<number of online 
> >>>>CPUs>)
> >>>>(see above) time.
> >>>
> >>>So if I have the right (or wrong) pattern of task migrations, the RCU
> >>>callbacks never get to their originating CPU?
> >>
> >>In principle, yes. But if the machine starts to migrate tasks that wildly
> >>it won't get any work done anyway, because all its time is done doing
> >>migration.
> >
> >Since you are waiting for a context switch to move the RCU callbacks,
> >the migration does not have to be all that wild.  In an eight-CPU system,
> >if I migrate a preempted task a couple of times in the course of the
> >eight context switches required for the callbacks to circle the loop
> >of CPUs, we are in trouble, right?
> 
> Could be. The other idea I came up with below doesn't have that problem.

What problem does it have instead?  (Sorry, couldn't resist!)

> >>>Alternatively, if the task residing in the RCU read-side critical section
> >>>is forwarded around the loop of CPUs, callbacks circulating around this
> >>>loop might execute before the RCU read-side critical section completes.
> >>
> >>That is why some of the callbacks (those which has parsed the target CPU
> >>but not yet the source CPU) have to be moved back to the target CPU.
> >
> >Yep.  Leaving them to follow their planned circuit indeed does not cut it.
> >
> >>I just came up with an even simpler solution:
> >>Delay the subtraction of the task->rcu_read_lock_count from
> >>srcrq->rcu_read_lock_count until the task calls rcu_read_unlock(). That
> >>can be done by flagging the task (do task->rcu_read_lock_count |=
> >>0x80000000) and do a simple
> >>	if (unlikely(current->rcu_read_lock_count == 0x80000000))
> >>		fix_rcu_read_lock_count_on_old_cpu();
> >>in rcu_read_unlock(). Now the task can be migrated again before calling
> >>fix_rcu_read_lock_count_on_old_cpu(). The relevant RCU jobs still can't
> >>get past the original CPU before the task have called
> >>fix_rcu_read_lock_count_on_old_cpu(), so all subsequent migrations can 
> >>just
> >>do the count down on the intermediate CPUs right away.
> >
> >But now you need atomic instructions in all rcu_read_lock() and
> >rcu_read_unlock() invocations to handle the possibility that some
> >migrated task will decrement the counter at the same time we are
> >incrementing it?  I cannot tell for sure because you have not supplied
> >sample code for fix_rcu_read_lock_count_on_old_cpu().
> 
> Now the atomic part is only kicking in if the task was actually migrated 
> while under RCU readside section so it is rare.

How do the other concurrently executing tasks know that a migration is
about to kick in, so that they should switch to atomic operations?

>                                                 But consider this code:
> 
> void fix_rcu_read_lock_count_on_old_cpu()
> {
> 	int cpuid = current->old_cpuid;
> 	int count = current->old_rcu_read_lock_count;
> 
> 	runqueue_t *rq = cpu_rq(cpuid);
>         spin_lock_irqsave(&rq->lock, flags);
> 	rq->rcu_read_lock_count -= count;
>         spin_unlock_irqrestore(&rq->lock, flags);
> }
> 
> Then there can be something about if the CPU was taken out meanwhile. If 
> the rq->rcu_read_lock is 0 after this, then it can go completely out 
> of the RCU CPU circle now (see below).

But then when we pull the runqueue out of the circle, we need to grab
the locks of its neighbors?

> >>>>To avoid a task in a read-side lock being starved for too long the
> >>>>following line can be added to normal_prio():
> >>>> if (p->rcu_read_lock_count)
> >>>>	p->prio = MAX_RT_PRIO;
> >>>
> >>>But doesn't this have the same impact on latency as disabling preemption
> >>>in rcu_read_lock() and then re-enabling it in rcu_read_unlock()?
> >>
> >>No, RT tasks can still preempt the RCU read side lock. But SCHED_OTHER and
> >>SCHED_BATCH can't. You can also make the RCU read side boosting priority
> >>dynamic and let the system adjust it or just let the admin adjust it.
> >
> >Fair enough -- I misread MAX_RT_PRIO as MAX_PRIO.
> >
> >This approach I can get behind -- my thought has been to boost to
> >either MAX_RT_PRIO or MAX_RT_PRIO-1 when preempt_schedule() sees that
> >it is preempting an RCU read-side critical section.
> >
> >So I agree with you on at least one point!  ;-)
> >
> >A possible elaboration would be to keep a linked list of tasks preempted
> >in their RCU read-side critical sections so that they can be further
> >boosted to the highest possible priority (numerical value of zero,
> >not sure what the proper symbol is) if the grace period takes too many
> >jiffies to complete.  Another piece is priority boosting when blocking
> >on a mutex from within an RCU read-side critical section.
> >
> >>>Also, doesn't this circular data structure need to handle CPU hotplug?
> >>
> >>Of course. I don't know about hotplug though. But it sounds simple to
> >>migrate the tasks away, take the CPU out of the circle and then forward
> >>the last RCU jobs from that CPU.
> >
> >Doing it efficiently is the difficulty, particularly for tickless-idle
> >systems where CPUs need to be added and removed on a regular basis.
> >Also, what locking design would you use in order to avoid deadlock?
> >There is a hotplug mutex, but seems like you might need to acquire some
> >number of rq mutexes as well.
> 
> I think it can be done very effectively. When taking a CPU out: If, after 
> migrating all the tasks off the CPU, the rq->rcu_read_lock_count is 0, 
> alter the previous CPUs next CPU to this CPUs next CPU. Then forward the 
> pending RCU jobs. If some of the migrated tasks were in an RCU read side 
> section rq->rcu_read_lock_count > 0. Then the CPU has to be kept in the 
> RCU circle until all are done. Either an event can be set up or the 
> previous CPU can check on it everytime it tries to forward jobs to the 
> CPU.

What sort of state is the CPU in while it is waiting for all the relevant
RCU callbacks to make their way around the loop?  If it is still active,
it is probably generating more callbacks.  If not, what kind of special
state is it in, and what other parts of the system need to know about that
special state?  Ditto for the runqueue.

> When putting the CPU in again: Simply add it to the circle when it is up 
> running.

Agreed, adding CPUs is -much- easier than removing them.

> >>>>I don't have time to code this nor a SMP machine to test it on. But I 
> >>>>can
> >>>>give the idea to you anyways in the hope you might code it :-)
> >>>
> >>>I am beginning to think that it will not be at all simple by the time I
> >>>code up all the required fixups.  Or am I missing something?
> >>
> >>Of course, implementing something is always a lot harder than writing the
> >>idea down. Anyway, we already worked out some of the hardest details :-)
> >
> >I certainly agree with your first sentence.  On your second sentence,
> >I would s/worked out/mentioned/  ;-)
> >
> >Another approach I am looking at does not permit rcu_read_lock() in
> >NMI/SMI/hardirq, but is much simpler.  Its downside is that it cannot
> >serve as common code between CONFIG_PREEMPT_RT and CONFIG_PREEMPT.
> >
> Iack :-( That is not good. :-(

You can't always get what you want.  ;-)

You might want to look at the OLS 2002 read-copy update paper.  It
describes an algorithm (rcu-sched, due to Rusty, but similar to earlier
algorithms used in the Tornado and K42 research operating systems) that
maintains a circle of CPUs.  However, this was before both tickless idle
and CPU hotplug, and also before realtime RCU.  One major difference
is that Rusty's approach circulates a count rather than the callbacks
themselves.  Rusty's approach should avoid the need to gather callbacks --
though it still would have some challenges handling CPU hotplug.

							Thanx, Paul

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU
  2006-07-28  0:02           ` Paul E. McKenney
@ 2006-07-28  0:48             ` Bill Huey
  2006-07-28  4:56               ` Paul E. McKenney
  0 siblings, 1 reply; 13+ messages in thread
From: Bill Huey @ 2006-07-28  0:48 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: Esben Nielsen, linux-kernel, compudj, rostedt, tglx, mingo,
	dipankar, rusty, Bill Huey (hui)

On Thu, Jul 27, 2006 at 05:02:31PM -0700, Paul E. McKenney wrote:
> On Thu, Jul 27, 2006 at 12:53:56PM -0700, Bill Huey wrote:
> > On Thu, Jul 27, 2006 at 08:46:37AM -0700, Paul E. McKenney wrote:
> > > A possible elaboration would be to keep a linked list of tasks preempted
> > > in their RCU read-side critical sections so that they can be further
> > > boosted to the highest possible priority (numerical value of zero,
> > > not sure what the proper symbol is) if the grace period takes too many
> > > jiffies to complete.  Another piece is priority boosting when blocking
> > > on a mutex from within an RCU read-side critical section.
> > 
> > I'm not sure how folks feel about putting something like that in the
> > scheduler path since it's such a specialized cases. Some of the scheduler
> > folks might come out against this.
> 
> They might well.  And the resulting discussion might reveal a better
> way.  Or it might well turn out that the simple approach of boosting
> to an intermediate level without the list will suffice.

Another thing. What you mention above is really just having a set of owners
for the read side and not really a preemption list tracking thing with RCU
and special scheduler path. The more RCU does this kind of thing the more
it's just like a traditional read/write lock but with more parallelism since
it's holding on to read side owners on a per CPU basis.

This was close to the idea I had for extending read/write locks to be more
parallel friendly for live CPUs, per CPU owner bins on individual cache lines
(I'll clarify if somebody asks), but the use of read/write locks is seldom
and in non-critical places, so just moving the code fully to RCU would be a
better solution. The biggest problem is to scan or denote to some central
structure (task struct, lock struct) when you were either in or out of the
reader section without costly atomic operations. That's a really huge cost
as you know already (OLS slides).

bill


^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU
  2006-07-28  0:48             ` Bill Huey
@ 2006-07-28  4:56               ` Paul E. McKenney
  2006-07-28 11:14                 ` Esben Nielsen
  0 siblings, 1 reply; 13+ messages in thread
From: Paul E. McKenney @ 2006-07-28  4:56 UTC (permalink / raw)
  To: Bill Huey
  Cc: Esben Nielsen, linux-kernel, compudj, rostedt, tglx, mingo,
	dipankar, rusty

On Thu, Jul 27, 2006 at 05:48:57PM -0700, Bill Huey wrote:
> On Thu, Jul 27, 2006 at 05:02:31PM -0700, Paul E. McKenney wrote:
> > On Thu, Jul 27, 2006 at 12:53:56PM -0700, Bill Huey wrote:
> > > On Thu, Jul 27, 2006 at 08:46:37AM -0700, Paul E. McKenney wrote:
> > > > A possible elaboration would be to keep a linked list of tasks preempted
> > > > in their RCU read-side critical sections so that they can be further
> > > > boosted to the highest possible priority (numerical value of zero,
> > > > not sure what the proper symbol is) if the grace period takes too many
> > > > jiffies to complete.  Another piece is priority boosting when blocking
> > > > on a mutex from within an RCU read-side critical section.
> > > 
> > > I'm not sure how folks feel about putting something like that in the
> > > scheduler path since it's such a specialized case. Some of the scheduler
> > > folks might come out against this.
> > 
> > They might well.  And the resulting discussion might reveal a better
> > way.  Or it might well turn out that the simple approach of boosting
> > to an intermediate level without the list will suffice.
> 
> Another thing. What you mention above is really just having a set of owners
> for the read side and not really a preemption list tracking thing with RCU
> and special scheduler path. The more RCU does this kind of thing the more
> it's just like a traditional read/write lock but with more parallelism since
> it's holding on to read side owners on a per CPU basis.

There are certainly some similarities between a priority-boosted RCU
read-side critical section and a priority-boosted read-side rwlock.
In theory, the crucial difference is that as long as one has sufficient
memory, one can delay priority-boosting the RCU read-side critical
sections without hurting update-side latency (aside from the grace period
delays, of course).

So I will no doubt be regretting my long-standing advice to use
synchronize_rcu() over call_rcu().  Sooner or later someone will care
deeply about the grace-period latency.  In fact, I already got some
questions about that at this past OLS.  ;-)

> This was close to the idea I had for extending read/write locks to be more
> parallel friendly for live CPUs, per CPU owner bins on individual cache lines
> (I'll clarify if somebody asks), but the use of read/write locks is seldom
> and in non-critical places, so just moving the code fully to RCU would be a
> better solution. The biggest problem is to scan or denote to some central
> structure (task struct, lock struct) when you were either in or out of the
> reader section without costly atomic operations. That's a really huge cost
> as you know already (OLS slides).

Yep -- create something sort of like brlock, permitting limited read-side
parallelism, and also permitting the current exclusive-lock priority
inheritance to operate naturally.

Easy enough to do with per-CPU variables if warranted.  Although the
write-side lock-acquisition latency can get a bit ugly, since you have
to acquire N locks.

I expect that we all (myself included) have a bit of learning left to
work out the optimal locking strategy so as to provide both realtime
latency and performance/scalability.  ;-)

							Thanx, Paul

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU
  2006-07-28  4:56               ` Paul E. McKenney
@ 2006-07-28 11:14                 ` Esben Nielsen
  2006-07-28 15:25                   ` Paul E. McKenney
  0 siblings, 1 reply; 13+ messages in thread
From: Esben Nielsen @ 2006-07-28 11:14 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: Bill Huey, Esben Nielsen, linux-kernel, compudj, rostedt, tglx,
	mingo, dipankar, rusty



On Thu, 27 Jul 2006, Paul E. McKenney wrote:

> On Thu, Jul 27, 2006 at 05:48:57PM -0700, Bill Huey wrote:
>> On Thu, Jul 27, 2006 at 05:02:31PM -0700, Paul E. McKenney wrote:
>>> On Thu, Jul 27, 2006 at 12:53:56PM -0700, Bill Huey wrote:
>>>> On Thu, Jul 27, 2006 at 08:46:37AM -0700, Paul E. McKenney wrote:
>>>>> A possible elaboration would be to keep a linked list of tasks preempted
>>>>> in their RCU read-side critical sections so that they can be further
>>>>> boosted to the highest possible priority (numerical value of zero,
>>>>> not sure what the proper symbol is) if the grace period takes too many
>>>>> jiffies to complete.  Another piece is priority boosting when blocking
>>>>> on a mutex from within an RCU read-side critical section.
>>>>
>>>> I'm not sure how folks feel about putting something like that in the
>>>> scheduler path since it's such a specialized case. Some of the scheduler
>>>> folks might come out against this.
>>>
>>> They might well.  And the resulting discussion might reveal a better
>>> way.  Or it might well turn out that the simple approach of boosting
>>> to an intermediate level without the list will suffice.
>>
>> Another thing. What you mention above is really just having a set of owners
>> for the read side and not really a preemption list tracking thing with RCU
>> and special scheduler path. The more RCU does this kind of thing the more
>> it's just like a traditional read/write lock but with more parallelism since
>> it's holding on to read side owners on a per CPU basis.
>
> There are certainly some similarities between a priority-boosted RCU
> read-side critical section and a priority-boosted read-side rwlock.
> In theory, the crucial difference is that as long as one has sufficient
> memory, one can delay priority-boosting the RCU read-side critical
> sections without hurting update-side latency (aside from the grace period
> delays, of course).
>
> So I will no doubt be regretting my long-standing advice to use
> synchronize_rcu() over call_rcu().  Sooner or later someone will care
> deeply about the grace-period latency.  In fact, I already got some
> questions about that at this past OLS.  ;-)

Yick!! Do people really expect these things to finish in a predictable 
amount of time?
This reminds me of C++ hackers starting to code Java. They want to use the 
finalizer to close files etc. just as they use the destructor in C++, but 
can't understand that they have to wait until the garbage collector has 
run.
RCU is a primitive kind of garbage collector. You should never depend on 
how long it takes to do its work, just that it will get done at some 
point.

Esben

>
>> This was close to the idea I had for extending read/write locks to be more
>> parallel friendly for live CPUs, per CPU owner bins on individual cache lines
>> (I'll clarify if somebody asks), but the use of read/write locks is seldom
>> and in non-critical places, so just moving the code fully to RCU would be a
>> better solution. The biggest problem is to scan or denote to some central
>> structure (task struct, lock struct) when you were either in or out of the
>> reader section without costly atomic operations. That's a really huge cost
>> as you know already (OLS slides).
>
> Yep -- create something sort of like brlock, permitting limited read-side
> parallelism, and also permitting the current exclusive-lock priority
> inheritance to operate naturally.
>
> Easy enough to do with per-CPU variables if warranted.  Although the
> write-side lock-acquisition latency can get a bit ugly, since you have
> to acquire N locks.
>
> I expect that we all (myself included) have a bit of learning left to
> work out the optimal locking strategy so as to provide both realtime
> latency and performance/scalability.  ;-)
>
> 							Thanx, Paul
>

^ permalink raw reply	[flat|nested] 13+ messages in thread

* Re: [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU
  2006-07-28 11:14                 ` Esben Nielsen
@ 2006-07-28 15:25                   ` Paul E. McKenney
  0 siblings, 0 replies; 13+ messages in thread
From: Paul E. McKenney @ 2006-07-28 15:25 UTC (permalink / raw)
  To: Esben Nielsen
  Cc: Bill Huey, linux-kernel, compudj, rostedt, tglx, mingo, dipankar,
	rusty

On Fri, Jul 28, 2006 at 12:14:18PM +0100, Esben Nielsen wrote:
> On Thu, 27 Jul 2006, Paul E. McKenney wrote:
> >On Thu, Jul 27, 2006 at 05:48:57PM -0700, Bill Huey wrote:
> >>Another thing. What you mention above is really just having a set of 
> >>owners
> >>for the read side and not really a preemption list tracking thing with RCU
> >>and special scheduler path. The more RCU does this kind of thing the more
> >>it's just like a traditional read/write lock but with more parallelism 
> >>since
> >>it's holding on to read side owners on a per CPU basis.
> >
> >There are certainly some similarities between a priority-boosted RCU
> >read-side critical section and a priority-boosted read-side rwlock.
> >In theory, the crucial difference is that as long as one has sufficient
> >memory, one can delay priority-boosting the RCU read-side critical
> >sections without hurting update-side latency (aside from the grace period
> >delays, of course).

Here is a better list of the similarities and differences between
rwlock and RCU in -rt:

	Attribute			rwlock		RCU

	Readers block updates		Y		N

	Readers block freeing		Y		Y (synchronize_rcu())

	Updates block readers		Y		N

	Deterministic readers		N		Y

	Readers block readers		Y (-rt!)	N

	Readers preemptible		Y		Y

	Readers can block on locks	Y		Y

	Readers can block arbitrarily	Y		N (SRCU for this)


So in rwlock, priority boosting of readers is required to allow
updates to happen -at- -all-, while in RCU, priority boosting is
required to get deferred free (call_rcu(), synchronize_rcu()) unstuck.

> >So I will no doubt be regretting my long-standing advice to use
> >synchronize_rcu() over call_rcu().  Sooner or later someone will care
> >deeply about the grace-period latency.  In fact, I already got some
> >questions about that at this past OLS.  ;-)
> 
> Yick!! Do people really expect these things to finish in a predictable 
> amount of time?

Yeah, this is -almost- as unreasonable as expecting preemptible RCU
read-side critical sections!!!  ;-)  ;-)  ;-)

> This reminds me of C++ hackers starting to code Java. They want to use the 
> finalizer to close files etc. just as they use the destructor in C++, but 
> can't understand that they have to wait until the garbage collector has 
> run.
> RCU is a primitive kind of garbage collector. You should never depend on 
> how long it takes to do its work, just that it will get done at some 
> point.

Seriously, it probably is not all that hard to get an upper bound on
synchronize_rcu() latency, as long as one is willing to put up with the
upper bound being a handful of jiffies (as opposed to being down in the
microsecond range).  In addition, this upper bound would be a function
of the number of tasks, and would also require boosting RCU read-side
priority to maximum.

That said, a (say) ten-jiffy upper bound on synchronize_rcu() would
probably not be what they were looking for.  ;-)

							Thanx, Paul

^ permalink raw reply	[flat|nested] 13+ messages in thread

end of thread, other threads:[~2006-07-28 15:25 UTC | newest]

Thread overview: 13+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2006-07-26  0:17 [RFC, PATCH -rt] NMI-safe mb- and atomic-free RT RCU Paul E. McKenney
2006-07-27  1:39 ` Esben Nielsen
2006-07-27  1:39   ` Paul E. McKenney
2006-07-27 11:00     ` Esben Nielsen
2006-07-27 15:46       ` Paul E. McKenney
2006-07-27 19:53         ` Bill Huey
2006-07-28  0:02           ` Paul E. McKenney
2006-07-28  0:48             ` Bill Huey
2006-07-28  4:56               ` Paul E. McKenney
2006-07-28 11:14                 ` Esben Nielsen
2006-07-28 15:25                   ` Paul E. McKenney
2006-07-28  0:22         ` Esben Nielsen
2006-07-28  0:47           ` Paul E. McKenney

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox