qemu-devel.nongnu.org archive mirror
* [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c
  2010-11-08 10:46 [Qemu-devel] [REFACTORED CODE] [PATCH 0/3] Threadlets: A generic task offloading framework Arun R Bharadwaj
@ 2010-11-08 10:47 ` Arun R Bharadwaj
  0 siblings, 0 replies; 16+ messages in thread
From: Arun R Bharadwaj @ 2010-11-08 10:47 UTC (permalink / raw)
  To: qemu-devel

This generic infrastructure is created so that other subsystems, such as
virtio-9p, can use it to offload tasks that could block.

Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
---
 Makefile.objs          |    1 
 docs/async-support.txt |  141 +++++++++++++++++++++++++++++++++++++++
 posix-aio-compat.c     |  173 ------------------------------------------------
 qemu-threadlets.c      |  167 ++++++++++++++++++++++++++++++++++++++++++++++
 qemu-threadlets.h      |   48 +++++++++++++
 5 files changed, 358 insertions(+), 172 deletions(-)
 create mode 100644 docs/async-support.txt
 create mode 100644 qemu-threadlets.c
 create mode 100644 qemu-threadlets.h
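
Reading aid (not part of the patch): a minimal, self-contained sketch of the
embed-and-recover pattern that docs/async-support.txt describes in A.3 and A.4.
The ThreadletWork stand-in below omits the list linkage, container_of is
defined locally, and run_inline() and double_via_work_item() are hypothetical
helpers that run the work function directly instead of queueing it. None of
these definitions are taken from the QEMU tree.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for ThreadletWork from qemu-threadlets.h (list linkage omitted). */
typedef struct ThreadletWork {
    void (*func)(struct ThreadletWork *work);
} ThreadletWork;

/* Local definition for this sketch; the patch relies on QEMU's own macro. */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Hypothetical per-request data that embeds the work item. */
typedef struct MyPrivateData {
    int input;
    int result;
    ThreadletWork work;
} MyPrivateData;

/* Work function: recover the enclosing object from the embedded member. */
static void my_threadlet_func(ThreadletWork *work)
{
    MyPrivateData *data = container_of(work, MyPrivateData, work);

    data->result = data->input * 2;
}

/* Hypothetical stand-in for submit_work(): runs the item inline. */
static void run_inline(ThreadletWork *work)
{
    work->func(work);
}

/* Build a request, "submit" it, and return the computed result. */
static int double_via_work_item(int input)
{
    MyPrivateData data = { .input = input, .result = 0 };

    data.work.func = my_threadlet_func;
    run_inline(&data.work);
    return data.result;
}
```

In the real framework the work function runs on a pool thread, so the object
that embeds the work item must outlive the submission; a stack object as used
here is only safe because the sketch runs the function inline.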

diff --git a/Makefile.objs b/Makefile.objs
index 3b7ec27..2cf8aba 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -10,6 +10,7 @@ qobject-obj-y += qerror.o
 block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
 block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
 block-obj-$(CONFIG_POSIX) += qemu-thread.o
+block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
diff --git a/docs/async-support.txt b/docs/async-support.txt
new file mode 100644
index 0000000..9f22b9a
--- /dev/null
+++ b/docs/async-support.txt
@@ -0,0 +1,141 @@
+== How to use the threadlets infrastructure supported in Qemu ==
+
+== Threadlets ==
+
+Q.1: What are threadlets ?
+A.1: Threadlets is an infrastructure within QEMU that allows other subsystems
+     to offload possibly blocking work to a queue to be processed by a pool
+     of threads asynchronously.
+
+Q.2: When would one want to use threadlets ?
+A.2: Threadlets are useful when there are operations that can be performed
+     outside the context of the VCPU/IO threads, in order to free those
+     threads to service other guest requests.
+
+Q.3: I have some work that can be executed in an asynchronous context. How
+     should I go about it ?
+A.3: One could follow the steps listed below:
+
+     - Define a function which would do the asynchronous work.
+	static void my_threadlet_func(ThreadletWork *work)
+	{
+	}
+
+     - Declare an object of type ThreadletWork;
+	ThreadletWork work;
+
+
+     - Assign a value to the "func" member of ThreadletWork object.
+	work.func = my_threadlet_func;
+
+     - Submit the threadlet to the global queue.
+	submit_work(&work);
+
+     - Continue servicing some other guest operations.
+
+Q.4: I want my_threadlet_func to access some non-global data. How do I do
+     that ?
+A.4: Suppose you want my_threadlet_func to access some non-global data object
+     of type MyPrivateData. In that case, one could follow these steps:
+
+     - Define a member of the type ThreadletWork within MyPrivateData.
+	typedef struct MyPrivateData {
+		...;
+		...;
+		...;
+		ThreadletWork work;
+	} MyPrivateData;
+
+	MyPrivateData myData;
+
+     - Initialize myData.work as described in A.3
+	myData.work.func = my_threadlet_func;
+	submit_work(&myData.work);
+
+     - Access the myData object inside my_threadlet_func() using the
+       container_of primitive
+	static void my_threadlet_func(ThreadletWork *work)
+	{
+		MyPrivateData *mydata_ptr;
+		mydata_ptr = container_of(work, MyPrivateData, work);
+
+		/* mydata_ptr now points to myData object */
+	}
+
+Q.5: Are there any precautions one must take while sharing data with the
+     asynchronous thread pool ?
+A.5: Yes, make sure that a helper function such as my_threadlet_func()
+     does not access or modify data while that data can also be accessed or
+     modified in the context of a VCPU thread or the IO thread. The
+     asynchronous threads in the pool can run in parallel with the VCPU/IO
+     threads, as shown in the figure below.
+
+     A typical workflow is as follows:
+
+              VCPU/IOThread
+                   |
+                   | (1)
+                   |
+                   V
+                Offload work              (2)
+      |-------> to threadlets -----------------------------> Helper thread
+      |            |                                               |
+      |            |                                               |
+      |            | (3)                                           | (4)
+      |            |                                               |
+      |         Handle other Guest requests                        |
+      |            |                                               |
+      |            |                                               V
+      |            | (3)                                  Signal the I/O Thread
+      |(6)         |                                               |
+      |            |                                              /
+      |            |                                             /
+      |            V                                            /
+      |          Do the post <---------------------------------/
+      |          processing               (5)
+      |            |
+      |            | (6)
+      |            V
+      |-Yes------ More async work?
+                   |
+                   | (7)
+	           No
+                   |
+                   |
+                   .
+                   .
+
+    Hence one needs to make sure that in steps (3) and (4), which run in
+    parallel, any shared data is accessed from only one context.
+
+Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
+A.6: The threadlets framework provides the API dequeue_work:
+       - int dequeue_work(ThreadletWork *work)
+
+     The API scans the ThreadletQueue to see if (work) is present. If it finds
+     work, it'll dequeue work and return 0.
+
+     On the other hand, if it does not find the (work) in the ThreadletQueue,
+     then it'll return 1. This can imply two things. Either the work is being
+     processed by one of the helper threads or it has been processed. The
+     threadlet infrastructure currently _does_not_ distinguish between these
+     two and the onus is on the caller to do that.
+
+Q.7: Apart from the global pool of threads, can I have my own private Queue ?
+A.7: Yes, the threadlets framework allows subsystems to create their own private
+     queues with associated pools of threads.
+
+     - Define a PrivateQueue
+       ThreadletQueue myQueue;
+
+     - Initialize it:
+       threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
+       where my_max_threads is the maximum number of threads that can be in the
+       thread pool and my_min_threads is the minimum number of threads that
+       are kept alive in the thread pool once they have been created.
+
+     - Submit work:
+       submit_work_to_queue(&myQueue, &my_work);
+
+     - Cancel work:
+       dequeue_work_on_queue(&myQueue, &my_work);
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 00b2a4e..9e02d18 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,33 +29,10 @@
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
-#include "qemu-thread.h"
-
-#define MAX_GLOBAL_THREADS  64
-#define MIN_GLOBAL_THREADS   8
+#include "qemu-threadlets.h"
 
 QemuMutex aiocb_mutex;
 
-typedef struct ThreadletQueue
-{
-    QemuMutex lock;
-    QemuCond cond;
-    int max_threads;
-    int min_threads;
-    int cur_threads;
-    int idle_threads;
-    QTAILQ_HEAD(, ThreadletWork) request_list;
-} ThreadletQueue;
-
-typedef struct ThreadletWork
-{
-    QTAILQ_ENTRY(ThreadletWork) node;
-    void (*func)(struct ThreadletWork *work);
-} ThreadletWork;
-
-static ThreadletQueue globalqueue;
-static int globalqueue_init;
-
 struct qemu_paiocb {
     BlockDriverAIOCB common;
     int aio_fildes;
@@ -83,154 +60,6 @@ typedef struct PosixAioState {
     struct qemu_paiocb *first_aio;
 } PosixAioState;
 
-static void *threadlet_worker(void *data)
-{
-    ThreadletQueue *queue = data;
-
-    qemu_mutex_lock(&queue->lock);
-    while (1) {
-        ThreadletWork *work;
-        int ret = 0;
-
-        while (QTAILQ_EMPTY(&queue->request_list) &&
-               (ret != ETIMEDOUT)) {
-            /* wait for cond to be signalled or broadcast for 1000s */
-            ret = qemu_cond_timedwait((&queue->cond),
-                                         &(queue->lock), 10*100000);
-        }
-
-        assert(queue->idle_threads != 0);
-        if (QTAILQ_EMPTY(&queue->request_list)) {
-            if (queue->cur_threads > queue->min_threads) {
-                /* We retain the minimum number of threads */
-                break;
-            }
-        } else {
-            work = QTAILQ_FIRST(&queue->request_list);
-            QTAILQ_REMOVE(&queue->request_list, work, node);
-
-            queue->idle_threads--;
-            qemu_mutex_unlock(&queue->lock);
-
-            /* execute the work function */
-            work->func(work);
-
-            qemu_mutex_lock(&queue->lock);
-            queue->idle_threads++;
-        }
-    }
-
-    queue->idle_threads--;
-    queue->cur_threads--;
-    qemu_mutex_unlock(&queue->lock);
-
-    return NULL;
-}
-
-static void spawn_threadlet(ThreadletQueue *queue)
-{
-    QemuThread thread;
-
-    queue->cur_threads++;
-    queue->idle_threads++;
-
-    qemu_thread_create(&thread, threadlet_worker, queue);
-}
-
-/**
- * submit_work_to_queue: Submit a new task to a private queue to be
- *                            executed asynchronously.
- * @queue: Per-subsystem private queue to which the new task needs
- *         to be submitted.
- * @work: Contains information about the task that needs to be submitted.
- */
-static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
-{
-    qemu_mutex_lock(&queue->lock);
-    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
-        spawn_threadlet(queue);
-    } else {
-        qemu_cond_signal(&queue->cond);
-    }
-    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
-    qemu_mutex_unlock(&queue->lock);
-}
-
-static void threadlet_queue_init(ThreadletQueue *queue,
-                          int max_threads, int min_threads);
-
-/**
- * submit_work: Submit to the global queue a new task to be executed
- *                   asynchronously.
- * @work: Contains information about the task that needs to be submitted.
- */
-static void submit_work(ThreadletWork *work)
-{
-    if (!globalqueue_init) {
-        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
-                                MIN_GLOBAL_THREADS);
-        globalqueue_init = 1;
-    }
-
-    submit_work_to_queue(&globalqueue, work);
-}
-
-/**
- * dequeue_work_on_queue: Cancel a task queued on a Queue.
- * @queue: The queue containing the task to be cancelled.
- * @work: Contains the information of the task that needs to be cancelled.
- *
- * Returns: 0 if the task is successfully cancelled.
- *          1 otherwise.
- */
-static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
-{
-    ThreadletWork *ret_work;
-    int ret = 1;
-
-    qemu_mutex_lock(&queue->lock);
-    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
-        if (ret_work == work) {
-            QTAILQ_REMOVE(&queue->request_list, ret_work, node);
-            ret = 0;
-            break;
-        }
-    }
-    qemu_mutex_unlock(&queue->lock);
-
-    return ret;
-}
-
-/**
- * dequeue_work: Cancel a task queued on the global queue.
- * @work: Contains the information of the task that needs to be cancelled.
- *
- * Returns: 0 if the task is successfully cancelled.
- *          1 otherwise.
- */
-static int dequeue_work(ThreadletWork *work)
-{
-    return dequeue_work_on_queue(&globalqueue, work);
-}
-
-/**
- * threadlet_queue_init: Initialize a threadlet queue.
- * @queue: The threadlet queue to be initialized.
- * @max_threads: Maximum number of threads processing the queue.
- * @min_threads: Minimum number of threads processing the queue.
- */
-static void threadlet_queue_init(ThreadletQueue *queue,
-                                    int max_threads, int min_threads)
-{
-    queue->cur_threads  = 0;
-    queue->idle_threads = 0;
-    queue->max_threads  = max_threads;
-    queue->min_threads  = min_threads;
-    QTAILQ_INIT(&queue->request_list);
-    qemu_mutex_init(&queue->lock);
-    qemu_cond_init(&queue->cond);
-}
-
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
 #else
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
new file mode 100644
index 0000000..a79062c
--- /dev/null
+++ b/qemu-threadlets.c
@@ -0,0 +1,167 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Anthony Liguori     <aliguori@us.ibm.com>
+ *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
+ *  Gautham R Shenoy    <ego@in.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ */
+
+#include "qemu-threadlets.h"
+#include "osdep.h"
+
+#define MAX_GLOBAL_THREADS  64
+#define MIN_GLOBAL_THREADS   8
+static ThreadletQueue globalqueue;
+static int globalqueue_init;
+
+static void *threadlet_worker(void *data)
+{
+    ThreadletQueue *queue = data;
+
+    qemu_mutex_lock(&queue->lock);
+    while (1) {
+        ThreadletWork *work;
+        int ret = 0;
+
+        while (QTAILQ_EMPTY(&queue->request_list) &&
+               (ret != ETIMEDOUT)) {
+            /* wait for cond to be signalled or broadcast for 1000s */
+            ret = qemu_cond_timedwait((&queue->cond),
+                                         &(queue->lock), 10*100000);
+        }
+
+        assert(queue->idle_threads != 0);
+        if (QTAILQ_EMPTY(&queue->request_list)) {
+            if (queue->cur_threads > queue->min_threads) {
+                /* We retain the minimum number of threads */
+                break;
+            }
+        } else {
+            work = QTAILQ_FIRST(&queue->request_list);
+            QTAILQ_REMOVE(&queue->request_list, work, node);
+
+            queue->idle_threads--;
+            qemu_mutex_unlock(&queue->lock);
+
+            /* execute the work function */
+            work->func(work);
+
+            qemu_mutex_lock(&queue->lock);
+            queue->idle_threads++;
+        }
+    }
+
+    queue->idle_threads--;
+    queue->cur_threads--;
+    qemu_mutex_unlock(&queue->lock);
+
+    return NULL;
+}
+
+static void spawn_threadlet(ThreadletQueue *queue)
+{
+    QemuThread thread;
+
+    queue->cur_threads++;
+    queue->idle_threads++;
+
+    qemu_thread_create(&thread, threadlet_worker, queue);
+}
+
+/**
+ * submit_work_to_queue: Submit a new task to a private queue to be
+ *                            executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ *         to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+    qemu_mutex_lock(&queue->lock);
+    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+        spawn_threadlet(queue);
+    } else {
+        qemu_cond_signal(&queue->cond);
+    }
+    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+    qemu_mutex_unlock(&queue->lock);
+}
+
+/**
+ * submit_work: Submit to the global queue a new task to be executed
+ *                   asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work(ThreadletWork *work)
+{
+    if (!globalqueue_init) {
+        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+                                MIN_GLOBAL_THREADS);
+        globalqueue_init = 1;
+    }
+
+    submit_work_to_queue(&globalqueue, work);
+}
+
+/**
+ * dequeue_work_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+    ThreadletWork *ret_work;
+    int ret = 1;
+
+    qemu_mutex_lock(&queue->lock);
+    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
+        if (ret_work == work) {
+            QTAILQ_REMOVE(&queue->request_list, ret_work, node);
+            ret = 0;
+            break;
+        }
+    }
+    qemu_mutex_unlock(&queue->lock);
+
+    return ret;
+}
+
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+int dequeue_work(ThreadletWork *work)
+{
+    return dequeue_work_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+void threadlet_queue_init(ThreadletQueue *queue,
+                                    int max_threads, int min_threads)
+{
+    queue->cur_threads  = 0;
+    queue->idle_threads = 0;
+    queue->max_threads  = max_threads;
+    queue->min_threads  = min_threads;
+    QTAILQ_INIT(&queue->request_list);
+    qemu_mutex_init(&queue->lock);
+    qemu_cond_init(&queue->cond);
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
new file mode 100644
index 0000000..cdb5faa
--- /dev/null
+++ b/qemu-threadlets.h
@@ -0,0 +1,48 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Anthony Liguori     <aliguori@us.ibm.com>
+ *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
+ *  Gautham R Shenoy    <ego@in.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_ASYNC_WORK_H
+#define QEMU_ASYNC_WORK_H
+
+#include "qemu-queue.h"
+#include "qemu-common.h"
+#include "qemu-thread.h"
+
+typedef struct ThreadletQueue
+{
+    QemuMutex lock;
+    QemuCond cond;
+    int max_threads;
+    int min_threads;
+    int cur_threads;
+    int idle_threads;
+    QTAILQ_HEAD(, ThreadletWork) request_list;
+} ThreadletQueue;
+
+typedef struct ThreadletWork
+{
+    QTAILQ_ENTRY(ThreadletWork) node;
+    void (*func)(struct ThreadletWork *work);
+} ThreadletWork;
+
+extern void submit_work_to_queue(ThreadletQueue *queue,
+                                 ThreadletWork *work);
+extern void submit_work(ThreadletWork *work);
+extern int dequeue_work_on_queue(ThreadletQueue *queue,
+                                 ThreadletWork *work);
+extern int dequeue_work(ThreadletWork *work);
+extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
+                                 int min_threads);
+#endif
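
Reading aid (not part of the patch): a minimal sketch of the cancellation
semantics documented above for dequeue_work_on_queue(), assuming a
single-threaded setting with no locking. It uses the TAILQ macros from
<sys/queue.h> as a stand-in for QEMU's QTAILQ macros; Work, WorkList,
cancel_work(), take_first() and demo_cancel() are hypothetical names chosen for
illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/queue.h>

/* Hypothetical work item; the real ThreadletWork carries a func pointer. */
typedef struct Work {
    TAILQ_ENTRY(Work) node;
    int id;
} Work;

TAILQ_HEAD(WorkList, Work);

/* Returns 0 if work was still queued and has been removed, 1 otherwise. */
static int cancel_work(struct WorkList *list, Work *work)
{
    Work *cur;

    TAILQ_FOREACH(cur, list, node) {
        if (cur == work) {
            TAILQ_REMOVE(list, cur, node);
            return 0;
        }
    }
    return 1;
}

/* Simulates a worker thread picking up the first queued item. */
static Work *take_first(struct WorkList *list)
{
    Work *first = TAILQ_FIRST(list);

    if (first != NULL) {
        TAILQ_REMOVE(list, first, node);
    }
    return first;
}

/* Returns 1 if every step behaved as the documentation describes. */
static int demo_cancel(void)
{
    struct WorkList list;
    Work a = { .id = 1 };
    Work b = { .id = 2 };

    TAILQ_INIT(&list);
    TAILQ_INSERT_TAIL(&list, &a, node);
    TAILQ_INSERT_TAIL(&list, &b, node);

    Work *taken = take_first(&list);       /* a is now "being processed" */
    int late = cancel_work(&list, &a);     /* too late: not on the queue */
    int ok = cancel_work(&list, &b);       /* still queued: removed */
    int again = cancel_work(&list, &b);    /* already removed */

    return taken == &a && late == 1 && ok == 0 && again == 1 &&
           TAILQ_EMPTY(&list);
}
```

A return value of 1 does not tell the caller whether the item is still running
or has already finished, so a caller that needs to know has to track completion
itself, as A.6 in docs/async-support.txt points out.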


* [Qemu-devel] v10: [PATCH 0/3] Threadlets: A generic task offloading framework.
@ 2010-11-10 13:19 Arun R Bharadwaj
  2010-11-10 13:19 ` [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure Arun R Bharadwaj
                   ` (2 more replies)
  0 siblings, 3 replies; 16+ messages in thread
From: Arun R Bharadwaj @ 2010-11-10 13:19 UTC (permalink / raw)
  To: qemu-devel

Hi,

This is v10 of the patch-series to have a generic
asynchronous task offloading framework (called
threadlets) within qemu.

Changelog:

* Removed the unnecessary active field from the
  structure qemu_paiocb.

* Rewrote the logic of the function paio_cancel

* Removed the unused CONFIG_THREAD=y from the
  configure code.



The following series implements...

---

Aneesh Kumar K.V (1):
      Make paio subsystem use threadlets infrastructure

Arun R Bharadwaj (1):
      Move threadlets infrastructure to qemu-threadlets.c

Gautham R Shenoy (1):
      Add helper functions to enable virtio-9p make use of the threadlets


 Makefile.objs          |    3 -
 configure              |    2 
 docs/async-support.txt |  141 ++++++++++++++++++++++++++++++
 hw/virtio-9p.c         |  164 +++++++++++++++++++++++++++++++++++
 posix-aio-compat.c     |  226 +++++++++++-------------------------------------
 qemu-threadlets.c      |  189 ++++++++++++++++++++++++++++++++++++++++
 qemu-threadlets.h      |   49 ++++++++++
 vl.c                   |    3 +
 8 files changed, 600 insertions(+), 177 deletions(-)
 create mode 100644 docs/async-support.txt
 create mode 100644 qemu-threadlets.c
 create mode 100644 qemu-threadlets.h

-- 
arun


* [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure
  2010-11-10 13:19 [Qemu-devel] v10: [PATCH 0/3] Threadlets: A generic task offloading framework Arun R Bharadwaj
@ 2010-11-10 13:19 ` Arun R Bharadwaj
  2010-11-10 13:45   ` Stefan Hajnoczi
  2010-11-10 13:19 ` [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c Arun R Bharadwaj
  2010-11-10 13:19 ` [Qemu-devel] [PATCH 3/3] Add helper functions to enable virtio-9p make use of the threadlets Arun R Bharadwaj
  2 siblings, 1 reply; 16+ messages in thread
From: Arun R Bharadwaj @ 2010-11-10 13:19 UTC (permalink / raw)
  To: qemu-devel

From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>

This patch creates a generic asynchronous-task-offloading infrastructure named
threadlets.

The patch creates a global queue onto which subsystems can queue their tasks to
be executed asynchronously.

The patch also provides APIs that allow a subsystem to create a private queue
with an associated pool of threads.

The patch has been tested with fstress.

Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
---
 Makefile.objs      |    2 
 configure          |    2 
 posix-aio-compat.c |  353 +++++++++++++++++++++++++++++++---------------------
 3 files changed, 210 insertions(+), 147 deletions(-)
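
Reading aid (not part of the patch): a small sketch of the two pool-sizing
decisions made in the diff below, written as pure functions so they can be
checked in isolation. should_spawn() mirrors the condition tested in
submit_work_to_queue(), and should_retire() mirrors the exit test in
threadlet_worker() once its wait loop has ended. PoolCounters and both function
names are hypothetical; the real code keeps these counters in ThreadletQueue
and updates them under queue->lock.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical snapshot of the counters held in ThreadletQueue. */
typedef struct PoolCounters {
    int max_threads;
    int min_threads;
    int cur_threads;
    int idle_threads;
} PoolCounters;

/* Spawn a worker only if nobody is idle and the pool is below its cap. */
static bool should_spawn(const PoolCounters *p)
{
    return p->idle_threads == 0 && p->cur_threads < p->max_threads;
}

/* A worker that finds the queue empty exits only above the minimum. */
static bool should_retire(const PoolCounters *p, bool queue_empty)
{
    return queue_empty && p->cur_threads > p->min_threads;
}
```

With the global defaults from the patch (64 maximum, 8 minimum), the first
submission on an empty pool spawns a thread, and an idle pool shrinks back to
8 threads as the surplus workers time out.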

diff --git a/Makefile.objs b/Makefile.objs
index cd5a24b..3b7ec27 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
 
 block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
 block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-$(CONFIG_POSIX) += qemu-thread.o
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
@@ -124,7 +125,6 @@ endif
 common-obj-y += $(addprefix ui/, $(ui-obj-y))
 
 common-obj-y += iov.o acl.o
-common-obj-$(CONFIG_THREAD) += qemu-thread.o
 common-obj-y += notify.o event_notifier.o
 common-obj-y += qemu-timer.o
 
diff --git a/configure b/configure
index a079a49..addf733 100755
--- a/configure
+++ b/configure
@@ -2456,7 +2456,6 @@ if test "$vnc_png" != "no" ; then
 fi
 if test "$vnc_thread" != "no" ; then
   echo "CONFIG_VNC_THREAD=y" >> $config_host_mak
-  echo "CONFIG_THREAD=y" >> $config_host_mak
 fi
 if test "$fnmatch" = "yes" ; then
   echo "CONFIG_FNMATCH=y" >> $config_host_mak
@@ -2534,7 +2533,6 @@ if test "$xen" = "yes" ; then
 fi
 if test "$io_thread" = "yes" ; then
   echo "CONFIG_IOTHREAD=y" >> $config_host_mak
-  echo "CONFIG_THREAD=y" >> $config_host_mak
 fi
 if test "$linux_aio" = "yes" ; then
   echo "CONFIG_LINUX_AIO=y" >> $config_host_mak
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..dc5f8ec 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,7 +29,33 @@
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
+#include "qemu-thread.h"
 
+#define MAX_GLOBAL_THREADS  64
+#define MIN_GLOBAL_THREADS   8
+
+static QemuMutex aiocb_mutex;
+static QemuCond aiocb_completion;
+
+typedef struct ThreadletQueue
+{
+    QemuMutex lock;
+    QemuCond cond;
+    int max_threads;
+    int min_threads;
+    int cur_threads;
+    int idle_threads;
+    QTAILQ_HEAD(, ThreadletWork) request_list;
+} ThreadletQueue;
+
+typedef struct ThreadletWork
+{
+    QTAILQ_ENTRY(ThreadletWork) node;
+    void (*func)(struct ThreadletWork *work);
+} ThreadletWork;
+
+static ThreadletQueue globalqueue;
+static int globalqueue_init;
 
 struct qemu_paiocb {
     BlockDriverAIOCB common;
@@ -44,13 +70,12 @@ struct qemu_paiocb {
     int ev_signo;
     off_t aio_offset;
 
-    QTAILQ_ENTRY(qemu_paiocb) node;
     int aio_type;
     ssize_t ret;
-    int active;
     struct qemu_paiocb *next;
 
     int async_context_id;
+    ThreadletWork work;
 };
 
 typedef struct PosixAioState {
@@ -58,64 +83,169 @@ typedef struct PosixAioState {
     struct qemu_paiocb *first_aio;
 } PosixAioState;
 
+static void *threadlet_worker(void *data)
+{
+    ThreadletQueue *queue = data;
 
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
+    qemu_mutex_lock(&queue->lock);
+    while (1) {
+        ThreadletWork *work;
+        int ret = 0;
+
+        while (QTAILQ_EMPTY(&queue->request_list) &&
+               (ret != ETIMEDOUT)) {
+            /* wait for cond to be signalled or broadcast for 1000s */
+            ret = qemu_cond_timedwait((&queue->cond),
+                                         &(queue->lock), 10*100000);
+        }
 
-#ifdef CONFIG_PREADV
-static int preadv_present = 1;
-#else
-static int preadv_present = 0;
-#endif
+        assert(queue->idle_threads != 0);
+        if (QTAILQ_EMPTY(&queue->request_list)) {
+            if (queue->cur_threads > queue->min_threads) {
+                /* We retain the minimum number of threads */
+                break;
+            }
+        } else {
+            work = QTAILQ_FIRST(&queue->request_list);
+            QTAILQ_REMOVE(&queue->request_list, work, node);
 
-static void die2(int err, const char *what)
-{
-    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
-    abort();
+            queue->idle_threads--;
+            qemu_mutex_unlock(&queue->lock);
+
+            /* execute the work function */
+            work->func(work);
+
+            qemu_mutex_lock(&queue->lock);
+            queue->idle_threads++;
+        }
+    }
+
+    queue->idle_threads--;
+    queue->cur_threads--;
+    qemu_mutex_unlock(&queue->lock);
+
+    return NULL;
 }
 
-static void die(const char *what)
+static void spawn_threadlet(ThreadletQueue *queue)
 {
-    die2(errno, what);
+    QemuThread thread;
+
+    queue->cur_threads++;
+    queue->idle_threads++;
+
+    qemu_thread_create(&thread, threadlet_worker, queue);
 }
 
-static void mutex_lock(pthread_mutex_t *mutex)
+/**
+ * submit_work_to_queue: Submit a new task to a private queue to be
+ *                            executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ *         to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
 {
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
+    qemu_mutex_lock(&queue->lock);
+    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+        spawn_threadlet(queue);
+    } else {
+        qemu_cond_signal(&queue->cond);
+    }
+    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+    qemu_mutex_unlock(&queue->lock);
 }
 
-static void mutex_unlock(pthread_mutex_t *mutex)
+static void threadlet_queue_init(ThreadletQueue *queue,
+                          int max_threads, int min_threads);
+
+/**
+ * submit_work: Submit to the global queue a new task to be executed
+ *                   asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+static void submit_work(ThreadletWork *work)
 {
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
+    if (!globalqueue_init) {
+        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+                                MIN_GLOBAL_THREADS);
+        globalqueue_init = 1;
+    }
+
+    submit_work_to_queue(&globalqueue, work);
 }
 
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
+/**
+ * dequeue_work_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
 {
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
+    ThreadletWork *ret_work;
+    int ret = 1;
+
+    qemu_mutex_lock(&queue->lock);
+    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
+        if (ret_work == work) {
+            QTAILQ_REMOVE(&queue->request_list, ret_work, node);
+            ret = 0;
+            break;
+        }
+    }
+    qemu_mutex_unlock(&queue->lock);
+
     return ret;
 }
 
-static void cond_signal(pthread_cond_t *cond)
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+static int dequeue_work(ThreadletWork *work)
+{
+    return dequeue_work_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+static void threadlet_queue_init(ThreadletQueue *queue,
+                                    int max_threads, int min_threads)
+{
+    queue->cur_threads  = 0;
+    queue->idle_threads = 0;
+    queue->max_threads  = max_threads;
+    queue->min_threads  = min_threads;
+    QTAILQ_INIT(&queue->request_list);
+    qemu_mutex_init(&queue->lock);
+    qemu_cond_init(&queue->cond);
+}
+
+#ifdef CONFIG_PREADV
+static int preadv_present = 1;
+#else
+static int preadv_present;
+#endif
+
+static void die2(int err, const char *what)
 {
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
+    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
+    abort();
 }
 
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
+static void die(const char *what)
 {
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
+    die2(errno, what);
 }
 
 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
@@ -301,106 +431,58 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
 {
     pid_t pid;
+    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+    ssize_t ret = 0;
 
     pid = getpid();
 
-    while (1) {
-        struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
-
-        mutex_lock(&lock);
-
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            ret = cond_timedwait(&cond, &lock, &ts);
-        }
-
-        if (QTAILQ_EMPTY(&request_list))
-            break;
-
-        aiocb = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->active = 1;
-        idle_threads--;
-        mutex_unlock(&lock);
-
-        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-        case QEMU_AIO_READ:
-        case QEMU_AIO_WRITE:
-            ret = handle_aiocb_rw(aiocb);
-            break;
-        case QEMU_AIO_FLUSH:
-            ret = handle_aiocb_flush(aiocb);
-            break;
-        case QEMU_AIO_IOCTL:
-            ret = handle_aiocb_ioctl(aiocb);
-            break;
-        default:
-            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
-            ret = -EINVAL;
-            break;
-        }
-
-        mutex_lock(&lock);
-        aiocb->ret = ret;
-        idle_threads++;
-        mutex_unlock(&lock);
-
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+    case QEMU_AIO_WRITE:
+        ret = handle_aiocb_rw(aiocb);
+        break;
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
+        ret = handle_aiocb_ioctl(aiocb);
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
+        break;
     }
 
-    idle_threads--;
-    cur_threads--;
-    mutex_unlock(&lock);
-
-    return NULL;
-}
-
-static void spawn_thread(void)
-{
-    sigset_t set, oldset;
-
-    cur_threads++;
-    idle_threads++;
+    qemu_mutex_lock(&aiocb_mutex);
+    aiocb->ret = ret;
+    qemu_mutex_unlock(&aiocb_mutex);
 
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+    if (kill(pid, aiocb->ev_signo)) {
+        die("kill failed");
+    }
 }
 
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
+    qemu_mutex_lock(&aiocb_mutex);
     aiocb->ret = -EINPROGRESS;
-    aiocb->active = 0;
-    mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
+    qemu_mutex_unlock(&aiocb_mutex);
+
+    aiocb->work.func = aio_thread;
+    submit_work(&aiocb->work);
 }
 
 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 {
     ssize_t ret;
 
-    mutex_lock(&lock);
+    qemu_mutex_lock(&aiocb_mutex);
     ret = aiocb->ret;
-    mutex_unlock(&lock);
-
+    qemu_mutex_unlock(&aiocb_mutex);
+    qemu_cond_broadcast(&aiocb_completion);
     return ret;
 }
 
@@ -534,22 +616,14 @@ static void paio_remove(struct qemu_paiocb *acb)
 static void paio_cancel(BlockDriverAIOCB *blockacb)
 {
     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
-    int active = 0;
-
-    mutex_lock(&lock);
-    if (!acb->active) {
-        QTAILQ_REMOVE(&request_list, acb, node);
-        acb->ret = -ECANCELED;
-    } else if (acb->ret == -EINPROGRESS) {
-        active = 1;
-    }
-    mutex_unlock(&lock);
 
-    if (active) {
-        /* fail safe: if the aio could not be canceled, we wait for
-           it */
-        while (qemu_paio_error(acb) == EINPROGRESS)
-            ;
+    if (dequeue_work(&acb->work) != 0) {
+        /* Wait for running work item to complete */
+        qemu_mutex_lock(&aiocb_mutex);
+        while (acb->ret == -EINPROGRESS) {
+            qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
+        }
+        qemu_mutex_unlock(&aiocb_mutex);
     }
 
     paio_remove(acb);
@@ -618,11 +692,12 @@ int paio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    int ret;
 
     if (posix_aio_state)
         return 0;
 
+    qemu_mutex_init(&aiocb_mutex);
+
     s = qemu_malloc(sizeof(PosixAioState));
 
     sigfillset(&act.sa_mask);
@@ -645,16 +720,6 @@ int paio_init(void)
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
         posix_aio_process_queue, s);
 
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
-
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
-
-    QTAILQ_INIT(&request_list);
-
     posix_aio_state = s;
     return 0;
 }
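
The cancel path in the patch above (try to dequeue, otherwise sleep on a
condition variable until the result leaves -EINPROGRESS) can be sketched
outside QEMU with plain pthreads. This is only an illustration under stated
assumptions: Request, req_mutex, req_completion and demo_cancel_wait() are
hypothetical names, not part of the patch, and here the worker itself
broadcasts on completion, whereas the patch issues the broadcast from
qemu_paio_return().

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>

/* Hypothetical stand-ins for aiocb_mutex / aiocb_completion. */
static pthread_mutex_t req_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t req_completion = PTHREAD_COND_INITIALIZER;

/* Hypothetical request: ret stays -EINPROGRESS until the worker is done. */
typedef struct Request {
    int ret;
} Request;

/* Worker side: publish the result under the mutex, then wake all waiters. */
static void *worker(void *opaque)
{
    Request *req = opaque;

    pthread_mutex_lock(&req_mutex);
    assert(req->ret == -EINPROGRESS);
    req->ret = 0;
    pthread_cond_broadcast(&req_completion);
    pthread_mutex_unlock(&req_mutex);
    return NULL;
}

/* Canceller side: the request could not be dequeued, so sleep until the
 * worker has stored a final result. The loop guards against spurious
 * wakeups and against the worker finishing before we start waiting. */
static int wait_for_completion(Request *req)
{
    int ret;

    pthread_mutex_lock(&req_mutex);
    while (req->ret == -EINPROGRESS) {
        pthread_cond_wait(&req_completion, &req_mutex);
    }
    ret = req->ret;
    pthread_mutex_unlock(&req_mutex);
    return ret;
}

/* Start one worker, wait for it the way paio_cancel() waits, return ret. */
int demo_cancel_wait(void)
{
    Request req = { .ret = -EINPROGRESS };
    pthread_t tid;
    int ret;

    if (pthread_create(&tid, NULL, worker, &req) != 0) {
        return -1;
    }
    ret = wait_for_completion(&req);
    pthread_join(tid, NULL);
    return ret;
}
```

Compared with the removed busy loop on qemu_paio_error(), the waiter sleeps
instead of spinning. The predicate is re-checked under the mutex, so a wakeup
that arrives before the wait begins is not lost.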


* [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c
  2010-11-10 13:19 [Qemu-devel] v10: [PATCH 0/3] Threadlets: A generic task offloading framework Arun R Bharadwaj
  2010-11-10 13:19 ` [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure Arun R Bharadwaj
@ 2010-11-10 13:19 ` Arun R Bharadwaj
  2010-11-10 13:47   ` Stefan Hajnoczi
  2010-11-10 17:29   ` Blue Swirl
  2010-11-10 13:19 ` [Qemu-devel] [PATCH 3/3] Add helper functions to enable virtio-9p make use of the threadlets Arun R Bharadwaj
  2 siblings, 2 replies; 16+ messages in thread
From: Arun R Bharadwaj @ 2010-11-10 13:19 UTC (permalink / raw)
  To: qemu-devel

This generic infrastructure is created so that other subsystems, such as
virtio-9p, can use it to offload tasks that could block.

Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
---
 Makefile.objs          |    1 
 docs/async-support.txt |  141 +++++++++++++++++++++++++++++++++++++++
 posix-aio-compat.c     |  173 ------------------------------------------------
 qemu-threadlets.c      |  168 +++++++++++++++++++++++++++++++++++++++++++++++
 qemu-threadlets.h      |   48 +++++++++++++
 5 files changed, 359 insertions(+), 172 deletions(-)
 create mode 100644 docs/async-support.txt
 create mode 100644 qemu-threadlets.c
 create mode 100644 qemu-threadlets.h

diff --git a/Makefile.objs b/Makefile.objs
index 3b7ec27..2cf8aba 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -10,6 +10,7 @@ qobject-obj-y += qerror.o
 block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
 block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
 block-obj-$(CONFIG_POSIX) += qemu-thread.o
+block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
diff --git a/docs/async-support.txt b/docs/async-support.txt
new file mode 100644
index 0000000..9f22b9a
--- /dev/null
+++ b/docs/async-support.txt
@@ -0,0 +1,141 @@
+== How to use the threadlets infrastructure supported in Qemu ==
+
+== Threadlets ==
+
+Q.1: What are threadlets ?
+A.1: Threadlets is an infrastructure within QEMU that allows other subsystems
+     to offload possibly blocking work to a queue to be processed by a pool
+     of threads asynchronously.
+
+Q.2: When would one want to use threadlets ?
+A.2: Threadlets are useful when there are operations that can be performed
+     outside the context of the VCPU/IO threads in order to free the latter
+     to service any other guest requests.
+
+Q.3: I have some work that can be executed in an asynchronous context. How
+     should I go about it ?
+A.3: One could follow the steps listed below:
+
+     - Define a function which would do the asynchronous work.
+	static void my_threadlet_func(ThreadletWork *work)
+	{
+	}
+
+     - Declare an object of type ThreadletWork;
+	ThreadletWork work;
+
+
+     - Assign a value to the "func" member of ThreadletWork object.
+	work.func = my_threadlet_func;
+
+     - Submit the threadlet to the global queue.
+	submit_threadletwork(&work);
+
+     - Continue servicing some other guest operations.
+
+Q.4: I want my_threadlet_func to access some non-global data. How do I do
+     that ?
+A.4: Suppose you want my_threadlet_func to access some non-global data-object
+     of type MyPrivateData. In that case one could follow the steps below.
+
+     - Define a member of the type ThreadletWork within MyPrivateData.
+	typedef struct MyPrivateData {
+		...;
+		...;
+		...;
+		ThreadletWork work;
+	} MyPrivateData;
+
+	MyPrivateData my_data;
+
+     - Initialize my_data.work as described in A.3
+	my_data.work.func = my_threadlet_func;
+	submit_threadletwork(&my_data.work);
+
+     - Access the my_data object inside my_threadlet_func() using the
+       container_of primitive
+	static void my_threadlet_func(ThreadletWork *work)
+	{
+		MyPrivateData *mydata_ptr;
+		mydata_ptr = container_of(work, MyPrivateData, work);
+
+		/* mydata_ptr now points to the my_data object */
+	}
+
+Q.5: Are there any precautions one must take while sharing data with the
+     Asynchronous thread-pool ?
+A.5: Yes, make sure that the helper function of the type my_threadlet_func()
+     does not access/modify data when it can be accessed or modified in the
+     context of VCPU thread or IO thread. This is because the asynchronous
+     threads in the pool can run in parallel with the VCPU/IOThreads as shown
+     in the figure.
+
+     A typical workflow is as follows:
+
+              VCPU/IOThread
+                   |
+                   | (1)
+                   |
+                   V
+                Offload work              (2)
+      |-------> to threadlets -----------------------------> Helper thread
+      |            |                                               |
+      |            |                                               |
+      |            | (3)                                           | (4)
+      |            |                                               |
+      |         Handle other Guest requests                        |
+      |            |                                               |
+      |            |                                               V
+      |            | (3)                                  Signal the I/O Thread
+      |(6)         |                                               |
+      |            |                                              /
+      |            |                                             /
+      |            V                                            /
+      |          Do the post <---------------------------------/
+      |          processing               (5)
+      |            |
+      |            | (6)
+      |            V
+      |-Yes------ More async work?
+                   |
+                   | (7)
+	           No
+                   |
+                   |
+                   .
+                   .
+
+    Hence one needs to make sure that in the steps (3) and (4) which run in
+    parallel, any global data is accessed within only one context.
+
+Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
+A.6: The threadlets framework provides the API cancel_threadletwork:
+       - int cancel_threadletwork(ThreadletWork *work)
+
+     The API scans the ThreadletQueue to see if (work) is present. If it finds
+     work, it'll dequeue work and return 0.
+
+     On the other hand, if it does not find the (work) in the ThreadletQueue,
+     then it'll return 1. This can imply two things. Either the work is being
+     processed by one of the helper threads or it has been processed. The
+     threadlet infrastructure currently _does_not_ distinguish between these
+     two and the onus is on the caller to do that.
+
+Q.7: Apart from the global pool of threads, can I have my own private Queue ?
+A.7: Yes, the threadlets framework allows subsystems to create their own private
+     queues with associated pools of threads.
+
+     - Define a PrivateQueue
+       ThreadletQueue myQueue;
+
+     - Initialize it:
+       threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
+       where my_max_threads is the maximum number of threads that can be in the
+       thread pool and my_min_threads is the minimum number of active threads
+       that can be in the thread-pool.
+
+     - Submit work:
+       submit_threadletwork_to_queue(&myQueue, &my_work);
+
+     - Cancel work:
+       cancel_threadletwork_on_queue(&myQueue, &my_work);
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index dc5f8ec..b357449 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,34 +29,11 @@
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
-#include "qemu-thread.h"
-
-#define MAX_GLOBAL_THREADS  64
-#define MIN_GLOBAL_THREADS   8
+#include "qemu-threadlets.h"
 
 static QemuMutex aiocb_mutex;
 static QemuCond aiocb_completion;
 
-typedef struct ThreadletQueue
-{
-    QemuMutex lock;
-    QemuCond cond;
-    int max_threads;
-    int min_threads;
-    int cur_threads;
-    int idle_threads;
-    QTAILQ_HEAD(, ThreadletWork) request_list;
-} ThreadletQueue;
-
-typedef struct ThreadletWork
-{
-    QTAILQ_ENTRY(ThreadletWork) node;
-    void (*func)(struct ThreadletWork *work);
-} ThreadletWork;
-
-static ThreadletQueue globalqueue;
-static int globalqueue_init;
-
 struct qemu_paiocb {
     BlockDriverAIOCB common;
     int aio_fildes;
@@ -83,154 +60,6 @@ typedef struct PosixAioState {
     struct qemu_paiocb *first_aio;
 } PosixAioState;
 
-static void *threadlet_worker(void *data)
-{
-    ThreadletQueue *queue = data;
-
-    qemu_mutex_lock(&queue->lock);
-    while (1) {
-        ThreadletWork *work;
-        int ret = 0;
-
-        while (QTAILQ_EMPTY(&queue->request_list) &&
-               (ret != ETIMEDOUT)) {
-            /* wait for cond to be signalled or broadcast for 1000s */
-            ret = qemu_cond_timedwait((&queue->cond),
-                                         &(queue->lock), 10*100000);
-        }
-
-        assert(queue->idle_threads != 0);
-        if (QTAILQ_EMPTY(&queue->request_list)) {
-            if (queue->cur_threads > queue->min_threads) {
-                /* We retain the minimum number of threads */
-                break;
-            }
-        } else {
-            work = QTAILQ_FIRST(&queue->request_list);
-            QTAILQ_REMOVE(&queue->request_list, work, node);
-
-            queue->idle_threads--;
-            qemu_mutex_unlock(&queue->lock);
-
-            /* execute the work function */
-            work->func(work);
-
-            qemu_mutex_lock(&queue->lock);
-            queue->idle_threads++;
-        }
-    }
-
-    queue->idle_threads--;
-    queue->cur_threads--;
-    qemu_mutex_unlock(&queue->lock);
-
-    return NULL;
-}
-
-static void spawn_threadlet(ThreadletQueue *queue)
-{
-    QemuThread thread;
-
-    queue->cur_threads++;
-    queue->idle_threads++;
-
-    qemu_thread_create(&thread, threadlet_worker, queue);
-}
-
-/**
- * submit_work_to_queue: Submit a new task to a private queue to be
- *                            executed asynchronously.
- * @queue: Per-subsystem private queue to which the new task needs
- *         to be submitted.
- * @work: Contains information about the task that needs to be submitted.
- */
-static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
-{
-    qemu_mutex_lock(&queue->lock);
-    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
-        spawn_threadlet(queue);
-    } else {
-        qemu_cond_signal(&queue->cond);
-    }
-    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
-    qemu_mutex_unlock(&queue->lock);
-}
-
-static void threadlet_queue_init(ThreadletQueue *queue,
-                          int max_threads, int min_threads);
-
-/**
- * submit_work: Submit to the global queue a new task to be executed
- *                   asynchronously.
- * @work: Contains information about the task that needs to be submitted.
- */
-static void submit_work(ThreadletWork *work)
-{
-    if (!globalqueue_init) {
-        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
-                                MIN_GLOBAL_THREADS);
-        globalqueue_init = 1;
-    }
-
-    submit_work_to_queue(&globalqueue, work);
-}
-
-/**
- * dequeue_work_on_queue: Cancel a task queued on a Queue.
- * @queue: The queue containing the task to be cancelled.
- * @work: Contains the information of the task that needs to be cancelled.
- *
- * Returns: 0 if the task is successfully cancelled.
- *          1 otherwise.
- */
-static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
-{
-    ThreadletWork *ret_work;
-    int ret = 1;
-
-    qemu_mutex_lock(&queue->lock);
-    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
-        if (ret_work == work) {
-            QTAILQ_REMOVE(&queue->request_list, ret_work, node);
-            ret = 0;
-            break;
-        }
-    }
-    qemu_mutex_unlock(&queue->lock);
-
-    return ret;
-}
-
-/**
- * dequeue_work: Cancel a task queued on the global queue.
- * @work: Contains the information of the task that needs to be cancelled.
- *
- * Returns: 0 if the task is successfully cancelled.
- *          1 otherwise.
- */
-static int dequeue_work(ThreadletWork *work)
-{
-    return dequeue_work_on_queue(&globalqueue, work);
-}
-
-/**
- * threadlet_queue_init: Initialize a threadlet queue.
- * @queue: The threadlet queue to be initialized.
- * @max_threads: Maximum number of threads processing the queue.
- * @min_threads: Minimum number of threads processing the queue.
- */
-static void threadlet_queue_init(ThreadletQueue *queue,
-                                    int max_threads, int min_threads)
-{
-    queue->cur_threads  = 0;
-    queue->idle_threads = 0;
-    queue->max_threads  = max_threads;
-    queue->min_threads  = min_threads;
-    QTAILQ_INIT(&queue->request_list);
-    qemu_mutex_init(&queue->lock);
-    qemu_cond_init(&queue->cond);
-}
-
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
 #else
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
new file mode 100644
index 0000000..23b4ecf
--- /dev/null
+++ b/qemu-threadlets.c
@@ -0,0 +1,168 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Anthony Liguori     <aliguori@us.ibm.com>
+ *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
+ *  Gautham R Shenoy    <gautham.shenoy@gmail.com>
+ *  Arun R Bharadwaj    <arun@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ */
+
+#include "qemu-threadlets.h"
+#include "osdep.h"
+
+#define MAX_GLOBAL_THREADS  64
+#define MIN_GLOBAL_THREADS   8
+static ThreadletQueue globalqueue;
+static int globalqueue_init;
+
+static void *threadlet_worker(void *data)
+{
+    ThreadletQueue *queue = data;
+
+    qemu_mutex_lock(&queue->lock);
+    while (1) {
+        ThreadletWork *work;
+        int ret = 0;
+
+        while (QTAILQ_EMPTY(&queue->request_list) &&
+               (ret != ETIMEDOUT)) {
+            /* wait for cond to be signalled or broadcast for 1000s */
+            ret = qemu_cond_timedwait((&queue->cond),
+                                         &(queue->lock), 10*100000);
+        }
+
+        assert(queue->idle_threads != 0);
+        if (QTAILQ_EMPTY(&queue->request_list)) {
+            if (queue->cur_threads > queue->min_threads) {
+                /* We retain the minimum number of threads */
+                break;
+            }
+        } else {
+            work = QTAILQ_FIRST(&queue->request_list);
+            QTAILQ_REMOVE(&queue->request_list, work, node);
+
+            queue->idle_threads--;
+            qemu_mutex_unlock(&queue->lock);
+
+            /* execute the work function */
+            work->func(work);
+
+            qemu_mutex_lock(&queue->lock);
+            queue->idle_threads++;
+        }
+    }
+
+    queue->idle_threads--;
+    queue->cur_threads--;
+    qemu_mutex_unlock(&queue->lock);
+
+    return NULL;
+}
+
+static void spawn_threadlet(ThreadletQueue *queue)
+{
+    QemuThread thread;
+
+    queue->cur_threads++;
+    queue->idle_threads++;
+
+    qemu_thread_create(&thread, threadlet_worker, queue);
+}
+
+/**
+ * submit_work_to_queue: Submit a new task to a private queue to be
+ *                            executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ *         to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+    qemu_mutex_lock(&queue->lock);
+    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+        spawn_threadlet(queue);
+    } else {
+        qemu_cond_signal(&queue->cond);
+    }
+    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+    qemu_mutex_unlock(&queue->lock);
+}
+
+/**
+ * submit_work: Submit to the global queue a new task to be executed
+ *                   asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work(ThreadletWork *work)
+{
+    if (!globalqueue_init) {
+        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+                                MIN_GLOBAL_THREADS);
+        globalqueue_init = 1;
+    }
+
+    submit_work_to_queue(&globalqueue, work);
+}
+
+/**
+ * dequeue_work_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+    ThreadletWork *ret_work;
+    int ret = 1;
+
+    qemu_mutex_lock(&queue->lock);
+    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
+        if (ret_work == work) {
+            QTAILQ_REMOVE(&queue->request_list, ret_work, node);
+            ret = 0;
+            break;
+        }
+    }
+    qemu_mutex_unlock(&queue->lock);
+
+    return ret;
+}
+
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+int dequeue_work(ThreadletWork *work)
+{
+    return dequeue_work_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+void threadlet_queue_init(ThreadletQueue *queue,
+                                    int max_threads, int min_threads)
+{
+    queue->cur_threads  = 0;
+    queue->idle_threads = 0;
+    queue->max_threads  = max_threads;
+    queue->min_threads  = min_threads;
+    QTAILQ_INIT(&queue->request_list);
+    qemu_mutex_init(&queue->lock);
+    qemu_cond_init(&queue->cond);
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
new file mode 100644
index 0000000..f869304
--- /dev/null
+++ b/qemu-threadlets.h
@@ -0,0 +1,48 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Anthony Liguori     <aliguori@us.ibm.com>
+ *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
+ *  Gautham R Shenoy    <gautham.shenoy@gmail.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_ASYNC_WORK_H
+#define QEMU_ASYNC_WORK_H
+
+#include "qemu-queue.h"
+#include "qemu-common.h"
+#include "qemu-thread.h"
+
+typedef struct ThreadletQueue
+{
+    QemuMutex lock;
+    QemuCond cond;
+    int max_threads;
+    int min_threads;
+    int cur_threads;
+    int idle_threads;
+    QTAILQ_HEAD(, ThreadletWork) request_list;
+} ThreadletQueue;
+
+typedef struct ThreadletWork
+{
+    QTAILQ_ENTRY(ThreadletWork) node;
+    void (*func)(struct ThreadletWork *work);
+} ThreadletWork;
+
+extern void submit_work_to_queue(ThreadletQueue *queue,
+                                 ThreadletWork *work);
+extern void submit_work(ThreadletWork *work);
+extern int dequeue_work_on_queue(ThreadletQueue *queue,
+                                 ThreadletWork *work);
+extern int dequeue_work(ThreadletWork *work);
+extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
+                                 int min_threads);
+#endif
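
The container_of pattern described in A.4 of docs/async-support.txt can be
tried in isolation with the sketch below. It makes several assumptions: the
ThreadletWork here is a cut-down stand-in without the QTAILQ linkage, a single
pthread replaces the threadlet pool and submit_work(), and SquareJob,
square_func(), run_one() and demo_offload() are hypothetical names that do not
appear in the patch.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Cut-down stand-in for the ThreadletWork in qemu-threadlets.h;
 * the queue linkage (QTAILQ_ENTRY) is left out of this sketch. */
typedef struct ThreadletWork {
    void (*func)(struct ThreadletWork *work);
} ThreadletWork;

/* Recover the enclosing structure from a pointer to one of its members. */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Hypothetical caller-side data with the work item embedded in it. */
typedef struct SquareJob {
    int input;
    int result;
    ThreadletWork work;
} SquareJob;

/* Work function: only receives the ThreadletWork, so it uses
 * container_of to reach the private data around it. */
static void square_func(ThreadletWork *work)
{
    SquareJob *job = container_of(work, SquareJob, work);

    job->result = job->input * job->input;
}

/* Stand-in for a pool thread: runs exactly one work item and exits. */
static void *run_one(void *opaque)
{
    ThreadletWork *work = opaque;

    assert(work->func != NULL);
    work->func(work);
    return NULL;
}

/* Offload one job to a helper thread, wait for it, return its result. */
int demo_offload(int input)
{
    SquareJob job = { .input = input, .result = 0 };
    pthread_t tid;

    job.work.func = square_func;
    if (pthread_create(&tid, NULL, run_one, &job.work) != 0) {
        return -1;
    }
    pthread_join(tid, NULL);
    return job.result;
}
```

Calling demo_offload(7) returns 49. With the real infrastructure the
pthread_create()/pthread_join() pair would be replaced by a call such as
submit_work(&job.work) from qemu-threadlets.h, and the job would have to
outlive the submitting function instead of living on its stack.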


* [Qemu-devel] [PATCH 3/3] Add helper functions to enable virtio-9p make use of the threadlets
  2010-11-10 13:19 [Qemu-devel] v10: [PATCH 0/3] Threadlets: A generic task offloading framework Arun R Bharadwaj
  2010-11-10 13:19 ` [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure Arun R Bharadwaj
  2010-11-10 13:19 ` [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c Arun R Bharadwaj
@ 2010-11-10 13:19 ` Arun R Bharadwaj
  2010-11-10 13:54   ` Stefan Hajnoczi
  2 siblings, 1 reply; 16+ messages in thread
From: Arun R Bharadwaj @ 2010-11-10 13:19 UTC (permalink / raw)
  To: qemu-devel

From: Gautham R Shenoy <ego@in.ibm.com>

Add helper functions to enable virtio-9p to make use of the threadlets
infrastructure for offloading blocking tasks, such as making posix calls, on
to the helper threads and to handle the post_posix_operations() from the
context of the iothread. This frees the vcpu thread to process any other guest
operations while the processing of v9fs_io is in progress.

Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
 hw/virtio-9p.c     |  164 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 posix-aio-compat.c |   30 +++-------
 qemu-threadlets.c  |   21 +++++++
 qemu-threadlets.h  |    1 
 vl.c               |    3 +
 5 files changed, 196 insertions(+), 23 deletions(-)

diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
index 7c59988..1ada57b 100644
--- a/hw/virtio-9p.c
+++ b/hw/virtio-9p.c
@@ -18,6 +18,7 @@
 #include "fsdev/qemu-fsdev.h"
 #include "virtio-9p-debug.h"
 #include "virtio-9p-xattr.h"
+#include "qemu-threadlets.h"
 
 int debug_9p_pdu;
 
@@ -33,6 +34,149 @@ enum {
     Oappend = 0x80,
 };
 
+typedef struct V9fsPostOp {
+    QTAILQ_ENTRY(V9fsPostOp) node;
+    void (*func)(void *arg);
+    void *arg;
+} V9fsPostOp;
+
+static struct {
+    int rfd;
+    int wfd;
+    QemuMutex lock;
+    QTAILQ_HEAD(, V9fsPostOp) post_op_list;
+} v9fs_async_struct;
+
+static void die2(int err, const char *what)
+{
+    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
+    abort();
+}
+
+static void die(const char *what)
+{
+    die2(errno, what);
+}
+
+#define ASYNC_MAX_PROCESS   5
+
+/**
+ * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
+ * @arg: Not used.
+ *
+ * This function serves as a callback to the iothread to be called into whenever
+ * the v9fs_async_struct.wfd is written into. It goes through the list
+ * of v9fs_post_posix_operations() and executes them. In the process, it might
+ * queue more jobs on the asynchronous thread pool.
+ */
+static void v9fs_process_post_ops(void *arg)
+{
+    int count = 0;
+    struct V9fsPostOp *post_op;
+    int ret;
+    char byte;
+
+    qemu_mutex_lock(&v9fs_async_struct.lock);
+    do {
+        ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
+    } while (ret > 1 || (ret == -1 && errno == EINTR));
+
+    for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
+        if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
+            break;
+        }
+        post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
+        QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
+
+        qemu_mutex_unlock(&v9fs_async_struct.lock);
+        post_op->func(post_op->arg);
+        qemu_free(post_op);
+        qemu_mutex_lock(&v9fs_async_struct.lock);
+    }
+    qemu_mutex_unlock(&v9fs_async_struct.lock);
+}
+
+/**
+ * v9fs_async_signal: Inform the io-thread of completion of async job.
+ *
+ * This function is used to inform the iothread that a particular
+ * async-operation pertaining to v9fs has been completed and that the io thread
+ * can handle the v9fs_post_posix_operation.
+ *
+ * This is based on the aio_signal_handler
+ */
+static inline void v9fs_async_signal(void)
+{
+    char byte = 0;
+    ssize_t ret;
+    int tries = 0;
+
+    qemu_mutex_lock(&v9fs_async_struct.lock);
+    do {
+        assert(tries != 100);
+        ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
+        tries++;
+    } while (ret < 0 && errno == EAGAIN);
+    qemu_mutex_unlock(&v9fs_async_struct.lock);
+
+    if (ret < 0 && errno != EAGAIN) {
+        die("write() in v9fs");
+    }
+
+    if (kill(getpid(), SIGUSR2)) {
+        die("kill failed");
+    }
+}
+
+/**
+ * v9fs_async_helper_done: Marks the completion of the v9fs_async job
+ * @func: v9fs_post_posix_func() for post-processing invoked in the context of
+ *        the io-thread
+ * @arg: Argument to func.
+ *
+ * This function is called from the context of one of the asynchronous threads
+ * in the thread pool. This is called when the asynchronous thread has finished
+ * executing a v9fs_posix_operation. Its purpose is to initiate the process of
+ * informing the io-thread that the v9fs_posix_operation has completed.
+ */
+static void v9fs_async_helper_done(void (*func)(void *arg), void *arg)
+{
+    struct V9fsPostOp *post_op;
+
+    post_op = qemu_mallocz(sizeof(*post_op));
+    post_op->func = func;
+    post_op->arg = arg;
+
+    qemu_mutex_lock(&v9fs_async_struct.lock);
+    QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node);
+    qemu_mutex_unlock(&v9fs_async_struct.lock);
+
+    v9fs_async_signal();
+}
+
+/**
+ * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
+ * @work: ThreadletWork of the operation to be offloaded.
+ * @posix_fn: The posix function which has to be offloaded onto async thread.
+ * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
+ * the posix_fn
+ * @post_fn: The post processing function corresponding to the posix_fn.
+ *
+ * This function is a helper to offload posix_operation on to the asynchronous
+ * thread pool. It sets up the associations with the post_function that needs to
+ * be invoked from the context of the iothread once the posix_fn has been
+ * executed.
+ */
+static void v9fs_do_async_posix(ThreadletWork *work,
+                                void (*posix_fn)(ThreadletWork *work),
+                                void (**post_fn_ptr)(void *arg),
+                                void (*post_fn)(void *arg))
+{
+    *post_fn_ptr = post_fn;
+    work->func = posix_fn;
+    submit_work(work);
+}
+
 static int omode_to_uflags(int8_t mode)
 {
     int ret = 0;
@@ -3657,7 +3801,7 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
     int i, len;
     struct stat stat;
     FsTypeEntry *fse;
-
+    int fds[2];
 
     s = (V9fsState *)virtio_common_init("virtio-9p",
                                     VIRTIO_ID_9P,
@@ -3740,5 +3884,23 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
                         s->tag_len;
     s->vdev.get_config = virtio_9p_get_config;
 
+    if (qemu_pipe(fds) == -1) {
+        fprintf(stderr, "failed to create fd's for virtio-9p\n");
+        exit(1);
+    }
+
+    v9fs_async_struct.rfd = fds[0];
+    v9fs_async_struct.wfd = fds[1];
+
+    fcntl(fds[0], F_SETFL, O_NONBLOCK);
+    fcntl(fds[1], F_SETFL, O_NONBLOCK);
+
+    qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
+    QTAILQ_INIT(&v9fs_async_struct.post_op_list);
+    qemu_mutex_init(&(v9fs_async_struct.lock));
+
+    (void)v9fs_do_async_posix;
+    (void)v9fs_async_helper_done;
+
     return &s->vdev;
 }
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index b357449..515579f 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -260,11 +260,14 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
+static PosixAioState *posix_aio_state;
+
 static void aio_thread(ThreadletWork *work)
 {
     pid_t pid;
     struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
     ssize_t ret = 0;
+    char byte = 0;
 
     pid = getpid();
 
@@ -289,6 +292,11 @@ static void aio_thread(ThreadletWork *work)
     aiocb->ret = ret;
     qemu_mutex_unlock(&aiocb_mutex);
 
+    ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
+    if (ret < 0 && errno != EAGAIN) {
+        die("write()");
+    }
+
     if (kill(pid, aiocb->ev_signo)) {
         die("kill failed");
     }
@@ -407,22 +415,6 @@ static int posix_aio_flush(void *opaque)
     return !!s->first_aio;
 }
 
-static PosixAioState *posix_aio_state;
-
-static void aio_signal_handler(int signum)
-{
-    if (posix_aio_state) {
-        char byte = 0;
-        ssize_t ret;
-
-        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
-        if (ret < 0 && errno != EAGAIN)
-            die("write()");
-    }
-
-    qemu_service_io();
-}
-
 static void paio_remove(struct qemu_paiocb *acb)
 {
     struct qemu_paiocb **pacb;
@@ -518,7 +510,6 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
 
 int paio_init(void)
 {
-    struct sigaction act;
     PosixAioState *s;
     int fds[2];
 
@@ -529,11 +520,6 @@ int paio_init(void)
 
     s = qemu_malloc(sizeof(PosixAioState));
 
-    sigfillset(&act.sa_mask);
-    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
-    act.sa_handler = aio_signal_handler;
-    sigaction(SIGUSR2, &act, NULL);
-
     s->first_aio = NULL;
     if (qemu_pipe(fds) == -1) {
         fprintf(stderr, "failed to create pipe\n");
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
index 23b4ecf..1e78f56 100644
--- a/qemu-threadlets.c
+++ b/qemu-threadlets.c
@@ -16,12 +16,28 @@
 
 #include "qemu-threadlets.h"
 #include "osdep.h"
+#include <signal.h>
 
 #define MAX_GLOBAL_THREADS  64
 #define MIN_GLOBAL_THREADS   8
 static ThreadletQueue globalqueue;
 static int globalqueue_init;
 
+static void threadlet_io_completion_signal_handler(int signum)
+{
+    qemu_service_io();
+}
+
+static void threadlet_register_signal_handler(void)
+{
+    struct sigaction act;
+
+    sigfillset(&act.sa_mask);
+    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
+    act.sa_handler = threadlet_io_completion_signal_handler;
+    sigaction(SIGUSR2, &act, NULL);
+}
+
 static void *threadlet_worker(void *data)
 {
     ThreadletQueue *queue = data;
@@ -166,3 +182,8 @@ void threadlet_queue_init(ThreadletQueue *queue,
     qemu_mutex_init(&queue->lock);
     qemu_cond_init(&queue->cond);
 }
+
+void threadlet_init(void)
+{
+    threadlet_register_signal_handler();
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
index f869304..6f67a56 100644
--- a/qemu-threadlets.h
+++ b/qemu-threadlets.h
@@ -45,4 +45,5 @@ extern int dequeue_work_on_queue(ThreadletQueue *queue,
 extern int dequeue_work(ThreadletWork *work);
 extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
                                  int min_threads);
+extern void threadlet_init(void);
 #endif
diff --git a/vl.c b/vl.c
index df414ef..7b9a425 100644
--- a/vl.c
+++ b/vl.c
@@ -148,6 +148,7 @@ int main(int argc, char **argv)
 #include "qemu-config.h"
 #include "qemu-objects.h"
 #include "qemu-options.h"
+#include "qemu-threadlets.h"
 #ifdef CONFIG_VIRTFS
 #include "fsdev/qemu-fsdev.h"
 #endif
@@ -2922,6 +2923,8 @@ int main(int argc, char **argv, char **envp)
             exit(1);
     }
 
+    threadlet_init();
+
     /* init generic devices */
     if (qemu_opts_foreach(qemu_find_opts("device"), device_init_func, NULL, 1) != 0)
         exit(1);


* Re: [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure
  2010-11-10 13:19 ` [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure Arun R Bharadwaj
@ 2010-11-10 13:45   ` Stefan Hajnoczi
  2010-11-10 17:54     ` Arun R Bharadwaj
  2010-11-11  8:41     ` [Qemu-devel] " Paolo Bonzini
  0 siblings, 2 replies; 16+ messages in thread
From: Stefan Hajnoczi @ 2010-11-10 13:45 UTC (permalink / raw)
  To: Arun R Bharadwaj; +Cc: qemu-devel

On Wed, Nov 10, 2010 at 1:19 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> @@ -301,106 +431,58 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>     return nbytes;
>  }
>
> -static void *aio_thread(void *unused)
> +static void aio_thread(ThreadletWork *work)
>  {
>     pid_t pid;
> +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> +    ssize_t ret = 0;
>
>     pid = getpid();
>
> -    while (1) {
> -        struct qemu_paiocb *aiocb;
> -        ssize_t ret = 0;
> -        qemu_timeval tv;
> -        struct timespec ts;
> -
> -        qemu_gettimeofday(&tv);
> -        ts.tv_sec = tv.tv_sec + 10;
> -        ts.tv_nsec = 0;
> -
> -        mutex_lock(&lock);
> -
> -        while (QTAILQ_EMPTY(&request_list) &&
> -               !(ret == ETIMEDOUT)) {
> -            ret = cond_timedwait(&cond, &lock, &ts);
> -        }
> -
> -        if (QTAILQ_EMPTY(&request_list))
> -            break;
> -
> -        aiocb = QTAILQ_FIRST(&request_list);
> -        QTAILQ_REMOVE(&request_list, aiocb, node);
> -        aiocb->active = 1;
> -        idle_threads--;
> -        mutex_unlock(&lock);
> -
> -        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> -        case QEMU_AIO_READ:
> -        case QEMU_AIO_WRITE:
> -            ret = handle_aiocb_rw(aiocb);
> -            break;
> -        case QEMU_AIO_FLUSH:
> -            ret = handle_aiocb_flush(aiocb);
> -            break;
> -        case QEMU_AIO_IOCTL:
> -            ret = handle_aiocb_ioctl(aiocb);
> -            break;
> -        default:
> -            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> -            ret = -EINVAL;
> -            break;
> -        }
> -
> -        mutex_lock(&lock);
> -        aiocb->ret = ret;
> -        idle_threads++;
> -        mutex_unlock(&lock);
> -
> -        if (kill(pid, aiocb->ev_signo)) die("kill failed");
> +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> +    case QEMU_AIO_READ:
> +    case QEMU_AIO_WRITE:
> +        ret = handle_aiocb_rw(aiocb);
> +        break;
> +    case QEMU_AIO_FLUSH:
> +        ret = handle_aiocb_flush(aiocb);
> +        break;
> +    case QEMU_AIO_IOCTL:
> +        ret = handle_aiocb_ioctl(aiocb);
> +        break;
> +    default:
> +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> +        ret = -EINVAL;
> +        break;
>     }
>
> -    idle_threads--;
> -    cur_threads--;
> -    mutex_unlock(&lock);
> -
> -    return NULL;
> -}
> -
> -static void spawn_thread(void)
> -{
> -    sigset_t set, oldset;
> -
> -    cur_threads++;
> -    idle_threads++;
> +    qemu_mutex_lock(&aiocb_mutex);
> +    aiocb->ret = ret;

This is where qemu_cond_broadcast() is needed.

> +    qemu_mutex_unlock(&aiocb_mutex);
>
> -    /* block all signals */
> -    if (sigfillset(&set)) die("sigfillset");
> -    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> -
> -    thread_create(&thread_id, &attr, aio_thread, NULL);
> -
> -    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> +    if (kill(pid, aiocb->ev_signo)) {
> +        die("kill failed");
> +    }
>  }
>
>  static void qemu_paio_submit(struct qemu_paiocb *aiocb)
>  {
> +    qemu_mutex_lock(&aiocb_mutex);
>     aiocb->ret = -EINPROGRESS;
> -    aiocb->active = 0;
> -    mutex_lock(&lock);
> -    if (idle_threads == 0 && cur_threads < max_threads)
> -        spawn_thread();
> -    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> -    mutex_unlock(&lock);
> -    cond_signal(&cond);
> +    qemu_mutex_unlock(&aiocb_mutex);
> +
> +    aiocb->work.func = aio_thread;
> +    submit_work(&aiocb->work);
>  }
>
>  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>  {
>     ssize_t ret;
>
> -    mutex_lock(&lock);
> +    qemu_mutex_lock(&aiocb_mutex);
>     ret = aiocb->ret;
> -    mutex_unlock(&lock);
> -
> +    qemu_mutex_unlock(&aiocb_mutex);
> +    qemu_cond_broadcast(&aiocb_completion);

qemu_paio_return() gets the return value from qemu_paiocb.  It
shouldn't have side effects and is the wrong place to broadcast
completion.
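
A minimal sketch of the split being asked for here, written against plain
pthreads rather than the qemu_mutex/qemu_cond wrappers. The names (struct
request, request_complete() and so on) are hypothetical stand-ins, not code
from the patch: the worker publishes the result and broadcasts, while the
accessor only reads.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>

/* Hypothetical stand-in for struct qemu_paiocb; only the result matters. */
struct request {
    ssize_t ret;
};

static pthread_mutex_t req_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t req_completion = PTHREAD_COND_INITIALIZER;

/* Submission side: mark the request as in flight. */
static void request_submit(struct request *req)
{
    pthread_mutex_lock(&req_mutex);
    req->ret = -EINPROGRESS;
    pthread_mutex_unlock(&req_mutex);
}

/* Worker side: publish the result and wake anyone waiting for it. */
static void request_complete(struct request *req, ssize_t ret)
{
    /* The in-progress sentinel is not a valid final result. */
    assert(ret != -EINPROGRESS);

    pthread_mutex_lock(&req_mutex);
    req->ret = ret;
    pthread_cond_broadcast(&req_completion);
    pthread_mutex_unlock(&req_mutex);
}

/* Pure accessor: returns the current result and signals nobody. */
static ssize_t request_return(struct request *req)
{
    ssize_t ret;

    pthread_mutex_lock(&req_mutex);
    ret = req->ret;
    pthread_mutex_unlock(&req_mutex);
    return ret;
}

/* Cancel path: block until the worker has published a result. */
static ssize_t request_wait(struct request *req)
{
    ssize_t ret;

    pthread_mutex_lock(&req_mutex);
    while (req->ret == -EINPROGRESS) {
        pthread_cond_wait(&req_completion, &req_mutex);
    }
    ret = req->ret;
    pthread_mutex_unlock(&req_mutex);
    return ret;
}
```

With this layout the only place that touches the condition variable on the
producer side is request_complete(), so calling request_return() any number
of times cannot wake or confuse a waiter.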

>     return ret;
>  }
>
> @@ -534,22 +616,14 @@ static void paio_remove(struct qemu_paiocb *acb)
>  static void paio_cancel(BlockDriverAIOCB *blockacb)
>  {
>     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> -    int active = 0;
> -
> -    mutex_lock(&lock);
> -    if (!acb->active) {
> -        QTAILQ_REMOVE(&request_list, acb, node);
> -        acb->ret = -ECANCELED;
> -    } else if (acb->ret == -EINPROGRESS) {
> -        active = 1;
> -    }
> -    mutex_unlock(&lock);
>
> -    if (active) {
> -        /* fail safe: if the aio could not be canceled, we wait for
> -           it */
> -        while (qemu_paio_error(acb) == EINPROGRESS)
> -            ;
> +    if (dequeue_work(&acb->work) != 0) {
> +        /* Wait for running work item to complete */
> +        qemu_mutex_lock(&aiocb_mutex);
> +        while (acb->ret == -EINPROGRESS) {
> +            qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> +        }
> +        qemu_mutex_unlock(&aiocb_mutex);

I wonder if the condition variable has a measurable performance
overhead.  We unconditionally broadcast on paiocb completion.  One
idea would be to keep a counter of waiters (should only ever be 0 or
1) protected by aiocb_mutex and broadcast only when there is a waiter.
I just want to share this idea; I don't know if it's necessary to
implement it or if it could even work without a race condition.
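
To make the waiter-counter idea concrete, here is a rough sketch using plain
pthreads. Everything in it (struct completion, the broadcasts counter used
only for instrumentation) is an assumption for illustration, not part of the
patch. As far as I can tell it avoids a lost wakeup as long as the counter,
the result and the predicate check are all handled under the same mutex,
because pthread_cond_wait() releases that mutex atomically with going to
sleep.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>

/* Hypothetical completion object; not a structure from the patch. */
struct completion {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int waiters;     /* threads currently inside completion_wait() */
    int broadcasts;  /* instrumentation only: broadcasts actually issued */
    ssize_t ret;
};

static void completion_init(struct completion *c)
{
    pthread_mutex_init(&c->lock, NULL);
    pthread_cond_init(&c->cond, NULL);
    c->waiters = 0;
    c->broadcasts = 0;
    c->ret = -EINPROGRESS;
}

/* Worker side: publish the result, broadcast only if somebody waits. */
static void completion_finish(struct completion *c, ssize_t ret)
{
    pthread_mutex_lock(&c->lock);
    c->ret = ret;
    if (c->waiters > 0) {
        c->broadcasts++;
        pthread_cond_broadcast(&c->cond);
    }
    pthread_mutex_unlock(&c->lock);
}

/* Cancel side: register as a waiter before checking the predicate. */
static ssize_t completion_wait(struct completion *c)
{
    ssize_t ret;

    pthread_mutex_lock(&c->lock);
    c->waiters++;
    while (c->ret == -EINPROGRESS) {
        pthread_cond_wait(&c->cond, &c->lock);
    }
    c->waiters--;
    assert(c->waiters >= 0);
    ret = c->ret;
    pthread_mutex_unlock(&c->lock);
    return ret;
}
```

Whether skipping the broadcast saves anything measurable is a separate
question; an uncontended pthread_cond_broadcast() with no waiters is usually
cheap, so this would need benchmarking before it is worth the extra state.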

>     }
>
>     paio_remove(acb);
> @@ -618,11 +692,12 @@ int paio_init(void)
>     struct sigaction act;
>     PosixAioState *s;
>     int fds[2];
> -    int ret;
>
>     if (posix_aio_state)
>         return 0;
>
> +    qemu_mutex_init(&aiocb_mutex);

Also needs qemu_cond_init(&aiocb_completion).

Stefan


* Re: [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c
  2010-11-10 13:19 ` [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c Arun R Bharadwaj
@ 2010-11-10 13:47   ` Stefan Hajnoczi
  2010-11-11  8:18     ` Arun R Bharadwaj
  2010-11-10 17:29   ` Blue Swirl
  1 sibling, 1 reply; 16+ messages in thread
From: Stefan Hajnoczi @ 2010-11-10 13:47 UTC (permalink / raw)
  To: Arun R Bharadwaj; +Cc: qemu-devel

On Wed, Nov 10, 2010 at 1:19 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> The reason for creating this generic infrastructure is so that other subsystems,
> such as virtio-9p could make use of it for offloading tasks that could block.
>
> Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
> Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
> Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> ---
>  Makefile.objs          |    1
>  docs/async-support.txt |  141 +++++++++++++++++++++++++++++++++++++++
>  posix-aio-compat.c     |  173 ------------------------------------------------
>  qemu-threadlets.c      |  168 +++++++++++++++++++++++++++++++++++++++++++++++
>  qemu-threadlets.h      |   48 +++++++++++++
>  5 files changed, 359 insertions(+), 172 deletions(-)
>  create mode 100644 docs/async-support.txt
>  create mode 100644 qemu-threadlets.c
>  create mode 100644 qemu-threadlets.h

Looks good.

Stefan


* Re: [Qemu-devel] [PATCH 3/3] Add helper functions to enable virtio-9p make use of the threadlets
  2010-11-10 13:19 ` [Qemu-devel] [PATCH 3/3] Add helper functions to enable virtio-9p make use of the threadlets Arun R Bharadwaj
@ 2010-11-10 13:54   ` Stefan Hajnoczi
  0 siblings, 0 replies; 16+ messages in thread
From: Stefan Hajnoczi @ 2010-11-10 13:54 UTC (permalink / raw)
  To: Arun R Bharadwaj; +Cc: qemu-devel

On Wed, Nov 10, 2010 at 1:19 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> +static void v9fs_process_post_ops(void *arg)
> +{
> +    int count = 0;
> +    struct V9fsPostOp *post_op;
> +    int ret;
> +    char byte;
> +
> +    qemu_mutex_lock(&v9fs_async_struct.lock);

I don't think it is necessary to lock across rfd/wfd read()/write().
rfd/wfd are never changed once v9fs_async_struct has been set up.

Only post_op_list needs to be protected.
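
Roughly what narrowing the lock could look like, as a self-contained sketch
with plain pthreads and a hand-rolled list instead of QTAILQ. The names
(struct post_op, post_op_enqueue(), post_op_run_all(), increment_counter())
are made up for the example: the mutex is held only while linking or
unlinking an entry, and each callback runs with the lock dropped.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical post-operation entry, loosely modelled on V9fsPostOp. */
struct post_op {
    void (*func)(void *arg);
    void *arg;
    struct post_op *next;
};

static pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
static struct post_op *list_head;
static struct post_op **list_tail = &list_head;

/* Called from a worker thread: append one entry under the lock. */
static void post_op_enqueue(void (*func)(void *arg), void *arg)
{
    struct post_op *op = calloc(1, sizeof(*op));

    assert(op != NULL);
    op->func = func;
    op->arg = arg;

    pthread_mutex_lock(&list_lock);
    *list_tail = op;
    list_tail = &op->next;
    pthread_mutex_unlock(&list_lock);
}

/* Called from the I/O thread: pop under the lock, run without it. */
static int post_op_run_all(void)
{
    int count = 0;

    for (;;) {
        struct post_op *op;

        pthread_mutex_lock(&list_lock);
        op = list_head;
        if (op != NULL) {
            list_head = op->next;
            if (list_head == NULL) {
                list_tail = &list_head;
            }
        }
        pthread_mutex_unlock(&list_lock);

        if (op == NULL) {
            break;
        }
        op->func(op->arg);  /* lock is not held here */
        free(op);
        count++;
    }
    return count;
}

/* Example callback used only to exercise the list. */
static void increment_counter(void *arg)
{
    (*(int *)arg)++;
}
```

Running the callbacks outside the lock also means a post-op may enqueue
further work without deadlocking on list_lock.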

> +    do {
> +        ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
> +    } while (ret > 1 || (ret == -1 && errno == EINTR));

ret > 1 looks like a typo, should be ret > 0.  sizeof(byte) == 1 means
ret > 1 is impossible.
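
For reference, a small standalone sketch of the drain loop with the corrected
condition. The helper names (set_nonblocking(), drain_notifications()) are
hypothetical; the loop keeps reading while a byte was consumed or the call
was interrupted, and stops on EAGAIN or EOF.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Add O_NONBLOCK to a descriptor without clobbering its other flags. */
static int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL);

    if (flags == -1) {
        return -1;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Consume every pending notification byte from a non-blocking pipe.
 * Returns the number of bytes that were read.
 */
static int drain_notifications(int rfd)
{
    char byte;
    ssize_t ret;
    int count = 0;

    assert(rfd >= 0);
    do {
        ret = read(rfd, &byte, sizeof(byte));
        if (ret > 0) {
            count++;
        }
    } while (ret > 0 || (ret == -1 && errno == EINTR));

    return count;
}
```

With "ret > 1" the loop would exit after the first byte, leaving the pipe
readable and the fd handler firing again for every remaining byte.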

Stefan


* Re: [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c
  2010-11-10 13:19 ` [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c Arun R Bharadwaj
  2010-11-10 13:47   ` Stefan Hajnoczi
@ 2010-11-10 17:29   ` Blue Swirl
  2010-11-10 17:56     ` Arun R Bharadwaj
  1 sibling, 1 reply; 16+ messages in thread
From: Blue Swirl @ 2010-11-10 17:29 UTC (permalink / raw)
  To: Arun R Bharadwaj; +Cc: qemu-devel

On Wed, Nov 10, 2010 at 1:19 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> The reason for creating this generic infrastructure is so that other subsystems,
> such as virtio-9p could make use of it for offloading tasks that could block.
>
> Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
> Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
> Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> ---
>  Makefile.objs          |    1
>  docs/async-support.txt |  141 +++++++++++++++++++++++++++++++++++++++
>  posix-aio-compat.c     |  173 ------------------------------------------------
>  qemu-threadlets.c      |  168 +++++++++++++++++++++++++++++++++++++++++++++++
>  qemu-threadlets.h      |   48 +++++++++++++
>  5 files changed, 359 insertions(+), 172 deletions(-)
>  create mode 100644 docs/async-support.txt
>  create mode 100644 qemu-threadlets.c
>  create mode 100644 qemu-threadlets.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index 3b7ec27..2cf8aba 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -10,6 +10,7 @@ qobject-obj-y += qerror.o
>  block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
>  block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
>  block-obj-$(CONFIG_POSIX) += qemu-thread.o
> +block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
>  block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>
> diff --git a/docs/async-support.txt b/docs/async-support.txt
> new file mode 100644
> index 0000000..9f22b9a
> --- /dev/null
> +++ b/docs/async-support.txt
> @@ -0,0 +1,141 @@
> +== How to use the threadlets infrastructure supported in Qemu ==
> +
> +== Threadlets ==
> +
> +Q.1: What are threadlets ?
> +A.1: Threadlets is an infrastructure within QEMU that allows other subsystems
> +     to offload possibly blocking work to a queue to be processed by a pool
> +     of threads asynchronously.
> +
> +Q.2: When would one want to use threadlets ?
> +A.2: Threadlets are useful when there are operations that can be performed
> +     outside the context of the VCPU/IO threads in order to free the latter
> +     to service any other guest requests.
> +
> +Q.3: I have some work that can be executed in an asynchronous context. How
> +     should I go about it ?
> +A.3: One could follow the steps listed below:
> +
> +     - Define a function which would do the asynchronous work.
> +       static void my_threadlet_func(ThreadletWork *work)
> +       {
> +       }
> +
> +     - Declare an object of type ThreadletWork;
> +       ThreadletWork work;
> +
> +
> +     - Assign a value to the "func" member of ThreadletWork object.
> +       work.func = my_threadlet_func;
> +
> +     - Submit the threadlet to the global queue.
> +       submit_threadletwork(&work);
> +
> +     - Continue servicing some other guest operations.
> +
> +Q.4: I want my_threadlet_func to access some non-global data. How do I do
> +     that ?
> +A.4: Suppose you want my_threadlet_func to access some non-global data-object
> +     of type MyPrivateData. In that case one could follow these steps.
> +
> +     - Define a member of the type ThreadletWork within myPrivateData.
> +       typedef struct MyPrivateData {
> +               ...;
> +               ...;
> +               ...;
> +               ThreadletWork work;
> +       } MyPrivateData;
> +
> +       MyPrivateData my_data;
> +
> +     - Initialize my_data.work as described in A.3
> +       my_data.work.func = my_threadlet_func;
> +       submit_threadletwork(&my_data.work);
> +
> +     - Access the my_data object inside my_threadlet_func() using container_of
> +       primitive
> +       static void my_threadlet_func(ThreadletWork *work)
> +       {
> +               MyPrivateData *mydata_ptr;
> +               mydata_ptr = container_of(work, MyPrivateData, work);
> +
> +               /* mydata_ptr now points to the my_data object */
> +       }
> +
> +Q.5: Are there any precautions one must take while sharing data with the
> +     Asynchronous thread-pool ?
> +A.5: Yes, make sure that the helper function of the type my_threadlet_func()
> +     does not access/modify data when it can be accessed or modified in the
> +     context of VCPU thread or IO thread. This is because the asynchronous
> +     threads in the pool can run in parallel with the VCPU/IOThreads as shown
> +     in the figure.
> +
> +     A typical workflow is as follows:
> +
> +              VCPU/IOThread
> +                   |
> +                   | (1)
> +                   |
> +                   V
> +                Offload work              (2)
> +      |-------> to threadlets -----------------------------> Helper thread
> +      |            |                                               |
> +      |            |                                               |
> +      |            | (3)                                           | (4)
> +      |            |                                               |
> +      |         Handle other Guest requests                        |
> +      |            |                                               |
> +      |            |                                               V
> +      |            | (3)                                  Signal the I/O Thread
> +      |(6)         |                                               |
> +      |            |                                              /
> +      |            |                                             /
> +      |            V                                            /
> +      |          Do the post <---------------------------------/
> +      |          processing               (5)
> +      |            |
> +      |            | (6)
> +      |            V
> +      |-Yes------ More async work?
> +                   |
> +                   | (7)
> +                  No
> +                   |
> +                   |
> +                   .
> +                   .
> +
> +    Hence one needs to make sure that in the steps (3) and (4) which run in
> +    parallel, any global data is accessed within only one context.
> +
> +Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
> +A.6: Threadlets framework provides the API cancel_threadlet:
> +       - int cancel_threadletwork(ThreadletWork *work)
> +
> +     The API scans the ThreadletQueue to see if (work) is present. If it finds
> +     work, it'll dequeue work and return 0.
> +
> +     On the other hand, if it does not find the (work) in the ThreadletQueue,
> +     then it'll return 1. This can imply two things. Either the work is being
> +     processed by one of the helper threads or it has been processed. The
> +     threadlet infrastructure currently _does_not_ distinguish between these
> +     two and the onus is on the caller to do that.
> +
> +Q.7: Apart from the global pool of threads, can I have my own private Queue ?
> +A.7: Yes, the threadlets framework allows subsystems to create their own private
> +     queues with associated pools of threads.
> +
> +     - Define a PrivateQueue
> +       ThreadletQueue myQueue;
> +
> +     - Initialize it:
> +       threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
> +       where my_max_threads is the maximum number of threads that can be in the
> +       thread pool and my_min_threads is the minimum number of active threads
> +       that can be in the thread-pool.
> +
> +     - Submit work:
> +       submit_threadletwork_to_queue(&myQueue, &my_work);
> +
> +     - Cancel work:
> +       cancel_threadletwork_on_queue(&myQueue, &my_work);
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index dc5f8ec..b357449 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -29,34 +29,11 @@
>  #include "block_int.h"
>
>  #include "block/raw-posix-aio.h"
> -#include "qemu-thread.h"
> -
> -#define MAX_GLOBAL_THREADS  64
> -#define MIN_GLOBAL_THREADS   8
> +#include "qemu-threadlets.h"
>
>  static QemuMutex aiocb_mutex;
>  static QemuCond aiocb_completion;
>
> -typedef struct ThreadletQueue
> -{
> -    QemuMutex lock;
> -    QemuCond cond;
> -    int max_threads;
> -    int min_threads;
> -    int cur_threads;
> -    int idle_threads;
> -    QTAILQ_HEAD(, ThreadletWork) request_list;
> -} ThreadletQueue;
> -
> -typedef struct ThreadletWork
> -{
> -    QTAILQ_ENTRY(ThreadletWork) node;
> -    void (*func)(struct ThreadletWork *work);
> -} ThreadletWork;
> -
> -static ThreadletQueue globalqueue;
> -static int globalqueue_init;
> -
>  struct qemu_paiocb {
>     BlockDriverAIOCB common;
>     int aio_fildes;
> @@ -83,154 +60,6 @@ typedef struct PosixAioState {
>     struct qemu_paiocb *first_aio;
>  } PosixAioState;
>
> -static void *threadlet_worker(void *data)
> -{
> -    ThreadletQueue *queue = data;
> -
> -    qemu_mutex_lock(&queue->lock);
> -    while (1) {
> -        ThreadletWork *work;
> -        int ret = 0;
> -
> -        while (QTAILQ_EMPTY(&queue->request_list) &&
> -               (ret != ETIMEDOUT)) {
> -            /* wait for cond to be signalled or broadcast for 1000s */
> -            ret = qemu_cond_timedwait((&queue->cond),
> -                                         &(queue->lock), 10*100000);
> -        }
> -
> -        assert(queue->idle_threads != 0);
> -        if (QTAILQ_EMPTY(&queue->request_list)) {
> -            if (queue->cur_threads > queue->min_threads) {
> -                /* We retain the minimum number of threads */
> -                break;
> -            }
> -        } else {
> -            work = QTAILQ_FIRST(&queue->request_list);
> -            QTAILQ_REMOVE(&queue->request_list, work, node);
> -
> -            queue->idle_threads--;
> -            qemu_mutex_unlock(&queue->lock);
> -
> -            /* execute the work function */
> -            work->func(work);
> -
> -            qemu_mutex_lock(&queue->lock);
> -            queue->idle_threads++;
> -        }
> -    }
> -
> -    queue->idle_threads--;
> -    queue->cur_threads--;
> -    qemu_mutex_unlock(&queue->lock);
> -
> -    return NULL;
> -}
> -
> -static void spawn_threadlet(ThreadletQueue *queue)
> -{
> -    QemuThread thread;
> -
> -    queue->cur_threads++;
> -    queue->idle_threads++;
> -
> -    qemu_thread_create(&thread, threadlet_worker, queue);
> -}
> -
> -/**
> - * submit_work_to_queue: Submit a new task to a private queue to be
> - *                            executed asynchronously.
> - * @queue: Per-subsystem private queue to which the new task needs
> - *         to be submitted.
> - * @work: Contains information about the task that needs to be submitted.
> - */
> -static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> -{
> -    qemu_mutex_lock(&queue->lock);
> -    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
> -        spawn_threadlet(queue);
> -    } else {
> -        qemu_cond_signal(&queue->cond);
> -    }
> -    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
> -    qemu_mutex_unlock(&queue->lock);
> -}
> -
> -static void threadlet_queue_init(ThreadletQueue *queue,
> -                          int max_threads, int min_threads);
> -
> -/**
> - * submit_work: Submit to the global queue a new task to be executed
> - *                   asynchronously.
> - * @work: Contains information about the task that needs to be submitted.
> - */
> -static void submit_work(ThreadletWork *work)
> -{
> -    if (!globalqueue_init) {
> -        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
> -                                MIN_GLOBAL_THREADS);
> -        globalqueue_init = 1;
> -    }
> -
> -    submit_work_to_queue(&globalqueue, work);
> -}
> -
> -/**
> - * dequeue_work_on_queue: Cancel a task queued on a Queue.
> - * @queue: The queue containing the task to be cancelled.
> - * @work: Contains the information of the task that needs to be cancelled.
> - *
> - * Returns: 0 if the task is successfully cancelled.
> - *          1 otherwise.
> - */
> -static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
> -{
> -    ThreadletWork *ret_work;
> -    int ret = 1;
> -
> -    qemu_mutex_lock(&queue->lock);
> -    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
> -        if (ret_work == work) {
> -            QTAILQ_REMOVE(&queue->request_list, ret_work, node);
> -            ret = 0;
> -            break;
> -        }
> -    }
> -    qemu_mutex_unlock(&queue->lock);
> -
> -    return ret;
> -}
> -
> -/**
> - * dequeue_work: Cancel a task queued on the global queue.
> - * @work: Contains the information of the task that needs to be cancelled.
> - *
> - * Returns: 0 if the task is successfully cancelled.
> - *          1 otherwise.
> - */
> -static int dequeue_work(ThreadletWork *work)
> -{
> -    return dequeue_work_on_queue(&globalqueue, work);
> -}
> -
> -/**
> - * threadlet_queue_init: Initialize a threadlet queue.
> - * @queue: The threadlet queue to be initialized.
> - * @max_threads: Maximum number of threads processing the queue.
> - * @min_threads: Minimum number of threads processing the queue.
> - */
> -static void threadlet_queue_init(ThreadletQueue *queue,
> -                                    int max_threads, int min_threads)
> -{
> -    queue->cur_threads  = 0;
> -    queue->idle_threads = 0;
> -    queue->max_threads  = max_threads;
> -    queue->min_threads  = min_threads;
> -    QTAILQ_INIT(&queue->request_list);
> -    qemu_mutex_init(&queue->lock);
> -    qemu_cond_init(&queue->cond);
> -}
> -
>  #ifdef CONFIG_PREADV
>  static int preadv_present = 1;
>  #else
> diff --git a/qemu-threadlets.c b/qemu-threadlets.c
> new file mode 100644
> index 0000000..23b4ecf
> --- /dev/null
> +++ b/qemu-threadlets.c
> @@ -0,0 +1,168 @@
> +/*
> + * Threadlet support for offloading tasks to be executed asynchronously
> + *
> + * Copyright IBM, Corp. 2008
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Anthony Liguori     <aliguori@us.ibm.com>
> + *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
> + *  Gautham R Shenoy    <gautham.shenoy@gmail.com>
> + *  Arun R Bharadwaj    <arun@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + */
> +
> +#include "qemu-threadlets.h"
> +#include "osdep.h"
> +
> +#define MAX_GLOBAL_THREADS  64
> +#define MIN_GLOBAL_THREADS   8
> +static ThreadletQueue globalqueue;
> +static int globalqueue_init;
> +
> +static void *threadlet_worker(void *data)
> +{
> +    ThreadletQueue *queue = data;
> +
> +    qemu_mutex_lock(&queue->lock);
> +    while (1) {
> +        ThreadletWork *work;
> +        int ret = 0;
> +
> +        while (QTAILQ_EMPTY(&queue->request_list) &&
> +               (ret != ETIMEDOUT)) {
> +            /* wait for cond to be signalled or broadcast for 1000s */
> +            ret = qemu_cond_timedwait((&queue->cond),
> +                                         &(queue->lock), 10*100000);
> +        }
> +
> +        assert(queue->idle_threads != 0);
> +        if (QTAILQ_EMPTY(&queue->request_list)) {
> +            if (queue->cur_threads > queue->min_threads) {
> +                /* We retain the minimum number of threads */
> +                break;
> +            }
> +        } else {
> +            work = QTAILQ_FIRST(&queue->request_list);
> +            QTAILQ_REMOVE(&queue->request_list, work, node);
> +
> +            queue->idle_threads--;
> +            qemu_mutex_unlock(&queue->lock);
> +
> +            /* execute the work function */
> +            work->func(work);
> +
> +            qemu_mutex_lock(&queue->lock);
> +            queue->idle_threads++;
> +        }
> +    }
> +
> +    queue->idle_threads--;
> +    queue->cur_threads--;
> +    qemu_mutex_unlock(&queue->lock);
> +
> +    return NULL;
> +}
> +
> +static void spawn_threadlet(ThreadletQueue *queue)
> +{
> +    QemuThread thread;
> +
> +    queue->cur_threads++;
> +    queue->idle_threads++;
> +
> +    qemu_thread_create(&thread, threadlet_worker, queue);
> +}
> +
> +/**
> + * submit_work_to_queue: Submit a new task to a private queue to be
> + *                            executed asynchronously.
> + * @queue: Per-subsystem private queue to which the new task needs
> + *         to be submitted.
> + * @work: Contains information about the task that needs to be submitted.
> + */
> +void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> +{
> +    qemu_mutex_lock(&queue->lock);
> +    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
> +        spawn_threadlet(queue);
> +    } else {
> +        qemu_cond_signal(&queue->cond);
> +    }
> +    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
> +    qemu_mutex_unlock(&queue->lock);
> +}
> +
> +/**
> + * submit_work: Submit to the global queue a new task to be executed
> + *                   asynchronously.
> + * @work: Contains information about the task that needs to be submitted.
> + */
> +void submit_work(ThreadletWork *work)
> +{
> +    if (!globalqueue_init) {
> +        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
> +                                MIN_GLOBAL_THREADS);
> +        globalqueue_init = 1;
> +    }
> +
> +    submit_work_to_queue(&globalqueue, work);
> +}
> +
> +/**
> + * dequeue_work_on_queue: Cancel a task queued on a Queue.
> + * @queue: The queue containing the task to be cancelled.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 0 if the task is successfully cancelled.
> + *          1 otherwise.
> + */
> +int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
> +{
> +    ThreadletWork *ret_work;
> +    int ret = 1;
> +
> +    qemu_mutex_lock(&queue->lock);
> +    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
> +        if (ret_work == work) {
> +            QTAILQ_REMOVE(&queue->request_list, ret_work, node);
> +            ret = 0;
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&queue->lock);
> +
> +    return ret;
> +}
> +
> +/**
> + * dequeue_work: Cancel a task queued on the global queue.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 0 if the task is successfully cancelled.
> + *          1 otherwise.
> + */
> +int dequeue_work(ThreadletWork *work)
> +{
> +    return dequeue_work_on_queue(&globalqueue, work);
> +}
> +
> +/**
> + * threadlet_queue_init: Initialize a threadlet queue.
> + * @queue: The threadlet queue to be initialized.
> + * @max_threads: Maximum number of threads processing the queue.
> + * @min_threads: Minimum number of threads processing the queue.
> + */
> +void threadlet_queue_init(ThreadletQueue *queue,
> +                                    int max_threads, int min_threads)
> +{
> +    queue->cur_threads  = 0;
> +    queue->idle_threads = 0;
> +    queue->max_threads  = max_threads;
> +    queue->min_threads  = min_threads;
> +    QTAILQ_INIT(&queue->request_list);
> +    qemu_mutex_init(&queue->lock);
> +    qemu_cond_init(&queue->cond);
> +}
> diff --git a/qemu-threadlets.h b/qemu-threadlets.h
> new file mode 100644
> index 0000000..f869304
> --- /dev/null
> +++ b/qemu-threadlets.h
> @@ -0,0 +1,48 @@
> +/*
> + * Threadlet support for offloading tasks to be executed asynchronously
> + *
> + * Copyright IBM, Corp. 2008
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Anthony Liguori     <aliguori@us.ibm.com>
> + *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
> + *  Gautham R Shenoy    <gautham.shenoy@gmail.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + */
> +
> +#ifndef QEMU_ASYNC_WORK_H
> +#define QEMU_ASYNC_WORK_H
> +
> +#include "qemu-queue.h"
> +#include "qemu-common.h"
> +#include "qemu-thread.h"
> +
> +typedef struct ThreadletQueue
> +{
> +    QemuMutex lock;
> +    QemuCond cond;
> +    int max_threads;
> +    int min_threads;
> +    int cur_threads;
> +    int idle_threads;
> +    QTAILQ_HEAD(, ThreadletWork) request_list;
> +} ThreadletQueue;
> +
> +typedef struct ThreadletWork
> +{
> +    QTAILQ_ENTRY(ThreadletWork) node;
> +    void (*func)(struct ThreadletWork *work);
> +} ThreadletWork;
> +
> +extern void submit_work_to_queue(ThreadletQueue *queue,
> +                                 ThreadletWork *work);
> +extern void submit_work(ThreadletWork *work);
> +extern int dequeue_work_on_queue(ThreadletQueue *queue,
> +                                 ThreadletWork *work);
> +extern int dequeue_work(ThreadletWork *work);
> +extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
> +                                 int min_threads);

Please remove 'extern' qualifiers, they are useless for function
declarations. In most cases they are not used in QEMU.
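
To illustrate the point, here is a minimal sketch with hypothetical function names (not taken from the patch). In C, a function declaration without a storage-class specifier already has external linkage, so the two forms below are interchangeable and may even redeclare each other:

```c
#include <assert.h>

/* Hypothetical names, for illustration only; not part of the patch. */
extern int add_one_with_extern(int x);  /* explicit 'extern' */
int add_one_plain(int x);               /* no specifier: same linkage */

/* Redeclaring each function the other way round is accepted, which
 * shows that the two spellings declare compatible, identical things. */
int add_one_with_extern(int x);
extern int add_one_plain(int x);

int add_one_with_extern(int x)
{
    return x + 1;
}

int add_one_plain(int x)
{
    return x + 1;
}
```

Both functions are callable from other translation units in exactly the same way, so dropping 'extern' from the prototypes in qemu-threadlets.h should not change behaviour.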


* Re: [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure
  2010-11-10 13:45   ` Stefan Hajnoczi
@ 2010-11-10 17:54     ` Arun R Bharadwaj
  2010-11-10 20:13       ` Stefan Hajnoczi
  2010-11-11  8:41     ` [Qemu-devel] " Paolo Bonzini
  1 sibling, 1 reply; 16+ messages in thread
From: Arun R Bharadwaj @ 2010-11-10 17:54 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: qemu-devel

* Stefan Hajnoczi <stefanha@gmail.com> [2010-11-10 13:45:29]:

> On Wed, Nov 10, 2010 at 1:19 PM, Arun R Bharadwaj
> <arun@linux.vnet.ibm.com> wrote:
> > @@ -301,106 +431,58 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
> >     return nbytes;
> >  }
> >
> > -static void *aio_thread(void *unused)
> > +static void aio_thread(ThreadletWork *work)
> >  {
> >     pid_t pid;
> > +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> > +    ssize_t ret = 0;
> >
> >     pid = getpid();
> >
> > -    while (1) {
> > -        struct qemu_paiocb *aiocb;
> > -        ssize_t ret = 0;
> > -        qemu_timeval tv;
> > -        struct timespec ts;
> > -
> > -        qemu_gettimeofday(&tv);
> > -        ts.tv_sec = tv.tv_sec + 10;
> > -        ts.tv_nsec = 0;
> > -
> > -        mutex_lock(&lock);
> > -
> > -        while (QTAILQ_EMPTY(&request_list) &&
> > -               !(ret == ETIMEDOUT)) {
> > -            ret = cond_timedwait(&cond, &lock, &ts);
> > -        }
> > -
> > -        if (QTAILQ_EMPTY(&request_list))
> > -            break;
> > -
> > -        aiocb = QTAILQ_FIRST(&request_list);
> > -        QTAILQ_REMOVE(&request_list, aiocb, node);
> > -        aiocb->active = 1;
> > -        idle_threads--;
> > -        mutex_unlock(&lock);
> > -
> > -        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> > -        case QEMU_AIO_READ:
> > -        case QEMU_AIO_WRITE:
> > -            ret = handle_aiocb_rw(aiocb);
> > -            break;
> > -        case QEMU_AIO_FLUSH:
> > -            ret = handle_aiocb_flush(aiocb);
> > -            break;
> > -        case QEMU_AIO_IOCTL:
> > -            ret = handle_aiocb_ioctl(aiocb);
> > -            break;
> > -        default:
> > -            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> > -            ret = -EINVAL;
> > -            break;
> > -        }
> > -
> > -        mutex_lock(&lock);
> > -        aiocb->ret = ret;
> > -        idle_threads++;
> > -        mutex_unlock(&lock);
> > -
> > -        if (kill(pid, aiocb->ev_signo)) die("kill failed");
> > +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> > +    case QEMU_AIO_READ:
> > +    case QEMU_AIO_WRITE:
> > +        ret = handle_aiocb_rw(aiocb);
> > +        break;
> > +    case QEMU_AIO_FLUSH:
> > +        ret = handle_aiocb_flush(aiocb);
> > +        break;
> > +    case QEMU_AIO_IOCTL:
> > +        ret = handle_aiocb_ioctl(aiocb);
> > +        break;
> > +    default:
> > +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> > +        ret = -EINVAL;
> > +        break;
> >     }
> >
> > -    idle_threads--;
> > -    cur_threads--;
> > -    mutex_unlock(&lock);
> > -
> > -    return NULL;
> > -}
> > -
> > -static void spawn_thread(void)
> > -{
> > -    sigset_t set, oldset;
> > -
> > -    cur_threads++;
> > -    idle_threads++;
> > +    qemu_mutex_lock(&aiocb_mutex);
> > +    aiocb->ret = ret;
> 
> This is where qemu_cond_broadcast() is needed.
> 
> > +    qemu_mutex_unlock(&aiocb_mutex);
> >
> > -    /* block all signals */
> > -    if (sigfillset(&set)) die("sigfillset");
> > -    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> > -
> > -    thread_create(&thread_id, &attr, aio_thread, NULL);
> > -
> > -    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> > +    if (kill(pid, aiocb->ev_signo)) {
> > +        die("kill failed");
> > +    }
> >  }
> >
> 
> I wonder if the condition variable has a measurable performance
> overhead.  We unconditionally broadcast on paiocb completion.  One
> idea would be to keep a counter of waiters (should only ever be 0 or
> 1) protected by aiocb_mutex and broadcast only when there is a waiter.
>  I just want to share this idea, I don't know if it's necessary to
> implement it or if it could even work without a race condition.
> 

I did not understand exactly why we are going to see a performance hit.
We will be doing a broadcast only after the aio_thread has finished
the work right? So how is this going to affect performance even if we
do a useless broadcast?

-arun
> >     }
> >
> >     paio_remove(acb);
> > @@ -618,11 +692,12 @@ int paio_init(void)
> >     struct sigaction act;
> >     PosixAioState *s;
> >     int fds[2];
> > -    int ret;
> >
> >     if (posix_aio_state)
> >         return 0;
> >
> > +    qemu_mutex_init(&aiocb_mutex);
> 
> Also needs qemu_cond_init(&aiocb_completion).
> 
> Stefan
> 


* Re: [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c
  2010-11-10 17:29   ` Blue Swirl
@ 2010-11-10 17:56     ` Arun R Bharadwaj
  0 siblings, 0 replies; 16+ messages in thread
From: Arun R Bharadwaj @ 2010-11-10 17:56 UTC (permalink / raw)
  To: Blue Swirl; +Cc: qemu-devel

* Blue Swirl <blauwirbel@gmail.com> [2010-11-10 17:29:30]:

> On Wed, Nov 10, 2010 at 1:19 PM, Arun R Bharadwaj
> <arun@linux.vnet.ibm.com> wrote:
> > The reason for creating this generic infrastructure is so that other subsystems,
> > such as virtio-9p could make use of it for offloading tasks that could block.
> >
> > Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
> > Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
> > Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
> > Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> > ---
> >  Makefile.objs          |    1
> >  docs/async-support.txt |  141 +++++++++++++++++++++++++++++++++++++++
> >  posix-aio-compat.c     |  173 ------------------------------------------------
> >  qemu-threadlets.c      |  168 +++++++++++++++++++++++++++++++++++++++++++++++
> >  qemu-threadlets.h      |   48 +++++++++++++
> >  5 files changed, 359 insertions(+), 172 deletions(-)
> >  create mode 100644 docs/async-support.txt
> >  create mode 100644 qemu-threadlets.c
> >  create mode 100644 qemu-threadlets.h
> >
> > diff --git a/qemu-threadlets.h b/qemu-threadlets.h
> > new file mode 100644
> > index 0000000..f869304
> > --- /dev/null
> > +++ b/qemu-threadlets.h
> > @@ -0,0 +1,48 @@
> > +/*
> > + * Threadlet support for offloading tasks to be executed asynchronously
> > + *
> > + * Copyright IBM, Corp. 2008
> > + * Copyright IBM, Corp. 2010
> > + *
> > + * Authors:
> > + *  Anthony Liguori     <aliguori@us.ibm.com>
> > + *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
> > + *  Gautham R Shenoy    <gautham.shenoy@gmail.com>
> > + *
> > + * This work is licensed under the terms of the GNU GPL, version 2.  See
> > + * the COPYING file in the top-level directory.
> > + */
> > +
> > +#ifndef QEMU_ASYNC_WORK_H
> > +#define QEMU_ASYNC_WORK_H
> > +
> > +#include "qemu-queue.h"
> > +#include "qemu-common.h"
> > +#include "qemu-thread.h"
> > +
> > +typedef struct ThreadletQueue
> > +{
> > +    QemuMutex lock;
> > +    QemuCond cond;
> > +    int max_threads;
> > +    int min_threads;
> > +    int cur_threads;
> > +    int idle_threads;
> > +    QTAILQ_HEAD(, ThreadletWork) request_list;
> > +} ThreadletQueue;
> > +
> > +typedef struct ThreadletWork
> > +{
> > +    QTAILQ_ENTRY(ThreadletWork) node;
> > +    void (*func)(struct ThreadletWork *work);
> > +} ThreadletWork;
> > +
> > +extern void submit_work_to_queue(ThreadletQueue *queue,
> > +                                 ThreadletWork *work);
> > +extern void submit_work(ThreadletWork *work);
> > +extern int dequeue_work_on_queue(ThreadletQueue *queue,
> > +                                 ThreadletWork *work);
> > +extern int dequeue_work(ThreadletWork *work);
> > +extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
> > +                                 int min_threads);
> 
> Please remove 'extern' qualifiers, they are useless for function
> declarations. In most cases they are not used in QEMU.

Will change these. Thanks for the review.

-arun


* Re: [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure
  2010-11-10 17:54     ` Arun R Bharadwaj
@ 2010-11-10 20:13       ` Stefan Hajnoczi
  0 siblings, 0 replies; 16+ messages in thread
From: Stefan Hajnoczi @ 2010-11-10 20:13 UTC (permalink / raw)
  To: arun; +Cc: qemu-devel

On Wed, Nov 10, 2010 at 5:54 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> * Stefan Hajnoczi <stefanha@gmail.com> [2010-11-10 13:45:29]:
>> On Wed, Nov 10, 2010 at 1:19 PM, Arun R Bharadwaj
>> <arun@linux.vnet.ibm.com> wrote:
>> I wonder if the condition variable has a measurable performance
>> overhead.  We unconditionally broadcast on paiocb completion.  One
>> idea would be to keep a counter of waiters (should only ever be 0 or
>> 1) protected by aiocb_mutex and broadcast only when there is a waiter.
>>  I just want to share this idea, I don't know if it's necessary to
>> implement it or if it could even work without a race condition.
>>
>
> I did not understand exactly why we are going to see a performance hit.
> We will be doing a broadcast only after the aio_thread has finished
> the work right? So how is this going to affect performance even if we
> do a useless broadcast?

If aio_thread() broadcasts before raising the signal then POSIX aio
request completion is delayed by the time it takes to broadcast.  I
don't know if it matters though.

Stefan


* Re: [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c
  2010-11-10 13:47   ` Stefan Hajnoczi
@ 2010-11-11  8:18     ` Arun R Bharadwaj
  0 siblings, 0 replies; 16+ messages in thread
From: Arun R Bharadwaj @ 2010-11-11  8:18 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: qemu-devel

* Stefan Hajnoczi <stefanha@gmail.com> [2010-11-10 13:47:22]:

> On Wed, Nov 10, 2010 at 1:19 PM, Arun R Bharadwaj
> <arun@linux.vnet.ibm.com> wrote:
> > The reason for creating this generic infrastructure is so that other subsystems,
> > such as virtio-9p could make use of it for offloading tasks that could block.
> >
> > Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
> > Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
> > Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
> > Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> > ---
> >  Makefile.objs          |    1
> >  docs/async-support.txt |  141 +++++++++++++++++++++++++++++++++++++++
> >  posix-aio-compat.c     |  173 ------------------------------------------------
> >  qemu-threadlets.c      |  168 +++++++++++++++++++++++++++++++++++++++++++++++
> >  qemu-threadlets.h      |   48 +++++++++++++
> >  5 files changed, 359 insertions(+), 172 deletions(-)
> >  create mode 100644 docs/async-support.txt
> >  create mode 100644 qemu-threadlets.c
> >  create mode 100644 qemu-threadlets.h
> 
> Looks good.
> 
> Stefan

Thanks Stefan, I will add your Acked-by to this patch.

-arun


* [Qemu-devel] Re: [PATCH 1/3] Make paio subsystem use threadlets infrastructure
  2010-11-10 13:45   ` Stefan Hajnoczi
  2010-11-10 17:54     ` Arun R Bharadwaj
@ 2010-11-11  8:41     ` Paolo Bonzini
  2010-11-11  9:26       ` Stefan Hajnoczi
  1 sibling, 1 reply; 16+ messages in thread
From: Paolo Bonzini @ 2010-11-11  8:41 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Arun R Bharadwaj, qemu-devel

On 11/10/2010 02:45 PM, Stefan Hajnoczi wrote:
> I wonder if the condition variable has a measurable performance
> overhead.  We unconditionally broadcast on paiocb completion.  One
> idea would be to keep a counter of waiters (should only ever be 0 or
> 1) protected by aiocb_mutex and broadcast only when there is a waiter.

This should be handled anyway by the pthreads library.  If we are sure 
there is only one waiter at most, you can change broadcast to signal but 
that would already be microoptimization.

Alternatively, you can use futexes to implement a completion signal 
(similar to Win32 events).  But it seems too much effort.

Paolo


* [Qemu-devel] Re: [PATCH 1/3] Make paio subsystem use threadlets infrastructure
  2010-11-11  8:41     ` [Qemu-devel] " Paolo Bonzini
@ 2010-11-11  9:26       ` Stefan Hajnoczi
  0 siblings, 0 replies; 16+ messages in thread
From: Stefan Hajnoczi @ 2010-11-11  9:26 UTC (permalink / raw)
  To: Paolo Bonzini; +Cc: Arun R Bharadwaj, qemu-devel

On Thu, Nov 11, 2010 at 8:41 AM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> On 11/10/2010 02:45 PM, Stefan Hajnoczi wrote:
>>
>> I wonder if the condition variable has a measurable performance
>> overhead.  We unconditionally broadcast on paiocb completion.  One
>> idea would be to keep a counter of waiters (should only ever be 0 or
>> 1) protected by aiocb_mutex and broadcast only when there is a waiter.
>
> This should be handled anyway by the pthreads library.  If we are sure there
> is only one waiter at most, you can change broadcast to signal but that
> would already be microoptimization.
>
> Alternatively, you can use futexes to implement a completion signal (similar
> to Win32 events).  But it seems too much effort.

Yes, let's use standard cond vars for now.

Stefan
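
For reference, the waiter-counter idea discussed above could look roughly like the sketch below. It uses plain pthreads and a hypothetical Completion type rather than the aiocb_mutex/aiocb_completion pair from the patch, and the broadcasts field exists only to make the behaviour observable. This is one possible shape under those assumptions, not a proposal for the final code:

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-in for a request completion; not QEMU code. */
typedef struct Completion {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int waiters;     /* threads currently inside completion_wait() */
    bool done;       /* set once the worker has finished */
    int broadcasts;  /* instrumentation for this example only */
} Completion;

static void completion_init(Completion *c)
{
    pthread_mutex_init(&c->lock, NULL);
    pthread_cond_init(&c->cond, NULL);
    c->waiters = 0;
    c->done = false;
    c->broadcasts = 0;
}

/* Worker side: record completion, wake waiters only if there are any. */
static void completion_finish(Completion *c)
{
    pthread_mutex_lock(&c->lock);
    c->done = true;
    if (c->waiters > 0) {
        pthread_cond_broadcast(&c->cond);
        c->broadcasts++;
    }
    pthread_mutex_unlock(&c->lock);
}

/* Waiter side: register as a waiter before checking the predicate. */
static void completion_wait(Completion *c)
{
    pthread_mutex_lock(&c->lock);
    c->waiters++;
    while (!c->done) {
        pthread_cond_wait(&c->cond, &c->lock);
    }
    assert(c->waiters > 0);
    c->waiters--;
    pthread_mutex_unlock(&c->lock);
}
```

Because the counter and the done flag are only touched under the same mutex, a waiter either sees done already set and never sleeps, or has incremented waiters before the worker checks it, so no wakeup should be lost. If there is never more than one waiter, pthread_cond_signal() could replace the broadcast, as Paolo notes.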


* [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c
  2010-11-15 17:53 [Qemu-devel] [PATCH 0/3] v11: Threadlets: A generic task offloading framework Arun R Bharadwaj
@ 2010-11-15 17:53 ` Arun R Bharadwaj
  0 siblings, 0 replies; 16+ messages in thread
From: Arun R Bharadwaj @ 2010-11-15 17:53 UTC (permalink / raw)
  To: qemu-devel; +Cc: kwolf

The reason for creating this generic infrastructure is so that other subsystems,
such as virtio-9p could make use of it for offloading tasks that could block.

Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>

Acked-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
 Makefile.objs          |    1 
 docs/async-support.txt |  141 +++++++++++++++++++++++++++++++++++++++
 posix-aio-compat.c     |  173 ------------------------------------------------
 qemu-threadlets.c      |  168 +++++++++++++++++++++++++++++++++++++++++++++++
 qemu-threadlets.h      |   46 +++++++++++++
 5 files changed, 357 insertions(+), 172 deletions(-)
 create mode 100644 docs/async-support.txt
 create mode 100644 qemu-threadlets.c
 create mode 100644 qemu-threadlets.h

diff --git a/Makefile.objs b/Makefile.objs
index 3b7ec27..2cf8aba 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -10,6 +10,7 @@ qobject-obj-y += qerror.o
 block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
 block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
 block-obj-$(CONFIG_POSIX) += qemu-thread.o
+block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
diff --git a/docs/async-support.txt b/docs/async-support.txt
new file mode 100644
index 0000000..9f22b9a
--- /dev/null
+++ b/docs/async-support.txt
@@ -0,0 +1,141 @@
+== How to use the threadlets infrastructure supported in Qemu ==
+
+== Threadlets ==
+
+Q.1: What are threadlets ?
+A.1: Threadlets is an infrastructure within QEMU that allows other subsystems
+     to offload possibly blocking work to a queue to be processed by a pool
+     of threads asynchronously.
+
+Q.2: When would one want to use threadlets ?
+A.2: Threadlets are useful when there are operations that can be performed
+     outside the context of the VCPU/IO threads in order to free the latter
+     to service any other guest requests.
+
+Q.3: I have some work that can be executed in an asynchronous context. How
+     should I go about it ?
+A.3: One could follow the steps listed below:
+
+     - Define a function which would do the asynchronous work.
+	static void my_threadlet_func(ThreadletWork *work)
+	{
+	}
+
+     - Declare an object of type ThreadletWork;
+	ThreadletWork work;
+
+
+     - Assign a value to the "func" member of ThreadletWork object.
+	work.func = my_threadlet_func;
+
+     - Submit the threadlet to the global queue.
+	submit_work(&work);
+
+     - Continue servicing some other guest operations.
+
+Q.4: I want my_threadlet_func to access some non-global data. How do I do
+     that ?
+A.4: Suppose you want my_threadlet_func to access some non-global data-object
+     of type MyPrivateData. In that case one could follow the steps below.
+
+     - Define a member of the type ThreadletWork within MyPrivateData.
+	typedef struct MyPrivateData {
+		...;
+		...;
+		...;
+		ThreadletWork work;
+	} MyPrivateData;
+
+	MyPrivateData myData;
+
+     - Initialize myData.work as described in A.3
+	myData.work.func = my_threadlet_func;
+	submit_work(&myData.work);
+
+     - Access the myData object inside my_threadlet_func() using the
+       container_of primitive
+	static void my_threadlet_func(ThreadletWork *work)
+	{
+		MyPrivateData *mydata_ptr;
+		mydata_ptr = container_of(work, MyPrivateData, work);
+
+		/* mydata_ptr now points to myData object */
+	}
+
+Q.5: Are there any precautions one must take while sharing data with the
+     Asynchronous thread-pool ?
+A.5: Yes, make sure that the helper function of the type my_threadlet_func()
+     does not access/modify data when it can be accessed or modified in the
+     context of VCPU thread or IO thread. This is because the asynchronous
+     threads in the pool can run in parallel with the VCPU/IOThreads as shown
+     in the figure.
+
+     A typical workflow is as follows:
+
+              VCPU/IOThread
+                   |
+                   | (1)
+                   |
+                   V
+                Offload work              (2)
+      |-------> to threadlets -----------------------------> Helper thread
+      |            |                                               |
+      |            |                                               |
+      |            | (3)                                           | (4)
+      |            |                                               |
+      |         Handle other Guest requests                        |
+      |            |                                               |
+      |            |                                               V
+      |            | (3)                                  Signal the I/O Thread
+      |(6)         |                                               |
+      |            |                                              /
+      |            |                                             /
+      |            V                                            /
+      |          Do the post <---------------------------------/
+      |          processing               (5)
+      |            |
+      |            | (6)
+      |            V
+      |-Yes------ More async work?
+                   |
+                   | (7)
+	           No
+                   |
+                   |
+                   .
+                   .
+
+    Hence one needs to make sure that in the steps (3) and (4) which run in
+    parallel, any global data is accessed within only one context.
+
+Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
+A.6: The threadlets framework provides the API dequeue_work:
+       - int dequeue_work(ThreadletWork *work)
+
+     The API scans the ThreadletQueue to see if (work) is present. If it finds
+     work, it'll dequeue work and return 0.
+
+     On the other hand, if it does not find the (work) in the ThreadletQueue,
+     then it'll return 1. This can imply two things. Either the work is being
+     processed by one of the helper threads or it has been processed. The
+     threadlet infrastructure currently _does_not_ distinguish between these
+     two and the onus is on the caller to do that.
+
+Q.7: Apart from the global pool of threads, can I have my own private Queue ?
+A.7: Yes, the threadlets framework allows subsystems to create their own private
+     queues with associated pools of threads.
+
+     - Define a PrivateQueue
+       ThreadletQueue myQueue;
+
+     - Initialize it:
+       threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
+       where my_max_threads is the maximum number of threads that can be in the
+       thread pool and my_min_threads is the minimum number of active threads
+       that can be in the thread-pool.
+
+     - Submit work:
+       submit_work_to_queue(&myQueue, &my_work);
+
+     - Cancel work:
+       dequeue_work_on_queue(&myQueue, &my_work);
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index e1812fc..d8cae6d 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,34 +29,11 @@
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
-#include "qemu-thread.h"
-
-#define MAX_GLOBAL_THREADS  64
-#define MIN_GLOBAL_THREADS   8
+#include "qemu-threadlets.h"
 
 static QemuMutex aiocb_mutex;
 static QemuCond aiocb_completion;
 
-typedef struct ThreadletQueue
-{
-    QemuMutex lock;
-    QemuCond cond;
-    int max_threads;
-    int min_threads;
-    int cur_threads;
-    int idle_threads;
-    QTAILQ_HEAD(, ThreadletWork) request_list;
-} ThreadletQueue;
-
-typedef struct ThreadletWork
-{
-    QTAILQ_ENTRY(ThreadletWork) node;
-    void (*func)(struct ThreadletWork *work);
-} ThreadletWork;
-
-static ThreadletQueue globalqueue;
-static int globalqueue_init;
-
 struct qemu_paiocb {
     BlockDriverAIOCB common;
     int aio_fildes;
@@ -83,154 +60,6 @@ typedef struct PosixAioState {
     struct qemu_paiocb *first_aio;
 } PosixAioState;
 
-static void *threadlet_worker(void *data)
-{
-    ThreadletQueue *queue = data;
-
-    qemu_mutex_lock(&queue->lock);
-    while (1) {
-        ThreadletWork *work;
-        int ret = 0;
-
-        while (QTAILQ_EMPTY(&queue->request_list) &&
-               (ret != ETIMEDOUT)) {
-            /* wait for cond to be signalled or broadcast for 1000s */
-            ret = qemu_cond_timedwait((&queue->cond),
-                                         &(queue->lock), 10*100000);
-        }
-
-        assert(queue->idle_threads != 0);
-        if (QTAILQ_EMPTY(&queue->request_list)) {
-            if (queue->cur_threads > queue->min_threads) {
-                /* We retain the minimum number of threads */
-                break;
-            }
-        } else {
-            work = QTAILQ_FIRST(&queue->request_list);
-            QTAILQ_REMOVE(&queue->request_list, work, node);
-
-            queue->idle_threads--;
-            qemu_mutex_unlock(&queue->lock);
-
-            /* execute the work function */
-            work->func(work);
-
-            qemu_mutex_lock(&queue->lock);
-            queue->idle_threads++;
-        }
-    }
-
-    queue->idle_threads--;
-    queue->cur_threads--;
-    qemu_mutex_unlock(&queue->lock);
-
-    return NULL;
-}
-
-static void spawn_threadlet(ThreadletQueue *queue)
-{
-    QemuThread thread;
-
-    queue->cur_threads++;
-    queue->idle_threads++;
-
-    qemu_thread_create(&thread, threadlet_worker, queue);
-}
-
-/**
- * submit_work_to_queue: Submit a new task to a private queue to be
- *                            executed asynchronously.
- * @queue: Per-subsystem private queue to which the new task needs
- *         to be submitted.
- * @work: Contains information about the task that needs to be submitted.
- */
-static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
-{
-    qemu_mutex_lock(&queue->lock);
-    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
-        spawn_threadlet(queue);
-    } else {
-        qemu_cond_signal(&queue->cond);
-    }
-    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
-    qemu_mutex_unlock(&queue->lock);
-}
-
-static void threadlet_queue_init(ThreadletQueue *queue,
-                          int max_threads, int min_threads);
-
-/**
- * submit_work: Submit to the global queue a new task to be executed
- *                   asynchronously.
- * @work: Contains information about the task that needs to be submitted.
- */
-static void submit_work(ThreadletWork *work)
-{
-    if (!globalqueue_init) {
-        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
-                                MIN_GLOBAL_THREADS);
-        globalqueue_init = 1;
-    }
-
-    submit_work_to_queue(&globalqueue, work);
-}
-
-/**
- * dequeue_work_on_queue: Cancel a task queued on a Queue.
- * @queue: The queue containing the task to be cancelled.
- * @work: Contains the information of the task that needs to be cancelled.
- *
- * Returns: 0 if the task is successfully cancelled.
- *          1 otherwise.
- */
-static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
-{
-    ThreadletWork *ret_work;
-    int ret = 1;
-
-    qemu_mutex_lock(&queue->lock);
-    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
-        if (ret_work == work) {
-            QTAILQ_REMOVE(&queue->request_list, ret_work, node);
-            ret = 0;
-            break;
-        }
-    }
-    qemu_mutex_unlock(&queue->lock);
-
-    return ret;
-}
-
-/**
- * dequeue_work: Cancel a task queued on the global queue.
- * @work: Contains the information of the task that needs to be cancelled.
- *
- * Returns: 0 if the task is successfully cancelled.
- *          1 otherwise.
- */
-static int dequeue_work(ThreadletWork *work)
-{
-    return dequeue_work_on_queue(&globalqueue, work);
-}
-
-/**
- * threadlet_queue_init: Initialize a threadlet queue.
- * @queue: The threadlet queue to be initialized.
- * @max_threads: Maximum number of threads processing the queue.
- * @min_threads: Minimum number of threads processing the queue.
- */
-static void threadlet_queue_init(ThreadletQueue *queue,
-                                    int max_threads, int min_threads)
-{
-    queue->cur_threads  = 0;
-    queue->idle_threads = 0;
-    queue->max_threads  = max_threads;
-    queue->min_threads  = min_threads;
-    QTAILQ_INIT(&queue->request_list);
-    qemu_mutex_init(&queue->lock);
-    qemu_cond_init(&queue->cond);
-}
-
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
 #else
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
new file mode 100644
index 0000000..23b4ecf
--- /dev/null
+++ b/qemu-threadlets.c
@@ -0,0 +1,168 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Anthony Liguori     <aliguori@us.ibm.com>
+ *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
+ *  Gautham R Shenoy    <gautham.shenoy@gmail.com>
+ *  Arun R Bharadwaj    <arun@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ */
+
+#include "qemu-threadlets.h"
+#include "osdep.h"
+
+#define MAX_GLOBAL_THREADS  64
+#define MIN_GLOBAL_THREADS   8
+static ThreadletQueue globalqueue;
+static int globalqueue_init;
+
+static void *threadlet_worker(void *data)
+{
+    ThreadletQueue *queue = data;
+
+    qemu_mutex_lock(&queue->lock);
+    while (1) {
+        ThreadletWork *work;
+        int ret = 0;
+
+        while (QTAILQ_EMPTY(&queue->request_list) &&
+               (ret != ETIMEDOUT)) {
+            /* wait for cond to be signalled or broadcast for 1000s */
+            ret = qemu_cond_timedwait((&queue->cond),
+                                         &(queue->lock), 10*100000);
+        }
+
+        assert(queue->idle_threads != 0);
+        if (QTAILQ_EMPTY(&queue->request_list)) {
+            if (queue->cur_threads > queue->min_threads) {
+                /* We retain the minimum number of threads */
+                break;
+            }
+        } else {
+            work = QTAILQ_FIRST(&queue->request_list);
+            QTAILQ_REMOVE(&queue->request_list, work, node);
+
+            queue->idle_threads--;
+            qemu_mutex_unlock(&queue->lock);
+
+            /* execute the work function */
+            work->func(work);
+
+            qemu_mutex_lock(&queue->lock);
+            queue->idle_threads++;
+        }
+    }
+
+    queue->idle_threads--;
+    queue->cur_threads--;
+    qemu_mutex_unlock(&queue->lock);
+
+    return NULL;
+}
+
+static void spawn_threadlet(ThreadletQueue *queue)
+{
+    QemuThread thread;
+
+    queue->cur_threads++;
+    queue->idle_threads++;
+
+    qemu_thread_create(&thread, threadlet_worker, queue);
+}
+
+/**
+ * submit_work_to_queue: Submit a new task to a private queue to be
+ *                            executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ *         to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+    qemu_mutex_lock(&queue->lock);
+    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+        spawn_threadlet(queue);
+    } else {
+        qemu_cond_signal(&queue->cond);
+    }
+    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+    qemu_mutex_unlock(&queue->lock);
+}
+
+/**
+ * submit_work: Submit to the global queue a new task to be executed
+ *                   asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work(ThreadletWork *work)
+{
+    if (!globalqueue_init) {
+        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+                             MIN_GLOBAL_THREADS);
+        globalqueue_init = 1;
+    }
+
+    submit_work_to_queue(&globalqueue, work);
+}
+
+/**
+ * dequeue_work_on_queue: Cancel a task queued on a queue.
+ * @queue: The queue containing the task to be cancelled.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+    ThreadletWork *ret_work;
+    int ret = 1;
+
+    qemu_mutex_lock(&queue->lock);
+    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
+        if (ret_work == work) {
+            QTAILQ_REMOVE(&queue->request_list, ret_work, node);
+            ret = 0;
+            break;
+        }
+    }
+    qemu_mutex_unlock(&queue->lock);
+
+    return ret;
+}
+
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+int dequeue_work(ThreadletWork *work)
+{
+    return dequeue_work_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+void threadlet_queue_init(ThreadletQueue *queue,
+                          int max_threads, int min_threads)
+{
+    queue->cur_threads  = 0;
+    queue->idle_threads = 0;
+    queue->max_threads  = max_threads;
+    queue->min_threads  = min_threads;
+    QTAILQ_INIT(&queue->request_list);
+    qemu_mutex_init(&queue->lock);
+    qemu_cond_init(&queue->cond);
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
new file mode 100644
index 0000000..39461a5
--- /dev/null
+++ b/qemu-threadlets.h
@@ -0,0 +1,46 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Anthony Liguori     <aliguori@us.ibm.com>
+ *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
+ *  Gautham R Shenoy    <gautham.shenoy@gmail.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_THREADLETS_H
+#define QEMU_THREADLETS_H
+
+#include "qemu-queue.h"
+#include "qemu-common.h"
+#include "qemu-thread.h"
+
+typedef struct ThreadletQueue
+{
+    QemuMutex lock;
+    QemuCond cond;
+    int max_threads;
+    int min_threads;
+    int cur_threads;
+    int idle_threads;
+    QTAILQ_HEAD(, ThreadletWork) request_list;
+} ThreadletQueue;
+
+typedef struct ThreadletWork
+{
+    QTAILQ_ENTRY(ThreadletWork) node;
+    void (*func)(struct ThreadletWork *work);
+} ThreadletWork;
+
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work);
+void submit_work(ThreadletWork *work);
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work);
+int dequeue_work(ThreadletWork *work);
+void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
+                          int min_threads);
+#endif
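
Usage note: ThreadletWork carries only a list node and a callback, so a caller would presumably embed it in its own request structure and recover the enclosing structure inside the callback. The sketch below illustrates that idea and the 0/1 return convention of dequeue_work_on_queue(). It is a standalone approximation, not code from this series: it uses <sys/queue.h> instead of qemu-queue.h, omits locking and threads, and the names Work, WorkList, SumRequest, sum_func, cancel and demo are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/queue.h>

/* Stand-in for ThreadletWork, built on <sys/queue.h>. */
typedef struct Work {
    TAILQ_ENTRY(Work) node;
    void (*func)(struct Work *work);
} Work;

TAILQ_HEAD(WorkList, Work);

/* Hypothetical request: the work item is embedded next to its arguments. */
typedef struct SumRequest {
    int a, b, result;
    Work work;
} SumRequest;

static void sum_func(Work *work)
{
    /* Recover the enclosing request from the embedded member. */
    SumRequest *req =
        (SumRequest *)((char *)work - offsetof(SumRequest, work));

    req->result = req->a + req->b;
}

/* Same contract as dequeue_work_on_queue(): 0 if removed, 1 otherwise. */
static int cancel(struct WorkList *list, Work *work)
{
    Work *it;

    TAILQ_FOREACH(it, list, node) {
        if (it == work) {
            TAILQ_REMOVE(list, it, node);
            return 0;
        }
    }
    return 1;
}

/* Queue two requests, cancel the second, then drain the list in order. */
int demo(void)
{
    struct WorkList list;
    SumRequest r1 = { .a = 1, .b = 2, .work.func = sum_func };
    SumRequest r2 = { .a = 3, .b = 4, .work.func = sum_func };

    TAILQ_INIT(&list);
    TAILQ_INSERT_TAIL(&list, &r1.work, node);
    TAILQ_INSERT_TAIL(&list, &r2.work, node);

    assert(cancel(&list, &r2.work) == 0);   /* still queued: cancelled */
    assert(cancel(&list, &r2.work) == 1);   /* no longer on the list */

    while (!TAILQ_EMPTY(&list)) {           /* what a worker would do */
        Work *w = TAILQ_FIRST(&list);
        TAILQ_REMOVE(&list, w, node);
        w->func(w);
    }
    /* r2 never ran, so its result is still 0. */
    return r2.result == 0 ? r1.result : -1;
}
```

Here demo() returns 3: the first request runs and the cancelled one is never executed. Note that a return value of 1 only says the item was not found on the list. In the real queue that would include the case where a worker has already dequeued it, so a caller cannot assume the task did not run.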

