qemu-devel.nongnu.org archive mirror
* [Qemu-devel] v5 [PATCH 0/3] qemu: Threadlets: A generic task offloading framework
@ 2010-10-13 15:30 Arun R Bharadwaj
  2010-10-13 15:31 ` [Qemu-devel] [PATCH 1/3] Introduce threadlets Arun R Bharadwaj
                   ` (2 more replies)
  0 siblings, 3 replies; 29+ messages in thread
From: Arun R Bharadwaj @ 2010-10-13 15:30 UTC
  To: qemu-devel

Hi,

This is v5 of the patch series that adds a generic asynchronous task
offloading framework (called threadlets) to qemu.

V4 can be found here:
http://www.mail-archive.com/qemu-devel@nongnu.org/msg36157.html

Change from v4:

* The earlier code hit a null pointer dereference in cpu_exit() when the
  IO thread was disabled. This version adds a signal handler for SIGUSR2
  that calls qemu_service_io(), which in turn calls cpu_exit().
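
The SIGUSR2 handling described above could look roughly like the following
sketch. It is self-contained and only illustrates the mechanism: the names
on_sigusr2, install_sigusr2_handler and run_signal_demo are hypothetical, and
the handler merely sets a flag where the series calls qemu_service_io().

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

/* Set by the handler; sig_atomic_t is safe to write from a signal handler. */
static volatile sig_atomic_t got_sigusr2;

/* Hypothetical stand-in for the handler that would call qemu_service_io(). */
static void on_sigusr2(int signum)
{
    (void)signum;
    got_sigusr2 = 1;
}

/* Install the handler without SA_RESTART so that a blocking select()
 * in the main loop is interrupted (returns EINTR) when the signal fires. */
static int install_sigusr2_handler(void)
{
    struct sigaction act;

    memset(&act, 0, sizeof(act));
    sigfillset(&act.sa_mask);      /* block other signals while handling */
    act.sa_flags = 0;
    act.sa_handler = on_sigusr2;
    return sigaction(SIGUSR2, &act, NULL);
}

/* Install the handler, signal ourselves the way a helper thread would,
 * and report whether the handler ran. */
int run_signal_demo(void)
{
    got_sigusr2 = 0;
    if (install_sigusr2_handler() == -1) {
        return -1;
    }
    if (kill(getpid(), SIGUSR2) == -1) {
        return -1;
    }
    return got_sigusr2;
}
```

In a single-threaded caller the signal is delivered before kill() returns, so
run_signal_demo() reports that the handler ran.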

The following series implements...

---

Aneesh Kumar K.V (1):
      Introduce threadlets

Gautham R Shenoy (2):
      Make paio subsystem use threadlets
      Add helper functions for virtio-9p to use threadlets


 Makefile.objs          |    3 -
 docs/async-support.txt |  141 +++++++++++++++++++++++++++++++++++
 hw/virtio-9p.c         |  165 ++++++++++++++++++++++++++++++++++++++++-
 posix-aio-compat.c     |  195 ++++++++++--------------------------------------
 qemu-threadlets.c      |  190 +++++++++++++++++++++++++++++++++++++++++++++++
 qemu-threadlets.h      |   49 ++++++++++++
 vl.c                   |    3 +
 7 files changed, 588 insertions(+), 158 deletions(-)
 create mode 100644 docs/async-support.txt
 create mode 100644 qemu-threadlets.c
 create mode 100644 qemu-threadlets.h

-- 
arun


* [Qemu-devel] [PATCH 1/3] Introduce threadlets
  2010-10-13 15:30 [Qemu-devel] v5 [PATCH 0/3] qemu: Threadlets: A generic task offloading framework Arun R Bharadwaj
@ 2010-10-13 15:31 ` Arun R Bharadwaj
  2010-10-14  9:02   ` Stefan Hajnoczi
  2010-10-14  9:15   ` [Qemu-devel] " Stefan Hajnoczi
  2010-10-13 15:31 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
  2010-10-13 15:31 ` [Qemu-devel] [PATCH 3/3] Add helper functions for virtio-9p to " Arun R Bharadwaj
  2 siblings, 2 replies; 29+ messages in thread
From: Arun R Bharadwaj @ 2010-10-13 15:31 UTC
  To: qemu-devel

From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>

This patch creates a generic asynchronous-task-offloading infrastructure named
threadlets. The core idea has been borrowed from the threading framework that
is being used by paio.

The reason for creating this generic infrastructure is so that other subsystems,
such as virtio-9p, can make use of it for offloading tasks that could block.

The patch creates a global queue onto which subsystems can queue their tasks to
be executed asynchronously.

The patch also provides APIs that allow a subsystem to create a private queue
with an associated pool of threads.
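
To make the queue-plus-workers idea concrete, here is a minimal,
self-contained sketch of the same pattern using plain pthreads. It is not the
code from this patch: Work, Queue, queue_submit, worker, SquareJob and
run_pool_demo are hypothetical names, there is a single worker, and the
idle-timeout and min/max thread accounting of the real implementation are
left out.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Recover the enclosing structure from a pointer to one of its members. */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Hypothetical miniature of a work item: a list link plus a callback. */
typedef struct Work {
    struct Work *next;
    void (*func)(struct Work *work);
} Work;

typedef struct Queue {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    Work *head, *tail;
    int stop;                       /* set when no more work will arrive */
} Queue;

static void queue_init(Queue *q)
{
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->cond, NULL);
    q->head = q->tail = NULL;
    q->stop = 0;
}

/* Append a work item and wake a worker. */
static void queue_submit(Queue *q, Work *w)
{
    pthread_mutex_lock(&q->lock);
    w->next = NULL;
    if (q->tail) {
        q->tail->next = w;
    } else {
        q->head = w;
    }
    q->tail = w;
    pthread_cond_signal(&q->cond);
    pthread_mutex_unlock(&q->lock);
}

/* Worker loop: dequeue under the lock, run the callback without it. */
static void *worker(void *opaque)
{
    Queue *q = opaque;

    pthread_mutex_lock(&q->lock);
    for (;;) {
        while (!q->head && !q->stop) {
            pthread_cond_wait(&q->cond, &q->lock);
        }
        if (!q->head) {
            break;                  /* stop requested and queue drained */
        }
        Work *w = q->head;
        q->head = w->next;
        if (!q->head) {
            q->tail = NULL;
        }
        pthread_mutex_unlock(&q->lock);
        w->func(w);
        pthread_mutex_lock(&q->lock);
    }
    pthread_mutex_unlock(&q->lock);
    return NULL;
}

/* Private data travels with the work item by embedding it. */
typedef struct SquareJob {
    int in, out;
    Work work;
} SquareJob;

static void square_func(Work *w)
{
    SquareJob *job = container_of(w, SquareJob, work);
    job->out = job->in * job->in;
}

/* Submit one job, let the worker drain the queue, and return the result. */
int run_pool_demo(int n)
{
    Queue q;
    pthread_t tid;
    SquareJob job = { .in = n, .out = 0 };

    queue_init(&q);
    pthread_create(&tid, NULL, worker, &q);
    job.work.func = square_func;
    queue_submit(&q, &job.work);

    pthread_mutex_lock(&q.lock);
    q.stop = 1;
    pthread_cond_broadcast(&q.cond);
    pthread_mutex_unlock(&q.lock);
    pthread_join(tid, NULL);
    return job.out;
}
```

The callback runs with the queue lock dropped, which is what lets the
submitting thread keep queueing work while a task blocks.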

[ego@in.ibm.com: Facelift of the code, Documentation, cancel_threadlet
and other helpers]

Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
 Makefile.objs          |    3 +
 docs/async-support.txt |  141 ++++++++++++++++++++++++++++++++++++++++
 qemu-threadlets.c      |  169 ++++++++++++++++++++++++++++++++++++++++++++++++
 qemu-threadlets.h      |   48 ++++++++++++++
 4 files changed, 360 insertions(+), 1 deletions(-)
 create mode 100644 docs/async-support.txt
 create mode 100644 qemu-threadlets.c
 create mode 100644 qemu-threadlets.h

diff --git a/Makefile.objs b/Makefile.objs
index cd5a24b..2cf8aba 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,8 @@ qobject-obj-y += qerror.o
 
 block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
 block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-$(CONFIG_POSIX) += qemu-thread.o
+block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
@@ -124,7 +126,6 @@ endif
 common-obj-y += $(addprefix ui/, $(ui-obj-y))
 
 common-obj-y += iov.o acl.o
-common-obj-$(CONFIG_THREAD) += qemu-thread.o
 common-obj-y += notify.o event_notifier.o
 common-obj-y += qemu-timer.o
 
diff --git a/docs/async-support.txt b/docs/async-support.txt
new file mode 100644
index 0000000..2e8adc9
--- /dev/null
+++ b/docs/async-support.txt
@@ -0,0 +1,141 @@
+== How to use the various asynchronous models supported in Qemu ==
+
+== Threadlets ==
+
+Q.1: What are threadlets ?
+A.1: Threadlets is an infrastructure within QEMU that allows other subsystems
+     to offload possibly blocking work to a queue to be processed by a pool
+     of threads asynchronously.
+
+Q.2: When would one want to use threadlets ?
+A.2: Threadlets are useful when there are operations that can be performed
+     outside the context of the VCPU/IO threads in order to free the latter
+     to service other guest requests.
+
+Q.3: I have some work that can be executed in an asynchronous context. How
+     should I go about it ?
+A.3: One could follow the steps listed below:
+
+     - Define a function which would do the asynchronous work.
+	void my_threadlet_func(ThreadletWork *work)
+	{
+	}
+
+     - Declare an object of type ThreadletWork;
+	ThreadletWork work;
+
+
+     - Assign a value to the "func" member of ThreadletWork object.
+	work.func = my_threadlet_func;
+
+     - Submit the threadlet to the global queue.
+	submit_threadletwork(&work);
+
+     - Continue servicing some other guest operations.
+
+Q.4: I want my_threadlet_func to access some non-global data. How do I do
+     that ?
+A.4: Suppose you want my_threadlet_func to access some non-global data-object
+     of type myPrivateData. In that case one could follow the following steps.
+
+     - Define a member of the type ThreadletWork within myPrivateData.
+	typedef struct myPrivateData {
+		...;
+		...;
+		...;
+		ThreadletWork work;
+	} myPrivateData;
+
+	myPrivateData myData;
+
+     - Initialize myData.work as described in A.3
+	myData.work.func = my_threadlet_func;
+	submit_threadletwork(&myData.work);
+
+     - Access the myData object inside my_threadlet_func() using container_of
+       primitive
+	void my_threadlet_func(ThreadletWork *work)
+	{
+		myPrivateData *mydata_ptr;
+		mydata_ptr = container_of(work, myPrivateData, work);
+
+		/* mydata_ptr now points to myData object */
+	}
+
+Q.5: Are there any precautions one must take while sharing data with the
+     asynchronous thread-pool ?
+A.5: Yes, make sure that the helper function of the type my_threadlet_func()
+     does not access/modify data when it can be accessed or modified in the
+     context of VCPU thread or IO thread. This is because the asynchronous
+     threads in the pool can run in parallel with the VCPU/IOThreads as shown
+     in the figure.
+
+     A typical workflow is as follows:
+
+              VCPU/IOThread
+                   |
+                   | (1)
+                   |
+                   V
+                Offload work              (2)
+      |-------> to threadlets -----------------------------> Helper thread
+      |            |                                               |
+      |            |                                               |
+      |            | (3)                                           | (4)
+      |            |                                               |
+      |         Handle other Guest requests                        |
+      |            |                                               |
+      |            |                                               V
+      |            | (3)                                  Signal the I/O Thread
+      |(6)         |                                               |
+      |            |                                              /
+      |            |                                             /
+      |            V                                            /
+      |          Do the post <---------------------------------/
+      |          processing               (5)
+      |            |
+      |            | (6)
+      |            V
+      |-Yes------ More async work?
+                   |
+                   | (7)
+	           No
+                   |
+                   |
+                   .
+                   .
+
+    Hence one needs to make sure that in the steps (3) and (4) which run in
+    parallel, any global data is accessed within only one context.
+
+Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
+A.6: The threadlets framework provides the API cancel_threadletwork:
+       - int cancel_threadletwork(ThreadletWork *work)
+
+     The API scans the ThreadletQueue to see if (work) is present. If it finds
+     work, it'll dequeue work and return 0.
+
+     On the other hand, if it does not find the (work) in the ThreadletQueue,
+     then it'll return 1. This can imply two things. Either the work is being
+     processed by one of the helper threads or it has been processed. The
+     threadlet infrastructure currently _does_not_ distinguish between these
+     two and the onus is on the caller to do that.
+
+Q.7: Apart from the global pool of threads, can I have my own private Queue ?
+A.7: Yes, the threadlets framework allows subsystems to create their own private
+     queues with associated pools of threads.
+
+     - Define a PrivateQueue
+       ThreadletQueue myQueue;
+
+     - Initialize it:
+       threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
+       where my_max_threads is the maximum number of threads that can be in the
+       thread pool and my_min_threads is the minimum number of active threads
+       that can be in the thread-pool.
+
+     - Submit work:
+       submit_threadletwork_to_queue(&myQueue, &my_work);
+
+     - Cancel work:
+       cancel_threadletwork_on_queue(&myQueue, &my_work);
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
new file mode 100644
index 0000000..1442122
--- /dev/null
+++ b/qemu-threadlets.c
@@ -0,0 +1,169 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Anthony Liguori     <aliguori@us.ibm.com>
+ *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
+ *  Gautham R Shenoy    <ego@in.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ */
+
+#include "qemu-threadlets.h"
+#include "osdep.h"
+
+#define MAX_GLOBAL_THREADS  64
+#define MIN_GLOBAL_THREADS  64
+ThreadletQueue globalqueue;
+static int globalqueue_init;
+
+static void *threadlet_worker(void *data)
+{
+    ThreadletQueue *queue = data;
+
+    qemu_mutex_lock(&(queue->lock));
+    while (1) {
+        ThreadletWork *work;
+        int ret = 0;
+
+        while (QTAILQ_EMPTY(&(queue->request_list)) &&
+               (ret != ETIMEDOUT)) {
+            ret = qemu_cond_timedwait(&(queue->cond),
+					 &(queue->lock), 10*100000);
+        }
+
+        assert(queue->idle_threads != 0);
+        if (QTAILQ_EMPTY(&(queue->request_list))) {
+            if (queue->cur_threads > queue->min_threads) {
+                /* We retain the minimum number of threads */
+                break;
+            }
+        } else {
+            work = QTAILQ_FIRST(&(queue->request_list));
+            QTAILQ_REMOVE(&(queue->request_list), work, node);
+
+            queue->idle_threads--;
+            qemu_mutex_unlock(&(queue->lock));
+
+            /* execute the work function */
+            work->func(work);
+
+            qemu_mutex_lock(&(queue->lock));
+            queue->idle_threads++;
+        }
+    }
+
+    queue->idle_threads--;
+    queue->cur_threads--;
+    qemu_mutex_unlock(&(queue->lock));
+
+    return NULL;
+}
+
+static void spawn_threadlet(ThreadletQueue *queue)
+{
+    QemuThread thread;
+
+    queue->cur_threads++;
+    queue->idle_threads++;
+
+    qemu_thread_create(&thread, threadlet_worker, queue);
+}
+
+/**
+ * submit_threadletwork_to_queue: Submit a new task to a private queue to be
+ *                            executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ *         to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+    qemu_mutex_lock(&(queue->lock));
+    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+        spawn_threadlet(queue);
+    }
+    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
+    qemu_mutex_unlock(&(queue->lock));
+    qemu_cond_signal(&(queue->cond));
+}
+
+/**
+ * submit_threadletwork: Submit to the global queue a new task to be executed
+ *                   asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_threadletwork(ThreadletWork *work)
+{
+    if (unlikely(!globalqueue_init)) {
+        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+                                MIN_GLOBAL_THREADS);
+        globalqueue_init = 1;
+    }
+
+    submit_threadletwork_to_queue(&globalqueue, work);
+}
+
+/**
+ * cancel_threadletwork_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+int cancel_threadletwork_on_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+    ThreadletWork *ret_work;
+    int found = 0;
+
+    qemu_mutex_lock(&(queue->lock));
+    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
+        if (ret_work == work) {
+            QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
+            found = 1;
+            break;
+        }
+    }
+    qemu_mutex_unlock(&(queue->lock));
+
+    if (found) {
+        return 0;
+    }
+
+    return 1;
+}
+
+/**
+ * cancel_threadletwork: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+int cancel_threadletwork(ThreadletWork *work)
+{
+    return cancel_threadletwork_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+void threadlet_queue_init(ThreadletQueue *queue,
+				    int max_threads, int min_threads)
+{
+    queue->cur_threads  = 0;
+    queue->idle_threads = 0;
+    queue->max_threads  = max_threads;
+    queue->min_threads  = min_threads;
+    QTAILQ_INIT(&(queue->request_list));
+    qemu_mutex_init(&(queue->lock));
+    qemu_cond_init(&(queue->cond));
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
new file mode 100644
index 0000000..3df9b10
--- /dev/null
+++ b/qemu-threadlets.h
@@ -0,0 +1,48 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ *  Anthony Liguori     <aliguori@us.ibm.com>
+ *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
+ *  Gautham R Shenoy    <ego@in.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_ASYNC_WORK_H
+#define QEMU_ASYNC_WORK_H
+
+#include "qemu-queue.h"
+#include "qemu-common.h"
+#include "qemu-thread.h"
+
+typedef struct ThreadletQueue
+{
+    QemuMutex lock;
+    QemuCond cond;
+    int max_threads;
+    int min_threads;
+    int cur_threads;
+    int idle_threads;
+    QTAILQ_HEAD(, threadlet_work) request_list;
+} ThreadletQueue;
+
+typedef struct threadlet_work
+{
+    QTAILQ_ENTRY(threadlet_work) node;
+    void (*func)(struct threadlet_work *work);
+} ThreadletWork;
+
+extern void submit_threadletwork_to_queue(ThreadletQueue *queue,
+                                      ThreadletWork *work);
+extern void submit_threadletwork(ThreadletWork *work);
+extern int cancel_threadletwork_on_queue(ThreadletQueue *queue,
+                                        ThreadletWork *work);
+extern int cancel_threadletwork(ThreadletWork *work);
+extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
+                                 int min_threads);
+#endif


* [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
  2010-10-13 15:30 [Qemu-devel] v5 [PATCH 0/3] qemu: Threadlets: A generic task offloading framework Arun R Bharadwaj
  2010-10-13 15:31 ` [Qemu-devel] [PATCH 1/3] Introduce threadlets Arun R Bharadwaj
@ 2010-10-13 15:31 ` Arun R Bharadwaj
  2010-10-13 15:31 ` [Qemu-devel] [PATCH 3/3] Add helper functions for virtio-9p to " Arun R Bharadwaj
  2 siblings, 0 replies; 29+ messages in thread
From: Arun R Bharadwaj @ 2010-10-13 15:31 UTC
  To: qemu-devel

From: Gautham R Shenoy <ego@in.ibm.com>

This patch makes the paio subsystem use the threadlet framework, thereby
decoupling the asynchronous threading portion from
posix-aio-compat.c.

The patch has been tested with fstress.

Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
---
 posix-aio-compat.c |  166 +++++++++-------------------------------------------
 1 files changed, 30 insertions(+), 136 deletions(-)

diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..6977c18 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,6 +29,7 @@
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
+#include "qemu-threadlets.h"
 
 
 struct qemu_paiocb {
@@ -51,6 +52,7 @@ struct qemu_paiocb {
     struct qemu_paiocb *next;
 
     int async_context_id;
+    ThreadletWork work;
 };
 
 typedef struct PosixAioState {
@@ -59,15 +61,6 @@ typedef struct PosixAioState {
 } PosixAioState;
 
 
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
 #else
@@ -85,39 +78,6 @@ static void die(const char *what)
     die2(errno, what);
 }
 
-static void mutex_lock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
-{
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
-    return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
-{
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
-}
-
 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
 {
     int ret;
@@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
 {
     pid_t pid;
+    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+    ssize_t ret = 0;
 
     pid = getpid();
+    aiocb->active = 1;
 
-    while (1) {
-        struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
-
-        mutex_lock(&lock);
-
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            ret = cond_timedwait(&cond, &lock, &ts);
-        }
-
-        if (QTAILQ_EMPTY(&request_list))
-            break;
-
-        aiocb = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->active = 1;
-        idle_threads--;
-        mutex_unlock(&lock);
-
-        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-        case QEMU_AIO_READ:
-        case QEMU_AIO_WRITE:
-            ret = handle_aiocb_rw(aiocb);
-            break;
-        case QEMU_AIO_FLUSH:
-            ret = handle_aiocb_flush(aiocb);
-            break;
-        case QEMU_AIO_IOCTL:
-            ret = handle_aiocb_ioctl(aiocb);
-            break;
-        default:
-            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
-            ret = -EINVAL;
-            break;
-        }
-
-        mutex_lock(&lock);
-        aiocb->ret = ret;
-        idle_threads++;
-        mutex_unlock(&lock);
-
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+    case QEMU_AIO_WRITE:
+        ret = handle_aiocb_rw(aiocb);
+        break;
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
+        ret = handle_aiocb_ioctl(aiocb);
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
+        break;
     }
 
-    idle_threads--;
-    cur_threads--;
-    mutex_unlock(&lock);
+    aiocb->ret = ret;
 
-    return NULL;
-}
-
-static void spawn_thread(void)
-{
-    sigset_t set, oldset;
-
-    cur_threads++;
-    idle_threads++;
-
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+    if (kill(pid, aiocb->ev_signo)) die("kill failed");
 }
 
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
     aiocb->ret = -EINPROGRESS;
     aiocb->active = 0;
-    mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
+
+    aiocb->work.func = aio_thread;
+    submit_threadletwork(&aiocb->work);
 }
 
 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 {
     ssize_t ret;
 
-    mutex_lock(&lock);
     ret = aiocb->ret;
-    mutex_unlock(&lock);
-
     return ret;
 }
 
@@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
     int active = 0;
 
-    mutex_lock(&lock);
     if (!acb->active) {
-        QTAILQ_REMOVE(&request_list, acb, node);
-        acb->ret = -ECANCELED;
+        if (!cancel_threadletwork(&acb->work))
+            acb->ret = -ECANCELED;
+        else
+            active = 1;
     } else if (acb->ret == -EINPROGRESS) {
         active = 1;
     }
-    mutex_unlock(&lock);
 
     if (active) {
         /* fail safe: if the aio could not be canceled, we wait for
@@ -618,7 +523,6 @@ int paio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    int ret;
 
     if (posix_aio_state)
         return 0;
@@ -645,16 +549,6 @@ int paio_init(void)
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
         posix_aio_process_queue, s);
 
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
-
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
-
-    QTAILQ_INIT(&request_list);
-
     posix_aio_state = s;
     return 0;
 }


* [Qemu-devel] [PATCH 3/3] Add helper functions for virtio-9p to use threadlets
  2010-10-13 15:30 [Qemu-devel] v5 [PATCH 0/3] qemu: Threadlets: A generic task offloading framework Arun R Bharadwaj
  2010-10-13 15:31 ` [Qemu-devel] [PATCH 1/3] Introduce threadlets Arun R Bharadwaj
  2010-10-13 15:31 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
@ 2010-10-13 15:31 ` Arun R Bharadwaj
  2 siblings, 0 replies; 29+ messages in thread
From: Arun R Bharadwaj @ 2010-10-13 15:31 UTC
  To: qemu-devel

From: Gautham R Shenoy <ego@in.ibm.com>

Add helper functions that enable virtio-9p to use the threadlets
infrastructure for offloading blocking tasks, such as making posix calls, onto
the helper threads, and to handle the post_posix_operations() from the
context of the iothread. This frees the vcpu thread to process any other guest
operations while the processing of v9fs_io is in progress.
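
The hand-off from a helper thread back to the iothread relies on a
non-blocking pipe: the helper writes a byte and the iothread's fd handler
drains the pipe before processing completions. Below is a minimal,
self-contained sketch of that mechanism. The names set_nonblocking, notify,
drain and run_pipe_demo are hypothetical and are not the helpers added by
this patch; only the POSIX calls are real.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Add O_NONBLOCK while preserving the descriptor's other status flags. */
static int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL);

    if (flags == -1) {
        return -1;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Helper-thread side: wake the reader. A full pipe (EAGAIN) is not an
 * error here, because it means a wakeup is already pending. */
static int notify(int wfd)
{
    char byte = 0;
    ssize_t ret;

    do {
        ret = write(wfd, &byte, sizeof(byte));
    } while (ret < 0 && errno == EINTR);

    if (ret < 0 && errno != EAGAIN) {
        return -1;
    }
    return 0;
}

/* Reader side: consume every pending byte. The return value of read()
 * is checked before errno, since errno is only meaningful on failure. */
static int drain(int rfd)
{
    char buf[64];
    int total = 0;

    for (;;) {
        ssize_t ret = read(rfd, buf, sizeof(buf));

        if (ret > 0) {
            total += (int)ret;
        } else if (ret == 0) {
            break;                      /* write end closed */
        } else if (errno == EINTR) {
            continue;
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            break;                      /* pipe is empty */
        } else {
            return -1;
        }
    }
    return total;
}

/* Post three notifications, then drain them in one pass. */
int run_pipe_demo(void)
{
    int fds[2];
    int drained;

    if (pipe(fds) == -1) {
        return -1;
    }
    if (set_nonblocking(fds[0]) == -1 || set_nonblocking(fds[1]) == -1) {
        return -1;
    }
    if (notify(fds[1]) || notify(fds[1]) || notify(fds[1])) {
        return -1;
    }
    drained = drain(fds[0]);
    close(fds[0]);
    close(fds[1]);
    return drained;
}
```

Several notifications can collapse into a single wakeup, so the reader should
treat a readable pipe as "check the completion list" rather than as a count
of finished jobs.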

Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
 hw/virtio-9p.c     |  165 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 posix-aio-compat.c |   33 +++-------
 qemu-threadlets.c  |   21 +++++++
 qemu-threadlets.h  |    1 
 vl.c               |    3 +
 5 files changed, 200 insertions(+), 23 deletions(-)

diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
index a871685..174300d 100644
--- a/hw/virtio-9p.c
+++ b/hw/virtio-9p.c
@@ -18,6 +18,7 @@
 #include "fsdev/qemu-fsdev.h"
 #include "virtio-9p-debug.h"
 #include "virtio-9p-xattr.h"
+#include "qemu-threadlets.h"
 
 int debug_9p_pdu;
 
@@ -33,6 +34,146 @@ enum {
     Oappend = 0x80,
 };
 
+struct v9fs_post_op {
+    QTAILQ_ENTRY(v9fs_post_op) node;
+    void (*func)(void *arg);
+    void *arg;
+};
+
+static struct {
+    int rfd;
+    int wfd;
+    QemuMutex lock;
+    QTAILQ_HEAD(, v9fs_post_op) post_op_list;
+} v9fs_async_struct;
+
+static void die2(int err, const char *what)
+{
+    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
+    abort();
+}
+
+static void die(const char *what)
+{
+    die2(errno, what);
+}
+
+#define ASYNC_MAX_PROCESS   5
+
+/**
+ * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
+ * @arg: Not used.
+ *
+ * This function serves as a callback that the iothread invokes whenever
+ * v9fs_async_struct.wfd is written to. It goes through the list of
+ * v9fs_post_posix_operations() and executes them. In the process, it might
+ * queue more jobs on the asynchronous thread pool.
+ */
+static void v9fs_process_post_ops(void *arg)
+{
+    int count = 0;
+    struct v9fs_post_op *post_op;
+    int ret;
+    char byte;
+
+    qemu_mutex_lock(&v9fs_async_struct.lock);
+    do {
+        ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
+    } while (ret >= 0 && errno != EAGAIN);
+
+    for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
+        if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
+            break;
+        }
+        post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
+        QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
+
+        qemu_mutex_unlock(&v9fs_async_struct.lock);
+        post_op->func(post_op->arg);
+        qemu_free(post_op);
+        qemu_mutex_lock(&v9fs_async_struct.lock);
+    }
+    qemu_mutex_unlock(&v9fs_async_struct.lock);
+}
+
+/**
+ * v9fs_async_signal: Inform the io-thread of completion of async job.
+ *
+ * This function is used to inform the iothread that a particular
+ * async-operation pertaining to v9fs has been completed and that the io thread
+ * can handle the v9fs_post_posix_operation.
+ *
+ * This is based on the aio_signal_handler
+ */
+static inline void v9fs_async_signal(void)
+{
+    char byte = 0;
+    ssize_t ret;
+    int tries = 0;
+
+    qemu_mutex_lock(&v9fs_async_struct.lock);
+    do {
+        assert(tries != 100);
+        ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
+        tries++;
+    } while (ret < 0 && errno == EAGAIN);
+    qemu_mutex_unlock(&v9fs_async_struct.lock);
+
+    if (ret < 0 && errno != EAGAIN)
+        die("write() in v9fs");
+
+    if (kill(getpid(), SIGUSR2)) die("kill failed");
+}
+
+/**
+ * v9fs_async_helper_done: Marks the completion of the v9fs_async job
+ * @func: v9fs_post_posix_func() for post-processing invoked in the context of
+ *        the io-thread
+ * @arg: Argument to func.
+ *
+ * This function is called from the context of one of the asynchronous threads
+ * in the thread pool. This is called when the asynchronous thread has finished
+ * executing a v9fs_posix_operation. Its purpose is to initiate the process of
+ * informing the io-thread that the v9fs_posix_operation has completed.
+ */
+static void v9fs_async_helper_done(void (*func)(void *arg), void *arg)
+{
+    struct v9fs_post_op *post_op;
+
+    post_op = qemu_mallocz(sizeof(*post_op));
+    post_op->func = func;
+    post_op->arg = arg;
+
+    qemu_mutex_lock(&v9fs_async_struct.lock);
+    QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node);
+    qemu_mutex_unlock(&v9fs_async_struct.lock);
+
+    v9fs_async_signal();
+}
+
+/**
+ * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
+ * @work: ThreadletWork for the operation being offloaded.
+ * @posix_fn: The posix function which has to be offloaded onto async thread.
+ * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
+ * the posix_fn
+ * @post_fn: The post processing function corresponding to the posix_fn.
+ *
+ * This function is a helper to offload posix_operation on to the asynchronous
+ * thread pool. It sets up the associations with the post_function that needs to
+ * be invoked from the context of the iothread once the posix_fn has been
+ * executed.
+ */
+static void v9fs_do_async_posix(ThreadletWork *work,
+                                void (*posix_fn)(ThreadletWork *work),
+                                void (**post_fn_ptr)(void *arg),
+                                void (*post_fn)(void *arg))
+{
+    *post_fn_ptr = post_fn;
+    work->func = posix_fn;
+    submit_threadletwork(work);
+}
+
 static int omode_to_uflags(int8_t mode)
 {
     int ret = 0;
@@ -3639,7 +3780,7 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
     int i, len;
     struct stat stat;
     FsTypeEntry *fse;
-
+    int fds[2];
 
     s = (V9fsState *)virtio_common_init("virtio-9p",
                                     VIRTIO_ID_9P,
@@ -3722,5 +3863,27 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
                         s->tag_len;
     s->vdev.get_config = virtio_9p_get_config;
 
+    if (qemu_pipe(fds) == -1) {
+        fprintf(stderr, "failed to create fd's for virtio-9p\n");
+        exit(1);
+    }
+
+    v9fs_async_struct.rfd = fds[0];
+    v9fs_async_struct.wfd = fds[1];
+
+    printf("v9fs: rfd: %d\n", v9fs_async_struct.rfd);
+    printf("v9fs: wfd: %d\n", v9fs_async_struct.wfd);
+
+    fcntl(fds[0], F_SETFL, O_NONBLOCK);
+    fcntl(fds[1], F_SETFL, O_NONBLOCK);
+
+    qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
+    QTAILQ_INIT(&v9fs_async_struct.post_op_list);
+    qemu_mutex_init(&(v9fs_async_struct.lock));
+    /* Create async queue. */
+
+    (void)v9fs_do_async_posix;
+    (void)v9fs_async_helper_done;
+
     return &s->vdev;
 }
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 6977c18..dbf9321 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -261,6 +261,8 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
+static PosixAioState *posix_aio_state;
+
 static void aio_thread(ThreadletWork *work)
 {
     pid_t pid;
@@ -289,6 +291,15 @@ static void aio_thread(ThreadletWork *work)
 
     aiocb->ret = ret;
 
+    if (posix_aio_state) {
+        char byte = 0;
+        ssize_t ret;
+
+        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
+        if (ret < 0 && errno != EAGAIN)
+            die("write()");
+    }
+
     if (kill(pid, aiocb->ev_signo)) die("kill failed");
 }
 
@@ -401,22 +412,6 @@ static int posix_aio_flush(void *opaque)
     return !!s->first_aio;
 }
 
-static PosixAioState *posix_aio_state;
-
-static void aio_signal_handler(int signum)
-{
-    if (posix_aio_state) {
-        char byte = 0;
-        ssize_t ret;
-
-        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
-        if (ret < 0 && errno != EAGAIN)
-            die("write()");
-    }
-
-    qemu_service_io();
-}
-
 static void paio_remove(struct qemu_paiocb *acb)
 {
     struct qemu_paiocb **pacb;
@@ -520,7 +515,6 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
 
 int paio_init(void)
 {
-    struct sigaction act;
     PosixAioState *s;
     int fds[2];
 
@@ -529,11 +523,6 @@ int paio_init(void)
 
     s = qemu_malloc(sizeof(PosixAioState));
 
-    sigfillset(&act.sa_mask);
-    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
-    act.sa_handler = aio_signal_handler;
-    sigaction(SIGUSR2, &act, NULL);
-
     s->first_aio = NULL;
     if (qemu_pipe(fds) == -1) {
         fprintf(stderr, "failed to create pipe\n");
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
index 1442122..c582d7c 100644
--- a/qemu-threadlets.c
+++ b/qemu-threadlets.c
@@ -15,12 +15,28 @@
 
 #include "qemu-threadlets.h"
 #include "osdep.h"
+#include <signal.h>
 
 #define MAX_GLOBAL_THREADS  64
 #define MIN_GLOBAL_THREADS  64
 ThreadletQueue globalqueue;
 static int globalqueue_init;
 
+static void threadlet_io_completion_signal_handler(int signum)
+{
+    qemu_service_io();
+}
+
+static void threadlet_register_signal_handler(void)
+{
+    struct sigaction act;
+
+    sigfillset(&act.sa_mask);
+    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
+    act.sa_handler = threadlet_io_completion_signal_handler;
+    sigaction(SIGUSR2, &act, NULL);
+}
+
 static void *threadlet_worker(void *data)
 {
     ThreadletQueue *queue = data;
@@ -167,3 +183,8 @@ void threadlet_queue_init(ThreadletQueue *queue,
     qemu_mutex_init(&(queue->lock));
     qemu_cond_init(&(queue->cond));
 }
+
+void threadlet_init(void)
+{
+    threadlet_register_signal_handler();
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
index 3df9b10..ebc1f46 100644
--- a/qemu-threadlets.h
+++ b/qemu-threadlets.h
@@ -45,4 +45,5 @@ extern int cancel_threadletwork_on_queue(ThreadletQueue *queue,
 extern int cancel_threadletwork(ThreadletWork *work);
 extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
                                  int min_threads);
+extern void threadlet_init(void);
 #endif
diff --git a/vl.c b/vl.c
index df414ef..7b9a425 100644
--- a/vl.c
+++ b/vl.c
@@ -148,6 +148,7 @@ int main(int argc, char **argv)
 #include "qemu-config.h"
 #include "qemu-objects.h"
 #include "qemu-options.h"
+#include "qemu-threadlets.h"
 #ifdef CONFIG_VIRTFS
 #include "fsdev/qemu-fsdev.h"
 #endif
@@ -2922,6 +2923,8 @@ int main(int argc, char **argv, char **envp)
             exit(1);
     }
 
+    threadlet_init();
+
     /* init generic devices */
     if (qemu_opts_foreach(qemu_find_opts("device"), device_init_func, NULL, 1) != 0)
         exit(1);


* [Qemu-devel] [PATCH 2/3]: Make paio subsystem use threadlets
  2010-10-13 16:44 [Qemu-devel] [PATCH 0/3]: Threadlets: A generic task offloading framework Arun R Bharadwaj
@ 2010-10-13 16:48 ` Arun R Bharadwaj
  0 siblings, 0 replies; 29+ messages in thread
From: Arun R Bharadwaj @ 2010-10-13 16:48 UTC (permalink / raw)
  To: qemu-devel; +Cc: Arun Bharadwaj

* Arun R Bharadwaj <arun@linux.vnet.ibm.com> [2010-10-13 22:14:39]:

Make paio subsystem use threadlets

From: Gautham R Shenoy <ego@in.ibm.com>

This patch makes the paio subsystem use the threadlet framework, thereby
moving the asynchronous threading code out of posix-aio-compat.c.

The patch has been tested with fstress.

Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
---
 posix-aio-compat.c |  166 +++++++++-------------------------------------------
 1 files changed, 30 insertions(+), 136 deletions(-)

diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..6977c18 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,6 +29,7 @@
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
+#include "qemu-threadlets.h"
 
 
 struct qemu_paiocb {
@@ -51,6 +52,7 @@ struct qemu_paiocb {
     struct qemu_paiocb *next;
 
     int async_context_id;
+    ThreadletWork work;
 };
 
 typedef struct PosixAioState {
@@ -59,15 +61,6 @@ typedef struct PosixAioState {
 } PosixAioState;
 
 
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
 #else
@@ -85,39 +78,6 @@ static void die(const char *what)
     die2(errno, what);
 }
 
-static void mutex_lock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
-{
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
-    return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
-{
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
-}
-
 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
 {
     int ret;
@@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
 {
     pid_t pid;
+    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+    ssize_t ret = 0;
 
     pid = getpid();
+    aiocb->active = 1;
 
-    while (1) {
-        struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
-
-        mutex_lock(&lock);
-
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            ret = cond_timedwait(&cond, &lock, &ts);
-        }
-
-        if (QTAILQ_EMPTY(&request_list))
-            break;
-
-        aiocb = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->active = 1;
-        idle_threads--;
-        mutex_unlock(&lock);
-
-        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-        case QEMU_AIO_READ:
-        case QEMU_AIO_WRITE:
-            ret = handle_aiocb_rw(aiocb);
-            break;
-        case QEMU_AIO_FLUSH:
-            ret = handle_aiocb_flush(aiocb);
-            break;
-        case QEMU_AIO_IOCTL:
-            ret = handle_aiocb_ioctl(aiocb);
-            break;
-        default:
-            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
-            ret = -EINVAL;
-            break;
-        }
-
-        mutex_lock(&lock);
-        aiocb->ret = ret;
-        idle_threads++;
-        mutex_unlock(&lock);
-
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+    case QEMU_AIO_WRITE:
+        ret = handle_aiocb_rw(aiocb);
+        break;
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
+        ret = handle_aiocb_ioctl(aiocb);
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
+        break;
     }
 
-    idle_threads--;
-    cur_threads--;
-    mutex_unlock(&lock);
+    aiocb->ret = ret;
 
-    return NULL;
-}
-
-static void spawn_thread(void)
-{
-    sigset_t set, oldset;
-
-    cur_threads++;
-    idle_threads++;
-
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+    if (kill(pid, aiocb->ev_signo)) die("kill failed");
 }
 
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
     aiocb->ret = -EINPROGRESS;
     aiocb->active = 0;
-    mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
+
+    aiocb->work.func = aio_thread;
+    submit_threadletwork(&aiocb->work);
 }
 
 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 {
     ssize_t ret;
 
-    mutex_lock(&lock);
     ret = aiocb->ret;
-    mutex_unlock(&lock);
-
     return ret;
 }
 
@@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
     int active = 0;
 
-    mutex_lock(&lock);
     if (!acb->active) {
-        QTAILQ_REMOVE(&request_list, acb, node);
-        acb->ret = -ECANCELED;
+        if (!cancel_threadletwork(&acb->work))
+            acb->ret = -ECANCELED;
+        else
+            active = 1;
     } else if (acb->ret == -EINPROGRESS) {
         active = 1;
     }
-    mutex_unlock(&lock);
 
     if (active) {
         /* fail safe: if the aio could not be canceled, we wait for
@@ -618,7 +523,6 @@ int paio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    int ret;
 
     if (posix_aio_state)
         return 0;
@@ -645,16 +549,6 @@ int paio_init(void)
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
         posix_aio_process_queue, s);
 
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
-
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
-
-    QTAILQ_INIT(&request_list);
-
     posix_aio_state = s;
     return 0;
 }


* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
  2010-10-13 15:31 ` [Qemu-devel] [PATCH 1/3] Introduce threadlets Arun R Bharadwaj
@ 2010-10-14  9:02   ` Stefan Hajnoczi
  2010-10-14 21:17     ` Venkateswararao Jujjuri (JV)
  2010-10-15 14:42     ` [Qemu-devel] " Paolo Bonzini
  2010-10-14  9:15   ` [Qemu-devel] " Stefan Hajnoczi
  1 sibling, 2 replies; 29+ messages in thread
From: Stefan Hajnoczi @ 2010-10-14  9:02 UTC (permalink / raw)
  To: Arun R Bharadwaj; +Cc: qemu-devel

On Wed, Oct 13, 2010 at 4:31 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
>
> This patch creates a generic asynchronous-task-offloading infrastructure named
> threadlets. The core idea has been borrowed from the threading framework that
> is being used by paio.
>
> The reason for creating this generic infrastructure is so that other subsystems,
> such as virtio-9p could make use of it for offloading tasks that could block.
>
> The patch creates a global queue on-to which subsystems can queue their tasks to
> be executed asynchronously.
>
> The patch also provides API's that allow a subsystem to create a private queue
> with an associated pool of threads.
>
> [ego@in.ibm.com: Facelift of the code, Documentation, cancel_threadlet
> and other helpers]
>
> Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
> ---
>  Makefile.objs          |    3 +
>  docs/async-support.txt |  141 ++++++++++++++++++++++++++++++++++++++++
>  qemu-threadlets.c      |  169 ++++++++++++++++++++++++++++++++++++++++++++++++
>  qemu-threadlets.h      |   48 ++++++++++++++
>  4 files changed, 360 insertions(+), 1 deletions(-)
>  create mode 100644 docs/async-support.txt
>  create mode 100644 qemu-threadlets.c
>  create mode 100644 qemu-threadlets.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index cd5a24b..2cf8aba 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -9,6 +9,8 @@ qobject-obj-y += qerror.o
>
>  block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
>  block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
> +block-obj-$(CONFIG_POSIX) += qemu-thread.o
> +block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
>  block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>
> @@ -124,7 +126,6 @@ endif
>  common-obj-y += $(addprefix ui/, $(ui-obj-y))
>
>  common-obj-y += iov.o acl.o
> -common-obj-$(CONFIG_THREAD) += qemu-thread.o
>  common-obj-y += notify.o event_notifier.o
>  common-obj-y += qemu-timer.o
>
> diff --git a/docs/async-support.txt b/docs/async-support.txt
> new file mode 100644
> index 0000000..2e8adc9
> --- /dev/null
> +++ b/docs/async-support.txt

Why not call it docs/threadlets.txt?

> @@ -0,0 +1,141 @@
> +== How to use the various asynchronous models supported in Qemu ==

This only describes threadlets.

> +
> +== Threadlets ==
> +
> +Q.1: What are threadlets ?
> +A.1: Threadlets is an infrastructure within QEMU that allows other subsystems
> +     to offload possibly blocking work to a queue to be processed by a pool
> +     of threads asynchrnonously.

asynchronously typo

> +
> +Q.2: When would one want to use threadlets ?
> +A.2: Threadlets are useful when there are operations that can be performed
> +     outside the context of the VCPU/IO threads inorder to free these latter
> +     to service any other guest requests.
> +
> +Q.3: I have some work that can be executed in an asynchronous context. How
> +     should I go about it ?
> +A.3: One could follow the steps listed below:
> +
> +     - Define a function which would do the asynchrnous work.

asynchronous typo

> +       void my_threadlet_func(ThreadletWork *work)

Usually these functions will be static.

> +       {
> +       }
> +
> +     - Declare an object of type ThreadletWork;
> +       ThreadletWork work;
> +
> +
> +     - Assign a value to the "func" member of ThreadletWork object.
> +       work.func = my_threadlet_func;
> +
> +     - Submit the threadlet to the global queue.
> +       submit_threadletwork(&work);
> +
> +     - Continue servicing some other guest operations.
> +
> +Q.4: I want to my_threadlet_func to access some non-global data. How do I do
> +     that ?
> +A.4: Suppose you want my_threadlet_func to access some non-global data-object
> +     of type myPrivateData. In that case one could follow the following steps.
> +
> +     - Define a member of the type ThreadletWork within myPrivateData.
> +       typdef myPrivateData {

typedef typo, struct missing

Also, the QEMU coding style usually requires CapsNames for struct types...

> +               ...;
> +               ...;
> +               ...;
> +               ThreadletWork work;
> +       } myPrivateData;
> +
> +       myPrivateData myData;

...and myData would be my_data.
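
For reference, a minimal sketch of how the corrected example might read
once the typedef typo and the naming comments are addressed. The
ThreadletWork definition and the container_of macro below are local
stand-ins so the snippet compiles on its own (QEMU has its own versions),
and MyPrivateData, request_id and result are hypothetical names chosen
only for illustration:

```c
#include <stddef.h>

/* Stand-in for QEMU's ThreadletWork; only the field used here. */
typedef struct ThreadletWork ThreadletWork;
struct ThreadletWork {
    void (*func)(ThreadletWork *work);
};

/* Local copy of the usual container_of idiom (assumption: QEMU's own
 * macro behaves the same way). */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Hypothetical private data, using a CapsNames struct type. */
typedef struct MyPrivateData {
    int request_id;
    int result;
    ThreadletWork work;   /* embedded so the callback can find its owner */
} MyPrivateData;

/* Work function: recover the enclosing object from the embedded member. */
static void my_threadlet_func(ThreadletWork *work)
{
    MyPrivateData *my_data = container_of(work, MyPrivateData, work);

    /* Placeholder for the real (possibly blocking) operation. */
    my_data->result = my_data->request_id * 2;
}
```

In the real code the function pointer would be handed to
submit_threadletwork() rather than called directly.
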

> +
> +     - Initialize myData.work as described in A.3
> +       myData.work.func = my_threadlet_func;
> +       submit_threadletwork(&myData.work);
> +
> +     - Access the myData object inside my_threadlet_func() using container_of
> +       primitive
> +       void my_threadlet_func(ThreadletWork *work)
> +       {
> +               myPrivateData *mydata_ptr;
> +               mydata_ptr = container_of(work, myPrivateData, work);
> +
> +               /* mydata_ptr now points to myData object */
> +       }
> +
> +Q.5: Are there any precautions one must take while sharing data with the
> +     Asynchrnous thread-pool ?

asynchronous typo

> +A.5: Yes, make sure that the helper function of the type my_threadlet_func()
> +     does not access/modify data when it can be accessed or modified in the
> +     context of VCPU thread or IO thread. This is because the asynchronous
> +     threads in the pool can run in parallel with the VCPU/IOThreads as shown
> +     in the figure.
> +
> +     A typical workflow is as follows:
> +
> +              VCPU/IOThread
> +                   |
> +                   | (1)
> +                   |
> +                   V
> +                Offload work              (2)
> +      |-------> to threadlets -----------------------------> Helper thread
> +      |            |                                               |
> +      |            |                                               |
> +      |            | (3)                                           | (4)
> +      |            |                                               |
> +      |         Handle other Guest requests                        |
> +      |            |                                               |
> +      |            |                                               V
> +      |            | (3)                                  Signal the I/O Thread
> +      |(6)         |                                               |
> +      |            |                                              /
> +      |            |                                             /
> +      |            V                                            /
> +      |          Do the post <---------------------------------/
> +      |          processing               (5)
> +      |            |
> +      |            | (6)
> +      |            V
> +      |-Yes------ More async work?
> +                   |
> +                   | (7)
> +                  No
> +                   |
> +                   |
> +                   .
> +                   .
> +
> +    Hence one needs to make sure that in the steps (3) and (4) which run in
> +    parallel, any global data is accessed within only one context.
> +
> +Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
> +A.6: Threadlets framework provides the API cancel_threadlet:
> +       - int cancel_threadletwork(ThreadletWork *work)
> +
> +     The API scans the ThreadletQueue to see if (work) is present. If it finds
> +     work, it'll dequeue work and return 0.
> +
> +     On the other hand, if it does not find the (work) in the ThreadletQueue,
> +     then it'll return 1. This can imply two things. Either the work is being
> +     processed by one of the helper threads or it has been processed. The
> +     threadlet infrastructure currently _does_not_ distinguish between these
> +     two and the onus is on the caller to do that.
> +
> +Q.7: Apart from the global pool of threads, can I have my own private Queue ?
> +A.7: Yes, the threadlets framework allows subsystems to create their own private
> +     queues with associated pools of threads.
> +
> +     - Define a PrivateQueue
> +       ThreadletQueue myQueue;
> +
> +     - Initialize it:
> +       threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
> +       where my_max_threads is the maximum number of threads that can be in the
> +       thread pool and my_min_threads is the minimum number of active threads
> +       that can be in the thread-pool.
> +
> +     - Submit work:
> +       submit_threadletwork_to_queue(&myQueue, &my_work);
> +
> +     - Cancel work:
> +       cancel_threadletwork_on_queue(&myQueue, &my_work);
> diff --git a/qemu-threadlets.c b/qemu-threadlets.c
> new file mode 100644
> index 0000000..1442122
> --- /dev/null
> +++ b/qemu-threadlets.c
> @@ -0,0 +1,169 @@
> +/*
> + * Threadlet support for offloading tasks to be executed asynchronously
> + *
> + * Copyright IBM, Corp. 2008
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Anthony Liguori     <aliguori@us.ibm.com>
> + *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
> + *  Gautham R Shenoy    <ego@in.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + */
> +
> +#include "qemu-threadlets.h"
> +#include "osdep.h"
> +
> +#define MAX_GLOBAL_THREADS  64
> +#define MIN_GLOBAL_THREADS  64
> +ThreadletQueue globalqueue;

This should be static.

> +static int globalqueue_init;
> +
> +static void *threadlet_worker(void *data)
> +{
> +    ThreadletQueue *queue = data;
> +
> +    qemu_mutex_lock(&(queue->lock));
> +    while (1) {
> +        ThreadletWork *work;
> +        int ret = 0;
> +
> +        while (QTAILQ_EMPTY(&(queue->request_list)) &&
> +               (ret != ETIMEDOUT)) {
> +            ret = qemu_cond_timedwait(&(queue->cond),
> +                                        &(queue->lock), 10*100000);
> +        }
> +
> +        assert(queue->idle_threads != 0);
> +        if (QTAILQ_EMPTY(&(queue->request_list))) {
> +            if (queue->cur_threads > queue->min_threads) {
> +                /* We retain the minimum number of threads */
> +                break;
> +            }
> +        } else {
> +            work = QTAILQ_FIRST(&(queue->request_list));
> +            QTAILQ_REMOVE(&(queue->request_list), work, node);
> +
> +            queue->idle_threads--;
> +            qemu_mutex_unlock(&(queue->lock));
> +
> +            /* execute the work function */
> +            work->func(work);
> +
> +            qemu_mutex_lock(&(queue->lock));
> +            queue->idle_threads++;
> +        }
> +    }
> +
> +    queue->idle_threads--;
> +    queue->cur_threads--;
> +    qemu_mutex_unlock(&(queue->lock));
> +
> +    return NULL;
> +}
> +
> +static void spawn_threadlet(ThreadletQueue *queue)
> +{
> +    QemuThread thread;
> +
> +    queue->cur_threads++;
> +    queue->idle_threads++;
> +
> +    qemu_thread_create(&thread, threadlet_worker, queue);
> +}
> +
> +/**
> + * submit_threadletwork_to_queue: Submit a new task to a private queue to be
> + *                            executed asynchronously.
> + * @queue: Per-subsystem private queue to which the new task needs
> + *         to be submitted.
> + * @work: Contains information about the task that needs to be submitted.
> + */
> +void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> +{
> +    qemu_mutex_lock(&(queue->lock));
> +    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
> +        spawn_threadlet(queue);
> +    }
> +    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
> +    qemu_mutex_unlock(&(queue->lock));
> +    qemu_cond_signal(&(queue->cond));

Worker thread signalling and spawning have race conditions.  See my
previous email:

"There are race conditions here:

1. When a new threadlet is started because there are no idle threads,
qemu_cond_signal() may fire a blank because the threadlet isn't inside
qemu_cond_timedwait() yet.  As a result, the work item is stalled
until another thread grabs more work off the queue.  If I'm reading
the code correctly this bug is currently present!
2. Moving qemu_cond_signal() outside queue->lock is dangerous for the
same reason: you need to be careful not to qemu_cond_signal() when the
thread isn't inside qemu_cond_timedwait()."
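
For comparison, the conventional POSIX pattern that makes a signal sent
"too early" harmless is sketched below: the producer publishes the item
and signals while holding the lock, and the consumer re-checks the
predicate under the same lock before every wait. This is a standalone
sketch using plain pthreads, not the patch's code; WorkQueue, WorkItem and
the function names are made up for the example.

```c
#include <pthread.h>
#include <stddef.h>

typedef struct WorkItem {
    struct WorkItem *next;
    int value;
} WorkItem;

typedef struct WorkQueue {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    WorkItem *head;
    int processed_sum;   /* stands in for "work was executed" */
} WorkQueue;

static void work_queue_init(WorkQueue *q)
{
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->cond, NULL);
    q->head = NULL;
    q->processed_sum = 0;
}

/* Producer: queue the item and signal while still holding the lock. */
static void work_queue_submit(WorkQueue *q, WorkItem *item)
{
    pthread_mutex_lock(&q->lock);
    item->next = q->head;
    q->head = item;
    pthread_cond_signal(&q->cond);
    pthread_mutex_unlock(&q->lock);
}

/* Consumer: processes exactly one item, then exits. */
static void *work_queue_worker(void *opaque)
{
    WorkQueue *q = opaque;
    WorkItem *item;

    pthread_mutex_lock(&q->lock);
    /* The predicate is checked under the lock, so an item queued before
     * this thread started is seen immediately and no wait happens. The
     * loop also absorbs spurious wakeups. */
    while (q->head == NULL) {
        pthread_cond_wait(&q->cond, &q->lock);
    }
    item = q->head;
    q->head = item->next;
    q->processed_sum += item->value;
    pthread_mutex_unlock(&q->lock);

    return NULL;
}
```

With this shape, submitting before the worker thread even exists still
gets the item processed, because the worker never sleeps while the queue
is non-empty.
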

> +}
> +
> +/**
> + * submit_threadletwork: Submit to the global queue a new task to be executed
> + *                   asynchronously.
> + * @work: Contains information about the task that needs to be submitted.
> + */
> +void submit_threadletwork(ThreadletWork *work)
> +{
> +    if (unlikely(!globalqueue_init)) {
> +        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
> +                                MIN_GLOBAL_THREADS);
> +        globalqueue_init = 1;
> +    }
> +
> +    submit_threadletwork_to_queue(&globalqueue, work);
> +}
> +
> +/**
> + * cancel_threadletwork_on_queue: Cancel a task queued on a Queue.
> + * @queue: The queue containing the task to be cancelled.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 0 if the task is successfully cancelled.
> + *          1 otherwise.
> + */
> +int cancel_threadletwork_on_queue(ThreadletQueue *queue, ThreadletWork *work)
> +{
> +    ThreadletWork *ret_work;
> +    int found = 0;
> +
> +    qemu_mutex_lock(&(queue->lock));
> +    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
> +        if (ret_work == work) {
> +            QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
> +            found = 1;
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&(queue->lock));
> +
> +    if (found) {
> +        return 0;
> +    }
> +
> +    return 1;

Perhaps invert the logic of "found" and just call it "ret", then you
don't need this if statement that flips its value.
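
Something along these lines, as a rough standalone sketch. Work, Queue and
cancel_work_on_queue are simplified stand-ins invented for the example (a
singly linked list instead of QTAILQ), and the locking is left out to keep
the return-value logic visible:

```c
#include <stddef.h>

typedef struct Work {
    struct Work *next;
} Work;

typedef struct Queue {
    Work *head;
} Queue;

/* Returns 0 if the work was found and dequeued, 1 otherwise.
 * Note: the real function would hold queue->lock around the scan. */
static int cancel_work_on_queue(Queue *queue, Work *work)
{
    Work **link;
    int ret = 1;   /* assume "not found" until proven otherwise */

    for (link = &queue->head; *link != NULL; link = &(*link)->next) {
        if (*link == work) {
            *link = work->next;   /* unlink the matching element */
            work->next = NULL;
            ret = 0;
            break;
        }
    }

    return ret;
}
```
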

> +}
> +
> +/**
> + * cancel_threadletwork: Cancel a task queued on the global queue.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 0 if the task is successfully cancelled.
> + *          1 otherwise.
> + */
> +int cancel_threadletwork(ThreadletWork *work)
> +{
> +    return cancel_threadletwork_on_queue(&globalqueue, work);
> +}
> +
> +/**
> + * threadlet_queue_init: Initialize a threadlet queue.
> + * @queue: The threadlet queue to be initialized.
> + * @max_threads: Maximum number of threads processing the queue.
> + * @min_threads: Minimum number of threads processing the queue.
> + */
> +void threadlet_queue_init(ThreadletQueue *queue,
> +                                   int max_threads, int min_threads)
> +{
> +    queue->cur_threads  = 0;
> +    queue->idle_threads = 0;
> +    queue->max_threads  = max_threads;
> +    queue->min_threads  = min_threads;
> +    QTAILQ_INIT(&(queue->request_list));
> +    qemu_mutex_init(&(queue->lock));
> +    qemu_cond_init(&(queue->cond));
> +}
> diff --git a/qemu-threadlets.h b/qemu-threadlets.h
> new file mode 100644
> index 0000000..3df9b10
> --- /dev/null
> +++ b/qemu-threadlets.h
> @@ -0,0 +1,48 @@
> +/*
> + * Threadlet support for offloading tasks to be executed asynchronously
> + *
> + * Copyright IBM, Corp. 2008
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Anthony Liguori     <aliguori@us.ibm.com>
> + *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
> + *  Gautham R Shenoy    <ego@in.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + */
> +
> +#ifndef QEMU_ASYNC_WORK_H
> +#define QEMU_ASYNC_WORK_H

QEMU_THREADLETS_H?

> +
> +#include "qemu-queue.h"
> +#include "qemu-common.h"
> +#include "qemu-thread.h"
> +
> +typedef struct ThreadletQueue
> +{
> +    QemuMutex lock;
> +    QemuCond cond;
> +    int max_threads;
> +    int min_threads;
> +    int cur_threads;
> +    int idle_threads;
> +    QTAILQ_HEAD(, threadlet_work) request_list;
> +} ThreadletQueue;
> +
> +typedef struct threadlet_work

struct ThreadletWork follows coding style more closely.

Stefan


* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
  2010-10-13 15:31 ` [Qemu-devel] [PATCH 1/3] Introduce threadlets Arun R Bharadwaj
  2010-10-14  9:02   ` Stefan Hajnoczi
@ 2010-10-14  9:15   ` Stefan Hajnoczi
  2010-10-14  9:19     ` Gleb Natapov
  2010-10-14 16:16     ` Avi Kivity
  1 sibling, 2 replies; 29+ messages in thread
From: Stefan Hajnoczi @ 2010-10-14  9:15 UTC (permalink / raw)
  To: Arun R Bharadwaj; +Cc: qemu-devel

I forgot to add that the semantics of cancellation make it difficult
to write correct user code.  Every cancellation user needs to add
extra synchronization after the cancel call to handle the case where
the work is currently executing.

This seems tricky to me and I suspect code using this interface will
be buggy.  How about the following?
1. Add a return value indicating that the work is currently executing
(this still requires the caller to add extra synchronization but is at
least explicit) versus work is no longer on the list.
2. Add a flag to block until the work has been cancelled or completed.
 This is useful to callers who are allowed to block.
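
To make option 1 concrete, here is a minimal sketch of a cancel call with
a three-way result. It assumes each work item carries its own state, which
the current patch does not do; TrackedWork, WorkState, CancelResult and the
function names are all hypothetical:

```c
#include <pthread.h>

typedef enum WorkState {
    WORK_IDLE,      /* never submitted, or cancelled */
    WORK_QUEUED,    /* on the queue, not yet picked up */
    WORK_RUNNING,   /* a worker thread is executing it */
    WORK_DONE       /* finished executing */
} WorkState;

typedef enum CancelResult {
    CANCEL_OK,          /* removed before it started */
    CANCEL_RUNNING,     /* executing now; caller must synchronize */
    CANCEL_NOT_QUEUED   /* already completed or never queued */
} CancelResult;

typedef struct TrackedWork {
    pthread_mutex_t lock;
    WorkState state;
} TrackedWork;

static void tracked_work_init(TrackedWork *w)
{
    pthread_mutex_init(&w->lock, NULL);
    w->state = WORK_IDLE;
}

/* Non-blocking cancel that tells the caller which case it hit. */
static CancelResult tracked_work_cancel(TrackedWork *w)
{
    CancelResult res;

    pthread_mutex_lock(&w->lock);
    switch (w->state) {
    case WORK_QUEUED:
        w->state = WORK_IDLE;
        res = CANCEL_OK;
        break;
    case WORK_RUNNING:
        res = CANCEL_RUNNING;
        break;
    default:
        res = CANCEL_NOT_QUEUED;
        break;
    }
    pthread_mutex_unlock(&w->lock);

    return res;
}
```

The caller still has to wait for completion in the CANCEL_RUNNING case,
but at least the API says so explicitly.
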

Stefan


* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
  2010-10-14  9:15   ` [Qemu-devel] " Stefan Hajnoczi
@ 2010-10-14  9:19     ` Gleb Natapov
  2010-10-14 16:16     ` Avi Kivity
  1 sibling, 0 replies; 29+ messages in thread
From: Gleb Natapov @ 2010-10-14  9:19 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Arun R Bharadwaj, qemu-devel

On Thu, Oct 14, 2010 at 10:15:30AM +0100, Stefan Hajnoczi wrote:
> I forgot to add that the semantics of cancellation make it difficult
> to write correct user code.  Every cancellation user needs to add
> extra synchronization after the cancel call to handle the case where
> the work is currently executing.
> 
> This seems tricky to me and I suspect code using this interface will
> be buggy.  How about the following?
> 1. Add a return value indicating that the work is currently executing
> (this still requires the caller to add extra synchronization but is at
> least explicit) versus work is no longer on the list.
> 2. Add a flag to block until the work has been cancelled or completed.
>  This is useful to callers who are allowed to block.
> 
In the Linux kernel you usually have two functions, cancel() and
cancel_sync(). The second variant waits for the work to complete.
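
A rough userspace sketch of that split, assuming per-work state and a
condition variable that the worker signals on completion. None of this is
in the posted patch; SyncWork and the sync_work_* names are invented for
the example:

```c
#include <pthread.h>
#include <stdbool.h>

typedef struct SyncWork {
    pthread_mutex_t lock;
    pthread_cond_t done_cond;
    bool queued;    /* submitted but not yet picked up */
    bool running;   /* a worker is executing it */
    int runs;       /* how many times it actually executed */
} SyncWork;

static void sync_work_init(SyncWork *w)
{
    pthread_mutex_init(&w->lock, NULL);
    pthread_cond_init(&w->done_cond, NULL);
    w->queued = false;
    w->running = false;
    w->runs = 0;
}

/* Non-blocking: true only if the work was removed before it started. */
static bool sync_work_cancel(SyncWork *w)
{
    bool cancelled;

    pthread_mutex_lock(&w->lock);
    cancelled = w->queued;
    w->queued = false;
    pthread_mutex_unlock(&w->lock);

    return cancelled;
}

/* Blocking: same as above, but also waits out a running instance. */
static bool sync_work_cancel_sync(SyncWork *w)
{
    bool cancelled;

    pthread_mutex_lock(&w->lock);
    cancelled = w->queued;
    w->queued = false;
    while (w->running) {
        pthread_cond_wait(&w->done_cond, &w->lock);
    }
    pthread_mutex_unlock(&w->lock);

    return cancelled;
}

/* Worker side: claim the work if still queued, run it, report completion. */
static void *sync_work_worker(void *opaque)
{
    SyncWork *w = opaque;

    pthread_mutex_lock(&w->lock);
    if (w->queued) {
        w->queued = false;
        w->running = true;
        pthread_mutex_unlock(&w->lock);

        /* The actual work would run here, outside the lock. */

        pthread_mutex_lock(&w->lock);
        w->runs++;
        w->running = false;
        pthread_cond_broadcast(&w->done_cond);
    }
    pthread_mutex_unlock(&w->lock);

    return NULL;
}
```

After sync_work_cancel_sync() returns, the work is guaranteed to be
neither queued nor running, whichever way the race with the worker went.
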

--
			Gleb.


* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
  2010-10-14  9:15   ` [Qemu-devel] " Stefan Hajnoczi
  2010-10-14  9:19     ` Gleb Natapov
@ 2010-10-14 16:16     ` Avi Kivity
  2010-10-14 21:32       ` Venkateswararao Jujjuri (JV)
  2010-10-15  8:05       ` Stefan Hajnoczi
  1 sibling, 2 replies; 29+ messages in thread
From: Avi Kivity @ 2010-10-14 16:16 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Arun R Bharadwaj, qemu-devel

  On 10/14/2010 11:15 AM, Stefan Hajnoczi wrote:
> I forgot to add that the semantics of cancellation make it difficult
> to write correct user code.  Every cancellation user needs to add
> extra synchronization after the cancel call to handle the case where
> the work is currently executing.
>
> This seems tricky to me and I suspect code using this interface will
> be buggy.  How about the following?
> 1. Add a return value indicating that the work is currently executing
> (this still requires the caller to add extra synchronization but is at
> least explicit) versus work is no longer on the list.
> 2. Add a flag to block until the work has been cancelled or completed.
>   This is useful to callers who are allowed to block.

Blocking is somewhat against the spirit of the thing, no?  While I agree 
that the current cancel API is hard to use correctly, blocking defeats 
the purpose of the API.

-- 
error compiling committee.c: too many arguments to function


* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
  2010-10-14  9:02   ` Stefan Hajnoczi
@ 2010-10-14 21:17     ` Venkateswararao Jujjuri (JV)
  2010-10-15  9:52       ` Stefan Hajnoczi
  2010-10-15 14:42     ` [Qemu-devel] " Paolo Bonzini
  1 sibling, 1 reply; 29+ messages in thread
From: Venkateswararao Jujjuri (JV) @ 2010-10-14 21:17 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Arun R Bharadwaj, qemu-devel

On 10/14/2010 2:02 AM, Stefan Hajnoczi wrote:
> On Wed, Oct 13, 2010 at 4:31 PM, Arun R Bharadwaj
> <arun@linux.vnet.ibm.com> wrote:
>> From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
>>
>> This patch creates a generic asynchronous-task-offloading infrastructure named
>> threadlets. The core idea has been borrowed from the threading framework that
>> is being used by paio.
>>
>> The reason for creating this generic infrastructure is so that other subsystems,
>> such as virtio-9p could make use of it for offloading tasks that could block.
>>
>> The patch creates a global queue on-to which subsystems can queue their tasks to
>> be executed asynchronously.
>>
>> The patch also provides API's that allow a subsystem to create a private queue
>> with an associated pool of threads.
>>
>> [ego@in.ibm.com: Facelift of the code, Documentation, cancel_threadlet
>> and other helpers]
>>
>> Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
>> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
>> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
>> Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
>> ---
>>  Makefile.objs          |    3 +
>>  docs/async-support.txt |  141 ++++++++++++++++++++++++++++++++++++++++
>>  qemu-threadlets.c      |  169 ++++++++++++++++++++++++++++++++++++++++++++++++
>>  qemu-threadlets.h      |   48 ++++++++++++++
>>  4 files changed, 360 insertions(+), 1 deletions(-)
>>  create mode 100644 docs/async-support.txt
>>  create mode 100644 qemu-threadlets.c
>>  create mode 100644 qemu-threadlets.h
>>
>> diff --git a/Makefile.objs b/Makefile.objs
>> index cd5a24b..2cf8aba 100644
>> --- a/Makefile.objs
>> +++ b/Makefile.objs
>> @@ -9,6 +9,8 @@ qobject-obj-y += qerror.o
>>
>>  block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
>>  block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
>> +block-obj-$(CONFIG_POSIX) += qemu-thread.o
>> +block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
>>  block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>>  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>>
>> @@ -124,7 +126,6 @@ endif
>>  common-obj-y += $(addprefix ui/, $(ui-obj-y))
>>
>>  common-obj-y += iov.o acl.o
>> -common-obj-$(CONFIG_THREAD) += qemu-thread.o
>>  common-obj-y += notify.o event_notifier.o
>>  common-obj-y += qemu-timer.o
>>
>> diff --git a/docs/async-support.txt b/docs/async-support.txt
>> new file mode 100644
>> index 0000000..2e8adc9
>> --- /dev/null
>> +++ b/docs/async-support.txt
> 
> Why not call it docs/threadlets.txt?
> 
>> @@ -0,0 +1,141 @@
>> +== How to use the various asynchronous models supported in Qemu ==
> 
> This only describes threadlets.
> 
>> +
>> +== Threadlets ==
>> +
>> +Q.1: What are threadlets ?
>> +A.1: Threadlets is an infrastructure within QEMU that allows other subsystems
>> +     to offload possibly blocking work to a queue to be processed by a pool
>> +     of threads asynchrnonously.
> 
> asynchronously typo
> 
>> +
>> +Q.2: When would one want to use threadlets ?
>> +A.2: Threadlets are useful when there are operations that can be performed
>> +     outside the context of the VCPU/IO threads inorder to free these latter
>> +     to service any other guest requests.
>> +
>> +Q.3: I have some work that can be executed in an asynchronous context. How
>> +     should I go about it ?
>> +A.3: One could follow the steps listed below:
>> +
>> +     - Define a function which would do the asynchrnous work.
> 
> asynchronous typo
> 
>> +       void my_threadlet_func(ThreadletWork *work)
> 
> Usually these functions will be static.
> 
>> +       {
>> +       }
>> +
>> +     - Declare an object of type ThreadletWork;
>> +       ThreadletWork work;
>> +
>> +
>> +     - Assign a value to the "func" member of ThreadletWork object.
>> +       work.func = my_threadlet_func;
>> +
>> +     - Submit the threadlet to the global queue.
>> +       submit_threadletwork(&work);
>> +
>> +     - Continue servicing some other guest operations.
>> +
>> +Q.4: I want to my_threadlet_func to access some non-global data. How do I do
>> +     that ?
>> +A.4: Suppose you want my_threadlet_func to access some non-global data-object
>> +     of type myPrivateData. In that case one could follow the following steps.
>> +
>> +     - Define a member of the type ThreadletWork within myPrivateData.
>> +       typdef myPrivateData {
> 
> typedef typo, struct missing
> 
> Also, the QEMU coding style usually requires CapsNames for struct types...
> 
>> +               ...;
>> +               ...;
>> +               ...;
>> +               ThreadletWork work;
>> +       } myPrivateData;
>> +
>> +       myPrivateData myData;
> 
> ...and myData would be my_data.
> 
>> +
>> +     - Initialize myData.work as described in A.3
>> +       myData.work.func = my_threadlet_func;
>> +       submit_threadletwork(&myData.work);
>> +
>> +     - Access the myData object inside my_threadlet_func() using container_of
>> +       primitive
>> +       void my_threadlet_func(ThreadletWork *work)
>> +       {
>> +               myPrivateData *mydata_ptr;
>> +               mydata_ptr = container_of(work, myPrivateData, work);
>> +
>> +               /* mydata_ptr now points to myData object */
>> +       }
>> +
>> +Q.5: Are there any precautions one must take while sharing data with the
>> +     Asynchrnous thread-pool ?
> 
> asynchronous typo
> 
>> +A.5: Yes, make sure that the helper function of the type my_threadlet_func()
>> +     does not access/modify data when it can be accessed or modified in the
>> +     context of VCPU thread or IO thread. This is because the asynchronous
>> +     threads in the pool can run in parallel with the VCPU/IOThreads as shown
>> +     in the figure.
>> +
>> +     A typical workflow is as follows:
>> +
>> +              VCPU/IOThread
>> +                   |
>> +                   | (1)
>> +                   |
>> +                   V
>> +                Offload work              (2)
>> +      |-------> to threadlets -----------------------------> Helper thread
>> +      |            |                                               |
>> +      |            |                                               |
>> +      |            | (3)                                           | (4)
>> +      |            |                                               |
>> +      |         Handle other Guest requests                        |
>> +      |            |                                               |
>> +      |            |                                               V
>> +      |            | (3)                                  Signal the I/O Thread
>> +      |(6)         |                                               |
>> +      |            |                                              /
>> +      |            |                                             /
>> +      |            V                                            /
>> +      |          Do the post <---------------------------------/
>> +      |          processing               (5)
>> +      |            |
>> +      |            | (6)
>> +      |            V
>> +      |-Yes------ More async work?
>> +                   |
>> +                   | (7)
>> +                  No
>> +                   |
>> +                   |
>> +                   .
>> +                   .
>> +
>> +    Hence one needs to make sure that in the steps (3) and (4) which run in
>> +    parallel, any global data is accessed within only one context.
>> +
>> +Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
>> +A.6: Threadlets framework provides the API cancel_threadlet:
>> +       - int cancel_threadletwork(ThreadletWork *work)
>> +
>> +     The API scans the ThreadletQueue to see if (work) is present. If it finds
>> +     work, it'll dequeue work and return 0.
>> +
>> +     On the other hand, if it does not find the (work) in the ThreadletQueue,
>> +     then it'll return 1. This can imply two things. Either the work is being
>> +     processed by one of the helper threads or it has been processed. The
>> +     threadlet infrastructure currently _does_not_ distinguish between these
>> +     two and the onus is on the caller to do that.
>> +
>> +Q.7: Apart from the global pool of threads, can I have my own private Queue ?
>> +A.7: Yes, the threadlets framework allows subsystems to create their own private
>> +     queues with associated pools of threads.
>> +
>> +     - Define a PrivateQueue
>> +       ThreadletQueue myQueue;
>> +
>> +     - Initialize it:
>> +       threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
>> +       where my_max_threads is the maximum number of threads that can be in the
>> +       thread pool and my_min_threads is the minimum number of active threads
>> +       that can be in the thread-pool.
>> +
>> +     - Submit work:
>> +       submit_threadletwork_to_queue(&myQueue, &my_work);
>> +
>> +     - Cancel work:
>> +       cancel_threadletwork_on_queue(&myQueue, &my_work);
>> diff --git a/qemu-threadlets.c b/qemu-threadlets.c
>> new file mode 100644
>> index 0000000..1442122
>> --- /dev/null
>> +++ b/qemu-threadlets.c
>> @@ -0,0 +1,169 @@
>> +/*
>> + * Threadlet support for offloading tasks to be executed asynchronously
>> + *
>> + * Copyright IBM, Corp. 2008
>> + * Copyright IBM, Corp. 2010
>> + *
>> + * Authors:
>> + *  Anthony Liguori     <aliguori@us.ibm.com>
>> + *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
>> + *  Gautham R Shenoy    <ego@in.ibm.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2.  See
>> + * the COPYING file in the top-level directory.
>> + */
>> +
>> +#include "qemu-threadlets.h"
>> +#include "osdep.h"
>> +
>> +#define MAX_GLOBAL_THREADS  64
>> +#define MIN_GLOBAL_THREADS  64
>> +ThreadletQueue globalqueue;
> 
> This should be static.
> 
>> +static int globalqueue_init;
>> +
>> +static void *threadlet_worker(void *data)
>> +{
>> +    ThreadletQueue *queue = data;
>> +
>> +    qemu_mutex_lock(&(queue->lock));
>> +    while (1) {
>> +        ThreadletWork *work;
>> +        int ret = 0;
>> +
>> +        while (QTAILQ_EMPTY(&(queue->request_list)) &&
>> +               (ret != ETIMEDOUT)) {
>> +            ret = qemu_cond_timedwait(&(queue->cond),
>> +                                        &(queue->lock), 10*100000);
>> +        }
>> +
>> +        assert(queue->idle_threads != 0);
>> +        if (QTAILQ_EMPTY(&(queue->request_list))) {
>> +            if (queue->cur_threads > queue->min_threads) {
>> +                /* We retain the minimum number of threads */
>> +                break;
>> +            }
>> +        } else {
>> +            work = QTAILQ_FIRST(&(queue->request_list));
>> +            QTAILQ_REMOVE(&(queue->request_list), work, node);
>> +
>> +            queue->idle_threads--;
>> +            qemu_mutex_unlock(&(queue->lock));
>> +
>> +            /* execute the work function */
>> +            work->func(work);
>> +
>> +            qemu_mutex_lock(&(queue->lock));
>> +            queue->idle_threads++;
>> +        }
>> +    }
>> +
>> +    queue->idle_threads--;
>> +    queue->cur_threads--;
>> +    qemu_mutex_unlock(&(queue->lock));
>> +
>> +    return NULL;
>> +}
>> +
>> +static void spawn_threadlet(ThreadletQueue *queue)
>> +{
>> +    QemuThread thread;
>> +
>> +    queue->cur_threads++;
>> +    queue->idle_threads++;
>> +
>> +    qemu_thread_create(&thread, threadlet_worker, queue);
>> +}
>> +
>> +/**
>> + * submit_threadletwork_to_queue: Submit a new task to a private queue to be
>> + *                            executed asynchronously.
>> + * @queue: Per-subsystem private queue to which the new task needs
>> + *         to be submitted.
>> + * @work: Contains information about the task that needs to be submitted.
>> + */
>> +void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
>> +{
>> +    qemu_mutex_lock(&(queue->lock));
>> +    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
>> +        spawn_threadlet(queue);
>> +    }
>> +    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
>> +    qemu_mutex_unlock(&(queue->lock));
>> +    qemu_cond_signal(&(queue->cond));
> 
> Worker thread signalling and spawning has race conditions.  See my
> previous email:
> 
> "There are race conditions here:
> 
> 1. When a new threadlet is started because there are no idle threads,
> qemu_cond_signal() may fire a blank because the threadlet isn't inside
> qemu_cond_timedwait() yet.  The result, the work item is deadlocked
> until another thread grabs more work off the queue.  If I'm reading
> the code correctly this bug is currently present!

Moving QTAILQ_INSERT_TAIL() ahead of spawn_threadlet() should take care of this
issue, right?

If not, the new work would be blocked only until the new thread's wait times out
or another threadlet finishes its previous job, whichever occurs first, since
the worker thread rechecks the queue at the end of each timeout.


> 2. Moving qemu_cond_signal() outside queue->lock is dangerous for the
> same reason: you need to be careful not to qemu_cond_signal() when the
> thread isn't inside qemu_cond_timedwait()."

Again, the worst case here is a delay of one timedwait interval, and only when work
is not flowing continuously. If we have to employ another synchronization technique,
with another set of locks etc., we need to be sure that the additional cost is justified.

Thanks,
JV

> 
>> +}
>> +
>> +/**
>> + * submit_threadletwork: Submit to the global queue a new task to be executed
>> + *                   asynchronously.
>> + * @work: Contains information about the task that needs to be submitted.
>> + */
>> +void submit_threadletwork(ThreadletWork *work)
>> +{
>> +    if (unlikely(!globalqueue_init)) {
>> +        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
>> +                                MIN_GLOBAL_THREADS);
>> +        globalqueue_init = 1;
>> +    }
>> +
>> +    submit_threadletwork_to_queue(&globalqueue, work);
>> +}
>> +
>> +/**
>> + * cancel_threadletwork_on_queue: Cancel a task queued on a Queue.
>> + * @queue: The queue containing the task to be cancelled.
>> + * @work: Contains the information of the task that needs to be cancelled.
>> + *
>> + * Returns: 0 if the task is successfully cancelled.
>> + *          1 otherwise.
>> + */
>> +int cancel_threadletwork_on_queue(ThreadletQueue *queue, ThreadletWork *work)
>> +{
>> +    ThreadletWork *ret_work;
>> +    int found = 0;
>> +
>> +    qemu_mutex_lock(&(queue->lock));
>> +    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
>> +        if (ret_work == work) {
>> +            QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
>> +            found = 1;
>> +            break;
>> +        }
>> +    }
>> +    qemu_mutex_unlock(&(queue->lock));
>> +
>> +    if (found) {
>> +        return 0;
>> +    }
>> +
>> +    return 1;
> 
> Perhaps invert the logic of "found" and just call it "ret", then you
> don't need this if statement that flips its value.
> 
>> +}
>> +
>> +/**
>> + * cancel_threadletwork: Cancel a task queued on the global queue.
>> + * @work: Contains the information of the task that needs to be cancelled.
>> + *
>> + * Returns: 0 if the task is successfully cancelled.
>> + *          1 otherwise.
>> + */
>> +int cancel_threadletwork(ThreadletWork *work)
>> +{
>> +    return cancel_threadletwork_on_queue(&globalqueue, work);
>> +}
>> +
>> +/**
>> + * threadlet_queue_init: Initialize a threadlet queue.
>> + * @queue: The threadlet queue to be initialized.
>> + * @max_threads: Maximum number of threads processing the queue.
>> + * @min_threads: Minimum number of threads processing the queue.
>> + */
>> +void threadlet_queue_init(ThreadletQueue *queue,
>> +                                   int max_threads, int min_threads)
>> +{
>> +    queue->cur_threads  = 0;
>> +    queue->idle_threads = 0;
>> +    queue->max_threads  = max_threads;
>> +    queue->min_threads  = min_threads;
>> +    QTAILQ_INIT(&(queue->request_list));
>> +    qemu_mutex_init(&(queue->lock));
>> +    qemu_cond_init(&(queue->cond));
>> +}
>> diff --git a/qemu-threadlets.h b/qemu-threadlets.h
>> new file mode 100644
>> index 0000000..3df9b10
>> --- /dev/null
>> +++ b/qemu-threadlets.h
>> @@ -0,0 +1,48 @@
>> +/*
>> + * Threadlet support for offloading tasks to be executed asynchronously
>> + *
>> + * Copyright IBM, Corp. 2008
>> + * Copyright IBM, Corp. 2010
>> + *
>> + * Authors:
>> + *  Anthony Liguori     <aliguori@us.ibm.com>
>> + *  Aneesh Kumar K.V    <aneesh.kumar@linux.vnet.ibm.com>
>> + *  Gautham R Shenoy    <ego@in.ibm.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2.  See
>> + * the COPYING file in the top-level directory.
>> + */
>> +
>> +#ifndef QEMU_ASYNC_WORK_H
>> +#define QEMU_ASYNC_WORK_H
> 
> QEMU_THREADLETS_H?
> 
>> +
>> +#include "qemu-queue.h"
>> +#include "qemu-common.h"
>> +#include "qemu-thread.h"
>> +
>> +typedef struct ThreadletQueue
>> +{
>> +    QemuMutex lock;
>> +    QemuCond cond;
>> +    int max_threads;
>> +    int min_threads;
>> +    int cur_threads;
>> +    int idle_threads;
>> +    QTAILQ_HEAD(, threadlet_work) request_list;
>> +} ThreadletQueue;
>> +
>> +typedef struct threadlet_work
> 
> struct ThreadletWork follows coding style more closely.
> 
> Stefan
> 


* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
  2010-10-14 16:16     ` Avi Kivity
@ 2010-10-14 21:32       ` Venkateswararao Jujjuri (JV)
  2010-10-17  8:57         ` Avi Kivity
  2010-10-15  8:05       ` Stefan Hajnoczi
  1 sibling, 1 reply; 29+ messages in thread
From: Venkateswararao Jujjuri (JV) @ 2010-10-14 21:32 UTC (permalink / raw)
  To: Avi Kivity; +Cc: Stefan Hajnoczi, qemu-devel, Arun R Bharadwaj

On 10/14/2010 9:16 AM, Avi Kivity wrote:
>  On 10/14/2010 11:15 AM, Stefan Hajnoczi wrote:
>> I forgot to add that the semantics of cancellation make it difficult
>> to write correct user code.  Every cancellation user needs to add
>> extra synchronization after the cancel call to handle the case where
>> the work is currently executing.
>>
>> This seems tricky to me and I suspect code using this interface will
>> be buggy.  How about the following?
>> 1. Add a return value indicating that the work is currently executing
>> (this still requires the caller to add extra synchronization but is at
>> least explicit) versus work is no longer on the list.

The current semantics are: if the work is not on the list, it is either currently
executing or we are done with it.
How do we differentiate between those two? Do we need any other state?
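
One way to tell the two cases apart is to keep a small state field in each
work item and update it under the queue lock. The following is only a minimal
sketch using plain pthreads rather than the QEMU wrappers; WorkState,
CancelResult, WorkQueue and the work_*() helpers are hypothetical names and
are not part of the posted patch.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical per-item state; the posted ThreadletWork has no such field. */
typedef enum { WORK_IDLE, WORK_QUEUED, WORK_RUNNING } WorkState;

/* Hypothetical three-way result replacing the 0/1 return value. */
typedef enum { CANCEL_DEQUEUED, CANCEL_RUNNING, CANCEL_NOT_QUEUED } CancelResult;

typedef struct Work {
    void (*func)(struct Work *work);
    WorkState state;
    struct Work *next;
} Work;

typedef struct {
    pthread_mutex_t lock;
    Work *head;
} WorkQueue;

static void work_queue_init(WorkQueue *q)
{
    pthread_mutex_init(&q->lock, NULL);
    q->head = NULL;
}

static void work_submit(WorkQueue *q, Work *w)
{
    Work **pp;

    pthread_mutex_lock(&q->lock);
    w->state = WORK_QUEUED;
    w->next = NULL;
    for (pp = &q->head; *pp; pp = &(*pp)->next) {
        /* walk to the tail */
    }
    *pp = w;
    pthread_mutex_unlock(&q->lock);
}

/* Worker side: dequeue the head and mark it running under the lock. */
static Work *work_take(WorkQueue *q)
{
    Work *w;

    pthread_mutex_lock(&q->lock);
    w = q->head;
    if (w) {
        q->head = w->next;
        w->state = WORK_RUNNING;
    }
    pthread_mutex_unlock(&q->lock);
    return w;
}

/* Worker side: called after w->func(w) has returned. */
static void work_finish(WorkQueue *q, Work *w)
{
    pthread_mutex_lock(&q->lock);
    w->state = WORK_IDLE;
    pthread_mutex_unlock(&q->lock);
}

/* Reports whether w was dequeued, is executing, or is no longer queued. */
static CancelResult work_cancel(WorkQueue *q, Work *w)
{
    CancelResult res = CANCEL_NOT_QUEUED;
    Work **pp;

    pthread_mutex_lock(&q->lock);
    if (w->state == WORK_QUEUED) {
        for (pp = &q->head; *pp && *pp != w; pp = &(*pp)->next) {
            /* find the link that points at w */
        }
        if (*pp) {
            *pp = w->next;
            w->state = WORK_IDLE;
            res = CANCEL_DEQUEUED;
        }
    } else if (w->state == WORK_RUNNING) {
        res = CANCEL_RUNNING;
    }
    pthread_mutex_unlock(&q->lock);
    return res;
}
```

In this sketch the worker touches the item again in work_finish(), so the
work function must not free its own Work object.
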
>> 2. Add a flag to block until the work has been cancelled or completed.
>>   This is useful to callers who are allowed to block.
> 
> Blocking is somewhat against the spirit of the thing, no?  While I agree that
> the current cancel API is hard to use correctly, blocking defeats the purpose of
> the API.
> 
Are you proposing to add additional state to the return value
(canceled/running/not-canceled)
and leave the synchronization part to the user,
i.e. not to provide any additional interface for the user to wait
for the scheduled work to finish? Just trying to understand.

Thanks,
JV


* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
  2010-10-14 16:16     ` Avi Kivity
  2010-10-14 21:32       ` Venkateswararao Jujjuri (JV)
@ 2010-10-15  8:05       ` Stefan Hajnoczi
  1 sibling, 0 replies; 29+ messages in thread
From: Stefan Hajnoczi @ 2010-10-15  8:05 UTC (permalink / raw)
  To: Avi Kivity; +Cc: Arun R Bharadwaj, qemu-devel

On Thu, Oct 14, 2010 at 5:16 PM, Avi Kivity <avi@redhat.com> wrote:
>  On 10/14/2010 11:15 AM, Stefan Hajnoczi wrote:
>>
>> I forgot to add that the semantics of cancellation make it difficult
>> to write correct user code.  Every cancellation user needs to add
>> extra synchronization after the cancel call to handle the case where
>> the work is currently executing.
>>
>> This seems tricky to me and I suspect code using this interface will
>> be buggy.  How about the following?
>> 1. Add a return value indicating that the work is currently executing
>> (this still requires the caller to add extra synchronization but is at
>> least explicit) versus work is no longer on the list.
>> 2. Add a flag to block until the work has been cancelled or completed.
>>  This is useful to callers who are allowed to block.
>
> Blocking is somewhat against the spirit of the thing, no?  While I agree
> that the current cancel API is hard to use correctly, blocking defeats the
> purpose of the API.

Yes, that's why it would have to be an optional flag or a separate
function.  The idea is that callers who are able to block on
cancellation can use common code instead of implementing it from
scratch each time.
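
A separate blocking function of that kind could look roughly like the sketch
below. It is an illustration only, written against plain pthreads instead of
the QEMU wrappers; WorkQueue, work_run_one() and work_cancel_sync() are
hypothetical names, and the state field is an assumption that the posted
ThreadletWork does not have.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum { WORK_IDLE, WORK_QUEUED, WORK_RUNNING } WorkState;

typedef struct Work {
    void (*func)(struct Work *work);
    WorkState state;
    struct Work *next;
} Work;

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t done;    /* broadcast each time an item finishes */
    Work *head;
} WorkQueue;

static void work_queue_init(WorkQueue *q)
{
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->done, NULL);
    q->head = NULL;
}

static void work_submit(WorkQueue *q, Work *w)
{
    Work **pp;

    pthread_mutex_lock(&q->lock);
    w->state = WORK_QUEUED;
    w->next = NULL;
    for (pp = &q->head; *pp; pp = &(*pp)->next) {
        /* walk to the tail */
    }
    *pp = w;
    pthread_mutex_unlock(&q->lock);
}

/* One iteration of a worker thread; returns false if the queue was empty. */
static bool work_run_one(WorkQueue *q)
{
    Work *w;

    pthread_mutex_lock(&q->lock);
    w = q->head;
    if (!w) {
        pthread_mutex_unlock(&q->lock);
        return false;
    }
    q->head = w->next;
    w->state = WORK_RUNNING;
    pthread_mutex_unlock(&q->lock);

    w->func(w);     /* runs without the queue lock held */

    pthread_mutex_lock(&q->lock);
    w->state = WORK_IDLE;
    pthread_cond_broadcast(&q->done);
    pthread_mutex_unlock(&q->lock);
    return true;
}

/*
 * Blocking cancel: on return, w is neither queued nor running.
 * Returns true if w was removed before it ran.
 */
static bool work_cancel_sync(WorkQueue *q, Work *w)
{
    bool dequeued = false;
    Work **pp;

    pthread_mutex_lock(&q->lock);
    if (w->state == WORK_QUEUED) {
        for (pp = &q->head; *pp && *pp != w; pp = &(*pp)->next) {
            /* find the link that points at w */
        }
        if (*pp) {
            *pp = w->next;
            w->state = WORK_IDLE;
            dequeued = true;
        }
    }
    while (w->state == WORK_RUNNING) {
        pthread_cond_wait(&q->done, &q->lock);
    }
    pthread_mutex_unlock(&q->lock);
    return dequeued;
}

/* Example work function used to exercise the sketch. */
static int run_count;

static void count_run(Work *work)
{
    (void)work;
    run_count++;
}
```

Such a function must not be called from the work function itself, nor while
holding a lock that the work function needs, otherwise it would wait forever.
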

Stefan


* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
  2010-10-14 21:17     ` Venkateswararao Jujjuri (JV)
@ 2010-10-15  9:52       ` Stefan Hajnoczi
  2010-10-15 14:56         ` Venkateswararao Jujjuri (JV)
  0 siblings, 1 reply; 29+ messages in thread
From: Stefan Hajnoczi @ 2010-10-15  9:52 UTC (permalink / raw)
  To: Venkateswararao Jujjuri (JV); +Cc: Arun R Bharadwaj, qemu-devel

On Thu, Oct 14, 2010 at 10:17 PM, Venkateswararao Jujjuri (JV)
<jvrao@linux.vnet.ibm.com> wrote:
> On 10/14/2010 2:02 AM, Stefan Hajnoczi wrote:
>> On Wed, Oct 13, 2010 at 4:31 PM, Arun R Bharadwaj
>>> +static void *threadlet_worker(void *data)
>>> +{
>>> +    ThreadletQueue *queue = data;
>>> +
>>> +    qemu_mutex_lock(&(queue->lock));
>>> +    while (1) {
>>> +        ThreadletWork *work;
>>> +        int ret = 0;
>>> +
>>> +        while (QTAILQ_EMPTY(&(queue->request_list)) &&
>>> +               (ret != ETIMEDOUT)) {
>>> +            ret = qemu_cond_timedwait(&(queue->cond),
>>> +                                        &(queue->lock), 10*100000);
>>> +        }
>>> +
>>> +        assert(queue->idle_threads != 0);
>>> +        if (QTAILQ_EMPTY(&(queue->request_list))) {
>>> +            if (queue->cur_threads > queue->min_threads) {
>>> +                /* We retain the minimum number of threads */
>>> +                break;
>>> +            }
>>> +        } else {
>>> +            work = QTAILQ_FIRST(&(queue->request_list));
>>> +            QTAILQ_REMOVE(&(queue->request_list), work, node);
>>> +
>>> +            queue->idle_threads--;
>>> +            qemu_mutex_unlock(&(queue->lock));
>>> +
>>> +            /* execute the work function */
>>> +            work->func(work);
>>> +
>>> +            qemu_mutex_lock(&(queue->lock));
>>> +            queue->idle_threads++;
>>> +        }
>>> +    }
>>> +
>>> +    queue->idle_threads--;
>>> +    queue->cur_threads--;
>>> +    qemu_mutex_unlock(&(queue->lock));
>>> +
>>> +    return NULL;
>>> +}
>>> +
>>> +static void spawn_threadlet(ThreadletQueue *queue)
>>> +{
>>> +    QemuThread thread;
>>> +
>>> +    queue->cur_threads++;
>>> +    queue->idle_threads++;
>>> +
>>> +    qemu_thread_create(&thread, threadlet_worker, queue);
>>> +}
>>> +
>>> +/**
>>> + * submit_threadletwork_to_queue: Submit a new task to a private queue to be
>>> + *                            executed asynchronously.
>>> + * @queue: Per-subsystem private queue to which the new task needs
>>> + *         to be submitted.
>>> + * @work: Contains information about the task that needs to be submitted.
>>> + */
>>> +void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
>>> +{
>>> +    qemu_mutex_lock(&(queue->lock));
>>> +    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
>>> +        spawn_threadlet(queue);
>>> +    }
>>> +    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
>>> +    qemu_mutex_unlock(&(queue->lock));
>>> +    qemu_cond_signal(&(queue->cond));
>>
>> Worker thread signalling and spawning has race conditions.  See my
>> previous email:
>>
>> "There are race conditions here:
>>
>> 1. When a new threadlet is started because there are no idle threads,
>> qemu_cond_signal() may fire a blank because the threadlet isn't inside
>> qemu_cond_timedwait() yet.  The result, the work item is deadlocked
>> until another thread grabs more work off the queue.  If I'm reading
>> the code correctly this bug is currently present!
>
> Moving QTAILQ_INSERT_TAIL() ahead of spawn_threadlet() should take care of this
> issue
> right?

I didn't read the code correctly.  queue->lock is already held by
submit_threadletwork_to_queue() until after QTAILQ_INSERT_TAIL().
threadlet_worker() will only enter its main loop once it acquires
queue->lock.  Therefore the queue definitely has the work before the
spawned thread begins processing.

The work is on the queue when threadlet_worker() enters its main loop,
so it will not need to wait on queue->cond but can process work
immediately.  There is no spawn race condition.
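
The ordering argument can be reproduced in a small standalone program. This is
a sketch with plain pthreads, not the QEMU code; DemoQueue, demo_worker() and
demo_submit() are hypothetical names, and a counter stands in for the request
list. The waits field records whether the spawned thread ever had to sleep on
the condition variable.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int pending;        /* stand-in for the length of the request list */
    int waits;          /* how often the worker had to sleep on cond */
    int processed;
} DemoQueue;

static void demo_init(DemoQueue *q)
{
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->cond, NULL);
    q->pending = 0;
    q->waits = 0;
    q->processed = 0;
}

static void *demo_worker(void *opaque)
{
    DemoQueue *q = opaque;

    /* Blocks until the submitter drops the lock, i.e. after the insert. */
    pthread_mutex_lock(&q->lock);
    while (q->pending == 0) {
        q->waits++;
        pthread_cond_wait(&q->cond, &q->lock);
    }
    q->pending--;
    q->processed++;
    pthread_mutex_unlock(&q->lock);
    return NULL;
}

/* Same order as the patch: spawn first, insert second, both under the lock. */
static pthread_t demo_submit(DemoQueue *q)
{
    pthread_t tid;
    int rc;

    pthread_mutex_lock(&q->lock);
    rc = pthread_create(&tid, NULL, demo_worker, q);
    assert(rc == 0);
    (void)rc;
    q->pending++;
    pthread_mutex_unlock(&q->lock);

    /* The new worker does not depend on this signal: it finds pending > 0. */
    pthread_cond_signal(&q->cond);
    return tid;
}
```

Because the worker cannot take the lock before the submitter has released it,
it always finds the work already queued and never enters the wait loop.
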

Stefan


* [Qemu-devel] Re: [PATCH 1/3] Introduce threadlets
  2010-10-14  9:02   ` Stefan Hajnoczi
  2010-10-14 21:17     ` Venkateswararao Jujjuri (JV)
@ 2010-10-15 14:42     ` Paolo Bonzini
  1 sibling, 0 replies; 29+ messages in thread
From: Paolo Bonzini @ 2010-10-15 14:42 UTC (permalink / raw)
  To: qemu-devel; +Cc: Arun R Bharadwaj

On 10/14/2010 11:02 AM, Stefan Hajnoczi wrote:
> 2. Moving qemu_cond_signal() outside queue->lock is dangerous for the
> same reason: you need to be careful not to qemu_cond_signal() when the
> thread isn't inside qemu_cond_timedwait()."

Yes, please do so.

I personally consider it bad programming practice to put the condvar 
signal/broadcast outside the mutex.  While I understand that my ideas 
may not matter much, some Mr. Hoare actually invented them without 
signal-outside-lock, and he probably knows better than anyone reading.
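
For reference, the signal-inside-lock pattern looks like the following minimal
sketch. It uses plain pthreads rather than the QEMU wrappers, and SigQueue,
sig_queue_put() and sig_queue_get() are hypothetical names with a counter
standing in for the request list.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int pending;        /* stand-in for the length of the request list */
} SigQueue;

static void sig_queue_init(SigQueue *q)
{
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->cond, NULL);
    q->pending = 0;
}

/* Producer: change the predicate and signal before releasing the mutex. */
static void sig_queue_put(SigQueue *q)
{
    pthread_mutex_lock(&q->lock);
    q->pending++;
    pthread_cond_signal(&q->cond);      /* still holding q->lock */
    pthread_mutex_unlock(&q->lock);
}

/* Consumer: re-check the predicate in a loop; returns items left queued. */
static int sig_queue_get(SigQueue *q)
{
    int left;

    pthread_mutex_lock(&q->lock);
    while (q->pending == 0) {
        pthread_cond_wait(&q->cond, &q->lock);
    }
    left = --q->pending;
    pthread_mutex_unlock(&q->lock);
    return left;
}
```
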

Paolo


* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
  2010-10-15  9:52       ` Stefan Hajnoczi
@ 2010-10-15 14:56         ` Venkateswararao Jujjuri (JV)
  0 siblings, 0 replies; 29+ messages in thread
From: Venkateswararao Jujjuri (JV) @ 2010-10-15 14:56 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Arun R Bharadwaj, qemu-devel

On 10/15/2010 2:52 AM, Stefan Hajnoczi wrote:
> On Thu, Oct 14, 2010 at 10:17 PM, Venkateswararao Jujjuri (JV)
> <jvrao@linux.vnet.ibm.com> wrote:
>> On 10/14/2010 2:02 AM, Stefan Hajnoczi wrote:
>>> On Wed, Oct 13, 2010 at 4:31 PM, Arun R Bharadwaj
>>>> +static void *threadlet_worker(void *data)
>>>> +{
>>>> +    ThreadletQueue *queue = data;
>>>> +
>>>> +    qemu_mutex_lock(&(queue->lock));
>>>> +    while (1) {
>>>> +        ThreadletWork *work;
>>>> +        int ret = 0;
>>>> +
>>>> +        while (QTAILQ_EMPTY(&(queue->request_list)) &&
>>>> +               (ret != ETIMEDOUT)) {
>>>> +            ret = qemu_cond_timedwait(&(queue->cond),
>>>> +                                        &(queue->lock), 10*100000);
>>>> +        }
>>>> +
>>>> +        assert(queue->idle_threads != 0);
>>>> +        if (QTAILQ_EMPTY(&(queue->request_list))) {
>>>> +            if (queue->cur_threads > queue->min_threads) {
>>>> +                /* We retain the minimum number of threads */
>>>> +                break;
>>>> +            }
>>>> +        } else {
>>>> +            work = QTAILQ_FIRST(&(queue->request_list));
>>>> +            QTAILQ_REMOVE(&(queue->request_list), work, node);
>>>> +
>>>> +            queue->idle_threads--;
>>>> +            qemu_mutex_unlock(&(queue->lock));
>>>> +
>>>> +            /* execute the work function */
>>>> +            work->func(work);
>>>> +
>>>> +            qemu_mutex_lock(&(queue->lock));
>>>> +            queue->idle_threads++;
>>>> +        }
>>>> +    }
>>>> +
>>>> +    queue->idle_threads--;
>>>> +    queue->cur_threads--;
>>>> +    qemu_mutex_unlock(&(queue->lock));
>>>> +
>>>> +    return NULL;
>>>> +}
>>>> +
>>>> +static void spawn_threadlet(ThreadletQueue *queue)
>>>> +{
>>>> +    QemuThread thread;
>>>> +
>>>> +    queue->cur_threads++;
>>>> +    queue->idle_threads++;
>>>> +
>>>> +    qemu_thread_create(&thread, threadlet_worker, queue);
>>>> +}
>>>> +
>>>> +/**
>>>> + * submit_threadletwork_to_queue: Submit a new task to a private queue to be
>>>> + *                            executed asynchronously.
>>>> + * @queue: Per-subsystem private queue to which the new task needs
>>>> + *         to be submitted.
>>>> + * @work: Contains information about the task that needs to be submitted.
>>>> + */
>>>> +void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
>>>> +{
>>>> +    qemu_mutex_lock(&(queue->lock));
>>>> +    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
>>>> +        spawn_threadlet(queue);
>>>> +    }
>>>> +    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
>>>> +    qemu_mutex_unlock(&(queue->lock));
>>>> +    qemu_cond_signal(&(queue->cond));
>>>
>>> Worker thread signalling and spawning has race conditions.  See my
>>> previous email:
>>>
>>> "There are race conditions here:
>>>
>>> 1. When a new threadlet is started because there are no idle threads,
>>> qemu_cond_signal() may fire a blank because the threadlet isn't inside
>>> qemu_cond_timedwait() yet.  The result, the work item is deadlocked
>>> until another thread grabs more work off the queue.  If I'm reading
>>> the code correctly this bug is currently present!
>>
>> Moving QTAILQ_INSERT_TAIL() ahead of spawn_threadlet() should take care of this
>> issue
>> right?
> 
> I didn't read the code correctly.  queue->lock is already held by
> submit_threadletwork_to_queue() until after QTAILQ_INSERT_TAIL().
> threadlet_worker() will only enter its main loop once it acquires
> queue->lock.  Therefore the queue definitely has the work before the
> spawned thread begins processing.
> 
> The work is on the queue when threadlet_worker() enters its main loop,
> so it will not need to wait on queue->cond but can process work
> immediately.  There is no spawn race condition.

Correct...I too missed that. :)

JV

> 
> Stefan


* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
  2010-10-14 21:32       ` Venkateswararao Jujjuri (JV)
@ 2010-10-17  8:57         ` Avi Kivity
  2010-10-18 10:47           ` Arun R Bharadwaj
  0 siblings, 1 reply; 29+ messages in thread
From: Avi Kivity @ 2010-10-17  8:57 UTC (permalink / raw)
  To: Venkateswararao Jujjuri (JV)
  Cc: Stefan Hajnoczi, qemu-devel, Arun R Bharadwaj

  On 10/14/2010 11:32 PM, Venkateswararao Jujjuri (JV) wrote:
> >
> >  Blocking is somewhat against the spirit of the thing, no?  While I agree that
> >  the current cancel API is hard to use correctly, blocking defeats the purpose of
> >  the API.
> >
> Are you proposing to add additional state in the return
> (canceled/running/not-canceled)
> and leave the synchronization part to the user?
> i.e not to provide any additional interface for the user to wait
> for the scheduled work to finish? Just trying to understand.

I wasn't proposing anything since I don't have a good proposal.  Adding 
a callback makes the whole thing an asynchronous design which threads 
are trying to avoid.  Blocking is bad.  Leaving it to the caller is hard 
to use correctly.

Perhaps we can have a threadlet with barrier semantics.  You queue a 
piece of work which is guaranteed to execute after all previously 
submitted work (against the same queue) and before any subsequently
submitted work.
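
To make the ordering rule concrete, here is a rough single-threaded model
of it.  This is only a sketch: Work, Queue, queue_submit(), queue_take()
and queue_complete() are made-up names for illustration and are not part
of the threadlets patch.  A barrier item is handed out only once nothing
submitted earlier is still in flight, and nothing submitted later is
handed out until the barrier has completed.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical types; this is not the qemu-threadlets API. */
typedef struct Work {
    struct Work *next;
    int is_barrier;       /* nonzero: must run alone, in submission order */
} Work;

typedef struct Queue {
    Work *head, *tail;    /* pending work, FIFO */
    int in_flight;        /* items handed to workers, not yet completed */
    int barrier_running;  /* a barrier item is currently executing */
} Queue;

/* Append an item to the tail of the pending list. */
static void queue_submit(Queue *q, Work *w)
{
    w->next = NULL;
    if (q->tail) {
        q->tail->next = w;
    } else {
        q->head = w;
    }
    q->tail = w;
}

/* Return the next item a worker may start, or NULL if it has to wait. */
static Work *queue_take(Queue *q)
{
    Work *w = q->head;

    if (!w || q->barrier_running) {
        return NULL;      /* nothing queued, or a barrier is in progress */
    }
    if (w->is_barrier && q->in_flight > 0) {
        return NULL;      /* earlier work has not finished yet */
    }
    q->head = w->next;
    if (!q->head) {
        q->tail = NULL;
    }
    q->in_flight++;
    q->barrier_running = w->is_barrier;
    return w;
}

/* Called by a worker when it has finished executing an item. */
static void queue_complete(Queue *q, Work *w)
{
    q->in_flight--;
    if (w->is_barrier) {
        q->barrier_running = 0;
    }
}
```

In a real thread pool these functions would presumably run with the queue
lock held, and a worker that gets NULL back would sleep on the queue's
condition variable instead of polling.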

-- 
error compiling committee.c: too many arguments to function

^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
  2010-10-17  8:57         ` Avi Kivity
@ 2010-10-18 10:47           ` Arun R Bharadwaj
  2010-10-18 12:29             ` Avi Kivity
  0 siblings, 1 reply; 29+ messages in thread
From: Arun R Bharadwaj @ 2010-10-18 10:47 UTC (permalink / raw)
  To: Avi Kivity; +Cc: Stefan Hajnoczi, Venkateswararao Jujjuri (JV), qemu-devel

* Avi Kivity <avi@redhat.com> [2010-10-17 10:57:23]:

>  On 10/14/2010 11:32 PM, Venkateswararao Jujjuri (JV) wrote:
> >>
> >>  Blocking is somewhat against the spirit of the thing, no?  While I agree that
> >>  the current cancel API is hard to use correctly, blocking defeats the purpose of
> >>  the API.
> >>
> >Are you proposing to add additional state in the return
> >(canceled/running/not-canceled)
> >and leave the synchronization part to the user?
> >i.e not to provide any additional interface for the user to wait
> >for the scheduled work to finish? Just trying to understand.
> 
> I wasn't proposing anything since I don't have a good proposal.
> Adding a callback makes the whole thing an asynchronous design which
> threads are trying to avoid.  Blocking is bad.  Leaving it to the
> caller is hard to use correctly.
> 
> Perhaps we can have a threadlet with barrier semantics.  You queue a
> piece of work which is guaranteed to execute after all previously
> submitted work (against the same queue) and before any subsequently
> submitted work.
> 
> -- 
> error compiling committee.c: too many arguments to function
> 
> 

I would suggest that we have 2 APIs - cancel_threadletwork (current
cancel implementation) and cancel_threadletwork_sync (waits for work
to complete). As of now there is no known user for
cancel_threadletwork_sync. So we can keep this as a TODO for later. I
can provide the APIs for both these so that when we have a user for
cancel_threadletwork_sync, we can go ahead and implement it.

-arun 

^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
  2010-10-18 10:47           ` Arun R Bharadwaj
@ 2010-10-18 12:29             ` Avi Kivity
  0 siblings, 0 replies; 29+ messages in thread
From: Avi Kivity @ 2010-10-18 12:29 UTC (permalink / raw)
  To: arun; +Cc: Stefan Hajnoczi, Venkateswararao Jujjuri (JV), qemu-devel

  On 10/18/2010 12:47 PM, Arun R Bharadwaj wrote:
> * Avi Kivity<avi@redhat.com>  [2010-10-17 10:57:23]:
>
> >   On 10/14/2010 11:32 PM, Venkateswararao Jujjuri (JV) wrote:
> >  >>
> >  >>   Blocking is somewhat against the spirit of the thing, no?  While I agree that
> >  >>   the current cancel API is hard to use correctly, blocking defeats the purpose of
> >  >>   the API.
> >  >>
> >  >Are you proposing to add additional state in the return
> >  >(canceled/running/not-canceled)
> >  >and leave the synchronization part to the user?
> >  >i.e not to provide any additional interface for the user to wait
> >  >for the scheduled work to finish? Just trying to understand.
> >
> >  I wasn't proposing anything since I don't have a good proposal.
> >  Adding a callback makes the whole thing an asynchronous design which
> >  threads are trying to avoid.  Blocking is bad.  Leaving it to the
> >  caller is hard to use correctly.
> >
> >  Perhaps we can have a threadlet with barrier semantics.  You queue a
> >  piece of work which is guaranteed to execute after all previously
> >  submitted work (against the same queue) and before any subsequently
> >  submitted work.
> >
> >  -- 
> >  error compiling committee.c: too many arguments to function
> >
> >
>
> I would suggest that we have 2 APIs - cancel_threadletwork (current
> cancel implementation) and cancel_threadletwork_sync (waits for work
> to complete). As of now there is no known user for
> cancel_threadletwork_sync. So we can keep this as a TODO for later. I
> can provide the APIs for both these so that when we have a user for
> cancel_threadletwork_sync, we can go ahead and implement it.

I agree it's best not to implement cancel_threadletwork_sync() now.  Using
it implies a stall, so we should discourage it.

-- 
error compiling committee.c: too many arguments to function

^ permalink raw reply	[flat|nested] 29+ messages in thread

* [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
  2010-10-19 17:42 [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework Arun R Bharadwaj
@ 2010-10-19 17:43 ` Arun R Bharadwaj
  2010-10-20  2:24   ` Balbir Singh
                     ` (2 more replies)
  0 siblings, 3 replies; 29+ messages in thread
From: Arun R Bharadwaj @ 2010-10-19 17:43 UTC (permalink / raw)
  To: qemu-devel

From: Gautham R Shenoy <ego@in.ibm.com>

This patch makes the paio subsystem use the threadlet framework, thereby
moving the asynchronous threading code out of posix-aio-compat.c.

The patch has been tested with fstress.

Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
---
 posix-aio-compat.c |  166 +++++++++-------------------------------------------
 1 files changed, 30 insertions(+), 136 deletions(-)

diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..6977c18 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,6 +29,7 @@
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
+#include "qemu-threadlets.h"
 
 
 struct qemu_paiocb {
@@ -51,6 +52,7 @@ struct qemu_paiocb {
     struct qemu_paiocb *next;
 
     int async_context_id;
+    ThreadletWork work;
 };
 
 typedef struct PosixAioState {
@@ -59,15 +61,6 @@ typedef struct PosixAioState {
 } PosixAioState;
 
 
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
 #else
@@ -85,39 +78,6 @@ static void die(const char *what)
     die2(errno, what);
 }
 
-static void mutex_lock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
-{
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
-    return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
-{
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
-}
-
 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
 {
     int ret;
@@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
 {
     pid_t pid;
+    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+    ssize_t ret = 0;
 
     pid = getpid();
+    aiocb->active = 1;
 
-    while (1) {
-        struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
-
-        mutex_lock(&lock);
-
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            ret = cond_timedwait(&cond, &lock, &ts);
-        }
-
-        if (QTAILQ_EMPTY(&request_list))
-            break;
-
-        aiocb = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->active = 1;
-        idle_threads--;
-        mutex_unlock(&lock);
-
-        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-        case QEMU_AIO_READ:
-        case QEMU_AIO_WRITE:
-            ret = handle_aiocb_rw(aiocb);
-            break;
-        case QEMU_AIO_FLUSH:
-            ret = handle_aiocb_flush(aiocb);
-            break;
-        case QEMU_AIO_IOCTL:
-            ret = handle_aiocb_ioctl(aiocb);
-            break;
-        default:
-            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
-            ret = -EINVAL;
-            break;
-        }
-
-        mutex_lock(&lock);
-        aiocb->ret = ret;
-        idle_threads++;
-        mutex_unlock(&lock);
-
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+    case QEMU_AIO_WRITE:
+        ret = handle_aiocb_rw(aiocb);
+        break;
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
+        ret = handle_aiocb_ioctl(aiocb);
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
+        break;
     }
 
-    idle_threads--;
-    cur_threads--;
-    mutex_unlock(&lock);
+    aiocb->ret = ret;
 
-    return NULL;
-}
-
-static void spawn_thread(void)
-{
-    sigset_t set, oldset;
-
-    cur_threads++;
-    idle_threads++;
-
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+    if (kill(pid, aiocb->ev_signo)) die("kill failed");
 }
 
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
     aiocb->ret = -EINPROGRESS;
     aiocb->active = 0;
-    mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
+
+    aiocb->work.func = aio_thread;
+    submit_threadletwork(&aiocb->work);
 }
 
 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 {
     ssize_t ret;
 
-    mutex_lock(&lock);
     ret = aiocb->ret;
-    mutex_unlock(&lock);
-
     return ret;
 }
 
@@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
     int active = 0;
 
-    mutex_lock(&lock);
     if (!acb->active) {
-        QTAILQ_REMOVE(&request_list, acb, node);
-        acb->ret = -ECANCELED;
+        if (!cancel_threadletwork(&acb->work))
+            acb->ret = -ECANCELED;
+         else
+            active = 1;
     } else if (acb->ret == -EINPROGRESS) {
         active = 1;
     }
-    mutex_unlock(&lock);
 
     if (active) {
         /* fail safe: if the aio could not be canceled, we wait for
@@ -618,7 +523,6 @@ int paio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    int ret;
 
     if (posix_aio_state)
         return 0;
@@ -645,16 +549,6 @@ int paio_init(void)
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
         posix_aio_process_queue, s);
 
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
-
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
-
-    QTAILQ_INIT(&request_list);
-
     posix_aio_state = s;
     return 0;
 }

^ permalink raw reply related	[flat|nested] 29+ messages in thread

* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
  2010-10-19 17:43 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
@ 2010-10-20  2:24   ` Balbir Singh
  2010-10-20  8:42   ` Kevin Wolf
  2010-10-20  9:30   ` Stefan Hajnoczi
  2 siblings, 0 replies; 29+ messages in thread
From: Balbir Singh @ 2010-10-20  2:24 UTC (permalink / raw)
  To: Arun R Bharadwaj; +Cc: qemu-devel

* Arun R B <arun@linux.vnet.ibm.com> [2010-10-19 23:13:20]:

> From: Gautham R Shenoy <ego@in.ibm.com>
> 
> This patch makes the paio subsystem use the threadlet framework thereby
> decoupling asynchronous threading framework portion out of
> posix-aio-compat.c
> 
> The patch has been tested with fstress.
> 
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>

This change seems reasonable to me

Acked-by: Balbir Singh <balbir@linux.vnet.ibm.com>
 

-- 
	Three Cheers,
	Balbir

^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
  2010-10-19 17:43 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
  2010-10-20  2:24   ` Balbir Singh
@ 2010-10-20  8:42   ` Kevin Wolf
  2010-10-20  9:30   ` Stefan Hajnoczi
  2 siblings, 0 replies; 29+ messages in thread
From: Kevin Wolf @ 2010-10-20  8:42 UTC (permalink / raw)
  To: Arun R Bharadwaj; +Cc: qemu-devel

Am 19.10.2010 19:43, schrieb Arun R Bharadwaj:
> From: Gautham R Shenoy <ego@in.ibm.com>
> 
> This patch makes the paio subsystem use the threadlet framework thereby
> decoupling asynchronous threading framework portion out of
> posix-aio-compat.c
> 
> The patch has been tested with fstress.
> 
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> ---
>  posix-aio-compat.c |  166 +++++++++-------------------------------------------
>  1 files changed, 30 insertions(+), 136 deletions(-)
> 
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index 7b862b5..6977c18 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -29,6 +29,7 @@
>  #include "block_int.h"
>  
>  #include "block/raw-posix-aio.h"
> +#include "qemu-threadlets.h"
>  
>  
>  struct qemu_paiocb {
> @@ -51,6 +52,7 @@ struct qemu_paiocb {
>      struct qemu_paiocb *next;
>  
>      int async_context_id;
> +    ThreadletWork work;
>  };
>  
>  typedef struct PosixAioState {
> @@ -59,15 +61,6 @@ typedef struct PosixAioState {
>  } PosixAioState;
>  
>  
> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> -static pthread_t thread_id;
> -static pthread_attr_t attr;
> -static int max_threads = 64;
> -static int cur_threads = 0;
> -static int idle_threads = 0;
> -static QTAILQ_HEAD(, qemu_paiocb) request_list;
> -
>  #ifdef CONFIG_PREADV
>  static int preadv_present = 1;
>  #else
> @@ -85,39 +78,6 @@ static void die(const char *what)
>      die2(errno, what);
>  }
>  
> -static void mutex_lock(pthread_mutex_t *mutex)
> -{
> -    int ret = pthread_mutex_lock(mutex);
> -    if (ret) die2(ret, "pthread_mutex_lock");
> -}
> -
> -static void mutex_unlock(pthread_mutex_t *mutex)
> -{
> -    int ret = pthread_mutex_unlock(mutex);
> -    if (ret) die2(ret, "pthread_mutex_unlock");
> -}
> -
> -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
> -                           struct timespec *ts)
> -{
> -    int ret = pthread_cond_timedwait(cond, mutex, ts);
> -    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
> -    return ret;
> -}
> -
> -static void cond_signal(pthread_cond_t *cond)
> -{
> -    int ret = pthread_cond_signal(cond);
> -    if (ret) die2(ret, "pthread_cond_signal");
> -}
> -
> -static void thread_create(pthread_t *thread, pthread_attr_t *attr,
> -                          void *(*start_routine)(void*), void *arg)
> -{
> -    int ret = pthread_create(thread, attr, start_routine, arg);
> -    if (ret) die2(ret, "pthread_create");
> -}
> -
>  static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
>  {
>      int ret;
> @@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>      return nbytes;
>  }
>  
> -static void *aio_thread(void *unused)
> +static void aio_thread(ThreadletWork *work)
>  {
>      pid_t pid;
> +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> +    ssize_t ret = 0;
>  
>      pid = getpid();
> +    aiocb->active = 1;
>  
> -    while (1) {
> -        struct qemu_paiocb *aiocb;
> -        ssize_t ret = 0;
> -        qemu_timeval tv;
> -        struct timespec ts;
> -
> -        qemu_gettimeofday(&tv);
> -        ts.tv_sec = tv.tv_sec + 10;
> -        ts.tv_nsec = 0;
> -
> -        mutex_lock(&lock);
> -
> -        while (QTAILQ_EMPTY(&request_list) &&
> -               !(ret == ETIMEDOUT)) {
> -            ret = cond_timedwait(&cond, &lock, &ts);
> -        }
> -
> -        if (QTAILQ_EMPTY(&request_list))
> -            break;
> -
> -        aiocb = QTAILQ_FIRST(&request_list);
> -        QTAILQ_REMOVE(&request_list, aiocb, node);
> -        aiocb->active = 1;
> -        idle_threads--;
> -        mutex_unlock(&lock);
> -
> -        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> -        case QEMU_AIO_READ:
> -        case QEMU_AIO_WRITE:
> -            ret = handle_aiocb_rw(aiocb);
> -            break;
> -        case QEMU_AIO_FLUSH:
> -            ret = handle_aiocb_flush(aiocb);
> -            break;
> -        case QEMU_AIO_IOCTL:
> -            ret = handle_aiocb_ioctl(aiocb);
> -            break;
> -        default:
> -            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> -            ret = -EINVAL;
> -            break;
> -        }
> -
> -        mutex_lock(&lock);
> -        aiocb->ret = ret;
> -        idle_threads++;
> -        mutex_unlock(&lock);
> -
> -        if (kill(pid, aiocb->ev_signo)) die("kill failed");
> +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> +    case QEMU_AIO_READ:
> +    case QEMU_AIO_WRITE:
> +        ret = handle_aiocb_rw(aiocb);
> +        break;
> +    case QEMU_AIO_FLUSH:
> +        ret = handle_aiocb_flush(aiocb);
> +        break;
> +    case QEMU_AIO_IOCTL:
> +        ret = handle_aiocb_ioctl(aiocb);
> +        break;
> +    default:
> +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> +        ret = -EINVAL;
> +        break;
>      }
>  
> -    idle_threads--;
> -    cur_threads--;
> -    mutex_unlock(&lock);
> +    aiocb->ret = ret;
>  
> -    return NULL;
> -}
> -
> -static void spawn_thread(void)
> -{
> -    sigset_t set, oldset;
> -
> -    cur_threads++;
> -    idle_threads++;
> -
> -    /* block all signals */
> -    if (sigfillset(&set)) die("sigfillset");
> -    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> -
> -    thread_create(&thread_id, &attr, aio_thread, NULL);
> -
> -    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> +    if (kill(pid, aiocb->ev_signo)) die("kill failed");

Please move the die() call onto a line of its own and add braces.
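
For illustration, the requested layout could look roughly like the
following.  This is a stand-alone sketch: notify_completion() is a
hypothetical wrapper, and die() here is a simplified stand-in for the
helper in posix-aio-compat.c.

```c
#define _POSIX_C_SOURCE 200809L  /* expose kill() under strict -std= modes */
#include <assert.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

/* Simplified stand-in for die() in posix-aio-compat.c. */
static void die(const char *what)
{
    perror(what);
    abort();
}

/* Hypothetical wrapper showing the braced form of the kill() check. */
static void notify_completion(pid_t pid, int signo)
{
    if (kill(pid, signo)) {
        die("kill failed");
    }
}
```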

>  }
>  
>  static void qemu_paio_submit(struct qemu_paiocb *aiocb)
>  {
>      aiocb->ret = -EINPROGRESS;
>      aiocb->active = 0;
> -    mutex_lock(&lock);
> -    if (idle_threads == 0 && cur_threads < max_threads)
> -        spawn_thread();
> -    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> -    mutex_unlock(&lock);
> -    cond_signal(&cond);
> +
> +    aiocb->work.func = aio_thread;
> +    submit_threadletwork(&aiocb->work);
>  }
>  
>  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>  {
>      ssize_t ret;
>  
> -    mutex_lock(&lock);
>      ret = aiocb->ret;
> -    mutex_unlock(&lock);
> -
>      return ret;
>  }
>  
> @@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>      struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>      int active = 0;
>  
> -    mutex_lock(&lock);
>      if (!acb->active) {
> -        QTAILQ_REMOVE(&request_list, acb, node);
> -        acb->ret = -ECANCELED;
> +        if (!cancel_threadletwork(&acb->work))
> +            acb->ret = -ECANCELED;
> +         else
> +            active = 1;

Missing braces.

Kevin

^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
  2010-10-19 17:43 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
  2010-10-20  2:24   ` Balbir Singh
  2010-10-20  8:42   ` Kevin Wolf
@ 2010-10-20  9:30   ` Stefan Hajnoczi
  2010-10-20 13:16     ` Anthony Liguori
  2010-10-21  8:40     ` Arun R Bharadwaj
  2 siblings, 2 replies; 29+ messages in thread
From: Stefan Hajnoczi @ 2010-10-20  9:30 UTC (permalink / raw)
  To: Arun R Bharadwaj; +Cc: qemu-devel

On Tue, Oct 19, 2010 at 6:43 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> From: Gautham R Shenoy <ego@in.ibm.com>
>
> This patch makes the paio subsystem use the threadlet framework thereby
> decoupling asynchronous threading framework portion out of
> posix-aio-compat.c
>
> The patch has been tested with fstress.
>
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> ---
>  posix-aio-compat.c |  166 +++++++++-------------------------------------------
>  1 files changed, 30 insertions(+), 136 deletions(-)
>
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index 7b862b5..6977c18 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -29,6 +29,7 @@
>  #include "block_int.h"
>
>  #include "block/raw-posix-aio.h"
> +#include "qemu-threadlets.h"
>
>
>  struct qemu_paiocb {
> @@ -51,6 +52,7 @@ struct qemu_paiocb {
>     struct qemu_paiocb *next;
>
>     int async_context_id;
> +    ThreadletWork work;

The QTAILQ_ENTRY(qemu_paiocb) node field is no longer used, please remove it.

>  };
>
>  typedef struct PosixAioState {
> @@ -59,15 +61,6 @@ typedef struct PosixAioState {
>  } PosixAioState;
>
>
> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> -static pthread_t thread_id;
> -static pthread_attr_t attr;
> -static int max_threads = 64;
> -static int cur_threads = 0;
> -static int idle_threads = 0;
> -static QTAILQ_HEAD(, qemu_paiocb) request_list;
> -
>  #ifdef CONFIG_PREADV
>  static int preadv_present = 1;
>  #else
> @@ -85,39 +78,6 @@ static void die(const char *what)
>     die2(errno, what);
>  }
>
> -static void mutex_lock(pthread_mutex_t *mutex)
> -{
> -    int ret = pthread_mutex_lock(mutex);
> -    if (ret) die2(ret, "pthread_mutex_lock");
> -}
> -
> -static void mutex_unlock(pthread_mutex_t *mutex)
> -{
> -    int ret = pthread_mutex_unlock(mutex);
> -    if (ret) die2(ret, "pthread_mutex_unlock");
> -}
> -
> -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
> -                           struct timespec *ts)
> -{
> -    int ret = pthread_cond_timedwait(cond, mutex, ts);
> -    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
> -    return ret;
> -}
> -
> -static void cond_signal(pthread_cond_t *cond)
> -{
> -    int ret = pthread_cond_signal(cond);
> -    if (ret) die2(ret, "pthread_cond_signal");
> -}
> -
> -static void thread_create(pthread_t *thread, pthread_attr_t *attr,
> -                          void *(*start_routine)(void*), void *arg)
> -{
> -    int ret = pthread_create(thread, attr, start_routine, arg);
> -    if (ret) die2(ret, "pthread_create");
> -}
> -
>  static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
>  {
>     int ret;
> @@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>     return nbytes;
>  }
>
> -static void *aio_thread(void *unused)
> +static void aio_thread(ThreadletWork *work)
>  {
>     pid_t pid;
> +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> +    ssize_t ret = 0;
>
>     pid = getpid();
> +    aiocb->active = 1;
>
> -    while (1) {
> -        struct qemu_paiocb *aiocb;
> -        ssize_t ret = 0;
> -        qemu_timeval tv;
> -        struct timespec ts;
> -
> -        qemu_gettimeofday(&tv);
> -        ts.tv_sec = tv.tv_sec + 10;
> -        ts.tv_nsec = 0;
> -
> -        mutex_lock(&lock);
> -
> -        while (QTAILQ_EMPTY(&request_list) &&
> -               !(ret == ETIMEDOUT)) {
> -            ret = cond_timedwait(&cond, &lock, &ts);
> -        }
> -
> -        if (QTAILQ_EMPTY(&request_list))
> -            break;
> -
> -        aiocb = QTAILQ_FIRST(&request_list);
> -        QTAILQ_REMOVE(&request_list, aiocb, node);
> -        aiocb->active = 1;
> -        idle_threads--;
> -        mutex_unlock(&lock);
> -
> -        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> -        case QEMU_AIO_READ:
> -        case QEMU_AIO_WRITE:
> -            ret = handle_aiocb_rw(aiocb);
> -            break;
> -        case QEMU_AIO_FLUSH:
> -            ret = handle_aiocb_flush(aiocb);
> -            break;
> -        case QEMU_AIO_IOCTL:
> -            ret = handle_aiocb_ioctl(aiocb);
> -            break;
> -        default:
> -            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> -            ret = -EINVAL;
> -            break;
> -        }
> -
> -        mutex_lock(&lock);
> -        aiocb->ret = ret;
> -        idle_threads++;
> -        mutex_unlock(&lock);
> -
> -        if (kill(pid, aiocb->ev_signo)) die("kill failed");
> +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> +    case QEMU_AIO_READ:
> +    case QEMU_AIO_WRITE:
> +        ret = handle_aiocb_rw(aiocb);
> +        break;
> +    case QEMU_AIO_FLUSH:
> +        ret = handle_aiocb_flush(aiocb);
> +        break;
> +    case QEMU_AIO_IOCTL:
> +        ret = handle_aiocb_ioctl(aiocb);
> +        break;
> +    default:
> +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> +        ret = -EINVAL;
> +        break;
>     }
>
> -    idle_threads--;
> -    cur_threads--;
> -    mutex_unlock(&lock);
> +    aiocb->ret = ret;
>
> -    return NULL;
> -}
> -
> -static void spawn_thread(void)
> -{
> -    sigset_t set, oldset;
> -
> -    cur_threads++;
> -    idle_threads++;
> -
> -    /* block all signals */
> -    if (sigfillset(&set)) die("sigfillset");
> -    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> -
> -    thread_create(&thread_id, &attr, aio_thread, NULL);
> -
> -    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> +    if (kill(pid, aiocb->ev_signo)) die("kill failed");
>  }
>
>  static void qemu_paio_submit(struct qemu_paiocb *aiocb)
>  {
>     aiocb->ret = -EINPROGRESS;
>     aiocb->active = 0;
> -    mutex_lock(&lock);
> -    if (idle_threads == 0 && cur_threads < max_threads)
> -        spawn_thread();
> -    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> -    mutex_unlock(&lock);
> -    cond_signal(&cond);
> +
> +    aiocb->work.func = aio_thread;
> +    submit_threadletwork(&aiocb->work);
>  }
>
>  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>  {
>     ssize_t ret;
>
> -    mutex_lock(&lock);
>     ret = aiocb->ret;
> -    mutex_unlock(&lock);
> -
>     return ret;
>  }
>
> @@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>     int active = 0;
>
> -    mutex_lock(&lock);
>     if (!acb->active) {

I'm not sure the active field serves any purpose.  No memory barriers
are used, so this thread is only guaranteed to read active as 0 before
the work is executed; while the work is executing it may read either 0
or 1.

The cancel_threadletwork() function already indicates whether
cancellation succeeded.  Why not just try to cancel instead of using
the active field?
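
Something along these lines, as a minimal sketch.  The types and names
here (Work, Request, try_cancel_work(), request_cancel()) are
hypothetical stand-ins and not the real QEMU definitions, and the sketch
assumes a cancel function that returns 1 when the work was removed from
the queue before it started:

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical stand-ins; none of these are the real QEMU definitions. */
typedef struct Work {
    int queued;              /* nonzero while still waiting in the queue */
} Work;

typedef struct Request {
    int ret;                 /* -EINPROGRESS until finished or canceled */
    Work work;
} Request;

/* Assumed convention: return 1 if the work was dequeued before running,
 * 0 if a worker already picked it up.  A real version would take the
 * queue lock around this check. */
static int try_cancel_work(Work *w)
{
    if (w->queued) {
        w->queued = 0;
        return 1;
    }
    return 0;
}

/* Returns 1 if the caller still has to wait for the request to finish.
 * No separate "active" flag is consulted; the cancel result decides.
 * A real version would also need synchronization around req->ret. */
static int request_cancel(Request *req)
{
    if (try_cancel_work(&req->work)) {
        req->ret = -ECANCELED;
        return 0;
    }
    return req->ret == -EINPROGRESS;
}
```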

> -        QTAILQ_REMOVE(&request_list, acb, node);
> -        acb->ret = -ECANCELED;
> +        if (!cancel_threadletwork(&acb->work))
> +            acb->ret = -ECANCELED;
> +         else
> +            active = 1;

The 0 and 1 return values from cancel_threadletwork() are inverted.  See
also my comment on patch 1/3 in this series.

>     } else if (acb->ret == -EINPROGRESS) {
>         active = 1;
>     }
> -    mutex_unlock(&lock);
>
>     if (active) {
>         /* fail safe: if the aio could not be canceled, we wait for

while (qemu_paio_error(acb) == EINPROGRESS)
    ;

Tight loop with no memory barrier reading a memory location that is
updated by another thread.  We shouldn't communicate between threads
without barriers.

Stefan

^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
  2010-10-20  9:30   ` Stefan Hajnoczi
@ 2010-10-20 13:16     ` Anthony Liguori
  2010-10-21  8:40     ` Arun R Bharadwaj
  1 sibling, 0 replies; 29+ messages in thread
From: Anthony Liguori @ 2010-10-20 13:16 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: Arun R Bharadwaj, qemu-devel

On 10/20/2010 04:30 AM, Stefan Hajnoczi wrote:
>
>>      } else if (acb->ret == -EINPROGRESS) {
>>          active = 1;
>>      }
>> -    mutex_unlock(&lock);
>>
>>      if (active) {
>>          /* fail safe: if the aio could not be canceled, we wait for
>>      
> while (qemu_paio_error(acb) == EINPROGRESS)
>      ;
>
> Tight loop with no memory barrier reading a memory location that is
> updated by another thread.  We shouldn't communicate between threads
> without barriers.
>    

We shouldn't use a tight loop, period.  A condition variable should be
used if signalling is needed.

And we shouldn't rely on atomic assignments to communicate between 
threads.  Just use a mutex and avoid being fancier than we need to be.
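
A minimal sketch of that pattern using plain pthreads.  The Request type
and the request_*() helpers are made up for illustration and are not the
real struct qemu_paiocb or any existing QEMU function:

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>

/* Hypothetical request type; not the real struct qemu_paiocb. */
typedef struct Request {
    pthread_mutex_t lock;
    pthread_cond_t done;     /* signalled when ret leaves -EINPROGRESS */
    long ret;
} Request;

static void request_init(Request *req)
{
    pthread_mutex_init(&req->lock, NULL);
    pthread_cond_init(&req->done, NULL);
    req->ret = -EINPROGRESS;
}

/* Worker side: publish the result under the lock and wake any waiter. */
static void request_complete(Request *req, long ret)
{
    pthread_mutex_lock(&req->lock);
    req->ret = ret;
    pthread_cond_broadcast(&req->done);
    pthread_mutex_unlock(&req->lock);
}

/* Canceller side: sleep until the worker has published a result,
 * instead of spinning on req->ret. */
static long request_wait(Request *req)
{
    long ret;

    pthread_mutex_lock(&req->lock);
    while (req->ret == -EINPROGRESS) {
        pthread_cond_wait(&req->done, &req->lock);
    }
    ret = req->ret;
    pthread_mutex_unlock(&req->lock);
    return ret;
}
```

The mutex provides the ordering between the two threads, so no explicit
barriers are needed, and the waiter sleeps rather than burning CPU.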

Regards,

Anthony Liguori

> Stefan
>
>    

^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
  2010-10-20  9:30   ` Stefan Hajnoczi
  2010-10-20 13:16     ` Anthony Liguori
@ 2010-10-21  8:40     ` Arun R Bharadwaj
  2010-10-21  9:17       ` Stefan Hajnoczi
  1 sibling, 1 reply; 29+ messages in thread
From: Arun R Bharadwaj @ 2010-10-21  8:40 UTC (permalink / raw)
  To: Stefan Hajnoczi; +Cc: qemu-devel

* Stefan Hajnoczi <stefanha@gmail.com> [2010-10-20 10:30:38]:

> On Tue, Oct 19, 2010 at 6:43 PM, Arun R Bharadwaj
> <arun@linux.vnet.ibm.com> wrote:
> > From: Gautham R Shenoy <ego@in.ibm.com>
> >
> > This patch makes the paio subsystem use the threadlet framework thereby
> > decoupling asynchronous threading framework portion out of
> > posix-aio-compat.c
> >
> > The patch has been tested with fstress.
> >
> > Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> > Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> > ---
> >  posix-aio-compat.c |  166 +++++++++-------------------------------------------
> >  1 files changed, 30 insertions(+), 136 deletions(-)
> >
> > diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> > index 7b862b5..6977c18 100644
> > --- a/posix-aio-compat.c
> > +++ b/posix-aio-compat.c
> > @@ -29,6 +29,7 @@
> >  #include "block_int.h"
> >
> >  #include "block/raw-posix-aio.h"
> > +#include "qemu-threadlets.h"
> >
> >
> >  struct qemu_paiocb {
> > @@ -51,6 +52,7 @@ struct qemu_paiocb {
> >     struct qemu_paiocb *next;
> >
> >     int async_context_id;
> > +    ThreadletWork work;
> 
> The QTAILQ_ENTRY(qemu_paiocb) node field is no longer used, please remove it.
> 
> >  };
> >
> >  typedef struct PosixAioState {
> > @@ -59,15 +61,6 @@ typedef struct PosixAioState {
> >  } PosixAioState;
> >
> >
> > -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> > -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> > -static pthread_t thread_id;
> > -static pthread_attr_t attr;
> > -static int max_threads = 64;
> > -static int cur_threads = 0;
> > -static int idle_threads = 0;
> > -static QTAILQ_HEAD(, qemu_paiocb) request_list;
> > -
> >  #ifdef CONFIG_PREADV
> >  static int preadv_present = 1;
> >  #else
> > @@ -85,39 +78,6 @@ static void die(const char *what)
> >     die2(errno, what);
> >  }
> >
> > -static void mutex_lock(pthread_mutex_t *mutex)
> > -{
> > -    int ret = pthread_mutex_lock(mutex);
> > -    if (ret) die2(ret, "pthread_mutex_lock");
> > -}
> > -
> > -static void mutex_unlock(pthread_mutex_t *mutex)
> > -{
> > -    int ret = pthread_mutex_unlock(mutex);
> > -    if (ret) die2(ret, "pthread_mutex_unlock");
> > -}
> > -
> > -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
> > -                           struct timespec *ts)
> > -{
> > -    int ret = pthread_cond_timedwait(cond, mutex, ts);
> > -    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
> > -    return ret;
> > -}
> > -
> > -static void cond_signal(pthread_cond_t *cond)
> > -{
> > -    int ret = pthread_cond_signal(cond);
> > -    if (ret) die2(ret, "pthread_cond_signal");
> > -}
> > -
> > -static void thread_create(pthread_t *thread, pthread_attr_t *attr,
> > -                          void *(*start_routine)(void*), void *arg)
> > -{
> > -    int ret = pthread_create(thread, attr, start_routine, arg);
> > -    if (ret) die2(ret, "pthread_create");
> > -}
> > -
> >  static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
> >  {
> >     int ret;
> > @@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
> >     return nbytes;
> >  }
> >
> > -static void *aio_thread(void *unused)
> > +static void aio_thread(ThreadletWork *work)
> >  {
> >     pid_t pid;
> > +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> > +    ssize_t ret = 0;
> >
> >     pid = getpid();
> > +    aiocb->active = 1;
> >
> > -    while (1) {
> > -        struct qemu_paiocb *aiocb;
> > -        ssize_t ret = 0;
> > -        qemu_timeval tv;
> > -        struct timespec ts;
> > -
> > -        qemu_gettimeofday(&tv);
> > -        ts.tv_sec = tv.tv_sec + 10;
> > -        ts.tv_nsec = 0;
> > -
> > -        mutex_lock(&lock);
> > -
> > -        while (QTAILQ_EMPTY(&request_list) &&
> > -               !(ret == ETIMEDOUT)) {
> > -            ret = cond_timedwait(&cond, &lock, &ts);
> > -        }
> > -
> > -        if (QTAILQ_EMPTY(&request_list))
> > -            break;
> > -
> > -        aiocb = QTAILQ_FIRST(&request_list);
> > -        QTAILQ_REMOVE(&request_list, aiocb, node);
> > -        aiocb->active = 1;
> > -        idle_threads--;
> > -        mutex_unlock(&lock);
> > -
> > -        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> > -        case QEMU_AIO_READ:
> > -        case QEMU_AIO_WRITE:
> > -            ret = handle_aiocb_rw(aiocb);
> > -            break;
> > -        case QEMU_AIO_FLUSH:
> > -            ret = handle_aiocb_flush(aiocb);
> > -            break;
> > -        case QEMU_AIO_IOCTL:
> > -            ret = handle_aiocb_ioctl(aiocb);
> > -            break;
> > -        default:
> > -            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> > -            ret = -EINVAL;
> > -            break;
> > -        }
> > -
> > -        mutex_lock(&lock);
> > -        aiocb->ret = ret;
> > -        idle_threads++;
> > -        mutex_unlock(&lock);
> > -
> > -        if (kill(pid, aiocb->ev_signo)) die("kill failed");
> > +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> > +    case QEMU_AIO_READ:
> > +    case QEMU_AIO_WRITE:
> > +        ret = handle_aiocb_rw(aiocb);
> > +        break;
> > +    case QEMU_AIO_FLUSH:
> > +        ret = handle_aiocb_flush(aiocb);
> > +        break;
> > +    case QEMU_AIO_IOCTL:
> > +        ret = handle_aiocb_ioctl(aiocb);
> > +        break;
> > +    default:
> > +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> > +        ret = -EINVAL;
> > +        break;
> >     }
> >
> > -    idle_threads--;
> > -    cur_threads--;
> > -    mutex_unlock(&lock);
> > +    aiocb->ret = ret;
> >
> > -    return NULL;
> > -}
> > -
> > -static void spawn_thread(void)
> > -{
> > -    sigset_t set, oldset;
> > -
> > -    cur_threads++;
> > -    idle_threads++;
> > -
> > -    /* block all signals */
> > -    if (sigfillset(&set)) die("sigfillset");
> > -    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> > -
> > -    thread_create(&thread_id, &attr, aio_thread, NULL);
> > -
> > -    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> > +    if (kill(pid, aiocb->ev_signo)) die("kill failed");
> >  }
> >
> >  static void qemu_paio_submit(struct qemu_paiocb *aiocb)
> >  {
> >     aiocb->ret = -EINPROGRESS;
> >     aiocb->active = 0;
> > -    mutex_lock(&lock);
> > -    if (idle_threads == 0 && cur_threads < max_threads)
> > -        spawn_thread();
> > -    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> > -    mutex_unlock(&lock);
> > -    cond_signal(&cond);
> > +
> > +    aiocb->work.func = aio_thread;
> > +    submit_threadletwork(&aiocb->work);
> >  }
> >
> >  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
> >  {
> >     ssize_t ret;
> >
> > -    mutex_lock(&lock);
> >     ret = aiocb->ret;
> > -    mutex_unlock(&lock);
> > -
> >     return ret;
> >  }
> >
> > @@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
> >     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> >     int active = 0;
> >
> > -    mutex_lock(&lock);
> >     if (!acb->active) {
> 
> I'm not sure the active field serves any purpose.  No memory barriers
> are used so the value of active is 0 before the work is executed and 0
> *or* 1 while the work is executed.
> 
> The cancel_threadletwork() function already indicates whether
> cancellation succeeded.  Why not just try to cancel instead of using
> the active field?
> 

This series does not touch the active field anywhere. So I feel we can
implement this as a separate patch instead of clubbing it with this.

-arun

> > -        QTAILQ_REMOVE(&request_list, acb, node);
> > -        acb->ret = -ECANCELED;
> > +        if (!cancel_threadletwork(&acb->work))
> > +            acb->ret = -ECANCELED;
> > +         else
> > +            active = 1;
> 
> The 0 and 1 return value from cancel_threadletwork() is inverted.  See
> also my comment on patch 1/3 in this series.
> 
> >     } else if (acb->ret == -EINPROGRESS) {
> >         active = 1;
> >     }
> > -    mutex_unlock(&lock);
> >
> >     if (active) {
> >         /* fail safe: if the aio could not be canceled, we wait for
> 
> while (qemu_paio_error(acb) == EINPROGRESS)
>     ;
> 
> Tight loop with no memory barrier reading a memory location that is
> updated by another thread.  We shouldn't communicate between threads
> without barriers.
> 
> Stefan
> 

^ permalink raw reply	[flat|nested] 29+ messages in thread

* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
  2010-10-21  8:40     ` Arun R Bharadwaj
@ 2010-10-21  9:17       ` Stefan Hajnoczi
  0 siblings, 0 replies; 29+ messages in thread
From: Stefan Hajnoczi @ 2010-10-21  9:17 UTC (permalink / raw)
  To: arun; +Cc: qemu-devel

On Thu, Oct 21, 2010 at 9:40 AM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> * Stefan Hajnoczi <stefanha@gmail.com> [2010-10-20 10:30:38]:
>
>> On Tue, Oct 19, 2010 at 6:43 PM, Arun R Bharadwaj
>> <arun@linux.vnet.ibm.com> wrote:
>> > From: Gautham R Shenoy <ego@in.ibm.com>
>> >
>> > This patch makes the paio subsystem use the threadlet framework, thereby
>> > decoupling the asynchronous threading framework from posix-aio-compat.c.
>> >
>> > The patch has been tested with fstress.
>> >
>> > Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
>> > Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
>> > ---
>> >  posix-aio-compat.c |  166 +++++++++-------------------------------------------
>> >  1 files changed, 30 insertions(+), 136 deletions(-)
>> >
>> > diff --git a/posix-aio-compat.c b/posix-aio-compat.c
>> > index 7b862b5..6977c18 100644
>> > --- a/posix-aio-compat.c
>> > +++ b/posix-aio-compat.c
>> > @@ -29,6 +29,7 @@
>> >  #include "block_int.h"
>> >
>> >  #include "block/raw-posix-aio.h"
>> > +#include "qemu-threadlets.h"
>> >
>> >
>> >  struct qemu_paiocb {
>> > @@ -51,6 +52,7 @@ struct qemu_paiocb {
>> >     struct qemu_paiocb *next;
>> >
>> >     int async_context_id;
>> > +    ThreadletWork work;
>>
>> The QTAILQ_ENTRY(qemu_paiocb) node field is no longer used, please remove it.
>>
>> >  };
>> >
>> >  typedef struct PosixAioState {
>> > @@ -59,15 +61,6 @@ typedef struct PosixAioState {
>> >  } PosixAioState;
>> >
>> >
>> > -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
>> > -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
>> > -static pthread_t thread_id;
>> > -static pthread_attr_t attr;
>> > -static int max_threads = 64;
>> > -static int cur_threads = 0;
>> > -static int idle_threads = 0;
>> > -static QTAILQ_HEAD(, qemu_paiocb) request_list;
>> > -
>> >  #ifdef CONFIG_PREADV
>> >  static int preadv_present = 1;
>> >  #else
>> > @@ -85,39 +78,6 @@ static void die(const char *what)
>> >     die2(errno, what);
>> >  }
>> >
>> > -static void mutex_lock(pthread_mutex_t *mutex)
>> > -{
>> > -    int ret = pthread_mutex_lock(mutex);
>> > -    if (ret) die2(ret, "pthread_mutex_lock");
>> > -}
>> > -
>> > -static void mutex_unlock(pthread_mutex_t *mutex)
>> > -{
>> > -    int ret = pthread_mutex_unlock(mutex);
>> > -    if (ret) die2(ret, "pthread_mutex_unlock");
>> > -}
>> > -
>> > -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
>> > -                           struct timespec *ts)
>> > -{
>> > -    int ret = pthread_cond_timedwait(cond, mutex, ts);
>> > -    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
>> > -    return ret;
>> > -}
>> > -
>> > -static void cond_signal(pthread_cond_t *cond)
>> > -{
>> > -    int ret = pthread_cond_signal(cond);
>> > -    if (ret) die2(ret, "pthread_cond_signal");
>> > -}
>> > -
>> > -static void thread_create(pthread_t *thread, pthread_attr_t *attr,
>> > -                          void *(*start_routine)(void*), void *arg)
>> > -{
>> > -    int ret = pthread_create(thread, attr, start_routine, arg);
>> > -    if (ret) die2(ret, "pthread_create");
>> > -}
>> > -
>> >  static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
>> >  {
>> >     int ret;
>> > @@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>> >     return nbytes;
>> >  }
>> >
>> > -static void *aio_thread(void *unused)
>> > +static void aio_thread(ThreadletWork *work)
>> >  {
>> >     pid_t pid;
>> > +    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
>> > +    ssize_t ret = 0;
>> >
>> >     pid = getpid();
>> > +    aiocb->active = 1;
>> >
>> > -    while (1) {
>> > -        struct qemu_paiocb *aiocb;
>> > -        ssize_t ret = 0;
>> > -        qemu_timeval tv;
>> > -        struct timespec ts;
>> > -
>> > -        qemu_gettimeofday(&tv);
>> > -        ts.tv_sec = tv.tv_sec + 10;
>> > -        ts.tv_nsec = 0;
>> > -
>> > -        mutex_lock(&lock);
>> > -
>> > -        while (QTAILQ_EMPTY(&request_list) &&
>> > -               !(ret == ETIMEDOUT)) {
>> > -            ret = cond_timedwait(&cond, &lock, &ts);
>> > -        }
>> > -
>> > -        if (QTAILQ_EMPTY(&request_list))
>> > -            break;
>> > -
>> > -        aiocb = QTAILQ_FIRST(&request_list);
>> > -        QTAILQ_REMOVE(&request_list, aiocb, node);
>> > -        aiocb->active = 1;
>> > -        idle_threads--;
>> > -        mutex_unlock(&lock);
>> > -
>> > -        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
>> > -        case QEMU_AIO_READ:
>> > -        case QEMU_AIO_WRITE:
>> > -            ret = handle_aiocb_rw(aiocb);
>> > -            break;
>> > -        case QEMU_AIO_FLUSH:
>> > -            ret = handle_aiocb_flush(aiocb);
>> > -            break;
>> > -        case QEMU_AIO_IOCTL:
>> > -            ret = handle_aiocb_ioctl(aiocb);
>> > -            break;
>> > -        default:
>> > -            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
>> > -            ret = -EINVAL;
>> > -            break;
>> > -        }
>> > -
>> > -        mutex_lock(&lock);
>> > -        aiocb->ret = ret;
>> > -        idle_threads++;
>> > -        mutex_unlock(&lock);
>> > -
>> > -        if (kill(pid, aiocb->ev_signo)) die("kill failed");
>> > +    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
>> > +    case QEMU_AIO_READ:
>> > +    case QEMU_AIO_WRITE:
>> > +        ret = handle_aiocb_rw(aiocb);
>> > +        break;
>> > +    case QEMU_AIO_FLUSH:
>> > +        ret = handle_aiocb_flush(aiocb);
>> > +        break;
>> > +    case QEMU_AIO_IOCTL:
>> > +        ret = handle_aiocb_ioctl(aiocb);
>> > +        break;
>> > +    default:
>> > +        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
>> > +        ret = -EINVAL;
>> > +        break;
>> >     }
>> >
>> > -    idle_threads--;
>> > -    cur_threads--;
>> > -    mutex_unlock(&lock);
>> > +    aiocb->ret = ret;
>> >
>> > -    return NULL;
>> > -}
>> > -
>> > -static void spawn_thread(void)
>> > -{
>> > -    sigset_t set, oldset;
>> > -
>> > -    cur_threads++;
>> > -    idle_threads++;
>> > -
>> > -    /* block all signals */
>> > -    if (sigfillset(&set)) die("sigfillset");
>> > -    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
>> > -
>> > -    thread_create(&thread_id, &attr, aio_thread, NULL);
>> > -
>> > -    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
>> > +    if (kill(pid, aiocb->ev_signo)) die("kill failed");
>> >  }
>> >
>> >  static void qemu_paio_submit(struct qemu_paiocb *aiocb)
>> >  {
>> >     aiocb->ret = -EINPROGRESS;
>> >     aiocb->active = 0;
>> > -    mutex_lock(&lock);
>> > -    if (idle_threads == 0 && cur_threads < max_threads)
>> > -        spawn_thread();
>> > -    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
>> > -    mutex_unlock(&lock);
>> > -    cond_signal(&cond);
>> > +
>> > +    aiocb->work.func = aio_thread;
>> > +    submit_threadletwork(&aiocb->work);
>> >  }
>> >
>> >  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>> >  {
>> >     ssize_t ret;
>> >
>> > -    mutex_lock(&lock);
>> >     ret = aiocb->ret;
>> > -    mutex_unlock(&lock);
>> > -
>> >     return ret;
>> >  }
>> >
>> > @@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>> >     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>> >     int active = 0;
>> >
>> > -    mutex_lock(&lock);
>> >     if (!acb->active) {
>>
>> I'm not sure the active field serves any purpose.  No memory barriers
>> are used so the value of active is 0 before the work is executed and 0
>> *or* 1 while the work is executed.
>>
>> The cancel_threadletwork() function already indicates whether
>> cancellation succeeded.  Why not just try to cancel instead of using
>> the active field?
>>
>
> This series does not touch the active field anywhere. So I feel we can
> implement this as a separate patch instead of clubbing it with this.

I'd prefer for this to be addressed in this patch because the active
field served a function before but no longer works with threadlets.
You're right that the patch doesn't touch it, and QEMU still compiles
fine with it, but it's still broken as I described in the previous
email.

In other words, the patch breaks the active field, please fix it.

Stefan
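
The suggestion made earlier in this thread, to attempt the cancellation and
trust its result instead of consulting a separate active flag, can be
sketched as follows. This is a minimal illustration under stated assumptions,
not the threadlet implementation: demo_queue, demo_work, demo_queue_init(),
demo_submit(), demo_take() and demo_cancel() are hypothetical names, and the
return convention (true means the work was removed before a worker picked it
up) is chosen for this sketch only.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical work item; stands in for ThreadletWork in this sketch. */
struct demo_work {
    struct demo_work *next;
};

/* Hypothetical queue: a singly linked list protected by one mutex. */
struct demo_queue {
    pthread_mutex_t lock;
    struct demo_work *head;
};

static void demo_queue_init(struct demo_queue *q)
{
    int rc = pthread_mutex_init(&q->lock, NULL);
    assert(rc == 0);
    q->head = NULL;
}

/* Submitter side: push at the head (ordering is irrelevant to the sketch). */
static void demo_submit(struct demo_queue *q, struct demo_work *w)
{
    pthread_mutex_lock(&q->lock);
    w->next = q->head;
    q->head = w;
    pthread_mutex_unlock(&q->lock);
}

/* Worker side: take one item off the queue, or NULL if it is empty. */
static struct demo_work *demo_take(struct demo_queue *q)
{
    struct demo_work *w;

    pthread_mutex_lock(&q->lock);
    w = q->head;
    if (w) {
        q->head = w->next;
    }
    pthread_mutex_unlock(&q->lock);
    return w;
}

/*
 * Canceller side: returns true if w was still queued and has been removed,
 * false if it was not found (a worker already took it, or it was never
 * queued). The lock decides the race with demo_take().
 */
static bool demo_cancel(struct demo_queue *q, struct demo_work *w)
{
    struct demo_work **pp;
    bool removed = false;

    pthread_mutex_lock(&q->lock);
    for (pp = &q->head; *pp; pp = &(*pp)->next) {
        if (*pp == w) {
            *pp = w->next;
            removed = true;
            break;
        }
    }
    pthread_mutex_unlock(&q->lock);
    return removed;
}
```

Because the queue lock alone decides whether demo_take() or demo_cancel()
wins, the caller needs no extra flag: a false return means a worker may
already own the item, and the caller then has to wait for it to complete.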

^ permalink raw reply	[flat|nested] 29+ messages in thread

* [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
  2010-10-21 12:10 [Qemu-devel] [PATCH 0/3]: v7: Threadlets: A generic task offloading framework Arun R Bharadwaj
@ 2010-10-21 12:10 ` Arun R Bharadwaj
  2010-10-23 11:57   ` Stefan Hajnoczi
  0 siblings, 1 reply; 29+ messages in thread
From: Arun R Bharadwaj @ 2010-10-21 12:10 UTC (permalink / raw)
  To: qemu-devel

From: Gautham R Shenoy <ego@in.ibm.com>

This patch makes the paio subsystem use the threadlet framework, thereby
decoupling the asynchronous threading framework from posix-aio-compat.c.

The patch has been tested with fstress.

Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Acked-by: Balbir Singh <balbir@linux.vnet.ibm.com>
---
 posix-aio-compat.c |  170 ++++++++++------------------------------------------
 1 files changed, 33 insertions(+), 137 deletions(-)

diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..2e47736 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,6 +29,7 @@
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
+#include "qemu-threadlets.h"
 
 
 struct qemu_paiocb {
@@ -44,13 +45,13 @@ struct qemu_paiocb {
     int ev_signo;
     off_t aio_offset;
 
-    QTAILQ_ENTRY(qemu_paiocb) node;
     int aio_type;
     ssize_t ret;
     int active;
     struct qemu_paiocb *next;
 
     int async_context_id;
+    ThreadletWork work;
 };
 
 typedef struct PosixAioState {
@@ -59,15 +60,6 @@ typedef struct PosixAioState {
 } PosixAioState;
 
 
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
 #else
@@ -85,39 +77,6 @@ static void die(const char *what)
     die2(errno, what);
 }
 
-static void mutex_lock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
-{
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
-    return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
-{
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
-}
-
 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
 {
     int ret;
@@ -301,106 +260,53 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
 {
     pid_t pid;
+    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+    ssize_t ret = 0;
 
     pid = getpid();
+    aiocb->active = 1;
 
-    while (1) {
-        struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
-
-        mutex_lock(&lock);
-
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            ret = cond_timedwait(&cond, &lock, &ts);
-        }
-
-        if (QTAILQ_EMPTY(&request_list))
-            break;
-
-        aiocb = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->active = 1;
-        idle_threads--;
-        mutex_unlock(&lock);
-
-        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-        case QEMU_AIO_READ:
-        case QEMU_AIO_WRITE:
-            ret = handle_aiocb_rw(aiocb);
-            break;
-        case QEMU_AIO_FLUSH:
-            ret = handle_aiocb_flush(aiocb);
-            break;
-        case QEMU_AIO_IOCTL:
-            ret = handle_aiocb_ioctl(aiocb);
-            break;
-        default:
-            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
-            ret = -EINVAL;
-            break;
-        }
-
-        mutex_lock(&lock);
-        aiocb->ret = ret;
-        idle_threads++;
-        mutex_unlock(&lock);
-
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+    case QEMU_AIO_WRITE:
+        ret = handle_aiocb_rw(aiocb);
+        break;
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
+        ret = handle_aiocb_ioctl(aiocb);
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
+        break;
     }
 
-    idle_threads--;
-    cur_threads--;
-    mutex_unlock(&lock);
-
-    return NULL;
-}
+    aiocb->ret = ret;
 
-static void spawn_thread(void)
-{
-    sigset_t set, oldset;
-
-    cur_threads++;
-    idle_threads++;
-
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+    if (kill(pid, aiocb->ev_signo)) {
+        die("kill failed");
+    }
 }
 
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
     aiocb->ret = -EINPROGRESS;
     aiocb->active = 0;
-    mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
+
+    aiocb->work.func = aio_thread;
+    submit_threadletwork(&aiocb->work);
 }
 
 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 {
     ssize_t ret;
 
-    mutex_lock(&lock);
     ret = aiocb->ret;
-    mutex_unlock(&lock);
-
     return ret;
 }
 
@@ -536,14 +442,15 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
     int active = 0;
 
-    mutex_lock(&lock);
     if (!acb->active) {
-        QTAILQ_REMOVE(&request_list, acb, node);
-        acb->ret = -ECANCELED;
+        if (!deque_threadletwork(&acb->work)) {
+            acb->ret = -ECANCELED;
+         } else {
+            active = 1;
+         }
     } else if (acb->ret == -EINPROGRESS) {
         active = 1;
     }
-    mutex_unlock(&lock);
 
     if (active) {
         /* fail safe: if the aio could not be canceled, we wait for
@@ -618,7 +525,6 @@ int paio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    int ret;
 
     if (posix_aio_state)
         return 0;
@@ -645,16 +551,6 @@ int paio_init(void)
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
         posix_aio_process_queue, s);
 
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
-
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
-
-    QTAILQ_INIT(&request_list);
-
     posix_aio_state = s;
     return 0;
 }

^ permalink raw reply related	[flat|nested] 29+ messages in thread

* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
  2010-10-21 12:10 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
@ 2010-10-23 11:57   ` Stefan Hajnoczi
  0 siblings, 0 replies; 29+ messages in thread
From: Stefan Hajnoczi @ 2010-10-23 11:57 UTC (permalink / raw)
  To: Arun R Bharadwaj; +Cc: qemu-devel

On Thu, Oct 21, 2010 at 1:10 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
>  static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>  {
>     ssize_t ret;
>
> -    mutex_lock(&lock);
>     ret = aiocb->ret;
> -    mutex_unlock(&lock);
> -
>     return ret;
>  }
>
> @@ -536,14 +442,15 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>     int active = 0;
>
> -    mutex_lock(&lock);
>     if (!acb->active) {
> -        QTAILQ_REMOVE(&request_list, acb, node);
> -        acb->ret = -ECANCELED;
> +        if (!deque_threadletwork(&acb->work)) {
> +            acb->ret = -ECANCELED;
> +         } else {
> +            active = 1;
> +         }
>     } else if (acb->ret == -EINPROGRESS) {
>         active = 1;
>     }
> -    mutex_unlock(&lock);
>
>     if (active) {
>         /* fail safe: if the aio could not be canceled, we wait for

Here is the assembly listing of what happens next:

 454:posix-aio-compat.c ****         while (qemu_paio_error(acb) == EINPROGRESS)
 539 0347 48F7D8   		negq	%rax
 540 034a 83F873   		cmpl	$115, %eax
 541 034d 7581     		jne	.L46
 543              	.L58:
 545 0350 EBFD     		jmp	.L58

This while loop is an infinite loop.  The compiler doesn't need to
load acb->ret from memory.

The reason this loop worked before threadlets was because
qemu_paio_return() used to acquire a lock to access acb->ret, forcing
the return value to be loaded each iteration:

static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
    ssize_t ret;

    mutex_lock(&lock);
    ret = aiocb->ret;
    mutex_unlock(&lock);

    return ret;
}

Please use synchronization to wait on the request like Anthony suggested.

Stefan
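
One way to wait for an in-flight request without spinning is a mutex paired
with a condition variable. The sketch below is an illustration under stated
assumptions, not the fix that the series adopted: struct demo_request and the
demo_request_*() helpers are hypothetical stand-ins for struct qemu_paiocb
and its accessors, and demo_worker() merely simulates a worker thread
finishing a request with result 42.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>

/* Hypothetical request; only the fields this sketch needs. */
struct demo_request {
    pthread_mutex_t lock;
    pthread_cond_t done;
    ssize_t ret;            /* -EINPROGRESS until the worker finishes */
};

static void demo_request_init(struct demo_request *req)
{
    int rc;

    rc = pthread_mutex_init(&req->lock, NULL);
    assert(rc == 0);
    rc = pthread_cond_init(&req->done, NULL);
    assert(rc == 0);
    req->ret = -EINPROGRESS;
}

/* Worker side: publish the result under the lock and wake any waiter. */
static void demo_request_complete(struct demo_request *req, ssize_t ret)
{
    pthread_mutex_lock(&req->lock);
    req->ret = ret;
    pthread_cond_broadcast(&req->done);
    pthread_mutex_unlock(&req->lock);
}

/*
 * Waiter side: sleep until the result is published. The field is only read
 * while holding the lock, so it is re-read after every wakeup, and the
 * thread blocks instead of burning CPU in a tight loop.
 */
static ssize_t demo_request_wait(struct demo_request *req)
{
    ssize_t ret;

    pthread_mutex_lock(&req->lock);
    while (req->ret == -EINPROGRESS) {
        pthread_cond_wait(&req->done, &req->lock);
    }
    ret = req->ret;
    pthread_mutex_unlock(&req->lock);
    return ret;
}

/* Simulated worker thread: completes the request with result 42. */
static void *demo_worker(void *opaque)
{
    demo_request_complete(opaque, 42);
    return NULL;
}
```

The while loop around pthread_cond_wait() handles both spurious wakeups and
the case where the worker finishes before the waiter takes the lock.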

^ permalink raw reply	[flat|nested] 29+ messages in thread

* [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
  2010-10-26 14:14 [Qemu-devel] v8: [PATCH 0/3] Threadlets: A generic task offloading framework Arun R Bharadwaj
@ 2010-10-26 14:14 ` Arun R Bharadwaj
  2010-10-27  9:17   ` Stefan Hajnoczi
  0 siblings, 1 reply; 29+ messages in thread
From: Arun R Bharadwaj @ 2010-10-26 14:14 UTC (permalink / raw)
  To: qemu-devel

From: Gautham R Shenoy <ego@in.ibm.com>

This patch makes the paio subsystem use the threadlet framework, thereby
decoupling the asynchronous threading framework from posix-aio-compat.c.

The patch has been tested with fstress.

Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Acked-by: Balbir Singh <balbir@linux.vnet.ibm.com>
---
 posix-aio-compat.c |  170 ++++++++++------------------------------------------
 1 files changed, 33 insertions(+), 137 deletions(-)

diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..2e47736 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,6 +29,7 @@
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
+#include "qemu-threadlets.h"
 
 
 struct qemu_paiocb {
@@ -44,13 +45,13 @@ struct qemu_paiocb {
     int ev_signo;
     off_t aio_offset;
 
-    QTAILQ_ENTRY(qemu_paiocb) node;
     int aio_type;
     ssize_t ret;
     int active;
     struct qemu_paiocb *next;
 
     int async_context_id;
+    ThreadletWork work;
 };
 
 typedef struct PosixAioState {
@@ -59,15 +60,6 @@ typedef struct PosixAioState {
 } PosixAioState;
 
 
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
 #ifdef CONFIG_PREADV
 static int preadv_present = 1;
 #else
@@ -85,39 +77,6 @@ static void die(const char *what)
     die2(errno, what);
 }
 
-static void mutex_lock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
-{
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
-    return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
-{
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
-}
-
 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
 {
     int ret;
@@ -301,106 +260,53 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
 {
     pid_t pid;
+    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+    ssize_t ret = 0;
 
     pid = getpid();
+    aiocb->active = 1;
 
-    while (1) {
-        struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
-
-        mutex_lock(&lock);
-
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            ret = cond_timedwait(&cond, &lock, &ts);
-        }
-
-        if (QTAILQ_EMPTY(&request_list))
-            break;
-
-        aiocb = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->active = 1;
-        idle_threads--;
-        mutex_unlock(&lock);
-
-        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-        case QEMU_AIO_READ:
-        case QEMU_AIO_WRITE:
-            ret = handle_aiocb_rw(aiocb);
-            break;
-        case QEMU_AIO_FLUSH:
-            ret = handle_aiocb_flush(aiocb);
-            break;
-        case QEMU_AIO_IOCTL:
-            ret = handle_aiocb_ioctl(aiocb);
-            break;
-        default:
-            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
-            ret = -EINVAL;
-            break;
-        }
-
-        mutex_lock(&lock);
-        aiocb->ret = ret;
-        idle_threads++;
-        mutex_unlock(&lock);
-
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+    case QEMU_AIO_WRITE:
+        ret = handle_aiocb_rw(aiocb);
+        break;
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
+        ret = handle_aiocb_ioctl(aiocb);
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
+        break;
     }
 
-    idle_threads--;
-    cur_threads--;
-    mutex_unlock(&lock);
-
-    return NULL;
-}
+    aiocb->ret = ret;
 
-static void spawn_thread(void)
-{
-    sigset_t set, oldset;
-
-    cur_threads++;
-    idle_threads++;
-
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+    if (kill(pid, aiocb->ev_signo)) {
+        die("kill failed");
+    }
 }
 
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
     aiocb->ret = -EINPROGRESS;
     aiocb->active = 0;
-    mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
+
+    aiocb->work.func = aio_thread;
+    submit_threadletwork(&aiocb->work);
 }
 
 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 {
     ssize_t ret;
 
-    mutex_lock(&lock);
     ret = aiocb->ret;
-    mutex_unlock(&lock);
-
     return ret;
 }
 
@@ -536,14 +442,15 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
     int active = 0;
 
-    mutex_lock(&lock);
     if (!acb->active) {
-        QTAILQ_REMOVE(&request_list, acb, node);
-        acb->ret = -ECANCELED;
+        if (!deque_threadletwork(&acb->work)) {
+            acb->ret = -ECANCELED;
+         } else {
+            active = 1;
+         }
     } else if (acb->ret == -EINPROGRESS) {
         active = 1;
     }
-    mutex_unlock(&lock);
 
     if (active) {
         /* fail safe: if the aio could not be canceled, we wait for
@@ -618,7 +525,6 @@ int paio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    int ret;
 
     if (posix_aio_state)
         return 0;
@@ -645,16 +551,6 @@ int paio_init(void)
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
         posix_aio_process_queue, s);
 
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
-
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
-
-    QTAILQ_INIT(&request_list);
-
     posix_aio_state = s;
     return 0;
 }


* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
  2010-10-26 14:14 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
@ 2010-10-27  9:17   ` Stefan Hajnoczi
  0 siblings, 0 replies; 29+ messages in thread
From: Stefan Hajnoczi @ 2010-10-27  9:17 UTC (permalink / raw)
  To: Arun R Bharadwaj; +Cc: qemu-devel

On Tue, Oct 26, 2010 at 3:14 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> @@ -536,14 +442,15 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>     int active = 0;
>
> -    mutex_lock(&lock);
>     if (!acb->active) {
> -        QTAILQ_REMOVE(&request_list, acb, node);
> -        acb->ret = -ECANCELED;
> +        if (!deque_threadletwork(&acb->work)) {
> +            acb->ret = -ECANCELED;
> +         } else {
> +            active = 1;
> +         }
>     } else if (acb->ret == -EINPROGRESS) {
>         active = 1;
>     }
> -    mutex_unlock(&lock);
>
>     if (active) {
>         /* fail safe: if the aio could not be canceled, we wait for

The paio_cancel() infinite loop is still there in this patch.

http://www.mail-archive.com/qemu-devel@nongnu.org/msg44766.html

Stefan
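
For illustration only: the cancel path quoted above either removes the request from the queue or, failing that, waits for the worker to finish it. Below is a minimal sketch of that cancel-or-wait pattern, using a mutex and condition variable for the wait. All names (`struct request`, `request_claim`, `request_complete`, `request_cancel`) are hypothetical and are not part of the patch or of QEMU. This is not the fix discussed in the linked message, only one way such a wait can be made to terminate, assuming the worker always reports completion under the same mutex.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical request type; NOT the qemu_paiocb from the patch. */
struct request {
    pthread_mutex_t lock;
    pthread_cond_t done;
    bool queued;   /* still in the queue, no worker owns it yet */
    int ret;       /* -EINPROGRESS until completed or cancelled */
};

static void request_init(struct request *r)
{
    pthread_mutex_init(&r->lock, NULL);
    pthread_cond_init(&r->done, NULL);
    r->queued = true;
    r->ret = -EINPROGRESS;
}

/* Worker side: take ownership. Returns false if already cancelled. */
static bool request_claim(struct request *r)
{
    bool claimed;

    pthread_mutex_lock(&r->lock);
    claimed = r->queued;
    r->queued = false;
    pthread_mutex_unlock(&r->lock);
    return claimed;
}

/* Worker side: publish the result and wake any waiting canceller. */
static void request_complete(struct request *r, int ret)
{
    assert(ret != -EINPROGRESS);   /* a final status is required */

    pthread_mutex_lock(&r->lock);
    r->ret = ret;
    pthread_cond_broadcast(&r->done);
    pthread_mutex_unlock(&r->lock);
}

/* Canceller side: dequeue if possible, otherwise sleep until done. */
static int request_cancel(struct request *r)
{
    int ret;

    pthread_mutex_lock(&r->lock);
    if (r->queued) {
        /* No worker has claimed it: cancel immediately. */
        r->queued = false;
        r->ret = -ECANCELED;
    } else {
        /* A worker owns it: wait for the status set under the lock. */
        while (r->ret == -EINPROGRESS) {
            pthread_cond_wait(&r->done, &r->lock);
        }
    }
    ret = r->ret;
    pthread_mutex_unlock(&r->lock);
    return ret;
}
```

Because the queued flag and the result are both read and written under one lock, the canceller cannot miss the completion, and it sleeps rather than spins while the worker is busy.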



Thread overview: 29+ messages
2010-10-13 15:30 [Qemu-devel] v5 [PATCH 0/3] qemu: Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-10-13 15:31 ` [Qemu-devel] [PATCH 1/3] Introduce threadlets Arun R Bharadwaj
2010-10-14  9:02   ` Stefan Hajnoczi
2010-10-14 21:17     ` Venkateswararao Jujjuri (JV)
2010-10-15  9:52       ` Stefan Hajnoczi
2010-10-15 14:56         ` Venkateswararao Jujjuri (JV)
2010-10-15 14:42     ` [Qemu-devel] " Paolo Bonzini
2010-10-14  9:15   ` [Qemu-devel] " Stefan Hajnoczi
2010-10-14  9:19     ` Gleb Natapov
2010-10-14 16:16     ` Avi Kivity
2010-10-14 21:32       ` Venkateswararao Jujjuri (JV)
2010-10-17  8:57         ` Avi Kivity
2010-10-18 10:47           ` Arun R Bharadwaj
2010-10-18 12:29             ` Avi Kivity
2010-10-15  8:05       ` Stefan Hajnoczi
2010-10-13 15:31 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
2010-10-13 15:31 ` [Qemu-devel] [PATCH 3/3] Add helper functions for virtio-9p to " Arun R Bharadwaj
  -- strict thread matches above, loose matches on Subject: below --
2010-10-13 16:44 [Qemu-devel] [PATCH 0/3]: Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-10-13 16:48 ` [Qemu-devel] [PATCH 2/3]: Make paio subsystem use threadlets Arun R Bharadwaj
2010-10-19 17:42 [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-10-19 17:43 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
2010-10-20  2:24   ` Balbir Singh
2010-10-20  8:42   ` Kevin Wolf
2010-10-20  9:30   ` Stefan Hajnoczi
2010-10-20 13:16     ` Anthony Liguori
2010-10-21  8:40     ` Arun R Bharadwaj
2010-10-21  9:17       ` Stefan Hajnoczi
2010-10-21 12:10 [Qemu-devel] [PATCH 0/3]: v7: Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-10-21 12:10 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
2010-10-23 11:57   ` Stefan Hajnoczi
2010-10-26 14:14 [Qemu-devel] v8: [PATCH 0/3] Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-10-26 14:14 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
2010-10-27  9:17   ` Stefan Hajnoczi
