* [PATCH 0/5] selftests: net: add multithread and multiqueue support to iou-zcrx
@ 2026-04-08 16:38 Juanlu Herrero
2026-04-08 16:38 ` [PATCH 1/5] selftests: net: fix get_refill_ring_size() to use its local variable Juanlu Herrero
` (4 more replies)
0 siblings, 5 replies; 6+ messages in thread
From: Juanlu Herrero @ 2026-04-08 16:38 UTC (permalink / raw)
To: netdev; +Cc: Juanlu Herrero
Add multithreaded support to the iou-zcrx selftest and a new
rss_multiqueue test variant that exercises multi-queue zero-copy
receive with per-port flow rule steering.
Juanlu Herrero (5):
selftests: net: fix get_refill_ring_size() to use its local variable
selftests: net: add multithread client support to iou-zcrx
selftests: net: remove unused variable in process_recvzc()
selftests: net: add multithread server support to iou-zcrx
selftests: net: add rss_multiqueue test variant to iou-zcrx
.../testing/selftests/drivers/net/hw/Makefile | 2 +-
.../selftests/drivers/net/hw/iou-zcrx.c | 361 ++++++++++++------
.../selftests/drivers/net/hw/iou-zcrx.py | 45 ++-
3 files changed, 281 insertions(+), 127 deletions(-)
--
2.53.0
^ permalink raw reply [flat|nested] 6+ messages in thread
* [PATCH 1/5] selftests: net: fix get_refill_ring_size() to use its local variable
2026-04-08 16:38 [PATCH 0/5] selftests: net: add multithread and multiqueue support to iou-zcrx Juanlu Herrero
@ 2026-04-08 16:38 ` Juanlu Herrero
2026-04-08 16:38 ` [PATCH 2/5] selftests: net: add multithread client support to iou-zcrx Juanlu Herrero
` (3 subsequent siblings)
4 siblings, 0 replies; 6+ messages in thread
From: Juanlu Herrero @ 2026-04-08 16:38 UTC (permalink / raw)
To: netdev; +Cc: Juanlu Herrero
In preparation for multi-threaded rss selftests, fix
get_refill_ring_size to use the local `size` variable,
instead of the `global_size`.
Signed-off-by: Juanlu Herrero <juanlu@fastmail.com>
---
tools/testing/selftests/drivers/net/hw/iou-zcrx.c | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
index 240d13dbc54e..334985083f61 100644
--- a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
+++ b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
@@ -132,10 +132,10 @@ static inline size_t get_refill_ring_size(unsigned int rq_entries)
{
size_t size;
- ring_size = rq_entries * sizeof(struct io_uring_zcrx_rqe);
+ size = rq_entries * sizeof(struct io_uring_zcrx_rqe);
/* add space for the header (head/tail/etc.) */
- ring_size += page_size;
- return ALIGN_UP(ring_size, page_size);
+ size += page_size;
+ return ALIGN_UP(size, page_size);
}
static void setup_zcrx(struct io_uring *ring)
--
2.53.0
^ permalink raw reply related [flat|nested] 6+ messages in thread
* [PATCH 2/5] selftests: net: add multithread client support to iou-zcrx
2026-04-08 16:38 [PATCH 0/5] selftests: net: add multithread and multiqueue support to iou-zcrx Juanlu Herrero
2026-04-08 16:38 ` [PATCH 1/5] selftests: net: fix get_refill_ring_size() to use its local variable Juanlu Herrero
@ 2026-04-08 16:38 ` Juanlu Herrero
2026-04-08 16:38 ` [PATCH 3/5] selftests: net: remove unused variable in process_recvzc() Juanlu Herrero
` (2 subsequent siblings)
4 siblings, 0 replies; 6+ messages in thread
From: Juanlu Herrero @ 2026-04-08 16:38 UTC (permalink / raw)
To: netdev; +Cc: Juanlu Herrero
Add pthreads to the iou-zcrx client so that multiple connections can be
established simultaneously. Each client thread connects to the server
and sends its payload independently.
Introduce struct thread_ctx and the -t option to control the number of
threads (default 1), preserving backwards compatibility with existing
tests.
Signed-off-by: Juanlu Herrero <juanlu@fastmail.com>
---
.../testing/selftests/drivers/net/hw/Makefile | 2 +-
.../selftests/drivers/net/hw/iou-zcrx.c | 46 +++++++++++++++++--
2 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/tools/testing/selftests/drivers/net/hw/Makefile b/tools/testing/selftests/drivers/net/hw/Makefile
index deeca3f8d080..227adfec706c 100644
--- a/tools/testing/selftests/drivers/net/hw/Makefile
+++ b/tools/testing/selftests/drivers/net/hw/Makefile
@@ -80,5 +80,5 @@ include ../../../net/ynl.mk
include ../../../net/bpf.mk
ifeq ($(HAS_IOURING_ZCRX),y)
-$(OUTPUT)/iou-zcrx: LDLIBS += -luring
+$(OUTPUT)/iou-zcrx: LDLIBS += -luring -lpthread
endif
diff --git a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
index 334985083f61..de2eea78a5b6 100644
--- a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
+++ b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
@@ -4,6 +4,7 @@
#include <error.h>
#include <fcntl.h>
#include <limits.h>
+#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
@@ -85,8 +86,14 @@ static int cfg_send_size = SEND_SIZE;
static struct sockaddr_in6 cfg_addr;
static unsigned int cfg_rx_buf_len;
static bool cfg_dry_run;
+static int cfg_num_threads = 1;
static char *payload;
+
+struct thread_ctx {
+ int thread_id;
+};
+
static void *area_ptr;
static void *ring_ptr;
static size_t ring_size;
@@ -376,7 +383,7 @@ static void run_server(void)
error(1, 0, "test failed\n");
}
-static void run_client(void)
+static void *client_worker(void *arg)
{
ssize_t to_send = cfg_send_size;
ssize_t sent = 0;
@@ -402,12 +409,42 @@ static void run_client(void)
}
close(fd);
+ return NULL;
+}
+
+static void run_client(void)
+{
+ struct thread_ctx *ctxs;
+ pthread_t *threads;
+ int i, ret;
+
+ ctxs = calloc(cfg_num_threads, sizeof(*ctxs));
+ threads = calloc(cfg_num_threads, sizeof(*threads));
+ if (!ctxs || !threads)
+ error(1, 0, "calloc()");
+
+ for (i = 0; i < cfg_num_threads; i++)
+ ctxs[i].thread_id = i;
+
+ for (i = 0; i < cfg_num_threads; i++) {
+ ret = pthread_create(&threads[i], NULL, client_worker,
+ &ctxs[i]);
+ if (ret)
+ error(1, ret, "pthread_create()");
+ }
+
+ for (i = 0; i < cfg_num_threads; i++)
+ pthread_join(threads[i], NULL);
+
+ free(threads);
+ free(ctxs);
}
static void usage(const char *filepath)
{
error(1, 0, "Usage: %s (-4|-6) (-s|-c) -h<server_ip> -p<port> "
- "-l<payload_size> -i<ifname> -q<rxq_id>", filepath);
+ "-l<payload_size> -i<ifname> -q<rxq_id> -t<num_threads>",
+ filepath);
}
static void parse_opts(int argc, char **argv)
@@ -425,7 +462,7 @@ static void parse_opts(int argc, char **argv)
usage(argv[0]);
cfg_payload_len = max_payload_len;
- while ((c = getopt(argc, argv, "sch:p:l:i:q:o:z:x:d")) != -1) {
+ while ((c = getopt(argc, argv, "sch:p:l:i:q:o:z:x:dt:")) != -1) {
switch (c) {
case 's':
if (cfg_client)
@@ -466,6 +503,9 @@ static void parse_opts(int argc, char **argv)
case 'd':
cfg_dry_run = true;
break;
+ case 't':
+ cfg_num_threads = strtoul(optarg, NULL, 0);
+ break;
}
}
--
2.53.0
^ permalink raw reply related [flat|nested] 6+ messages in thread
* [PATCH 3/5] selftests: net: remove unused variable in process_recvzc()
2026-04-08 16:38 [PATCH 0/5] selftests: net: add multithread and multiqueue support to iou-zcrx Juanlu Herrero
2026-04-08 16:38 ` [PATCH 1/5] selftests: net: fix get_refill_ring_size() to use its local variable Juanlu Herrero
2026-04-08 16:38 ` [PATCH 2/5] selftests: net: add multithread client support to iou-zcrx Juanlu Herrero
@ 2026-04-08 16:38 ` Juanlu Herrero
2026-04-08 16:38 ` [PATCH 4/5] selftests: net: add multithread server support to iou-zcrx Juanlu Herrero
2026-04-08 16:38 ` [PATCH 5/5] selftests: net: add rss_multiqueue test variant " Juanlu Herrero
4 siblings, 0 replies; 6+ messages in thread
From: Juanlu Herrero @ 2026-04-08 16:38 UTC (permalink / raw)
To: netdev; +Cc: Juanlu Herrero
Remove unused `sqe` variable in preparation for multiqueue
rss selftest changes to process_recvzc() in the following
commit.
Signed-off-by: Juanlu Herrero <juanlu@fastmail.com>
---
tools/testing/selftests/drivers/net/hw/iou-zcrx.c | 1 -
1 file changed, 1 deletion(-)
diff --git a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
index de2eea78a5b6..6185c855b85c 100644
--- a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
+++ b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
@@ -276,7 +276,6 @@ static void process_recvzc(struct io_uring *ring, struct io_uring_cqe *cqe)
unsigned rq_mask = rq_ring.ring_entries - 1;
struct io_uring_zcrx_cqe *rcqe;
struct io_uring_zcrx_rqe *rqe;
- struct io_uring_sqe *sqe;
uint64_t mask;
char *data;
ssize_t n;
--
2.53.0
^ permalink raw reply related [flat|nested] 6+ messages in thread
* [PATCH 4/5] selftests: net: add multithread server support to iou-zcrx
2026-04-08 16:38 [PATCH 0/5] selftests: net: add multithread and multiqueue support to iou-zcrx Juanlu Herrero
` (2 preceding siblings ...)
2026-04-08 16:38 ` [PATCH 3/5] selftests: net: remove unused variable in process_recvzc() Juanlu Herrero
@ 2026-04-08 16:38 ` Juanlu Herrero
2026-04-08 16:38 ` [PATCH 5/5] selftests: net: add rss_multiqueue test variant " Juanlu Herrero
4 siblings, 0 replies; 6+ messages in thread
From: Juanlu Herrero @ 2026-04-08 16:38 UTC (permalink / raw)
To: netdev; +Cc: Juanlu Herrero
Move server state (io_uring ring, zcrx area, receive tracking) from
global variables into struct thread_ctx and thread the server side.
The main thread creates a single listening socket, spawns N worker
threads (each setting up its own io_uring and zcrx instance), then
accepts N connections and distributes them to the workers via
pthread barriers for synchronization.
Signed-off-by: Juanlu Herrero <juanlu@fastmail.com>
---
.../selftests/drivers/net/hw/iou-zcrx.c | 247 ++++++++++--------
1 file changed, 140 insertions(+), 107 deletions(-)
diff --git a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
index 6185c855b85c..646682167bb0 100644
--- a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
+++ b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
@@ -89,20 +89,22 @@ static bool cfg_dry_run;
static int cfg_num_threads = 1;
static char *payload;
+static pthread_barrier_t barrier;
struct thread_ctx {
+ struct io_uring ring;
+ void *area_ptr;
+ void *ring_ptr;
+ size_t ring_size;
+ struct io_uring_zcrx_rq rq_ring;
+ unsigned long area_token;
+ int connfd;
+ bool stop;
+ size_t received;
+ int queue_id;
int thread_id;
};
-static void *area_ptr;
-static void *ring_ptr;
-static size_t ring_size;
-static struct io_uring_zcrx_rq rq_ring;
-static unsigned long area_token;
-static int connfd;
-static bool stop;
-static size_t received;
-
static unsigned long gettimeofday_ms(void)
{
struct timeval tv;
@@ -145,7 +147,7 @@ static inline size_t get_refill_ring_size(unsigned int rq_entries)
return ALIGN_UP(size, page_size);
}
-static void setup_zcrx(struct io_uring *ring)
+static void setup_zcrx(struct thread_ctx *ctx)
{
unsigned int ifindex;
unsigned int rq_entries = 4096;
@@ -156,58 +158,58 @@ static void setup_zcrx(struct io_uring *ring)
error(1, 0, "bad interface name: %s", cfg_ifname);
if (cfg_rx_buf_len && cfg_rx_buf_len != page_size) {
- area_ptr = mmap(NULL,
- AREA_SIZE,
- PROT_READ | PROT_WRITE,
- MAP_ANONYMOUS | MAP_PRIVATE |
- MAP_HUGETLB | MAP_HUGE_2MB,
- -1,
- 0);
- if (area_ptr == MAP_FAILED) {
+ ctx->area_ptr = mmap(NULL,
+ AREA_SIZE,
+ PROT_READ | PROT_WRITE,
+ MAP_ANONYMOUS | MAP_PRIVATE |
+ MAP_HUGETLB | MAP_HUGE_2MB,
+ -1,
+ 0);
+ if (ctx->area_ptr == MAP_FAILED) {
printf("Can't allocate huge pages\n");
exit(SKIP_CODE);
}
} else {
- area_ptr = mmap(NULL,
- AREA_SIZE,
- PROT_READ | PROT_WRITE,
- MAP_ANONYMOUS | MAP_PRIVATE,
- 0,
- 0);
- if (area_ptr == MAP_FAILED)
+ ctx->area_ptr = mmap(NULL,
+ AREA_SIZE,
+ PROT_READ | PROT_WRITE,
+ MAP_ANONYMOUS | MAP_PRIVATE,
+ 0,
+ 0);
+ if (ctx->area_ptr == MAP_FAILED)
error(1, 0, "mmap(): zero copy area");
}
- ring_size = get_refill_ring_size(rq_entries);
- ring_ptr = mmap(NULL,
- ring_size,
- PROT_READ | PROT_WRITE,
- MAP_ANONYMOUS | MAP_PRIVATE,
- 0,
- 0);
+ ctx->ring_size = get_refill_ring_size(rq_entries);
+ ctx->ring_ptr = mmap(NULL,
+ ctx->ring_size,
+ PROT_READ | PROT_WRITE,
+ MAP_ANONYMOUS | MAP_PRIVATE,
+ 0,
+ 0);
struct io_uring_region_desc region_reg = {
- .size = ring_size,
- .user_addr = (__u64)(unsigned long)ring_ptr,
+ .size = ctx->ring_size,
+ .user_addr = (__u64)(unsigned long)ctx->ring_ptr,
.flags = IORING_MEM_REGION_TYPE_USER,
};
struct io_uring_zcrx_area_reg area_reg = {
- .addr = (__u64)(unsigned long)area_ptr,
+ .addr = (__u64)(unsigned long)ctx->area_ptr,
.len = AREA_SIZE,
.flags = 0,
};
struct t_io_uring_zcrx_ifq_reg reg = {
.if_idx = ifindex,
- .if_rxq = cfg_queue_id,
+ .if_rxq = ctx->queue_id,
.rq_entries = rq_entries,
.area_ptr = (__u64)(unsigned long)&area_reg,
.region_ptr = (__u64)(unsigned long)&region_reg,
.rx_buf_len = cfg_rx_buf_len,
};
- ret = io_uring_register_ifq(ring, (void *)&reg);
+ ret = io_uring_register_ifq(&ctx->ring, (void *)&reg);
if (cfg_rx_buf_len && (ret == -EINVAL || ret == -EOPNOTSUPP ||
ret == -ERANGE)) {
printf("Large chunks are not supported %i\n", ret);
@@ -216,64 +218,40 @@ static void setup_zcrx(struct io_uring *ring)
error(1, 0, "io_uring_register_ifq(): %d", ret);
}
- rq_ring.khead = (unsigned int *)((char *)ring_ptr + reg.offsets.head);
- rq_ring.ktail = (unsigned int *)((char *)ring_ptr + reg.offsets.tail);
- rq_ring.rqes = (struct io_uring_zcrx_rqe *)((char *)ring_ptr + reg.offsets.rqes);
- rq_ring.rq_tail = 0;
- rq_ring.ring_entries = reg.rq_entries;
-
- area_token = area_reg.rq_area_token;
-}
-
-static void add_accept(struct io_uring *ring, int sockfd)
-{
- struct io_uring_sqe *sqe;
-
- sqe = io_uring_get_sqe(ring);
+ ctx->rq_ring.khead = (unsigned int *)((char *)ctx->ring_ptr + reg.offsets.head);
+ ctx->rq_ring.ktail = (unsigned int *)((char *)ctx->ring_ptr + reg.offsets.tail);
+ ctx->rq_ring.rqes = (struct io_uring_zcrx_rqe *)((char *)ctx->ring_ptr + reg.offsets.rqes);
+ ctx->rq_ring.rq_tail = 0;
+ ctx->rq_ring.ring_entries = reg.rq_entries;
- io_uring_prep_accept(sqe, sockfd, NULL, NULL, 0);
- sqe->user_data = 1;
+ ctx->area_token = area_reg.rq_area_token;
}
-static void add_recvzc(struct io_uring *ring, int sockfd)
+static void add_recvzc(struct thread_ctx *ctx, int sockfd)
{
struct io_uring_sqe *sqe;
- sqe = io_uring_get_sqe(ring);
+ sqe = io_uring_get_sqe(&ctx->ring);
io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, sockfd, NULL, 0, 0);
sqe->ioprio |= IORING_RECV_MULTISHOT;
sqe->user_data = 2;
}
-static void add_recvzc_oneshot(struct io_uring *ring, int sockfd, size_t len)
+static void add_recvzc_oneshot(struct thread_ctx *ctx, int sockfd, size_t len)
{
struct io_uring_sqe *sqe;
- sqe = io_uring_get_sqe(ring);
+ sqe = io_uring_get_sqe(&ctx->ring);
io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, sockfd, NULL, len, 0);
sqe->ioprio |= IORING_RECV_MULTISHOT;
sqe->user_data = 2;
}
-static void process_accept(struct io_uring *ring, struct io_uring_cqe *cqe)
+static void process_recvzc(struct thread_ctx *ctx, struct io_uring_cqe *cqe)
{
- if (cqe->res < 0)
- error(1, 0, "accept()");
- if (connfd)
- error(1, 0, "Unexpected second connection");
-
- connfd = cqe->res;
- if (cfg_oneshot)
- add_recvzc_oneshot(ring, connfd, page_size);
- else
- add_recvzc(ring, connfd);
-}
-
-static void process_recvzc(struct io_uring *ring, struct io_uring_cqe *cqe)
-{
- unsigned rq_mask = rq_ring.ring_entries - 1;
+ unsigned rq_mask = ctx->rq_ring.ring_entries - 1;
struct io_uring_zcrx_cqe *rcqe;
struct io_uring_zcrx_rqe *rqe;
uint64_t mask;
@@ -282,7 +260,7 @@ static void process_recvzc(struct io_uring *ring, struct io_uring_cqe *cqe)
int i;
if (cqe->res == 0 && cqe->flags == 0 && cfg_oneshot_recvs == 0) {
- stop = true;
+ ctx->stop = true;
return;
}
@@ -291,59 +269,99 @@ static void process_recvzc(struct io_uring *ring, struct io_uring_cqe *cqe)
if (cfg_oneshot) {
if (cqe->res == 0 && cqe->flags == 0 && cfg_oneshot_recvs) {
- add_recvzc_oneshot(ring, connfd, page_size);
+ add_recvzc_oneshot(ctx, ctx->connfd, page_size);
cfg_oneshot_recvs--;
}
} else if (!(cqe->flags & IORING_CQE_F_MORE)) {
- add_recvzc(ring, connfd);
+ add_recvzc(ctx, ctx->connfd);
}
rcqe = (struct io_uring_zcrx_cqe *)(cqe + 1);
n = cqe->res;
mask = (1ULL << IORING_ZCRX_AREA_SHIFT) - 1;
- data = (char *)area_ptr + (rcqe->off & mask);
+ data = (char *)ctx->area_ptr + (rcqe->off & mask);
for (i = 0; i < n; i++) {
- if (*(data + i) != payload[(received + i)])
+ if (*(data + i) != payload[(ctx->received + i)])
error(1, 0, "payload mismatch at %d", i);
}
- received += n;
+ ctx->received += n;
- rqe = &rq_ring.rqes[(rq_ring.rq_tail & rq_mask)];
- rqe->off = (rcqe->off & ~IORING_ZCRX_AREA_MASK) | area_token;
+ rqe = &ctx->rq_ring.rqes[(ctx->rq_ring.rq_tail & rq_mask)];
+ rqe->off = (rcqe->off & ~IORING_ZCRX_AREA_MASK) | ctx->area_token;
rqe->len = cqe->res;
- io_uring_smp_store_release(rq_ring.ktail, ++rq_ring.rq_tail);
+ io_uring_smp_store_release(ctx->rq_ring.ktail, ++ctx->rq_ring.rq_tail);
}
-static void server_loop(struct io_uring *ring)
+static void server_loop(struct thread_ctx *ctx)
{
struct io_uring_cqe *cqe;
unsigned int count = 0;
unsigned int head;
int i, ret;
- io_uring_submit_and_wait(ring, 1);
+ io_uring_submit_and_wait(&ctx->ring, 1);
- io_uring_for_each_cqe(ring, head, cqe) {
- if (cqe->user_data == 1)
- process_accept(ring, cqe);
- else if (cqe->user_data == 2)
- process_recvzc(ring, cqe);
+ io_uring_for_each_cqe(&ctx->ring, head, cqe) {
+ if (cqe->user_data == 2)
+ process_recvzc(ctx, cqe);
else
error(1, 0, "unknown cqe");
count++;
}
- io_uring_cq_advance(ring, count);
+ io_uring_cq_advance(&ctx->ring, count);
}
-static void run_server(void)
+static void *server_worker(void *arg)
{
+ struct thread_ctx *ctx = arg;
unsigned int flags = 0;
- struct io_uring ring;
- int fd, enable, ret;
uint64_t tstop;
+ flags |= IORING_SETUP_COOP_TASKRUN;
+ flags |= IORING_SETUP_SINGLE_ISSUER;
+ flags |= IORING_SETUP_DEFER_TASKRUN;
+ flags |= IORING_SETUP_SUBMIT_ALL;
+ flags |= IORING_SETUP_CQE32;
+
+ io_uring_queue_init(512, &ctx->ring, flags);
+
+ setup_zcrx(ctx);
+
+ pthread_barrier_wait(&barrier);
+
+ if (cfg_dry_run)
+ return NULL;
+
+ pthread_barrier_wait(&barrier);
+
+ if (cfg_oneshot)
+ add_recvzc_oneshot(ctx, ctx->connfd, page_size);
+ else
+ add_recvzc(ctx, ctx->connfd);
+
+ tstop = gettimeofday_ms() + 5000;
+ while (!ctx->stop && gettimeofday_ms() < tstop)
+ server_loop(ctx);
+
+ if (!ctx->stop)
+ error(1, 0, "test failed\n");
+
+ return NULL;
+}
+
+static void run_server(void)
+{
+ struct thread_ctx *ctxs;
+ pthread_t *threads;
+ int fd, ret, i, enable;
+
+ ctxs = calloc(cfg_num_threads, sizeof(*ctxs));
+ threads = calloc(cfg_num_threads, sizeof(*threads));
+ if (!ctxs || !threads)
+ error(1, 0, "calloc()");
+
fd = socket(AF_INET6, SOCK_STREAM, 0);
if (fd == -1)
error(1, 0, "socket()");
@@ -360,26 +378,41 @@ static void run_server(void)
if (listen(fd, 1024) < 0)
error(1, 0, "listen()");
- flags |= IORING_SETUP_COOP_TASKRUN;
- flags |= IORING_SETUP_SINGLE_ISSUER;
- flags |= IORING_SETUP_DEFER_TASKRUN;
- flags |= IORING_SETUP_SUBMIT_ALL;
- flags |= IORING_SETUP_CQE32;
+ pthread_barrier_init(&barrier, NULL, cfg_num_threads + 1);
+
+ for (i = 0; i < cfg_num_threads; i++) {
+ ctxs[i].queue_id = cfg_queue_id + i;
+ ctxs[i].thread_id = i;
+ }
- io_uring_queue_init(512, &ring, flags);
+ for (i = 0; i < cfg_num_threads; i++) {
+ ret = pthread_create(&threads[i], NULL, server_worker,
+ &ctxs[i]);
+ if (ret)
+ error(1, ret, "pthread_create()");
+ }
+
+ pthread_barrier_wait(&barrier);
- setup_zcrx(&ring);
if (cfg_dry_run)
- return;
+ goto join;
+
+ for (i = 0; i < cfg_num_threads; i++) {
+ ctxs[i].connfd = accept(fd, NULL, NULL);
+ if (ctxs[i].connfd < 0)
+ error(1, 0, "accept()");
+ }
- add_accept(&ring, fd);
+ pthread_barrier_wait(&barrier);
- tstop = gettimeofday_ms() + 5000;
- while (!stop && gettimeofday_ms() < tstop)
- server_loop(&ring);
+join:
+ for (i = 0; i < cfg_num_threads; i++)
+ pthread_join(threads[i], NULL);
- if (!stop)
- error(1, 0, "test failed\n");
+ pthread_barrier_destroy(&barrier);
+ close(fd);
+ free(threads);
+ free(ctxs);
}
static void *client_worker(void *arg)
--
2.53.0
^ permalink raw reply related [flat|nested] 6+ messages in thread
* [PATCH 5/5] selftests: net: add rss_multiqueue test variant to iou-zcrx
2026-04-08 16:38 [PATCH 0/5] selftests: net: add multithread and multiqueue support to iou-zcrx Juanlu Herrero
` (3 preceding siblings ...)
2026-04-08 16:38 ` [PATCH 4/5] selftests: net: add multithread server support to iou-zcrx Juanlu Herrero
@ 2026-04-08 16:38 ` Juanlu Herrero
4 siblings, 0 replies; 6+ messages in thread
From: Juanlu Herrero @ 2026-04-08 16:38 UTC (permalink / raw)
To: netdev; +Cc: Juanlu Herrero
Add multi-port support to the iou-zcrx test binary and a new
rss_multiqueue Python test variant that exercises multi-queue zero-copy
receive with per-port flow rule steering.
In multi-port mode, the server creates N listening sockets on
consecutive ports (cfg_port, cfg_port+1, ...) and uses epoll to accept
one connection per socket. Each client thread connects to its
corresponding port. Per-port ntuple flow rules steer traffic to
different NIC hardware queues, each with its own zcrx instance.
For single-thread mode (the default), behavior is unchanged: one socket
on cfg_port, one thread, one queue.
Signed-off-by: Juanlu Herrero <juanlu@fastmail.com>
---
.../selftests/drivers/net/hw/iou-zcrx.c | 81 ++++++++++++++-----
.../selftests/drivers/net/hw/iou-zcrx.py | 45 ++++++++++-
2 files changed, 104 insertions(+), 22 deletions(-)
diff --git a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
index 646682167bb0..1f33d7127185 100644
--- a/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
+++ b/tools/testing/selftests/drivers/net/hw/iou-zcrx.c
@@ -102,6 +102,7 @@ struct thread_ctx {
bool stop;
size_t received;
int queue_id;
+ int port;
int thread_id;
};
@@ -353,35 +354,47 @@ static void *server_worker(void *arg)
static void run_server(void)
{
+ struct epoll_event ev, events[64];
struct thread_ctx *ctxs;
+ struct sockaddr_in6 addr;
pthread_t *threads;
- int fd, ret, i, enable;
+ int *fds;
+ int epfd, nfds, accepted;
+ int ret, i, enable;
ctxs = calloc(cfg_num_threads, sizeof(*ctxs));
threads = calloc(cfg_num_threads, sizeof(*threads));
- if (!ctxs || !threads)
+ fds = calloc(cfg_num_threads, sizeof(*fds));
+ if (!ctxs || !threads || !fds)
error(1, 0, "calloc()");
- fd = socket(AF_INET6, SOCK_STREAM, 0);
- if (fd == -1)
- error(1, 0, "socket()");
+ for (i = 0; i < cfg_num_threads; i++) {
+ fds[i] = socket(AF_INET6, SOCK_STREAM, 0);
+ if (fds[i] == -1)
+ error(1, 0, "socket()");
+
+ enable = 1;
+ ret = setsockopt(fds[i], SOL_SOCKET, SO_REUSEADDR,
+ &enable, sizeof(int));
+ if (ret < 0)
+ error(1, 0, "setsockopt(SO_REUSEADDR)");
- enable = 1;
- ret = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
- if (ret < 0)
- error(1, 0, "setsockopt(SO_REUSEADDR)");
+ addr = cfg_addr;
+ addr.sin6_port = htons(cfg_port + i);
- ret = bind(fd, (struct sockaddr *)&cfg_addr, sizeof(cfg_addr));
- if (ret < 0)
- error(1, 0, "bind()");
+ ret = bind(fds[i], (struct sockaddr *)&addr, sizeof(addr));
+ if (ret < 0)
+ error(1, 0, "bind()");
- if (listen(fd, 1024) < 0)
- error(1, 0, "listen()");
+ if (listen(fds[i], 1024) < 0)
+ error(1, 0, "listen()");
+ }
pthread_barrier_init(&barrier, NULL, cfg_num_threads + 1);
for (i = 0; i < cfg_num_threads; i++) {
ctxs[i].queue_id = cfg_queue_id + i;
+ ctxs[i].port = cfg_port + i;
ctxs[i].thread_id = i;
}
@@ -397,12 +410,36 @@ static void run_server(void)
if (cfg_dry_run)
goto join;
+ epfd = epoll_create1(0);
+ if (epfd < 0)
+ error(1, 0, "epoll_create1()");
+
for (i = 0; i < cfg_num_threads; i++) {
- ctxs[i].connfd = accept(fd, NULL, NULL);
- if (ctxs[i].connfd < 0)
- error(1, 0, "accept()");
+ ev.events = EPOLLIN;
+ ev.data.u32 = i;
+ if (epoll_ctl(epfd, EPOLL_CTL_ADD, fds[i], &ev) < 0)
+ error(1, 0, "epoll_ctl()");
}
+ accepted = 0;
+ while (accepted < cfg_num_threads) {
+ nfds = epoll_wait(epfd, events, 64, 5000);
+ if (nfds < 0)
+ error(1, 0, "epoll_wait()");
+ if (nfds == 0)
+ error(1, 0, "epoll_wait() timeout");
+
+ for (i = 0; i < nfds; i++) {
+ int idx = events[i].data.u32;
+
+ ctxs[idx].connfd = accept(fds[idx], NULL, NULL);
+ if (ctxs[idx].connfd < 0)
+ error(1, 0, "accept()");
+ accepted++;
+ }
+ }
+
+ close(epfd);
pthread_barrier_wait(&barrier);
join:
@@ -410,23 +447,29 @@ static void run_server(void)
pthread_join(threads[i], NULL);
pthread_barrier_destroy(&barrier);
- close(fd);
+ for (i = 0; i < cfg_num_threads; i++)
+ close(fds[i]);
+ free(fds);
free(threads);
free(ctxs);
}
static void *client_worker(void *arg)
{
+ struct thread_ctx *ctx = arg;
+ struct sockaddr_in6 addr = cfg_addr;
ssize_t to_send = cfg_send_size;
ssize_t sent = 0;
ssize_t chunk, res;
int fd;
+ addr.sin6_port = htons(cfg_port + ctx->thread_id);
+
fd = socket(AF_INET6, SOCK_STREAM, 0);
if (fd == -1)
error(1, 0, "socket()");
- if (connect(fd, (struct sockaddr *)&cfg_addr, sizeof(cfg_addr)))
+ if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)))
error(1, 0, "connect()");
while (to_send) {
diff --git a/tools/testing/selftests/drivers/net/hw/iou-zcrx.py b/tools/testing/selftests/drivers/net/hw/iou-zcrx.py
index e81724cb5542..c918cdaf6b1b 100755
--- a/tools/testing/selftests/drivers/net/hw/iou-zcrx.py
+++ b/tools/testing/selftests/drivers/net/hw/iou-zcrx.py
@@ -35,6 +35,12 @@ def set_flow_rule(cfg):
return int(values)
+def set_flow_rule_port(cfg, port, queue):
+ output = ethtool(f"-N {cfg.ifname} flow-type tcp6 dst-port {port} action {queue}").stdout
+ values = re.search(r'ID (\d+)', output).group(1)
+ return int(values)
+
+
def set_flow_rule_rss(cfg, rss_ctx_id):
output = ethtool(f"-N {cfg.ifname} flow-type tcp6 dst-port {cfg.port} context {rss_ctx_id}").stdout
values = re.search(r'ID (\d+)', output).group(1)
@@ -100,18 +106,51 @@ def rss(cfg):
defer(ethtool, f"-N {cfg.ifname} delete {flow_rule_id}")
+def rss_multiqueue(cfg):
+ channels = cfg.ethnl.channels_get({'header': {'dev-index': cfg.ifindex}})
+ channels = channels['combined-count']
+ if channels < 3:
+ raise KsftSkipEx('Test requires NETIF with at least 3 combined channels')
+
+ rings = cfg.ethnl.rings_get({'header': {'dev-index': cfg.ifindex}})
+ rx_rings = rings['rx']
+ hds_thresh = rings.get('hds-thresh', 0)
+
+ cfg.ethnl.rings_set({'header': {'dev-index': cfg.ifindex},
+ 'tcp-data-split': 'enabled',
+ 'hds-thresh': 0,
+ 'rx': 64})
+ defer(cfg.ethnl.rings_set, {'header': {'dev-index': cfg.ifindex},
+ 'tcp-data-split': 'unknown',
+ 'hds-thresh': hds_thresh,
+ 'rx': rx_rings})
+ defer(mp_clear_wait, cfg)
+
+ cfg.num_threads = 2
+ cfg.target = channels - cfg.num_threads
+ ethtool(f"-X {cfg.ifname} equal {cfg.target}")
+ defer(ethtool, f"-X {cfg.ifname} default")
+
+ for i in range(cfg.num_threads):
+ flow_rule_id = set_flow_rule_port(cfg, cfg.port + i, cfg.target + i)
+ defer(ethtool, f"-N {cfg.ifname} delete {flow_rule_id}")
+
+
@ksft_variants([
KsftNamedVariant("single", single),
KsftNamedVariant("rss", rss),
+ KsftNamedVariant("rss_multiqueue", rss_multiqueue),
])
def test_zcrx(cfg, setup) -> None:
cfg.require_ipver('6')
+ cfg.num_threads = getattr(cfg, 'num_threads', 1)
setup(cfg)
- rx_cmd = f"{cfg.bin_local} -s -p {cfg.port} -i {cfg.ifname} -q {cfg.target}"
- tx_cmd = f"{cfg.bin_remote} -c -h {cfg.addr_v['6']} -p {cfg.port} -l 12840"
+ rx_cmd = f"{cfg.bin_local} -s -p {cfg.port} -i {cfg.ifname} -q {cfg.target} -t {cfg.num_threads}"
+ tx_cmd = f"{cfg.bin_remote} -c -h {cfg.addr_v['6']} -p {cfg.port} -l 12840 -t {cfg.num_threads}"
with bkg(rx_cmd, exit_wait=True):
- wait_port_listen(cfg.port, proto="tcp")
+ for i in range(cfg.num_threads):
+ wait_port_listen(cfg.port + i, proto="tcp")
cmd(tx_cmd, host=cfg.remote)
--
2.53.0
^ permalink raw reply related [flat|nested] 6+ messages in thread
end of thread, other threads:[~2026-04-08 16:38 UTC | newest]
Thread overview: 6+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2026-04-08 16:38 [PATCH 0/5] selftests: net: add multithread and multiqueue support to iou-zcrx Juanlu Herrero
2026-04-08 16:38 ` [PATCH 1/5] selftests: net: fix get_refill_ring_size() to use its local variable Juanlu Herrero
2026-04-08 16:38 ` [PATCH 2/5] selftests: net: add multithread client support to iou-zcrx Juanlu Herrero
2026-04-08 16:38 ` [PATCH 3/5] selftests: net: remove unused variable in process_recvzc() Juanlu Herrero
2026-04-08 16:38 ` [PATCH 4/5] selftests: net: add multithread server support to iou-zcrx Juanlu Herrero
2026-04-08 16:38 ` [PATCH 5/5] selftests: net: add rss_multiqueue test variant " Juanlu Herrero
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox