From: Chrysostomos Nanakos <cnanakos@grnet.gr>
To: Stefan Hajnoczi <stefanha@gmail.com>
Cc: kwolf@redhat.com, qemu-devel@nongnu.org, stefanha@redhat.com
Subject: Re: [Qemu-devel] [PATCH v4 1/3] block: Support Archipelago as a QEMU block backend
Date: Mon, 23 Jun 2014 11:17:16 +0300
Message-ID: <53A7E28C.9060107@grnet.gr>
In-Reply-To: <20140620143346.GG11029@stefanha-thinkpad.redhat.com>
On 06/20/2014 05:33 PM, Stefan Hajnoczi wrote:
> On Thu, Jun 19, 2014 at 05:48:46PM +0300, Chrysostomos Nanakos wrote:
>> +typedef struct BDRVArchipelagoState {
>> + int fds[2];
>> + int qemu_aio_count;
> This field is never used. It's incremented and decremented but nothing
> ever checks the value. It can be dropped.
>
>> + int event_reader_pos;
>> + ArchipelagoAIOCB *event_acb;
>> + const char *volname;
>> + uint64_t size;
>> + /* Archipelago specific */
>> + struct xseg *xseg;
>> + struct xseg_port *port;
>> + xport srcport;
>> + xport sport;
>> + xport mportno;
>> + xport vportno;
>> + QemuMutex archip_mutex;
>> + QemuCond archip_cond;
>> + bool is_signaled;
>> + /* Request handler specific */
>> + QemuThread request_th;
>> + QemuCond request_cond;
>> + QemuMutex request_mutex;
>> + bool th_is_signaled;
>> + bool stopping;
>> +} BDRVArchipelagoState;
>> +
>> +typedef struct ArchipelagoSegmentedRequest {
>> + size_t count;
>> + size_t total;
>> + int ref;
>> + int failed;
>> +} ArchipelagoSegmentedRequest;
>> +
>> +typedef struct AIORequestData {
>> + const char *volname;
>> + off_t offset;
>> + size_t size;
>> + uint64_t bufidx;
>> + int ret;
>> + int op;
>> + ArchipelagoAIOCB *aio_cb;
>> + ArchipelagoSegmentedRequest *segreq;
>> +} AIORequestData;
>> +
>> +
>> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb);
>> +
>> +static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
>> +{
>> + if (xseg && (sport != srcport)) {
>> + xseg_init_local_signal(xseg, srcport);
>> + sport = srcport;
>> + }
>> +}
> QEMU should clean up by calling xseg_quit_local_signal().
>
>> +
>> +static void archipelago_finish_aiocb(AIORequestData *reqdata)
>> +{
>> + int ret;
>> + ret = qemu_archipelago_signal_pipe(reqdata->aio_cb);
>> + if (ret < 0) {
>> + error_report("archipelago_finish_aiocb(): failed writing"
>> + " aio_cb->s->fds");
>> + }
>> + g_free(reqdata);
>> +}
>> +
>> +static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
>> + struct xseg_request *expected_req)
>> +{
>> + struct xseg_request *req;
>> + xseg_prepare_wait(xseg, srcport);
>> + void *psd = xseg_get_signal_desc(xseg, port);
>> + while (1) {
>> + req = xseg_receive(xseg, srcport, 0);
>> + if (req) {
>> + if (req != expected_req) {
>> + archipelagolog("Unknown received request\n");
>> + xseg_put_request(xseg, req, srcport);
>> + } else if (!(req->state & XS_SERVED)) {
>> + archipelagolog("Failed req\n");
>> + return -1;
>> + } else {
>> + break;
>> + }
>> + }
>> + xseg_wait_signal(xseg, psd, 100000UL);
>> + }
>> + xseg_cancel_wait(xseg, srcport);
>> + return 0;
>> +}
>> +
>> +static void xseg_request_handler(void *state)
>> +{
> This thread is only necessary because you're not integrating xseg into
> the QEMU event loop. If you got the pipe fds from xseg and used
> aio_set_fd_handler() you could eliminate this thread. The advantage is
> that you can skip the archipelago_finish_aiocb() and get slightly better
> performance due to one less context switch between threads.
>
>> + BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
>> + void *psd = xseg_get_signal_desc(s->xseg, s->port);
>> + qemu_mutex_lock(&s->request_mutex);
>> +
>> + while (!s->stopping) {
>> + struct xseg_request *req;
>> + char *data;
>> + xseg_prepare_wait(s->xseg, s->srcport);
>> + req = xseg_receive(s->xseg, s->srcport, 0);
>> + if (req) {
>> + AIORequestData *reqdata;
>> + ArchipelagoSegmentedRequest *segreq;
>> + xseg_get_req_data(s->xseg, req, (void **)&reqdata);
>> +
>> + if (!(req->state & XS_SERVED)) {
>> + segreq = reqdata->segreq;
>> + __sync_bool_compare_and_swap(&segreq->failed, 0, 1);
>> + }
>> +
>> + switch (reqdata->op) {
>> + case ARCHIP_OP_READ:
>> + data = xseg_get_data(s->xseg, req);
>> + segreq = reqdata->segreq;
>> + segreq->count += req->serviced;
>> +
>> + qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
>> + data,
>> + req->serviced);
>> +
>> + xseg_put_request(s->xseg, req, s->srcport);
>> +
>> + __sync_add_and_fetch(&segreq->ref, -1);
>> +
>> + if (segreq->ref == 0) {
> Not sure about the value of __sync_add_and_fetch() since the if
> statement fetches segreq->ref again. But I'm not reviewing the details
> of the shared memory accesses. I'm assuming this stuff is correct,
> secure, etc.
Yes, you are right. IMHO a better and safer approach is to test the value
returned by the atomic decrement itself:
if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
...
>
>> + if (!segreq->failed) {
>> + reqdata->aio_cb->ret = segreq->count;
>> + archipelago_finish_aiocb(reqdata);
>> + }
> What does segreq->failed mean? We should always finish the I/O request,
> otherwise the upper layers will run out of resources as we leak
> failed requests.
Yes, you are right.
If a request fails while being submitted to Archipelago,
archipelago_aio_segmented_rw() will return -EIO to
qemu_archipelago_aio_rw(), which will return NULL to
.bdrv_aio_readv()/.bdrv_aio_writev(). If all requests to Archipelago were
submitted successfully but one or more of them were not fully serviced
by Archipelago (partial read/write), archipelago_finish_aiocb() will
fail the request. The latter case wasn't implemented in this patch; the v5
series has the appropriate changes.
Is this a proper and acceptable approach, along with the removal of the
pipe code and the introduction of a QEMU "bottom half" scheduled in
archipelago_finish_aiocb()?
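A rough sketch of the completion rule described above, assuming the last segment always finishes the request and reports -EIO if any segment was not served. This is standalone C, not the v5 code: SegReq and segment_complete() are hypothetical names, and the xseg and QEMU calls are left out on purpose.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>

/* Hypothetical, reduced version of ArchipelagoSegmentedRequest. */
typedef struct SegReq {
    size_t count;   /* bytes serviced so far */
    int ref;        /* segments still outstanding */
    int failed;     /* set to 1 once any segment fails */
} SegReq;

/*
 * Account for one finished segment. Returns 0 while other segments are
 * still outstanding. Returns 1 for the last segment and stores the final
 * result in *ret: the byte count on success, -EIO if any segment failed.
 * The request is therefore completed in both cases, never dropped.
 */
static int segment_complete(SegReq *segreq, int served, size_t serviced,
                            ssize_t *ret)
{
    assert(segreq != NULL && ret != NULL);

    if (!served) {
        __sync_bool_compare_and_swap(&segreq->failed, 0, 1);
    } else {
        __sync_add_and_fetch(&segreq->count, serviced);
    }

    if (__sync_add_and_fetch(&segreq->ref, -1) != 0) {
        return 0;
    }

    *ret = segreq->failed ? -EIO : (ssize_t)segreq->count;
    return 1;
}
```

In the driver, the point where this sketch returns 1 is where the aio_cb result would be set and the completion scheduled.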
>
>> +static void parse_filename_opts(const char *filename, Error **errp,
>> + char **volume, xport *mport, xport *vport)
>> +{
>> + const char *start;
>> + char *tokens[3], *ds;
>> + int idx;
>> + xport lmport = NoPort, lvport = NoPort;
>> +
>> + strstart(filename, "archipelago:", &start);
>> +
>> + ds = g_strdup(start);
>> + tokens[0] = strtok(ds, "/");
>> + tokens[1] = strtok(NULL, ":");
>> + tokens[2] = strtok(NULL, "\0");
>> +
>> + if (!strlen(tokens[0])) {
>> + error_setg(errp, "volume name must be specified first");
>> + return;
> ds is leaked.
>
>> + }
>> +
>> + for (idx = 1; idx < 3; idx++) {
>> + if (tokens[idx] != NULL) {
>> + if (strstart(tokens[idx], "mport=", NULL)) {
>> + xseg_find_port(tokens[idx], "mport=", &lmport);
>> + }
>> + if (strstart(tokens[idx], "vport=", NULL)) {
>> + xseg_find_port(tokens[idx], "vport=", &lvport);
>> + }
>> + }
>> + }
>> +
>> + if ((lmport == (xport) -2) || (lvport == (xport) -2)) {
>> + error_setg(errp, "Usage: file=archipelago:"
>> + "<volumename>[/mport=<mapperd_port>"
>> + "[:vport=<vlmcd_port>]]");
> ds is leaked.
>
>> + return;
>> + }
>> + *volume = g_strdup(tokens[0]);
>> + *mport = lmport;
>> + *vport = lvport;
>> + g_free(ds);
>> +}
>> +
>> +static void archipelago_parse_filename(const char *filename, QDict *options,
>> + Error **errp)
>> +{
>> + const char *start;
>> + char *volume = NULL;
>> + xport mport = NoPort, vport = NoPort;
>> +
>> + if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
>> + || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
>> + || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
>> + error_setg(errp, "volume/mport/vport and a file name may not be "
>> + "specified at the same time");
>> + return;
>> + }
>> +
>> + if (!strstart(filename, "archipelago:", &start)) {
>> + error_setg(errp, "File name must start with 'archipelago:'");
>> + return;
>> + }
>> +
>> + if (!strlen(start) || strstart(start, "/", NULL)) {
>> + error_setg(errp, "volume name must be specified");
>> + return;
>> + }
>> +
>> + parse_filename_opts(filename, errp, &volume, &mport, &vport);
>> +
>> + if (volume) {
>> + qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
>> + g_free(volume);
>> + }
>> + if (mport != NoPort) {
>> + qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
>> + }
>> + if (vport != NoPort) {
>> + qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
>> + }
>> +}
>> +
>> +static QemuOptsList archipelago_runtime_opts = {
>> + .name = "archipelago",
>> + .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
>> + .desc = {
>> + {
>> + .name = ARCHIPELAGO_OPT_VOLUME,
>> + .type = QEMU_OPT_STRING,
>> + .help = "Name of the volume image",
>> + },
>> + {
>> + .name = ARCHIPELAGO_OPT_MPORT,
>> + .type = QEMU_OPT_NUMBER,
>> + .help = "Archipelago mapperd port number"
>> + },
>> + {
>> + .name = ARCHIPELAGO_OPT_VPORT,
>> + .type = QEMU_OPT_NUMBER,
>> + .help = "Archipelago vlmcd port number"
>> +
>> + },
>> + { /* end of list */ }
>> + },
>> +};
>> +
>> +static int qemu_archipelago_open(BlockDriverState *bs,
>> + QDict *options,
>> + int bdrv_flags,
>> + Error **errp)
>> +{
>> + int ret = 0;
>> + const char *volume;
>> + QemuOpts *opts;
>> + Error *local_err = NULL;
>> + BDRVArchipelagoState *s = bs->opaque;
>> +
>> + opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
>> + qemu_opts_absorb_qdict(opts, options, &local_err);
>> + if (local_err) {
>> + error_propagate(errp, local_err);
>> + qemu_opts_del(opts);
>> + return -EINVAL;
>> + }
>> +
>> + s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001);
>> + s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501);
>> +
>> + volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
>> + if (volume == NULL) {
>> + error_setg(errp, "archipelago block driver requires an 'volume'"
>> + " options");
> "archipelago block driver requires the 'volume' option"
>
>> + error_propagate(errp, local_err);
> This line is unnecessary since the error message was already put into
> errp.
>
>> + qemu_opts_del(opts);
>> + return -EINVAL;
>> + }
>> + s->volname = g_strdup(volume);
>> +
>> + /* Initialize XSEG, join shared memory segment */
>> + ret = qemu_archipelago_init(s);
>> + if (ret < 0) {
>> + error_setg(errp, "cannot initialize XSEG and join shared "
>> + "memory segment");
>> + goto err_exit;
>> + }
>> +
>> + s->event_reader_pos = 0;
>> + ret = qemu_pipe(s->fds);
>> + if (ret < 0) {
>> + error_setg(errp, "cannot create pipe");
>> + goto err_exit;
> Do we need to xseg_leave() to avoid leaking xseg refcounts, leaving
> memory mapped, and memory leaks?
I removed the qemu_pipe() call, so we no longer need to call xseg_leave() here.
>
>> + }
>> +
>> + fcntl(s->fds[ARCHIP_FD_READ], F_SETFL, O_NONBLOCK);
>> + fcntl(s->fds[ARCHIP_FD_WRITE], F_SETFL, O_NONBLOCK);
>> + qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ],
>> + qemu_archipelago_aio_event_reader, NULL,
>> + s);
>> +
>> + qemu_opts_del(opts);
>> + return 0;
>> +
>> +err_exit:
>> + qemu_opts_del(opts);
>> + return ret;
> s->volname is leaked
>
>> +}
>> +
>> +static void qemu_archipelago_close(BlockDriverState *bs)
>> +{
>> + int r, targetlen;
>> + char *target;
>> + struct xseg_request *req;
>> + BDRVArchipelagoState *s = bs->opaque;
>> +
>> + qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], NULL, NULL, NULL);
>> + close(s->fds[0]);
>> + close(s->fds[1]);
>> +
>> + s->stopping = true;
>> +
>> + qemu_mutex_lock(&s->request_mutex);
>> + while (!s->th_is_signaled) {
>> + qemu_cond_wait(&s->request_cond,
>> + &s->request_mutex);
>> + }
>> + qemu_mutex_unlock(&s->request_mutex);
>> + qemu_cond_destroy(&s->request_cond);
>> + qemu_mutex_destroy(&s->request_mutex);
> It's not safe to qemu_mutex_destroy() because the other thread may still
> be inside qemu_mutex_unlock(&s->request_mutex) and may still access
> s->request_mutex memory.
>
> Use qemu_thread_join() before destroying request_cond and request_mutex.
> That way you can be sure there is no race condition.
>
> (I recently did the same thing and Paolo Bonzini pointed out the bug.
> After checking the glibc implementation I was convinced that it's not
> safe.)
Got it! Paolo was absolutely right! Thanks for sharing!
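To make the ordering concrete, here is a small standalone sketch using plain pthreads instead of the QEMU wrappers. Worker, worker_start() and worker_stop() are hypothetical names for this example. The worker is signalled to stop and joined first; the condition variable and mutex are destroyed only afterwards, when the thread can no longer be inside an unlock call.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical worker state, loosely modelled on the request handler. */
typedef struct Worker {
    pthread_t thread;
    pthread_mutex_t lock;
    pthread_cond_t cond;
    bool stopping;
} Worker;

static void *worker_fn(void *opaque)
{
    Worker *w = opaque;

    pthread_mutex_lock(&w->lock);
    while (!w->stopping) {
        pthread_cond_wait(&w->cond, &w->lock);
    }
    pthread_mutex_unlock(&w->lock);
    return NULL;
}

static int worker_start(Worker *w)
{
    assert(w != NULL);
    w->stopping = false;
    pthread_mutex_init(&w->lock, NULL);
    pthread_cond_init(&w->cond, NULL);
    return pthread_create(&w->thread, NULL, worker_fn, w);
}

static int worker_stop(Worker *w)
{
    int rc;

    pthread_mutex_lock(&w->lock);
    w->stopping = true;
    pthread_cond_signal(&w->cond);
    pthread_mutex_unlock(&w->lock);

    /* Join first: after this returns the thread has fully exited. */
    rc = pthread_join(w->thread, NULL);

    /* Only now is it safe to destroy the synchronisation objects. */
    pthread_cond_destroy(&w->cond);
    pthread_mutex_destroy(&w->lock);
    return rc;
}
```

In the driver, the equivalent would be qemu_thread_join() on request_th before the qemu_cond_destroy()/qemu_mutex_destroy() calls, which assumes the thread was created joinable.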
>> +
>> + qemu_cond_destroy(&s->archip_cond);
>> + qemu_mutex_destroy(&s->archip_mutex);
>> +
>> + targetlen = strlen(s->volname);
>> + req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
>> + if (!req) {
>> + archipelagolog("Cannot get XSEG request\n");
>> + goto err_exit;
>> + }
>> + r = xseg_prep_request(s->xseg, req, targetlen, 0);
>> + if (r < 0) {
>> + xseg_put_request(s->xseg, req, s->srcport);
>> + archipelagolog("Cannot prepare XSEG close request\n");
>> + goto err_exit;
>> + }
>> +
>> + target = xseg_get_target(s->xseg, req);
>> + strncpy(target, s->volname, targetlen);
> Using strncpy() hints that target is a string when in fact it's not. I
> think memcpy() would be clearer here since you don't want a '\0' byte at
> the end of the string.
>
> Or maybe I'm wrong and there is some guarantee that there will be a '\0'
> byte after target?
No, you are not wrong; memcpy() is clearer. Fixed for the v5 series.
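A minimal sketch of the intended copy, assuming the target area holds exactly targetlen bytes with no terminator. copy_target() is a hypothetical helper for illustration; in the driver the destination comes from xseg_get_target().

```c
#include <assert.h>
#include <string.h>

/*
 * Copy the volume name into a length-delimited target buffer.
 * Exactly strlen(volname) bytes are written and no '\0' is appended,
 * which is what memcpy() expresses directly. Returns the number of
 * bytes copied.
 */
static size_t copy_target(char *target, const char *volname)
{
    size_t len;

    assert(target != NULL && volname != NULL);
    len = strlen(volname);
    memcpy(target, volname, len);
    return len;
}
```

The byte following the copied name is left untouched, so the receiver must rely on the length and not on a terminator.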
>> + req->size = req->datalen;
>> + req->offset = 0;
>> + req->op = X_CLOSE;
>> +
>> + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
>> + if (p == NoPort) {
>> + xseg_put_request(s->xseg, req, s->srcport);
>> + archipelagolog("Cannot submit XSEG close request\n");
>> + goto err_exit;
>> + }
>> +
>> + xseg_signal(s->xseg, p);
>> + r = wait_reply(s->xseg, s->srcport, s->port, req);
>> + if (r < 0) {
>> + archipelagolog("wait_reply() error\n");
>> + }
>> + if (!(req->state & XS_SERVED)) {
>> + archipelagolog("Could no close map for volume '%s'\n", s->volname);
>> + }
>> +
>> + xseg_put_request(s->xseg, req, s->srcport);
>> +
>> +err_exit:
>> + xseg_leave_dynport(s->xseg, s->port);
>> + xseg_leave(s->xseg);
> s->volname is leaked.
>
>> +}
>> +
>> +static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
>> +{
>> + ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
>> + aio_cb->cancelled = true;
>> + while (aio_cb->status == -EINPROGRESS) {
>> + qemu_aio_wait();
>> + }
>> + qemu_aio_release(aio_cb);
>> +}
>> +
>> +static const AIOCBInfo archipelago_aiocb_info = {
>> + .aiocb_size = sizeof(ArchipelagoAIOCB),
>> + .cancel = qemu_archipelago_aio_cancel,
>> +};
>> +
>> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb)
>> +{
>> + int ret = 0;
>> + while (1) {
>> + fd_set wfd;
>> + int fd = aio_cb->s->fds[1];
>> +
>> + ret = write(fd, (void *)&aio_cb, sizeof(aio_cb));
>> + if (ret > 0) {
>> + break;
>> + }
>> + if (errno == EINTR) {
>> + continue;
>> + }
>> + if (errno != EAGAIN) {
>> + break;
>> + }
>> + FD_ZERO(&wfd);
>> + FD_SET(fd, &wfd);
>> + do {
>> + ret = select(fd + 1, NULL, &wfd, NULL, NULL);
>> + } while (ret < 0 && errno == EINTR);
>> + }
>> + return ret;
>> +}
> A newer signalling approach is available and will let you drop the pipe
> code. QEMUBH is a "bottom half" or deferred function call that can be
> scheduled in an event loop. Scheduling the QEMUBH is thread-safe so
> you can perform it from any thread.
>
> See block/gluster.c:gluster_finish_aiocb() for an example using QEMUBH.
>
>> +static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
>> +{
>> + uint64_t size;
>> + int ret, targetlen;
>> + struct xseg_request *req;
>> + struct xseg_reply_info *xinfo;
>> + AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
>> +
>> + if (!reqdata) {
>> + archipelagolog("Cannot allocate reqdata\n");
>> + return -1;
> g_malloc() never returns NULL, this if statement can be dropped.