From: Fabiano Rosas <farosas@suse.de>
To: qemu-devel@nongnu.org
Cc: Peter Xu <peterx@redhat.com>,
	"Maciej S . Szmigiero" <mail@maciej.szmigiero.name>
Subject: [RFC PATCH 6/7] migration/multifd: Move payload storage out of the channel parameters
Date: Thu, 20 Jun 2024 18:21:10 -0300	[thread overview]
Message-ID: <20240620212111.29319-7-farosas@suse.de> (raw)
In-Reply-To: <20240620212111.29319-1-farosas@suse.de>

Multifd currently has a simple scheduling mechanism that distributes
work to the various channels by providing the client (producer) with a
memory slot and swapping that slot with a free slot from the next idle
channel (consumer). Or graphically:

[]       <-- multifd_send_state->pages
[][][][] <-- channels' p->pages pointers

1) client fills the empty slot with data:
  [a]
  [][][][]

2) multifd_send_pages() finds an idle channel and swaps the pointers:
  [a]
  [][][][]
  ^idle

  []
  [a][][][]

3) client can immediately fill new slot with more data:
  [b]
  [a][][][]

4) the channel processes the data; its slot is now free to use
   again:
  [b]
  [][][][]
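
As a rough sketch, the swap in steps 2-4 boils down to something like
the following (a conceptual simplification using the pre-series
p->pages naming from the diagram above; locking, channel selection and
error handling omitted):

    /* take the slot the client just filled */
    MultiFDPages_t *pages = multifd_send_state->pages;

    /* give the idle channel's empty slot back to the client and hand
     * the filled slot to the channel for processing */
    multifd_send_state->pages = p->pages;
    p->pages = pages;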

This works just fine, except that it doesn't allow different types of
payloads to be processed at the same time in different channels,
i.e. the data type of multifd_send_state->pages needs to be the same
as that of p->pages. For each new data type other than MultiFDPages_t
that needs to be handled, this logic has to be duplicated by adding
new fields to multifd_send_state and to the channels.

The core of the issue here is that we're using the channel parameters
(MultiFDSendParams) to hold storage space on behalf of the multifd
client (currently ram.c). This is cumbersome because it would force us
to change multifd_send_pages() to check which data type is being
handled before deciding which field to use.

One way to solve this is to detach the storage space from the multifd
channel and put it somewhere else, under the control of the multifd
client. That way, multifd_send_pages() can operate on an opaque
pointer without needing to be adapted to each new data type. Implement
this logic with a new "slots" abstraction:

struct MultiFDSendData {
    void *opaque;
    size_t size;
};

struct MultiFDSlots {
    MultiFDSendData **free;   <-- what used to be p->pages
    MultiFDSendData *active;  <-- what used to be multifd_send_state->pages
};

Each multifd client now gets one set of slots to use. The slots are
passed into multifd_send_pages() (renamed to multifd_send). The
channels now only hold a pointer to the generic MultiFDSendData, and
once the data has been processed that reference can be dropped.

Or graphically:

1) client fills the active slot with data. Channels point to nothing
   at this point:
  [a]      <-- active slot
  [][][][] <-- free slots, one per-channel

  [][][][] <-- channels' p->data pointers

2) multifd_send() swaps the pointers inside the client slot. Channels
   still point to nothing:
  []
  [a][][][]

  [][][][]

3) multifd_send() finds an idle channel and updates its pointer:
  []
  [a][][][]

  [a][][][]
  ^idle

4) a second client calls multifd_send(), but with its own slots:
  []          [b]
  [a][][][]   [][][][]

        [a][][][]

5) multifd_send() does steps 2 and 3 again:
  []          []
  [a][][][]   [][b][][]

        [a][b][][]
           ^idle

6) The channels continue processing data, acquiring and dropping
references as multifd_send() updates them. Each client's free slots
are not affected.
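
Taken together, a client's usage looks roughly like this sketch (based
on the RAM client in this patch; "my_alloc", "my_reset", "my_cleanup",
"fill_data" and "bytes_filled" stand in for whatever a hypothetical
new client would provide):

    MultiFDSlots *slots;

    /* setup: one set of slots per client */
    slots = multifd_allocate_slots(my_alloc, my_reset, my_cleanup);

    /* produce: fill the client-owned active slot */
    fill_data(slots->active->opaque);
    slots->active->size += bytes_filled;

    /* hand off: an idle channel takes the active slot and the client
     * gets a free slot back for the next round */
    if (!multifd_send(slots)) {
        /* bail out, as multifd_queue_page() does on failure */
    }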

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
 migration/multifd.c | 119 +++++++++++++++++++++++++++++++-------------
 migration/multifd.h |  17 +++++++
 migration/ram.c     |   1 +
 3 files changed, 102 insertions(+), 35 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index 6fe339b378..f22a1c2e84 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -97,6 +97,30 @@ struct {
     MultiFDMethods *ops;
 } *multifd_recv_state;
 
+MultiFDSlots *multifd_allocate_slots(void *(*alloc_fn)(void),
+                                     void (*reset_fn)(void *),
+                                     void (*cleanup_fn)(void *))
+{
+    int thread_count = migrate_multifd_channels();
+    MultiFDSlots *slots = g_new0(MultiFDSlots, 1);
+
+    slots->active = g_new0(MultiFDSendData, 1);
+    slots->free = g_new0(MultiFDSendData *, thread_count);
+
+    slots->active->opaque = alloc_fn();
+    slots->active->reset = reset_fn;
+    slots->active->cleanup = cleanup_fn;
+
+    for (int i = 0; i < thread_count; i++) {
+        slots->free[i] = g_new0(MultiFDSendData, 1);
+        slots->free[i]->opaque = alloc_fn();
+        slots->free[i]->reset = reset_fn;
+        slots->free[i]->cleanup = cleanup_fn;
+    }
+
+    return slots;
+}
+
 static bool multifd_use_packets(void)
 {
     return !migrate_mapped_ram();
@@ -313,8 +337,10 @@ void multifd_register_ops(int method, MultiFDMethods *ops)
 }
 
 /* Reset a MultiFDPages_t* object for the next use */
-static void multifd_pages_reset(MultiFDPages_t *pages)
+static void multifd_pages_reset(void *opaque)
 {
+    MultiFDPages_t *pages = opaque;
+
     /*
      * We don't need to touch offset[] array, because it will be
      * overwritten later when reused.
@@ -388,8 +414,9 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
     return msg.id;
 }
 
-static MultiFDPages_t *multifd_pages_init(uint32_t n)
+static void *multifd_pages_init(void)
 {
+    uint32_t n = MULTIFD_PACKET_SIZE / qemu_target_page_size();
     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
 
     pages->allocated = n;
@@ -398,13 +425,24 @@ static MultiFDPages_t *multifd_pages_init(uint32_t n)
     return pages;
 }
 
-static void multifd_pages_clear(MultiFDPages_t *pages)
+static void multifd_pages_clear(void *opaque)
 {
+    MultiFDPages_t *pages = opaque;
+
     multifd_pages_reset(pages);
     pages->allocated = 0;
     g_free(pages->offset);
     pages->offset = NULL;
-    g_free(pages);
+}
+
+/* TODO: move these to multifd-ram.c */
+MultiFDSlots *multifd_ram_send_slots;
+
+void multifd_ram_save_setup(void)
+{
+    multifd_ram_send_slots = multifd_allocate_slots(multifd_pages_init,
+                                                    multifd_pages_reset,
+                                                    multifd_pages_clear);
 }
 
 static void multifd_ram_fill_packet(MultiFDSendParams *p)
@@ -617,13 +655,12 @@ static void multifd_send_kick_main(MultiFDSendParams *p)
  *
  * Returns true if succeed, false otherwise.
  */
-static bool multifd_send_pages(void)
+static bool multifd_send(MultiFDSlots *slots)
 {
     int i;
     static int next_channel;
     MultiFDSendParams *p = NULL; /* make happy gcc */
-    MultiFDPages_t *channel_pages;
-    MultiFDSendData *data = multifd_send_state->data;
+    MultiFDSendData *active_slot;
 
     if (multifd_send_should_exit()) {
         return false;
@@ -659,11 +696,24 @@ static bool multifd_send_pages(void)
      */
     smp_mb_acquire();
 
-    channel_pages = p->data->opaque;
-    assert(!channel_pages->num);
+    assert(!slots->free[p->id]->size);
+
+    /*
+     * Swap the slots. The client gets a free slot to fill up for the
+     * next iteration and the channel gets the active slot for
+     * processing.
+     */
+    active_slot = slots->active;
+    slots->active = slots->free[p->id];
+    p->data = active_slot;
+
+    /*
+     * By the next time we arrive here, the channel will certainly
+     * have consumed the active slot. Put it back on the free list
+     * now.
+     */
+    slots->free[p->id] = active_slot;
 
-    multifd_send_state->data = p->data;
-    p->data = data;
     /*
      * Making sure p->data is setup before marking pending_job=true. Pairs
      * with the qatomic_load_acquire() in multifd_send_thread().
@@ -687,6 +737,7 @@ static inline bool multifd_queue_full(MultiFDPages_t *pages)
 static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
 {
     pages->offset[pages->num++] = offset;
+    multifd_ram_send_slots->active->size += qemu_target_page_size();
 }
 
 /* Returns true if enqueue successful, false otherwise */
@@ -695,7 +746,7 @@ bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
     MultiFDPages_t *pages;
 
 retry:
-    pages = multifd_send_state->data->opaque;
+    pages = multifd_ram_send_slots->active->opaque;
 
     /* If the queue is empty, we can already enqueue now */
     if (multifd_queue_empty(pages)) {
@@ -713,7 +764,7 @@ retry:
      * After flush, always retry.
      */
     if (pages->block != block || multifd_queue_full(pages)) {
-        if (!multifd_send_pages()) {
+        if (!multifd_send(multifd_ram_send_slots)) {
             return false;
         }
         goto retry;
@@ -825,10 +876,12 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
     qemu_sem_destroy(&p->sem_sync);
     g_free(p->name);
     p->name = NULL;
-    multifd_pages_clear(p->data->opaque);
-    p->data->opaque = NULL;
-    g_free(p->data);
-    p->data = NULL;
+    if (p->data) {
+        p->data->cleanup(p->data->opaque);
+        p->data->opaque = NULL;
+        /* p->data was not allocated by us, just clear the pointer */
+        p->data = NULL;
+    }
     p->packet_len = 0;
     g_free(p->packet);
     p->packet = NULL;
@@ -845,10 +898,6 @@ static void multifd_send_cleanup_state(void)
     qemu_sem_destroy(&multifd_send_state->channels_ready);
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
-    multifd_pages_clear(multifd_send_state->data->opaque);
-    multifd_send_state->data->opaque = NULL;
-    g_free(multifd_send_state->data);
-    multifd_send_state->data = NULL;
     g_free(multifd_send_state);
     multifd_send_state = NULL;
 }
@@ -897,14 +946,13 @@ int multifd_send_sync_main(void)
 {
     int i;
     bool flush_zero_copy;
-    MultiFDPages_t *pages;
 
     if (!migrate_multifd()) {
         return 0;
     }
-    pages = multifd_send_state->data->opaque;
-    if (pages->num) {
-        if (!multifd_send_pages()) {
+
+    if (multifd_ram_send_slots->active->size) {
+        if (!multifd_send(multifd_ram_send_slots)) {
             error_report("%s: multifd_send_pages fail", __func__);
             return -1;
         }
@@ -979,13 +1027,11 @@ static void *multifd_send_thread(void *opaque)
 
         /*
          * Read pending_job flag before p->data.  Pairs with the
-         * qatomic_store_release() in multifd_send_pages().
+         * qatomic_store_release() in multifd_send().
          */
         if (qatomic_load_acquire(&p->pending_job)) {
-            MultiFDPages_t *pages = p->data->opaque;
-
             p->iovs_num = 0;
-            assert(pages->num);
+            assert(p->data->size);
 
             ret = multifd_send_state->ops->send_prepare(p, &local_err);
             if (ret != 0) {
@@ -1008,13 +1054,20 @@ static void *multifd_send_thread(void *opaque)
             stat64_add(&mig_stats.multifd_bytes,
                        p->next_packet_size + p->packet_len);
 
-            multifd_pages_reset(pages);
             p->next_packet_size = 0;
 
+            /*
+             * The data has now been sent. Since multifd_send()
+             * already put this slot on the free list, reset the
+             * entire slot before releasing the barrier below.
+             */
+            p->data->size = 0;
+            p->data->reset(p->data->opaque);
+
             /*
              * Making sure p->data is published before saying "we're
              * free".  Pairs with the smp_mb_acquire() in
-             * multifd_send_pages().
+             * multifd_send().
              */
             qatomic_store_release(&p->pending_job, false);
         } else {
@@ -1208,8 +1261,6 @@ bool multifd_send_setup(void)
     thread_count = migrate_multifd_channels();
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
-    multifd_send_state->data = g_new0(MultiFDSendData, 1);
-    multifd_send_state->data->opaque = multifd_pages_init(page_count);
     qemu_sem_init(&multifd_send_state->channels_created, 0);
     qemu_sem_init(&multifd_send_state->channels_ready, 0);
     qatomic_set(&multifd_send_state->exiting, 0);
@@ -1221,8 +1272,6 @@ bool multifd_send_setup(void)
         qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_sync, 0);
         p->id = i;
-        p->data = g_new0(MultiFDSendData, 1);
-        p->data->opaque = multifd_pages_init(page_count);
 
         if (use_packets) {
             p->packet_len = sizeof(MultiFDPacket_t)
diff --git a/migration/multifd.h b/migration/multifd.h
index 2029bfd80a..5230729077 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -17,6 +17,10 @@
 
 typedef struct MultiFDRecvData MultiFDRecvData;
 typedef struct MultiFDSendData MultiFDSendData;
+typedef struct MultiFDSlots MultiFDSlots;
+
+typedef void *(multifd_data_alloc_cb)(void);
+typedef void (multifd_data_cleanup_cb)(void *);
 
 bool multifd_send_setup(void);
 void multifd_send_shutdown(void);
@@ -93,8 +97,21 @@ struct MultiFDRecvData {
 struct MultiFDSendData {
     void *opaque;
     size_t size;
+    /* reset the slot for reuse after successful transfer */
+    void (*reset)(void *);
+    void (*cleanup)(void *);
 };
 
+struct MultiFDSlots {
+    MultiFDSendData **free;
+    MultiFDSendData *active;
+};
+
+MultiFDSlots *multifd_allocate_slots(void *(*alloc_fn)(void),
+                                     void (*reset_fn)(void *),
+                                     void (*cleanup_fn)(void *));
+void multifd_ram_save_setup(void);
+
 typedef struct {
     /* Fields are only written at creating/deletion time */
     /* No lock required for them, they are read only */
diff --git a/migration/ram.c b/migration/ram.c
index ceea586b06..c33a9dcf3f 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -3058,6 +3058,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque, Error **errp)
     migration_ops = g_malloc0(sizeof(MigrationOps));
 
     if (migrate_multifd()) {
+        multifd_ram_save_setup();
         migration_ops->ram_save_target_page = ram_save_target_page_multifd;
     } else {
         migration_ops->ram_save_target_page = ram_save_target_page_legacy;
-- 
2.35.3


