qemu-devel.nongnu.org archive mirror
* [Qemu-devel] [RFC PATCH 00/13] RCU implementation for QEMU
@ 2011-08-15 21:08 Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 01/13] add smp_mb() Paolo Bonzini
                   ` (12 more replies)
  0 siblings, 13 replies; 16+ messages in thread
From: Paolo Bonzini @ 2011-08-15 21:08 UTC (permalink / raw)
  To: qemu-devel

Hi,

this is a proof of concept of embedding RCU within QEMU.  While I had
already played a bit with liburcu (from which the algorithms are taken)
a while ago, discussions on the migration thread motivated me to
"finish" this series on the way to KVM Forum.

Unfortunately we cannot use liburcu directly for three reasons: 1) it
is not portable to Windows; 2) it is hardly packaged by distributions;
3) we have special needs for call_rcu to run callbacks under the global
lock, so we need to reimplement at least that part.  The amount of
duplicated code is quite small (around 400 lines of heavily-commented
code), so I think it is acceptable to do that.  The APIs are of course
compatible, so we can always switch later on.

Patches 1 to 4 are preparatory and add some synchronization primitives
that are missing in qemu-threads and used later on.  These wrap liburcu's
use of futexes so that we can make the code portable.  It should be
possible, but not easy, to convince upstream to switch.

Patches 5 to 6 add the main library and a test program.

Patches 7 and 8 add the QEMU-specific call_rcu implementation.  It also
uses some techniques from upstream, but it is more or less self-contained.

Patch 9 is an optimization that I have submitted upstream and will
hopefully be included there too.

Patch 10 marks quiescent states in QEMU's threads.  Patch 11 converts
the iohandlers to RCU---not particularly useful, but it lets us test
all this nice stuff, and especially the call_rcu machinery.  Finally,
patches 12 and 13 are an initial attempt at RCUifying the memory list
with zero overhead for the _write_ sides of the TCG execution threads.

Incomplet, but hopefully not incorrekt (upstream has a formal model of
the core RCU wakeup functionality, and I could adapt it to model this
stuff too), so I thought I'd just throw this out.

Paolo Bonzini (13):
  add smp_mb()
  rename qemu_event_{init,read}
  qemu-threads: add QemuEvent
  qemu-threads: add QemuOnce
  add rcu library
  rcu: add rcutorture
  osdep: add qemu_msleep
  add call_rcu support
  rcu: avoid repeated system calls
  rcu: report quiescent states
  rcuify iohandlers
  split MRU ram list
  RCUify ram_list

 Makefile.objs       |    4 +-
 arch_init.c         |   14 ++
 compiler.h          |    2 +
 cpu-all.h           |    8 +-
 cpus.c              |   24 ++-
 exec.c              |  140 +++++++++++-----
 iohandler.c         |   45 +++---
 kvm-all.c           |    3 +
 os-win32.c          |    3 +
 osdep.h             |    1 +
 oslib-posix.c       |    7 +-
 oslib-win32.c       |    5 +
 qemu-barrier.h      |    2 +
 qemu-queue.h        |   11 ++
 qemu-thread-posix.c |  129 +++++++++++++++
 qemu-thread-posix.h |   13 ++
 qemu-thread-win32.c |   45 ++++++
 qemu-thread-win32.h |    9 +
 qemu-thread.h       |   11 ++
 rcu-call.c          |  189 ++++++++++++++++++++++
 rcu-pointer.h       |  119 ++++++++++++++
 rcu.c               |  226 +++++++++++++++++++++++++++
 rcu.h               |  145 +++++++++++++++++
 rcutorture.c        |  433 +++++++++++++++++++++++++++++++++++++++++++++++++++
 vl.c                |    4 +
 25 files changed, 1513 insertions(+), 79 deletions(-)
 create mode 100644 rcu-call.c
 create mode 100644 rcu-pointer.h
 create mode 100644 rcu.c
 create mode 100644 rcu.h
 create mode 100644 rcutorture.c

-- 
1.7.6

^ permalink raw reply	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH 01/13] add smp_mb()
  2011-08-15 21:08 [Qemu-devel] [RFC PATCH 00/13] RCU implementation for QEMU Paolo Bonzini
@ 2011-08-15 21:08 ` Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 02/13] rename qemu_event_{init,read} Paolo Bonzini
                   ` (11 subsequent siblings)
  12 siblings, 0 replies; 16+ messages in thread
From: Paolo Bonzini @ 2011-08-15 21:08 UTC (permalink / raw)
  To: qemu-devel

We'll need a full memory barrier, and __sync_synchronize() is
just too ugly to type.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 qemu-barrier.h |    2 ++
 1 files changed, 2 insertions(+), 0 deletions(-)

diff --git a/qemu-barrier.h b/qemu-barrier.h
index b77fce2..7bd5268 100644
--- a/qemu-barrier.h
+++ b/qemu-barrier.h
@@ -4,6 +4,8 @@
 /* FIXME: arch dependant, x86 version */
 #define smp_wmb()   asm volatile("" ::: "memory")
 
+#define smp_mb()    __sync_synchronize()
+
 /* Compiler barrier */
 #define barrier()   asm volatile("" ::: "memory")
 
-- 
1.7.6

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH 02/13] rename qemu_event_{init,read}
  2011-08-15 21:08 [Qemu-devel] [RFC PATCH 00/13] RCU implementation for QEMU Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 01/13] add smp_mb() Paolo Bonzini
@ 2011-08-15 21:08 ` Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 03/13] qemu-threads: add QemuEvent Paolo Bonzini
                   ` (10 subsequent siblings)
  12 siblings, 0 replies; 16+ messages in thread
From: Paolo Bonzini @ 2011-08-15 21:08 UTC (permalink / raw)
  To: qemu-devel

The name qemu_event_init clashes with the QemuEvent API introduced in the
next patch, so rename the main-loop helpers to qemu_eventfd_{init,read}.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 cpus.c |   12 ++++++------
 1 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/cpus.c b/cpus.c
index 6bf4e3f..73e17a1 100644
--- a/cpus.c
+++ b/cpus.c
@@ -305,7 +305,7 @@ static void qemu_event_increment(void)
     }
 }
 
-static void qemu_event_read(void *opaque)
+static void qemu_eventfd_read(void *opaque)
 {
     int fd = (intptr_t)opaque;
     ssize_t len;
@@ -317,7 +317,7 @@ static void qemu_event_read(void *opaque)
     } while ((len == -1 && errno == EINTR) || len == sizeof(buffer));
 }
 
-static int qemu_event_init(void)
+static int qemu_eventfd_init(void)
 {
     int err;
     int fds[2];
@@ -334,7 +334,7 @@ static int qemu_event_init(void)
     if (err < 0) {
         goto fail;
     }
-    qemu_set_fd_handler2(fds[0], NULL, qemu_event_read, NULL,
+    qemu_set_fd_handler2(fds[0], NULL, qemu_eventfd_read, NULL,
                          (void *)(intptr_t)fds[0]);
 
     io_thread_fd = fds[1];
@@ -500,7 +500,7 @@ static void dummy_event_handler(void *opaque)
 {
 }
 
-static int qemu_event_init(void)
+static int qemu_eventfd_init(void)
 {
     qemu_event_handle = CreateEvent(NULL, FALSE, FALSE, NULL);
     if (!qemu_event_handle) {
@@ -547,7 +547,7 @@ int qemu_init_main_loop(void)
 
     qemu_init_sigbus();
 
-    return qemu_event_init();
+    return qemu_eventfd_init();
 }
 
 void qemu_main_loop_start(void)
@@ -664,7 +664,7 @@ int qemu_init_main_loop(void)
     }
 
     /* Note eventfd must be drained before signalfd handlers run */
-    ret = qemu_event_init();
+    ret = qemu_eventfd_init();
     if (ret) {
         return ret;
     }
-- 
1.7.6

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH 03/13] qemu-threads: add QemuEvent
  2011-08-15 21:08 [Qemu-devel] [RFC PATCH 00/13] RCU implementation for QEMU Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 01/13] add smp_mb() Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 02/13] rename qemu_event_{init,read} Paolo Bonzini
@ 2011-08-15 21:08 ` Paolo Bonzini
  2011-08-17 17:09   ` Blue Swirl
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 04/13] qemu-threads: add QemuOnce Paolo Bonzini
                   ` (9 subsequent siblings)
  12 siblings, 1 reply; 16+ messages in thread
From: Paolo Bonzini @ 2011-08-15 21:08 UTC (permalink / raw)
  To: qemu-devel

This emulates Win32 manual-reset events using futexes or condition
variables.  Typical ways to use them are with multi-producer,
single-consumer data structures, to test for a complex condition whose
elements come from different threads:

    for (;;) {
        qemu_event_reset(ev);
        ... test complex condition ...
        if (condition is true) {
            break;
        }
        qemu_event_wait(ev);
    }

Alternatively:

    ... compute condition ...
    if (condition) {
        do {
            qemu_event_wait(ev);
            qemu_event_reset(ev);
            ... compute condition ...
        } while(condition);
        qemu_event_set(ev);
    }

QemuEvent provides a very fast userspace path in the common case when
no other thread is waiting, or the event is not changing state.  It
is used to report RCU quiescent states to the thread calling
synchronize_rcu (the latter being the single consumer), and to report
call_rcu invocations to the thread that receives them.
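
For illustration, the producer side of the first pattern could look like
this (the work list, its lock and the event variable are hypothetical
names, not part of this series):

    /* Publish an item, then kick the single consumer.  qemu_event_set
     * stays in userspace unless the consumer is actually waiting.  */
    qemu_mutex_lock(&work_lock);
    QLIST_INSERT_HEAD(&work_list, item, next);
    qemu_mutex_unlock(&work_lock);
    qemu_event_set(&work_ev);

The consumer then runs the reset/test/wait loop shown above, with the
"complex condition" being simply "the work list is not empty".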

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 qemu-thread-posix.c |  124 +++++++++++++++++++++++++++++++++++++++++++++++++++
 qemu-thread-posix.h |    8 +++
 qemu-thread-win32.c |   26 +++++++++++
 qemu-thread-win32.h |    4 ++
 qemu-thread.h       |    8 +++
 5 files changed, 170 insertions(+), 0 deletions(-)

diff --git a/qemu-thread-posix.c b/qemu-thread-posix.c
index 2bd02ef..50e7421 100644
--- a/qemu-thread-posix.c
+++ b/qemu-thread-posix.c
@@ -17,7 +17,10 @@
 #include <signal.h>
 #include <stdint.h>
 #include <string.h>
+#include <limits.h>
+#include <unistd.h>
 #include "qemu-thread.h"
+#include "qemu-barrier.h"
 
 static void error_exit(int err, const char *msg)
 {
@@ -115,6 +118,127 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
         error_exit(err, __func__);
 }
 
+#ifdef __linux__
+#include <sys/syscall.h>
+#ifndef FUTEX_WAIT
+#define FUTEX_WAIT              0
+#endif
+#ifndef FUTEX_WAKE
+#define FUTEX_WAKE              1
+#endif
+
+#define futex(...)              syscall(__NR_futex, __VA_ARGS__)
+
+static inline void futex_wake(QemuEvent *ev, int n)
+{
+    futex(ev, FUTEX_WAKE, n, NULL, NULL, 0);
+}
+
+static inline void futex_wait(QemuEvent *ev, unsigned val)
+{
+    futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0);
+}
+#else
+static inline void futex_wake(QemuEvent *ev, int n)
+{
+  if (n == 1)
+    pthread_cond_signal(&ev->cond);
+  else
+    pthread_cond_broadcast(&ev->cond);
+}
+
+static inline void futex_wait(QemuEvent *ev, unsigned val)
+{
+  pthread_mutex_lock(&ev->lock);
+  if (ev->value == val)
+    pthread_cond_wait(&ev->cond, &ev->lock);
+  pthread_mutex_unlock(&ev->lock);
+}
+#endif
+
+/* Bit 0 is 1 if there are no waiters.  Bit 1 is 1 if the event is set.
+ * The combination "event_set && event_has_waiters" is impossible.  */
+#define EV_FREE_BIT	1
+#define EV_SET_BIT	2
+
+#define EV_BUSY		0
+#define EV_FREE		1
+#define EV_SET		3
+
+void qemu_event_init(QemuEvent *ev, bool init)
+{
+#ifndef __linux__
+    pthread_mutex_init(&ev->lock, NULL);
+    pthread_cond_init(&ev->cond, NULL);
+#endif
+
+    ev->value = (init ? EV_SET : EV_FREE);
+}
+
+void qemu_event_destroy(QemuEvent *ev)
+{
+#ifndef __linux__
+    pthread_mutex_destroy(&ev->lock);
+    pthread_cond_destroy(&ev->cond);
+#endif
+}
+
+void qemu_event_set(QemuEvent *ev)
+{
+    unsigned value;
+
+    smp_mb();
+    value = ev->value;
+    if (value == EV_SET) {
+        /* Exit on a pre-existing/concurrent set.  */
+        smp_mb();
+    } else {
+        if (__sync_fetch_and_or(&ev->value, EV_SET) == EV_BUSY) {
+            /* There were waiters, wake them up.  */
+            futex_wake(ev, INT_MAX);
+        }
+    }
+}
+
+void qemu_event_reset(QemuEvent *ev)
+{
+    unsigned value;
+
+    smp_mb();
+    value = ev->value;
+    if (value != EV_SET) {
+        /* Exit on a pre-existing reset.  */
+        smp_mb();
+    } else {
+        /* If there was a concurrent reset (or even reset+wait),
+         * do nothing.  Otherwise change EV_SET->EV_FREE.  */
+        __sync_fetch_and_and(&ev->value, ~EV_SET_BIT);
+    }
+}
+
+void qemu_event_wait(QemuEvent *ev)
+{
+    unsigned value, old;
+
+    smp_mb();
+    value = ev->value;
+    if (value == EV_SET) {
+        smp_mb();
+    } else {
+        if (value == EV_FREE) {
+            /* Leave the event reset and tell qemu_event_set that there
+             * are waiters.  No need to retry, because there cannot be
+             * a concurrent busy->free transition.  After the CAS, the
+             * event will be either set or busy.  */
+            old = __sync_val_compare_and_swap(&ev->value, EV_FREE, EV_BUSY);
+            if (old == EV_SET) {
+                return;
+            }
+        }
+        futex_wait(ev, EV_BUSY);
+    }
+}
+
 void qemu_thread_create(QemuThread *thread,
                        void *(*start_routine)(void*),
                        void *arg)
diff --git a/qemu-thread-posix.h b/qemu-thread-posix.h
index ee4618e..2f5b63d 100644
--- a/qemu-thread-posix.h
+++ b/qemu-thread-posix.h
@@ -10,6 +10,14 @@ struct QemuCond {
     pthread_cond_t cond;
 };
 
+struct QemuEvent {
+#ifndef __linux__
+    pthread_mutex_t lock;
+    pthread_cond_t cond;
+#endif
+    unsigned value;
+};
+
 struct QemuThread {
     pthread_t thread;
 };
diff --git a/qemu-thread-win32.c b/qemu-thread-win32.c
index 2d2d5ab..9bdbb48 100644
--- a/qemu-thread-win32.c
+++ b/qemu-thread-win32.c
@@ -192,6 +192,32 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
     qemu_mutex_lock(mutex);
 }
 
+void qemu_event_init(QemuEvent *ev, bool init)
+{
+    /* Manual reset.  */
+    ev->event = CreateEvent(NULL, TRUE, init, NULL);
+}
+
+void qemu_event_destroy(QemuEvent *ev)
+{
+    CloseHandle(ev->event);
+}
+
+void qemu_event_set(QemuEvent *ev)
+{
+    SetEvent(ev->event);
+}
+
+void qemu_event_reset(QemuEvent *ev)
+{
+    ResetEvent(ev->event);
+}
+
+void qemu_event_wait(QemuEvent *ev)
+{
+    WaitForSingleObject(ev->event, INFINITE);
+}
+
 struct QemuThreadData {
     QemuThread *thread;
     void *(*start_routine)(void *);
diff --git a/qemu-thread-win32.h b/qemu-thread-win32.h
index 878f86a..ddd6d0f 100644
--- a/qemu-thread-win32.h
+++ b/qemu-thread-win32.h
@@ -13,6 +13,10 @@ struct QemuCond {
     HANDLE continue_event;
 };
 
+struct QemuEvent {
+    HANDLE event;
+};
+
 struct QemuThread {
     HANDLE thread;
     void *ret;
diff --git a/qemu-thread.h b/qemu-thread.h
index 0a73d50..8353e3d 100644
--- a/qemu-thread.h
+++ b/qemu-thread.h
@@ -2,9 +2,11 @@
 #define __QEMU_THREAD_H 1
 
 #include <inttypes.h>
+#include <stdbool.h>
 
 typedef struct QemuMutex QemuMutex;
 typedef struct QemuCond QemuCond;
+typedef struct QemuEvent QemuEvent;
 typedef struct QemuThread QemuThread;
 
 #ifdef _WIN32
@@ -31,6 +33,12 @@ void qemu_cond_signal(QemuCond *cond);
 void qemu_cond_broadcast(QemuCond *cond);
 void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex);
 
+void qemu_event_init(QemuEvent *ev, bool init);
+void qemu_event_set(QemuEvent *ev);
+void qemu_event_reset(QemuEvent *ev);
+void qemu_event_wait(QemuEvent *ev);
+void qemu_event_destroy(QemuEvent *ev);
+
 void qemu_thread_create(QemuThread *thread,
                        void *(*start_routine)(void*),
                        void *arg);
-- 
1.7.6

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH 04/13] qemu-threads: add QemuOnce
  2011-08-15 21:08 [Qemu-devel] [RFC PATCH 00/13] RCU implementation for QEMU Paolo Bonzini
                   ` (2 preceding siblings ...)
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 03/13] qemu-threads: add QemuEvent Paolo Bonzini
@ 2011-08-15 21:08 ` Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 05/13] add rcu library Paolo Bonzini
                   ` (8 subsequent siblings)
  12 siblings, 0 replies; 16+ messages in thread
From: Paolo Bonzini @ 2011-08-15 21:08 UTC (permalink / raw)
  To: qemu-devel

Add Windows and POSIX versions, as usual.
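
As a minimal usage sketch (the function and variable names below are made
up for the example, they are not part of this patch):

    static QemuOnce once = QEMU_ONCE_INIT;

    static void my_global_init(void)
    {
        /* Runs exactly once, in whichever thread gets here first.  */
    }

    void my_entry_point(void)
    {
        qemu_once(&once, my_global_init);
        /* my_global_init has completed by the time qemu_once returns,
         * no matter which thread actually ran it.  */
    }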

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 compiler.h          |    2 ++
 qemu-thread-posix.c |    5 +++++
 qemu-thread-posix.h |    5 +++++
 qemu-thread-win32.c |   19 +++++++++++++++++++
 qemu-thread-win32.h |    5 +++++
 qemu-thread.h       |    3 +++
 6 files changed, 39 insertions(+), 0 deletions(-)

diff --git a/compiler.h b/compiler.h
index 9af5dc6..c87dd96 100644
--- a/compiler.h
+++ b/compiler.h
@@ -31,4 +31,6 @@
 #define GCC_FMT_ATTR(n, m)
 #endif
 
+#define ACCESS_ONCE(x)      (*(volatile typeof(x) *)&(x))
+
 #endif /* COMPILER_H */
diff --git a/qemu-thread-posix.c b/qemu-thread-posix.c
index 50e7421..894134c 100644
--- a/qemu-thread-posix.c
+++ b/qemu-thread-posix.c
@@ -239,6 +239,11 @@ void qemu_event_wait(QemuEvent *ev)
     }
 }
 
+void qemu_once(QemuOnce *o, void (*func)(void))
+{
+    pthread_once(&o->once, func);
+}
+
 void qemu_thread_create(QemuThread *thread,
                        void *(*start_routine)(void*),
                        void *arg)
diff --git a/qemu-thread-posix.h b/qemu-thread-posix.h
index 2f5b63d..d781ca6 100644
--- a/qemu-thread-posix.h
+++ b/qemu-thread-posix.h
@@ -18,6 +18,11 @@ struct QemuEvent {
     unsigned value;
 };
 
+#define QEMU_ONCE_INIT { .once = PTHREAD_ONCE_INIT }
+struct QemuOnce {
+    pthread_once_t once;
+};
+
 struct QemuThread {
     pthread_t thread;
 };
diff --git a/qemu-thread-win32.c b/qemu-thread-win32.c
index 9bdbb48..7b1c407 100644
--- a/qemu-thread-win32.c
+++ b/qemu-thread-win32.c
@@ -12,6 +12,7 @@
  */
 #include "qemu-common.h"
 #include "qemu-thread.h"
+#include "qemu-barrier.h"
 #include <process.h>
 #include <assert.h>
 #include <limits.h>
@@ -218,6 +219,24 @@ void qemu_event_wait(QemuEvent *ev)
     WaitForSingleObject(ev->event, INFINITE);
 }
 
+void qemu_once(QemuOnce *o, void (*func)(void))
+{
+    int old;
+    if (o->state != 2) {
+        old = __sync_val_compare_and_swap(&o->state, 0, 1);
+        if (old == 0) {
+            func();
+            smp_mb();
+            o->state = 2;
+            return;
+        }
+        /* Busy wait until the first thread gives us a green flag.  */
+        while (ACCESS_ONCE(o->state) == 1) {
+            Sleep(0);
+        }
+    }
+}
+
 struct QemuThreadData {
     QemuThread *thread;
     void *(*start_routine)(void *);
diff --git a/qemu-thread-win32.h b/qemu-thread-win32.h
index ddd6d0f..fbee4d9 100644
--- a/qemu-thread-win32.h
+++ b/qemu-thread-win32.h
@@ -17,6 +17,11 @@ struct QemuEvent {
     HANDLE event;
 };
 
+#define QEMU_ONCE_INIT { .state = 0 }
+struct QemuOnce {
+    int state;
+};
+
 struct QemuThread {
     HANDLE thread;
     void *ret;
diff --git a/qemu-thread.h b/qemu-thread.h
index 8353e3d..ae75638 100644
--- a/qemu-thread.h
+++ b/qemu-thread.h
@@ -7,6 +7,7 @@
 typedef struct QemuMutex QemuMutex;
 typedef struct QemuCond QemuCond;
 typedef struct QemuEvent QemuEvent;
+typedef struct QemuOnce QemuOnce;
 typedef struct QemuThread QemuThread;
 
 #ifdef _WIN32
@@ -39,6 +40,8 @@ void qemu_event_reset(QemuEvent *ev);
 void qemu_event_wait(QemuEvent *ev);
 void qemu_event_destroy(QemuEvent *ev);
 
+void qemu_once(QemuOnce *o, void (*func)(void));
+
 void qemu_thread_create(QemuThread *thread,
                        void *(*start_routine)(void*),
                        void *arg);
-- 
1.7.6

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH 05/13] add rcu library
  2011-08-15 21:08 [Qemu-devel] [RFC PATCH 00/13] RCU implementation for QEMU Paolo Bonzini
                   ` (3 preceding siblings ...)
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 04/13] qemu-threads: add QemuOnce Paolo Bonzini
@ 2011-08-15 21:08 ` Paolo Bonzini
  2011-08-17 17:04   ` Blue Swirl
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 06/13] rcu: add rcutorture Paolo Bonzini
                   ` (7 subsequent siblings)
  12 siblings, 1 reply; 16+ messages in thread
From: Paolo Bonzini @ 2011-08-15 21:08 UTC (permalink / raw)
  To: qemu-devel

This includes a (mangled) copy of the urcu-qsbr code from liburcu.
The main changes are: 1) removing dependencies on many other header files
in liburcu; 2) removing, for simplicity, the tentative busy waiting in
synchronize_rcu, which has only a limited performance effect; 3) replacing
futexes in synchronize_rcu with QemuEvents for Win32 portability.
The API is the same as liburcu's, so it should be possible in the future
to require liburcu on POSIX systems, for example, and use our copy only
on Windows.

Among the various versions available I chose urcu-qsbr, which has the
fastest rcu_read_{lock,unlock} but requires the program to manually
annotate quiescent points or intervals.  QEMU threads usually have easily
identified quiescent periods, so this should not be a problem.
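
As a rough sketch of what the annotation looks like (the loop condition,
the shared pointer and the work done with it are hypothetical), a reader
thread would do something along these lines:

    rcu_register_thread();
    while (running) {
        rcu_read_lock();
        p = rcu_dereference(shared_ptr);
        do_something_with(p);
        rcu_read_unlock();

        /* Outside any read-side critical section: either report a
         * quiescent state, or go offline around blocking calls with
         * rcu_thread_offline()/rcu_thread_online().  */
        rcu_quiescent_state();
    }
    rcu_unregister_thread();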

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 Makefile.objs |    2 +-
 qemu-queue.h  |   11 +++
 rcu-pointer.h |  119 +++++++++++++++++++++++++++++++
 rcu.c         |  221 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 rcu.h         |  135 +++++++++++++++++++++++++++++++++++
 5 files changed, 487 insertions(+), 1 deletions(-)
 create mode 100644 rcu-pointer.h
 create mode 100644 rcu.c
 create mode 100644 rcu.h

diff --git a/Makefile.objs b/Makefile.objs
index 16eef38..902a083 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -6,7 +6,7 @@ qobject-obj-y += qerror.o error.o
 
 #######################################################################
 # oslib-obj-y is code depending on the OS (win32 vs posix)
-oslib-obj-y = osdep.o
+oslib-obj-y = osdep.o rcu.o
 oslib-obj-$(CONFIG_WIN32) += oslib-win32.o qemu-thread-win32.o
 oslib-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o
 
diff --git a/qemu-queue.h b/qemu-queue.h
index 1d07745..cc8e55b 100644
--- a/qemu-queue.h
+++ b/qemu-queue.h
@@ -100,6 +100,17 @@ struct {                                                                \
         (head)->lh_first = NULL;                                        \
 } while (/*CONSTCOND*/0)
 
+#define QLIST_SWAP(dstlist, srclist, field) do {                        \
+        void *tmplist;                                                  \
+        tmplist = (srclist)->lh_first;                                  \
+        (srclist)->lh_first = (dstlist)->lh_first;                      \
+        if ((srclist)->lh_first != NULL)                                \
+            (srclist)->lh_first->field.le_prev = &(srclist)->lh_first;  \
+        (dstlist)->lh_first = tmplist; \
+        if ((dstlist)->lh_first != NULL)                                \
+            (dstlist)->lh_first->field.le_prev = &(dstlist)->lh_first;  \
+} while (/*CONSTCOND*/0)
+
 #define QLIST_INSERT_AFTER(listelm, elm, field) do {                    \
         if (((elm)->field.le_next = (listelm)->field.le_next) != NULL)  \
                 (listelm)->field.le_next->field.le_prev =               \
diff --git a/rcu-pointer.h b/rcu-pointer.h
new file mode 100644
index 0000000..4381313
--- /dev/null
+++ b/rcu-pointer.h
@@ -0,0 +1,119 @@
+#ifndef _URCU_POINTER_STATIC_H
+#define _URCU_POINTER_STATIC_H
+
+/*
+ * urcu-pointer-static.h
+ *
+ * Userspace RCU header. Operations on pointers.
+ *
+ * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See urcu-pointer.h for
+ * linking dynamically with the userspace rcu library.
+ *
+ * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include "compiler.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif 
+
+#ifdef __alpha__
+#define smp_read_barrier_depends()   asm volatile("mb":::"memory")
+#else
+#define smp_read_barrier_depends()
+#endif
+
+/**
+ * rcu_dereference - reads (copies) an RCU-protected pointer to a local variable
+ * inside an RCU read-side critical section. The pointer can later be safely
+ * dereferenced within the critical section.
+ *
+ * This ensures that the pointer copy is invariant throughout the whole critical
+ * section.
+ *
+ * Inserts memory barriers on architectures that require them (currently only
+ * Alpha) and documents which pointers are protected by RCU.
+ *
+ * The compiler memory barrier in ACCESS_ONCE() ensures that value-speculative
+ * optimizations (e.g. VSS: Value Speculation Scheduling) do not perform the
+ * data read before the pointer read by speculating the value of the pointer.
+ * Correct ordering is ensured because the pointer is read as a volatile access.
+ * This acts as a global side-effect operation, which forbids reordering of
+ * dependent memory operations. Note that such concern about dependency-breaking
+ * optimizations will eventually be taken care of by the "memory_order_consume"
+ * addition to the forthcoming C++ standard.
+ *
+ * Should match rcu_assign_pointer() or rcu_xchg_pointer().
+ */
+
+#define rcu_dereference(p)     ({					\
+				typeof(p) _________p1 = ACCESS_ONCE(p); \
+				smp_read_barrier_depends();		\
+				(_________p1);				\
+				})
+
+/**
+ * rcu_cmpxchg_pointer - same as rcu_assign_pointer, but tests if the pointer
+ * is as expected by "old".  If it succeeds, returns the previous pointer to the
+ * data structure, which can be safely freed after waiting for a quiescent state
+ * using synchronize_rcu().  If it fails (unexpected value), returns old (which
+ * should not be freed!).
+ */
+
+#define rcu_cmpxchg_pointer(p, old, _new)				\
+	({								\
+		typeof(*p) _________pold = (old);			\
+		typeof(*p) _________pnew = (_new);			\
+		if (!__builtin_constant_p(_new) ||			\
+		    ((_new) != NULL))					\
+			__sync_synchronize();				\
+		uatomic_cmpxchg(p, _________pold, _________pnew);	\
+	})
+
+#define rcu_set_pointer(p, v)				\
+	({						\
+		typeof(*p) _________pv = (v);		\
+		if (!__builtin_constant_p(v) || 	\
+		    ((v) != NULL))			\
+			__sync_synchronize();		\
+		ACCESS_ONCE(*p) = _________pv;		\
+	})
+
+/**
+ * rcu_assign_pointer - assign (publicize) a pointer to a new data structure
+ * meant to be read by RCU read-side critical sections. Returns the assigned
+ * value.
+ *
+ * Documents which pointers will be dereferenced by RCU read-side critical
+ * sections and adds the required memory barriers on architectures requiring
+ * them. It also makes sure the compiler does not reorder code initializing the
+ * data structure before its publication.
+ *
+ * Should match rcu_dereference().
+ */
+
+#define rcu_assign_pointer(p, v)	rcu_set_pointer(&(p), v)
+
+#ifdef __cplusplus 
+}
+#endif
+
+#endif /* _URCU_POINTER_STATIC_H */
diff --git a/rcu.c b/rcu.c
new file mode 100644
index 0000000..e2f347a
--- /dev/null
+++ b/rcu.c
@@ -0,0 +1,221 @@
+/*
+ * urcu-qsbr.c
+ *
+ * Userspace RCU QSBR library
+ *
+ * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
+ * Copyright 2011 Red Hat, Inc.
+ *
+ * Ported to QEMU by Paolo Bonzini  <pbonzini@redhat.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include <stdio.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <errno.h>
+#include "rcu.h"
+#include "qemu-barrier.h"
+
+/*
+ * Global grace period counter.  Bit 0 is one if the thread is online.
+ * Bits 1 and above are defined in synchronize_rcu/update_counter_and_wait.
+ */
+#define RCU_GP_ONLINE           (1UL << 0)
+#define RCU_GP_CTR              (1UL << 1)
+
+unsigned long rcu_gp_ctr = RCU_GP_ONLINE;
+
+QemuEvent rcu_gp_event;
+QemuMutex rcu_gp_lock;
+
+/*
+ * Check whether a quiescent state was crossed between the beginning of
+ * update_counter_and_wait and now.
+ */
+static inline int rcu_gp_ongoing(unsigned long *ctr)
+{
+	unsigned long v;
+
+	v = ACCESS_ONCE(*ctr);
+	return v && (v != rcu_gp_ctr);
+}
+
+/*
+ * Written to only by each individual reader. Read by both the reader and the
+ * writers.
+ */
+__thread struct rcu_reader rcu_reader;
+
+typedef QLIST_HEAD(, rcu_reader) ThreadList;
+static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
+
+static void update_counter_and_wait(void)
+{
+	ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
+	struct rcu_reader *index, *tmp;
+
+	if (sizeof (rcu_gp_ctr) <= 4) {
+		/* Switch parity: 0 -> 1, 1 -> 0 */
+		ACCESS_ONCE(rcu_gp_ctr) = rcu_gp_ctr ^ RCU_GP_CTR;
+	} else {
+		/* Increment current grace period.  */
+		ACCESS_ONCE(rcu_gp_ctr) = rcu_gp_ctr + RCU_GP_CTR;
+	}
+
+	barrier();
+
+	/*
+	 * Wait for each thread's rcu_reader.ctr to show a quiescent state.
+	 */
+	for (;;) {
+		/*
+		 * We want to be notified of changes made to rcu_gp_ongoing
+		 * while we walk the list.
+		 */
+		qemu_event_reset(&rcu_gp_event);
+		QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
+			if (!rcu_gp_ongoing(&index->ctr)) {
+				QLIST_REMOVE(index, node);
+				QLIST_INSERT_HEAD(&qsreaders, index, node);
+			}
+		}
+
+		if (QLIST_EMPTY(&registry)) {
+			break;
+		}
+
+		/*
+		 * Wait for one thread to report a quiescent state and
+		 * try again.
+		 */
+		qemu_event_wait(&rcu_gp_event);
+	}
+	smp_mb();
+
+	/* put back the reader list in the registry */
+	QLIST_SWAP(&registry, &qsreaders, node);
+}
+
+/*
+ * Using a two-subphases algorithm for architectures with smaller than 64-bit
+ * long-size to ensure we do not encounter an overflow bug.
+ */
+
+void synchronize_rcu(void)
+{
+	unsigned long was_online;
+
+	was_online = rcu_reader.ctr;
+
+	/* All threads should read qparity before accessing data structure
+	 * where new ptr points to.
+	 */
+
+	/*
+	 * Mark the writer thread offline to make sure we don't wait for
+	 * our own quiescent state. This allows using synchronize_rcu()
+	 * in threads registered as readers.
+	 *
+	 * Also includes a memory barrier.
+	 */
+	if (was_online) {
+		rcu_thread_offline();
+	} else {
+		smp_mb();
+	}
+
+	qemu_mutex_lock(&rcu_gp_lock);
+
+	if (QLIST_EMPTY(&registry))
+		goto out;
+
+	if (sizeof(rcu_gp_ctr) <= 4) {
+		/*
+		 * Wait for previous parity to be empty of readers.
+		 */
+		update_counter_and_wait();	/* 0 -> 1, wait readers in parity 0 */
+
+		/*
+		 * Must finish waiting for quiescent state for parity 0 before
+		 * committing next rcu_gp_ctr update to memory. Failure to
+		 * do so could result in the writer waiting forever while new
+		 * readers are always accessing data (no progress).  Enforce
+		 * compiler-order of load rcu_reader ctr before store to
+		 * rcu_gp_ctr.
+		 */
+		barrier();
+
+		/*
+		 * Adding a memory barrier which is _not_ formally required,
+		 * but makes the model easier to understand. It does not have a
+		 * big performance impact anyway, given this is the write-side.
+		 */
+		smp_mb();
+	}
+
+	/*
+	 * Wait for previous parity/grace period to be empty of readers.
+	 */
+	update_counter_and_wait();	/* 1 -> 0, wait readers in parity 1 */
+out:
+	qemu_mutex_unlock(&rcu_gp_lock);
+
+	if (was_online) {
+		/* Also includes a memory barrier.  */
+		rcu_thread_online();
+	} else {
+		/*
+		 * Finish waiting for reader threads before letting the old
+		 * ptr being freed.
+		 */
+		smp_mb();
+	}
+}
+
+static void rcu_init(void)
+{
+	qemu_mutex_init(&rcu_gp_lock);
+	qemu_event_init(&rcu_gp_event, true);
+}
+
+void rcu_register_thread(void)
+{
+	static QemuOnce init = QEMU_ONCE_INIT;
+	assert(rcu_reader.ctr == 0);
+
+	qemu_once(&init, rcu_init);
+	qemu_mutex_lock(&rcu_gp_lock);
+	QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
+	qemu_mutex_unlock(&rcu_gp_lock);
+	rcu_thread_online();
+}
+
+void rcu_unregister_thread(void)
+{
+	/*
+	 * We have to take the thread offline, otherwise we end up deadlocking
+	 * with a waiting writer.
+	 */
+	rcu_thread_offline();
+	qemu_mutex_lock(&rcu_gp_lock);
+	QLIST_REMOVE(&rcu_reader, node);
+	qemu_mutex_unlock(&rcu_gp_lock);
+}
diff --git a/rcu.h b/rcu.h
new file mode 100644
index 0000000..569f696
--- /dev/null
+++ b/rcu.h
@@ -0,0 +1,135 @@
+#ifndef _URCU_QSBR_H
+#define _URCU_QSBR_H
+
+/*
+ * urcu-qsbr.h
+ *
+ * Userspace RCU QSBR header.
+ *
+ * LGPL-compatible code should include this header with :
+ *
+ * #define _LGPL_SOURCE
+ * #include <urcu.h>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * IBM's contributions to this file may be relicensed under LGPLv2 or later.
+ */
+
+#include <stdlib.h>
+#include <assert.h>
+#include <limits.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <stdbool.h>
+
+#include "compiler.h"
+#include "rcu-pointer.h"
+#include "qemu-thread.h"
+#include "qemu-queue.h"
+#include "qemu-barrier.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif 
+
+/*
+ * Important !
+ *
+ * Each thread containing read-side critical sections must be registered
+ * with rcu_register_thread() before calling rcu_read_lock().
+ * rcu_unregister_thread() should be called before the thread exits.
+ */
+
+#ifdef DEBUG_RCU
+#define rcu_assert(args...)	assert(args)
+#else
+#define rcu_assert(args...)
+#endif
+
+#define RCU_GP_ONLINE		(1UL << 0)
+#define RCU_GP_CTR		(1UL << 1)
+
+/*
+ * Global quiescent period counter with low-order bits unused.
+ * Using an unsigned long rather than a char to eliminate false register
+ * dependencies causing stalls on some architectures.
+ */
+extern unsigned long rcu_gp_ctr;
+
+extern QemuEvent rcu_gp_event;
+
+struct rcu_reader {
+	/* Data used by both reader and synchronize_rcu() */
+	unsigned long ctr;
+	/* Data used for registry */
+	QLIST_ENTRY(rcu_reader) node;
+};
+
+extern __thread struct rcu_reader rcu_reader;
+
+static inline void rcu_read_lock(void)
+{
+	rcu_assert(rcu_reader.ctr);
+}
+
+static inline void rcu_read_unlock(void)
+{
+}
+
+static inline void rcu_quiescent_state(void)
+{
+	smp_mb();
+	ACCESS_ONCE(rcu_reader.ctr) = ACCESS_ONCE(rcu_gp_ctr);
+#if 0
+	/* write rcu_reader.ctr before read futex.  Included in
+	 * qemu_event_set. */
+	smp_mb();
+#endif
+	qemu_event_set(&rcu_gp_event);
+}
+
+static inline void rcu_thread_offline(void)
+{
+	smp_mb();
+	ACCESS_ONCE(rcu_reader.ctr) = 0;
+#if 0
+	/* write rcu_reader.ctr before read futex.  Included in
+	 * qemu_event_set. */
+	smp_mb();
+#endif
+	qemu_event_set(&rcu_gp_event);
+}
+
+static inline void rcu_thread_online(void)
+{
+	smp_mb();	/* Ensure the compiler does not reorder us with mutex */
+	ACCESS_ONCE(rcu_reader.ctr) = ACCESS_ONCE(rcu_gp_ctr);
+	smp_mb();
+}
+
+extern void synchronize_rcu(void);
+
+/*
+ * Reader thread registration.
+ */
+extern void rcu_register_thread(void);
+extern void rcu_unregister_thread(void);
+
+#ifdef __cplusplus 
+}
+#endif
+
+#endif /* _URCU_QSBR_H */
-- 
1.7.6

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH 06/13] rcu: add rcutorture
  2011-08-15 21:08 [Qemu-devel] [RFC PATCH 00/13] RCU implementation for QEMU Paolo Bonzini
                   ` (4 preceding siblings ...)
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 05/13] add rcu library Paolo Bonzini
@ 2011-08-15 21:08 ` Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 07/13] osdep: add qemu_msleep Paolo Bonzini
                   ` (6 subsequent siblings)
  12 siblings, 0 replies; 16+ messages in thread
From: Paolo Bonzini @ 2011-08-15 21:08 UTC (permalink / raw)
  To: qemu-devel

A stress test program (works :)).  So far not ported to Windows.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 rcutorture.c |  433 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 433 insertions(+), 0 deletions(-)
 create mode 100644 rcutorture.c

diff --git a/rcutorture.c b/rcutorture.c
new file mode 100644
index 0000000..33b258e
--- /dev/null
+++ b/rcutorture.c
@@ -0,0 +1,433 @@
+/*
+ * rcutorture.c: simple user-level performance/stress test of RCU.
+ *
+ * Usage:
+ * 	./rcu <nreaders> rperf [ <seconds> ]
+ * 		Run a read-side performance test with the specified
+ * 		number of readers for <seconds> seconds.
+ * 		Thus "./rcu 16 rperf 2" would run a read-side performance
+ * 		test with 16 readers for 2 seconds.
+ * 	./rcu <nupdaters> uperf [ <seconds> ]
+ * 		Run an update-side performance test with the specified
+ * 		number of updaters and specified duration.
+ * 	./rcu <nreaders> perf [ <seconds> ]
+ * 		Run a combined read/update performance test with the specified
+ * 		number of readers and one updater and specified duration.
+ *
+ * The above tests produce output as follows:
+ *
+ * n_reads: 46008000  n_updates: 146026  nreaders: 2  nupdaters: 1 duration: 1
+ * ns/read: 43.4707  ns/update: 6848.1
+ *
+ * The first line lists the total number of RCU reads and updates executed
+ * during the test, the number of reader threads, the number of updater
+ * threads, and the duration of the test in seconds.  The second line
+ * lists the average duration of each type of operation in nanoseconds,
+ * or "nan" if the corresponding type of operation was not performed.
+ *
+ * 	./rcu <nreaders> stress [ <seconds> ]
+ * 		Run a stress test with the specified number of readers and
+ * 		one updater.  None of the threads are pinned to any
+ * 		particular CPU.
+ *
+ * This test produces output as follows:
+ *
+ * n_reads: 114633217  n_updates: 3903415  n_mberror: 0
+ * rcu_stress_count: 114618391 14826 0 0 0 0 0 0 0 0 0
+ *
+ * The first line lists the number of RCU read and update operations
+ * executed, followed by the number of memory-ordering violations
+ * (which will be zero in a correct RCU implementation).  The second
+ * line lists the number of readers observing progressively more stale
+ * data.  A correct RCU implementation will have all but the first two
+ * numbers zero.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright (c) 2008 Paul E. McKenney, IBM Corporation.
+ */
+
+/*
+ * Test variables.
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <pthread.h>
+#include "rcu.h"
+#include "compiler.h"
+#include "qemu-thread.h"
+
+#include "qemu-thread-posix.c"
+#include "rcu.c"
+
+long long n_reads = 0LL;
+long n_updates = 0L;
+int nthreadsrunning;
+
+char argsbuf[64];
+
+#define GOFLAG_INIT 0
+#define GOFLAG_RUN  1
+#define GOFLAG_STOP 2
+
+volatile int goflag = GOFLAG_INIT;
+
+#define RCU_READ_RUN 1000
+
+#define NR_THREADS 100
+static pthread_t threads[NR_THREADS];
+
+static void create_thread(void *(*func)(void *), void *arg)
+{
+	pthread_t tid;
+	int i;
+
+	if (pthread_create(&tid, NULL, func, arg) != 0) {
+		perror("create_thread:pthread_create");
+		exit(-1);
+	}
+	for (i = 0; i < NR_THREADS; i++) {
+		if (__sync_bool_compare_and_swap(&threads[i], 0, tid))
+			break;
+	}
+	if (i >= NR_THREADS) {
+		fprintf(stderr, "Thread limit of %d exceeded!\n", NR_THREADS);
+		exit(-1);
+	}
+}
+
+static void wait_all_threads(void)
+{
+	int i;
+	void *vp;
+	pthread_t tid;
+
+	for (i = 0; i < NR_THREADS; i++) {
+		tid = threads[i];
+		if (tid != 0) {
+			if (pthread_join(tid, &vp) != 0) {
+				perror("wait_thread:pthread_join");
+				exit(-1);
+			}
+			threads[i] = 0;
+		}
+	}
+}
+
+/*
+ * Performance test.
+ */
+
+void *rcu_read_perf_test(void *arg)
+{
+	int i;
+	long long n_reads_local = 0;
+
+	rcu_register_thread();
+	__sync_fetch_and_add(&nthreadsrunning, 1);
+	rcu_thread_offline();
+	while (goflag == GOFLAG_INIT)
+		poll(NULL, 0, 1);
+	rcu_thread_online();
+	while (goflag == GOFLAG_RUN) {
+		for (i = 0; i < RCU_READ_RUN; i++) {
+			rcu_read_lock();
+			rcu_read_unlock();
+		}
+		n_reads_local += RCU_READ_RUN;
+		rcu_quiescent_state();
+	}
+	__sync_fetch_and_add(&n_reads, n_reads_local);
+	rcu_unregister_thread();
+
+	return (NULL);
+}
+
+void *rcu_update_perf_test(void *arg)
+{
+	long long n_updates_local = 0;
+
+	__sync_fetch_and_add(&nthreadsrunning, 1);
+	while (goflag == GOFLAG_INIT)
+		poll(NULL, 0, 1);
+	while (goflag == GOFLAG_RUN) {
+		synchronize_rcu();
+		n_updates_local++;
+	}
+	__sync_fetch_and_add(&n_updates, n_updates_local);
+	return NULL;
+}
+
+void perftestinit(void)
+{
+	nthreadsrunning = 0;
+}
+
+void perftestrun(int nthreads, int duration, int nreaders, int nupdaters)
+{
+	int t;
+
+	while (ACCESS_ONCE(nthreadsrunning) < nthreads)
+		poll(NULL, 0, 1);
+	goflag = GOFLAG_RUN;
+	sleep(duration);
+	goflag = GOFLAG_STOP;
+	wait_all_threads();
+	printf("n_reads: %lld  n_updates: %ld  nreaders: %d  nupdaters: %d duration: %d\n",
+	       n_reads, n_updates, nreaders, nupdaters, duration);
+	printf("ns/read: %g  ns/update: %g\n",
+	       ((duration * 1000*1000*1000.*(double)nreaders) /
+		(double)n_reads),
+	       ((duration * 1000*1000*1000.*(double)nupdaters) /
+		(double)n_updates));
+	exit(0);
+}
+
+void perftest(int nreaders, int duration)
+{
+	int i;
+
+	perftestinit();
+	for (i = 0; i < nreaders; i++) {
+		create_thread(rcu_read_perf_test, NULL);
+	}
+	create_thread(rcu_update_perf_test, NULL);
+	perftestrun(i + 1, duration, nreaders, 1);
+}
+
+void rperftest(int nreaders, int duration)
+{
+	int i;
+
+	perftestinit();
+	for (i = 0; i < nreaders; i++) {
+		create_thread(rcu_read_perf_test, NULL);
+	}
+	perftestrun(i, duration, nreaders, 0);
+}
+
+void uperftest(int nupdaters, int duration)
+{
+	int i;
+
+	perftestinit();
+	for (i = 0; i < nupdaters; i++) {
+		create_thread(rcu_update_perf_test, NULL);
+	}
+	perftestrun(i, duration, 0, nupdaters);
+}
+
+/*
+ * Stress test.
+ */
+
+#define RCU_STRESS_PIPE_LEN 10
+
+struct rcu_stress {
+	int pipe_count;
+	int mbtest;
+};
+
+struct rcu_stress rcu_stress_array[RCU_STRESS_PIPE_LEN] = { { 0 } };
+struct rcu_stress *rcu_stress_current;
+int rcu_stress_idx = 0;
+
+int n_mberror = 0;
+long long rcu_stress_count[RCU_STRESS_PIPE_LEN + 1];
+
+int garbage = 0;
+
+void *rcu_read_stress_test(void *arg)
+{
+	int i;
+	int itercnt = 0;
+	struct rcu_stress *p;
+	int pc;
+	long long n_reads_local = 0;
+
+	rcu_register_thread();
+	rcu_thread_offline();
+	while (goflag == GOFLAG_INIT)
+		poll(NULL, 0, 1);
+	rcu_thread_online();
+	while (goflag == GOFLAG_RUN) {
+		rcu_read_lock();
+		p = rcu_dereference(rcu_stress_current);
+		if (p->mbtest == 0)
+			n_mberror++;
+		rcu_read_lock();
+		for (i = 0; i < 100; i++)
+			garbage++;
+		rcu_read_unlock();
+		pc = p->pipe_count;
+		rcu_read_unlock();
+		if ((pc > RCU_STRESS_PIPE_LEN) || (pc < 0))
+			pc = RCU_STRESS_PIPE_LEN;
+		__sync_fetch_and_add(&rcu_stress_count[pc], 1);
+		n_reads_local++;
+		rcu_quiescent_state();
+		if ((++itercnt % 0x1000) == 0) {
+			synchronize_rcu();
+		}
+	}
+	__sync_fetch_and_add(&n_reads, n_reads_local);
+	rcu_thread_offline();
+	rcu_unregister_thread();
+
+	return (NULL);
+}
+
+#if 0
+void rcu_update_stress_test_rcu(struct rcu_head *head)
+{
+	if (pthread_mutex_lock(&call_rcu_test_mutex) != 0) {
+		perror("pthread_mutex_lock");
+		exit(-1);
+	}
+	if (pthread_cond_signal(&call_rcu_test_cond) != 0) {
+		perror("pthread_cond_signal");
+		exit(-1);
+	}
+	if (pthread_mutex_unlock(&call_rcu_test_mutex) != 0) {
+		perror("pthread_mutex_unlock");
+		exit(-1);
+	}
+}
+#endif
+
+void *rcu_update_stress_test(void *arg)
+{
+	int i;
+	struct rcu_stress *p;
+#if 0
+	struct rcu_head rh;
+#endif
+
+	while (goflag == GOFLAG_INIT)
+		poll(NULL, 0, 1);
+	while (goflag == GOFLAG_RUN) {
+		i = rcu_stress_idx + 1;
+		if (i >= RCU_STRESS_PIPE_LEN)
+			i = 0;
+		p = &rcu_stress_array[i];
+		p->mbtest = 0;
+		__sync_synchronize();
+		p->pipe_count = 0;
+		p->mbtest = 1;
+		rcu_assign_pointer(rcu_stress_current, p);
+		rcu_stress_idx = i;
+		for (i = 0; i < RCU_STRESS_PIPE_LEN; i++)
+			if (i != rcu_stress_idx)
+				rcu_stress_array[i].pipe_count++;
+#if 0
+		if (n_updates & 0x1)
+			synchronize_rcu();
+		else {
+			if (pthread_mutex_lock(&call_rcu_test_mutex) != 0) {
+				perror("pthread_mutex_lock");
+				exit(-1);
+			}
+			call_rcu(&rh, rcu_update_stress_test_rcu);
+			if (pthread_cond_wait(&call_rcu_test_cond,
+					      &call_rcu_test_mutex) != 0) {
+				perror("pthread_cond_wait");
+				exit(-1);
+			}
+			if (pthread_mutex_unlock(&call_rcu_test_mutex) != 0) {
+				perror("pthread_mutex_unlock");
+				exit(-1);
+			}
+		}
+#else
+			synchronize_rcu();
+#endif
+		n_updates++;
+	}
+	return NULL;
+}
+
+void *rcu_fake_update_stress_test(void *arg)
+{
+	while (goflag == GOFLAG_INIT)
+		poll(NULL, 0, 1);
+	while (goflag == GOFLAG_RUN) {
+		synchronize_rcu();
+		poll(NULL, 0, 1);
+	}
+	return NULL;
+}
+
+void stresstest(int nreaders, int duration)
+{
+	int i;
+	int t;
+	long long *p;
+	long long sum;
+
+	rcu_stress_current = &rcu_stress_array[0];
+	rcu_stress_current->pipe_count = 0;
+	rcu_stress_current->mbtest = 1;
+	for (i = 0; i < nreaders; i++)
+		create_thread(rcu_read_stress_test, NULL);
+	create_thread(rcu_update_stress_test, NULL);
+	for (i = 0; i < 5; i++)
+		create_thread(rcu_fake_update_stress_test, NULL);
+	goflag = GOFLAG_RUN;
+	sleep(duration);
+	goflag = GOFLAG_STOP;
+	wait_all_threads();
+	printf("n_reads: %lld  n_updates: %ld  n_mberror: %d\n",
+	       n_reads, n_updates, n_mberror);
+	printf("rcu_stress_count:");
+	for (i = 0; i <= RCU_STRESS_PIPE_LEN; i++) {
+		printf(" %lld", rcu_stress_count[i]);
+	}
+	printf("\n");
+	exit(0);
+}
+
+/*
+ * Mainprogram.
+ */
+
+void usage(int argc, char *argv[])
+{
+	fprintf(stderr, "Usage: %s [nreaders [ perf | stress ] ]\n", argv[0]);
+	exit(-1);
+}
+
+int main(int argc, char *argv[])
+{
+	int nreaders = 1;
+	int duration = 1;
+
+	if (argc >= 2) {
+		nreaders = strtoul(argv[1], NULL, 0);
+	}
+	if (argc > 3) {
+		duration = strtoul(argv[3], NULL, 0);
+	}
+	if (argc < 3 || strcmp(argv[2], "stress") == 0)
+		stresstest(nreaders, duration);
+	else if (strcmp(argv[2], "rperf") == 0)
+		rperftest(nreaders, duration);
+	else if (strcmp(argv[2], "uperf") == 0)
+		uperftest(nreaders, duration);
+	else if (strcmp(argv[2], "perf") == 0)
+		perftest(nreaders, duration);
+	usage(argc, argv);
+	return 0;
+}
-- 
1.7.6

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH 07/13] osdep: add qemu_msleep
  2011-08-15 21:08 [Qemu-devel] [RFC PATCH 00/13] RCU implementation for QEMU Paolo Bonzini
                   ` (5 preceding siblings ...)
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 06/13] rcu: add rcutorture Paolo Bonzini
@ 2011-08-15 21:08 ` Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 08/13] add call_rcu support Paolo Bonzini
                   ` (5 subsequent siblings)
  12 siblings, 0 replies; 16+ messages in thread
From: Paolo Bonzini @ 2011-08-15 21:08 UTC (permalink / raw)
  To: qemu-devel

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 osdep.h       |    1 +
 oslib-posix.c |    7 ++++++-
 oslib-win32.c |    5 +++++
 3 files changed, 12 insertions(+), 1 deletions(-)

diff --git a/osdep.h b/osdep.h
index a817017..6bf4e1c 100644
--- a/osdep.h
+++ b/osdep.h
@@ -136,6 +136,7 @@ int qemu_madvise(void *addr, size_t len, int advice);
 
 int qemu_create_pidfile(const char *filename);
 int qemu_get_thread_id(void);
+void qemu_msleep(int);
 
 #ifdef _WIN32
 static inline void qemu_timersub(const struct timeval *val1,
diff --git a/oslib-posix.c b/oslib-posix.c
index 196099c..bcbe6a7 100644
--- a/oslib-posix.c
+++ b/oslib-posix.c
@@ -39,7 +39,7 @@ extern int daemon(int, int);
 #include "sysemu.h"
 #include "trace.h"
 #include "qemu_socket.h"
-
+#include <sys/poll.h>
 
 
 int qemu_daemon(int nochdir, int noclose)
@@ -174,3 +174,8 @@ int qemu_utimensat(int dirfd, const char *path, const struct timespec *times,
 
     return utimes(path, &tv[0]);
 }
+
+void qemu_msleep(int msec)
+{
+    poll(NULL, 0, msec);
+}
diff --git a/oslib-win32.c b/oslib-win32.c
index 5f0759f..d823931 100644
--- a/oslib-win32.c
+++ b/oslib-win32.c
@@ -112,3 +112,8 @@ int qemu_gettimeofday(qemu_timeval *tp)
      Do not set errno on error.  */
   return 0;
 }
+
+void qemu_msleep(int msec)
+{
+    Sleep(msec);
+}
-- 
1.7.6

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH 08/13] add call_rcu support
  2011-08-15 21:08 [Qemu-devel] [RFC PATCH 00/13] RCU implementation for QEMU Paolo Bonzini
                   ` (6 preceding siblings ...)
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 07/13] osdep: add qemu_msleep Paolo Bonzini
@ 2011-08-15 21:08 ` Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 09/13] rcu: avoid repeated system calls Paolo Bonzini
                   ` (4 subsequent siblings)
  12 siblings, 0 replies; 16+ messages in thread
From: Paolo Bonzini @ 2011-08-15 21:08 UTC (permalink / raw)
  To: qemu-devel

Unlike the RCU code, this is not (yet) meant to be equivalent to liburcu,
because we want to run call_rcu callbacks in the main thread and under
the global lock.

The data structures are based on those found in liburcu (this makes
me feel safer), but they are more heavily commented and adapted to
replace futexes with QemuEvents.

This has a dependency on the iothread, because it relies on the fact that
qemu_bh_schedule works outside the global lock.
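
For illustration, freeing an RCU-protected structure with this API would
look roughly like the following sketch (MyData, mydata and newdata are
hypothetical names, not part of this patch):

    struct MyData {
        struct rcu_head rcu;
        int payload;
    };

    static void mydata_reclaim(struct rcu_head *head)
    {
        /* Runs in the main thread, under the global lock, after a
         * grace period has elapsed.  */
        struct MyData *d = container_of(head, struct MyData, rcu);
        qemu_free(d);
    }

    /* Updater side, under whatever lock serializes the writers.  */
    old = mydata;
    rcu_assign_pointer(mydata, newdata);
    call_rcu(&old->rcu, mydata_reclaim);

When the rcu_head is the first member of the structure, rcu_free can also
be passed directly as the callback.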

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 Makefile.objs |    2 +-
 rcu-call.c    |  189 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 rcu.h         |    8 +++
 3 files changed, 198 insertions(+), 1 deletions(-)
 create mode 100644 rcu-call.c

diff --git a/Makefile.objs b/Makefile.objs
index 902a083..ea5a6eb 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -77,7 +77,7 @@ common-obj-y += $(net-obj-y)
 common-obj-y += $(qobject-obj-y)
 common-obj-$(CONFIG_LINUX) += $(fsdev-obj-$(CONFIG_LINUX))
 common-obj-y += readline.o console.o cursor.o qemu-error.o
-common-obj-y += $(oslib-obj-y)
+common-obj-y += $(oslib-obj-y) rcu-call.o
 common-obj-$(CONFIG_WIN32) += os-win32.o
 common-obj-$(CONFIG_POSIX) += os-posix.o
 
diff --git a/rcu-call.c b/rcu-call.c
new file mode 100644
index 0000000..cee4d8f
--- /dev/null
+++ b/rcu-call.c
@@ -0,0 +1,189 @@
+/*
+ * call_rcu implementation
+ *
+ * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright 2011 Red Hat, Inc.
+ *
+ * Ported to QEMU by Paolo Bonzini  <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu-common.h"
+#include "qemu-thread.h"
+#include "qemu-barrier.h"
+#include "rcu.h"
+
+/* Communication between the call_rcu thread and bottom half.  */
+
+static int rcu_call_count, rcu_call_todo;
+static QemuEvent rcu_call_done_event;
+static QEMUBH *rcu_call_bh;
+
+#define RCU_CALL_MIN_SIZE        30
+
+/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
+ * from liburcu.  */
+
+static struct rcu_head dummy;
+static struct rcu_head *head = &dummy, **tail = &dummy.next;
+
+static QemuEvent rcu_call_ready_event;
+
+static void enqueue (struct rcu_head *node)
+{
+    struct rcu_head **old_tail;
+
+    node->next = NULL;
+
+    /*
+     * We have to do:
+     *    old_tail = tail, tail = &node->next;
+     *    *old_tail = node;
+     *
+     * The first line is done in the CAS loop below.  Note that producers only
+     * read from TAIL, so they already see a consistent state after this line.
+     * The loop is equivalent to
+     *
+     *     old_tail = __sync_swap(&tail, &node->next);
+     *
+     * but only clang has __sync_swap, not GCC. :(
+     */
+#if defined __i386__ || defined __x86_64__
+    asm volatile ("xchg%z0 %0, %1"
+                  : "=r" (old_tail), "+m" (ACCESS_ONCE(tail))
+                  : "0" (&node->next)
+                  : "memory");
+#else
+    do {
+        old_tail = ACCESS_ONCE(tail);
+    } while (!__sync_bool_compare_and_swap(&tail, old_tail, &node->next));
+#endif
+
+    /* At this point the consumer may still see a null head->next.  In this
+       case it will wait, so we have time to set the head->next and wake
+       them up.  */
+    ACCESS_ONCE(*old_tail) = node;
+}
+
+static struct rcu_head *dequeue(void)
+{
+    struct rcu_head *node;
+
+retry:
+    /* Test for an empty list, which we do not expect.  Note that for
+     * the consumer head and tail are always consistent.  The head
+     * is consistent because only the consumer reads/writes it.
+     * The tail, because it is the first step in the enqueuing.
+     * It is only the next pointers that might be inconsistent.  */
+    if (head == &dummy && ACCESS_ONCE(tail) == &dummy.next) {
+        abort();
+    }
+
+    /* Since we are the sole consumer, and we excluded the empty case
+     * above, the queue will always have at least two nodes (the dummy
+     * node, and the one being removed).  This means two things.  First,
+     * we do not need to update the tail pointer; second, if the head node
+     * has NULL in its next pointer, the value is wrong and we need
+     * to wait until its enqueuer finishes the update.  */
+    node = head;
+    head = ACCESS_ONCE(node->next);
+    if (head == NULL) {
+        do {
+            qemu_event_wait(&rcu_call_ready_event);
+            qemu_event_reset(&rcu_call_ready_event);
+            head = ACCESS_ONCE(node->next);
+        } while (head == NULL);
+        qemu_event_set(&rcu_call_ready_event);
+    }
+
+    /* If we dequeued the dummy node, add it back at the end and retry.  */
+    if (node == &dummy) {
+        enqueue(node);
+        goto retry;
+    }
+
+    return node;
+}
+
+static void call_rcu_bh(void *opaque)
+{
+    for (;;) {
+        int n;
+        smp_mb();
+        n = ACCESS_ONCE(rcu_call_todo);
+        if (n == 0) {
+            break;
+        }
+
+        /* We know that N callbacks have seen at least one grace
+         * period since they were added.  Process them.  */
+        __sync_fetch_and_add(&rcu_call_todo, -n);
+        while (n-- > 0) {
+            struct rcu_head *node = dequeue();
+            node->func(node);
+        }
+    }
+    qemu_event_set(&rcu_call_done_event);
+}
+
+static void *call_rcu_thread(void *opaque)
+{
+    for (;;) {
+        int n;
+
+        /* Heuristically wait for a decent number of callbacks to pile up.  */
+        qemu_event_wait(&rcu_call_done_event);
+        n = ACCESS_ONCE(rcu_call_count);
+        if (n < RCU_CALL_MIN_SIZE) {
+            n = ACCESS_ONCE(rcu_call_count);
+            if (n < RCU_CALL_MIN_SIZE) {
+                int tries = 0;
+                do {
+                    qemu_event_wait(&rcu_call_ready_event);
+                    qemu_event_reset(&rcu_call_ready_event);
+                    qemu_msleep(100);
+                    n = ACCESS_ONCE(rcu_call_count);
+                } while (n < RCU_CALL_MIN_SIZE && ++tries <= 5);
+                qemu_event_set(&rcu_call_ready_event);
+            }
+        }
+
+        /* Fetch rcu_call_count now.  The bottom half must only see
+         * elements added before synchronize_rcu() starts.  */
+        __sync_fetch_and_add(&rcu_call_count, -n);
+        synchronize_rcu();
+        __sync_fetch_and_add(&rcu_call_todo, n);
+
+        qemu_event_reset(&rcu_call_done_event);
+        qemu_bh_schedule(rcu_call_bh);
+    }
+    abort();
+}
+
+static void call_rcu_init(void)
+{
+    QemuThread thread;
+
+    qemu_event_init(&rcu_call_ready_event, false);
+    qemu_event_init(&rcu_call_done_event, true);
+    rcu_call_bh = qemu_bh_new(call_rcu_bh, NULL);
+
+    qemu_thread_create(&thread, call_rcu_thread, NULL);
+}
+
+void call_rcu(struct rcu_head *node, void (*func)(struct rcu_head *node))
+{
+    static QemuOnce init = QEMU_ONCE_INIT;
+    qemu_once(&init, call_rcu_init);
+    node->func = func;
+    enqueue(node);
+    __sync_fetch_and_add(&rcu_call_count, 1);
+    qemu_event_set(&rcu_call_ready_event);
+}
+
+void rcu_free(struct rcu_head *head)
+{
+    qemu_free(head);
+}
diff --git a/rcu.h b/rcu.h
index 569f696..86a1fea 100644
--- a/rcu.h
+++ b/rcu.h
@@ -128,6 +128,14 @@ extern void synchronize_rcu(void);
 extern void rcu_register_thread(void);
 extern void rcu_unregister_thread(void);
 
+struct rcu_head {
+	struct rcu_head *next;
+	void (*func)(struct rcu_head *head);
+};
+
+extern void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *head));
+extern void rcu_free(struct rcu_head *head);
+
 #ifdef __cplusplus 
 }
 #endif
-- 
1.7.6

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH 09/13] rcu: avoid repeated system calls
  2011-08-15 21:08 [Qemu-devel] [RFC PATCH 00/13] RCU implementation for QEMU Paolo Bonzini
                   ` (7 preceding siblings ...)
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 08/13] add call_rcu support Paolo Bonzini
@ 2011-08-15 21:08 ` Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 10/13] rcu: report quiescent states Paolo Bonzini
                   ` (3 subsequent siblings)
  12 siblings, 0 replies; 16+ messages in thread
From: Paolo Bonzini @ 2011-08-15 21:08 UTC (permalink / raw)
  To: qemu-devel

Optimization posted to upstream mailing list.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 rcu.c |    5 +++++
 rcu.h |   22 ++++++++++++----------
 2 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/rcu.c b/rcu.c
index e2f347a..aaf53ad 100644
--- a/rcu.c
+++ b/rcu.c
@@ -91,6 +91,11 @@ static void update_counter_and_wait(void)
 		 * while we walk the list.
 		 */
 		qemu_event_reset(&rcu_gp_event);
+		QLIST_FOREACH(index, &registry, node) {
+			ACCESS_ONCE(index->waiting) = true;
+		}
+		__sync_synchronize();
+
 		QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
 			if (!rcu_gp_ongoing(&index->ctr)) {
 				QLIST_REMOVE(index, node);
diff --git a/rcu.h b/rcu.h
index 86a1fea..96ce9d3 100644
--- a/rcu.h
+++ b/rcu.h
@@ -74,6 +74,8 @@ extern QemuEvent rcu_gp_event;
 struct rcu_reader {
 	/* Data used by both reader and synchronize_rcu() */
 	unsigned long ctr;
+	bool waiting;
+
 	/* Data used for registry */
 	QLIST_ENTRY(rcu_reader) node;
 };
@@ -93,24 +95,24 @@ static inline void rcu_quiescent_state(void)
 {
 	smp_mb();
 	ACCESS_ONCE(rcu_reader.ctr) = ACCESS_ONCE(rcu_gp_ctr);
-#if 0
-	/* write rcu_reader.ctr before read futex.  Included in
-	 * qemu_event_set. */
+	/* write rcu_reader.ctr before read waiting.  */
 	smp_mb();
-#endif
-	qemu_event_set(&rcu_gp_event);
+	if (ACCESS_ONCE(rcu_reader.waiting)) {
+		ACCESS_ONCE(rcu_reader.waiting) = false;
+		qemu_event_set(&rcu_gp_event);
+	}
 }
 
 static inline void rcu_thread_offline(void)
 {
 	smp_mb();
 	ACCESS_ONCE(rcu_reader.ctr) = 0;
-#if 0
-	/* write rcu_reader.ctr before read futex.  Included in
-	 * qemu_event_set. */
+	/* write rcu_reader.ctr before read waiting.  */
 	smp_mb();
-#endif
-	qemu_event_set(&rcu_gp_event);
+	if (ACCESS_ONCE(rcu_reader.waiting)) {
+		ACCESS_ONCE(rcu_reader.waiting) = false;
+		qemu_event_set(&rcu_gp_event);
+	}
 }
 
 static inline void rcu_thread_online(void)
-- 
1.7.6

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH 10/13] rcu: report quiescent states
  2011-08-15 21:08 [Qemu-devel] [RFC PATCH 00/13] RCU implementation for QEMU Paolo Bonzini
                   ` (8 preceding siblings ...)
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 09/13] rcu: avoid repeated system calls Paolo Bonzini
@ 2011-08-15 21:08 ` Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 11/13] rcuify iohandlers Paolo Bonzini
                   ` (2 subsequent siblings)
  12 siblings, 0 replies; 16+ messages in thread
From: Paolo Bonzini @ 2011-08-15 21:08 UTC (permalink / raw)
  To: qemu-devel

Our flavor of RCU requires threads to report when they go offline for
extended periods of time, such as when waiting on a condition variable,
waiting for I/O, or executing guest code.  Add markers to this end.
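
As an illustration (not a hunk from this patch; the worker loop and its
helpers are made up), the markers are meant to bracket long blocking
sections roughly like this:

    static void *worker_thread_fn(void *arg)
    {
        rcu_register_thread();
        for (;;) {
            /* No RCU-protected data is touched while blocked, so the
               thread reports itself quiescent around the wait.  */
            rcu_thread_offline();
            wait_for_work(arg);       /* placeholder blocking call */
            rcu_thread_online();
            handle_work(arg);         /* may read RCU-protected data */
        }
    }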

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 cpus.c     |   12 ++++++++++++
 kvm-all.c  |    3 +++
 os-win32.c |    3 +++
 vl.c       |    4 ++++
 4 files changed, 22 insertions(+), 0 deletions(-)

diff --git a/cpus.c b/cpus.c
index 73e17a1..d29b8cd 100644
--- a/cpus.c
+++ b/cpus.c
@@ -32,6 +32,7 @@
 #include "kvm.h"
 
 #include "qemu-thread.h"
+#include "rcu.h"
 #include "cpus.h"
 
 #ifndef _WIN32
@@ -712,7 +713,9 @@ void run_on_cpu(CPUState *env, void (*func)(void *data), void *data)
     while (!wi.done) {
         CPUState *self_env = cpu_single_env;
 
+        rcu_thread_offline();
         qemu_cond_wait(&qemu_work_cond, &qemu_global_mutex);
+        rcu_thread_online();
         cpu_single_env = self_env;
     }
 }
@@ -753,13 +756,16 @@ static void qemu_tcg_wait_io_event(void)
        /* Start accounting real time to the virtual clock if the CPUs
           are idle.  */
         qemu_clock_warp(vm_clock);
+        rcu_thread_offline();
         qemu_cond_wait(tcg_halt_cond, &qemu_global_mutex);
     }
 
     while (iothread_requesting_mutex) {
+        rcu_thread_offline();
         qemu_cond_wait(&qemu_io_proceeded_cond, &qemu_global_mutex);
     }
 
+    rcu_thread_online();
     for (env = first_cpu; env != NULL; env = env->next_cpu) {
         qemu_wait_io_event_common(env);
     }
@@ -768,9 +774,11 @@ static void qemu_tcg_wait_io_event(void)
 static void qemu_kvm_wait_io_event(CPUState *env)
 {
     while (cpu_thread_is_idle(env)) {
+        rcu_thread_offline();
         qemu_cond_wait(env->halt_cond, &qemu_global_mutex);
     }
 
+    rcu_thread_online();
     qemu_kvm_eat_signals(env);
     qemu_wait_io_event_common(env);
 }
@@ -780,6 +788,7 @@ static void *qemu_kvm_cpu_thread_fn(void *arg)
     CPUState *env = arg;
     int r;
 
+    rcu_register_thread();
     qemu_mutex_lock(&qemu_global_mutex);
     qemu_thread_get_self(env->thread);
     env->thread_id = qemu_get_thread_id();
@@ -818,6 +827,7 @@ static void *qemu_tcg_cpu_thread_fn(void *arg)
 {
     CPUState *env = arg;
 
+    rcu_register_thread();
     qemu_tcg_init_cpu_signals();
     qemu_thread_get_self(env->thread);
 
@@ -941,7 +951,9 @@ void pause_all_vcpus(void)
     }
 
     while (!all_vcpus_paused()) {
+        rcu_thread_offline();
         qemu_cond_wait(&qemu_pause_cond, &qemu_global_mutex);
+        rcu_thread_online();
         penv = first_cpu;
         while (penv) {
             qemu_cpu_kick(penv);
diff --git a/kvm-all.c b/kvm-all.c
index b9c172b..840f260 100644
--- a/kvm-all.c
+++ b/kvm-all.c
@@ -22,6 +22,7 @@
 
 #include "qemu-common.h"
 #include "qemu-barrier.h"
+#include "rcu.h"
 #include "sysemu.h"
 #include "hw/hw.h"
 #include "gdbstub.h"
@@ -952,7 +953,9 @@ int kvm_cpu_exec(CPUState *env)
         cpu_single_env = NULL;
         qemu_mutex_unlock_iothread();
 
+        rcu_thread_offline();
         run_ret = kvm_vcpu_ioctl(env, KVM_RUN, 0);
+        rcu_thread_online();
 
         qemu_mutex_lock_iothread();
         cpu_single_env = env;
diff --git a/os-win32.c b/os-win32.c
index b6652af..eab4418 100644
--- a/os-win32.c
+++ b/os-win32.c
@@ -32,6 +32,7 @@
 #include "config-host.h"
 #include "sysemu.h"
 #include "qemu-options.h"
+#include "rcu.h"
 
 /***********************************************************/
 /* Functions missing in mingw */
@@ -141,7 +142,9 @@ void os_host_main_loop_wait(int *timeout)
         WaitObjects *w = &wait_objects;
 
         qemu_mutex_unlock_iothread();
+        rcu_thread_offline();
         ret = WaitForMultipleObjects(w->num, w->events, FALSE, *timeout);
+        rcu_thread_online();
         qemu_mutex_lock_iothread();
         if (WAIT_OBJECT_0 + 0 <= ret && ret <= WAIT_OBJECT_0 + w->num - 1) {
             if (w->func[ret - WAIT_OBJECT_0])
diff --git a/vl.c b/vl.c
index c714127..6854036 100644
--- a/vl.c
+++ b/vl.c
@@ -131,6 +131,7 @@ int main(int argc, char **argv)
 #include "console.h"
 #include "sysemu.h"
 #include "gdbstub.h"
+#include "rcu.h"
 #include "qemu-timer.h"
 #include "qemu-char.h"
 #include "cache-utils.h"
@@ -1350,7 +1351,9 @@ void main_loop_wait(int nonblocking)
     slirp_select_fill(&nfds, &rfds, &wfds, &xfds);
 
     qemu_mutex_unlock_iothread();
+    rcu_thread_offline();
     ret = select(nfds + 1, &rfds, &wfds, &xfds, &tv);
+    rcu_thread_online();
     qemu_mutex_lock_iothread();
 
     qemu_iohandler_poll(&rfds, &wfds, &xfds, ret);
@@ -2113,6 +2116,7 @@ int main(int argc, char **argv, char **envp)
 
     QLIST_INIT (&vm_change_state_head);
     os_setup_early_signal_handling();
+    rcu_register_thread();
 
     module_call_init(MODULE_INIT_MACHINE);
     machine = find_default_machine();
-- 
1.7.6

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH 11/13] rcuify iohandlers
  2011-08-15 21:08 [Qemu-devel] [RFC PATCH 00/13] RCU implementation for QEMU Paolo Bonzini
                   ` (9 preceding siblings ...)
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 10/13] rcu: report quiescent states Paolo Bonzini
@ 2011-08-15 21:08 ` Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 12/13] split MRU ram list Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 13/13] RCUify ram_list Paolo Bonzini
  12 siblings, 0 replies; 16+ messages in thread
From: Paolo Bonzini @ 2011-08-15 21:08 UTC (permalink / raw)
  To: qemu-devel

Just a proof of concept, with the write-side still under the global
lock, in order to test the call_rcu code.
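
The shape of the conversion, condensed from the hunks below: readers walk
the list inside an RCU critical section, while the writer (still under the
global lock) unlinks the record and defers the free through call_rcu
instead of flagging it as deleted:

    /* read side, e.g. qemu_iohandler_poll */
    rcu_read_lock();
    QLIST_FOREACH(ioh, &io_handlers, next) {
        if (ioh->fd_read && FD_ISSET(ioh->fd, readfds)) {
            ioh->fd_read(ioh->opaque);
        }
    }
    rcu_read_unlock();

    /* write side, qemu_set_fd_handler2 */
    QLIST_REMOVE(ioh, next);
    call_rcu(&ioh->h, rcu_free);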

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 iohandler.c |   45 ++++++++++++++++++++-------------------------
 1 files changed, 20 insertions(+), 25 deletions(-)

diff --git a/iohandler.c b/iohandler.c
index 2b82421..4c1f68f 100644
--- a/iohandler.c
+++ b/iohandler.c
@@ -26,17 +26,18 @@
 #include "qemu-common.h"
 #include "qemu-char.h"
 #include "qemu-queue.h"
+#include "rcu.h"
 
 #ifndef _WIN32
 #include <sys/wait.h>
 #endif
 
 typedef struct IOHandlerRecord {
+    struct rcu_head h;
     int fd;
     IOCanReadHandler *fd_read_poll;
     IOHandler *fd_read;
     IOHandler *fd_write;
-    int deleted;
     void *opaque;
     QLIST_ENTRY(IOHandlerRecord) next;
 } IOHandlerRecord;
@@ -55,28 +56,26 @@ int qemu_set_fd_handler2(int fd,
 {
     IOHandlerRecord *ioh;
 
-    if (!fd_read && !fd_write) {
-        QLIST_FOREACH(ioh, &io_handlers, next) {
-            if (ioh->fd == fd) {
-                ioh->deleted = 1;
-                break;
-            }
-        }
-    } else {
-        QLIST_FOREACH(ioh, &io_handlers, next) {
-            if (ioh->fd == fd)
-                goto found;
+    //qemu_mutex_lock(&iohandler_lock);
+    QLIST_FOREACH(ioh, &io_handlers, next) {
+        if (ioh->fd == fd) {
+            break;
         }
+    }
+    if (ioh) {
+        QLIST_REMOVE(ioh, next);
+        call_rcu(&ioh->h, rcu_free);
+    }
+    if (fd_read || fd_write) {
         ioh = qemu_mallocz(sizeof(IOHandlerRecord));
-        QLIST_INSERT_HEAD(&io_handlers, ioh, next);
-    found:
         ioh->fd = fd;
         ioh->fd_read_poll = fd_read_poll;
         ioh->fd_read = fd_read;
         ioh->fd_write = fd_write;
         ioh->opaque = opaque;
-        ioh->deleted = 0;
+        QLIST_INSERT_HEAD(&io_handlers, ioh, next);
     }
+    //qemu_mutex_unlock(&iohandler_lock);
     return 0;
 }
 
@@ -92,9 +91,8 @@ void qemu_iohandler_fill(int *pnfds, fd_set *readfds, fd_set *writefds, fd_set *
 {
     IOHandlerRecord *ioh;
 
+    rcu_read_lock();
     QLIST_FOREACH(ioh, &io_handlers, next) {
-        if (ioh->deleted)
-            continue;
         if (ioh->fd_read &&
             (!ioh->fd_read_poll ||
              ioh->fd_read_poll(ioh->opaque) != 0)) {
@@ -108,6 +106,7 @@ void qemu_iohandler_fill(int *pnfds, fd_set *readfds, fd_set *writefds, fd_set *
                 *pnfds = ioh->fd;
         }
     }
+    rcu_read_unlock();
 }
 
 void qemu_iohandler_poll(fd_set *readfds, fd_set *writefds, fd_set *xfds, int ret)
@@ -115,20 +114,16 @@ void qemu_iohandler_poll(fd_set *readfds, fd_set *writefds, fd_set *xfds, int re
     if (ret > 0) {
         IOHandlerRecord *pioh, *ioh;
 
+        rcu_read_lock();
         QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) {
-            if (!ioh->deleted && ioh->fd_read && FD_ISSET(ioh->fd, readfds)) {
+            if (ioh->fd_read && FD_ISSET(ioh->fd, readfds)) {
                 ioh->fd_read(ioh->opaque);
             }
-            if (!ioh->deleted && ioh->fd_write && FD_ISSET(ioh->fd, writefds)) {
+            if (ioh->fd_write && FD_ISSET(ioh->fd, writefds)) {
                 ioh->fd_write(ioh->opaque);
             }
-
-            /* Do this last in case read/write handlers marked it for deletion */
-            if (ioh->deleted) {
-                QLIST_REMOVE(ioh, next);
-                qemu_free(ioh);
-            }
         }
+        rcu_read_unlock();
     }
 }
 
-- 
1.7.6

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH 12/13] split MRU ram list
  2011-08-15 21:08 [Qemu-devel] [RFC PATCH 00/13] RCU implementation for QEMU Paolo Bonzini
                   ` (10 preceding siblings ...)
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 11/13] rcuify iohandlers Paolo Bonzini
@ 2011-08-15 21:08 ` Paolo Bonzini
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 13/13] RCUify ram_list Paolo Bonzini
  12 siblings, 0 replies; 16+ messages in thread
From: Paolo Bonzini @ 2011-08-15 21:08 UTC (permalink / raw)
  To: qemu-devel

Outside the execution threads, the normal (non-MRU) order of
the RAM blocks should always be enough.  So manage two separate
lists, which will have separate locking rules.
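
A sketch of the intent (process() and addr just stand in for the real
callers): the execution-thread fast path keeps the MRU-ordered chain to
itself, everything else walks the stable chain:

    RAMBlock *block;

    /* execution threads (qemu_get_ram_ptr): MRU order, may be reordered */
    QLIST_FOREACH(block, &ram_list.blocks_mru, next_mru) {
        if (addr - block->offset < block->length) {
            break;
        }
    }

    /* everyone else (migration, alloc/free): stable order */
    QLIST_FOREACH(block, &ram_list.blocks, next) {
        process(block);
    }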

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 cpu-all.h |    4 +++-
 exec.c    |   16 +++++++++++-----
 2 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/cpu-all.h b/cpu-all.h
index f5c82cd..083d9e6 100644
--- a/cpu-all.h
+++ b/cpu-all.h
@@ -479,8 +479,9 @@ typedef struct RAMBlock {
     ram_addr_t offset;
     ram_addr_t length;
     uint32_t flags;
-    char idstr[256];
     QLIST_ENTRY(RAMBlock) next;
+    QLIST_ENTRY(RAMBlock) next_mru;
+    char idstr[256];
 #if defined(__linux__) && !defined(TARGET_S390X)
     int fd;
 #endif
@@ -489,6 +490,7 @@ typedef struct RAMBlock {
 typedef struct RAMList {
     uint8_t *phys_dirty;
     QLIST_HEAD(, RAMBlock) blocks;
+    QLIST_HEAD(, RAMBlock) blocks_mru;
 } RAMList;
 extern RAMList ram_list;
 
diff --git a/exec.c b/exec.c
index 63adb18..d25e3cc 100644
--- a/exec.c
+++ b/exec.c
@@ -110,7 +110,10 @@ static uint8_t *code_gen_ptr;
 int phys_ram_fd;
 static int in_migration;
 
-RAMList ram_list = { .blocks = QLIST_HEAD_INITIALIZER(ram_list.blocks) };
+RAMList ram_list = {
+    .blocks = QLIST_HEAD_INITIALIZER(ram_list.blocks),
+    .blocks_mru = QLIST_HEAD_INITIALIZER(ram_list.blocks_mru)
+};
 
 static MemoryRegion *system_memory;
 static MemoryRegion *system_io;
@@ -2983,6 +2986,7 @@ ram_addr_t qemu_ram_alloc_from_ptr(DeviceState *dev, const char *name,
     new_block->length = size;
 
     QLIST_INSERT_HEAD(&ram_list.blocks, new_block, next);
+    QLIST_INSERT_HEAD(&ram_list.blocks_mru, new_block, next_mru);
 
     ram_list.phys_dirty = qemu_realloc(ram_list.phys_dirty,
                                        last_ram_offset() >> TARGET_PAGE_BITS);
@@ -3007,6 +3011,7 @@ void qemu_ram_free_from_ptr(ram_addr_t addr)
     QLIST_FOREACH(block, &ram_list.blocks, next) {
         if (addr == block->offset) {
             QLIST_REMOVE(block, next);
+            QLIST_REMOVE(block, next_mru);
             qemu_free(block);
             return;
         }
@@ -3020,6 +3025,7 @@ void qemu_ram_free(ram_addr_t addr)
     QLIST_FOREACH(block, &ram_list.blocks, next) {
         if (addr == block->offset) {
             QLIST_REMOVE(block, next);
+            QLIST_REMOVE(block, next_mru);
             if (block->flags & RAM_PREALLOC_MASK) {
                 ;
             } else if (mem_path) {
@@ -3124,12 +3130,12 @@ void *qemu_get_ram_ptr(ram_addr_t addr)
 {
     RAMBlock *block;
 
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks_mru, next_mru) {
         if (addr - block->offset < block->length) {
             /* Move this entry to to start of the list.  */
             if (block != QLIST_FIRST(&ram_list.blocks)) {
-                QLIST_REMOVE(block, next);
-                QLIST_INSERT_HEAD(&ram_list.blocks, block, next);
+                QLIST_REMOVE(block, next_mru);
+                QLIST_INSERT_HEAD(&ram_list.blocks_mru, block, next_mru);
             }
             if (xen_enabled()) {
                 /* We need to check if the requested address is in the RAM
@@ -3224,7 +3230,7 @@ int qemu_ram_addr_from_host(void *ptr, ram_addr_t *ram_addr)
         return 0;
     }
 
-    QLIST_FOREACH(block, &ram_list.blocks, next) {
+    QLIST_FOREACH(block, &ram_list.blocks_mru, next_mru) {
         /* This case append when the block is not mapped. */
         if (block->host == NULL) {
             continue;
-- 
1.7.6

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [Qemu-devel] [RFC PATCH 13/13] RCUify ram_list
  2011-08-15 21:08 [Qemu-devel] [RFC PATCH 00/13] RCU implementation for QEMU Paolo Bonzini
                   ` (11 preceding siblings ...)
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 12/13] split MRU ram list Paolo Bonzini
@ 2011-08-15 21:08 ` Paolo Bonzini
  12 siblings, 0 replies; 16+ messages in thread
From: Paolo Bonzini @ 2011-08-15 21:08 UTC (permalink / raw)
  To: qemu-devel

Incomplete because the users of qemu_get_ram_ptr should be wrapped
with rcu_read_lock/rcu_read_unlock.  Happens to work because those
are nops anyway. :)
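
The missing piece on the callers' side would look roughly like this (a
sketch, not part of this patch; buf, addr1 and l are whatever the caller
already has around):

    rcu_read_lock();
    ptr = qemu_get_ram_ptr(addr1);
    /* only use the pointer inside the critical section, otherwise the
       block could be freed under our feet */
    memcpy(buf, ptr, l);
    rcu_read_unlock();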

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 arch_init.c |   14 +++++++
 cpu-all.h   |    4 ++
 exec.c      |  124 ++++++++++++++++++++++++++++++++++++++++------------------
 3 files changed, 103 insertions(+), 39 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 484b39d..f5a567b 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -117,6 +117,7 @@ static int ram_save_block(QEMUFile *f)
     ram_addr_t current_addr;
     int bytes_sent = 0;
 
+    rcu_read_lock();
     if (!block)
         block = QLIST_FIRST(&ram_list.blocks);
 
@@ -167,6 +168,7 @@ static int ram_save_block(QEMUFile *f)
         current_addr = block->offset + offset;
 
     } while (current_addr != last_block->offset + last_offset);
+    rcu_read_unlock();
 
     last_block = block;
     last_offset = offset;
@@ -181,6 +183,7 @@ static ram_addr_t ram_save_remaining(void)
     RAMBlock *block;
     ram_addr_t count = 0;
 
+    rcu_read_lock();
     QLIST_FOREACH(block, &ram_list.blocks, next) {
         ram_addr_t addr;
         for (addr = block->offset; addr < block->offset + block->length;
@@ -190,6 +193,7 @@ static ram_addr_t ram_save_remaining(void)
             }
         }
     }
+    rcu_read_unlock();
 
     return count;
 }
@@ -209,8 +213,10 @@ uint64_t ram_bytes_total(void)
     RAMBlock *block;
     uint64_t total = 0;
 
+    rcu_read_lock();
     QLIST_FOREACH(block, &ram_list.blocks, next)
         total += block->length;
+    rcu_read_unlock();
 
     return total;
 }
@@ -232,6 +238,7 @@ static void sort_ram_list(void)
     RAMBlock *block, *nblock, **blocks;
     int n;
     n = 0;
+    //qemu_mutex_lock(&ram_list.mutex);
     QLIST_FOREACH(block, &ram_list.blocks, next) {
         ++n;
     }
@@ -245,6 +252,7 @@ static void sort_ram_list(void)
     while (--n >= 0) {
         QLIST_INSERT_HEAD(&ram_list.blocks, blocks[n], next);
     }
+    //qemu_mutex_unlock(&ram_list.mutex);
     qemu_free(blocks);
 }
 
@@ -273,6 +281,7 @@ int ram_save_live(Monitor *mon, QEMUFile *f, int stage, void *opaque)
         sort_ram_list();
 
         /* Make sure all dirty bits are set */
+	rcu_read_lock();
         QLIST_FOREACH(block, &ram_list.blocks, next) {
             for (addr = block->offset; addr < block->offset + block->length;
                  addr += TARGET_PAGE_SIZE) {
@@ -282,17 +291,20 @@ int ram_save_live(Monitor *mon, QEMUFile *f, int stage, void *opaque)
                 }
             }
         }
+	rcu_read_unlock();
 
         /* Enable dirty memory tracking */
         cpu_physical_memory_set_dirty_tracking(1);
 
         qemu_put_be64(f, ram_bytes_total() | RAM_SAVE_FLAG_MEM_SIZE);
 
+	rcu_read_lock();
         QLIST_FOREACH(block, &ram_list.blocks, next) {
             qemu_put_byte(f, strlen(block->idstr));
             qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
             qemu_put_be64(f, block->length);
         }
+	rcu_read_unlock();
     }
 
     bytes_transferred_last = bytes_transferred;
@@ -374,6 +386,7 @@ int ram_load(QEMUFile *f, void *opaque, int version_id)
         return -EINVAL;
     }
 
+    rcu_read_lock();
     do {
         addr = qemu_get_be64(f);
 
@@ -453,6 +466,7 @@ int ram_load(QEMUFile *f, void *opaque, int version_id)
             return -EIO;
         }
     } while (!(flags & RAM_SAVE_FLAG_EOS));
+    rcu_read_unlock();
 
     return 0;
 }
diff --git a/cpu-all.h b/cpu-all.h
index 083d9e6..7ed3f75 100644
--- a/cpu-all.h
+++ b/cpu-all.h
@@ -21,6 +21,7 @@
 
 #include "qemu-common.h"
 #include "cpu-common.h"
+#include "rcu.h"
 
 /* some important defines:
  *
@@ -475,11 +476,14 @@ extern ram_addr_t ram_size;
 #define RAM_PREALLOC_MASK   (1 << 0)
 
 typedef struct RAMBlock {
+    struct rcu_head h;
     uint8_t *host;
     ram_addr_t offset;
     ram_addr_t length;
     uint32_t flags;
+    // should be protected by its own lock + RCU on the read side
     QLIST_ENTRY(RAMBlock) next;
+    // protected by the iothread lock + RCU on the read side
     QLIST_ENTRY(RAMBlock) next_mru;
     char idstr[256];
 #if defined(__linux__) && !defined(TARGET_S390X)
diff --git a/exec.c b/exec.c
index d25e3cc..6a7cec7 100644
--- a/exec.c
+++ b/exec.c
@@ -2935,6 +2935,7 @@ ram_addr_t qemu_ram_alloc_from_ptr(DeviceState *dev, const char *name,
     }
     pstrcat(new_block->idstr, sizeof(new_block->idstr), name);
 
+    //qemu_mutex_lock(&ram_list.mutex);
     QLIST_FOREACH(block, &ram_list.blocks, next) {
         if (!strcmp(block->idstr, new_block->idstr)) {
             fprintf(stderr, "RAMBlock \"%s\" already registered, abort!\n",
@@ -2986,6 +2987,7 @@ ram_addr_t qemu_ram_alloc_from_ptr(DeviceState *dev, const char *name,
     new_block->length = size;
 
     QLIST_INSERT_HEAD(&ram_list.blocks, new_block, next);
+    //qemu_mutex_unlock(&ram_list.mutex);
     QLIST_INSERT_HEAD(&ram_list.blocks_mru, new_block, next_mru);
 
     ram_list.phys_dirty = qemu_realloc(ram_list.phys_dirty,
@@ -3008,52 +3010,62 @@ void qemu_ram_free_from_ptr(ram_addr_t addr)
 {
     RAMBlock *block;
 
+    //qemu_mutex_lock(&ram_list.mutex);
     QLIST_FOREACH(block, &ram_list.blocks, next) {
         if (addr == block->offset) {
             QLIST_REMOVE(block, next);
             QLIST_REMOVE(block, next_mru);
-            qemu_free(block);
+            call_rcu(&block->h, rcu_free);
             return;
         }
     }
+    //qemu_mutex_unlock(&ram_list.mutex);
+}
+
+static void rcu_free_block(struct rcu_head *h)
+{
+    RAMBlock *block = container_of(h, RAMBlock, h);
+    if (block->flags & RAM_PREALLOC_MASK) {
+        ;
+    } else if (mem_path) {
+#if defined (__linux__) && !defined(TARGET_S390X)
+        if (block->fd) {
+            munmap(block->host, block->length);
+            close(block->fd);
+        } else {
+            qemu_vfree(block->host);
+        }
+#else
+        abort();
+#endif
+    } else {
+#if defined(TARGET_S390X) && defined(CONFIG_KVM)
+        munmap(block->host, block->length);
+#else
+        if (xen_enabled()) {
+            xen_invalidate_map_cache_entry(block->host);
+        } else {
+            qemu_vfree(block->host);
+        }
+#endif
+    }
+    qemu_free(block);
 }
 
 void qemu_ram_free(ram_addr_t addr)
 {
     RAMBlock *block;
 
+    //qemu_mutex_lock(&ram_list.mutex);
     QLIST_FOREACH(block, &ram_list.blocks, next) {
         if (addr == block->offset) {
             QLIST_REMOVE(block, next);
             QLIST_REMOVE(block, next_mru);
-            if (block->flags & RAM_PREALLOC_MASK) {
-                ;
-            } else if (mem_path) {
-#if defined (__linux__) && !defined(TARGET_S390X)
-                if (block->fd) {
-                    munmap(block->host, block->length);
-                    close(block->fd);
-                } else {
-                    qemu_vfree(block->host);
-                }
-#else
-                abort();
-#endif
-            } else {
-#if defined(TARGET_S390X) && defined(CONFIG_KVM)
-                munmap(block->host, block->length);
-#else
-                if (xen_enabled()) {
-                    xen_invalidate_map_cache_entry(block->host);
-                } else {
-                    qemu_vfree(block->host);
-                }
-#endif
-            }
-            qemu_free(block);
+            call_rcu(&block->h, rcu_free_block);
             return;
         }
     }
+    //qemu_mutex_unlock(&ram_list.mutex);
 
 }
 
@@ -3065,6 +3077,7 @@ void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
     int flags;
     void *area, *vaddr;
 
+    rcu_read_lock();
     QLIST_FOREACH(block, &ram_list.blocks, next) {
         offset = addr - block->offset;
         if (offset < block->length) {
@@ -3072,8 +3085,9 @@ void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
             if (block->flags & RAM_PREALLOC_MASK) {
                 ;
             } else {
+		/* No need to munmap the memory, mmap will discard the old mapping
+		 * atomically.  */
                 flags = MAP_FIXED;
-                munmap(vaddr, length);
                 if (mem_path) {
 #if defined(__linux__) && !defined(TARGET_S390X)
                     if (block->fd) {
@@ -3112,9 +3126,10 @@ void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
                 }
                 qemu_madvise(vaddr, length, QEMU_MADV_MERGEABLE);
             }
-            return;
+            break;
         }
     }
+    rcu_read_unlock();
 }
 #endif /* !_WIN32 */
 
@@ -3128,8 +3143,16 @@ void qemu_ram_remap(ram_addr_t addr, ram_addr_t length)
  */
 void *qemu_get_ram_ptr(ram_addr_t addr)
 {
+    uint8_t *p = NULL;
     RAMBlock *block;
 
+    /* RCU protects the "existence" of the blocks, the iothread lock
+     * protects the next_mru chain.  This rcu_read_lock() is most
+     * likely nested, since the caller probably wants to do something
+     * with the result as well!  FIXME: this is not done anywhere yet.
+     * Due to our RCU implementation we can avoid that, but it's not
+     * clean. */
+    rcu_read_lock();
     QLIST_FOREACH(block, &ram_list.blocks_mru, next_mru) {
         if (addr - block->offset < block->length) {
             /* Move this entry to to start of the list.  */
@@ -3143,20 +3166,24 @@ void *qemu_get_ram_ptr(ram_addr_t addr)
                  * In that case just map until the end of the page.
                  */
                 if (block->offset == 0) {
-                    return xen_map_cache(addr, 0, 0);
+                    p = xen_map_cache(addr, 0, 0);
+		    break;
                 } else if (block->host == NULL) {
                     block->host =
                         xen_map_cache(block->offset, block->length, 1);
                 }
             }
-            return block->host + (addr - block->offset);
+	    p = block->host + (addr - block->offset);
+            break;
         }
     }
+    rcu_read_unlock();
 
-    fprintf(stderr, "Bad ram offset %" PRIx64 "\n", (uint64_t)addr);
-    abort();
-
-    return NULL;
+    if (!p) {
+        fprintf(stderr, "Bad ram offset %" PRIx64 "\n", (uint64_t)addr);
+        abort();
+    }
+    return p;
 }
 
 /* Return a host pointer to ram allocated with qemu_ram_alloc.
@@ -3164,8 +3191,10 @@ void *qemu_get_ram_ptr(ram_addr_t addr)
  */
 void *qemu_safe_ram_ptr(ram_addr_t addr)
 {
+    uint8_t *p = NULL;
     RAMBlock *block;
 
+    rcu_read_lock();
     QLIST_FOREACH(block, &ram_list.blocks, next) {
         if (addr - block->offset < block->length) {
             if (xen_enabled()) {
@@ -3174,20 +3203,25 @@ void *qemu_safe_ram_ptr(ram_addr_t addr)
                  * In that case just map until the end of the page.
                  */
                 if (block->offset == 0) {
-                    return xen_map_cache(addr, 0, 0);
+                    p = xen_map_cache(addr, 0, 0);
+		    break;
                 } else if (block->host == NULL) {
                     block->host =
                         xen_map_cache(block->offset, block->length, 1);
                 }
             }
-            return block->host + (addr - block->offset);
+            p = block->host + (addr - block->offset);
+	    break;
         }
     }
+    rcu_read_unlock();
 
-    fprintf(stderr, "Bad ram offset %" PRIx64 "\n", (uint64_t)addr);
-    abort();
+    if (!p) {
+        fprintf(stderr, "Bad ram offset %" PRIx64 "\n", (uint64_t)addr);
+        abort();
+    }
 
-    return NULL;
+    return p;
 }
 
 /* Return a host pointer to guest's ram. Similar to qemu_get_ram_ptr
@@ -3202,13 +3236,16 @@ void *qemu_ram_ptr_length(ram_addr_t addr, ram_addr_t *size)
     } else {
         RAMBlock *block;
 
+	rcu_read_lock();
         QLIST_FOREACH(block, &ram_list.blocks, next) {
             if (addr - block->offset < block->length) {
                 if (addr - block->offset + *size > block->length)
                     *size = block->length - addr + block->offset;
+	        rcu_read_unlock();
                 return block->host + (addr - block->offset);
             }
         }
+	rcu_read_unlock();
 
         fprintf(stderr, "Bad ram offset %" PRIx64 "\n", (uint64_t)addr);
         abort();
@@ -3230,6 +3267,13 @@ int qemu_ram_addr_from_host(void *ptr, ram_addr_t *ram_addr)
         return 0;
     }
 
+    /* RCU protects the "existence" of the blocks, the iothread lock
+     * protects the next_mru chain.  This rcu_read_lock() is most
+     * likely nested, since the caller probably wants to do something
+     * with the result as well!  FIXME: this is not done anywhere yet.
+     * Due to our RCU implementation we can avoid that, but it's not
+     * clean. */
+    rcu_read_lock();
     QLIST_FOREACH(block, &ram_list.blocks_mru, next_mru) {
         /* This case append when the block is not mapped. */
         if (block->host == NULL) {
@@ -3237,9 +3281,11 @@ int qemu_ram_addr_from_host(void *ptr, ram_addr_t *ram_addr)
         }
         if (host - block->host < block->length) {
             *ram_addr = block->offset + (host - block->host);
+	    rcu_read_unlock();
             return 0;
         }
     }
+    rcu_read_unlock();
 
     return -1;
 }
-- 
1.7.6

^ permalink raw reply related	[flat|nested] 16+ messages in thread

* Re: [Qemu-devel] [RFC PATCH 05/13] add rcu library
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 05/13] add rcu library Paolo Bonzini
@ 2011-08-17 17:04   ` Blue Swirl
  0 siblings, 0 replies; 16+ messages in thread
From: Blue Swirl @ 2011-08-17 17:04 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel

On Mon, Aug 15, 2011 at 9:08 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> This includes a (mangled) copy of the urcu-qsbr code from liburcu.
> The main changes are: 1) removing dependencies on many other header files
> in liburcu; 2) removing for simplicity the tentative busy waiting in
> synchronize_rcu, which has limited performance effects; 3) replacing
> futexes in synchronize_rcu with QemuEvents for Win32 portability.
> The API is the same as liburcu, so it should be possible in the future
> to require liburcu on POSIX systems for example and use our copy only
> on Windows.
>
> Among the various versions available I chose urcu-qsbr, which has the
> fastest rcu_read_{lock,unlock} but requires the program to manually
> annotate quiescent points or intervals.  QEMU threads usually have easily
> identified quiescent periods, so this should not be a problem.
>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  Makefile.objs |    2 +-
>  qemu-queue.h  |   11 +++
>  rcu-pointer.h |  119 +++++++++++++++++++++++++++++++
>  rcu.c         |  221 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  rcu.h         |  135 +++++++++++++++++++++++++++++++++++
>  5 files changed, 487 insertions(+), 1 deletions(-)
>  create mode 100644 rcu-pointer.h
>  create mode 100644 rcu.c
>  create mode 100644 rcu.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index 16eef38..902a083 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -6,7 +6,7 @@ qobject-obj-y += qerror.o error.o
>
>  #######################################################################
>  # oslib-obj-y is code depending on the OS (win32 vs posix)
> -oslib-obj-y = osdep.o
> +oslib-obj-y = osdep.o rcu.o
>  oslib-obj-$(CONFIG_WIN32) += oslib-win32.o qemu-thread-win32.o
>  oslib-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o
>
> diff --git a/qemu-queue.h b/qemu-queue.h
> index 1d07745..cc8e55b 100644
> --- a/qemu-queue.h
> +++ b/qemu-queue.h
> @@ -100,6 +100,17 @@ struct {                                                                \
>         (head)->lh_first = NULL;                                        \
>  } while (/*CONSTCOND*/0)
>
> +#define QLIST_SWAP(dstlist, srclist, field) do {                        \
> +        void *tmplist;                                                  \
> +        tmplist = (srclist)->lh_first;                                  \
> +        (srclist)->lh_first = (dstlist)->lh_first;                      \
> +        if ((srclist)->lh_first != NULL)                                \
> +            (srclist)->lh_first->field.le_prev = &(srclist)->lh_first;  \
> +        (dstlist)->lh_first = tmplist; \
> +        if ((dstlist)->lh_first != NULL)                                \
> +            (dstlist)->lh_first->field.le_prev = &(dstlist)->lh_first;  \
> +} while (/*CONSTCOND*/0)
> +
>  #define QLIST_INSERT_AFTER(listelm, elm, field) do {                    \
>         if (((elm)->field.le_next = (listelm)->field.le_next) != NULL)  \
>                 (listelm)->field.le_next->field.le_prev =               \
> diff --git a/rcu-pointer.h b/rcu-pointer.h
> new file mode 100644
> index 0000000..4381313
> --- /dev/null
> +++ b/rcu-pointer.h
> @@ -0,0 +1,119 @@
> +#ifndef _URCU_POINTER_STATIC_H
> +#define _URCU_POINTER_STATIC_H
> +
> +/*
> + * urcu-pointer-static.h
> + *
> + * Userspace RCU header. Operations on pointers.
> + *
> + * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See urcu-pointer.h for
> + * linking dynamically with the userspace rcu library.
> + *
> + * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
> + * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + *
> + * IBM's contributions to this file may be relicensed under LGPLv2 or later.
> + */
> +
> +#include "compiler.h"
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#ifdef __alpha__
> +#define smp_read_barrier_depends()   asm volatile("mb":::"memory")
> +#else
> +#define smp_read_barrier_depends()
> +#endif
> +
> +/**
> + * rcu_dereference - reads (copy) a RCU-protected pointer to a local variable
> + * into a RCU read-side critical section. The pointer can later be safely
> + * dereferenced within the critical section.
> + *
> + * This ensures that the pointer copy is invariant thorough the whole critical
> + * section.
> + *
> + * Inserts memory barriers on architectures that require them (currently only
> + * Alpha) and documents which pointers are protected by RCU.
> + *
> + * The compiler memory barrier in ACCESS_ONCE() ensures that value-speculative
> + * optimizations (e.g. VSS: Value Speculation Scheduling) does not perform the
> + * data read before the pointer read by speculating the value of the pointer.
> + * Correct ordering is ensured because the pointer is read as a volatile access.
> + * This acts as a global side-effect operation, which forbids reordering of
> + * dependent memory operations. Note that such concern about dependency-breaking
> + * optimizations will eventually be taken care of by the "memory_order_consume"
> + * addition to forthcoming C++ standard.
> + *
> + * Should match rcu_assign_pointer() or rcu_xchg_pointer().
> + */
> +
> +#define rcu_dereference(p)     ({                                      \
> +                               typeof(p) _________p1 = ACCESS_ONCE(p); \
> +                               smp_read_barrier_depends();             \
> +                               (_________p1);                          \
> +                               })
> +
> +/**
> + * rcu_cmpxchg_pointer - same as rcu_assign_pointer, but tests if the pointer
> + * is as expected by "old". If succeeds, returns the previous pointer to the
> + * data structure, which can be safely freed after waiting for a quiescent state
> + * using synchronize_rcu(). If fails (unexpected value), returns old (which
> + * should not be freed !).
> + */
> +
> +#define rcu_cmpxchg_pointer(p, old, _new)                              \
> +       ({                                                              \
> +               typeof(*p) _________pold = (old);                       \
> +               typeof(*p) _________pnew = (_new);                      \
> +               if (!__builtin_constant_p(_new) ||                      \
> +                   ((_new) != NULL))                                   \
> +                       __sync_synchronize();                           \
> +               uatomic_cmpxchg(p, _________pold, _________pnew);       \
> +       })
> +
> +#define rcu_set_pointer(p, v)                          \
> +       ({                                              \
> +               typeof(*p) _________pv = (v);           \
> +               if (!__builtin_constant_p(v) ||         \
> +                   ((v) != NULL))                      \
> +                       __sync_synchronize();           \
> +               ACCESS_ONCE(*p) = _________pv;          \
> +       })
> +
> +/**
> + * rcu_assign_pointer - assign (publicize) a pointer to a new data structure
> + * meant to be read by RCU read-side critical sections. Returns the assigned
> + * value.
> + *
> + * Documents which pointers will be dereferenced by RCU read-side critical
> + * sections and adds the required memory barriers on architectures requiring
> + * them. It also makes sure the compiler does not reorder code initializing the
> + * data structure before its publication.
> + *
> + * Should match rcu_dereference_pointer().
> + */
> +
> +#define rcu_assign_pointer(p, v)       rcu_set_pointer(&(p), v)
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _URCU_POINTER_STATIC_H */
> diff --git a/rcu.c b/rcu.c
> new file mode 100644
> index 0000000..e2f347a
> --- /dev/null
> +++ b/rcu.c
> @@ -0,0 +1,221 @@
> +/*
> + * urcu-qsbr.c
> + *
> + * Userspace RCU QSBR library
> + *
> + * Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
> + * Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
> + * Copyright 2011 Red Hat, Inc.
> + *
> + * Ported to QEMU by Paolo Bonzini  <pbonzini@redhat.com>
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + *
> + * IBM's contributions to this file may be relicensed under LGPLv2 or later.
> + */
> +
> +#include <stdio.h>
> +#include <assert.h>
> +#include <stdlib.h>
> +#include <stdint.h>
> +#include <errno.h>
> +#include "rcu.h"
> +#include "qemu-barrier.h"
> +
> +/*
> + * Global grace period counter.  Bit 0 is one if the thread is online.
> + * Bits 1 and above are defined in synchronize_rcu/update_counter_and_wait.
> + */
> +#define RCU_GP_ONLINE           (1UL << 0)
> +#define RCU_GP_CTR              (1UL << 1)
> +
> +unsigned long rcu_gp_ctr = RCU_GP_ONLINE;
> +
> +QemuEvent rcu_gp_event;
> +QemuMutex rcu_gp_lock;
> +
> +/*
> + * Check whether a quiescent state was crossed between the beginning of
> + * update_counter_and_wait and now.
> + */
> +static inline int rcu_gp_ongoing(unsigned long *ctr)
> +{
> +       unsigned long v;
> +
> +       v = ACCESS_ONCE(*ctr);
> +       return v && (v != rcu_gp_ctr);
> +}
> +
> +/*
> + * Written to only by each individual reader. Read by both the reader and the
> + * writers.
> + */
> +__thread struct rcu_reader rcu_reader;
> +
> +typedef QLIST_HEAD(, rcu_reader) ThreadList;
> +static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
> +
> +static void update_counter_and_wait(void)
> +{
> +       ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
> +       struct rcu_reader *index, *tmp;
> +
> +       if (sizeof (rcu_gp_ctr) <= 4) {
> +               /* Switch parity: 0 -> 1, 1 -> 0 */
> +               ACCESS_ONCE(rcu_gp_ctr) = rcu_gp_ctr ^ RCU_GP_CTR;
> +       } else {
> +               /* Increment current grace period.  */
> +               ACCESS_ONCE(rcu_gp_ctr) = rcu_gp_ctr + RCU_GP_CTR;
> +       }
> +
> +       barrier();
> +
> +       /*
> +        * Wait for each thread rcu_reader_qs_gp count to become 0.
> +        */
> +       for (;;) {
> +               /*
> +                * We want to be notified of changes made to rcu_gp_ongoing
> +                * while we walk the list.
> +                */
> +               qemu_event_reset(&rcu_gp_event);
> +               QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
> +                       if (!rcu_gp_ongoing(&index->ctr)) {
> +                               QLIST_REMOVE(index, node);
> +                               QLIST_INSERT_HEAD(&qsreaders, index, node);
> +                       }
> +               }
> +
> +               if (QLIST_EMPTY(&registry)) {
> +                       break;
> +               }
> +
> +               /*
> +                * Wait for one thread to report a quiescent state and
> +                * try again.
> +                */
> +               qemu_event_wait(&rcu_gp_event);
> +       }
> +       smp_mb();
> +
> +       /* put back the reader list in the registry */
> +       QLIST_SWAP(&registry, &qsreaders, node);
> +}
> +
> +/*
> + * Using a two-subphases algorithm for architectures with smaller than 64-bit
> + * long-size to ensure we do not encounter an overflow bug.
> + */
> +
> +void synchronize_rcu(void)
> +{
> +       unsigned long was_online;
> +
> +       was_online = rcu_reader.ctr;
> +
> +       /* All threads should read qparity before accessing data structure
> +        * where new ptr points to.
> +        */
> +
> +       /*
> +        * Mark the writer thread offline to make sure we don't wait for
> +        * our own quiescent state. This allows using synchronize_rcu()
> +        * in threads registered as readers.
> +        *
> +        * Also includes a memory barrier.
> +        */
> +       if (was_online) {
> +               rcu_thread_offline();
> +       } else {
> +               smp_mb();
> +       }
> +
> +       qemu_mutex_lock(&rcu_gp_lock);
> +
> +       if (QLIST_EMPTY(&registry))
> +               goto out;
> +
> +       if (sizeof(rcu_gp_ctr) <= 4) {
> +               /*
> +                * Wait for previous parity to be empty of readers.
> +                */
> +               update_counter_and_wait();      /* 0 -> 1, wait readers in parity 0 */
> +
> +               /*
> +                * Must finish waiting for quiescent state for parity 0 before
> +                * committing next rcu_gp_ctr update to memory. Failure to
> +                * do so could result in the writer waiting forever while new
> +                * readers are always accessing data (no progress).  Enforce
> +                * compiler-order of load rcu_reader ctr before store to
> +                * rcu_gp_ctr.
> +                */
> +               barrier();
> +
> +               /*
> +                * Adding a memory barrier which is _not_ formally required,
> +                * but makes the model easier to understand. It does not have a
> +                * big performance impact anyway, given this is the write-side.
> +                */
> +               smp_mb();
> +       }
> +
> +       /*
> +        * Wait for previous parity/grace period to be empty of readers.
> +        */
> +       update_counter_and_wait();      /* 1 -> 0, wait readers in parity 1 */
> +out:
> +       qemu_mutex_unlock(&rcu_gp_lock);
> +
> +       if (was_online) {
> +               /* Also includes a memory barrier.  */
> +               rcu_thread_online();
> +       } else {
> +               /*
> +                * Finish waiting for reader threads before letting the old
> +                * ptr being freed.
> +                */
> +               smp_mb();
> +       }
> +}
> +
> +static void rcu_init(void)
> +{
> +       qemu_mutex_init(&rcu_gp_lock);
> +       qemu_event_init(&rcu_gp_event, true);
> +}
> +
> +void rcu_register_thread(void)
> +{
> +       static QemuOnce init = QEMU_ONCE_INIT;
> +       assert(rcu_reader.ctr == 0);
> +
> +       qemu_once(&init, rcu_init);
> +       qemu_mutex_lock(&rcu_gp_lock);
> +       QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
> +       qemu_mutex_unlock(&rcu_gp_lock);
> +       rcu_thread_online();
> +}
> +
> +void rcu_unregister_thread(void)
> +{
> +       /*
> +        * We have to make the thread offline otherwise we end up dealocking
> +        * with a waiting writer.
> +        */
> +       rcu_thread_offline();
> +       qemu_mutex_lock(&rcu_gp_lock);
> +       QLIST_REMOVE(&rcu_reader, node);
> +       qemu_mutex_unlock(&rcu_gp_lock);
> +}
> diff --git a/rcu.h b/rcu.h
> new file mode 100644
> index 0000000..569f696
> --- /dev/null
> +++ b/rcu.h
> @@ -0,0 +1,135 @@
> +#ifndef _URCU_QSBR_H
> +#define _URCU_QSBR_H
> +
> +/*
> + * urcu-qsbr.h
> + *
> + * Userspace RCU QSBR header.
> + *
> + * LGPL-compatible code should include this header with :
> + *
> + * #define _LGPL_SOURCE
> + * #include <urcu.h>
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
> + *
> + * IBM's contributions to this file may be relicensed under LGPLv2 or later.
> + */
> +
> +#include <stdlib.h>
> +#include <assert.h>
> +#include <limits.h>
> +#include <unistd.h>
> +#include <stdint.h>
> +#include <stdbool.h>
> +
> +#include "compiler.h"
> +#include "rcu-pointer.h"
> +#include "qemu-thread.h"
> +#include "qemu-queue.h"
> +#include "qemu-barrier.h"
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +/*
> + * Important !
> + *
> + * Each thread containing read-side critical sections must be registered
> + * with rcu_register_thread() before calling rcu_read_lock().
> + * rcu_unregister_thread() should be called before the thread exits.
> + */
> +
> +#ifdef DEBUG_RCU
> +#define rcu_assert(args...)    assert(args)
> +#else
> +#define rcu_assert(args...)
> +#endif
> +
> +#define RCU_GP_ONLINE          (1UL << 0)
> +#define RCU_GP_CTR             (1UL << 1)
> +
> +/*
> + * Global quiescent period counter with low-order bits unused.
> + * Using a int rather than a char to eliminate false register dependencies
> + * causing stalls on some architectures.
> + */
> +extern unsigned long rcu_gp_ctr;
> +
> +extern QemuEvent rcu_gp_event;
> +
> +struct rcu_reader {
> +       /* Data used by both reader and synchronize_rcu() */
> +       unsigned long ctr;
> +       /* Data used for registry */
> +       QLIST_ENTRY(rcu_reader) node;
> +};
> +
> +extern __thread struct rcu_reader rcu_reader;

$ cat thread.c
static __thread int sigusr1_wfd;
$ gcc -v
Reading specs from
/usr/bin/../lib/gcc-lib/sparc64-unknown-openbsd4.9/4.2.1/specs
Target: sparc64-unknown-openbsd4.9
Configured with: OpenBSD/sparc64 system compiler
Thread model: posix
gcc version 4.2.1 20070719
$ gcc -c -pthread thread.c
thread.c:1: error: thread-local storage not supported for this target

> +
> +static inline void rcu_read_lock(void)
> +{
> +       rcu_assert(rcu_reader.ctr);
> +}
> +
> +static inline void rcu_read_unlock(void)
> +{
> +}
> +
> +static inline void rcu_quiescent_state(void)
> +{
> +       smp_mb();
> +       ACCESS_ONCE(rcu_reader.ctr) = ACCESS_ONCE(rcu_gp_ctr);
> +#if 0
> +       /* write rcu_reader.ctr before read futex.  Included in
> +        * qemu_event_set. */
> +       smp_mb();
> +#endif
> +       qemu_event_set(&rcu_gp_event);
> +}
> +
> +static inline void rcu_thread_offline(void)
> +{
> +       smp_mb();
> +       ACCESS_ONCE(rcu_reader.ctr) = 0;
> +#if 0
> +       /* write rcu_reader.ctr before read futex.  Included in
> +        * qemu_event_set. */
> +       smp_mb();
> +#endif
> +       qemu_event_set(&rcu_gp_event);
> +}
> +
> +static inline void rcu_thread_online(void)
> +{
> +       smp_mb();       /* Ensure the compiler does not reorder us with mutex */
> +       ACCESS_ONCE(rcu_reader.ctr) = ACCESS_ONCE(rcu_gp_ctr);
> +       smp_mb();
> +}
> +
> +extern void synchronize_rcu(void);
> +
> +/*
> + * Reader thread registration.
> + */
> +extern void rcu_register_thread(void);
> +extern void rcu_unregister_thread(void);
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _URCU_QSBR_H */
> --
> 1.7.6
>
>
>
>

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [Qemu-devel] [RFC PATCH 03/13] qemu-threads: add QemuEvent
  2011-08-15 21:08 ` [Qemu-devel] [RFC PATCH 03/13] qemu-threads: add QemuEvent Paolo Bonzini
@ 2011-08-17 17:09   ` Blue Swirl
  0 siblings, 0 replies; 16+ messages in thread
From: Blue Swirl @ 2011-08-17 17:09 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel

On Mon, Aug 15, 2011 at 9:08 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> This emulates Win32 manual-reset events using futexes or conditional
> variables.  Typical ways to use them are with multi-producer,
> single-consumer data structures, to test for a complex condition whose
> elements come from different threads:
>
>    for (;;) {
>        qemu_event_reset(ev);
>        ... test complex condition ...
>        if (condition is true) {
>            break;
>        }
>        qemu_event_wait(ev);
>    }
>
> Alternatively:
>
>    ... compute condition ...
>    if (condition) {
>        do {
>            qemu_event_wait(ev);
>            qemu_event_reset(ev);
>            ... compute condition ...
>        } while(condition);
>        qemu_event_set(ev);
>    }
>
> QemuEvent provides a very fast userspace path in the common case when
> no other thread is waiting, or the event is not changing state.  It
> is used to report RCU quiescent states to the thread calling
> synchronize_rcu (the latter being the single consumer), and to report
> call_rcu invocations to the thread that receives them.
>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  qemu-thread-posix.c |  124 +++++++++++++++++++++++++++++++++++++++++++++++++++
>  qemu-thread-posix.h |    8 +++
>  qemu-thread-win32.c |   26 +++++++++++
>  qemu-thread-win32.h |    4 ++
>  qemu-thread.h       |    8 +++
>  5 files changed, 170 insertions(+), 0 deletions(-)
>
> diff --git a/qemu-thread-posix.c b/qemu-thread-posix.c
> index 2bd02ef..50e7421 100644
> --- a/qemu-thread-posix.c
> +++ b/qemu-thread-posix.c
> @@ -17,7 +17,10 @@
>  #include <signal.h>
>  #include <stdint.h>
>  #include <string.h>
> +#include <limits.h>
> +#include <unistd.h>
>  #include "qemu-thread.h"
> +#include "qemu-barrier.h"
>
>  static void error_exit(int err, const char *msg)
>  {
> @@ -115,6 +118,127 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
>         error_exit(err, __func__);
>  }
>
> +#ifdef __linux__
> +#include <sys/syscall.h>

futex manual page says
#include <linux/futex.h>
#include <sys/time.h>

Maybe the compatibility stuff below belongs to linux-headers.

> +#ifndef FUTEX_WAIT
> +#define FUTEX_WAIT              0
> +#endif
> +#ifndef FUTEX_WAKE
> +#define FUTEX_WAKE              1
> +#endif
> +
> +#define futex(...)              syscall(__NR_futex, __VA_ARGS__)
> +
> +static inline void futex_wake(QemuEvent *ev, int n)
> +{
> +    futex(ev, FUTEX_WAKE, n, NULL, NULL, 0);
> +}
> +
> +static inline void futex_wait(QemuEvent *ev, unsigned val)
> +{
> +    futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0);
> +}
> +#else
> +static inline void futex_wake(QemuEvent *ev, int n)
> +{
> +  if (n == 1)

Missing braces, please use checkpatch.pl.

> +    pthread_cond_signal(&ev->cond);
> +  else
> +    pthread_cond_broadcast(&ev->cond);
> +}
> +
> +static inline void futex_wait(QemuEvent *ev, unsigned val)
> +{
> +  pthread_mutex_lock(&ev->lock);
> +  if (ev->value == val)
> +    pthread_cond_wait(&ev->cond, &ev->lock);
> +  pthread_mutex_unlock(&ev->lock);
> +}
> +#endif
> +
> +/* Bit 0 is 1 if there are no waiters.  Bit 1 is 1 if the event is set.
> + * The combination "event_set && event_has_waiters" is impossible.  */
> +#define EV_FREE_BIT    1
> +#define EV_SET_BIT     2
> +
> +#define EV_BUSY                0
> +#define EV_FREE                1
> +#define EV_SET         3
> +
> +void qemu_event_init(QemuEvent *ev, bool init)
> +{
> +#ifndef __linux__
> +    pthread_mutex_init(&ev->lock, NULL);
> +    pthread_cond_init(&ev->cond, NULL);
> +#endif
> +
> +    ev->value = (init ? EV_SET : EV_FREE);
> +}
> +
> +void qemu_event_destroy(QemuEvent *ev)
> +{
> +#ifndef __linux__
> +    pthread_mutex_destroy(&ev->lock);
> +    pthread_cond_destroy(&ev->cond);
> +#endif
> +}
> +
> +void qemu_event_set(QemuEvent *ev)
> +{
> +    unsigned value;
> +
> +    smp_mb();
> +    value = ev->value;
> +    if (value == EV_SET) {
> +        /* Exit on a pre-existing/concurrent set.  */
> +        smp_mb();
> +    } else {
> +        if (__sync_fetch_and_or(&ev->value, EV_SET) == EV_BUSY) {
> +            /* There were waiters, wake them up.  */
> +            futex_wake(ev, INT_MAX);
> +        }
> +    }
> +}
> +
> +void qemu_event_reset(QemuEvent *ev)
> +{
> +    unsigned value;
> +
> +    smp_mb();
> +    value = ev->value;
> +    if (value != EV_SET) {
> +        /* Exit on a pre-existing reset.  */
> +        smp_mb();
> +    } else {
> +        /* If there was a concurrent reset (or even reset+wait),
> +         * do nothing.  Otherwise change EV_SET->EV_FREE.  */
> +        __sync_fetch_and_and(&ev->value, ~EV_SET_BIT);
> +    }
> +}
> +
> +void qemu_event_wait(QemuEvent *ev)
> +{
> +    unsigned value, old;
> +
> +    smp_mb();
> +    value = ev->value;
> +    if (value == EV_SET) {
> +        smp_mb();
> +    } else {
> +        if (value == EV_FREE) {
> +            /* Leave the event reset and tell qemu_event_set that there
> +             * are waiters.  No need to retry, because there cannot be
> +             * a concurrent busy->free transition.  After the CAS, the
> +             * event will be either set or busy.  */
> +            old = __sync_val_compare_and_swap(&ev->value, EV_FREE, EV_BUSY);
> +            if (old == EV_SET) {
> +                return;
> +            }
> +        }
> +        futex_wait(ev, EV_BUSY);
> +    }
> +}
> +
>  void qemu_thread_create(QemuThread *thread,
>                        void *(*start_routine)(void*),
>                        void *arg)
> diff --git a/qemu-thread-posix.h b/qemu-thread-posix.h
> index ee4618e..2f5b63d 100644
> --- a/qemu-thread-posix.h
> +++ b/qemu-thread-posix.h
> @@ -10,6 +10,14 @@ struct QemuCond {
>     pthread_cond_t cond;
>  };
>
> +struct QemuEvent {
> +#ifndef __linux__
> +    pthread_mutex_t lock;
> +    pthread_cond_t cond;
> +#endif
> +    unsigned value;
> +};
> +
>  struct QemuThread {
>     pthread_t thread;
>  };
> diff --git a/qemu-thread-win32.c b/qemu-thread-win32.c
> index 2d2d5ab..9bdbb48 100644
> --- a/qemu-thread-win32.c
> +++ b/qemu-thread-win32.c
> @@ -192,6 +192,32 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
>     qemu_mutex_lock(mutex);
>  }
>
> +void qemu_event_init(QemuEvent *ev, bool init)
> +{
> +    /* Manual reset.  */
> +    ev->event = CreateEvent(NULL, TRUE, init, NULL);
> +}
> +
> +void qemu_event_destroy(QemuEvent *ev)
> +{
> +    CloseHandle(ev->event);
> +}
> +
> +void qemu_event_set(QemuEvent *ev)
> +{
> +    SetEvent(ev->event);
> +}
> +
> +void qemu_event_reset(QemuEvent *ev)
> +{
> +    ResetEvent(ev->event);
> +}
> +
> +void qemu_event_wait(QemuEvent *ev)
> +{
> +    WaitForSingleObject(ev->event, INFINITE);
> +}
> +
>  struct QemuThreadData {
>     QemuThread *thread;
>     void *(*start_routine)(void *);
> diff --git a/qemu-thread-win32.h b/qemu-thread-win32.h
> index 878f86a..ddd6d0f 100644
> --- a/qemu-thread-win32.h
> +++ b/qemu-thread-win32.h
> @@ -13,6 +13,10 @@ struct QemuCond {
>     HANDLE continue_event;
>  };
>
> +struct QemuEvent {
> +    HANDLE event;
> +};
> +
>  struct QemuThread {
>     HANDLE thread;
>     void *ret;
> diff --git a/qemu-thread.h b/qemu-thread.h
> index 0a73d50..8353e3d 100644
> --- a/qemu-thread.h
> +++ b/qemu-thread.h
> @@ -2,9 +2,11 @@
>  #define __QEMU_THREAD_H 1
>
>  #include <inttypes.h>
> +#include <stdbool.h>
>
>  typedef struct QemuMutex QemuMutex;
>  typedef struct QemuCond QemuCond;
> +typedef struct QemuEvent QemuEvent;
>  typedef struct QemuThread QemuThread;
>
>  #ifdef _WIN32
> @@ -31,6 +33,12 @@ void qemu_cond_signal(QemuCond *cond);
>  void qemu_cond_broadcast(QemuCond *cond);
>  void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex);
>
> +void qemu_event_init(QemuEvent *ev, bool init);
> +void qemu_event_set(QemuEvent *ev);
> +void qemu_event_reset(QemuEvent *ev);
> +void qemu_event_wait(QemuEvent *ev);
> +void qemu_event_destroy(QemuEvent *ev);
> +
>  void qemu_thread_create(QemuThread *thread,
>                        void *(*start_routine)(void*),
>                        void *arg);
> --
> 1.7.6
>
