qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
From: Stefan Hajnoczi <stefanha@redhat.com>
To: qemu-devel@nongnu.org
Cc: Anthony Liguori <aliguori@us.ibm.com>
Subject: [Qemu-devel] [PATCH 20/28] threadpool: add thread_pool_new() and thread_pool_free()
Date: Fri, 15 Mar 2013 16:14:17 +0100	[thread overview]
Message-ID: <1363360465-5247-21-git-send-email-stefanha@redhat.com> (raw)
In-Reply-To: <1363360465-5247-1-git-send-email-stefanha@redhat.com>

ThreadPool is tied to an AioContext through its event notifier, which
dictates in which AioContext the work item's callback function will be
invoked.

In order to support multiple AioContexts we need to support multiple
ThreadPool instances.

This patch adds the new/free functions.  The free function deserves
special attention because it quiesces remaining worker threads.  This
requires a new condition variable and a "stopping" flag to let workers
know they should terminate once idle.

We never needed to do this before since the global threadpool was not
explicitly destroyed until process termination.

Also stash the AioContext pointer in ThreadPool so that we can call
aio_set_event_notifier() in thread_pool_free().  We didn't need to hold
onto AioContext previously since there was no free function.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/block/thread-pool.h |  5 +++++
 thread-pool.c               | 52 +++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 200703e..e1453c6 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -26,6 +26,11 @@
 
 typedef int ThreadPoolFunc(void *opaque);
 
+typedef struct ThreadPool ThreadPool;
+
+ThreadPool *thread_pool_new(struct AioContext *ctx);
+void thread_pool_free(ThreadPool *pool);
+
 BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
      BlockDriverCompletionFunc *cb, void *opaque);
 int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
diff --git a/thread-pool.c b/thread-pool.c
index a0aecd0..d1e4570 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -24,8 +24,6 @@
 #include "qemu/event_notifier.h"
 #include "block/thread-pool.h"
 
-typedef struct ThreadPool ThreadPool;
-
 static void do_spawn_thread(ThreadPool *pool);
 
 typedef struct ThreadPoolElement ThreadPoolElement;
@@ -59,8 +57,10 @@ struct ThreadPoolElement {
 
 struct ThreadPool {
     EventNotifier notifier;
+    AioContext *ctx;
     QemuMutex lock;
     QemuCond check_cancel;
+    QemuCond worker_stopped;
     QemuSemaphore sem;
     int max_threads;
     QEMUBH *new_thread_bh;
@@ -75,6 +75,7 @@ struct ThreadPool {
     int new_threads;     /* backlog of threads we need to create */
     int pending_threads; /* threads created but not running yet */
     int pending_cancellations; /* whether we need a cond_broadcast */
+    bool stopping;
 };
 
 /* Currently there is only one thread pool instance. */
@@ -88,7 +89,7 @@ static void *worker_thread(void *opaque)
     pool->pending_threads--;
     do_spawn_thread(pool);
 
-    while (1) {
+    while (!pool->stopping) {
         ThreadPoolElement *req;
         int ret;
 
@@ -99,7 +100,7 @@ static void *worker_thread(void *opaque)
             qemu_mutex_lock(&pool->lock);
             pool->idle_threads--;
         } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
-        if (ret == -1) {
+        if (ret == -1 || pool->stopping) {
             break;
         }
 
@@ -124,6 +125,7 @@ static void *worker_thread(void *opaque)
     }
 
     pool->cur_threads--;
+    qemu_cond_signal(&pool->worker_stopped);
     qemu_mutex_unlock(&pool->lock);
     return NULL;
 }
@@ -298,8 +300,10 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
 
     memset(pool, 0, sizeof(*pool));
     event_notifier_init(&pool->notifier, false);
+    pool->ctx = ctx;
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->check_cancel);
+    qemu_cond_init(&pool->worker_stopped);
     qemu_sem_init(&pool->sem, 0);
     pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
@@ -311,6 +315,46 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
                            thread_pool_active);
 }
 
+ThreadPool *thread_pool_new(AioContext *ctx)
+{
+    ThreadPool *pool = g_new(ThreadPool, 1);
+    thread_pool_init_one(pool, ctx);
+    return pool;
+}
+
+void thread_pool_free(ThreadPool *pool)
+{
+    if (!pool) {
+        return;
+    }
+
+    assert(QLIST_EMPTY(&pool->head));
+
+    qemu_mutex_lock(&pool->lock);
+
+    /* Stop new threads from spawning */
+    qemu_bh_delete(pool->new_thread_bh);
+    pool->cur_threads -= pool->new_threads;
+    pool->new_threads = 0;
+
+    /* Wait for worker threads to terminate */
+    pool->stopping = true;
+    while (pool->cur_threads > 0) {
+        qemu_sem_post(&pool->sem);
+        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
+    }
+
+    qemu_mutex_unlock(&pool->lock);
+
+    aio_set_event_notifier(pool->ctx, &pool->notifier, NULL, NULL);
+    qemu_sem_destroy(&pool->sem);
+    qemu_cond_destroy(&pool->check_cancel);
+    qemu_cond_destroy(&pool->worker_stopped);
+    qemu_mutex_destroy(&pool->lock);
+    event_notifier_cleanup(&pool->notifier);
+    g_free(pool);
+}
+
 static void thread_pool_init(void)
 {
     thread_pool_init_one(&global_pool, NULL);
-- 
1.8.1.4

  parent reply	other threads:[~2013-03-15 15:15 UTC|newest]

Thread overview: 32+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2013-03-15 15:13 [Qemu-devel] [PULL 00/28] Block patches Stefan Hajnoczi
2013-03-15 15:13 ` [Qemu-devel] [PATCH 01/28] block: Add options QDict to .bdrv_open() Stefan Hajnoczi
2013-03-15 15:13 ` [Qemu-devel] [PATCH 02/28] block: Add options QDict to bdrv_open() prototype Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 03/28] Add qdict_clone_shallow() Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 04/28] block: Add options QDict to bdrv_open_common() Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 05/28] qemu-option: Add qemu_opts_absorb_qdict() Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 06/28] blockdev: Keep a copy of DriveInfo.serial Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 07/28] block: Support driver specific options in drive_init() Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 08/28] qcow2: Allow lazy refcounts to be enabled on the command line Stefan Hajnoczi
2013-03-15 17:02   ` Paolo Bonzini
2013-03-15 17:35     ` Anthony Liguori
2013-03-15 15:14 ` [Qemu-devel] [PATCH 09/28] qcow2: flush refcount cache correctly in alloc_refcount_block() Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 10/28] qcow2: flush refcount cache correctly in qcow2_write_snapshots() Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 11/28] qcow2: set L2 cache dependency in qcow2_alloc_bytes() Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 12/28] qcow2: flush in qcow2_update_snapshot_refcount() Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 13/28] qcow2: drop flush in update_cluster_refcount() Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 14/28] qcow2: drop unnecessary flush in qcow2_update_snapshot_refcount() Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 15/28] qcow2: make is_allocated return true for zero clusters Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 16/28] sheepdog: use non-blocking fd in coroutine context Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 17/28] sheepdog: set io_flush handler in do_co_req Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 18/28] main-loop: add qemu_get_aio_context() Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 19/28] threadpool: move globals into struct ThreadPool Stefan Hajnoczi
2013-03-15 15:14 ` Stefan Hajnoczi [this message]
2013-03-15 15:14 ` [Qemu-devel] [PATCH 21/28] aio: add a ThreadPool instance to AioContext Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 22/28] block: add bdrv_get_aio_context() Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 23/28] threadpool: drop global thread pool Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 24/28] coroutine: use AioContext for CoQueue BH Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 25/28] dataplane: fix hang introduced by AioContext transition Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 26/28] qemu-iotests: add tests for rebasing zero clusters Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 27/28] qemu-iotests: use -nographic in test case 007 Stefan Hajnoczi
2013-03-15 15:14 ` [Qemu-devel] [PATCH 28/28] blockdev: Fix up copyright and permission notice Stefan Hajnoczi
2013-03-15 15:46 ` [Qemu-devel] [PULL 00/28] Block patches Anthony Liguori

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1363360465-5247-21-git-send-email-stefanha@redhat.com \
    --to=stefanha@redhat.com \
    --cc=aliguori@us.ibm.com \
    --cc=qemu-devel@nongnu.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).