From: "Paul E. McKenney" <paulmck@linux.vnet.ibm.com>
To: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Cc: linux-kernel@vger.kernel.org, srivatsa.bhat@linux.vnet.ibm.com,
	mingo@elte.hu, laijs@cn.fujitsu.com, dipankar@in.ibm.com,
	akpm@linux-foundation.org, josh@joshtriplett.org, niv@us.ibm.com,
	tglx@linutronix.de, peterz@infradead.org, rostedt@goodmis.org,
	Valdis.Kletnieks@vt.edu, dhowells@redhat.com,
	eric.dumazet@gmail.com, darren@dvhart.com, fweisbec@gmail.com,
	patches@linaro.org
Subject: Re: [PATCH RFC] rcu: Make rcu_barrier() less disruptive
Date: Thu, 15 Mar 2012 11:21:59 -0700
Message-ID: <20120315182159.GJ2381@linux.vnet.ibm.com>
In-Reply-To: <20120315174527.GA775@Krystal>

On Thu, Mar 15, 2012 at 01:45:27PM -0400, Mathieu Desnoyers wrote:
> * Paul E. McKenney (paulmck@linux.vnet.ibm.com) wrote:
> > The rcu_barrier() primitive interrupts each and every CPU, registering
> > a callback on every CPU.  Once all of these callbacks have been invoked,
> > rcu_barrier() knows that every callback that was registered before
> > the call to rcu_barrier() has also been invoked.
> > 
> > However, there is no point in registering a callback on a CPU that
> > currently has no callbacks, most especially if that CPU is in a
> > deep idle state.  This commit therefore makes rcu_barrier() avoid
> > interrupting CPUs that have no callbacks.  Doing this requires reworking
> > the handling of orphaned callbacks, otherwise callbacks could slip through
> > rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had
> > not yet interrupted to a CPU that rcu_barrier() had already interrupted.
> > This reworking was needed anyway to take a first step towards weaning
> > RCU from the CPU_DYING notifier's use of stop_cpu().
> 
> Quoting Documentation/RCU/rcubarrier.txt:
> 
> "We instead need the rcu_barrier() primitive. This primitive is similar
> to synchronize_rcu(), but instead of waiting solely for a grace
> period to elapse, it also waits for all outstanding RCU callbacks to
> complete. Pseudo-code using rcu_barrier() is as follows:"
> 
> The patch you propose seems like a good approach to make rcu_barrier
> less disruptive, but everyone needs to be aware that rcu_barrier() would
> quit having the side-effect of doing the equivalent of
> "synchronize_rcu()" from now on: within this new approach, in the case
> where there are no pending callbacks, rcu_barrier() could, AFAIU, return
> without waiting for the current grace period to complete.
> 
> Any use of rcu_barrier() that would assume that a synchronize_rcu() is
> implicit with the rcu_barrier() execution would be a bug anyway, but
> those might only show up after this patch is applied. I would therefore
> recommend auditing all rcu_barrier() users to ensure that none expects
> rcu_barrier() to act as a synchronize_rcu() before pushing this change.

Good catch!

I am going to chicken out and explicitly wait for a grace period if there
were no callbacks.  Having rcu_barrier() very rarely be a quick no-op does
sound like a standing invitation for subtle non-reproducible bugs.  ;-)

							Thanx, Paul
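A minimal userspace sketch of the fallback described above, for readers
following the thread. It is not the kernel implementation: the toy_*
names, NR_CPUS, and the boolean stand-in for synchronize_rcu() are all
assumptions made for illustration. It shows only the decision logic:
skip CPUs that have no callbacks queued, and if that leaves nothing to
wait for, wait for a grace period explicitly rather than returning
immediately.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define NR_CPUS 4	/* hypothetical toy CPU count */

/* Toy per-CPU state: only the callback-queue length matters here. */
struct toy_cpu {
	long qlen;		/* callbacks currently queued on this CPU */
	bool barrier_posted;	/* did the barrier queue a callback here? */
};

struct toy_result {
	int posted;		/* CPUs that received a barrier callback */
	bool waited_for_gp;	/* fell back to an explicit grace-period wait */
};

/* Stand-in for synchronize_rcu(); a real kernel would block here. */
static void toy_wait_for_grace_period(struct toy_result *res)
{
	res->waited_for_gp = true;
}

/*
 * Post a barrier callback only on CPUs that already have callbacks.
 * If no CPU had any, wait for a grace period explicitly so that the
 * barrier never degenerates into a quick no-op.
 */
static struct toy_result toy_rcu_barrier(struct toy_cpu *cpus, int ncpus)
{
	struct toy_result res = { 0, false };
	int i;

	assert(cpus != NULL && ncpus > 0);
	for (i = 0; i < ncpus; i++) {
		cpus[i].barrier_posted = false;
		if (cpus[i].qlen == 0)
			continue;	/* nothing queued: leave this CPU alone */
		cpus[i].qlen++;		/* the barrier callback joins the queue */
		cpus[i].barrier_posted = true;
		res.posted++;
	}
	if (res.posted == 0)
		toy_wait_for_grace_period(&res);
	return res;
}
```

With every queue empty, toy_rcu_barrier() reports posted == 0 and
waited_for_gp == true; with a single busy CPU it posts exactly one
callback there and does not take the fallback path.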

> Thanks,
> 
> Mathieu
> 
> > 
> > Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org>
> > Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
> > 
> > diff --git a/kernel/rcutree.c b/kernel/rcutree.c
> > index 403306b..8269656 100644
> > --- a/kernel/rcutree.c
> > +++ b/kernel/rcutree.c
> > @@ -75,6 +75,8 @@ static struct lock_class_key rcu_node_class[NUM_RCU_LVLS];
> >  	.gpnum = -300, \
> >  	.completed = -300, \
> >  	.onofflock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.onofflock), \
> > +	.orphan_nxttail = &structname##_state.orphan_nxtlist, \
> > +	.orphan_donetail = &structname##_state.orphan_donelist, \
> >  	.fqslock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.fqslock), \
> >  	.n_force_qs = 0, \
> >  	.n_force_qs_ngp = 0, \
> > @@ -145,6 +147,13 @@ static void invoke_rcu_callbacks(struct rcu_state *rsp, struct rcu_data *rdp);
> >  unsigned long rcutorture_testseq;
> >  unsigned long rcutorture_vernum;
> >  
> > +/* State information for rcu_barrier() and friends. */
> > +
> > +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
> > +static atomic_t rcu_barrier_cpu_count;
> > +static DEFINE_MUTEX(rcu_barrier_mutex);
> > +static DECLARE_WAIT_QUEUE_HEAD(rcu_barrier_wq);
> > +
> >  /*
> >   * Return true if an RCU grace period is in progress.  The ACCESS_ONCE()s
> >   * permit this function to be invoked without holding the root rcu_node
> > @@ -1311,7 +1320,60 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
> >  #ifdef CONFIG_HOTPLUG_CPU
> >  
> >  /*
> > - * Move a dying CPU's RCU callbacks to online CPU's callback list.
> > + * Adopt the RCU callbacks from the specified rcu_state structure's
> > + * orphanage.  The caller must hold the ->onofflock.
> > + */
> > +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> > +{
> > +	int i;
> > +	struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
> > +
> > +	/*
> > +	 * If there is an rcu_barrier() operation in progress, then
> > +	 * only the task doing that operation is permitted to adopt
> > +	 * callbacks.  To do otherwise breaks rcu_barrier() and friends
> > +	 * by causing them to fail to wait for the callbacks in the
> > +	 * orphanage.
> > +	 */
> > +	if (rsp->rcu_barrier_in_progress &&
> > +	    rsp->rcu_barrier_in_progress != current)
> > +		return;
> > +
> > +	/* Do the accounting first. */
> > +	rdp->qlen_lazy += rsp->qlen_lazy;
> > +	rdp->qlen += rsp->qlen;
> > +	rdp->n_cbs_adopted += rsp->qlen;
> > +	rsp->qlen_lazy = 0;
> > +	rsp->qlen = 0;
> > +
> > +	/*
> > +	 * We do not need a memory barrier here because the only way we
> > +	 * can get here if there is an rcu_barrier() in flight is if
> > +	 * we are the task doing the rcu_barrier().
> > +	 */
> > +
> > +	/* First adopt the ready-to-invoke callbacks. */
> > +	if (rsp->orphan_donelist != NULL) {
> > +		*rsp->orphan_donetail = *rdp->nxttail[RCU_DONE_TAIL];
> > +		*rdp->nxttail[RCU_DONE_TAIL] = rsp->orphan_donelist;
> > +		for (i = RCU_NEXT_SIZE - 1; i >= RCU_DONE_TAIL; i--)
> > +			if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])
> > +				rdp->nxttail[i] = rsp->orphan_donetail;
> > +		rsp->orphan_donelist = NULL;
> > +		rsp->orphan_donetail = &rsp->orphan_donelist;
> > +	}
> > +
> > +	/* And then adopt the callbacks that still need a grace period. */
> > +	if (rsp->orphan_nxtlist != NULL) {
> > +		*rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxtlist;
> > +		rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxttail;
> > +		rsp->orphan_nxtlist = NULL;
> > +		rsp->orphan_nxttail = &rsp->orphan_nxtlist;
> > +	}
> > +}
> > +
> > +/*
> > + * Move a dying CPU's RCU callbacks to the rcu_state structure's orphanage.
> >   * Also record a quiescent state for this CPU for the current grace period.
> >   * Synchronization and interrupt disabling are not required because
> >   * this function executes in stop_machine() context.  Therefore, cleanup
> > @@ -1325,64 +1387,67 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
> >  static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
> >  {
> >  	int i;
> > +	unsigned long flags;
> >  	unsigned long mask;
> > -	int receive_cpu = cpumask_any(cpu_online_mask);
> > +	bool orphaned = 0;
> >  	struct rcu_data *rdp = this_cpu_ptr(rsp->rda);
> > -	struct rcu_data *receive_rdp = per_cpu_ptr(rsp->rda, receive_cpu);
> >  	RCU_TRACE(struct rcu_node *rnp = rdp->mynode); /* For dying CPU. */
> >  
> > -	/* First, adjust the counts. */
> > +	/* Move the callbacks to the orphanage under ->onofflock protection. */
> > +	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> > +
> > +	/* First adjust the counts. */
> >  	if (rdp->nxtlist != NULL) {
> > -		receive_rdp->qlen_lazy += rdp->qlen_lazy;
> > -		receive_rdp->qlen += rdp->qlen;
> > +		rsp->qlen_lazy += rdp->qlen_lazy;
> > +		rsp->qlen += rdp->qlen;
> > +		rdp->n_cbs_orphaned += rdp->qlen;
> >  		rdp->qlen_lazy = 0;
> >  		rdp->qlen = 0;
> > +		orphaned = 1;
> >  	}
> >  
> >  	/*
> > -	 * Next, move ready-to-invoke callbacks to be invoked on some
> > -	 * other CPU.  These will not be required to pass through another
> > -	 * grace period:  They are done, regardless of CPU.
> > +	 * Next, move those callbacks still needing a grace period to
> > +	 * the orphanage, where some other CPU will pick them up.
> > +	 * Some of the callbacks might have gone partway through a grace
> > +	 * period, but that is too bad.  They get to start over because we
> > +	 * cannot assume that grace periods are synchronized across CPUs.
> > +	 * We don't bother updating the ->nxttail[] array yet, instead
> > +	 * we just reset the whole thing later on.
> >  	 */
> > -	if (rdp->nxtlist != NULL &&
> > -	    rdp->nxttail[RCU_DONE_TAIL] != &rdp->nxtlist) {
> > -		struct rcu_head *oldhead;
> > -		struct rcu_head **oldtail;
> > -		struct rcu_head **newtail;
> > -
> > -		oldhead = rdp->nxtlist;
> > -		oldtail = receive_rdp->nxttail[RCU_DONE_TAIL];
> > -		rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];
> > -		*rdp->nxttail[RCU_DONE_TAIL] = *oldtail;
> > -		*receive_rdp->nxttail[RCU_DONE_TAIL] = oldhead;
> > -		newtail = rdp->nxttail[RCU_DONE_TAIL];
> > -		for (i = RCU_DONE_TAIL; i < RCU_NEXT_SIZE; i++) {
> > -			if (receive_rdp->nxttail[i] == oldtail)
> > -				receive_rdp->nxttail[i] = newtail;
> > -			if (rdp->nxttail[i] == newtail)
> > -				rdp->nxttail[i] = &rdp->nxtlist;
> > -		}
> > +	if (*rdp->nxttail[RCU_DONE_TAIL] != NULL) {
> > +		*rsp->orphan_nxttail = *rdp->nxttail[RCU_DONE_TAIL];
> > +		rsp->orphan_nxttail = rdp->nxttail[RCU_NEXT_TAIL];
> > +		*rdp->nxttail[RCU_DONE_TAIL] = NULL;
> >  	}
> >  
> >  	/*
> > -	 * Finally, put the rest of the callbacks at the end of the list.
> > -	 * The ones that made it partway through get to start over:  We
> > -	 * cannot assume that grace periods are synchronized across CPUs.
> > -	 * (We could splice RCU_WAIT_TAIL into RCU_NEXT_READY_TAIL, but
> > -	 * this does not seem compelling.  Not yet, anyway.)
> > +	 * Then move the ready-to-invoke callbacks to the orphanage,
> > +	 * where some other CPU will pick them up.  These will not be
> > +	 * required to pass through another grace period: They are done.
> >  	 */
> >  	if (rdp->nxtlist != NULL) {
> > -		*receive_rdp->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
> > -		receive_rdp->nxttail[RCU_NEXT_TAIL] =
> > -				rdp->nxttail[RCU_NEXT_TAIL];
> > -		receive_rdp->n_cbs_adopted += rdp->qlen;
> > -		rdp->n_cbs_orphaned += rdp->qlen;
> > -
> > -		rdp->nxtlist = NULL;
> > -		for (i = 0; i < RCU_NEXT_SIZE; i++)
> > -			rdp->nxttail[i] = &rdp->nxtlist;
> > +		*rsp->orphan_donetail = rdp->nxtlist;
> > +		rsp->orphan_donetail = rdp->nxttail[RCU_DONE_TAIL];
> >  	}
> >  
> > +	/* Finally, initialize the rcu_data structure's list to empty.  */
> > +	rdp->nxtlist = NULL;
> > +	for (i = 0; i < RCU_NEXT_SIZE; i++)
> > +		rdp->nxttail[i] = &rdp->nxtlist;
> > +
> > +	/*
> > +	 * Wake up the rcu_barrier() task if there is one and if we
> > +	 * actually sent anything to the orphanage.  Except that we
> > +	 * must delay the wakeup until ->onofflock is released to
> > +	 * avoid deadlock.
> > +	 */
> > +	if (!rsp->rcu_barrier_in_progress)
> > +		orphaned = 0;
> > +	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> > +	if (orphaned)
> > +		wake_up(&rcu_barrier_wq);
> > +
> >  	/*
> >  	 * Record a quiescent state for the dying CPU.  This is safe
> >  	 * only because we have already cleared out the callbacks.
> > @@ -1415,11 +1480,14 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
> >  	rcu_stop_cpu_kthread(cpu);
> >  	rcu_node_kthread_setaffinity(rnp, -1);
> >  
> > -	/* Remove the dying CPU from the bitmasks in the rcu_node hierarchy. */
> > +	/* Remove the dead CPU from the bitmasks in the rcu_node hierarchy. */
> >  
> >  	/* Exclude any attempts to start a new grace period. */
> >  	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> >  
> > +	/* Collect the dead CPU's callbacks. */
> > +	rcu_adopt_orphan_cbs(rsp);
> > +
> >  	/* Remove the outgoing CPU from the masks in the rcu_node hierarchy. */
> >  	mask = rdp->grpmask;	/* rnp->grplo is constant. */
> >  	do {
> > @@ -1456,6 +1524,10 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
> >  
> >  #else /* #ifdef CONFIG_HOTPLUG_CPU */
> >  
> > +static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> > +{
> > +}
> > +
> >  static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
> >  {
> >  }
> > @@ -1524,9 +1596,6 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> >  			    rcu_is_callbacks_kthread());
> >  
> >  	/* Update count, and requeue any remaining callbacks. */
> > -	rdp->qlen_lazy -= count_lazy;
> > -	rdp->qlen -= count;
> > -	rdp->n_cbs_invoked += count;
> >  	if (list != NULL) {
> >  		*tail = rdp->nxtlist;
> >  		rdp->nxtlist = list;
> > @@ -1536,6 +1605,10 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
> >  			else
> >  				break;
> >  	}
> > +	smp_mb(); /* List handling before counting for rcu_barrier(). */
> > +	rdp->qlen_lazy -= count_lazy;
> > +	rdp->qlen -= count;
> > +	rdp->n_cbs_invoked += count;
> >  
> >  	/* Reinstate batch limit if we have worked down the excess. */
> >  	if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark)
> > @@ -1824,13 +1897,14 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
> >  	rdp = this_cpu_ptr(rsp->rda);
> >  
> >  	/* Add the callback to our list. */
> > -	*rdp->nxttail[RCU_NEXT_TAIL] = head;
> > -	rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
> >  	rdp->qlen++;
> >  	if (lazy)
> >  		rdp->qlen_lazy++;
> >  	else
> >  		rcu_idle_count_callbacks_posted();
> > +	smp_mb();  /* Count before adding callback for rcu_barrier(). */
> > +	*rdp->nxttail[RCU_NEXT_TAIL] = head;
> > +	rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
> >  
> >  	if (__is_kfree_rcu_offset((unsigned long)func))
> >  		trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,
> > @@ -2169,15 +2243,10 @@ static int rcu_cpu_has_callbacks(int cpu)
> >  	       rcu_preempt_cpu_has_callbacks(cpu);
> >  }
> >  
> > -static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
> > -static atomic_t rcu_barrier_cpu_count;
> > -static DEFINE_MUTEX(rcu_barrier_mutex);
> > -static struct completion rcu_barrier_completion;
> > -
> >  static void rcu_barrier_callback(struct rcu_head *notused)
> >  {
> >  	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
> > -		complete(&rcu_barrier_completion);
> > +		wake_up(&rcu_barrier_wq);
> >  }
> >  
> >  /*
> > @@ -2203,27 +2272,121 @@ static void _rcu_barrier(struct rcu_state *rsp,
> >  			 void (*call_rcu_func)(struct rcu_head *head,
> >  					       void (*func)(struct rcu_head *head)))
> >  {
> > -	BUG_ON(in_interrupt());
> > +	int cpu;
> > +	unsigned long flags;
> > +	struct rcu_data *rdp;
> > +	struct rcu_head rh;
> > +
> > +	init_rcu_head_on_stack(&rh);
> > +
> >  	/* Take mutex to serialize concurrent rcu_barrier() requests. */
> >  	mutex_lock(&rcu_barrier_mutex);
> > -	init_completion(&rcu_barrier_completion);
> > +
> > +	smp_mb();  /* Prevent any prior operations from leaking in. */
> > +
> >  	/*
> > -	 * Initialize rcu_barrier_cpu_count to 1, then invoke
> > -	 * rcu_barrier_func() on each CPU, so that each CPU also has
> > -	 * incremented rcu_barrier_cpu_count.  Only then is it safe to
> > -	 * decrement rcu_barrier_cpu_count -- otherwise the first CPU
> > -	 * might complete its grace period before all of the other CPUs
> > -	 * did their increment, causing this function to return too
> > -	 * early.  Note that on_each_cpu() disables irqs, which prevents
> > -	 * any CPUs from coming online or going offline until each online
> > -	 * CPU has queued its RCU-barrier callback.
> > +	 * Initialize the count to one rather than to zero in order to
> > +	 * avoid a too-soon return to zero in case of a short grace period
> > +	 * (or preemption of this task).  Also flag this task as doing
> > +	 * an rcu_barrier().  This will prevent anyone else from adopting
> > +	 * orphaned callbacks, which could otherwise cause failure if a
> > +	 * CPU went offline and quickly came back online.  To see this,
> > +	 * consider the following sequence of events:
> > +	 *
> > +	 * 1.	We cause CPU 0 to post an rcu_barrier_callback() callback.
> > +	 * 2.	CPU 1 goes offline, orphaning its callbacks.
> > +	 * 3.	CPU 0 adopts CPU 1's orphaned callbacks.
> > +	 * 4.	CPU 1 comes back online.
> > +	 * 5.	We cause CPU 1 to post an rcu_barrier_callback() callback.
> > +	 * 6.	Both rcu_barrier_callback() callbacks are invoked, awakening
> > +	 *	us -- but before CPU 1's orphaned callbacks are invoked!!!
> >  	 */
> >  	atomic_set(&rcu_barrier_cpu_count, 1);
> > -	on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
> > -	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
> > -		complete(&rcu_barrier_completion);
> > -	wait_for_completion(&rcu_barrier_completion);
> > +	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> > +	rsp->rcu_barrier_in_progress = current;
> > +	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> > +
> > +	/*
> > +	 * Force every CPU with callbacks to register a new callback
> > +	 * that will tell us when all the preceding callbacks have
> > +	 * been invoked.  If an offline CPU has callbacks, wait for
> > +	 * it to either come back online or to finish orphaning those
> > +	 * callbacks.
> > +	 */
> > +	for_each_possible_cpu(cpu) {
> > +		preempt_disable();
> > +		rdp = per_cpu_ptr(rsp->rda, cpu);
> > +		if (cpu_is_offline(cpu)) {
> > +			preempt_enable();
> > +			while (cpu_is_offline(cpu) && ACCESS_ONCE(rdp->qlen))
> > +				schedule_timeout_interruptible(1);
> > +		} else {
> > +			if (ACCESS_ONCE(rdp->qlen))
> > +				smp_call_function_single(cpu, rcu_barrier_func,
> > +						(void *)call_rcu_func, 1);
> > +			preempt_enable();
> > +		}
> > +	}
> > +
> > +	/*
> > +	 * Force any ongoing CPU-hotplug operations to complete,
> > +	 * so that any callbacks from the outgoing CPUs are now in
> > +	 * the orphanage.
> > +	 */
> > +	cpu_maps_update_begin();
> > +	cpu_maps_update_done();
> > +
> > +	/*
> > +	 * Now that all online CPUs have rcu_barrier_callback() callbacks
> > +	 * posted, we can adopt all of the orphaned callbacks and place
> > +	 * an rcu_barrier_callback() callback after them.  When that is done,
> > +	 * we are guaranteed to have an rcu_barrier_callback() callback
> > +	 * following every callback that could possibly have been
> > +	 * registered before _rcu_barrier() was called.
> > +	 */
> > +	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> > +	rcu_adopt_orphan_cbs(rsp);
> > +	atomic_inc(&rcu_barrier_cpu_count);
> > +	call_rcu_func(&rh, rcu_barrier_callback);
> > +	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> > +
> > +	/*
> > +	 * Now that we have an rcu_barrier_callback() callback on each
> > +	 * CPU, and thus each counted, remove the initial count.
> > +	 */
> > +	atomic_dec(&rcu_barrier_cpu_count);
> > +	smp_mb__after_atomic_dec();
> > +
> > +	/*
> > +	 * Loop waiting for all rcu_barrier_callback() callbacks to be
> > +	 * invoked.  Adopt any orphaned callbacks in the meantime, just
> > +	 * in case one of the rcu_barrier_callback() callbacks is orphaned.
> > +	 */
> > +	while (atomic_read(&rcu_barrier_cpu_count) > 0) {
> > +		wait_event(rcu_barrier_wq,
> > +			   atomic_read(&rcu_barrier_cpu_count) == 0 ||
> > +			   ACCESS_ONCE(rsp->qlen));
> > +		if (ACCESS_ONCE(rsp->qlen)) {
> > +			raw_spin_lock_irqsave(&rsp->onofflock, flags);
> > +			rcu_adopt_orphan_cbs(rsp);
> > +			raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> > +		}
> > +	}
> > +
> > +	/*
> > +	 * Done, so let others adopt orphaned callbacks.  But avoid
> > +	 * indefinite postponement of any additional orphans by adopting
> > +	 * one more time.
> > +	 */
> > +	raw_spin_lock_irqsave(&rsp->onofflock, flags);
> > +	rcu_adopt_orphan_cbs(rsp);
> > +	rsp->rcu_barrier_in_progress = NULL;
> > +	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> > +
> > +	/* Other rcu_barrier() invocations can now safely proceed. */
> >  	mutex_unlock(&rcu_barrier_mutex);
> > +
> > +	destroy_rcu_head_on_stack(&rh);
> >  }
> >  
> >  /**
> > diff --git a/kernel/rcutree.h b/kernel/rcutree.h
> > index 36ca28e..1e49c56 100644
> > --- a/kernel/rcutree.h
> > +++ b/kernel/rcutree.h
> > @@ -371,6 +371,17 @@ struct rcu_state {
> >  
> >  	raw_spinlock_t onofflock;		/* exclude on/offline and */
> >  						/*  starting new GP. */
> > +	struct rcu_head *orphan_nxtlist;	/* Orphaned callbacks that */
> > +						/*  need a grace period. */
> > +	struct rcu_head **orphan_nxttail;	/* Tail of above. */
> > +	struct rcu_head *orphan_donelist;	/* Orphaned callbacks that */
> > +						/*  are ready to invoke. */
> > +	struct rcu_head **orphan_donetail;	/* Tail of above. */
> > +	long qlen_lazy;				/* Number of lazy callbacks. */
> > +	long qlen;				/* Total number of callbacks. */
> > +	struct task_struct *rcu_barrier_in_progress;
> > +						/* Task doing rcu_barrier(), */
> > +						/*  or NULL if no barrier. */
> >  	raw_spinlock_t fqslock;			/* Only one task forcing */
> >  						/*  quiescent states. */
> >  	unsigned long jiffies_force_qs;		/* Time at which to invoke */
> > diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
> > index ed459ed..d4bc16d 100644
> > --- a/kernel/rcutree_trace.c
> > +++ b/kernel/rcutree_trace.c
> > @@ -271,13 +271,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
> >  
> >  	gpnum = rsp->gpnum;
> >  	seq_printf(m, "c=%lu g=%lu s=%d jfq=%ld j=%x "
> > -		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
> > +		      "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld/%ld\n",
> >  		   rsp->completed, gpnum, rsp->fqs_state,
> >  		   (long)(rsp->jiffies_force_qs - jiffies),
> >  		   (int)(jiffies & 0xffff),
> >  		   rsp->n_force_qs, rsp->n_force_qs_ngp,
> >  		   rsp->n_force_qs - rsp->n_force_qs_ngp,
> > -		   rsp->n_force_qs_lh);
> > +		   rsp->n_force_qs_lh, rsp->qlen_lazy, rsp->qlen);
> >  	for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
> >  		if (rnp->level != level) {
> >  			seq_puts(m, "\n");
> > 
> 
> -- 
> Mathieu Desnoyers
> Operating System Efficiency R&D Consultant
> EfficiOS Inc.
> http://www.efficios.com
> 
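The orphanage in the quoted patch relies on lists tracked by a head
pointer plus a tail pointer-to-pointer, which lets one CPU adopt another
list in constant time. A minimal userspace sketch of that idiom follows.
The toy_* names are hypothetical, and the sketch models only the simple
"still needs a grace period" splice, not the segmented ->nxttail[] array.

```c
#include <assert.h>
#include <stddef.h>

/* Toy callback header; only the link matters for this sketch. */
struct toy_head {
	struct toy_head *next;
	int id;
};

/* A list tracked by its head and by a pointer to the final ->next. */
struct toy_list {
	struct toy_head *head;
	struct toy_head **tail;
};

/* An empty list has its tail pointing back at its own head pointer. */
static void toy_list_init(struct toy_list *l)
{
	l->head = NULL;
	l->tail = &l->head;
}

/* Append one element by writing through the tail pointer. */
static void toy_list_add(struct toy_list *l, struct toy_head *h)
{
	h->next = NULL;
	*l->tail = h;
	l->tail = &h->next;
}

/* Append all of src to dst in O(1), leaving src empty. */
static void toy_list_adopt(struct toy_list *dst, struct toy_list *src)
{
	if (src->head == NULL)
		return;			/* nothing to adopt */
	*dst->tail = src->head;		/* link src after dst's last element */
	dst->tail = src->tail;		/* dst now ends where src ended */
	toy_list_init(src);		/* reset the orphanage to empty */
}

/* Count the elements by walking the list. */
static int toy_list_len(const struct toy_list *l)
{
	const struct toy_head *h;
	int n = 0;

	for (h = l->head; h != NULL; h = h->next)
		n++;
	return n;
}
```

Resetting the source list's tail to point at its own head is what keeps
a later toy_list_add() on the emptied orphanage from writing through a
stale pointer into the adopting list.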

