Message-ID: <53BE9CEF.2060106@grnet.gr>
Date: Thu, 10 Jul 2014 17:02:23 +0300
From: Chrysostomos Nanakos
MIME-Version: 1.0
References: <1403857452-23768-1-git-send-email-cnanakos@grnet.gr> <1403857452-23768-2-git-send-email-cnanakos@grnet.gr> <20140710002322.GA8058@localhost.localdomain> <53BE6546.5030704@grnet.gr>
In-Reply-To: <53BE6546.5030704@grnet.gr>
Content-Type: text/plain; charset=ISO-8859-1; format=flowed
Content-Transfer-Encoding: 7bit
Subject: Re: [Qemu-devel] [PATCH v6 1/5] block: Support Archipelago as a QEMU block backend
To: Jeff Cody
Cc: kwolf@redhat.com, qemu-devel@nongnu.org, stefanha@redhat.com

On 07/10/2014 01:04 PM, Chrysostomos Nanakos wrote: > On 07/10/2014 03:23 AM, Jeff Cody wrote: >> On Fri, Jun 27, 2014 at 11:24:08AM +0300, Chrysostomos Nanakos wrote: >>> VM Image on Archipelago volume is specified like this: >>> >>> file.driver=archipelago,file.volume=[,file.mport=[, >>> >>> file.vport=][,file.segment=]] >>> >>> 'archipelago' is the protocol. >>> >>> 'mport' is the port number on which mapperd is listening. This is >>> optional >>> and if not specified, QEMU will make Archipelago to use the default >>> port. >>> >>> 'vport' is the port number on which vlmcd is listening. 
This is >>> optional >>> and if not specified, QEMU will make Archipelago to use the default >>> port. >>> >>> 'segment' is the name of the shared memory segment Archipelago stack >>> is using. >>> This is optional and if not specified, QEMU will make Archipelago to >>> use the >>> default value, 'archipelago'. >>> >>> Examples: >>> >>> file.driver=archipelago,file.volume=my_vm_volume >>> file.driver=archipelago,file.volume=my_vm_volume,file.mport=123 >>> file.driver=archipelago,file.volume=my_vm_volume,file.mport=123, >>> file.vport=1234 >>> file.driver=archipelago,file.volume=my_vm_volume,file.mport=123, >>> file.vport=1234,file.segment=my_segment >>> >>> Signed-off-by: Chrysostomos Nanakos >> This is just a superficial review, because I don't have a good idea of >> what archipelago or libxseg really does (I didn't even compile it or >> these patches). But I scanned through this patch, and found a few >> things, and had a few questions. > > No worries, every review is more than welcome. > >> >>> --- >>> MAINTAINERS | 6 + >>> block/Makefile.objs | 2 + >>> block/archipelago.c | 819 >>> +++++++++++++++++++++++++++++++++++++++++++++++++++ >>> configure | 40 +++ >>> 4 files changed, 867 insertions(+) >>> create mode 100644 block/archipelago.c >>> >>> diff --git a/MAINTAINERS b/MAINTAINERS >>> index 9b93edd..58ef1e3 100644 >>> --- a/MAINTAINERS >>> +++ b/MAINTAINERS >>> @@ -999,3 +999,9 @@ SSH >>> M: Richard W.M. 
Jones >>> S: Supported >>> F: block/ssh.c >>> + >>> +ARCHIPELAGO >>> +M: Chrysostomos Nanakos >>> +M: Chrysostomos Nanakos >>> +S: Maintained >>> +F: block/archipelago.c >>> diff --git a/block/Makefile.objs b/block/Makefile.objs >>> index fd88c03..858d2b3 100644 >>> --- a/block/Makefile.objs >>> +++ b/block/Makefile.objs >>> @@ -17,6 +17,7 @@ block-obj-$(CONFIG_LIBNFS) += nfs.o >>> block-obj-$(CONFIG_CURL) += curl.o >>> block-obj-$(CONFIG_RBD) += rbd.o >>> block-obj-$(CONFIG_GLUSTERFS) += gluster.o >>> +block-obj-$(CONFIG_ARCHIPELAGO) += archipelago.o >>> block-obj-$(CONFIG_LIBSSH2) += ssh.o >>> endif >>> @@ -35,5 +36,6 @@ gluster.o-cflags := $(GLUSTERFS_CFLAGS) >>> gluster.o-libs := $(GLUSTERFS_LIBS) >>> ssh.o-cflags := $(LIBSSH2_CFLAGS) >>> ssh.o-libs := $(LIBSSH2_LIBS) >>> +archipelago.o-libs := $(ARCHIPELAGO_LIBS) >>> qcow.o-libs := -lz >>> linux-aio.o-libs := -laio >>> diff --git a/block/archipelago.c b/block/archipelago.c >>> new file mode 100644 >>> index 0000000..c56826a >>> --- /dev/null >>> +++ b/block/archipelago.c >>> @@ -0,0 +1,819 @@ >>> +/* >>> + * QEMU Block driver for Archipelago >>> + * >>> + * Copyright 2014 GRNET S.A. All rights reserved. >>> + * >>> + * Redistribution and use in source and binary forms, with or >>> + * without modification, are permitted provided that the following >>> + * conditions are met: >>> + * >>> + * 1. Redistributions of source code must retain the above >>> + * copyright notice, this list of conditions and the following >>> + * disclaimer. >>> + * 2. Redistributions in binary form must reproduce the above >>> + * copyright notice, this list of conditions and the following >>> + * disclaimer in the documentation and/or other materials >>> + * provided with the distribution. >>> + * >>> + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. 
``AS IS'' AND ANY EXPRESS >>> + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED >>> + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR >>> + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR >>> + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, >>> + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT >>> + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF >>> + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED >>> + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT >>> + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN >>> + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE >>> + * POSSIBILITY OF SUCH DAMAGE. >>> + * >>> + * The views and conclusions contained in the software and >>> + * documentation are those of the authors and should not be >>> + * interpreted as representing official policies, either expressed >>> + * or implied, of GRNET S.A. >>> + */ >>> + >>> +/* >>> +* VM Image on Archipelago volume is specified like this: >>> +* >>> +* >>> file.driver=archipelago,file.volume=[,file.mport=[, >>> +* file.vport=][,file.segment=]] >>> +* >>> +* 'archipelago' is the protocol. >>> +* >>> +* 'mport' is the port number on which mapperd is listening. This is >>> optional >>> +* and if not specified, QEMU will make Archipelago to use the >>> default port. >>> +* >>> +* 'vport' is the port number on which vlmcd is listening. This is >>> optional >>> +* and if not specified, QEMU will make Archipelago to use the >>> default port. >>> +* >>> +* 'segment' is the name of the shared memory segment Archipelago >>> stack is using. >>> +* This is optional and if not specified, QEMU will make Archipelago >>> to use the >>> +* default value, 'archipelago'. 
>>> +* >>> +* Examples: >>> +* >>> +* file.driver=archipelago,file.volume=my_vm_volume >>> +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123 >>> +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123, >>> +* file.vport=1234 >>> +* file.driver=archipelago,file.volume=my_vm_volume,file.mport=123, >>> +* file.vport=1234,file.segment=my_segment >>> +*/ >>> + >>> +#include "block/block_int.h" >>> +#include "qemu/error-report.h" >>> +#include "qemu/thread.h" >>> +#include "qapi/qmp/qint.h" >>> +#include "qapi/qmp/qstring.h" >>> +#include "qapi/qmp/qjson.h" >>> + >>> +#include >>> +#include >>> +#include >>> + >>> +#define ARCHIP_FD_READ 0 >>> +#define ARCHIP_FD_WRITE 1 >>> +#define MAX_REQUEST_SIZE 524288 >>> + >>> +#define ARCHIPELAGO_OPT_VOLUME "volume" >>> +#define ARCHIPELAGO_OPT_SEGMENT "segment" >>> +#define ARCHIPELAGO_OPT_MPORT "mport" >>> +#define ARCHIPELAGO_OPT_VPORT "vport" >>> + >>> +#define archipelagolog(fmt, ...) \ >>> + do { \ >>> + fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, >>> ##__VA_ARGS__); \ >>> + } while (0) >>> + >>> +typedef enum { >>> + ARCHIP_OP_READ, >>> + ARCHIP_OP_WRITE, >>> + ARCHIP_OP_FLUSH, >>> + ARCHIP_OP_VOLINFO, >>> +} ARCHIPCmd; >>> + >>> +typedef struct ArchipelagoAIOCB { >>> + BlockDriverAIOCB common; >>> + QEMUBH *bh; >>> + struct BDRVArchipelagoState *s; >>> + QEMUIOVector *qiov; >>> + void *buffer; >>> + ARCHIPCmd cmd; >>> + bool cancelled; >>> + int status; >>> + int64_t size; >>> + int64_t ret; >>> +} ArchipelagoAIOCB; >>> + >>> +typedef struct BDRVArchipelagoState { >>> + ArchipelagoAIOCB *event_acb; >>> + char *volname; >>> + char *segment_name; >>> + uint64_t size; >>> + /* Archipelago specific */ >>> + struct xseg *xseg; >> I assume s->xseg is allocated in xseg_join() - is it ever freed? 
In >> _close(), there is a final call to xseg_leave(s->xseg), but from what >> I found in libxseg, it does not appear to be freed: >> https://github.com/cnanakos/libxseg/blob/develop/src/xseg.c#L975 >> >> Is it up to libxseg to free xseg, or the caller? > > libxseg allocated xseg and should free it also. I will fix that in > libxseg. Thanks! > >>> + struct xseg_port *port; >>> + xport srcport; >>> + xport sport; >>> + xport mportno; >>> + xport vportno; >>> + QemuMutex archip_mutex; >>> + QemuCond archip_cond; >>> + bool is_signaled; >>> + /* Request handler specific */ >>> + QemuThread request_th; >>> + QemuCond request_cond; >>> + QemuMutex request_mutex; >>> + bool th_is_signaled; >>> + bool stopping; >>> +} BDRVArchipelagoState; >>> + >>> +typedef struct ArchipelagoSegmentedRequest { >>> + size_t count; >>> + size_t total; >>> + int ref; >>> + int failed; >>> +} ArchipelagoSegmentedRequest; >>> + >>> +typedef struct AIORequestData { >>> + const char *volname; >>> + off_t offset; >>> + size_t size; >>> + uint64_t bufidx; >>> + int ret; >>> + int op; >>> + ArchipelagoAIOCB *aio_cb; >>> + ArchipelagoSegmentedRequest *segreq; >>> +} AIORequestData; >>> + >>> +static void qemu_archipelago_complete_aio(void *opaque); >>> + >>> +static void init_local_signal(struct xseg *xseg, xport sport, xport >>> srcport) >>> +{ >>> + if (xseg && (sport != srcport)) { >>> + xseg_init_local_signal(xseg, srcport); >>> + sport = srcport; >>> + } >>> +} >>> + >>> +static void archipelago_finish_aiocb(AIORequestData *reqdata) >>> +{ >>> + if (reqdata->aio_cb->ret != reqdata->segreq->total) { >>> + reqdata->aio_cb->ret = -EIO; >>> + } else if (reqdata->aio_cb->ret == reqdata->segreq->total) { >>> + reqdata->aio_cb->ret = 0; >>> + } >>> + reqdata->aio_cb->bh = aio_bh_new( >>> + bdrv_get_aio_context(reqdata->aio_cb->common.bs), >>> + qemu_archipelago_complete_aio, reqdata >>> + ); >>> + qemu_bh_schedule(reqdata->aio_cb->bh); >>> +} >>> + >>> +static int wait_reply(struct xseg *xseg, xport 
srcport, struct >>> xseg_port *port, >>> + struct xseg_request *expected_req) >>> +{ >>> + struct xseg_request *req; >>> + xseg_prepare_wait(xseg, srcport); >>> + void *psd = xseg_get_signal_desc(xseg, port); >>> + while (1) { >>> + req = xseg_receive(xseg, srcport, 0); >>> + if (req) { >>> + if (req != expected_req) { >>> + archipelagolog("Unknown received request\n"); >>> + xseg_put_request(xseg, req, srcport); >>> + } else if (!(req->state & XS_SERVED)) { >>> + return -1; >>> + } else { >>> + break; >>> + } >>> + } >>> + xseg_wait_signal(xseg, psd, 100000UL); >>> + } >>> + xseg_cancel_wait(xseg, srcport); >>> + return 0; >>> +} >>> + >>> +static void xseg_request_handler(void *state) >>> +{ >>> + BDRVArchipelagoState *s = (BDRVArchipelagoState *) state; >>> + void *psd = xseg_get_signal_desc(s->xseg, s->port); >>> + qemu_mutex_lock(&s->request_mutex); >>> + >>> + while (!s->stopping) { >>> + struct xseg_request *req; >>> + void *data; >>> + xseg_prepare_wait(s->xseg, s->srcport); >>> + req = xseg_receive(s->xseg, s->srcport, 0); >> Is this a blocking call? If so, is there a timeout, and if not, what >> scenarios (if any) could cause us to wait here indefinitely? > > xseg_receive() does not block waiting for a request, but it does wait > until it takes > the volume lock, checks whether a request is pending, and returns > after releasing the volume lock. > If it cannot take the lock, it could wait indefinitely; there is no > timeout for that at the moment, > but I could easily add one. > > On the other hand, xseg_receive() won't wait for the volume lock if the > caller sets X_NONBLOCK. > > Given that, I believe I should set the X_NONBLOCK flag both here and in > wait_reply(). 
s/volume lock/receive queue lock/g > > > >> >>> + if (req) { >>> + AIORequestData *reqdata; >>> + ArchipelagoSegmentedRequest *segreq; >>> + xseg_get_req_data(s->xseg, req, (void **)&reqdata); >>> + >>> + switch (reqdata->op) { >>> + case ARCHIP_OP_READ: >>> + data = xseg_get_data(s->xseg, req); >>> + segreq = reqdata->segreq; >>> + segreq->count += req->serviced; >>> + >>> + qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx, >>> + data, >>> + req->serviced); >>> + >>> + xseg_put_request(s->xseg, req, s->srcport); >>> + >>> + if ((__sync_add_and_fetch(&segreq->ref, -1)) == >>> 0) { >>> + if (!segreq->failed) { >>> + reqdata->aio_cb->ret = segreq->count; >>> + archipelago_finish_aiocb(reqdata); >>> + g_free(segreq); >>> + } else { >>> + g_free(segreq); >>> + g_free(reqdata); >>> + } >>> + } else { >>> + g_free(reqdata); >>> + } >>> + break; >>> + case ARCHIP_OP_WRITE: >>> + segreq = reqdata->segreq; >>> + segreq->count += req->serviced; >>> + xseg_put_request(s->xseg, req, s->srcport); >>> + >>> + if ((__sync_add_and_fetch(&segreq->ref, -1)) == >>> 0) { >>> + if (!segreq->failed) { >>> + reqdata->aio_cb->ret = segreq->count; >>> + archipelago_finish_aiocb(reqdata); >>> + g_free(segreq); >>> + } else { >>> + g_free(segreq); >>> + g_free(reqdata); >>> + } >>> + } else { >>> + g_free(reqdata); >>> + >> This (OP_WRITE / OP_READ) is where I am worried that we leak in error >> cases, and a _close() won't clean it up (see later comments). 
> >>> + break; >>> + case ARCHIP_OP_VOLINFO: >>> + s->is_signaled = true; >>> + qemu_cond_signal(&s->archip_cond); >>> + break; >>> + } >>> + } else { >>> + xseg_wait_signal(s->xseg, psd, 100000UL); >>> + } >>> + xseg_cancel_wait(s->xseg, s->srcport); >> >> >>> + } >>> + >>> + s->th_is_signaled = true; >>> + qemu_cond_signal(&s->request_cond); >>> + qemu_mutex_unlock(&s->request_mutex); >>> + qemu_thread_exit(NULL); >>> +} >>> + >>> +static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s) >>> +{ >>> + if (xseg_initialize()) { >>> + archipelagolog("Cannot initialize XSEG\n"); >>> + goto err_exit; >>> + } >>> + >>> + s->xseg = xseg_join((char *)"posix", s->segment_name, >>> + (char *)"posixfd", NULL); >>> + if (!s->xseg) { >>> + archipelagolog("Cannot join XSEG shared memory segment\n"); >>> + goto err_exit; >>> + } >>> + s->port = xseg_bind_dynport(s->xseg); >>> + s->srcport = s->port->portno; >>> + init_local_signal(s->xseg, s->sport, s->srcport); >>> + return 0; >>> + >>> +err_exit: >>> + return -1; >>> +} >>> + >>> +static int qemu_archipelago_init(BDRVArchipelagoState *s) >>> +{ >>> + int ret; >>> + >>> + ret = qemu_archipelago_xseg_init(s); >>> + if (ret < 0) { >>> + error_report("Cannot initialize XSEG. 
Aborting...\n"); >>> + goto err_exit; >>> + } >>> + >>> + qemu_cond_init(&s->archip_cond); >>> + qemu_mutex_init(&s->archip_mutex); >>> + qemu_cond_init(&s->request_cond); >>> + qemu_mutex_init(&s->request_mutex); >>> + s->th_is_signaled = false; >>> + qemu_thread_create(&s->request_th, "xseg_io_th", >>> + (void *) xseg_request_handler, >>> + (void *) s, QEMU_THREAD_JOINABLE); >>> + >>> +err_exit: >>> + return ret; >>> +} >>> + >>> +static void qemu_archipelago_complete_aio(void *opaque) >>> +{ >>> + AIORequestData *reqdata = (AIORequestData *) opaque; >>> + ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb; >>> + >>> + qemu_bh_delete(aio_cb->bh); >>> + qemu_vfree(aio_cb->buffer); >>> + aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret); >>> + aio_cb->status = 0; >>> + >>> + if (!aio_cb->cancelled) { >>> + qemu_aio_release(aio_cb); >>> + } >>> + g_free(reqdata); >>> +} >>> + >>> +static QemuOptsList archipelago_runtime_opts = { >>> + .name = "archipelago", >>> + .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head), >>> + .desc = { >>> + { >>> + .name = ARCHIPELAGO_OPT_VOLUME, >>> + .type = QEMU_OPT_STRING, >>> + .help = "Name of the volume image", >>> + }, >>> + { >>> + .name = ARCHIPELAGO_OPT_SEGMENT, >>> + .type = QEMU_OPT_STRING, >>> + .help = "Name of the Archipelago shared memory segment", >>> + }, >>> + { >>> + .name = ARCHIPELAGO_OPT_MPORT, >>> + .type = QEMU_OPT_NUMBER, >>> + .help = "Archipelago mapperd port number" >>> + }, >>> + { >>> + .name = ARCHIPELAGO_OPT_VPORT, >>> + .type = QEMU_OPT_NUMBER, >>> + .help = "Archipelago vlmcd port number" >>> + >>> + }, >>> + { /* end of list */ } >>> + }, >>> +}; >>> + >>> +static int qemu_archipelago_open(BlockDriverState *bs, >>> + QDict *options, >>> + int bdrv_flags, >>> + Error **errp) >>> +{ >>> + int ret = 0; >>> + const char *volume, *segment_name; >>> + QemuOpts *opts; >>> + Error *local_err = NULL; >>> + BDRVArchipelagoState *s = bs->opaque; >>> + >>> + opts = 
qemu_opts_create(&archipelago_runtime_opts, NULL, 0, >>> &error_abort); >>> + qemu_opts_absorb_qdict(opts, options, &local_err); >>> + if (local_err) { >>> + error_propagate(errp, local_err); >>> + qemu_opts_del(opts); >>> + return -EINVAL; >>> + } >>> + >>> + s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, >>> 1001); >>> + s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, >>> 501); >>> + >>> + segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT); >>> + if (segment_name == NULL) { >>> + s->segment_name = g_strdup("archipelago"); >>> + } else { >>> + s->segment_name = g_strdup(segment_name); >>> + } >>> + >>> + volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME); >>> + if (volume == NULL) { >>> + error_setg(errp, "archipelago block driver requires the >>> 'volume'" >>> + " option"); >>> + qemu_opts_del(opts); >>> + return -EINVAL; >> s->segment_name is leaked here. >> >> You already have an exit label (err_exit) that cleans everything up, >> and g_free() is NULL safe (and bs->opaque is zero-initialized). >> >> You should be able to just set ret, and 'goto err_exit' in each error >> instance in qemu_archipelago_open() - this also gets rid of the extra >> qemu_opts_del() calls. > > Missed it. Thanks for that! 
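The single-exit cleanup suggested above can be sketched roughly as follows. This is a standalone illustration and not the driver code: `demo_state`, `demo_open` and `dup_string` are hypothetical names, and plain `malloc`/`free` stand in for `g_strdup`/`g_free` (like `g_free(NULL)`, `free(NULL)` is a no-op). The caller is assumed to pass a zero-initialised state, as QEMU does for bs->opaque.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for BDRVArchipelagoState. */
struct demo_state {
    char *volname;
    char *segment_name;
};

/* Portable replacement for g_strdup() in this sketch. */
static char *dup_string(const char *src)
{
    size_t len = strlen(src) + 1;
    char *copy = malloc(len);

    if (copy) {
        memcpy(copy, src, len);
    }
    return copy;
}

/* Every failure sets ret and jumps to one label, so nothing is leaked. */
static int demo_open(struct demo_state *s, const char *volume,
                     const char *segment)
{
    int ret = 0;

    assert(s != NULL);

    /* Fall back to the default segment name when none is given. */
    s->segment_name = dup_string(segment ? segment : "archipelago");
    if (!s->segment_name) {
        ret = -ENOMEM;
        goto err_exit;
    }

    if (!volume) {
        ret = -EINVAL;              /* 'volume' is mandatory */
        goto err_exit;
    }

    s->volname = dup_string(volume);
    if (!s->volname) {
        ret = -ENOMEM;
        goto err_exit;
    }
    return 0;

err_exit:
    free(s->volname);               /* free(NULL) is a no-op */
    free(s->segment_name);
    s->volname = NULL;
    s->segment_name = NULL;
    return ret;
}
```

With this shape, adding another early failure only needs a new `ret = ...; goto err_exit;` pair, and the missing-volume path no longer leaks the segment name.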
>> >>> + } >>> + s->volname = g_strdup(volume); >>> + >>> + /* Initialize XSEG, join shared memory segment */ >>> + ret = qemu_archipelago_init(s); >>> + if (ret < 0) { >>> + error_setg(errp, "cannot initialize XSEG and join shared " >>> + "memory segment"); >>> + goto err_exit; >>> + } >>> + >>> + qemu_opts_del(opts); >>> + return 0; >>> + >>> +err_exit: >>> + g_free(s->volname); >>> + g_free(s->segment_name); >>> + qemu_opts_del(opts); >>> + return ret; >>> +} >>> + >>> +static void qemu_archipelago_close(BlockDriverState *bs) >>> +{ >>> + int r, targetlen; >>> + char *target; >>> + struct xseg_request *req; >>> + BDRVArchipelagoState *s = bs->opaque; >>> + >>> + s->stopping = true; >>> + >>> + qemu_mutex_lock(&s->request_mutex); >>> + while (!s->th_is_signaled) { >>> + qemu_cond_wait(&s->request_cond, >>> + &s->request_mutex); >>> + } >>> + qemu_mutex_unlock(&s->request_mutex); >>> + qemu_thread_join(&s->request_th); >>> + qemu_cond_destroy(&s->request_cond); >>> + qemu_mutex_destroy(&s->request_mutex); >>> + >>> + qemu_cond_destroy(&s->archip_cond); >>> + qemu_mutex_destroy(&s->archip_mutex); >>> + >>> + targetlen = strlen(s->volname); >> Should this be strlen(s->volname) + 1, to account for the '\0'? Or >> does xseg_prep_request() just need the length of the non-null >> terminated string? > > You are right, xseg_prep_request() needs the length of the non-null > terminated string. > >> >>> + req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC); >>> + if (!req) { >>> + archipelagolog("Cannot get XSEG request\n"); >>> + goto err_exit; >>> + } >>> + r = xseg_prep_request(s->xseg, req, targetlen, 0); >>> + if (r < 0) { >>> + xseg_put_request(s->xseg, req, s->srcport); >> What does this do here, if xseg_prep_request() failed? Is it >> essentially a cleanup function? > > Yes this is a cleanup function. 
> >>> + archipelagolog("Cannot prepare XSEG close request\n"); >>> + goto err_exit; >>> + } >>> + >>> + target = xseg_get_target(s->xseg, req); >>> + memcpy(target, s->volname, targetlen); >>> + req->size = req->datalen; >>> + req->offset = 0; >>> + req->op = X_CLOSE; >>> + >>> + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC); >>> + if (p == NoPort) { >>> + xseg_put_request(s->xseg, req, s->srcport); >>> + archipelagolog("Cannot submit XSEG close request\n"); >>> + goto err_exit; >>> + } >>> + >>> + xseg_signal(s->xseg, p); >>> + wait_reply(s->xseg, s->srcport, s->port, req); >> This is another spot I am wondering if we could get stuck on a >> blocking call that could potentially wait forever... is there a >> timeout here? > > For the same reasons I explained in xseg_receive(). We will resolve > this by setting X_NONBLOCK. > >> >>> + >>> + xseg_put_request(s->xseg, req, s->srcport); >>> + >>> +err_exit: >>> + g_free(s->volname); >>> + g_free(s->segment_name); >>> + xseg_quit_local_signal(s->xseg, s->srcport); >>> + xseg_leave_dynport(s->xseg, s->port); >>> + xseg_leave(s->xseg); >>> +} >>> + >>> +static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb) >>> +{ >>> + ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb; >>> + aio_cb->cancelled = true; >>> + while (aio_cb->status == -EINPROGRESS) { >>> + qemu_aio_wait(); >>> + } >>> + qemu_aio_release(aio_cb); >>> +} >>> + >>> +static const AIOCBInfo archipelago_aiocb_info = { >>> + .aiocb_size = sizeof(ArchipelagoAIOCB), >>> + .cancel = qemu_archipelago_aio_cancel, >>> +}; >>> + >>> +static int __archipelago_submit_request(BDRVArchipelagoState *s, >>> + uint64_t bufidx, >>> + size_t count, >>> + off_t offset, >>> + ArchipelagoAIOCB *aio_cb, >>> + ArchipelagoSegmentedRequest *segreq, >>> + int op) >>> +{ >>> + int ret, targetlen; >>> + char *target; >>> + void *data = NULL; >>> + struct xseg_request *req; >>> + AIORequestData *reqdata = g_malloc(sizeof(AIORequestData)); >>> + >>> + targetlen = 
strlen(s->volname); >>> + req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC); >>> + if (!req) { >>> + archipelagolog("Cannot get XSEG request\n"); >>> + goto err_exit2; >>> + } >>> + ret = xseg_prep_request(s->xseg, req, targetlen, count); >>> + if (ret < 0) { >>> + archipelagolog("Cannot prepare XSEG request\n"); >>> + goto err_exit; >>> + } >>> + target = xseg_get_target(s->xseg, req); >>> + if (!target) { >>> + archipelagolog("Cannot get XSEG target\n"); >>> + goto err_exit; >>> + } >>> + memcpy(target, s->volname, targetlen); >>> + req->size = count; >>> + req->offset = offset; >>> + >>> + switch (op) { >>> + case ARCHIP_OP_READ: >>> + req->op = X_READ; >>> + break; >>> + case ARCHIP_OP_WRITE: >>> + req->op = X_WRITE; >>> + break; >>> + } >>> + reqdata->volname = s->volname; >>> + reqdata->offset = offset; >>> + reqdata->size = count; >>> + reqdata->bufidx = bufidx; >>> + reqdata->aio_cb = aio_cb; >>> + reqdata->segreq = segreq; >>> + reqdata->op = op; >>> + >>> + xseg_set_req_data(s->xseg, req, reqdata); >>> + if (op == ARCHIP_OP_WRITE) { >>> + data = xseg_get_data(s->xseg, req); >>> + if (!data) { >>> + archipelagolog("Cannot get XSEG data\n"); >>> + goto err_exit; >>> + } >>> + memcpy(data, aio_cb->buffer + bufidx, count); >>> + } >>> + >>> + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC); >>> + if (p == NoPort) { >>> + archipelagolog("Could not submit XSEG request\n"); >>> + goto err_exit; >>> + } >>> + xseg_signal(s->xseg, p); >>> + return 0; >>> + >>> +err_exit: >>> + g_free(reqdata); >>> + xseg_put_request(s->xseg, req, s->srcport); >>> + return -EIO; >>> +err_exit2: >>> + g_free(reqdata); >>> + return -EIO; >>> +} >>> + >>> +static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s, >>> + size_t count, >>> + off_t offset, >>> + ArchipelagoAIOCB *aio_cb, >>> + int op) >>> +{ >>> + int i, ret, segments_nr, last_segment_size; >>> + ArchipelagoSegmentedRequest *segreq; >>> + >>> + segreq = 
g_malloc(sizeof(ArchipelagoSegmentedRequest)); >>> + >>> + if (op == ARCHIP_OP_FLUSH) { >>> + segments_nr = 1; >>> + segreq->ref = segments_nr; >>> + segreq->total = count; >>> + segreq->count = 0; >>> + segreq->failed = 0; >>> + ret = __archipelago_submit_request(s, 0, count, offset, >>> aio_cb, >>> + segreq, ARCHIP_OP_WRITE); >>> + if (ret < 0) { >>> + goto err_exit; >>> + } >>> + return 0; >>> + } >>> + >>> + segments_nr = (int)(count / MAX_REQUEST_SIZE) + \ >>> + ((count % MAX_REQUEST_SIZE) ? 1 : 0); >>> + last_segment_size = (int)(count % MAX_REQUEST_SIZE); >>> + >>> + segreq->ref = segments_nr; >>> + segreq->total = count; >>> + segreq->count = 0; >>> + segreq->failed = 0; >>> + >>> + for (i = 0; i < segments_nr - 1; i++) { >>> + ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE, >>> + MAX_REQUEST_SIZE, >>> + offset + i * >>> MAX_REQUEST_SIZE, >>> + aio_cb, segreq, op); >>> + >>> + if (ret < 0) { >>> + goto err_exit; >>> + } >>> + } >>> + >>> + if ((segments_nr > 1) && last_segment_size) { >>> + ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE, >>> + last_segment_size, >>> + offset + i * >>> MAX_REQUEST_SIZE, >>> + aio_cb, segreq, op); >>> + } else if ((segments_nr > 1) && !last_segment_size) { >>> + ret = __archipelago_submit_request(s, i * MAX_REQUEST_SIZE, >>> + MAX_REQUEST_SIZE, >>> + offset + i * >>> MAX_REQUEST_SIZE, >>> + aio_cb, segreq, op); >>> + } else if (segments_nr == 1) { >>> + ret = __archipelago_submit_request(s, 0, count, offset, >>> aio_cb, >>> + segreq, op); >>> + } >>> + >>> + if (ret < 0) { >>> + goto err_exit; >>> + } >>> + >>> + return 0; >>> + >>> +err_exit: >>> + __sync_add_and_fetch(&segreq->failed, 1); >>> + if (segments_nr == 1) { >>> + if (__sync_add_and_fetch(&segreq->ref, -1) == 0) { >>> + g_free(segreq); >>> + } >>> + } else { >>> + if ((__sync_add_and_fetch(&segreq->ref, -segments_nr + i)) >>> == 0) { >>> + g_free(segreq); >>> + } >>> + } >> Don't we run the risk of leaking segreq here? 
The other place this is >> freed is in xseg_request_handler(), but could we run into a race >> condition where 's->stopping' is true, or even xseg_receive() just >> does not >> return a request? > > If 's->stopping' is true, _close() has been invoked. How does > QEMU handle unserviced requests when someone invokes > _close() in the meantime? Does it wait for the requests to finish and then exit, or > does it exit silently without checking for pending requests? > > If xseg_receive() does not return an already submitted request, then > the problem lies in the Archipelago stack. Someone should check why > the pending requests are not serviced and resolve the problem. The > question here is the same as before: how does QEMU handle pending requests > when _close() is invoked in the meantime? > > Until all pending requests have been serviced, successfully or not, the segreq > allocations remain and are not freed. Another approach could be > a linked list that tracks all submitted requests and handles them > accordingly on _close(). > > Suggestions here are more than welcome!
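The linked-list idea mentioned above might look roughly like the sketch below. It is only an illustration under stated assumptions: `pending_req`, `pending_list`, `pending_add`, `pending_complete` and `pending_drain` are hypothetical names that do not exist in the patch or in libxseg, the `id` field stands in for the real segreq/reqdata bookkeeping, and the code is single-threaded. A real driver would need to hold a lock around the list, since the request handler thread and _close() would both touch it, and would probably complete each dropped request with an error instead of just freeing it.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical per-request bookkeeping; stands in for segreq/reqdata. */
struct pending_req {
    struct pending_req *prev;
    struct pending_req *next;
    int id;
};

struct pending_list {
    struct pending_req *head;
    size_t count;
};

/* Record a request at submit time. Returns NULL on allocation failure. */
static struct pending_req *pending_add(struct pending_list *l, int id)
{
    struct pending_req *r = calloc(1, sizeof(*r));

    if (!r) {
        return NULL;
    }
    r->id = id;
    r->next = l->head;
    if (l->head) {
        l->head->prev = r;
    }
    l->head = r;
    l->count++;
    return r;
}

/* Unlink and free a request once its reply has been handled. */
static void pending_complete(struct pending_list *l, struct pending_req *r)
{
    assert(l->count > 0);
    if (r->prev) {
        r->prev->next = r->next;
    } else {
        l->head = r->next;
    }
    if (r->next) {
        r->next->prev = r->prev;
    }
    l->count--;
    free(r);
}

/* On close: release whatever was never answered; returns the number dropped. */
static size_t pending_drain(struct pending_list *l)
{
    size_t dropped = 0;

    while (l->head) {
        pending_complete(l, l->head);
        dropped++;
    }
    return dropped;
}
```

Calling `pending_drain()` from the close path would bound the lifetime of every allocation to the lifetime of the device, independent of whether the Archipelago stack ever answers.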
> > >> >>> + >>> + return ret; >>> +} >>> + >>> +static BlockDriverAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs, >>> + int64_t sector_num, >>> + QEMUIOVector *qiov, >>> + int nb_sectors, >>> + BlockDriverCompletionFunc *cb, >>> + void *opaque, >>> + int op) >>> +{ >>> + ArchipelagoAIOCB *aio_cb; >>> + BDRVArchipelagoState *s = bs->opaque; >>> + int64_t size, off; >>> + int ret; >>> + >>> + aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque); >>> + aio_cb->cmd = op; >>> + aio_cb->qiov = qiov; >>> + >>> + if (op != ARCHIP_OP_FLUSH) { >>> + aio_cb->buffer = qemu_blockalign(bs, qiov->size); >>> + } else { >>> + aio_cb->buffer = NULL; >>> + } >>> + >>> + aio_cb->ret = 0; >>> + aio_cb->s = s; >>> + aio_cb->cancelled = false; >>> + aio_cb->status = -EINPROGRESS; >>> + >>> + if (op == ARCHIP_OP_WRITE) { >>> + qemu_iovec_to_buf(aio_cb->qiov, 0, aio_cb->buffer, >>> qiov->size); >>> + } >>> + >>> + off = sector_num * BDRV_SECTOR_SIZE; >>> + size = nb_sectors * BDRV_SECTOR_SIZE; >>> + aio_cb->size = size; >>> + >>> + ret = archipelago_aio_segmented_rw(s, size, off, >>> + aio_cb, op); >>> + if (ret < 0) { >>> + goto err_exit; >>> + } >>> + return &aio_cb->common; >>> + >>> +err_exit: >>> + error_report("qemu_archipelago_aio_rw(): I/O Error\n"); >>> + qemu_vfree(aio_cb->buffer); >>> + qemu_aio_release(aio_cb); >>> + return NULL; >>> +} >>> + >>> +static BlockDriverAIOCB >>> *qemu_archipelago_aio_readv(BlockDriverState *bs, >>> + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, >>> + BlockDriverCompletionFunc *cb, void *opaque) >>> +{ >>> + return qemu_archipelago_aio_rw(bs, sector_num, qiov, >>> nb_sectors, cb, >>> + opaque, ARCHIP_OP_READ); >>> +} >>> + >>> +static BlockDriverAIOCB >>> *qemu_archipelago_aio_writev(BlockDriverState *bs, >>> + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, >>> + BlockDriverCompletionFunc *cb, void *opaque) >>> +{ >>> + return qemu_archipelago_aio_rw(bs, sector_num, qiov, >>> nb_sectors, cb, >>> + opaque, 
ARCHIP_OP_WRITE); >>> +} >>> + >>> +static int64_t archipelago_volume_info(BDRVArchipelagoState *s) >>> +{ >>> + uint64_t size; >>> + int ret, targetlen; >>> + struct xseg_request *req; >>> + struct xseg_reply_info *xinfo; >>> + AIORequestData *reqdata = g_malloc(sizeof(AIORequestData)); >>> + >>> + const char *volname = s->volname; >>> + targetlen = strlen(volname); >>> + req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC); >>> + if (!req) { >>> + archipelagolog("Cannot get XSEG request\n"); >>> + goto err_exit2; >>> + } >>> + ret = xseg_prep_request(s->xseg, req, targetlen, >>> + sizeof(struct xseg_reply_info)); >>> + if (ret < 0) { >>> + archipelagolog("Cannot prepare XSEG request\n"); >>> + goto err_exit; >>> + } >>> + char *target = xseg_get_target(s->xseg, req); >>> + if (!target) { >>> + archipelagolog("Cannot get XSEG target\n"); >>> + goto err_exit; >>> + } >>> + memcpy(target, volname, targetlen); >>> + req->size = req->datalen; >>> + req->offset = 0; >>> + req->op = X_INFO; >>> + >>> + reqdata->op = ARCHIP_OP_VOLINFO; >>> + reqdata->volname = volname; >>> + xseg_set_req_data(s->xseg, req, reqdata); >>> + >>> + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC); >>> + if (p == NoPort) { >>> + archipelagolog("Cannot submit XSEG request\n"); >>> + goto err_exit; >>> + } >>> + xseg_signal(s->xseg, p); >>> + qemu_mutex_lock(&s->archip_mutex); >>> + while (!s->is_signaled) { >>> + qemu_cond_wait(&s->archip_cond, &s->archip_mutex); >>> + } >>> + s->is_signaled = false; >>> + qemu_mutex_unlock(&s->archip_mutex); >>> + >>> + xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req); >>> + size = xinfo->size; >>> + xseg_put_request(s->xseg, req, s->srcport); >>> + g_free(reqdata); >>> + s->size = size; >>> + return size; >>> + >> >> >>> +err_exit: >>> + g_free(reqdata); >>> + xseg_put_request(s->xseg, req, s->srcport); >>> + return -1; >>> +err_exit2: >>> + g_free(reqdata); >>> + return -1; >>> +} >> This could be simplified to just: >> >> 
err_exit: >> xseg_put_request(s->xseg, req, s->srcport); >> err_exit2: >> g_free(reqdata); >> return -1; >> } >> >> Maybe it'd also be best to return -EIO (or other meaningful error >> value) instead of just -1, as this value gets passed along to >> .bdrv_getlength(). >> >>> + >>> +static int64_t qemu_archipelago_getlength(BlockDriverState *bs) >>> +{ >>> + int64_t ret; >>> + BDRVArchipelagoState *s = bs->opaque; >>> + >>> + ret = archipelago_volume_info(s); >> (This is where I am talking about an error value such as -EIO may be >> better) > > Yes, it seems a lot better. Thanks! > >> >>> + return ret; >>> +} >>> + >>> +static BlockDriverAIOCB >>> *qemu_archipelago_aio_flush(BlockDriverState *bs, >>> + BlockDriverCompletionFunc *cb, void *opaque) >>> +{ >>> + return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque, >>> + ARCHIP_OP_FLUSH); >>> +} >>> + >>> +static BlockDriver bdrv_archipelago = { >>> + .format_name = "archipelago", >>> + .protocol_name = "archipelago", >>> + .instance_size = sizeof(BDRVArchipelagoState), >>> + .bdrv_file_open = qemu_archipelago_open, >>> + .bdrv_close = qemu_archipelago_close, >>> + .bdrv_getlength = qemu_archipelago_getlength, >>> + .bdrv_aio_readv = qemu_archipelago_aio_readv, >>> + .bdrv_aio_writev = qemu_archipelago_aio_writev, >>> + .bdrv_aio_flush = qemu_archipelago_aio_flush, >>> + .bdrv_has_zero_init = bdrv_has_zero_init_1, >>> +}; >>> + >>> +static void bdrv_archipelago_init(void) >>> +{ >>> + bdrv_register(&bdrv_archipelago); >>> +} >>> + >>> +block_init(bdrv_archipelago_init); >>> diff --git a/configure b/configure >>> index 7102964..e4acd9c 100755 >>> --- a/configure >>> +++ b/configure >>> @@ -326,6 +326,7 @@ seccomp="" >>> glusterfs="" >>> glusterfs_discard="no" >>> glusterfs_zerofill="no" >>> +archipelago="" >>> virtio_blk_data_plane="" >>> gtk="" >>> gtkabi="" >>> @@ -1087,6 +1088,10 @@ for opt do >>> ;; >>> --enable-glusterfs) glusterfs="yes" >>> ;; >>> + --disable-archipelago) archipelago="no" >>> + ;; >>> + 
--enable-archipelago) archipelago="yes" >>> + ;; >>> --disable-virtio-blk-data-plane) virtio_blk_data_plane="no" >>> ;; >>> --enable-virtio-blk-data-plane) virtio_blk_data_plane="yes" >>> @@ -1382,6 +1387,8 @@ Advanced options (experts only): >>> --enable-coroutine-pool enable coroutine freelist (better >>> performance) >>> --enable-glusterfs enable GlusterFS backend >>> --disable-glusterfs disable GlusterFS backend >>> + --enable-archipelago enable Archipelago backend >>> + --disable-archipelago disable Archipelago backend >>> --enable-gcov enable test coverage analysis with gcov >>> --gcov=GCOV use specified gcov [$gcov_tool] >>> --disable-tpm disable TPM support >>> @@ -3051,6 +3058,33 @@ EOF >>> fi >>> fi >>> + >>> +########################################## >>> +# archipelago probe >>> +if test "$archipelago" != "no" ; then >>> + cat > $TMPC <>> +#include >>> +#include >>> +#include >>> +int main(void) { >>> + xseg_initialize(); >>> + return 0; >>> +} >>> +EOF >>> + archipelago_libs=-lxseg >>> + if compile_prog "" "$archipelago_libs"; then >>> + archipelago="yes" >>> + libs_tools="$archipelago_libs $libs_tools" >>> + libs_softmmu="$archipelago_libs $libs_softmmu" >>> + else >>> + if test "$archipelago" = "yes" ; then >>> + feature_not_found "Archipelago backend support" "Install >>> libxseg devel" >>> + fi >>> + archipelago="no" >>> + fi >>> +fi >>> + >>> + >>> ########################################## >>> # glusterfs probe >>> if test "$glusterfs" != "no" ; then >>> @@ -4230,6 +4264,7 @@ echo "seccomp support $seccomp" >>> echo "coroutine backend $coroutine" >>> echo "coroutine pool $coroutine_pool" >>> echo "GlusterFS support $glusterfs" >>> +echo "Archipelago support $archipelago" >>> echo "virtio-blk-data-plane $virtio_blk_data_plane" >>> echo "gcov $gcov_tool" >>> echo "gcov enabled $gcov" >>> @@ -4665,6 +4700,11 @@ if test "$glusterfs_zerofill" = "yes" ; then >>> echo "CONFIG_GLUSTERFS_ZEROFILL=y" >> $config_host_mak >>> fi >>> +if test "$archipelago" = 
"yes" ; then >>> + echo "CONFIG_ARCHIPELAGO=m" >> $config_host_mak >>> + echo "ARCHIPELAGO_LIBS=$archipelago_libs" >> $config_host_mak >>> +fi >>> + >>> if test "$libssh2" = "yes" ; then >>> echo "CONFIG_LIBSSH2=m" >> $config_host_mak >>> echo "LIBSSH2_CFLAGS=$libssh2_cflags" >> $config_host_mak >>> -- >>> 1.7.10.4 >>> >>> >