* [Qemu-devel] [RFC PATCH v2 0/6] linux-aio: Convert to coroutines
From: Kevin Wolf @ 2014-12-05 16:06 UTC
To: qemu-devel; +Cc: kwolf, ming.lei, pl, mreitz, stefanha, pbonzini

This is my current branch for converting the linux-aio interface to
coroutines. I started this as a performance optimisation, but I think it also
makes it much easier to avoid the recursive coroutine reentrance that Ming Lei
has sent a relatively complex callback-based patch for. See patch 3 for a
quick attempt at fixing the same problem in a coroutine-based linux-aio
implementation.

Not ready to be merged mainly because of incomplete testing and benchmarking.

v2:
- Added patches 4-6 for handling of -EAGAIN and partial submits so that Ming
  Lei has a comparison to all of his callback-based series

Kevin Wolf (6):
  qemu-img bench
  raw-posix: Convert Linux AIO submission to coroutines
  linux-aio: Don't reenter request coroutine recursively
  linux-aio: Support partial io_submits
  linux-aio: On -EAGAIN, wait for completions
  linux-aio: Queue requests instead of returning -EAGAIN

 block/linux-aio.c | 159 +++++++++++++++++++++++++++----------------------
 block/raw-aio.h   |   5 +-
 block/raw-posix.c |  62 +++++++++----------
 qemu-img-cmds.hx  |   6 ++
 qemu-img.c        | 174 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 qemu-img.texi     |  10 ++++
 6 files changed, 309 insertions(+), 107 deletions(-)

-- 
1.8.3.1

* [Qemu-devel] [RFC PATCH v2 1/6] qemu-img bench
From: Kevin Wolf @ 2014-12-05 16:06 UTC
To: qemu-devel; +Cc: kwolf, ming.lei, pl, mreitz, stefanha, pbonzini

This adds a qemu-img command that allows doing some simple benchmarks for
the block layer without involving guest devices and a real VM.

For the start, this implements only a test of sequential reads.

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
 qemu-img-cmds.hx |   6 ++
 qemu-img.c       | 174 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 qemu-img.texi    |  10 ++++
 3 files changed, 190 insertions(+)

diff --git a/qemu-img-cmds.hx b/qemu-img-cmds.hx
index 9567774..2b23b57 100644
--- a/qemu-img-cmds.hx
+++ b/qemu-img-cmds.hx
@@ -9,6 +9,12 @@ STEXI
 @table @option
 ETEXI
 
+DEF("bench", img_bench,
+    "bench [-c count] [-d depth] [-f fmt] [-n] [-q] [-s buffer_size] [-t cache] filename")
+STEXI
+@item bench [-c @var{count}] [-d @var{depth}] [-f @var{fmt}] [-n] [-q] [-s @var{buffer_size}] [-t @var{cache}] @var{filename}
+ETEXI
+
 DEF("check", img_check,
     "check [-q] [-f fmt] [--output=ofmt] [-r [leaks | all]] [-T src_cache] filename")
 STEXI
diff --git a/qemu-img.c b/qemu-img.c
index a42335c..c2388ad 100644
--- a/qemu-img.c
+++ b/qemu-img.c
@@ -3003,6 +3003,180 @@ out:
     return 0;
 }
 
+typedef struct BenchData {
+    BlockDriverState *bs;
+    int bufsize;
+    int nrreq;
+    int n;
+    uint8_t *buf;
+    QEMUIOVector *qiov;
+
+    int in_flight;
+    uint64_t sector;
+} BenchData;
+
+static void bench_cb(void *opaque, int ret)
+{
+    BenchData *b = opaque;
+    BlockAIOCB *acb;
+
+    if (ret < 0) {
+        error_report("Failed request: %s", strerror(-ret));
+        exit(EXIT_FAILURE);
+    }
+    if (b->in_flight > 0) {
+        b->n--;
+        b->in_flight--;
+    }
+
+    while (b->n > b->in_flight && b->in_flight < b->nrreq) {
+        acb = bdrv_aio_readv(b->bs, b->sector, b->qiov,
+                             b->bufsize >> BDRV_SECTOR_BITS,
+                             bench_cb, b);
+        if (!acb) {
+            error_report("Failed to issue request");
+            exit(EXIT_FAILURE);
+        }
+        b->in_flight++;
+        b->sector += b->bufsize >> BDRV_SECTOR_BITS;
+        b->sector %= b->bs->total_sectors;
+    }
+}
+
+static int img_bench(int argc, char **argv)
+{
+    int c, ret = 0;
+    const char *fmt = NULL, *filename;
+    bool quiet = false;
+    int count = 75000;
+    int depth = 64;
+    size_t bufsize = 4096;
+    BlockBackend *blk = NULL;
+    BlockDriverState *bs;
+    BenchData data = {};
+    int flags = BDRV_O_FLAGS;
+    struct timeval t1, t2;
+    int i;
+
+    for (;;) {
+        c = getopt(argc, argv, "hc:d:f:nqs:t:");
+        if (c == -1) {
+            break;
+        }
+
+        switch (c) {
+        case 'h':
+        case '?':
+            help();
+            break;
+        case 'c':
+        {
+            char *end;
+            errno = 0;
+            count = strtoul(optarg, &end, 0);
+            if (errno || *end || count > INT_MAX) {
+                error_report("Invalid request count specified");
+                return 1;
+            }
+            break;
+        }
+        case 'd':
+        {
+            char *end;
+            errno = 0;
+            depth = strtoul(optarg, &end, 0);
+            if (errno || *end || depth > INT_MAX) {
+                error_report("Invalid queue depth specified");
+                return 1;
+            }
+            break;
+        }
+        case 'f':
+            fmt = optarg;
+            break;
+        case 'n':
+            flags |= BDRV_O_NATIVE_AIO;
+            break;
+        case 'q':
+            quiet = true;
+            break;
+        case 's':
+        {
+            int64_t sval;
+            char *end;
+
+            sval = strtosz_suffix(optarg, &end, STRTOSZ_DEFSUFFIX_B);
+            if (sval < 0 || sval > INT_MAX || *end) {
+                error_report("Invalid buffer size specified");
+                return 1;
+            }
+
+            bufsize = sval;
+            break;
+        }
+        case 't':
+            ret = bdrv_parse_cache_flags(optarg, &flags);
+            if (ret < 0) {
+                error_report("Invalid cache mode");
+                ret = -1;
+                goto out;
+            }
+            break;
+        }
+    }
+
+    if (optind != argc - 1) {
+        error_exit("Expecting one image file name");
+    }
+    filename = argv[argc - 1];
+
+    blk = img_open("image", filename, fmt, flags, true, quiet);
+    if (!blk) {
+        ret = -1;
+        goto out;
+    }
+    bs = blk_bs(blk);
+
+    data = (BenchData) {
+        .bs = bs,
+        .bufsize = bufsize,
+        .nrreq = depth,
+        .n = count,
+    };
+    printf("Sending %d requests, %d bytes each, %d in parallel\n",
+        data.n, data.bufsize, data.nrreq);
+
+    data.buf = qemu_blockalign(bs, data.nrreq * data.bufsize);
+    data.qiov = g_new(QEMUIOVector, data.nrreq);
+    for (i = 0; i < data.nrreq; i++) {
+        qemu_iovec_init(&data.qiov[i], 1);
+        qemu_iovec_add(&data.qiov[i],
+                       data.buf + i * data.bufsize, data.bufsize);
+    }
+
+    gettimeofday(&t1, NULL);
+    bench_cb(&data, 0);
+
+    while (data.n > 0) {
+        main_loop_wait(false);
+    }
+    gettimeofday(&t2, NULL);
+
+    printf("Run completed in %3.3f seconds.\n",
+           (t2.tv_sec - t1.tv_sec)
+           + ((double)(t2.tv_usec - t1.tv_usec) / 1000000));
+
+out:
+    qemu_vfree(data.buf);
+    blk_unref(blk);
+
+    if (ret) {
+        return 1;
+    }
+    return 0;
+}
+
+
 static const img_cmd_t img_cmds[] = {
 #define DEF(option, callback, arg_string)        \
     { option, callback },
diff --git a/qemu-img.texi b/qemu-img.texi
index 0a1ab35..fc499cb 100644
--- a/qemu-img.texi
+++ b/qemu-img.texi
@@ -117,6 +117,16 @@ Skip the creation of the target volume
 Command description:
 
 @table @option
+@item bench [-c @var{count}] [-d @var{depth}] [-f @var{fmt}] [-n] [-q] [-s @var{buffer_size}] [-t @var{cache}] @var{filename}
+
+Run a simple sequential read benchmark on the specified image. A total number
+of @var{count} I/O requests is performed, each @var{buffer_size} bytes in size,
+and with @var{depth} requests in parallel.
+
+If @code{-n} is specified, the native AIO backend is used if possible. On
+Linux, this option has no effect unless @code{-t none} or @code{-t directsync}
+is specified as well.
+
 @item check [-f @var{fmt}] [--output=@var{ofmt}] [-r [leaks | all]] [-T @var{src_cache}] @var{filename}
 
 Perform a consistency check on the disk image @var{filename}. The command can
-- 
1.8.3.1

* [Qemu-devel] [RFC PATCH v2 2/6] raw-posix: Convert Linux AIO submission to coroutines
From: Kevin Wolf @ 2014-12-05 16:06 UTC
To: qemu-devel; +Cc: kwolf, ming.lei, pl, mreitz, stefanha, pbonzini

This improves the performance of requests because an ACB doesn't need to
be allocated on the heap any more. It also makes the code nicer and
smaller.

As a side effect, the codepath taken by aio=threads is changed to use
paio_submit_co(). This doesn't change the performance at this point.

Results of qemu-img bench -t none -c 10000000 [-n] /dev/loop0:

      |      aio=native       |     aio=threads
      | before   | with patch | before   | with patch
------+----------+------------+----------+------------
run 1 | 29.921s  | 26.932s    | 35.286s  | 35.447s
run 2 | 29.793s  | 26.252s    | 35.276s  | 35.111s
run 3 | 30.186s  | 27.114s    | 35.042s  | 34.921s
run 4 | 30.425s  | 26.600s    | 35.169s  | 34.968s
run 5 | 30.041s  | 26.263s    | 35.224s  | 35.000s

TODO: Do some more serious benchmarking in VMs with less variance.
Results of a quick fio run are vaguely positive.

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
 block/linux-aio.c | 70 +++++++++++++++++--------------------------------------
 block/raw-aio.h   |  5 ++--
 block/raw-posix.c | 62 ++++++++++++++++++++++--------------------------
 3 files changed, 52 insertions(+), 85 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index d92513b..99b259d 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -12,6 +12,7 @@
 #include "qemu/queue.h"
 #include "block/raw-aio.h"
 #include "qemu/event_notifier.h"
+#include "block/coroutine.h"
 
 #include <libaio.h>
 
@@ -28,7 +29,7 @@
 #define MAX_QUEUED_IO 128
 
 struct qemu_laiocb {
-    BlockAIOCB common;
+    Coroutine *co;
     struct qemu_laio_state *ctx;
     struct iocb iocb;
     ssize_t ret;
@@ -86,9 +87,9 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
             }
         }
     }
-    laiocb->common.cb(laiocb->common.opaque, ret);
 
-    qemu_aio_unref(laiocb);
+    laiocb->ret = ret;
+    qemu_coroutine_enter(laiocb->co, NULL);
 }
 
 /* The completion BH fetches completed I/O requests and invokes their
@@ -146,30 +147,6 @@ static void qemu_laio_completion_cb(EventNotifier *e)
     }
 }
 
-static void laio_cancel(BlockAIOCB *blockacb)
-{
-    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
-    struct io_event event;
-    int ret;
-
-    if (laiocb->ret != -EINPROGRESS) {
-        return;
-    }
-    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
-    laiocb->ret = -ECANCELED;
-    if (ret != 0) {
-        /* iocb is not cancelled, cb will be called by the event loop later */
-        return;
-    }
-
-    laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
-}
-
-static const AIOCBInfo laio_aiocb_info = {
-    .aiocb_size = sizeof(struct qemu_laiocb),
-    .cancel_async = laio_cancel,
-};
-
 static void ioq_init(LaioQueue *io_q)
 {
     io_q->size = MAX_QUEUED_IO;
@@ -243,23 +220,21 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
     return ret;
 }
 
-BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
-        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
-        BlockCompletionFunc *cb, void *opaque, int type)
+int laio_submit_co(BlockDriverState *bs, void *aio_ctx, int fd,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, int type)
 {
     struct qemu_laio_state *s = aio_ctx;
-    struct qemu_laiocb *laiocb;
-    struct iocb *iocbs;
     off_t offset = sector_num * 512;
 
-    laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque);
-    laiocb->nbytes = nb_sectors * 512;
-    laiocb->ctx = s;
-    laiocb->ret = -EINPROGRESS;
-    laiocb->is_read = (type == QEMU_AIO_READ);
-    laiocb->qiov = qiov;
-
-    iocbs = &laiocb->iocb;
+    struct qemu_laiocb laiocb = {
+        .co = qemu_coroutine_self(),
+        .nbytes = nb_sectors * 512,
+        .ctx = s,
+        .is_read = (type == QEMU_AIO_READ),
+        .qiov = qiov,
+    };
+    struct iocb *iocbs = &laiocb.iocb;
+    int ret;
 
     switch (type) {
     case QEMU_AIO_WRITE:
@@ -272,22 +247,21 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
     default:
         fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                         __func__, type);
-        goto out_free_aiocb;
+        return -EIO;
     }
-    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));
+    io_set_eventfd(&laiocb.iocb, event_notifier_get_fd(&s->e));
 
     if (!s->io_q.plugged) {
-        if (io_submit(s->ctx, 1, &iocbs) < 0) {
-            goto out_free_aiocb;
+        ret = io_submit(s->ctx, 1, &iocbs);
+        if (ret < 0) {
+            return ret;
         }
     } else {
         ioq_enqueue(s, iocbs);
     }
-    return &laiocb->common;
 
-out_free_aiocb:
-    qemu_aio_unref(laiocb);
-    return NULL;
+    qemu_coroutine_yield();
+    return laiocb.ret;
 }
 
 void laio_detach_aio_context(void *s_, AioContext *old_context)
diff --git a/block/raw-aio.h b/block/raw-aio.h
index 80681ce..b83c8af 100644
--- a/block/raw-aio.h
+++ b/block/raw-aio.h
@@ -35,9 +35,8 @@
 #ifdef CONFIG_LINUX_AIO
 void *laio_init(void);
 void laio_cleanup(void *s);
-BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
-        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
-        BlockCompletionFunc *cb, void *opaque, int type);
+int laio_submit_co(BlockDriverState *bs, void *aio_ctx, int fd,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, int type);
 void laio_detach_aio_context(void *s, AioContext *old_context);
 void laio_attach_aio_context(void *s, AioContext *new_context);
 void laio_io_plug(BlockDriverState *bs, void *aio_ctx);
diff --git a/block/raw-posix.c b/block/raw-posix.c
index b1af77e..aa16b53 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -1075,14 +1075,13 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, int fd,
     return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
 }
 
-static BlockAIOCB *raw_aio_submit(BlockDriverState *bs,
-        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
-        BlockCompletionFunc *cb, void *opaque, int type)
+static int coroutine_fn raw_co_rw(BlockDriverState *bs, int64_t sector_num,
+                                  int nb_sectors, QEMUIOVector *qiov, int type)
 {
     BDRVRawState *s = bs->opaque;
 
     if (fd_open(bs) < 0)
-        return NULL;
+        return -EIO;
 
     /*
      * Check if the underlying device requires requests to be aligned,
@@ -1095,14 +1094,25 @@ static BlockAIOCB *raw_aio_submit(BlockDriverState *bs,
             type |= QEMU_AIO_MISALIGNED;
 #ifdef CONFIG_LINUX_AIO
         } else if (s->use_aio) {
-            return laio_submit(bs, s->aio_ctx, s->fd, sector_num, qiov,
-                               nb_sectors, cb, opaque, type);
+            return laio_submit_co(bs, s->aio_ctx, s->fd, sector_num, qiov,
+                                  nb_sectors, type);
 #endif
         }
     }
 
-    return paio_submit(bs, s->fd, sector_num, qiov, nb_sectors,
-                       cb, opaque, type);
+    return paio_submit_co(bs, s->fd, sector_num, qiov, nb_sectors, type);
+}
+
+static int coroutine_fn raw_co_readv(BlockDriverState *bs, int64_t sector_num,
+                                     int nb_sectors, QEMUIOVector *qiov)
+{
+    return raw_co_rw(bs, sector_num, nb_sectors, qiov, QEMU_AIO_READ);
+}
+
+static int coroutine_fn raw_co_writev(BlockDriverState *bs, int64_t sector_num,
+                                      int nb_sectors, QEMUIOVector *qiov)
+{
+    return raw_co_rw(bs, sector_num, nb_sectors, qiov, QEMU_AIO_WRITE);
 }
 
 static void raw_aio_plug(BlockDriverState *bs)
@@ -1135,22 +1145,6 @@ static void raw_aio_flush_io_queue(BlockDriverState *bs)
 #endif
 }
 
-static BlockAIOCB *raw_aio_readv(BlockDriverState *bs,
-        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
-        BlockCompletionFunc *cb, void *opaque)
-{
-    return raw_aio_submit(bs, sector_num, qiov, nb_sectors,
-                          cb, opaque, QEMU_AIO_READ);
-}
-
-static BlockAIOCB *raw_aio_writev(BlockDriverState *bs,
-        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
-        BlockCompletionFunc *cb, void *opaque)
-{
-    return raw_aio_submit(bs, sector_num, qiov, nb_sectors,
-                          cb, opaque, QEMU_AIO_WRITE);
-}
-
 static BlockAIOCB *raw_aio_flush(BlockDriverState *bs,
         BlockCompletionFunc *cb, void *opaque)
 {
@@ -1701,8 +1695,8 @@ static BlockDriver bdrv_file = {
     .bdrv_co_get_block_status = raw_co_get_block_status,
     .bdrv_co_write_zeroes = raw_co_write_zeroes,
 
-    .bdrv_aio_readv = raw_aio_readv,
-    .bdrv_aio_writev = raw_aio_writev,
+    .bdrv_co_readv = raw_co_readv,
+    .bdrv_co_writev = raw_co_writev,
     .bdrv_aio_flush = raw_aio_flush,
     .bdrv_aio_discard = raw_aio_discard,
     .bdrv_refresh_limits = raw_refresh_limits,
@@ -2103,8 +2097,8 @@ static BlockDriver bdrv_host_device = {
     .create_opts = &raw_create_opts,
     .bdrv_co_write_zeroes = hdev_co_write_zeroes,
 
-    .bdrv_aio_readv = raw_aio_readv,
-    .bdrv_aio_writev = raw_aio_writev,
+    .bdrv_co_readv = raw_co_readv,
+    .bdrv_co_writev = raw_co_writev,
     .bdrv_aio_flush = raw_aio_flush,
     .bdrv_aio_discard = hdev_aio_discard,
     .bdrv_refresh_limits = raw_refresh_limits,
@@ -2252,8 +2246,8 @@ static BlockDriver bdrv_host_floppy = {
     .bdrv_create = hdev_create,
     .create_opts = &raw_create_opts,
 
-    .bdrv_aio_readv = raw_aio_readv,
-    .bdrv_aio_writev = raw_aio_writev,
+    .bdrv_co_readv = raw_co_readv,
+    .bdrv_co_writev = raw_co_writev,
     .bdrv_aio_flush = raw_aio_flush,
     .bdrv_refresh_limits = raw_refresh_limits,
     .bdrv_io_plug = raw_aio_plug,
@@ -2383,8 +2377,8 @@ static BlockDriver bdrv_host_cdrom = {
     .bdrv_create = hdev_create,
     .create_opts = &raw_create_opts,
 
-    .bdrv_aio_readv = raw_aio_readv,
-    .bdrv_aio_writev = raw_aio_writev,
+    .bdrv_co_readv = raw_co_readv,
+    .bdrv_co_writev = raw_co_writev,
     .bdrv_aio_flush = raw_aio_flush,
     .bdrv_refresh_limits = raw_refresh_limits,
     .bdrv_io_plug = raw_aio_plug,
@@ -2520,8 +2514,8 @@ static BlockDriver bdrv_host_cdrom = {
     .bdrv_create = hdev_create,
     .create_opts = &raw_create_opts,
 
-    .bdrv_aio_readv = raw_aio_readv,
-    .bdrv_aio_writev = raw_aio_writev,
+    .bdrv_co_readv = raw_co_readv,
+    .bdrv_co_writev = raw_co_writev,
     .bdrv_aio_flush = raw_aio_flush,
     .bdrv_refresh_limits = raw_refresh_limits,
     .bdrv_io_plug = raw_aio_plug,
-- 
1.8.3.1

* [Qemu-devel] [RFC PATCH v2 3/6] linux-aio: Don't reenter request coroutine recursively
From: Kevin Wolf @ 2014-12-05 16:06 UTC
To: qemu-devel; +Cc: kwolf, ming.lei, pl, mreitz, stefanha, pbonzini

When getting an error while submitting requests, we must be careful to
wake up only inactive coroutines. Therefore we must special-case the
currently active coroutine and communicate an error for that request
using the ordinary return value of ioq_submit().

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
 block/linux-aio.c | 23 ++++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index 99b259d..fd8f0e4 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -177,12 +177,19 @@ static int ioq_submit(struct qemu_laio_state *s)
             container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
 
         laiocb->ret = (ret < 0) ? ret : -EIO;
-        qemu_laio_process_completion(s, laiocb);
+        if (laiocb->co != qemu_coroutine_self()) {
+            qemu_coroutine_enter(laiocb->co, NULL);
+        } else {
+            /* The return value is used for the currently active coroutine.
+             * We're always in ioq_enqueue() here, ioq_submit() never runs from
+             * a request's coroutine.*/
+            ret = laiocb->ret;
+        }
     }
     return ret;
 }
 
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
+static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 {
     unsigned int idx = s->io_q.idx;
 
@@ -191,7 +198,9 @@ static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 
     /* submit immediately if queue is full */
     if (idx == s->io_q.size) {
-        ioq_submit(s);
+        return ioq_submit(s);
+    } else {
+        return 0;
     }
 }
 
@@ -253,11 +262,11 @@ int laio_submit_co(BlockDriverState *bs, void *aio_ctx, int fd,
 
     if (!s->io_q.plugged) {
         ret = io_submit(s->ctx, 1, &iocbs);
-        if (ret < 0) {
-            return ret;
-        }
     } else {
-        ioq_enqueue(s, iocbs);
+        ret = ioq_enqueue(s, iocbs);
+    }
+    if (ret < 0) {
+        return ret;
     }
 
     qemu_coroutine_yield();
-- 
1.8.3.1

* [Qemu-devel] [RFC PATCH v2 4/6] linux-aio: Support partial io_submits
From: Kevin Wolf @ 2014-12-05 16:06 UTC
To: qemu-devel; +Cc: kwolf, ming.lei, pl, mreitz, stefanha, pbonzini

io_submit() can submit fewer requests than we passed it. In this case, we
used to fail the remaining requests, which isn't very nice. Instead,
let's just keep the remaining requests around in the request queue and
resubmit them after the next request completion when resources should be
available for new requests.

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
 block/linux-aio.c | 50 +++++++++++++++++++++++++++++++-------------------
 1 file changed, 31 insertions(+), 19 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index fd8f0e4..1406a71 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -60,6 +60,8 @@ struct qemu_laio_state {
     int event_max;
 };
 
+static int ioq_submit(struct qemu_laio_state *s);
+
 static inline ssize_t io_event_ret(struct io_event *ev)
 {
     return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
@@ -136,6 +138,12 @@ static void qemu_laio_completion_bh(void *opaque)
 
         qemu_laio_process_completion(s, laiocb);
     }
+
+    /* If we couldn't submit all queued requests before, now that we have
+     * completed some, we might be able to submit more of them. */
+    if (s->io_q.idx) {
+        ioq_submit(s);
+    }
 }
 
 static void qemu_laio_completion_cb(EventNotifier *e)
@@ -163,29 +171,33 @@ static int ioq_submit(struct qemu_laio_state *s)
         ret = io_submit(s->ctx, len, s->io_q.iocbs);
     } while (i++ < 3 && ret == -EAGAIN);
 
-    /* empty io queue */
-    s->io_q.idx = 0;
-
+    /* On io_submit() failure, fail all requests in the queue */
     if (ret < 0) {
-        i = 0;
-    } else {
-        i = ret;
-    }
+        s->io_q.idx = 0;
 
-    for (; i < len; i++) {
-        struct qemu_laiocb *laiocb =
-            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
-
-        laiocb->ret = (ret < 0) ? ret : -EIO;
-        if (laiocb->co != qemu_coroutine_self()) {
-            qemu_coroutine_enter(laiocb->co, NULL);
-        } else {
-            /* The return value is used for the currently active coroutine.
-             * We're always in ioq_enqueue() here, ioq_submit() never runs from
-             * a request's coroutine.*/
-            ret = laiocb->ret;
+        for (i = 0; i < len; i++) {
+            struct qemu_laiocb *laiocb =
+                container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
+
+            laiocb->ret = ret;
+            if (laiocb->co != qemu_coroutine_self()) {
+                qemu_coroutine_enter(laiocb->co, NULL);
+            } else {
+                /* The return value is used for the currently active coroutine.
+                 * We're always in ioq_enqueue() here, ioq_submit() never runs
+                 * from a queued request's coroutine.*/
+            }
         }
+        return ret;
     }
+
+    /* Remove submitted requests from the queue */
+    s->io_q.idx = len - ret;
+    if (s->io_q.idx) {
+        memmove(s->io_q.iocbs, s->io_q.iocbs + ret,
+                s->io_q.idx * sizeof(s->io_q.iocbs[0]));
+    }
+
     return ret;
 }
 
-- 
1.8.3.1

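The queue compaction that this patch adds after a partial io_submit() can be sketched in isolation. This is only a minimal model under stated assumptions, not the QEMU code: `demo_queue`, `demo_queue_consume` and `DEMO_QUEUE_SIZE` are hypothetical names, and plain ints stand in for the `struct iocb *` entries of the real queue.

```c
#include <assert.h>
#include <string.h>

#define DEMO_QUEUE_SIZE 8 /* hypothetical; the patch uses MAX_QUEUED_IO */

/* Hypothetical stand-in for LaioQueue, reduced to what the sketch needs */
struct demo_queue {
    int reqs[DEMO_QUEUE_SIZE]; /* stand-in for the iocb pointer array */
    unsigned int idx;          /* number of requests currently queued */
};

/* Drop the first 'submitted' entries and move the remaining ones to the
 * front of the array, in the same way as the memmove() in the patched
 * ioq_submit(). */
static void demo_queue_consume(struct demo_queue *q, unsigned int submitted)
{
    assert(submitted <= q->idx);

    q->idx -= submitted;
    if (q->idx) {
        memmove(q->reqs, q->reqs + submitted,
                q->idx * sizeof(q->reqs[0]));
    }
}
```

With five queued requests of which two were accepted, the three leftover requests end up at indices 0 to 2 in their original order, ready to be passed to the next submission attempt.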
* [Qemu-devel] [RFC PATCH v2 5/6] linux-aio: On -EAGAIN, wait for completions
From: Kevin Wolf @ 2014-12-05 16:06 UTC
To: qemu-devel; +Cc: kwolf, ming.lei, pl, mreitz, stefanha, pbonzini

Currently, if io_submit() returns -EAGAIN, we immediately retry three
times (which will likely still result in -EAGAIN) and then give up,
failing all requests in the queue.

Instead, keep the requests queued and try to process them as soon as
some earlier request completes. If our own queue is full and new
requests arrive, the new requests fail with -EAGAIN now (before, the
already queued ones would fail and the new ones would be queued).

Use the same logic even when the queue is not plugged. In this case
we'll not only queue the request, but immediately submit the queue. On
success, nothing changes, but in cases where the kernel returns -EAGAIN,
the requests are queued now instead of failing.

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
 block/linux-aio.c | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index 1406a71..373ec4b 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -167,9 +167,12 @@ static int ioq_submit(struct qemu_laio_state *s)
     int ret, i = 0;
     int len = s->io_q.idx;
 
-    do {
-        ret = io_submit(s->ctx, len, s->io_q.iocbs);
-    } while (i++ < 3 && ret == -EAGAIN);
+    ret = io_submit(s->ctx, len, s->io_q.iocbs);
+    if (ret == -EAGAIN) {
+        /* Leave the requests in the queue, we'll retry as soon as some request
+         * has completed */
+        return 0;
+    }
 
     /* On io_submit() failure, fail all requests in the queue */
     if (ret < 0) {
@@ -205,11 +208,15 @@ static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 {
     unsigned int idx = s->io_q.idx;
 
+    if (unlikely(idx == s->io_q.size)) {
+        return -EAGAIN;
+    }
+
     s->io_q.iocbs[idx++] = iocb;
     s->io_q.idx = idx;
 
-    /* submit immediately if queue is full */
-    if (idx == s->io_q.size) {
+    /* submit immediately if queue is not plugged or full */
+    if (!s->io_q.plugged || idx == s->io_q.size) {
         return ioq_submit(s);
     } else {
         return 0;
@@ -272,11 +279,7 @@ int laio_submit_co(BlockDriverState *bs, void *aio_ctx, int fd,
     }
     io_set_eventfd(&laiocb.iocb, event_notifier_get_fd(&s->e));
 
-    if (!s->io_q.plugged) {
-        ret = io_submit(s->ctx, 1, &iocbs);
-    } else {
-        ret = ioq_enqueue(s, iocbs);
-    }
+    ret = ioq_enqueue(s, iocbs);
     if (ret < 0) {
         return ret;
     }
-- 
1.8.3.1

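The -EAGAIN policy described in this patch can be modelled without libaio. The following is a rough sketch under stated assumptions, not the actual implementation: the kernel is replaced by a caller-supplied function pointer, coroutine wakeups are left out entirely, and all `demo_*` names as well as `DEMO_QUEUE_SIZE` are hypothetical.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>

#define DEMO_QUEUE_SIZE 4 /* hypothetical; the patch uses MAX_QUEUED_IO */

struct demo_queue {
    int reqs[DEMO_QUEUE_SIZE]; /* stand-in for the iocb pointer array */
    unsigned int idx;          /* number of requests currently queued */
    int plugged;
};

/* Stand-in for io_submit(): returns the number of accepted requests or a
 * negative errno value. */
typedef int (*demo_submit_fn)(const int *reqs, unsigned int n);

static int demo_submit(struct demo_queue *q, demo_submit_fn submit)
{
    int ret = submit(q->reqs, q->idx);

    if (ret == -EAGAIN) {
        /* Keep everything queued; a later completion triggers a retry */
        return 0;
    }
    if (ret < 0) {
        /* Any other error fails the whole queue */
        q->idx = 0;
        return ret;
    }

    /* Remove the accepted requests, keep the rest for the next attempt */
    assert((unsigned int)ret <= q->idx);
    q->idx -= (unsigned int)ret;
    if (q->idx) {
        memmove(q->reqs, q->reqs + ret, q->idx * sizeof(q->reqs[0]));
    }
    return ret;
}

static int demo_enqueue(struct demo_queue *q, int req, demo_submit_fn submit)
{
    if (q->idx == DEMO_QUEUE_SIZE) {
        return -EAGAIN; /* our own queue is full: the new request fails */
    }
    q->reqs[q->idx++] = req;

    /* Submit immediately if the queue is not plugged or has become full */
    if (!q->plugged || q->idx == DEMO_QUEUE_SIZE) {
        return demo_submit(q, submit);
    }
    return 0;
}

/* Two fake backends: one that is always busy, one that accepts everything */
static int demo_backend_busy(const int *reqs, unsigned int n)
{
    (void)reqs;
    (void)n;
    return -EAGAIN;
}

static int demo_backend_ok(const int *reqs, unsigned int n)
{
    (void)reqs;
    return (int)n;
}
```

With the busy backend, four enqueue calls succeed and leave four requests queued, and only the fifth fails with -EAGAIN because the local array is full. A later `demo_submit()` against the accepting backend drains the queue.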
* [Qemu-devel] [RFC PATCH v2 6/6] linux-aio: Queue requests instead of returning -EAGAIN
From: Kevin Wolf @ 2014-12-05 16:06 UTC
To: qemu-devel; +Cc: kwolf, ming.lei, pl, mreitz, stefanha, pbonzini

If the queue array for io_submit() is already full, but a new request
arrives, we cannot add it to that queue anymore. We can, however, use a
CoQueue, which is implemented as a list and can therefore queue as many
requests as we want.

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
 block/linux-aio.c | 31 ++++++++++++++++++++++++++-----
 1 file changed, 26 insertions(+), 5 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index 373ec4b..8e6328b 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -44,6 +44,7 @@ typedef struct {
     int plugged;
     unsigned int size;
     unsigned int idx;
+    CoQueue waiting;
 } LaioQueue;
 
 struct qemu_laio_state {
@@ -160,6 +161,8 @@ static void ioq_init(LaioQueue *io_q)
     io_q->size = MAX_QUEUED_IO;
     io_q->idx = 0;
     io_q->plugged = 0;
+
+    qemu_co_queue_init(&io_q->waiting);
 }
 
 static int ioq_submit(struct qemu_laio_state *s)
@@ -201,15 +204,29 @@ static int ioq_submit(struct qemu_laio_state *s)
                 s->io_q.idx * sizeof(s->io_q.iocbs[0]));
     }
 
+    /* Now there should be room for some more requests */
+    if (!qemu_co_queue_empty(&s->io_q.waiting)) {
+        if (qemu_in_coroutine()) {
+            qemu_co_queue_next(&s->io_q.waiting);
+        } else {
+            qemu_co_enter_next(&s->io_q.waiting);
+
+        }
+    }
+
     return ret;
 }
 
 static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 {
     unsigned int idx = s->io_q.idx;
+    bool was_waiting = false;
+    int ret = 0;
 
-    if (unlikely(idx == s->io_q.size)) {
-        return -EAGAIN;
+    while (unlikely(idx == s->io_q.size)) {
+        /* Wait for iocb slots to become free */
+        qemu_co_queue_wait(&s->io_q.waiting);
+        was_waiting = true;
     }
 
     s->io_q.iocbs[idx++] = iocb;
@@ -217,10 +234,14 @@ static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 
     /* submit immediately if queue is not plugged or full */
     if (!s->io_q.plugged || idx == s->io_q.size) {
-        return ioq_submit(s);
-    } else {
-        return 0;
+        ret = ioq_submit(s);
+    } else if (was_waiting) {
+        /* We were just woken up and there's still room in the queue. Wake up
+         * someone else, too. */
+        qemu_co_queue_next(&s->io_q.waiting);
     }
+
+    return ret;
 }
 
 void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
-- 
1.8.3.1

* Re: [Qemu-devel] [RFC PATCH v2 6/6] linux-aio: Queue requests instead of returning -EAGAIN
From: Paolo Bonzini @ 2014-12-05 18:15 UTC
To: Kevin Wolf, qemu-devel; +Cc: ming.lei, pl, stefanha, mreitz

On 05/12/2014 17:06, Kevin Wolf wrote:
> If the queue array for io_submit() is already full, but a new request
> arrives, we cannot add it to that queue anymore. We can, however, use a
> CoQueue, which is implemented as a list and can therefore queue as many
> requests as we want.
>
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> ---
>  block/linux-aio.c | 31 ++++++++++++++++++++++++++-----
>  1 file changed, 26 insertions(+), 5 deletions(-)
>
> diff --git a/block/linux-aio.c b/block/linux-aio.c
> index 373ec4b..8e6328b 100644
> --- a/block/linux-aio.c
> +++ b/block/linux-aio.c
> @@ -44,6 +44,7 @@ typedef struct {
>      int plugged;
>      unsigned int size;
>      unsigned int idx;
> +    CoQueue waiting;
>  } LaioQueue;
>
>  struct qemu_laio_state {
> @@ -160,6 +161,8 @@ static void ioq_init(LaioQueue *io_q)
>      io_q->size = MAX_QUEUED_IO;
>      io_q->idx = 0;
>      io_q->plugged = 0;
> +
> +    qemu_co_queue_init(&io_q->waiting);
>  }
>
>  static int ioq_submit(struct qemu_laio_state *s)
> @@ -201,15 +204,29 @@ static int ioq_submit(struct qemu_laio_state *s)
>                  s->io_q.idx * sizeof(s->io_q.iocbs[0]));
>      }
>
> +    /* Now there should be room for some more requests */
> +    if (!qemu_co_queue_empty(&s->io_q.waiting)) {
> +        if (qemu_in_coroutine()) {
> +            qemu_co_queue_next(&s->io_q.waiting);
> +        } else {
> +            qemu_co_enter_next(&s->io_q.waiting);

We should get better performance by wrapping these with plug/unplug.
Trivial for the qemu_co_enter_next case, much less for
qemu_co_queue_next...

This exposes what I think is the main wrinkle in these patches: I'm not
sure linux-aio is a great match for the coroutine architecture. You
introduce some infrastructure duplication with block.c to track
coroutines, and I don't find the coroutine code to be an improvement
over Ming Lei's asynchronous one---in fact I actually find it more
complicated.

Paolo

> +
> +        }
> +    }
> +
>      return ret;
>  }
>
>  static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>  {
>      unsigned int idx = s->io_q.idx;
> +    bool was_waiting = false;
> +    int ret = 0;
>
> -    if (unlikely(idx == s->io_q.size)) {
> -        return -EAGAIN;
> +    while (unlikely(idx == s->io_q.size)) {
> +        /* Wait for iocb slots to become free */
> +        qemu_co_queue_wait(&s->io_q.waiting);
> +        was_waiting = true;
>      }
>
>      s->io_q.iocbs[idx++] = iocb;
> @@ -217,10 +234,14 @@ static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>
>      /* submit immediately if queue is not plugged or full */
>      if (!s->io_q.plugged || idx == s->io_q.size) {
> -        return ioq_submit(s);
> -    } else {
> -        return 0;
> +        ret = ioq_submit(s);
> +    } else if (was_waiting) {
> +        /* We were just woken up and there's still room in the queue. Wake up
> +         * someone else, too. */
> +        qemu_co_queue_next(&s->io_q.waiting);
>      }
> +
> +    return ret;
>  }
>
>  void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
>

* Re: [Qemu-devel] [RFC PATCH v2 6/6] linux-aio: Queue requests instead of returning -EAGAIN 2014-12-05 18:15 ` Paolo Bonzini @ 2014-12-08 9:34 ` Kevin Wolf 2014-12-10 11:38 ` Paolo Bonzini 0 siblings, 1 reply; 10+ messages in thread From: Kevin Wolf @ 2014-12-08 9:34 UTC (permalink / raw) To: Paolo Bonzini; +Cc: ming.lei, pl, qemu-devel, stefanha, mreitz Am 05.12.2014 um 19:15 hat Paolo Bonzini geschrieben: > > > On 05/12/2014 17:06, Kevin Wolf wrote: > > If the queue array for io_submit() is already full, but a new request > > arrives, we cannot add it to that queue anymore. We can, however, use a > > CoQueue, which is implemented as a list and can therefore queue as many > > requests as we want. > > > > Signed-off-by: Kevin Wolf <kwolf@redhat.com> > > --- > > block/linux-aio.c | 31 ++++++++++++++++++++++++++----- > > 1 file changed, 26 insertions(+), 5 deletions(-) > > > > diff --git a/block/linux-aio.c b/block/linux-aio.c > > index 373ec4b..8e6328b 100644 > > --- a/block/linux-aio.c > > +++ b/block/linux-aio.c > > @@ -44,6 +44,7 @@ typedef struct { > > int plugged; > > unsigned int size; > > unsigned int idx; > > + CoQueue waiting; > > } LaioQueue; > > > > struct qemu_laio_state { > > @@ -160,6 +161,8 @@ static void ioq_init(LaioQueue *io_q) > > io_q->size = MAX_QUEUED_IO; > > io_q->idx = 0; > > io_q->plugged = 0; > > + > > + qemu_co_queue_init(&io_q->waiting); > > } > > > > static int ioq_submit(struct qemu_laio_state *s) > > @@ -201,15 +204,29 @@ static int ioq_submit(struct qemu_laio_state *s) > > s->io_q.idx * sizeof(s->io_q.iocbs[0])); > > } > > > > + /* Now there should be room for some more requests */ > > + if (!qemu_co_queue_empty(&s->io_q.waiting)) { > > + if (qemu_in_coroutine()) { > > + qemu_co_queue_next(&s->io_q.waiting); > > + } else { > > + qemu_co_enter_next(&s->io_q.waiting); > > We should get better performance by wrapping these with > plug/unplug. Trivial for the qemu_co_enter_next case, much less for > qemu_co_queue_next... 
We can probably just use qemu_co_enter_next() everywhere. The only reason why I put a qemu_co_queue_next() there was that it saves a coroutine switch - probably premature optimisation anyway... > This exposes what I think is the main wrinkle in these patches: I'm not > sure linux-aio is a great match for the coroutine architecture. You > introduce some infrastructure duplication with block.c to track > coroutines, and I don't find the coroutine code to be an improvement > over Ming Lei's asynchronous one---in fact I actually find it more > complicated. Really? I found the callback-based one that introduces new BHs and an additional state for a queue that is being aborted (which must be considered everywhere) really ugly, and the resulting code from this coroutine-based series rather clean. I honestly expected that people would debate whether it does the right thing, but that nobody would disagree that it looks nicer - but maybe it's a matter of taste. Also note that this specific patch is doing an additional step that isn't part of Ming's series: Ming's series simply lets requests fail if the queue is full. Also, regardless of that (though I find readability important), my benchmarks seem to suggest that without this conversion, the other optimisations in the queue don't work that well. The fastest performance I've seen so far - including both coroutine and callback based versions - has this conversion applied (measured without patches 4-6 yet, though). Kevin ^ permalink raw reply [flat|nested] 10+ messages in thread
* Re: [Qemu-devel] [RFC PATCH v2 6/6] linux-aio: Queue requests instead of returning -EAGAIN 2014-12-08 9:34 ` Kevin Wolf @ 2014-12-10 11:38 ` Paolo Bonzini 0 siblings, 0 replies; 10+ messages in thread From: Paolo Bonzini @ 2014-12-10 11:38 UTC (permalink / raw) To: Kevin Wolf; +Cc: ming.lei, pl, qemu-devel, stefanha, mreitz On 08/12/2014 10:34, Kevin Wolf wrote: > Also note that this specific patch is doing an additional step that > isn't part of Ming's series: Ming's series simply lets requests fail if > the queue is full. That's true. Perhaps I was just exposed a lot to his code. > I honestly expected that people > would debate whether it does the right thing I'm pretty sure that it does the right thing (minus the comment about plug/unplug). Paolo ^ permalink raw reply [flat|nested] 10+ messages in thread
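The plug/unplug remark that stays open at the end of this exchange is about batching: while the queue is plugged, enqueued requests accumulate and go out in one submission at unplug time, rather than one submission per request. A minimal sketch of that effect follows, assuming a simple nesting counter; the names (PlugSketch, plug_sketch_*) are hypothetical and this is not the laio_io_plug()/laio_io_unplug() code from block/linux-aio.c.

```c
#include <assert.h>

typedef struct {
    int plugged;               /* nesting depth of plug calls */
    unsigned int pending;      /* requests queued but not yet submitted */
    unsigned int submit_calls; /* number of batched submissions issued */
} PlugSketch;

/* Issue one batched submission if anything is pending. */
static void plug_sketch_flush(PlugSketch *p)
{
    if (p->pending > 0) {
        p->submit_calls++;
        p->pending = 0;
    }
}

static void plug_sketch_plug(PlugSketch *p)
{
    p->plugged++;
}

/* Dropping the last plug level flushes whatever accumulated. */
static void plug_sketch_unplug(PlugSketch *p)
{
    assert(p->plugged > 0);
    if (--p->plugged == 0) {
        plug_sketch_flush(p);
    }
}

/* Queue one request; submit right away only when not plugged. */
static void plug_sketch_enqueue(PlugSketch *p)
{
    p->pending++;
    if (p->plugged == 0) {
        plug_sketch_flush(p);
    }
}
```

Three unplugged enqueues cost three submissions in this model, while three enqueues between a plug and an unplug cost one. Whether the same saving applies when waking waiters from ioq_submit() depends on details of the real code that the sketch does not capture.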