From: Anthony Liguori <anthony@codemonkey.ws>
To: Kevin Wolf <kwolf@redhat.com>
Cc: qemu-devel@nongnu.org
Subject: Re: [Qemu-devel] [RFC] block-queue: Delay and batch metadata writes
Date: Mon, 20 Sep 2010 09:31:04 -0500
Message-ID: <4C977028.3050602@codemonkey.ws>
In-Reply-To: <1284991010-10951-1-git-send-email-kwolf@redhat.com>

On 09/20/2010 08:56 AM, Kevin Wolf wrote:
> I won't get this ready until I leave for vacation on Wednesday, so I thought I
> could just as well post it as an RFC in this state.
>
> With this patch applied, qcow2 doesn't directly access the image file any more
> for metadata, but rather goes through the newly introduced blkqueue. Write
> and sync requests are queued there and executed in a separate worker thread.
> Reads consider the contents of the queue before accessing the image file.
>
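
To make the read path concrete, here is a minimal sketch in plain C of a read that consults queued writes before falling back to the image. The names (pending_write, overlay_read) are hypothetical and not from the patch, and the "disk" is just a byte array. It uses a simpler technique than blkqueue_pread in the patch: read the whole range from the backing store first, then overlay the queued writes from oldest to newest so that the latest write wins.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for a queued metadata write (not from the patch). */
struct pending_write {
    uint64_t offset;
    uint64_t size;
    const uint8_t *buf;
};

/*
 * Read [offset, offset + size) from the in-memory "disk", then copy in the
 * intersecting part of every queued write. The queue is walked oldest to
 * newest, so a later write overrides an earlier one for the same bytes.
 */
static void overlay_read(const uint8_t *disk,
                         const struct pending_write *queue, size_t n,
                         uint64_t offset, uint64_t size, uint8_t *out)
{
    uint64_t r_end = offset + size;

    memcpy(out, disk + offset, size);

    for (size_t i = 0; i < n; i++) {
        uint64_t w_end = queue[i].offset + queue[i].size;
        uint64_t start = queue[i].offset > offset ? queue[i].offset : offset;
        uint64_t end = w_end < r_end ? w_end : r_end;

        assert(queue[i].buf != NULL);
        if (start < end) {
            /* Copy only the bytes where the write and the read overlap */
            memcpy(out + (start - offset),
                   queue[i].buf + (start - queue[i].offset),
                   end - start);
        }
    }
}
```

Calling overlay_read() with an empty queue degenerates to a plain read, which is the case where nothing is pending. The sketch ignores locking against the worker thread entirely.
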
> What makes this interesting is that we can delay syncs and if multiple syncs
> occur, we can merge them into one bdrv_flush.
>
> A typical sequence in qcow2 (simple cluster allocation) looks like this:
>
> 1. Update refcount table
> 2. bdrv_flush
> 3. Update L2 entry
>

Let's expand it a bit more:

1. Update refcount table
2. bdrv_flush
3. Update L2 entry
4. Write data to disk
5. Report write complete

I'm struggling to understand how a thread helps out.

If you run 1-3 in a thread, you need to inject a barrier between steps 3
and 5 or you'll report the write complete before writing the metadata
out. You can't delay completing step 3 until a guest requests a flush.
If you do, then you're implementing a writeback cache for metadata.

If you're comfortable with a writeback cache for metadata, then you
should also be comfortable with a writeback cache for data in which
case, cache=writeback is the answer.

If it's a matter of batching, batching can't occur if you have a barrier
between steps 3 and 5. The only way you can get batching is by doing a
writeback cache for the metadata such that you can complete your request
before the metadata is written.

Am I misunderstanding the idea?

Regards,

Anthony Liguori

> If we delay the operation and get three of these sequences queued before
> actually executing, we end up with the following result, saving two syncs:
>
> 1. Update refcount table (req 1)
> 2. Update refcount table (req 2)
> 3. Update refcount table (req 3)
> 4. bdrv_flush
> 5. Update L2 entry (req 1)
> 6. Update L2 entry (req 2)
> 7. Update L2 entry (req 3)
>
> This patch only commits a sync if either the guest has requested a flush or if
> a certain number of requests are in the queue, so usually we batch more than
> just three requests.
>
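
The merged ordering described above can be reproduced with a toy model. This is only a sketch under stated assumptions: the toy_* names are hypothetical, the queue is a fixed array instead of the QTAILQ/QSIMPLEQ lists in the patch, and there is no locking or worker thread. It follows the same two rules as blkqueue_pwrite and blkqueue_barrier in the patch: a write goes in front of the first barrier that closes its section or a later one, and a barrier is merged into such a barrier if one already exists.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOYQ_MAX 32

enum toy_type { TOY_WRITE, TOY_BARRIER };

struct toy_req {
    enum toy_type type;
    unsigned section;
    const char *label;
};

struct toy_queue {
    struct toy_req reqs[TOYQ_MAX];
    size_t len;
};

struct toy_ctx {
    struct toy_queue *q;
    unsigned section;
};

/* Index of the first barrier closing `section` or a later one, else q->len */
static size_t toy_find_barrier(const struct toy_queue *q, unsigned section)
{
    for (size_t i = 0; i < q->len; i++) {
        if (q->reqs[i].type == TOY_BARRIER && q->reqs[i].section >= section) {
            return i;
        }
    }
    return q->len;
}

static void toy_insert(struct toy_queue *q, size_t pos, struct toy_req req)
{
    assert(q->len < TOYQ_MAX && pos <= q->len);
    memmove(&q->reqs[pos + 1], &q->reqs[pos],
            (q->len - pos) * sizeof(q->reqs[0]));
    q->reqs[pos] = req;
    q->len++;
}

static void toy_write(struct toy_ctx *ctx, const char *label)
{
    size_t pos = toy_find_barrier(ctx->q, ctx->section);
    struct toy_req req = { TOY_WRITE, ctx->section, label };

    if (pos < ctx->q->len) {
        /* Join the section that the found barrier closes */
        req.section = ctx->q->reqs[pos].section;
        ctx->section = req.section;
    }
    toy_insert(ctx->q, pos, req); /* pos == len means "append at the end" */
}

static void toy_barrier(struct toy_ctx *ctx)
{
    size_t pos = toy_find_barrier(ctx->q, ctx->section);

    if (pos < ctx->q->len) {
        /* Merge: reuse the queued barrier, continue in the next section */
        ctx->section = ctx->q->reqs[pos].section + 1;
        return;
    }

    struct toy_req req = { TOY_BARRIER, ctx->section, "bdrv_flush" };
    toy_insert(ctx->q, ctx->q->len, req);
    ctx->section++;
}

/* One simple cluster allocation: refcount update, sync, L2 update */
static void toy_allocate(struct toy_queue *q, const char *refcount,
                         const char *l2)
{
    struct toy_ctx ctx = { q, 0 };

    toy_write(&ctx, refcount);
    toy_barrier(&ctx);
    toy_write(&ctx, l2);
}
```

Calling toy_allocate() three times on an empty queue leaves three refcount writes, a single barrier, and three L2 writes, in that order. The model says nothing about when a request may be reported as complete, which is the point under discussion in this thread.
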
> I didn't run any detailed benchmarks but just tried what happens with
> installation time of a Fedora 13 guest, and while git master takes about 40-50%
> longer than before the metadata syncs, we get most of it back with blkqueue.
>
> Of course, in this state the code is not correct, but it's correct enough to
> try and have qcow2 run on a file backend. Some remaining problems are:
>
> - There's no locking between the worker thread and other functions accessing
> the same backend driver. Should be fine for file, but probably not for other
> backends.
>
> - Error handling doesn't really exist. If something goes wrong with writing
> metadata we can't fail the guest request any more because it's long
> completed. Losing this data is actually okay, the guest hasn't flushed yet.
>
> However, we need to be able to fail a flush, and we also need some way to
> handle errors transparently. This probably means that we have to stop the VM
> and let the user fix things so that we can retry. The only other way would be
> to shut down the VM and end up in the same situation as with a host crash.
>
> Or maybe it would even be enough to start failing all new requests.
>
> - The Makefile integration is obviously very wrong, too. It worked well enough
> for me, but you need to be aware when block-queue.o is compiled with
> RUN_TESTS and when it isn't. The tests need to be split out properly.
>
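
On the error-handling item above, the "start failing all new requests" option could look roughly like a sticky error latch. This is a hypothetical sketch, not something the patch implements: toy_errstate and the toy_* functions are made-up names, and real code would need to take the queue lock around the shared field.

```c
#include <assert.h>
#include <errno.h>

/* Hypothetical per-queue error state (not part of the posted patch). */
struct toy_errstate {
    int error; /* 0, or the first negative errno reported by the worker */
};

/* Worker side: latch the first failure and keep it */
static void toy_record_error(struct toy_errstate *s, int ret)
{
    assert(ret <= 0);
    if (ret < 0 && s->error == 0) {
        s->error = ret;
    }
}

/* Submission side: new writes are refused while an error is latched */
static int toy_submit(const struct toy_errstate *s)
{
    return s->error;
}

/* A flush must report the latched error instead of claiming success */
static int toy_flush(const struct toy_errstate *s)
{
    return s->error;
}

/* Called once the underlying problem is fixed and a retry is wanted */
static void toy_clear_error(struct toy_errstate *s)
{
    s->error = 0;
}
```

The latch only keeps the first error, so the guest sees the original cause (for example -ENOSPC) rather than whatever follow-up failure came last.
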
> They are certainly fixable and shouldn't have any major impact on performance,
> so that's just a matter of doing it.
>
> Kevin
>
> ---
> Makefile | 3 +
> Makefile.objs | 1 +
> block-queue.c | 720 ++++++++++++++++++++++++++++++++++++++++++++++++
> block-queue.h | 49 ++++
> block/qcow2-cluster.c | 28 +-
> block/qcow2-refcount.c | 44 ++--
> block/qcow2.c | 14 +
> block/qcow2.h | 4 +
> qemu-thread.c | 13 +
> qemu-thread.h | 1 +
> 10 files changed, 838 insertions(+), 39 deletions(-)
> create mode 100644 block-queue.c
> create mode 100644 block-queue.h
>
> diff --git a/Makefile b/Makefile
> index ab91d42..0202dc6 100644
> --- a/Makefile
> +++ b/Makefile
> @@ -125,6 +125,9 @@ qemu-nbd$(EXESUF): qemu-nbd.o qemu-tool.o qemu-error.o $(trace-obj-y) $(block-ob
>
> qemu-io$(EXESUF): qemu-io.o cmd.o qemu-tool.o qemu-error.o $(trace-obj-y) $(block-obj-y) $(qobject-obj-y)
>
> +block-queue$(EXESUF): QEMU_CFLAGS += -DRUN_TESTS
> +block-queue$(EXESUF): qemu-tool.o qemu-error.o qemu-thread.o $(block-obj-y) $(qobject-obj-y)
> +
> qemu-img-cmds.h: $(SRC_PATH)/qemu-img-cmds.hx
> $(call quiet-command,sh $(SRC_PATH)/hxtool -h < $< > $@," GEN $@")
>
> diff --git a/Makefile.objs b/Makefile.objs
> index 3ef6d80..e97a246 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
>
> block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
> block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
> +block-obj-y += qemu-thread.o block-queue.o
> block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
> block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>
> diff --git a/block-queue.c b/block-queue.c
> new file mode 100644
> index 0000000..13579a7
> --- /dev/null
> +++ b/block-queue.c
> @@ -0,0 +1,720 @@
> +/*
> + * QEMU System Emulator
> + *
> + * Copyright (c) 2010 Kevin Wolf <kwolf@redhat.com>
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +
> +#include <signal.h>
> +
> +#include "qemu-common.h"
> +#include "qemu-queue.h"
> +#include "qemu-thread.h"
> +#include "qemu-barrier.h"
> +#include "block.h"
> +#include "block-queue.h"
> +
> +enum blkqueue_req_type {
> + REQ_TYPE_WRITE,
> + REQ_TYPE_BARRIER,
> +};
> +
> +typedef struct BlockQueueRequest {
> + enum blkqueue_req_type type;
> +
> + uint64_t offset;
> + void* buf;
> + uint64_t size;
> + unsigned section;
> +
> + QTAILQ_ENTRY(BlockQueueRequest) link;
> + QSIMPLEQ_ENTRY(BlockQueueRequest) link_section;
> +} BlockQueueRequest;
> +
> +struct BlockQueue {
> + BlockDriverState* bs;
> +
> + QemuThread thread;
> + bool thread_done;
> + QemuMutex lock;
> + QemuMutex flush_lock;
> + QemuCond cond;
> +
> + int barriers_requested;
> + int barriers_submitted;
> + int queue_size;
> +
> + QTAILQ_HEAD(bq_queue_head, BlockQueueRequest) queue;
> + QSIMPLEQ_HEAD(, BlockQueueRequest) sections;
> +};
> +
> +static void *blkqueue_thread(void *bq);
> +
> +BlockQueue *blkqueue_create(BlockDriverState *bs)
> +{
> + BlockQueue *bq = qemu_mallocz(sizeof(BlockQueue));
> + bq->bs = bs;
> +
> + QTAILQ_INIT(&bq->queue);
> + QSIMPLEQ_INIT(&bq->sections);
> +
> + qemu_mutex_init(&bq->lock);
> + qemu_mutex_init(&bq->flush_lock);
> + qemu_cond_init(&bq->cond);
> +
> + bq->thread_done = false;
> + qemu_thread_create(&bq->thread, blkqueue_thread, bq);
> +
> + return bq;
> +}
> +
> +void blkqueue_init_context(BlockQueueContext* context, BlockQueue *bq)
> +{
> + context->bq = bq;
> + context->section = 0;
> +}
> +
> +void blkqueue_destroy(BlockQueue *bq)
> +{
> + bq->thread_done = true;
> + qemu_cond_signal(&bq->cond);
> + qemu_thread_join(&bq->thread);
> +
> + blkqueue_flush(bq);
> +
> + fprintf(stderr, "blkqueue_destroy: %d/%d barriers left\n",
> + bq->barriers_submitted, bq->barriers_requested);
> +
> + qemu_mutex_destroy(&bq->lock);
> + qemu_mutex_destroy(&bq->flush_lock);
> + qemu_cond_destroy(&bq->cond);
> +
> + assert(QTAILQ_FIRST(&bq->queue) == NULL);
> + assert(QSIMPLEQ_FIRST(&bq->sections) == NULL);
> + qemu_free(bq);
> +}
> +
> +int blkqueue_pread(BlockQueueContext *context, uint64_t offset, void *buf,
> + uint64_t size)
> +{
> + BlockQueue *bq = context->bq;
> + BlockQueueRequest *req;
> + int ret;
> +
> + /*
> + * First check if there are any pending writes for the same data. Reverse
> + * order to return data written by the latest write.
> + */
> + QTAILQ_FOREACH_REVERSE(req, &bq->queue, bq_queue_head, link) {
> + uint64_t end = offset + size;
> + uint64_t req_end = req->offset + req->size;
> + uint8_t *read_buf = buf;
> + uint8_t *req_buf = req->buf;
> +
> + /* We're only interested in queued writes */
> + if (req->type != REQ_TYPE_WRITE) {
> + continue;
> + }
> +
> + /*
> + * If we read from a write in the queue (i.e. our read overlaps the
> + * write request), our next write probably depends on this write, so
> + * let's move forward to its section.
> + */
> + if (end > req->offset && offset < req_end) {
> + context->section = MAX(context->section, req->section);
> + }
> +
> + /* How we continue, depends on the kind of overlap we have */
> + if ((offset >= req->offset) && (end <= req_end)) {
> + /* Completely contained in the write request */
> + memcpy(buf, &req_buf[offset - req->offset], size);
> + return 0;
> + } else if ((end >= req->offset) && (end <= req_end)) {
> + /* Overlap in the end of the read request */
> + assert(offset < req->offset);
> + memcpy(&read_buf[req->offset - offset], req_buf, end - req->offset);
> + size = req->offset - offset;
> + } else if ((offset >= req->offset) && (offset < req_end)) {
> + /* Overlap in the start of the read request */
> + assert(end > req_end);
> + memcpy(read_buf, &req_buf[offset - req->offset], req_end - offset);
> + buf = read_buf = &read_buf[req_end - offset];
> + offset = req_end;
> + size = end - req_end;
> + } else if ((req->offset >= offset) && (req_end <= end)) {
> + /*
> + * The write request is completely contained in the read request.
> + * memcpy the data from the write request here, continue with the
> + * data before the write request and handle the data after the
> + * write request with a recursive call.
> + */
> + memcpy(&read_buf[req->offset - offset], req_buf, req_end - req->offset);
> + size = req->offset - offset;
> + blkqueue_pread(context, req_end, &read_buf[req_end - offset], end - req_end);
> + }
> + }
> +
> + /* The requested data is not in the queue, read it from disk */
> + ret = bdrv_pread(bq->bs, offset, buf, size);
> + if (ret < 0) {
> + return ret;
> + }
> +
> + return 0;
> +}
> +
> +int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
> + uint64_t size)
> +{
> + BlockQueue *bq = context->bq;
> + BlockQueueRequest *section_req;
> +
> + /* Create request structure */
> + BlockQueueRequest *req = qemu_malloc(sizeof(*req));
> + req->type = REQ_TYPE_WRITE;
> + req->offset = offset;
> + req->size = size;
> + req->buf = qemu_malloc(size);
> + req->section = context->section;
> + memcpy(req->buf, buf, size);
> +
> + /*
> + * Find the right place to insert it into the queue:
> + * Right before the barrier that closes the current section.
> + */
> + qemu_mutex_lock(&bq->lock);
> + QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
> + if (section_req->section >= req->section) {
> + req->section = section_req->section;
> + context->section = section_req->section;
> + QTAILQ_INSERT_BEFORE(section_req, req, link);
> + bq->queue_size++;
> + goto out;
> + }
> + }
> +
> + /* If there was no barrier, just put it at the end. */
> + QTAILQ_INSERT_TAIL(&bq->queue, req, link);
> + bq->queue_size++;
> + qemu_cond_signal(&bq->cond);
> +
> +out:
> + qemu_mutex_unlock(&bq->lock);
> + return 0;
> +}
> +
> +int blkqueue_barrier(BlockQueueContext *context)
> +{
> + BlockQueue *bq = context->bq;
> + BlockQueueRequest *section_req;
> +
> + bq->barriers_requested++;
> +
> + /* Create request structure */
> + BlockQueueRequest *req = qemu_malloc(sizeof(*req));
> + req->type = REQ_TYPE_BARRIER;
> + req->section = context->section;
> + req->buf = NULL;
> +
> + /* Find another barrier to merge with. */
> + qemu_mutex_lock(&bq->lock);
> + QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
> + if (section_req->section >= req->section) {
> + req->section = section_req->section;
> + context->section = section_req->section + 1;
> + qemu_free(req);
> + goto out;
> + }
> + }
> +
> + /*
> + * If there wasn't a barrier for the same section yet, insert a new one at
> + * the end.
> + */
> + QTAILQ_INSERT_TAIL(&bq->queue, req, link);
> + QSIMPLEQ_INSERT_TAIL(&bq->sections, req, link_section);
> + bq->queue_size++;
> + context->section++;
> + qemu_cond_signal(&bq->cond);
> +
> + bq->barriers_submitted++;
> +
> +out:
> + qemu_mutex_unlock(&bq->lock);
> + return 0;
> +}
> +
> +/*
> + * Caller needs to hold the bq->lock mutex
> + */
> +static BlockQueueRequest *blkqueue_pop(BlockQueue *bq)
> +{
> + BlockQueueRequest *req;
> +
> + req = QTAILQ_FIRST(&bq->queue);
> + if (req == NULL) {
> + goto out;
> + }
> +
> + QTAILQ_REMOVE(&bq->queue, req, link);
> + bq->queue_size--;
> +
> + if (req->type == REQ_TYPE_BARRIER) {
> + assert(QSIMPLEQ_FIRST(&bq->sections) == req);
> + QSIMPLEQ_REMOVE_HEAD(&bq->sections, link_section);
> + }
> +
> +out:
> + return req;
> +}
> +
> +static void blkqueue_free_request(BlockQueueRequest *req)
> +{
> + qemu_free(req->buf);
> + qemu_free(req);
> +}
> +
> +static void blkqueue_process_request(BlockQueue *bq)
> +{
> + BlockQueueRequest *req;
> + BlockQueueRequest *req2;
> + int ret;
> +
> + /*
> + * Note that we leave the request in the queue while we process it. No
> + * other request will be queued before this one and we have only one thread
> + * that processes the queue, so afterwards it will still be the first
> + * request. (Not true for barriers in the first position, but we can handle
> + * that)
> + */
> + req = QTAILQ_FIRST(&bq->queue);
> + if (req == NULL) {
> + return;
> + }
> +
> + switch (req->type) {
> + case REQ_TYPE_WRITE:
> + ret = bdrv_pwrite(bq->bs, req->offset, req->buf, req->size);
> + if (ret < 0) {
> + /* TODO Error reporting! */
> + return;
> + }
> + break;
> + case REQ_TYPE_BARRIER:
> + bdrv_flush(bq->bs);
> + break;
> + }
> +
> + /*
> + * Only remove the request from the queue when it's written, so that reads
> + * always access the right data.
> + */
> + qemu_mutex_lock(&bq->lock);
> + req2 = QTAILQ_FIRST(&bq->queue);
> + if (req == req2) {
> + blkqueue_pop(bq);
> + blkqueue_free_request(req);
> + } else {
> + /*
> + * If it's a barrier and something has been queued before it, just
> + * leave it in the queue and flush once again later.
> + */
> + assert(req->type == REQ_TYPE_BARRIER);
> + bq->barriers_submitted++;
> + }
> + qemu_mutex_unlock(&bq->lock);
> +}
> +
> +struct blkqueue_flush_aiocb {
> + BlockQueue *bq;
> + BlockDriverCompletionFunc *cb;
> + void *opaque;
> +};
> +
> +static void *blkqueue_aio_flush_thread(void *opaque)
> +{
> + struct blkqueue_flush_aiocb *acb = opaque;
> +
> + /* Process any left over requests */
> + blkqueue_flush(acb->bq);
> +
> + acb->cb(acb->opaque, 0);
> + qemu_free(acb);
> +
> + return NULL;
> +}
> +
> +void blkqueue_aio_flush(BlockQueue *bq, BlockDriverCompletionFunc *cb,
> + void *opaque)
> +{
> + struct blkqueue_flush_aiocb *acb;
> +
> + acb = qemu_malloc(sizeof(*acb));
> + acb->bq = bq;
> + acb->cb = cb;
> + acb->opaque = opaque;
> +
> + qemu_thread_create(NULL, blkqueue_aio_flush_thread, acb);
> +}
> +
> +void blkqueue_flush(BlockQueue *bq)
> +{
> + qemu_mutex_lock(&bq->flush_lock);
> +
> + /* Process any left over requests */
> + while (QTAILQ_FIRST(&bq->queue)) {
> + blkqueue_process_request(bq);
> + }
> +
> + qemu_mutex_unlock(&bq->flush_lock);
> +}
> +
> +static void *blkqueue_thread(void *_bq)
> +{
> + BlockQueue *bq = _bq;
> +#ifndef RUN_TESTS
> + BlockQueueRequest *req;
> +#endif
> +
> + qemu_mutex_lock(&bq->flush_lock);
> + while (!bq->thread_done) {
> + barrier();
> +#ifndef RUN_TESTS
> + req = QTAILQ_FIRST(&bq->queue);
> +
> + /* Don't process barriers, we only do that on flushes */
> + if (req && (req->type != REQ_TYPE_BARRIER || bq->queue_size > 42)) {
> + blkqueue_process_request(bq);
> + } else {
> + qemu_cond_wait(&bq->cond, &bq->flush_lock);
> + }
> +#else
> + qemu_cond_wait(&bq->cond, &bq->flush_lock);
> +#endif
> + }
> + qemu_mutex_unlock(&bq->flush_lock);
> +
> + return NULL;
> +}
> +
> +#ifdef RUN_TESTS
> +
> +#define CHECK_WRITE(req, _offset, _size, _buf, _section) \
> + do { \
> + assert(req != NULL); \
> + assert(req->type == REQ_TYPE_WRITE); \
> + assert(req->offset == _offset); \
> + assert(req->size == _size); \
> + assert(req->section == _section); \
> + assert(!memcmp(req->buf, _buf, _size)); \
> + } while(0)
> +
> +#define CHECK_BARRIER(req, _section) \
> + do { \
> + assert(req != NULL); \
> + assert(req->type == REQ_TYPE_BARRIER); \
> + assert(req->section == _section); \
> + } while(0)
> +
> +#define CHECK_READ(_context, _offset, _buf, _size, _cmpbuf) \
> + do { \
> + int ret; \
> + memset(buf, 0, 512); \
> + ret = blkqueue_pread(_context, _offset, _buf, _size); \
> + assert(ret == 0); \
> + assert(!memcmp(_cmpbuf, _buf, _size)); \
> + } while(0)
> +
> +#define QUEUE_WRITE(_context, _offset, _buf, _size, _pattern) \
> + do { \
> + int ret; \
> + memset(_buf, _pattern, _size); \
> + ret = blkqueue_pwrite(_context, _offset, _buf, _size); \
> + assert(ret == 0); \
> + } while(0)
> +#define QUEUE_BARRIER(_context) \
> + do { \
> + int ret; \
> + ret = blkqueue_barrier(_context); \
> + assert(ret == 0); \
> + } while(0)
> +
> +#define POP_CHECK_WRITE(_bq, _offset, _buf, _size, _pattern, _section) \
> + do { \
> + BlockQueueRequest *req; \
> + memset(_buf, _pattern, _size); \
> + req = blkqueue_pop(_bq); \
> + CHECK_WRITE(req, _offset, _size, _buf, _section); \
> + blkqueue_free_request(req); \
> + } while(0)
> +#define POP_CHECK_BARRIER(_bq, _section) \
> + do { \
> + BlockQueueRequest *req; \
> + req = blkqueue_pop(_bq); \
> + CHECK_BARRIER(req, _section); \
> + blkqueue_free_request(req); \
> + } while(0)
> +
> +static void __attribute__((used)) dump_queue(BlockQueue *bq)
> +{
> + BlockQueueRequest *req;
> +
> + fprintf(stderr, "--- Queue dump ---\n");
> + QTAILQ_FOREACH(req, &bq->queue, link) {
> + fprintf(stderr, "[%d] ", req->section);
> + if (req->type == REQ_TYPE_WRITE) {
> + fprintf(stderr, "Write off=%5"PRId64", len=%5"PRId64", buf=%p\n",
> + req->offset, req->size, req->buf);
> + } else if (req->type == REQ_TYPE_BARRIER) {
> + fprintf(stderr, "Barrier\n");
> + } else {
> + fprintf(stderr, "Unknown type %d\n", req->type);
> + }
> + }
> +}
> +
> +static void test_basic(BlockDriverState *bs)
> +{
> + uint8_t buf[512];
> + BlockQueue *bq;
> + BlockQueueContext context;
> +
> + bq = blkqueue_create(bs);
> + blkqueue_init_context(&context, bq);
> +
> + /* Queue requests */
> + QUEUE_WRITE(&context, 0, buf, 512, 0x12);
> + QUEUE_WRITE(&context, 512, buf, 42, 0x34);
> + QUEUE_BARRIER(&context);
> + QUEUE_WRITE(&context, 678, buf, 42, 0x56);
> +
> + /* Verify queue contents */
> + POP_CHECK_WRITE(bq, 0, buf, 512, 0x12, 0);
> + POP_CHECK_WRITE(bq, 512, buf, 42, 0x34, 0);
> + POP_CHECK_BARRIER(bq, 0);
> + POP_CHECK_WRITE(bq, 678, buf, 42, 0x56, 1);
> +
> + blkqueue_destroy(bq);
> +}
> +
> +static void test_merge(BlockDriverState *bs)
> +{
> + uint8_t buf[512];
> + BlockQueue *bq;
> + BlockQueueContext ctx1, ctx2;
> +
> + bq = blkqueue_create(bs);
> + blkqueue_init_context(&ctx1, bq);
> + blkqueue_init_context(&ctx2, bq);
> +
> + /* Queue requests */
> + QUEUE_WRITE(&ctx1, 0, buf, 512, 0x12);
> + QUEUE_BARRIER(&ctx1);
> + QUEUE_WRITE(&ctx2, 512, buf, 42, 0x34);
> + QUEUE_WRITE(&ctx1, 1024, buf, 512, 0x12);
> + QUEUE_BARRIER(&ctx2);
> + QUEUE_WRITE(&ctx2, 1512, buf, 42, 0x34);
> +
> + /* Verify queue contents */
> + POP_CHECK_WRITE(bq, 0, buf, 512, 0x12, 0);
> + POP_CHECK_WRITE(bq, 512, buf, 42, 0x34, 0);
> + POP_CHECK_BARRIER(bq, 0);
> + POP_CHECK_WRITE(bq, 1024, buf, 512, 0x12, 1);
> + POP_CHECK_WRITE(bq, 1512, buf, 42, 0x34, 1);
> +
> + /* Same queue, new contexts */
> + blkqueue_init_context(&ctx1, bq);
> + blkqueue_init_context(&ctx2, bq);
> +
> + /* Queue requests */
> + QUEUE_BARRIER(&ctx2);
> + QUEUE_WRITE(&ctx2, 512, buf, 42, 0x34);
> + QUEUE_WRITE(&ctx2, 12, buf, 20, 0x45);
> + QUEUE_BARRIER(&ctx2);
> + QUEUE_WRITE(&ctx2, 892, buf, 142, 0x56);
> +
> + QUEUE_WRITE(&ctx1, 0, buf, 8, 0x12);
> + QUEUE_BARRIER(&ctx1);
> + QUEUE_WRITE(&ctx1, 1024, buf, 512, 0x12);
> + QUEUE_BARRIER(&ctx1);
> + QUEUE_WRITE(&ctx1, 1512, buf, 42, 0x34);
> + QUEUE_BARRIER(&ctx1);
> +
> + /* Verify queue contents */
> + POP_CHECK_WRITE(bq, 0, buf, 8, 0x12, 0);
> + POP_CHECK_BARRIER(bq, 0);
> + POP_CHECK_WRITE(bq, 512, buf, 42, 0x34, 1);
> + POP_CHECK_WRITE(bq, 12, buf, 20, 0x45, 1);
> + POP_CHECK_WRITE(bq, 1024, buf, 512, 0x12, 1);
> + POP_CHECK_BARRIER(bq, 1);
> + POP_CHECK_WRITE(bq, 892, buf, 142, 0x56, 2);
> + POP_CHECK_WRITE(bq, 1512, buf, 42, 0x34, 2);
> + POP_CHECK_BARRIER(bq, 2);
> +
> + blkqueue_destroy(bq);
> +}
> +
> +static void test_read(BlockDriverState *bs)
> +{
> + uint8_t buf[512], buf2[512];
> + BlockQueue *bq;
> + BlockQueueContext ctx1;
> +
> + bq = blkqueue_create(bs);
> + blkqueue_init_context(&ctx1, bq);
> +
> + /* Queue requests and do some test reads */
> + memset(buf2, 0xa5, 512);
> + CHECK_READ(&ctx1, 0, buf, 32, buf2);
> +
> + QUEUE_WRITE(&ctx1, 5, buf, 5, 0x12);
> + memset(buf2, 0x12, 5);
> + CHECK_READ(&ctx1, 5, buf, 5, buf2);
> + CHECK_READ(&ctx1, 7, buf, 2, buf2);
> + memset(buf2, 0xa5, 512);
> + memset(buf2 + 5, 0x12, 5);
> + CHECK_READ(&ctx1, 0, buf, 8, buf2);
> + CHECK_READ(&ctx1, 0, buf, 10, buf2);
> + CHECK_READ(&ctx1, 0, buf, 32, buf2);
> + memset(buf2, 0xa5, 512);
> + memset(buf2, 0x12, 5);
> + CHECK_READ(&ctx1, 5, buf, 16, buf2);
> + memset(buf2, 0xa5, 512);
> + CHECK_READ(&ctx1, 0, buf, 2, buf2);
> + CHECK_READ(&ctx1, 10, buf, 16, buf2);
> +
> + QUEUE_WRITE(&ctx1, 0, buf, 2, 0x12);
> + memset(&buf2[5], 0x12, 5);
> + memset(buf2, 0x12, 2);
> + CHECK_READ(&ctx1, 0, buf, 32, buf2);
> +
> + /* Verify queue contents */
> + POP_CHECK_WRITE(bq, 5, buf, 5, 0x12, 0);
> + POP_CHECK_WRITE(bq, 0, buf, 2, 0x12, 0);
> +
> + blkqueue_destroy(bq);
> +}
> +
> +static void test_read_order(BlockDriverState *bs)
> +{
> + uint8_t buf[512], buf2[512];
> + BlockQueue *bq;
> + BlockQueueContext ctx1, ctx2;
> +
> + bq = blkqueue_create(bs);
> + blkqueue_init_context(&ctx1, bq);
> + blkqueue_init_context(&ctx2, bq);
> +
> + /* Queue requests and do some test reads */
> + QUEUE_WRITE(&ctx1, 25, buf, 5, 0x44);
> + QUEUE_BARRIER(&ctx1);
> + QUEUE_WRITE(&ctx1, 5, buf, 5, 0x12);
> + QUEUE_BARRIER(&ctx1);
> + QUEUE_WRITE(&ctx2, 10, buf, 5, 0x34);
> +
> + memset(buf2, 0xa5, 512);
> + memset(buf2 + 5, 0x12, 5);
> + memset(buf2 + 10, 0x34, 5);
> + CHECK_READ(&ctx2, 0, buf, 20, buf2);
> + QUEUE_WRITE(&ctx2, 0, buf, 10, 0x34);
> + QUEUE_BARRIER(&ctx2);
> +
> + /* Verify queue contents */
> + POP_CHECK_WRITE(bq, 25, buf, 5, 0x44, 0);
> + POP_CHECK_WRITE(bq, 10, buf, 5, 0x34, 0);
> + POP_CHECK_BARRIER(bq, 0);
> + POP_CHECK_WRITE(bq, 5, buf, 5, 0x12, 1);
> + POP_CHECK_WRITE(bq, 0, buf, 10, 0x34, 1);
> + POP_CHECK_BARRIER(bq, 1);
> +
> + blkqueue_destroy(bq);
> +}
> +
> +static void test_process_request(BlockDriverState *bs)
> +{
> + uint8_t buf[512], buf2[512];
> + BlockQueue *bq;
> + BlockQueueContext ctx1;
> +
> + bq = blkqueue_create(bs);
> + blkqueue_init_context(&ctx1, bq);
> +
> + /* Queue requests and do a test read */
> + QUEUE_WRITE(&ctx1, 25, buf, 5, 0x44);
> + QUEUE_BARRIER(&ctx1);
> +
> + memset(buf2, 0xa5, 512);
> + memset(buf2 + 25, 0x44, 5);
> + CHECK_READ(&ctx1, 0, buf, 64, buf2);
> +
> + /* Process the queue (plus one call to test a NULL condition) */
> + blkqueue_process_request(bq);
> + blkqueue_process_request(bq);
> + blkqueue_process_request(bq);
> +
> + /* Verify the queue is empty */
> + assert(blkqueue_pop(bq) == NULL);
> +
> + /* Check if we still read the same */
> + CHECK_READ(&ctx1, 0, buf, 64, buf2);
> +
> + blkqueue_destroy(bq);
> +}
> +
> +static void run_test(void (*testfn)(BlockDriverState*), BlockDriverState *bs)
> +{
> + void* buf;
> + int ret;
> +
> + buf = qemu_malloc(1024 * 1024);
> + memset(buf, 0xa5, 1024 * 1024);
> + ret = bdrv_write(bs, 0, buf, 2048);
> + assert(ret >= 0);
> + qemu_free(buf);
> +
> + testfn(bs);
> +}
> +
> +int main(void)
> +{
> + BlockDriverState *bs;
> + int ret;
> +
> + bdrv_init();
> + bs = bdrv_new("");
> + ret = bdrv_open(bs, "block-queue.img", BDRV_O_RDWR, NULL);
> + if (ret < 0) {
> + fprintf(stderr, "Couldn't open block-queue.img: %s\n",
> + strerror(-ret));
> + exit(1);
> + }
> +
> + run_test(&test_basic, bs);
> + run_test(&test_merge, bs);
> + run_test(&test_read, bs);
> + run_test(&test_read_order, bs);
> + run_test(&test_process_request, bs);
> +
> + bdrv_delete(bs);
> +
> + return 0;
> +}
> +#endif
> diff --git a/block-queue.h b/block-queue.h
> new file mode 100644
> index 0000000..4ce0e1b
> --- /dev/null
> +++ b/block-queue.h
> @@ -0,0 +1,49 @@
> +/*
> + * QEMU System Emulator
> + *
> + * Copyright (c) 2010 Kevin Wolf <kwolf@redhat.com>
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +
> +#ifndef BLOCK_QUEUE_H
> +#define BLOCK_QUEUE_H
> +
> +#include "qemu-common.h"
> +
> +typedef struct BlockQueue BlockQueue;
> +
> +typedef struct BlockQueueContext {
> + BlockQueue* bq;
> + unsigned section;
> +} BlockQueueContext;
> +
> +BlockQueue *blkqueue_create(BlockDriverState *bs);
> +void blkqueue_init_context(BlockQueueContext* context, BlockQueue *bq);
> +void blkqueue_destroy(BlockQueue *bq);
> +int blkqueue_pread(BlockQueueContext *context, uint64_t offset, void *buf,
> + uint64_t size);
> +int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
> + uint64_t size);
> +int blkqueue_barrier(BlockQueueContext *context);
> +void blkqueue_flush(BlockQueue *bq);
> +void blkqueue_aio_flush(BlockQueue *bq, BlockDriverCompletionFunc *cb,
> + void *opaque);
> +
> +#endif
> diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
> index f562b16..eeae173 100644
> --- a/block/qcow2-cluster.c
> +++ b/block/qcow2-cluster.c
> @@ -64,7 +64,8 @@ int qcow2_grow_l1_table(BlockDriverState *bs, int min_size)
> BLKDBG_EVENT(bs->file, BLKDBG_L1_GROW_WRITE_TABLE);
> for(i = 0; i < s->l1_size; i++)
> new_l1_table[i] = cpu_to_be64(new_l1_table[i]);
> - ret = bdrv_pwrite_sync(bs->file, new_l1_table_offset, new_l1_table, new_l1_size2);
> + ret = blkqueue_pwrite(&s->bq_context, new_l1_table_offset, new_l1_table, new_l1_size2);
> + blkqueue_barrier(&s->bq_context);
> if (ret < 0)
> goto fail;
> for(i = 0; i < s->l1_size; i++)
> @@ -74,7 +75,8 @@ int qcow2_grow_l1_table(BlockDriverState *bs, int min_size)
> BLKDBG_EVENT(bs->file, BLKDBG_L1_GROW_ACTIVATE_TABLE);
> cpu_to_be32w((uint32_t*)data, new_l1_size);
> cpu_to_be64w((uint64_t*)(data + 4), new_l1_table_offset);
> - ret = bdrv_pwrite_sync(bs->file, offsetof(QCowHeader, l1_size), data,sizeof(data));
> + ret = blkqueue_pwrite(&s->bq_context, offsetof(QCowHeader, l1_size), data, sizeof(data));
> + blkqueue_barrier(&s->bq_context);
> if (ret < 0) {
> goto fail;
> }
> @@ -177,8 +179,7 @@ static int l2_load(BlockDriverState *bs, uint64_t l2_offset,
> *l2_table = s->l2_cache + (min_index << s->l2_bits);
>
> BLKDBG_EVENT(bs->file, BLKDBG_L2_LOAD);
> - ret = bdrv_pread(bs->file, l2_offset, *l2_table,
> - s->l2_size * sizeof(uint64_t));
> + ret = blkqueue_pread(&s->bq_context, l2_offset, *l2_table, s->l2_size * sizeof(uint64_t));
> if (ret < 0) {
> return ret;
> }
> @@ -207,8 +208,8 @@ static int write_l1_entry(BlockDriverState *bs, int l1_index)
> }
>
> BLKDBG_EVENT(bs->file, BLKDBG_L1_UPDATE);
> - ret = bdrv_pwrite_sync(bs->file, s->l1_table_offset + 8 * l1_start_index,
> - buf, sizeof(buf));
> + ret = blkqueue_pwrite(&s->bq_context, s->l1_table_offset + 8 * l1_start_index, buf, sizeof(buf));
> + blkqueue_barrier(&s->bq_context);
> if (ret < 0) {
> return ret;
> }
> @@ -255,16 +256,15 @@ static int l2_allocate(BlockDriverState *bs, int l1_index, uint64_t **table)
> } else {
> /* if there was an old l2 table, read it from the disk */
> BLKDBG_EVENT(bs->file, BLKDBG_L2_ALLOC_COW_READ);
> - ret = bdrv_pread(bs->file, old_l2_offset, l2_table,
> - s->l2_size * sizeof(uint64_t));
> + ret = blkqueue_pread(&s->bq_context, old_l2_offset, l2_table, s->l2_size * sizeof(uint64_t));
> if (ret < 0) {
> goto fail;
> }
> }
> /* write the l2 table to the file */
> BLKDBG_EVENT(bs->file, BLKDBG_L2_ALLOC_WRITE);
> - ret = bdrv_pwrite_sync(bs->file, l2_offset, l2_table,
> - s->l2_size * sizeof(uint64_t));
> + ret = blkqueue_pwrite(&s->bq_context, l2_offset, l2_table, s->l2_size * sizeof(uint64_t));
> + blkqueue_barrier(&s->bq_context);
> if (ret < 0) {
> goto fail;
> }
> @@ -378,7 +378,7 @@ static int qcow_read(BlockDriverState *bs, int64_t sector_num,
> memcpy(buf, s->cluster_cache + index_in_cluster * 512, 512 * n);
> } else {
> BLKDBG_EVENT(bs->file, BLKDBG_READ);
> - ret = bdrv_pread(bs->file, cluster_offset + index_in_cluster * 512, buf, n * 512);
> + ret = blkqueue_pread(&s->bq_context, cluster_offset + index_in_cluster * 512, buf, n * 512);
> if (ret != n * 512)
> return -1;
> if (s->crypt_method) {
> @@ -648,6 +648,7 @@ uint64_t qcow2_alloc_compressed_cluster_offset(BlockDriverState *bs,
> static int write_l2_entries(BlockDriverState *bs, uint64_t *l2_table,
> uint64_t l2_offset, int l2_index, int num)
> {
> + BDRVQcowState *s = bs->opaque;
> int l2_start_index = l2_index & ~(L1_ENTRIES_PER_SECTOR - 1);
> int start_offset = (8 * l2_index) & ~511;
> int end_offset = (8 * (l2_index + num) + 511) & ~511;
> @@ -655,8 +656,7 @@ static int write_l2_entries(BlockDriverState *bs, uint64_t *l2_table,
> int ret;
>
> BLKDBG_EVENT(bs->file, BLKDBG_L2_UPDATE);
> - ret = bdrv_pwrite(bs->file, l2_offset + start_offset,
> - &l2_table[l2_start_index], len);
> + ret = blkqueue_pwrite(&s->bq_context, l2_offset + start_offset, &l2_table[l2_start_index], len);
> if (ret < 0) {
> return ret;
> }
> @@ -723,7 +723,7 @@ int qcow2_alloc_cluster_link_l2(BlockDriverState *bs, QCowL2Meta *m)
> * Also flush bs->file to get the right order for L2 and refcount update.
> */
> if (j != 0) {
> - bdrv_flush(bs->file);
> + blkqueue_barrier(&s->bq_context);
> for (i = 0; i < j; i++) {
> qcow2_free_any_clusters(bs,
> be64_to_cpu(old_cluster[i]) & ~QCOW_OFLAG_COPIED, 1);
> diff --git a/block/qcow2-refcount.c b/block/qcow2-refcount.c
> index 4c19e7e..0d21d1f 100644
> --- a/block/qcow2-refcount.c
> +++ b/block/qcow2-refcount.c
> @@ -44,7 +44,7 @@ static int write_refcount_block(BlockDriverState *bs)
> }
>
> BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_UPDATE);
> - if (bdrv_pwrite_sync(bs->file, s->refcount_block_cache_offset,
> + if (blkqueue_pwrite(&s->bq_context, s->refcount_block_cache_offset,
> s->refcount_block_cache, size) < 0)
> {
> return -EIO;
> @@ -66,8 +66,7 @@ int qcow2_refcount_init(BlockDriverState *bs)
> s->refcount_table = qemu_malloc(refcount_table_size2);
> if (s->refcount_table_size > 0) {
> BLKDBG_EVENT(bs->file, BLKDBG_REFTABLE_LOAD);
> - ret = bdrv_pread(bs->file, s->refcount_table_offset,
> - s->refcount_table, refcount_table_size2);
> + ret = bdrv_pread(bs->file, s->refcount_table_offset, s->refcount_table, refcount_table_size2);
> if (ret != refcount_table_size2)
> goto fail;
> for(i = 0; i < s->refcount_table_size; i++)
> @@ -100,8 +99,7 @@ static int load_refcount_block(BlockDriverState *bs,
> }
>
> BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_LOAD);
> - ret = bdrv_pread(bs->file, refcount_block_offset, s->refcount_block_cache,
> - s->cluster_size);
> + ret = blkqueue_pread(&s->bq_context, refcount_block_offset, s->refcount_block_cache, s->cluster_size);
> if (ret < 0) {
> return ret;
> }
> @@ -269,8 +267,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
>
> /* Now the new refcount block needs to be written to disk */
> BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE);
> - ret = bdrv_pwrite_sync(bs->file, new_block, s->refcount_block_cache,
> - s->cluster_size);
> + ret = blkqueue_pwrite(&s->bq_context, new_block, s->refcount_block_cache, s->cluster_size);
> + blkqueue_barrier(&s->bq_context);
> if (ret < 0) {
> goto fail_block;
> }
> @@ -279,9 +277,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
> if (refcount_table_index < s->refcount_table_size) {
> uint64_t data64 = cpu_to_be64(new_block);
> BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_HOOKUP);
> - ret = bdrv_pwrite_sync(bs->file,
> - s->refcount_table_offset + refcount_table_index * sizeof(uint64_t),
> - &data64, sizeof(data64));
> + ret = blkqueue_pwrite(&s->bq_context, s->refcount_table_offset + refcount_table_index * sizeof(uint64_t), &data64, sizeof(data64));
> + blkqueue_barrier(&s->bq_context);
> if (ret < 0) {
> goto fail_block;
> }
> @@ -359,8 +356,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
>
> /* Write refcount blocks to disk */
> BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE_BLOCKS);
> - ret = bdrv_pwrite_sync(bs->file, meta_offset, new_blocks,
> - blocks_clusters * s->cluster_size);
> + ret = blkqueue_pwrite(&s->bq_context, meta_offset, new_blocks, blocks_clusters * s->cluster_size);
> + blkqueue_barrier(&s->bq_context);
> qemu_free(new_blocks);
> if (ret < 0) {
> goto fail_table;
> @@ -372,8 +369,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
> }
>
> BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE_TABLE);
> - ret = bdrv_pwrite_sync(bs->file, table_offset, new_table,
> - table_size * sizeof(uint64_t));
> + ret = blkqueue_pwrite(&s->bq_context, table_offset, new_table, table_size * sizeof(uint64_t));
> + blkqueue_barrier(&s->bq_context);
> if (ret < 0) {
> goto fail_table;
> }
> @@ -387,8 +384,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
> cpu_to_be64w((uint64_t*)data, table_offset);
> cpu_to_be32w((uint32_t*)(data + 8), table_clusters);
> BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_SWITCH_TABLE);
> - ret = bdrv_pwrite_sync(bs->file, offsetof(QCowHeader, refcount_table_offset),
> - data, sizeof(data));
> + ret = blkqueue_pwrite(&s->bq_context, offsetof(QCowHeader, refcount_table_offset), data, sizeof(data));
> + blkqueue_barrier(&s->bq_context);
> if (ret < 0) {
> goto fail_table;
> }
> @@ -444,9 +441,8 @@ static int write_refcount_block_entries(BlockDriverState *bs,
> size = (last_index - first_index) << REFCOUNT_SHIFT;
>
> BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_UPDATE_PART);
> - ret = bdrv_pwrite_sync(bs->file,
> - refcount_block_offset + (first_index << REFCOUNT_SHIFT),
> - &s->refcount_block_cache[first_index], size);
> + ret = blkqueue_pwrite(&s->bq_context, refcount_block_offset + (first_index << REFCOUNT_SHIFT), &s->refcount_block_cache[first_index], size);
> + blkqueue_barrier(&s->bq_context);
> if (ret < 0) {
> return ret;
> }
> @@ -763,8 +759,7 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
> l1_table = NULL;
> }
> l1_allocated = 1;
> - if (bdrv_pread(bs->file, l1_table_offset,
> - l1_table, l1_size2) != l1_size2)
> + if (blkqueue_pread(&s->bq_context, l1_table_offset, l1_table, l1_size2) != l1_size2)
> goto fail;
> for(i = 0;i < l1_size; i++)
> be64_to_cpus(&l1_table[i]);
> @@ -783,7 +778,7 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
> old_l2_offset = l2_offset;
> l2_offset &= ~QCOW_OFLAG_COPIED;
> l2_modified = 0;
> - if (bdrv_pread(bs->file, l2_offset, l2_table, l2_size) != l2_size)
> + if (blkqueue_pread(&s->bq_context, l2_offset, l2_table, l2_size) != l2_size)
> goto fail;
> for(j = 0; j < s->l2_size; j++) {
> offset = be64_to_cpu(l2_table[j]);
> @@ -943,7 +938,7 @@ static int check_refcounts_l2(BlockDriverState *bs, BdrvCheckResult *res,
> l2_size = s->l2_size * sizeof(uint64_t);
> l2_table = qemu_malloc(l2_size);
>
> - if (bdrv_pread(bs->file, l2_offset, l2_table, l2_size) != l2_size)
> + if (blkqueue_pread(&s->bq_context, l2_offset, l2_table, l2_size) != l2_size)
> goto fail;
>
> /* Do the actual checks */
> @@ -1038,8 +1033,7 @@ static int check_refcounts_l1(BlockDriverState *bs,
> l1_table = NULL;
> } else {
> l1_table = qemu_malloc(l1_size2);
> - if (bdrv_pread(bs->file, l1_table_offset,
> - l1_table, l1_size2) != l1_size2)
> + if (blkqueue_pread(&s->bq_context, l1_table_offset, l1_table, l1_size2) != l1_size2)
> goto fail;
> for(i = 0;i < l1_size; i++)
> be64_to_cpus(&l1_table[i]);
> diff --git a/block/qcow2.c b/block/qcow2.c
> index f2b1b1c..9b1cd78 100644
> --- a/block/qcow2.c
> +++ b/block/qcow2.c
> @@ -237,6 +237,10 @@ static int qcow_open(BlockDriverState *bs, int flags)
> if (qcow2_read_snapshots(bs) < 0)
> goto fail;
>
> + /* Block queue */
> + s->bq = blkqueue_create(bs->file);
> + blkqueue_init_context(&s->bq_context, s->bq);
> +
> #ifdef DEBUG_ALLOC
> qcow2_check_refcounts(bs);
> #endif
> @@ -494,6 +498,7 @@ static QCowAIOCB *qcow_aio_setup(BlockDriverState *bs,
> int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> BlockDriverCompletionFunc *cb, void *opaque, int is_write)
> {
> + BDRVQcowState *s = bs->opaque;
> QCowAIOCB *acb;
>
> acb = qemu_aio_get(&qcow_aio_pool, bs, cb, opaque);
> @@ -514,6 +519,10 @@ static QCowAIOCB *qcow_aio_setup(BlockDriverState *bs,
> acb->cluster_offset = 0;
> acb->l2meta.nb_clusters = 0;
> QLIST_INIT(&acb->l2meta.dependent_requests);
> +
> + /* TODO Push the context into l2meta */
> + blkqueue_init_context(&s->bq_context, s->bq);
> +
> return acb;
> }
>
> @@ -667,6 +676,7 @@ static void qcow_close(BlockDriverState *bs)
> qemu_free(s->cluster_cache);
> qemu_free(s->cluster_data);
> qcow2_refcount_close(bs);
> + blkqueue_destroy(s->bq);
> }
>
> /*
> @@ -1123,12 +1133,16 @@ static int qcow_write_compressed(BlockDriverState *bs, int64_t sector_num,
>
> static void qcow_flush(BlockDriverState *bs)
> {
> + BDRVQcowState *s = bs->opaque;
> + blkqueue_flush(s->bq);
> bdrv_flush(bs->file);
> }
>
> static BlockDriverAIOCB *qcow_aio_flush(BlockDriverState *bs,
> BlockDriverCompletionFunc *cb, void *opaque)
> {
> + BDRVQcowState *s = bs->opaque;
> + blkqueue_flush(s->bq);
> return bdrv_aio_flush(bs->file, cb, opaque);
> }
>
> diff --git a/block/qcow2.h b/block/qcow2.h
> index 3ff162e..361f1ba 100644
> --- a/block/qcow2.h
> +++ b/block/qcow2.h
> @@ -26,6 +26,7 @@
> #define BLOCK_QCOW2_H
>
> #include "aes.h"
> +#include "block-queue.h"
>
> //#define DEBUG_ALLOC
> //#define DEBUG_ALLOC2
> @@ -108,6 +109,9 @@ typedef struct BDRVQcowState {
> int64_t free_cluster_index;
> int64_t free_byte_offset;
>
> + BlockQueue *bq;
> + BlockQueueContext bq_context;
> +
> uint32_t crypt_method; /* current crypt method, 0 if no key yet */
> uint32_t crypt_method_header;
> AES_KEY aes_encrypt_key;
> diff --git a/qemu-thread.c b/qemu-thread.c
> index fbc78fe..abcb7a6 100644
> --- a/qemu-thread.c
> +++ b/qemu-thread.c
> @@ -167,6 +167,19 @@ void qemu_thread_create(QemuThread *thread,
> pthread_sigmask(SIG_SETMASK, &oldset, NULL);
> }
>
> +void *qemu_thread_join(QemuThread *thread)
> +{
> + int err;
> + void *ret;
> +
> + err = pthread_join(thread->thread, &ret);
> + if (err) {
> + error_exit(err, __func__);
> + }
> +
> + return ret;
> +}
> +
> void qemu_thread_signal(QemuThread *thread, int sig)
> {
> int err;
> diff --git a/qemu-thread.h b/qemu-thread.h
> index 19bb30c..2b6f218 100644
> --- a/qemu-thread.h
> +++ b/qemu-thread.h
> @@ -36,6 +36,7 @@ int qemu_cond_timedwait(QemuCond *cond, QemuMutex *mutex, uint64_t msecs);
> void qemu_thread_create(QemuThread *thread,
> void *(*start_routine)(void*),
> void *arg);
> +void *qemu_thread_join(QemuThread *thread);
> void qemu_thread_signal(QemuThread *thread, int sig);
> void qemu_thread_self(QemuThread *thread);
> int qemu_thread_equal(QemuThread *thread1, QemuThread *thread2);
>
>
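To make the queue semantics in the patch description concrete, here is a minimal sketch of how I read them: writes are queued, a barrier separates ordered groups of writes, several barriers in a row collapse into one flush, and reads look at the queue before going to the image file. Every name here (ToyQueue, toy_pwrite, toy_barrier, toy_pread, toy_flushes_needed) is hypothetical and this is not the blkqueue implementation; payloads are reduced to a single 64-bit word and nothing touches a file.

```c
#include <stddef.h>
#include <stdint.h>

#define TOY_MAX_REQS 16

/* Hypothetical toy model, not the blkqueue code from the patch. */
typedef struct ToyReq {
    uint64_t offset;   /* byte offset of the queued metadata write */
    uint64_t value;    /* payload, reduced to one 64-bit word */
    unsigned section;  /* number of barriers issued before this write */
} ToyReq;

typedef struct ToyQueue {
    ToyReq reqs[TOY_MAX_REQS];
    size_t count;
    unsigned section;  /* current section; bumped by every barrier */
} ToyQueue;

static void toy_init(ToyQueue *q)
{
    q->count = 0;
    q->section = 0;
}

/*
 * Queue a write. A pending write to the same offset in the same section
 * is overwritten in place (merged). Returns 0 on success, -1 if full.
 */
static int toy_pwrite(ToyQueue *q, uint64_t offset, uint64_t value)
{
    for (size_t i = 0; i < q->count; i++) {
        if (q->reqs[i].offset == offset &&
            q->reqs[i].section == q->section) {
            q->reqs[i].value = value;
            return 0;
        }
    }
    if (q->count == TOY_MAX_REQS) {
        return -1;
    }
    q->reqs[q->count++] = (ToyReq){ offset, value, q->section };
    return 0;
}

/* Barrier: writes queued later must not be merged into earlier ones. */
static void toy_barrier(ToyQueue *q)
{
    q->section++;
}

/*
 * Read: the newest queued write for this offset wins. Returns 1 if the
 * value came from the queue, 0 if the caller has to read the image file.
 */
static int toy_pread(const ToyQueue *q, uint64_t offset, uint64_t *value)
{
    for (size_t i = q->count; i > 0; i--) {
        if (q->reqs[i - 1].offset == offset) {
            *value = q->reqs[i - 1].value;
            return 1;
        }
    }
    return 0;
}

/*
 * Flushes needed to drain the queue: one per boundary between queued
 * writes of different sections, so back-to-back barriers cost one flush.
 */
static unsigned toy_flushes_needed(const ToyQueue *q)
{
    unsigned flushes = 0;
    for (size_t i = 1; i < q->count; i++) {
        if (q->reqs[i].section != q->reqs[i - 1].section) {
            flushes++;
        }
    }
    return flushes;
}
```

Note that in this model nothing is written at the time toy_pwrite returns, which is exactly the property my questions at the top of this mail are about.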
Thread overview: 15+ messages
2010-09-20 13:56 [Qemu-devel] [RFC] block-queue: Delay and batch metadata writes Kevin Wolf
2010-09-20 14:31 ` Anthony Liguori [this message]
2010-09-20 14:56 ` Anthony Liguori
2010-09-20 15:33 ` Kevin Wolf
2010-09-20 15:48 ` Anthony Liguori
2010-09-20 15:08 ` Kevin Wolf
2010-09-20 15:33 ` Avi Kivity
2010-09-20 15:38 ` Avi Kivity
2010-09-20 15:46 ` Kevin Wolf
2010-09-20 15:40 ` Anthony Liguori
2010-09-20 15:55 ` Kevin Wolf
2010-09-20 16:34 ` Anthony Liguori
2010-09-20 15:51 ` Anthony Liguori
2010-09-20 16:05 ` Avi Kivity
2010-09-21 9:13 ` Kevin Wolf