All of lore.kernel.org
 help / color / mirror / Atom feed
From: Peter Xu <peterx@redhat.com>
To: Fabiano Rosas <farosas@suse.de>
Cc: qemu-devel@nongnu.org, berrange@redhat.com, armbru@redhat.com,
	Juan Quintela <quintela@redhat.com>,
	Leonardo Bras <leobras@redhat.com>,
	Claudio Fontana <cfontana@suse.de>
Subject: Re: [RFC PATCH v3 18/30] migration/multifd: Allow receiving pages without packets
Date: Tue, 16 Jan 2024 16:10:59 +0800	[thread overview]
Message-ID: <ZaY6E4tefb5DGEp9@x1n> (raw)
In-Reply-To: <20231127202612.23012-19-farosas@suse.de>

On Mon, Nov 27, 2023 at 05:26:00PM -0300, Fabiano Rosas wrote:
> Currently multifd does not need to have knowledge of pages on the
> receiving side because all the information needed is within the
> packets that come in the stream.
> 
> We're about to add support to fixed-ram migration, which cannot use
> packets because it expects the ramblock section in the migration file
> to contain only the guest pages data.
> 
> Add a data structure to transfer pages between the ram migration code
> and the multifd receiving threads.
> 
> We don't want to reuse MultiFDPages_t for two reasons:
> 
> a) multifd threads don't really need to know about the data they're
>    receiving.
> 
> b) the receiving side has to be stopped to load the pages, which means
>    we can experiment with larger granularities than page size when
>    transferring data.
> 
> Signed-off-by: Fabiano Rosas <farosas@suse.de>
> ---
> - stopped using MultiFDPages_t and added a new structure which can
>   take offset + size
> ---
>  migration/multifd.c | 122 ++++++++++++++++++++++++++++++++++++++++++--
>  migration/multifd.h |  20 ++++++++
>  2 files changed, 138 insertions(+), 4 deletions(-)
> 
> diff --git a/migration/multifd.c b/migration/multifd.c
> index c1381bdc21..7dfab2367a 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -142,17 +142,36 @@ static void nocomp_recv_cleanup(MultiFDRecvParams *p)
>  static int nocomp_recv_data(MultiFDRecvParams *p, Error **errp)
>  {
>      uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
> +    ERRP_GUARD();
>  
>      if (flags != MULTIFD_FLAG_NOCOMP) {
>          error_setg(errp, "multifd %u: flags received %x flags expected %x",
>                     p->id, flags, MULTIFD_FLAG_NOCOMP);
>          return -1;
>      }
> -    for (int i = 0; i < p->normal_num; i++) {
> -        p->iov[i].iov_base = p->host + p->normal[i];
> -        p->iov[i].iov_len = p->page_size;
> +
> +    if (!migrate_multifd_packets()) {
> +        MultiFDRecvData *data = p->data;
> +        size_t ret;
> +
> +        ret = qio_channel_pread(p->c, (char *) data->opaque,
> +                                data->size, data->file_offset, errp);
> +        if (ret != data->size) {
> +            error_prepend(errp,
> +                          "multifd recv (%u): read 0x%zx, expected 0x%zx",
> +                          p->id, ret, data->size);
> +            return -1;
> +        }
> +
> +        return 0;
> +    } else {
> +        for (int i = 0; i < p->normal_num; i++) {
> +            p->iov[i].iov_base = p->host + p->normal[i];
> +            p->iov[i].iov_len = p->page_size;
> +        }
> +
> +        return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
>      }

I guess you managed to squash the file loads into "no compression" handler
of multifd, but IMHO it's not as clean.

Firstly, if to do so, we'd better make sure multifd-compression is not
enabled anywhere together with fixed-ram.  I didn't yet see such protection
in the series.  I think if it happens we should expect crashes because
they'll go into zlib/zstd paths for the file.

IMHO the only model fixed-ram can share with multifd is the task management
part, mutexes, semaphores, etc..  IIRC I used to mention that it'll be nice
if we have simply a pool of threads so we can enqueue tasks.  If that's too
far away, would something like below closer to that?  What I'm thinking:

  - patch 1: rename MultiFDMethods to MultiFDCompressMethods, this can
    replace the other patch to do s/recv_pages/recv_data/

  - patch 2: introduce MultiFDMethods (on top of MultiFDCompressMethods),
    refactor the current code to provide the socket version of MultiFDMethods.

  - patch 3: add the fixed-ram "file" version of MultiFDMethods

MultiFDCompressMethods doesn't need to be used at all for "file" version of
MultiFDMethods.

Would that work?

> -    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
>  }
>  
>  static MultiFDMethods multifd_nocomp_ops = {
> @@ -989,6 +1008,7 @@ int multifd_save_setup(Error **errp)
>  
>  struct {
>      MultiFDRecvParams *params;
> +    MultiFDRecvData *data;

(If above would work, maybe we can split MultiFDRecvParams into two chunks,
 one commonly used for both, one only for sockets?)

>      /* number of created threads */
>      int count;
>      /* syncs main thread and channels */
> @@ -999,6 +1019,49 @@ struct {
>      MultiFDMethods *ops;
>  } *multifd_recv_state;
>  
> +int multifd_recv(void)
> +{
> +    int i;
> +    static int next_recv_channel;
> +    MultiFDRecvParams *p = NULL;
> +    MultiFDRecvData *data = multifd_recv_state->data;
> +
> +    /*
> +     * next_channel can remain from a previous migration that was
> +     * using more channels, so ensure it doesn't overflow if the
> +     * limit is lower now.
> +     */
> +    next_recv_channel %= migrate_multifd_channels();
> +    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
> +        p = &multifd_recv_state->params[i];
> +
> +        qemu_mutex_lock(&p->mutex);
> +        if (p->quit) {
> +            error_report("%s: channel %d has already quit!", __func__, i);
> +            qemu_mutex_unlock(&p->mutex);
> +            return -1;
> +        }
> +        if (!p->pending_job) {
> +            p->pending_job++;
> +            next_recv_channel = (i + 1) % migrate_multifd_channels();
> +            break;
> +        }
> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    assert(p->data->size == 0);
> +    multifd_recv_state->data = p->data;
> +    p->data = data;
> +    qemu_mutex_unlock(&p->mutex);
> +    qemu_sem_post(&p->sem);
> +
> +    return 1;
> +}

PS: so if we have the pool model we can already mostly merge above code
with multifd_send_pages().. because this will be a common helper to enqueue
a task to a pool, no matter it's for writting (to file/socket) or reading
(only from file).

> +
> +MultiFDRecvData *multifd_get_recv_data(void)
> +{
> +    return multifd_recv_state->data;
> +}
> +
>  static void multifd_recv_terminate_threads(Error *err)
>  {
>      int i;
> @@ -1020,6 +1083,7 @@ static void multifd_recv_terminate_threads(Error *err)
>  
>          qemu_mutex_lock(&p->mutex);
>          p->quit = true;
> +        qemu_sem_post(&p->sem);
>          /*
>           * We could arrive here for two reasons:
>           *  - normal quit, i.e. everything went fine, just finished
> @@ -1069,6 +1133,7 @@ void multifd_load_cleanup(void)
>          p->c = NULL;
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem_sync);
> +        qemu_sem_destroy(&p->sem);
>          g_free(p->name);
>          p->name = NULL;
>          p->packet_len = 0;
> @@ -1083,6 +1148,8 @@ void multifd_load_cleanup(void)
>      qemu_sem_destroy(&multifd_recv_state->sem_sync);
>      g_free(multifd_recv_state->params);
>      multifd_recv_state->params = NULL;
> +    g_free(multifd_recv_state->data);
> +    multifd_recv_state->data = NULL;
>      g_free(multifd_recv_state);
>      multifd_recv_state = NULL;
>  }
> @@ -1094,6 +1161,21 @@ void multifd_recv_sync_main(void)
>      if (!migrate_multifd() || !migrate_multifd_packets()) {

[1]

>          return;
>      }
> +
> +    if (!migrate_multifd_packets()) {

Hmm, isn't this checked already above at [1]?  Could this path ever trigger
then?  Maybe we need to drop the one at [1]?

IIUC what you wanted to do here is relying on the last RAM_SAVE_FLAG_EOS in
the image file to do a full flush to make sure all pages are loaded.

You may want to be careful on the side effect of flush_after_each_section
parameter:

        case RAM_SAVE_FLAG_EOS:
            /* normal exit */
            if (migrate_multifd() &&
                migrate_multifd_flush_after_each_section()) {
                multifd_recv_sync_main();
            }

You may want to flush always for file?

> +        for (i = 0; i < migrate_multifd_channels(); i++) {
> +            MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +            qemu_sem_post(&p->sem);
> +            qemu_sem_wait(&p->sem_sync);
> +
> +            qemu_mutex_lock(&p->mutex);
> +            assert(!p->pending_job || p->quit);
> +            qemu_mutex_unlock(&p->mutex);
> +        }
> +        return;

Btw, how does this kick off all the recv threads?  Is it because you did a
sem_post(&sem) with p->pending_job==false this time?

Maybe it's clearer to just set p->quit (or a global quite knob) somewhere?
That'll be clear that this is a one-shot thing, only needed at the end of
the file incoming migration.

> +    }
> +
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
> @@ -1156,6 +1238,18 @@ static void *multifd_recv_thread(void *opaque)
>  
>              p->total_normal_pages += p->normal_num;
>              has_data = !!p->normal_num;
> +        } else {
> +            /*
> +             * No packets, so we need to wait for the vmstate code to
> +             * give us work.
> +             */
> +            qemu_sem_wait(&p->sem);
> +            qemu_mutex_lock(&p->mutex);
> +            if (!p->pending_job) {
> +                qemu_mutex_unlock(&p->mutex);
> +                break;
> +            }
> +            has_data = !!p->data->size;
>          }
>  
>          qemu_mutex_unlock(&p->mutex);
> @@ -1171,6 +1265,17 @@ static void *multifd_recv_thread(void *opaque)
>              qemu_sem_post(&multifd_recv_state->sem_sync);
>              qemu_sem_wait(&p->sem_sync);
>          }
> +
> +        if (!use_packets) {
> +            qemu_mutex_lock(&p->mutex);
> +            p->data->size = 0;
> +            p->pending_job--;
> +            qemu_mutex_unlock(&p->mutex);
> +        }
> +    }
> +
> +    if (!use_packets) {
> +        qemu_sem_post(&p->sem_sync);

Currently sem_sync is only posted with MULTIFD_FLAG_SYNC flag.  We'd better
be careful on reusing it.

Maybe add some comment above recv_state->sem_sync?

  /*
   * For sockets: this is posted once for each MULTIFD_FLAG_SYNC flag.
   *
   * For files: this is only posted at the end of the file load to mark
   *            completion of the load process.
   */

>      }
>  
>      if (local_err) {
> @@ -1205,6 +1310,10 @@ int multifd_load_setup(Error **errp)
>      thread_count = migrate_multifd_channels();
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> +
> +    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
> +    multifd_recv_state->data->size = 0;
> +
>      qatomic_set(&multifd_recv_state->count, 0);
>      qemu_sem_init(&multifd_recv_state->sem_sync, 0);
>      multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
> @@ -1214,9 +1323,14 @@ int multifd_load_setup(Error **errp)
>  
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem_sync, 0);
> +        qemu_sem_init(&p->sem, 0);
>          p->quit = false;
> +        p->pending_job = 0;
>          p->id = i;
>  
> +        p->data = g_new0(MultiFDRecvData, 1);
> +        p->data->size = 0;
> +
>          if (use_packets) {
>              p->packet_len = sizeof(MultiFDPacket_t)
>                  + sizeof(uint64_t) * page_count;
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 406d42dbae..abaf16c3f2 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -13,6 +13,8 @@
>  #ifndef QEMU_MIGRATION_MULTIFD_H
>  #define QEMU_MIGRATION_MULTIFD_H
>  
> +typedef struct MultiFDRecvData MultiFDRecvData;
> +
>  int multifd_save_setup(Error **errp);
>  void multifd_save_cleanup(void);
>  int multifd_load_setup(Error **errp);
> @@ -24,6 +26,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
>  void multifd_recv_sync_main(void);
>  int multifd_send_sync_main(QEMUFile *f);
>  int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset);
> +int multifd_recv(void);
> +MultiFDRecvData *multifd_get_recv_data(void);
>  
>  /* Multifd Compression flags */
>  #define MULTIFD_FLAG_SYNC (1 << 0)
> @@ -66,6 +70,13 @@ typedef struct {
>      RAMBlock *block;
>  } MultiFDPages_t;
>  
> +struct MultiFDRecvData {
> +    void *opaque;
> +    size_t size;
> +    /* for preadv */
> +    off_t file_offset;
> +};
> +
>  typedef struct {
>      /* Fields are only written at creating/deletion time */
>      /* No lock required for them, they are read only */
> @@ -156,6 +167,8 @@ typedef struct {
>  
>      /* syncs main thread and channels */
>      QemuSemaphore sem_sync;
> +    /* sem where to wait for more work */
> +    QemuSemaphore sem;
>  
>      /* this mutex protects the following parameters */
>      QemuMutex mutex;
> @@ -167,6 +180,13 @@ typedef struct {
>      uint32_t flags;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
> +    int pending_job;
> +    /*
> +     * The owner of 'data' depends of 'pending_job' value:
> +     * pending_job == 0 -> migration_thread can use it.
> +     * pending_job != 0 -> multifd_channel can use it.
> +     */
> +    MultiFDRecvData *data;

Right after the main thread assigns a chunk of memory to load for a recv
thread, the main thread job done, afaict.  I don't see how a race could
happen here.

I'm not sure, but I _think_ if we rely on p->quite or something similar to
quite all recv threads, then this can be dropped?

>  
>      /* thread local variables. No locking required */
>  
> -- 
> 2.35.3
> 

-- 
Peter Xu



  reply	other threads:[~2024-01-16  8:11 UTC|newest]

Thread overview: 95+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-11-27 20:25 [RFC PATCH v3 00/30] migration: File based migration with multifd and fixed-ram Fabiano Rosas
2023-11-27 20:25 ` [RFC PATCH v3 01/30] io: add and implement QIO_CHANNEL_FEATURE_SEEKABLE for channel file Fabiano Rosas
2024-01-10  8:49   ` Peter Xu
2023-11-27 20:25 ` [RFC PATCH v3 02/30] io: Add generic pwritev/preadv interface Fabiano Rosas
2024-01-10  9:07   ` Daniel P. Berrangé
2024-01-11  6:59   ` Peter Xu
2023-11-27 20:25 ` [RFC PATCH v3 03/30] io: implement io_pwritev/preadv for QIOChannelFile Fabiano Rosas
2024-01-10  9:08   ` Daniel P. Berrangé
2024-01-11  7:04   ` Peter Xu
2023-11-27 20:25 ` [RFC PATCH v3 04/30] io: fsync before closing a file channel Fabiano Rosas
2024-01-10  9:04   ` Daniel P. Berrangé
2024-01-11  8:44   ` Peter Xu
2024-01-11 18:46     ` Fabiano Rosas
2024-01-12  0:01       ` Peter Xu
2024-01-12 10:40         ` Daniel P. Berrangé
2024-01-15  3:38           ` Peter Xu
2024-01-15  8:57       ` Peter Xu
2024-01-15  9:03         ` Daniel P. Berrangé
2024-01-15  9:31           ` Peter Xu
2023-11-27 20:25 ` [RFC PATCH v3 05/30] migration/qemu-file: add utility methods for working with seekable channels Fabiano Rosas
2024-01-11  9:57   ` Peter Xu
2024-01-11 18:49     ` Fabiano Rosas
2023-11-27 20:25 ` [RFC PATCH v3 06/30] migration/ram: Introduce 'fixed-ram' migration capability Fabiano Rosas
2023-12-22 10:35   ` Markus Armbruster
2024-01-11 10:43   ` Peter Xu
2023-11-27 20:25 ` [RFC PATCH v3 07/30] migration: Add fixed-ram URI compatibility check Fabiano Rosas
2024-01-15  9:01   ` Peter Xu
2024-01-23 19:07     ` Fabiano Rosas
2024-01-23 19:07     ` Fabiano Rosas
2023-11-27 20:25 ` [RFC PATCH v3 08/30] migration/ram: Add outgoing 'fixed-ram' migration Fabiano Rosas
2024-01-15  9:28   ` Peter Xu
2024-01-15 14:50     ` Fabiano Rosas
2023-11-27 20:25 ` [RFC PATCH v3 09/30] migration/ram: Add incoming " Fabiano Rosas
2024-01-15  9:49   ` Peter Xu
2024-01-15 16:43     ` Fabiano Rosas
2023-11-27 20:25 ` [RFC PATCH v3 10/30] tests/qtest: migration-test: Add tests for fixed-ram file-based migration Fabiano Rosas
2024-01-15 10:01   ` Peter Xu
2023-11-27 20:25 ` [RFC PATCH v3 11/30] migration/multifd: Allow multifd without packets Fabiano Rosas
2024-01-15 11:51   ` Peter Xu
2024-01-15 18:39     ` Fabiano Rosas
2024-01-15 23:01       ` Peter Xu
2023-11-27 20:25 ` [RFC PATCH v3 12/30] migration/multifd: Allow QIOTask error reporting without an object Fabiano Rosas
2024-01-15 12:06   ` Peter Xu
2023-11-27 20:25 ` [RFC PATCH v3 13/30] migration/multifd: Add outgoing QIOChannelFile support Fabiano Rosas
2024-01-16  4:05   ` Peter Xu
2024-01-16  7:25     ` Peter Xu
2024-01-16 13:37     ` Fabiano Rosas
2024-01-17  8:28       ` Peter Xu
2024-01-17 17:34         ` Fabiano Rosas
2024-01-18  7:11           ` Peter Xu
2023-11-27 20:25 ` [RFC PATCH v3 14/30] migration/multifd: Add incoming " Fabiano Rosas
2024-01-16  6:29   ` Peter Xu
2023-11-27 20:25 ` [RFC PATCH v3 15/30] io: Add a pwritev/preadv version that takes a discontiguous iovec Fabiano Rosas
2024-01-16  6:58   ` Peter Xu
2024-01-16 18:15     ` Fabiano Rosas
2024-01-17  9:48       ` Peter Xu
2024-01-17 18:06         ` Fabiano Rosas
2024-01-18  7:44           ` Peter Xu
2024-01-18 12:47             ` Fabiano Rosas
2024-01-19  0:22               ` Peter Xu
2024-01-17 12:39   ` Daniel P. Berrangé
2024-01-17 14:27     ` Daniel P. Berrangé
2024-01-17 18:09       ` Fabiano Rosas
2023-11-27 20:25 ` [RFC PATCH v3 16/30] multifd: Rename MultiFDSendParams::data to compress_data Fabiano Rosas
2024-01-16  7:03   ` Peter Xu
2023-11-27 20:25 ` [RFC PATCH v3 17/30] migration/multifd: Decouple recv method from pages Fabiano Rosas
2024-01-16  7:23   ` Peter Xu
2023-11-27 20:26 ` [RFC PATCH v3 18/30] migration/multifd: Allow receiving pages without packets Fabiano Rosas
2024-01-16  8:10   ` Peter Xu [this message]
2024-01-16 20:25     ` Fabiano Rosas
2024-01-19  0:20       ` Peter Xu
2024-01-19 12:57         ` Fabiano Rosas
2023-11-27 20:26 ` [RFC PATCH v3 19/30] migration/ram: Ignore multifd flush when doing fixed-ram migration Fabiano Rosas
2024-01-16  8:23   ` Peter Xu
2024-01-17 18:13     ` Fabiano Rosas
2024-01-19  1:33       ` Peter Xu
2023-11-27 20:26 ` [RFC PATCH v3 20/30] migration/multifd: Support outgoing fixed-ram stream format Fabiano Rosas
2023-11-27 20:26 ` [RFC PATCH v3 21/30] migration/multifd: Support incoming " Fabiano Rosas
2023-11-27 20:26 ` [RFC PATCH v3 22/30] tests/qtest: Add a multifd + fixed-ram migration test Fabiano Rosas
2023-11-27 20:26 ` [RFC PATCH v3 23/30] migration: Add direct-io parameter Fabiano Rosas
2023-12-22 10:38   ` Markus Armbruster
2023-11-27 20:26 ` [RFC PATCH v3 24/30] tests/qtest: Add a test for migration with direct-io and multifd Fabiano Rosas
2023-11-27 20:26 ` [RFC PATCH v3 25/30] monitor: Honor QMP request for fd removal immediately Fabiano Rosas
2023-11-27 20:26 ` [RFC PATCH v3 26/30] monitor: Extract fdset fd flags comparison into a function Fabiano Rosas
2023-11-27 20:26 ` [RFC PATCH v3 27/30] monitor: fdset: Match against O_DIRECT Fabiano Rosas
2023-11-27 20:26 ` [RFC PATCH v3 28/30] docs/devel/migration.rst: Document the file transport Fabiano Rosas
2023-11-27 20:26 ` [RFC PATCH v3 29/30] migration: Add support for fdset with multifd + file Fabiano Rosas
2023-11-27 20:26 ` [RFC PATCH v3 30/30] tests/qtest: Add a test for fixed-ram with passing of fds Fabiano Rosas
2024-01-11 10:50 ` [RFC PATCH v3 00/30] migration: File based migration with multifd and fixed-ram Peter Xu
2024-01-11 18:38   ` Fabiano Rosas
2024-01-15  6:22     ` Peter Xu
2024-01-15  8:11       ` Daniel P. Berrangé
2024-01-15  8:41         ` Peter Xu
2024-01-15 19:45       ` Fabiano Rosas
2024-01-15 23:20         ` Peter Xu

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=ZaY6E4tefb5DGEp9@x1n \
    --to=peterx@redhat.com \
    --cc=armbru@redhat.com \
    --cc=berrange@redhat.com \
    --cc=cfontana@suse.de \
    --cc=farosas@suse.de \
    --cc=leobras@redhat.com \
    --cc=qemu-devel@nongnu.org \
    --cc=quintela@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.