From: "Paul E. McKenney" <paulmck@linux.vnet.ibm.com>
To: Lai Jiangshan <eag0628@gmail.com>
Cc: Lai Jiangshan <laijs@cn.fujitsu.com>,
	linux-kernel@vger.kernel.org, mingo@elte.hu, dipankar@in.ibm.com,
	akpm@linux-foundation.org, mathieu.desnoyers@polymtl.ca,
	josh@joshtriplett.org, niv@us.ibm.com, tglx@linutronix.de,
	peterz@infradead.org, rostedt@goodmis.org,
	Valdis.Kletnieks@vt.edu, dhowells@redhat.com,
	eric.dumazet@gmail.com, darren@dvhart.com, fweisbec@gmail.com,
	patches@linaro.org
Subject: Re: [RFC PATCH 5/5 single-thread-version] implement per-domain single-thread state machine call_srcu()
Date: Mon, 12 Mar 2012 11:03:21 -0700
Message-ID: <20120312180321.GG2471@linux.vnet.ibm.com>
In-Reply-To: <CACvQF534RJcu8-yKcByqthifUHCLsggGXuRPAKqf7mZMXT3taA@mail.gmail.com>

On Sat, Mar 10, 2012 at 11:16:48AM +0800, Lai Jiangshan wrote:
> On Fri, Mar 9, 2012 at 4:35 AM, Paul E. McKenney
> <paulmck@linux.vnet.ibm.com> wrote:
> > On Wed, Mar 07, 2012 at 11:54:02AM +0800, Lai Jiangshan wrote:
> >> This patch is on top of the 4 previous patches (1/6, 2/6, 3/6, 4/6).
> >>
> >> o     The state machine is lightweight and single-threaded, and it is
> >>       preemptible when checking.
> >>
> >> o     The state machine is a work_struct, so no thread is occupied
> >>       by SRCU when the srcu_struct is not active (no callbacks). It also
> >>       does not sleep (which avoids occupying a thread while sleeping).
> >>
> >> o     The state machine is the only thread that can flip/check/write(*)
> >>       the srcu_struct, so we don't need any mutex.
> >>       (write(*): except ->per_cpu_ref, ->running, ->batch_queue)
> >>
> >> o     synchronize_srcu() always calls call_srcu(), and so does
> >>       synchronize_srcu_expedited().
> >>       This is OK because mb()-based SRCU is extremely fast.
> >>
> >> o     In the current kernel, we can expect only 1 callback per grace
> >>       period, so a callback is probably invoked on the same CPU where
> >>       it was queued.
> >>
> >> The trip of a callback:
> >>       1) ->batch_queue when call_srcu() is invoked
> >>
> >>       2) ->batch_check0 when trying to do check_zero
> >>
> >>       3) ->batch_check1 after finishing its first check_zero and the flip
> >>
> >>       4) ->batch_done after finishing its second check_zero
> >>
> >> The current requirements for callbacks:
> >>       The callback will be called in process context.
> >>       The callback should be fast, without any sleeping path.
> >>
> >> Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com>
> >> ---
> >>  include/linux/rcupdate.h |    2 +-
> >>  include/linux/srcu.h     |   28 +++++-
> >>  kernel/rcupdate.c        |   24 ++++-
> >>  kernel/rcutorture.c      |   44 ++++++++-
> >>  kernel/srcu.c            |  238 ++++++++++++++++++++++++++++++++-------------
> >>  5 files changed, 259 insertions(+), 77 deletions(-)
> >> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
> >> index 9372174..d98eab2 100644
> >> --- a/include/linux/rcupdate.h
> >> +++ b/include/linux/rcupdate.h
> >> @@ -222,7 +222,7 @@ extern void rcu_irq_exit(void);
> >>   * TREE_RCU and rcu_barrier_() primitives in TINY_RCU.
> >>   */
> >>
> >> -typedef void call_rcu_func_t(struct rcu_head *head,
> >> +typedef void (*call_rcu_func_t)(struct rcu_head *head,
> >
> > I don't see what this applies against.  The old patch 5/6 created
> > a "(*call_rcu_func_t)(struct rcu_head *head," and I don't see what
> > created the "call_rcu_func_t(struct rcu_head *head,".
> 
> typedef void call_rcu_func_t(...) declares a function type, not a
> function pointer type. I use a line of code like the following:
> 
> call_rcu_func_t crf = func;
> 
> If call_rcu_func_t is a function type, the above code can't be compiled,
> so I need to convert it to a function pointer type.

Got it, thank you!
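
For readers following along, the distinction can be shown in a small
userspace sketch. This is only an illustration: struct demo_head,
demo_fn_t, demo_fnp_t, mark_fired(), invoke() and typedef_demo() are
made-up names, not kernel definitions.

```c
#include <assert.h>

/* Hypothetical stand-in for struct rcu_head; not the kernel definition. */
struct demo_head {
	int fired;
};

typedef void demo_fn_t(struct demo_head *head);     /* function type */
typedef void (*demo_fnp_t)(struct demo_head *head); /* pointer-to-function type */

static void mark_fired(struct demo_head *head)
{
	head->fired++;
}

/* As a parameter, a function type is adjusted to a pointer, so this compiles. */
static void invoke(demo_fn_t fn, struct demo_head *head)
{
	fn(head);
}

/* Returns how many times the callback ran (3 if every call path works). */
static int typedef_demo(void)
{
	struct demo_head h = { 0 };
	demo_fnp_t p = mark_fired; /* OK: a pointer object can be initialized */
	demo_fn_t *q = mark_fired; /* OK: explicit pointer to the function type */
	/* demo_fn_t bad = mark_fired;   rejected: no initialized object of function type */

	p(&h);
	q(&h);
	invoke(mark_fired, &h);
	return h.fired;
}
```

The function-type typedef is still usable as a parameter type, which is
presumably why the old wait_rcu_gp(call_rcu_func_t crf) prototype compiled;
it only fails once a local variable of that type is declared.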

> >>                            void (*func)(struct rcu_head *head));
> >>  void wait_rcu_gp(call_rcu_func_t crf);
> >>
> >> diff --git a/include/linux/srcu.h b/include/linux/srcu.h
> >> index df8f5f7..56cb774 100644
> >> --- a/include/linux/srcu.h
> >> +++ b/include/linux/srcu.h
> >> @@ -29,6 +29,7 @@
> >>
> >>  #include <linux/mutex.h>
> >>  #include <linux/rcupdate.h>
> >> +#include <linux/workqueue.h>
> >>
> >>  struct srcu_struct_array {
> >>       unsigned long c[2];
> >> @@ -39,10 +40,23 @@ struct srcu_struct_array {
> >>  #define SRCU_REF_MASK                (ULONG_MAX >> SRCU_USAGE_BITS)
> >>  #define SRCU_USAGE_COUNT     (SRCU_REF_MASK + 1)
> >>
> >> +struct rcu_batch {
> >> +     struct rcu_head *head, **tail;
> >> +};
> >> +
> >>  struct srcu_struct {
> >>       unsigned completed;
> >>       struct srcu_struct_array __percpu *per_cpu_ref;
> >> -     struct mutex mutex;
> >> +     spinlock_t queue_lock; /* protect ->batch_queue, ->running */
> >> +     bool running;
> >> +     /* callbacks just queued */
> >> +     struct rcu_batch batch_queue;
> >> +     /* callbacks try to do the first check_zero */
> >> +     struct rcu_batch batch_check0;
> >> +     /* callbacks done with the first check_zero and the flip */
> >> +     struct rcu_batch batch_check1;
> >> +     struct rcu_batch batch_done;
> >> +     struct delayed_work work;
> >
> > Why not use your multiple-tail-pointer trick here?  (The one that is
> > used in treercu.)
> 
> 1) It makes the code that advances the batches simpler.
> 2) batch_queue is protected by a lock, so it would be hard to use the
> multiple-tail-pointer trick.
> 3) The rcu_batch API does add a little more runtime overhead, but this
> overhead is just several CPU instructions, so I think it is OK. It is a
> good tradeoff when compared to the readability.

OK, let's see how it goes.
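
As a rough illustration of the rcu_batch approach, here is a userspace
sketch modeled on the helpers in the quoted patch. The struct and function
names (batch_cb, batch, batch_demo() and so on) are stand-ins chosen for
this example, and the grace-period checks between stages are left out
entirely; only the list handling is shown.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in for struct rcu_head; the id field exists only for the demo. */
struct batch_cb {
	struct batch_cb *next;
	int id;
};

/* Singly linked list with a tail pointer, as in the patch's rcu_batch. */
struct batch {
	struct batch_cb *head, **tail;
};

static void batch_init(struct batch *b)
{
	b->head = NULL;
	b->tail = &b->head;
}

static bool batch_empty(struct batch *b)
{
	return b->tail == &b->head;
}

static void batch_queue(struct batch *b, struct batch_cb *cb)
{
	cb->next = NULL;
	*b->tail = cb;
	b->tail = &cb->next;
}

static struct batch_cb *batch_dequeue(struct batch *b)
{
	struct batch_cb *cb;

	if (batch_empty(b))
		return NULL;
	cb = b->head;
	b->head = cb->next;
	if (b->tail == &cb->next)
		batch_init(b); /* removed the last element */
	return cb;
}

/* Splice all of "from" onto the end of "to" in O(1). */
static void batch_move(struct batch *to, struct batch *from)
{
	if (!batch_empty(from)) {
		*to->tail = from->head;
		to->tail = from->tail;
		batch_init(from);
	}
}

/* Walks three callbacks through queue -> check0 -> check1 -> done. */
static int batch_demo(void)
{
	struct batch_cb a = { NULL, 1 }, b = { NULL, 2 }, c = { NULL, 3 };
	struct batch queue, check0, check1, done;
	struct batch_cb *cb;
	int order = 0;

	batch_init(&queue);
	batch_init(&check0);
	batch_init(&check1);
	batch_init(&done);

	batch_queue(&queue, &a);
	batch_queue(&queue, &b);
	batch_move(&check0, &queue); /* a, b picked up by the state machine */
	batch_queue(&queue, &c);     /* c arrives later */
	batch_move(&check1, &check0);
	batch_move(&done, &check1);  /* a, b finished */
	batch_move(&check0, &queue);
	batch_move(&check1, &check0);
	batch_move(&done, &check1);  /* c appended behind a, b */

	while ((cb = batch_dequeue(&done)) != NULL)
		order = order * 10 + cb->id;
	return order; /* 123 when FIFO order is preserved */
}
```

Each stage is its own list, so advancing a stage is a splice of one list
onto another, at the cost of one head/tail pair per stage.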

> I think we can also use rcu_batch for rcutree/rcutiny.

Hmmm...  Readability and speed both improved when moving from something
resembling rcu_batch to the current multi-tailed lists.  ;-)
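
For comparison, a multi-tailed list keeps every stage on one linked list
and marks the stage boundaries with an array of tail pointers. The sketch
below is a simplified userspace model of that idea, not the treercu code:
the segment names, struct mt_list, and the mt_*() helpers are all invented
for this example, and locking is ignored.

```c
#include <assert.h>
#include <stddef.h>

struct mt_cb {
	struct mt_cb *next;
	int id;
};

/* Segments in list order, from the head toward the tail. */
enum { SEG_DONE, SEG_CHECK1, SEG_CHECK0, SEG_QUEUE, NSEG };

struct mt_list {
	struct mt_cb *head;
	struct mt_cb **tails[NSEG]; /* tails[i]: end of segment i */
};

static void mt_init(struct mt_list *l)
{
	int i;

	l->head = NULL;
	for (i = 0; i < NSEG; i++)
		l->tails[i] = &l->head;
}

/* New callbacks always go onto the last segment. */
static void mt_enqueue(struct mt_list *l, struct mt_cb *cb)
{
	cb->next = NULL;
	*l->tails[SEG_QUEUE] = cb;
	l->tails[SEG_QUEUE] = &cb->next;
}

/* Move every segment one stage toward DONE by copying tail pointers. */
static void mt_advance(struct mt_list *l)
{
	int i;

	for (i = 0; i < NSEG - 1; i++)
		l->tails[i] = l->tails[i + 1];
}

/* Remove one callback from the DONE segment, or return NULL if it is empty. */
static struct mt_cb *mt_pop_done(struct mt_list *l)
{
	struct mt_cb *cb;
	int i;

	if (l->tails[SEG_DONE] == &l->head)
		return NULL;
	cb = l->head;
	l->head = cb->next;
	for (i = 0; i < NSEG; i++)
		if (l->tails[i] == &cb->next)
			l->tails[i] = &l->head;
	return cb;
}

static int mt_demo(void)
{
	struct mt_cb a = { NULL, 1 }, b = { NULL, 2 }, c = { NULL, 3 };
	struct mt_list l;
	struct mt_cb *cb;
	int order = 0;

	mt_init(&l);
	mt_enqueue(&l, &a);
	mt_enqueue(&l, &b);
	mt_advance(&l);      /* a, b: QUEUE -> CHECK0 */
	mt_enqueue(&l, &c);
	mt_advance(&l);      /* a, b -> CHECK1; c -> CHECK0 */
	if (mt_pop_done(&l))
		return -1;   /* nothing should be done yet */
	mt_advance(&l);      /* a, b -> DONE; c -> CHECK1 */
	while ((cb = mt_pop_done(&l)) != NULL)
		order = order * 10 + cb->id;
	mt_advance(&l);      /* c -> DONE */
	while ((cb = mt_pop_done(&l)) != NULL)
		order = order * 10 + cb->id;
	return order;
}
```

Here advancing never touches a list element, only the tail-pointer array.
The catch, as noted in the quoted reply, is that the enqueue side and the
advancing side then share one structure, which is awkward when only the
incoming queue is meant to be protected by the lock.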

> >>       unsigned long snap[NR_CPUS];
> >>  #ifdef CONFIG_DEBUG_LOCK_ALLOC
> >>       struct lockdep_map dep_map;
> >> @@ -67,12 +81,24 @@ int init_srcu_struct(struct srcu_struct *sp);
> >>
> >>  #endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */
> >>
> >> +/* draft
> >> + * queue callbacks which will be invoked after grace period.
> >> + * The callback will be called inside process context.
> >> + * The callback should be fast without any sleeping path.
> >> + */
> >> +void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
> >> +             void (*func)(struct rcu_head *head));
> >> +
> >> +typedef void (*call_srcu_func_t)(struct srcu_struct *sp, struct rcu_head *head,
> >> +             void (*func)(struct rcu_head *head));
> >> +void __wait_srcu_gp(struct srcu_struct *sp, call_srcu_func_t crf);
> >>  void cleanup_srcu_struct(struct srcu_struct *sp);
> >>  int __srcu_read_lock(struct srcu_struct *sp) __acquires(sp);
> >>  void __srcu_read_unlock(struct srcu_struct *sp, int idx) __releases(sp);
> >>  void synchronize_srcu(struct srcu_struct *sp);
> >>  void synchronize_srcu_expedited(struct srcu_struct *sp);
> >>  long srcu_batches_completed(struct srcu_struct *sp);
> >> +void srcu_barrier(struct srcu_struct *sp);
> >>
> >>  #ifdef CONFIG_DEBUG_LOCK_ALLOC
> >>
> >> diff --git a/kernel/rcupdate.c b/kernel/rcupdate.c
> >> index a86f174..f9b551f 100644
> >> --- a/kernel/rcupdate.c
> >> +++ b/kernel/rcupdate.c
> >> @@ -45,6 +45,7 @@
> >>  #include <linux/mutex.h>
> >>  #include <linux/export.h>
> >>  #include <linux/hardirq.h>
> >> +#include <linux/srcu.h>
> >>
> >>  #define CREATE_TRACE_POINTS
> >>  #include <trace/events/rcu.h>
> >> @@ -123,20 +124,39 @@ static void wakeme_after_rcu(struct rcu_head  *head)
> >>       complete(&rcu->completion);
> >>  }
> >>
> >> -void wait_rcu_gp(call_rcu_func_t crf)
> >> +static void __wait_rcu_gp(void *domain, void *func)
> >>  {
> >>       struct rcu_synchronize rcu;
> >>
> >>       init_rcu_head_on_stack(&rcu.head);
> >>       init_completion(&rcu.completion);
> >> +
> >>       /* Will wake me after RCU finished. */
> >> -     crf(&rcu.head, wakeme_after_rcu);
> >> +     if (!domain) {
> >> +             call_rcu_func_t crf = func;
> >> +             crf(&rcu.head, wakeme_after_rcu);
> >> +     } else {
> >> +             call_srcu_func_t crf = func;
> >> +             crf(domain, &rcu.head, wakeme_after_rcu);
> >> +     }
> >> +
> >>       /* Wait for it. */
> >>       wait_for_completion(&rcu.completion);
> >>       destroy_rcu_head_on_stack(&rcu.head);
> >>  }
> >
> > Mightn't it be simpler and faster to just have a separate wait_srcu_gp()
> > that doesn't share code with wait_rcu_gp()?  I am all for sharing code,
> > but this might be hurting more than helping.
> >
> >> +
> >> +void wait_rcu_gp(call_rcu_func_t crf)
> >> +{
> >> +     __wait_rcu_gp(NULL, crf);
> >> +}
> >>  EXPORT_SYMBOL_GPL(wait_rcu_gp);
> >>
> >> +/* srcu.c internel */
> >> +void __wait_srcu_gp(struct srcu_struct *sp, call_srcu_func_t crf)
> >> +{
> >> +     __wait_rcu_gp(sp, crf);
> >> +}
> >> +
> >>  #ifdef CONFIG_PROVE_RCU
> >>  /*
> >>   * wrapper function to avoid #include problems.
> >> diff --git a/kernel/rcutorture.c b/kernel/rcutorture.c
> >> index 54e5724..40d24d0 100644
> >
> > OK, so your original patch #6 is folded into this?  I don't have a strong
> > view either way, just need to know.
> >
> >> --- a/kernel/rcutorture.c
> >> +++ b/kernel/rcutorture.c
> >> @@ -623,6 +623,11 @@ static int srcu_torture_completed(void)
> >>       return srcu_batches_completed(&srcu_ctl);
> >>  }
> >>
> >> +static void srcu_torture_deferred_free(struct rcu_torture *rp)
> >> +{
> >> +     call_srcu(&srcu_ctl, &rp->rtort_rcu, rcu_torture_cb);
> >> +}
> >> +
> >>  static void srcu_torture_synchronize(void)
> >>  {
> >>       synchronize_srcu(&srcu_ctl);
> >> @@ -652,7 +657,7 @@ static struct rcu_torture_ops srcu_ops = {
> >>       .read_delay     = srcu_read_delay,
> >>       .readunlock     = srcu_torture_read_unlock,
> >>       .completed      = srcu_torture_completed,
> >> -     .deferred_free  = rcu_sync_torture_deferred_free,
> >> +     .deferred_free  = srcu_torture_deferred_free,
> >>       .sync           = srcu_torture_synchronize,
> >>       .call           = NULL,
> >>       .cb_barrier     = NULL,
> >> @@ -660,6 +665,21 @@ static struct rcu_torture_ops srcu_ops = {
> >>       .name           = "srcu"
> >>  };
> >>
> >> +static struct rcu_torture_ops srcu_sync_ops = {
> >> +     .init           = srcu_torture_init,
> >> +     .cleanup        = srcu_torture_cleanup,
> >> +     .readlock       = srcu_torture_read_lock,
> >> +     .read_delay     = srcu_read_delay,
> >> +     .readunlock     = srcu_torture_read_unlock,
> >> +     .completed      = srcu_torture_completed,
> >> +     .deferred_free  = rcu_sync_torture_deferred_free,
> >> +     .sync           = srcu_torture_synchronize,
> >> +     .call           = NULL,
> >> +     .cb_barrier     = NULL,
> >> +     .stats          = srcu_torture_stats,
> >> +     .name           = "srcu_sync"
> >> +};
> >> +
> >>  static int srcu_torture_read_lock_raw(void) __acquires(&srcu_ctl)
> >>  {
> >>       return srcu_read_lock_raw(&srcu_ctl);
> >> @@ -677,7 +697,7 @@ static struct rcu_torture_ops srcu_raw_ops = {
> >>       .read_delay     = srcu_read_delay,
> >>       .readunlock     = srcu_torture_read_unlock_raw,
> >>       .completed      = srcu_torture_completed,
> >> -     .deferred_free  = rcu_sync_torture_deferred_free,
> >> +     .deferred_free  = srcu_torture_deferred_free,
> >>       .sync           = srcu_torture_synchronize,
> >>       .call           = NULL,
> >>       .cb_barrier     = NULL,
> >> @@ -685,6 +705,21 @@ static struct rcu_torture_ops srcu_raw_ops = {
> >>       .name           = "srcu_raw"
> >>  };
> >>
> >> +static struct rcu_torture_ops srcu_raw_sync_ops = {
> >> +     .init           = srcu_torture_init,
> >> +     .cleanup        = srcu_torture_cleanup,
> >> +     .readlock       = srcu_torture_read_lock_raw,
> >> +     .read_delay     = srcu_read_delay,
> >> +     .readunlock     = srcu_torture_read_unlock_raw,
> >> +     .completed      = srcu_torture_completed,
> >> +     .deferred_free  = rcu_sync_torture_deferred_free,
> >> +     .sync           = srcu_torture_synchronize,
> >> +     .call           = NULL,
> >> +     .cb_barrier     = NULL,
> >> +     .stats          = srcu_torture_stats,
> >> +     .name           = "srcu_raw_sync"
> >> +};
> >> +
> >>  static void srcu_torture_synchronize_expedited(void)
> >>  {
> >>       synchronize_srcu_expedited(&srcu_ctl);
> >> @@ -1673,7 +1708,7 @@ static int rcu_torture_barrier_init(void)
> >>       for (i = 0; i < n_barrier_cbs; i++) {
> >>               init_waitqueue_head(&barrier_cbs_wq[i]);
> >>               barrier_cbs_tasks[i] = kthread_run(rcu_torture_barrier_cbs,
> >> -                                                (void *)i,
> >> +                                                (void *)(long)i,
> >>                                                  "rcu_torture_barrier_cbs");
> >>               if (IS_ERR(barrier_cbs_tasks[i])) {
> >>                       ret = PTR_ERR(barrier_cbs_tasks[i]);
> >> @@ -1857,7 +1892,8 @@ rcu_torture_init(void)
> >>       static struct rcu_torture_ops *torture_ops[] =
> >>               { &rcu_ops, &rcu_sync_ops, &rcu_expedited_ops,
> >>                 &rcu_bh_ops, &rcu_bh_sync_ops, &rcu_bh_expedited_ops,
> >> -               &srcu_ops, &srcu_raw_ops, &srcu_expedited_ops,
> >> +               &srcu_ops, &srcu_sync_ops, &srcu_raw_ops,
> >> +               &srcu_raw_sync_ops, &srcu_expedited_ops,
> >>                 &sched_ops, &sched_sync_ops, &sched_expedited_ops, };
> >>
> >>       mutex_lock(&fullstop_mutex);
> >> diff --git a/kernel/srcu.c b/kernel/srcu.c
> >> index d101ed5..532f890 100644
> >> --- a/kernel/srcu.c
> >> +++ b/kernel/srcu.c
> >> @@ -34,10 +34,60 @@
> >>  #include <linux/delay.h>
> >>  #include <linux/srcu.h>
> >>
> >> +static inline void rcu_batch_init(struct rcu_batch *b)
> >> +{
> >> +     b->head = NULL;
> >> +     b->tail = &b->head;
> >> +}
> >> +
> >> +static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head)
> >> +{
> >> +     *b->tail = head;
> >> +     b->tail = &head->next;
> >> +}
> >> +
> >> +static inline bool rcu_batch_empty(struct rcu_batch *b)
> >> +{
> >> +     return b->tail == &b->head;
> >> +}
> >> +
> >> +static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b)
> >> +{
> >> +     struct rcu_head *head;
> >> +
> >> +     if (rcu_batch_empty(b))
> >> +             return NULL;
> >> +
> >> +     head = b->head;
> >> +     b->head = head->next;
> >> +     if (b->tail == &head->next)
> >> +             rcu_batch_init(b);
> >> +
> >> +     return head;
> >> +}
> >> +
> >> +static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from)
> >> +{
> >> +     if (!rcu_batch_empty(from)) {
> >> +             *to->tail = from->head;
> >> +             to->tail = from->tail;
> >> +             rcu_batch_init(from);
> >> +     }
> >> +}
> >
> > And perhaps this is why you don't want the multi-tailed queue?
> >
> >> +
> >> +/* single-thread state-machine */
> >> +static void process_srcu(struct work_struct *work);
> >> +
> >>  static int init_srcu_struct_fields(struct srcu_struct *sp)
> >>  {
> >>       sp->completed = 0;
> >> -     mutex_init(&sp->mutex);
> >> +     spin_lock_init(&sp->queue_lock);
> >> +     sp->running = false;
> >> +     rcu_batch_init(&sp->batch_queue);
> >> +     rcu_batch_init(&sp->batch_check0);
> >> +     rcu_batch_init(&sp->batch_check1);
> >> +     rcu_batch_init(&sp->batch_done);
> >> +     INIT_DELAYED_WORK(&sp->work, process_srcu);
> >>       sp->per_cpu_ref = alloc_percpu(struct srcu_struct_array);
> >>       return sp->per_cpu_ref ? 0 : -ENOMEM;
> >>  }
> >> @@ -254,11 +304,9 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock);
> >>   * we repeatedly block for 1-millisecond time periods.  This approach
> >>   * has done well in testing, so there is no need for a config parameter.
> >>   */
> >> -#define SYNCHRONIZE_SRCU_READER_DELAY        5
> >> -#define SYNCHRONIZE_SRCU_TRYCOUNT    2
> >> -#define SYNCHRONIZE_SRCU_EXP_TRYCOUNT        12
> >> +#define SRCU_RETRY_CHECK_DELAY       5
> >>
> >> -static void wait_idx(struct srcu_struct *sp, int idx, int trycount)
> >> +static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount)
> >>  {
> >>       /*
> >>        * If a reader fetches the index before the ->completed increment,
> >> @@ -271,19 +319,12 @@ static void wait_idx(struct srcu_struct *sp, int idx, int trycount)
> >>        */
> >>       smp_mb(); /* D */
> >>
> >> -     /*
> >> -      * SRCU read-side critical sections are normally short, so wait
> >> -      * a small amount of time before possibly blocking.
> >> -      */
> >> -     if (!srcu_readers_active_idx_check(sp, idx)) {
> >> -             udelay(SYNCHRONIZE_SRCU_READER_DELAY);
> >> -             while (!srcu_readers_active_idx_check(sp, idx)) {
> >> -                     if (trycount > 0) {
> >> -                             trycount--;
> >> -                             udelay(SYNCHRONIZE_SRCU_READER_DELAY);
> >> -                     } else
> >> -                             schedule_timeout_interruptible(1);
> >> -             }
> >> +     for (;;) {
> >> +             if (srcu_readers_active_idx_check(sp, idx))
> >> +                     break;
> >> +             if (--trycount <= 0)
> >> +                     return false;
> >> +             udelay(SRCU_RETRY_CHECK_DELAY);
> >>       }
> >>
> >>       /*
> >> @@ -297,6 +338,8 @@ static void wait_idx(struct srcu_struct *sp, int idx, int trycount)
> >>        * the next flipping.
> >>        */
> >>       smp_mb(); /* E */
> >> +
> >> +     return true;
> >>  }
> >>
> >>  /*
> >> @@ -308,10 +351,27 @@ static void srcu_flip(struct srcu_struct *sp)
> >>       ACCESS_ONCE(sp->completed)++;
> >>  }
> >>
> >> +void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
> >> +             void (*func)(struct rcu_head *head))
> >> +{
> >> +     unsigned long flags;
> >> +
> >> +     head->next = NULL;
> >> +     head->func = func;
> >> +     spin_lock_irqsave(&sp->queue_lock, flags);
> >> +     rcu_batch_queue(&sp->batch_queue, head);
> >> +     if (!sp->running) {
> >> +             sp->running = true;
> >> +             queue_delayed_work(system_nrt_wq, &sp->work, 0);
> >> +     }
> >> +     spin_unlock_irqrestore(&sp->queue_lock, flags);
> >> +}
> >> +EXPORT_SYMBOL_GPL(call_srcu);
> >> +
> >>  /*
> >>   * Helper function for synchronize_srcu() and synchronize_srcu_expedited().
> >>   */
> >> -static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
> >> +static void __synchronize_srcu(struct srcu_struct *sp)
> >>  {
> >>       rcu_lockdep_assert(!lock_is_held(&sp->dep_map) &&
> >>                          !lock_is_held(&rcu_bh_lock_map) &&
> >> @@ -319,54 +379,7 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
> >>                          !lock_is_held(&rcu_sched_lock_map),
> >>                          "Illegal synchronize_srcu() in same-type SRCU (or RCU) read-side critical section");
> >>
> >> -     mutex_lock(&sp->mutex);
> >> -
> >> -     /*
> >> -      * Suppose that during the previous grace period, a reader
> >> -      * picked up the old value of the index, but did not increment
> >> -      * its counter until after the previous instance of
> >> -      * __synchronize_srcu() did the counter summation and recheck.
> >> -      * That previous grace period was OK because the reader did
> >> -      * not start until after the grace period started, so the grace
> >> -      * period was not obligated to wait for that reader.
> >> -      *
> >> -      * However, the current SRCU grace period does have to wait for
> >> -      * that reader.  This is handled by invoking wait_idx() on the
> >> -      * non-active set of counters (hence sp->completed - 1).  Once
> >> -      * wait_idx() returns, we know that all readers that picked up
> >> -      * the old value of ->completed and that already incremented their
> >> -      * counter will have completed.
> >> -      *
> >> -      * But what about readers that picked up the old value of
> >> -      * ->completed, but -still- have not managed to increment their
> >> -      * counter?  We do not need to wait for those readers, because
> >> -      * they will have started their SRCU read-side critical section
> >> -      * after the current grace period starts.
> >> -      *
> >> -      * Because it is unlikely that readers will be preempted between
> >> -      * fetching ->completed and incrementing their counter, wait_idx()
> >> -      * will normally not need to wait.
> >> -      */
> >> -     wait_idx(sp, (sp->completed - 1) & 0x1, trycount);
> >> -
> >> -     /*
> >> -      * Now that wait_idx() has waited for the really old readers,
> >> -      *
> >> -      * Flip the readers' index by incrementing ->completed, then wait
> >> -      * until there are no more readers using the counters referenced by
> >> -      * the old index value.  (Recall that the index is the bottom bit
> >> -      * of ->completed.)
> >> -      *
> >> -      * Of course, it is possible that a reader might be delayed for the
> >> -      * full duration of flip_idx_and_wait() between fetching the
> >> -      * index and incrementing its counter.  This possibility is handled
> >> -      * by the next __synchronize_srcu() invoking wait_idx() for such
> >> -      * readers before starting a new grace period.
> >> -      */
> >> -     srcu_flip(sp);
> >> -     wait_idx(sp, (sp->completed - 1) & 0x1, trycount);
> >> -
> >> -     mutex_unlock(&sp->mutex);
> >> +     __wait_srcu_gp(sp, call_srcu);
> >>  }
> >>
> >>  /**
> >> @@ -385,7 +398,7 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
> >>   */
> >>  void synchronize_srcu(struct srcu_struct *sp)
> >>  {
> >> -     __synchronize_srcu(sp, SYNCHRONIZE_SRCU_TRYCOUNT);
> >> +     __synchronize_srcu(sp);
> >>  }
> >>  EXPORT_SYMBOL_GPL(synchronize_srcu);
> >>
> >> @@ -406,10 +419,16 @@ EXPORT_SYMBOL_GPL(synchronize_srcu);
> >>   */
> >>  void synchronize_srcu_expedited(struct srcu_struct *sp)
> >>  {
> >> -     __synchronize_srcu(sp, SYNCHRONIZE_SRCU_EXP_TRYCOUNT);
> >> +     __synchronize_srcu(sp);
> >>  }
> >
> > OK, I'll bite...  Why aren't synchronize_srcu_expedited() and
> > synchronize_srcu() different?
> 
> In mb()-based srcu, synchronize_srcu() is very fast, so
> synchronize_srcu_expedited() makes less sense than before.

I am worried about expedited callbacks getting backed up behind
non-expedited callbacks (especially given Peter's point about per-VMA
SRCU callbacks) and behind other workqueue uses.

> But when wait_srcu_gp() is moved back here, I will use
> a bigger "trycount" for synchronize_srcu_expedited().
> 
> And is there any problem with srcu_advance_batches()?

I prefer the use of "return" that you and Peter discussed later.
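
The later discussion is not reproduced in this message, so the following
is only a guess at what an early-return form of the quoted
srcu_advance_batches() might look like. It is a userspace model, not the
patch: struct srcu_model counts callbacks per stage instead of linking
them, and model_check_zero() stands in for try_check_zero() by testing a
per-index reader count. All of these names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy model: each batch is reduced to a callback count. */
struct srcu_model {
	unsigned completed;      /* bottom bit selects the active index */
	int readers[2];          /* readers still using each index */
	int check0, check1, done;
};

/* Stand-in for try_check_zero(): true when no reader uses idx. */
static bool model_check_zero(const struct srcu_model *m, int idx)
{
	return m->readers[idx] == 0;
}

/* Same control flow as the nested-if version, written with early returns. */
static void model_advance(struct srcu_model *m)
{
	int idx = 1 - (m->completed & 0x1);

	if (m->check0 == 0 && m->check1 == 0)
		return; /* nothing waiting for a grace period */
	if (!model_check_zero(m, idx))
		return; /* old readers still active; retry later */

	m->done += m->check1;
	m->check1 = m->check0;
	m->check0 = 0;
	if (m->check1 == 0)
		return; /* no batch needs a flip */

	m->completed++; /* the flip */
	if (!model_check_zero(m, 1 - idx))
		return; /* readers on the pre-flip index; retry later */

	m->done += m->check1;
	m->check1 = 0;
}

/* Runs the model once with a lingering reader; returns callbacks done. */
static int model_demo(void)
{
	struct srcu_model m = { 0, { 1, 0 }, 2, 0, 0 };

	model_advance(&m); /* flips, then stalls on the reader at index 0 */
	if (m.done != 0 || m.check1 != 2 || m.completed != 1)
		return -1;
	m.readers[0] = 0;  /* the reader finishes */
	model_advance(&m); /* the retry completes the batch */
	return m.done;
}
```

Each return marks a point where the state machine gives up for now and
relies on the work item being requeued, which is harder to see in the
nested version.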

What sort of testing are you doing?

							Thanx, Paul

> Thanks.
> Lai
> 
> >
> >                                                        Thanx, Paul
> >
> >>  EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
> >>
> >> +void srcu_barrier(struct srcu_struct *sp)
> >> +{
> >> +     __synchronize_srcu(sp);
> >> +}
> >> +EXPORT_SYMBOL_GPL(srcu_barrier);
> >> +
> >>  /**
> >>   * srcu_batches_completed - return batches completed.
> >>   * @sp: srcu_struct on which to report batch completion.
> >> @@ -423,3 +442,84 @@ long srcu_batches_completed(struct srcu_struct *sp)
> >>       return sp->completed;
> >>  }
> >>  EXPORT_SYMBOL_GPL(srcu_batches_completed);
> >> +
> >> +#define SRCU_CALLBACK_BATCH  10
> >> +#define SRCU_INTERVAL                1
> >> +
> >> +static void srcu_collect_new(struct srcu_struct *sp)
> >> +{
> >> +     if (!rcu_batch_empty(&sp->batch_queue)) {
> >> +             spin_lock_irq(&sp->queue_lock);
> >> +             rcu_batch_move(&sp->batch_check0, &sp->batch_queue);
> >> +             spin_unlock_irq(&sp->queue_lock);
> >> +     }
> >> +}
> >> +
> >> +static void srcu_advance_batches(struct srcu_struct *sp)
> >> +{
> >> +     int idx = 1 - (sp->completed & 0x1UL);
> >> +
> >> +     /*
> >> +      * SRCU read-side critical sections are normally short, so check
> >> +      * twice after a flip.
> >> +      */
> >> +     if (!rcu_batch_empty(&sp->batch_check1) ||
> >> +         !rcu_batch_empty(&sp->batch_check0)) {
> >> +             if (try_check_zero(sp, idx, 1)) {
> >> +                     rcu_batch_move(&sp->batch_done, &sp->batch_check1);
> >> +                     rcu_batch_move(&sp->batch_check1, &sp->batch_check0);
> >> +                     if (!rcu_batch_empty(&sp->batch_check1)) {
> >> +                             srcu_flip(sp);
> >> +                             if (try_check_zero(sp, 1 - idx, 2)) {
> >> +                                     rcu_batch_move(&sp->batch_done,
> >> +                                             &sp->batch_check1);
> >> +                             }
> >> +                     }
> >> +             }
> >> +     }
> >> +}
> >> +
> >> +static void srcu_invoke_callbacks(struct srcu_struct *sp)
> >> +{
> >> +     int i;
> >> +     struct rcu_head *head;
> >> +
> >> +     for (i = 0; i < SRCU_CALLBACK_BATCH; i++) {
> >> +             head = rcu_batch_dequeue(&sp->batch_done);
> >> +             if (!head)
> >> +                     break;
> >> +             head->func(head);
> >> +     }
> >> +}
> >> +
> >> +static void srcu_reschedule(struct srcu_struct *sp)
> >> +{
> >> +     bool running = true;
> >> +
> >> +     if (rcu_batch_empty(&sp->batch_done) &&
> >> +         rcu_batch_empty(&sp->batch_check1) &&
> >> +         rcu_batch_empty(&sp->batch_check0) &&
> >> +         rcu_batch_empty(&sp->batch_queue)) {
> >> +             spin_lock_irq(&sp->queue_lock);
> >> +             if (rcu_batch_empty(&sp->batch_queue)) {
> >> +                     sp->running = false;
> >> +                     running = false;
> >> +             }
> >> +             spin_unlock_irq(&sp->queue_lock);
> >> +     }
> >> +
> >> +     if (running)
> >> +             queue_delayed_work(system_nrt_wq, &sp->work, SRCU_INTERVAL);
> >> +}
> >> +
> >> +static void process_srcu(struct work_struct *work)
> >> +{
> >> +     struct srcu_struct *sp;
> >> +
> >> +     sp = container_of(work, struct srcu_struct, work.work);
> >> +
> >> +     srcu_collect_new(sp);
> >> +     srcu_advance_batches(sp);
> >> +     srcu_invoke_callbacks(sp);
> >> +     srcu_reschedule(sp);
> >> +}
> >>
> >>
> >>
> >
> 

