All of lore.kernel.org
 help / color / mirror / Atom feed
From: Stefan Hajnoczi <stefanha@gmail.com>
To: Chrysostomos Nanakos <cnanakos@grnet.gr>
Cc: kwolf@redhat.com, qemu-devel@nongnu.org, stefanha@redhat.com
Subject: Re: [Qemu-devel] [PATCH v4 1/3] block: Support Archipelago as a QEMU block backend
Date: Fri, 20 Jun 2014 22:33:46 +0800	[thread overview]
Message-ID: <20140620143346.GG11029@stefanha-thinkpad.redhat.com> (raw)
In-Reply-To: <1403189328-18457-2-git-send-email-cnanakos@grnet.gr>

[-- Attachment #1: Type: text/plain, Size: 15631 bytes --]

On Thu, Jun 19, 2014 at 05:48:46PM +0300, Chrysostomos Nanakos wrote:
> +typedef struct BDRVArchipelagoState {
> +    int fds[2];
> +    int qemu_aio_count;

This field is never used.  It's increment and decremented but nothing
ever checks the value.  It can be dropped.

> +    int event_reader_pos;
> +    ArchipelagoAIOCB *event_acb;
> +    const char *volname;
> +    uint64_t size;
> +    /* Archipelago specific */
> +    struct xseg *xseg;
> +    struct xseg_port *port;
> +    xport srcport;
> +    xport sport;
> +    xport mportno;
> +    xport vportno;
> +    QemuMutex archip_mutex;
> +    QemuCond archip_cond;
> +    bool is_signaled;
> +    /* Request handler specific */
> +    QemuThread request_th;
> +    QemuCond request_cond;
> +    QemuMutex request_mutex;
> +    bool th_is_signaled;
> +    bool stopping;
> +} BDRVArchipelagoState;
> +
> +typedef struct ArchipelagoSegmentedRequest {
> +    size_t count;
> +    size_t total;
> +    int ref;
> +    int failed;
> +} ArchipelagoSegmentedRequest;
> +
> +typedef struct AIORequestData {
> +    const char *volname;
> +    off_t offset;
> +    size_t size;
> +    uint64_t bufidx;
> +    int ret;
> +    int op;
> +    ArchipelagoAIOCB *aio_cb;
> +    ArchipelagoSegmentedRequest *segreq;
> +} AIORequestData;
> +
> +
> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb);
> +
> +static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
> +{
> +    if (xseg && (sport != srcport)) {
> +        xseg_init_local_signal(xseg, srcport);
> +        sport = srcport;
> +    }
> +}

QEMU should clean up by calling xseg_quit_local_signal().

> +
> +static void archipelago_finish_aiocb(AIORequestData *reqdata)
> +{
> +    int ret;
> +    ret = qemu_archipelago_signal_pipe(reqdata->aio_cb);
> +    if (ret < 0) {
> +        error_report("archipelago_finish_aiocb(): failed writing"
> +                     " aio_cb->s->fds");
> +    }
> +    g_free(reqdata);
> +}
> +
> +static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
> +                      struct xseg_request *expected_req)
> +{
> +    struct xseg_request *req;
> +    xseg_prepare_wait(xseg, srcport);
> +    void *psd = xseg_get_signal_desc(xseg, port);
> +    while (1) {
> +        req = xseg_receive(xseg, srcport, 0);
> +        if (req) {
> +            if (req != expected_req) {
> +                archipelagolog("Unknown received request\n");
> +                xseg_put_request(xseg, req, srcport);
> +            } else if (!(req->state & XS_SERVED)) {
> +                archipelagolog("Failed req\n");
> +                return -1;
> +            } else {
> +                break;
> +            }
> +        }
> +        xseg_wait_signal(xseg, psd, 100000UL);
> +    }
> +    xseg_cancel_wait(xseg, srcport);
> +    return 0;
> +}
> +
> +static void xseg_request_handler(void *state)
> +{

This thread is only necessary because you're not integrating xseg into
the QEMU event loop.  If you got the pipe fds from xseg and used
aio_set_fd_handler() you could eliminate this thread.  The advantage is
that you can skip the archipelago_finish_aiocb() and get slightly better
performance due to one less context switch between threads.

> +    BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
> +    void *psd = xseg_get_signal_desc(s->xseg, s->port);
> +    qemu_mutex_lock(&s->request_mutex);
> +
> +    while (!s->stopping) {
> +        struct xseg_request *req;
> +        char *data;
> +        xseg_prepare_wait(s->xseg, s->srcport);
> +        req = xseg_receive(s->xseg, s->srcport, 0);
> +        if (req) {
> +            AIORequestData *reqdata;
> +            ArchipelagoSegmentedRequest *segreq;
> +            xseg_get_req_data(s->xseg, req, (void **)&reqdata);
> +
> +            if (!(req->state & XS_SERVED)) {
> +                    segreq = reqdata->segreq;
> +                    __sync_bool_compare_and_swap(&segreq->failed, 0, 1);
> +            }
> +
> +            switch (reqdata->op) {
> +            case ARCHIP_OP_READ:
> +                    data = xseg_get_data(s->xseg, req);
> +                    segreq = reqdata->segreq;
> +                    segreq->count += req->serviced;
> +
> +                    qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
> +                            data,
> +                            req->serviced);
> +
> +                    xseg_put_request(s->xseg, req, s->srcport);
> +
> +                    __sync_add_and_fetch(&segreq->ref, -1);
> +
> +                    if (segreq->ref == 0) {

Not sure about the value of __sync_add_and_fetch() since the if
statement fetches segreq->ref again.  But I'm not reviewing the details
of the shared memory accesses.  I'm assuming this stuff is correct,
secure, etc.

> +                        if (!segreq->failed) {
> +                            reqdata->aio_cb->ret = segreq->count;
> +                            archipelago_finish_aiocb(reqdata);
> +                        }

What does segreq->failed mean?  We should always finish the I/O request,
otherwise the upper layers will run out of resources as we leak
failed requests.

> +static void parse_filename_opts(const char *filename, Error **errp,
> +                                char **volume, xport *mport, xport *vport)
> +{
> +    const char *start;
> +    char *tokens[3], *ds;
> +    int idx;
> +    xport lmport = NoPort, lvport = NoPort;
> +
> +    strstart(filename, "archipelago:", &start);
> +
> +    ds = g_strdup(start);
> +    tokens[0] = strtok(ds, "/");
> +    tokens[1] = strtok(NULL, ":");
> +    tokens[2] = strtok(NULL, "\0");
> +
> +    if (!strlen(tokens[0])) {
> +        error_setg(errp, "volume name must be specified first");
> +        return;

ds is leaked.

> +    }
> +
> +    for (idx = 1; idx < 3; idx++) {
> +        if (tokens[idx] != NULL) {
> +            if (strstart(tokens[idx], "mport=", NULL)) {
> +                xseg_find_port(tokens[idx], "mport=", &lmport);
> +            }
> +            if (strstart(tokens[idx], "vport=", NULL)) {
> +                xseg_find_port(tokens[idx], "vport=", &lvport);
> +            }
> +        }
> +    }
> +
> +    if ((lmport == (xport) -2) || (lvport == (xport) -2)) {
> +        error_setg(errp, "Usage: file=archipelago:"
> +                   "<volumename>[/mport=<mapperd_port>"
> +                   "[:vport=<vlmcd_port>]]");

ds is leaked.

> +        return;
> +    }
> +    *volume = g_strdup(tokens[0]);
> +    *mport = lmport;
> +    *vport = lvport;
> +    g_free(ds);
> +}
> +
> +static void archipelago_parse_filename(const char *filename, QDict *options,
> +                                       Error **errp)
> +{
> +    const char *start;
> +    char *volume = NULL;
> +    xport mport = NoPort, vport = NoPort;
> +
> +    if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
> +            || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
> +            || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
> +        error_setg(errp, "volume/mport/vport and a file name may not be "
> +                         "specified at the same time");
> +        return;
> +    }
> +
> +    if (!strstart(filename, "archipelago:", &start)) {
> +        error_setg(errp, "File name must start with 'archipelago:'");
> +        return;
> +    }
> +
> +    if (!strlen(start) || strstart(start, "/", NULL)) {
> +        error_setg(errp, "volume name must be specified");
> +        return;
> +    }
> +
> +    parse_filename_opts(filename, errp, &volume, &mport, &vport);
> +
> +    if (volume) {
> +        qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
> +        g_free(volume);
> +    }
> +    if (mport != NoPort) {
> +        qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
> +    }
> +    if (vport != NoPort) {
> +        qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
> +    }
> +}
> +
> +static QemuOptsList archipelago_runtime_opts = {
> +    .name = "archipelago",
> +    .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
> +    .desc = {
> +        {
> +            .name = ARCHIPELAGO_OPT_VOLUME,
> +            .type = QEMU_OPT_STRING,
> +            .help = "Name of the volume image",
> +        },
> +        {
> +            .name = ARCHIPELAGO_OPT_MPORT,
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "Archipelago mapperd port number"
> +        },
> +        {
> +            .name = ARCHIPELAGO_OPT_VPORT,
> +            .type = QEMU_OPT_NUMBER,
> +            .help = "Archipelago vlmcd port number"
> +
> +        },
> +        { /* end of list */ }
> +    },
> +};
> +
> +static int qemu_archipelago_open(BlockDriverState *bs,
> +                                 QDict *options,
> +                                 int bdrv_flags,
> +                                 Error **errp)
> +{
> +    int ret = 0;
> +    const char *volume;
> +    QemuOpts *opts;
> +    Error *local_err = NULL;
> +    BDRVArchipelagoState *s = bs->opaque;
> +
> +    opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
> +    qemu_opts_absorb_qdict(opts, options, &local_err);
> +    if (local_err) {
> +        error_propagate(errp, local_err);
> +        qemu_opts_del(opts);
> +        return -EINVAL;
> +    }
> +
> +    s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001);
> +    s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501);
> +
> +    volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
> +    if (volume == NULL) {
> +        error_setg(errp, "archipelago block driver requires an 'volume'"
> +                   " options");

"archipelago block driver requires the 'volume' option"

> +        error_propagate(errp, local_err);

This line is unnecessary since the error message was already put into
errp.

> +        qemu_opts_del(opts);
> +        return -EINVAL;
> +    }
> +    s->volname = g_strdup(volume);
> +
> +    /* Initialize XSEG, join shared memory segment */
> +    ret = qemu_archipelago_init(s);
> +    if (ret < 0) {
> +        error_setg(errp, "cannot initialize XSEG and join shared "
> +                   "memory segment");
> +        goto err_exit;
> +    }
> +
> +    s->event_reader_pos = 0;
> +    ret = qemu_pipe(s->fds);
> +    if (ret < 0) {
> +        error_setg(errp, "cannot create pipe");
> +        goto err_exit;

Do we need to xseg_leave() to avoid leaking xseg refcounts, leaving
memory mapped, and memory leaks?

> +    }
> +
> +    fcntl(s->fds[ARCHIP_FD_READ], F_SETFL, O_NONBLOCK);
> +    fcntl(s->fds[ARCHIP_FD_WRITE], F_SETFL, O_NONBLOCK);
> +    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ],
> +                            qemu_archipelago_aio_event_reader, NULL,
> +                            s);
> +
> +    qemu_opts_del(opts);
> +    return 0;
> +
> +err_exit:
> +    qemu_opts_del(opts);
> +    return ret;

s->volname is leaked

> +}
> +
> +static void qemu_archipelago_close(BlockDriverState *bs)
> +{
> +    int r, targetlen;
> +    char *target;
> +    struct xseg_request *req;
> +    BDRVArchipelagoState *s = bs->opaque;
> +
> +    qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], NULL, NULL, NULL);
> +    close(s->fds[0]);
> +    close(s->fds[1]);
> +
> +    s->stopping = true;
> +
> +    qemu_mutex_lock(&s->request_mutex);
> +    while (!s->th_is_signaled) {
> +        qemu_cond_wait(&s->request_cond,
> +                       &s->request_mutex);
> +    }
> +    qemu_mutex_unlock(&s->request_mutex);
> +    qemu_cond_destroy(&s->request_cond);
> +    qemu_mutex_destroy(&s->request_mutex);

It's not safe to qemu_mutex_destroy() because the other thread may still
be inside qemu_mutex_unlock(&s->request_mutex) and may still access
s->request_mutex memory.

Use qemu_thread_join() before destroying request_cond and request_mutex.
That way you can be sure there is no race condition.

(I recently did the same thing and Paolo Bonzini pointed out the bug.
After checking the glibc implementation I was convinced that it's not
safe.)

> +
> +    qemu_cond_destroy(&s->archip_cond);
> +    qemu_mutex_destroy(&s->archip_mutex);
> +
> +    targetlen = strlen(s->volname);
> +    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
> +    if (!req) {
> +        archipelagolog("Cannot get XSEG request\n");
> +        goto err_exit;
> +    }
> +    r = xseg_prep_request(s->xseg, req, targetlen, 0);
> +    if (r < 0) {
> +        xseg_put_request(s->xseg, req, s->srcport);
> +        archipelagolog("Cannot prepare XSEG close request\n");
> +        goto err_exit;
> +    }
> +
> +    target = xseg_get_target(s->xseg, req);
> +    strncpy(target, s->volname, targetlen);

Using strncpy() hints that target is a string when in fact it's not.  I
think memcpy() would be clearer here since you don't want a '\0' byte at
the end of the string.

Or maybe I'm wrong and there is some guarantee that there will be a '\0'
byte after target?

> +    req->size = req->datalen;
> +    req->offset = 0;
> +    req->op = X_CLOSE;
> +
> +    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
> +    if (p == NoPort) {
> +        xseg_put_request(s->xseg, req, s->srcport);
> +        archipelagolog("Cannot submit XSEG close request\n");
> +        goto err_exit;
> +    }
> +
> +    xseg_signal(s->xseg, p);
> +    r = wait_reply(s->xseg, s->srcport, s->port, req);
> +    if (r < 0) {
> +        archipelagolog("wait_reply() error\n");
> +    }
> +    if (!(req->state & XS_SERVED)) {
> +        archipelagolog("Could no close map for volume '%s'\n", s->volname);
> +    }
> +
> +    xseg_put_request(s->xseg, req, s->srcport);
> +
> +err_exit:
> +    xseg_leave_dynport(s->xseg, s->port);
> +    xseg_leave(s->xseg);

s->volname is leaked.

> +}
> +
> +static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +    ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
> +    aio_cb->cancelled = true;
> +    while (aio_cb->status == -EINPROGRESS) {
> +        qemu_aio_wait();
> +    }
> +    qemu_aio_release(aio_cb);
> +}
> +
> +static const AIOCBInfo archipelago_aiocb_info = {
> +    .aiocb_size = sizeof(ArchipelagoAIOCB),
> +    .cancel = qemu_archipelago_aio_cancel,
> +};
> +
> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb)
> +{
> +    int ret = 0;
> +    while (1) {
> +        fd_set wfd;
> +        int fd = aio_cb->s->fds[1];
> +
> +        ret = write(fd, (void *)&aio_cb, sizeof(aio_cb));
> +        if (ret > 0) {
> +            break;
> +        }
> +        if (errno == EINTR) {
> +            continue;
> +        }
> +        if (errno != EAGAIN) {
> +            break;
> +        }
> +        FD_ZERO(&wfd);
> +        FD_SET(fd, &wfd);
> +        do {
> +            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
> +        } while (ret < 0 && errno == EINTR);
> +    }
> +    return ret;
> +}

A newer signalling approach is available and will let you drop the pipe
code.  QEMUBH is a "bottom half" or deferred function call that can be
scheduled in an event loop.  Scheduling the the QEMUBH is thread-safe so
you can perform it from any thread.

See block/gluster.c:gluster_finish_aiocb() for an example using QEMUBH.

> +static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
> +{
> +    uint64_t size;
> +    int ret, targetlen;
> +    struct xseg_request *req;
> +    struct xseg_reply_info *xinfo;
> +    AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
> +
> +    if (!reqdata) {
> +        archipelagolog("Cannot allocate reqdata\n");
> +        return -1;

g_malloc() never returns NULL, this if statement can be dropped.

[-- Attachment #2: Type: application/pgp-signature, Size: 473 bytes --]

  reply	other threads:[~2014-06-20 14:34 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2014-06-19 14:48 [Qemu-devel] [PATCH v4 0/3] Support Archipelago as a QEMU block backend Chrysostomos Nanakos
2014-06-19 14:48 ` [Qemu-devel] [PATCH v4 1/3] block: " Chrysostomos Nanakos
2014-06-20 14:33   ` Stefan Hajnoczi [this message]
2014-06-23  8:17     ` Chrysostomos Nanakos
2014-06-23  8:31       ` Stefan Hajnoczi
2014-06-19 14:48 ` [Qemu-devel] [PATCH v4 2/3] block/archipelago: Add support for creating images Chrysostomos Nanakos
2014-06-20 14:35   ` Stefan Hajnoczi
2014-06-19 14:48 ` [Qemu-devel] [PATCH v4 3/3] QMP: Add support for Archipelago Chrysostomos Nanakos

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20140620143346.GG11029@stefanha-thinkpad.redhat.com \
    --to=stefanha@gmail.com \
    --cc=cnanakos@grnet.gr \
    --cc=kwolf@redhat.com \
    --cc=qemu-devel@nongnu.org \
    --cc=stefanha@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.