* [Qemu-devel] [RFC PATCH v3 0/4] block-queue: Delay and batch metadata writes
@ 2010-11-30 12:48 Kevin Wolf
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 1/4] block: Implement bdrv_aio_pwrite Kevin Wolf
` (3 more replies)
0 siblings, 4 replies; 9+ messages in thread
From: Kevin Wolf @ 2010-11-30 12:48 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, stefanha
block-queue itself looks more or less ready to me now; what remains is
possible bug fixes and addressing review comments. So please give the
first three patches a good review. Apart from such fixes, they are in
their final version.
This series is only marked RFC because proper qcow2 integration is
missing and merging block-queue doesn't make much sense without a user.
Patch 4/4 contains a hackish version of qcow2 integration which I think
should fully work (I think I addressed Stefan's comment), but is too
ugly to be merged.
Kevin Wolf (4):
block: Implement bdrv_aio_pwrite
Add block-queue
Test cases for block-queue
qcow2: Preliminary block-queue support
Makefile | 1 +
Makefile.objs | 2 +-
block-queue.c | 736 ++++++++++++++++++++++++++++++++++++++++++++++++
block-queue.h | 61 ++++
block.c | 167 +++++++++++
block.h | 2 +
block/qcow2-cluster.c | 39 ++--
block/qcow2-refcount.c | 62 ++--
block/qcow2.c | 83 ++++++-
block/qcow2.h | 5 +
check-block-queue.c | 402 ++++++++++++++++++++++++++
cpus.c | 8 +-
qemu-common.h | 3 +
qemu-tool.c | 5 +
sysemu.h | 1 -
15 files changed, 1521 insertions(+), 56 deletions(-)
create mode 100644 block-queue.c
create mode 100644 block-queue.h
create mode 100644 check-block-queue.c
--
1.7.2.3
^ permalink raw reply [flat|nested] 9+ messages in thread
* [Qemu-devel] [RFC PATCH v3 1/4] block: Implement bdrv_aio_pwrite
2010-11-30 12:48 [Qemu-devel] [RFC PATCH v3 0/4] block-queue: Delay and batch metadata writes Kevin Wolf
@ 2010-11-30 12:48 ` Kevin Wolf
2010-12-02 12:07 ` [Qemu-devel] " Stefan Hajnoczi
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 2/4] Add block-queue Kevin Wolf
` (2 subsequent siblings)
3 siblings, 1 reply; 9+ messages in thread
From: Kevin Wolf @ 2010-11-30 12:48 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, stefanha
This implements an asynchronous version of bdrv_pwrite.
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
block.c | 167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
block.h | 2 +
2 files changed, 169 insertions(+), 0 deletions(-)
diff --git a/block.c b/block.c
index 63effd8..f10066e 100644
--- a/block.c
+++ b/block.c
@@ -2106,6 +2106,173 @@ BlockDriverAIOCB *bdrv_aio_writev(BlockDriverState *bs, int64_t sector_num,
return ret;
}
+typedef struct PwriteAIOCB { /* Per-request state of bdrv_aio_pwrite() */
+ BlockDriverAIOCB common;
+ int state; /* Current step (0-5) of the state machine in bdrv_aio_pwrite_cb() */
+ int64_t offset; /* Byte offset of the data that still has to be written */
+ size_t bytes; /* Number of bytes that still have to be written */
+ uint8_t* buf; /* Caller's data; advanced as parts of it are submitted */
+ uint8_t* tmp_buf; /* One-sector bounce buffer for an unaligned head/tail; NULL until allocated */
+ struct iovec iov; /* Single iovec describing the current sub-request */
+ QEMUIOVector qiov; /* Wraps iov; passed to bdrv_aio_readv/bdrv_aio_writev */
+} PwriteAIOCB;
+
+/*
+ * Cancel hook of the bdrv_aio_pwrite() AIOCB pool. The request itself is not
+ * looked at; the function only calls qemu_aio_flush().
+ */
+static void pwrite_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    (void) blockacb;
+
+    qemu_aio_flush();
+}
+
+static AIOPool blkqueue_aio_pool = { /* Pool that bdrv_aio_pwrite() takes its AIOCBs from.
+ * NOTE(review): despite the blkqueue_ prefix, the entries are PwriteAIOCBs;
+ * the name looks like a leftover and may deserve a rename - confirm. */
+ .aiocb_size = sizeof(PwriteAIOCB),
+ .cancel = pwrite_aio_cancel,
+};
+
+/*
+ * Completion callback and state machine that implements bdrv_aio_pwrite().
+ *
+ * opaque is the PwriteAIOCB of the request, ret the result of the previous
+ * step (0 for the initial call). A negative ret ends the request immediately.
+ *
+ * States:
+ *   0: read the first sector if the offset is not sector aligned
+ *   1: merge the new data into the first sector and write it back
+ *   2: write all complete sectors directly from the caller's buffer
+ *   3: read the last sector if a partial sector remains
+ *   4: merge the remaining data into the last sector and write it back
+ *   5: done
+ *
+ * When the request ends, the bounce buffer is freed, the caller's callback is
+ * invoked with ret and the AIOCB is released.
+ */
+static void bdrv_aio_pwrite_cb(void *opaque, int ret)
+{
+    PwriteAIOCB *acb = opaque;
+    BlockDriverAIOCB *tmp_acb;
+    int64_t sector_num;
+
+    if (ret < 0) {
+        goto done;
+    }
+
+    /* Computed before any state below advances acb->offset */
+    sector_num = acb->offset >> BDRV_SECTOR_BITS;
+
+    switch (acb->state) {
+    case 0: {
+        /* Read first sector if needed */
+        int len;
+
+        /* Bytes up to the next sector boundary; 0 if offset is aligned */
+        len = (BDRV_SECTOR_SIZE - acb->offset) & (BDRV_SECTOR_SIZE - 1);
+
+        if (len > 0) {
+            acb->state = 1;
+            acb->tmp_buf = qemu_blockalign(acb->common.bs, BDRV_SECTOR_SIZE);
+            acb->iov.iov_base = acb->tmp_buf;
+            acb->iov.iov_len = BDRV_SECTOR_SIZE;
+            qemu_iovec_init_external(&acb->qiov, &acb->iov, 1);
+            tmp_acb = bdrv_aio_readv(acb->common.bs, sector_num, &acb->qiov, 1,
+                bdrv_aio_pwrite_cb, acb);
+            if (tmp_acb == NULL) {
+                bdrv_aio_pwrite_cb(acb, -EIO);
+            }
+        } else {
+            acb->state = 2;
+            bdrv_aio_pwrite_cb(acb, 0);
+        }
+        break;
+    }
+
+    case 1: {
+        /* Modify first sector and write it back */
+        int len;
+
+        len = (BDRV_SECTOR_SIZE - acb->offset) & (BDRV_SECTOR_SIZE - 1);
+        if (len > acb->bytes) {
+            len = acb->bytes;
+        }
+
+        memcpy(acb->tmp_buf + (acb->offset & (BDRV_SECTOR_SIZE - 1)),
+               acb->buf, len);
+
+        acb->state = 2;
+        acb->offset += len;
+        acb->buf += len;
+        acb->bytes -= len;
+
+        tmp_acb = bdrv_aio_writev(acb->common.bs, sector_num, &acb->qiov, 1,
+            bdrv_aio_pwrite_cb, acb);
+        if (tmp_acb == NULL) {
+            bdrv_aio_pwrite_cb(acb, -EIO);
+        }
+        break;
+    }
+
+    case 2: {
+        /* Write the sectors "in place" */
+        int nb_sectors = acb->bytes >> BDRV_SECTOR_BITS;
+
+        acb->state = 3;
+        if (nb_sectors > 0) {
+            int len = nb_sectors << BDRV_SECTOR_BITS;
+
+            acb->iov.iov_base = acb->buf;
+            acb->iov.iov_len = len;
+            qemu_iovec_init_external(&acb->qiov, &acb->iov, 1);
+
+            acb->offset += len;
+            acb->buf += len;
+            acb->bytes -= len;
+
+            tmp_acb = bdrv_aio_writev(acb->common.bs, sector_num, &acb->qiov,
+                nb_sectors, bdrv_aio_pwrite_cb, acb);
+            if (tmp_acb == NULL) {
+                bdrv_aio_pwrite_cb(acb, -EIO);
+            }
+        } else {
+            bdrv_aio_pwrite_cb(acb, 0);
+        }
+        break;
+    }
+
+    case 3: {
+        /* Read last sector if needed */
+        if (acb->bytes == 0) {
+            goto done;
+        }
+
+        /*
+         * The bounce buffer only exists if state 0 had to read an unaligned
+         * first sector. For a write that starts aligned but ends in the
+         * middle of a sector it must be allocated here.
+         */
+        if (acb->tmp_buf == NULL) {
+            acb->tmp_buf = qemu_blockalign(acb->common.bs, BDRV_SECTOR_SIZE);
+        }
+
+        acb->state = 4;
+        acb->iov.iov_base = acb->tmp_buf;
+        acb->iov.iov_len = BDRV_SECTOR_SIZE;
+        qemu_iovec_init_external(&acb->qiov, &acb->iov, 1);
+        tmp_acb = bdrv_aio_readv(acb->common.bs, sector_num, &acb->qiov, 1,
+            bdrv_aio_pwrite_cb, acb);
+        if (tmp_acb == NULL) {
+            bdrv_aio_pwrite_cb(acb, -EIO);
+        }
+        break;
+    }
+
+    case 4:
+        /* Modify and write last sector (qiov still points to tmp_buf) */
+        acb->state = 5;
+        memcpy(acb->tmp_buf, acb->buf, acb->bytes);
+        tmp_acb = bdrv_aio_writev(acb->common.bs, sector_num, &acb->qiov, 1,
+            bdrv_aio_pwrite_cb, acb);
+        if (tmp_acb == NULL) {
+            bdrv_aio_pwrite_cb(acb, -EIO);
+        }
+        break;
+
+    case 5:
+        goto done;
+    }
+    return;
+
+done:
+    /* tmp_buf comes from qemu_blockalign(), so it is returned via qemu_vfree() */
+    if (acb->tmp_buf != NULL) {
+        qemu_vfree(acb->tmp_buf);
+    }
+    acb->common.cb(acb->common.opaque, ret);
+    qemu_aio_release(acb);
+}
+
+/*
+ * Asynchronous version of bdrv_pwrite: writes 'bytes' bytes from 'buf' to
+ * byte offset 'offset' of bs and reports the result through cb(opaque, ret).
+ *
+ * The work is done by the state machine in bdrv_aio_pwrite_cb(), which is
+ * entered directly from here.
+ *
+ * NOTE(review): if a step fails to submit, the state machine completes and
+ * releases the AIOCB before this function returns, so the returned pointer
+ * may already be stale in that case - confirm how callers use it.
+ */
+BlockDriverAIOCB *bdrv_aio_pwrite(BlockDriverState *bs, int64_t offset,
+    void* buf, size_t bytes, BlockDriverCompletionFunc *cb, void *opaque)
+{
+    PwriteAIOCB *request = qemu_aio_get(&blkqueue_aio_pool, bs, cb, opaque);
+
+    request->buf = buf;
+    request->offset = offset;
+    request->bytes = bytes;
+    request->tmp_buf = NULL;
+    request->state = 0;
+
+    /* Start with state 0 and a "successful" previous step */
+    bdrv_aio_pwrite_cb(request, 0);
+
+    return &request->common;
+}
+
typedef struct MultiwriteCB {
int error;
diff --git a/block.h b/block.h
index 78ecfac..c6e4d90 100644
--- a/block.h
+++ b/block.h
@@ -116,6 +116,8 @@ BlockDriverAIOCB *bdrv_aio_readv(BlockDriverState *bs, int64_t sector_num,
BlockDriverAIOCB *bdrv_aio_writev(BlockDriverState *bs, int64_t sector_num,
QEMUIOVector *iov, int nb_sectors,
BlockDriverCompletionFunc *cb, void *opaque);
+BlockDriverAIOCB *bdrv_aio_pwrite(BlockDriverState *bs, int64_t offset, void* buf,
+ size_t bytes, BlockDriverCompletionFunc *cb, void *opaque);
BlockDriverAIOCB *bdrv_aio_flush(BlockDriverState *bs,
BlockDriverCompletionFunc *cb, void *opaque);
void bdrv_aio_cancel(BlockDriverAIOCB *acb);
--
1.7.2.3
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [Qemu-devel] [RFC PATCH v3 2/4] Add block-queue
2010-11-30 12:48 [Qemu-devel] [RFC PATCH v3 0/4] block-queue: Delay and batch metadata writes Kevin Wolf
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 1/4] block: Implement bdrv_aio_pwrite Kevin Wolf
@ 2010-11-30 12:48 ` Kevin Wolf
2010-12-03 9:44 ` [Qemu-devel] " Stefan Hajnoczi
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 3/4] Test cases for block-queue Kevin Wolf
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 4/4] qcow2: Preliminary block-queue support Kevin Wolf
3 siblings, 1 reply; 9+ messages in thread
From: Kevin Wolf @ 2010-11-30 12:48 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, stefanha
Instead of directly executing writes and fsyncs, queue them and execute them
asynchronously. What makes this interesting is that we can delay syncs and if
multiple syncs occur, we can merge them into one bdrv_flush.
A typical sequence in qcow2 (simple cluster allocation) looks like this:
1. Update refcount table
2. bdrv_flush
3. Update L2 entry
If we delay the operation and get three of these sequences queued before
actually executing, we end up with the following result, saving two syncs:
1. Update refcount table (req 1)
2. Update refcount table (req 2)
3. Update refcount table (req 3)
4. bdrv_flush
5. Update L2 entry (req 1)
6. Update L2 entry (req 2)
7. Update L2 entry (req 3)
This patch only commits a sync if either the guest has requested a flush or if
a certain number of requests are in the queue, so usually we batch more than
just three requests.
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
Makefile.objs | 2 +-
block-queue.c | 736 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
block-queue.h | 61 +++++
3 files changed, 798 insertions(+), 1 deletions(-)
create mode 100644 block-queue.c
create mode 100644 block-queue.h
diff --git a/Makefile.objs b/Makefile.objs
index 23b17ce..76495f3 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -14,7 +14,7 @@ oslib-obj-$(CONFIG_POSIX) += oslib-posix.o
# block-obj-y is code used by both qemu system emulation and qemu-img
block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
-block-obj-y += nbd.o block.o aio.o aes.o qemu-config.o
+block-obj-y += nbd.o block.o aio.o aes.o qemu-config.o block-queue.o
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
diff --git a/block-queue.c b/block-queue.c
new file mode 100644
index 0000000..235c69b
--- /dev/null
+++ b/block-queue.c
@@ -0,0 +1,736 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2010 Kevin Wolf <kwolf@redhat.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "qemu-common.h"
+#include "qemu-queue.h"
+#include "block_int.h"
+#include "block-queue.h"
+
+//#define BLKQUEUE_DEBUG
+
+#ifdef BLKQUEUE_DEBUG
+#define DPRINTF(fmt, ...) fprintf(stderr, fmt, ##__VA_ARGS__)
+#else
+#define DPRINTF(...) do {} while(0)
+#endif
+
+#define WRITEBACK_MODES (BDRV_O_NOCACHE | BDRV_O_CACHE_WB)
+
+enum blkqueue_req_type { /* Kind of a BlockQueueRequest */
+ REQ_TYPE_WRITE, /* Data write; submitted with bdrv_aio_pwrite() */
+ REQ_TYPE_BARRIER, /* Ordering point; submitted with bdrv_aio_flush() */
+};
+
+typedef struct BlockQueueAIOCB { /* AIOCB handed out by blkqueue_aio_flush() */
+ BlockDriverAIOCB common;
+ QLIST_ENTRY(BlockQueueAIOCB) link; /* Entry in the acbs list of the barrier request it waits for */
+} BlockQueueAIOCB;
+
+typedef struct BlockQueueRequest { /* One queued write or barrier */
+ enum blkqueue_req_type type;
+ BlockQueue* bq; /* Queue this request belongs to */
+
+ uint64_t offset; /* Byte offset of the write (not set for barriers) */
+ void* buf; /* Private copy of the data to write; NULL for barriers */
+ uint64_t size; /* Length of the write in bytes (not set for barriers) */
+ unsigned section; /* Section the request belongs to; a barrier closes its section */
+
+ QLIST_HEAD(, BlockQueueAIOCB) acbs; /* Flush callbacks to call when the request completes */
+
+ QTAILQ_ENTRY(BlockQueueRequest) link; /* Entry in bq->queue or bq->in_flight */
+ QSIMPLEQ_ENTRY(BlockQueueRequest) link_section; /* Entry in bq->sections (barriers only) */
+} BlockQueueRequest;
+
+QTAILQ_HEAD(bq_queue_head, BlockQueueRequest);
+
+struct BlockQueue { /* State of one write queue on top of a BlockDriverState */
+ BlockDriverState* bs; /* Image that the queued requests are submitted to */
+
+ int barriers_requested; /* Incremented on every insert_barrier() call */
+ int barriers_submitted; /* Incremented when a new (unmerged) barrier is queued */
+ int queue_size; /* Number of requests counted as being in 'queue' */
+ int flushing; /* 1 while blkqueue_flush() runs; set to the error code by blkqueue_fail_flush() */
+
+ BlockQueueErrorHandler error_handler; /* Called when a submitted request fails */
+ void* error_opaque; /* Passed to error_handler */
+ int error_ret; /* Error of a failed request; 0 if none is pending */
+
+ unsigned int in_flight_num; /* Number of requests in 'in_flight' */
+ enum blkqueue_req_type in_flight_type; /* Type of the most recently submitted request */
+
+ struct bq_queue_head queue; /* Requests waiting to be submitted, in order */
+ struct bq_queue_head in_flight; /* Submitted requests that have not completed yet */
+
+ QSIMPLEQ_HEAD(, BlockQueueRequest) sections; /* The barrier requests that are in 'queue' */
+};
+
+typedef int (*blkqueue_rw_fn)(BlockQueueContext *context, uint64_t offset,
+ void *buf, uint64_t size);
+typedef void (*blkqueue_handle_overlap)(void *new, void *old, size_t size);
+
+static void blkqueue_process_request(BlockQueue *bq);
+static void blkqueue_aio_cancel(BlockDriverAIOCB *blockacb);
+
+static AIOPool blkqueue_aio_pool = { /* Pool for the AIOCBs returned by blkqueue_aio_flush() */
+ .aiocb_size = sizeof(struct BlockQueueAIOCB),
+ .cancel = blkqueue_aio_cancel,
+};
+
+/*
+ * Allocates a zero-initialized, empty block queue for bs.
+ *
+ * error_handler and error_opaque are stored in the queue; the handler is
+ * called with error_opaque when a submitted request fails.
+ */
+BlockQueue *blkqueue_create(BlockDriverState *bs,
+    BlockQueueErrorHandler error_handler, void *error_opaque)
+{
+    BlockQueue *queue = qemu_mallocz(sizeof(*queue));
+
+    QTAILQ_INIT(&queue->queue);
+    QTAILQ_INIT(&queue->in_flight);
+    QSIMPLEQ_INIT(&queue->sections);
+
+    queue->bs = bs;
+    queue->error_opaque = error_opaque;
+    queue->error_handler = error_handler;
+
+    return queue;
+}
+
+/* Binds context to bq and makes it start in section 0. */
+void blkqueue_init_context(BlockQueueContext* context, BlockQueue *bq)
+{
+    context->section = 0;
+    context->bq = bq;
+}
+
+/*
+ * Flushes bq and frees it. The result of the flush is not checked; all three
+ * lists are asserted to be empty before the queue is freed.
+ */
+void blkqueue_destroy(BlockQueue *bq)
+{
+    blkqueue_flush(bq);
+
+    DPRINTF("blkqueue_destroy: %d/%d barriers left\n",
+        bq->barriers_submitted, bq->barriers_requested);
+
+    assert(!QTAILQ_FIRST(&bq->in_flight));
+    assert(!QTAILQ_FIRST(&bq->queue));
+    assert(!QSIMPLEQ_FIRST(&bq->sections));
+
+    qemu_free(bq);
+}
+
+/*
+ * Returns true if no request is waiting in bq->queue. Requests that are
+ * already in flight are not considered.
+ */
+bool blkqueue_is_empty(BlockQueue *bq)
+{
+    return !QTAILQ_FIRST(&bq->queue);
+}
+
+/*
+ * Checks if a new read/write request accesses a region that is written by a
+ * write request in the queue. If so, call the given overlap handler that can
+ * use memcpy to work on the queue instead of accessing the disk.
+ *
+ * context:        its section is raised to the section of every queued write
+ *                 that overlaps the new request
+ * queue:          list of requests to check; it is scanned newest first
+ * _offset, _buf,
+ * _size:          describe the new request. They are only updated when false
+ *                 is returned and then describe the part that the caller
+ *                 still has to process.
+ * recurse:        called for the part behind a queued write that lies
+ *                 completely inside the new request
+ * handle_overlap: called as handle_overlap(new data, queued data, length) for
+ *                 each overlapping range
+ * min_section:    queued requests of an earlier section are ignored
+ *
+ * Returns true if the new request is handled completely, false if the caller
+ * needs to continue accessing other queues or the disk.
+ */
+static bool blkqueue_check_queue_overlap(BlockQueueContext *context,
+ struct bq_queue_head *queue, uint64_t *_offset, void **_buf,
+ uint64_t *_size,
+ blkqueue_rw_fn recurse, blkqueue_handle_overlap handle_overlap,
+ int min_section)
+{
+ BlockQueueRequest *req;
+
+ uint64_t offset = *_offset;
+ void *buf = *_buf;
+ uint64_t size = *_size;
+
+ /* Reverse order to access most current data */
+ QTAILQ_FOREACH_REVERSE(req, queue, bq_queue_head, link) {
+ uint64_t end = offset + size;
+ uint64_t req_end = req->offset + req->size;
+ uint8_t *read_buf = buf;
+ uint8_t *req_buf = req->buf;
+
+ /* We're only interested in queued writes */
+ if (req->type != REQ_TYPE_WRITE) {
+ continue;
+ }
+
+ /* Ignore requests that are too early (needed for merging requests) */
+ if (req->section < min_section) {
+ continue;
+ }
+
+ /*
+ * If we read from a write in the queue (i.e. our read overlaps the
+ * write request), our next write probably depends on this write, so
+ * let's move forward to its section.
+ *
+ * If we're processing a new write, we definitely have a dependency,
+ * because we must not overwrite the newer data by the older one.
+ */
+ if (end > req->offset && offset < req_end) {
+ context->section = MAX(context->section, req->section);
+ }
+
+ /* How we continue, depends on the kind of overlap we have */
+ if ((offset >= req->offset) && (end <= req_end)) {
+ /* Completely contained in the queued request */
+ handle_overlap(buf, &req_buf[offset - req->offset], size);
+ return true;
+ } else if ((end >= req->offset) && (end <= req_end)) {
+ /* Overlap in the end of the new request; keep only the part before it */
+ assert(offset < req->offset);
+ handle_overlap(&read_buf[req->offset - offset], req_buf,
+ end - req->offset);
+ size = req->offset - offset;
+ } else if ((offset >= req->offset) && (offset < req_end)) {
+ /* Overlap in the start of the new request; keep only the part after it */
+ assert(end > req_end);
+ handle_overlap(read_buf, &req_buf[offset - req->offset],
+ req_end - offset);
+ buf = read_buf = &read_buf[req_end - offset];
+ offset = req_end;
+ size = end - req_end;
+ } else if ((req->offset >= offset) && (req_end <= end)) {
+ /*
+ * The queued request is completely contained in the new request.
+ * Use memcpy for the data from the queued request here, continue
+ * with the data before the queued request and handle the data
+ * after the queued request with a recursive call.
+ */
+ handle_overlap(&read_buf[req->offset - offset], req_buf,
+ req_end - req->offset);
+ size = req->offset - offset;
+ recurse(context, req_end, &read_buf[req_end - offset],
+ end - req_end);
+ }
+ }
+
+ /* The caller must continue with the request */
+ *_offset = offset;
+ *_buf = buf;
+ *_size = size;
+
+ return false;
+}
+
+static void pread_handle_overlap(void *new, void *old, size_t size)
+{
+ memcpy(new, old, size); /* Read: copy the queued data (old) into the reader's buffer (new) */
+}
+
+/*
+ * Reads like bdrv_pread, but takes pending writes into account, so that a
+ * caller that uses blkqueue_pread/pwrite instead of bdrv_pread/pwrite sees
+ * consistent data.
+ *
+ * Returns 0 on success and the negative bdrv_pread result on failure.
+ */
+int blkqueue_pread(BlockQueueContext *context, uint64_t offset, void *buf,
+    uint64_t size)
+{
+    BlockQueue *bq = context->bq;
+    bool served_from_queue;
+    int ret;
+
+    /*
+     * The newest data is in bq->queue. Only if that doesn't cover the whole
+     * request, look at the requests that are submitted but not completed.
+     */
+    served_from_queue =
+        blkqueue_check_queue_overlap(context, &bq->queue, &offset, &buf,
+            &size, &blkqueue_pread, &pread_handle_overlap, 0)
+        || blkqueue_check_queue_overlap(context, &bq->in_flight, &offset,
+            &buf, &size, &blkqueue_pread, &pread_handle_overlap, 0);
+
+    if (served_from_queue) {
+        return 0;
+    }
+
+    /* Whatever is left was not found in the queues, read it from disk */
+    ret = bdrv_pread(bq->bs, offset, buf, size);
+
+    return (ret < 0) ? ret : 0;
+}
+
+/*
+ * Overlap handler for writes: replaces the data of a queued write (old) with
+ * the data of the new write (new).
+ */
+static void pwrite_handle_overlap(void *new, void *old, size_t size)
+{
+    /* size is a size_t, so it must be printed with %zu rather than %ld */
+    DPRINTF("update pwrite: %p <- %p [%zu]\n", old, new, size);
+    memcpy(old, new, size);
+}
+
+/*
+ * Adds a write request to the queue.
+ *
+ * For writethrough images the queue is bypassed and the result of bdrv_pwrite
+ * is returned. Otherwise the data is first merged into overlapping writes of
+ * the current or a later section that are still queued. What remains is
+ * copied into a new request, which is inserted before the barrier that closes
+ * the context's section (or at the end of the queue if there is none).
+ *
+ * Returns 0 if the write has been merged or queued.
+ */
+int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
+    uint64_t size)
+{
+    BlockQueue *bq = context->bq;
+    BlockQueueRequest *section_req;
+    BlockQueueRequest *req;
+    bool completed;
+
+    /* Don't use the queue for writethrough images */
+    if ((bq->bs->open_flags & WRITEBACK_MODES) == 0) {
+        return bdrv_pwrite(bq->bs, offset, buf, size);
+    }
+
+    /* First check if there are any pending writes for the same data. */
+    DPRINTF("-- pwrite: [%#" PRIx64 " + %" PRIu64 "]\n", offset, size);
+    completed = blkqueue_check_queue_overlap(context, &bq->queue, &offset,
+        &buf, &size, &blkqueue_pwrite, &pwrite_handle_overlap,
+        context->section);
+
+    if (completed) {
+        return 0;
+    }
+
+    /* Create request structure; it gets its own copy of the data */
+    req = qemu_malloc(sizeof(*req));
+    QLIST_INIT(&req->acbs);
+    req->type = REQ_TYPE_WRITE;
+    req->bq = bq;
+    req->offset = offset;
+    req->size = size;
+    req->buf = qemu_malloc(size);
+    req->section = context->section;
+    memcpy(req->buf, buf, size);
+
+    /*
+     * Find the right place to insert it into the queue:
+     * Right before the barrier that closes the current section.
+     */
+    QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
+        if (section_req->section >= req->section) {
+            req->section = section_req->section;
+            context->section = section_req->section;
+            QTAILQ_INSERT_BEFORE(section_req, req, link);
+            goto out;
+        }
+    }
+
+    /* If there was no barrier, just put it at the end. */
+    QTAILQ_INSERT_TAIL(&bq->queue, req, link);
+
+out:
+    DPRINTF("queue-ins pwrite: %p [%#" PRIx64 " + %" PRIu64 "]\n",
+        req, req->offset, req->size);
+    bq->queue_size++;
+#ifndef RUN_TESTS
+    blkqueue_process_request(bq);
+#endif
+
+    return 0;
+}
+
+/*
+ * Queues a barrier for the section of context, or merges the request with a
+ * barrier that is already queued, and moves the context to the next section.
+ *
+ * If acb is given, it is attached to the barrier so that its callback is
+ * called when the barrier has completed. Always returns 0.
+ */
+static int insert_barrier(BlockQueueContext *context, BlockQueueAIOCB *acb)
+{
+    BlockQueue *bq = context->bq;
+    BlockQueueRequest *barrier = NULL;
+    BlockQueueRequest *candidate;
+
+    bq->barriers_requested++;
+
+    /* Look for an existing barrier of this or a later section to merge with */
+    QSIMPLEQ_FOREACH(candidate, &bq->sections, link_section) {
+        if (candidate->section < context->section) {
+            continue;
+        }
+
+        /*
+         * With an acb the caller wants to be notified once the complete queue
+         * has been processed, so only a barrier that is the very last request
+         * in the queue qualifies.
+         */
+        if (acb && QTAILQ_NEXT(candidate, link)) {
+            continue;
+        }
+
+        barrier = candidate;
+        break;
+    }
+
+    if (barrier != NULL) {
+        /* Merged: continue in the section after the existing barrier */
+        context->section = barrier->section + 1;
+    } else {
+        /* Nothing to merge with, append a new barrier to the queue */
+        barrier = qemu_malloc(sizeof(*barrier));
+        QLIST_INIT(&barrier->acbs);
+        barrier->type = REQ_TYPE_BARRIER;
+        barrier->bq = bq;
+        barrier->section = context->section;
+        barrier->buf = NULL;
+
+        DPRINTF("queue-ins flush: %p\n", barrier);
+        QTAILQ_INSERT_TAIL(&bq->queue, barrier, link);
+        QSIMPLEQ_INSERT_TAIL(&bq->sections, barrier, link_section);
+        bq->queue_size++;
+        context->section++;
+
+        bq->barriers_submitted++;
+    }
+
+    /* Have the callback called when the (new or merged) barrier completes */
+    if (acb) {
+        QLIST_INSERT_HEAD(&barrier->acbs, acb, link);
+    }
+
+#ifndef RUN_TESTS
+    blkqueue_process_request(bq);
+#endif
+
+    return 0;
+}
+
+/*
+ * Adds a barrier request to the queue.
+ *
+ * The barrier only orders the requests of the given context; it does not
+ * order requests globally. Writethrough images don't use the queue, so the
+ * function does nothing for them and returns 0.
+ */
+int blkqueue_barrier(BlockQueueContext *context)
+{
+    if (context->bq->bs->open_flags & WRITEBACK_MODES) {
+        return insert_barrier(context, NULL);
+    }
+
+    return 0;
+}
+
+/*
+ * Takes the first request off the queue and returns it, or returns NULL if
+ * the queue is empty. A barrier is removed from the section list as well.
+ */
+static BlockQueueRequest *blkqueue_pop(BlockQueue *bq)
+{
+    BlockQueueRequest *first = QTAILQ_FIRST(&bq->queue);
+
+    if (first != NULL) {
+        QTAILQ_REMOVE(&bq->queue, first, link);
+        bq->queue_size--;
+
+        if (first->type == REQ_TYPE_BARRIER) {
+            /* The first barrier in the queue heads the section list */
+            assert(QSIMPLEQ_FIRST(&bq->sections) == first);
+            QSIMPLEQ_REMOVE_HEAD(&bq->sections, link_section);
+        }
+    }
+
+    return first;
+}
+
+static void blkqueue_free_request(BlockQueueRequest *req)
+{
+ qemu_free(req->buf); /* Data copy of a write; NULL for barriers */
+ qemu_free(req); /* The request itself; it is not unlinked from any list here */
+}
+
+/*
+ * If there are any blkqueue_aio_flush callbacks pending, call them with ret
+ * as the error code and remove them from the queue.
+ *
+ * If keep_queue is false, all requests are removed from the queue and freed.
+ *
+ * In both cases bq->flushing is set to ret, which makes a running
+ * blkqueue_flush() stop.
+ */
+static void blkqueue_fail_flush(BlockQueue *bq, int ret, bool keep_queue)
+{
+    BlockQueueRequest *req, *next_req;
+    BlockQueueAIOCB *acb, *next_acb;
+
+    QTAILQ_FOREACH_SAFE(req, &bq->queue, link, next_req) {
+
+        /* Call and remove registered callbacks */
+        QLIST_FOREACH_SAFE(acb, &req->acbs, link, next_acb) {
+            acb->common.cb(acb->common.opaque, ret);
+            /* The ACB comes from qemu_aio_get(), so return it to its pool */
+            qemu_aio_release(acb);
+        }
+        QLIST_INIT(&req->acbs);
+
+        /* If requested, remove the request itself */
+        if (!keep_queue) {
+            QTAILQ_REMOVE(&bq->queue, req, link);
+            bq->queue_size--;
+            if (req->type == REQ_TYPE_BARRIER) {
+                QSIMPLEQ_REMOVE(&bq->sections, req, BlockQueueRequest,
+                    link_section);
+            }
+
+            /*
+             * Nothing references the request any more once it is unlinked,
+             * so it must be freed here or it would be leaked.
+             */
+            blkqueue_free_request(req);
+        }
+    }
+
+    /* Make sure that blkqueue_flush stops running */
+    bq->flushing = ret;
+}
+
+/*
+ * Completion callback for the requests submitted by blkqueue_submit_request.
+ *
+ * opaque is the completed BlockQueueRequest, ret the result of the request.
+ * The request is removed from the in-flight list and the flush callbacks
+ * attached to it are called. On success the request is freed and the next
+ * one is submitted. On failure the error handler of the queue decides whether
+ * the request is put back into the queue or dropped.
+ */
+static void blkqueue_process_request_cb(void *opaque, int ret)
+{
+    BlockQueueRequest *req = opaque;
+    BlockQueue *bq = req->bq;
+    BlockQueueAIOCB *acb, *next;
+
+    DPRINTF(" done req: %p [%#" PRIx64 " + %" PRIu64 "]\n",
+        req, req->offset, req->size);
+
+    /* Remove from in-flight list */
+    QTAILQ_REMOVE(&bq->in_flight, req, link);
+    bq->in_flight_num--;
+
+    /*
+     * Error handling gets a bit complicated, because we have already completed
+     * the requests that went wrong. There are two ways of dealing with this:
+     *
+     * 1. With werror=stop we can put the request back into the queue and stop
+     *    the VM. When the user continues the VM, the request is retried.
+     *
+     * 2. In other cases we need to return an error on the next bdrv_flush. The
+     *    caller must cope with the fact that he doesn't know which of the
+     *    requests succeeded (i.e. invalidate all caches)
+     *
+     * If we're in a blkqueue_aio_flush, we must return an error in both
+     * cases. If we stop the VM, we can clear bq->error_ret immediately again.
+     * Otherwise, it's cleared in blkqueue_flush.
+     */
+    if (ret < 0) {
+        /* An -ENOSPC that is already recorded is not overwritten */
+        if (bq->error_ret != -ENOSPC) {
+            bq->error_ret = ret;
+        }
+    }
+
+    /* Call any callbacks attached to the request (see blkqueue_aio_flush) */
+    QLIST_FOREACH_SAFE(acb, &req->acbs, link, next) {
+        acb->common.cb(acb->common.opaque, bq->error_ret);
+        /* The ACB comes from qemu_aio_get(), so return it to its pool */
+        qemu_aio_release(acb);
+    }
+    QLIST_INIT(&req->acbs);
+
+    /* Handle errors in the VM stop case */
+    if (ret < 0) {
+        bool keep_queue = bq->error_handler(bq->error_opaque, ret);
+
+        /* Fail any flushes that may wait for the queue to become empty */
+        blkqueue_fail_flush(bq, bq->error_ret, keep_queue);
+
+        if (keep_queue) {
+            /*
+             * Reinsert request into the queue. blkqueue_pop() decremented
+             * queue_size when the request was submitted, so count it again.
+             */
+            QTAILQ_INSERT_HEAD(&bq->queue, req, link);
+            bq->queue_size++;
+            if (req->type == REQ_TYPE_BARRIER) {
+                QSIMPLEQ_INSERT_HEAD(&bq->sections, req, link_section);
+            }
+
+            /* Clear the error to restore a normal state after 'cont' */
+            bq->error_ret = 0;
+            return;
+        }
+    }
+
+    /* Cleanup */
+    blkqueue_free_request(req);
+
+    /* Check if there are more requests to submit */
+    blkqueue_process_request(bq);
+}
+
+/*
+ * Checks if the first request on the queue can run. If so, remove it from the
+ * queue, submit the request and put it onto the queue of in-flight requests.
+ *
+ * Writes are submitted with bdrv_aio_pwrite, barriers with bdrv_aio_flush.
+ * If the submission itself fails, the request is completed with -EIO.
+ *
+ * Returns 0 if a request has been submitted, -1 if no request can run or an
+ * error has occurred.
+ */
+static int blkqueue_submit_request(BlockQueue *bq)
+{
+ BlockDriverAIOCB *acb;
+ BlockQueueRequest *req;
+
+ /*
+ * If we had an error, we must not submit new requests from another
+ * section or we may get ordering problems. In fact, not submitting any new
+ * requests looks like a good idea in this case.
+ */
+ if (bq->error_ret) {
+ return -1;
+ }
+
+ /* Fetch a request */
+ req = QTAILQ_FIRST(&bq->queue);
+ if (req == NULL) {
+ return -1;
+ }
+
+ /* Writethrough images aren't supposed to have any queue entries */
+ assert((bq->bs->open_flags & WRITEBACK_MODES) != 0);
+
+ /*
+ * We need to wait for completion before we can submit new requests:
+ * 1. If we're currently processing a barrier, or the new request is a
+ * barrier, we need to guarantee this barrier semantics.
+ * 2. We must make sure that newer writes cannot pass older ones.
+ *
+ * As implemented here, this means that at most one request is in flight
+ * at any time.
+ */
+ if (bq->in_flight_num > 0) {
+ return -1;
+ }
+
+ /*
+ * Process barriers only if the queue is long enough: unless a
+ * blkqueue_flush is running, a barrier at the head of the queue is held
+ * back until at least 50 requests are queued, so that more requests can
+ * be batched.
+ */
+ if (!bq->flushing) {
+ if (req->type == REQ_TYPE_BARRIER && bq->queue_size < 50) {
+ return -1;
+ }
+ }
+
+ /*
+ * Move the request to the queue of currently processed requests so that
+ * blkqueue_pread continues to read from the queue before the request has
+ * completed.
+ */
+ blkqueue_pop(bq);
+ QTAILQ_INSERT_TAIL(&bq->in_flight, req, link);
+
+ bq->in_flight_num++;
+ bq->in_flight_type = req->type;
+
+ /* Submit the request */
+ switch (req->type) {
+ case REQ_TYPE_WRITE:
+ DPRINTF(" process pwrite: %p [%#lx + %ld]\n",
+ req, req->offset, req->size);
+ acb = bdrv_aio_pwrite(bq->bs, req->offset, req->buf, req->size,
+ blkqueue_process_request_cb, req);
+ break;
+ case REQ_TYPE_BARRIER:
+ DPRINTF(" process flush\n");
+ acb = bdrv_aio_flush(bq->bs, blkqueue_process_request_cb, req);
+ break;
+ default:
+ /* Make gcc happy (acb would be uninitialized) */
+ return -1;
+ }
+
+ if (!acb) {
+ blkqueue_process_request_cb(req, -EIO);
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * Starts execution of the queue: submits requests until
+ * blkqueue_submit_request reports that nothing more can be submitted.
+ */
+static void blkqueue_process_request(BlockQueue *bq)
+{
+    int submit_result;
+
+    do {
+        submit_result = blkqueue_submit_request(bq);
+    } while (submit_result >= 0);
+}
+
+/*
+ * Cancel hook for the AIOCBs returned by blkqueue_aio_flush().
+ *
+ * The barrier itself stays in the queue. Only the AIOCB is detached from it,
+ * so that its callback isn't called when the barrier completes.
+ */
+static void blkqueue_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    BlockQueueAIOCB *acb = (BlockQueueAIOCB*) blockacb;
+
+    /*
+     * We can't cancel the flush any more, but that doesn't hurt. We just
+     * need to make sure that we don't call the callback when it completes.
+     */
+    QLIST_REMOVE(acb, link);
+
+    /* The ACB comes from qemu_aio_get(), so return it to its pool */
+    qemu_aio_release(acb);
+}
+
+/*
+ * Inserts a barrier at the end of the queue (or merges with an existing
+ * barrier there). Once the barrier has completed, the callback is called.
+ *
+ * For writethrough images the queue is not used and the request is passed
+ * on to bdrv_aio_flush.
+ *
+ * Returns the AIOCB of the request. If the barrier can't be queued, NULL is
+ * returned and the callback is not called; this is also how the callers of
+ * bdrv_aio_* in this file treat a NULL return.
+ */
+BlockDriverAIOCB* blkqueue_aio_flush(BlockQueueContext *context,
+    BlockDriverCompletionFunc *cb, void *opaque)
+{
+    BlockQueueAIOCB *acb;
+    BlockDriverState *bs = context->bq->bs;
+    int ret;
+
+    /* Don't use the queue for writethrough images */
+    if ((bs->open_flags & WRITEBACK_MODES) == 0) {
+        return bdrv_aio_flush(bs, cb, opaque);
+    }
+
+    /* Insert a barrier into the queue */
+    acb = qemu_aio_get(&blkqueue_aio_pool, NULL, cb, opaque);
+
+    ret = insert_barrier(context, acb);
+    if (ret < 0) {
+        /* Don't hand out a pointer to an ACB that has been given back */
+        qemu_aio_release(acb);
+        return NULL;
+    }
+
+    return &acb->common;
+}
+
+/*
+ * Flushes the queue (i.e. disables waiting for new requests to be batched) and
+ * waits until all requests in the queue have completed.
+ *
+ * Returns 0 on success or the error of a failed request. The error state of
+ * the queue is cleared before returning.
+ *
+ * Note that unlike blkqueue_aio_flush this does not call bdrv_flush().
+ */
+int blkqueue_flush(BlockQueue *bq)
+{
+    int result;
+
+    bq->flushing = 1;
+
+    /* Keep submitting and waiting while the flush is active and work is left */
+    for (;;) {
+        if (bq->flushing <= 0) {
+            break;
+        }
+        if (!bq->in_flight_num && !QTAILQ_FIRST(&bq->queue)) {
+            break;
+        }
+
+        blkqueue_process_request(bq);
+        qemu_aio_wait();
+    }
+
+    /*
+     * error_ret holds the error if it could not be handled by stopping the
+     * VM; otherwise a negative bq->flushing holds it.
+     */
+    if (bq->error_ret < 0) {
+        result = bq->error_ret;
+
+        /*
+         * Wait for AIO requests, so that the queue is really unused after
+         * blkqueue_flush() and the caller can destroy it
+         */
+        qemu_aio_flush();
+    } else if (bq->flushing < 0) {
+        result = bq->flushing;
+    } else {
+        result = 0;
+    }
+
+    bq->error_ret = 0;
+    bq->flushing = 0;
+
+    return result;
+}
diff --git a/block-queue.h b/block-queue.h
new file mode 100644
index 0000000..3cff695
--- /dev/null
+++ b/block-queue.h
@@ -0,0 +1,61 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2010 Kevin Wolf <kwolf@redhat.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#ifndef BLOCK_QUEUE_H
+#define BLOCK_QUEUE_H
+
+#include "qemu-common.h"
+
+typedef struct BlockQueue BlockQueue;
+
+/*
+ * Returns true if the error has been handled (e.g. by stopping the VM), the
+ * error status should be cleared and the requests should be re-inserted into
+ * the queue.
+ *
+ * Returns false if the request should be completed and the next flush should
+ * fail.
+ */
+typedef bool (*BlockQueueErrorHandler)(void *opaque, int ret);
+
+typedef struct BlockQueueContext { /* Caller-side handle used for blkqueue_pread/pwrite/barrier */
+ BlockQueue* bq; /* Queue the context operates on */
+ unsigned section; /* Section new writes of this context go to; starts at 0, advanced by barriers */
+} BlockQueueContext;
+
+BlockQueue *blkqueue_create(BlockDriverState *bs,
+ BlockQueueErrorHandler error_handler, void *error_opaque);
+void blkqueue_init_context(BlockQueueContext* context, BlockQueue *bq);
+void blkqueue_destroy(BlockQueue *bq);
+bool blkqueue_is_empty(BlockQueue *bq);
+int blkqueue_pread(BlockQueueContext *context, uint64_t offset, void *buf,
+ uint64_t size);
+int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
+ uint64_t size);
+int blkqueue_barrier(BlockQueueContext *context);
+int blkqueue_flush(BlockQueue *bq);
+BlockDriverAIOCB* blkqueue_aio_flush(BlockQueueContext *context,
+ BlockDriverCompletionFunc *cb, void *opaque);
+
+#endif
--
1.7.2.3
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [Qemu-devel] [RFC PATCH v3 3/4] Test cases for block-queue
2010-11-30 12:48 [Qemu-devel] [RFC PATCH v3 0/4] block-queue: Delay and batch metadata writes Kevin Wolf
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 1/4] block: Implement bdrv_aio_pwrite Kevin Wolf
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 2/4] Add block-queue Kevin Wolf
@ 2010-11-30 12:48 ` Kevin Wolf
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 4/4] qcow2: Preliminary block-queue support Kevin Wolf
3 siblings, 0 replies; 9+ messages in thread
From: Kevin Wolf @ 2010-11-30 12:48 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, stefanha
Add some unit tests especially for the ordering and request merging in
block-queue.
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
Makefile | 1 +
check-block-queue.c | 402 +++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 403 insertions(+), 0 deletions(-)
create mode 100644 check-block-queue.c
diff --git a/Makefile b/Makefile
index 4e120a2..3e7071c 100644
--- a/Makefile
+++ b/Makefile
@@ -171,6 +171,7 @@ check-qdict: check-qdict.o qdict.o qfloat.o qint.o qstring.o qbool.o qlist.o $(C
check-qlist: check-qlist.o qlist.o qint.o $(CHECK_PROG_DEPS)
check-qfloat: check-qfloat.o qfloat.o $(CHECK_PROG_DEPS)
check-qjson: check-qjson.o qfloat.o qint.o qdict.o qstring.o qlist.o qbool.o qjson.o json-streamer.o json-lexer.o json-parser.o $(CHECK_PROG_DEPS)
+check-block-queue: check-block-queue.o qemu-tool.o qemu-error.o $(oslib-obj-y) $(filter-out block-queue.o,$(block-obj-y)) $(qobject-obj-y) qemu-timer-common.o
clean:
# avoid old build problems by removing potentially incorrect old files
diff --git a/check-block-queue.c b/check-block-queue.c
new file mode 100644
index 0000000..4dae068
--- /dev/null
+++ b/check-block-queue.c
@@ -0,0 +1,402 @@
+/*
+ * block-queue.c unit tests
+ *
+ * Copyright (c) 2010 Kevin Wolf <kwolf@redhat.com>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+/* We want to test some static functions, so just include the source file */
+#define RUN_TESTS
+#include "block-queue.c"
+
+/*
+ * Asserts that req is a non-NULL write request that belongs to _bq, has the
+ * given offset, size and section, and whose data equals the first _size bytes
+ * of _buf.
+ */
+#define CHECK_WRITE(req, _bq, _offset, _size, _buf, _section) \
+ do { \
+ assert(req != NULL); \
+ assert(req->type == REQ_TYPE_WRITE); \
+ assert(req->bq == _bq); \
+ assert(req->offset == _offset); \
+ assert(req->size == _size); \
+ assert(req->section == _section); \
+ assert(!memcmp(req->buf, _buf, _size)); \
+ } while(0)
+
+/*
+ * Asserts that req is a non-NULL barrier request that belongs to _bq and is
+ * in the given section.
+ */
+#define CHECK_BARRIER(req, _bq, _section) \
+ do { \
+ assert(req != NULL); \
+ assert(req->type == REQ_TYPE_BARRIER); \
+ assert(req->bq == _bq); \
+ assert(req->section == _section); \
+ } while(0)
+
+/*
+ * Clears the first _size bytes of _buf, reads _size bytes at _offset through
+ * the block queue into _buf and asserts that the read succeeded and that the
+ * result equals the first _size bytes of _cmpbuf.
+ *
+ * Only the macro's own arguments are used, so it does not depend on the
+ * caller having a 512 byte variable that happens to be called buf.
+ */
+#define CHECK_READ(_context, _offset, _buf, _size, _cmpbuf) \
+    do { \
+        int ret; \
+        memset(_buf, 0, _size); \
+        ret = blkqueue_pread(_context, _offset, _buf, _size); \
+        assert(ret == 0); \
+        assert(!memcmp(_cmpbuf, _buf, _size)); \
+    } while(0)
+
+/*
+ * Fills the first _size bytes of _buf with _pattern, queues them as a write
+ * at _offset and asserts that queueing succeeded.
+ */
+#define QUEUE_WRITE(_context, _offset, _buf, _size, _pattern) \
+ do { \
+ int ret; \
+ memset(_buf, _pattern, _size); \
+ ret = blkqueue_pwrite(_context, _offset, _buf, _size); \
+ assert(ret == 0); \
+ } while(0)
+/* Queues a barrier for _context and asserts that this succeeded. */
+#define QUEUE_BARRIER(_context) \
+ do { \
+ int ret; \
+ ret = blkqueue_barrier(_context); \
+ assert(ret == 0); \
+ } while(0)
+
+/*
+ * Pops the next request from _bq, checks with CHECK_WRITE that it is a write
+ * of _size bytes of _pattern at _offset in _section, and frees the request.
+ * _buf is overwritten with the expected pattern.
+ */
+#define POP_CHECK_WRITE(_bq, _offset, _buf, _size, _pattern, _section) \
+ do { \
+ BlockQueueRequest *req; \
+ memset(_buf, _pattern, _size); \
+ req = blkqueue_pop(_bq); \
+ CHECK_WRITE(req, _bq, _offset, _size, _buf, _section); \
+ blkqueue_free_request(req); \
+ } while(0)
+/*
+ * Pops the next request from _bq, checks with CHECK_BARRIER that it is a
+ * barrier in _section, and frees the request.
+ */
+#define POP_CHECK_BARRIER(_bq, _section) \
+ do { \
+ BlockQueueRequest *req; \
+ req = blkqueue_pop(_bq); \
+ CHECK_BARRIER(req, _bq, _section); \
+ blkqueue_free_request(req); \
+ } while(0)
+
+/*
+ * Debugging aid: prints one line per queued request to stderr, prefixed with
+ * the section of the request. Marked as used so that it is kept even though
+ * no test calls it.
+ */
+static void __attribute__((used)) dump_queue(BlockQueue *bq)
+{
+    BlockQueueRequest *entry;
+
+    fprintf(stderr, "--- Queue dump ---\n");
+
+    QTAILQ_FOREACH(entry, &bq->queue, link) {
+        fprintf(stderr, "[%d] ", entry->section);
+
+        switch (entry->type) {
+        case REQ_TYPE_WRITE:
+            fprintf(stderr, "Write off=%5"PRId64", len=%5"PRId64", buf=%p\n",
+                entry->offset, entry->size, entry->buf);
+            break;
+        case REQ_TYPE_BARRIER:
+            fprintf(stderr, "Barrier\n");
+            break;
+        default:
+            fprintf(stderr, "Unknown type %d\n", entry->type);
+            break;
+        }
+    }
+}
+
+/*
+ * Queues two writes, a barrier and a third write through a single context and
+ * checks that the requests are popped in submission order: the first two
+ * writes and the barrier in section 0, the write after the barrier in
+ * section 1.
+ */
+static void test_basic(BlockDriverState *bs)
+{
+ uint8_t buf[512];
+ BlockQueue *bq;
+ BlockQueueContext context;
+
+ bq = blkqueue_create(bs, NULL, NULL);
+ blkqueue_init_context(&context, bq);
+
+ /* Queue requests */
+ QUEUE_WRITE(&context, 0, buf, 512, 0x12);
+ QUEUE_WRITE(&context, 512, buf, 42, 0x34);
+ QUEUE_BARRIER(&context);
+ QUEUE_WRITE(&context, 678, buf, 42, 0x56);
+
+ /* Verify queue contents */
+ POP_CHECK_WRITE(bq, 0, buf, 512, 0x12, 0);
+ POP_CHECK_WRITE(bq, 512, buf, 42, 0x34, 0);
+ POP_CHECK_BARRIER(bq, 0);
+ POP_CHECK_WRITE(bq, 678, buf, 42, 0x56, 1);
+
+ blkqueue_destroy(bq);
+}
+
+/*
+ * Interleaves requests of two contexts on the same queue and checks how they
+ * are combined.
+ *
+ * First round: each context queues one barrier, but only a single barrier is
+ * expected in the queue, with the writes that either context queued before
+ * its barrier in section 0 and the writes queued after it in section 1.
+ *
+ * Second round (fresh contexts): ctx2 queues its requests first, ctx1
+ * afterwards. The five barriers are expected to end up as three, and the
+ * writes of ctx1 are expected in the sections that ctx2 has already created
+ * rather than behind all of ctx2's requests.
+ */
+static void test_merge(BlockDriverState *bs)
+{
+ uint8_t buf[512];
+ BlockQueue *bq;
+ BlockQueueContext ctx1, ctx2;
+
+ bq = blkqueue_create(bs, NULL, NULL);
+ blkqueue_init_context(&ctx1, bq);
+ blkqueue_init_context(&ctx2, bq);
+
+ /* Queue requests */
+ QUEUE_WRITE(&ctx1, 0, buf, 512, 0x12);
+ QUEUE_BARRIER(&ctx1);
+ QUEUE_WRITE(&ctx2, 512, buf, 42, 0x34);
+ QUEUE_WRITE(&ctx1, 1024, buf, 512, 0x12);
+ QUEUE_BARRIER(&ctx2);
+ QUEUE_WRITE(&ctx2, 1536, buf, 42, 0x34);
+
+ /* Verify queue contents */
+ POP_CHECK_WRITE(bq, 0, buf, 512, 0x12, 0);
+ POP_CHECK_WRITE(bq, 512, buf, 42, 0x34, 0);
+ POP_CHECK_BARRIER(bq, 0);
+ POP_CHECK_WRITE(bq, 1024, buf, 512, 0x12, 1);
+ POP_CHECK_WRITE(bq, 1536, buf, 42, 0x34, 1);
+
+ /* Same queue, new contexts */
+ blkqueue_init_context(&ctx1, bq);
+ blkqueue_init_context(&ctx2, bq);
+
+ /* Queue requests */
+ QUEUE_BARRIER(&ctx2);
+ QUEUE_WRITE(&ctx2, 512, buf, 42, 0x34);
+ QUEUE_WRITE(&ctx2, 12, buf, 20, 0x45);
+ QUEUE_BARRIER(&ctx2);
+ QUEUE_WRITE(&ctx2, 2892, buf, 142, 0x56);
+
+ QUEUE_WRITE(&ctx1, 0, buf, 8, 0x12);
+ QUEUE_BARRIER(&ctx1);
+ QUEUE_WRITE(&ctx1, 1024, buf, 512, 0x12);
+ QUEUE_BARRIER(&ctx1);
+ QUEUE_WRITE(&ctx1, 2512, buf, 42, 0x34);
+ QUEUE_BARRIER(&ctx1);
+
+ /* Verify queue contents */
+ POP_CHECK_WRITE(bq, 0, buf, 8, 0x12, 0);
+ POP_CHECK_BARRIER(bq, 0);
+ POP_CHECK_WRITE(bq, 512, buf, 42, 0x34, 1);
+ POP_CHECK_WRITE(bq, 12, buf, 20, 0x45, 1);
+ POP_CHECK_WRITE(bq, 1024, buf, 512, 0x12, 1);
+ POP_CHECK_BARRIER(bq, 1);
+ POP_CHECK_WRITE(bq, 2892, buf, 142, 0x56, 2);
+ POP_CHECK_WRITE(bq, 2512, buf, 42, 0x34, 2);
+ POP_CHECK_BARRIER(bq, 2);
+
+ blkqueue_destroy(bq);
+}
+
+/*
+ * Checks the data returned by blkqueue_pread() while writes are queued.
+ *
+ * Before anything is queued the image is expected to read back as 0xa5.
+ * After a 5 byte write of 0x12 at offset 5 (and later a 2 byte write at
+ * offset 0) has been queued, reads that lie inside, overlap or lie outside
+ * the queued ranges must return the queued data where it exists and 0xa5
+ * elsewhere. buf2 always holds the expected result, laid out relative to the
+ * start offset of the following read.
+ *
+ * Finally both writes must still be in the queue, in submission order and in
+ * section 0.
+ */
+static void test_read(BlockDriverState *bs)
+{
+ uint8_t buf[512], buf2[512];
+ BlockQueue *bq;
+ BlockQueueContext ctx1;
+
+ bq = blkqueue_create(bs, NULL, NULL);
+ blkqueue_init_context(&ctx1, bq);
+
+ /* Queue requests and do some test reads */
+ memset(buf2, 0xa5, 512);
+ CHECK_READ(&ctx1, 0, buf, 32, buf2);
+
+ QUEUE_WRITE(&ctx1, 5, buf, 5, 0x12);
+ memset(buf2, 0x12, 5);
+ CHECK_READ(&ctx1, 5, buf, 5, buf2);
+ CHECK_READ(&ctx1, 7, buf, 2, buf2);
+ memset(buf2, 0xa5, 512);
+ memset(buf2 + 5, 0x12, 5);
+ CHECK_READ(&ctx1, 0, buf, 8, buf2);
+ CHECK_READ(&ctx1, 0, buf, 10, buf2);
+ CHECK_READ(&ctx1, 0, buf, 32, buf2);
+ memset(buf2, 0xa5, 512);
+ memset(buf2, 0x12, 5);
+ CHECK_READ(&ctx1, 5, buf, 16, buf2);
+ memset(buf2, 0xa5, 512);
+ CHECK_READ(&ctx1, 0, buf, 2, buf2);
+ CHECK_READ(&ctx1, 10, buf, 16, buf2);
+
+ QUEUE_WRITE(&ctx1, 0, buf, 2, 0x12);
+ memset(&buf2[5], 0x12, 5);
+ memset(buf2, 0x12, 2);
+ CHECK_READ(&ctx1, 0, buf, 32, buf2);
+
+ /* Verify queue contents */
+ POP_CHECK_WRITE(bq, 5, buf, 5, 0x12, 0);
+ POP_CHECK_WRITE(bq, 0, buf, 2, 0x12, 0);
+
+ blkqueue_destroy(bq);
+}
+
+/*
+ * Checks reads and overlapping writes across two contexts.
+ *
+ * ctx1 queues a write, a barrier, a second write (offset 5) and another
+ * barrier; ctx2 then queues a write at offset 10. A read through ctx2 must
+ * see the data of both contexts. Afterwards ctx2 queues a 10 byte write at
+ * offset 0 that overlaps ctx1's write at offset 5.
+ *
+ * The expected queue contents show that the overlapping part replaces the
+ * data of ctx1's request in section 1 (it now holds 0x34) and that only the
+ * remaining 5 bytes at offset 0 are queued as a new request, also in
+ * section 1.
+ */
+static void test_read_order(BlockDriverState *bs)
+{
+ uint8_t buf[512], buf2[512];
+ BlockQueue *bq;
+ BlockQueueContext ctx1, ctx2;
+
+ bq = blkqueue_create(bs, NULL, NULL);
+ blkqueue_init_context(&ctx1, bq);
+ blkqueue_init_context(&ctx2, bq);
+
+ /* Queue requests and do some test reads */
+ QUEUE_WRITE(&ctx1, 25, buf, 5, 0x44);
+ QUEUE_BARRIER(&ctx1);
+ QUEUE_WRITE(&ctx1, 5, buf, 5, 0x12);
+ QUEUE_BARRIER(&ctx1);
+ QUEUE_WRITE(&ctx2, 10, buf, 5, 0x34);
+
+ memset(buf2, 0xa5, 512);
+ memset(buf2 + 5, 0x12, 5);
+ memset(buf2 + 10, 0x34, 5);
+ CHECK_READ(&ctx2, 0, buf, 20, buf2);
+ QUEUE_WRITE(&ctx2, 0, buf, 10, 0x34);
+ QUEUE_BARRIER(&ctx2);
+
+ /* Verify queue contents */
+ POP_CHECK_WRITE(bq, 25, buf, 5, 0x44, 0);
+ POP_CHECK_WRITE(bq, 10, buf, 5, 0x34, 0);
+ POP_CHECK_BARRIER(bq, 0);
+ POP_CHECK_WRITE(bq, 5, buf, 5, 0x34, 1);
+ POP_CHECK_WRITE(bq, 0, buf, 5, 0x34, 1);
+ POP_CHECK_BARRIER(bq, 1);
+
+ blkqueue_destroy(bq);
+}
+
+/*
+ * Checks what happens when the same range is written more than once.
+ *
+ * Part 1: a write to offset 512 is queued in section 1, then a fresh context
+ * writes different data to the same range. Only one request for that range
+ * is expected in the queue; it stays in section 1 and carries the newer data.
+ *
+ * Part 2: the same requests are queued again. A read of the range must return
+ * the newer data both before and after one call to blkqueue_process_request()
+ * followed by qemu_aio_flush(). The queue is then flushed.
+ *
+ * Part 3: a write to offset 0 is queued in section 0; a second context writes
+ * to the same range after a barrier. Here both requests must stay in the
+ * queue, the second one in section 1.
+ */
+static void test_write_order(BlockDriverState *bs)
+{
+ uint8_t buf[512], buf2[512];
+ BlockQueue *bq;
+ BlockQueueContext context;
+
+ bq = blkqueue_create(bs, NULL, NULL);
+
+ /* Merging two writes */
+ /* Queue requests */
+ blkqueue_init_context(&context, bq);
+ QUEUE_WRITE(&context, 0, buf, 512, 0x12);
+ QUEUE_BARRIER(&context);
+ QUEUE_WRITE(&context, 512, buf, 512, 0x56);
+
+ blkqueue_init_context(&context, bq);
+ QUEUE_WRITE(&context, 512, buf, 512, 0x34);
+
+ /* Verify queue contents */
+ POP_CHECK_WRITE(bq, 0, buf, 512, 0x12, 0);
+ POP_CHECK_BARRIER(bq, 0);
+ POP_CHECK_WRITE(bq, 512, buf, 512, 0x34, 1);
+
+ /* Queue requests once again */
+ blkqueue_init_context(&context, bq);
+ QUEUE_WRITE(&context, 0, buf, 512, 0x12);
+ QUEUE_BARRIER(&context);
+ QUEUE_WRITE(&context, 512, buf, 512, 0x56);
+
+ blkqueue_init_context(&context, bq);
+ QUEUE_WRITE(&context, 512, buf, 512, 0x34);
+
+ /* Check if the right values are read back */
+ memset(buf2, 0x34, 512);
+ CHECK_READ(&context, 512, buf, 512, buf2);
+ blkqueue_process_request(bq);
+ qemu_aio_flush();
+ memset(buf2, 0x34, 512);
+ CHECK_READ(&context, 512, buf, 512, buf2);
+
+ blkqueue_flush(bq);
+
+ /* Must not merge with write in earlier section */
+ /* Queue requests */
+ blkqueue_init_context(&context, bq);
+ QUEUE_WRITE(&context, 0, buf, 512, 0x12);
+
+ blkqueue_init_context(&context, bq);
+ QUEUE_WRITE(&context, 512, buf, 512, 0x34);
+ QUEUE_BARRIER(&context);
+ QUEUE_WRITE(&context, 0, buf, 512, 0x56);
+
+ /* Verify queue contents */
+ POP_CHECK_WRITE(bq, 0, buf, 512, 0x12, 0);
+ POP_CHECK_WRITE(bq, 512, buf, 512, 0x34, 0);
+ POP_CHECK_BARRIER(bq, 0);
+ POP_CHECK_WRITE(bq, 0, buf, 512, 0x56, 1);
+
+ blkqueue_destroy(bq);
+}
+
+/*
+ * Follows one write and one barrier through processing.
+ *
+ * The same read (64 bytes at offset 0, with the queued 0x44 bytes at offset
+ * 25 and 0xa5 elsewhere) must give the same result while the write is only
+ * queued, after blkqueue_process_request() has been called, and after
+ * qemu_aio_flush() has completed the outstanding AIO requests. At that point
+ * one barrier must be counted as submitted and no request may be in flight.
+ *
+ * After blkqueue_flush() the queue must be empty, and processing an empty
+ * queue must be harmless.
+ */
+static void test_process_request(BlockDriverState *bs)
+{
+ uint8_t buf[512], buf2[512];
+ BlockQueue *bq;
+ BlockQueueContext ctx1;
+
+ bq = blkqueue_create(bs, NULL, NULL);
+ blkqueue_init_context(&ctx1, bq);
+
+ /* Queue requests and do a test read */
+ QUEUE_WRITE(&ctx1, 25, buf, 5, 0x44);
+ QUEUE_BARRIER(&ctx1);
+
+ memset(buf2, 0xa5, 512);
+ memset(buf2 + 25, 0x44, 5);
+ CHECK_READ(&ctx1, 0, buf, 64, buf2);
+
+ /* Process the requests */
+ blkqueue_process_request(bq);
+
+ /* Check if we still read the same */
+ CHECK_READ(&ctx1, 0, buf, 64, buf2);
+
+ /* Process the AIO requests and check again */
+ qemu_aio_flush();
+ assert(bq->barriers_submitted == 1);
+ assert(bq->in_flight_num == 0);
+ CHECK_READ(&ctx1, 0, buf, 64, buf2);
+
+ /* Run the barrier */
+ blkqueue_flush(bq);
+
+ /* Verify the queue is empty */
+ assert(blkqueue_pop(bq) == NULL);
+
+ /* Check that processing an empty queue works */
+ blkqueue_process_request(bq);
+
+ blkqueue_destroy(bq);
+}
+
+/*
+ * Prepares the image for a test case and runs it: a 1 MiB buffer filled with
+ * 0xa5 is written as 2048 sectors starting at sector 0, so that every test
+ * starts from the same image contents, then testfn is called with bs.
+ */
+static void run_test(void (*testfn)(BlockDriverState*), BlockDriverState *bs)
+{
+    enum {
+        FILL_BYTES = 1024 * 1024,
+        FILL_SECTORS = 2048
+    };
+    uint8_t *pattern = qemu_malloc(FILL_BYTES);
+    int ret;
+
+    memset(pattern, 0xa5, FILL_BYTES);
+    ret = bdrv_write(bs, 0, pattern, FILL_SECTORS);
+    assert(ret >= 0);
+    qemu_free(pattern);
+
+    testfn(bs);
+}
+
+/*
+ * Opens block-queue.img read-write and runs all test cases against it, one
+ * after another. Exits with status 1 if the image cannot be opened; a failing
+ * test aborts through assert().
+ */
+int main(void)
+{
+    /* Test cases in the order in which they are run */
+    static void (*const tests[])(BlockDriverState *) = {
+        test_basic,
+        test_merge,
+        test_read,
+        test_read_order,
+        test_write_order,
+        test_process_request,
+    };
+    BlockDriverState *bs;
+    size_t i;
+    int ret;
+
+    bdrv_init();
+    bs = bdrv_new("");
+    ret = bdrv_open(bs, "block-queue.img", BDRV_O_RDWR | BDRV_O_CACHE_WB, NULL);
+    if (ret < 0) {
+        fprintf(stderr, "Couldn't open block-queue.img: %s\n",
+            strerror(-ret));
+        exit(1);
+    }
+
+    for (i = 0; i < sizeof(tests) / sizeof(tests[0]); i++) {
+        run_test(tests[i], bs);
+    }
+
+    bdrv_delete(bs);
+
+    return 0;
+}
--
1.7.2.3
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [Qemu-devel] [RFC PATCH v3 4/4] qcow2: Preliminary block-queue support
2010-11-30 12:48 [Qemu-devel] [RFC PATCH v3 0/4] block-queue: Delay and batch metadata writes Kevin Wolf
` (2 preceding siblings ...)
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 3/4] Test cases for block-queue Kevin Wolf
@ 2010-11-30 12:48 ` Kevin Wolf
3 siblings, 0 replies; 9+ messages in thread
From: Kevin Wolf @ 2010-11-30 12:48 UTC (permalink / raw)
To: qemu-devel; +Cc: kwolf, stefanha
This is a first hack that makes qcow2 use block-queue. Will be reworked to pass
down the blkqueue context to all functions that queue requests instead of using
a global per-image context.
---
block/qcow2-cluster.c | 39 +++++++++++-----------
block/qcow2-refcount.c | 62 ++++++++++++++++++------------------
block/qcow2.c | 83 +++++++++++++++++++++++++++++++++++++++++++++++-
block/qcow2.h | 5 +++
cpus.c | 8 +++--
qemu-common.h | 3 ++
qemu-tool.c | 5 +++
sysemu.h | 1 -
8 files changed, 151 insertions(+), 55 deletions(-)
diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
index b040208..674699d 100644
--- a/block/qcow2-cluster.c
+++ b/block/qcow2-cluster.c
@@ -67,12 +67,13 @@ int qcow2_grow_l1_table(BlockDriverState *bs, int min_size, bool exact_size)
qemu_free(new_l1_table);
return new_l1_table_offset;
}
- bdrv_flush(bs->file);
+ blkqueue_barrier(s->bq_context);
BLKDBG_EVENT(bs->file, BLKDBG_L1_GROW_WRITE_TABLE);
for(i = 0; i < s->l1_size; i++)
new_l1_table[i] = cpu_to_be64(new_l1_table[i]);
- ret = bdrv_pwrite_sync(bs->file, new_l1_table_offset, new_l1_table, new_l1_size2);
+ ret = blkqueue_pwrite(s->bq_context, new_l1_table_offset, new_l1_table, new_l1_size2);
+ blkqueue_barrier(s->bq_context);
if (ret < 0)
goto fail;
for(i = 0; i < s->l1_size; i++)
@@ -82,7 +83,8 @@ int qcow2_grow_l1_table(BlockDriverState *bs, int min_size, bool exact_size)
BLKDBG_EVENT(bs->file, BLKDBG_L1_GROW_ACTIVATE_TABLE);
cpu_to_be32w((uint32_t*)data, new_l1_size);
cpu_to_be64w((uint64_t*)(data + 4), new_l1_table_offset);
- ret = bdrv_pwrite_sync(bs->file, offsetof(QCowHeader, l1_size), data,sizeof(data));
+ ret = blkqueue_pwrite(s->bq_context, offsetof(QCowHeader, l1_size), data, sizeof(data));
+ blkqueue_barrier(s->bq_context);
if (ret < 0) {
goto fail;
}
@@ -185,8 +187,7 @@ static int l2_load(BlockDriverState *bs, uint64_t l2_offset,
*l2_table = s->l2_cache + (min_index << s->l2_bits);
BLKDBG_EVENT(bs->file, BLKDBG_L2_LOAD);
- ret = bdrv_pread(bs->file, l2_offset, *l2_table,
- s->l2_size * sizeof(uint64_t));
+ ret = blkqueue_pread(s->bq_context, l2_offset, *l2_table, s->l2_size * sizeof(uint64_t));
if (ret < 0) {
qcow2_l2_cache_reset(bs);
return ret;
@@ -216,8 +217,8 @@ static int write_l1_entry(BlockDriverState *bs, int l1_index)
}
BLKDBG_EVENT(bs->file, BLKDBG_L1_UPDATE);
- ret = bdrv_pwrite_sync(bs->file, s->l1_table_offset + 8 * l1_start_index,
- buf, sizeof(buf));
+ ret = blkqueue_pwrite(s->bq_context, s->l1_table_offset + 8 * l1_start_index, buf, sizeof(buf));
+ blkqueue_barrier(s->bq_context);
if (ret < 0) {
return ret;
}
@@ -252,7 +253,7 @@ static int l2_allocate(BlockDriverState *bs, int l1_index, uint64_t **table)
if (l2_offset < 0) {
return l2_offset;
}
- bdrv_flush(bs->file);
+ blkqueue_barrier(s->bq_context);
/* allocate a new entry in the l2 cache */
@@ -265,16 +266,15 @@ static int l2_allocate(BlockDriverState *bs, int l1_index, uint64_t **table)
} else {
/* if there was an old l2 table, read it from the disk */
BLKDBG_EVENT(bs->file, BLKDBG_L2_ALLOC_COW_READ);
- ret = bdrv_pread(bs->file, old_l2_offset, l2_table,
- s->l2_size * sizeof(uint64_t));
+ ret = blkqueue_pread(s->bq_context, old_l2_offset, l2_table, s->l2_size * sizeof(uint64_t));
if (ret < 0) {
goto fail;
}
}
/* write the l2 table to the file */
BLKDBG_EVENT(bs->file, BLKDBG_L2_ALLOC_WRITE);
- ret = bdrv_pwrite_sync(bs->file, l2_offset, l2_table,
- s->l2_size * sizeof(uint64_t));
+ ret = blkqueue_pwrite(s->bq_context, l2_offset, l2_table, s->l2_size * sizeof(uint64_t));
+ blkqueue_barrier(s->bq_context);
if (ret < 0) {
goto fail;
}
@@ -394,8 +394,8 @@ static int qcow_read(BlockDriverState *bs, int64_t sector_num,
memcpy(buf, s->cluster_cache + index_in_cluster * 512, 512 * n);
} else {
BLKDBG_EVENT(bs->file, BLKDBG_READ);
- ret = bdrv_pread(bs->file, cluster_offset + index_in_cluster * 512, buf, n * 512);
- if (ret != n * 512)
+ ret = blkqueue_pread(s->bq_context, cluster_offset + index_in_cluster * 512, buf, n * 512);
+ if (ret < 0)
return -1;
if (s->crypt_method) {
qcow2_encrypt_sectors(s, sector_num, buf, buf, n, 0,
@@ -647,11 +647,12 @@ uint64_t qcow2_alloc_compressed_cluster_offset(BlockDriverState *bs,
BLKDBG_EVENT(bs->file, BLKDBG_L2_UPDATE_COMPRESSED);
l2_table[l2_index] = cpu_to_be64(cluster_offset);
- if (bdrv_pwrite_sync(bs->file,
+ if (blkqueue_pwrite(s->bq_context,
l2_offset + l2_index * sizeof(uint64_t),
l2_table + l2_index,
sizeof(uint64_t)) < 0)
return 0;
+ blkqueue_barrier(s->bq_context);
return cluster_offset;
}
@@ -664,6 +665,7 @@ uint64_t qcow2_alloc_compressed_cluster_offset(BlockDriverState *bs,
static int write_l2_entries(BlockDriverState *bs, uint64_t *l2_table,
uint64_t l2_offset, int l2_index, int num)
{
+ BDRVQcowState *s = bs->opaque;
int l2_start_index = l2_index & ~(L1_ENTRIES_PER_SECTOR - 1);
int start_offset = (8 * l2_index) & ~511;
int end_offset = (8 * (l2_index + num) + 511) & ~511;
@@ -671,8 +673,7 @@ static int write_l2_entries(BlockDriverState *bs, uint64_t *l2_table,
int ret;
BLKDBG_EVENT(bs->file, BLKDBG_L2_UPDATE);
- ret = bdrv_pwrite(bs->file, l2_offset + start_offset,
- &l2_table[l2_start_index], len);
+ ret = blkqueue_pwrite(s->bq_context, l2_offset + start_offset, &l2_table[l2_start_index], len);
if (ret < 0) {
return ret;
}
@@ -733,7 +734,7 @@ int qcow2_alloc_cluster_link_l2(BlockDriverState *bs, QCowL2Meta *m)
* need to be sure that the refcounts have been increased and COW was
* handled.
*/
- bdrv_flush(bs->file);
+ blkqueue_barrier(s->bq_context);
ret = write_l2_entries(bs, l2_table, l2_offset, l2_index, m->nb_clusters);
if (ret < 0) {
@@ -746,7 +747,7 @@ int qcow2_alloc_cluster_link_l2(BlockDriverState *bs, QCowL2Meta *m)
* Also flush bs->file to get the right order for L2 and refcount update.
*/
if (j != 0) {
- bdrv_flush(bs->file);
+ blkqueue_barrier(s->bq_context);
for (i = 0; i < j; i++) {
qcow2_free_any_clusters(bs,
be64_to_cpu(old_cluster[i]) & ~QCOW_OFLAG_COPIED, 1);
diff --git a/block/qcow2-refcount.c b/block/qcow2-refcount.c
index a10453c..ef109e9 100644
--- a/block/qcow2-refcount.c
+++ b/block/qcow2-refcount.c
@@ -44,7 +44,7 @@ static int write_refcount_block(BlockDriverState *bs)
}
BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_UPDATE);
- if (bdrv_pwrite_sync(bs->file, s->refcount_block_cache_offset,
+ if (blkqueue_pwrite(s->bq_context, s->refcount_block_cache_offset,
s->refcount_block_cache, size) < 0)
{
return -EIO;
@@ -66,8 +66,7 @@ int qcow2_refcount_init(BlockDriverState *bs)
s->refcount_table = qemu_malloc(refcount_table_size2);
if (s->refcount_table_size > 0) {
BLKDBG_EVENT(bs->file, BLKDBG_REFTABLE_LOAD);
- ret = bdrv_pread(bs->file, s->refcount_table_offset,
- s->refcount_table, refcount_table_size2);
+ ret = bdrv_pread(bs->file, s->refcount_table_offset, s->refcount_table, refcount_table_size2);
if (ret != refcount_table_size2)
goto fail;
for(i = 0; i < s->refcount_table_size; i++)
@@ -100,8 +99,7 @@ static int load_refcount_block(BlockDriverState *bs,
}
BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_LOAD);
- ret = bdrv_pread(bs->file, refcount_block_offset, s->refcount_block_cache,
- s->cluster_size);
+ ret = blkqueue_pread(s->bq_context, refcount_block_offset, s->refcount_block_cache, s->cluster_size);
if (ret < 0) {
s->refcount_block_cache_offset = 0;
return ret;
@@ -262,7 +260,7 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
goto fail_block;
}
- bdrv_flush(bs->file);
+ blkqueue_barrier(s->bq_context);
/* Initialize the new refcount block only after updating its refcount,
* update_refcount uses the refcount cache itself */
@@ -272,8 +270,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
/* Now the new refcount block needs to be written to disk */
BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE);
- ret = bdrv_pwrite_sync(bs->file, new_block, s->refcount_block_cache,
- s->cluster_size);
+ ret = blkqueue_pwrite(s->bq_context, new_block, s->refcount_block_cache, s->cluster_size);
+ blkqueue_barrier(s->bq_context);
if (ret < 0) {
goto fail_block;
}
@@ -282,9 +280,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
if (refcount_table_index < s->refcount_table_size) {
uint64_t data64 = cpu_to_be64(new_block);
BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_HOOKUP);
- ret = bdrv_pwrite_sync(bs->file,
- s->refcount_table_offset + refcount_table_index * sizeof(uint64_t),
- &data64, sizeof(data64));
+ ret = blkqueue_pwrite(s->bq_context, s->refcount_table_offset + refcount_table_index * sizeof(uint64_t), &data64, sizeof(data64));
+ blkqueue_barrier(s->bq_context);
if (ret < 0) {
goto fail_block;
}
@@ -362,8 +359,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
/* Write refcount blocks to disk */
BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE_BLOCKS);
- ret = bdrv_pwrite_sync(bs->file, meta_offset, new_blocks,
- blocks_clusters * s->cluster_size);
+ ret = blkqueue_pwrite(s->bq_context, meta_offset, new_blocks, blocks_clusters * s->cluster_size);
+ blkqueue_barrier(s->bq_context);
qemu_free(new_blocks);
if (ret < 0) {
goto fail_table;
@@ -375,8 +372,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
}
BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE_TABLE);
- ret = bdrv_pwrite_sync(bs->file, table_offset, new_table,
- table_size * sizeof(uint64_t));
+ ret = blkqueue_pwrite(s->bq_context, table_offset, new_table, table_size * sizeof(uint64_t));
+ blkqueue_barrier(s->bq_context);
if (ret < 0) {
goto fail_table;
}
@@ -390,8 +387,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
cpu_to_be64w((uint64_t*)data, table_offset);
cpu_to_be32w((uint32_t*)(data + 8), table_clusters);
BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_SWITCH_TABLE);
- ret = bdrv_pwrite_sync(bs->file, offsetof(QCowHeader, refcount_table_offset),
- data, sizeof(data));
+ ret = blkqueue_pwrite(s->bq_context, offsetof(QCowHeader, refcount_table_offset), data, sizeof(data));
+ blkqueue_barrier(s->bq_context);
if (ret < 0) {
goto fail_table;
}
@@ -447,9 +444,7 @@ static int write_refcount_block_entries(BlockDriverState *bs,
size = (last_index - first_index) << REFCOUNT_SHIFT;
BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_UPDATE_PART);
- ret = bdrv_pwrite(bs->file,
- refcount_block_offset + (first_index << REFCOUNT_SHIFT),
- &s->refcount_block_cache[first_index], size);
+ ret = blkqueue_pwrite(s->bq_context, refcount_block_offset + (first_index << REFCOUNT_SHIFT), &s->refcount_block_cache[first_index], size);
if (ret < 0) {
return ret;
}
@@ -577,7 +572,7 @@ static int update_cluster_refcount(BlockDriverState *bs,
return ret;
}
- bdrv_flush(bs->file);
+ blkqueue_barrier(s->bq_context);
return get_refcount(bs, cluster_index);
}
@@ -679,7 +674,7 @@ int64_t qcow2_alloc_bytes(BlockDriverState *bs, int size)
}
}
- bdrv_flush(bs->file);
+ blkqueue_barrier(s->bq_context);
return offset;
}
@@ -772,8 +767,7 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
l1_table = NULL;
}
l1_allocated = 1;
- if (bdrv_pread(bs->file, l1_table_offset,
- l1_table, l1_size2) != l1_size2)
+ if (blkqueue_pread(s->bq_context, l1_table_offset, l1_table, l1_size2) < 0)
goto fail;
for(i = 0;i < l1_size; i++)
be64_to_cpus(&l1_table[i]);
@@ -792,7 +786,7 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
old_l2_offset = l2_offset;
l2_offset &= ~QCOW_OFLAG_COPIED;
l2_modified = 0;
- if (bdrv_pread(bs->file, l2_offset, l2_table, l2_size) != l2_size)
+ if (blkqueue_pread(s->bq_context, l2_offset, l2_table, l2_size) < 0)
goto fail;
for(j = 0; j < s->l2_size; j++) {
offset = be64_to_cpu(l2_table[j]);
@@ -813,7 +807,7 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
/* TODO Flushing once for the whole function should
* be enough */
- bdrv_flush(bs->file);
+ blkqueue_barrier(s->bq_context);
}
/* compressed clusters are never modified */
refcount = 2;
@@ -839,9 +833,10 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
}
}
if (l2_modified) {
- if (bdrv_pwrite_sync(bs->file,
+ if (blkqueue_pwrite(s->bq_context,
l2_offset, l2_table, l2_size) < 0)
goto fail;
+ blkqueue_barrier(s->bq_context);
}
if (addend != 0) {
@@ -863,9 +858,10 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
if (l1_modified) {
for(i = 0; i < l1_size; i++)
cpu_to_be64s(&l1_table[i]);
- if (bdrv_pwrite_sync(bs->file, l1_table_offset, l1_table,
+ if (blkqueue_pwrite(s->bq_context, l1_table_offset, l1_table,
l1_size2) < 0)
goto fail;
+ blkqueue_barrier(s->bq_context);
for(i = 0; i < l1_size; i++)
be64_to_cpus(&l1_table[i]);
}
@@ -956,8 +952,9 @@ static int check_refcounts_l2(BlockDriverState *bs, BdrvCheckResult *res,
l2_size = s->l2_size * sizeof(uint64_t);
l2_table = qemu_malloc(l2_size);
- if (bdrv_pread(bs->file, l2_offset, l2_table, l2_size) != l2_size)
+ if (blkqueue_pread(s->bq_context, l2_offset, l2_table, l2_size) < 0) {
goto fail;
+ }
/* Do the actual checks */
for(i = 0; i < s->l2_size; i++) {
@@ -1051,9 +1048,10 @@ static int check_refcounts_l1(BlockDriverState *bs,
l1_table = NULL;
} else {
l1_table = qemu_malloc(l1_size2);
- if (bdrv_pread(bs->file, l1_table_offset,
- l1_table, l1_size2) != l1_size2)
+ if (blkqueue_pread(s->bq_context, l1_table_offset, l1_table, l1_size2) < 0) {
goto fail;
+ }
+
for(i = 0;i < l1_size; i++)
be64_to_cpus(&l1_table[i]);
}
@@ -1127,6 +1125,8 @@ int qcow2_check_refcounts(BlockDriverState *bs, BdrvCheckResult *res)
nb_clusters = size_to_clusters(s, size);
refcount_table = qemu_mallocz(nb_clusters * sizeof(uint16_t));
+ blkqueue_init_context(s->bq_context, s->bq);
+
/* header */
inc_refcounts(bs, res, refcount_table, nb_clusters,
0, s->cluster_size);
diff --git a/block/qcow2.c b/block/qcow2.c
index 537c479..1ad2832 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -136,6 +136,21 @@ static int qcow_read_extensions(BlockDriverState *bs, uint64_t start_offset,
return 0;
}
+/*
+ * Error handler for the qcow2 block queue (a BlockQueueErrorHandler).
+ *
+ * opaque is the qcow2 BlockDriverState, ret the negative errno of the failed
+ * request. If the error action returned by bdrv_get_on_error(bs, 0) is "stop
+ * on any error", or "stop on ENOSPC" and ret is -ENOSPC, a stop event is
+ * sent to the monitor and the VM is stopped.
+ *
+ * Returns true only if the VM has really been stopped, i.e. the error has
+ * been handled and the failed requests should be re-inserted into the queue.
+ * Returns false otherwise, so that the requests are completed and the next
+ * flush fails.
+ */
+static bool qcow_blkqueue_error_cb(void *opaque, int ret)
+{
+    BlockDriverState *bs = opaque;
+    BlockErrorAction action = bdrv_get_on_error(bs, 0);
+
+    if ((action == BLOCK_ERR_STOP_ENOSPC && ret == -ENOSPC)
+        || action == BLOCK_ERR_STOP_ANY)
+    {
+        bdrv_mon_event(bs, BDRV_ACTION_STOP, 0);
+
+        /*
+         * vm_stop() tells us whether there was a VM to stop. The stub used by
+         * the tools returns false; re-queueing the requests in that case
+         * would keep them around although nobody can resolve the error.
+         */
+        return vm_stop(0);
+    }
+
+    return false;
+}
static int qcow_open(BlockDriverState *bs, int flags)
{
@@ -237,6 +252,11 @@ static int qcow_open(BlockDriverState *bs, int flags)
if (qcow2_read_snapshots(bs) < 0)
goto fail;
+ /* Block queue */
+ s->bq = blkqueue_create(bs->file, qcow_blkqueue_error_cb, bs);
+ blkqueue_init_context(&s->initial_bq_context, s->bq);
+ s->bq_context = &s->initial_bq_context;
+
#ifdef DEBUG_ALLOC
qcow2_check_refcounts(bs);
#endif
@@ -341,6 +361,7 @@ typedef struct QCowAIOCB {
QEMUIOVector hd_qiov;
QEMUBH *bh;
QCowL2Meta l2meta;
+ BlockQueueContext bq_context;
QLIST_ENTRY(QCowAIOCB) next_depend;
} QCowAIOCB;
@@ -387,6 +408,8 @@ static void qcow_aio_read_cb(void *opaque, int ret)
BDRVQcowState *s = bs->opaque;
int index_in_cluster, n1;
+ s->bq_context = &acb->bq_context;
+
acb->hd_aiocb = NULL;
if (ret < 0)
goto done;
@@ -519,6 +542,7 @@ static QCowAIOCB *qcow_aio_setup(BlockDriverState *bs,
int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
BlockDriverCompletionFunc *cb, void *opaque, int is_write)
{
+ BDRVQcowState *s = bs->opaque;
QCowAIOCB *acb;
acb = qemu_aio_get(&qcow_aio_pool, bs, cb, opaque);
@@ -536,6 +560,10 @@ static QCowAIOCB *qcow_aio_setup(BlockDriverState *bs,
acb->cluster_offset = 0;
acb->l2meta.nb_clusters = 0;
QLIST_INIT(&acb->l2meta.dependent_requests);
+
+ /* TODO Push the context into l2meta */
+ blkqueue_init_context(&acb->bq_context, s->bq);
+
return acb;
}
@@ -582,6 +610,7 @@ static void qcow_aio_write_cb(void *opaque, int ret)
int index_in_cluster;
int n_end;
+ s->bq_context = &acb->bq_context;
acb->hd_aiocb = NULL;
if (ret >= 0) {
@@ -694,6 +723,7 @@ static void qcow_close(BlockDriverState *bs)
qemu_free(s->cluster_cache);
qemu_free(s->cluster_data);
qcow2_refcount_close(bs);
+ blkqueue_destroy(s->bq);
}
/*
@@ -1150,13 +1180,64 @@ static int qcow_write_compressed(BlockDriverState *bs, int64_t sector_num,
static int qcow_flush(BlockDriverState *bs)
{
+ BDRVQcowState *s = bs->opaque;
+ int ret;
+
+ ret = blkqueue_flush(s->bq);
+ if (ret < 0) {
+ /*
+ * If the queue is empty, we couldn't handle the write error by
+ * stopping the guest. In this case we don't know which metadata writes
+ * have succeeded. Reopen the qcow2 layer to make sure that all caches
+ * are invalidated.
+ */
+ if (blkqueue_is_empty(s->bq)) {
+ qcow_close(bs);
+ qcow_open(bs, 0);
+ }
+
+ return ret;
+ }
+
return bdrv_flush(bs->file);
}
+typedef struct QcowFlushAIOCB {
+ BlockDriverState *bs;
+ BlockDriverCompletionFunc *cb;
+ void *opaque;
+} QcowFlushAIOCB;
+
+/*
+ * Completion callback for the block queue flush started by qcow_aio_flush().
+ *
+ * opaque is the QcowFlushAIOCB allocated by qcow_aio_flush(), ret the result
+ * of the flush. Error handling mirrors qcow_flush(): only if the flush has
+ * failed and the queue is empty (the error could not be handled by stopping
+ * the guest, so it is unknown which metadata writes have succeeded) is the
+ * qcow2 layer reopened in order to invalidate all caches. A successful flush
+ * also leaves the queue empty and must not reopen the image.
+ *
+ * Afterwards the caller's completion function is called with ret and the
+ * QcowFlushAIOCB is freed.
+ *
+ * NOTE(review): unlike qcow_flush(), nothing visible here flushes bs->file
+ * after the queue has been flushed -- confirm that blkqueue_aio_flush()
+ * takes care of this.
+ */
+static void qcow_aio_flush_cb(void *opaque, int ret)
+{
+    QcowFlushAIOCB *acb = opaque;
+    BlockDriverState *bs = acb->bs;
+    BDRVQcowState *s = bs->opaque;
+
+    if (ret < 0 && blkqueue_is_empty(s->bq)) {
+        qcow_close(bs);
+        qcow_open(bs, 0);
+    }
+
+    acb->cb(acb->opaque, ret);
+    qemu_free(acb);
+}
+
static BlockDriverAIOCB *qcow_aio_flush(BlockDriverState *bs,
BlockDriverCompletionFunc *cb, void *opaque)
{
- return bdrv_aio_flush(bs->file, cb, opaque);
+ BDRVQcowState *s = bs->opaque;
+ BlockQueueContext context;
+ QcowFlushAIOCB *acb;
+
+ blkqueue_init_context(&context, s->bq);
+
+ acb = qemu_malloc(sizeof(*acb));
+ acb->bs = bs;
+ acb->cb = cb;
+ acb->opaque = opaque;
+
+ return blkqueue_aio_flush(&context, qcow_aio_flush_cb, acb);
}
static int64_t qcow_vm_state_offset(BDRVQcowState *s)
diff --git a/block/qcow2.h b/block/qcow2.h
index 2d22e5e..2ef910c 100644
--- a/block/qcow2.h
+++ b/block/qcow2.h
@@ -26,6 +26,7 @@
#define BLOCK_QCOW2_H
#include "aes.h"
+#include "block-queue.h"
//#define DEBUG_ALLOC
//#define DEBUG_ALLOC2
@@ -108,6 +109,10 @@ typedef struct BDRVQcowState {
int64_t free_cluster_index;
int64_t free_byte_offset;
+ BlockQueue *bq;
+ BlockQueueContext *bq_context;
+ BlockQueueContext initial_bq_context;
+
uint32_t crypt_method; /* current crypt method, 0 if no key yet */
uint32_t crypt_method_header;
AES_KEY aes_encrypt_key;
diff --git a/cpus.c b/cpus.c
index 91a0fb1..8ec0ed6 100644
--- a/cpus.c
+++ b/cpus.c
@@ -310,9 +310,10 @@ void qemu_notify_event(void)
void qemu_mutex_lock_iothread(void) {}
void qemu_mutex_unlock_iothread(void) {}
-void vm_stop(int reason)
+bool vm_stop(int reason)
{
do_vm_stop(reason);
+ return true;
}
#else /* CONFIG_IOTHREAD */
@@ -848,7 +849,7 @@ static void qemu_system_vmstop_request(int reason)
qemu_notify_event();
}
-void vm_stop(int reason)
+bool vm_stop(int reason)
{
QemuThread me;
qemu_thread_self(&me);
@@ -863,9 +864,10 @@ void vm_stop(int reason)
cpu_exit(cpu_single_env);
cpu_single_env->stop = 1;
}
- return;
+ return true;
}
do_vm_stop(reason);
+ return true;
}
#endif
diff --git a/qemu-common.h b/qemu-common.h
index b3957f1..1c23b0f 100644
--- a/qemu-common.h
+++ b/qemu-common.h
@@ -115,6 +115,9 @@ static inline char *realpath(const char *path, char *resolved_path)
#endif /* !defined(NEED_CPU_H) */
+/* VM state */
+bool vm_stop(int reason);
+
/* bottom halves */
typedef void QEMUBHFunc(void *opaque);
diff --git a/qemu-tool.c b/qemu-tool.c
index 392e1c9..3926435 100644
--- a/qemu-tool.c
+++ b/qemu-tool.c
@@ -111,3 +111,8 @@ int qemu_set_fd_handler2(int fd,
{
return 0;
}
+
+bool vm_stop(int reason)
+{
+ return false;
+}
diff --git a/sysemu.h b/sysemu.h
index b81a70e..77788f1 100644
--- a/sysemu.h
+++ b/sysemu.h
@@ -38,7 +38,6 @@ VMChangeStateEntry *qemu_add_vm_change_state_handler(VMChangeStateHandler *cb,
void qemu_del_vm_change_state_handler(VMChangeStateEntry *e);
void vm_start(void);
-void vm_stop(int reason);
uint64_t ram_bytes_remaining(void);
uint64_t ram_bytes_transferred(void);
--
1.7.2.3
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [Qemu-devel] Re: [RFC PATCH v3 1/4] block: Implement bdrv_aio_pwrite
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 1/4] block: Implement bdrv_aio_pwrite Kevin Wolf
@ 2010-12-02 12:07 ` Stefan Hajnoczi
2010-12-02 12:30 ` Kevin Wolf
0 siblings, 1 reply; 9+ messages in thread
From: Stefan Hajnoczi @ 2010-12-02 12:07 UTC (permalink / raw)
To: Kevin Wolf; +Cc: qemu-devel
On Tue, Nov 30, 2010 at 12:48 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> This implements an asynchronous version of bdrv_pwrite.
>
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> ---
> block.c | 167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
> block.h | 2 +
> 2 files changed, 169 insertions(+), 0 deletions(-)
Is this function necessary?
Current synchronous code uses pwrite() so this function makes it easy
to convert existing code. But if that code took the block-based
nature of storage into account then this read-modify-write helper
isn't needed.
I guess what I'm saying is that this function should only be used when
you really need rmw (in many cases with image metadata it can be
avoided because you have enough metadata cached in memory to do full
sector writes). If it turns out we don't need rmw then we can
eliminate this function.
> + switch (acb->state) {
> + case 0: {
> + /* Read first sector if needed */
Please use an enum instead of int literals with comments. Or you
could try separate functions and see if the switch statement really
saves that many lines of code.
> + case 3: {
> + /* Read last sector if needed */
> + if (acb->bytes == 0) {
> + goto done;
> + }
> +
> + acb->state = 4;
> + acb->iov.iov_base = acb->tmp_buf;
acb->tmp_buf may be NULL here if we took the state transition to 2
instead of doing 1.
> +done:
> + qemu_free(acb->tmp_buf);
> + acb->common.cb(acb->common.opaque, ret);
Callback not invoked from a BH. In an error case we might have made
no blocking calls, i.e. never returned to the caller, so invoking this
callback directly can cause reentrancy.
> +BlockDriverAIOCB *bdrv_aio_pwrite(BlockDriverState *bs, int64_t offset,
> + void* buf, size_t bytes, BlockDriverCompletionFunc *cb, void *opaque)
> +{
> + PwriteAIOCB *acb;
> +
> + acb = qemu_aio_get(&blkqueue_aio_pool, bs, cb, opaque);
> + acb->state = 0;
> + acb->offset = offset;
> + acb->buf = buf;
> + acb->bytes = bytes;
> + acb->tmp_buf = NULL;
> +
> + bdrv_aio_pwrite_cb(acb, 0);
We're missing the usual !bs->drv, bs->read_only, bdrv_check_request()
checks here. Are we okay to wait until calling
bdrv_aio_readv/bdrv_aio_writev for these checks?
Stefan
^ permalink raw reply [flat|nested] 9+ messages in thread
* [Qemu-devel] Re: [RFC PATCH v3 1/4] block: Implement bdrv_aio_pwrite
2010-12-02 12:07 ` [Qemu-devel] " Stefan Hajnoczi
@ 2010-12-02 12:30 ` Kevin Wolf
2010-12-02 13:04 ` Stefan Hajnoczi
0 siblings, 1 reply; 9+ messages in thread
From: Kevin Wolf @ 2010-12-02 12:30 UTC (permalink / raw)
To: Stefan Hajnoczi; +Cc: qemu-devel
Am 02.12.2010 13:07, schrieb Stefan Hajnoczi:
> On Tue, Nov 30, 2010 at 12:48 PM, Kevin Wolf <kwolf@redhat.com> wrote:
>> This implements an asynchronous version of bdrv_pwrite.
>>
>> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
>> ---
>> block.c | 167 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>> block.h | 2 +
>> 2 files changed, 169 insertions(+), 0 deletions(-)
>
> Is this function necessary?
>
> Current synchronous code uses pwrite() so this function makes it easy
> to convert existing code. But if that code took the block-based
> nature of storage into account then this read-modify-write helper
> isn't needed.
For qcow2, most writes (refcount tables, L2 tables, etc.) are aligned to
512 byte sectors, but there are still some left that use pwrite with an
unaligned count. I'm not completely sure which data, but qemu-iotests
crashed with tmp_buf == NULL, so there are some ;-) Probably things like
header and snapshot table writes.
I'm not sure what other image formats do (we might want to use
block-queue for them, too, eventually), but usually that means that they
do strange things.
> I guess what I'm saying is that this function should only be used when
> you really need rmw (in many cases with image metadata it can be
> avoided because you have enough metadata cached in memory to do full
> sector writes). If it turns out we don't need rmw then we can
> eliminate this function.
Maybe what we really should do is completely change the block layer
functions to use bytes as their unit and do any RMW in posix-aio-compat
and linux-aio. Other backends don't need it and without O_DIRECT we
don't even need to do it with files.
Also, using units of 512 bytes is completely arbitrary and may still
involve RMW if the host uses a different sector size.
>> + switch (acb->state) {
>> + case 0: {
>> + /* Read first sector if needed */
>
> Please use an enum instead of int literals with comments. Or you
> could try separate functions and see if the switch statement really
> saves that many lines of code.
Okay, will use an enum.
I think the switch may not save that many lines of code, but it improves
readability because with chained functions (and no forward declarations)
you have to read backwards.
>> + case 3: {
>> + /* Read last sector if needed */
>> + if (acb->bytes == 0) {
>> + goto done;
>> + }
>> +
>> + acb->state = 4;
>> + acb->iov.iov_base = acb->tmp_buf;
>
> acb->tmp_buf may be NULL here if we took the state transition to 2
> instead of doing 1.
Yup, it is already fixed.
>> +done:
>> + qemu_free(acb->tmp_buf);
>> + acb->common.cb(acb->common.opaque, ret);
>
> Callback not invoked from a BH. In an error case we might have made
> no blocking calls, i.e. never returned and this callback can cause
> reentrancy.
Good point.
>> +BlockDriverAIOCB *bdrv_aio_pwrite(BlockDriverState *bs, int64_t offset,
>> + void* buf, size_t bytes, BlockDriverCompletionFunc *cb, void *opaque)
>> +{
>> + PwriteAIOCB *acb;
>> +
>> + acb = qemu_aio_get(&blkqueue_aio_pool, bs, cb, opaque);
>> + acb->state = 0;
>> + acb->offset = offset;
>> + acb->buf = buf;
>> + acb->bytes = bytes;
>> + acb->tmp_buf = NULL;
>> +
>> + bdrv_aio_pwrite_cb(acb, 0);
>
> We're missing the usual !bs->drv, bs->read_only, bdrv_check_request()
> checks here. Are we okay to wait until calling
> bdrv_aio_readv/bdrv_aio_writev for these checks?
I think we are, but if you prefer, I can copy them here.
Kevin
^ permalink raw reply [flat|nested] 9+ messages in thread
* [Qemu-devel] Re: [RFC PATCH v3 1/4] block: Implement bdrv_aio_pwrite
2010-12-02 12:30 ` Kevin Wolf
@ 2010-12-02 13:04 ` Stefan Hajnoczi
0 siblings, 0 replies; 9+ messages in thread
From: Stefan Hajnoczi @ 2010-12-02 13:04 UTC (permalink / raw)
To: Kevin Wolf; +Cc: qemu-devel
On Thu, Dec 2, 2010 at 12:30 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> Am 02.12.2010 13:07, schrieb Stefan Hajnoczi:
>> On Tue, Nov 30, 2010 at 12:48 PM, Kevin Wolf <kwolf@redhat.com> wrote:
>> I guess what I'm saying is that this function should only be used when
>> you really need rmw (in many cases with image metadata it can be
>> avoided because you have enough metadata cached in memory to do full
>> sector writes). If it turns out we don't need rmw then we can
>> eliminate this function.
>
> Maybe what we really should do is completely change the block layer
> functions to use bytes as their unit and do any RMW in posix-aio-compat
> and linux-aio. Other backends don't need it and without O_DIRECT we
> don't even need to do it with files.
Yeah that sounds like something worth exploring more. Perhaps
together with some input from Christoph on moving QEMU to the native
block size (e.g. 4k on some devices).
>>> +BlockDriverAIOCB *bdrv_aio_pwrite(BlockDriverState *bs, int64_t offset,
>>> + void* buf, size_t bytes, BlockDriverCompletionFunc *cb, void *opaque)
>>> +{
>>> + PwriteAIOCB *acb;
>>> +
>>> + acb = qemu_aio_get(&blkqueue_aio_pool, bs, cb, opaque);
>>> + acb->state = 0;
>>> + acb->offset = offset;
>>> + acb->buf = buf;
>>> + acb->bytes = bytes;
>>> + acb->tmp_buf = NULL;
>>> +
>>> + bdrv_aio_pwrite_cb(acb, 0);
>>
>> We're missing the usual !bs->drv, bs->read_only, bdrv_check_request()
>> checks here. Are we okay to wait until calling
>> bdrv_aio_readv/bdrv_aio_writev for these checks?
>
> I think we are, but if you prefer, I can copy them here.
No, I just wanted to make sure you took them into account. In theory
those error cases won't affect your code and it's fine to wait for
bdrv_aio_readv/bdrv_aio_writev to catch them. I haven't thought
through the cases in detail though.
Stefan
^ permalink raw reply [flat|nested] 9+ messages in thread
* [Qemu-devel] Re: [RFC PATCH v3 2/4] Add block-queue
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 2/4] Add block-queue Kevin Wolf
@ 2010-12-03 9:44 ` Stefan Hajnoczi
0 siblings, 0 replies; 9+ messages in thread
From: Stefan Hajnoczi @ 2010-12-03 9:44 UTC (permalink / raw)
To: Kevin Wolf; +Cc: qemu-devel
On Tue, Nov 30, 2010 at 12:48 PM, Kevin Wolf <kwolf@redhat.com> wrote:
> +/*
> + * Adds a write request to the queue.
> + */
> +int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
> + uint64_t size)
> +{
> + BlockQueue *bq = context->bq;
> + BlockQueueRequest *section_req;
> + bool completed;
> +
> + /* Don't use the queue for writethrough images */
> + if ((bq->bs->open_flags & WRITEBACK_MODES) == 0) {
> + return bdrv_pwrite(bq->bs, offset, buf, size);
> + }
> +
> + /* First check if there are any pending writes for the same data. */
> + DPRINTF("-- pwrite: [%#lx + %ld]\n", offset, size);
> + completed = blkqueue_check_queue_overlap(context, &bq->queue, &offset,
> + &buf, &size, &blkqueue_pwrite, &pwrite_handle_overlap,
> + context->section);
> +
> + if (completed) {
> + return 0;
> + }
> +
> + /* Create request structure */
> + BlockQueueRequest *req = qemu_malloc(sizeof(*req));
> + QLIST_INIT(&req->acbs);
> + req->type = REQ_TYPE_WRITE;
> + req->bq = bq;
> + req->offset = offset;
> + req->size = size;
> + req->buf = qemu_malloc(size);
qemu_blockalign()
> +static int insert_barrier(BlockQueueContext *context, BlockQueueAIOCB *acb)
> +{
> + BlockQueue *bq = context->bq;
> + BlockQueueRequest *section_req;
> +
> + bq->barriers_requested++;
> +
> + /* Create request structure */
> + BlockQueueRequest *req = qemu_malloc(sizeof(*req));
> + QLIST_INIT(&req->acbs);
> + req->type = REQ_TYPE_BARRIER;
> + req->bq = bq;
> + req->section = context->section;
> + req->buf = NULL;
> +
> + /* Find another barrier to merge with. */
> + QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
> + if (section_req->section >= req->section) {
> +
> + /*
> + * If acb is set, the intention of the barrier request is to flush
> + * the complete queue and notify the caller when all requests have
> + * been processed. To achieve this, we may only merge with the very
> + * last request in the queue.
> + */
> + if (acb && QTAILQ_NEXT(section_req, link)) {
> + continue;
> + }
> +
> + req->section = section_req->section;
> + context->section = section_req->section + 1;
> + qemu_free(req);
> + req = section_req;
> + goto out;
> + }
> + }
I think the search for an existing barrier should be moved above the
req allocation. It's more work to allocate req, do an unnecessary
req->section = section_req->section, and then free it again.
> +
> + /*
> + * If there wasn't a barrier for the same section yet, insert a new one at
> + * the end.
> + */
> + DPRINTF("queue-ins flush: %p\n", req);
> + QTAILQ_INSERT_TAIL(&bq->queue, req, link);
> + QSIMPLEQ_INSERT_TAIL(&bq->sections, req, link_section);
> + bq->queue_size++;
> + context->section++;
> +
> + bq->barriers_submitted++;
> +
> + /*
> + * At this point, req is either the newly inserted request, or a previously
> + * existing barrier with which the current request has been merged.
> + *
> + * Insert the ACB in the list of that request so that the callback is
> + * called when the request has completed.
> + */
> +out:
> + if (acb) {
> + QLIST_INSERT_HEAD(&req->acbs, acb, link);
Is there a reason to insert at the head and not append to the tail?
Preserving order is usually good.
> +/*
> + * If there are any blkqueue_aio_flush callbacks pending, call them with ret
> + * as the error code and remove them from the queue.
> + *
> + * If keep_queue is false, all requests are removed from the queue
> + */
> +static void blkqueue_fail_flush(BlockQueue *bq, int ret, bool keep_queue)
> +{
> + BlockQueueRequest *req, *next_req;
> + BlockQueueAIOCB *acb, *next_acb;
> +
> + QTAILQ_FOREACH_SAFE(req, &bq->queue, link, next_req) {
> +
> + /* Call and remove registered callbacks */
> + QLIST_FOREACH_SAFE(acb, &req->acbs, link, next_acb) {
> + acb->common.cb(acb->common.opaque, ret);
> + qemu_free(acb);
qemu_aio_release()
> + }
> + QLIST_INIT(&req->acbs);
> +
> + /* If requested, remove the request itself */
> + if (!keep_queue) {
> + QTAILQ_REMOVE(&bq->queue, req, link);
bq->queue_size--;
> + if (req->type == REQ_TYPE_BARRIER) {
> + QSIMPLEQ_REMOVE(&bq->sections, req, BlockQueueRequest,
> + link_section);
> + }
Now free the request?
> + }
> + }
> +
> + /* Make sure that blkqueue_flush stops running */
> + bq->flushing = ret;
> +}
> +
> +static void blkqueue_process_request_cb(void *opaque, int ret)
> +{
> + BlockQueueRequest *req = opaque;
> + BlockQueue *bq = req->bq;
> + BlockQueueAIOCB *acb, *next;
> +
> + DPRINTF(" done req: %p [%#lx + %ld]\n", req, req->offset, req->size);
> +
> + /* Remove from in-flight list */
> + QTAILQ_REMOVE(&bq->in_flight, req, link);
> + bq->in_flight_num--;
> +
> + /*
> + * Error handling gets a bit complicated, because we have already completed
> + * the requests that went wrong. There are two ways of dealing with this:
> + *
> + * 1. With werror=stop we can put the request back into the queue and stop
> + * the VM. When the user continues the VM, the request is retried.
> + *
> + * 2. In other cases we need to return an error on the next bdrv_flush. The
> + * caller must cope with the fact that he doesn't know which of the
> + * requests succeeded (i.e. invalidate all caches)
> + *
> + * If we're in an blkqueue_aio_flush, we must return an error in both
> + * cases. If we stop the VM, we can clear bq->errno immediately again.
> + * Otherwise, it's cleared in bdrv_(aio_)flush.
> + */
> + if (ret < 0) {
> + if (bq->error_ret != -ENOSPC) {
> + bq->error_ret = ret;
> + }
> + }
> +
> + /* Call any callbacks attached to the request (see blkqueue_aio_flush) */
> + QLIST_FOREACH_SAFE(acb, &req->acbs, link, next) {
> + acb->common.cb(acb->common.opaque, bq->error_ret);
> + qemu_free(acb);
qemu_aio_release()
> + }
> + QLIST_INIT(&req->acbs);
> +
> + /* Handle errors in the VM stop case */
> + if (ret < 0) {
> + bool keep_queue = bq->error_handler(bq->error_opaque, ret);
> +
> + /* Fail any flushes that may wait for the queue to become empty */
> + blkqueue_fail_flush(bq, bq->error_ret, keep_queue);
> +
> + if (keep_queue) {
> + /* Reinsert request into the queue */
> + QTAILQ_INSERT_HEAD(&bq->queue, req, link);
> + if (req->type == REQ_TYPE_BARRIER) {
> + QSIMPLEQ_INSERT_HEAD(&bq->sections, req, link_section);
> + }
> +
> + /* Clear the error to restore a normal state after 'cont' */
> + bq->error_ret = 0;
> + return;
> + }
> + }
> +
> + /* Cleanup */
> + blkqueue_free_request(req);
> +
> + /* Check if there are more requests to submit */
> + blkqueue_process_request(bq);
> +}
> +
> +/*
> + * Checks if the first request on the queue can run. If so, remove it from the
> + * queue, submit the request and put it onto the queue of in-flight requests.
> + *
> + * Returns 0 if a request has been submitted, -1 if no request can run or an
> + * error has occurred.
> + */
If we want specific error codes:
-EAGAIN no request can run
-EINVAL, -EIO, ... specific errors
> +static int blkqueue_submit_request(BlockQueue *bq)
> +{
> + BlockDriverAIOCB *acb;
> + BlockQueueRequest *req;
> +
> + /*
> + * If we had an error, we must not submit new requests from another
> + * section or may we get ordering problems. In fact, not submitting any new
> + * requests looks like a good idea in this case.
> + */
> + if (bq->error_ret) {
> + return -1;
> + }
> +
> + /* Fetch a request */
> + req = QTAILQ_FIRST(&bq->queue);
> + if (req == NULL) {
> + return -1;
> + }
> +
> + /* Writethrough images aren't supposed to have any queue entries */
> + assert((bq->bs->open_flags & WRITEBACK_MODES) != 0);
> +
> + /*
> + * We need to wait for completion before we can submit new requests:
> + * 1. If we're currently processing a barrier, or the new request is a
> + * barrier, we need to guarantee this barrier semantics.
> + * 2. We must make sure that newer writes cannot pass older ones.
> + */
> + if (bq->in_flight_num > 0) {
I don't understand why we refuse to do more work on in_flight_num > 0.
This function increments in_flight_num which means
blkqueue_process_request() will only ever submit one request at a
time? Why does blkqueue_process_request() loop then? I'm missing
something.
> + return -1;
> + }
> +
> + /* Process barriers only if the queue is long enough */
> + if (!bq->flushing) {
> + if (req->type == REQ_TYPE_BARRIER && bq->queue_size < 50) {
> + return -1;
> + }
Not sure about this. Need to think about the patch more but this
check looks like it could have consequences like starvation. I guess
that's why you check for !bq->flushing.
> +static void blkqueue_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> + BlockQueueAIOCB *acb = (BlockQueueAIOCB*) blockacb;
> +
> + /*
> + * We can't cancel the flush any more, but that doesn't hurt. We just
> + * need to make sure that we don't call the callback when it completes.
> + */
> + QLIST_REMOVE(acb, link);
> + qemu_free(acb);
qemu_aio_release()
> +}
> +
> +/*
> + * Inserts a barrier at the end of the queue (or merges with an existing
> + * barrier there). Once the barrier has completed, the callback is called.
> + */
> +BlockDriverAIOCB* blkqueue_aio_flush(BlockQueueContext *context,
> + BlockDriverCompletionFunc *cb, void *opaque)
> +{
> + BlockQueueAIOCB *acb;
> + BlockDriverState *bs = context->bq->bs;
> + int ret;
> +
> + /* Don't use the queue for writethrough images */
> + if ((bs->open_flags & WRITEBACK_MODES) == 0) {
> + return bdrv_aio_flush(bs, cb, opaque);
> + }
> +
> + /* Insert a barrier into the queue */
> + acb = qemu_aio_get(&blkqueue_aio_pool, NULL, cb, opaque);
> +
> + ret = insert_barrier(context, acb);
> + if (ret < 0) {
This return path is broken:
> + cb(opaque, ret);
Missing BH.
> + qemu_free(acb);
qemu_aio_release(acb);
If we want to invoke cb (via a BH) then we shouldn't release acb. If
we do release it we need to return NULL but shouldn't invoke cb.
> + }
> +
> + return &acb->common;
> +}
> +
> +/*
> + * Flushes the queue (i.e. disables waiting for new requests to be batched) and
> + * waits until all requests in the queue have completed.
> + *
> + * Note that unlike blkqueue_aio_flush this does not call bdrv_flush().
> + */
> +int blkqueue_flush(BlockQueue *bq)
> +{
> + int res = 0;
> +
> + bq->flushing = 1;
> +
> + /* Process any left over requests */
> + while ((bq->flushing > 0) &&
> + (bq->in_flight_num || QTAILQ_FIRST(&bq->queue)))
> + {
> + blkqueue_process_request(bq);
> + qemu_aio_wait();
> + }
> +
> + /*
> + * bq->flushing contains the error if it could be handled by stopping the
> + * VM, error_ret contains it if we're not allowed to do this.
> + */
> + if (bq->error_ret < 0) {
> + res = bq->error_ret;
> +
> + /*
> + * Wait for AIO requests, so that the queue is really unused after
> + * blkqueue_flush() and the caller can destroy it
> + */
> + if (res < 0) {
We already know bq->error_ret < 0. This comparison is always true.
Stefan
^ permalink raw reply [flat|nested] 9+ messages in thread
end of thread, other threads:[~2010-12-03 9:44 UTC | newest]
Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2010-11-30 12:48 [Qemu-devel] [RFC PATCH v3 0/4] block-queue: Delay and batch metadata writes Kevin Wolf
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 1/4] block: Implement bdrv_aio_pwrite Kevin Wolf
2010-12-02 12:07 ` [Qemu-devel] " Stefan Hajnoczi
2010-12-02 12:30 ` Kevin Wolf
2010-12-02 13:04 ` Stefan Hajnoczi
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 2/4] Add block-queue Kevin Wolf
2010-12-03 9:44 ` [Qemu-devel] " Stefan Hajnoczi
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 3/4] Test cases for block-queue Kevin Wolf
2010-11-30 12:48 ` [Qemu-devel] [RFC PATCH v3 4/4] qcow2: Preliminary block-queue support Kevin Wolf
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).