qemu-devel.nongnu.org archive mirror
From: Peter Xu <peterx@redhat.com>
To: Fabiano Rosas <farosas@suse.de>
Cc: qemu-devel@nongnu.org, berrange@redhat.com, armbru@redhat.com,
	Claudio Fontana <cfontana@suse.de>
Subject: Re: [PATCH v4 19/34] migration/multifd: Allow receiving pages without packets
Date: Mon, 26 Feb 2024 14:58:50 +0800
Message-ID: <Zdw2qilH4DVdmmjB@x1n>
In-Reply-To: <20240220224138.24759-20-farosas@suse.de>

On Tue, Feb 20, 2024 at 07:41:23PM -0300, Fabiano Rosas wrote:
> Currently multifd does not need to have knowledge of pages on the
> receiving side because all the information needed is within the
> packets that come in the stream.
> 
> We're about to add support to fixed-ram migration, which cannot use
> packets because it expects the ramblock section in the migration file
> to contain only the guest pages data.
> 
> Add a data structure to transfer pages between the ram migration code
> and the multifd receiving threads.
> 
> We don't want to reuse MultiFDPages_t for two reasons:
> 
> a) multifd threads don't really need to know about the data they're
>    receiving.
> 
> b) the receiving side has to be stopped to load the pages, which means
>    we can experiment with larger granularities than page size when
>    transferring data.
> 
> Signed-off-by: Fabiano Rosas <farosas@suse.de>
> ---
> @Peter: a 'quit' flag cannot be used instead of pending_job. The
> receiving thread needs to know there's no more data coming. If the
> migration thread sets a 'quit' flag, the multifd thread would see the
> flag right away and exit.

Hmm.. isn't this exactly what we want?  I'll comment on this inline below.

> The only way is to clear pending_job on the
> thread and spin once more.
> ---
>  migration/file.c    |   1 +
>  migration/multifd.c | 122 +++++++++++++++++++++++++++++++++++++++++---
>  migration/multifd.h |  15 ++++++
>  3 files changed, 131 insertions(+), 7 deletions(-)
> 
> diff --git a/migration/file.c b/migration/file.c
> index 5d4975f43e..22d052a71f 100644
> --- a/migration/file.c
> +++ b/migration/file.c
> @@ -6,6 +6,7 @@
>   */
>  
>  #include "qemu/osdep.h"
> +#include "exec/ramblock.h"
>  #include "qemu/cutils.h"
>  #include "qapi/error.h"
>  #include "channel.h"
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 0a5279314d..45a0c7aaa8 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -81,9 +81,15 @@ struct {
>  
>  struct {
>      MultiFDRecvParams *params;
> +    MultiFDRecvData *data;
>      /* number of created threads */
>      int count;
> -    /* syncs main thread and channels */
> +    /*
> +     * For sockets: this is posted once for each MULTIFD_FLAG_SYNC flag.
> +     *
> +     * For files: this is only posted at the end of the file load to mark
> +     *            completion of the load process.
> +     */
>      QemuSemaphore sem_sync;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
> @@ -1110,6 +1116,53 @@ bool multifd_send_setup(void)
>      return true;
>  }
>  
> +bool multifd_recv(void)
> +{
> +    int i;
> +    static int next_recv_channel;
> +    MultiFDRecvParams *p = NULL;
> +    MultiFDRecvData *data = multifd_recv_state->data;

[1]

> +
> +    /*
> +     * next_channel can remain from a previous migration that was
> +     * using more channels, so ensure it doesn't overflow if the
> +     * limit is lower now.
> +     */
> +    next_recv_channel %= migrate_multifd_channels();
> +    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
> +        if (multifd_recv_should_exit()) {
> +            return false;
> +        }
> +
> +        p = &multifd_recv_state->params[i];
> +
> +        /*
> +         * Safe to read atomically without a lock because the flag is
> +         * only set by this function below. Reading an old value of
> +         * true is not an issue because it would only send us looking
> +         * for the next idle channel.
> +         */
> +        if (qatomic_read(&p->pending_job) == false) {
> +            next_recv_channel = (i + 1) % migrate_multifd_channels();
> +            break;
> +        }
> +    }

IIUC you'll need an smp_mb_acquire() here.  The ordering of "reading
pending_job" and below must be guaranteed, similar to the sender side.

> +
> +    assert(!p->data->size);
> +    multifd_recv_state->data = p->data;

[2]

> +    p->data = data;
> +
> +    qatomic_set(&p->pending_job, true);

Then here:

       qatomic_store_release(&p->pending_job, true);

Please consider adding a comment above each acquire/release pair, as is
done on the sender side.
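
To illustrate the pairing I have in mind, here is a minimal standalone
sketch. It uses plain C11 atomics and pthreads as a stand-in for QEMU's
qatomic_*() helpers and semaphores, so every name in it (DemoChannel,
demo_worker, run_handoff) is hypothetical and none of it is the actual
multifd code. The only point is that each plain access to `size` sits
between an acquire load and a release store of `pending_job`.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the per-channel state, not MultiFDRecvParams. */
typedef struct {
    atomic_bool pending_job;
    atomic_bool quit;
    size_t size;   /* plain field, ordered only by the acquire/release pairs */
    size_t total;  /* touched by the worker only */
} DemoChannel;

static void *demo_worker(void *opaque)
{
    DemoChannel *c = opaque;

    for (;;) {
        /* Acquire: pairs with the producer's release store of true. */
        if (!atomic_load_explicit(&c->pending_job, memory_order_acquire)) {
            if (atomic_load_explicit(&c->quit, memory_order_acquire)) {
                break;
            }
            sched_yield();
            continue;
        }
        c->total += c->size;
        c->size = 0;
        /* Release: the reset of size is visible before pending_job == false. */
        atomic_store_explicit(&c->pending_job, false, memory_order_release);
    }
    return NULL;
}

/* Hands `jobs` buffers of 4096 bytes to the worker; returns bytes consumed. */
size_t run_handoff(int jobs)
{
    DemoChannel c;
    pthread_t tid;

    atomic_init(&c.pending_job, false);
    atomic_init(&c.quit, false);
    c.size = 0;
    c.total = 0;
    pthread_create(&tid, NULL, demo_worker, &c);

    for (int i = 0; i < jobs; i++) {
        /* Acquire: pairs with the worker's release store of false. */
        while (atomic_load_explicit(&c.pending_job, memory_order_acquire)) {
            sched_yield();
        }
        assert(c.size == 0);
        c.size = 4096;
        /* Release: the new size is visible before pending_job == true. */
        atomic_store_explicit(&c.pending_job, true, memory_order_release);
    }

    /* Wait for the last job to be consumed, then ask the worker to stop. */
    while (atomic_load_explicit(&c.pending_job, memory_order_acquire)) {
        sched_yield();
    }
    atomic_store_explicit(&c.quit, true, memory_order_release);
    pthread_join(tid, NULL);
    return c.total;
}
```

Calling run_handoff(1000) from a test program should return 1000 * 4096.
The sketch busy-waits only to stay short; the real code blocks on p->sem
instead.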

> +    qemu_sem_post(&p->sem);
> +
> +    return true;
> +}
> +
> +MultiFDRecvData *multifd_get_recv_data(void)
> +{
> +    return multifd_recv_state->data;
> +}

This helper could also be used above at [1].

I'm thinking maybe we can do something like:

#define  MULTIFD_RECV_DATA_GLOBAL  (multifd_recv_state->data)

Then we can also use it at [2], and replace multifd_get_recv_data()?
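
Roughly what I mean, as a standalone sketch. The types and names below
(DemoRecvData, DemoRecvParams, demo_recv_state, DEMO_RECV_DATA_GLOBAL,
demo_hand_over) are hypothetical simplifications and not the real multifd
definitions. The macro expands to an lvalue, so the same spelling works
for both the read at [1] and the assignment at [2].

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, reduced versions of the structures in the patch. */
typedef struct {
    void *opaque;
    size_t size;
} DemoRecvData;

typedef struct {
    DemoRecvData *data;
} DemoRecvParams;

static struct {
    DemoRecvData *data;
} demo_recv_state_storage, *demo_recv_state = &demo_recv_state_storage;

/* Expands to an lvalue, so it can be both read and assigned. */
#define DEMO_RECV_DATA_GLOBAL (demo_recv_state->data)

/* Swap the filled global buffer with the idle channel's empty one. */
static void demo_hand_over(DemoRecvParams *p)
{
    DemoRecvData *filled = DEMO_RECV_DATA_GLOBAL;   /* the read at [1] */

    assert(!p->data->size);
    DEMO_RECV_DATA_GLOBAL = p->data;                /* the write at [2] */
    p->data = filled;
}

/* Returns 1 when the two buffers ended up swapped. */
int swap_demo(void)
{
    DemoRecvData global_buf = { .opaque = NULL, .size = 4096 };
    DemoRecvData idle_buf = { .opaque = NULL, .size = 0 };
    DemoRecvParams p = { .data = &idle_buf };

    DEMO_RECV_DATA_GLOBAL = &global_buf;
    demo_hand_over(&p);

    return p.data == &global_buf && DEMO_RECV_DATA_GLOBAL == &idle_buf;
}
```

Whether a macro reads better than the accessor function is a matter of
taste; the macro only saves the separate getter for callers that need to
assign.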

> +
>  static void multifd_recv_terminate_threads(Error *err)
>  {
>      int i;
> @@ -1134,11 +1187,26 @@ static void multifd_recv_terminate_threads(Error *err)
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
>          /*
> -         * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
> -         * however try to wakeup it without harm in cleanup phase.
> +         * The migration thread and channels interact differently
> +         * depending on the presence of packets.
>           */
>          if (multifd_use_packets()) {
> +            /*
> +             * The channel receives as long as there are packets. When
> +             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
> +             * channel waits for the migration thread to sync. If the
> +             * sync never happens, do it here.
> +             */
>              qemu_sem_post(&p->sem_sync);
> +        } else {
> +            /*
> +             * The channel waits for the migration thread to give it
> +             * work. When the migration thread runs out of work, it
> +             * releases the channel and waits for any pending work to
> +             * finish. If we reach here (e.g. due to error) before the
> +             * work runs out, release the channel.
> +             */
> +            qemu_sem_post(&p->sem);
>          }
>  
>          /*
> @@ -1167,6 +1235,7 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
>      p->c = NULL;
>      qemu_mutex_destroy(&p->mutex);
>      qemu_sem_destroy(&p->sem_sync);
> +    qemu_sem_destroy(&p->sem);
>      g_free(p->name);
>      p->name = NULL;
>      p->packet_len = 0;
> @@ -1184,6 +1253,8 @@ static void multifd_recv_cleanup_state(void)
>      qemu_sem_destroy(&multifd_recv_state->sem_sync);
>      g_free(multifd_recv_state->params);
>      multifd_recv_state->params = NULL;
> +    g_free(multifd_recv_state->data);
> +    multifd_recv_state->data = NULL;
>      g_free(multifd_recv_state);
>      multifd_recv_state = NULL;
>  }
> @@ -1251,11 +1322,11 @@ static void *multifd_recv_thread(void *opaque)
>          bool has_data = false;
>          p->normal_num = 0;
>  
> -        if (multifd_recv_should_exit()) {
> -            break;
> -        }
> -
>          if (use_packets) {
> +            if (multifd_recv_should_exit()) {
> +                break;
> +            }
> +
>              ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
>                                             p->packet_len, &local_err);
>              if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
> @@ -1274,6 +1345,26 @@ static void *multifd_recv_thread(void *opaque)
>              p->flags &= ~MULTIFD_FLAG_SYNC;
>              has_data = !!p->normal_num;
>              qemu_mutex_unlock(&p->mutex);
> +        } else {
> +            /*
> +             * No packets, so we need to wait for the vmstate code to
> +             * give us work.
> +             */
> +            qemu_sem_wait(&p->sem);
> +
> +            if (multifd_recv_should_exit()) {
> +                break;
> +            }
> +
> +            /*
> +             * Migration thread did not send work, break and signal
> +             * sem_sync so it knows we're not lagging behind.
> +             */
> +            if (!qatomic_read(&p->pending_job)) {
> +                break;
> +            }

In reality, this _must_ be true when reaching here, right?  Since AFAIU
recv side p->sem is posted only in two conditions:

  1) when there is work (pending_job==true)
  2) when terminating threads (multifd_recv_should_exit==true)

Then if 2) is checked above, I assume 1) must be the case here?
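
To spell out the reasoning, a standalone sketch under exactly that
assumption, i.e. that the semaphore is posted only for new work or for
termination. It uses POSIX semaphores and C11 atomics rather than QEMU's
wrappers, and every name (DemoChan, demo_chan_thread, run_wakeups, the
extra `done` semaphore) is hypothetical. If some third path posts the
semaphore without setting pending_job, the assert() below would fire.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical channel, not MultiFDRecvParams. */
typedef struct {
    sem_t sem;               /* posted for new work or for termination */
    sem_t done;              /* demo only: worker reports a finished job */
    atomic_bool pending_job;
    atomic_bool exiting;
    int handled;             /* touched by the worker only */
} DemoChan;

/* sem_wait() that retries when interrupted by a signal. */
static void demo_sem_wait(sem_t *s)
{
    while (sem_wait(s) == -1 && errno == EINTR) {
        /* retry */
    }
}

static void *demo_chan_thread(void *opaque)
{
    DemoChan *c = opaque;

    for (;;) {
        demo_sem_wait(&c->sem);

        if (atomic_load(&c->exiting)) {
            break;                      /* condition 2: termination */
        }

        /* Condition 1 is the only wake-up reason left. */
        assert(atomic_load(&c->pending_job));

        c->handled++;
        atomic_store(&c->pending_job, false);
        sem_post(&c->done);
    }
    return NULL;
}

/* Queues `jobs` work items one at a time; returns how many were handled. */
int run_wakeups(int jobs)
{
    DemoChan c;
    pthread_t tid;

    sem_init(&c.sem, 0, 0);
    sem_init(&c.done, 0, 0);
    atomic_init(&c.pending_job, false);
    atomic_init(&c.exiting, false);
    c.handled = 0;
    pthread_create(&tid, NULL, demo_chan_thread, &c);

    for (int i = 0; i < jobs; i++) {
        atomic_store(&c.pending_job, true);   /* set before posting */
        sem_post(&c.sem);
        demo_sem_wait(&c.done);
    }

    atomic_store(&c.exiting, true);
    sem_post(&c.sem);
    pthread_join(tid, NULL);

    sem_destroy(&c.sem);
    sem_destroy(&c.done);
    return c.handled;
}
```

The sketch assumes a Linux-like platform where unnamed POSIX semaphores
are available.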

> +
> +            has_data = !!p->data->size;
>          }
>  
>          if (has_data) {
> @@ -1288,9 +1379,17 @@ static void *multifd_recv_thread(void *opaque)
>                  qemu_sem_post(&multifd_recv_state->sem_sync);
>                  qemu_sem_wait(&p->sem_sync);
>              }
> +        } else {
> +            p->total_normal_pages += p->data->size / qemu_target_page_size();
> +            p->data->size = 0;
> +            qatomic_set(&p->pending_job, false);

I think it needs to be:

  qatomic_store_release(&p->pending_job, false);

?

That guarantees that when the other side sees pending_job==false, size
has already been reset.

>          }
>      }
>  
> +    if (!use_packets) {
> +        qemu_sem_post(&p->sem_sync);
> +    }
> +
>      if (local_err) {
>          multifd_recv_terminate_threads(local_err);
>          error_free(local_err);
> @@ -1320,6 +1419,10 @@ int multifd_recv_setup(Error **errp)
>      thread_count = migrate_multifd_channels();
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> +
> +    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
> +    multifd_recv_state->data->size = 0;
> +
>      qatomic_set(&multifd_recv_state->count, 0);
>      qatomic_set(&multifd_recv_state->exiting, 0);
>      qemu_sem_init(&multifd_recv_state->sem_sync, 0);
> @@ -1330,8 +1433,13 @@ int multifd_recv_setup(Error **errp)
>  
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem_sync, 0);
> +        qemu_sem_init(&p->sem, 0);
> +        p->pending_job = false;
>          p->id = i;
>  
> +        p->data = g_new0(MultiFDRecvData, 1);
> +        p->data->size = 0;
> +
>          if (use_packets) {
>              p->packet_len = sizeof(MultiFDPacket_t)
>                  + sizeof(uint64_t) * page_count;
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 9a6a7a72df..19188815a3 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -13,6 +13,8 @@
>  #ifndef QEMU_MIGRATION_MULTIFD_H
>  #define QEMU_MIGRATION_MULTIFD_H
>  
> +typedef struct MultiFDRecvData MultiFDRecvData;
> +
>  bool multifd_send_setup(void);
>  void multifd_send_shutdown(void);
>  int multifd_recv_setup(Error **errp);
> @@ -23,6 +25,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
>  void multifd_recv_sync_main(void);
>  int multifd_send_sync_main(void);
>  bool multifd_queue_page(RAMBlock *block, ram_addr_t offset);
> +bool multifd_recv(void);
> +MultiFDRecvData *multifd_get_recv_data(void);
>  
>  /* Multifd Compression flags */
>  #define MULTIFD_FLAG_SYNC (1 << 0)
> @@ -63,6 +67,13 @@ typedef struct {
>      RAMBlock *block;
>  } MultiFDPages_t;
>  
> +struct MultiFDRecvData {
> +    void *opaque;
> +    size_t size;
> +    /* for preadv */
> +    off_t file_offset;
> +};
> +
>  typedef struct {
>      /* Fields are only written at creating/deletion time */
>      /* No lock required for them, they are read only */
> @@ -154,6 +165,8 @@ typedef struct {
>  
>      /* syncs main thread and channels */
>      QemuSemaphore sem_sync;
> +    /* sem where to wait for more work */
> +    QemuSemaphore sem;
>  
>      /* this mutex protects the following parameters */
>      QemuMutex mutex;
> @@ -163,6 +176,8 @@ typedef struct {
>      uint32_t flags;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
> +    int pending_job;
> +    MultiFDRecvData *data;
>  
>      /* thread local variables. No locking required */
>  
> -- 
> 2.35.3
> 

-- 
Peter Xu
