All of lore.kernel.org
 help / color / mirror / Atom feed
From: "Paul E. McKenney" <paulmck@linux.vnet.ibm.com>
To: Mike Galbraith <efault@gmx.de>
Cc: LKML <linux-kernel@vger.kernel.org>
Subject: Re: TREE_SRCU slows hotplug by factor ~16
Date: Wed, 26 Apr 2017 07:31:45 -0700	[thread overview]
Message-ID: <20170426143145.GA5683@linux.vnet.ibm.com> (raw)
In-Reply-To: <20170425223642.GA10931@linux.vnet.ibm.com>

On Tue, Apr 25, 2017 at 03:36:42PM -0700, Paul E. McKenney wrote:
> On Mon, Apr 24, 2017 at 09:24:42AM -0700, Paul E. McKenney wrote:
> > On Mon, Apr 24, 2017 at 09:35:03AM +0200, Mike Galbraith wrote:
> > > On Sun, 2017-04-23 at 23:22 -0700, Paul E. McKenney wrote:
> > > 
> > > > Could you please collect an ftrace (or whatever) showing the timestamp
> > > > sequence of calls to synchronize_srcu(), synchronize_srcu_expedited(),
> > > > and call_srcu() during the execution of the stress script?  If it is easy
> > > > to do, also the timestamp sequence of returns from synchronize_srcu()
> > > > and synchronize_srcu_expedited()?
> > > 
> > > The first two minutes below config bits, trace_printk([enter/exit]) in
> > > each function.  If you (unlikely, but..) want the whole trace, holler.
> > > 
> > > # RCU Subsystem
> > > CONFIG_TREE_RCU=y
> > > CONFIG_RCU_EXPERT=y
> > > CONFIG_SRCU=y
> > > # CONFIG_CLASSIC_SRCU is not set
> > > CONFIG_TREE_SRCU=y
> > > # CONFIG_TASKS_RCU is not set
> > > CONFIG_RCU_STALL_COMMON=y
> > > CONFIG_RCU_FANOUT=64
> > > CONFIG_RCU_FANOUT_LEAF=16
> > > # CONFIG_RCU_FAST_NO_HZ is not set
> > > # CONFIG_TREE_RCU_TRACE is not set
> > > CONFIG_RCU_KTHREAD_PRIO=0
> > > CONFIG_RCU_NOCB_CPU=y
> > > CONFIG_RCU_NOCB_CPU_NONE=y
> > > # CONFIG_RCU_NOCB_CPU_ZERO is not set
> > > # CONFIG_RCU_NOCB_CPU_ALL is not set
> > > # RCU Debugging
> > > # CONFIG_PROVE_RCU is not set
> > > # CONFIG_SPARSE_RCU_POINTER is not set
> > > # CONFIG_RCU_PERF_TEST is not set
> > > # CONFIG_RCU_TORTURE_TEST is not set
> > > CONFIG_RCU_CPU_STALL_TIMEOUT=60
> > > # CONFIG_RCU_TRACE is not set
> > > # CONFIG_RCU_EQS_DEBUG is not set
> > 
> > Thank you for the data, extremely helpful!  I now know that I must
> > immediately fix this the hard way rather than trying several simpler
> > possibilities that, according to your trace data, would be complete
> > wastes of time.
> > 
> > My initial calculation was actually optimistic.  There was one
> > exit-to-enter interval at about 80 milliseconds, but the second
> > longest is 342 microseconds, and the vast majority are under 200
> > microseconds.
> > 
> > Back to the drawing board!
> > 
> > This will take some time, but I should have a patch for you in a few days,
> > hopefully sooner.
> 
> Making progress, might have something late tomorrow (Wednesday) Pacific
> Time, will definitely have something Thursday Pacific Time.

And a sneak preview, semi-tested.  If you get a chance to run this, please
let me know how it goes.

This should apply on -tip or on -rcu.  Probably on recent -next as well.

							Thanx, Paul

------------------------------------------------------------------------

diff --git a/Documentation/admin-guide/kernel-parameters.txt b/Documentation/admin-guide/kernel-parameters.txt
index facc20a3f962..4a4b9266c4de 100644
--- a/Documentation/admin-guide/kernel-parameters.txt
+++ b/Documentation/admin-guide/kernel-parameters.txt
@@ -3779,6 +3779,14 @@
 	spia_pedr=
 	spia_peddr=
 
+	srcutree.exp_holdoff [KNL]
+			Specifies how many nanoseconds must elapse
+			since the end of the last SRCU grace period for
+			a given srcu_struct until the next normal SRCU
+			grace period will be considered for automatic
+			expediting.  Set to zero to disable automatic
+			expediting.
+
 	stacktrace	[FTRACE]
 			Enabled the stack tracer on boot up.
 
diff --git a/include/linux/srcuclassic.h b/include/linux/srcuclassic.h
index 41cf99930f34..5753f7322262 100644
--- a/include/linux/srcuclassic.h
+++ b/include/linux/srcuclassic.h
@@ -98,4 +98,18 @@ void synchronize_srcu_expedited(struct srcu_struct *sp);
 void srcu_barrier(struct srcu_struct *sp);
 unsigned long srcu_batches_completed(struct srcu_struct *sp);
 
+static inline void srcutorture_get_gp_data(enum rcutorture_type test_type,
+					   struct srcu_struct *sp, int *flags,
+					   unsigned long *gpnum,
+					   unsigned long *completed)
+{
+	if (test_type != SRCU_FLAVOR)
+		return;
+	*flags = 0;
+	*completed = sp->completed;
+	*gpnum = *completed;
+	if (sp->batch_queue.head || sp->batch_check0.head || sp->batch_check0.head)
+		(*gpnum)++;
+}
+
 #endif
diff --git a/include/linux/srcutiny.h b/include/linux/srcutiny.h
index 4f284e4f4d8c..42311ee0334f 100644
--- a/include/linux/srcutiny.h
+++ b/include/linux/srcutiny.h
@@ -78,4 +78,16 @@ static inline unsigned long srcu_batches_completed(struct srcu_struct *sp)
 	return 0;
 }
 
+static inline void srcutorture_get_gp_data(enum rcutorture_type test_type,
+					   struct srcu_struct *sp, int *flags,
+					   unsigned long *gpnum,
+					   unsigned long *completed)
+{
+	if (test_type != SRCU_FLAVOR)
+		return;
+	*flags = 0;
+	*completed = sp->srcu_gp_seq;
+	*gpnum = *completed;
+}
+
 #endif
diff --git a/include/linux/srcutree.h b/include/linux/srcutree.h
index 0400e211aa44..32e86d85fd11 100644
--- a/include/linux/srcutree.h
+++ b/include/linux/srcutree.h
@@ -43,10 +43,13 @@ struct srcu_data {
 	spinlock_t lock ____cacheline_internodealigned_in_smp;
 	struct rcu_segcblist srcu_cblist;	/* List of callbacks.*/
 	unsigned long srcu_gp_seq_needed;	/* Furthest future GP needed. */
+	unsigned long srcu_gp_seq_needed_exp;	/* Furthest future exp GP. */
 	bool srcu_cblist_invoking;		/* Invoking these CBs? */
 	struct delayed_work work;		/* Context for CB invoking. */
 	struct rcu_head srcu_barrier_head;	/* For srcu_barrier() use. */
 	struct srcu_node *mynode;		/* Leaf srcu_node. */
+	unsigned long grpmask;			/* Mask for leaf srcu_node */
+						/*  ->srcu_data_have_cbs[]. */
 	int cpu;
 	struct srcu_struct *sp;
 };
@@ -59,6 +62,9 @@ struct srcu_node {
 	unsigned long srcu_have_cbs[4];		/* GP seq for children */
 						/*  having CBs, but only */
 						/*  is > ->srcu_gq_seq. */
+	unsigned long srcu_data_have_cbs[4];	/* Which srcu_data structs */
+						/*  have CBs for given GP? */
+	unsigned long srcu_gp_seq_needed_exp;	/* Furthest future exp GP. */
 	struct srcu_node *srcu_parent;		/* Next up in tree. */
 	int grplo;				/* Least CPU for node. */
 	int grphi;				/* Biggest CPU for node. */
@@ -77,7 +83,8 @@ struct srcu_struct {
 	unsigned int srcu_idx;			/* Current rdr array element. */
 	unsigned long srcu_gp_seq;		/* Grace-period seq #. */
 	unsigned long srcu_gp_seq_needed;	/* Latest gp_seq needed. */
-	atomic_t srcu_exp_cnt;			/* # ongoing expedited GPs. */
+	unsigned long srcu_gp_seq_needed_exp;	/* Furthest future exp GP. */
+	unsigned long srcu_last_gp_end;		/* Last GP end timestamp (ns) */
 	struct srcu_data __percpu *sda;		/* Per-CPU srcu_data array. */
 	unsigned long srcu_barrier_seq;		/* srcu_barrier seq #. */
 	struct mutex srcu_barrier_mutex;	/* Serialize barrier ops. */
@@ -136,4 +143,8 @@ void synchronize_srcu_expedited(struct srcu_struct *sp);
 void srcu_barrier(struct srcu_struct *sp);
 unsigned long srcu_batches_completed(struct srcu_struct *sp);
 
+void srcutorture_get_gp_data(enum rcutorture_type test_type,
+			     struct srcu_struct *sp, int *flags,
+			     unsigned long *gpnum, unsigned long *completed);
+
 #endif
diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c
index e9d4527cdd43..ae6e574d4cf5 100644
--- a/kernel/rcu/rcutorture.c
+++ b/kernel/rcu/rcutorture.c
@@ -1360,12 +1360,14 @@ rcu_torture_stats_print(void)
 		cur_ops->stats();
 	if (rtcv_snap == rcu_torture_current_version &&
 	    rcu_torture_current != NULL) {
-		int __maybe_unused flags;
-		unsigned long __maybe_unused gpnum;
-		unsigned long __maybe_unused completed;
+		int __maybe_unused flags = 0;
+		unsigned long __maybe_unused gpnum = 0;
+		unsigned long __maybe_unused completed = 0;
 
 		rcutorture_get_gp_data(cur_ops->ttype,
 				       &flags, &gpnum, &completed);
+		srcutorture_get_gp_data(cur_ops->ttype, srcu_ctlp,
+					&flags, &gpnum, &completed);
 		wtp = READ_ONCE(writer_task);
 		pr_alert("??? Writer stall state %s(%d) g%lu c%lu f%#x ->state %#lx\n",
 			 rcu_torture_writer_state_getname(),
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
index 9ecf0acc18eb..b09221912f48 100644
--- a/kernel/rcu/srcutree.c
+++ b/kernel/rcu/srcutree.c
@@ -34,10 +34,14 @@
 #include <linux/sched.h>
 #include <linux/smp.h>
 #include <linux/delay.h>
+#include <linux/module.h>
 #include <linux/srcu.h>
 
 #include "rcu.h"
 
+ulong exp_holdoff = 50 * 1000; /* Holdoff (ns) for auto-expediting. */
+module_param(exp_holdoff, ulong, 0444);
+
 static void srcu_invoke_callbacks(struct work_struct *work);
 static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay);
 
@@ -66,8 +70,13 @@ static void init_srcu_struct_nodes(struct srcu_struct *sp, bool is_static)
 	/* Each pass through this loop initializes one srcu_node structure. */
 	rcu_for_each_node_breadth_first(sp, snp) {
 		spin_lock_init(&snp->lock);
-		for (i = 0; i < ARRAY_SIZE(snp->srcu_have_cbs); i++)
+		WARN_ON_ONCE(ARRAY_SIZE(snp->srcu_have_cbs) !=
+			     ARRAY_SIZE(snp->srcu_data_have_cbs));
+		for (i = 0; i < ARRAY_SIZE(snp->srcu_have_cbs); i++) {
 			snp->srcu_have_cbs[i] = 0;
+			snp->srcu_data_have_cbs[i] = 0;
+		}
+		snp->srcu_gp_seq_needed_exp = 0;
 		snp->grplo = -1;
 		snp->grphi = -1;
 		if (snp == &sp->node[0]) {
@@ -98,6 +107,7 @@ static void init_srcu_struct_nodes(struct srcu_struct *sp, bool is_static)
 		rcu_segcblist_init(&sdp->srcu_cblist);
 		sdp->srcu_cblist_invoking = false;
 		sdp->srcu_gp_seq_needed = sp->srcu_gp_seq;
+		sdp->srcu_gp_seq_needed_exp = sp->srcu_gp_seq;
 		sdp->mynode = &snp_first[cpu / levelspread[level]];
 		for (snp = sdp->mynode; snp != NULL; snp = snp->srcu_parent) {
 			if (snp->grplo < 0)
@@ -107,6 +117,7 @@ static void init_srcu_struct_nodes(struct srcu_struct *sp, bool is_static)
 		sdp->cpu = cpu;
 		INIT_DELAYED_WORK(&sdp->work, srcu_invoke_callbacks);
 		sdp->sp = sp;
+		sdp->grpmask = 1 << (cpu - sdp->mynode->grplo);
 		if (is_static)
 			continue;
 
@@ -130,7 +141,6 @@ static int init_srcu_struct_fields(struct srcu_struct *sp, bool is_static)
 	mutex_init(&sp->srcu_gp_mutex);
 	sp->srcu_idx = 0;
 	sp->srcu_gp_seq = 0;
-	atomic_set(&sp->srcu_exp_cnt, 0);
 	sp->srcu_barrier_seq = 0;
 	mutex_init(&sp->srcu_barrier_mutex);
 	atomic_set(&sp->srcu_barrier_cpu_cnt, 0);
@@ -138,6 +148,8 @@ static int init_srcu_struct_fields(struct srcu_struct *sp, bool is_static)
 	if (!is_static)
 		sp->sda = alloc_percpu(struct srcu_data);
 	init_srcu_struct_nodes(sp, is_static);
+	sp->srcu_gp_seq_needed_exp = 0;
+	sp->srcu_last_gp_end = ktime_get_mono_fast_ns();
 	smp_store_release(&sp->srcu_gp_seq_needed, 0); /* Init done. */
 	return sp->sda ? 0 : -ENOMEM;
 }
@@ -302,6 +314,18 @@ static bool srcu_readers_active(struct srcu_struct *sp)
 
 #define SRCU_INTERVAL		1
 
+/*
+ * Return grace-period delay, zero if there are expedited grace
+ * periods pending, SRCU_INTERVAL otherwise.
+ */
+static unsigned long srcu_get_delay(struct srcu_struct *sp)
+{
+	if (ULONG_CMP_LT(READ_ONCE(sp->srcu_gp_seq),
+			 READ_ONCE(sp->srcu_gp_seq_needed_exp)))
+		return 0;
+	return SRCU_INTERVAL;
+}
+
 /**
  * cleanup_srcu_struct - deconstruct a sleep-RCU structure
  * @sp: structure to clean up.
@@ -313,7 +337,8 @@ void cleanup_srcu_struct(struct srcu_struct *sp)
 {
 	int cpu;
 
-	WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt));
+	if (WARN_ON(!srcu_get_delay(sp)))
+		return; /* Leakage unless caller handles error. */
 	if (WARN_ON(srcu_readers_active(sp)))
 		return; /* Leakage unless caller handles error. */
 	flush_delayed_work(&sp->work);
@@ -382,6 +407,7 @@ static void srcu_gp_start(struct srcu_struct *sp)
 			      rcu_seq_current(&sp->srcu_gp_seq));
 	(void)rcu_segcblist_accelerate(&sdp->srcu_cblist,
 				       rcu_seq_snap(&sp->srcu_gp_seq));
+	smp_mb(); /* Order prior store to ->srcu_gp_seq_needed vs. GP start. */
 	rcu_seq_start(&sp->srcu_gp_seq);
 	state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
 	WARN_ON_ONCE(state != SRCU_STATE_SCAN1);
@@ -434,16 +460,20 @@ static void srcu_schedule_cbs_sdp(struct srcu_data *sdp, unsigned long delay)
 
 /*
  * Schedule callback invocation for all srcu_data structures associated
- * with the specified srcu_node structure, if possible, on the corresponding
- * CPUs.
+ * with the specified srcu_node structure that have callbacks for the
+ * just-completed grace period, the one corresponding to idx.  If possible,
+ * schedule this invocation on the corresponding CPUs.
  */
-static void srcu_schedule_cbs_snp(struct srcu_struct *sp, struct srcu_node *snp)
+static void srcu_schedule_cbs_snp(struct srcu_struct *sp, struct srcu_node *snp,
+				  unsigned long mask, unsigned long delay)
 {
 	int cpu;
 
-	for (cpu = snp->grplo; cpu <= snp->grphi; cpu++)
-		srcu_schedule_cbs_sdp(per_cpu_ptr(sp->sda, cpu),
-				      atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL);
+	for (cpu = snp->grplo; cpu <= snp->grphi; cpu++) {
+		if (!(mask & (1 << (cpu - snp->grplo))))
+			continue;
+		srcu_schedule_cbs_sdp(per_cpu_ptr(sp->sda, cpu), delay);
+	}
 }
 
 /*
@@ -457,10 +487,12 @@ static void srcu_schedule_cbs_snp(struct srcu_struct *sp, struct srcu_node *snp)
  */
 static void srcu_gp_end(struct srcu_struct *sp)
 {
+	unsigned long cbdelay;
 	bool cbs;
 	unsigned long gpseq;
 	int idx;
 	int idxnext;
+	unsigned long mask;
 	struct srcu_node *snp;
 
 	/* Prevent more than one additional grace period. */
@@ -470,8 +502,12 @@ static void srcu_gp_end(struct srcu_struct *sp)
 	spin_lock_irq(&sp->gp_lock);
 	idx = rcu_seq_state(sp->srcu_gp_seq);
 	WARN_ON_ONCE(idx != SRCU_STATE_SCAN2);
+	cbdelay = srcu_get_delay(sp);
+	sp->srcu_last_gp_end = ktime_get_mono_fast_ns();
 	rcu_seq_end(&sp->srcu_gp_seq);
 	gpseq = rcu_seq_current(&sp->srcu_gp_seq);
+	if (ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, gpseq))
+		sp->srcu_gp_seq_needed_exp = gpseq;
 	spin_unlock_irq(&sp->gp_lock);
 	mutex_unlock(&sp->srcu_gp_mutex);
 	/* A new grace period can start at this point.  But only one. */
@@ -486,10 +522,14 @@ static void srcu_gp_end(struct srcu_struct *sp)
 			cbs = snp->srcu_have_cbs[idx] == gpseq;
 		snp->srcu_have_cbs[idx] = gpseq;
 		rcu_seq_set_state(&snp->srcu_have_cbs[idx], 1);
+		if (ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, gpseq))
+			snp->srcu_gp_seq_needed_exp = gpseq;
+		mask = snp->srcu_data_have_cbs[idx];
+		snp->srcu_data_have_cbs[idx] = 0;
 		spin_unlock_irq(&snp->lock);
 		if (cbs) {
 			smp_mb(); /* GP end before CB invocation. */
-			srcu_schedule_cbs_snp(sp, snp);
+			srcu_schedule_cbs_snp(sp, snp, mask, cbdelay);
 		}
 	}
 
@@ -504,25 +544,52 @@ static void srcu_gp_end(struct srcu_struct *sp)
 		srcu_gp_start(sp);
 		spin_unlock_irq(&sp->gp_lock);
 		/* Throttle expedited grace periods: Should be rare! */
-		srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) &&
-				    rcu_seq_ctr(gpseq) & 0xf
-				    ? 0
-				    : SRCU_INTERVAL);
+		srcu_reschedule(sp, rcu_seq_ctr(gpseq) & 0x3ff
+				    ? 0 : SRCU_INTERVAL);
 	} else {
 		spin_unlock_irq(&sp->gp_lock);
 	}
 }
 
 /*
+ * Funnel-locking scheme to scalably mediate many concurrent expedited
+ * grace-period requests.  This function is invoked for the first known
+ * expedited request for a grace period that has already been requested,
+ * but without expediting.  To start a completely new grace period,
+ * whether expedited or not, use srcu_funnel_gp_start() instead.
+ */
+static void srcu_funnel_exp_start(struct srcu_struct *sp, struct srcu_node *snp,
+				  unsigned long s)
+{
+	unsigned long flags;
+
+	for (; snp != NULL; snp = snp->srcu_parent) {
+		if (rcu_seq_done(&sp->srcu_gp_seq, s) ||
+		    ULONG_CMP_GE(READ_ONCE(snp->srcu_gp_seq_needed_exp), s))
+			return;
+		spin_lock_irqsave(&snp->lock, flags);
+		if (ULONG_CMP_GE(snp->srcu_gp_seq_needed_exp, s)) {
+			spin_unlock_irqrestore(&snp->lock, flags);
+			return;
+		}
+		WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s);
+		spin_unlock_irqrestore(&snp->lock, flags);
+	}
+	spin_lock_irqsave(&sp->gp_lock, flags);
+	if (!ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, s))
+		sp->srcu_gp_seq_needed_exp = s;
+	spin_unlock_irqrestore(&sp->gp_lock, flags);
+}
+
+/*
  * Funnel-locking scheme to scalably mediate many concurrent grace-period
  * requests.  The winner has to do the work of actually starting grace
  * period s.  Losers must either ensure that their desired grace-period
  * number is recorded on at least their leaf srcu_node structure, or they
  * must take steps to invoke their own callbacks.
  */
-static void srcu_funnel_gp_start(struct srcu_struct *sp,
-				 struct srcu_data *sdp,
-				 unsigned long s)
+static void srcu_funnel_gp_start(struct srcu_struct *sp, struct srcu_data *sdp,
+				 unsigned long s, bool do_norm)
 {
 	unsigned long flags;
 	int idx = rcu_seq_ctr(s) % ARRAY_SIZE(sdp->mynode->srcu_have_cbs);
@@ -536,14 +603,25 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp,
 		spin_lock_irqsave(&snp->lock, flags);
 		if (ULONG_CMP_GE(snp->srcu_have_cbs[idx], s)) {
 			snp_seq = snp->srcu_have_cbs[idx];
+			if (snp == sdp->mynode && snp_seq == s)
+				snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
 			spin_unlock_irqrestore(&snp->lock, flags);
 			if (snp == sdp->mynode && snp_seq != s) {
 				smp_mb(); /* CBs after GP! */
-				srcu_schedule_cbs_sdp(sdp, 0);
+				srcu_schedule_cbs_sdp(sdp, do_norm
+							   ? SRCU_INTERVAL
+							   : 0);
+				return;
 			}
+			if (!do_norm)
+				srcu_funnel_exp_start(sp, snp, s);
 			return;
 		}
 		snp->srcu_have_cbs[idx] = s;
+		if (snp == sdp->mynode)
+			snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
+		if (!do_norm && ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, s))
+			snp->srcu_gp_seq_needed_exp = s;
 		spin_unlock_irqrestore(&snp->lock, flags);
 	}
 
@@ -556,6 +634,8 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp,
 		 */
 		smp_store_release(&sp->srcu_gp_seq_needed, s); /*^^^*/
 	}
+	if (!do_norm && ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, s))
+		sp->srcu_gp_seq_needed_exp = s;
 
 	/* If grace period not already done and none in progress, start it. */
 	if (!rcu_seq_done(&sp->srcu_gp_seq, s) &&
@@ -563,9 +643,7 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp,
 		WARN_ON_ONCE(ULONG_CMP_GE(sp->srcu_gp_seq, sp->srcu_gp_seq_needed));
 		srcu_gp_start(sp);
 		queue_delayed_work(system_power_efficient_wq, &sp->work,
-				   atomic_read(&sp->srcu_exp_cnt)
-				   ? 0
-				   : SRCU_INTERVAL);
+				   srcu_get_delay(sp));
 	}
 	spin_unlock_irqrestore(&sp->gp_lock, flags);
 }
@@ -580,7 +658,7 @@ static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount)
 	for (;;) {
 		if (srcu_readers_active_idx_check(sp, idx))
 			return true;
-		if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0)
+		if (--trycount + !srcu_get_delay(sp) <= 0)
 			return false;
 		udelay(SRCU_RETRY_CHECK_DELAY);
 	}
@@ -606,6 +684,67 @@ static void srcu_flip(struct srcu_struct *sp)
 }
 
 /*
+ * If SRCU is likely idle, return true, otherwise return false.
+ *
+ * Note that it is OK for several concurrent from-idle requests for a
+ * new grace period to specify expediting because they will all end
+ * up requesting the same grace period anyhow.  So no loss.
+ *
+ * Note also that if any CPU (including the current one) is still invoking
+ * callbacks, this function will nevertheless say "idle".  This is not
+ * ideal, but the overhead of checking all CPUs' callback lists is even
+ * less ideal, especially on large systems.  Furthermore, the wakeup
+ * can happen before the callback is fully removed, so we have no choice
+ * but to accept this type of error.
+ *
+ * This function is also subject to counter-wrap errors, but let's face
+ * it, if this function was preempted for enough time for the counters
+ * to wrap, it really doesn't matter whether or not we expedite the grace
+ * period.  The extra overhead of a needlessly expedited grace period is
+ * negligible when amortized over that time period, and the extra latency
+ * of a needlessly non-expedited grace period is similarly negligible.
+ */
+static bool srcu_might_be_idle(struct srcu_struct *sp)
+{
+	unsigned long curseq;
+	unsigned long flags;
+	struct srcu_data *sdp;
+	unsigned long t;
+
+	/* If the local srcu_data structure has callbacks, not idle.  */
+	local_irq_save(flags);
+	sdp = this_cpu_ptr(sp->sda);
+	if (rcu_segcblist_pend_cbs(&sdp->srcu_cblist)) {
+		local_irq_restore(flags);
+		return false; /* Callbacks already present, so not idle. */
+	}
+	local_irq_restore(flags);
+
+	/*
+	 * No local callbacks, so probabilistically probe global state.
+	 * Exact information would require acquiring locks, which would
+	 * kill scalability, hence the probabilistic nature of the probe.
+	 */
+
+	/* First, see if enough time has passed since the last GP. */
+	t = ktime_get_mono_fast_ns();
+	if (exp_holdoff == 0 ||
+	    time_in_range_open(t, sp->srcu_last_gp_end,
+			       sp->srcu_last_gp_end + exp_holdoff))
+		return false; /* Too soon after last GP. */
+
+	/* Next, check for probable idleness. */
+	curseq = rcu_seq_current(&sp->srcu_gp_seq);
+	smp_mb(); /* Order ->srcu_gp_seq with ->srcu_gp_seq_needed. */
+	if (ULONG_CMP_LT(curseq, READ_ONCE(sp->srcu_gp_seq_needed)))
+		return false; /* Grace period in progress, so not idle. */
+	smp_mb(); /* Order ->srcu_gp_seq with prior access. */
+	if (curseq != rcu_seq_current(&sp->srcu_gp_seq))
+		return false; /* GP # changed, so not idle. */
+	return true; /* With reasonable probability, idle! */
+}
+
+/*
  * Enqueue an SRCU callback on the srcu_data structure associated with
  * the current CPU and the specified srcu_struct structure, initiating
  * grace-period processing if it is not already running.
@@ -633,10 +772,11 @@ static void srcu_flip(struct srcu_struct *sp)
  * srcu_read_lock(), and srcu_read_unlock() that are all passed the same
  * srcu_struct structure.
  */
-void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
-	       rcu_callback_t func)
+void __call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
+		 rcu_callback_t func, bool do_norm)
 {
 	unsigned long flags;
+	bool needexp = false;
 	bool needgp = false;
 	unsigned long s;
 	struct srcu_data *sdp;
@@ -655,16 +795,28 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
 		sdp->srcu_gp_seq_needed = s;
 		needgp = true;
 	}
+	if (ULONG_CMP_LT(sdp->srcu_gp_seq_needed_exp, s)) {
+		sdp->srcu_gp_seq_needed_exp = s;
+		needexp = true;
+	}
 	spin_unlock_irqrestore(&sdp->lock, flags);
 	if (needgp)
-		srcu_funnel_gp_start(sp, sdp, s);
+		srcu_funnel_gp_start(sp, sdp, s, do_norm);
+	else
+		srcu_funnel_exp_start(sp, sdp->mynode, s);
+}
+
+void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
+	       rcu_callback_t func)
+{
+	__call_srcu(sp, rhp, func, true);
 }
 EXPORT_SYMBOL_GPL(call_srcu);
 
 /*
  * Helper function for synchronize_srcu() and synchronize_srcu_expedited().
  */
-static void __synchronize_srcu(struct srcu_struct *sp)
+static void __synchronize_srcu(struct srcu_struct *sp, bool do_norm)
 {
 	struct rcu_synchronize rcu;
 
@@ -680,7 +832,7 @@ static void __synchronize_srcu(struct srcu_struct *sp)
 	check_init_srcu_struct(sp);
 	init_completion(&rcu.completion);
 	init_rcu_head_on_stack(&rcu.head);
-	call_srcu(sp, &rcu.head, wakeme_after_rcu);
+	__call_srcu(sp, &rcu.head, wakeme_after_rcu, do_norm);
 	wait_for_completion(&rcu.completion);
 	destroy_rcu_head_on_stack(&rcu.head);
 }
@@ -697,18 +849,7 @@ static void __synchronize_srcu(struct srcu_struct *sp)
  */
 void synchronize_srcu_expedited(struct srcu_struct *sp)
 {
-	bool do_norm = rcu_gp_is_normal();
-
-	check_init_srcu_struct(sp);
-	if (!do_norm) {
-		atomic_inc(&sp->srcu_exp_cnt);
-		smp_mb__after_atomic(); /* increment before GP. */
-	}
-	__synchronize_srcu(sp);
-	if (!do_norm) {
-		smp_mb__before_atomic(); /* GP before decrement. */
-		WARN_ON_ONCE(atomic_dec_return(&sp->srcu_exp_cnt) < 0);
-	}
+	__synchronize_srcu(sp, rcu_gp_is_normal());
 }
 EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
 
@@ -750,13 +891,18 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
  * Of course, these memory-ordering guarantees apply only when
  * synchronize_srcu(), srcu_read_lock(), and srcu_read_unlock() are
  * passed the same srcu_struct structure.
+ *
+ * If SRCU is likely idle, expedite the first request.  This semantic
+ * was provided by Classic SRCU, and is relied upon by its users, so TREE
+ * SRCU must also provide it.  Note that detecting idleness is heuristic
+ * and subject to both false positives and negatives.
  */
 void synchronize_srcu(struct srcu_struct *sp)
 {
-	if (rcu_gp_is_expedited())
+	if (srcu_might_be_idle(sp) || rcu_gp_is_expedited())
 		synchronize_srcu_expedited(sp);
 	else
-		__synchronize_srcu(sp);
+		__synchronize_srcu(sp, true);
 }
 EXPORT_SYMBOL_GPL(synchronize_srcu);
 
@@ -991,6 +1137,18 @@ void process_srcu(struct work_struct *work)
 	sp = container_of(work, struct srcu_struct, work.work);
 
 	srcu_advance_state(sp);
-	srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL);
+	srcu_reschedule(sp, srcu_get_delay(sp));
 }
 EXPORT_SYMBOL_GPL(process_srcu);
+
+void srcutorture_get_gp_data(enum rcutorture_type test_type,
+			     struct srcu_struct *sp, int *flags,
+			     unsigned long *gpnum, unsigned long *completed)
+{
+	if (test_type != SRCU_FLAVOR)
+		return;
+	*flags = 0;
+	*completed = rcu_seq_ctr(sp->srcu_gp_seq);
+	*gpnum = rcu_seq_ctr(sp->srcu_gp_seq_needed);
+}
+EXPORT_SYMBOL_GPL(srcutorture_get_gp_data);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 23aa02587d0f..91fff49d5869 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -704,15 +704,11 @@ void rcutorture_get_gp_data(enum rcutorture_type test_type, int *flags,
 	default:
 		break;
 	}
-	if (rsp != NULL) {
-		*flags = READ_ONCE(rsp->gp_flags);
-		*gpnum = READ_ONCE(rsp->gpnum);
-		*completed = READ_ONCE(rsp->completed);
+	if (rsp == NULL)
 		return;
-	}
-	*flags = 0;
-	*gpnum = 0;
-	*completed = 0;
+	*flags = READ_ONCE(rsp->gp_flags);
+	*gpnum = READ_ONCE(rsp->gpnum);
+	*completed = READ_ONCE(rsp->completed);
 }
 EXPORT_SYMBOL_GPL(rcutorture_get_gp_data);
 

  reply	other threads:[~2017-04-26 14:31 UTC|newest]

Thread overview: 24+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2017-04-24  2:48 TREE_SRCU slows hotplug by factor ~16 Mike Galbraith
2017-04-24  3:32 ` Paul E. McKenney
2017-04-24  5:24   ` Mike Galbraith
2017-04-24  6:22     ` Paul E. McKenney
2017-04-24  7:35       ` Mike Galbraith
2017-04-24  8:43         ` Mike Galbraith
2017-04-24 16:24         ` Paul E. McKenney
2017-04-25 22:36           ` Paul E. McKenney
2017-04-26 14:31             ` Paul E. McKenney [this message]
2017-04-26 15:26               ` Mike Galbraith
2017-04-26 15:44                 ` Paul E. McKenney
2017-04-26 15:49                   ` Mike Galbraith
2017-04-26 16:00                     ` Paul E. McKenney
2017-04-26 17:45                     ` Mike Galbraith
2017-04-26 17:55                       ` Paul E. McKenney
2017-04-26 17:56                         ` Paul E. McKenney
2017-04-26 18:12                           ` Mike Galbraith
2017-04-26 18:25                             ` Paul E. McKenney
2017-04-27  3:43                             ` Mike Galbraith
2017-04-27  4:11                               ` Paul E. McKenney
2017-04-27  4:15                                 ` Mike Galbraith
2017-04-27  5:32                                   ` Paul E. McKenney
2017-04-27  5:44                                     ` Mike Galbraith
2017-04-27 12:37                                       ` Paul E. McKenney

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20170426143145.GA5683@linux.vnet.ibm.com \
    --to=paulmck@linux.vnet.ibm.com \
    --cc=efault@gmx.de \
    --cc=linux-kernel@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.