From: Stefan Hajnoczi <stefanha@gmail.com>
To: "Maciej S. Szmigiero" <mail@maciej.szmigiero.name>
Cc: "Peter Xu" <peterx@redhat.com>, "Fabiano Rosas" <farosas@suse.de>,
"Alex Williamson" <alex.williamson@redhat.com>,
"Cédric Le Goater" <clg@redhat.com>,
"Eric Blake" <eblake@redhat.com>,
"Markus Armbruster" <armbru@redhat.com>,
"Daniel P . Berrangé" <berrange@redhat.com>,
"Avihai Horon" <avihaih@nvidia.com>,
"Joao Martins" <joao.m.martins@oracle.com>,
qemu-devel@nongnu.org
Subject: Re: [PATCH v2 05/17] thread-pool: Implement non-AIO (generic) pool support
Date: Tue, 3 Sep 2024 09:55:24 -0400
Message-ID: <CAJSP0QU+dhh_e2LJRoGCxtCEh6C2-GBoZoFZL2W-SMSQPzUtYg@mail.gmail.com>
In-Reply-To: <54947c3a1df713f5b69d8296938f3da41116ffe0.1724701542.git.maciej.szmigiero@oracle.com>
On Tue, 27 Aug 2024 at 13:58, Maciej S. Szmigiero
<mail@maciej.szmigiero.name> wrote:
>
> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> Migration code wants to manage device data sending threads in one place.
>
> QEMU already has a thread pool implementation; however, it is limited
> to queuing AIO operations only and essentially has a 1:1 mapping between
> the current AioContext and the ThreadPool in use.
>
> Implement what is necessary to queue generic (non-AIO) work on a ThreadPool
> too.
>
> This brings a few new operations on a pool:
> * thread_pool_set_minmax_threads() explicitly sets the minimum and maximum
> thread count in the pool.
>
> * thread_pool_join() waits until all the submitted work requests
> have finished.
>
> * thread_pool_poll() lets the new-thread and/or thread-completion bottom
> halves run (if they are scheduled to run).
> It is useful for thread pool users that need to launch or terminate
> threads without returning to the QEMU main loop.
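To make sure I understand the intended usage, here is a minimal caller
sketch based on the declarations this patch adds below; the names
send_device_state(), send_done() and send_all_device_states() are made
up for illustration and are not part of the patch:

/* Hypothetical caller, for illustration only */
#include "qemu/osdep.h"
#include "block/thread-pool.h"

static int send_device_state(void *arg)          /* ThreadPoolFunc */
{
    /* push one device's data to its migration channel */
    return 0;
}

static void send_done(void *opaque, int ret)     /* BlockCompletionFunc */
{
    /* completion BH, runs in the submitting AioContext */
}

static void send_all_device_states(ThreadPool *pool, void **states, int n)
{
    thread_pool_set_minmax_threads(pool, 0, n);

    for (int i = 0; i < n; i++) {
        thread_pool_submit(pool, send_device_state, states[i], NULL,
                           send_done, NULL);
    }

    thread_pool_poll(pool);  /* let the spawn/completion BHs run */
    thread_pool_join(pool);  /* block until all requests have finished */
}
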
Did you consider glib's GThreadPool?
https://docs.gtk.org/glib/struct.ThreadPool.html
QEMU's thread pool is integrated into the QEMU event loop. If your
goal is to bypass the QEMU event loop, then you may as well use the
glib API instead.
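For comparison, glib's pool already covers the generic (non-AIO) case.
A minimal sketch, where worker() and the pushed integers are just
placeholders:

#include <glib.h>

static void worker(gpointer data, gpointer user_data)
{
    /* runs on a pool thread; 'data' is whatever was pushed */
    g_print("processing item %d\n", GPOINTER_TO_INT(data));
}

int main(void)
{
    GError *err = NULL;
    GThreadPool *pool = g_thread_pool_new(worker, NULL /* user_data */,
                                          4 /* max_threads */,
                                          FALSE /* exclusive */, &err);
    if (!pool) {
        g_error("g_thread_pool_new: %s", err->message);
    }

    for (int i = 1; i <= 8; i++) {
        g_thread_pool_push(pool, GINT_TO_POINTER(i), NULL);
    }

    /* immediate=FALSE, wait_=TRUE: waits for all queued work, which is
     * roughly what the proposed thread_pool_join() does */
    g_thread_pool_free(pool, FALSE, TRUE);
    return 0;
}

g_thread_pool_set_max_threads() would similarly cover the
thread_pool_set_minmax_threads() use case.
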
thread_pool_join() and thread_pool_poll() will lead to code that
blocks the event loop. QEMU's aio_poll() and nested event loops in
general are a source of hangs and re-entrancy bugs. I would prefer not
to introduce these issues into the QEMU ThreadPool API.
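Something like the following completion-callback pattern stays in the
event loop instead of blocking in a join. The SendTracker struct and
the function names are illustrative only, reusing the hypothetical
send_device_state() worker from the sketch above:

typedef struct {
    int outstanding;                 /* requests still in flight */
    void (*all_done)(void *opaque);  /* resume the state machine here */
    void *opaque;
} SendTracker;

static void one_done(void *opaque, int ret)
{
    SendTracker *t = opaque;

    /* runs in the submitting AioContext, so no locking is needed */
    if (--t->outstanding == 0) {
        t->all_done(t->opaque);
    }
}

static void start_sends(SendTracker *t, void **states, int n)
{
    t->outstanding = n;

    for (int i = 0; i < n; i++) {
        thread_pool_submit_aio(send_device_state, states[i], NULL,
                               one_done, t);
    }
    /* return to the event loop; one_done() fires as each request completes */
}
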
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
>  include/block/thread-pool.h   | 10 ++++-
>  tests/unit/test-thread-pool.c |  2 +-
>  util/thread-pool.c            | 77 ++++++++++++++++++++++++++++++-----
>  3 files changed, 76 insertions(+), 13 deletions(-)
>
> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
> index b484c4780ea6..1769496056cd 100644
> --- a/include/block/thread-pool.h
> +++ b/include/block/thread-pool.h
> @@ -37,9 +37,15 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
> void *arg, GDestroyNotify arg_destroy,
> BlockCompletionFunc *cb, void *opaque);
> int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
> -void thread_pool_submit(ThreadPoolFunc *func,
> - void *arg, GDestroyNotify arg_destroy);
> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
> + void *arg, GDestroyNotify arg_destroy,
> + BlockCompletionFunc *cb, void *opaque);
>
> +void thread_pool_join(ThreadPool *pool);
> +void thread_pool_poll(ThreadPool *pool);
> +
> +void thread_pool_set_minmax_threads(ThreadPool *pool,
> + int min_threads, int max_threads);
> void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
>
> #endif
> diff --git a/tests/unit/test-thread-pool.c b/tests/unit/test-thread-pool.c
> index e4afb9e36292..469c0f7057b6 100644
> --- a/tests/unit/test-thread-pool.c
> +++ b/tests/unit/test-thread-pool.c
> @@ -46,7 +46,7 @@ static void done_cb(void *opaque, int ret)
> static void test_submit(void)
> {
> WorkerTestData data = { .n = 0 };
> - thread_pool_submit(worker_cb, &data, NULL);
> + thread_pool_submit(NULL, worker_cb, &data, NULL, NULL, NULL);
> while (data.n == 0) {
> aio_poll(ctx, true);
> }
> diff --git a/util/thread-pool.c b/util/thread-pool.c
> index 69a87ee79252..2bf3be875a51 100644
> --- a/util/thread-pool.c
> +++ b/util/thread-pool.c
> @@ -60,6 +60,7 @@ struct ThreadPool {
> QemuMutex lock;
> QemuCond worker_stopped;
> QemuCond request_cond;
> + QemuCond no_requests_cond;
> QEMUBH *new_thread_bh;
>
> /* The following variables are only accessed from one AioContext. */
> @@ -73,6 +74,7 @@ struct ThreadPool {
> int pending_threads; /* threads created but not running yet */
> int min_threads;
> int max_threads;
> + size_t requests_executing;
> };
>
> static void *worker_thread(void *opaque)
> @@ -107,6 +109,10 @@ static void *worker_thread(void *opaque)
> req = QTAILQ_FIRST(&pool->request_list);
> QTAILQ_REMOVE(&pool->request_list, req, reqs);
> req->state = THREAD_ACTIVE;
> +
> + assert(pool->requests_executing < SIZE_MAX);
> + pool->requests_executing++;
> +
> qemu_mutex_unlock(&pool->lock);
>
> ret = req->func(req->arg);
> @@ -118,6 +124,14 @@ static void *worker_thread(void *opaque)
>
> qemu_bh_schedule(pool->completion_bh);
> qemu_mutex_lock(&pool->lock);
> +
> + assert(pool->requests_executing > 0);
> + pool->requests_executing--;
> +
> + if (pool->requests_executing == 0 &&
> + QTAILQ_EMPTY(&pool->request_list)) {
> + qemu_cond_signal(&pool->no_requests_cond);
> + }
> }
>
> pool->cur_threads--;
> @@ -243,13 +257,16 @@ static const AIOCBInfo thread_pool_aiocb_info = {
> .cancel_async = thread_pool_cancel,
> };
>
> -BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
> - void *arg, GDestroyNotify arg_destroy,
> - BlockCompletionFunc *cb, void *opaque)
> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
> + void *arg, GDestroyNotify arg_destroy,
> + BlockCompletionFunc *cb, void *opaque)
> {
> ThreadPoolElement *req;
> AioContext *ctx = qemu_get_current_aio_context();
> - ThreadPool *pool = aio_get_thread_pool(ctx);
> +
> + if (!pool) {
> + pool = aio_get_thread_pool(ctx);
> + }
>
> /* Assert that the thread submitting work is the same running the pool */
> assert(pool->ctx == qemu_get_current_aio_context());
> @@ -275,6 +292,18 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
> return &req->common;
> }
>
> +BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
> + void *arg, GDestroyNotify arg_destroy,
> + BlockCompletionFunc *cb, void *opaque)
> +{
> + return thread_pool_submit(NULL, func, arg, arg_destroy, cb, opaque);
> +}
> +
> +void thread_pool_poll(ThreadPool *pool)
> +{
> + aio_bh_poll(pool->ctx);
> +}
> +
> typedef struct ThreadPoolCo {
> Coroutine *co;
> int ret;
> @@ -297,18 +326,38 @@ int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
> return tpc.ret;
> }
>
> -void thread_pool_submit(ThreadPoolFunc *func,
> - void *arg, GDestroyNotify arg_destroy)
> +void thread_pool_join(ThreadPool *pool)
> {
> - thread_pool_submit_aio(func, arg, arg_destroy, NULL, NULL);
> + /* Assert that the thread waiting is the same running the pool */
> + assert(pool->ctx == qemu_get_current_aio_context());
> +
> + qemu_mutex_lock(&pool->lock);
> +
> + if (pool->requests_executing > 0 ||
> + !QTAILQ_EMPTY(&pool->request_list)) {
> + qemu_cond_wait(&pool->no_requests_cond, &pool->lock);
> + }
> + assert(pool->requests_executing == 0 &&
> + QTAILQ_EMPTY(&pool->request_list));
> +
> + qemu_mutex_unlock(&pool->lock);
> +
> + aio_bh_poll(pool->ctx);
> +
> + assert(QLIST_EMPTY(&pool->head));
> }
>
> -void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
> +void thread_pool_set_minmax_threads(ThreadPool *pool,
> + int min_threads, int max_threads)
> {
> + assert(min_threads >= 0);
> + assert(max_threads > 0);
> + assert(max_threads >= min_threads);
> +
> qemu_mutex_lock(&pool->lock);
>
> - pool->min_threads = ctx->thread_pool_min;
> - pool->max_threads = ctx->thread_pool_max;
> + pool->min_threads = min_threads;
> + pool->max_threads = max_threads;
>
> /*
> * We either have to:
> @@ -330,6 +379,12 @@ void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
> qemu_mutex_unlock(&pool->lock);
> }
>
> +void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
> +{
> + thread_pool_set_minmax_threads(pool,
> + ctx->thread_pool_min, ctx->thread_pool_max);
> +}
> +
> static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
> {
> if (!ctx) {
> @@ -342,6 +397,7 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
> qemu_mutex_init(&pool->lock);
> qemu_cond_init(&pool->worker_stopped);
> qemu_cond_init(&pool->request_cond);
> + qemu_cond_init(&pool->no_requests_cond);
> pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
>
> QLIST_INIT(&pool->head);
> @@ -382,6 +438,7 @@ void thread_pool_free(ThreadPool *pool)
> qemu_mutex_unlock(&pool->lock);
>
> qemu_bh_delete(pool->completion_bh);
> + qemu_cond_destroy(&pool->no_requests_cond);
> qemu_cond_destroy(&pool->request_cond);
> qemu_cond_destroy(&pool->worker_stopped);
> qemu_mutex_destroy(&pool->lock);
>