* [Qemu-devel] [PATCH 01/13] Add aiocb_mutex and aiocb_completion.
2011-01-04 5:27 [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Arun R Bharadwaj
@ 2011-01-04 5:27 ` Arun R Bharadwaj
2011-01-05 19:53 ` [Qemu-devel] " Stefan Hajnoczi
2011-01-04 5:27 ` [Qemu-devel] [PATCH 02/13] Introduce work concept in posix-aio-compat.c Arun R Bharadwaj
` (12 subsequent siblings)
13 siblings, 1 reply; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-04 5:27 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha, aneesh.kumar
This patch adds aiocb_mutex to protect aiocb state (ret and active).
It also removes the busy-wait loop in paio_cancel. Since there can
only be one cancellation at a time, a single global condition
variable, aiocb_completion, is enough: the canceller waits on it and
the worker thread broadcasts it after storing the result.
Finally, this patch adds the Makefile entry to compile qemu-thread.c
when CONFIG_POSIX is set, instead of under the unused CONFIG_THREAD.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
Makefile.objs | 2 +-
posix-aio-compat.c | 37 ++++++++++++++++++++++++++++---------
2 files changed, 29 insertions(+), 10 deletions(-)
diff --git a/Makefile.objs b/Makefile.objs
index cd5a24b..3b7ec27 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-$(CONFIG_POSIX) += qemu-thread.o
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
@@ -124,7 +125,6 @@ endif
common-obj-y += $(addprefix ui/, $(ui-obj-y))
common-obj-y += iov.o acl.o
-common-obj-$(CONFIG_THREAD) += qemu-thread.o
common-obj-y += notify.o event_notifier.o
common-obj-y += qemu-timer.o
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..ae5e20e 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -27,9 +27,12 @@
#include "qemu-common.h"
#include "trace.h"
#include "block_int.h"
+#include "qemu-thread.h"
#include "block/raw-posix-aio.h"
+static QemuMutex aiocb_mutex;
+static QemuCond aiocb_completion;
struct qemu_paiocb {
BlockDriverAIOCB common;
@@ -351,10 +354,14 @@ static void *aio_thread(void *unused)
}
mutex_lock(&lock);
- aiocb->ret = ret;
idle_threads++;
mutex_unlock(&lock);
+ qemu_mutex_lock(&aiocb_mutex);
+ aiocb->ret = ret;
+ qemu_cond_broadcast(&aiocb_completion);
+ qemu_mutex_unlock(&aiocb_mutex);
+
if (kill(pid, aiocb->ev_signo)) die("kill failed");
}
@@ -383,8 +390,11 @@ static void spawn_thread(void)
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
+ qemu_mutex_lock(&aiocb_mutex);
aiocb->ret = -EINPROGRESS;
aiocb->active = 0;
+ qemu_mutex_unlock(&aiocb_mutex);
+
mutex_lock(&lock);
if (idle_threads == 0 && cur_threads < max_threads)
spawn_thread();
@@ -397,9 +407,9 @@ static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
- mutex_lock(&lock);
+ qemu_mutex_lock(&aiocb_mutex);
ret = aiocb->ret;
- mutex_unlock(&lock);
+ qemu_mutex_unlock(&aiocb_mutex);
return ret;
}
@@ -545,13 +555,19 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
}
mutex_unlock(&lock);
- if (active) {
- /* fail safe: if the aio could not be canceled, we wait for
- it */
- while (qemu_paio_error(acb) == EINPROGRESS)
- ;
+ qemu_mutex_lock(&aiocb_mutex);
+ if (!active) {
+ acb->ret = -ECANCELED;
+ } else {
+ while (acb->ret == -EINPROGRESS) {
+ /*
+ * fail safe: if the aio could not be canceled,
+ * we wait for it
+ */
+ qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
+ }
}
-
+ qemu_mutex_unlock(&aiocb_mutex);
paio_remove(acb);
}
@@ -623,6 +639,9 @@ int paio_init(void)
if (posix_aio_state)
return 0;
+ qemu_mutex_init(&aiocb_mutex);
+ qemu_cond_init(&aiocb_completion);
+
s = qemu_malloc(sizeof(PosixAioState));
sigfillset(&act.sa_mask);
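
For readers following the paio_cancel change, here is a minimal standalone sketch of the wait-on-condition pattern that replaces the busy-wait. It uses plain pthreads rather than QemuMutex/QemuCond, and struct request, req_mutex, req_completion, complete_request(), wait_for_request() and run_once() are hypothetical names chosen for illustration, not code from the patch.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>

/* Hypothetical stand-in for struct qemu_paiocb: only the result field. */
struct request {
    int ret;    /* -EINPROGRESS until the worker stores a result */
};

static pthread_mutex_t req_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t req_completion = PTHREAD_COND_INITIALIZER;

/* Worker side: publish the result and wake every waiter. */
static void complete_request(struct request *req, int ret)
{
    pthread_mutex_lock(&req_mutex);
    req->ret = ret;
    pthread_cond_broadcast(&req_completion);
    pthread_mutex_unlock(&req_mutex);
}

/* Canceller side: sleep until the request is no longer in progress. */
static int wait_for_request(struct request *req)
{
    int ret;

    pthread_mutex_lock(&req_mutex);
    while (req->ret == -EINPROGRESS) {
        /* Atomically releases req_mutex while sleeping. */
        pthread_cond_wait(&req_completion, &req_mutex);
    }
    ret = req->ret;
    pthread_mutex_unlock(&req_mutex);
    return ret;
}

static void *worker(void *opaque)
{
    complete_request(opaque, 42);
    return NULL;
}

/* Start one worker, wait for it, and return the stored result. */
static int run_once(void)
{
    struct request req = { .ret = -EINPROGRESS };
    pthread_t tid;
    int ret;

    if (pthread_create(&tid, NULL, worker, &req) != 0) {
        return -1;
    }
    ret = wait_for_request(&req);
    pthread_join(tid, NULL);
    return ret;
}
```

Because one condition variable is shared by all requests, the waiter re-checks its own ret field in a loop after every wakeup instead of assuming the broadcast was meant for it.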
* [Qemu-devel] Re: [PATCH 01/13] Add aiocb_mutex and aiocb_completion.
2011-01-04 5:27 ` [Qemu-devel] [PATCH 01/13] Add aiocb_mutex and aiocb_completion Arun R Bharadwaj
@ 2011-01-05 19:53 ` Stefan Hajnoczi
2011-01-06 10:27 ` Arun R Bharadwaj
0 siblings, 1 reply; 27+ messages in thread
From: Stefan Hajnoczi @ 2011-01-05 19:53 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: aliguori, qemu-devel, aneesh.kumar
On Tue, Jan 04, 2011 at 10:57:08AM +0530, Arun R Bharadwaj wrote:
> @@ -545,13 +555,19 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
> }
> mutex_unlock(&lock);
>
> - if (active) {
> - /* fail safe: if the aio could not be canceled, we wait for
> - it */
> - while (qemu_paio_error(acb) == EINPROGRESS)
> - ;
> + qemu_mutex_lock(&aiocb_mutex);
> + if (!active) {
> + acb->ret = -ECANCELED;
> + } else {
> + while (acb->ret == -EINPROGRESS) {
> + /*
> + * fail safe: if the aio could not be canceled,
> + * we wait for it
> + */
> + qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> + }
> }
> -
> + qemu_mutex_unlock(&aiocb_mutex);
> paio_remove(acb);
> }
acb->ret and acb->active have been moved under aiocb_mutex. They are still
accessed under lock here and this needs to be fixed:
mutex_lock(&lock);
if (!acb->active) {
QTAILQ_REMOVE(&request_list, acb, node);
acb->ret = -ECANCELED;
} else if (acb->ret == -EINPROGRESS) {
active = 1;
}
mutex_unlock(&lock);
Stefan
* [Qemu-devel] Re: [PATCH 01/13] Add aiocb_mutex and aiocb_completion.
2011-01-05 19:53 ` [Qemu-devel] " Stefan Hajnoczi
@ 2011-01-06 10:27 ` Arun R Bharadwaj
0 siblings, 0 replies; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-06 10:27 UTC (permalink / raw)
To: Stefan Hajnoczi; +Cc: aliguori, qemu-devel, aneesh.kumar
* Stefan Hajnoczi <stefanha@linux.vnet.ibm.com> [2011-01-05 19:53:44]:
> On Tue, Jan 04, 2011 at 10:57:08AM +0530, Arun R Bharadwaj wrote:
> > @@ -545,13 +555,19 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
> > }
> > mutex_unlock(&lock);
> >
> > - if (active) {
> > - /* fail safe: if the aio could not be canceled, we wait for
> > - it */
> > - while (qemu_paio_error(acb) == EINPROGRESS)
> > - ;
> > + qemu_mutex_lock(&aiocb_mutex);
> > + if (!active) {
> > + acb->ret = -ECANCELED;
> > + } else {
> > + while (acb->ret == -EINPROGRESS) {
> > + /*
> > + * fail safe: if the aio could not be canceled,
> > + * we wait for it
> > + */
> > + qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> > + }
> > }
> > -
> > + qemu_mutex_unlock(&aiocb_mutex);
> > paio_remove(acb);
> > }
>
> acb->ret and acb->active have been moved under aiocb_mutex. They are still
> accessed under lock here and this needs to be fixed:
>
> mutex_lock(&lock);
> if (!acb->active) {
> QTAILQ_REMOVE(&request_list, acb, node);
> acb->ret = -ECANCELED;
> } else if (acb->ret == -EINPROGRESS) {
> active = 1;
> }
> mutex_unlock(&lock);
>
You are right. This needs to go under aiocb_mutex too.
-arun
> Stefan
* [Qemu-devel] [PATCH 02/13] Introduce work concept in posix-aio-compat.c
2011-01-04 5:27 [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Arun R Bharadwaj
2011-01-04 5:27 ` [Qemu-devel] [PATCH 01/13] Add aiocb_mutex and aiocb_completion Arun R Bharadwaj
@ 2011-01-04 5:27 ` Arun R Bharadwaj
2011-01-04 5:27 ` [Qemu-devel] [PATCH 03/13] Add callback function to ThreadletWork structure Arun R Bharadwaj
` (11 subsequent siblings)
13 siblings, 0 replies; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-04 5:27 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha, aneesh.kumar
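This patch introduces the concept of a work item by adding the
ThreadletWork structure. The queue linkage moves from struct
qemu_paiocb into ThreadletWork, which qemu_paiocb now embeds.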
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
posix-aio-compat.c | 20 ++++++++++++++------
1 files changed, 14 insertions(+), 6 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index ae5e20e..de52eb5 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -33,6 +33,10 @@
static QemuMutex aiocb_mutex;
static QemuCond aiocb_completion;
+typedef struct ThreadletWork
+{
+ QTAILQ_ENTRY(ThreadletWork) node;
+} ThreadletWork;
struct qemu_paiocb {
BlockDriverAIOCB common;
@@ -47,13 +51,13 @@ struct qemu_paiocb {
int ev_signo;
off_t aio_offset;
- QTAILQ_ENTRY(qemu_paiocb) node;
int aio_type;
ssize_t ret;
int active;
struct qemu_paiocb *next;
int async_context_id;
+ ThreadletWork work;
};
typedef struct PosixAioState {
@@ -69,7 +73,7 @@ static pthread_attr_t attr;
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
+static QTAILQ_HEAD(, ThreadletWork) request_list;
#ifdef CONFIG_PREADV
static int preadv_present = 1;
@@ -307,6 +311,7 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
static void *aio_thread(void *unused)
{
pid_t pid;
+ ThreadletWork *work;
pid = getpid();
@@ -330,8 +335,11 @@ static void *aio_thread(void *unused)
if (QTAILQ_EMPTY(&request_list))
break;
- aiocb = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, aiocb, node);
+ work = QTAILQ_FIRST(&request_list);
+ QTAILQ_REMOVE(&request_list, work, node);
+
+ aiocb = container_of(work, struct qemu_paiocb, work);
+
aiocb->active = 1;
idle_threads--;
mutex_unlock(&lock);
@@ -398,7 +406,7 @@ static void qemu_paio_submit(struct qemu_paiocb *aiocb)
mutex_lock(&lock);
if (idle_threads == 0 && cur_threads < max_threads)
spawn_thread();
- QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
+ QTAILQ_INSERT_TAIL(&request_list, &aiocb->work, node);
mutex_unlock(&lock);
cond_signal(&cond);
}
@@ -548,7 +556,7 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
mutex_lock(&lock);
if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
+ QTAILQ_REMOVE(&request_list, &acb->work, node);
acb->ret = -ECANCELED;
} else if (acb->ret == -EINPROGRESS) {
active = 1;
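
The embedding used above (a generic node inside a larger request, recovered with container_of) can be sketched on its own. This is an illustration under assumptions: it uses TAILQ from <sys/queue.h> in place of QEMU's QTAILQ, defines a local container_of macro, and struct work, struct io_request and first_queued_fd() are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/queue.h>

/* Local definition for the sketch; QEMU provides its own container_of. */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Generic queue element, analogous to ThreadletWork. */
struct work {
    TAILQ_ENTRY(work) node;
};

/* Request that embeds the generic element, analogous to qemu_paiocb. */
struct io_request {
    int fd;
    struct work work;
};

TAILQ_HEAD(work_list, work);

/* Queue two requests, pop the first, and map it back to its container. */
static int first_queued_fd(void)
{
    struct work_list list;
    struct io_request a = { .fd = 3 }, b = { .fd = 7 };
    struct work *w;
    struct io_request *req;

    TAILQ_INIT(&list);
    TAILQ_INSERT_TAIL(&list, &a.work, node);
    TAILQ_INSERT_TAIL(&list, &b.work, node);

    w = TAILQ_FIRST(&list);
    TAILQ_REMOVE(&list, w, node);

    /* The list only knows struct work; recover the enclosing request. */
    req = container_of(w, struct io_request, work);
    return req->fd;
}
```

The list code never needs to know about struct io_request, which is what lets the queue be reused for other kinds of work.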
* [Qemu-devel] [PATCH 03/13] Add callback function to ThreadletWork structure.
2011-01-04 5:27 [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Arun R Bharadwaj
2011-01-04 5:27 ` [Qemu-devel] [PATCH 01/13] Add aiocb_mutex and aiocb_completion Arun R Bharadwaj
2011-01-04 5:27 ` [Qemu-devel] [PATCH 02/13] Introduce work concept in posix-aio-compat.c Arun R Bharadwaj
@ 2011-01-04 5:27 ` Arun R Bharadwaj
2011-01-05 19:54 ` [Qemu-devel] " Stefan Hajnoczi
2011-01-04 5:27 ` [Qemu-devel] [PATCH 04/13] Add ThreadletQueue Arun R Bharadwaj
` (10 subsequent siblings)
13 siblings, 1 reply; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-04 5:27 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha, aneesh.kumar
This patch adds a callback function pointer to the ThreadletWork
structure and turns the aio handler into that callback.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
posix-aio-compat.c | 88 +++++++++++++++++++++++++++++-----------------------
1 files changed, 49 insertions(+), 39 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index de52eb5..0a4d82b 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -36,6 +36,7 @@ static QemuCond aiocb_completion;
typedef struct ThreadletWork
{
QTAILQ_ENTRY(ThreadletWork) node;
+ void (*func)(struct ThreadletWork *work);
} ThreadletWork;
struct qemu_paiocb {
@@ -308,18 +309,14 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void *aio_thread(void *unused)
+static void *threadlet_worker(void *data)
{
- pid_t pid;
- ThreadletWork *work;
-
- pid = getpid();
while (1) {
- struct qemu_paiocb *aiocb;
ssize_t ret = 0;
qemu_timeval tv;
struct timespec ts;
+ ThreadletWork *work;
qemu_gettimeofday(&tv);
ts.tv_sec = tv.tv_sec + 10;
@@ -332,52 +329,63 @@ static void *aio_thread(void *unused)
ret = cond_timedwait(&cond, &lock, &ts);
}
- if (QTAILQ_EMPTY(&request_list))
+ if (QTAILQ_EMPTY(&request_list)) {
+ idle_threads--;
+ cur_threads--;
+ mutex_unlock(&lock);
break;
-
+ }
work = QTAILQ_FIRST(&request_list);
QTAILQ_REMOVE(&request_list, work, node);
-
- aiocb = container_of(work, struct qemu_paiocb, work);
-
- aiocb->active = 1;
idle_threads--;
mutex_unlock(&lock);
- switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
- case QEMU_AIO_READ:
- case QEMU_AIO_WRITE:
- ret = handle_aiocb_rw(aiocb);
- break;
- case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
- case QEMU_AIO_IOCTL:
- ret = handle_aiocb_ioctl(aiocb);
- break;
- default:
- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
- ret = -EINVAL;
- break;
- }
-
+ work->func(work);
mutex_lock(&lock);
idle_threads++;
mutex_unlock(&lock);
- qemu_mutex_lock(&aiocb_mutex);
- aiocb->ret = ret;
- qemu_cond_broadcast(&aiocb_completion);
- qemu_mutex_unlock(&aiocb_mutex);
+ }
+
+ return NULL;
+}
+
+static void aio_thread(ThreadletWork *work)
+{
+ pid_t pid;
+ ssize_t ret = 0;
+ struct qemu_paiocb *aiocb;
+
+ pid = getpid();
+ aiocb = container_of(work, struct qemu_paiocb, work);
+ aiocb->active = 1;
- if (kill(pid, aiocb->ev_signo)) die("kill failed");
+ switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+ case QEMU_AIO_READ:
+ case QEMU_AIO_WRITE:
+ ret = handle_aiocb_rw(aiocb);
+ break;
+ case QEMU_AIO_FLUSH:
+ ret = handle_aiocb_flush(aiocb);
+ break;
+ case QEMU_AIO_IOCTL:
+ ret = handle_aiocb_ioctl(aiocb);
+ break;
+ default:
+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ ret = -EINVAL;
+ break;
}
- idle_threads--;
- cur_threads--;
- mutex_unlock(&lock);
+ qemu_mutex_lock(&aiocb_mutex);
+ aiocb->ret = ret;
+ qemu_cond_broadcast(&aiocb_completion);
+ qemu_mutex_unlock(&aiocb_mutex);
- return NULL;
+ if (kill(pid, aiocb->ev_signo)) {
+ die("kill failed");
+ }
+ return;
}
static void spawn_thread(void)
@@ -391,7 +399,7 @@ static void spawn_thread(void)
if (sigfillset(&set)) die("sigfillset");
if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
- thread_create(&thread_id, &attr, aio_thread, NULL);
+ thread_create(&thread_id, &attr, threadlet_worker, NULL);
if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}
@@ -406,6 +414,8 @@ static void qemu_paio_submit(struct qemu_paiocb *aiocb)
mutex_lock(&lock);
if (idle_threads == 0 && cur_threads < max_threads)
spawn_thread();
+
+ aiocb->work.func = aio_thread;
QTAILQ_INSERT_TAIL(&request_list, &aiocb->work, node);
mutex_unlock(&lock);
cond_signal(&cond);
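
As a rough standalone illustration of the split above, a generic runner only calls work->func(work), and the callback recovers its own request type. The names struct add_job, add_job_run(), run_work() and run_add_job() are hypothetical, and the local container_of macro is an assumption of the sketch.

```c
#include <assert.h>
#include <stddef.h>

/* Local definition for the sketch; QEMU provides its own container_of. */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Generic work item carrying its own handler, like ThreadletWork. */
struct work {
    void (*func)(struct work *work);
};

/* A concrete job that embeds the generic work item. */
struct add_job {
    int a;
    int b;
    int result;
    struct work work;
};

/* Callback: knows the concrete type, so it can unpack the container. */
static void add_job_run(struct work *work)
{
    struct add_job *job = container_of(work, struct add_job, work);

    job->result = job->a + job->b;
}

/* Generic runner: knows nothing about the job type. */
static void run_work(struct work *work)
{
    work->func(work);
}

static int run_add_job(int a, int b)
{
    struct add_job job = { .a = a, .b = b };

    job.work.func = add_job_run;
    run_work(&job.work);
    return job.result;
}
```

In the patch the same roles are played by threadlet_worker() (the runner) and the function stored in aiocb->work.func (the callback).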
* [Qemu-devel] Re: [PATCH 03/13] Add callback function to ThreadletWork structure.
2011-01-04 5:27 ` [Qemu-devel] [PATCH 03/13] Add callback function to ThreadletWork structure Arun R Bharadwaj
@ 2011-01-05 19:54 ` Stefan Hajnoczi
2011-01-06 10:24 ` Arun R Bharadwaj
0 siblings, 1 reply; 27+ messages in thread
From: Stefan Hajnoczi @ 2011-01-05 19:54 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: aliguori, qemu-devel, aneesh.kumar
On Tue, Jan 04, 2011 at 10:57:20AM +0530, Arun R Bharadwaj wrote:
> +static void aio_thread(ThreadletWork *work)
> +{
aio_thread() is not a descriptive name here. This isn't the top-level
thread function, just the work->func. Please choose something like
handle_aiocb() or handle_work().
> + pid_t pid;
> + ssize_t ret = 0;
> + struct qemu_paiocb *aiocb;
> +
> + pid = getpid();
> + aiocb = container_of(work, struct qemu_paiocb, work);
> + aiocb->active = 1;
aiocb_mutex?
>
> - if (kill(pid, aiocb->ev_signo)) die("kill failed");
> + switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> + case QEMU_AIO_READ:
> + case QEMU_AIO_WRITE:
> + ret = handle_aiocb_rw(aiocb);
> + break;
> + case QEMU_AIO_FLUSH:
> + ret = handle_aiocb_flush(aiocb);
> + break;
> + case QEMU_AIO_IOCTL:
> + ret = handle_aiocb_ioctl(aiocb);
> + break;
> + default:
> + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> + ret = -EINVAL;
> + break;
> }
>
> - idle_threads--;
> - cur_threads--;
> - mutex_unlock(&lock);
> + qemu_mutex_lock(&aiocb_mutex);
> + aiocb->ret = ret;
> + qemu_cond_broadcast(&aiocb_completion);
> + qemu_mutex_unlock(&aiocb_mutex);
>
> - return NULL;
> + if (kill(pid, aiocb->ev_signo)) {
> + die("kill failed");
> + }
> + return;
return not needed in void function, please remove.
Stefan
* [Qemu-devel] Re: [PATCH 03/13] Add callback function to ThreadletWork structure.
2011-01-05 19:54 ` [Qemu-devel] " Stefan Hajnoczi
@ 2011-01-06 10:24 ` Arun R Bharadwaj
0 siblings, 0 replies; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-06 10:24 UTC (permalink / raw)
To: Stefan Hajnoczi; +Cc: aliguori, qemu-devel, aneesh.kumar
* Stefan Hajnoczi <stefanha@linux.vnet.ibm.com> [2011-01-05 19:54:17]:
> On Tue, Jan 04, 2011 at 10:57:20AM +0530, Arun R Bharadwaj wrote:
> > +static void aio_thread(ThreadletWork *work)
> > +{
>
> aio_thread() is not a descriptive name here. This isn't the top-level
> thread function, just the work->func. Please choose something like
> handle_aiocb() or handle_work().
>
> > + pid_t pid;
> > + ssize_t ret = 0;
> > + struct qemu_paiocb *aiocb;
> > +
> > + pid = getpid();
> > + aiocb = container_of(work, struct qemu_paiocb, work);
> > + aiocb->active = 1;
>
> aiocb_mutex?
>
I'll take care of this.
-arun
* [Qemu-devel] [PATCH 04/13] Add ThreadletQueue.
2011-01-04 5:27 [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Arun R Bharadwaj
` (2 preceding siblings ...)
2011-01-04 5:27 ` [Qemu-devel] [PATCH 03/13] Add callback function to ThreadletWork structure Arun R Bharadwaj
@ 2011-01-04 5:27 ` Arun R Bharadwaj
2011-01-05 19:54 ` [Qemu-devel] " Stefan Hajnoczi
2011-01-04 5:27 ` [Qemu-devel] [PATCH 05/13] Threadlet: Add submit_work threadlet API Arun R Bharadwaj
` (9 subsequent siblings)
13 siblings, 1 reply; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-04 5:27 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha, aneesh.kumar
This patch adds a global queue of type ThreadletQueue and
removes the earlier use of the global request_list.
We want to create the first thread on the first submit. Hence we
need to track whether globalqueue has been initialized.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
posix-aio-compat.c | 161 +++++++++++++++++++++++++---------------------------
1 files changed, 77 insertions(+), 84 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 0a4d82b..7b41235 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -31,8 +31,23 @@
#include "block/raw-posix-aio.h"
+#define MAX_GLOBAL_THREADS 64
+#define MIN_GLOBAL_THREADS 8
+
static QemuMutex aiocb_mutex;
static QemuCond aiocb_completion;
+
+typedef struct ThreadletQueue
+{
+ QemuMutex lock;
+ QemuCond cond;
+ int max_threads;
+ int min_threads;
+ int cur_threads;
+ int idle_threads;
+ QTAILQ_HEAD(, ThreadletWork) request_list;
+} ThreadletQueue;
+
typedef struct ThreadletWork
{
QTAILQ_ENTRY(ThreadletWork) node;
@@ -66,15 +81,10 @@ typedef struct PosixAioState {
struct qemu_paiocb *first_aio;
} PosixAioState;
-
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
+/* Default ThreadletQueue */
+static ThreadletQueue globalqueue;
+static int globalqueue_init;
static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, ThreadletWork) request_list;
#ifdef CONFIG_PREADV
static int preadv_present = 1;
@@ -93,32 +103,6 @@ static void die(const char *what)
die2(errno, what);
}
-static void mutex_lock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_lock(mutex);
- if (ret) die2(ret, "pthread_mutex_lock");
-}
-
-static void mutex_unlock(pthread_mutex_t *mutex)
-{
- int ret = pthread_mutex_unlock(mutex);
- if (ret) die2(ret, "pthread_mutex_unlock");
-}
-
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
- struct timespec *ts)
-{
- int ret = pthread_cond_timedwait(cond, mutex, ts);
- if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
- return ret;
-}
-
-static void cond_signal(pthread_cond_t *cond)
-{
- int ret = pthread_cond_signal(cond);
- if (ret) die2(ret, "pthread_cond_signal");
-}
-
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
void *(*start_routine)(void*), void *arg)
{
@@ -311,42 +295,45 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
static void *threadlet_worker(void *data)
{
+ ThreadletQueue *queue = data;
+ qemu_mutex_lock(&queue->lock);
while (1) {
- ssize_t ret = 0;
- qemu_timeval tv;
- struct timespec ts;
ThreadletWork *work;
+ int ret = 0;
- qemu_gettimeofday(&tv);
- ts.tv_sec = tv.tv_sec + 10;
- ts.tv_nsec = 0;
-
- mutex_lock(&lock);
-
- while (QTAILQ_EMPTY(&request_list) &&
- !(ret == ETIMEDOUT)) {
- ret = cond_timedwait(&cond, &lock, &ts);
+ while (QTAILQ_EMPTY(&queue->request_list) &&
+ (ret != ETIMEDOUT)) {
+ /* wait for cond to be signalled or broadcast for 1000s */
+ ret = qemu_cond_timedwait((&queue->cond),
+ &(queue->lock), 10*100000);
}
- if (QTAILQ_EMPTY(&request_list)) {
- idle_threads--;
- cur_threads--;
- mutex_unlock(&lock);
- break;
- }
- work = QTAILQ_FIRST(&request_list);
- QTAILQ_REMOVE(&request_list, work, node);
- idle_threads--;
- mutex_unlock(&lock);
+ assert(queue->idle_threads != 0);
+ if (QTAILQ_EMPTY(&queue->request_list)) {
+ if (queue->cur_threads > queue->min_threads) {
+ /* We retain the minimum number of threads */
+ break;
+ }
+ } else {
+ work = QTAILQ_FIRST(&queue->request_list);
+ QTAILQ_REMOVE(&queue->request_list, work, node);
- work->func(work);
- mutex_lock(&lock);
- idle_threads++;
- mutex_unlock(&lock);
+ queue->idle_threads--;
+ qemu_mutex_unlock(&queue->lock);
+ /* execute the work function */
+ work->func(work);
+
+ qemu_mutex_lock(&queue->lock);
+ queue->idle_threads++;
+ }
}
+ queue->idle_threads--;
+ queue->cur_threads--;
+ qemu_mutex_unlock(&queue->lock);
+
return NULL;
}
@@ -388,22 +375,24 @@ static void aio_thread(ThreadletWork *work)
return;
}
-static void spawn_thread(void)
+static void spawn_threadlet(ThreadletQueue *queue)
{
+ pthread_t thread_id;
sigset_t set, oldset;
- cur_threads++;
- idle_threads++;
+ queue->cur_threads++;
+ queue->idle_threads++;
/* block all signals */
if (sigfillset(&set)) die("sigfillset");
if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
- thread_create(&thread_id, &attr, threadlet_worker, NULL);
+ thread_create(&thread_id, &attr, threadlet_worker, queue);
if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}
+
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
qemu_mutex_lock(&aiocb_mutex);
@@ -411,14 +400,29 @@ static void qemu_paio_submit(struct qemu_paiocb *aiocb)
aiocb->active = 0;
qemu_mutex_unlock(&aiocb_mutex);
- mutex_lock(&lock);
- if (idle_threads == 0 && cur_threads < max_threads)
- spawn_thread();
+ qemu_mutex_lock(&globalqueue.lock);
+
+ if (!globalqueue_init) {
+ globalqueue.cur_threads = 0;
+ globalqueue.idle_threads = 0;
+ globalqueue.max_threads = MAX_GLOBAL_THREADS;
+ globalqueue.min_threads = MIN_GLOBAL_THREADS;
+ QTAILQ_INIT(&globalqueue.request_list);
+ qemu_mutex_init(&globalqueue.lock);
+ qemu_cond_init(&globalqueue.cond);
+
+ globalqueue_init = 1;
+ }
+
+ if (globalqueue.idle_threads == 0 &&
+ globalqueue.cur_threads < globalqueue.max_threads)
+ spawn_threadlet(&globalqueue);
aiocb->work.func = aio_thread;
- QTAILQ_INSERT_TAIL(&request_list, &aiocb->work, node);
- mutex_unlock(&lock);
- cond_signal(&cond);
+
+ QTAILQ_INSERT_TAIL(&globalqueue.request_list, &aiocb->work, node);
+ qemu_cond_signal(&globalqueue.cond);
+ qemu_mutex_unlock(&globalqueue.lock);
}
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
@@ -564,14 +568,14 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
int active = 0;
- mutex_lock(&lock);
+ qemu_mutex_lock(&globalqueue.lock);
if (!acb->active) {
- QTAILQ_REMOVE(&request_list, &acb->work, node);
+ QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
acb->ret = -ECANCELED;
} else if (acb->ret == -EINPROGRESS) {
active = 1;
}
- mutex_unlock(&lock);
+ qemu_mutex_unlock(&globalqueue.lock);
qemu_mutex_lock(&aiocb_mutex);
if (!active) {
@@ -652,7 +656,6 @@ int paio_init(void)
struct sigaction act;
PosixAioState *s;
int fds[2];
- int ret;
if (posix_aio_state)
return 0;
@@ -682,16 +685,6 @@ int paio_init(void)
qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
posix_aio_process_queue, s);
- ret = pthread_attr_init(&attr);
- if (ret)
- die2(ret, "pthread_attr_init");
-
- ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
- if (ret)
- die2(ret, "pthread_attr_setdetachstate");
-
- QTAILQ_INIT(&request_list);
-
posix_aio_state = s;
return 0;
}
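
The idle-worker policy in threadlet_worker() (wait with a timeout, then exit only if more than min_threads remain) can be sketched with plain pthreads. This is an illustration under assumptions, not the patch's code: struct pool, wait_for_work(), worker_may_exit() and idle_timeout_demo() are hypothetical names, a counter stands in for request_list, and the timeout is a parameter because the diff itself is ambiguous here (the removed code waited 10 seconds, while the new call passes 10*100000 next to a comment saying 1000s).

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <time.h>

struct pool {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int pending;        /* queued items; stand-in for request_list */
    int cur_threads;
    int min_threads;
};

/*
 * Wait up to timeout_ms for work. Call with pool->lock held.
 * Returns 0 if work is pending, ETIMEDOUT otherwise.
 */
static int wait_for_work(struct pool *p, long timeout_ms)
{
    struct timespec ts;
    int ret = 0;

    /* pthread_cond_timedwait takes an absolute deadline. */
    clock_gettime(CLOCK_REALTIME, &ts);
    ts.tv_sec += timeout_ms / 1000;
    ts.tv_nsec += (timeout_ms % 1000) * 1000000L;
    if (ts.tv_nsec >= 1000000000L) {
        ts.tv_sec++;
        ts.tv_nsec -= 1000000000L;
    }

    /* Loop to absorb spurious wakeups; stop on timeout or error. */
    while (p->pending == 0 && ret == 0) {
        ret = pthread_cond_timedwait(&p->cond, &p->lock, &ts);
    }
    return p->pending ? 0 : ETIMEDOUT;
}

/* An idle worker exits only while the pool is above its minimum size. */
static int worker_may_exit(const struct pool *p)
{
    return p->pending == 0 && p->cur_threads > p->min_threads;
}

/* Time out on an empty pool and report whether the worker may exit. */
static int idle_timeout_demo(int cur_threads, int min_threads)
{
    struct pool p = { .pending = 0, .cur_threads = cur_threads,
                      .min_threads = min_threads };
    int may_exit;

    pthread_mutex_init(&p.lock, NULL);
    pthread_cond_init(&p.cond, NULL);

    pthread_mutex_lock(&p.lock);
    (void)wait_for_work(&p, 10);
    may_exit = worker_may_exit(&p);
    pthread_mutex_unlock(&p.lock);

    pthread_cond_destroy(&p.cond);
    pthread_mutex_destroy(&p.lock);
    return may_exit;
}
```

With eight as the minimum, a ninth idle worker is allowed to exit after the timeout while the eighth keeps waiting.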
* [Qemu-devel] Re: [PATCH 04/13] Add ThreadletQueue.
2011-01-04 5:27 ` [Qemu-devel] [PATCH 04/13] Add ThreadletQueue Arun R Bharadwaj
@ 2011-01-05 19:54 ` Stefan Hajnoczi
2011-01-07 6:06 ` Arun R Bharadwaj
0 siblings, 1 reply; 27+ messages in thread
From: Stefan Hajnoczi @ 2011-01-05 19:54 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: aliguori, qemu-devel, aneesh.kumar
On Tue, Jan 04, 2011 at 10:57:27AM +0530, Arun R Bharadwaj wrote:
> @@ -66,15 +81,10 @@ typedef struct PosixAioState {
> struct qemu_paiocb *first_aio;
> } PosixAioState;
>
> -
> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> -static pthread_t thread_id;
> +/* Default ThreadletQueue */
> +static ThreadletQueue globalqueue;
> +static int globalqueue_init;
> static pthread_attr_t attr;
> @@ -388,22 +375,24 @@ static void aio_thread(ThreadletWork *work)
> return;
> }
>
> -static void spawn_thread(void)
> +static void spawn_threadlet(ThreadletQueue *queue)
> {
> + pthread_t thread_id;
> sigset_t set, oldset;
>
> - cur_threads++;
> - idle_threads++;
> + queue->cur_threads++;
> + queue->idle_threads++;
>
> /* block all signals */
> if (sigfillset(&set)) die("sigfillset");
> if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
>
> - thread_create(&thread_id, &attr, threadlet_worker, NULL);
> + thread_create(&thread_id, &attr, threadlet_worker, queue);
>
> if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> }
>
> +
> static void qemu_paio_submit(struct qemu_paiocb *aiocb)
> {
> qemu_mutex_lock(&aiocb_mutex);
> @@ -682,16 +685,6 @@ int paio_init(void)
> qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
> posix_aio_process_queue, s);
>
> - ret = pthread_attr_init(&attr);
> - if (ret)
> - die2(ret, "pthread_attr_init");
> -
> - ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
> - if (ret)
> - die2(ret, "pthread_attr_setdetachstate");
attr is no longer initialized but still used?
Stefan
* [Qemu-devel] Re: [PATCH 04/13] Add ThreadletQueue.
2011-01-05 19:54 ` [Qemu-devel] " Stefan Hajnoczi
@ 2011-01-07 6:06 ` Arun R Bharadwaj
0 siblings, 0 replies; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-07 6:06 UTC (permalink / raw)
To: Stefan Hajnoczi; +Cc: aliguori, qemu-devel, aneesh.kumar
* Stefan Hajnoczi <stefanha@linux.vnet.ibm.com> [2011-01-05 19:54:38]:
> On Tue, Jan 04, 2011 at 10:57:27AM +0530, Arun R Bharadwaj wrote:
> > @@ -66,15 +81,10 @@ typedef struct PosixAioState {
> > struct qemu_paiocb *first_aio;
> > } PosixAioState;
> >
> > -
> > -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> > -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> > -static pthread_t thread_id;
> > +/* Default ThreadletQueue */
> > +static ThreadletQueue globalqueue;
> > +static int globalqueue_init;
> > static pthread_attr_t attr;
>
> > @@ -388,22 +375,24 @@ static void aio_thread(ThreadletWork *work)
> > return;
> > }
> >
> > -static void spawn_thread(void)
> > +static void spawn_threadlet(ThreadletQueue *queue)
> > {
> > + pthread_t thread_id;
> > sigset_t set, oldset;
> >
> > - cur_threads++;
> > - idle_threads++;
> > + queue->cur_threads++;
> > + queue->idle_threads++;
> >
> > /* block all signals */
> > if (sigfillset(&set)) die("sigfillset");
> > if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> >
> > - thread_create(&thread_id, &attr, threadlet_worker, NULL);
> > + thread_create(&thread_id, &attr, threadlet_worker, queue);
> >
> > if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> > }
> >
> > +
> > static void qemu_paio_submit(struct qemu_paiocb *aiocb)
> > {
> > qemu_mutex_lock(&aiocb_mutex);
>
> > @@ -682,16 +685,6 @@ int paio_init(void)
> > qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
> > posix_aio_process_queue, s);
> >
> > - ret = pthread_attr_init(&attr);
> > - if (ret)
> > - die2(ret, "pthread_attr_init");
> > -
> > - ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
> > - if (ret)
> > - die2(ret, "pthread_attr_setdetachstate");
>
> attr is no longer initialized but still used?
>
This initialization is still needed. Thanks for pointing this out.
This should be removed in patch 8/13 instead.
-arun
> Stefan
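
For reference, the initialization being discussed looks roughly like the following when written standalone. This is a sketch with hypothetical names (detached_attr, init_detached_attr(), attr_is_detached()); it returns the pthread error code instead of calling die2() as the original code does.

```c
#include <assert.h>
#include <pthread.h>

static pthread_attr_t detached_attr;

/*
 * Prepare an attribute object so that threads created with it are
 * detached. Returns 0 on success or a pthread error number.
 */
static int init_detached_attr(void)
{
    int ret = pthread_attr_init(&detached_attr);

    if (ret) {
        return ret;
    }
    return pthread_attr_setdetachstate(&detached_attr,
                                       PTHREAD_CREATE_DETACHED);
}

/* Report whether the attribute object requests detached threads. */
static int attr_is_detached(void)
{
    int state = -1;

    if (pthread_attr_getdetachstate(&detached_attr, &state)) {
        return 0;
    }
    return state == PTHREAD_CREATE_DETACHED;
}
```

Passing an attribute object to pthread_create() without a prior pthread_attr_init() is undefined behaviour, which is why the initialization has to stay for as long as attr is still used.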
* [Qemu-devel] [PATCH 05/13] Threadlet: Add submit_work threadlet API.
2011-01-04 5:27 [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Arun R Bharadwaj
` (3 preceding siblings ...)
2011-01-04 5:27 ` [Qemu-devel] [PATCH 04/13] Add ThreadletQueue Arun R Bharadwaj
@ 2011-01-04 5:27 ` Arun R Bharadwaj
2011-01-04 5:27 ` [Qemu-devel] [PATCH 06/13] Threadlet: Add dequeue_work " Arun R Bharadwaj
` (8 subsequent siblings)
13 siblings, 0 replies; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-04 5:27 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha, aneesh.kumar
This patch adds the submit_work threadlet API and changes
qemu_paio_submit to use it.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
posix-aio-compat.c | 33 ++++++++++++++++++++++-----------
1 files changed, 22 insertions(+), 11 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b41235..ff6e08b 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -393,13 +393,13 @@ static void spawn_threadlet(ThreadletQueue *queue)
}
-static void qemu_paio_submit(struct qemu_paiocb *aiocb)
+/**
+ * submit_work: Submit to the global queue a new task to be executed
+ * asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+static void submit_work(ThreadletWork *work)
{
- qemu_mutex_lock(&aiocb_mutex);
- aiocb->ret = -EINPROGRESS;
- aiocb->active = 0;
- qemu_mutex_unlock(&aiocb_mutex);
-
qemu_mutex_lock(&globalqueue.lock);
if (!globalqueue_init) {
@@ -415,13 +415,13 @@ static void qemu_paio_submit(struct qemu_paiocb *aiocb)
}
if (globalqueue.idle_threads == 0 &&
- globalqueue.cur_threads < globalqueue.max_threads)
+ globalqueue.cur_threads < globalqueue.max_threads) {
spawn_threadlet(&globalqueue);
- aiocb->work.func = aio_thread;
-
- QTAILQ_INSERT_TAIL(&globalqueue.request_list, &aiocb->work, node);
- qemu_cond_signal(&globalqueue.cond);
+ } else {
+ qemu_cond_signal(&globalqueue.cond);
+ }
+ QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node);
qemu_mutex_unlock(&globalqueue.lock);
}
@@ -448,6 +448,17 @@ static int qemu_paio_error(struct qemu_paiocb *aiocb)
return ret;
}
+static void qemu_paio_submit(struct qemu_paiocb *aiocb)
+{
+ qemu_mutex_lock(&aiocb_mutex);
+ aiocb->ret = -EINPROGRESS;
+ aiocb->active = 0;
+ qemu_mutex_unlock(&aiocb_mutex);
+
+ aiocb->work.func = aio_thread;
+ submit_work(&aiocb->work);
+}
+
static int posix_aio_process_queue(void *opaque)
{
PosixAioState *s = opaque;
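
One detail a reader may notice in submit_work(): globalqueue.lock is taken before the !globalqueue_init branch that calls qemu_mutex_init() on it. A common way to avoid locking an uninitialized mutex is one-time setup before the first lock. The sketch below shows that idea with pthread_once(); it is an assumption for illustration, not what the series does, and global_queue, global_queue_setup() and submit_work_sketch() are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>
#include <sys/queue.h>

struct work {
    TAILQ_ENTRY(work) node;
    void (*func)(struct work *work);
};

static struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int queued;                         /* items currently on the list */
    TAILQ_HEAD(, work) request_list;
} global_queue;

static pthread_once_t global_queue_once = PTHREAD_ONCE_INIT;

/* Runs exactly once, before the lock is ever taken. */
static void global_queue_setup(void)
{
    pthread_mutex_init(&global_queue.lock, NULL);
    pthread_cond_init(&global_queue.cond, NULL);
    TAILQ_INIT(&global_queue.request_list);
    global_queue.queued = 0;
}

/* Queue a work item and return how many items are now queued. */
static int submit_work_sketch(struct work *work)
{
    int queued;

    pthread_once(&global_queue_once, global_queue_setup);

    pthread_mutex_lock(&global_queue.lock);
    TAILQ_INSERT_TAIL(&global_queue.request_list, work, node);
    queued = ++global_queue.queued;
    /* Wake one idle worker, if any is waiting. */
    pthread_cond_signal(&global_queue.cond);
    pthread_mutex_unlock(&global_queue.lock);
    return queued;
}
```

Thread spawning is left out to keep the sketch short; only the ordering of initialization, locking, insertion and signalling is shown.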
* [Qemu-devel] [PATCH 06/13] Threadlet: Add dequeue_work threadlet API
2011-01-04 5:27 [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Arun R Bharadwaj
` (4 preceding siblings ...)
2011-01-04 5:27 ` [Qemu-devel] [PATCH 05/13] Threadlet: Add submit_work threadlet API Arun R Bharadwaj
@ 2011-01-04 5:27 ` Arun R Bharadwaj
2011-01-05 19:55 ` [Qemu-devel] " Stefan Hajnoczi
2011-01-04 5:27 ` [Qemu-devel] [PATCH 07/13] Remove active field in qemu_aiocb structure Arun R Bharadwaj
` (7 subsequent siblings)
13 siblings, 1 reply; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-04 5:27 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha, aneesh.kumar
This patch adds the dequeue_work threadlet API and converts
paio_cancel to use it.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
posix-aio-compat.c | 46 ++++++++++++++++++++++++++--------------------
1 files changed, 26 insertions(+), 20 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index ff6e08b..8f1a9b6 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -574,33 +574,39 @@ static void paio_remove(struct qemu_paiocb *acb)
}
}
-static void paio_cancel(BlockDriverAIOCB *blockacb)
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ * 1 otherwise.
+ */
+static int dequeue_work(ThreadletWork *work)
{
- struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
- int active = 0;
+ int ret = 1;
qemu_mutex_lock(&globalqueue.lock);
- if (!acb->active) {
- QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
- acb->ret = -ECANCELED;
- } else if (acb->ret == -EINPROGRESS) {
- active = 1;
- }
+ QTAILQ_REMOVE(&globalqueue.request_list, work, node);
+ ret = 0;
qemu_mutex_unlock(&globalqueue.lock);
- qemu_mutex_lock(&aiocb_mutex);
- if (!active) {
- acb->ret = -ECANCELED;
- } else {
- while (acb->ret == -EINPROGRESS) {
- /*
- * fail safe: if the aio could not be canceled,
- * we wait for it
- */
- qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
+ return ret;
+}
+
+static void paio_cancel(BlockDriverAIOCB *blockacb)
+{
+ struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
+ if (!acb->active) {
+ if (dequeue_work(&acb->work) != 0) {
+ /* Wait for running work item to complete */
+ qemu_mutex_lock(&aiocb_mutex);
+ while (acb->ret == -EINPROGRESS) {
+ qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
+ }
+ qemu_mutex_unlock(&aiocb_mutex);
}
}
- qemu_mutex_unlock(&aiocb_mutex);
+
paio_remove(acb);
}
* [Qemu-devel] Re: [PATCH 06/13] Threadlet: Add dequeue_work threadlet API
2011-01-04 5:27 ` [Qemu-devel] [PATCH 06/13] Threadlet: Add dequeue_work " Arun R Bharadwaj
@ 2011-01-05 19:55 ` Stefan Hajnoczi
2011-01-06 10:43 ` Arun R Bharadwaj
0 siblings, 1 reply; 27+ messages in thread
From: Stefan Hajnoczi @ 2011-01-05 19:55 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: aliguori, qemu-devel, aneesh.kumar
On Tue, Jan 04, 2011 at 10:57:39AM +0530, Arun R Bharadwaj wrote:
> @@ -574,33 +574,39 @@ static void paio_remove(struct qemu_paiocb *acb)
> }
> }
>
> -static void paio_cancel(BlockDriverAIOCB *blockacb)
> +/**
> + * dequeue_work: Cancel a task queued on the global queue.
> + * @work: Contains the information of the task that needs to be cancelled.
> + *
> + * Returns: 0 if the task is successfully cancelled.
> + * 1 otherwise.
> + */
> +static int dequeue_work(ThreadletWork *work)
> {
> - struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> - int active = 0;
> + int ret = 1;
>
> qemu_mutex_lock(&globalqueue.lock);
> - if (!acb->active) {
> - QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
> - acb->ret = -ECANCELED;
> - } else if (acb->ret == -EINPROGRESS) {
> - active = 1;
> - }
> + QTAILQ_REMOVE(&globalqueue.request_list, work, node);
> + ret = 0;
> qemu_mutex_unlock(&globalqueue.lock);
>
> - qemu_mutex_lock(&aiocb_mutex);
> - if (!active) {
> - acb->ret = -ECANCELED;
> - } else {
> - while (acb->ret == -EINPROGRESS) {
> - /*
> - * fail safe: if the aio could not be canceled,
> - * we wait for it
> - */
> - qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> + return ret;
It always succeeds? Why bother with the ret local variable?
> +}
> +
> +static void paio_cancel(BlockDriverAIOCB *blockacb)
> +{
> + struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> + if (!acb->active) {
> + if (dequeue_work(&acb->work) != 0) {
> + /* Wait for running work item to complete */
> + qemu_mutex_lock(&aiocb_mutex);
> + while (acb->ret == -EINPROGRESS) {
> + qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> + }
> + qemu_mutex_unlock(&aiocb_mutex);
> }
> }
> - qemu_mutex_unlock(&aiocb_mutex);
> +
> paio_remove(acb);
I'm not convinced this function works. If the request is active in a
worker thread and paio_cancel() is called then we invoke paio_remove().
Stefan
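
To illustrate the first review point: a dequeue routine can only return a meaningful status if it checks whether the item is still on the queue. Below is a minimal sketch of one way to do that, assuming the BSD TAILQ macros from <sys/queue.h> in place of QEMU's QTAILQ, and hypothetical names (work_item, work_list, dequeue_if_pending). It is not the code from this series, and the caller is assumed to hold the queue lock.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/queue.h>

/* Hypothetical work item; stands in for ThreadletWork. */
struct work_item {
    TAILQ_ENTRY(work_item) node;
    int id;
};

TAILQ_HEAD(work_list, work_item);

/*
 * Remove 'work' from 'list' only if it is still queued.
 * Returns 0 if the item was removed (cancelled before it ran),
 * 1 if it was not found, e.g. because a worker already took it.
 */
static int dequeue_if_pending(struct work_list *list, struct work_item *work)
{
    struct work_item *it;

    assert(list != NULL && work != NULL);

    TAILQ_FOREACH(it, list, node) {
        if (it == work) {
            TAILQ_REMOVE(list, work, node);
            return 0;
        }
    }
    return 1;
}
```

The linear scan costs O(n) in the queue length; a per-item "queued" flag updated under the same lock would avoid the walk, at the price of one more field to keep consistent.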
* [Qemu-devel] Re: [PATCH 06/13] Threadlet: Add dequeue_work threadlet API
2011-01-05 19:55 ` [Qemu-devel] " Stefan Hajnoczi
@ 2011-01-06 10:43 ` Arun R Bharadwaj
2011-01-07 11:06 ` Stefan Hajnoczi
0 siblings, 1 reply; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-06 10:43 UTC (permalink / raw)
To: Stefan Hajnoczi; +Cc: aliguori, qemu-devel, aneesh.kumar
* Stefan Hajnoczi <stefanha@linux.vnet.ibm.com> [2011-01-05 19:55:46]:
> On Tue, Jan 04, 2011 at 10:57:39AM +0530, Arun R Bharadwaj wrote:
> > @@ -574,33 +574,39 @@ static void paio_remove(struct qemu_paiocb *acb)
> > }
> > }
> >
> > -static void paio_cancel(BlockDriverAIOCB *blockacb)
> > +/**
> > + * dequeue_work: Cancel a task queued on the global queue.
> > + * @work: Contains the information of the task that needs to be cancelled.
> > + *
> > + * Returns: 0 if the task is successfully cancelled.
> > + * 1 otherwise.
> > + */
> > +static int dequeue_work(ThreadletWork *work)
> > {
> > - struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> > - int active = 0;
> > + int ret = 1;
> >
> > qemu_mutex_lock(&globalqueue.lock);
> > - if (!acb->active) {
> > - QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
> > - acb->ret = -ECANCELED;
> > - } else if (acb->ret == -EINPROGRESS) {
> > - active = 1;
> > - }
> > + QTAILQ_REMOVE(&globalqueue.request_list, work, node);
> > + ret = 0;
> > qemu_mutex_unlock(&globalqueue.lock);
> >
> > - qemu_mutex_lock(&aiocb_mutex);
> > - if (!active) {
> > - acb->ret = -ECANCELED;
> > - } else {
> > - while (acb->ret == -EINPROGRESS) {
> > - /*
> > - * fail safe: if the aio could not be canceled,
> > - * we wait for it
> > - */
> > - qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> > + return ret;
>
> It always succeeds? Why bother with the ret local variable?
>
Yes, I'll remove this.
> > +}
> > +
> > +static void paio_cancel(BlockDriverAIOCB *blockacb)
> > +{
> > + struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
> > + if (!acb->active) {
> > + if (dequeue_work(&acb->work) != 0) {
> > + /* Wait for running work item to complete */
> > + qemu_mutex_lock(&aiocb_mutex);
> > + while (acb->ret == -EINPROGRESS) {
> > + qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
> > + }
> > + qemu_mutex_unlock(&aiocb_mutex);
> > }
> > }
> > - qemu_mutex_unlock(&aiocb_mutex);
> > +
> > paio_remove(acb);
>
> I'm not convinced this function works. If the request is active in a
> worker thread and paio_cancel() is called then we invoke paio_remove().
>
True. So can we do this: Since we have a patch which separately
removes the active field [PATCH 7/13], can we fold patch 7 and this
patch into a single patch? So that way we can maintain the
correctness, because we are actually waiting for the active work to
complete by doing a while (acb->ret == -EINPROGRESS)
-arun
> Stefan
* Re: [Qemu-devel] Re: [PATCH 06/13] Threadlet: Add dequeue_work threadlet API
2011-01-06 10:43 ` Arun R Bharadwaj
@ 2011-01-07 11:06 ` Stefan Hajnoczi
0 siblings, 0 replies; 27+ messages in thread
From: Stefan Hajnoczi @ 2011-01-07 11:06 UTC (permalink / raw)
To: arun; +Cc: aneesh.kumar, aliguori, Stefan Hajnoczi, qemu-devel
On Thu, Jan 6, 2011 at 10:43 AM, Arun R Bharadwaj
<arun@linux.vnet.ibm.com> wrote:
> * Stefan Hajnoczi <stefanha@linux.vnet.ibm.com> [2011-01-05 19:55:46]:
>
>> On Tue, Jan 04, 2011 at 10:57:39AM +0530, Arun R Bharadwaj wrote:
>> > @@ -574,33 +574,39 @@ static void paio_remove(struct qemu_paiocb *acb)
>> > }
>> > }
>> >
>> > -static void paio_cancel(BlockDriverAIOCB *blockacb)
>> > +/**
>> > + * dequeue_work: Cancel a task queued on the global queue.
>> > + * @work: Contains the information of the task that needs to be cancelled.
>> > + *
>> > + * Returns: 0 if the task is successfully cancelled.
>> > + * 1 otherwise.
>> > + */
>> > +static int dequeue_work(ThreadletWork *work)
>> > {
>> > - struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>> > - int active = 0;
>> > + int ret = 1;
>> >
>> > qemu_mutex_lock(&globalqueue.lock);
>> > - if (!acb->active) {
>> > - QTAILQ_REMOVE(&globalqueue.request_list, &acb->work, node);
>> > - acb->ret = -ECANCELED;
>> > - } else if (acb->ret == -EINPROGRESS) {
>> > - active = 1;
>> > - }
>> > + QTAILQ_REMOVE(&globalqueue.request_list, work, node);
>> > + ret = 0;
>> > qemu_mutex_unlock(&globalqueue.lock);
>> >
>> > - qemu_mutex_lock(&aiocb_mutex);
>> > - if (!active) {
>> > - acb->ret = -ECANCELED;
>> > - } else {
>> > - while (acb->ret == -EINPROGRESS) {
>> > - /*
>> > - * fail safe: if the aio could not be canceled,
>> > - * we wait for it
>> > - */
>> > - qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
>> > + return ret;
>>
>> It always succeeds? Why bother with the ret local variable?
>>
>
> Yes, I'll remove this.
>
>> > +}
>> > +
>> > +static void paio_cancel(BlockDriverAIOCB *blockacb)
>> > +{
>> > + struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
>> > + if (!acb->active) {
>> > + if (dequeue_work(&acb->work) != 0) {
>> > + /* Wait for running work item to complete */
>> > + qemu_mutex_lock(&aiocb_mutex);
>> > + while (acb->ret == -EINPROGRESS) {
>> > + qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
>> > + }
>> > + qemu_mutex_unlock(&aiocb_mutex);
>> > }
>> > }
>> > - qemu_mutex_unlock(&aiocb_mutex);
>> > +
>> > paio_remove(acb);
>>
>> I'm not convinced this function works. If the request is active in a
>> worker thread and paio_cancel() is called then we invoke paio_remove().
>>
>
> True. So can we do this: Since we have a patch which separately
> removes the active field [PATCH 7/13], can we fold patch 7 and this
> patch into a single patch? So that way we can maintain the
> correctness, because we are actually waiting for the active work to
> complete by doing a while (acb->ret == -EINPROGRESS)
Sounds good. IIRC the paio_cancel() code in the last version of the
patch looked correct, so it probably *is* just a case of not splitting
this part up into separate patches.
Stefan
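
The wait discussed above, looping on acb->ret == -EINPROGRESS under a mutex until the worker broadcasts completion, is the usual condition-variable pattern. Below is a minimal sketch using plain pthreads rather than the QemuMutex/QemuCond wrappers; the request struct and the function names are hypothetical, and the worker's actual I/O is replaced by storing a result of 0.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>

/* Hypothetical request; 'ret' plays the role of acb->ret. */
struct request {
    pthread_mutex_t lock;
    pthread_cond_t done;
    ssize_t ret;
};

/* Worker side: publish the result, then wake every waiter. */
static void *complete_request(void *opaque)
{
    struct request *req = opaque;

    pthread_mutex_lock(&req->lock);
    req->ret = 0; /* pretend the I/O finished successfully */
    pthread_cond_broadcast(&req->done);
    pthread_mutex_unlock(&req->lock);
    return NULL;
}

/* Canceller side: block until the request is no longer in progress. */
static ssize_t wait_for_completion(struct request *req)
{
    ssize_t ret;

    assert(req != NULL);

    pthread_mutex_lock(&req->lock);
    while (req->ret == -EINPROGRESS) {
        /* The loop re-checks the predicate after every wakeup, so
         * spurious wakeups and early completion are both handled. */
        pthread_cond_wait(&req->done, &req->lock);
    }
    ret = req->ret;
    pthread_mutex_unlock(&req->lock);
    return ret;
}
```

Because the predicate is checked before the first wait, the canceller returns immediately if the worker has already finished.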
* [Qemu-devel] [PATCH 07/13] Remove active field in qemu_aiocb structure.
2011-01-04 5:27 [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Arun R Bharadwaj
` (5 preceding siblings ...)
2011-01-04 5:27 ` [Qemu-devel] [PATCH 06/13] Threadlet: Add dequeue_work " Arun R Bharadwaj
@ 2011-01-04 5:27 ` Arun R Bharadwaj
2011-01-04 5:27 ` [Qemu-devel] [PATCH 08/13] Remove thread_create routine Arun R Bharadwaj
` (6 subsequent siblings)
13 siblings, 0 replies; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-04 5:27 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha, aneesh.kumar
The active field in the qemu_paiocb structure is no longer needed.
Remove it.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
posix-aio-compat.c | 17 ++++++-----------
1 files changed, 6 insertions(+), 11 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 8f1a9b6..fd9fcfd 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -69,7 +69,6 @@ struct qemu_paiocb {
int aio_type;
ssize_t ret;
- int active;
struct qemu_paiocb *next;
int async_context_id;
@@ -345,7 +344,6 @@ static void aio_thread(ThreadletWork *work)
pid = getpid();
aiocb = container_of(work, struct qemu_paiocb, work);
- aiocb->active = 1;
switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
case QEMU_AIO_READ:
@@ -452,7 +450,6 @@ static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
qemu_mutex_lock(&aiocb_mutex);
aiocb->ret = -EINPROGRESS;
- aiocb->active = 0;
qemu_mutex_unlock(&aiocb_mutex);
aiocb->work.func = aio_thread;
@@ -596,15 +593,13 @@ static int dequeue_work(ThreadletWork *work)
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
- if (!acb->active) {
- if (dequeue_work(&acb->work) != 0) {
- /* Wait for running work item to complete */
- qemu_mutex_lock(&aiocb_mutex);
- while (acb->ret == -EINPROGRESS) {
- qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
- }
- qemu_mutex_unlock(&aiocb_mutex);
+ if (dequeue_work(&acb->work) != 0) {
+ /* Wait for running work item to complete */
+ qemu_mutex_lock(&aiocb_mutex);
+ while (acb->ret == -EINPROGRESS) {
+ qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
}
+ qemu_mutex_unlock(&aiocb_mutex);
}
paio_remove(acb);
* [Qemu-devel] [PATCH 08/13] Remove thread_create routine.
2011-01-04 5:27 [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Arun R Bharadwaj
` (6 preceding siblings ...)
2011-01-04 5:27 ` [Qemu-devel] [PATCH 07/13] Remove active field in qemu_aiocb structure Arun R Bharadwaj
@ 2011-01-04 5:27 ` Arun R Bharadwaj
2011-01-05 19:56 ` [Qemu-devel] " Stefan Hajnoczi
2011-01-04 5:27 ` [Qemu-devel] [PATCH 09/13] Threadlet: Add aio_signal_handler threadlet API Arun R Bharadwaj
` (5 subsequent siblings)
13 siblings, 1 reply; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-04 5:27 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha, aneesh.kumar
Remove thread_create and use qemu_thread_create instead.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
posix-aio-compat.c | 19 ++-----------------
1 files changed, 2 insertions(+), 17 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index fd9fcfd..3b01c9b 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -83,7 +83,6 @@ typedef struct PosixAioState {
/* Default ThreadletQueue */
static ThreadletQueue globalqueue;
static int globalqueue_init;
-static pthread_attr_t attr;
#ifdef CONFIG_PREADV
static int preadv_present = 1;
@@ -102,13 +101,6 @@ static void die(const char *what)
die2(errno, what);
}
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
- void *(*start_routine)(void*), void *arg)
-{
- int ret = pthread_create(thread, attr, start_routine, arg);
- if (ret) die2(ret, "pthread_create");
-}
-
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
int ret;
@@ -375,19 +367,12 @@ static void aio_thread(ThreadletWork *work)
static void spawn_threadlet(ThreadletQueue *queue)
{
- pthread_t thread_id;
- sigset_t set, oldset;
+ QemuThread thread;
queue->cur_threads++;
queue->idle_threads++;
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
-
- thread_create(&thread_id, &attr, threadlet_worker, queue);
-
- if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+ qemu_thread_create(&thread, threadlet_worker, queue);
}
* [Qemu-devel] Re: [PATCH 08/13] Remove thread_create routine.
2011-01-04 5:27 ` [Qemu-devel] [PATCH 08/13] Remove thread_create routine Arun R Bharadwaj
@ 2011-01-05 19:56 ` Stefan Hajnoczi
2011-01-07 5:59 ` Arun R Bharadwaj
0 siblings, 1 reply; 27+ messages in thread
From: Stefan Hajnoczi @ 2011-01-05 19:56 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: aliguori, qemu-devel, aneesh.kumar
On Tue, Jan 04, 2011 at 10:57:49AM +0530, Arun R Bharadwaj wrote:
> Remove thread_create and use qemu_thread_create instead.
>
> Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
> ---
> posix-aio-compat.c | 19 ++-----------------
> 1 files changed, 2 insertions(+), 17 deletions(-)
>
Can we remove #include <pthread.h> now?
Stefan
* [Qemu-devel] Re: [PATCH 08/13] Remove thread_create routine.
2011-01-05 19:56 ` [Qemu-devel] " Stefan Hajnoczi
@ 2011-01-07 5:59 ` Arun R Bharadwaj
0 siblings, 0 replies; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-07 5:59 UTC (permalink / raw)
To: Stefan Hajnoczi; +Cc: aliguori, qemu-devel, aneesh.kumar
* Stefan Hajnoczi <stefanha@linux.vnet.ibm.com> [2011-01-05 19:56:00]:
> On Tue, Jan 04, 2011 at 10:57:49AM +0530, Arun R Bharadwaj wrote:
> > Remove thread_create and use qemu_thread_create instead.
> >
> > Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
> > ---
> > posix-aio-compat.c | 19 ++-----------------
> > 1 files changed, 2 insertions(+), 17 deletions(-)
> >
>
> Can we remove #include <pthread.h> now?
>
Yes we can now remove this.
> Stefan
* [Qemu-devel] [PATCH 09/13] Threadlet: Add aio_signal_handler threadlet API
2011-01-04 5:27 [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Arun R Bharadwaj
` (7 preceding siblings ...)
2011-01-04 5:27 ` [Qemu-devel] [PATCH 08/13] Remove thread_create routine Arun R Bharadwaj
@ 2011-01-04 5:27 ` Arun R Bharadwaj
2011-01-04 5:28 ` [Qemu-devel] [PATCH 10/13] Remove all instances of CONFIG_THREAD Arun R Bharadwaj
` (4 subsequent siblings)
13 siblings, 0 replies; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-04 5:27 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha, aneesh.kumar
This patch adds the aio_signal_handler threadlet API. Earlier,
posix-aio-compat.c had its own signal handler code. Abstract
it here; a later patch moves it to generic code so that it can
be used by other subsystems.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
posix-aio-compat.c | 49 +++++++++++++++++++++++++++----------------------
qemu-thread.h | 1 +
vl.c | 3 +++
3 files changed, 31 insertions(+), 22 deletions(-)
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 3b01c9b..d5874d9 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -328,11 +328,14 @@ static void *threadlet_worker(void *data)
return NULL;
}
+static PosixAioState *posix_aio_state;
+
static void aio_thread(ThreadletWork *work)
{
pid_t pid;
ssize_t ret = 0;
struct qemu_paiocb *aiocb;
+ char byte = 0;
pid = getpid();
aiocb = container_of(work, struct qemu_paiocb, work);
@@ -359,12 +362,36 @@ static void aio_thread(ThreadletWork *work)
qemu_cond_broadcast(&aiocb_completion);
qemu_mutex_unlock(&aiocb_mutex);
+ ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
+ if (ret < 0 && errno != EAGAIN) {
+ die("write()");
+ }
+
if (kill(pid, aiocb->ev_signo)) {
die("kill failed");
}
return;
}
+static void threadlet_io_completion_signal_handler(int signum)
+{
+ qemu_service_io();
+}
+
+static void threadlet_register_signal_handler(void)
+{
+ struct sigaction act;
+ sigfillset(&act.sa_mask);
+ act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
+ act.sa_handler = threadlet_io_completion_signal_handler;
+ sigaction(SIGUSR2, &act, NULL);
+}
+
+void threadlet_init(void)
+{
+ threadlet_register_signal_handler();
+}
+
static void spawn_threadlet(ThreadletQueue *queue)
{
QemuThread thread;
@@ -521,22 +548,6 @@ static int posix_aio_flush(void *opaque)
return !!s->first_aio;
}
-static PosixAioState *posix_aio_state;
-
-static void aio_signal_handler(int signum)
-{
- if (posix_aio_state) {
- char byte = 0;
- ssize_t ret;
-
- ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
- if (ret < 0 && errno != EAGAIN)
- die("write()");
- }
-
- qemu_service_io();
-}
-
static void paio_remove(struct qemu_paiocb *acb)
{
struct qemu_paiocb **pacb;
@@ -650,7 +661,6 @@ BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
int paio_init(void)
{
- struct sigaction act;
PosixAioState *s;
int fds[2];
@@ -662,11 +672,6 @@ int paio_init(void)
s = qemu_malloc(sizeof(PosixAioState));
- sigfillset(&act.sa_mask);
- act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
- act.sa_handler = aio_signal_handler;
- sigaction(SIGUSR2, &act, NULL);
-
s->first_aio = NULL;
if (qemu_pipe(fds) == -1) {
fprintf(stderr, "failed to create pipe\n");
diff --git a/qemu-thread.h b/qemu-thread.h
index 19bb30c..c5b579c 100644
--- a/qemu-thread.h
+++ b/qemu-thread.h
@@ -40,5 +40,6 @@ void qemu_thread_signal(QemuThread *thread, int sig);
void qemu_thread_self(QemuThread *thread);
int qemu_thread_equal(QemuThread *thread1, QemuThread *thread2);
void qemu_thread_exit(void *retval);
+void threadlet_init(void);
#endif
diff --git a/vl.c b/vl.c
index df414ef..aba805f 100644
--- a/vl.c
+++ b/vl.c
@@ -148,6 +148,7 @@ int main(int argc, char **argv)
#include "qemu-config.h"
#include "qemu-objects.h"
#include "qemu-options.h"
+#include "qemu-thread.h"
#ifdef CONFIG_VIRTFS
#include "fsdev/qemu-fsdev.h"
#endif
@@ -2922,6 +2923,8 @@ int main(int argc, char **argv, char **envp)
exit(1);
}
+ threadlet_init();
+
/* init generic devices */
if (qemu_opts_foreach(qemu_find_opts("device"), device_init_func, NULL, 1) != 0)
exit(1);
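
The completion notification that this patch moves from the signal handler into aio_thread is a one-byte write to a pipe, where EAGAIN is tolerated because a full pipe already guarantees a pending wakeup. Below is a minimal standalone sketch of that idea, assuming a non-blocking pipe; the helper names (make_nonblocking_pipe, notify_completion, drain_notifications) are hypothetical and not part of the patch.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Create a pipe with both ends set to non-blocking mode. */
static int make_nonblocking_pipe(int fds[2])
{
    int i;

    if (pipe(fds) == -1) {
        return -1;
    }
    for (i = 0; i < 2; i++) {
        int flags = fcntl(fds[i], F_GETFL);
        if (flags == -1 || fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) == -1) {
            return -1;
        }
    }
    return 0;
}

/*
 * Wake the reader by writing one byte. EAGAIN means the pipe is full,
 * so a wakeup is already pending and nothing is lost.
 * Returns 0 on success or EAGAIN, -1 on any other error.
 */
static int notify_completion(int wfd)
{
    char byte = 0;
    ssize_t ret;

    assert(wfd >= 0);

    ret = write(wfd, &byte, sizeof(byte));
    if (ret < 0 && errno != EAGAIN) {
        return -1;
    }
    return 0;
}

/* Reader side: consume all pending bytes and return how many were read. */
static int drain_notifications(int rfd)
{
    char buf[64];
    ssize_t n;
    int total = 0;

    while ((n = read(rfd, buf, sizeof(buf))) > 0) {
        total += (int)n;
    }
    return total;
}
```

The byte count only says that at least one request completed since the last drain; the reader still has to walk its own request list to find out which ones.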
* [Qemu-devel] [PATCH 10/13] Remove all instances of CONFIG_THREAD
2011-01-04 5:27 [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Arun R Bharadwaj
` (8 preceding siblings ...)
2011-01-04 5:27 ` [Qemu-devel] [PATCH 09/13] Threadlet: Add aio_signal_handler threadlet API Arun R Bharadwaj
@ 2011-01-04 5:28 ` Arun R Bharadwaj
2011-01-04 5:28 ` [Qemu-devel] [PATCH 11/13] Move threadlet code to qemu-threadlets.c Arun R Bharadwaj
` (3 subsequent siblings)
13 siblings, 0 replies; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-04 5:28 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha, aneesh.kumar
Remove all instances of CONFIG_THREAD, which is not used
anymore.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
configure | 2 --
1 files changed, 0 insertions(+), 2 deletions(-)
diff --git a/configure b/configure
index a079a49..addf733 100755
--- a/configure
+++ b/configure
@@ -2456,7 +2456,6 @@ if test "$vnc_png" != "no" ; then
fi
if test "$vnc_thread" != "no" ; then
echo "CONFIG_VNC_THREAD=y" >> $config_host_mak
- echo "CONFIG_THREAD=y" >> $config_host_mak
fi
if test "$fnmatch" = "yes" ; then
echo "CONFIG_FNMATCH=y" >> $config_host_mak
@@ -2534,7 +2533,6 @@ if test "$xen" = "yes" ; then
fi
if test "$io_thread" = "yes" ; then
echo "CONFIG_IOTHREAD=y" >> $config_host_mak
- echo "CONFIG_THREAD=y" >> $config_host_mak
fi
if test "$linux_aio" = "yes" ; then
echo "CONFIG_LINUX_AIO=y" >> $config_host_mak
* [Qemu-devel] [PATCH 11/13] Move threadlet code to qemu-threadlets.c
2011-01-04 5:27 [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Arun R Bharadwaj
` (9 preceding siblings ...)
2011-01-04 5:28 ` [Qemu-devel] [PATCH 10/13] Remove all instances of CONFIG_THREAD Arun R Bharadwaj
@ 2011-01-04 5:28 ` Arun R Bharadwaj
2011-01-04 5:28 ` [Qemu-devel] [PATCH 12/13] Threadlets: Add functionality to create private queues Arun R Bharadwaj
` (2 subsequent siblings)
13 siblings, 0 replies; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-04 5:28 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha, aneesh.kumar
This patch moves the threadlet queue API code to
qemu-threadlets.c where these APIs can be used by
other subsystems.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
Makefile.objs | 1
posix-aio-compat.c | 151 ----------------------------------------------------
qemu-thread.h | 1
qemu-threadlets.c | 148 +++++++++++++++++++++++++++++++++++++++++++++++++++
qemu-threadlets.h | 43 +++++++++++++++
vl.c | 2 -
6 files changed, 194 insertions(+), 152 deletions(-)
create mode 100644 qemu-threadlets.c
create mode 100644 qemu-threadlets.h
diff --git a/Makefile.objs b/Makefile.objs
index 3b7ec27..2cf8aba 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -10,6 +10,7 @@ qobject-obj-y += qerror.o
block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
block-obj-$(CONFIG_POSIX) += qemu-thread.o
+block-obj-$(CONFIG_POSIX) += qemu-threadlets.o
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index d5874d9..e6de882 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -27,33 +27,13 @@
#include "qemu-common.h"
#include "trace.h"
#include "block_int.h"
-#include "qemu-thread.h"
+#include "qemu-threadlets.h"
#include "block/raw-posix-aio.h"
-#define MAX_GLOBAL_THREADS 64
-#define MIN_GLOBAL_THREADS 8
-
static QemuMutex aiocb_mutex;
static QemuCond aiocb_completion;
-typedef struct ThreadletQueue
-{
- QemuMutex lock;
- QemuCond cond;
- int max_threads;
- int min_threads;
- int cur_threads;
- int idle_threads;
- QTAILQ_HEAD(, ThreadletWork) request_list;
-} ThreadletQueue;
-
-typedef struct ThreadletWork
-{
- QTAILQ_ENTRY(ThreadletWork) node;
- void (*func)(struct ThreadletWork *work);
-} ThreadletWork;
-
struct qemu_paiocb {
BlockDriverAIOCB common;
int aio_fildes;
@@ -80,10 +60,6 @@ typedef struct PosixAioState {
struct qemu_paiocb *first_aio;
} PosixAioState;
-/* Default ThreadletQueue */
-static ThreadletQueue globalqueue;
-static int globalqueue_init;
-
#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
@@ -284,50 +260,6 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
return nbytes;
}
-static void *threadlet_worker(void *data)
-{
- ThreadletQueue *queue = data;
-
- qemu_mutex_lock(&queue->lock);
- while (1) {
- ThreadletWork *work;
- int ret = 0;
-
- while (QTAILQ_EMPTY(&queue->request_list) &&
- (ret != ETIMEDOUT)) {
- /* wait for cond to be signalled or broadcast for 1000s */
- ret = qemu_cond_timedwait((&queue->cond),
- &(queue->lock), 10*100000);
- }
-
- assert(queue->idle_threads != 0);
- if (QTAILQ_EMPTY(&queue->request_list)) {
- if (queue->cur_threads > queue->min_threads) {
- /* We retain the minimum number of threads */
- break;
- }
- } else {
- work = QTAILQ_FIRST(&queue->request_list);
- QTAILQ_REMOVE(&queue->request_list, work, node);
-
- queue->idle_threads--;
- qemu_mutex_unlock(&queue->lock);
-
- /* execute the work function */
- work->func(work);
-
- qemu_mutex_lock(&queue->lock);
- queue->idle_threads++;
- }
- }
-
- queue->idle_threads--;
- queue->cur_threads--;
- qemu_mutex_unlock(&queue->lock);
-
- return NULL;
-}
-
static PosixAioState *posix_aio_state;
static void aio_thread(ThreadletWork *work)
@@ -373,68 +305,6 @@ static void aio_thread(ThreadletWork *work)
return;
}
-static void threadlet_io_completion_signal_handler(int signum)
-{
- qemu_service_io();
-}
-
-static void threadlet_register_signal_handler(void)
-{
- struct sigaction act;
- sigfillset(&act.sa_mask);
- act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
- act.sa_handler = threadlet_io_completion_signal_handler;
- sigaction(SIGUSR2, &act, NULL);
-}
-
-void threadlet_init(void)
-{
- threadlet_register_signal_handler();
-}
-
-static void spawn_threadlet(ThreadletQueue *queue)
-{
- QemuThread thread;
-
- queue->cur_threads++;
- queue->idle_threads++;
-
- qemu_thread_create(&thread, threadlet_worker, queue);
-}
-
-
-/**
- * submit_work: Submit to the global queue a new task to be executed
- * asynchronously.
- * @work: Contains information about the task that needs to be submitted.
- */
-static void submit_work(ThreadletWork *work)
-{
- qemu_mutex_lock(&globalqueue.lock);
-
- if (!globalqueue_init) {
- globalqueue.cur_threads = 0;
- globalqueue.idle_threads = 0;
- globalqueue.max_threads = MAX_GLOBAL_THREADS;
- globalqueue.min_threads = MIN_GLOBAL_THREADS;
- QTAILQ_INIT(&globalqueue.request_list);
- qemu_mutex_init(&globalqueue.lock);
- qemu_cond_init(&globalqueue.cond);
-
- globalqueue_init = 1;
- }
-
- if (globalqueue.idle_threads == 0 &&
- globalqueue.cur_threads < globalqueue.max_threads) {
- spawn_threadlet(&globalqueue);
-
- } else {
- qemu_cond_signal(&globalqueue.cond);
- }
- QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node);
- qemu_mutex_unlock(&globalqueue.lock);
-}
-
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
@@ -567,25 +437,6 @@ static void paio_remove(struct qemu_paiocb *acb)
}
}
-/**
- * dequeue_work: Cancel a task queued on the global queue.
- * @work: Contains the information of the task that needs to be cancelled.
- *
- * Returns: 0 if the task is successfully cancelled.
- * 1 otherwise.
- */
-static int dequeue_work(ThreadletWork *work)
-{
- int ret = 1;
-
- qemu_mutex_lock(&globalqueue.lock);
- QTAILQ_REMOVE(&globalqueue.request_list, work, node);
- ret = 0;
- qemu_mutex_unlock(&globalqueue.lock);
-
- return ret;
-}
-
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
diff --git a/qemu-thread.h b/qemu-thread.h
index c5b579c..19bb30c 100644
--- a/qemu-thread.h
+++ b/qemu-thread.h
@@ -40,6 +40,5 @@ void qemu_thread_signal(QemuThread *thread, int sig);
void qemu_thread_self(QemuThread *thread);
int qemu_thread_equal(QemuThread *thread1, QemuThread *thread2);
void qemu_thread_exit(void *retval);
-void threadlet_init(void);
#endif
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
new file mode 100644
index 0000000..4702c02
--- /dev/null
+++ b/qemu-threadlets.c
@@ -0,0 +1,148 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ * Gautham R Shenoy <gautham.shenoy@gmail.com>
+ * Arun R Bharadwaj <arun@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ */
+
+#include <signal.h>
+
+#include "qemu-threadlets.h"
+#include "osdep.h"
+
+#define MAX_GLOBAL_THREADS 64
+#define MIN_GLOBAL_THREADS 8
+
+static ThreadletQueue globalqueue;
+static int globalqueue_init;
+
+static void threadlet_io_completion_signal_handler(int signum)
+{
+ qemu_service_io();
+}
+
+static void threadlet_register_signal_handler(void)
+{
+ struct sigaction act;
+ sigfillset(&act.sa_mask);
+ act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
+ act.sa_handler = threadlet_io_completion_signal_handler;
+ sigaction(SIGUSR2, &act, NULL);
+}
+
+void threadlet_init(void)
+{
+ threadlet_register_signal_handler();
+}
+
+static void *threadlet_worker(void *data)
+{
+ ThreadletQueue *queue = data;
+
+ qemu_mutex_lock(&queue->lock);
+ while (1) {
+ ThreadletWork *work;
+ int ret = 0;
+
+ while (QTAILQ_EMPTY(&queue->request_list) &&
+ (ret != ETIMEDOUT)) {
+ /* wait for cond to be signalled or broadcast for 1000s */
+ ret = qemu_cond_timedwait((&queue->cond),
+ &(queue->lock), 10*100000);
+ }
+
+ assert(queue->idle_threads != 0);
+ if (QTAILQ_EMPTY(&queue->request_list)) {
+ if (queue->cur_threads > queue->min_threads) {
+ /* We retain the minimum number of threads */
+ break;
+ }
+ } else {
+ work = QTAILQ_FIRST(&queue->request_list);
+ QTAILQ_REMOVE(&queue->request_list, work, node);
+
+ queue->idle_threads--;
+ qemu_mutex_unlock(&queue->lock);
+
+ /* execute the work function */
+ work->func(work);
+
+ qemu_mutex_lock(&queue->lock);
+ queue->idle_threads++;
+ }
+ }
+
+ queue->idle_threads--;
+ queue->cur_threads--;
+ qemu_mutex_unlock(&queue->lock);
+
+ return NULL;
+}
+
+static void spawn_threadlet(ThreadletQueue *queue)
+{
+ QemuThread thread;
+
+ queue->cur_threads++;
+ queue->idle_threads++;
+
+ qemu_thread_create(&thread, threadlet_worker, queue);
+}
+
+/**
+ * submit_work: Submit to the global queue a new task to be executed
+ * asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work(ThreadletWork *work)
+{
+ if (!globalqueue_init) {
+ globalqueue.cur_threads = 0;
+ globalqueue.idle_threads = 0;
+ globalqueue.max_threads = MAX_GLOBAL_THREADS;
+ globalqueue.min_threads = MIN_GLOBAL_THREADS;
+ QTAILQ_INIT(&globalqueue.request_list);
+ qemu_mutex_init(&globalqueue.lock);
+ qemu_cond_init(&globalqueue.cond);
+
+ globalqueue_init = 1;
+ }
+
+ qemu_mutex_lock(&globalqueue.lock);
+ if (globalqueue.idle_threads == 0 &&
+ globalqueue.cur_threads < globalqueue.max_threads) {
+ spawn_threadlet(&globalqueue);
+ } else {
+ qemu_cond_signal(&globalqueue.cond);
+ }
+ QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node);
+ qemu_mutex_unlock(&globalqueue.lock);
+}
+
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ * 1 otherwise.
+ */
+int dequeue_work(ThreadletWork *work)
+{
+ int ret = 1;
+
+ qemu_mutex_lock(&globalqueue.lock);
+ QTAILQ_REMOVE(&globalqueue.request_list, work, node);
+ ret = 0;
+ qemu_mutex_unlock(&globalqueue.lock);
+
+ return ret;
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
new file mode 100644
index 0000000..03bb86b
--- /dev/null
+++ b/qemu-threadlets.h
@@ -0,0 +1,43 @@
+/*
+ * Threadlet support for offloading tasks to be executed asynchronously
+ *
+ * Copyright IBM, Corp. 2008
+ * Copyright IBM, Corp. 2010
+ *
+ * Authors:
+ * Anthony Liguori <aliguori@us.ibm.com>
+ * Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
+ * Gautham R Shenoy <gautham.shenoy@gmail.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_ASYNC_WORK_H
+#define QEMU_ASYNC_WORK_H
+
+#include "qemu-queue.h"
+#include "qemu-common.h"
+#include "qemu-thread.h"
+
+typedef struct ThreadletQueue
+{
+ QemuMutex lock;
+ QemuCond cond;
+ int max_threads;
+ int min_threads;
+ int cur_threads;
+ int idle_threads;
+ QTAILQ_HEAD(, ThreadletWork) request_list;
+} ThreadletQueue;
+
+typedef struct ThreadletWork
+{
+ QTAILQ_ENTRY(ThreadletWork) node;
+ void (*func)(struct ThreadletWork *work);
+} ThreadletWork;
+
+void submit_work(ThreadletWork *work);
+int dequeue_work(ThreadletWork *work);
+void threadlet_init(void);
+#endif
diff --git a/vl.c b/vl.c
index aba805f..7b9a425 100644
--- a/vl.c
+++ b/vl.c
@@ -148,7 +148,7 @@ int main(int argc, char **argv)
#include "qemu-config.h"
#include "qemu-objects.h"
#include "qemu-options.h"
-#include "qemu-thread.h"
+#include "qemu-threadlets.h"
#ifdef CONFIG_VIRTFS
#include "fsdev/qemu-fsdev.h"
#endif
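Note on dequeue_work() as posted in this patch: it calls QTAILQ_REMOVE unconditionally and always returns 0, so the documented "1 otherwise" case cannot occur. A minimal sketch of a dequeue that first checks whether the item is still queued is shown below. It is not the code from this series: it assumes the BSD TAILQ macros from <sys/queue.h> and a pthread mutex as stand-ins for QTAILQ and QemuMutex, and the names Work, WorkQueue, work_queue_init, work_queue_submit and work_queue_dequeue are hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <sys/queue.h>

/* Hypothetical stand-in for ThreadletWork: only the list linkage is kept. */
typedef struct Work {
    TAILQ_ENTRY(Work) node;
} Work;

/* Hypothetical stand-in for ThreadletQueue: a lock plus the request list. */
typedef struct WorkQueue {
    pthread_mutex_t lock;
    TAILQ_HEAD(, Work) request_list;
} WorkQueue;

static void work_queue_init(WorkQueue *q)
{
    pthread_mutex_init(&q->lock, NULL);
    TAILQ_INIT(&q->request_list);
}

static void work_queue_submit(WorkQueue *q, Work *w)
{
    pthread_mutex_lock(&q->lock);
    TAILQ_INSERT_TAIL(&q->request_list, w, node);
    pthread_mutex_unlock(&q->lock);
}

/*
 * Returns 0 if w was still on the request list and has been removed,
 * 1 if it was not found (already picked up by a worker, or never queued).
 */
static int work_queue_dequeue(WorkQueue *q, Work *w)
{
    Work *iter;
    int ret = 1;

    assert(q != NULL && w != NULL);

    pthread_mutex_lock(&q->lock);
    TAILQ_FOREACH(iter, &q->request_list, node) {
        if (iter == w) {
            /* Only unlink an element that is known to be on the list. */
            TAILQ_REMOVE(&q->request_list, w, node);
            ret = 0;
            break;
        }
    }
    pthread_mutex_unlock(&q->lock);

    return ret;
}
```

The scan costs O(n) under the lock, which is the price of never unlinking an element that a worker has already taken off the list.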
* [Qemu-devel] [PATCH 12/13] Threadlets: Add functionality to create private queues.
2011-01-04 5:27 [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Arun R Bharadwaj
` (10 preceding siblings ...)
2011-01-04 5:28 ` [Qemu-devel] [PATCH 11/13] Move threadlet code to qemu-threadlets.c Arun R Bharadwaj
@ 2011-01-04 5:28 ` Arun R Bharadwaj
2011-01-04 5:28 ` [Qemu-devel] [PATCH 13/13] Threadlets: Add documentation Arun R Bharadwaj
2011-01-04 23:13 ` [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Venkateswararao Jujjuri (JV)
13 siblings, 0 replies; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-04 5:28 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha, aneesh.kumar
This patch allows subsystems to create their own private
queues with associated pools of threads.
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
---
qemu-threadlets.c | 80 ++++++++++++++++++++++++++++++++++++++---------------
qemu-threadlets.h | 4 +++
2 files changed, 62 insertions(+), 22 deletions(-)
diff --git a/qemu-threadlets.c b/qemu-threadlets.c
index 4702c02..7075331 100644
--- a/qemu-threadlets.c
+++ b/qemu-threadlets.c
@@ -99,6 +99,25 @@ static void spawn_threadlet(ThreadletQueue *queue)
}
/**
+ * submit_work_to_queue: Submit a new task to a private queue to be
+ * executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ * to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+ qemu_mutex_lock(&queue->lock);
+ if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+ spawn_threadlet(queue);
+ } else {
+ qemu_cond_signal(&queue->cond);
+ }
+ QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+ qemu_mutex_unlock(&queue->lock);
+}
+
+/**
* submit_work: Submit to the global queue a new task to be executed
* asynchronously.
* @work: Contains information about the task that needs to be submitted.
@@ -106,43 +125,60 @@ static void spawn_threadlet(ThreadletQueue *queue)
void submit_work(ThreadletWork *work)
{
if (!globalqueue_init) {
- globalqueue.cur_threads = 0;
- globalqueue.idle_threads = 0;
- globalqueue.max_threads = MAX_GLOBAL_THREADS;
- globalqueue.min_threads = MIN_GLOBAL_THREADS;
- QTAILQ_INIT(&globalqueue.request_list);
- qemu_mutex_init(&globalqueue.lock);
- qemu_cond_init(&globalqueue.cond);
-
+ threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+ MIN_GLOBAL_THREADS);
globalqueue_init = 1;
}
- qemu_mutex_lock(&globalqueue.lock);
- if (globalqueue.idle_threads == 0 &&
- globalqueue.cur_threads < globalqueue.max_threads) {
- spawn_threadlet(&globalqueue);
- } else {
- qemu_cond_signal(&globalqueue.cond);
- }
- QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node);
- qemu_mutex_unlock(&globalqueue.lock);
+ submit_work_to_queue(&globalqueue, work);
}
/**
- * dequeue_work: Cancel a task queued on the global queue.
+ * dequeue_work_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
* @work: Contains the information of the task that needs to be cancelled.
*
* Returns: 0 if the task is successfully cancelled.
* 1 otherwise.
*/
-int dequeue_work(ThreadletWork *work)
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
{
int ret = 1;
- qemu_mutex_lock(&globalqueue.lock);
- QTAILQ_REMOVE(&globalqueue.request_list, work, node);
+ qemu_mutex_lock(&queue->lock);
+ QTAILQ_REMOVE(&queue->request_list, work, node);
ret = 0;
- qemu_mutex_unlock(&globalqueue.lock);
+ qemu_mutex_unlock(&queue->lock);
return ret;
}
+
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ * 1 otherwise.
+ */
+int dequeue_work(ThreadletWork *work)
+{
+ return dequeue_work_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+void threadlet_queue_init(ThreadletQueue *queue,
+ int max_threads, int min_threads)
+{
+ queue->cur_threads = 0;
+ queue->idle_threads = 0;
+ queue->max_threads = max_threads;
+ queue->min_threads = min_threads;
+ QTAILQ_INIT(&queue->request_list);
+ qemu_mutex_init(&queue->lock);
+ qemu_cond_init(&queue->cond);
+}
diff --git a/qemu-threadlets.h b/qemu-threadlets.h
index 03bb86b..993d7ab 100644
--- a/qemu-threadlets.h
+++ b/qemu-threadlets.h
@@ -37,7 +37,11 @@ typedef struct ThreadletWork
void (*func)(struct ThreadletWork *work);
} ThreadletWork;
+void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work);
void submit_work(ThreadletWork *work);
+int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work);
int dequeue_work(ThreadletWork *work);
void threadlet_init(void);
+void threadlet_queue_init(ThreadletQueue *queue, int max_threads,
+ int min_threads);
#endif
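The max_threads and min_threads values passed to threadlet_queue_init() drive two decisions elsewhere in qemu-threadlets.c: whether a submit spawns a new worker, and whether a worker that found no work exits. A minimal sketch of just those two conditions, as read from the code in this series, follows. PoolCounters, pool_should_spawn and pool_worker_should_exit are hypothetical names used for illustration only, and the locking that protects the counters in the real code is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical snapshot of the counters kept in ThreadletQueue. */
typedef struct PoolCounters {
    int max_threads;
    int min_threads;
    int cur_threads;
    int idle_threads;
} PoolCounters;

/*
 * Submit path: a new worker is spawned only when no worker is idle and
 * the pool has not yet reached max_threads; otherwise an idle worker
 * is signalled instead.
 */
static bool pool_should_spawn(const PoolCounters *p)
{
    assert(p != NULL);
    return p->idle_threads == 0 && p->cur_threads < p->max_threads;
}

/*
 * Worker path, after its wait ends: the worker exits only when the
 * request list is still empty and the pool holds more than min_threads.
 */
static bool pool_worker_should_exit(const PoolCounters *p, bool queue_empty)
{
    assert(p != NULL);
    return queue_empty && p->cur_threads > p->min_threads;
}
```

With max_threads = 2 and min_threads = 1, for example, a third submit while both workers are busy only queues the work, and once the queue drains one of the two workers exits while the other is retained.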
* [Qemu-devel] [PATCH 13/13] Threadlets: Add documentation
2011-01-04 5:27 [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Arun R Bharadwaj
` (11 preceding siblings ...)
2011-01-04 5:28 ` [Qemu-devel] [PATCH 12/13] Threadlets: Add functionality to create private queues Arun R Bharadwaj
@ 2011-01-04 5:28 ` Arun R Bharadwaj
2011-01-04 23:13 ` [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Venkateswararao Jujjuri (JV)
13 siblings, 0 replies; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-04 5:28 UTC (permalink / raw)
To: qemu-devel; +Cc: aliguori, stefanha, aneesh.kumar
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
---
docs/async-support.txt | 141 ++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 141 insertions(+), 0 deletions(-)
create mode 100644 docs/async-support.txt
diff --git a/docs/async-support.txt b/docs/async-support.txt
new file mode 100644
index 0000000..9f22b9a
--- /dev/null
+++ b/docs/async-support.txt
@@ -0,0 +1,141 @@
+== How to use the threadlets infrastructure supported in QEMU ==
+
+== Threadlets ==
+
+Q.1: What are threadlets ?
+A.1: Threadlets are an infrastructure within QEMU that allows other subsystems
+ to offload possibly blocking work to a queue, where it is processed
+ asynchronously by a pool of threads.
+
+Q.2: When would one want to use threadlets ?
+A.2: Threadlets are useful when there are operations that can be performed
+ outside the context of the VCPU/IO threads, in order to free the latter
+ to service other guest requests.
+
+Q.3: I have some work that can be executed in an asynchronous context. How
+ should I go about it ?
+A.3: One could follow the steps listed below:
+
+ - Define a function which would do the asynchronous work.
+ static void my_threadlet_func(ThreadletWork *work)
+ {
+ }
+
+ - Declare an object of type ThreadletWork:
+ ThreadletWork work;
+
+
+ - Assign a value to the "func" member of the ThreadletWork object.
+ work.func = my_threadlet_func;
+
+ - Submit the work to the global queue.
+ submit_work(&work);
+
+ - Continue servicing some other guest operations.
+
+Q.4: I want my_threadlet_func to access some non-global data. How do I do
+ that ?
+A.4: Suppose you want my_threadlet_func to access some non-global data object
+ of type MyPrivateData. In that case one could follow these steps.
+
+ - Define a member of the type ThreadletWork within MyPrivateData.
+ typedef struct MyPrivateData {
+ ...;
+ ...;
+ ...;
+ ThreadletWork work;
+ } MyPrivateData;
+
+ MyPrivateData my_data;
+
+ - Initialize my_data.work as described in A.3
+ my_data.work.func = my_threadlet_func;
+ submit_work(&my_data.work);
+
+ - Access the my_data object inside my_threadlet_func() using the
+ container_of primitive
+ static void my_threadlet_func(ThreadletWork *work)
+ {
+ MyPrivateData *mydata_ptr;
+ mydata_ptr = container_of(work, MyPrivateData, work);
+
+ /* mydata_ptr now points to the my_data object */
+ }
+
+Q.5: Are there any precautions one must take while sharing data with the
+ Asynchronous thread-pool ?
+A.5: Yes, make sure that the helper function of the type my_threadlet_func()
+ does not access/modify data when it can be accessed or modified in the
+ context of VCPU thread or IO thread. This is because the asynchronous
+ threads in the pool can run in parallel with the VCPU/IOThreads as shown
+ in the figure.
+
+ A typical workflow is as follows:
+
+ VCPU/IOThread
+ |
+ | (1)
+ |
+ V
+ Offload work (2)
+ |-------> to threadlets -----------------------------> Helper thread
+ | | |
+ | | |
+ | | (3) | (4)
+ | | |
+ | Handle other Guest requests |
+ | | |
+ | | V
+ | | (3) Signal the I/O Thread
+ |(6) | |
+ | | /
+ | | /
+ | V /
+ | Do the post <---------------------------------/
+ | processing (5)
+ | |
+ | | (6)
+ | V
+ |-Yes------ More async work?
+ |
+ | (7)
+ No
+ |
+ |
+ .
+ .
+
+ Hence one needs to make sure that in the steps (3) and (4) which run in
+ parallel, any global data is accessed within only one context.
+
+Q.6: I have queued a threadlet which I want to cancel. How do I do that ?
+A.6: The threadlets framework provides the API dequeue_work:
+ - int dequeue_work(ThreadletWork *work)
+
+ The API scans the ThreadletQueue to see if (work) is present. If it finds
+ work, it'll dequeue work and return 0.
+
+ On the other hand, if it does not find the (work) in the ThreadletQueue,
+ then it'll return 1. This can imply two things. Either the work is being
+ processed by one of the helper threads or it has been processed. The
+ threadlet infrastructure currently _does_not_ distinguish between these
+ two and the onus is on the caller to do that.
+
+Q.7: Apart from the global pool of threads, can I have my own private Queue ?
+A.7: Yes, the threadlets framework allows subsystems to create their own private
+ queues with associated pools of threads.
+
+ - Define a PrivateQueue
+ ThreadletQueue myQueue;
+
+ - Initialize it:
+ threadlet_queue_init(&myQueue, my_max_threads, my_min_threads);
+ where my_max_threads is the maximum number of threads that can be in the
+ thread pool and my_min_threads is the minimum number of threads that
+ are retained in the thread pool.
+
+ - Submit work:
+ submit_work_to_queue(&myQueue, &my_work);
+
+ - Cancel work:
+ dequeue_work_on_queue(&myQueue, &my_work);
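The container_of step described in A.4 can be tried outside QEMU with stand-in types. The following is a minimal sketch under stated assumptions: ThreadletWork is reduced to its func member, container_of is defined locally with offsetof (QEMU ships its own definition), and the input and result fields as well as the run_inline helper are hypothetical. run_inline calls the function directly in place of a worker thread invoking work->func(work).

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for ThreadletWork from qemu-threadlets.h (list node omitted). */
typedef struct ThreadletWork {
    void (*func)(struct ThreadletWork *work);
} ThreadletWork;

/* Assumed local definition; QEMU provides its own container_of. */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Hypothetical per-request data that embeds the work item. */
typedef struct MyPrivateData {
    int input;
    int result;
    ThreadletWork work;
} MyPrivateData;

static void my_threadlet_func(ThreadletWork *work)
{
    /* Recover the enclosing object from the embedded member's address. */
    MyPrivateData *mydata_ptr = container_of(work, MyPrivateData, work);

    mydata_ptr->result = mydata_ptr->input * 2;
}

/* Runs the work item inline, standing in for a worker thread. */
static int run_inline(MyPrivateData *data)
{
    assert(data != NULL);
    data->work.func = my_threadlet_func;
    data->work.func(&data->work);
    return data->result;
}
```

Because the ThreadletWork is embedded rather than pointed to, no separate allocation or back-pointer is needed, but the enclosing object must stay alive until the work function has finished.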
* Re: [Qemu-devel] [PATCH 00/13] Threadlets infrastructure.
2011-01-04 5:27 [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Arun R Bharadwaj
` (12 preceding siblings ...)
2011-01-04 5:28 ` [Qemu-devel] [PATCH 13/13] Threadlets: Add documentation Arun R Bharadwaj
@ 2011-01-04 23:13 ` Venkateswararao Jujjuri (JV)
2011-01-05 1:43 ` Arun R Bharadwaj
13 siblings, 1 reply; 27+ messages in thread
From: Venkateswararao Jujjuri (JV) @ 2011-01-04 23:13 UTC (permalink / raw)
To: Arun R Bharadwaj; +Cc: aneesh.kumar, aliguori, qemu-devel, stefanha
On 1/3/2011 9:27 PM, Arun R Bharadwaj wrote:
> Hi,
>
> This patch series implements threadlets infrastructure in qemu.
>
> This is a complete rework of the earlier patch series so that
> it becomes easier to review. I have broken down the earlier
> patch series as asked by Anthony
>
> The following series implements...
Arun, can you please describe the testing done on this patch series?
Thanks,
JV
>
> ---
>
> Arun R Bharadwaj (13):
> Add aiocb_mutex and aiocb_completion.
> Introduce work concept in posix-aio-compat.c
> Add callback function to ThreadletWork structure.
> Add ThreadletQueue.
> Threadlet: Add submit_work threadlet API.
> Threadlet: Add dequeue_work threadlet API
> Remove active field in qemu_aiocb structure.
> Remove thread_create routine.
> Threadlet: Add aio_signal_handler threadlet API
> Remove all instances of CONFIG_THREAD
> Move threadlet code to qemu-threadlets.c
> Threadlets: Add functionality to create private queues.
> Threadlets: Add documentation
>
>
> Makefile.objs | 3 -
> configure | 2
> docs/async-support.txt | 141 +++++++++++++++++++++++++++++
> posix-aio-compat.c | 237 ++++++++++++------------------------------------
> qemu-threadlets.c | 184 +++++++++++++++++++++++++++++++++++++
> qemu-threadlets.h | 47 ++++++++++
> vl.c | 3 +
> 7 files changed, 437 insertions(+), 180 deletions(-)
> create mode 100644 docs/async-support.txt
> create mode 100644 qemu-threadlets.c
> create mode 100644 qemu-threadlets.h
>
* Re: [Qemu-devel] [PATCH 00/13] Threadlets infrastructure.
2011-01-04 23:13 ` [Qemu-devel] [PATCH 00/13] Threadlets infrastructure Venkateswararao Jujjuri (JV)
@ 2011-01-05 1:43 ` Arun R Bharadwaj
0 siblings, 0 replies; 27+ messages in thread
From: Arun R Bharadwaj @ 2011-01-05 1:43 UTC (permalink / raw)
To: Venkateswararao Jujjuri (JV); +Cc: aneesh.kumar, aliguori, qemu-devel, stefanha
* Venkateswararao Jujjuri (JV) <jvrao@linux.vnet.ibm.com> [2011-01-04 15:13:45]:
> On 1/3/2011 9:27 PM, Arun R Bharadwaj wrote:
> > Hi,
> >
> > This patch series implements threadlets infrastructure in qemu.
> >
> > This is a complete rework of the earlier patch series so that
> > it becomes easier to review. I have broken down the earlier
> > patch series as asked by Anthony
> >
> > The following series implements...
>
> Arun, can you please describe the testing done on this patch series?
>
I have done the following testing:
Each patch, applied incrementally, builds QEMU without any issues.
I have also run basic fsstress testing on each patch. I will be running
kvm-autotest on the series and will post more detailed results.
-arun
> Thanks,
> JV
>
> >
> > ---
> >
> > Arun R Bharadwaj (13):
> > Add aiocb_mutex and aiocb_completion.
> > Introduce work concept in posix-aio-compat.c
> > Add callback function to ThreadletWork structure.
> > Add ThreadletQueue.
> > Threadlet: Add submit_work threadlet API.
> > Threadlet: Add dequeue_work threadlet API
> > Remove active field in qemu_aiocb structure.
> > Remove thread_create routine.
> > Threadlet: Add aio_signal_handler threadlet API
> > Remove all instances of CONFIG_THREAD
> > Move threadlet code to qemu-threadlets.c
> > Threadlets: Add functionality to create private queues.
> > Threadlets: Add documentation
> >
> >
> > Makefile.objs | 3 -
> > configure | 2
> > docs/async-support.txt | 141 +++++++++++++++++++++++++++++
> > posix-aio-compat.c | 237 ++++++++++++------------------------------------
> > qemu-threadlets.c | 184 +++++++++++++++++++++++++++++++++++++
> > qemu-threadlets.h | 47 ++++++++++
> > vl.c | 3 +
> > 7 files changed, 437 insertions(+), 180 deletions(-)
> > create mode 100644 docs/async-support.txt
> > create mode 100644 qemu-threadlets.c
> > create mode 100644 qemu-threadlets.h
> >
>
>