From: Ming Lei <ming.lei@canonical.com>
To: qemu-devel@nongnu.org, Paolo Bonzini <pbonzini@redhat.com>,
	Stefan Hajnoczi <stefanha@redhat.com>,
	Kevin Wolf <kwolf@redhat.com>
Cc: Ming Lei <ming.lei@canonical.com>
Subject: [Qemu-devel] [PATCH v7 1/3] linux-aio: fix submit aio as a batch
Date: Mon,  1 Dec 2014 17:04:14 +0800
Message-ID: <1417424656-29714-2-git-send-email-ming.lei@canonical.com>
In-Reply-To: <1417424656-29714-1-git-send-email-ming.lei@canonical.com>

In the submit path we can't complete requests directly,
otherwise "Co-routine re-entered recursively" may be triggered,
so this patch fixes the issue with the following ideas (a short
sketch illustrating them follows the list):

	- for -EAGAIN or partial submission, retry the submission
	in the following completion cb, which runs in BH context
	- for partial submission, also update the io queue
	- if the io queue is full, submit the queued requests
	immediately and return failure to the caller
	- for any other failure, abort all queued requests in BH
	context; no request is allowed to be submitted until the
	abort has been handled
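
The sketch below is only an illustration, not QEMU code: do_submit()
stands in for io_submit(), and the struct and function names are
invented for the example. It shows the two pieces of bookkeeping
described above: a partial submission moves the unsubmitted tail to
the front of the array for a later retry, and a hard error flips the
queue into an "aborting" state that a deferred (BH-like) handler is
expected to drain.

    #include <errno.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <string.h>

    #define MAX_QUEUED_IO 128

    struct req {
        int id;
    };

    struct queue {
        struct req *reqs[MAX_QUEUED_IO];
        unsigned int idx;   /* number of queued requests */
        bool aborting;      /* hard error pending; drain from a deferred handler */
        int abort_ret;
    };

    /* Stand-in for io_submit(): pretend only half of the batch is accepted. */
    static int do_submit(struct req **reqs, int len)
    {
        (void)reqs;
        return len > 1 ? len / 2 : len;
    }

    /*
     * Mirrors the shape of ioq_submit(): always returns >= 0, the number
     * of requests actually submitted.  -EAGAIN leaves the queue untouched
     * so a later completion can retry; any other error marks the queue
     * as aborting.
     */
    static int queue_submit(struct queue *q)
    {
        int len = q->idx;
        int ret;

        if (!len) {
            return 0;
        }

        ret = do_submit(q->reqs, len);
        if (ret < 0) {
            if (ret == -EAGAIN) {
                return 0;
            }
            q->aborting = true;
            q->abort_ret = ret;
            return 0;
        }

        if (ret < len) {
            /* keep the unsubmitted tail at the front for the next attempt */
            memmove(&q->reqs[0], &q->reqs[ret],
                    (len - ret) * sizeof(struct req *));
        }
        q->idx -= ret;
        return ret;
    }

    int main(void)
    {
        static struct queue q;
        struct req r[4] = { {1}, {2}, {3}, {4} };
        int done;

        for (unsigned int i = 0; i < 4; i++) {
            q.reqs[q.idx++] = &r[i];
        }
        done = queue_submit(&q);
        printf("submitted %d, %u still queued (head id %d)\n",
               done, q.idx, q.reqs[0]->id);
        return 0;
    }

In the patch itself that drain is ioq_abort_bh(), which completes every
queued request with the saved error code and then clears the queue.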

Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
Signed-off-by: Ming Lei <ming.lei@canonical.com>
---
 block/linux-aio.c |  116 ++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 97 insertions(+), 19 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index d92513b..53c5616 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -38,11 +38,21 @@ struct qemu_laiocb {
     QLIST_ENTRY(qemu_laiocb) node;
 };
 
+/*
+ * TODO: support batching I/O from multiple bs in the same
+ * AIO context; one important use case is multi-lun SCSI,
+ * so in the future the IO queue should be per AIO context.
+ */
 typedef struct {
     struct iocb *iocbs[MAX_QUEUED_IO];
     int plugged;
     unsigned int size;
     unsigned int idx;
+
+    /* abort queued requests in BH context */
+    QEMUBH *abort_bh;
+    bool aborting;
+    int abort_ret;
 } LaioQueue;
 
 struct qemu_laio_state {
@@ -59,6 +69,8 @@ struct qemu_laio_state {
     int event_max;
 };
 
+static int ioq_submit(struct qemu_laio_state *s);
+
 static inline ssize_t io_event_ret(struct io_event *ev)
 {
     return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
@@ -135,6 +147,11 @@ static void qemu_laio_completion_bh(void *opaque)
 
         qemu_laio_process_completion(s, laiocb);
     }
+
+    /* Handle -EAGAIN or partial submission */
+    if (s->io_q.idx) {
+        ioq_submit(s);
+    }
 }
 
 static void qemu_laio_completion_cb(EventNotifier *e)
@@ -175,47 +192,100 @@ static void ioq_init(LaioQueue *io_q)
     io_q->size = MAX_QUEUED_IO;
     io_q->idx = 0;
     io_q->plugged = 0;
+    io_q->aborting = false;
 }
 
+/* Always returns >= 0: the number of requests submitted */
 static int ioq_submit(struct qemu_laio_state *s)
 {
-    int ret, i = 0;
+    int ret;
     int len = s->io_q.idx;
 
-    do {
-        ret = io_submit(s->ctx, len, s->io_q.iocbs);
-    } while (i++ < 3 && ret == -EAGAIN);
-
-    /* empty io queue */
-    s->io_q.idx = 0;
+    if (!len) {
+        return 0;
+    }
 
+    ret = io_submit(s->ctx, len, s->io_q.iocbs);
     if (ret < 0) {
-        i = 0;
-    } else {
-        i = ret;
+        /* retry in the following completion cb */
+        if (ret == -EAGAIN) {
+            return 0;
+        }
+
+        /*
+         * Abort in BH context to avoid "Co-routine re-entered
+         * recursively"; the io queue is updated there as well.
+         */
+        s->io_q.aborting = true;
+        s->io_q.abort_ret = ret;
+        qemu_bh_schedule(s->io_q.abort_bh);
+        ret = 0;
     }
 
-    for (; i < len; i++) {
-        struct qemu_laiocb *laiocb =
-            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
+    /*
+     * Update the io queue; the remainder will be retried
+     * automatically in the following completion cb.
+     */
+    if (ret > 0) {
+        if (ret < len) {
+            memmove(&s->io_q.iocbs[0], &s->io_q.iocbs[ret],
+                    (len - ret) * sizeof(struct iocb *));
+        }
+        s->io_q.idx -= ret;
+    }
+
+    return ret;
+}
 
-        laiocb->ret = (ret < 0) ? ret : -EIO;
+static void ioq_abort_bh(void *opaque)
+{
+    struct qemu_laio_state *s = opaque;
+    int i;
+
+    for (i = 0; i < s->io_q.idx; i++) {
+        struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
+                                                  struct qemu_laiocb,
+                                                  iocb);
+        laiocb->ret = s->io_q.abort_ret;
         qemu_laio_process_completion(s, laiocb);
     }
-    return ret;
+
+    s->io_q.idx = 0;
+    s->io_q.aborting = false;
 }
 
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
+static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 {
     unsigned int idx = s->io_q.idx;
 
+    /* Requests can't be submitted until the abort is handled */
+    if (unlikely(s->io_q.aborting)) {
+        return -EIO;
+    }
+
+    if (unlikely(idx == s->io_q.size)) {
+        ioq_submit(s);
+
+        if (unlikely(s->io_q.aborting)) {
+            return -EIO;
+        }
+        idx = s->io_q.idx;
+    }
+
+    /* Return now if the queue is still full */
+    if (unlikely(idx == s->io_q.size)) {
+        return -EAGAIN;
+    }
+
     s->io_q.iocbs[idx++] = iocb;
     s->io_q.idx = idx;
 
-    /* submit immediately if queue is full */
-    if (idx == s->io_q.size) {
+    /* submit immediately if queue depth is above 2/3 */
+    if (idx > s->io_q.size * 2 / 3) {
         ioq_submit(s);
     }
+
+    return 0;
 }
 
 void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
@@ -281,7 +351,9 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
             goto out_free_aiocb;
         }
     } else {
-        ioq_enqueue(s, iocbs);
+        if (ioq_enqueue(s, iocbs) < 0) {
+            goto out_free_aiocb;
+        }
     }
     return &laiocb->common;
 
@@ -296,14 +368,20 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
 
     aio_set_event_notifier(old_context, &s->e, NULL);
     qemu_bh_delete(s->completion_bh);
+    qemu_bh_delete(s->io_q.abort_bh);
 }
 
 void laio_attach_aio_context(void *s_, AioContext *new_context)
 {
     struct qemu_laio_state *s = s_;
 
+    s->io_q.abort_bh = aio_bh_new(new_context, ioq_abort_bh, s);
     s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
     aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
+
+    if (s->io_q.aborting) {
+        qemu_bh_schedule(s->io_q.abort_bh);
+    }
 }
 
 void *laio_init(void)
-- 
1.7.9.5

Thread overview: 5+ messages
2014-12-01  9:04 [Qemu-devel] [PATCH v7 0/3] linux-aio: fix batch submission Ming Lei
2014-12-01  9:04 ` [Qemu-devel] [PATCH v7 1/3] linux-aio: fix submit aio as a batch Ming Lei [this message]
2014-12-01  9:04 ` [Qemu-devel] [PATCH v7 2/3] linux-aio: handling -EAGAIN for !s->io_q.plugged case Ming Lei
2014-12-01  9:04 ` [Qemu-devel] [PATCH v7 3/3] linux-aio: remove 'node' from 'struct qemu_laiocb' Ming Lei
2014-12-10 13:11 ` [Qemu-devel] [PATCH v7 0/3] linux-aio: fix batch submission Paolo Bonzini
