From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
To: Juan Quintela <quintela@redhat.com>
Cc: qemu-devel@nongnu.org, lvivier@redhat.com, peterx@redhat.com
Subject: Re: [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread
Date: Thu, 3 May 2018 11:44:38 +0100
Message-ID: <20180503104437.GC2660@work-vm>
In-Reply-To: <20180425112723.1111-17-quintela@redhat.com>
* Juan Quintela (quintela@redhat.com) wrote:
> We synchronize all threads each RAM_SAVE_FLAG_EOS. Bitmap
> synchronizations don't happen inside a ram section, so we are safe
> about two channels trying to overwrite the same memory.
OK, that's quite neat - so you don't need any extra flags in the stream
to do the sync; it probably needs a comment in the code somewhere so we
don't forget!
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
> migration/ram.c | 118 +++++++++++++++++++++++++++++++++++++----
> migration/trace-events | 6 +++
> 2 files changed, 113 insertions(+), 11 deletions(-)
>
> diff --git a/migration/ram.c b/migration/ram.c
> index c4c185cc4c..398cb0af3b 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -405,6 +405,8 @@ static void compress_threads_save_setup(void)
> #define MULTIFD_MAGIC 0x11223344U
> #define MULTIFD_VERSION 1
>
> +#define MULTIFD_FLAG_SYNC (1 << 0)
> +
> typedef struct {
> uint32_t magic;
> uint32_t version;
> @@ -471,6 +473,8 @@ typedef struct {
> uint32_t num_packets;
> /* pages sent through this channel */
> uint32_t num_pages;
> + /* syncs main thread and channels */
> + QemuSemaphore sem_sync;
> } MultiFDSendParams;
>
> typedef struct {
> @@ -507,6 +511,8 @@ typedef struct {
> uint32_t num_packets;
> /* pages sent through this channel */
> uint32_t num_pages;
> + /* syncs main thread and channels */
> + QemuSemaphore sem_sync;
> } MultiFDRecvParams;
>
> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> @@ -682,6 +688,10 @@ struct {
> int count;
> /* array of pages to sent */
> MultiFDPages_t *pages;
> + /* syncs main thread and channels */
> + QemuSemaphore sem_sync;
> + /* global number of generated multifd packets */
> + uint32_t seq;
It's interesting that you use the same comment for 'seq' in
MultiFDSendParams - I guess that means only this one is the truly
global count, and the per-channel ones are just local to their
thread?
> } *multifd_send_state;
>
> static void multifd_send_terminate_threads(Error *err)
> @@ -727,6 +737,7 @@ int multifd_save_cleanup(Error **errp)
> p->c = NULL;
> qemu_mutex_destroy(&p->mutex);
> qemu_sem_destroy(&p->sem);
> + qemu_sem_destroy(&p->sem_sync);
> g_free(p->name);
> p->name = NULL;
> multifd_pages_clear(p->pages);
> @@ -735,6 +746,7 @@ int multifd_save_cleanup(Error **errp)
> g_free(p->packet);
> p->packet = NULL;
> }
> + qemu_sem_destroy(&multifd_send_state->sem_sync);
> g_free(multifd_send_state->params);
> multifd_send_state->params = NULL;
> multifd_pages_clear(multifd_send_state->pages);
> @@ -744,6 +756,33 @@ int multifd_save_cleanup(Error **errp)
> return ret;
> }
>
> +static void multifd_send_sync_main(void)
> +{
> + int i;
> +
> + if (!migrate_use_multifd()) {
> + return;
> + }
> + for (i = 0; i < migrate_multifd_channels(); i++) {
> + MultiFDSendParams *p = &multifd_send_state->params[i];
> +
> + trace_multifd_send_sync_main_signal(p->id);
> +
> + qemu_mutex_lock(&p->mutex);
> + p->flags |= MULTIFD_FLAG_SYNC;
> + p->pending_job++;
> + qemu_mutex_unlock(&p->mutex);
> + qemu_sem_post(&p->sem);
> + }
> + for (i = 0; i < migrate_multifd_channels(); i++) {
> + MultiFDSendParams *p = &multifd_send_state->params[i];
> +
> + trace_multifd_send_sync_main_wait(p->id);
> + qemu_sem_wait(&multifd_send_state->sem_sync);
> + }
> + trace_multifd_send_sync_main(multifd_send_state->seq);
> +}
> +
OK, so this just makes each of the sending threads ack, so that seems
OK.
But what happens with an error? multifd_send_thread exits its loop
with a 'break' if the writes fail, and that could mean a thread never
comes back to post sem_sync, leaving multifd_send_sync_main stuck in
qemu_sem_wait.
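To make the error case concrete, here is a minimal sketch of one way the
ack handshake could survive a failing channel. It is not the patch's code:
it uses plain POSIX pthreads and sem_t instead of QemuMutex/QemuSemaphore,
it drops pending_job and treats non-zero flags as "work queued", and
Channel, dead, inject_error, sync_main and run_demo are made-up names for
illustration. The idea is that a channel which exits on error marks itself
dead under its mutex and acks any sync that was queued in the meantime, and
the main thread only waits for channels that were alive when it queued the
request.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>

#define N_CHANNELS 4
#define FLAG_SYNC (1u << 0)
typedef struct {
    pthread_t thread;
    pthread_mutex_t mutex;
    sem_t sem;          /* main -> channel: work is queued */
    unsigned flags;     /* non-zero means a job is pending */
    bool quit;
    bool dead;          /* set by the channel when it exits on error */
    bool inject_error;  /* hypothetical hook: pretend the write failed */
} Channel;

static Channel channels[N_CHANNELS];
static sem_t sem_sync;  /* channel -> main: sync acknowledged */

static void *channel_thread(void *opaque)
{
    Channel *c = opaque;
    bool failed = false;
    while (!failed) {
        sem_wait(&c->sem);
        pthread_mutex_lock(&c->mutex);
        if (c->flags) {
            unsigned flags = c->flags;
            c->flags = 0;
            failed = c->inject_error;   /* stands in for a failed write */
            pthread_mutex_unlock(&c->mutex);
            if (flags & FLAG_SYNC) {
                sem_post(&sem_sync);    /* ack whether or not we failed */
            }
        } else if (c->quit) {
            pthread_mutex_unlock(&c->mutex);
            return NULL;
        } else {
            pthread_mutex_unlock(&c->mutex);    /* spurious wakeup */
        }
    }
    /* Error exit: mark dead, and ack any sync queued since the last job. */
    pthread_mutex_lock(&c->mutex);
    c->dead = true;
    if (c->flags & FLAG_SYNC) {
        sem_post(&sem_sync);
    }
    pthread_mutex_unlock(&c->mutex);
    return NULL;
}

static void sync_main(void)
{
    int expected = 0;
    for (int i = 0; i < N_CHANNELS; i++) {
        Channel *c = &channels[i];
        pthread_mutex_lock(&c->mutex);
        if (!c->dead) {                 /* a dead channel will never ack */
            c->flags |= FLAG_SYNC;
            expected++;
        }
        pthread_mutex_unlock(&c->mutex);
        sem_post(&c->sem);
    }
    while (expected-- > 0) {
        sem_wait(&sem_sync);
    }
}

/* Runs two sync rounds; returns how many channels exited on error. */
static int run_demo(int failing_channel)
{
    int dead = 0;
    sem_init(&sem_sync, 0, 0);
    for (int i = 0; i < N_CHANNELS; i++) {
        Channel *c = &channels[i];
        *c = (Channel){ .inject_error = (i == failing_channel) };
        pthread_mutex_init(&c->mutex, NULL);
        sem_init(&c->sem, 0, 0);
        pthread_create(&c->thread, NULL, channel_thread, c);
    }
    sync_main();
    sync_main();
    for (int i = 0; i < N_CHANNELS; i++) {
        Channel *c = &channels[i];
        pthread_mutex_lock(&c->mutex);
        c->quit = true;
        pthread_mutex_unlock(&c->mutex);
        sem_post(&c->sem);
        pthread_join(c->thread, NULL);
        dead += c->dead;
        sem_destroy(&c->sem);
        pthread_mutex_destroy(&c->mutex);
    }
    sem_destroy(&sem_sync);
    return dead;
}
```

Calling run_demo(1) makes channel 1 fail on its first job; both sync
rounds should still return rather than hang. It needs -pthread to build.
Whether the real code should skip dead channels or instead propagate the
error to the main thread is a separate design question.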
> static void *multifd_send_thread(void *opaque)
> {
> MultiFDSendParams *p = opaque;
> @@ -778,17 +817,20 @@ static void *multifd_send_thread(void *opaque)
> /* ToDo: send packet here */
>
> qemu_mutex_lock(&p->mutex);
> + p->flags = 0;
> p->pending_job--;
> qemu_mutex_unlock(&p->mutex);
> - continue;
> +
> + if (flags & MULTIFD_FLAG_SYNC) {
> + qemu_sem_post(&multifd_send_state->sem_sync);
> + }
> } else if (p->quit) {
> qemu_mutex_unlock(&p->mutex);
> break;
> + } else {
> + qemu_mutex_unlock(&p->mutex);
> + /* sometimes there are spurious wakeups */
> }
> - qemu_mutex_unlock(&p->mutex);
> - /* this is impossible */
> - error_setg(&local_err, "multifd_send_thread: Unknown command");
> - break;
> }
>
> out:
> @@ -840,12 +882,14 @@ int multifd_save_setup(void)
> multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
> atomic_set(&multifd_send_state->count, 0);
> multifd_pages_init(&multifd_send_state->pages, page_count);
> + qemu_sem_init(&multifd_send_state->sem_sync, 0);
>
> for (i = 0; i < thread_count; i++) {
> MultiFDSendParams *p = &multifd_send_state->params[i];
>
> qemu_mutex_init(&p->mutex);
> qemu_sem_init(&p->sem, 0);
> + qemu_sem_init(&p->sem_sync, 0);
> p->quit = false;
> p->pending_job = 0;
> p->id = i;
> @@ -863,6 +907,10 @@ struct {
> MultiFDRecvParams *params;
> /* number of created threads */
> int count;
> + /* syncs main thread and channels */
> + QemuSemaphore sem_sync;
> + /* global number of generated multifd packets */
> + uint32_t seq;
> } *multifd_recv_state;
>
> static void multifd_recv_terminate_threads(Error *err)
> @@ -908,6 +956,7 @@ int multifd_load_cleanup(Error **errp)
> p->c = NULL;
> qemu_mutex_destroy(&p->mutex);
> qemu_sem_destroy(&p->sem);
> + qemu_sem_destroy(&p->sem_sync);
> g_free(p->name);
> p->name = NULL;
> multifd_pages_clear(p->pages);
> @@ -916,6 +965,7 @@ int multifd_load_cleanup(Error **errp)
> g_free(p->packet);
> p->packet = NULL;
> }
> + qemu_sem_destroy(&multifd_recv_state->sem_sync);
> g_free(multifd_recv_state->params);
> multifd_recv_state->params = NULL;
> g_free(multifd_recv_state);
> @@ -924,6 +974,42 @@ int multifd_load_cleanup(Error **errp)
> return ret;
> }
>
> +static void multifd_recv_sync_main(void)
> +{
> + int i;
> +
> + if (!migrate_use_multifd()) {
> + return;
> + }
> + for (i = 0; i < migrate_multifd_channels(); i++) {
> + MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> + trace_multifd_recv_sync_main_signal(p->id);
> + qemu_mutex_lock(&p->mutex);
> + p->pending_job = true;
> + qemu_mutex_unlock(&p->mutex);
> + }
> + for (i = 0; i < migrate_multifd_channels(); i++) {
> + MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> + trace_multifd_recv_sync_main_wait(p->id);
> + qemu_sem_wait(&multifd_recv_state->sem_sync);
> + qemu_mutex_lock(&p->mutex);
> + if (multifd_recv_state->seq < p->seq) {
> + multifd_recv_state->seq = p->seq;
> + }
Can you explain what this is for?
Something like the latest received block?
> + qemu_mutex_unlock(&p->mutex);
> + }
> + for (i = 0; i < migrate_multifd_channels(); i++) {
> + MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> + trace_multifd_recv_sync_main_signal(p->id);
> +
> + qemu_sem_post(&p->sem_sync);
> + }
> + trace_multifd_recv_sync_main(multifd_recv_state->seq);
> +}
> +
> static void *multifd_recv_thread(void *opaque)
> {
> MultiFDRecvParams *p = opaque;
> @@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque)
> trace_multifd_recv_thread_start(p->id);
>
> while (true) {
> - qemu_sem_wait(&p->sem);
> qemu_mutex_lock(&p->mutex);
> - if (p->pending_job) {
> + if (true || p->pending_job) {
I guess the 'true ||' is a leftover TODO?
> uint32_t used;
> uint32_t flags;
> qemu_mutex_unlock(&p->mutex);
> @@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque)
> p->num_packets++;
> p->num_pages += used;
> qemu_mutex_unlock(&p->mutex);
> +
> + if (flags & MULTIFD_FLAG_SYNC) {
> + qemu_sem_post(&multifd_recv_state->sem_sync);
> + qemu_sem_wait(&p->sem_sync);
> + }
Can you explain the receive side logic - I think this is waiting for all
receive threads to 'ack' - but how do we know that they've finished
receiving all data that was sent?
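
One possible reading, sketched here as an assumption rather than as what
the patch actually guarantees: if the sender emits the MULTIFD_FLAG_SYNC
packet on every channel after the data packets, and each channel is an
ordered stream, then a receive thread that sees the sync packet has
already read everything sent before it on that channel. The sketch below
models that with POSIX threads and an in-memory array standing in for the
socket; Packet, RecvChannel, recv_sync_main and run_recv_demo are
illustrative names only.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>

#define N_CHANNELS 3
#define N_PACKETS 5
#define FLAG_SYNC (1u << 0)

typedef struct {
    uint32_t flags;
    uint32_t used;      /* pages carried by this packet */
} Packet;

typedef struct {
    pthread_t thread;
    sem_t sem_sync;             /* main -> channel: carry on */
    Packet stream[N_PACKETS];   /* stands in for the ordered socket stream */
    uint32_t num_pages;
} RecvChannel;

static RecvChannel rx[N_CHANNELS];
static sem_t sem_sync;          /* channel -> main: reached the sync packet */

static void *recv_thread(void *opaque)
{
    RecvChannel *c = opaque;

    for (int i = 0; i < N_PACKETS; i++) {
        Packet *pkt = &c->stream[i];    /* "read" the next packet in order */
        c->num_pages += pkt->used;
        if (pkt->flags & FLAG_SYNC) {
            sem_post(&sem_sync);        /* tell main we reached the marker */
            sem_wait(&c->sem_sync);     /* park until main releases us */
        }
    }
    return NULL;
}

/* Returns the pages received on all channels at the barrier. */
static uint32_t recv_sync_main(void)
{
    uint32_t total = 0;

    for (int i = 0; i < N_CHANNELS; i++) {
        sem_wait(&sem_sync);
    }
    for (int i = 0; i < N_CHANNELS; i++) {
        total += rx[i].num_pages;       /* channel i is still parked here */
        sem_post(&rx[i].sem_sync);
    }
    return total;
}

static uint32_t run_recv_demo(void)
{
    uint32_t total;

    sem_init(&sem_sync, 0, 0);
    for (int i = 0; i < N_CHANNELS; i++) {
        RecvChannel *c = &rx[i];
        *c = (RecvChannel){ 0 };
        for (int j = 0; j < N_PACKETS - 1; j++) {
            c->stream[j].used = i + 1;              /* data packets */
        }
        c->stream[N_PACKETS - 1].flags = FLAG_SYNC; /* sync marker goes last */
        sem_init(&c->sem_sync, 0, 0);
        pthread_create(&c->thread, NULL, recv_thread, c);
    }
    total = recv_sync_main();
    for (int i = 0; i < N_CHANNELS; i++) {
        pthread_join(rx[i].thread, NULL);
        sem_destroy(&rx[i].sem_sync);
    }
    sem_destroy(&sem_sync);
    return total;
}
```

With three channels carrying four data packets of 1, 2 and 3 pages each,
the main thread sees all 24 pages at the barrier. If the real sender does
not order the sync packet after the data on each channel, this argument
does not hold, which is why the question matters.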
Dave
> } else if (p->quit) {
> qemu_mutex_unlock(&p->mutex);
> break;
> + } else {
> + qemu_mutex_unlock(&p->mutex);
> + /* sometimes there are spurious wakeups */
> }
> - qemu_mutex_unlock(&p->mutex);
> - /* this is impossible */
> - error_setg(&local_err, "multifd_recv_thread: Unknown command");
> - break;
> }
>
> if (local_err) {
> @@ -991,12 +1080,14 @@ int multifd_load_setup(void)
> multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
> multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> atomic_set(&multifd_recv_state->count, 0);
> + qemu_sem_init(&multifd_recv_state->sem_sync, 0);
>
> for (i = 0; i < thread_count; i++) {
> MultiFDRecvParams *p = &multifd_recv_state->params[i];
>
> qemu_mutex_init(&p->mutex);
> qemu_sem_init(&p->sem, 0);
> + qemu_sem_init(&p->sem_sync, 0);
> p->quit = false;
> p->pending_job = false;
> p->id = i;
> @@ -2695,6 +2786,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
> ram_control_before_iterate(f, RAM_CONTROL_SETUP);
> ram_control_after_iterate(f, RAM_CONTROL_SETUP);
>
> + multifd_send_sync_main();
> qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
>
> return 0;
> @@ -2770,6 +2862,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
> */
> ram_control_after_iterate(f, RAM_CONTROL_ROUND);
>
> + multifd_send_sync_main();
> out:
> qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
> ram_counters.transferred += 8;
> @@ -2823,6 +2916,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
>
> rcu_read_unlock();
>
> + multifd_send_sync_main();
> qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
>
> return 0;
> @@ -3253,6 +3347,7 @@ static int ram_load_postcopy(QEMUFile *f)
> break;
> case RAM_SAVE_FLAG_EOS:
> /* normal exit */
> + multifd_recv_sync_main();
> break;
> default:
> error_report("Unknown combination of migration flags: %#x"
> @@ -3438,6 +3533,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
> break;
> case RAM_SAVE_FLAG_EOS:
> /* normal exit */
> + multifd_recv_sync_main();
> break;
> default:
> if (flags & RAM_SAVE_FLAG_HOOK) {
> diff --git a/migration/trace-events b/migration/trace-events
> index 9eee048287..b0ab8e2d03 100644
> --- a/migration/trace-events
> +++ b/migration/trace-events
> @@ -83,6 +83,12 @@ multifd_recv_thread_start(uint8_t id) "%d"
> multifd_recv_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
> multifd_send(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
> multifd_recv(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
> +multifd_send_sync_main(uint32_t seq) "seq %d"
> +multifd_send_sync_main_signal(uint8_t id) "channel %d"
> +multifd_send_sync_main_wait(uint8_t id) "channel %d"
> +multifd_recv_sync_main(uint32_t seq) "seq %d"
> +multifd_recv_sync_main_signal(uint8_t id) "channel %d"
> +multifd_recv_sync_main_wait(uint8_t id) "channel %d"
>
> # migration/migration.c
> await_return_path_close_on_source_close(void) ""
> --
> 2.17.0
>
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK