From: Peter Xu <peterx@redhat.com>
To: Fabiano Rosas <farosas@suse.de>
Cc: qemu-devel@nongnu.org, berrange@redhat.com, armbru@redhat.com,
Juan Quintela <quintela@redhat.com>,
Leonardo Bras <leobras@redhat.com>,
Claudio Fontana <cfontana@suse.de>
Subject: Re: [RFC PATCH v3 18/30] migration/multifd: Allow receiving pages without packets
Date: Tue, 16 Jan 2024 16:10:59 +0800
Message-ID: <ZaY6E4tefb5DGEp9@x1n>
In-Reply-To: <20231127202612.23012-19-farosas@suse.de>
On Mon, Nov 27, 2023 at 05:26:00PM -0300, Fabiano Rosas wrote:
> Currently multifd does not need to have knowledge of pages on the
> receiving side because all the information needed is within the
> packets that come in the stream.
>
> We're about to add support to fixed-ram migration, which cannot use
> packets because it expects the ramblock section in the migration file
> to contain only the guest pages data.
>
> Add a data structure to transfer pages between the ram migration code
> and the multifd receiving threads.
>
> We don't want to reuse MultiFDPages_t for two reasons:
>
> a) multifd threads don't really need to know about the data they're
> receiving.
>
> b) the receiving side has to be stopped to load the pages, which means
> we can experiment with larger granularities than page size when
> transferring data.
>
> Signed-off-by: Fabiano Rosas <farosas@suse.de>
> ---
> - stopped using MultiFDPages_t and added a new structure which can
> take offset + size
> ---
> migration/multifd.c | 122 ++++++++++++++++++++++++++++++++++++++++++--
> migration/multifd.h | 20 ++++++++
> 2 files changed, 138 insertions(+), 4 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index c1381bdc21..7dfab2367a 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -142,17 +142,36 @@ static void nocomp_recv_cleanup(MultiFDRecvParams *p)
> static int nocomp_recv_data(MultiFDRecvParams *p, Error **errp)
> {
>     uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
> +    ERRP_GUARD();
>
>     if (flags != MULTIFD_FLAG_NOCOMP) {
>         error_setg(errp, "multifd %u: flags received %x flags expected %x",
>                    p->id, flags, MULTIFD_FLAG_NOCOMP);
>         return -1;
>     }
> -    for (int i = 0; i < p->normal_num; i++) {
> -        p->iov[i].iov_base = p->host + p->normal[i];
> -        p->iov[i].iov_len = p->page_size;
> +
> +    if (!migrate_multifd_packets()) {
> +        MultiFDRecvData *data = p->data;
> +        size_t ret;
> +
> +        ret = qio_channel_pread(p->c, (char *) data->opaque,
> +                                data->size, data->file_offset, errp);
> +        if (ret != data->size) {
> +            error_prepend(errp,
> +                          "multifd recv (%u): read 0x%zx, expected 0x%zx",
> +                          p->id, ret, data->size);
> +            return -1;
> +        }
> +
> +        return 0;
> +    } else {
> +        for (int i = 0; i < p->normal_num; i++) {
> +            p->iov[i].iov_base = p->host + p->normal[i];
> +            p->iov[i].iov_len = p->page_size;
> +        }
> +
> +        return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
>     }

I guess you managed to squash the file loads into the "no compression"
handler of multifd, but IMHO it's not as clean.

Firstly, if we go this way, we'd better make sure multifd compression is
never enabled together with fixed-ram. I haven't yet seen such protection
in the series. I think if that happens we should expect crashes, because
the file loads will go into the zlib/zstd paths.
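
Something like below in the capability checks could avoid that (just a
sketch; the MIGRATION_CAPABILITY_FIXED_RAM name and the exact spot, e.g.
migrate_caps_check(), are assumptions on my side):

    if (new_caps[MIGRATION_CAPABILITY_FIXED_RAM] &&
        migrate_multifd_compression() != MULTIFD_COMPRESSION_NONE) {
        error_setg(errp, "fixed-ram migration is incompatible with "
                   "multifd compression");
        return false;
    }
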
IMHO the only part fixed-ram can share with multifd is the task management
(mutexes, semaphores, etc.). IIRC I mentioned before that it would be nice
to simply have a pool of threads so that we can enqueue tasks. If that's
too far away, would something like the below be closer to that? What I'm
thinking:

- patch 1: rename MultiFDMethods to MultiFDCompressMethods; this can
  replace the other patch that does s/recv_pages/recv_data/

- patch 2: introduce MultiFDMethods (on top of MultiFDCompressMethods),
  refactoring the current code to provide the socket version of
  MultiFDMethods

- patch 3: add the fixed-ram "file" version of MultiFDMethods

MultiFDCompressMethods wouldn't need to be used at all for the "file"
version of MultiFDMethods.
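
Roughly, as an untested sketch (the hook names are only illustrative):

    /* compression hooks only: nocomp/zlib/zstd */
    typedef struct {
        int (*send_prepare)(MultiFDSendParams *p, Error **errp);
        int (*recv_data)(MultiFDRecvParams *p, Error **errp);
        /* ... */
    } MultiFDCompressMethods;

    /*
     * Transport-level methods: the "socket" version would call into
     * MultiFDCompressMethods, while the "file" version would just
     * pwrite()/pread() the guest pages.
     */
    typedef struct {
        int (*send)(MultiFDSendParams *p, Error **errp);
        int (*recv)(MultiFDRecvParams *p, Error **errp);
    } MultiFDMethods;
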
Would that work?
> -    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
> }
>
> static MultiFDMethods multifd_nocomp_ops = {
> @@ -989,6 +1008,7 @@ int multifd_save_setup(Error **errp)
>
> struct {
>     MultiFDRecvParams *params;
> +    MultiFDRecvData *data;

(If the above would work, maybe we can also split MultiFDRecvParams into
two chunks, one commonly used by both, one only for sockets?)
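
E.g. (a purely hypothetical field grouping, to illustrate the split):

    typedef struct {
        /* common: thread/task management, used by socket and file */
        char *name;
        QemuMutex mutex;
        QemuSemaphore sem;
        QemuSemaphore sem_sync;
        bool quit;
    } MultiFDRecvCommon;

    typedef struct {
        MultiFDRecvCommon c;
        /* socket-only: packet handling */
        MultiFDPacket_t *packet;
        uint32_t packet_len;
        uint64_t packet_num;
    } MultiFDRecvSocket;
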
>     /* number of created threads */
>     int count;
>     /* syncs main thread and channels */
> @@ -999,6 +1019,49 @@ struct {
>     MultiFDMethods *ops;
> } *multifd_recv_state;
>
> +int multifd_recv(void)
> +{
> +    int i;
> +    static int next_recv_channel;
> +    MultiFDRecvParams *p = NULL;
> +    MultiFDRecvData *data = multifd_recv_state->data;
> +
> +    /*
> +     * next_channel can remain from a previous migration that was
> +     * using more channels, so ensure it doesn't overflow if the
> +     * limit is lower now.
> +     */
> +    next_recv_channel %= migrate_multifd_channels();
> +    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
> +        p = &multifd_recv_state->params[i];
> +
> +        qemu_mutex_lock(&p->mutex);
> +        if (p->quit) {
> +            error_report("%s: channel %d has already quit!", __func__, i);
> +            qemu_mutex_unlock(&p->mutex);
> +            return -1;
> +        }
> +        if (!p->pending_job) {
> +            p->pending_job++;
> +            next_recv_channel = (i + 1) % migrate_multifd_channels();
> +            break;
> +        }
> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    assert(p->data->size == 0);
> +    multifd_recv_state->data = p->data;
> +    p->data = data;
> +    qemu_mutex_unlock(&p->mutex);
> +    qemu_sem_post(&p->sem);
> +
> +    return 1;
> +}

PS: with the pool model we could already mostly merge the above code with
multifd_send_pages(), because this would become a common helper that
enqueues a task to the pool, no matter whether it's for writing (to
file/socket) or reading (only from file).
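
In pool terms both sides would reduce to something like (pseudo-code, all
names made up):

    static int multifd_pool_enqueue(MultiFDTask *task)
    {
        /* pick the next idle worker, round-robin */
        MultiFDParams *p = multifd_pool_next_idle();

        if (!p) {
            return -1;
        }
        qemu_mutex_lock(&p->mutex);
        p->task = task;         /* pages to write, or a file chunk to read */
        p->pending_job++;
        qemu_mutex_unlock(&p->mutex);
        qemu_sem_post(&p->sem); /* kick the worker */

        return 1;
    }
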
> +
> +MultiFDRecvData *multifd_get_recv_data(void)
> +{
> +    return multifd_recv_state->data;
> +}
> +
> static void multifd_recv_terminate_threads(Error *err)
> {
>     int i;
> @@ -1020,6 +1083,7 @@ static void multifd_recv_terminate_threads(Error *err)
>
>         qemu_mutex_lock(&p->mutex);
>         p->quit = true;
> +        qemu_sem_post(&p->sem);
>         /*
>          * We could arrive here for two reasons:
>          *  - normal quit, i.e. everything went fine, just finished
> @@ -1069,6 +1133,7 @@ void multifd_load_cleanup(void)
>         p->c = NULL;
>         qemu_mutex_destroy(&p->mutex);
>         qemu_sem_destroy(&p->sem_sync);
> +        qemu_sem_destroy(&p->sem);
>         g_free(p->name);
>         p->name = NULL;
>         p->packet_len = 0;
> @@ -1083,6 +1148,8 @@ void multifd_load_cleanup(void)
>     qemu_sem_destroy(&multifd_recv_state->sem_sync);
>     g_free(multifd_recv_state->params);
>     multifd_recv_state->params = NULL;
> +    g_free(multifd_recv_state->data);
> +    multifd_recv_state->data = NULL;
>     g_free(multifd_recv_state);
>     multifd_recv_state = NULL;
> }
> @@ -1094,6 +1161,21 @@ void multifd_recv_sync_main(void)
>     if (!migrate_multifd() || !migrate_multifd_packets()) {
[1]
>         return;
>     }
> +
> +    if (!migrate_multifd_packets()) {

Hmm, isn't this already checked above at [1]? Could this path ever trigger
then? Maybe we need to drop the check at [1]?

IIUC what you wanted to do here is to rely on the last RAM_SAVE_FLAG_EOS in
the image file to do a full flush, making sure all pages are loaded. You
may want to be careful about the side effect of the
flush_after_each_section parameter:

        case RAM_SAVE_FLAG_EOS:
            /* normal exit */
            if (migrate_multifd() &&
                migrate_multifd_flush_after_each_section()) {
                multifd_recv_sync_main();
            }

You may want to always flush for the file case?
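
E.g. something like (untested):

        case RAM_SAVE_FLAG_EOS:
            /* normal exit */
            if (migrate_multifd() &&
                (!migrate_multifd_packets() ||
                 migrate_multifd_flush_after_each_section())) {
                multifd_recv_sync_main();
            }
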
> +        for (i = 0; i < migrate_multifd_channels(); i++) {
> +            MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +            qemu_sem_post(&p->sem);
> +            qemu_sem_wait(&p->sem_sync);
> +
> +            qemu_mutex_lock(&p->mutex);
> +            assert(!p->pending_job || p->quit);
> +            qemu_mutex_unlock(&p->mutex);
> +        }
> +        return;

Btw, how does this kick off all the recv threads? Is it because you did a
sem_post(&sem) with p->pending_job==false this time?

Maybe it's clearer to just set p->quit (or a global quit knob) somewhere?
That would make it clear that this is a one-shot thing, only needed at the
end of the incoming file migration.
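
The sync could then do an explicit one-shot shutdown instead of posting an
empty job, something like (sketch only):

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_lock(&p->mutex);
        p->quit = true;
        qemu_mutex_unlock(&p->mutex);
        qemu_sem_post(&p->sem);       /* wake the thread up to see quit */
        qemu_sem_wait(&p->sem_sync);  /* wait until it acks completion */
    }
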
> +    }
> +
>     for (i = 0; i < migrate_multifd_channels(); i++) {
>         MultiFDRecvParams *p = &multifd_recv_state->params[i];
>
> @@ -1156,6 +1238,18 @@ static void *multifd_recv_thread(void *opaque)
>
>             p->total_normal_pages += p->normal_num;
>             has_data = !!p->normal_num;
> +        } else {
> +            /*
> +             * No packets, so we need to wait for the vmstate code to
> +             * give us work.
> +             */
> +            qemu_sem_wait(&p->sem);
> +            qemu_mutex_lock(&p->mutex);
> +            if (!p->pending_job) {
> +                qemu_mutex_unlock(&p->mutex);
> +                break;
> +            }
> +            has_data = !!p->data->size;
>         }
>
>         qemu_mutex_unlock(&p->mutex);
> @@ -1171,6 +1265,17 @@ static void *multifd_recv_thread(void *opaque)
>             qemu_sem_post(&multifd_recv_state->sem_sync);
>             qemu_sem_wait(&p->sem_sync);
>         }
> +
> +        if (!use_packets) {
> +            qemu_mutex_lock(&p->mutex);
> +            p->data->size = 0;
> +            p->pending_job--;
> +            qemu_mutex_unlock(&p->mutex);
> +        }
> +    }
> +
> +    if (!use_packets) {
> +        qemu_sem_post(&p->sem_sync);

Currently sem_sync is only posted with the MULTIFD_FLAG_SYNC flag. We'd
better be careful about reusing it.

Maybe add some comment above recv_state->sem_sync?

/*
* For sockets: this is posted once for each MULTIFD_FLAG_SYNC flag.
*
* For files: this is only posted at the end of the file load to mark
* completion of the load process.
*/
>     }
>
>     if (local_err) {
> @@ -1205,6 +1310,10 @@ int multifd_load_setup(Error **errp)
>     thread_count = migrate_multifd_channels();
>     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> +
> +    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
> +    multifd_recv_state->data->size = 0;
> +
>     qatomic_set(&multifd_recv_state->count, 0);
>     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
>     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
> @@ -1214,9 +1323,14 @@ int multifd_load_setup(Error **errp)
>
>         qemu_mutex_init(&p->mutex);
>         qemu_sem_init(&p->sem_sync, 0);
> +        qemu_sem_init(&p->sem, 0);
>         p->quit = false;
> +        p->pending_job = 0;
>         p->id = i;
>
> +        p->data = g_new0(MultiFDRecvData, 1);
> +        p->data->size = 0;
> +
>         if (use_packets) {
>             p->packet_len = sizeof(MultiFDPacket_t)
>                           + sizeof(uint64_t) * page_count;
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 406d42dbae..abaf16c3f2 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -13,6 +13,8 @@
> #ifndef QEMU_MIGRATION_MULTIFD_H
> #define QEMU_MIGRATION_MULTIFD_H
>
> +typedef struct MultiFDRecvData MultiFDRecvData;
> +
> int multifd_save_setup(Error **errp);
> void multifd_save_cleanup(void);
> int multifd_load_setup(Error **errp);
> @@ -24,6 +26,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
> void multifd_recv_sync_main(void);
> int multifd_send_sync_main(QEMUFile *f);
> int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset);
> +int multifd_recv(void);
> +MultiFDRecvData *multifd_get_recv_data(void);
>
> /* Multifd Compression flags */
> #define MULTIFD_FLAG_SYNC (1 << 0)
> @@ -66,6 +70,13 @@ typedef struct {
>     RAMBlock *block;
> } MultiFDPages_t;
>
> +struct MultiFDRecvData {
> +    void *opaque;
> +    size_t size;
> +    /* for preadv */
> +    off_t file_offset;
> +};
> +
> typedef struct {
>     /* Fields are only written at creating/deletion time */
>     /* No lock required for them, they are read only */
> @@ -156,6 +167,8 @@ typedef struct {
>
>     /* syncs main thread and channels */
>     QemuSemaphore sem_sync;
> +    /* sem where to wait for more work */
> +    QemuSemaphore sem;
>
>     /* this mutex protects the following parameters */
>     QemuMutex mutex;
> @@ -167,6 +180,13 @@ typedef struct {
>     uint32_t flags;
>     /* global number of generated multifd packets */
>     uint64_t packet_num;
> +    int pending_job;
> +    /*
> +     * The owner of 'data' depends of 'pending_job' value:
> +     * pending_job == 0 -> migration_thread can use it.
> +     * pending_job != 0 -> multifd_channel can use it.
> +     */
> +    MultiFDRecvData *data;

Right after the main thread assigns a chunk of memory for a recv thread to
load, the main thread's job is done, afaict. I don't see how a race could
happen here.

I'm not sure, but I _think_ that if we rely on p->quit (or something
similar) to quit all the recv threads, then this can be dropped?
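
I.e. the recv thread could then do something like (sketch):

        qemu_sem_wait(&p->sem);
        qemu_mutex_lock(&p->mutex);
        if (p->quit) {
            /* one-shot shutdown at the end of the file load */
            qemu_mutex_unlock(&p->mutex);
            break;
        }
        has_data = !!p->data->size;
        qemu_mutex_unlock(&p->mutex);
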
>
>     /* thread local variables. No locking required */
>
> --
> 2.35.3
>
--
Peter Xu