All of lore.kernel.org
 help / color / mirror / Atom feed
From: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
To: qemu-devel@nongnu.org
Subject: [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure
Date: Mon, 8 Nov 2010 20:03:22 +0530	[thread overview]
Message-ID: <20101108143322.GA10435@linux.vnet.ibm.com> (raw)
In-Reply-To: <20101108104542.6769.22583.stgit@localhost6.localdomain6>

* Arun R Bharadwaj <arun@linux.vnet.ibm.com> [2010-11-08 16:16:50]:

Make paio subsystem use threadlets infrastructure

From: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>

This patch creates a generic asynchronous-task-offloading infrastructure named
threadlets.

The patch creates a global queue on-to which subsystems can queue their tasks to
be executed asynchronously.

The patch also provides API's that allow a subsystem to create a private queue
with an associated pool of threads.

The patch has been tested with fstress.

Signed-off-by: Aneesh Kumar K.V <aneesh.kumar@linux.vnet.ibm.com>
Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com>
Signed-off-by: Gautham R Shenoy <gautham.shenoy@gmail.com>
Signed-off-by: Sripathi Kodi <sripathik@in.ibm.com>
---
 Makefile.objs      |    2 
 posix-aio-compat.c |  340 ++++++++++++++++++++++++++++++++--------------------
 2 files changed, 208 insertions(+), 134 deletions(-)

diff --git a/Makefile.objs b/Makefile.objs
index cd5a24b..3b7ec27 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
 
 block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
 block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
+block-obj-$(CONFIG_POSIX) += qemu-thread.o
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
@@ -124,7 +125,6 @@ endif
 common-obj-y += $(addprefix ui/, $(ui-obj-y))
 
 common-obj-y += iov.o acl.o
-common-obj-$(CONFIG_THREAD) += qemu-thread.o
 common-obj-y += notify.o event_notifier.o
 common-obj-y += qemu-timer.o
 
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 7b862b5..00b2a4e 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -29,7 +29,32 @@
 #include "block_int.h"
 
 #include "block/raw-posix-aio.h"
+#include "qemu-thread.h"
 
+#define MAX_GLOBAL_THREADS  64
+#define MIN_GLOBAL_THREADS   8
+
+QemuMutex aiocb_mutex;
+
+typedef struct ThreadletQueue
+{
+    QemuMutex lock;
+    QemuCond cond;
+    int max_threads;
+    int min_threads;
+    int cur_threads;
+    int idle_threads;
+    QTAILQ_HEAD(, ThreadletWork) request_list;
+} ThreadletQueue;
+
+typedef struct ThreadletWork
+{
+    QTAILQ_ENTRY(ThreadletWork) node;
+    void (*func)(struct ThreadletWork *work);
+} ThreadletWork;
+
+static ThreadletQueue globalqueue;
+static int globalqueue_init;
 
 struct qemu_paiocb {
     BlockDriverAIOCB common;
@@ -44,13 +69,13 @@ struct qemu_paiocb {
     int ev_signo;
     off_t aio_offset;
 
-    QTAILQ_ENTRY(qemu_paiocb) node;
     int aio_type;
     ssize_t ret;
     int active;
     struct qemu_paiocb *next;
 
     int async_context_id;
+    ThreadletWork work;
 };
 
 typedef struct PosixAioState {
@@ -58,64 +83,169 @@ typedef struct PosixAioState {
     struct qemu_paiocb *first_aio;
 } PosixAioState;
 
+static void *threadlet_worker(void *data)
+{
+    ThreadletQueue *queue = data;
 
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static pthread_t thread_id;
-static pthread_attr_t attr;
-static int max_threads = 64;
-static int cur_threads = 0;
-static int idle_threads = 0;
-static QTAILQ_HEAD(, qemu_paiocb) request_list;
+    qemu_mutex_lock(&queue->lock);
+    while (1) {
+        ThreadletWork *work;
+        int ret = 0;
+
+        while (QTAILQ_EMPTY(&queue->request_list) &&
+               (ret != ETIMEDOUT)) {
+            /* wait for cond to be signalled or broadcast for 1000s */
+            ret = qemu_cond_timedwait((&queue->cond),
+                                         &(queue->lock), 10*100000);
+        }
 
-#ifdef CONFIG_PREADV
-static int preadv_present = 1;
-#else
-static int preadv_present = 0;
-#endif
+        assert(queue->idle_threads != 0);
+        if (QTAILQ_EMPTY(&queue->request_list)) {
+            if (queue->cur_threads > queue->min_threads) {
+                /* We retain the minimum number of threads */
+                break;
+            }
+        } else {
+            work = QTAILQ_FIRST(&queue->request_list);
+            QTAILQ_REMOVE(&queue->request_list, work, node);
 
-static void die2(int err, const char *what)
-{
-    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
-    abort();
+            queue->idle_threads--;
+            qemu_mutex_unlock(&queue->lock);
+
+            /* execute the work function */
+            work->func(work);
+
+            qemu_mutex_lock(&queue->lock);
+            queue->idle_threads++;
+        }
+    }
+
+    queue->idle_threads--;
+    queue->cur_threads--;
+    qemu_mutex_unlock(&queue->lock);
+
+    return NULL;
 }
 
-static void die(const char *what)
+static void spawn_threadlet(ThreadletQueue *queue)
 {
-    die2(errno, what);
+    QemuThread thread;
+
+    queue->cur_threads++;
+    queue->idle_threads++;
+
+    qemu_thread_create(&thread, threadlet_worker, queue);
 }
 
-static void mutex_lock(pthread_mutex_t *mutex)
+/**
+ * submit_work_to_queue: Submit a new task to a private queue to be
+ *                            executed asynchronously.
+ * @queue: Per-subsystem private queue to which the new task needs
+ *         to be submitted.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+static void submit_work_to_queue(ThreadletQueue *queue, ThreadletWork *work)
 {
-    int ret = pthread_mutex_lock(mutex);
-    if (ret) die2(ret, "pthread_mutex_lock");
+    qemu_mutex_lock(&queue->lock);
+    if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) {
+        spawn_threadlet(queue);
+    } else {
+        qemu_cond_signal(&queue->cond);
+    }
+    QTAILQ_INSERT_TAIL(&queue->request_list, work, node);
+    qemu_mutex_unlock(&queue->lock);
 }
 
-static void mutex_unlock(pthread_mutex_t *mutex)
+static void threadlet_queue_init(ThreadletQueue *queue,
+                          int max_threads, int min_threads);
+
+/**
+ * submit_work: Submit to the global queue a new task to be executed
+ *                   asynchronously.
+ * @work: Contains information about the task that needs to be submitted.
+ */
+static void submit_work(ThreadletWork *work)
 {
-    int ret = pthread_mutex_unlock(mutex);
-    if (ret) die2(ret, "pthread_mutex_unlock");
+    if (!globalqueue_init) {
+        threadlet_queue_init(&globalqueue, MAX_GLOBAL_THREADS,
+                                MIN_GLOBAL_THREADS);
+        globalqueue_init = 1;
+    }
+
+    submit_work_to_queue(&globalqueue, work);
 }
 
-static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
-                           struct timespec *ts)
+/**
+ * dequeue_work_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
 {
-    int ret = pthread_cond_timedwait(cond, mutex, ts);
-    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
+    ThreadletWork *ret_work;
+    int ret = 1;
+
+    qemu_mutex_lock(&queue->lock);
+    QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
+        if (ret_work == work) {
+            QTAILQ_REMOVE(&queue->request_list, ret_work, node);
+            ret = 0;
+            break;
+        }
+    }
+    qemu_mutex_unlock(&queue->lock);
+
     return ret;
 }
 
-static void cond_signal(pthread_cond_t *cond)
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ *          1 otherwise.
+ */
+static int dequeue_work(ThreadletWork *work)
+{
+    return dequeue_work_on_queue(&globalqueue, work);
+}
+
+/**
+ * threadlet_queue_init: Initialize a threadlet queue.
+ * @queue: The threadlet queue to be initialized.
+ * @max_threads: Maximum number of threads processing the queue.
+ * @min_threads: Minimum number of threads processing the queue.
+ */
+static void threadlet_queue_init(ThreadletQueue *queue,
+                                    int max_threads, int min_threads)
+{
+    queue->cur_threads  = 0;
+    queue->idle_threads = 0;
+    queue->max_threads  = max_threads;
+    queue->min_threads  = min_threads;
+    QTAILQ_INIT(&queue->request_list);
+    qemu_mutex_init(&queue->lock);
+    qemu_cond_init(&queue->cond);
+}
+
+#ifdef CONFIG_PREADV
+static int preadv_present = 1;
+#else
+static int preadv_present = 0;
+#endif
+
+static void die2(int err, const char *what)
 {
-    int ret = pthread_cond_signal(cond);
-    if (ret) die2(ret, "pthread_cond_signal");
+    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
+    abort();
 }
 
-static void thread_create(pthread_t *thread, pthread_attr_t *attr,
-                          void *(*start_routine)(void*), void *arg)
+static void die(const char *what)
 {
-    int ret = pthread_create(thread, attr, start_routine, arg);
-    if (ret) die2(ret, "pthread_create");
+    die2(errno, what);
 }
 
 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
@@ -301,106 +431,59 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
     return nbytes;
 }
 
-static void *aio_thread(void *unused)
+static void aio_thread(ThreadletWork *work)
 {
     pid_t pid;
+    struct qemu_paiocb *aiocb = container_of(work, struct qemu_paiocb, work);
+    ssize_t ret = 0;
 
     pid = getpid();
+    aiocb->active = 1;
 
-    while (1) {
-        struct qemu_paiocb *aiocb;
-        ssize_t ret = 0;
-        qemu_timeval tv;
-        struct timespec ts;
-
-        qemu_gettimeofday(&tv);
-        ts.tv_sec = tv.tv_sec + 10;
-        ts.tv_nsec = 0;
-
-        mutex_lock(&lock);
-
-        while (QTAILQ_EMPTY(&request_list) &&
-               !(ret == ETIMEDOUT)) {
-            ret = cond_timedwait(&cond, &lock, &ts);
-        }
-
-        if (QTAILQ_EMPTY(&request_list))
-            break;
-
-        aiocb = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, aiocb, node);
-        aiocb->active = 1;
-        idle_threads--;
-        mutex_unlock(&lock);
-
-        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
-        case QEMU_AIO_READ:
-        case QEMU_AIO_WRITE:
-            ret = handle_aiocb_rw(aiocb);
-            break;
-        case QEMU_AIO_FLUSH:
-            ret = handle_aiocb_flush(aiocb);
-            break;
-        case QEMU_AIO_IOCTL:
-            ret = handle_aiocb_ioctl(aiocb);
-            break;
-        default:
-            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
-            ret = -EINVAL;
-            break;
-        }
-
-        mutex_lock(&lock);
-        aiocb->ret = ret;
-        idle_threads++;
-        mutex_unlock(&lock);
-
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+    switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
+    case QEMU_AIO_READ:
+    case QEMU_AIO_WRITE:
+        ret = handle_aiocb_rw(aiocb);
+        break;
+    case QEMU_AIO_FLUSH:
+        ret = handle_aiocb_flush(aiocb);
+        break;
+    case QEMU_AIO_IOCTL:
+        ret = handle_aiocb_ioctl(aiocb);
+        break;
+    default:
+        fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+        ret = -EINVAL;
+        break;
     }
 
-    idle_threads--;
-    cur_threads--;
-    mutex_unlock(&lock);
-
-    return NULL;
-}
-
-static void spawn_thread(void)
-{
-    sigset_t set, oldset;
-
-    cur_threads++;
-    idle_threads++;
-
-    /* block all signals */
-    if (sigfillset(&set)) die("sigfillset");
-    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
+    qemu_mutex_lock(&aiocb_mutex);
+    aiocb->ret = ret;
+    qemu_mutex_unlock(&aiocb_mutex);
 
-    thread_create(&thread_id, &attr, aio_thread, NULL);
-
-    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
+    if (kill(pid, aiocb->ev_signo)) {
+         die("kill failed");
+    }
 }
 
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
+    qemu_mutex_lock(&aiocb_mutex);
     aiocb->ret = -EINPROGRESS;
+    qemu_mutex_unlock(&aiocb_mutex);
     aiocb->active = 0;
-    mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads)
-        spawn_thread();
-    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
-    mutex_unlock(&lock);
-    cond_signal(&cond);
+
+    aiocb->work.func = aio_thread;
+    submit_work(&aiocb->work);
 }
 
 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
 {
     ssize_t ret;
 
-    mutex_lock(&lock);
+    qemu_mutex_lock(&aiocb_mutex);
     ret = aiocb->ret;
-    mutex_unlock(&lock);
-
+    qemu_mutex_unlock(&aiocb_mutex);
     return ret;
 }
 
@@ -536,20 +619,20 @@ static void paio_cancel(BlockDriverAIOCB *blockacb)
     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
     int active = 0;
 
-    mutex_lock(&lock);
     if (!acb->active) {
-        QTAILQ_REMOVE(&request_list, acb, node);
-        acb->ret = -ECANCELED;
+        if (!dequeue_work(&acb->work)) {
+            acb->ret = -ECANCELED;
+         } else {
+            active = 1;
+         }
     } else if (acb->ret == -EINPROGRESS) {
         active = 1;
     }
-    mutex_unlock(&lock);
 
     if (active) {
         /* fail safe: if the aio could not be canceled, we wait for
            it */
-        while (qemu_paio_error(acb) == EINPROGRESS)
-            ;
+        active = qemu_paio_error(acb);
     }
 
     paio_remove(acb);
@@ -618,11 +701,12 @@ int paio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    int ret;
 
     if (posix_aio_state)
         return 0;
 
+    qemu_mutex_init(&aiocb_mutex);
+
     s = qemu_malloc(sizeof(PosixAioState));
 
     sigfillset(&act.sa_mask);
@@ -645,16 +729,6 @@ int paio_init(void)
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
         posix_aio_process_queue, s);
 
-    ret = pthread_attr_init(&attr);
-    if (ret)
-        die2(ret, "pthread_attr_init");
-
-    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    if (ret)
-        die2(ret, "pthread_attr_setdetachstate");
-
-    QTAILQ_INIT(&request_list);
-
     posix_aio_state = s;
     return 0;
 }

  parent reply	other threads:[~2010-11-08 14:33 UTC|newest]

Thread overview: 20+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2010-11-08 10:46 [Qemu-devel] [REFACTORED CODE] [PATCH 0/3] Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-11-08 10:46 ` [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure Arun R Bharadwaj
2010-11-08 10:47 ` [Qemu-devel] [PATCH 2/3] Move threadlets infrastructure to qemu-threadlets.c Arun R Bharadwaj
2010-11-08 10:47 ` [Qemu-devel] [PATCH 3/3] Add helper functions to enable virtio-9p make use of the threadlets Arun R Bharadwaj
2010-11-09  6:44   ` Venkateswararao Jujjuri (JV)
2010-11-09  9:06   ` Stefan Hajnoczi
2010-11-10  1:41     ` Venkateswararao Jujjuri (JV)
2010-11-10  9:37       ` Stefan Hajnoczi
2010-11-08 12:29 ` [Qemu-devel] [REFACTORED CODE] [PATCH 0/3] Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-11-08 14:33 ` Arun R Bharadwaj [this message]
2010-11-08 21:29   ` [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure Stefan Hajnoczi
2010-11-09 12:06     ` Arun R Bharadwaj
2010-11-09 13:14       ` Stefan Hajnoczi
  -- strict thread matches above, loose matches on Subject: below --
2010-11-10 13:19 [Qemu-devel] v10: [PATCH 0/3] Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-11-10 13:19 ` [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure Arun R Bharadwaj
2010-11-10 13:45   ` Stefan Hajnoczi
2010-11-10 17:54     ` Arun R Bharadwaj
2010-11-10 20:13       ` Stefan Hajnoczi
2010-11-15 17:53 [Qemu-devel] [PATCH 0/3] v11: Threadlets: A generic task offloading framework Arun R Bharadwaj
2010-11-15 17:53 ` [Qemu-devel] [PATCH 1/3] Make paio subsystem use threadlets infrastructure Arun R Bharadwaj
2010-11-15 21:13   ` Anthony Liguori
2010-11-18 18:05     ` Arun R Bharadwaj

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20101108143322.GA10435@linux.vnet.ibm.com \
    --to=arun@linux.vnet.ibm.com \
    --cc=qemu-devel@nongnu.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.