From: Anthony Liguori <anthony@codemonkey.ws>
To: Kevin Wolf <kwolf@redhat.com>
Cc: qemu-devel@nongnu.org
Subject: Re: [Qemu-devel] [RFC] block-queue: Delay and batch metadata writes
Date: Mon, 20 Sep 2010 09:31:04 -0500
Message-ID: <4C977028.3050602@codemonkey.ws>
In-Reply-To: <1284991010-10951-1-git-send-email-kwolf@redhat.com>

On 09/20/2010 08:56 AM, Kevin Wolf wrote:
> I won't get this ready before I leave for vacation on Wednesday, so I thought I
> might just as well post it as an RFC in its current state.
>
> With this patch applied, qcow2 doesn't directly access the image file any more
> for metadata, but rather goes through the newly introduced blkqueue. Write
> and sync requests are queued there and executed in a separate worker thread.
> Reads consider the contents of the queue before accessing the image file.
>
> What makes this interesting is that we can delay syncs and if multiple syncs
> occur, we can merge them into one bdrv_flush.
>
> A typical sequence in qcow2 (simple cluster allocation) looks like this:
>
> 1. Update refcount table
> 2. bdrv_flush
> 3. Update L2 entry
>    

Let's expand it a bit more:

1. Update refcount table
2. bdrv_flush
3. Update L2 entry
4. Write data to disk
5. Report write complete

I'm struggling to understand how a thread helps out.

If you run 1-3 in a thread, you need to inject a barrier between steps 3 
and 5 or you'll report the write complete before writing the metadata 
out.  You can't delay completing step 3 until a guest requests a flush.  
If you do, then you're implementing a writeback cache for metadata.
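
To make that ordering concrete, here's a minimal sketch of what an 
allocating write has to guarantee.  The helper names are invented for 
illustration; only bdrv_flush() is real:

    static int allocating_write(BlockDriverState *bs)
    {
        update_refcount_table(bs);     /* step 1 */
        bdrv_flush(bs->file);          /* step 2: refcounts on disk first */
        update_l2_entry(bs);           /* step 3 */
        write_guest_data(bs);          /* step 4 */
        /* Step 5 must not happen before step 3 has hit the disk,
         * unless you accept writeback caching of the metadata. */
        report_write_complete(bs);     /* step 5 */
        return 0;
    }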

If you're comfortable with a writeback cache for metadata, then you 
should also be comfortable with a writeback cache for data in which 
case, cache=writeback is the answer.

If it's a matter of batching, batching can't occur if you have a barrier 
between steps 3 and 5.  The only way you can get batching is by doing a 
writeback cache for the metadata such that you can complete your request 
before the metadata is written.

Am I misunderstanding the idea?

Regards,

Anthony Liguori

> If we delay the operation and get three of these sequences queued before
> actually executing, we end up with the following result, saving two syncs:
>
> 1. Update refcount table (req 1)
> 2. Update refcount table (req 2)
> 3. Update refcount table (req 3)
> 4. bdrv_flush
> 5. Update L2 entry (req 1)
> 6. Update L2 entry (req 2)
> 7. Update L2 entry (req 3)
>
> This patch only commits a sync if either the guest has requested a flush or a
> certain number of requests has accumulated in the queue, so usually we batch
> more than just three requests.
>
> I didn't run any detailed benchmarks but just looked at the installation time
> of a Fedora 13 guest: while git master takes about 40-50% longer than it did
> before the metadata syncs were introduced, we get most of that back with
> blkqueue.
>
> Of course, in this state the code is not correct, but it's correct enough to
> try and have qcow2 run on a file backend. Some remaining problems are:
>
> - There's no locking between the worker thread and other functions accessing
>    the same backend driver. Should be fine for file, but probably not for other
>    backends.
>
> - Error handling doesn't really exist. If something goes wrong with writing
>    metadata, we can't fail the guest request any more because it has long since
>    completed. Losing this data is actually okay: the guest hasn't flushed yet.
>
>    However, we need to be able to fail a flush, and we also need some way to
>    handle errors transparently. This probably means that we have to stop the VM
>    and let the user fix things so that we can retry. The only other way would be
>    to shut down the VM and end up in the same situation as with a host crash.
>
>    Or maybe it would even be enough to start failing all new requests.
>
> - The Makefile integration is obviously very wrong, too. It worked well enough
>    for me, but you need to be aware of when block-queue.o is compiled with
>    RUN_TESTS and when it isn't. The tests need to be split out properly.
>
> They are certainly fixable and shouldn't have any major impact on performance,
> so that's just a matter of doing it.
>
> Kevin
>
> ---
>   Makefile               |    3 +
>   Makefile.objs          |    1 +
>   block-queue.c          |  720 ++++++++++++++++++++++++++++++++++++++++++++++++
>   block-queue.h          |   49 ++++
>   block/qcow2-cluster.c  |   28 +-
>   block/qcow2-refcount.c |   44 ++--
>   block/qcow2.c          |   14 +
>   block/qcow2.h          |    4 +
>   qemu-thread.c          |   13 +
>   qemu-thread.h          |    1 +
>   10 files changed, 838 insertions(+), 39 deletions(-)
>   create mode 100644 block-queue.c
>   create mode 100644 block-queue.h
>
> diff --git a/Makefile b/Makefile
> index ab91d42..0202dc6 100644
> --- a/Makefile
> +++ b/Makefile
> @@ -125,6 +125,9 @@ qemu-nbd$(EXESUF): qemu-nbd.o qemu-tool.o qemu-error.o $(trace-obj-y) $(block-ob
>
>   qemu-io$(EXESUF): qemu-io.o cmd.o qemu-tool.o qemu-error.o $(trace-obj-y) $(block-obj-y) $(qobject-obj-y)
>
> +block-queue$(EXESUF): QEMU_CFLAGS += -DRUN_TESTS
> +block-queue$(EXESUF): qemu-tool.o qemu-error.o qemu-thread.o $(block-obj-y) $(qobject-obj-y)
> +
>   qemu-img-cmds.h: $(SRC_PATH)/qemu-img-cmds.hx
>   	$(call quiet-command,sh $(SRC_PATH)/hxtool -h < $< > $@,"  GEN   $@")
>
> diff --git a/Makefile.objs b/Makefile.objs
> index 3ef6d80..e97a246 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -9,6 +9,7 @@ qobject-obj-y += qerror.o
>
>   block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o
>   block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
> +block-obj-y += qemu-thread.o block-queue.o
>   block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
>   block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>
> diff --git a/block-queue.c b/block-queue.c
> new file mode 100644
> index 0000000..13579a7
> --- /dev/null
> +++ b/block-queue.c
> @@ -0,0 +1,720 @@
> +/*
> + * QEMU System Emulator
> + *
> + * Copyright (c) 2010 Kevin Wolf <kwolf@redhat.com>
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +
> +#include <signal.h>
> +
> +#include "qemu-common.h"
> +#include "qemu-queue.h"
> +#include "qemu-thread.h"
> +#include "qemu-barrier.h"
> +#include "block.h"
> +#include "block-queue.h"
> +
> +enum blkqueue_req_type {
> +    REQ_TYPE_WRITE,
> +    REQ_TYPE_BARRIER,
> +};
> +
> +typedef struct BlockQueueRequest {
> +    enum blkqueue_req_type type;
> +
> +    uint64_t    offset;
> +    void*       buf;
> +    uint64_t    size;
> +    unsigned    section;
> +
> +    QTAILQ_ENTRY(BlockQueueRequest) link;
> +    QSIMPLEQ_ENTRY(BlockQueueRequest) link_section;
> +} BlockQueueRequest;
> +
> +struct BlockQueue {
> +    BlockDriverState*   bs;
> +
> +    QemuThread          thread;
> +    bool                thread_done;
> +    QemuMutex           lock;
> +    QemuMutex           flush_lock;
> +    QemuCond            cond;
> +
> +    int                 barriers_requested;
> +    int                 barriers_submitted;
> +    int                 queue_size;
> +
> +    QTAILQ_HEAD(bq_queue_head, BlockQueueRequest) queue;
> +    QSIMPLEQ_HEAD(, BlockQueueRequest) sections;
> +};
> +
> +static void *blkqueue_thread(void *bq);
> +
> +BlockQueue *blkqueue_create(BlockDriverState *bs)
> +{
> +    BlockQueue *bq = qemu_mallocz(sizeof(BlockQueue));
> +    bq->bs = bs;
> +
> +    QTAILQ_INIT(&bq->queue);
> +    QSIMPLEQ_INIT(&bq->sections);
> +
> +    qemu_mutex_init(&bq->lock);
> +    qemu_mutex_init(&bq->flush_lock);
> +    qemu_cond_init(&bq->cond);
> +
> +    bq->thread_done = false;
> +    qemu_thread_create(&bq->thread, blkqueue_thread, bq);
> +
> +    return bq;
> +}
> +
> +void blkqueue_init_context(BlockQueueContext* context, BlockQueue *bq)
> +{
> +    context->bq = bq;
> +    context->section = 0;
> +}
> +
> +void blkqueue_destroy(BlockQueue *bq)
> +{
> +    bq->thread_done = true;
> +    qemu_cond_signal(&bq->cond);
> +    qemu_thread_join(&bq->thread);
> +
> +    blkqueue_flush(bq);
> +
> +    fprintf(stderr, "blkqueue_destroy: %d/%d barriers left\n",
> +        bq->barriers_submitted, bq->barriers_requested);
> +
> +    qemu_mutex_destroy(&bq->lock);
> +    qemu_mutex_destroy(&bq->flush_lock);
> +    qemu_cond_destroy(&bq->cond);
> +
> +    assert(QTAILQ_FIRST(&bq->queue) == NULL);
> +    assert(QSIMPLEQ_FIRST(&bq->sections) == NULL);
> +    qemu_free(bq);
> +}
> +
> +int blkqueue_pread(BlockQueueContext *context, uint64_t offset, void *buf,
> +    uint64_t size)
> +{
> +    BlockQueue *bq = context->bq;
> +    BlockQueueRequest *req;
> +    int ret;
> +
> +    /*
> +     * First check if there are any pending writes for the same data. Reverse
> +     * order to return data written by the latest write.
> +     */
> +    QTAILQ_FOREACH_REVERSE(req, &bq->queue, bq_queue_head, link) {
> +        uint64_t end = offset + size;
> +        uint64_t req_end = req->offset + req->size;
> +        uint8_t *read_buf = buf;
> +        uint8_t *req_buf = req->buf;
> +
> +        /* We're only interested in queued writes */
> +        if (req->type != REQ_TYPE_WRITE) {
> +            continue;
> +        }
> +
> +        /*
> +         * If we read from a write in the queue (i.e. our read overlaps the
> +         * write request), our next write probably depends on this write, so
> +         * let's move forward to its section.
> +         */
> +        if (end > req->offset && offset < req_end) {
> +            context->section = MAX(context->section, req->section);
> +        }
> +
> +        /* How we continue, depends on the kind of overlap we have */
> +        if ((offset >= req->offset) && (end <= req_end)) {
> +            /* Completely contained in the write request */
> +            memcpy(buf, &req_buf[offset - req->offset], size);
> +            return 0;
> +        } else if ((end >= req->offset) && (end <= req_end)) {
> +            /* Overlap in the end of the read request */
> +            assert(offset < req->offset);
> +            memcpy(&read_buf[req->offset - offset], req_buf, end - req->offset);
> +            size = req->offset - offset;
> +        } else if ((offset >= req->offset) && (offset < req_end)) {
> +            /* Overlap in the start of the read request */
> +            assert(end > req_end);
> +            memcpy(read_buf, &req_buf[offset - req->offset], req_end - offset);
> +            buf = read_buf = &read_buf[req_end - offset];
> +            offset = req_end;
> +            size = end - req_end;
> +        } else if ((req->offset >= offset) && (req_end <= end)) {
> +            /*
> +             * The write request is completely contained in the read request.
> +             * memcpy the data from the write request here, continue with the
> +             * data before the write request and handle the data after the
> +             * write request with a recursive call.
> +             */
> +            memcpy(&read_buf[req->offset - offset], req_buf, req_end - req->offset);
> +            size = req->offset - offset;
> +            blkqueue_pread(context, req_end, &read_buf[req_end - offset], end - req_end);
> +        }
> +    }
> +
> +    /* The requested data is not in the queue, read it from disk */
> +    ret = bdrv_pread(bq->bs, offset, buf, size);
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    return 0;
> +}
> +
> +int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
> +    uint64_t size)
> +{
> +    BlockQueue *bq = context->bq;
> +    BlockQueueRequest *section_req;
> +
> +    /* Create request structure */
> +    BlockQueueRequest *req = qemu_malloc(sizeof(*req));
> +    req->type       = REQ_TYPE_WRITE;
> +    req->offset     = offset;
> +    req->size       = size;
> +    req->buf        = qemu_malloc(size);
> +    req->section    = context->section;
> +    memcpy(req->buf, buf, size);
> +
> +    /*
> +     * Find the right place to insert it into the queue:
> +     * Right before the barrier that closes the current section.
> +     */
> +    qemu_mutex_lock(&bq->lock);
> +    QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
> +        if (section_req->section >= req->section) {
> +            req->section = section_req->section;
> +            context->section = section_req->section;
> +            QTAILQ_INSERT_BEFORE(section_req, req, link);
> +            bq->queue_size++;
> +            goto out;
> +        }
> +    }
> +
> +    /* If there was no barrier, just put it at the end. */
> +    QTAILQ_INSERT_TAIL(&bq->queue, req, link);
> +    bq->queue_size++;
> +    qemu_cond_signal(&bq->cond);
> +
> +out:
> +    qemu_mutex_unlock(&bq->lock);
> +    return 0;
> +}
> +
> +int blkqueue_barrier(BlockQueueContext *context)
> +{
> +    BlockQueue *bq = context->bq;
> +    BlockQueueRequest *section_req;
> +
> +    bq->barriers_requested++;
> +
> +    /* Create request structure */
> +    BlockQueueRequest *req = qemu_malloc(sizeof(*req));
> +    req->type       = REQ_TYPE_BARRIER;
> +    req->section    = context->section;
> +    req->buf        = NULL;
> +
> +    /* Find another barrier to merge with. */
> +    qemu_mutex_lock(&bq->lock);
> +    QSIMPLEQ_FOREACH(section_req, &bq->sections, link_section) {
> +        if (section_req->section >= req->section) {
> +            req->section = section_req->section;
> +            context->section = section_req->section + 1;
> +            qemu_free(req);
> +            goto out;
> +        }
> +    }
> +
> +    /*
> +     * If there wasn't a barrier for the same section yet, insert a new one at
> +     * the end.
> +     */
> +    QTAILQ_INSERT_TAIL(&bq->queue, req, link);
> +    QSIMPLEQ_INSERT_TAIL(&bq->sections, req, link_section);
> +    bq->queue_size++;
> +    context->section++;
> +    qemu_cond_signal(&bq->cond);
> +
> +    bq->barriers_submitted++;
> +
> +out:
> +    qemu_mutex_unlock(&bq->lock);
> +    return 0;
> +}
> +
> +/*
> + * Caller needs to hold the bq->lock mutex
> + */
> +static BlockQueueRequest *blkqueue_pop(BlockQueue *bq)
> +{
> +    BlockQueueRequest *req;
> +
> +    req = QTAILQ_FIRST(&bq->queue);
> +    if (req == NULL) {
> +        goto out;
> +    }
> +
> +    QTAILQ_REMOVE(&bq->queue, req, link);
> +    bq->queue_size--;
> +
> +    if (req->type == REQ_TYPE_BARRIER) {
> +        assert(QSIMPLEQ_FIRST(&bq->sections) == req);
> +        QSIMPLEQ_REMOVE_HEAD(&bq->sections, link_section);
> +    }
> +
> +out:
> +    return req;
> +}
> +
> +static void blkqueue_free_request(BlockQueueRequest *req)
> +{
> +    qemu_free(req->buf);
> +    qemu_free(req);
> +}
> +
> +static void blkqueue_process_request(BlockQueue *bq)
> +{
> +    BlockQueueRequest *req;
> +    BlockQueueRequest *req2;
> +    int ret;
> +
> +    /*
> +     * Note that we leave the request in the queue while we process it. No
> +     * other request will be queued before this one and we have only one thread
> +     * that processes the queue, so afterwards it will still be the first
> +     * request. (Not true for barriers in the first position, but we can handle
> +     * that)
> +     */
> +    req = QTAILQ_FIRST(&bq->queue);
> +    if (req == NULL) {
> +        return;
> +    }
> +
> +    switch (req->type) {
> +        case REQ_TYPE_WRITE:
> +            ret = bdrv_pwrite(bq->bs, req->offset, req->buf, req->size);
> +            if (ret < 0) {
> +                /* TODO Error reporting! */
> +                return;
> +            }
> +            break;
> +        case REQ_TYPE_BARRIER:
> +            bdrv_flush(bq->bs);
> +            break;
> +    }
> +
> +    /*
> +     * Only remove the request from the queue when it's written, so that reads
> +     * always access the right data.
> +     */
> +    qemu_mutex_lock(&bq->lock);
> +    req2 = QTAILQ_FIRST(&bq->queue);
> +    if (req == req2) {
> +        blkqueue_pop(bq);
> +        blkqueue_free_request(req);
> +    } else {
> +        /*
> +         * If it's a barrier and something has been queued before it, just
> +         * leave it in the queue and flush once again later.
> +         */
> +        assert(req->type == REQ_TYPE_BARRIER);
> +        bq->barriers_submitted++;
> +    }
> +    qemu_mutex_unlock(&bq->lock);
> +}
> +
> +struct blkqueue_flush_aiocb {
> +    BlockQueue *bq;
> +    BlockDriverCompletionFunc *cb;
> +    void *opaque;
> +};
> +
> +static void *blkqueue_aio_flush_thread(void *opaque)
> +{
> +    struct blkqueue_flush_aiocb *acb = opaque;
> +
> +    /* Process any left over requests */
> +    blkqueue_flush(acb->bq);
> +
> +    acb->cb(acb->opaque, 0);
> +    qemu_free(acb);
> +
> +    return NULL;
> +}
> +
> +void blkqueue_aio_flush(BlockQueue *bq, BlockDriverCompletionFunc *cb,
> +    void *opaque)
> +{
> +    struct blkqueue_flush_aiocb *acb;
> +
> +    acb = qemu_malloc(sizeof(*acb));
> +    acb->bq = bq;
> +    acb->cb = cb;
> +    acb->opaque = opaque;
> +
> +    qemu_thread_create(NULL, blkqueue_aio_flush_thread, acb);
> +}
> +
> +void blkqueue_flush(BlockQueue *bq)
> +{
> +    qemu_mutex_lock(&bq->flush_lock);
> +
> +    /* Process any left over requests */
> +    while (QTAILQ_FIRST(&bq->queue)) {
> +        blkqueue_process_request(bq);
> +    }
> +
> +    qemu_mutex_unlock(&bq->flush_lock);
> +}
> +
> +static void *blkqueue_thread(void *_bq)
> +{
> +    BlockQueue *bq = _bq;
> +#ifndef RUN_TESTS
> +    BlockQueueRequest *req;
> +#endif
> +
> +    qemu_mutex_lock(&bq->flush_lock);
> +    while (!bq->thread_done) {
> +        barrier();
> +#ifndef RUN_TESTS
> +        req = QTAILQ_FIRST(&bq->queue);
> +
> +        /* Don't process barriers, we only do that on flushes */
> +        if (req && (req->type != REQ_TYPE_BARRIER || bq->queue_size > 42)) {
> +            blkqueue_process_request(bq);
> +        } else {
> +            qemu_cond_wait(&bq->cond, &bq->flush_lock);
> +        }
> +#else
> +        qemu_cond_wait(&bq->cond, &bq->flush_lock);
> +#endif
> +    }
> +    qemu_mutex_unlock(&bq->flush_lock);
> +
> +    return NULL;
> +}
> +
> +#ifdef RUN_TESTS
> +
> +#define CHECK_WRITE(req, _offset, _size, _buf, _section) \
> +    do { \
> +        assert(req != NULL); \
> +        assert(req->type == REQ_TYPE_WRITE); \
> +        assert(req->offset == _offset); \
> +        assert(req->size == _size); \
> +        assert(req->section == _section); \
> +        assert(!memcmp(req->buf, _buf, _size)); \
> +    } while(0)
> +
> +#define CHECK_BARRIER(req, _section) \
> +    do { \
> +        assert(req != NULL); \
> +        assert(req->type == REQ_TYPE_BARRIER); \
> +        assert(req->section == _section); \
> +    } while(0)
> +
> +#define CHECK_READ(_context, _offset, _buf, _size, _cmpbuf) \
> +    do { \
> +        int ret; \
> +        memset(buf, 0, 512); \
> +        ret = blkqueue_pread(_context, _offset, _buf, _size); \
> +        assert(ret == 0); \
> +        assert(!memcmp(_cmpbuf, _buf, _size)); \
> +    } while(0)
> +
> +#define QUEUE_WRITE(_context, _offset, _buf, _size, _pattern) \
> +    do { \
> +        int ret; \
> +        memset(_buf, _pattern, _size); \
> +        ret = blkqueue_pwrite(_context, _offset, _buf, _size); \
> +        assert(ret == 0); \
> +    } while(0)
> +#define QUEUE_BARRIER(_context) \
> +    do { \
> +        int ret; \
> +        ret = blkqueue_barrier(_context); \
> +        assert(ret == 0); \
> +    } while(0)
> +
> +#define POP_CHECK_WRITE(_bq, _offset, _buf, _size, _pattern, _section) \
> +    do { \
> +        BlockQueueRequest *req; \
> +        memset(_buf, _pattern, _size); \
> +        req = blkqueue_pop(_bq); \
> +        CHECK_WRITE(req, _offset, _size, _buf, _section); \
> +        blkqueue_free_request(req); \
> +    } while(0)
> +#define POP_CHECK_BARRIER(_bq, _section) \
> +    do { \
> +        BlockQueueRequest *req; \
> +        req = blkqueue_pop(_bq); \
> +        CHECK_BARRIER(req, _section); \
> +        blkqueue_free_request(req); \
> +    } while(0)
> +
> +static void  __attribute__((used)) dump_queue(BlockQueue *bq)
> +{
> +    BlockQueueRequest *req;
> +
> +    fprintf(stderr, "--- Queue dump ---\n");
> +    QTAILQ_FOREACH(req, &bq->queue, link) {
> +        fprintf(stderr, "[%d] ", req->section);
> +        if (req->type == REQ_TYPE_WRITE) {
> +            fprintf(stderr, "Write off=%5"PRId64", len=%5"PRId64", buf=%p\n",
> +                req->offset, req->size, req->buf);
> +        } else if (req->type == REQ_TYPE_BARRIER) {
> +            fprintf(stderr, "Barrier\n");
> +        } else {
> +            fprintf(stderr, "Unknown type %d\n", req->type);
> +        }
> +    }
> +}
> +
> +static void test_basic(BlockDriverState *bs)
> +{
> +    uint8_t buf[512];
> +    BlockQueue *bq;
> +    BlockQueueContext context;
> +
> +    bq = blkqueue_create(bs);
> +    blkqueue_init_context(&context, bq);
> +
> +    /* Queue requests */
> +    QUEUE_WRITE(&context,   0, buf, 512, 0x12);
> +    QUEUE_WRITE(&context, 512, buf,  42, 0x34);
> +    QUEUE_BARRIER(&context);
> +    QUEUE_WRITE(&context, 678, buf,  42, 0x56);
> +
> +    /* Verify queue contents */
> +    POP_CHECK_WRITE(bq,     0, buf, 512, 0x12, 0);
> +    POP_CHECK_WRITE(bq,   512, buf,  42, 0x34, 0);
> +    POP_CHECK_BARRIER(bq, 0);
> +    POP_CHECK_WRITE(bq,   678, buf,  42, 0x56, 1);
> +
> +    blkqueue_destroy(bq);
> +}
> +
> +static void test_merge(BlockDriverState *bs)
> +{
> +    uint8_t buf[512];
> +    BlockQueue *bq;
> +    BlockQueueContext ctx1, ctx2;
> +
> +    bq = blkqueue_create(bs);
> +    blkqueue_init_context(&ctx1, bq);
> +    blkqueue_init_context(&ctx2, bq);
> +
> +    /* Queue requests */
> +    QUEUE_WRITE(&ctx1,    0, buf, 512, 0x12);
> +    QUEUE_BARRIER(&ctx1);
> +    QUEUE_WRITE(&ctx2,  512, buf,  42, 0x34);
> +    QUEUE_WRITE(&ctx1, 1024, buf, 512, 0x12);
> +    QUEUE_BARRIER(&ctx2);
> +    QUEUE_WRITE(&ctx2, 1512, buf,  42, 0x34);
> +
> +    /* Verify queue contents */
> +    POP_CHECK_WRITE(bq,     0, buf, 512, 0x12, 0);
> +    POP_CHECK_WRITE(bq,   512, buf,  42, 0x34, 0);
> +    POP_CHECK_BARRIER(bq, 0);
> +    POP_CHECK_WRITE(bq,  1024, buf, 512, 0x12, 1);
> +    POP_CHECK_WRITE(bq,  1512, buf,  42, 0x34, 1);
> +
> +    /* Same queue, new contexts */
> +    blkqueue_init_context(&ctx1, bq);
> +    blkqueue_init_context(&ctx2, bq);
> +
> +    /* Queue requests */
> +    QUEUE_BARRIER(&ctx2);
> +    QUEUE_WRITE(&ctx2,  512, buf,  42, 0x34);
> +    QUEUE_WRITE(&ctx2,   12, buf,  20, 0x45);
> +    QUEUE_BARRIER(&ctx2);
> +    QUEUE_WRITE(&ctx2,  892, buf, 142, 0x56);
> +
> +    QUEUE_WRITE(&ctx1,    0, buf,   8, 0x12);
> +    QUEUE_BARRIER(&ctx1);
> +    QUEUE_WRITE(&ctx1, 1024, buf, 512, 0x12);
> +    QUEUE_BARRIER(&ctx1);
> +    QUEUE_WRITE(&ctx1, 1512, buf,  42, 0x34);
> +    QUEUE_BARRIER(&ctx1);
> +
> +    /* Verify queue contents */
> +    POP_CHECK_WRITE(bq,     0, buf,   8, 0x12, 0);
> +    POP_CHECK_BARRIER(bq, 0);
> +    POP_CHECK_WRITE(bq,   512, buf,  42, 0x34, 1);
> +    POP_CHECK_WRITE(bq,    12, buf,  20, 0x45, 1);
> +    POP_CHECK_WRITE(bq,  1024, buf, 512, 0x12, 1);
> +    POP_CHECK_BARRIER(bq, 1);
> +    POP_CHECK_WRITE(bq,   892, buf, 142, 0x56, 2);
> +    POP_CHECK_WRITE(bq,  1512, buf,  42, 0x34, 2);
> +    POP_CHECK_BARRIER(bq, 2);
> +
> +    blkqueue_destroy(bq);
> +}
> +
> +static void test_read(BlockDriverState *bs)
> +{
> +    uint8_t buf[512], buf2[512];
> +    BlockQueue *bq;
> +    BlockQueueContext ctx1;
> +
> +    bq = blkqueue_create(bs);
> +    blkqueue_init_context(&ctx1, bq);
> +
> +    /* Queue requests and do some test reads */
> +    memset(buf2, 0xa5, 512);
> +    CHECK_READ(&ctx1, 0, buf, 32, buf2);
> +
> +    QUEUE_WRITE(&ctx1, 5, buf, 5, 0x12);
> +    memset(buf2, 0x12, 5);
> +    CHECK_READ(&ctx1,  5, buf, 5, buf2);
> +    CHECK_READ(&ctx1,  7, buf, 2, buf2);
> +    memset(buf2, 0xa5, 512);
> +    memset(buf2 + 5, 0x12, 5);
> +    CHECK_READ(&ctx1,  0, buf, 8, buf2);
> +    CHECK_READ(&ctx1,  0, buf, 10, buf2);
> +    CHECK_READ(&ctx1,  0, buf, 32, buf2);
> +    memset(buf2, 0xa5, 512);
> +    memset(buf2, 0x12, 5);
> +    CHECK_READ(&ctx1,  5, buf, 16, buf2);
> +    memset(buf2, 0xa5, 512);
> +    CHECK_READ(&ctx1,  0, buf,  2, buf2);
> +    CHECK_READ(&ctx1, 10, buf, 16, buf2);
> +
> +    QUEUE_WRITE(&ctx1, 0, buf, 2, 0x12);
> +    memset(&buf2[5], 0x12, 5);
> +    memset(buf2, 0x12, 2);
> +    CHECK_READ(&ctx1,  0, buf, 32, buf2);
> +
> +    /* Verify queue contents */
> +    POP_CHECK_WRITE(bq,     5, buf,   5, 0x12, 0);
> +    POP_CHECK_WRITE(bq,     0, buf,   2, 0x12, 0);
> +
> +    blkqueue_destroy(bq);
> +}
> +
> +static void test_read_order(BlockDriverState *bs)
> +{
> +    uint8_t buf[512], buf2[512];
> +    BlockQueue *bq;
> +    BlockQueueContext ctx1, ctx2;
> +
> +    bq = blkqueue_create(bs);
> +    blkqueue_init_context(&ctx1, bq);
> +    blkqueue_init_context(&ctx2, bq);
> +
> +    /* Queue requests and do some test reads */
> +    QUEUE_WRITE(&ctx1, 25, buf, 5, 0x44);
> +    QUEUE_BARRIER(&ctx1);
> +    QUEUE_WRITE(&ctx1, 5, buf, 5, 0x12);
> +    QUEUE_BARRIER(&ctx1);
> +    QUEUE_WRITE(&ctx2, 10, buf, 5, 0x34);
> +
> +    memset(buf2, 0xa5, 512);
> +    memset(buf2 + 5, 0x12, 5);
> +    memset(buf2 + 10, 0x34, 5);
> +    CHECK_READ(&ctx2, 0, buf, 20, buf2);
> +    QUEUE_WRITE(&ctx2,  0, buf, 10, 0x34);
> +    QUEUE_BARRIER(&ctx2);
> +
> +    /* Verify queue contents */
> +    POP_CHECK_WRITE(bq,    25, buf,   5, 0x44, 0);
> +    POP_CHECK_WRITE(bq,    10, buf,   5, 0x34, 0);
> +    POP_CHECK_BARRIER(bq, 0);
> +    POP_CHECK_WRITE(bq,     5, buf,   5, 0x12, 1);
> +    POP_CHECK_WRITE(bq,     0, buf,  10, 0x34, 1);
> +    POP_CHECK_BARRIER(bq, 1);
> +
> +    blkqueue_destroy(bq);
> +}
> +
> +static void test_process_request(BlockDriverState *bs)
> +{
> +    uint8_t buf[512], buf2[512];
> +    BlockQueue *bq;
> +    BlockQueueContext ctx1;
> +
> +    bq = blkqueue_create(bs);
> +    blkqueue_init_context(&ctx1, bq);
> +
> +    /* Queue requests and do a test read */
> +    QUEUE_WRITE(&ctx1, 25, buf, 5, 0x44);
> +    QUEUE_BARRIER(&ctx1);
> +
> +    memset(buf2, 0xa5, 512);
> +    memset(buf2 + 25, 0x44, 5);
> +    CHECK_READ(&ctx1, 0, buf, 64, buf2);
> +
> +    /* Process the queue (plus one call to test a NULL condition) */
> +    blkqueue_process_request(bq);
> +    blkqueue_process_request(bq);
> +    blkqueue_process_request(bq);
> +
> +    /* Verify the queue is empty */
> +    assert(blkqueue_pop(bq) == NULL);
> +
> +    /* Check if we still read the same */
> +    CHECK_READ(&ctx1, 0, buf, 64, buf2);
> +
> +    blkqueue_destroy(bq);
> +}
> +
> +static void run_test(void (*testfn)(BlockDriverState*), BlockDriverState *bs)
> +{
> +    void* buf;
> +    int ret;
> +
> +    buf = qemu_malloc(1024 * 1024);
> +    memset(buf, 0xa5, 1024 * 1024);
> +    ret = bdrv_write(bs, 0, buf, 2048);
> +    assert(ret >= 0);
> +    qemu_free(buf);
> +
> +    testfn(bs);
> +}
> +
> +int main(void)
> +{
> +    BlockDriverState *bs;
> +    int ret;
> +
> +    bdrv_init();
> +    bs = bdrv_new("");
> +    ret = bdrv_open(bs, "block-queue.img", BDRV_O_RDWR, NULL);
> +    if (ret < 0) {
> +        fprintf(stderr, "Couldn't open block-queue.img: %s\n",
> +            strerror(-ret));
> +        exit(1);
> +    }
> +
> +    run_test(&test_basic, bs);
> +    run_test(&test_merge, bs);
> +    run_test(&test_read, bs);
> +    run_test(&test_read_order, bs);
> +    run_test(&test_process_request, bs);
> +
> +    bdrv_delete(bs);
> +
> +    return 0;
> +}
> +#endif
> diff --git a/block-queue.h b/block-queue.h
> new file mode 100644
> index 0000000..4ce0e1b
> --- /dev/null
> +++ b/block-queue.h
> @@ -0,0 +1,49 @@
> +/*
> + * QEMU System Emulator
> + *
> + * Copyright (c) 2010 Kevin Wolf <kwolf@redhat.com>
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +
> +#ifndef BLOCK_QUEUE_H
> +#define BLOCK_QUEUE_H
> +
> +#include "qemu-common.h"
> +
> +typedef struct BlockQueue BlockQueue;
> +
> +typedef struct BlockQueueContext {
> +    BlockQueue* bq;
> +    unsigned    section;
> +} BlockQueueContext;
> +
> +BlockQueue *blkqueue_create(BlockDriverState *bs);
> +void blkqueue_init_context(BlockQueueContext* context, BlockQueue *bq);
> +void blkqueue_destroy(BlockQueue *bq);
> +int blkqueue_pread(BlockQueueContext *context, uint64_t offset, void *buf,
> +    uint64_t size);
> +int blkqueue_pwrite(BlockQueueContext *context, uint64_t offset, void *buf,
> +    uint64_t size);
> +int blkqueue_barrier(BlockQueueContext *context);
> +void blkqueue_flush(BlockQueue *bq);
> +void blkqueue_aio_flush(BlockQueue *bq, BlockDriverCompletionFunc *cb,
> +    void *opaque);
> +
> +#endif
> diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
> index f562b16..eeae173 100644
> --- a/block/qcow2-cluster.c
> +++ b/block/qcow2-cluster.c
> @@ -64,7 +64,8 @@ int qcow2_grow_l1_table(BlockDriverState *bs, int min_size)
>       BLKDBG_EVENT(bs->file, BLKDBG_L1_GROW_WRITE_TABLE);
>       for(i = 0; i < s->l1_size; i++)
>           new_l1_table[i] = cpu_to_be64(new_l1_table[i]);
> -    ret = bdrv_pwrite_sync(bs->file, new_l1_table_offset, new_l1_table, new_l1_size2);
> +    ret = blkqueue_pwrite(&s->bq_context, new_l1_table_offset, new_l1_table, new_l1_size2);
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0)
>           goto fail;
>       for(i = 0; i < s->l1_size; i++)
> @@ -74,7 +75,8 @@ int qcow2_grow_l1_table(BlockDriverState *bs, int min_size)
>       BLKDBG_EVENT(bs->file, BLKDBG_L1_GROW_ACTIVATE_TABLE);
>       cpu_to_be32w((uint32_t*)data, new_l1_size);
>       cpu_to_be64w((uint64_t*)(data + 4), new_l1_table_offset);
> -    ret = bdrv_pwrite_sync(bs->file, offsetof(QCowHeader, l1_size), data,sizeof(data));
> +    ret = blkqueue_pwrite(&s->bq_context, offsetof(QCowHeader, l1_size), data, sizeof(data));
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0) {
>           goto fail;
>       }
> @@ -177,8 +179,7 @@ static int l2_load(BlockDriverState *bs, uint64_t l2_offset,
>       *l2_table = s->l2_cache + (min_index << s->l2_bits);
>
>       BLKDBG_EVENT(bs->file, BLKDBG_L2_LOAD);
> -    ret = bdrv_pread(bs->file, l2_offset, *l2_table,
> -        s->l2_size * sizeof(uint64_t));
> +    ret = blkqueue_pread(&s->bq_context, l2_offset, *l2_table, s->l2_size * sizeof(uint64_t));
>       if (ret < 0) {
>           return ret;
>       }
> @@ -207,8 +208,8 @@ static int write_l1_entry(BlockDriverState *bs, int l1_index)
>       }
>
>       BLKDBG_EVENT(bs->file, BLKDBG_L1_UPDATE);
> -    ret = bdrv_pwrite_sync(bs->file, s->l1_table_offset + 8 * l1_start_index,
> -        buf, sizeof(buf));
> +    ret = blkqueue_pwrite(&s->bq_context, s->l1_table_offset + 8 * l1_start_index, buf, sizeof(buf));
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0) {
>           return ret;
>       }
> @@ -255,16 +256,15 @@ static int l2_allocate(BlockDriverState *bs, int l1_index, uint64_t **table)
>       } else {
>           /* if there was an old l2 table, read it from the disk */
>           BLKDBG_EVENT(bs->file, BLKDBG_L2_ALLOC_COW_READ);
> -        ret = bdrv_pread(bs->file, old_l2_offset, l2_table,
> -            s->l2_size * sizeof(uint64_t));
> +        ret = blkqueue_pread(&s->bq_context, old_l2_offset, l2_table, s->l2_size * sizeof(uint64_t));
>           if (ret < 0) {
>               goto fail;
>           }
>       }
>       /* write the l2 table to the file */
>       BLKDBG_EVENT(bs->file, BLKDBG_L2_ALLOC_WRITE);
> -    ret = bdrv_pwrite_sync(bs->file, l2_offset, l2_table,
> -        s->l2_size * sizeof(uint64_t));
> +    ret = blkqueue_pwrite(&s->bq_context, l2_offset, l2_table, s->l2_size * sizeof(uint64_t));
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0) {
>           goto fail;
>       }
> @@ -378,7 +378,7 @@ static int qcow_read(BlockDriverState *bs, int64_t sector_num,
>               memcpy(buf, s->cluster_cache + index_in_cluster * 512, 512 * n);
>           } else {
>               BLKDBG_EVENT(bs->file, BLKDBG_READ);
> -            ret = bdrv_pread(bs->file, cluster_offset + index_in_cluster * 512, buf, n * 512);
> +            ret = blkqueue_pread(&s->bq_context, cluster_offset + index_in_cluster * 512, buf, n * 512);
>               if (ret != n * 512)
>                   return -1;
>               if (s->crypt_method) {
> @@ -648,6 +648,7 @@ uint64_t qcow2_alloc_compressed_cluster_offset(BlockDriverState *bs,
>   static int write_l2_entries(BlockDriverState *bs, uint64_t *l2_table,
>       uint64_t l2_offset, int l2_index, int num)
>   {
> +    BDRVQcowState *s = bs->opaque;
>       int l2_start_index = l2_index & ~(L1_ENTRIES_PER_SECTOR - 1);
>       int start_offset = (8 * l2_index) & ~511;
>       int end_offset = (8 * (l2_index + num) + 511) & ~511;
> @@ -655,8 +656,7 @@ static int write_l2_entries(BlockDriverState *bs, uint64_t *l2_table,
>       int ret;
>
>       BLKDBG_EVENT(bs->file, BLKDBG_L2_UPDATE);
> -    ret = bdrv_pwrite(bs->file, l2_offset + start_offset,
> -        &l2_table[l2_start_index], len);
> +    ret = blkqueue_pwrite(&s->bq_context, l2_offset + start_offset, &l2_table[l2_start_index], len);
>       if (ret < 0) {
>           return ret;
>       }
> @@ -723,7 +723,7 @@ int qcow2_alloc_cluster_link_l2(BlockDriverState *bs, QCowL2Meta *m)
>        * Also flush bs->file to get the right order for L2 and refcount update.
>        */
>       if (j != 0) {
> -        bdrv_flush(bs->file);
> +        blkqueue_barrier(&s->bq_context);
>           for (i = 0; i < j; i++) {
>               qcow2_free_any_clusters(bs,
>                   be64_to_cpu(old_cluster[i]) & ~QCOW_OFLAG_COPIED, 1);
> diff --git a/block/qcow2-refcount.c b/block/qcow2-refcount.c
> index 4c19e7e..0d21d1f 100644
> --- a/block/qcow2-refcount.c
> +++ b/block/qcow2-refcount.c
> @@ -44,7 +44,7 @@ static int write_refcount_block(BlockDriverState *bs)
>       }
>
>       BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_UPDATE);
> -    if (bdrv_pwrite_sync(bs->file, s->refcount_block_cache_offset,
> +    if (blkqueue_pwrite(&s->bq_context, s->refcount_block_cache_offset,
>               s->refcount_block_cache, size) < 0)
>       {
>           return -EIO;
> @@ -66,8 +66,7 @@ int qcow2_refcount_init(BlockDriverState *bs)
>       s->refcount_table = qemu_malloc(refcount_table_size2);
>       if (s->refcount_table_size > 0) {
>           BLKDBG_EVENT(bs->file, BLKDBG_REFTABLE_LOAD);
> -        ret = bdrv_pread(bs->file, s->refcount_table_offset,
> -                         s->refcount_table, refcount_table_size2);
> +        ret = bdrv_pread(bs->file, s->refcount_table_offset, s->refcount_table, refcount_table_size2);
>           if (ret != refcount_table_size2)
>               goto fail;
>           for(i = 0; i < s->refcount_table_size; i++)
> @@ -100,8 +99,7 @@ static int load_refcount_block(BlockDriverState *bs,
>       }
>
>       BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_LOAD);
> -    ret = bdrv_pread(bs->file, refcount_block_offset, s->refcount_block_cache,
> -                     s->cluster_size);
> +    ret = blkqueue_pread(&s->bq_context, refcount_block_offset, s->refcount_block_cache, s->cluster_size);
>       if (ret < 0) {
>           return ret;
>       }
> @@ -269,8 +267,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
>
>       /* Now the new refcount block needs to be written to disk */
>       BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE);
> -    ret = bdrv_pwrite_sync(bs->file, new_block, s->refcount_block_cache,
> -        s->cluster_size);
> +    ret = blkqueue_pwrite(&s->bq_context, new_block, s->refcount_block_cache, s->cluster_size);
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0) {
>           goto fail_block;
>       }
> @@ -279,9 +277,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
>       if (refcount_table_index < s->refcount_table_size) {
>           uint64_t data64 = cpu_to_be64(new_block);
>           BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_HOOKUP);
> -        ret = bdrv_pwrite_sync(bs->file,
> -            s->refcount_table_offset + refcount_table_index * sizeof(uint64_t),
> -            &data64, sizeof(data64));
> +        ret = blkqueue_pwrite(&s->bq_context, s->refcount_table_offset + refcount_table_index * sizeof(uint64_t), &data64, sizeof(data64));
> +        blkqueue_barrier(&s->bq_context);
>           if (ret < 0) {
>               goto fail_block;
>           }
> @@ -359,8 +356,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
>
>       /* Write refcount blocks to disk */
>       BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE_BLOCKS);
> -    ret = bdrv_pwrite_sync(bs->file, meta_offset, new_blocks,
> -        blocks_clusters * s->cluster_size);
> +    ret = blkqueue_pwrite(&s->bq_context, meta_offset, new_blocks, blocks_clusters * s->cluster_size);
> +    blkqueue_barrier(&s->bq_context);
>       qemu_free(new_blocks);
>       if (ret < 0) {
>           goto fail_table;
> @@ -372,8 +369,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
>       }
>
>       BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_WRITE_TABLE);
> -    ret = bdrv_pwrite_sync(bs->file, table_offset, new_table,
> -        table_size * sizeof(uint64_t));
> +    ret = blkqueue_pwrite(&s->bq_context, table_offset, new_table, table_size * sizeof(uint64_t));
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0) {
>           goto fail_table;
>       }
> @@ -387,8 +384,8 @@ static int64_t alloc_refcount_block(BlockDriverState *bs, int64_t cluster_index)
>       cpu_to_be64w((uint64_t*)data, table_offset);
>       cpu_to_be32w((uint32_t*)(data + 8), table_clusters);
>       BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_ALLOC_SWITCH_TABLE);
> -    ret = bdrv_pwrite_sync(bs->file, offsetof(QCowHeader, refcount_table_offset),
> -        data, sizeof(data));
> +    ret = blkqueue_pwrite(&s->bq_context, offsetof(QCowHeader, refcount_table_offset), data, sizeof(data));
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0) {
>           goto fail_table;
>       }
> @@ -444,9 +441,8 @@ static int write_refcount_block_entries(BlockDriverState *bs,
>       size = (last_index - first_index) << REFCOUNT_SHIFT;
>
>       BLKDBG_EVENT(bs->file, BLKDBG_REFBLOCK_UPDATE_PART);
> -    ret = bdrv_pwrite_sync(bs->file,
> -        refcount_block_offset + (first_index << REFCOUNT_SHIFT),
> -        &s->refcount_block_cache[first_index], size);
> +    ret = blkqueue_pwrite(&s->bq_context, refcount_block_offset + (first_index << REFCOUNT_SHIFT), &s->refcount_block_cache[first_index], size);
> +    blkqueue_barrier(&s->bq_context);
>       if (ret < 0) {
>           return ret;
>       }
> @@ -763,8 +759,7 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
>               l1_table = NULL;
>           }
>           l1_allocated = 1;
> -        if (bdrv_pread(bs->file, l1_table_offset,
> -                       l1_table, l1_size2) != l1_size2)
> +        if (blkqueue_pread(&s->bq_context, l1_table_offset, l1_table, l1_size2) != l1_size2)
>               goto fail;
>           for(i = 0;i < l1_size; i++)
>               be64_to_cpus(&l1_table[i]);
> @@ -783,7 +778,7 @@ int qcow2_update_snapshot_refcount(BlockDriverState *bs,
>               old_l2_offset = l2_offset;
>               l2_offset &= ~QCOW_OFLAG_COPIED;
>               l2_modified = 0;
> -            if (bdrv_pread(bs->file, l2_offset, l2_table, l2_size) != l2_size)
> +            if (blkqueue_pread(&s->bq_context, l2_offset, l2_table, l2_size) != l2_size)
>                   goto fail;
>               for(j = 0; j < s->l2_size; j++) {
>                   offset = be64_to_cpu(l2_table[j]);
> @@ -943,7 +938,7 @@ static int check_refcounts_l2(BlockDriverState *bs, BdrvCheckResult *res,
>       l2_size = s->l2_size * sizeof(uint64_t);
>       l2_table = qemu_malloc(l2_size);
>
> -    if (bdrv_pread(bs->file, l2_offset, l2_table, l2_size) != l2_size)
> +    if (blkqueue_pread(&s->bq_context, l2_offset, l2_table, l2_size) != l2_size)
>           goto fail;
>
>       /* Do the actual checks */
> @@ -1038,8 +1033,7 @@ static int check_refcounts_l1(BlockDriverState *bs,
>           l1_table = NULL;
>       } else {
>           l1_table = qemu_malloc(l1_size2);
> -        if (bdrv_pread(bs->file, l1_table_offset,
> -                       l1_table, l1_size2) != l1_size2)
> +        if (blkqueue_pread(&s->bq_context, l1_table_offset, l1_table, l1_size2) != l1_size2)
>               goto fail;
>           for(i = 0;i < l1_size; i++)
>               be64_to_cpus(&l1_table[i]);
> diff --git a/block/qcow2.c b/block/qcow2.c
> index f2b1b1c..9b1cd78 100644
> --- a/block/qcow2.c
> +++ b/block/qcow2.c
> @@ -237,6 +237,10 @@ static int qcow_open(BlockDriverState *bs, int flags)
>       if (qcow2_read_snapshots(bs) < 0)
>           goto fail;
>
> +    /* Block queue */
> +    s->bq = blkqueue_create(bs->file);
> +    blkqueue_init_context(&s->bq_context, s->bq);
> +
>   #ifdef DEBUG_ALLOC
>       qcow2_check_refcounts(bs);
>   #endif
> @@ -494,6 +498,7 @@ static QCowAIOCB *qcow_aio_setup(BlockDriverState *bs,
>           int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
>           BlockDriverCompletionFunc *cb, void *opaque, int is_write)
>   {
> +    BDRVQcowState *s = bs->opaque;
>       QCowAIOCB *acb;
>
>       acb = qemu_aio_get(&qcow_aio_pool, bs, cb, opaque);
> @@ -514,6 +519,10 @@ static QCowAIOCB *qcow_aio_setup(BlockDriverState *bs,
>       acb->cluster_offset = 0;
>       acb->l2meta.nb_clusters = 0;
>       QLIST_INIT(&acb->l2meta.dependent_requests);
> +
> +    /* TODO Push the context into l2meta */
> +    blkqueue_init_context(&s->bq_context, s->bq);
> +
>       return acb;
>   }
>
> @@ -667,6 +676,7 @@ static void qcow_close(BlockDriverState *bs)
>       qemu_free(s->cluster_cache);
>       qemu_free(s->cluster_data);
>       qcow2_refcount_close(bs);
> +    blkqueue_destroy(s->bq);
>   }
>
>   /*
> @@ -1123,12 +1133,16 @@ static int qcow_write_compressed(BlockDriverState *bs, int64_t sector_num,
>
>   static void qcow_flush(BlockDriverState *bs)
>   {
> +    BDRVQcowState *s = bs->opaque;
> +    blkqueue_flush(s->bq);
>       bdrv_flush(bs->file);
>   }
>
>   static BlockDriverAIOCB *qcow_aio_flush(BlockDriverState *bs,
>            BlockDriverCompletionFunc *cb, void *opaque)
>   {
> +    BDRVQcowState *s = bs->opaque;
> +    blkqueue_flush(s->bq);
>       return bdrv_aio_flush(bs->file, cb, opaque);
>   }
>
> diff --git a/block/qcow2.h b/block/qcow2.h
> index 3ff162e..361f1ba 100644
> --- a/block/qcow2.h
> +++ b/block/qcow2.h
> @@ -26,6 +26,7 @@
>   #define BLOCK_QCOW2_H
>
>   #include "aes.h"
> +#include "block-queue.h"
>
>   //#define DEBUG_ALLOC
>   //#define DEBUG_ALLOC2
> @@ -108,6 +109,9 @@ typedef struct BDRVQcowState {
>       int64_t free_cluster_index;
>       int64_t free_byte_offset;
>
> +    BlockQueue *bq;
> +    BlockQueueContext bq_context;
> +
>       uint32_t crypt_method; /* current crypt method, 0 if no key yet */
>       uint32_t crypt_method_header;
>       AES_KEY aes_encrypt_key;
> diff --git a/qemu-thread.c b/qemu-thread.c
> index fbc78fe..abcb7a6 100644
> --- a/qemu-thread.c
> +++ b/qemu-thread.c
> @@ -167,6 +167,19 @@ void qemu_thread_create(QemuThread *thread,
>       pthread_sigmask(SIG_SETMASK, &oldset, NULL);
>   }
>
> +void *qemu_thread_join(QemuThread *thread)
> +{
> +    int err;
> +    void *ret;
> +
> +    err = pthread_join(thread->thread, &ret);
> +    if (err) {
> +        error_exit(err, __func__);
> +    }
> +
> +    return ret;
> +}
> +
>   void qemu_thread_signal(QemuThread *thread, int sig)
>   {
>       int err;
> diff --git a/qemu-thread.h b/qemu-thread.h
> index 19bb30c..2b6f218 100644
> --- a/qemu-thread.h
> +++ b/qemu-thread.h
> @@ -36,6 +36,7 @@ int qemu_cond_timedwait(QemuCond *cond, QemuMutex *mutex, uint64_t msecs);
>   void qemu_thread_create(QemuThread *thread,
>                          void *(*start_routine)(void*),
>                          void *arg);
> +void *qemu_thread_join(QemuThread *thread);
>   void qemu_thread_signal(QemuThread *thread, int sig);
>   void qemu_thread_self(QemuThread *thread);
>   int qemu_thread_equal(QemuThread *thread1, QemuThread *thread2);
>
>    
