qemu-devel.nongnu.org archive mirror
* [Qemu-devel] [RFC/ PATCH 0/4] qemu: Extend AIO threading framework to a generic one.
@ 2010-05-24 12:52 Gautham R Shenoy
  2010-05-24 12:53 ` [Qemu-devel] [RFC/ PATCH 1/4] qemu: Generic asynchronous threading framework to offload tasks Gautham R Shenoy
                   ` (3 more replies)
  0 siblings, 4 replies; 7+ messages in thread
From: Gautham R Shenoy @ 2010-05-24 12:52 UTC (permalink / raw)
  To: Qemu-development List; +Cc: Anthony Liguori, Avi Kivity

Hi,

This patch series is strictly RFC only.

It decouples the asynchronous threading framework implementation
from posix-aio-compat.c to implement a generic asynchronous task
offloading threading framework which can be used by other subsystems
within QEMU.

Currently within QEMU, the AIO subsystem (paio) creates a bunch of
asynchronous threads to offload any blocking operations so that
the vcpu threads and the IO thread can go back to servicing any
other guest requests.

We would like to use this offloading framework in the virtio-9p
subsystem, so that the vcpu thread can offload POSIX operations to the
asynchronous threads and resume servicing other guest requests. The
asynchronous threads, after finishing the POSIX operations, can then
transfer control to the IO thread so that the latter can handle the
post_posix_operation().

The post_posix_operation() could in turn offload more work to the
asynchronous thread pool or send the results of the operation to the
guest.
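
To make the intended flow concrete, here is a minimal, self-contained sketch
of the three stages described above: submit, run on a worker, then finish on
the IO thread. It is an illustration under stated assumptions, not the
async-work.h API from this series. Every name in it (struct job, job_list,
run_offload_demo and so on) is hypothetical, squaring an integer stands in
for a blocking POSIX call, and the calling thread plays both the vcpu thread
and the IO thread.

```c
#include <pthread.h>
#include <stddef.h>

/* All names here are hypothetical; this is not the async-work.h API. */
struct job {
    void (*blocking_fn)(struct job *); /* runs on the worker; NULL = stop */
    void (*post_fn)(struct job *);     /* runs on the "IO thread" */
    int input, result;
    int *total;
    struct job *next;
};

struct job_list {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    struct job *head, *tail;
};

static struct job_list pending =
    { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NULL, NULL };
static struct job_list completed =
    { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NULL, NULL };

static void job_list_push(struct job_list *l, struct job *j)
{
    pthread_mutex_lock(&l->lock);
    j->next = NULL;
    if (l->tail) {
        l->tail->next = j;
    } else {
        l->head = j;
    }
    l->tail = j;
    pthread_cond_signal(&l->cond);
    pthread_mutex_unlock(&l->lock);
}

/* Block until the list is non-empty, then detach and return its head. */
static struct job *job_list_pop_wait(struct job_list *l)
{
    struct job *j;

    pthread_mutex_lock(&l->lock);
    while (l->head == NULL) {
        pthread_cond_wait(&l->cond, &l->lock);
    }
    j = l->head;
    l->head = j->next;
    if (l->head == NULL) {
        l->tail = NULL;
    }
    pthread_mutex_unlock(&l->lock);
    return j;
}

static void *worker(void *unused)
{
    (void)unused;
    for (;;) {
        struct job *j = job_list_pop_wait(&pending);
        if (j->blocking_fn == NULL) {
            return NULL;               /* shutdown sentinel */
        }
        j->blocking_fn(j);             /* the stand-in "POSIX operation" */
        job_list_push(&completed, j);  /* hand over to the "IO thread" */
    }
}

static void square_input(struct job *j) { j->result = j->input * j->input; }
static void add_to_total(struct job *j) { *j->total += j->result; }

#define DEMO_MAX_JOBS 16

/* Returns the sum of squares of 1..n, or -1 on bad input or thread failure. */
int run_offload_demo(int n)
{
    struct job jobs[DEMO_MAX_JOBS], stop = { 0 };
    pthread_t tid;
    int total = 0, i;

    if (n < 0 || n > DEMO_MAX_JOBS) {
        return -1;
    }
    if (pthread_create(&tid, NULL, worker, NULL)) {
        return -1;
    }
    for (i = 0; i < n; i++) {          /* "vcpu thread": submit and move on */
        jobs[i] = (struct job){ square_input, add_to_total,
                                i + 1, 0, &total, NULL };
        job_list_push(&pending, &jobs[i]);
    }
    for (i = 0; i < n; i++) {          /* "IO thread": run post operations */
        struct job *j = job_list_pop_wait(&completed);
        j->post_fn(j);
    }
    job_list_push(&pending, &stop);
    pthread_join(tid, NULL);
    return total;
}
```

In the sketch the completion list is waited on with a condition variable; in
QEMU the IO thread sits in select(), which is why the series wakes it through
a pipe instead.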

The patch series also includes a patch that converts the v9fs_stat() call
to make use of the asynchronous threading framework. This is an example of how
the other calls could eventually be converted.

I have a question about the original code, at the point where the
asynchronous aio threads communicate with the io-thread.

In posix-aio-compat.c, we do not do a write(wfd) from the context of the
asynchronous thread, but instead send a SIGUSR2, the handler of which
does a write(wfd), thereby causing the iothread blocked on select() to unblock.

Why do we send a signal, rather than execute whatever aio_signal_handler()
does from the context of the asynchronous thread? Is it to ensure that the
io-thread does not miss any write(wfd)?
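
For comparison, the alternative this question contemplates (a worker thread
writing to the pipe itself, with no signal in between) might look like the
sketch below. It only shows that the mechanics are straightforward, since
write() on a pipe may be called from any thread; it does not explain why
posix-aio-compat.c chose the signal. The notify_pipe_* names are hypothetical
and are not QEMU functions.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical helper type: a non-blocking pipe used purely as a wakeup. */
struct notify_pipe {
    int rfd;   /* watched by the thread sitting in select() */
    int wfd;   /* written by whichever thread completes work */
};

static int set_nonblock(int fd)
{
    int flags = fcntl(fd, F_GETFL);

    if (flags == -1) {
        return -1;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int notify_pipe_init(struct notify_pipe *np)
{
    int fds[2];

    if (pipe(fds) == -1) {
        return -1;
    }
    if (set_nonblock(fds[0]) == -1 || set_nonblock(fds[1]) == -1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }
    np->rfd = fds[0];
    np->wfd = fds[1];
    return 0;
}

/* Called by a worker: queue one wakeup byte. A full pipe (EAGAIN) already
 * guarantees a pending wakeup, so it is not treated as an error. */
int notify_pipe_signal(struct notify_pipe *np)
{
    char byte = 0;
    ssize_t ret;

    do {
        ret = write(np->wfd, &byte, sizeof(byte));
    } while (ret == -1 && errno == EINTR);

    if (ret == -1 && errno != EAGAIN) {
        return -1;
    }
    return 0;
}

/* Called by the reader once select() reports rfd readable: consume every
 * pending byte and return how many there were. */
int notify_pipe_drain(struct notify_pipe *np)
{
    char buf[64];
    int total = 0;

    for (;;) {
        ssize_t n = read(np->rfd, buf, sizeof(buf));
        if (n > 0) {
            total += (int)n;
            continue;
        }
        if (n == -1 && errno == EINTR) {
            continue;
        }
        break;   /* EAGAIN (empty), EOF, or a real error */
    }
    return total;
}
```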

In this patch series, I have made virtio-9p use SIGUSR1 to
mimic similar behaviour. This is just a temporary piece of code to get
virtio-9p to use the async framework for now.

The patch series passed the fsstress test without any issues.

Awaiting your comments.
---

Aneesh Kumar K.V (1):
      qemu: Generic asynchronous threading framework to offload tasks

Gautham R Shenoy (3):
      qemu: Convert AIO code to use the generic threading infrastructure.
      virtio-9p: Add async helper functions
      virtio-9p: convert lstat to use async infrastructure.


 Makefile.objs      |    2 +
 async-work.c       |  152 +++++++++++++++++++++++++++++++++++++++++
 async-work.h       |   85 +++++++++++++++++++++++
 hw/virtio-9p.c     |  195 ++++++++++++++++++++++++++++++++++++++++++++++++++--
 hw/virtio-9p.h     |    4 +
 posix-aio-compat.c |  155 ++++++++---------------------------------
 6 files changed, 458 insertions(+), 135 deletions(-)
 create mode 100644 async-work.c
 create mode 100644 async-work.h

-- 
Thanks and Regards
gautham.


* [Qemu-devel] [RFC/ PATCH 1/4] qemu: Generic asynchronous threading framework to offload tasks
  2010-05-24 12:52 [Qemu-devel] [RFC/ PATCH 0/4] qemu: Extend AIO threading framework to a generic one Gautham R Shenoy
@ 2010-05-24 12:53 ` Gautham R Shenoy
  2010-05-24 14:16   ` malc
  2010-05-24 14:54   ` Corentin Chary
  2010-05-24 12:53 ` [Qemu-devel] [RFC/ PATCH 2/4] qemu: Convert AIO code to use the generic threading infrastructure Gautham R Shenoy
                   ` (2 subsequent siblings)
  3 siblings, 2 replies; 7+ messages in thread
From: Gautham R Shenoy @ 2010-05-24 12:53 UTC (permalink / raw)
  To: Qemu-development List; +Cc: Anthony Liguori, Avi Kivity, Aneesh Kumar K.V

From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>

This patch creates a generic asynchronous-task-offloading infrastructure. It is
extracted from the threading framework that is currently used by paio.

The reason for extracting this generic infrastructure out of
posix-aio-compat.c is so that other subsystems, such as virtio-9p, can make use
of it for offloading tasks that could block.

[ego@in.ibm.com: work_item_pool, async_work_init, async_work_release,
async_cancel_work]

Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
---
 Makefile.objs |    2 +
 async-work.c  |  152 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 async-work.h  |   85 ++++++++++++++++++++++++++++++++
 3 files changed, 239 insertions(+), 0 deletions(-)
 create mode 100644 async-work.c
 create mode 100644 async-work.h

diff --git a/Makefile.objs b/Makefile.objs
index ecdd53e..faf3d67 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
 
 block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
 block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-y += async-work.o
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
@@ -44,6 +45,7 @@ fsdev-obj-$(CONFIG_LINUX) += $(addprefix fsdev/, $(fsdev-nested-y))
 # system emulation, i.e. a single QEMU executable should support all
 # CPUs and machines.
 
+#common-obj-y = $(asyncwork-obj-y)
 common-obj-y = $(block-obj-y)
 common-obj-y += $(net-obj-y)
 common-obj-y += $(qobject-obj-y)
diff --git a/async-work.c b/async-work.c
new file mode 100644
index 0000000..5195bbf
--- /dev/null
+++ b/async-work.c
@@ -0,0 +1,152 @@
+/*
+ * Async work support
+ *
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <signal.h>
+#include "async-work.h"
+#include "osdep.h"
+
+static void async_abort(int err, const char *what)
+{
+    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
+    abort();
+}
+
+static void *async_worker_thread(void *data)
+{
+    struct async_queue *queue = data;
+
+    while (1) {
+        struct work_item *work;
+        int ret = 0;
+        qemu_timeval tv;
+        struct timespec ts;
+
+        qemu_gettimeofday(&tv);
+        ts.tv_sec = tv.tv_sec + 10;
+        ts.tv_nsec = 0;
+
+        pthread_mutex_lock(&(queue->lock));
+
+        while (QTAILQ_EMPTY(&(queue->request_list)) &&
+               (ret != ETIMEDOUT)) {
+            ret = pthread_cond_timedwait(&(queue->cond),
+					 &(queue->lock), &ts);
+        }
+
+        if (QTAILQ_EMPTY(&(queue->request_list)))
+            goto check_exit;
+
+        work = QTAILQ_FIRST(&(queue->request_list));
+        QTAILQ_REMOVE(&(queue->request_list), work, node);
+        queue->idle_threads--;
+        pthread_mutex_unlock(&(queue->lock));
+
+        /* execute the work function */
+        work->func(work);
+        async_work_release(queue, work);
+
+        pthread_mutex_lock(&(queue->lock));
+        queue->idle_threads++;
+
+check_exit:
+        if ((queue->idle_threads > 0) &&
+            (queue->cur_threads > queue->min_threads)) {
+            /* we retain minimum number of threads */
+            break;
+        }
+        pthread_mutex_unlock(&(queue->lock));
+    }
+
+    queue->idle_threads--;
+    queue->cur_threads--;
+    pthread_mutex_unlock(&(queue->lock));
+
+    return NULL;
+}
+
+static void spawn_async_thread(struct async_queue *queue)
+{
+    int ret;
+    pthread_attr_t attr;
+    pthread_t thread_id;
+    sigset_t set, oldset;
+
+    queue->cur_threads++;
+    queue->idle_threads++;
+    ret = pthread_attr_init(&attr);
+    if (ret) {
+        async_abort(ret, "pthread_attr_init");
+    }
+
+    /* create a detached thread so that we don't need to wait on it */
+    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+    if (ret) {
+        async_abort(ret, "pthread_attr_setdetachstate");
+    }
+
+    /* block all signals */
+    if (sigfillset(&set)) {
+        async_abort(errno, "sigfillset");
+    }
+
+    if (sigprocmask(SIG_SETMASK, &set, &oldset)) {
+        async_abort(errno, "sigprocmask");
+    }
+
+    ret = pthread_create(&thread_id, &attr, async_worker_thread, queue);
+    if (ret) {
+        async_abort(ret, "pthread_create");
+    }
+
+    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) {
+        async_abort(errno, "sigprocmask restore");
+    }
+}
+
+void qemu_async_submit(struct async_queue *queue, struct work_item *work)
+{
+    pthread_mutex_lock(&(queue->lock));
+    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+        spawn_async_thread(queue);
+    }
+    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
+    pthread_mutex_unlock(&(queue->lock));
+    pthread_cond_signal(&(queue->cond));
+}
+
+int qemu_async_cancel_work(struct async_queue *queue, struct work_item *work)
+{
+    struct work_item *ret_work;
+    int found = 0;
+
+    pthread_mutex_lock(&(queue->lock));
+    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
+        if (ret_work == work) {
+            QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
+            found = 1;
+            break;
+        }
+    }
+    pthread_mutex_unlock(&(queue->lock));
+
+    if (found) {
+        async_work_release(queue, work);
+        return 0;
+    }
+
+    return 1;
+}
+
diff --git a/async-work.h b/async-work.h
new file mode 100644
index 0000000..eef60f7
--- /dev/null
+++ b/async-work.h
@@ -0,0 +1,85 @@
+/*
+ * Async work support
+ *
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+#ifndef QEMU_ASYNC_WORK_H
+#define QEMU_ASYNC_WORK_H
+
+#include <pthread.h>
+#include "qemu-queue.h"
+#include "qemu-common.h"
+
+struct async_queue
+{
+    pthread_mutex_t lock;
+    pthread_cond_t cond;
+    int max_threads;
+    int min_threads;
+    int cur_threads;
+    int idle_threads;
+    QTAILQ_HEAD(, work_item) request_list;
+    QTAILQ_HEAD(, work_item) work_item_pool;
+};
+
+struct work_item
+{
+    QTAILQ_ENTRY(work_item) node;
+    void (*func)(struct work_item *work);
+    void *private;
+};
+
+static inline void async_queue_init(struct async_queue *queue,
+				    int max_threads, int min_threads)
+{
+    queue->cur_threads  = 0;
+    queue->idle_threads = 0;
+    queue->max_threads  = max_threads;
+    queue->min_threads  = min_threads;
+    QTAILQ_INIT(&(queue->request_list));
+    QTAILQ_INIT(&(queue->work_item_pool));
+    pthread_mutex_init(&(queue->lock), NULL);
+    pthread_cond_init(&(queue->cond), NULL);
+}
+
+static inline struct work_item *async_work_init(struct async_queue *queue,
+				   void (*func)(struct work_item *),
+				   void *data)
+{
+    struct work_item *work;
+    pthread_mutex_lock(&(queue->lock));
+    if (QTAILQ_EMPTY(&(queue->work_item_pool))) {
+        work = qemu_mallocz(sizeof(*work));
+    } else {
+        work = QTAILQ_FIRST(&(queue->work_item_pool));
+        QTAILQ_REMOVE(&(queue->work_item_pool), work, node);
+    }
+
+    work->func  = func;
+    work->private  = data;
+    pthread_mutex_unlock(&(queue->lock));
+
+    return work;
+}
+
+static inline void async_work_release(struct async_queue *queue,
+                                        struct work_item *work)
+{
+    pthread_mutex_lock(&(queue->lock));
+    QTAILQ_INSERT_TAIL(&(queue->work_item_pool), work, node);
+    pthread_mutex_unlock(&(queue->lock));
+}
+
+extern void qemu_async_submit(struct async_queue *queue,
+				  struct work_item *work);
+
+extern int qemu_async_cancel_work(struct async_queue *queue,
+                    struct work_item *work);
+#endif


* [Qemu-devel] [RFC/ PATCH 2/4] qemu: Convert AIO code to use the generic threading infrastructure.
  2010-05-24 12:52 [Qemu-devel] [RFC/ PATCH 0/4] qemu: Extend AIO threading framework to a generic one Gautham R Shenoy
  2010-05-24 12:53 ` [Qemu-devel] [RFC/ PATCH 1/4] qemu: Generic asynchronous threading framework to offload tasks Gautham R Shenoy
@ 2010-05-24 12:53 ` Gautham R Shenoy
  2010-05-24 12:53 ` [Qemu-devel] [RFC/ PATCH 3/4] virtio-9p: Add async helper functions Gautham R Shenoy
  2010-05-24 12:53 ` [Qemu-devel] [RFC/ PATCH 4/4] virtio-9p: convert lstat to use async infrastructure Gautham R Shenoy
  3 siblings, 0 replies; 7+ messages in thread
From: Gautham R Shenoy @ 2010-05-24 12:53 UTC (permalink / raw)
  To: Qemu-development List; +Cc: Anthony Liguori, Avi Kivity

This patch makes the paio subsystem use the generic work offloading
infrastructure, thereby decoupling the asynchronous threading framework
from posix-aio-compat.c.

The patch has been tested with fsstress.

Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
---
 posix-aio-compat.c |  155 ++++++++++------------------------------------------
 1 files changed, 29 insertions(+), 126 deletions(-)

diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index b43c531..f2e7c6a 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -28,6 +28,7 @@
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
+#include "async-work.h"
 
 
 struct qemu_paiocb {
@@ -50,6 +51,7 @@ struct qemu_paiocb {
     struct qemu_paiocb *next;
 
     int async_context_id;
+    struct work_item *work;
 };
 
 typedef struct PosixAioState {
@@ -57,15 +59,8 @@ typedef struct PosixAioState {
     struct qemu_paiocb *first_aio;
 } PosixAioState;
 
-
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
 static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
+static struct async_queue aio_request_list;
 
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
@@ -84,39 +79,6 @@ static void die(const char *what)
     die2(errno, what);
 }
 
-static void mutex_lock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
-{
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
-    return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
-{
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
-}
-
 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
 {
 	int ret;
@@ -300,47 +262,27 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
-static void *aio_thread(void *unused)
+static void aio_thread(struct work_item *work)
 {
-    pid_t pid;
 
-    pid = getpid();
-
-    while (1) {
-        struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
-
-        mutex_lock(&lock);
+    pid_t pid;
 
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            ret = cond_timedwait(&cond, &lock, &ts);
-        }
+    struct qemu_paiocb *aiocb = (struct qemu_paiocb *) work->private;
+    ssize_t ret = 0;
 
-        if (QTAILQ_EMPTY(&request_list))
-            break;
+    pid = getpid();
 
-        aiocb = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->active = 1;
-        idle_threads--;
-        mutex_unlock(&lock);
+    aiocb->active = 1;
 
-        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-        case QEMU_AIO_READ:
-        case QEMU_AIO_WRITE:
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+    case QEMU_AIO_WRITE:
 		ret = handle_aiocb_rw(aiocb);
 		break;
-        case QEMU_AIO_FLUSH:
-                ret = handle_aiocb_flush(aiocb);
-                break;
-        case QEMU_AIO_IOCTL:
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
 		ret = handle_aiocb_ioctl(aiocb);
 		break;
 	default:
@@ -349,57 +291,28 @@ static void *aio_thread(void *unused)
 		break;
 	}
 
-        mutex_lock(&lock);
-        aiocb->ret = ret;
-        idle_threads++;
-        mutex_unlock(&lock);
-
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
-    }
-
-    idle_threads--;
-    cur_threads--;
-    mutex_unlock(&lock);
+    aiocb->ret = ret;
 
-    return NULL;
-}
-
-static void spawn_thread(void)
-{
-    sigset_t set, oldset;
-
-    cur_threads++;
-    idle_threads++;
-
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+    if (kill(pid, aiocb->ev_signo)) die("kill failed");
 }
 
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
+    struct work_item *work;
+
     aiocb->ret = -EINPROGRESS;
     aiocb->active = 0;
-    mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
+
+    work = async_work_init(&aio_request_list, aio_thread, aiocb);
+    aiocb->work = work;
+    qemu_async_submit(&aio_request_list, work);
 }
 
 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 {
     ssize_t ret;
 
-    mutex_lock(&lock);
     ret = aiocb->ret;
-    mutex_unlock(&lock);
-
     return ret;
 }
 
@@ -535,14 +448,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
     int active = 0;
 
-    mutex_lock(&lock);
     if (!acb->active) {
-        QTAILQ_REMOVE(&request_list, acb, node);
-        acb->ret = -ECANCELED;
+        if (!qemu_async_cancel_work(&aio_request_list, acb->work))
+            acb->ret = -ECANCELED;
+         else
+            active = 1;
     } else if (acb->ret == -EINPROGRESS) {
         active = 1;
     }
-    mutex_unlock(&lock);
 
     if (active) {
         /* fail safe: if the aio could not be canceled, we wait for
@@ -615,7 +528,6 @@ int paio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    int ret;
 
     if (posix_aio_state)
         return 0;
@@ -642,16 +554,7 @@ int paio_init(void)
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
         posix_aio_process_queue, s);
 
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
-
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
-
-    QTAILQ_INIT(&request_list);
-
     posix_aio_state = s;
+    async_queue_init(&aio_request_list, max_threads, max_threads);
     return 0;
 }


* [Qemu-devel] [RFC/ PATCH 3/4] virtio-9p: Add async helper functions
  2010-05-24 12:52 [Qemu-devel] [RFC/ PATCH 0/4] qemu: Extend AIO threading framework to a generic one Gautham R Shenoy
  2010-05-24 12:53 ` [Qemu-devel] [RFC/ PATCH 1/4] qemu: Generic asynchronous threading framework to offload tasks Gautham R Shenoy
  2010-05-24 12:53 ` [Qemu-devel] [RFC/ PATCH 2/4] qemu: Convert AIO code to use the generic threading infrastructure Gautham R Shenoy
@ 2010-05-24 12:53 ` Gautham R Shenoy
  2010-05-24 12:53 ` [Qemu-devel] [RFC/ PATCH 4/4] virtio-9p: convert lstat to use async infrastructure Gautham R Shenoy
  3 siblings, 0 replies; 7+ messages in thread
From: Gautham R Shenoy @ 2010-05-24 12:53 UTC (permalink / raw)
  To: Qemu-development List; +Cc: Anthony Liguori, Avi Kivity

Add helper functions that enable virtio-9p to make use of the generic
asynchronous threading framework for offloading blocking tasks, such as POSIX
calls, on to the asynchronous threads, and to handle the
post_posix_operations() from the context of the iothread. This frees the vcpu
thread to process other guest operations while the processing of v9fs_io is in
progress.

Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
---
 hw/virtio-9p.c |  167 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 167 insertions(+), 0 deletions(-)

diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
index 82bb663..f8f60d3 100644
--- a/hw/virtio-9p.c
+++ b/hw/virtio-9p.c
@@ -17,10 +17,147 @@
 #include "virtio-9p.h"
 #include "fsdev/qemu-fsdev.h"
 #include "virtio-9p-debug.h"
+#include "async-work.h"
 
 int dotu = 1;
 int debug_9p_pdu;
 
+struct v9fs_post_op {
+    QTAILQ_ENTRY(v9fs_post_op) node;
+    void (*func)(void *arg);
+    void *arg;
+};
+
+static struct {
+    struct async_queue virtio_9p_aqueue;
+    int rfd;
+    int wfd;
+    pthread_mutex_t lock;
+    QTAILQ_HEAD(, v9fs_post_op) post_op_list;
+} v9fs_async_struct;
+
+static void die2(int err, const char *what)
+{
+    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
+    abort();
+}
+
+static void die(const char *what)
+{
+    die2(errno, what);
+}
+
+/**
+ * v9fs_signal_handler: Handle the SIGUSR1 signal.
+ * @signum: Is SIGUSR1 in this case.
+ *
+ * This function is used to inform the iothread that a particular
+ * async-operation pertaining to v9fs has been completed and that the io thread
+ * can handle the v9fs_post_posix_operation.
+ *
+ * This is based on the aio_signal_handler
+ */
+static void v9fs_signal_handler(int signum)
+{
+    char byte = 0;
+    ssize_t ret;
+
+    printf("Writing to file descriptor %d\n", v9fs_async_struct.wfd);
+    ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
+
+    if (ret < 0 && errno != EAGAIN)
+        die("write() in v9fs");
+
+    qemu_service_io();
+}
+
+#define ASYNC_MAX_PROCESS   5
+
+/**
+ * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
+ * @arg: Not used.
+ *
+ * This function serves as a callback to the iothread to be called into whenever
+ * the v9fs_async_struct.wfd is written into. This function goes through the
+ * list of v9fs_post_posix_operations() and executes them. In the process, it
+ * might queue more jobs on the asynchronous thread pool.
+ */
+static void v9fs_process_post_ops(void *arg)
+{
+    int count = 0;
+    struct v9fs_post_op *post_op;
+
+    pthread_mutex_lock(&v9fs_async_struct.lock);
+    for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
+        if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
+            break;
+        }
+        post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
+        QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
+
+        pthread_mutex_unlock(&v9fs_async_struct.lock);
+        post_op->func(post_op->arg);
+        qemu_free(post_op);
+        pthread_mutex_lock(&v9fs_async_struct.lock);
+    }
+    pthread_mutex_unlock(&v9fs_async_struct.lock);
+}
+
+/**
+ * v9fs_async_signal: Send a signal to the iothread.
+ * @func: v9fs_post_posix_func() to be called by the iothread.
+ * @arg: Argument to func.
+ *
+ * This function is called from the context of one of the asynchronous threads
+ * in the thread pool. This is called when the asynchronous thread has finished
+ * executing a v9fs_posix_operation. Its purpose is to initiate the process of
+ * informing the iothread that the v9fs_posix_operation has completed.
+ *
+ * This code follows the example of aio_thread() and uses SIGUSR1 to notify the
+ * iothread.
+ */
+static void v9fs_async_signal(void (*func)(void *arg), void *arg)
+{
+    struct v9fs_post_op *post_op;
+    pid_t pid = getpid();
+
+    post_op = qemu_mallocz(sizeof(*post_op));
+    post_op->func = func;
+    post_op->arg = arg;
+
+    pthread_mutex_lock(&v9fs_async_struct.lock);
+    QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node);
+    pthread_mutex_unlock(&v9fs_async_struct.lock);
+
+    if(kill(pid, SIGUSR1)) die("v9fs kill failed");
+}
+
+/**
+ * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
+ * @vs: V9fsOPState variable for the OP operation.
+ * @posix_fn: The posix function which has to be offloaded onto async thread.
+ * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
+ * the posix_fn
+ * @post_fn: The post_fn corresponding to the posix_fn.
+ *
+ * This function is a helper to offload posix_operation on to the asynchronous
+ * thread pool. It sets up the associations with the post_function that needs to
+ * be invoked from the context of the iothread once the posix_fn has been
+ * executed.
+ */
+static void v9fs_do_async_posix(void *vs ,
+                                void (*posix_fn)(struct work_item *work),
+                                void (**post_fn_ptr)(void *arg),
+                                void (*post_fn)(void *arg))
+{
+    struct work_item *work;
+
+    *post_fn_ptr = post_fn;
+    work = async_work_init(&v9fs_async_struct.virtio_9p_aqueue,
+                            posix_fn, vs);
+    qemu_async_submit(&v9fs_async_struct.virtio_9p_aqueue, work);
+}
+
 enum {
     Oread   = 0x00,
     Owrite  = 0x01,
@@ -2321,6 +2458,8 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
     int i, len;
     struct stat stat;
     FsTypeEntry *fse;
+    int fds[2];
+    struct sigaction act;
 
 
     s = (V9fsState *)virtio_common_init("virtio-9p",
@@ -2395,5 +2534,33 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
                         s->tag_len;
     s->vdev.get_config = virtio_9p_get_config;
 
+    sigfillset(&act.sa_mask);
+    act.sa_flags = 0;
+    act.sa_handler = v9fs_signal_handler;
+    sigaction(SIGUSR1, &act, NULL);
+
+    if (qemu_pipe(fds) == -1) {
+        fprintf(stderr, "failed to create fd's for virtio-9p\n");
+        exit(1);
+    }
+
+    v9fs_async_struct.rfd = fds[0];
+    v9fs_async_struct.wfd = fds[1];
+
+    printf("v9fs: rfd: %d\n", v9fs_async_struct.rfd);
+    printf("v9fs: wfd: %d\n", v9fs_async_struct.wfd);
+
+    fcntl(fds[0], F_SETFL, O_NONBLOCK);
+    fcntl(fds[1], F_SETFL, O_NONBLOCK);
+
+    qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
+    QTAILQ_INIT(&v9fs_async_struct.post_op_list);
+    pthread_mutex_init(&(v9fs_async_struct.lock), NULL);
+    /* Create async queue. */
+    async_queue_init(&v9fs_async_struct.virtio_9p_aqueue, 10, 3);
+
+    (void)v9fs_do_async_posix;
+    (void)v9fs_async_signal;
+
     return &s->vdev;
 }


* [Qemu-devel] [RFC/ PATCH 4/4] virtio-9p: convert lstat to use async infrastructure.
  2010-05-24 12:52 [Qemu-devel] [RFC/ PATCH 0/4] qemu: Extend AIO threading framework to a generic one Gautham R Shenoy
                   ` (2 preceding siblings ...)
  2010-05-24 12:53 ` [Qemu-devel] [RFC/ PATCH 3/4] virtio-9p: Add async helper functions Gautham R Shenoy
@ 2010-05-24 12:53 ` Gautham R Shenoy
  3 siblings, 0 replies; 7+ messages in thread
From: Gautham R Shenoy @ 2010-05-24 12:53 UTC (permalink / raw)
  To: Qemu-development List; +Cc: Anthony Liguori, Avi Kivity

This patch converts v9fs_stat() to make use of the async infrastructure.

Every call to v9fs_stat() is processed in the context of the vcpu thread before
offloading the actual stat operation onto an async-thread. The post operation is
handled in the context of the io-thread which in turn does the complete()
operation for this particular v9fs_stat() operation.
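
Because errno is per-thread, the value set by the blocking call on the async
thread has to be carried to the io-thread explicitly, which is what the new
v9fs_errno field in the diff below is for. A minimal sketch of that pattern
follows; struct stat_request and both functions are hypothetical stand-ins
for V9fsStatState, v9fs_stat_do_lstat() and v9fs_stat_post_lstat(), and
unlike the patch the sketch records errno only when the call failed.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <sys/stat.h>

/* Hypothetical request state shared between the two stages. */
struct stat_request {
    const char *path;
    struct stat stbuf;
    int err;           /* raw return value of lstat() */
    int saved_errno;   /* errno as seen by the thread that called lstat() */
};

/* Stage 1, on the async thread: do the blocking call and capture errno
 * right away, before anything else on this thread can overwrite it. */
void stat_request_run(struct stat_request *req)
{
    req->err = lstat(req->path, &req->stbuf);
    req->saved_errno = (req->err == -1) ? errno : 0;
}

/* Stage 2, on the io-thread: turn the saved state into 0 or a negative
 * errno value. This function never reads errno itself, because errno on
 * this thread says nothing about the call made in stage 1. */
int stat_request_finish(const struct stat_request *req)
{
    if (req->err == -1) {
        return -req->saved_errno;
    }
    return 0;
}
```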

Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
---
 hw/virtio-9p.c |   34 ++++++++++++++++++++++------------
 hw/virtio-9p.h |    4 ++++
 2 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
index f8f60d3..a57fffc 100644
--- a/hw/virtio-9p.c
+++ b/hw/virtio-9p.c
@@ -1218,26 +1218,38 @@ out:
     v9fs_string_free(&aname);
 }
 
-static void v9fs_stat_post_lstat(V9fsState *s, V9fsStatState *vs, int err)
+static void v9fs_stat_post_lstat(void *opaque)
 {
-    if (err == -1) {
-        err = -errno;
+    V9fsStatState *vs = (V9fsStatState *)opaque;
+
+    if (vs->err == -1) {
+        vs->err = -(vs->v9fs_errno);
         goto out;
     }
 
-    err = stat_to_v9stat(s, &vs->fidp->path, &vs->stbuf, &vs->v9stat);
-    if (err) {
+    vs->err = stat_to_v9stat(vs->s, &vs->fidp->path, &vs->stbuf, &vs->v9stat);
+    if (vs->err) {
         goto out;
     }
     vs->offset += pdu_marshal(vs->pdu, vs->offset, "wS", 0, &vs->v9stat);
-    err = vs->offset;
+    vs->err = vs->offset;
 
 out:
-    complete_pdu(s, vs->pdu, err);
+    complete_pdu(vs->s, vs->pdu, vs->err);
     v9fs_stat_free(&vs->v9stat);
     qemu_free(vs);
 }
 
+static void v9fs_stat_do_lstat(struct work_item *work)
+{
+    V9fsStatState *vs = (V9fsStatState *)work->private;
+
+    vs->err = v9fs_do_lstat(vs->s, &vs->fidp->path, &vs->stbuf);
+    vs->v9fs_errno = errno;
+
+    v9fs_async_signal(vs->post_fn, vs);
+}
+
 static void v9fs_stat(V9fsState *s, V9fsPDU *pdu)
 {
     int32_t fid;
@@ -1247,6 +1259,7 @@ static void v9fs_stat(V9fsState *s, V9fsPDU *pdu)
     vs = qemu_malloc(sizeof(*vs));
     vs->pdu = pdu;
     vs->offset = 7;
+    vs->s = s;
 
     memset(&vs->v9stat, 0, sizeof(vs->v9stat));
 
@@ -1258,8 +1271,8 @@ static void v9fs_stat(V9fsState *s, V9fsPDU *pdu)
         goto out;
     }
 
-    err = v9fs_do_lstat(s, &vs->fidp->path, &vs->stbuf);
-    v9fs_stat_post_lstat(s, vs, err);
+    v9fs_do_async_posix(vs, v9fs_stat_do_lstat, &vs->post_fn,
+                        v9fs_stat_post_lstat);
     return;
 
 out:
@@ -2559,8 +2572,5 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
     /* Create async queue. */
     async_queue_init(&v9fs_async_struct.virtio_9p_aqueue, 10, 3);
 
-    (void)v9fs_do_async_posix;
-    (void)v9fs_async_signal;
-
     return &s->vdev;
 }
diff --git a/hw/virtio-9p.h b/hw/virtio-9p.h
index 992c765..b4a1d46 100644
--- a/hw/virtio-9p.h
+++ b/hw/virtio-9p.h
@@ -173,6 +173,10 @@ typedef struct V9fsStatState {
     V9fsStat v9stat;
     V9fsFidState *fidp;
     struct stat stbuf;
+    V9fsState *s;
+    int err;
+    int v9fs_errno;
+    void (*post_fn)(void *arg);
 } V9fsStatState;
 
 typedef struct V9fsWalkState {


* Re: [Qemu-devel] [RFC/ PATCH 1/4] qemu: Generic asynchronous threading framework to offload tasks
  2010-05-24 12:53 ` [Qemu-devel] [RFC/ PATCH 1/4] qemu: Generic asynchronous threading framework to offload tasks Gautham R Shenoy
@ 2010-05-24 14:16   ` malc
  2010-05-24 14:54   ` Corentin Chary
  1 sibling, 0 replies; 7+ messages in thread
From: malc @ 2010-05-24 14:16 UTC (permalink / raw)
  To: Gautham R Shenoy
  Cc: Aneesh Kumar K.V, Anthony Liguori, Qemu-development List,
	Avi Kivity

On Mon, 24 May 2010, Gautham R Shenoy wrote:

> From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
> 
> This patch creates a generic asynchronous-task-offloading infrastructure. It's
> extracted out of the threading framework that is being used by paio.
> 
> The reason for extracting out this generic infrastructure of the
> posix-aio-compat.c is so that other subsystems, such as virtio-9p could make use
> of it for offloading tasks that could block.
> 
> [ego@in.ibm.com: work_item_pool, async_work_init, async_work_release,
> async_cancel_work]
> 
> Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>

This patch is rather inconsistent w.r.t. error checking.

[..snip..]

-- 
mailto:av1474@comtv.ru


* Re: [Qemu-devel] [RFC/ PATCH 1/4] qemu: Generic asynchronous threading framework to offload tasks
  2010-05-24 12:53 ` [Qemu-devel] [RFC/ PATCH 1/4] qemu: Generic asynchronous threading framework to offload tasks Gautham R Shenoy
  2010-05-24 14:16   ` malc
@ 2010-05-24 14:54   ` Corentin Chary
  1 sibling, 0 replies; 7+ messages in thread
From: Corentin Chary @ 2010-05-24 14:54 UTC (permalink / raw)
  To: Gautham R Shenoy
  Cc: Aneesh Kumar K.V, Anthony Liguori, Qemu-development List,
	Avi Kivity

On Mon, May 24, 2010 at 2:53 PM, Gautham R Shenoy <ego@in.ibm.com> wrote:
> From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
>
> This patch creates a generic asynchronous-task-offloading infrastructure. It's
> extracted out of the threading framework that is being used by paio.
>
> The reason for extracting out this generic infrastructure of the
> posix-aio-compat.c is so that other subsystems, such as virtio-9p could make use
> of it for offloading tasks that could block.
>

Hi,
I'm currently working on asynchronous encoding for the VNC server and
using that kind of approach (a queue of jobs, with a worker thread).
I'll post an RFC patch soon, but I would be happy to move to a generic
solution instead of managing my own queue of jobs.

I noticed that you were using pthread_* functions directly. Maybe you
should use (and extend) qemu-thread.h to provide some
kind of thread abstraction (even if qemu-thread is currently a direct
wrapper around pthread).

Thanks,


-- 
Corentin Chary
http://xf.iksaif.net
