All of lore.kernel.org
 help / color / mirror / Atom feed
From: "Daniel P. Berrange" <berrange@redhat.com>
To: Juan Quintela <quintela@redhat.com>
Cc: qemu-devel@nongnu.org, lvivier@redhat.com, dgilbert@redhat.com,
	peterx@redhat.com
Subject: Re: [Qemu-devel] [PATCH v8 12/20] migration: Start of multiple fd work
Date: Wed, 13 Sep 2017 12:35:34 +0100	[thread overview]
Message-ID: <20170913113534.GC3067@redhat.com> (raw)
In-Reply-To: <20170913105953.13760-13-quintela@redhat.com>

On Wed, Sep 13, 2017 at 12:59:45PM +0200, Juan Quintela wrote:
> We create new channels for each new thread created. We send through
> them a string containing <uuid> multifd <channel number> so we are
> sure that we connect the right channels in both sides.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> Split SocketArgs into incoming and outgoing args
> 
> Use UUID's on the initial message, so we are sure we are connecting to
> the right channel.
> 
> Remove init semaphore.  Now that we use uuids on the init message, we
> know that this is our channel.
> 
> Fix recv socket destwroy, we were destroying send channels.
> This was very interesting, because we were using an unreferred object
> without problems.
> 
> Move to struct of pointers
> init channel sooner.
> split recv thread creation.
> listen on main thread
> We count the number of created threads to know when we need to stop listening
> Use g_strdup_printf
> report channel id on errors
> Add name parameter
> Use local_err
> Add Error * parameter to socket_send_channel_create()
> Use qio_channel_*_all
> Use asynchronous connect
> ---
>  migration/migration.c |   5 ++
>  migration/ram.c       | 138 +++++++++++++++++++++++++++++++++++++++++++-------
>  migration/ram.h       |   3 ++
>  migration/socket.c    |  34 ++++++++++++-
>  migration/socket.h    |  10 ++++
>  5 files changed, 172 insertions(+), 18 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index 1401841997..679be8e8d4 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -419,6 +419,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>   */
>  bool migration_has_all_channels(void)
>  {
> +    if (migrate_use_multifd()) {
> +        int thread_count = migrate_multifd_channels();
> +
> +        return thread_count == multifd_created_channels();
> +    }
>      return true;
>  }
>  
> diff --git a/migration/ram.c b/migration/ram.c
> index a3e2abb2a5..8577eeb032 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -36,6 +36,7 @@
>  #include "xbzrle.h"
>  #include "ram.h"
>  #include "migration.h"
> +#include "socket.h"
>  #include "migration/register.h"
>  #include "migration/misc.h"
>  #include "qemu-file.h"
> @@ -46,6 +47,8 @@
>  #include "exec/ram_addr.h"
>  #include "qemu/rcu_queue.h"
>  #include "migration/colo.h"
> +#include "sysemu/sysemu.h"
> +#include "qemu/uuid.h"
>  
>  /***********************************************************/
>  /* ram save/restore */
> @@ -362,6 +365,7 @@ struct MultiFDSendParams {
>      uint8_t id;
>      char *name;
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
>      bool quit;
> @@ -378,6 +382,12 @@ static void terminate_multifd_send_threads(Error *errp)
>  {
>      int i;
>  
> +    if (errp) {
> +        MigrationState *s = migrate_get_current();
> +        migrate_set_error(s, errp);
> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> +                          MIGRATION_STATUS_FAILED);
> +    }
>      for (i = 0; i < multifd_send_state->count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> @@ -403,6 +413,7 @@ int multifd_save_cleanup(Error **errp)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        socket_send_channel_destroy(p->c);
>          g_free(p->name);
>          p->name = NULL;
>      }
> @@ -413,9 +424,32 @@ int multifd_save_cleanup(Error **errp)
>      return ret;
>  }
>  
> +/* Default uuid for multifd when qemu is not started with uuid */
> +static char multifd_uuid[] = "5c49fd7e-af88-4a07-b6e8-091fd696ad40";

Same comment as last time - this is pointless. You should just
unconditionally send 'qemu_uuid'.  If the user hasn't set it
via '--uuid', it'll be a fixed UUID of all-zeros which is not
a semantic problem for the usage you have here.

> +/* strlen(multifd) + '-' + <channel id> + '-' +  UUID_FMT + '\0' */
> +#define MULTIFD_UUID_MSG (7 + 1 + 3 + 1 + UUID_FMT_LEN + 1)

>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *p = opaque;
> +    Error *local_err = NULL;
> +    char *string;
> +    char *string_uuid;
> +    size_t ret;
> +
> +    if (qemu_uuid_set) {
> +        string_uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
> +    } else {
> +        string_uuid = g_strdup(multifd_uuid);
> +    }
> +    string = g_strdup_printf("%s multifd %03d", string_uuid, p->id);

As before, if we need to send some structured data to the other
end, we should define a struct with fields for that, not send
plain text with printf+scanf.

Even better if you put a version in the struct so that you have
something to detect if you need to add more fields at a later
date. eg

 struct MigrateMultiFDInit {
     uint32_t version;
     char uuid[32];
     uint32_t id;
 };


> +    g_free(string_uuid);
> +    ret = qio_channel_write_all(p->c, string, MULTIFD_UUID_MSG, &local_err);
> +    g_free(string);
> +    if (ret != 0) {
> +        terminate_multifd_send_threads(local_err);
> +        return NULL;
> +    }
>  
>      while (true) {
>          qemu_mutex_lock(&p->mutex);
> @@ -430,6 +464,27 @@ static void *multifd_send_thread(void *opaque)
>      return NULL;
>  }
>  
> +static void multifd_new_channel_async(QIOTask *task, gpointer opaque)
> +{
> +    MultiFDSendParams *p = opaque;
> +    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
> +    Error *local_err;
> +
> +    if (qio_task_propagate_error(task, &local_err)) {
> +        if (multifd_save_cleanup(&local_err) != 0) {
> +            migrate_set_error(migrate_get_current(), local_err);
> +        }
> +    } else {
> +        p->c = QIO_CHANNEL(sioc);
> +        qio_channel_set_delay(p->c, false);
> +
> +        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
> +                           QEMU_THREAD_JOINABLE);
> +
> +        multifd_send_state->count++;
> +    }
> +}
> +
>  int multifd_save_setup(void)
>  {
>      int thread_count;
> @@ -450,10 +505,7 @@ int multifd_save_setup(void)
>          p->quit = false;
>          p->id = i;
>          p->name = g_strdup_printf("multifdsend_%d", i);
> -        qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
> -                           QEMU_THREAD_JOINABLE);
> -
> -        multifd_send_state->count++;
> +        socket_send_channel_create(multifd_new_channel_async, p);
>      }
>      return 0;
>  }
> @@ -462,6 +514,7 @@ struct MultiFDRecvParams {
>      uint8_t id;
>      char *name;
>      QemuThread thread;
> +    QIOChannel *c;
>      QemuSemaphore sem;
>      QemuMutex mutex;
>      bool quit;
> @@ -472,12 +525,22 @@ struct {
>      MultiFDRecvParams *params;
>      /* number of created threads */
>      int count;
> +    /* Should we finish */
> +    bool quit;
>  } *multifd_recv_state;
>  
>  static void terminate_multifd_recv_threads(Error *errp)
>  {
>      int i;
>  
> +    if (errp) {
> +        MigrationState *s = migrate_get_current();
> +        migrate_set_error(s, errp);
> +        migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> +                          MIGRATION_STATUS_FAILED);
> +    }
> +    multifd_recv_state->quit = true;
> +
>      for (i = 0; i < multifd_recv_state->count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
> @@ -503,6 +566,7 @@ int multifd_load_cleanup(Error **errp)
>          qemu_thread_join(&p->thread);
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem);
> +        socket_recv_channel_destroy(p->c);
>          g_free(p->name);
>          p->name = NULL;
>      }
> @@ -531,10 +595,56 @@ static void *multifd_recv_thread(void *opaque)
>      return NULL;
>  }
>  
> +void multifd_new_channel(QIOChannel *ioc)
> +{
> +    MultiFDRecvParams *p;
> +    char string[MULTIFD_UUID_MSG];
> +    char string_uuid[UUID_FMT_LEN];
> +    Error *local_err = NULL;
> +    char *uuid;
> +    size_t ret;
> +    int id;
> +
> +    ret = qio_channel_read_all(ioc, string, sizeof(string), &local_err);
> +    if (ret != 0) {
> +        terminate_multifd_recv_threads(local_err);
> +        return;
> +    }

Now here initially read sizeof(struct MigrateMultiFDInit).

If we need to add extra fields in a v2 struct for example

 struct MigrateMultiFDInitV2 {
     struct MigrateMultiFDInit v1;
     uint32_t extra;
 };

This side now just has to check of version == 2, and then it can
read (sizeof(MigrateMultiFDInitV2) - sizeof(MigrateMultiFDInit))
to get the extra bytes for the new fields from v2.

This is vastly better than non-extensible printf()+scanf()


> +    sscanf(string, "%s multifd %03d", string_uuid, &id);
> +
> +    if (qemu_uuid_set) {
> +        uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
> +    } else {
> +        uuid = g_strdup(multifd_uuid);
> +    }
> +    if (strcmp(string_uuid, uuid)) {
> +        error_setg(&local_err, "multifd: received uuid '%s' and expected "
> +                   "uuid '%s' for channel %d", string_uuid, uuid, id);
> +        terminate_multifd_recv_threads(local_err);
> +        return;
> +    }
> +    g_free(uuid);

Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|

  reply	other threads:[~2017-09-13 11:35 UTC|newest]

Thread overview: 25+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2017-09-13 10:59 [Qemu-devel] [PATCH v8 00/20] Multifd Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 01/20] migration: Create migration_ioc_process_incoming() Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 02/20] migration: Teach it about G_SOURCE_REMOVE Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 03/20] migration: Add comments to channel functions Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 04/20] migration: Create migration_has_all_channels Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 05/20] migration: Improve migration thread error handling Juan Quintela
2017-09-18 16:34   ` Dr. David Alan Gilbert
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 06/20] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 07/20] migration: Add multifd capability Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 08/20] migration: Create x-multifd-channels parameter Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 09/20] migration: Create x-multifd-page-count parameter Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 10/20] migration: Create multifd migration threads Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 11/20] migration: Split migration_fd_process_incoming Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 12/20] migration: Start of multiple fd work Juan Quintela
2017-09-13 11:35   ` Daniel P. Berrange [this message]
2017-09-13 11:44   ` Daniel P. Berrange
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 13/20] migration: Create ram_multifd_page Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 14/20] migration: Really use multiple pages at a time Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 15/20] migration: Send the fd number which we are going to use for this page Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 16/20] migration: Create thread infrastructure for multifd recv side Juan Quintela
2017-09-13 11:42   ` Daniel P. Berrange
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 17/20] migration: Test new fd infrastructure Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 18/20] migration: Rename initial_bytes Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 19/20] migration: Transfer pages over new channels Juan Quintela
2017-09-13 10:59 ` [Qemu-devel] [PATCH v8 20/20] migration: Flush receive queue Juan Quintela

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20170913113534.GC3067@redhat.com \
    --to=berrange@redhat.com \
    --cc=dgilbert@redhat.com \
    --cc=lvivier@redhat.com \
    --cc=peterx@redhat.com \
    --cc=qemu-devel@nongnu.org \
    --cc=quintela@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.