* [Qemu-devel] [PATCH v2 1/7] main-loop: add qemu_get_aio_context()
From: Stefan Hajnoczi @ 2013-03-07 12:41 UTC
To: qemu-devel; +Cc: Kevin Wolf, Paolo Bonzini, Stefan Hajnoczi
It is useful to be able to fetch the main loop AioContext, which is
currently a static variable private to main-loop.c.
I'm not sure whether qemu_get_aio_context() will still be necessary in
the future once devices focus on using their own AioContext instead of
the main loop AioContext, but for now it allows us to refactor code to
support multiple AioContexts while actually passing the main loop
AioContext.
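For illustration, a hypothetical caller could use the new accessor to
attach a bottom half to the main loop. This is only a sketch; my_bh_cb
and example_schedule are made-up names, not part of this patch:

    #include "qemu/main-loop.h"
    #include "block/aio.h"

    static QEMUBH *example_bh;

    static void my_bh_cb(void *opaque)
    {
        /* Runs in the main loop AioContext */
        qemu_bh_delete(example_bh);
    }

    static void example_schedule(void)
    {
        AioContext *ctx = qemu_get_aio_context();

        example_bh = aio_bh_new(ctx, my_bh_cb, NULL);
        qemu_bh_schedule(example_bh);
    }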
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
include/qemu/main-loop.h | 5 +++++
main-loop.c | 5 +++++
2 files changed, 10 insertions(+)
diff --git a/include/qemu/main-loop.h b/include/qemu/main-loop.h
index 0995288..6f0200a 100644
--- a/include/qemu/main-loop.h
+++ b/include/qemu/main-loop.h
@@ -82,6 +82,11 @@ int qemu_init_main_loop(void);
int main_loop_wait(int nonblocking);
/**
+ * qemu_get_aio_context: Return the main loop's AioContext
+ */
+AioContext *qemu_get_aio_context(void);
+
+/**
* qemu_notify_event: Force processing of pending events.
*
* Similar to signaling a condition variable, qemu_notify_event forces
diff --git a/main-loop.c b/main-loop.c
index 8c9b58c..eb80ff3 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -109,6 +109,11 @@ static int qemu_signal_init(void)
static AioContext *qemu_aio_context;
+AioContext *qemu_get_aio_context(void)
+{
+ return qemu_aio_context;
+}
+
void qemu_notify_event(void)
{
if (!qemu_aio_context) {
--
1.8.1.4
* [Qemu-devel] [PATCH v2 2/7] threadpool: move globals into struct ThreadPool
From: Stefan Hajnoczi @ 2013-03-07 12:41 UTC
To: qemu-devel; +Cc: Kevin Wolf, Paolo Bonzini, Stefan Hajnoczi
Move global variables into a struct so multiple thread pools can be
supported in the future.
This patch does not change thread-pool.h interfaces. There is still a
global thread pool and it is not yet possible to create/destroy
individual thread pools. Moving the variables into a struct first makes
later patches easier to review.
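The conversion is mechanical throughout the file: each file-scope global
becomes a ThreadPool field, and each static function gains an explicit
pool parameter. A simplified before/after sketch with made-up names (the
two halves are alternatives, not meant to compile together):

    #include "qemu/thread.h"

    /* Before: state lives in file-scope globals */
    static QemuMutex lock;
    static int cur_threads;

    static void add_thread(void)
    {
        qemu_mutex_lock(&lock);
        cur_threads++;
        qemu_mutex_unlock(&lock);
    }

    /* After: state lives in a struct that callers pass explicitly */
    typedef struct ThreadPool ThreadPool;

    struct ThreadPool {
        QemuMutex lock;
        int cur_threads;
    };

    static void add_thread(ThreadPool *pool)
    {
        qemu_mutex_lock(&pool->lock);
        pool->cur_threads++;
        qemu_mutex_unlock(&pool->lock);
    }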
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
thread-pool.c | 190 +++++++++++++++++++++++++++++++++-------------------------
trace-events | 4 +-
2 files changed, 112 insertions(+), 82 deletions(-)
diff --git a/thread-pool.c b/thread-pool.c
index e3ca64d..a0aecd0 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -24,7 +24,9 @@
#include "qemu/event_notifier.h"
#include "block/thread-pool.h"
-static void do_spawn_thread(void);
+typedef struct ThreadPool ThreadPool;
+
+static void do_spawn_thread(ThreadPool *pool);
typedef struct ThreadPoolElement ThreadPoolElement;
@@ -37,6 +39,7 @@ enum ThreadState {
struct ThreadPoolElement {
BlockDriverAIOCB common;
+ ThreadPool *pool;
ThreadPoolFunc *func;
void *arg;
@@ -54,49 +57,56 @@ struct ThreadPoolElement {
QLIST_ENTRY(ThreadPoolElement) all;
};
-static EventNotifier notifier;
-static QemuMutex lock;
-static QemuCond check_cancel;
-static QemuSemaphore sem;
-static int max_threads = 64;
-static QEMUBH *new_thread_bh;
-
-/* The following variables are protected by the global mutex. */
-static QLIST_HEAD(, ThreadPoolElement) head;
-
-/* The following variables are protected by lock. */
-static QTAILQ_HEAD(, ThreadPoolElement) request_list;
-static int cur_threads;
-static int idle_threads;
-static int new_threads; /* backlog of threads we need to create */
-static int pending_threads; /* threads created but not running yet */
-static int pending_cancellations; /* whether we need a cond_broadcast */
-
-static void *worker_thread(void *unused)
+struct ThreadPool {
+ EventNotifier notifier;
+ QemuMutex lock;
+ QemuCond check_cancel;
+ QemuSemaphore sem;
+ int max_threads;
+ QEMUBH *new_thread_bh;
+
+ /* The following variables are only accessed from one AioContext. */
+ QLIST_HEAD(, ThreadPoolElement) head;
+
+ /* The following variables are protected by lock. */
+ QTAILQ_HEAD(, ThreadPoolElement) request_list;
+ int cur_threads;
+ int idle_threads;
+ int new_threads; /* backlog of threads we need to create */
+ int pending_threads; /* threads created but not running yet */
+ int pending_cancellations; /* whether we need a cond_broadcast */
+};
+
+/* Currently there is only one thread pool instance. */
+static ThreadPool global_pool;
+
+static void *worker_thread(void *opaque)
{
- qemu_mutex_lock(&lock);
- pending_threads--;
- do_spawn_thread();
+ ThreadPool *pool = opaque;
+
+ qemu_mutex_lock(&pool->lock);
+ pool->pending_threads--;
+ do_spawn_thread(pool);
while (1) {
ThreadPoolElement *req;
int ret;
do {
- idle_threads++;
- qemu_mutex_unlock(&lock);
- ret = qemu_sem_timedwait(&sem, 10000);
- qemu_mutex_lock(&lock);
- idle_threads--;
- } while (ret == -1 && !QTAILQ_EMPTY(&request_list));
+ pool->idle_threads++;
+ qemu_mutex_unlock(&pool->lock);
+ ret = qemu_sem_timedwait(&pool->sem, 10000);
+ qemu_mutex_lock(&pool->lock);
+ pool->idle_threads--;
+ } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
if (ret == -1) {
break;
}
- req = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, req, reqs);
+ req = QTAILQ_FIRST(&pool->request_list);
+ QTAILQ_REMOVE(&pool->request_list, req, reqs);
req->state = THREAD_ACTIVE;
- qemu_mutex_unlock(&lock);
+ qemu_mutex_unlock(&pool->lock);
ret = req->func(req->arg);
@@ -105,45 +115,47 @@ static void *worker_thread(void *unused)
smp_wmb();
req->state = THREAD_DONE;
- qemu_mutex_lock(&lock);
- if (pending_cancellations) {
- qemu_cond_broadcast(&check_cancel);
+ qemu_mutex_lock(&pool->lock);
+ if (pool->pending_cancellations) {
+ qemu_cond_broadcast(&pool->check_cancel);
}
- event_notifier_set(&notifier);
+ event_notifier_set(&pool->notifier);
}
- cur_threads--;
- qemu_mutex_unlock(&lock);
+ pool->cur_threads--;
+ qemu_mutex_unlock(&pool->lock);
return NULL;
}
-static void do_spawn_thread(void)
+static void do_spawn_thread(ThreadPool *pool)
{
QemuThread t;
/* Runs with lock taken. */
- if (!new_threads) {
+ if (!pool->new_threads) {
return;
}
- new_threads--;
- pending_threads++;
+ pool->new_threads--;
+ pool->pending_threads++;
- qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED);
+ qemu_thread_create(&t, worker_thread, pool, QEMU_THREAD_DETACHED);
}
static void spawn_thread_bh_fn(void *opaque)
{
- qemu_mutex_lock(&lock);
- do_spawn_thread();
- qemu_mutex_unlock(&lock);
+ ThreadPool *pool = opaque;
+
+ qemu_mutex_lock(&pool->lock);
+ do_spawn_thread(pool);
+ qemu_mutex_unlock(&pool->lock);
}
-static void spawn_thread(void)
+static void spawn_thread(ThreadPool *pool)
{
- cur_threads++;
- new_threads++;
+ pool->cur_threads++;
+ pool->new_threads++;
/* If there are threads being created, they will spawn new workers, so
* we don't spend time creating many threads in a loop holding a mutex or
* starving the current vcpu.
@@ -151,23 +163,25 @@ static void spawn_thread(void)
* If there are no idle threads, ask the main thread to create one, so we
* inherit the correct affinity instead of the vcpu affinity.
*/
- if (!pending_threads) {
- qemu_bh_schedule(new_thread_bh);
+ if (!pool->pending_threads) {
+ qemu_bh_schedule(pool->new_thread_bh);
}
}
static void event_notifier_ready(EventNotifier *notifier)
{
+ ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
ThreadPoolElement *elem, *next;
event_notifier_test_and_clear(notifier);
restart:
- QLIST_FOREACH_SAFE(elem, &head, all, next) {
+ QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
continue;
}
if (elem->state == THREAD_DONE) {
- trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
+ trace_thread_pool_complete(pool, elem, elem->common.opaque,
+ elem->ret);
}
if (elem->state == THREAD_DONE && elem->common.cb) {
QLIST_REMOVE(elem, all);
@@ -186,34 +200,36 @@ restart:
static int thread_pool_active(EventNotifier *notifier)
{
- return !QLIST_EMPTY(&head);
+ ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
+ return !QLIST_EMPTY(&pool->head);
}
static void thread_pool_cancel(BlockDriverAIOCB *acb)
{
ThreadPoolElement *elem = (ThreadPoolElement *)acb;
+ ThreadPool *pool = elem->pool;
trace_thread_pool_cancel(elem, elem->common.opaque);
- qemu_mutex_lock(&lock);
+ qemu_mutex_lock(&pool->lock);
if (elem->state == THREAD_QUEUED &&
/* No thread has yet started working on elem. we can try to "steal"
* the item from the worker if we can get a signal from the
* semaphore. Because this is non-blocking, we can do it with
* the lock taken and ensure that elem will remain THREAD_QUEUED.
*/
- qemu_sem_timedwait(&sem, 0) == 0) {
- QTAILQ_REMOVE(&request_list, elem, reqs);
+ qemu_sem_timedwait(&pool->sem, 0) == 0) {
+ QTAILQ_REMOVE(&pool->request_list, elem, reqs);
elem->state = THREAD_CANCELED;
- event_notifier_set(&notifier);
+ event_notifier_set(&pool->notifier);
} else {
- pending_cancellations++;
+ pool->pending_cancellations++;
while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
- qemu_cond_wait(&check_cancel, &lock);
+ qemu_cond_wait(&pool->check_cancel, &pool->lock);
}
- pending_cancellations--;
+ pool->pending_cancellations--;
}
- qemu_mutex_unlock(&lock);
+ qemu_mutex_unlock(&pool->lock);
}
static const AIOCBInfo thread_pool_aiocb_info = {
@@ -224,24 +240,26 @@ static const AIOCBInfo thread_pool_aiocb_info = {
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
BlockDriverCompletionFunc *cb, void *opaque)
{
+ ThreadPool *pool = &global_pool;
ThreadPoolElement *req;
req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
req->func = func;
req->arg = arg;
req->state = THREAD_QUEUED;
+ req->pool = pool;
- QLIST_INSERT_HEAD(&head, req, all);
+ QLIST_INSERT_HEAD(&pool->head, req, all);
- trace_thread_pool_submit(req, arg);
+ trace_thread_pool_submit(pool, req, arg);
- qemu_mutex_lock(&lock);
- if (idle_threads == 0 && cur_threads < max_threads) {
- spawn_thread();
+ qemu_mutex_lock(&pool->lock);
+ if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
+ spawn_thread(pool);
}
- QTAILQ_INSERT_TAIL(&request_list, req, reqs);
- qemu_mutex_unlock(&lock);
- qemu_sem_post(&sem);
+ QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
+ qemu_mutex_unlock(&pool->lock);
+ qemu_sem_post(&pool->sem);
return &req->common;
}
@@ -272,18 +290,30 @@ void thread_pool_submit(ThreadPoolFunc *func, void *arg)
thread_pool_submit_aio(func, arg, NULL, NULL);
}
+static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
+{
+ if (!ctx) {
+ ctx = qemu_get_aio_context();
+ }
+
+ memset(pool, 0, sizeof(*pool));
+ event_notifier_init(&pool->notifier, false);
+ qemu_mutex_init(&pool->lock);
+ qemu_cond_init(&pool->check_cancel);
+ qemu_sem_init(&pool->sem, 0);
+ pool->max_threads = 64;
+ pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
+
+ QLIST_INIT(&pool->head);
+ QTAILQ_INIT(&pool->request_list);
+
+ aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready,
+ thread_pool_active);
+}
+
static void thread_pool_init(void)
{
- QLIST_INIT(&head);
- event_notifier_init(&notifier, false);
- qemu_mutex_init(&lock);
- qemu_cond_init(&check_cancel);
- qemu_sem_init(&sem, 0);
- qemu_aio_set_event_notifier(&notifier, event_notifier_ready,
- thread_pool_active);
-
- QTAILQ_INIT(&request_list);
- new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
+ thread_pool_init_one(&global_pool, NULL);
}
block_init(thread_pool_init)
diff --git a/trace-events b/trace-events
index a27ae43..f16c021 100644
--- a/trace-events
+++ b/trace-events
@@ -115,8 +115,8 @@ virtio_blk_data_plane_complete_request(void *s, unsigned int head, int ret) "dat
vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
# thread-pool.c
-thread_pool_submit(void *req, void *opaque) "req %p opaque %p"
-thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d"
+thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
+thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"
thread_pool_cancel(void *req, void *opaque) "req %p opaque %p"
# posix-aio-compat.c
--
1.8.1.4
* [Qemu-devel] [PATCH v2 3/7] threadpool: add thread_pool_new() and thread_pool_free()
From: Stefan Hajnoczi @ 2013-03-07 12:41 UTC
To: qemu-devel; +Cc: Kevin Wolf, Paolo Bonzini, Stefan Hajnoczi
ThreadPool is tied to an AioContext through its event notifier, which
dictates in which AioContext the work item's callback function will be
invoked.
In order to support multiple AioContexts we need to support multiple
ThreadPool instances.
This patch adds the new/free functions. The free function deserves
special attention because it quiesces remaining worker threads. This
requires a new condition variable and a "stopping" flag to let workers
know they should terminate once idle.
We never needed to do this before since the global thread pool was never
explicitly destroyed; it simply lasted until process termination.
Also stash the AioContext pointer in ThreadPool so that we can call
aio_set_event_notifier() in thread_pool_free(). We didn't need to hold
onto AioContext previously since there was no free function.
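A minimal lifecycle sketch, assuming an AioContext *ctx set up elsewhere
(pool_lifecycle_example is a made-up name for illustration):

    #include "block/aio.h"
    #include "block/thread-pool.h"

    static void pool_lifecycle_example(AioContext *ctx)
    {
        ThreadPool *pool = thread_pool_new(ctx);

        /* ... submit work and wait for all requests to complete ... */

        /* thread_pool_free() asserts that no requests remain, sets
         * stopping, wakes idle workers via the semaphore, and waits on
         * worker_stopped until cur_threads reaches zero.
         */
        thread_pool_free(pool);
    }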
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
include/block/thread-pool.h | 5 +++++
thread-pool.c | 52 +++++++++++++++++++++++++++++++++++++++++----
2 files changed, 53 insertions(+), 4 deletions(-)
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 200703e..e1453c6 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -26,6 +26,11 @@
typedef int ThreadPoolFunc(void *opaque);
+typedef struct ThreadPool ThreadPool;
+
+ThreadPool *thread_pool_new(struct AioContext *ctx);
+void thread_pool_free(ThreadPool *pool);
+
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
BlockDriverCompletionFunc *cb, void *opaque);
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
diff --git a/thread-pool.c b/thread-pool.c
index a0aecd0..d1e4570 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -24,8 +24,6 @@
#include "qemu/event_notifier.h"
#include "block/thread-pool.h"
-typedef struct ThreadPool ThreadPool;
-
static void do_spawn_thread(ThreadPool *pool);
typedef struct ThreadPoolElement ThreadPoolElement;
@@ -59,8 +57,10 @@ struct ThreadPoolElement {
struct ThreadPool {
EventNotifier notifier;
+ AioContext *ctx;
QemuMutex lock;
QemuCond check_cancel;
+ QemuCond worker_stopped;
QemuSemaphore sem;
int max_threads;
QEMUBH *new_thread_bh;
@@ -75,6 +75,7 @@ struct ThreadPool {
int new_threads; /* backlog of threads we need to create */
int pending_threads; /* threads created but not running yet */
int pending_cancellations; /* whether we need a cond_broadcast */
+ bool stopping;
};
/* Currently there is only one thread pool instance. */
@@ -88,7 +89,7 @@ static void *worker_thread(void *opaque)
pool->pending_threads--;
do_spawn_thread(pool);
- while (1) {
+ while (!pool->stopping) {
ThreadPoolElement *req;
int ret;
@@ -99,7 +100,7 @@ static void *worker_thread(void *opaque)
qemu_mutex_lock(&pool->lock);
pool->idle_threads--;
} while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
- if (ret == -1) {
+ if (ret == -1 || pool->stopping) {
break;
}
@@ -124,6 +125,7 @@ static void *worker_thread(void *opaque)
}
pool->cur_threads--;
+ qemu_cond_signal(&pool->worker_stopped);
qemu_mutex_unlock(&pool->lock);
return NULL;
}
@@ -298,8 +300,10 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
memset(pool, 0, sizeof(*pool));
event_notifier_init(&pool->notifier, false);
+ pool->ctx = ctx;
qemu_mutex_init(&pool->lock);
qemu_cond_init(&pool->check_cancel);
+ qemu_cond_init(&pool->worker_stopped);
qemu_sem_init(&pool->sem, 0);
pool->max_threads = 64;
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
@@ -311,6 +315,46 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
thread_pool_active);
}
+ThreadPool *thread_pool_new(AioContext *ctx)
+{
+ ThreadPool *pool = g_new(ThreadPool, 1);
+ thread_pool_init_one(pool, ctx);
+ return pool;
+}
+
+void thread_pool_free(ThreadPool *pool)
+{
+ if (!pool) {
+ return;
+ }
+
+ assert(QLIST_EMPTY(&pool->head));
+
+ qemu_mutex_lock(&pool->lock);
+
+ /* Stop new threads from spawning */
+ qemu_bh_delete(pool->new_thread_bh);
+ pool->cur_threads -= pool->new_threads;
+ pool->new_threads = 0;
+
+ /* Wait for worker threads to terminate */
+ pool->stopping = true;
+ while (pool->cur_threads > 0) {
+ qemu_sem_post(&pool->sem);
+ qemu_cond_wait(&pool->worker_stopped, &pool->lock);
+ }
+
+ qemu_mutex_unlock(&pool->lock);
+
+ aio_set_event_notifier(pool->ctx, &pool->notifier, NULL, NULL);
+ qemu_sem_destroy(&pool->sem);
+ qemu_cond_destroy(&pool->check_cancel);
+ qemu_cond_destroy(&pool->worker_stopped);
+ qemu_mutex_destroy(&pool->lock);
+ event_notifier_cleanup(&pool->notifier);
+ g_free(pool);
+}
+
static void thread_pool_init(void)
{
thread_pool_init_one(&global_pool, NULL);
--
1.8.1.4
* [Qemu-devel] [PATCH v2 4/7] aio: add a ThreadPool instance to AioContext
From: Stefan Hajnoczi @ 2013-03-07 12:41 UTC
To: qemu-devel; +Cc: Kevin Wolf, Paolo Bonzini, Stefan Hajnoczi
This patch adds a ThreadPool to AioContext. It's possible that some
AioContext instances will never use the ThreadPool, so defer creation
until aio_get_thread_pool().
The reason AioContext should own the ThreadPool is that a ThreadPool is
bound to the AioContext instance in which the work item's callback
function is invoked. It doesn't make sense to keep the ThreadPool
pointer anywhere other than AioContext. For example, block/raw-posix.c
can get its AioContext's ThreadPool and submit work.
Special note about headers: I used struct ThreadPool in aio.h because
there is a circular dependency if aio.h includes thread-pool.h.
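Because aio.h only forward-declares struct ThreadPool, callers that
actually submit work include block/thread-pool.h themselves. A sketch of
the intended access pattern (use_pool is a made-up name):

    #include "block/aio.h"
    #include "block/thread-pool.h"

    static void use_pool(AioContext *ctx)
    {
        /* Created lazily on first call; freed by aio_ctx_finalize() */
        ThreadPool *pool = aio_get_thread_pool(ctx);

        /* ... pass pool to the thread_pool_*() submit functions ... */
        (void)pool;
    }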
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
async.c | 11 +++++++++++
include/block/aio.h | 6 ++++++
2 files changed, 17 insertions(+)
diff --git a/async.c b/async.c
index f2d47ba..90fe906 100644
--- a/async.c
+++ b/async.c
@@ -24,6 +24,7 @@
#include "qemu-common.h"
#include "block/aio.h"
+#include "block/thread-pool.h"
#include "qemu/main-loop.h"
/***********************************************************/
@@ -172,6 +173,7 @@ aio_ctx_finalize(GSource *source)
{
AioContext *ctx = (AioContext *) source;
+ thread_pool_free(ctx->thread_pool);
aio_set_event_notifier(ctx, &ctx->notifier, NULL, NULL);
event_notifier_cleanup(&ctx->notifier);
g_array_free(ctx->pollfds, TRUE);
@@ -190,6 +192,14 @@ GSource *aio_get_g_source(AioContext *ctx)
return &ctx->source;
}
+ThreadPool *aio_get_thread_pool(AioContext *ctx)
+{
+ if (!ctx->thread_pool) {
+ ctx->thread_pool = thread_pool_new(ctx);
+ }
+ return ctx->thread_pool;
+}
+
void aio_notify(AioContext *ctx)
{
event_notifier_set(&ctx->notifier);
@@ -200,6 +210,7 @@ AioContext *aio_context_new(void)
AioContext *ctx;
ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
+ ctx->thread_pool = NULL;
event_notifier_init(&ctx->notifier, false);
aio_set_event_notifier(ctx, &ctx->notifier,
(EventNotifierHandler *)
diff --git a/include/block/aio.h b/include/block/aio.h
index 5b54d38..1836793 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -66,6 +66,9 @@ typedef struct AioContext {
/* GPollFDs for aio_poll() */
GArray *pollfds;
+
+ /* Thread pool for performing work and receiving completion callbacks */
+ struct ThreadPool *thread_pool;
} AioContext;
/* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
@@ -223,6 +226,9 @@ void aio_set_event_notifier(AioContext *ctx,
*/
GSource *aio_get_g_source(AioContext *ctx);
+/* Return the ThreadPool bound to this AioContext */
+struct ThreadPool *aio_get_thread_pool(AioContext *ctx);
+
/* Functions to operate on the main QEMU AioContext. */
bool qemu_aio_wait(void);
--
1.8.1.4
* [Qemu-devel] [PATCH v2 5/7] block: add bdrv_get_aio_context()
From: Stefan Hajnoczi @ 2013-03-07 12:41 UTC
To: qemu-devel; +Cc: Kevin Wolf, Paolo Bonzini, Stefan Hajnoczi
For now bdrv_get_aio_context() is just a stub that calls
qemu_get_aio_context() since the block layer is currently tied to the
main loop AioContext.
Add the stub now so that the block layer can begin accessing its
AioContext.
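Even as a stub this lets callers stop assuming the main loop. For
example, a hypothetical helper could bind a BH to whatever context a
BlockDriverState uses (a sketch only; bs_new_bh is a made-up name):

    #include "block/aio.h"
    #include "block/block_int.h"

    static QEMUBH *bs_new_bh(BlockDriverState *bs, QEMUBHFunc *cb,
                             void *opaque)
    {
        /* Works today (main loop) and keeps working once a BDS can be
         * bound to another AioContext.
         */
        return aio_bh_new(bdrv_get_aio_context(bs), cb, opaque);
    }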
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
block.c | 6 ++++++
include/block/block_int.h | 7 +++++++
2 files changed, 13 insertions(+)
diff --git a/block.c b/block.c
index 124a9eb..0e5cd01 100644
--- a/block.c
+++ b/block.c
@@ -4638,3 +4638,9 @@ out:
bdrv_delete(bs);
}
}
+
+AioContext *bdrv_get_aio_context(BlockDriverState *bs)
+{
+ /* Currently BlockDriverState always uses the main loop AioContext */
+ return qemu_get_aio_context();
+}
diff --git a/include/block/block_int.h b/include/block/block_int.h
index eaad53e..966f7fd 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -293,6 +293,13 @@ int get_tmp_filename(char *filename, int size);
void bdrv_set_io_limits(BlockDriverState *bs,
BlockIOLimit *io_limits);
+/**
+ * bdrv_get_aio_context:
+ *
+ * Returns: the currently bound #AioContext
+ */
+AioContext *bdrv_get_aio_context(BlockDriverState *bs);
+
#ifdef _WIN32
int is_windows_drive(const char *filename);
#endif
--
1.8.1.4
* [Qemu-devel] [PATCH v2 6/7] threadpool: drop global thread pool
From: Stefan Hajnoczi @ 2013-03-07 12:41 UTC
To: qemu-devel; +Cc: Kevin Wolf, Paolo Bonzini, Stefan Hajnoczi
Now that each AioContext has a ThreadPool and a BlockDriverState's
AioContext (currently always the main loop's) can be fetched with
bdrv_get_aio_context(), we can eliminate the concept of a global thread
pool from thread-pool.c.
The submit functions now take a ThreadPool* argument.
block/raw-posix.c and block/raw-win32.c use
aio_get_thread_pool(bdrv_get_aio_context(bs)) to fetch the main loop's
ThreadPool.
tests/test-thread-pool.c must be updated to reflect the new
thread_pool_submit() function prototypes.
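A converted call site then has this shape (a simplified sketch;
submit_example is a made-up name, compare the real paio_submit() hunks
below):

    #include "block/block_int.h"    /* bdrv_get_aio_context() */
    #include "block/aio.h"          /* aio_get_thread_pool() */
    #include "block/thread-pool.h"  /* thread_pool_submit_aio() */

    static BlockDriverAIOCB *submit_example(BlockDriverState *bs,
                                            ThreadPoolFunc *func, void *arg,
                                            BlockDriverCompletionFunc *cb,
                                            void *opaque)
    {
        /* Use the pool bound to this BDS's AioContext */
        ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));

        return thread_pool_submit_aio(pool, func, arg, cb, opaque);
    }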
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
block/raw-posix.c | 8 ++++++--
block/raw-win32.c | 4 +++-
include/block/thread-pool.h | 10 ++++++----
tests/test-thread-pool.c | 44 +++++++++++++++++++++-----------------------
thread-pool.c | 23 +++++++----------------
5 files changed, 43 insertions(+), 46 deletions(-)
diff --git a/block/raw-posix.c b/block/raw-posix.c
index 4dfdf98..8a3cdbc 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -750,6 +750,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
BlockDriverCompletionFunc *cb, void *opaque, int type)
{
RawPosixAIOData *acb = g_slice_new(RawPosixAIOData);
+ ThreadPool *pool;
acb->bs = bs;
acb->aio_type = type;
@@ -763,7 +764,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
acb->aio_offset = sector_num * 512;
trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
- return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+ pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
+ return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
}
static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs,
@@ -1413,6 +1415,7 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs,
{
BDRVRawState *s = bs->opaque;
RawPosixAIOData *acb;
+ ThreadPool *pool;
if (fd_open(bs) < 0)
return NULL;
@@ -1424,7 +1427,8 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs,
acb->aio_offset = 0;
acb->aio_ioctl_buf = buf;
acb->aio_ioctl_cmd = req;
- return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+ pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
+ return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
}
#elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__)
diff --git a/block/raw-win32.c b/block/raw-win32.c
index b89ac19..18e0068 100644
--- a/block/raw-win32.c
+++ b/block/raw-win32.c
@@ -144,6 +144,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
BlockDriverCompletionFunc *cb, void *opaque, int type)
{
RawWin32AIOData *acb = g_slice_new(RawWin32AIOData);
+ ThreadPool *pool;
acb->bs = bs;
acb->hfile = hfile;
@@ -157,7 +158,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
acb->aio_offset = sector_num * 512;
trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
- return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+ pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
+ return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
}
int qemu_ftruncate64(int fd, int64_t length)
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index e1453c6..32afcdd 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -31,9 +31,11 @@ typedef struct ThreadPool ThreadPool;
ThreadPool *thread_pool_new(struct AioContext *ctx);
void thread_pool_free(ThreadPool *pool);
-BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
- BlockDriverCompletionFunc *cb, void *opaque);
-int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
-void thread_pool_submit(ThreadPoolFunc *func, void *arg);
+BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
+ ThreadPoolFunc *func, void *arg,
+ BlockDriverCompletionFunc *cb, void *opaque);
+int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
+ ThreadPoolFunc *func, void *arg);
+void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
#endif
diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c
index 9998e03..22915aa 100644
--- a/tests/test-thread-pool.c
+++ b/tests/test-thread-pool.c
@@ -4,6 +4,8 @@
#include "block/thread-pool.h"
#include "block/block.h"
+static AioContext *ctx;
+static ThreadPool *pool;
static int active;
typedef struct {
@@ -38,19 +40,10 @@ static void done_cb(void *opaque, int ret)
active--;
}
-/* A non-blocking poll of the main AIO context (we cannot use aio_poll
- * because we do not know the AioContext).
- */
-static void qemu_aio_wait_nonblocking(void)
-{
- qemu_notify_event();
- qemu_aio_wait();
-}
-
/* Wait until all aio and bh activity has finished */
static void qemu_aio_wait_all(void)
{
- while (qemu_aio_wait()) {
+ while (aio_poll(ctx, true)) {
/* Do nothing */
}
}
@@ -58,7 +51,7 @@ static void qemu_aio_wait_all(void)
static void test_submit(void)
{
WorkerTestData data = { .n = 0 };
- thread_pool_submit(worker_cb, &data);
+ thread_pool_submit(pool, worker_cb, &data);
qemu_aio_wait_all();
g_assert_cmpint(data.n, ==, 1);
}
@@ -66,7 +59,8 @@ static void test_submit(void)
static void test_submit_aio(void)
{
WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
- data.aiocb = thread_pool_submit_aio(worker_cb, &data, done_cb, &data);
+ data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
+ done_cb, &data);
/* The callbacks are not called until after the first wait. */
active = 1;
@@ -84,7 +78,7 @@ static void co_test_cb(void *opaque)
active = 1;
data->n = 0;
data->ret = -EINPROGRESS;
- thread_pool_submit_co(worker_cb, data);
+ thread_pool_submit_co(pool, worker_cb, data);
/* The test continues in test_submit_co, after qemu_coroutine_enter... */
@@ -126,12 +120,12 @@ static void test_submit_many(void)
for (i = 0; i < 100; i++) {
data[i].n = 0;
data[i].ret = -EINPROGRESS;
- thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]);
+ thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
}
active = 100;
while (active > 0) {
- qemu_aio_wait();
+ aio_poll(ctx, true);
}
for (i = 0; i < 100; i++) {
g_assert_cmpint(data[i].n, ==, 1);
@@ -154,7 +148,7 @@ static void test_cancel(void)
for (i = 0; i < 100; i++) {
data[i].n = 0;
data[i].ret = -EINPROGRESS;
- data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i],
+ data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
done_cb, &data[i]);
}
@@ -162,7 +156,8 @@ static void test_cancel(void)
* run, but do not waste too much time...
*/
active = 100;
- qemu_aio_wait_nonblocking();
+ aio_notify(ctx);
+ aio_poll(ctx, false);
/* Wait some time for the threads to start, with some sanity
* testing on the behavior of the scheduler...
@@ -208,11 +203,10 @@ static void test_cancel(void)
int main(int argc, char **argv)
{
- /* These should be removed once each AioContext has its thread pool.
- * The test should create its own AioContext.
- */
- qemu_init_main_loop();
- bdrv_init();
+ int ret;
+
+ ctx = aio_context_new();
+ pool = aio_get_thread_pool(ctx);
g_test_init(&argc, &argv, NULL);
g_test_add_func("/thread-pool/submit", test_submit);
@@ -220,5 +214,9 @@ int main(int argc, char **argv)
g_test_add_func("/thread-pool/submit-co", test_submit_co);
g_test_add_func("/thread-pool/submit-many", test_submit_many);
g_test_add_func("/thread-pool/cancel", test_cancel);
- return g_test_run();
+
+ ret = g_test_run();
+
+ aio_context_unref(ctx);
+ return ret;
}
diff --git a/thread-pool.c b/thread-pool.c
index d1e4570..0ebd4c2 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -78,9 +78,6 @@ struct ThreadPool {
bool stopping;
};
-/* Currently there is only one thread pool instance. */
-static ThreadPool global_pool;
-
static void *worker_thread(void *opaque)
{
ThreadPool *pool = opaque;
@@ -239,10 +236,10 @@ static const AIOCBInfo thread_pool_aiocb_info = {
.cancel = thread_pool_cancel,
};
-BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
+BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
+ ThreadPoolFunc *func, void *arg,
BlockDriverCompletionFunc *cb, void *opaque)
{
- ThreadPool *pool = &global_pool;
ThreadPoolElement *req;
req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
@@ -278,18 +275,19 @@ static void thread_pool_co_cb(void *opaque, int ret)
qemu_coroutine_enter(co->co, NULL);
}
-int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
+int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
+ void *arg)
{
ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
assert(qemu_in_coroutine());
- thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
+ thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
qemu_coroutine_yield();
return tpc.ret;
}
-void thread_pool_submit(ThreadPoolFunc *func, void *arg)
+void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
- thread_pool_submit_aio(func, arg, NULL, NULL);
+ thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
@@ -354,10 +352,3 @@ void thread_pool_free(ThreadPool *pool)
event_notifier_cleanup(&pool->notifier);
g_free(pool);
}
-
-static void thread_pool_init(void)
-{
- thread_pool_init_one(&global_pool, NULL);
-}
-
-block_init(thread_pool_init)
--
1.8.1.4
* [Qemu-devel] [PATCH v2 7/7] coroutine: use AioContext for CoQueue BH
From: Stefan Hajnoczi @ 2013-03-07 12:41 UTC
To: qemu-devel; +Cc: Kevin Wolf, Paolo Bonzini, Stefan Hajnoczi
CoQueue uses a BH to wake coroutines that were made ready to run again
using qemu_co_queue_next() or qemu_co_queue_restart_all(). The BH
currently runs in the iothread AioContext and would break coroutines
that run in a different AioContext.
This is a slightly tricky problem because the lifetime of the BH exceeds
that of the CoQueue. This means coroutines can be woken after the
CoQueue itself has been freed. Also, there is no qemu_co_queue_destroy()
function which we could use to handle freeing resources.
Introducing qemu_co_queue_destroy() has a ripple effect of requiring us
to also add qemu_co_mutex_destroy() and qemu_co_rwlock_destroy(), as
well as updating all callers. Avoid doing that.
We also cannot switch from BH to GIdle function because aio_poll() does
not dispatch GIdle functions. (GIdle functions make memory management
slightly easier because they free themselves.)
Finally, I don't want to move unlock_queue and unlock_bh into
AioContext. That would break encapsulation: AioContext isn't supposed
to know about CoQueue.
This patch implements a different solution: each qemu_co_queue_next() or
qemu_co_queue_restart_all() call creates a new BH and list of coroutines
to wake up. Callers tend to invoke qemu_co_queue_next() and
qemu_co_queue_restart_all() occasionally after blocking I/O, so creating
a new BH for each call shouldn't be massively inefficient.
Note that this patch does not add an interface for specifying the
AioContext. That is left to future patches which will convert CoQueue,
CoMutex, and CoRwlock to expose AioContext.
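Pulled together from the hunks below, the complete idiom is: allocate a
payload, attach a fresh BH bound to the queue's AioContext, schedule it,
and let the handler free both. A consolidated sketch, identical in
substance to the patch:

    typedef struct {
        QEMUBH *bh;
        QTAILQ_HEAD(, Coroutine) entries;
    } CoQueueNextData;

    /* Producer side, run from qemu_co_queue_do_restart():
     *
     *     data = g_slice_new(CoQueueNextData);
     *     data->bh = aio_bh_new(queue->ctx, qemu_co_queue_next_bh, data);
     *     QTAILQ_INIT(&data->entries);
     *     qemu_bh_schedule(data->bh);
     */

    static void qemu_co_queue_next_bh(void *opaque)
    {
        CoQueueNextData *data = opaque;
        Coroutine *next;

        /* Enter every coroutine captured when the BH was scheduled */
        while ((next = QTAILQ_FIRST(&data->entries))) {
            QTAILQ_REMOVE(&data->entries, next, co_queue_next);
            qemu_coroutine_enter(next, NULL);
        }

        /* The BH owns itself and its payload; nothing here references
         * the CoQueue, so no qemu_co_queue_destroy() is needed.
         */
        qemu_bh_delete(data->bh);
        g_slice_free(CoQueueNextData, data);
    }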
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
include/block/coroutine.h | 1 +
qemu-coroutine-lock.c | 55 ++++++++++++++++++++++++++++++++---------------
2 files changed, 39 insertions(+), 17 deletions(-)
diff --git a/include/block/coroutine.h b/include/block/coroutine.h
index c31fae3..a978162 100644
--- a/include/block/coroutine.h
+++ b/include/block/coroutine.h
@@ -104,6 +104,7 @@ bool qemu_in_coroutine(void);
*/
typedef struct CoQueue {
QTAILQ_HEAD(, Coroutine) entries;
+ AioContext *ctx;
} CoQueue;
/**
diff --git a/qemu-coroutine-lock.c b/qemu-coroutine-lock.c
index 97ef01c..86efe1f 100644
--- a/qemu-coroutine-lock.c
+++ b/qemu-coroutine-lock.c
@@ -29,28 +29,36 @@
#include "block/aio.h"
#include "trace.h"
-static QTAILQ_HEAD(, Coroutine) unlock_bh_queue =
- QTAILQ_HEAD_INITIALIZER(unlock_bh_queue);
-static QEMUBH* unlock_bh;
+/* Coroutines are awoken from a BH to allow the current coroutine to complete
+ * its flow of execution. The BH may run after the CoQueue has been destroyed,
+ * so keep BH data in a separate heap-allocated struct.
+ */
+typedef struct {
+ QEMUBH *bh;
+ QTAILQ_HEAD(, Coroutine) entries;
+} CoQueueNextData;
static void qemu_co_queue_next_bh(void *opaque)
{
+ CoQueueNextData *data = opaque;
Coroutine *next;
trace_qemu_co_queue_next_bh();
- while ((next = QTAILQ_FIRST(&unlock_bh_queue))) {
- QTAILQ_REMOVE(&unlock_bh_queue, next, co_queue_next);
+ while ((next = QTAILQ_FIRST(&data->entries))) {
+ QTAILQ_REMOVE(&data->entries, next, co_queue_next);
qemu_coroutine_enter(next, NULL);
}
+
+ qemu_bh_delete(data->bh);
+ g_slice_free(CoQueueNextData, data);
}
void qemu_co_queue_init(CoQueue *queue)
{
QTAILQ_INIT(&queue->entries);
- if (!unlock_bh) {
- unlock_bh = qemu_bh_new(qemu_co_queue_next_bh, NULL);
- }
+ /* This will be exposed to callers once there are multiple AioContexts */
+ queue->ctx = qemu_get_aio_context();
}
void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
@@ -69,26 +77,39 @@ void coroutine_fn qemu_co_queue_wait_insert_head(CoQueue *queue)
assert(qemu_in_coroutine());
}
-bool qemu_co_queue_next(CoQueue *queue)
+static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
{
Coroutine *next;
+ CoQueueNextData *data;
+
+ if (QTAILQ_EMPTY(&queue->entries)) {
+ return false;
+ }
- next = QTAILQ_FIRST(&queue->entries);
- if (next) {
+ data = g_slice_new(CoQueueNextData);
+ data->bh = aio_bh_new(queue->ctx, qemu_co_queue_next_bh, data);
+ QTAILQ_INIT(&data->entries);
+ qemu_bh_schedule(data->bh);
+
+ while ((next = QTAILQ_FIRST(&queue->entries)) != NULL) {
QTAILQ_REMOVE(&queue->entries, next, co_queue_next);
- QTAILQ_INSERT_TAIL(&unlock_bh_queue, next, co_queue_next);
+ QTAILQ_INSERT_TAIL(&data->entries, next, co_queue_next);
trace_qemu_co_queue_next(next);
- qemu_bh_schedule(unlock_bh);
+ if (single) {
+ break;
+ }
}
+ return true;
+}
- return (next != NULL);
+bool qemu_co_queue_next(CoQueue *queue)
+{
+ return qemu_co_queue_do_restart(queue, true);
}
void qemu_co_queue_restart_all(CoQueue *queue)
{
- while (qemu_co_queue_next(queue)) {
- /* Do nothing */
- }
+ qemu_co_queue_do_restart(queue, false);
}
bool qemu_co_queue_empty(CoQueue *queue)
--
1.8.1.4
* Re: [Qemu-devel] [PATCH v2 0/7] threadpool: support multiple ThreadPools
From: Paolo Bonzini @ 2013-03-07 17:16 UTC
To: Stefan Hajnoczi; +Cc: Kevin Wolf, qemu-devel
On 07/03/2013 13:41, Stefan Hajnoczi wrote:
> This patch series changes the global thread pool to a model with one
> ThreadPool per AioContext. We still only use the main loop AioContext so
> in practice
> there is just one ThreadPool. But this opens the door to refactoring the block
> layer (which depends on ThreadPool) so block devices can be accessed outside
> the global mutex in the future.
>
> ThreadPool is tightly bound to an AioContext because it uses an EventNotifier
> to signal work completion. Completed work items are reaped and their callback
> functions are invoked from the EventNotifier read handler (executing under
> AioContext).
>
> It might be possible to record the AioContext for the completion callback on a
> per-request basis and continuing to use a global pool of worker threads. After
> discussing thread pool models with Paolo I have been convinced that it is
> simpler and more scalable to have one ThreadPool per AioContext instead.
> Therefore this series implements the 1:1 approach. For details on previous
> thread pool model discussion, see:
> http://lists.gnu.org/archive/html/qemu-devel/2013-02/msg03987.html
>
> The final patch was previously separate but I have included it because it
> depends on qemu_get_aio_context(). It is unrelated to ThreadPool but the patch
> reviewers are the same in both instances, so I combined the series.
>
> At the end of this series block/raw-posix.c and block/raw-win32.c are aware of
> the ThreadPool they submit work to. The next step after this series is to
> associate BlockDriverState with an AioContext so that the block layer can run
> outside the global main loop.
>
> v2:
> * Always find AioContext, don't split if (ctx) cases [Paolo]
> * Introduce bdrv_get_aio_context() [Paolo]
> * Add CoQueue AioContext patch since it depends on qemu_get_aio_context()
>
> Stefan Hajnoczi (7):
> main-loop: add qemu_get_aio_context()
> threadpool: move globals into struct ThreadPool
> threadpool: add thread_pool_new() and thread_pool_free()
> aio: add a ThreadPool instance to AioContext
> block: add bdrv_get_aio_context()
> threadpool: drop global thread pool
> coroutine: use AioContext for CoQueue BH
>
> async.c | 11 ++
> block.c | 6 ++
> block/raw-posix.c | 8 +-
> block/raw-win32.c | 4 +-
> include/block/aio.h | 6 ++
> include/block/block_int.h | 7 ++
> include/block/coroutine.h | 1 +
> include/block/thread-pool.h | 15 ++-
> include/qemu/main-loop.h | 5 +
> main-loop.c | 5 +
> qemu-coroutine-lock.c | 55 ++++++----
> tests/test-thread-pool.c | 44 ++++----
> thread-pool.c | 243 ++++++++++++++++++++++++++++----------------
> trace-events | 4 +-
> 14 files changed, 276 insertions(+), 138 deletions(-)
>
Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
* Re: [Qemu-devel] [PATCH v2 0/7] threadpool: support multiple ThreadPools
From: Stefan Hajnoczi @ 2013-03-13 13:37 UTC
To: Stefan Hajnoczi; +Cc: Kevin Wolf, Paolo Bonzini, qemu-devel
On Thu, Mar 07, 2013 at 01:41:43PM +0100, Stefan Hajnoczi wrote:
> This patch series changes the global thread pool to a model with one
> ThreadPool per AioContext. We still only use the main loop AioContext so
> in practice
> there is just one ThreadPool. But this opens the door to refactoring the block
> layer (which depends on ThreadPool) so block devices can be accessed outside
> the global mutex in the future.
>
> ThreadPool is tightly bound to an AioContext because it uses an EventNotifier
> to signal work completion. Completed work items are reaped and their callback
> functions are invoked from the EventNotifier read handler (executing under
> AioContext).
>
> It might be possible to record the AioContext for the completion callback on a
> per-request basis and continue to use a global pool of worker threads. After
> discussing thread pool models with Paolo I have been convinced that it is
> simpler and more scalable to have one ThreadPool per AioContext instead.
> Therefore this series implements the 1:1 approach. For details on previous
> thread pool model discussion, see:
> http://lists.gnu.org/archive/html/qemu-devel/2013-02/msg03987.html
>
> The final patch was previously separate but I have included it because it
> depends on qemu_get_aio_context(). It is unrelated to ThreadPool but the patch
> reviewers are the same in both instances, so I combined the series.
>
> At the end of this series block/raw-posix.c and block/raw-win32.c are aware of
> the ThreadPool they submit work to. The next step after this series is to
> associate BlockDriverState with an AioContext so that the block layer can run
> outside the global main loop.
>
> v2:
> * Always find AioContext, don't split if (ctx) cases [Paolo]
> * Introduce bdrv_get_aio_context() [Paolo]
> * Add CoQueue AioContext patch since it depends on qemu_get_aio_context()
>
> Stefan Hajnoczi (7):
> main-loop: add qemu_get_aio_context()
> threadpool: move globals into struct ThreadPool
> threadpool: add thread_pool_new() and thread_pool_free()
> aio: add a ThreadPool instance to AioContext
> block: add bdrv_get_aio_context()
> threadpool: drop global thread pool
> coroutine: use AioContext for CoQueue BH
>
> async.c | 11 ++
> block.c | 6 ++
> block/raw-posix.c | 8 +-
> block/raw-win32.c | 4 +-
> include/block/aio.h | 6 ++
> include/block/block_int.h | 7 ++
> include/block/coroutine.h | 1 +
> include/block/thread-pool.h | 15 ++-
> include/qemu/main-loop.h | 5 +
> main-loop.c | 5 +
> qemu-coroutine-lock.c | 55 ++++++----
> tests/test-thread-pool.c | 44 ++++----
> thread-pool.c | 243 ++++++++++++++++++++++++++++----------------
> trace-events | 4 +-
> 14 files changed, 276 insertions(+), 138 deletions(-)
>
> --
> 1.8.1.4
>
>
Applied to my block tree:
https://github.com/stefanha/qemu/commits/block
Stefan