From: Stefan Hajnoczi <stefanha@redhat.com>
To: qemu-devel@nongnu.org
Cc: Hanna Reitz <hreitz@redhat.com>, Kevin Wolf <kwolf@redhat.com>,
Stefan Weil <sw@weilnetz.de>, Paolo Bonzini <pbonzini@redhat.com>,
Fam Zheng <fam@euphon.net>,
eblake@redhat.com, Stefano Garzarella <sgarzare@redhat.com>,
qemu-block@nongnu.org, Stefan Hajnoczi <stefanha@redhat.com>,
Aarushi Mehta <mehta.aaru20@gmail.com>,
hibriansong@gmail.com
Subject: [PATCH v2 09/12] aio-posix: add aio_add_sqe() API for user-defined io_uring requests
Date: Thu, 19 Jun 2025 20:08:25 -0400
Message-ID: <20250620000829.1426291-10-stefanha@redhat.com>
In-Reply-To: <20250620000829.1426291-1-stefanha@redhat.com>
Introduce the aio_add_sqe() API for submitting io_uring requests in the
current AioContext. This allows other components in QEMU, like the block
layer, to take advantage of io_uring features without creating their own
io_uring context.
This API supports nested event loops just like file descriptor
monitoring and BHs do. This comes at a complexity cost: a BH is required
to dispatch CQE callbacks, and completed CqeHandlers are placed on a list
so that a nested event loop can invoke its parent's pending CQE
callbacks. If you're wondering why CqeHandler exists instead of just a
callback function pointer, this is why: the handler carries the list
linkage and a copy of the cqe.
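Usage from a caller looks roughly like this (a minimal sketch assuming
the usual includes; MyRequest, my_prep_readv(), my_cqe_cb(), and
my_submit() are hypothetical names, not part of this patch):

    typedef struct {
        CqeHandler cqe_handler; /* must stay alive until my_cqe_cb() runs */
        int fd;
        struct iovec iov;
    } MyRequest;

    /* Invoked by the AioContext to fill in the sqe */
    static void my_prep_readv(struct io_uring_sqe *sqe, void *opaque)
    {
        MyRequest *req = opaque;

        io_uring_prep_readv(sqe, req->fd, &req->iov, 1, 0);
        /* do not call io_uring_sqe_set_data(); aio_add_sqe() sets it */
    }

    /* Invoked by the AioContext when the request completes */
    static void my_cqe_cb(CqeHandler *cqe_handler)
    {
        MyRequest *req = container_of(cqe_handler, MyRequest, cqe_handler);

        if (cqe_handler->cqe.res < 0) {
            /* handle -errno */
        }
    }

    static void my_submit(MyRequest *req)
    {
        assert(aio_has_io_uring());
        req->cqe_handler.cb = my_cqe_cb;
        aio_add_sqe(my_prep_readv, req, &req->cqe_handler);
    }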
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
---
v2:
- Fix pre_sqe -> prep_sqe typo [Eric]
- Add #endif terminator comment [Eric]
---
include/block/aio.h | 84 ++++++++++++++++++++++-
util/aio-posix.h | 1 +
util/aio-posix.c | 9 +++
util/fdmon-io_uring.c | 152 ++++++++++++++++++++++++++++++++----------
4 files changed, 208 insertions(+), 38 deletions(-)
diff --git a/include/block/aio.h b/include/block/aio.h
index d919d7c8f4..56ea0d47b7 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -61,6 +61,27 @@ typedef struct LuringState LuringState;
/* Is polling disabled? */
bool aio_poll_disabled(AioContext *ctx);
+#ifdef CONFIG_LINUX_IO_URING
+/*
+ * Each io_uring request must have a unique CqeHandler that processes the cqe.
+ * The lifetime of a CqeHandler must be at least from aio_add_sqe() until
+ * ->cb() invocation.
+ */
+typedef struct CqeHandler CqeHandler;
+struct CqeHandler {
+ /* Called by the AioContext when the request has completed */
+ void (*cb)(CqeHandler *handler);
+
+ /* Used internally, do not access this */
+ QSIMPLEQ_ENTRY(CqeHandler) next;
+
+ /* This field is filled in before ->cb() is called */
+ struct io_uring_cqe cqe;
+};
+
+typedef QSIMPLEQ_HEAD(, CqeHandler) CqeHandlerSimpleQ;
+#endif /* CONFIG_LINUX_IO_URING */
+
/* Callbacks for file descriptor monitoring implementations */
typedef struct {
/*
@@ -138,6 +159,27 @@ typedef struct {
* Called with list_lock incremented.
*/
void (*gsource_dispatch)(AioContext *ctx, AioHandlerList *ready_list);
+
+#ifdef CONFIG_LINUX_IO_URING
+ /**
+ * aio_add_sqe: Add an io_uring sqe for submission.
+ * @prep_sqe: invoked with an sqe that should be prepared for submission
+ * @opaque: user-defined argument to @prep_sqe()
+ * @cqe_handler: the unique cqe handler associated with this request
+ *
+ * The caller's @prep_sqe() function is invoked to fill in the details of
+ * the sqe. Do not call io_uring_sqe_set_data() on this sqe.
+ *
+ * The kernel may see the sqe as soon as @prep_sqe() returns or it may take
+ * until the next event loop iteration.
+ *
+ * This function is called from the current AioContext and is not
+ * thread-safe.
+ */
+ void (*add_sqe)(AioContext *ctx,
+ void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+ void *opaque, CqeHandler *cqe_handler);
+#endif /* CONFIG_LINUX_IO_URING */
} FDMonOps;
/*
@@ -255,7 +297,11 @@ struct AioContext {
struct io_uring fdmon_io_uring;
AioHandlerSList submit_list;
gpointer io_uring_fd_tag;
-#endif
+
+ /* Pending callback state for cqe handlers */
+ CqeHandlerSimpleQ cqe_handler_ready_list;
+ QEMUBH *cqe_handler_bh;
+#endif /* CONFIG_LINUX_IO_URING */
/* TimerLists for calling timers - one per clock type. Has its own
* locking.
@@ -761,4 +807,40 @@ void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch);
*/
void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
int64_t max, Error **errp);
+
+#ifdef CONFIG_LINUX_IO_URING
+/**
+ * aio_has_io_uring: Return whether io_uring is available.
+ *
+ * io_uring is either available in all AioContexts or in none, so this only
+ * needs to be called once from within any thread's AioContext.
+ */
+static inline bool aio_has_io_uring(void)
+{
+ AioContext *ctx = qemu_get_current_aio_context();
+ return ctx->fdmon_ops->add_sqe;
+}
+
+/**
+ * aio_add_sqe: Add an io_uring sqe for submission.
+ * @prep_sqe: invoked with an sqe that should be prepared for submission
+ * @opaque: user-defined argument to @prep_sqe()
+ * @cqe_handler: the unique cqe handler associated with this request
+ *
+ * The caller's @prep_sqe() function is invoked to fill in the details of the
+ * sqe. Do not call io_uring_sqe_set_data() on this sqe.
+ *
+ * The sqe is submitted by the current AioContext. The kernel may see the sqe
+ * as soon as @prep_sqe() returns or it may take until the next event loop
+ * iteration.
+ *
+ * When the AioContext is destroyed, pending sqes are ignored and their
+ * CqeHandlers are not invoked.
+ *
+ * This function must be called only when aio_has_io_uring() returns true.
+ */
+void aio_add_sqe(void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+ void *opaque, CqeHandler *cqe_handler);
+#endif /* CONFIG_LINUX_IO_URING */
+
#endif
diff --git a/util/aio-posix.h b/util/aio-posix.h
index 6f9d97d866..57ef801a5f 100644
--- a/util/aio-posix.h
+++ b/util/aio-posix.h
@@ -36,6 +36,7 @@ struct AioHandler {
#ifdef CONFIG_LINUX_IO_URING
QSLIST_ENTRY(AioHandler) node_submitted;
unsigned flags; /* see fdmon-io_uring.c */
+ CqeHandler cqe_handler;
#endif
int64_t poll_idle_timeout; /* when to stop userspace polling */
bool poll_ready; /* has polling detected an event? */
diff --git a/util/aio-posix.c b/util/aio-posix.c
index 800b4debbf..df945312b3 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -797,3 +797,12 @@ void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch)
aio_notify(ctx);
}
+
+#ifdef CONFIG_LINUX_IO_URING
+void aio_add_sqe(void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+ void *opaque, CqeHandler *cqe_handler)
+{
+ AioContext *ctx = qemu_get_current_aio_context();
+ ctx->fdmon_ops->add_sqe(ctx, prep_sqe, opaque, cqe_handler);
+}
+#endif /* CONFIG_LINUX_IO_URING */
diff --git a/util/fdmon-io_uring.c b/util/fdmon-io_uring.c
index 7236c3dc35..6d4f33d2de 100644
--- a/util/fdmon-io_uring.c
+++ b/util/fdmon-io_uring.c
@@ -46,6 +46,7 @@
#include "qemu/osdep.h"
#include <poll.h>
#include "qapi/error.h"
+#include "qemu/defer-call.h"
#include "qemu/rcu_queue.h"
#include "aio-posix.h"
@@ -76,8 +77,8 @@ static inline int pfd_events_from_poll(int poll_events)
}
/*
- * Returns an sqe for submitting a request. Only be called within
- * fdmon_io_uring_wait().
+ * Returns an sqe for submitting a request. Only called from the AioContext
+ * thread.
*/
static struct io_uring_sqe *get_sqe(AioContext *ctx)
{
@@ -168,23 +169,43 @@ static void fdmon_io_uring_update(AioContext *ctx,
}
}
+static void fdmon_io_uring_add_sqe(AioContext *ctx,
+ void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+ void *opaque, CqeHandler *cqe_handler)
+{
+ struct io_uring_sqe *sqe = get_sqe(ctx);
+
+ prep_sqe(sqe, opaque);
+ io_uring_sqe_set_data(sqe, cqe_handler);
+}
+
+static void fdmon_special_cqe_handler(CqeHandler *cqe_handler)
+{
+ /*
+ * This is an empty function that is never called. It is used as a function
+ * pointer to distinguish it from ordinary cqe handlers.
+ */
+}
+
static void add_poll_add_sqe(AioContext *ctx, AioHandler *node)
{
struct io_uring_sqe *sqe = get_sqe(ctx);
int events = poll_events_from_pfd(node->pfd.events);
io_uring_prep_poll_add(sqe, node->pfd.fd, events);
- io_uring_sqe_set_data(sqe, node);
+ node->cqe_handler.cb = fdmon_special_cqe_handler;
+ io_uring_sqe_set_data(sqe, &node->cqe_handler);
}
static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
{
struct io_uring_sqe *sqe = get_sqe(ctx);
+ CqeHandler *cqe_handler = &node->cqe_handler;
#ifdef LIBURING_HAVE_DATA64
- io_uring_prep_poll_remove(sqe, (uintptr_t)node);
+ io_uring_prep_poll_remove(sqe, (uintptr_t)cqe_handler);
#else
- io_uring_prep_poll_remove(sqe, node);
+ io_uring_prep_poll_remove(sqe, cqe_handler);
#endif
io_uring_sqe_set_data(sqe, NULL);
}
@@ -226,19 +247,13 @@ static void fill_sq_ring(AioContext *ctx)
}
}
-/* Returns true if a handler became ready */
-static bool process_cqe(AioContext *ctx,
- AioHandlerList *ready_list,
- struct io_uring_cqe *cqe)
+static bool process_cqe_aio_handler(AioContext *ctx,
+ AioHandlerList *ready_list,
+ AioHandler *node,
+ struct io_uring_cqe *cqe)
{
- AioHandler *node = io_uring_cqe_get_data(cqe);
unsigned flags;
- /* poll_timeout and poll_remove have a zero user_data field */
- if (!node) {
- return false;
- }
-
/*
* Deletion can only happen when IORING_OP_POLL_ADD completes. If we race
* with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE
@@ -262,6 +277,61 @@ static bool process_cqe(AioContext *ctx,
return true;
}
+/* Process CqeHandlers from the ready list */
+static void cqe_handler_bh(void *opaque)
+{
+ AioContext *ctx = opaque;
+ CqeHandlerSimpleQ *ready_list = &ctx->cqe_handler_ready_list;
+
+ /*
+ * If cqe_handler->cb() calls aio_poll() it must continue processing
+ * ready_list. Schedule a BH so the inner event loop calls us again.
+ */
+ qemu_bh_schedule(ctx->cqe_handler_bh);
+
+ /* Handlers may use defer_call() to coalesce frequent operations */
+ defer_call_begin();
+
+ while (!QSIMPLEQ_EMPTY(ready_list)) {
+ CqeHandler *cqe_handler = QSIMPLEQ_FIRST(ready_list);
+
+ QSIMPLEQ_REMOVE_HEAD(ready_list, next);
+
+ cqe_handler->cb(cqe_handler);
+ }
+
+ defer_call_end();
+
+ qemu_bh_cancel(ctx->cqe_handler_bh);
+}
+
+/* Returns true if a handler became ready */
+static bool process_cqe(AioContext *ctx,
+ AioHandlerList *ready_list,
+ struct io_uring_cqe *cqe)
+{
+ CqeHandler *cqe_handler = io_uring_cqe_get_data(cqe);
+
+ /* poll_timeout and poll_remove have a zero user_data field */
+ if (!cqe_handler) {
+ return false;
+ }
+
+ /*
+ * Special handling for AioHandler cqes. They need ready_list and have a
+ * return value.
+ */
+ if (cqe_handler->cb == fdmon_special_cqe_handler) {
+ AioHandler *node = container_of(cqe_handler, AioHandler, cqe_handler);
+ return process_cqe_aio_handler(ctx, ready_list, node, cqe);
+ }
+
+ cqe_handler->cqe = *cqe;
+ QSIMPLEQ_INSERT_TAIL(&ctx->cqe_handler_ready_list, cqe_handler, next);
+ qemu_bh_schedule(ctx->cqe_handler_bh);
+ return false;
+}
+
static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
{
struct io_uring *ring = &ctx->fdmon_io_uring;
@@ -361,6 +431,7 @@ static const FDMonOps fdmon_io_uring_ops = {
.gsource_prepare = fdmon_io_uring_gsource_prepare,
.gsource_check = fdmon_io_uring_gsource_check,
.gsource_dispatch = fdmon_io_uring_gsource_dispatch,
+ .add_sqe = fdmon_io_uring_add_sqe,
};
void fdmon_io_uring_setup(AioContext *ctx, Error **errp)
@@ -376,6 +447,8 @@ void fdmon_io_uring_setup(AioContext *ctx, Error **errp)
}
QSLIST_INIT(&ctx->submit_list);
+ QSIMPLEQ_INIT(&ctx->cqe_handler_ready_list);
+ ctx->cqe_handler_bh = aio_bh_new(ctx, cqe_handler_bh, ctx);
ctx->fdmon_ops = &fdmon_io_uring_ops;
ctx->io_uring_fd_tag = g_source_add_unix_fd(&ctx->source,
ctx->fdmon_io_uring.ring_fd, G_IO_IN);
@@ -383,33 +456,38 @@ void fdmon_io_uring_setup(AioContext *ctx, Error **errp)
void fdmon_io_uring_destroy(AioContext *ctx)
{
- if (ctx->fdmon_ops == &fdmon_io_uring_ops) {
- AioHandler *node;
+ AioHandler *node;
- io_uring_queue_exit(&ctx->fdmon_io_uring);
+ if (ctx->fdmon_ops != &fdmon_io_uring_ops) {
+ return;
+ }
- /* Move handlers due to be removed onto the deleted list */
- while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
- unsigned flags = qatomic_fetch_and(&node->flags,
- ~(FDMON_IO_URING_PENDING |
- FDMON_IO_URING_ADD |
- FDMON_IO_URING_REMOVE |
- FDMON_IO_URING_DELETE_AIO_HANDLER));
+ io_uring_queue_exit(&ctx->fdmon_io_uring);
- if ((flags & FDMON_IO_URING_REMOVE) ||
- (flags & FDMON_IO_URING_DELETE_AIO_HANDLER)) {
- QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers,
- node, node_deleted);
- }
+ /* Move handlers due to be removed onto the deleted list */
+ while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
+ unsigned flags = qatomic_fetch_and(&node->flags,
+ ~(FDMON_IO_URING_PENDING |
+ FDMON_IO_URING_ADD |
+ FDMON_IO_URING_REMOVE |
+ FDMON_IO_URING_DELETE_AIO_HANDLER));
- QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
+ if ((flags & FDMON_IO_URING_REMOVE) ||
+ (flags & FDMON_IO_URING_DELETE_AIO_HANDLER)) {
+ QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers,
+ node, node_deleted);
}
- g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
- ctx->io_uring_fd_tag = NULL;
-
- qemu_lockcnt_lock(&ctx->list_lock);
- fdmon_poll_downgrade(ctx);
- qemu_lockcnt_unlock(&ctx->list_lock);
+ QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
}
+
+ g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
+ ctx->io_uring_fd_tag = NULL;
+
+ assert(QSIMPLEQ_EMPTY(&ctx->cqe_handler_ready_list));
+ qemu_bh_delete(ctx->cqe_handler_bh);
+
+ qemu_lockcnt_lock(&ctx->list_lock);
+ fdmon_poll_downgrade(ctx);
+ qemu_lockcnt_unlock(&ctx->list_lock);
}
--
2.49.0
Thread overview: 19+ messages
2025-06-20 0:08 [PATCH v2 00/12] aio: add the aio_add_sqe() io_uring API Stefan Hajnoczi
2025-06-20 0:08 ` [PATCH v2 01/12] aio-posix: fix race between io_uring CQE and AioHandler deletion Stefan Hajnoczi
2025-06-23 20:25 ` Eric Blake
2025-07-02 12:10 ` Kevin Wolf
2025-07-21 18:14 ` Stefan Hajnoczi
2025-07-21 20:47 ` Stefan Hajnoczi
2025-06-20 0:08 ` [PATCH v2 02/12] aio-posix: keep polling enabled with fdmon-io_uring.c Stefan Hajnoczi
2025-06-20 0:08 ` [PATCH v2 03/12] tests/unit: skip test-nested-aio-poll with io_uring Stefan Hajnoczi
2025-06-20 0:08 ` [PATCH v2 04/12] aio-posix: integrate fdmon into glib event loop Stefan Hajnoczi
2025-06-20 0:08 ` [PATCH v2 05/12] aio: remove aio_context_use_g_source() Stefan Hajnoczi
2025-06-20 0:08 ` [PATCH v2 06/12] aio: free AioContext when aio_context_new() fails Stefan Hajnoczi
2025-06-20 0:08 ` [PATCH v2 07/12] aio: add errp argument to aio_context_setup() Stefan Hajnoczi
2025-06-20 0:08 ` [PATCH v2 08/12] aio-posix: gracefully handle io_uring_queue_init() failure Stefan Hajnoczi
2025-06-23 20:39 ` Eric Blake
2025-06-20 0:08 ` Stefan Hajnoczi [this message]
2025-06-20 0:08 ` [PATCH v2 10/12] aio-posix: avoid EventNotifier for cqe_handler_bh Stefan Hajnoczi
2025-06-20 0:08 ` [PATCH v2 11/12] block/io_uring: use aio_add_sqe() Stefan Hajnoczi
2025-06-20 0:08 ` [PATCH v2 12/12] block/io_uring: use non-vectored read/write when possible Stefan Hajnoczi
2025-06-23 20:40 ` Eric Blake