qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
From: Chrysostomos Nanakos <cnanakos@grnet.gr>
To: Stefan Hajnoczi <stefanha@gmail.com>
Cc: kwolf@redhat.com, qemu-devel@nongnu.org, stefanha@redhat.com
Subject: Re: [Qemu-devel] [PATCH v4 1/3] block: Support Archipelago as a QEMU block backend
Date: Mon, 23 Jun 2014 11:17:16 +0300	[thread overview]
Message-ID: <53A7E28C.9060107@grnet.gr> (raw)
In-Reply-To: <20140620143346.GG11029@stefanha-thinkpad.redhat.com>

On 06/20/2014 05:33 PM, Stefan Hajnoczi wrote:
> On Thu, Jun 19, 2014 at 05:48:46PM +0300, Chrysostomos Nanakos wrote:
>> +typedef struct BDRVArchipelagoState {
>> +    int fds[2];
>> +    int qemu_aio_count;
> This field is never used.  It's incremented and decremented but nothing
> ever checks the value.  It can be dropped.
>
>> +    int event_reader_pos;
>> +    ArchipelagoAIOCB *event_acb;
>> +    const char *volname;
>> +    uint64_t size;
>> +    /* Archipelago specific */
>> +    struct xseg *xseg;
>> +    struct xseg_port *port;
>> +    xport srcport;
>> +    xport sport;
>> +    xport mportno;
>> +    xport vportno;
>> +    QemuMutex archip_mutex;
>> +    QemuCond archip_cond;
>> +    bool is_signaled;
>> +    /* Request handler specific */
>> +    QemuThread request_th;
>> +    QemuCond request_cond;
>> +    QemuMutex request_mutex;
>> +    bool th_is_signaled;
>> +    bool stopping;
>> +} BDRVArchipelagoState;
>> +
>> +typedef struct ArchipelagoSegmentedRequest {
>> +    size_t count;
>> +    size_t total;
>> +    int ref;
>> +    int failed;
>> +} ArchipelagoSegmentedRequest;
>> +
>> +typedef struct AIORequestData {
>> +    const char *volname;
>> +    off_t offset;
>> +    size_t size;
>> +    uint64_t bufidx;
>> +    int ret;
>> +    int op;
>> +    ArchipelagoAIOCB *aio_cb;
>> +    ArchipelagoSegmentedRequest *segreq;
>> +} AIORequestData;
>> +
>> +
>> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb);
>> +
>> +static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
>> +{
>> +    if (xseg && (sport != srcport)) {
>> +        xseg_init_local_signal(xseg, srcport);
>> +        sport = srcport;
>> +    }
>> +}
> QEMU should clean up by calling xseg_quit_local_signal().
>
>> +
>> +static void archipelago_finish_aiocb(AIORequestData *reqdata)
>> +{
>> +    int ret;
>> +    ret = qemu_archipelago_signal_pipe(reqdata->aio_cb);
>> +    if (ret < 0) {
>> +        error_report("archipelago_finish_aiocb(): failed writing"
>> +                     " aio_cb->s->fds");
>> +    }
>> +    g_free(reqdata);
>> +}
>> +
>> +static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
>> +                      struct xseg_request *expected_req)
>> +{
>> +    struct xseg_request *req;
>> +    xseg_prepare_wait(xseg, srcport);
>> +    void *psd = xseg_get_signal_desc(xseg, port);
>> +    while (1) {
>> +        req = xseg_receive(xseg, srcport, 0);
>> +        if (req) {
>> +            if (req != expected_req) {
>> +                archipelagolog("Unknown received request\n");
>> +                xseg_put_request(xseg, req, srcport);
>> +            } else if (!(req->state & XS_SERVED)) {
>> +                archipelagolog("Failed req\n");
>> +                return -1;
>> +            } else {
>> +                break;
>> +            }
>> +        }
>> +        xseg_wait_signal(xseg, psd, 100000UL);
>> +    }
>> +    xseg_cancel_wait(xseg, srcport);
>> +    return 0;
>> +}
>> +
>> +static void xseg_request_handler(void *state)
>> +{
> This thread is only necessary because you're not integrating xseg into
> the QEMU event loop.  If you got the pipe fds from xseg and used
> aio_set_fd_handler() you could eliminate this thread.  The advantage is
> that you can skip the archipelago_finish_aiocb() and get slightly better
> performance due to one less context switch between threads.
>
>> +    BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
>> +    void *psd = xseg_get_signal_desc(s->xseg, s->port);
>> +    qemu_mutex_lock(&s->request_mutex);
>> +
>> +    while (!s->stopping) {
>> +        struct xseg_request *req;
>> +        char *data;
>> +        xseg_prepare_wait(s->xseg, s->srcport);
>> +        req = xseg_receive(s->xseg, s->srcport, 0);
>> +        if (req) {
>> +            AIORequestData *reqdata;
>> +            ArchipelagoSegmentedRequest *segreq;
>> +            xseg_get_req_data(s->xseg, req, (void **)&reqdata);
>> +
>> +            if (!(req->state & XS_SERVED)) {
>> +                    segreq = reqdata->segreq;
>> +                    __sync_bool_compare_and_swap(&segreq->failed, 0, 1);
>> +            }
>> +
>> +            switch (reqdata->op) {
>> +            case ARCHIP_OP_READ:
>> +                    data = xseg_get_data(s->xseg, req);
>> +                    segreq = reqdata->segreq;
>> +                    segreq->count += req->serviced;
>> +
>> +                    qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
>> +                            data,
>> +                            req->serviced);
>> +
>> +                    xseg_put_request(s->xseg, req, s->srcport);
>> +
>> +                    __sync_add_and_fetch(&segreq->ref, -1);
>> +
>> +                    if (segreq->ref == 0) {
> Not sure about the value of __sync_add_and_fetch() since the if
> statement fetches segreq->ref again.  But I'm not reviewing the details
> of the shared memory accesses.  I'm assuming this stuff is correct,
> secure, etc.

Yes, you are right. IMHO a better and safer approach is:

if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) {
...

>
>> +                        if (!segreq->failed) {
>> +                            reqdata->aio_cb->ret = segreq->count;
>> +                            archipelago_finish_aiocb(reqdata);
>> +                        }
> What does segreq->failed mean?  We should always finish the I/O request,
> otherwise the upper layers will run out of resources as we leak
> failed requests.

Yes, you are right.
If a request fails while it is being submitted to Archipelago,
archipelago_aio_segmented_rw() will return -EIO to
qemu_archipelago_aio_rw(), which will in turn return NULL to
.bdrv_aio_readv/_write(). If, on the other hand, all requests were
submitted to Archipelago successfully but one or more of them were not
fully serviced by Archipelago (partial read/write),
archipelago_finish_aiocb() will fail the request. This second case wasn't
implemented in this patch; the v5 series has the appropriate changes.

Is this a proper and acceptable approach, together with removing the
pipe code and introducing a QEMU "bottom half" that is scheduled in
archipelago_finish_aiocb()?



>
>> +static void parse_filename_opts(const char *filename, Error **errp,
>> +                                char **volume, xport *mport, xport *vport)
>> +{
>> +    const char *start;
>> +    char *tokens[3], *ds;
>> +    int idx;
>> +    xport lmport = NoPort, lvport = NoPort;
>> +
>> +    strstart(filename, "archipelago:", &start);
>> +
>> +    ds = g_strdup(start);
>> +    tokens[0] = strtok(ds, "/");
>> +    tokens[1] = strtok(NULL, ":");
>> +    tokens[2] = strtok(NULL, "\0");
>> +
>> +    if (!strlen(tokens[0])) {
>> +        error_setg(errp, "volume name must be specified first");
>> +        return;
> ds is leaked.
>
>> +    }
>> +
>> +    for (idx = 1; idx < 3; idx++) {
>> +        if (tokens[idx] != NULL) {
>> +            if (strstart(tokens[idx], "mport=", NULL)) {
>> +                xseg_find_port(tokens[idx], "mport=", &lmport);
>> +            }
>> +            if (strstart(tokens[idx], "vport=", NULL)) {
>> +                xseg_find_port(tokens[idx], "vport=", &lvport);
>> +            }
>> +        }
>> +    }
>> +
>> +    if ((lmport == (xport) -2) || (lvport == (xport) -2)) {
>> +        error_setg(errp, "Usage: file=archipelago:"
>> +                   "<volumename>[/mport=<mapperd_port>"
>> +                   "[:vport=<vlmcd_port>]]");
> ds is leaked.
>
>> +        return;
>> +    }
>> +    *volume = g_strdup(tokens[0]);
>> +    *mport = lmport;
>> +    *vport = lvport;
>> +    g_free(ds);
>> +}
>> +
>> +static void archipelago_parse_filename(const char *filename, QDict *options,
>> +                                       Error **errp)
>> +{
>> +    const char *start;
>> +    char *volume = NULL;
>> +    xport mport = NoPort, vport = NoPort;
>> +
>> +    if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
>> +            || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
>> +            || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
>> +        error_setg(errp, "volume/mport/vport and a file name may not be "
>> +                         "specified at the same time");
>> +        return;
>> +    }
>> +
>> +    if (!strstart(filename, "archipelago:", &start)) {
>> +        error_setg(errp, "File name must start with 'archipelago:'");
>> +        return;
>> +    }
>> +
>> +    if (!strlen(start) || strstart(start, "/", NULL)) {
>> +        error_setg(errp, "volume name must be specified");
>> +        return;
>> +    }
>> +
>> +    parse_filename_opts(filename, errp, &volume, &mport, &vport);
>> +
>> +    if (volume) {
>> +        qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
>> +        g_free(volume);
>> +    }
>> +    if (mport != NoPort) {
>> +        qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
>> +    }
>> +    if (vport != NoPort) {
>> +        qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
>> +    }
>> +}
>> +
>> +static QemuOptsList archipelago_runtime_opts = {
>> +    .name = "archipelago",
>> +    .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
>> +    .desc = {
>> +        {
>> +            .name = ARCHIPELAGO_OPT_VOLUME,
>> +            .type = QEMU_OPT_STRING,
>> +            .help = "Name of the volume image",
>> +        },
>> +        {
>> +            .name = ARCHIPELAGO_OPT_MPORT,
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "Archipelago mapperd port number"
>> +        },
>> +        {
>> +            .name = ARCHIPELAGO_OPT_VPORT,
>> +            .type = QEMU_OPT_NUMBER,
>> +            .help = "Archipelago vlmcd port number"
>> +
>> +        },
>> +        { /* end of list */ }
>> +    },
>> +};
>> +
>> +static int qemu_archipelago_open(BlockDriverState *bs,
>> +                                 QDict *options,
>> +                                 int bdrv_flags,
>> +                                 Error **errp)
>> +{
>> +    int ret = 0;
>> +    const char *volume;
>> +    QemuOpts *opts;
>> +    Error *local_err = NULL;
>> +    BDRVArchipelagoState *s = bs->opaque;
>> +
>> +    opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
>> +    qemu_opts_absorb_qdict(opts, options, &local_err);
>> +    if (local_err) {
>> +        error_propagate(errp, local_err);
>> +        qemu_opts_del(opts);
>> +        return -EINVAL;
>> +    }
>> +
>> +    s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001);
>> +    s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501);
>> +
>> +    volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
>> +    if (volume == NULL) {
>> +        error_setg(errp, "archipelago block driver requires an 'volume'"
>> +                   " options");
> "archipelago block driver requires the 'volume' option"
>
>> +        error_propagate(errp, local_err);
> This line is unnecessary since the error message was already put into
> errp.
>
>> +        qemu_opts_del(opts);
>> +        return -EINVAL;
>> +    }
>> +    s->volname = g_strdup(volume);
>> +
>> +    /* Initialize XSEG, join shared memory segment */
>> +    ret = qemu_archipelago_init(s);
>> +    if (ret < 0) {
>> +        error_setg(errp, "cannot initialize XSEG and join shared "
>> +                   "memory segment");
>> +        goto err_exit;
>> +    }
>> +
>> +    s->event_reader_pos = 0;
>> +    ret = qemu_pipe(s->fds);
>> +    if (ret < 0) {
>> +        error_setg(errp, "cannot create pipe");
>> +        goto err_exit;
> Do we need to xseg_leave() to avoid leaking xseg refcounts, leaving
> memory mapped, and memory leaks?

I removed the qemu_pipe() call, so we no longer need to call xseg_leave() here.

>
>> +    }
>> +
>> +    fcntl(s->fds[ARCHIP_FD_READ], F_SETFL, O_NONBLOCK);
>> +    fcntl(s->fds[ARCHIP_FD_WRITE], F_SETFL, O_NONBLOCK);
>> +    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ],
>> +                            qemu_archipelago_aio_event_reader, NULL,
>> +                            s);
>> +
>> +    qemu_opts_del(opts);
>> +    return 0;
>> +
>> +err_exit:
>> +    qemu_opts_del(opts);
>> +    return ret;
> s->volname is leaked
>
>> +}
>> +
>> +static void qemu_archipelago_close(BlockDriverState *bs)
>> +{
>> +    int r, targetlen;
>> +    char *target;
>> +    struct xseg_request *req;
>> +    BDRVArchipelagoState *s = bs->opaque;
>> +
>> +    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], NULL, NULL, NULL);
>> +    close(s->fds[0]);
>> +    close(s->fds[1]);
>> +
>> +    s->stopping = true;
>> +
>> +    qemu_mutex_lock(&s->request_mutex);
>> +    while (!s->th_is_signaled) {
>> +        qemu_cond_wait(&s->request_cond,
>> +                       &s->request_mutex);
>> +    }
>> +    qemu_mutex_unlock(&s->request_mutex);
>> +    qemu_cond_destroy(&s->request_cond);
>> +    qemu_mutex_destroy(&s->request_mutex);
> It's not safe to qemu_mutex_destroy() because the other thread may still
> be inside qemu_mutex_unlock(&s->request_mutex) and may still access
> s->request_mutex memory.
>
> Use qemu_thread_join() before destroying request_cond and request_mutex.
> That way you can be sure there is no race condition.
>
> (I recently did the same thing and Paolo Bonzini pointed out the bug.
> After checking the glibc implementation I was convinced that it's not
> safe.)

Got it! Paolo was absolutely right! Thanks for sharing!

>> +
>> +    qemu_cond_destroy(&s->archip_cond);
>> +    qemu_mutex_destroy(&s->archip_mutex);
>> +
>> +    targetlen = strlen(s->volname);
>> +    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
>> +    if (!req) {
>> +        archipelagolog("Cannot get XSEG request\n");
>> +        goto err_exit;
>> +    }
>> +    r = xseg_prep_request(s->xseg, req, targetlen, 0);
>> +    if (r < 0) {
>> +        xseg_put_request(s->xseg, req, s->srcport);
>> +        archipelagolog("Cannot prepare XSEG close request\n");
>> +        goto err_exit;
>> +    }
>> +
>> +    target = xseg_get_target(s->xseg, req);
>> +    strncpy(target, s->volname, targetlen);
> Using strncpy() hints that target is a string when in fact it's not.  I
> think memcpy() would be clearer here since you don't want a '\0' byte at
> the end of the string.
>
> Or maybe I'm wrong and there is some guarantee that there will be a '\0'
> byte after target?

No, you are not wrong; memcpy() is clearer. Fixed for the v5 series.

>> +    req->size = req->datalen;
>> +    req->offset = 0;
>> +    req->op = X_CLOSE;
>> +
>> +    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
>> +    if (p == NoPort) {
>> +        xseg_put_request(s->xseg, req, s->srcport);
>> +        archipelagolog("Cannot submit XSEG close request\n");
>> +        goto err_exit;
>> +    }
>> +
>> +    xseg_signal(s->xseg, p);
>> +    r = wait_reply(s->xseg, s->srcport, s->port, req);
>> +    if (r < 0) {
>> +        archipelagolog("wait_reply() error\n");
>> +    }
>> +    if (!(req->state & XS_SERVED)) {
>> +        archipelagolog("Could no close map for volume '%s'\n", s->volname);
>> +    }
>> +
>> +    xseg_put_request(s->xseg, req, s->srcport);
>> +
>> +err_exit:
>> +    xseg_leave_dynport(s->xseg, s->port);
>> +    xseg_leave(s->xseg);
> s->volname is leaked.
>
>> +}
>> +
>> +static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
>> +{
>> +    ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
>> +    aio_cb->cancelled = true;
>> +    while (aio_cb->status == -EINPROGRESS) {
>> +        qemu_aio_wait();
>> +    }
>> +    qemu_aio_release(aio_cb);
>> +}
>> +
>> +static const AIOCBInfo archipelago_aiocb_info = {
>> +    .aiocb_size = sizeof(ArchipelagoAIOCB),
>> +    .cancel = qemu_archipelago_aio_cancel,
>> +};
>> +
>> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb)
>> +{
>> +    int ret = 0;
>> +    while (1) {
>> +        fd_set wfd;
>> +        int fd = aio_cb->s->fds[1];
>> +
>> +        ret = write(fd, (void *)&aio_cb, sizeof(aio_cb));
>> +        if (ret > 0) {
>> +            break;
>> +        }
>> +        if (errno == EINTR) {
>> +            continue;
>> +        }
>> +        if (errno != EAGAIN) {
>> +            break;
>> +        }
>> +        FD_ZERO(&wfd);
>> +        FD_SET(fd, &wfd);
>> +        do {
>> +            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
>> +        } while (ret < 0 && errno == EINTR);
>> +    }
>> +    return ret;
>> +}
> A newer signalling approach is available and will let you drop the pipe
> code.  QEMUBH is a "bottom half" or deferred function call that can be
> scheduled in an event loop.  Scheduling the QEMUBH is thread-safe so
> you can perform it from any thread.

>
> See block/gluster.c:gluster_finish_aiocb() for an example using QEMUBH.
>
>> +static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
>> +{
>> +    uint64_t size;
>> +    int ret, targetlen;
>> +    struct xseg_request *req;
>> +    struct xseg_reply_info *xinfo;
>> +    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
>> +
>> +    if (!reqdata) {
>> +        archipelagolog("Cannot allocate reqdata\n");
>> +        return -1;
> g_malloc() never returns NULL, this if statement can be dropped.

  reply	other threads:[~2014-06-23  8:18 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2014-06-19 14:48 [Qemu-devel] [PATCH v4 0/3] Support Archipelago as a QEMU block backend Chrysostomos Nanakos
2014-06-19 14:48 ` [Qemu-devel] [PATCH v4 1/3] block: " Chrysostomos Nanakos
2014-06-20 14:33   ` Stefan Hajnoczi
2014-06-23  8:17     ` Chrysostomos Nanakos [this message]
2014-06-23  8:31       ` Stefan Hajnoczi
2014-06-19 14:48 ` [Qemu-devel] [PATCH v4 2/3] block/archipelago: Add support for creating images Chrysostomos Nanakos
2014-06-20 14:35   ` Stefan Hajnoczi
2014-06-19 14:48 ` [Qemu-devel] [PATCH v4 3/3] QMP: Add support for Archipelago Chrysostomos Nanakos

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=53A7E28C.9060107@grnet.gr \
    --to=cnanakos@grnet.gr \
    --cc=kwolf@redhat.com \
    --cc=qemu-devel@nongnu.org \
    --cc=stefanha@gmail.com \
    --cc=stefanha@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).