From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
To: Juan Quintela <quintela@redhat.com>
Cc: qemu-devel@nongnu.org, lvivier@redhat.com, peterx@redhat.com,
berrange@redhat.com
Subject: Re: [Qemu-devel] [PATCH v5 09/17] migration: Start of multiple fd work
Date: Wed, 19 Jul 2017 18:35:14 +0100
Message-ID: <20170719173513.GJ3500@work-vm>
In-Reply-To: <20170717134238.1966-10-quintela@redhat.com>
* Juan Quintela (quintela@redhat.com) wrote:
> We create new channels for each new thread created. We only send through
> them a character to be sure that we are creating the channels in the
> right order.
That text is out of date, isn't it?
>
> Signed-off-by: Juan Quintela <quintela@redhat.com>
>
> --
> Split SocketArgs into incoming and outgoing args
>
> Use UUID's on the initial message, so we are sure we are connecting to
> the right channel.
>
> Remove init semaphore. Now that we use uuids on the init message, we
> know that this is our channel.
>
> Fix recv socket destroy; we were destroying the send channels.
> This was very interesting, because we were using an unreferenced object
> without problems.
>
> Move to struct of pointers
> init channel sooner.
> split recv thread creation.
> listen on main thread
> ---
> migration/migration.c | 7 ++-
> migration/ram.c | 118 ++++++++++++++++++++++++++++++++++++++++++--------
> migration/ram.h | 2 +
> migration/socket.c | 38 ++++++++++++++--
> migration/socket.h | 10 +++++
> 5 files changed, 152 insertions(+), 23 deletions(-)
>
> diff --git a/migration/migration.c b/migration/migration.c
> index b81c498..e1c79d5 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -389,8 +389,13 @@ gboolean migration_ioc_process_incoming(QIOChannel *ioc)
> QEMUFile *f = qemu_fopen_channel_input(ioc);
> mis->from_src_file = f;
> migration_fd_process_incoming(f);
> + if (!migrate_use_multifd()) {
> + return FALSE;
> + } else {
> + return TRUE;
> + }
> }
> - return FALSE; /* unregister */
> + return multifd_new_channel(ioc);
> }
>
> /*
> diff --git a/migration/ram.c b/migration/ram.c
> index 8e87533..b80f511 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -36,6 +36,7 @@
> #include "xbzrle.h"
> #include "ram.h"
> #include "migration.h"
> +#include "socket.h"
> #include "migration/register.h"
> #include "migration/misc.h"
> #include "qemu-file.h"
> @@ -46,6 +47,8 @@
> #include "exec/ram_addr.h"
> #include "qemu/rcu_queue.h"
> #include "migration/colo.h"
> +#include "sysemu/sysemu.h"
> +#include "qemu/uuid.h"
>
> /***********************************************************/
> /* ram save/restore */
> @@ -361,6 +364,7 @@ static void compress_threads_save_setup(void)
> struct MultiFDSendParams {
> uint8_t id;
> QemuThread thread;
> + QIOChannel *c;
> QemuSemaphore sem;
> QemuMutex mutex;
> bool quit;
> @@ -401,6 +405,7 @@ void multifd_save_cleanup(void)
> qemu_thread_join(&p->thread);
> qemu_mutex_destroy(&p->mutex);
> qemu_sem_destroy(&p->sem);
> + socket_send_channel_destroy(p->c);
> }
> g_free(multifd_send_state->params);
> multifd_send_state->params = NULL;
> @@ -408,11 +413,38 @@ void multifd_save_cleanup(void)
> multifd_send_state = NULL;
> }
>
> +/* Default uuid for multifd when qemu is not started with uuid */
> +static char multifd_uuid[] = "5c49fd7e-af88-4a07-b6e8-091fd696ad40";
> +/* strlen(multifd) + '-' + <channel id> + '-' + UUID_FMT + '\0' */
> +#define MULTIFD_UUID_MSG (7 + 1 + 3 + 1 + UUID_FMT_LEN + 1)
> +
> static void *multifd_send_thread(void *opaque)
> {
> MultiFDSendParams *p = opaque;
> + char string[MULTIFD_UUID_MSG];
> + char *string_uuid;
> + int res;
> + bool exit = false;
>
> - while (true) {
> + if (qemu_uuid_set) {
> + string_uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
> + } else {
> + string_uuid = g_strdup(multifd_uuid);
> + }
> + res = snprintf(string, MULTIFD_UUID_MSG, "%s multifd %03d",
> + string_uuid, p->id);
> + g_free(string_uuid);
> +
> + /* -1 due to the wonders of '\0' accounting */
> + if (res != (MULTIFD_UUID_MSG - 1)) {
> + error_report("Multifd UUID message '%s' is not of right length",
> + string);
> + exit = true;
> + } else {
> + qio_channel_write(p->c, string, MULTIFD_UUID_MSG, &error_abort);
> + }
> +
> + while (!exit) {
> qemu_mutex_lock(&p->mutex);
> if (p->quit) {
> qemu_mutex_unlock(&p->mutex);
> @@ -445,6 +477,12 @@ int multifd_save_setup(void)
> qemu_sem_init(&p->sem, 0);
> p->quit = false;
> p->id = i;
> + p->c = socket_send_channel_create();
> + if (!p->c) {
> + error_report("Error creating a send channel");
> + multifd_save_cleanup();
> + return -1;
> + }
> snprintf(thread_name, sizeof(thread_name), "multifdsend_%d", i);
> qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p,
> QEMU_THREAD_JOINABLE);
> @@ -456,6 +494,7 @@ int multifd_save_setup(void)
> struct MultiFDRecvParams {
> uint8_t id;
> QemuThread thread;
> + QIOChannel *c;
> QemuSemaphore sem;
> QemuMutex mutex;
> bool quit;
> @@ -463,7 +502,7 @@ struct MultiFDRecvParams {
> typedef struct MultiFDRecvParams MultiFDRecvParams;
>
> struct {
> - MultiFDRecvParams *params;
> + MultiFDRecvParams **params;
Probably want to push that up to where you added that struct?
> /* number of created threads */
> int count;
> } *multifd_recv_state;
> @@ -473,7 +512,7 @@ static void terminate_multifd_recv_threads(void)
> int i;
>
> for (i = 0; i < multifd_recv_state->count; i++) {
> - MultiFDRecvParams *p = &multifd_recv_state->params[i];
> + MultiFDRecvParams *p = multifd_recv_state->params[i];
>
> qemu_mutex_lock(&p->mutex);
> p->quit = true;
> @@ -491,11 +530,13 @@ void multifd_load_cleanup(void)
> }
> terminate_multifd_recv_threads();
> for (i = 0; i < multifd_recv_state->count; i++) {
> - MultiFDRecvParams *p = &multifd_recv_state->params[i];
> + MultiFDRecvParams *p = multifd_recv_state->params[i];
>
> qemu_thread_join(&p->thread);
> qemu_mutex_destroy(&p->mutex);
> qemu_sem_destroy(&p->sem);
> + socket_recv_channel_destroy(p->c);
> + g_free(p);
> }
> g_free(multifd_recv_state->params);
> multifd_recv_state->params = NULL;
> @@ -520,31 +561,70 @@ static void *multifd_recv_thread(void *opaque)
> return NULL;
> }
>
> +gboolean multifd_new_channel(QIOChannel *ioc)
> +{
> + int thread_count = migrate_multifd_threads();
> + MultiFDRecvParams *p = g_new0(MultiFDRecvParams, 1);
> + MigrationState *s = migrate_get_current();
> + char string[MULTIFD_UUID_MSG];
> + char string_uuid[UUID_FMT_LEN];
> + char *uuid;
> + int id;
> +
> + qio_channel_read(ioc, string, sizeof(string), &error_abort);
> + sscanf(string, "%s multifd %03d", string_uuid, &id);
> +
> + if (qemu_uuid_set) {
> + uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
> + } else {
> + uuid = g_strdup(multifd_uuid);
> + }
> + if (strcmp(string_uuid, uuid)) {
> + error_report("multifd: received uuid '%s' and expected uuid '%s'",
> + string_uuid, uuid);
Probably worth adding the channel id as well, so we can see which
channel failed.
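Something like this, say (just a sketch, using the id you've already
parsed out of the message in this function):

    error_report("multifd %d: received uuid '%s' and expected uuid '%s'",
                 id, string_uuid, uuid);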
> + migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> + MIGRATION_STATUS_FAILED);
> + terminate_multifd_recv_threads();
> + return FALSE;
> + }
> + g_free(uuid);
> +
> + if (multifd_recv_state->params[id] != NULL) {
> + error_report("multifd: received id '%d' is already setup'", id);
> + migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
> + MIGRATION_STATUS_FAILED);
> + terminate_multifd_recv_threads();
> + return FALSE;
> + }
> + qemu_mutex_init(&p->mutex);
> + qemu_sem_init(&p->sem, 0);
> + p->quit = false;
> + p->id = id;
> + p->c = ioc;
> + atomic_set(&multifd_recv_state->params[id], p);
Can you explain why this is quite so careful about ordering? Is there
something that could look at params or try to take the mutex before
the count is incremented?
I think it's safe to do:

    p->quit = false;
    p->id = id;
    p->c = ioc;
    multifd_recv_state->params[id] = p;
    qemu_sem_init(&p->sem, 0);
    qemu_mutex_init(&p->mutex);
    qemu_thread_create(...)
    atomic_inc(&multifd_recv_state->count);  <-- I'm not sure if this
                                                 needs to be atomic
> + qemu_thread_create(&p->thread, "multifd_recv", multifd_recv_thread, p,
> + QEMU_THREAD_JOINABLE);
You've lost the nice numbered thread names that you had created in the
previous version of this code, which you're removing below.
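Something like this would bring them back (a sketch, reusing the naming
from the multifd_load_setup code being deleted below):

    char thread_name[16];

    snprintf(thread_name, sizeof(thread_name), "multifdrecv_%d", id);
    qemu_thread_create(&p->thread, thread_name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);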
> + multifd_recv_state->count++;
> +
> + /* We need to return FALSE for the last channel */
> + if (multifd_recv_state->count == thread_count) {
> + return FALSE;
> + } else {
> + return TRUE;
> + }
return multifd_recv_state->count != thread_count; ?
> +}
> +
> int multifd_load_setup(void)
> {
> int thread_count;
> - uint8_t i;
>
> if (!migrate_use_multifd()) {
> return 0;
> }
> thread_count = migrate_multifd_threads();
> multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
> - multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> + multifd_recv_state->params = g_new0(MultiFDRecvParams *, thread_count);
> multifd_recv_state->count = 0;
> - for (i = 0; i < thread_count; i++) {
> - char thread_name[16];
> - MultiFDRecvParams *p = &multifd_recv_state->params[i];
> -
> - qemu_mutex_init(&p->mutex);
> - qemu_sem_init(&p->sem, 0);
> - p->quit = false;
> - p->id = i;
> - snprintf(thread_name, sizeof(thread_name), "multifdrecv_%d", i);
> - qemu_thread_create(&p->thread, thread_name, multifd_recv_thread, p,
> - QEMU_THREAD_JOINABLE);
> - multifd_recv_state->count++;
> - }
> return 0;
> }
>
> diff --git a/migration/ram.h b/migration/ram.h
> index 93c2bb4..9413544 100644
> --- a/migration/ram.h
> +++ b/migration/ram.h
> @@ -31,6 +31,7 @@
>
> #include "qemu-common.h"
> #include "exec/cpu-common.h"
> +#include "io/channel.h"
>
> extern MigrationStats ram_counters;
> extern XBZRLECacheStats xbzrle_counters;
> @@ -43,6 +44,7 @@ int multifd_save_setup(void);
> void multifd_save_cleanup(void);
> int multifd_load_setup(void);
> void multifd_load_cleanup(void);
> +gboolean multifd_new_channel(QIOChannel *ioc);
>
> uint64_t ram_pagesize_summary(void);
> int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
> diff --git a/migration/socket.c b/migration/socket.c
> index 6195596..32a6b39 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -26,6 +26,38 @@
> #include "io/channel-socket.h"
> #include "trace.h"
>
> +int socket_recv_channel_destroy(QIOChannel *recv)
> +{
> + /* Remove channel */
> + object_unref(OBJECT(recv));
> + return 0;
> +}
> +
> +struct SocketOutgoingArgs {
> + SocketAddress *saddr;
> + Error **errp;
> +} outgoing_args;
> +
> +QIOChannel *socket_send_channel_create(void)
> +{
> + QIOChannelSocket *sioc = qio_channel_socket_new();
> +
> + qio_channel_socket_connect_sync(sioc, outgoing_args.saddr,
> + outgoing_args.errp);
> + qio_channel_set_delay(QIO_CHANNEL(sioc), false);
> + return QIO_CHANNEL(sioc);
> +}
> +
> +int socket_send_channel_destroy(QIOChannel *send)
> +{
> + /* Remove channel */
> + object_unref(OBJECT(send));
> + if (outgoing_args.saddr) {
> + qapi_free_SocketAddress(outgoing_args.saddr);
> + outgoing_args.saddr = NULL;
> + }
> + return 0;
> +}
>
> static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
> {
> @@ -96,6 +128,9 @@ static void socket_start_outgoing_migration(MigrationState *s,
> struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
>
> data->s = s;
> + outgoing_args.saddr = saddr;
> + outgoing_args.errp = errp;
> +
> if (saddr->type == SOCKET_ADDRESS_TYPE_INET) {
> data->hostname = g_strdup(saddr->u.inet.host);
> }
> @@ -106,7 +141,6 @@ static void socket_start_outgoing_migration(MigrationState *s,
> socket_outgoing_migration,
> data,
> socket_connect_data_free);
> - qapi_free_SocketAddress(saddr);
> }
>
> void tcp_start_outgoing_migration(MigrationState *s,
> @@ -151,8 +185,6 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
>
> qio_channel_set_name(QIO_CHANNEL(sioc), "migration-socket-incoming");
> result = migration_channel_process_incoming(QIO_CHANNEL(sioc));
> - object_unref(OBJECT(sioc));
> -
> out:
> if (result == FALSE) {
> /* Close listening socket as its no longer needed */
> diff --git a/migration/socket.h b/migration/socket.h
> index 6b91e9d..dabce0e 100644
> --- a/migration/socket.h
> +++ b/migration/socket.h
> @@ -16,6 +16,16 @@
>
> #ifndef QEMU_MIGRATION_SOCKET_H
> #define QEMU_MIGRATION_SOCKET_H
> +
> +#include "io/channel.h"
> +
> +QIOChannel *socket_recv_channel_create(void);
> +int socket_recv_channel_destroy(QIOChannel *recv);
> +
> +QIOChannel *socket_send_channel_create(void);
> +
> +int socket_send_channel_destroy(QIOChannel *send);
> +
> void tcp_start_incoming_migration(const char *host_port, Error **errp);
>
> void tcp_start_outgoing_migration(MigrationState *s, const char *host_port,
> --
> 2.9.4
Dave
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK