From: Juan Quintela <quintela@redhat.com>
To: qemu-devel@nongnu.org
Cc: dgilbert@redhat.com, lvivier@redhat.com, peterx@redhat.com
Subject: [Qemu-devel] [PATCH v13 07/12] migration: Synchronize multifd threads with main thread
Date: Wed, 23 May 2018 13:18:12 +0200
Message-ID: <20180523111817.1463-8-quintela@redhat.com>
In-Reply-To: <20180523111817.1463-1-quintela@redhat.com>

We synchronize all threads on each RAM_SAVE_FLAG_EOS. Bitmap
synchronizations don't happen inside a ram section, so we are safe
against two channels trying to overwrite the same memory.

On the send side, the main thread flags a MULTIFD_FLAG_SYNC job on
every channel and then waits on a shared semaphore until each channel
has sent a packet carrying that flag; the receive side mirrors the
handshake, so all in-flight multifd pages are flushed before the EOS
is processed.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
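
Notes (not part of the commit message): the send-side handshake can be
modeled outside QEMU as a self-contained pthreads program. This is a
minimal sketch under that assumption; channel_t, NUM_CHANNELS and the
printf "packet" are illustrative stand-ins, not QEMU APIs:

/*
 * Sketch of the send-side sync handshake with POSIX threads and
 * semaphores.  Build with: cc -pthread sync_sketch.c
 */
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdio.h>

#define NUM_CHANNELS 4
#define FLAG_SYNC    (1 << 0)

typedef struct {
    int id;
    unsigned flags;
    bool quit;
    pthread_mutex_t mutex;
    sem_t sem;                      /* main -> channel: job pending */
} channel_t;

static channel_t channels[NUM_CHANNELS];
static sem_t sem_sync;              /* channel -> main: SYNC packet sent */

static void *channel_thread(void *opaque)
{
    channel_t *c = opaque;

    for (;;) {
        sem_wait(&c->sem);          /* sleep until main posts a job */
        pthread_mutex_lock(&c->mutex);
        unsigned flags = c->flags;
        bool quit = c->quit;
        c->flags = 0;
        pthread_mutex_unlock(&c->mutex);
        if (quit) {
            break;
        }
        printf("channel %d: packet sent, flags 0x%x\n", c->id, flags);
        if (flags & FLAG_SYNC) {
            sem_post(&sem_sync);    /* ack this channel's sync request */
        }
    }
    return NULL;
}

static void send_sync_main(void)
{
    int i;

    /* Phase 1: mark a SYNC job on every channel and wake it. */
    for (i = 0; i < NUM_CHANNELS; i++) {
        pthread_mutex_lock(&channels[i].mutex);
        channels[i].flags |= FLAG_SYNC;
        pthread_mutex_unlock(&channels[i].mutex);
        sem_post(&channels[i].sem);
    }
    /* Phase 2: collect exactly one ack per channel; sem_sync is
     * shared, so completion order doesn't matter. */
    for (i = 0; i < NUM_CHANNELS; i++) {
        sem_wait(&sem_sync);
    }
}

int main(void)
{
    pthread_t tid[NUM_CHANNELS];
    int i;

    sem_init(&sem_sync, 0, 0);
    for (i = 0; i < NUM_CHANNELS; i++) {
        channels[i].id = i;
        pthread_mutex_init(&channels[i].mutex, NULL);
        sem_init(&channels[i].sem, 0, 0);
        pthread_create(&tid[i], NULL, channel_thread, &channels[i]);
    }

    send_sync_main();               /* returns once every channel has
                                     * emitted its SYNC packet */
    printf("main: all channels synchronized\n");

    for (i = 0; i < NUM_CHANNELS; i++) {
        pthread_mutex_lock(&channels[i].mutex);
        channels[i].quit = true;
        pthread_mutex_unlock(&channels[i].mutex);
        sem_post(&channels[i].sem);
        pthread_join(tid[i], NULL);
    }
    return 0;
}

The two-phase shape matches multifd_send_sync_main() below: marking the
SYNC job under the channel mutex pairs with the channel reading it under
the same mutex, and the shared semaphore lets the main thread collect
the acks in whatever order the channels finish.
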
migration/ram.c | 117 +++++++++++++++++++++++++++++++++++++----
migration/trace-events | 6 +++
2 files changed, 112 insertions(+), 11 deletions(-)
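
The receive side is a three-step variant: each channel that sees
MULTIFD_FLAG_SYNC in an incoming packet posts the shared sem_sync and
then parks on its own per-channel sem_sync until the main thread has
folded every channel's seq into the global maximum and releases them
all together. A companion sketch, with the same caveat that the names
and the simulated packets are stand-ins rather than QEMU code:

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

#define NUM_CHANNELS 4
#define FLAG_SYNC    (1 << 0)

typedef struct {
    int id;
    unsigned seq;               /* seq of last packet on this channel */
    pthread_mutex_t mutex;
    sem_t sem_sync;             /* main -> channel: resume after sync */
} channel_t;

static channel_t channels[NUM_CHANNELS];
static sem_t sem_sync;          /* channel -> main: reached the sync */
static unsigned global_seq;

static void *recv_thread(void *opaque)
{
    channel_t *c = opaque;
    unsigned pkt;

    /* Simulate two data packets followed by one SYNC packet. */
    for (pkt = 1; pkt <= 3; pkt++) {
        unsigned flags = (pkt == 3) ? FLAG_SYNC : 0;

        pthread_mutex_lock(&c->mutex);
        c->seq = c->id * 10 + pkt;  /* fake per-channel seq number */
        pthread_mutex_unlock(&c->mutex);

        if (flags & FLAG_SYNC) {
            sem_post(&sem_sync);    /* tell main we reached the sync */
            sem_wait(&c->sem_sync); /* park until main releases us */
        }
    }
    return NULL;
}

static void recv_sync_main(void)
{
    int i;

    /* Step 1: wait until every channel has reached its SYNC packet. */
    for (i = 0; i < NUM_CHANNELS; i++) {
        sem_wait(&sem_sync);
    }
    /* Step 2: fold each channel's seq into the global maximum. */
    for (i = 0; i < NUM_CHANNELS; i++) {
        pthread_mutex_lock(&channels[i].mutex);
        if (global_seq < channels[i].seq) {
            global_seq = channels[i].seq;
        }
        pthread_mutex_unlock(&channels[i].mutex);
    }
    /* Step 3: release every channel at once. */
    for (i = 0; i < NUM_CHANNELS; i++) {
        sem_post(&channels[i].sem_sync);
    }
    printf("main: synced, highest seq %u\n", global_seq);
}

int main(void)
{
    pthread_t tid[NUM_CHANNELS];
    int i;

    sem_init(&sem_sync, 0, 0);
    for (i = 0; i < NUM_CHANNELS; i++) {
        channels[i].id = i;
        pthread_mutex_init(&channels[i].mutex, NULL);
        sem_init(&channels[i].sem_sync, 0, 0);
        pthread_create(&tid[i], NULL, recv_thread, &channels[i]);
    }

    recv_sync_main();
    for (i = 0; i < NUM_CHANNELS; i++) {
        pthread_join(tid[i], NULL);
    }
    return 0;
}

Parking the channels while the main thread reads their seq fields is
what makes the max-fold in multifd_recv_sync_main() race-free.
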
diff --git a/migration/ram.c b/migration/ram.c
index c9a9bd79f3..3e99d48123 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -503,6 +503,8 @@ exit:
#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1
+#define MULTIFD_FLAG_SYNC (1 << 0)
+
typedef struct {
uint32_t magic;
uint32_t version;
@@ -570,6 +572,8 @@ typedef struct {
uint32_t num_packets;
/* pages sent through this channel */
uint32_t num_pages;
+ /* syncs main thread and channels */
+ QemuSemaphore sem_sync;
} MultiFDSendParams;
typedef struct {
@@ -607,6 +611,8 @@ typedef struct {
uint32_t num_packets;
/* pages sent through this channel */
uint32_t num_pages;
+ /* syncs main thread and channels */
+ QemuSemaphore sem_sync;
} MultiFDRecvParams;
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -794,6 +800,10 @@ struct {
int count;
/* array of pages to sent */
MultiFDPages_t *pages;
+ /* syncs main thread and channels */
+ QemuSemaphore sem_sync;
+ /* global number of generated multifd packets */
+ uint32_t seq;
} *multifd_send_state;
static void multifd_send_terminate_threads(Error *err)
@@ -841,6 +851,7 @@ int multifd_save_cleanup(Error **errp)
p->c = NULL;
qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem);
+ qemu_sem_destroy(&p->sem_sync);
g_free(p->name);
p->name = NULL;
multifd_pages_clear(p->pages);
@@ -849,6 +860,7 @@ int multifd_save_cleanup(Error **errp)
g_free(p->packet);
p->packet = NULL;
}
+ qemu_sem_destroy(&multifd_send_state->sem_sync);
g_free(multifd_send_state->params);
multifd_send_state->params = NULL;
multifd_pages_clear(multifd_send_state->pages);
@@ -858,6 +870,33 @@ int multifd_save_cleanup(Error **errp)
return ret;
}
+static void multifd_send_sync_main(void)
+{
+ int i;
+
+ if (!migrate_use_multifd()) {
+ return;
+ }
+ for (i = 0; i < migrate_multifd_channels(); i++) {
+ MultiFDSendParams *p = &multifd_send_state->params[i];
+
+ trace_multifd_send_sync_main_signal(p->id);
+
+ qemu_mutex_lock(&p->mutex);
+ p->flags |= MULTIFD_FLAG_SYNC;
+ p->pending_job++;
+ qemu_mutex_unlock(&p->mutex);
+ qemu_sem_post(&p->sem);
+ }
+ for (i = 0; i < migrate_multifd_channels(); i++) {
+ MultiFDSendParams *p = &multifd_send_state->params[i];
+
+ trace_multifd_send_sync_main_wait(p->id);
+ qemu_sem_wait(&multifd_send_state->sem_sync);
+ }
+ trace_multifd_send_sync_main(multifd_send_state->seq);
+}
+
static void *multifd_send_thread(void *opaque)
{
MultiFDSendParams *p = opaque;
@@ -894,15 +933,17 @@ static void *multifd_send_thread(void *opaque)
qemu_mutex_lock(&p->mutex);
p->pending_job--;
qemu_mutex_unlock(&p->mutex);
- continue;
+
+ if (flags & MULTIFD_FLAG_SYNC) {
+ qemu_sem_post(&multifd_send_state->sem_sync);
+ }
} else if (p->quit) {
qemu_mutex_unlock(&p->mutex);
break;
+ } else {
+ qemu_mutex_unlock(&p->mutex);
+ /* sometimes there are spurious wakeups */
}
- qemu_mutex_unlock(&p->mutex);
- /* this is impossible */
- error_setg(&local_err, "multifd_send_thread: Unknown command");
- break;
}
out:
@@ -954,12 +995,14 @@ int multifd_save_setup(void)
multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
atomic_set(&multifd_send_state->count, 0);
multifd_send_state->pages = multifd_pages_init(page_count);
+ qemu_sem_init(&multifd_send_state->sem_sync, 0);
for (i = 0; i < thread_count; i++) {
MultiFDSendParams *p = &multifd_send_state->params[i];
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
+ qemu_sem_init(&p->sem_sync, 0);
p->quit = false;
p->pending_job = 0;
p->id = i;
@@ -977,6 +1020,10 @@ struct {
MultiFDRecvParams *params;
/* number of created threads */
int count;
+ /* syncs main thread and channels */
+ QemuSemaphore sem_sync;
+ /* global number of generated multifd packets */
+ uint32_t seq;
} *multifd_recv_state;
static void multifd_recv_terminate_threads(Error *err)
@@ -1022,6 +1069,7 @@ int multifd_load_cleanup(Error **errp)
p->c = NULL;
qemu_mutex_destroy(&p->mutex);
qemu_sem_destroy(&p->sem);
+ qemu_sem_destroy(&p->sem_sync);
g_free(p->name);
p->name = NULL;
multifd_pages_clear(p->pages);
@@ -1030,6 +1078,7 @@ int multifd_load_cleanup(Error **errp)
g_free(p->packet);
p->packet = NULL;
}
+ qemu_sem_destroy(&multifd_recv_state->sem_sync);
g_free(multifd_recv_state->params);
multifd_recv_state->params = NULL;
g_free(multifd_recv_state);
@@ -1038,6 +1087,42 @@ int multifd_load_cleanup(Error **errp)
return ret;
}
+static void multifd_recv_sync_main(void)
+{
+ int i;
+
+ if (!migrate_use_multifd()) {
+ return;
+ }
+ for (i = 0; i < migrate_multifd_channels(); i++) {
+ MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+ trace_multifd_recv_sync_main_signal(p->id);
+ qemu_mutex_lock(&p->mutex);
+ p->pending_job = true;
+ qemu_mutex_unlock(&p->mutex);
+ }
+ for (i = 0; i < migrate_multifd_channels(); i++) {
+ MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+ trace_multifd_recv_sync_main_wait(p->id);
+ qemu_sem_wait(&multifd_recv_state->sem_sync);
+ qemu_mutex_lock(&p->mutex);
+ if (multifd_recv_state->seq < p->seq) {
+ multifd_recv_state->seq = p->seq;
+ }
+ qemu_mutex_unlock(&p->mutex);
+ }
+ for (i = 0; i < migrate_multifd_channels(); i++) {
+ MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+ trace_multifd_recv_sync_main_signal(p->id);
+
+ qemu_sem_post(&p->sem_sync);
+ }
+ trace_multifd_recv_sync_main(multifd_recv_state->seq);
+}
+
static void *multifd_recv_thread(void *opaque)
{
MultiFDRecvParams *p = opaque;
@@ -1047,9 +1132,8 @@ static void *multifd_recv_thread(void *opaque)
trace_multifd_recv_thread_start(p->id);
while (true) {
- qemu_sem_wait(&p->sem);
qemu_mutex_lock(&p->mutex);
- if (p->pending_job) {
+ if (true || p->pending_job) {
uint32_t used;
uint32_t flags;
qemu_mutex_unlock(&p->mutex);
@@ -1070,14 +1154,18 @@ static void *multifd_recv_thread(void *opaque)
p->num_packets++;
p->num_pages += used;
qemu_mutex_unlock(&p->mutex);
+
+ if (flags & MULTIFD_FLAG_SYNC) {
+ qemu_sem_post(&multifd_recv_state->sem_sync);
+ qemu_sem_wait(&p->sem_sync);
+ }
} else if (p->quit) {
qemu_mutex_unlock(&p->mutex);
break;
+ } else {
+ qemu_mutex_unlock(&p->mutex);
+ /* sometimes there are spurious wakeups */
}
- qemu_mutex_unlock(&p->mutex);
- /* this is impossible */
- error_setg(&local_err, "multifd_recv_thread: Unknown command");
- break;
}
if (local_err) {
@@ -1105,12 +1193,14 @@ int multifd_load_setup(void)
multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
atomic_set(&multifd_recv_state->count, 0);
+ qemu_sem_init(&multifd_recv_state->sem_sync, 0);
for (i = 0; i < thread_count; i++) {
MultiFDRecvParams *p = &multifd_recv_state->params[i];
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
+ qemu_sem_init(&p->sem_sync, 0);
p->quit = false;
p->pending_job = false;
p->id = i;
@@ -2847,6 +2937,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
ram_control_before_iterate(f, RAM_CONTROL_SETUP);
ram_control_after_iterate(f, RAM_CONTROL_SETUP);
+ multifd_send_sync_main();
qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
return 0;
@@ -2922,6 +3013,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
*/
ram_control_after_iterate(f, RAM_CONTROL_ROUND);
+ multifd_send_sync_main();
out:
qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
ram_counters.transferred += 8;
@@ -2975,6 +3067,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
rcu_read_unlock();
+ multifd_send_sync_main();
qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
return 0;
@@ -3459,6 +3552,7 @@ static int ram_load_postcopy(QEMUFile *f)
break;
case RAM_SAVE_FLAG_EOS:
/* normal exit */
+ multifd_recv_sync_main();
break;
default:
error_report("Unknown combination of migration flags: %#x"
@@ -3644,6 +3738,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
break;
case RAM_SAVE_FLAG_EOS:
/* normal exit */
+ multifd_recv_sync_main();
break;
default:
if (flags & RAM_SAVE_FLAG_HOOK) {
diff --git a/migration/trace-events b/migration/trace-events
index 36e20b312d..b821041281 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -77,9 +77,15 @@ migration_bitmap_sync_start(void) ""
migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64
migration_throttle(void) ""
multifd_recv(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
+multifd_recv_sync_main(uint32_t seq) "seq %d"
+multifd_recv_sync_main_signal(uint8_t id) "channel %d"
+multifd_recv_sync_main_wait(uint8_t id) "channel %d"
multifd_recv_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
multifd_recv_thread_start(uint8_t id) "%d"
multifd_send(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
+multifd_send_sync_main(uint32_t seq) "seq %d"
+multifd_send_sync_main_signal(uint8_t id) "channel %d"
+multifd_send_sync_main_wait(uint8_t id) "channel %d"
multifd_send_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
multifd_send_thread_start(uint8_t id) "%d"
ram_discard_range(const char *rbname, uint64_t start, size_t len) "%s: start: %" PRIx64 " %zx"
--
2.17.0