From: Chrysostomos Nanakos <cnanakos@grnet.gr>
To: Jeff Cody <jcody@redhat.com>
Cc: kwolf@redhat.com, qemu-devel@nongnu.org, stefanha@redhat.com
Subject: Re: [Qemu-devel] [PATCH v6 1/5] block: Support Archipelago as a QEMU block backend
Date: Thu, 10 Jul 2014 17:02:23 +0300
Message-ID: <53BE9CEF.2060106@grnet.gr>
In-Reply-To: <53BE6546.5030704@grnet.gr>
On 07/10/2014 01:04 PM, Chrysostomos Nanakos wrote:
> On 07/10/2014 03:23 AM, Jeff Cody wrote:
>> On Fri, Jun 27, 2014 at 11:24:08AM +0300, Chrysostomos Nanakos wrote:
>>> VM Image on Archipelago volume is specified like this:
>>>
>>> file.driver=archipelago,file.volume=<volumename>[,file.mport=<mapperd_port>[,
>>> file.vport=<vlmcd_port>][,file.segment=<segment_name>]]
>>>
>>> 'archipelago' is the protocol.
>>>
>>> 'mport' is the port number on which mapperd is listening. This is optional
>>> and if not specified, QEMU will make Archipelago to use the default port.
>>>
>>> 'vport' is the port number on which vlmcd is listening. This is optional
>>> and if not specified, QEMU will make Archipelago to use the default port.
>>>
>>> 'segment' is the name of the shared memory segment Archipelago stack is using.
>>> This is optional and if not specified, QEMU will make Archipelago to use the
>>> default value, 'archipelago'.
>>>
>>> Examples:
>>>
>>> file.driver=archipelago,file.volume=my_vm_volume
>>> file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
>>> file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
>>> file.vport=1234
>>> file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
>>> file.vport=1234,file.segment=my_segment
>>>
>>> Signed-off-by: Chrysostomos Nanakos <cnanakos@grnet.gr>
>> This is just a superficial review, because I don't have a good idea of
>> what archipelago or libxseg really does (I didn't even compile it or
>> these patches). But I scanned through this patch, and found a few
>> things, and had a few questions.
>
> No worries, every review is more than welcome.
>
>>
>>> ---
>>> MAINTAINERS | 6 +
>>> block/Makefile.objs | 2 +
>>> block/archipelago.c | 819 +++++++++++++++++++++++++++++++++++++++++++++++++++
>>> configure | 40 +++
>>> 4 files changed, 867 insertions(+)
>>> create mode 100644 block/archipelago.c
>>>
>>> diff --git a/MAINTAINERS b/MAINTAINERS
>>> index 9b93edd..58ef1e3 100644
>>> --- a/MAINTAINERS
>>> +++ b/MAINTAINERS
>>> @@ -999,3 +999,9 @@ SSH
>>> M: Richard W.M. Jones <rjones@redhat.com>
>>> S: Supported
>>> F: block/ssh.c
>>> +
>>> +ARCHIPELAGO
>>> +M: Chrysostomos Nanakos <cnanakos@grnet.gr>
>>> +M: Chrysostomos Nanakos <chris@include.gr>
>>> +S: Maintained
>>> +F: block/archipelago.c
>>> diff --git a/block/Makefile.objs b/block/Makefile.objs
>>> index fd88c03..858d2b3 100644
>>> --- a/block/Makefile.objs
>>> +++ b/block/Makefile.objs
>>> @@ -17,6 +17,7 @@ block-obj-$(CONFIG_LIBNFS) += nfs.o
>>> block-obj-$(CONFIG_CURL) += curl.o
>>> block-obj-$(CONFIG_RBD) += rbd.o
>>> block-obj-$(CONFIG_GLUSTERFS) += gluster.o
>>> +block-obj-$(CONFIG_ARCHIPELAGO) += archipelago.o
>>> block-obj-$(CONFIG_LIBSSH2) += ssh.o
>>> endif
>>> @@ -35,5 +36,6 @@ gluster.o-cflags := $(GLUSTERFS_CFLAGS)
>>> gluster.o-libs := $(GLUSTERFS_LIBS)
>>> ssh.o-cflags := $(LIBSSH2_CFLAGS)
>>> ssh.o-libs := $(LIBSSH2_LIBS)
>>> +archipelago.o-libs := $(ARCHIPELAGO_LIBS)
>>> qcow.o-libs := -lz
>>> linux-aio.o-libs := -laio
>>> diff --git a/block/archipelago.c b/block/archipelago.c
>>> new file mode 100644
>>> index 0000000..c56826a
>>> --- /dev/null
>>> +++ b/block/archipelago.c
>>> @@ -0,0 +1,819 @@
>>> +/*
>>> + * QEMU Block driver for Archipelago
>>> + *
>>> + * Copyright 2014 GRNET S.A. All rights reserved.
>>> + *
>>> + * Redistribution and use in source and binary forms, with or
>>> + * without modification, are permitted provided that the following
>>> + * conditions are met:
>>> + *
>>> + * 1. Redistributions of source code must retain the above
>>> + * copyright notice, this list of conditions and the following
>>> + * disclaimer.
>>> + * 2. Redistributions in binary form must reproduce the above
>>> + * copyright notice, this list of conditions and the following
>>> + * disclaimer in the documentation and/or other materials
>>> + * provided with the distribution.
>>> + *
>>> + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
>>> + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
>>> + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
>>> + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
>>> + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
>>> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
>>> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
>>> + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
>>> + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
>>> + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
>>> + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
>>> + * POSSIBILITY OF SUCH DAMAGE.
>>> + *
>>> + * The views and conclusions contained in the software and
>>> + * documentation are those of the authors and should not be
>>> + * interpreted as representing official policies, either expressed
>>> + * or implied, of GRNET S.A.
>>> + */
>>> +
>>> +/*
>>> +* VM Image on Archipelago volume is specified like this:
>>> +*
>>> +* file.driver=archipelago,file.volume=<volumename>[,file.mport=<mapperd_port>[,
>>> +* file.vport=<vlmcd_port>][,file.segment=<segment_name>]]
>>> +*
>>> +* 'archipelago' is the protocol.
>>> +*
>>> +* 'mport' is the port number on which mapperd is listening. This is optional
>>> +* and if not specified, QEMU will make Archipelago to use the default port.
>>> +*
>>> +* 'vport' is the port number on which vlmcd is listening. This is optional
>>> +* and if not specified, QEMU will make Archipelago to use the default port.
>>> +*
>>> +* 'segment' is the name of the shared memory segment Archipelago stack is using.
>>> +* This is optional and if not specified, QEMU will make Archipelago to use the
>>> +* default value, 'archipelago'.
>>> +*
>>> +* Examples:
>>> +*
>>> +* file.driver=archipelago,file.volume=my_vm_volume
>>> +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
>>> +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
>>> +* file.vport=1234
>>> +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
>>> +* file.vport=1234,file.segment=my_segment
>>> +*/
>>> +
>>> +#include "block/block_int.h"
>>> +#include "qemu/error-report.h"
>>> +#include "qemu/thread.h"
>>> +#include "qapi/qmp/qint.h"
>>> +#include "qapi/qmp/qstring.h"
>>> +#include "qapi/qmp/qjson.h"
>>> +
>>> +#include <inttypes.h>
>>> +#include <xseg/xseg.h>
>>> +#include <xseg/protocol.h>
>>> +
>>> +#define ARCHIP_FD_READ 0
>>> +#define ARCHIP_FD_WRITE 1
>>> +#define MAX_REQUEST_SIZE 524288
>>> +
>>> +#define ARCHIPELAGO_OPT_VOLUME "volume"
>>> +#define ARCHIPELAGO_OPT_SEGMENT "segment"
>>> +#define ARCHIPELAGO_OPT_MPORT "mport"
>>> +#define ARCHIPELAGO_OPT_VPORT "vport"
>>> +
>>> +#define archipelagolog(fmt, ...) \
>>> + do { \
>>> + fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__); \
>>> + } while (0)
>>> +
>>> +typedef enum {
>>> + ARCHIP_OP_READ,
>>> + ARCHIP_OP_WRITE,
>>> + ARCHIP_OP_FLUSH,
>>> + ARCHIP_OP_VOLINFO,
>>> +} ARCHIPCmd;
>>> +
>>> +typedef struct ArchipelagoAIOCB {
>>> + BlockDriverAIOCB common;
>>> + QEMUBH *bh;
>>> + struct BDRVArchipelagoState *s;
>>> + QEMUIOVector *qiov;
>>> + void *buffer;
>>> + ARCHIPCmd cmd;
>>> + bool cancelled;
>>> + int status;
>>> + int64_t size;
>>> + int64_t ret;
>>> +} ArchipelagoAIOCB;
>>> +
>>> +typedef struct BDRVArchipelagoState {
>>> + ArchipelagoAIOCB *event_acb;
>>> + char *volname;
>>> + char *segment_name;
>>> + uint64_t size;
>>> + /* Archipelago specific */
>>> + struct xseg *xseg;
>> I assume s->xseg is allocated in xseg_join() - is it ever freed? In
>> _close(), there is a final call to xseg_leave(s->xseg), but from what
>> I found in libxseg, it does not appear to be freed:
>> https://github.com/cnanakos/libxseg/blob/develop/src/xseg.c#L975
>>
>> Is it up to libxseg to free xseg, or the caller?
>
> libxseg allocates xseg, so libxseg should free it as well. I will fix
> that in libxseg. Thanks!
>
>>> + struct xseg_port *port;
>>> + xport srcport;
>>> + xport sport;
>>> + xport mportno;
>>> + xport vportno;
>>> + QemuMutex archip_mutex;
>>> + QemuCond archip_cond;
>>> + bool is_signaled;
>>> + /* Request handler specific */
>>> + QemuThread request_th;
>>> + QemuCond request_cond;
>>> + QemuMutex request_mutex;
>>> + bool th_is_signaled;
>>> + bool stopping;
>>> +} BDRVArchipelagoState;
>>> +
>>> +typedef struct ArchipelagoSegmentedRequest {
>>> + size_t count;
>>> + size_t total;
>>> + int ref;
>>> + int failed;
>>> +} ArchipelagoSegmentedRequest;
>>> +
>>> +typedef struct AIORequestData {
>>> + const char *volname;
>>> + off_t offset;
>>> + size_t size;
>>> + uint64_t bufidx;
>>> + int ret;
>>> + int op;
>>> + ArchipelagoAIOCB *aio_cb;
>>> + ArchipelagoSegmentedRequest *segreq;
>>> +} AIORequestData;
>>> +
>>> +static void qemu_archipelago_complete_aio(void *opaque);
>>> +
>>> +static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
>>> +{
>>> + if (xseg && (sport != srcport)) {
>>> + xseg_init_local_signal(xseg, srcport);
>>> + sport = srcport;
>>> + }
>>> +}
>>> +
>>> +static void archipelago_finish_aiocb(AIORequestData *reqdata)
>>> +{
>>> + if (reqdata->aio_cb->ret != reqdata->segreq->total) {
>>> + reqdata->aio_cb->ret = -EIO;
>>> + } else if (reqdata->aio_cb->ret == reqdata->segreq->total) {
>>> + reqdata->aio_cb->ret = 0;
>>> + }
>>> + reqdata->aio_cb->bh = aio_bh_new(
>>> + bdrv_get_aio_context(reqdata->aio_cb->common.bs),
>>> + qemu_archipelago_complete_aio, reqdata
>>> + );
>>> + qemu_bh_schedule(reqdata->aio_cb->bh);
>>> +}
>>> +
>>> +static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
>>> + struct xseg_request *expected_req)
>>> +{
>>> + struct xseg_request *req;
>>> + xseg_prepare_wait(xseg, srcport);
>>> + void *psd = xseg_get_signal_desc(xseg, port);
>>> + while (1) {
>>> + req = xseg_receive(xseg, srcport, 0);
>>> + if (req) {
>>> + if (req != expected_req) {
>>> + archipelagolog("Unknown received request\n");
>>> + xseg_put_request(xseg, req, srcport);
>>> + } else if (!(req->state & XS_SERVED)) {
>>> + return -1;
>>> + } else {
>>> + break;
>>> + }
>>> + }
>>> + xseg_wait_signal(xseg, psd, 100000UL);
>>> + }
>>> + xseg_cancel_wait(xseg, srcport);
>>> + return 0;
>>> +}
>>> +
>>> +static void xseg_request_handler(void *state)
>>> +{
>>> + BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
>>> + void *psd = xseg_get_signal_desc(s->xseg, s->port);
>>> + qemu_mutex_lock(&s->request_mutex);
>>> +
>>> + while (!s->stopping) {
>>> + struct xseg_request *req;
>>> + void *data;
>>> + xseg_prepare_wait(s->xseg, s->srcport);
>>> + req = xseg_receive(s->xseg, s->srcport, 0);
>> Is this a blocking call? If so, is there a timeout, and if not, what
>> scenarios (if any) could cause us to wait here indefinitely?
>
> xseg_receive() does not block waiting for a request, but it does wait
> until it takes the volume lock. It then checks whether a request is
> pending and returns after releasing the volume lock.
> If it cannot take the lock, it could wait indefinitely. There is no
> timeout for that at the moment, but I could easily add one.
>
> On the other hand, xseg_receive() won't wait for the volume lock if the
> caller sets X_NONBLOCK.
>
> Given that, I believe I should set the X_NONBLOCK flag both here and in
> wait_reply().
Correction to my reply above: s/volume lock/receive queue lock/g
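
To make the X_NONBLOCK idea a bit more concrete, here is a rough,
self-contained sketch of a non-blocking receive with a bounded number of
retries. It does not use libxseg at all: demo_queue, demo_receive_nonblock()
and demo_receive_bounded() are made-up stand-ins, and a C11 atomic_flag plays
the role of the receive queue lock. How the real xseg_receive() behaves with
X_NONBLOCK is as described above; the rest is only an assumption for
illustration.

```c
#include <sched.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical stand-in for a receive queue; not a libxseg type. */
struct demo_queue {
    atomic_flag lock;   /* plays the role of the receive queue lock */
    void *pending;      /* at most one pending request, for brevity */
};

/*
 * Non-blocking receive: returns the pending request, or NULL if the
 * queue is empty or the lock is currently held by someone else.
 */
static void *demo_receive_nonblock(struct demo_queue *q)
{
    void *req;

    if (atomic_flag_test_and_set(&q->lock)) {
        return NULL;    /* lock busy: give up instead of waiting */
    }
    req = q->pending;
    q->pending = NULL;
    atomic_flag_clear(&q->lock);
    return req;
}

/* Poll at most max_tries times, yielding the CPU between attempts. */
static void *demo_receive_bounded(struct demo_queue *q, int max_tries)
{
    int i;

    for (i = 0; i < max_tries; i++) {
        void *req = demo_receive_nonblock(q);
        if (req) {
            return req;
        }
        sched_yield();
    }
    return NULL;        /* the caller decides how to report the timeout */
}
```

With something along these lines the caller always gets control back after
max_tries attempts, so a stuck lock holder can no longer hang the request
handler or _close().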
>
>
>
>>
>>> + if (req) {
>>> + AIORequestData *reqdata;
>>> + ArchipelagoSegmentedRequest *segreq;
>>> + xseg_get_req_data(s->xseg, req, (void **)&reqdata);
>>> +
>>> + switch (reqdata->op) {
>>> + case ARCHIP_OP_READ:
>>> + data = xseg_get_data(s->xseg, req);
>>> + segreq = reqdata->segreq;
>>> + segreq->count += req->serviced;
>>> +
>>> + qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
>>> + data,
>>> + req->serviced);
>>> +
>>> + xseg_put_request(s->xseg, req, s->srcport);
>>> +
>>> + if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
>>> + if (!segreq->failed) {
>>> + reqdata->aio_cb->ret = segreq->count;
>>> + archipelago_finish_aiocb(reqdata);
>>> + g_free(segreq);
>>> + } else {
>>> + g_free(segreq);
>>> + g_free(reqdata);
>>> + }
>>> + } else {
>>> + g_free(reqdata);
>>> + }
>>> + break;
>>> + case ARCHIP_OP_WRITE:
>>> + segreq = reqdata->segreq;
>>> + segreq->count += req->serviced;
>>> + xseg_put_request(s->xseg, req, s->srcport);
>>> +
>>> + if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
>>> + if (!segreq->failed) {
>>> + reqdata->aio_cb->ret = segreq->count;
>>> + archipelago_finish_aiocb(reqdata);
>>> + g_free(segreq);
>>> + } else {
>>> + g_free(segreq);
>>> + g_free(reqdata);
>>> + }
>>> + } else {
>>> + g_free(reqdata);
>>> +
>> This (OP_WRITE / OP_READ) is where I am worried that we leak in error
>> cases, and a _close() won't clean it up (see later comments).
>
>>> + break;
>>> + case ARCHIP_OP_VOLINFO:
>>> + s->is_signaled = true;
>>> + qemu_cond_signal(&s->archip_cond);
>>> + break;
>>> + }
>>> + } else {
>>> + xseg_wait_signal(s->xseg, psd, 100000UL);
>>> + }
>>> + xseg_cancel_wait(s->xseg, s->srcport);
>>
>>
>>> + }
>>> +
>>> + s->th_is_signaled = true;
>>> + qemu_cond_signal(&s->request_cond);
>>> + qemu_mutex_unlock(&s->request_mutex);
>>> + qemu_thread_exit(NULL);
>>> +}
>>> +
>>> +static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
>>> +{
>>> + if (xseg_initialize()) {
>>> + archipelagolog("Cannot initialize XSEG\n");
>>> + goto err_exit;
>>> + }
>>> +
>>> + s->xseg = xseg_join((char *)"posix", s->segment_name,
>>> + (char *)"posixfd", NULL);
>>> + if (!s->xseg) {
>>> + archipelagolog("Cannot join XSEG shared memory segment\n");
>>> + goto err_exit;
>>> + }
>>> + s->port = xseg_bind_dynport(s->xseg);
>>> + s->srcport = s->port->portno;
>>> + init_local_signal(s->xseg, s->sport, s->srcport);
>>> + return 0;
>>> +
>>> +err_exit:
>>> + return -1;
>>> +}
>>> +
>>> +static int qemu_archipelago_init(BDRVArchipelagoState *s)
>>> +{
>>> + int ret;
>>> +
>>> + ret = qemu_archipelago_xseg_init(s);
>>> + if (ret < 0) {
>>> + error_report("Cannot initialize XSEG. Aborting...\n");
>>> + goto err_exit;
>>> + }
>>> +
>>> + qemu_cond_init(&s->archip_cond);
>>> + qemu_mutex_init(&s->archip_mutex);
>>> + qemu_cond_init(&s->request_cond);
>>> + qemu_mutex_init(&s->request_mutex);
>>> + s->th_is_signaled = false;
>>> + qemu_thread_create(&s->request_th, "xseg_io_th",
>>> + (void *) xseg_request_handler,
>>> + (void *) s, QEMU_THREAD_JOINABLE);
>>> +
>>> +err_exit:
>>> + return ret;
>>> +}
>>> +
>>> +static void qemu_archipelago_complete_aio(void *opaque)
>>> +{
>>> + AIORequestData *reqdata = (AIORequestData *) opaque;
>>> + ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
>>> +
>>> + qemu_bh_delete(aio_cb->bh);
>>> + qemu_vfree(aio_cb->buffer);
>>> + aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
>>> + aio_cb->status = 0;
>>> +
>>> + if (!aio_cb->cancelled) {
>>> + qemu_aio_release(aio_cb);
>>> + }
>>> + g_free(reqdata);
>>> +}
>>> +
>>> +static QemuOptsList archipelago_runtime_opts = {
>>> + .name = "archipelago",
>>> + .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
>>> + .desc = {
>>> + {
>>> + .name = ARCHIPELAGO_OPT_VOLUME,
>>> + .type = QEMU_OPT_STRING,
>>> + .help = "Name of the volume image",
>>> + },
>>> + {
>>> + .name = ARCHIPELAGO_OPT_SEGMENT,
>>> + .type = QEMU_OPT_STRING,
>>> + .help = "Name of the Archipelago shared memory segment",
>>> + },
>>> + {
>>> + .name = ARCHIPELAGO_OPT_MPORT,
>>> + .type = QEMU_OPT_NUMBER,
>>> + .help = "Archipelago mapperd port number"
>>> + },
>>> + {
>>> + .name = ARCHIPELAGO_OPT_VPORT,
>>> + .type = QEMU_OPT_NUMBER,
>>> + .help = "Archipelago vlmcd port number"
>>> +
>>> + },
>>> + { /* end of list */ }
>>> + },
>>> +};
>>> +
>>> +static int qemu_archipelago_open(BlockDriverState *bs,
>>> + QDict *options,
>>> + int bdrv_flags,
>>> + Error **errp)
>>> +{
>>> + int ret = 0;
>>> + const char *volume, *segment_name;
>>> + QemuOpts *opts;
>>> + Error *local_err = NULL;
>>> + BDRVArchipelagoState *s = bs->opaque;
>>> +
>>> + opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
>>> + qemu_opts_absorb_qdict(opts, options, &local_err);
>>> + if (local_err) {
>>> + error_propagate(errp, local_err);
>>> + qemu_opts_del(opts);
>>> + return -EINVAL;
>>> + }
>>> +
>>> + s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001);
>>> + s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501);
>>> +
>>> + segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
>>> + if (segment_name == NULL) {
>>> + s->segment_name = g_strdup("archipelago");
>>> + } else {
>>> + s->segment_name = g_strdup(segment_name);
>>> + }
>>> +
>>> + volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
>>> + if (volume == NULL) {
>>> + error_setg(errp, "archipelago block driver requires the 'volume'"
>>> + " option");
>>> + qemu_opts_del(opts);
>>> + return -EINVAL;
>> s->segment_name is leaked here.
>>
>> You already have an exit label (err_exit) that cleans everything up,
>> and g_free() is NULL safe (and bs->opaque is zero-initialized).
>>
>> You should be able to just set ret, and 'goto err_exit' in each error
>> instance in qemu_archipelago_open() - this also gets rid of the extra
>> qemu_opts_del() calls.
>
> Missed it. Thanks for that!
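
For the next version I have something like the following in mind. This is
only a standalone sketch of the single-exit pattern, not the driver code:
demo_state, demo_strdup() and demo_open() are hypothetical names, and plain
malloc()/free() stand in for g_strdup()/g_free(). It relies on the state
being zero-initialized and on free(NULL) being a no-op, as you pointed out.

```c
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the driver state; must start zeroed. */
struct demo_state {
    char *volname;
    char *segment_name;
};

/* Small strdup replacement so the sketch only needs ISO C. */
static char *demo_strdup(const char *s)
{
    size_t len = strlen(s) + 1;
    char *copy = malloc(len);

    if (copy) {
        memcpy(copy, s, len);
    }
    return copy;
}

static int demo_open(struct demo_state *s, const char *volume,
                     const char *segment)
{
    int ret = 0;

    s->segment_name = demo_strdup(segment ? segment : "archipelago");
    if (!s->segment_name) {
        ret = -ENOMEM;
        goto err_exit;
    }
    if (volume == NULL) {
        ret = -EINVAL;      /* the 'volume' option is mandatory */
        goto err_exit;
    }
    s->volname = demo_strdup(volume);
    if (!s->volname) {
        ret = -ENOMEM;
        goto err_exit;
    }
    return 0;

err_exit:
    /* Every error path lands here; free(NULL) is a no-op. */
    free(s->volname);
    free(s->segment_name);
    s->volname = NULL;
    s->segment_name = NULL;
    return ret;
}
```

Each failure only sets ret and jumps to the label, so nothing allocated
earlier in the function can be forgotten.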
>>
>>> + }
>>> + s->volname = g_strdup(volume);
>>> +
>>> + /* Initialize XSEG, join shared memory segment */
>>> + ret = qemu_archipelago_init(s);
>>> + if (ret < 0) {
>>> + error_setg(errp, "cannot initialize XSEG and join shared "
>>> + "memory segment");
>>> + goto err_exit;
>>> + }
>>> +
>>> + qemu_opts_del(opts);
>>> + return 0;
>>> +
>>> +err_exit:
>>> + g_free(s->volname);
>>> + g_free(s->segment_name);
>>> + qemu_opts_del(opts);
>>> + return ret;
>>> +}
>>> +
>>> +static void qemu_archipelago_close(BlockDriverState *bs)
>>> +{
>>> + int r, targetlen;
>>> + char *target;
>>> + struct xseg_request *req;
>>> + BDRVArchipelagoState *s = bs->opaque;
>>> +
>>> + s->stopping = true;
>>> +
>>> + qemu_mutex_lock(&s->request_mutex);
>>> + while (!s->th_is_signaled) {
>>> + qemu_cond_wait(&s->request_cond,
>>> + &s->request_mutex);
>>> + }
>>> + qemu_mutex_unlock(&s->request_mutex);
>>> + qemu_thread_join(&s->request_th);
>>> + qemu_cond_destroy(&s->request_cond);
>>> + qemu_mutex_destroy(&s->request_mutex);
>>> +
>>> + qemu_cond_destroy(&s->archip_cond);
>>> + qemu_mutex_destroy(&s->archip_mutex);
>>> +
>>> + targetlen = strlen(s->volname);
>> Should this be strlen(s->volname) + 1, to account for the '\0'? Or
>> does xseg_prep_request() just need the length of the non-null
>> terminated string?
>
> xseg_prep_request() just needs the length of the string without the
> terminating '\0', so strlen(s->volname) is correct here.
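
As a tiny illustration of what that means for the copy: only the name bytes
go into the target buffer, and no terminator is written. demo_fill_target()
and its cap parameter are hypothetical and not part of libxseg; the sketch
just shows the strlen()/memcpy() pairing the driver uses.

```c
#include <stddef.h>
#include <string.h>

/*
 * Copy volname into target without its terminating NUL and return the
 * number of bytes written, or 0 if the name does not fit in cap bytes.
 */
static size_t demo_fill_target(char *target, size_t cap, const char *volname)
{
    size_t len = strlen(volname);   /* no +1: the NUL is not part of it */

    if (len > cap) {
        return 0;
    }
    memcpy(target, volname, len);
    return len;
}
```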
>
>>
>>> + req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
>>> + if (!req) {
>>> + archipelagolog("Cannot get XSEG request\n");
>>> + goto err_exit;
>>> + }
>>> + r = xseg_prep_request(s->xseg, req, targetlen, 0);
>>> + if (r < 0) {
>>> + xseg_put_request(s->xseg, req, s->srcport);
>> What does this do here, if xseg_prep_request() failed? Is it
>> essentially a cleanup function?
>
> Yes this is a cleanup function.
>
>>> + archipelagolog("Cannot prepare XSEG close request\n");
>>> + goto err_exit;
>>> + }
>>> +
>>> + target = xseg_get_target(s->xseg, req);
>>> + memcpy(target, s->volname, targetlen);
>>> + req->size = req->datalen;
>>> + req->offset = 0;
>>> + req->op = X_CLOSE;
>>> +
>>> + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
>>> + if (p == NoPort) {
>>> + xseg_put_request(s->xseg, req, s->srcport);
>>> + archipelagolog("Cannot submit XSEG close request\n");
>>> + goto err_exit;
>>> + }
>>> +
>>> + xseg_signal(s->xseg, p);
>>> + wait_reply(s->xseg, s->srcport, s->port, req);
>> This is another spot I am wondering if we could get stuck on a
>> blocking call that could potentially wait forever... is there a
>> timeout here?
>
> It can block for the same reasons I explained for xseg_receive(). We
> will resolve this by setting X_NONBLOCK.
>
>>
>>> +
>>> + xseg_put_request(s->xseg, req, s->srcport);
>>> +
>>> +err_exit:
>>> + g_free(s->volname);
>>> + g_free(s->segment_name);
>>> + xseg_quit_local_signal(s->xseg, s->srcport);
>>> + xseg_leave_dynport(s->xseg, s->port);
>>> + xseg_leave(s->xseg);
>>> +}
>>> +
>>> +static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
>>> +{
>>> + ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
>>> + aio_cb->cancelled = true;
>>> + while (aio_cb->status == -EINPROGRESS) {
>>> + qemu_aio_wait();
>>> + }
>>> + qemu_aio_release(aio_cb);
>>> +}
>>> +
>>> +static const AIOCBInfo archipelago_aiocb_info = {
>>> + .aiocb_size = sizeof(ArchipelagoAIOCB),
>>> + .cancel = qemu_archipelago_aio_cancel,
>>> +};
>>> +
>>> +static int __archipelago_submit_request(BDRVArchipelagoState *s,
>>> + uint64_t bufidx,
>>> + size_t count,
>>> + off_t offset,
>>> + ArchipelagoAIOCB *aio_cb,
>>> + ArchipelagoSegmentedRequest *segreq,
>>> + int op)
>>> +{
>>> + int ret, targetlen;
>>> + char *target;
>>> + void *data = NULL;
>>> + struct xseg_request *req;
>>> + AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
>>> +
>>> + targetlen = strlen(s->volname);
>>> + req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
>>> + if (!req) {
>>> + archipelagolog("Cannot get XSEG request\n");
>>> + goto err_exit2;
>>> + }
>>> + ret = xseg_prep_request(s->xseg, req, targetlen, count);
>>> + if (ret < 0) {
>>> + archipelagolog("Cannot prepare XSEG request\n");
>>> + goto err_exit;
>>> + }
>>> + target = xseg_get_target(s->xseg, req);
>>> + if (!target) {
>>> + archipelagolog("Cannot get XSEG target\n");
>>> + goto err_exit;
>>> + }
>>> + memcpy(target, s->volname, targetlen);
>>> + req->size = count;
>>> + req->offset = offset;
>>> +
>>> + switch (op) {
>>> + case ARCHIP_OP_READ:
>>> + req->op = X_READ;
>>> + break;
>>> + case ARCHIP_OP_WRITE:
>>> + req->op = X_WRITE;
>>> + break;
>>> + }
>>> + reqdata->volname = s->volname;
>>> + reqdata->offset = offset;
>>> + reqdata->size = count;
>>> + reqdata->bufidx = bufidx;
>>> + reqdata->aio_cb = aio_cb;
>>> + reqdata->segreq = segreq;
>>> + reqdata->op = op;
>>> +
>>> + xseg_set_req_data(s->xseg, req, reqdata);
>>> + if (op == ARCHIP_OP_WRITE) {
>>> + data = xseg_get_data(s->xseg, req);
>>> + if (!data) {
>>> + archipelagolog("Cannot get XSEG data\n");
>>> + goto err_exit;
>>> + }
>>> + memcpy(data, aio_cb->buffer + bufidx, count);
>>> + }
>>> +
>>> + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
>>> + if (p == NoPort) {
>>> + archipelagolog("Could not submit XSEG request\n");
>>> + goto err_exit;
>>> + }
>>> + xseg_signal(s->xseg, p);
>>> + return 0;
>>> +
>>> +err_exit:
>>> + g_free(reqdata);
>>> + xseg_put_request(s->xseg, req, s->srcport);
>>> + return -EIO;
>>> +err_exit2:
>>> + g_free(reqdata);
>>> + return -EIO;
>>> +}
>>> +
>>> +static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
>>> + size_t count,
>>> + off_t offset,
>>> + ArchipelagoAIOCB *aio_cb,
>>> + int op)
>>> +{
>>> + int i, ret, segments_nr, last_segment_size;
>>> + ArchipelagoSegmentedRequest *segreq;
>>> +
>>> + segreq = g_malloc(sizeof(ArchipelagoSegmentedRequest));
>>> +
>>> + if (op == ARCHIP_OP_FLUSH) {
>>> + segments_nr = 1;
>>> + segreq->ref = segments_nr;
>>> + segreq->total = count;
>>> + segreq->count = 0;
>>> + segreq->failed = 0;
>>> + ret = __archipelago_submit_request(s, 0, count, offset, aio_cb,
>>> + segreq, ARCHIP_OP_WRITE);
>>> + if (ret < 0) {
>>> + goto err_exit;
>>> + }
>>> + return 0;
>>> + }
>>> +
>>> + segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
>>> + ((count % MAX_REQUEST_SIZE) ? 1 : 0);
>>> + last_segment_size = (int)(count % MAX_REQUEST_SIZE);
>>> +
>>> + segreq->ref = segments_nr;
>>> + segreq->total = count;
>>> + segreq->count = 0;
>>> + segreq->failed = 0;
>>> +
>>> + for (i = 0; i < segments_nr - 1; i++) {
>>> + ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
>>> + MAX_REQUEST_SIZE,
>>> + offset + i * MAX_REQUEST_SIZE,
>>> + aio_cb, segreq, op);
>>> +
>>> + if (ret < 0) {
>>> + goto err_exit;
>>> + }
>>> + }
>>> +
>>> + if ((segments_nr > 1) && last_segment_size) {
>>> + ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
>>> + last_segment_size,
>>> + offset + i * MAX_REQUEST_SIZE,
>>> + aio_cb, segreq, op);
>>> + } else if ((segments_nr > 1) && !last_segment_size) {
>>> + ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE,
>>> + MAX_REQUEST_SIZE,
>>> + offset + i * MAX_REQUEST_SIZE,
>>> + aio_cb, segreq, op);
>>> + } else if (segments_nr == 1) {
>>> + ret = __archipelago_submit_request(s, 0, count, offset, aio_cb,
>>> + segreq, op);
>>> + }
>>> +
>>> + if (ret < 0) {
>>> + goto err_exit;
>>> + }
>>> +
>>> + return 0;
>>> +
>>> +err_exit:
>>> + __sync_add_and_fetch(&segreq->failed, 1);
>>> + if (segments_nr == 1) {
>>> + if (__sync_add_and_fetch(&segreq->ref, -1) == 0) {
>>> + g_free(segreq);
>>> + }
>>> + } else {
>>> + if ((__sync_add_and_fetch(&segreq->ref, -segments_nr + i)) == 0) {
>>> + g_free(segreq);
>>> + }
>>> + }
>> Don't we run the risk of leaking segreq here? The other place this is
>> freed is in xseg_request_handler(), but could we run into a race
>> condition where 's->stopping' is true, or even xseg_receive() just
>> does not return a request?
>
> If 's->stopping' is true, it means that _close() has been invoked. How
> does QEMU handle unserviced requests if someone invokes _close() in the
> meantime? Does it wait for the requests to finish and then exit, or
> does it exit silently without checking for pending requests?
>
> If xseg_receive() does not return an already submitted request, then
> the problem lies in the Archipelago stack. Someone should check why
> the pending requests are not serviced and resolve the problem. The
> question here is the same as before: how does QEMU handle pending
> requests when _close() is invoked in the meantime?
>
> Until all pending requests have been serviced, successfully or not, the
> segreq allocations will remain unfreed. Another approach would be a
> linked list that tracks all submitted requests and handles them
> accordingly on _close().
>
> Suggestions here are more than welcome!
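
To show what I mean by the linked-list approach, here is a minimal
standalone sketch. All names (demo_pending, demo_tracker, demo_track(),
demo_untrack(), demo_drain()) are hypothetical, and plain malloc()/free()
stand in for the glib allocators. It also ignores locking: in the driver
the list would be touched by both the request handler thread and the main
loop, so it would need to be protected by a mutex.

```c
#include <stdlib.h>

/* One node per submitted request that has not completed yet. */
struct demo_pending {
    struct demo_pending *next;
    void *segreq;               /* heap allocation owned by the request */
};

struct demo_tracker {
    struct demo_pending *head;
};

/* Record a submitted request. Returns 0 on success, -1 on allocation failure. */
static int demo_track(struct demo_tracker *t, void *segreq)
{
    struct demo_pending *p = malloc(sizeof(*p));

    if (!p) {
        return -1;
    }
    p->segreq = segreq;
    p->next = t->head;
    t->head = p;
    return 0;
}

/* Forget a completed request; the caller frees segreq as it does today. */
static int demo_untrack(struct demo_tracker *t, void *segreq)
{
    struct demo_pending **pp;

    for (pp = &t->head; *pp; pp = &(*pp)->next) {
        if ((*pp)->segreq == segreq) {
            struct demo_pending *p = *pp;
            *pp = p->next;
            free(p);
            return 0;
        }
    }
    return -1;                  /* not tracked */
}

/* On close: free whatever never completed and return how many there were. */
static int demo_drain(struct demo_tracker *t)
{
    int n = 0;

    while (t->head) {
        struct demo_pending *p = t->head;
        t->head = p->next;
        free(p->segreq);
        free(p);
        n++;
    }
    return n;
}
```

With this, _close() could call the drain step after the handler thread has
been joined, so nothing submitted but never answered stays allocated.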
>
>
>>
>>> +
>>> + return ret;
>>> +}
>>> +
>>> +static BlockDriverAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
>>> + int64_t sector_num,
>>> + QEMUIOVector *qiov,
>>> + int nb_sectors,
>>> + BlockDriverCompletionFunc *cb,
>>> + void *opaque,
>>> + int op)
>>> +{
>>> + ArchipelagoAIOCB *aio_cb;
>>> + BDRVArchipelagoState *s = bs->opaque;
>>> + int64_t size, off;
>>> + int ret;
>>> +
>>> + aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
>>> + aio_cb->cmd = op;
>>> + aio_cb->qiov = qiov;
>>> +
>>> + if (op != ARCHIP_OP_FLUSH) {
>>> + aio_cb->buffer = qemu_blockalign(bs, qiov->size);
>>> + } else {
>>> + aio_cb->buffer = NULL;
>>> + }
>>> +
>>> + aio_cb->ret = 0;
>>> + aio_cb->s = s;
>>> + aio_cb->cancelled = false;
>>> + aio_cb->status = -EINPROGRESS;
>>> +
>>> + if (op == ARCHIP_OP_WRITE) {
>>> + qemu_iovec_to_buf(aio_cb->qiov, 0, aio_cb->buffer, qiov->size);
>>> + }
>>> +
>>> + off = sector_num * BDRV_SECTOR_SIZE;
>>> + size = nb_sectors * BDRV_SECTOR_SIZE;
>>> + aio_cb->size = size;
>>> +
>>> + ret = archipelago_aio_segmented_rw(s, size, off,
>>> + aio_cb, op);
>>> + if (ret < 0) {
>>> + goto err_exit;
>>> + }
>>> + return &aio_cb->common;
>>> +
>>> +err_exit:
>>> + error_report("qemu_archipelago_aio_rw(): I/O Error\n");
>>> + qemu_vfree(aio_cb->buffer);
>>> + qemu_aio_release(aio_cb);
>>> + return NULL;
>>> +}
>>> +
>>> +static BlockDriverAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
>>> + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
>>> + BlockDriverCompletionFunc *cb, void *opaque)
>>> +{
>>> + return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
>>> + opaque, ARCHIP_OP_READ);
>>> +}
>>> +
>>> +static BlockDriverAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
>>> + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
>>> + BlockDriverCompletionFunc *cb, void *opaque)
>>> +{
>>> + return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
>>> + opaque, ARCHIP_OP_WRITE);
>>> +}
>>> +
>>> +static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
>>> +{
>>> + uint64_t size;
>>> + int ret, targetlen;
>>> + struct xseg_request *req;
>>> + struct xseg_reply_info *xinfo;
>>> + AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
>>> +
>>> + const char *volname = s->volname;
>>> + targetlen = strlen(volname);
>>> + req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
>>> + if (!req) {
>>> + archipelagolog("Cannot get XSEG request\n");
>>> + goto err_exit2;
>>> + }
>>> + ret = xseg_prep_request(s->xseg, req, targetlen,
>>> + sizeof(struct xseg_reply_info));
>>> + if (ret < 0) {
>>> + archipelagolog("Cannot prepare XSEG request\n");
>>> + goto err_exit;
>>> + }
>>> + char *target = xseg_get_target(s->xseg, req);
>>> + if (!target) {
>>> + archipelagolog("Cannot get XSEG target\n");
>>> + goto err_exit;
>>> + }
>>> + memcpy(target, volname, targetlen);
>>> + req->size = req->datalen;
>>> + req->offset = 0;
>>> + req->op = X_INFO;
>>> +
>>> + reqdata->op = ARCHIP_OP_VOLINFO;
>>> + reqdata->volname = volname;
>>> + xseg_set_req_data(s->xseg, req, reqdata);
>>> +
>>> + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
>>> + if (p == NoPort) {
>>> + archipelagolog("Cannot submit XSEG request\n");
>>> + goto err_exit;
>>> + }
>>> + xseg_signal(s->xseg, p);
>>> + qemu_mutex_lock(&s->archip_mutex);
>>> + while (!s->is_signaled) {
>>> + qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
>>> + }
>>> + s->is_signaled = false;
>>> + qemu_mutex_unlock(&s->archip_mutex);
>>> +
>>> + xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
>>> + size = xinfo->size;
>>> + xseg_put_request(s->xseg, req, s->srcport);
>>> + g_free(reqdata);
>>> + s->size = size;
>>> + return size;
>>> +
>>
>>
>>> +err_exit:
>>> + g_free(reqdata);
>>> + xseg_put_request(s->xseg, req, s->srcport);
>>> + return -1;
>>> +err_exit2:
>>> + g_free(reqdata);
>>> + return -1;
>>> +}
>> This could be simplified to just:
>>
>> err_exit:
>> xseg_put_request(s->xseg, req, s->srcport);
>> err_exit2:
>> g_free(reqdata);
>> return -1;
>> }
>>
>> Maybe it'd also be best to return -EIO (or other meaningful error
>> value) instead of just -1, as this value gets passed along to
>> .bdrv_getlength().
>>
>>> +
>>> +static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
>>> +{
>>> + int64_t ret;
>>> + BDRVArchipelagoState *s = bs->opaque;
>>> +
>>> + ret = archipelago_volume_info(s);
>> (This is where I am talking about an error value such as -EIO may be
>> better)
>
> Yes, it seems a lot better. Thanks!
>
>>
>>> + return ret;
>>> +}
>>> +
>>> +static BlockDriverAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
>>> + BlockDriverCompletionFunc *cb, void *opaque)
>>> +{
>>> + return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
>>> + ARCHIP_OP_FLUSH);
>>> +}
>>> +
>>> +static BlockDriver bdrv_archipelago = {
>>> + .format_name = "archipelago",
>>> + .protocol_name = "archipelago",
>>> + .instance_size = sizeof(BDRVArchipelagoState),
>>> + .bdrv_file_open = qemu_archipelago_open,
>>> + .bdrv_close = qemu_archipelago_close,
>>> + .bdrv_getlength = qemu_archipelago_getlength,
>>> + .bdrv_aio_readv = qemu_archipelago_aio_readv,
>>> + .bdrv_aio_writev = qemu_archipelago_aio_writev,
>>> + .bdrv_aio_flush = qemu_archipelago_aio_flush,
>>> + .bdrv_has_zero_init = bdrv_has_zero_init_1,
>>> +};
>>> +
>>> +static void bdrv_archipelago_init(void)
>>> +{
>>> + bdrv_register(&bdrv_archipelago);
>>> +}
>>> +
>>> +block_init(bdrv_archipelago_init);
>>> diff --git a/configure b/configure
>>> index 7102964..e4acd9c 100755
>>> --- a/configure
>>> +++ b/configure
>>> @@ -326,6 +326,7 @@ seccomp=""
>>> glusterfs=""
>>> glusterfs_discard="no"
>>> glusterfs_zerofill="no"
>>> +archipelago=""
>>> virtio_blk_data_plane=""
>>> gtk=""
>>> gtkabi=""
>>> @@ -1087,6 +1088,10 @@ for opt do
>>> ;;
>>> --enable-glusterfs) glusterfs="yes"
>>> ;;
>>> + --disable-archipelago) archipelago="no"
>>> + ;;
>>> + --enable-archipelago) archipelago="yes"
>>> + ;;
>>> --disable-virtio-blk-data-plane) virtio_blk_data_plane="no"
>>> ;;
>>> --enable-virtio-blk-data-plane) virtio_blk_data_plane="yes"
>>> @@ -1382,6 +1387,8 @@ Advanced options (experts only):
>>> --enable-coroutine-pool enable coroutine freelist (better performance)
>>> --enable-glusterfs enable GlusterFS backend
>>> --disable-glusterfs disable GlusterFS backend
>>> + --enable-archipelago enable Archipelago backend
>>> + --disable-archipelago disable Archipelago backend
>>> --enable-gcov enable test coverage analysis with gcov
>>> --gcov=GCOV use specified gcov [$gcov_tool]
>>> --disable-tpm disable TPM support
>>> @@ -3051,6 +3058,33 @@ EOF
>>> fi
>>> fi
>>> +
>>> +##########################################
>>> +# archipelago probe
>>> +if test "$archipelago" != "no" ; then
>>> + cat > $TMPC <<EOF
>>> +#include <stdio.h>
>>> +#include <xseg/xseg.h>
>>> +#include <xseg/protocol.h>
>>> +int main(void) {
>>> + xseg_initialize();
>>> + return 0;
>>> +}
>>> +EOF
>>> + archipelago_libs=-lxseg
>>> + if compile_prog "" "$archipelago_libs"; then
>>> + archipelago="yes"
>>> + libs_tools="$archipelago_libs $libs_tools"
>>> + libs_softmmu="$archipelago_libs $libs_softmmu"
>>> + else
>>> + if test "$archipelago" = "yes" ; then
>>> + feature_not_found "Archipelago backend support" "Install libxseg devel"
>>> + fi
>>> + archipelago="no"
>>> + fi
>>> +fi
>>> +
>>> +
>>> ##########################################
>>> # glusterfs probe
>>> if test "$glusterfs" != "no" ; then
>>> @@ -4230,6 +4264,7 @@ echo "seccomp support $seccomp"
>>> echo "coroutine backend $coroutine"
>>> echo "coroutine pool $coroutine_pool"
>>> echo "GlusterFS support $glusterfs"
>>> +echo "Archipelago support $archipelago"
>>> echo "virtio-blk-data-plane $virtio_blk_data_plane"
>>> echo "gcov $gcov_tool"
>>> echo "gcov enabled $gcov"
>>> @@ -4665,6 +4700,11 @@ if test "$glusterfs_zerofill" = "yes" ; then
>>> echo "CONFIG_GLUSTERFS_ZEROFILL=y" >> $config_host_mak
>>> fi
>>> +if test "$archipelago" = "yes" ; then
>>> + echo "CONFIG_ARCHIPELAGO=m" >> $config_host_mak
>>> + echo "ARCHIPELAGO_LIBS=$archipelago_libs" >> $config_host_mak
>>> +fi
>>> +
>>> if test "$libssh2" = "yes" ; then
>>> echo "CONFIG_LIBSSH2=m" >> $config_host_mak
>>> echo "LIBSSH2_CFLAGS=$libssh2_cflags" >> $config_host_mak
>>> --
>>> 1.7.10.4
>>>
>>>
>