* [Qemu-devel] [RFC/ PATCH 0/2] qemu: Asynchronous task offloading framework.
@ 2010-05-10 6:15 Gautham R Shenoy
2010-05-10 6:15 ` [Qemu-devel] [RFC/ PATCH 1/2] qemu: Create a generic asynchronous threading framework to offload tasks Gautham R Shenoy
2010-05-10 6:15 ` [Qemu-devel] [RFC/ PATCH 2/2] qemu: Convert paio code to use the generic threading infrastructure Gautham R Shenoy
0 siblings, 2 replies; 4+ messages in thread
From: Gautham R Shenoy @ 2010-05-10 6:15 UTC (permalink / raw)
To: Qemu-development List
Hi,
This patch series is strictly RFC only.
It implements a generic asynchronous task offloading threading framework
based on the threading model used in the AIO subsystem.
Currently within QEMU, AIO subsystem is the only one which creates a bunch of
asynchronous threads to execute any blocking operations so that the vcpu threads
and the IO thread can go back to servicing any other guest requests.
We would want to make use of this offloading model in case of virtio-9p where the
asynchronous threads handle the POSIX operations and transfer the control to the
IO thread which, depending on current-state in the state-machine, either offloads
more work to be handled by the asynchronous thread pool or sends the results of
the operation to the guest.
For this purpose, we need a generic task-offloading infrastructure, and this
patch series implements just that.
The patch series passed fsstress test without any issues.
Awaiting your comments.
---
Aneesh Kumar K.V (1):
qemu: Create a generic asynchronous threading framework to offload tasks
Gautham R Shenoy (1):
qemu: Convert paio code to use the generic threading infrastructure.
Makefile.objs | 2 +
async-work.c | 149 ++++++++++++++++++++++++++++++++++++++++++++++++++
async-work.h | 85 ++++++++++++++++++++++++++++
posix-aio-compat.c | 156 ++++++++++------------------------------------------
4 files changed, 266 insertions(+), 126 deletions(-)
create mode 100644 async-work.c
create mode 100644 async-work.h
--
Thanks and Regards
gautham.
^ permalink raw reply [flat|nested] 4+ messages in thread
* [Qemu-devel] [RFC/ PATCH 1/2] qemu: Create a generic asynchronous threading framework to offload tasks
2010-05-10 6:15 [Qemu-devel] [RFC/ PATCH 0/2] qemu: Asynchronous task offloading framework Gautham R Shenoy
@ 2010-05-10 6:15 ` Gautham R Shenoy
2010-05-10 6:15 ` [Qemu-devel] [RFC/ PATCH 2/2] qemu: Convert paio code to use the generic threading infrastructure Gautham R Shenoy
1 sibling, 0 replies; 4+ messages in thread
From: Gautham R Shenoy @ 2010-05-10 6:15 UTC (permalink / raw)
To: Qemu-development List
From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
This patch creates a generic asynchronous-task-offloading infrastructure. It's
based on the threading framework that was being used by paio.
The reason why the generic infrastructure has been extracted out of the
posix-aio-compat.c is so that other subsystems, such as virtio-9p could make use
of it for offloading tasks that could block.
[ego@in.ibm.com: work_item_pool, async_work_init, async_work_release,
async_cancel_work]
Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
---
Makefile.objs | 2 +
async-work.c | 149 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
async-work.h | 85 +++++++++++++++++++++++++++++++++
3 files changed, 236 insertions(+), 0 deletions(-)
create mode 100644 async-work.c
create mode 100644 async-work.h
diff --git a/Makefile.objs b/Makefile.objs
index 32fc37f..7a94fff 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-y += async-work.o
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
@@ -44,6 +45,7 @@ fsdev-obj-$(CONFIG_LINUX_VIRTFS) += $(addprefix fsdev/, $(fsdev-nested-y))
# system emulation, i.e. a single QEMU executable should support all
# CPUs and machines.
+#common-obj-y = $(asyncwork-obj-y)
common-obj-y = $(block-obj-y)
common-obj-y += $(net-obj-y)
common-obj-y += $(qobject-obj-y)
diff --git a/async-work.c b/async-work.c
new file mode 100644
index 0000000..3f28475
--- /dev/null
+++ b/async-work.c
@@ -0,0 +1,149 @@
+/*
+ * Async work support
+ *
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ *
+ */
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <signal.h>
+#include "async-work.h"
+#include "osdep.h"
+
+/*
+ * Report a fatal error from the async-work code and terminate.
+ *
+ * err:  error number, rendered as text with strerror()
+ * what: name of the operation that failed
+ *
+ * Does not return: the process is ended with abort().
+ */
+static void async_abort(int err, const char *what)
+{
+    const char *reason = strerror(err);
+
+    fprintf(stderr, "%s failed: %s\n", what, reason);
+    abort();
+}
+
+/*
+ * Body of every pool thread: take work items off queue->request_list and
+ * run them until the thread retires.
+ *
+ * data: the struct async_queue this thread serves.
+ *
+ * The thread retires
+ *   (a) when the 10 second deadline passes and the request list is still
+ *       empty, or
+ *   (b) right after finishing an item, if the pool currently holds more
+ *       than min_threads threads.
+ * Both exits leave the loop with queue->lock held; the thread counters are
+ * adjusted and the lock is dropped after the loop.
+ *
+ * A pthread_cond_timedwait() failure other than ETIMEDOUT is fatal.
+ *
+ * Always returns NULL.
+ */
+static void *async_worker_thread(void *data)
+{
+    struct async_queue *queue = data;
+
+    while (1) {
+        struct work_item *work;
+        int ret = 0;
+        qemu_timeval tv;
+        struct timespec ts;
+
+        /* absolute deadline: 10 seconds from now */
+        qemu_gettimeofday(&tv);
+        ts.tv_sec = tv.tv_sec + 10;
+        ts.tv_nsec = 0;
+
+        pthread_mutex_lock(&(queue->lock));
+
+        while (QTAILQ_EMPTY(&(queue->request_list)) &&
+               (ret != ETIMEDOUT)) {
+            ret = pthread_cond_timedwait(&(queue->cond),
+                                         &(queue->lock), &ts);
+            /*
+             * Anything but success or a timeout means the wait itself is
+             * broken; looping again on an empty list would never end.
+             */
+            if (ret && ret != ETIMEDOUT) {
+                async_abort(ret, "pthread_cond_timedwait");
+            }
+        }
+
+        if (QTAILQ_EMPTY(&(queue->request_list))) {
+            /* nothing arrived before the deadline: retire, lock held */
+            break;
+        }
+
+        work = QTAILQ_FIRST(&(queue->request_list));
+        QTAILQ_REMOVE(&(queue->request_list), work, node);
+        queue->idle_threads--;
+        pthread_mutex_unlock(&(queue->lock));
+
+        /* execute the work function without holding the queue lock */
+        work->func(work);
+
+        pthread_mutex_lock(&(queue->lock));
+        queue->idle_threads++;
+        if ((queue->idle_threads > 0) &&
+            (queue->cur_threads > queue->min_threads)) {
+            /* we retain minimum number of threads; retire, lock held */
+            break;
+        }
+        pthread_mutex_unlock(&(queue->lock));
+    }
+
+    /* reached only via break, i.e. with queue->lock held */
+    queue->idle_threads--;
+    queue->cur_threads--;
+    pthread_mutex_unlock(&(queue->lock));
+
+    return NULL;
+}
+
+/*
+ * Start one more detached worker thread for the queue.
+ *
+ * queue: pool to grow; its cur_threads and idle_threads counters are
+ *        incremented here without taking queue->lock, so the caller must
+ *        hold it (qemu_async_submit() does).
+ *
+ * Every signal is blocked in the calling thread around pthread_create()
+ * and the previous mask is restored afterwards.  Any failure is fatal and
+ * ends the process through async_abort().
+ */
+static void spawn_async_thread(struct async_queue *queue)
+{
+    int ret;
+    pthread_attr_t attr;
+    pthread_t thread_id;
+    sigset_t set, oldset;
+
+    queue->cur_threads++;
+    queue->idle_threads++;
+    ret = pthread_attr_init(&attr);
+    if (ret) {
+        async_abort(ret, "pthread_attr_init");
+    }
+
+    /* create a detached thread so that we don't need to wait on it */
+    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+    if (ret) {
+        async_abort(ret, "pthread_attr_setdetachstate");
+    }
+
+    /* block all signals */
+    if (sigfillset(&set)) {
+        async_abort(errno, "sigfillset");
+    }
+
+    /*
+     * pthread_sigmask() instead of sigprocmask(): the latter is
+     * unspecified in a multi-threaded process.  It reports failure through
+     * its return value, not errno.
+     */
+    ret = pthread_sigmask(SIG_SETMASK, &set, &oldset);
+    if (ret) {
+        async_abort(ret, "pthread_sigmask");
+    }
+
+    ret = pthread_create(&thread_id, &attr, async_worker_thread, queue);
+    if (ret) {
+        async_abort(ret, "pthread_create");
+    }
+
+    ret = pthread_sigmask(SIG_SETMASK, &oldset, NULL);
+    if (ret) {
+        async_abort(ret, "pthread_sigmask restore");
+    }
+
+    /* the attribute object is only needed for the create call above */
+    ret = pthread_attr_destroy(&attr);
+    if (ret) {
+        async_abort(ret, "pthread_attr_destroy");
+    }
+}
+
+/*
+ * Hand a work item to the pool.
+ *
+ * queue: pool that will run the item
+ * work:  item to append to the tail of queue->request_list
+ *
+ * A new worker thread is spawned first when no thread is idle and the pool
+ * is still below max_threads.  The condition variable is signalled after
+ * the queue lock has been released.
+ */
+void qemu_async_submit(struct async_queue *queue, struct work_item *work)
+{
+    int all_busy;
+    int may_grow;
+
+    pthread_mutex_lock(&(queue->lock));
+
+    all_busy = (queue->idle_threads == 0);
+    may_grow = (queue->cur_threads < queue->max_threads);
+    if (all_busy && may_grow) {
+        spawn_async_thread(queue);
+    }
+
+    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
+
+    pthread_mutex_unlock(&(queue->lock));
+
+    /* wake one waiting worker */
+    pthread_cond_signal(&(queue->cond));
+}
+
+/*
+ * Try to withdraw a work item that is still waiting in the request list.
+ *
+ * queue: pool the item was submitted to
+ * work:  item to look for
+ *
+ * Returns 0 when the item was found on queue->request_list; it is then
+ * unlinked and handed to async_work_release().  Returns 1 when the item is
+ * not on the list, in which case nothing is changed.
+ */
+int qemu_async_cancel_work(struct async_queue *queue, struct work_item *work)
+{
+    struct work_item *iter;
+    struct work_item *hit = NULL;
+
+    pthread_mutex_lock(&(queue->lock));
+    QTAILQ_FOREACH(iter, &(queue->request_list), node) {
+        if (iter != work) {
+            continue;
+        }
+        QTAILQ_REMOVE(&(queue->request_list), iter, node);
+        hit = iter;
+        break;
+    }
+    pthread_mutex_unlock(&(queue->lock));
+
+    if (hit == NULL) {
+        return 1;
+    }
+
+    /* release takes the queue lock itself, so call it unlocked */
+    async_work_release(queue, work);
+    return 0;
+}
+
diff --git a/async-work.h b/async-work.h
new file mode 100644
index 0000000..eef60f7
--- /dev/null
+++ b/async-work.h
@@ -0,0 +1,85 @@
+/*
+ * Async work support
+ *
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ *
+ */
+#ifndef QEMU_ASYNC_WORK_H
+#define QEMU_ASYNC_WORK_H
+
+#include <pthread.h>
+#include "qemu-queue.h"
+#include "qemu-common.h"
+
+/*
+ * A pool of worker threads together with the work they still have to do.
+ * Set up with async_queue_init().
+ */
+struct async_queue
+{
+ /* held around every access to the two lists below */
+ pthread_mutex_t lock;
+ /* signalled by qemu_async_submit(); worker threads wait on it */
+ pthread_cond_t cond;
+ /* no new thread is spawned once cur_threads reaches this value */
+ int max_threads;
+ /* a worker exits after an item if cur_threads is above this value */
+ int min_threads;
+ /* threads spawned and not yet exited */
+ int cur_threads;
+ /* threads that are not currently running a work function */
+ int idle_threads;
+ /* submitted items waiting for a worker thread */
+ QTAILQ_HEAD(, work_item) request_list;
+ /* released items kept for reuse by async_work_init() */
+ QTAILQ_HEAD(, work_item) work_item_pool;
+};
+
+/*
+ * One unit of work for an async_queue.  Obtained from async_work_init()
+ * and given back with async_work_release().
+ */
+struct work_item
+{
+ /* links the item into request_list or work_item_pool */
+ QTAILQ_ENTRY(work_item) node;
+ /* called by a worker thread with the item itself as argument */
+ void (*func)(struct work_item *work);
+ /* caller data stored by async_work_init() */
+ void *private;
+};
+
+/*
+ * Prepare an async_queue for use: no threads, empty lists, fresh mutex and
+ * condition variable with default attributes.
+ *
+ * queue:       queue to initialise
+ * max_threads: stored as the upper bound on worker threads
+ * min_threads: stored as the number of threads kept after finishing work
+ *
+ * Note the argument order: the maximum comes before the minimum.
+ */
+static inline void async_queue_init(struct async_queue *queue,
+                                    int max_threads, int min_threads)
+{
+    /* synchronisation primitives */
+    pthread_mutex_init(&(queue->lock), NULL);
+    pthread_cond_init(&(queue->cond), NULL);
+
+    /* both lists start out empty */
+    QTAILQ_INIT(&(queue->work_item_pool));
+    QTAILQ_INIT(&(queue->request_list));
+
+    /* limits as requested, no threads yet */
+    queue->min_threads = min_threads;
+    queue->max_threads = max_threads;
+    queue->idle_threads = 0;
+    queue->cur_threads = 0;
+}
+
+/*
+ * Get a work item bound to the given function and data.
+ *
+ * queue: queue whose work_item_pool is consulted for a reusable item
+ * func:  function a worker thread will call with the item
+ * data:  stored in the item's private field
+ *
+ * An item is taken from the head of queue->work_item_pool when one is
+ * available; otherwise a new one is allocated with qemu_mallocz().  The
+ * whole operation runs under queue->lock.
+ *
+ * Returns the item; it is not yet on the request list.
+ */
+static inline struct work_item *async_work_init(struct async_queue *queue,
+                                                void (*func)(struct work_item *),
+                                                void *data)
+{
+    struct work_item *item;
+
+    pthread_mutex_lock(&(queue->lock));
+
+    if (!QTAILQ_EMPTY(&(queue->work_item_pool))) {
+        /* recycle a previously released item */
+        item = QTAILQ_FIRST(&(queue->work_item_pool));
+        QTAILQ_REMOVE(&(queue->work_item_pool), item, node);
+    } else {
+        item = qemu_mallocz(sizeof(*item));
+    }
+
+    item->private = data;
+    item->func = func;
+
+    pthread_mutex_unlock(&(queue->lock));
+
+    return item;
+}
+
+/*
+ * Give a work item back to the queue for later reuse.
+ *
+ * queue: queue whose work_item_pool receives the item
+ * work:  item to append to the tail of the pool
+ *
+ * Takes queue->lock itself, so it must be called without that lock held.
+ * The item is not freed.
+ */
+static inline void async_work_release(struct async_queue *queue,
+ struct work_item *work)
+{
+ pthread_mutex_lock(&(queue->lock));
+ QTAILQ_INSERT_TAIL(&(queue->work_item_pool), work, node);
+ pthread_mutex_unlock(&(queue->lock));
+}
+
+/* Append work to the queue's request list and wake a worker thread. */
+extern void qemu_async_submit(struct async_queue *queue,
+ struct work_item *work);
+
+/*
+ * Remove work from the queue's request list if it is still there.
+ * Returns 0 if it was removed (the item is then released back to the
+ * queue), 1 if it was not found on the list.
+ */
+extern int qemu_async_cancel_work(struct async_queue *queue,
+ struct work_item *work);
+#endif
^ permalink raw reply related [flat|nested] 4+ messages in thread
* [Qemu-devel] [RFC/ PATCH 2/2] qemu: Convert paio code to use the generic threading infrastructure.
2010-05-10 6:15 [Qemu-devel] [RFC/ PATCH 0/2] qemu: Asynchronous task offloading framework Gautham R Shenoy
2010-05-10 6:15 ` [Qemu-devel] [RFC/ PATCH 1/2] qemu: Create a generic asynchronous threading framework to offload tasks Gautham R Shenoy
@ 2010-05-10 6:15 ` Gautham R Shenoy
2010-05-13 5:52 ` Venkateswararao Jujjuri (JV)
1 sibling, 1 reply; 4+ messages in thread
From: Gautham R Shenoy @ 2010-05-10 6:15 UTC (permalink / raw)
To: Qemu-development List
This patch makes the paio subsystem use the generic work offloading
infrastructure.
The patch has been tested with fsstress.
Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
---
posix-aio-compat.c | 156 ++++++++++------------------------------------------
1 files changed, 30 insertions(+), 126 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index b43c531..1b405e4 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -28,6 +28,7 @@
#include "block_int.h"
#include "block/raw-posix-aio.h"
+#include "async-work.h"
struct qemu_paiocb {
@@ -50,6 +51,7 @@ struct qemu_paiocb {
struct qemu_paiocb *next;
int async_context_id;
+ struct work_item *work;
};
typedef struct PosixAioState {
@@ -57,15 +59,8 @@ typedef struct PosixAioState {
struct qemu_paiocb *first_aio;
} PosixAioState;
-
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
+static struct async_queue aio_request_list;
#ifdef CONFIG_PREADV
static int preadv_present = 1;
@@ -84,39 +79,6 @@ static void die(const char *what)
die2(errno, what);
}
-static void mutex_lock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_lock(mutex);
- if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_unlock(mutex);
- if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
- struct timespec *ts)
-{
- int ret = pthread_cond_timedwait(cond, mutex, ts);
- if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
- return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
- int ret = pthread_cond_signal(cond);
- if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void*), void *arg)
-{
- int ret = pthread_create(thread, attr, start_routine, arg);
- if (ret) die2(ret, "pthread_create");
-}
-
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
int ret;
@@ -300,47 +262,27 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void *aio_thread(void *unused)
+static void aio_thread(struct work_item *work)
{
- pid_t pid;
- pid = getpid();
-
- while (1) {
- struct qemu_paiocb *aiocb;
- ssize_t ret = 0;
- qemu_timeval tv;
- struct timespec ts;
-
- qemu_gettimeofday(&tv);
- ts.tv_sec = tv.tv_sec + 10;
- ts.tv_nsec = 0;
-
- mutex_lock(&lock);
+ pid_t pid;
- while (QTAILQ_EMPTY(&request_list) &&
- !(ret == ETIMEDOUT)) {
- ret = cond_timedwait(&cond, &lock, &ts);
- }
+ struct qemu_paiocb *aiocb = (struct qemu_paiocb *) work->private;
+ ssize_t ret = 0;
- if (QTAILQ_EMPTY(&request_list))
- break;
+ pid = getpid();
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
- aiocb->active = 1;
- idle_threads--;
- mutex_unlock(&lock);
+ aiocb->active = 1;
- switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- case QEMU_AIO_WRITE:
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ case QEMU_AIO_WRITE:
ret = handle_aiocb_rw(aiocb);
break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
+ case QEMU_AIO_FLUSH:
+ ret = handle_aiocb_flush(aiocb);
+ break;
+ case QEMU_AIO_IOCTL:
ret = handle_aiocb_ioctl(aiocb);
break;
default:
@@ -349,57 +291,29 @@ static void *aio_thread(void *unused)
break;
}
- mutex_lock(&lock);
- aiocb->ret = ret;
- idle_threads++;
- mutex_unlock(&lock);
-
- if (kill(pid, aiocb->ev_signo)) die("kill failed");
- }
-
- idle_threads--;
- cur_threads--;
- mutex_unlock(&lock);
+ aiocb->ret = ret;
- return NULL;
-}
-
-static void spawn_thread(void)
-{
- sigset_t set, oldset;
-
- cur_threads++;
- idle_threads++;
-
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
- thread_create(&thread_id, &attr, aio_thread, NULL);
-
- if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+ if (kill(pid, aiocb->ev_signo)) die("kill failed");
+ async_work_release(&aio_request_list, work);
}
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
+ struct work_item *work;
+
aiocb->ret = -EINPROGRESS;
aiocb->active = 0;
- mutex_lock(&lock);
- if (idle_threads == 0 && cur_threads < max_threads)
- spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
- mutex_unlock(&lock);
- cond_signal(&cond);
+
+ work = async_work_init(&aio_request_list, aio_thread, aiocb);
+ aiocb->work = work;
+ qemu_async_submit(&aio_request_list, work);
}
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
- mutex_lock(&lock);
ret = aiocb->ret;
- mutex_unlock(&lock);
-
return ret;
}
@@ -535,14 +449,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
int active = 0;
- mutex_lock(&lock);
if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
- acb->ret = -ECANCELED;
+ if (!qemu_async_cancel_work(&aio_request_list, acb->work))
+ acb->ret = -ECANCELED;
+ else
+ active = 1;
} else if (acb->ret == -EINPROGRESS) {
active = 1;
}
- mutex_unlock(&lock);
if (active) {
/* fail safe: if the aio could not be canceled, we wait for
@@ -615,7 +529,6 @@ int paio_init(void)
struct sigaction act;
PosixAioState *s;
int fds[2];
- int ret;
if (posix_aio_state)
return 0;
@@ -642,16 +555,7 @@ int paio_init(void)
qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
posix_aio_process_queue, s);
- ret = pthread_attr_init(&attr);
- if (ret)
- die2(ret, "pthread_attr_init");
-
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret)
- die2(ret, "pthread_attr_setdetachstate");
-
- QTAILQ_INIT(&request_list);
-
posix_aio_state = s;
+ async_queue_init(&aio_request_list, max_threads, max_threads);
return 0;
}
^ permalink raw reply related [flat|nested] 4+ messages in thread
* Re: [Qemu-devel] [RFC/ PATCH 2/2] qemu: Convert paio code to use the generic threading infrastructure.
2010-05-10 6:15 ` [Qemu-devel] [RFC/ PATCH 2/2] qemu: Convert paio code to use the generic threading infrastructure Gautham R Shenoy
@ 2010-05-13 5:52 ` Venkateswararao Jujjuri (JV)
0 siblings, 0 replies; 4+ messages in thread
From: Venkateswararao Jujjuri (JV) @ 2010-05-13 5:52 UTC (permalink / raw)
To: Gautham R Shenoy; +Cc: Qemu-development List
Gautham R Shenoy wrote:
> This patch makes the paio subsystem use the generic work offloading
> infrastructure.
>
> The patch has been tested with fstress.
>
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Gautham, surprisingly this makes the code really readable.
Few comments below.
> ---
> posix-aio-compat.c | 156 ++++++++++------------------------------------------
> 1 files changed, 30 insertions(+), 126 deletions(-)
>
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index b43c531..1b405e4 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -28,6 +28,7 @@
> #include "block_int.h"
<SKIP>
> -
> static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
> {
> int ret;
> @@ -300,47 +262,27 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
> return nbytes;
> }
>
> -static void *aio_thread(void *unused)
> +static void aio_thread(struct work_item *work)
Maybe we should name it differently, as it is not a thread by itself?
> {
> - pid_t pid;
>
> - pid = getpid();
> -
> - while (1) {
> - struct qemu_paiocb *aiocb;
> - ssize_t ret = 0;
> - qemu_timeval tv;
> - struct timespec ts;
> -
> - qemu_gettimeofday(&tv);
> - ts.tv_sec = tv.tv_sec + 10;
> - ts.tv_nsec = 0;
> -
> - mutex_lock(&lock);
> + pid_t pid;
>
> - while (QTAILQ_EMPTY(&request_list) &&
> - !(ret == ETIMEDOUT)) {
> - ret = cond_timedwait(&cond, &lock, &ts);
> - }
> + struct qemu_paiocb *aiocb = (struct qemu_paiocb *) work->private;
> + ssize_t ret = 0;
>
> - if (QTAILQ_EMPTY(&request_list))
> - break;
> + pid = getpid();
>
> - aiocb = QTAILQ_FIRST(&request_list);
> - QTAILQ_REMOVE(&request_list, aiocb, node);
> - aiocb->active = 1;
> - idle_threads--;
> - mutex_unlock(&lock);
> + aiocb->active = 1;
>
> - switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> - case QEMU_AIO_READ:
> - case QEMU_AIO_WRITE:
> + switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> + case QEMU_AIO_READ:
> + case QEMU_AIO_WRITE:
> ret = handle_aiocb_rw(aiocb);
> break;
> - case QEMU_AIO_FLUSH:
> - ret = handle_aiocb_flush(aiocb);
> - break;
> - case QEMU_AIO_IOCTL:
> + case QEMU_AIO_FLUSH:
> + ret = handle_aiocb_flush(aiocb);
> + break;
> + case QEMU_AIO_IOCTL:
> ret = handle_aiocb_ioctl(aiocb);
> break;
> default:
> @@ -349,57 +291,29 @@ static void *aio_thread(void *unused)
> break;
> }
>
> - mutex_lock(&lock);
> - aiocb->ret = ret;
> - idle_threads++;
> - mutex_unlock(&lock);
> -
> - if (kill(pid, aiocb->ev_signo)) die("kill failed");
> - }
> -
> - idle_threads--;
> - cur_threads--;
> - mutex_unlock(&lock);
> + aiocb->ret = ret;
>
> - return NULL;
> -}
> -
> -static void spawn_thread(void)
> -{
> - sigset_t set, oldset;
> -
> - cur_threads++;
> - idle_threads++;
> -
> - /* block all signals */
> - if (sigfillset(&set)) die("sigfillset");
> - if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> -
> - thread_create(&thread_id, &attr, aio_thread, NULL);
> -
> - if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> + if (kill(pid, aiocb->ev_signo)) die("kill failed");
Instead of doing a write/sending a signal in the handler context, why not also move this
to the thread pool infrastructure? That way, handlers can be a little more segregated
from the aio thread mechanism.
Thanks,
JV
> + async_work_release(&aio_request_list, work);
> }
>
> static void qemu_paio_submit(struct qemu_paiocb *aiocb)
> {
> + struct work_item *work;
> +
> aiocb->ret = -EINPROGRESS;
> aiocb->active = 0;
> - mutex_lock(&lock);
> - if (idle_threads == 0 && cur_threads < max_threads)
> - spawn_thread();
> - QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> - mutex_unlock(&lock);
> - cond_signal(&cond);
> +
> + work = async_work_init(&aio_request_list, aio_thread, aiocb);
> + aiocb->work = work;
> + qemu_async_submit(&aio_request_list, work);
> }
>
> static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
> {
> ssize_t ret;
>
> - mutex_lock(&lock);
> ret = aiocb->ret;
> - mutex_unlock(&lock);
> -
> return ret;
> }
>
> @@ -535,14 +449,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
> struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> int active = 0;
>
> - mutex_lock(&lock);
> if (!acb->active) {
> - QTAILQ_REMOVE(&request_list, acb, node);
> - acb->ret = -ECANCELED;
> + if (!qemu_async_cancel_work(&aio_request_list, acb->work))
> + acb->ret = -ECANCELED;
> + else
> + active = 1;
> } else if (acb->ret == -EINPROGRESS) {
> active = 1;
> }
> - mutex_unlock(&lock);
>
> if (active) {
> /* fail safe: if the aio could not be canceled, we wait for
> @@ -615,7 +529,6 @@ int paio_init(void)
> struct sigaction act;
> PosixAioState *s;
> int fds[2];
> - int ret;
>
> if (posix_aio_state)
> return 0;
> @@ -642,16 +555,7 @@ int paio_init(void)
> qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
> posix_aio_process_queue, s);
>
> - ret = pthread_attr_init(&attr);
> - if (ret)
> - die2(ret, "pthread_attr_init");
> -
> - ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
> - if (ret)
> - die2(ret, "pthread_attr_setdetachstate");
> -
> - QTAILQ_INIT(&request_list);
> -
> posix_aio_state = s;
> + async_queue_init(&aio_request_list, max_threads, max_threads);
> return 0;
> }
>
>
>
^ permalink raw reply [flat|nested] 4+ messages in thread
end of thread, other threads:[~2010-05-13 5:52 UTC | newest]
Thread overview: 4+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2010-05-10 6:15 [Qemu-devel] [RFC/ PATCH 0/2] qemu: Asynchronous task offloading framework Gautham R Shenoy
2010-05-10 6:15 ` [Qemu-devel] [RFC/ PATCH 1/2] qemu: Create a generic asynchronous threading framework to offload tasks Gautham R Shenoy
2010-05-10 6:15 ` [Qemu-devel] [RFC/ PATCH 2/2] qemu: Convert paio code to use the generic threading infrastructure Gautham R Shenoy
2010-05-13 5:52 ` Venkateswararao Jujjuri (JV)
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).