From: Wenchao Xia <xiawenc@linux.vnet.ibm.com>
To: Stefan Hajnoczi <stefanha@redhat.com>
Cc: Kevin Wolf <kwolf@redhat.com>,
Paolo Bonzini <pbonzini@redhat.com>,
qemu-devel@nongnu.org
Subject: Re: [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct ThreadPool
Date: Fri, 08 Mar 2013 11:18:56 +0800 [thread overview]
Message-ID: <513958A0.9020703@linux.vnet.ibm.com> (raw)
In-Reply-To: <1362584735-30911-2-git-send-email-stefanha@redhat.com>
Reviewed the patch; it is a pure removal of global variables.
Reviewed-by: Wenchao Xia <xiawenc@linux.vnet.ibm.com>
> Move global variables into a struct so multiple thread pools can be
> supported in the future.
>
> This patch does not change thread-pool.h interfaces. There is still a
> global thread pool and it is not yet possible to create/destroy
> individual thread pools. Moving the variables into a struct first makes
> later patches easier to review.
>
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
> thread-pool.c | 195 ++++++++++++++++++++++++++++++++++------------------------
> trace-events | 4 +-
> 2 files changed, 117 insertions(+), 82 deletions(-)
>
> diff --git a/thread-pool.c b/thread-pool.c
> index e3ca64d..8a957b9 100644
> --- a/thread-pool.c
> +++ b/thread-pool.c
> @@ -24,7 +24,9 @@
> #include "qemu/event_notifier.h"
> #include "block/thread-pool.h"
>
> -static void do_spawn_thread(void);
> +typedef struct ThreadPool ThreadPool;
> +
> +static void do_spawn_thread(ThreadPool *pool);
>
> typedef struct ThreadPoolElement ThreadPoolElement;
>
> @@ -37,6 +39,7 @@ enum ThreadState {
>
> struct ThreadPoolElement {
> BlockDriverAIOCB common;
> + ThreadPool *pool;
> ThreadPoolFunc *func;
> void *arg;
>
> @@ -54,49 +57,56 @@ struct ThreadPoolElement {
> QLIST_ENTRY(ThreadPoolElement) all;
> };
>
> -static EventNotifier notifier;
> -static QemuMutex lock;
> -static QemuCond check_cancel;
> -static QemuSemaphore sem;
> -static int max_threads = 64;
> -static QEMUBH *new_thread_bh;
> -
> -/* The following variables are protected by the global mutex. */
> -static QLIST_HEAD(, ThreadPoolElement) head;
> -
> -/* The following variables are protected by lock. */
> -static QTAILQ_HEAD(, ThreadPoolElement) request_list;
> -static int cur_threads;
> -static int idle_threads;
> -static int new_threads; /* backlog of threads we need to create */
> -static int pending_threads; /* threads created but not running yet */
> -static int pending_cancellations; /* whether we need a cond_broadcast */
> -
> -static void *worker_thread(void *unused)
> +struct ThreadPool {
> + EventNotifier notifier;
> + QemuMutex lock;
> + QemuCond check_cancel;
> + QemuSemaphore sem;
> + int max_threads;
> + QEMUBH *new_thread_bh;
> +
> + /* The following variables are only accessed from one AioContext. */
> + QLIST_HEAD(, ThreadPoolElement) head;
> +
> + /* The following variables are protected by lock. */
> + QTAILQ_HEAD(, ThreadPoolElement) request_list;
> + int cur_threads;
> + int idle_threads;
> + int new_threads; /* backlog of threads we need to create */
> + int pending_threads; /* threads created but not running yet */
> + int pending_cancellations; /* whether we need a cond_broadcast */
> +};
> +
> +/* Currently there is only one thread pool instance. */
> +static ThreadPool global_pool;
> +
> +static void *worker_thread(void *opaque)
> {
> - qemu_mutex_lock(&lock);
> - pending_threads--;
> - do_spawn_thread();
> + ThreadPool *pool = opaque;
> +
> + qemu_mutex_lock(&pool->lock);
> + pool->pending_threads--;
> + do_spawn_thread(pool);
>
> while (1) {
> ThreadPoolElement *req;
> int ret;
>
> do {
> - idle_threads++;
> - qemu_mutex_unlock(&lock);
> - ret = qemu_sem_timedwait(&sem, 10000);
> - qemu_mutex_lock(&lock);
> - idle_threads--;
> - } while (ret == -1 && !QTAILQ_EMPTY(&request_list));
> + pool->idle_threads++;
> + qemu_mutex_unlock(&pool->lock);
> + ret = qemu_sem_timedwait(&pool->sem, 10000);
> + qemu_mutex_lock(&pool->lock);
> + pool->idle_threads--;
> + } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
> if (ret == -1) {
> break;
> }
>
> - req = QTAILQ_FIRST(&request_list);
> - QTAILQ_REMOVE(&request_list, req, reqs);
> + req = QTAILQ_FIRST(&pool->request_list);
> + QTAILQ_REMOVE(&pool->request_list, req, reqs);
> req->state = THREAD_ACTIVE;
> - qemu_mutex_unlock(&lock);
> + qemu_mutex_unlock(&pool->lock);
>
> ret = req->func(req->arg);
>
> @@ -105,45 +115,47 @@ static void *worker_thread(void *unused)
> smp_wmb();
> req->state = THREAD_DONE;
>
> - qemu_mutex_lock(&lock);
> - if (pending_cancellations) {
> - qemu_cond_broadcast(&check_cancel);
> + qemu_mutex_lock(&pool->lock);
> + if (pool->pending_cancellations) {
> + qemu_cond_broadcast(&pool->check_cancel);
> }
>
> - event_notifier_set(&notifier);
> + event_notifier_set(&pool->notifier);
> }
>
> - cur_threads--;
> - qemu_mutex_unlock(&lock);
> + pool->cur_threads--;
> + qemu_mutex_unlock(&pool->lock);
> return NULL;
> }
>
> -static void do_spawn_thread(void)
> +static void do_spawn_thread(ThreadPool *pool)
> {
> QemuThread t;
>
> /* Runs with lock taken. */
> - if (!new_threads) {
> + if (!pool->new_threads) {
> return;
> }
>
> - new_threads--;
> - pending_threads++;
> + pool->new_threads--;
> + pool->pending_threads++;
>
> - qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED);
> + qemu_thread_create(&t, worker_thread, pool, QEMU_THREAD_DETACHED);
> }
>
> static void spawn_thread_bh_fn(void *opaque)
> {
> - qemu_mutex_lock(&lock);
> - do_spawn_thread();
> - qemu_mutex_unlock(&lock);
> + ThreadPool *pool = opaque;
> +
> + qemu_mutex_lock(&pool->lock);
> + do_spawn_thread(pool);
> + qemu_mutex_unlock(&pool->lock);
> }
>
> -static void spawn_thread(void)
> +static void spawn_thread(ThreadPool *pool)
> {
> - cur_threads++;
> - new_threads++;
> + pool->cur_threads++;
> + pool->new_threads++;
> /* If there are threads being created, they will spawn new workers, so
> * we don't spend time creating many threads in a loop holding a mutex or
> * starving the current vcpu.
> @@ -151,23 +163,25 @@ static void spawn_thread(void)
> * If there are no idle threads, ask the main thread to create one, so we
> * inherit the correct affinity instead of the vcpu affinity.
> */
> - if (!pending_threads) {
> - qemu_bh_schedule(new_thread_bh);
> + if (!pool->pending_threads) {
> + qemu_bh_schedule(pool->new_thread_bh);
> }
> }
>
> static void event_notifier_ready(EventNotifier *notifier)
> {
> + ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
> ThreadPoolElement *elem, *next;
>
> event_notifier_test_and_clear(notifier);
> restart:
> - QLIST_FOREACH_SAFE(elem, &head, all, next) {
> + QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
> if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
> continue;
> }
> if (elem->state == THREAD_DONE) {
> - trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
> + trace_thread_pool_complete(pool, elem, elem->common.opaque,
> + elem->ret);
> }
> if (elem->state == THREAD_DONE && elem->common.cb) {
> QLIST_REMOVE(elem, all);
> @@ -186,34 +200,36 @@ restart:
>
> static int thread_pool_active(EventNotifier *notifier)
> {
> - return !QLIST_EMPTY(&head);
> + ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
> + return !QLIST_EMPTY(&pool->head);
> }
>
> static void thread_pool_cancel(BlockDriverAIOCB *acb)
> {
> ThreadPoolElement *elem = (ThreadPoolElement *)acb;
> + ThreadPool *pool = elem->pool;
>
> trace_thread_pool_cancel(elem, elem->common.opaque);
>
> - qemu_mutex_lock(&lock);
> + qemu_mutex_lock(&pool->lock);
> if (elem->state == THREAD_QUEUED &&
> /* No thread has yet started working on elem. we can try to "steal"
> * the item from the worker if we can get a signal from the
> * semaphore. Because this is non-blocking, we can do it with
> * the lock taken and ensure that elem will remain THREAD_QUEUED.
> */
> - qemu_sem_timedwait(&sem, 0) == 0) {
> - QTAILQ_REMOVE(&request_list, elem, reqs);
> + qemu_sem_timedwait(&pool->sem, 0) == 0) {
> + QTAILQ_REMOVE(&pool->request_list, elem, reqs);
> elem->state = THREAD_CANCELED;
> - event_notifier_set(&notifier);
> + event_notifier_set(&pool->notifier);
> } else {
> - pending_cancellations++;
> + pool->pending_cancellations++;
> while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
> - qemu_cond_wait(&check_cancel, &lock);
> + qemu_cond_wait(&pool->check_cancel, &pool->lock);
> }
> - pending_cancellations--;
> + pool->pending_cancellations--;
> }
> - qemu_mutex_unlock(&lock);
> + qemu_mutex_unlock(&pool->lock);
> }
>
> static const AIOCBInfo thread_pool_aiocb_info = {
> @@ -224,24 +240,26 @@ static const AIOCBInfo thread_pool_aiocb_info = {
> BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
> BlockDriverCompletionFunc *cb, void *opaque)
> {
> + ThreadPool *pool = &global_pool;
> ThreadPoolElement *req;
>
> req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
> req->func = func;
> req->arg = arg;
> req->state = THREAD_QUEUED;
> + req->pool = pool;
>
> - QLIST_INSERT_HEAD(&head, req, all);
> + QLIST_INSERT_HEAD(&pool->head, req, all);
>
> - trace_thread_pool_submit(req, arg);
> + trace_thread_pool_submit(pool, req, arg);
>
> - qemu_mutex_lock(&lock);
> - if (idle_threads == 0 && cur_threads < max_threads) {
> - spawn_thread();
> + qemu_mutex_lock(&pool->lock);
> + if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
> + spawn_thread(pool);
> }
> - QTAILQ_INSERT_TAIL(&request_list, req, reqs);
> - qemu_mutex_unlock(&lock);
> - qemu_sem_post(&sem);
> + QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
> + qemu_mutex_unlock(&pool->lock);
> + qemu_sem_post(&pool->sem);
> return &req->common;
> }
>
> @@ -272,18 +290,35 @@ void thread_pool_submit(ThreadPoolFunc *func, void *arg)
> thread_pool_submit_aio(func, arg, NULL, NULL);
> }
>
> +static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
> +{
> + memset(pool, 0, sizeof(*pool));
> + event_notifier_init(&pool->notifier, false);
> + qemu_mutex_init(&pool->lock);
> + qemu_cond_init(&pool->check_cancel);
> + qemu_sem_init(&pool->sem, 0);
> + pool->max_threads = 64;
> + if (ctx) {
> + pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
> + } else {
> + pool->new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, pool);
> + }
> +
> + QLIST_INIT(&pool->head);
> + QTAILQ_INIT(&pool->request_list);
> +
> + if (ctx) {
> + aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready,
> + thread_pool_active);
> + } else {
> + qemu_aio_set_event_notifier(&pool->notifier, event_notifier_ready,
> + thread_pool_active);
> + }
> +}
> +
> static void thread_pool_init(void)
> {
> - QLIST_INIT(&head);
> - event_notifier_init(&notifier, false);
> - qemu_mutex_init(&lock);
> - qemu_cond_init(&check_cancel);
> - qemu_sem_init(&sem, 0);
> - qemu_aio_set_event_notifier(&notifier, event_notifier_ready,
> - thread_pool_active);
> -
> - QTAILQ_INIT(&request_list);
> - new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
> + thread_pool_init_one(&global_pool, NULL);
> }
>
> block_init(thread_pool_init)
> diff --git a/trace-events b/trace-events
> index a27ae43..f16c021 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -115,8 +115,8 @@ virtio_blk_data_plane_complete_request(void *s, unsigned int head, int ret) "dat
> vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
>
> # thread-pool.c
> -thread_pool_submit(void *req, void *opaque) "req %p opaque %p"
> -thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d"
> +thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
> +thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"
> thread_pool_cancel(void *req, void *opaque) "req %p opaque %p"
>
> # posix-aio-compat.c
>
--
Best Regards
Wenchao Xia
next prev parent reply other threads:[~2013-03-08 3:20 UTC|newest]
Thread overview: 14+ messages / expand[flat|nested] mbox.gz Atom feed top
2013-03-06 15:45 [Qemu-devel] [PATCH 0/5] threadpool: support multiple ThreadPools Stefan Hajnoczi
2013-03-06 15:45 ` [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct ThreadPool Stefan Hajnoczi
2013-03-06 17:25 ` Paolo Bonzini
2013-03-08 3:18 ` Wenchao Xia [this message]
2013-03-06 15:45 ` [Qemu-devel] [PATCH 2/5] threadpool: add thread_pool_new() and thread_pool_free() Stefan Hajnoczi
2013-03-06 16:36 ` Paolo Bonzini
2013-03-06 15:45 ` [Qemu-devel] [PATCH 3/5] aio: add a ThreadPool instance to AioContext Stefan Hajnoczi
2013-03-06 17:24 ` Paolo Bonzini
2013-03-07 10:02 ` Stefan Hajnoczi
2013-03-06 15:45 ` [Qemu-devel] [PATCH 4/5] main-loop: add qemu_get_aio_context() Stefan Hajnoczi
2013-03-06 17:26 ` Paolo Bonzini
2013-03-06 15:45 ` [Qemu-devel] [PATCH 5/5] threadpool: drop global thread pool Stefan Hajnoczi
2013-03-06 16:35 ` Paolo Bonzini
2013-03-07 10:07 ` Stefan Hajnoczi
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=513958A0.9020703@linux.vnet.ibm.com \
--to=xiawenc@linux.vnet.ibm.com \
--cc=kwolf@redhat.com \
--cc=pbonzini@redhat.com \
--cc=qemu-devel@nongnu.org \
--cc=stefanha@redhat.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.