From: Juan Quintela <quintela@redhat.com>
To: qemu-devel@nongnu.org
Cc: dgilbert@redhat.com, lvivier@redhat.com, peterx@redhat.com
Subject: [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread
Date: Wed, 25 Apr 2018 13:27:18 +0200
Message-ID: <20180425112723.1111-17-quintela@redhat.com>
In-Reply-To: <20180425112723.1111-1-quintela@redhat.com>

We synchronize all threads on each RAM_SAVE_FLAG_EOS.  Bitmap
synchronizations don't happen inside a ram section, so there is no
danger of two channels trying to overwrite the same memory.
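
The synchronization is a scatter/gather barrier: the main thread marks
every send channel with MULTIFD_FLAG_SYNC and wakes it, then collects
exactly one acknowledgement per channel on a shared semaphore.  The
receiving side mirrors this, except the request travels in the packet
flags: a recv thread that sees MULTIFD_FLAG_SYNC acks on the shared
semaphore and then blocks on its own sem_sync until the main thread
releases it.  As a minimal standalone sketch of the send-side pattern
-- plain POSIX semaphores standing in for QemuSemaphore, and all names
illustrative rather than QEMU API -- compile with -pthread:

    #include <pthread.h>
    #include <semaphore.h>
    #include <stdio.h>

    #define CHANNELS  4
    #define FLAG_SYNC (1 << 0)

    typedef struct {
        pthread_mutex_t mutex;
        sem_t sem;                  /* main -> channel: job pending */
        int flags;
    } Channel;

    static Channel channels[CHANNELS];
    static sem_t sem_sync;          /* channel -> main: sync reached */

    static void *channel_thread(void *opaque)
    {
        Channel *p = opaque;
        int flags;

        for (;;) {
            sem_wait(&p->sem);          /* wait for a job */
            pthread_mutex_lock(&p->mutex);
            flags = p->flags;
            p->flags = 0;
            pthread_mutex_unlock(&p->mutex);
            /* a real channel would send its pending packet here */
            if (flags & FLAG_SYNC) {
                sem_post(&sem_sync);    /* ack the sync request */
                return NULL;            /* keep the sketch finite */
            }
        }
    }

    int main(void)
    {
        pthread_t tid[CHANNELS];
        int i;

        sem_init(&sem_sync, 0, 0);
        for (i = 0; i < CHANNELS; i++) {
            pthread_mutex_init(&channels[i].mutex, NULL);
            sem_init(&channels[i].sem, 0, 0);
            pthread_create(&tid[i], NULL, channel_thread, &channels[i]);
        }
        /* sync_main: mark every channel... */
        for (i = 0; i < CHANNELS; i++) {
            pthread_mutex_lock(&channels[i].mutex);
            channels[i].flags |= FLAG_SYNC;
            pthread_mutex_unlock(&channels[i].mutex);
            sem_post(&channels[i].sem);
        }
        /* ...then collect exactly one ack per channel */
        for (i = 0; i < CHANNELS; i++) {
            sem_wait(&sem_sync);
        }
        printf("all channels synced\n");
        for (i = 0; i < CHANNELS; i++) {
            pthread_join(tid[i], NULL);
        }
        return 0;
    }

Because the main thread waits for one ack per channel, no channel can
run ahead into the next iteration while another is still writing pages
from the previous one.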

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c        | 118 +++++++++++++++++++++++++++++++++++++----
 migration/trace-events |   6 +++
 2 files changed, 113 insertions(+), 11 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index c4c185cc4c..398cb0af3b 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -405,6 +405,8 @@ static void compress_threads_save_setup(void)
 #define MULTIFD_MAGIC 0x11223344U
 #define MULTIFD_VERSION 1
 
+#define MULTIFD_FLAG_SYNC (1 << 0)
+
 typedef struct {
     uint32_t magic;
     uint32_t version;
@@ -471,6 +473,8 @@ typedef struct {
     uint32_t num_packets;
     /* pages sent through this channel */
     uint32_t num_pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
 } MultiFDSendParams;
 
 typedef struct {
@@ -507,6 +511,8 @@ typedef struct {
     uint32_t num_packets;
     /* pages sent through this channel */
     uint32_t num_pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
 } MultiFDRecvParams;
 
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
@@ -682,6 +688,10 @@ struct {
     int count;
     /* array of pages to be sent */
     MultiFDPages_t *pages;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+    /* global number of generated multifd packets */
+    uint32_t seq;
 } *multifd_send_state;
 
 static void multifd_send_terminate_threads(Error *err)
@@ -727,6 +737,7 @@ int multifd_save_cleanup(Error **errp)
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_sem_destroy(&p->sem_sync);
         g_free(p->name);
         p->name = NULL;
         multifd_pages_clear(p->pages);
@@ -735,6 +746,7 @@ int multifd_save_cleanup(Error **errp)
         g_free(p->packet);
         p->packet = NULL;
     }
+    qemu_sem_destroy(&multifd_send_state->sem_sync);
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
     multifd_pages_clear(multifd_send_state->pages);
@@ -744,6 +756,33 @@ int multifd_save_cleanup(Error **errp)
     return ret;
 }
 
+static void multifd_send_sync_main(void)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        trace_multifd_send_sync_main_signal(p->id);
+
+        qemu_mutex_lock(&p->mutex);
+        p->flags |= MULTIFD_FLAG_SYNC;
+        p->pending_job++;
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_post(&p->sem);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        trace_multifd_send_sync_main_wait(p->id);
+        qemu_sem_wait(&multifd_send_state->sem_sync);
+    }
+    trace_multifd_send_sync_main(multifd_send_state->seq);
+}
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
@@ -778,17 +817,20 @@ static void *multifd_send_thread(void *opaque)
             /* ToDo: send packet here */
 
             qemu_mutex_lock(&p->mutex);
+            p->flags = 0;
             p->pending_job--;
             qemu_mutex_unlock(&p->mutex);
-            continue;
+
+            if (flags & MULTIFD_FLAG_SYNC) {
+                qemu_sem_post(&multifd_send_state->sem_sync);
+            }
         } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
+        } else {
+            qemu_mutex_unlock(&p->mutex);
+            /* sometimes there are spurious wakeups */
         }
-        qemu_mutex_unlock(&p->mutex);
-        /* this is impossible */
-        error_setg(&local_err, "multifd_send_thread: Unknown command");
-        break;
     }
 
 out:
@@ -840,12 +882,14 @@ int multifd_save_setup(void)
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     atomic_set(&multifd_send_state->count, 0);
     multifd_pages_init(&multifd_send_state->pages, page_count);
+    qemu_sem_init(&multifd_send_state->sem_sync, 0);
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
+        qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
         p->pending_job = 0;
         p->id = i;
@@ -863,6 +907,10 @@ struct {
     MultiFDRecvParams *params;
     /* number of created threads */
     int count;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+    /* global number of generated multifd packets */
+    uint32_t seq;
 } *multifd_recv_state;
 
 static void multifd_recv_terminate_threads(Error *err)
@@ -908,6 +956,7 @@ int multifd_load_cleanup(Error **errp)
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
+        qemu_sem_destroy(&p->sem_sync);
         g_free(p->name);
         p->name = NULL;
         multifd_pages_clear(p->pages);
@@ -916,6 +965,7 @@ int multifd_load_cleanup(Error **errp)
         g_free(p->packet);
         p->packet = NULL;
     }
+    qemu_sem_destroy(&multifd_recv_state->sem_sync);
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
     g_free(multifd_recv_state);
@@ -924,6 +974,42 @@ int multifd_load_cleanup(Error **errp)
     return ret;
 }
 
+static void multifd_recv_sync_main(void)
+{
+    int i;
+
+    if (!migrate_use_multifd()) {
+        return;
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_signal(p->id);
+        qemu_mutex_lock(&p->mutex);
+        p->pending_job = true;
+        qemu_mutex_unlock(&p->mutex);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_wait(p->id);
+        qemu_sem_wait(&multifd_recv_state->sem_sync);
+        qemu_mutex_lock(&p->mutex);
+        if (multifd_recv_state->seq < p->seq) {
+            multifd_recv_state->seq = p->seq;
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_signal(p->id);
+
+        qemu_sem_post(&p->sem_sync);
+    }
+    trace_multifd_recv_sync_main(multifd_recv_state->seq);
+}
+
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
@@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque)
     trace_multifd_recv_thread_start(p->id);
 
     while (true) {
-        qemu_sem_wait(&p->sem);
         qemu_mutex_lock(&p->mutex);
-        if (p->pending_job) {
+        if (true || p->pending_job) {
             uint32_t used;
             uint32_t flags;
             qemu_mutex_unlock(&p->mutex);
@@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque)
             p->num_packets++;
             p->num_pages += used;
             qemu_mutex_unlock(&p->mutex);
+
+            if (flags & MULTIFD_FLAG_SYNC) {
+                qemu_sem_post(&multifd_recv_state->sem_sync);
+                qemu_sem_wait(&p->sem_sync);
+            }
         } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
+        } else {
+            qemu_mutex_unlock(&p->mutex);
+            /* sometimes there are spurious wakeups */
         }
-        qemu_mutex_unlock(&p->mutex);
-        /* this is impossible */
-        error_setg(&local_err, "multifd_recv_thread: Unknown command");
-        break;
     }
 
     if (local_err) {
@@ -991,12 +1080,14 @@ int multifd_load_setup(void)
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     atomic_set(&multifd_recv_state->count, 0);
+    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
 
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
+        qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
         p->pending_job = false;
         p->id = i;
@@ -2695,6 +2786,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
 
+    multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
 
     return 0;
@@ -2770,6 +2862,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
      */
     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
 
+    multifd_send_sync_main();
 out:
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
     ram_counters.transferred += 8;
@@ -2823,6 +2916,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
 
     rcu_read_unlock();
 
+    multifd_send_sync_main();
     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
 
     return 0;
@@ -3253,6 +3347,7 @@ static int ram_load_postcopy(QEMUFile *f)
             break;
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
+            multifd_recv_sync_main();
             break;
         default:
             error_report("Unknown combination of migration flags: %#x"
@@ -3438,6 +3533,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
             break;
         case RAM_SAVE_FLAG_EOS:
             /* normal exit */
+            multifd_recv_sync_main();
             break;
         default:
             if (flags & RAM_SAVE_FLAG_HOOK) {
diff --git a/migration/trace-events b/migration/trace-events
index 9eee048287..b0ab8e2d03 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -83,6 +83,12 @@ multifd_recv_thread_start(uint8_t id) "%d"
 multifd_recv_thread_end(uint8_t id, uint32_t packets, uint32_t pages) "channel %d packets %d pages %d"
 multifd_send(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
 multifd_recv(uint8_t id, uint32_t seq, uint32_t used, uint32_t flags) "channel %d seq number %d pages %d flags 0x%x"
+multifd_send_sync_main(uint32_t seq) "seq %d"
+multifd_send_sync_main_signal(uint8_t id) "channel %d"
+multifd_send_sync_main_wait(uint8_t id) "channel %d"
+multifd_recv_sync_main(uint32_t seq) "seq %d"
+multifd_recv_sync_main_signal(uint8_t id) "channel %d"
+multifd_recv_sync_main_wait(uint8_t id) "channel %d"
 
 # migration/migration.c
 await_return_path_close_on_source_close(void) ""
-- 
2.17.0

Thread overview: 60+ messages
2018-04-25 11:27 [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 01/21] migration: Set error state in case of error Juan Quintela
2018-05-02 15:53   ` Dr. David Alan Gilbert
2018-05-09  8:15     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 02/21] migration: Introduce multifd_recv_new_channel() Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 03/21] migration: terminate_* can be called for other threads Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 04/21] migration: Be sure all recv channels are created Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 05/21] migration: Export functions to create send channels Juan Quintela
2018-04-26  7:28   ` Peter Xu
2018-05-09  8:05     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 06/21] migration: Create multifd channels Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 07/21] migration: Delay start of migration main routines Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 08/21] migration: Transmit initial package through the multifd channels Juan Quintela
2018-05-02 17:19   ` Dr. David Alan Gilbert
2018-05-09  8:34     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 09/21] migration: Define MultifdRecvParams sooner Juan Quintela
2018-05-02 17:32   ` Dr. David Alan Gilbert
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 10/21] migration: Create multipage support Juan Quintela
2018-04-26  7:15   ` Peter Xu
2018-05-09 10:52     ` Juan Quintela
2018-05-02 17:52   ` Dr. David Alan Gilbert
2018-05-09 10:53     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 11/21] migration: Create multifd packet Juan Quintela
2018-05-02 18:04   ` Dr. David Alan Gilbert
2018-05-09 11:09     ` Juan Quintela
2018-05-09 11:12       ` Dr. David Alan Gilbert
2018-05-09 19:46         ` Juan Quintela
2018-05-11 16:36           ` Dr. David Alan Gilbert
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 12/21] migration: Add multifd traces for start/end thread Juan Quintela
2018-05-02 18:35   ` Dr. David Alan Gilbert
2018-05-09 11:11     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 13/21] migration: Calculate transferred ram correctly Juan Quintela
2018-05-02 18:59   ` Dr. David Alan Gilbert
2018-05-09 11:14     ` Juan Quintela
2018-05-09 19:46     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 14/21] migration: Multifd channels always wait on the sem Juan Quintela
2018-05-03  9:36   ` Dr. David Alan Gilbert
2018-05-23 10:59     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 15/21] migration: Add block where to send/receive packets Juan Quintela
2018-05-03 10:03   ` Dr. David Alan Gilbert
2018-04-25 11:27 ` Juan Quintela [this message]
2018-05-03 10:44   ` [Qemu-devel] [PATCH v12 16/21] migration: Synchronize multifd threads with main thread Dr. David Alan Gilbert
2018-05-09 19:45     ` Juan Quintela
2018-05-11 16:32       ` Dr. David Alan Gilbert
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 17/21] migration: Create ram_multifd_page Juan Quintela
2018-04-26  7:43   ` Peter Xu
2018-04-26  8:18   ` Peter Xu
2018-05-03 11:30     ` Dr. David Alan Gilbert
2018-05-23 11:13     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 18/21] migration: Start sending messages Juan Quintela
2018-05-03 14:55   ` Dr. David Alan Gilbert
2018-05-23 10:51     ` Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 19/21] migration: Wait for blocking IO Juan Quintela
2018-05-03 15:04   ` Dr. David Alan Gilbert
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 20/21] migration: Remove not needed semaphore and quit Juan Quintela
2018-04-25 11:27 ` [Qemu-devel] [PATCH v12 21/21] migration: Stop sending whole pages through main channel Juan Quintela
2018-05-03 15:24   ` Dr. David Alan Gilbert
2018-04-25 11:44 ` [Qemu-devel] [PATCH v12 00/21] Multifd Juan Quintela
2018-05-03 15:32   ` Dr. David Alan Gilbert
2018-04-26  8:28 ` Peter Xu
