From: Fam Zheng <famz@redhat.com>
To: Paolo Bonzini <pbonzini@redhat.com>
Cc: borntraeger@de.ibm.com, qemu-devel@nongnu.org, stefanha@redhat.com
Subject: Re: [Qemu-devel] [PATCH 04/15] rcu: add call_rcu
Date: Mon, 26 Jan 2015 14:04:07 +0800 [thread overview]
Message-ID: <20150126060407.GD2354@ad.nay.redhat.com> (raw)
In-Reply-To: <1421938053-10318-5-git-send-email-pbonzini@redhat.com>
On Thu, 01/22 15:47, Paolo Bonzini wrote:
> Asynchronous callbacks provided by call_rcu are particularly important
> for QEMU, because the BQL makes it hard to use synchronize_rcu.
>
> In addition, the current RCU implementation is not particularly friendly
> to multiple concurrent synchronize_rcu callers, making call_rcu even
> more important.
>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
> docs/rcu.txt | 110 +++++++++++++++++++++++++++++++++++++++++++++++--
> include/qemu/rcu.h | 22 ++++++++++
> util/rcu.c | 119 +++++++++++++++++++++++++++++++++++++++++++++++++++++
> 3 files changed, 247 insertions(+), 4 deletions(-)
>
> diff --git a/docs/rcu.txt b/docs/rcu.txt
> index 9938ad3..61752b9 100644
> --- a/docs/rcu.txt
> +++ b/docs/rcu.txt
> @@ -82,7 +82,50 @@ The core RCU API is small:
> Note that it would be valid for another update to come while
> synchronize_rcu is running. Because of this, it is better that
> the updater releases any locks it may hold before calling
> - synchronize_rcu.
> + synchronize_rcu. If this is not possible (for example, because
> + the updater is protected by the BQL), you can use call_rcu.
> +
> + void call_rcu1(struct rcu_head * head,
> + void (*func)(struct rcu_head *head));
> +
> + This function invokes func(head) after all pre-existing RCU
> + read-side critical sections on all threads have completed. This
> + marks the end of the removal phase, with func taking care
> + asynchronously of the reclamation phase.
> +
> + The foo struct needs to have an rcu_head structure added,
> + perhaps as follows:
> +
> + struct foo {
> + struct rcu_head rcu;
> + int a;
> + char b;
> + long c;
> + };
> +
> + so that the reclaimer function can fetch the struct foo address
> + and free it:
> +
> + call_rcu1(&foo.rcu, foo_reclaim);
> +
> + void foo_reclaim(struct rcu_head *rp)
> + {
> + struct foo *fp = container_of(rp, struct foo, rcu);
> + g_free(fp);
> + }
> +
> + For the common case where the rcu_head member is the first of the
> + struct, you can use the following macro.
> +
> + void call_rcu(T *p,
> + void (*func)(T *p),
> + field-name);
> +
> + call_rcu1 is typically used through this macro, in the common case
> + where the "struct rcu_head" is the first field in the struct. In
> + the above case, one could have written simply:
> +
> + call_rcu(foo_reclaim, g_free, rcu);
>
> typeof(*p) atomic_rcu_read(p);
>
> @@ -153,6 +196,11 @@ DIFFERENCES WITH LINUX
> - atomic_rcu_read and atomic_rcu_set replace rcu_dereference and
> rcu_assign_pointer. They take a _pointer_ to the variable being accessed.
>
> +- call_rcu is a macro that has an extra argument (the name of the first
> + field in the struct, which must be a struct rcu_head), and expects the
> + type of the callback's argument to be the type of the first argument.
> + call_rcu1 is the same as Linux's call_rcu.
> +
>
> RCU PATTERNS
> ============
> @@ -206,7 +254,47 @@ The write side looks simply like this (with appropriate locking):
> synchronize_rcu();
> free(old);
>
> -Note that the same idiom would be possible with reader/writer
> +If the processing cannot be done purely within the critical section, it
> +is possible to combine this idiom with a "real" reference count:
> +
> + rcu_read_lock();
> + p = atomic_rcu_read(&foo);
> + foo_ref(p);
> + rcu_read_unlock();
> + /* do something with p. */
> + foo_unref(p);
> +
> +The write side can be like this:
> +
> + qemu_mutex_lock(&foo_mutex);
> + old = foo;
> + atomic_rcu_set(&foo, new);
> + qemu_mutex_unlock(&foo_mutex);
> + synchronize_rcu();
> + foo_unref(old);
> +
> +or with call_rcu:
> +
> + qemu_mutex_lock(&foo_mutex);
> + old = foo;
> + atomic_rcu_set(&foo, new);
> + qemu_mutex_unlock(&foo_mutex);
> + call_rcu(foo_unref, old, rcu);
> +
> +In both cases, the write side only performs removal. Reclamation
> +happens when the last reference to a "foo" object is dropped.
> +Using synchronize_rcu() is undesirably expensive, because the
> +last reference may be dropped on the read side. Hence you can
> +use call_rcu() instead:
> +
> + foo_unref(struct foo *p) {
> + if (atomic_fetch_dec(&p->refcount) == 1) {
> + call_rcu(foo_destroy, p, rcu);
> + }
> + }
> +
> +
> +Note that the same idioms would be possible with reader/writer
> locks:
>
> read_lock(&foo_rwlock); write_mutex_lock(&foo_rwlock);
> @@ -216,13 +304,27 @@ locks:
> write_mutex_unlock(&foo_rwlock);
> free(p);
>
> + ------------------------------------------------------------------
> +
> + read_lock(&foo_rwlock); write_mutex_lock(&foo_rwlock);
> + p = foo; old = foo;
> + foo_ref(p); foo = new;
> + read_unlock(&foo_rwlock); foo_unref(old);
> + /* do something with p. */ write_mutex_unlock(&foo_rwlock);
> + read_lock(&foo_rwlock);
> + foo_unref(p);
> + read_unlock(&foo_rwlock);
> +
> +foo_unref could use a mechanism such as bottom halves to move deallocation
> +out of the write-side critical section.
> +
>
> RCU resizable arrays
> --------------------
>
> Resizable arrays can be used with RCU. The expensive RCU synchronization
> -only needs to take place when the array is resized. The two items to
> -take care of are:
> +(or call_rcu) only needs to take place when the array is resized.
> +The two items to take care of are:
>
> - ensuring that the old version of the array is available between removal
> and reclamation;
> diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
> index da043f2..068a279 100644
> --- a/include/qemu/rcu.h
> +++ b/include/qemu/rcu.h
> @@ -118,6 +118,28 @@ extern void synchronize_rcu(void);
> extern void rcu_register_thread(void);
> extern void rcu_unregister_thread(void);
>
> +struct rcu_head;
> +typedef void RCUCBFunc(struct rcu_head *head);
> +
> +struct rcu_head {
> + struct rcu_head *next;
> + RCUCBFunc *func;
> +};
> +
> +extern void call_rcu1(struct rcu_head *head, RCUCBFunc *func);
> +
> +/* The operands of the minus operator must have the same type,
> + * which must be the one that we specify in the cast.
> + */
> +#define call_rcu(head, func, field) \
> + call_rcu1(({ \
> + char __attribute__((unused)) \
> + offset_must_be_zero[-offsetof(typeof(*(head)), field)], \
> + func_type_invalid = (func) - (void (*)(typeof(head)))(func); \
> + &(head)->field; \
> + }), \
> + (RCUCBFunc *)(func))
> +
> #ifdef __cplusplus
> }
> #endif
> diff --git a/util/rcu.c b/util/rcu.c
> index 1f737d5..c9c3e6e 100644
> --- a/util/rcu.c
> +++ b/util/rcu.c
> @@ -26,6 +26,7 @@
> * IBM's contributions to this file may be relicensed under LGPLv2 or later.
> */
>
> +#include "qemu-common.h"
> #include <stdio.h>
> #include <assert.h>
> #include <stdlib.h>
> @@ -33,6 +34,7 @@
> #include <errno.h>
> #include "qemu/rcu.h"
> #include "qemu/atomic.h"
> +#include "qemu/thread.h"
>
> /*
> * Global grace period counter. Bit 0 is always one in rcu_gp_ctr.
> @@ -149,6 +151,116 @@ void synchronize_rcu(void)
> qemu_mutex_unlock(&rcu_gp_lock);
> }
>
> +
> +#define RCU_CALL_MIN_SIZE 30
> +
> +/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
> + * from liburcu. Note that head is only used by the consumer.
> + */
> +static struct rcu_head dummy;
> +static struct rcu_head *head = &dummy, **tail = &dummy.next;
> +static int rcu_call_count;
> +static QemuEvent rcu_call_ready_event;
> +
> +static void enqueue(struct rcu_head *node)
> +{
> + struct rcu_head **old_tail;
> +
> + node->next = NULL;
> + old_tail = atomic_xchg(&tail, &node->next);
> + atomic_mb_set(old_tail, node);
> +}
> +
> +static struct rcu_head *try_dequeue(void)
> +{
> + struct rcu_head *node, *next;
> +
> +retry:
> + /* Test for an empty list, which we do not expect. Note that for
> + * the consumer head and tail are always consistent. The head
> + * is consistent because only the consumer reads/writes it.
> + * The tail, because it is the first step in the enqueuing.
> + * It is only the next pointers that might be inconsistent.
> + */
> + if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
> + abort();
> + }
> +
> + /* If the head node has NULL in its next pointer, the value is
> + * wrong and we need to wait until its enqueuer finishes the update.
> + */
> + node = head;
> + next = atomic_mb_read(&head->next);
> + if (!next) {
> + return NULL;
> + }
> +
> + /* Since we are the sole consumer, and we excluded the empty case
> + * above, the queue will always have at least two nodes: the
> + * dummy node, and the one being removed. So we do not need to update
> + * the tail pointer.
> + */
> + head = next;
> +
> + /* If we dequeued the dummy node, add it back at the end and retry. */
> + if (node == &dummy) {
> + enqueue(node);
> + goto retry;
> + }
> +
> + return node;
> +}
> +
> +static void *call_rcu_thread(void *opaque)
> +{
> + struct rcu_head *node;
> +
> + for (;;) {
> + int tries = 0;
> + int n = atomic_read(&rcu_call_count);
> +
> + /* Heuristically wait for a decent number of callbacks to pile up.
> + * Fetch rcu_call_count now, we only must process elements that were
> + * added before synchronize_rcu() starts.
> + */
> + while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
> + g_usleep(100000);
> + qemu_event_reset(&rcu_call_ready_event);
> + n = atomic_read(&rcu_call_count);
> + if (n < RCU_CALL_MIN_SIZE) {
> + qemu_event_wait(&rcu_call_ready_event);
> + n = atomic_read(&rcu_call_count);
> + }
> + }
> +
> + atomic_sub(&rcu_call_count, n);
> + synchronize_rcu();
> + while (n > 0) {
> + node = try_dequeue();
> + while (!node) {
> + qemu_event_reset(&rcu_call_ready_event);
> + node = try_dequeue();
> + if (!node) {
> + qemu_event_wait(&rcu_call_ready_event);
> + node = try_dequeue();
> + }
> + }
> +
> + n--;
> + node->func(node);
> + }
> + }
> + abort();
> +}
> +
> +void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
> +{
> + node->func = func;
> + enqueue(node);
> + atomic_inc(&rcu_call_count);
> + qemu_event_set(&rcu_call_ready_event);
> +}
> +
> void rcu_register_thread(void)
> {
> assert(rcu_reader.ctr == 0);
> @@ -166,7 +278,14 @@ void rcu_unregister_thread(void)
>
> static void __attribute__((__constructor__)) rcu_init(void)
> {
> + QemuThread thread;
> +
> qemu_mutex_init(&rcu_gp_lock);
> qemu_event_init(&rcu_gp_event, true);
> +
> + qemu_event_init(&rcu_call_ready_event, false);
> + qemu_thread_create(&thread, "call_rcu", call_rcu_thread,
> + NULL, QEMU_THREAD_DETACHED);
> +
> rcu_register_thread();
> }
> --
> 1.8.3.1
>
>
>
Reviewed-by: Fam Zheng <famz@redhat.com>
next prev parent reply other threads:[~2015-01-26 6:04 UTC|newest]
Thread overview: 27+ messages / expand[flat|nested] mbox.gz Atom feed top
2015-01-22 14:47 [Qemu-devel] [PATCH v2 00/15] RCUification of the memory API, parts 1 and 2 Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 01/15] rcu: add rcu library Paolo Bonzini
2015-01-26 3:13 ` Fam Zheng
2015-01-22 14:47 ` [Qemu-devel] [PATCH 02/15] rcu: add rcutorture Paolo Bonzini
2015-01-26 3:31 ` Fam Zheng
2015-01-22 14:47 ` [Qemu-devel] [PATCH 03/15] rcu: allow nesting of rcu_read_lock/rcu_read_unlock Paolo Bonzini
2015-01-26 3:32 ` Fam Zheng
2015-01-22 14:47 ` [Qemu-devel] [PATCH 04/15] rcu: add call_rcu Paolo Bonzini
2015-01-26 6:04 ` Fam Zheng [this message]
2015-01-22 14:47 ` [Qemu-devel] [PATCH 05/15] memory: remove assertion on memory_region_destroy Paolo Bonzini
2015-01-26 6:04 ` Fam Zheng
2015-01-22 14:47 ` [Qemu-devel] [PATCH 06/15] memory: protect current_map by RCU Paolo Bonzini
2015-01-26 6:16 ` Fam Zheng
2015-01-22 14:47 ` [Qemu-devel] [PATCH 07/15] memory: avoid ref/unref in memory_region_find Paolo Bonzini
2015-01-26 6:24 ` Fam Zheng
2015-01-26 9:19 ` Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 08/15] exec: introduce cpu_reload_memory_map Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 09/15] exec: make iotlb RCU-friendly Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 10/15] exec: RCUify AddressSpaceDispatch Paolo Bonzini
2015-01-28 5:45 ` Fam Zheng
2015-01-28 9:44 ` Paolo Bonzini
2015-02-03 10:07 ` Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 11/15] rcu: introduce RCU-enabled QLIST Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 12/15] exec: protect mru_block with RCU Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 13/15] cosmetic changes preparing for the following patches Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 14/15] exec: convert ram_list to QLIST Paolo Bonzini
2015-01-22 14:47 ` [Qemu-devel] [PATCH 15/15] Convert ram_list to RCU Paolo Bonzini
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20150126060407.GD2354@ad.nay.redhat.com \
--to=famz@redhat.com \
--cc=borntraeger@de.ibm.com \
--cc=pbonzini@redhat.com \
--cc=qemu-devel@nongnu.org \
--cc=stefanha@redhat.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.