From: "Cédric Le Goater" <clg@redhat.com>
To: qemu-devel@nongnu.org
Cc: "Alex Williamson" <alex.williamson@redhat.com>,
	"Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>,
	"Fabiano Rosas" <farosas@suse.de>, "Peter Xu" <peterx@redhat.com>,
	"Cédric Le Goater" <clg@redhat.com>
Subject: [PULL 13/42] thread-pool: Implement generic (non-AIO) pool support
Date: Thu,  6 Mar 2025 15:13:49 +0100
Message-ID: <20250306141419.2015340-14-clg@redhat.com>
In-Reply-To: <20250306141419.2015340-1-clg@redhat.com>

From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>

Migration code wants to manage device data sending threads in one place.

QEMU has an existing thread pool implementation; however, it is limited
to queuing AIO operations only and essentially has a 1:1 mapping between
the current AioContext and the AIO ThreadPool in use.

Implement a generic (non-AIO) ThreadPool by essentially wrapping GLib's
GThreadPool.

This brings a few new operations on a pool (a short usage sketch follows
the list):
* thread_pool_wait() waits until all the submitted work requests
have finished.

* thread_pool_set_max_threads() explicitly sets the maximum thread count
in the pool.

* thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count
in the pool to match the number of work items still queued or in progress.
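
A minimal, hypothetical usage sketch of the new API (not part of the patch):
do_send_chunk() and send_all_chunks() are invented names, and only the
thread_pool_*() calls are the functions this patch adds. It assumes the
existing int-returning ThreadPoolFunc signature; the generic pool ignores
the return value.

    /* One unit of work; its return value is ignored by the generic pool. */
    static int do_send_chunk(void *opaque)
    {
        /* ... send one chunk of device data ... */
        return 0;
    }

    static void send_all_chunks(void **chunks, size_t n)
    {
        ThreadPool *pool = thread_pool_new();   /* never returns NULL */
        size_t i;

        for (i = 0; i < n; i++) {
            thread_pool_submit(pool, do_send_chunk, chunks[i], NULL);
        }

        /*
         * The pool starts with no threads, so raise the limit (here to one
         * thread per queued task) before waiting for the work to finish.
         */
        thread_pool_adjust_max_threads_to_work(pool);

        /* Barrier: every submitted chunk has been processed on return. */
        thread_pool_wait(pool);

        /* The pool could be reused for another batch; here it is freed. */
        thread_pool_free(pool);
    }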

Reviewed-by: Fabiano Rosas <farosas@suse.de>
Reviewed-by: Peter Xu <peterx@redhat.com>
Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
Link: https://lore.kernel.org/qemu-devel/b1efaebdbea7cb7068b8fb74148777012383e12b.1741124640.git.maciej.szmigiero@oracle.com
Signed-off-by: Cédric Le Goater <clg@redhat.com>
---
 include/block/thread-pool.h |  51 ++++++++++++++++
 util/thread-pool.c          | 119 ++++++++++++++++++++++++++++++++++++
 2 files changed, 170 insertions(+)

diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 6f27eb085b451ecc712aeff385944541a729c83d..dd48cf07e85f886e9efc44916e4018141f352233 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -38,5 +38,56 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
 int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
 void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
 
+/* ------------------------------------------- */
+/* Generic thread pool types and methods below */
+typedef struct ThreadPool ThreadPool;
+
+/* Create a new thread pool. Never returns NULL. */
+ThreadPool *thread_pool_new(void);
+
+/*
+ * Free the thread pool.
+ * Waits for all the previously submitted work to complete before performing
+ * the actual freeing operation.
+ */
+void thread_pool_free(ThreadPool *pool);
+
+/*
+ * Submit a new work (task) for the pool.
+ *
+ * @opaque_destroy is an optional GDestroyNotify for the @opaque argument
+ * to the work function at @func.
+ */
+void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
+                        void *opaque, GDestroyNotify opaque_destroy);
+
+/*
+ * Submit a new work (task) for the pool, making sure it starts getting
+ * processed immediately, launching a new thread for it if necessary.
+ *
+ * @opaque_destroy is an optional GDestroyNotify for the @opaque argument
+ * to the work function at @func.
+ */
+void thread_pool_submit_immediate(ThreadPool *pool, ThreadPoolFunc *func,
+                                  void *opaque, GDestroyNotify opaque_destroy);
+
+/*
+ * Wait for all previously submitted work to complete before returning.
+ *
+ * Can be used as a barrier between two sets of tasks executed on a thread
+ * pool without destroying it, or in a performance-sensitive path where the
+ * caller just wants to wait for all tasks to complete while deferring the
+ * pool free operation to a later, less performance-sensitive time.
+ */
+void thread_pool_wait(ThreadPool *pool);
+
+/* Set the maximum number of threads in the pool. */
+bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);
+
+/*
+ * Adjust the maximum number of threads in the pool to give each task its
+ * own thread (exactly one thread per task).
+ */
+bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
 
 #endif
diff --git a/util/thread-pool.c b/util/thread-pool.c
index 908194dc070fb90390c493f0c7fb10062dcb5c15..d2ead6b7285765821f0fa956132b207b5985b015 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -374,3 +374,122 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
     qemu_mutex_destroy(&pool->lock);
     g_free(pool);
 }
+
+struct ThreadPool {
+    GThreadPool *t;
+    size_t cur_work;
+    QemuMutex cur_work_lock;
+    QemuCond all_finished_cond;
+};
+
+typedef struct {
+    ThreadPoolFunc *func;
+    void *opaque;
+    GDestroyNotify opaque_destroy;
+} ThreadPoolElement;
+
+static void thread_pool_func(gpointer data, gpointer user_data)
+{
+    ThreadPool *pool = user_data;
+    g_autofree ThreadPoolElement *el = data;
+
+    el->func(el->opaque);
+
+    if (el->opaque_destroy) {
+        el->opaque_destroy(el->opaque);
+    }
+
+    QEMU_LOCK_GUARD(&pool->cur_work_lock);
+
+    assert(pool->cur_work > 0);
+    pool->cur_work--;
+
+    if (pool->cur_work == 0) {
+        qemu_cond_signal(&pool->all_finished_cond);
+    }
+}
+
+ThreadPool *thread_pool_new(void)
+{
+    ThreadPool *pool = g_new(ThreadPool, 1);
+
+    pool->cur_work = 0;
+    qemu_mutex_init(&pool->cur_work_lock);
+    qemu_cond_init(&pool->all_finished_cond);
+
+    pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
+    /*
+     * g_thread_pool_new() can only return errors if initial thread(s)
+     * creation fails but we ask for 0 initial threads above.
+     */
+    assert(pool->t);
+
+    return pool;
+}
+
+void thread_pool_free(ThreadPool *pool)
+{
+    /*
+     * With _wait = TRUE this effectively waits for all
+     * previously submitted work to complete first.
+     */
+    g_thread_pool_free(pool->t, FALSE, TRUE);
+
+    qemu_cond_destroy(&pool->all_finished_cond);
+    qemu_mutex_destroy(&pool->cur_work_lock);
+
+    g_free(pool);
+}
+
+void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
+                        void *opaque, GDestroyNotify opaque_destroy)
+{
+    ThreadPoolElement *el = g_new(ThreadPoolElement, 1);
+
+    el->func = func;
+    el->opaque = opaque;
+    el->opaque_destroy = opaque_destroy;
+
+    WITH_QEMU_LOCK_GUARD(&pool->cur_work_lock) {
+        pool->cur_work++;
+    }
+
+    /*
+     * Ignore the return value since this function can only return errors
+     * if creation of an additional thread fails but even in this case the
+     * provided work is still getting queued (just for the existing threads).
+     */
+    g_thread_pool_push(pool->t, el, NULL);
+}
+
+void thread_pool_submit_immediate(ThreadPool *pool, ThreadPoolFunc *func,
+                                  void *opaque, GDestroyNotify opaque_destroy)
+{
+    thread_pool_submit(pool, func, opaque, opaque_destroy);
+    thread_pool_adjust_max_threads_to_work(pool);
+}
+
+void thread_pool_wait(ThreadPool *pool)
+{
+    QEMU_LOCK_GUARD(&pool->cur_work_lock);
+
+    while (pool->cur_work > 0) {
+        qemu_cond_wait(&pool->all_finished_cond,
+                       &pool->cur_work_lock);
+    }
+}
+
+bool thread_pool_set_max_threads(ThreadPool *pool,
+                                 int max_threads)
+{
+    assert(max_threads > 0);
+
+    return g_thread_pool_set_max_threads(pool->t, max_threads, NULL);
+}
+
+bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool)
+{
+    QEMU_LOCK_GUARD(&pool->cur_work_lock);
+
+    return thread_pool_set_max_threads(pool, pool->cur_work);
+}
-- 
2.48.1



Thread overview: 44+ messages
2025-03-06 14:13 [PULL 00/42] vfio queue Cédric Le Goater
2025-03-06 14:13 ` [PULL 01/42] vfio: Add property documentation Cédric Le Goater
2025-03-06 14:13 ` [PULL 02/42] vfio/ccw: Replace warn_once_pfch() with warn_report_once() Cédric Le Goater
2025-03-06 14:13 ` [PULL 03/42] hw/pci: Basic support for PCI power management Cédric Le Goater
2025-03-06 14:13 ` [PULL 04/42] pci: Use PCI PM capability initializer Cédric Le Goater
2025-03-06 14:13 ` [PULL 05/42] vfio/pci: Delete local pm_cap Cédric Le Goater
2025-03-06 14:13 ` [PULL 06/42] pcie, virtio: Remove redundant pm_cap Cédric Le Goater
2025-03-06 14:13 ` [PULL 07/42] hw/vfio/pci: Re-order pre-reset Cédric Le Goater
2025-03-06 14:13 ` [PULL 08/42] MAINTAINERS: Add myself as vfio-igd maintainer Cédric Le Goater
2025-03-06 14:13 ` [PULL 09/42] vfio-platform: Deprecate all forms of vfio-platform devices Cédric Le Goater
2025-03-06 14:13 ` [PULL 10/42] migration: Clarify that {load, save}_cleanup handlers can run without setup Cédric Le Goater
2025-03-06 14:13 ` [PULL 11/42] thread-pool: Remove thread_pool_submit() function Cédric Le Goater
2025-03-06 14:13 ` [PULL 12/42] thread-pool: Rename AIO pool functions to *_aio() and data types to *Aio Cédric Le Goater
2025-03-06 14:13 ` [PULL 13/42] thread-pool: Implement generic (non-AIO) pool support Cédric Le Goater [this message]
2025-03-06 14:13 ` [PULL 14/42] migration: Add MIG_CMD_SWITCHOVER_START and its load handler Cédric Le Goater
2025-03-06 14:13 ` [PULL 15/42] migration: Add qemu_loadvm_load_state_buffer() and its handler Cédric Le Goater
2025-03-06 14:13 ` [PULL 16/42] migration: Always take BQL for migration_incoming_state_destroy() Cédric Le Goater
2025-03-06 14:13 ` [PULL 17/42] error: define g_autoptr() cleanup function for the Error type Cédric Le Goater
2025-03-06 14:13 ` [PULL 18/42] migration: Add thread pool of optional load threads Cédric Le Goater
2025-03-06 14:13 ` [PULL 19/42] migration/multifd: Split packet into header and RAM data Cédric Le Goater
2025-03-06 14:13 ` [PULL 20/42] migration/multifd: Device state transfer support - receive side Cédric Le Goater
2025-03-06 14:13 ` [PULL 21/42] migration/multifd: Make multifd_send() thread safe Cédric Le Goater
2025-03-06 14:13 ` [PULL 22/42] migration/multifd: Add an explicit MultiFDSendData destructor Cédric Le Goater
2025-03-06 14:13 ` [PULL 23/42] migration/multifd: Device state transfer support - send side Cédric Le Goater
2025-03-06 14:14 ` [PULL 24/42] migration/multifd: Make MultiFDSendData a struct Cédric Le Goater
2025-03-06 14:14 ` [PULL 25/42] migration/multifd: Add multifd_device_state_supported() Cédric Le Goater
2025-03-06 14:14 ` [PULL 26/42] migration: Add save_live_complete_precopy_thread handler Cédric Le Goater
2025-03-06 14:14 ` [PULL 27/42] vfio/migration: Add load_device_config_state_start trace event Cédric Le Goater
2025-03-06 14:14 ` [PULL 28/42] vfio/migration: Convert bytes_transferred counter to atomic Cédric Le Goater
2025-03-06 14:14 ` [PULL 29/42] vfio/migration: Add vfio_add_bytes_transferred() Cédric Le Goater
2025-03-06 14:14 ` [PULL 30/42] vfio/migration: Move migration channel flags to vfio-common.h header file Cédric Le Goater
2025-03-06 14:14 ` [PULL 31/42] vfio/migration: Multifd device state transfer support - basic types Cédric Le Goater
2025-03-06 14:14 ` [PULL 32/42] vfio/migration: Multifd device state transfer - add support checking function Cédric Le Goater
2025-03-06 14:14 ` [PULL 33/42] vfio/migration: Multifd setup/cleanup functions and associated VFIOMultifd Cédric Le Goater
2025-03-06 14:14 ` [PULL 34/42] vfio/migration: Setup and cleanup multifd transfer in these general methods Cédric Le Goater
2025-03-06 14:14 ` [PULL 35/42] vfio/migration: Multifd device state transfer support - received buffers queuing Cédric Le Goater
2025-03-06 14:14 ` [PULL 36/42] vfio/migration: Multifd device state transfer support - load thread Cédric Le Goater
2025-03-06 14:14 ` [PULL 37/42] migration/qemu-file: Define g_autoptr() cleanup function for QEMUFile Cédric Le Goater
2025-03-06 14:14 ` [PULL 38/42] vfio/migration: Multifd device state transfer support - config loading support Cédric Le Goater
2025-03-06 14:14 ` [PULL 39/42] vfio/migration: Multifd device state transfer support - send side Cédric Le Goater
2025-03-06 14:14 ` [PULL 40/42] vfio/migration: Add x-migration-multifd-transfer VFIO property Cédric Le Goater
2025-03-06 14:14 ` [PULL 41/42] vfio/migration: Make x-migration-multifd-transfer VFIO property mutable Cédric Le Goater
2025-03-06 14:14 ` [PULL 42/42] hw/core/machine: Add compat for x-migration-multifd-transfer VFIO property Cédric Le Goater
2025-03-07  7:18 ` [PULL 00/42] vfio queue Stefan Hajnoczi
