From: Peter Xu <peterx@redhat.com>
To: Akihiko Odaki <odaki@rsg.ci.i.u-tokyo.ac.jp>
Cc: qemu-devel@nongnu.org,
	"Dmitry Osipenko" <dmitry.osipenko@collabora.com>,
	"Paolo Bonzini" <pbonzini@redhat.com>,
	"Michael S. Tsirkin" <mst@redhat.com>,
	"Alex Bennée" <alex.bennee@linaro.org>
Subject: Re: [PATCH 4/5] rcu: Wake the RCU thread when draining
Date: Wed, 5 Nov 2025 15:43:42 -0500
Message-ID: <aQu2_izqViAbJ3A9@x1.local>
In-Reply-To: <38c07d37-1b4d-42d2-ab41-fbe1c860dd3b@rsg.ci.i.u-tokyo.ac.jp>

On Mon, Nov 03, 2025 at 06:45:30PM +0900, Akihiko Odaki wrote:
> On 2025/10/30 3:22, Peter Xu wrote:
> > On Wed, Oct 29, 2025 at 03:12:48PM +0900, Akihiko Odaki wrote:
> > > drain_call_rcu() triggers the force quiescent state, but it can be
> > > delayed if the RCU thread is sleeping. Ensure the force quiescent state
> > > is immediately triggered by waking the RCU thread up.
> > > 
> > > The logic to trigger the force quiescent state is decoupled as
> > > force_rcu() so that it can be used independently.
> > > 
> > > Signed-off-by: Akihiko Odaki <odaki@rsg.ci.i.u-tokyo.ac.jp>
> > > ---
> > >   include/qemu/rcu.h |   1 +
> > >   util/rcu.c         | 106 ++++++++++++++++++++++++++++++++---------------------
> > >   2 files changed, 65 insertions(+), 42 deletions(-)
> > > 
> > > diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
> > > index 020dbe4d8b77..d6aa4e5854d3 100644
> > > --- a/include/qemu/rcu.h
> > > +++ b/include/qemu/rcu.h
> > > @@ -118,6 +118,7 @@ static inline void rcu_read_unlock(void)
> > >       }
> > >   }
> > > +void force_rcu(void);
> > >   void synchronize_rcu(void);
> > >   /*
> > > diff --git a/util/rcu.c b/util/rcu.c
> > > index 3c4af9d213c8..85f9333f5dff 100644
> > > --- a/util/rcu.c
> > > +++ b/util/rcu.c
> > > @@ -49,10 +49,13 @@
> > >   unsigned long rcu_gp_ctr = RCU_GP_LOCKED;
> > >   QemuEvent rcu_gp_event;
> > > -static int in_drain_call_rcu;
> > > +static bool forced;
> > >   static int rcu_call_count;
> > >   static QemuMutex rcu_registry_lock;
> > > +/* Set when the forced variable is set or rcu_call_count becomes non-zero. */
> > > +static QemuEvent sync_event;
> > > +
> > >   /*
> > >    * Check whether a quiescent state was crossed between the beginning of
> > >    * update_counter_and_wait and now.
> > > @@ -74,36 +77,21 @@ QEMU_DEFINE_CO_TLS(struct rcu_reader_data, rcu_reader)
> > >   typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
> > >   static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
> > > +void force_rcu(void)
> > > +{
> > > +    qatomic_set(&forced, true);
> > > +    qemu_event_set(&sync_event);
> > > +}
> > > +
> > >   /* Wait for previous parity/grace period to be empty of readers.  */
> > > -static void wait_for_readers(void)
> > > +static void wait_for_readers(bool sleep)
> > >   {
> > >       ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
> > >       struct rcu_reader_data *index, *tmp;
> > > -    int sleeps = 0;
> > > -    bool forced = false;
> > > +    int sleeps = sleep ? 5 : 0;
> > > +    bool waiting = false;
> > >       for (;;) {
> > > -        /*
> > > -         * Force the grace period to end and wait for it if any of the
> > > -         * following heuristical conditions are satisfied:
> > > -         * - A decent number of callbacks piled up.
> > > -         * - It timed out.
> > > -         * - It is in a drain_call_rcu() call.
> > > -         *
> > > -         * Otherwise, periodically poll the grace period, hoping it ends
> > > -         * promptly.
> > > -         */
> > > -        if (!forced &&
> > > -            (qatomic_read(&rcu_call_count) >= RCU_CALL_MIN_SIZE ||
> > > -             sleeps >= 5 || qatomic_read(&in_drain_call_rcu))) {
> > > -            forced = true;
> > > -
> > > -            QLIST_FOREACH(index, &registry, node) {
> > > -                notifier_list_notify(&index->force_rcu, NULL);
> > > -                qatomic_set(&index->waiting, true);
> > > -            }
> > > -        }
> > 
> > IIUC this is the part that sets index->waiting whenever necessary, which
> > then guarantees that the wait(rcu_gp_event) will be notified in rcu
> > unlock.
> > 
> > Now that this chunk is removed, could it happen that waiting=true but the
> > wait(rcu_gp_event) waits for longer than it should (as nobody will wake
> > it up if all threads have waiting=false)?
> 
> It is not removed; it is moved along with the assignment of the waiting
> variable. So index->waiting is still set whenever the waiting variable is
> set, and no hang will happen.

Ah, I noticed that "waiting" is actually the local variable in
wait_for_readers(), and I think when I read it last time I somehow misread
it as "sleep" (which is passed down from the caller).

        if (waiting) {  <--------------------
            qemu_event_wait(&rcu_gp_event);
            qemu_event_reset(&rcu_gp_event);
        } else if (qatomic_read(&rcu_call_count) >= RCU_CALL_MIN_SIZE ||

Yeah, I think it's a non-issue.  Please ignore my question.
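
For reference, this is the unlock side that pairs with index->waiting
(abridged, quoting include/qemu/rcu.h from memory, so take the details
with a grain of salt):

    static inline void rcu_read_unlock(void)
    {
        struct rcu_reader_data *p_rcu_reader = get_ptr_rcu_reader();

        /* ... nesting depth handling elided ... */

        qatomic_store_release(&p_rcu_reader->ctr, 0);

        /* Write p_rcu_reader->ctr before reading p_rcu_reader->waiting. */
        smp_mb_placeholder();
        if (unlikely(qatomic_read(&p_rcu_reader->waiting))) {
            qatomic_set(&p_rcu_reader->waiting, false);
            qemu_event_set(&rcu_gp_event);
        }
    }

So as long as index->waiting is set before we re-check index->ctr, any
reader still inside its critical section will wake us up.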

> 
> > 
> > The other thing is, right below here there's the code and comment:
> > 
> >          /* Here, order the stores to index->waiting before the loads of
> >           * index->ctr.  Pairs with smp_mb_placeholder() in rcu_read_unlock(),
> >           * ensuring that the loads of index->ctr are sequentially consistent.
> >           *
> >           * If this is the last iteration, this barrier also prevents
> >           * frees from seeping upwards, and orders the two wait phases
> >           * on architectures with 32-bit longs; see enter_qs().
> >           */
> >          smp_mb_global();
> > 
> > IIUC it explains that the smp_mb_global() orders the updates of waiting
> > against the reads of index->ctr.  That doesn't look applicable anymore.
> > That said, I think we still need some barrier to make sure we read
> > index->ctr only after we update the global gp_ctr (done before calling
> > wait_for_readers()).  I'm not sure whether the mb is still needed, but
> > if it is, at least the comment looks outdated.
> 
> It is still applicable. The stores to index->waiting are still present and
> need to be ordered before the loads of index->ctr.
> 
> > 
> > > -
> > >           /* Here, order the stores to index->waiting before the loads of
> > >            * index->ctr.  Pairs with smp_mb_placeholder() in rcu_read_unlock(),
> > >            * ensuring that the loads of index->ctr are sequentially consistent.
> > > @@ -150,7 +138,8 @@ static void wait_for_readers(void)
> > >            */
> > >           qemu_mutex_unlock(&rcu_registry_lock);
> > > -        if (forced) {
> > > +        if (waiting) {
> > > +            /* Wait for the forced quiescent state. */
> > >               qemu_event_wait(&rcu_gp_event);
> > >               /*
> > > @@ -158,9 +147,25 @@ static void wait_for_readers(void)
> > >                * while we walk the list.
> > >                */
> > >               qemu_event_reset(&rcu_gp_event);
> > > +        } else if (qatomic_read(&rcu_call_count) >= RCU_CALL_MIN_SIZE ||
> > > +                   !sleeps || qemu_event_timedwait(&sync_event, 10)) {
> > > +            /*
> > > +             * Now one of the following heuristical conditions is satisfied:
> > > +             * - A decent number of callbacks piled up.
> > > +             * - It timed out.
> > > +             * - force_rcu() was called.
> > > +             *
> > > +             * Force a quiescent state.
> > > +             */
> > > +            waiting = true;
> > > +
> > > +            QLIST_FOREACH(index, &registry, node) {
> > > +                notifier_list_notify(&index->force_rcu, NULL);
> > > +                qatomic_set(&index->waiting, true);
> > > +            }
> > >           } else {
> > > -            g_usleep(10000);
> > > -            sleeps++;
> > > +            /* Try again. */
> > > +            sleeps--;
> > >           }
> > >           qemu_mutex_lock(&rcu_registry_lock);
> > > @@ -170,7 +175,7 @@ static void wait_for_readers(void)
> > >       QLIST_SWAP(&registry, &qsreaders, node);
> > >   }
> > > -static void enter_qs(void)
> > > +static void enter_qs(bool sleep)
> > >   {
> > >       /* Write RCU-protected pointers before reading p_rcu_reader->ctr.
> > >        * Pairs with smp_mb_placeholder() in rcu_read_lock().
> > > @@ -189,14 +194,14 @@ static void enter_qs(void)
> > >                * Switch parity: 0 -> 1, 1 -> 0.
> > >                */
> > >               qatomic_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
> > > -            wait_for_readers();
> > > +            wait_for_readers(sleep);
> > >               qatomic_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
> > >           } else {
> > >               /* Increment current grace period.  */
> > >               qatomic_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
> > >           }
> > > -        wait_for_readers();
> > > +        wait_for_readers(sleep);
> > >       }
> > >   }
> > > @@ -205,7 +210,6 @@ static void enter_qs(void)
> > >    */
> > >   static struct rcu_head dummy;
> > >   static struct rcu_head *head = &dummy, **tail = &dummy.next;
> > > -static QemuEvent rcu_call_ready_event;
> > >   static void enqueue(struct rcu_head *node)
> > >   {
> > > @@ -282,6 +286,7 @@ static void *call_rcu_thread(void *opaque)
> > >       rcu_register_thread();
> > >       for (;;) {
> > > +        bool sleep = true;
> > >           int n;
> > >           /*
> > > @@ -289,7 +294,7 @@ static void *call_rcu_thread(void *opaque)
> > >            * added before enter_qs() starts.
> > >            */
> > >           for (;;) {
> > > -            qemu_event_reset(&rcu_call_ready_event);
> > > +            qemu_event_reset(&sync_event);
> > >               n = qatomic_read(&rcu_call_count);
> > >               if (n) {
> > >                   break;
> > > @@ -298,20 +303,36 @@ static void *call_rcu_thread(void *opaque)
> > >   #if defined(CONFIG_MALLOC_TRIM)
> > >               malloc_trim(4 * 1024 * 1024);
> > >   #endif
> > > -            qemu_event_wait(&rcu_call_ready_event);
> > > +            qemu_event_wait(&sync_event);
> > > +        }
> > > +
> > > +        /*
> > > +         * Ensure that an event for a rcu_call_count change will not interrupt
> > > +         * wait_for_readers().
> > > +         */
> > > +        qemu_event_reset(&sync_event);
> > > +
> > > +        /*
> > > +         * Ensure that the forced variable has not been set after fetching
> > > +         * rcu_call_count; otherwise we may get confused by a force quiescent
> > > +         * state request for an element later than n.
> > > +         */
> > > +        while (qatomic_xchg(&forced, false)) {
> > > +            sleep = false;
> > > +            n = qatomic_read(&rcu_call_count);
> > >           }
> > 
> > This is pretty tricky, and I wonder if it would make the code easier to
> > read if we converted sync_event into a semaphore instead.  As a sem, it
> > would account for every kick, whether from a call_rcu1() or a forced rcu
> > flush, so we wouldn't need to reset it.  Meanwhile, we wouldn't need to
> > worry about a slightly outdated "n" read because the 2nd round of
> > sem_wait() would catch the new "n".
> > 
> > Instead, the worst case is that the rcu thread runs one more round
> > without seeing callbacks on the queue.
> > 
> > I'm not sure if that could help simplify the code, and maybe also make
> > it less error-prone.
> 
> A semaphore is not applicable here because it will not de-duplicate
> concurrent kicks of the RCU thread.

Why are concurrent kicks of the rcu thread a problem?  QemuSemaphore is
thread-safe itself; meanwhile IIUC they would only cause call_rcu_thread()
to loop a few more rounds reading "n", which looks all safe.  No?
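
To be concrete, here is an untested sketch of the wait loop I had in mind
("sync_sem" is a made-up name; qemu_sem_wait()/qemu_sem_post() are the
existing QemuSemaphore primitives), assuming call_rcu1() and force_rcu()
both do qemu_sem_post(&sync_sem):

    /* Inside call_rcu_thread(), replacing the event-based wait loop. */
    for (;;) {
        n = qatomic_read(&rcu_call_count);
        if (n) {
            break;
        }
        /*
         * Consume one kick.  A spurious or duplicated kick only costs
         * one extra pass over rcu_call_count, which is harmless.
         */
        qemu_sem_wait(&sync_sem);
    }

No reset is needed, and a slightly stale "n" is simply caught by the next
round of qemu_sem_wait().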

> 
> > 
> > > -        enter_qs();
> > > +        enter_qs(sleep);
> > >           qatomic_sub(&rcu_call_count, n);
> > >           bql_lock();
> > >           while (n > 0) {
> > >               node = try_dequeue();
> > >               while (!node) {
> > 
> > I have a pure question here, not relevant to your changes... do you know
> > when this "while (!node)" will trigger?  It seems to me the enqueue()
> > should always happen before the increment of rcu_call_count:
> > 
> > void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
> > {
> >      node->func = func;
> >      enqueue(node);
> > 
> >      if (!qatomic_fetch_inc(&rcu_call_count)) {
> >          qemu_event_set(&sync_event);
> >      }
> > }
> > 
> > I believe qatomic_fetch_inc() is RMW, so it's a full barrier and ordering
> > is guaranteed.  Then why can the node here be NULL even if we're sure
> > >= n entries have been enqueued?
> 
> Indeed, enqueue() always happens before the increment of rcu_call_count
> performed by the same thread.
> 
> The node can still be NULL when there are two concurrent call_rcu1()
> executions. In the following example, rcu_call_count will be greater than
> the number of visible nodes after (A) and before (B):
> 
> Thread T                 Thread U
> call_rcu1(O)
>  enqueue(O)
>   Load N from tail
>   tail = O->next
>                          call_rcu1(P)
>                           enqueue(P)
>                            Load O->next from tail
>                            tail = P
>                            O->next = P
>                           rcu_call_count++ (A)
>   N->next = O (B)
>   rcu_call_count++

Thanks, yeah, it makes sense.  If you think it's worthwhile, maybe we could
add a comment after the first try_dequeue().
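
Something like this, for example (just a strawman wording):

    node = try_dequeue();
    /*
     * Even though we know n callbacks were counted, the node can be
     * NULL here: a concurrent call_rcu1() may increment rcu_call_count
     * before its predecessor finishes linking its node into the list
     * (see the interleaving above), so wait until the writer catches
     * up.
     */
    while (!node) {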

-- 
Peter Xu


