* [Qemu-devel] [PATCH V5 0/2] Threadlets: A generic task offloading framework
@ 2010-06-22 11:18 Gautham R Shenoy
2010-06-22 11:18 ` [Qemu-devel] [PATCH V5 1/2] qemu: Generic task offloading framework: threadlets Gautham R Shenoy
` (2 more replies)
0 siblings, 3 replies; 5+ messages in thread
From: Gautham R Shenoy @ 2010-06-22 11:18 UTC (permalink / raw)
To: Qemu-development List
Cc: Anthony Liguori, Avi Kivity, Corentin Chary, Paolo Bonzini
Hi,
This is the v5 of the patch-series to have a generic asynchronous task
offloading framework (called threadlets) within qemu.
V4 can be found here:
http://lists.gnu.org/archive/html/qemu-devel/2010-06/msg02152.html
Changes from V4:
=====================================================================
- Updated the Makefile to make the object files depend on CONFIG_POSIX.
- Did away with the pthread_barrier_init() and pthread_barrier_wait().
- Did away with flush_threadlet_queue() helper.
- Fixed the attribution in copyright, helper names.
- Fixed the idle-thread-exit logic.
- Added documentation in docs/
Description
=====================================================================
This patch series decouples the asynchronous threading framework
implementation from posix-aio-compat.c to implement a generic asynchronous
task offloading threading framework called threadlets which can be used
by other subsystems within QEMU.
Currently within QEMU, the AIO subsystem (paio) creates a bunch of
asynchronous threads to offload any blocking operations so that
the vcpu threads and the IO thread can go back to servicing any
other guest requests.
This offloading framework can be used by subsystems such as virtio-9p,
Asynchronous encoding for vnc-server, so that the vcpu thread can offload
blocking operations on to the asynchronous threads and resume servicing
any other guest requests. The asynchronous threads, after
finishing the blocking operations can then transfer the control over
to the IO thread so that the latter can handle the post_blocking_operation().
The patch series passed fsstress test without any issues.
Could it be considered for inclusion ?
---
Aneesh Kumar K.V (1):
qemu: Generic task offloading framework: threadlets
Gautham R Shenoy (1):
qemu: Convert AIO code to use threadlets.
Makefile.objs | 3 +
docs/async-support.txt | 141 ++++++++++++++++++++++++++++++++++++++++
posix-aio-compat.c | 152 +++++++------------------------------------
qemu-threadlets.c | 170 ++++++++++++++++++++++++++++++++++++++++++++++++
qemu-threadlets.h | 48 ++++++++++++++
5 files changed, 385 insertions(+), 129 deletions(-)
create mode 100644 docs/async-support.txt
create mode 100644 qemu-threadlets.c
create mode 100644 qemu-threadlets.h
--
Thanks and Regards
gautham.
^ permalink raw reply [flat|nested] 5+ messages in thread
* [Qemu-devel] [PATCH V5 1/2] qemu: Generic task offloading framework: threadlets
2010-06-22 11:18 [Qemu-devel] [PATCH V5 0/2] Threadlets: A generic task offloading framework Gautham R Shenoy
@ 2010-06-22 11:18 ` Gautham R Shenoy
2010-06-22 13:00 ` [Qemu-devel] " Paolo Bonzini
2010-06-22 11:18 ` [Qemu-devel] [PATCH V5 2/2] qemu: Convert AIO code to use threadlets Gautham R Shenoy
2010-07-09 5:47 ` [Qemu-devel] [PATCH V5 0/2] Threadlets: A generic task offloading framework Gautham R Shenoy
2 siblings, 1 reply; 5+ messages in thread
From: Gautham R Shenoy @ 2010-06-22 11:18 UTC (permalink / raw)
To: Qemu-development List
Cc: Anthony Liguori, Paolo Bonzini, Aneesh Kumar K.V, Corentin Chary,
Avi Kivity
From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
This patch creates a generic asynchronous-task-offloading infrastructure named
threadlets. The core idea has been borrowed from the threading framework that
is being used by paio.
The reason for creating this generic infrastructure is so that other subsystems,
such as virtio-9p could make use of it for offloading tasks that could block.
The patch creates a global queue on-to which subsystems can queue their tasks to
be executed asynchronously.
The patch also provides API's that allow a subsystem to create a private queue
with an associated pool of threads.
[ego@in.ibm.com: Facelift of the code, Documentation, cancel_threadlet
and other helpers]
Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
---
Makefile.objs | 3 +
docs/async-support.txt | 141 ++++++++++++++++++++++++++++++++++++++++
qemu-threadlets.c | 170 ++++++++++++++++++++++++++++++++++++++++++++++++
qemu-threadlets.h | 48 ++++++++++++++
4 files changed, 361 insertions(+), 1 deletions(-)
create mode 100644 docs/async-support.txt
create mode 100644 qemu-threadlets.c
create mode 100644 qemu-threadlets.h
diff --git a/Makefile.objs b/Makefile.objs
index 1a942e5..5d48ace 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,8 @@ qobject-obj-y += qerror.o
block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-$(CONFIG_POSIX) += qemu-thread.o
+block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
@@ -109,7 +111,6 @@ common-obj-y += iov.o
common-obj-$(CONFIG_VNC_TLS) += vnc-tls.o vnc-auth-vencrypt.o
common-obj-$(CONFIG_VNC_SASL) += vnc-auth-sasl.o
common-obj-$(CONFIG_COCOA) += cocoa.o
-common-obj-$(CONFIG_IOTHREAD) += qemu-thread.o
common-obj-y += notify.o event_notifier.o
common-obj-y += qemu-timer.o
diff --git a/docs/async-support.txt b/docs/async-support.txt
new file mode 100644
index 0000000..0f12c8d
--- /dev/null
+++ b/docs/async-support.txt
@@ -0,0 +1,141 @@
+== How to use the various asynchronous models supported in Qemu ==
+
+== Threadlets ==
+
+Q.1: What are threadlets ?
+A.1: Threadlets is an infrastructure within QEMU that allows other subsystems
+ to offload possibly blocking work to a queue to be processed by a pool
+ of threads asynchrnonously.
+
+Q.2: When would one want to use threadlets ?
+A.2: Threadlets are useful when there are operations that can be performed
+ outside the context of the VCPU/IO threads inorder to free these latter
+ to service any other guest requests.
+
+Q.3: I have some work that can be executed in an asynchronous context. How
+ should I go about it ?
+A.3: One could follow the steps listed below:
+
+ - Define a function which would do the asynchrnous work.
+ void my_threadlet_func(ThreadletWork *work)
+ {
+ }
+
+ - Declare an object of type ThreadletWork;
+ ThreadletWork work;
+
+
+ - Assign a value to the "func" member of ThreadletWork object.
+ work.func = my_threadlet_func;
+
+ - Submit the threadlet to the global queue.
+ submit_threadlet(&work);
+
+ - Continue servicing some other guest operations.
+
+Q.4: I want to my_threadlet_func to access some non-global data. How do I do
+ that ?
+A.4: Suppose you want my_threadlet_func to access some non-global data-object
+ of type myPrivateData. In that case one could follow the following steps.
+
+ - Define a member of the type ThreadletWork within myPrivateData.
+ typdef myPrivateData {
+ ...;
+ ...;
+ ...;
+ ThreadletWork work;
+ } myPrivateData;
+
+ myPrivateData myData;
+
+ - Initialize myData.work as described in A.3
+ myData.work.func = my_threadlet_func;
+ submit_threadlet(&myData.work);
+
+ - Access the myData object inside my_threadlet_func() using container_of
+ primitive
+ void my_threadlet_func(ThreadletWork *work)
+ {
+ myPrivateData *mydata_ptr;
+ mydata_ptr = container_of(work, myPrivateData, work);
+
+ /* mydata_ptr now points to myData object */
+ }
+
+Q.5: Are there any precautions one must take while sharing data with the
+ Asynchrnous thread-pool ?
+A.5: Yes, make sure that the helper function of the type my_threadlet_func()
+ does not access/modify data when it can be accessed or modified in the
+ context of VCPU thread or IO thread. This is because the asynchronous
+ threads in the pool can run in parallel with the VCPU/IOThreads as shown
+ in the figure.
+
+ A typical workflow is as follows:
+
+ VCPU/IOThread
+ |
+ | (1)
+ |
+ V
+ Offload work (2)
+ |-------> to threadlets -----------------------------> Helper thread
+ | | |
+ | | |
+ | | (3) | (4)
+ | | |
+ | Handle other Guest requests |
+ | | |
+ | | V
+ | | (3) Signal the I/O Thread
+ |(6) | |
+ | | /
+ | | /
+ | V /
+ | Do the post <---------------------------------/
+ | processing (5)
+ | |
+ | | (6)
+ | V
+ |-Yes------ More async work?
+ |
+ | (7)
+ No
+ |
+ |
+ .
+ .
+
+ Hence one needs to make sure that in the steps (3) and (4) which run in
+ parallel, any global data is accessed within only one context.
+
+Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
+A.6: Threadlets framework provides the API cancel_threadlet:
+ - int cancel_threadlet(ThreadletWork *work)
+
+ The API scans the ThreadletQueue to see if (work) is present. If it finds
+ work, it'll dequeue work and return 0.
+
+ On the other hand, if it does not find the (work) in the ThreadletQueue,
+ then it'll return 1. This can imply two things. Either the work is being
+ processed by one of the helper threads or it has been processed. The
+ threadlet infrastructure currently _does_not_ distinguish between these
+ two and the onus is on the caller to do that.
+
+Q.7: Apart from the global pool of threads, can I have my own private Queue ?
+A.7: Yes, the threadlets framework allows subsystems to create their own private
+ queues with associated pools of threads.
+
+ - Define a PrivateQueue
+ ThreadletQueue myQueue;
+
+ - Initialize it:
+ threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
+ where my_max_threads is the maximum number of threads that can be in the
+ thread pool and my_min_threads is the minimum number of active threads
+ that can be in the thread-pool.
+
+ - Submit work:
+ submit_threadlet_to_queue(&myQueue, &my_work);
+
+ - Cancel work:
+ cancel_threadlet_on_queue(&myQueue, &my_work);
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
new file mode 100644
index 0000000..13f6bb8
--- /dev/null
+++ b/qemu-threadlets.c
@@ -0,0 +1,170 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ * Gautham R Shenoy <ego@in.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ */
+
+#include "qemu-threadlets.h"
+#include "osdep.h"
+
+#define MAX_GLOBAL_THREADS 64
+#define MIN_GLOBAL_THREADS 64
+ThreadletQueue globalqueue;
+static int globalqueue_init;
+
+static void *threadlet_worker(void *data)
+{
+ ThreadletQueue *queue = data;
+
+ while (1) {
+ ThreadletWork *work;
+ int ret = 0;
+ qemu_mutex_lock(&(queue->lock));
+
+ while (QTAILQ_EMPTY(&(queue->request_list)) &&
+ (ret != ETIMEDOUT)) {
+ ret = qemu_cond_timedwait(&(queue->cond),
+ &(queue->lock), 10*100000);
+ }
+
+ assert(queue->idle_threads != 0);
+ if (QTAILQ_EMPTY(&(queue->request_list))) {
+ if (queue->cur_threads > queue->min_threads) {
+ /* We retain the minimum number of threads */
+ break;
+ }
+ } else {
+ work = QTAILQ_FIRST(&(queue->request_list));
+ QTAILQ_REMOVE(&(queue->request_list), work, node);
+
+ queue->idle_threads--;
+ qemu_mutex_unlock(&(queue->lock));
+
+ /* execute the work function */
+ work->func(work);
+
+ qemu_mutex_lock(&(queue->lock));
+ queue->idle_threads++;
+ }
+ qemu_mutex_unlock(&(queue->lock));
+ }
+
+ queue->idle_threads--;
+ queue->cur_threads--;
+ qemu_mutex_unlock(&queue->lock);
+
+ return NULL;
+}
+
+static void spawn_threadlet(ThreadletQueue *queue)
+{
+ QemuThread thread;
+
+ queue->cur_threads++;
+ queue->idle_threads++;
+
+ qemu_thread_create(&thread, threadlet_worker, queue);
+}
+
+/**
+ * submit_threadlet_to_queue: Submit a new task to a private queue to be
+ * executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ * to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_threadlet_to_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+ qemu_mutex_lock(&(queue->lock));
+ if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+ spawn_threadlet(queue);
+ }
+ QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
+ qemu_mutex_unlock(&(queue->lock));
+ qemu_cond_signal(&(queue->cond));
+}
+
+/**
+ * submit_threadlet: Submit to the global queue a new task to be executed
+ * asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_threadlet(ThreadletWork *work)
+{
+ if (unlikely(!globalqueue_init)) {
+ threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+ MIN_GLOBAL_THREADS);
+ globalqueue_init = 1;
+ }
+
+ submit_threadlet_to_queue(&globalqueue, work);
+}
+
+/**
+ * cancel_threadlet_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ * 1 otherwise.
+ */
+int cancel_threadlet_on_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+ ThreadletWork *ret_work;
+ int found = 0;
+
+ qemu_mutex_lock(&(queue->lock));
+ QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
+ if (ret_work == work) {
+ QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
+ found = 1;
+ break;
+ }
+ }
+ qemu_mutex_unlock(&(queue->lock));
+
+ if (found) {
+ return 0;
+ }
+
+ return 1;
+}
+
+/**
+ * cancel_threadlet: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ * 1 otherwise.
+ */
+int cancel_threadlet(ThreadletWork *work)
+{
+ return cancel_threadlet_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+void threadlet_queue_init(ThreadletQueue *queue,
+ int max_threads, int min_threads)
+{
+ queue->cur_threads = 0;
+ queue->idle_threads = 0;
+ queue->max_threads = max_threads;
+ queue->min_threads = min_threads;
+ QTAILQ_INIT(&(queue->request_list));
+ qemu_mutex_init(&(queue->lock));
+ qemu_cond_init(&(queue->cond));
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
new file mode 100644
index 0000000..d58c10f
--- /dev/null
+++ b/qemu-threadlets.h
@@ -0,0 +1,48 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ * Gautham R Shenoy <ego@in.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_ASYNC_WORK_H
+#define QEMU_ASYNC_WORK_H
+
+#include "qemu-queue.h"
+#include "qemu-common.h"
+#include "qemu-thread.h"
+
+typedef struct ThreadletQueue
+{
+ QemuMutex lock;
+ QemuCond cond;
+ int max_threads;
+ int min_threads;
+ int cur_threads;
+ int idle_threads;
+ QTAILQ_HEAD(, threadlet_work) request_list;
+} ThreadletQueue;
+
+typedef struct threadlet_work
+{
+ QTAILQ_ENTRY(threadlet_work) node;
+ void (*func)(struct threadlet_work *work);
+} ThreadletWork;
+
+extern void submit_threadlet_to_queue(ThreadletQueue *queue,
+ ThreadletWork *work);
+extern void submit_threadlet(ThreadletWork *work);
+extern int cancel_threadlet_on_queue(ThreadletQueue *queue,
+ ThreadletWork *work);
+extern int cancel_threadlet(ThreadletWork *work);
+extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
+ int min_threads);
+#endif
^ permalink raw reply related [flat|nested] 5+ messages in thread
* [Qemu-devel] [PATCH V5 2/2] qemu: Convert AIO code to use threadlets.
2010-06-22 11:18 [Qemu-devel] [PATCH V5 0/2] Threadlets: A generic task offloading framework Gautham R Shenoy
2010-06-22 11:18 ` [Qemu-devel] [PATCH V5 1/2] qemu: Generic task offloading framework: threadlets Gautham R Shenoy
@ 2010-06-22 11:18 ` Gautham R Shenoy
2010-07-09 5:47 ` [Qemu-devel] [PATCH V5 0/2] Threadlets: A generic task offloading framework Gautham R Shenoy
2 siblings, 0 replies; 5+ messages in thread
From: Gautham R Shenoy @ 2010-06-22 11:18 UTC (permalink / raw)
To: Qemu-development List
Cc: Anthony Liguori, Avi Kivity, Corentin Chary, Paolo Bonzini
This patch makes the paio subsystem use the threadlet framework thereby
decoupling asynchronous threading framework portion out of posix-aio-compat.c
The patch has been tested with fstress.
Signed-off-by: Gautham R Shenoy <ego@in.ibm.com>
---
posix-aio-compat.c | 152 ++++++++--------------------------------------------
1 files changed, 24 insertions(+), 128 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index b43c531..370b679 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -28,6 +28,7 @@
#include "block_int.h"
#include "block/raw-posix-aio.h"
+#include "qemu-threadlets.h"
struct qemu_paiocb {
@@ -50,6 +51,7 @@ struct qemu_paiocb {
struct qemu_paiocb *next;
int async_context_id;
+ ThreadletWork work;
};
typedef struct PosixAioState {
@@ -57,16 +59,6 @@ typedef struct PosixAioState {
struct qemu_paiocb *first_aio;
} PosixAioState;
-
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
-
#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
@@ -84,39 +76,6 @@ static void die(const char *what)
die2(errno, what);
}
-static void mutex_lock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_lock(mutex);
- if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_unlock(mutex);
- if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
- struct timespec *ts)
-{
- int ret = pthread_cond_timedwait(cond, mutex, ts);
- if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
- return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
- int ret = pthread_cond_signal(cond);
- if (ret) die2(ret, "pthread_cond_signal");
-}
-
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void*), void *arg)
-{
- int ret = pthread_create(thread, attr, start_routine, arg);
- if (ret) die2(ret, "pthread_create");
-}
-
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
int ret;
@@ -300,47 +259,27 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
{
- pid_t pid;
-
- pid = getpid();
-
- while (1) {
- struct qemu_paiocb *aiocb;
- ssize_t ret = 0;
- qemu_timeval tv;
- struct timespec ts;
-
- qemu_gettimeofday(&tv);
- ts.tv_sec = tv.tv_sec + 10;
- ts.tv_nsec = 0;
- mutex_lock(&lock);
+ pid_t pid;
- while (QTAILQ_EMPTY(&request_list) &&
- !(ret == ETIMEDOUT)) {
- ret = cond_timedwait(&cond, &lock, &ts);
- }
+ struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+ ssize_t ret = 0;
- if (QTAILQ_EMPTY(&request_list))
- break;
+ pid = getpid();
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
- aiocb->active = 1;
- idle_threads--;
- mutex_unlock(&lock);
+ aiocb->active = 1;
- switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- case QEMU_AIO_WRITE:
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ case QEMU_AIO_WRITE:
ret = handle_aiocb_rw(aiocb);
break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
+ case QEMU_AIO_FLUSH:
+ ret = handle_aiocb_flush(aiocb);
+ break;
+ case QEMU_AIO_IOCTL:
ret = handle_aiocb_ioctl(aiocb);
break;
default:
@@ -349,57 +288,25 @@ static void *aio_thread(void *unused)
break;
}
- mutex_lock(&lock);
- aiocb->ret = ret;
- idle_threads++;
- mutex_unlock(&lock);
+ aiocb->ret = ret;
- if (kill(pid, aiocb->ev_signo)) die("kill failed");
- }
-
- idle_threads--;
- cur_threads--;
- mutex_unlock(&lock);
-
- return NULL;
-}
-
-static void spawn_thread(void)
-{
- sigset_t set, oldset;
-
- cur_threads++;
- idle_threads++;
-
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
- thread_create(&thread_id, &attr, aio_thread, NULL);
-
- if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+ if (kill(pid, aiocb->ev_signo)) die("kill failed");
}
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
aiocb->ret = -EINPROGRESS;
aiocb->active = 0;
- mutex_lock(&lock);
- if (idle_threads == 0 && cur_threads < max_threads)
- spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
- mutex_unlock(&lock);
- cond_signal(&cond);
+
+ aiocb->work.func = aio_thread;
+ submit_threadlet(&aiocb->work);
}
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
- mutex_lock(&lock);
ret = aiocb->ret;
- mutex_unlock(&lock);
-
return ret;
}
@@ -535,14 +442,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
int active = 0;
- mutex_lock(&lock);
if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
- acb->ret = -ECANCELED;
+ if (!cancel_threadlet(&acb->work))
+ acb->ret = -ECANCELED;
+ else
+ active = 1;
} else if (acb->ret == -EINPROGRESS) {
active = 1;
}
- mutex_unlock(&lock);
if (active) {
/* fail safe: if the aio could not be canceled, we wait for
@@ -615,7 +522,6 @@ int paio_init(void)
struct sigaction act;
PosixAioState *s;
int fds[2];
- int ret;
if (posix_aio_state)
return 0;
@@ -642,16 +548,6 @@ int paio_init(void)
qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
posix_aio_process_queue, s);
- ret = pthread_attr_init(&attr);
- if (ret)
- die2(ret, "pthread_attr_init");
-
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret)
- die2(ret, "pthread_attr_setdetachstate");
-
- QTAILQ_INIT(&request_list);
-
posix_aio_state = s;
return 0;
}
^ permalink raw reply related [flat|nested] 5+ messages in thread
* [Qemu-devel] Re: [PATCH V5 1/2] qemu: Generic task offloading framework: threadlets
2010-06-22 11:18 ` [Qemu-devel] [PATCH V5 1/2] qemu: Generic task offloading framework: threadlets Gautham R Shenoy
@ 2010-06-22 13:00 ` Paolo Bonzini
0 siblings, 0 replies; 5+ messages in thread
From: Paolo Bonzini @ 2010-06-22 13:00 UTC (permalink / raw)
To: Gautham R Shenoy
Cc: Anthony Liguori, Avi Kivity, Qemu-development List,
Corentin Chary, Aneesh Kumar K.V
On 06/22/2010 01:18 PM, Gautham R Shenoy wrote:
> From: Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
>
> This patch creates a generic asynchronous-task-offloading infrastructure named
> threadlets. The core idea has been borrowed from the threading framework that
> is being used by paio.
>
> The reason for creating this generic infrastructure is so that other subsystems,
> such as virtio-9p could make use of it for offloading tasks that could block.
>
> The patch creates a global queue on-to which subsystems can queue their tasks to
> be executed asynchronously.
>
> The patch also provides API's that allow a subsystem to create a private queue
> with an associated pool of threads.
>
> [ego@in.ibm.com: Facelift of the code, Documentation, cancel_threadlet
> and other helpers]
>
> Signed-off-by: Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
> Signed-off-by: Gautham R Shenoy<ego@in.ibm.com>
> ---
> Makefile.objs | 3 +
> docs/async-support.txt | 141 ++++++++++++++++++++++++++++++++++++++++
> qemu-threadlets.c | 170 ++++++++++++++++++++++++++++++++++++++++++++++++
> qemu-threadlets.h | 48 ++++++++++++++
> 4 files changed, 361 insertions(+), 1 deletions(-)
> create mode 100644 docs/async-support.txt
> create mode 100644 qemu-threadlets.c
> create mode 100644 qemu-threadlets.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index 1a942e5..5d48ace 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -9,6 +9,8 @@ qobject-obj-y += qerror.o
>
> block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
> block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
> +block-obj-$(CONFIG_POSIX) += qemu-thread.o
> +block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
> block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
> block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>
> @@ -109,7 +111,6 @@ common-obj-y += iov.o
> common-obj-$(CONFIG_VNC_TLS) += vnc-tls.o vnc-auth-vencrypt.o
> common-obj-$(CONFIG_VNC_SASL) += vnc-auth-sasl.o
> common-obj-$(CONFIG_COCOA) += cocoa.o
> -common-obj-$(CONFIG_IOTHREAD) += qemu-thread.o
> common-obj-y += notify.o event_notifier.o
> common-obj-y += qemu-timer.o
>
> diff --git a/docs/async-support.txt b/docs/async-support.txt
> new file mode 100644
> index 0000000..0f12c8d
> --- /dev/null
> +++ b/docs/async-support.txt
> @@ -0,0 +1,141 @@
> +== How to use the various asynchronous models supported in Qemu ==
> +
> +== Threadlets ==
> +
> +Q.1: What are threadlets ?
> +A.1: Threadlets is an infrastructure within QEMU that allows other subsystems
> + to offload possibly blocking work to a queue to be processed by a pool
> + of threads asynchrnonously.
> +
> +Q.2: When would one want to use threadlets ?
> +A.2: Threadlets are useful when there are operations that can be performed
> + outside the context of the VCPU/IO threads inorder to free these latter
> + to service any other guest requests.
> +
> +Q.3: I have some work that can be executed in an asynchronous context. How
> + should I go about it ?
> +A.3: One could follow the steps listed below:
> +
> + - Define a function which would do the asynchrnous work.
> + void my_threadlet_func(ThreadletWork *work)
> + {
> + }
> +
> + - Declare an object of type ThreadletWork;
> + ThreadletWork work;
> +
> +
> + - Assign a value to the "func" member of ThreadletWork object.
> + work.func = my_threadlet_func;
> +
> + - Submit the threadlet to the global queue.
> + submit_threadlet(&work);
> +
> + - Continue servicing some other guest operations.
> +
> +Q.4: I want to my_threadlet_func to access some non-global data. How do I do
> + that ?
> +A.4: Suppose you want my_threadlet_func to access some non-global data-object
> + of type myPrivateData. In that case one could follow the following steps.
> +
> + - Define a member of the type ThreadletWork within myPrivateData.
> + typdef myPrivateData {
> + ...;
> + ...;
> + ...;
> + ThreadletWork work;
> + } myPrivateData;
> +
> + myPrivateData myData;
> +
> + - Initialize myData.work as described in A.3
> + myData.work.func = my_threadlet_func;
> + submit_threadlet(&myData.work);
> +
> + - Access the myData object inside my_threadlet_func() using container_of
> + primitive
> + void my_threadlet_func(ThreadletWork *work)
> + {
> + myPrivateData *mydata_ptr;
> + mydata_ptr = container_of(work, myPrivateData, work);
> +
> + /* mydata_ptr now points to myData object */
> + }
> +
> +Q.5: Are there any precautions one must take while sharing data with the
> + Asynchrnous thread-pool ?
> +A.5: Yes, make sure that the helper function of the type my_threadlet_func()
> + does not access/modify data when it can be accessed or modified in the
> + context of VCPU thread or IO thread. This is because the asynchronous
> + threads in the pool can run in parallel with the VCPU/IOThreads as shown
> + in the figure.
> +
> + A typical workflow is as follows:
> +
> + VCPU/IOThread
> + |
> + | (1)
> + |
> + V
> + Offload work (2)
> + |-------> to threadlets -----------------------------> Helper thread
> + | | |
> + | | |
> + | | (3) | (4)
> + | | |
> + | Handle other Guest requests |
> + | | |
> + | | V
> + | | (3) Signal the I/O Thread
> + |(6) | |
> + | | /
> + | | /
> + | V /
> + | Do the post<---------------------------------/
> + | processing (5)
> + | |
> + | | (6)
> + | V
> + |-Yes------ More async work?
> + |
> + | (7)
> + No
> + |
> + |
> + .
> + .
> +
> + Hence one needs to make sure that in the steps (3) and (4) which run in
> + parallel, any global data is accessed within only one context.
> +
> +Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
> +A.6: Threadlets framework provides the API cancel_threadlet:
> + - int cancel_threadlet(ThreadletWork *work)
> +
> + The API scans the ThreadletQueue to see if (work) is present. If it finds
> + work, it'll dequeue work and return 0.
> +
> + On the other hand, if it does not find the (work) in the ThreadletQueue,
> + then it'll return 1. This can imply two things. Either the work is being
> + processed by one of the helper threads or it has been processed. The
> + threadlet infrastructure currently _does_not_ distinguish between these
> + two and the onus is on the caller to do that.
> +
> +Q.7: Apart from the global pool of threads, can I have my own private Queue ?
> +A.7: Yes, the threadlets framework allows subsystems to create their own private
> + queues with associated pools of threads.
> +
> + - Define a PrivateQueue
> + ThreadletQueue myQueue;
> +
> + - Initialize it:
> + threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
> + where my_max_threads is the maximum number of threads that can be in the
> + thread pool and my_min_threads is the minimum number of active threads
> + that can be in the thread-pool.
> +
> + - Submit work:
> + submit_threadlet_to_queue(&myQueue,&my_work);
> +
> + - Cancel work:
> + cancel_threadlet_on_queue(&myQueue,&my_work);
> diff --git a/qemu-threadlets.c b/qemu-threadlets.c
> new file mode 100644
> index 0000000..13f6bb8
> --- /dev/null
> +++ b/qemu-threadlets.c
> @@ -0,0 +1,170 @@
> +/*
> + * Threadlet support for offloading tasks to be executed asynchronously
> + *
> + * Copyright IBM, Corp. 2008
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + * Anthony Liguori<aliguori@us.ibm.com>
> + * Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
> + * Gautham R Shenoy<ego@in.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2. See
> + * the COPYING file in the top-level directory.
> + */
> +
> +#include "qemu-threadlets.h"
> +#include "osdep.h"
> +
> +#define MAX_GLOBAL_THREADS 64
> +#define MIN_GLOBAL_THREADS 64
> +ThreadletQueue globalqueue;
> +static int globalqueue_init;
> +
> +static void *threadlet_worker(void *data)
> +{
> + ThreadletQueue *queue = data;
> +
> + while (1) {
> + ThreadletWork *work;
> + int ret = 0;
> + qemu_mutex_lock(&(queue->lock));
> +
> + while (QTAILQ_EMPTY(&(queue->request_list))&&
> + (ret != ETIMEDOUT)) {
> + ret = qemu_cond_timedwait(&(queue->cond),
> + &(queue->lock), 10*100000);
> + }
> +
> + assert(queue->idle_threads != 0);
> + if (QTAILQ_EMPTY(&(queue->request_list))) {
> + if (queue->cur_threads> queue->min_threads) {
> + /* We retain the minimum number of threads */
> + break;
> + }
> + } else {
> + work = QTAILQ_FIRST(&(queue->request_list));
> + QTAILQ_REMOVE(&(queue->request_list), work, node);
> +
> + queue->idle_threads--;
> + qemu_mutex_unlock(&(queue->lock));
> +
> + /* execute the work function */
> + work->func(work);
> +
> + qemu_mutex_lock(&(queue->lock));
> + queue->idle_threads++;
> + }
> + qemu_mutex_unlock(&(queue->lock));
> + }
> +
> + queue->idle_threads--;
> + queue->cur_threads--;
> + qemu_mutex_unlock(&queue->lock);
> +
> + return NULL;
> +}
> +
> +static void spawn_threadlet(ThreadletQueue *queue)
> +{
> + QemuThread thread;
> +
> + queue->cur_threads++;
> + queue->idle_threads++;
> +
> + qemu_thread_create(&thread, threadlet_worker, queue);
> +}
> +
> +/**
> + * submit_threadlet_to_queue: Submit a new task to a private queue to be
> + * executed asynchronously.
> + * @queue: Per-subsystem private queue to which the new task needs
> + * to be submitted.
> + * @work: Contains information about the task that needs to be submitted.
> + */
> +void submit_threadlet_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> +{
> + qemu_mutex_lock(&(queue->lock));
> + if (queue->idle_threads == 0&& queue->cur_threads< queue->max_threads) {
> + spawn_threadlet(queue);
> + }
> + QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
> + qemu_mutex_unlock(&(queue->lock));
> + qemu_cond_signal(&(queue->cond));
> +}
> +
> +/**
> + * submit_threadlet: Submit to the global queue a new task to be executed
> + * asynchronously.
> + * @work: Contains information about the task that needs to be submitted.
> + */
> +void submit_threadlet(ThreadletWork *work)
> +{
> + if (unlikely(!globalqueue_init)) {
> + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
> + MIN_GLOBAL_THREADS);
> + globalqueue_init = 1;
> + }
> +
> + submit_threadlet_to_queue(&globalqueue, work);
> +}
> +
> +/**
> + * cancel_threadlet_on_queue: Cancel a task queued on a Queue.
> + * @queue: The queue containing the task to be cancelled.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 0 if the task is successfully cancelled.
> + * 1 otherwise.
> + */
> +int cancel_threadlet_on_queue(ThreadletQueue *queue, ThreadletWork *work)
> +{
> + ThreadletWork *ret_work;
> + int found = 0;
> +
> + qemu_mutex_lock(&(queue->lock));
> + QTAILQ_FOREACH(ret_work,&(queue->request_list), node) {
> + if (ret_work == work) {
> + QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
> + found = 1;
> + break;
> + }
> + }
> + qemu_mutex_unlock(&(queue->lock));
> +
> + if (found) {
> + return 0;
> + }
> +
> + return 1;
> +}
> +
> +/**
> + * cancel_threadlet: Cancel a task queued on the global queue.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 0 if the task is successfully cancelled.
> + * 1 otherwise.
> + */
> +int cancel_threadlet(ThreadletWork *work)
> +{
> + return cancel_threadlet_on_queue(&globalqueue, work);
> +}
> +
> +/**
> + * threadlet_queue_init: Initialize a threadlet queue.
> + * @queue: The threadlet queue to be initialized.
> + * @max_threads: Maximum number of threads processing the queue.
> + * @min_threads: Minimum number of threads processing the queue.
> + */
> +void threadlet_queue_init(ThreadletQueue *queue,
> + int max_threads, int min_threads)
> +{
> + queue->cur_threads = 0;
> + queue->idle_threads = 0;
> + queue->max_threads = max_threads;
> + queue->min_threads = min_threads;
> + QTAILQ_INIT(&(queue->request_list));
> + qemu_mutex_init(&(queue->lock));
> + qemu_cond_init(&(queue->cond));
> +}
> diff --git a/qemu-threadlets.h b/qemu-threadlets.h
> new file mode 100644
> index 0000000..d58c10f
> --- /dev/null
> +++ b/qemu-threadlets.h
> @@ -0,0 +1,48 @@
> +/*
> + * Threadlet support for offloading tasks to be executed asynchronously
> + *
> + * Copyright IBM, Corp. 2008
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + * Anthony Liguori<aliguori@us.ibm.com>
> + * Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
> + * Gautham R Shenoy<ego@in.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2. See
> + * the COPYING file in the top-level directory.
> + */
> +
> +#ifndef QEMU_ASYNC_WORK_H
> +#define QEMU_ASYNC_WORK_H
> +
> +#include "qemu-queue.h"
> +#include "qemu-common.h"
> +#include "qemu-thread.h"
> +
> +typedef struct ThreadletQueue
> +{
> + QemuMutex lock;
> + QemuCond cond;
> + int max_threads;
> + int min_threads;
> + int cur_threads;
> + int idle_threads;
> + QTAILQ_HEAD(, threadlet_work) request_list;
> +} ThreadletQueue;
> +
> +typedef struct threadlet_work
> +{
> + QTAILQ_ENTRY(threadlet_work) node;
> + void (*func)(struct threadlet_work *work);
> +} ThreadletWork;
> +
> +extern void submit_threadlet_to_queue(ThreadletQueue *queue,
> + ThreadletWork *work);
> +extern void submit_threadlet(ThreadletWork *work);
> +extern int cancel_threadlet_on_queue(ThreadletQueue *queue,
> + ThreadletWork *work);
> +extern int cancel_threadlet(ThreadletWork *work);
> +extern void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
> + int min_threads);
> +#endif
>
Acked-by: Paolo Bonzini <pbonzini@redhat.com>
^ permalink raw reply [flat|nested] 5+ messages in thread
* Re: [Qemu-devel] [PATCH V5 0/2] Threadlets: A generic task offloading framework
2010-06-22 11:18 [Qemu-devel] [PATCH V5 0/2] Threadlets: A generic task offloading framework Gautham R Shenoy
2010-06-22 11:18 ` [Qemu-devel] [PATCH V5 1/2] qemu: Generic task offloading framework: threadlets Gautham R Shenoy
2010-06-22 11:18 ` [Qemu-devel] [PATCH V5 2/2] qemu: Convert AIO code to use threadlets Gautham R Shenoy
@ 2010-07-09 5:47 ` Gautham R Shenoy
2 siblings, 0 replies; 5+ messages in thread
From: Gautham R Shenoy @ 2010-07-09 5:47 UTC (permalink / raw)
To: Qemu-development List
Cc: Anthony Liguori, Avi Kivity, Corentin Chary, Paolo Bonzini
Hi,
I couldn't see this in the mainline codebase.
Wondering if something else needs to be done in this patch-set.
Thanks and Regards
gautham.
On Tue, Jun 22, 2010 at 04:48:08PM +0530, Gautham R Shenoy wrote:
> Hi,
>
> This is the v5 of the patch-series to have a generic asynchronous task
> offloading framework (called threadlets) within qemu.
>
> V4 can be found here:
> http://lists.gnu.org/archive/html/qemu-devel/2010-06/msg02152.html
>
> Changes from V4:
> =====================================================================
> - Updated the Makefile to make the object files depend on CONFIG_POSIX.
>
> - Did away with the pthread_barrier_init() and pthread_barrier_wait().
>
> - Did away with flush_threadlet_queue() helper.
>
> - Fixed the attribution in copyright, helper names.
>
> - Fixed the idle-thread-exit logic.
>
> - Added documentation in docs/
>
> Description
> =====================================================================
> This patch series decouples the asynchronous threading framework
> implementation from posix-aio-compat.c to implement a generic asynchronous
> task offloading threading framework called threadlets which can be used
> by other subsystems within QEMU.
>
> Currently within QEMU, the AIO subsystem (paio) creates a bunch of
> asynchronous threads to offload any blocking operations so that
> the vcpu threads and the IO thread can go back to servicing any
> other guest requests.
>
> This offloading framework can be used by subsystems such as virtio-9p,
> Asynchronous encoding for vnc-server, so that the vcpu thread can offload
> blocking operations on to the asynchronous threads and resume servicing
> any other guest requests. The asynchronous threads, after
> finishing the blocking operations can then transfer the control over
> to the IO thread so that the latter can handle the post_blocking_operation().
>
> The patch series passed fsstress test without any issues.
>
> Could it be considered for inclusion ?
>
> ---
>
> Aneesh Kumar K.V (1):
> qemu: Generic task offloading framework: threadlets
>
> Gautham R Shenoy (1):
> qemu: Convert AIO code to use threadlets.
>
>
> Makefile.objs | 3 +
> docs/async-support.txt | 141 ++++++++++++++++++++++++++++++++++++++++
> posix-aio-compat.c | 152 +++++++------------------------------------
> qemu-threadlets.c | 170 ++++++++++++++++++++++++++++++++++++++++++++++++
> qemu-threadlets.h | 48 ++++++++++++++
> 5 files changed, 385 insertions(+), 129 deletions(-)
> create mode 100644 docs/async-support.txt
> create mode 100644 qemu-threadlets.c
> create mode 100644 qemu-threadlets.h
>
> --
> Thanks and Regards
> gautham.
>
--
Thanks and Regards
gautham
^ permalink raw reply [flat|nested] 5+ messages in thread
end of thread, other threads:[~2010-07-09 5:48 UTC | newest]
Thread overview: 5+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2010-06-22 11:18 [Qemu-devel] [PATCH V5 0/2] Threadlets: A generic task offloading framework Gautham R Shenoy
2010-06-22 11:18 ` [Qemu-devel] [PATCH V5 1/2] qemu: Generic task offloading framework: threadlets Gautham R Shenoy
2010-06-22 13:00 ` [Qemu-devel] " Paolo Bonzini
2010-06-22 11:18 ` [Qemu-devel] [PATCH V5 2/2] qemu: Convert AIO code to use threadlets Gautham R Shenoy
2010-07-09 5:47 ` [Qemu-devel] [PATCH V5 0/2] Threadlets: A generic task offloading framework Gautham R Shenoy
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).