From: Kevin Wolf <kwolf@redhat.com>
To: Nick Thomas <nick@bytemark.co.uk>
Cc: stefanha@gmail.com, qemu-devel@nongnu.org
Subject: [Qemu-devel] Re: [PATCH] NBD: Convert the NBD driver to use the AIO interface.
Date: Wed, 06 Apr 2011 13:30:19 +0200
Message-ID: <4D9C4ECB.8000908@redhat.com>
In-Reply-To: <1298566151-27218-1-git-send-email-nick@bytemark.co.uk>
On 24.02.2011 17:49, Nick Thomas wrote:
> This preserves the previous behaviour where the NBD server is
> unavailable or goes away during guest execution, but switches the
> NBD backend to present the AIO interface instead of the sync IO
> interface.
>
> We also split read & write requests into 1 MiB blocks (minus header).
> This is a hard limit in the NBD servers (including qemu-nbd), but
> never seemed to come up with the previous backend code.
>
> All IO except setup and teardown is asynchronous, and we (in theory)
> handle canceled requests properly too.
>
> Signed-off-by: Nick Thomas <nick@bytemark.co.uk>
Actually, the patch doesn't even apply. So just a quick review...
> block/nbd.c | 652 +++++++++++++++++++++++++++++++++++++++++++++++++++++-----
> nbd.c | 76 +++++--
> nbd.h | 13 +-
> 3 files changed, 662 insertions(+), 79 deletions(-)
>
> diff --git a/block/nbd.c b/block/nbd.c
> index 1d6b225..d6594e4 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -34,7 +34,12 @@
> #include <sys/types.h>
> #include <unistd.h>
>
> -#define EN_OPTSTR ":exportname="
> +#define EN_OPTSTR ":exportname="
> +#define SECTOR_SIZE 512
> +
> +/* 1MiB minus header size */
> +#define NBD_MAX_READ ((1024*1024) - sizeof(struct nbd_reply))
> +#define NBD_MAX_WRITE ((1024*1024) - sizeof(struct nbd_request))
>
> /* #define DEBUG_NBD */
>
> @@ -45,17 +50,148 @@
> #define logout(fmt, ...) ((void)0)
> #endif
>
> -typedef struct BDRVNBDState {
> +/*
> + * Here's how the I/O works.
> + * qemu creates a BDRVNBDState for us, which is the context for all reads
> + * and writes.
> + *
> + * nbd_open is called to connect to the NBD server and set up on-read and
> + * on-write handlers (aio_read_response, aio_write_request).
> + *
> + * nbd_aio_readv/writev, called by qemu, create an NBDAIOCB (representing the
> + * I/O request to qemu).
> + * For read requests, read/writev creates a single AIOReq containing the NBD
> + * header. For write requests, 1 or more AIOReqs are created, containing the
> + * NBD header and the write data. These are pushed to reqs_to_send_head in the
> + * BDRVNBDState and the list in the NBDAIOCB.
> + *
> + * Each time aio_write_request is called by qemu, it gets the first AIOReq
> + * in the reqs_to_send_head and writes the data to the socket.
> + * If this results in the whole AIOReq being written to the socket, it moves
> + * the AIOReq to the reqs_for_reply_head in the BDRVNBDState. If the AIOReq
> + * isn't finished, then it's left where it is. to have more of it written
> + * next callback.
> + *
> + * If there's an unrecoverable error writing to the socket, we disconnect and
> + * return the entire acb as -EIO
> + *
> + * Each aio_read_response, we check the BDRVNBDState's current_req attribute
> + * to see if we're in the middle of a read. If not, we read a header's worth of
> + * data, then try to find an AIOReq in the reqs_for_reply_head.
> + *
> + * If we don't find one, then we're in a weird error state.
> + *
> + * Once we have our AIOReq, we remove it from reqs_for_reply_head and put it
> + * in the current_req attribute, then read from the socket to the buffer (if
> + * needed). If that completes the AIOReq, we clear the current_req attribute
> + * and deallocate the AIOReq.
> + * If the AIOReq is complete, and that's the last one for the NBDAIOCB, we call
> + * the 'done' callback' and return.
> + * If the AIOReq isn't complete, we just return. It'll be completed in future
> + * callbacks, since it's now the current_req
> + *
> + * If there'an unrecoverable error reading from the socket, [...]
Something's missing here? ;-)
> + */
> +
> +typedef struct NBDAIOCB NBDAIOCB;
> +typedef struct BDRVNBDState BDRVNBDState;
> +
> +static int nbd_establish_connection(BDRVNBDState *s);
> +static void nbd_teardown_connection(BDRVNBDState *s);
> +
> +typedef struct AIOReq {
> + NBDAIOCB *aiocb; /* Which QEMU operation this belongs to */
> +
> + /* Where on the NBDAIOCB's iov does this request start? */
> + off_t iov_offset;
> +
> + /* The NBD request header pertaining to this AIOReq.
> + * This specifies the handle of the request, the read offset and length.
> + */
> + NBDRequest nbd_req_hdr;
> +
> + /* How many bytes have been written to the NBD server so far. This will
> + * vary between 0 and sizeof(nbd_req_hdr) + nbd_req_hdr.len
> + */
> + size_t bytes_sent;
> +
> + /* How many bytes have been read from the NBD server so far. Varies between
> + * 0 and sizeof(nbd_rsp_hdr) + nbd_req_hdr.len
> + */
> + size_t bytes_got;
> +
> + /* Used to record this in the state object. waiting_sent is used to work
> + * out which queue the AIOReq is in. Before it's been sent, it's in the
> + * reqs_to_send_head. After being sent, if it's not current_req, it's in
> + * reqs_for_reply_head.
> + */
> + QTAILQ_ENTRY(AIOReq) socket_siblings;
> + bool waiting_sent;
> +
> + /* Used to enter this into an NBDAIOCB */
> + QLIST_ENTRY(AIOReq) aioreq_siblings;
> +} AIOReq;
> +
> +struct BDRVNBDState {
> + /* File descriptor for the socket to the NBD server */
> int sock;
> +
> + /* Size of the file being served */
> off_t size;
> +
> + /* block size */
> size_t blocksize;
> - char *export_name; /* An NBD server may export several devices */
>
> /* If it begins with '/', this is a UNIX domain socket. Otherwise,
> * it's a string of the form <hostname|ip4|\[ip6\]>:port
> */
> char *host_spec;
> -} BDRVNBDState;
> +
> + /* An NBD server may export several devices - this is the one we want*/
> + char *export_name;
> +
> + /* Used to generate unique NBD handles */
> + uint64_t aioreq_seq_num;
> +
> + /* AIOReqs yet to be transmitted */
> + QTAILQ_HEAD(reqs_to_send_head, AIOReq) reqs_to_send_head;
> +
> + /* AIOReqs that have been transmitted and are awaiting a reply */
> + QTAILQ_HEAD(reqs_for_reply_head, AIOReq) reqs_for_reply_head;
> +
> + /* AIOReq that is currently being read from the socket */
> + AIOReq *current_req;
> +
> + /* Used in aio_read_response. We may need to store received header bytes
> + * between reads - we don't have an AIOReq at that point.
> + */
> + uint8_t nbd_rsp_buf[sizeof(NBDReply)];
> + size_t nbd_rsp_offset;
> +
> +};
> +
> +enum AIOCBState {
> + AIOCB_WRITE_UDATA,
> + AIOCB_READ_UDATA,
> +};
> +
> +struct NBDAIOCB {
> + BlockDriverAIOCB common;
> + QEMUIOVector *qiov;
> + QEMUBH *bh;
> +
> + enum AIOCBState aiocb_type;
> +
> + int64_t sector_num;
> + int nb_sectors;
> + int ret;
> +
> + bool canceled;
> +
> + void (*aio_done_func)(NBDAIOCB *);
> +
> + QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
> +};
>
> static int nbd_config(BDRVNBDState *s, const char *filename, int flags)
> {
> @@ -103,9 +239,307 @@ out:
> return err;
> }
>
> -static int nbd_establish_connection(BlockDriverState *bs)
> +static inline AIOReq *nbd_alloc_aio_req(BDRVNBDState *s, NBDAIOCB *acb,
> + size_t data_len,
> + off_t offset,
> + off_t iov_offset)
> +{
> + AIOReq *aio_req;
> +
> + aio_req = qemu_malloc(sizeof(*aio_req));
> + aio_req->aiocb = acb;
> + aio_req->iov_offset = iov_offset;
> + aio_req->nbd_req_hdr.from = offset;
> + aio_req->nbd_req_hdr.len = data_len;
> + aio_req->nbd_req_hdr.handle = s->aioreq_seq_num++;
> +
> + if(acb->aiocb_type == AIOCB_READ_UDATA) {
> + aio_req->nbd_req_hdr.type = NBD_CMD_READ;
> + } else {
> + aio_req->nbd_req_hdr.type = NBD_CMD_WRITE;
> + }
> +
> + aio_req->bytes_sent = 0;
> + aio_req->bytes_got = 0;
> +
> + QTAILQ_INSERT_TAIL(&s->reqs_to_send_head, aio_req, socket_siblings);
> + aio_req->waiting_sent = true;
> + QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
> + return aio_req;
> +}
> +
> +static int nbd_aio_flush_request(void *opaque)
> +{
> + BDRVNBDState *s = opaque;
> +
> + return !(QTAILQ_EMPTY(&s->reqs_to_send_head) &&
> + QTAILQ_EMPTY(&s->reqs_for_reply_head) &&
> + (s->current_req == NULL));
> +}
> +
> +static inline int free_aio_req(BDRVNBDState *s, AIOReq *aio_req)
> +{
> + NBDAIOCB *acb = aio_req->aiocb;
> + QLIST_REMOVE(aio_req, aioreq_siblings);
> + qemu_free(aio_req);
> +
> + return !QLIST_EMPTY(&acb->aioreq_head);
> +}
> +
> +static void nbd_finish_aiocb(NBDAIOCB *acb)
> +{
> + acb->common.cb(acb->common.opaque, acb->ret);
> + qemu_aio_release(acb);
> +}
> +
> +static void nbd_handle_io_err(BDRVNBDState *s, AIOReq *aio_req, int err)
> +{
> + NBDAIOCB *acb;
> + AIOReq *a;
> +
> + /* These are fine - no need to do anything */
> + if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
> + return;
> + }
> +
> + /* These errors mean the request failed. So we need to trash the acb
> + * (and all associated AIOReqs) and return -EIO. Partial reads are
> + * fine. Partial writes aren't great, but no worse than (say) a write
> + * to a physical disc that hits a bad sector.
> + */
> + if (aio_req == NULL) {
> + logout("Error %i (%s) on NBD I/O. Killing NBD\n", err, strerror(err));
> + } else {
> + acb = aio_req->aiocb;
> + logout("Error %i (%s) on NBD request (handle %lu). Killing NBD\n", err,
> + sterror(err), aio_req->nbd_req_hdr.handle);
> + acb->ret = -EIO;
Why don't we use the real error code?
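Something along these lines might do, as a minimal sketch. It assumes the handler receives a positive errno value in err, which is what the call sites elsewhere in the patch appear to pass (socket_error() or -ret). The demo_* names are hypothetical stand-ins, not the QEMU types. Note that the sketch tests the err argument, whereas the quoted code tests the global errno.

```c
#include <errno.h>

/* Hypothetical stand-in for the 'ret' field of NBDAIOCB. */
struct demo_acb {
    int ret;
};

/* Transient conditions that should simply be retried on the next callback. */
static int demo_is_transient(int err)
{
    return err == EAGAIN || err == EWOULDBLOCK || err == EINTR;
}

/*
 * Record a failure on the acb. 'err' is a positive errno value; it is
 * stored negated, so the completion callback would see e.g. -ECONNRESET
 * rather than a blanket -EIO. Returns 1 if the error was fatal, 0 if it
 * was transient and nothing was recorded.
 */
static int demo_record_error(struct demo_acb *acb, int err)
{
    if (demo_is_transient(err)) {
        return 0;
    }
    acb->ret = -err;
    return 1;
}
```

With this shape the guest-visible error depends on what actually went wrong on the socket, which is usually more useful for debugging than a uniform -EIO.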
> + QLIST_FOREACH(a, &acb->aioreq_head, aioreq_siblings) {
> + if (aio_req->waiting_sent) {
> + QTAILQ_REMOVE(&s->reqs_to_send_head, aio_req, socket_siblings);
> + } else {
> + if (s->current_req == aio_req) {
> + s->current_req = NULL;
> + } else {
> + QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req,
> + socket_siblings);
> + }
> + }
I think this logic should be part of free_aio_req.
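A rough sketch of that idea follows. It uses the BSD <sys/queue.h> macros rather than QEMU's QTAILQ/QLIST so that it stands alone, and every demo_* name is a hypothetical, simplified stand-in for the patch's types. Two things it does differently from the quoted loop, as far as I can tell: the queue checks are made on the request being freed (the quoted loop tests aio_req instead of the iterator a), and the list is drained by re-reading its head, because freeing an element inside a plain FOREACH leaves the iterator pointing at freed memory.

```c
#include <stdbool.h>
#include <stdlib.h>
#include <sys/queue.h>

/* Hypothetical, simplified stand-in for AIOReq. */
struct demo_req {
    bool waiting_sent;                      /* still in the to-send queue? */
    TAILQ_ENTRY(demo_req) socket_siblings;  /* link in a socket queue */
    LIST_ENTRY(demo_req) aioreq_siblings;   /* link in the owning acb */
};

TAILQ_HEAD(demo_reqq, demo_req);
LIST_HEAD(demo_reql, demo_req);

/* Hypothetical stand-in for the queue fields of BDRVNBDState. */
struct demo_state {
    struct demo_reqq to_send;
    struct demo_reqq for_reply;
    struct demo_req *current_req;
};

/*
 * Unlink 'req' from whichever socket queue (or current_req slot) holds it,
 * unlink it from its acb list, and free it. Returns nonzero if the acb
 * still has requests outstanding.
 */
static int demo_free_req(struct demo_state *s, struct demo_reql *acb_reqs,
                         struct demo_req *req)
{
    if (req->waiting_sent) {
        TAILQ_REMOVE(&s->to_send, req, socket_siblings);
    } else if (s->current_req == req) {
        s->current_req = NULL;
    } else {
        TAILQ_REMOVE(&s->for_reply, req, socket_siblings);
    }
    LIST_REMOVE(req, aioreq_siblings);
    free(req);
    return !LIST_EMPTY(acb_reqs);
}

/* Drop every request of an acb; the head is re-read on each iteration
 * because the element is freed inside the loop body. */
static void demo_drop_all(struct demo_state *s, struct demo_reql *acb_reqs)
{
    while (!LIST_EMPTY(acb_reqs)) {
        demo_free_req(s, acb_reqs, LIST_FIRST(acb_reqs));
    }
}
```

The error path and the cancel path could then both call the one helper instead of open-coding the queue bookkeeping.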
> + free_aio_req(s, a);
> + }
> + nbd_finish_aiocb(acb);
> + }
> +
> + nbd_teardown_connection(s);
And now there's no way to get the disk back to life without a reboot? Do
I understand correctly that now trying to access the disk will always
return -EBADF?
> + return;
Unnecessary return;
> +}
> +
> +static void nbd_aio_write_request(void *opaque)
> +{
> + BDRVNBDState *s = opaque;
> + AIOReq *aio_req = NULL;
> + NBDAIOCB *acb;
> + size_t total;
> + ssize_t ret;
> +
> + if (QTAILQ_EMPTY(&s->reqs_to_send_head)) {
> + return;
> + }
> +
> + aio_req = QTAILQ_FIRST(&s->reqs_to_send_head);
> + acb = aio_req->aiocb;
> +
> + if(acb->aiocb_type == AIOCB_WRITE_UDATA) {
Space after if is missing
> + total = sizeof(NBDRequest) + aio_req->nbd_req_hdr.len;
> + } else {
> + total = sizeof(NBDRequest);
> + }
> +
> + /* Since we've not written (all of) the header yet, get on with it.
> + * We always grab the *head* of the queue in this callback, so we
> + * won't interleave writes to the socket.
> + *
> + * Creating the header buffer on the fly isn't ideal in the case of many
> + * retries, but almost all the time, this will happen exactly once.
> + */
> + if (aio_req->bytes_sent < sizeof(NBDRequest)) {
> + logout("Buffer not written in full, doing so\n");
> + uint8_t buf[sizeof(NBDRequest)];
> + QEMUIOVector hdr;
> + nbd_request_to_buf(&aio_req->nbd_req_hdr, buf);
> + qemu_iovec_init(&hdr, 1);
> + qemu_iovec_add(&hdr, &buf, sizeof(NBDRequest));
> + ret = writev(s->sock, hdr.iov, hdr.niov);
> + qemu_iovec_destroy(&hdr);
> +
> + if (ret == -1) {
> + nbd_handle_io_err(s, aio_req, socket_error());
> + return;
> + } else {
> + logout("Written %zu bytes to socket (request is %zu bytes)\n", ret,
> + sizeof(NBDRequest));
> + aio_req->bytes_sent += ret;
> + }
> + }
> +
> + /* If the header is sent & we're doing a write request, send data */
> + if (acb->aiocb_type == AIOCB_WRITE_UDATA &&
> + aio_req->bytes_sent >= sizeof(NBDRequest) &&
> + aio_req->bytes_sent < total) {
> + logout("Write request - putting data in socket\n");
> + off_t offset = (aio_req->bytes_sent - sizeof(NBDRequest)) +
> + aio_req->iov_offset;
> +
> + ret = nbd_wr_aio(s->sock, acb->qiov, total - aio_req->bytes_sent,
> + offset, false);
> +
> + if (ret < 0 ) {
I guess this is the space that was missing above. :-)
> + nbd_handle_io_err(s, aio_req, -ret);
> + return;
> + } else {
> + logout("Written %zu bytes to socket\n", ret);
> + aio_req->bytes_sent += ret;
> + }
> + }
> +
> + /* Request written. aio_read_response gets the reply */
> + if (aio_req->bytes_sent == total) {
> + logout("aio_req written to socket, moving to reqs_for_reply\n");
> + aio_req->waiting_sent = false;
> + QTAILQ_REMOVE(&s->reqs_to_send_head, aio_req, socket_siblings);
> + QTAILQ_INSERT_TAIL(&s->reqs_for_reply_head, aio_req, socket_siblings);
> + }
> +
> + return;
> +}
> +
> +static void nbd_aio_read_response(void *opaque)
> +{
> + BDRVNBDState *s = opaque;
> + AIOReq *aio_req = NULL;
> + NBDAIOCB *acb;
> + NBDReply rsp;
> +
> + size_t total;
> + ssize_t ret;
> + int rest;
> +
> + if (s->current_req == NULL && QTAILQ_EMPTY(&s->reqs_for_reply_head)) {
> + return;
> + }
> +
> + /* Build our nbd_reply object if we've got it */
> + if (s->current_req && (s->nbd_rsp_offset == sizeof(NBDReply))) {
> + nbd_buf_to_reply((uint8_t *)&s->nbd_rsp_buf, &rsp);
> + }
> +
> + if (s->current_req == NULL) {
> + /* Try to read a header */
Factor this whole block out in its own function?
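For illustration, the header-reading step could be pulled out roughly like this. This is only a sketch: demo_hdr_state and demo_read_header are hypothetical names, and the 16-byte size is taken from the NBD_REPLY_SIZE (4 + 4 + 8) definition in nbd.c. The sketch indexes the byte buffer directly (h->buf + h->offset); the quoted (&s->nbd_rsp_buf) + s->nbd_rsp_offset looks like it advances in units of the whole array rather than in bytes.

```c
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <unistd.h>

#define DEMO_HDR_SIZE 16   /* 4-byte magic + 4-byte error + 8-byte handle */

/* Hypothetical stand-in for the nbd_rsp_buf / nbd_rsp_offset pair. */
struct demo_hdr_state {
    uint8_t buf[DEMO_HDR_SIZE];
    size_t offset;             /* header bytes buffered so far */
};

/*
 * Read as much of the reply header as the fd will currently give us.
 * Returns 1 once the full header is buffered, 0 if more bytes are still
 * needed (including EAGAIN/EINTR), a negative errno value on a fatal
 * error, or -EIO if the peer closed the connection mid-header.
 */
static int demo_read_header(int fd, struct demo_hdr_state *h)
{
    ssize_t ret;

    if (h->offset == DEMO_HDR_SIZE) {
        return 1;
    }

    ret = read(fd, h->buf + h->offset, DEMO_HDR_SIZE - h->offset);
    if (ret < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            return 0;
        }
        return -errno;
    }
    if (ret == 0) {
        return -EIO;
    }

    h->offset += (size_t)ret;
    return h->offset == DEMO_HDR_SIZE;
}
```

The read callback would then only decode the header and look up the matching request once the helper returns 1, and reset offset to 0 when the reply has been fully consumed.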
> + QEMUIOVector hdr;
> + qemu_iovec_init(&hdr, 1);
> + qemu_iovec_add(&hdr, ((&s->nbd_rsp_buf) + s->nbd_rsp_offset),
> + (sizeof(NBDReply) - s->nbd_rsp_offset));
> + ret = readv(s->sock, hdr.iov, hdr.niov);
> + qemu_iovec_destroy(&hdr);
> +
> + if (ret == -1) {
> + nbd_handle_io_err(s, aio_req, socket_error());
> + return;
> + }
> +
> + s->nbd_rsp_offset += ret;
> +
> + if (s->nbd_rsp_offset == sizeof(NBDReply)) {
> + /* Turn data into NBDReply, find the matching aio_req */
> + nbd_buf_to_reply((uint8_t *)&s->nbd_rsp_buf, &rsp);
> +
> + /* Check the magic */
> + if (rsp.magic != NBD_REPLY_MAGIC) {
> + logout("Received invalid NBD response magic!\n");
> + nbd_handle_io_err(s, NULL, EIO);
> + return;
> + }
> +
> + QTAILQ_FOREACH(aio_req, &s->reqs_for_reply_head, socket_siblings) {
> + if (aio_req->nbd_req_hdr.handle == rsp.handle) {
> + s->current_req = aio_req;
> + s->current_req->bytes_got = sizeof(NBDReply);
> + break;
> + }
> + }
> +
> + if (!s->current_req) {
> + logout("cannot find aio_req for handle %lu\n", rsp.handle);
> + nbd_handle_io_err(s, NULL, EIO);
> + return;
> + }
> + QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req, socket_siblings);
> + } else {
> + /* We haven't finished reading the entire header yet. */
> + return;
> + }
> + }
> +
> + /* s->current_req and rsp are both usable now */
> + aio_req = s->current_req;
> + acb = aio_req->aiocb;
> +
> + total = sizeof(NBDReply);
> + if (acb->aiocb_type == AIOCB_READ_UDATA) {
> + total += aio_req->nbd_req_hdr.len;
> + }
> +
> + if (acb->aiocb_type == AIOCB_READ_UDATA && aio_req->bytes_got < total) {
> + off_t offset = (aio_req->bytes_got - sizeof(NBDReply)) +
> + aio_req->iov_offset;
> + QEMUIOVector *qiov = acb->qiov;
> + uint8_t *buf = NULL;
> +
> + if (acb->canceled) {
> + buf = qemu_malloc(total - offset);
> + qemu_iovec_init(qiov, 1);
> + qemu_iovec_add(qiov, buf, total - offset);
> + }
> +
> + ret = nbd_wr_aio(s->sock, qiov, total - offset, offset, true);
> +
> + if (acb->canceled) {
> + qemu_iovec_destroy(qiov);
> + qemu_free(buf);
> + }
> +
> + if (ret < 0) {
> + nbd_handle_io_err(s, aio_req, -ret);
> + return;
> + } else {
> + aio_req->bytes_got += ret;
> + }
> + }
> +
> + /* Entire request has been read */
> + if (total == aio_req->bytes_got) {
> + logout("Read all bytes of the response; removing response from s->current_req\n");
> + s->nbd_rsp_offset = 0;
> + s->current_req = NULL;
> + if (rsp.error != 0) {
> + acb->ret = -EIO;
> + logout("NBD request resulted in error %i\n", rsp.error);
> + }
> +
> + /* Free the aio_req. If the NBDAIOCB is finished, notify QEMU */
> + rest = free_aio_req(s, aio_req);
> + if (!rest) {
> + logout("acb complete\n");
> + acb->aio_done_func(acb);
> + }
> + }
> +
> + logout("Leaving nbd_aio_read_response\n");
> + return;
Another useless return.
> +}
> +
> +static int nbd_establish_connection(BDRVNBDState *s)
> {
> - BDRVNBDState *s = bs->opaque;
> int sock;
> int ret;
> off_t size;
> @@ -139,23 +573,29 @@ static int nbd_establish_connection(BlockDriverState *bs)
> s->sock = sock;
> s->size = size;
> s->blocksize = blocksize;
> + s->nbd_rsp_offset = 0;
> +
> + qemu_aio_set_fd_handler(sock, nbd_aio_read_response, nbd_aio_write_request,
> + nbd_aio_flush_request, NULL, s);
>
> logout("Established connection with NBD server\n");
> return 0;
> }
>
> -static void nbd_teardown_connection(BlockDriverState *bs)
> +static void nbd_teardown_connection(BDRVNBDState *s)
> {
> - BDRVNBDState *s = bs->opaque;
> struct nbd_request request;
>
> request.type = NBD_CMD_DISC;
> - request.handle = (uint64_t)(intptr_t)bs;
> + request.handle = s->aioreq_seq_num++;
> request.from = 0;
> request.len = 0;
> nbd_send_request(s->sock, &request);
>
> + qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL);
> closesocket(s->sock);
> + logout("Connection to NBD server closed\n");
> + return;
> }
>
> static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
> @@ -169,95 +609,193 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
> return result;
> }
>
> + QTAILQ_INIT(&s->reqs_to_send_head);
> + QTAILQ_INIT(&s->reqs_for_reply_head);
> +
> + s->current_req = NULL;
> + s->aioreq_seq_num = 0;
> + s->nbd_rsp_offset = 0;
> +
> /* establish TCP connection, return error if it fails
> * TODO: Configurable retry-until-timeout behaviour.
> */
> - result = nbd_establish_connection(bs);
> + result = nbd_establish_connection(s);
>
> return result;
> }
>
> -static int nbd_read(BlockDriverState *bs, int64_t sector_num,
> - uint8_t *buf, int nb_sectors)
> +static void nbd_close(BlockDriverState *bs)
> {
> BDRVNBDState *s = bs->opaque;
> - struct nbd_request request;
> - struct nbd_reply reply;
>
> - request.type = NBD_CMD_READ;
> - request.handle = (uint64_t)(intptr_t)bs;
> - request.from = sector_num * 512;;
> - request.len = nb_sectors * 512;
> + nbd_teardown_connection(s);
> + qemu_free(s->export_name);
> + qemu_free(s->host_spec);
>
> - if (nbd_send_request(s->sock, &request) == -1)
> - return -errno;
> + return;
> +}
>
> - if (nbd_receive_reply(s->sock, &reply) == -1)
> - return -errno;
> +/* We remove all the aiocbs currently sat in reqs_to_send_head (excepting the
> + * first, if any bytes have been transmitted). So we don't need to check for
> + * canceled in aio_write_request at all.If that finishes the acb, we call its
> + * completion function. Otherwise, we leave it alone.
> + *
> + * in nbd_aio_read_response, when we're handling a read request for an acb with
> + * canceled = true, we allocate a QEMUIOVector of the appropriate size to do
> + * the read, and throw the bytes away. Everything else goes on as normal.
> + */
> +static void nbd_aio_cancel(BlockDriverAIOCB *blockacb) {
> + NBDAIOCB *acb = (NBDAIOCB *)blockacb;
> + BDRVNBDState *s = acb->common.bs->opaque;
> + AIOReq *a;
> +
> + QLIST_FOREACH(a, &acb->aioreq_head, aioreq_siblings) {
> + if(a->waiting_sent && a->bytes_sent == 0) {
> + QTAILQ_REMOVE(&s->reqs_to_send_head, a, socket_siblings);
> + free_aio_req(s, a);
> + }
> + }
> +
> + acb->canceled = true;
> + acb->ret = -EIO;
> +
> + if QLIST_EMPTY(&acb->aioreq_head) {
> + nbd_finish_aiocb(acb);
> + }
> +}
> +
> +static AIOPool nbd_aio_pool = {
> + .aiocb_size = sizeof(NBDAIOCB),
> + .cancel = nbd_aio_cancel,
> +};
> +
> +static NBDAIOCB *nbd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
> + int64_t sector_num, int nb_sectors,
> + BlockDriverCompletionFunc *cb, void *opaque)
> +{
> + NBDAIOCB *acb;
>
> - if (reply.error !=0)
> - return -reply.error;
> + acb = qemu_aio_get(&nbd_aio_pool, bs, cb, opaque);
>
> - if (reply.handle != request.handle)
> + acb->qiov = qiov;
> + acb->sector_num = sector_num;
> + acb->nb_sectors = nb_sectors;
> +
> + acb->canceled = false;
> +
> + acb->aio_done_func = NULL;
> + acb->bh = NULL;
> + acb->ret = 0;
> +
> + QLIST_INIT(&acb->aioreq_head);
> + return acb;
> +}
> +
> +static int nbd_schedule_bh(QEMUBHFunc *cb, NBDAIOCB *acb)
> +{
> + if (acb->bh) {
> + logout("bug: %d %d\n", acb->aiocb_type, acb->aiocb_type);
> return -EIO;
> + }
>
> - if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
> + acb->bh = qemu_bh_new(cb, acb);
> + if (!acb->bh) {
> + logout("oom: %d %d\n", acb->aiocb_type, acb->aiocb_type);
> return -EIO;
> + }
> +
> + qemu_bh_schedule(acb->bh);
>
> return 0;
> }
>
> -static int nbd_write(BlockDriverState *bs, int64_t sector_num,
> - const uint8_t *buf, int nb_sectors)
> +static void nbd_readv_writev_bh_cb(void *p)
> {
> - BDRVNBDState *s = bs->opaque;
> - struct nbd_request request;
> - struct nbd_reply reply;
> + NBDAIOCB *acb = p;
>
> - request.type = NBD_CMD_WRITE;
> - request.handle = (uint64_t)(intptr_t)bs;
> - request.from = sector_num * 512;;
> - request.len = nb_sectors * 512;
> + size_t len, done = 0;
> + size_t total = acb->nb_sectors * SECTOR_SIZE;
>
> - if (nbd_send_request(s->sock, &request) == -1)
> - return -errno;
> + /* Where the read/write starts from */
> + off_t offset = acb->sector_num * SECTOR_SIZE;
> + BDRVNBDState *s = acb->common.bs->opaque;
>
> - if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
> - return -EIO;
> + AIOReq *aio_req;
> +
> + logout("Entering nbd_readv_writev_bh_cb\n");
>
> - if (nbd_receive_reply(s->sock, &reply) == -1)
> - return -errno;
> + qemu_bh_delete(acb->bh);
> + acb->bh = NULL;
>
> - if (reply.error !=0)
> - return -reply.error;
> + while (done < total) {
> + len = (total - done);
>
> - if (reply.handle != request.handle)
> - return -EIO;
> + /* Split read & write requests into segments if needed */
> + if (acb->aiocb_type == AIOCB_READ_UDATA && len > NBD_MAX_READ) {
> + len = NBD_MAX_READ;
> + }
>
> - return 0;
> + if (acb->aiocb_type == AIOCB_WRITE_UDATA && len > NBD_MAX_WRITE) {
> + len = NBD_MAX_WRITE;
> + }
> +
> + logout("Allocating an aio_req of %zu bytes\n", len);
> + aio_req = nbd_alloc_aio_req(s, acb, len, offset + done, done);
> +
> + done += len;
> + }
> +
> + if (QLIST_EMPTY(&acb->aioreq_head)) {
> + logout("acb->ioreq_head empty, so finishing acb now\n");
> + nbd_finish_aiocb(acb);
> + }
> + logout("Leaving nbd_readv_writev_bh_cb\n");
> + return;
> }
>
> -static void nbd_close(BlockDriverState *bs)
> +static BlockDriverAIOCB *nbd_aio_readv(BlockDriverState *bs,
> + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> + BlockDriverCompletionFunc *cb, void *opaque)
> +{
> + NBDAIOCB *acb;
> +
> + acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
> + acb->aiocb_type = AIOCB_READ_UDATA;
> + acb->aio_done_func = nbd_finish_aiocb;
> +
> + nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
> + return &acb->common;
> +}
> +
> +static BlockDriverAIOCB *nbd_aio_writev(BlockDriverState *bs,
> + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
> + BlockDriverCompletionFunc *cb, void *opaque)
> {
> - nbd_teardown_connection(bs);
> + NBDAIOCB *acb;
> +
> + acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
> + acb->aiocb_type = AIOCB_WRITE_UDATA;
> + acb->aio_done_func = nbd_finish_aiocb;
> +
> + nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
> + return &acb->common;
> }
>
> static int64_t nbd_getlength(BlockDriverState *bs)
> {
> BDRVNBDState *s = bs->opaque;
> -
> return s->size;
> }
>
> static BlockDriver bdrv_nbd = {
> - .format_name = "nbd",
> - .instance_size = sizeof(BDRVNBDState),
> - .bdrv_file_open = nbd_open,
> - .bdrv_read = nbd_read,
> - .bdrv_write = nbd_write,
> - .bdrv_close = nbd_close,
> - .bdrv_getlength = nbd_getlength,
> - .protocol_name = "nbd",
> + .format_name = "nbd",
> + .instance_size = sizeof(BDRVNBDState),
> + .bdrv_file_open = nbd_open,
> + .bdrv_aio_readv = nbd_aio_readv,
> + .bdrv_aio_writev = nbd_aio_writev,
> + .bdrv_close = nbd_close,
> + .bdrv_getlength = nbd_getlength,
> + .protocol_name = "nbd"
> };
>
> static void bdrv_nbd_init(void)
> diff --git a/nbd.c b/nbd.c
> index a3e6d52..26d33c5 100644
> --- a/nbd.c
> +++ b/nbd.c
> @@ -31,7 +31,7 @@
>
> #include "qemu_socket.h"
>
> -//#define DEBUG_NBD
> +#define DEBUG_NBD
>
> #ifdef DEBUG_NBD
> #define TRACE(msg, ...) do { \
> @@ -49,10 +49,6 @@
>
> /* This is all part of the "official" NBD API */
>
> -#define NBD_REPLY_SIZE (4 + 4 + 8)
> -#define NBD_REQUEST_MAGIC 0x25609513
> -#define NBD_REPLY_MAGIC 0x67446698
> -
> #define NBD_SET_SOCK _IO(0xab, 0)
> #define NBD_SET_BLKSIZE _IO(0xab, 1)
> #define NBD_SET_SIZE _IO(0xab, 2)
> @@ -107,6 +103,30 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
> return offset;
> }
>
> +ssize_t nbd_wr_aio(int fd, QEMUIOVector *qiov, size_t len, off_t offset,
> + bool do_read)
Isn't this name misleading? The function is completely synchronous. It
just happens not to block because it's only called when the socket is ready.
Kevin