* [PATCH 0/2] io: follow coroutine AioContext in qio_channel_yield()
@ 2023-08-23 23:45 Stefan Hajnoczi
2023-08-23 23:45 ` [PATCH 1/2] io: check there are no qio_channel_yield() coroutines during ->finalize() Stefan Hajnoczi
2023-08-23 23:45 ` [PATCH 2/2] io: follow coroutine AioContext in qio_channel_yield() Stefan Hajnoczi
0 siblings, 2 replies; 11+ messages in thread
From: Stefan Hajnoczi @ 2023-08-23 23:45 UTC
To: qemu-devel
Cc: Eric Blake, kwolf, Fam Zheng, Juan Quintela,
Vladimir Sementsov-Ogievskiy, Daniel Berrange, Hanna Reitz,
Paolo Bonzini, qemu-block, Leonardo Bras, Coiby Xu, Peter Xu,
Stefan Hajnoczi
The ongoing QEMU multi-queue block layer effort makes it possible for multiple
threads to process I/O in parallel. The nbd block driver is not compatible with
the multi-queue block layer yet because QIOChannel cannot be used easily from
coroutines running in multiple threads. This series changes the QIOChannel API
to make that possible.
Stefan Hajnoczi (2):
io: check there are no qio_channel_yield() coroutines during
->finalize()
io: follow coroutine AioContext in qio_channel_yield()
include/io/channel.h | 34 ++++++++-
include/qemu/vhost-user-server.h | 1 +
block/nbd.c | 11 +--
io/channel-command.c | 13 +++-
io/channel-file.c | 18 ++++-
io/channel-null.c | 3 +-
io/channel-socket.c | 18 ++++-
io/channel-tls.c | 6 +-
io/channel.c | 124 ++++++++++++++++++++++---------
migration/channel-block.c | 3 +-
nbd/client.c | 2 +-
nbd/server.c | 14 +---
scsi/qemu-pr-helper.c | 4 +-
util/vhost-user-server.c | 27 +++++--
14 files changed, 195 insertions(+), 83 deletions(-)
--
2.41.0
* [PATCH 1/2] io: check there are no qio_channel_yield() coroutines during ->finalize()
2023-08-23 23:45 [PATCH 0/2] io: follow coroutine AioContext in qio_channel_yield() Stefan Hajnoczi
@ 2023-08-23 23:45 ` Stefan Hajnoczi
2023-08-24 11:01 ` Daniel P. Berrangé
2023-08-24 18:18 ` Eric Blake
2023-08-23 23:45 ` [PATCH 2/2] io: follow coroutine AioContext in qio_channel_yield() Stefan Hajnoczi
1 sibling, 2 replies; 11+ messages in thread
From: Stefan Hajnoczi @ 2023-08-23 23:45 UTC
To: qemu-devel
Cc: Eric Blake, kwolf, Fam Zheng, Juan Quintela,
Vladimir Sementsov-Ogievskiy, Daniel Berrange, Hanna Reitz,
Paolo Bonzini, qemu-block, Leonardo Bras, Coiby Xu, Peter Xu,
Stefan Hajnoczi
Callers must clean up their coroutines before calling
object_unref(OBJECT(ioc)) to prevent an fd handler leak. Add an
assertion to check this.
This patch is preparation for the fd handler changes that follow.
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
io/channel.c | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/io/channel.c b/io/channel.c
index 72f0066af5..c415f3fc88 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -653,6 +653,10 @@ static void qio_channel_finalize(Object *obj)
{
QIOChannel *ioc = QIO_CHANNEL(obj);
+ /* Must not have coroutines in qio_channel_yield() */
+ assert(!ioc->read_coroutine);
+ assert(!ioc->write_coroutine);
+
g_free(ioc->name);
#ifdef _WIN32
--
2.41.0
* [PATCH 2/2] io: follow coroutine AioContext in qio_channel_yield()
2023-08-23 23:45 [PATCH 0/2] io: follow coroutine AioContext in qio_channel_yield() Stefan Hajnoczi
2023-08-23 23:45 ` [PATCH 1/2] io: check there are no qio_channel_yield() coroutines during ->finalize() Stefan Hajnoczi
@ 2023-08-23 23:45 ` Stefan Hajnoczi
2023-08-24 11:26 ` Daniel P. Berrangé
2023-08-24 16:09 ` Fabiano Rosas
1 sibling, 2 replies; 11+ messages in thread
From: Stefan Hajnoczi @ 2023-08-23 23:45 UTC
To: qemu-devel
Cc: Eric Blake, kwolf, Fam Zheng, Juan Quintela,
Vladimir Sementsov-Ogievskiy, Daniel Berrange, Hanna Reitz,
Paolo Bonzini, qemu-block, Leonardo Bras, Coiby Xu, Peter Xu,
Stefan Hajnoczi
The ongoing QEMU multi-queue block layer effort makes it possible for multiple
threads to process I/O in parallel. The nbd block driver is not compatible with
the multi-queue block layer yet because QIOChannel cannot be used easily from
coroutines running in multiple threads. This series changes the QIOChannel API
to make that possible.
In the current API, calling qio_channel_attach_aio_context() sets the
AioContext where qio_channel_yield() installs an fd handler prior to yielding:
qio_channel_attach_aio_context(ioc, my_ctx);
...
qio_channel_yield(ioc, G_IO_IN); // my_ctx is used here
...
qio_channel_detach_aio_context(ioc);
This API design has limitations: reading and writing must be done in the same
AioContext and moving between AioContexts involves a cumbersome sequence of API
calls that is not suitable for doing on a per-request basis.
There is no fundamental reason why a QIOChannel needs to run within the
same AioContext every time qio_channel_yield() is called. QIOChannel
only uses the AioContext while inside qio_channel_yield(). The rest of
the time, QIOChannel is independent of any AioContext.
In the new API, qio_channel_yield() queries the AioContext from the current
coroutine using qemu_coroutine_get_aio_context(). There is no need to
explicitly attach/detach AioContexts anymore and
qio_channel_attach_aio_context() and qio_channel_detach_aio_context() are gone.
One coroutine can read from the QIOChannel while another coroutine writes from
a different AioContext.
This API change allows the nbd block driver to use QIOChannel from any thread.
It's important to keep in mind that the block driver already synchronizes
QIOChannel access and ensures that two coroutines never read simultaneously or
write simultaneously.
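To make the per-direction rule above concrete, here is a minimal, self-contained C model of the bookkeeping that qio_channel_set_fd_handlers() performs in the patch below. It is a sketch under stated assumptions, not QEMU code: Ctx, Chan, Plan and arm() are hypothetical stand-ins for AioContext, QIOChannel and the arguments passed to qio_channel_set_aio_fd_handler(), and waiting coroutines are reduced to boolean flags. It shows a reader and a writer parked in different contexts without touching each other's handler, while the opposite handler is re-armed only when both share one context.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for an AioContext; only pointer identity matters. */
typedef struct Ctx {
    const char *name;
} Ctx;

/* Hypothetical model of the per-direction state QIOChannel keeps. */
typedef struct Chan {
    Ctx *read_ctx;
    bool reader_waiting;   /* models ioc->read_coroutine != NULL */
    Ctx *write_ctx;
    bool writer_waiting;   /* models ioc->write_coroutine != NULL */
} Chan;

/* What one qio_channel_set_aio_fd_handler() call would be asked to do. */
typedef struct Plan {
    Ctx *read_ctx;   /* NULL: leave the read handler untouched */
    bool io_read;    /* true: install the restart-read handler */
    Ctx *write_ctx;  /* NULL: leave the write handler untouched */
    bool io_write;   /* true: install the restart-write handler */
} Plan;

/*
 * Model of arming handlers when a coroutine running in @ctx yields, waiting
 * for input (@want_read) or output. If the opposite direction is already
 * waiting in the same context, its handler is passed again so the shared
 * registration does not clobber it.
 */
static Plan arm(Chan *c, bool want_read, Ctx *ctx)
{
    Plan p = { NULL, false, NULL, false };

    if (want_read) {
        assert(!c->reader_waiting);   /* at most one reader at a time */
        c->reader_waiting = true;
        c->read_ctx = ctx;
        p.read_ctx = ctx;
        p.io_read = true;
        if (c->writer_waiting && c->write_ctx == ctx) {
            p.write_ctx = ctx;
            p.io_write = true;
        }
    } else {
        assert(!c->writer_waiting);   /* at most one writer at a time */
        c->writer_waiting = true;
        c->write_ctx = ctx;
        p.write_ctx = ctx;
        p.io_write = true;
        if (c->reader_waiting && c->read_ctx == ctx) {
            p.read_ctx = ctx;
            p.io_read = true;
        }
    }
    return p;
}
```

The asserts mirror the ones in qio_channel_yield(): like the real API, the model relies on the caller never having two readers or two writers waiting at once.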
This patch updates all users of qio_channel_attach_aio_context() to the
new API. Most conversions are simple, but vhost-user-server requires a
new qemu_coroutine_yield() call to quiesce the vu_client_trip()
coroutine when not attached to any AioContext.
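The vhost-user-server change can be modelled as a small state machine. This sketch uses hypothetical names (Server, WaitKind, on_would_block(), on_wake(), on_detach(), on_attach()) and replaces the real coroutine calls with return values; it only illustrates the in_qio_channel_yield handshake between vu_message_read() and the attach/detach functions in the patch below, and is not the server's actual code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the VuServer fields the patch touches. */
typedef struct Server {
    void *ctx;                 /* NULL while detached from any AioContext */
    bool in_qio_channel_yield; /* true while waiting for the next message */
} Server;

typedef enum {
    WAIT_ON_CHANNEL,   /* models qio_channel_yield(ioc, G_IO_IN) */
    WAIT_FOR_ATTACH,   /* models a bare qemu_coroutine_yield() */
} WaitKind;

/* What vu_message_read() does when a read would block. */
static WaitKind on_would_block(Server *s)
{
    if (s->ctx) {
        s->in_qio_channel_yield = true;
        return WAIT_ON_CHANNEL;
    }
    return WAIT_FOR_ATTACH;
}

/* Models the coroutine resuming from qio_channel_yield(). */
static void on_wake(Server *s)
{
    s->in_qio_channel_yield = false;
}

/* Returns true when detaching must call qio_channel_wake_read(). */
static bool on_detach(Server *s)
{
    s->ctx = NULL;
    return s->in_qio_channel_yield;
}

/* Models the assertion added to vhost_user_server_attach_aio_context(). */
static void on_attach(Server *s, void *ctx)
{
    assert(!s->in_qio_channel_yield);
    s->ctx = ctx;
}
```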
While the API has become simpler, there is one wart: QIOChannel has a
special case for the iohandler AioContext (used for handlers that must not run
in nested event loops). I didn't find an elegant way to preserve that behavior,
so I added a new API called qio_channel_set_follow_coroutine_ctx(ioc, true|false)
for opting in to the new AioContext model. By default QIOChannel uses the
iohandler AioContext. Code that formerly called
qio_channel_attach_aio_context() now calls
qio_channel_set_follow_coroutine_ctx(ioc, true) once after the QIOChannel is
created.
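A hedged sketch of the opt-in rule, again with hypothetical stand-in types (Ctx, Chan, set_follow(), yield_ctx()) rather than QEMU's API. In real code the two branches correspond to qemu_coroutine_get_aio_context(qemu_coroutine_self()) and iohandler_get_aio_context(), as in qio_channel_set_fd_handlers() in the patch below.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for an AioContext; only pointer identity matters. */
typedef struct Ctx {
    const char *name;
} Ctx;

/* Stand-in for the global context iohandler_get_aio_context() returns. */
static Ctx iohandler_ctx = { "iohandler" };

/* Hypothetical model: only the flag this patch adds to QIOChannel. */
typedef struct Chan {
    bool follow_coroutine_ctx;  /* false by default, as in the patch */
} Chan;

/* Models qio_channel_set_follow_coroutine_ctx(). */
static void set_follow(Chan *c, bool enabled)
{
    c->follow_coroutine_ctx = enabled;
}

/*
 * Models the context choice made when yielding: the yielding coroutine's
 * own context when following is enabled, otherwise the iohandler context,
 * which nested event loops do not poll.
 */
static Ctx *yield_ctx(const Chan *c, Ctx *coroutine_ctx)
{
    return c->follow_coroutine_ctx ? coroutine_ctx : &iohandler_ctx;
}
```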
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
include/io/channel.h | 34 +++++++--
include/qemu/vhost-user-server.h | 1 +
block/nbd.c | 11 +--
io/channel-command.c | 13 +++-
io/channel-file.c | 18 ++++-
io/channel-null.c | 3 +-
io/channel-socket.c | 18 ++++-
io/channel-tls.c | 6 +-
io/channel.c | 120 ++++++++++++++++++++++---------
migration/channel-block.c | 3 +-
nbd/client.c | 2 +-
nbd/server.c | 14 +---
scsi/qemu-pr-helper.c | 4 +-
util/vhost-user-server.c | 27 +++++--
14 files changed, 191 insertions(+), 83 deletions(-)
diff --git a/include/io/channel.h b/include/io/channel.h
index 229bf36910..dfbe6f2931 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -81,9 +81,11 @@ struct QIOChannel {
Object parent;
unsigned int features; /* bitmask of QIOChannelFeatures */
char *name;
- AioContext *ctx;
+ AioContext *read_ctx;
Coroutine *read_coroutine;
+ AioContext *write_ctx;
Coroutine *write_coroutine;
+ bool follow_coroutine_ctx;
#ifdef _WIN32
HANDLE event; /* For use with GSource on Win32 */
#endif
@@ -140,8 +142,9 @@ struct QIOChannelClass {
int whence,
Error **errp);
void (*io_set_aio_fd_handler)(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque);
int (*io_flush)(QIOChannel *ioc,
@@ -498,6 +501,21 @@ int qio_channel_set_blocking(QIOChannel *ioc,
bool enabled,
Error **errp);
+/**
+ * qio_channel_set_follow_coroutine_ctx:
+ * @ioc: the channel object
+ * @enabled: whether or not to follow the coroutine's AioContext
+ *
+ * If @enabled is true, calls to qio_channel_yield() use the current
+ * coroutine's AioContext. Usually this is desirable.
+ *
+ * If @enabled is false, calls to qio_channel_yield() use the global iohandler
+ * AioContext. This may be used by coroutines that run in the main loop and
+ * do not wish to respond to I/O during nested event loops. This is the
+ * default for compatibility with code that is not aware of AioContexts.
+ */
+void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled);
+
/**
* qio_channel_close:
* @ioc: the channel object
@@ -785,8 +803,9 @@ void qio_channel_wait(QIOChannel *ioc,
/**
* qio_channel_set_aio_fd_handler:
* @ioc: the channel object
- * @ctx: the AioContext to set the handlers on
+ * @read_ctx: the AioContext to set the read handler on or NULL
* @io_read: the read handler
+ * @write_ctx: the AioContext to set the write handler on or NULL
* @io_write: the write handler
* @opaque: the opaque value passed to the handler
*
@@ -794,10 +813,17 @@ void qio_channel_wait(QIOChannel *ioc,
* be used by channel implementations to forward the handlers
* to another channel (e.g. from #QIOChannelTLS to the
* underlying socket).
+ *
+ * When @read_ctx is NULL, don't touch the read handler. When @write_ctx is
+ * NULL, don't touch the write handler. Note that setting the read handler
+ * clears the write handler, and vice versa, if they share the same AioContext.
+ * Therefore the caller must pass both handlers together when sharing the same
+ * AioContext.
*/
void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque);
diff --git a/include/qemu/vhost-user-server.h b/include/qemu/vhost-user-server.h
index b1c1cda886..64ad701015 100644
--- a/include/qemu/vhost-user-server.h
+++ b/include/qemu/vhost-user-server.h
@@ -43,6 +43,7 @@ typedef struct {
unsigned int in_flight; /* atomic */
/* Protected by ctx lock */
+ bool in_qio_channel_yield;
bool wait_idle;
VuDev vu_dev;
QIOChannel *ioc; /* The I/O channel with the client */
diff --git a/block/nbd.c b/block/nbd.c
index 5322e66166..cc48580df7 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -352,7 +352,7 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
}
qio_channel_set_blocking(s->ioc, false, NULL);
- qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));
+ qio_channel_set_follow_coroutine_ctx(s->ioc, true);
/* successfully connected */
WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
@@ -397,7 +397,6 @@ static void coroutine_fn GRAPH_RDLOCK nbd_reconnect_attempt(BDRVNBDState *s)
/* Finalize previous connection if any */
if (s->ioc) {
- qio_channel_detach_aio_context(s->ioc);
yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
nbd_yank, s->bs);
object_unref(OBJECT(s->ioc));
@@ -2089,10 +2088,6 @@ static void nbd_attach_aio_context(BlockDriverState *bs,
* the reconnect_delay_timer cannot be active here.
*/
assert(!s->reconnect_delay_timer);
-
- if (s->ioc) {
- qio_channel_attach_aio_context(s->ioc, new_context);
- }
}
static void nbd_detach_aio_context(BlockDriverState *bs)
@@ -2101,10 +2096,6 @@ static void nbd_detach_aio_context(BlockDriverState *bs)
assert(!s->open_timer);
assert(!s->reconnect_delay_timer);
-
- if (s->ioc) {
- qio_channel_detach_aio_context(s->ioc);
- }
}
static BlockDriver bdrv_nbd = {
diff --git a/io/channel-command.c b/io/channel-command.c
index 7ed726c802..1f61026222 100644
--- a/io/channel-command.c
+++ b/io/channel-command.c
@@ -331,14 +331,21 @@ static int qio_channel_command_close(QIOChannel *ioc,
static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
- aio_set_fd_handler(ctx, cioc->readfd, io_read, NULL, NULL, NULL, opaque);
- aio_set_fd_handler(ctx, cioc->writefd, NULL, io_write, NULL, NULL, opaque);
+ if (read_ctx) {
+ aio_set_fd_handler(read_ctx, cioc->readfd, io_read, NULL,
+ NULL, NULL, opaque);
+ }
+ if (write_ctx) {
+ aio_set_fd_handler(write_ctx, cioc->writefd, NULL, io_write,
+ NULL, NULL, opaque);
+ }
}
diff --git a/io/channel-file.c b/io/channel-file.c
index 8b5821f452..e6c6329dbb 100644
--- a/io/channel-file.c
+++ b/io/channel-file.c
@@ -192,13 +192,27 @@ static int qio_channel_file_close(QIOChannel *ioc,
static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
- aio_set_fd_handler(ctx, fioc->fd, io_read, io_write, NULL, NULL, opaque);
+
+ if (read_ctx == write_ctx) {
+ aio_set_fd_handler(read_ctx, fioc->fd, io_read, io_write,
+ NULL, NULL, opaque);
+ } else {
+ if (read_ctx) {
+ aio_set_fd_handler(read_ctx, fioc->fd, io_read, NULL,
+ NULL, NULL, opaque);
+ }
+ if (write_ctx) {
+ aio_set_fd_handler(write_ctx, fioc->fd, NULL, io_write,
+ NULL, NULL, opaque);
+ }
+ }
}
static GSource *qio_channel_file_create_watch(QIOChannel *ioc,
diff --git a/io/channel-null.c b/io/channel-null.c
index 4fafdb770d..ef99586348 100644
--- a/io/channel-null.c
+++ b/io/channel-null.c
@@ -128,8 +128,9 @@ qio_channel_null_close(QIOChannel *ioc,
static void
qio_channel_null_set_aio_fd_handler(QIOChannel *ioc G_GNUC_UNUSED,
- AioContext *ctx G_GNUC_UNUSED,
+ AioContext *read_ctx G_GNUC_UNUSED,
IOHandler *io_read G_GNUC_UNUSED,
+ AioContext *write_ctx G_GNUC_UNUSED,
IOHandler *io_write G_GNUC_UNUSED,
void *opaque G_GNUC_UNUSED)
{
diff --git a/io/channel-socket.c b/io/channel-socket.c
index d99945ebec..daeb92bbe0 100644
--- a/io/channel-socket.c
+++ b/io/channel-socket.c
@@ -893,13 +893,27 @@ qio_channel_socket_shutdown(QIOChannel *ioc,
}
static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
- aio_set_fd_handler(ctx, sioc->fd, io_read, io_write, NULL, NULL, opaque);
+
+ if (read_ctx == write_ctx) {
+ aio_set_fd_handler(read_ctx, sioc->fd, io_read, io_write,
+ NULL, NULL, opaque);
+ } else {
+ if (read_ctx) {
+ aio_set_fd_handler(read_ctx, sioc->fd, io_read, NULL,
+ NULL, NULL, opaque);
+ }
+ if (write_ctx) {
+ aio_set_fd_handler(write_ctx, sioc->fd, NULL, io_write,
+ NULL, NULL, opaque);
+ }
+ }
}
static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
diff --git a/io/channel-tls.c b/io/channel-tls.c
index 847d5297c3..58fe1aceee 100644
--- a/io/channel-tls.c
+++ b/io/channel-tls.c
@@ -388,14 +388,16 @@ static int qio_channel_tls_close(QIOChannel *ioc,
}
static void qio_channel_tls_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
- qio_channel_set_aio_fd_handler(tioc->master, ctx, io_read, io_write, opaque);
+ qio_channel_set_aio_fd_handler(tioc->master, read_ctx, io_read,
+ write_ctx, io_write, opaque);
}
typedef struct QIOChannelTLSSource QIOChannelTLSSource;
diff --git a/io/channel.c b/io/channel.c
index c415f3fc88..b190d593d3 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -365,6 +365,12 @@ int qio_channel_set_blocking(QIOChannel *ioc,
}
+void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled)
+{
+ ioc->follow_coroutine_ctx = enabled;
+}
+
+
int qio_channel_close(QIOChannel *ioc,
Error **errp)
{
@@ -388,14 +394,16 @@ GSource *qio_channel_create_watch(QIOChannel *ioc,
void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
- klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
+ klass->io_set_aio_fd_handler(ioc, read_ctx, io_read, write_ctx, io_write,
+ opaque);
}
guint qio_channel_add_watch_full(QIOChannel *ioc,
@@ -542,56 +550,101 @@ static void qio_channel_restart_write(void *opaque)
aio_co_wake(co);
}
-static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
+static void coroutine_fn
+qio_channel_set_fd_handlers(QIOChannel *ioc, GIOCondition condition)
{
- IOHandler *rd_handler = NULL, *wr_handler = NULL;
+ AioContext *ctx = ioc->follow_coroutine_ctx ?
+ qemu_coroutine_get_aio_context(qemu_coroutine_self()) :
+ iohandler_get_aio_context();
+ AioContext *read_ctx = NULL;
+ IOHandler *io_read = NULL;
+ AioContext *write_ctx = NULL;
+ IOHandler *io_write = NULL;
+
+ if (condition == G_IO_IN) {
+ ioc->read_coroutine = qemu_coroutine_self();
+ ioc->read_ctx = ctx;
+ read_ctx = ctx;
+ io_read = qio_channel_restart_read;
+
+ /*
+ * Thread safety: if the other coroutine is set and its AioContext
+ * matches ours, then there is mutual exclusion between read and write
+ * because they share a single thread and it's safe to set both read
+ * and write fd handlers here. If the AioContext does not match ours,
+ * then both threads may run in parallel but there is no shared state
+ * to worry about.
+ */
+ if (ioc->write_coroutine && ioc->write_ctx == ctx) {
+ write_ctx = ctx;
+ io_write = qio_channel_restart_write;
+ }
+ } else if (condition == G_IO_OUT) {
+ ioc->write_coroutine = qemu_coroutine_self();
+ ioc->write_ctx = ctx;
+ write_ctx = ctx;
+ io_write = qio_channel_restart_write;
+ if (ioc->read_coroutine && ioc->read_ctx == ctx) {
+ read_ctx = ctx;
+ io_read = qio_channel_restart_read;
+ }
+ } else {
+ abort();
+ }
+
+ qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
+ write_ctx, io_write, ioc);
+}
+
+static void coroutine_fn
+qio_channel_clear_fd_handlers(QIOChannel *ioc, GIOCondition condition)
+{
+ AioContext *read_ctx = NULL;
+ IOHandler *io_read = NULL;
+ AioContext *write_ctx = NULL;
+ IOHandler *io_write = NULL;
AioContext *ctx;
- if (ioc->read_coroutine) {
- rd_handler = qio_channel_restart_read;
+ if (condition == G_IO_IN) {
+ ctx = ioc->read_ctx;
+ read_ctx = ctx;
+ io_read = NULL;
+ if (ioc->write_coroutine && ioc->write_ctx == ctx) {
+ write_ctx = ctx;
+ io_write = qio_channel_restart_write;
+ }
+ } else if (condition == G_IO_OUT) {
+ ctx = ioc->write_ctx;
+ write_ctx = ctx;
+ io_write = NULL;
+ if (ioc->read_coroutine && ioc->read_ctx == ctx) {
+ read_ctx = ctx;
+ io_read = qio_channel_restart_read;
+ }
+ } else {
+ abort();
}
- if (ioc->write_coroutine) {
- wr_handler = qio_channel_restart_write;
- }
-
- ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
- qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
-}
-
-void qio_channel_attach_aio_context(QIOChannel *ioc,
- AioContext *ctx)
-{
- assert(!ioc->read_coroutine);
- assert(!ioc->write_coroutine);
- ioc->ctx = ctx;
-}
-void qio_channel_detach_aio_context(QIOChannel *ioc)
-{
- ioc->read_coroutine = NULL;
- ioc->write_coroutine = NULL;
- qio_channel_set_aio_fd_handlers(ioc);
- ioc->ctx = NULL;
+ qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
+ write_ctx, io_write, ioc);
}
void coroutine_fn qio_channel_yield(QIOChannel *ioc,
GIOCondition condition)
{
- AioContext *ioc_ctx = ioc->ctx ?: qemu_get_aio_context();
+ AioContext *ioc_ctx;
assert(qemu_in_coroutine());
- assert(in_aio_context_home_thread(ioc_ctx));
+ ioc_ctx = qemu_coroutine_get_aio_context(qemu_coroutine_self());
if (condition == G_IO_IN) {
assert(!ioc->read_coroutine);
- ioc->read_coroutine = qemu_coroutine_self();
} else if (condition == G_IO_OUT) {
assert(!ioc->write_coroutine);
- ioc->write_coroutine = qemu_coroutine_self();
} else {
abort();
}
- qio_channel_set_aio_fd_handlers(ioc);
+ qio_channel_set_fd_handlers(ioc, condition);
qemu_coroutine_yield();
assert(in_aio_context_home_thread(ioc_ctx));
@@ -599,11 +652,10 @@ void coroutine_fn qio_channel_yield(QIOChannel *ioc,
* through the aio_fd_handlers. */
if (condition == G_IO_IN) {
assert(ioc->read_coroutine == NULL);
- qio_channel_set_aio_fd_handlers(ioc);
} else if (condition == G_IO_OUT) {
assert(ioc->write_coroutine == NULL);
- qio_channel_set_aio_fd_handlers(ioc);
}
+ qio_channel_clear_fd_handlers(ioc, condition);
}
void qio_channel_wake_read(QIOChannel *ioc)
diff --git a/migration/channel-block.c b/migration/channel-block.c
index b7374363c3..fff8d87094 100644
--- a/migration/channel-block.c
+++ b/migration/channel-block.c
@@ -158,8 +158,9 @@ qio_channel_block_close(QIOChannel *ioc,
static void
qio_channel_block_set_aio_fd_handler(QIOChannel *ioc,
- AioContext *ctx,
+ AioContext *read_ctx,
IOHandler *io_read,
+ AioContext *write_ctx,
IOHandler *io_write,
void *opaque)
{
diff --git a/nbd/client.c b/nbd/client.c
index 479208d5d9..81877d088d 100644
--- a/nbd/client.c
+++ b/nbd/client.c
@@ -948,7 +948,7 @@ static int nbd_start_negotiate(AioContext *aio_context, QIOChannel *ioc,
ioc = *outioc;
if (aio_context) {
qio_channel_set_blocking(ioc, false, NULL);
- qio_channel_attach_aio_context(ioc, aio_context);
+ qio_channel_set_follow_coroutine_ctx(ioc, true);
}
} else {
error_setg(errp, "Server does not support STARTTLS");
diff --git a/nbd/server.c b/nbd/server.c
index 8486b64b15..b5f93a20c9 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -1333,6 +1333,7 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
*/
qio_channel_set_blocking(client->ioc, false, NULL);
+ qio_channel_set_follow_coroutine_ctx(client->ioc, true);
trace_nbd_negotiate_begin();
memcpy(buf, "NBDMAGIC", 8);
@@ -1352,11 +1353,6 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
return ret;
}
- /* Attach the channel to the same AioContext as the export */
- if (client->exp && client->exp->common.ctx) {
- qio_channel_attach_aio_context(client->ioc, client->exp->common.ctx);
- }
-
assert(!client->optlen);
trace_nbd_negotiate_success();
@@ -1465,7 +1461,6 @@ void nbd_client_put(NBDClient *client)
*/
assert(client->closing);
- qio_channel_detach_aio_context(client->ioc);
object_unref(OBJECT(client->sioc));
object_unref(OBJECT(client->ioc));
if (client->tlscreds) {
@@ -1544,8 +1539,6 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
exp->common.ctx = ctx;
QTAILQ_FOREACH(client, &exp->clients, next) {
- qio_channel_attach_aio_context(client->ioc, ctx);
-
assert(client->nb_requests == 0);
assert(client->recv_coroutine == NULL);
assert(client->send_coroutine == NULL);
@@ -1555,14 +1548,9 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
static void blk_aio_detach(void *opaque)
{
NBDExport *exp = opaque;
- NBDClient *client;
trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
- QTAILQ_FOREACH(client, &exp->clients, next) {
- qio_channel_detach_aio_context(client->ioc);
- }
-
exp->common.ctx = NULL;
}
diff --git a/scsi/qemu-pr-helper.c b/scsi/qemu-pr-helper.c
index ae44a816e1..c6c6347e9b 100644
--- a/scsi/qemu-pr-helper.c
+++ b/scsi/qemu-pr-helper.c
@@ -735,8 +735,7 @@ static void coroutine_fn prh_co_entry(void *opaque)
qio_channel_set_blocking(QIO_CHANNEL(client->ioc),
false, NULL);
- qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc),
- qemu_get_aio_context());
+ qio_channel_set_follow_coroutine_ctx(QIO_CHANNEL(client->ioc), true);
/* A very simple negotiation for future extensibility. No features
* are defined so write 0.
@@ -796,7 +795,6 @@ static void coroutine_fn prh_co_entry(void *opaque)
}
out:
- qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
object_unref(OBJECT(client->ioc));
g_free(client);
}
diff --git a/util/vhost-user-server.c b/util/vhost-user-server.c
index cd17fb5326..b4b6bf30a2 100644
--- a/util/vhost-user-server.c
+++ b/util/vhost-user-server.c
@@ -127,7 +127,14 @@ vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
if (rc < 0) {
if (rc == QIO_CHANNEL_ERR_BLOCK) {
assert(local_err == NULL);
- qio_channel_yield(ioc, G_IO_IN);
+ if (server->ctx) {
+ server->in_qio_channel_yield = true;
+ qio_channel_yield(ioc, G_IO_IN);
+ server->in_qio_channel_yield = false;
+ } else {
+ /* Wait until attached to an AioContext again */
+ qemu_coroutine_yield();
+ }
continue;
} else {
error_report_err(local_err);
@@ -278,7 +285,7 @@ set_watch(VuDev *vu_dev, int fd, int vu_evt,
vu_fd_watch->fd = fd;
vu_fd_watch->cb = cb;
qemu_socket_set_nonblock(fd);
- aio_set_fd_handler(server->ioc->ctx, fd, kick_handler,
+ aio_set_fd_handler(server->ctx, fd, kick_handler,
NULL, NULL, NULL, vu_fd_watch);
vu_fd_watch->vu_dev = vu_dev;
vu_fd_watch->pvt = pvt;
@@ -299,7 +306,7 @@ static void remove_watch(VuDev *vu_dev, int fd)
if (!vu_fd_watch) {
return;
}
- aio_set_fd_handler(server->ioc->ctx, fd, NULL, NULL, NULL, NULL, NULL);
+ aio_set_fd_handler(server->ctx, fd, NULL, NULL, NULL, NULL, NULL);
QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next);
g_free(vu_fd_watch);
@@ -344,6 +351,8 @@ static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
/* TODO vu_message_write() spins if non-blocking! */
qio_channel_set_blocking(server->ioc, false, NULL);
+ qio_channel_set_follow_coroutine_ctx(server->ioc, true);
+
server->co_trip = qemu_coroutine_create(vu_client_trip, server);
aio_context_acquire(server->ctx);
@@ -399,13 +408,12 @@ void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx)
return;
}
- qio_channel_attach_aio_context(server->ioc, ctx);
-
QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
aio_set_fd_handler(ctx, vu_fd_watch->fd, kick_handler, NULL,
NULL, NULL, vu_fd_watch);
}
+ assert(!server->in_qio_channel_yield);
aio_co_schedule(ctx, server->co_trip);
}
@@ -419,11 +427,16 @@ void vhost_user_server_detach_aio_context(VuServer *server)
aio_set_fd_handler(server->ctx, vu_fd_watch->fd,
NULL, NULL, NULL, NULL, vu_fd_watch);
}
-
- qio_channel_detach_aio_context(server->ioc);
}
server->ctx = NULL;
+
+ if (server->ioc) {
+ if (server->in_qio_channel_yield) {
+ /* Stop receiving the next vhost-user message */
+ qio_channel_wake_read(server->ioc);
+ }
+ }
}
bool vhost_user_server_start(VuServer *server,
--
2.41.0
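The qio_channel_set_aio_fd_handler() documentation in this patch warns that, within one AioContext, setting the read handler clears the write handler and vice versa. The following sketch models why the socket and file implementations make a single call when read_ctx == write_ctx. It assumes a hypothetical one-slot-per-context registry (Ctx, set_fd_handler()) in place of aio_set_fd_handler(); the dispatch in socket_set_handlers() follows qio_channel_socket_set_aio_fd_handler(), plus an explicit guard for the both-NULL case so that the model never dereferences NULL.

```c
#include <assert.h>
#include <stddef.h>

typedef void Handler(void *opaque);

/*
 * Hypothetical registry for a single fd in one context: one slot holds both
 * handlers and every registration replaces the whole slot.
 */
typedef struct Ctx {
    Handler *io_read;
    Handler *io_write;
} Ctx;

static void restart_read(void *opaque) { (void)opaque; }
static void restart_write(void *opaque) { (void)opaque; }

static void set_fd_handler(Ctx *ctx, Handler *io_read, Handler *io_write)
{
    ctx->io_read = io_read;
    ctx->io_write = io_write;
}

/* Models the dispatch in qio_channel_socket_set_aio_fd_handler(). */
static void socket_set_handlers(Ctx *read_ctx, Handler *io_read,
                                Ctx *write_ctx, Handler *io_write)
{
    if (read_ctx == write_ctx) {
        if (read_ctx) {  /* model-only guard for the both-NULL case */
            set_fd_handler(read_ctx, io_read, io_write);
        }
    } else {
        if (read_ctx) {
            set_fd_handler(read_ctx, io_read, NULL);
        }
        if (write_ctx) {
            set_fd_handler(write_ctx, NULL, io_write);
        }
    }
}
```

Setting only the write side in a context that already holds a read handler wipes that read handler, which is why qio_channel_set_fd_handlers() re-passes the other direction's handler when both coroutines share a context.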
* Re: [PATCH 1/2] io: check there are no qio_channel_yield() coroutines during ->finalize()
2023-08-23 23:45 ` [PATCH 1/2] io: check there are no qio_channel_yield() coroutines during ->finalize() Stefan Hajnoczi
@ 2023-08-24 11:01 ` Daniel P. Berrangé
2023-08-24 18:18 ` Eric Blake
1 sibling, 0 replies; 11+ messages in thread
From: Daniel P. Berrangé @ 2023-08-24 11:01 UTC
To: Stefan Hajnoczi
Cc: qemu-devel, Eric Blake, kwolf, Fam Zheng, Juan Quintela,
Vladimir Sementsov-Ogievskiy, Hanna Reitz, Paolo Bonzini,
qemu-block, Leonardo Bras, Coiby Xu, Peter Xu
On Wed, Aug 23, 2023 at 07:45:03PM -0400, Stefan Hajnoczi wrote:
> Callers must clean up their coroutines before calling
> object_unref(OBJECT(ioc)) to prevent an fd handler leak. Add an
> assertion to check this.
>
> This patch is preparation for the fd handler changes that follow.
>
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
> io/channel.c | 4 ++++
> 1 file changed, 4 insertions(+)
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
With regards,
Daniel
--
|: https://berrange.com -o- https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org -o- https://fstop138.berrange.com :|
|: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|
* Re: [PATCH 2/2] io: follow coroutine AioContext in qio_channel_yield()
2023-08-23 23:45 ` [PATCH 2/2] io: follow coroutine AioContext in qio_channel_yield() Stefan Hajnoczi
@ 2023-08-24 11:26 ` Daniel P. Berrangé
2023-08-24 17:07 ` Stefan Hajnoczi
2023-08-24 18:26 ` Stefan Hajnoczi
2023-08-24 16:09 ` Fabiano Rosas
1 sibling, 2 replies; 11+ messages in thread
From: Daniel P. Berrangé @ 2023-08-24 11:26 UTC
To: Stefan Hajnoczi
Cc: qemu-devel, Eric Blake, kwolf, Fam Zheng, Juan Quintela,
Vladimir Sementsov-Ogievskiy, Hanna Reitz, Paolo Bonzini,
qemu-block, Leonardo Bras, Coiby Xu, Peter Xu
On Wed, Aug 23, 2023 at 07:45:04PM -0400, Stefan Hajnoczi wrote:
> The ongoing QEMU multi-queue block layer effort makes it possible for multiple
> threads to process I/O in parallel. The nbd block driver is not compatible with
> the multi-queue block layer yet because QIOChannel cannot be used easily from
> coroutines running in multiple threads. This series changes the QIOChannel API
> to make that possible.
>
> In the current API, calling qio_channel_attach_aio_context() sets the
> AioContext where qio_channel_yield() installs an fd handler prior to yielding:
>
> qio_channel_attach_aio_context(ioc, my_ctx);
> ...
> qio_channel_yield(ioc, G_IO_IN); // my_ctx is used here
> ...
> qio_channel_detach_aio_context(ioc);
>
> This API design has limitations: reading and writing must be done in the same
> AioContext and moving between AioContexts involves a cumbersome sequence of API
> calls that is not suitable for doing on a per-request basis.
>
> There is no fundamental reason why a QIOChannel needs to run within the
> same AioContext every time qio_channel_yield() is called. QIOChannel
> only uses the AioContext while inside qio_channel_yield(). The rest of
> the time, QIOChannel is independent of any AioContext.
>
> In the new API, qio_channel_yield() queries the AioContext from the current
> coroutine using qemu_coroutine_get_aio_context(). There is no need to
> explicitly attach/detach AioContexts anymore and
> qio_channel_attach_aio_context() and qio_channel_detach_aio_context() are gone.
> One coroutine can read from the QIOChannel while another coroutine writes from
> a different AioContext.
>
> This API change allows the nbd block driver to use QIOChannel from any thread.
> It's important to keep in mind that the block driver already synchronizes
> QIOChannel access and ensures that two coroutines never read simultaneously or
> write simultaneously.
>
> This patch updates all users of qio_channel_attach_aio_context() to the
> new API. Most conversions are simple, but vhost-user-server requires a
> new qemu_coroutine_yield() call to quiesce the vu_client_trip()
> coroutine when not attached to any AioContext.
>
> While the API has become simpler, there is one wart: QIOChannel has a
> special case for the iohandler AioContext (used for handlers that must not run
> in nested event loops). I didn't find an elegant way to preserve that behavior,
> so I added a new API called qio_channel_set_follow_coroutine_ctx(ioc, true|false)
> for opting in to the new AioContext model. By default QIOChannel uses the
> iohandler AioContext. Code that formerly called
> qio_channel_attach_aio_context() now calls
> qio_channel_set_follow_coroutine_ctx(ioc, true) once after the QIOChannel is
> created.
I wonder if it is better to just pass the AioContext object into
qio_channel_yield explicitly, e.g. have
  qio_channel_yield(QIOChannel *ioc,
                    AioContext *ctx,
                    GIOCondition cond);
With semantics that if 'ctx == NULL', then we assume the default
global iohandler context, and for a non-default context it must
be non-NULL?
That could nicely decouple the API from relying on global
coroutine/thread state for querying an AioContext, which
makes it easier to reason about IMHO.
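For comparison, the context rule in the explicit-argument variant suggested here could look like the following sketch. Ctx, iohandler_ctx and explicit_yield_ctx() are hypothetical stand-ins, not QEMU's API; the point is that the choice depends only on the argument, with no per-channel flag and no lookup of the current coroutine's context.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for an AioContext; only pointer identity matters. */
typedef struct Ctx {
    const char *name;
} Ctx;

/* Stand-in for the global iohandler AioContext. */
static Ctx iohandler_ctx = { "iohandler" };

/* Models the proposed rule: NULL selects the default iohandler context. */
static Ctx *explicit_yield_ctx(Ctx *ctx)
{
    return ctx ? ctx : &iohandler_ctx;
}
```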
>
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
> include/io/channel.h | 34 +++++++--
> include/qemu/vhost-user-server.h | 1 +
> block/nbd.c | 11 +--
> io/channel-command.c | 13 +++-
> io/channel-file.c | 18 ++++-
> io/channel-null.c | 3 +-
> io/channel-socket.c | 18 ++++-
> io/channel-tls.c | 6 +-
> io/channel.c | 120 ++++++++++++++++++++++---------
> migration/channel-block.c | 3 +-
> nbd/client.c | 2 +-
> nbd/server.c | 14 +---
> scsi/qemu-pr-helper.c | 4 +-
> util/vhost-user-server.c | 27 +++++--
> 14 files changed, 191 insertions(+), 83 deletions(-)
>
> diff --git a/include/io/channel.h b/include/io/channel.h
> index 229bf36910..dfbe6f2931 100644
> --- a/include/io/channel.h
> +++ b/include/io/channel.h
> @@ -81,9 +81,11 @@ struct QIOChannel {
> Object parent;
> unsigned int features; /* bitmask of QIOChannelFeatures */
> char *name;
> - AioContext *ctx;
> + AioContext *read_ctx;
> Coroutine *read_coroutine;
> + AioContext *write_ctx;
> Coroutine *write_coroutine;
> + bool follow_coroutine_ctx;
> #ifdef _WIN32
> HANDLE event; /* For use with GSource on Win32 */
> #endif
> @@ -140,8 +142,9 @@ struct QIOChannelClass {
> int whence,
> Error **errp);
> void (*io_set_aio_fd_handler)(QIOChannel *ioc,
> - AioContext *ctx,
> + AioContext *read_ctx,
> IOHandler *io_read,
> + AioContext *write_ctx,
> IOHandler *io_write,
> void *opaque);
> int (*io_flush)(QIOChannel *ioc,
> @@ -498,6 +501,21 @@ int qio_channel_set_blocking(QIOChannel *ioc,
> bool enabled,
> Error **errp);
>
> +/**
> + * qio_channel_set_follow_coroutine_ctx:
> + * @ioc: the channel object
> + * @enabled: whether or not to follow the coroutine's AioContext
> + *
> + * If @enabled is true, calls to qio_channel_yield() use the current
> + * coroutine's AioContext. Usually this is desirable.
> + *
> + * If @enabled is false, calls to qio_channel_yield() use the global iohandler
> + * AioContext. This may be used by coroutines that run in the main loop and
> + * do not wish to respond to I/O during nested event loops. This is the
> + * default for compatibility with code that is not aware of AioContexts.
> + */
> +void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled);
> +
> /**
> * qio_channel_close:
> * @ioc: the channel object
> @@ -785,8 +803,9 @@ void qio_channel_wait(QIOChannel *ioc,
> /**
> * qio_channel_set_aio_fd_handler:
> * @ioc: the channel object
> - * @ctx: the AioContext to set the handlers on
> + * @read_ctx: the AioContext to set the read handler on or NULL
> * @io_read: the read handler
> + * @write_ctx: the AioContext to set the write handler on or NULL
> * @io_write: the write handler
> * @opaque: the opaque value passed to the handler
> *
> @@ -794,10 +813,17 @@ void qio_channel_wait(QIOChannel *ioc,
> * be used by channel implementations to forward the handlers
> * to another channel (e.g. from #QIOChannelTLS to the
> * underlying socket).
> + *
> + * When @read_ctx is NULL, don't touch the read handler. When @write_ctx is
> + * NULL, don't touch the write handler. Note that setting the read handler
> + * clears the write handler, and vice versa, if they share the same AioContext.
> + * Therefore the caller must pass both handlers together when sharing the same
> + * AioContext.
> */
> void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
> - AioContext *ctx,
> + AioContext *read_ctx,
> IOHandler *io_read,
> + AioContext *write_ctx,
> IOHandler *io_write,
> void *opaque);
>
Need to drop the qio_channel_attach_aio_context / qio_channel_detach_aio_context
methods from the header too.
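The new doc comment above says that setting the read handler clears the write handler when both share an AioContext. A toy model shows why that happens. As the comment implies, the model assumes one registration per (AioContext, fd) pair, and each call replaces the read and write handlers together. `toy_aio_set_fd_handler()`, `FdReg` and the fixed-size table are hypothetical. They are much simpler than QEMU's real aio_set_fd_handler().

```c
#include <assert.h>
#include <stddef.h>

typedef struct AioContext { int id; } AioContext;
typedef void IOHandler(void *opaque);

/* One slot per (ctx, fd): read and write handlers live together. */
typedef struct {
    AioContext *ctx;
    int fd;
    IOHandler *io_read;
    IOHandler *io_write;
} FdReg;

#define MAX_REGS 8
static FdReg regs[MAX_REGS];
static int nregs;

static FdReg *find_reg(AioContext *ctx, int fd)
{
    for (int i = 0; i < nregs; i++) {
        if (regs[i].ctx == ctx && regs[i].fd == fd) {
            return &regs[i];
        }
    }
    return NULL;
}

/* Toy registration call: overwrites BOTH handlers of the slot. */
static void toy_aio_set_fd_handler(AioContext *ctx, int fd,
                                   IOHandler *io_read, IOHandler *io_write)
{
    FdReg *r = find_reg(ctx, fd);

    if (!r) {
        assert(nregs < MAX_REGS);
        r = &regs[nregs++];
        r->ctx = ctx;
        r->fd = fd;
    }
    r->io_read = io_read;   /* NULL here drops a previous read handler */
    r->io_write = io_write; /* NULL here drops a previous write handler */
}

static void demo_read(void *opaque) { (void)opaque; }
static void demo_write(void *opaque) { (void)opaque; }
```

Suppose both handlers are installed and then `toy_aio_set_fd_handler(&ctx, fd, demo_read, NULL)` is called. The slot is left with no write handler. That is why a caller that shares one AioContext must pass both handlers together.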
> diff --git a/io/channel-command.c b/io/channel-command.c
> index 7ed726c802..1f61026222 100644
> --- a/io/channel-command.c
> +++ b/io/channel-command.c
> @@ -331,14 +331,21 @@ static int qio_channel_command_close(QIOChannel *ioc,
>
>
> static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
> - AioContext *ctx,
> + AioContext *read_ctx,
> IOHandler *io_read,
> + AioContext *write_ctx,
> IOHandler *io_write,
> void *opaque)
> {
> QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
> - aio_set_fd_handler(ctx, cioc->readfd, io_read, NULL, NULL, NULL, opaque);
> - aio_set_fd_handler(ctx, cioc->writefd, NULL, io_write, NULL, NULL, opaque);
> + if (read_ctx) {
> + aio_set_fd_handler(read_ctx, cioc->readfd, io_read, NULL,
> + NULL, NULL, opaque);
> + }
> + if (write_ctx) {
> + aio_set_fd_handler(write_ctx, cioc->writefd, NULL, io_write,
> + NULL, NULL, opaque);
> + }
> }
>
>
> diff --git a/io/channel-file.c b/io/channel-file.c
> index 8b5821f452..e6c6329dbb 100644
> --- a/io/channel-file.c
> +++ b/io/channel-file.c
> @@ -192,13 +192,27 @@ static int qio_channel_file_close(QIOChannel *ioc,
>
>
> static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc,
> - AioContext *ctx,
> + AioContext *read_ctx,
> IOHandler *io_read,
> + AioContext *write_ctx,
> IOHandler *io_write,
> void *opaque)
> {
> QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
> - aio_set_fd_handler(ctx, fioc->fd, io_read, io_write, NULL, NULL, opaque);
> +
> + if (read_ctx == write_ctx) {
> + aio_set_fd_handler(read_ctx, fioc->fd, io_read, io_write,
> + NULL, NULL, opaque);
> + } else {
> + if (read_ctx) {
> + aio_set_fd_handler(read_ctx, fioc->fd, io_read, NULL,
> + NULL, NULL, opaque);
> + }
> + if (write_ctx) {
> + aio_set_fd_handler(write_ctx, fioc->fd, NULL, io_write,
> + NULL, NULL, opaque);
> + }
> + }
> }
>
> static GSource *qio_channel_file_create_watch(QIOChannel *ioc,
> diff --git a/io/channel-null.c b/io/channel-null.c
> index 4fafdb770d..ef99586348 100644
> --- a/io/channel-null.c
> +++ b/io/channel-null.c
> @@ -128,8 +128,9 @@ qio_channel_null_close(QIOChannel *ioc,
>
> static void
> qio_channel_null_set_aio_fd_handler(QIOChannel *ioc G_GNUC_UNUSED,
> - AioContext *ctx G_GNUC_UNUSED,
> + AioContext *read_ctx G_GNUC_UNUSED,
> IOHandler *io_read G_GNUC_UNUSED,
> + AioContext *write_ctx G_GNUC_UNUSED,
> IOHandler *io_write G_GNUC_UNUSED,
> void *opaque G_GNUC_UNUSED)
> {
> diff --git a/io/channel-socket.c b/io/channel-socket.c
> index d99945ebec..daeb92bbe0 100644
> --- a/io/channel-socket.c
> +++ b/io/channel-socket.c
> @@ -893,13 +893,27 @@ qio_channel_socket_shutdown(QIOChannel *ioc,
> }
>
> static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
> - AioContext *ctx,
> + AioContext *read_ctx,
> IOHandler *io_read,
> + AioContext *write_ctx,
> IOHandler *io_write,
> void *opaque)
> {
> QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> - aio_set_fd_handler(ctx, sioc->fd, io_read, io_write, NULL, NULL, opaque);
> +
> + if (read_ctx == write_ctx) {
> + aio_set_fd_handler(read_ctx, sioc->fd, io_read, io_write,
> + NULL, NULL, opaque);
> + } else {
> + if (read_ctx) {
> + aio_set_fd_handler(read_ctx, sioc->fd, io_read, NULL,
> + NULL, NULL, opaque);
> + }
> + if (write_ctx) {
> + aio_set_fd_handler(write_ctx, sioc->fd, NULL, io_write,
> + NULL, NULL, opaque);
> + }
> + }
> }
>
> static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
The file, command and socket impls all have fairly similar logic, and
could be handled by calling out to a common helper in channel-util.c
along the lines of this:
void qio_channel_util_set_aio_fd_handler(int read_fd,
                                         AioContext *read_ctx,
                                         IOHandler *io_read,
                                         int write_fd,
                                         AioContext *write_ctx,
                                         IOHandler *io_write,
                                         void *opaque)
{
    if (read_fd == write_fd && read_ctx == write_ctx) {
        aio_set_fd_handler(read_ctx, read_fd, io_read, io_write,
                           NULL, NULL, opaque);
    } else {
        if (read_ctx) {
            aio_set_fd_handler(read_ctx, read_fd, io_read, NULL,
                               NULL, NULL, opaque);
        }
        if (write_ctx) {
            aio_set_fd_handler(write_ctx, write_fd, NULL, io_write,
                               NULL, NULL, opaque);
        }
    }
}
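Below is a self-contained sketch of that helper's routing. It uses a mock that records calls instead of registering anything. `mock_aio_set_fd_handler()`, `util_set_aio_fd_handler()`, `Call` and the dummy types are hypothetical. The sketch makes one deliberate change. When both contexts are NULL, the helper as written takes the first branch and calls aio_set_fd_handler() with a NULL context. The sketch treats that case as a no-op instead, which matches the header's rule that a NULL context leaves that handler untouched. The sketch does not establish whether any real caller can reach that case.

```c
#include <assert.h>
#include <stddef.h>

typedef struct AioContext { int id; } AioContext;
typedef void IOHandler(void *opaque);

/* One recorded registration request. */
typedef struct {
    AioContext *ctx;
    int fd;
    IOHandler *io_read;
    IOHandler *io_write;
} Call;

static Call calls[4];
static int ncalls;

/* Mock: logs the request instead of touching an event loop. */
static void mock_aio_set_fd_handler(AioContext *ctx, int fd,
                                    IOHandler *io_read, IOHandler *io_write,
                                    void *opaque)
{
    (void)opaque;
    assert(ncalls < 4);
    calls[ncalls++] = (Call){ ctx, fd, io_read, io_write };
}

/* Same branching as the proposed helper, plus the both-NULL no-op. */
static void util_set_aio_fd_handler(int read_fd, AioContext *read_ctx,
                                    IOHandler *io_read,
                                    int write_fd, AioContext *write_ctx,
                                    IOHandler *io_write, void *opaque)
{
    if (read_fd == write_fd && read_ctx == write_ctx) {
        if (read_ctx) {
            /* Shared (ctx, fd): one call must carry both handlers. */
            mock_aio_set_fd_handler(read_ctx, read_fd, io_read, io_write,
                                    opaque);
        }
    } else {
        if (read_ctx) {
            mock_aio_set_fd_handler(read_ctx, read_fd, io_read, NULL, opaque);
        }
        if (write_ctx) {
            mock_aio_set_fd_handler(write_ctx, write_fd, NULL, io_write,
                                    opaque);
        }
    }
}

static void demo_read(void *opaque) { (void)opaque; }
static void demo_write(void *opaque) { (void)opaque; }
```

Each of the file, socket and command channels would then make a one-line call. The command channel passes separate read and write fds, so it always takes the second branch.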
> diff --git a/io/channel.c b/io/channel.c
> index c415f3fc88..b190d593d3 100644
> --- a/io/channel.c
> +++ b/io/channel.c
> @@ -365,6 +365,12 @@ int qio_channel_set_blocking(QIOChannel *ioc,
> }
>
>
> +void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled)
> +{
> + ioc->follow_coroutine_ctx = enabled;
> +}
> +
> +
> int qio_channel_close(QIOChannel *ioc,
> Error **errp)
> {
> @@ -542,56 +550,101 @@ static void qio_channel_restart_write(void *opaque)
> aio_co_wake(co);
> }
>
> -static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
> +static void coroutine_fn
> +qio_channel_set_fd_handlers(QIOChannel *ioc, GIOCondition condition)
> {
> - IOHandler *rd_handler = NULL, *wr_handler = NULL;
> + AioContext *ctx = ioc->follow_coroutine_ctx ?
> + qemu_coroutine_get_aio_context(qemu_coroutine_self()) :
> + iohandler_get_aio_context();
This is conditionally calling qemu_coroutine_get_aio_context
or iohandler_get_aio_context, but in qio_channel_yield, we
don't look at 'follow_coroutine_ctx' and unconditionally use
qemu_coroutine_get_aio_context. Is that correct ?
Should we not just pass in the AioContext directly from
qio_channel_yield to ensure consistency ?
> + AioContext *read_ctx = NULL;
> + IOHandler *io_read = NULL;
> + AioContext *write_ctx = NULL;
> + IOHandler *io_write = NULL;
> +
> + if (condition == G_IO_IN) {
> + ioc->read_coroutine = qemu_coroutine_self();
> + ioc->read_ctx = ctx;
> + read_ctx = ctx;
> + io_read = qio_channel_restart_read;
> +
> + /*
> + * Thread safety: if the other coroutine is set and its AioContext
> + * matches ours, then there is mutual exclusion between read and write
> + * because they share a single thread and it's safe to set both read
> + * and write fd handlers here. If the AioContext does not match ours,
> + * then both threads may run in parallel but there is no shared state
> + * to worry about.
> + */
> + if (ioc->write_coroutine && ioc->write_ctx == ctx) {
> + write_ctx = ctx;
> + io_write = qio_channel_restart_write;
> + }
> + } else if (condition == G_IO_OUT) {
> + ioc->write_coroutine = qemu_coroutine_self();
> + ioc->write_ctx = ctx;
> + write_ctx = ctx;
> + io_write = qio_channel_restart_write;
> + if (ioc->read_coroutine && ioc->read_ctx == ctx) {
> + read_ctx = ctx;
> + io_read = qio_channel_restart_read;
> + }
> + } else {
> + abort();
> + }
> +
> + qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
> + write_ctx, io_write, ioc);
> +}
snip
> void coroutine_fn qio_channel_yield(QIOChannel *ioc,
> GIOCondition condition)
> {
> - AioContext *ioc_ctx = ioc->ctx ?: qemu_get_aio_context();
> + AioContext *ioc_ctx;
>
> assert(qemu_in_coroutine());
> - assert(in_aio_context_home_thread(ioc_ctx));
> + ioc_ctx = qemu_coroutine_get_aio_context(qemu_coroutine_self());
>
> if (condition == G_IO_IN) {
> assert(!ioc->read_coroutine);
> - ioc->read_coroutine = qemu_coroutine_self();
> } else if (condition == G_IO_OUT) {
> assert(!ioc->write_coroutine);
> - ioc->write_coroutine = qemu_coroutine_self();
> } else {
> abort();
> }
> - qio_channel_set_aio_fd_handlers(ioc);
> + qio_channel_set_fd_handlers(ioc, condition);
> qemu_coroutine_yield();
> assert(in_aio_context_home_thread(ioc_ctx));
>
> @@ -599,11 +652,10 @@ void coroutine_fn qio_channel_yield(QIOChannel *ioc,
> * through the aio_fd_handlers. */
> if (condition == G_IO_IN) {
> assert(ioc->read_coroutine == NULL);
> - qio_channel_set_aio_fd_handlers(ioc);
> } else if (condition == G_IO_OUT) {
> assert(ioc->write_coroutine == NULL);
> - qio_channel_set_aio_fd_handlers(ioc);
> }
> + qio_channel_clear_fd_handlers(ioc, condition);
> }
>
> void qio_channel_wake_read(QIOChannel *ioc)
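The handler-selection rule in qio_channel_set_fd_handlers() can be isolated from QEMU as shown below. This is a simplified model, not the patch's code:

- `ToyChannel`, `HandlerPlan`, `plan_fd_handlers()` and the `DIR_IN`/`DIR_OUT` enum are hypothetical.
- The AioContext is passed in rather than chosen via `follow_coroutine_ctx`.
- Instead of calling qio_channel_set_aio_fd_handler(), the function returns the arguments it would pass.

The model captures the key point of the thread-safety comment. The other direction's handler is re-installed only when its waiting coroutine uses the same AioContext.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct AioContext { int id; } AioContext;
typedef struct Coroutine { int id; } Coroutine;

/* Only the fields the selection logic touches. */
typedef struct {
    Coroutine *read_coroutine;
    AioContext *read_ctx;
    Coroutine *write_coroutine;
    AioContext *write_ctx;
} ToyChannel;

/* Arguments that would go to qio_channel_set_aio_fd_handler(). */
typedef struct {
    AioContext *read_ctx;  /* NULL: leave the read handler alone */
    bool install_read;     /* install the restart-read handler */
    AioContext *write_ctx; /* NULL: leave the write handler alone */
    bool install_write;    /* install the restart-write handler */
} HandlerPlan;

typedef enum { DIR_IN, DIR_OUT } Dir;

static HandlerPlan plan_fd_handlers(ToyChannel *ioc, Dir dir,
                                    Coroutine *self, AioContext *ctx)
{
    HandlerPlan p = { NULL, false, NULL, false };

    if (dir == DIR_IN) {
        ioc->read_coroutine = self;
        ioc->read_ctx = ctx;
        p.read_ctx = ctx;
        p.install_read = true;
        /* A writer waiting in the same ctx shares the registration. */
        if (ioc->write_coroutine && ioc->write_ctx == ctx) {
            p.write_ctx = ctx;
            p.install_write = true;
        }
    } else {
        ioc->write_coroutine = self;
        ioc->write_ctx = ctx;
        p.write_ctx = ctx;
        p.install_write = true;
        /* A reader waiting in the same ctx shares the registration. */
        if (ioc->read_coroutine && ioc->read_ctx == ctx) {
            p.read_ctx = ctx;
            p.install_read = true;
        }
    }
    return p;
}
```

If the waiting coroutine is in a different context, the plan leaves that direction untouched (NULL). Its registration lives in another AioContext, so it is not clobbered.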
With regards,
Daniel
--
|: https://berrange.com -o- https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org -o- https://fstop138.berrange.com :|
|: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|
* Re: [PATCH 2/2] io: follow coroutine AioContext in qio_channel_yield()
2023-08-23 23:45 ` [PATCH 2/2] io: follow coroutine AioContext in qio_channel_yield() Stefan Hajnoczi
2023-08-24 11:26 ` Daniel P. Berrangé
@ 2023-08-24 16:09 ` Fabiano Rosas
2023-08-24 17:29 ` Stefan Hajnoczi
1 sibling, 1 reply; 11+ messages in thread
From: Fabiano Rosas @ 2023-08-24 16:09 UTC (permalink / raw)
To: Stefan Hajnoczi, qemu-devel
Cc: Eric Blake, kwolf, Fam Zheng, Juan Quintela,
Vladimir Sementsov-Ogievskiy, Daniel Berrange, Hanna Reitz,
Paolo Bonzini, qemu-block, Leonardo Bras, Coiby Xu, Peter Xu,
Stefan Hajnoczi
Stefan Hajnoczi <stefanha@redhat.com> writes:
> @@ -2089,10 +2088,6 @@ static void nbd_attach_aio_context(BlockDriverState *bs,
> * the reconnect_delay_timer cannot be active here.
> */
> assert(!s->reconnect_delay_timer);
> -
> - if (s->ioc) {
> - qio_channel_attach_aio_context(s->ioc, new_context);
> - }
> }
>
> static void nbd_detach_aio_context(BlockDriverState *bs)
> @@ -2101,10 +2096,6 @@ static void nbd_detach_aio_context(BlockDriverState *bs)
>
> assert(!s->open_timer);
> assert(!s->reconnect_delay_timer);
> -
> - if (s->ioc) {
> - qio_channel_detach_aio_context(s->ioc);
> - }
> }
The whole attach/detach functions could go away.
>
> static BlockDriver bdrv_nbd = {
> diff --git a/io/channel-command.c b/io/channel-command.c
> index 7ed726c802..1f61026222 100644
> --- a/io/channel-command.c
> +++ b/io/channel-command.c
> @@ -331,14 +331,21 @@ static int qio_channel_command_close(QIOChannel *ioc,
>
>
> static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
> - AioContext *ctx,
> + AioContext *read_ctx,
> IOHandler *io_read,
> + AioContext *write_ctx,
> IOHandler *io_write,
> void *opaque)
> {
> QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
> - aio_set_fd_handler(ctx, cioc->readfd, io_read, NULL, NULL, NULL, opaque);
> - aio_set_fd_handler(ctx, cioc->writefd, NULL, io_write, NULL, NULL, opaque);
> + if (read_ctx) {
> + aio_set_fd_handler(read_ctx, cioc->readfd, io_read, NULL,
> + NULL, NULL, opaque);
> + }
> + if (write_ctx) {
> + aio_set_fd_handler(write_ctx, cioc->writefd, NULL, io_write,
> + NULL, NULL, opaque);
> + }
> }
>
>
...
> diff --git a/nbd/client.c b/nbd/client.c
> index 479208d5d9..81877d088d 100644
> --- a/nbd/client.c
> +++ b/nbd/client.c
> @@ -948,7 +948,7 @@ static int nbd_start_negotiate(AioContext *aio_context, QIOChannel *ioc,
> ioc = *outioc;
> if (aio_context) {
> qio_channel_set_blocking(ioc, false, NULL);
> - qio_channel_attach_aio_context(ioc, aio_context);
> + qio_channel_set_follow_coroutine_ctx(ioc, true);
This is actually unreachable, aio_context is always NULL here.
> }
> } else {
> error_setg(errp, "Server does not support STARTTLS");
* Re: [PATCH 2/2] io: follow coroutine AioContext in qio_channel_yield()
2023-08-24 11:26 ` Daniel P. Berrangé
@ 2023-08-24 17:07 ` Stefan Hajnoczi
2023-08-24 18:26 ` Stefan Hajnoczi
1 sibling, 0 replies; 11+ messages in thread
From: Stefan Hajnoczi @ 2023-08-24 17:07 UTC (permalink / raw)
To: Daniel P. Berrangé
Cc: qemu-devel, Eric Blake, kwolf, Fam Zheng, Juan Quintela,
Vladimir Sementsov-Ogievskiy, Hanna Reitz, Paolo Bonzini,
qemu-block, Leonardo Bras, Coiby Xu, Peter Xu
On Thu, Aug 24, 2023 at 12:26:05PM +0100, Daniel P. Berrangé wrote:
> On Wed, Aug 23, 2023 at 07:45:04PM -0400, Stefan Hajnoczi wrote:
> > The ongoing QEMU multi-queue block layer effort makes it possible for multiple
> > threads to process I/O in parallel. The nbd block driver is not compatible with
> > the multi-queue block layer yet because QIOChannel cannot be used easily from
> > coroutines running in multiple threads. This series changes the QIOChannel API
> > to make that possible.
> >
> > In the current API, calling qio_channel_attach_aio_context() sets the
> > AioContext where qio_channel_yield() installs an fd handler prior to yielding:
> >
> > qio_channel_attach_aio_context(ioc, my_ctx);
> > ...
> > qio_channel_yield(ioc, G_IO_IN); // my_ctx is used here
> > ...
> > qio_channel_detach_aio_context(ioc);
> >
> > This API design has limitations: reading and writing must be done in the same
> > AioContext and moving between AioContexts involves a cumbersome sequence of API
> > calls that is not suitable for doing on a per-request basis.
> >
> > There is no fundamental reason why a QIOChannel needs to run within the
> > same AioContext every time qio_channel_yield() is called. QIOChannel
> > only uses the AioContext while inside qio_channel_yield(). The rest of
> > the time, QIOChannel is independent of any AioContext.
> >
> > In the new API, qio_channel_yield() queries the AioContext from the current
> > coroutine using qemu_coroutine_get_aio_context(). There is no need to
> > explicitly attach/detach AioContexts anymore and
> > qio_channel_attach_aio_context() and qio_channel_detach_aio_context() are gone.
> > One coroutine can read from the QIOChannel while another coroutine writes from
> > a different AioContext.
> >
> > This API change allows the nbd block driver to use QIOChannel from any thread.
> > It's important to keep in mind that the block driver already synchronizes
> > QIOChannel access and ensures that two coroutines never read simultaneously or
> > write simultaneously.
> >
> > This patch updates all users of qio_channel_attach_aio_context() to the
> > new API. Most conversions are simple, but vhost-user-server requires a
> > new qemu_coroutine_yield() call to quiesce the vu_client_trip()
> > coroutine when not attached to any AioContext.
> >
> > While the API has become simpler, there is one wart: QIOChannel has a
> > special case for the iohandler AioContext (used for handlers that must not run
> > in nested event loops). I didn't find an elegant way to preserve that behavior, so
> > I added a new API called qio_channel_set_follow_coroutine_ctx(ioc, true|false)
> > for opting in to the new AioContext model. By default QIOChannel uses the
> > iohandler AioContext. Code that formerly called
> > qio_channel_attach_aio_context() now calls
> > qio_channel_set_follow_coroutine_ctx(ioc, true) once after the QIOChannel is
> > created.
>
> I wonder if it is better to just pass the AioContext object into
> qio_channel_yield explicitly eg have
>
>   qio_channel_yield(QIOChannel *ioc,
>                     AioContext *ctx,
>                     GIOCondition cond);
>
> With semantics that if 'ctx == NULL', then we assume the default
> global iohandler context, and for non-default context it must
> be non-NULL ?
>
> That could nicely de-couple the API from relying on global
> coroutine/thread state for querying an AioContext, which
> makes it easier to reason about IMHO.
I like the idea and am auditing all callers of qio_channel_yield() to
see whether passing along the AioContext is feasible. Hopefully the next
version of this series can take that approach.
>
> >
> > Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> > ---
> > include/io/channel.h | 34 +++++++--
> > include/qemu/vhost-user-server.h | 1 +
> > block/nbd.c | 11 +--
> > io/channel-command.c | 13 +++-
> > io/channel-file.c | 18 ++++-
> > io/channel-null.c | 3 +-
> > io/channel-socket.c | 18 ++++-
> > io/channel-tls.c | 6 +-
> > io/channel.c | 120 ++++++++++++++++++++++---------
> > migration/channel-block.c | 3 +-
> > nbd/client.c | 2 +-
> > nbd/server.c | 14 +---
> > scsi/qemu-pr-helper.c | 4 +-
> > util/vhost-user-server.c | 27 +++++--
> > 14 files changed, 191 insertions(+), 83 deletions(-)
> >
> > diff --git a/include/io/channel.h b/include/io/channel.h
> > index 229bf36910..dfbe6f2931 100644
> > --- a/include/io/channel.h
> > +++ b/include/io/channel.h
> > @@ -81,9 +81,11 @@ struct QIOChannel {
> > Object parent;
> > unsigned int features; /* bitmask of QIOChannelFeatures */
> > char *name;
> > - AioContext *ctx;
> > + AioContext *read_ctx;
> > Coroutine *read_coroutine;
> > + AioContext *write_ctx;
> > Coroutine *write_coroutine;
> > + bool follow_coroutine_ctx;
> > #ifdef _WIN32
> > HANDLE event; /* For use with GSource on Win32 */
> > #endif
> > @@ -140,8 +142,9 @@ struct QIOChannelClass {
> > int whence,
> > Error **errp);
> > void (*io_set_aio_fd_handler)(QIOChannel *ioc,
> > - AioContext *ctx,
> > + AioContext *read_ctx,
> > IOHandler *io_read,
> > + AioContext *write_ctx,
> > IOHandler *io_write,
> > void *opaque);
> > int (*io_flush)(QIOChannel *ioc,
> > @@ -498,6 +501,21 @@ int qio_channel_set_blocking(QIOChannel *ioc,
> > bool enabled,
> > Error **errp);
> >
> > +/**
> > + * qio_channel_set_follow_coroutine_ctx:
> > + * @ioc: the channel object
> > + * @enabled: whether or not to follow the coroutine's AioContext
> > + *
> > + * If @enabled is true, calls to qio_channel_yield() use the current
> > + * coroutine's AioContext. Usually this is desirable.
> > + *
> > + * If @enabled is false, calls to qio_channel_yield() use the global iohandler
> > + * AioContext. This may be used by coroutines that run in the main loop and
> > + * do not wish to respond to I/O during nested event loops. This is the
> > + * default for compatibility with code that is not aware of AioContexts.
> > + */
> > +void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled);
> > +
> > /**
> > * qio_channel_close:
> > * @ioc: the channel object
> > @@ -785,8 +803,9 @@ void qio_channel_wait(QIOChannel *ioc,
> > /**
> > * qio_channel_set_aio_fd_handler:
> > * @ioc: the channel object
> > - * @ctx: the AioContext to set the handlers on
> > + * @read_ctx: the AioContext to set the read handler on or NULL
> > * @io_read: the read handler
> > + * @write_ctx: the AioContext to set the write handler on or NULL
> > * @io_write: the write handler
> > * @opaque: the opaque value passed to the handler
> > *
> > @@ -794,10 +813,17 @@ void qio_channel_wait(QIOChannel *ioc,
> > * be used by channel implementations to forward the handlers
> > * to another channel (e.g. from #QIOChannelTLS to the
> > * underlying socket).
> > + *
> > + * When @read_ctx is NULL, don't touch the read handler. When @write_ctx is
> > + * NULL, don't touch the write handler. Note that setting the read handler
> > + * clears the write handler, and vice versa, if they share the same AioContext.
> > + * Therefore the caller must pass both handlers together when sharing the same
> > + * AioContext.
> > */
> > void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
> > - AioContext *ctx,
> > + AioContext *read_ctx,
> > IOHandler *io_read,
> > + AioContext *write_ctx,
> > IOHandler *io_write,
> > void *opaque);
> >
>
> Need to drop the qio_channel_attach_aio_context / qio_channel_detach_aio_context
> methods from the header too.
Thanks for spotting this. I'll fix it.
> > diff --git a/io/channel-command.c b/io/channel-command.c
> > index 7ed726c802..1f61026222 100644
> > --- a/io/channel-command.c
> > +++ b/io/channel-command.c
> > @@ -331,14 +331,21 @@ static int qio_channel_command_close(QIOChannel *ioc,
> >
> >
> > static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
> > - AioContext *ctx,
> > + AioContext *read_ctx,
> > IOHandler *io_read,
> > + AioContext *write_ctx,
> > IOHandler *io_write,
> > void *opaque)
> > {
> > QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
> > - aio_set_fd_handler(ctx, cioc->readfd, io_read, NULL, NULL, NULL, opaque);
> > - aio_set_fd_handler(ctx, cioc->writefd, NULL, io_write, NULL, NULL, opaque);
> > + if (read_ctx) {
> > + aio_set_fd_handler(read_ctx, cioc->readfd, io_read, NULL,
> > + NULL, NULL, opaque);
> > + }
> > + if (write_ctx) {
> > + aio_set_fd_handler(write_ctx, cioc->writefd, NULL, io_write,
> > + NULL, NULL, opaque);
> > + }
> > }
> >
> >
> > diff --git a/io/channel-file.c b/io/channel-file.c
> > index 8b5821f452..e6c6329dbb 100644
> > --- a/io/channel-file.c
> > +++ b/io/channel-file.c
> > @@ -192,13 +192,27 @@ static int qio_channel_file_close(QIOChannel *ioc,
> >
> >
> > static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc,
> > - AioContext *ctx,
> > + AioContext *read_ctx,
> > IOHandler *io_read,
> > + AioContext *write_ctx,
> > IOHandler *io_write,
> > void *opaque)
> > {
> > QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
> > - aio_set_fd_handler(ctx, fioc->fd, io_read, io_write, NULL, NULL, opaque);
> > +
> > + if (read_ctx == write_ctx) {
> > + aio_set_fd_handler(read_ctx, fioc->fd, io_read, io_write,
> > + NULL, NULL, opaque);
> > + } else {
> > + if (read_ctx) {
> > + aio_set_fd_handler(read_ctx, fioc->fd, io_read, NULL,
> > + NULL, NULL, opaque);
> > + }
> > + if (write_ctx) {
> > + aio_set_fd_handler(write_ctx, fioc->fd, NULL, io_write,
> > + NULL, NULL, opaque);
> > + }
> > + }
> > }
> >
> > static GSource *qio_channel_file_create_watch(QIOChannel *ioc,
> > diff --git a/io/channel-null.c b/io/channel-null.c
> > index 4fafdb770d..ef99586348 100644
> > --- a/io/channel-null.c
> > +++ b/io/channel-null.c
> > @@ -128,8 +128,9 @@ qio_channel_null_close(QIOChannel *ioc,
> >
> > static void
> > qio_channel_null_set_aio_fd_handler(QIOChannel *ioc G_GNUC_UNUSED,
> > - AioContext *ctx G_GNUC_UNUSED,
> > + AioContext *read_ctx G_GNUC_UNUSED,
> > IOHandler *io_read G_GNUC_UNUSED,
> > + AioContext *write_ctx G_GNUC_UNUSED,
> > IOHandler *io_write G_GNUC_UNUSED,
> > void *opaque G_GNUC_UNUSED)
> > {
> > diff --git a/io/channel-socket.c b/io/channel-socket.c
> > index d99945ebec..daeb92bbe0 100644
> > --- a/io/channel-socket.c
> > +++ b/io/channel-socket.c
> > @@ -893,13 +893,27 @@ qio_channel_socket_shutdown(QIOChannel *ioc,
> > }
> >
> > static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
> > - AioContext *ctx,
> > + AioContext *read_ctx,
> > IOHandler *io_read,
> > + AioContext *write_ctx,
> > IOHandler *io_write,
> > void *opaque)
> > {
> > QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> > - aio_set_fd_handler(ctx, sioc->fd, io_read, io_write, NULL, NULL, opaque);
> > +
> > + if (read_ctx == write_ctx) {
> > + aio_set_fd_handler(read_ctx, sioc->fd, io_read, io_write,
> > + NULL, NULL, opaque);
> > + } else {
> > + if (read_ctx) {
> > + aio_set_fd_handler(read_ctx, sioc->fd, io_read, NULL,
> > + NULL, NULL, opaque);
> > + }
> > + if (write_ctx) {
> > + aio_set_fd_handler(write_ctx, sioc->fd, NULL, io_write,
> > + NULL, NULL, opaque);
> > + }
> > + }
> > }
> >
> > static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
>
> The file, command and socket impls all have fairly similar logic, and
> could be handled by calling out to a common helper in channel-util.c
> along the lines of this:
>
> void qio_channel_util_set_aio_fd_handler(int read_fd,
>                                          AioContext *read_ctx,
>                                          IOHandler *io_read,
>                                          int write_fd,
>                                          AioContext *write_ctx,
>                                          IOHandler *io_write,
>                                          void *opaque)
> {
>     if (read_fd == write_fd && read_ctx == write_ctx) {
>         aio_set_fd_handler(read_ctx, read_fd, io_read, io_write,
>                            NULL, NULL, opaque);
>     } else {
>         if (read_ctx) {
>             aio_set_fd_handler(read_ctx, read_fd, io_read, NULL,
>                                NULL, NULL, opaque);
>         }
>         if (write_ctx) {
>             aio_set_fd_handler(write_ctx, write_fd, NULL, io_write,
>                                NULL, NULL, opaque);
>         }
>     }
> }
Okay.
>
>
> > diff --git a/io/channel.c b/io/channel.c
> > index c415f3fc88..b190d593d3 100644
> > --- a/io/channel.c
> > +++ b/io/channel.c
> > @@ -365,6 +365,12 @@ int qio_channel_set_blocking(QIOChannel *ioc,
> > }
> >
> >
> > +void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled)
> > +{
> > + ioc->follow_coroutine_ctx = enabled;
> > +}
> > +
> > +
> > int qio_channel_close(QIOChannel *ioc,
> > Error **errp)
> > {
> > @@ -542,56 +550,101 @@ static void qio_channel_restart_write(void *opaque)
> > aio_co_wake(co);
> > }
> >
> > -static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
> > +static void coroutine_fn
> > +qio_channel_set_fd_handlers(QIOChannel *ioc, GIOCondition condition)
> > {
> > - IOHandler *rd_handler = NULL, *wr_handler = NULL;
> > + AioContext *ctx = ioc->follow_coroutine_ctx ?
> > + qemu_coroutine_get_aio_context(qemu_coroutine_self()) :
> > + iohandler_get_aio_context();
>
> This is conditionally calling qemu_coroutine_get_aio_context
> or iohandler_get_aio_context, but in qio_channel_yield, we
> don't look at 'follow_coroutine_ctx' and unconditionally use
> qemu_coroutine_get_aio_context. Is that correct ?
Yes. Only the fd handler function executes in ctx:
static void qio_channel_restart_read(void *opaque)
{
    QIOChannel *ioc = opaque;
    Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);

    if (!co) {
        return;
    }

    /* Assert that aio_co_wake() reenters the coroutine directly */
    assert(qemu_get_current_aio_context() ==
           qemu_coroutine_get_aio_context(co));
    aio_co_wake(co);
    ^^^^^^^^^^^^^^^
}
aio_co_wake() will wake up read_coroutine in the AioContext where it was
previously running. The coroutine is never moved to the AioContext
chosen in qio_channel_yield().
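This point can be modeled outside QEMU. The fd handler only claims the waiting coroutine and requests a wakeup, and that wakeup targets the coroutine's own context. In the sketch below, `toy_restart_read()` follows the shape of qio_channel_restart_read(), using C11 `atomic_exchange()` in place of `qatomic_xchg()`. `toy_co_wake()` is a hypothetical stand-in for aio_co_wake() that only records which context the coroutine would resume in. `handler_ctx` is passed only to show that it does not affect the outcome.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Dummy stand-ins; only pointer identity and counters matter. */
typedef struct AioContext { int id; } AioContext;

typedef struct Coroutine {
    AioContext *home_ctx; /* context the coroutine was running in */
    int wakeups;          /* how many times it was woken */
    AioContext *woken_in; /* context the wakeup targeted */
} Coroutine;

typedef struct {
    _Atomic(Coroutine *) read_coroutine;
} ToyChannel;

/* Stand-in for aio_co_wake(): resume co in its own home context. */
static void toy_co_wake(Coroutine *co)
{
    co->wakeups++;
    co->woken_in = co->home_ctx;
}

/* Same shape as qio_channel_restart_read(): claim, then wake. */
static void toy_restart_read(ToyChannel *ioc, AioContext *handler_ctx)
{
    /* Only one caller can take the pointer; later callers see NULL. */
    Coroutine *co = atomic_exchange(&ioc->read_coroutine, (Coroutine *)NULL);

    (void)handler_ctx; /* the handler's context does not pick the target */
    if (!co) {
        return;
    }
    toy_co_wake(co);
}
```

A second invocation finds the pointer already cleared, so the coroutine is woken exactly once. In both invocations the wakeup goes to `home_ctx`, never to `handler_ctx`.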
Stefan
* Re: [PATCH 2/2] io: follow coroutine AioContext in qio_channel_yield()
2023-08-24 16:09 ` Fabiano Rosas
@ 2023-08-24 17:29 ` Stefan Hajnoczi
0 siblings, 0 replies; 11+ messages in thread
From: Stefan Hajnoczi @ 2023-08-24 17:29 UTC (permalink / raw)
To: Fabiano Rosas
Cc: qemu-devel, Eric Blake, kwolf, Fam Zheng, Juan Quintela,
Vladimir Sementsov-Ogievskiy, Daniel Berrange, Hanna Reitz,
Paolo Bonzini, qemu-block, Leonardo Bras, Coiby Xu, Peter Xu
On Thu, Aug 24, 2023 at 01:09:59PM -0300, Fabiano Rosas wrote:
> Stefan Hajnoczi <stefanha@redhat.com> writes:
>
> > @@ -2089,10 +2088,6 @@ static void nbd_attach_aio_context(BlockDriverState *bs,
> > * the reconnect_delay_timer cannot be active here.
> > */
> > assert(!s->reconnect_delay_timer);
> > -
> > - if (s->ioc) {
> > - qio_channel_attach_aio_context(s->ioc, new_context);
> > - }
> > }
> >
> > static void nbd_detach_aio_context(BlockDriverState *bs)
> > @@ -2101,10 +2096,6 @@ static void nbd_detach_aio_context(BlockDriverState *bs)
> >
> > assert(!s->open_timer);
> > assert(!s->reconnect_delay_timer);
> > -
> > - if (s->ioc) {
> > - qio_channel_detach_aio_context(s->ioc);
> > - }
> > }
>
> The whole attach/detach functions could go away.
Yes, but at the expense of losing the assertions. Some of them have
extensive comments and I guess they are important to someone, so I
didn't drop them.
>
> >
> > static BlockDriver bdrv_nbd = {
> > diff --git a/io/channel-command.c b/io/channel-command.c
> > index 7ed726c802..1f61026222 100644
> > --- a/io/channel-command.c
> > +++ b/io/channel-command.c
> > @@ -331,14 +331,21 @@ static int qio_channel_command_close(QIOChannel *ioc,
> >
> >
> > static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
> > - AioContext *ctx,
> > + AioContext *read_ctx,
> > IOHandler *io_read,
> > + AioContext *write_ctx,
> > IOHandler *io_write,
> > void *opaque)
> > {
> > QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
> > - aio_set_fd_handler(ctx, cioc->readfd, io_read, NULL, NULL, NULL, opaque);
> > - aio_set_fd_handler(ctx, cioc->writefd, NULL, io_write, NULL, NULL, opaque);
> > + if (read_ctx) {
> > + aio_set_fd_handler(read_ctx, cioc->readfd, io_read, NULL,
> > + NULL, NULL, opaque);
> > + }
> > + if (write_ctx) {
> > + aio_set_fd_handler(write_ctx, cioc->writefd, NULL, io_write,
> > + NULL, NULL, opaque);
> > + }
> > }
> >
> >
>
> ...
>
> > diff --git a/nbd/client.c b/nbd/client.c
> > index 479208d5d9..81877d088d 100644
> > --- a/nbd/client.c
> > +++ b/nbd/client.c
> > @@ -948,7 +948,7 @@ static int nbd_start_negotiate(AioContext *aio_context, QIOChannel *ioc,
> > ioc = *outioc;
> > if (aio_context) {
> > qio_channel_set_blocking(ioc, false, NULL);
> > - qio_channel_attach_aio_context(ioc, aio_context);
> > + qio_channel_set_follow_coroutine_ctx(ioc, true);
>
> This is actually unreachable, aio_context is always NULL here.
Interesting, I will add a patch to remove the dead code. Thanks!
* Re: [PATCH 1/2] io: check there are no qio_channel_yield() coroutines during ->finalize()
2023-08-23 23:45 ` [PATCH 1/2] io: check there are no qio_channel_yield() coroutines during ->finalize() Stefan Hajnoczi
2023-08-24 11:01 ` Daniel P. Berrangé
@ 2023-08-24 18:18 ` Eric Blake
1 sibling, 0 replies; 11+ messages in thread
From: Eric Blake @ 2023-08-24 18:18 UTC (permalink / raw)
To: Stefan Hajnoczi
Cc: qemu-devel, kwolf, Fam Zheng, Juan Quintela,
Vladimir Sementsov-Ogievskiy, Daniel Berrange, Hanna Reitz,
Paolo Bonzini, qemu-block, Leonardo Bras, Coiby Xu, Peter Xu
On Wed, Aug 23, 2023 at 07:45:03PM -0400, Stefan Hajnoczi wrote:
> Callers must clean up their coroutines before calling
> object_unref(OBJECT(ioc)) to prevent an fd handler leak. Add an
> assertion to check this.
>
> This patch is preparation for the fd handler changes that follow.
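The contract can be sketched as follows. The four-line change itself is not quoted in this message, so `ToyChannel` and `toy_channel_finalize()` are hypothetical. The sketch assumes only what the description states: no coroutine may still be waiting in qio_channel_yield() when the channel is finalized, because its fd handler would otherwise remain registered.

```c
#include <assert.h>
#include <stddef.h>

typedef struct Coroutine { int id; } Coroutine;

/* Only the fields relevant to the finalize-time check. */
typedef struct {
    Coroutine *read_coroutine;  /* set while a reader waits in qio_channel_yield() */
    Coroutine *write_coroutine; /* set while a writer waits in qio_channel_yield() */
    int finalized;
} ToyChannel;

/*
 * Callers must let waiting coroutines finish before the last unref;
 * otherwise their fd handlers would outlive the channel.
 */
static void toy_channel_finalize(ToyChannel *ioc)
{
    assert(ioc->read_coroutine == NULL);
    assert(ioc->write_coroutine == NULL);
    ioc->finalized = 1;
}
```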
>
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
> io/channel.c | 4 ++++
> 1 file changed, 4 insertions(+)
Reviewed-by: Eric Blake <eblake@redhat.com>
--
Eric Blake, Principal Software Engineer
Red Hat, Inc.
Virtualization: qemu.org | libguestfs.org
* Re: [PATCH 2/2] io: follow coroutine AioContext in qio_channel_yield()
2023-08-24 11:26 ` Daniel P. Berrangé
2023-08-24 17:07 ` Stefan Hajnoczi
@ 2023-08-24 18:26 ` Stefan Hajnoczi
2023-08-25 8:09 ` Daniel P. Berrangé
1 sibling, 1 reply; 11+ messages in thread
From: Stefan Hajnoczi @ 2023-08-24 18:26 UTC (permalink / raw)
To: Daniel P. Berrangé
Cc: qemu-devel, Eric Blake, kwolf, Fam Zheng, Juan Quintela,
Vladimir Sementsov-Ogievskiy, Hanna Reitz, Paolo Bonzini,
qemu-block, Leonardo Bras, Coiby Xu, Peter Xu
On Thu, Aug 24, 2023 at 12:26:05PM +0100, Daniel P. Berrangé wrote:
> On Wed, Aug 23, 2023 at 07:45:04PM -0400, Stefan Hajnoczi wrote:
> > The ongoing QEMU multi-queue block layer effort makes it possible for multiple
> > threads to process I/O in parallel. The nbd block driver is not compatible with
> > the multi-queue block layer yet because QIOChannel cannot be used easily from
> > coroutines running in multiple threads. This series changes the QIOChannel API
> > to make that possible.
> >
> > In the current API, calling qio_channel_attach_aio_context() sets the
> > AioContext where qio_channel_yield() installs an fd handler prior to yielding:
> >
> > qio_channel_attach_aio_context(ioc, my_ctx);
> > ...
> > qio_channel_yield(ioc); // my_ctx is used here
> > ...
> > qio_channel_detach_aio_context(ioc);
> >
> > This API design has limitations: reading and writing must be done in the same
> > AioContext and moving between AioContexts involves a cumbersome sequence of API
> > calls that is not suitable for doing on a per-request basis.
> >
> > There is no fundamental reason why a QIOChannel needs to run within the
> > same AioContext every time qio_channel_yield() is called. QIOChannel
> > only uses the AioContext while inside qio_channel_yield(). The rest of
> > the time, QIOChannel is independent of any AioContext.
> >
> > In the new API, qio_channel_yield() queries the AioContext from the current
> > coroutine using qemu_coroutine_get_aio_context(). There is no need to
> > explicitly attach/detach AioContexts anymore and
> > qio_channel_attach_aio_context() and qio_channel_detach_aio_context() are gone.
> > One coroutine can read from the QIOChannel while another coroutine writes from
> > a different AioContext.
> >
> > This API change allows the nbd block driver to use QIOChannel from any thread.
> > It's important to keep in mind that the block driver already synchronizes
> > QIOChannel access and ensures that two coroutines never read simultaneously or
> > write simultaneously.
> >
> > This patch updates all users of qio_channel_attach_aio_context() to the
> > new API. Most conversions are simple, but vhost-user-server requires a
> > new qemu_coroutine_yield() call to quiesce the vu_client_trip()
> > coroutine when not attached to any AioContext.
> >
> > While the API has become simpler, there is one wart: QIOChannel has a
> > special case for the iohandler AioContext (used for handlers that must not run
> > in nested event loops). I didn't find an elegant way to preserve that behavior,
> > so I added a new API called qio_channel_set_follow_coroutine_ctx(ioc, true|false)
> > for opting in to the new AioContext model. By default QIOChannel uses the
> > iohandler AioContext. Code that formerly called
> > qio_channel_attach_aio_context() now calls
> > qio_channel_set_follow_coroutine_ctx(ioc, true) once after the QIOChannel is
> > created.
>
> I wonder if it is better to just pass the AioContext object into
> qio_channel_yield explicitly eg have
>
> qio_channel_yield(QIOChannel *ioc,
> AioContext *ctx,
> GIOCondition cond);
>
> With semantics that if 'ctx == NULL', then we assume the default
> global iohandler context, and for non-default context it must
> be non-NULL ?
>
> That could nicely de-couple the API from relying on global
> coroutine/thread state for querying an AioContext, which
> makes it easier to reason about IMHO.
Hi Dan,
I've done most of the audit necessary to understand which AioContext is
used where. The call graph is large because qio_channel_yield() is used
internally by qio_channel_readv_full_all_eof(),
qio_channel_writev_full_all(), and their variants. They would all need
a new AioContext argument.
I think it's not worth passing AioContext explicitly everywhere since
this involves a lot of code changes and more verbosity to achieve what
we already have.
However, if you think the QIOChannel API should pass AioContext
explicitly then I'll go ahead and make the changes.
Here is what I've explored so far:
qio_channel_readv_full_all_eof
mpqemu_read - should be doable
qio_channel_readv_all_eof
qio_channel_read_all_eof
multifd_recv_thread - NULL non-coroutine
vu_message_read - coroutine AioContext
qio_channel_readv_full_all
hw/virtio/vhost-user.c:backend_read() - NULL non-coroutine
qio_channel_readv_all
nbd_co_receive_offset_data_payload - coroutine AioContext
nbd_co_do_receive_one_chunk - coroutine AioContext
qio_channel_read_all
hw/virtio/vhost-user.c:backend_read() - NULL non-coroutine
tpm_emulator_unix_tx_bufs - NULL non-coroutine
nbd_read - ?
zlib_recv_pages - NULL non-coroutine
zstd_recv_pages - NULL non-coroutine
multifd_initial_recv_packet - NULL non-coroutine
nbd_opt_read - iohandler
pr_manager_helper_read - NULL non-coroutine
prh_read_request - coroutine AioContext
prh_co_entry - coroutine AioContext
char_socket_ping_pong - NULL non-coroutine
nocomp_recv_pages - NULL non-coroutine
test_io_thread_reader - NULL non-coroutine
qio_channel_writev_full_all
mpqemu_msg_send - should be doable
qio_channel_writev_all
nbd_co_send_request - coroutine AioContext
hw/virtio/vhost-user.c:backend_read() - NULL non-coroutine
qio_channel_write_all
tpm_emulator_unix_tx_bufs - NULL non-coroutine
multifd_send_initial_packet - NULL non-coroutine
multifd_send_thread - NULL non-coroutine
nbd_write - ?
prh_write_response - coroutine AioContext
prh_co_entry - coroutine AioContext
char_socket_ping_pong - NULL non-coroutine
ppm_save - iohandler
qemu_fflush - ?
nbd_negotiate_send_meta_context - iohandler
nbd/server.c:nbd_co_send_iov - coroutine AioContext
test_io_thread_writer - NULL non-coroutine
multifd_send_thread - NULL non-coroutine
qemu_fill_buffer - ?
What do you think?
Stefan
* Re: [PATCH 2/2] io: follow coroutine AioContext in qio_channel_yield()
2023-08-24 18:26 ` Stefan Hajnoczi
@ 2023-08-25 8:09 ` Daniel P. Berrangé
0 siblings, 0 replies; 11+ messages in thread
From: Daniel P. Berrangé @ 2023-08-25 8:09 UTC (permalink / raw)
To: Stefan Hajnoczi
Cc: qemu-devel, Eric Blake, kwolf, Fam Zheng, Juan Quintela,
Vladimir Sementsov-Ogievskiy, Hanna Reitz, Paolo Bonzini,
qemu-block, Leonardo Bras, Coiby Xu, Peter Xu
On Thu, Aug 24, 2023 at 02:26:42PM -0400, Stefan Hajnoczi wrote:
> I've done most of the audit necessary to understand which AioContext is
> used where. The call graph is large because qio_channel_yield() is used
> internally by qio_channel_readv_full_all_eof(),
> qio_channel_writev_full_all(), and their variants. They would all need
> a new AioContext argument.
Argh, I forgot about that usage.
> I think it's not worth passing AioContext explicitly everywhere since
> this involves a lot of code changes and more verbosity to achieve what
> we already have.
Yeah, I agree with you.
>
> However, if you think the QIOChannel API should pass AioContext
> explicitly then I'll go ahead and make the changes.
>
> Here is what I've explored so far:
>
> qio_channel_readv_full_all_eof
> mpqemu_read - should be doable
> qio_channel_readv_all_eof
> qio_channel_read_all_eof
> multifd_recv_thread - NULL non-coroutine
> vu_message_read - coroutine AioContext
> qio_channel_readv_full_all
> hw/virtio/vhost-user.c:backend_read() - NULL non-coroutine
> qio_channel_readv_all
> nbd_co_receive_offset_data_payload - coroutine AioContext
> nbd_co_do_receive_one_chunk - coroutine AioContext
> qio_channel_read_all
> hw/virtio/vhost-user.c:backend_read() - NULL non-coroutine
> tpm_emulator_unix_tx_bufs - NULL non-coroutine
> nbd_read - ?
> zlib_recv_pages - NULL non-coroutine
> zstd_recv_pages - NULL non-coroutine
> multifd_initial_recv_packet - NULL non-coroutine
> nbd_opt_read - iohandler
> pr_manager_helper_read - NULL non-coroutine
> prh_read_request - coroutine AioContext
> prh_co_entry - coroutine AioContext
> char_socket_ping_pong - NULL non-coroutine
> nocomp_recv_pages - NULL non-coroutine
> test_io_thread_reader - NULL non-coroutine
> qio_channel_writev_full_all
> mpqemu_msg_send - should be doable
> qio_channel_writev_all
> nbd_co_send_request - coroutine AioContext
> hw/virtio/vhost-user.c:backend_read() - NULL non-coroutine
> qio_channel_write_all
> tpm_emulator_unix_tx_bufs - NULL non-coroutine
> multifd_send_initial_packet - NULL non-coroutine
> multifd_send_thread - NULL non-coroutine
> nbd_write - ?
> prh_write_response - coroutine AioContext
> prh_co_entry - coroutine AioContext
> char_socket_ping_pong - NULL non-coroutine
> ppm_save - iohandler
> qemu_fflush - ?
> nbd_negotiate_send_meta_context - iohandler
> nbd/server.c:nbd_co_send_iov - coroutine AioContext
> test_io_thread_writer - NULL non-coroutine
> multifd_send_thread - NULL non-coroutine
> qemu_fill_buffer - ?
>
> What do you think?
>
> Stefan
With regards,
Daniel
--
|: https://berrange.com -o- https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org -o- https://fstop138.berrange.com :|
|: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|
Thread overview: 11+ messages
2023-08-23 23:45 [PATCH 0/2] io: follow coroutine AioContext in qio_channel_yield() Stefan Hajnoczi
2023-08-23 23:45 ` [PATCH 1/2] io: check there are no qio_channel_yield() coroutines during ->finalize() Stefan Hajnoczi
2023-08-24 11:01 ` Daniel P. Berrangé
2023-08-24 18:18 ` Eric Blake
2023-08-23 23:45 ` [PATCH 2/2] io: follow coroutine AioContext in qio_channel_yield() Stefan Hajnoczi
2023-08-24 11:26 ` Daniel P. Berrangé
2023-08-24 17:07 ` Stefan Hajnoczi
2023-08-24 18:26 ` Stefan Hajnoczi
2023-08-25 8:09 ` Daniel P. Berrangé
2023-08-24 16:09 ` Fabiano Rosas
2023-08-24 17:29 ` Stefan Hajnoczi