From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from eggs.gnu.org ([2001:4830:134:3::10]:40772) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1WyzSX-00089i-KD for qemu-devel@nongnu.org; Mon, 23 Jun 2014 04:18:46 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1WyzSO-0003uc-Pt for qemu-devel@nongnu.org; Mon, 23 Jun 2014 04:18:37 -0400 Received: from averel.grnet-hq.admin.grnet.gr ([195.251.29.3]:40626) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1WyzSO-0003X9-8H for qemu-devel@nongnu.org; Mon, 23 Jun 2014 04:18:28 -0400 Message-ID: <53A7E28C.9060107@grnet.gr> Date: Mon, 23 Jun 2014 11:17:16 +0300 From: Chrysostomos Nanakos MIME-Version: 1.0 References: <1403189328-18457-1-git-send-email-cnanakos@grnet.gr> <1403189328-18457-2-git-send-email-cnanakos@grnet.gr> <20140620143346.GG11029@stefanha-thinkpad.redhat.com> In-Reply-To: <20140620143346.GG11029@stefanha-thinkpad.redhat.com> Content-Type: text/plain; charset=ISO-8859-1; format=flowed Content-Transfer-Encoding: 7bit Subject: Re: [Qemu-devel] [PATCH v4 1/3] block: Support Archipelago as a QEMU block backend List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Stefan Hajnoczi Cc: kwolf@redhat.com, qemu-devel@nongnu.org, stefanha@redhat.com On 06/20/2014 05:33 PM, Stefan Hajnoczi wrote: > On Thu, Jun 19, 2014 at 05:48:46PM +0300, Chrysostomos Nanakos wrote: >> +typedef struct BDRVArchipelagoState { >> + int fds[2]; >> + int qemu_aio_count; > This field is never used. It's increment and decremented but nothing > ever checks the value. It can be dropped. 
> >> + int event_reader_pos; >> + ArchipelagoAIOCB *event_acb; >> + const char *volname; >> + uint64_t size; >> + /* Archipelago specific */ >> + struct xseg *xseg; >> + struct xseg_port *port; >> + xport srcport; >> + xport sport; >> + xport mportno; >> + xport vportno; >> + QemuMutex archip_mutex; >> + QemuCond archip_cond; >> + bool is_signaled; >> + /* Request handler specific */ >> + QemuThread request_th; >> + QemuCond request_cond; >> + QemuMutex request_mutex; >> + bool th_is_signaled; >> + bool stopping; >> +} BDRVArchipelagoState; >> + >> +typedef struct ArchipelagoSegmentedRequest { >> + size_t count; >> + size_t total; >> + int ref; >> + int failed; >> +} ArchipelagoSegmentedRequest; >> + >> +typedef struct AIORequestData { >> + const char *volname; >> + off_t offset; >> + size_t size; >> + uint64_t bufidx; >> + int ret; >> + int op; >> + ArchipelagoAIOCB *aio_cb; >> + ArchipelagoSegmentedRequest *segreq; >> +} AIORequestData; >> + >> + >> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb); >> + >> +static void init_local_signal(struct xseg *xseg, xport sport, xport srcport) >> +{ >> + if (xseg && (sport != srcport)) { >> + xseg_init_local_signal(xseg, srcport); >> + sport = srcport; >> + } >> +} > QEMU should clean up by calling xseg_quit_local_signal(). 
> >> + >> +static void archipelago_finish_aiocb(AIORequestData *reqdata) >> +{ >> + int ret; >> + ret = qemu_archipelago_signal_pipe(reqdata->aio_cb); >> + if (ret < 0) { >> + error_report("archipelago_finish_aiocb(): failed writing" >> + " aio_cb->s->fds"); >> + } >> + g_free(reqdata); >> +} >> + >> +static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port, >> + struct xseg_request *expected_req) >> +{ >> + struct xseg_request *req; >> + xseg_prepare_wait(xseg, srcport); >> + void *psd = xseg_get_signal_desc(xseg, port); >> + while (1) { >> + req = xseg_receive(xseg, srcport, 0); >> + if (req) { >> + if (req != expected_req) { >> + archipelagolog("Unknown received request\n"); >> + xseg_put_request(xseg, req, srcport); >> + } else if (!(req->state & XS_SERVED)) { >> + archipelagolog("Failed req\n"); >> + return -1; >> + } else { >> + break; >> + } >> + } >> + xseg_wait_signal(xseg, psd, 100000UL); >> + } >> + xseg_cancel_wait(xseg, srcport); >> + return 0; >> +} >> + >> +static void xseg_request_handler(void *state) >> +{ > This thread is only necessary because you're not integrating xseg into > the QEMU event loop. If you got the pipe fds from xseg and used > aio_set_fd_handler() you could eliminate this thread. The advantage is > that you can skip the archipelago_finish_aiocb() and get slightly better > performance due to one less context switch between threads. 
> >> + BDRVArchipelagoState *s = (BDRVArchipelagoState *) state; >> + void *psd = xseg_get_signal_desc(s->xseg, s->port); >> + qemu_mutex_lock(&s->request_mutex); >> + >> + while (!s->stopping) { >> + struct xseg_request *req; >> + char *data; >> + xseg_prepare_wait(s->xseg, s->srcport); >> + req = xseg_receive(s->xseg, s->srcport, 0); >> + if (req) { >> + AIORequestData *reqdata; >> + ArchipelagoSegmentedRequest *segreq; >> + xseg_get_req_data(s->xseg, req, (void **)&reqdata); >> + >> + if (!(req->state & XS_SERVED)) { >> + segreq = reqdata->segreq; >> + __sync_bool_compare_and_swap(&segreq->failed, 0, 1); >> + } >> + >> + switch (reqdata->op) { >> + case ARCHIP_OP_READ: >> + data = xseg_get_data(s->xseg, req); >> + segreq = reqdata->segreq; >> + segreq->count += req->serviced; >> + >> + qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx, >> + data, >> + req->serviced); >> + >> + xseg_put_request(s->xseg, req, s->srcport); >> + >> + __sync_add_and_fetch(&segreq->ref, -1); >> + >> + if (segreq->ref == 0) { > Not sure about the value of __sync_add_and_fetch() since the if > statement fetches segreq->ref again. But I'm not reviewing the details > of the shared memory accesses. I'm assuming this stuff is correct, > secure, etc. Yes you are right, IMHO a better and safer approach is: if ((__sync_add_and_fetch(&segreq->ref, -1)) == 0) { ... > >> + if (!segreq->failed) { >> + reqdata->aio_cb->ret = segreq->count; >> + archipelago_finish_aiocb(reqdata); >> + } > What does segreq->failed mean? We should always finish the I/O request, > otherwise the upper layers will run out of resources as we leak > failed requests. Yes you are right. If a request fails while submitting it to Archipelago archipelago_aio_segmented_rw() will return -EIO to qemu_archipelago_aio_rw() which will return NULL to .bdrv_aio_readv/_write(). 
Now, if all requests have been submitted to Archipelago successfully but one or more of them haven't been fully serviced by Archipelago (partial read/write), archipelago_finish_aiocb() will fail the request. This latter case wasn't implemented in this patch; the v5 series has the appropriate changes. Is this a proper and acceptable approach, along with the removal of the pipe code and the introduction of the QEMU "bottom-half" scheduled in archipelago_finish_aiocb()? > >> +static void parse_filename_opts(const char *filename, Error **errp, >> + char **volume, xport *mport, xport *vport) >> +{ >> + const char *start; >> + char *tokens[3], *ds; >> + int idx; >> + xport lmport = NoPort, lvport = NoPort; >> + >> + strstart(filename, "archipelago:", &start); >> + >> + ds = g_strdup(start); >> + tokens[0] = strtok(ds, "/"); >> + tokens[1] = strtok(NULL, ":"); >> + tokens[2] = strtok(NULL, "\0"); >> + >> + if (!strlen(tokens[0])) { >> + error_setg(errp, "volume name must be specified first"); >> + return; > ds is leaked. > >> + } >> + >> + for (idx = 1; idx < 3; idx++) { >> + if (tokens[idx] != NULL) { >> + if (strstart(tokens[idx], "mport=", NULL)) { >> + xseg_find_port(tokens[idx], "mport=", &lmport); >> + } >> + if (strstart(tokens[idx], "vport=", NULL)) { >> + xseg_find_port(tokens[idx], "vport=", &lvport); >> + } >> + } >> + } >> + >> + if ((lmport == (xport) -2) || (lvport == (xport) -2)) { >> + error_setg(errp, "Usage: file=archipelago:" >> + "[/mport=" >> + "[:vport=]]"); > ds is leaked. 
> >> + return; >> + } >> + *volume = g_strdup(tokens[0]); >> + *mport = lmport; >> + *vport = lvport; >> + g_free(ds); >> +} >> + >> +static void archipelago_parse_filename(const char *filename, QDict *options, >> + Error **errp) >> +{ >> + const char *start; >> + char *volume = NULL; >> + xport mport = NoPort, vport = NoPort; >> + >> + if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME) >> + || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT) >> + || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) { >> + error_setg(errp, "volume/mport/vport and a file name may not be " >> + "specified at the same time"); >> + return; >> + } >> + >> + if (!strstart(filename, "archipelago:", &start)) { >> + error_setg(errp, "File name must start with 'archipelago:'"); >> + return; >> + } >> + >> + if (!strlen(start) || strstart(start, "/", NULL)) { >> + error_setg(errp, "volume name must be specified"); >> + return; >> + } >> + >> + parse_filename_opts(filename, errp, &volume, &mport, &vport); >> + >> + if (volume) { >> + qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume)); >> + g_free(volume); >> + } >> + if (mport != NoPort) { >> + qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport)); >> + } >> + if (vport != NoPort) { >> + qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport)); >> + } >> +} >> + >> +static QemuOptsList archipelago_runtime_opts = { >> + .name = "archipelago", >> + .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head), >> + .desc = { >> + { >> + .name = ARCHIPELAGO_OPT_VOLUME, >> + .type = QEMU_OPT_STRING, >> + .help = "Name of the volume image", >> + }, >> + { >> + .name = ARCHIPELAGO_OPT_MPORT, >> + .type = QEMU_OPT_NUMBER, >> + .help = "Archipelago mapperd port number" >> + }, >> + { >> + .name = ARCHIPELAGO_OPT_VPORT, >> + .type = QEMU_OPT_NUMBER, >> + .help = "Archipelago vlmcd port number" >> + >> + }, >> + { /* end of list */ } >> + }, >> +}; >> + >> +static int qemu_archipelago_open(BlockDriverState *bs, >> + QDict 
*options, >> + int bdrv_flags, >> + Error **errp) >> +{ >> + int ret = 0; >> + const char *volume; >> + QemuOpts *opts; >> + Error *local_err = NULL; >> + BDRVArchipelagoState *s = bs->opaque; >> + >> + opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort); >> + qemu_opts_absorb_qdict(opts, options, &local_err); >> + if (local_err) { >> + error_propagate(errp, local_err); >> + qemu_opts_del(opts); >> + return -EINVAL; >> + } >> + >> + s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001); >> + s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501); >> + >> + volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME); >> + if (volume == NULL) { >> + error_setg(errp, "archipelago block driver requires an 'volume'" >> + " options"); > "archipelago block driver requires the 'volume' option" > >> + error_propagate(errp, local_err); > This line is unnecessary since the error message was already put into > errp. > >> + qemu_opts_del(opts); >> + return -EINVAL; >> + } >> + s->volname = g_strdup(volume); >> + >> + /* Initialize XSEG, join shared memory segment */ >> + ret = qemu_archipelago_init(s); >> + if (ret < 0) { >> + error_setg(errp, "cannot initialize XSEG and join shared " >> + "memory segment"); >> + goto err_exit; >> + } >> + >> + s->event_reader_pos = 0; >> + ret = qemu_pipe(s->fds); >> + if (ret < 0) { >> + error_setg(errp, "cannot create pipe"); >> + goto err_exit; > Do we need to xseg_leave() to avoid leaking xseg refcounts, leaving > memory mapped, and memory leaks? Removed qemu_pipe() call so we do not need to call xseg_leave() anymore. 
> >> + } >> + >> + fcntl(s->fds[ARCHIP_FD_READ], F_SETFL, O_NONBLOCK); >> + fcntl(s->fds[ARCHIP_FD_WRITE], F_SETFL, O_NONBLOCK); >> + qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], >> + qemu_archipelago_aio_event_reader, NULL, >> + s); >> + >> + qemu_opts_del(opts); >> + return 0; >> + >> +err_exit: >> + qemu_opts_del(opts); >> + return ret; > s->volname is leaked > >> +} >> + >> +static void qemu_archipelago_close(BlockDriverState *bs) >> +{ >> + int r, targetlen; >> + char *target; >> + struct xseg_request *req; >> + BDRVArchipelagoState *s = bs->opaque; >> + >> + qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], NULL, NULL, NULL); >> + close(s->fds[0]); >> + close(s->fds[1]); >> + >> + s->stopping = true; >> + >> + qemu_mutex_lock(&s->request_mutex); >> + while (!s->th_is_signaled) { >> + qemu_cond_wait(&s->request_cond, >> + &s->request_mutex); >> + } >> + qemu_mutex_unlock(&s->request_mutex); >> + qemu_cond_destroy(&s->request_cond); >> + qemu_mutex_destroy(&s->request_mutex); > It's not safe to qemu_mutex_destroy() because the other thread may still > be inside qemu_mutex_unlock(&s->request_mutex) and may still access > s->request_mutex memory. > > Use qemu_thread_join() before destroying request_cond and request_mutex. > That way you can be sure there is no race condition. > > (I recently did the same thing and Paolo Bonzini pointed out the bug. > After checking the glibc implementation I was convinced that it's not > safe.) Got it! Paolo was absolutely right! Thanks for sharing! 
>> + >> + qemu_cond_destroy(&s->archip_cond); >> + qemu_mutex_destroy(&s->archip_mutex); >> + >> + targetlen = strlen(s->volname); >> + req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC); >> + if (!req) { >> + archipelagolog("Cannot get XSEG request\n"); >> + goto err_exit; >> + } >> + r = xseg_prep_request(s->xseg, req, targetlen, 0); >> + if (r < 0) { >> + xseg_put_request(s->xseg, req, s->srcport); >> + archipelagolog("Cannot prepare XSEG close request\n"); >> + goto err_exit; >> + } >> + >> + target = xseg_get_target(s->xseg, req); >> + strncpy(target, s->volname, targetlen); > Using strncpy() hints that target is a string when in fact it's not. I > think memcpy() would be clearer here since you don't want a '\0' byte at > the end of the string. > > Or maybe I'm wrong and there is some guarantee that there will be a '\0' > byte after target? No you are not wrong, memcpy() is clearer, fixed for v5 series. >> + req->size = req->datalen; >> + req->offset = 0; >> + req->op = X_CLOSE; >> + >> + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC); >> + if (p == NoPort) { >> + xseg_put_request(s->xseg, req, s->srcport); >> + archipelagolog("Cannot submit XSEG close request\n"); >> + goto err_exit; >> + } >> + >> + xseg_signal(s->xseg, p); >> + r = wait_reply(s->xseg, s->srcport, s->port, req); >> + if (r < 0) { >> + archipelagolog("wait_reply() error\n"); >> + } >> + if (!(req->state & XS_SERVED)) { >> + archipelagolog("Could no close map for volume '%s'\n", s->volname); >> + } >> + >> + xseg_put_request(s->xseg, req, s->srcport); >> + >> +err_exit: >> + xseg_leave_dynport(s->xseg, s->port); >> + xseg_leave(s->xseg); > s->volname is leaked. 
> >> +} >> + >> +static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb) >> +{ >> + ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb; >> + aio_cb->cancelled = true; >> + while (aio_cb->status == -EINPROGRESS) { >> + qemu_aio_wait(); >> + } >> + qemu_aio_release(aio_cb); >> +} >> + >> +static const AIOCBInfo archipelago_aiocb_info = { >> + .aiocb_size = sizeof(ArchipelagoAIOCB), >> + .cancel = qemu_archipelago_aio_cancel, >> +}; >> + >> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb) >> +{ >> + int ret = 0; >> + while (1) { >> + fd_set wfd; >> + int fd = aio_cb->s->fds[1]; >> + >> + ret = write(fd, (void *)&aio_cb, sizeof(aio_cb)); >> + if (ret > 0) { >> + break; >> + } >> + if (errno == EINTR) { >> + continue; >> + } >> + if (errno != EAGAIN) { >> + break; >> + } >> + FD_ZERO(&wfd); >> + FD_SET(fd, &wfd); >> + do { >> + ret = select(fd + 1, NULL, &wfd, NULL, NULL); >> + } while (ret < 0 && errno == EINTR); >> + } >> + return ret; >> +} > A newer signalling approach is available and will let you drop the pipe > code. QEMUBH is a "bottom half" or deferred function call that can be > scheduled in an event loop. Scheduling the QEMUBH is thread-safe so > you can perform it from any thread. > > See block/gluster.c:gluster_finish_aiocb() for an example using QEMUBH. > >> +static int64_t archipelago_volume_info(BDRVArchipelagoState *s) >> +{ >> + uint64_t size; >> + int ret, targetlen; >> + struct xseg_request *req; >> + struct xseg_reply_info *xinfo; >> + AIORequestData *reqdata = g_malloc(sizeof(AIORequestData)); >> + >> + if (!reqdata) { >> + archipelagolog("Cannot allocate reqdata\n"); >> + return -1; > g_malloc() never returns NULL, this if statement can be dropped.