From: Fabiano Rosas <farosas@suse.de>
To: qemu-devel@nongnu.org
Cc: berrange@redhat.com, armbru@redhat.com,
	Peter Xu <peterx@redhat.com>, Claudio Fontana <cfontana@suse.de>
Subject: [PATCH v4 19/34] migration/multifd: Allow receiving pages without packets
Date: Tue, 20 Feb 2024 19:41:23 -0300
Message-ID: <20240220224138.24759-20-farosas@suse.de>
In-Reply-To: <20240220224138.24759-1-farosas@suse.de>

Currently multifd does not need to have knowledge of pages on the
receiving side because all the information needed is within the
packets that come in the stream.

We're about to add support for fixed-ram migration, which cannot use
packets because it expects the ramblock section in the migration file
to contain only the guest page data.

Add a data structure to transfer pages between the ram migration code
and the multifd receiving threads.

We don't want to reuse MultiFDPages_t for two reasons:

a) multifd threads don't really need to know about the data they're
   receiving.

b) the receiving side has to be stopped to load the pages, which means
   we can experiment with larger granularities than page size when
   transferring data.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
@Peter: a 'quit' flag cannot be used instead of pending_job. The
receiving thread needs to know there's no more data coming. If the
migration thread sets a 'quit' flag, the multifd thread would see the
flag right away and exit. The only way is to clear pending_job on the
thread and spin once more.
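
For illustration only, the handoff and the "woken with no job means no
more data" rule can be modelled single-threaded. This is a rough
sketch, not the patch's code: RecvData, Channel, dispatch() and
channel_wake() are hypothetical stand-ins for MultiFDRecvData,
MultiFDRecvParams, multifd_recv() and one iteration of the no-packets
branch of multifd_recv_thread(). The semaphores and atomics are left
out and only marked in comments.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for MultiFDRecvData. */
typedef struct {
    size_t size;            /* bytes the channel still has to load */
} RecvData;

/* Hypothetical stand-in for the relevant MultiFDRecvParams fields. */
typedef struct {
    bool pending_job;       /* set by the producer, cleared by the channel */
    RecvData *data;         /* slot currently owned by the channel */
    size_t loaded;          /* total bytes this channel has consumed */
} Channel;

/*
 * Producer side: hand a filled slot to an idle channel by swapping
 * pointers. Returns the channel's empty slot for the next fill.
 */
static RecvData *dispatch(Channel *c, RecvData *filled)
{
    RecvData *empty = c->data;

    assert(!c->pending_job && empty->size == 0);
    c->data = filled;
    c->pending_job = true;  /* the real code posts p->sem after this */
    return empty;
}

/*
 * Channel side: one loop iteration after the semaphore wait returns.
 * Returns false when the thread should leave its loop.
 */
static bool channel_wake(Channel *c)
{
    if (!c->pending_job) {
        return false;       /* woken with no job: no more data is coming */
    }
    c->loaded += c->data->size;
    c->data->size = 0;      /* slot is empty again */
    c->pending_job = false; /* channel is idle again */
    return true;
}
```

In this model a channel that is woken while pending_job is still set
finishes its slot before it can observe the end of the work, which a
plain 'quit' flag checked on wakeup would not guarantee.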
---
 migration/file.c    |   1 +
 migration/multifd.c | 122 +++++++++++++++++++++++++++++++++++++++++---
 migration/multifd.h |  15 ++++++
 3 files changed, 131 insertions(+), 7 deletions(-)

diff --git a/migration/file.c b/migration/file.c
index 5d4975f43e..22d052a71f 100644
--- a/migration/file.c
+++ b/migration/file.c
@@ -6,6 +6,7 @@
  */
 
 #include "qemu/osdep.h"
+#include "exec/ramblock.h"
 #include "qemu/cutils.h"
 #include "qapi/error.h"
 #include "channel.h"
diff --git a/migration/multifd.c b/migration/multifd.c
index 0a5279314d..45a0c7aaa8 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -81,9 +81,15 @@ struct {
 
 struct {
     MultiFDRecvParams *params;
+    MultiFDRecvData *data;
     /* number of created threads */
     int count;
-    /* syncs main thread and channels */
+    /*
+     * For sockets: this is posted once for each MULTIFD_FLAG_SYNC flag.
+     *
+     * For files: this is only posted at the end of the file load to mark
+     *            completion of the load process.
+     */
     QemuSemaphore sem_sync;
     /* global number of generated multifd packets */
     uint64_t packet_num;
@@ -1110,6 +1116,53 @@ bool multifd_send_setup(void)
     return true;
 }
 
+bool multifd_recv(void)
+{
+    int i;
+    static int next_recv_channel;
+    MultiFDRecvParams *p = NULL;
+    MultiFDRecvData *data = multifd_recv_state->data;
+
+    /*
+     * next_recv_channel can remain from a previous migration that was
+     * using more channels, so ensure it doesn't overflow if the
+     * limit is lower now.
+     */
+    next_recv_channel %= migrate_multifd_channels();
+    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
+        if (multifd_recv_should_exit()) {
+            return false;
+        }
+
+        p = &multifd_recv_state->params[i];
+
+        /*
+         * Safe to read atomically without a lock because the flag is
+         * only set by this function below. Reading an old value of
+         * true is not an issue because it would only send us looking
+         * for the next idle channel.
+         */
+        if (qatomic_read(&p->pending_job) == false) {
+            next_recv_channel = (i + 1) % migrate_multifd_channels();
+            break;
+        }
+    }
+
+    assert(!p->data->size);
+    multifd_recv_state->data = p->data;
+    p->data = data;
+
+    qatomic_set(&p->pending_job, true);
+    qemu_sem_post(&p->sem);
+
+    return true;
+}
+
+MultiFDRecvData *multifd_get_recv_data(void)
+{
+    return multifd_recv_state->data;
+}
+
 static void multifd_recv_terminate_threads(Error *err)
 {
     int i;
@@ -1134,11 +1187,26 @@ static void multifd_recv_terminate_threads(Error *err)
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         /*
-         * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
-         * however try to wakeup it without harm in cleanup phase.
+         * The migration thread and channels interact differently
+         * depending on the presence of packets.
          */
         if (multifd_use_packets()) {
+            /*
+             * The channel receives as long as there are packets. When
+             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
+             * channel waits for the migration thread to sync. If the
+             * sync never happens, do it here.
+             */
             qemu_sem_post(&p->sem_sync);
+        } else {
+            /*
+             * The channel waits for the migration thread to give it
+             * work. When the migration thread runs out of work, it
+             * releases the channel and waits for any pending work to
+             * finish. If we reach here (e.g. due to error) before the
+             * work runs out, release the channel.
+             */
+            qemu_sem_post(&p->sem);
         }
 
         /*
@@ -1167,6 +1235,7 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
     p->c = NULL;
     qemu_mutex_destroy(&p->mutex);
     qemu_sem_destroy(&p->sem_sync);
+    qemu_sem_destroy(&p->sem);
     g_free(p->name);
     p->name = NULL;
     p->packet_len = 0;
@@ -1184,6 +1253,8 @@ static void multifd_recv_cleanup_state(void)
     qemu_sem_destroy(&multifd_recv_state->sem_sync);
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
+    g_free(multifd_recv_state->data);
+    multifd_recv_state->data = NULL;
     g_free(multifd_recv_state);
     multifd_recv_state = NULL;
 }
@@ -1251,11 +1322,11 @@ static void *multifd_recv_thread(void *opaque)
         bool has_data = false;
         p->normal_num = 0;
 
-        if (multifd_recv_should_exit()) {
-            break;
-        }
-
         if (use_packets) {
+            if (multifd_recv_should_exit()) {
+                break;
+            }
+
             ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
             if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
@@ -1274,6 +1345,26 @@ static void *multifd_recv_thread(void *opaque)
             p->flags &= ~MULTIFD_FLAG_SYNC;
             has_data = !!p->normal_num;
             qemu_mutex_unlock(&p->mutex);
+        } else {
+            /*
+             * No packets, so we need to wait for the vmstate code to
+             * give us work.
+             */
+            qemu_sem_wait(&p->sem);
+
+            if (multifd_recv_should_exit()) {
+                break;
+            }
+
+            /*
+             * Migration thread did not send work, break and signal
+             * sem_sync so it knows we're not lagging behind.
+             */
+            if (!qatomic_read(&p->pending_job)) {
+                break;
+            }
+
+            has_data = !!p->data->size;
         }
 
         if (has_data) {
@@ -1288,9 +1379,17 @@ static void *multifd_recv_thread(void *opaque)
                 qemu_sem_post(&multifd_recv_state->sem_sync);
                 qemu_sem_wait(&p->sem_sync);
             }
+        } else {
+            p->total_normal_pages += p->data->size / qemu_target_page_size();
+            p->data->size = 0;
+            qatomic_set(&p->pending_job, false);
         }
     }
 
+    if (!use_packets) {
+        qemu_sem_post(&p->sem_sync);
+    }
+
     if (local_err) {
         multifd_recv_terminate_threads(local_err);
         error_free(local_err);
@@ -1320,6 +1419,10 @@ int multifd_recv_setup(Error **errp)
     thread_count = migrate_multifd_channels();
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
+
+    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
+    multifd_recv_state->data->size = 0;
+
     qatomic_set(&multifd_recv_state->count, 0);
     qatomic_set(&multifd_recv_state->exiting, 0);
     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
@@ -1330,8 +1433,13 @@ int multifd_recv_setup(Error **errp)
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem_sync, 0);
+        qemu_sem_init(&p->sem, 0);
+        p->pending_job = false;
         p->id = i;
 
+        p->data = g_new0(MultiFDRecvData, 1);
+        p->data->size = 0;
+
         if (use_packets) {
             p->packet_len = sizeof(MultiFDPacket_t)
                 + sizeof(uint64_t) * page_count;
diff --git a/migration/multifd.h b/migration/multifd.h
index 9a6a7a72df..19188815a3 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -13,6 +13,8 @@
 #ifndef QEMU_MIGRATION_MULTIFD_H
 #define QEMU_MIGRATION_MULTIFD_H
 
+typedef struct MultiFDRecvData MultiFDRecvData;
+
 bool multifd_send_setup(void);
 void multifd_send_shutdown(void);
 int multifd_recv_setup(Error **errp);
@@ -23,6 +25,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
 void multifd_recv_sync_main(void);
 int multifd_send_sync_main(void);
 bool multifd_queue_page(RAMBlock *block, ram_addr_t offset);
+bool multifd_recv(void);
+MultiFDRecvData *multifd_get_recv_data(void);
 
 /* Multifd Compression flags */
 #define MULTIFD_FLAG_SYNC (1 << 0)
@@ -63,6 +67,13 @@ typedef struct {
     RAMBlock *block;
 } MultiFDPages_t;
 
+struct MultiFDRecvData {
+    void *opaque;
+    size_t size;
+    /* for preadv */
+    off_t file_offset;
+};
+
 typedef struct {
     /* Fields are only written at creating/deletion time */
     /* No lock required for them, they are read only */
@@ -154,6 +165,8 @@ typedef struct {
 
     /* syncs main thread and channels */
     QemuSemaphore sem_sync;
+    /* sem where to wait for more work */
+    QemuSemaphore sem;
 
     /* this mutex protects the following parameters */
     QemuMutex mutex;
@@ -163,6 +176,8 @@ typedef struct {
     uint32_t flags;
     /* global number of generated multifd packets */
     uint64_t packet_num;
+    int pending_job;
+    MultiFDRecvData *data;
 
     /* thread local variables. No locking required */
 
-- 
2.35.3


