qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
From: Kevin Wolf <kwolf@redhat.com>
To: qemu-block@nongnu.org
Cc: kwolf@redhat.com, qemu-devel@nongnu.org
Subject: [PULL 13/27] aio-posix: add aio_add_sqe() API for user-defined io_uring requests
Date: Tue,  4 Nov 2025 18:54:01 +0100	[thread overview]
Message-ID: <20251104175415.525388-14-kwolf@redhat.com> (raw)
In-Reply-To: <20251104175415.525388-1-kwolf@redhat.com>

From: Stefan Hajnoczi <stefanha@redhat.com>

Introduce the aio_add_sqe() API for submitting io_uring requests in the
current AioContext. This allows other components in QEMU, like the block
layer, to take advantage of io_uring features without creating their own
io_uring context.

This API supports nested event loops just like file descriptor
monitoring and BHs do. This comes at a complexity cost: CQE callbacks
must be placed on a list so that nested event loops can invoke pending
CQE callbacks from parent event loops. If you're wondering why
CqeHandler exists instead of just a callback function pointer, this is
why.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
Message-ID: <20251104022933.618123-14-stefanha@redhat.com>
Reviewed-by: Kevin Wolf <kwolf@redhat.com>
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
 include/block/aio.h   |  83 +++++++++++++++++++++++++++++++-
 util/aio-posix.h      |   1 +
 util/aio-posix.c      |   9 ++++
 util/fdmon-io_uring.c | 109 ++++++++++++++++++++++++++++++++++++------
 util/trace-events     |   4 ++
 5 files changed, 190 insertions(+), 16 deletions(-)

diff --git a/include/block/aio.h b/include/block/aio.h
index b266daa58f..05d1bf4036 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -61,6 +61,27 @@ typedef struct LuringState LuringState;
 /* Is polling disabled? */
 bool aio_poll_disabled(AioContext *ctx);
 
+#ifdef CONFIG_LINUX_IO_URING
+/*
+ * Each io_uring request must have a unique CqeHandler that processes the cqe.
+ * The lifetime of a CqeHandler must be at least from aio_add_sqe() until
+ * ->cb() invocation.
+ */
+typedef struct CqeHandler CqeHandler;
+struct CqeHandler {
+    /* Called by the AioContext when the request has completed */
+    void (*cb)(CqeHandler *handler);
+
+    /* Linkage for the AioContext's ready list; internal, do not access this */
+    QSIMPLEQ_ENTRY(CqeHandler) next;
+
+    /* Copy of the completed cqe; it is filled in before ->cb() is called */
+    struct io_uring_cqe cqe;
+};
+
+typedef QSIMPLEQ_HEAD(, CqeHandler) CqeHandlerSimpleQ;
+#endif /* CONFIG_LINUX_IO_URING */
+
 /* Callbacks for file descriptor monitoring implementations */
 typedef struct {
     /*
@@ -157,6 +178,27 @@ typedef struct {
      * Called with list_lock incremented.
      */
     void (*gsource_dispatch)(AioContext *ctx, AioHandlerList *ready_list);
+
+#ifdef CONFIG_LINUX_IO_URING
+    /**
+     * add_sqe: Add an io_uring sqe for submission.
+     * @prep_sqe: invoked with an sqe that should be prepared for submission
+     * @opaque: user-defined argument to @prep_sqe()
+     * @cqe_handler: the unique cqe handler associated with this request
+     *
+     * The caller's @prep_sqe() function is invoked to fill in the details of
+     * the sqe. Do not call io_uring_sqe_set_data() on this sqe.
+     *
+     * The kernel may see the sqe as soon as @prep_sqe() returns or it may take
+     * until the next event loop iteration.
+     *
+     * This function is called from the current AioContext and is not
+     * thread-safe.
+     */
+    void (*add_sqe)(AioContext *ctx,
+                    void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+                    void *opaque, CqeHandler *cqe_handler);
+#endif /* CONFIG_LINUX_IO_URING */
 } FDMonOps;
 
 /*
@@ -274,7 +316,10 @@ struct AioContext {
     struct io_uring fdmon_io_uring;
     AioHandlerSList submit_list;
     gpointer io_uring_fd_tag;
-#endif
+
+    /* Pending callback state for cqe handlers */
+    CqeHandlerSimpleQ cqe_handler_ready_list;
+#endif /* CONFIG_LINUX_IO_URING */
 
     /* TimerLists for calling timers - one per clock type.  Has its own
      * locking.
@@ -782,4 +827,40 @@ void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch);
  */
 void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
                                         int64_t max, Error **errp);
+
+#ifdef CONFIG_LINUX_IO_URING
+/**
+ * aio_has_io_uring: Return whether io_uring is available.
+ *
+ * io_uring is either available in all AioContexts or in none, so this only
+ * needs to be called once from within any thread's AioContext.
+ */
+static inline bool aio_has_io_uring(void)
+{
+    AioContext *ctx = qemu_get_current_aio_context();
+    return ctx->fdmon_ops->add_sqe;
+}
+
+/**
+ * aio_add_sqe: Add an io_uring sqe for submission.
+ * @prep_sqe: invoked with an sqe that should be prepared for submission
+ * @opaque: user-defined argument to @prep_sqe()
+ * @cqe_handler: the unique cqe handler associated with this request
+ *
+ * The caller's @prep_sqe() function is invoked to fill in the details of the
+ * sqe. Do not call io_uring_sqe_set_data() on this sqe.
+ *
+ * The sqe is submitted by the current AioContext. The kernel may see the sqe
+ * as soon as @prep_sqe() returns or it may take until the next event loop
+ * iteration.
+ *
+ * When the AioContext is destroyed, pending sqes are ignored and their
+ * CqeHandlers are not invoked.
+ *
+ * This function must be called only when aio_has_io_uring() returns true.
+ */
+void aio_add_sqe(void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+                 void *opaque, CqeHandler *cqe_handler);
+#endif /* CONFIG_LINUX_IO_URING */
+
 #endif
diff --git a/util/aio-posix.h b/util/aio-posix.h
index dfa1a51c0b..babbfa8314 100644
--- a/util/aio-posix.h
+++ b/util/aio-posix.h
@@ -36,6 +36,7 @@ struct AioHandler {
 #ifdef CONFIG_LINUX_IO_URING
     QSLIST_ENTRY(AioHandler) node_submitted;
     unsigned flags; /* see fdmon-io_uring.c */
+    CqeHandler internal_cqe_handler; /* used for POLL_ADD/POLL_REMOVE */
 #endif
     int64_t poll_idle_timeout; /* when to stop userspace polling */
     bool poll_ready; /* has polling detected an event? */
diff --git a/util/aio-posix.c b/util/aio-posix.c
index 6ff36b6e51..e24b955fd9 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -806,3 +806,12 @@ void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch)
 
     aio_notify(ctx);
 }
+
+#ifdef CONFIG_LINUX_IO_URING
+void aio_add_sqe(void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+                 void *opaque, CqeHandler *cqe_handler)
+{
+    AioContext *ctx = qemu_get_current_aio_context();
+    ctx->fdmon_ops->add_sqe(ctx, prep_sqe, opaque, cqe_handler);
+}
+#endif /* CONFIG_LINUX_IO_URING */
diff --git a/util/fdmon-io_uring.c b/util/fdmon-io_uring.c
index a06bbe2715..4230bf33e3 100644
--- a/util/fdmon-io_uring.c
+++ b/util/fdmon-io_uring.c
@@ -46,8 +46,10 @@
 #include "qemu/osdep.h"
 #include <poll.h>
 #include "qapi/error.h"
+#include "qemu/defer-call.h"
 #include "qemu/rcu_queue.h"
 #include "aio-posix.h"
+#include "trace.h"
 
 enum {
     FDMON_IO_URING_ENTRIES  = 128, /* sq/cq ring size */
@@ -76,8 +78,8 @@ static inline int pfd_events_from_poll(int poll_events)
 }
 
 /*
- * Returns an sqe for submitting a request.  Only be called within
- * fdmon_io_uring_wait().
+ * Returns an sqe for submitting a request. Only called from the AioContext
+ * thread.
  */
 static struct io_uring_sqe *get_sqe(AioContext *ctx)
 {
@@ -168,23 +170,46 @@ static void fdmon_io_uring_update(AioContext *ctx,
     }
 }
 
+static void fdmon_io_uring_add_sqe(AioContext *ctx,
+        void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+        void *opaque, CqeHandler *cqe_handler)
+{
+    struct io_uring_sqe *sqe = get_sqe(ctx);
+
+    prep_sqe(sqe, opaque);
+    io_uring_sqe_set_data(sqe, cqe_handler);
+
+    trace_fdmon_io_uring_add_sqe(ctx, opaque, sqe->opcode, sqe->fd, sqe->off,
+                                 cqe_handler);
+}
+
+static void fdmon_special_cqe_handler(CqeHandler *cqe_handler)
+{
+    /*
+     * This is an empty function that is never called. It is used as a function
+     * pointer to distinguish it from ordinary cqe handlers.
+     */
+}
+
 static void add_poll_add_sqe(AioContext *ctx, AioHandler *node)
 {
     struct io_uring_sqe *sqe = get_sqe(ctx);
     int events = poll_events_from_pfd(node->pfd.events);
 
     io_uring_prep_poll_add(sqe, node->pfd.fd, events);
-    io_uring_sqe_set_data(sqe, node);
+    node->internal_cqe_handler.cb = fdmon_special_cqe_handler;
+    io_uring_sqe_set_data(sqe, &node->internal_cqe_handler);
 }
 
 static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
 {
     struct io_uring_sqe *sqe = get_sqe(ctx);
+    CqeHandler *cqe_handler = &node->internal_cqe_handler;
 
 #ifdef LIBURING_HAVE_DATA64
-    io_uring_prep_poll_remove(sqe, (uintptr_t)node);
+    io_uring_prep_poll_remove(sqe, (uintptr_t)cqe_handler);
 #else
-    io_uring_prep_poll_remove(sqe, node);
+    io_uring_prep_poll_remove(sqe, cqe_handler);
 #endif
     io_uring_sqe_set_data(sqe, NULL);
 }
@@ -219,19 +244,13 @@ static void fill_sq_ring(AioContext *ctx)
     }
 }
 
-/* Returns true if a handler became ready */
-static bool process_cqe(AioContext *ctx,
-                        AioHandlerList *ready_list,
-                        struct io_uring_cqe *cqe)
+static bool process_cqe_aio_handler(AioContext *ctx,
+                                    AioHandlerList *ready_list,
+                                    AioHandler *node,
+                                    struct io_uring_cqe *cqe)
 {
-    AioHandler *node = io_uring_cqe_get_data(cqe);
     unsigned flags;
 
-    /* poll_timeout and poll_remove have a zero user_data field */
-    if (!node) {
-        return false;
-    }
-
     /*
      * Deletion can only happen when IORING_OP_POLL_ADD completes.  If we race
      * with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE
@@ -255,6 +274,35 @@ static bool process_cqe(AioContext *ctx,
     return true;
 }
 
+/* Returns true if an AioHandler became ready; CqeHandlers are only queued */
+static bool process_cqe(AioContext *ctx,
+                        AioHandlerList *ready_list,
+                        struct io_uring_cqe *cqe)
+{
+    CqeHandler *cqe_handler = io_uring_cqe_get_data(cqe);
+
+    /* poll_timeout and poll_remove have a zero user_data field */
+    if (!cqe_handler) {
+        return false;
+    }
+
+    /*
+     * Special handling for AioHandler cqes. They need ready_list and have a
+     * return value.
+     */
+    if (cqe_handler->cb == fdmon_special_cqe_handler) {
+        AioHandler *node = container_of(cqe_handler, AioHandler,
+                                        internal_cqe_handler);
+        return process_cqe_aio_handler(ctx, ready_list, node, cqe);
+    }
+
+    cqe_handler->cqe = *cqe;
+
+    /* Handlers are invoked later by fdmon_io_uring_dispatch() */
+    QSIMPLEQ_INSERT_TAIL(&ctx->cqe_handler_ready_list, cqe_handler, next);
+    return false;
+}
+
 static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
 {
     struct io_uring *ring = &ctx->fdmon_io_uring;
@@ -297,6 +345,32 @@ static bool fdmon_io_uring_gsource_check(AioContext *ctx)
     return g_source_query_unix_fd(&ctx->source, tag) & G_IO_IN;
 }
 
+/* Dispatch ready CQE handlers. Returns true if any handler was invoked. */
+static bool fdmon_io_uring_dispatch(AioContext *ctx)
+{
+    CqeHandlerSimpleQ *ready_list = &ctx->cqe_handler_ready_list;
+    bool progress = false;
+
+    /* Handlers may use defer_call() to coalesce frequent operations */
+    defer_call_begin();
+
+    while (!QSIMPLEQ_EMPTY(ready_list)) {
+        CqeHandler *cqe_handler = QSIMPLEQ_FIRST(ready_list);
+
+        QSIMPLEQ_REMOVE_HEAD(ready_list, next);
+
+        trace_fdmon_io_uring_cqe_handler(ctx, cqe_handler,
+                                         cqe_handler->cqe.res);
+        cqe_handler->cb(cqe_handler);
+        progress = true;
+    }
+
+    defer_call_end();
+
+    return progress;
+}
+
+
 /* This is where CQEs are processed in the glib event loop */
 static void fdmon_io_uring_gsource_dispatch(AioContext *ctx,
                                             AioHandlerList *ready_list)
@@ -369,9 +443,11 @@ static const FDMonOps fdmon_io_uring_ops = {
     .update = fdmon_io_uring_update,
     .wait = fdmon_io_uring_wait,
     .need_wait = fdmon_io_uring_need_wait,
+    .dispatch = fdmon_io_uring_dispatch,
     .gsource_prepare = fdmon_io_uring_gsource_prepare,
     .gsource_check = fdmon_io_uring_gsource_check,
     .gsource_dispatch = fdmon_io_uring_gsource_dispatch,
+    .add_sqe = fdmon_io_uring_add_sqe,
 };
 
 bool fdmon_io_uring_setup(AioContext *ctx, Error **errp)
@@ -387,6 +463,7 @@ bool fdmon_io_uring_setup(AioContext *ctx, Error **errp)
     }
 
     QSLIST_INIT(&ctx->submit_list);
+    QSIMPLEQ_INIT(&ctx->cqe_handler_ready_list);
     ctx->fdmon_ops = &fdmon_io_uring_ops;
     ctx->io_uring_fd_tag = g_source_add_unix_fd(&ctx->source,
             ctx->fdmon_io_uring.ring_fd, G_IO_IN);
@@ -423,6 +500,8 @@ void fdmon_io_uring_destroy(AioContext *ctx)
     g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
     ctx->io_uring_fd_tag = NULL;
 
+    assert(QSIMPLEQ_EMPTY(&ctx->cqe_handler_ready_list));
+
     qemu_lockcnt_lock(&ctx->list_lock);
     fdmon_poll_downgrade(ctx);
     qemu_lockcnt_unlock(&ctx->list_lock);
diff --git a/util/trace-events b/util/trace-events
index bd8f25fb59..540d662507 100644
--- a/util/trace-events
+++ b/util/trace-events
@@ -24,6 +24,10 @@ buffer_move_empty(const char *buf, size_t len, const char *from) "%s: %zd bytes
 buffer_move(const char *buf, size_t len, const char *from) "%s: %zd bytes from %s"
 buffer_free(const char *buf, size_t len) "%s: capacity %zd"
 
+# fdmon-io_uring.c
+fdmon_io_uring_add_sqe(void *ctx, void *opaque, int opcode, int fd, uint64_t off, void *cqe_handler) "ctx %p opaque %p opcode %d fd %d off %"PRIu64" cqe_handler %p"
+fdmon_io_uring_cqe_handler(void *ctx, void *cqe_handler, int cqe_res) "ctx %p cqe_handler %p cqe_res %d"
+
 # filemonitor-inotify.c
 qemu_file_monitor_add_watch(void *mon, const char *dirpath, const char *filename, void *cb, void *opaque, int64_t id) "File monitor %p add watch dir='%s' file='%s' cb=%p opaque=%p id=%" PRId64
 qemu_file_monitor_remove_watch(void *mon, const char *dirpath, int64_t id) "File monitor %p remove watch dir='%s' id=%" PRId64
-- 
2.51.1



  parent reply	other threads:[~2025-11-04 17:59 UTC|newest]

Thread overview: 30+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2025-11-04 17:53 [PULL 00/27] Block layer patches Kevin Wolf
2025-11-04 17:53 ` [PULL 01/27] aio-posix: fix race between io_uring CQE and AioHandler deletion Kevin Wolf
2025-11-04 17:53 ` [PULL 02/27] aio-posix: fix fdmon-io_uring.c timeout stack variable lifetime Kevin Wolf
2025-11-04 17:53 ` [PULL 03/27] aio-posix: fix spurious return from ->wait() due to signals Kevin Wolf
2025-11-04 17:53 ` [PULL 04/27] aio-posix: keep polling enabled with fdmon-io_uring.c Kevin Wolf
2025-11-04 17:53 ` [PULL 05/27] tests/unit: skip test-nested-aio-poll with io_uring Kevin Wolf
2025-11-04 17:53 ` [PULL 06/27] aio-posix: integrate fdmon into glib event loop Kevin Wolf
2025-11-05 15:06   ` Richard Henderson
2025-11-11 21:40     ` Kevin Wolf
2025-11-04 17:53 ` [PULL 07/27] aio: remove aio_context_use_g_source() Kevin Wolf
2025-11-04 17:53 ` [PULL 08/27] aio: free AioContext when aio_context_new() fails Kevin Wolf
2025-11-04 17:53 ` [PULL 09/27] aio: add errp argument to aio_context_setup() Kevin Wolf
2025-11-04 17:53 ` [PULL 10/27] aio-posix: gracefully handle io_uring_queue_init() failure Kevin Wolf
2025-11-04 17:53 ` [PULL 11/27] aio-posix: unindent fdmon_io_uring_destroy() Kevin Wolf
2025-11-04 17:54 ` [PULL 12/27] aio-posix: add fdmon_ops->dispatch() Kevin Wolf
2025-11-04 17:54 ` Kevin Wolf [this message]
2025-11-04 17:54 ` [PULL 14/27] block/io_uring: use aio_add_sqe() Kevin Wolf
2025-11-04 17:54 ` [PULL 15/27] block/io_uring: use non-vectored read/write when possible Kevin Wolf
2025-11-04 17:54 ` [PULL 16/27] block: replace TABs with space Kevin Wolf
2025-11-04 17:54 ` [PULL 17/27] block: Drop detach_subchain for bdrv_replace_node Kevin Wolf
2025-11-04 17:54 ` [PULL 18/27] iotests: Test resizing file node under raw with size/offset Kevin Wolf
2025-11-04 17:54 ` [PULL 19/27] qemu-img: Fix amend option parse error handling Kevin Wolf
2025-11-04 17:54 ` [PULL 20/27] iotests: Run iotests with sanitizers Kevin Wolf
2025-11-04 17:54 ` [PULL 21/27] qcow2: rename update_refcount_discard to queue_discard Kevin Wolf
2025-11-04 17:54 ` [PULL 22/27] qcow2: put discards in discard queue when discard-no-unref is enabled Kevin Wolf
2025-11-04 17:54 ` [PULL 23/27] tests/qemu-iotests/184: Fix skip message for qemu-img without throttle Kevin Wolf
2025-11-04 17:54 ` [PULL 24/27] tests/qemu-iotests: Improve the dry run list to speed up thorough testing Kevin Wolf
2025-11-04 17:54 ` [PULL 25/27] tests/qemu-iotest: Add more image formats to the " Kevin Wolf
2025-11-04 17:54 ` [PULL 26/27] block: Allow drivers to control protocol prefix at creation Kevin Wolf
2025-11-04 17:54 ` [PULL 27/27] qcow2, vmdk: Restrict creation with secondary file using protocol Kevin Wolf

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20251104175415.525388-14-kwolf@redhat.com \
    --to=kwolf@redhat.com \
    --cc=qemu-block@nongnu.org \
    --cc=qemu-devel@nongnu.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).