- * [Qemu-devel] [PATCH 01/11] coroutine: use QSIMPLEQ instead of QTAILQ
  2016-04-15 11:31 [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Paolo Bonzini
@ 2016-04-15 11:31 ` Paolo Bonzini
  2016-04-19 13:45   ` Stefan Hajnoczi
  2016-04-15 11:31 ` [Qemu-devel] [PATCH 02/11] throttle-groups: restart throttled requests from coroutine context Paolo Bonzini
                   ` (10 subsequent siblings)
  11 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2016-04-15 11:31 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz, kwolf, berto
CoQueue do not need to remove any element but the head of the list;
processing is always strictly FIFO.  Therefore, the simpler singly-linked
QSIMPLEQ can be used instead.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/qemu/coroutine.h     |  2 +-
 include/qemu/coroutine_int.h |  4 ++--
 util/qemu-coroutine-lock.c   | 22 +++++++++++-----------
 util/qemu-coroutine.c        |  2 +-
 4 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 305fe76..63ae7fe 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -102,7 +102,7 @@ bool qemu_in_coroutine(void);
  * are built.
  */
 typedef struct CoQueue {
-    QTAILQ_HEAD(, Coroutine) entries;
+    QSIMPLEQ_HEAD(, Coroutine) entries;
 } CoQueue;
 
 /**
diff --git a/include/qemu/coroutine_int.h b/include/qemu/coroutine_int.h
index 42d6838..581a7f5 100644
--- a/include/qemu/coroutine_int.h
+++ b/include/qemu/coroutine_int.h
@@ -41,8 +41,8 @@ struct Coroutine {
     QSLIST_ENTRY(Coroutine) pool_next;
 
     /* Coroutines that should be woken up when we yield or terminate */
-    QTAILQ_HEAD(, Coroutine) co_queue_wakeup;
-    QTAILQ_ENTRY(Coroutine) co_queue_next;
+    QSIMPLEQ_HEAD(, Coroutine) co_queue_wakeup;
+    QSIMPLEQ_ENTRY(Coroutine) co_queue_next;
 };
 
 Coroutine *qemu_coroutine_new(void);
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index da37ca7..cf53693 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -31,13 +31,13 @@
 
 void qemu_co_queue_init(CoQueue *queue)
 {
-    QTAILQ_INIT(&queue->entries);
+    QSIMPLEQ_INIT(&queue->entries);
 }
 
 void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
 {
     Coroutine *self = qemu_coroutine_self();
-    QTAILQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
+    QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
     qemu_coroutine_yield();
     assert(qemu_in_coroutine());
 }
@@ -55,8 +55,8 @@ void qemu_co_queue_run_restart(Coroutine *co)
     Coroutine *next;
 
     trace_qemu_co_queue_run_restart(co);
-    while ((next = QTAILQ_FIRST(&co->co_queue_wakeup))) {
-        QTAILQ_REMOVE(&co->co_queue_wakeup, next, co_queue_next);
+    while ((next = QSIMPLEQ_FIRST(&co->co_queue_wakeup))) {
+        QSIMPLEQ_REMOVE_HEAD(&co->co_queue_wakeup, co_queue_next);
         qemu_coroutine_enter(next, NULL);
     }
 }
@@ -66,13 +66,13 @@ static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
     Coroutine *self = qemu_coroutine_self();
     Coroutine *next;
 
-    if (QTAILQ_EMPTY(&queue->entries)) {
+    if (QSIMPLEQ_EMPTY(&queue->entries)) {
         return false;
     }
 
-    while ((next = QTAILQ_FIRST(&queue->entries)) != NULL) {
-        QTAILQ_REMOVE(&queue->entries, next, co_queue_next);
-        QTAILQ_INSERT_TAIL(&self->co_queue_wakeup, next, co_queue_next);
+    while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
+        QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
+        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, next, co_queue_next);
         trace_qemu_co_queue_next(next);
         if (single) {
             break;
@@ -97,19 +97,19 @@ bool qemu_co_enter_next(CoQueue *queue)
 {
     Coroutine *next;
 
-    next = QTAILQ_FIRST(&queue->entries);
+    next = QSIMPLEQ_FIRST(&queue->entries);
     if (!next) {
         return false;
     }
 
-    QTAILQ_REMOVE(&queue->entries, next, co_queue_next);
+    QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
     qemu_coroutine_enter(next, NULL);
     return true;
 }
 
 bool qemu_co_queue_empty(CoQueue *queue)
 {
-    return QTAILQ_FIRST(&queue->entries) == NULL;
+    return QSIMPLEQ_FIRST(&queue->entries) == NULL;
 }
 
 void qemu_co_mutex_init(CoMutex *mutex)
diff --git a/util/qemu-coroutine.c b/util/qemu-coroutine.c
index 5816702..b7cb636 100644
--- a/util/qemu-coroutine.c
+++ b/util/qemu-coroutine.c
@@ -76,7 +76,7 @@ Coroutine *qemu_coroutine_create(CoroutineEntry *entry)
     }
 
     co->entry = entry;
-    QTAILQ_INIT(&co->co_queue_wakeup);
+    QSIMPLEQ_INIT(&co->co_queue_wakeup);
     return co;
 }
 
-- 
2.5.5
^ permalink raw reply related	[flat|nested] 27+ messages in thread
- * Re: [Qemu-devel] [PATCH 01/11] coroutine: use QSIMPLEQ instead of QTAILQ
  2016-04-15 11:31 ` [Qemu-devel] [PATCH 01/11] coroutine: use QSIMPLEQ instead of QTAILQ Paolo Bonzini
@ 2016-04-19 13:45   ` Stefan Hajnoczi
  0 siblings, 0 replies; 27+ messages in thread
From: Stefan Hajnoczi @ 2016-04-19 13:45 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, kwolf, berto, famz, stefanha
[-- Attachment #1: Type: text/plain, Size: 643 bytes --]
On Fri, Apr 15, 2016 at 01:31:56PM +0200, Paolo Bonzini wrote:
> CoQueue do not need to remove any element but the head of the list;
> processing is always strictly FIFO.  Therefore, the simpler singly-linked
> QSIMPLEQ can be used instead.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  include/qemu/coroutine.h     |  2 +-
>  include/qemu/coroutine_int.h |  4 ++--
>  util/qemu-coroutine-lock.c   | 22 +++++++++++-----------
>  util/qemu-coroutine.c        |  2 +-
>  4 files changed, 15 insertions(+), 15 deletions(-)
Suitable as its own non-RFC patch.
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 473 bytes --]
^ permalink raw reply	[flat|nested] 27+ messages in thread 
 
- * [Qemu-devel] [PATCH 02/11] throttle-groups: restart throttled requests from coroutine context
  2016-04-15 11:31 [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Paolo Bonzini
  2016-04-15 11:31 ` [Qemu-devel] [PATCH 01/11] coroutine: use QSIMPLEQ instead of QTAILQ Paolo Bonzini
@ 2016-04-15 11:31 ` Paolo Bonzini
  2016-04-19 13:49   ` Stefan Hajnoczi
  2016-04-15 11:31 ` [Qemu-devel] [PATCH 03/11] coroutine: delete qemu_co_enter_next Paolo Bonzini
                   ` (9 subsequent siblings)
  11 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2016-04-15 11:31 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz, kwolf, berto
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/throttle-groups.c | 69 +++++++++++++++++++++++++++++++++++--------------
 1 file changed, 49 insertions(+), 20 deletions(-)
diff --git a/block/throttle-groups.c b/block/throttle-groups.c
index 1938e90..53e910e 100644
--- a/block/throttle-groups.c
+++ b/block/throttle-groups.c
@@ -264,8 +264,7 @@ static void schedule_next_request(BlockDriverState *bs, bool is_write)
     /* If it doesn't have to wait, queue it for immediate execution */
     if (!must_wait) {
         /* Give preference to requests from the current bs */
-        if (qemu_in_coroutine() &&
-            qemu_co_queue_next(&bs->throttled_reqs[is_write])) {
+        if (qemu_co_queue_next(&bs->throttled_reqs[is_write])) {
             token = bs;
         } else {
             ThrottleTimers *tt = &token->throttle_timers;
@@ -317,15 +316,54 @@ void coroutine_fn throttle_group_co_io_limits_intercept(BlockDriverState *bs,
     qemu_mutex_unlock(&tg->lock);
 }
 
-void throttle_group_restart_bs(BlockDriverState *bs)
-{
-    int i;
+typedef struct RestartData RestartData;
+struct RestartData {
+    BlockDriverState *bs;
+    bool single;
+    bool is_write;
+};
 
-    for (i = 0; i < 2; i++) {
-        while (qemu_co_enter_next(&bs->throttled_reqs[i])) {
-            ;
+static void throttle_group_restart_queue_entry(void *opaque)
+{
+    RestartData *data = opaque;
+    BlockDriverState *bs = data->bs;
+    bool single = data->single;
+    bool is_write = data->is_write;
+    unsigned count = 0;
+
+    g_free(data);
+    while (qemu_co_queue_next(&bs->throttled_reqs[is_write])) {
+        count++;
+        if (single) {
+            break;
         }
     }
+
+    if (count == 0) {
+        ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
+
+        qemu_mutex_lock(&tg->lock);
+        schedule_next_request(bs, is_write);
+        qemu_mutex_unlock(&tg->lock);
+    }
+}
+
+static void throttle_group_restart_queue(BlockDriverState *bs, bool single,
+                                         bool is_write)
+{
+    RestartData *data = g_new(RestartData, 1);
+
+    data->bs = bs;
+    data->single = single;
+    data->is_write = is_write;
+    qemu_coroutine_enter(qemu_coroutine_create(throttle_group_restart_queue_entry),
+                         &data);
+}
+
+void throttle_group_restart_bs(BlockDriverState *bs)
+{
+    throttle_group_restart_queue(bs, false, 0);
+    throttle_group_restart_queue(bs, false, 1);
 }
 
 /* Update the throttle configuration for a particular group. Similar
@@ -351,8 +389,8 @@ void throttle_group_config(BlockDriverState *bs, ThrottleConfig *cfg)
     throttle_config(ts, tt, cfg);
     qemu_mutex_unlock(&tg->lock);
 
-    qemu_co_enter_next(&bs->throttled_reqs[0]);
-    qemu_co_enter_next(&bs->throttled_reqs[1]);
+    throttle_group_restart_queue(bs, true, 0);
+    throttle_group_restart_queue(bs, true, 1);
 }
 
 /* Get the throttle configuration from a particular group. Similar to
@@ -381,7 +419,6 @@ static void timer_cb(BlockDriverState *bs, bool is_write)
 {
     ThrottleState *ts = bs->throttle_state;
     ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
-    bool empty_queue;
 
     /* The timer has just been fired, so we can update the flag */
     qemu_mutex_lock(&tg->lock);
@@ -390,16 +427,8 @@ static void timer_cb(BlockDriverState *bs, bool is_write)
 
     /* Run the request that was waiting for this timer */
     aio_context_acquire(bdrv_get_aio_context(bs));
-    empty_queue = !qemu_co_enter_next(&bs->throttled_reqs[is_write]);
+    throttle_group_restart_queue(bs, true, is_write);
     aio_context_release(bdrv_get_aio_context(bs));
-
-    /* If the request queue was empty then we have to take care of
-     * scheduling the next one */
-    if (empty_queue) {
-        qemu_mutex_lock(&tg->lock);
-        schedule_next_request(bs, is_write);
-        qemu_mutex_unlock(&tg->lock);
-    }
 }
 
 static void read_timer_cb(void *opaque)
-- 
2.5.5
^ permalink raw reply related	[flat|nested] 27+ messages in thread
- * Re: [Qemu-devel] [PATCH 02/11] throttle-groups: restart throttled requests from coroutine context
  2016-04-15 11:31 ` [Qemu-devel] [PATCH 02/11] throttle-groups: restart throttled requests from coroutine context Paolo Bonzini
@ 2016-04-19 13:49   ` Stefan Hajnoczi
  0 siblings, 0 replies; 27+ messages in thread
From: Stefan Hajnoczi @ 2016-04-19 13:49 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, kwolf, berto, famz, stefanha
[-- Attachment #1: Type: text/plain, Size: 309 bytes --]
On Fri, Apr 15, 2016 at 01:31:57PM +0200, Paolo Bonzini wrote:
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/throttle-groups.c | 69 +++++++++++++++++++++++++++++++++++--------------
>  1 file changed, 49 insertions(+), 20 deletions(-)
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 473 bytes --]
^ permalink raw reply	[flat|nested] 27+ messages in thread 
 
- * [Qemu-devel] [PATCH 03/11] coroutine: delete qemu_co_enter_next
  2016-04-15 11:31 [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Paolo Bonzini
  2016-04-15 11:31 ` [Qemu-devel] [PATCH 01/11] coroutine: use QSIMPLEQ instead of QTAILQ Paolo Bonzini
  2016-04-15 11:31 ` [Qemu-devel] [PATCH 02/11] throttle-groups: restart throttled requests from coroutine context Paolo Bonzini
@ 2016-04-15 11:31 ` Paolo Bonzini
  2016-04-19 13:49   ` Stefan Hajnoczi
  2016-04-15 11:31 ` [Qemu-devel] [PATCH 04/11] aio: introduce aio_co_schedule Paolo Bonzini
                   ` (8 subsequent siblings)
  11 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2016-04-15 11:31 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz, kwolf, berto
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/qemu/coroutine.h   |  5 -----
 util/qemu-coroutine-lock.c | 14 --------------
 2 files changed, 19 deletions(-)
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 63ae7fe..bb23be0 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -130,11 +130,6 @@ bool coroutine_fn qemu_co_queue_next(CoQueue *queue);
 void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue);
 
 /**
- * Enter the next coroutine in the queue
- */
-bool qemu_co_enter_next(CoQueue *queue);
-
-/**
  * Checks if the CoQueue is empty.
  */
 bool qemu_co_queue_empty(CoQueue *queue);
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index cf53693..1f46970 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -93,20 +93,6 @@ void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
     qemu_co_queue_do_restart(queue, false);
 }
 
-bool qemu_co_enter_next(CoQueue *queue)
-{
-    Coroutine *next;
-
-    next = QSIMPLEQ_FIRST(&queue->entries);
-    if (!next) {
-        return false;
-    }
-
-    QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
-    qemu_coroutine_enter(next, NULL);
-    return true;
-}
-
 bool qemu_co_queue_empty(CoQueue *queue)
 {
     return QSIMPLEQ_FIRST(&queue->entries) == NULL;
-- 
2.5.5
^ permalink raw reply related	[flat|nested] 27+ messages in thread
- * Re: [Qemu-devel] [PATCH 03/11] coroutine: delete qemu_co_enter_next
  2016-04-15 11:31 ` [Qemu-devel] [PATCH 03/11] coroutine: delete qemu_co_enter_next Paolo Bonzini
@ 2016-04-19 13:49   ` Stefan Hajnoczi
  0 siblings, 0 replies; 27+ messages in thread
From: Stefan Hajnoczi @ 2016-04-19 13:49 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, kwolf, berto, famz, stefanha
[-- Attachment #1: Type: text/plain, Size: 301 bytes --]
On Fri, Apr 15, 2016 at 01:31:58PM +0200, Paolo Bonzini wrote:
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  include/qemu/coroutine.h   |  5 -----
>  util/qemu-coroutine-lock.c | 14 --------------
>  2 files changed, 19 deletions(-)
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 473 bytes --]
^ permalink raw reply	[flat|nested] 27+ messages in thread 
 
- * [Qemu-devel] [PATCH 04/11] aio: introduce aio_co_schedule
  2016-04-15 11:31 [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Paolo Bonzini
                   ` (2 preceding siblings ...)
  2016-04-15 11:31 ` [Qemu-devel] [PATCH 03/11] coroutine: delete qemu_co_enter_next Paolo Bonzini
@ 2016-04-15 11:31 ` Paolo Bonzini
  2016-04-19 14:31   ` Stefan Hajnoczi
  2016-04-29  5:11   ` Fam Zheng
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 05/11] coroutine-lock: reschedule coroutine on the AioContext it was running on Paolo Bonzini
                   ` (7 subsequent siblings)
  11 siblings, 2 replies; 27+ messages in thread
From: Paolo Bonzini @ 2016-04-15 11:31 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz, kwolf, berto
This provides the infrastructure to start a coroutine on a remote
AioContext.  It will be used by CoMutex and CoQueue, so that
coroutines don't jump from one context to another when they
go to sleep on a mutex or waitqueue.
aio_co_schedule is based on a lock-free multiple-producer,
single-consumer queue.  The multiple producers use cmpxchg to add
to a LIFO stacks.  The consumer (a per-AioContext bottom half) grabs
all items added so far, inverts the list to make it FIFO, and goes
through it one item at a time.
Most of the changes really are in infrastructure and tests.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c                      |  38 ++++++++
 include/block/aio.h          |   9 ++
 include/qemu/coroutine_int.h |  10 +-
 tests/Makefile               |  11 ++-
 tests/iothread.c             |  91 ++++++++++++++++++
 tests/iothread.h             |  25 +++++
 tests/test-aio-multithread.c | 220 +++++++++++++++++++++++++++++++++++++++++++
 trace-events                 |   4 +
 8 files changed, 404 insertions(+), 4 deletions(-)
 create mode 100644 tests/iothread.c
 create mode 100644 tests/iothread.h
 create mode 100644 tests/test-aio-multithread.c
diff --git a/async.c b/async.c
index acd3627..ef8b409 100644
--- a/async.c
+++ b/async.c
@@ -30,6 +30,8 @@
 #include "qemu/main-loop.h"
 #include "qemu/atomic.h"
 #include "block/raw-aio.h"
+#include "trace/generated-tracers.h"
+#include "qemu/coroutine_int.h"
 
 /***********************************************************/
 /* bottom halves (can be seen as timers which expire ASAP) */
@@ -255,6 +257,8 @@ aio_ctx_finalize(GSource     *source)
     }
 #endif
 
+    qemu_bh_delete(ctx->schedule_bh);
+
     qemu_lockcnt_lock(&ctx->list_lock);
     assert(!qemu_lockcnt_count(&ctx->list_lock));
     while (ctx->first_bh) {
@@ -335,6 +339,28 @@ static void event_notifier_dummy_cb(EventNotifier *e)
 {
 }
 
+static void schedule_bh_cb(void *opaque)
+{
+    AioContext *ctx = opaque;
+    QSLIST_HEAD(, Coroutine) straight, reversed;
+
+    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
+    QSLIST_INIT(&straight);
+
+    while (!QSLIST_EMPTY(&reversed)) {
+        Coroutine *co = QSLIST_FIRST(&reversed);
+        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
+        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
+    }
+
+    while (!QSLIST_EMPTY(&straight)) {
+        Coroutine *co = QSLIST_FIRST(&straight);
+        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
+        trace_aio_schedule_bh_cb(ctx, co);
+        qemu_coroutine_enter(co, NULL);
+    }
+}
+
 AioContext *aio_context_new(Error **errp)
 {
     int ret;
@@ -354,6 +380,10 @@ AioContext *aio_context_new(Error **errp)
     }
     g_source_set_can_recurse(&ctx->source, true);
     qemu_lockcnt_init(&ctx->list_lock);
+
+    ctx->schedule_bh = aio_bh_new(ctx, schedule_bh_cb, ctx);
+    QSLIST_INIT(&ctx->scheduled_coroutines);
+
     aio_set_event_notifier(ctx, &ctx->notifier,
                            false,
                            (EventNotifierHandler *)
@@ -371,6 +401,14 @@ fail:
     return NULL;
 }
 
+void aio_co_schedule(AioContext *ctx, Coroutine *co)
+{
+    trace_aio_co_schedule(ctx, co);
+    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
+                              co, co_scheduled_next);
+    qemu_bh_schedule(ctx->schedule_bh);
+}
+
 void aio_context_ref(AioContext *ctx)
 {
     g_source_ref(&ctx->source);
diff --git a/include/block/aio.h b/include/block/aio.h
index 8f55d1a..0a344c3 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -46,6 +46,7 @@ typedef struct AioHandler AioHandler;
 typedef void QEMUBHFunc(void *opaque);
 typedef void IOHandler(void *opaque);
 
+struct Coroutine;
 struct ThreadPool;
 struct LinuxAioState;
 
@@ -107,6 +108,9 @@ struct AioContext {
     bool notified;
     EventNotifier notifier;
 
+    QSLIST_HEAD(, Coroutine) scheduled_coroutines;
+    QEMUBH *schedule_bh;
+
     /* Thread pool for performing work and receiving completion callbacks.
      * Has its own locking.
      */
@@ -441,6 +445,11 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
 }
 
 /**
+ * aio_co_schedule
+ */
+void aio_co_schedule(AioContext *ctx, struct Coroutine *co);
+
+/**
  * @ctx: the aio context
  *
  * Return whether we are running in the I/O thread that manages @ctx.
diff --git a/include/qemu/coroutine_int.h b/include/qemu/coroutine_int.h
index 581a7f5..c0e9aa1 100644
--- a/include/qemu/coroutine_int.h
+++ b/include/qemu/coroutine_int.h
@@ -38,11 +38,19 @@ struct Coroutine {
     CoroutineEntry *entry;
     void *entry_arg;
     Coroutine *caller;
+
+    /* Only used when the coroutine has terminated.  */
     QSLIST_ENTRY(Coroutine) pool_next;
 
-    /* Coroutines that should be woken up when we yield or terminate */
+    /* Coroutines that should be woken up when we yield or terminate.
+     * Only used when the coroutine is running.
+     */
     QSIMPLEQ_HEAD(, Coroutine) co_queue_wakeup;
+
+    /* Only used when the coroutine is sleeping.  */
+    AioContext *ctx;
     QSIMPLEQ_ENTRY(Coroutine) co_queue_next;
+    QSLIST_ENTRY(Coroutine) co_scheduled_next;
 };
 
 Coroutine *qemu_coroutine_new(void);
diff --git a/tests/Makefile b/tests/Makefile
index f60df75..b90b870 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -39,9 +39,13 @@ check-unit-y += tests/test-visitor-serialization$(EXESUF)
 check-unit-y += tests/test-iov$(EXESUF)
 gcov-files-test-iov-y = util/iov.c
 check-unit-y += tests/test-aio$(EXESUF)
+gcov-files-test-aio-y = async-posix.c
+gcov-files-test-aio-$(CONFIG_WIN32) += aio-win32.c
+gcov-files-test-aio-$(CONFIG_POSIX) += aio-posix.c
+check-unit-y += tests/test-aio-multithread$(EXESUF)
+gcov-files-test-aio-multithread-y = $(gcov-files-test-aio-y)
+gcov-files-test-aio-multithread-y += util/qemu-coroutine.c tests/iothread.c
 check-unit-y += tests/test-throttle$(EXESUF)
-gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
-gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
 check-unit-y += tests/test-thread-pool$(EXESUF)
 gcov-files-test-thread-pool-y = thread-pool.c
 gcov-files-test-hbitmap-y = util/hbitmap.c
@@ -400,7 +404,7 @@ test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o \
 	$(test-qom-obj-y)
 test-crypto-obj-y = $(crypto-obj-y) $(test-qom-obj-y)
 test-io-obj-y = $(io-obj-y) $(test-crypto-obj-y)
-test-block-obj-y = $(block-obj-y) $(test-io-obj-y)
+test-block-obj-y = $(block-obj-y) $(test-io-obj-y) tests/iothread.o
 
 tests/check-qint$(EXESUF): tests/check-qint.o $(test-util-obj-y)
 tests/check-qstring$(EXESUF): tests/check-qstring.o $(test-util-obj-y)
@@ -412,6 +416,7 @@ tests/check-qom-interface$(EXESUF): tests/check-qom-interface.o $(test-qom-obj-y
 tests/check-qom-proplist$(EXESUF): tests/check-qom-proplist.o $(test-qom-obj-y)
 tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
 tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
+tests/test-aio-multithread$(EXESUF): tests/test-aio-multithread.o $(test-block-obj-y)
 tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y)
 tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y)
 tests/test-thread-pool$(EXESUF): tests/test-thread-pool.o $(test-block-obj-y)
diff --git a/tests/iothread.c b/tests/iothread.c
new file mode 100644
index 0000000..00ab316
--- /dev/null
+++ b/tests/iothread.c
@@ -0,0 +1,91 @@
+/*
+ * Event loop thread implementation for unit tests
+ *
+ * Copyright Red Hat Inc., 2013, 2016
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@redhat.com>
+ *  Paolo Bonzini     <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "qapi/error.h"
+#include "block/aio.h"
+#include "qemu/main-loop.h"
+#include "qemu/rcu.h"
+#include "iothread.h"
+
+struct IOThread {
+    AioContext *ctx;
+
+    QemuThread thread;
+    QemuMutex init_done_lock;
+    QemuCond init_done_cond;    /* is thread initialization done? */
+    bool stopping;
+};
+
+static __thread IOThread *my_iothread;
+
+bool aio_context_in_iothread(AioContext *ctx)
+{
+    return ctx == (my_iothread ? my_iothread->ctx : qemu_get_aio_context());
+}
+
+static void *iothread_run(void *opaque)
+{
+    IOThread *iothread = opaque;
+
+    rcu_register_thread();
+
+    my_iothread = iothread;
+    qemu_mutex_lock(&iothread->init_done_lock);
+    iothread->ctx = aio_context_new(&error_abort);
+    qemu_cond_signal(&iothread->init_done_cond);
+    qemu_mutex_unlock(&iothread->init_done_lock);
+
+    while (!atomic_read(&iothread->stopping)) {
+        aio_poll(iothread->ctx, true);
+    }
+
+    rcu_unregister_thread();
+    return NULL;
+}
+
+void iothread_join(IOThread *iothread)
+{
+    iothread->stopping = true;
+    aio_notify(iothread->ctx);
+    qemu_thread_join(&iothread->thread);
+    qemu_cond_destroy(&iothread->init_done_cond);
+    qemu_mutex_destroy(&iothread->init_done_lock);
+    aio_context_unref(iothread->ctx);
+    g_free(iothread);
+}
+
+IOThread *iothread_new(void)
+{
+    IOThread *iothread = g_new0(IOThread, 1);
+
+    qemu_mutex_init(&iothread->init_done_lock);
+    qemu_cond_init(&iothread->init_done_cond);
+    qemu_thread_create(&iothread->thread, NULL, iothread_run,
+                       iothread, QEMU_THREAD_JOINABLE);
+
+    /* Wait for initialization to complete */
+    qemu_mutex_lock(&iothread->init_done_lock);
+    while (iothread->ctx == NULL) {
+        qemu_cond_wait(&iothread->init_done_cond,
+                       &iothread->init_done_lock);
+    }
+    qemu_mutex_unlock(&iothread->init_done_lock);
+    return iothread;
+}
+
+AioContext *iothread_get_aio_context(IOThread *iothread)
+{
+    return iothread->ctx;
+}
diff --git a/tests/iothread.h b/tests/iothread.h
new file mode 100644
index 0000000..4877cea
--- /dev/null
+++ b/tests/iothread.h
@@ -0,0 +1,25 @@
+/*
+ * Event loop thread implementation for unit tests
+ *
+ * Copyright Red Hat Inc., 2013, 2016
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@redhat.com>
+ *  Paolo Bonzini     <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#ifndef TEST_IOTHREAD_H
+#define TEST_IOTHREAD_H
+
+#include "block/aio.h"
+#include "qemu/thread.h"
+
+typedef struct IOThread IOThread;
+
+IOThread *iothread_new(void);
+void iothread_join(IOThread *iothread);
+AioContext *iothread_get_aio_context(IOThread *iothread);
+
+#endif
diff --git a/tests/test-aio-multithread.c b/tests/test-aio-multithread.c
new file mode 100644
index 0000000..94fecf7
--- /dev/null
+++ b/tests/test-aio-multithread.c
@@ -0,0 +1,220 @@
+/*
+ * AioContext multithreading tests
+ *
+ * Copyright Red Hat, Inc. 2016
+ *
+ * Authors:
+ *  Paolo Bonzini    <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include <glib.h>
+#include "block/aio.h"
+#include "qapi/error.h"
+#include "qemu/coroutine.h"
+#include "qemu/thread.h"
+#include "qemu/error-report.h"
+#include "iothread.h"
+
+/* AioContext management */
+
+#define NUM_CONTEXTS 5
+
+static IOThread *threads[NUM_CONTEXTS];
+static AioContext *ctx[NUM_CONTEXTS];
+static __thread int id = -1;
+
+static QemuEvent done_event;
+
+/* Callbacks run a function synchronously on a remote iothread. */
+
+typedef struct CtxRunData {
+    QEMUBH *bh;
+    QEMUBHFunc *cb;
+    void *arg;
+} CtxRunData;
+
+static void ctx_run_bh_cb(void *opaque)
+{
+    CtxRunData *data = opaque;
+    QEMUBHFunc *cb = data->cb;
+    void *arg = data->arg;
+    qemu_bh_delete(data->bh);
+    g_free(data);
+
+    cb(arg);
+    qemu_event_set(&done_event);
+}
+
+static void ctx_run(int i, QEMUBHFunc *cb, void *opaque)
+{
+    CtxRunData *data = g_new(CtxRunData, 1);
+    QEMUBH *bh = aio_bh_new(ctx[i], ctx_run_bh_cb, data);
+
+    data->bh = bh;
+    data->cb = cb;
+    data->arg = opaque;
+
+    qemu_event_reset(&done_event);
+    qemu_bh_schedule(bh);
+    qemu_event_wait(&done_event);
+}
+
+/* Starting the iothreads. */
+
+static void set_id_cb(void *opaque)
+{
+    int *i = opaque;
+
+    id = *i;
+}
+
+static void create_aio_contexts(void)
+{
+    int i;
+
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        threads[i] = iothread_new();
+        ctx[i] = iothread_get_aio_context(threads[i]);
+    }
+
+    qemu_event_init(&done_event, false);
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        ctx_run(i, set_id_cb, &i);
+    }
+}
+
+/* Stopping the iothreads. */
+
+static void join_aio_contexts(void)
+{
+    int i;
+
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        aio_context_ref(ctx[i]);
+    }
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        iothread_join(threads[i]);
+    }
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        aio_context_unref(ctx[i]);
+    }
+    qemu_event_destroy(&done_event);
+}
+
+/* Basic test for the stuff above. */
+
+static void test_lifecycle(void)
+{
+    create_aio_contexts();
+    join_aio_contexts();
+}
+
+/* aio_co_schedule test.  */
+
+static Coroutine *to_schedule[NUM_CONTEXTS];
+
+static bool now_stopping;
+
+static int count_retry;
+static int count_here;
+static int count_other;
+
+static bool schedule_next(int n)
+{
+    Coroutine *co;
+
+    co = atomic_xchg(&to_schedule[n], NULL);
+    if (!co) {
+        atomic_inc(&count_retry);
+        return false;
+    }
+
+    if (n == id) {
+        atomic_inc(&count_here);
+    } else {
+        atomic_inc(&count_other);
+    }
+
+    aio_co_schedule(ctx[n], co);
+    return true;
+}
+
+static void finish_cb(void *opaque)
+{
+    schedule_next(id);
+}
+
+static void test_multi_co_schedule_entry(void *opaque)
+{
+    g_assert(to_schedule[id] == NULL);
+    atomic_mb_set(&to_schedule[id], qemu_coroutine_self());
+
+    while (!atomic_mb_read(&now_stopping)) {
+        int n;
+
+        n = g_test_rand_int_range(0, NUM_CONTEXTS);
+        schedule_next(n);
+        qemu_coroutine_yield();
+
+        g_assert(to_schedule[id] == NULL);
+        atomic_mb_set(&to_schedule[id], qemu_coroutine_self());
+    }
+}
+
+
+static void test_multi_co_schedule(int seconds)
+{
+    int i;
+
+    count_here = count_other = count_retry = 0;
+    now_stopping = false;
+
+    create_aio_contexts();
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry);
+        aio_co_schedule(ctx[i], co1);
+    }
+
+    g_usleep(seconds * 1000000);
+
+    atomic_mb_set(&now_stopping, true);
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        ctx_run(i, finish_cb, NULL);
+        to_schedule[i] = NULL;
+    }
+
+    join_aio_contexts();
+    g_test_message("scheduled %d, queued %d, retry %d, total %d\n",
+                  count_other, count_here, count_retry,
+                  count_here + count_other + count_retry);
+}
+
+static void test_multi_co_schedule_1(void)
+{
+    test_multi_co_schedule(1);
+}
+
+static void test_multi_co_schedule_10(void)
+{
+    test_multi_co_schedule(10);
+}
+
+/* End of tests.  */
+
+int main(int argc, char **argv)
+{
+    init_clocks();
+
+    g_test_init(&argc, &argv, NULL);
+    g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
+    if (g_test_quick()) {
+        g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
+    } else {
+        g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
+    }
+    return g_test_run();
+}
diff --git a/trace-events b/trace-events
index 922e70b..78b042c 100644
--- a/trace-events
+++ b/trace-events
@@ -972,6 +972,10 @@ xen_map_cache_return(void* ptr) "%p"
 # hw/i386/xen/xen_platform.c
 xen_platform_log(char *s) "xen platform: %s"
 
+# async.c
+aio_co_schedule(void *ctx, void *co) "ctx %p co %p"
+aio_schedule_bh_cb(void *ctx, void *co) "ctx %p co %p"
+
 # qemu-coroutine.c
 qemu_coroutine_enter(void *from, void *to, void *opaque) "from %p to %p opaque %p"
 qemu_coroutine_yield(void *from, void *to) "from %p to %p"
-- 
2.5.5
^ permalink raw reply related	[flat|nested] 27+ messages in thread
- * Re: [Qemu-devel] [PATCH 04/11] aio: introduce aio_co_schedule
  2016-04-15 11:31 ` [Qemu-devel] [PATCH 04/11] aio: introduce aio_co_schedule Paolo Bonzini
@ 2016-04-19 14:31   ` Stefan Hajnoczi
  2016-05-17 14:57     ` Paolo Bonzini
  2016-04-29  5:11   ` Fam Zheng
  1 sibling, 1 reply; 27+ messages in thread
From: Stefan Hajnoczi @ 2016-04-19 14:31 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, kwolf, berto, famz, stefanha
[-- Attachment #1: Type: text/plain, Size: 1843 bytes --]
On Fri, Apr 15, 2016 at 01:31:59PM +0200, Paolo Bonzini wrote:
> @@ -255,6 +257,8 @@ aio_ctx_finalize(GSource     *source)
>      }
>  #endif
>  
> +    qemu_bh_delete(ctx->schedule_bh);
Please include an assertion that the scheduled coroutines list is empty.
> +
>      qemu_lockcnt_lock(&ctx->list_lock);
>      assert(!qemu_lockcnt_count(&ctx->list_lock));
>      while (ctx->first_bh) {
> @@ -335,6 +339,28 @@ static void event_notifier_dummy_cb(EventNotifier *e)
>  {
>  }
>  
> +static void schedule_bh_cb(void *opaque)
> +{
> +    AioContext *ctx = opaque;
> +    QSLIST_HEAD(, Coroutine) straight, reversed;
> +
> +    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
> +    QSLIST_INIT(&straight);
> +
> +    while (!QSLIST_EMPTY(&reversed)) {
> +        Coroutine *co = QSLIST_FIRST(&reversed);
> +        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
> +        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
> +    }
> +
> +    while (!QSLIST_EMPTY(&straight)) {
> +        Coroutine *co = QSLIST_FIRST(&straight);
> +        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
> +        trace_aio_schedule_bh_cb(ctx, co);
> +        qemu_coroutine_enter(co, NULL);
> +    }
> +}
This construct brings to mind the use-after-free case when a scheduled
coroutine terminates before it is entered by this loop:
There are two scheduled Coroutines: A and B.  During
qemu_coroutine_enter(A) we enter B.  B then terminates by returning from
its main function.  Once A yields or terminates we still try to enter
the freed B coroutine.
Unfortunately I don't think we have good debugging or an assertion for
this bug.  I'm sure it will occur at some point...   Please document
that the coroutine must not be entered by anyone else while
aio_co_schedule() is active.
[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 473 bytes --]
^ permalink raw reply	[flat|nested] 27+ messages in thread
- * Re: [Qemu-devel] [PATCH 04/11] aio: introduce aio_co_schedule
  2016-04-19 14:31   ` Stefan Hajnoczi
@ 2016-05-17 14:57     ` Paolo Bonzini
  2016-05-26 19:19       ` Stefan Hajnoczi
  0 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2016-05-17 14:57 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: qemu-devel, kwolf, berto, famz, stefanha
On 19/04/2016 16:31, Stefan Hajnoczi wrote:
> On Fri, Apr 15, 2016 at 01:31:59PM +0200, Paolo Bonzini wrote:
>> @@ -255,6 +257,8 @@ aio_ctx_finalize(GSource     *source)
>>      }
>>  #endif
>>  
>> +    qemu_bh_delete(ctx->schedule_bh);
> 
> Please include an assertion that the scheduled coroutines list is empty.
Good idea.
>> +    while (!QSLIST_EMPTY(&straight)) {
>> +        Coroutine *co = QSLIST_FIRST(&straight);
>> +        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
>> +        trace_aio_schedule_bh_cb(ctx, co);
>> +        qemu_coroutine_enter(co, NULL);
FWIW, this should be wrapped with aio_context_acquire/aio_context_release.
>> +    }
>> +}
> 
> This construct brings to mind the use-after-free case when a scheduled
> coroutine terminates before it is entered by this loop:
> 
> There are two scheduled Coroutines: A and B.  During
> qemu_coroutine_enter(A) we enter B.  B then terminates by returning from
> its main function.  Once A yields or terminates we still try to enter
> the freed B coroutine.
> 
> Unfortunately I don't think we have good debugging or an assertion for
> this bug.  I'm sure it will occur at some point...
aio_co_schedule (and qemu_coroutine_wake which wraps it later in the
series) is quite a low-level interface, so I do not expect many users.
That said, there is at least another case where it will be used.  In the
dataplane branch, where AIO callbacks take the AioContext mutex
themselves, we have:
static void bdrv_co_io_em_complete(void *opaque, int ret)
{
    CoroutineIOCompletion *co = opaque;
    co->ret = ret;
    aio_context_acquire(co->ctx);
    qemu_coroutine_enter(co->coroutine, NULL);
    aio_context_release(co->ctx);
}
...
    acb = bs->drv->bdrv_aio_readv(bs, sector_num, iov, nb_sectors,
                                  bdrv_co_io_em_complete, &co);
    qemu_coroutine_yield();
bdrv_co_io_em_complete here can be called before the coroutine has
yielded.  To prepare for the replacement of the AioContext mutex with
fine-grained mutexes, I think bdrv_co_io_em_complete should do something
like
    if (ctx != qemu_get_current_aio_context()) {
        aio_co_schedule(ctx, co->coroutine);
        return;
    }
    aio_context_acquire(ctx);
    qemu_coroutine_enter(co->coroutine, NULL);
    aio_context_release(ctx);
> Please document
> that the coroutine must not be entered by anyone else while
> aio_co_schedule() is active.
Sure.
Paolo
^ permalink raw reply	[flat|nested] 27+ messages in thread
- * Re: [Qemu-devel] [PATCH 04/11] aio: introduce aio_co_schedule
  2016-05-17 14:57     ` Paolo Bonzini
@ 2016-05-26 19:19       ` Stefan Hajnoczi
  0 siblings, 0 replies; 27+ messages in thread
From: Stefan Hajnoczi @ 2016-05-26 19:19 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, kwolf, berto, famz, stefanha
[-- Attachment #1: Type: text/plain, Size: 1192 bytes --]
On Tue, May 17, 2016 at 04:57:28PM +0200, Paolo Bonzini wrote:
> That said, there is at least another case where it will be used.  In the
> dataplane branch, where AIO callbacks take the AioContext mutex
> themselves, we have:
> 
> static void bdrv_co_io_em_complete(void *opaque, int ret)
> {
>     CoroutineIOCompletion *co = opaque;
> 
>     co->ret = ret;
>     aio_context_acquire(co->ctx);
>     qemu_coroutine_enter(co->coroutine, NULL);
>     aio_context_release(co->ctx);
> }
> 
> ...
> 
>     acb = bs->drv->bdrv_aio_readv(bs, sector_num, iov, nb_sectors,
>                                   bdrv_co_io_em_complete, &co);
>     qemu_coroutine_yield();
> 
> bdrv_co_io_em_complete here can be called before the coroutine has
> yielded.  To prepare for the replacement of the AioContext mutex with
> fine-grained mutexes, I think bdrv_co_io_em_complete should do something
> like
> 
>     if (ctx != qemu_get_current_aio_context()) {
>         aio_co_schedule(ctx, co->coroutine);
>         return;
>     }
> 
>     aio_context_acquire(ctx);
>     qemu_coroutine_enter(co->coroutine, NULL);
>     aio_context_release(ctx);
Okay, that makes sense.
[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 473 bytes --]
^ permalink raw reply	[flat|nested] 27+ messages in thread
 
 
- * Re: [Qemu-devel] [PATCH 04/11] aio: introduce aio_co_schedule
  2016-04-15 11:31 ` [Qemu-devel] [PATCH 04/11] aio: introduce aio_co_schedule Paolo Bonzini
  2016-04-19 14:31   ` Stefan Hajnoczi
@ 2016-04-29  5:11   ` Fam Zheng
  2016-05-17 14:38     ` Paolo Bonzini
  1 sibling, 1 reply; 27+ messages in thread
From: Fam Zheng @ 2016-04-29  5:11 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, stefanha, kwolf, berto
On Fri, 04/15 13:31, Paolo Bonzini wrote:
> This provides the infrastructure to start a coroutine on a remote
> AioContext.  It will be used by CoMutex and CoQueue, so that
> coroutines don't jump from one context to another when they
> go to sleep on a mutex or waitqueue.
> 
> aio_co_schedule is based on a lock-free multiple-producer,
> single-consumer queue.  The multiple producers use cmpxchg to add
> to a LIFO stacks.  The consumer (a per-AioContext bottom half) grabs
s/stacks/stack/ ?
> all items added so far, inverts the list to make it FIFO, and goes
> through it one item at a time.
> 
> Most of the changes really are in infrastructure and tests.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  async.c                      |  38 ++++++++
>  include/block/aio.h          |   9 ++
>  include/qemu/coroutine_int.h |  10 +-
>  tests/Makefile               |  11 ++-
>  tests/iothread.c             |  91 ++++++++++++++++++
>  tests/iothread.h             |  25 +++++
>  tests/test-aio-multithread.c | 220 +++++++++++++++++++++++++++++++++++++++++++
>  trace-events                 |   4 +
>  8 files changed, 404 insertions(+), 4 deletions(-)
>  create mode 100644 tests/iothread.c
>  create mode 100644 tests/iothread.h
>  create mode 100644 tests/test-aio-multithread.c
> 
> diff --git a/async.c b/async.c
> index acd3627..ef8b409 100644
> --- a/async.c
> +++ b/async.c
> @@ -30,6 +30,8 @@
>  #include "qemu/main-loop.h"
>  #include "qemu/atomic.h"
>  #include "block/raw-aio.h"
> +#include "trace/generated-tracers.h"
> +#include "qemu/coroutine_int.h"
>  
>  /***********************************************************/
>  /* bottom halves (can be seen as timers which expire ASAP) */
> @@ -255,6 +257,8 @@ aio_ctx_finalize(GSource     *source)
>      }
>  #endif
>  
> +    qemu_bh_delete(ctx->schedule_bh);
> +
>      qemu_lockcnt_lock(&ctx->list_lock);
>      assert(!qemu_lockcnt_count(&ctx->list_lock));
>      while (ctx->first_bh) {
> @@ -335,6 +339,28 @@ static void event_notifier_dummy_cb(EventNotifier *e)
>  {
>  }
>  
> +static void schedule_bh_cb(void *opaque)
> +{
> +    AioContext *ctx = opaque;
> +    QSLIST_HEAD(, Coroutine) straight, reversed;
> +
> +    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
> +    QSLIST_INIT(&straight);
> +
> +    while (!QSLIST_EMPTY(&reversed)) {
> +        Coroutine *co = QSLIST_FIRST(&reversed);
> +        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
> +        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
> +    }
> +
> +    while (!QSLIST_EMPTY(&straight)) {
> +        Coroutine *co = QSLIST_FIRST(&straight);
> +        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
> +        trace_aio_schedule_bh_cb(ctx, co);
> +        qemu_coroutine_enter(co, NULL);
> +    }
> +}
> +
>  AioContext *aio_context_new(Error **errp)
>  {
>      int ret;
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 8f55d1a..0a344c3 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -46,6 +46,7 @@ typedef struct AioHandler AioHandler;
>  typedef void QEMUBHFunc(void *opaque);
>  typedef void IOHandler(void *opaque);
>  
> +struct Coroutine;
>  struct ThreadPool;
>  struct LinuxAioState;
>  
> @@ -107,6 +108,9 @@ struct AioContext {
>      bool notified;
>      EventNotifier notifier;
>  
> +    QSLIST_HEAD(, Coroutine) scheduled_coroutines;
> +    QEMUBH *schedule_bh;
Maybe rename it to co_schedule_bh to reflect it's used for schedule coroutines?
> +
>      /* Thread pool for performing work and receiving completion callbacks.
>       * Has its own locking.
>       */
> +int main(int argc, char **argv)
> +{
> +    init_clocks();
> +
> +    g_test_init(&argc, &argv, NULL);
> +    g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
> +    if (g_test_quick()) {
> +        g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
> +    } else {
> +        g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
Should they use different path names, like /aio/multi/schedule/{1,10}?
> +    }
> +    return g_test_run();
> +}
Fam
^ permalink raw reply	[flat|nested] 27+ messages in thread
- * Re: [Qemu-devel] [PATCH 04/11] aio: introduce aio_co_schedule
  2016-04-29  5:11   ` Fam Zheng
@ 2016-05-17 14:38     ` Paolo Bonzini
  0 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2016-05-17 14:38 UTC (permalink / raw)
  To: Fam Zheng; +Cc: qemu-devel, stefanha, kwolf, berto
On 29/04/2016 07:11, Fam Zheng wrote:
> > +int main(int argc, char **argv)
> > +{
> > +    init_clocks();
> > +
> > +    g_test_init(&argc, &argv, NULL);
> > +    g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
> > +    if (g_test_quick()) {
> > +        g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
> > +    } else {
> > +        g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
> 
> Should they use different path names, like /aio/multi/schedule/{1,10}?
rcutorture doesn't.  I guess it might be changed too, but I don't think
it's too important since the only difference is the duration.
I've applied all your other suggestions.
Paolo
^ permalink raw reply	[flat|nested] 27+ messages in thread
 
 
- * [Qemu-devel] [PATCH 05/11] coroutine-lock: reschedule coroutine on the AioContext it was running on
  2016-04-15 11:31 [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Paolo Bonzini
                   ` (3 preceding siblings ...)
  2016-04-15 11:31 ` [Qemu-devel] [PATCH 04/11] aio: introduce aio_co_schedule Paolo Bonzini
@ 2016-04-15 11:32 ` Paolo Bonzini
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 06/11] coroutine-lock: make CoMutex thread-safe Paolo Bonzini
                   ` (6 subsequent siblings)
  11 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2016-04-15 11:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz, kwolf, berto
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/block/aio.h        |  5 +++++
 include/qemu/coroutine.h   | 18 ++++++++++++++++++
 iothread.c                 | 16 ++++++++++++++++
 stubs/iothread.c           | 11 +++++++++++
 tests/iothread.c           | 16 ++++++++++++++++
 trace-events               |  3 ++-
 util/qemu-coroutine-lock.c | 13 ++++++++++---
 7 files changed, 78 insertions(+), 4 deletions(-)
diff --git a/include/block/aio.h b/include/block/aio.h
index 0a344c3..7fa909d 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -450,6 +450,11 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
 void aio_co_schedule(AioContext *ctx, struct Coroutine *co);
 
 /**
+ * Return the AioContext whose event loop runs in the current I/O thread.
+ */
+AioContext *qemu_get_current_aio_context(void);
+
+/**
  * @ctx: the aio context
  *
  * Return whether we are running in the I/O thread that manages @ctx.
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index bb23be0..179ab0b 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -73,6 +73,24 @@ Coroutine *qemu_coroutine_create(CoroutineEntry *entry);
 void qemu_coroutine_enter(Coroutine *coroutine, void *opaque);
 
 /**
+ * Wake up a suspended coroutine in the current thread.
+ *
+ * This function is used by the implementation of qemu_coroutine_wake.
+ * It arranges for @co to restart at the next yield point of the
+ * current coroutine.
+ */
+void qemu_coroutine_queue_next(Coroutine *co);
+
+/**
+ * Wake up a suspended coroutine
+ *
+ * This function is used by the synchronization primitives to wake up
+ * a sleeping coroutine.  For single-threaded scenarios it can simply
+ * call qemu_coroutine_queue_next.
+ */
+void qemu_coroutine_wake(AioContext *ctx, Coroutine *co);
+
+/**
  * Transfer control back to a coroutine's caller
  *
  * This function does not return until the coroutine is re-entered using
diff --git a/iothread.c b/iothread.c
index f66ec95..986d125 100644
--- a/iothread.c
+++ b/iothread.c
@@ -21,6 +21,7 @@
 #include "qemu/error-report.h"
 #include "qemu/rcu.h"
 #include "qemu/main-loop.h"
+#include "qemu/coroutine.h"
 
 typedef ObjectClass IOThreadClass;
 
@@ -31,11 +32,26 @@ typedef ObjectClass IOThreadClass;
 
 static __thread IOThread *my_iothread;
 
+void qemu_get_current_aio_context(void)
+{
+    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
+}
+
 bool aio_context_in_iothread(AioContext *ctx)
 {
     return ctx == (my_iothread ? my_iothread->ctx : qemu_get_aio_context());
 }
 
+void qemu_coroutine_wake(AioContext *ctx, Coroutine *co)
+{
+    if (ctx != qemu_get_current_aio_context()) {
+        aio_co_schedule(ctx, co);
+        return;
+    }
+
+    qemu_coroutine_queue_next(co);
+}
+
 static void *iothread_run(void *opaque)
 {
     IOThread *iothread = opaque;
diff --git a/stubs/iothread.c b/stubs/iothread.c
index 6c02323..fb3e5e3 100644
--- a/stubs/iothread.c
+++ b/stubs/iothread.c
@@ -1,8 +1,19 @@
 #include "qemu/osdep.h"
 #include "block/aio.h"
 #include "qemu/main-loop.h"
+#include "qemu/coroutine.h"
+
+AioContext *qemu_get_current_aio_context(void)
+{
+    return qemu_get_aio_context();
+}
 
 bool aio_context_in_iothread(AioContext *ctx)
 {
     return ctx == qemu_get_aio_context();
 }
+
+void qemu_coroutine_wake(AioContext *ctx, Coroutine *co)
+{
+    qemu_coroutine_queue_next(co);
+}
diff --git a/tests/iothread.c b/tests/iothread.c
index 00ab316..e0d61b5 100644
--- a/tests/iothread.c
+++ b/tests/iothread.c
@@ -16,6 +16,7 @@
 #include "qapi/error.h"
 #include "block/aio.h"
 #include "qemu/main-loop.h"
+#include "qemu/coroutine.h"
 #include "qemu/rcu.h"
 #include "iothread.h"
 
@@ -30,11 +31,26 @@ struct IOThread {
 
 static __thread IOThread *my_iothread;
 
+AioContext *qemu_get_current_aio_context(void)
+{
+    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
+}
+
 bool aio_context_in_iothread(AioContext *ctx)
 {
     return ctx == (my_iothread ? my_iothread->ctx : qemu_get_aio_context());
 }
 
+void qemu_coroutine_wake(AioContext *ctx, Coroutine *co)
+{
+    if (ctx != qemu_get_current_aio_context()) {
+        aio_co_schedule(ctx, co);
+        return;
+    }
+
+    qemu_coroutine_queue_next(co);
+}
+
 static void *iothread_run(void *opaque)
 {
     IOThread *iothread = opaque;
diff --git a/trace-events b/trace-events
index 78b042c..be17ba8 100644
--- a/trace-events
+++ b/trace-events
@@ -982,8 +982,9 @@ qemu_coroutine_yield(void *from, void *to) "from %p to %p"
 qemu_coroutine_terminate(void *co) "self %p"
 
 # qemu-coroutine-lock.c
+qemu_coroutine_queue_next(void *from, void *nxt) "%p->%p"
 qemu_co_queue_run_restart(void *co) "co %p"
-qemu_co_queue_next(void *nxt) "next %p"
+qemu_co_queue_next(void *ctx, void *nxt) "context %p next %p"
 qemu_co_mutex_lock_entry(void *mutex, void *self) "mutex %p self %p"
 qemu_co_mutex_lock_return(void *mutex, void *self) "mutex %p self %p"
 qemu_co_mutex_unlock_entry(void *mutex, void *self) "mutex %p self %p"
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 1f46970..0af14ab 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -27,8 +27,16 @@
 #include "qemu/coroutine.h"
 #include "qemu/coroutine_int.h"
 #include "qemu/queue.h"
+#include "block/aio.h"
 #include "trace.h"
 
+void qemu_coroutine_queue_next(Coroutine *co)
+{
+    Coroutine *self = qemu_coroutine_self();
+    trace_qemu_coroutine_queue_next(self, co);
+    QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
+}
+
 void qemu_co_queue_init(CoQueue *queue)
 {
     QSIMPLEQ_INIT(&queue->entries);
@@ -37,6 +45,7 @@ void qemu_co_queue_init(CoQueue *queue)
 void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
 {
     Coroutine *self = qemu_coroutine_self();
+    self->ctx = qemu_get_current_aio_context();
     QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
     qemu_coroutine_yield();
     assert(qemu_in_coroutine());
@@ -63,7 +72,6 @@ void qemu_co_queue_run_restart(Coroutine *co)
 
 static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
 {
-    Coroutine *self = qemu_coroutine_self();
     Coroutine *next;
 
     if (QSIMPLEQ_EMPTY(&queue->entries)) {
@@ -72,8 +80,7 @@ static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
 
     while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
         QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
-        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, next, co_queue_next);
-        trace_qemu_co_queue_next(next);
+        qemu_coroutine_wake(next->ctx, next);
         if (single) {
             break;
         }
-- 
2.5.5
^ permalink raw reply related	[flat|nested] 27+ messages in thread
- * [Qemu-devel] [PATCH 06/11] coroutine-lock: make CoMutex thread-safe
  2016-04-15 11:31 [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Paolo Bonzini
                   ` (4 preceding siblings ...)
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 05/11] coroutine-lock: reschedule coroutine on the AioContext it was running on Paolo Bonzini
@ 2016-04-15 11:32 ` Paolo Bonzini
  2016-04-29  6:26   ` Fam Zheng
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 07/11] coroutine-lock: add limited spinning to CoMutex Paolo Bonzini
                   ` (5 subsequent siblings)
  11 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2016-04-15 11:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz, kwolf, berto
This uses the lock-free mutex described in the paper '"Blocking without
Locking", or LFTHREADS: A lock-free thread library' by Gidenstam and
Papatriantafilou.  The same technique is used in OSv, and in fact
the code is essentially a conversion to C of OSv's code.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/qemu/coroutine.h     |  16 ++++-
 tests/test-aio-multithread.c |  80 ++++++++++++++++++++++++
 trace-events                 |   1 +
 util/qemu-coroutine-lock.c   | 145 ++++++++++++++++++++++++++++++++++++++++---
 4 files changed, 232 insertions(+), 10 deletions(-)
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 179ab0b..018a60d 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -156,9 +156,21 @@ bool qemu_co_queue_empty(CoQueue *queue);
 /**
  * Provides a mutex that can be used to synchronise coroutines
  */
+struct CoWaitRecord;
 typedef struct CoMutex {
-    bool locked;
-    CoQueue queue;
+    /* Count of pending lockers; 0 for a free mutex, 1 for an
+     * uncontended mutex.
+     */
+    unsigned locked;
+
+    /* A queue of waiters.  Elements are added atomically in front of
+     * from_push.  to_pop is only populated, and popped from, by whoever
+     * is in charge of the next wakeup.  This can be an unlocker or,
+     * through the handoff protocol, a locker that is about to go to sleep.
+     */
+    QSLIST_HEAD(, CoWaitRecord) from_push, to_pop;
+
+    unsigned handoff, sequence;
 } CoMutex;
 
 /**
diff --git a/tests/test-aio-multithread.c b/tests/test-aio-multithread.c
index 94fecf7..18b3548 100644
--- a/tests/test-aio-multithread.c
+++ b/tests/test-aio-multithread.c
@@ -203,6 +203,82 @@ static void test_multi_co_schedule_10(void)
     test_multi_co_schedule(10);
 }
 
+/* CoMutex thread-safety.  */
+
+static uint32_t atomic_counter;
+static uint32_t counter;
+static CoMutex comutex;
+
+static void test_multi_co_mutex_entry(void *opaque)
+{
+    while (!atomic_mb_read(&now_stopping)) {
+        qemu_co_mutex_lock(&comutex);
+        counter++;
+        qemu_co_mutex_unlock(&comutex);
+
+        /* Increase atomic_counter *after* releasing the mutex.  Otherwise
+         * there is a chance (it happens about 1 in 3 runs) that the iothread
+         * exits before the coroutine is woken up, causing a spurious
+         * assertion failure.
+         */
+        atomic_inc(&atomic_counter);
+    }
+
+}
+
+static void test_multi_co_mutex(int threads, int seconds)
+{
+    int i;
+
+    qemu_co_mutex_init(&comutex);
+    counter = 0;
+    atomic_counter = 0;
+    now_stopping = false;
+
+    create_aio_contexts();
+    assert(threads <= NUM_CONTEXTS);
+    for (i = 0; i < threads; i++) {
+        Coroutine *co1 = qemu_coroutine_create(test_multi_co_mutex_entry);
+        aio_co_schedule(ctx[i], co1);
+    }
+
+    g_usleep(seconds * 1000000);
+
+    atomic_mb_set(&now_stopping, true);
+    join_aio_contexts();
+    g_test_message("%d iterations/second\n", counter / seconds);
+    g_assert_cmpint(counter, ==, atomic_counter);
+}
+
+/* Testing with NUM_CONTEXTS threads focuses on the queue.  The mutex however
+ * is too contended (and the threads spend too much time in aio_poll)
+ * to actually stress the handoff protocol.
+ */
+static void test_multi_co_mutex_1(void)
+{
+    test_multi_co_mutex(NUM_CONTEXTS, 1);
+}
+
+static void test_multi_co_mutex_10(void)
+{
+    test_multi_co_mutex(NUM_CONTEXTS, 10);
+}
+
+/* Testing with fewer threads stresses the handoff protocol too.  Still, the
+ * case where the locker _can_ pick up a handoff is very rare, happening
+ * about 10 times in 1 million, so increase the runtime a bit compared to
+ * other "quick" testcases that only run for 1 second.
+ */
+static void test_multi_co_mutex_2_3(void)
+{
+    test_multi_co_mutex(2, 3);
+}
+
+static void test_multi_co_mutex_2_30(void)
+{
+    test_multi_co_mutex(2, 30);
+}
+
 /* End of tests.  */
 
 int main(int argc, char **argv)
@@ -213,8 +289,12 @@ int main(int argc, char **argv)
     g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
     if (g_test_quick()) {
         g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
+        g_test_add_func("/aio/multi/mutex", test_multi_co_mutex_1);
+        g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_3);
     } else {
         g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
+        g_test_add_func("/aio/multi/mutex", test_multi_co_mutex_10);
+        g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_30);
     }
     return g_test_run();
 }
diff --git a/trace-events b/trace-events
index be17ba8..39b8ab2 100644
--- a/trace-events
+++ b/trace-events
@@ -985,6 +985,7 @@ qemu_coroutine_terminate(void *co) "self %p"
 qemu_coroutine_queue_next(void *from, void *nxt) "%p->%p"
 qemu_co_queue_run_restart(void *co) "co %p"
 qemu_co_queue_next(void *ctx, void *nxt) "context %p next %p"
+qemu_co_mutex_lock_uncontended(void *mutex, void *self) "mutex %p self %p"
 qemu_co_mutex_lock_entry(void *mutex, void *self) "mutex %p self %p"
 qemu_co_mutex_lock_return(void *mutex, void *self) "mutex %p self %p"
 qemu_co_mutex_unlock_entry(void *mutex, void *self) "mutex %p self %p"
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 0af14ab..7ed0f37 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -20,6 +20,10 @@
  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  * THE SOFTWARE.
+ *
+ * The lock-free mutex implementation is based on OSv
+ * (core/lfmutex.cc, include/lockfree/mutex.hh).
+ * Copyright (C) 2013 Cloudius Systems, Ltd.
  */
 
 #include "qemu/osdep.h"
@@ -105,24 +109,111 @@ bool qemu_co_queue_empty(CoQueue *queue)
     return QSIMPLEQ_FIRST(&queue->entries) == NULL;
 }
 
+/* The wait records are handled with a multiple-producer, single-consumer
+ * lock-free queue.  There cannot be two concurrent pop_waiter() calls
+ * because pop_waiter() can only come when mutex->handoff is zero.  This can
+ * happen in three cases:
+ * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
+ *   In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
+ *   not take part in the handoff.
+ * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
+ *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
+ *   the cmpxchg (it will see either 0 or the next sequence value) and
+ *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
+ *   woken up someone.
+ * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
+ *   In this case another iterations starts with mutex->handoff == 0;
+ *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
+ *   qemu_co_mutex_unlock will go back to case (1).
+ *
+ * The following functions manage this queue.
+ */
+typedef struct CoWaitRecord {
+    Coroutine *co;
+    QSLIST_ENTRY(CoWaitRecord) next;
+} CoWaitRecord;
+
+static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
+{
+    w->co = qemu_coroutine_self();
+    QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
+}
+
+static void move_waiters(CoMutex *mutex)
+{
+    QSLIST_HEAD(, CoWaitRecord) reversed;
+    QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
+    while (!QSLIST_EMPTY(&reversed)) {
+        CoWaitRecord *w = QSLIST_FIRST(&reversed);
+        QSLIST_REMOVE_HEAD(&reversed, next);
+        QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
+    }
+}
+
+static CoWaitRecord *pop_waiter(CoMutex *mutex)
+{
+    CoWaitRecord *w;
+
+    if (QSLIST_EMPTY(&mutex->to_pop)) {
+        move_waiters(mutex);
+        if (QSLIST_EMPTY(&mutex->to_pop)) {
+            return NULL;
+        }
+    }
+    w = QSLIST_FIRST(&mutex->to_pop);
+    QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
+    return w;
+}
+
+static bool has_waiters(CoMutex *mutex)
+{
+    return QSLIST_EMPTY(&mutex->to_pop) || QSLIST_EMPTY(&mutex->from_push);
+}
+
 void qemu_co_mutex_init(CoMutex *mutex)
 {
     memset(mutex, 0, sizeof(*mutex));
-    qemu_co_queue_init(&mutex->queue);
 }
 
 void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
 {
     Coroutine *self = qemu_coroutine_self();
+    CoWaitRecord w;
+    unsigned old_handoff;
+
+    if (atomic_fetch_inc(&mutex->locked) == 0) {
+        /* Uncontended.  */
+        trace_qemu_co_mutex_lock_uncontended(mutex, self);
+        return;
+    }
 
     trace_qemu_co_mutex_lock_entry(mutex, self);
+    self->ctx = qemu_get_current_aio_context();
+    w.co = self;
+    push_waiter(mutex, &w);
 
-    while (mutex->locked) {
-        qemu_co_queue_wait(&mutex->queue);
-    }
+    /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
+     * a concurrent unlock() the responsibility of waking somebody up.
+     */
+    old_handoff = atomic_mb_read(&mutex->handoff);
+    if (old_handoff &&
+        has_waiters(mutex) &&
+        atomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
+        /* There can be no concurrent pops, because there can be only
+         * one active handoff at a time.
+         */
+        CoWaitRecord *to_wake = pop_waiter(mutex);
+        Coroutine *co = to_wake->co;
+        if (co == self) {
+            /* We got the lock ourselves!  */
+            assert(to_wake == &w);
+            return;
+        }
 
-    mutex->locked = true;
+        qemu_coroutine_wake(co->ctx, co);
+    }
 
+    qemu_coroutine_yield();
     trace_qemu_co_mutex_lock_return(mutex, self);
 }
 
@@ -132,12 +223,50 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
 
     trace_qemu_co_mutex_unlock_entry(mutex, self);
 
-    assert(mutex->locked == true);
+    assert(mutex->locked);
     assert(qemu_in_coroutine());
 
-    mutex->locked = false;
-    qemu_co_queue_next(&mutex->queue);
+    if (atomic_fetch_dec(&mutex->locked) == 1) {
+        /* No waiting qemu_co_mutex_lock().  Pfew, that was easy!  */
+        return;
+    }
+
+    for (;;) {
+        CoWaitRecord *to_wake = pop_waiter(mutex);
+        unsigned our_handoff;
+
+        if (to_wake) {
+            Coroutine *co = to_wake->co;
+            qemu_coroutine_wake(co->ctx, co);
+            goto out;
+        }
+
+        /* Some concurrent lock() is in progress (we know this because of
+         * count) but it hasn't yet put itself on the wait queue.
+         * Pick a sequence number for the handoff protocol (not 0).
+         */
+        if (++mutex->sequence == 0) {
+            mutex->sequence = 1;
+        }
+
+        our_handoff = mutex->sequence;
+        atomic_mb_set(&mutex->handoff, our_handoff);
+        if (!has_waiters(mutex)) {
+            /* The concurrent lock has not added itself yet, so it
+             * will be able to pick our handoff.
+             */
+            goto out;
+        }
+
+        /* Try to do the handoff protocol ourselves; if somebody else has
+         * already taken it, however, we're done and they're responsible.
+         */
+        if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
+            goto out;
+        }
+    }
 
+out:
     trace_qemu_co_mutex_unlock_return(mutex, self);
 }
 
-- 
2.5.5
^ permalink raw reply related	[flat|nested] 27+ messages in thread
- * Re: [Qemu-devel] [PATCH 06/11] coroutine-lock: make CoMutex thread-safe
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 06/11] coroutine-lock: make CoMutex thread-safe Paolo Bonzini
@ 2016-04-29  6:26   ` Fam Zheng
  2016-05-17 15:34     ` Paolo Bonzini
  0 siblings, 1 reply; 27+ messages in thread
From: Fam Zheng @ 2016-04-29  6:26 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, stefanha, kwolf, berto
On Fri, 04/15 13:32, Paolo Bonzini wrote:
> +/* The wait records are handled with a multiple-producer, single-consumer
> + * lock-free queue.  There cannot be two concurrent pop_waiter() calls
> + * because pop_waiter() can only come when mutex->handoff is zero.  This can
> + * happen in three cases:
> + * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
> + *   In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
> + *   not take part in the handoff.
> + * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
> + *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
> + *   the cmpxchg (it will see either 0 or the next sequence value) and
> + *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
> + *   woken up someone.
> + * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
> + *   In this case another iterations starts with mutex->handoff == 0;
s/iterations/iteration/
> + *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
> + *   qemu_co_mutex_unlock will go back to case (1).
> + *
> + * The following functions manage this queue.
> + */
> +typedef struct CoWaitRecord {
> +    Coroutine *co;
> +    QSLIST_ENTRY(CoWaitRecord) next;
> +} CoWaitRecord;
> +
<snip>
> @@ -132,12 +223,50 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
>  
>      trace_qemu_co_mutex_unlock_entry(mutex, self);
>  
> -    assert(mutex->locked == true);
> +    assert(mutex->locked);
>      assert(qemu_in_coroutine());
>  
> -    mutex->locked = false;
> -    qemu_co_queue_next(&mutex->queue);
> +    if (atomic_fetch_dec(&mutex->locked) == 1) {
> +        /* No waiting qemu_co_mutex_lock().  Pfew, that was easy!  */
> +        return;
> +    }
> +
> +    for (;;) {
> +        CoWaitRecord *to_wake = pop_waiter(mutex);
> +        unsigned our_handoff;
> +
> +        if (to_wake) {
> +            Coroutine *co = to_wake->co;
> +            qemu_coroutine_wake(co->ctx, co);
> +            goto out;
> +        }
> +
> +        /* Some concurrent lock() is in progress (we know this because of
> +         * count) but it hasn't yet put itself on the wait queue.
Unlike OSv's lfmutex.cc, we don't seem to have count. Should the comment say
"locked" instead?
> +         * Pick a sequence number for the handoff protocol (not 0).
> +         */
> +        if (++mutex->sequence == 0) {
> +            mutex->sequence = 1;
> +        }
> +
> +        our_handoff = mutex->sequence;
> +        atomic_mb_set(&mutex->handoff, our_handoff);
> +        if (!has_waiters(mutex)) {
> +            /* The concurrent lock has not added itself yet, so it
> +             * will be able to pick our handoff.
> +             */
> +            goto out;
> +        }
> +
> +        /* Try to do the handoff protocol ourselves; if somebody else has
> +         * already taken it, however, we're done and they're responsible.
> +         */
> +        if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
> +            goto out;
> +        }
> +    }
>  
> +out:
>      trace_qemu_co_mutex_unlock_return(mutex, self);
>  }
>  
> -- 
> 2.5.5
> 
> 
^ permalink raw reply	[flat|nested] 27+ messages in thread
- * Re: [Qemu-devel] [PATCH 06/11] coroutine-lock: make CoMutex thread-safe
  2016-04-29  6:26   ` Fam Zheng
@ 2016-05-17 15:34     ` Paolo Bonzini
  0 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2016-05-17 15:34 UTC (permalink / raw)
  To: Fam Zheng; +Cc: qemu-devel, stefanha, kwolf, berto
On 29/04/2016 08:26, Fam Zheng wrote:
> > +        if (to_wake) {
> > +            Coroutine *co = to_wake->co;
> > +            qemu_coroutine_wake(co->ctx, co);
> > +            goto out;
> > +        }
> > +
> > +        /* Some concurrent lock() is in progress (we know this because of
> > +         * count) but it hasn't yet put itself on the wait queue.
> 
> Unlike OSv's lfmutex.cc, we don't seem to have count. Should the comment say
> "locked" instead?
Yes; we know at least one lock() is in progress because we found
mutex->locked > 1.  locked cannot decrease until unlock() wakes up
someone or until a concurrent lock() completes a handoff---which must be
after unlock() writes a nonzero value tomutex->handoff
Paolo
^ permalink raw reply	[flat|nested] 27+ messages in thread
 
 
- * [Qemu-devel] [PATCH 07/11] coroutine-lock: add limited spinning to CoMutex
  2016-04-15 11:31 [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Paolo Bonzini
                   ` (5 preceding siblings ...)
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 06/11] coroutine-lock: make CoMutex thread-safe Paolo Bonzini
@ 2016-04-15 11:32 ` Paolo Bonzini
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 08/11] test-aio-multithread: add performance comparison with thread-based mutexes Paolo Bonzini
                   ` (4 subsequent siblings)
  11 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2016-04-15 11:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz, kwolf, berto
Running a very small critical section on pthread_mutex_t and CoMutex
shows that pthread_mutex_t is much faster because it doesn't actually
go to sleep.  What happens is that the critical section is shorter
than the latency of entering the kernel and thus FUTEX_WAIT always
fails.  With CoMutex there is no such latency but you still want to
avoid wait and wakeup.  So introduce it artificially.
This only works with two waiters; because CoMutex is fair, it will
always have more waits and wakeups than a pthread_mutex_t.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/qemu/coroutine.h   |  5 +++++
 util/qemu-coroutine-lock.c | 34 ++++++++++++++++++++++++++++++++--
 2 files changed, 37 insertions(+), 2 deletions(-)
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 018a60d..d15a09a 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -163,6 +163,11 @@ typedef struct CoMutex {
      */
     unsigned locked;
 
+    /* Context that is holding the lock.  Useful to avoid spinning
+     * when two coroutines on the same AioContext try to get the lock. :)
+     */
+    AioContext *ctx;
+
     /* A queue of waiters.  Elements are added atomically in front of
      * from_push.  to_pop is only populated, and popped from, by whoever
      * is in charge of the next wakeup.  This can be an unlocker or,
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 7ed0f37..aa59e82 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -177,18 +177,44 @@ void qemu_co_mutex_init(CoMutex *mutex)
 
 void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
 {
+    AioContext *ctx = qemu_get_current_aio_context();
     Coroutine *self = qemu_coroutine_self();
     CoWaitRecord w;
     unsigned old_handoff;
+    int waiters, i;
+
+    /* Running a very small critical section on pthread_mutex_t and CoMutex
+     * shows that pthread_mutex_t is much faster because it doesn't actually
+     * go to sleep.  What happens is that the critical section is shorter
+     * than the latency of entering the kernel and thus FUTEX_WAIT always
+     * fails.  With CoMutex there is no such latency but you still want to
+     * avoid wait and wakeup.  So introduce it artificially.
+     */
+    i = 0;
+retry_fast_path:
+    waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
+    if (waiters != 0) {
+        while (waiters == 1 && ++i < 1000) {
+            if (atomic_read(&mutex->ctx) == ctx) {
+                break;
+            }
+            if (atomic_read(&mutex->locked) == 0) {
+                goto retry_fast_path;
+            }
+            /* cpu_relax(); */
+        }
+        waiters = atomic_fetch_inc(&mutex->locked);
+    }
 
-    if (atomic_fetch_inc(&mutex->locked) == 0) {
+    if (waiters == 0) {
         /* Uncontended.  */
         trace_qemu_co_mutex_lock_uncontended(mutex, self);
+        mutex->ctx = ctx;
         return;
     }
 
     trace_qemu_co_mutex_lock_entry(mutex, self);
-    self->ctx = qemu_get_current_aio_context();
+    self->ctx = ctx;
     w.co = self;
     push_waiter(mutex, &w);
 
@@ -207,9 +233,11 @@ void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
         if (co == self) {
             /* We got the lock ourselves!  */
             assert(to_wake == &w);
+            mutex->ctx = ctx;
             return;
         }
 
+        mutex->ctx = co->ctx;
         qemu_coroutine_wake(co->ctx, co);
     }
 
@@ -223,6 +251,7 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
 
     trace_qemu_co_mutex_unlock_entry(mutex, self);
 
+    mutex->ctx = NULL;
     assert(mutex->locked);
     assert(qemu_in_coroutine());
 
@@ -237,6 +266,7 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
 
         if (to_wake) {
             Coroutine *co = to_wake->co;
+            mutex->ctx = co->ctx;
             qemu_coroutine_wake(co->ctx, co);
             goto out;
         }
-- 
2.5.5
^ permalink raw reply related	[flat|nested] 27+ messages in thread
- * [Qemu-devel] [PATCH 08/11] test-aio-multithread: add performance comparison with thread-based mutexes
  2016-04-15 11:31 [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Paolo Bonzini
                   ` (6 preceding siblings ...)
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 07/11] coroutine-lock: add limited spinning to CoMutex Paolo Bonzini
@ 2016-04-15 11:32 ` Paolo Bonzini
  2016-04-29  6:52   ` Fam Zheng
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 09/11] coroutine-lock: place CoMutex before CoQueue in header Paolo Bonzini
                   ` (3 subsequent siblings)
  11 siblings, 1 reply; 27+ messages in thread
From: Paolo Bonzini @ 2016-04-15 11:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz, kwolf, berto
Add two implementations of the same benchmark as the previous patch,
but using pthreads.  One uses a normal QemuMutex, the other is Linux
only and implements a fair mutex based on MCS locks and futexes.
This shows that the slower performance of the 5-thread case is due to
the fairness of CoMutex, rather than to coroutines.  If fairness does
not matter, as is the case with two threads, CoMutex can actually be
faster than pthreads.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 tests/test-aio-multithread.c | 152 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 152 insertions(+)
diff --git a/tests/test-aio-multithread.c b/tests/test-aio-multithread.c
index 18b3548..d7bc1bf 100644
--- a/tests/test-aio-multithread.c
+++ b/tests/test-aio-multithread.c
@@ -279,6 +279,150 @@ static void test_multi_co_mutex_2_30(void)
     test_multi_co_mutex(2, 30);
 }
 
+/* Same test with fair mutexes, for performance comparison.  */
+
+#ifdef CONFIG_LINUX
+#include "qemu/futex.h"
+
+/* The nodes for the mutex reside in this structure (on which we try to avoid
+ * false sharing).  The head of the mutex is in the "mutex_head" variable.
+ */
+static struct {
+    int next, locked;
+    int padding[14];
+} nodes[NUM_CONTEXTS] __attribute__((__aligned__(64)));
+
+static int mutex_head = -1;
+
+static void mcs_mutex_lock(void)
+{
+    int prev;
+
+    nodes[id].next = -1;
+    nodes[id].locked = 1;
+    prev = atomic_xchg(&mutex_head, id);
+    if (prev != -1) {
+        atomic_set(&nodes[prev].next, id);
+        futex_wait(&nodes[id].locked, 1);
+    }
+}
+
+static void mcs_mutex_unlock(void)
+{
+    int next;
+    if (nodes[id].next == -1) {
+        if (atomic_read(&mutex_head) == id &&
+            atomic_cmpxchg(&mutex_head, id, -1) == id) {
+            /* Last item in the list, exit.  */
+            return;
+        }
+        while (atomic_read(&nodes[id].next) == -1) {
+            /* Spin... */
+        }
+    }
+
+    /* Wake up the next in line.  */
+    next = nodes[id].next;
+    nodes[next].locked = 0;
+    futex_wake(&nodes[next].locked, 1);
+}
+
+static void test_multi_fair_mutex_entry(void *opaque)
+{
+    while (!atomic_mb_read(&now_stopping)) {
+        mcs_mutex_lock();
+        counter++;
+        mcs_mutex_unlock();
+        atomic_inc(&atomic_counter);
+    }
+
+}
+
+static void test_multi_fair_mutex(int threads, int seconds)
+{
+    int i;
+
+    assert(mutex_head == -1);
+    counter = 0;
+    atomic_counter = 0;
+    now_stopping = false;
+
+    create_aio_contexts();
+    assert(threads <= NUM_CONTEXTS);
+    for (i = 0; i < threads; i++) {
+        Coroutine *co1 = qemu_coroutine_create(test_multi_fair_mutex_entry);
+        aio_co_schedule(ctx[i], co1);
+    }
+
+    g_usleep(seconds * 1000000);
+
+    atomic_mb_set(&now_stopping, true);
+    join_aio_contexts();
+    g_test_message("%d iterations/second\n", counter / seconds);
+    g_assert_cmpint(counter, ==, atomic_counter);
+}
+
+static void test_multi_fair_mutex_1(void)
+{
+    test_multi_fair_mutex(NUM_CONTEXTS, 1);
+}
+
+static void test_multi_fair_mutex_10(void)
+{
+    test_multi_fair_mutex(NUM_CONTEXTS, 10);
+}
+#endif
+
+/* Same test with pthread mutexes, for performance comparison and
+ * portability.  */
+
+static QemuMutex mutex;
+
+static void test_multi_mutex_entry(void *opaque)
+{
+    while (!atomic_mb_read(&now_stopping)) {
+        qemu_mutex_lock(&mutex);
+        counter++;
+        qemu_mutex_unlock(&mutex);
+        atomic_inc(&atomic_counter);
+    }
+
+}
+
+static void test_multi_mutex(int threads, int seconds)
+{
+    int i;
+
+    qemu_mutex_init(&mutex);
+    counter = 0;
+    atomic_counter = 0;
+    now_stopping = false;
+
+    create_aio_contexts();
+    assert(threads <= NUM_CONTEXTS);
+    for (i = 0; i < threads; i++) {
+        Coroutine *co1 = qemu_coroutine_create(test_multi_mutex_entry);
+        aio_co_schedule(ctx[i], co1);
+    }
+
+    g_usleep(seconds * 1000000);
+
+    atomic_mb_set(&now_stopping, true);
+    join_aio_contexts();
+    g_test_message("%d iterations/second\n", counter / seconds);
+    g_assert_cmpint(counter, ==, atomic_counter);
+}
+
+static void test_multi_mutex_1(void)
+{
+    test_multi_mutex(NUM_CONTEXTS, 1);
+}
+
+static void test_multi_mutex_10(void)
+{
+    test_multi_mutex(NUM_CONTEXTS, 10);
+}
+
 /* End of tests.  */
 
 int main(int argc, char **argv)
@@ -291,10 +435,18 @@ int main(int argc, char **argv)
         g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
         g_test_add_func("/aio/multi/mutex", test_multi_co_mutex_1);
         g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_3);
+#ifdef CONFIG_LINUX
+        g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_1);
+#endif
+        g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_1);
     } else {
         g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
         g_test_add_func("/aio/multi/mutex", test_multi_co_mutex_10);
         g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_30);
+#ifdef CONFIG_LINUX
+        g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_10);
+#endif
+        g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_10);
     }
     return g_test_run();
 }
-- 
2.5.5
^ permalink raw reply related	[flat|nested] 27+ messages in thread
- * Re: [Qemu-devel] [PATCH 08/11] test-aio-multithread: add performance comparison with thread-based mutexes
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 08/11] test-aio-multithread: add performance comparison with thread-based mutexes Paolo Bonzini
@ 2016-04-29  6:52   ` Fam Zheng
  2016-05-12 16:49     ` Paolo Bonzini
  0 siblings, 1 reply; 27+ messages in thread
From: Fam Zheng @ 2016-04-29  6:52 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, stefanha, kwolf, berto
On Fri, 04/15 13:32, Paolo Bonzini wrote:
> Add two implementations of the same benchmark as the previous patch,
> but using pthreads.  One uses a normal QemuMutex, the other is Linux
> only and implements a fair mutex based on MCS locks and futexes.
> This shows that the slower performance of the 5-thread case is due to
> the fairness of CoMutex, rather than to coroutines.  If fairness does
> not matter, as is the case with two threads, CoMutex can actually be
> faster than pthreads.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  tests/test-aio-multithread.c | 152 +++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 152 insertions(+)
> 
> diff --git a/tests/test-aio-multithread.c b/tests/test-aio-multithread.c
> index 18b3548..d7bc1bf 100644
> --- a/tests/test-aio-multithread.c
> +++ b/tests/test-aio-multithread.c
> @@ -279,6 +279,150 @@ static void test_multi_co_mutex_2_30(void)
>      test_multi_co_mutex(2, 30);
>  }
>  
> +/* Same test with fair mutexes, for performance comparison.  */
> +
> +#ifdef CONFIG_LINUX
> +#include "qemu/futex.h"
Do we have qemu/futex.h?
> +
> +/* The nodes for the mutex reside in this structure (on which we try to avoid
> + * false sharing).  The head of the mutex is in the "mutex_head" variable.
> + */
> +static struct {
> +    int next, locked;
> +    int padding[14];
> +} nodes[NUM_CONTEXTS] __attribute__((__aligned__(64)));
> +
> +static int mutex_head = -1;
> +
> +static void mcs_mutex_lock(void)
> +{
> +    int prev;
> +
> +    nodes[id].next = -1;
> +    nodes[id].locked = 1;
> +    prev = atomic_xchg(&mutex_head, id);
> +    if (prev != -1) {
> +        atomic_set(&nodes[prev].next, id);
> +        futex_wait(&nodes[id].locked, 1);
> +    }
> +}
> +
> +static void mcs_mutex_unlock(void)
> +{
> +    int next;
> +    if (nodes[id].next == -1) {
> +        if (atomic_read(&mutex_head) == id &&
> +            atomic_cmpxchg(&mutex_head, id, -1) == id) {
> +            /* Last item in the list, exit.  */
> +            return;
> +        }
> +        while (atomic_read(&nodes[id].next) == -1) {
> +            /* Spin... */
> +        }
> +    }
> +
> +    /* Wake up the next in line.  */
> +    next = nodes[id].next;
> +    nodes[next].locked = 0;
> +    futex_wake(&nodes[next].locked, 1);
> +}
> +
> +static void test_multi_fair_mutex_entry(void *opaque)
> +{
> +    while (!atomic_mb_read(&now_stopping)) {
> +        mcs_mutex_lock();
> +        counter++;
> +        mcs_mutex_unlock();
> +        atomic_inc(&atomic_counter);
> +    }
> +
> +}
> +
> +static void test_multi_fair_mutex(int threads, int seconds)
> +{
> +    int i;
> +
> +    assert(mutex_head == -1);
> +    counter = 0;
> +    atomic_counter = 0;
> +    now_stopping = false;
> +
> +    create_aio_contexts();
> +    assert(threads <= NUM_CONTEXTS);
> +    for (i = 0; i < threads; i++) {
> +        Coroutine *co1 = qemu_coroutine_create(test_multi_fair_mutex_entry);
> +        aio_co_schedule(ctx[i], co1);
> +    }
> +
> +    g_usleep(seconds * 1000000);
> +
> +    atomic_mb_set(&now_stopping, true);
> +    join_aio_contexts();
> +    g_test_message("%d iterations/second\n", counter / seconds);
> +    g_assert_cmpint(counter, ==, atomic_counter);
> +}
> +
> +static void test_multi_fair_mutex_1(void)
> +{
> +    test_multi_fair_mutex(NUM_CONTEXTS, 1);
> +}
> +
> +static void test_multi_fair_mutex_10(void)
> +{
> +    test_multi_fair_mutex(NUM_CONTEXTS, 10);
> +}
> +#endif
> +
> +/* Same test with pthread mutexes, for performance comparison and
> + * portability.  */
> +
> +static QemuMutex mutex;
> +
> +static void test_multi_mutex_entry(void *opaque)
> +{
> +    while (!atomic_mb_read(&now_stopping)) {
> +        qemu_mutex_lock(&mutex);
> +        counter++;
> +        qemu_mutex_unlock(&mutex);
> +        atomic_inc(&atomic_counter);
> +    }
> +
> +}
> +
> +static void test_multi_mutex(int threads, int seconds)
> +{
> +    int i;
> +
> +    qemu_mutex_init(&mutex);
> +    counter = 0;
> +    atomic_counter = 0;
> +    now_stopping = false;
> +
> +    create_aio_contexts();
> +    assert(threads <= NUM_CONTEXTS);
> +    for (i = 0; i < threads; i++) {
> +        Coroutine *co1 = qemu_coroutine_create(test_multi_mutex_entry);
> +        aio_co_schedule(ctx[i], co1);
> +    }
> +
> +    g_usleep(seconds * 1000000);
> +
> +    atomic_mb_set(&now_stopping, true);
> +    join_aio_contexts();
> +    g_test_message("%d iterations/second\n", counter / seconds);
> +    g_assert_cmpint(counter, ==, atomic_counter);
> +}
> +
> +static void test_multi_mutex_1(void)
> +{
> +    test_multi_mutex(NUM_CONTEXTS, 1);
> +}
> +
> +static void test_multi_mutex_10(void)
> +{
> +    test_multi_mutex(NUM_CONTEXTS, 10);
> +}
> +
>  /* End of tests.  */
>  
>  int main(int argc, char **argv)
> @@ -291,10 +435,18 @@ int main(int argc, char **argv)
>          g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
>          g_test_add_func("/aio/multi/mutex", test_multi_co_mutex_1);
>          g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_3);
> +#ifdef CONFIG_LINUX
> +        g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_1);
> +#endif
> +        g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_1);
>      } else {
>          g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
>          g_test_add_func("/aio/multi/mutex", test_multi_co_mutex_10);
>          g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_30);
> +#ifdef CONFIG_LINUX
> +        g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_10);
> +#endif
> +        g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_10);
>      }
>      return g_test_run();
>  }
> -- 
> 2.5.5
> 
> 
^ permalink raw reply	[flat|nested] 27+ messages in thread
- * Re: [Qemu-devel] [PATCH 08/11] test-aio-multithread: add performance comparison with thread-based mutexes
  2016-04-29  6:52   ` Fam Zheng
@ 2016-05-12 16:49     ` Paolo Bonzini
  0 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2016-05-12 16:49 UTC (permalink / raw)
  To: Fam Zheng; +Cc: qemu-devel, stefanha, kwolf, berto
> >  tests/test-aio-multithread.c | 152
> >  +++++++++++++++++++++++++++++++++++++++++++
> >  1 file changed, 152 insertions(+)
> > 
> > diff --git a/tests/test-aio-multithread.c b/tests/test-aio-multithread.c
> > index 18b3548..d7bc1bf 100644
> > --- a/tests/test-aio-multithread.c
> > +++ b/tests/test-aio-multithread.c
> > @@ -279,6 +279,150 @@ static void test_multi_co_mutex_2_30(void)
> >      test_multi_co_mutex(2, 30);
> >  }
> >  
> > +/* Same test with fair mutexes, for performance comparison.  */
> > +
> > +#ifdef CONFIG_LINUX
> > +#include "qemu/futex.h"
> 
> Do we have qemu/futex.h?
It must be somewhere in the previous 50 patches...  QemuLockCnt adds it.
Paolo
> > +
> > +/* The nodes for the mutex reside in this structure (on which we try to
> > avoid
> > + * false sharing).  The head of the mutex is in the "mutex_head" variable.
> > + */
> > +static struct {
> > +    int next, locked;
> > +    int padding[14];
> > +} nodes[NUM_CONTEXTS] __attribute__((__aligned__(64)));
> > +
> > +static int mutex_head = -1;
> > +
> > +static void mcs_mutex_lock(void)
> > +{
> > +    int prev;
> > +
> > +    nodes[id].next = -1;
> > +    nodes[id].locked = 1;
> > +    prev = atomic_xchg(&mutex_head, id);
> > +    if (prev != -1) {
> > +        atomic_set(&nodes[prev].next, id);
> > +        futex_wait(&nodes[id].locked, 1);
> > +    }
> > +}
> > +
> > +static void mcs_mutex_unlock(void)
> > +{
> > +    int next;
> > +    if (nodes[id].next == -1) {
> > +        if (atomic_read(&mutex_head) == id &&
> > +            atomic_cmpxchg(&mutex_head, id, -1) == id) {
> > +            /* Last item in the list, exit.  */
> > +            return;
> > +        }
> > +        while (atomic_read(&nodes[id].next) == -1) {
> > +            /* Spin... */
> > +        }
> > +    }
> > +
> > +    /* Wake up the next in line.  */
> > +    next = nodes[id].next;
> > +    nodes[next].locked = 0;
> > +    futex_wake(&nodes[next].locked, 1);
> > +}
> > +
> > +static void test_multi_fair_mutex_entry(void *opaque)
> > +{
> > +    while (!atomic_mb_read(&now_stopping)) {
> > +        mcs_mutex_lock();
> > +        counter++;
> > +        mcs_mutex_unlock();
> > +        atomic_inc(&atomic_counter);
> > +    }
> > +
> > +}
> > +
> > +static void test_multi_fair_mutex(int threads, int seconds)
> > +{
> > +    int i;
> > +
> > +    assert(mutex_head == -1);
> > +    counter = 0;
> > +    atomic_counter = 0;
> > +    now_stopping = false;
> > +
> > +    create_aio_contexts();
> > +    assert(threads <= NUM_CONTEXTS);
> > +    for (i = 0; i < threads; i++) {
> > +        Coroutine *co1 =
> > qemu_coroutine_create(test_multi_fair_mutex_entry);
> > +        aio_co_schedule(ctx[i], co1);
> > +    }
> > +
> > +    g_usleep(seconds * 1000000);
> > +
> > +    atomic_mb_set(&now_stopping, true);
> > +    join_aio_contexts();
> > +    g_test_message("%d iterations/second\n", counter / seconds);
> > +    g_assert_cmpint(counter, ==, atomic_counter);
> > +}
> > +
> > +static void test_multi_fair_mutex_1(void)
> > +{
> > +    test_multi_fair_mutex(NUM_CONTEXTS, 1);
> > +}
> > +
> > +static void test_multi_fair_mutex_10(void)
> > +{
> > +    test_multi_fair_mutex(NUM_CONTEXTS, 10);
> > +}
> > +#endif
> > +
> > +/* Same test with pthread mutexes, for performance comparison and
> > + * portability.  */
> > +
> > +static QemuMutex mutex;
> > +
> > +static void test_multi_mutex_entry(void *opaque)
> > +{
> > +    while (!atomic_mb_read(&now_stopping)) {
> > +        qemu_mutex_lock(&mutex);
> > +        counter++;
> > +        qemu_mutex_unlock(&mutex);
> > +        atomic_inc(&atomic_counter);
> > +    }
> > +
> > +}
> > +
> > +static void test_multi_mutex(int threads, int seconds)
> > +{
> > +    int i;
> > +
> > +    qemu_mutex_init(&mutex);
> > +    counter = 0;
> > +    atomic_counter = 0;
> > +    now_stopping = false;
> > +
> > +    create_aio_contexts();
> > +    assert(threads <= NUM_CONTEXTS);
> > +    for (i = 0; i < threads; i++) {
> > +        Coroutine *co1 = qemu_coroutine_create(test_multi_mutex_entry);
> > +        aio_co_schedule(ctx[i], co1);
> > +    }
> > +
> > +    g_usleep(seconds * 1000000);
> > +
> > +    atomic_mb_set(&now_stopping, true);
> > +    join_aio_contexts();
> > +    g_test_message("%d iterations/second\n", counter / seconds);
> > +    g_assert_cmpint(counter, ==, atomic_counter);
> > +}
> > +
> > +static void test_multi_mutex_1(void)
> > +{
> > +    test_multi_mutex(NUM_CONTEXTS, 1);
> > +}
> > +
> > +static void test_multi_mutex_10(void)
> > +{
> > +    test_multi_mutex(NUM_CONTEXTS, 10);
> > +}
> > +
> >  /* End of tests.  */
> >  
> >  int main(int argc, char **argv)
> > @@ -291,10 +435,18 @@ int main(int argc, char **argv)
> >          g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
> >          g_test_add_func("/aio/multi/mutex", test_multi_co_mutex_1);
> >          g_test_add_func("/aio/multi/mutex/handoff",
> >          test_multi_co_mutex_2_3);
> > +#ifdef CONFIG_LINUX
> > +        g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_1);
> > +#endif
> > +        g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_1);
> >      } else {
> >          g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
> >          g_test_add_func("/aio/multi/mutex", test_multi_co_mutex_10);
> >          g_test_add_func("/aio/multi/mutex/handoff",
> >          test_multi_co_mutex_2_30);
> > +#ifdef CONFIG_LINUX
> > +        g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_10);
> > +#endif
> > +        g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_10);
> >      }
> >      return g_test_run();
> >  }
> > --
> > 2.5.5
> > 
> > 
> 
^ permalink raw reply	[flat|nested] 27+ messages in thread
 
 
- * [Qemu-devel] [PATCH 09/11] coroutine-lock: place CoMutex before CoQueue in header
  2016-04-15 11:31 [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Paolo Bonzini
                   ` (7 preceding siblings ...)
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 08/11] test-aio-multithread: add performance comparison with thread-based mutexes Paolo Bonzini
@ 2016-04-15 11:32 ` Paolo Bonzini
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 10/11] coroutine-lock: add mutex argument to CoQueue APIs Paolo Bonzini
                   ` (2 subsequent siblings)
  11 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2016-04-15 11:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz, kwolf, berto
This will avoid forward references in the next patch.  It is also
more logical because CoQueue is not anymore the basic primitive.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/qemu/coroutine.h | 79 ++++++++++++++++++++++++------------------------
 1 file changed, 39 insertions(+), 40 deletions(-)
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index d15a09a..e8e3431 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -113,46 +113,6 @@ Coroutine *coroutine_fn qemu_coroutine_self(void);
 bool qemu_in_coroutine(void);
 
 
-
-/**
- * CoQueues are a mechanism to queue coroutines in order to continue executing
- * them later. They provide the fundamental primitives on which coroutine locks
- * are built.
- */
-typedef struct CoQueue {
-    QSIMPLEQ_HEAD(, Coroutine) entries;
-} CoQueue;
-
-/**
- * Initialise a CoQueue. This must be called before any other operation is used
- * on the CoQueue.
- */
-void qemu_co_queue_init(CoQueue *queue);
-
-/**
- * Adds the current coroutine to the CoQueue and transfers control to the
- * caller of the coroutine.
- */
-void coroutine_fn qemu_co_queue_wait(CoQueue *queue);
-
-/**
- * Restarts the next coroutine in the CoQueue and removes it from the queue.
- *
- * Returns true if a coroutine was restarted, false if the queue is empty.
- */
-bool coroutine_fn qemu_co_queue_next(CoQueue *queue);
-
-/**
- * Restarts all coroutines in the CoQueue and leaves the queue empty.
- */
-void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue);
-
-/**
- * Checks if the CoQueue is empty.
- */
-bool qemu_co_queue_empty(CoQueue *queue);
-
-
 /**
  * Provides a mutex that can be used to synchronise coroutines
  */
@@ -196,6 +156,45 @@ void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex);
  */
 void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex);
 
+
+/**
+ * CoQueues are a mechanism to queue coroutines in order to continue executing
+ * them later.
+ */
+typedef struct CoQueue {
+    QSIMPLEQ_HEAD(, Coroutine) entries;
+} CoQueue;
+
+/**
+ * Initialise a CoQueue. This must be called before any other operation is used
+ * on the CoQueue.
+ */
+void qemu_co_queue_init(CoQueue *queue);
+
+/**
+ * Adds the current coroutine to the CoQueue and transfers control to the
+ * caller of the coroutine.
+ */
+void coroutine_fn qemu_co_queue_wait(CoQueue *queue);
+
+/**
+ * Restarts the next coroutine in the CoQueue and removes it from the queue.
+ *
+ * Returns true if a coroutine was restarted, false if the queue is empty.
+ */
+bool coroutine_fn qemu_co_queue_next(CoQueue *queue);
+
+/**
+ * Restarts all coroutines in the CoQueue and leaves the queue empty.
+ */
+void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue);
+
+/**
+ * Checks if the CoQueue is empty.
+ */
+bool qemu_co_queue_empty(CoQueue *queue);
+
+
 typedef struct CoRwlock {
     bool writer;
     int reader;
-- 
2.5.5
^ permalink raw reply related	[flat|nested] 27+ messages in thread
- * [Qemu-devel] [PATCH 10/11] coroutine-lock: add mutex argument to CoQueue APIs
  2016-04-15 11:31 [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Paolo Bonzini
                   ` (8 preceding siblings ...)
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 09/11] coroutine-lock: place CoMutex before CoQueue in header Paolo Bonzini
@ 2016-04-15 11:32 ` Paolo Bonzini
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 11/11] coroutine-lock: make CoRwlock thread-safe and fair Paolo Bonzini
  2016-04-26 10:54 ` [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Stefan Hajnoczi
  11 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2016-04-15 11:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz, kwolf, berto
All that CoQueue needs in order to become thread-safe is help
from an external mutex.  Add this to the API.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/backup.c             |  2 +-
 block/io.c                 |  2 +-
 block/qcow2-cluster.c      |  4 +---
 block/sheepdog.c           |  2 +-
 block/throttle-groups.c    |  2 +-
 hw/9pfs/9p.c               |  2 +-
 include/qemu/coroutine.h   |  8 +++++---
 util/qemu-coroutine-lock.c | 24 +++++++++++++++++++++---
 8 files changed, 32 insertions(+), 14 deletions(-)
diff --git a/block/backup.c b/block/backup.c
index 11949f1..db0231f 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -68,7 +68,7 @@ static void coroutine_fn wait_for_overlapping_requests(BackupBlockJob *job,
         retry = false;
         QLIST_FOREACH(req, &job->inflight_reqs, list) {
             if (end > req->start && start < req->end) {
-                qemu_co_queue_wait(&req->wait_queue);
+                qemu_co_queue_wait(&req->wait_queue, NULL);
                 retry = true;
                 break;
             }
diff --git a/block/io.c b/block/io.c
index c6ea980..279d9dc 100644
--- a/block/io.c
+++ b/block/io.c
@@ -541,7 +541,7 @@ static bool coroutine_fn wait_serialising_requests(BdrvTrackedRequest *self)
                  * (instead of producing a deadlock in the former case). */
                 if (!req->waiting_for) {
                     self->waiting_for = req;
-                    qemu_co_queue_wait(&req->wait_queue);
+                    qemu_co_queue_wait(&req->wait_queue, NULL);
                     self->waiting_for = NULL;
                     retry = true;
                     waited = true;
diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
index 31ecc10..42bab15 100644
--- a/block/qcow2-cluster.c
+++ b/block/qcow2-cluster.c
@@ -927,9 +927,7 @@ static int handle_dependencies(BlockDriverState *bs, uint64_t guest_offset,
             if (bytes == 0) {
                 /* Wait for the dependency to complete. We need to recheck
                  * the free/allocated clusters when we continue. */
-                qemu_co_mutex_unlock(&s->lock);
-                qemu_co_queue_wait(&old_alloc->dependent_requests);
-                qemu_co_mutex_lock(&s->lock);
+                qemu_co_queue_wait(&old_alloc->dependent_requests, &s->lock);
                 return -EAGAIN;
             }
         }
diff --git a/block/sheepdog.c b/block/sheepdog.c
index 790541f..753ae59 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -2165,7 +2165,7 @@ static void wait_for_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *aioc
 retry:
     QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
         if (AIOCBOverlapping(aiocb, cb)) {
-            qemu_co_queue_wait(&s->overlapping_queue);
+            qemu_co_queue_wait(&s->overlapping_queue, NULL);
             goto retry;
         }
     }
diff --git a/block/throttle-groups.c b/block/throttle-groups.c
index 53e910e..5630606 100644
--- a/block/throttle-groups.c
+++ b/block/throttle-groups.c
@@ -302,7 +302,7 @@ void coroutine_fn throttle_group_co_io_limits_intercept(BlockDriverState *bs,
     if (must_wait || bs->pending_reqs[is_write]) {
         bs->pending_reqs[is_write]++;
         qemu_mutex_unlock(&tg->lock);
-        qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
+        qemu_co_queue_wait(&bs->throttled_reqs[is_write], NULL);
         qemu_mutex_lock(&tg->lock);
         bs->pending_reqs[is_write]--;
     }
diff --git a/hw/9pfs/9p.c b/hw/9pfs/9p.c
index f5e3012..5f077e8 100644
--- a/hw/9pfs/9p.c
+++ b/hw/9pfs/9p.c
@@ -2282,7 +2282,7 @@ static void v9fs_flush(void *opaque)
         /*
          * Wait for pdu to complete.
          */
-        qemu_co_queue_wait(&cancel_pdu->complete);
+        qemu_co_queue_wait(&cancel_pdu->complete, NULL);
         cancel_pdu->cancelled = 0;
         pdu_free(cancel_pdu);
     }
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index e8e3431..25e31a1 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -159,7 +159,8 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex);
 
 /**
  * CoQueues are a mechanism to queue coroutines in order to continue executing
- * them later.
+ * them later.  They are similar to condition variables, but they need help
+ * from an external mutex in order to maintain thread-safety.
  */
 typedef struct CoQueue {
     QSIMPLEQ_HEAD(, Coroutine) entries;
@@ -173,9 +174,10 @@ void qemu_co_queue_init(CoQueue *queue);
 
 /**
  * Adds the current coroutine to the CoQueue and transfers control to the
- * caller of the coroutine.
+ * caller of the coroutine.  The mutex is unlocked during the wait and
+ * locked again afterwards.
  */
-void coroutine_fn qemu_co_queue_wait(CoQueue *queue);
+void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex *mutex);
 
 /**
  * Restarts the next coroutine in the CoQueue and removes it from the queue.
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index aa59e82..828d79a 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -46,13 +46,31 @@ void qemu_co_queue_init(CoQueue *queue)
     QSIMPLEQ_INIT(&queue->entries);
 }
 
-void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
+void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex *mutex)
 {
     Coroutine *self = qemu_coroutine_self();
     self->ctx = qemu_get_current_aio_context();
     QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
+
+    if (mutex) {
+        qemu_co_mutex_unlock(mutex);
+    }
+
+    /* There is no race condition here.  Other threads will call
+     * aio_co_schedule on our AioContext, which can reenter this
+     * coroutine but only after this yield and after the main loop
+     * has gone through the next iteration.
+     */
     qemu_coroutine_yield();
     assert(qemu_in_coroutine());
+
+    /* TODO: OSv implements wait morphing here, where the wakeup
+     * primitive automatically places the woken coroutine on the
+     * mutex's queue.  This avoids the thundering herd effect.
+     */
+    if (mutex) {
+        qemu_co_mutex_lock(mutex);
+    }
 }
 
 /**
@@ -309,7 +327,7 @@ void qemu_co_rwlock_init(CoRwlock *lock)
 void qemu_co_rwlock_rdlock(CoRwlock *lock)
 {
     while (lock->writer) {
-        qemu_co_queue_wait(&lock->queue);
+        qemu_co_queue_wait(&lock->queue, NULL);
     }
     lock->reader++;
 }
@@ -333,7 +351,7 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
 void qemu_co_rwlock_wrlock(CoRwlock *lock)
 {
     while (lock->writer || lock->reader) {
-        qemu_co_queue_wait(&lock->queue);
+        qemu_co_queue_wait(&lock->queue, NULL);
     }
     lock->writer = true;
 }
-- 
2.5.5
^ permalink raw reply related	[flat|nested] 27+ messages in thread
- * [Qemu-devel] [PATCH 11/11] coroutine-lock: make CoRwlock thread-safe and fair
  2016-04-15 11:31 [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Paolo Bonzini
                   ` (9 preceding siblings ...)
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 10/11] coroutine-lock: add mutex argument to CoQueue APIs Paolo Bonzini
@ 2016-04-15 11:32 ` Paolo Bonzini
  2016-04-26 10:54 ` [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Stefan Hajnoczi
  11 siblings, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2016-04-15 11:32 UTC (permalink / raw)
  To: qemu-devel; +Cc: stefanha, famz, kwolf, berto
This adds a CoMutex around the existing CoQueue.  Because the write-side
can just take CoMutex, the old "writer" field is not necessary anymore.
Instead of removing it altogether, count the number of pending writers
during a read-side critical section and forbid further readers from
entering.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/qemu/coroutine.h   |  3 ++-
 util/qemu-coroutine-lock.c | 29 +++++++++++++++++++++--------
 2 files changed, 23 insertions(+), 9 deletions(-)
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 25e31a1..0dfb301 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -198,8 +198,9 @@ bool qemu_co_queue_empty(CoQueue *queue);
 
 
 typedef struct CoRwlock {
-    bool writer;
+    int pending_writer;
     int reader;
+    CoMutex mutex;
     CoQueue queue;
 } CoRwlock;
 
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 828d79a..db9cf01 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -322,36 +322,49 @@ void qemu_co_rwlock_init(CoRwlock *lock)
 {
     memset(lock, 0, sizeof(*lock));
     qemu_co_queue_init(&lock->queue);
+    qemu_co_mutex_init(&lock->mutex);
 }
 
 void qemu_co_rwlock_rdlock(CoRwlock *lock)
 {
-    while (lock->writer) {
-        qemu_co_queue_wait(&lock->queue, NULL);
+    qemu_co_mutex_lock(&lock->mutex);
+    /* For fairness, wait if a writer is in line.  */
+    while (lock->pending_writer) {
+        qemu_co_queue_wait(&lock->queue, &lock->mutex);
     }
     lock->reader++;
+    qemu_co_mutex_unlock(&lock->mutex);
+
+    /* The rest of the read-side critical section is run without the mutex.  */
 }
 
 void qemu_co_rwlock_unlock(CoRwlock *lock)
 {
     assert(qemu_in_coroutine());
-    if (lock->writer) {
-        lock->writer = false;
+    if (!lock->reader) {
+        /* The critical section started in qemu_co_rwlock_wrlock.  */
         qemu_co_queue_restart_all(&lock->queue);
     } else {
+        qemu_co_mutex_lock(&lock->mutex);
         lock->reader--;
-        assert(lock->reader >= 0);
         /* Wakeup only one waiting writer */
         if (!lock->reader) {
             qemu_co_queue_next(&lock->queue);
         }
     }
+    qemu_co_mutex_unlock(&lock->mutex);
 }
 
 void qemu_co_rwlock_wrlock(CoRwlock *lock)
 {
-    while (lock->writer || lock->reader) {
-        qemu_co_queue_wait(&lock->queue, NULL);
+    qemu_co_mutex_lock(&lock->mutex);
+    lock->pending_writer++;
+    while (lock->reader) {
+        qemu_co_queue_wait(&lock->queue, &lock->mutex);
     }
-    lock->writer = true;
+    lock->pending_writer--;
+
+    /* The rest of the write-side critical section is run with
+     * the mutex taken, so lock->reader remains zero.
+     */
 }
-- 
2.5.5
^ permalink raw reply related	[flat|nested] 27+ messages in thread
- * Re: [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe
  2016-04-15 11:31 [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Paolo Bonzini
                   ` (10 preceding siblings ...)
  2016-04-15 11:32 ` [Qemu-devel] [PATCH 11/11] coroutine-lock: make CoRwlock thread-safe and fair Paolo Bonzini
@ 2016-04-26 10:54 ` Stefan Hajnoczi
  2016-04-27 15:42   ` Stefan Hajnoczi
  2016-05-17 15:34   ` Paolo Bonzini
  11 siblings, 2 replies; 27+ messages in thread
From: Stefan Hajnoczi @ 2016-04-26 10:54 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: qemu-devel, famz, kwolf, berto
[-- Attachment #1: Type: text/plain, Size: 7991 bytes --]
On Fri, Apr 15, 2016 at 01:31:55PM +0200, Paolo Bonzini wrote:
> [this time including the mailing list]
> 
> This is yet another tiny bit of the multiqueue work, this time affecting
> the synchronization infrastructure for coroutines.  Currently, coroutines
> synchronize between the main I/O thread and the dataplane iothread through
> the AioContext lock.  However, for multiqueue a single BDS will be used
> by multiple iothreads and hence multiple AioContexts.  This calls for
> a different approach to coroutine synchronization.  This series is my
> first attempt at it.  Besides multiqueue, I think throttling groups
> (which can already span multiple AioContexts) could also take advantage
> of the new CoMutexes.
> 
> The series has two main parts:
> 
> - it makes CoMutex bind coroutines to an AioContexts.  It is of course
>   still possible to move coroutines around with explicit yield and
>   qemu_coroutine_enter, but a CoMutex will now reenter the coroutine
>   where it was running before, rather than in the AioContext of
>   the previous owner.  To do this, a new function aio_co_schedule is
>   introduced to run a coroutine on a given iothread.  I think this could
>   be used in other places too; for now it lets a CoMutex protect stuff
>   across multiple AioContexts without them moving around(*).  Of course
>   currently a CoMutex is generally not used across multiple iothreads,
>   because you have to acquire/release the AioContext around CoMutex
>   critical sections.  However...
> 
> - ... the second change is exactly to make CoMutex thread-safe and remove
>   the need for a "thread-based" mutex around it.  The new CoMutex is
>   exactly the same as a mutex implementation that you'd find in an
>   operating system.  iothreads are the moral equivalent of CPUs in
>   a kernel, while coroutines resemble kernel threads running without
>   preemption on a CPU.  Even if you have to take concurrency into account,
>   the lack of preemption while running coroutines or bottom halves
>   keeps the complexity at bay.  For example, it is easy to synchronize
>   between qemu_co_mutex_lock's yield and the qemu_coroutine_enter in
>   aio_co_schedule's bottom half.
> 
>   Same as before, CoMutex puts coroutines to sleep with
>   qemu_coroutine_yield and wake them up with the new aio_co_schedule.
>   I could have wrapped CoMutex's CoQueue with a "regular" thread mutex or
>   spinlock.  The resulting code would have looked a lot like RFifoLock
>   (with CoQueue replacing RFifoLock's condition variable).  Rather,
>   inspired by the parallel between coroutines and non-preemptive kernel
>   threads, I chose to adopt the same lock-free mutex algorithm as OSv.
>   The algorithm only needs two to four atomic ops for a lock-unlock pair
>   (two when uncontended).  To cover CoQueue, each CoQueue is made to
>   depend on a CoMutex, similar to condition variables.  Most CoQueues
>   already have a corresponding CoMutex so this is not a big deal;
>   converting the others is left for a future series.  I did this because
>   CoQueue looks a lot like OSv's waitqueues; so if necessary, we could
>   even take OSv's support for wait morphing (which avoids the thundering
>   herd problem) and add it to CoMutex and CoQueue.  This may be useful
>   when making tracked_requests thread-safe.
> 
> Kevin: this has nothing to do with my old plan from Brno, and it's
> possibly a lot closer to what you wanted.  Your idea was to automatically
> release the "thread mutex" when a coroutine yields, I think you should
> be fine with not having a thread mutex at all!
> 
> This will need some changes in the formats because, for multiqueue,
> CoMutexes would need to be used like "real" thread mutexes.  Code like
> this:
> 
>     ...
>     qemu_co_mutex_unlock()
>     ... /* still access shared data, but don't yield */
>     qemu_coroutine_yield()
> 
> might be required to use this other pattern:
> 
>     ... /* access shared data, but don't yield */
>     qemu_co_mutex_unlock()
>     qemu_coroutine_yield()
> 
> because "adding a second CPU" is already introducing concurrency that
> wasn't there before.  The "non-preemptive multitasking" reference only
> applies to things that access AioContext-local data.  This includes the
> synchronization primitives implemented in this series, the thread pool,
> the Linux AIO support, but not much else.  It still simplifies _those_
> though. :)
> 
> Anyhow, we'll always have some BlockDriver that do not support multiqueue,
> such as the network protocols.  Thus it would be possible to handle the
> formats one at a time.  raw-posix, raw and qcow2 would already form a
> pretty good set, and the first two do not use CoMutex at all.
> 
> The patch has quite a lot of new code, but about half of it is testcases.
> The new test covers correctness and (when run with --verbose) also takes a
> stab at measuring performance; the results is that performance of CoMutex
> is comparable to pthread mutexes.  The only snag is that that you cannot
> make a direct comparison between CoMutex (fair) and pthread_mutex_t
> (unfair).  For this reason the testcase also measures performance of a
> quick-and-dirty implementation of a fair mutex, based on MCS locks +
> futexes.
> 
> There's a lot of meat in the above text, and I hope it will make the code
> clearer and compensate for the terse commit messages. :)  I'll probably
> write a single-threaded testcase too, just to provide some more unit
> test comparison of "before" and "after".
> 
> I haven't even started a guest with this patches, let alone run
> qemu-iotests... generally the changes are well confined to unit tested
> code, patch 2 for example is completely untested.  There are a couple
> other places that at least need more comments, but I wanted to throw
> the patch out for an early review of the general approach.
> 
> Paolo Bonzini (11):
>   coroutine: use QSIMPLEQ instead of QTAILQ
>   throttle-groups: restart throttled requests from coroutine context
>   coroutine: delete qemu_co_enter_next
>   aio: introduce aio_co_schedule
>   coroutine-lock: reschedule coroutine on the AioContext it was running on
>   coroutine-lock: make CoMutex thread-safe
>   coroutine-lock: add limited spinning to CoMutex
>   test-aio-multithread: add performance comparison with thread-based mutexes
>   coroutine-lock: place CoMutex before CoQueue in header
>   coroutine-lock: add mutex argument to CoQueue APIs
>   coroutine-lock: make CoRwlock thread-safe and fair
> 
>  async.c                      |  38 ++++
>  block/backup.c               |   2 +-
>  block/io.c                   |   2 +-
>  block/qcow2-cluster.c        |   4 +-
>  block/sheepdog.c             |   2 +-
>  block/throttle-groups.c      |  71 +++++--
>  hw/9pfs/9p.c                 |   2 +-
>  include/block/aio.h          |  14 ++
>  include/qemu/atomic.h        |   3 +
>  include/qemu/coroutine.h     | 106 ++++++----
>  include/qemu/coroutine_int.h |  14 +-
>  iothread.c                   |  16 ++
>  stubs/iothread.c             |  11 ++
>  tests/Makefile               |  11 +-
>  tests/iothread.c             | 107 ++++++++++
>  tests/iothread.h             |  25 +++
>  tests/test-aio-multithread.c | 452 +++++++++++++++++++++++++++++++++++++++++++
>  trace-events                 |   8 +-
>  util/qemu-coroutine-lock.c   | 257 ++++++++++++++++++++----
>  util/qemu-coroutine.c        |   2 +-
>  20 files changed, 1037 insertions(+), 110 deletions(-)
>  create mode 100644 tests/iothread.c
>  create mode 100644 tests/iothread.h
>  create mode 100644 tests/test-aio-multithread.c
Looks promising, modulo questions and comments I posted on patches.
I wonder whether it would be cleaner to introduce new primitives instead
of modifying CoMutex/CoQueue.  That way it would be clear whether code
is written to be thread-safe or not.
[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 473 bytes --]
^ permalink raw reply	[flat|nested] 27+ messages in thread
- * Re: [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe
  2016-04-26 10:54 ` [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Stefan Hajnoczi
@ 2016-04-27 15:42   ` Stefan Hajnoczi
  2016-05-17 15:34   ` Paolo Bonzini
  1 sibling, 0 replies; 27+ messages in thread
From: Stefan Hajnoczi @ 2016-04-27 15:42 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Paolo Bonzini, kwolf, berto, famz, qemu-devel
[-- Attachment #1: Type: text/plain, Size: 8387 bytes --]
On Tue, Apr 26, 2016 at 11:54:19AM +0100, Stefan Hajnoczi wrote:
> On Fri, Apr 15, 2016 at 01:31:55PM +0200, Paolo Bonzini wrote:
> > [this time including the mailing list]
> > 
> > This is yet another tiny bit of the multiqueue work, this time affecting
> > the synchronization infrastructure for coroutines.  Currently, coroutines
> > synchronize between the main I/O thread and the dataplane iothread through
> > the AioContext lock.  However, for multiqueue a single BDS will be used
> > by multiple iothreads and hence multiple AioContexts.  This calls for
> > a different approach to coroutine synchronization.  This series is my
> > first attempt at it.  Besides multiqueue, I think throttling groups
> > (which can already span multiple AioContexts) could also take advantage
> > of the new CoMutexes.
> > 
> > The series has two main parts:
> > 
> > - it makes CoMutex bind coroutines to an AioContexts.  It is of course
> >   still possible to move coroutines around with explicit yield and
> >   qemu_coroutine_enter, but a CoMutex will now reenter the coroutine
> >   where it was running before, rather than in the AioContext of
> >   the previous owner.  To do this, a new function aio_co_schedule is
> >   introduced to run a coroutine on a given iothread.  I think this could
> >   be used in other places too; for now it lets a CoMutex protect stuff
> >   across multiple AioContexts without them moving around(*).  Of course
> >   currently a CoMutex is generally not used across multiple iothreads,
> >   because you have to acquire/release the AioContext around CoMutex
> >   critical sections.  However...
> > 
> > - ... the second change is exactly to make CoMutex thread-safe and remove
> >   the need for a "thread-based" mutex around it.  The new CoMutex is
> >   exactly the same as a mutex implementation that you'd find in an
> >   operating system.  iothreads are the moral equivalent of CPUs in
> >   a kernel, while coroutines resemble kernel threads running without
> >   preemption on a CPU.  Even if you have to take concurrency into account,
> >   the lack of preemption while running coroutines or bottom halves
> >   keeps the complexity at bay.  For example, it is easy to synchronize
> >   between qemu_co_mutex_lock's yield and the qemu_coroutine_enter in
> >   aio_co_schedule's bottom half.
> > 
> >   Same as before, CoMutex puts coroutines to sleep with
> >   qemu_coroutine_yield and wake them up with the new aio_co_schedule.
> >   I could have wrapped CoMutex's CoQueue with a "regular" thread mutex or
> >   spinlock.  The resulting code would have looked a lot like RFifoLock
> >   (with CoQueue replacing RFifoLock's condition variable).  Rather,
> >   inspired by the parallel between coroutines and non-preemptive kernel
> >   threads, I chose to adopt the same lock-free mutex algorithm as OSv.
> >   The algorithm only needs two to four atomic ops for a lock-unlock pair
> >   (two when uncontended).  To cover CoQueue, each CoQueue is made to
> >   depend on a CoMutex, similar to condition variables.  Most CoQueues
> >   already have a corresponding CoMutex so this is not a big deal;
> >   converting the others is left for a future series.  I did this because
> >   CoQueue looks a lot like OSv's waitqueues; so if necessary, we could
> >   even take OSv's support for wait morphing (which avoids the thundering
> >   herd problem) and add it to CoMutex and CoQueue.  This may be useful
> >   when making tracked_requests thread-safe.
> > 
> > Kevin: this has nothing to do with my old plan from Brno, and it's
> > possibly a lot closer to what you wanted.  Your idea was to automatically
> > release the "thread mutex" when a coroutine yields, I think you should
> > be fine with not having a thread mutex at all!
> > 
> > This will need some changes in the formats because, for multiqueue,
> > CoMutexes would need to be used like "real" thread mutexes.  Code like
> > this:
> > 
> >     ...
> >     qemu_co_mutex_unlock()
> >     ... /* still access shared data, but don't yield */
> >     qemu_coroutine_yield()
> > 
> > might be required to use this other pattern:
> > 
> >     ... /* access shared data, but don't yield */
> >     qemu_co_mutex_unlock()
> >     qemu_coroutine_yield()
> > 
> > because "adding a second CPU" is already introducing concurrency that
> > wasn't there before.  The "non-preemptive multitasking" reference only
> > applies to things that access AioContext-local data.  This includes the
> > synchronization primitives implemented in this series, the thread pool,
> > the Linux AIO support, but not much else.  It still simplifies _those_
> > though. :)
> > 
> > Anyhow, we'll always have some BlockDriver that do not support multiqueue,
> > such as the network protocols.  Thus it would be possible to handle the
> > formats one at a time.  raw-posix, raw and qcow2 would already form a
> > pretty good set, and the first two do not use CoMutex at all.
> > 
> > The patch has quite a lot of new code, but about half of it is testcases.
> > The new test covers correctness and (when run with --verbose) also takes a
> > stab at measuring performance; the results is that performance of CoMutex
> > is comparable to pthread mutexes.  The only snag is that that you cannot
> > make a direct comparison between CoMutex (fair) and pthread_mutex_t
> > (unfair).  For this reason the testcase also measures performance of a
> > quick-and-dirty implementation of a fair mutex, based on MCS locks +
> > futexes.
> > 
> > There's a lot of meat in the above text, and I hope it will make the code
> > clearer and compensate for the terse commit messages. :)  I'll probably
> > write a single-threaded testcase too, just to provide some more unit
> > test comparison of "before" and "after".
> > 
> > I haven't even started a guest with this patches, let alone run
> > qemu-iotests... generally the changes are well confined to unit tested
> > code, patch 2 for example is completely untested.  There are a couple
> > other places that at least need more comments, but I wanted to throw
> > the patch out for an early review of the general approach.
> > 
> > Paolo Bonzini (11):
> >   coroutine: use QSIMPLEQ instead of QTAILQ
> >   throttle-groups: restart throttled requests from coroutine context
> >   coroutine: delete qemu_co_enter_next
> >   aio: introduce aio_co_schedule
> >   coroutine-lock: reschedule coroutine on the AioContext it was running on
> >   coroutine-lock: make CoMutex thread-safe
> >   coroutine-lock: add limited spinning to CoMutex
> >   test-aio-multithread: add performance comparison with thread-based mutexes
> >   coroutine-lock: place CoMutex before CoQueue in header
> >   coroutine-lock: add mutex argument to CoQueue APIs
> >   coroutine-lock: make CoRwlock thread-safe and fair
> > 
> >  async.c                      |  38 ++++
> >  block/backup.c               |   2 +-
> >  block/io.c                   |   2 +-
> >  block/qcow2-cluster.c        |   4 +-
> >  block/sheepdog.c             |   2 +-
> >  block/throttle-groups.c      |  71 +++++--
> >  hw/9pfs/9p.c                 |   2 +-
> >  include/block/aio.h          |  14 ++
> >  include/qemu/atomic.h        |   3 +
> >  include/qemu/coroutine.h     | 106 ++++++----
> >  include/qemu/coroutine_int.h |  14 +-
> >  iothread.c                   |  16 ++
> >  stubs/iothread.c             |  11 ++
> >  tests/Makefile               |  11 +-
> >  tests/iothread.c             | 107 ++++++++++
> >  tests/iothread.h             |  25 +++
> >  tests/test-aio-multithread.c | 452 +++++++++++++++++++++++++++++++++++++++++++
> >  trace-events                 |   8 +-
> >  util/qemu-coroutine-lock.c   | 257 ++++++++++++++++++++----
> >  util/qemu-coroutine.c        |   2 +-
> >  20 files changed, 1037 insertions(+), 110 deletions(-)
> >  create mode 100644 tests/iothread.c
> >  create mode 100644 tests/iothread.h
> >  create mode 100644 tests/test-aio-multithread.c
> 
> Looks promising, modulo questions and comments I posted on patches.
I should add that I didn't review the internals of the new thread-safe
CoMutex implemenation due to time constraints.  If you want me to think
through the cases I can do it but I'm happy to trust you and the OSv
code.
[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 473 bytes --]
^ permalink raw reply	[flat|nested] 27+ messages in thread 
- * Re: [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe
  2016-04-26 10:54 ` [Qemu-devel] [RFC PATCH resend 00/11] Make CoMutex/CoQueue/CoRwlock thread-safe Stefan Hajnoczi
  2016-04-27 15:42   ` Stefan Hajnoczi
@ 2016-05-17 15:34   ` Paolo Bonzini
  1 sibling, 0 replies; 27+ messages in thread
From: Paolo Bonzini @ 2016-05-17 15:34 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: qemu-devel, famz, kwolf, berto
On 26/04/2016 12:54, Stefan Hajnoczi wrote:
> I wonder whether it would be cleaner to introduce new primitives instead
> of modifying CoMutex/CoQueue.  That way it would be clear whether code
> is written to be thread-safe or not.
Interesting suggestion.  I think we have few enough CoMutexes/CoQueues
(also, almost all of them in core code and/or coarse-grained) that we
can do the conversion in one series, but if I'm wrong it's certainly a
possibility.
vdi.c is a CoMutex that I know is not thread safe, but I've done the
conversion to CoRwlock and it is not hard.
Thanks,
Paolo
^ permalink raw reply	[flat|nested] 27+ messages in thread