* [Qemu-devel] [PATCH 02/12] Introduce work concept in posix-aio-compat.c
2011-01-13 12:14 [Qemu-devel] [PATCH 00/12] Threadlets Infrastructure Arun R Bharadwaj
@ 2011-01-13 12:14 ` Arun R Bharadwaj
0 siblings, 0 replies; 14+ messages in thread
From: Arun R Bharadwaj @ 2011-01-13 12:14 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, aliguori, jvrao, aneesh.kumar
This patch introduces work concept by introducing the
ThreadletWork structure.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
posix-aio-compat.c | 20 ++++++++++++++------
1 files changed, 14 insertions(+), 6 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 82862ec..ddf42f5 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -33,6 +33,10 @@
static QemuMutex aiocb_mutex;
static QemuCond aiocb_completion;
+typedef struct ThreadletWork
+{
+ QTAILQ_ENTRY(ThreadletWork) node;
+} ThreadletWork;
struct qemu_paiocb {
BlockDriverAIOCB common;
@@ -47,13 +51,13 @@ struct qemu_paiocb {
int ev_signo;
off_t aio_offset;
- QTAILQ_ENTRY(qemu_paiocb) node;
int aio_type;
ssize_t ret;
int active;
struct qemu_paiocb *next;
int async_context_id;
+ ThreadletWork work;
};
typedef struct PosixAioState {
@@ -69,7 +73,7 @@ static pthread_attr_t attr;
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
+static QTAILQ_HEAD(, ThreadletWork) request_list;
#ifdef CONFIG_PREADV
static int preadv_present = 1;
@@ -307,6 +311,7 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
static void *aio_thread(void *unused)
{
pid_t pid;
+ ThreadletWork *work;
pid = getpid();
@@ -330,8 +335,11 @@ static void *aio_thread(void *unused)
if (QTAILQ_EMPTY(&request_list))
break;
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
+ work = QTAILQ_FIRST(&request_list);
+ QTAILQ_REMOVE(&request_list, work, node);
+
+ aiocb = container_of(work, struct qemu_paiocb, work);
+
aiocb->active = 1;
idle_threads--;
mutex_unlock(&lock);
@@ -398,7 +406,7 @@ static void qemu_paio_submit(struct qemu_paiocb *aiocb)
mutex_lock(&lock);
if (idle_threads == 0 && cur_threads < max_threads)
spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
+ QTAILQ_INSERT_TAIL(&request_list, &aiocb->work, node);
mutex_unlock(&lock);
cond_signal(&cond);
}
@@ -548,7 +556,7 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
qemu_mutex_lock(&aiocb_mutex);
if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
+ QTAILQ_REMOVE(&request_list, &acb->work, node);
acb->ret = -ECANCELED;
} else if (acb->ret == -EINPROGRESS) {
active = 1;
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [Qemu-devel] [PATCH 00/12] Threadlet Infrastructure
@ 2011-01-20 12:33 Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 01/12] Add aiocb_mutex and aiocb_completion Arun R Bharadwaj
` (11 more replies)
0 siblings, 12 replies; 14+ messages in thread
From: Arun R Bharadwaj @ 2011-01-20 12:33 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, aliguori, jvrao, aneesh.kumar, stefanha
Hi,
This patch series introduces threadlet framework
in qemu.
Changelog:
* Modified the dequeue_work function logic which was
earlier wrong. It now returns 0 on success and 1 on
failure.
* removed the locking for container_of which is not
needed in handle_work()
* changed qemu-threadlets.[c|h] to qemu-threadlet.[c|h]
* introduced signal handler APIs in qemu-threadlet.c
directly instead of in posix-aio-compat.c
Testing:
* I have tested this series using fsstress.
Detailed testing information has been mentioned in the
previous iteration.
The following series implements...
---
Arun R Bharadwaj (12):
Add aiocb_mutex and aiocb_completion.
Introduce work concept in posix-aio-compat.c
Add callback function to ThreadletWork structure.
Add ThreadletQueue.
Threadlet: Add submit_work threadlet API.
Threadlet: Add dequeue_work threadlet API and remove active field.
Remove thread_create routine.
Threadlet: Add aio_signal_handler threadlet API
Remove all instances of CONFIG_THREAD
Move threadlet code to qemu-threadlets.c
Threadlets: Add functionality to create private queues.
Threadlets: Add documentation
Makefile.objs | 3 -
configure | 2
docs/async-support.txt | 141 ++++++++++++++++++++++++++++
posix-aio-compat.c | 238 ++++++++++++------------------------------------
qemu-threadlet.c | 187 ++++++++++++++++++++++++++++++++++++++
qemu-threadlet.h | 48 ++++++++++
vl.c | 3 +
7 files changed, 440 insertions(+), 182 deletions(-)
create mode 100644 docs/async-support.txt
create mode 100644 qemu-threadlet.c
create mode 100644 qemu-threadlet.h
--
arun
^ permalink raw reply [flat|nested] 14+ messages in thread
* [Qemu-devel] [PATCH 01/12] Add aiocb_mutex and aiocb_completion.
2011-01-20 12:33 [Qemu-devel] [PATCH 00/12] Threadlet Infrastructure Arun R Bharadwaj
@ 2011-01-20 12:33 ` Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 02/12] Introduce work concept in posix-aio-compat.c Arun R Bharadwaj
` (10 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Arun R Bharadwaj @ 2011-01-20 12:33 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, aliguori, jvrao, aneesh.kumar, stefanha
This patch adds the aiocb_mutex to protect aiocb.
This patch also removes the infinite loop present in paio_cancel.
Since there can only be one cancellation at a time, we need to
introduce a condition variable. For this, we need a
global aiocb_completion condition variable.
This patch also adds the Makefile entry to compile qemu-thread.c
when CONFIG_POSIX is set, instead of the unused CONFIG_THREAD.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
Makefile.objs | 2 +-
posix-aio-compat.c | 39 ++++++++++++++++++++++++++++-----------
2 files changed, 29 insertions(+), 12 deletions(-)
diff --git a/Makefile.objs b/Makefile.objs
index cd5a24b..3b7ec27 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-$(CONFIG_POSIX) += qemu-thread.o
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
@@ -124,7 +125,6 @@ endif
common-obj-y += $(addprefix ui/, $(ui-obj-y))
common-obj-y += iov.o acl.o
-common-obj-$(CONFIG_THREAD) += qemu-thread.o
common-obj-y += notify.o event_notifier.o
common-obj-y += qemu-timer.o
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..82862ec 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -27,9 +27,12 @@
#include "qemu-common.h"
#include "trace.h"
#include "block_int.h"
+#include "qemu-thread.h"
#include "block/raw-posix-aio.h"
+static QemuMutex aiocb_mutex;
+static QemuCond aiocb_completion;
struct qemu_paiocb {
BlockDriverAIOCB common;
@@ -351,10 +354,14 @@ static void *aio_thread(void *unused)
}
mutex_lock(&lock);
- aiocb->ret = ret;
idle_threads++;
mutex_unlock(&lock);
+ qemu_mutex_lock(&aiocb_mutex);
+ aiocb->ret = ret;
+ qemu_cond_broadcast(&aiocb_completion);
+ qemu_mutex_unlock(&aiocb_mutex);
+
if (kill(pid, aiocb->ev_signo)) die("kill failed");
}
@@ -383,8 +390,11 @@ static void spawn_thread(void)
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
+ qemu_mutex_lock(&aiocb_mutex);
aiocb->ret = -EINPROGRESS;
aiocb->active = 0;
+ qemu_mutex_unlock(&aiocb_mutex);
+
mutex_lock(&lock);
if (idle_threads == 0 && cur_threads < max_threads)
spawn_thread();
@@ -397,9 +407,9 @@ static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
- mutex_lock(&lock);
+ qemu_mutex_lock(&aiocb_mutex);
ret = aiocb->ret;
- mutex_unlock(&lock);
+ qemu_mutex_unlock(&aiocb_mutex);
return ret;
}
@@ -536,22 +546,26 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
int active = 0;
- mutex_lock(&lock);
+ qemu_mutex_lock(&aiocb_mutex);
if (!acb->active) {
QTAILQ_REMOVE(&request_list, acb, node);
acb->ret = -ECANCELED;
} else if (acb->ret == -EINPROGRESS) {
active = 1;
}
- mutex_unlock(&lock);
- if (active) {
- /* fail safe: if the aio could not be canceled, we wait for
- it */
- while (qemu_paio_error(acb) == EINPROGRESS)
- ;
+ if (!active) {
+ acb->ret = -ECANCELED;
+ } else {
+ while (acb->ret == -EINPROGRESS) {
+ /*
+ * fail safe: if the aio could not be canceled,
+ * we wait for it
+ */
+ qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
+ }
}
-
+ qemu_mutex_unlock(&aiocb_mutex);
paio_remove(acb);
}
@@ -623,6 +637,9 @@ int paio_init(void)
if (posix_aio_state)
return 0;
+ qemu_mutex_init(&aiocb_mutex);
+ qemu_cond_init(&aiocb_completion);
+
s = qemu_malloc(sizeof(PosixAioState));
sigfillset(&act.sa_mask);
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [Qemu-devel] [PATCH 02/12] Introduce work concept in posix-aio-compat.c
2011-01-20 12:33 [Qemu-devel] [PATCH 00/12] Threadlet Infrastructure Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 01/12] Add aiocb_mutex and aiocb_completion Arun R Bharadwaj
@ 2011-01-20 12:33 ` Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 03/12] Add callback function to ThreadletWork structure Arun R Bharadwaj
` (9 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Arun R Bharadwaj @ 2011-01-20 12:33 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, aliguori, jvrao, aneesh.kumar, stefanha
This patch introduces work concept by introducing the
ThreadletWork structure.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
posix-aio-compat.c | 20 ++++++++++++++------
1 files changed, 14 insertions(+), 6 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 82862ec..ddf42f5 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -33,6 +33,10 @@
static QemuMutex aiocb_mutex;
static QemuCond aiocb_completion;
+typedef struct ThreadletWork
+{
+ QTAILQ_ENTRY(ThreadletWork) node;
+} ThreadletWork;
struct qemu_paiocb {
BlockDriverAIOCB common;
@@ -47,13 +51,13 @@ struct qemu_paiocb {
int ev_signo;
off_t aio_offset;
- QTAILQ_ENTRY(qemu_paiocb) node;
int aio_type;
ssize_t ret;
int active;
struct qemu_paiocb *next;
int async_context_id;
+ ThreadletWork work;
};
typedef struct PosixAioState {
@@ -69,7 +73,7 @@ static pthread_attr_t attr;
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
+static QTAILQ_HEAD(, ThreadletWork) request_list;
#ifdef CONFIG_PREADV
static int preadv_present = 1;
@@ -307,6 +311,7 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
static void *aio_thread(void *unused)
{
pid_t pid;
+ ThreadletWork *work;
pid = getpid();
@@ -330,8 +335,11 @@ static void *aio_thread(void *unused)
if (QTAILQ_EMPTY(&request_list))
break;
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
+ work = QTAILQ_FIRST(&request_list);
+ QTAILQ_REMOVE(&request_list, work, node);
+
+ aiocb = container_of(work, struct qemu_paiocb, work);
+
aiocb->active = 1;
idle_threads--;
mutex_unlock(&lock);
@@ -398,7 +406,7 @@ static void qemu_paio_submit(struct qemu_paiocb *aiocb)
mutex_lock(&lock);
if (idle_threads == 0 && cur_threads < max_threads)
spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
+ QTAILQ_INSERT_TAIL(&request_list, &aiocb->work, node);
mutex_unlock(&lock);
cond_signal(&cond);
}
@@ -548,7 +556,7 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
qemu_mutex_lock(&aiocb_mutex);
if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
+ QTAILQ_REMOVE(&request_list, &acb->work, node);
acb->ret = -ECANCELED;
} else if (acb->ret == -EINPROGRESS) {
active = 1;
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [Qemu-devel] [PATCH 03/12] Add callback function to ThreadletWork structure.
2011-01-20 12:33 [Qemu-devel] [PATCH 00/12] Threadlet Infrastructure Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 01/12] Add aiocb_mutex and aiocb_completion Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 02/12] Introduce work concept in posix-aio-compat.c Arun R Bharadwaj
@ 2011-01-20 12:33 ` Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 04/12] Add ThreadletQueue Arun R Bharadwaj
` (8 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Arun R Bharadwaj @ 2011-01-20 12:33 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, aliguori, jvrao, aneesh.kumar, stefanha
This patch adds the callback function to the ThreadletWork
structure and moves aio handler as a callback function.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
posix-aio-compat.c | 89 +++++++++++++++++++++++++++++-----------------------
1 files changed, 50 insertions(+), 39 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index ddf42f5..2792201 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -36,6 +36,7 @@ static QemuCond aiocb_completion;
typedef struct ThreadletWork
{
QTAILQ_ENTRY(ThreadletWork) node;
+ void (*func)(struct ThreadletWork *work);
} ThreadletWork;
struct qemu_paiocb {
@@ -308,18 +309,14 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void *aio_thread(void *unused)
+static void *threadlet_worker(void *data)
{
- pid_t pid;
- ThreadletWork *work;
-
- pid = getpid();
while (1) {
- struct qemu_paiocb *aiocb;
ssize_t ret = 0;
qemu_timeval tv;
struct timespec ts;
+ ThreadletWork *work;
qemu_gettimeofday(&tv);
ts.tv_sec = tv.tv_sec + 10;
@@ -332,52 +329,64 @@ static void *aio_thread(void *unused)
ret = cond_timedwait(&cond, &lock, &ts);
}
- if (QTAILQ_EMPTY(&request_list))
+ if (QTAILQ_EMPTY(&request_list)) {
+ idle_threads--;
+ cur_threads--;
+ mutex_unlock(&lock);
break;
-
+ }
work = QTAILQ_FIRST(&request_list);
QTAILQ_REMOVE(&request_list, work, node);
-
- aiocb = container_of(work, struct qemu_paiocb, work);
-
- aiocb->active = 1;
idle_threads--;
mutex_unlock(&lock);
- switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- case QEMU_AIO_WRITE:
- ret = handle_aiocb_rw(aiocb);
- break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
- ret = handle_aiocb_ioctl(aiocb);
- break;
- default:
- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
- ret = -EINVAL;
- break;
- }
-
+ work->func(work);
mutex_lock(&lock);
idle_threads++;
mutex_unlock(&lock);
- qemu_mutex_lock(&aiocb_mutex);
- aiocb->ret = ret;
- qemu_cond_broadcast(&aiocb_completion);
- qemu_mutex_unlock(&aiocb_mutex);
+ }
+
+ return NULL;
+}
+
+static void handle_work(ThreadletWork *work)
+{
+ pid_t pid;
+ ssize_t ret = 0;
+ struct qemu_paiocb *aiocb;
- if (kill(pid, aiocb->ev_signo)) die("kill failed");
+ pid = getpid();
+ aiocb = container_of(work, struct qemu_paiocb, work);
+ qemu_mutex_lock(&aiocb_mutex);
+ aiocb->active = 1;
+ qemu_mutex_unlock(&aiocb_mutex);
+
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ case QEMU_AIO_WRITE:
+ ret = handle_aiocb_rw(aiocb);
+ break;
+ case QEMU_AIO_FLUSH:
+ ret = handle_aiocb_flush(aiocb);
+ break;
+ case QEMU_AIO_IOCTL:
+ ret = handle_aiocb_ioctl(aiocb);
+ break;
+ default:
+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ ret = -EINVAL;
+ break;
}
- idle_threads--;
- cur_threads--;
- mutex_unlock(&lock);
+ qemu_mutex_lock(&aiocb_mutex);
+ aiocb->ret = ret;
+ qemu_cond_broadcast(&aiocb_completion);
+ qemu_mutex_unlock(&aiocb_mutex);
- return NULL;
+ if (kill(pid, aiocb->ev_signo)) {
+ die("kill failed");
+ }
}
static void spawn_thread(void)
@@ -391,7 +400,7 @@ static void spawn_thread(void)
if (sigfillset(&set)) die("sigfillset");
if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
- thread_create(&thread_id, &attr, aio_thread, NULL);
+ thread_create(&thread_id, &attr, threadlet_worker, NULL);
if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}
@@ -406,6 +415,8 @@ static void qemu_paio_submit(struct qemu_paiocb *aiocb)
mutex_lock(&lock);
if (idle_threads == 0 && cur_threads < max_threads)
spawn_thread();
+
+ aiocb->work.func = handle_work;
QTAILQ_INSERT_TAIL(&request_list, &aiocb->work, node);
mutex_unlock(&lock);
cond_signal(&cond);
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [Qemu-devel] [PATCH 04/12] Add ThreadletQueue.
2011-01-20 12:33 [Qemu-devel] [PATCH 00/12] Threadlet Infrastructure Arun R Bharadwaj
` (2 preceding siblings ...)
2011-01-20 12:33 ` [Qemu-devel] [PATCH 03/12] Add callback function to ThreadletWork structure Arun R Bharadwaj
@ 2011-01-20 12:33 ` Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 05/12] Threadlet: Add submit_work threadlet API Arun R Bharadwaj
` (7 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Arun R Bharadwaj @ 2011-01-20 12:33 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, aliguori, jvrao, aneesh.kumar, stefanha
This patch adds a global queue of type ThreadletQueue and
removes the earlier usage of request_list queue.
We want to create the thread on the first submit. Hence we
need to track whether the globalqueue is initialized or not.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
posix-aio-compat.c | 149 +++++++++++++++++++++++++++-------------------------
1 files changed, 76 insertions(+), 73 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 2792201..011633f 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -31,8 +31,23 @@
#include "block/raw-posix-aio.h"
+#define MAX_GLOBAL_THREADS 64
+#define MIN_GLOBAL_THREADS 8
+
static QemuMutex aiocb_mutex;
static QemuCond aiocb_completion;
+
+typedef struct ThreadletQueue
+{
+ QemuMutex lock;
+ QemuCond cond;
+ int max_threads;
+ int min_threads;
+ int cur_threads;
+ int idle_threads;
+ QTAILQ_HEAD(, ThreadletWork) request_list;
+} ThreadletQueue;
+
typedef struct ThreadletWork
{
QTAILQ_ENTRY(ThreadletWork) node;
@@ -66,15 +81,10 @@ typedef struct PosixAioState {
struct qemu_paiocb *first_aio;
} PosixAioState;
-
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
+/* Default ThreadletQueue */
+static ThreadletQueue globalqueue;
+static int globalqueue_init;
static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, ThreadletWork) request_list;
#ifdef CONFIG_PREADV
static int preadv_present = 1;
@@ -93,32 +103,6 @@ static void die(const char *what)
die2(errno, what);
}
-static void mutex_lock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_lock(mutex);
- if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_unlock(mutex);
- if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
- struct timespec *ts)
-{
- int ret = pthread_cond_timedwait(cond, mutex, ts);
- if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
- return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
- int ret = pthread_cond_signal(cond);
- if (ret) die2(ret, "pthread_cond_signal");
-}
-
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
void *(*start_routine)(void*), void *arg)
{
@@ -311,42 +295,45 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
static void *threadlet_worker(void *data)
{
+ ThreadletQueue *queue = data;
+ qemu_mutex_lock(&queue->lock);
while (1) {
- ssize_t ret = 0;
- qemu_timeval tv;
- struct timespec ts;
ThreadletWork *work;
+ int ret = 0;
- qemu_gettimeofday(&tv);
- ts.tv_sec = tv.tv_sec + 10;
- ts.tv_nsec = 0;
-
- mutex_lock(&lock);
-
- while (QTAILQ_EMPTY(&request_list) &&
- !(ret == ETIMEDOUT)) {
- ret = cond_timedwait(&cond, &lock, &ts);
+ while (QTAILQ_EMPTY(&queue->request_list) &&
+ (ret != ETIMEDOUT)) {
+ /* wait for cond to be signalled or broadcast for 1000s */
+ ret = qemu_cond_timedwait((&queue->cond),
+ &(queue->lock), 10*100000);
}
- if (QTAILQ_EMPTY(&request_list)) {
- idle_threads--;
- cur_threads--;
- mutex_unlock(&lock);
- break;
- }
- work = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, work, node);
- idle_threads--;
- mutex_unlock(&lock);
+ assert(queue->idle_threads != 0);
+ if (QTAILQ_EMPTY(&queue->request_list)) {
+ if (queue->cur_threads > queue->min_threads) {
+ /* We retain the minimum number of threads */
+ break;
+ }
+ } else {
+ work = QTAILQ_FIRST(&queue->request_list);
+ QTAILQ_REMOVE(&queue->request_list, work, node);
+
+ queue->idle_threads--;
+ qemu_mutex_unlock(&queue->lock);
- work->func(work);
- mutex_lock(&lock);
- idle_threads++;
- mutex_unlock(&lock);
+ /* execute the work function */
+ work->func(work);
+ qemu_mutex_lock(&queue->lock);
+ queue->idle_threads++;
+ }
}
+ queue->idle_threads--;
+ queue->cur_threads--;
+ qemu_mutex_unlock(&queue->lock);
+
return NULL;
}
@@ -389,18 +376,19 @@ static void handle_work(ThreadletWork *work)
}
}
-static void spawn_thread(void)
+static void spawn_threadlet(ThreadletQueue *queue)
{
+ pthread_t thread_id;
sigset_t set, oldset;
- cur_threads++;
- idle_threads++;
+ queue->cur_threads++;
+ queue->idle_threads++;
/* block all signals */
if (sigfillset(&set)) die("sigfillset");
if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
- thread_create(&thread_id, &attr, threadlet_worker, NULL);
+ thread_create(&thread_id, &attr, threadlet_worker, queue);
if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}
@@ -412,14 +400,29 @@ static void qemu_paio_submit(struct qemu_paiocb *aiocb)
aiocb->active = 0;
qemu_mutex_unlock(&aiocb_mutex);
- mutex_lock(&lock);
- if (idle_threads == 0 && cur_threads < max_threads)
- spawn_thread();
+ qemu_mutex_lock(&globalqueue.lock);
+
+ if (!globalqueue_init) {
+ globalqueue.cur_threads = 0;
+ globalqueue.idle_threads = 0;
+ globalqueue.max_threads = MAX_GLOBAL_THREADS;
+ globalqueue.min_threads = MIN_GLOBAL_THREADS;
+ QTAILQ_INIT(&globalqueue.request_list);
+ qemu_mutex_init(&globalqueue.lock);
+ qemu_cond_init(&globalqueue.cond);
+
+ globalqueue_init = 1;
+ }
+
+ if (globalqueue.idle_threads == 0 &&
+ globalqueue.cur_threads < globalqueue.max_threads)
+ spawn_threadlet(&globalqueue);
aiocb->work.func = handle_work;
- QTAILQ_INSERT_TAIL(&request_list, &aiocb->work, node);
- mutex_unlock(&lock);
- cond_signal(&cond);
+
+ QTAILQ_INSERT_TAIL(&globalqueue.request_list, &aiocb->work, node);
+ qemu_cond_signal(&globalqueue.cond);
+ qemu_mutex_unlock(&globalqueue.lock);
}
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
@@ -566,12 +569,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
int active = 0;
qemu_mutex_lock(&aiocb_mutex);
+ qemu_mutex_lock(&globalqueue.lock);
if (!acb->active) {
- QTAILQ_REMOVE(&request_list, &acb->work, node);
+ QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
acb->ret = -ECANCELED;
} else if (acb->ret == -EINPROGRESS) {
active = 1;
}
+ qemu_mutex_unlock(&globalqueue.lock);
if (!active) {
acb->ret = -ECANCELED;
@@ -689,8 +694,6 @@ int paio_init(void)
if (ret)
die2(ret, "pthread_attr_setdetachstate");
- QTAILQ_INIT(&request_list);
-
posix_aio_state = s;
return 0;
}
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [Qemu-devel] [PATCH 05/12] Threadlet: Add submit_work threadlet API.
2011-01-20 12:33 [Qemu-devel] [PATCH 00/12] Threadlet Infrastructure Arun R Bharadwaj
` (3 preceding siblings ...)
2011-01-20 12:33 ` [Qemu-devel] [PATCH 04/12] Add ThreadletQueue Arun R Bharadwaj
@ 2011-01-20 12:33 ` Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 06/12] Threadlet: Add dequeue_work threadlet API and remove active field Arun R Bharadwaj
` (6 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Arun R Bharadwaj @ 2011-01-20 12:33 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, aliguori, jvrao, aneesh.kumar, stefanha
This patch adds submit work threadlet API and shows how
the qemu_paio_submit changes to submit_work.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
posix-aio-compat.c | 33 ++++++++++++++++++++++-----------
1 files changed, 22 insertions(+), 11 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 011633f..62e32e3 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -393,13 +393,13 @@ static void spawn_threadlet(ThreadletQueue *queue)
if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}
-static void qemu_paio_submit(struct qemu_paiocb *aiocb)
+/**
+ * submit_work: Submit to the global queue a new task to be executed
+ * asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+static void submit_work(ThreadletWork *work)
{
- qemu_mutex_lock(&aiocb_mutex);
- aiocb->ret = -EINPROGRESS;
- aiocb->active = 0;
- qemu_mutex_unlock(&aiocb_mutex);
-
qemu_mutex_lock(&globalqueue.lock);
if (!globalqueue_init) {
@@ -415,13 +415,13 @@ static void qemu_paio_submit(struct qemu_paiocb *aiocb)
}
if (globalqueue.idle_threads == 0 &&
- globalqueue.cur_threads < globalqueue.max_threads)
+ globalqueue.cur_threads < globalqueue.max_threads) {
spawn_threadlet(&globalqueue);
- aiocb->work.func = handle_work;
-
- QTAILQ_INSERT_TAIL(&globalqueue.request_list, &aiocb->work, node);
- qemu_cond_signal(&globalqueue.cond);
+ } else {
+ qemu_cond_signal(&globalqueue.cond);
+ }
+ QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node);
qemu_mutex_unlock(&globalqueue.lock);
}
@@ -448,6 +448,17 @@ static int qemu_paio_error(struct qemu_paiocb *aiocb)
return ret;
}
+static void qemu_paio_submit(struct qemu_paiocb *aiocb)
+{
+ qemu_mutex_lock(&aiocb_mutex);
+ aiocb->ret = -EINPROGRESS;
+ aiocb->active = 0;
+ qemu_mutex_unlock(&aiocb_mutex);
+
+ aiocb->work.func = handle_work;
+ submit_work(&aiocb->work);
+}
+
static int posix_aio_process_queue(void *opaque)
{
PosixAioState *s = opaque;
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [Qemu-devel] [PATCH 06/12] Threadlet: Add dequeue_work threadlet API and remove active field.
2011-01-20 12:33 [Qemu-devel] [PATCH 00/12] Threadlet Infrastructure Arun R Bharadwaj
` (4 preceding siblings ...)
2011-01-20 12:33 ` [Qemu-devel] [PATCH 05/12] Threadlet: Add submit_work threadlet API Arun R Bharadwaj
@ 2011-01-20 12:33 ` Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 07/12] Remove thread_create routine Arun R Bharadwaj
` (5 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Arun R Bharadwaj @ 2011-01-20 12:33 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, aliguori, jvrao, aneesh.kumar, stefanha
This patch adds dequeue_work threadlet API and shows how
the paio_cancel changes to dequeue_work.
The active field in the qemu_aiocb structure is now useless.
Remove it.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
posix-aio-compat.c | 48 ++++++++++++++++++++++++++----------------------
1 files changed, 26 insertions(+), 22 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 62e32e3..95ba805 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -69,7 +69,6 @@ struct qemu_paiocb {
int aio_type;
ssize_t ret;
- int active;
struct qemu_paiocb *next;
int async_context_id;
@@ -345,9 +344,6 @@ static void handle_work(ThreadletWork *work)
pid = getpid();
aiocb = container_of(work, struct qemu_paiocb, work);
- qemu_mutex_lock(&aiocb_mutex);
- aiocb->active = 1;
- qemu_mutex_unlock(&aiocb_mutex);
switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
case QEMU_AIO_READ:
@@ -452,7 +448,6 @@ static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
qemu_mutex_lock(&aiocb_mutex);
aiocb->ret = -EINPROGRESS;
- aiocb->active = 0;
qemu_mutex_unlock(&aiocb_mutex);
aiocb->work.func = handle_work;
@@ -574,33 +569,42 @@ static void paio_remove(struct qemu_paiocb *acb)
}
}
-static void paio_cancel(BlockDriverAIOCB *blockacb)
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if successfully dequeued work.
+ * 1 otherwise.
+ */
+static int dequeue_work(ThreadletWork *work)
{
- struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
- int active = 0;
+ int ret = 1;
+ ThreadletWork *ret_work;
- qemu_mutex_lock(&aiocb_mutex);
qemu_mutex_lock(&globalqueue.lock);
- if (!acb->active) {
- QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
- acb->ret = -ECANCELED;
- } else if (acb->ret == -EINPROGRESS) {
- active = 1;
+ QTAILQ_FOREACH(ret_work, &(globalqueue.request_list), node) {
+ if (ret_work == work) {
+ QTAILQ_REMOVE(&globalqueue.request_list, ret_work, node);
+ ret = 0;
+ break;
+ }
}
qemu_mutex_unlock(&globalqueue.lock);
- if (!active) {
- acb->ret = -ECANCELED;
- } else {
+ return ret;
+}
+
+static void paio_cancel(BlockDriverAIOCB *blockacb)
+{
+ struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
+ if (dequeue_work(&acb->work) != 0) {
+ /* Wait for running work item to complete */
+ qemu_mutex_lock(&aiocb_mutex);
while (acb->ret == -EINPROGRESS) {
- /*
- * fail safe: if the aio could not be canceled,
- * we wait for it
- */
qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
}
+ qemu_mutex_unlock(&aiocb_mutex);
}
- qemu_mutex_unlock(&aiocb_mutex);
paio_remove(acb);
}
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [Qemu-devel] [PATCH 07/12] Remove thread_create routine.
2011-01-20 12:33 [Qemu-devel] [PATCH 00/12] Threadlet Infrastructure Arun R Bharadwaj
` (5 preceding siblings ...)
2011-01-20 12:33 ` [Qemu-devel] [PATCH 06/12] Threadlet: Add dequeue_work threadlet API and remove active field Arun R Bharadwaj
@ 2011-01-20 12:33 ` Arun R Bharadwaj
2011-01-20 12:34 ` [Qemu-devel] [PATCH 08/12] Threadlet: Add aio_signal_handler threadlet API Arun R Bharadwaj
` (4 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Arun R Bharadwaj @ 2011-01-20 12:33 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, aliguori, jvrao, aneesh.kumar, stefanha
Remove thread_create and use qemu_thread_create instead.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
posix-aio-compat.c | 29 ++---------------------------
1 files changed, 2 insertions(+), 27 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 95ba805..a7625d8 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -13,7 +13,6 @@
#include <sys/ioctl.h>
#include <sys/types.h>
-#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
@@ -83,7 +82,6 @@ typedef struct PosixAioState {
/* Default ThreadletQueue */
static ThreadletQueue globalqueue;
static int globalqueue_init;
-static pthread_attr_t attr;
#ifdef CONFIG_PREADV
static int preadv_present = 1;
@@ -102,13 +100,6 @@ static void die(const char *what)
die2(errno, what);
}
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void*), void *arg)
-{
- int ret = pthread_create(thread, attr, start_routine, arg);
- if (ret) die2(ret, "pthread_create");
-}
-
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
int ret;
@@ -374,19 +365,12 @@ static void handle_work(ThreadletWork *work)
static void spawn_threadlet(ThreadletQueue *queue)
{
- pthread_t thread_id;
- sigset_t set, oldset;
+ QemuThread thread;
queue->cur_threads++;
queue->idle_threads++;
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
- thread_create(&thread_id, &attr, threadlet_worker, queue);
-
- if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+ qemu_thread_create(&thread, threadlet_worker, queue);
}
/**
@@ -671,7 +655,6 @@ int paio_init(void)
struct sigaction act;
PosixAioState *s;
int fds[2];
- int ret;
if (posix_aio_state)
return 0;
@@ -701,14 +684,6 @@ int paio_init(void)
qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
posix_aio_process_queue, s);
- ret = pthread_attr_init(&attr);
- if (ret)
- die2(ret, "pthread_attr_init");
-
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret)
- die2(ret, "pthread_attr_setdetachstate");
-
posix_aio_state = s;
return 0;
}
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [Qemu-devel] [PATCH 08/12] Threadlet: Add aio_signal_handler threadlet API
2011-01-20 12:33 [Qemu-devel] [PATCH 00/12] Threadlet Infrastructure Arun R Bharadwaj
` (6 preceding siblings ...)
2011-01-20 12:33 ` [Qemu-devel] [PATCH 07/12] Remove thread_create routine Arun R Bharadwaj
@ 2011-01-20 12:34 ` Arun R Bharadwaj
2011-01-20 12:34 ` [Qemu-devel] [PATCH 09/12] Remove all instances of CONFIG_THREAD Arun R Bharadwaj
` (3 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Arun R Bharadwaj @ 2011-01-20 12:34 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, aliguori, jvrao, aneesh.kumar, stefanha
This patch adds aio_signal_handler threadlet API. Earler
posix-aio-compat.c had its own signal handler code. Now
abstract this, in the later patch it is moved to a generic
code so that it can be used by other subsystems.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
Makefile.objs | 1 +
posix-aio-compat.c | 30 ++++++++----------------------
qemu-threadlet.c | 39 +++++++++++++++++++++++++++++++++++++++
qemu-threadlet.h | 19 +++++++++++++++++++
vl.c | 3 +++
5 files changed, 70 insertions(+), 22 deletions(-)
create mode 100644 qemu-threadlet.c
create mode 100644 qemu-threadlet.h
diff --git a/Makefile.objs b/Makefile.objs
index 3b7ec27..3b11dd0 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -10,6 +10,7 @@ qobject-obj-y += qerror.o
block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
block-obj-$(CONFIG_POSIX) += qemu-thread.o
+block-obj-$(CONFIG_POSIX) += qemu-threadlet.o
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index a7625d8..96e28db 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -327,11 +327,14 @@ static void *threadlet_worker(void *data)
return NULL;
}
+static PosixAioState *posix_aio_state;
+
static void handle_work(ThreadletWork *work)
{
pid_t pid;
ssize_t ret = 0;
struct qemu_paiocb *aiocb;
+ char byte = 0;
pid = getpid();
aiocb = container_of(work, struct qemu_paiocb, work);
@@ -358,6 +361,11 @@ static void handle_work(ThreadletWork *work)
qemu_cond_broadcast(&aiocb_completion);
qemu_mutex_unlock(&aiocb_mutex);
+ ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
+ if (ret < 0 && errno != EAGAIN) {
+ die("write()");
+ }
+
if (kill(pid, aiocb->ev_signo)) {
die("kill failed");
}
@@ -518,22 +526,6 @@ static int posix_aio_flush(void *opaque)
return !!s->first_aio;
}
-static PosixAioState *posix_aio_state;
-
-static void aio_signal_handler(int signum)
-{
- if (posix_aio_state) {
- char byte = 0;
- ssize_t ret;
-
- ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
- if (ret < 0 && errno != EAGAIN)
- die("write()");
- }
-
- qemu_service_io();
-}
-
static void paio_remove(struct qemu_paiocb *acb)
{
struct qemu_paiocb **pacb;
@@ -652,7 +644,6 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
int paio_init(void)
{
- struct sigaction act;
PosixAioState *s;
int fds[2];
@@ -664,11 +655,6 @@ int paio_init(void)
s = qemu_malloc(sizeof(PosixAioState));
- sigfillset(&act.sa_mask);
- act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
- act.sa_handler = aio_signal_handler;
- sigaction(SIGUSR2, &act, NULL);
-
s->first_aio = NULL;
if (qemu_pipe(fds) == -1) {
fprintf(stderr, "failed to create pipe\n");
diff --git a/qemu-threadlet.c b/qemu-threadlet.c
new file mode 100644
index 0000000..857d08d
--- /dev/null
+++ b/qemu-threadlet.c
@@ -0,0 +1,39 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ * Gautham R Shenoy <gautham.shenoy@gmail.com>
+ * Arun R Bharadwaj <arun@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ */
+
+#include <signal.h>
+
+#include "qemu-threadlet.h"
+#include "osdep.h"
+
+static void threadlet_io_completion_signal_handler(int signum)
+{
+ qemu_service_io();
+}
+
+static void threadlet_register_signal_handler(void)
+{
+ struct sigaction act;
+ sigfillset(&act.sa_mask);
+ act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
+ act.sa_handler = threadlet_io_completion_signal_handler;
+ sigaction(SIGUSR2, &act, NULL);
+}
+
+void threadlet_init(void)
+{
+ threadlet_register_signal_handler();
+}
diff --git a/qemu-threadlet.h b/qemu-threadlet.h
new file mode 100644
index 0000000..2c225d6
--- /dev/null
+++ b/qemu-threadlet.h
@@ -0,0 +1,19 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ * Gautham R Shenoy <gautham.shenoy@gmail.com>
+ * Arun R Bharadwaj <arun@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ */
+
+#include "qemu-common.h"
+
+void threadlet_init(void);
diff --git a/vl.c b/vl.c
index df414ef..d04fc7a 100644
--- a/vl.c
+++ b/vl.c
@@ -148,6 +148,7 @@ int main(int argc, char **argv)
#include "qemu-config.h"
#include "qemu-objects.h"
#include "qemu-options.h"
+#include "qemu-threadlet.h"
#ifdef CONFIG_VIRTFS
#include "fsdev/qemu-fsdev.h"
#endif
@@ -2922,6 +2923,8 @@ int main(int argc, char **argv, char **envp)
exit(1);
}
+ threadlet_init();
+
/* init generic devices */
if (qemu_opts_foreach(qemu_find_opts("device"), device_init_func, NULL, 1) != 0)
exit(1);
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [Qemu-devel] [PATCH 09/12] Remove all instances of CONFIG_THREAD
2011-01-20 12:33 [Qemu-devel] [PATCH 00/12] Threadlet Infrastructure Arun R Bharadwaj
` (7 preceding siblings ...)
2011-01-20 12:34 ` [Qemu-devel] [PATCH 08/12] Threadlet: Add aio_signal_handler threadlet API Arun R Bharadwaj
@ 2011-01-20 12:34 ` Arun R Bharadwaj
2011-01-20 12:34 ` [Qemu-devel] [PATCH 10/12] Move threadlet code to qemu-threadlets.c Arun R Bharadwaj
` (2 subsequent siblings)
11 siblings, 0 replies; 14+ messages in thread
From: Arun R Bharadwaj @ 2011-01-20 12:34 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, aliguori, jvrao, aneesh.kumar, stefanha
Remove all instances of CONFIG_THREAD, which is not used
anymore.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
configure | 2 --
1 files changed, 0 insertions(+), 2 deletions(-)
diff --git a/configure b/configure
index a079a49..addf733 100755
--- a/configure
+++ b/configure
@@ -2456,7 +2456,6 @@ if test "$vnc_png" != "no" ; then
fi
if test "$vnc_thread" != "no" ; then
echo "CONFIG_VNC_THREAD=y" >> $config_host_mak
- echo "CONFIG_THREAD=y" >> $config_host_mak
fi
if test "$fnmatch" = "yes" ; then
echo "CONFIG_FNMATCH=y" >> $config_host_mak
@@ -2534,7 +2533,6 @@ if test "$xen" = "yes" ; then
fi
if test "$io_thread" = "yes" ; then
echo "CONFIG_IOTHREAD=y" >> $config_host_mak
- echo "CONFIG_THREAD=y" >> $config_host_mak
fi
if test "$linux_aio" = "yes" ; then
echo "CONFIG_LINUX_AIO=y" >> $config_host_mak
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [Qemu-devel] [PATCH 10/12] Move threadlet code to qemu-threadlets.c
2011-01-20 12:33 [Qemu-devel] [PATCH 00/12] Threadlet Infrastructure Arun R Bharadwaj
` (8 preceding siblings ...)
2011-01-20 12:34 ` [Qemu-devel] [PATCH 09/12] Remove all instances of CONFIG_THREAD Arun R Bharadwaj
@ 2011-01-20 12:34 ` Arun R Bharadwaj
2011-01-20 12:34 ` [Qemu-devel] [PATCH 11/12] Threadlets: Add functionality to create private queues Arun R Bharadwaj
2011-01-20 12:34 ` [Qemu-devel] [PATCH 12/12] Threadlets: Add documentation Arun R Bharadwaj
11 siblings, 0 replies; 14+ messages in thread
From: Arun R Bharadwaj @ 2011-01-20 12:34 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, aliguori, jvrao, aneesh.kumar, stefanha
This patch moves the threadlet queue API code to
qemu-threadlets.c where these APIs can be used by
other subsystems.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
posix-aio-compat.c | 137 ----------------------------------------------------
qemu-threadlet.c | 115 ++++++++++++++++++++++++++++++++++++++++++++
qemu-threadlet.h | 25 +++++++++
3 files changed, 141 insertions(+), 136 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 96e28db..4fc9581 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -26,33 +26,13 @@
#include "qemu-common.h"
#include "trace.h"
#include "block_int.h"
-#include "qemu-thread.h"
+#include "qemu-threadlet.h"
#include "block/raw-posix-aio.h"
-#define MAX_GLOBAL_THREADS 64
-#define MIN_GLOBAL_THREADS 8
-
static QemuMutex aiocb_mutex;
static QemuCond aiocb_completion;
-typedef struct ThreadletQueue
-{
- QemuMutex lock;
- QemuCond cond;
- int max_threads;
- int min_threads;
- int cur_threads;
- int idle_threads;
- QTAILQ_HEAD(, ThreadletWork) request_list;
-} ThreadletQueue;
-
-typedef struct ThreadletWork
-{
- QTAILQ_ENTRY(ThreadletWork) node;
- void (*func)(struct ThreadletWork *work);
-} ThreadletWork;
-
struct qemu_paiocb {
BlockDriverAIOCB common;
int aio_fildes;
@@ -79,10 +59,6 @@ typedef struct PosixAioState {
struct qemu_paiocb *first_aio;
} PosixAioState;
-/* Default ThreadletQueue */
-static ThreadletQueue globalqueue;
-static int globalqueue_init;
-
#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
@@ -283,50 +259,6 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void *threadlet_worker(void *data)
-{
- ThreadletQueue *queue = data;
-
- qemu_mutex_lock(&queue->lock);
- while (1) {
- ThreadletWork *work;
- int ret = 0;
-
- while (QTAILQ_EMPTY(&queue->request_list) &&
- (ret != ETIMEDOUT)) {
- /* wait for cond to be signalled or broadcast for 1000s */
- ret = qemu_cond_timedwait((&queue->cond),
- &(queue->lock), 10*100000);
- }
-
- assert(queue->idle_threads != 0);
- if (QTAILQ_EMPTY(&queue->request_list)) {
- if (queue->cur_threads > queue->min_threads) {
- /* We retain the minimum number of threads */
- break;
- }
- } else {
- work = QTAILQ_FIRST(&queue->request_list);
- QTAILQ_REMOVE(&queue->request_list, work, node);
-
- queue->idle_threads--;
- qemu_mutex_unlock(&queue->lock);
-
- /* execute the work function */
- work->func(work);
-
- qemu_mutex_lock(&queue->lock);
- queue->idle_threads++;
- }
- }
-
- queue->idle_threads--;
- queue->cur_threads--;
- qemu_mutex_unlock(&queue->lock);
-
- return NULL;
-}
-
static PosixAioState *posix_aio_state;
static void handle_work(ThreadletWork *work)
@@ -371,48 +303,6 @@ static void handle_work(ThreadletWork *work)
}
}
-static void spawn_threadlet(ThreadletQueue *queue)
-{
- QemuThread thread;
-
- queue->cur_threads++;
- queue->idle_threads++;
-
- qemu_thread_create(&thread, threadlet_worker, queue);
-}
-
-/**
- * submit_work: Submit to the global queue a new task to be executed
- * asynchronously.
- * @work: Contains information about the task that needs to be submitted.
- */
-static void submit_work(ThreadletWork *work)
-{
- qemu_mutex_lock(&globalqueue.lock);
-
- if (!globalqueue_init) {
- globalqueue.cur_threads = 0;
- globalqueue.idle_threads = 0;
- globalqueue.max_threads = MAX_GLOBAL_THREADS;
- globalqueue.min_threads = MIN_GLOBAL_THREADS;
- QTAILQ_INIT(&globalqueue.request_list);
- qemu_mutex_init(&globalqueue.lock);
- qemu_cond_init(&globalqueue.cond);
-
- globalqueue_init = 1;
- }
-
- if (globalqueue.idle_threads == 0 &&
- globalqueue.cur_threads < globalqueue.max_threads) {
- spawn_threadlet(&globalqueue);
-
- } else {
- qemu_cond_signal(&globalqueue.cond);
- }
- QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node);
- qemu_mutex_unlock(&globalqueue.lock);
-}
-
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
@@ -545,31 +435,6 @@ static void paio_remove(struct qemu_paiocb *acb)
}
}
-/**
- * dequeue_work: Cancel a task queued on the global queue.
- * @work: Contains the information of the task that needs to be cancelled.
- *
- * Returns: 0 if successfully dequeued work.
- * 1 otherwise.
- */
-static int dequeue_work(ThreadletWork *work)
-{
- int ret = 1;
- ThreadletWork *ret_work;
-
- qemu_mutex_lock(&globalqueue.lock);
- QTAILQ_FOREACH(ret_work, &(globalqueue.request_list), node) {
- if (ret_work == work) {
- QTAILQ_REMOVE(&globalqueue.request_list, ret_work, node);
- ret = 0;
- break;
- }
- }
- qemu_mutex_unlock(&globalqueue.lock);
-
- return ret;
-}
-
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
diff --git a/qemu-threadlet.c b/qemu-threadlet.c
index 857d08d..6523f08 100644
--- a/qemu-threadlet.c
+++ b/qemu-threadlet.c
@@ -19,6 +19,12 @@
#include "qemu-threadlet.h"
#include "osdep.h"
+#define MAX_GLOBAL_THREADS 64
+#define MIN_GLOBAL_THREADS 8
+
+static ThreadletQueue globalqueue;
+static int globalqueue_init;
+
static void threadlet_io_completion_signal_handler(int signum)
{
qemu_service_io();
@@ -37,3 +43,112 @@ void threadlet_init(void)
{
threadlet_register_signal_handler();
}
+
+static void *threadlet_worker(void *data)
+{
+ ThreadletQueue *queue = data;
+
+ qemu_mutex_lock(&queue->lock);
+ while (1) {
+ ThreadletWork *work;
+ int ret = 0;
+
+ while (QTAILQ_EMPTY(&queue->request_list) &&
+ (ret != ETIMEDOUT)) {
+ /* wait for cond to be signalled or broadcast for 1000s */
+ ret = qemu_cond_timedwait((&queue->cond),
+ &(queue->lock), 10*100000);
+ }
+
+ assert(queue->idle_threads != 0);
+ if (QTAILQ_EMPTY(&queue->request_list)) {
+ if (queue->cur_threads > queue->min_threads) {
+ /* We retain the minimum number of threads */
+ break;
+ }
+ } else {
+ work = QTAILQ_FIRST(&queue->request_list);
+ QTAILQ_REMOVE(&queue->request_list, work, node);
+
+ queue->idle_threads--;
+ qemu_mutex_unlock(&queue->lock);
+
+ /* execute the work function */
+ work->func(work);
+
+ qemu_mutex_lock(&queue->lock);
+ queue->idle_threads++;
+ }
+ }
+
+ queue->idle_threads--;
+ queue->cur_threads--;
+ qemu_mutex_unlock(&queue->lock);
+
+ return NULL;
+}
+
+static void spawn_threadlet(ThreadletQueue *queue)
+{
+ QemuThread thread;
+
+ queue->cur_threads++;
+ queue->idle_threads++;
+
+ qemu_thread_create(&thread, threadlet_worker, queue);
+}
+
+/**
+ * submit_work: Submit to the global queue a new task to be executed
+ * asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work(ThreadletWork *work)
+{
+ if (!globalqueue_init) {
+ globalqueue.cur_threads = 0;
+ globalqueue.idle_threads = 0;
+ globalqueue.max_threads = MAX_GLOBAL_THREADS;
+ globalqueue.min_threads = MIN_GLOBAL_THREADS;
+ QTAILQ_INIT(&globalqueue.request_list);
+ qemu_mutex_init(&globalqueue.lock);
+ qemu_cond_init(&globalqueue.cond);
+
+ globalqueue_init = 1;
+ }
+
+ qemu_mutex_lock(&globalqueue.lock);
+ if (globalqueue.idle_threads == 0 &&
+ globalqueue.cur_threads < globalqueue.max_threads) {
+ spawn_threadlet(&globalqueue);
+ } else {
+ qemu_cond_signal(&globalqueue.cond);
+ }
+ QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node);
+ qemu_mutex_unlock(&globalqueue.lock);
+}
+
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if successfully dequeued work.
+ * 1 otherwise.
+ */
+int dequeue_work(ThreadletWork *work)
+{
+ int ret = 1;
+ ThreadletWork *ret_work;
+
+ qemu_mutex_lock(&globalqueue.lock);
+ QTAILQ_FOREACH(ret_work, &(globalqueue.request_list), node) {
+ if (ret_work == work) {
+ QTAILQ_REMOVE(&globalqueue.request_list, ret_work, node);
+ ret = 0;
+ break;
+ }
+ }
+ qemu_mutex_unlock(&globalqueue.lock);
+
+ return ret;
+}
diff --git a/qemu-threadlet.h b/qemu-threadlet.h
index 2c225d6..6b11c86 100644
--- a/qemu-threadlet.h
+++ b/qemu-threadlet.h
@@ -14,6 +14,31 @@
* the COPYING file in the top-level directory.
*/
+#ifndef QEMU_ASYNC_WORK_H
+#define QEMU_ASYNC_WORK_H
+
+#include "qemu-queue.h"
#include "qemu-common.h"
+#include "qemu-thread.h"
+
+typedef struct ThreadletQueue
+{
+ QemuMutex lock;
+ QemuCond cond;
+ int max_threads;
+ int min_threads;
+ int cur_threads;
+ int idle_threads;
+ QTAILQ_HEAD(, ThreadletWork) request_list;
+} ThreadletQueue;
+
+typedef struct ThreadletWork
+{
+ QTAILQ_ENTRY(ThreadletWork) node;
+ void (*func)(struct ThreadletWork *work);
+} ThreadletWork;
+void submit_work(ThreadletWork *work);
+int dequeue_work(ThreadletWork *work);
void threadlet_init(void);
+#endif
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [Qemu-devel] [PATCH 11/12] Threadlets: Add functionality to create private queues.
2011-01-20 12:33 [Qemu-devel] [PATCH 00/12] Threadlet Infrastructure Arun R Bharadwaj
` (9 preceding siblings ...)
2011-01-20 12:34 ` [Qemu-devel] [PATCH 10/12] Move threadlet code to qemu-threadlets.c Arun R Bharadwaj
@ 2011-01-20 12:34 ` Arun R Bharadwaj
2011-01-20 12:34 ` [Qemu-devel] [PATCH 12/12] Threadlets: Add documentation Arun R Bharadwaj
11 siblings, 0 replies; 14+ messages in thread
From: Arun R Bharadwaj @ 2011-01-20 12:34 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, aliguori, jvrao, aneesh.kumar, stefanha
This patch allows subsystems to create their own private
queues with associated pools of threads.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
qemu-threadlet.c | 85 +++++++++++++++++++++++++++++++++++++-----------------
qemu-threadlet.h | 4 +++
2 files changed, 63 insertions(+), 26 deletions(-)
diff --git a/qemu-threadlet.c b/qemu-threadlet.c
index 6523f08..4bad8c6 100644
--- a/qemu-threadlet.c
+++ b/qemu-threadlet.c
@@ -99,6 +99,25 @@ static void spawn_threadlet(ThreadletQueue *queue)
}
/**
+ * submit_work_to_queue: Submit a new task to a private queue to be
+ * executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ * to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+ qemu_mutex_lock(&queue->lock);
+ if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+ spawn_threadlet(queue);
+ } else {
+ qemu_cond_signal(&queue->cond);
+ }
+ QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+ qemu_mutex_unlock(&queue->lock);
+}
+
+/**
* submit_work: Submit to the global queue a new task to be executed
* asynchronously.
* @work: Contains information about the task that needs to be submitted.
@@ -106,49 +125,63 @@ static void spawn_threadlet(ThreadletQueue *queue)
void submit_work(ThreadletWork *work)
{
if (!globalqueue_init) {
- globalqueue.cur_threads = 0;
- globalqueue.idle_threads = 0;
- globalqueue.max_threads = MAX_GLOBAL_THREADS;
- globalqueue.min_threads = MIN_GLOBAL_THREADS;
- QTAILQ_INIT(&globalqueue.request_list);
- qemu_mutex_init(&globalqueue.lock);
- qemu_cond_init(&globalqueue.cond);
-
+ threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+ MIN_GLOBAL_THREADS);
globalqueue_init = 1;
}
- qemu_mutex_lock(&globalqueue.lock);
- if (globalqueue.idle_threads == 0 &&
- globalqueue.cur_threads < globalqueue.max_threads) {
- spawn_threadlet(&globalqueue);
- } else {
- qemu_cond_signal(&globalqueue.cond);
- }
- QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node);
- qemu_mutex_unlock(&globalqueue.lock);
+ submit_work_to_queue(&globalqueue, work);
}
/**
- * dequeue_work: Cancel a task queued on the global queue.
+ * dequeue_work_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
* @work: Contains the information of the task that needs to be cancelled.
- *
- * Returns: 0 if successfully dequeued work.
- * 1 otherwise.
*/
-int dequeue_work(ThreadletWork *work)
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
{
int ret = 1;
ThreadletWork *ret_work;
- qemu_mutex_lock(&globalqueue.lock);
- QTAILQ_FOREACH(ret_work, &(globalqueue.request_list), node) {
+ qemu_mutex_lock(&queue->lock);
+ QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
if (ret_work == work) {
- QTAILQ_REMOVE(&globalqueue.request_list, ret_work, node);
+ QTAILQ_REMOVE(&queue->request_list, work, node);
ret = 0;
break;
}
}
- qemu_mutex_unlock(&globalqueue.lock);
+ qemu_mutex_unlock(&queue->lock);
return ret;
}
+
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if successfully dequeued work.
+ * 1 otherwise.
+ */
+int dequeue_work(ThreadletWork *work)
+{
+ return dequeue_work_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+void threadlet_queue_init(ThreadletQueue *queue,
+ int max_threads, int min_threads)
+{
+ queue->cur_threads = 0;
+ queue->idle_threads = 0;
+ queue->max_threads = max_threads;
+ queue->min_threads = min_threads;
+ QTAILQ_INIT(&queue->request_list);
+ qemu_mutex_init(&queue->lock);
+ qemu_cond_init(&queue->cond);
+}
diff --git a/qemu-threadlet.h b/qemu-threadlet.h
index 6b11c86..25e77f2 100644
--- a/qemu-threadlet.h
+++ b/qemu-threadlet.h
@@ -38,7 +38,11 @@ typedef struct ThreadletWork
void (*func)(struct ThreadletWork *work);
} ThreadletWork;
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work);
void submit_work(ThreadletWork *work);
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work);
int dequeue_work(ThreadletWork *work);
void threadlet_init(void);
+void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
+ int min_threads);
#endif
^ permalink raw reply related [flat|nested] 14+ messages in thread
* [Qemu-devel] [PATCH 12/12] Threadlets: Add documentation
2011-01-20 12:33 [Qemu-devel] [PATCH 00/12] Threadlet Infrastructure Arun R Bharadwaj
` (10 preceding siblings ...)
2011-01-20 12:34 ` [Qemu-devel] [PATCH 11/12] Threadlets: Add functionality to create private queues Arun R Bharadwaj
@ 2011-01-20 12:34 ` Arun R Bharadwaj
11 siblings, 0 replies; 14+ messages in thread
From: Arun R Bharadwaj @ 2011-01-20 12:34 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, aliguori, jvrao, aneesh.kumar, stefanha
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
Reviewed-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com>
---
docs/async-support.txt | 141 ++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 141 insertions(+), 0 deletions(-)
create mode 100644 docs/async-support.txt
diff --git a/docs/async-support.txt b/docs/async-support.txt
new file mode 100644
index 0000000..9f22b9a
--- /dev/null
+++ b/docs/async-support.txt
@@ -0,0 +1,141 @@
+== How to use the threadlets infrastructure supported in Qemu ==
+
+== Threadlets ==
+
+Q.1: What are threadlets ?
+A.1: Threadlets is an infrastructure within QEMU that allows other subsystems
+ to offload possibly blocking work to a queue to be processed by a pool
+ of threads asynchronously.
+
+Q.2: When would one want to use threadlets ?
+A.2: Threadlets are useful when there are operations that can be performed
+ outside the context of the VCPU/IO threads inorder to free these latter
+ to service any other guest requests.
+
+Q.3: I have some work that can be executed in an asynchronous context. How
+ should I go about it ?
+A.3: One could follow the steps listed below:
+
+ - Define a function which would do the asynchronous work.
+ static void my_threadlet_func(ThreadletWork *work)
+ {
+ }
+
+ - Declare an object of type ThreadletWork;
+ ThreadletWork work;
+
+
+ - Assign a value to the "func" member of ThreadletWork object.
+ work.func = my_threadlet_func;
+
+ - Submit the threadlet to the global queue.
+ submit_threadletwork(&work);
+
+ - Continue servicing some other guest operations.
+
+Q.4: I want to my_threadlet_func to access some non-global data. How do I do
+ that ?
+A.4: Suppose you want my_threadlet_func to access some non-global data-object
+ of type myPrivateData. In that case one could follow the following steps.
+
+ - Define a member of the type ThreadletWork within myPrivateData.
+ typedef struct MyPrivateData {
+ ...;
+ ...;
+ ...;
+ ThreadletWork work;
+ } MyPrivateData;
+
+ MyPrivateData my_data;
+
+ - Initialize myData.work as described in A.3
+ myData.work.func = my_threadlet_func;
+ submit_threadletwork(&myData.work);
+
+ - Access the myData object inside my_threadlet_func() using container_of
+ primitive
+ static void my_threadlet_func(ThreadletWork *work)
+ {
+ myPrivateData *mydata_ptr;
+ mydata_ptr = container_of(work, myPrivateData, work);
+
+ /* mydata_ptr now points to myData object */
+ }
+
+Q.5: Are there any precautions one must take while sharing data with the
+ Asynchronous thread-pool ?
+A.5: Yes, make sure that the helper function of the type my_threadlet_func()
+ does not access/modify data when it can be accessed or modified in the
+ context of VCPU thread or IO thread. This is because the asynchronous
+ threads in the pool can run in parallel with the VCPU/IOThreads as shown
+ in the figure.
+
+ A typical workflow is as follows:
+
+ VCPU/IOThread
+ |
+ | (1)
+ |
+ V
+ Offload work (2)
+ |-------> to threadlets -----------------------------> Helper thread
+ | | |
+ | | |
+ | | (3) | (4)
+ | | |
+ | Handle other Guest requests |
+ | | |
+ | | V
+ | | (3) Signal the I/O Thread
+ |(6) | |
+ | | /
+ | | /
+ | V /
+ | Do the post <---------------------------------/
+ | processing (5)
+ | |
+ | | (6)
+ | V
+ |-Yes------ More async work?
+ |
+ | (7)
+ No
+ |
+ |
+ .
+ .
+
+ Hence one needs to make sure that in the steps (3) and (4) which run in
+ parallel, any global data is accessed within only one context.
+
+Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
+A.6: Threadlets framework provides the API cancel_threadlet:
+ - int cancel_threadletwork(ThreadletWork *work)
+
+ The API scans the ThreadletQueue to see if (work) is present. If it finds
+ work, it'll dequeue work and return 0.
+
+ On the other hand, if it does not find the (work) in the ThreadletQueue,
+ then it'll return 1. This can imply two things. Either the work is being
+ processed by one of the helper threads or it has been processed. The
+ threadlet infrastructure currently _does_not_ distinguish between these
+ two and the onus is on the caller to do that.
+
+Q.7: Apart from the global pool of threads, can I have my own private Queue ?
+A.7: Yes, the threadlets framework allows subsystems to create their own private
+ queues with associated pools of threads.
+
+ - Define a PrivateQueue
+ ThreadletQueue myQueue;
+
+ - Initialize it:
+ threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
+ where my_max_threads is the maximum number of threads that can be in the
+ thread pool and my_min_threads is the minimum number of active threads
+ that can be in the thread-pool.
+
+ - Submit work:
+ submit_threadletwork_to_queue(&myQueue, &my_work);
+
+ - Cancel work:
+ cancel_threadletwork_on_queue(&myQueue, &my_work);
^ permalink raw reply related [flat|nested] 14+ messages in thread
end of thread, other threads:[~2011-01-20 12:43 UTC | newest]
Thread overview: 14+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2011-01-20 12:33 [Qemu-devel] [PATCH 00/12] Threadlet Infrastructure Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 01/12] Add aiocb_mutex and aiocb_completion Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 02/12] Introduce work concept in posix-aio-compat.c Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 03/12] Add callback function to ThreadletWork structure Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 04/12] Add ThreadletQueue Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 05/12] Threadlet: Add submit_work threadlet API Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 06/12] Threadlet: Add dequeue_work threadlet API and remove active field Arun R Bharadwaj
2011-01-20 12:33 ` [Qemu-devel] [PATCH 07/12] Remove thread_create routine Arun R Bharadwaj
2011-01-20 12:34 ` [Qemu-devel] [PATCH 08/12] Threadlet: Add aio_signal_handler threadlet API Arun R Bharadwaj
2011-01-20 12:34 ` [Qemu-devel] [PATCH 09/12] Remove all instances of CONFIG_THREAD Arun R Bharadwaj
2011-01-20 12:34 ` [Qemu-devel] [PATCH 10/12] Move threadlet code to qemu-threadlets.c Arun R Bharadwaj
2011-01-20 12:34 ` [Qemu-devel] [PATCH 11/12] Threadlets: Add functionality to create private queues Arun R Bharadwaj
2011-01-20 12:34 ` [Qemu-devel] [PATCH 12/12] Threadlets: Add documentation Arun R Bharadwaj
-- strict thread matches above, loose matches on Subject: below --
2011-01-13 12:14 [Qemu-devel] [PATCH 00/12] Threadlets Infrastructure Arun R Bharadwaj
2011-01-13 12:14 ` [Qemu-devel] [PATCH 02/12] Introduce work concept in posix-aio-compat.c Arun R Bharadwaj
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).