From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from [140.186.70.92] (port=56365 helo=eggs.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1PZzRW-0001e8-T7 for qemu-devel@nongnu.org; Tue, 04 Jan 2011 00:28:24 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1PZzRU-0004lm-VH for qemu-devel@nongnu.org; Tue, 04 Jan 2011 00:28:22 -0500 Received: from e23smtp07.au.ibm.com ([202.81.31.140]:60524) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1PZzRT-0004la-Ve for qemu-devel@nongnu.org; Tue, 04 Jan 2011 00:28:20 -0500 Received: from d23relay03.au.ibm.com (d23relay03.au.ibm.com [202.81.31.245]) by e23smtp07.au.ibm.com (8.14.4/8.13.1) with ESMTP id p045SIL5022531 for ; Tue, 4 Jan 2011 16:28:18 +1100 Received: from d23av03.au.ibm.com (d23av03.au.ibm.com [9.190.234.97]) by d23relay03.au.ibm.com (8.13.8/8.13.8/NCO v10.0) with ESMTP id p045SIgD2343002 for ; Tue, 4 Jan 2011 16:28:18 +1100 Received: from d23av03.au.ibm.com (loopback [127.0.0.1]) by d23av03.au.ibm.com (8.14.4/8.13.1/NCO v10.0 AVout) with ESMTP id p045SI9R004721 for ; Tue, 4 Jan 2011 16:28:18 +1100 From: Arun R Bharadwaj Date: Tue, 04 Jan 2011 10:58:13 +0530 Message-ID: <20110104052813.15887.84972.stgit@localhost6.localdomain6> In-Reply-To: <20110104052627.15887.43436.stgit@localhost6.localdomain6> References: <20110104052627.15887.43436.stgit@localhost6.localdomain6> MIME-Version: 1.0 Content-Type: text/plain; charset="utf-8" Content-Transfer-Encoding: 7bit Subject: [Qemu-devel] [PATCH 11/13] Move threadlet code to qemu-threadlets.c List-Id: qemu-devel.nongnu.org List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: qemu-devel@nongnu.org Cc: aliguori@us.ibm.com, stefanha@linux.vnet.ibm.com, aneesh.kumar@linux.vnet.ibm.com This patch moves the threadlet queue API code to qemu-threadlets.c where these APIs can be used by other subsystems. Signed-off-by: Arun R Bharadwaj --- Makefile.objs | 1 posix-aio-compat.c | 151 ---------------------------------------------------- qemu-thread.h | 1 qemu-threadlets.c | 148 +++++++++++++++++++++++++++++++++++++++++++++++++++ qemu-threadlets.h | 43 +++++++++++++++ vl.c | 2 - 6 files changed, 194 insertions(+), 152 deletions(-) create mode 100644 qemu-threadlets.c create mode 100644 qemu-threadlets.h diff --git a/Makefile.objs b/Makefile.objs index 3b7ec27..2cf8aba 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -10,6 +10,7 @@ qobject-obj-y += qerror.o block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o block-obj-$(CONFIG_POSIX) += qemu-thread.o +block-obj-$(CONFIG_POSIX) += qemu-threadlets.o block-obj-$(CONFIG_POSIX) += posix-aio-compat.o block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o diff --git a/posix-aio-compat.c b/posix-aio-compat.c index d5874d9..e6de882 100644 --- a/posix-aio-compat.c +++ b/posix-aio-compat.c @@ -27,33 +27,13 @@ #include "qemu-common.h" #include "trace.h" #include "block_int.h" -#include "qemu-thread.h" +#include "qemu-threadlets.h" #include "block/raw-posix-aio.h" -#define MAX_GLOBAL_THREADS 64 -#define MIN_GLOBAL_THREADS 8 - static QemuMutex aiocb_mutex; static QemuCond aiocb_completion; -typedef struct ThreadletQueue -{ - QemuMutex lock; - QemuCond cond; - int max_threads; - int min_threads; - int cur_threads; - int idle_threads; - QTAILQ_HEAD(, ThreadletWork) request_list; -} ThreadletQueue; - -typedef struct ThreadletWork -{ - QTAILQ_ENTRY(ThreadletWork) node; - void (*func)(struct ThreadletWork *work); -} ThreadletWork; - struct qemu_paiocb { BlockDriverAIOCB common; int aio_fildes; @@ -80,10 +60,6 @@ typedef struct PosixAioState { struct qemu_paiocb *first_aio; } PosixAioState; -/* Default ThreadletQueue */ -static ThreadletQueue globalqueue; -static int globalqueue_init; - #ifdef CONFIG_PREADV static int preadv_present = 1; #else @@ -284,50 +260,6 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb) return nbytes; } -static void *threadlet_worker(void *data) -{ - ThreadletQueue *queue = data; - - qemu_mutex_lock(&queue->lock); - while (1) { - ThreadletWork *work; - int ret = 0; - - while (QTAILQ_EMPTY(&queue->request_list) && - (ret != ETIMEDOUT)) { - /* wait for cond to be signalled or broadcast for 1000s */ - ret = qemu_cond_timedwait((&queue->cond), - &(queue->lock), 10*100000); - } - - assert(queue->idle_threads != 0); - if (QTAILQ_EMPTY(&queue->request_list)) { - if (queue->cur_threads > queue->min_threads) { - /* We retain the minimum number of threads */ - break; - } - } else { - work = QTAILQ_FIRST(&queue->request_list); - QTAILQ_REMOVE(&queue->request_list, work, node); - - queue->idle_threads--; - qemu_mutex_unlock(&queue->lock); - - /* execute the work function */ - work->func(work); - - qemu_mutex_lock(&queue->lock); - queue->idle_threads++; - } - } - - queue->idle_threads--; - queue->cur_threads--; - qemu_mutex_unlock(&queue->lock); - - return NULL; -} - static PosixAioState *posix_aio_state; static void aio_thread(ThreadletWork *work) @@ -373,68 +305,6 @@ static void aio_thread(ThreadletWork *work) return; } -static void threadlet_io_completion_signal_handler(int signum) -{ - qemu_service_io(); -} - -static void threadlet_register_signal_handler(void) -{ - struct sigaction act; - sigfillset(&act.sa_mask); - act.sa_flags = 0; /* do not restart syscalls to interrupt select() */ - act.sa_handler = threadlet_io_completion_signal_handler; - sigaction(SIGUSR2, &act, NULL); -} - -void threadlet_init(void) -{ - threadlet_register_signal_handler(); -} - -static void spawn_threadlet(ThreadletQueue *queue) -{ - QemuThread thread; - - queue->cur_threads++; - queue->idle_threads++; - - qemu_thread_create(&thread, threadlet_worker, queue); -} - - -/** - * submit_work: Submit to the global queue a new task to be executed - * asynchronously. - * @work: Contains information about the task that needs to be submitted. - */ -static void submit_work(ThreadletWork *work) -{ - qemu_mutex_lock(&globalqueue.lock); - - if (!globalqueue_init) { - globalqueue.cur_threads = 0; - globalqueue.idle_threads = 0; - globalqueue.max_threads = MAX_GLOBAL_THREADS; - globalqueue.min_threads = MIN_GLOBAL_THREADS; - QTAILQ_INIT(&globalqueue.request_list); - qemu_mutex_init(&globalqueue.lock); - qemu_cond_init(&globalqueue.cond); - - globalqueue_init = 1; - } - - if (globalqueue.idle_threads == 0 && - globalqueue.cur_threads < globalqueue.max_threads) { - spawn_threadlet(&globalqueue); - - } else { - qemu_cond_signal(&globalqueue.cond); - } - QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node); - qemu_mutex_unlock(&globalqueue.lock); -} - static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb) { ssize_t ret; @@ -567,25 +437,6 @@ static void paio_remove(struct qemu_paiocb *acb) } } -/** - * dequeue_work: Cancel a task queued on the global queue. - * @work: Contains the information of the task that needs to be cancelled. - * - * Returns: 0 if the task is successfully cancelled. - * 1 otherwise. - */ -static int dequeue_work(ThreadletWork *work) -{ - int ret = 1; - - qemu_mutex_lock(&globalqueue.lock); - QTAILQ_REMOVE(&globalqueue.request_list, work, node); - ret = 0; - qemu_mutex_unlock(&globalqueue.lock); - - return ret; -} - static void paio_cancel(BlockDriverAIOCB *blockacb) { struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb; diff --git a/qemu-thread.h b/qemu-thread.h index c5b579c..19bb30c 100644 --- a/qemu-thread.h +++ b/qemu-thread.h @@ -40,6 +40,5 @@ void qemu_thread_signal(QemuThread *thread, int sig); void qemu_thread_self(QemuThread *thread); int qemu_thread_equal(QemuThread *thread1, QemuThread *thread2); void qemu_thread_exit(void *retval); -void threadlet_init(void); #endif diff --git a/qemu-threadlets.c b/qemu-threadlets.c new file mode 100644 index 0000000..4702c02 --- /dev/null +++ b/qemu-threadlets.c @@ -0,0 +1,148 @@ +/* + * Threadlet support for offloading tasks to be executed asynchronously + * + * Copyright IBM, Corp. 2008 + * Copyright IBM, Corp. 2010 + * + * Authors: + * Anthony Liguori + * Aneesh Kumar K.V + * Gautham R Shenoy + * Arun R Bharadwaj + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + */ + +#include + +#include "qemu-threadlets.h" +#include "osdep.h" + +#define MAX_GLOBAL_THREADS 64 +#define MIN_GLOBAL_THREADS 8 + +static ThreadletQueue globalqueue; +static int globalqueue_init; + +static void threadlet_io_completion_signal_handler(int signum) +{ + qemu_service_io(); +} + +static void threadlet_register_signal_handler(void) +{ + struct sigaction act; + sigfillset(&act.sa_mask); + act.sa_flags = 0; /* do not restart syscalls to interrupt select() */ + act.sa_handler = threadlet_io_completion_signal_handler; + sigaction(SIGUSR2, &act, NULL); +} + +void threadlet_init(void) +{ + threadlet_register_signal_handler(); +} + +static void *threadlet_worker(void *data) +{ + ThreadletQueue *queue = data; + + qemu_mutex_lock(&queue->lock); + while (1) { + ThreadletWork *work; + int ret = 0; + + while (QTAILQ_EMPTY(&queue->request_list) && + (ret != ETIMEDOUT)) { + /* wait for cond to be signalled or broadcast for 1000s */ + ret = qemu_cond_timedwait((&queue->cond), + &(queue->lock), 10*100000); + } + + assert(queue->idle_threads != 0); + if (QTAILQ_EMPTY(&queue->request_list)) { + if (queue->cur_threads > queue->min_threads) { + /* We retain the minimum number of threads */ + break; + } + } else { + work = QTAILQ_FIRST(&queue->request_list); + QTAILQ_REMOVE(&queue->request_list, work, node); + + queue->idle_threads--; + qemu_mutex_unlock(&queue->lock); + + /* execute the work function */ + work->func(work); + + qemu_mutex_lock(&queue->lock); + queue->idle_threads++; + } + } + + queue->idle_threads--; + queue->cur_threads--; + qemu_mutex_unlock(&queue->lock); + + return NULL; +} + +static void spawn_threadlet(ThreadletQueue *queue) +{ + QemuThread thread; + + queue->cur_threads++; + queue->idle_threads++; + + qemu_thread_create(&thread, threadlet_worker, queue); +} + +/** + * submit_work: Submit to the global queue a new task to be executed + * asynchronously. + * @work: Contains information about the task that needs to be submitted. + */ +void submit_work(ThreadletWork *work) +{ + if (!globalqueue_init) { + globalqueue.cur_threads = 0; + globalqueue.idle_threads = 0; + globalqueue.max_threads = MAX_GLOBAL_THREADS; + globalqueue.min_threads = MIN_GLOBAL_THREADS; + QTAILQ_INIT(&globalqueue.request_list); + qemu_mutex_init(&globalqueue.lock); + qemu_cond_init(&globalqueue.cond); + + globalqueue_init = 1; + } + + qemu_mutex_lock(&globalqueue.lock); + if (globalqueue.idle_threads == 0 && + globalqueue.cur_threads < globalqueue.max_threads) { + spawn_threadlet(&globalqueue); + } else { + qemu_cond_signal(&globalqueue.cond); + } + QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node); + qemu_mutex_unlock(&globalqueue.lock); +} + +/** + * dequeue_work: Cancel a task queued on the global queue. + * @work: Contains the information of the task that needs to be cancelled. + * + * Returns: 0 if the task is successfully cancelled. + * 1 otherwise. + */ +int dequeue_work(ThreadletWork *work) +{ + int ret = 1; + + qemu_mutex_lock(&globalqueue.lock); + QTAILQ_REMOVE(&globalqueue.request_list, work, node); + ret = 0; + qemu_mutex_unlock(&globalqueue.lock); + + return ret; +} diff --git a/qemu-threadlets.h b/qemu-threadlets.h new file mode 100644 index 0000000..03bb86b --- /dev/null +++ b/qemu-threadlets.h @@ -0,0 +1,43 @@ +/* + * Threadlet support for offloading tasks to be executed asynchronously + * + * Copyright IBM, Corp. 2008 + * Copyright IBM, Corp. 2010 + * + * Authors: + * Anthony Liguori + * Aneesh Kumar K.V + * Gautham R Shenoy + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + */ + +#ifndef QEMU_ASYNC_WORK_H +#define QEMU_ASYNC_WORK_H + +#include "qemu-queue.h" +#include "qemu-common.h" +#include "qemu-thread.h" + +typedef struct ThreadletQueue +{ + QemuMutex lock; + QemuCond cond; + int max_threads; + int min_threads; + int cur_threads; + int idle_threads; + QTAILQ_HEAD(, ThreadletWork) request_list; +} ThreadletQueue; + +typedef struct ThreadletWork +{ + QTAILQ_ENTRY(ThreadletWork) node; + void (*func)(struct ThreadletWork *work); +} ThreadletWork; + +void submit_work(ThreadletWork *work); +int dequeue_work(ThreadletWork *work); +void threadlet_init(void); +#endif diff --git a/vl.c b/vl.c index aba805f..7b9a425 100644 --- a/vl.c +++ b/vl.c @@ -148,7 +148,7 @@ int main(int argc, char **argv) #include "qemu-config.h" #include "qemu-objects.h" #include "qemu-options.h" -#include "qemu-thread.h" +#include "qemu-threadlets.h" #ifdef CONFIG_VIRTFS #include "fsdev/qemu-fsdev.h" #endif