From mboxrd@z Thu Jan 1 00:00:00 1970
Date: Mon, 18 Feb 2019 22:46:40 +0800
From: Wei Xu
Message-ID: <20190218144640.GA28793@wei-ubt>
References: <1550118402-4057-1-git-send-email-wexu@redhat.com>
 <1550118402-4057-8-git-send-email-wexu@redhat.com>
 <7d71fb88-2651-41eb-023e-63a11a83cd38@redhat.com>
In-Reply-To: <7d71fb88-2651-41eb-023e-63a11a83cd38@redhat.com>
Subject: Re: [Qemu-devel] [PATCH v4 07/11] virtio: fill/flush/pop for packed ring
To: Jason Wang
Cc: qemu-devel@nongnu.org, mst@redhat.com, jfreiman@redhat.com,
 maxime.coquelin@redhat.com, tiwei.bie@intel.com

On Mon, Feb 18, 2019 at 03:51:05PM +0800, Jason Wang wrote:
> 
> On 2019/2/14 12:26 PM, wexu@redhat.com wrote:
> >From: Wei Xu
> >
> >last_used_idx/wrap_counter should be equal to last_avail_idx/wrap_counter
> >after a successful flush.
> >
> >Batching in vhost-net & dpdk testpmd is not equivalently supported in the
> >userspace backend, but chained descriptors for Rx are similarly presented
> >as a lightweight batch, so a write barrier is nailed down only for the
> >first (head) descriptor.
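The wrap counters above feed the virtio 1.1 packed-ring availability rule;
a minimal standalone illustration (bit positions as in the standard virtio
headers; this is not part of the patch):

    #include <stdbool.h>
    #include <stdint.h>

    #define VRING_PACKED_DESC_F_AVAIL 7
    #define VRING_PACKED_DESC_F_USED  15

    /* A descriptor is available to the device when its AVAIL bit matches
     * the driver's wrap counter while its USED bit does not, i.e. the
     * driver has published it and the device has not yet written it back. */
    static bool packed_desc_is_avail(uint16_t flags, bool wrap_counter)
    {
        bool avail = flags & (1 << VRING_PACKED_DESC_F_AVAIL);
        bool used = flags & (1 << VRING_PACKED_DESC_F_USED);

        return (avail == wrap_counter) && (used != avail);
    }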
> >
> >Signed-off-by: Wei Xu
> >---
> > hw/virtio/virtio.c | 291 ++++++++++++++++++++++++++++++++++++++++++++++----
> > 1 file changed, 274 insertions(+), 17 deletions(-)
> >
> >diff --git a/hw/virtio/virtio.c b/hw/virtio/virtio.c
> >index 832287b..7e276b4 100644
> >--- a/hw/virtio/virtio.c
> >+++ b/hw/virtio/virtio.c
> >@@ -379,6 +379,25 @@ static void vring_packed_desc_read(VirtIODevice *vdev, VRingPackedDesc *desc,
> >     virtio_tswap16s(vdev, &desc->id);
> > }
> >+static void vring_packed_desc_write_data(VirtIODevice *vdev,
> >+        VRingPackedDesc *desc, MemoryRegionCache *cache, int i)
> >+{
> >+    virtio_tswap32s(vdev, &desc->len);
> >+    virtio_tswap16s(vdev, &desc->id);
> >+    address_space_write_cached(cache,
> >+        i * sizeof(VRingPackedDesc) + offsetof(VRingPackedDesc, id),
> >+        &desc->id, sizeof(desc->id));
> >+    address_space_cache_invalidate(cache,
> >+        i * sizeof(VRingPackedDesc) + offsetof(VRingPackedDesc, id),
> >+        sizeof(desc->id));
> >+    address_space_write_cached(cache,
> >+        i * sizeof(VRingPackedDesc) + offsetof(VRingPackedDesc, len),
> >+        &desc->len, sizeof(desc->len));
> >+    address_space_cache_invalidate(cache,
> >+        i * sizeof(VRingPackedDesc) + offsetof(VRingPackedDesc, len),
> >+        sizeof(desc->len));
> >+}
> >+
> > static void vring_packed_desc_read_flags(VirtIODevice *vdev,
> >         VRingPackedDesc *desc, MemoryRegionCache *cache, int i)
> > {
> >@@ -388,6 +407,18 @@ static void vring_packed_desc_read_flags(VirtIODevice *vdev,
> >     virtio_tswap16s(vdev, &desc->flags);
> > }
> >+static void vring_packed_desc_write_flags(VirtIODevice *vdev,
> >+        VRingPackedDesc *desc, MemoryRegionCache *cache, int i)
> >+{
> >+    virtio_tswap16s(vdev, &desc->flags);
> >+    address_space_write_cached(cache,
> >+        i * sizeof(VRingPackedDesc) + offsetof(VRingPackedDesc, flags),
> >+        &desc->flags, sizeof(desc->flags));
> >+    address_space_cache_invalidate(cache,
> >+        i * sizeof(VRingPackedDesc) + offsetof(VRingPackedDesc, flags),
> >+        sizeof(desc->flags));
> >+}
> >+
> > static inline bool is_desc_avail(struct VRingPackedDesc *desc,
> >                                  bool wrap_counter)
> > {
> >@@ -554,19 +585,11 @@ bool virtqueue_rewind(VirtQueue *vq, unsigned int num)
> > }
> >
> > /* Called within rcu_read_lock(). */
> >-void virtqueue_fill(VirtQueue *vq, const VirtQueueElement *elem,
> >+static void virtqueue_split_fill(VirtQueue *vq, const VirtQueueElement *elem,
> >                     unsigned int len, unsigned int idx)
> > {
> >     VRingUsedElem uelem;
> >
> >-    trace_virtqueue_fill(vq, elem, len, idx);
> >-
> >-    virtqueue_unmap_sg(vq, elem, len);
> >-
> >-    if (unlikely(vq->vdev->broken)) {
> >-        return;
> >-    }
> >-
> >     if (unlikely(!vq->vring.used)) {
> >         return;
> >     }
> >@@ -578,16 +601,71 @@ void virtqueue_fill(VirtQueue *vq, const VirtQueueElement *elem,
> >     vring_used_write(vq, &uelem, idx);
> > }
> >
> >-/* Called within rcu_read_lock(). */
> >-void virtqueue_flush(VirtQueue *vq, unsigned int count)
> >+static void virtqueue_packed_fill(VirtQueue *vq, const VirtQueueElement *elem,
> >+                                  unsigned int len, unsigned int idx)
> > {
> >-    uint16_t old, new;
> >+    uint16_t head;
> >+    VRingMemoryRegionCaches *caches;
> >+    VRingPackedDesc desc = {
> >+        .flags = 0,
> >+        .id = elem->index,
> >+        .len = len,
> >+    };
> >+    bool wrap_counter = vq->used_wrap_counter;
> >+
> >+    if (unlikely(!vq->vring.desc)) {
> >+        return;
> >+    }
> >+
> >+    head = vq->used_idx + idx;
> >+    if (head >= vq->vring.num) {
> >+        head -= vq->vring.num;
> >+        wrap_counter ^= 1;
> >+    }
> >+    if (wrap_counter) {
> >+        desc.flags |= (1 << VRING_PACKED_DESC_F_AVAIL);
> >+        desc.flags |= (1 << VRING_PACKED_DESC_F_USED);
> >+    } else {
> >+        desc.flags &= ~(1 << VRING_PACKED_DESC_F_AVAIL);
> >+        desc.flags &= ~(1 << VRING_PACKED_DESC_F_USED);
> >+    }
> >+
> >+    caches = vring_get_region_caches(vq);
> >+    vring_packed_desc_write_data(vq->vdev, &desc, &caches->desc, head);
> >+    if (idx == 0) {
> >+        /*
> >+         * Make sure descriptor id and len are written before
> >+         * flags for the first used buffer.
> >+         */
> >+        smp_wmb();
> >+    }
> 
> I suspect you need to do this unconditionally, since this function doesn't
> do any batched writing to the used ring. What it does is write the used
> descriptor at idx, so there is no reason to suppress the wmb for idx != 0.
> See its usage in virtio-net.c.

I see, the conditional is unnecessary, will remove it.
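For reference, the ordering with the conditional dropped would be (an
untested sketch, not the final code):

    /* Publish id/len of the used descriptor at 'head' first ... */
    vring_packed_desc_write_data(vq->vdev, &desc, &caches->desc, head);

    /*
     * ... and make them visible before the flags: the driver detects a
     * used descriptor by re-reading only the flags word.  This function
     * writes exactly one descriptor per call, so every call needs the
     * barrier.
     */
    smp_wmb();
    vring_packed_desc_write_flags(vq->vdev, &desc, &caches->desc, head);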
> 
> >+
> >+    vring_packed_desc_write_flags(vq->vdev, &desc, &caches->desc, head);
> >+}
> >+
> >+void virtqueue_fill(VirtQueue *vq, const VirtQueueElement *elem,
> >+                    unsigned int len, unsigned int idx)
> >+{
> >+    trace_virtqueue_fill(vq, elem, len, idx);
> >+
> >+    virtqueue_unmap_sg(vq, elem, len);
> >
> >     if (unlikely(vq->vdev->broken)) {
> >-        vq->inuse -= count;
> >         return;
> >     }
> >+    if (virtio_vdev_has_feature(vq->vdev, VIRTIO_F_RING_PACKED)) {
> >+        virtqueue_packed_fill(vq, elem, len, idx);
> >+    } else {
> >+        virtqueue_split_fill(vq, elem, len, idx);
> >+    }
> >+}
> >+
> >+/* Called within rcu_read_lock(). */
> >+static void virtqueue_split_flush(VirtQueue *vq, unsigned int count)
> >+{
> >+    uint16_t old, new;
> >+
> >     if (unlikely(!vq->vring.used)) {
> >         return;
> >     }
> >@@ -603,6 +681,31 @@ void virtqueue_flush(VirtQueue *vq, unsigned int count)
> >     vq->signalled_used_valid = false;
> > }
> >+static void virtqueue_packed_flush(VirtQueue *vq, unsigned int count)
> >+{
> >+    if (unlikely(!vq->vring.desc)) {
> >+        return;
> >+    }
> >+
> >+    vq->inuse -= count;
> >+    vq->used_idx = vq->last_avail_idx;
> >+    vq->used_wrap_counter = vq->last_avail_wrap_counter;
> >+}
> >+
> >+void virtqueue_flush(VirtQueue *vq, unsigned int count)
> >+{
> >+    if (unlikely(vq->vdev->broken)) {
> >+        vq->inuse -= count;
> >+        return;
> >+    }
> >+
> >+    if (virtio_vdev_has_feature(vq->vdev, VIRTIO_F_RING_PACKED)) {
> >+        virtqueue_packed_flush(vq, count);
> >+    } else {
> >+        virtqueue_split_flush(vq, count);
> >+    }
> >+}
> >+
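As a side note, the wrap bookkeeping above can be checked with a small
standalone example (hypothetical numbers, plain C, not QEMU code):

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    int main(void)
    {
        const uint16_t vring_num = 256;  /* hypothetical ring size */
        uint16_t used_idx = 250;         /* next used slot to publish */
        bool used_wrap_counter = true;

        /* Filling at relative idx 10 wraps past the ring end, so the
         * flags for that slot are written with the flipped counter,
         * exactly as virtqueue_packed_fill() does above. */
        uint16_t head = used_idx + 10;
        bool wrap = used_wrap_counter;
        if (head >= vring_num) {
            head -= vring_num;
            wrap ^= 1;
        }
        printf("head=%u wrap=%d\n", head, wrap);  /* head=4 wrap=0 */

        /* virtqueue_packed_flush() then snaps used_idx and
         * used_wrap_counter to last_avail_idx/last_avail_wrap_counter,
         * which is the equality stated in the commit message. */
        return 0;
    }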
> > void virtqueue_push(VirtQueue *vq, const VirtQueueElement *elem,
> >                     unsigned int len)
> > {
> >@@ -1077,7 +1180,7 @@ static void *virtqueue_alloc_element(size_t sz, unsigned out_num, unsigned in_num
> >     return elem;
> > }
> >-void *virtqueue_pop(VirtQueue *vq, size_t sz)
> >+static void *virtqueue_split_pop(VirtQueue *vq, size_t sz)
> > {
> >     unsigned int i, head, max;
> >     VRingMemoryRegionCaches *caches;
> >@@ -1092,9 +1195,6 @@ void *virtqueue_pop(VirtQueue *vq, size_t sz)
> >     VRingDesc desc;
> >     int rc;
> >
> >-    if (unlikely(vdev->broken)) {
> >-        return NULL;
> >-    }
> >     rcu_read_lock();
> >     if (virtio_queue_empty_rcu(vq)) {
> >         goto done;
> >@@ -1212,6 +1312,163 @@ err_undo_map:
> >     goto done;
> > }
> >+static void *virtqueue_packed_pop(VirtQueue *vq, size_t sz)
> >+{
> >+    unsigned int i, head, max;
> >+    VRingMemoryRegionCaches *caches;
> >+    MemoryRegionCache indirect_desc_cache = MEMORY_REGION_CACHE_INVALID;
> >+    MemoryRegionCache *cache;
> >+    int64_t len;
> >+    VirtIODevice *vdev = vq->vdev;
> >+    VirtQueueElement *elem = NULL;
> >+    unsigned out_num, in_num, elem_entries;
> >+    hwaddr addr[VIRTQUEUE_MAX_SIZE];
> >+    struct iovec iov[VIRTQUEUE_MAX_SIZE];
> >+    VRingPackedDesc desc;
> >+    uint16_t id;
> >+
> >+    rcu_read_lock();
> >+    if (virtio_queue_packed_empty_rcu(vq)) {
> >+        goto done;
> >+    }
> >+
> >+    /* When we start there are none of either input nor output. */
> >+    out_num = in_num = elem_entries = 0;
> >+
> >+    max = vq->vring.num;
> >+
> >+    if (vq->inuse >= vq->vring.num) {
> >+        virtio_error(vdev, "Virtqueue size exceeded");
> >+        goto done;
> >+    }
> >+
> >+    head = vq->last_avail_idx;
> >+    i = head;
> >+
> >+    caches = vring_get_region_caches(vq);
> >+    cache = &caches->desc;
> >+
> >+    /* make sure flags is read before all the other fields */
> >+    smp_rmb();
> >+    vring_packed_desc_read(vdev, &desc, cache, i);
> >+
> >+    id = desc.id;
> >+    if (desc.flags & VRING_DESC_F_INDIRECT) {
> >+
> >+        if (desc.len % sizeof(VRingPackedDesc)) {
> >+            virtio_error(vdev, "Invalid size for indirect buffer table");
> >+            goto done;
> >+        }
> >+
> >+        /* loop over the indirect descriptor table */
> >+        len = address_space_cache_init(&indirect_desc_cache, vdev->dma_as,
> >+                                       desc.addr, desc.len, false);
> >+        cache = &indirect_desc_cache;
> >+        if (len < desc.len) {
> >+            virtio_error(vdev, "Cannot map indirect buffer");
> >+            goto done;
> >+        }
> >+
> >+        max = desc.len / sizeof(VRingPackedDesc);
> >+        i = 0;
> >+        vring_packed_desc_read(vdev, &desc, cache, i);
> >+        /* Make sure we see all the fields */
> >+        smp_rmb();
> 
> This looks unnecessary; all indirect descriptors should be available now.

Right, same as the last one.

> 
> >+    }
> >+
> >+    /* Collect all the descriptors */
> >+    while (1) {
> >+        bool map_ok;
> >+
> >+        if (desc.flags & VRING_DESC_F_WRITE) {
> >+            map_ok = virtqueue_map_desc(vdev, &in_num, addr + out_num,
> >+                                        iov + out_num,
> >+                                        VIRTQUEUE_MAX_SIZE - out_num, true,
> >+                                        desc.addr, desc.len);
> >+        } else {
> >+            if (in_num) {
> >+                virtio_error(vdev, "Incorrect order for descriptors");
> >+                goto err_undo_map;
> >+            }
> >+            map_ok = virtqueue_map_desc(vdev, &out_num, addr, iov,
> >+                                        VIRTQUEUE_MAX_SIZE, false,
> >+                                        desc.addr, desc.len);
> >+        }
> >+        if (!map_ok) {
> >+            goto err_undo_map;
> >+        }
> >+
> >+        /* If we've got too many, that implies a descriptor loop. */
> >+        if (++elem_entries > max) {
> >+            virtio_error(vdev, "Looped descriptor");
> >+            goto err_undo_map;
> >+        }
> >+
> >+        if (++i >= vq->vring.num) {
> >+            i -= vq->vring.num;
> 
> Do we allow chaining more descriptors than the vq size in the indirect
> case? According to the spec:
> 
> "
> The device limits the number of descriptors in a list through a
> transport-specific and/or device-specific value. If not limited,
> the maximum number of descriptors in a list is the virt queue
> size.
> "
> 
> This looks possible, so the above is probably wrong if the max number of
> chained descriptors is negotiated in a device-specific way.

OK, I will remove this check, though some limit on indirect descriptors is
still necessary; otherwise it is possible to hit an overflow, since
presumably u16 is used for most numbers/sizes in the spec.
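One possible shape for such a cap (a hypothetical sketch only; the exact
limit is undecided, but VIRTQUEUE_MAX_SIZE already bounds the addr/iov
arrays in this function):

    /* Hypothetical guard: reject indirect tables with more entries than
     * we can ever map, keeping elem_entries and the u16 id/index
     * arithmetic in range. */
    if (desc.len / sizeof(VRingPackedDesc) > VIRTQUEUE_MAX_SIZE) {
        virtio_error(vdev, "Indirect table exceeds the queue limit");
        goto done;
    }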
> 
> >+        }
> >+
> >+        if (cache == &indirect_desc_cache) {
> >+            if (i == max) {
> >+                break;
> >+            }
> >+            vring_packed_desc_read(vq->vdev, &desc, cache, i);
> >+        } else if (desc.flags & VRING_DESC_F_NEXT) {
> >+            vring_packed_desc_read(vq->vdev, &desc, cache, i);
> >+        } else {
> >+            break;
> >+        }
> >+    }
> >+
> >+    /* Now copy what we have collected and mapped */
> >+    elem = virtqueue_alloc_element(sz, out_num, in_num);
> >+    elem->index = id;
> >+    for (i = 0; i < out_num; i++) {
> >+        elem->out_addr[i] = addr[i];
> >+        elem->out_sg[i] = iov[i];
> >+    }
> >+    for (i = 0; i < in_num; i++) {
> >+        elem->in_addr[i] = addr[head + out_num + i];
> >+        elem->in_sg[i] = iov[out_num + i];
> >+    }
> >+
> >+    vq->last_avail_idx += (cache == &indirect_desc_cache) ?
> >+                          1 : elem_entries;
> >+    if (vq->last_avail_idx >= vq->vring.num) {
> >+        vq->last_avail_idx -= vq->vring.num;
> >+        vq->last_avail_wrap_counter ^= 1;
> >+    }
> >+    vq->inuse++;
> >+
> >+    vq->shadow_avail_idx = vq->last_avail_idx;
> >+    vq->avail_wrap_counter = vq->last_avail_wrap_counter;
> 
> It's better to rename this to "shadow_avail_wrap_counter".

OK, thanks for reviewing.

Wei

> 
> Thanks
> 
> >+
> >+    trace_virtqueue_pop(vq, elem, elem->in_num, elem->out_num);
> >+done:
> >+    address_space_cache_destroy(&indirect_desc_cache);
> >+    rcu_read_unlock();
> >+
> >+    return elem;
> >+
> >+err_undo_map:
> >+    virtqueue_undo_map_desc(out_num, in_num, iov);
> >+    g_free(elem);
> >+    goto done;
> >+}
> >+
> >+void *virtqueue_pop(VirtQueue *vq, size_t sz)
> >+{
> >+    if (unlikely(vq->vdev->broken)) {
> >+        return NULL;
> >+    }
> >+
> >+    if (virtio_vdev_has_feature(vq->vdev, VIRTIO_F_RING_PACKED)) {
> >+        return virtqueue_packed_pop(vq, sz);
> >+    } else {
> >+        return virtqueue_split_pop(vq, sz);
> >+    }
> >+}
> >+
> > /* virtqueue_drop_all:
> >  * @vq: The #VirtQueue
> >  * Drops all queued buffers and indicates them to the guest
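For context, the device-side call pattern around these entry points, which
the split/packed dispatch above has to preserve, looks roughly like this
(a sketch modeled on existing QEMU device code; error handling elided):

    #include "qemu/osdep.h"
    #include "hw/virtio/virtio.h"

    /* Complete a batch of guest buffers on one queue: pop elements,
     * fill them back as used one by one, then flush and notify once. */
    static void complete_queue(VirtIODevice *vdev, VirtQueue *vq)
    {
        unsigned int count = 0;
        VirtQueueElement *elem;

        while ((elem = virtqueue_pop(vq, sizeof(VirtQueueElement)))) {
            /* ... consume elem->out_sg, or write into elem->in_sg ... */
            virtqueue_fill(vq, elem, 0 /* bytes written back */, count++);
            g_free(elem);
        }
        if (count) {
            virtqueue_flush(vq, count);
            virtio_notify(vdev, vq);
        }
    }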