From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1753282AbbGOSGE (ORCPT ); Wed, 15 Jul 2015 14:06:04 -0400 Received: from e31.co.us.ibm.com ([32.97.110.149]:41486 "EHLO e31.co.us.ibm.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1752545AbbGOSGB (ORCPT ); Wed, 15 Jul 2015 14:06:01 -0400 X-Helo: d03dlp02.boulder.ibm.com X-MailFrom: paulmck@linux.vnet.ibm.com X-RcptTo: linux-kernel@vger.kernel.org Date: Wed, 15 Jul 2015 11:05:46 -0700 From: "Paul E. McKenney" To: Oleg Nesterov Cc: Linus Torvalds , Peter Zijlstra , Daniel Wagner , Davidlohr Bueso , Ingo Molnar , Tejun Heo , linux-kernel@vger.kernel.org Subject: Re: [PATCH 1/7] rcu: Create rcu_sync infrastructure Message-ID: <20150715180546.GJ3717@linux.vnet.ibm.com> Reply-To: paulmck@linux.vnet.ibm.com References: <20150711233535.GA829@redhat.com> <20150711233548.GA843@redhat.com> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <20150711233548.GA843@redhat.com> User-Agent: Mutt/1.5.21 (2010-09-15) X-TM-AS-MML: disable X-Content-Scanned: Fidelis XPS MAILER x-cbid: 15071518-8236-0000-0000-00000D2E171F Sender: linux-kernel-owner@vger.kernel.org List-ID: X-Mailing-List: linux-kernel@vger.kernel.org On Sun, Jul 12, 2015 at 01:35:48AM +0200, Oleg Nesterov wrote: > It is functionally equivalent to > > struct rcu_sync_struct { > atomic_t counter; > }; > > static inline bool rcu_sync_is_idle(struct rcu_sync_struct *rss) > { If you add an smp_mb() here... > return atomic_read(&rss->counter) == 0; > } > > static inline void rcu_sync_enter(struct rcu_sync_struct *rss) > { > atomic_inc(&rss->counter); > synchronize_sched(); > } > > static inline void rcu_sync_exit(struct rcu_sync_struct *rss) > { > synchronize_sched(); You should be able to demote the above synchronize_sched() to an smp_mb__before_atomic(). Even rare writes should make this tradeoff worthwhile. I will try to call out the equivalent transformation below, for your amusement if for no other reason. ;-) > atomic_dec(&rss->counter); > } > > except: it records the state and synchronize_sched() is only called by > rcu_sync_enter() and only if necessary. > > Reviewed-by: Paul E. McKenney > Signed-off-by: Oleg Nesterov > Signed-off-by: Peter Zijlstra (Intel) > --- > include/linux/rcusync.h | 64 ++++++++++++++++++++++++++++ > kernel/rcu/Makefile | 2 +- > kernel/rcu/sync.c | 108 +++++++++++++++++++++++++++++++++++++++++++++++ > 3 files changed, 173 insertions(+), 1 deletions(-) > create mode 100644 include/linux/rcusync.h > create mode 100644 kernel/rcu/sync.c > > diff --git a/include/linux/rcusync.h b/include/linux/rcusync.h > new file mode 100644 > index 0000000..7858491 > --- /dev/null > +++ b/include/linux/rcusync.h > @@ -0,0 +1,64 @@ > +#ifndef _LINUX_RCUSYNC_H_ > +#define _LINUX_RCUSYNC_H_ > + > +#include > +#include > + > +struct rcu_sync_struct { > + int gp_state; > + int gp_count; > + wait_queue_head_t gp_wait; > + > + int cb_state; > + struct rcu_head cb_head; > + > + void (*sync)(void); > + void (*call)(struct rcu_head *, void (*)(struct rcu_head *)); > +}; > + > +#define ___RCU_SYNC_INIT(name) \ > + .gp_state = 0, \ > + .gp_count = 0, \ > + .gp_wait = __WAIT_QUEUE_HEAD_INITIALIZER(name.gp_wait), \ > + .cb_state = 0 > + > +#define __RCU_SCHED_SYNC_INIT(name) { \ > + ___RCU_SYNC_INIT(name), \ > + .sync = synchronize_sched, \ > + .call = call_rcu_sched, \ > +} > + > +#define __RCU_BH_SYNC_INIT(name) { \ > + ___RCU_SYNC_INIT(name), \ > + .sync = synchronize_rcu_bh, \ > + .call = call_rcu_bh, \ > +} > + > +#define __RCU_SYNC_INIT(name) { \ > + ___RCU_SYNC_INIT(name), \ > + .sync = synchronize_rcu, \ > + .call = call_rcu, \ > +} > + > +#define DEFINE_RCU_SCHED_SYNC(name) \ > + struct rcu_sync_struct name = __RCU_SCHED_SYNC_INIT(name) > + > +#define DEFINE_RCU_BH_SYNC(name) \ > + struct rcu_sync_struct name = __RCU_BH_SYNC_INIT(name) > + > +#define DEFINE_RCU_SYNC(name) \ > + struct rcu_sync_struct name = __RCU_SYNC_INIT(name) > + > +static inline bool rcu_sync_is_idle(struct rcu_sync_struct *rss) > +{ smp_mb(); /* A: Ensure that reader sees last update. */ /* Pairs with B. */ > + return !rss->gp_state; /* GP_IDLE */ > +} > + > +enum rcu_sync_type { RCU_SYNC, RCU_SCHED_SYNC, RCU_BH_SYNC }; > + > +extern void rcu_sync_init(struct rcu_sync_struct *, enum rcu_sync_type); > +extern void rcu_sync_enter(struct rcu_sync_struct *); > +extern void rcu_sync_exit(struct rcu_sync_struct *); > + > +#endif /* _LINUX_RCUSYNC_H_ */ > + > diff --git a/kernel/rcu/Makefile b/kernel/rcu/Makefile > index 50a8084..61a1656 100644 > --- a/kernel/rcu/Makefile > +++ b/kernel/rcu/Makefile > @@ -1,4 +1,4 @@ > -obj-y += update.o > +obj-y += update.o sync.o > obj-$(CONFIG_SRCU) += srcu.o > obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o > obj-$(CONFIG_TREE_RCU) += tree.o > diff --git a/kernel/rcu/sync.c b/kernel/rcu/sync.c > new file mode 100644 > index 0000000..f84176a > --- /dev/null > +++ b/kernel/rcu/sync.c > @@ -0,0 +1,108 @@ > + > +#include > +#include > + > +enum { GP_IDLE = 0, GP_PENDING, GP_PASSED }; > +enum { CB_IDLE = 0, CB_PENDING, CB_REPLAY }; > + > +#define rss_lock gp_wait.lock > + > +void rcu_sync_init(struct rcu_sync_struct *rss, enum rcu_sync_type type) > +{ > + memset(rss, 0, sizeof(*rss)); > + init_waitqueue_head(&rss->gp_wait); > + > + switch (type) { > + case RCU_SYNC: > + rss->sync = synchronize_rcu; > + rss->call = call_rcu; > + break; > + > + case RCU_SCHED_SYNC: > + rss->sync = synchronize_sched; > + rss->call = call_rcu_sched; > + break; > + > + case RCU_BH_SYNC: > + rss->sync = synchronize_rcu_bh; > + rss->call = call_rcu_bh; > + break; > + } > +} > + > +void rcu_sync_enter(struct rcu_sync_struct *rss) > +{ > + bool need_wait, need_sync; > + > + spin_lock_irq(&rss->rss_lock); > + need_wait = rss->gp_count++; > + need_sync = rss->gp_state == GP_IDLE; > + if (need_sync) > + rss->gp_state = GP_PENDING; > + spin_unlock_irq(&rss->rss_lock); > + > + BUG_ON(need_wait && need_sync); > + > + if (need_sync) { > + rss->sync(); > + rss->gp_state = GP_PASSED; > + wake_up_all(&rss->gp_wait); > + } else if (need_wait) { > + wait_event(rss->gp_wait, rss->gp_state == GP_PASSED); > + } else { > + /* > + * Possible when there's a pending CB from a rcu_sync_exit(). > + * Nobody has yet been allowed the 'fast' path and thus we can > + * avoid doing any sync(). The callback will get 'dropped'. > + */ > + BUG_ON(rss->gp_state != GP_PASSED); > + } > +} > + > +static void rcu_sync_func(struct rcu_head *rcu) > +{ > + struct rcu_sync_struct *rss = > + container_of(rcu, struct rcu_sync_struct, cb_head); > + unsigned long flags; > + > + > + BUG_ON(rss->gp_state != GP_PASSED); > + BUG_ON(rss->cb_state == CB_IDLE); > + > + spin_lock_irqsave(&rss->rss_lock, flags); > + if (rss->gp_count) { > + /* > + * A new rcu_sync_begin() has happened; drop the callback. > + */ > + rss->cb_state = CB_IDLE; > + } else if (rss->cb_state == CB_REPLAY) { > + /* > + * A new rcu_sync_exit() has happened; requeue the callback > + * to catch a later GP. > + */ > + rss->cb_state = CB_PENDING; > + rss->call(&rss->cb_head, rcu_sync_func); > + } else { > + /* > + * We're at least a GP after rcu_sync_exit(); eveybody will now > + * have observed the write side critical section. Let 'em rip!. > + */ > + rss->cb_state = CB_IDLE; > + rss->gp_state = GP_IDLE; > + } > + spin_unlock_irqrestore(&rss->rss_lock, flags); > +} > + > +void rcu_sync_exit(struct rcu_sync_struct *rss) > +{ > + spin_lock_irq(&rss->rss_lock); smp_mb(); /* B: Make sure next readers see critical section. */ /* Pairs with A. */ > + if (!--rss->gp_count) { At which point, I believe you can ditch the callback entirely, along with ->cb_state. So, what am I missing here? Are readers really so frequent that the added read-side memory barrier is visible? Given that the current code forces the readers to grab ->rss_lock during the trailing grace period, any writer frequency at all will overwhelm the cost of the read-side memory barrier. Thanx, Paul > + if (rss->cb_state == CB_IDLE) { > + rss->cb_state = CB_PENDING; > + rss->call(&rss->cb_head, rcu_sync_func); > + } else if (rss->cb_state == CB_PENDING) { > + rss->cb_state = CB_REPLAY; > + } > + } > + spin_unlock_irq(&rss->rss_lock); > +} > -- > 1.5.5.1 >