* [Qemu-devel] [PATCH 0/5] threadpool: support multiple ThreadPools
@ 2013-03-06 15:45 Stefan Hajnoczi
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct ThreadPool Stefan Hajnoczi
                   ` (4 more replies)
  0 siblings, 5 replies; 14+ messages in thread
From: Stefan Hajnoczi @ 2013-03-06 15:45 UTC (permalink / raw)
  To: qemu-devel; +Cc: Kevin Wolf, Paolo Bonzini, Stefan Hajnoczi

This patch series moves from a single global thread pool to a one-ThreadPool-
per-AioContext model.  We still only use the main loop AioContext, so in
practice there is just one ThreadPool.  But this opens the door to refactoring
the block layer (which depends on ThreadPool) so that block devices can be
accessed outside the global mutex in the future.

ThreadPool is tightly bound to an AioContext because it uses an EventNotifier
to signal work completion.  Completed work items are reaped and their callback
functions are invoked from the EventNotifier read handler (executing under
AioContext).

It might be possible to record the AioContext for the completion callback on a
per-request basis and continue to use a global pool of worker threads.  After
discussing thread pool models with Paolo I have been convinced that it is
simpler and more scalable to have one ThreadPool per AioContext instead.
Therefore this series implements the 1:1 approach.  For details on the earlier
thread pool model discussion, see:
http://lists.gnu.org/archive/html/qemu-devel/2013-02/msg03987.html

At the end of this series block/raw-posix.c and block/raw-win32.c are aware of
the ThreadPool they submit work to.  The next step after this series is to
associate BlockDriverState with an AioContext so that the block layer can run
outside the global main loop.
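
For reference, here is a minimal sketch of the submission path once the whole
series is applied.  submit_example() and its arguments are placeholders, not
code from the series; the APIs are the ones added in patches 3-5:

#include "qemu/main-loop.h"
#include "block/aio.h"
#include "block/thread-pool.h"

/* Placeholder caller: a block driver would pass its own worker function. */
static void submit_example(ThreadPoolFunc *worker, void *arg)
{
    /* Every block driver still runs under the main loop AioContext for now. */
    AioContext *ctx = qemu_get_aio_context();

    /* The pool is looked up through the AioContext, so binding a
     * BlockDriverState to a different AioContext later only changes ctx. */
    ThreadPool *pool = aio_get_thread_pool(ctx);

    thread_pool_submit(pool, worker, arg);
}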

Stefan Hajnoczi (5):
  threadpool: move globals into struct ThreadPool
  threadpool: add thread_pool_new() and thread_pool_free()
  aio: add a ThreadPool instance to AioContext
  main-loop: add qemu_get_aio_context()
  threadpool: drop global thread pool

 async.c                     |  11 ++
 block/raw-posix.c           |   8 +-
 block/raw-win32.c           |   4 +-
 include/block/aio.h         |   6 ++
 include/block/thread-pool.h |  15 ++-
 include/qemu/main-loop.h    |   5 +
 main-loop.c                 |   5 +
 tests/test-thread-pool.c    |  44 ++++----
 thread-pool.c               | 252 ++++++++++++++++++++++++++++----------------
 trace-events                |   4 +-
 10 files changed, 233 insertions(+), 121 deletions(-)

-- 
1.8.1.4


* [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct ThreadPool
  2013-03-06 15:45 [Qemu-devel] [PATCH 0/5] threadpool: support multiple ThreadPools Stefan Hajnoczi
@ 2013-03-06 15:45 ` Stefan Hajnoczi
  2013-03-06 17:25   ` Paolo Bonzini
  2013-03-08  3:18   ` Wenchao Xia
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 2/5] threadpool: add thread_pool_new() and thread_pool_free() Stefan Hajnoczi
                   ` (3 subsequent siblings)
  4 siblings, 2 replies; 14+ messages in thread
From: Stefan Hajnoczi @ 2013-03-06 15:45 UTC (permalink / raw)
  To: qemu-devel; +Cc: Kevin Wolf, Paolo Bonzini, Stefan Hajnoczi

Move global variables into a struct so multiple thread pools can be
supported in the future.

This patch does not change thread-pool.h interfaces.  There is still a
global thread pool and it is not yet possible to create/destroy
individual thread pools.  Moving the variables into a struct first makes
later patches easier to review.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 thread-pool.c | 195 ++++++++++++++++++++++++++++++++++------------------------
 trace-events  |   4 +-
 2 files changed, 117 insertions(+), 82 deletions(-)

diff --git a/thread-pool.c b/thread-pool.c
index e3ca64d..8a957b9 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -24,7 +24,9 @@
 #include "qemu/event_notifier.h"
 #include "block/thread-pool.h"
 
-static void do_spawn_thread(void);
+typedef struct ThreadPool ThreadPool;
+
+static void do_spawn_thread(ThreadPool *pool);
 
 typedef struct ThreadPoolElement ThreadPoolElement;
 
@@ -37,6 +39,7 @@ enum ThreadState {
 
 struct ThreadPoolElement {
     BlockDriverAIOCB common;
+    ThreadPool *pool;
     ThreadPoolFunc *func;
     void *arg;
 
@@ -54,49 +57,56 @@ struct ThreadPoolElement {
     QLIST_ENTRY(ThreadPoolElement) all;
 };
 
-static EventNotifier notifier;
-static QemuMutex lock;
-static QemuCond check_cancel;
-static QemuSemaphore sem;
-static int max_threads = 64;
-static QEMUBH *new_thread_bh;
-
-/* The following variables are protected by the global mutex.  */
-static QLIST_HEAD(, ThreadPoolElement) head;
-
-/* The following variables are protected by lock.  */
-static QTAILQ_HEAD(, ThreadPoolElement) request_list;
-static int cur_threads;
-static int idle_threads;
-static int new_threads;     /* backlog of threads we need to create */
-static int pending_threads; /* threads created but not running yet */
-static int pending_cancellations; /* whether we need a cond_broadcast */
-
-static void *worker_thread(void *unused)
+struct ThreadPool {
+    EventNotifier notifier;
+    QemuMutex lock;
+    QemuCond check_cancel;
+    QemuSemaphore sem;
+    int max_threads;
+    QEMUBH *new_thread_bh;
+
+    /* The following variables are only accessed from one AioContext. */
+    QLIST_HEAD(, ThreadPoolElement) head;
+
+    /* The following variables are protected by lock.  */
+    QTAILQ_HEAD(, ThreadPoolElement) request_list;
+    int cur_threads;
+    int idle_threads;
+    int new_threads;     /* backlog of threads we need to create */
+    int pending_threads; /* threads created but not running yet */
+    int pending_cancellations; /* whether we need a cond_broadcast */
+};
+
+/* Currently there is only one thread pool instance. */
+static ThreadPool global_pool;
+
+static void *worker_thread(void *opaque)
 {
-    qemu_mutex_lock(&lock);
-    pending_threads--;
-    do_spawn_thread();
+    ThreadPool *pool = opaque;
+
+    qemu_mutex_lock(&pool->lock);
+    pool->pending_threads--;
+    do_spawn_thread(pool);
 
     while (1) {
         ThreadPoolElement *req;
         int ret;
 
         do {
-            idle_threads++;
-            qemu_mutex_unlock(&lock);
-            ret = qemu_sem_timedwait(&sem, 10000);
-            qemu_mutex_lock(&lock);
-            idle_threads--;
-        } while (ret == -1 && !QTAILQ_EMPTY(&request_list));
+            pool->idle_threads++;
+            qemu_mutex_unlock(&pool->lock);
+            ret = qemu_sem_timedwait(&pool->sem, 10000);
+            qemu_mutex_lock(&pool->lock);
+            pool->idle_threads--;
+        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
         if (ret == -1) {
             break;
         }
 
-        req = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, req, reqs);
+        req = QTAILQ_FIRST(&pool->request_list);
+        QTAILQ_REMOVE(&pool->request_list, req, reqs);
         req->state = THREAD_ACTIVE;
-        qemu_mutex_unlock(&lock);
+        qemu_mutex_unlock(&pool->lock);
 
         ret = req->func(req->arg);
 
@@ -105,45 +115,47 @@ static void *worker_thread(void *unused)
         smp_wmb();
         req->state = THREAD_DONE;
 
-        qemu_mutex_lock(&lock);
-        if (pending_cancellations) {
-            qemu_cond_broadcast(&check_cancel);
+        qemu_mutex_lock(&pool->lock);
+        if (pool->pending_cancellations) {
+            qemu_cond_broadcast(&pool->check_cancel);
         }
 
-        event_notifier_set(&notifier);
+        event_notifier_set(&pool->notifier);
     }
 
-    cur_threads--;
-    qemu_mutex_unlock(&lock);
+    pool->cur_threads--;
+    qemu_mutex_unlock(&pool->lock);
     return NULL;
 }
 
-static void do_spawn_thread(void)
+static void do_spawn_thread(ThreadPool *pool)
 {
     QemuThread t;
 
     /* Runs with lock taken.  */
-    if (!new_threads) {
+    if (!pool->new_threads) {
         return;
     }
 
-    new_threads--;
-    pending_threads++;
+    pool->new_threads--;
+    pool->pending_threads++;
 
-    qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED);
+    qemu_thread_create(&t, worker_thread, pool, QEMU_THREAD_DETACHED);
 }
 
 static void spawn_thread_bh_fn(void *opaque)
 {
-    qemu_mutex_lock(&lock);
-    do_spawn_thread();
-    qemu_mutex_unlock(&lock);
+    ThreadPool *pool = opaque;
+
+    qemu_mutex_lock(&pool->lock);
+    do_spawn_thread(pool);
+    qemu_mutex_unlock(&pool->lock);
 }
 
-static void spawn_thread(void)
+static void spawn_thread(ThreadPool *pool)
 {
-    cur_threads++;
-    new_threads++;
+    pool->cur_threads++;
+    pool->new_threads++;
     /* If there are threads being created, they will spawn new workers, so
      * we don't spend time creating many threads in a loop holding a mutex or
      * starving the current vcpu.
@@ -151,23 +163,25 @@ static void spawn_thread(void)
      * If there are no idle threads, ask the main thread to create one, so we
      * inherit the correct affinity instead of the vcpu affinity.
      */
-    if (!pending_threads) {
-        qemu_bh_schedule(new_thread_bh);
+    if (!pool->pending_threads) {
+        qemu_bh_schedule(pool->new_thread_bh);
     }
 }
 
 static void event_notifier_ready(EventNotifier *notifier)
 {
+    ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
     ThreadPoolElement *elem, *next;
 
     event_notifier_test_and_clear(notifier);
 restart:
-    QLIST_FOREACH_SAFE(elem, &head, all, next) {
+    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
         if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
             continue;
         }
         if (elem->state == THREAD_DONE) {
-            trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
+            trace_thread_pool_complete(pool, elem, elem->common.opaque,
+                                       elem->ret);
         }
         if (elem->state == THREAD_DONE && elem->common.cb) {
             QLIST_REMOVE(elem, all);
@@ -186,34 +200,36 @@ restart:
 
 static int thread_pool_active(EventNotifier *notifier)
 {
-    return !QLIST_EMPTY(&head);
+    ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
+    return !QLIST_EMPTY(&pool->head);
 }
 
 static void thread_pool_cancel(BlockDriverAIOCB *acb)
 {
     ThreadPoolElement *elem = (ThreadPoolElement *)acb;
+    ThreadPool *pool = elem->pool;
 
     trace_thread_pool_cancel(elem, elem->common.opaque);
 
-    qemu_mutex_lock(&lock);
+    qemu_mutex_lock(&pool->lock);
     if (elem->state == THREAD_QUEUED &&
         /* No thread has yet started working on elem. we can try to "steal"
          * the item from the worker if we can get a signal from the
          * semaphore.  Because this is non-blocking, we can do it with
          * the lock taken and ensure that elem will remain THREAD_QUEUED.
          */
-        qemu_sem_timedwait(&sem, 0) == 0) {
-        QTAILQ_REMOVE(&request_list, elem, reqs);
+        qemu_sem_timedwait(&pool->sem, 0) == 0) {
+        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
         elem->state = THREAD_CANCELED;
-        event_notifier_set(&notifier);
+        event_notifier_set(&pool->notifier);
     } else {
-        pending_cancellations++;
+        pool->pending_cancellations++;
         while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
-            qemu_cond_wait(&check_cancel, &lock);
+            qemu_cond_wait(&pool->check_cancel, &pool->lock);
         }
-        pending_cancellations--;
+        pool->pending_cancellations--;
     }
-    qemu_mutex_unlock(&lock);
+    qemu_mutex_unlock(&pool->lock);
 }
 
 static const AIOCBInfo thread_pool_aiocb_info = {
@@ -224,24 +240,26 @@ static const AIOCBInfo thread_pool_aiocb_info = {
 BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
         BlockDriverCompletionFunc *cb, void *opaque)
 {
+    ThreadPool *pool = &global_pool;
     ThreadPoolElement *req;
 
     req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
     req->func = func;
     req->arg = arg;
     req->state = THREAD_QUEUED;
+    req->pool = pool;
 
-    QLIST_INSERT_HEAD(&head, req, all);
+    QLIST_INSERT_HEAD(&pool->head, req, all);
 
-    trace_thread_pool_submit(req, arg);
+    trace_thread_pool_submit(pool, req, arg);
 
-    qemu_mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads) {
-        spawn_thread();
+    qemu_mutex_lock(&pool->lock);
+    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
+        spawn_thread(pool);
     }
-    QTAILQ_INSERT_TAIL(&request_list, req, reqs);
-    qemu_mutex_unlock(&lock);
-    qemu_sem_post(&sem);
+    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
+    qemu_mutex_unlock(&pool->lock);
+    qemu_sem_post(&pool->sem);
     return &req->common;
 }
 
@@ -272,18 +290,35 @@ void thread_pool_submit(ThreadPoolFunc *func, void *arg)
     thread_pool_submit_aio(func, arg, NULL, NULL);
 }
 
+static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
+{
+    memset(pool, 0, sizeof(*pool));
+    event_notifier_init(&pool->notifier, false);
+    qemu_mutex_init(&pool->lock);
+    qemu_cond_init(&pool->check_cancel);
+    qemu_sem_init(&pool->sem, 0);
+    pool->max_threads = 64;
+    if (ctx) {
+        pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
+    } else {
+        pool->new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, pool);
+    }
+
+    QLIST_INIT(&pool->head);
+    QTAILQ_INIT(&pool->request_list);
+
+    if (ctx) {
+        aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready,
+                               thread_pool_active);
+    } else {
+        qemu_aio_set_event_notifier(&pool->notifier, event_notifier_ready,
+                                    thread_pool_active);
+    }
+}
+
 static void thread_pool_init(void)
 {
-    QLIST_INIT(&head);
-    event_notifier_init(&notifier, false);
-    qemu_mutex_init(&lock);
-    qemu_cond_init(&check_cancel);
-    qemu_sem_init(&sem, 0);
-    qemu_aio_set_event_notifier(&notifier, event_notifier_ready,
-                                thread_pool_active);
-
-    QTAILQ_INIT(&request_list);
-    new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
+    thread_pool_init_one(&global_pool, NULL);
 }
 
 block_init(thread_pool_init)
diff --git a/trace-events b/trace-events
index a27ae43..f16c021 100644
--- a/trace-events
+++ b/trace-events
@@ -115,8 +115,8 @@ virtio_blk_data_plane_complete_request(void *s, unsigned int head, int ret) "dat
 vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
 
 # thread-pool.c
-thread_pool_submit(void *req, void *opaque) "req %p opaque %p"
-thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d"
+thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
+thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"
 thread_pool_cancel(void *req, void *opaque) "req %p opaque %p"
 
 # posix-aio-compat.c
-- 
1.8.1.4


* [Qemu-devel] [PATCH 2/5] threadpool: add thread_pool_new() and thread_pool_free()
  2013-03-06 15:45 [Qemu-devel] [PATCH 0/5] threadpool: support multiple ThreadPools Stefan Hajnoczi
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct ThreadPool Stefan Hajnoczi
@ 2013-03-06 15:45 ` Stefan Hajnoczi
  2013-03-06 16:36   ` Paolo Bonzini
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 3/5] aio: add a ThreadPool instance to AioContext Stefan Hajnoczi
                   ` (2 subsequent siblings)
  4 siblings, 1 reply; 14+ messages in thread
From: Stefan Hajnoczi @ 2013-03-06 15:45 UTC (permalink / raw)
  To: qemu-devel; +Cc: Kevin Wolf, Paolo Bonzini, Stefan Hajnoczi

ThreadPool is tied to an AioContext through its event notifier, which
dictates in which AioContext the work item's callback function will be
invoked.

In order to support multiple AioContexts we need to support multiple
ThreadPool instances.

This patch adds the new/free functions.  The free function deserves
special attention because it quiesces remaining worker threads.  This
requires a new condition variable and a "stopping" flag to let workers
know they should terminate once idle.

We never needed to do this before since the global thread pool was never
explicitly destroyed; it simply lived until process termination.

Also stash the AioContext pointer in ThreadPool so that we can call
aio_set_event_notifier() in thread_pool_free().  We didn't need to hold
onto AioContext previously since there was no free function.
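
A minimal usage sketch of the new lifecycle (pool_lifecycle_example() is a
placeholder; submitting to a specific pool only becomes possible once the
submit functions grow a ThreadPool argument later in the series):

#include "block/aio.h"
#include "block/thread-pool.h"

static void pool_lifecycle_example(void)
{
    AioContext *ctx = aio_context_new();
    ThreadPool *pool = thread_pool_new(ctx);  /* registers the event notifier */

    /* ... submit work and drive completions with aio_poll(ctx, true) ... */

    /* Quiesce workers via the new "stopping" flag and the worker_stopped
     * condition variable, then unregister the notifier. */
    thread_pool_free(pool);
    aio_context_unref(ctx);
}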

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 include/block/thread-pool.h |  5 ++++
 thread-pool.c               | 56 +++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 200703e..e1453c6 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -26,6 +26,11 @@
 
 typedef int ThreadPoolFunc(void *opaque);
 
+typedef struct ThreadPool ThreadPool;
+
+ThreadPool *thread_pool_new(struct AioContext *ctx);
+void thread_pool_free(ThreadPool *pool);
+
 BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
      BlockDriverCompletionFunc *cb, void *opaque);
 int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
diff --git a/thread-pool.c b/thread-pool.c
index 8a957b9..7a07408 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -24,8 +24,6 @@
 #include "qemu/event_notifier.h"
 #include "block/thread-pool.h"
 
-typedef struct ThreadPool ThreadPool;
-
 static void do_spawn_thread(ThreadPool *pool);
 
 typedef struct ThreadPoolElement ThreadPoolElement;
@@ -59,8 +57,10 @@ struct ThreadPoolElement {
 
 struct ThreadPool {
     EventNotifier notifier;
+    AioContext *ctx;
     QemuMutex lock;
     QemuCond check_cancel;
+    QemuCond worker_stopped;
     QemuSemaphore sem;
     int max_threads;
     QEMUBH *new_thread_bh;
@@ -75,6 +75,7 @@ struct ThreadPool {
     int new_threads;     /* backlog of threads we need to create */
     int pending_threads; /* threads created but not running yet */
     int pending_cancellations; /* whether we need a cond_broadcast */
+    bool stopping;
 };
 
 /* Currently there is only one thread pool instance. */
@@ -88,7 +89,7 @@ static void *worker_thread(void *opaque)
     pool->pending_threads--;
     do_spawn_thread(pool);
 
-    while (1) {
+    while (!pool->stopping) {
         ThreadPoolElement *req;
         int ret;
 
@@ -99,7 +100,7 @@ static void *worker_thread(void *opaque)
             qemu_mutex_lock(&pool->lock);
             pool->idle_threads--;
         } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
-        if (ret == -1) {
+        if (ret == -1 || pool->stopping) {
             break;
         }
 
@@ -124,6 +125,7 @@ static void *worker_thread(void *opaque)
     }
 
     pool->cur_threads--;
+    qemu_cond_signal(&pool->worker_stopped);
     qemu_mutex_unlock(&pool->lock);
     return NULL;
 }
@@ -294,8 +296,10 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
 {
     memset(pool, 0, sizeof(*pool));
     event_notifier_init(&pool->notifier, false);
+    pool->ctx = ctx;
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->check_cancel);
+    qemu_cond_init(&pool->worker_stopped);
     qemu_sem_init(&pool->sem, 0);
     pool->max_threads = 64;
     if (ctx) {
@@ -316,6 +320,50 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     }
 }
 
+ThreadPool *thread_pool_new(AioContext *ctx)
+{
+    ThreadPool *pool = g_new(ThreadPool, 1);
+    thread_pool_init_one(pool, ctx);
+    return pool;
+}
+
+void thread_pool_free(ThreadPool *pool)
+{
+    if (!pool) {
+        return;
+    }
+
+    assert(QLIST_EMPTY(&pool->head));
+
+    qemu_mutex_lock(&pool->lock);
+
+    /* Stop new threads from spawning */
+    qemu_bh_delete(pool->new_thread_bh);
+    pool->cur_threads -= pool->new_threads;
+    pool->new_threads = 0;
+
+    /* Wait for worker threads to terminate */
+    pool->stopping = true;
+    while (pool->cur_threads > 0) {
+        qemu_sem_post(&pool->sem);
+        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
+    }
+
+    qemu_mutex_unlock(&pool->lock);
+
+    if (pool->ctx) {
+        aio_set_event_notifier(pool->ctx, &pool->notifier, NULL, NULL);
+    } else {
+        qemu_aio_set_event_notifier(&pool->notifier, NULL, NULL);
+    }
+    qemu_sem_destroy(&pool->sem);
+    qemu_cond_destroy(&pool->check_cancel);
+    qemu_cond_destroy(&pool->worker_stopped);
+    qemu_mutex_destroy(&pool->lock);
+    event_notifier_cleanup(&pool->notifier);
+    g_free(pool);
+}
+
 static void thread_pool_init(void)
 {
     thread_pool_init_one(&global_pool, NULL);
-- 
1.8.1.4


* [Qemu-devel] [PATCH 3/5] aio: add a ThreadPool instance to AioContext
  2013-03-06 15:45 [Qemu-devel] [PATCH 0/5] threadpool: support multiple ThreadPools Stefan Hajnoczi
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct ThreadPool Stefan Hajnoczi
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 2/5] threadpool: add thread_pool_new() and thread_pool_free() Stefan Hajnoczi
@ 2013-03-06 15:45 ` Stefan Hajnoczi
  2013-03-06 17:24   ` Paolo Bonzini
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 4/5] main-loop: add qemu_get_aio_context() Stefan Hajnoczi
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 5/5] threadpool: drop global thread pool Stefan Hajnoczi
  4 siblings, 1 reply; 14+ messages in thread
From: Stefan Hajnoczi @ 2013-03-06 15:45 UTC (permalink / raw)
  To: qemu-devel; +Cc: Kevin Wolf, Paolo Bonzini, Stefan Hajnoczi

This patch adds a ThreadPool to AioContext.  It's possible that some
AioContext instances will never use the ThreadPool, so defer creation
until aio_get_thread_pool().

AioContext should own the ThreadPool because the ThreadPool is bound to the
AioContext instance in which the work item's callback function is invoked.
It doesn't make sense to keep the
ThreadPool pointer anywhere other than AioContext.  For example,
block/raw-posix.c can get its AioContext's ThreadPool and submit work.

Special note about headers: I used struct ThreadPool in aio.h because
there is a circular dependency if aio.h includes thread-pool.h.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 async.c             | 11 +++++++++++
 include/block/aio.h |  6 ++++++
 2 files changed, 17 insertions(+)

diff --git a/async.c b/async.c
index f2d47ba..90fe906 100644
--- a/async.c
+++ b/async.c
@@ -24,6 +24,7 @@
 
 #include "qemu-common.h"
 #include "block/aio.h"
+#include "block/thread-pool.h"
 #include "qemu/main-loop.h"
 
 /***********************************************************/
@@ -172,6 +173,7 @@ aio_ctx_finalize(GSource     *source)
 {
     AioContext *ctx = (AioContext *) source;
 
+    thread_pool_free(ctx->thread_pool);
     aio_set_event_notifier(ctx, &ctx->notifier, NULL, NULL);
     event_notifier_cleanup(&ctx->notifier);
     g_array_free(ctx->pollfds, TRUE);
@@ -190,6 +192,14 @@ GSource *aio_get_g_source(AioContext *ctx)
     return &ctx->source;
 }
 
+ThreadPool *aio_get_thread_pool(AioContext *ctx)
+{
+    if (!ctx->thread_pool) {
+        ctx->thread_pool = thread_pool_new(ctx);
+    }
+    return ctx->thread_pool;
+}
+
 void aio_notify(AioContext *ctx)
 {
     event_notifier_set(&ctx->notifier);
@@ -200,6 +210,7 @@ AioContext *aio_context_new(void)
     AioContext *ctx;
     ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
     ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
+    ctx->thread_pool = NULL;
     event_notifier_init(&ctx->notifier, false);
     aio_set_event_notifier(ctx, &ctx->notifier, 
                            (EventNotifierHandler *)
diff --git a/include/block/aio.h b/include/block/aio.h
index 5b54d38..1836793 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -66,6 +66,9 @@ typedef struct AioContext {
 
     /* GPollFDs for aio_poll() */
     GArray *pollfds;
+
+    /* Thread pool for performing work and receiving completion callbacks */
+    struct ThreadPool *thread_pool;
 } AioContext;
 
 /* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
@@ -223,6 +226,9 @@ void aio_set_event_notifier(AioContext *ctx,
  */
 GSource *aio_get_g_source(AioContext *ctx);
 
+/* Return the ThreadPool bound to this AioContext */
+struct ThreadPool *aio_get_thread_pool(AioContext *ctx);
+
 /* Functions to operate on the main QEMU AioContext.  */
 
 bool qemu_aio_wait(void);
-- 
1.8.1.4


* [Qemu-devel] [PATCH 4/5] main-loop: add qemu_get_aio_context()
  2013-03-06 15:45 [Qemu-devel] [PATCH 0/5] threadpool: support multiple ThreadPools Stefan Hajnoczi
                   ` (2 preceding siblings ...)
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 3/5] aio: add a ThreadPool instance to AioContext Stefan Hajnoczi
@ 2013-03-06 15:45 ` Stefan Hajnoczi
  2013-03-06 17:26   ` Paolo Bonzini
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 5/5] threadpool: drop global thread pool Stefan Hajnoczi
  4 siblings, 1 reply; 14+ messages in thread
From: Stefan Hajnoczi @ 2013-03-06 15:45 UTC (permalink / raw)
  To: qemu-devel; +Cc: Kevin Wolf, Paolo Bonzini, Stefan Hajnoczi

It is very useful to get the main loop AioContext, which is a static
variable in main-loop.c.

I'm not sure whether qemu_get_aio_context() will be necessary in the
future once devices focus on using their own AioContext instead of the
main loop AioContext, but for now it allows us to refactor code to
support multiple AioContexts while actually passing the main loop
AioContext.
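
As a small illustration of that refactoring pattern (do_work() is a
hypothetical placeholder, not a function in this series):

#include "qemu/main-loop.h"
#include "block/aio.h"

/* Internals grow an explicit AioContext parameter... */
static void do_work(AioContext *ctx)
{
    /* e.g. aio_bh_new(ctx, ...) or aio_get_thread_pool(ctx) */
}

/* ...while existing callers keep passing the main loop's context. */
static void do_work_in_main_loop(void)
{
    do_work(qemu_get_aio_context());
}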

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 include/qemu/main-loop.h | 5 +++++
 main-loop.c              | 5 +++++
 2 files changed, 10 insertions(+)

diff --git a/include/qemu/main-loop.h b/include/qemu/main-loop.h
index 0995288..6f0200a 100644
--- a/include/qemu/main-loop.h
+++ b/include/qemu/main-loop.h
@@ -82,6 +82,11 @@ int qemu_init_main_loop(void);
 int main_loop_wait(int nonblocking);
 
 /**
+ * qemu_get_aio_context: Return the main loop's AioContext
+ */
+AioContext *qemu_get_aio_context(void);
+
+/**
  * qemu_notify_event: Force processing of pending events.
  *
  * Similar to signaling a condition variable, qemu_notify_event forces
diff --git a/main-loop.c b/main-loop.c
index 8c9b58c..eb80ff3 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -109,6 +109,11 @@ static int qemu_signal_init(void)
 
 static AioContext *qemu_aio_context;
 
+AioContext *qemu_get_aio_context(void)
+{
+    return qemu_aio_context;
+}
+
 void qemu_notify_event(void)
 {
     if (!qemu_aio_context) {
-- 
1.8.1.4


* [Qemu-devel] [PATCH 5/5] threadpool: drop global thread pool
  2013-03-06 15:45 [Qemu-devel] [PATCH 0/5] threadpool: support multiple ThreadPools Stefan Hajnoczi
                   ` (3 preceding siblings ...)
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 4/5] main-loop: add qemu_get_aio_context() Stefan Hajnoczi
@ 2013-03-06 15:45 ` Stefan Hajnoczi
  2013-03-06 16:35   ` Paolo Bonzini
  4 siblings, 1 reply; 14+ messages in thread
From: Stefan Hajnoczi @ 2013-03-06 15:45 UTC (permalink / raw)
  To: qemu-devel; +Cc: Kevin Wolf, Paolo Bonzini, Stefan Hajnoczi

Now that each AioContext has a ThreadPool and the main loop AioContext
can be fetched with qemu_get_aio_context(), we can eliminate the concept
of a global thread pool from thread-pool.c.

The submit functions must take a ThreadPool* argument.

block/raw-posix.c and block/raw-win32.c use
aio_get_thread_pool(qemu_get_aio_context()) to fetch the main loop's
ThreadPool.

tests/test-thread-pool.c must be updated to reflect the new
thread_pool_submit() function prototypes.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 block/raw-posix.c           |  8 ++++++--
 block/raw-win32.c           |  4 +++-
 include/block/thread-pool.h | 10 ++++++----
 tests/test-thread-pool.c    | 44 +++++++++++++++++++++-----------------------
 thread-pool.c               | 23 +++++++----------------
 5 files changed, 43 insertions(+), 46 deletions(-)

diff --git a/block/raw-posix.c b/block/raw-posix.c
index 4dfdf98..01e5ae8 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -750,6 +750,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
         BlockDriverCompletionFunc *cb, void *opaque, int type)
 {
     RawPosixAIOData *acb = g_slice_new(RawPosixAIOData);
+    ThreadPool *pool;
 
     acb->bs = bs;
     acb->aio_type = type;
@@ -763,7 +764,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
     acb->aio_offset = sector_num * 512;
 
     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
-    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+    pool = aio_get_thread_pool(qemu_get_aio_context());
+    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
 }
 
 static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs,
@@ -1413,6 +1415,7 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs,
 {
     BDRVRawState *s = bs->opaque;
     RawPosixAIOData *acb;
+    ThreadPool *pool;
 
     if (fd_open(bs) < 0)
         return NULL;
@@ -1424,7 +1427,8 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs,
     acb->aio_offset = 0;
     acb->aio_ioctl_buf = buf;
     acb->aio_ioctl_cmd = req;
-    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+    pool = aio_get_thread_pool(qemu_get_aio_context());
+    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
 }
 
 #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__)
diff --git a/block/raw-win32.c b/block/raw-win32.c
index b89ac19..515614b 100644
--- a/block/raw-win32.c
+++ b/block/raw-win32.c
@@ -144,6 +144,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
         BlockDriverCompletionFunc *cb, void *opaque, int type)
 {
     RawWin32AIOData *acb = g_slice_new(RawWin32AIOData);
+    ThreadPool *pool;
 
     acb->bs = bs;
     acb->hfile = hfile;
@@ -157,7 +158,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
     acb->aio_offset = sector_num * 512;
 
     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
-    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+    pool = aio_get_thread_pool(qemu_get_aio_context());
+    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
 }
 
 int qemu_ftruncate64(int fd, int64_t length)
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index e1453c6..32afcdd 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -31,9 +31,11 @@ typedef struct ThreadPool ThreadPool;
 ThreadPool *thread_pool_new(struct AioContext *ctx);
 void thread_pool_free(ThreadPool *pool);
 
-BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
-     BlockDriverCompletionFunc *cb, void *opaque);
-int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
-void thread_pool_submit(ThreadPoolFunc *func, void *arg);
+BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
+        ThreadPoolFunc *func, void *arg,
+        BlockDriverCompletionFunc *cb, void *opaque);
+int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
+        ThreadPoolFunc *func, void *arg);
+void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
 
 #endif
diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c
index 9998e03..22915aa 100644
--- a/tests/test-thread-pool.c
+++ b/tests/test-thread-pool.c
@@ -4,6 +4,8 @@
 #include "block/thread-pool.h"
 #include "block/block.h"
 
+static AioContext *ctx;
+static ThreadPool *pool;
 static int active;
 
 typedef struct {
@@ -38,19 +40,10 @@ static void done_cb(void *opaque, int ret)
     active--;
 }
 
-/* A non-blocking poll of the main AIO context (we cannot use aio_poll
- * because we do not know the AioContext).
- */
-static void qemu_aio_wait_nonblocking(void)
-{
-    qemu_notify_event();
-    qemu_aio_wait();
-}
-
 /* Wait until all aio and bh activity has finished */
 static void qemu_aio_wait_all(void)
 {
-    while (qemu_aio_wait()) {
+    while (aio_poll(ctx, true)) {
         /* Do nothing */
     }
 }
@@ -58,7 +51,7 @@ static void qemu_aio_wait_all(void)
 static void test_submit(void)
 {
     WorkerTestData data = { .n = 0 };
-    thread_pool_submit(worker_cb, &data);
+    thread_pool_submit(pool, worker_cb, &data);
     qemu_aio_wait_all();
     g_assert_cmpint(data.n, ==, 1);
 }
@@ -66,7 +59,8 @@ static void test_submit(void)
 static void test_submit_aio(void)
 {
     WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
-    data.aiocb = thread_pool_submit_aio(worker_cb, &data, done_cb, &data);
+    data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
+                                        done_cb, &data);
 
     /* The callbacks are not called until after the first wait.  */
     active = 1;
@@ -84,7 +78,7 @@ static void co_test_cb(void *opaque)
     active = 1;
     data->n = 0;
     data->ret = -EINPROGRESS;
-    thread_pool_submit_co(worker_cb, data);
+    thread_pool_submit_co(pool, worker_cb, data);
 
     /* The test continues in test_submit_co, after qemu_coroutine_enter... */
 
@@ -126,12 +120,12 @@ static void test_submit_many(void)
     for (i = 0; i < 100; i++) {
         data[i].n = 0;
         data[i].ret = -EINPROGRESS;
-        thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]);
+        thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
     }
 
     active = 100;
     while (active > 0) {
-        qemu_aio_wait();
+        aio_poll(ctx, true);
     }
     for (i = 0; i < 100; i++) {
         g_assert_cmpint(data[i].n, ==, 1);
@@ -154,7 +148,7 @@ static void test_cancel(void)
     for (i = 0; i < 100; i++) {
         data[i].n = 0;
         data[i].ret = -EINPROGRESS;
-        data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i],
+        data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
                                                done_cb, &data[i]);
     }
 
@@ -162,7 +156,8 @@ static void test_cancel(void)
      * run, but do not waste too much time...
      */
     active = 100;
-    qemu_aio_wait_nonblocking();
+    aio_notify(ctx);
+    aio_poll(ctx, false);
 
     /* Wait some time for the threads to start, with some sanity
      * testing on the behavior of the scheduler...
@@ -208,11 +203,10 @@ static void test_cancel(void)
 
 int main(int argc, char **argv)
 {
-    /* These should be removed once each AioContext has its thread pool.
-     * The test should create its own AioContext.
-     */
-    qemu_init_main_loop();
-    bdrv_init();
+    int ret;
+
+    ctx = aio_context_new();
+    pool = aio_get_thread_pool(ctx);
 
     g_test_init(&argc, &argv, NULL);
     g_test_add_func("/thread-pool/submit", test_submit);
@@ -220,5 +214,9 @@ int main(int argc, char **argv)
     g_test_add_func("/thread-pool/submit-co", test_submit_co);
     g_test_add_func("/thread-pool/submit-many", test_submit_many);
     g_test_add_func("/thread-pool/cancel", test_cancel);
-    return g_test_run();
+
+    ret = g_test_run();
+
+    aio_context_unref(ctx);
+    return ret;
 }
diff --git a/thread-pool.c b/thread-pool.c
index 7a07408..e0e0a47 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -78,9 +78,6 @@ struct ThreadPool {
     bool stopping;
 };
 
-/* Currently there is only one thread pool instance. */
-static ThreadPool global_pool;
-
 static void *worker_thread(void *opaque)
 {
     ThreadPool *pool = opaque;
@@ -239,10 +236,10 @@ static const AIOCBInfo thread_pool_aiocb_info = {
     .cancel             = thread_pool_cancel,
 };
 
-BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
+BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
+        ThreadPoolFunc *func, void *arg,
         BlockDriverCompletionFunc *cb, void *opaque)
 {
-    ThreadPool *pool = &global_pool;
     ThreadPoolElement *req;
 
     req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
@@ -278,18 +275,19 @@ static void thread_pool_co_cb(void *opaque, int ret)
     qemu_coroutine_enter(co->co, NULL);
 }
 
-int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
+int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
+                                       void *arg)
 {
     ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
     assert(qemu_in_coroutine());
-    thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
+    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
     qemu_coroutine_yield();
     return tpc.ret;
 }
 
-void thread_pool_submit(ThreadPoolFunc *func, void *arg)
+void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
 {
-    thread_pool_submit_aio(func, arg, NULL, NULL);
+    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
 }
 
 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
@@ -363,10 +361,3 @@ void thread_pool_free(ThreadPool *pool)
     event_notifier_cleanup(&pool->notifier);
     g_free(pool);
 }
-
-static void thread_pool_init(void)
-{
-    thread_pool_init_one(&global_pool, NULL);
-}
-
-block_init(thread_pool_init)
-- 
1.8.1.4


* Re: [Qemu-devel] [PATCH 5/5] threadpool: drop global thread pool
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 5/5] threadpool: drop global thread pool Stefan Hajnoczi
@ 2013-03-06 16:35   ` Paolo Bonzini
  2013-03-07 10:07     ` Stefan Hajnoczi
  0 siblings, 1 reply; 14+ messages in thread
From: Paolo Bonzini @ 2013-03-06 16:35 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Kevin Wolf, qemu-devel

On 06/03/2013 16:45, Stefan Hajnoczi wrote:
> Now that each AioContext has a ThreadPool and the main loop AioContext
> can be fetched with qemu_get_aio_context(), we can eliminate the concept
> of a global thread pool from thread-pool.c.
> 
> The submit functions must take a ThreadPool* argument.

This is certainly ok for thread-pool.c.  For raw-posix and raw-win32,
what about already adding a bdrv_get_aio_context() function and using
that in paio_submit?  Is it putting the cart before the horse?
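
A rough sketch of what that helper could look like (bdrv_get_aio_context() is
hypothetical at this point; with everything still running in the main loop it
would simply be):

AioContext *bdrv_get_aio_context(BlockDriverState *bs)
{
    /* Until BlockDriverState is actually bound to an AioContext, every
     * BDS would report the main loop's context. */
    return qemu_get_aio_context();
}

paio_submit() could then call aio_get_thread_pool(bdrv_get_aio_context(bs))
instead of reaching for qemu_get_aio_context() directly.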

Paolo

> block/raw-posix.c and block/raw-win32.c use
> aio_get_thread_pool(qemu_get_aio_context()) to fetch the main loop's
> ThreadPool.
> 
> tests/test-thread-pool.c must be updated to reflect the new
> thread_pool_submit() function prototypes.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>


* Re: [Qemu-devel] [PATCH 2/5] threadpool: add thread_pool_new() and thread_pool_free()
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 2/5] threadpool: add thread_pool_new() and thread_pool_free() Stefan Hajnoczi
@ 2013-03-06 16:36   ` Paolo Bonzini
  0 siblings, 0 replies; 14+ messages in thread
From: Paolo Bonzini @ 2013-03-06 16:36 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Kevin Wolf, qemu-devel

On 06/03/2013 16:45, Stefan Hajnoczi wrote:
> +    if (pool->ctx) {
> +        aio_set_event_notifier(pool->ctx, &pool->notifier, NULL, NULL);
> +    } else {
> +        qemu_aio_set_event_notifier(&pool->notifier, NULL, NULL);
> +    }

I have the same complaint about ->ctx here.  I think it's nicer to pick
the right context in the creation function rather than every time you
use pool->ctx (allowing NULL to be passed in is fine).

Also, when you drop the global ThreadPool, each pool is guaranteed to
have an AioContext and the special casing can be dropped completely.
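
A sketch of what that could look like in thread_pool_init_one(), assuming the
main loop context is used as the fallback (qemu_get_aio_context() comes from
patch 4):

static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();   /* resolve the fallback exactly once */
    }

    memset(pool, 0, sizeof(*pool));
    event_notifier_init(&pool->notifier, false);
    pool->ctx = ctx;
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->check_cancel);
    qemu_cond_init(&pool->worker_stopped);
    qemu_sem_init(&pool->sem, 0);
    pool->max_threads = 64;
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);

    aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready,
                           thread_pool_active);
}

Once the global pool goes away, every caller passes a real AioContext anyway
and the fallback (along with the qemu_aio_set_event_notifier() branch) can be
deleted.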

Paolo


* Re: [Qemu-devel] [PATCH 3/5] aio: add a ThreadPool instance to AioContext
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 3/5] aio: add a ThreadPool instance to AioContext Stefan Hajnoczi
@ 2013-03-06 17:24   ` Paolo Bonzini
  2013-03-07 10:02     ` Stefan Hajnoczi
  0 siblings, 1 reply; 14+ messages in thread
From: Paolo Bonzini @ 2013-03-06 17:24 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Kevin Wolf, qemu-devel

On 06/03/2013 16:45, Stefan Hajnoczi wrote:
> This patch adds a ThreadPool to AioContext.  It's possible that some
> AioContext instances will never use the ThreadPool, so defer creation
> until aio_get_thread_pool().

What lock should protect against doing this twice?

> The reason why AioContext should have the ThreadPool is because the
> ThreadPool is bound to a AioContext instance where the work item's
> callback function is invoked.  It doesn't make sense to keep the
> ThreadPool pointer anywhere other than AioContext.  For example,
> block/raw-posix.c can get its AioContext's ThreadPool and submit work.
> 
> Special note about headers: I used struct ThreadPool in aio.h because
> there is a circular dependency if aio.h includes thread-pool.h.

Otherwise,

Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>

Paolo


* Re: [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct ThreadPool
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct ThreadPool Stefan Hajnoczi
@ 2013-03-06 17:25   ` Paolo Bonzini
  2013-03-08  3:18   ` Wenchao Xia
  1 sibling, 0 replies; 14+ messages in thread
From: Paolo Bonzini @ 2013-03-06 17:25 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Kevin Wolf, qemu-devel

On 06/03/2013 16:45, Stefan Hajnoczi wrote:
> Move global variables into a struct so multiple thread pools can be
> supported in the future.
> 
> This patch does not change thread-pool.h interfaces.  There is still a
> global thread pool and it is not yet possible to create/destroy
> individual thread pools.  Moving the variables into a struct first makes
> later patches easier to review.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
>      } else {
> -        pending_cancellations++;
> +        pool->pending_cancellations++;
>          while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
> -            qemu_cond_wait(&check_cancel, &lock);
> +            qemu_cond_wait(&pool->check_cancel, &pool->lock);
>          }
> -        pending_cancellations--;
> +        pool->pending_cancellations--;
>      }
> -    qemu_mutex_unlock(&lock);
> +    qemu_mutex_unlock(&pool->lock);
>  }
>  
>  static const AIOCBInfo thread_pool_aiocb_info = {
> @@ -224,24 +240,26 @@ static const AIOCBInfo thread_pool_aiocb_info = {
>  BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>          BlockDriverCompletionFunc *cb, void *opaque)
>  {
> +    ThreadPool *pool = &global_pool;
>      ThreadPoolElement *req;
>  
>      req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
>      req->func = func;
>      req->arg = arg;
>      req->state = THREAD_QUEUED;
> +    req->pool = pool;
>  
> -    QLIST_INSERT_HEAD(&head, req, all);
> +    QLIST_INSERT_HEAD(&pool->head, req, all);
>  
> -    trace_thread_pool_submit(req, arg);
> +    trace_thread_pool_submit(pool, req, arg);
>  
> -    qemu_mutex_lock(&lock);
> -    if (idle_threads == 0 && cur_threads < max_threads) {
> -        spawn_thread();
> +    qemu_mutex_lock(&pool->lock);
> +    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
> +        spawn_thread(pool);
>      }
> -    QTAILQ_INSERT_TAIL(&request_list, req, reqs);
> -    qemu_mutex_unlock(&lock);
> -    qemu_sem_post(&sem);
> +    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
> +    qemu_mutex_unlock(&pool->lock);
> +    qemu_sem_post(&pool->sem);
>      return &req->common;
>  }
>  
> @@ -272,18 +290,35 @@ void thread_pool_submit(ThreadPoolFunc *func, void *arg)
>      thread_pool_submit_aio(func, arg, NULL, NULL);
>  }
>  
> +static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
> +{
> +    memset(pool, 0, sizeof(*pool));
> +    event_notifier_init(&pool->notifier, false);
> +    qemu_mutex_init(&pool->lock);
> +    qemu_cond_init(&pool->check_cancel);
> +    qemu_sem_init(&pool->sem, 0);
> +    pool->max_threads = 64;
> +    if (ctx) {
> +        pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
> +    } else {
> +        pool->new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, pool);
> +    }
> +
> +    QLIST_INIT(&pool->head);
> +    QTAILQ_INIT(&pool->request_list);
> +
> +    if (ctx) {
> +        aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready,
> +                               thread_pool_active);
> +    } else {
> +        qemu_aio_set_event_notifier(&pool->notifier, event_notifier_ready,
> +                                    thread_pool_active);
> +    }

Usual comment here about if (ctx).  Otherwise looks good.
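
For concreteness, the suggestion boils down to resolving the NULL case up
front so that only the aio_* variants remain.  A rough sketch (it assumes
qemu_get_aio_context(), which this series only adds in patch 4, so take it
as an illustration rather than something this patch can do as-is):

static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    /* assumes qemu_get_aio_context() from patch 4 */
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    event_notifier_init(&pool->notifier, false);
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->check_cancel);
    qemu_sem_init(&pool->sem, 0);
    pool->max_threads = 64;
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);

    aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready,
                           thread_pool_active);
}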

> +}
> +
>  static void thread_pool_init(void)
>  {
> -    QLIST_INIT(&head);
> -    event_notifier_init(&notifier, false);
> -    qemu_mutex_init(&lock);
> -    qemu_cond_init(&check_cancel);
> -    qemu_sem_init(&sem, 0);
> -    qemu_aio_set_event_notifier(&notifier, event_notifier_ready,
> -                                thread_pool_active);
> -
> -    QTAILQ_INIT(&request_list);
> -    new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
> +    thread_pool_init_one(&global_pool, NULL);
>  }
>  
>  block_init(thread_pool_init)
> diff --git a/trace-events b/trace-events
> index a27ae43..f16c021 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -115,8 +115,8 @@ virtio_blk_data_plane_complete_request(void *s, unsigned int head, int ret) "dat
>  vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
>  
>  # thread-pool.c
> -thread_pool_submit(void *req, void *opaque) "req %p opaque %p"
> -thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d"
> +thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
> +thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"
>  thread_pool_cancel(void *req, void *opaque) "req %p opaque %p"
>  
>  # posix-aio-compat.c
> 

Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [Qemu-devel] [PATCH 4/5] main-loop: add qemu_get_aio_context()
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 4/5] main-loop: add qemu_get_aio_context() Stefan Hajnoczi
@ 2013-03-06 17:26   ` Paolo Bonzini
  0 siblings, 0 replies; 14+ messages in thread
From: Paolo Bonzini @ 2013-03-06 17:26 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Kevin Wolf, qemu-devel

Il 06/03/2013 16:45, Stefan Hajnoczi ha scritto:
> It is very useful to get the main loop AioContext, which is a static
> variable in main-loop.c.
> 
> I'm not sure whether qemu_get_aio_context() will be necessary in the
> future once devices focus on using their own AioContext instead of the
> main loop AioContext, but for now it allows us to refactor code to
> support multiple AioContext while actually passing the main loop
> AioContext.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
>  include/qemu/main-loop.h | 5 +++++
>  main-loop.c              | 5 +++++
>  2 files changed, 10 insertions(+)
> 
> diff --git a/include/qemu/main-loop.h b/include/qemu/main-loop.h
> index 0995288..6f0200a 100644
> --- a/include/qemu/main-loop.h
> +++ b/include/qemu/main-loop.h
> @@ -82,6 +82,11 @@ int qemu_init_main_loop(void);
>  int main_loop_wait(int nonblocking);
>  
>  /**
> + * qemu_get_aio_context: Return the main loop's AioContext
> + */
> +AioContext *qemu_get_aio_context(void);
> +
> +/**
>   * qemu_notify_event: Force processing of pending events.
>   *
>   * Similar to signaling a condition variable, qemu_notify_event forces
> diff --git a/main-loop.c b/main-loop.c
> index 8c9b58c..eb80ff3 100644
> --- a/main-loop.c
> +++ b/main-loop.c
> @@ -109,6 +109,11 @@ static int qemu_signal_init(void)
>  
>  static AioContext *qemu_aio_context;
>  
> +AioContext *qemu_get_aio_context(void)
> +{
> +    return qemu_aio_context;
> +}
> +
>  void qemu_notify_event(void)
>  {
>      if (!qemu_aio_context) {
> 

Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [Qemu-devel] [PATCH 3/5] aio: add a ThreadPool instance to AioContext
  2013-03-06 17:24   ` Paolo Bonzini
@ 2013-03-07 10:02     ` Stefan Hajnoczi
  0 siblings, 0 replies; 14+ messages in thread
From: Stefan Hajnoczi @ 2013-03-07 10:02 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: Kevin Wolf, qemu-devel

On Wed, Mar 06, 2013 at 06:24:37PM +0100, Paolo Bonzini wrote:
> Il 06/03/2013 16:45, Stefan Hajnoczi ha scritto:
> > This patch adds a ThreadPool to AioContext.  It's possible that some
> > AioContext instances will never use the ThreadPool, so defer creation
> > until aio_get_thread_pool().
> 
> What lock should protect against doing this twice?

aio_get_thread_pool() is not thread-safe and I don't think it needs to
be.  I imagine it will only be called from the thread that runs the
AioContext.
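
For reference, the lazy creation is only a few lines.  A sketch, assuming
AioContext grows a thread_pool field and using thread_pool_new() from
patch 2:

ThreadPool *aio_get_thread_pool(AioContext *ctx)
{
    /* assumption: this is only called from the thread that runs ctx,
     * so no lock is needed around the lazy initialization
     */
    if (!ctx->thread_pool) {
        ctx->thread_pool = thread_pool_new(ctx);
    }
    return ctx->thread_pool;
}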

Stefan

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [Qemu-devel] [PATCH 5/5] threadpool: drop global thread pool
  2013-03-06 16:35   ` Paolo Bonzini
@ 2013-03-07 10:07     ` Stefan Hajnoczi
  0 siblings, 0 replies; 14+ messages in thread
From: Stefan Hajnoczi @ 2013-03-07 10:07 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: Kevin Wolf, qemu-devel

On Wed, Mar 06, 2013 at 05:35:09PM +0100, Paolo Bonzini wrote:
> Il 06/03/2013 16:45, Stefan Hajnoczi ha scritto:
> > Now that each AioContext has a ThreadPool and the main loop AioContext
> > can be fetched with qemu_get_aio_context(), we can eliminate the concept
> > of a global thread pool from thread-pool.c.
> > 
> > The submit functions must take a ThreadPool* argument.
> 
> This is certainly ok for thread-pool.c.  For raw-posix and raw-win32,
> what about already adding a bdrv_get_aio_context() function and using
> that in paio_submit?  Is it putting the cart before the horse?

Thanks, we might as well do that.  Then I won't have to go back and
modify block/raw-posix.c and block/raw-win32.c in the next patch series
which lets BlockDriverState bind to an AioContext.
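
The interesting part of paio_submit() would then look roughly like this
(a sketch only: bdrv_get_aio_context() does not exist yet, so its name and
the fact that it returns the main loop AioContext for now are assumptions):

    /* assumes bdrv_get_aio_context() returning the main loop context,
     * plus aio_get_thread_pool() from patch 3
     */
    ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));

    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);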

Stefan

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct ThreadPool
  2013-03-06 15:45 ` [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct ThreadPool Stefan Hajnoczi
  2013-03-06 17:25   ` Paolo Bonzini
@ 2013-03-08  3:18   ` Wenchao Xia
  1 sibling, 0 replies; 14+ messages in thread
From: Wenchao Xia @ 2013-03-08  3:18 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Kevin Wolf, Paolo Bonzini, qemu-devel

  Reviewed the patch; it is a pure move of the global variables into the struct.

Reviewed-by: Wenchao Xia <xiawenc@linux.vnet.ibm.com>

-- 
Best Regards

Wenchao Xia

^ permalink raw reply	[flat|nested] 14+ messages in thread

end of thread, other threads:[~2013-03-08  3:20 UTC | newest]

Thread overview: 14+ messages
2013-03-06 15:45 [Qemu-devel] [PATCH 0/5] threadpool: support multiple ThreadPools Stefan Hajnoczi
2013-03-06 15:45 ` [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct ThreadPool Stefan Hajnoczi
2013-03-06 17:25   ` Paolo Bonzini
2013-03-08  3:18   ` Wenchao Xia
2013-03-06 15:45 ` [Qemu-devel] [PATCH 2/5] threadpool: add thread_pool_new() and thread_pool_free() Stefan Hajnoczi
2013-03-06 16:36   ` Paolo Bonzini
2013-03-06 15:45 ` [Qemu-devel] [PATCH 3/5] aio: add a ThreadPool instance to AioContext Stefan Hajnoczi
2013-03-06 17:24   ` Paolo Bonzini
2013-03-07 10:02     ` Stefan Hajnoczi
2013-03-06 15:45 ` [Qemu-devel] [PATCH 4/5] main-loop: add qemu_get_aio_context() Stefan Hajnoczi
2013-03-06 17:26   ` Paolo Bonzini
2013-03-06 15:45 ` [Qemu-devel] [PATCH 5/5] threadpool: drop global thread pool Stefan Hajnoczi
2013-03-06 16:35   ` Paolo Bonzini
2013-03-07 10:07     ` Stefan Hajnoczi
