From mboxrd@z Thu Jan 1 00:00:00 1970 Content-Type: multipart/mixed; boundary="===============3426935067545159985==" MIME-Version: 1.0 From: Ming Lin Subject: [SPDK] [RFC PATCH 2/4] support virtio-scsi to reconnect to vhost process when vhost restart Date: Thu, 10 May 2018 20:25:52 -0700 Message-ID: <1526009154-5492-3-git-send-email-minggr@gmail.com> In-Reply-To: 1526009154-5492-1-git-send-email-minggr@gmail.com List-ID: To: spdk@lists.01.org --===============3426935067545159985== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable From: Sheng Qiu 1. The virtio-scsi initiator detects I/O timeouts and checks the liveness of the vhost connection. 2. It reconnects to vhost and resends pending I/O if the current vhost connection is dead. Signed-off-by: Sheng Qiu --- include/spdk/io_channel.h | 20 +++- include/spdk_internal/bdev.h | 10 ++ include/spdk_internal/virtio.h | 13 +++ lib/bdev/bdev.c | 2 +- lib/bdev/virtio/bdev_virtio_scsi.c | 201 +++++++++++++++++++++++++++++++++= ---- lib/util/io_channel.c | 33 +++--- lib/vhost/vhost.c | 1 + lib/virtio/virtio.c | 2 +- lib/virtio/virtio_user.c | 106 +++++++++++++++++++ 9 files changed, 353 insertions(+), 35 deletions(-) diff --git a/include/spdk/io_channel.h b/include/spdk/io_channel.h index 83a26ed..9e37f53 100644 --- a/include/spdk/io_channel.h +++ b/include/spdk/io_channel.h @@ -47,10 +47,10 @@ extern "C" { #endif = struct spdk_thread; -struct spdk_io_channel; struct spdk_io_channel_iter; struct spdk_poller; = + typedef void (*spdk_thread_fn)(void *ctx); typedef void (*spdk_thread_pass_msg)(spdk_thread_fn fn, void *ctx, void *thread_ctx); @@ -70,6 +70,21 @@ typedef void (*spdk_io_device_unregister_cb)(void *io_de= vice); typedef void (*spdk_channel_msg)(struct spdk_io_channel_iter *i); typedef void (*spdk_channel_for_each_cpl)(struct spdk_io_channel_iter *i, = int status); = +struct spdk_io_channel { + struct spdk_thread *thread; + struct io_device *dev; + uint32_t ref; + TAILQ_ENTRY(spdk_io_channel)
tailq; + spdk_io_channel_destroy_cb destroy_cb; + + /* + * Modules will allocate extra memory off the end of this structure + * to store references to hardware-specific references (i.e. NVMe queue + * pairs, or references to child device spdk_io_channels (i.e. + * virtual bdevs). + */ +}; + /** * \brief Initializes the calling thread for I/O channel allocation. * @@ -185,6 +200,9 @@ void spdk_io_device_unregister(void *io_device, spdk_io= _device_unregister_cb unr */ struct spdk_io_channel *spdk_get_io_channel(void *io_device); = +void spdk_get_all_io_channel(void *io_device, struct spdk_io_channel **ch_= array, int size, + int *cnt); + /** * \brief Releases a reference to an I/O channel. This happens asynchronou= sly. * diff --git a/include/spdk_internal/bdev.h b/include/spdk_internal/bdev.h index 2b49819..d12907c 100644 --- a/include/spdk_internal/bdev.h +++ b/include/spdk_internal/bdev.h @@ -374,6 +374,15 @@ struct spdk_bdev_io { /** It may be used by modules to put the bdev_io into its own list. */ TAILQ_ENTRY(spdk_bdev_io) module_link; = + /** outstanding request queue link */ + TAILQ_ENTRY(spdk_bdev_io) outstanding_req_link; + + /** submit tick */ + uint64_t submit_tick; + + /** The I/O channel that this was submitted on. */ + struct spdk_io_channel *io_ch; + /** * Per I/O context for use by the bdev module. */ @@ -415,6 +424,7 @@ void spdk_bdev_io_get_buf(struct spdk_bdev_io *bdev_io,= spdk_bdev_io_get_buf_cb void spdk_bdev_io_complete(struct spdk_bdev_io *bdev_io, enum spdk_bdev_io_status status); = +void spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io); /** * Complete a bdev_io with an NVMe status code. * diff --git a/include/spdk_internal/virtio.h b/include/spdk_internal/virtio.h index 69416d5..e6143e8 100644 --- a/include/spdk_internal/virtio.h +++ b/include/spdk_internal/virtio.h @@ -46,6 +46,7 @@ #include "spdk/json.h" #include "spdk/io_channel.h" #include "spdk/pci_ids.h" +#include "spdk_internal/bdev.h" = /** * The maximum virtqueue size is 2^15. 
Use that value as the end of @@ -295,6 +296,10 @@ int virtio_dev_start(struct virtio_dev *vdev, uint16_t= max_queues, */ void virtio_dev_stop(struct virtio_dev *vdev); = +/* free the allocated queue */ +void +virtio_free_queues(struct virtio_dev *dev); + /** * Destruct a virtio device. Note that it must be in the stopped state. * The virtio_dev should be manually freed afterwards. @@ -453,6 +458,14 @@ int virtio_pci_dev_enumerate(virtio_pci_create_cb enum= _cb, uint16_t pci_device_i int virtio_user_dev_init(struct virtio_dev *vdev, const char *name, const = char *path, uint32_t queue_size); = +/* check vhost aliveness */ +int +virtio_user_check_vhost_alive(struct virtio_dev *vdev); + +/* reconnect vhost */ +void +virtio_user_dev_reconnect(struct virtio_dev *vdev); + /** * Initialize virtio_dev for a given PCI device. * The virtio_dev has to be freed with \c virtio_dev_destruct. diff --git a/lib/bdev/bdev.c b/lib/bdev/bdev.c index c3aa063..3d34e44 100644 --- a/lib/bdev/bdev.c +++ b/lib/bdev/bdev.c @@ -767,7 +767,7 @@ spdk_bdev_put_io(struct spdk_bdev_io *bdev_io) } } = -static void +void spdk_bdev_io_submit(struct spdk_bdev_io *bdev_io) { struct spdk_bdev *bdev =3D bdev_io->bdev; diff --git a/lib/bdev/virtio/bdev_virtio_scsi.c b/lib/bdev/virtio/bdev_virt= io_scsi.c index 94f4835..0602597 100644 --- a/lib/bdev/virtio/bdev_virtio_scsi.c +++ b/lib/bdev/virtio/bdev_virtio_scsi.c @@ -66,6 +66,10 @@ #define VIRTIO_SCSI_EVENTQ 1 #define VIRTIO_SCSI_REQUESTQ 2 = +/* io timeout value in second */ +#define VIRTIO_SCSI_IO_TIMEMOUT (1) + + static int bdev_virtio_initialize(void); static void bdev_virtio_finish(void); = @@ -90,6 +94,9 @@ struct virtio_scsi_dev { = /** Device marked for removal. */ bool removed; + + /** protect reconnect process */ + pthread_mutex_t reconn_mutex; }; = struct virtio_scsi_io_ctx { @@ -165,6 +172,15 @@ struct bdev_virtio_io_channel { = /** Virtio response poller. 
*/ struct spdk_poller *poller; + + /** Outstanding Req in this channel */ + TAILQ_HEAD(virtio_outstanding_req_head, spdk_bdev_io) outstanding_req; + + /** protect channel's status update */ + pthread_mutex_t lock; + + /** whether channel is reconnecting */ + bool reconnecting; }; = /** Module finish in progress */ @@ -268,6 +284,7 @@ virtio_scsi_dev_init(struct virtio_scsi_dev *svdev, uin= t16_t max_queues) TAILQ_INIT(&svdev->luns); svdev->scan_ctx =3D NULL; svdev->removed =3D false; + pthread_mutex_init(&svdev->reconn_mutex, NULL); = spdk_io_device_register(svdev, bdev_virtio_scsi_ch_create_cb, bdev_virtio_scsi_ch_destroy_cb, @@ -445,29 +462,52 @@ bdev_virtio_send_io(struct spdk_io_channel *ch, struc= t spdk_bdev_io *bdev_io) struct bdev_virtio_io_channel *virtio_channel =3D spdk_io_channel_get_ctx= (ch); struct virtqueue *vq =3D virtio_channel->vq; struct virtio_scsi_io_ctx *io_ctx =3D (struct virtio_scsi_io_ctx *)bdev_i= o->driver_ctx; - int rc; + int rc =3D 0; = - rc =3D virtqueue_req_start(vq, bdev_io, bdev_io->u.bdev.iovcnt + 2); - if (rc =3D=3D -ENOMEM) { - spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM); - return; - } else if (rc !=3D 0) { - spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); - return; + /* this lock is per channel, only in reconnecting phase it has contention= */ + pthread_mutex_lock(&virtio_channel->lock); + + /* only submit io to vq if it's not in reconnecting */ + if (!virtio_channel->reconnecting) { + + rc =3D virtqueue_req_start(vq, bdev_io, bdev_io->u.bdev.iovcnt + 2); + if (rc =3D=3D -ENOMEM) { + SPDK_ERRLOG("no memory to submit io\n"); + spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_NOMEM); + pthread_mutex_unlock(&virtio_channel->lock); + return; + } else if (rc !=3D 0) { + SPDK_ERRLOG("io submit failed\n"); + spdk_bdev_io_complete(bdev_io, SPDK_BDEV_IO_STATUS_FAILED); + pthread_mutex_unlock(&virtio_channel->lock); + return; + } + + virtqueue_req_add_iovs(vq, &io_ctx->iov_req, 1, SPDK_VIRTIO_DESC_RO); + if 
(bdev_io->type =3D=3D SPDK_BDEV_IO_TYPE_READ) { + virtqueue_req_add_iovs(vq, &io_ctx->iov_resp, 1, SPDK_VIRTIO_DESC_WR); + virtqueue_req_add_iovs(vq, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, + SPDK_VIRTIO_DESC_WR); + } else { + virtqueue_req_add_iovs(vq, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, + SPDK_VIRTIO_DESC_RO); + virtqueue_req_add_iovs(vq, &io_ctx->iov_resp, 1, SPDK_VIRTIO_DESC_WR); + } + + virtqueue_req_flush(vq); } = - virtqueue_req_add_iovs(vq, &io_ctx->iov_req, 1, SPDK_VIRTIO_DESC_RO); - if (bdev_io->type =3D=3D SPDK_BDEV_IO_TYPE_READ) { - virtqueue_req_add_iovs(vq, &io_ctx->iov_resp, 1, SPDK_VIRTIO_DESC_WR); - virtqueue_req_add_iovs(vq, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, - SPDK_VIRTIO_DESC_WR); - } else { - virtqueue_req_add_iovs(vq, bdev_io->u.bdev.iovs, bdev_io->u.bdev.iovcnt, - SPDK_VIRTIO_DESC_RO); - virtqueue_req_add_iovs(vq, &io_ctx->iov_resp, 1, SPDK_VIRTIO_DESC_WR); + /* add io to pending queue if it's successfully submit or it's in reconne= cting phase */ + if (rc =3D=3D 0) { + bdev_io->submit_tick =3D spdk_get_ticks(); + bdev_io->io_ch =3D ch; + struct spdk_bdev_io *last =3D TAILQ_LAST(&virtio_channel->outstanding_re= q, + virtio_outstanding_req_head); + assert(last !=3D bdev_io); + TAILQ_INSERT_TAIL(&virtio_channel->outstanding_req, bdev_io, outstanding= _req_link); } = - virtqueue_req_flush(vq); + pthread_mutex_unlock(&virtio_channel->lock); } = static void @@ -592,6 +632,7 @@ static int _bdev_virtio_submit_request(struct spdk_io_c= hannel *ch, struct spdk_b default: return -1; } + return 0; } = @@ -700,6 +741,113 @@ bdev_virtio_io_cpl(struct spdk_bdev_io *bdev_io) spdk_bdev_io_complete_scsi_status(bdev_io, io_ctx->resp.status, sk, asc, = ascq); } = +static int +bdev_virtio_check_timeout(struct bdev_virtio_io_channel *ch) +{ + uint64_t t02; + struct spdk_bdev_io *bdev_io; + uint64_t timeout_tick =3D VIRTIO_SCSI_IO_TIMEMOUT * spdk_get_ticks_hz(); + + t02 =3D spdk_get_ticks(); + + bdev_io =3D 
TAILQ_FIRST(&ch->outstanding_req); + if (bdev_io =3D=3D NULL) { + return 0; + } + + /* check the oldest request */ + if (bdev_io->submit_tick + timeout_tick > t02) { + return 0; + } else { + return 1; + } +} + +static void +bdev_virtio_update_all_channel_status(struct virtio_scsi_dev *svdev, bool = is_reconnecting) +{ + struct spdk_io_channel *ch[1024] =3D {NULL}; + int cnt =3D 0, i; + struct bdev_virtio_io_channel *virtio_channel; + + spdk_get_all_io_channel(svdev, ch, 1024, &cnt); + for (i =3D 0; i < cnt; i++) { + virtio_channel =3D spdk_io_channel_get_ctx(ch[i]); + pthread_mutex_lock(&virtio_channel->lock); + virtio_channel->reconnecting =3D is_reconnecting; + pthread_mutex_unlock(&virtio_channel->lock); + } +} + +static void +bdev_virtio_update_all_io_channel_vq(struct virtio_scsi_dev *svdev) +{ + struct spdk_io_channel *ch[1024] =3D {NULL}; + int cnt =3D 0, i; + struct bdev_virtio_io_channel *virtio_channel; + int32_t queue_idx; + + spdk_get_all_io_channel(svdev, ch, 1024, &cnt); + for (i =3D 0; i < cnt; i++) { + virtio_channel =3D spdk_io_channel_get_ctx(ch[i]); + queue_idx =3D virtio_dev_find_and_acquire_queue(&svdev->vdev, VIRTIO_SCS= I_REQUESTQ); + if (queue_idx < 0) { + SPDK_ERRLOG("Couldn't get an unused queue for the io_channel.\n"); + assert(0); + } + + virtio_channel->vq =3D svdev->vdev.vqs[queue_idx]; + virtio_channel->svdev =3D svdev; + } +} + +/* + * reconnect to vhost + * return only if reconnect is successful + * */ +static bool +bdev_virtio_reconnect(struct virtio_scsi_dev *svdev) +{ + bool reconnected =3D false; + /* only one thread can initiate reconnect */ + pthread_mutex_lock(&svdev->reconn_mutex); + if (virtio_user_check_vhost_alive(&svdev->vdev) !=3D 0) { + /* lock all virtio channels */ + bdev_virtio_update_all_channel_status(svdev, true); + virtio_user_dev_reconnect(&svdev->vdev); + /* update all virtio channels pointing to new vqs*/ + bdev_virtio_update_all_io_channel_vq(svdev); + /* unlock all virtio channels */ + 
bdev_virtio_update_all_channel_status(svdev, false); + reconnected =3D true; + } + pthread_mutex_unlock(&svdev->reconn_mutex); + + return reconnected; +} + +/* + * resend pending requests + * */ +static void +bdev_virtio_resend_pending_req(struct virtio_scsi_dev *svdev, struct bdev_= virtio_io_channel *ch) +{ + struct spdk_bdev_io *bdev_io, *tmp; + TAILQ_HEAD(virtio_outstanding_req_head, spdk_bdev_io) requests; + + TAILQ_INIT(&requests); + TAILQ_SWAP(&ch->outstanding_req, &requests, spdk_bdev_io, outstanding_req= _link); + + TAILQ_FOREACH_SAFE(bdev_io, &requests, outstanding_req_link, tmp) { + TAILQ_REMOVE(&requests, (struct spdk_bdev_io *)bdev_io, outstanding_req_= link); + + /* Make sure it's in PENDING status */ + if (bdev_io->status !=3D SPDK_BDEV_IO_STATUS_PENDING) + abort(); + spdk_bdev_io_submit(bdev_io); + } +} + static void bdev_virtio_poll(void *arg) { @@ -730,6 +878,9 @@ bdev_virtio_poll(void *arg) continue; } = + /* remove from outstanding queue */ + TAILQ_REMOVE(&ch->outstanding_req, (struct spdk_bdev_io *)io[i], outstan= ding_req_link); + bdev_virtio_io_cpl(io[i]); } = @@ -750,6 +901,17 @@ bdev_virtio_poll(void *arg) _virtio_scsi_dev_scan_finish(scan_ctx, rc); } } + + return; + } + + if (cnt =3D=3D 0 && bdev_virtio_check_timeout(ch)) { + /* block and reconnect */ + bool reconnected =3D bdev_virtio_reconnect(svdev); + + /* resend all outstanding io */ + if (reconnected) + bdev_virtio_resend_pending_req(svdev, ch); } } = @@ -913,6 +1075,9 @@ bdev_virtio_scsi_ch_create_cb(void *io_device, void *c= tx_buf) = ch->svdev =3D svdev; ch->vq =3D vq; + ch->reconnecting =3D false; + pthread_mutex_init(&ch->lock, NULL); + TAILQ_INIT(&ch->outstanding_req); = ch->poller =3D spdk_poller_register(bdev_virtio_poll, ch, 0); = diff --git a/lib/util/io_channel.c b/lib/util/io_channel.c index e1ac2c2..0e803c5 100644 --- a/lib/util/io_channel.c +++ b/lib/util/io_channel.c @@ -60,20 +60,6 @@ struct io_device { = static TAILQ_HEAD(, io_device) g_io_devices =3D 
TAILQ_HEAD_INITIALIZER(g_i= o_devices); = -struct spdk_io_channel { - struct spdk_thread *thread; - struct io_device *dev; - uint32_t ref; - TAILQ_ENTRY(spdk_io_channel) tailq; - spdk_io_channel_destroy_cb destroy_cb; - - /* - * Modules will allocate extra memory off the end of this structure - * to store references to hardware-specific references (i.e. NVMe queue - * pairs, or references to child device spdk_io_channels (i.e. - * virtual bdevs). - */ -}; = struct spdk_thread { pthread_t thread_id; @@ -396,6 +382,25 @@ spdk_io_device_unregister(void *io_device, spdk_io_dev= ice_unregister_cb unregist _spdk_io_device_attempt_free(dev); } = +void +spdk_get_all_io_channel(void *io_device, struct spdk_io_channel **ch_array= , int size, int *cnt) +{ + struct spdk_thread *thread; + struct spdk_io_channel *ch; + int i =3D 0; + + TAILQ_FOREACH(thread, &g_threads, tailq) { + TAILQ_FOREACH(ch, &thread->io_channels, tailq) { + if (ch->dev->io_device =3D=3D io_device) { + ch_array[i++] =3D ch; + assert(i < size); + } + } + } + + *cnt =3D i; +} + struct spdk_io_channel * spdk_get_io_channel(void *io_device) { diff --git a/lib/vhost/vhost.c b/lib/vhost/vhost.c index bd3473e..79a2a21 100644 --- a/lib/vhost/vhost.c +++ b/lib/vhost/vhost.c @@ -1138,6 +1138,7 @@ destroy_connection(int vid) = /* since pollers are not running it safe not to use spdk_event here */ vdev->vid =3D -1; + vdev->status =3D 0; pthread_mutex_unlock(&g_spdk_vhost_mutex); } = diff --git a/lib/virtio/virtio.c b/lib/virtio/virtio.c index 3e22d9e..f4c0e5d 100644 --- a/lib/virtio/virtio.c +++ b/lib/virtio/virtio.c @@ -233,7 +233,7 @@ fail_q_alloc: return ret; } = -static void +void virtio_free_queues(struct virtio_dev *dev) { uint16_t nr_vq =3D dev->max_queues; diff --git a/lib/virtio/virtio_user.c b/lib/virtio/virtio_user.c index d7010cc..86237a5 100644 --- a/lib/virtio/virtio_user.c +++ b/lib/virtio/virtio_user.c @@ -46,6 +46,10 @@ = #include "spdk_internal/virtio.h" = +/* reconnect interval in second */ +#define 
RECONNECT_INTERVAL (3) + + static int virtio_user_create_queue(struct virtio_dev *vdev, uint32_t queue_sel) { @@ -400,6 +404,108 @@ static const struct virtio_dev_ops virtio_user_ops = =3D { .dump_json_config =3D virtio_user_dump_json_config, }; = + + +int +virtio_user_check_vhost_alive(struct virtio_dev *vdev) +{ + struct virtio_user_dev *dev =3D vdev->ctx; + uint64_t host_max_queues; + int ret; + + /* ignore SIGPIPE, so won't exit on broken pipe */ + signal(SIGPIPE, SIG_IGN); + + /* Hack: use VHOST_USER_GET_QUEUE_NUM to check vhost alive */ + ret =3D dev->ops->send_request(dev, VHOST_USER_GET_QUEUE_NUM, &host_max_q= ueues); + return ret; +} + +static int +vhost_user_check_vhost_socket(struct virtio_user_dev *dev) +{ + int fd; + struct sockaddr_un un; + ssize_t rc; + + fd =3D socket(AF_UNIX, SOCK_STREAM, 0); + if (fd < 0) + return -1; + + memset(&un, 0, sizeof(un)); + un.sun_family =3D AF_UNIX; + + rc =3D snprintf(un.sun_path, sizeof(un.sun_path), "%s", dev->path); + if (rc < 0 || (size_t)rc >=3D sizeof(un.sun_path)) { + close(fd); + return -1; + } + + if (connect(fd, (struct sockaddr *)&un, sizeof(un)) < 0) { + close(fd); + return -1; + } + + close(fd); + return 0; +} + +static int +_virtio_user_del_queue(struct virtio_dev *vdev, uint32_t queue_sel) +{ + struct virtio_user_dev *dev =3D vdev->ctx; + close(dev->callfds[queue_sel]); + close(dev->kickfds[queue_sel]); + dev->callfds[queue_sel] =3D -1; + dev->kickfds[queue_sel] =3D -1; + return 0; +} + +static int +virtio_user_dev_reconnect_impl(struct virtio_dev *vdev, const char *name, = const char *path, + uint32_t queue_size) +{ + virtio_user_queue_setup(vdev, _virtio_user_del_queue); + virtio_free_queues(vdev); + virtio_dev_destruct(vdev); + + virtio_user_dev_init(vdev, name, path, queue_size); + virtio_dev_start(vdev, vdev->max_queues - vdev->fixed_queues_num, vdev->f= ixed_queues_num); + return 0; +} + +void +virtio_user_dev_reconnect(struct virtio_dev *vdev) +{ + bool alive =3D false; + + while (1) { + struct 
virtio_user_dev *dev =3D vdev->ctx; + + /* check vhost liveness before reconnect */ + if (!alive) + alive =3D virtio_user_check_vhost_alive(vdev) =3D=3D 0; + + if (!alive) { + if (vhost_user_check_vhost_socket(dev) =3D=3D 0) { + SPDK_NOTICELOG("Reconnecting: start reconnect %s\n", dev->path); + char path[PATH_MAX]; + memcpy(path, dev->path, PATH_MAX); + virtio_user_dev_reconnect_impl(vdev, vdev->name, path, dev->queue_size= ); + + sleep(RECONNECT_INTERVAL); + } else { + /* vhost socket not ready, wait some time*/ + sleep(RECONNECT_INTERVAL); + } + } else { + break; + } + } + + SPDK_NOTICELOG("Reconnecting: done\n"); +} + int virtio_user_dev_init(struct virtio_dev *vdev, const char *name, const char= *path, uint32_t queue_size) -- = 1.9.1 --===============3426935067545159985==--