From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
To: Juan Quintela <quintela@redhat.com>
Cc: "Laurent Vivier" <lvivier@redhat.com>,
"Thomas Huth" <thuth@redhat.com>,
"Daniel P. Berrangé" <berrange@redhat.com>,
"Eduardo Habkost" <ehabkost@redhat.com>,
qemu-devel@nongnu.org, "Markus Armbruster" <armbru@redhat.com>,
"Paolo Bonzini" <pbonzini@redhat.com>
Subject: Re: [PATCH v2 07/10] migration: Make no compression operations into its own structure
Date: Fri, 3 Jan 2020 18:20:38 +0000 [thread overview]
Message-ID: <20200103182038.GS3804@work-vm> (raw)
In-Reply-To: <20191218020119.3776-8-quintela@redhat.com>
* Juan Quintela (quintela@redhat.com) wrote:
> It will be used later.
>
> Signed-off-by: Juan Quintela <quintela@redhat.com>
>
> ---
> Move setup of ->ops helper to proper place (wei)
> Rename s/none/nocomp/ (dave)
> Introduce MULTIFD_FLAG_NOCOMP
> ---
> migration/migration.c | 9 ++
> migration/migration.h | 1 +
> migration/ram.c | 194 ++++++++++++++++++++++++++++++++++++++++--
> 3 files changed, 196 insertions(+), 8 deletions(-)
>
> diff --git a/migration/migration.c b/migration/migration.c
> index 93c6ed10a6..56203eb536 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -2213,6 +2213,15 @@ int migrate_multifd_channels(void)
> return s->parameters.multifd_channels;
> }
>
> +int migrate_multifd_method(void)
> +{
> + MigrationState *s;
> +
> + s = migrate_get_current();
> +
> + return s->parameters.multifd_compress;
> +}
> +
> int migrate_use_xbzrle(void)
> {
> MigrationState *s;
> diff --git a/migration/migration.h b/migration/migration.h
> index 545f283ae7..d3ea45e25a 100644
> --- a/migration/migration.h
> +++ b/migration/migration.h
> @@ -299,6 +299,7 @@ bool migrate_auto_converge(void);
> bool migrate_use_multifd(void);
> bool migrate_pause_before_switchover(void);
> int migrate_multifd_channels(void);
> +int migrate_multifd_method(void);
>
> int migrate_use_xbzrle(void);
> int64_t migrate_xbzrle_cache_size(void);
> diff --git a/migration/ram.c b/migration/ram.c
> index fcf50e648a..10661e03ae 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -44,6 +44,7 @@
> #include "page_cache.h"
> #include "qemu/error-report.h"
> #include "qapi/error.h"
> +#include "qapi/qapi-types-migration.h"
> #include "qapi/qapi-events-migration.h"
> #include "qapi/qmp/qerror.h"
> #include "trace.h"
> @@ -581,6 +582,7 @@ exit:
> #define MULTIFD_VERSION 1
>
> #define MULTIFD_FLAG_SYNC (1 << 0)
> +#define MULTIFD_FLAG_NOCOMP (1 << 1)
>
> /* This value needs to be a multiple of qemu_target_page_size() */
> #define MULTIFD_PACKET_SIZE (512 * 1024)
> @@ -662,6 +664,8 @@ typedef struct {
> uint64_t num_pages;
> /* syncs main thread and channels */
> QemuSemaphore sem_sync;
> + /* used for compression methods */
> + void *data;
> } MultiFDSendParams;
>
> typedef struct {
> @@ -699,8 +703,153 @@ typedef struct {
> uint64_t num_pages;
> /* syncs main thread and channels */
> QemuSemaphore sem_sync;
> + /* used for de-compression methods */
> + void *data;
> } MultiFDRecvParams;
>
> +typedef struct {
> + /* Setup for sending side */
> + int (*send_setup)(MultiFDSendParams *p, Error **errp);
> + /* Cleanup for sending side */
> + void (*send_cleanup)(MultiFDSendParams *p, Error **errp);
> + /* Prepare the send packet */
> + int (*send_prepare)(MultiFDSendParams *p, uint32_t used, Error **errp);
> + /* Write the send packet */
> + int (*send_write)(MultiFDSendParams *p, uint32_t used, Error **errp);
> + /* Setup for receiving side */
> + int (*recv_setup)(MultiFDRecvParams *p, Error **errp);
> + /* Cleanup for receiving side */
> + void (*recv_cleanup)(MultiFDRecvParams *p);
> + /* Read all pages */
> + int (*recv_pages)(MultiFDRecvParams *p, uint32_t used, Error **errp);
> +} MultiFDMethods;
> +
> +/* Multifd without compression */
> +
> +/**
> + * nocomp_send_setup: setup send side
> + *
> + * For no compression this function does nothing.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
> +{
> + return 0;
> +}
> +
> +/**
> + * nocomp_send_cleanup: cleanup send side
> + *
> + * For no compression this function does nothing.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
> +{
> + return;
> +}
> +
> +/**
> + * nocomp_send_prepare: prepare date to be able to send
> + *
> + * For no compression we just have to calculate the size of the
> + * packet.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
> + Error **errp)
> +{
> + p->next_packet_size = used * qemu_target_page_size();
> + p->flags |= MULTIFD_FLAG_NOCOMP;
> + return 0;
> +}
> +
> +/**
> + * nocomp_send_write: do the actual write of the data
> + *
> + * For no compression we just have to write the data.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
> +{
> + return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
> +}
> +
> +/**
> + * nocomp_recv_setup: setup receive side
> + *
> + * For no compression this function does nothing.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
> +{
> + return 0;
> +}
> +
> +/**
> + * nocomp_recv_cleanup: setup receive side
> + *
> + * For no compression this function does nothing.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void nocomp_recv_cleanup(MultiFDRecvParams *p)
> +{
> +}
> +
> +/**
> + * nocomp_recv_pages: read the data from the channel into actual pages
> + *
> + * For no compression we just need to read things into the correct place.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @used: number of pages used
> + * @errp: pointer to an error
> + */
> +static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
> +{
> + if (p->flags != MULTIFD_FLAG_NOCOMP) {
> + error_setg(errp, "multifd %d: flags received %x flags expected %x",
> + p->id, MULTIFD_FLAG_NOCOMP, p->flags);
That looks the wrong way around to me - shouldn't that be
p->flags, MULTIFD_FLAG_NOCOMP
?
> + return -1;
> + }
> + return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
> +}
> +
> +static MultiFDMethods multifd_nocomp_ops = {
> + .send_setup = nocomp_send_setup,
> + .send_cleanup = nocomp_send_cleanup,
> + .send_prepare = nocomp_send_prepare,
> + .send_write = nocomp_send_write,
> + .recv_setup = nocomp_recv_setup,
> + .recv_cleanup = nocomp_recv_cleanup,
> + .recv_pages = nocomp_recv_pages
> +};
> +
> +static MultiFDMethods *multifd_ops[MULTIFD_COMPRESS__MAX] = {
> + [MULTIFD_COMPRESS_NONE] = &multifd_nocomp_ops,
> +};
> +
> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> {
> MultiFDInit_t msg;
> @@ -898,6 +1047,8 @@ struct {
> uint64_t packet_num;
> /* send channels ready */
> QemuSemaphore channels_ready;
> + /* multifd ops */
> + MultiFDMethods *ops;
> } *multifd_send_state;
>
> /*
> @@ -1027,6 +1178,7 @@ void multifd_save_cleanup(void)
> multifd_send_terminate_threads(NULL);
> for (i = 0; i < migrate_multifd_channels(); i++) {
> MultiFDSendParams *p = &multifd_send_state->params[i];
> + Error *local_err = NULL;
>
> if (p->running) {
> qemu_thread_join(&p->thread);
> @@ -1043,6 +1195,10 @@ void multifd_save_cleanup(void)
> p->packet_len = 0;
> g_free(p->packet);
> p->packet = NULL;
> + multifd_send_state->ops->send_cleanup(p, &local_err);
> + if (local_err) {
> + migrate_set_error(migrate_get_current(), local_err);
> + }
> }
> qemu_sem_destroy(&multifd_send_state->channels_ready);
> g_free(multifd_send_state->params);
> @@ -1123,7 +1279,14 @@ static void *multifd_send_thread(void *opaque)
> uint64_t packet_num = p->packet_num;
> flags = p->flags;
>
> - p->next_packet_size = used * qemu_target_page_size();
> + if (used) {
> + ret = multifd_send_state->ops->send_prepare(p, used,
> + &local_err);
> + if (ret != 0) {
> + qemu_mutex_unlock(&p->mutex);
> + break;
> + }
> + }
> multifd_send_fill_packet(p);
> p->flags = 0;
> p->num_packets++;
> @@ -1140,8 +1303,7 @@ static void *multifd_send_thread(void *opaque)
> }
>
> if (used) {
> - ret = qio_channel_writev_all(p->c, p->pages->iov,
> - used, &local_err);
> + ret = multifd_send_state->ops->send_write(p, used, &local_err);
> if (ret != 0) {
> break;
> }
> @@ -1212,6 +1374,7 @@ int multifd_save_setup(Error **errp)
> {
> int thread_count;
> uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> + int ret = 0;
> uint8_t i;
>
> if (!migrate_use_multifd()) {
> @@ -1222,9 +1385,11 @@ int multifd_save_setup(Error **errp)
> multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
> multifd_send_state->pages = multifd_pages_init(page_count);
> qemu_sem_init(&multifd_send_state->channels_ready, 0);
> + multifd_send_state->ops = multifd_ops[migrate_multifd_method()];
>
> for (i = 0; i < thread_count; i++) {
> MultiFDSendParams *p = &multifd_send_state->params[i];
> + int res;
>
> qemu_mutex_init(&p->mutex);
> qemu_sem_init(&p->sem, 0);
> @@ -1240,8 +1405,12 @@ int multifd_save_setup(Error **errp)
> p->packet->version = cpu_to_be32(MULTIFD_VERSION);
> p->name = g_strdup_printf("multifdsend_%d", i);
> socket_send_channel_create(multifd_new_send_channel_async, p);
> + res = multifd_send_state->ops->send_setup(p, errp);
> + if (ret == 0) {
> + ret = res;
How do you handle the errp's here - I think that if is so that you
get the 'ret' from the first thread that fails; but I don't think you're
allowed to call that twice if the first one set it's errp.
> + }
> }
> - return 0;
> + return ret;
> }
>
> struct {
> @@ -1252,6 +1421,8 @@ struct {
> QemuSemaphore sem_sync;
> /* global number of generated multifd packets */
> uint64_t packet_num;
> + /* multifd ops */
> + MultiFDMethods *ops;
> } *multifd_recv_state;
>
> static void multifd_recv_terminate_threads(Error *err)
> @@ -1287,7 +1458,6 @@ static void multifd_recv_terminate_threads(Error *err)
> int multifd_load_cleanup(Error **errp)
> {
> int i;
> - int ret = 0;
>
> if (!migrate_use_multifd()) {
> return 0;
> @@ -1316,6 +1486,7 @@ int multifd_load_cleanup(Error **errp)
> p->packet_len = 0;
> g_free(p->packet);
> p->packet = NULL;
> + multifd_recv_state->ops->recv_cleanup(p);
> }
> qemu_sem_destroy(&multifd_recv_state->sem_sync);
> g_free(multifd_recv_state->params);
> @@ -1323,7 +1494,7 @@ int multifd_load_cleanup(Error **errp)
> g_free(multifd_recv_state);
> multifd_recv_state = NULL;
>
> - return ret;
> + return 0;
> }
>
> static void multifd_recv_sync_main(void)
> @@ -1388,6 +1559,8 @@ static void *multifd_recv_thread(void *opaque)
>
> used = p->pages->used;
> flags = p->flags;
> + /* recv methods don't know how to handle the SYNC flag */
> + p->flags &= ~MULTIFD_FLAG_SYNC;
> trace_multifd_recv(p->id, p->packet_num, used, flags,
> p->next_packet_size);
> p->num_packets++;
> @@ -1395,8 +1568,7 @@ static void *multifd_recv_thread(void *opaque)
> qemu_mutex_unlock(&p->mutex);
>
> if (used) {
> - ret = qio_channel_readv_all(p->c, p->pages->iov,
> - used, &local_err);
> + ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
> if (ret != 0) {
> break;
> }
> @@ -1435,9 +1607,11 @@ int multifd_load_setup(Error **errp)
> multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> atomic_set(&multifd_recv_state->count, 0);
> qemu_sem_init(&multifd_recv_state->sem_sync, 0);
> + multifd_recv_state->ops = multifd_ops[migrate_multifd_method()];
>
> for (i = 0; i < thread_count; i++) {
> MultiFDRecvParams *p = &multifd_recv_state->params[i];
> + int ret;
>
> qemu_mutex_init(&p->mutex);
> qemu_sem_init(&p->sem_sync, 0);
> @@ -1448,6 +1622,10 @@ int multifd_load_setup(Error **errp)
> + sizeof(ram_addr_t) * page_count;
> p->packet = g_malloc0(p->packet_len);
> p->name = g_strdup_printf("multifdrecv_%d", i);
> + ret = multifd_recv_state->ops->recv_setup(p, errp);
> + if (ret != 0) {
> + return ret;
> + }
same question as the save case above
> }
> return 0;
> }
> --
> 2.23.0
>
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
next prev parent reply other threads:[~2020-01-03 18:21 UTC|newest]
Thread overview: 31+ messages / expand[flat|nested] mbox.gz Atom feed top
2019-12-18 2:01 [PATCH v2 00/10] Multifd Migration Compression Juan Quintela
2019-12-18 2:01 ` [PATCH v2 01/10] migration: Increase default number of multifd channels to 16 Juan Quintela
2020-01-03 16:51 ` Dr. David Alan Gilbert
2020-01-03 16:58 ` Daniel P. Berrangé
2020-01-03 17:01 ` Dr. David Alan Gilbert
2020-01-03 17:12 ` Daniel P. Berrangé
2020-01-03 17:32 ` Dr. David Alan Gilbert
2020-01-03 18:25 ` Juan Quintela
2020-01-07 12:49 ` Daniel P. Berrangé
2020-01-07 13:32 ` Juan Quintela
2020-01-07 13:42 ` Daniel P. Berrangé
2020-01-03 17:49 ` Daniel P. Berrangé
2019-12-18 2:01 ` [PATCH v2 02/10] migration-test: Add migration multifd test Juan Quintela
2019-12-18 2:01 ` [PATCH v2 03/10] migration-test: introduce functions to handle string parameters Juan Quintela
2020-01-03 16:57 ` Dr. David Alan Gilbert
2019-12-18 2:01 ` [PATCH v2 04/10] migration: Make multifd_save_setup() get an Error parameter Juan Quintela
2020-01-03 16:46 ` Dr. David Alan Gilbert
2020-01-07 12:35 ` Juan Quintela
2019-12-18 2:01 ` [PATCH v2 05/10] migration: Make multifd_load_setup() " Juan Quintela
2020-01-03 17:22 ` Dr. David Alan Gilbert
2020-01-07 13:00 ` Juan Quintela
2019-12-18 2:01 ` [PATCH v2 06/10] migration: Add multifd-compress parameter Juan Quintela
2019-12-19 7:41 ` Markus Armbruster
2020-01-03 17:57 ` Dr. David Alan Gilbert
2020-01-07 13:03 ` Juan Quintela
2019-12-18 2:01 ` [PATCH v2 07/10] migration: Make no compression operations into its own structure Juan Quintela
2020-01-03 18:20 ` Dr. David Alan Gilbert [this message]
2020-01-07 13:08 ` Juan Quintela
2019-12-18 2:01 ` [PATCH v2 08/10] migration: Add zlib compression multifd support Juan Quintela
2019-12-18 2:01 ` [PATCH v2 09/10] configure: Enable test and libs for zstd Juan Quintela
2019-12-18 2:01 ` [PATCH v2 10/10] migration: Add zstd compression multifd support Juan Quintela
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20200103182038.GS3804@work-vm \
--to=dgilbert@redhat.com \
--cc=armbru@redhat.com \
--cc=berrange@redhat.com \
--cc=ehabkost@redhat.com \
--cc=lvivier@redhat.com \
--cc=pbonzini@redhat.com \
--cc=qemu-devel@nongnu.org \
--cc=quintela@redhat.com \
--cc=thuth@redhat.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.