Message-ID: <513958A0.9020703@linux.vnet.ibm.com>
Date: Fri, 08 Mar 2013 11:18:56 +0800
From: Wenchao Xia
MIME-Version: 1.0
References: <1362584735-30911-1-git-send-email-stefanha@redhat.com> <1362584735-30911-2-git-send-email-stefanha@redhat.com>
In-Reply-To: <1362584735-30911-2-git-send-email-stefanha@redhat.com>
Content-Type: text/plain; charset=GB2312
Content-Transfer-Encoding: 7bit
Subject: Re: [Qemu-devel] [PATCH 1/5] threadpool: move globals into struct ThreadPool
To: Stefan Hajnoczi
Cc: Kevin Wolf , Paolo Bonzini , qemu-devel@nongnu.org

Reviewed the patch; it is a pure removal of the global variables.

Reviewed-by: Wenchao Xia

> Move global variables into a struct so multiple thread pools can be
> supported in the future.
>
> This patch does not change thread-pool.h interfaces. There is still a
> global thread pool and it is not yet possible to create/destroy
> individual thread pools. Moving the variables into a struct first makes
> later patches easier to review.
>
> Signed-off-by: Stefan Hajnoczi
> ---
>  thread-pool.c | 195 ++++++++++++++++++++++++++++++++++------------------------
>  trace-events  |   4 +-
>  2 files changed, 117 insertions(+), 82 deletions(-)
>
> diff --git a/thread-pool.c b/thread-pool.c
> index e3ca64d..8a957b9 100644
> --- a/thread-pool.c
> +++ b/thread-pool.c
> @@ -24,7 +24,9 @@
>  #include "qemu/event_notifier.h"
>  #include "block/thread-pool.h"
>
> -static void do_spawn_thread(void);
> +typedef struct ThreadPool ThreadPool;
> +
> +static void do_spawn_thread(ThreadPool *pool);
>
>  typedef struct ThreadPoolElement ThreadPoolElement;
>
> @@ -37,6 +39,7 @@ enum ThreadState {
>
>  struct ThreadPoolElement {
>      BlockDriverAIOCB common;
> +    ThreadPool *pool;
>      ThreadPoolFunc *func;
>      void *arg;
>
> @@ -54,49 +57,56 @@ struct ThreadPoolElement {
>      QLIST_ENTRY(ThreadPoolElement) all;
>  };
>
> -static EventNotifier notifier;
> -static QemuMutex lock;
> -static QemuCond check_cancel;
> -static QemuSemaphore sem;
> -static int max_threads = 64;
> -static QEMUBH *new_thread_bh;
> -
> -/* The following variables are protected by the global mutex.  */
> -static QLIST_HEAD(, ThreadPoolElement) head;
> -
> -/* The following variables are protected by lock. */
> -static QTAILQ_HEAD(, ThreadPoolElement) request_list;
> -static int cur_threads;
> -static int idle_threads;
> -static int new_threads;     /* backlog of threads we need to create */
> -static int pending_threads; /* threads created but not running yet */
> -static int pending_cancellations; /* whether we need a cond_broadcast */
> -
> -static void *worker_thread(void *unused)
> +struct ThreadPool {
> +    EventNotifier notifier;
> +    QemuMutex lock;
> +    QemuCond check_cancel;
> +    QemuSemaphore sem;
> +    int max_threads;
> +    QEMUBH *new_thread_bh;
> +
> +    /* The following variables are only accessed from one AioContext. */
> +    QLIST_HEAD(, ThreadPoolElement) head;
> +
> +    /* The following variables are protected by lock. */
> +    QTAILQ_HEAD(, ThreadPoolElement) request_list;
> +    int cur_threads;
> +    int idle_threads;
> +    int new_threads;     /* backlog of threads we need to create */
> +    int pending_threads; /* threads created but not running yet */
> +    int pending_cancellations; /* whether we need a cond_broadcast */
> +};
> +
> +/* Currently there is only one thread pool instance. */
> +static ThreadPool global_pool;
> +
> +static void *worker_thread(void *opaque)
>  {
> -    qemu_mutex_lock(&lock);
> -    pending_threads--;
> -    do_spawn_thread();
> +    ThreadPool *pool = opaque;
> +
> +    qemu_mutex_lock(&pool->lock);
> +    pool->pending_threads--;
> +    do_spawn_thread(pool);
>
>      while (1) {
>          ThreadPoolElement *req;
>          int ret;
>
>          do {
> -            idle_threads++;
> -            qemu_mutex_unlock(&lock);
> -            ret = qemu_sem_timedwait(&sem, 10000);
> -            qemu_mutex_lock(&lock);
> -            idle_threads--;
> -        } while (ret == -1 && !QTAILQ_EMPTY(&request_list));
> +            pool->idle_threads++;
> +            qemu_mutex_unlock(&pool->lock);
> +            ret = qemu_sem_timedwait(&pool->sem, 10000);
> +            qemu_mutex_lock(&pool->lock);
> +            pool->idle_threads--;
> +        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
>          if (ret == -1) {
>              break;
>          }
>
> -        req = QTAILQ_FIRST(&request_list);
> -        QTAILQ_REMOVE(&request_list, req, reqs);
> +        req = QTAILQ_FIRST(&pool->request_list);
> +        QTAILQ_REMOVE(&pool->request_list, req, reqs);
>          req->state = THREAD_ACTIVE;
> -        qemu_mutex_unlock(&lock);
> +        qemu_mutex_unlock(&pool->lock);
>
>          ret = req->func(req->arg);
>
> @@ -105,45 +115,47 @@ static void *worker_thread(void *unused)
>          smp_wmb();
>          req->state = THREAD_DONE;
>
> -        qemu_mutex_lock(&lock);
> -        if (pending_cancellations) {
> -            qemu_cond_broadcast(&check_cancel);
> +        qemu_mutex_lock(&pool->lock);
> +        if (pool->pending_cancellations) {
> +            qemu_cond_broadcast(&pool->check_cancel);
>          }
>
> -        event_notifier_set(&notifier);
> +        event_notifier_set(&pool->notifier);
>      }
>
> -    cur_threads--;
> -    qemu_mutex_unlock(&lock);
> +    pool->cur_threads--;
> +    qemu_mutex_unlock(&pool->lock);
>      return NULL;
>  }
>
> -static void do_spawn_thread(void)
> +static void do_spawn_thread(ThreadPool *pool)
>  {
>      QemuThread t;
>
>      /* Runs with lock taken.  */
> -    if (!new_threads) {
> +    if (!pool->new_threads) {
>          return;
>      }
>
> -    new_threads--;
> -    pending_threads++;
> +    pool->new_threads--;
> +    pool->pending_threads++;
>
> -    qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED);
> +    qemu_thread_create(&t, worker_thread, pool, QEMU_THREAD_DETACHED);
>  }
>
>  static void spawn_thread_bh_fn(void *opaque)
>  {
> -    qemu_mutex_lock(&lock);
> -    do_spawn_thread();
> -    qemu_mutex_unlock(&lock);
> +    ThreadPool *pool = opaque;
> +
> +    qemu_mutex_lock(&pool->lock);
> +    do_spawn_thread(pool);
> +    qemu_mutex_unlock(&pool->lock);
>  }
>
> -static void spawn_thread(void)
> +static void spawn_thread(ThreadPool *pool)
>  {
> -    cur_threads++;
> -    new_threads++;
> +    pool->cur_threads++;
> +    pool->new_threads++;
>      /* If there are threads being created, they will spawn new workers, so
>       * we don't spend time creating many threads in a loop holding a mutex or
>       * starving the current vcpu.
> @@ -151,23 +163,25 @@ static void spawn_thread(void)
>       * If there are no idle threads, ask the main thread to create one, so we
>       * inherit the correct affinity instead of the vcpu affinity.
>       */
> -    if (!pending_threads) {
> -        qemu_bh_schedule(new_thread_bh);
> +    if (!pool->pending_threads) {
> +        qemu_bh_schedule(pool->new_thread_bh);
>      }
>  }
>
>  static void event_notifier_ready(EventNotifier *notifier)
>  {
> +    ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
>      ThreadPoolElement *elem, *next;
>
>      event_notifier_test_and_clear(notifier);
>  restart:
> -    QLIST_FOREACH_SAFE(elem, &head, all, next) {
> +    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
>          if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
>              continue;
>          }
>          if (elem->state == THREAD_DONE) {
> -            trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
> +            trace_thread_pool_complete(pool, elem, elem->common.opaque,
> +                                       elem->ret);
>          }
>          if (elem->state == THREAD_DONE && elem->common.cb) {
>              QLIST_REMOVE(elem, all);
> @@ -186,34 +200,36 @@ restart:
>
>  static int thread_pool_active(EventNotifier *notifier)
>  {
> -    return !QLIST_EMPTY(&head);
> +    ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
> +    return !QLIST_EMPTY(&pool->head);
>  }
>
>  static void thread_pool_cancel(BlockDriverAIOCB *acb)
>  {
>      ThreadPoolElement *elem = (ThreadPoolElement *)acb;
> +    ThreadPool *pool = elem->pool;
>
>      trace_thread_pool_cancel(elem, elem->common.opaque);
>
> -    qemu_mutex_lock(&lock);
> +    qemu_mutex_lock(&pool->lock);
>      if (elem->state == THREAD_QUEUED &&
>          /* No thread has yet started working on elem. we can try to "steal"
>           * the item from the worker if we can get a signal from the
>           * semaphore. Because this is non-blocking, we can do it with
>           * the lock taken and ensure that elem will remain THREAD_QUEUED.
>           */
> -        qemu_sem_timedwait(&sem, 0) == 0) {
> -        QTAILQ_REMOVE(&request_list, elem, reqs);
> +        qemu_sem_timedwait(&pool->sem, 0) == 0) {
> +        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
>          elem->state = THREAD_CANCELED;
> -        event_notifier_set(&notifier);
> +        event_notifier_set(&pool->notifier);
>      } else {
> -        pending_cancellations++;
> +        pool->pending_cancellations++;
>          while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
> -            qemu_cond_wait(&check_cancel, &lock);
> +            qemu_cond_wait(&pool->check_cancel, &pool->lock);
>          }
> -        pending_cancellations--;
> +        pool->pending_cancellations--;
>      }
> -    qemu_mutex_unlock(&lock);
> +    qemu_mutex_unlock(&pool->lock);
>  }
>
>  static const AIOCBInfo thread_pool_aiocb_info = {
> @@ -224,24 +240,26 @@ static const AIOCBInfo thread_pool_aiocb_info = {
>  BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>          BlockDriverCompletionFunc *cb, void *opaque)
>  {
> +    ThreadPool *pool = &global_pool;
>      ThreadPoolElement *req;
>
>      req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
>      req->func = func;
>      req->arg = arg;
>      req->state = THREAD_QUEUED;
> +    req->pool = pool;
>
> -    QLIST_INSERT_HEAD(&head, req, all);
> +    QLIST_INSERT_HEAD(&pool->head, req, all);
>
> -    trace_thread_pool_submit(req, arg);
> +    trace_thread_pool_submit(pool, req, arg);
>
> -    qemu_mutex_lock(&lock);
> -    if (idle_threads == 0 && cur_threads < max_threads) {
> -        spawn_thread();
> +    qemu_mutex_lock(&pool->lock);
> +    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
> +        spawn_thread(pool);
>      }
> -    QTAILQ_INSERT_TAIL(&request_list, req, reqs);
> -    qemu_mutex_unlock(&lock);
> -    qemu_sem_post(&sem);
> +    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
> +    qemu_mutex_unlock(&pool->lock);
> +    qemu_sem_post(&pool->sem);
>      return &req->common;
>  }
>
> @@ -272,18 +290,35 @@ void thread_pool_submit(ThreadPoolFunc *func, void *arg)
>      thread_pool_submit_aio(func, arg, NULL, NULL);
>  }
>
> +static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
> +{
> +    memset(pool, 0, sizeof(*pool));
> +    event_notifier_init(&pool->notifier, false);
> +    qemu_mutex_init(&pool->lock);
> +    qemu_cond_init(&pool->check_cancel);
> +    qemu_sem_init(&pool->sem, 0);
> +    pool->max_threads = 64;
> +    if (ctx) {
> +        pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
> +    } else {
> +        pool->new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, pool);
> +    }
> +
> +    QLIST_INIT(&pool->head);
> +    QTAILQ_INIT(&pool->request_list);
> +
> +    if (ctx) {
> +        aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready,
> +                               thread_pool_active);
> +    } else {
> +        qemu_aio_set_event_notifier(&pool->notifier, event_notifier_ready,
> +                                    thread_pool_active);
> +    }
> +}
> +
>  static void thread_pool_init(void)
>  {
> -    QLIST_INIT(&head);
> -    event_notifier_init(&notifier, false);
> -    qemu_mutex_init(&lock);
> -    qemu_cond_init(&check_cancel);
> -    qemu_sem_init(&sem, 0);
> -    qemu_aio_set_event_notifier(&notifier, event_notifier_ready,
> -                                thread_pool_active);
> -
> -    QTAILQ_INIT(&request_list);
> -    new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
> +    thread_pool_init_one(&global_pool, NULL);
>  }
>
>  block_init(thread_pool_init)
> diff --git a/trace-events b/trace-events
> index a27ae43..f16c021 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -115,8 +115,8 @@ virtio_blk_data_plane_complete_request(void *s, unsigned int head, int ret) "dat
>  vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
>
>  # thread-pool.c
> -thread_pool_submit(void *req, void *opaque) "req %p opaque %p"
> -thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d"
> +thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
> +thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"
>  thread_pool_cancel(void *req, void *opaque) "req %p opaque %p"
>
>  # posix-aio-compat.c
>

-- 
Best Regards

Wenchao Xia
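
As an aside, the part of the patch that may be least obvious on first reading is how event_notifier_ready() and thread_pool_active() find their ThreadPool again: the EventNotifier is embedded inside the struct, so container_of() can compute the struct's address from the member's address. Below is a minimal stand-alone sketch of that idiom. It is not part of the patch; the types and names are simplified stand-ins, and the macro is a simplified version of QEMU's container_of:

/* Hypothetical example: recover the enclosing struct from a pointer to
 * one of its members, the same trick event_notifier_ready() uses. */
#include <stdio.h>
#include <stddef.h>

/* Simplified container_of: subtract the member's offset from its address. */
#define container_of(ptr, type, member) \
    ((type *) ((char *) (ptr) - offsetof(type, member)))

typedef struct EventNotifier { int dummy; } EventNotifier;

typedef struct ThreadPool {
    EventNotifier notifier;   /* embedded member, as in the patch */
    int cur_threads;
} ThreadPool;

/* The callback only receives the member pointer, like event_notifier_ready(). */
static void notifier_ready(EventNotifier *notifier)
{
    ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
    printf("pool %p has %d thread(s)\n", (void *) pool, pool->cur_threads);
}

int main(void)
{
    ThreadPool pool = { .cur_threads = 2 };
    notifier_ready(&pool.notifier);
    return 0;
}

QEMU's real container_of additionally type-checks the member pointer with typeof, but the arithmetic is the same: subtract the member's offset within the struct from the member's address.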