* [Qemu-devel] [PATCH 1/3] Introduce threadlets
2010-10-19 17:42 [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework Arun R Bharadwaj
@ 2010-10-19 17:42 ` Arun R Bharadwaj
2010-10-19 18:36 ` Balbir Singh
2010-10-20 8:16 ` Stefan Hajnoczi
2010-10-19 17:43 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
` (2 subsequent siblings)
3 siblings, 2 replies; 38+ messages in thread
From: Arun R Bharadwaj @ 2010-10-19 17:42 UTC (permalink / raw)
To: qemu-devel
From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
This patch creates a generic asynchronous-task-offloading infrastructure named
threadlets. The core idea is borrowed from the threading framework used by
paio.
The reason for creating this generic infrastructure is that other subsystems,
such as virtio-9p, can use it to offload tasks that could block.
The patch creates a global queue onto which subsystems can queue tasks to be
executed asynchronously.
The patch also provides APIs that allow a subsystem to create a private queue
with an associated pool of threads.
[ego@in.ibm.com: Facelift of the code, Documentation, cancel_threadlet
and other helpers]
Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
Makefile.objs | 3 +
docs/async-support.txt | 141 +++++++++++++++++++++++++++++++++++++++++
qemu-threadlets.c | 165 ++++++++++++++++++++++++++++++++++++++++++++++++
qemu-threadlets.h | 48 ++++++++++++++
4 files changed, 356 insertions(+), 1 deletions(-)
create mode 100644 docs/async-support.txt
create mode 100644 qemu-threadlets.c
create mode 100644 qemu-threadlets.h
diff --git a/Makefile.objs b/Makefile.objs
index cd5a24b..2cf8aba 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,8 @@ qobject-obj-y += qerror.o
block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-$(CONFIG_POSIX) += qemu-thread.o
+block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
@@ -124,7 +126,6 @@ endif
common-obj-y += $(addprefix ui/, $(ui-obj-y))
common-obj-y += iov.o acl.o
-common-obj-$(CONFIG_THREAD) += qemu-thread.o
common-obj-y += notify.o event_notifier.o
common-obj-y += qemu-timer.o
diff --git a/docs/async-support.txt b/docs/async-support.txt
new file mode 100644
index 0000000..9f22b9a
--- /dev/null
+++ b/docs/async-support.txt
@@ -0,0 +1,141 @@
+== How to use the threadlets infrastructure in QEMU ==
+
+== Threadlets ==
+
+Q.1: What are threadlets ?
+A.1: Threadlets is an infrastructure within QEMU that allows other subsystems
+ to offload possibly blocking work to a queue to be processed by a pool
+ of threads asynchronously.
+
+Q.2: When would one want to use threadlets ?
+A.2: Threadlets are useful when there are operations that can be performed
+ outside the context of the VCPU/IO threads, in order to free those
+ threads to service other guest requests.
+
+Q.3: I have some work that can be executed in an asynchronous context. How
+ should I go about it ?
+A.3: One could follow the steps listed below:
+
+ - Define a function which would do the asynchronous work.
+ static void my_threadlet_func(ThreadletWork *work)
+ {
+ }
+
+ - Declare an object of type ThreadletWork;
+ ThreadletWork work;
+
+
+ - Assign a value to the "func" member of the ThreadletWork object.
+ work.func = my_threadlet_func;
+
+ - Submit the threadlet to the global queue.
+ submit_threadletwork(&work);
+
+ - Continue servicing some other guest operations.
+
+Q.4: I want my_threadlet_func to access some non-global data. How do I do
+ that ?
+A.4: Suppose you want my_threadlet_func to access a non-global data object
+ of type MyPrivateData. In that case, one could follow these steps.
+
+ - Define a member of the type ThreadletWork within myPrivateData.
+ typedef struct MyPrivateData {
+ ...;
+ ...;
+ ...;
+ ThreadletWork work;
+ } MyPrivateData;
+
+ MyPrivateData my_data;
+
+ - Initialize my_data.work as described in A.3
+ my_data.work.func = my_threadlet_func;
+ submit_threadletwork(&my_data.work);
+
+ - Access the my_data object inside my_threadlet_func() using the
+ container_of primitive
+ static void my_threadlet_func(ThreadletWork *work)
+ {
+ MyPrivateData *mydata_ptr;
+ mydata_ptr = container_of(work, MyPrivateData, work);
+
+ /* mydata_ptr now points to the my_data object */
+ }
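+
+ Note: container_of is the usual pointer-arithmetic macro, roughly
+ (type *)((char *)(ptr) - offsetof(type, member)); it recovers a pointer
+ to the enclosing structure from a pointer to one of its members.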
+
+Q.5: Are there any precautions one must take while sharing data with the
+ asynchronous thread pool ?
+A.5: Yes. Make sure that a helper function such as my_threadlet_func()
+ does not access or modify data that can simultaneously be accessed or
+ modified in the context of the VCPU thread or the I/O thread. This is
+ because the asynchronous threads in the pool can run in parallel with
+ the VCPU/IO threads, as shown in the figure below.
+
+ A typical workflow is as follows:
+
+ VCPU/IOThread
+ |
+ | (1)
+ |
+ V
+ Offload work (2)
+ |-------> to threadlets -----------------------------> Helper thread
+ | | |
+ | | |
+ | | (3) | (4)
+ | | |
+ | Handle other Guest requests |
+ | | |
+ | | V
+ | | (3) Signal the I/O Thread
+ |(6) | |
+ | | /
+ | | /
+ | V /
+ | Do the post <---------------------------------/
+ | processing (5)
+ | |
+ | | (6)
+ | V
+ |-Yes------ More async work?
+ |
+ | (7)
+ No
+ |
+ |
+ .
+ .
+
+ Hence one needs to make sure that in steps (3) and (4), which run in
+ parallel, any global data is accessed in only one context.
+
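+ For illustration only, a threadlet could hand its result back to the
+ I/O thread by writing to a pre-opened pipe whose read end is watched by
+ the I/O thread. This is just a sketch; the pipe set-up (done_fd) and
+ the read handler on the I/O thread are assumed to exist elsewhere:
+
+ static int done_fd[2]; /* hypothetical notification pipe */
+
+ static void my_threadlet_func(ThreadletWork *work)
+ {
+ MyPrivateData *data = container_of(work, MyPrivateData, work);
+
+ do_blocking_work(data); /* step (4): runs in a helper thread */
+
+ /* wake the I/O thread so it can do the post-processing (5) */
+ if (write(done_fd[1], &data, sizeof(data)) != sizeof(data)) {
+ /* error handling elided in this sketch */
+ }
+ }
+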
+Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
+A.6: The threadlets framework provides the API cancel_threadletwork:
+ - int cancel_threadletwork(ThreadletWork *work)
+
+ The API scans the ThreadletQueue to see if (work) is present. If it finds
+ (work), it dequeues it and returns 1.
+
+ On the other hand, if it does not find (work) in the ThreadletQueue,
+ it returns 0. This can imply one of two things: either the work is
+ currently being processed by one of the helper threads, or it has
+ already been processed. The threadlet infrastructure currently
+ _does_not_ distinguish between these two cases; the onus is on the
+ caller to do that.
+
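+ For example, a caller (hypothetical names) could use the return value as:
+
+ if (cancel_threadletwork(&my_data.work)) {
+ /* dequeued before it ran; my_data may be reused or freed */
+ } else {
+ /* already running or finished; wait for the completion
+ * notification before touching my_data */
+ }
+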
+Q.7: Apart from the global pool of threads, can I have my own private queue ?
+A.7: Yes, the threadlets framework allows subsystems to create their own private
+ queues with associated pools of threads.
+
+ - Define a private queue
+ ThreadletQueue myQueue;
+
+ - Initialize it:
+ threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
+ where my_max_threads is the maximum number of threads that can be in the
+ thread pool and my_min_threads is the minimum number of active threads
+ that can be in the thread pool.
+
+ - Submit work:
+ submit_threadletwork_to_queue(&myQueue, &my_work);
+
+ - Cancel work:
+ cancel_threadletwork_on_queue(&myQueue, &my_work);
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
new file mode 100644
index 0000000..fd33752
--- /dev/null
+++ b/qemu-threadlets.c
@@ -0,0 +1,165 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ * Gautham R Shenoy <ego@in.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ */
+
+#include "qemu-threadlets.h"
+#include "osdep.h"
+
+#define MAX_GLOBAL_THREADS 64
+#define MIN_GLOBAL_THREADS 64
+static ThreadletQueue globalqueue;
+static int globalqueue_init;
+
+static void *threadlet_worker(void *data)
+{
+ ThreadletQueue *queue = data;
+
+ qemu_mutex_lock(&(queue->lock));
+ while (1) {
+ ThreadletWork *work;
+ int ret = 0;
+
+ while (QTAILQ_EMPTY(&(queue->request_list)) &&
+ (ret != ETIMEDOUT)) {
+ ret = qemu_cond_timedwait(&(queue->cond),
+ &(queue->lock), 10*100000);
+ }
+
+ assert(queue->idle_threads != 0);
+ if (QTAILQ_EMPTY(&(queue->request_list))) {
+ if (queue->cur_threads > queue->min_threads) {
+ /* We retain the minimum number of threads */
+ break;
+ }
+ } else {
+ work = QTAILQ_FIRST(&(queue->request_list));
+ QTAILQ_REMOVE(&(queue->request_list), work, node);
+
+ queue->idle_threads--;
+ qemu_mutex_unlock(&(queue->lock));
+
+ /* execute the work function */
+ work->func(work);
+
+ qemu_mutex_lock(&(queue->lock));
+ queue->idle_threads++;
+ }
+ }
+
+ queue->idle_threads--;
+ queue->cur_threads--;
+ qemu_mutex_unlock(&(queue->lock));
+
+ return NULL;
+}
+
+static void spawn_threadlet(ThreadletQueue *queue)
+{
+ QemuThread thread;
+
+ queue->cur_threads++;
+ queue->idle_threads++;
+
+ qemu_thread_create(&thread, threadlet_worker, queue);
+}
+
+/**
+ * submit_threadletwork_to_queue: Submit a new task to a private queue to be
+ * executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ * to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+ qemu_mutex_lock(&(queue->lock));
+ if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+ spawn_threadlet(queue);
+ }
+ QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
+ qemu_mutex_unlock(&(queue->lock));
+ qemu_cond_signal(&(queue->cond));
+}
+
+/**
+ * submit_threadletwork: Submit to the global queue a new task to be executed
+ * asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_threadletwork(ThreadletWork *work)
+{
+ if (unlikely(!globalqueue_init)) {
+ threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+ MIN_GLOBAL_THREADS);
+ globalqueue_init = 1;
+ }
+
+ submit_threadletwork_to_queue(&globalqueue, work);
+}
+
+/**
+ * cancel_threadletwork_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 1 if the task was successfully cancelled (dequeued before
+ * execution started). 0 otherwise.
+ */
+int cancel_threadletwork_on_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+ ThreadletWork *ret_work;
+ int ret = 0;
+
+ qemu_mutex_lock(&(queue->lock));
+ QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
+ if (ret_work == work) {
+ QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
+ ret = 1;
+ break;
+ }
+ }
+ qemu_mutex_unlock(&(queue->lock));
+
+ return ret;
+}
+
+/**
+ * cancel_threadletwork: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 1 if the task was successfully cancelled (dequeued before
+ * execution started). 0 otherwise.
+ */
+int cancel_threadletwork(ThreadletWork *work)
+{
+ return cancel_threadletwork_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+void threadlet_queue_init(ThreadletQueue *queue,
+ int max_threads, int min_threads)
+{
+ queue->cur_threads = 0;
+ queue->idle_threads = 0;
+ queue->max_threads = max_threads;
+ queue->min_threads = min_threads;
+ QTAILQ_INIT(&(queue->request_list));
+ qemu_mutex_init(&(queue->lock));
+ qemu_cond_init(&(queue->cond));
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
new file mode 100644
index 0000000..9c8f9e5
--- /dev/null
+++ b/qemu-threadlets.h
@@ -0,0 +1,48 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ * Gautham R Shenoy <ego@in.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_ASYNC_WORK_H
+#define QEMU_ASYNC_WORK_H
+
+#include "qemu-queue.h"
+#include "qemu-common.h"
+#include "qemu-thread.h"
+
+typedef struct ThreadletQueue
+{
+ QemuMutex lock;
+ QemuCond cond;
+ int max_threads;
+ int min_threads;
+ int cur_threads;
+ int idle_threads;
+ QTAILQ_HEAD(, ThreadletWork) request_list;
+} ThreadletQueue;
+
+typedef struct ThreadletWork
+{
+ QTAILQ_ENTRY(ThreadletWork) node;
+ void (*func)(struct ThreadletWork *work);
+} ThreadletWork;
+
+extern void submit_threadletwork_to_queue(ThreadletQueue *queue,
+ ThreadletWork *work);
+extern void submit_threadletwork(ThreadletWork *work);
+extern int cancel_threadletwork_on_queue(ThreadletQueue *queue,
+ ThreadletWork *work);
+extern int cancel_threadletwork(ThreadletWork *work);
+extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
+ int min_threads);
+#endif
* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
2010-10-19 17:42 ` [Qemu-devel] [PATCH 1/3] Introduce threadlets Arun R Bharadwaj
@ 2010-10-19 18:36 ` Balbir Singh
2010-10-19 19:01 ` [Qemu-devel] " Paolo Bonzini
` (2 more replies)
2010-10-20 8:16 ` Stefan Hajnoczi
1 sibling, 3 replies; 38+ messages in thread
From: Balbir Singh @ 2010-10-19 18:36 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: qemu-devel
* Arun R B <arun@linux.vnet.ibm.com> [2010-10-19 23:12:45]:
> From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
>
> [...]
>
> +static void *threadlet_worker(void *data)
> +{
> + ThreadletQueue *queue = data;
> +
Ideally you need
s = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
But qemu will need to wrap this around as well.
> + qemu_mutex_lock(&(queue->lock));
> + while (1) {
> + ThreadletWork *work;
> + int ret = 0;
> +
> + while (QTAILQ_EMPTY(&(queue->request_list)) &&
> + (ret != ETIMEDOUT)) {
> + ret = qemu_cond_timedwait(&(queue->cond),
> + &(queue->lock), 10*100000);
Ewww... what is 10*100000, can we use something more meaningful
please?
> + }
> +
> + assert(queue->idle_threads != 0);
This assertion holds because we believe one of the idle_threads
actually did the dequeuing, right?
> + if (QTAILQ_EMPTY(&(queue->request_list))) {
> + if (queue->cur_threads > queue->min_threads) {
> + /* We retain the minimum number of threads */
> + break;
> + }
> + } else {
> + work = QTAILQ_FIRST(&(queue->request_list));
> + QTAILQ_REMOVE(&(queue->request_list), work, node);
> +
> + queue->idle_threads--;
> + qemu_mutex_unlock(&(queue->lock));
> +
> + /* execute the work function */
> + work->func(work);
> +
> + qemu_mutex_lock(&(queue->lock));
> + queue->idle_threads++;
> + }
> + }
> +
> + queue->idle_threads--;
> + queue->cur_threads--;
> + qemu_mutex_unlock(&(queue->lock));
> +
> + return NULL;
Does anybody do a join on the exiting thread from the pool?
> +}
> +
> +static void spawn_threadlet(ThreadletQueue *queue)
> +{
> + QemuThread thread;
> +
> + queue->cur_threads++;
> + queue->idle_threads++;
> +
> + qemu_thread_create(&thread, threadlet_worker, queue);
> +}
> +
> +/**
> + * submit_threadletwork_to_queue: Submit a new task to a private queue to be
> + * executed asynchronously.
> + * @queue: Per-subsystem private queue to which the new task needs
> + * to be submitted.
> + * @work: Contains information about the task that needs to be submitted.
> + */
> +void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> +{
> + qemu_mutex_lock(&(queue->lock));
> + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
> + spawn_threadlet(queue);
So we hold queue->lock, spawn the thread, the spawned thread tries to
acquire queue->lock
> + }
> + QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
> + qemu_mutex_unlock(&(queue->lock));
> + qemu_cond_signal(&(queue->cond));
In the case that we just spawned the threadlet, the cond_signal is
spurious. If we need predictable scheduling behaviour,
qemu_cond_signal needs to happen with queue->lock held.
I'd rewrite the function as
/**
* submit_threadletwork_to_queue: Submit a new task to a private queue to be
* executed asynchronously.
* @queue: Per-subsystem private queue to which the new task needs
* to be submitted.
* @work: Contains information about the task that needs to be submitted.
*/
void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
{
qemu_mutex_lock(&(queue->lock));
if (queue->idle_threads == 0 && (queue->cur_threads < queue->max_threads)) {
spawn_threadlet(queue);
} else {
qemu_cond_signal(&(queue->cond));
}
QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
qemu_mutex_unlock(&(queue->lock));
}
> +/**
> + * submit_threadletwork: Submit to the global queue a new task to be executed
> + * asynchronously.
> + * @work: Contains information about the task that needs to be submitted.
> + */
> +void submit_threadletwork(ThreadletWork *work)
> +{
> + if (unlikely(!globalqueue_init)) {
> + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
> + MIN_GLOBAL_THREADS);
> + globalqueue_init = 1;
> + }
What protects globalqueue_init?
> +
> + submit_threadletwork_to_queue(&globalqueue, work);
> +}
> +
> +/**
> + * cancel_threadletwork_on_queue: Cancel a task queued on a Queue.
> + * @queue: The queue containing the task to be cancelled.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 1 if the task was successfully cancelled (dequeued before
> + * execution started). 0 otherwise.
> + */
> +int cancel_threadletwork_on_queue(ThreadletQueue *queue, ThreadletWork *work)
> +{
> + ThreadletWork *ret_work;
> + int ret = 0;
> +
> + qemu_mutex_lock(&(queue->lock));
> + QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
> + if (ret_work == work) {
> + QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
> + ret = 1;
> + break;
> + }
> + }
> + qemu_mutex_unlock(&(queue->lock));
> +
> + return ret;
> +}
> +
> +/**
> + * cancel_threadletwork: Cancel a task queued on the global queue.
NOTE: cancel is a confusing term; thread cancel is different from
cancelling a job on the global queue. I'd preferably call this
dequeue_threadletwork
Generic question: is there a reason to use threadletwork as one word
instead of threadlet_work? Especially since the data structure is
called ThreadletWork.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 1 if the task was successfully cancelled (dequeued before
> + * execution started). 0 otherwise.
> + */
> +int cancel_threadletwork(ThreadletWork *work)
> +{
> + return cancel_threadletwork_on_queue(&globalqueue, work);
> +}
> [...]
--
Three Cheers,
Balbir
* [Qemu-devel] Re: [PATCH 1/3] Introduce threadlets
2010-10-19 18:36 ` Balbir Singh
@ 2010-10-19 19:01 ` Paolo Bonzini
2010-10-19 19:12 ` Balbir Singh
2010-10-19 21:00 ` [Qemu-devel] " Venkateswararao Jujjuri (JV)
2010-10-19 21:36 ` Anthony Liguori
2 siblings, 1 reply; 38+ messages in thread
From: Paolo Bonzini @ 2010-10-19 19:01 UTC (permalink / raw)
To: balbir; +Cc: Arun R Bharadwaj, qemu-devel
On 10/19/2010 08:36 PM, Balbir Singh wrote:
> Ideally you need
>
> s = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
>
> But qemu will need to wrap this around as well.
Why? QEMU is never using thread cancellation.
Paolo
* Re: [Qemu-devel] Re: [PATCH 1/3] Introduce threadlets
2010-10-19 19:01 ` [Qemu-devel] " Paolo Bonzini
@ 2010-10-19 19:12 ` Balbir Singh
2010-10-19 19:29 ` Paolo Bonzini
0 siblings, 1 reply; 38+ messages in thread
From: Balbir Singh @ 2010-10-19 19:12 UTC (permalink / raw)
To: Paolo Bonzini; +Cc: Arun R Bharadwaj, qemu-devel
* Paolo Bonzini <pbonzini@redhat.com> [2010-10-19 21:01:03]:
> On 10/19/2010 08:36 PM, Balbir Singh wrote:
> >Ideally you need
> >
> > s = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
> >
> >But qemu will need to wrap this around as well.
>
> Why? QEMU is never using thread cancellation.
>
Yes, I agree that in the longer run cancellation is a good way to kill
threads, especially in a thread pool. My comment was more along the
lines of good practices and potential use of pthread_cancel(), not a
strict comment on something urgent or broken.
--
Three Cheers,
Balbir
* Re: [Qemu-devel] Re: [PATCH 1/3] Introduce threadlets
2010-10-19 19:12 ` Balbir Singh
@ 2010-10-19 19:29 ` Paolo Bonzini
0 siblings, 0 replies; 38+ messages in thread
From: Paolo Bonzini @ 2010-10-19 19:29 UTC (permalink / raw)
To: balbir; +Cc: Arun R Bharadwaj, qemu-devel
On 10/19/2010 09:12 PM, Balbir Singh wrote:
> > >Ideally you need
> > >
> > > s = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
> > >
> > > But qemu will need to wrap this around as well.
> >
> > Why? QEMU is never using thread cancellation.
>
> Yes, I agree, in the longer run, cancellation is a good way to kill
> threads, specially in a thread pool. My comment was more along the
> lines of good practices and potential use of pthread_cancel(), not a
> strict comment on something urgent or broken.
But there is no such use; as long as we keep ourselves to the
qemu-thread API, we know that nothing will use cancellation.
The day qemu-thread introduces cancellation functions, we'll care
about enabling/disabling it in some threads.
Paolo
* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
2010-10-19 18:36 ` Balbir Singh
2010-10-19 19:01 ` [Qemu-devel] " Paolo Bonzini
@ 2010-10-19 21:00 ` Venkateswararao Jujjuri (JV)
2010-10-20 2:26 ` Balbir Singh
2010-10-19 21:36 ` Anthony Liguori
2 siblings, 1 reply; 38+ messages in thread
From: Venkateswararao Jujjuri (JV) @ 2010-10-19 21:00 UTC (permalink / raw)
To: balbir; +Cc: Arun R Bharadwaj, qemu-devel
On 10/19/2010 11:36 AM, Balbir Singh wrote:
> * Arun R B <arun@linux.vnet.ibm.com> [2010-10-19 23:12:45]:
>
>> [...]
>> +static void *threadlet_worker(void *data)
>> +{
>> + ThreadletQueue *queue = data;
>> +
> Ideally you need
>
> s = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
>
> But qemu will need to wrap this around as well.
>> + qemu_mutex_lock(&(queue->lock));
>> + while (1) {
>> + ThreadletWork *work;
>> + int ret = 0;
>> +
>> + while (QTAILQ_EMPTY(&(queue->request_list)) &&
>> + (ret != ETIMEDOUT)) {
>> + ret = qemu_cond_timedwait(&(queue->cond),
>> + &(queue->lock), 10*100000);
>
> Ewww... what is 10*100000, can we use something more meaningful
> please?
Or at least some comment...
>
>> + }
>> +
>> + assert(queue->idle_threads != 0);
>
> This assertion holds because we believe one of the idle_threads
> actually did the dequeuing, right?
Correct, or there is no work and we were woken up by the timeout.
Of course, in that case the number of worker threads should be min_threads.
>
>> + if (QTAILQ_EMPTY(&(queue->request_list))) {
>> + if (queue->cur_threads > queue->min_threads) {
>> + /* We retain the minimum number of threads */
>> + break;
>> + }
>> + } else {
>> + work = QTAILQ_FIRST(&(queue->request_list));
>> + QTAILQ_REMOVE(&(queue->request_list), work, node);
>> +
>> + queue->idle_threads--;
>> + qemu_mutex_unlock(&(queue->lock));
>> +
>> + /* execute the work function */
>> + work->func(work);
>> +
>> + qemu_mutex_lock(&(queue->lock));
>> + queue->idle_threads++;
>> + }
>> + }
>> +
>> + queue->idle_threads--;
>> + queue->cur_threads--;
>> + qemu_mutex_unlock(&(queue->lock));
>> +
>> + return NULL;
> Does anybody do a join on the exiting thread from the pool?
Not sure what you mean here. We keep min_threads threads around and float
additional threads up to the max limit on an as-needed basis.
>
>> +}
>> +
>> +static void spawn_threadlet(ThreadletQueue *queue)
>> +{
>> + QemuThread thread;
>> +
>> + queue->cur_threads++;
>> + queue->idle_threads++;
>> +
>> + qemu_thread_create(&thread, threadlet_worker, queue);
>
>> +}
>> +
>> +/**
>> + * submit_threadletwork_to_queue: Submit a new task to a private queue to be
>> + * executed asynchronously.
>> + * @queue: Per-subsystem private queue to which the new task needs
>> + * to be submitted.
>> + * @work: Contains information about the task that needs to be submitted.
>> + */
>> +void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
>> +{
>> + qemu_mutex_lock(&(queue->lock));
>> + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
>> + spawn_threadlet(queue);
>
> So we hold queue->lock, spawn the thread, the spawned thread tries to
> acquire queue->lock
>
>> + }
>> + QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
>> + qemu_mutex_unlock(&(queue->lock));
>> + qemu_cond_signal(&(queue->cond));
>
> In the case that we just spawned the threadlet, the cond_signal is
> spurious. If we need predictable scheduling behaviour,
> qemu_cond_signal needs to happen with queue->lock held.
>
> I'd rewrite the function as
>
> /**
> * submit_threadletwork_to_queue: Submit a new task to a private queue to be
> * executed asynchronously.
> * @queue: Per-subsystem private queue to which the new task needs
> * to be submitted.
> * @work: Contains information about the task that needs to be submitted.
> */
> void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> {
> qemu_mutex_lock(&(queue->lock));
> if (queue->idle_threads == 0 && (queue->cur_threads < queue->max_threads)) {
> spawn_threadlet(queue);
> } else {
> qemu_cond_signal(&(queue->cond));
> }
> QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
> qemu_mutex_unlock(&(queue->lock));
> }
>
This looks fine to me; maybe cleaner than the previous one, but functionally
not much different.
>> +/**
>> + * submit_threadletwork: Submit to the global queue a new task to be executed
>> + * asynchronously.
>> + * @work: Contains information about the task that needs to be submitted.
>> + */
>> +void submit_threadletwork(ThreadletWork *work)
>> +{
>> + if (unlikely(!globalqueue_init)) {
>> + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
>> + MIN_GLOBAL_THREADS);
>> + globalqueue_init = 1;
>> + }
>
> What protects globalqueue_init?
It should be called in vCPU thread context which is serialized by definition.
- JV
>> [...]
* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
2010-10-19 21:00 ` [Qemu-devel] " Venkateswararao Jujjuri (JV)
@ 2010-10-20 2:26 ` Balbir Singh
0 siblings, 0 replies; 38+ messages in thread
From: Balbir Singh @ 2010-10-20 2:26 UTC (permalink / raw)
To: Venkateswararao Jujjuri (JV); +Cc: Arun R Bharadwaj, qemu-devel
* Venkateswararao Jujjuri (JV) <jvrao@linux.vnet.ibm.com> [2010-10-19 14:00:24]:
> >
> > In the case that we just spawned the threadlet, the cond_signal is
> > spurious. If we need predictable scheduling behaviour,
> > qemu_cond_signal needs to happen with queue->lock held.
> >
> > I'd rewrite the function as
> >
> > /**
> > * submit_threadletwork_to_queue: Submit a new task to a private queue to be
> > * executed asynchronously.
> > * @queue: Per-subsystem private queue to which the new task needs
> > * to be submitted.
> > * @work: Contains information about the task that needs to be submitted.
> > */
> > void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> > {
> > qemu_mutex_lock(&(queue->lock));
> > if (queue->idle_threads == 0 && (queue->cur_threads < queue->max_threads)) {
> > spawn_threadlet(queue);
> > } else {
> > qemu_cond_signal(&(queue->cond));
> > }
> > QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
> > qemu_mutex_unlock(&(queue->lock));
> > }
> >
> This looks fine to me; maybe cleaner than the previous one, but
> functionally not much different.
>
It potentially does better at avoiding the spurious wakeup problem
(reduces the window). In another email I mentioned the man page says
"however, if predictable scheduling behavior is required, then that
mutex shall be locked by the thread calling pthread_cond_broadcast()
or pthread_cond_signal()"
--
Three Cheers,
Balbir
* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
2010-10-19 18:36 ` Balbir Singh
2010-10-19 19:01 ` [Qemu-devel] " Paolo Bonzini
2010-10-19 21:00 ` [Qemu-devel] " Venkateswararao Jujjuri (JV)
@ 2010-10-19 21:36 ` Anthony Liguori
2010-10-20 2:22 ` Balbir Singh
2010-10-20 3:19 ` Venkateswararao Jujjuri (JV)
2 siblings, 2 replies; 38+ messages in thread
From: Anthony Liguori @ 2010-10-19 21:36 UTC (permalink / raw)
To: balbir; +Cc: Arun R Bharadwaj, qemu-devel
On 10/19/2010 01:36 PM, Balbir Singh wrote:
>> + qemu_mutex_lock(&(queue->lock));
>> + while (1) {
>> + ThreadletWork *work;
>> + int ret = 0;
>> +
>> + while (QTAILQ_EMPTY(&(queue->request_list)) &&
>> + (ret != ETIMEDOUT)) {
>> + ret = qemu_cond_timedwait(&(queue->cond),
>> + &(queue->lock), 10*100000);
>>
> Ewww... what is 10*100000, can we use something more meaningful
> please?
>
A define is fine but honestly, it's pretty darn obvious what it means...
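Something like this would do (a sketch; the name is made up and the
value is unchanged):

#define THREADLET_IDLE_WAIT (10 * 100000)

    ret = qemu_cond_timedwait(&(queue->cond),
                              &(queue->lock), THREADLET_IDLE_WAIT);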
>> + }
>> +
>> + assert(queue->idle_threads != 0);
>>
> This assertion holds because we believe one of the idle_threads
> actually did the dequeuing, right?
>
An idle thread is one that is not doing work. At this point
in the code, we are not doing any work (yet), so if the idle_threads count
is zero, something is horribly wrong. We're also going to unconditionally
decrement it in a future code path, which means that if idle_threads is 0,
it's going to become -1.
The use of idle_thread is to detect whether it's necessary to spawn an
additional thread.
>> + if (QTAILQ_EMPTY(&(queue->request_list))) {
>> + if (queue->cur_threads > queue->min_threads) {
>> + /* We retain the minimum number of threads */
>> + break;
>> + }
>> + } else {
>> + work = QTAILQ_FIRST(&(queue->request_list));
>> + QTAILQ_REMOVE(&(queue->request_list), work, node);
>> +
>> + queue->idle_threads--;
>> + qemu_mutex_unlock(&(queue->lock));
>> +
>> + /* execute the work function */
>> + work->func(work);
>> +
>> + qemu_mutex_lock(&(queue->lock));
>> + queue->idle_threads++;
>> + }
>> + }
>> +
>> + queue->idle_threads--;
>> + queue->cur_threads--;
>> + qemu_mutex_unlock(&(queue->lock));
>> +
>> + return NULL;
>>
> Does anybody do a join on the exiting thread from the pool?
>
No. The thread is created in a detached state.
>> +}
>> +
>> +static void spawn_threadlet(ThreadletQueue *queue)
>> +{
>> + QemuThread thread;
>> +
>> + queue->cur_threads++;
>> + queue->idle_threads++;
>> +
>> + qemu_thread_create(&thread, threadlet_worker, queue);
>>
>
>> +}
>> +
>> +/**
>> + * submit_threadletwork_to_queue: Submit a new task to a private queue to be
>> + * executed asynchronously.
>> + * @queue: Per-subsystem private queue to which the new task needs
>> + * to be submitted.
>> + * @work: Contains information about the task that needs to be submitted.
>> + */
>> +void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
>> +{
>> + qemu_mutex_lock(&(queue->lock));
>> + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
>> + spawn_threadlet(queue);
>>
> So we hold queue->lock, spawn the thread, the spawned thread tries to
> acquire queue->lock
>
Yup.
>> + }
>> + QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
>> + qemu_mutex_unlock(&(queue->lock));
>> + qemu_cond_signal(&(queue->cond));
>>
> In the case that we just spawned the threadlet, the cond_signal is
> spurious. If we need predictable scheduling behaviour,
> qemu_cond_signal needs to happen with queue->lock held.
>
It doesn't really affect predictability.
> I'd rewrite the function as
>
> /**
> * submit_threadletwork_to_queue: Submit a new task to a private queue to be
> * executed asynchronously.
> * @queue: Per-subsystem private queue to which the new task needs
> * to be submitted.
> * @work: Contains information about the task that needs to be submitted.
> */
> void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> {
> qemu_mutex_lock(&(queue->lock));
> if (queue->idle_threads == 0 && (queue->cur_threads < queue->max_threads)) {
> spawn_threadlet(queue);
> } else {
> qemu_cond_signal(&(queue->cond));
> }
> QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
> qemu_mutex_unlock(&(queue->lock));
> }
>
I think this is a lot more fragile. You're relying on the fact that the
signal will not cause the signalled thread to actually awaken until we
release the lock, and you're doing work after signalling that needs to
be completed before the signalled thread wakes up.
I think you're a lot more robust in the long term if you treat condition
signalling as a hand-off point, because it makes the code a lot more
explicit about what's happening.
>> +/**
>> + * submit_threadletwork: Submit to the global queue a new task to be executed
>> + * asynchronously.
>> + * @work: Contains information about the task that needs to be submitted.
>> + */
>> +void submit_threadletwork(ThreadletWork *work)
>> +{
>> + if (unlikely(!globalqueue_init)) {
>> + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
>> + MIN_GLOBAL_THREADS);
>> + globalqueue_init = 1;
>> + }
>>
> What protects globalqueue_init?
>
qemu_mutex, and that unlikely is almost certainly a premature optimization.
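If we ever wanted the lazy init to be self-contained rather than relying
on the global mutex, a sketch using plain pthread_once (this file is
built for CONFIG_POSIX only) would be:

/* needs <pthread.h>; race-free lazy init of the global queue */
static pthread_once_t globalqueue_once = PTHREAD_ONCE_INIT;

static void globalqueue_do_init(void)
{
    threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
                         MIN_GLOBAL_THREADS);
}

void submit_threadletwork(ThreadletWork *work)
{
    pthread_once(&globalqueue_once, globalqueue_do_init);
    submit_threadletwork_to_queue(&globalqueue, work);
}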
Regards,
Anthony Liguori
* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
2010-10-19 21:36 ` Anthony Liguori
@ 2010-10-20 2:22 ` Balbir Singh
2010-10-20 3:46 ` Venkateswararao Jujjuri (JV)
2010-10-20 13:13 ` Anthony Liguori
2010-10-20 3:19 ` Venkateswararao Jujjuri (JV)
1 sibling, 2 replies; 38+ messages in thread
From: Balbir Singh @ 2010-10-20 2:22 UTC (permalink / raw)
To: Anthony Liguori; +Cc: Arun R Bharadwaj, qemu-devel
* Anthony Liguori <anthony@codemonkey.ws> [2010-10-19 16:36:31]:
> On 10/19/2010 01:36 PM, Balbir Singh wrote:
> >>+ qemu_mutex_lock(&(queue->lock));
> >>+ while (1) {
> >>+ ThreadletWork *work;
> >>+ int ret = 0;
> >>+
> >>+ while (QTAILQ_EMPTY(&(queue->request_list)) &&
> >>+ (ret != ETIMEDOUT)) {
> >>+ ret = qemu_cond_timedwait(&(queue->cond),
> >>+ &(queue->lock), 10*100000);
> >Ewww... what is 10*100000, can we use something more meaningful
> >please?
>
> A define is fine but honestly, it's pretty darn obvious what it means...
>
> >>+ }
> >>+
> >>+ assert(queue->idle_threads != 0);
> >This assertion holds because we believe one of the idle_threads
> >actually did the dequeuing, right?
>
> An idle thread is one that is not doing work. At this
> point in the code, we are not doing any work (yet), so if the
> idle_threads count is zero, something is horribly wrong. We're also
> going to unconditionally decrement it in a future code path, which
> means that if idle_threads is 0, it's going to become -1.
>
> The use of idle_thread is to detect whether it's necessary to spawn
> an additional thread.
>
We can hit this assert if pthread_cond_signal() is called outside of
the mutex; let me try to explain below
> >>+ if (QTAILQ_EMPTY(&(queue->request_list))) {
> >>+ if (queue->cur_threads > queue->min_threads) {
> >>+ /* We retain the minimum number of threads */
> >>+ break;
> >>+ }
> >>+ } else {
> >>+ work = QTAILQ_FIRST(&(queue->request_list));
> >>+ QTAILQ_REMOVE(&(queue->request_list), work, node);
> >>+
> >>+ queue->idle_threads--;
> >>+ qemu_mutex_unlock(&(queue->lock));
> >>+
> >>+ /* execute the work function */
> >>+ work->func(work);
> >>+
> >>+ qemu_mutex_lock(&(queue->lock));
> >>+ queue->idle_threads++;
> >>+ }
> >>+ }
> >>+
> >>+ queue->idle_threads--;
> >>+ queue->cur_threads--;
> >>+ qemu_mutex_unlock(&(queue->lock));
> >>+
> >>+ return NULL;
> >Does anybody do a join on the exiting thread from the pool?
>
> No. The thread is created in a detached state.
>
That makes sense, thanks for clarifying
> >>+}
> >>+
> >>+static void spawn_threadlet(ThreadletQueue *queue)
> >>+{
> >>+ QemuThread thread;
> >>+
> >>+ queue->cur_threads++;
> >>+ queue->idle_threads++;
> >>+
> >>+ qemu_thread_create(&thread, threadlet_worker, queue);
> >>+}
> >>+
> >>+/**
> >>+ * submit_threadletwork_to_queue: Submit a new task to a private queue to be
> >>+ * executed asynchronously.
> >>+ * @queue: Per-subsystem private queue to which the new task needs
> >>+ * to be submitted.
> >>+ * @work: Contains information about the task that needs to be submitted.
> >>+ */
> >>+void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> >>+{
> >>+ qemu_mutex_lock(&(queue->lock));
> >>+ if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
> >>+ spawn_threadlet(queue);
> >So we hold queue->lock, spawn the thread, the spawned thread tries to
> >acquire queue->lock
>
> Yup.
>
> >>+ }
> >>+ QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
> >>+ qemu_mutex_unlock(&(queue->lock));
> >>+ qemu_cond_signal(&(queue->cond));
> >In the case that we just spawned the threadlet, the cond_signal is
> >spurious. If we need predictable scheduling behaviour,
> >qemu_cond_signal needs to happen with queue->lock held.
>
> It doesn't really affect predictability..
>
> >I'd rewrite the function as
> >
> >/**
> > * submit_threadletwork_to_queue: Submit a new task to a private queue to be
> > * executed asynchronously.
> > * @queue: Per-subsystem private queue to which the new task needs
> > * to be submitted.
> > * @work: Contains information about the task that needs to be submitted.
> > */
> >void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> >{
> > qemu_mutex_lock(&(queue->lock));
> > if (queue->idle_threads == 0 && (queue->cur_threads < queue->max_threads)) {
> > spawn_threadlet(queue);
> > } else {
> > qemu_cond_signal(&(queue->cond));
> > }
> > QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
> > qemu_mutex_unlock(&(queue->lock));
> >}
>
> I think this is a lot more fragile. You're relying on the fact that the
> signal will not cause the signalled thread to actually awaken until we
> release the lock, and that any work done after signalling is completed
> before the signalled thread wakes up.
>
> I think you're a lot more robust in the long term if you treat
> condition signalling as a hand-off point, because it makes the code a
> lot more explicit about what's happening.
>
OK, here is a situation that can happen
T1 T2
--- ---
threadlet submit_threadletwork_to_queue
(sees condition as no work) mutex_lock
qemu_cond_timedwait add_work
... mutex_unlock
T3
--
cancel_threadlet_work_on_queue
mutex_lock (grabs it) before T1 can
cancels the work
qemu_cond_signal
T1
--
Grabs mutex_lock (from within cond_timedwait)
Now there is no work to do, the condition
has changed before the thread wakes up
The man page also states
"however, if predictable scheduling behavior is required, then that
mutex shall be locked by the thread calling pthread_cond_broadcast()
or pthread_cond_signal()"
> >>+/**
> >>+ * submit_threadletwork: Submit to the global queue a new task to be executed
> >>+ * asynchronously.
> >>+ * @work: Contains information about the task that needs to be submitted.
> >>+ */
> >>+void submit_threadletwork(ThreadletWork *work)
> >>+{
> >>+ if (unlikely(!globalqueue_init)) {
> >>+ threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
> >>+ MIN_GLOBAL_THREADS);
> >>+ globalqueue_init = 1;
> >>+ }
> >What protects globalqueue_init?
>
> qemu_mutex, and that unlikely is almost certainly a premature optimization.
>
> Regards,
>
> Anthony Liguori
>
--
Three Cheers,
Balbir
* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
2010-10-20 2:22 ` Balbir Singh
@ 2010-10-20 3:46 ` Venkateswararao Jujjuri (JV)
2010-10-20 13:05 ` Balbir Singh
2010-10-20 13:13 ` Anthony Liguori
1 sibling, 1 reply; 38+ messages in thread
From: Venkateswararao Jujjuri (JV) @ 2010-10-20 3:46 UTC (permalink / raw)
To: balbir; +Cc: Arun R Bharadwaj, qemu-devel
On 10/19/2010 7:22 PM, Balbir Singh wrote:
> * Anthony Liguori <anthony@codemonkey.ws> [2010-10-19 16:36:31]:
>
>> On 10/19/2010 01:36 PM, Balbir Singh wrote:
>>>> + qemu_mutex_lock(&(queue->lock));
>>>> + while (1) {
>>>> + ThreadletWork *work;
>>>> + int ret = 0;
>>>> +
>>>> + while (QTAILQ_EMPTY(&(queue->request_list)) &&
>>>> + (ret != ETIMEDOUT)) {
>>>> + ret = qemu_cond_timedwait(&(queue->cond),
>>>> + &(queue->lock), 10*100000);
>>> Ewww... what is 10*100000, can we use something more meaningful
>>> please?
>>
>> A define is fine but honestly, it's pretty darn obvious what it means...
>>
>>>> + }
>>>> +
>>>> + assert(queue->idle_threads != 0);
>>> This assertion holds because we believe one of the idle_threads
>>> actually did the dequeuing, right?
>>
>> An idle thread is one that is not doing work. At this
>> point in the code, we are not doing any work (yet) so if
>> idle_threads count is zero, something is horribly wrong. We're also
>> going to unconditionally decrement in the future code path which
>> means that if idle_threads is 0, it's going to become -1.
>>
>> The use of idle_thread is to detect whether it's necessary to spawn
>> an additional thread.
>>
>
> We can hit this assert if pthread_cond_signal() is called outside of
> the mutex, let me try and explain below
>
>>>> + if (QTAILQ_EMPTY(&(queue->request_list))) {
>>>> + if (queue->cur_threads > queue->min_threads) {
>>>> + /* We retain the minimum number of threads */
>>>> + break;
>>>> + }
>>>> + } else {
>>>> + work = QTAILQ_FIRST(&(queue->request_list));
>>>> + QTAILQ_REMOVE(&(queue->request_list), work, node);
>>>> +
>>>> + queue->idle_threads--;
>>>> + qemu_mutex_unlock(&(queue->lock));
>>>> +
>>>> + /* execute the work function */
>>>> + work->func(work);
>>>> +
>>>> + qemu_mutex_lock(&(queue->lock));
>>>> + queue->idle_threads++;
>>>> + }
>>>> + }
>>>> +
>>>> + queue->idle_threads--;
>>>> + queue->cur_threads--;
>>>> + qemu_mutex_unlock(&(queue->lock));
>>>> +
>>>> + return NULL;
>>> Does anybody do a join on the exiting thread from the pool?
>>
>> No. The thread is created in a detached state.
>>
>
> That makes sense, thanks for clarifying
>
>>>> +}
>>>> +
>>>> +static void spawn_threadlet(ThreadletQueue *queue)
>>>> +{
>>>> + QemuThread thread;
>>>> +
>>>> + queue->cur_threads++;
>>>> + queue->idle_threads++;
>>>> +
>>>> + qemu_thread_create(&thread, threadlet_worker, queue);
>>>> +}
>>>> +
>>>> +/**
>>>> + * submit_threadletwork_to_queue: Submit a new task to a private queue to be
>>>> + * executed asynchronously.
>>>> + * @queue: Per-subsystem private queue to which the new task needs
>>>> + * to be submitted.
>>>> + * @work: Contains information about the task that needs to be submitted.
>>>> + */
>>>> +void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
>>>> +{
>>>> + qemu_mutex_lock(&(queue->lock));
>>>> + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
>>>> + spawn_threadlet(queue);
>>> So we hold queue->lock, spawn the thread, the spawned thread tries to
>>> acquire queue->lock
>>
>> Yup.
>>
>>>> + }
>>>> + QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
>>>> + qemu_mutex_unlock(&(queue->lock));
>>>> + qemu_cond_signal(&(queue->cond));
>>> In the case that we just spawned the threadlet, the cond_signal is
>>> spurious. If we need predictable scheduling behaviour,
>>> qemu_cond_signal needs to happen with queue->lock held.
>>
>> It doesn't really affect predictability..
>>
>>> I'd rewrite the function as
>>>
>>> /**
>>> * submit_threadletwork_to_queue: Submit a new task to a private queue to be
>>> * executed asynchronously.
>>> * @queue: Per-subsystem private queue to which the new task needs
>>> * to be submitted.
>>> * @work: Contains information about the task that needs to be submitted.
>>> */
>>> void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
>>> {
>>> qemu_mutex_lock(&(queue->lock));
>>> if (queue->idle_threads == 0 && (queue->cur_threads < queue->max_threads)) {
>>> spawn_threadlet(queue);
>>> } else {
>>> qemu_cond_signal(&(queue->cond));
>>> }
>>> QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
>>> qemu_mutex_unlock(&(queue->lock));
>>> }
>>
>> I think this is a lot more fragile. You're relying on the fact that the
>> signal will not cause the signalled thread to actually awaken until we
>> release the lock, and that any work done after signalling is completed
>> before the signalled thread wakes up.
>>
>> I think you're a lot more robust in the long term if you treat
>> condition signalling as a hand-off point, because it makes the code a
>> lot more explicit about what's happening.
>>
>
> OK, here is a situation that can happen
>
> T1 T2
> --- ---
> threadlet submit_threadletwork_to_queue
> (sees condition as no work) mutex_lock
> qemu_cond_timedwait add_work
> ... mutex_unlock
>
> T3
> --
> cancel_threadlet_work_on_queue
> mutex_lock (grabs it) before T1 can
> cancels the work
>
>
> qemu_cond_signal
>
> T1
> --
> Grabs mutex_lock (from within cond_timedwait)
> Now there is no work to do, the condition
> has changed before the thread wakes up
So what? It won't find any work and goes back to sleep or exits.
idle_threads is decremented only in threadlet_worker(). Given that
we have a threadlet that is not doing any work, the assert should never
hit unless something is horribly wrong.
- JV
>
>
> The man page also states
>
> "however, if predictable scheduling behavior is required, then that
> mutex shall be locked by the thread calling pthread_cond_broadcast()
> or pthread_cond_signal()"
>
>>>> +/**
>>>> + * submit_threadletwork: Submit to the global queue a new task to be executed
>>>> + * asynchronously.
>>>> + * @work: Contains information about the task that needs to be submitted.
>>>> + */
>>>> +void submit_threadletwork(ThreadletWork *work)
>>>> +{
>>>> + if (unlikely(!globalqueue_init)) {
>>>> + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
>>>> + MIN_GLOBAL_THREADS);
>>>> + globalqueue_init = 1;
>>>> + }
>>> What protects globalqueue_init?
>>
>> qemu_mutex, and that unlikely is almost certainly a premature optimization.
>>
>> Regards,
>>
>> Anthony Liguori
>>
>
* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
2010-10-20 3:46 ` Venkateswararao Jujjuri (JV)
@ 2010-10-20 13:05 ` Balbir Singh
0 siblings, 0 replies; 38+ messages in thread
From: Balbir Singh @ 2010-10-20 13:05 UTC (permalink / raw)
To: Venkateswararao Jujjuri (JV); +Cc: Arun R Bharadwaj, qemu-devel
* Venkateswararao Jujjuri (JV) <jvrao@linux.vnet.ibm.com> [2010-10-19 20:46:35]:
> >> I think this is a lot more fragile. You're relying on the fact that the
> >> signal will not cause the signalled thread to actually awaken until we
> >> release the lock, and that any work done after signalling is completed
> >> before the signalled thread wakes up.
> >>
> >> I think you're a lot more robust in the long term if you treat
> >> condition signalling as a hand-off point, because it makes the code a
> >> lot more explicit about what's happening.
> >>
> >
> > OK, here is a situation that can happen
> >
> > T1 T2
> > --- ---
> > threadlet submit_threadletwork_to_queue
> > (sees condition as no work) mutex_lock
> > qemu_cond_timedwait add_work
> > ... mutex_unlock
> >
> > T3
> > --
> > cancel_threadlet_work_on_queue
> > mutex_lock (grabs it) before T1 can
> > cancels the work
> >
> >
> > qemu_cond_signal
> >
> > T1
> > --
> > Grabs mutex_lock (from within cond_timedwait)
> > Now there is no work to do, the condition
> > has changed before the thread wakes up
>
> So what? It won't find any work and goes back to sleep or exits.
>
Spurious wakeups are not good - they waste CPU cycles and consume energy.
Beyond that, if we look at the generic design:
a. We want the condition a thread is waiting on not to change before it
wakes up (or at least to reduce that window).
b. Although we don't care about thread priorities in threadlets today,
if we ever did, then by good design you'd want the thread you're waking
up to be contending for the mutex as soon as the notifier releases the
lock; otherwise a lower-priority thread can starve the original sleeper.
The code as posted today does not have functional issues except for
opening up the window for spurious wakeups.
> idle_threads is decremented only in threadlet_worker(). Given that
> we have a threadlet that is not doing any work, the assert should never
> hit unless something is horribly wrong.
>
--
Three Cheers,
Balbir
* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
2010-10-20 2:22 ` Balbir Singh
2010-10-20 3:46 ` Venkateswararao Jujjuri (JV)
@ 2010-10-20 13:13 ` Anthony Liguori
1 sibling, 0 replies; 38+ messages in thread
From: Anthony Liguori @ 2010-10-20 13:13 UTC (permalink / raw)
To: balbir; +Cc: Arun R Bharadwaj, qemu-devel
On 10/19/2010 09:22 PM, Balbir Singh wrote:
>
> OK, here is a situation that can happen
>
> T1 T2
> --- ---
> threadlet submit_threadletwork_to_queue
> (sees condition as no work) mutex_lock
> qemu_cond_timedwait add_work
> ... mutex_unlock
>
> T3
> --
> cancel_threadlet_work_on_queue
> mutex_lock (grabs it) before T1 can
> cancels the work
>
>
> qemu_cond_signal
>
> T1
> --
> Grabs mutex_lock (from within cond_timedwait)
> Now there is no work to do, the condition
> has changed before the thread wakes up
>
>
> The man page also states
>
> "however, if predictable scheduling behavior is required, then that
> mutex shall be locked by the thread calling pthread_cond_broadcast()
> or pthread_cond_signal()"
>
The scenario you're describing is a spurious wakeup. Any code that uses
conditions ought to handle spurious wakeups. The typical idiom for this is:
    while (no_work_available()) {
        pthread_cond_wait(cond, lock);
    }
So yes, pthread_cond_timedwait() will return but the while loop
condition will be checked first. In the scenario you describe, we'll go
immediately back to sleep and the assert will not be triggered.
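Applied to the threadlet worker, the same idiom might look like this
sketch (using the patch's names, and ignoring the timeout/thread-exit
logic for clarity):

    ThreadletWork *work;

    qemu_mutex_lock(&(queue->lock));
    while (QTAILQ_EMPTY(&(queue->request_list))) {
        qemu_cond_wait(&(queue->cond), &(queue->lock));
    }
    /* the predicate holds here, and we still own the lock */
    work = QTAILQ_FIRST(&(queue->request_list));
    QTAILQ_REMOVE(&(queue->request_list), work, node);
    qemu_mutex_unlock(&(queue->lock));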
As I mentioned originally, in the absence of performance data, code
readability trumps premature optimization. I think the code is a lot
more readable if the signaling point is outside of the mutex.
Regards,
Anthony Liguori
* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
2010-10-19 21:36 ` Anthony Liguori
2010-10-20 2:22 ` Balbir Singh
@ 2010-10-20 3:19 ` Venkateswararao Jujjuri (JV)
1 sibling, 0 replies; 38+ messages in thread
From: Venkateswararao Jujjuri (JV) @ 2010-10-20 3:19 UTC (permalink / raw)
To: Anthony Liguori; +Cc: Arun R Bharadwaj, qemu-devel, balbir
On 10/19/2010 2:36 PM, Anthony Liguori wrote:
> On 10/19/2010 01:36 PM, Balbir Singh wrote:
>>> + qemu_mutex_lock(&(queue->lock));
>>> + while (1) {
>>> + ThreadletWork *work;
>>> + int ret = 0;
>>> +
>>> + while (QTAILQ_EMPTY(&(queue->request_list)) &&
>>> + (ret != ETIMEDOUT)) {
>>> + ret = qemu_cond_timedwait(&(queue->cond),
>>> + &(queue->lock), 10*100000);
>>>
>> Ewww... what is 10*100000, can we use something more meaningful
>> please?
>>
>
> A define is fine but honestly, it's pretty darn obvious what it means...
>
>>> + }
>>> +
>>> + assert(queue->idle_threads != 0);
>>>
>> This assertion holds because we believe one of the idle_threads
>> actually did the dequeuing, right?
>>
>
> An idle thread is one that is not doing work. At this point in the
> code, we are not doing any work (yet) so if idle_threads count is zero,
> something is horribly wrong. We're also going to unconditionally decrement in
> the future code path which means that if idle_threads is 0, it's going to become
> -1.
>
> The use of idle_thread is to detect whether it's necessary to spawn an
> additional thread.
>
>>> + if (QTAILQ_EMPTY(&(queue->request_list))) {
>>> + if (queue->cur_threads > queue->min_threads) {
>>> + /* We retain the minimum number of threads */
>>> + break;
>>> + }
>>> + } else {
>>> + work = QTAILQ_FIRST(&(queue->request_list));
>>> + QTAILQ_REMOVE(&(queue->request_list), work, node);
>>> +
>>> + queue->idle_threads--;
>>> + qemu_mutex_unlock(&(queue->lock));
>>> +
>>> + /* execute the work function */
>>> + work->func(work);
>>> +
>>> + qemu_mutex_lock(&(queue->lock));
>>> + queue->idle_threads++;
>>> + }
>>> + }
>>> +
>>> + queue->idle_threads--;
>>> + queue->cur_threads--;
>>> + qemu_mutex_unlock(&(queue->lock));
>>> +
>>> + return NULL;
>>>
>> Does anybody do a join on the exiting thread from the pool?
>>
>
> No. The thread is created in a detached state.
>
>>> +}
>>> +
>>> +static void spawn_threadlet(ThreadletQueue *queue)
>>> +{
>>> + QemuThread thread;
>>> +
>>> + queue->cur_threads++;
>>> + queue->idle_threads++;
>>> +
>>> + qemu_thread_create(&thread, threadlet_worker, queue);
>>>
>>
>>> +}
>>> +
>>> +/**
>>> + * submit_threadletwork_to_queue: Submit a new task to a private queue to be
>>> + * executed asynchronously.
>>> + * @queue: Per-subsystem private queue to which the new task needs
>>> + * to be submitted.
>>> + * @work: Contains information about the task that needs to be submitted.
>>> + */
>>> +void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
>>> +{
>>> + qemu_mutex_lock(&(queue->lock));
>>> + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
>>> + spawn_threadlet(queue);
>>>
>> So we hold queue->lock, spawn the thread, the spawned thread tries to
>> acquire queue->lock
>>
>
> Yup.
>
>>> + }
>>> + QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
>>> + qemu_mutex_unlock(&(queue->lock));
>>> + qemu_cond_signal(&(queue->cond));
>>>
>> In the case that we just spawned the threadlet, the cond_signal is
>> spurious. If we need predictable scheduling behaviour,
>> qemu_cond_signal needs to happen with queue->lock held.
>>
>
> It doesn't really affect predictability..
>
>> I'd rewrite the function as
>>
>> /**
>> * submit_threadletwork_to_queue: Submit a new task to a private queue to be
>> * executed asynchronously.
>> * @queue: Per-subsystem private queue to which the new task needs
>> * to be submitted.
>> * @work: Contains information about the task that needs to be submitted.
>> */
>> void submit_threadletwork_to_queue(ThreadletQueue *queue, ThreadletWork *work)
>> {
>> qemu_mutex_lock(&(queue->lock));
>> if (queue->idle_threads == 0 && (queue->cur_threads < queue->max_threads)) {
>> spawn_threadlet(queue);
>> } else {
>> qemu_cond_signal(&(queue->cond));
>> }
>> QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
>> qemu_mutex_unlock(&(queue->lock));
>> }
>>
>
> I think this is a lot more fragile. You're relying on the fact that the signal
> will not cause the signalled thread to actually awaken until we release the
> lock, and that any work done after signalling is completed before the
> signalled thread wakes up.
Given that qemu_cond_timedwait() needs to reacquire the queue->lock before
returning, the signalled thread will wake up and then wait on the queue->lock.
- JV
>
> I think you're a lot more robust in the long term if you treat condition
> signalling as a hand-off point, because it makes the code a lot more
> explicit about what's happening.
>
>>> +/**
>>> + * submit_threadletwork: Submit to the global queue a new task to be executed
>>> + * asynchronously.
>>> + * @work: Contains information about the task that needs to be submitted.
>>> + */
>>> +void submit_threadletwork(ThreadletWork *work)
>>> +{
>>> + if (unlikely(!globalqueue_init)) {
>>> + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
>>> + MIN_GLOBAL_THREADS);
>>> + globalqueue_init = 1;
>>> + }
>>>
>> What protects globalqueue_init?
>>
>
> qemu_mutex, and that unlikely is almost certainly a premature optimization.
>
> Regards,
>
> Anthony Liguori
>
>
* Re: [Qemu-devel] [PATCH 1/3] Introduce threadlets
2010-10-19 17:42 ` [Qemu-devel] [PATCH 1/3] Introduce threadlets Arun R Bharadwaj
2010-10-19 18:36 ` Balbir Singh
@ 2010-10-20 8:16 ` Stefan Hajnoczi
1 sibling, 0 replies; 38+ messages in thread
From: Stefan Hajnoczi @ 2010-10-20 8:16 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: qemu-devel
On Tue, Oct 19, 2010 at 6:42 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> +/**
> + * cancel_threadletwork_on_queue: Cancel a task queued on a Queue.
> + * @queue: The queue containing the task to be cancelled.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 0 if the task is successfully cancelled.
> + * 1 otherwise.
The return value comment doesn't correspond to how I read the code.
If the work was cancelled the code returns 1. Otherwise it returns 0.
> + */
> +int cancel_threadletwork_on_queue(ThreadletQueue *queue, ThreadletWork *work)
> +{
> + ThreadletWork *ret_work;
> + int ret = 0;
> +
> + qemu_mutex_lock(&(queue->lock));
> + QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
> + if (ret_work == work) {
> + QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
> + ret = 1;
> + break;
> + }
> + }
> + qemu_mutex_unlock(&(queue->lock));
> +
> + return ret;
> +}
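In other words, a return-value comment matching the code as written would
read something like this (a sketch, not the actual follow-up patch):

    /**
     * cancel_threadletwork_on_queue: Cancel a task queued on a Queue.
     * @queue: The queue containing the task to be cancelled.
     * @work: Contains the information of the task that needs to be cancelled.
     *
     * Returns: 1 if the task was found on the queue and cancelled.
     *          0 otherwise.
     */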
Stefan
* [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
2010-10-19 17:42 [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-10-19 17:42 ` [Qemu-devel] [PATCH 1/3] Introduce threadlets Arun R Bharadwaj
@ 2010-10-19 17:43 ` Arun R Bharadwaj
2010-10-20 2:24 ` Balbir Singh
` (2 more replies)
2010-10-19 17:43 ` [Qemu-devel] [PATCH 3/3] Add helper functions for virtio-9p to " Arun R Bharadwaj
2010-10-20 11:57 ` [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework Amit Shah
3 siblings, 3 replies; 38+ messages in thread
From: Arun R Bharadwaj @ 2010-10-19 17:43 UTC (permalink / raw)
To: qemu-devel
From: Gautham R Shenoy <ego@in.ibm.com>
This patch makes the paio subsystem use the threadlet framework, thereby
decoupling the asynchronous threading framework portion from
posix-aio-compat.c
The patch has been tested with fstress.
Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
---
posix-aio-compat.c | 166 +++++++++-------------------------------------------
1 files changed, 30 insertions(+), 136 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..6977c18 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,6 +29,7 @@
#include "block_int.h"
#include "block/raw-posix-aio.h"
+#include "qemu-threadlets.h"
struct qemu_paiocb {
@@ -51,6 +52,7 @@ struct qemu_paiocb {
struct qemu_paiocb *next;
int async_context_id;
+ ThreadletWork work;
};
typedef struct PosixAioState {
@@ -59,15 +61,6 @@ typedef struct PosixAioState {
} PosixAioState;
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
@@ -85,39 +78,6 @@ static void die(const char *what)
die2(errno, what);
}
-static void mutex_lock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_lock(mutex);
- if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_unlock(mutex);
- if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
- struct timespec *ts)
-{
- int ret = pthread_cond_timedwait(cond, mutex, ts);
- if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
- return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
- int ret = pthread_cond_signal(cond);
- if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void*), void *arg)
-{
- int ret = pthread_create(thread, attr, start_routine, arg);
- if (ret) die2(ret, "pthread_create");
-}
-
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
int ret;
@@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
{
pid_t pid;
+ struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+ ssize_t ret = 0;
pid = getpid();
+ aiocb->active = 1;
- while (1) {
- struct qemu_paiocb *aiocb;
- ssize_t ret = 0;
- qemu_timeval tv;
- struct timespec ts;
-
- qemu_gettimeofday(&tv);
- ts.tv_sec = tv.tv_sec + 10;
- ts.tv_nsec = 0;
-
- mutex_lock(&lock);
-
- while (QTAILQ_EMPTY(&request_list) &&
- !(ret == ETIMEDOUT)) {
- ret = cond_timedwait(&cond, &lock, &ts);
- }
-
- if (QTAILQ_EMPTY(&request_list))
- break;
-
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
- aiocb->active = 1;
- idle_threads--;
- mutex_unlock(&lock);
-
- switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- case QEMU_AIO_WRITE:
- ret = handle_aiocb_rw(aiocb);
- break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
- ret = handle_aiocb_ioctl(aiocb);
- break;
- default:
- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
- ret = -EINVAL;
- break;
- }
-
- mutex_lock(&lock);
- aiocb->ret = ret;
- idle_threads++;
- mutex_unlock(&lock);
-
- if (kill(pid, aiocb->ev_signo)) die("kill failed");
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ case QEMU_AIO_WRITE:
+ ret = handle_aiocb_rw(aiocb);
+ break;
+ case QEMU_AIO_FLUSH:
+ ret = handle_aiocb_flush(aiocb);
+ break;
+ case QEMU_AIO_IOCTL:
+ ret = handle_aiocb_ioctl(aiocb);
+ break;
+ default:
+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ ret = -EINVAL;
+ break;
}
- idle_threads--;
- cur_threads--;
- mutex_unlock(&lock);
+ aiocb->ret = ret;
- return NULL;
-}
-
-static void spawn_thread(void)
-{
- sigset_t set, oldset;
-
- cur_threads++;
- idle_threads++;
-
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
- thread_create(&thread_id, &attr, aio_thread, NULL);
-
- if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+ if (kill(pid, aiocb->ev_signo)) die("kill failed");
}
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
aiocb->ret = -EINPROGRESS;
aiocb->active = 0;
- mutex_lock(&lock);
- if (idle_threads == 0 && cur_threads < max_threads)
- spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
- mutex_unlock(&lock);
- cond_signal(&cond);
+
+ aiocb->work.func = aio_thread;
+ submit_threadletwork(&aiocb->work);
}
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
- mutex_lock(&lock);
ret = aiocb->ret;
- mutex_unlock(&lock);
-
return ret;
}
@@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
int active = 0;
- mutex_lock(&lock);
if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
- acb->ret = -ECANCELED;
+ if (!cancel_threadletwork(&acb->work))
+ acb->ret = -ECANCELED;
+ else
+ active = 1;
} else if (acb->ret == -EINPROGRESS) {
active = 1;
}
- mutex_unlock(&lock);
if (active) {
/* fail safe: if the aio could not be canceled, we wait for
@@ -618,7 +523,6 @@ int paio_init(void)
struct sigaction act;
PosixAioState *s;
int fds[2];
- int ret;
if (posix_aio_state)
return 0;
@@ -645,16 +549,6 @@ int paio_init(void)
qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
posix_aio_process_queue, s);
- ret = pthread_attr_init(&attr);
- if (ret)
- die2(ret, "pthread_attr_init");
-
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret)
- die2(ret, "pthread_attr_setdetachstate");
-
- QTAILQ_INIT(&request_list);
-
posix_aio_state = s;
return 0;
}
* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
2010-10-19 17:43 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
@ 2010-10-20 2:24 ` Balbir Singh
2010-10-20 8:42 ` Kevin Wolf
2010-10-20 9:30 ` Stefan Hajnoczi
2 siblings, 0 replies; 38+ messages in thread
From: Balbir Singh @ 2010-10-20 2:24 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: qemu-devel
* Arun R B <arun@linux.vnet.ibm.com> [2010-10-19 23:13:20]:
> From: Gautham R Shenoy <ego@in.ibm.com>
>
> This patch makes the paio subsystem use the threadlet framework, thereby
> decoupling the asynchronous threading framework portion from
> posix-aio-compat.c
>
> The patch has been tested with fstress.
>
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
This change seems reasonable to me
Acked-by: Balbir Singh <balbir@linux.vnet.ibm.com>
--
Three Cheers,
Balbir
* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
2010-10-19 17:43 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
2010-10-20 2:24 ` Balbir Singh
@ 2010-10-20 8:42 ` Kevin Wolf
2010-10-20 9:30 ` Stefan Hajnoczi
2 siblings, 0 replies; 38+ messages in thread
From: Kevin Wolf @ 2010-10-20 8:42 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: qemu-devel
Am 19.10.2010 19:43, schrieb Arun R Bharadwaj:
> From: Gautham R Shenoy <ego@in.ibm.com>
>
> This patch makes the paio subsystem use the threadlet framework, thereby
> decoupling the asynchronous threading framework portion from
> posix-aio-compat.c
>
> The patch has been tested with fstress.
>
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> ---
> posix-aio-compat.c | 166 +++++++++-------------------------------------------
> 1 files changed, 30 insertions(+), 136 deletions(-)
>
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index 7b862b5..6977c18 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -29,6 +29,7 @@
> #include "block_int.h"
>
> #include "block/raw-posix-aio.h"
> +#include "qemu-threadlets.h"
>
>
> struct qemu_paiocb {
> @@ -51,6 +52,7 @@ struct qemu_paiocb {
> struct qemu_paiocb *next;
>
> int async_context_id;
> + ThreadletWork work;
> };
>
> typedef struct PosixAioState {
> @@ -59,15 +61,6 @@ typedef struct PosixAioState {
> } PosixAioState;
>
>
> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> -static pthread_t thread_id;
> -static pthread_attr_t attr;
> -static int max_threads = 64;
> -static int cur_threads = 0;
> -static int idle_threads = 0;
> -static QTAILQ_HEAD(, qemu_paiocb) request_list;
> -
> #ifdef CONFIG_PREADV
> static int preadv_present = 1;
> #else
> @@ -85,39 +78,6 @@ static void die(const char *what)
> die2(errno, what);
> }
>
> -static void mutex_lock(pthread_mutex_t *mutex)
> -{
> - int ret = pthread_mutex_lock(mutex);
> - if (ret) die2(ret, "pthread_mutex_lock");
> -}
> -
> -static void mutex_unlock(pthread_mutex_t *mutex)
> -{
> - int ret = pthread_mutex_unlock(mutex);
> - if (ret) die2(ret, "pthread_mutex_unlock");
> -}
> -
> -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
> - struct timespec *ts)
> -{
> - int ret = pthread_cond_timedwait(cond, mutex, ts);
> - if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
> - return ret;
> -}
> -
> -static void cond_signal(pthread_cond_t *cond)
> -{
> - int ret = pthread_cond_signal(cond);
> - if (ret) die2(ret, "pthread_cond_signal");
> -}
> -
> -static void thread_create(pthread_t *thread, pthread_attr_t *attr,
> - void *(*start_routine)(void*), void *arg)
> -{
> - int ret = pthread_create(thread, attr, start_routine, arg);
> - if (ret) die2(ret, "pthread_create");
> -}
> -
> static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
> {
> int ret;
> @@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
> return nbytes;
> }
>
> -static void *aio_thread(void *unused)
> +static void aio_thread(ThreadletWork *work)
> {
> pid_t pid;
> + struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> + ssize_t ret = 0;
>
> pid = getpid();
> + aiocb->active = 1;
>
> - while (1) {
> - struct qemu_paiocb *aiocb;
> - ssize_t ret = 0;
> - qemu_timeval tv;
> - struct timespec ts;
> -
> - qemu_gettimeofday(&tv);
> - ts.tv_sec = tv.tv_sec + 10;
> - ts.tv_nsec = 0;
> -
> - mutex_lock(&lock);
> -
> - while (QTAILQ_EMPTY(&request_list) &&
> - !(ret == ETIMEDOUT)) {
> - ret = cond_timedwait(&cond, &lock, &ts);
> - }
> -
> - if (QTAILQ_EMPTY(&request_list))
> - break;
> -
> - aiocb = QTAILQ_FIRST(&request_list);
> - QTAILQ_REMOVE(&request_list, aiocb, node);
> - aiocb->active = 1;
> - idle_threads--;
> - mutex_unlock(&lock);
> -
> - switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> - case QEMU_AIO_READ:
> - case QEMU_AIO_WRITE:
> - ret = handle_aiocb_rw(aiocb);
> - break;
> - case QEMU_AIO_FLUSH:
> - ret = handle_aiocb_flush(aiocb);
> - break;
> - case QEMU_AIO_IOCTL:
> - ret = handle_aiocb_ioctl(aiocb);
> - break;
> - default:
> - fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> - ret = -EINVAL;
> - break;
> - }
> -
> - mutex_lock(&lock);
> - aiocb->ret = ret;
> - idle_threads++;
> - mutex_unlock(&lock);
> -
> - if (kill(pid, aiocb->ev_signo)) die("kill failed");
> + switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> + case QEMU_AIO_READ:
> + case QEMU_AIO_WRITE:
> + ret = handle_aiocb_rw(aiocb);
> + break;
> + case QEMU_AIO_FLUSH:
> + ret = handle_aiocb_flush(aiocb);
> + break;
> + case QEMU_AIO_IOCTL:
> + ret = handle_aiocb_ioctl(aiocb);
> + break;
> + default:
> + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> + ret = -EINVAL;
> + break;
> }
>
> - idle_threads--;
> - cur_threads--;
> - mutex_unlock(&lock);
> + aiocb->ret = ret;
>
> - return NULL;
> -}
> -
> -static void spawn_thread(void)
> -{
> - sigset_t set, oldset;
> -
> - cur_threads++;
> - idle_threads++;
> -
> - /* block all signals */
> - if (sigfillset(&set)) die("sigfillset");
> - if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> -
> - thread_create(&thread_id, &attr, aio_thread, NULL);
> -
> - if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> + if (kill(pid, aiocb->ev_signo)) die("kill failed");
Please move the die() onto a line of its own and add braces.
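That is, something along these lines (a sketch):

    if (kill(pid, aiocb->ev_signo)) {
        die("kill failed");
    }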
> }
>
> static void qemu_paio_submit(struct qemu_paiocb *aiocb)
> {
> aiocb->ret = -EINPROGRESS;
> aiocb->active = 0;
> - mutex_lock(&lock);
> - if (idle_threads == 0 && cur_threads < max_threads)
> - spawn_thread();
> - QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> - mutex_unlock(&lock);
> - cond_signal(&cond);
> +
> + aiocb->work.func = aio_thread;
> + submit_threadletwork(&aiocb->work);
> }
>
> static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
> {
> ssize_t ret;
>
> - mutex_lock(&lock);
> ret = aiocb->ret;
> - mutex_unlock(&lock);
> -
> return ret;
> }
>
> @@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
> struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> int active = 0;
>
> - mutex_lock(&lock);
> if (!acb->active) {
> - QTAILQ_REMOVE(&request_list, acb, node);
> - acb->ret = -ECANCELED;
> + if (!cancel_threadletwork(&acb->work))
> + acb->ret = -ECANCELED;
> + else
> + active = 1;
Missing braces.
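I.e., in the requested style (a sketch):

    if (!cancel_threadletwork(&acb->work)) {
        acb->ret = -ECANCELED;
    } else {
        active = 1;
    }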
Kevin
* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
2010-10-19 17:43 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
2010-10-20 2:24 ` Balbir Singh
2010-10-20 8:42 ` Kevin Wolf
@ 2010-10-20 9:30 ` Stefan Hajnoczi
2010-10-20 13:16 ` Anthony Liguori
2010-10-21 8:40 ` Arun R Bharadwaj
2 siblings, 2 replies; 38+ messages in thread
From: Stefan Hajnoczi @ 2010-10-20 9:30 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: qemu-devel
On Tue, Oct 19, 2010 at 6:43 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> From: Gautham R Shenoy <ego@in.ibm.com>
>
> This patch makes the paio subsystem use the threadlet framework, thereby
> decoupling the asynchronous threading framework portion from
> posix-aio-compat.c
>
> The patch has been tested with fstress.
>
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> ---
> posix-aio-compat.c | 166 +++++++++-------------------------------------------
> 1 files changed, 30 insertions(+), 136 deletions(-)
>
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index 7b862b5..6977c18 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -29,6 +29,7 @@
> #include "block_int.h"
>
> #include "block/raw-posix-aio.h"
> +#include "qemu-threadlets.h"
>
>
> struct qemu_paiocb {
> @@ -51,6 +52,7 @@ struct qemu_paiocb {
> struct qemu_paiocb *next;
>
> int async_context_id;
> + ThreadletWork work;
The QTAILQ_ENTRY(qemu_paiocb) node field is no longer used, please remove it.
> };
>
> typedef struct PosixAioState {
> @@ -59,15 +61,6 @@ typedef struct PosixAioState {
> } PosixAioState;
>
>
> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> -static pthread_t thread_id;
> -static pthread_attr_t attr;
> -static int max_threads = 64;
> -static int cur_threads = 0;
> -static int idle_threads = 0;
> -static QTAILQ_HEAD(, qemu_paiocb) request_list;
> -
> #ifdef CONFIG_PREADV
> static int preadv_present = 1;
> #else
> @@ -85,39 +78,6 @@ static void die(const char *what)
> die2(errno, what);
> }
>
> -static void mutex_lock(pthread_mutex_t *mutex)
> -{
> - int ret = pthread_mutex_lock(mutex);
> - if (ret) die2(ret, "pthread_mutex_lock");
> -}
> -
> -static void mutex_unlock(pthread_mutex_t *mutex)
> -{
> - int ret = pthread_mutex_unlock(mutex);
> - if (ret) die2(ret, "pthread_mutex_unlock");
> -}
> -
> -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
> - struct timespec *ts)
> -{
> - int ret = pthread_cond_timedwait(cond, mutex, ts);
> - if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
> - return ret;
> -}
> -
> -static void cond_signal(pthread_cond_t *cond)
> -{
> - int ret = pthread_cond_signal(cond);
> - if (ret) die2(ret, "pthread_cond_signal");
> -}
> -
> -static void thread_create(pthread_t *thread, pthread_attr_t *attr,
> - void *(*start_routine)(void*), void *arg)
> -{
> - int ret = pthread_create(thread, attr, start_routine, arg);
> - if (ret) die2(ret, "pthread_create");
> -}
> -
> static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
> {
> int ret;
> @@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
> return nbytes;
> }
>
> -static void *aio_thread(void *unused)
> +static void aio_thread(ThreadletWork *work)
> {
> pid_t pid;
> + struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> + ssize_t ret = 0;
>
> pid = getpid();
> + aiocb->active = 1;
>
> - while (1) {
> - struct qemu_paiocb *aiocb;
> - ssize_t ret = 0;
> - qemu_timeval tv;
> - struct timespec ts;
> -
> - qemu_gettimeofday(&tv);
> - ts.tv_sec = tv.tv_sec + 10;
> - ts.tv_nsec = 0;
> -
> - mutex_lock(&lock);
> -
> - while (QTAILQ_EMPTY(&request_list) &&
> - !(ret == ETIMEDOUT)) {
> - ret = cond_timedwait(&cond, &lock, &ts);
> - }
> -
> - if (QTAILQ_EMPTY(&request_list))
> - break;
> -
> - aiocb = QTAILQ_FIRST(&request_list);
> - QTAILQ_REMOVE(&request_list, aiocb, node);
> - aiocb->active = 1;
> - idle_threads--;
> - mutex_unlock(&lock);
> -
> - switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> - case QEMU_AIO_READ:
> - case QEMU_AIO_WRITE:
> - ret = handle_aiocb_rw(aiocb);
> - break;
> - case QEMU_AIO_FLUSH:
> - ret = handle_aiocb_flush(aiocb);
> - break;
> - case QEMU_AIO_IOCTL:
> - ret = handle_aiocb_ioctl(aiocb);
> - break;
> - default:
> - fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> - ret = -EINVAL;
> - break;
> - }
> -
> - mutex_lock(&lock);
> - aiocb->ret = ret;
> - idle_threads++;
> - mutex_unlock(&lock);
> -
> - if (kill(pid, aiocb->ev_signo)) die("kill failed");
> + switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> + case QEMU_AIO_READ:
> + case QEMU_AIO_WRITE:
> + ret = handle_aiocb_rw(aiocb);
> + break;
> + case QEMU_AIO_FLUSH:
> + ret = handle_aiocb_flush(aiocb);
> + break;
> + case QEMU_AIO_IOCTL:
> + ret = handle_aiocb_ioctl(aiocb);
> + break;
> + default:
> + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> + ret = -EINVAL;
> + break;
> }
>
> - idle_threads--;
> - cur_threads--;
> - mutex_unlock(&lock);
> + aiocb->ret = ret;
>
> - return NULL;
> -}
> -
> -static void spawn_thread(void)
> -{
> - sigset_t set, oldset;
> -
> - cur_threads++;
> - idle_threads++;
> -
> - /* block all signals */
> - if (sigfillset(&set)) die("sigfillset");
> - if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> -
> - thread_create(&thread_id, &attr, aio_thread, NULL);
> -
> - if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> + if (kill(pid, aiocb->ev_signo)) die("kill failed");
> }
>
> static void qemu_paio_submit(struct qemu_paiocb *aiocb)
> {
> aiocb->ret = -EINPROGRESS;
> aiocb->active = 0;
> - mutex_lock(&lock);
> - if (idle_threads == 0 && cur_threads < max_threads)
> - spawn_thread();
> - QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> - mutex_unlock(&lock);
> - cond_signal(&cond);
> +
> + aiocb->work.func = aio_thread;
> + submit_threadletwork(&aiocb->work);
> }
>
> static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
> {
> ssize_t ret;
>
> - mutex_lock(&lock);
> ret = aiocb->ret;
> - mutex_unlock(&lock);
> -
> return ret;
> }
>
> @@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
> struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> int active = 0;
>
> - mutex_lock(&lock);
> if (!acb->active) {
I'm not sure the active field serves any purpose. No memory barriers
are used so the value of active is 0 before the work is executed and 0
*or* 1 while the work is executed.
The cancel_threadletwork() function already indicates whether
cancellation succeeded. Why not just try to cancel instead of using
the active field?
> - QTAILQ_REMOVE(&request_list, acb, node);
> - acb->ret = -ECANCELED;
> + if (!cancel_threadletwork(&acb->work))
> + acb->ret = -ECANCELED;
> + else
> + active = 1;
The 0 and 1 return value from cancel_threadletwork() is inverted. See
also my comment on patch 1/3 in this series.
> } else if (acb->ret == -EINPROGRESS) {
> active = 1;
> }
> - mutex_unlock(&lock);
>
> if (active) {
> /* fail safe: if the aio could not be canceled, we wait for
while (qemu_paio_error(acb) == EINPROGRESS)
    ;
Tight loop with no memory barrier reading a memory location that is
updated by another thread. We shouldn't communicate between threads
without barriers.
Stefan
* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
2010-10-20 9:30 ` Stefan Hajnoczi
@ 2010-10-20 13:16 ` Anthony Liguori
2010-10-21 8:40 ` Arun R Bharadwaj
1 sibling, 0 replies; 38+ messages in thread
From: Anthony Liguori @ 2010-10-20 13:16 UTC (permalink / raw)
To: Stefan Hajnoczi; +Cc: Arun R Bharadwaj, qemu-devel
On 10/20/2010 04:30 AM, Stefan Hajnoczi wrote:
>
>> } else if (acb->ret == -EINPROGRESS) {
>> active = 1;
>> }
>> - mutex_unlock(&lock);
>>
>> if (active) {
>> /* fail safe: if the aio could not be canceled, we wait for
>>
> while (qemu_paio_error(acb) == EINPROGRESS)
>     ;
>
> Tight loop with no memory barrier reading a memory location that is
> updated by another thread. We shouldn't communicate between threads
> without barriers.
>
We shouldn't use a tight loop, period. A condition should be used if
signalling is needed.
And we shouldn't rely on atomic assignments to communicate between
threads. Just use a mutex and avoid being fancier than we need to be.
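A sketch of that approach (completion_lock and completion_cond are
illustrative names, not fields that exist in the patch):

    /* waiter, e.g. in paio_cancel() */
    qemu_mutex_lock(&completion_lock);
    while (acb->ret == -EINPROGRESS) {
        qemu_cond_wait(&completion_cond, &completion_lock);
    }
    qemu_mutex_unlock(&completion_lock);

    /* worker, when the request finishes */
    qemu_mutex_lock(&completion_lock);
    acb->ret = ret;
    qemu_cond_signal(&completion_cond);
    qemu_mutex_unlock(&completion_lock);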
Regards,
Anthony Liguori
> Stefan
>
>
* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
2010-10-20 9:30 ` Stefan Hajnoczi
2010-10-20 13:16 ` Anthony Liguori
@ 2010-10-21 8:40 ` Arun R Bharadwaj
2010-10-21 9:17 ` Stefan Hajnoczi
1 sibling, 1 reply; 38+ messages in thread
From: Arun R Bharadwaj @ 2010-10-21 8:40 UTC (permalink / raw)
To: Stefan Hajnoczi; +Cc: qemu-devel
* Stefan Hajnoczi <stefanha@gmail.com> [2010-10-20 10:30:38]:
> On Tue, Oct 19, 2010 at 6:43 PM, Arun R Bharadwaj
> <arun@linux.vnet.ibm.com> wrote:
> > From: Gautham R Shenoy <ego@in.ibm.com>
> >
> > This patch makes the paio subsystem use the threadlet framework, thereby
> > decoupling the asynchronous threading framework portion from
> > posix-aio-compat.c
> >
> > The patch has been tested with fstress.
> >
> > Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> > Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> > ---
> > posix-aio-compat.c | 166 +++++++++-------------------------------------------
> > 1 files changed, 30 insertions(+), 136 deletions(-)
> >
> > diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> > index 7b862b5..6977c18 100644
> > --- a/posix-aio-compat.c
> > +++ b/posix-aio-compat.c
> > @@ -29,6 +29,7 @@
> > #include "block_int.h"
> >
> > #include "block/raw-posix-aio.h"
> > +#include "qemu-threadlets.h"
> >
> >
> > struct qemu_paiocb {
> > @@ -51,6 +52,7 @@ struct qemu_paiocb {
> > struct qemu_paiocb *next;
> >
> > int async_context_id;
> > + ThreadletWork work;
>
> The QTAILQ_ENTRY(qemu_paiocb) node field is no longer used, please remove it.
>
> > };
> >
> > typedef struct PosixAioState {
> > @@ -59,15 +61,6 @@ typedef struct PosixAioState {
> > } PosixAioState;
> >
> >
> > -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> > -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> > -static pthread_t thread_id;
> > -static pthread_attr_t attr;
> > -static int max_threads = 64;
> > -static int cur_threads = 0;
> > -static int idle_threads = 0;
> > -static QTAILQ_HEAD(, qemu_paiocb) request_list;
> > -
> > #ifdef CONFIG_PREADV
> > static int preadv_present = 1;
> > #else
> > @@ -85,39 +78,6 @@ static void die(const char *what)
> > die2(errno, what);
> > }
> >
> > -static void mutex_lock(pthread_mutex_t *mutex)
> > -{
> > - int ret = pthread_mutex_lock(mutex);
> > - if (ret) die2(ret, "pthread_mutex_lock");
> > -}
> > -
> > -static void mutex_unlock(pthread_mutex_t *mutex)
> > -{
> > - int ret = pthread_mutex_unlock(mutex);
> > - if (ret) die2(ret, "pthread_mutex_unlock");
> > -}
> > -
> > -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
> > - struct timespec *ts)
> > -{
> > - int ret = pthread_cond_timedwait(cond, mutex, ts);
> > - if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
> > - return ret;
> > -}
> > -
> > -static void cond_signal(pthread_cond_t *cond)
> > -{
> > - int ret = pthread_cond_signal(cond);
> > - if (ret) die2(ret, "pthread_cond_signal");
> > -}
> > -
> > -static void thread_create(pthread_t *thread, pthread_attr_t *attr,
> > - void *(*start_routine)(void*), void *arg)
> > -{
> > - int ret = pthread_create(thread, attr, start_routine, arg);
> > - if (ret) die2(ret, "pthread_create");
> > -}
> > -
> > static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
> > {
> > int ret;
> > @@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
> > return nbytes;
> > }
> >
> > -static void *aio_thread(void *unused)
> > +static void aio_thread(ThreadletWork *work)
> > {
> > pid_t pid;
> > + struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> > + ssize_t ret = 0;
> >
> > pid = getpid();
> > + aiocb->active = 1;
> >
> > - while (1) {
> > - struct qemu_paiocb *aiocb;
> > - ssize_t ret = 0;
> > - qemu_timeval tv;
> > - struct timespec ts;
> > -
> > - qemu_gettimeofday(&tv);
> > - ts.tv_sec = tv.tv_sec + 10;
> > - ts.tv_nsec = 0;
> > -
> > - mutex_lock(&lock);
> > -
> > - while (QTAILQ_EMPTY(&request_list) &&
> > - !(ret == ETIMEDOUT)) {
> > - ret = cond_timedwait(&cond, &lock, &ts);
> > - }
> > -
> > - if (QTAILQ_EMPTY(&request_list))
> > - break;
> > -
> > - aiocb = QTAILQ_FIRST(&request_list);
> > - QTAILQ_REMOVE(&request_list, aiocb, node);
> > - aiocb->active = 1;
> > - idle_threads--;
> > - mutex_unlock(&lock);
> > -
> > - switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> > - case QEMU_AIO_READ:
> > - case QEMU_AIO_WRITE:
> > - ret = handle_aiocb_rw(aiocb);
> > - break;
> > - case QEMU_AIO_FLUSH:
> > - ret = handle_aiocb_flush(aiocb);
> > - break;
> > - case QEMU_AIO_IOCTL:
> > - ret = handle_aiocb_ioctl(aiocb);
> > - break;
> > - default:
> > - fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> > - ret = -EINVAL;
> > - break;
> > - }
> > -
> > - mutex_lock(&lock);
> > - aiocb->ret = ret;
> > - idle_threads++;
> > - mutex_unlock(&lock);
> > -
> > - if (kill(pid, aiocb->ev_signo)) die("kill failed");
> > + switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> > + case QEMU_AIO_READ:
> > + case QEMU_AIO_WRITE:
> > + ret = handle_aiocb_rw(aiocb);
> > + break;
> > + case QEMU_AIO_FLUSH:
> > + ret = handle_aiocb_flush(aiocb);
> > + break;
> > + case QEMU_AIO_IOCTL:
> > + ret = handle_aiocb_ioctl(aiocb);
> > + break;
> > + default:
> > + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> > + ret = -EINVAL;
> > + break;
> > }
> >
> > - idle_threads--;
> > - cur_threads--;
> > - mutex_unlock(&lock);
> > + aiocb->ret = ret;
> >
> > - return NULL;
> > -}
> > -
> > -static void spawn_thread(void)
> > -{
> > - sigset_t set, oldset;
> > -
> > - cur_threads++;
> > - idle_threads++;
> > -
> > - /* block all signals */
> > - if (sigfillset(&set)) die("sigfillset");
> > - if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> > -
> > - thread_create(&thread_id, &attr, aio_thread, NULL);
> > -
> > - if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> > + if (kill(pid, aiocb->ev_signo)) die("kill failed");
> > }
> >
> > static void qemu_paio_submit(struct qemu_paiocb *aiocb)
> > {
> > aiocb->ret = -EINPROGRESS;
> > aiocb->active = 0;
> > - mutex_lock(&lock);
> > - if (idle_threads == 0 && cur_threads < max_threads)
> > - spawn_thread();
> > - QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> > - mutex_unlock(&lock);
> > - cond_signal(&cond);
> > +
> > + aiocb->work.func = aio_thread;
> > + submit_threadletwork(&aiocb->work);
> > }
> >
> > static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
> > {
> > ssize_t ret;
> >
> > - mutex_lock(&lock);
> > ret = aiocb->ret;
> > - mutex_unlock(&lock);
> > -
> > return ret;
> > }
> >
> > @@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
> > struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> > int active = 0;
> >
> > - mutex_lock(&lock);
> > if (!acb->active) {
>
> I'm not sure the active field serves any purpose. No memory barriers
> are used so the value of active is 0 before the work is executed and 0
> *or* 1 while the work is executed.
>
> The cancel_threadletwork() function already indicates whether
> cancellation succeeded. Why not just try to cancel instead of using
> the active field?
>
This series does not touch the active field anywhere. So I feel we can
implement this as a separate patch instead of clubbing it with this.
-arun
> > - QTAILQ_REMOVE(&request_list, acb, node);
> > - acb->ret = -ECANCELED;
> > + if (!cancel_threadletwork(&acb->work))
> > + acb->ret = -ECANCELED;
> > + else
> > + active = 1;
>
> The 0 and 1 return value from cancel_threadletwork() is inverted. See
> also my comment on patch 1/3 in this series.
>
> > } else if (acb->ret == -EINPROGRESS) {
> > active = 1;
> > }
> > - mutex_unlock(&lock);
> >
> > if (active) {
> > /* fail safe: if the aio could not be canceled, we wait for
>
> while (qemu_paio_error(acb) == EINPROGRESS)
>     ;
>
> Tight loop with no memory barrier reading a memory location that is
> updated by another thread. We shouldn't communicate between threads
> without barriers.
>
> Stefan
>
* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
2010-10-21 8:40 ` Arun R Bharadwaj
@ 2010-10-21 9:17 ` Stefan Hajnoczi
0 siblings, 0 replies; 38+ messages in thread
From: Stefan Hajnoczi @ 2010-10-21 9:17 UTC (permalink / raw)
To: arun; +Cc: qemu-devel
On Thu, Oct 21, 2010 at 9:40 AM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> * Stefan Hajnoczi <stefanha@gmail.com> [2010-10-20 10:30:38]:
>
>> On Tue, Oct 19, 2010 at 6:43 PM, Arun R Bharadwaj
>> <arun@linux.vnet.ibm.com> wrote:
>> > From: Gautham R Shenoy <ego@in.ibm.com>
>> >
>> > This patch makes the paio subsystem use the threadlet framework, thereby
>> > decoupling the asynchronous threading framework portion from
>> > posix-aio-compat.c
>> >
>> > The patch has been tested with fstress.
>> >
>> > Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
>> > Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
>> > ---
>> > posix-aio-compat.c | 166 +++++++++-------------------------------------------
>> > 1 files changed, 30 insertions(+), 136 deletions(-)
>> >
>> > diff --git a/posix-aio-compat.c b/posix-aio-compat.c
>> > index 7b862b5..6977c18 100644
>> > --- a/posix-aio-compat.c
>> > +++ b/posix-aio-compat.c
>> > @@ -29,6 +29,7 @@
>> > #include "block_int.h"
>> >
>> > #include "block/raw-posix-aio.h"
>> > +#include "qemu-threadlets.h"
>> >
>> >
>> > struct qemu_paiocb {
>> > @@ -51,6 +52,7 @@ struct qemu_paiocb {
>> > struct qemu_paiocb *next;
>> >
>> > int async_context_id;
>> > + ThreadletWork work;
>>
>> The QTAILQ_ENTRY(qemu_paiocb) node field is no longer used, please remove it.
>>
>> > };
>> >
>> > typedef struct PosixAioState {
>> > @@ -59,15 +61,6 @@ typedef struct PosixAioState {
>> > } PosixAioState;
>> >
>> >
>> > -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
>> > -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
>> > -static pthread_t thread_id;
>> > -static pthread_attr_t attr;
>> > -static int max_threads = 64;
>> > -static int cur_threads = 0;
>> > -static int idle_threads = 0;
>> > -static QTAILQ_HEAD(, qemu_paiocb) request_list;
>> > -
>> > #ifdef CONFIG_PREADV
>> > static int preadv_present = 1;
>> > #else
>> > @@ -85,39 +78,6 @@ static void die(const char *what)
>> > die2(errno, what);
>> > }
>> >
>> > -static void mutex_lock(pthread_mutex_t *mutex)
>> > -{
>> > - int ret = pthread_mutex_lock(mutex);
>> > - if (ret) die2(ret, "pthread_mutex_lock");
>> > -}
>> > -
>> > -static void mutex_unlock(pthread_mutex_t *mutex)
>> > -{
>> > - int ret = pthread_mutex_unlock(mutex);
>> > - if (ret) die2(ret, "pthread_mutex_unlock");
>> > -}
>> > -
>> > -static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
>> > - struct timespec *ts)
>> > -{
>> > - int ret = pthread_cond_timedwait(cond, mutex, ts);
>> > - if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
>> > - return ret;
>> > -}
>> > -
>> > -static void cond_signal(pthread_cond_t *cond)
>> > -{
>> > - int ret = pthread_cond_signal(cond);
>> > - if (ret) die2(ret, "pthread_cond_signal");
>> > -}
>> > -
>> > -static void thread_create(pthread_t *thread, pthread_attr_t *attr,
>> > - void *(*start_routine)(void*), void *arg)
>> > -{
>> > - int ret = pthread_create(thread, attr, start_routine, arg);
>> > - if (ret) die2(ret, "pthread_create");
>> > -}
>> > -
>> > static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
>> > {
>> > int ret;
>> > @@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
>> > return nbytes;
>> > }
>> >
>> > -static void *aio_thread(void *unused)
>> > +static void aio_thread(ThreadletWork *work)
>> > {
>> > pid_t pid;
>> > + struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
>> > + ssize_t ret = 0;
>> >
>> > pid = getpid();
>> > + aiocb->active = 1;
>> >
>> > - while (1) {
>> > - struct qemu_paiocb *aiocb;
>> > - ssize_t ret = 0;
>> > - qemu_timeval tv;
>> > - struct timespec ts;
>> > -
>> > - qemu_gettimeofday(&tv);
>> > - ts.tv_sec = tv.tv_sec + 10;
>> > - ts.tv_nsec = 0;
>> > -
>> > - mutex_lock(&lock);
>> > -
>> > - while (QTAILQ_EMPTY(&request_list) &&
>> > - !(ret == ETIMEDOUT)) {
>> > - ret = cond_timedwait(&cond, &lock, &ts);
>> > - }
>> > -
>> > - if (QTAILQ_EMPTY(&request_list))
>> > - break;
>> > -
>> > - aiocb = QTAILQ_FIRST(&request_list);
>> > - QTAILQ_REMOVE(&request_list, aiocb, node);
>> > - aiocb->active = 1;
>> > - idle_threads--;
>> > - mutex_unlock(&lock);
>> > -
>> > - switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
>> > - case QEMU_AIO_READ:
>> > - case QEMU_AIO_WRITE:
>> > - ret = handle_aiocb_rw(aiocb);
>> > - break;
>> > - case QEMU_AIO_FLUSH:
>> > - ret = handle_aiocb_flush(aiocb);
>> > - break;
>> > - case QEMU_AIO_IOCTL:
>> > - ret = handle_aiocb_ioctl(aiocb);
>> > - break;
>> > - default:
>> > - fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
>> > - ret = -EINVAL;
>> > - break;
>> > - }
>> > -
>> > - mutex_lock(&lock);
>> > - aiocb->ret = ret;
>> > - idle_threads++;
>> > - mutex_unlock(&lock);
>> > -
>> > - if (kill(pid, aiocb->ev_signo)) die("kill failed");
>> > + switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
>> > + case QEMU_AIO_READ:
>> > + case QEMU_AIO_WRITE:
>> > + ret = handle_aiocb_rw(aiocb);
>> > + break;
>> > + case QEMU_AIO_FLUSH:
>> > + ret = handle_aiocb_flush(aiocb);
>> > + break;
>> > + case QEMU_AIO_IOCTL:
>> > + ret = handle_aiocb_ioctl(aiocb);
>> > + break;
>> > + default:
>> > + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
>> > + ret = -EINVAL;
>> > + break;
>> > }
>> >
>> > - idle_threads--;
>> > - cur_threads--;
>> > - mutex_unlock(&lock);
>> > + aiocb->ret = ret;
>> >
>> > - return NULL;
>> > -}
>> > -
>> > -static void spawn_thread(void)
>> > -{
>> > - sigset_t set, oldset;
>> > -
>> > - cur_threads++;
>> > - idle_threads++;
>> > -
>> > - /* block all signals */
>> > - if (sigfillset(&set)) die("sigfillset");
>> > - if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
>> > -
>> > - thread_create(&thread_id, &attr, aio_thread, NULL);
>> > -
>> > - if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
>> > + if (kill(pid, aiocb->ev_signo)) die("kill failed");
>> > }
>> >
>> > static void qemu_paio_submit(struct qemu_paiocb *aiocb)
>> > {
>> > aiocb->ret = -EINPROGRESS;
>> > aiocb->active = 0;
>> > - mutex_lock(&lock);
>> > - if (idle_threads == 0 && cur_threads < max_threads)
>> > - spawn_thread();
>> > - QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
>> > - mutex_unlock(&lock);
>> > - cond_signal(&cond);
>> > +
>> > + aiocb->work.func = aio_thread;
>> > + submit_threadletwork(&aiocb->work);
>> > }
>> >
>> > static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>> > {
>> > ssize_t ret;
>> >
>> > - mutex_lock(&lock);
>> > ret = aiocb->ret;
>> > - mutex_unlock(&lock);
>> > -
>> > return ret;
>> > }
>> >
>> > @@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
>> > struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>> > int active = 0;
>> >
>> > - mutex_lock(&lock);
>> > if (!acb->active) {
>>
>> I'm not sure the active field serves any purpose. No memory barriers
>> are used so the value of active is 0 before the work is executed and 0
>> *or* 1 while the work is executed.
>>
>> The cancel_threadletwork() function already indicates whether
>> cancellation succeeded. Why not just try to cancel instead of using
>> the active field?
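As an illustration of this suggestion, cancellation could rely on the
threadlet API alone. The following is only a sketch, not code from the
series: it assumes cancel_threadletwork() returns non-zero when the work
item was dequeued before any worker picked it up, and it reuses the
existing qemu_paio_error()/qemu_aio_wait() helpers to wait out work that
is already running.

    /* Sketch only -- cancel via the threadlet queue instead of the
     * racy 'active' flag. */
    static void paio_cancel_sketch(BlockDriverAIOCB *blockacb)
    {
        struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;

        if (cancel_threadletwork(&acb->work)) {
            /* Dequeued before execution: no worker will touch acb. */
            acb->ret = -ECANCELED;
        } else {
            /* Already running or finished: wait until the worker has
             * stored acb->ret, instead of consulting 'active'. */
            while (qemu_paio_error(acb) == EINPROGRESS) {
                qemu_aio_wait();
            }
        }
        paio_remove(acb);
    }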
>>
>
> This series does not touch the active field anywhere. So I feel we can
> implement this as a separate patch instead of clubbing it with this.
I'd prefer for this to be addressed in this patch because the active
field served a function before but no longer works with threadlets.
You're right that the patch doesn't touch it, and QEMU still compiles
fine with it, but it's still broken as I described in the previous
email.
In other words, the patch breaks the active field, please fix it.
Stefan
* [Qemu-devel] [PATCH 3/3] Add helper functions for virtio-9p to use threadlets
2010-10-19 17:42 [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-10-19 17:42 ` [Qemu-devel] [PATCH 1/3] Introduce threadlets Arun R Bharadwaj
2010-10-19 17:43 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
@ 2010-10-19 17:43 ` Arun R Bharadwaj
2010-10-20 11:19 ` Stefan Hajnoczi
2010-10-20 11:57 ` [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework Amit Shah
3 siblings, 1 reply; 38+ messages in thread
From: Arun R Bharadwaj @ 2010-10-19 17:43 UTC (permalink / raw)
To: qemu-devel
From: Gautham R Shenoy <ego@in.ibm.com>
Add helper functions to enable virtio-9p to make use of the threadlets
infrastructure for offloading blocking tasks, such as making posix calls,
onto the helper threads, and to handle the post_posix_operations() from
the context of the iothread. This frees the vcpu thread to process any
other guest operations while the processing of v9fs_io is in progress.
Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
hw/virtio-9p.c | 165 ++++++++++++++++++++++++++++++++++++++++++++++++++++
posix-aio-compat.c | 33 +++-------
qemu-threadlets.c | 21 +++++++
qemu-threadlets.h | 1
vl.c | 3 +
5 files changed, 200 insertions(+), 23 deletions(-)
diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
index a871685..174300d 100644
--- a/hw/virtio-9p.c
+++ b/hw/virtio-9p.c
@@ -18,6 +18,7 @@
#include "fsdev/qemu-fsdev.h"
#include "virtio-9p-debug.h"
#include "virtio-9p-xattr.h"
+#include "qemu-threadlets.h"
int debug_9p_pdu;
@@ -33,6 +34,146 @@ enum {
Oappend = 0x80,
};
+struct v9fs_post_op {
+ QTAILQ_ENTRY(v9fs_post_op) node;
+ void (*func)(void *arg);
+ void *arg;
+};
+
+static struct {
+ int rfd;
+ int wfd;
+ QemuMutex lock;
+ QTAILQ_HEAD(, v9fs_post_op) post_op_list;
+} v9fs_async_struct;
+
+static void die2(int err, const char *what)
+{
+ fprintf(stderr, "%s failed: %s\n", what, strerror(err));
+ abort();
+}
+
+static void die(const char *what)
+{
+ die2(errno, what);
+}
+
+#define ASYNC_MAX_PROCESS 5
+
+/**
+ * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
+ * @arg: Not used.
+ *
+ * This function serves as a callback that the iothread invokes whenever
+ * the v9fs_async_struct.wfd is written to. It goes through the list
+ * of v9fs_post_posix_operations() and executes them. In the process, it might
+ * queue more jobs on the asynchronous thread pool.
+ */
+static void v9fs_process_post_ops(void *arg)
+{
+ int count = 0;
+ struct v9fs_post_op *post_op;
+ int ret;
+ char byte;
+
+ qemu_mutex_lock(&v9fs_async_struct.lock);
+ do {
+ ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
+ } while (ret >= 0 && errno != EAGAIN);
+
+ for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
+ if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
+ break;
+ }
+ post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
+ QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
+
+ qemu_mutex_unlock(&v9fs_async_struct.lock);
+ post_op->func(post_op->arg);
+ qemu_free(post_op);
+ qemu_mutex_lock(&v9fs_async_struct.lock);
+ }
+ qemu_mutex_unlock(&v9fs_async_struct.lock);
+}
+
+/**
+ * v9fs_async_signal: Inform the io-thread of completion of async job.
+ *
+ * This function is used to inform the iothread that a particular
+ * async-operation pertaining to v9fs has been completed and that the io thread
+ * can handle the v9fs_post_posix_operation.
+ *
+ * This is based on the aio_signal_handler
+ */
+static inline void v9fs_async_signal(void)
+{
+ char byte = 0;
+ ssize_t ret;
+ int tries = 0;
+
+ qemu_mutex_lock(&v9fs_async_struct.lock);
+ do {
+ assert(tries != 100);
+ ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
+ tries++;
+ } while (ret < 0 && errno == EAGAIN);
+ qemu_mutex_unlock(&v9fs_async_struct.lock);
+
+ if (ret < 0 && errno != EAGAIN)
+ die("write() in v9fs");
+
+ if (kill(getpid(), SIGUSR2)) die("kill failed");
+}
+
+/**
+ * v9fs_async_helper_done: Marks the completion of the v9fs_async job
+ * @func: v9fs_post_posix_func() for post-processing invoked in the context of
+ * the io-thread
+ * @arg: Argument to func.
+ *
+ * This function is called from the context of one of the asynchronous threads
+ * in the thread pool. This is called when the asynchronous thread has finished
+ * executing a v9fs_posix_operation. Its purpose is to initiate the process of
+ * informing the io-thread that the v9fs_posix_operation has completed.
+ */
+static void v9fs_async_helper_done(void (*func)(void *arg), void *arg)
+{
+ struct v9fs_post_op *post_op;
+
+ post_op = qemu_mallocz(sizeof(*post_op));
+ post_op->func = func;
+ post_op->arg = arg;
+
+ qemu_mutex_lock(&v9fs_async_struct.lock);
+ QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node);
+ qemu_mutex_unlock(&v9fs_async_struct.lock);
+
+ v9fs_async_signal();
+}
+
+/**
+ * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
+ * @vs: V9fsOPState variable for the OP operation.
+ * @posix_fn: The posix function which has to be offloaded onto async thread.
+ * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
+ * the posix_fn
+ * @post_fn: The post processing function corresponding to the posix_fn.
+ *
+ * This function is a helper to offload a posix_operation onto the asynchronous
+ * thread pool. It sets up the association with the post_function that needs to
+ * be invoked from the context of the iothread once the posix_fn has been
+ * executed.
+ */
+static void v9fs_do_async_posix(ThreadletWork *work,
+ void (*posix_fn)(ThreadletWork *work),
+ void (**post_fn_ptr)(void *arg),
+ void (*post_fn)(void *arg))
+{
+ *post_fn_ptr = post_fn;
+ work->func = posix_fn;
+ submit_threadletwork(work);
+}
+
static int omode_to_uflags(int8_t mode)
{
int ret = 0;
@@ -3639,7 +3780,7 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
int i, len;
struct stat stat;
FsTypeEntry *fse;
-
+ int fds[2];
s = (V9fsState *)virtio_common_init("virtio-9p",
VIRTIO_ID_9P,
@@ -3722,5 +3863,27 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
s->tag_len;
s->vdev.get_config = virtio_9p_get_config;
+ if (qemu_pipe(fds) == -1) {
+ fprintf(stderr, "failed to create fd's for virtio-9p\n");
+ exit(1);
+ }
+
+ v9fs_async_struct.rfd = fds[0];
+ v9fs_async_struct.wfd = fds[1];
+
+ printf("v9fs: rfd: %d\n", v9fs_async_struct.rfd);
+ printf("v9fs: wfd: %d\n", v9fs_async_struct.wfd);
+
+ fcntl(fds[0], F_SETFL, O_NONBLOCK);
+ fcntl(fds[1], F_SETFL, O_NONBLOCK);
+
+ qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
+ QTAILQ_INIT(&v9fs_async_struct.post_op_list);
+ qemu_mutex_init(&(v9fs_async_struct.lock));
+ /* Create async queue. */
+
+ (void)v9fs_do_async_posix;
+ (void)v9fs_async_helper_done;
+
return &s->vdev;
}
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 6977c18..dbf9321 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -261,6 +261,8 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
+static PosixAioState *posix_aio_state;
+
static void aio_thread(ThreadletWork *work)
{
pid_t pid;
@@ -289,6 +291,15 @@ static void aio_thread(ThreadletWork *work)
aiocb->ret = ret;
+ if (posix_aio_state) {
+ char byte = 0;
+ ssize_t ret;
+
+ ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
+ if (ret < 0 && errno != EAGAIN)
+ die("write()");
+ }
+
if (kill(pid, aiocb->ev_signo)) die("kill failed");
}
@@ -401,22 +412,6 @@ static int posix_aio_flush(void *opaque)
return !!s->first_aio;
}
-static PosixAioState *posix_aio_state;
-
-static void aio_signal_handler(int signum)
-{
- if (posix_aio_state) {
- char byte = 0;
- ssize_t ret;
-
- ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
- if (ret < 0 && errno != EAGAIN)
- die("write()");
- }
-
- qemu_service_io();
-}
-
static void paio_remove(struct qemu_paiocb *acb)
{
struct qemu_paiocb **pacb;
@@ -520,7 +515,6 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
int paio_init(void)
{
- struct sigaction act;
PosixAioState *s;
int fds[2];
@@ -529,11 +523,6 @@ int paio_init(void)
s = qemu_malloc(sizeof(PosixAioState));
- sigfillset(&act.sa_mask);
- act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
- act.sa_handler = aio_signal_handler;
- sigaction(SIGUSR2, &act, NULL);
-
s->first_aio = NULL;
if (qemu_pipe(fds) == -1) {
fprintf(stderr, "failed to create pipe\n");
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
index fd33752..df02f4e 100644
--- a/qemu-threadlets.c
+++ b/qemu-threadlets.c
@@ -15,12 +15,28 @@
#include "qemu-threadlets.h"
#include "osdep.h"
+#include <signal.h>
#define MAX_GLOBAL_THREADS 64
#define MIN_GLOBAL_THREADS 64
static ThreadletQueue globalqueue;
static int globalqueue_init;
+static void threadlet_io_completion_signal_handler(int signum)
+{
+ qemu_service_io();
+}
+
+static void threadlet_register_signal_handler(void)
+{
+ struct sigaction act;
+
+ sigfillset(&act.sa_mask);
+ act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
+ act.sa_handler = threadlet_io_completion_signal_handler;
+ sigaction(SIGUSR2, &act, NULL);
+}
+
static void *threadlet_worker(void *data)
{
ThreadletQueue *queue = data;
@@ -163,3 +179,8 @@ void threadlet_queue_init(ThreadletQueue *queue,
qemu_mutex_init(&(queue->lock));
qemu_cond_init(&(queue->cond));
}
+
+void threadlet_init(void)
+{
+ threadlet_register_signal_handler();
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
index 9c8f9e5..3de589b 100644
--- a/qemu-threadlets.h
+++ b/qemu-threadlets.h
@@ -45,4 +45,5 @@ extern int cancel_threadletwork_on_queue(ThreadletQueue *queue,
extern int cancel_threadletwork(ThreadletWork *work);
extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
int min_threads);
+extern void threadlet_init(void);
#endif
diff --git a/vl.c b/vl.c
index df414ef..7b9a425 100644
--- a/vl.c
+++ b/vl.c
@@ -148,6 +148,7 @@ int main(int argc, char **argv)
#include "qemu-config.h"
#include "qemu-objects.h"
#include "qemu-options.h"
+#include "qemu-threadlets.h"
#ifdef CONFIG_VIRTFS
#include "fsdev/qemu-fsdev.h"
#endif
@@ -2922,6 +2923,8 @@ int main(int argc, char **argv, char **envp)
exit(1);
}
+ threadlet_init();
+
/* init generic devices */
if (qemu_opts_foreach(qemu_find_opts("device"), device_init_func, NULL, 1) != 0)
exit(1);
* Re: [Qemu-devel] [PATCH 3/3] Add helper functions for virtio-9p to use threadlets
2010-10-19 17:43 ` [Qemu-devel] [PATCH 3/3] Add helper functions for virtio-9p to " Arun R Bharadwaj
@ 2010-10-20 11:19 ` Stefan Hajnoczi
2010-10-20 13:17 ` Anthony Liguori
0 siblings, 1 reply; 38+ messages in thread
From: Stefan Hajnoczi @ 2010-10-20 11:19 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: qemu-devel
On Tue, Oct 19, 2010 at 6:43 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> From: Gautham R Shenoy <ego@in.ibm.com>
>
> Add helper functions to enable virtio-9p to make use of the threadlets
> infrastructure for offloading blocking tasks, such as making posix calls,
> onto the helper threads, and to handle the post_posix_operations() from
> the context of the iothread. This frees the vcpu thread to process any
> other guest operations while the processing of v9fs_io is in progress.
>
> Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
> Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
> Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
> ---
> hw/virtio-9p.c | 165 ++++++++++++++++++++++++++++++++++++++++++++++++++++
> posix-aio-compat.c | 33 +++-------
> qemu-threadlets.c | 21 +++++++
> qemu-threadlets.h | 1
> vl.c | 3 +
> 5 files changed, 200 insertions(+), 23 deletions(-)
I wish --enable-io-thread was the default and only model. The signals
and pipes are ugly and should be hidden behind a QEMU eventfd
abstraction, which would also reduce the code duplication between
posix-aio-compat.c and virtio-9p. I'm not asking you to do this but I
hope we'll get there eventually.
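The kind of abstraction being wished for here might look something like
the sketch below. All names are hypothetical, not an existing QEMU API:
a notifier that hides the pipe pair (or an eventfd on Linux) behind a
small interface, so each subsystem stops rolling its own plumbing.

    /* Hypothetical wrapper -- names invented for illustration. */
    typedef struct {
        int rfd, wfd;              /* could be one eventfd on Linux */
    } QemuEventPipe;

    static void event_pipe_notify(QemuEventPipe *ep)
    {
        char byte = 0;
        ssize_t ret;

        /* Wake whoever polls ep->rfd; retry on signal interruption. */
        do {
            ret = write(ep->wfd, &byte, sizeof(byte));
        } while (ret < 0 && errno == EINTR);
    }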
Does anyone know why non-io-thread still exists and is used by default?
> diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c
> index a871685..174300d 100644
> --- a/hw/virtio-9p.c
> +++ b/hw/virtio-9p.c
> @@ -18,6 +18,7 @@
> #include "fsdev/qemu-fsdev.h"
> #include "virtio-9p-debug.h"
> #include "virtio-9p-xattr.h"
> +#include "qemu-threadlets.h"
>
> int debug_9p_pdu;
>
> @@ -33,6 +34,146 @@ enum {
> Oappend = 0x80,
> };
>
> +struct v9fs_post_op {
> + QTAILQ_ENTRY(v9fs_post_op) node;
> + void (*func)(void *arg);
> + void *arg;
> +};
> +
> +static struct {
> + int rfd;
> + int wfd;
> + QemuMutex lock;
> + QTAILQ_HEAD(, v9fs_post_op) post_op_list;
> +} v9fs_async_struct;
> +
> +static void die2(int err, const char *what)
> +{
> + fprintf(stderr, "%s failed: %s\n", what, strerror(err));
> + abort();
> +}
> +
> +static void die(const char *what)
> +{
> + die2(errno, what);
> +}
> +
> +#define ASYNC_MAX_PROCESS 5
What does this constant define? I think it is an arbitrary limit on
the amount of work you want to do per v9fs_process_post_ops() call?
> +
> +/**
> + * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation
> + * @arg: Not used.
> + *
> + * This function serves as a callback that the iothread invokes whenever
> + * the v9fs_async_struct.wfd is written to. It goes through the list
> + * of v9fs_post_posix_operations() and executes them. In the process, it might
> + * queue more jobs on the asynchronous thread pool.
> + */
> +static void v9fs_process_post_ops(void *arg)
> +{
> + int count = 0;
> + struct v9fs_post_op *post_op;
> + int ret;
> + char byte;
> +
> + qemu_mutex_lock(&v9fs_async_struct.lock);
> + do {
> + ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
> + } while (ret >= 0 && errno != EAGAIN);
ret >= 0 && errno != EAGAIN looks odd to me. Should && be ||?
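For comparison, a drain loop that stops once the non-blocking read()
reports an empty pipe could be written as follows (a sketch of the
apparent intent, reusing the function's locals, not the author's fix):

    do {
        ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte));
    } while (ret > 0 || (ret < 0 && errno == EINTR));
    /* exits with ret == 0 (writer closed) or ret < 0 and
     * errno == EAGAIN (pipe drained) */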
> +
> + for (count = 0; count < ASYNC_MAX_PROCESS; count++) {
> + if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) {
> + break;
> + }
> + post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list));
> + QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node);
> +
> + qemu_mutex_unlock(&v9fs_async_struct.lock);
> + post_op->func(post_op->arg);
> + qemu_free(post_op);
> + qemu_mutex_lock(&v9fs_async_struct.lock);
> + }
> + qemu_mutex_unlock(&v9fs_async_struct.lock);
> +}
> +
> +/**
> + * v9fs_async_signal: Inform the io-thread of completion of async job.
> + *
> + * This function is used to inform the iothread that a particular
> + * async-operation pertaining to v9fs has been completed and that the io thread
> + * can handle the v9fs_post_posix_operation.
> + *
> + * This is based on the aio_signal_handler
> + */
> +static inline void v9fs_async_signal(void)
> +{
> + char byte = 0;
> + ssize_t ret;
> + int tries = 0;
> +
> + qemu_mutex_lock(&v9fs_async_struct.lock);
> + do {
> + assert(tries != 100);
> + ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte));
> + tries++;
> + } while (ret < 0 && errno == EAGAIN);
> + qemu_mutex_unlock(&v9fs_async_struct.lock);
> +
> + if (ret < 0 && errno != EAGAIN)
> + die("write() in v9fs");
> +
> + if (kill(getpid(), SIGUSR2)) die("kill failed");
> +}
> +
> +/**
> + * v9fs_async_helper_done: Marks the completion of the v9fs_async job
> + * @func: v9fs_post_posix_func() for post-processing invoked in the context of
> + * the io-thread
> + * @arg: Argument to func.
> + *
> + * This function is called from the context of one of the asynchronous threads
> + * in the thread pool. This is called when the asynchronous thread has finished
> + * executing a v9fs_posix_operation. Its purpose is to initiate the process of
> + * informing the io-thread that the v9fs_posix_operation has completed.
> + */
> +static void v9fs_async_helper_done(void (*func)(void *arg), void *arg)
> +{
> + struct v9fs_post_op *post_op;
> +
> + post_op = qemu_mallocz(sizeof(*post_op));
> + post_op->func = func;
> + post_op->arg = arg;
> +
> + qemu_mutex_lock(&v9fs_async_struct.lock);
> + QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node);
> + qemu_mutex_unlock(&v9fs_async_struct.lock);
> +
> + v9fs_async_signal();
> +}
> +
> +/**
> + * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread.
> + * @vs: V9fsOPState variable for the OP operation.
> + * @posix_fn: The posix function which has to be offloaded onto async thread.
> + * @post_fn_ptr: Address of the location to hold the post_fn corresponding to
> + * the posix_fn
> + * @post_fn: The post processing function corresponding to the posix_fn.
> + *
> + * This function is a helper to offload a posix_operation onto the asynchronous
> + * thread pool. It sets up the association with the post_function that needs to
> + * be invoked from the context of the iothread once the posix_fn has been
> + * executed.
> + */
> +static void v9fs_do_async_posix(ThreadletWork *work,
> + void (*posix_fn)(ThreadletWork *work),
> + void (**post_fn_ptr)(void *arg),
> + void (*post_fn)(void *arg))
> +{
> + *post_fn_ptr = post_fn;
> + work->func = posix_fn;
> + submit_threadletwork(work);
> +}
> +
> static int omode_to_uflags(int8_t mode)
> {
> int ret = 0;
> @@ -3639,7 +3780,7 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
> int i, len;
> struct stat stat;
> FsTypeEntry *fse;
> -
> + int fds[2];
>
> s = (V9fsState *)virtio_common_init("virtio-9p",
> VIRTIO_ID_9P,
> @@ -3722,5 +3863,27 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, V9fsConf *conf)
> s->tag_len;
> s->vdev.get_config = virtio_9p_get_config;
>
> + if (qemu_pipe(fds) == -1) {
> + fprintf(stderr, "failed to create fd's for virtio-9p\n");
> + exit(1);
> + }
> +
> + v9fs_async_struct.rfd = fds[0];
> + v9fs_async_struct.wfd = fds[1];
> +
> + printf("v9fs: rfd: %d\n", v9fs_async_struct.rfd);
> + printf("v9fs: wfd: %d\n", v9fs_async_struct.wfd);
This looks like debug code.
> +
> + fcntl(fds[0], F_SETFL, O_NONBLOCK);
> + fcntl(fds[1], F_SETFL, O_NONBLOCK);
> +
> + qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL);
> + QTAILQ_INIT(&v9fs_async_struct.post_op_list);
> + qemu_mutex_init(&(v9fs_async_struct.lock));
> + /* Create async queue. */
Not sure what the purpose of this comment is.
> +
> + (void)v9fs_do_async_posix;
> + (void)v9fs_async_helper_done;
These will disappear once there is a user for these functions?
> +
> return &s->vdev;
> }
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index 6977c18..dbf9321 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -261,6 +261,8 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
> return nbytes;
> }
>
> +static PosixAioState *posix_aio_state;
> +
> static void aio_thread(ThreadletWork *work)
> {
> pid_t pid;
> @@ -289,6 +291,15 @@ static void aio_thread(ThreadletWork *work)
>
> aiocb->ret = ret;
>
> + if (posix_aio_state) {
The posix_aio_state check was necessary to guard the signal handler
before posix aio was fully set up. In aio_thread() posix_aio_state
must always be set up. There's no need to check.
Stefan
* Re: [Qemu-devel] [PATCH 3/3] Add helper functions for virtio-9p to use threadlets
2010-10-20 11:19 ` Stefan Hajnoczi
@ 2010-10-20 13:17 ` Anthony Liguori
0 siblings, 0 replies; 38+ messages in thread
From: Anthony Liguori @ 2010-10-20 13:17 UTC (permalink / raw)
To: Stefan Hajnoczi; +Cc: Arun R Bharadwaj, qemu-devel
On 10/20/2010 06:19 AM, Stefan Hajnoczi wrote:
> On Tue, Oct 19, 2010 at 6:43 PM, Arun R Bharadwaj
> <arun@linux.vnet.ibm.com> wrote:
>
>> [...]
> I wish --enable-io-thread was the default and only model. The signals
> and pipes are ugly and should be hidden behind a QEMU eventfd
> abstraction, which would also reduce the code duplication between
> posix-aio-compat.c and virtio-9p. I'm not asking you to do this but I
> hope we'll get there eventually.
>
> Does anyone know why non-io-thread still exists and is used by default?
>
There are still issues with --enable-io-thread and TCG.
Regards,
Anthony Liguori
* Re: [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework
2010-10-19 17:42 [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework Arun R Bharadwaj
` (2 preceding siblings ...)
2010-10-19 17:43 ` [Qemu-devel] [PATCH 3/3] Add helper functions for virtio-9p to " Arun R Bharadwaj
@ 2010-10-20 11:57 ` Amit Shah
2010-10-20 12:05 ` Stefan Hajnoczi
3 siblings, 1 reply; 38+ messages in thread
From: Amit Shah @ 2010-10-20 11:57 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: qemu-devel
On (Tue) Oct 19 2010 [23:12:20], Arun R Bharadwaj wrote:
> Hi,
>
> This is the v6 of the patch-series to have a generic asynchronous task
> offloading framework (called threadlets) within qemu.
>
> Request to consider pulling this series as discussed during the
> Qemu-devel call.
I tried this out with virtio-serial (patch below). Have a couple of
things to note:
- Guests get a SIGUSR2 on startup sometimes. This doesn't happen with
qemu.git, so looks like it's introduced by this patchset.
- After running some tests, I get an abort. I still have to look at
what's causing it, but doesn't look like it's related to virtio-serial
code.
Program received signal SIGABRT, Aborted.
0x0000003dc76329a5 in raise () from /lib64/libc.so.6
Missing separate debuginfos, use: debuginfo-install
SDL-1.2.14-8.fc13.x86_64 glibc-2.12.1-2.x86_64
libX11-1.3.1-3.fc13.x86_64 libXau-1.0.5-1.fc12.x86_64
libpng-1.2.44-1.fc13.x86_64 libxcb-1.5-1.fc13.x86_64
ncurses-libs-5.7-7.20100130.fc13.x86_64 zlib-1.2.3-23.fc12.x86_64
(gdb) bt
#0 0x0000003dc76329a5 in raise () from /lib64/libc.so.6
#1 0x0000003dc7634185 in abort () from /lib64/libc.so.6
#2 0x00000000004bf829 in qemu_get_ram_ptr (addr=<value optimized out>)
at /home/amit/src/qemu/exec.c:2936
#3 0x00000000004bf9a7 in lduw_phys (addr=<value optimized out>) at
/home/amit/src/qemu/exec.c:3836
#4 0x0000000000557c90 in vring_avail_idx (vq=0x17b9320, idx=1333) at
/home/amit/src/qemu/hw/virtio.c:133
#5 virtqueue_num_heads (vq=0x17b9320, idx=1333) at
/home/amit/src/qemu/hw/virtio.c:252
#6 0x0000000000557e5e in virtqueue_avail_bytes (vq=0x17b9320,
in_bytes=4096, out_bytes=0) at /home/amit/src/qemu/hw/virtio.c:311
- I'm using a threadlet to queue up several work items which are to be
processed in a fifo order. There's no cancel function for a threadlet
that either processes all work and then quits the thread or just
cancels all pending work and quits.
Amit
diff --git a/hw/virtio-serial-bus.c b/hw/virtio-serial-bus.c
index 74ba5ec..caaafbe 100644
--- a/hw/virtio-serial-bus.c
+++ b/hw/virtio-serial-bus.c
@@ -51,6 +51,14 @@ struct VirtIOSerial {
struct virtio_console_config config;
};
+typedef struct VirtIOSerialWork {
+ ThreadletWork work;
+ VirtIOSerialPort *port;
+ VirtQueue *vq;
+ VirtIODevice *vdev;
+ int discard;
+} VirtIOSerialWork;
+
static VirtIOSerialPort *find_port_by_id(VirtIOSerial *vser, uint32_t id)
{
VirtIOSerialPort *port;
@@ -113,10 +121,20 @@ static size_t write_to_port(VirtIOSerialPort *port,
return offset;
}
-static void do_flush_queued_data(VirtIOSerialPort *port, VirtQueue *vq,
- VirtIODevice *vdev, bool discard)
+static void async_flush_queued_data(ThreadletWork *work)
{
+ VirtIOSerialPort *port;
+ VirtIOSerialWork *vs_work;
+ VirtQueue *vq;
+ VirtIODevice *vdev;
VirtQueueElement elem;
+ int discard;
+
+ vs_work = DO_UPCAST(VirtIOSerialWork, work, work);
+ port = vs_work->port;
+ vq = vs_work->vq;
+ vdev = vs_work->vdev;
+ discard = vs_work->discard;
assert(port || discard);
assert(virtio_queue_ready(vq));
@@ -136,6 +154,24 @@ static void do_flush_queued_data(VirtIOSerialPort *port, VirtQueue *vq,
virtqueue_push(vq, &elem, 0);
}
virtio_notify(vdev, vq);
+
+ qemu_free(vs_work);
+}
+
+static void do_flush_queued_data(VirtIOSerialPort *port, VirtQueue *vq,
+ VirtIODevice *vdev, bool discard)
+{
+ VirtIOSerialWork *vs_work;
+
+ /* TODO: can just do the needful if discard is true */
+
+ vs_work = qemu_malloc(sizeof(*vs_work));
+ vs_work->work.func = async_flush_queued_data;
+ vs_work->discard = discard;
+ vs_work->vdev = vdev;
+ vs_work->vq = vq;
+ vs_work->port = port;
+ submit_threadletwork_to_queue(&port->tqueue, &vs_work->work);
}
static void flush_queued_data(VirtIOSerialPort *port, bool discard)
@@ -699,6 +735,12 @@ static int virtser_port_qdev_init(DeviceState *qdev, DeviceInfo *base)
port->ivq = port->vser->ivqs[port->id];
port->ovq = port->vser->ovqs[port->id];
+ /*
+ * Just one thread to process all the work -- we don't want guest
+ * buffers to be processed out-of-order.
+ */
+ threadlet_queue_init(&port->tqueue, 1, 1);
+
add_port(port->vser, port->id);
/* Send an update to the guest about this new port added */
@@ -717,6 +759,8 @@ static int virtser_port_qdev_exit(DeviceState *qdev)
QTAILQ_REMOVE(&vser->ports, port, next);
+ /* TODO: Cancel threadlet */
+
if (port->info->exit)
port->info->exit(dev);
diff --git a/hw/virtio-serial.h b/hw/virtio-serial.h
index ff08c40..15e0982 100644
--- a/hw/virtio-serial.h
+++ b/hw/virtio-serial.h
@@ -15,6 +15,7 @@
#ifndef _QEMU_VIRTIO_SERIAL_H
#define _QEMU_VIRTIO_SERIAL_H
+#include "qemu-threadlets.h"
#include "qdev.h"
#include "virtio.h"
@@ -88,6 +89,13 @@ struct VirtIOSerialPort {
VirtQueue *ivq, *ovq;
/*
+ * Threadlet queue for processing work for this port -- ensures we
+ * don't block the guest that writes out data and also other
+ * ports.
+ */
+ ThreadletQueue tqueue;
+
+ /*
* This name is sent to the guest and exported via sysfs.
* The guest could create symlinks based on this information.
* The name is in the reverse fqdn format, like org.qemu.console.0
* Re: [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework
2010-10-20 11:57 ` [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework Amit Shah
@ 2010-10-20 12:05 ` Stefan Hajnoczi
2010-10-20 13:18 ` Anthony Liguori
0 siblings, 1 reply; 38+ messages in thread
From: Stefan Hajnoczi @ 2010-10-20 12:05 UTC (permalink / raw)
To: Amit Shah; +Cc: Arun R Bharadwaj, qemu-devel
On Wed, Oct 20, 2010 at 12:57 PM, Amit Shah <amit.shah@redhat.com> wrote:
> On (Tue) Oct 19 2010 [23:12:20], Arun R Bharadwaj wrote:
>> Hi,
>>
>> This is the v6 of the patch-series to have a generic asynchronous task
>> offloading framework (called threadlets) within qemu.
>>
>> Request to consider pulling this series as discussed during the
>> Qemu-devel call.
>
> I tried this out with virtio-serial (patch below). Have a couple of
> things to note:
>
> - Guests get a SIGUSR2 on startup sometimes. This doesn't happen with
> qemu.git, so looks like it's introduced by this patchset.
>
> - After running some tests, I get an abort. I still have to look at
> what's causing it, but doesn't look like it's related to virtio-serial
> code.
>
> [...]
>
> - I'm using a threadlet to queue up several work items which are to be
> processed in a fifo order. There's no cancel function for a threadlet
> that either processes all work and then quits the thread or just
> cancels all pending work and quits.
>
> Amit
>
>
> diff --git a/hw/virtio-serial-bus.c b/hw/virtio-serial-bus.c
> index 74ba5ec..caaafbe 100644
> --- a/hw/virtio-serial-bus.c
> +++ b/hw/virtio-serial-bus.c
> @@ -51,6 +51,14 @@ struct VirtIOSerial {
> struct virtio_console_config config;
> };
>
> +typedef struct VirtIOSerialWork {
> + ThreadletWork work;
> + VirtIOSerialPort *port;
> + VirtQueue *vq;
> + VirtIODevice *vdev;
> + int discard;
> +} VirtIOSerialWork;
> +
> static VirtIOSerialPort *find_port_by_id(VirtIOSerial *vser, uint32_t id)
> {
> VirtIOSerialPort *port;
> @@ -113,10 +121,20 @@ static size_t write_to_port(VirtIOSerialPort *port,
> return offset;
> }
>
> -static void do_flush_queued_data(VirtIOSerialPort *port, VirtQueue *vq,
> - VirtIODevice *vdev, bool discard)
> +static void async_flush_queued_data(ThreadletWork *work)
> {
> + VirtIOSerialPort *port;
> + VirtIOSerialWork *vs_work;
> + VirtQueue *vq;
> + VirtIODevice *vdev;
> VirtQueueElement elem;
> + int discard;
> +
> + vs_work = DO_UPCAST(VirtIOSerialWork, work, work);
> + port = vs_work->port;
> + vq = vs_work->vq;
> + vdev = vs_work->vdev;
> + discard = vs_work->discard;
>
> assert(port || discard);
> assert(virtio_queue_ready(vq));
You cannot access guest memory using QEMU RAM functions (or use the
virtqueue_pop() function which uses them) from a thread without taking
the QEMU global mutex.
The abort stack trace is a result of accessing guest RAM from two
threads simultaneously.
In general it is not safe to use QEMU functions from a thread unless
they are explicitly written to work outside the QEMU global mutex.
Most functions assume the global mutex, which serializes I/O thread
and vcpu changes to global state, is held.
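Concretely, a worker that had to touch a virtqueue would need to hold the
global mutex for the duration, along the lines of the sketch below. It
assumes an --enable-io-thread build, where qemu_mutex_lock_iothread() and
qemu_mutex_unlock_iothread() are available, and it largely defeats the
point of offloading -- which is why this is illustration, not a fix:

    static void async_flush_queued_data(ThreadletWork *work)
    {
        VirtIOSerialWork *vs_work = DO_UPCAST(VirtIOSerialWork, work, work);

        /* Serialize against vcpu/io-thread access to guest RAM. */
        qemu_mutex_lock_iothread();
        /* ... virtqueue_pop()/virtqueue_push()/virtio_notify() ... */
        qemu_mutex_unlock_iothread();

        qemu_free(vs_work);
    }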
Stefan
* Re: [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework
2010-10-20 12:05 ` Stefan Hajnoczi
@ 2010-10-20 13:18 ` Anthony Liguori
2010-10-22 9:59 ` Amit Shah
0 siblings, 1 reply; 38+ messages in thread
From: Anthony Liguori @ 2010-10-20 13:18 UTC (permalink / raw)
To: Stefan Hajnoczi; +Cc: Amit Shah, Arun R Bharadwaj, qemu-devel
On 10/20/2010 07:05 AM, Stefan Hajnoczi wrote:
> On Wed, Oct 20, 2010 at 12:57 PM, Amit Shah<amit.shah@redhat.com> wrote:
>
>> On (Tue) Oct 19 2010 [23:12:20], Arun R Bharadwaj wrote:
>>
>>> Hi,
>>>
>>> This is the v6 of the patch-series to have a generic asynchronous task
>>> offloading framework (called threadlets) within qemu.
>>>
>>> Request to consider pulling this series as discussed during the
>>> Qemu-devel call.
>>>
>> [...]
> You cannot access guest memory using QEMU RAM functions (or use the
> virtqueue_pop() function which uses them) from a thread without taking
> the QEMU global mutex.
>
> The abort stack trace is a result of accessing guest RAM from two
> threads simultaneously.
>
> In general it is not safe to use QEMU functions from a thread unless
> they are explicitly written to work outside the QEMU global mutex.
> Most functions assume the global mutex, which serializes I/O thread
> and vcpu changes to global state, is held.
>
Yes, threadlets are only meant to be used to make synchronous system
calls asynchronous. They are not meant to add parallelism to QEMU (yet).
Regards,
Anthony Liguori
> Stefan
>
>
* Re: [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework
2010-10-20 13:18 ` Anthony Liguori
@ 2010-10-22 9:59 ` Amit Shah
2010-10-23 12:05 ` Stefan Hajnoczi
0 siblings, 1 reply; 38+ messages in thread
From: Amit Shah @ 2010-10-22 9:59 UTC (permalink / raw)
To: Anthony Liguori; +Cc: Stefan Hajnoczi, qemu-devel, Arun R Bharadwaj
On (Wed) Oct 20 2010 [08:18:51], Anthony Liguori wrote:
> On 10/20/2010 07:05 AM, Stefan Hajnoczi wrote:
> >On Wed, Oct 20, 2010 at 12:57 PM, Amit Shah<amit.shah@redhat.com> wrote:
> >>On (Tue) Oct 19 2010 [23:12:20], Arun R Bharadwaj wrote:
> >>>Hi,
> >>>
> >>>This is the v6 of the patch-series to have a generic asynchronous task
> >>>offloading framework (called threadlets) within qemu.
> >>>
> >>>Request to consider pulling this series as discussed during the
> >>>Qemu-devel call.
> >>[...]
> >You cannot access guest memory using QEMU RAM functions (or use the
> >virtqueue_pop() function which uses them) from a thread without taking
> >the QEMU global mutex.
> >
> >The abort stack trace is a result of accessing guest RAM from two
> >threads simultaneously.
> >
> >In general it is not safe to use QEMU functions from a thread unless
> >they are explicitly written to work outside the QEMU global mutex.
> >Most functions assume the global mutex, which serializes I/O thread
> >and vcpu changes to global state, is held.
>
> Yes, threadlets are only meant to be used to make synchronous system
> calls asynchronous. They are not meant to add parallelism to QEMU
> (yet).
Yes -- I realised that. (But I don't see why the virtio rings get
modified elsewhere other than the only function I push/pop from above.)
Anyway, just one question as I've still not read the code: does a
running work item in a threadlet block migration? Do the remaining work
items in a threadlet get migrated fine?
Amit
* Re: [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework
2010-10-22 9:59 ` Amit Shah
@ 2010-10-23 12:05 ` Stefan Hajnoczi
2010-10-27 7:57 ` Amit Shah
0 siblings, 1 reply; 38+ messages in thread
From: Stefan Hajnoczi @ 2010-10-23 12:05 UTC (permalink / raw)
To: Amit Shah; +Cc: Arun R Bharadwaj, qemu-devel
On Fri, Oct 22, 2010 at 10:59 AM, Amit Shah <amit.shah@redhat.com> wrote:
> On (Wed) Oct 20 2010 [08:18:51], Anthony Liguori wrote:
>> On 10/20/2010 07:05 AM, Stefan Hajnoczi wrote:
>> >On Wed, Oct 20, 2010 at 12:57 PM, Amit Shah<amit.shah@redhat.com> wrote:
>> >>On (Tue) Oct 19 2010 [23:12:20], Arun R Bharadwaj wrote:
>> >>>Hi,
>> >>>
>> >>>This is the v6 of the patch-series to have a generic asynchronous task
>> >>>offloading framework (called threadlets) within qemu.
>> >>>
>> >>>Request to consider pulling this series as discussed during the
>> >>>Qemu-devel call.
>> >>[...]
>> >You cannot access guest memory using QEMU RAM functions (or use the
>> >virtqueue_pop() function which uses them) from a thread without taking
>> >the QEMU global mutex.
>> >
>> >The abort stack trace is a result of accessing guest RAM from two
>> >threads simultaneously.
>> >
>> >In general it is not safe to use QEMU functions from a thread unless
>> >they are explicitly written to work outside the QEMU global mutex.
>> >Most functions assume the global mutex, which serializes I/O thread
>> >and vcpu changes to global state, is held.
>>
>> Yes, threadlets are only meant to be used to make synchronous system
>> calls asynchronous. They are not meant to add parallelism to QEMU
>> (yet).
>
> Yes -- I realised that. (But I don't see why the virtio rings get
> modified elsewhere other than the only function I push/pop from above.)
The particular race condition triggered by virtqueue_pop() is because
guest RAM access functions try to optimize frequently accessed memory
to keep lookup time to a minimum. The current RAM block is put onto
the beginning of the lookup list so the next access to it will be
faster. When two threads are looking up guest memory simultaneously
one may be modifying the list while the other traverses it...
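Paraphrasing the lookup in exec.c of this era (not a verbatim copy), the
hazard is that the reader also writes to the list it is walking:

    RAMBlock *block;

    QLIST_FOREACH(block, &ram_list.blocks, next) {
        if (addr - block->offset < block->length) {
            /* Move-to-front so the hot block is found first next time;
             * this mutation is what breaks concurrent lookups. */
            QLIST_REMOVE(block, next);
            QLIST_INSERT_HEAD(&ram_list.blocks, block, next);
            return block->host + (addr - block->offset);
        }
    }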
> Anyway, just one question as I've still not read the code: does a
> running work item in a threadlet block migration? Do the remaining work
> items in a threadlet get migrated fine?
There's no migration support in the threadlets infrastructure itself.
Threadlets users need to think about how to safely migrate, either
through cancellation or waiting for all pending work to complete. For
example, migration will quiesce aio requests using qemu_aio_flush() so
there is no outstanding work to be migrated.
Stefan
* Re: [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework
2010-10-23 12:05 ` Stefan Hajnoczi
@ 2010-10-27 7:57 ` Amit Shah
2010-10-27 8:37 ` Stefan Hajnoczi
0 siblings, 1 reply; 38+ messages in thread
From: Amit Shah @ 2010-10-27 7:57 UTC (permalink / raw)
To: Stefan Hajnoczi; +Cc: Arun R Bharadwaj, qemu-devel
On (Sat) Oct 23 2010 [13:05:48], Stefan Hajnoczi wrote:
> >> >[...]
> >>
> >> Yes, threadlets are only meant to be used to make synchronous system
> >> calls asynchronous. They are not meant to add parallelism to QEMU
> >> (yet).
> >
> > Yes -- I realised that. (But I don't see why the virtio rings get
> > modified elsewhere other than the only function I push/pop from above.)
>
> The particular race condition triggered by virtqueue_pop() is because
> guest RAM access functions try to optimize frequently accessed memory
> to keep lookup time to a minimum. The current RAM block is put onto
> the beginning of the lookup list so the next access to it will be
> faster. When two threads are looking up guest memory simultaneously
> one may be modifying the list while the other traverses it...
Right, thanks.
> > Anyway, just one question as I've still not read the code: does a
> > running work item in a threadlet block migration? Do the remaining work
> > items in a threadlet get migrated fine?
>
> There's no migration support in the threadlets infrastructure itself.
>
> Threadlets users need to think about how to safely migrate, either
> through cancellation or waiting for all pending work to complete. For
> example, migration will quiesce aio requests using qemu_aio_flush() so
> there is no outstanding work to be migrated.
The problem then is that the API doesn't have functions to stall all
pending work in a threadlet, cancel all work, or even block the caller
until all work is finished. I think the API relies on the work
signalling some sort of completion, which means callers (or threadlet
users) have to keep track of all the work scheduled, which might be
unnecessary.
Amit
* Re: [Qemu-devel] v6: [PATCH 0/3]: Threadlets: A generic task offloading framework
2010-10-27 7:57 ` Amit Shah
@ 2010-10-27 8:37 ` Stefan Hajnoczi
0 siblings, 0 replies; 38+ messages in thread
From: Stefan Hajnoczi @ 2010-10-27 8:37 UTC (permalink / raw)
To: Amit Shah; +Cc: Arun R Bharadwaj, qemu-devel
On Wed, Oct 27, 2010 at 8:57 AM, Amit Shah <amit.shah@redhat.com> wrote:
> On (Sat) Oct 23 2010 [13:05:48], Stefan Hajnoczi wrote:
>> [...]
>
> Right, thanks.
>
>> > Anyway, just one question as I've still not read the code: does a
>> > running work item in a threadlet block migration? Do the remaining work
>> > items in a threadlet get migrated fine?
>>
>> There's no migration support in the threadlets infrastructure itself.
>>
>> Threadlets users need to think about how to safely migrate, either
>> through cancellation or waiting for all pending work to complete. For
>> example, migration will quiesce aio requests using qemu_aio_flush() so
>> there is no outstanding work to be migrated.
>
> The problem then is that the API doesn't have functions to stall all
> pending work in a threadlet, cancel all work, or even block the caller
> until all work is finished. I think the API relies on the work
> signalling some sort of completion, which means callers (or threadlet
> users) have to keep track of all the work scheduled, which might be
> unnecessary.
I agree that placing responsibility on the caller is not ideal in
simple cases like waiting for all work to complete. On the other hand
it allows custom behavior to be implemented since the caller has full
control.
I'd wait to see how callers actually use threadlets and deal with
migration. Patterns may emerge and can be put into common code.
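One such pattern might be a per-subsystem count of outstanding work, in
the spirit of qemu_aio_flush(). The sketch below uses hypothetical
names; nothing like it exists in the series yet:

    static QemuMutex pending_lock;
    static QemuCond  pending_cond;
    static int       pending_work;    /* bumped at submit time */

    static void work_done(void)       /* called as each item finishes */
    {
        qemu_mutex_lock(&pending_lock);
        if (--pending_work == 0) {
            qemu_cond_broadcast(&pending_cond);
        }
        qemu_mutex_unlock(&pending_lock);
    }

    static void quiesce_pending_work(void)   /* e.g. before migration */
    {
        qemu_mutex_lock(&pending_lock);
        while (pending_work > 0) {
            qemu_cond_wait(&pending_cond, &pending_lock);
        }
        qemu_mutex_unlock(&pending_lock);
    }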
Stefan
* [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
2010-10-26 14:14 [Qemu-devel] v8: [PATCH 0/3] " Arun R Bharadwaj
@ 2010-10-26 14:14 ` Arun R Bharadwaj
2010-10-27 9:17 ` Stefan Hajnoczi
0 siblings, 1 reply; 38+ messages in thread
From: Arun R Bharadwaj @ 2010-10-26 14:14 UTC (permalink / raw)
To: qemu-devel
From: Gautham R Shenoy <ego@in.ibm.com>
This patch makes the paio subsystem use the threadlet framework, thereby
decoupling the asynchronous threading framework portion from
posix-aio-compat.c.
The patch has been tested with fstress.
Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Acked-by: Balbir Singh <balbir@linux.vnet.ibm.com>
---
posix-aio-compat.c | 170 ++++++++++------------------------------------------
1 files changed, 33 insertions(+), 137 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..2e47736 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,6 +29,7 @@
#include "block_int.h"
#include "block/raw-posix-aio.h"
+#include "qemu-threadlets.h"
struct qemu_paiocb {
@@ -44,13 +45,13 @@ struct qemu_paiocb {
int ev_signo;
off_t aio_offset;
- QTAILQ_ENTRY(qemu_paiocb) node;
int aio_type;
ssize_t ret;
int active;
struct qemu_paiocb *next;
int async_context_id;
+ ThreadletWork work;
};
typedef struct PosixAioState {
@@ -59,15 +60,6 @@ typedef struct PosixAioState {
} PosixAioState;
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
@@ -85,39 +77,6 @@ static void die(const char *what)
die2(errno, what);
}
-static void mutex_lock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_lock(mutex);
- if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_unlock(mutex);
- if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
- struct timespec *ts)
-{
- int ret = pthread_cond_timedwait(cond, mutex, ts);
- if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
- return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
- int ret = pthread_cond_signal(cond);
- if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void*), void *arg)
-{
- int ret = pthread_create(thread, attr, start_routine, arg);
- if (ret) die2(ret, "pthread_create");
-}
-
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
int ret;
@@ -301,106 +260,53 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
{
pid_t pid;
+ struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+ ssize_t ret = 0;
pid = getpid();
+ aiocb->active = 1;
- while (1) {
- struct qemu_paiocb *aiocb;
- ssize_t ret = 0;
- qemu_timeval tv;
- struct timespec ts;
-
- qemu_gettimeofday(&tv);
- ts.tv_sec = tv.tv_sec + 10;
- ts.tv_nsec = 0;
-
- mutex_lock(&lock);
-
- while (QTAILQ_EMPTY(&request_list) &&
- !(ret == ETIMEDOUT)) {
- ret = cond_timedwait(&cond, &lock, &ts);
- }
-
- if (QTAILQ_EMPTY(&request_list))
- break;
-
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
- aiocb->active = 1;
- idle_threads--;
- mutex_unlock(&lock);
-
- switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- case QEMU_AIO_WRITE:
- ret = handle_aiocb_rw(aiocb);
- break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
- ret = handle_aiocb_ioctl(aiocb);
- break;
- default:
- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
- ret = -EINVAL;
- break;
- }
-
- mutex_lock(&lock);
- aiocb->ret = ret;
- idle_threads++;
- mutex_unlock(&lock);
-
- if (kill(pid, aiocb->ev_signo)) die("kill failed");
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ case QEMU_AIO_WRITE:
+ ret = handle_aiocb_rw(aiocb);
+ break;
+ case QEMU_AIO_FLUSH:
+ ret = handle_aiocb_flush(aiocb);
+ break;
+ case QEMU_AIO_IOCTL:
+ ret = handle_aiocb_ioctl(aiocb);
+ break;
+ default:
+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ ret = -EINVAL;
+ break;
}
- idle_threads--;
- cur_threads--;
- mutex_unlock(&lock);
-
- return NULL;
-}
+ aiocb->ret = ret;
-static void spawn_thread(void)
-{
- sigset_t set, oldset;
-
- cur_threads++;
- idle_threads++;
-
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
- thread_create(&thread_id, &attr, aio_thread, NULL);
-
- if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+ if (kill(pid, aiocb->ev_signo)) {
+ die("kill failed");
+ }
}
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
aiocb->ret = -EINPROGRESS;
aiocb->active = 0;
- mutex_lock(&lock);
- if (idle_threads == 0 && cur_threads < max_threads)
- spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
- mutex_unlock(&lock);
- cond_signal(&cond);
+
+ aiocb->work.func = aio_thread;
+ submit_threadletwork(&aiocb->work);
}
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
- mutex_lock(&lock);
ret = aiocb->ret;
- mutex_unlock(&lock);
-
return ret;
}
@@ -536,14 +442,15 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
int active = 0;
- mutex_lock(&lock);
if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
- acb->ret = -ECANCELED;
+ if (!deque_threadletwork(&acb->work)) {
+ acb->ret = -ECANCELED;
+ } else {
+ active = 1;
+ }
} else if (acb->ret == -EINPROGRESS) {
active = 1;
}
- mutex_unlock(&lock);
if (active) {
/* fail safe: if the aio could not be canceled, we wait for
@@ -618,7 +525,6 @@ int paio_init(void)
struct sigaction act;
PosixAioState *s;
int fds[2];
- int ret;
if (posix_aio_state)
return 0;
@@ -645,16 +551,6 @@ int paio_init(void)
qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
posix_aio_process_queue, s);
- ret = pthread_attr_init(&attr);
- if (ret)
- die2(ret, "pthread_attr_init");
-
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret)
- die2(ret, "pthread_attr_setdetachstate");
-
- QTAILQ_INIT(&request_list);
-
posix_aio_state = s;
return 0;
}
* [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
2010-10-21 12:10 [Qemu-devel] [PATCH 0/3]: v7: Threadlets: A generic task offloading framework Arun R Bharadwaj
@ 2010-10-21 12:10 ` Arun R Bharadwaj
2010-10-23 11:57 ` Stefan Hajnoczi
0 siblings, 1 reply; 38+ messages in thread
From: Arun R Bharadwaj @ 2010-10-21 12:10 UTC (permalink / raw)
To: qemu-devel
From: Gautham R Shenoy <ego@in.ibm.com>
This patch makes the paio subsystem use the threadlet framework, thereby
decoupling the asynchronous threading framework from posix-aio-compat.c.
The patch has been tested with fstress.
Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Acked-by: Balbir Singh <balbir@linux.vnet.ibm.com>
---
posix-aio-compat.c | 170 ++++++++++------------------------------------------
1 files changed, 33 insertions(+), 137 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..2e47736 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,6 +29,7 @@
#include "block_int.h"
#include "block/raw-posix-aio.h"
+#include "qemu-threadlets.h"
struct qemu_paiocb {
@@ -44,13 +45,13 @@ struct qemu_paiocb {
int ev_signo;
off_t aio_offset;
- QTAILQ_ENTRY(qemu_paiocb) node;
int aio_type;
ssize_t ret;
int active;
struct qemu_paiocb *next;
int async_context_id;
+ ThreadletWork work;
};
typedef struct PosixAioState {
@@ -59,15 +60,6 @@ typedef struct PosixAioState {
} PosixAioState;
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
@@ -85,39 +77,6 @@ static void die(const char *what)
die2(errno, what);
}
-static void mutex_lock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_lock(mutex);
- if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_unlock(mutex);
- if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
- struct timespec *ts)
-{
- int ret = pthread_cond_timedwait(cond, mutex, ts);
- if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
- return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
- int ret = pthread_cond_signal(cond);
- if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void*), void *arg)
-{
- int ret = pthread_create(thread, attr, start_routine, arg);
- if (ret) die2(ret, "pthread_create");
-}
-
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
int ret;
@@ -301,106 +260,53 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
{
pid_t pid;
+ struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+ ssize_t ret = 0;
pid = getpid();
+ aiocb->active = 1;
- while (1) {
- struct qemu_paiocb *aiocb;
- ssize_t ret = 0;
- qemu_timeval tv;
- struct timespec ts;
-
- qemu_gettimeofday(&tv);
- ts.tv_sec = tv.tv_sec + 10;
- ts.tv_nsec = 0;
-
- mutex_lock(&lock);
-
- while (QTAILQ_EMPTY(&request_list) &&
- !(ret == ETIMEDOUT)) {
- ret = cond_timedwait(&cond, &lock, &ts);
- }
-
- if (QTAILQ_EMPTY(&request_list))
- break;
-
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
- aiocb->active = 1;
- idle_threads--;
- mutex_unlock(&lock);
-
- switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- case QEMU_AIO_WRITE:
- ret = handle_aiocb_rw(aiocb);
- break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
- ret = handle_aiocb_ioctl(aiocb);
- break;
- default:
- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
- ret = -EINVAL;
- break;
- }
-
- mutex_lock(&lock);
- aiocb->ret = ret;
- idle_threads++;
- mutex_unlock(&lock);
-
- if (kill(pid, aiocb->ev_signo)) die("kill failed");
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ case QEMU_AIO_WRITE:
+ ret = handle_aiocb_rw(aiocb);
+ break;
+ case QEMU_AIO_FLUSH:
+ ret = handle_aiocb_flush(aiocb);
+ break;
+ case QEMU_AIO_IOCTL:
+ ret = handle_aiocb_ioctl(aiocb);
+ break;
+ default:
+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ ret = -EINVAL;
+ break;
}
- idle_threads--;
- cur_threads--;
- mutex_unlock(&lock);
-
- return NULL;
-}
+ aiocb->ret = ret;
-static void spawn_thread(void)
-{
- sigset_t set, oldset;
-
- cur_threads++;
- idle_threads++;
-
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
- thread_create(&thread_id, &attr, aio_thread, NULL);
-
- if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+ if (kill(pid, aiocb->ev_signo)) {
+ die("kill failed");
+ }
}
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
aiocb->ret = -EINPROGRESS;
aiocb->active = 0;
- mutex_lock(&lock);
- if (idle_threads == 0 && cur_threads < max_threads)
- spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
- mutex_unlock(&lock);
- cond_signal(&cond);
+
+ aiocb->work.func = aio_thread;
+ submit_threadletwork(&aiocb->work);
}
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
- mutex_lock(&lock);
ret = aiocb->ret;
- mutex_unlock(&lock);
-
return ret;
}
@@ -536,14 +442,15 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
int active = 0;
- mutex_lock(&lock);
if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
- acb->ret = -ECANCELED;
+ if (!deque_threadletwork(&acb->work)) {
+ acb->ret = -ECANCELED;
+ } else {
+ active = 1;
+ }
} else if (acb->ret == -EINPROGRESS) {
active = 1;
}
- mutex_unlock(&lock);
if (active) {
/* fail safe: if the aio could not be canceled, we wait for
@@ -618,7 +525,6 @@ int paio_init(void)
struct sigaction act;
PosixAioState *s;
int fds[2];
- int ret;
if (posix_aio_state)
return 0;
@@ -645,16 +551,6 @@ int paio_init(void)
qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
posix_aio_process_queue, s);
- ret = pthread_attr_init(&attr);
- if (ret)
- die2(ret, "pthread_attr_init");
-
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret)
- die2(ret, "pthread_attr_setdetachstate");
-
- QTAILQ_INIT(&request_list);
-
posix_aio_state = s;
return 0;
}
* Re: [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
2010-10-21 12:10 ` [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets Arun R Bharadwaj
@ 2010-10-23 11:57 ` Stefan Hajnoczi
0 siblings, 0 replies; 38+ messages in thread
From: Stefan Hajnoczi @ 2010-10-23 11:57 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: qemu-devel
On Thu, Oct 21, 2010 at 1:10 PM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
> {
> ssize_t ret;
>
> - mutex_lock(&lock);
> ret = aiocb->ret;
> - mutex_unlock(&lock);
> -
> return ret;
> }
>
> @@ -536,14 +442,15 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
> struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> int active = 0;
>
> - mutex_lock(&lock);
> if (!acb->active) {
> - QTAILQ_REMOVE(&request_list, acb, node);
> - acb->ret = -ECANCELED;
> + if (!deque_threadletwork(&acb->work)) {
> + acb->ret = -ECANCELED;
> + } else {
> + active = 1;
> + }
> } else if (acb->ret == -EINPROGRESS) {
> active = 1;
> }
> - mutex_unlock(&lock);
>
> if (active) {
> /* fail safe: if the aio could not be canceled, we wait for
Here is the assembly listing of what happens next:
454:posix-aio-compat.c **** while (qemu_paio_error(acb) == EINPROGRESS)
539 0347 48F7D8 negq %rax
540 034a 83F873 cmpl $115, %eax
541 034d 7581 jne .L46
543 .L58:
545 0350 EBFD jmp .L58
This while loop is an infinite loop: nothing forces the compiler to
reload acb->ret from memory, so the condition can never change.
The reason the loop worked before threadlets is that
qemu_paio_return() used to acquire a lock around the access to
acb->ret, forcing the value to be reloaded on every iteration:
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
mutex_lock(&lock);
ret = aiocb->ret;
mutex_unlock(&lock);
return ret;
}
Please use synchronization to wait on the request like Anthony suggested.
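A sketch of that direction, with a stand-in struct and hypothetical
helper names: the mutex acts as a compiler and CPU barrier, so acb->ret
is genuinely re-read on every iteration, and the condition variable
removes the busy loop entirely:

    #include <errno.h>
    #include <pthread.h>
    #include <sys/types.h>

    struct qemu_paiocb { ssize_t ret; };    /* stand-in for the real struct */

    static pthread_mutex_t done_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  done_cond = PTHREAD_COND_INITIALIZER;

    /* waiter side: replaces the spin on qemu_paio_error()/acb->ret */
    static ssize_t paio_wait_done(struct qemu_paiocb *acb)
    {
        ssize_t ret;

        pthread_mutex_lock(&done_lock);
        while (acb->ret == -EINPROGRESS) {
            pthread_cond_wait(&done_cond, &done_lock);
        }
        ret = acb->ret;
        pthread_mutex_unlock(&done_lock);
        return ret;
    }

    /* completion side: the worker publishes the result under the lock */
    static void paio_set_done(struct qemu_paiocb *acb, ssize_t ret)
    {
        pthread_mutex_lock(&done_lock);
        acb->ret = ret;
        pthread_cond_broadcast(&done_cond);
        pthread_mutex_unlock(&done_lock);
    }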
Stefan
* [Qemu-devel] [PATCH 2/3]: Make paio subsystem use threadlets
2010-10-13 16:44 [Qemu-devel] [PATCH 0/3]: Threadlets: A generic task offloading framework Arun R Bharadwaj
@ 2010-10-13 16:48 ` Arun R Bharadwaj
0 siblings, 0 replies; 38+ messages in thread
From: Arun R Bharadwaj @ 2010-10-13 16:48 UTC (permalink / raw)
To: qemu-devel; +Cc: Arun Bharadwaj
* Arun R Bharadwaj <arun@linux.vnet.ibm.com> [2010-10-13 22:14:39]:
Make paio subsystem use threadlets
From: Gautham R Shenoy <ego@in.ibm.com>
This patch makes the paio subsystem use the threadlet framework, thereby
decoupling the asynchronous threading framework from posix-aio-compat.c.
The patch has been tested with fstress.
Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
---
posix-aio-compat.c | 166 +++++++++-------------------------------------------
1 files changed, 30 insertions(+), 136 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..6977c18 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,6 +29,7 @@
#include "block_int.h"
#include "block/raw-posix-aio.h"
+#include "qemu-threadlets.h"
struct qemu_paiocb {
@@ -51,6 +52,7 @@ struct qemu_paiocb {
struct qemu_paiocb *next;
int async_context_id;
+ ThreadletWork work;
};
typedef struct PosixAioState {
@@ -59,15 +61,6 @@ typedef struct PosixAioState {
} PosixAioState;
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
@@ -85,39 +78,6 @@ static void die(const char *what)
die2(errno, what);
}
-static void mutex_lock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_lock(mutex);
- if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_unlock(mutex);
- if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
- struct timespec *ts)
-{
- int ret = pthread_cond_timedwait(cond, mutex, ts);
- if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
- return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
- int ret = pthread_cond_signal(cond);
- if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void*), void *arg)
-{
- int ret = pthread_create(thread, attr, start_routine, arg);
- if (ret) die2(ret, "pthread_create");
-}
-
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
int ret;
@@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
{
pid_t pid;
+ struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+ ssize_t ret = 0;
pid = getpid();
+ aiocb->active = 1;
- while (1) {
- struct qemu_paiocb *aiocb;
- ssize_t ret = 0;
- qemu_timeval tv;
- struct timespec ts;
-
- qemu_gettimeofday(&tv);
- ts.tv_sec = tv.tv_sec + 10;
- ts.tv_nsec = 0;
-
- mutex_lock(&lock);
-
- while (QTAILQ_EMPTY(&request_list) &&
- !(ret == ETIMEDOUT)) {
- ret = cond_timedwait(&cond, &lock, &ts);
- }
-
- if (QTAILQ_EMPTY(&request_list))
- break;
-
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
- aiocb->active = 1;
- idle_threads--;
- mutex_unlock(&lock);
-
- switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- case QEMU_AIO_WRITE:
- ret = handle_aiocb_rw(aiocb);
- break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
- ret = handle_aiocb_ioctl(aiocb);
- break;
- default:
- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
- ret = -EINVAL;
- break;
- }
-
- mutex_lock(&lock);
- aiocb->ret = ret;
- idle_threads++;
- mutex_unlock(&lock);
-
- if (kill(pid, aiocb->ev_signo)) die("kill failed");
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ case QEMU_AIO_WRITE:
+ ret = handle_aiocb_rw(aiocb);
+ break;
+ case QEMU_AIO_FLUSH:
+ ret = handle_aiocb_flush(aiocb);
+ break;
+ case QEMU_AIO_IOCTL:
+ ret = handle_aiocb_ioctl(aiocb);
+ break;
+ default:
+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ ret = -EINVAL;
+ break;
}
- idle_threads--;
- cur_threads--;
- mutex_unlock(&lock);
+ aiocb->ret = ret;
- return NULL;
-}
-
-static void spawn_thread(void)
-{
- sigset_t set, oldset;
-
- cur_threads++;
- idle_threads++;
-
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
- thread_create(&thread_id, &attr, aio_thread, NULL);
-
- if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+ if (kill(pid, aiocb->ev_signo)) die("kill failed");
}
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
aiocb->ret = -EINPROGRESS;
aiocb->active = 0;
- mutex_lock(&lock);
- if (idle_threads == 0 && cur_threads < max_threads)
- spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
- mutex_unlock(&lock);
- cond_signal(&cond);
+
+ aiocb->work.func = aio_thread;
+ submit_threadletwork(&aiocb->work);
}
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
- mutex_lock(&lock);
ret = aiocb->ret;
- mutex_unlock(&lock);
-
return ret;
}
@@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
int active = 0;
- mutex_lock(&lock);
if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
- acb->ret = -ECANCELED;
+ if (!cancel_threadletwork(&acb->work))
+ acb->ret = -ECANCELED;
+ else
+ active = 1;
} else if (acb->ret == -EINPROGRESS) {
active = 1;
}
- mutex_unlock(&lock);
if (active) {
/* fail safe: if the aio could not be canceled, we wait for
@@ -618,7 +523,6 @@ int paio_init(void)
struct sigaction act;
PosixAioState *s;
int fds[2];
- int ret;
if (posix_aio_state)
return 0;
@@ -645,16 +549,6 @@ int paio_init(void)
qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
posix_aio_process_queue, s);
- ret = pthread_attr_init(&attr);
- if (ret)
- die2(ret, "pthread_attr_init");
-
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret)
- die2(ret, "pthread_attr_setdetachstate");
-
- QTAILQ_INIT(&request_list);
-
posix_aio_state = s;
return 0;
}
* [Qemu-devel] [PATCH 2/3] Make paio subsystem use threadlets
2010-10-13 15:30 [Qemu-devel] v5 [PATCH 0/3] qemu: Threadlets: A generic task offloading framework Arun R Bharadwaj
@ 2010-10-13 15:31 ` Arun R Bharadwaj
0 siblings, 0 replies; 38+ messages in thread
From: Arun R Bharadwaj @ 2010-10-13 15:31 UTC (permalink / raw)
To: qemu-devel
From: Gautham R Shenoy <ego@in.ibm.com>
This patch makes the paio subsystem use the threadlet framework, thereby
decoupling the asynchronous threading framework from posix-aio-compat.c.
The patch has been tested with fstress.
Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
---
posix-aio-compat.c | 166 +++++++++-------------------------------------------
1 files changed, 30 insertions(+), 136 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..6977c18 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,6 +29,7 @@
#include "block_int.h"
#include "block/raw-posix-aio.h"
+#include "qemu-threadlets.h"
struct qemu_paiocb {
@@ -51,6 +52,7 @@ struct qemu_paiocb {
struct qemu_paiocb *next;
int async_context_id;
+ ThreadletWork work;
};
typedef struct PosixAioState {
@@ -59,15 +61,6 @@ typedef struct PosixAioState {
} PosixAioState;
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
@@ -85,39 +78,6 @@ static void die(const char *what)
die2(errno, what);
}
-static void mutex_lock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_lock(mutex);
- if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_unlock(mutex);
- if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
- struct timespec *ts)
-{
- int ret = pthread_cond_timedwait(cond, mutex, ts);
- if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
- return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
- int ret = pthread_cond_signal(cond);
- if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void*), void *arg)
-{
- int ret = pthread_create(thread, attr, start_routine, arg);
- if (ret) die2(ret, "pthread_create");
-}
-
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
int ret;
@@ -301,106 +261,51 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
{
pid_t pid;
+ struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+ ssize_t ret = 0;
pid = getpid();
+ aiocb->active = 1;
- while (1) {
- struct qemu_paiocb *aiocb;
- ssize_t ret = 0;
- qemu_timeval tv;
- struct timespec ts;
-
- qemu_gettimeofday(&tv);
- ts.tv_sec = tv.tv_sec + 10;
- ts.tv_nsec = 0;
-
- mutex_lock(&lock);
-
- while (QTAILQ_EMPTY(&request_list) &&
- !(ret == ETIMEDOUT)) {
- ret = cond_timedwait(&cond, &lock, &ts);
- }
-
- if (QTAILQ_EMPTY(&request_list))
- break;
-
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
- aiocb->active = 1;
- idle_threads--;
- mutex_unlock(&lock);
-
- switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- case QEMU_AIO_WRITE:
- ret = handle_aiocb_rw(aiocb);
- break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
- ret = handle_aiocb_ioctl(aiocb);
- break;
- default:
- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
- ret = -EINVAL;
- break;
- }
-
- mutex_lock(&lock);
- aiocb->ret = ret;
- idle_threads++;
- mutex_unlock(&lock);
-
- if (kill(pid, aiocb->ev_signo)) die("kill failed");
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ case QEMU_AIO_WRITE:
+ ret = handle_aiocb_rw(aiocb);
+ break;
+ case QEMU_AIO_FLUSH:
+ ret = handle_aiocb_flush(aiocb);
+ break;
+ case QEMU_AIO_IOCTL:
+ ret = handle_aiocb_ioctl(aiocb);
+ break;
+ default:
+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ ret = -EINVAL;
+ break;
}
- idle_threads--;
- cur_threads--;
- mutex_unlock(&lock);
+ aiocb->ret = ret;
- return NULL;
-}
-
-static void spawn_thread(void)
-{
- sigset_t set, oldset;
-
- cur_threads++;
- idle_threads++;
-
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
- thread_create(&thread_id, &attr, aio_thread, NULL);
-
- if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+ if (kill(pid, aiocb->ev_signo)) die("kill failed");
}
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
aiocb->ret = -EINPROGRESS;
aiocb->active = 0;
- mutex_lock(&lock);
- if (idle_threads == 0 && cur_threads < max_threads)
- spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
- mutex_unlock(&lock);
- cond_signal(&cond);
+
+ aiocb->work.func = aio_thread;
+ submit_threadletwork(&aiocb->work);
}
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
- mutex_lock(&lock);
ret = aiocb->ret;
- mutex_unlock(&lock);
-
return ret;
}
@@ -536,14 +441,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
int active = 0;
- mutex_lock(&lock);
if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
- acb->ret = -ECANCELED;
+ if (!cancel_threadletwork(&acb->work))
+ acb->ret = -ECANCELED;
+ else
+ active = 1;
} else if (acb->ret == -EINPROGRESS) {
active = 1;
}
- mutex_unlock(&lock);
if (active) {
/* fail safe: if the aio could not be canceled, we wait for
@@ -618,7 +523,6 @@ int paio_init(void)
struct sigaction act;
PosixAioState *s;
int fds[2];
- int ret;
if (posix_aio_state)
return 0;
@@ -645,16 +549,6 @@ int paio_init(void)
qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
posix_aio_process_queue, s);
- ret = pthread_attr_init(&attr);
- if (ret)
- die2(ret, "pthread_attr_init");
-
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret)
- die2(ret, "pthread_attr_setdetachstate");
-
- QTAILQ_INIT(&request_list);
-
posix_aio_state = s;
return 0;
}