All of lore.kernel.org
 help / color / mirror / Atom feed
From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
To: Juan Quintela <quintela@redhat.com>
Cc: "Fam Zheng" <fam@euphon.net>,
	"Laurent Vivier" <lvivier@redhat.com>,
	"Thomas Huth" <thuth@redhat.com>,
	"Daniel P. Berrangé" <berrange@redhat.com>,
	"Eduardo Habkost" <ehabkost@redhat.com>,
	"Philippe Mathieu-Daudé" <philmd@redhat.com>,
	qemu-devel@nongnu.org, "Markus Armbruster" <armbru@redhat.com>,
	"Paolo Bonzini" <pbonzini@redhat.com>,
	"Alex Bennée" <alex.bennee@linaro.org>
Subject: Re: [PATCH v6 3/8] multifd: Make no compression operations into its own structure
Date: Fri, 14 Feb 2020 10:11:52 +0000	[thread overview]
Message-ID: <20200214101152.GA3283@work-vm> (raw)
In-Reply-To: <20200213211709.59065-4-quintela@redhat.com>

* Juan Quintela (quintela@redhat.com) wrote:
> It will be used later.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> 
> ---
> 
> No comp value needs to be zero.
> ---
>  migration/migration.c |   9 ++
>  migration/migration.h |   1 +
>  migration/multifd.c   | 185 ++++++++++++++++++++++++++++++++++++++++--
>  migration/multifd.h   |  26 ++++++
>  migration/ram.c       |   1 +
>  5 files changed, 214 insertions(+), 8 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index bc744d1734..eb7b0a2df0 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -2245,6 +2245,15 @@ int migrate_multifd_channels(void)
>      return s->parameters.multifd_channels;
>  }
>  
> +MultiFDCompression migrate_multifd_compression(void)
> +{
> +    MigrationState *s;
> +
> +    s = migrate_get_current();
> +
> +    return s->parameters.multifd_compression;
> +}
> +
>  int migrate_use_xbzrle(void)
>  {
>      MigrationState *s;
> diff --git a/migration/migration.h b/migration/migration.h
> index 8473ddfc88..59fea02482 100644
> --- a/migration/migration.h
> +++ b/migration/migration.h
> @@ -300,6 +300,7 @@ bool migrate_auto_converge(void);
>  bool migrate_use_multifd(void);
>  bool migrate_pause_before_switchover(void);
>  int migrate_multifd_channels(void);
> +MultiFDCompression migrate_multifd_compression(void);
>  
>  int migrate_use_xbzrle(void);
>  int64_t migrate_xbzrle_cache_size(void);
> diff --git a/migration/multifd.c b/migration/multifd.c
> index b3e8ae9bcc..97433e5135 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -38,6 +38,134 @@ typedef struct {
>      uint64_t unused2[4];    /* Reserved for future use */
>  } __attribute__((packed)) MultiFDInit_t;
>  
> +/* Multifd without compression */
> +
> +/**
> + * nocomp_send_setup: setup send side
> + *
> + * For no compression this function does nothing.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
> +{
> +    return 0;
> +}
> +
> +/**
> + * nocomp_send_cleanup: cleanup send side
> + *
> + * For no compression this function does nothing.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
> +{
> +    return;
> +}
> +
> +/**
> + * nocomp_send_prepare: prepare date to be able to send
> + *
> + * For no compression we just have to calculate the size of the
> + * packet.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
> +                               Error **errp)
> +{
> +    p->next_packet_size = used * qemu_target_page_size();
> +    p->flags |= MULTIFD_FLAG_NOCOMP;
> +    return 0;
> +}
> +
> +/**
> + * nocomp_send_write: do the actual write of the data
> + *
> + * For no compression we just have to write the data.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
> +{
> +    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
> +}
> +
> +/**
> + * nocomp_recv_setup: setup receive side
> + *
> + * For no compression this function does nothing.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
> +{
> +    return 0;
> +}
> +
> +/**
> + * nocomp_recv_cleanup: setup receive side
> + *
> + * For no compression this function does nothing.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void nocomp_recv_cleanup(MultiFDRecvParams *p)
> +{
> +}
> +
> +/**
> + * nocomp_recv_pages: read the data from the channel into actual pages
> + *
> + * For no compression we just need to read things into the correct place.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
> +{
> +    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
> +
> +    if (flags != MULTIFD_FLAG_NOCOMP) {
> +        error_setg(errp, "multifd %d: flags received %x flags expected %x",
> +                   p->id, flags, MULTIFD_FLAG_NOCOMP);
> +        return -1;
> +    }
> +    return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
> +}
> +
> +static MultiFDMethods multifd_nocomp_ops = {
> +    .send_setup = nocomp_send_setup,
> +    .send_cleanup = nocomp_send_cleanup,
> +    .send_prepare = nocomp_send_prepare,
> +    .send_write = nocomp_send_write,
> +    .recv_setup = nocomp_recv_setup,
> +    .recv_cleanup = nocomp_recv_cleanup,
> +    .recv_pages = nocomp_recv_pages
> +};
> +
> +static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
> +    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
> +};
> +
>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
>  {
>      MultiFDInit_t msg = {};
> @@ -246,6 +374,8 @@ struct {
>       * We will use atomic operations.  Only valid values are 0 and 1.
>       */
>      int exiting;
> +    /* multifd ops */
> +    MultiFDMethods *ops;
>  } *multifd_send_state;
>  
>  /*
> @@ -397,6 +527,7 @@ void multifd_save_cleanup(void)
>      }
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
> +        Error *local_err = NULL;
>  
>          socket_send_channel_destroy(p->c);
>          p->c = NULL;
> @@ -410,6 +541,10 @@ void multifd_save_cleanup(void)
>          p->packet_len = 0;
>          g_free(p->packet);
>          p->packet = NULL;
> +        multifd_send_state->ops->send_cleanup(p, &local_err);
> +        if (local_err) {
> +            migrate_set_error(migrate_get_current(), local_err);
> +        }
>      }
>      qemu_sem_destroy(&multifd_send_state->channels_ready);
>      g_free(multifd_send_state->params);
> @@ -494,7 +629,14 @@ static void *multifd_send_thread(void *opaque)
>              uint64_t packet_num = p->packet_num;
>              flags = p->flags;
>  
> -            p->next_packet_size = used * qemu_target_page_size();
> +            if (used) {
> +                ret = multifd_send_state->ops->send_prepare(p, used,
> +                                                            &local_err);
> +                if (ret != 0) {
> +                    qemu_mutex_unlock(&p->mutex);
> +                    break;
> +                }
> +            }
>              multifd_send_fill_packet(p);
>              p->flags = 0;
>              p->num_packets++;
> @@ -513,8 +655,7 @@ static void *multifd_send_thread(void *opaque)
>              }
>  
>              if (used) {
> -                ret = qio_channel_writev_all(p->c, p->pages->iov,
> -                                             used, &local_err);
> +                ret = multifd_send_state->ops->send_write(p, used, &local_err);
>                  if (ret != 0) {
>                      break;
>                  }
> @@ -604,6 +745,7 @@ int multifd_save_setup(Error **errp)
>      multifd_send_state->pages = multifd_pages_init(page_count);
>      qemu_sem_init(&multifd_send_state->channels_ready, 0);
>      atomic_set(&multifd_send_state->exiting, 0);
> +    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
> @@ -623,6 +765,18 @@ int multifd_save_setup(Error **errp)
>          p->name = g_strdup_printf("multifdsend_%d", i);
>          socket_send_channel_create(multifd_new_send_channel_async, p);
>      }
> +
> +    for (i = 0; i < thread_count; i++) {
> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> +        Error *local_err = NULL;
> +        int ret;
> +
> +        ret = multifd_send_state->ops->send_setup(p, &local_err);
> +        if (ret) {
> +            error_propagate(errp, local_err);
> +            return ret;
> +        }
> +    }
>      return 0;
>  }
>  
> @@ -634,6 +788,8 @@ struct {
>      QemuSemaphore sem_sync;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
> +    /* multifd ops */
> +    MultiFDMethods *ops;
>  } *multifd_recv_state;
>  
>  static void multifd_recv_terminate_threads(Error *err)
> @@ -673,7 +829,6 @@ static void multifd_recv_terminate_threads(Error *err)
>  int multifd_load_cleanup(Error **errp)
>  {
>      int i;
> -    int ret = 0;
>  
>      if (!migrate_use_multifd()) {
>          return 0;
> @@ -706,6 +861,7 @@ int multifd_load_cleanup(Error **errp)
>          p->packet_len = 0;
>          g_free(p->packet);
>          p->packet = NULL;
> +        multifd_recv_state->ops->recv_cleanup(p);
>      }
>      qemu_sem_destroy(&multifd_recv_state->sem_sync);
>      g_free(multifd_recv_state->params);
> @@ -713,7 +869,7 @@ int multifd_load_cleanup(Error **errp)
>      g_free(multifd_recv_state);
>      multifd_recv_state = NULL;
>  
> -    return ret;
> +    return 0;
>  }
>  
>  void multifd_recv_sync_main(void)
> @@ -778,6 +934,8 @@ static void *multifd_recv_thread(void *opaque)
>  
>          used = p->pages->used;
>          flags = p->flags;
> +        /* recv methods don't know how to handle the SYNC flag */
> +        p->flags &= ~MULTIFD_FLAG_SYNC;
>          trace_multifd_recv(p->id, p->packet_num, used, flags,
>                             p->next_packet_size);
>          p->num_packets++;
> @@ -785,8 +943,7 @@ static void *multifd_recv_thread(void *opaque)
>          qemu_mutex_unlock(&p->mutex);
>  
>          if (used) {
> -            ret = qio_channel_readv_all(p->c, p->pages->iov,
> -                                        used, &local_err);
> +            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
>              if (ret != 0) {
>                  break;
>              }
> @@ -825,6 +982,7 @@ int multifd_load_setup(Error **errp)
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>      atomic_set(&multifd_recv_state->count, 0);
>      qemu_sem_init(&multifd_recv_state->sem_sync, 0);
> +    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
> @@ -839,6 +997,18 @@ int multifd_load_setup(Error **errp)
>          p->packet = g_malloc0(p->packet_len);
>          p->name = g_strdup_printf("multifdrecv_%d", i);
>      }
> +
> +    for (i = 0; i < thread_count; i++) {
> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +        Error *local_err = NULL;
> +        int ret;
> +
> +        ret = multifd_recv_state->ops->recv_setup(p, &local_err);
> +        if (ret) {
> +            error_propagate(errp, local_err);
> +            return ret;
> +        }
> +    }
>      return 0;
>  }
>  
> @@ -896,4 +1066,3 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
>      return atomic_read(&multifd_recv_state->count) ==
>             migrate_multifd_channels();
>  }
> -
> diff --git a/migration/multifd.h b/migration/multifd.h
> index d8b0205977..54075ffa7d 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -25,6 +25,11 @@ int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset);
>  
>  #define MULTIFD_FLAG_SYNC (1 << 0)
>  
> +/* We reserve 3 bits for compression methods */
> +#define MULTIFD_FLAG_COMPRESSION_MASK (7 << 1)
> +/* we need to be compatible. Before compression value was 0 */
> +#define MULTIFD_FLAG_NOCOMP (0 << 1)
> +
>  /* This value needs to be a multiple of qemu_target_page_size() */
>  #define MULTIFD_PACKET_SIZE (512 * 1024)
>  
> @@ -96,6 +101,8 @@ typedef struct {
>      uint64_t num_pages;
>      /* syncs main thread and channels */
>      QemuSemaphore sem_sync;
> +    /* used for compression methods */
> +    void *data;
>  }  MultiFDSendParams;
>  
>  typedef struct {
> @@ -133,7 +140,26 @@ typedef struct {
>      uint64_t num_pages;
>      /* syncs main thread and channels */
>      QemuSemaphore sem_sync;
> +    /* used for de-compression methods */
> +    void *data;
>  } MultiFDRecvParams;
>  
> +typedef struct {
> +    /* Setup for sending side */
> +    int (*send_setup)(MultiFDSendParams *p, Error **errp);
> +    /* Cleanup for sending side */
> +    void (*send_cleanup)(MultiFDSendParams *p, Error **errp);
> +    /* Prepare the send packet */
> +    int (*send_prepare)(MultiFDSendParams *p, uint32_t used, Error **errp);
> +    /* Write the send packet */
> +    int (*send_write)(MultiFDSendParams *p, uint32_t used, Error **errp);
> +    /* Setup for receiving side */
> +    int (*recv_setup)(MultiFDRecvParams *p, Error **errp);
> +    /* Cleanup for receiving side */
> +    void (*recv_cleanup)(MultiFDRecvParams *p);
> +    /* Read all pages */
> +    int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **errp);
> +} MultiFDMethods;
> +
>  #endif
>  
> diff --git a/migration/ram.c b/migration/ram.c
> index ed23ed1c7c..73a141bb60 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -43,6 +43,7 @@
>  #include "page_cache.h"
>  #include "qemu/error-report.h"
>  #include "qapi/error.h"
> +#include "qapi/qapi-types-migration.h"
>  #include "qapi/qapi-events-migration.h"
>  #include "qapi/qmp/qerror.h"
>  #include "trace.h"
> -- 
> 2.24.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK



  reply	other threads:[~2020-02-14 10:12 UTC|newest]

Thread overview: 12+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-02-13 21:17 [PATCH v6 0/8] Multifd Migration Compression Juan Quintela
2020-02-13 21:17 ` [PATCH v6 1/8] multifd: Add multifd-compression parameter Juan Quintela
2020-02-13 21:17 ` [PATCH v6 2/8] migration: Add support for modules Juan Quintela
2020-02-13 21:17 ` [PATCH v6 3/8] multifd: Make no compression operations into its own structure Juan Quintela
2020-02-14 10:11   ` Dr. David Alan Gilbert [this message]
2020-02-13 21:17 ` [PATCH v6 4/8] multifd: Add multifd-zlib-level parameter Juan Quintela
2020-02-13 21:17 ` [PATCH v6 5/8] multifd: Add zlib compression multifd support Juan Quintela
2020-02-14 15:23   ` Dr. David Alan Gilbert
2020-02-13 21:17 ` [PATCH v6 6/8] configure: Enable test and libs for zstd Juan Quintela
2020-02-13 21:17 ` [PATCH v6 7/8] multifd: Add multifd-zstd-level parameter Juan Quintela
2020-02-27 20:53   ` Peter Xu
2020-02-13 21:17 ` [PATCH v6 8/8] multifd: Add zstd compression multifd support Juan Quintela

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200214101152.GA3283@work-vm \
    --to=dgilbert@redhat.com \
    --cc=alex.bennee@linaro.org \
    --cc=armbru@redhat.com \
    --cc=berrange@redhat.com \
    --cc=ehabkost@redhat.com \
    --cc=fam@euphon.net \
    --cc=lvivier@redhat.com \
    --cc=pbonzini@redhat.com \
    --cc=philmd@redhat.com \
    --cc=qemu-devel@nongnu.org \
    --cc=quintela@redhat.com \
    --cc=thuth@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.