From: "Venkateswararao Jujjuri (JV)" <jvrao@linux.vnet.ibm.com>
To: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Cc: kwolf@redhat.com, qemu-devel@nongnu.org
Subject: Re: [Qemu-devel] [PATCH 3/6] Move qemu_paio_submit() to the new infrastructure.
Date: Tue, 30 Nov 2010 15:22:19 -0800 [thread overview]
Message-ID: <4CF5872B.209@linux.vnet.ibm.com> (raw)
In-Reply-To: <20101118180702.4434.89250.stgit@localhost6.localdomain6>

On 11/18/2010 10:07 AM, Arun R Bharadwaj wrote:
> Move qemu_paio_submit() to the new infrastructure and
> introduce the necessary APIs.
>
> Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>

Looks good to me.

Acked-by: Venkateswararao Jujjuri <jvrao@linux.vnet.ibm.com>

> ---
> posix-aio-compat.c | 224 ++++++++++++++++++++++++++++++++++------------------
> 1 files changed, 147 insertions(+), 77 deletions(-)
>
> diff --git a/posix-aio-compat.c b/posix-aio-compat.c
> index eb1e2db..3f3c461 100644
> --- a/posix-aio-compat.c
> +++ b/posix-aio-compat.c
> @@ -31,6 +31,9 @@
>
> #include "block/raw-posix-aio.h"
>
> +#define MAX_GLOBAL_THREADS 64
> +#define MIN_GLOBAL_THREADS 8
> +
> static QemuMutex aiocb_mutex;
> static QemuCond aiocb_completion;
>
> @@ -52,6 +55,7 @@ typedef struct ThreadletWork
> } ThreadletWork;
>
> static ThreadletQueue globalqueue;
> +static int globalqueue_init;
>
> struct qemu_paiocb {
> BlockDriverAIOCB common;
> @@ -81,6 +85,100 @@ typedef struct PosixAioState {
> struct qemu_paiocb *first_aio;
> } PosixAioState;
>
> +static void *threadlet_worker(void *data)
> +{
> + ThreadletQueue *queue = data;
> +
> + qemu_mutex_lock(&queue->lock);
> + while (1) {
> + ThreadletWork *work;
> + int ret = 0;
> +
> + while (QTAILQ_EMPTY(&queue->request_list) &&
> + (ret != ETIMEDOUT)) {
> + /* wait for cond to be signalled or broadcast for 1000s */
> + ret = qemu_cond_timedwait((&queue->cond),
> + &(queue->lock), 10*100000);
> + }
> +
> + assert(queue->idle_threads != 0);
> + if (QTAILQ_EMPTY(&queue->request_list)) {
> + if (queue->cur_threads > queue->min_threads) {
> + /* We retain the minimum number of threads */
> + break;
> + }
> + } else {
> + work = QTAILQ_FIRST(&queue->request_list);
> + QTAILQ_REMOVE(&queue->request_list, work, node);
> +
> + queue->idle_threads--;
> + qemu_mutex_unlock(&queue->lock);
> +
> + /* execute the work function */
> + work->func(work);
> +
> + qemu_mutex_lock(&queue->lock);
> + queue->idle_threads++;
> + }
> + }
> +
> + queue->idle_threads--;
> + queue->cur_threads--;
> + qemu_mutex_unlock(&queue->lock);
> +
> + return NULL;
> +}
> +
> +static void spawn_threadlet(ThreadletQueue *queue)
> +{
> + QemuThread thread;
> +
> + queue->cur_threads++;
> + queue->idle_threads++;
> +
> + qemu_thread_create(&thread, threadlet_worker, queue);
> +}
> +
> +/**
> + * submit_work_to_queue: Submit a new task to a private queue to be
> + * executed asynchronously.
> + * @queue: Per-subsystem private queue to which the new task needs
> + * to be submitted.
> + * @work: Contains information about the task that needs to be submitted.
> + */
> +
> +static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
> +{
> + qemu_mutex_lock(&queue->lock);
> + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
> + spawn_threadlet(queue);
> + } else {
> + qemu_cond_signal(&queue->cond);
> + }
> + QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
> + qemu_mutex_unlock(&queue->lock);
> +}
> +
> +static void threadlet_queue_init(ThreadletQueue *queue,
> + int max_threads, int min_threads);
> +
> +/**
> + * submit_work: Submit to the global queue a new task to be executed
> + * asynchronously.
> + * @work: Contains information about the task that needs to be submitted.
> + */
> +
> +static void submit_work(ThreadletWork *work)
> +{
> + if (!globalqueue_init) {
> + threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
> + MIN_GLOBAL_THREADS);
> + globalqueue_init = 1;
> + }
> +
> + submit_work_to_queue(&globalqueue, work);
> +}
> +
> /**
> * dequeue_work_on_queue: Cancel a task queued on a Queue.
> * @queue: The queue containing the task to be cancelled.
> @@ -121,6 +219,25 @@ static int dequeue_work(ThreadletWork *work)
> return dequeue_work_on_queue(&globalqueue, work);
> }
>
> +/**
> + * threadlet_queue_init: Initialize a threadlet queue.
> + * @queue: The threadlet queue to be initialized.
> + * @max_threads: Maximum number of threads processing the queue.
> + * @min_threads: Minimum number of threads processing the queue.
> + */
> +
> +static void threadlet_queue_init(ThreadletQueue *queue,
> + int max_threads, int min_threads)
> +{
> + queue->cur_threads = 0;
> + queue->idle_threads = 0;
> + queue->max_threads = max_threads;
> + queue->min_threads = min_threads;
> + QTAILQ_INIT(&queue->request_list);
> + qemu_mutex_init(&queue->lock);
> + qemu_cond_init(&queue->cond);
> +}
> +
> static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
> static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
> static pthread_t thread_id;
> @@ -363,96 +480,49 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
> return nbytes;
> }
>
> -static void *aio_thread(void *unused)
> +static void aio_thread(ThreadletWork *work)
> {
> pid_t pid;
> + struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
> + ssize_t ret = 0;
>
> pid = getpid();
>
> - while (1) {
> - struct qemu_paiocb *aiocb;
> - ssize_t ret = 0;
> - qemu_timeval tv;
> - struct timespec ts;
> -
> - qemu_gettimeofday(&tv);
> - ts.tv_sec = tv.tv_sec + 10;
> - ts.tv_nsec = 0;
> -
> - mutex_lock(&lock);
> -
> - while (QTAILQ_EMPTY(&request_list) &&
> - !(ret == ETIMEDOUT)) {
> - ret = cond_timedwait(&cond, &lock, &ts);
> - }
> -
> - if (QTAILQ_EMPTY(&request_list))
> - break;
> -
> - aiocb = QTAILQ_FIRST(&request_list);
> - QTAILQ_REMOVE(&request_list, aiocb, node);
> - aiocb->active = 1;
> - idle_threads--;
> - mutex_unlock(&lock);
> -
> - switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> - case QEMU_AIO_READ:
> - case QEMU_AIO_WRITE:
> - ret = handle_aiocb_rw(aiocb);
> - break;
> - case QEMU_AIO_FLUSH:
> - ret = handle_aiocb_flush(aiocb);
> - break;
> - case QEMU_AIO_IOCTL:
> - ret = handle_aiocb_ioctl(aiocb);
> - break;
> - default:
> - fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> - ret = -EINVAL;
> - break;
> - }
> -
> - mutex_lock(&lock);
> - aiocb->ret = ret;
> - idle_threads++;
> - mutex_unlock(&lock);
> -
> - if (kill(pid, aiocb->ev_signo)) die("kill failed");
> + switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
> + case QEMU_AIO_READ:
> + case QEMU_AIO_WRITE:
> + ret = handle_aiocb_rw(aiocb);
> + break;
> + case QEMU_AIO_FLUSH:
> + ret = handle_aiocb_flush(aiocb);
> + break;
> + case QEMU_AIO_IOCTL:
> + ret = handle_aiocb_ioctl(aiocb);
> + break;
> + default:
> + fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
> + ret = -EINVAL;
> + break;
> }
>
> - idle_threads--;
> - cur_threads--;
> - mutex_unlock(&lock);
> -
> - return NULL;
> -}
> -
> -static void spawn_thread(void)
> -{
> - sigset_t set, oldset;
> -
> - cur_threads++;
> - idle_threads++;
> -
> - /* block all signals */
> - if (sigfillset(&set)) die("sigfillset");
> - if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
> -
> - thread_create(&thread_id, &attr, aio_thread, NULL);
> + qemu_mutex_lock(&aiocb_mutex);
> + aiocb->ret = ret;
> + qemu_cond_broadcast(&aiocb_completion);
> + qemu_mutex_unlock(&aiocb_mutex);
>
> - if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
> + if (kill(pid, aiocb->ev_signo)) {
> + die("kill failed");
> + }
> }
>
> static void qemu_paio_submit(struct qemu_paiocb *aiocb)
> {
> + qemu_mutex_lock(&aiocb_mutex);
> aiocb->ret = -EINPROGRESS;
> - aiocb->active = 0;
> - mutex_lock(&lock);
> - if (idle_threads == 0 && cur_threads < max_threads)
> - spawn_thread();
> - QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
> - mutex_unlock(&lock);
> - cond_signal(&cond);
> + qemu_mutex_unlock(&aiocb_mutex);
> +
> + aiocb->work.func = aio_thread;
> + submit_work(&aiocb->work);
> }
>
> static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
>
>
next prev parent reply other threads: [~2010-12-01  3:31 UTC | newest]

Thread overview: 15+ messages / expand [flat|nested] mbox.gz Atom feed top
2010-11-18 18:06 [Qemu-devel] [PATCH 0/6] v12: Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-11-18 18:06 ` [Qemu-devel] [PATCH 1/6] Make the necessary changes in Makefile and configure file Arun R Bharadwaj
2010-11-19 9:55 ` Stefan Hajnoczi
2010-11-18 18:06 ` [Qemu-devel] [PATCH 2/6] Move paio_cancel() to new infrastructure Arun R Bharadwaj
2010-11-19 10:07 ` Stefan Hajnoczi
2010-11-30 23:20 ` Venkateswararao Jujjuri (JV)
2010-11-18 18:07 ` [Qemu-devel] [PATCH 3/6] Move qemu_paio_submit() to the " Arun R Bharadwaj
2010-11-30 23:22 ` Venkateswararao Jujjuri (JV) [this message]
2010-11-18 18:07 ` [Qemu-devel] [PATCH 4/6] Cleanup posix-aio.compat.c off all the old code Arun R Bharadwaj
2010-11-30 23:23 ` Venkateswararao Jujjuri (JV)
2010-11-18 18:07 ` [Qemu-devel] [PATCH 5/6] Move threadlets infrastructure to qemu-threadlets.c Arun R Bharadwaj
2010-11-30 23:18 ` Venkateswararao Jujjuri (JV)
2010-11-18 18:07 ` [Qemu-devel] [PATCH 6/6] Add helper functions to enable virtio-9p make use of the threadlets Arun R Bharadwaj
2010-11-19 10:17 ` Stefan Hajnoczi
2010-11-30 23:17 ` Venkateswararao Jujjuri (JV)
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=4CF5872B.209@linux.vnet.ibm.com \
--to=jvrao@linux.vnet.ibm.com \
--cc=arun@linux.vnet.ibm.com \
--cc=kwolf@redhat.com \
--cc=qemu-devel@nongnu.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.