public inbox for linux-kernel@vger.kernel.org
* [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups
@ 2008-08-05 16:21 Paul E. McKenney
  2008-08-05 16:48 ` Steven Rostedt
                   ` (2 more replies)
  0 siblings, 3 replies; 17+ messages in thread
From: Paul E. McKenney @ 2008-08-05 16:21 UTC (permalink / raw)
  To: linux-kernel; +Cc: mingo, akpm, oleg, manfred, dipankar, rostedt, dvhltc, niv

This patch simplifies the locking and memory-barrier usage in the Classic
RCU grace-period-detection mechanism, incorporating Lai Jiangshan's
feedback from the earlier version (http://lkml.org/lkml/2008/8/1/400
and http://lkml.org/lkml/2008/8/3/43).  Passed 10 hours of
rcutorture concurrent with CPUs being put online and taken offline on
a 128-hardware-thread Power machine.  My apologies to whoever in the
Eastern Hemisphere was planning to use this machine over the Western
Hemisphere night, but it was sitting idle and...

So this is ready for tip/core/rcu.

This patch is in preparation for moving to a hierarchical
algorithm to support very large SMP machines -- requested by some
people at OLS, and there seem to have been a few recent patches in the
4096-CPU direction as well.  The general idea is to move to a much more
conservative concurrency design, then apply a hierarchy to reduce
contention on the global lock by a few orders of magnitude (larger
machines would see greater reductions).  The reason for taking a
conservative approach is that this code isn't on any fast path.

Prototype in progress.

This patch is against the linux-tip git tree (tip/core/rcu).  If you
wish to test this against 2.6.26, use the following set of patches:

http://www.rdrop.com/users/paulmck/patches/2.6.26-ljsimp-1.patch
http://www.rdrop.com/users/paulmck/patches/2.6.26-ljsimpfix-3.patch

The first patch combines commits 5127bed588a2f8f3a1f732de2a8a190b7df5dce3
and 3cac97cbb14aed00d83eb33d4613b0fe3aaea863 from Lai Jiangshan
<laijs@cn.fujitsu.com>, and the second patch contains my changes.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
---

 rcuclassic.c |   51 +++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 41 insertions(+), 10 deletions(-)

diff --git a/kernel/rcuclassic.c b/kernel/rcuclassic.c
index d3553ee..e6f42ef 100644
--- a/kernel/rcuclassic.c
+++ b/kernel/rcuclassic.c
@@ -86,6 +86,7 @@ static void force_quiescent_state(struct rcu_data *rdp,
 	int cpu;
 	cpumask_t cpumask;
 	set_need_resched();
+	spin_lock(&rcp->lock);
 	if (unlikely(!rcp->signaled)) {
 		rcp->signaled = 1;
 		/*
@@ -111,6 +112,7 @@ static void force_quiescent_state(struct rcu_data *rdp,
 		for_each_cpu_mask(cpu, cpumask)
 			smp_send_reschedule(cpu);
 	}
+	spin_unlock(&rcp->lock);
 }
 #else
 static inline void force_quiescent_state(struct rcu_data *rdp,
@@ -124,7 +126,9 @@ static void __call_rcu(struct rcu_head *head, struct rcu_ctrlblk *rcp,
 		struct rcu_data *rdp)
 {
 	long batch;
-	smp_mb(); /* reads the most recently updated value of rcu->cur. */
+
+	head->next = NULL;
+	smp_mb(); /* Read of rcu->cur must happen after any change by caller. */
 
 	/*
 	 * Determine the batch number of this callback.
@@ -174,7 +178,6 @@ void call_rcu(struct rcu_head *head,
 	unsigned long flags;
 
 	head->func = func;
-	head->next = NULL;
 	local_irq_save(flags);
 	__call_rcu(head, &rcu_ctrlblk, &__get_cpu_var(rcu_data));
 	local_irq_restore(flags);
@@ -203,7 +206,6 @@ void call_rcu_bh(struct rcu_head *head,
 	unsigned long flags;
 
 	head->func = func;
-	head->next = NULL;
 	local_irq_save(flags);
 	__call_rcu(head, &rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
 	local_irq_restore(flags);
@@ -389,17 +391,17 @@ static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
 static void __rcu_offline_cpu(struct rcu_data *this_rdp,
 				struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
 {
-	/* if the cpu going offline owns the grace period
+	/*
+	 * if the cpu going offline owns the grace period
 	 * we can block indefinitely waiting for it, so flush
 	 * it here
 	 */
 	spin_lock_bh(&rcp->lock);
 	if (rcp->cur != rcp->completed)
 		cpu_quiet(rdp->cpu, rcp);
-	spin_unlock_bh(&rcp->lock);
-	/* spin_lock implies smp_mb() */
 	rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail, rcp->cur + 1);
 	rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail[2], rcp->cur + 1);
+	spin_unlock_bh(&rcp->lock);
 
 	local_irq_disable();
 	this_rdp->qlen += rdp->qlen;
@@ -433,16 +435,19 @@ static void rcu_offline_cpu(int cpu)
 static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
 					struct rcu_data *rdp)
 {
+	long completed_snap;
+
 	if (rdp->nxtlist) {
 		local_irq_disable();
+		completed_snap = ACCESS_ONCE(rcp->completed);
 
 		/*
 		 * move the other grace-period-completed entries to
 		 * [rdp->nxtlist, *rdp->nxttail[0]) temporarily
 		 */
-		if (!rcu_batch_before(rcp->completed, rdp->batch))
+		if (!rcu_batch_before(completed_snap, rdp->batch))
 			rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2];
-		else if (!rcu_batch_before(rcp->completed, rdp->batch - 1))
+		else if (!rcu_batch_before(completed_snap, rdp->batch - 1))
 			rdp->nxttail[0] = rdp->nxttail[1];
 
 		/*
@@ -483,20 +488,38 @@ static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
 
 static void rcu_process_callbacks(struct softirq_action *unused)
 {
+	/*
+	 * Memory references from any prior RCU read-side critical sections
+	 * executed by the interrupted code must be seen before any RCU
+	 * grace-period manipulations below.
+	 */
+
+	smp_mb(); /* See above block comment. */
+
 	__rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
 	__rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
+
+	/*
+	 * Memory references from any later RCU read-side critical sections
+	 * executed by the interrupted code must be seen after any RCU
+	 * grace-period manipulations above.
+	 */
+
+	smp_mb(); /* See above block comment. */
 }
 
 static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
 {
 	if (rdp->nxtlist) {
+		long completed_snap = ACCESS_ONCE(rcp->completed);
+
 		/*
 		 * This cpu has pending rcu entries and the grace period
 		 * for them has completed.
 		 */
-		if (!rcu_batch_before(rcp->completed, rdp->batch))
+		if (!rcu_batch_before(completed_snap, rdp->batch))
 			return 1;
-		if (!rcu_batch_before(rcp->completed, rdp->batch - 1) &&
+		if (!rcu_batch_before(completed_snap, rdp->batch - 1) &&
 				rdp->nxttail[0] != rdp->nxttail[1])
 			return 1;
 		if (rdp->nxttail[0] != &rdp->nxtlist)
@@ -547,6 +570,12 @@ int rcu_needs_cpu(int cpu)
 	return !!rdp->nxtlist || !!rdp_bh->nxtlist || rcu_pending(cpu);
 }
 
+/*
+ * Top-level function driving RCU grace-period detection, normally
+ * invoked from the scheduler-clock interrupt.  This function simply
+ * increments counters that are read only from softirq by this same
+ * CPU, so there are no memory barriers required.
+ */
 void rcu_check_callbacks(int cpu, int user)
 {
 	if (user ||
@@ -590,6 +619,7 @@ void rcu_check_callbacks(int cpu, int user)
 static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
 						struct rcu_data *rdp)
 {
+	spin_lock(&rcp->lock);
 	memset(rdp, 0, sizeof(*rdp));
 	rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2] = &rdp->nxtlist;
 	rdp->donetail = &rdp->donelist;
@@ -597,6 +627,7 @@ static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
 	rdp->qs_pending = 0;
 	rdp->cpu = cpu;
 	rdp->blimit = blimit;
+	spin_unlock(&rcp->lock);
 }
 
 static void __cpuinit rcu_online_cpu(int cpu)

^ permalink raw reply related	[flat|nested] 17+ messages in thread

* Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups
  2008-08-05 16:21 [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups Paul E. McKenney
@ 2008-08-05 16:48 ` Steven Rostedt
  2008-08-05 17:40   ` Paul E. McKenney
  2008-08-06  5:30 ` Manfred Spraul
  2008-08-15 14:09 ` [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups Ingo Molnar
  2 siblings, 1 reply; 17+ messages in thread
From: Steven Rostedt @ 2008-08-05 16:48 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: linux-kernel, mingo, akpm, oleg, manfred, dipankar, dvhltc, niv


On Tue, 5 Aug 2008, Paul E. McKenney wrote:
> ---
> 
>  rcuclassic.c |   51 +++++++++++++++++++++++++++++++++++++++++----------
>  1 file changed, 41 insertions(+), 10 deletions(-)
> 
> diff --git a/kernel/rcuclassic.c b/kernel/rcuclassic.c
> index d3553ee..e6f42ef 100644
> --- a/kernel/rcuclassic.c
> +++ b/kernel/rcuclassic.c
> @@ -86,6 +86,7 @@ static void force_quiescent_state(struct rcu_data *rdp,
>  	int cpu;
>  	cpumask_t cpumask;
>  	set_need_resched();
> +	spin_lock(&rcp->lock);
>  	if (unlikely(!rcp->signaled)) {
>  		rcp->signaled = 1;
>  		/*
> @@ -111,6 +112,7 @@ static void force_quiescent_state(struct rcu_data *rdp,
>  		for_each_cpu_mask(cpu, cpumask)
>  			smp_send_reschedule(cpu);
>  	}
> +	spin_unlock(&rcp->lock);
>  }
>  #else
>  static inline void force_quiescent_state(struct rcu_data *rdp,
> @@ -124,7 +126,9 @@ static void __call_rcu(struct rcu_head *head, struct rcu_ctrlblk *rcp,
>  		struct rcu_data *rdp)
>  {
>  	long batch;
> -	smp_mb(); /* reads the most recently updated value of rcu->cur. */
> +
> +	head->next = NULL;
> +	smp_mb(); /* Read of rcu->cur must happen after any change by caller. */
>  
>  	/*
>  	 * Determine the batch number of this callback.
> @@ -174,7 +178,6 @@ void call_rcu(struct rcu_head *head,
>  	unsigned long flags;
>  
>  	head->func = func;
> -	head->next = NULL;
>  	local_irq_save(flags);
>  	__call_rcu(head, &rcu_ctrlblk, &__get_cpu_var(rcu_data));
>  	local_irq_restore(flags);
> @@ -203,7 +206,6 @@ void call_rcu_bh(struct rcu_head *head,
>  	unsigned long flags;
>  
>  	head->func = func;
> -	head->next = NULL;
>  	local_irq_save(flags);
>  	__call_rcu(head, &rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
>  	local_irq_restore(flags);
> @@ -389,17 +391,17 @@ static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
>  static void __rcu_offline_cpu(struct rcu_data *this_rdp,
>  				struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
>  {
> -	/* if the cpu going offline owns the grace period
> +	/*
> +	 * if the cpu going offline owns the grace period
>  	 * we can block indefinitely waiting for it, so flush
>  	 * it here
>  	 */
>  	spin_lock_bh(&rcp->lock);
>  	if (rcp->cur != rcp->completed)
>  		cpu_quiet(rdp->cpu, rcp);
> -	spin_unlock_bh(&rcp->lock);
> -	/* spin_lock implies smp_mb() */

The spin_unlock is being removed here. Was that the smp_mb that was 
implied or is it the above spin_lock_bh?

>  	rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail, rcp->cur + 1);
>  	rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail[2], rcp->cur + 1);
> +	spin_unlock_bh(&rcp->lock);
>  
>  	local_irq_disable();
>  	this_rdp->qlen += rdp->qlen;
> @@ -433,16 +435,19 @@ static void rcu_offline_cpu(int cpu)
>  static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
>  					struct rcu_data *rdp)
>  {
> +	long completed_snap;
> +
>  	if (rdp->nxtlist) {
>  		local_irq_disable();
> +		completed_snap = ACCESS_ONCE(rcp->completed);
>  
>  		/*
>  		 * move the other grace-period-completed entries to
>  		 * [rdp->nxtlist, *rdp->nxttail[0]) temporarily
>  		 */
> -		if (!rcu_batch_before(rcp->completed, rdp->batch))
> +		if (!rcu_batch_before(completed_snap, rdp->batch))
>  			rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2];
> -		else if (!rcu_batch_before(rcp->completed, rdp->batch - 1))
> +		else if (!rcu_batch_before(completed_snap, rdp->batch - 1))
>  			rdp->nxttail[0] = rdp->nxttail[1];
>  
>  		/*
> @@ -483,20 +488,38 @@ static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
>  
>  static void rcu_process_callbacks(struct softirq_action *unused)
>  {
> +	/*
> +	 * Memory references from any prior RCU read-side critical sections
> +	 * executed by the interrupted code must be seen before any RCU
> +	 * grace-period manipulations below.
> +	 */
> +
> +	smp_mb(); /* See above block comment. */
> +
>  	__rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
>  	__rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
> +
> +	/*
> +	 * Memory references from any later RCU read-side critical sections
> +	 * executed by the interrupted code must be seen after any RCU
> +	 * grace-period manipulations above.
> +	 */
> +
> +	smp_mb(); /* See above block comment. */
>  }
>  
>  static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
>  {
>  	if (rdp->nxtlist) {
> +		long completed_snap = ACCESS_ONCE(rcp->completed);
> +
>  		/*
>  		 * This cpu has pending rcu entries and the grace period
>  		 * for them has completed.
>  		 */
> -		if (!rcu_batch_before(rcp->completed, rdp->batch))
> +		if (!rcu_batch_before(completed_snap, rdp->batch))
>  			return 1;
> -		if (!rcu_batch_before(rcp->completed, rdp->batch - 1) &&
> +		if (!rcu_batch_before(completed_snap, rdp->batch - 1) &&
>  				rdp->nxttail[0] != rdp->nxttail[1])
>  			return 1;
>  		if (rdp->nxttail[0] != &rdp->nxtlist)
> @@ -547,6 +570,12 @@ int rcu_needs_cpu(int cpu)
>  	return !!rdp->nxtlist || !!rdp_bh->nxtlist || rcu_pending(cpu);
>  }
>  
> +/*
> + * Top-level function driving RCU grace-period detection, normally
> + * invoked from the scheduler-clock interrupt.  This function simply
> + * increments counters that are read only from softirq by this same
> + * CPU, so there are no memory barriers required.
> + */
>  void rcu_check_callbacks(int cpu, int user)
>  {
>  	if (user ||
> @@ -590,6 +619,7 @@ void rcu_check_callbacks(int cpu, int user)
>  static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
>  						struct rcu_data *rdp)
>  {
> +	spin_lock(&rcp->lock);
>  	memset(rdp, 0, sizeof(*rdp));
>  	rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2] = &rdp->nxtlist;
>  	rdp->donetail = &rdp->donelist;
> @@ -597,6 +627,7 @@ static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
>  	rdp->qs_pending = 0;
>  	rdp->cpu = cpu;
>  	rdp->blimit = blimit;
> +	spin_unlock(&rcp->lock);
>  }
>  
>  static void __cpuinit rcu_online_cpu(int cpu)
> 

* Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups
  2008-08-05 16:48 ` Steven Rostedt
@ 2008-08-05 17:40   ` Paul E. McKenney
  0 siblings, 0 replies; 17+ messages in thread
From: Paul E. McKenney @ 2008-08-05 17:40 UTC (permalink / raw)
  To: Steven Rostedt
  Cc: linux-kernel, mingo, akpm, oleg, manfred, dipankar, dvhltc, niv

On Tue, Aug 05, 2008 at 12:48:41PM -0400, Steven Rostedt wrote:

Thank you for looking this over, Steve!

> On Tue, 5 Aug 2008, Paul E. McKenney wrote:
> > ---
> > 
> >  rcuclassic.c |   51 +++++++++++++++++++++++++++++++++++++++++----------
> >  1 file changed, 41 insertions(+), 10 deletions(-)
> > 
> > diff --git a/kernel/rcuclassic.c b/kernel/rcuclassic.c
> > index d3553ee..e6f42ef 100644
> > --- a/kernel/rcuclassic.c
> > +++ b/kernel/rcuclassic.c
> > @@ -86,6 +86,7 @@ static void force_quiescent_state(struct rcu_data *rdp,
> >  	int cpu;
> >  	cpumask_t cpumask;
> >  	set_need_resched();
> > +	spin_lock(&rcp->lock);
> >  	if (unlikely(!rcp->signaled)) {
> >  		rcp->signaled = 1;
> >  		/*
> > @@ -111,6 +112,7 @@ static void force_quiescent_state(struct rcu_data *rdp,
> >  		for_each_cpu_mask(cpu, cpumask)
> >  			smp_send_reschedule(cpu);
> >  	}
> > +	spin_unlock(&rcp->lock);
> >  }
> >  #else
> >  static inline void force_quiescent_state(struct rcu_data *rdp,
> > @@ -124,7 +126,9 @@ static void __call_rcu(struct rcu_head *head, struct rcu_ctrlblk *rcp,
> >  		struct rcu_data *rdp)
> >  {
> >  	long batch;
> > -	smp_mb(); /* reads the most recently updated value of rcu->cur. */
> > +
> > +	head->next = NULL;
> > +	smp_mb(); /* Read of rcu->cur must happen after any change by caller. */
> >  
> >  	/*
> >  	 * Determine the batch number of this callback.
> > @@ -174,7 +178,6 @@ void call_rcu(struct rcu_head *head,
> >  	unsigned long flags;
> >  
> >  	head->func = func;
> > -	head->next = NULL;
> >  	local_irq_save(flags);
> >  	__call_rcu(head, &rcu_ctrlblk, &__get_cpu_var(rcu_data));
> >  	local_irq_restore(flags);
> > @@ -203,7 +206,6 @@ void call_rcu_bh(struct rcu_head *head,
> >  	unsigned long flags;
> >  
> >  	head->func = func;
> > -	head->next = NULL;
> >  	local_irq_save(flags);
> >  	__call_rcu(head, &rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
> >  	local_irq_restore(flags);
> > @@ -389,17 +391,17 @@ static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
> >  static void __rcu_offline_cpu(struct rcu_data *this_rdp,
> >  				struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> >  {
> > -	/* if the cpu going offline owns the grace period
> > +	/*
> > +	 * if the cpu going offline owns the grace period
> >  	 * we can block indefinitely waiting for it, so flush
> >  	 * it here
> >  	 */
> >  	spin_lock_bh(&rcp->lock);
> >  	if (rcp->cur != rcp->completed)
> >  		cpu_quiet(rdp->cpu, rcp);
> > -	spin_unlock_bh(&rcp->lock);
> > -	/* spin_lock implies smp_mb() */
> 
> The spin_unlock is being removed here. Was that the smp_mb that was 
> implied or is it the above spin_lock_bh?

Moving the spinlock down below, combined with adding spinlocks elsewhere,
eliminates the need for the memory barrier -- the locking primitives
provide full ordering and atomicity as well.  This does likely reduce
scalability somewhat, which I will address with a hierarchical approach
in a later patch.

							Thanx, Paul

> >  	rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail, rcp->cur + 1);
> >  	rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail[2], rcp->cur + 1);
> > +	spin_unlock_bh(&rcp->lock);
> >  
> >  	local_irq_disable();
> >  	this_rdp->qlen += rdp->qlen;
> > @@ -433,16 +435,19 @@ static void rcu_offline_cpu(int cpu)
> >  static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
> >  					struct rcu_data *rdp)
> >  {
> > +	long completed_snap;
> > +
> >  	if (rdp->nxtlist) {
> >  		local_irq_disable();
> > +		completed_snap = ACCESS_ONCE(rcp->completed);
> >  
> >  		/*
> >  		 * move the other grace-period-completed entries to
> >  		 * [rdp->nxtlist, *rdp->nxttail[0]) temporarily
> >  		 */
> > -		if (!rcu_batch_before(rcp->completed, rdp->batch))
> > +		if (!rcu_batch_before(completed_snap, rdp->batch))
> >  			rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2];
> > -		else if (!rcu_batch_before(rcp->completed, rdp->batch - 1))
> > +		else if (!rcu_batch_before(completed_snap, rdp->batch - 1))
> >  			rdp->nxttail[0] = rdp->nxttail[1];
> >  
> >  		/*
> > @@ -483,20 +488,38 @@ static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
> >  
> >  static void rcu_process_callbacks(struct softirq_action *unused)
> >  {
> > +	/*
> > +	 * Memory references from any prior RCU read-side critical sections
> > +	 * executed by the interrupted code must be seen before any RCU
> > +	 * grace-period manipulations below.
> > +	 */
> > +
> > +	smp_mb(); /* See above block comment. */
> > +
> >  	__rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
> >  	__rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
> > +
> > +	/*
> > +	 * Memory references from any later RCU read-side critical sections
> > +	 * executed by the interrupted code must be seen after any RCU
> > +	 * grace-period manipulations above.
> > +	 */
> > +
> > +	smp_mb(); /* See above block comment. */
> >  }
> >  
> >  static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> >  {
> >  	if (rdp->nxtlist) {
> > +		long completed_snap = ACCESS_ONCE(rcp->completed);
> > +
> >  		/*
> >  		 * This cpu has pending rcu entries and the grace period
> >  		 * for them has completed.
> >  		 */
> > -		if (!rcu_batch_before(rcp->completed, rdp->batch))
> > +		if (!rcu_batch_before(completed_snap, rdp->batch))
> >  			return 1;
> > -		if (!rcu_batch_before(rcp->completed, rdp->batch - 1) &&
> > +		if (!rcu_batch_before(completed_snap, rdp->batch - 1) &&
> >  				rdp->nxttail[0] != rdp->nxttail[1])
> >  			return 1;
> >  		if (rdp->nxttail[0] != &rdp->nxtlist)
> > @@ -547,6 +570,12 @@ int rcu_needs_cpu(int cpu)
> >  	return !!rdp->nxtlist || !!rdp_bh->nxtlist || rcu_pending(cpu);
> >  }
> >  
> > +/*
> > + * Top-level function driving RCU grace-period detection, normally
> > + * invoked from the scheduler-clock interrupt.  This function simply
> > + * increments counters that are read only from softirq by this same
> > + * CPU, so there are no memory barriers required.
> > + */
> >  void rcu_check_callbacks(int cpu, int user)
> >  {
> >  	if (user ||
> > @@ -590,6 +619,7 @@ void rcu_check_callbacks(int cpu, int user)
> >  static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
> >  						struct rcu_data *rdp)
> >  {
> > +	spin_lock(&rcp->lock);
> >  	memset(rdp, 0, sizeof(*rdp));
> >  	rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2] = &rdp->nxtlist;
> >  	rdp->donetail = &rdp->donelist;
> > @@ -597,6 +627,7 @@ static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
> >  	rdp->qs_pending = 0;
> >  	rdp->cpu = cpu;
> >  	rdp->blimit = blimit;
> > +	spin_unlock(&rcp->lock);
> >  }
> >  
> >  static void __cpuinit rcu_online_cpu(int cpu)
> > 

* Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups
  2008-08-05 16:21 [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups Paul E. McKenney
  2008-08-05 16:48 ` Steven Rostedt
@ 2008-08-06  5:30 ` Manfred Spraul
  2008-08-07  3:18   ` Paul E. McKenney
  2008-08-15 14:09 ` [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups Ingo Molnar
  2 siblings, 1 reply; 17+ messages in thread
From: Manfred Spraul @ 2008-08-06  5:30 UTC (permalink / raw)
  To: paulmck; +Cc: linux-kernel, mingo, akpm, oleg, dipankar, rostedt, dvhltc, niv

Hi Paul,

Paul E. McKenney wrote:
> This patch is in preparation for moving to a hierarchical
> algorithm to support very large SMP machines -- requested by some
> people at OLS, and there seem to have been a few recent patches in the
> 4096-CPU direction as well.

I thought about hierarchical RCU, but I never found the time to 
implement it.
Do you have a concept in mind?

Right now, I am trying to understand the current code first - and some of it 
doesn't make much sense.

There are three per-cpu lists:
->nxt
->cur
->done.

Obviously, there must be a quiescent state between cur and done.
But why does the code require a quiescent state between nxt and cur?
I think that's superfluous. The only thing that is required is that all 
cpus have moved their callbacks from nxt to cur. That doesn't need a 
quiescent state, this operation could be done in hard interrupt as well.

Thus I think this should work:

1) A callback is inserted into ->nxt.
2) As soon as too many objects are sitting in the ->nxt lists, a new rcu 
cycle is started.
3) As soon as a cpu sees that a new rcu cycle is started, it moves its 
callbacks from ->nxt to ->cur. No checks for hard_irq_count & friends 
necessary. Especially: same rule for _bh and normal.
4) As soon as all cpus have moved their lists from ->nxt to ->cur, the 
real grace period is started.
5) As soon as all cpus passed a quiescent state (i.e.: now with tests 
for hard_irq_count, different rules for _bh and normal), the list is 
moved from ->cur to ->completed. Once in completed, they can be 
destroyed by performing the callbacks.

What do you think? would that work? It doesn't make much sense that step 
3) tests for a quiescent state.

Step 2) could depend on memory pressure.
Step 3) and 4) could be accelerated by force_quiescent_state(), if the 
memory pressure is too high.

--
    Manfred


* Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups
  2008-08-06  5:30 ` Manfred Spraul
@ 2008-08-07  3:18   ` Paul E. McKenney
  2008-08-18  9:13     ` Manfred Spraul
  0 siblings, 1 reply; 17+ messages in thread
From: Paul E. McKenney @ 2008-08-07  3:18 UTC (permalink / raw)
  To: Manfred Spraul
  Cc: linux-kernel, mingo, akpm, oleg, dipankar, rostedt, dvhltc, niv

On Wed, Aug 06, 2008 at 07:30:13AM +0200, Manfred Spraul wrote:
> Hi Paul,
>
> Paul E. McKenney wrote:
>> This patch is in preparation for moving to a hierarchical
>> algorithm to support very large SMP machines -- requested by some
>> people at OLS, and there seem to have been a few recent patches in the
>> 4096-CPU direction as well.
>
> I thought about hierarchical RCU, but I never found the time to implement 
> it.
> Do you have a concept in mind?

Actually, you did submit a patch for a two-level hierarchy some years
back:

http://marc.theaimsgroup.com/?l=linux-kernel&m=108546384711797&w=2

I am looking to allow multiple levels to accommodate 4096 CPUs, which
pushes me towards locking on the nodes in the hierarchy.  I have
a roughed-out design that (I hope!) avoids deadlock and that allows
adapting to machine topology.  I am also trying to minimize the amount
of arch-specific code needed to construct the hierarchy -- hopefully
just a pair of config parameters.

More as it starts working...

> Right now, I am trying to understand the current code first - and some of it 
> doesn't make much sense.
>
> There are three per-cpu lists:
> ->nxt
> ->cur
> ->done.
>
> Obviously, there must be a quiescent state between cur and done.
> But why does the code require a quiescent state between nxt and cur?
> I think that's superfluous. The only thing that is required is that all cpus 
> have moved their callbacks from nxt to cur. That doesn't need a quiescent 
> state, this operation could be done in hard interrupt as well.

The deal is that we have to put incoming callbacks somewhere while
the batch in ->cur waits for an RCU grace period.  That somewhere is
->nxt.  So to be painfully pedantic, the callbacks in ->nxt are not
waiting for an RCU grace period.  Instead, they are waiting for the
callbacks in ->cur to get out of the way.

> Thus I think this should work:
>
> 1) A callback is inserted into ->nxt.

Yep.

> 2) As soon as too many objects are sitting in the ->nxt lists, a new rcu 
> cycle is started.

Yep, call_rcu() and friends now do this. (In response to
denial-of-service attacks some years back.)

> 3) As soon as a cpu sees that a new rcu cycle is started, it moves its 
> callbacks from ->nxt to ->cur. No checks for hard_irq_count & friends 
> necessary. Especially: same rule for _bh and normal.

Yep.  The checks for hard_irq_count are instead intended to determine
if this CPU is already in a quiescent state for the newly started RCU
grace period.  As long as we took the scheduling clock interrupt,
we might as well get our money's worth, right?

> 4) As soon as all cpus have moved their lists from ->nxt to ->cur, the real 
> grace period is started.

Jiangshan took a slightly different approach to handling this situation,
but yes, more or less.  The trick is that the processing in (4) for
->nxt is overlapped with the processing in (5) for ->cur.

> 5) As soon as all cpus passed a quiescent state (i.e.: now with tests for 
> hard_irq_count, different rules for _bh and normal), the list is moved from 
> ->cur to ->completed. Once in completed, they can be destroyed by 
> performing the callbacks.

To ->done rather than ->completed, but yes.

> What do you think? would that work? It doesn't make much sense that step 3) 
> tests for a quiescent state.

The trick is that the work for grace period n and grace period n+1
are overlapped.

> Step 2) could depend on memory pressure.

Yep.

> Step 3) and 4) could be accelerated by force_quiescent_state(), if the 
> memory pressure is too high.

Yep -- though we haven't done this except on paper.

							Thanx, Paul

> --
>    Manfred
>

* Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups
  2008-08-05 16:21 [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups Paul E. McKenney
  2008-08-05 16:48 ` Steven Rostedt
  2008-08-06  5:30 ` Manfred Spraul
@ 2008-08-15 14:09 ` Ingo Molnar
  2008-08-15 14:24   ` Ingo Molnar
  2 siblings, 1 reply; 17+ messages in thread
From: Ingo Molnar @ 2008-08-15 14:09 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: linux-kernel, akpm, oleg, manfred, dipankar, rostedt, dvhltc, niv


* Paul E. McKenney <paulmck@linux.vnet.ibm.com> wrote:

> This patch simplifies the locking and memory-barrier usage in the 
> Classic RCU grace-period-detection mechanism, incorporating Lai 
> Jiangshan's feedback from the earlier version 
> (http://lkml.org/lkml/2008/8/1/400 and 
> http://lkml.org/lkml/2008/8/3/43).  Passed 10 hours of rcutorture 
> concurrent with CPUs being put online and taken offline on a 
> 128-hardware-thread Power machine.  My apologies to whoever in the 
> Eastern Hemisphere was planning to use this machine over the Western 
> Hemisphere night, but it was sitting idle and...
> 
> So this is ready for tip/core/rcu.

applied to tip/core/rcu - thanks Paul!

	Ingo

* Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups
  2008-08-15 14:09 ` [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups Ingo Molnar
@ 2008-08-15 14:24   ` Ingo Molnar
  2008-08-15 14:56     ` Ingo Molnar
                       ` (2 more replies)
  0 siblings, 3 replies; 17+ messages in thread
From: Ingo Molnar @ 2008-08-15 14:24 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: linux-kernel, akpm, oleg, manfred, dipankar, rostedt, dvhltc, niv


Paul,

one of your two recent RCU patches caused this lockdep splat in -tip 
testing:

------------------->
Brought up 2 CPUs
Total of 2 processors activated (6850.87 BogoMIPS).
PM: Adding info for No Bus:platform
khelper used greatest stack depth: 3124 bytes left

=================================
[ INFO: inconsistent lock state ]
2.6.27-rc3-tip #1
---------------------------------
inconsistent {softirq-on-W} -> {in-softirq-W} usage.
ksoftirqd/0/4 [HC0[0]:SC1[1]:HE1:SE0] takes:
 (&rcu_ctrlblk.lock){-+..}, at: [<c016d91c>] __rcu_process_callbacks+0x1ac/0x1f0
{softirq-on-W} state was registered at:
  [<c01528e4>] __lock_acquire+0x3f4/0x5b0
  [<c0152b29>] lock_acquire+0x89/0xc0
  [<c076142b>] _spin_lock+0x3b/0x70
  [<c016d649>] rcu_init_percpu_data+0x29/0x80
  [<c075e43f>] rcu_cpu_notify+0xaf/0xd0
  [<c076458d>] notifier_call_chain+0x2d/0x60
  [<c0145ede>] __raw_notifier_call_chain+0x1e/0x30
  [<c075db29>] _cpu_up+0x79/0x110
  [<c075dc0d>] cpu_up+0x4d/0x70
  [<c0a769e1>] kernel_init+0xb1/0x200
  [<c01048a3>] kernel_thread_helper+0x7/0x10
  [<ffffffff>] 0xffffffff
irq event stamp: 14
hardirqs last  enabled at (14): [<c01534db>] trace_hardirqs_on+0xb/0x10
hardirqs last disabled at (13): [<c014dbeb>] trace_hardirqs_off+0xb/0x10
softirqs last  enabled at (0): [<c012b186>] copy_process+0x276/0x1190
softirqs last disabled at (11): [<c0105c0a>] call_on_stack+0x1a/0x30

other info that might help us debug this:
no locks held by ksoftirqd/0/4.

stack backtrace:
Pid: 4, comm: ksoftirqd/0 Not tainted 2.6.27-rc3-tip #1
 [<c01504dc>] print_usage_bug+0x16c/0x1b0
 [<c0152455>] mark_lock+0xa75/0xb10
 [<c0108b75>] ? sched_clock+0x15/0x30
 [<c015289d>] __lock_acquire+0x3ad/0x5b0
 [<c0152b29>] lock_acquire+0x89/0xc0
 [<c016d91c>] ? __rcu_process_callbacks+0x1ac/0x1f0
 [<c076142b>] _spin_lock+0x3b/0x70
 [<c016d91c>] ? __rcu_process_callbacks+0x1ac/0x1f0
 [<c016d91c>] __rcu_process_callbacks+0x1ac/0x1f0
 [<c016d986>] rcu_process_callbacks+0x26/0x50
 [<c0132305>] __do_softirq+0x95/0x120
 [<c0132270>] ? __do_softirq+0x0/0x120
 [<c0105c0a>] call_on_stack+0x1a/0x30
 [<c0132426>] ? ksoftirqd+0x96/0x110
 [<c0132390>] ? ksoftirqd+0x0/0x110
 [<c01411f7>] ? kthread+0x47/0x80
 [<c01411b0>] ? kthread+0x0/0x80
 [<c01048a3>] ? kernel_thread_helper+0x7/0x10
 =======================
calling  init_cpufreq_transition_notifier_list+0x0/0x20
initcall init_cpufreq_transition_notifier_list+0x0/0x20 returned 0 after 0 msecs
calling  net_ns_init+0x0/0x190
net_namespace: 676 bytes
initcall net_ns_init+0x0/0x190 returned 0 after 0 msecs
calling  cpufreq_tsc+0x0/0x20
initcall cpufreq_tsc+0x0/0x20 returned 0 after 0 msecs
calling  reboot_init+0x0/0x20
initcall reboot_init+0x0/0x20 returned 0 after 0 msecs
calling  print_banner+0x0/0x10
Booting paravirtualized kernel on bare hardware

<-----------------------

my guess is on:

 commit 1f7b94cd3d564901f9e04a8bc5832ae7bfd690a0
 Author: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
 Date:   Tue Aug 5 09:21:44 2008 -0700

    rcu: classic RCU locking and memory-barrier cleanups
 
	Ingo

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups
  2008-08-15 14:24   ` Ingo Molnar
@ 2008-08-15 14:56     ` Ingo Molnar
  2008-08-15 14:58     ` Paul E. McKenney
  2008-08-17 14:37     ` [PATCH tip/core/rcu] classic RCU locking cleanup fix lockdep problem Paul E. McKenney
  2 siblings, 0 replies; 17+ messages in thread
From: Ingo Molnar @ 2008-08-15 14:56 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: linux-kernel, akpm, oleg, manfred, dipankar, rostedt, dvhltc, niv


* Ingo Molnar <mingo@elte.hu> wrote:

> my guess is on:
> 
>  commit 1f7b94cd3d564901f9e04a8bc5832ae7bfd690a0
>  Author: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
>  Date:   Tue Aug 5 09:21:44 2008 -0700
> 
>     rcu: classic RCU locking and memory-barrier cleanups

yep, reverting that one made the lockdep warning go away. (note, i still 
kept the commit in tip/core/rcu so a delta fix against the previous 
patch would be preferred)

	Ingo

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups
  2008-08-15 14:24   ` Ingo Molnar
  2008-08-15 14:56     ` Ingo Molnar
@ 2008-08-15 14:58     ` Paul E. McKenney
  2008-08-17 14:37     ` [PATCH tip/core/rcu] classic RCU locking cleanup fix lockdep problem Paul E. McKenney
  2 siblings, 0 replies; 17+ messages in thread
From: Paul E. McKenney @ 2008-08-15 14:58 UTC (permalink / raw)
  To: Ingo Molnar
  Cc: linux-kernel, akpm, oleg, manfred, dipankar, rostedt, dvhltc, niv

On Fri, Aug 15, 2008 at 04:24:30PM +0200, Ingo Molnar wrote:
> 
> Paul,
> 
> one of your two recent RCU patches caused this lockdep splat in -tip 
> testing:

I will beat this up on my plane flight back home.

Note to self: add lockdep to testing mix.  :-/

							Thanx, Paul

> ------------------->
> Brought up 2 CPUs
> Total of 2 processors activated (6850.87 BogoMIPS).
> PM: Adding info for No Bus:platform
> khelper used greatest stack depth: 3124 bytes left
> 
> =================================
> [ INFO: inconsistent lock state ]
> 2.6.27-rc3-tip #1
> ---------------------------------
> inconsistent {softirq-on-W} -> {in-softirq-W} usage.
> ksoftirqd/0/4 [HC0[0]:SC1[1]:HE1:SE0] takes:
>  (&rcu_ctrlblk.lock){-+..}, at: [<c016d91c>] __rcu_process_callbacks+0x1ac/0x1f0
> {softirq-on-W} state was registered at:
>   [<c01528e4>] __lock_acquire+0x3f4/0x5b0
>   [<c0152b29>] lock_acquire+0x89/0xc0
>   [<c076142b>] _spin_lock+0x3b/0x70
>   [<c016d649>] rcu_init_percpu_data+0x29/0x80
>   [<c075e43f>] rcu_cpu_notify+0xaf/0xd0
>   [<c076458d>] notifier_call_chain+0x2d/0x60
>   [<c0145ede>] __raw_notifier_call_chain+0x1e/0x30
>   [<c075db29>] _cpu_up+0x79/0x110
>   [<c075dc0d>] cpu_up+0x4d/0x70
>   [<c0a769e1>] kernel_init+0xb1/0x200
>   [<c01048a3>] kernel_thread_helper+0x7/0x10
>   [<ffffffff>] 0xffffffff
> irq event stamp: 14
> hardirqs last  enabled at (14): [<c01534db>] trace_hardirqs_on+0xb/0x10
> hardirqs last disabled at (13): [<c014dbeb>] trace_hardirqs_off+0xb/0x10
> softirqs last  enabled at (0): [<c012b186>] copy_process+0x276/0x1190
> softirqs last disabled at (11): [<c0105c0a>] call_on_stack+0x1a/0x30
> 
> other info that might help us debug this:
> no locks held by ksoftirqd/0/4.
> 
> stack backtrace:
> Pid: 4, comm: ksoftirqd/0 Not tainted 2.6.27-rc3-tip #1
>  [<c01504dc>] print_usage_bug+0x16c/0x1b0
>  [<c0152455>] mark_lock+0xa75/0xb10
>  [<c0108b75>] ? sched_clock+0x15/0x30
>  [<c015289d>] __lock_acquire+0x3ad/0x5b0
>  [<c0152b29>] lock_acquire+0x89/0xc0
>  [<c016d91c>] ? __rcu_process_callbacks+0x1ac/0x1f0
>  [<c076142b>] _spin_lock+0x3b/0x70
>  [<c016d91c>] ? __rcu_process_callbacks+0x1ac/0x1f0
>  [<c016d91c>] __rcu_process_callbacks+0x1ac/0x1f0
>  [<c016d986>] rcu_process_callbacks+0x26/0x50
>  [<c0132305>] __do_softirq+0x95/0x120
>  [<c0132270>] ? __do_softirq+0x0/0x120
>  [<c0105c0a>] call_on_stack+0x1a/0x30
>  [<c0132426>] ? ksoftirqd+0x96/0x110
>  [<c0132390>] ? ksoftirqd+0x0/0x110
>  [<c01411f7>] ? kthread+0x47/0x80
>  [<c01411b0>] ? kthread+0x0/0x80
>  [<c01048a3>] ? kernel_thread_helper+0x7/0x10
>  =======================
> calling  init_cpufreq_transition_notifier_list+0x0/0x20
> initcall init_cpufreq_transition_notifier_list+0x0/0x20 returned 0 after 0 msecs
> calling  net_ns_init+0x0/0x190
> net_namespace: 676 bytes
> initcall net_ns_init+0x0/0x190 returned 0 after 0 msecs
> calling  cpufreq_tsc+0x0/0x20
> initcall cpufreq_tsc+0x0/0x20 returned 0 after 0 msecs
> calling  reboot_init+0x0/0x20
> initcall reboot_init+0x0/0x20 returned 0 after 0 msecs
> calling  print_banner+0x0/0x10
> Booting paravirtualized kernel on bare hardware
> 
> <-----------------------
> 
> my guess is on:
> 
>  commit 1f7b94cd3d564901f9e04a8bc5832ae7bfd690a0
>  Author: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
>  Date:   Tue Aug 5 09:21:44 2008 -0700
> 
>     rcu: classic RCU locking and memory-barrier cleanups
> 
> 	Ingo

^ permalink raw reply	[flat|nested] 17+ messages in thread

* [PATCH tip/core/rcu] classic RCU locking cleanup fix lockdep problem
  2008-08-15 14:24   ` Ingo Molnar
  2008-08-15 14:56     ` Ingo Molnar
  2008-08-15 14:58     ` Paul E. McKenney
@ 2008-08-17 14:37     ` Paul E. McKenney
  2008-08-17 15:38       ` Ingo Molnar
  2 siblings, 1 reply; 17+ messages in thread
From: Paul E. McKenney @ 2008-08-17 14:37 UTC (permalink / raw)
  To: Ingo Molnar
  Cc: linux-kernel, akpm, oleg, manfred, dipankar, rostedt, dvhltc, niv

On Fri, Aug 15, 2008 at 04:24:30PM +0200, Ingo Molnar wrote:
> 
> Paul,
> 
> one of your two recent RCU patches caused this lockdep splat in -tip 
> testing:
> 
> ------------------->
> Brought up 2 CPUs
> Total of 2 processors activated (6850.87 BogoMIPS).
> PM: Adding info for No Bus:platform
> khelper used greatest stack depth: 3124 bytes left
> 
> =================================
> [ INFO: inconsistent lock state ]
> 2.6.27-rc3-tip #1
> ---------------------------------
> inconsistent {softirq-on-W} -> {in-softirq-W} usage.
> ksoftirqd/0/4 [HC0[0]:SC1[1]:HE1:SE0] takes:
>  (&rcu_ctrlblk.lock){-+..}, at: [<c016d91c>] __rcu_process_callbacks+0x1ac/0x1f0
> {softirq-on-W} state was registered at:
>   [<c01528e4>] __lock_acquire+0x3f4/0x5b0
>   [<c0152b29>] lock_acquire+0x89/0xc0
>   [<c076142b>] _spin_lock+0x3b/0x70
>   [<c016d649>] rcu_init_percpu_data+0x29/0x80
>   [<c075e43f>] rcu_cpu_notify+0xaf/0xd0
>   [<c076458d>] notifier_call_chain+0x2d/0x60
>   [<c0145ede>] __raw_notifier_call_chain+0x1e/0x30
>   [<c075db29>] _cpu_up+0x79/0x110
>   [<c075dc0d>] cpu_up+0x4d/0x70
>   [<c0a769e1>] kernel_init+0xb1/0x200
>   [<c01048a3>] kernel_thread_helper+0x7/0x10
>   [<ffffffff>] 0xffffffff
> irq event stamp: 14
> hardirqs last  enabled at (14): [<c01534db>] trace_hardirqs_on+0xb/0x10
> hardirqs last disabled at (13): [<c014dbeb>] trace_hardirqs_off+0xb/0x10
> softirqs last  enabled at (0): [<c012b186>] copy_process+0x276/0x1190
> softirqs last disabled at (11): [<c0105c0a>] call_on_stack+0x1a/0x30
> 
> other info that might help us debug this:
> no locks held by ksoftirqd/0/4.
> 
> stack backtrace:
> Pid: 4, comm: ksoftirqd/0 Not tainted 2.6.27-rc3-tip #1
>  [<c01504dc>] print_usage_bug+0x16c/0x1b0
>  [<c0152455>] mark_lock+0xa75/0xb10
>  [<c0108b75>] ? sched_clock+0x15/0x30
>  [<c015289d>] __lock_acquire+0x3ad/0x5b0
>  [<c0152b29>] lock_acquire+0x89/0xc0
>  [<c016d91c>] ? __rcu_process_callbacks+0x1ac/0x1f0
>  [<c076142b>] _spin_lock+0x3b/0x70
>  [<c016d91c>] ? __rcu_process_callbacks+0x1ac/0x1f0
>  [<c016d91c>] __rcu_process_callbacks+0x1ac/0x1f0
>  [<c016d986>] rcu_process_callbacks+0x26/0x50
>  [<c0132305>] __do_softirq+0x95/0x120
>  [<c0132270>] ? __do_softirq+0x0/0x120
>  [<c0105c0a>] call_on_stack+0x1a/0x30
>  [<c0132426>] ? ksoftirqd+0x96/0x110
>  [<c0132390>] ? ksoftirqd+0x0/0x110
>  [<c01411f7>] ? kthread+0x47/0x80
>  [<c01411b0>] ? kthread+0x0/0x80
>  [<c01048a3>] ? kernel_thread_helper+0x7/0x10
>  =======================
> calling  init_cpufreq_transition_notifier_list+0x0/0x20
> initcall init_cpufreq_transition_notifier_list+0x0/0x20 returned 0 after 0 msecs
> calling  net_ns_init+0x0/0x190
> net_namespace: 676 bytes
> initcall net_ns_init+0x0/0x190 returned 0 after 0 msecs
> calling  cpufreq_tsc+0x0/0x20
> initcall cpufreq_tsc+0x0/0x20 returned 0 after 0 msecs
> calling  reboot_init+0x0/0x20
> initcall reboot_init+0x0/0x20 returned 0 after 0 msecs
> calling  print_banner+0x0/0x10
> Booting paravirtualized kernel on bare hardware
> 
> <-----------------------
> 
> my guess is on:
> 
>  commit 1f7b94cd3d564901f9e04a8bc5832ae7bfd690a0
>  Author: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
>  Date:   Tue Aug 5 09:21:44 2008 -0700
> 
>     rcu: classic RCU locking and memory-barrier cleanups
> 
> 	Ingo

Fixes a problem detected by lockdep in which rcu_ctrlblk.lock was acquired
both in irq context and in process context, but without disabling irqs
when acquiring it from process context.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
---

 rcuclassic.c |    6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/kernel/rcuclassic.c b/kernel/rcuclassic.c
index 5de1266..fb1f1cc 100644
--- a/kernel/rcuclassic.c
+++ b/kernel/rcuclassic.c
@@ -700,7 +700,9 @@ void rcu_check_callbacks(int cpu, int user)
 static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
 						struct rcu_data *rdp)
 {
-	spin_lock(&rcp->lock);
+	unsigned long flags;
+
+	spin_lock_irqsave(&rcp->lock, flags);
 	memset(rdp, 0, sizeof(*rdp));
 	rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2] = &rdp->nxtlist;
 	rdp->donetail = &rdp->donelist;
@@ -708,7 +710,7 @@ static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
 	rdp->qs_pending = 0;
 	rdp->cpu = cpu;
 	rdp->blimit = blimit;
-	spin_unlock(&rcp->lock);
+	spin_unlock_irqrestore(&rcp->lock, flags);
 }
 
 static void __cpuinit rcu_online_cpu(int cpu)

^ permalink raw reply related	[flat|nested] 17+ messages in thread

* Re: [PATCH tip/core/rcu] classic RCU locking cleanup fix lockdep problem
  2008-08-17 14:37     ` [PATCH tip/core/rcu] classic RCU locking cleanup fix lockdep problem Paul E. McKenney
@ 2008-08-17 15:38       ` Ingo Molnar
  0 siblings, 0 replies; 17+ messages in thread
From: Ingo Molnar @ 2008-08-17 15:38 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: linux-kernel, akpm, oleg, manfred, dipankar, rostedt, dvhltc, niv


* Paul E. McKenney <paulmck@linux.vnet.ibm.com> wrote:

> Fixes a problem detected by lockdep in which rcu->lock was acquired 
> both in irq context and in process context, but without disabling from 
> process context.

applied to tip/core/rcu - thanks Paul!

	Ingo

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups
  2008-08-07  3:18   ` Paul E. McKenney
@ 2008-08-18  9:13     ` Manfred Spraul
  2008-08-18 14:04       ` Paul E. McKenney
  0 siblings, 1 reply; 17+ messages in thread
From: Manfred Spraul @ 2008-08-18  9:13 UTC (permalink / raw)
  To: paulmck; +Cc: linux-kernel, mingo, akpm, oleg, dipankar, rostedt, dvhltc, niv

Paul E. McKenney wrote:
>
>> Right now, I am trying to understand the current code first - and some of it 
>> doesn't make much sense.
>>
>> There are three per-cpu lists:
>> ->nxt
>> ->cur
>> ->done.
>>
>> Obviously, there must be a quiescent state between cur and done.
>> But why does the code require a quiescent state between nxt and cur?
>> I think that's superfluous. The only thing that is required is that all cpus 
>> have moved their callbacks from nxt to cur. That doesn't need a quiescent 
>> state, this operation could be done in hard interrupt as well.
>>     
>
> The deal is that we have to put incoming callbacks somewhere while
> the batch in ->cur waits for an RCU grace period.  That somewhere is
> ->nxt.  So to be painfully pedantic, the callbacks in ->nxt are not
> waiting for an RCU grace period.  Instead, they are waiting for the
> callbacks in ->cur to get out of the way.
>
>   
Ok, thanks.
If I understand the new code in tip/rcu correctly, you have rewritten 
that block anyway.

I'll try to implement my proposal - on paper, it looks far simpler than 
the current code.
On the one hand, a state machine that keeps track of a global state:
- collect the callbacks in a nxt list.
- wait for a quiescent state
- destroy the callbacks in the nxt list.
(actually, there will be 5 states, 2 additional for "start the next rcu 
cycle immediately")

On the other hand a cpu bitmap that keeps track of the cpus that have 
completed the work that must be done after a state change.
The last cpu advances the global state.

The state machine could be seq_lock protected, the cpu bitmap could be 
either hierarchical or flat or for uniprocessor just a nop.

Do you have any statistics about rcu_check_callbacks? On my single-cpu 
system, around 2/3 of the calls are from "normal" context, i.e. 
rcu_qsctr_inc() is called.

--
    Manfred

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups
  2008-08-18  9:13     ` Manfred Spraul
@ 2008-08-18 14:04       ` Paul E. McKenney
  2008-08-19 10:48         ` Manfred Spraul
  0 siblings, 1 reply; 17+ messages in thread
From: Paul E. McKenney @ 2008-08-18 14:04 UTC (permalink / raw)
  To: Manfred Spraul
  Cc: linux-kernel, mingo, akpm, oleg, dipankar, rostedt, dvhltc, niv

[-- Attachment #1: Type: text/plain, Size: 3382 bytes --]

On Mon, Aug 18, 2008 at 11:13:45AM +0200, Manfred Spraul wrote:
> Paul E. McKenney wrote:
>>
>>> Right now, I am trying to understand the current code first - and some of it 
>>> doesn't make much sense.
>>>
>>> There are three per-cpu lists:
>>> ->nxt
>>> ->cur
>>> ->done.
>>>
>>> Obviously, there must be a quiescent state between cur and done.
>>> But why does the code require a quiescent state between nxt and cur?
>>> I think that's superfluous. The only thing that is required is that all 
>>> cpus have moved their callbacks from nxt to cur. That doesn't need a 
>>> quiescent state, this operation could be done in hard interrupt as well.
>>>     
>>
>> The deal is that we have to put incoming callbacks somewhere while
>> the batch in ->cur waits for an RCU grace period.  That somewhere is
>> ->nxt.  So to be painfully pedantic, the callbacks in ->nxt are not
>> waiting for an RCU grace period.  Instead, they are waiting for the
>> callbacks in ->cur to get out of the way.
>>
>>   
> Ok, thanks.
> If I understand the new code in tip/rcu correctly, you have rewritten that 
> block anyway.
>
> I'll try to implement my proposal - on paper, it looks far simpler than the 
> current code.

Indeed.  The current code has accreted a bunch of fixes that have greatly
complicated the code.

> On the one hand, a state machine that keeps track of a global state:
> - collect the callbacks in a nxt list.
> - wait for a quiescent state
> - destroy the callbacks in the nxt list.
> (actually, there will be 5 states, 2 additional for "start the next rcu 
> cycle immediately")
>
> On the other hand a cpu bitmap that keeps track of the cpus that have 
> completed the work that must be done after a state change.
> The last cpu advances the global state.

FWIW, attached is what I have coded up thus far along these lines.
Currently testing in user space, hence some of the strange code and
comments flagged by "@@@" and "&&&&".  It automatically shapes the
hierarchy, reverting to no hierarchy for machines with few CPUs, and
using up to two additional levels for machines with many CPUs.  It would
not be hard to add additional levels, but given that the two additional
levels on a 64-bit machine can support 256K CPUs, this should suffice
for almost all cases.

> The state machine could be seq_lock protected, the cpu bitmap could be 
> either hierarchical or flat or for uniprocessor just a nop.

I chose hierarchical for cache locality.  I used simple spinlocks
instead of seq_lock on the grounds that the hierarchy decreases lock
contention by orders of magnitude.  I do still access some values
outside of locks, but this is greatly reduced compared to the current
code base.

> Do you have any statistics about rcu_check_callbacks? On my single-cpu 
> system, around 2/3 of the calls are from "normal" context, i.e. 
> rcu_qsctr_inc() is called.

This depends critically on the workload.  Workloads such as HPC that
spend almost all of their time in user-mode execution would invoke
rcu_qsctr_inc() almost all of the time, as would workloads with
significant idle time.  On the other hand, it is possible to arrange
so that almost all execution is in the kernel and there are almost no
context switches, in which case, rcu_qsctr_inc() would only be called
after a force_quiescent_state().

But most workloads should see rcu_qsctr_inc() called most of the time.

							Thanx, Paul

[-- Attachment #2: rcuclassic.h --]
[-- Type: text/x-chdr, Size: 3537 bytes --]

/*
 * rcuclassic.h: user-level prototype of hierarchical classic RCU.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * Copyright (c) 2008 Paul E. McKenney, IBM Corporation.
 */

#define RCU_DONE_TAIL		0	/* Also RCU_WAIT head. */
#define RCU_WAIT_TAIL		1	/* Also RCU_NEXT_READY head. */
#define RCU_NEXT_READY_TAIL	2	/* Also RCU_NEXT head. */
#define RCU_NEXT_TAIL		3
#define RCU_NEXT_SIZE		4

/* Per-CPU data for Read-Copy Update. */
struct rcu_data {
	/* 1) quiescent-state and grace-period handling : */
	long		completed;	/* Track rsp->completed gp number */
					/*  in order to detect GP end. */
	long		gpnum;		/* Highest gp number that this CPU */
					/*  is aware of having started. */
	int		passed_quiesc;	/* User-mode/idle loop etc. */
	int		qs_pending;	/* core waits for quiesc state */
					/*  rdp->completed!=rsp->completed, */
					/*  but with less cache thrashing. */
	struct rcu_node *mynode;	/* this CPU's leaf of hierarchy */

	/* 2) batch handling */
	/*
	 * If nxtlist is not NULL, it is partitioned as follows.
	 * Any of the partitions might be empty, in which case the
	 * pointer to that partition will be equal to the pointer for
	 * the following partition.  When the list is empty, all of
	 * the nxttail elements point to nxtlist, which is NULL.
	 *
	 * [*nxttail[2], NULL = *nxttail[3]):
	 *	Entries that might have arrived after current GP ended
	 * [*nxttail[1], *nxttail[2]):
	 *	Entries known to have arrived before current GP ended
	 * [*nxttail[0], *nxttail[1]):
	 *	Entries that batch # <= ->completed - 1: waiting for current GP
	 * [nxtlist, *nxttail[0]):
	 *	Entries that batch # <= ->completed
	 *	The grace period for these entries has completed, and
	 *	the other grace-period-completed entries may be moved
	 *	here temporarily in rcu_process_callbacks().
	 */
	struct rcu_head *nxtlist;
	struct rcu_head **nxttail[RCU_NEXT_SIZE];
	long            qlen; 	 	/* # of queued callbacks */
	/* @@@ struct rcu_head *donelist;	*/
	/* @@@ struct rcu_head **donetail;	*/
	long		blimit;		/* Upper limit on a processed batch */
	int cpu;
	/* struct rcu_head barrier; */
};

/* @@@ DECLARE_PER_CPU(struct rcu_data, rcu_data); */
/* @@@ DECLARE_PER_CPU(struct rcu_data, rcu_bh_data); */
DEFINE_PER_CPU(struct rcu_data, rcu_data);
DEFINE_PER_CPU(struct rcu_data, rcu_bh_data);

/*
 * Increment the quiescent state "counter".
 * This used to really be a counter, but has atrophied.  The reason is
 * that we do not need to know how many quiescent states passed, just
 * if there was at least one since the start of the grace period.
 * The counter has therefore become a simple flag.
 */
static inline void rcu_qsctr_inc(int cpu)
{
	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
	rdp->passed_quiesc = 1;
}
static inline void rcu_bh_qsctr_inc(int cpu)
{
	struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
	rdp->passed_quiesc = 1;
}

[-- Attachment #3: rcuclassic.c --]
[-- Type: text/x-csrc, Size: 28083 bytes --]

/*
 * rcuclassic.c: user-level prototype of hierarchical classic RCU.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * Copyright (c) 2008 Paul E. McKenney, IBM Corporation.
 */


#define CONFIG_RCU_FANOUT 3
#define NR_CPUS 5
/* #define CONFIG_RCU_FANOUT_EXACT */

#include <stdio.h>
#include "api.h"
#include "rcuclassic.h"

/* Define shape of hierarchy based on NR_CPUS and CONFIG_RCU_FANOUT. */

#define MAX_RCU_LEVELS 3
#if NR_CPUS <= CONFIG_RCU_FANOUT
#define NUM_RCU_LEVELS 1
#define NUM_RCU_LEVEL_1 1
#define NUM_RCU_LEVEL_2 NR_CPUS
#define NUM_RCU_LEVEL_3 0
#define NUM_RCU_LEVEL_4 0
#define NUM_RCU_NODES NUM_RCU_LEVEL_1
#elif NR_CPUS <= CONFIG_RCU_FANOUT * CONFIG_RCU_FANOUT
#define NUM_RCU_LEVELS 2
#define NUM_RCU_LEVEL_1 1
#define NUM_RCU_LEVEL_2 \
	(((NR_CPUS) + (CONFIG_RCU_FANOUT) - 1) / (CONFIG_RCU_FANOUT))
#define NUM_RCU_LEVEL_3 NR_CPUS
#define NUM_RCU_LEVEL_4 0
#define NUM_RCU_NODES \
	((NUM_RCU_LEVEL_1) + (NUM_RCU_LEVEL_2))
#elif NR_CPUS <= CONFIG_RCU_FANOUT * CONFIG_RCU_FANOUT * CONFIG_RCU_FANOUT
#define NUM_RCU_LEVELS 3
#define RCU_FANOUT_SQ ((CONFIG_RCU_FANOUT) * (CONFIG_RCU_FANOUT))
#define NUM_RCU_LEVEL_1 1
#define NUM_RCU_LEVEL_2 \
	(((NR_CPUS) + (RCU_FANOUT_SQ) - 1) / (RCU_FANOUT_SQ))
#define NUM_RCU_LEVEL_3 \
	(((NR_CPUS) + (CONFIG_RCU_FANOUT) - 1) / (CONFIG_RCU_FANOUT))
#define NUM_RCU_LEVEL_4 NR_CPUS
#define NUM_RCU_NODES \
	((NUM_RCU_LEVEL_1) + \
	 (NUM_RCU_LEVEL_2) + \
	 (NUM_RCU_LEVEL_3))
#else
#error "CONFIG_RCU_FANOUT insufficient for NR_CPUS"
#endif

/* Data structure definitions. */

/*
 * Definition for node within the RCU grace-period-detection hierarchy.
 */
struct rcu_node {
	spinlock_t lock;
	long	qsmask;		/* CPUs or groups that need to switch in      */
				/*  order for current grace period to proceed.*/
	long	qsmaskinit;	/* Per-GP initialization for qsmask.	      */
	int	grplo;		/* lowest-numbered CPU or group here.	      */
	int	grphi;		/* highest-numbered CPU or group here.	      */
	char	grpnum;		/* CPU/group number for next level up.	      */
	char	level;		/* root is at level 0.			      */
	struct rcu_node *parent;
} ____cacheline_internodealigned_in_smp;

/*
 * RCU global state, including node hierarchy.  This hierarchy is
 * represented in "heap" form in a dense array.  The root (first level)
 * of the hierarchy is in ->node[0] (referenced by ->level[0]), the second
 * level in ->node[1] through ->node[m] (->node[1] referenced by ->level[1]),
 * and the third level in ->node[m+1] and following (->node[m+1] referenced
 * by ->level[2]).  The number of levels is determined by the number of
 * CPUs and by CONFIG_RCU_FANOUT.  Small systems will have a "hierarchy"
 * consisting of a single rcu_node.
 */
struct rcu_state {
	struct rcu_node node[NUM_RCU_NODES];	/* Hierarchy. */
	struct rcu_node *level[NUM_RCU_LEVELS];	/* Hierarchy levels. */
	int levelcnt[MAX_RCU_LEVELS + 1];	/* # nodes in each level. */
	int levelspread[NUM_RCU_LEVELS];	/* kids/node in each level. */

	/* The following fields are guarded by the root rcu_node's lock. */

	char	signaled ____cacheline_internodealigned_in_smp;
						/* sent GP-kick IPIs? */
	int	gpnum;				/* Current gp number. */
	int	completed;			/* # of last completed gp. */
};

#define RCU_STATE_INITIALIZER(name) { \
	.node = { { \
		.lock = __SPIN_LOCK_UNLOCKED(&name.node[0].lock), \
		.qsmask = 0, \
	} }, \
	.level = { &name.node[0] }, \
	.levelcnt = { \
		NUM_RCU_LEVEL_1,  /* root of hierarchy. */ \
		NUM_RCU_LEVEL_2, \
		NUM_RCU_LEVEL_3, \
		NUM_RCU_LEVEL_4, /* == MAX_RCU_LEVELS */ \
	}, \
	.gpnum = -300, \
	.completed = -300, \
}

struct rcu_state rcu_state = RCU_STATE_INITIALIZER(rcu_state);
/* @@@ DEFINE_PER_CPU(struct rcu_data, rcu_data); */

struct rcu_state rcu_bh_state = RCU_STATE_INITIALIZER(rcu_bh_state);
/* @@@ DEFINE_PER_CPU(struct rcu_data, rcu_bh_data); */

static int blimit = 10;
static int qhimark = 10000;
static int qlowmark = 100;

/*
 * Does the current CPU require a yet-as-unscheduled grace period?
 */
static inline int
cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
{
	return *rdp->nxttail[RCU_DONE_TAIL] &&
	       ACCESS_ONCE(rsp->completed) == ACCESS_ONCE(rsp->gpnum);
}

/*
 * Return the root node of the specified rcu_state structure.
 */
static inline struct rcu_node *rcu_get_root(struct rcu_state *rsp)
{
	return &rsp->node[0];
}

/*
 * Compute the per-level fanout, either using the exact fanout specified
 * or balancing the tree, depending on CONFIG_RCU_FANOUT_EXACT.
 */
#ifdef CONFIG_RCU_FANOUT_EXACT
void rcu_init_levelspread(struct rcu_state *rsp)
{
	int i;

	for (i = NUM_RCU_LEVELS - 1; i >= 0; i--)
		rsp->levelspread[i] = CONFIG_RCU_FANOUT;
}
#else /* #ifdef CONFIG_RCU_FANOUT_EXACT */
void rcu_init_levelspread(struct rcu_state *rsp)
{
	int ccur;
	int cprv;
	int i;

	cprv = NR_CPUS;
	for (i = NUM_RCU_LEVELS - 1; i >= 0; i--) {
		ccur = rsp->levelcnt[i];
		rsp->levelspread[i] = (cprv + ccur - 1) / ccur;
		cprv = ccur;
	}
}
#endif /* #else #ifdef CONFIG_RCU_FANOUT_EXACT */

/*
 * When a given CPU first becomes aware of a grace period, it knows
 * that all of its pre-existing callbacks will be covered by the next
 * grace period.
 *
 * Similarly, if a given CPU has not yet let RCU know that it passed
 * through a quiescent state for the current grace period, then that
 * CPU knows that all of its callbacks may safely be invoked at the
 * end of the next grace period.
 */
static inline void
rcu_next_callbacks_are_ready(struct rcu_data *rdp)
{
	rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
}

/*
 * Update local state to record the newly noticed grace period.
 */
static void note_new_gpnum(struct rcu_state *rsp, struct rcu_data *rdp)
{
	rdp->qs_pending = 1;
	rdp->passed_quiesc = 0;
	rdp->gpnum = rsp->gpnum;
}

/*
 * Did a new RCU grace period start since we last checked?  Update
 * local state appropriately if so.
 */
static int
check_for_new_grace_period(struct rcu_state *rsp, struct rcu_data *rdp)
{
	unsigned long flags;

	local_irq_save(flags);
	if (rdp->gpnum != rsp->gpnum) {
		note_new_gpnum(rsp, rdp);
		local_irq_restore(flags);
		return 1;
	}
	local_irq_restore(flags);
	return 0;
}

/*
 * Start a new RCU grace period if warranted, re-initializing the hierarchy
 * in preparation for detecting the next grace period.  The caller must hold
 * the root node's ->lock, which is released before return.  Hard irqs must
 * be disabled.
 */
static void rcu_start_gp(struct rcu_state *rsp, struct rcu_data *rdp)
{
	struct rcu_node *rnp = rcu_get_root(rsp);
	struct rcu_node *rnp_cur;
	struct rcu_node *rnp_end;
	struct rcu_node *rnp_stack[NUM_RCU_LEVELS];

	if (!cpu_needs_another_gp(rsp, rdp)) {

		/*
		 * Either there is no need to detect any more grace periods
		 * at the moment, or we are already in the process of
		 * detecting one.  Either way, we should not start a new
		 * RCU grace period, so drop the lock and exit.
		 */

		spin_unlock(&rnp->lock);
		return;
	}

	/* Advance to a new grace period. */

	rsp->gpnum++;
	note_new_gpnum(rsp, rdp);

	/*
	 * Because we are first, we know that all our callbacks will
	 * be covered by this upcoming grace period, even the ones
	 * that were registered arbitrarily recently.
	 */

	rcu_next_callbacks_are_ready(rdp);
	rdp->nxttail[RCU_WAIT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];

	/* Special-case the common single-level case. */

	if (NUM_RCU_NODES == 1) {
		rnp->qsmask = rnp->qsmaskinit;
		spin_unlock(&rnp->lock);
		return;
	}

	spin_unlock(&rnp->lock);
	
	/*
	 * Set all the quiescent-state-needed bits in all the non-leaf
	 * RCU nodes.  This operation relies on the layout of the
	 * hierarchy within the rsp->node[] array.  Note that other
	 * CPUs will access only the leaves of the hierarchy, which
	 * still indicate that no grace period is in progress.
	 *
	 * We therefore do not need to hold any locks.  Any required
	 * memory barriers will be supplied by the locks guarding the
	 * leaf rcu_nodes in the hierarchy.
	 */

	rnp_end = rsp->level[NUM_RCU_LEVELS - 1];
	for (rnp_cur = &rsp->node[0]; rnp_cur < rnp_end; rnp_cur++)
		rnp_cur->qsmask = rnp_cur->qsmaskinit;

	/*
	 * Now set up the leaf nodes.  Here we must be careful.  First,
	 * we need to hold the lock in order to exclude other CPUs, which
	 * might be contending for the leaf nodes' locks.  Second, as
	 * soon as we initialize a given leaf node, its CPUs might run
	 * up the rest of the hierarchy.  Third, CPUs might be coming
	 * online and going offline during this time.  We must therefore
	 * acquire locks for each node that we touch during this stage.
	 *
	 * Note that the grace period cannot complete until we finish
	 * the initialization process, as there will be at least one
	 * qsmask bit set in the root node until that time, namely the
	 * one corresponding to this CPU.
	 */

	rnp_end = &rsp->node[NUM_RCU_NODES];
	rnp_cur = rsp->level[NUM_RCU_LEVELS - 1];
	for (; rnp_cur < rnp_end; rnp_cur++) {
		spin_lock(&rnp_cur->lock);
		rnp_cur->qsmask = rnp_cur->qsmaskinit;
		spin_unlock(&rnp_cur->lock);
	}
}

/*
 * Advance this CPU's callbacks after the end of an RCU grace period.
 */
static void
rcu_process_gp_end(struct rcu_state *rsp, struct rcu_data *rdp)
{
	long completed_snap;
	unsigned long flags;

	local_irq_save(flags);
	completed_snap = ACCESS_ONCE(rsp->completed);

	/* Did another grace period end? */
	if (rdp->completed != completed_snap) {

		/* Advance callbacks.  No harm if list empty. */
		rdp->nxttail[RCU_DONE_TAIL] = rdp->nxttail[RCU_WAIT_TAIL];
		rdp->nxttail[RCU_WAIT_TAIL] = rdp->nxttail[RCU_NEXT_READY_TAIL];
		rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];

		/* Remember that we saw this grace-period completion. */
		rdp->completed = completed_snap;
	}
	local_irq_restore(flags);
}

/*
 * Record a quiescent state for the specified CPU.  Note that a CPU
 * going offline counts as a quiescent state.
 */
static void cpu_quiet(int cpu, struct rcu_state *rsp, struct rcu_data *rdp)
{
	long mask;
	struct rcu_node *rnp;

	rnp = rdp->mynode;
	spin_lock(&rnp->lock);
	mask = 1L << (cpu - rnp->grplo);
	for (;;) {
		if (!(rnp->qsmask & mask)) {

			/* Our bit has already been cleared, so done. */

			spin_unlock(&rnp->lock);
			return;
		}
		rnp->qsmask &= ~mask;
		if (rnp->qsmask != 0) {

			/* Other bits still set at this level, so done. */

			spin_unlock(&rnp->lock);
			return;
		}
		mask = 1L << rnp->grpnum;
		if (rnp->parent == NULL) {

			/* No more levels. */

			break;
		}
		spin_unlock(&rnp->lock);
		rnp = rnp->parent;
		spin_lock(&rnp->lock);
	}

	/*
	 * Get here if we are the last CPU to pass through a quiescent
	 * state for this grace period.  Clean up and let rcu_start_gp()
	 * start up the next grace period if one is needed.  Note that
	 * we still hold rnp->lock, as required by rcu_start_gp().
	 */
	rsp->completed = rsp->gpnum;
/*&&&&*/printf("cpu_quiet: end of grace period detected by %d.\n", rdp->cpu);
	rcu_process_gp_end(rsp, rdp);
	rcu_start_gp(rsp, rdp);
}
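The loop above propagates a quiescent-state bit up the rcu_node hierarchy, stopping as soon as some sibling is still pending. A minimal user-space model of that walk, with locking omitted and all names invented for illustration (this is not the kernel's struct rcu_node):

```c
#include <assert.h>
#include <stddef.h>

/* Toy tree node: qsmask holds the bits still awaiting quiescent
 * states; grpnum is this node's bit position in its parent. */
struct toy_node {
	unsigned long qsmask;
	int grpnum;
	struct toy_node *parent;
};

/* Record a quiescent state; return 1 iff it ends the grace period. */
static int toy_cpu_quiet(struct toy_node *rnp, unsigned long mask)
{
	for (;;) {
		if (!(rnp->qsmask & mask))
			return 0;	/* bit already cleared, done */
		rnp->qsmask &= ~mask;
		if (rnp->qsmask != 0)
			return 0;	/* siblings still pending */
		if (rnp->parent == NULL)
			return 1;	/* cleared the root: GP ends */
		mask = 1UL << rnp->grpnum;
		rnp = rnp->parent;
	}
}
```

The point of the hierarchy is visible even in the toy: most quiescent states stop at the leaf, so only the last CPU of each group ever touches the next level up.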

/*
 * Check to see if there is a new grace period of which this CPU
 * is not yet aware, and if so, set up local rcu_data state for it.
 * Otherwise, see if this CPU has just passed through its first
 * quiescent state for this grace period, and record that fact if so.
 */
static void
rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
{
	/* If there is now a new grace period, record and return. */
	if (check_for_new_grace_period(rsp, rdp))
		return;

	/* Did this CPU already do its part for the current grace period? */
	if (!rdp->qs_pending)
		return;

	/*
	 * Was there a quiescent state since the beginning of the grace
	 * period? If no, then exit and wait for the next call.
	 */
	if (!rdp->passed_quiesc)
		return;

	/*
	 * Say we did our quiescent state, and set up to process all
	 * currently pending callbacks at the end of the next grace
	 * period.
	 */
	rdp->qs_pending = 0;
	rcu_next_callbacks_are_ready(rdp);
	cpu_quiet(rdp->cpu, rsp, rdp);

	/*
	 * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
	 * during cpu startup. Ignore the quiescent state.  @@@ fixed???
	 */
}

#ifdef CONFIG_HOTPLUG_CPU


/*
 * Remove the outgoing CPU from the bitmasks in the rcu_node hierarchy
 * and move all callbacks from the outgoing CPU to the current one.
 */
static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp,
			      struct rcu_data *rdp, struct rcu_data *rdp_me)
{
	int i;
	long mask;
	struct rcu_node *rnp;

	/* Remove the outgoing CPU from the masks in the rcu_node hierarchy. */
	rnp = rdp->mynode;
	spin_lock(&rnp->lock);
	mask = 1L << (cpu - rnp->grplo);
	for (;;) {
		rnp->qsmaskinit &= ~mask;
		if (rnp->qsmaskinit != 0) {
			spin_unlock(&rnp->lock);
			break;
		}
		mask = 1L << rnp->grpnum;
		spin_unlock(&rnp->lock);
		rnp = rnp->parent;
		if (rnp == NULL)
			break;
		spin_lock(&rnp->lock);
	}

	/* Being offline is a quiescent state, so go record it. */
	cpu_quiet(cpu, rsp, rdp);

	/*
	 * Move callbacks from the outgoing CPU to the running CPU.
	 * Note that the outgoing CPU is now quiescent, so it is now
	 * (uncharacteristically) safe to access its rcu_data structure.
	 * Note also that we must carefully retain the order of the
	 * outgoing CPU's callbacks in order for rcu_barrier() to work
	 * correctly.  Finally, note that we start all the callbacks
	 * afresh, even those that have passed through a grace period
	 * and are therefore ready to invoke.  The theory is that hotplug
	 * events are rare, and that if they are frequent enough to
	 * indefinitely delay callbacks, you have far worse things to
	 * be worrying about.
	 *
	 * We disable irqs to prevent races with call_rcu() invoked
	 * from interrupt handlers.
	 */
	if (rdp->nxtlist != NULL) {
		local_irq_disable();
		*rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
		rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
		rdp->nxtlist = NULL;
		for (i = 0; i < RCU_NEXT_SIZE; i++)
			rdp->nxttail[i] = &rdp->nxtlist;
		local_irq_enable();
	}
}

/*
 * Remove the specified CPU from the RCU hierarchy and move any pending
 * callbacks that it might have to the current CPU.  This code assumes
 * that at least one CPU in the system will remain running at all times.
 * Any attempt to offline -all- CPUs is likely to strand RCU callbacks.
 */
static void rcu_offline_cpu(int cpu)  /* !HOTPLUG_CPU @@@ */
{
	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
	struct rcu_data *rdp_me = &__get_cpu_var(rcu_data);
	struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
	struct rcu_data *bh_rdp_me = &__get_cpu_var(rcu_bh_data);

	__rcu_offline_cpu(cpu, &rcu_state, rdp, rdp_me);
	__rcu_offline_cpu(cpu, &rcu_bh_state, bh_rdp, bh_rdp_me);
}

#else /* #ifdef CONFIG_HOTPLUG_CPU */

static inline void
__rcu_offline_cpu(int cpu, struct rcu_state *rsp, struct rcu_data *rdp)
{
}

#endif /* #else #ifdef CONFIG_HOTPLUG_CPU */

/*
 * Invoke any RCU callbacks that have made it to the end of their grace
 * period.
 */
static void rcu_do_batch(struct rcu_data *rdp)
{
	struct rcu_head *next, *list, **tail;
	int count;

	/* If no callbacks are ready, just return. */
	if (&rdp->nxtlist == rdp->nxttail[RCU_DONE_TAIL])
		return;

	/*
	 * Extract the list of ready callbacks, disabling to prevent
	 * races with call_rcu() from interrupt handlers.
	 */
	local_irq_disable();
	list = rdp->nxtlist;
	rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];
	*rdp->nxttail[RCU_DONE_TAIL] = NULL;
	tail = rdp->nxttail[RCU_DONE_TAIL];
	for (count = RCU_NEXT_SIZE - 1; count >= 0; count--)
		if (rdp->nxttail[count] == rdp->nxttail[RCU_DONE_TAIL])
			rdp->nxttail[count] = &rdp->nxtlist;
	local_irq_enable();

	/* Invoke callbacks. */
	count = 0;
	while (list) {
		next = list->next;
		prefetch(next);
		list->func(list);
		list = next;
		if (++count >= rdp->blimit)
			break;
	}

	/* Update count, and requeue any remaining callbacks. */
	local_irq_disable();
	rdp->qlen -= count;
	if (list != NULL) {
		*tail = rdp->nxtlist;
		rdp->nxtlist = list;
		for (count = 0; count < RCU_NEXT_SIZE; count++)
			if (&rdp->nxtlist == rdp->nxttail[count])
				rdp->nxttail[count] = tail;
			else
				break;
	}
	local_irq_enable();

	/* Reinstate batch limit if we have worked down the excess. */
	if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
		rdp->blimit = blimit;

	/* Re-raise the RCU softirq if there are callbacks remaining. */
	if (&rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL])
		raise_rcu_softirq();
}

/*
 * This does the RCU processing work from softirq context.
 */
static void
__rcu_process_callbacks(struct rcu_state *rsp, struct rcu_data *rdp)
{
	/*
	 * Advance callbacks in response to end of earlier grace
	 * period that some other CPU ended.
	 */
	rcu_process_gp_end(rsp, rdp);

	/* Update RCU state based on any recent quiescent states. */
	rcu_check_quiescent_state(rsp, rdp);

	/* Does this CPU require a not-yet-started grace period? */
	if (cpu_needs_another_gp(rsp, rdp)) {
		spin_lock(&rcu_get_root(rsp)->lock);
		rcu_start_gp(rsp, rdp);  /* releases root node's ->lock */
	}

	rcu_do_batch(rdp);
}

static void rcu_process_callbacks(struct softirq_action *unused)
{
	/*
	 * Memory references from any prior RCU read-side critical sections
	 * executed by the interrupted code must be seen before any RCU
	 * grace-period manipulations below.
	 */

	smp_mb(); /* See above block comment. */

	__rcu_process_callbacks(&rcu_state, &__get_cpu_var(rcu_data));
	__rcu_process_callbacks(&rcu_bh_state, &__get_cpu_var(rcu_bh_data));

	/*
	 * Memory references from any later RCU read-side critical sections
	 * executed by the interrupted code must be seen after any RCU
	 * grace-period manipulations above.
	 */

	smp_mb(); /* See above block comment. */
}

/*
 * Check to see if there is any immediate RCU-related work to be done
 * by the current CPU, for the specified type of RCU, returning 1 if so.
 * The checks are in order of increasing expense: checks that can be
 * carried out against CPU-local state are performed first.
 */
static int __rcu_pending(struct rcu_state *rsp, struct rcu_data *rdp)
{
	/* Check for CPU stalls, if enabled. */
	/* @@@ check_cpu_stall(rsp, rdp); @@@ */

	/* Is the RCU core waiting for a quiescent state from this CPU? */
	if (rdp->qs_pending)
		return 1;

	/* Does this CPU have finished callbacks to invoke? */
	if (rdp->nxttail[RCU_DONE_TAIL] != &rdp->nxtlist)
		return 1;

	/* Are there callbacks waiting for a GP that needs to be started? */
	if (cpu_needs_another_gp(rsp, rdp))
		return 1;

	/* Has another RCU grace period been detected? */
	if (ACCESS_ONCE(rsp->completed) != rdp->completed)
		return 1;

	/* nothing to do */
	return 0;
}

/*
 * Check to see if there is any immediate RCU-related work to be done
 * by the current CPU, returning 1 if so.  This function is part of the
 * RCU implementation; it is -not- an exported member of the RCU API.
 */
int rcu_pending(int cpu)
{
	return __rcu_pending(&rcu_state, &per_cpu(rcu_data, cpu)) ||
	       __rcu_pending(&rcu_bh_state, &per_cpu(rcu_bh_data, cpu));
}

/*
 * Check to see if this CPU is in a non-context-switch quiescent state
 * (user mode or idle loop for rcu, non-softirq execution for rcu_bh).
 * Also schedule the RCU softirq handler.
 *
 * This function must be called with hardirqs disabled.  It is normally
 * invoked from the scheduling-clock interrupt.  If rcu_pending returns
 * false, there is no point in invoking rcu_check_callbacks().
 */
void rcu_check_callbacks(int cpu, int user)
{
	if (user ||
	    (idle_cpu(cpu) && !in_softirq() &&
				hardirq_count() <= (1 << HARDIRQ_SHIFT))) {

		/*
		 * Get here if this CPU took its interrupt from user
		 * mode or from the idle loop, and if this is not a
		 * nested interrupt.  In this case, the CPU is in
		 * a quiescent state, so count it.
		 *
		 * Also do a memory barrier.  This is needed to handle
		 * the case where writes from a preempt-disable section
		 * of code get reordered into schedule() by this CPU's
		 * write buffer.  The memory barrier makes sure that
		 * the rcu_qsctr_inc() and rcu_bh_qsctr_inc() are seen
		 * by other CPUs to happen after any such write.
		 */

		smp_mb();  /* See above block comment. */
		rcu_qsctr_inc(cpu);
		rcu_bh_qsctr_inc(cpu);

	} else if (!in_softirq()) {

		/*
		 * Get here if this CPU did not take its interrupt from
		 * softirq, in other words, if it is not interrupting
		 * a rcu_bh read-side critical section.  This is a _bh
		 * critical section, so count it.  The memory barrier
		 * is needed for the same reason as is the above one.
		 */

		smp_mb();  /* See above block comment. */
		rcu_bh_qsctr_inc(cpu);
	}
	raise_rcu_softirq();
}

static void
__call_rcu(struct rcu_head *head, struct rcu_state *rsp, struct rcu_data *rdp)
{
	smp_mb(); /* Ensure RCU update seen before callback registry. */

	/*
	 * Opportunistically note grace-period endings and beginnings.
	 * Note that we might see a beginning right after we see an
	 * end, but never vice versa, since this CPU has to pass through
	 * a quiescent state betweentimes.
	 */
	rcu_process_gp_end(rsp, rdp);
	check_for_new_grace_period(rsp, rdp);

	*rdp->nxttail[RCU_NEXT_TAIL] = head;
	rdp->nxttail[RCU_NEXT_TAIL] = &head->next;

	if (unlikely(++rdp->qlen > qhimark)) {
		rdp->blimit = INT_MAX;
		/* @@@ force_quiescent_state(rsp, rdp); */
	}
}

/**
 * call_rcu - Queue an RCU callback for invocation after a grace period.
 * @head: structure to be used for queueing the RCU updates.
 * @func: actual update function to be invoked after the grace period
 *
 * The update function will be invoked some time after a full grace
 * period elapses, in other words after all currently executing RCU
 * read-side critical sections have completed.  RCU read-side critical
 * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
 * and may be nested.
 */
void call_rcu(struct rcu_head *head,
				void (*func)(struct rcu_head *rcu))
{
	unsigned long flags;

	head->func = func;
	head->next = NULL;
	local_irq_save(flags);
	__call_rcu(head, &rcu_state, &__get_cpu_var(rcu_data));
	local_irq_restore(flags);
}
/*@@@ EXPORT_SYMBOL_GPL(call_rcu); */

/**
 * call_rcu_bh - Queue an RCU callback for invocation after a quicker grace period.
 * @head: structure to be used for queueing the RCU updates.
 * @func: actual update function to be invoked after the grace period
 *
 * The update function will be invoked some time after a full grace
 * period elapses, in other words after all currently executing RCU
 * read-side critical sections have completed. call_rcu_bh() assumes
 * that the read-side critical sections end on completion of a softirq
 * handler. This means that read-side critical sections in process
 * context must not be interrupted by softirqs. This interface is to be
 * used when most of the read-side critical sections are in softirq context.
 * RCU read-side critical sections are delimited by rcu_read_lock() and
 * rcu_read_unlock() if in interrupt context, or rcu_read_lock_bh()
 * and rcu_read_unlock_bh() if in process context.  These may be nested.
 */
void call_rcu_bh(struct rcu_head *head,
				void (*func)(struct rcu_head *rcu))
{
	unsigned long flags;

	head->func = func;
	head->next = NULL;
	local_irq_save(flags);
	__call_rcu(head, &rcu_bh_state, &__get_cpu_var(rcu_bh_data));
	local_irq_restore(flags);
}
/* @@@ EXPORT_SYMBOL_GPL(call_rcu_bh); */

/*
 * Initialize a CPU's per-CPU RCU data.  We take this "scorched earth"
 * approach so that we don't have to worry about how long the CPU has
 * been gone, or whether it ever was online previously.  We do trust the
 * ->mynode field, as it is constant for a given struct rcu_data and
 * initialized during early boot.
 *
 * Note that only one online or offline event can be happening at a given
 * time.  Note also that we can accept some slop in the rsp->completed
 * access due to the fact that this CPU cannot possibly have any RCU
 * callbacks in flight yet.
 */
static void
rcu_init_percpu_data(int cpu, struct rcu_state *rsp, struct rcu_data *rdp)
{
	long completed_snap;
	int i;
	long mask;
	struct rcu_node *rnp = rdp->mynode;

	spin_lock(&rnp->lock);
	completed_snap = ACCESS_ONCE(rsp->completed);
	memset(rdp, 0, sizeof(*rdp));
	rdp->completed = completed_snap;
	rdp->gpnum = completed_snap;
	rdp->passed_quiesc = 1;
	rdp->qs_pending = 0;
	rdp->mynode = rnp;
	for (i = 0; i < RCU_NEXT_SIZE; i++)
		rdp->nxttail[i] = &rdp->nxtlist;
	rdp->blimit = /* @@@ blimit */ 10;
	rdp->cpu = cpu;

	/* Add CPU to rcu_node bitmasks. */

	mask = 1L << (cpu - rnp->grplo);
	for (;;) {
		rnp->qsmaskinit |= mask;
		mask = 1L << rnp->grpnum;
		spin_unlock(&rnp->lock);
		rnp = rnp->parent;
		if ((rnp == NULL) || !!(rnp->qsmaskinit & mask))
			break;
		spin_lock(&rnp->lock);
	}
}

static void __cpuinit rcu_online_cpu(int cpu)
{
	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
	struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);

	rcu_init_percpu_data(cpu, &rcu_state, rdp);
	rcu_init_percpu_data(cpu, &rcu_bh_state, bh_rdp);
	/* open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL); @@@ */
}

/*
 * Handle CPU online/offline notification events.
 */
static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
				unsigned long action, void *hcpu)
{
	long cpu = (long)hcpu;

	switch (action) {
	case CPU_UP_PREPARE:
	case CPU_UP_PREPARE_FROZEN:
		rcu_online_cpu(cpu);
		break;
	case CPU_DEAD:
	case CPU_DEAD_FROZEN:
		rcu_offline_cpu(cpu);
		break;
	default:
		break;
	}
	return NOTIFY_OK;
}

/*
 * Helper function for rcu_init() that initializes one rcu_state structure.
 */
static void __init rcu_init_one(struct rcu_state *rsp)
{
	int i;
	int j;
	struct rcu_node *rnp;

	/* Initialize the level-tracking arrays. */

	for (i = 1; i < NUM_RCU_LEVELS; i++) {
		rsp->level[i] = rsp->level[i - 1] + rsp->levelcnt[i - 1];
	}
	rcu_init_levelspread(rsp);

	/* Initialize the elements themselves, starting from the leaves. */

	for (i = NUM_RCU_LEVELS - 1; i > 0; i--) {
		rnp = rsp->level[i];
		for (j = 0; j < rsp->levelcnt[i]; j++, rnp++) {
			spin_lock_init(&rnp->lock);
			rnp->qsmask = rsp->node[0].qsmask;
			rnp->grplo = j * rsp->levelspread[i];
			rnp->grphi = (j + 1) * rsp->levelspread[i] - 1;
			if (rnp->grphi >= rsp->levelcnt[i + 1])
				rnp->grphi = rsp->levelcnt[i + 1] - 1;
			rnp->qsmaskinit = 0;
			if (i != NUM_RCU_LEVELS - 1)
				rnp->grplo = rnp->grphi = 0;
			rnp->grpnum = j % rsp->levelspread[i - 1];
			rnp->level = i;
			rnp->parent = rsp->level[i - 1] + 
				      j / rsp->levelspread[i - 1];
		}
	}

	/* Initialize the root of the hierarchy. */

	rsp->node[0].qsmaskinit = 0;
	rsp->node[0].grpnum = -1;
	rsp->signaled = 0;
}
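rcu_init_levelspread() is not shown in this excerpt, but the grplo/grphi arithmetic above amounts to splitting the CPUs into fixed-size groups, clamping the last group to the actual CPU count. A toy version with an invented fanout constant (the real code derives the spread per level):

```c
#include <assert.h>

/* Toy fanout: each leaf covers TOY_FANOUT consecutive CPUs. */
#define TOY_FANOUT 4

/* Lowest CPU number covered by leaf number @leaf. */
static int toy_grplo(int leaf)
{
	return leaf * TOY_FANOUT;
}

/* Highest CPU number covered by leaf @leaf, clamped to @ncpus,
 * mirroring the grphi clamp in rcu_init_one(). */
static int toy_grphi(int leaf, int ncpus)
{
	int hi = (leaf + 1) * TOY_FANOUT - 1;

	return hi >= ncpus ? ncpus - 1 : hi;
}
```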

/*
 * Helper macro for rcu_init().  To be used nowhere else!
 * Assigns leaf node pointers into each CPU's rcu_data structure.
 */
#define RCU_DATA_PTR_INIT(rsp, rcu_data) \
do { \
	rnp = (rsp)->level[NUM_RCU_LEVELS - 1]; \
	j = 0; \
	for_each_possible_cpu(i) { \
		if (i > rnp[j].grphi) \
			j++; \
		per_cpu(rcu_data, i).mynode = &rnp[j]; \
	} \
} while (0)

static struct notifier_block __cpuinitdata rcu_nb = {
	.notifier_call	= rcu_cpu_notify,
};

static void __init rcu_init(void)
{
	int i;			/* All used by RCU_DATA_PTR_INIT(). */
	int j;
	struct rcu_node *rnp;

	rcu_init_one(&rcu_state);
	RCU_DATA_PTR_INIT(&rcu_state, rcu_data);
	rcu_init_one(&rcu_bh_state);
	RCU_DATA_PTR_INIT(&rcu_bh_state, rcu_bh_data);

	for_each_online_cpu(i)
		rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE, (void *)(long)i);
#if 0 /* @@@ */
	/* Register notifier for non-boot CPUs */
	register_cpu_notifier(&rcu_nb);
#endif /* @@@ #if 0 */
}

^ permalink raw reply	[flat|nested] 17+ messages in thread

* Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups
  2008-08-18 14:04       ` Paul E. McKenney
@ 2008-08-19 10:48         ` Manfred Spraul
  2008-08-19 14:03           ` Paul E. McKenney
  0 siblings, 1 reply; 17+ messages in thread
From: Manfred Spraul @ 2008-08-19 10:48 UTC (permalink / raw)
  To: paulmck; +Cc: linux-kernel, mingo, akpm, oleg, dipankar, rostedt, dvhltc, niv

[-- Attachment #1: Type: text/plain, Size: 345 bytes --]

Hi Paul,

You are beating me: I've just finished my implementation; it's attached.
It boots with qemu, rcu torture enabled, both single and 8-cpu.

Two problems are open:
- right now, I don't use rcu_qsctr_inc() at all.
- qlowmark is set to 0, any other value breaks synchronize_rcu().

And I must read your implementation....

--
    Manfred

[-- Attachment #2: rcuclassic.c --]
[-- Type: text/plain, Size: 18378 bytes --]

/*
 * Read-Copy Update mechanism for mutual exclusion
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * Copyright IBM Corporation, 2001
 *
 * Authors: Dipankar Sarma <dipankar@in.ibm.com>
 *	    Manfred Spraul <manfred@colorfullife.com>
 *
 * Based on the original work by Paul McKenney <paulmck@us.ibm.com>
 * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
 * Papers:
 * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
 * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
 *
 * For detailed explanation of Read-Copy Update mechanism see -
 * 		Documentation/RCU
 *
 * Rewrite based on a global state machine
 * (C) Manfred Spraul <manfred@colorfullife.com>, 2008
 *
 */
#include <linux/types.h>
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/spinlock.h>
#include <linux/smp.h>
#include <linux/rcupdate.h>
#include <linux/interrupt.h>
#include <linux/sched.h>
#include <asm/atomic.h>
#include <linux/bitops.h>
#include <linux/module.h>
#include <linux/completion.h>
#include <linux/moduleparam.h>
#include <linux/percpu.h>
#include <linux/notifier.h>
#include <linux/cpu.h>
#include <linux/mutex.h>
#include <linux/time.h>
#include <linux/proc_fs.h>

#ifdef CONFIG_DEBUG_LOCK_ALLOC
static struct lock_class_key rcu_lock_key;
struct lockdep_map rcu_lock_map =
	STATIC_LOCKDEP_MAP_INIT("rcu_read_lock", &rcu_lock_key);
EXPORT_SYMBOL_GPL(rcu_lock_map);
#endif

/* Definition for rcupdate control block. */
static struct rcu_global_state rcu_global_state_normal = {
	.lock = __SEQLOCK_UNLOCKED(&rcu_global_state_normal.lock),
	.state = RCU_STATE_DESTROY,
	.start_immediately = 0,
	.cpus = __RCU_CPUMASK_INIT(&rcu_global_state_normal.cpus)
};

static struct rcu_global_state rcu_global_state_bh = {
	.lock = __SEQLOCK_UNLOCKED(&rcu_global_state_bh.lock),
	.state = RCU_STATE_DESTROY,
	.start_immediately = 0,
	.cpus = __RCU_CPUMASK_INIT(&rcu_global_state_bh.cpus)
};

DEFINE_PER_CPU(struct rcu_cpu_state, rcu_cpudata_normal) = { 0L };
DEFINE_PER_CPU(struct rcu_cpu_state, rcu_cpudata_bh) = { 0L };
DEFINE_PER_CPU(struct rcu_cpu_dead, rcu_cpudata_dead) = { 0L };


/* FIXME: setting qlowmark to a non-zero value causes a hang.
 * Probably someone waits for an RCU completion (e.g.
 * synchronize_rcu()), but the real RCU cycle is never started
 * because qlowmark is not reached.
 * Idea: replace with a timer-based delay.
 */
int qlowmark = 0;

void rcu_cpumask_init(struct rcu_cpumask *rcm)
{
	BUG_ON(!irqs_disabled());
	spin_lock(&rcm->lock);
	/*
	 * Accessing nohz_cpu_mask before incrementing rcp->cur needs a
	 * barrier.  Otherwise it can cause tickless idle CPUs to be
	 * included in rcp->cpumask, which will extend grace periods
	 * unnecessarily.
	 */
	smp_mb();
	cpus_andnot(rcm->cpus, cpu_online_map, nohz_cpu_mask);

	spin_unlock(&rcm->lock);
}

int rcu_cpumask_clear_and_test(struct rcu_cpumask *rcm, int cpu)
{
	int ret = 0;

	BUG_ON(!irqs_disabled());
	spin_lock(&rcm->lock);
	cpu_clear(cpu, rcm->cpus);
	if (cpus_empty(rcm->cpus))
		ret = 1;
	spin_unlock(&rcm->lock);

	return ret;
}
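Locking aside, rcu_cpumask_clear_and_test() reduces to a clear-bit-then-test-empty operation. A sketch using a plain unsigned long in place of a cpumask (function name invented; limited to word-sized CPU counts, unlike the real cpumask):

```c
#include <assert.h>

/* Clear @cpu's bit in @mask; return 1 iff the mask is now empty,
 * i.e. this was the last CPU the grace period was waiting on. */
static int toy_clear_and_test(unsigned long *mask, int cpu)
{
	*mask &= ~(1UL << cpu);
	return *mask == 0;
}
```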

long rcu_batches_completed(void)
{
	return rcu_global_state_normal.completed;
}

long rcu_batches_completed_bh(void)
{
	return rcu_global_state_bh.completed;
}

/**
 * rcu_state_startcycle - start the next rcu cycle
 * @rgs: global rcu state
 *
 * The function starts the next rcu cycle, either immediately or
 * by setting rgs->start_immediately.
 */ 
static void rcu_state_startcycle(struct rcu_global_state *rgs)
{
	unsigned seq;
	int do_real_start;

	BUG_ON(!irqs_disabled());
	do {
		seq = read_seqbegin(&rgs->lock);
		if (rgs->start_immediately == 0) {
			do_real_start = 1;
		} else {
			do_real_start = 0;
			BUG_ON(rgs->state == RCU_STATE_DESTROY);
		}
	} while (read_seqretry(&rgs->lock, seq));

	if (do_real_start) {
		write_seqlock(&rgs->lock);
		switch(rgs->state) {
		case RCU_STATE_DESTROY_AND_COLLECT:
		case RCU_STATE_GRACE:
			rgs->start_immediately = 1;
			break;
		case RCU_STATE_DESTROY:
			rgs->state = RCU_STATE_DESTROY_AND_COLLECT;
			BUG_ON(rgs->start_immediately);
			rcu_cpumask_init(&rgs->cpus);
			break;
		default:
			BUG();
		}
		write_sequnlock(&rgs->lock);
	}
}

static void rcu_checkqlen(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs, int inc)
{
	BUG_ON(!irqs_disabled());
	rcs->newqlen += inc; 
	if (unlikely(rcs->newqlen > qlowmark)) {

		/* FIXME: actually, this code only needs to run once,
		 * i.e. when qlen == qlowmark.  But qlowmark can be changed
		 * at runtime, and this doesn't work anyway; see the comment
		 * near qlowmark.
		 */
		rcu_state_startcycle(rgs);
	}
}


static void __call_rcu(struct rcu_head *head, struct rcu_global_state *rgs,
		struct rcu_cpu_state *rcs)
{
	if (rcs->new == NULL)
		rcs->newtail = &head->next;
	head->next = rcs->new;
	rcs->new = head;

	rcu_checkqlen(rgs, rcs, 1);
}

/**
 * call_rcu - Queue an RCU callback for invocation after a grace period.
 * @head: structure to be used for queueing the RCU updates.
 * @func: actual update function to be invoked after the grace period
 *
 * The update function will be invoked some time after a full grace
 * period elapses, in other words after all currently executing RCU
 * read-side critical sections have completed.  RCU read-side critical
 * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
 * and may be nested.
 */
void call_rcu(struct rcu_head *head,
				void (*func)(struct rcu_head *rcu))
{
	unsigned long flags;

	head->func = func;
	local_irq_save(flags);
	__call_rcu(head, &rcu_global_state_normal, &__get_cpu_var(rcu_cpudata_normal));
	local_irq_restore(flags);
}
EXPORT_SYMBOL_GPL(call_rcu);

/**
 * call_rcu_bh - Queue an RCU callback for invocation after a quicker grace period.
 * @head: structure to be used for queueing the RCU updates.
 * @func: actual update function to be invoked after the grace period
 *
 * The update function will be invoked some time after a full grace
 * period elapses, in other words after all currently executing RCU
 * read-side critical sections have completed. call_rcu_bh() assumes
 * that the read-side critical sections end on completion of a softirq
 * handler. This means that read-side critical sections in process
 * context must not be interrupted by softirqs. This interface is to be
 * used when most of the read-side critical sections are in softirq context.
 * RCU read-side critical sections are delimited by rcu_read_lock() and
 * rcu_read_unlock() if in interrupt context, or rcu_read_lock_bh()
 * and rcu_read_unlock_bh() if in process context.  These may be nested.
 */
void call_rcu_bh(struct rcu_head *head,
				void (*func)(struct rcu_head *rcu))
{
	unsigned long flags;

	head->func = func;
	local_irq_save(flags);
	__call_rcu(head, &rcu_global_state_bh, &__get_cpu_var(rcu_cpudata_bh));
	local_irq_restore(flags);
}
EXPORT_SYMBOL_GPL(call_rcu_bh);

#ifdef CONFIG_HOTPLUG_CPU

/**
 * rcu_bulk_add - bulk add new rcu objects.
 * @rgs: global rcu state
 * @rcs: cpu state
 * @h: linked list of rcu objects.
 *
 * Must be called with local interrupts enabled.
 */
static void rcu_bulk_add(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs, struct rcu_head *h, struct rcu_head **htail, int len)
{

	BUG_ON(irqs_disabled());

	if (len > 0) {
		local_irq_disable();
		if (rcs->new) {
			(*htail) = rcs->new;
			rcs->new = h;
		} else {
			rcs->new = h;
			rcs->newtail = htail;
		}
		rcu_checkqlen(rgs, rcs, len);
		local_irq_enable();
	}
}

#define RCU_BATCH_MIN		100
#define	RCU_BATCH_INCFACTOR	2
#define RCU_BATCH_DECFACTOR	4

static void rcu_move_and_raise(struct rcu_cpu_state *rcs)
{
	struct rcu_cpu_dead *rcd = &per_cpu(rcu_cpudata_dead, smp_processor_id());

	BUG_ON(!irqs_disabled());

	/* update batch limit:
	 * - if there are still old entries when new entries are added:
	 *   double the batch count.
	 * - if there are no old entries: reduce it by 25%, but never below 100.
	 */
	if (rcd->deadqlen)
		rcd->batchcount = rcd->batchcount*RCU_BATCH_INCFACTOR;
	 else
		rcd->batchcount = rcd->batchcount-rcd->batchcount/RCU_BATCH_DECFACTOR;
	if (rcd->batchcount < RCU_BATCH_MIN)
		rcd->batchcount = RCU_BATCH_MIN;

	if (rcs->oldqlen) {
		(*rcs->oldtail) = rcd->dead;
		rcd->dead = rcs->old;
		rcd->deadqlen += rcs->oldqlen;
		rcs->old = NULL;
		rcs->oldtail = NULL;
		rcs->oldqlen = 0;
	} 
	BUG_ON(rcs->old);
	BUG_ON(rcs->oldtail);
	BUG_ON(rcs->oldqlen);
	raise_softirq(RCU_SOFTIRQ);
}
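The batchcount adaptation above (double when a backlog remains, otherwise decay by 25%, never below RCU_BATCH_MIN) can be isolated as a pure function. A sketch for illustration (toy_update_batch is an invented name; the constants mirror the ones defined above):

```c
#include <assert.h>

#define RCU_BATCH_MIN		100
#define RCU_BATCH_INCFACTOR	2
#define RCU_BATCH_DECFACTOR	4

/* Mirror of the batchcount update in rcu_move_and_raise():
 * grow aggressively under backlog, decay gently when caught up. */
static int toy_update_batch(int batchcount, int backlog)
{
	if (backlog)
		batchcount *= RCU_BATCH_INCFACTOR;
	else
		batchcount -= batchcount / RCU_BATCH_DECFACTOR;
	if (batchcount < RCU_BATCH_MIN)
		batchcount = RCU_BATCH_MIN;
	return batchcount;
}
```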

static void rcu_state_machine(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs, int is_quiet)
{
	int inc_state;
	unsigned seq;
	unsigned long flags;

	inc_state = 0;
	do {
		seq = read_seqbegin(&rgs->lock);
		local_irq_save(flags);
		if (rgs->state != rcs->state) {
			inc_state = 0;
			switch(rgs->state) {
			case RCU_STATE_DESTROY:
				rcs->state = rgs->state;
				rcu_move_and_raise(rcs);
				break;
			case RCU_STATE_DESTROY_AND_COLLECT:
				rcs->state = rgs->state;
				rcu_move_and_raise(rcs);
				rcs->old = rcs->new;
				rcs->oldtail = rcs->newtail;
				rcs->oldqlen = rcs->newqlen;
				rcs->new = NULL;
				rcs->newtail = NULL;
				rcs->newqlen = 0;
				if (rcu_cpumask_clear_and_test(&rgs->cpus, smp_processor_id()))
					inc_state = 1;
				break;
			case RCU_STATE_GRACE: 
				if (is_quiet) {
					rcs->state = rgs->state;
					if (rcu_cpumask_clear_and_test(&rgs->cpus, smp_processor_id()))
						inc_state = 1;
				}
				break;
			default:
				BUG();
			}
		}
		local_irq_restore(flags);
	} while (read_seqretry(&rgs->lock, seq));

	
	if (unlikely(inc_state)) {
		local_irq_save(flags);
		write_seqlock(&rgs->lock);
		/*
		 * Double-check for races: if e.g. a new CPU starts up, it
		 * will call the state machine although it is not listed in
		 * the cpumasks.  Multiple CPUs could then see the cleared
		 * bitmask and try to advance the state.  In this case, only
		 * the first CPU does something; the remaining increments
		 * are ignored.
		 */
		if (rgs->state == rcs->state) {
			/*
			 * advance the state machine:
			 * - from COLLECT to GRACE
			 * - from GRACE to DESTROY/COLLECT
			 */
			switch(rgs->state) {
			case RCU_STATE_DESTROY_AND_COLLECT:
				rgs->state = RCU_STATE_GRACE;
				rcu_cpumask_init(&rgs->cpus);
				break;
			case RCU_STATE_GRACE:
				rgs->completed++;
				if (rgs->start_immediately) {
					rgs->state = RCU_STATE_DESTROY_AND_COLLECT;
					rcu_cpumask_init(&rgs->cpus);
				} else {
					rgs->state = RCU_STATE_DESTROY;
				}
				rgs->start_immediately = 0;
				break;
			default:
				BUG();
			}
		}
		write_sequnlock(&rgs->lock);
		local_irq_restore(flags);
	}
}
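For reference, the global cycle that rcu_state_machine() advances — DESTROY_AND_COLLECT to GRACE, then either straight back to DESTROY_AND_COLLECT (if another cycle was already requested) or to DESTROY — can be modeled as a tiny transition function. A simplified sketch (locking, cpumasks, and the completed counter are omitted; names are invented):

```c
#include <assert.h>

enum toy_state { S_DESTROY, S_COLLECT, S_GRACE };

/* Advance the toy state machine as the last CPU checks in:
 * COLLECT -> GRACE, and GRACE -> COLLECT (if another cycle was
 * requested via start_immediately) or back to DESTROY. */
static enum toy_state toy_advance(enum toy_state s, int start_immediately)
{
	switch (s) {
	case S_COLLECT:
		return S_GRACE;
	case S_GRACE:
		return start_immediately ? S_COLLECT : S_DESTROY;
	default:
		return s;	/* DESTROY only leaves via startcycle */
	}
}
```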

static void __rcu_offline_cpu(struct rcu_global_state *rgs, struct rcu_cpu_state *this_rcs,
					struct rcu_cpu_state *other_rcs, int cpu)
{
	/* Task 1: move all entries from the outgoing CPU into the lists
	 * of the current CPU.
	 * Locking: the other CPU is dead, so no locks are required; this
	 * is more or less a bulk call_rcu().
	 * For the sake of simplicity, all objects are treated as "new",
	 * even those that are already in "old".
	 */
	rcu_bulk_add(rgs, this_rcs, other_rcs->new, other_rcs->newtail, other_rcs->newqlen);
	rcu_bulk_add(rgs, this_rcs, other_rcs->old, other_rcs->oldtail, other_rcs->oldqlen);


	/* Task 2: handle the CPU bitmask of the other CPU.
	 * We know that the other CPU is dead, so it is guaranteed not to
	 * be holding any pointers to RCU-protected objects.
	 */

	rcu_state_machine(rgs, other_rcs, 1);
}

static void rcu_offline_cpu(int cpu)
{
	struct rcu_cpu_state *this_rcs_normal = &get_cpu_var(rcu_cpudata_normal);
	struct rcu_cpu_state *this_rcs_bh = &get_cpu_var(rcu_cpudata_bh);

	BUG_ON(irqs_disabled());

	__rcu_offline_cpu(&rcu_global_state_normal, this_rcs_normal,
					&per_cpu(rcu_cpudata_normal, cpu), cpu);
	__rcu_offline_cpu(&rcu_global_state_bh, this_rcs_bh,
					&per_cpu(rcu_cpudata_bh, cpu), cpu);
	put_cpu_var(rcu_cpudata_normal);
	put_cpu_var(rcu_cpudata_bh);

	BUG_ON(rcu_needs_cpu(cpu));
}

#else

static void rcu_offline_cpu(int cpu)
{
}

#endif

static int __rcu_pending(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs)
{
	/* quick and dirty check for pending */
	if (rgs->state != rcs->state)
		return 1;
	return 0;
}

/*
 * Check to see if there is any immediate RCU-related work to be done
 * by the current CPU, returning 1 if so.  This function is part of the
 * RCU implementation; it is -not- an exported member of the RCU API.
 */
int rcu_pending(int cpu)
{
	return __rcu_pending(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu)) ||
		__rcu_pending(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu));
}

/*
 * Check to see if any future RCU-related work will need to be done
 * by the current CPU, even if none need be done immediately, returning
 * 1 if so.  This function is part of the RCU implementation; it is -not-
 * an exported member of the RCU API.
 */
int rcu_needs_cpu(int cpu)
{
	struct rcu_cpu_state *rcs_normal = &per_cpu(rcu_cpudata_normal, cpu);
	struct rcu_cpu_state *rcs_bh = &per_cpu(rcu_cpudata_bh, cpu);

	return !!rcs_normal->new || !!rcs_normal->old ||
		!!rcs_bh->new || !!rcs_bh->old ||
		rcu_pending(cpu);
}

/**
 * rcu_check_callbacks - external entry point for grace-period checking
 * @cpu: cpu id.
 * @user: user space was interrupted.
 *
 * Top-level function driving RCU grace-period detection, normally
 * invoked from the scheduler-clock interrupt.  It feeds the
 * quiescent-state information into the per-CPU state machines for
 * both rcu and rcu_bh.
 *
 * This function may run with local interrupts disabled, thus all
 * callees must use local_irq_save().
 */
void rcu_check_callbacks(int cpu, int user)
{
	if (user ||
	    (idle_cpu(cpu) && !in_softirq() &&
				hardirq_count() <= (1 << HARDIRQ_SHIFT))) {

		/*
		 * Get here if this CPU took its interrupt from user
		 * mode or from the idle loop, and if this is not a
		 * nested interrupt.  In this case, the CPU is in
		 * a quiescent state, so count it.
		 */
		rcu_state_machine(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu), 1);
		rcu_state_machine(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu), 1);

	} else if (!in_softirq()) {

		/*
		 * Get here if this CPU did not take its interrupt from
		 * softirq, in other words, if it is not interrupting
		 * a rcu_bh read-side critical section.  This is an _bh
		 * critical section, so count it.
		 */
		rcu_state_machine(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu), 0);
		rcu_state_machine(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu), 1);
	} else {
		/*
		 * We are interrupting something.  Nevertheless, check whether
		 * we should collect rcu objects.  This can be done from arbitrary context.
		 */
		rcu_state_machine(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu), 0);
		rcu_state_machine(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu), 0);
	}
}

void rcu_restart_cpu(int cpu)
{
	BUG_ON(per_cpu(rcu_cpudata_normal, cpu).new != NULL);
	BUG_ON(per_cpu(rcu_cpudata_normal, cpu).old != NULL);
	per_cpu(rcu_cpudata_normal, cpu).state = RCU_STATE_DESTROY;

	BUG_ON(per_cpu(rcu_cpudata_bh, cpu).new != NULL);
	BUG_ON(per_cpu(rcu_cpudata_bh, cpu).old != NULL);
	per_cpu(rcu_cpudata_bh, cpu).state = RCU_STATE_DESTROY;
}

/*
 * Invoke the completed RCU callbacks.
 */
static void rcu_do_batch(struct rcu_cpu_dead *rcd)
{
	struct rcu_head *list;
	int i, count;

	if (!rcd->deadqlen)
		return;

	/* step 1: pull up to rcs->batchcount objects */
	BUG_ON(irqs_disabled());
	local_irq_disable();

	if (rcd->deadqlen > rcd->batchcount) {
		struct rcu_head *walk;

		list = rcd->dead;
		count = rcd->batchcount;
		
		walk = rcd->dead;
		for (i = 0; i < count; i++)
			walk = walk->next;
		rcd->dead = walk;

	} else {
		list = rcd->dead;
		count = rcd->deadqlen;

		rcd->dead = NULL;
	}
	rcd->deadqlen -= count;
	BUG_ON(rcd->deadqlen < 0);

	local_irq_enable();

	/* step 2: call the rcu callbacks */

	for (i = 0; i < count; i++) {
		struct rcu_head *next;

		next = list->next;
		prefetch(next);
		list->func(list);
		list = next;
	}

	/* step 3: if still entries left, raise the softirq again */
	if (rcd->deadqlen)
		raise_softirq(RCU_SOFTIRQ);
}

static void rcu_process_callbacks(struct softirq_action *unused)
{
	rcu_do_batch(&per_cpu(rcu_cpudata_dead, smp_processor_id()));
}

static void rcu_init_percpu_data(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs)
{
	rcs->new = rcs->old = NULL;
	rcs->newqlen = rcs->oldqlen = 0;
	rcs->state = RCU_STATE_DESTROY;
}

static void __cpuinit rcu_online_cpu(int cpu)
{
	rcu_init_percpu_data(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu));
	rcu_init_percpu_data(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu));

	per_cpu(rcu_cpudata_dead, cpu).dead = NULL;
	per_cpu(rcu_cpudata_dead, cpu).deadqlen = 0;
	per_cpu(rcu_cpudata_dead, cpu).batchcount = RCU_BATCH_MIN;

	open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
}

static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
				unsigned long action, void *hcpu)
{
	long cpu = (long)hcpu;

	switch (action) {
	case CPU_UP_PREPARE:
	case CPU_UP_PREPARE_FROZEN:
		rcu_online_cpu(cpu);
		break;
	case CPU_DEAD:
	case CPU_DEAD_FROZEN:
		rcu_offline_cpu(cpu);
		break;
	default:
		break;
	}
	return NOTIFY_OK;
}

static struct notifier_block __cpuinitdata rcu_nb = {
	.notifier_call	= rcu_cpu_notify,
};

/*
 * Initializes the RCU mechanism.  Assumed to be called early, that is,
 * before the local timer (SMP) or the jiffies timer (uniprocessor) is set up.
 * Note that rcu_qsctr and friends are implicitly
 * initialized due to the choice of ``0'' for RCU_CTR_INVALID.
 */
void __init __rcu_init(void)
{
	rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
			(void *)(long)smp_processor_id());
	/* Register notifier for non-boot CPUs */
	register_cpu_notifier(&rcu_nb);
}

module_param(qlowmark, int, 0);



[-- Attachment #3: rcuclassic.h --]
[-- Type: text/plain, Size: 5396 bytes --]

/*
 * Read-Copy Update mechanism for mutual exclusion (classic version)
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 * Copyright IBM Corporation, 2001
 *
 * Author: Dipankar Sarma <dipankar@in.ibm.com>
 *
 * Based on the original work by Paul McKenney <paulmck@us.ibm.com>
 * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
 * Papers:
 * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
 * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
 *
 * For detailed explanation of Read-Copy Update mechanism see -
 * 		Documentation/RCU
 *
 * Rewrite based on a global state machine
 * (C) Manfred Spraul <manfred@colorfullife.com>, 2008
 */

#ifndef __LINUX_RCUCLASSIC_H
#define __LINUX_RCUCLASSIC_H

#include <linux/cache.h>
#include <linux/spinlock.h>
#include <linux/threads.h>
#include <linux/percpu.h>
#include <linux/cpumask.h>
#include <linux/seqlock.h>

/*
 * cpu bitmask:
 * default implementation, flat without hierarchy, not optimized for UP.
 */

struct rcu_cpumask {
	spinlock_t lock;
	cpumask_t cpus;
} ____cacheline_internodealigned_in_smp;

#define __RCU_CPUMASK_INIT(ptr) { .lock = __SPIN_LOCK_UNLOCKED(&(ptr)->lock) }

/*
 * global state machine:
 * - each cpu regularly checks the global state and compares it with its own local state.
 * - if the states do not match, then the cpus do the required work and afterwards
 *   - update their local state
 *   - clear their bit in the cpu bitmask.
 * The state machine is sequence-lock protected. It is only read with local interrupts disabled.
 * Since all cpus must do something to complete a state change, the current state cannot
 * jump forward by more than one state.
 */

/* RCU_STATE_DESTROY:
 * call callbacks that were registered by call_rcu for the objects in rcu_cpu_state.old
 */
#define RCU_STATE_DESTROY		1
/* RCU_STATE_DESTROY_AND_COLLECT:
 * - call callbacks that were registered by call_rcu for the objects in rcu_cpu_state.old
 * - move the objects from rcu_cpu_state.new to rcu_cpu_state.old
 */
#define RCU_STATE_DESTROY_AND_COLLECT	2
/* RCU_STATE_GRACE
 * - wait for a quiescent state
 */
#define RCU_STATE_GRACE			3

struct rcu_global_state {
	seqlock_t		lock;
	int			state;
	int			start_immediately;
	long			completed;
	struct rcu_cpumask	cpus;
} ____cacheline_internodealigned_in_smp;

struct rcu_cpu_state {

	int state;

	/* new objects, directly from call_rcu().
	 * objects are added LIFO, better for cache hits.
	 * the lists are length-counted, not NULL-terminated.
	 */
	struct rcu_head *new;	/* new objects */
	struct rcu_head **newtail;
	long            newqlen; 	 /* # of queued callbacks */

	/* objects that are in rcu grace processing. The actual
	 * state depends on rgs->state.
	 */
	struct rcu_head *old;		
	struct rcu_head **oldtail;
	long            oldqlen;
};

struct rcu_cpu_dead {
	/* objects that are scheduled for immediate call of
	 * ->func().
	 * objects are added FIFO, necessary for forward progress.
	 * only one structure for _bh and _normal.
	 */
	struct rcu_head *dead;
	long		deadqlen;

	long		batchcount;
};

DECLARE_PER_CPU(struct rcu_cpu_state, rcu_cpudata_normal);
DECLARE_PER_CPU(struct rcu_cpu_state, rcu_cpudata_bh);
DECLARE_PER_CPU(struct rcu_cpu_dead, rcu_cpudata_dead);

extern long rcu_batches_completed(void);
extern long rcu_batches_completed_bh(void);

extern int rcu_pending(int cpu);
extern int rcu_needs_cpu(int cpu);

#ifdef CONFIG_DEBUG_LOCK_ALLOC
extern struct lockdep_map rcu_lock_map;
# define rcu_read_acquire()	\
			lock_acquire(&rcu_lock_map, 0, 0, 2, 1, _THIS_IP_)
# define rcu_read_release()	lock_release(&rcu_lock_map, 1, _THIS_IP_)
#else
# define rcu_read_acquire()	do { } while (0)
# define rcu_read_release()	do { } while (0)
#endif

#define __rcu_read_lock() \
	do { \
		preempt_disable(); \
		__acquire(RCU); \
		rcu_read_acquire(); \
	} while (0)
#define __rcu_read_unlock() \
	do { \
		rcu_read_release(); \
		__release(RCU); \
		preempt_enable(); \
	} while (0)
#define __rcu_read_lock_bh() \
	do { \
		local_bh_disable(); \
		__acquire(RCU_BH); \
		rcu_read_acquire(); \
	} while (0)
#define __rcu_read_unlock_bh() \
	do { \
		rcu_read_release(); \
		__release(RCU_BH); \
		local_bh_enable(); \
	} while (0)

#define __synchronize_sched() synchronize_rcu()

#define call_rcu_sched(head, func) call_rcu(head, func)

extern void __rcu_init(void);
#define rcu_init_sched()	do { } while (0)
extern void rcu_check_callbacks(int cpu, int user);
extern void rcu_restart_cpu(int cpu);


#define rcu_enter_nohz()	do { } while (0)
#define rcu_exit_nohz()		do { } while (0)

#define rcu_qsctr_inc(cpu)	do { } while (0)
#define rcu_bh_qsctr_inc(cpu)	do { } while (0)

#endif /* __LINUX_RCUCLASSIC_H */


* Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups
  2008-08-19 10:48         ` Manfred Spraul
@ 2008-08-19 14:03           ` Paul E. McKenney
  2008-08-19 17:16             ` nohz_cpu_mask question (was: Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups) Manfred Spraul
  0 siblings, 1 reply; 17+ messages in thread
From: Paul E. McKenney @ 2008-08-19 14:03 UTC (permalink / raw)
  To: Manfred Spraul
  Cc: linux-kernel, mingo, akpm, oleg, dipankar, rostedt, dvhltc, niv

On Tue, Aug 19, 2008 at 12:48:33PM +0200, Manfred Spraul wrote:
> Hi Paul,
>
> You are beating me: I've just finished my implementation; it's attached.
> It boots with qemu, rcu torture enabled, both single and 8-cpu.

I started about 2.5 weeks ago, so I probably had a head start.

> Two problems are open:
> - right now, I don't use rcu_qsctr_inc() at all.

Hmmm...  What do you do instead to detect context switches?  Or do you
currently rely completely on scheduling-clock interrupts from idle &c?

> - qlowmark is set to 0, any other value breaks synchronize_rcu().

In my implementation, qlowmark controls only the rate at which callbacks
that have already passed through a grace period are invoked.  You seem to be using it to
delay start of a grace period until there is a reasonably large amount
of work to do.  My suggestion would be to add a time delay as well,
so that if (say) 2 jiffies have passed since the first callback was
registered via call_rcu(), start the grace-period processing regardless
of how few callbacks there are.

> And I must read your implementation....

Likewise!  I suggest that we each complete our implementations, then
look at what should be taken from each.  Seem reasonable?

							Thanx, Paul

> --
>    Manfred

> /*
>  * Read-Copy Update mechanism for mutual exclusion
>  *
>  * This program is free software; you can redistribute it and/or modify
>  * it under the terms of the GNU General Public License as published by
>  * the Free Software Foundation; either version 2 of the License, or
>  * (at your option) any later version.
>  *
>  * This program is distributed in the hope that it will be useful,
>  * but WITHOUT ANY WARRANTY; without even the implied warranty of
>  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
>  * GNU General Public License for more details.
>  *
>  * You should have received a copy of the GNU General Public License
>  * along with this program; if not, write to the Free Software
>  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
>  *
>  * Copyright IBM Corporation, 2001
>  *
>  * Authors: Dipankar Sarma <dipankar@in.ibm.com>
>  *	    Manfred Spraul <manfred@colorfullife.com>
>  *
>  * Based on the original work by Paul McKenney <paulmck@us.ibm.com>
>  * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
>  * Papers:
>  * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
>  * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
>  *
>  * For detailed explanation of Read-Copy Update mechanism see -
>  * 		Documentation/RCU
>  *
>  * Rewrite based on a global state machine
>  * (C) Manfred Spraul <manfred@colorfullife.com>, 2008
>  *
>  */
> #include <linux/types.h>
> #include <linux/kernel.h>
> #include <linux/init.h>
> #include <linux/spinlock.h>
> #include <linux/smp.h>
> #include <linux/rcupdate.h>
> #include <linux/interrupt.h>
> #include <linux/sched.h>
> #include <asm/atomic.h>
> #include <linux/bitops.h>
> #include <linux/module.h>
> #include <linux/completion.h>
> #include <linux/moduleparam.h>
> #include <linux/percpu.h>
> #include <linux/notifier.h>
> #include <linux/cpu.h>
> #include <linux/mutex.h>
> #include <linux/time.h>
> #include <linux/proc_fs.h>
> 
> #ifdef CONFIG_DEBUG_LOCK_ALLOC
> static struct lock_class_key rcu_lock_key;
> struct lockdep_map rcu_lock_map =
> 	STATIC_LOCKDEP_MAP_INIT("rcu_read_lock", &rcu_lock_key);
> EXPORT_SYMBOL_GPL(rcu_lock_map);
> #endif
> 
> /* Definition for rcupdate control block. */
> static struct rcu_global_state rcu_global_state_normal = {
> 	.lock = __SEQLOCK_UNLOCKED(&rcu_global_state_normal.lock),
> 	.state = RCU_STATE_DESTROY,
> 	.start_immediately = 0,
> 	.cpus = __RCU_CPUMASK_INIT(&rcu_global_state_normal.cpus)
> };
> 
> static struct rcu_global_state rcu_global_state_bh = {
> 	.lock = __SEQLOCK_UNLOCKED(&rcu_global_state_bh.lock),
> 	.state = RCU_STATE_DESTROY,
> 	.start_immediately = 0,
> 	.cpus = __RCU_CPUMASK_INIT(&rcu_global_state_bh.cpus)
> };
> 
> DEFINE_PER_CPU(struct rcu_cpu_state, rcu_cpudata_normal) = { 0L };
> DEFINE_PER_CPU(struct rcu_cpu_state, rcu_cpudata_bh) = { 0L };
> DEFINE_PER_CPU(struct rcu_cpu_dead, rcu_cpudata_dead) = { 0L };
> 
> 
> /* FIXME: setting qlowmark to non-zero causes a hang.
>  * Probably someone waits for an rcu completion (e.g. synchronize_rcu()),
>  * but the real rcu cycle is never started because qlowmark is never
>  * reached.
>  * idea: replace with a timer based delay.
>  */
> int qlowmark = 0;
> 
> void rcu_cpumask_init(struct rcu_cpumask *rcm)
> {
> 	BUG_ON(!irqs_disabled());
> 	spin_lock(&rcm->lock);
> 	/*
> 	 * Accessing nohz_cpu_mask before taking the cpumask snapshot
> 	 * needs a memory barrier.  Otherwise tickless idle CPUs can be
> 	 * included in rcm->cpus, which will extend grace periods
> 	 * unnecessarily.
> 	 */
> 	smp_mb();
> 	cpus_andnot(rcm->cpus, cpu_online_map, nohz_cpu_mask);
> 
> 	spin_unlock(&rcm->lock);
> }
> 
> int rcu_cpumask_clear_and_test(struct rcu_cpumask *rcm, int cpu)
> {
> 	int ret = 0;
> 
> 	BUG_ON(!irqs_disabled());
> 	spin_lock(&rcm->lock);
> 	cpu_clear(cpu, rcm->cpus);
> 	if (cpus_empty(rcm->cpus))
> 		ret = 1;
> 	spin_unlock(&rcm->lock);
> 
> 	return ret;
> }
> 
> long rcu_batches_completed(void)
> {
> 	return rcu_global_state_normal.completed;
> }
> 
> long rcu_batches_completed_bh(void)
> {
> 	return rcu_global_state_bh.completed;
> }
> 
> /**
>  * rcu_state_startcycle - start the next rcu cycle
>  * @rgs: global rcu state
>  *
>  * The function starts the next rcu cycle, either immediately or
>  * by setting rgs->start_immediately.
>  */ 
> static void rcu_state_startcycle(struct rcu_global_state *rgs)
> {
> 	unsigned seq;
> 	int do_real_start;
> 
> 	BUG_ON(!irqs_disabled());
> 	do {
> 		seq = read_seqbegin(&rgs->lock);
> 		if (rgs->start_immediately == 0) {
> 			do_real_start = 1;
> 		} else {
> 			do_real_start = 0;
> 			BUG_ON(rgs->state == RCU_STATE_DESTROY);
> 		}
> 	} while (read_seqretry(&rgs->lock, seq));
> 
> 	if (do_real_start) {
> 		write_seqlock(&rgs->lock);
> 		switch(rgs->state) {
> 		case RCU_STATE_DESTROY_AND_COLLECT:
> 		case RCU_STATE_GRACE:
> 			rgs->start_immediately = 1;
> 			break;
> 		case RCU_STATE_DESTROY:
> 			rgs->state = RCU_STATE_DESTROY_AND_COLLECT;
> 			BUG_ON(rgs->start_immediately);
> 			rcu_cpumask_init(&rgs->cpus);
> 			break;
> 		default:
> 			BUG();
> 		}
> 		write_sequnlock(&rgs->lock);
> 	}
> }
> 
> static void rcu_checkqlen(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs, int inc)
> {
> 	BUG_ON(!irqs_disabled());
> 	rcs->newqlen += inc; 
> 	if (unlikely(rcs->newqlen > qlowmark)) {
> 
> 		/* FIXME: actually, this code only needs to run once,
> 		 *  i.e. when qlen == qlowmark. But: qlowmark can be changed at runtime.
> 		 * and: doesn't work anyway, see comment near qlowmark
> 		 */
> 		rcu_state_startcycle(rgs);
> 	}
> }
> 
> 
> static void __call_rcu(struct rcu_head *head, struct rcu_global_state *rgs,
> 		struct rcu_cpu_state *rcs)
> {
> 	if (rcs->new == NULL)
> 		rcs->newtail = &head->next;
> 	head->next = rcs->new;
> 	rcs->new = head;
> 
> 	rcu_checkqlen(rgs, rcs, 1);
> }
> 
> /**
>  * call_rcu - Queue an RCU callback for invocation after a grace period.
>  * @head: structure to be used for queueing the RCU updates.
>  * @func: actual update function to be invoked after the grace period
>  *
>  * The update function will be invoked some time after a full grace
>  * period elapses, in other words after all currently executing RCU
>  * read-side critical sections have completed.  RCU read-side critical
>  * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
>  * and may be nested.
>  */
> void call_rcu(struct rcu_head *head,
> 				void (*func)(struct rcu_head *rcu))
> {
> 	unsigned long flags;
> 
> 	head->func = func;
> 	local_irq_save(flags);
> 	__call_rcu(head, &rcu_global_state_normal, &__get_cpu_var(rcu_cpudata_normal));
> 	local_irq_restore(flags);
> }
> EXPORT_SYMBOL_GPL(call_rcu);
> 
> /**
>  * call_rcu_bh - Queue an RCU callback for invocation after a quicker grace period.
>  * @head: structure to be used for queueing the RCU updates.
>  * @func: actual update function to be invoked after the grace period
>  *
>  * The update function will be invoked some time after a full grace
>  * period elapses, in other words after all currently executing RCU
>  * read-side critical sections have completed. call_rcu_bh() assumes
>  * that the read-side critical sections end on completion of a softirq
>  * handler. This means that read-side critical sections in process
>  * context must not be interrupted by softirqs. This interface is to be
>  * used when most of the read-side critical sections are in softirq context.
>  * RCU read-side critical sections are delimited by rcu_read_lock() and
>  * rcu_read_unlock() if in interrupt context, or by rcu_read_lock_bh()
>  * and rcu_read_unlock_bh() if in process context.  These may be nested.
>  */
> void call_rcu_bh(struct rcu_head *head,
> 				void (*func)(struct rcu_head *rcu))
> {
> 	unsigned long flags;
> 
> 	head->func = func;
> 	local_irq_save(flags);
> 	__call_rcu(head, &rcu_global_state_bh, &__get_cpu_var(rcu_cpudata_bh));
> 	local_irq_restore(flags);
> }
> EXPORT_SYMBOL_GPL(call_rcu_bh);
> 
> #ifdef CONFIG_HOTPLUG_CPU
> 
> /**
>  * rcu_bulk_add - bulk add new rcu objects.
>  * @rgs: global rcu state
>  * @rcs: cpu state
>  * @h: linked list of rcu objects.
>  *
>  * Must be called with local interrupts enabled.
>  */
> static void rcu_bulk_add(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs, struct rcu_head *h, struct rcu_head **htail, int len)
> {
> 
> 	BUG_ON(irqs_disabled());
> 
> 	if (len > 0) {
> 		local_irq_disable();
> 		if (rcs->new) {
> 			(*htail) = rcs->new;
> 			rcs->new = h;
> 		} else {
> 			rcs->new = h;
> 			rcs->newtail = htail;
> 		}
> 		rcu_checkqlen(rgs, rcs, len);
> 		local_irq_enable();
> 	}
> }
> 
> #define RCU_BATCH_MIN		100
> #define	RCU_BATCH_INCFACTOR	2
> #define RCU_BATCH_DECFACTOR	4
> 
> static void rcu_move_and_raise(struct rcu_cpu_state *rcs)
> {
> 	struct rcu_cpu_dead *rcd = &per_cpu(rcu_cpudata_dead, smp_processor_id());
> 
> 	BUG_ON(!irqs_disabled());
> 
> 	/* update batch limit:
> 	 * - if there are still old entries when new entries are added:
> 	 *   double the batch count.
> 	 * - if there are no old entries: reduce it by 25%, but never below 100.
> 	 */
> 	if (rcd->deadqlen)
> 		rcd->batchcount = rcd->batchcount * RCU_BATCH_INCFACTOR;
> 	else
> 		rcd->batchcount = rcd->batchcount - rcd->batchcount / RCU_BATCH_DECFACTOR;
> 	if (rcd->batchcount < RCU_BATCH_MIN)
> 		rcd->batchcount = RCU_BATCH_MIN;
> 
> 	if (rcs->oldqlen) {
> 		(*rcs->oldtail) = rcd->dead;
> 		rcd->dead = rcs->old;
> 		rcd->deadqlen += rcs->oldqlen;
> 		rcs->old = NULL;
> 		rcs->oldtail = NULL;
> 		rcs->oldqlen = 0;
> 	} 
> 	BUG_ON(rcs->old);
> 	BUG_ON(rcs->oldtail);
> 	BUG_ON(rcs->oldqlen);
> 	raise_softirq(RCU_SOFTIRQ);
> }
> 
> static void rcu_state_machine(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs, int is_quiet)
> {
> 	int inc_state;
> 	unsigned seq;
> 	unsigned long flags;
> 
> 	inc_state = 0;
> 	do {
> 		seq = read_seqbegin(&rgs->lock);
> 		local_irq_save(flags);
> 		if (rgs->state != rcs->state) {
> 			inc_state = 0;
> 			switch(rgs->state) {
> 			case RCU_STATE_DESTROY:
> 				rcs->state = rgs->state;
> 				rcu_move_and_raise(rcs);
> 				break;
> 			case RCU_STATE_DESTROY_AND_COLLECT:
> 				rcs->state = rgs->state;
> 				rcu_move_and_raise(rcs);
> 				rcs->old = rcs->new;
> 				rcs->oldtail = rcs->newtail;
> 				rcs->oldqlen = rcs->newqlen;
> 				rcs->new = NULL;
> 				rcs->newtail = NULL;
> 				rcs->newqlen = 0;
> 				if (rcu_cpumask_clear_and_test(&rgs->cpus, smp_processor_id()))
> 					inc_state = 1;
> 				break;
> 			case RCU_STATE_GRACE: 
> 				if (is_quiet) {
> 					rcs->state = rgs->state;
> 					if (rcu_cpumask_clear_and_test(&rgs->cpus, smp_processor_id()))
> 						inc_state = 1;
> 				}
> 				break;
> 			default:
> 				BUG();
> 			}
> 		}
> 		local_irq_restore(flags);
> 	} while (read_seqretry(&rgs->lock, seq));
> 
> 	
> 	if (unlikely(inc_state)) {
> 		local_irq_save(flags);
> 		write_seqlock(&rgs->lock);
> 		/*
> 		 * double check for races: If e.g. a new cpu starts up it
> 		 * will call the state machine although it's not listed in the
> 		 * cpumasks. Then multiple cpus could see the cleared bitmask
> 		 * and try to advance the state. In this case, only the first
> 		 * cpu does something, the remaining incs are ignored.
> 		 */
> 		if (rgs->state == rcs->state) {
> 			/*
> 			 * advance the state machine:
> 			 * - from COLLECT to GRACE
> 			 * - from GRACE to DESTROY/COLLECT
> 			 */
> 			switch(rgs->state) {
> 			case RCU_STATE_DESTROY_AND_COLLECT:
> 				rgs->state = RCU_STATE_GRACE;
> 				rcu_cpumask_init(&rgs->cpus);
> 				break;
> 			case RCU_STATE_GRACE:
> 				rgs->completed++;
> 				if (rgs->start_immediately) {
> 					rgs->state = RCU_STATE_DESTROY_AND_COLLECT;
> 					rcu_cpumask_init(&rgs->cpus);
> 				} else {
> 					rgs->state = RCU_STATE_DESTROY;
> 				}
> 				rgs->start_immediately = 0;
> 				break;
> 			default:
> 				BUG();
> 			}
> 		}
> 		write_sequnlock(&rgs->lock);
> 		local_irq_restore(flags);
> 	}
> }
> 
> static void __rcu_offline_cpu(struct rcu_global_state *rgs, struct rcu_cpu_state *this_rcs,
> 					struct rcu_cpu_state *other_rcs, int cpu)
> {
> 	/* task 1: move all entries from the dead cpu into the lists of the current cpu.
> 	 * locking: The other cpu is dead, thus no locks are required.
> 	 *  Thus it's more or less a bulk call_rcu().
> 	 * For the sake of simplicity, all objects are treated as "new", even the objects
> 	 * that are already in old.
> 	 */
> 	rcu_bulk_add(rgs, this_rcs, other_rcs->new, other_rcs->newtail, other_rcs->newqlen);
> 	rcu_bulk_add(rgs, this_rcs, other_rcs->old, other_rcs->oldtail, other_rcs->oldqlen);
> 
> 
> 	/* task 2: handle the cpu bitmask of the other cpu
> 	 * We know that the other cpu is dead, thus it's guaranteed not to be holding
> 	 * any pointers to rcu protected objects.
> 	 */
> 
> 	rcu_state_machine(rgs, other_rcs, 1);
> }
> 
> static void rcu_offline_cpu(int cpu)
> {
> 	struct rcu_cpu_state *this_rcs_normal = &get_cpu_var(rcu_cpudata_normal);
> 	struct rcu_cpu_state *this_rcs_bh = &get_cpu_var(rcu_cpudata_bh);
> 
> 	BUG_ON(irqs_disabled());
> 
> 	__rcu_offline_cpu(&rcu_global_state_normal, this_rcs_normal,
> 					&per_cpu(rcu_cpudata_normal, cpu), cpu);
> 	__rcu_offline_cpu(&rcu_global_state_bh, this_rcs_bh,
> 					&per_cpu(rcu_cpudata_bh, cpu), cpu);
> 	put_cpu_var(rcu_cpudata_normal);
> 	put_cpu_var(rcu_cpudata_bh);
> 
> 	BUG_ON(rcu_needs_cpu(cpu));
> }
> 
> #else
> 
> static void rcu_offline_cpu(int cpu)
> {
> }
> 
> #endif
> 
> static int __rcu_pending(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs)
> {
> 	/* quick and dirty check for pending */
> 	if (rgs->state != rcs->state)
> 		return 1;
> 	return 0;
> }
> 
> /*
>  * Check to see if there is any immediate RCU-related work to be done
>  * by the current CPU, returning 1 if so.  This function is part of the
>  * RCU implementation; it is -not- an exported member of the RCU API.
>  */
> int rcu_pending(int cpu)
> {
> 	return __rcu_pending(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu)) ||
> 		__rcu_pending(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu));
> }
> 
> /*
>  * Check to see if any future RCU-related work will need to be done
>  * by the current CPU, even if none need be done immediately, returning
>  * 1 if so.  This function is part of the RCU implementation; it is -not-
>  * an exported member of the RCU API.
>  */
> int rcu_needs_cpu(int cpu)
> {
> 	struct rcu_cpu_state *rcs_normal = &per_cpu(rcu_cpudata_normal, cpu);
> 	struct rcu_cpu_state *rcs_bh = &per_cpu(rcu_cpudata_bh, cpu);
> 
> 	return !!rcs_normal->new || !!rcs_normal->old ||
> 		!!rcs_bh->new || !!rcs_bh->old ||
> 		rcu_pending(cpu);
> }
> 
> /**
>  * rcu_check_callbacks - external entry point for grace-period checking
>  * @cpu: cpu id.
>  * @user: user space was interrupted.
>  *
>  * Top-level function driving RCU grace-period detection, normally
>  * invoked from the scheduler-clock interrupt.  It feeds the
>  * quiescent-state information into the per-CPU state machines for
>  * both rcu and rcu_bh.
>  *
>  * This function may run with local interrupts disabled, thus all
>  * callees must use local_irq_save().
>  */
> void rcu_check_callbacks(int cpu, int user)
> {
> 	if (user ||
> 	    (idle_cpu(cpu) && !in_softirq() &&
> 				hardirq_count() <= (1 << HARDIRQ_SHIFT))) {
> 
> 		/*
> 		 * Get here if this CPU took its interrupt from user
> 		 * mode or from the idle loop, and if this is not a
> 		 * nested interrupt.  In this case, the CPU is in
> 		 * a quiescent state, so count it.
> 		 */
> 		rcu_state_machine(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu), 1);
> 		rcu_state_machine(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu), 1);
> 
> 	} else if (!in_softirq()) {
> 
> 		/*
> 		 * Get here if this CPU did not take its interrupt from
> 		 * softirq, in other words, if it is not interrupting
> 		 * a rcu_bh read-side critical section.  This is an _bh
> 		 * critical section, so count it.
> 		 */
> 		rcu_state_machine(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu), 0);
> 		rcu_state_machine(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu), 1);
> 	} else {
> 		/*
> 		 * We are interrupting something.  Nevertheless, check whether
> 		 * we should collect rcu objects.  This can be done from arbitrary context.
> 		 */
> 		rcu_state_machine(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu), 0);
> 		rcu_state_machine(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu), 0);
> 	}
> }
> 
> void rcu_restart_cpu(int cpu)
> {
> 	BUG_ON(per_cpu(rcu_cpudata_normal, cpu).new != NULL);
> 	BUG_ON(per_cpu(rcu_cpudata_normal, cpu).old != NULL);
> 	per_cpu(rcu_cpudata_normal, cpu).state = RCU_STATE_DESTROY;
> 
> 	BUG_ON(per_cpu(rcu_cpudata_bh, cpu).new != NULL);
> 	BUG_ON(per_cpu(rcu_cpudata_bh, cpu).old != NULL);
> 	per_cpu(rcu_cpudata_bh, cpu).state = RCU_STATE_DESTROY;
> }
> 
> /*
>  * Invoke the completed RCU callbacks.
>  */
> static void rcu_do_batch(struct rcu_cpu_dead *rcd)
> {
> 	struct rcu_head *list;
> 	int i, count;
> 
> 	if (!rcd->deadqlen)
> 		return;
> 
> 	/* step 1: pull up to rcs->batchcount objects */
> 	BUG_ON(irqs_disabled());
> 	local_irq_disable();
> 
> 	if (rcd->deadqlen > rcd->batchcount) {
> 		struct rcu_head *walk;
> 
> 		list = rcd->dead;
> 		count = rcd->batchcount;
> 		
> 		walk = rcd->dead;
> 		for (i = 0; i < count; i++)
> 			walk = walk->next;
> 		rcd->dead = walk;
> 
> 	} else {
> 		list = rcd->dead;
> 		count = rcd->deadqlen;
> 
> 		rcd->dead = NULL;
> 	}
> 	rcd->deadqlen -= count;
> 	BUG_ON(rcd->deadqlen < 0);
> 
> 	local_irq_enable();
> 
> 	/* step 2: call the rcu callbacks */
> 
> 	for (i = 0; i < count; i++) {
> 		struct rcu_head *next;
> 
> 		next = list->next;
> 		prefetch(next);
> 		list->func(list);
> 		list = next;
> 	}
> 
> 	/* step 3: if still entries left, raise the softirq again */
> 	if (rcd->deadqlen)
> 		raise_softirq(RCU_SOFTIRQ);
> }
> 
> static void rcu_process_callbacks(struct softirq_action *unused)
> {
> 	rcu_do_batch(&per_cpu(rcu_cpudata_dead, smp_processor_id()));
> }
> 
> static void rcu_init_percpu_data(struct rcu_global_state *rgs, struct rcu_cpu_state *rcs)
> {
> 	rcs->new = rcs->old = NULL;
> 	rcs->newqlen = rcs->oldqlen = 0;
> 	rcs->state = RCU_STATE_DESTROY;
> }
> 
> static void __cpuinit rcu_online_cpu(int cpu)
> {
> 	rcu_init_percpu_data(&rcu_global_state_normal, &per_cpu(rcu_cpudata_normal, cpu));
> 	rcu_init_percpu_data(&rcu_global_state_bh, &per_cpu(rcu_cpudata_bh, cpu));
> 
> 	per_cpu(rcu_cpudata_dead, cpu).dead = NULL;
> 	per_cpu(rcu_cpudata_dead, cpu).deadqlen = 0;
> 	per_cpu(rcu_cpudata_dead, cpu).batchcount = RCU_BATCH_MIN;
> 
> 	open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
> }
> 
> static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
> 				unsigned long action, void *hcpu)
> {
> 	long cpu = (long)hcpu;
> 
> 	switch (action) {
> 	case CPU_UP_PREPARE:
> 	case CPU_UP_PREPARE_FROZEN:
> 		rcu_online_cpu(cpu);
> 		break;
> 	case CPU_DEAD:
> 	case CPU_DEAD_FROZEN:
> 		rcu_offline_cpu(cpu);
> 		break;
> 	default:
> 		break;
> 	}
> 	return NOTIFY_OK;
> }
> 
> static struct notifier_block __cpuinitdata rcu_nb = {
> 	.notifier_call	= rcu_cpu_notify,
> };
> 
> /*
>  * Initializes the RCU mechanism.  Assumed to be called early, that is,
>  * before the local timer (SMP) or the jiffies timer (uniprocessor) is set up.
>  * Note that rcu_qsctr and friends are implicitly
>  * initialized due to the choice of ``0'' for RCU_CTR_INVALID.
>  */
> void __init __rcu_init(void)
> {
> 	rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
> 			(void *)(long)smp_processor_id());
> 	/* Register notifier for non-boot CPUs */
> 	register_cpu_notifier(&rcu_nb);
> }
> 
> module_param(qlowmark, int, 0);
> 
> 

> /*
>  * Read-Copy Update mechanism for mutual exclusion (classic version)
>  *
>  * This program is free software; you can redistribute it and/or modify
>  * it under the terms of the GNU General Public License as published by
>  * the Free Software Foundation; either version 2 of the License, or
>  * (at your option) any later version.
>  *
>  * This program is distributed in the hope that it will be useful,
>  * but WITHOUT ANY WARRANTY; without even the implied warranty of
>  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
>  * GNU General Public License for more details.
>  *
>  * You should have received a copy of the GNU General Public License
>  * along with this program; if not, write to the Free Software
>  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
>  *
>  * Copyright IBM Corporation, 2001
>  *
>  * Author: Dipankar Sarma <dipankar@in.ibm.com>
>  *
>  * Based on the original work by Paul McKenney <paulmck@us.ibm.com>
>  * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
>  * Papers:
>  * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
>  * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
>  *
>  * For detailed explanation of Read-Copy Update mechanism see -
>  * 		Documentation/RCU
>  *
>  * Rewrite based on a global state machine
>  * (C) Manfred Spraul <manfred@colorfullife.com>, 2008
>  */
> 
> #ifndef __LINUX_RCUCLASSIC_H
> #define __LINUX_RCUCLASSIC_H
> 
> #include <linux/cache.h>
> #include <linux/spinlock.h>
> #include <linux/threads.h>
> #include <linux/percpu.h>
> #include <linux/cpumask.h>
> #include <linux/seqlock.h>
> 
> /*
>  * cpu bitmask:
>  * default implementation, flat without hierarchy, not optimized for UP.
>  */
> 
> struct rcu_cpumask {
> 	spinlock_t lock;
> 	cpumask_t cpus;
> } ____cacheline_internodealigned_in_smp;
> 
> #define __RCU_CPUMASK_INIT(ptr) { .lock = __SPIN_LOCK_UNLOCKED(&(ptr)->lock) }
> 
> /*
>  * global state machine:
>  * - each cpu regularly checks the global state and compares it with its own local state.
>  * - if the states do not match, then the cpus do the required work and afterwards
>  *   - update their local state
>  *   - clear their bit in the cpu bitmask.
>  * The state machine is sequence-lock protected.  It is only read with local interrupts disabled.
>  * Since all cpus must do something to complete a state change, the current state cannot
>  * jump forward by more than one state.
>  */
> 
> /* RCU_STATE_DESTROY:
>  * call callbacks that were registered by call_rcu for the objects in rcu_cpu_state.old
>  */
> #define RCU_STATE_DESTROY		1
> /* RCU_STATE_DESTROY_AND_COLLECT:
>  * - call callbacks that were registered by call_rcu for the objects in rcu_cpu_state.old
>  * - move the objects from rcu_cpu_state.new to rcu_cpu_state.old
>  */
> #define RCU_STATE_DESTROY_AND_COLLECT	2
> /* RCU_STATE_GRACE
>  * - wait for a quiescent state
>  */
> #define RCU_STATE_GRACE			3
> 
> struct rcu_global_state {
> 	seqlock_t		lock;
> 	int			state;
> 	int			start_immediately;
> 	long			completed;
> 	struct rcu_cpumask	cpus;
> } ____cacheline_internodealigned_in_smp;
> 
> struct rcu_cpu_state {
> 
> 	int state;
> 
> 	/* new objects, directly from call_rcu().
> 	 * objects are added LIFO, better for cache hits.
>  * the lists are length-based, not NULL-terminated.
> 	 */
> 	struct rcu_head *new;	/* new objects */
> 	struct rcu_head **newtail;
> 	long            newqlen; 	 /* # of queued callbacks */
> 
> 	/* objects that are in rcu grace processing. The actual
> 	 * state depends on rgs->state.
> 	 */
> 	struct rcu_head *old;		
> 	struct rcu_head **oldtail;
> 	long            oldqlen;
> };
> 
> struct rcu_cpu_dead {
> 	/* objects that are scheduled for immediate call of
> 	 * ->func().
> 	 * objects are added FIFO, necessary for forward progress.
> 	 * only one structure for _bh and _normal.
> 	 */
> 	struct rcu_head *dead;
> 	long		deadqlen;
> 
> 	long		batchcount;
> };
> 
> DECLARE_PER_CPU(struct rcu_cpu_state, rcu_cpudata_normal);
> DECLARE_PER_CPU(struct rcu_cpu_state, rcu_cpudata_bh);
> DECLARE_PER_CPU(struct rcu_cpu_dead, rcu_cpudata_dead);
> 
> extern long rcu_batches_completed(void);
> extern long rcu_batches_completed_bh(void);
> 
> extern int rcu_pending(int cpu);
> extern int rcu_needs_cpu(int cpu);
> 
> #ifdef CONFIG_DEBUG_LOCK_ALLOC
> extern struct lockdep_map rcu_lock_map;
> # define rcu_read_acquire()	\
> 			lock_acquire(&rcu_lock_map, 0, 0, 2, 1, _THIS_IP_)
> # define rcu_read_release()	lock_release(&rcu_lock_map, 1, _THIS_IP_)
> #else
> # define rcu_read_acquire()	do { } while (0)
> # define rcu_read_release()	do { } while (0)
> #endif
> 
> #define __rcu_read_lock() \
> 	do { \
> 		preempt_disable(); \
> 		__acquire(RCU); \
> 		rcu_read_acquire(); \
> 	} while (0)
> #define __rcu_read_unlock() \
> 	do { \
> 		rcu_read_release(); \
> 		__release(RCU); \
> 		preempt_enable(); \
> 	} while (0)
> #define __rcu_read_lock_bh() \
> 	do { \
> 		local_bh_disable(); \
> 		__acquire(RCU_BH); \
> 		rcu_read_acquire(); \
> 	} while (0)
> #define __rcu_read_unlock_bh() \
> 	do { \
> 		rcu_read_release(); \
> 		__release(RCU_BH); \
> 		local_bh_enable(); \
> 	} while (0)
> 
> #define __synchronize_sched() synchronize_rcu()
> 
> #define call_rcu_sched(head, func) call_rcu(head, func)
> 
> extern void __rcu_init(void);
> #define rcu_init_sched()	do { } while (0)
> extern void rcu_check_callbacks(int cpu, int user);
> extern void rcu_restart_cpu(int cpu);
> 
> 
> #define rcu_enter_nohz()	do { } while (0)
> #define rcu_exit_nohz()		do { } while (0)
> 
> #define rcu_qsctr_inc(cpu)	do { } while (0)
> #define rcu_bh_qsctr_inc(cpu)	do { } while (0)
> 
> #endif /* __LINUX_RCUCLASSIC_H */


^ permalink raw reply	[flat|nested] 17+ messages in thread

* nohz_cpu_mask question (was: Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups)
  2008-08-19 14:03           ` Paul E. McKenney
@ 2008-08-19 17:16             ` Manfred Spraul
  2008-08-19 17:41               ` Paul E. McKenney
  0 siblings, 1 reply; 17+ messages in thread
From: Manfred Spraul @ 2008-08-19 17:16 UTC (permalink / raw)
  To: paulmck, mingo; +Cc: linux-kernel, akpm, oleg, dipankar, rostedt, dvhltc, niv

Hi all,

can someone explain nohz_cpu_mask to me?

If I understand it correctly, it merely means that this cpu does not 
take timer interrupts, correct?
The cpu can still take "real" interrupts, e.g. a nic interrupt. The cpu 
could also do softirq processing.

Is that correct? Ingo?

If nohz cpus can take normal interrupts, then it would be wrong to 
exclude these cpus from the mask of cpus that must pass a quiescent 
cycle - a softirq could still be holding pointers.

--
    Manfred


* Re: nohz_cpu_mask question (was: Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups)
  2008-08-19 17:16             ` nohz_cpu_mask question (was: Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups) Manfred Spraul
@ 2008-08-19 17:41               ` Paul E. McKenney
  0 siblings, 0 replies; 17+ messages in thread
From: Paul E. McKenney @ 2008-08-19 17:41 UTC (permalink / raw)
  To: Manfred Spraul
  Cc: mingo, linux-kernel, akpm, oleg, dipankar, rostedt, dvhltc, niv

On Tue, Aug 19, 2008 at 07:16:32PM +0200, Manfred Spraul wrote:
> Hi all,
>
> can someone explain nohz_cpu_mask to me?
>
> If I understand it correctly, it merely means that this cpu does not take 
> timer interrupts, correct?
> The cpu can still take "real" interupts, e.g. a nic interrupt. The cpu 
> could also do softirq processing.

Yep, that is correct.

> Is that correct? Ingo?
>
> If nohz cpus can take normal interrupts, then it would be wrong to exclude 
> these cpus from the mask of cpus that must pass a quiescent cycle - a 
> softirq could still be holding pointers.

That is indeed the approach that preemptable RCU takes, and
I am considering making classic RCU also take this approach.
The main advantage is that a sleeping CPU need not be awakened
unless it is participating directly in the RCU grace period (as in
it has callbacks queued).  See rcu_irq_enter() and rcu_irq_exit() in
kernel/rcupreempt.c as well as rcu_enter_nohz() and rcu_exit_nohz()
in include/linux/rcupreempt.h.  And http://lwn.net/Articles/279077/
for the proof of correctness.  ;-)

Instead, classic RCU prevents any CPU from going into nohz mode in the
first place if RCU is waiting on it.  I am not completely confident
in this algorithm, which is one reason that I am looking to switch to
the preemptable RCU approach.  The other reason is power efficiency --
after all, why bother waking up a CPU just to have it tell RCU that it
is sitting idle?

							Thanx, Paul


end of thread, other threads:[~2008-08-19 17:41 UTC | newest]

Thread overview: 17+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2008-08-05 16:21 [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups Paul E. McKenney
2008-08-05 16:48 ` Steven Rostedt
2008-08-05 17:40   ` Paul E. McKenney
2008-08-06  5:30 ` Manfred Spraul
2008-08-07  3:18   ` Paul E. McKenney
2008-08-18  9:13     ` Manfred Spraul
2008-08-18 14:04       ` Paul E. McKenney
2008-08-19 10:48         ` Manfred Spraul
2008-08-19 14:03           ` Paul E. McKenney
2008-08-19 17:16             ` nohz_cpu_mask question (was: Re: [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups) Manfred Spraul
2008-08-19 17:41               ` Paul E. McKenney
2008-08-15 14:09 ` [PATCH tip/core/rcu] classic RCU locking and memory-barrier cleanups Ingo Molnar
2008-08-15 14:24   ` Ingo Molnar
2008-08-15 14:56     ` Ingo Molnar
2008-08-15 14:58     ` Paul E. McKenney
2008-08-17 14:37     ` [PATCH tip/core/rcu] classic RCU locking cleanup fix lockdep problem Paul E. McKenney
2008-08-17 15:38       ` Ingo Molnar

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox