* [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups
@ 2024-02-02 10:28 peterx
2024-02-02 10:28 ` [PATCH v2 01/23] migration/multifd: Drop stale comment for multifd zero copy peterx
` (23 more replies)
0 siblings, 24 replies; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
v1: https://lore.kernel.org/r/20240131103111.306523-1-peterx@redhat.com
This v2 patchset contains quite a few refactorings to current multifd:
1) Redefines send_prepare() interface, to be:
p->pages -----------> send_prepare() -------------> IOVs
A major goal of it is to get prepared for others to add quite a few new
hardware accelerators for multifd compression, or adding zeropage
detection accelerators.
It turns out we don't yet need more hooks to achieve that, so hopefully
most new accelerator codes do not need a lot of changes if rebase onto
this (please still wait until this series fully reviewed and merged;
hopefully this should happen soon). Please note that p->normal[] is
dropped; now please use p->pages->offset[] for the same purpose.
Please check the code for details.
We may want a separate OPs for file later, which I'll leave that
decision to Fabiano.
This also prepares any possibility of replacing p->pages to raw
buffers (VFIO usage). But that's left for later too. Logically with
this patchset applied, it should be much easier.
2) [new in v2] Fixed one more race usage of MultiFDSendParams.packet_num,
as reported by Fabiano during his review on v1. This is mostly done in
the single patch:
3) [new in v2] Made multifd sender side lockless, by dropping the mutex,
as suggested by Fabiano in his review in v1. This is mostly done in a
single patch:
4) A lot of cleanups to multifd code, it picked up some patches from an
old series of mine [0] (the last patches were dropped, though; I did
the cleanup slightly differently), and added a bunch of other cleanups
either I got to see when doing that, or requested when Fabiano reviewed
v1.
Note: when I worked on this I even found more things to cleanup. But I
decided to stop at only what mostly requested from Fabiano in v1 to
stop growing this series.
1) above is mostly done in:
migration/multifd: Move header prepare/fill into send_prepare()
2) above is mostly done in:
migration/multifd: Fix MultiFDSendParams.packet_num race
3) above is mostly done in:
migration/multifd: Optimize sender side to be lockless
The rest patches all fall into 4) category. Please have a look, thanks.
Avihai/Fabiano, I hope my understanding is right that we can still consider
this series before a separate patchset to fix the dangling thread issue.
If not, please shoot.
v2 changelog:
- When spurious wakeup happens for multifd sender thread, crash hard rather
than causing a deadlock later.
- Always use atomic operations on pending_job / pending_sync
- Moved the setup of zerocopy write_flags from multifd_save_setup() into
no-comp setup() phase.
- Added below patches, some form of request here and there from Fabiano:
migration/multifd: Split multifd_send_terminate_threads()
migration/multifd: Change retval of multifd_queue_page()
migration/multifd: Change retval of multifd_send_pages()
migration/multifd: Rewrite multifd_queue_page()
migration/multifd: Cleanup multifd_save_cleanup()
migration/multifd: Cleanup multifd_load_cleanup()
migration/multifd: Stick with send/recv on function names
migration/multifd: Fix MultiFDSendParams.packet_num race
migration/multifd: Optimize sender side to be lockless
[0] https://lore.kernel.org/r/20231022201211.452861-1-peterx@redhat.com
[1] https://lore.kernel.org/qemu-devel/20240126221943.26628-1-farosas@suse.de
Peter Xu (23):
migration/multifd: Drop stale comment for multifd zero copy
migration/multifd: multifd_send_kick_main()
migration/multifd: Drop MultiFDSendParams.quit, cleanup error paths
migration/multifd: Postpone reset of MultiFDPages_t
migration/multifd: Drop MultiFDSendParams.normal[] array
migration/multifd: Separate SYNC request with normal jobs
migration/multifd: Simplify locking in sender thread
migration/multifd: Drop pages->num check in sender thread
migration/multifd: Rename p->num_packets and clean it up
migration/multifd: Move total_normal_pages accounting
migration/multifd: Move trace_multifd_send|recv()
migration/multifd: multifd_send_prepare_header()
migration/multifd: Move header prepare/fill into send_prepare()
migration/multifd: Forbid spurious wakeups
migration/multifd: Split multifd_send_terminate_threads()
migration/multifd: Change retval of multifd_queue_page()
migration/multifd: Change retval of multifd_send_pages()
migration/multifd: Rewrite multifd_queue_page()
migration/multifd: Cleanup multifd_save_cleanup()
migration/multifd: Cleanup multifd_load_cleanup()
migration/multifd: Stick with send/recv on function names
migration/multifd: Fix MultiFDSendParams.packet_num race
migration/multifd: Optimize sender side to be lockless
migration/multifd.h | 50 ++--
migration/migration.c | 12 +-
migration/multifd-zlib.c | 11 +-
migration/multifd-zstd.c | 11 +-
migration/multifd.c | 630 ++++++++++++++++++++++-----------------
migration/ram.c | 2 +-
migration/trace-events | 2 +-
7 files changed, 404 insertions(+), 314 deletions(-)
--
2.43.0
^ permalink raw reply [flat|nested] 49+ messages in thread
* [PATCH v2 01/23] migration/multifd: Drop stale comment for multifd zero copy
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 10:28 ` [PATCH v2 02/23] migration/multifd: multifd_send_kick_main() peterx
` (22 subsequent siblings)
23 siblings, 0 replies; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
We've already done that with multifd_flush_after_each_section, for multifd
in general. Drop the stale "TODO-like" comment.
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.c | 11 -----------
1 file changed, 11 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c
index 25cbc6dc6b..eee2586770 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -598,17 +598,6 @@ int multifd_send_sync_main(void)
}
}
- /*
- * When using zero-copy, it's necessary to flush the pages before any of
- * the pages can be sent again, so we'll make sure the new version of the
- * pages will always arrive _later_ than the old pages.
- *
- * Currently we achieve this by flushing the zero-page requested writes
- * per ram iteration, but in the future we could potentially optimize it
- * to be less frequent, e.g. only after we finished one whole scanning of
- * all the dirty bitmaps.
- */
-
flush_zero_copy = migrate_zero_copy_send();
for (i = 0; i < migrate_multifd_channels(); i++) {
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 02/23] migration/multifd: multifd_send_kick_main()
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
2024-02-02 10:28 ` [PATCH v2 01/23] migration/multifd: Drop stale comment for multifd zero copy peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 10:28 ` [PATCH v2 03/23] migration/multifd: Drop MultiFDSendParams.quit, cleanup error paths peterx
` (21 subsequent siblings)
23 siblings, 0 replies; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
When a multifd sender thread hit errors, it always needs to kick the main
thread by kicking all the semaphores that it can be waiting upon.
Provide a helper for it and deduplicate the code.
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.c | 21 +++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c
index eee2586770..b8d2c96533 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -372,6 +372,18 @@ struct {
MultiFDMethods *ops;
} *multifd_send_state;
+/*
+ * The migration thread can wait on either of the two semaphores. This
+ * function can be used to kick the main thread out of waiting on either of
+ * them. Should mostly only be called when something wrong happened with
+ * the current multifd send thread.
+ */
+static void multifd_send_kick_main(MultiFDSendParams *p)
+{
+ qemu_sem_post(&p->sem_sync);
+ qemu_sem_post(&multifd_send_state->channels_ready);
+}
+
/*
* How we use multifd_send_state->pages and channel->pages?
*
@@ -739,8 +751,7 @@ out:
assert(local_err);
trace_multifd_send_error(p->id);
multifd_send_terminate_threads(local_err);
- qemu_sem_post(&p->sem_sync);
- qemu_sem_post(&multifd_send_state->channels_ready);
+ multifd_send_kick_main(p);
error_free(local_err);
}
@@ -781,8 +792,7 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
* is not created, and then tell who pay attention to me.
*/
p->quit = true;
- qemu_sem_post(&multifd_send_state->channels_ready);
- qemu_sem_post(&p->sem_sync);
+ multifd_send_kick_main(p);
error_free(err);
}
@@ -852,8 +862,7 @@ static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
{
migrate_set_error(migrate_get_current(), err);
/* Error happen, we need to tell who pay attention to me */
- qemu_sem_post(&multifd_send_state->channels_ready);
- qemu_sem_post(&p->sem_sync);
+ multifd_send_kick_main(p);
/*
* Although multifd_send_thread is not created, but main migration
* thread need to judge whether it is running, so we need to mark
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 03/23] migration/multifd: Drop MultiFDSendParams.quit, cleanup error paths
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
2024-02-02 10:28 ` [PATCH v2 01/23] migration/multifd: Drop stale comment for multifd zero copy peterx
2024-02-02 10:28 ` [PATCH v2 02/23] migration/multifd: multifd_send_kick_main() peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 19:15 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 04/23] migration/multifd: Postpone reset of MultiFDPages_t peterx
` (20 subsequent siblings)
23 siblings, 1 reply; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
Multifd send side has two fields to indicate error quits:
- MultiFDSendParams.quit
- &multifd_send_state->exiting
Merge them into the global one. The replacement is done by changing all
p->quit checks into the global var check. The global check doesn't need
any lock.
A few more things done on top of this altogether:
- multifd_send_terminate_threads()
Moving the xchg() of &multifd_send_state->exiting upper, so as to cover
the tracepoint, migrate_set_error() and migrate_set_state().
- multifd_send_sync_main()
In the 2nd loop, add one more check over the global var to make sure we
don't keep the looping if QEMU already decided to quit.
- multifd_tls_outgoing_handshake()
Use multifd_send_terminate_threads() to set the error state. That has
a benefit of updating MigrationState.error to that error too, so we can
persist that 1st error we hit in that specific channel.
- multifd_new_send_channel_async()
Take similar approach like above, drop the migrate_set_error() because
multifd_send_terminate_threads() already covers that. Unwrap the helper
multifd_new_send_channel_cleanup() along the way; not really needed.
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.h | 2 --
migration/multifd.c | 85 ++++++++++++++++++---------------------------
2 files changed, 33 insertions(+), 54 deletions(-)
diff --git a/migration/multifd.h b/migration/multifd.h
index 35d11f103c..7c040cb85a 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -95,8 +95,6 @@ typedef struct {
QemuMutex mutex;
/* is this channel thread running */
bool running;
- /* should this thread finish */
- bool quit;
/* multifd flags for each packet */
uint32_t flags;
/* global number of generated multifd packets */
diff --git a/migration/multifd.c b/migration/multifd.c
index b8d2c96533..2c98023d67 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -372,6 +372,11 @@ struct {
MultiFDMethods *ops;
} *multifd_send_state;
+static bool multifd_send_should_exit(void)
+{
+ return qatomic_read(&multifd_send_state->exiting);
+}
+
/*
* The migration thread can wait on either of the two semaphores. This
* function can be used to kick the main thread out of waiting on either of
@@ -409,7 +414,7 @@ static int multifd_send_pages(void)
MultiFDSendParams *p = NULL; /* make happy gcc */
MultiFDPages_t *pages = multifd_send_state->pages;
- if (qatomic_read(&multifd_send_state->exiting)) {
+ if (multifd_send_should_exit()) {
return -1;
}
@@ -421,14 +426,11 @@ static int multifd_send_pages(void)
*/
next_channel %= migrate_multifd_channels();
for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
- p = &multifd_send_state->params[i];
-
- qemu_mutex_lock(&p->mutex);
- if (p->quit) {
- error_report("%s: channel %d has already quit!", __func__, i);
- qemu_mutex_unlock(&p->mutex);
+ if (multifd_send_should_exit()) {
return -1;
}
+ p = &multifd_send_state->params[i];
+ qemu_mutex_lock(&p->mutex);
if (!p->pending_job) {
p->pending_job++;
next_channel = (i + 1) % migrate_multifd_channels();
@@ -483,6 +485,16 @@ static void multifd_send_terminate_threads(Error *err)
{
int i;
+ /*
+ * We don't want to exit each threads twice. Depending on where
+ * we get the error, or if there are two independent errors in two
+ * threads at the same time, we can end calling this function
+ * twice.
+ */
+ if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
+ return;
+ }
+
trace_multifd_send_terminate_threads(err != NULL);
if (err) {
@@ -497,26 +509,13 @@ static void multifd_send_terminate_threads(Error *err)
}
}
- /*
- * We don't want to exit each threads twice. Depending on where
- * we get the error, or if there are two independent errors in two
- * threads at the same time, we can end calling this function
- * twice.
- */
- if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
- return;
- }
-
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
- qemu_mutex_lock(&p->mutex);
- p->quit = true;
qemu_sem_post(&p->sem);
if (p->c) {
qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}
- qemu_mutex_unlock(&p->mutex);
}
}
@@ -615,16 +614,13 @@ int multifd_send_sync_main(void)
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
- trace_multifd_send_sync_main_signal(p->id);
-
- qemu_mutex_lock(&p->mutex);
-
- if (p->quit) {
- error_report("%s: channel %d has already quit", __func__, i);
- qemu_mutex_unlock(&p->mutex);
+ if (multifd_send_should_exit()) {
return -1;
}
+ trace_multifd_send_sync_main_signal(p->id);
+
+ qemu_mutex_lock(&p->mutex);
p->packet_num = multifd_send_state->packet_num++;
p->flags |= MULTIFD_FLAG_SYNC;
p->pending_job++;
@@ -634,6 +630,10 @@ int multifd_send_sync_main(void)
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
+ if (multifd_send_should_exit()) {
+ return -1;
+ }
+
qemu_sem_wait(&multifd_send_state->channels_ready);
trace_multifd_send_sync_main_wait(p->id);
qemu_sem_wait(&p->sem_sync);
@@ -671,7 +671,7 @@ static void *multifd_send_thread(void *opaque)
qemu_sem_post(&multifd_send_state->channels_ready);
qemu_sem_wait(&p->sem);
- if (qatomic_read(&multifd_send_state->exiting)) {
+ if (multifd_send_should_exit()) {
break;
}
qemu_mutex_lock(&p->mutex);
@@ -786,12 +786,7 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
- migrate_set_error(migrate_get_current(), err);
- /*
- * Error happen, mark multifd_send_thread status as 'quit' although it
- * is not created, and then tell who pay attention to me.
- */
- p->quit = true;
+ multifd_send_terminate_threads(err);
multifd_send_kick_main(p);
error_free(err);
}
@@ -857,22 +852,6 @@ static bool multifd_channel_connect(MultiFDSendParams *p,
return true;
}
-static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
- QIOChannel *ioc, Error *err)
-{
- migrate_set_error(migrate_get_current(), err);
- /* Error happen, we need to tell who pay attention to me */
- multifd_send_kick_main(p);
- /*
- * Although multifd_send_thread is not created, but main migration
- * thread need to judge whether it is running, so we need to mark
- * its status.
- */
- p->quit = true;
- object_unref(OBJECT(ioc));
- error_free(err);
-}
-
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
MultiFDSendParams *p = opaque;
@@ -889,7 +868,10 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
}
trace_multifd_new_send_channel_async_error(p->id, local_err);
- multifd_new_send_channel_cleanup(p, ioc, local_err);
+ multifd_send_terminate_threads(local_err);
+ multifd_send_kick_main(p);
+ object_unref(OBJECT(ioc));
+ error_free(local_err);
}
static void multifd_new_send_channel_create(gpointer opaque)
@@ -921,7 +903,6 @@ int multifd_save_setup(Error **errp)
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
qemu_sem_init(&p->sem_sync, 0);
- p->quit = false;
p->pending_job = 0;
p->id = i;
p->pages = multifd_pages_init(page_count);
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 04/23] migration/multifd: Postpone reset of MultiFDPages_t
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (2 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 03/23] migration/multifd: Drop MultiFDSendParams.quit, cleanup error paths peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 10:28 ` [PATCH v2 05/23] migration/multifd: Drop MultiFDSendParams.normal[] array peterx
` (19 subsequent siblings)
23 siblings, 0 replies; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
Now we reset MultiFDPages_t object in the multifd sender thread in the
middle of the sending job. That's not necessary, because the "*pages"
struct will not be reused anyway until pending_job is cleared.
Move that to the end after the job is completed, provide a helper to reset
a "*pages" object. Use that same helper when free the object too.
This prepares us to keep using p->pages in the follow up patches, where we
may drop p->normal[].
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.c | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c
index 2c98023d67..5633ac245a 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -172,6 +172,17 @@ void multifd_register_ops(int method, MultiFDMethods *ops)
multifd_ops[method] = ops;
}
+/* Reset a MultiFDPages_t* object for the next use */
+static void multifd_pages_reset(MultiFDPages_t *pages)
+{
+ /*
+ * We don't need to touch offset[] array, because it will be
+ * overwritten later when reused.
+ */
+ pages->num = 0;
+ pages->block = NULL;
+}
+
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
MultiFDInit_t msg = {};
@@ -248,9 +259,8 @@ static MultiFDPages_t *multifd_pages_init(uint32_t n)
static void multifd_pages_clear(MultiFDPages_t *pages)
{
- pages->num = 0;
+ multifd_pages_reset(pages);
pages->allocated = 0;
- pages->block = NULL;
g_free(pages->offset);
pages->offset = NULL;
g_free(pages);
@@ -704,8 +714,6 @@ static void *multifd_send_thread(void *opaque)
p->flags = 0;
p->num_packets++;
p->total_normal_pages += p->normal_num;
- p->pages->num = 0;
- p->pages->block = NULL;
qemu_mutex_unlock(&p->mutex);
trace_multifd_send(p->id, packet_num, p->normal_num, flags,
@@ -732,6 +740,8 @@ static void *multifd_send_thread(void *opaque)
stat64_add(&mig_stats.multifd_bytes,
p->next_packet_size + p->packet_len);
+
+ multifd_pages_reset(p->pages);
p->next_packet_size = 0;
qemu_mutex_lock(&p->mutex);
p->pending_job--;
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 05/23] migration/multifd: Drop MultiFDSendParams.normal[] array
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (3 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 04/23] migration/multifd: Postpone reset of MultiFDPages_t peterx
@ 2024-02-02 10:28 ` peterx
2024-02-09 0:06 ` [External] " Hao Xiang
2024-02-02 10:28 ` [PATCH v2 06/23] migration/multifd: Separate SYNC request with normal jobs peterx
` (18 subsequent siblings)
23 siblings, 1 reply; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
This array is redundant when p->pages exists. Now we extended the life of
p->pages to the whole period where pending_job is set, it should be safe to
always use p->pages->offset[] rather than p->normal[]. Drop the array.
Alongside, the normal_num is also redundant, which is the same to
p->pages->num.
This doesn't apply to recv side, because there's no extra buffering on recv
side, so p->normal[] array is still needed.
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.h | 4 ----
migration/multifd-zlib.c | 7 ++++---
migration/multifd-zstd.c | 7 ++++---
migration/multifd.c | 33 +++++++++++++--------------------
4 files changed, 21 insertions(+), 30 deletions(-)
diff --git a/migration/multifd.h b/migration/multifd.h
index 7c040cb85a..3920bdbcf1 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -122,10 +122,6 @@ typedef struct {
struct iovec *iov;
/* number of iovs used */
uint32_t iovs_num;
- /* Pages that are not zero */
- ram_addr_t *normal;
- /* num of non zero pages */
- uint32_t normal_num;
/* used for compression methods */
void *data;
} MultiFDSendParams;
diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
index 37ce48621e..100809abc1 100644
--- a/migration/multifd-zlib.c
+++ b/migration/multifd-zlib.c
@@ -116,17 +116,18 @@ static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
*/
static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
{
+ MultiFDPages_t *pages = p->pages;
struct zlib_data *z = p->data;
z_stream *zs = &z->zs;
uint32_t out_size = 0;
int ret;
uint32_t i;
- for (i = 0; i < p->normal_num; i++) {
+ for (i = 0; i < pages->num; i++) {
uint32_t available = z->zbuff_len - out_size;
int flush = Z_NO_FLUSH;
- if (i == p->normal_num - 1) {
+ if (i == pages->num - 1) {
flush = Z_SYNC_FLUSH;
}
@@ -135,7 +136,7 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
* with compression. zlib does not guarantee that this is safe,
* therefore copy the page before calling deflate().
*/
- memcpy(z->buf, p->pages->block->host + p->normal[i], p->page_size);
+ memcpy(z->buf, p->pages->block->host + pages->offset[i], p->page_size);
zs->avail_in = p->page_size;
zs->next_in = z->buf;
diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
index b471daadcd..2023edd8cc 100644
--- a/migration/multifd-zstd.c
+++ b/migration/multifd-zstd.c
@@ -113,6 +113,7 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
*/
static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
{
+ MultiFDPages_t *pages = p->pages;
struct zstd_data *z = p->data;
int ret;
uint32_t i;
@@ -121,13 +122,13 @@ static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
z->out.size = z->zbuff_len;
z->out.pos = 0;
- for (i = 0; i < p->normal_num; i++) {
+ for (i = 0; i < pages->num; i++) {
ZSTD_EndDirective flush = ZSTD_e_continue;
- if (i == p->normal_num - 1) {
+ if (i == pages->num - 1) {
flush = ZSTD_e_flush;
}
- z->in.src = p->pages->block->host + p->normal[i];
+ z->in.src = p->pages->block->host + pages->offset[i];
z->in.size = p->page_size;
z->in.pos = 0;
diff --git a/migration/multifd.c b/migration/multifd.c
index 5633ac245a..8bb1fd95cf 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -90,13 +90,13 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
MultiFDPages_t *pages = p->pages;
- for (int i = 0; i < p->normal_num; i++) {
- p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i];
+ for (int i = 0; i < pages->num; i++) {
+ p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
p->iov[p->iovs_num].iov_len = p->page_size;
p->iovs_num++;
}
- p->next_packet_size = p->normal_num * p->page_size;
+ p->next_packet_size = pages->num * p->page_size;
p->flags |= MULTIFD_FLAG_NOCOMP;
return 0;
}
@@ -269,21 +269,22 @@ static void multifd_pages_clear(MultiFDPages_t *pages)
static void multifd_send_fill_packet(MultiFDSendParams *p)
{
MultiFDPacket_t *packet = p->packet;
+ MultiFDPages_t *pages = p->pages;
int i;
packet->flags = cpu_to_be32(p->flags);
packet->pages_alloc = cpu_to_be32(p->pages->allocated);
- packet->normal_pages = cpu_to_be32(p->normal_num);
+ packet->normal_pages = cpu_to_be32(pages->num);
packet->next_packet_size = cpu_to_be32(p->next_packet_size);
packet->packet_num = cpu_to_be64(p->packet_num);
- if (p->pages->block) {
- strncpy(packet->ramblock, p->pages->block->idstr, 256);
+ if (pages->block) {
+ strncpy(packet->ramblock, pages->block->idstr, 256);
}
- for (i = 0; i < p->normal_num; i++) {
+ for (i = 0; i < pages->num; i++) {
/* there are architectures where ram_addr_t is 32 bit */
- uint64_t temp = p->normal[i];
+ uint64_t temp = pages->offset[i];
packet->offset[i] = cpu_to_be64(temp);
}
@@ -570,8 +571,6 @@ void multifd_save_cleanup(void)
p->packet = NULL;
g_free(p->iov);
p->iov = NULL;
- g_free(p->normal);
- p->normal = NULL;
multifd_send_state->ops->send_cleanup(p, &local_err);
if (local_err) {
migrate_set_error(migrate_get_current(), local_err);
@@ -688,8 +687,8 @@ static void *multifd_send_thread(void *opaque)
if (p->pending_job) {
uint64_t packet_num = p->packet_num;
+ MultiFDPages_t *pages = p->pages;
uint32_t flags;
- p->normal_num = 0;
if (use_zero_copy_send) {
p->iovs_num = 0;
@@ -697,12 +696,7 @@ static void *multifd_send_thread(void *opaque)
p->iovs_num = 1;
}
- for (int i = 0; i < p->pages->num; i++) {
- p->normal[p->normal_num] = p->pages->offset[i];
- p->normal_num++;
- }
-
- if (p->normal_num) {
+ if (pages->num) {
ret = multifd_send_state->ops->send_prepare(p, &local_err);
if (ret != 0) {
qemu_mutex_unlock(&p->mutex);
@@ -713,10 +707,10 @@ static void *multifd_send_thread(void *opaque)
flags = p->flags;
p->flags = 0;
p->num_packets++;
- p->total_normal_pages += p->normal_num;
+ p->total_normal_pages += pages->num;
qemu_mutex_unlock(&p->mutex);
- trace_multifd_send(p->id, packet_num, p->normal_num, flags,
+ trace_multifd_send(p->id, packet_num, pages->num, flags,
p->next_packet_size);
if (use_zero_copy_send) {
@@ -924,7 +918,6 @@ int multifd_save_setup(Error **errp)
p->name = g_strdup_printf("multifdsend_%d", i);
/* We need one extra place for the packet header */
p->iov = g_new0(struct iovec, page_count + 1);
- p->normal = g_new0(ram_addr_t, page_count);
p->page_size = qemu_target_page_size();
p->page_count = page_count;
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 06/23] migration/multifd: Separate SYNC request with normal jobs
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (4 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 05/23] migration/multifd: Drop MultiFDSendParams.normal[] array peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 19:21 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 07/23] migration/multifd: Simplify locking in sender thread peterx
` (17 subsequent siblings)
23 siblings, 1 reply; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
Multifd provide a threaded model for processing jobs. On sender side,
there can be two kinds of job: (1) a list of pages to send, or (2) a sync
request.
The sync request is a very special kind of job. It never contains a page
array, but only a multifd packet telling the dest side to synchronize with
sent pages.
Before this patch, both requests use the pending_job field, no matter what
the request is, it will boost pending_job, while multifd sender thread will
decrement it after it finishes one job.
However this should be racy, because SYNC is special in that it needs to
set p->flags with MULTIFD_FLAG_SYNC, showing that this is a sync request.
Consider a sequence of operations where:
- migration thread enqueue a job to send some pages, pending_job++ (0->1)
- [...before the selected multifd sender thread wakes up...]
- migration thread enqueue another job to sync, pending_job++ (1->2),
setup p->flags=MULTIFD_FLAG_SYNC
- multifd sender thread wakes up, found pending_job==2
- send the 1st packet with MULTIFD_FLAG_SYNC and list of pages
- send the 2nd packet with flags==0 and no pages
This is not expected, because MULTIFD_FLAG_SYNC should hopefully be done
after all the pages are received. Meanwhile, the 2nd packet will be
completely useless, which contains zero information.
I didn't verify above, but I think this issue is still benign in that at
least on the recv side we always receive pages before handling
MULTIFD_FLAG_SYNC. However that's not always guaranteed and just tricky.
One other reason I want to separate it is using p->flags to communicate
between the two threads is also not clearly defined, it's very hard to read
and understand why accessing p->flags is always safe; see the current impl
of multifd_send_thread() where we tried to cache only p->flags. It doesn't
need to be that complicated.
This patch introduces pending_sync, a separate flag just to show that the
requester needs a sync. Alongside, we remove the tricky caching of
p->flags now because after this patch p->flags should only be used by
multifd sender thread now, which will be crystal clear. So it is always
thread safe to access p->flags.
With that, we can also safely convert the pending_job into a boolean,
because we don't support >1 pending jobs anyway.
Always use atomic ops to access both flags to make sure no cache effect.
When at it, drop the initial setting of "pending_job = 0" because it's
always allocated using g_new0().
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.h | 13 +++++++++++--
migration/multifd.c | 39 +++++++++++++++++++++++++--------------
2 files changed, 36 insertions(+), 16 deletions(-)
diff --git a/migration/multifd.h b/migration/multifd.h
index 3920bdbcf1..08f26ef3fe 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -99,8 +99,17 @@ typedef struct {
uint32_t flags;
/* global number of generated multifd packets */
uint64_t packet_num;
- /* thread has work to do */
- int pending_job;
+ /*
+ * The sender thread has work to do if either of below boolean is set.
+ *
+ * @pending_job: a job is pending
+ * @pending_sync: a sync request is pending
+ *
+ * For both of these fields, they're only set by the requesters, and
+ * cleared by the multifd sender threads.
+ */
+ bool pending_job;
+ bool pending_sync;
/* array of pages to sent.
* The owner of 'pages' depends of 'pending_job' value:
* pending_job == 0 -> migration_thread can use it.
diff --git a/migration/multifd.c b/migration/multifd.c
index 8bb1fd95cf..ea25bbe6bd 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -442,8 +442,8 @@ static int multifd_send_pages(void)
}
p = &multifd_send_state->params[i];
qemu_mutex_lock(&p->mutex);
- if (!p->pending_job) {
- p->pending_job++;
+ if (qatomic_read(&p->pending_job) == false) {
+ qatomic_set(&p->pending_job, true);
next_channel = (i + 1) % migrate_multifd_channels();
break;
}
@@ -631,8 +631,12 @@ int multifd_send_sync_main(void)
qemu_mutex_lock(&p->mutex);
p->packet_num = multifd_send_state->packet_num++;
- p->flags |= MULTIFD_FLAG_SYNC;
- p->pending_job++;
+ /*
+ * We should be the only user so far, so not possible to be set by
+ * others concurrently.
+ */
+ assert(qatomic_read(&p->pending_sync) == false);
+ qatomic_set(&p->pending_sync, true);
qemu_mutex_unlock(&p->mutex);
qemu_sem_post(&p->sem);
}
@@ -685,10 +689,9 @@ static void *multifd_send_thread(void *opaque)
}
qemu_mutex_lock(&p->mutex);
- if (p->pending_job) {
+ if (qatomic_read(&p->pending_job)) {
uint64_t packet_num = p->packet_num;
MultiFDPages_t *pages = p->pages;
- uint32_t flags;
if (use_zero_copy_send) {
p->iovs_num = 0;
@@ -704,13 +707,11 @@ static void *multifd_send_thread(void *opaque)
}
}
multifd_send_fill_packet(p);
- flags = p->flags;
- p->flags = 0;
p->num_packets++;
p->total_normal_pages += pages->num;
qemu_mutex_unlock(&p->mutex);
- trace_multifd_send(p->id, packet_num, pages->num, flags,
+ trace_multifd_send(p->id, packet_num, pages->num, p->flags,
p->next_packet_size);
if (use_zero_copy_send) {
@@ -738,12 +739,23 @@ static void *multifd_send_thread(void *opaque)
multifd_pages_reset(p->pages);
p->next_packet_size = 0;
qemu_mutex_lock(&p->mutex);
- p->pending_job--;
+ qatomic_set(&p->pending_job, false);
qemu_mutex_unlock(&p->mutex);
-
- if (flags & MULTIFD_FLAG_SYNC) {
- qemu_sem_post(&p->sem_sync);
+ } else if (qatomic_read(&p->pending_sync)) {
+ p->flags = MULTIFD_FLAG_SYNC;
+ multifd_send_fill_packet(p);
+ ret = qio_channel_write_all(p->c, (void *)p->packet,
+ p->packet_len, &local_err);
+ if (ret != 0) {
+ qemu_mutex_unlock(&p->mutex);
+ break;
}
+ /* p->next_packet_size will always be zero for a SYNC packet */
+ stat64_add(&mig_stats.multifd_bytes, p->packet_len);
+ p->flags = 0;
+ qatomic_set(&p->pending_sync, false);
+ qemu_mutex_unlock(&p->mutex);
+ qemu_sem_post(&p->sem_sync);
} else {
qemu_mutex_unlock(&p->mutex);
/* sometimes there are spurious wakeups */
@@ -907,7 +919,6 @@ int multifd_save_setup(Error **errp)
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
qemu_sem_init(&p->sem_sync, 0);
- p->pending_job = 0;
p->id = i;
p->pages = multifd_pages_init(page_count);
p->packet_len = sizeof(MultiFDPacket_t)
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 07/23] migration/multifd: Simplify locking in sender thread
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (5 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 06/23] migration/multifd: Separate SYNC request with normal jobs peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 19:23 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 08/23] migration/multifd: Drop pages->num check " peterx
` (16 subsequent siblings)
23 siblings, 1 reply; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
The sender thread will yield the p->mutex before IO starts, trying to not
block the requester thread. This may be unnecessary lock optimizations,
because the requester can already read pending_job safely even without the
lock, because the requester is currently the only one who can assign a
task.
Drop that lock complication on both sides:
(1) in the sender thread, always take the mutex until job done
(2) in the requester thread, check pending_job clear lockless
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.c | 23 ++++++++++++++++-------
1 file changed, 16 insertions(+), 7 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c
index ea25bbe6bd..4d5a01ed93 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -429,7 +429,9 @@ static int multifd_send_pages(void)
return -1;
}
+ /* We wait here, until at least one channel is ready */
qemu_sem_wait(&multifd_send_state->channels_ready);
+
/*
* next_channel can remain from a previous migration that was
* using more channels, so ensure it doesn't overflow if the
@@ -441,17 +443,26 @@ static int multifd_send_pages(void)
return -1;
}
p = &multifd_send_state->params[i];
- qemu_mutex_lock(&p->mutex);
+ /*
+ * Lockless read to p->pending_job is safe, because only multifd
+ * sender thread can clear it.
+ */
if (qatomic_read(&p->pending_job) == false) {
- qatomic_set(&p->pending_job, true);
next_channel = (i + 1) % migrate_multifd_channels();
break;
}
- qemu_mutex_unlock(&p->mutex);
}
+
+ qemu_mutex_lock(&p->mutex);
assert(!p->pages->num);
assert(!p->pages->block);
-
+ /*
+ * Double check on pending_job==false with the lock. In the future if
+ * we can have >1 requester thread, we can replace this with a "goto
+ * retry", but that is for later.
+ */
+ assert(qatomic_read(&p->pending_job) == false);
+ qatomic_set(&p->pending_job, true);
p->packet_num = multifd_send_state->packet_num++;
multifd_send_state->pages = p->pages;
p->pages = pages;
@@ -709,8 +720,6 @@ static void *multifd_send_thread(void *opaque)
multifd_send_fill_packet(p);
p->num_packets++;
p->total_normal_pages += pages->num;
- qemu_mutex_unlock(&p->mutex);
-
trace_multifd_send(p->id, packet_num, pages->num, p->flags,
p->next_packet_size);
@@ -730,6 +739,7 @@ static void *multifd_send_thread(void *opaque)
ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
0, p->write_flags, &local_err);
if (ret != 0) {
+ qemu_mutex_unlock(&p->mutex);
break;
}
@@ -738,7 +748,6 @@ static void *multifd_send_thread(void *opaque)
multifd_pages_reset(p->pages);
p->next_packet_size = 0;
- qemu_mutex_lock(&p->mutex);
qatomic_set(&p->pending_job, false);
qemu_mutex_unlock(&p->mutex);
} else if (qatomic_read(&p->pending_sync)) {
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 08/23] migration/multifd: Drop pages->num check in sender thread
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (6 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 07/23] migration/multifd: Simplify locking in sender thread peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 10:28 ` [PATCH v2 09/23] migration/multifd: Rename p->num_packets and clean it up peterx
` (15 subsequent siblings)
23 siblings, 0 replies; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
Now with a split SYNC handler, we always have pages->num set for
pending_job==true. Assert it instead.
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.c | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c
index 4d5a01ed93..518f9de723 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -710,13 +710,14 @@ static void *multifd_send_thread(void *opaque)
p->iovs_num = 1;
}
- if (pages->num) {
- ret = multifd_send_state->ops->send_prepare(p, &local_err);
- if (ret != 0) {
- qemu_mutex_unlock(&p->mutex);
- break;
- }
+ assert(pages->num);
+
+ ret = multifd_send_state->ops->send_prepare(p, &local_err);
+ if (ret != 0) {
+ qemu_mutex_unlock(&p->mutex);
+ break;
}
+
multifd_send_fill_packet(p);
p->num_packets++;
p->total_normal_pages += pages->num;
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 09/23] migration/multifd: Rename p->num_packets and clean it up
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (7 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 08/23] migration/multifd: Drop pages->num check " peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 10:28 ` [PATCH v2 10/23] migration/multifd: Move total_normal_pages accounting peterx
` (14 subsequent siblings)
23 siblings, 0 replies; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
This field, no matter whether on src or dest, is only used for debugging
purpose.
They can even be removed already, unless it still more or less provide some
accounting on "how many packets are sent/recved for this thread". The
other more important one is called packet_num, which is embeded in the
multifd packet headers (MultiFDPacket_t).
So let's keep them for now, but make them much easier to understand, by
doing below:
- Rename both of them to packets_sent / packets_recved, the old
name (num_packets) are waaay too confusing when we already have
MultiFDPacket_t.packets_num.
- Avoid worrying on the "initial packet": we know we will send it, that's
good enough. The accounting won't matter a great deal to start with 0 or
with 1.
- Move them to where we send/recv the packets. They're:
- multifd_send_fill_packet() for senders.
- multifd_recv_unfill_packet() for receivers.
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.h | 6 +++---
migration/multifd.c | 13 +++++--------
2 files changed, 8 insertions(+), 11 deletions(-)
diff --git a/migration/multifd.h b/migration/multifd.h
index 08f26ef3fe..2e4ad0dc56 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -124,7 +124,7 @@ typedef struct {
/* size of the next packet that contains pages */
uint32_t next_packet_size;
/* packets sent through this channel */
- uint64_t num_packets;
+ uint64_t packets_sent;
/* non zero pages sent through this channel */
uint64_t total_normal_pages;
/* buffers to send */
@@ -174,8 +174,8 @@ typedef struct {
MultiFDPacket_t *packet;
/* size of the next packet that contains pages */
uint32_t next_packet_size;
- /* packets sent through this channel */
- uint64_t num_packets;
+ /* packets received through this channel */
+ uint64_t packets_recved;
/* ramblock */
RAMBlock *block;
/* ramblock host address */
diff --git a/migration/multifd.c b/migration/multifd.c
index 518f9de723..eca76e2c18 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -288,6 +288,8 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
packet->offset[i] = cpu_to_be64(temp);
}
+
+ p->packets_sent++;
}
static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
@@ -335,6 +337,7 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
p->next_packet_size = be32_to_cpu(packet->next_packet_size);
p->packet_num = be64_to_cpu(packet->packet_num);
+ p->packets_recved++;
if (p->normal_num == 0) {
return 0;
@@ -688,8 +691,6 @@ static void *multifd_send_thread(void *opaque)
ret = -1;
goto out;
}
- /* initial packet */
- p->num_packets = 1;
while (true) {
qemu_sem_post(&multifd_send_state->channels_ready);
@@ -719,7 +720,6 @@ static void *multifd_send_thread(void *opaque)
}
multifd_send_fill_packet(p);
- p->num_packets++;
p->total_normal_pages += pages->num;
trace_multifd_send(p->id, packet_num, pages->num, p->flags,
p->next_packet_size);
@@ -787,7 +787,7 @@ out:
rcu_unregister_thread();
migration_threads_remove(thread);
- trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages);
+ trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);
return NULL;
}
@@ -1124,7 +1124,6 @@ static void *multifd_recv_thread(void *opaque)
p->flags &= ~MULTIFD_FLAG_SYNC;
trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
p->next_packet_size);
- p->num_packets++;
p->total_normal_pages += p->normal_num;
qemu_mutex_unlock(&p->mutex);
@@ -1150,7 +1149,7 @@ static void *multifd_recv_thread(void *opaque)
qemu_mutex_unlock(&p->mutex);
rcu_unregister_thread();
- trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages);
+ trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages);
return NULL;
}
@@ -1252,8 +1251,6 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
}
p->c = ioc;
object_ref(OBJECT(ioc));
- /* initial packet */
- p->num_packets = 1;
p->running = true;
qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 10/23] migration/multifd: Move total_normal_pages accounting
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (8 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 09/23] migration/multifd: Rename p->num_packets and clean it up peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 10:28 ` [PATCH v2 11/23] migration/multifd: Move trace_multifd_send|recv() peterx
` (13 subsequent siblings)
23 siblings, 0 replies; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
Just like the previous patch, move the accounting for total_normal_pages on
both src/dst sides into the packet fill/unfill procedures.
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.c | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c
index eca76e2c18..94a0124934 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -290,6 +290,7 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
}
p->packets_sent++;
+ p->total_normal_pages += pages->num;
}
static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
@@ -338,6 +339,7 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
p->next_packet_size = be32_to_cpu(packet->next_packet_size);
p->packet_num = be64_to_cpu(packet->packet_num);
p->packets_recved++;
+ p->total_normal_pages += p->normal_num;
if (p->normal_num == 0) {
return 0;
@@ -720,7 +722,6 @@ static void *multifd_send_thread(void *opaque)
}
multifd_send_fill_packet(p);
- p->total_normal_pages += pages->num;
trace_multifd_send(p->id, packet_num, pages->num, p->flags,
p->next_packet_size);
@@ -1124,7 +1125,6 @@ static void *multifd_recv_thread(void *opaque)
p->flags &= ~MULTIFD_FLAG_SYNC;
trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
p->next_packet_size);
- p->total_normal_pages += p->normal_num;
qemu_mutex_unlock(&p->mutex);
if (p->normal_num) {
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 11/23] migration/multifd: Move trace_multifd_send|recv()
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (9 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 10/23] migration/multifd: Move total_normal_pages accounting peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 10:28 ` [PATCH v2 12/23] migration/multifd: multifd_send_prepare_header() peterx
` (12 subsequent siblings)
23 siblings, 0 replies; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
Move them into fill/unfill of packets. With that, we can further cleanup
the send/recv thread procedure, and remove one more temp var.
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.c | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c
index 94a0124934..44163e4e28 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -291,6 +291,9 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
p->packets_sent++;
p->total_normal_pages += pages->num;
+
+ trace_multifd_send(p->id, p->packet_num, pages->num, p->flags,
+ p->next_packet_size);
}
static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
@@ -341,6 +344,9 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
p->packets_recved++;
p->total_normal_pages += p->normal_num;
+ trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags,
+ p->next_packet_size);
+
if (p->normal_num == 0) {
return 0;
}
@@ -704,7 +710,6 @@ static void *multifd_send_thread(void *opaque)
qemu_mutex_lock(&p->mutex);
if (qatomic_read(&p->pending_job)) {
- uint64_t packet_num = p->packet_num;
MultiFDPages_t *pages = p->pages;
if (use_zero_copy_send) {
@@ -722,8 +727,6 @@ static void *multifd_send_thread(void *opaque)
}
multifd_send_fill_packet(p);
- trace_multifd_send(p->id, packet_num, pages->num, p->flags,
- p->next_packet_size);
if (use_zero_copy_send) {
/* Send header first, without zerocopy */
@@ -1123,8 +1126,6 @@ static void *multifd_recv_thread(void *opaque)
flags = p->flags;
/* recv methods don't know how to handle the SYNC flag */
p->flags &= ~MULTIFD_FLAG_SYNC;
- trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
- p->next_packet_size);
qemu_mutex_unlock(&p->mutex);
if (p->normal_num) {
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 12/23] migration/multifd: multifd_send_prepare_header()
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (10 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 11/23] migration/multifd: Move trace_multifd_send|recv() peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 10:28 ` [PATCH v2 13/23] migration/multifd: Move header prepare/fill into send_prepare() peterx
` (11 subsequent siblings)
23 siblings, 0 replies; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
Introduce a helper multifd_send_prepare_header() to setup the header packet
for multifd sender.
It's fine to setup the IOV[0] _before_ send_prepare() because the packet
buffer is already ready, even if the content is to be filled in.
With this helper, we can already slightly clean up the zero copy path.
Note that I explicitly put it into multifd.h, because I want it inlined
directly into multifd*.c where necessary later.
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.h | 8 ++++++++
migration/multifd.c | 16 ++++++++--------
2 files changed, 16 insertions(+), 8 deletions(-)
diff --git a/migration/multifd.h b/migration/multifd.h
index 2e4ad0dc56..4ec005f53f 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -209,5 +209,13 @@ typedef struct {
void multifd_register_ops(int method, MultiFDMethods *ops);
+static inline void multifd_send_prepare_header(MultiFDSendParams *p)
+{
+ p->iov[0].iov_len = p->packet_len;
+ p->iov[0].iov_base = p->packet;
+ p->iovs_num++;
+}
+
+
#endif
diff --git a/migration/multifd.c b/migration/multifd.c
index 44163e4e28..cd4467aff4 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -712,10 +712,14 @@ static void *multifd_send_thread(void *opaque)
if (qatomic_read(&p->pending_job)) {
MultiFDPages_t *pages = p->pages;
- if (use_zero_copy_send) {
- p->iovs_num = 0;
- } else {
- p->iovs_num = 1;
+ p->iovs_num = 0;
+
+ if (!use_zero_copy_send) {
+ /*
+ * Only !zerocopy needs the header in IOV; zerocopy will
+ * send it separately.
+ */
+ multifd_send_prepare_header(p);
}
assert(pages->num);
@@ -735,10 +739,6 @@ static void *multifd_send_thread(void *opaque)
if (ret != 0) {
break;
}
- } else {
- /* Send header using the same writev call */
- p->iov[0].iov_len = p->packet_len;
- p->iov[0].iov_base = p->packet;
}
ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 13/23] migration/multifd: Move header prepare/fill into send_prepare()
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (11 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 12/23] migration/multifd: multifd_send_prepare_header() peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 19:26 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 14/23] migration/multifd: Forbid spurious wakeups peterx
` (10 subsequent siblings)
23 siblings, 1 reply; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
This patch redefines the interfacing of ->send_prepare(). It further
simplifies multifd_send_thread() especially on zero copy.
Now with the new interface, we require the hook to do all the work for
preparing the IOVs to send. After it's completed, the IOVs should be ready
to be dumped into the specific multifd QIOChannel later.
So now the API looks like:
p->pages -----------> send_prepare() -------------> IOVs
This also prepares for the case where the input can be extended to even not
any p->pages. But that's for later.
This patch will achieve similar goal of what Fabiano used to propose here:
https://lore.kernel.org/r/20240126221943.26628-1-farosas@suse.de
However the send() interface may not be necessary. I'm boldly attaching a
"Co-developed-by" for Fabiano.
Co-developed-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.h | 1 +
migration/multifd-zlib.c | 4 +++
migration/multifd-zstd.c | 4 +++
migration/multifd.c | 61 ++++++++++++++++++----------------------
4 files changed, 37 insertions(+), 33 deletions(-)
diff --git a/migration/multifd.h b/migration/multifd.h
index 4ec005f53f..34a2ecb9f4 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -208,6 +208,7 @@ typedef struct {
} MultiFDMethods;
void multifd_register_ops(int method, MultiFDMethods *ops);
+void multifd_send_fill_packet(MultiFDSendParams *p);
static inline void multifd_send_prepare_header(MultiFDSendParams *p)
{
diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
index 100809abc1..012e3bdea1 100644
--- a/migration/multifd-zlib.c
+++ b/migration/multifd-zlib.c
@@ -123,6 +123,8 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
int ret;
uint32_t i;
+ multifd_send_prepare_header(p);
+
for (i = 0; i < pages->num; i++) {
uint32_t available = z->zbuff_len - out_size;
int flush = Z_NO_FLUSH;
@@ -172,6 +174,8 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
p->next_packet_size = out_size;
p->flags |= MULTIFD_FLAG_ZLIB;
+ multifd_send_fill_packet(p);
+
return 0;
}
diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
index 2023edd8cc..dc8fe43e94 100644
--- a/migration/multifd-zstd.c
+++ b/migration/multifd-zstd.c
@@ -118,6 +118,8 @@ static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
int ret;
uint32_t i;
+ multifd_send_prepare_header(p);
+
z->out.dst = z->zbuff;
z->out.size = z->zbuff_len;
z->out.pos = 0;
@@ -161,6 +163,8 @@ static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
p->next_packet_size = z->out.pos;
p->flags |= MULTIFD_FLAG_ZSTD;
+ multifd_send_fill_packet(p);
+
return 0;
}
diff --git a/migration/multifd.c b/migration/multifd.c
index cd4467aff4..6aa44340de 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -50,15 +50,15 @@ typedef struct {
/**
* nocomp_send_setup: setup send side
*
- * For no compression this function does nothing.
- *
- * Returns 0 for success or -1 for error
- *
* @p: Params for the channel that we are using
* @errp: pointer to an error
*/
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
+ if (migrate_zero_copy_send()) {
+ p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
+ }
+
return 0;
}
@@ -88,7 +88,17 @@ static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
*/
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
+ bool use_zero_copy_send = migrate_zero_copy_send();
MultiFDPages_t *pages = p->pages;
+ int ret;
+
+ if (!use_zero_copy_send) {
+ /*
+ * Only !zerocopy needs the header in IOV; zerocopy will
+ * send it separately.
+ */
+ multifd_send_prepare_header(p);
+ }
for (int i = 0; i < pages->num; i++) {
p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
@@ -98,6 +108,18 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
p->next_packet_size = pages->num * p->page_size;
p->flags |= MULTIFD_FLAG_NOCOMP;
+
+ multifd_send_fill_packet(p);
+
+ if (use_zero_copy_send) {
+ /* Send header first, without zerocopy */
+ ret = qio_channel_write_all(p->c, (void *)p->packet,
+ p->packet_len, errp);
+ if (ret != 0) {
+ return -1;
+ }
+ }
+
return 0;
}
@@ -266,7 +288,7 @@ static void multifd_pages_clear(MultiFDPages_t *pages)
g_free(pages);
}
-static void multifd_send_fill_packet(MultiFDSendParams *p)
+void multifd_send_fill_packet(MultiFDSendParams *p)
{
MultiFDPacket_t *packet = p->packet;
MultiFDPages_t *pages = p->pages;
@@ -688,7 +710,6 @@ static void *multifd_send_thread(void *opaque)
MigrationThread *thread = NULL;
Error *local_err = NULL;
int ret = 0;
- bool use_zero_copy_send = migrate_zero_copy_send();
thread = migration_threads_add(p->name, qemu_get_thread_id());
@@ -713,15 +734,6 @@ static void *multifd_send_thread(void *opaque)
MultiFDPages_t *pages = p->pages;
p->iovs_num = 0;
-
- if (!use_zero_copy_send) {
- /*
- * Only !zerocopy needs the header in IOV; zerocopy will
- * send it separately.
- */
- multifd_send_prepare_header(p);
- }
-
assert(pages->num);
ret = multifd_send_state->ops->send_prepare(p, &local_err);
@@ -730,17 +742,6 @@ static void *multifd_send_thread(void *opaque)
break;
}
- multifd_send_fill_packet(p);
-
- if (use_zero_copy_send) {
- /* Send header first, without zerocopy */
- ret = qio_channel_write_all(p->c, (void *)p->packet,
- p->packet_len, &local_err);
- if (ret != 0) {
- break;
- }
- }
-
ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
0, p->write_flags, &local_err);
if (ret != 0) {
@@ -945,13 +946,7 @@ int multifd_save_setup(Error **errp)
p->iov = g_new0(struct iovec, page_count + 1);
p->page_size = qemu_target_page_size();
p->page_count = page_count;
-
- if (migrate_zero_copy_send()) {
- p->write_flags = QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
- } else {
- p->write_flags = 0;
- }
-
+ p->write_flags = 0;
multifd_new_send_channel_create(p);
}
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 14/23] migration/multifd: Forbid spurious wakeups
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (12 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 13/23] migration/multifd: Move header prepare/fill into send_prepare() peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 10:28 ` [PATCH v2 15/23] migration/multifd: Split multifd_send_terminate_threads() peterx
` (9 subsequent siblings)
23 siblings, 0 replies; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
Now multifd's logic is designed to have no spurious wakeup. I still
remember a talk to Juan and he seems to agree we should drop it now, and if
my memory was right it was there because multifd used to hit that when
still debugging.
Let's drop it and see what can explode; as long as it's not reaching
soft-freeze.
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.c | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c
index 6aa44340de..28b54100cd 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -756,7 +756,9 @@ static void *multifd_send_thread(void *opaque)
p->next_packet_size = 0;
qatomic_set(&p->pending_job, false);
qemu_mutex_unlock(&p->mutex);
- } else if (qatomic_read(&p->pending_sync)) {
+ } else {
+ /* If not a normal job, must be a sync request */
+ assert(qatomic_read(&p->pending_sync));
p->flags = MULTIFD_FLAG_SYNC;
multifd_send_fill_packet(p);
ret = qio_channel_write_all(p->c, (void *)p->packet,
@@ -771,9 +773,6 @@ static void *multifd_send_thread(void *opaque)
qatomic_set(&p->pending_sync, false);
qemu_mutex_unlock(&p->mutex);
qemu_sem_post(&p->sem_sync);
- } else {
- qemu_mutex_unlock(&p->mutex);
- /* sometimes there are spurious wakeups */
}
}
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 15/23] migration/multifd: Split multifd_send_terminate_threads()
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (13 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 14/23] migration/multifd: Forbid spurious wakeups peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 19:28 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 16/23] migration/multifd: Change retval of multifd_queue_page() peterx
` (8 subsequent siblings)
23 siblings, 1 reply; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
Split multifd_send_terminate_threads() into two functions:
- multifd_send_set_error(): used when an error happened on the sender
side, set error and quit state only
- multifd_send_terminate_threads(): used only by the main thread to kick
all multifd send threads out of sleep, for the last recycling.
Use multifd_send_set_error() in the three old call sites where only the
error will be set.
Use multifd_send_terminate_threads() in the last one where the main thread
will kick the multifd threads at last in multifd_save_cleanup().
Both helpers will need to set quitting=1.
Suggested-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.c | 27 ++++++++++++++++++---------
migration/trace-events | 2 +-
2 files changed, 19 insertions(+), 10 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c
index 28b54100cd..ba86f9dda5 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -536,10 +536,9 @@ int multifd_queue_page(RAMBlock *block, ram_addr_t offset)
return 1;
}
-static void multifd_send_terminate_threads(Error *err)
+/* Multifd send side hit an error; remember it and prepare to quit */
+static void multifd_send_set_error(Error *err)
{
- int i;
-
/*
* We don't want to exit each threads twice. Depending on where
* we get the error, or if there are two independent errors in two
@@ -550,8 +549,6 @@ static void multifd_send_terminate_threads(Error *err)
return;
}
- trace_multifd_send_terminate_threads(err != NULL);
-
if (err) {
MigrationState *s = migrate_get_current();
migrate_set_error(s, err);
@@ -563,7 +560,19 @@ static void multifd_send_terminate_threads(Error *err)
MIGRATION_STATUS_FAILED);
}
}
+}
+
+static void multifd_send_terminate_threads(void)
+{
+ int i;
+
+ trace_multifd_send_terminate_threads();
+ /*
+ * Tell everyone we're quitting. No xchg() needed here; we simply
+ * always set it.
+ */
+ qatomic_set(&multifd_send_state->exiting, 1);
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -586,7 +595,7 @@ void multifd_save_cleanup(void)
if (!migrate_multifd()) {
return;
}
- multifd_send_terminate_threads(NULL);
+ multifd_send_terminate_threads();
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -780,7 +789,7 @@ out:
if (ret) {
assert(local_err);
trace_multifd_send_error(p->id);
- multifd_send_terminate_threads(local_err);
+ multifd_send_set_error(local_err);
multifd_send_kick_main(p);
error_free(local_err);
}
@@ -816,7 +825,7 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
- multifd_send_terminate_threads(err);
+ multifd_send_set_error(err);
multifd_send_kick_main(p);
error_free(err);
}
@@ -898,7 +907,7 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
}
trace_multifd_new_send_channel_async_error(p->id, local_err);
- multifd_send_terminate_threads(local_err);
+ multifd_send_set_error(local_err);
multifd_send_kick_main(p);
object_unref(OBJECT(ioc));
error_free(local_err);
diff --git a/migration/trace-events b/migration/trace-events
index de4a743c8a..298ad2b0dd 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -141,7 +141,7 @@ multifd_send_error(uint8_t id) "channel %u"
multifd_send_sync_main(long packet_num) "packet num %ld"
multifd_send_sync_main_signal(uint8_t id) "channel %u"
multifd_send_sync_main_wait(uint8_t id) "channel %u"
-multifd_send_terminate_threads(bool error) "error %d"
+multifd_send_terminate_threads(void) ""
multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64
multifd_send_thread_start(uint8_t id) "%u"
multifd_tls_outgoing_handshake_start(void *ioc, void *tioc, const char *hostname) "ioc=%p tioc=%p hostname=%s"
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 16/23] migration/multifd: Change retval of multifd_queue_page()
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (14 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 15/23] migration/multifd: Split multifd_send_terminate_threads() peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 19:29 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 17/23] migration/multifd: Change retval of multifd_send_pages() peterx
` (7 subsequent siblings)
23 siblings, 1 reply; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
Using int is an overkill when there're only two options. Change it to a
boolean.
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.h | 2 +-
migration/multifd.c | 9 +++++----
migration/ram.c | 2 +-
3 files changed, 7 insertions(+), 6 deletions(-)
diff --git a/migration/multifd.h b/migration/multifd.h
index 34a2ecb9f4..a320c53a6f 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -22,7 +22,7 @@ bool multifd_recv_all_channels_created(void);
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
void multifd_recv_sync_main(void);
int multifd_send_sync_main(void);
-int multifd_queue_page(RAMBlock *block, ram_addr_t offset);
+bool multifd_queue_page(RAMBlock *block, ram_addr_t offset);
/* Multifd Compression flags */
#define MULTIFD_FLAG_SYNC (1 << 0)
diff --git a/migration/multifd.c b/migration/multifd.c
index ba86f9dda5..12e587fda8 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -505,7 +505,8 @@ static int multifd_send_pages(void)
return 1;
}
-int multifd_queue_page(RAMBlock *block, ram_addr_t offset)
+/* Returns true if enqueue successful, false otherwise */
+bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
MultiFDPages_t *pages = multifd_send_state->pages;
bool changed = false;
@@ -519,21 +520,21 @@ int multifd_queue_page(RAMBlock *block, ram_addr_t offset)
pages->num++;
if (pages->num < pages->allocated) {
- return 1;
+ return true;
}
} else {
changed = true;
}
if (multifd_send_pages() < 0) {
- return -1;
+ return false;
}
if (changed) {
return multifd_queue_page(block, offset);
}
- return 1;
+ return true;
}
/* Multifd send side hit an error; remember it and prepare to quit */
diff --git a/migration/ram.c b/migration/ram.c
index d5b7cd5ac2..4649a81204 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1252,7 +1252,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss)
static int ram_save_multifd_page(RAMBlock *block, ram_addr_t offset)
{
- if (multifd_queue_page(block, offset) < 0) {
+ if (!multifd_queue_page(block, offset)) {
return -1;
}
stat64_add(&mig_stats.normal_pages, 1);
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 17/23] migration/multifd: Change retval of multifd_send_pages()
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (15 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 16/23] migration/multifd: Change retval of multifd_queue_page() peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 19:30 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 18/23] migration/multifd: Rewrite multifd_queue_page() peterx
` (6 subsequent siblings)
23 siblings, 1 reply; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
Using int is an overkill when there're only two options. Change it to a
boolean.
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.c | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c
index 12e587fda8..35d4e8ad1f 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -449,9 +449,10 @@ static void multifd_send_kick_main(MultiFDSendParams *p)
* thread is using the channel mutex when changing it, and the channel
* have to had finish with its own, otherwise pending_job can't be
* false.
+ *
+ * Returns true if succeed, false otherwise.
*/
-
-static int multifd_send_pages(void)
+static bool multifd_send_pages(void)
{
int i;
static int next_channel;
@@ -459,7 +460,7 @@ static int multifd_send_pages(void)
MultiFDPages_t *pages = multifd_send_state->pages;
if (multifd_send_should_exit()) {
- return -1;
+ return false;
}
/* We wait here, until at least one channel is ready */
@@ -473,7 +474,7 @@ static int multifd_send_pages(void)
next_channel %= migrate_multifd_channels();
for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
if (multifd_send_should_exit()) {
- return -1;
+ return false;
}
p = &multifd_send_state->params[i];
/*
@@ -502,7 +503,7 @@ static int multifd_send_pages(void)
qemu_mutex_unlock(&p->mutex);
qemu_sem_post(&p->sem);
- return 1;
+ return true;
}
/* Returns true if enqueue successful, false otherwise */
@@ -526,7 +527,7 @@ bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
changed = true;
}
- if (multifd_send_pages() < 0) {
+ if (!multifd_send_pages()) {
return false;
}
@@ -666,7 +667,7 @@ int multifd_send_sync_main(void)
return 0;
}
if (multifd_send_state->pages->num) {
- if (multifd_send_pages() < 0) {
+ if (!multifd_send_pages()) {
error_report("%s: multifd_send_pages fail", __func__);
return -1;
}
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 18/23] migration/multifd: Rewrite multifd_queue_page()
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (16 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 17/23] migration/multifd: Change retval of multifd_send_pages() peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 20:47 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 19/23] migration/multifd: Cleanup multifd_save_cleanup() peterx
` (5 subsequent siblings)
23 siblings, 1 reply; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
The current multifd_queue_page() is not easy to read and follow. It is not
good with a few reasons:
- No helper at all to show what exactly does a condition mean; in short,
readability is low.
- Rely on pages->ramblock being cleared to detect an empty queue. It's
slightly an overload of the ramblock pointer, per Fabiano [1], which I
also agree.
- Contains a self recursion, even if not necessary..
Rewrite this function. We add some comments to make it even clearer on
what it does.
[1] https://lore.kernel.org/r/87wmrpjzew.fsf@suse.de
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.c | 56 ++++++++++++++++++++++++++++++---------------
1 file changed, 37 insertions(+), 19 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c
index 35d4e8ad1f..4ab8e6eff2 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -506,35 +506,53 @@ static bool multifd_send_pages(void)
return true;
}
+static inline bool multifd_queue_empty(MultiFDPages_t *pages)
+{
+ return pages->num == 0;
+}
+
+static inline bool multifd_queue_full(MultiFDPages_t *pages)
+{
+ return pages->num == pages->allocated;
+}
+
+static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
+{
+ pages->offset[pages->num++] = offset;
+}
+
/* Returns true if enqueue successful, false otherwise */
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
- MultiFDPages_t *pages = multifd_send_state->pages;
- bool changed = false;
+ MultiFDPages_t *pages;
+
+retry:
+ pages = multifd_send_state->pages;
- if (!pages->block) {
+ /* If the queue is empty, we can already enqueue now */
+ if (multifd_queue_empty(pages)) {
pages->block = block;
+ multifd_enqueue(pages, offset);
+ return true;
}
- if (pages->block == block) {
- pages->offset[pages->num] = offset;
- pages->num++;
-
- if (pages->num < pages->allocated) {
- return true;
+ /*
+ * Not empty, meanwhile we need a flush. It can because of either:
+ *
+ * (1) The page is not on the same ramblock of previous ones, or,
+ * (2) The queue is full.
+ *
+ * After flush, always retry.
+ */
+ if (pages->block != block || multifd_queue_full(pages)) {
+ if (!multifd_send_pages()) {
+ return false;
}
- } else {
- changed = true;
- }
-
- if (!multifd_send_pages()) {
- return false;
- }
-
- if (changed) {
- return multifd_queue_page(block, offset);
+ goto retry;
}
+ /* Not empty, and we still have space, do it! */
+ multifd_enqueue(pages, offset);
return true;
}
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 19/23] migration/multifd: Cleanup multifd_save_cleanup()
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (17 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 18/23] migration/multifd: Rewrite multifd_queue_page() peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 20:54 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 20/23] migration/multifd: Cleanup multifd_load_cleanup() peterx
` (4 subsequent siblings)
23 siblings, 1 reply; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
Shrink the function by moving relevant works into helpers: move the thread
join()s into multifd_send_terminate_threads(), then create two more helpers
to cover channel/state cleanups.
Add a TODO entry for the thread terminate process because p->running is
still buggy. We need to fix it at some point but not yet covered.
Suggested-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.c | 91 +++++++++++++++++++++++++++++----------------
1 file changed, 59 insertions(+), 32 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c
index 4ab8e6eff2..4cb0d2cc17 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -593,6 +593,11 @@ static void multifd_send_terminate_threads(void)
* always set it.
*/
qatomic_set(&multifd_send_state->exiting, 1);
+
+ /*
+ * Firstly, kick all threads out; no matter whether they are just idle,
+ * or blocked in an IO system call.
+ */
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -601,6 +606,21 @@ static void multifd_send_terminate_threads(void)
qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}
}
+
+ /*
+ * Finally recycle all the threads.
+ *
+ * TODO: p->running is still buggy, e.g. we can reach here without the
+ * corresponding multifd_new_send_channel_async() get invoked yet,
+ * then a new thread can even be created after this function returns.
+ */
+ for (i = 0; i < migrate_multifd_channels(); i++) {
+ MultiFDSendParams *p = &multifd_send_state->params[i];
+
+ if (p->running) {
+ qemu_thread_join(&p->thread);
+ }
+ }
}
static int multifd_send_channel_destroy(QIOChannel *send)
@@ -608,6 +628,41 @@ static int multifd_send_channel_destroy(QIOChannel *send)
return socket_send_channel_destroy(send);
}
+static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
+{
+ if (p->registered_yank) {
+ migration_ioc_unregister_yank(p->c);
+ }
+ multifd_send_channel_destroy(p->c);
+ p->c = NULL;
+ qemu_mutex_destroy(&p->mutex);
+ qemu_sem_destroy(&p->sem);
+ qemu_sem_destroy(&p->sem_sync);
+ g_free(p->name);
+ p->name = NULL;
+ multifd_pages_clear(p->pages);
+ p->pages = NULL;
+ p->packet_len = 0;
+ g_free(p->packet);
+ p->packet = NULL;
+ g_free(p->iov);
+ p->iov = NULL;
+ multifd_send_state->ops->send_cleanup(p, errp);
+
+ return *errp == NULL;
+}
+
+static void multifd_send_cleanup_state(void)
+{
+ qemu_sem_destroy(&multifd_send_state->channels_ready);
+ g_free(multifd_send_state->params);
+ multifd_send_state->params = NULL;
+ multifd_pages_clear(multifd_send_state->pages);
+ multifd_send_state->pages = NULL;
+ g_free(multifd_send_state);
+ multifd_send_state = NULL;
+}
+
void multifd_save_cleanup(void)
{
int i;
@@ -615,48 +670,20 @@ void multifd_save_cleanup(void)
if (!migrate_multifd()) {
return;
}
+
multifd_send_terminate_threads();
- for (i = 0; i < migrate_multifd_channels(); i++) {
- MultiFDSendParams *p = &multifd_send_state->params[i];
- if (p->running) {
- qemu_thread_join(&p->thread);
- }
- }
for (i = 0; i < migrate_multifd_channels(); i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
Error *local_err = NULL;
- if (p->registered_yank) {
- migration_ioc_unregister_yank(p->c);
- }
- multifd_send_channel_destroy(p->c);
- p->c = NULL;
- qemu_mutex_destroy(&p->mutex);
- qemu_sem_destroy(&p->sem);
- qemu_sem_destroy(&p->sem_sync);
- g_free(p->name);
- p->name = NULL;
- multifd_pages_clear(p->pages);
- p->pages = NULL;
- p->packet_len = 0;
- g_free(p->packet);
- p->packet = NULL;
- g_free(p->iov);
- p->iov = NULL;
- multifd_send_state->ops->send_cleanup(p, &local_err);
- if (local_err) {
+ if (!multifd_send_cleanup_channel(p, &local_err)) {
migrate_set_error(migrate_get_current(), local_err);
error_free(local_err);
}
}
- qemu_sem_destroy(&multifd_send_state->channels_ready);
- g_free(multifd_send_state->params);
- multifd_send_state->params = NULL;
- multifd_pages_clear(multifd_send_state->pages);
- multifd_send_state->pages = NULL;
- g_free(multifd_send_state);
- multifd_send_state = NULL;
+
+ multifd_send_cleanup_state();
}
static int multifd_zero_copy_flush(QIOChannel *c)
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 20/23] migration/multifd: Cleanup multifd_load_cleanup()
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (18 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 19/23] migration/multifd: Cleanup multifd_save_cleanup() peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 20:55 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 21/23] migration/multifd: Stick with send/recv on function names peterx
` (3 subsequent siblings)
23 siblings, 1 reply; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
Use similar logic to cleanup the recv side.
Note that multifd_recv_terminate_threads() may need some similar rework
like the sender side, but let's leave that for later.
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.c | 52 ++++++++++++++++++++++++++-------------------
1 file changed, 30 insertions(+), 22 deletions(-)
diff --git a/migration/multifd.c b/migration/multifd.c
index 4cb0d2cc17..e2dd2f6e04 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -1070,6 +1070,34 @@ void multifd_load_shutdown(void)
}
}
+static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
+{
+ migration_ioc_unregister_yank(p->c);
+ object_unref(OBJECT(p->c));
+ p->c = NULL;
+ qemu_mutex_destroy(&p->mutex);
+ qemu_sem_destroy(&p->sem_sync);
+ g_free(p->name);
+ p->name = NULL;
+ p->packet_len = 0;
+ g_free(p->packet);
+ p->packet = NULL;
+ g_free(p->iov);
+ p->iov = NULL;
+ g_free(p->normal);
+ p->normal = NULL;
+ multifd_recv_state->ops->recv_cleanup(p);
+}
+
+static void multifd_recv_cleanup_state(void)
+{
+ qemu_sem_destroy(&multifd_recv_state->sem_sync);
+ g_free(multifd_recv_state->params);
+ multifd_recv_state->params = NULL;
+ g_free(multifd_recv_state);
+ multifd_recv_state = NULL;
+}
+
void multifd_load_cleanup(void)
{
int i;
@@ -1092,29 +1120,9 @@ void multifd_load_cleanup(void)
qemu_thread_join(&p->thread);
}
for (i = 0; i < migrate_multifd_channels(); i++) {
- MultiFDRecvParams *p = &multifd_recv_state->params[i];
-
- migration_ioc_unregister_yank(p->c);
- object_unref(OBJECT(p->c));
- p->c = NULL;
- qemu_mutex_destroy(&p->mutex);
- qemu_sem_destroy(&p->sem_sync);
- g_free(p->name);
- p->name = NULL;
- p->packet_len = 0;
- g_free(p->packet);
- p->packet = NULL;
- g_free(p->iov);
- p->iov = NULL;
- g_free(p->normal);
- p->normal = NULL;
- multifd_recv_state->ops->recv_cleanup(p);
+ multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
}
- qemu_sem_destroy(&multifd_recv_state->sem_sync);
- g_free(multifd_recv_state->params);
- multifd_recv_state->params = NULL;
- g_free(multifd_recv_state);
- multifd_recv_state = NULL;
+ multifd_recv_cleanup_state();
}
void multifd_recv_sync_main(void)
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 21/23] migration/multifd: Stick with send/recv on function names
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (19 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 20/23] migration/multifd: Cleanup multifd_load_cleanup() peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 21:03 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 22/23] migration/multifd: Fix MultiFDSendParams.packet_num race peterx
` (2 subsequent siblings)
23 siblings, 1 reply; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
Most of the multifd code uses send/recv to represent the two sides, but
some rare cases use save/load.
Since send/recv is the majority, replacing the save/load use cases to use
send/recv globally. Now we reach a consensus on the naming.
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.h | 10 +++++-----
migration/migration.c | 12 ++++++------
migration/multifd.c | 10 +++++-----
3 files changed, 16 insertions(+), 16 deletions(-)
diff --git a/migration/multifd.h b/migration/multifd.h
index a320c53a6f..9b40a53cb6 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -13,11 +13,11 @@
#ifndef QEMU_MIGRATION_MULTIFD_H
#define QEMU_MIGRATION_MULTIFD_H
-int multifd_save_setup(Error **errp);
-void multifd_save_cleanup(void);
-int multifd_load_setup(Error **errp);
-void multifd_load_cleanup(void);
-void multifd_load_shutdown(void);
+int multifd_send_setup(Error **errp);
+void multifd_send_shutdown(void);
+int multifd_recv_setup(Error **errp);
+void multifd_recv_cleanup(void);
+void multifd_recv_shutdown(void);
bool multifd_recv_all_channels_created(void);
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
void multifd_recv_sync_main(void);
diff --git a/migration/migration.c b/migration/migration.c
index d5f705ceef..ba99772e76 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -311,7 +311,7 @@ void migration_incoming_state_destroy(void)
{
struct MigrationIncomingState *mis = migration_incoming_get_current();
- multifd_load_cleanup();
+ multifd_recv_cleanup();
compress_threads_load_cleanup();
if (mis->to_src_file) {
@@ -662,7 +662,7 @@ static void process_incoming_migration_bh(void *opaque)
trace_vmstate_downtime_checkpoint("dst-precopy-bh-announced");
- multifd_load_shutdown();
+ multifd_recv_shutdown();
dirty_bitmap_mig_before_vm_start();
@@ -759,7 +759,7 @@ fail:
MIGRATION_STATUS_FAILED);
qemu_fclose(mis->from_src_file);
- multifd_load_cleanup();
+ multifd_recv_cleanup();
compress_threads_load_cleanup();
exit(EXIT_FAILURE);
@@ -885,7 +885,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
default_channel = !mis->from_src_file;
}
- if (multifd_load_setup(errp) != 0) {
+ if (multifd_recv_setup(errp) != 0) {
return;
}
@@ -1331,7 +1331,7 @@ static void migrate_fd_cleanup(MigrationState *s)
}
bql_lock();
- multifd_save_cleanup();
+ multifd_send_shutdown();
qemu_mutex_lock(&s->qemu_file_lock);
tmp = s->to_dst_file;
s->to_dst_file = NULL;
@@ -3623,7 +3623,7 @@ void migrate_fd_connect(MigrationState *s, Error *error_in)
return;
}
- if (multifd_save_setup(&local_err) != 0) {
+ if (multifd_send_setup(&local_err) != 0) {
migrate_set_error(s, local_err);
error_report_err(local_err);
migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
diff --git a/migration/multifd.c b/migration/multifd.c
index e2dd2f6e04..130f86a1fb 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -663,7 +663,7 @@ static void multifd_send_cleanup_state(void)
multifd_send_state = NULL;
}
-void multifd_save_cleanup(void)
+void multifd_send_shutdown(void)
{
int i;
@@ -965,7 +965,7 @@ static void multifd_new_send_channel_create(gpointer opaque)
socket_send_channel_create(multifd_new_send_channel_async, opaque);
}
-int multifd_save_setup(Error **errp)
+int multifd_send_setup(Error **errp)
{
int thread_count;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
@@ -1063,7 +1063,7 @@ static void multifd_recv_terminate_threads(Error *err)
}
}
-void multifd_load_shutdown(void)
+void multifd_recv_shutdown(void)
{
if (migrate_multifd()) {
multifd_recv_terminate_threads(NULL);
@@ -1098,7 +1098,7 @@ static void multifd_recv_cleanup_state(void)
multifd_recv_state = NULL;
}
-void multifd_load_cleanup(void)
+void multifd_recv_cleanup(void)
{
int i;
@@ -1213,7 +1213,7 @@ static void *multifd_recv_thread(void *opaque)
return NULL;
}
-int multifd_load_setup(Error **errp)
+int multifd_recv_setup(Error **errp)
{
int thread_count;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 22/23] migration/multifd: Fix MultiFDSendParams.packet_num race
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (20 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 21/23] migration/multifd: Stick with send/recv on function names peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 21:08 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 23/23] migration/multifd: Optimize sender side to be lockless peterx
2024-02-06 3:05 ` [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups Peter Xu
23 siblings, 1 reply; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
As reported correctly by Fabiano [1], MultiFDSendParams.packet_num is buggy
to be assigned and stored. Consider two consequent operations of: (1)
queue a job into multifd send thread X, then (2) queue another sync request
to the same send thread X. Then the MultiFDSendParams.packet_num will be
assigned twice, and the first assignment can get lost already.
To avoid that, we move the packet_num assignment from p->packet_num into
where the thread will fill in the packet. Use atomic operations to protect
the field, making sure there's no race.
Note that atomic fetch_add() may not be good for scaling purposes, however
multifd should be fine as number of threads should normally not go beyond
16 threads. Let's leave that concern for later but fix the issue first.
There's also a trick on how to make it always work even on 32 bit hosts for
uint64_t packet number. Switching to uintptr_t as of now to simply the
case. It will cause packet number to overflow easier on 32 bit, but that
shouldn't be a major concern for now as 32 bit systems is not the major
audience for any performance concerns like what multifd wants to address.
We also need to move multifd_send_state definition upper, so that
multifd_send_fill_packet() can reference it.
[1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de
Reported-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.h | 2 --
migration/multifd.c | 56 +++++++++++++++++++++++++++------------------
2 files changed, 34 insertions(+), 24 deletions(-)
diff --git a/migration/multifd.h b/migration/multifd.h
index 9b40a53cb6..98876ff94a 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -97,8 +97,6 @@ typedef struct {
bool running;
/* multifd flags for each packet */
uint32_t flags;
- /* global number of generated multifd packets */
- uint64_t packet_num;
/*
* The sender thread has work to do if either of below boolean is set.
*
diff --git a/migration/multifd.c b/migration/multifd.c
index 130f86a1fb..b317d57d61 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -45,6 +45,35 @@ typedef struct {
uint64_t unused2[4]; /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
+struct {
+ MultiFDSendParams *params;
+ /* array of pages to sent */
+ MultiFDPages_t *pages;
+ /*
+ * Global number of generated multifd packets.
+ *
+ * Note that we used 'uintptr_t' because it'll naturally support atomic
+ * operations on both 32bit / 64 bits hosts. It means on 32bit systems
+ * multifd will overflow the packet_num easier, but that should be
+ * fine.
+ *
+ * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
+ * hosts, however so far it does not support atomic fetch_add() yet.
+ * Make it easy for now.
+ */
+ uintptr_t packet_num;
+ /* send channels ready */
+ QemuSemaphore channels_ready;
+ /*
+ * Have we already run terminate threads. There is a race when it
+ * happens that we got one error while we are exiting.
+ * We will use atomic operations. Only valid values are 0 and 1.
+ */
+ int exiting;
+ /* multifd ops */
+ MultiFDMethods *ops;
+} *multifd_send_state;
+
/* Multifd without compression */
/**
@@ -292,13 +321,16 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
{
MultiFDPacket_t *packet = p->packet;
MultiFDPages_t *pages = p->pages;
+ uint64_t packet_num;
int i;
packet->flags = cpu_to_be32(p->flags);
packet->pages_alloc = cpu_to_be32(p->pages->allocated);
packet->normal_pages = cpu_to_be32(pages->num);
packet->next_packet_size = cpu_to_be32(p->next_packet_size);
- packet->packet_num = cpu_to_be64(p->packet_num);
+
+ packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
+ packet->packet_num = cpu_to_be64(packet_num);
if (pages->block) {
strncpy(packet->ramblock, pages->block->idstr, 256);
@@ -314,7 +346,7 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
p->packets_sent++;
p->total_normal_pages += pages->num;
- trace_multifd_send(p->id, p->packet_num, pages->num, p->flags,
+ trace_multifd_send(p->id, packet_num, pages->num, p->flags,
p->next_packet_size);
}
@@ -398,24 +430,6 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
return 0;
}
-struct {
- MultiFDSendParams *params;
- /* array of pages to sent */
- MultiFDPages_t *pages;
- /* global number of generated multifd packets */
- uint64_t packet_num;
- /* send channels ready */
- QemuSemaphore channels_ready;
- /*
- * Have we already run terminate threads. There is a race when it
- * happens that we got one error while we are exiting.
- * We will use atomic operations. Only valid values are 0 and 1.
- */
- int exiting;
- /* multifd ops */
- MultiFDMethods *ops;
-} *multifd_send_state;
-
static bool multifd_send_should_exit(void)
{
return qatomic_read(&multifd_send_state->exiting);
@@ -497,7 +511,6 @@ static bool multifd_send_pages(void)
*/
assert(qatomic_read(&p->pending_job) == false);
qatomic_set(&p->pending_job, true);
- p->packet_num = multifd_send_state->packet_num++;
multifd_send_state->pages = p->pages;
p->pages = pages;
qemu_mutex_unlock(&p->mutex);
@@ -730,7 +743,6 @@ int multifd_send_sync_main(void)
trace_multifd_send_sync_main_signal(p->id);
qemu_mutex_lock(&p->mutex);
- p->packet_num = multifd_send_state->packet_num++;
/*
* We should be the only user so far, so not possible to be set by
* others concurrently.
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* [PATCH v2 23/23] migration/multifd: Optimize sender side to be lockless
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (21 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 22/23] migration/multifd: Fix MultiFDSendParams.packet_num race peterx
@ 2024-02-02 10:28 ` peterx
2024-02-02 21:34 ` Fabiano Rosas
2024-02-06 3:05 ` [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups Peter Xu
23 siblings, 1 reply; 49+ messages in thread
From: peterx @ 2024-02-02 10:28 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, peterx, Avihai Horon,
Yuan Liu, Prasad Pandit
From: Peter Xu <peterx@redhat.com>
When reviewing my attempt to refactor send_prepare(), Fabiano suggested we
try out with dropping the mutex in multifd code [1].
I thought about that before but I never tried to change the code. Now
maybe it's time to give it a stab. This only optimizes the sender side.
The trick here is multifd has a clear provider/consumer model, that the
migration main thread publishes requests (either pending_job/pending_sync),
while the multifd sender threads are consumers. Here we don't have a lot
of comlicated data sharing, and the jobs can logically be submitted lockless.
Arm the code with atomic weapons. Two things worth mentioning:
- For multifd_send_pages(): we can use qatomic_load_acquire() when trying
to find a free channel, but that's expensive if we attach one ACQUIRE per
channel. Instead, make it atomic_read() on the pending_job flag, but
merge the ACQUIRE into one single smp_mb_acquire() later.
- For pending_sync: it doesn't have any extra data required since now
p->flags are never touched, it should be safe to not use memory barrier.
That's different from pending_sync.
Provide rich comments for all the lockless operations to state how they are
paired. With that, we can remove the mutex.
[1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de
Suggested-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.h | 2 --
migration/multifd.c | 51 +++++++++++++++++++++++----------------------
2 files changed, 26 insertions(+), 27 deletions(-)
diff --git a/migration/multifd.h b/migration/multifd.h
index 98876ff94a..78a2317263 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -91,8 +91,6 @@ typedef struct {
/* syncs main thread and channels */
QemuSemaphore sem_sync;
- /* this mutex protects the following parameters */
- QemuMutex mutex;
/* is this channel thread running */
bool running;
/* multifd flags for each packet */
diff --git a/migration/multifd.c b/migration/multifd.c
index b317d57d61..ef13e2e781 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -501,19 +501,19 @@ static bool multifd_send_pages(void)
}
}
- qemu_mutex_lock(&p->mutex);
- assert(!p->pages->num);
- assert(!p->pages->block);
/*
- * Double check on pending_job==false with the lock. In the future if
- * we can have >1 requester thread, we can replace this with a "goto
- * retry", but that is for later.
+ * Make sure we read p->pending_job before all the rest. Pairs with
+ * qatomic_store_release() in multifd_send_thread().
*/
- assert(qatomic_read(&p->pending_job) == false);
- qatomic_set(&p->pending_job, true);
+ smp_mb_acquire();
+ assert(!p->pages->num);
multifd_send_state->pages = p->pages;
p->pages = pages;
- qemu_mutex_unlock(&p->mutex);
+ /*
+ * Making sure p->pages is setup before marking pending_job=true. Pairs
+ * with the qatomic_load_acquire() in multifd_send_thread().
+ */
+ qatomic_store_release(&p->pending_job, true);
qemu_sem_post(&p->sem);
return true;
@@ -648,7 +648,6 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
}
multifd_send_channel_destroy(p->c);
p->c = NULL;
- qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem);
qemu_sem_destroy(&p->sem_sync);
g_free(p->name);
@@ -742,14 +741,12 @@ int multifd_send_sync_main(void)
trace_multifd_send_sync_main_signal(p->id);
- qemu_mutex_lock(&p->mutex);
/*
* We should be the only user so far, so not possible to be set by
* others concurrently.
*/
assert(qatomic_read(&p->pending_sync) == false);
qatomic_set(&p->pending_sync, true);
- qemu_mutex_unlock(&p->mutex);
qemu_sem_post(&p->sem);
}
for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -796,9 +793,12 @@ static void *multifd_send_thread(void *opaque)
if (multifd_send_should_exit()) {
break;
}
- qemu_mutex_lock(&p->mutex);
- if (qatomic_read(&p->pending_job)) {
+ /*
+ * Read pending_job flag before p->pages. Pairs with the
+ * qatomic_store_release() in multifd_send_pages().
+ */
+ if (qatomic_load_acquire(&p->pending_job)) {
MultiFDPages_t *pages = p->pages;
p->iovs_num = 0;
@@ -806,14 +806,12 @@ static void *multifd_send_thread(void *opaque)
ret = multifd_send_state->ops->send_prepare(p, &local_err);
if (ret != 0) {
- qemu_mutex_unlock(&p->mutex);
break;
}
ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
0, p->write_flags, &local_err);
if (ret != 0) {
- qemu_mutex_unlock(&p->mutex);
break;
}
@@ -822,24 +820,31 @@ static void *multifd_send_thread(void *opaque)
multifd_pages_reset(p->pages);
p->next_packet_size = 0;
- qatomic_set(&p->pending_job, false);
- qemu_mutex_unlock(&p->mutex);
+
+ /*
+ * Making sure p->pages is published before saying "we're
+ * free". Pairs with the qatomic_load_acquire() in
+ * multifd_send_pages().
+ */
+ qatomic_store_release(&p->pending_job, false);
} else {
- /* If not a normal job, must be a sync request */
+ /*
+ * If not a normal job, must be a sync request. Note that
+ * pending_sync is a standalone flag (unlike pending_job), so
+ * it doesn't require explicit memory barriers.
+ */
assert(qatomic_read(&p->pending_sync));
p->flags = MULTIFD_FLAG_SYNC;
multifd_send_fill_packet(p);
ret = qio_channel_write_all(p->c, (void *)p->packet,
p->packet_len, &local_err);
if (ret != 0) {
- qemu_mutex_unlock(&p->mutex);
break;
}
/* p->next_packet_size will always be zero for a SYNC packet */
stat64_add(&mig_stats.multifd_bytes, p->packet_len);
p->flags = 0;
qatomic_set(&p->pending_sync, false);
- qemu_mutex_unlock(&p->mutex);
qemu_sem_post(&p->sem_sync);
}
}
@@ -853,10 +858,7 @@ out:
error_free(local_err);
}
- qemu_mutex_lock(&p->mutex);
p->running = false;
- qemu_mutex_unlock(&p->mutex);
-
rcu_unregister_thread();
migration_threads_remove(thread);
trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);
@@ -998,7 +1000,6 @@ int multifd_send_setup(Error **errp)
for (i = 0; i < thread_count; i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
- qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
qemu_sem_init(&p->sem_sync, 0);
p->id = i;
--
2.43.0
^ permalink raw reply related [flat|nested] 49+ messages in thread
* Re: [PATCH v2 03/23] migration/multifd: Drop MultiFDSendParams.quit, cleanup error paths
2024-02-02 10:28 ` [PATCH v2 03/23] migration/multifd: Drop MultiFDSendParams.quit, cleanup error paths peterx
@ 2024-02-02 19:15 ` Fabiano Rosas
0 siblings, 0 replies; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-02 19:15 UTC (permalink / raw)
To: peterx, qemu-devel
Cc: Hao Xiang, Bryan Zhang, peterx, Avihai Horon, Yuan Liu,
Prasad Pandit
peterx@redhat.com writes:
> From: Peter Xu <peterx@redhat.com>
>
> Multifd send side has two fields to indicate error quits:
>
> - MultiFDSendParams.quit
> - &multifd_send_state->exiting
>
> Merge them into the global one. The replacement is done by changing all
> p->quit checks into the global var check. The global check doesn't need
> any lock.
>
> A few more things done on top of this altogether:
>
> - multifd_send_terminate_threads()
>
> Moving the xchg() of &multifd_send_state->exiting upper, so as to cover
> the tracepoint, migrate_set_error() and migrate_set_state().
>
> - multifd_send_sync_main()
>
> In the 2nd loop, add one more check over the global var to make sure we
> don't keep the looping if QEMU already decided to quit.
>
> - multifd_tls_outgoing_handshake()
>
> Use multifd_send_terminate_threads() to set the error state. That has
> a benefit of updating MigrationState.error to that error too, so we can
> persist that 1st error we hit in that specific channel.
>
> - multifd_new_send_channel_async()
>
> Take similar approach like above, drop the migrate_set_error() because
> multifd_send_terminate_threads() already covers that. Unwrap the helper
> multifd_new_send_channel_cleanup() along the way; not really needed.
>
> Signed-off-by: Peter Xu <peterx@redhat.com>
Reviewed-by: Fabiano Rosas <farosas@suse.de>
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 06/23] migration/multifd: Separate SYNC request with normal jobs
2024-02-02 10:28 ` [PATCH v2 06/23] migration/multifd: Separate SYNC request with normal jobs peterx
@ 2024-02-02 19:21 ` Fabiano Rosas
0 siblings, 0 replies; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-02 19:21 UTC (permalink / raw)
To: peterx, qemu-devel
Cc: Hao Xiang, Bryan Zhang, peterx, Avihai Horon, Yuan Liu,
Prasad Pandit
peterx@redhat.com writes:
> From: Peter Xu <peterx@redhat.com>
>
> Multifd provide a threaded model for processing jobs. On sender side,
> there can be two kinds of job: (1) a list of pages to send, or (2) a sync
> request.
>
> The sync request is a very special kind of job. It never contains a page
> array, but only a multifd packet telling the dest side to synchronize with
> sent pages.
>
> Before this patch, both requests use the pending_job field, no matter what
> the request is, it will boost pending_job, while multifd sender thread will
> decrement it after it finishes one job.
>
> However this should be racy, because SYNC is special in that it needs to
> set p->flags with MULTIFD_FLAG_SYNC, showing that this is a sync request.
> Consider a sequence of operations where:
>
> - migration thread enqueue a job to send some pages, pending_job++ (0->1)
>
> - [...before the selected multifd sender thread wakes up...]
>
> - migration thread enqueue another job to sync, pending_job++ (1->2),
> setup p->flags=MULTIFD_FLAG_SYNC
>
> - multifd sender thread wakes up, found pending_job==2
> - send the 1st packet with MULTIFD_FLAG_SYNC and list of pages
> - send the 2nd packet with flags==0 and no pages
>
> This is not expected, because MULTIFD_FLAG_SYNC should hopefully be done
> after all the pages are received. Meanwhile, the 2nd packet will be
> completely useless, which contains zero information.
>
> I didn't verify above, but I think this issue is still benign in that at
> least on the recv side we always receive pages before handling
> MULTIFD_FLAG_SYNC. However that's not always guaranteed and just tricky.
>
> One other reason I want to separate it is using p->flags to communicate
> between the two threads is also not clearly defined, it's very hard to read
> and understand why accessing p->flags is always safe; see the current impl
> of multifd_send_thread() where we tried to cache only p->flags. It doesn't
> need to be that complicated.
>
> This patch introduces pending_sync, a separate flag just to show that the
> requester needs a sync. Alongside, we remove the tricky caching of
> p->flags now because after this patch p->flags should only be used by
> multifd sender thread now, which will be crystal clear. So it is always
> thread safe to access p->flags.
>
> With that, we can also safely convert the pending_job into a boolean,
> because we don't support >1 pending jobs anyway.
>
> Always use atomic ops to access both flags to make sure no cache effect.
> When at it, drop the initial setting of "pending_job = 0" because it's
> always allocated using g_new0().
>
> Signed-off-by: Peter Xu <peterx@redhat.com>
Reviewed-by: Fabiano Rosas <farosas@suse.de>
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 07/23] migration/multifd: Simplify locking in sender thread
2024-02-02 10:28 ` [PATCH v2 07/23] migration/multifd: Simplify locking in sender thread peterx
@ 2024-02-02 19:23 ` Fabiano Rosas
0 siblings, 0 replies; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-02 19:23 UTC (permalink / raw)
To: peterx, qemu-devel
Cc: Hao Xiang, Bryan Zhang, peterx, Avihai Horon, Yuan Liu,
Prasad Pandit
peterx@redhat.com writes:
> From: Peter Xu <peterx@redhat.com>
>
> The sender thread will yield the p->mutex before IO starts, trying to not
> block the requester thread. This may be unnecessary lock optimizations,
> because the requester can already read pending_job safely even without the
> lock, because the requester is currently the only one who can assign a
> task.
>
> Drop that lock complication on both sides:
>
> (1) in the sender thread, always take the mutex until job done
> (2) in the requester thread, check pending_job clear lockless
>
> Signed-off-by: Peter Xu <peterx@redhat.com>
Reviewed-by: Fabiano Rosas <farosas@suse.de>
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 13/23] migration/multifd: Move header prepare/fill into send_prepare()
2024-02-02 10:28 ` [PATCH v2 13/23] migration/multifd: Move header prepare/fill into send_prepare() peterx
@ 2024-02-02 19:26 ` Fabiano Rosas
0 siblings, 0 replies; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-02 19:26 UTC (permalink / raw)
To: peterx, qemu-devel
Cc: Hao Xiang, Bryan Zhang, peterx, Avihai Horon, Yuan Liu,
Prasad Pandit
peterx@redhat.com writes:
> From: Peter Xu <peterx@redhat.com>
>
> This patch redefines the interfacing of ->send_prepare(). It further
> simplifies multifd_send_thread() especially on zero copy.
>
> Now with the new interface, we require the hook to do all the work for
> preparing the IOVs to send. After it's completed, the IOVs should be ready
> to be dumped into the specific multifd QIOChannel later.
>
> So now the API looks like:
>
> p->pages -----------> send_prepare() -------------> IOVs
>
> This also prepares for the case where the input can be extended to even not
> any p->pages. But that's for later.
>
> This patch will achieve similar goal of what Fabiano used to propose here:
>
> https://lore.kernel.org/r/20240126221943.26628-1-farosas@suse.de
>
> However the send() interface may not be necessary. I'm boldly attaching a
> "Co-developed-by" for Fabiano.
>
> Co-developed-by: Fabiano Rosas <farosas@suse.de>
> Signed-off-by: Peter Xu <peterx@redhat.com>
Reviewed-by: Fabiano Rosas <farosas@suse.de>
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 15/23] migration/multifd: Split multifd_send_terminate_threads()
2024-02-02 10:28 ` [PATCH v2 15/23] migration/multifd: Split multifd_send_terminate_threads() peterx
@ 2024-02-02 19:28 ` Fabiano Rosas
0 siblings, 0 replies; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-02 19:28 UTC (permalink / raw)
To: peterx, qemu-devel
Cc: Hao Xiang, Bryan Zhang, peterx, Avihai Horon, Yuan Liu,
Prasad Pandit
peterx@redhat.com writes:
> From: Peter Xu <peterx@redhat.com>
>
> Split multifd_send_terminate_threads() into two functions:
>
> - multifd_send_set_error(): used when an error happened on the sender
> side, set error and quit state only
>
> - multifd_send_terminate_threads(): used only by the main thread to kick
> all multifd send threads out of sleep, for the last recycling.
>
> Use multifd_send_set_error() in the three old call sites where only the
> error will be set.
>
> Use multifd_send_terminate_threads() in the last one where the main thread
> will kick the multifd threads at last in multifd_save_cleanup().
>
> Both helpers will need to set quitting=1.
>
> Suggested-by: Fabiano Rosas <farosas@suse.de>
> Signed-off-by: Peter Xu <peterx@redhat.com>
Reviewed-by: Fabiano Rosas <farosas@suse.de>
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 16/23] migration/multifd: Change retval of multifd_queue_page()
2024-02-02 10:28 ` [PATCH v2 16/23] migration/multifd: Change retval of multifd_queue_page() peterx
@ 2024-02-02 19:29 ` Fabiano Rosas
0 siblings, 0 replies; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-02 19:29 UTC (permalink / raw)
To: peterx, qemu-devel
Cc: Hao Xiang, Bryan Zhang, peterx, Avihai Horon, Yuan Liu,
Prasad Pandit
peterx@redhat.com writes:
> From: Peter Xu <peterx@redhat.com>
>
> Using int is an overkill when there're only two options. Change it to a
> boolean.
>
> Signed-off-by: Peter Xu <peterx@redhat.com>
Reviewed-by: Fabiano Rosas <farosas@suse.de>
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 17/23] migration/multifd: Change retval of multifd_send_pages()
2024-02-02 10:28 ` [PATCH v2 17/23] migration/multifd: Change retval of multifd_send_pages() peterx
@ 2024-02-02 19:30 ` Fabiano Rosas
0 siblings, 0 replies; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-02 19:30 UTC (permalink / raw)
To: peterx, qemu-devel
Cc: Hao Xiang, Bryan Zhang, peterx, Avihai Horon, Yuan Liu,
Prasad Pandit
peterx@redhat.com writes:
> From: Peter Xu <peterx@redhat.com>
>
> Using int is an overkill when there're only two options. Change it to a
> boolean.
>
> Signed-off-by: Peter Xu <peterx@redhat.com>
Reviewed-by: Fabiano Rosas <farosas@suse.de>
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 18/23] migration/multifd: Rewrite multifd_queue_page()
2024-02-02 10:28 ` [PATCH v2 18/23] migration/multifd: Rewrite multifd_queue_page() peterx
@ 2024-02-02 20:47 ` Fabiano Rosas
2024-02-05 4:03 ` Peter Xu
0 siblings, 1 reply; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-02 20:47 UTC (permalink / raw)
To: peterx, qemu-devel
Cc: Hao Xiang, Bryan Zhang, peterx, Avihai Horon, Yuan Liu,
Prasad Pandit
peterx@redhat.com writes:
> From: Peter Xu <peterx@redhat.com>
>
> The current multifd_queue_page() is not easy to read and follow. It is not
> good with a few reasons:
>
> - No helper at all to show what exactly does a condition mean; in short,
> readability is low.
>
> - Rely on pages->ramblock being cleared to detect an empty queue. It's
> slightly an overload of the ramblock pointer, per Fabiano [1], which I
> also agree.
>
> - Contains a self recursion, even if not necessary..
>
> Rewrite this function. We add some comments to make it even clearer on
> what it does.
>
> [1] https://lore.kernel.org/r/87wmrpjzew.fsf@suse.de
>
> Signed-off-by: Peter Xu <peterx@redhat.com>
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Patch looks good, but I have a question below.
> ---
> migration/multifd.c | 56 ++++++++++++++++++++++++++++++---------------
> 1 file changed, 37 insertions(+), 19 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 35d4e8ad1f..4ab8e6eff2 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -506,35 +506,53 @@ static bool multifd_send_pages(void)
> return true;
> }
>
> +static inline bool multifd_queue_empty(MultiFDPages_t *pages)
> +{
> + return pages->num == 0;
> +}
> +
> +static inline bool multifd_queue_full(MultiFDPages_t *pages)
> +{
> + return pages->num == pages->allocated;
> +}
> +
> +static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
> +{
> + pages->offset[pages->num++] = offset;
> +}
> +
> /* Returns true if enqueue successful, false otherwise */
> bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
> {
> - MultiFDPages_t *pages = multifd_send_state->pages;
> - bool changed = false;
> + MultiFDPages_t *pages;
> +
> +retry:
> + pages = multifd_send_state->pages;
>
> - if (!pages->block) {
> + /* If the queue is empty, we can already enqueue now */
> + if (multifd_queue_empty(pages)) {
> pages->block = block;
> + multifd_enqueue(pages, offset);
> + return true;
> }
>
> - if (pages->block == block) {
> - pages->offset[pages->num] = offset;
> - pages->num++;
> -
> - if (pages->num < pages->allocated) {
> - return true;
> + /*
> + * Not empty, meanwhile we need a flush. It can because of either:
> + *
> + * (1) The page is not on the same ramblock of previous ones, or,
> + * (2) The queue is full.
> + *
> + * After flush, always retry.
> + */
> + if (pages->block != block || multifd_queue_full(pages)) {
> + if (!multifd_send_pages()) {
> + return false;
> }
> - } else {
> - changed = true;
> - }
> -
> - if (!multifd_send_pages()) {
> - return false;
> - }
> -
> - if (changed) {
> - return multifd_queue_page(block, offset);
> + goto retry;
> }
>
> + /* Not empty, and we still have space, do it! */
> + multifd_enqueue(pages, offset);
Hm, here you're missing the flush of the last group of pages of the last
ramblock. Just like current code...
...which means we're relying on the multifd_send_pages() at
multifd_send_sync_main() to send the last few pages. So how can that
work when multifd_flush_after_each_section==false? Because it skips the
sync flag, but would also skip the last send. I'm confused.
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 19/23] migration/multifd: Cleanup multifd_save_cleanup()
2024-02-02 10:28 ` [PATCH v2 19/23] migration/multifd: Cleanup multifd_save_cleanup() peterx
@ 2024-02-02 20:54 ` Fabiano Rosas
2024-02-05 4:25 ` Peter Xu
0 siblings, 1 reply; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-02 20:54 UTC (permalink / raw)
To: peterx, qemu-devel
Cc: Hao Xiang, Bryan Zhang, peterx, Avihai Horon, Yuan Liu,
Prasad Pandit
peterx@redhat.com writes:
> From: Peter Xu <peterx@redhat.com>
>
> Shrink the function by moving relevant works into helpers: move the thread
> join()s into multifd_send_terminate_threads(), then create two more helpers
> to cover channel/state cleanups.
>
> Add a TODO entry for the thread terminate process because p->running is
> still buggy. We need to fix it at some point but not yet covered.
>
> Suggested-by: Fabiano Rosas <farosas@suse.de>
> Signed-off-by: Peter Xu <peterx@redhat.com>
Reviewed-by: Fabiano Rosas <farosas@suse.de>
minor comment below
> ---
> migration/multifd.c | 91 +++++++++++++++++++++++++++++----------------
> 1 file changed, 59 insertions(+), 32 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 4ab8e6eff2..4cb0d2cc17 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -593,6 +593,11 @@ static void multifd_send_terminate_threads(void)
> * always set it.
> */
> qatomic_set(&multifd_send_state->exiting, 1);
> +
> + /*
> + * Firstly, kick all threads out; no matter whether they are just idle,
> + * or blocked in an IO system call.
> + */
> for (i = 0; i < migrate_multifd_channels(); i++) {
> MultiFDSendParams *p = &multifd_send_state->params[i];
>
> @@ -601,6 +606,21 @@ static void multifd_send_terminate_threads(void)
> qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
> }
> }
> +
> + /*
> + * Finally recycle all the threads.
> + *
> + * TODO: p->running is still buggy, e.g. we can reach here without the
> + * corresponding multifd_new_send_channel_async() get invoked yet,
> + * then a new thread can even be created after this function returns.
> + */
Series on the list:
https://lore.kernel.org/r/20240202191128.1901-1-farosas@suse.de
> + for (i = 0; i < migrate_multifd_channels(); i++) {
> + MultiFDSendParams *p = &multifd_send_state->params[i];
> +
> + if (p->running) {
> + qemu_thread_join(&p->thread);
> + }
> + }
> }
>
> static int multifd_send_channel_destroy(QIOChannel *send)
> @@ -608,6 +628,41 @@ static int multifd_send_channel_destroy(QIOChannel *send)
> return socket_send_channel_destroy(send);
> }
>
> +static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
> +{
> + if (p->registered_yank) {
> + migration_ioc_unregister_yank(p->c);
> + }
> + multifd_send_channel_destroy(p->c);
> + p->c = NULL;
> + qemu_mutex_destroy(&p->mutex);
> + qemu_sem_destroy(&p->sem);
> + qemu_sem_destroy(&p->sem_sync);
> + g_free(p->name);
> + p->name = NULL;
> + multifd_pages_clear(p->pages);
> + p->pages = NULL;
> + p->packet_len = 0;
> + g_free(p->packet);
> + p->packet = NULL;
> + g_free(p->iov);
> + p->iov = NULL;
> + multifd_send_state->ops->send_cleanup(p, errp);
> +
> + return *errp == NULL;
I think technically this would require the ERRP_GUARD() macro?
> +}
> +
> +static void multifd_send_cleanup_state(void)
> +{
> + qemu_sem_destroy(&multifd_send_state->channels_ready);
> + g_free(multifd_send_state->params);
> + multifd_send_state->params = NULL;
> + multifd_pages_clear(multifd_send_state->pages);
> + multifd_send_state->pages = NULL;
> + g_free(multifd_send_state);
> + multifd_send_state = NULL;
> +}
> +
> void multifd_save_cleanup(void)
> {
> int i;
> @@ -615,48 +670,20 @@ void multifd_save_cleanup(void)
> if (!migrate_multifd()) {
> return;
> }
> +
> multifd_send_terminate_threads();
> - for (i = 0; i < migrate_multifd_channels(); i++) {
> - MultiFDSendParams *p = &multifd_send_state->params[i];
>
> - if (p->running) {
> - qemu_thread_join(&p->thread);
> - }
> - }
> for (i = 0; i < migrate_multifd_channels(); i++) {
> MultiFDSendParams *p = &multifd_send_state->params[i];
> Error *local_err = NULL;
>
> - if (p->registered_yank) {
> - migration_ioc_unregister_yank(p->c);
> - }
> - multifd_send_channel_destroy(p->c);
> - p->c = NULL;
> - qemu_mutex_destroy(&p->mutex);
> - qemu_sem_destroy(&p->sem);
> - qemu_sem_destroy(&p->sem_sync);
> - g_free(p->name);
> - p->name = NULL;
> - multifd_pages_clear(p->pages);
> - p->pages = NULL;
> - p->packet_len = 0;
> - g_free(p->packet);
> - p->packet = NULL;
> - g_free(p->iov);
> - p->iov = NULL;
> - multifd_send_state->ops->send_cleanup(p, &local_err);
> - if (local_err) {
> + if (!multifd_send_cleanup_channel(p, &local_err)) {
> migrate_set_error(migrate_get_current(), local_err);
> error_free(local_err);
> }
> }
> - qemu_sem_destroy(&multifd_send_state->channels_ready);
> - g_free(multifd_send_state->params);
> - multifd_send_state->params = NULL;
> - multifd_pages_clear(multifd_send_state->pages);
> - multifd_send_state->pages = NULL;
> - g_free(multifd_send_state);
> - multifd_send_state = NULL;
> +
> + multifd_send_cleanup_state();
> }
>
> static int multifd_zero_copy_flush(QIOChannel *c)
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 20/23] migration/multifd: Cleanup multifd_load_cleanup()
2024-02-02 10:28 ` [PATCH v2 20/23] migration/multifd: Cleanup multifd_load_cleanup() peterx
@ 2024-02-02 20:55 ` Fabiano Rosas
0 siblings, 0 replies; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-02 20:55 UTC (permalink / raw)
To: peterx, qemu-devel
Cc: Hao Xiang, Bryan Zhang, peterx, Avihai Horon, Yuan Liu,
Prasad Pandit
peterx@redhat.com writes:
> From: Peter Xu <peterx@redhat.com>
>
> Use similar logic to cleanup the recv side.
>
> Note that multifd_recv_terminate_threads() may need some similar rework
> like the sender side, but let's leave that for later.
>
> Signed-off-by: Peter Xu <peterx@redhat.com>
Reviewed-by: Fabiano Rosas <farosas@suse.de>
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 21/23] migration/multifd: Stick with send/recv on function names
2024-02-02 10:28 ` [PATCH v2 21/23] migration/multifd: Stick with send/recv on function names peterx
@ 2024-02-02 21:03 ` Fabiano Rosas
0 siblings, 0 replies; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-02 21:03 UTC (permalink / raw)
To: peterx, qemu-devel
Cc: Hao Xiang, Bryan Zhang, peterx, Avihai Horon, Yuan Liu,
Prasad Pandit
peterx@redhat.com writes:
> From: Peter Xu <peterx@redhat.com>
>
> Most of the multifd code uses send/recv to represent the two sides, but
> some rare cases use save/load.
>
> Since send/recv is the majority, replacing the save/load use cases to use
> send/recv globally. Now we reach a consensus on the naming.
>
> Signed-off-by: Peter Xu <peterx@redhat.com>
Good!
Reviewed-by: Fabiano Rosas <farosas@suse.de>
> ---
> migration/multifd.h | 10 +++++-----
> migration/migration.c | 12 ++++++------
> migration/multifd.c | 10 +++++-----
> 3 files changed, 16 insertions(+), 16 deletions(-)
>
> diff --git a/migration/multifd.h b/migration/multifd.h
> index a320c53a6f..9b40a53cb6 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -13,11 +13,11 @@
> #ifndef QEMU_MIGRATION_MULTIFD_H
> #define QEMU_MIGRATION_MULTIFD_H
>
> -int multifd_save_setup(Error **errp);
> -void multifd_save_cleanup(void);
> -int multifd_load_setup(Error **errp);
> -void multifd_load_cleanup(void);
> -void multifd_load_shutdown(void);
> +int multifd_send_setup(Error **errp);
> +void multifd_send_shutdown(void);
> +int multifd_recv_setup(Error **errp);
> +void multifd_recv_cleanup(void);
> +void multifd_recv_shutdown(void);
> bool multifd_recv_all_channels_created(void);
> void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
> void multifd_recv_sync_main(void);
> diff --git a/migration/migration.c b/migration/migration.c
> index d5f705ceef..ba99772e76 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -311,7 +311,7 @@ void migration_incoming_state_destroy(void)
> {
> struct MigrationIncomingState *mis = migration_incoming_get_current();
>
> - multifd_load_cleanup();
> + multifd_recv_cleanup();
> compress_threads_load_cleanup();
>
> if (mis->to_src_file) {
> @@ -662,7 +662,7 @@ static void process_incoming_migration_bh(void *opaque)
>
> trace_vmstate_downtime_checkpoint("dst-precopy-bh-announced");
>
> - multifd_load_shutdown();
> + multifd_recv_shutdown();
>
> dirty_bitmap_mig_before_vm_start();
>
> @@ -759,7 +759,7 @@ fail:
> MIGRATION_STATUS_FAILED);
> qemu_fclose(mis->from_src_file);
>
> - multifd_load_cleanup();
> + multifd_recv_cleanup();
> compress_threads_load_cleanup();
>
> exit(EXIT_FAILURE);
> @@ -885,7 +885,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
> default_channel = !mis->from_src_file;
> }
>
> - if (multifd_load_setup(errp) != 0) {
> + if (multifd_recv_setup(errp) != 0) {
> return;
> }
>
> @@ -1331,7 +1331,7 @@ static void migrate_fd_cleanup(MigrationState *s)
> }
> bql_lock();
>
> - multifd_save_cleanup();
> + multifd_send_shutdown();
> qemu_mutex_lock(&s->qemu_file_lock);
> tmp = s->to_dst_file;
> s->to_dst_file = NULL;
> @@ -3623,7 +3623,7 @@ void migrate_fd_connect(MigrationState *s, Error *error_in)
> return;
> }
>
> - if (multifd_save_setup(&local_err) != 0) {
> + if (multifd_send_setup(&local_err) != 0) {
> migrate_set_error(s, local_err);
> error_report_err(local_err);
> migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
> diff --git a/migration/multifd.c b/migration/multifd.c
> index e2dd2f6e04..130f86a1fb 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -663,7 +663,7 @@ static void multifd_send_cleanup_state(void)
> multifd_send_state = NULL;
> }
>
> -void multifd_save_cleanup(void)
> +void multifd_send_shutdown(void)
> {
> int i;
>
> @@ -965,7 +965,7 @@ static void multifd_new_send_channel_create(gpointer opaque)
> socket_send_channel_create(multifd_new_send_channel_async, opaque);
> }
>
> -int multifd_save_setup(Error **errp)
> +int multifd_send_setup(Error **errp)
> {
> int thread_count;
> uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> @@ -1063,7 +1063,7 @@ static void multifd_recv_terminate_threads(Error *err)
> }
> }
>
> -void multifd_load_shutdown(void)
> +void multifd_recv_shutdown(void)
> {
> if (migrate_multifd()) {
> multifd_recv_terminate_threads(NULL);
> @@ -1098,7 +1098,7 @@ static void multifd_recv_cleanup_state(void)
> multifd_recv_state = NULL;
> }
>
> -void multifd_load_cleanup(void)
> +void multifd_recv_cleanup(void)
> {
> int i;
>
> @@ -1213,7 +1213,7 @@ static void *multifd_recv_thread(void *opaque)
> return NULL;
> }
>
> -int multifd_load_setup(Error **errp)
> +int multifd_recv_setup(Error **errp)
> {
> int thread_count;
> uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 22/23] migration/multifd: Fix MultiFDSendParams.packet_num race
2024-02-02 10:28 ` [PATCH v2 22/23] migration/multifd: Fix MultiFDSendParams.packet_num race peterx
@ 2024-02-02 21:08 ` Fabiano Rosas
2024-02-05 4:05 ` Peter Xu
0 siblings, 1 reply; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-02 21:08 UTC (permalink / raw)
To: peterx, qemu-devel
Cc: Hao Xiang, Bryan Zhang, peterx, Avihai Horon, Yuan Liu,
Prasad Pandit, Elena Ufimtseva
peterx@redhat.com writes:
> From: Peter Xu <peterx@redhat.com>
>
> As reported correctly by Fabiano [1], MultiFDSendParams.packet_num is buggy
> to be assigned and stored. Consider two consequent operations of: (1)
> queue a job into multifd send thread X, then (2) queue another sync request
> to the same send thread X. Then the MultiFDSendParams.packet_num will be
> assigned twice, and the first assignment can get lost already.
>
> To avoid that, we move the packet_num assignment from p->packet_num into
> where the thread will fill in the packet. Use atomic operations to protect
> the field, making sure there's no race.
>
> Note that atomic fetch_add() may not be good for scaling purposes, however
> multifd should be fine as number of threads should normally not go beyond
> 16 threads. Let's leave that concern for later but fix the issue first.
>
> There's also a trick on how to make it always work even on 32 bit hosts for
> uint64_t packet number. Switching to uintptr_t as of now to simply the
> case. It will cause packet number to overflow easier on 32 bit, but that
> shouldn't be a major concern for now as 32 bit systems is not the major
> audience for any performance concerns like what multifd wants to address.
>
> We also need to move multifd_send_state definition upper, so that
> multifd_send_fill_packet() can reference it.
>
> [1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de
>
> Reported-by: Fabiano Rosas <farosas@suse.de>
> Signed-off-by: Peter Xu <peterx@redhat.com>
Elena had reported this in October already.
Reported-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
Reviewed-by: Fabiano Rosas <farosas@suse.de>
> ---
> migration/multifd.h | 2 --
> migration/multifd.c | 56 +++++++++++++++++++++++++++------------------
> 2 files changed, 34 insertions(+), 24 deletions(-)
>
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 9b40a53cb6..98876ff94a 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -97,8 +97,6 @@ typedef struct {
> bool running;
> /* multifd flags for each packet */
> uint32_t flags;
> - /* global number of generated multifd packets */
> - uint64_t packet_num;
> /*
> * The sender thread has work to do if either of below boolean is set.
> *
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 130f86a1fb..b317d57d61 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -45,6 +45,35 @@ typedef struct {
> uint64_t unused2[4]; /* Reserved for future use */
> } __attribute__((packed)) MultiFDInit_t;
>
> +struct {
> + MultiFDSendParams *params;
> + /* array of pages to sent */
> + MultiFDPages_t *pages;
> + /*
> + * Global number of generated multifd packets.
> + *
> + * Note that we used 'uintptr_t' because it'll naturally support atomic
> + * operations on both 32bit / 64 bits hosts. It means on 32bit systems
> + * multifd will overflow the packet_num easier, but that should be
> + * fine.
> + *
> + * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
> + * hosts, however so far it does not support atomic fetch_add() yet.
> + * Make it easy for now.
> + */
> + uintptr_t packet_num;
> + /* send channels ready */
> + QemuSemaphore channels_ready;
> + /*
> + * Have we already run terminate threads. There is a race when it
> + * happens that we got one error while we are exiting.
> + * We will use atomic operations. Only valid values are 0 and 1.
> + */
> + int exiting;
> + /* multifd ops */
> + MultiFDMethods *ops;
> +} *multifd_send_state;
> +
> /* Multifd without compression */
>
> /**
> @@ -292,13 +321,16 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
> {
> MultiFDPacket_t *packet = p->packet;
> MultiFDPages_t *pages = p->pages;
> + uint64_t packet_num;
> int i;
>
> packet->flags = cpu_to_be32(p->flags);
> packet->pages_alloc = cpu_to_be32(p->pages->allocated);
> packet->normal_pages = cpu_to_be32(pages->num);
> packet->next_packet_size = cpu_to_be32(p->next_packet_size);
> - packet->packet_num = cpu_to_be64(p->packet_num);
> +
> + packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
> + packet->packet_num = cpu_to_be64(packet_num);
>
> if (pages->block) {
> strncpy(packet->ramblock, pages->block->idstr, 256);
> @@ -314,7 +346,7 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
> p->packets_sent++;
> p->total_normal_pages += pages->num;
>
> - trace_multifd_send(p->id, p->packet_num, pages->num, p->flags,
> + trace_multifd_send(p->id, packet_num, pages->num, p->flags,
> p->next_packet_size);
> }
>
> @@ -398,24 +430,6 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
> return 0;
> }
>
> -struct {
> - MultiFDSendParams *params;
> - /* array of pages to sent */
> - MultiFDPages_t *pages;
> - /* global number of generated multifd packets */
> - uint64_t packet_num;
> - /* send channels ready */
> - QemuSemaphore channels_ready;
> - /*
> - * Have we already run terminate threads. There is a race when it
> - * happens that we got one error while we are exiting.
> - * We will use atomic operations. Only valid values are 0 and 1.
> - */
> - int exiting;
> - /* multifd ops */
> - MultiFDMethods *ops;
> -} *multifd_send_state;
> -
> static bool multifd_send_should_exit(void)
> {
> return qatomic_read(&multifd_send_state->exiting);
> @@ -497,7 +511,6 @@ static bool multifd_send_pages(void)
> */
> assert(qatomic_read(&p->pending_job) == false);
> qatomic_set(&p->pending_job, true);
> - p->packet_num = multifd_send_state->packet_num++;
> multifd_send_state->pages = p->pages;
> p->pages = pages;
> qemu_mutex_unlock(&p->mutex);
> @@ -730,7 +743,6 @@ int multifd_send_sync_main(void)
> trace_multifd_send_sync_main_signal(p->id);
>
> qemu_mutex_lock(&p->mutex);
> - p->packet_num = multifd_send_state->packet_num++;
> /*
> * We should be the only user so far, so not possible to be set by
> * others concurrently.
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 23/23] migration/multifd: Optimize sender side to be lockless
2024-02-02 10:28 ` [PATCH v2 23/23] migration/multifd: Optimize sender side to be lockless peterx
@ 2024-02-02 21:34 ` Fabiano Rosas
2024-02-05 4:35 ` Peter Xu
0 siblings, 1 reply; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-02 21:34 UTC (permalink / raw)
To: peterx, qemu-devel
Cc: Hao Xiang, Bryan Zhang, peterx, Avihai Horon, Yuan Liu,
Prasad Pandit
peterx@redhat.com writes:
> From: Peter Xu <peterx@redhat.com>
>
> When reviewing my attempt to refactor send_prepare(), Fabiano suggested we
> try out with dropping the mutex in multifd code [1].
>
> I thought about that before but I never tried to change the code. Now
> maybe it's time to give it a stab. This only optimizes the sender side.
>
> The trick here is multifd has a clear provider/consumer model, that the
> migration main thread publishes requests (either pending_job/pending_sync),
> while the multifd sender threads are consumers. Here we don't have a lot
> of comlicated data sharing, and the jobs can logically be submitted lockless.
complicated
>
> Arm the code with atomic weapons. Two things worth mentioning:
>
> - For multifd_send_pages(): we can use qatomic_load_acquire() when trying
> to find a free channel, but that's expensive if we attach one ACQUIRE per
> channel. Instead, make it atomic_read() on the pending_job flag, but
s/make it/keep it/
The diff doesn't show the atomic_read already there so it's confusing.
> merge the ACQUIRE into one single smp_mb_acquire() later.
>
> - For pending_sync: it doesn't have any extra data required since now
> p->flags are never touched, it should be safe to not use memory barrier.
> That's different from pending_sync.
pending_job?
>
> Provide rich comments for all the lockless operations to state how they are
> paired. With that, we can remove the mutex.
>
> [1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de
>
> Suggested-by: Fabiano Rosas <farosas@suse.de>
> Signed-off-by: Peter Xu <peterx@redhat.com>
> ---
> migration/multifd.h | 2 --
> migration/multifd.c | 51 +++++++++++++++++++++++----------------------
> 2 files changed, 26 insertions(+), 27 deletions(-)
>
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 98876ff94a..78a2317263 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -91,8 +91,6 @@ typedef struct {
> /* syncs main thread and channels */
> QemuSemaphore sem_sync;
>
> - /* this mutex protects the following parameters */
> - QemuMutex mutex;
> /* is this channel thread running */
> bool running;
> /* multifd flags for each packet */
> diff --git a/migration/multifd.c b/migration/multifd.c
> index b317d57d61..ef13e2e781 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -501,19 +501,19 @@ static bool multifd_send_pages(void)
> }
> }
>
> - qemu_mutex_lock(&p->mutex);
> - assert(!p->pages->num);
> - assert(!p->pages->block);
> /*
> - * Double check on pending_job==false with the lock. In the future if
> - * we can have >1 requester thread, we can replace this with a "goto
> - * retry", but that is for later.
> + * Make sure we read p->pending_job before all the rest. Pairs with
> + * qatomic_store_release() in multifd_send_thread().
> */
> - assert(qatomic_read(&p->pending_job) == false);
> - qatomic_set(&p->pending_job, true);
> + smp_mb_acquire();
> + assert(!p->pages->num);
> multifd_send_state->pages = p->pages;
> p->pages = pages;
> - qemu_mutex_unlock(&p->mutex);
> + /*
> + * Making sure p->pages is setup before marking pending_job=true. Pairs
> + * with the qatomic_load_acquire() in multifd_send_thread().
> + */
> + qatomic_store_release(&p->pending_job, true);
> qemu_sem_post(&p->sem);
>
> return true;
> @@ -648,7 +648,6 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
> }
> multifd_send_channel_destroy(p->c);
> p->c = NULL;
> - qemu_mutex_destroy(&p->mutex);
> qemu_sem_destroy(&p->sem);
> qemu_sem_destroy(&p->sem_sync);
> g_free(p->name);
> @@ -742,14 +741,12 @@ int multifd_send_sync_main(void)
>
> trace_multifd_send_sync_main_signal(p->id);
>
> - qemu_mutex_lock(&p->mutex);
> /*
> * We should be the only user so far, so not possible to be set by
> * others concurrently.
> */
> assert(qatomic_read(&p->pending_sync) == false);
> qatomic_set(&p->pending_sync, true);
> - qemu_mutex_unlock(&p->mutex);
> qemu_sem_post(&p->sem);
> }
> for (i = 0; i < migrate_multifd_channels(); i++) {
> @@ -796,9 +793,12 @@ static void *multifd_send_thread(void *opaque)
> if (multifd_send_should_exit()) {
> break;
> }
> - qemu_mutex_lock(&p->mutex);
>
> - if (qatomic_read(&p->pending_job)) {
> + /*
> + * Read pending_job flag before p->pages. Pairs with the
> + * qatomic_store_release() in multifd_send_pages().
> + */
> + if (qatomic_load_acquire(&p->pending_job)) {
> MultiFDPages_t *pages = p->pages;
>
> p->iovs_num = 0;
> @@ -806,14 +806,12 @@ static void *multifd_send_thread(void *opaque)
>
> ret = multifd_send_state->ops->send_prepare(p, &local_err);
> if (ret != 0) {
> - qemu_mutex_unlock(&p->mutex);
> break;
> }
>
> ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
> 0, p->write_flags, &local_err);
> if (ret != 0) {
> - qemu_mutex_unlock(&p->mutex);
> break;
> }
>
> @@ -822,24 +820,31 @@ static void *multifd_send_thread(void *opaque)
>
> multifd_pages_reset(p->pages);
> p->next_packet_size = 0;
> - qatomic_set(&p->pending_job, false);
> - qemu_mutex_unlock(&p->mutex);
> +
> + /*
> + * Making sure p->pages is published before saying "we're
> + * free". Pairs with the qatomic_load_acquire() in
smp_mb_acquire()
> + * multifd_send_pages().
> + */
> + qatomic_store_release(&p->pending_job, false);
> } else {
> - /* If not a normal job, must be a sync request */
> + /*
> + * If not a normal job, must be a sync request. Note that
> + * pending_sync is a standalone flag (unlike pending_job), so
> + * it doesn't require explicit memory barriers.
> + */
> assert(qatomic_read(&p->pending_sync));
> p->flags = MULTIFD_FLAG_SYNC;
> multifd_send_fill_packet(p);
> ret = qio_channel_write_all(p->c, (void *)p->packet,
> p->packet_len, &local_err);
> if (ret != 0) {
> - qemu_mutex_unlock(&p->mutex);
> break;
> }
> /* p->next_packet_size will always be zero for a SYNC packet */
> stat64_add(&mig_stats.multifd_bytes, p->packet_len);
> p->flags = 0;
> qatomic_set(&p->pending_sync, false);
> - qemu_mutex_unlock(&p->mutex);
> qemu_sem_post(&p->sem_sync);
> }
> }
> @@ -853,10 +858,7 @@ out:
> error_free(local_err);
> }
>
> - qemu_mutex_lock(&p->mutex);
> p->running = false;
> - qemu_mutex_unlock(&p->mutex);
> -
> rcu_unregister_thread();
> migration_threads_remove(thread);
> trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);
> @@ -998,7 +1000,6 @@ int multifd_send_setup(Error **errp)
> for (i = 0; i < thread_count; i++) {
> MultiFDSendParams *p = &multifd_send_state->params[i];
>
> - qemu_mutex_init(&p->mutex);
> qemu_sem_init(&p->sem, 0);
> qemu_sem_init(&p->sem_sync, 0);
> p->id = i;
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 18/23] migration/multifd: Rewrite multifd_queue_page()
2024-02-02 20:47 ` Fabiano Rosas
@ 2024-02-05 4:03 ` Peter Xu
0 siblings, 0 replies; 49+ messages in thread
From: Peter Xu @ 2024-02-05 4:03 UTC (permalink / raw)
To: Fabiano Rosas
Cc: qemu-devel, Hao Xiang, Bryan Zhang, Avihai Horon, Yuan Liu,
Prasad Pandit
On Fri, Feb 02, 2024 at 05:47:05PM -0300, Fabiano Rosas wrote:
> peterx@redhat.com writes:
>
> > From: Peter Xu <peterx@redhat.com>
> >
> > The current multifd_queue_page() is not easy to read and follow. It is not
> > good with a few reasons:
> >
> > - No helper at all to show what exactly does a condition mean; in short,
> > readability is low.
> >
> > - Rely on pages->ramblock being cleared to detect an empty queue. It's
> > slightly an overload of the ramblock pointer, per Fabiano [1], which I
> > also agree.
> >
> > - Contains a self recursion, even if not necessary..
> >
> > Rewrite this function. We add some comments to make it even clearer on
> > what it does.
> >
> > [1] https://lore.kernel.org/r/87wmrpjzew.fsf@suse.de
> >
> > Signed-off-by: Peter Xu <peterx@redhat.com>
>
> Reviewed-by: Fabiano Rosas <farosas@suse.de>
>
> Patch looks good, but I have a question below.
>
> > ---
> > migration/multifd.c | 56 ++++++++++++++++++++++++++++++---------------
> > 1 file changed, 37 insertions(+), 19 deletions(-)
> >
> > diff --git a/migration/multifd.c b/migration/multifd.c
> > index 35d4e8ad1f..4ab8e6eff2 100644
> > --- a/migration/multifd.c
> > +++ b/migration/multifd.c
> > @@ -506,35 +506,53 @@ static bool multifd_send_pages(void)
> > return true;
> > }
> >
> > +static inline bool multifd_queue_empty(MultiFDPages_t *pages)
> > +{
> > + return pages->num == 0;
> > +}
> > +
> > +static inline bool multifd_queue_full(MultiFDPages_t *pages)
> > +{
> > + return pages->num == pages->allocated;
> > +}
> > +
> > +static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
> > +{
> > + pages->offset[pages->num++] = offset;
> > +}
> > +
> > /* Returns true if enqueue successful, false otherwise */
> > bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
> > {
> > - MultiFDPages_t *pages = multifd_send_state->pages;
> > - bool changed = false;
> > + MultiFDPages_t *pages;
> > +
> > +retry:
> > + pages = multifd_send_state->pages;
> >
> > - if (!pages->block) {
> > + /* If the queue is empty, we can already enqueue now */
> > + if (multifd_queue_empty(pages)) {
> > pages->block = block;
> > + multifd_enqueue(pages, offset);
> > + return true;
> > }
> >
> > - if (pages->block == block) {
> > - pages->offset[pages->num] = offset;
> > - pages->num++;
> > -
> > - if (pages->num < pages->allocated) {
> > - return true;
> > + /*
> > + * Not empty, meanwhile we need a flush. It can because of either:
> > + *
> > + * (1) The page is not on the same ramblock of previous ones, or,
> > + * (2) The queue is full.
> > + *
> > + * After flush, always retry.
> > + */
> > + if (pages->block != block || multifd_queue_full(pages)) {
> > + if (!multifd_send_pages()) {
> > + return false;
> > }
> > - } else {
> > - changed = true;
> > - }
> > -
> > - if (!multifd_send_pages()) {
> > - return false;
> > - }
> > -
> > - if (changed) {
> > - return multifd_queue_page(block, offset);
> > + goto retry;
> > }
> >
> > + /* Not empty, and we still have space, do it! */
> > + multifd_enqueue(pages, offset);
>
> Hm, here you're missing the flush of the last group of pages of the last
> ramblock. Just like current code...
>
> ...which means we're relying on the multifd_send_pages() at
> multifd_send_sync_main() to send the last few pages. So how can that
> work when multifd_flush_after_each_section==false? Because it skips the
> sync flag, but would also skip the last send. I'm confused.
IIUC it won't skip the final flush of the last pages. See
find_dirty_block():
if (migrate_multifd() &&
!migrate_multifd_flush_after_each_section()) {
QEMUFile *f = rs->pss[RAM_CHANNEL_PRECOPY].pss_channel;
int ret = multifd_send_sync_main();
if (ret < 0) {
return ret;
}
qemu_put_be64(f, RAM_SAVE_FLAG_MULTIFD_FLUSH);
qemu_fflush(f);
}
IMHO this should be the last flush of the pages when we loop one more
round.
Maybe what you're talking about this one (of ram_save_complete())?
if (migrate_multifd() && !migrate_multifd_flush_after_each_section()) {
qemu_put_be64(f, RAM_SAVE_FLAG_MULTIFD_FLUSH);
}
I remember we talked about this somewhere in your "file" series,
but.. AFAIU this last RAM_SAVE_FLAG_MULTIFD_FLUSH might be redundant, it
just needs some justifications to double check I didn't miss something.
Now multifd_queue_page() is kind of lazy-mode on flushing, I think that may
make some sense (we assign job unless required, so maybe there's higher
chance that one thread is free?), but I'm not sure whether that's a huge
deal if NIC is the bandwidth, because in that case we'll wait for sender
threads anyway, and they should all be busy at any time.
However even if we flush immediately as long as full, we'd still better
check queue is empty before completion of migration for sure, to make sure
nothing is left.
--
Peter Xu
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 22/23] migration/multifd: Fix MultiFDSendParams.packet_num race
2024-02-02 21:08 ` Fabiano Rosas
@ 2024-02-05 4:05 ` Peter Xu
0 siblings, 0 replies; 49+ messages in thread
From: Peter Xu @ 2024-02-05 4:05 UTC (permalink / raw)
To: Fabiano Rosas
Cc: qemu-devel, Hao Xiang, Bryan Zhang, Avihai Horon, Yuan Liu,
Prasad Pandit, Elena Ufimtseva
On Fri, Feb 02, 2024 at 06:08:22PM -0300, Fabiano Rosas wrote:
> peterx@redhat.com writes:
>
> > From: Peter Xu <peterx@redhat.com>
> >
> > As reported correctly by Fabiano [1], MultiFDSendParams.packet_num is buggy
> > to be assigned and stored. Consider two consequent operations of: (1)
> > queue a job into multifd send thread X, then (2) queue another sync request
> > to the same send thread X. Then the MultiFDSendParams.packet_num will be
> > assigned twice, and the first assignment can get lost already.
> >
> > To avoid that, we move the packet_num assignment from p->packet_num into
> > where the thread will fill in the packet. Use atomic operations to protect
> > the field, making sure there's no race.
> >
> > Note that atomic fetch_add() may not be good for scaling purposes, however
> > multifd should be fine as number of threads should normally not go beyond
> > 16 threads. Let's leave that concern for later but fix the issue first.
> >
> > There's also a trick on how to make it always work even on 32 bit hosts for
> > uint64_t packet number. Switching to uintptr_t as of now to simply the
> > case. It will cause packet number to overflow easier on 32 bit, but that
> > shouldn't be a major concern for now as 32 bit systems is not the major
> > audience for any performance concerns like what multifd wants to address.
> >
> > We also need to move multifd_send_state definition upper, so that
> > multifd_send_fill_packet() can reference it.
> >
> > [1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de
> >
> > Reported-by: Fabiano Rosas <farosas@suse.de>
> > Signed-off-by: Peter Xu <peterx@redhat.com>
>
> Elena had reported this in October already.
>
> Reported-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
Ah, I'll do the replacement.
> Reviewed-by: Fabiano Rosas <farosas@suse.de>
Thanks,
--
Peter Xu
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 19/23] migration/multifd: Cleanup multifd_save_cleanup()
2024-02-02 20:54 ` Fabiano Rosas
@ 2024-02-05 4:25 ` Peter Xu
0 siblings, 0 replies; 49+ messages in thread
From: Peter Xu @ 2024-02-05 4:25 UTC (permalink / raw)
To: Fabiano Rosas
Cc: qemu-devel, Hao Xiang, Bryan Zhang, Avihai Horon, Yuan Liu,
Prasad Pandit
On Fri, Feb 02, 2024 at 05:54:23PM -0300, Fabiano Rosas wrote:
> peterx@redhat.com writes:
>
> > From: Peter Xu <peterx@redhat.com>
> >
> > Shrink the function by moving relevant works into helpers: move the thread
> > join()s into multifd_send_terminate_threads(), then create two more helpers
> > to cover channel/state cleanups.
> >
> > Add a TODO entry for the thread terminate process because p->running is
> > still buggy. We need to fix it at some point but not yet covered.
> >
> > Suggested-by: Fabiano Rosas <farosas@suse.de>
> > Signed-off-by: Peter Xu <peterx@redhat.com>
>
> Reviewed-by: Fabiano Rosas <farosas@suse.de>
>
> minor comment below
>
> > ---
> > migration/multifd.c | 91 +++++++++++++++++++++++++++++----------------
> > 1 file changed, 59 insertions(+), 32 deletions(-)
> >
> > diff --git a/migration/multifd.c b/migration/multifd.c
> > index 4ab8e6eff2..4cb0d2cc17 100644
> > --- a/migration/multifd.c
> > +++ b/migration/multifd.c
> > @@ -593,6 +593,11 @@ static void multifd_send_terminate_threads(void)
> > * always set it.
> > */
> > qatomic_set(&multifd_send_state->exiting, 1);
> > +
> > + /*
> > + * Firstly, kick all threads out; no matter whether they are just idle,
> > + * or blocked in an IO system call.
> > + */
> > for (i = 0; i < migrate_multifd_channels(); i++) {
> > MultiFDSendParams *p = &multifd_send_state->params[i];
> >
> > @@ -601,6 +606,21 @@ static void multifd_send_terminate_threads(void)
> > qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
> > }
> > }
> > +
> > + /*
> > + * Finally recycle all the threads.
> > + *
> > + * TODO: p->running is still buggy, e.g. we can reach here without the
> > + * corresponding multifd_new_send_channel_async() get invoked yet,
> > + * then a new thread can even be created after this function returns.
> > + */
>
> Series on the list:
>
> https://lore.kernel.org/r/20240202191128.1901-1-farosas@suse.de
Thanks a lot. I'll read it later today.
>
> > + for (i = 0; i < migrate_multifd_channels(); i++) {
> > + MultiFDSendParams *p = &multifd_send_state->params[i];
> > +
> > + if (p->running) {
> > + qemu_thread_join(&p->thread);
> > + }
> > + }
> > }
> >
> > static int multifd_send_channel_destroy(QIOChannel *send)
> > @@ -608,6 +628,41 @@ static int multifd_send_channel_destroy(QIOChannel *send)
> > return socket_send_channel_destroy(send);
> > }
> >
> > +static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
> > +{
> > + if (p->registered_yank) {
> > + migration_ioc_unregister_yank(p->c);
> > + }
> > + multifd_send_channel_destroy(p->c);
> > + p->c = NULL;
> > + qemu_mutex_destroy(&p->mutex);
> > + qemu_sem_destroy(&p->sem);
> > + qemu_sem_destroy(&p->sem_sync);
> > + g_free(p->name);
> > + p->name = NULL;
> > + multifd_pages_clear(p->pages);
> > + p->pages = NULL;
> > + p->packet_len = 0;
> > + g_free(p->packet);
> > + p->packet = NULL;
> > + g_free(p->iov);
> > + p->iov = NULL;
> > + multifd_send_state->ops->send_cleanup(p, errp);
> > +
> > + return *errp == NULL;
>
> I think technically this would require the ERRP_GUARD() macro?
I normally only use ERRP_GUARD() if there can be any caller passing in
NULL, or when I am not sure it's always !NULL.
What I wanted to add here is actually assert(errp), but then I noticed
*errp==NULL plays the same role as that, because if errp==NULL, it'll crash
here when dereferencing, so it actually has an implicit assert(errp);
exactly what I wanted, but even one line less (even if not obvious).
--
Peter Xu
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 23/23] migration/multifd: Optimize sender side to be lockless
2024-02-02 21:34 ` Fabiano Rosas
@ 2024-02-05 4:35 ` Peter Xu
2024-02-05 14:10 ` Fabiano Rosas
0 siblings, 1 reply; 49+ messages in thread
From: Peter Xu @ 2024-02-05 4:35 UTC (permalink / raw)
To: Fabiano Rosas
Cc: qemu-devel, Hao Xiang, Bryan Zhang, Avihai Horon, Yuan Liu,
Prasad Pandit
On Fri, Feb 02, 2024 at 06:34:08PM -0300, Fabiano Rosas wrote:
> peterx@redhat.com writes:
>
> > From: Peter Xu <peterx@redhat.com>
> >
> > When reviewing my attempt to refactor send_prepare(), Fabiano suggested we
> > try out with dropping the mutex in multifd code [1].
> >
> > I thought about that before but I never tried to change the code. Now
> > maybe it's time to give it a stab. This only optimizes the sender side.
> >
> > The trick here is multifd has a clear provider/consumer model, that the
> > migration main thread publishes requests (either pending_job/pending_sync),
> > while the multifd sender threads are consumers. Here we don't have a lot
> > of comlicated data sharing, and the jobs can logically be submitted lockless.
>
> complicated
>
> >
> > Arm the code with atomic weapons. Two things worth mentioning:
> >
> > - For multifd_send_pages(): we can use qatomic_load_acquire() when trying
> > to find a free channel, but that's expensive if we attach one ACQUIRE per
> > channel. Instead, make it atomic_read() on the pending_job flag, but
>
> s/make it/keep it/
>
> The diff doesn't show the atomic_read already there so it's confusing.
Right. I also has a trivial typo on s/atomic_read/qatomic_read/..
I tried to rephrase the last sentence:
- For multifd_send_pages(): we can use qatomic_load_acquire() when trying
to find a free channel, but that's expensive if we attach one ACQUIRE per
channel. Instead, keep the qatomic_read() on reading the pending_job
flag as we do already, meanwhile use one smp_mb_acquire() after the loop
to guarantee the memory ordering.
Maybe slightly clearer?
>
> > merge the ACQUIRE into one single smp_mb_acquire() later.
> >
> > - For pending_sync: it doesn't have any extra data required since now
> > p->flags are never touched, it should be safe to not use memory barrier.
> > That's different from pending_sync.
>
> pending_job?
Yep, all the rest fixed.
>
> >
> > Provide rich comments for all the lockless operations to state how they are
> > paired. With that, we can remove the mutex.
> >
> > [1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de
> >
> > Suggested-by: Fabiano Rosas <farosas@suse.de>
> > Signed-off-by: Peter Xu <peterx@redhat.com>
> > ---
> > migration/multifd.h | 2 --
> > migration/multifd.c | 51 +++++++++++++++++++++++----------------------
> > 2 files changed, 26 insertions(+), 27 deletions(-)
> >
> > diff --git a/migration/multifd.h b/migration/multifd.h
> > index 98876ff94a..78a2317263 100644
> > --- a/migration/multifd.h
> > +++ b/migration/multifd.h
> > @@ -91,8 +91,6 @@ typedef struct {
> > /* syncs main thread and channels */
> > QemuSemaphore sem_sync;
> >
> > - /* this mutex protects the following parameters */
> > - QemuMutex mutex;
> > /* is this channel thread running */
> > bool running;
> > /* multifd flags for each packet */
> > diff --git a/migration/multifd.c b/migration/multifd.c
> > index b317d57d61..ef13e2e781 100644
> > --- a/migration/multifd.c
> > +++ b/migration/multifd.c
> > @@ -501,19 +501,19 @@ static bool multifd_send_pages(void)
> > }
> > }
> >
> > - qemu_mutex_lock(&p->mutex);
> > - assert(!p->pages->num);
> > - assert(!p->pages->block);
> > /*
> > - * Double check on pending_job==false with the lock. In the future if
> > - * we can have >1 requester thread, we can replace this with a "goto
> > - * retry", but that is for later.
> > + * Make sure we read p->pending_job before all the rest. Pairs with
> > + * qatomic_store_release() in multifd_send_thread().
> > */
> > - assert(qatomic_read(&p->pending_job) == false);
> > - qatomic_set(&p->pending_job, true);
> > + smp_mb_acquire();
> > + assert(!p->pages->num);
> > multifd_send_state->pages = p->pages;
> > p->pages = pages;
> > - qemu_mutex_unlock(&p->mutex);
> > + /*
> > + * Making sure p->pages is setup before marking pending_job=true. Pairs
> > + * with the qatomic_load_acquire() in multifd_send_thread().
> > + */
> > + qatomic_store_release(&p->pending_job, true);
> > qemu_sem_post(&p->sem);
> >
> > return true;
> > @@ -648,7 +648,6 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
> > }
> > multifd_send_channel_destroy(p->c);
> > p->c = NULL;
> > - qemu_mutex_destroy(&p->mutex);
> > qemu_sem_destroy(&p->sem);
> > qemu_sem_destroy(&p->sem_sync);
> > g_free(p->name);
> > @@ -742,14 +741,12 @@ int multifd_send_sync_main(void)
> >
> > trace_multifd_send_sync_main_signal(p->id);
> >
> > - qemu_mutex_lock(&p->mutex);
> > /*
> > * We should be the only user so far, so not possible to be set by
> > * others concurrently.
> > */
> > assert(qatomic_read(&p->pending_sync) == false);
> > qatomic_set(&p->pending_sync, true);
> > - qemu_mutex_unlock(&p->mutex);
> > qemu_sem_post(&p->sem);
> > }
> > for (i = 0; i < migrate_multifd_channels(); i++) {
> > @@ -796,9 +793,12 @@ static void *multifd_send_thread(void *opaque)
> > if (multifd_send_should_exit()) {
> > break;
> > }
> > - qemu_mutex_lock(&p->mutex);
> >
> > - if (qatomic_read(&p->pending_job)) {
> > + /*
> > + * Read pending_job flag before p->pages. Pairs with the
> > + * qatomic_store_release() in multifd_send_pages().
> > + */
> > + if (qatomic_load_acquire(&p->pending_job)) {
> > MultiFDPages_t *pages = p->pages;
> >
> > p->iovs_num = 0;
> > @@ -806,14 +806,12 @@ static void *multifd_send_thread(void *opaque)
> >
> > ret = multifd_send_state->ops->send_prepare(p, &local_err);
> > if (ret != 0) {
> > - qemu_mutex_unlock(&p->mutex);
> > break;
> > }
> >
> > ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
> > 0, p->write_flags, &local_err);
> > if (ret != 0) {
> > - qemu_mutex_unlock(&p->mutex);
> > break;
> > }
> >
> > @@ -822,24 +820,31 @@ static void *multifd_send_thread(void *opaque)
> >
> > multifd_pages_reset(p->pages);
> > p->next_packet_size = 0;
> > - qatomic_set(&p->pending_job, false);
> > - qemu_mutex_unlock(&p->mutex);
> > +
> > + /*
> > + * Making sure p->pages is published before saying "we're
> > + * free". Pairs with the qatomic_load_acquire() in
>
> smp_mb_acquire()
Fixed.
Any more comment on the code changes before I repost?
(maybe I can repost this single patch in-place to avoid another round of
mail bombs..)
--
Peter Xu
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 23/23] migration/multifd: Optimize sender side to be lockless
2024-02-05 4:35 ` Peter Xu
@ 2024-02-05 14:10 ` Fabiano Rosas
2024-02-05 14:24 ` Peter Xu
0 siblings, 1 reply; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-05 14:10 UTC (permalink / raw)
To: Peter Xu
Cc: qemu-devel, Hao Xiang, Bryan Zhang, Avihai Horon, Yuan Liu,
Prasad Pandit
Peter Xu <peterx@redhat.com> writes:
> On Fri, Feb 02, 2024 at 06:34:08PM -0300, Fabiano Rosas wrote:
>> peterx@redhat.com writes:
>>
>> > From: Peter Xu <peterx@redhat.com>
>> >
>> > When reviewing my attempt to refactor send_prepare(), Fabiano suggested we
>> > try out with dropping the mutex in multifd code [1].
>> >
>> > I thought about that before but I never tried to change the code. Now
>> > maybe it's time to give it a stab. This only optimizes the sender side.
>> >
>> > The trick here is multifd has a clear provider/consumer model, that the
>> > migration main thread publishes requests (either pending_job/pending_sync),
>> > while the multifd sender threads are consumers. Here we don't have a lot
>> > of comlicated data sharing, and the jobs can logically be submitted lockless.
>>
>> complicated
>>
>> >
>> > Arm the code with atomic weapons. Two things worth mentioning:
>> >
>> > - For multifd_send_pages(): we can use qatomic_load_acquire() when trying
>> > to find a free channel, but that's expensive if we attach one ACQUIRE per
>> > channel. Instead, make it atomic_read() on the pending_job flag, but
>>
>> s/make it/keep it/
>>
>> The diff doesn't show the atomic_read already there so it's confusing.
>
> Right. I also has a trivial typo on s/atomic_read/qatomic_read/..
>
> I tried to rephrase the last sentence:
>
> - For multifd_send_pages(): we can use qatomic_load_acquire() when trying
> to find a free channel, but that's expensive if we attach one ACQUIRE per
> channel. Instead, keep the qatomic_read() on reading the pending_job
> flag as we do already, meanwhile use one smp_mb_acquire() after the loop
> to guarantee the memory ordering.
>
> Maybe slightly clearer?
>
Yep, that's better. Thanks.
>>
>> > merge the ACQUIRE into one single smp_mb_acquire() later.
>> >
>> > - For pending_sync: it doesn't have any extra data required since now
>> > p->flags are never touched, it should be safe to not use memory barrier.
>> > That's different from pending_sync.
>>
>> pending_job?
>
> Yep, all the rest fixed.
>
>>
>> >
>> > Provide rich comments for all the lockless operations to state how they are
>> > paired. With that, we can remove the mutex.
>> >
>> > [1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de
>> >
>> > Suggested-by: Fabiano Rosas <farosas@suse.de>
>> > Signed-off-by: Peter Xu <peterx@redhat.com>
>> > ---
>> > migration/multifd.h | 2 --
>> > migration/multifd.c | 51 +++++++++++++++++++++++----------------------
>> > 2 files changed, 26 insertions(+), 27 deletions(-)
>> >
>> > diff --git a/migration/multifd.h b/migration/multifd.h
>> > index 98876ff94a..78a2317263 100644
>> > --- a/migration/multifd.h
>> > +++ b/migration/multifd.h
>> > @@ -91,8 +91,6 @@ typedef struct {
>> > /* syncs main thread and channels */
>> > QemuSemaphore sem_sync;
>> >
>> > - /* this mutex protects the following parameters */
>> > - QemuMutex mutex;
>> > /* is this channel thread running */
>> > bool running;
>> > /* multifd flags for each packet */
>> > diff --git a/migration/multifd.c b/migration/multifd.c
>> > index b317d57d61..ef13e2e781 100644
>> > --- a/migration/multifd.c
>> > +++ b/migration/multifd.c
>> > @@ -501,19 +501,19 @@ static bool multifd_send_pages(void)
>> > }
>> > }
>> >
>> > - qemu_mutex_lock(&p->mutex);
>> > - assert(!p->pages->num);
>> > - assert(!p->pages->block);
>> > /*
>> > - * Double check on pending_job==false with the lock. In the future if
>> > - * we can have >1 requester thread, we can replace this with a "goto
>> > - * retry", but that is for later.
>> > + * Make sure we read p->pending_job before all the rest. Pairs with
>> > + * qatomic_store_release() in multifd_send_thread().
>> > */
>> > - assert(qatomic_read(&p->pending_job) == false);
>> > - qatomic_set(&p->pending_job, true);
>> > + smp_mb_acquire();
>> > + assert(!p->pages->num);
>> > multifd_send_state->pages = p->pages;
>> > p->pages = pages;
>> > - qemu_mutex_unlock(&p->mutex);
>> > + /*
>> > + * Making sure p->pages is setup before marking pending_job=true. Pairs
>> > + * with the qatomic_load_acquire() in multifd_send_thread().
>> > + */
>> > + qatomic_store_release(&p->pending_job, true);
>> > qemu_sem_post(&p->sem);
>> >
>> > return true;
>> > @@ -648,7 +648,6 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
>> > }
>> > multifd_send_channel_destroy(p->c);
>> > p->c = NULL;
>> > - qemu_mutex_destroy(&p->mutex);
>> > qemu_sem_destroy(&p->sem);
>> > qemu_sem_destroy(&p->sem_sync);
>> > g_free(p->name);
>> > @@ -742,14 +741,12 @@ int multifd_send_sync_main(void)
>> >
>> > trace_multifd_send_sync_main_signal(p->id);
>> >
>> > - qemu_mutex_lock(&p->mutex);
>> > /*
>> > * We should be the only user so far, so not possible to be set by
>> > * others concurrently.
>> > */
>> > assert(qatomic_read(&p->pending_sync) == false);
>> > qatomic_set(&p->pending_sync, true);
>> > - qemu_mutex_unlock(&p->mutex);
>> > qemu_sem_post(&p->sem);
>> > }
>> > for (i = 0; i < migrate_multifd_channels(); i++) {
>> > @@ -796,9 +793,12 @@ static void *multifd_send_thread(void *opaque)
>> > if (multifd_send_should_exit()) {
>> > break;
>> > }
>> > - qemu_mutex_lock(&p->mutex);
>> >
>> > - if (qatomic_read(&p->pending_job)) {
>> > + /*
>> > + * Read pending_job flag before p->pages. Pairs with the
>> > + * qatomic_store_release() in multifd_send_pages().
>> > + */
>> > + if (qatomic_load_acquire(&p->pending_job)) {
>> > MultiFDPages_t *pages = p->pages;
>> >
>> > p->iovs_num = 0;
>> > @@ -806,14 +806,12 @@ static void *multifd_send_thread(void *opaque)
>> >
>> > ret = multifd_send_state->ops->send_prepare(p, &local_err);
>> > if (ret != 0) {
>> > - qemu_mutex_unlock(&p->mutex);
>> > break;
>> > }
>> >
>> > ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
>> > 0, p->write_flags, &local_err);
>> > if (ret != 0) {
>> > - qemu_mutex_unlock(&p->mutex);
>> > break;
>> > }
>> >
>> > @@ -822,24 +820,31 @@ static void *multifd_send_thread(void *opaque)
>> >
>> > multifd_pages_reset(p->pages);
>> > p->next_packet_size = 0;
>> > - qatomic_set(&p->pending_job, false);
>> > - qemu_mutex_unlock(&p->mutex);
>> > +
>> > + /*
>> > + * Making sure p->pages is published before saying "we're
>> > + * free". Pairs with the qatomic_load_acquire() in
>>
>> smp_mb_acquire()
>
> Fixed.
>
> Any more comment on the code changes before I repost?
Nope, that's it.
>
> (maybe I can repost this single patch in-place to avoid another round of
> mail bombs..)
Sure.
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 23/23] migration/multifd: Optimize sender side to be lockless
2024-02-05 14:10 ` Fabiano Rosas
@ 2024-02-05 14:24 ` Peter Xu
2024-02-05 17:59 ` Fabiano Rosas
0 siblings, 1 reply; 49+ messages in thread
From: Peter Xu @ 2024-02-05 14:24 UTC (permalink / raw)
To: Fabiano Rosas
Cc: qemu-devel, Hao Xiang, Bryan Zhang, Avihai Horon, Yuan Liu,
Prasad Pandit
On Mon, Feb 05, 2024 at 11:10:34AM -0300, Fabiano Rosas wrote:
> > (maybe I can repost this single patch in-place to avoid another round of
> > mail bombs..)
>
> Sure.
I've got the final version attached here. Feel free to have a look, thanks.
====
From 6ba337320430feae4ce9d3d906ea19f68430642d Mon Sep 17 00:00:00 2001
From: Peter Xu <peterx@redhat.com>
Date: Fri, 2 Feb 2024 18:28:57 +0800
Subject: [PATCH] migration/multifd: Optimize sender side to be lockless
When reviewing my attempt to refactor send_prepare(), Fabiano suggested we
try out with dropping the mutex in multifd code [1].
I thought about that before but I never tried to change the code. Now
maybe it's time to give it a stab. This only optimizes the sender side.
The trick here is multifd has a clear provider/consumer model, that the
migration main thread publishes requests (either pending_job/pending_sync),
while the multifd sender threads are consumers. Here we don't have a lot
of complicated data sharing, and the jobs can logically be submitted
lockless.
Arm the code with atomic weapons. Two things worth mentioning:
- For multifd_send_pages(): we can use qatomic_load_acquire() when trying
to find a free channel, but that's expensive if we attach one ACQUIRE per
channel. Instead, keep the qatomic_read() on reading the pending_job
flag as we do already, meanwhile use one smp_mb_acquire() after the loop
to guarantee the memory ordering.
- For pending_sync: it doesn't have any extra data required since now
p->flags are never touched, it should be safe to not use memory barrier.
That's different from pending_job.
Provide rich comments for all the lockless operations to state how they are
paired. With that, we can remove the mutex.
[1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de
Suggested-by: Fabiano Rosas <farosas@suse.de>
Link: https://lore.kernel.org/r/20240202102857.110210-24-peterx@redhat.com
Signed-off-by: Peter Xu <peterx@redhat.com>
---
migration/multifd.h | 2 --
migration/multifd.c | 51 +++++++++++++++++++++++----------------------
2 files changed, 26 insertions(+), 27 deletions(-)
diff --git a/migration/multifd.h b/migration/multifd.h
index 98876ff94a..78a2317263 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -91,8 +91,6 @@ typedef struct {
/* syncs main thread and channels */
QemuSemaphore sem_sync;
- /* this mutex protects the following parameters */
- QemuMutex mutex;
/* is this channel thread running */
bool running;
/* multifd flags for each packet */
diff --git a/migration/multifd.c b/migration/multifd.c
index b317d57d61..fbdb129088 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -501,19 +501,19 @@ static bool multifd_send_pages(void)
}
}
- qemu_mutex_lock(&p->mutex);
- assert(!p->pages->num);
- assert(!p->pages->block);
/*
- * Double check on pending_job==false with the lock. In the future if
- * we can have >1 requester thread, we can replace this with a "goto
- * retry", but that is for later.
+ * Make sure we read p->pending_job before all the rest. Pairs with
+ * qatomic_store_release() in multifd_send_thread().
*/
- assert(qatomic_read(&p->pending_job) == false);
- qatomic_set(&p->pending_job, true);
+ smp_mb_acquire();
+ assert(!p->pages->num);
multifd_send_state->pages = p->pages;
p->pages = pages;
- qemu_mutex_unlock(&p->mutex);
+ /*
+ * Making sure p->pages is setup before marking pending_job=true. Pairs
+ * with the qatomic_load_acquire() in multifd_send_thread().
+ */
+ qatomic_store_release(&p->pending_job, true);
qemu_sem_post(&p->sem);
return true;
@@ -648,7 +648,6 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
}
multifd_send_channel_destroy(p->c);
p->c = NULL;
- qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem);
qemu_sem_destroy(&p->sem_sync);
g_free(p->name);
@@ -742,14 +741,12 @@ int multifd_send_sync_main(void)
trace_multifd_send_sync_main_signal(p->id);
- qemu_mutex_lock(&p->mutex);
/*
* We should be the only user so far, so not possible to be set by
* others concurrently.
*/
assert(qatomic_read(&p->pending_sync) == false);
qatomic_set(&p->pending_sync, true);
- qemu_mutex_unlock(&p->mutex);
qemu_sem_post(&p->sem);
}
for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -796,9 +793,12 @@ static void *multifd_send_thread(void *opaque)
if (multifd_send_should_exit()) {
break;
}
- qemu_mutex_lock(&p->mutex);
- if (qatomic_read(&p->pending_job)) {
+ /*
+ * Read pending_job flag before p->pages. Pairs with the
+ * qatomic_store_release() in multifd_send_pages().
+ */
+ if (qatomic_load_acquire(&p->pending_job)) {
MultiFDPages_t *pages = p->pages;
p->iovs_num = 0;
@@ -806,14 +806,12 @@ static void *multifd_send_thread(void *opaque)
ret = multifd_send_state->ops->send_prepare(p, &local_err);
if (ret != 0) {
- qemu_mutex_unlock(&p->mutex);
break;
}
ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
0, p->write_flags, &local_err);
if (ret != 0) {
- qemu_mutex_unlock(&p->mutex);
break;
}
@@ -822,24 +820,31 @@ static void *multifd_send_thread(void *opaque)
multifd_pages_reset(p->pages);
p->next_packet_size = 0;
- qatomic_set(&p->pending_job, false);
- qemu_mutex_unlock(&p->mutex);
+
+ /*
+ * Making sure p->pages is published before saying "we're
+ * free". Pairs with the smp_mb_acquire() in
+ * multifd_send_pages().
+ */
+ qatomic_store_release(&p->pending_job, false);
} else {
- /* If not a normal job, must be a sync request */
+ /*
+ * If not a normal job, must be a sync request. Note that
+ * pending_sync is a standalone flag (unlike pending_job), so
+ * it doesn't require explicit memory barriers.
+ */
assert(qatomic_read(&p->pending_sync));
p->flags = MULTIFD_FLAG_SYNC;
multifd_send_fill_packet(p);
ret = qio_channel_write_all(p->c, (void *)p->packet,
p->packet_len, &local_err);
if (ret != 0) {
- qemu_mutex_unlock(&p->mutex);
break;
}
/* p->next_packet_size will always be zero for a SYNC packet */
stat64_add(&mig_stats.multifd_bytes, p->packet_len);
p->flags = 0;
qatomic_set(&p->pending_sync, false);
- qemu_mutex_unlock(&p->mutex);
qemu_sem_post(&p->sem_sync);
}
}
@@ -853,10 +858,7 @@ out:
error_free(local_err);
}
- qemu_mutex_lock(&p->mutex);
p->running = false;
- qemu_mutex_unlock(&p->mutex);
-
rcu_unregister_thread();
migration_threads_remove(thread);
trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);
@@ -998,7 +1000,6 @@ int multifd_send_setup(Error **errp)
for (i = 0; i < thread_count; i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
- qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
qemu_sem_init(&p->sem_sync, 0);
p->id = i;
--
2.43.0
--
Peter Xu
^ permalink raw reply related [flat|nested] 49+ messages in thread
* Re: [PATCH v2 23/23] migration/multifd: Optimize sender side to be lockless
2024-02-05 14:24 ` Peter Xu
@ 2024-02-05 17:59 ` Fabiano Rosas
0 siblings, 0 replies; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-05 17:59 UTC (permalink / raw)
To: Peter Xu
Cc: qemu-devel, Hao Xiang, Bryan Zhang, Avihai Horon, Yuan Liu,
Prasad Pandit
Peter Xu <peterx@redhat.com> writes:
> On Mon, Feb 05, 2024 at 11:10:34AM -0300, Fabiano Rosas wrote:
>> > (maybe I can repost this single patch in-place to avoid another round of
>> > mail bombs..)
>>
>> Sure.
>
> I've got the final version attached here. Feel free to have a look, thanks.
>
> ====
> From 6ba337320430feae4ce9d3d906ea19f68430642d Mon Sep 17 00:00:00 2001
> From: Peter Xu <peterx@redhat.com>
> Date: Fri, 2 Feb 2024 18:28:57 +0800
> Subject: [PATCH] migration/multifd: Optimize sender side to be lockless
>
> When reviewing my attempt to refactor send_prepare(), Fabiano suggested we
> try out with dropping the mutex in multifd code [1].
>
> I thought about that before but I never tried to change the code. Now
> maybe it's time to give it a stab. This only optimizes the sender side.
>
> The trick here is multifd has a clear provider/consumer model, that the
> migration main thread publishes requests (either pending_job/pending_sync),
> while the multifd sender threads are consumers. Here we don't have a lot
> of complicated data sharing, and the jobs can logically be submitted
> lockless.
>
> Arm the code with atomic weapons. Two things worth mentioning:
>
> - For multifd_send_pages(): we can use qatomic_load_acquire() when trying
> to find a free channel, but that's expensive if we attach one ACQUIRE per
> channel. Instead, keep the qatomic_read() on reading the pending_job
> flag as we do already, meanwhile use one smp_mb_acquire() after the loop
> to guarantee the memory ordering.
>
> - For pending_sync: it doesn't have any extra data required since now
> p->flags are never touched, it should be safe to not use memory barrier.
> That's different from pending_job.
>
> Provide rich comments for all the lockless operations to state how they are
> paired. With that, we can remove the mutex.
>
> [1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de
>
> Suggested-by: Fabiano Rosas <farosas@suse.de>
> Link: https://lore.kernel.org/r/20240202102857.110210-24-peterx@redhat.com
> Signed-off-by: Peter Xu <peterx@redhat.com>
> ---
> migration/multifd.h | 2 --
> migration/multifd.c | 51 +++++++++++++++++++++++----------------------
> 2 files changed, 26 insertions(+), 27 deletions(-)
>
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 98876ff94a..78a2317263 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -91,8 +91,6 @@ typedef struct {
> /* syncs main thread and channels */
> QemuSemaphore sem_sync;
>
> - /* this mutex protects the following parameters */
> - QemuMutex mutex;
> /* is this channel thread running */
> bool running;
> /* multifd flags for each packet */
> diff --git a/migration/multifd.c b/migration/multifd.c
> index b317d57d61..fbdb129088 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -501,19 +501,19 @@ static bool multifd_send_pages(void)
> }
> }
>
> - qemu_mutex_lock(&p->mutex);
> - assert(!p->pages->num);
> - assert(!p->pages->block);
> /*
> - * Double check on pending_job==false with the lock. In the future if
> - * we can have >1 requester thread, we can replace this with a "goto
> - * retry", but that is for later.
> + * Make sure we read p->pending_job before all the rest. Pairs with
> + * qatomic_store_release() in multifd_send_thread().
> */
> - assert(qatomic_read(&p->pending_job) == false);
> - qatomic_set(&p->pending_job, true);
> + smp_mb_acquire();
> + assert(!p->pages->num);
> multifd_send_state->pages = p->pages;
> p->pages = pages;
> - qemu_mutex_unlock(&p->mutex);
> + /*
> + * Making sure p->pages is setup before marking pending_job=true. Pairs
> + * with the qatomic_load_acquire() in multifd_send_thread().
> + */
> + qatomic_store_release(&p->pending_job, true);
> qemu_sem_post(&p->sem);
>
> return true;
> @@ -648,7 +648,6 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
> }
> multifd_send_channel_destroy(p->c);
> p->c = NULL;
> - qemu_mutex_destroy(&p->mutex);
> qemu_sem_destroy(&p->sem);
> qemu_sem_destroy(&p->sem_sync);
> g_free(p->name);
> @@ -742,14 +741,12 @@ int multifd_send_sync_main(void)
>
> trace_multifd_send_sync_main_signal(p->id);
>
> - qemu_mutex_lock(&p->mutex);
> /*
> * We should be the only user so far, so not possible to be set by
> * others concurrently.
> */
> assert(qatomic_read(&p->pending_sync) == false);
> qatomic_set(&p->pending_sync, true);
> - qemu_mutex_unlock(&p->mutex);
> qemu_sem_post(&p->sem);
> }
> for (i = 0; i < migrate_multifd_channels(); i++) {
> @@ -796,9 +793,12 @@ static void *multifd_send_thread(void *opaque)
> if (multifd_send_should_exit()) {
> break;
> }
> - qemu_mutex_lock(&p->mutex);
>
> - if (qatomic_read(&p->pending_job)) {
> + /*
> + * Read pending_job flag before p->pages. Pairs with the
> + * qatomic_store_release() in multifd_send_pages().
> + */
> + if (qatomic_load_acquire(&p->pending_job)) {
> MultiFDPages_t *pages = p->pages;
>
> p->iovs_num = 0;
> @@ -806,14 +806,12 @@ static void *multifd_send_thread(void *opaque)
>
> ret = multifd_send_state->ops->send_prepare(p, &local_err);
> if (ret != 0) {
> - qemu_mutex_unlock(&p->mutex);
> break;
> }
>
> ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
> 0, p->write_flags, &local_err);
> if (ret != 0) {
> - qemu_mutex_unlock(&p->mutex);
> break;
> }
>
> @@ -822,24 +820,31 @@ static void *multifd_send_thread(void *opaque)
>
> multifd_pages_reset(p->pages);
> p->next_packet_size = 0;
> - qatomic_set(&p->pending_job, false);
> - qemu_mutex_unlock(&p->mutex);
> +
> + /*
> + * Making sure p->pages is published before saying "we're
> + * free". Pairs with the smp_mb_acquire() in
> + * multifd_send_pages().
> + */
> + qatomic_store_release(&p->pending_job, false);
> } else {
> - /* If not a normal job, must be a sync request */
> + /*
> + * If not a normal job, must be a sync request. Note that
> + * pending_sync is a standalone flag (unlike pending_job), so
> + * it doesn't require explicit memory barriers.
> + */
> assert(qatomic_read(&p->pending_sync));
> p->flags = MULTIFD_FLAG_SYNC;
> multifd_send_fill_packet(p);
> ret = qio_channel_write_all(p->c, (void *)p->packet,
> p->packet_len, &local_err);
> if (ret != 0) {
> - qemu_mutex_unlock(&p->mutex);
> break;
> }
> /* p->next_packet_size will always be zero for a SYNC packet */
> stat64_add(&mig_stats.multifd_bytes, p->packet_len);
> p->flags = 0;
> qatomic_set(&p->pending_sync, false);
> - qemu_mutex_unlock(&p->mutex);
> qemu_sem_post(&p->sem_sync);
> }
> }
> @@ -853,10 +858,7 @@ out:
> error_free(local_err);
> }
>
> - qemu_mutex_lock(&p->mutex);
> p->running = false;
> - qemu_mutex_unlock(&p->mutex);
> -
> rcu_unregister_thread();
> migration_threads_remove(thread);
> trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);
> @@ -998,7 +1000,6 @@ int multifd_send_setup(Error **errp)
> for (i = 0; i < thread_count; i++) {
> MultiFDSendParams *p = &multifd_send_state->params[i];
>
> - qemu_mutex_init(&p->mutex);
> qemu_sem_init(&p->sem, 0);
> qemu_sem_init(&p->sem_sync, 0);
> p->id = i;
> --
> 2.43.0
Reviewed-by: Fabiano Rosas <farosas@suse.de>
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
` (22 preceding siblings ...)
2024-02-02 10:28 ` [PATCH v2 23/23] migration/multifd: Optimize sender side to be lockless peterx
@ 2024-02-06 3:05 ` Peter Xu
23 siblings, 0 replies; 49+ messages in thread
From: Peter Xu @ 2024-02-06 3:05 UTC (permalink / raw)
To: qemu-devel
Cc: Hao Xiang, Bryan Zhang, Fabiano Rosas, Avihai Horon, Yuan Liu,
Prasad Pandit
queued.
--
Peter Xu
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [External] [PATCH v2 05/23] migration/multifd: Drop MultiFDSendParams.normal[] array
2024-02-02 10:28 ` [PATCH v2 05/23] migration/multifd: Drop MultiFDSendParams.normal[] array peterx
@ 2024-02-09 0:06 ` Hao Xiang
2024-02-09 12:20 ` Fabiano Rosas
0 siblings, 1 reply; 49+ messages in thread
From: Hao Xiang @ 2024-02-09 0:06 UTC (permalink / raw)
To: peterx
Cc: qemu-devel, Bryan Zhang, Fabiano Rosas, Avihai Horon, Yuan Liu,
Prasad Pandit
On Fri, Feb 2, 2024 at 2:30 AM <peterx@redhat.com> wrote:
>
> From: Peter Xu <peterx@redhat.com>
>
> This array is redundant when p->pages exists. Now we extended the life of
> p->pages to the whole period where pending_job is set, it should be safe to
> always use p->pages->offset[] rather than p->normal[]. Drop the array.
>
> Alongside, the normal_num is also redundant, which is the same to
> p->pages->num.
Can we not drop p->normal and p_normal_num? It is redundant now but I
think it will be needed for multifd zero page checking. In multifd
zero page, we find out all zero pages and we sort the normal pages and
zero pages in two seperate arrays. p->offset is the original array of
pages, p->normal will contain the array of normal pages and p->zero
will contain the array of zero pages.
>
> This doesn't apply to recv side, because there's no extra buffering on recv
> side, so p->normal[] array is still needed.
>
> Reviewed-by: Fabiano Rosas <farosas@suse.de>
> Signed-off-by: Peter Xu <peterx@redhat.com>
> ---
> migration/multifd.h | 4 ----
> migration/multifd-zlib.c | 7 ++++---
> migration/multifd-zstd.c | 7 ++++---
> migration/multifd.c | 33 +++++++++++++--------------------
> 4 files changed, 21 insertions(+), 30 deletions(-)
>
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 7c040cb85a..3920bdbcf1 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -122,10 +122,6 @@ typedef struct {
> struct iovec *iov;
> /* number of iovs used */
> uint32_t iovs_num;
> - /* Pages that are not zero */
> - ram_addr_t *normal;
> - /* num of non zero pages */
> - uint32_t normal_num;
> /* used for compression methods */
> void *data;
> } MultiFDSendParams;
> diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
> index 37ce48621e..100809abc1 100644
> --- a/migration/multifd-zlib.c
> +++ b/migration/multifd-zlib.c
> @@ -116,17 +116,18 @@ static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
> */
> static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
> {
> + MultiFDPages_t *pages = p->pages;
> struct zlib_data *z = p->data;
> z_stream *zs = &z->zs;
> uint32_t out_size = 0;
> int ret;
> uint32_t i;
>
> - for (i = 0; i < p->normal_num; i++) {
> + for (i = 0; i < pages->num; i++) {
> uint32_t available = z->zbuff_len - out_size;
> int flush = Z_NO_FLUSH;
>
> - if (i == p->normal_num - 1) {
> + if (i == pages->num - 1) {
> flush = Z_SYNC_FLUSH;
> }
>
> @@ -135,7 +136,7 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
> * with compression. zlib does not guarantee that this is safe,
> * therefore copy the page before calling deflate().
> */
> - memcpy(z->buf, p->pages->block->host + p->normal[i], p->page_size);
> + memcpy(z->buf, p->pages->block->host + pages->offset[i], p->page_size);
> zs->avail_in = p->page_size;
> zs->next_in = z->buf;
>
> diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
> index b471daadcd..2023edd8cc 100644
> --- a/migration/multifd-zstd.c
> +++ b/migration/multifd-zstd.c
> @@ -113,6 +113,7 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
> */
> static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
> {
> + MultiFDPages_t *pages = p->pages;
> struct zstd_data *z = p->data;
> int ret;
> uint32_t i;
> @@ -121,13 +122,13 @@ static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
> z->out.size = z->zbuff_len;
> z->out.pos = 0;
>
> - for (i = 0; i < p->normal_num; i++) {
> + for (i = 0; i < pages->num; i++) {
> ZSTD_EndDirective flush = ZSTD_e_continue;
>
> - if (i == p->normal_num - 1) {
> + if (i == pages->num - 1) {
> flush = ZSTD_e_flush;
> }
> - z->in.src = p->pages->block->host + p->normal[i];
> + z->in.src = p->pages->block->host + pages->offset[i];
> z->in.size = p->page_size;
> z->in.pos = 0;
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 5633ac245a..8bb1fd95cf 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -90,13 +90,13 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
> {
> MultiFDPages_t *pages = p->pages;
>
> - for (int i = 0; i < p->normal_num; i++) {
> - p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i];
> + for (int i = 0; i < pages->num; i++) {
> + p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
> p->iov[p->iovs_num].iov_len = p->page_size;
> p->iovs_num++;
> }
>
> - p->next_packet_size = p->normal_num * p->page_size;
> + p->next_packet_size = pages->num * p->page_size;
> p->flags |= MULTIFD_FLAG_NOCOMP;
> return 0;
> }
> @@ -269,21 +269,22 @@ static void multifd_pages_clear(MultiFDPages_t *pages)
> static void multifd_send_fill_packet(MultiFDSendParams *p)
> {
> MultiFDPacket_t *packet = p->packet;
> + MultiFDPages_t *pages = p->pages;
> int i;
>
> packet->flags = cpu_to_be32(p->flags);
> packet->pages_alloc = cpu_to_be32(p->pages->allocated);
> - packet->normal_pages = cpu_to_be32(p->normal_num);
> + packet->normal_pages = cpu_to_be32(pages->num);
> packet->next_packet_size = cpu_to_be32(p->next_packet_size);
> packet->packet_num = cpu_to_be64(p->packet_num);
>
> - if (p->pages->block) {
> - strncpy(packet->ramblock, p->pages->block->idstr, 256);
> + if (pages->block) {
> + strncpy(packet->ramblock, pages->block->idstr, 256);
> }
>
> - for (i = 0; i < p->normal_num; i++) {
> + for (i = 0; i < pages->num; i++) {
> /* there are architectures where ram_addr_t is 32 bit */
> - uint64_t temp = p->normal[i];
> + uint64_t temp = pages->offset[i];
>
> packet->offset[i] = cpu_to_be64(temp);
> }
> @@ -570,8 +571,6 @@ void multifd_save_cleanup(void)
> p->packet = NULL;
> g_free(p->iov);
> p->iov = NULL;
> - g_free(p->normal);
> - p->normal = NULL;
> multifd_send_state->ops->send_cleanup(p, &local_err);
> if (local_err) {
> migrate_set_error(migrate_get_current(), local_err);
> @@ -688,8 +687,8 @@ static void *multifd_send_thread(void *opaque)
>
> if (p->pending_job) {
> uint64_t packet_num = p->packet_num;
> + MultiFDPages_t *pages = p->pages;
> uint32_t flags;
> - p->normal_num = 0;
>
> if (use_zero_copy_send) {
> p->iovs_num = 0;
> @@ -697,12 +696,7 @@ static void *multifd_send_thread(void *opaque)
> p->iovs_num = 1;
> }
>
> - for (int i = 0; i < p->pages->num; i++) {
> - p->normal[p->normal_num] = p->pages->offset[i];
> - p->normal_num++;
> - }
> -
> - if (p->normal_num) {
> + if (pages->num) {
> ret = multifd_send_state->ops->send_prepare(p, &local_err);
> if (ret != 0) {
> qemu_mutex_unlock(&p->mutex);
> @@ -713,10 +707,10 @@ static void *multifd_send_thread(void *opaque)
> flags = p->flags;
> p->flags = 0;
> p->num_packets++;
> - p->total_normal_pages += p->normal_num;
> + p->total_normal_pages += pages->num;
> qemu_mutex_unlock(&p->mutex);
>
> - trace_multifd_send(p->id, packet_num, p->normal_num, flags,
> + trace_multifd_send(p->id, packet_num, pages->num, flags,
> p->next_packet_size);
>
> if (use_zero_copy_send) {
> @@ -924,7 +918,6 @@ int multifd_save_setup(Error **errp)
> p->name = g_strdup_printf("multifdsend_%d", i);
> /* We need one extra place for the packet header */
> p->iov = g_new0(struct iovec, page_count + 1);
> - p->normal = g_new0(ram_addr_t, page_count);
> p->page_size = qemu_target_page_size();
> p->page_count = page_count;
>
> --
> 2.43.0
>
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [External] [PATCH v2 05/23] migration/multifd: Drop MultiFDSendParams.normal[] array
2024-02-09 0:06 ` [External] " Hao Xiang
@ 2024-02-09 12:20 ` Fabiano Rosas
2024-02-14 2:16 ` Hao Xiang
0 siblings, 1 reply; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-09 12:20 UTC (permalink / raw)
To: Hao Xiang, peterx
Cc: qemu-devel, Bryan Zhang, Avihai Horon, Yuan Liu, Prasad Pandit
Hao Xiang <hao.xiang@bytedance.com> writes:
> On Fri, Feb 2, 2024 at 2:30 AM <peterx@redhat.com> wrote:
>>
>> From: Peter Xu <peterx@redhat.com>
>>
>> This array is redundant when p->pages exists. Now we extended the life of
>> p->pages to the whole period where pending_job is set, it should be safe to
>> always use p->pages->offset[] rather than p->normal[]. Drop the array.
>>
>> Alongside, the normal_num is also redundant, which is the same to
>> p->pages->num.
>
> Can we not drop p->normal and p_normal_num? It is redundant now but I
> think it will be needed for multifd zero page checking. In multifd
> zero page, we find out all zero pages and we sort the normal pages and
> zero pages in two seperate arrays. p->offset is the original array of
> pages, p->normal will contain the array of normal pages and p->zero
> will contain the array of zero pages.
We're moving send_fill_packet into send_prepare(), so you should be able
to do whatever data transformation at send_prepare() and add any fields
you need into p->pages.
If we keep p->normal we will not be able to switch into an opaque
payload later on. There should be no mention of pages outside of
hooks. This is long-term work, but let's avoid blocking it if possible.
>>
>> This doesn't apply to recv side, because there's no extra buffering on recv
>> side, so p->normal[] array is still needed.
>>
>> Reviewed-by: Fabiano Rosas <farosas@suse.de>
>> Signed-off-by: Peter Xu <peterx@redhat.com>
>> ---
>> migration/multifd.h | 4 ----
>> migration/multifd-zlib.c | 7 ++++---
>> migration/multifd-zstd.c | 7 ++++---
>> migration/multifd.c | 33 +++++++++++++--------------------
>> 4 files changed, 21 insertions(+), 30 deletions(-)
>>
>> diff --git a/migration/multifd.h b/migration/multifd.h
>> index 7c040cb85a..3920bdbcf1 100644
>> --- a/migration/multifd.h
>> +++ b/migration/multifd.h
>> @@ -122,10 +122,6 @@ typedef struct {
>> struct iovec *iov;
>> /* number of iovs used */
>> uint32_t iovs_num;
>> - /* Pages that are not zero */
>> - ram_addr_t *normal;
>> - /* num of non zero pages */
>> - uint32_t normal_num;
>> /* used for compression methods */
>> void *data;
>> } MultiFDSendParams;
>> diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
>> index 37ce48621e..100809abc1 100644
>> --- a/migration/multifd-zlib.c
>> +++ b/migration/multifd-zlib.c
>> @@ -116,17 +116,18 @@ static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
>> */
>> static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
>> {
>> + MultiFDPages_t *pages = p->pages;
>> struct zlib_data *z = p->data;
>> z_stream *zs = &z->zs;
>> uint32_t out_size = 0;
>> int ret;
>> uint32_t i;
>>
>> - for (i = 0; i < p->normal_num; i++) {
>> + for (i = 0; i < pages->num; i++) {
>> uint32_t available = z->zbuff_len - out_size;
>> int flush = Z_NO_FLUSH;
>>
>> - if (i == p->normal_num - 1) {
>> + if (i == pages->num - 1) {
>> flush = Z_SYNC_FLUSH;
>> }
>>
>> @@ -135,7 +136,7 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
>> * with compression. zlib does not guarantee that this is safe,
>> * therefore copy the page before calling deflate().
>> */
>> - memcpy(z->buf, p->pages->block->host + p->normal[i], p->page_size);
>> + memcpy(z->buf, p->pages->block->host + pages->offset[i], p->page_size);
>> zs->avail_in = p->page_size;
>> zs->next_in = z->buf;
>>
>> diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
>> index b471daadcd..2023edd8cc 100644
>> --- a/migration/multifd-zstd.c
>> +++ b/migration/multifd-zstd.c
>> @@ -113,6 +113,7 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
>> */
>> static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
>> {
>> + MultiFDPages_t *pages = p->pages;
>> struct zstd_data *z = p->data;
>> int ret;
>> uint32_t i;
>> @@ -121,13 +122,13 @@ static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
>> z->out.size = z->zbuff_len;
>> z->out.pos = 0;
>>
>> - for (i = 0; i < p->normal_num; i++) {
>> + for (i = 0; i < pages->num; i++) {
>> ZSTD_EndDirective flush = ZSTD_e_continue;
>>
>> - if (i == p->normal_num - 1) {
>> + if (i == pages->num - 1) {
>> flush = ZSTD_e_flush;
>> }
>> - z->in.src = p->pages->block->host + p->normal[i];
>> + z->in.src = p->pages->block->host + pages->offset[i];
>> z->in.size = p->page_size;
>> z->in.pos = 0;
>>
>> diff --git a/migration/multifd.c b/migration/multifd.c
>> index 5633ac245a..8bb1fd95cf 100644
>> --- a/migration/multifd.c
>> +++ b/migration/multifd.c
>> @@ -90,13 +90,13 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
>> {
>> MultiFDPages_t *pages = p->pages;
>>
>> - for (int i = 0; i < p->normal_num; i++) {
>> - p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i];
>> + for (int i = 0; i < pages->num; i++) {
>> + p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
>> p->iov[p->iovs_num].iov_len = p->page_size;
>> p->iovs_num++;
>> }
>>
>> - p->next_packet_size = p->normal_num * p->page_size;
>> + p->next_packet_size = pages->num * p->page_size;
>> p->flags |= MULTIFD_FLAG_NOCOMP;
>> return 0;
>> }
>> @@ -269,21 +269,22 @@ static void multifd_pages_clear(MultiFDPages_t *pages)
>> static void multifd_send_fill_packet(MultiFDSendParams *p)
>> {
>> MultiFDPacket_t *packet = p->packet;
>> + MultiFDPages_t *pages = p->pages;
>> int i;
>>
>> packet->flags = cpu_to_be32(p->flags);
>> packet->pages_alloc = cpu_to_be32(p->pages->allocated);
>> - packet->normal_pages = cpu_to_be32(p->normal_num);
>> + packet->normal_pages = cpu_to_be32(pages->num);
>> packet->next_packet_size = cpu_to_be32(p->next_packet_size);
>> packet->packet_num = cpu_to_be64(p->packet_num);
>>
>> - if (p->pages->block) {
>> - strncpy(packet->ramblock, p->pages->block->idstr, 256);
>> + if (pages->block) {
>> + strncpy(packet->ramblock, pages->block->idstr, 256);
>> }
>>
>> - for (i = 0; i < p->normal_num; i++) {
>> + for (i = 0; i < pages->num; i++) {
>> /* there are architectures where ram_addr_t is 32 bit */
>> - uint64_t temp = p->normal[i];
>> + uint64_t temp = pages->offset[i];
>>
>> packet->offset[i] = cpu_to_be64(temp);
>> }
>> @@ -570,8 +571,6 @@ void multifd_save_cleanup(void)
>> p->packet = NULL;
>> g_free(p->iov);
>> p->iov = NULL;
>> - g_free(p->normal);
>> - p->normal = NULL;
>> multifd_send_state->ops->send_cleanup(p, &local_err);
>> if (local_err) {
>> migrate_set_error(migrate_get_current(), local_err);
>> @@ -688,8 +687,8 @@ static void *multifd_send_thread(void *opaque)
>>
>> if (p->pending_job) {
>> uint64_t packet_num = p->packet_num;
>> + MultiFDPages_t *pages = p->pages;
>> uint32_t flags;
>> - p->normal_num = 0;
>>
>> if (use_zero_copy_send) {
>> p->iovs_num = 0;
>> @@ -697,12 +696,7 @@ static void *multifd_send_thread(void *opaque)
>> p->iovs_num = 1;
>> }
>>
>> - for (int i = 0; i < p->pages->num; i++) {
>> - p->normal[p->normal_num] = p->pages->offset[i];
>> - p->normal_num++;
>> - }
>> -
>> - if (p->normal_num) {
>> + if (pages->num) {
>> ret = multifd_send_state->ops->send_prepare(p, &local_err);
>> if (ret != 0) {
>> qemu_mutex_unlock(&p->mutex);
>> @@ -713,10 +707,10 @@ static void *multifd_send_thread(void *opaque)
>> flags = p->flags;
>> p->flags = 0;
>> p->num_packets++;
>> - p->total_normal_pages += p->normal_num;
>> + p->total_normal_pages += pages->num;
>> qemu_mutex_unlock(&p->mutex);
>>
>> - trace_multifd_send(p->id, packet_num, p->normal_num, flags,
>> + trace_multifd_send(p->id, packet_num, pages->num, flags,
>> p->next_packet_size);
>>
>> if (use_zero_copy_send) {
>> @@ -924,7 +918,6 @@ int multifd_save_setup(Error **errp)
>> p->name = g_strdup_printf("multifdsend_%d", i);
>> /* We need one extra place for the packet header */
>> p->iov = g_new0(struct iovec, page_count + 1);
>> - p->normal = g_new0(ram_addr_t, page_count);
>> p->page_size = qemu_target_page_size();
>> p->page_count = page_count;
>>
>> --
>> 2.43.0
>>
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [External] [PATCH v2 05/23] migration/multifd: Drop MultiFDSendParams.normal[] array
2024-02-09 12:20 ` Fabiano Rosas
@ 2024-02-14 2:16 ` Hao Xiang
2024-02-14 17:17 ` Fabiano Rosas
0 siblings, 1 reply; 49+ messages in thread
From: Hao Xiang @ 2024-02-14 2:16 UTC (permalink / raw)
To: Fabiano Rosas
Cc: peterx, qemu-devel, Bryan Zhang, Avihai Horon, Yuan Liu,
Prasad Pandit
On Fri, Feb 9, 2024 at 4:20 AM Fabiano Rosas <farosas@suse.de> wrote:
>
> Hao Xiang <hao.xiang@bytedance.com> writes:
>
> > On Fri, Feb 2, 2024 at 2:30 AM <peterx@redhat.com> wrote:
> >>
> >> From: Peter Xu <peterx@redhat.com>
> >>
> >> This array is redundant when p->pages exists. Now we extended the life of
> >> p->pages to the whole period where pending_job is set, it should be safe to
> >> always use p->pages->offset[] rather than p->normal[]. Drop the array.
> >>
> >> Alongside, the normal_num is also redundant, which is the same to
> >> p->pages->num.
> >
> > Can we not drop p->normal and p_normal_num? It is redundant now but I
> > think it will be needed for multifd zero page checking. In multifd
> > zero page, we find out all zero pages and we sort the normal pages and
> > zero pages in two seperate arrays. p->offset is the original array of
> > pages, p->normal will contain the array of normal pages and p->zero
> > will contain the array of zero pages.
>
> We're moving send_fill_packet into send_prepare(), so you should be able
> to do whatever data transformation at send_prepare() and add any fields
> you need into p->pages.
>
> If we keep p->normal we will not be able to switch into an opaque
> payload later on. There should be no mention of pages outside of
> hooks. This is long-term work, but let's avoid blocking it if possible.
>
Got it. I will make the proper changes.
Aside from that, I would like to get opinions from you guys regarding
zero page detection interface.
Here are the options I am thinking:
1) Do zero page detection in send_prepare().
This means no dedicated hook for zero_page_detection() otherwise we
will be calling a hook from inside a hook. But we will need a new
function multifd_zero_page_check_send() similar to how we use
multifd_send_fill_packet() now. multifd_zero_page_check_send() will
need to be called by all send_prepare() implementations.
2) Do zero page detection in a new hook zero_page_detection().
zero_page_detection will be called before send_prepare(). Seems like
extra complexity but I can go with that routine if you guys think it's
a cleaner way.
I am leaning towards 1) right now.
> >>
> >> This doesn't apply to recv side, because there's no extra buffering on recv
> >> side, so p->normal[] array is still needed.
> >>
> >> Reviewed-by: Fabiano Rosas <farosas@suse.de>
> >> Signed-off-by: Peter Xu <peterx@redhat.com>
> >> ---
> >> migration/multifd.h | 4 ----
> >> migration/multifd-zlib.c | 7 ++++---
> >> migration/multifd-zstd.c | 7 ++++---
> >> migration/multifd.c | 33 +++++++++++++--------------------
> >> 4 files changed, 21 insertions(+), 30 deletions(-)
> >>
> >> diff --git a/migration/multifd.h b/migration/multifd.h
> >> index 7c040cb85a..3920bdbcf1 100644
> >> --- a/migration/multifd.h
> >> +++ b/migration/multifd.h
> >> @@ -122,10 +122,6 @@ typedef struct {
> >> struct iovec *iov;
> >> /* number of iovs used */
> >> uint32_t iovs_num;
> >> - /* Pages that are not zero */
> >> - ram_addr_t *normal;
> >> - /* num of non zero pages */
> >> - uint32_t normal_num;
> >> /* used for compression methods */
> >> void *data;
> >> } MultiFDSendParams;
> >> diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
> >> index 37ce48621e..100809abc1 100644
> >> --- a/migration/multifd-zlib.c
> >> +++ b/migration/multifd-zlib.c
> >> @@ -116,17 +116,18 @@ static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
> >> */
> >> static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
> >> {
> >> + MultiFDPages_t *pages = p->pages;
> >> struct zlib_data *z = p->data;
> >> z_stream *zs = &z->zs;
> >> uint32_t out_size = 0;
> >> int ret;
> >> uint32_t i;
> >>
> >> - for (i = 0; i < p->normal_num; i++) {
> >> + for (i = 0; i < pages->num; i++) {
> >> uint32_t available = z->zbuff_len - out_size;
> >> int flush = Z_NO_FLUSH;
> >>
> >> - if (i == p->normal_num - 1) {
> >> + if (i == pages->num - 1) {
> >> flush = Z_SYNC_FLUSH;
> >> }
> >>
> >> @@ -135,7 +136,7 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
> >> * with compression. zlib does not guarantee that this is safe,
> >> * therefore copy the page before calling deflate().
> >> */
> >> - memcpy(z->buf, p->pages->block->host + p->normal[i], p->page_size);
> >> + memcpy(z->buf, p->pages->block->host + pages->offset[i], p->page_size);
> >> zs->avail_in = p->page_size;
> >> zs->next_in = z->buf;
> >>
> >> diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
> >> index b471daadcd..2023edd8cc 100644
> >> --- a/migration/multifd-zstd.c
> >> +++ b/migration/multifd-zstd.c
> >> @@ -113,6 +113,7 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
> >> */
> >> static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
> >> {
> >> + MultiFDPages_t *pages = p->pages;
> >> struct zstd_data *z = p->data;
> >> int ret;
> >> uint32_t i;
> >> @@ -121,13 +122,13 @@ static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
> >> z->out.size = z->zbuff_len;
> >> z->out.pos = 0;
> >>
> >> - for (i = 0; i < p->normal_num; i++) {
> >> + for (i = 0; i < pages->num; i++) {
> >> ZSTD_EndDirective flush = ZSTD_e_continue;
> >>
> >> - if (i == p->normal_num - 1) {
> >> + if (i == pages->num - 1) {
> >> flush = ZSTD_e_flush;
> >> }
> >> - z->in.src = p->pages->block->host + p->normal[i];
> >> + z->in.src = p->pages->block->host + pages->offset[i];
> >> z->in.size = p->page_size;
> >> z->in.pos = 0;
> >>
> >> diff --git a/migration/multifd.c b/migration/multifd.c
> >> index 5633ac245a..8bb1fd95cf 100644
> >> --- a/migration/multifd.c
> >> +++ b/migration/multifd.c
> >> @@ -90,13 +90,13 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
> >> {
> >> MultiFDPages_t *pages = p->pages;
> >>
> >> - for (int i = 0; i < p->normal_num; i++) {
> >> - p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i];
> >> + for (int i = 0; i < pages->num; i++) {
> >> + p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
> >> p->iov[p->iovs_num].iov_len = p->page_size;
> >> p->iovs_num++;
> >> }
> >>
> >> - p->next_packet_size = p->normal_num * p->page_size;
> >> + p->next_packet_size = pages->num * p->page_size;
> >> p->flags |= MULTIFD_FLAG_NOCOMP;
> >> return 0;
> >> }
> >> @@ -269,21 +269,22 @@ static void multifd_pages_clear(MultiFDPages_t *pages)
> >> static void multifd_send_fill_packet(MultiFDSendParams *p)
> >> {
> >> MultiFDPacket_t *packet = p->packet;
> >> + MultiFDPages_t *pages = p->pages;
> >> int i;
> >>
> >> packet->flags = cpu_to_be32(p->flags);
> >> packet->pages_alloc = cpu_to_be32(p->pages->allocated);
> >> - packet->normal_pages = cpu_to_be32(p->normal_num);
> >> + packet->normal_pages = cpu_to_be32(pages->num);
> >> packet->next_packet_size = cpu_to_be32(p->next_packet_size);
> >> packet->packet_num = cpu_to_be64(p->packet_num);
> >>
> >> - if (p->pages->block) {
> >> - strncpy(packet->ramblock, p->pages->block->idstr, 256);
> >> + if (pages->block) {
> >> + strncpy(packet->ramblock, pages->block->idstr, 256);
> >> }
> >>
> >> - for (i = 0; i < p->normal_num; i++) {
> >> + for (i = 0; i < pages->num; i++) {
> >> /* there are architectures where ram_addr_t is 32 bit */
> >> - uint64_t temp = p->normal[i];
> >> + uint64_t temp = pages->offset[i];
> >>
> >> packet->offset[i] = cpu_to_be64(temp);
> >> }
> >> @@ -570,8 +571,6 @@ void multifd_save_cleanup(void)
> >> p->packet = NULL;
> >> g_free(p->iov);
> >> p->iov = NULL;
> >> - g_free(p->normal);
> >> - p->normal = NULL;
> >> multifd_send_state->ops->send_cleanup(p, &local_err);
> >> if (local_err) {
> >> migrate_set_error(migrate_get_current(), local_err);
> >> @@ -688,8 +687,8 @@ static void *multifd_send_thread(void *opaque)
> >>
> >> if (p->pending_job) {
> >> uint64_t packet_num = p->packet_num;
> >> + MultiFDPages_t *pages = p->pages;
> >> uint32_t flags;
> >> - p->normal_num = 0;
> >>
> >> if (use_zero_copy_send) {
> >> p->iovs_num = 0;
> >> @@ -697,12 +696,7 @@ static void *multifd_send_thread(void *opaque)
> >> p->iovs_num = 1;
> >> }
> >>
> >> - for (int i = 0; i < p->pages->num; i++) {
> >> - p->normal[p->normal_num] = p->pages->offset[i];
> >> - p->normal_num++;
> >> - }
> >> -
> >> - if (p->normal_num) {
> >> + if (pages->num) {
> >> ret = multifd_send_state->ops->send_prepare(p, &local_err);
> >> if (ret != 0) {
> >> qemu_mutex_unlock(&p->mutex);
> >> @@ -713,10 +707,10 @@ static void *multifd_send_thread(void *opaque)
> >> flags = p->flags;
> >> p->flags = 0;
> >> p->num_packets++;
> >> - p->total_normal_pages += p->normal_num;
> >> + p->total_normal_pages += pages->num;
> >> qemu_mutex_unlock(&p->mutex);
> >>
> >> - trace_multifd_send(p->id, packet_num, p->normal_num, flags,
> >> + trace_multifd_send(p->id, packet_num, pages->num, flags,
> >> p->next_packet_size);
> >>
> >> if (use_zero_copy_send) {
> >> @@ -924,7 +918,6 @@ int multifd_save_setup(Error **errp)
> >> p->name = g_strdup_printf("multifdsend_%d", i);
> >> /* We need one extra place for the packet header */
> >> p->iov = g_new0(struct iovec, page_count + 1);
> >> - p->normal = g_new0(ram_addr_t, page_count);
> >> p->page_size = qemu_target_page_size();
> >> p->page_count = page_count;
> >>
> >> --
> >> 2.43.0
> >>
^ permalink raw reply [flat|nested] 49+ messages in thread
* Re: [External] [PATCH v2 05/23] migration/multifd: Drop MultiFDSendParams.normal[] array
2024-02-14 2:16 ` Hao Xiang
@ 2024-02-14 17:17 ` Fabiano Rosas
0 siblings, 0 replies; 49+ messages in thread
From: Fabiano Rosas @ 2024-02-14 17:17 UTC (permalink / raw)
To: Hao Xiang
Cc: peterx, qemu-devel, Bryan Zhang, Avihai Horon, Yuan Liu,
Prasad Pandit
Hao Xiang <hao.xiang@bytedance.com> writes:
> On Fri, Feb 9, 2024 at 4:20 AM Fabiano Rosas <farosas@suse.de> wrote:
>>
>> Hao Xiang <hao.xiang@bytedance.com> writes:
>>
>> > On Fri, Feb 2, 2024 at 2:30 AM <peterx@redhat.com> wrote:
>> >>
>> >> From: Peter Xu <peterx@redhat.com>
>> >>
>> >> This array is redundant when p->pages exists. Now we extended the life of
>> >> p->pages to the whole period where pending_job is set, it should be safe to
>> >> always use p->pages->offset[] rather than p->normal[]. Drop the array.
>> >>
>> >> Alongside, the normal_num is also redundant, which is the same to
>> >> p->pages->num.
>> >
>> > Can we not drop p->normal and p_normal_num? It is redundant now but I
>> > think it will be needed for multifd zero page checking. In multifd
>> > zero page, we find out all zero pages and we sort the normal pages and
>> > zero pages in two seperate arrays. p->offset is the original array of
>> > pages, p->normal will contain the array of normal pages and p->zero
>> > will contain the array of zero pages.
>>
>> We're moving send_fill_packet into send_prepare(), so you should be able
>> to do whatever data transformation at send_prepare() and add any fields
>> you need into p->pages.
>>
>> If we keep p->normal we will not be able to switch into an opaque
>> payload later on. There should be no mention of pages outside of
>> hooks. This is long-term work, but let's avoid blocking it if possible.
>>
>
> Got it. I will make the proper changes.
>
> Aside from that, I would like to get opinions from you guys regarding
> zero page detection interface.
> Here are the options I am thinking:
> 1) Do zero page detection in send_prepare().
> This means no dedicated hook for zero_page_detection() otherwise we
> will be calling a hook from inside a hook. But we will need a new
> function multifd_zero_page_check_send() similar to how we use
> multifd_send_fill_packet() now. multifd_zero_page_check_send() will
> need to be called by all send_prepare() implementations.
> 2) Do zero page detection in a new hook zero_page_detection().
> zero_page_detection will be called before send_prepare(). Seems like
> extra complexity but I can go with that routine if you guys think it's
> a cleaner way.
>
> I am leaning towards 1) right now.
That's fine. Zero page detection is only needed for ram migration. Once
we start using multifd to transfer generic device state, then there will
be no zero page detection. So send_prepare() seems like a good place to
put it.
>> >>
>> >> This doesn't apply to recv side, because there's no extra buffering on recv
>> >> side, so p->normal[] array is still needed.
>> >>
>> >> Reviewed-by: Fabiano Rosas <farosas@suse.de>
>> >> Signed-off-by: Peter Xu <peterx@redhat.com>
>> >> ---
>> >> migration/multifd.h | 4 ----
>> >> migration/multifd-zlib.c | 7 ++++---
>> >> migration/multifd-zstd.c | 7 ++++---
>> >> migration/multifd.c | 33 +++++++++++++--------------------
>> >> 4 files changed, 21 insertions(+), 30 deletions(-)
>> >>
>> >> diff --git a/migration/multifd.h b/migration/multifd.h
>> >> index 7c040cb85a..3920bdbcf1 100644
>> >> --- a/migration/multifd.h
>> >> +++ b/migration/multifd.h
>> >> @@ -122,10 +122,6 @@ typedef struct {
>> >> struct iovec *iov;
>> >> /* number of iovs used */
>> >> uint32_t iovs_num;
>> >> - /* Pages that are not zero */
>> >> - ram_addr_t *normal;
>> >> - /* num of non zero pages */
>> >> - uint32_t normal_num;
>> >> /* used for compression methods */
>> >> void *data;
>> >> } MultiFDSendParams;
>> >> diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
>> >> index 37ce48621e..100809abc1 100644
>> >> --- a/migration/multifd-zlib.c
>> >> +++ b/migration/multifd-zlib.c
>> >> @@ -116,17 +116,18 @@ static void zlib_send_cleanup(MultiFDSendParams *p, Error **errp)
>> >> */
>> >> static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
>> >> {
>> >> + MultiFDPages_t *pages = p->pages;
>> >> struct zlib_data *z = p->data;
>> >> z_stream *zs = &z->zs;
>> >> uint32_t out_size = 0;
>> >> int ret;
>> >> uint32_t i;
>> >>
>> >> - for (i = 0; i < p->normal_num; i++) {
>> >> + for (i = 0; i < pages->num; i++) {
>> >> uint32_t available = z->zbuff_len - out_size;
>> >> int flush = Z_NO_FLUSH;
>> >>
>> >> - if (i == p->normal_num - 1) {
>> >> + if (i == pages->num - 1) {
>> >> flush = Z_SYNC_FLUSH;
>> >> }
>> >>
>> >> @@ -135,7 +136,7 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
>> >> * with compression. zlib does not guarantee that this is safe,
>> >> * therefore copy the page before calling deflate().
>> >> */
>> >> - memcpy(z->buf, p->pages->block->host + p->normal[i], p->page_size);
>> >> + memcpy(z->buf, p->pages->block->host + pages->offset[i], p->page_size);
>> >> zs->avail_in = p->page_size;
>> >> zs->next_in = z->buf;
>> >>
>> >> diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
>> >> index b471daadcd..2023edd8cc 100644
>> >> --- a/migration/multifd-zstd.c
>> >> +++ b/migration/multifd-zstd.c
>> >> @@ -113,6 +113,7 @@ static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
>> >> */
>> >> static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
>> >> {
>> >> + MultiFDPages_t *pages = p->pages;
>> >> struct zstd_data *z = p->data;
>> >> int ret;
>> >> uint32_t i;
>> >> @@ -121,13 +122,13 @@ static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
>> >> z->out.size = z->zbuff_len;
>> >> z->out.pos = 0;
>> >>
>> >> - for (i = 0; i < p->normal_num; i++) {
>> >> + for (i = 0; i < pages->num; i++) {
>> >> ZSTD_EndDirective flush = ZSTD_e_continue;
>> >>
>> >> - if (i == p->normal_num - 1) {
>> >> + if (i == pages->num - 1) {
>> >> flush = ZSTD_e_flush;
>> >> }
>> >> - z->in.src = p->pages->block->host + p->normal[i];
>> >> + z->in.src = p->pages->block->host + pages->offset[i];
>> >> z->in.size = p->page_size;
>> >> z->in.pos = 0;
>> >>
>> >> diff --git a/migration/multifd.c b/migration/multifd.c
>> >> index 5633ac245a..8bb1fd95cf 100644
>> >> --- a/migration/multifd.c
>> >> +++ b/migration/multifd.c
>> >> @@ -90,13 +90,13 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
>> >> {
>> >> MultiFDPages_t *pages = p->pages;
>> >>
>> >> - for (int i = 0; i < p->normal_num; i++) {
>> >> - p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i];
>> >> + for (int i = 0; i < pages->num; i++) {
>> >> + p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
>> >> p->iov[p->iovs_num].iov_len = p->page_size;
>> >> p->iovs_num++;
>> >> }
>> >>
>> >> - p->next_packet_size = p->normal_num * p->page_size;
>> >> + p->next_packet_size = pages->num * p->page_size;
>> >> p->flags |= MULTIFD_FLAG_NOCOMP;
>> >> return 0;
>> >> }
>> >> @@ -269,21 +269,22 @@ static void multifd_pages_clear(MultiFDPages_t *pages)
>> >> static void multifd_send_fill_packet(MultiFDSendParams *p)
>> >> {
>> >> MultiFDPacket_t *packet = p->packet;
>> >> + MultiFDPages_t *pages = p->pages;
>> >> int i;
>> >>
>> >> packet->flags = cpu_to_be32(p->flags);
>> >> packet->pages_alloc = cpu_to_be32(p->pages->allocated);
>> >> - packet->normal_pages = cpu_to_be32(p->normal_num);
>> >> + packet->normal_pages = cpu_to_be32(pages->num);
>> >> packet->next_packet_size = cpu_to_be32(p->next_packet_size);
>> >> packet->packet_num = cpu_to_be64(p->packet_num);
>> >>
>> >> - if (p->pages->block) {
>> >> - strncpy(packet->ramblock, p->pages->block->idstr, 256);
>> >> + if (pages->block) {
>> >> + strncpy(packet->ramblock, pages->block->idstr, 256);
>> >> }
>> >>
>> >> - for (i = 0; i < p->normal_num; i++) {
>> >> + for (i = 0; i < pages->num; i++) {
>> >> /* there are architectures where ram_addr_t is 32 bit */
>> >> - uint64_t temp = p->normal[i];
>> >> + uint64_t temp = pages->offset[i];
>> >>
>> >> packet->offset[i] = cpu_to_be64(temp);
>> >> }
>> >> @@ -570,8 +571,6 @@ void multifd_save_cleanup(void)
>> >> p->packet = NULL;
>> >> g_free(p->iov);
>> >> p->iov = NULL;
>> >> - g_free(p->normal);
>> >> - p->normal = NULL;
>> >> multifd_send_state->ops->send_cleanup(p, &local_err);
>> >> if (local_err) {
>> >> migrate_set_error(migrate_get_current(), local_err);
>> >> @@ -688,8 +687,8 @@ static void *multifd_send_thread(void *opaque)
>> >>
>> >> if (p->pending_job) {
>> >> uint64_t packet_num = p->packet_num;
>> >> + MultiFDPages_t *pages = p->pages;
>> >> uint32_t flags;
>> >> - p->normal_num = 0;
>> >>
>> >> if (use_zero_copy_send) {
>> >> p->iovs_num = 0;
>> >> @@ -697,12 +696,7 @@ static void *multifd_send_thread(void *opaque)
>> >> p->iovs_num = 1;
>> >> }
>> >>
>> >> - for (int i = 0; i < p->pages->num; i++) {
>> >> - p->normal[p->normal_num] = p->pages->offset[i];
>> >> - p->normal_num++;
>> >> - }
>> >> -
>> >> - if (p->normal_num) {
>> >> + if (pages->num) {
>> >> ret = multifd_send_state->ops->send_prepare(p, &local_err);
>> >> if (ret != 0) {
>> >> qemu_mutex_unlock(&p->mutex);
>> >> @@ -713,10 +707,10 @@ static void *multifd_send_thread(void *opaque)
>> >> flags = p->flags;
>> >> p->flags = 0;
>> >> p->num_packets++;
>> >> - p->total_normal_pages += p->normal_num;
>> >> + p->total_normal_pages += pages->num;
>> >> qemu_mutex_unlock(&p->mutex);
>> >>
>> >> - trace_multifd_send(p->id, packet_num, p->normal_num, flags,
>> >> + trace_multifd_send(p->id, packet_num, pages->num, flags,
>> >> p->next_packet_size);
>> >>
>> >> if (use_zero_copy_send) {
>> >> @@ -924,7 +918,6 @@ int multifd_save_setup(Error **errp)
>> >> p->name = g_strdup_printf("multifdsend_%d", i);
>> >> /* We need one extra place for the packet header */
>> >> p->iov = g_new0(struct iovec, page_count + 1);
>> >> - p->normal = g_new0(ram_addr_t, page_count);
>> >> p->page_size = qemu_target_page_size();
>> >> p->page_count = page_count;
>> >>
>> >> --
>> >> 2.43.0
>> >>
^ permalink raw reply [flat|nested] 49+ messages in thread
end of thread, other threads:[~2024-02-14 17:20 UTC | newest]
Thread overview: 49+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2024-02-02 10:28 [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups peterx
2024-02-02 10:28 ` [PATCH v2 01/23] migration/multifd: Drop stale comment for multifd zero copy peterx
2024-02-02 10:28 ` [PATCH v2 02/23] migration/multifd: multifd_send_kick_main() peterx
2024-02-02 10:28 ` [PATCH v2 03/23] migration/multifd: Drop MultiFDSendParams.quit, cleanup error paths peterx
2024-02-02 19:15 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 04/23] migration/multifd: Postpone reset of MultiFDPages_t peterx
2024-02-02 10:28 ` [PATCH v2 05/23] migration/multifd: Drop MultiFDSendParams.normal[] array peterx
2024-02-09 0:06 ` [External] " Hao Xiang
2024-02-09 12:20 ` Fabiano Rosas
2024-02-14 2:16 ` Hao Xiang
2024-02-14 17:17 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 06/23] migration/multifd: Separate SYNC request with normal jobs peterx
2024-02-02 19:21 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 07/23] migration/multifd: Simplify locking in sender thread peterx
2024-02-02 19:23 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 08/23] migration/multifd: Drop pages->num check " peterx
2024-02-02 10:28 ` [PATCH v2 09/23] migration/multifd: Rename p->num_packets and clean it up peterx
2024-02-02 10:28 ` [PATCH v2 10/23] migration/multifd: Move total_normal_pages accounting peterx
2024-02-02 10:28 ` [PATCH v2 11/23] migration/multifd: Move trace_multifd_send|recv() peterx
2024-02-02 10:28 ` [PATCH v2 12/23] migration/multifd: multifd_send_prepare_header() peterx
2024-02-02 10:28 ` [PATCH v2 13/23] migration/multifd: Move header prepare/fill into send_prepare() peterx
2024-02-02 19:26 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 14/23] migration/multifd: Forbid spurious wakeups peterx
2024-02-02 10:28 ` [PATCH v2 15/23] migration/multifd: Split multifd_send_terminate_threads() peterx
2024-02-02 19:28 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 16/23] migration/multifd: Change retval of multifd_queue_page() peterx
2024-02-02 19:29 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 17/23] migration/multifd: Change retval of multifd_send_pages() peterx
2024-02-02 19:30 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 18/23] migration/multifd: Rewrite multifd_queue_page() peterx
2024-02-02 20:47 ` Fabiano Rosas
2024-02-05 4:03 ` Peter Xu
2024-02-02 10:28 ` [PATCH v2 19/23] migration/multifd: Cleanup multifd_save_cleanup() peterx
2024-02-02 20:54 ` Fabiano Rosas
2024-02-05 4:25 ` Peter Xu
2024-02-02 10:28 ` [PATCH v2 20/23] migration/multifd: Cleanup multifd_load_cleanup() peterx
2024-02-02 20:55 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 21/23] migration/multifd: Stick with send/recv on function names peterx
2024-02-02 21:03 ` Fabiano Rosas
2024-02-02 10:28 ` [PATCH v2 22/23] migration/multifd: Fix MultiFDSendParams.packet_num race peterx
2024-02-02 21:08 ` Fabiano Rosas
2024-02-05 4:05 ` Peter Xu
2024-02-02 10:28 ` [PATCH v2 23/23] migration/multifd: Optimize sender side to be lockless peterx
2024-02-02 21:34 ` Fabiano Rosas
2024-02-05 4:35 ` Peter Xu
2024-02-05 14:10 ` Fabiano Rosas
2024-02-05 14:24 ` Peter Xu
2024-02-05 17:59 ` Fabiano Rosas
2024-02-06 3:05 ` [PATCH v2 00/23] migration/multifd: Refactor ->send_prepare() and cleanups Peter Xu
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).