qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
From: Anthony Liguori <anthony@codemonkey.ws>
To: Gautham R Shenoy <ego@in.ibm.com>
Cc: "Aneesh Kumar K.V" <aneesh.kumar@linux.vnet.ibm.com>,
	Qemu-development List <qemu-devel@nongnu.org>,
	Corentin Chary <corentin.chary@gmail.com>,
	Avi Kivity <avi@redhat.com>
Subject: Re: [Qemu-devel] [PATCH V3 2/3] qemu: Generic asynchronous threading framework to offload tasks
Date: Fri, 04 Jun 2010 08:16:19 -0500	[thread overview]
Message-ID: <4C08FCA3.3020602@codemonkey.ws> (raw)
In-Reply-To: <20100603085623.25546.14585.stgit@localhost.localdomain>

On 06/03/2010 03:56 AM, Gautham R Shenoy wrote:
> From: Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
>
> This patch creates a generic asynchronous-task-offloading infrastructure. It's
> extracted out of the threading framework that is being used by paio.
>
> The reason for extracting out this generic infrastructure of the
> posix-aio-compat.c is so that other subsystems, such as virtio-9p could make use
> of it for offloading tasks that could block.
>
> [ego@in.ibm.com: work_item_pool, async_work_init, async_work_release,
> async_cancel_work]
>
> Signed-off-by: Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
> Signed-off-by: Gautham R Shenoy<ego@in.ibm.com>
> ---
>   Makefile.objs |    3 +
>   async-work.c  |  136 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>   async-work.h  |   85 ++++++++++++++++++++++++++++++++++++
>   3 files changed, 223 insertions(+), 1 deletions(-)
>   create mode 100644 async-work.c
>   create mode 100644 async-work.h
>
> diff --git a/Makefile.objs b/Makefile.objs
> index ecdd53e..fd5ea4d 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -9,6 +9,8 @@ qobject-obj-y += qerror.o
>
>   block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
>   block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
> +block-obj-y += qemu-thread.o
> +block-obj-y += async-work.o
>   block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>   block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>
> @@ -108,7 +110,6 @@ common-obj-y += iov.o
>   common-obj-$(CONFIG_VNC_TLS) += vnc-tls.o vnc-auth-vencrypt.o
>   common-obj-$(CONFIG_VNC_SASL) += vnc-auth-sasl.o
>   common-obj-$(CONFIG_COCOA) += cocoa.o
> -common-obj-$(CONFIG_IOTHREAD) += qemu-thread.o
>   common-obj-y += notify.o event_notifier.o
>   common-obj-y += qemu-timer.o
>
> diff --git a/async-work.c b/async-work.c
> new file mode 100644
> index 0000000..0675732
> --- /dev/null
> +++ b/async-work.c
> @@ -0,0 +1,136 @@
> +/*
> + * Async work support
> + *
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
>    

Please preserve the original copyright of the copied code.

> + */
> +#include<stdio.h>
> +#include<errno.h>
> +#include<string.h>
> +#include<stdlib.h>
> +#include<signal.h>
>    

qemu-common.h should have all of these.  Generally, you should avoid 
including system headers because qemu headers take care of portability.

> +#include "async-work.h"
> +#include "osdep.h"
> +
> +static void async_abort(int err, const char *what)
> +{
> +    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
> +    abort();
> +}
> +
> +static void *async_worker_thread(void *data)
> +{
> +    struct async_queue *queue = data;
> +
> +    while (1) {
> +        struct work_item *work;
> +        int ret = 0;
> +        qemu_mutex_lock(&(queue->lock));
> +
> +        while (QTAILQ_EMPTY(&(queue->request_list))&&
> +               (ret != ETIMEDOUT)) {
> +            ret = qemu_cond_timedwait(&(queue->cond),
> +					&(queue->lock), 10*100000);
> +        }
> +
> +        if (QTAILQ_EMPTY(&(queue->request_list)))
> +            goto check_exit;
> +
> +        work = QTAILQ_FIRST(&(queue->request_list));
> +        QTAILQ_REMOVE(&(queue->request_list), work, node);
> +        queue->idle_threads--;
> +        qemu_mutex_unlock(&(queue->lock));
> +
> +        /* execute the work function */
> +        work->func(work);
> +        async_work_release(queue, work);
> +
> +        qemu_mutex_lock(&(queue->lock));
> +        queue->idle_threads++;
> +
> +check_exit:
> +        if ((queue->idle_threads>  0)&&
> +            (queue->cur_threads>  queue->min_threads)) {
> +            /* we retain minimum number of threads */
> +            break;
> +        }
> +        qemu_mutex_unlock(&(queue->lock));
> +    }
> +
> +    queue->idle_threads--;
> +    queue->cur_threads--;
> +    qemu_mutex_unlock(&(queue->lock));
> +
> +    return NULL;
> +}
> +
> +static void spawn_async_thread(struct async_queue *queue)
> +{
> +    QemuThreadAttr attr;
> +    QemuThread thread;
> +    sigset_t set, oldset;
> +
> +    queue->cur_threads++;
> +    queue->idle_threads++;
> +
> +    qemu_thread_attr_init(&attr);
> +
> +    /* create a detached thread so that we don't need to wait on it */
> +    qemu_thread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
> +
> +    /* block all signals */
> +    if (sigfillset(&set)) {
> +        async_abort(errno, "sigfillset");
> +    }
> +
> +    if (sigprocmask(SIG_SETMASK,&set,&oldset)) {
> +        async_abort(errno, "sigprocmask");
> +    }
> +
> +    qemu_thread_create_attr(&thread,&attr, async_worker_thread, queue);
> +
> +    if (sigprocmask(SIG_SETMASK,&oldset, NULL)) {
> +        async_abort(errno, "sigprocmask restore");
> +    }
> +}
> +
> +void qemu_async_submit(struct async_queue *queue, struct work_item *work)
> +{
> +    qemu_mutex_lock(&(queue->lock));
> +    if (queue->idle_threads == 0&&  queue->cur_threads<  queue->max_threads) {
> +        spawn_async_thread(queue);
> +    }
> +    QTAILQ_INSERT_TAIL(&(queue->request_list), work, node);
> +    qemu_mutex_unlock(&(queue->lock));
> +    qemu_cond_signal(&(queue->cond));
> +}
> +
> +int qemu_async_cancel_work(struct async_queue *queue, struct work_item *work)
> +{
> +    struct work_item *ret_work;
> +    int found = 0;
> +
> +    qemu_mutex_lock(&(queue->lock));
> +    QTAILQ_FOREACH(ret_work,&(queue->request_list), node) {
> +        if (ret_work == work) {
> +            QTAILQ_REMOVE(&(queue->request_list), ret_work, node);
> +            found = 1;
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&(queue->lock));
> +
> +    if (found) {
> +        async_work_release(queue, work);
> +        return 0;
> +    }
> +
> +    return 1;
> +}
> +
> diff --git a/async-work.h b/async-work.h
> new file mode 100644
> index 0000000..8389f56
> --- /dev/null
> +++ b/async-work.h
> @@ -0,0 +1,85 @@
> +/*
> + * Async work support
> + *
> + * Copyright IBM, Corp. 2010
> + *
> + * Authors:
> + *  Aneesh Kumar K.V<aneesh.kumar@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + *
> + */
> +#ifndef QEMU_ASYNC_WORK_H
> +#define QEMU_ASYNC_WORK_H
> +
> +#include "qemu-queue.h"
> +#include "qemu-common.h"
> +#include "qemu-thread.h"
> +
> +struct async_queue
> +{
> +    QemuMutex lock;
> +    QemuCond cond;
> +    int max_threads;
> +    int min_threads;
> +    int cur_threads;
> +    int idle_threads;
> +    QTAILQ_HEAD(, work_item) request_list;
> +    QTAILQ_HEAD(, work_item) work_item_pool;
> +};
> +
> +struct work_item
> +{
> +    QTAILQ_ENTRY(work_item) node;
> +    void (*func)(struct work_item *work);
> +    void *private;
> +};
>    

Structs are not named in accordance to CODING_STYLE.

> +static inline void async_queue_init(struct async_queue *queue,
> +				    int max_threads, int min_threads)
> +{
> +    queue->cur_threads  = 0;
> +    queue->idle_threads = 0;
> +    queue->max_threads  = max_threads;
> +    queue->min_threads  = min_threads;
> +    QTAILQ_INIT(&(queue->request_list));
> +    QTAILQ_INIT(&(queue->work_item_pool));
> +    qemu_mutex_init(&(queue->lock));
> +    qemu_cond_init(&(queue->cond));
> +}
>    

I'd prefer there be a single queue that everything used verses multiple 
queues.  Otherwise, we'll end up having per device queues and my concern 
is that we'll end up with thousands and thousands of threads with no 
central place to tune the maximum thread number.

> +static inline struct work_item *async_work_init(struct async_queue *queue,
> +				   void (*func)(struct work_item *),
> +				   void *data)
>    

I'd suggest actually using a Notifier as the worker or at least 
something that looks exactly like it.  There's no need to pass a void * 
because more often than not, a caller just wants to pass a state 
structure anyway and they can embed the Notifier within the structure.  IOW:

async_work_submit(queue, &s->worker);

Then in the callback:

DeviceState *s = container_of(worker, DeviceState, worker);

I don't think the name makes the most sense either.  I think something like:

threadlet_submit()

Would work best.  It would be good for there to be a big comment warning 
that the routine does not run with the qemu_mutex and therefore cannot 
make use of any qemu functions without very special consideration.


There shouldn't need to be an explicit init vs. submit function either.

Regards,

Anthony Liguori

  parent reply	other threads:[~2010-06-04 13:16 UTC|newest]

Thread overview: 14+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2010-06-03  8:56 [Qemu-devel] [PATCH V3 0/3] qemu: Make AIO threading framework generic Gautham R Shenoy
2010-06-03  8:56 ` [Qemu-devel] [PATCH V3 1/3] qemu: Add qemu-wrappers for pthread_attr_t Gautham R Shenoy
2010-06-03 12:31   ` [Qemu-devel] " Paolo Bonzini
2010-06-04 13:07     ` Anthony Liguori
2010-06-04 13:19       ` Corentin Chary
2010-06-04 13:27         ` Paolo Bonzini
2010-06-10 11:12         ` Gautham R Shenoy
2010-06-03  8:56 ` [Qemu-devel] [PATCH V3 2/3] qemu: Generic asynchronous threading framework to offload tasks Gautham R Shenoy
2010-06-03 11:41   ` [Qemu-devel] " Corentin Chary
2010-06-03 12:37     ` Paolo Bonzini
2010-06-04 13:16   ` Anthony Liguori [this message]
2010-06-05  7:03     ` [Qemu-devel] " Corentin Chary
2010-06-10 11:37     ` Gautham R Shenoy
2010-06-03  8:56 ` [Qemu-devel] [PATCH V3 3/3] qemu: Convert AIO code to use the generic threading infrastructure Gautham R Shenoy

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=4C08FCA3.3020602@codemonkey.ws \
    --to=anthony@codemonkey.ws \
    --cc=aneesh.kumar@linux.vnet.ibm.com \
    --cc=avi@redhat.com \
    --cc=corentin.chary@gmail.com \
    --cc=ego@in.ibm.com \
    --cc=qemu-devel@nongnu.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).