From mboxrd@z Thu Jan 1 00:00:00 1970
From: Stephen Hemminger
To: dev@dpdk.org
Cc: Stephen Hemminger
Subject: [PATCH v5 03/10] net/rtap: add Rx/Tx with scatter/gather support
Date: Mon, 9 Feb 2026 10:39:02 -0800
Message-ID: <20260209184045.132774-4-stephen@networkplumber.org>
X-Mailer: git-send-email 2.51.0
In-Reply-To: <20260209184045.132774-1-stephen@networkplumber.org>
References: <20241210212757.83490-1-stephen@networkplumber.org>
 <20260209184045.132774-1-stephen@networkplumber.org>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit

Implement packet receive and transmit using io_uring asynchronous I/O,
with full support for both single-segment and multi-segment mbufs.

Rx path:
- rtap_rx_alloc() chains multiple mbufs when the MTU exceeds a single
  mbuf's tailroom capacity
- Pre-post read/readv requests to the io_uring submission queue, each
  backed by a pre-allocated (possibly chained) mbuf
- On rx_burst, harvest completed CQEs and replace each consumed mbuf
  with a freshly allocated one
- rtap_rx_adjust() distributes received data across segments and frees
  unused trailing segments
- Parse the prepended virtio-net header (offload fields are ignored
  until the offload patch)

Tx path:
- For single-segment mbufs, use io_uring write and batch submits
- For multi-segment mbufs, use writev via io_uring with immediate
  submit (iovec is stack-allocated)
- When the mbuf headroom is not writable (shared or indirect), chain
  a new header mbuf for the virtio-net header
- Prepend a zeroed virtio-net header (offload population deferred)
- Clean completed tx CQEs to free transmitted mbufs

Add io_uring cancel-all logic using IORING_ASYNC_CANCEL_ALL for clean
queue teardown, draining all pending CQEs and freeing mbufs.

Signed-off-by: Stephen Hemminger
---
 doc/guides/nics/features/rtap.ini |   1 +
 drivers/net/rtap/meson.build      |   1 +
 drivers/net/rtap/rtap.h           |  13 +
 drivers/net/rtap/rtap_ethdev.c    |   7 +
 drivers/net/rtap/rtap_rxtx.c      | 755 ++++++++++++++++++++++++++++++
 5 files changed, 777 insertions(+)
 create mode 100644 drivers/net/rtap/rtap_rxtx.c

diff --git a/doc/guides/nics/features/rtap.ini b/doc/guides/nics/features/rtap.ini
index ed7c638029..c064e1e0b9 100644
--- a/doc/guides/nics/features/rtap.ini
+++ b/doc/guides/nics/features/rtap.ini
@@ -4,6 +4,7 @@
 ; Refer to default.ini for the full list of available PMD features.
 ;
 [Features]
+Scattered Rx = P
 Linux = Y
 ARMv7 = Y
 ARMv8 = Y
diff --git a/drivers/net/rtap/meson.build b/drivers/net/rtap/meson.build
index 7bd7806ef3..8e2b15f382 100644
--- a/drivers/net/rtap/meson.build
+++ b/drivers/net/rtap/meson.build
@@ -19,6 +19,7 @@ endif
 
 sources = files(
         'rtap_ethdev.c',
+        'rtap_rxtx.c',
 )
 
 ext_deps += liburing
diff --git a/drivers/net/rtap/rtap.h b/drivers/net/rtap/rtap.h
index 39a3188a7b..a0bbb1a8a0 100644
--- a/drivers/net/rtap/rtap.h
+++ b/drivers/net/rtap/rtap.h
@@ -70,4 +70,17 @@ struct rtap_pmd {
 int rtap_queue_open(struct rte_eth_dev *dev, uint16_t queue_id);
 void rtap_queue_close(struct rte_eth_dev *dev, uint16_t queue_id);
 
+/* rtap_rxtx.c */
+uint16_t rtap_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts);
+uint16_t rtap_tx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts);
+int rtap_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id,
+			uint16_t nb_rx_desc, unsigned int socket_id,
+			const struct rte_eth_rxconf *rx_conf,
+			struct rte_mempool *mb_pool);
+void rtap_rx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id);
+int rtap_tx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id,
+			uint16_t nb_tx_desc, unsigned int socket_id,
+			const struct rte_eth_txconf *tx_conf);
+void rtap_tx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id);
+
 #endif /* _RTAP_H_ */
diff --git a/drivers/net/rtap/rtap_ethdev.c b/drivers/net/rtap/rtap_ethdev.c
index 4e7847ff8d..a65a8b77ad 100644
--- a/drivers/net/rtap/rtap_ethdev.c
+++ b/drivers/net/rtap/rtap_ethdev.c
@@ -232,6 +232,10 @@ static const struct eth_dev_ops rtap_ops = {
 	.dev_stop = rtap_dev_stop,
 	.dev_configure = rtap_dev_configure,
 	.dev_close = rtap_dev_close,
+	.rx_queue_setup = rtap_rx_queue_setup,
+	.rx_queue_release = rtap_rx_queue_release,
+	.tx_queue_setup = rtap_tx_queue_setup,
+	.tx_queue_release = rtap_tx_queue_release,
 };
 
 static int
@@ -272,6 +276,9 @@ rtap_create(struct rte_eth_dev *dev, const char *tap_name, uint8_t persist)
 
 	PMD_LOG(DEBUG, "%s setup", ifr.ifr_name);
 
+	dev->rx_pkt_burst = rtap_rx_burst;
+	dev->tx_pkt_burst = rtap_tx_burst;
+
 	return 0;
 
 error:
diff --git a/drivers/net/rtap/rtap_rxtx.c b/drivers/net/rtap/rtap_rxtx.c
new file mode 100644
index 0000000000..c972ab4ca0
--- /dev/null
+++ b/drivers/net/rtap/rtap_rxtx.c
@@ -0,0 +1,755 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2026 Stephen Hemminger
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "rtap.h"
+
+/*
+ * Since the virtio net header is prepended to the mbuf data,
+ * the DPDK configuration should make sure that mbuf pools
+ * are created with enough headroom for it.
+ */
+static_assert(RTE_PKTMBUF_HEADROOM >= sizeof(struct virtio_net_hdr),
+	      "Pktmbuf headroom not big enough for virtio header");
+
+
+/* Get the per-process file descriptor used for transmit and receive */
+static inline int
+rtap_queue_fd(uint16_t port_id, uint16_t queue_id)
+{
+	struct rte_eth_dev *dev = &rte_eth_devices[port_id];
+	int *fds = dev->process_private;
+	int fd = fds[queue_id];
+
+	RTE_ASSERT(fd != -1);
+	return fd;
+}
+
+/*
+ * Add a read of the mbuf data to the submit queue.
+ * Multi-segment mbufs require readv().
+ * Return:
+ *  -ENOSPC : no submit queue element available.
+ *  >0 : readv was used and io_uring_submit() has already been done.
+ *  0 : regular read submitted, caller should call io_uring_submit()
+ *      later to batch.
+ */ +static inline int +rtap_rx_submit(struct rtap_rx_queue *rxq, int fd, struct rte_mbuf *mb) +{ + struct io_uring_sqe *sqe = io_uring_get_sqe(&rxq->io_ring); + struct iovec iovs[RTE_MBUF_MAX_NB_SEGS]; + + if (unlikely(sqe == NULL)) + return -ENOSPC; + + io_uring_sqe_set_data(sqe, mb); + + RTE_ASSERT(rte_pktmbuf_headroom(mb) >= sizeof(struct virtio_net_hdr)); + void *buf = rte_pktmbuf_mtod_offset(mb, void *, -sizeof(struct virtio_net_hdr)); + unsigned int nbytes = sizeof(struct virtio_net_hdr) + rte_pktmbuf_tailroom(mb); + + /* optimize for the case where packet fits in one mbuf */ + if (mb->nb_segs == 1) { + io_uring_prep_read(sqe, fd, buf, nbytes, 0); + /* caller will submit as batch */ + return 0; + } else { + uint16_t nsegs = mb->nb_segs; + RTE_ASSERT(nsegs > 0 && nsegs < IOV_MAX); + + iovs[0].iov_base = buf; + iovs[0].iov_len = nbytes; + + for (uint16_t i = 1; i < nsegs; i++) { + mb = mb->next; + iovs[i].iov_base = rte_pktmbuf_mtod(mb, void *); + iovs[i].iov_len = rte_pktmbuf_tailroom(mb); + } + io_uring_prep_readv(sqe, fd, iovs, nsegs, 0); + + /* + * For readv, need to submit now since iovs[] must be + * valid until submitted. + * io_uring_submit(3) returns the number of submitted submission + * queue entries (on failure returns -errno). + */ + return io_uring_submit(&rxq->io_ring); + } +} + +/* Allocates one or more mbuf's to be used for reading packets */ +static struct rte_mbuf * +rtap_rx_alloc(struct rtap_rx_queue *rxq) +{ + const struct rte_eth_dev *dev = &rte_eth_devices[rxq->port_id]; + int buf_size = dev->data->mtu + RTE_ETHER_HDR_LEN; + struct rte_mbuf *m = NULL; + struct rte_mbuf **tail = &m; + + do { + struct rte_mbuf *seg = rte_pktmbuf_alloc(rxq->mb_pool); + if (unlikely(seg == NULL)) { + rte_pktmbuf_free(m); + return NULL; + } + *tail = seg; + tail = &seg->next; + if (seg != m) + ++m->nb_segs; + + buf_size -= rte_pktmbuf_tailroom(seg); + } while (buf_size > 0); + + __rte_mbuf_sanity_check(m, 1); + return m; +} + +/* + * When receiving multi-segment mbuf's need to adjust + * the length of mbufs. 
+ */ +static inline int +rtap_rx_adjust(struct rte_mbuf *mb, uint32_t len) +{ + struct rte_mbuf *seg; + uint16_t count = 0; + + mb->pkt_len = len; + + /* Walk through mbuf chain and update the length of each segment */ + for (seg = mb; seg != NULL && len > 0; seg = seg->next) { + uint16_t seg_len = RTE_MIN(len, rte_pktmbuf_tailroom(seg)); + + seg->data_len = seg_len; + count++; + len -= seg_len; + + /* If length is zero, this is end of packet */ + if (len == 0) { + /* Drop unused tail segments */ + if (seg->next != NULL) { + struct rte_mbuf *tail = seg->next; + seg->next = NULL; + + /* Free segments one by one to avoid nb_segs issues */ + while (tail != NULL) { + struct rte_mbuf *next = tail->next; + rte_pktmbuf_free_seg(tail); + tail = next; + } + } + + mb->nb_segs = count; + return 0; + } + } + + /* Packet was truncated - not enough mbuf space */ + return -1; +} + +/* + * Set the receive offload flags of received mbuf + * based on the bits in the virtio network header + */ +static int +rtap_rx_offload(struct rte_mbuf *m, const struct virtio_net_hdr *hdr) +{ + uint32_t ptype; + bool l4_supported = false; + struct rte_net_hdr_lens hdr_lens; + + /* nothing to do */ + if (hdr->flags == 0 && hdr->gso_type == VIRTIO_NET_HDR_GSO_NONE) + return 0; + + m->ol_flags |= RTE_MBUF_F_RX_IP_CKSUM_UNKNOWN; + + ptype = rte_net_get_ptype(m, &hdr_lens, RTE_PTYPE_ALL_MASK); + m->packet_type = ptype; + if ((ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_TCP || + (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_UDP || + (ptype & RTE_PTYPE_L4_MASK) == RTE_PTYPE_L4_SCTP) + l4_supported = true; + + if (hdr->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) { + uint32_t hdrlen = hdr_lens.l2_len + hdr_lens.l3_len + hdr_lens.l4_len; + if (hdr->csum_start <= hdrlen && l4_supported) { + m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_NONE; + } else { + /* Unknown proto or tunnel, do sw cksum. 
*/ + uint16_t csum = 0; + + if (rte_raw_cksum_mbuf(m, hdr->csum_start, + rte_pktmbuf_pkt_len(m) - hdr->csum_start, + &csum) < 0) + return -EINVAL; + if (likely(csum != 0xffff)) + csum = ~csum; + + uint32_t off = (uint32_t)hdr->csum_offset + hdr->csum_start; + if (rte_pktmbuf_data_len(m) >= off + sizeof(uint16_t)) + *rte_pktmbuf_mtod_offset(m, uint16_t *, off) = csum; + } + } else if ((hdr->flags & VIRTIO_NET_HDR_F_DATA_VALID) && l4_supported) { + m->ol_flags |= RTE_MBUF_F_RX_L4_CKSUM_GOOD; + } + + /* GSO request, save required information in mbuf */ + if (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE) { + /* Check unsupported modes */ + if ((hdr->gso_type & VIRTIO_NET_HDR_GSO_ECN) || hdr->gso_size == 0) + return -EINVAL; + + /* Update mss lengths in mbuf */ + m->tso_segsz = hdr->gso_size; + switch (hdr->gso_type & ~VIRTIO_NET_HDR_GSO_ECN) { + case VIRTIO_NET_HDR_GSO_TCPV4: + case VIRTIO_NET_HDR_GSO_TCPV6: + m->ol_flags |= RTE_MBUF_F_RX_LRO | RTE_MBUF_F_RX_L4_CKSUM_NONE; + break; + default: + return -EINVAL; + } + } + + return 0; +} + +uint16_t +rtap_rx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) +{ + struct rtap_rx_queue *rxq = queue; + struct io_uring_cqe *cqe; + unsigned int head, num_cqe = 0, num_sqe = 0; + uint16_t num_rx = 0; + uint32_t num_bytes = 0; + int fd = rtap_queue_fd(rxq->port_id, rxq->queue_id); + + if (unlikely(nb_pkts == 0)) + return 0; + + io_uring_for_each_cqe(&rxq->io_ring, head, cqe) { + struct rte_mbuf *mb = (void *)(uintptr_t)cqe->user_data; + struct rte_mbuf *nmb = NULL; + struct virtio_net_hdr *hdr = NULL; + ssize_t len = cqe->res; + + PMD_RX_LOG(DEBUG, "complete m=%p len=%zd", mb, len); + + num_cqe++; + + if (unlikely(len < (ssize_t)(sizeof(*hdr) + RTE_ETHER_HDR_LEN))) { + if (len < 0) + PMD_RX_LOG(ERR, "io_uring_read: %s", strerror(-len)); + else + PMD_RX_LOG(ERR, "io_uring_read len %zd", len); + rxq->rx_errors++; + goto resubmit; + } + + /* virtio header is before packet data */ + hdr = rte_pktmbuf_mtod_offset(mb, struct virtio_net_hdr *, -sizeof(*hdr)); + len -= sizeof(*hdr); + + /* Replacement mbuf for resubmitting */ + nmb = rtap_rx_alloc(rxq); + if (unlikely(nmb == NULL)) { + struct rte_eth_dev *dev = &rte_eth_devices[rxq->port_id]; + + PMD_RX_LOG(ERR, "Rx mbuf alloc failed"); + dev->data->rx_mbuf_alloc_failed++; + + nmb = mb; /* Reuse original */ + goto resubmit; + } + + if (mb->nb_segs == 1) { + mb->data_len = len; + mb->pkt_len = len; + } else { + if (unlikely(rtap_rx_adjust(mb, len) < 0)) { + PMD_RX_LOG(ERR, "packet truncated: pkt_len=%u exceeds mbuf capacity", + mb->pkt_len); + ++rxq->rx_errors; + rte_pktmbuf_free(mb); + goto resubmit; + } + } + + if (unlikely(rtap_rx_offload(mb, hdr) < 0)) { + PMD_RX_LOG(ERR, "invalid rx offload"); + ++rxq->rx_errors; + rte_pktmbuf_free(mb); + goto resubmit; + } + + mb->port = rxq->port_id; + + __rte_mbuf_sanity_check(mb, 1); + num_bytes += mb->pkt_len; + bufs[num_rx++] = mb; + +resubmit: + /* Submit the replacement mbuf */ + int n = rtap_rx_submit(rxq, fd, nmb); + if (unlikely(n < 0)) { + /* Hope that later Rx can recover */ + PMD_RX_LOG(ERR, "io_uring no Rx sqe: %s", strerror(-n)); + rxq->rx_errors++; + rte_pktmbuf_free(nmb); + break; + } + + /* If using readv() then n > 0 and all sqe's have been queued. 
*/ + if (n > 0) + num_sqe = 0; + else + ++num_sqe; + + if (num_rx == nb_pkts) + break; + } + if (num_cqe > 0) + io_uring_cq_advance(&rxq->io_ring, num_cqe); + + if (num_sqe > 0) { + int n = io_uring_submit(&rxq->io_ring); + if (unlikely(n < 0)) { + PMD_LOG(ERR, "Rx io_uring submit failed: %s", strerror(-n)); + } else if (unlikely(n != (int)num_sqe)) { + PMD_RX_LOG(NOTICE, "Rx io_uring %d of %u resubmitted", n, num_sqe); + } + } + + rxq->rx_packets += num_rx; + rxq->rx_bytes += num_bytes; + + return num_rx; +} + +int +rtap_rx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id, uint16_t nb_rx_desc, + unsigned int socket_id, + const struct rte_eth_rxconf *rx_conf __rte_unused, + struct rte_mempool *mb_pool) +{ + struct rte_mbuf **mbufs = NULL; + unsigned int nsqe = 0; + int fd = -1; + + PMD_LOG(DEBUG, "setup port %u queue %u rx_descriptors %u", + dev->data->port_id, queue_id, nb_rx_desc); + + struct rtap_rx_queue *rxq = rte_zmalloc_socket(NULL, sizeof(*rxq), + RTE_CACHE_LINE_SIZE, socket_id); + if (rxq == NULL) { + PMD_LOG(ERR, "rxq alloc failed"); + return -1; + } + + rxq->mb_pool = mb_pool; + rxq->port_id = dev->data->port_id; + rxq->queue_id = queue_id; + dev->data->rx_queues[queue_id] = rxq; + + if (io_uring_queue_init(nb_rx_desc, &rxq->io_ring, 0) != 0) { + PMD_LOG(ERR, "io_uring_queue_init failed: %s", strerror(errno)); + goto error_rxq_free; + } + + mbufs = calloc(nb_rx_desc, sizeof(struct rte_mbuf *)); + if (mbufs == NULL) { + PMD_LOG(ERR, "Rx mbuf pointer alloc failed"); + goto error_iouring_exit; + } + + /* open shared tap fd maybe already setup */ + if (rtap_queue_open(dev, queue_id) < 0) + goto error_bulk_free; + + fd = rtap_queue_fd(rxq->port_id, rxq->queue_id); + + for (uint16_t i = 0; i < nb_rx_desc; i++) { + mbufs[i] = rtap_rx_alloc(rxq); + if (mbufs[i] == NULL) { + PMD_LOG(ERR, "Rx mbuf alloc buf failed"); + goto error_bulk_free; + } + + int n = rtap_rx_submit(rxq, fd, mbufs[i]); + if (n < 0) { + PMD_LOG(ERR, "rtap_rx_submit failed: %s", strerror(-n)); + goto error_bulk_free; + } + + /* If using readv() then n > 0 and all sqe's have been queued. */ + if (n > 0) + nsqe = 0; + else + ++nsqe; + } + + if (nsqe > 0) { + int n = io_uring_submit(&rxq->io_ring); + if (n < 0) { + PMD_LOG(ERR, "Rx io_uring submit failed: %s", strerror(-n)); + goto error_bulk_free; + } + if (n < (int)nsqe) + PMD_LOG(NOTICE, "Rx io_uring partial submit %d of %u", n, nb_rx_desc); + } + + free(mbufs); + return 0; + +error_bulk_free: + /* can't use bulk free here because some of mbufs[] maybe NULL */ + for (uint16_t i = 0; i < nb_rx_desc; i++) { + if (mbufs[i] != NULL) + rte_pktmbuf_free(mbufs[i]); + } + rtap_queue_close(dev, queue_id); + free(mbufs); +error_iouring_exit: + io_uring_queue_exit(&rxq->io_ring); +error_rxq_free: + rte_free(rxq); + return -1; +} + +/* + * Cancel all pending io_uring operations and drain completions. + * Uses IORING_ASYNC_CANCEL_ALL to cancel all operations at once. + * Returns the number of mbufs freed. 
+ */ +static unsigned int +rtap_cancel_all(struct io_uring *ring) +{ + struct io_uring_cqe *cqe; + struct io_uring_sqe *sqe; + unsigned int head, num_freed = 0; + unsigned int ready; + int ret; + + /* Cancel all pending operations using CANCEL_ALL flag */ + sqe = io_uring_get_sqe(ring); + if (sqe != NULL) { + /* IORING_ASYNC_CANCEL_ALL | IORING_ASYNC_CANCEL_ANY cancels all ops */ + io_uring_prep_cancel(sqe, NULL, + IORING_ASYNC_CANCEL_ALL | IORING_ASYNC_CANCEL_ANY); + io_uring_sqe_set_data(sqe, NULL); + ret = io_uring_submit(ring); + if (ret < 0) + PMD_LOG(ERR, "cancel submit failed: %s", strerror(-ret)); + } + + /* + * One blocking wait to let the kernel deliver the cancel CQE + * and the CQEs for all cancelled operations. + */ + io_uring_submit_and_wait(ring, 1); + + /* + * Drain all CQEs non-blocking. Cancellation of many pending + * operations may produce CQEs in waves; keep polling until the + * CQ is empty. + */ + for (unsigned int retries = 0; retries < 10; retries++) { + ready = io_uring_cq_ready(ring); + if (ready == 0) + break; + + io_uring_for_each_cqe(ring, head, cqe) { + struct rte_mbuf *mb = (void *)(uintptr_t)cqe->user_data; + + /* Skip the cancel operation's own CQE (user_data = NULL) */ + if (mb != NULL) { + rte_pktmbuf_free(mb); + ++num_freed; + } + } + + /* Advance past all processed CQEs */ + io_uring_cq_advance(ring, ready); + } + + return num_freed; +} + +void +rtap_rx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id) +{ + struct rtap_rx_queue *rxq = dev->data->rx_queues[queue_id]; + + if (rxq == NULL) + return; + + rtap_cancel_all(&rxq->io_ring); + io_uring_queue_exit(&rxq->io_ring); + + rte_free(rxq); + + /* Close the shared TAP fd if the tx queue is already gone */ + if (queue_id >= dev->data->nb_tx_queues || + dev->data->tx_queues[queue_id] == NULL) + rtap_queue_close(dev, queue_id); +} + +int +rtap_tx_queue_setup(struct rte_eth_dev *dev, uint16_t queue_id, + uint16_t nb_tx_desc, unsigned int socket_id, + const struct rte_eth_txconf *tx_conf) +{ + /* open shared tap fd maybe already setup */ + if (rtap_queue_open(dev, queue_id) < 0) + return -1; + + struct rtap_tx_queue *txq = rte_zmalloc_socket(NULL, sizeof(*txq), + RTE_CACHE_LINE_SIZE, socket_id); + if (txq == NULL) { + PMD_LOG(ERR, "txq alloc failed"); + return -1; + } + + txq->port_id = dev->data->port_id; + txq->queue_id = queue_id; + txq->free_thresh = tx_conf->tx_free_thresh; + dev->data->tx_queues[queue_id] = txq; + + if (io_uring_queue_init(nb_tx_desc, &txq->io_ring, 0) != 0) { + PMD_LOG(ERR, "io_uring_queue_init failed: %s", strerror(errno)); + rte_free(txq); + return -1; + } + + return 0; +} + +static void +rtap_tx_cleanup(struct rtap_tx_queue *txq) +{ + struct io_uring_cqe *cqe; + unsigned int head; + unsigned int num_cqe = 0; + + io_uring_for_each_cqe(&txq->io_ring, head, cqe) { + struct rte_mbuf *mb = (void *)(uintptr_t)cqe->user_data; + + ++num_cqe; + + /* Skip CQEs with NULL user_data (e.g., cancel operations) */ + if (mb == NULL) + continue; + + PMD_TX_LOG(DEBUG, " mbuf len %u result: %d", mb->pkt_len, cqe->res); + txq->tx_errors += (cqe->res < 0); + rte_pktmbuf_free(mb); + } + io_uring_cq_advance(&txq->io_ring, num_cqe); +} + +void +rtap_tx_queue_release(struct rte_eth_dev *dev, uint16_t queue_id) +{ + struct rtap_tx_queue *txq = dev->data->tx_queues[queue_id]; + + if (txq == NULL) + return; + + /* First drain any completed TX operations */ + rtap_tx_cleanup(txq); + + /* Cancel all remaining pending operations and free mbufs */ + rtap_cancel_all(&txq->io_ring); + 
io_uring_queue_exit(&txq->io_ring); + + rte_free(txq); + + /* Close the shared TAP fd if the rx queue is already gone */ + if (queue_id >= dev->data->nb_rx_queues || + dev->data->rx_queues[queue_id] == NULL) + rtap_queue_close(dev, queue_id); +} + +/* Convert mbuf offload flags to virtio net header */ +static void +rtap_tx_offload(struct virtio_net_hdr *hdr, const struct rte_mbuf *m) +{ + uint64_t csum_l4 = m->ol_flags & RTE_MBUF_F_TX_L4_MASK; + uint16_t o_l23_len = (m->ol_flags & RTE_MBUF_F_TX_TUNNEL_MASK) ? + m->outer_l2_len + m->outer_l3_len : 0; + + memset(hdr, 0, sizeof(*hdr)); + + if (m->ol_flags & RTE_MBUF_F_TX_TCP_SEG) + csum_l4 |= RTE_MBUF_F_TX_TCP_CKSUM; + + switch (csum_l4) { + case RTE_MBUF_F_TX_UDP_CKSUM: + hdr->csum_start = o_l23_len + m->l2_len + m->l3_len; + hdr->csum_offset = offsetof(struct rte_udp_hdr, dgram_cksum); + hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM; + break; + + case RTE_MBUF_F_TX_TCP_CKSUM: + hdr->csum_start = o_l23_len + m->l2_len + m->l3_len; + hdr->csum_offset = offsetof(struct rte_tcp_hdr, cksum); + hdr->flags = VIRTIO_NET_HDR_F_NEEDS_CSUM; + break; + } + + /* TCP Segmentation Offload */ + if (m->ol_flags & RTE_MBUF_F_TX_TCP_SEG) { + hdr->gso_type = (m->ol_flags & RTE_MBUF_F_TX_IPV6) ? + VIRTIO_NET_HDR_GSO_TCPV6 : + VIRTIO_NET_HDR_GSO_TCPV4; + hdr->gso_size = m->tso_segsz; + hdr->hdr_len = o_l23_len + m->l2_len + m->l3_len + m->l4_len; + } +} + +/* + * Transmit burst posts mbufs into the io_uring TAP file descriptor + * by creating queue elements with write operation. + * + * The driver mimics the behavior of a real hardware NIC. + * + * If there is no space left in the io_uring then the driver will return the number of + * mbuf's that were processed to that point. The application can then decide to retry + * later or drop the unsent packets in case of backpressue. + * + * The transmit process puts the virtio header before the data. In some cases, a new mbuf + * is required from same pool as original; but if that fails, the packet is not sent and + * is silently dropped. This is to avoid situation where pool is so small that transmit + * gets stuck when pool resources are very low. 
+ */ +uint16_t +rtap_tx_burst(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) +{ + struct rtap_tx_queue *txq = queue; + uint16_t i, num_tx = 0; + uint32_t num_tx_bytes = 0; + + PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts); + + if (io_uring_sq_space_left(&txq->io_ring) < RTE_MAX(txq->free_thresh, nb_pkts)) + rtap_tx_cleanup(txq); + + int fd = rtap_queue_fd(txq->port_id, txq->queue_id); + + for (i = 0; i < nb_pkts; i++) { + struct rte_mbuf *mb = bufs[i]; + struct virtio_net_hdr *hdr; + + /* Use packet head room space for virtio header (if possible) */ + if (rte_mbuf_refcnt_read(mb) == 1 && RTE_MBUF_DIRECT(mb) && + rte_pktmbuf_headroom(mb) >= sizeof(*hdr)) { + hdr = rte_pktmbuf_mtod_offset(mb, struct virtio_net_hdr *, -sizeof(*hdr)); + } else { + /* Need to chain a new mbuf to make room for virtio header */ + struct rte_mbuf *mh = rte_pktmbuf_alloc(mb->pool); + if (unlikely(mh == NULL)) { + PMD_TX_LOG(DEBUG, "mbuf pool exhausted on transmit"); + rte_pktmbuf_free(mb); + ++txq->tx_errors; + continue; + } + + /* The packet headroom should be available in newly allocated mbuf */ + RTE_ASSERT(rte_pktmbuf_headroom(mh) >= sizeof(*hdr)); + + hdr = rte_pktmbuf_mtod_offset(mh, struct virtio_net_hdr *, -sizeof(*hdr)); + mh->next = mb; + mh->nb_segs = mb->nb_segs + 1; + mh->pkt_len = mb->pkt_len; + mh->ol_flags = mb->ol_flags & RTE_MBUF_F_TX_OFFLOAD_MASK; + mb = mh; + } + + struct io_uring_sqe *sqe = io_uring_get_sqe(&txq->io_ring); + if (sqe == NULL) { + /* Drop header mbuf if it was used */ + if (mb != bufs[i]) + rte_pktmbuf_free_seg(mb); + break; /* submit ring is full */ + } + + /* Note: transmit bytes does not include virtio header */ + num_tx_bytes += mb->pkt_len; + + io_uring_sqe_set_data(sqe, mb); + rtap_tx_offload(hdr, mb); + + PMD_TX_LOG(DEBUG, "write m=%p segs=%u", mb, mb->nb_segs); + + /* Start of data written to kernel includes virtio net header */ + void *buf = rte_pktmbuf_mtod_offset(mb, void *, -sizeof(*hdr)); + unsigned int nbytes = sizeof(struct virtio_net_hdr) + mb->data_len; + + if (mb->nb_segs == 1) { + /* Single segment mbuf can go as write and batched */ + io_uring_prep_write(sqe, fd, buf, nbytes, 0); + ++num_tx; + } else { + /* Mult-segment mbuf needs scatter/gather */ + struct iovec iovs[RTE_MBUF_MAX_NB_SEGS + 1]; + unsigned int niov = mb->nb_segs; + + iovs[0].iov_base = buf; + iovs[0].iov_len = nbytes; + + for (unsigned int v = 1; v < niov; v++) { + mb = mb->next; + iovs[v].iov_base = rte_pktmbuf_mtod(mb, void *); + iovs[v].iov_len = mb->data_len; + } + + io_uring_prep_writev(sqe, fd, iovs, niov, 0); + + /* + * For writev, submit now since iovs[] is on the stack + * and must remain valid until submitted. + * This also submits any previously batched single-seg writes. + */ + int err = io_uring_submit(&txq->io_ring); + if (unlikely(err < 0)) { + PMD_TX_LOG(ERR, "Tx io_uring submit failed: %s", strerror(-err)); + ++txq->tx_errors; + } + + num_tx = 0; + } + } + + if (likely(num_tx > 0)) { + int err = io_uring_submit(&txq->io_ring); + if (unlikely(err < 0)) { + PMD_LOG(ERR, "Tx io_uring submit failed: %s", strerror(-err)); + ++txq->tx_errors; + } + } + + txq->tx_packets += i; + txq->tx_bytes += num_tx_bytes; + + return i; +} -- 2.51.0