From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from eggs.gnu.org ([208.118.235.92]:60199) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1UDHJF-0002U1-6t for qemu-devel@nongnu.org; Wed, 06 Mar 2013 11:35:18 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1UDHJC-0008DK-Gz for qemu-devel@nongnu.org; Wed, 06 Mar 2013 11:35:17 -0500 Received: from mx1.redhat.com ([209.132.183.28]:54855) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1UDHJC-0008D3-7K for qemu-devel@nongnu.org; Wed, 06 Mar 2013 11:35:14 -0500 Received: from int-mx09.intmail.prod.int.phx2.redhat.com (int-mx09.intmail.prod.int.phx2.redhat.com [10.5.11.22]) by mx1.redhat.com (8.14.4/8.14.4) with ESMTP id r26GZC2x022171 (version=TLSv1/SSLv3 cipher=DHE-RSA-AES256-SHA bits=256 verify=OK) for ; Wed, 6 Mar 2013 11:35:12 -0500 Message-ID: <5137703D.8080401@redhat.com> Date: Wed, 06 Mar 2013 17:35:09 +0100 From: Paolo Bonzini MIME-Version: 1.0 References: <1362584735-30911-1-git-send-email-stefanha@redhat.com> <1362584735-30911-6-git-send-email-stefanha@redhat.com> In-Reply-To: <1362584735-30911-6-git-send-email-stefanha@redhat.com> Content-Type: text/plain; charset=ISO-8859-15 Content-Transfer-Encoding: 7bit Subject: Re: [Qemu-devel] [PATCH 5/5] threadpool: drop global thread pool List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Stefan Hajnoczi Cc: Kevin Wolf , qemu-devel@nongnu.org Il 06/03/2013 16:45, Stefan Hajnoczi ha scritto: > Now that each AioContext has a ThreadPool and the main loop AioContext > can be fetched with qemu_get_aio_context(), we can eliminate the concept > of a global thread pool from thread-pool.c. > > The submit functions must take a ThreadPool* argument. This is certainly ok for thread-pool.c. For raw-posix and raw-win32, what about adding already a bdrv_get_aio_context() function and using that in paio_submit? Is it putting the cart before the horse? Paolo > block/raw-posix.c and block/raw-win32.c use > aio_get_thread_pool(qemu_get_aio_context()) to fetch the main loop's > ThreadPool. > > tests/test-thread-pool.c must be updated to reflect the new > thread_pool_submit() function prototypes. > > Signed-off-by: Stefan Hajnoczi > --- > block/raw-posix.c | 8 ++++++-- > block/raw-win32.c | 4 +++- > include/block/thread-pool.h | 10 ++++++---- > tests/test-thread-pool.c | 44 +++++++++++++++++++++----------------------- > thread-pool.c | 23 +++++++---------------- > 5 files changed, 43 insertions(+), 46 deletions(-) > > diff --git a/block/raw-posix.c b/block/raw-posix.c > index 4dfdf98..01e5ae8 100644 > --- a/block/raw-posix.c > +++ b/block/raw-posix.c > @@ -750,6 +750,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, > BlockDriverCompletionFunc *cb, void *opaque, int type) > { > RawPosixAIOData *acb = g_slice_new(RawPosixAIOData); > + ThreadPool *pool; > > acb->bs = bs; > acb->aio_type = type; > @@ -763,7 +764,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, > acb->aio_offset = sector_num * 512; > > trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); > - return thread_pool_submit_aio(aio_worker, acb, cb, opaque); > + pool = aio_get_thread_pool(qemu_get_aio_context()); > + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); > } > > static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs, > @@ -1413,6 +1415,7 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs, > { > BDRVRawState *s = bs->opaque; > RawPosixAIOData *acb; > + ThreadPool *pool; > > if (fd_open(bs) < 0) > return NULL; > @@ -1424,7 +1427,8 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs, > acb->aio_offset = 0; > acb->aio_ioctl_buf = buf; > acb->aio_ioctl_cmd = req; > - return thread_pool_submit_aio(aio_worker, acb, cb, opaque); > + pool = aio_get_thread_pool(qemu_get_aio_context()); > + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); > } > > #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__) > diff --git a/block/raw-win32.c b/block/raw-win32.c > index b89ac19..515614b 100644 > --- a/block/raw-win32.c > +++ b/block/raw-win32.c > @@ -144,6 +144,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile, > BlockDriverCompletionFunc *cb, void *opaque, int type) > { > RawWin32AIOData *acb = g_slice_new(RawWin32AIOData); > + ThreadPool *pool; > > acb->bs = bs; > acb->hfile = hfile; > @@ -157,7 +158,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile, > acb->aio_offset = sector_num * 512; > > trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); > - return thread_pool_submit_aio(aio_worker, acb, cb, opaque); > + pool = aio_get_thread_pool(qemu_get_aio_context()); > + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); > } > > int qemu_ftruncate64(int fd, int64_t length) > diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h > index e1453c6..32afcdd 100644 > --- a/include/block/thread-pool.h > +++ b/include/block/thread-pool.h > @@ -31,9 +31,11 @@ typedef struct ThreadPool ThreadPool; > ThreadPool *thread_pool_new(struct AioContext *ctx); > void thread_pool_free(ThreadPool *pool); > > -BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, > - BlockDriverCompletionFunc *cb, void *opaque); > -int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg); > -void thread_pool_submit(ThreadPoolFunc *func, void *arg); > +BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, > + ThreadPoolFunc *func, void *arg, > + BlockDriverCompletionFunc *cb, void *opaque); > +int coroutine_fn thread_pool_submit_co(ThreadPool *pool, > + ThreadPoolFunc *func, void *arg); > +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg); > > #endif > diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c > index 9998e03..22915aa 100644 > --- a/tests/test-thread-pool.c > +++ b/tests/test-thread-pool.c > @@ -4,6 +4,8 @@ > #include "block/thread-pool.h" > #include "block/block.h" > > +static AioContext *ctx; > +static ThreadPool *pool; > static int active; > > typedef struct { > @@ -38,19 +40,10 @@ static void done_cb(void *opaque, int ret) > active--; > } > > -/* A non-blocking poll of the main AIO context (we cannot use aio_poll > - * because we do not know the AioContext). > - */ > -static void qemu_aio_wait_nonblocking(void) > -{ > - qemu_notify_event(); > - qemu_aio_wait(); > -} > - > /* Wait until all aio and bh activity has finished */ > static void qemu_aio_wait_all(void) > { > - while (qemu_aio_wait()) { > + while (aio_poll(ctx, true)) { > /* Do nothing */ > } > } > @@ -58,7 +51,7 @@ static void qemu_aio_wait_all(void) > static void test_submit(void) > { > WorkerTestData data = { .n = 0 }; > - thread_pool_submit(worker_cb, &data); > + thread_pool_submit(pool, worker_cb, &data); > qemu_aio_wait_all(); > g_assert_cmpint(data.n, ==, 1); > } > @@ -66,7 +59,8 @@ static void test_submit(void) > static void test_submit_aio(void) > { > WorkerTestData data = { .n = 0, .ret = -EINPROGRESS }; > - data.aiocb = thread_pool_submit_aio(worker_cb, &data, done_cb, &data); > + data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data, > + done_cb, &data); > > /* The callbacks are not called until after the first wait. */ > active = 1; > @@ -84,7 +78,7 @@ static void co_test_cb(void *opaque) > active = 1; > data->n = 0; > data->ret = -EINPROGRESS; > - thread_pool_submit_co(worker_cb, data); > + thread_pool_submit_co(pool, worker_cb, data); > > /* The test continues in test_submit_co, after qemu_coroutine_enter... */ > > @@ -126,12 +120,12 @@ static void test_submit_many(void) > for (i = 0; i < 100; i++) { > data[i].n = 0; > data[i].ret = -EINPROGRESS; > - thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]); > + thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]); > } > > active = 100; > while (active > 0) { > - qemu_aio_wait(); > + aio_poll(ctx, true); > } > for (i = 0; i < 100; i++) { > g_assert_cmpint(data[i].n, ==, 1); > @@ -154,7 +148,7 @@ static void test_cancel(void) > for (i = 0; i < 100; i++) { > data[i].n = 0; > data[i].ret = -EINPROGRESS; > - data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i], > + data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i], > done_cb, &data[i]); > } > > @@ -162,7 +156,8 @@ static void test_cancel(void) > * run, but do not waste too much time... > */ > active = 100; > - qemu_aio_wait_nonblocking(); > + aio_notify(ctx); > + aio_poll(ctx, false); > > /* Wait some time for the threads to start, with some sanity > * testing on the behavior of the scheduler... > @@ -208,11 +203,10 @@ static void test_cancel(void) > > int main(int argc, char **argv) > { > - /* These should be removed once each AioContext has its thread pool. > - * The test should create its own AioContext. > - */ > - qemu_init_main_loop(); > - bdrv_init(); > + int ret; > + > + ctx = aio_context_new(); > + pool = aio_get_thread_pool(ctx); > > g_test_init(&argc, &argv, NULL); > g_test_add_func("/thread-pool/submit", test_submit); > @@ -220,5 +214,9 @@ int main(int argc, char **argv) > g_test_add_func("/thread-pool/submit-co", test_submit_co); > g_test_add_func("/thread-pool/submit-many", test_submit_many); > g_test_add_func("/thread-pool/cancel", test_cancel); > - return g_test_run(); > + > + ret = g_test_run(); > + > + aio_context_unref(ctx); > + return ret; > } > diff --git a/thread-pool.c b/thread-pool.c > index 7a07408..e0e0a47 100644 > --- a/thread-pool.c > +++ b/thread-pool.c > @@ -78,9 +78,6 @@ struct ThreadPool { > bool stopping; > }; > > -/* Currently there is only one thread pool instance. */ > -static ThreadPool global_pool; > - > static void *worker_thread(void *opaque) > { > ThreadPool *pool = opaque; > @@ -239,10 +236,10 @@ static const AIOCBInfo thread_pool_aiocb_info = { > .cancel = thread_pool_cancel, > }; > > -BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, > +BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, > + ThreadPoolFunc *func, void *arg, > BlockDriverCompletionFunc *cb, void *opaque) > { > - ThreadPool *pool = &global_pool; > ThreadPoolElement *req; > > req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque); > @@ -278,18 +275,19 @@ static void thread_pool_co_cb(void *opaque, int ret) > qemu_coroutine_enter(co->co, NULL); > } > > -int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg) > +int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, > + void *arg) > { > ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; > assert(qemu_in_coroutine()); > - thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc); > + thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc); > qemu_coroutine_yield(); > return tpc.ret; > } > > -void thread_pool_submit(ThreadPoolFunc *func, void *arg) > +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg) > { > - thread_pool_submit_aio(func, arg, NULL, NULL); > + thread_pool_submit_aio(pool, func, arg, NULL, NULL); > } > > static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) > @@ -363,10 +361,3 @@ void thread_pool_free(ThreadPool *pool) > event_notifier_cleanup(&pool->notifier); > g_free(pool); > } > - > -static void thread_pool_init(void) > -{ > - thread_pool_init_one(&global_pool, NULL); > -} > - > -block_init(thread_pool_init) >