From: Peter Xu <peterx@redhat.com>
To: Juan Quintela <quintela@redhat.com>
Cc: qemu-devel@nongnu.org, dgilbert@redhat.com, lvivier@redhat.com
Subject: Re: [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work
Date: Mon, 22 Jan 2018 15:00:42 +0800
Message-ID: <20180122070042.GA29532@xz-mi>
In-Reply-To: <20180110124723.11879-5-quintela@redhat.com>
On Wed, Jan 10, 2018 at 01:47:13PM +0100, Juan Quintela wrote:
> We create new channels for each new thread created. We send through
> them in a packed struct. This way we can check we connect the right
> channels in both sides.
>
> Signed-off-by: Juan Quintela <quintela@redhat.com>
>
> --
> Split SocketArgs into incoming and outgoing args
>
> Use UUID's on the initial message, so we are sure we are connecting to
> the right channel.
>
> Remove init semaphore. Now that we use uuids on the init message, we
> know that this is our channel.
>
> Fix recv socket destroy, we were destroying send channels.
> This was very interesting, because we were using an unreferred object
> without problems.
>
> Move to struct of pointers
> init channel sooner.
> split recv thread creation.
> listen on main thread
> We count the number of created threads to know when we need to stop listening
> Use g_strdup_printf
> report channel id on errors
> Add name parameter
> Use local_err
> Add Error * parameter to socket_send_channel_create()
> Use qio_channel_*_all
> Use asynchronous connect
> Use a struct to send all fields
> Use default uuid
> Fix local_err = NULL (dave)
> Make lines 80 lines long (checkpatch)
> Move multifd_new_channel() and multifd_recv_thread() to later patches
> when used.
> Add __attribute__(packed)
> Use UUIDs as opaque data instead of the ASCII representation
> rename migrate_new_channel_async to migrate_new_send_channel_async
> rename recv_channel_destroy to _unref. And create the pairing _ref.
> ---
> migration/migration.c | 7 +++-
> migration/ram.c | 114 +++++++++++++++++++++++++++++++++++---------------
> migration/ram.h | 3 ++
> migration/socket.c | 39 ++++++++++++++++-
> migration/socket.h | 10 +++++
> 5 files changed, 137 insertions(+), 36 deletions(-)
>
> diff --git a/migration/migration.c b/migration/migration.c
> index e506b9c2c6..77fc17f723 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -426,7 +426,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
> QEMUFile *f = qemu_fopen_channel_input(ioc);
> migration_fd_process_incoming(f);
> }
> - /* We still only have a single channel. Nothing to do here yet */
> + multifd_recv_new_channel(ioc);
> }
>
> /**
> @@ -437,6 +437,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
> */
> bool migration_has_all_channels(void)
> {
> + if (migrate_use_multifd()) {
> + int thread_count = migrate_multifd_channels();
> +
> + return thread_count == multifd_created_channels();
> + }
> return true;
> }
>
> diff --git a/migration/ram.c b/migration/ram.c
> index 5a109efeda..aef5a323f3 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -36,6 +36,7 @@
> #include "xbzrle.h"
> #include "ram.h"
> #include "migration.h"
> +#include "socket.h"
> #include "migration/register.h"
> #include "migration/misc.h"
> #include "qemu-file.h"
> @@ -49,6 +50,8 @@
> #include "qemu/rcu_queue.h"
> #include "migration/colo.h"
> #include "migration/block.h"
> +#include "sysemu/sysemu.h"
> +#include "qemu/uuid.h"
>
> /***********************************************************/
> /* ram save/restore */
> @@ -396,6 +399,7 @@ struct MultiFDSendParams {
> uint8_t id;
> char *name;
> QemuThread thread;
> + QIOChannel *c;
> QemuSemaphore sem;
> QemuMutex mutex;
> bool quit;
> @@ -412,6 +416,15 @@ static void terminate_multifd_send_threads(Error *errp)
> {
> int i;
>
> + if (errp) {
> + MigrationState *s = migrate_get_current();
> + migrate_set_error(s, errp);
> + if (s->state == MIGRATION_STATUS_SETUP ||
> + s->state == MIGRATION_STATUS_ACTIVE) {
> + migrate_set_state(&s->state, s->state,
> + MIGRATION_STATUS_FAILED);
I'm fine with this, but could I ask why we pass the error into this
function and handle the state machine transition here? I ask because
the caller of terminate_multifd_send_threads() already knows the error
(it passes it in as the parameter), so why not handle the error and the
state transition in the caller instead?
[1]
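To make the suggestion concrete, here is a minimal standalone sketch of
the split I have in mind. It does not use the real QEMU types: Status,
State, migration_fail() and terminate_threads() are all hypothetical
stand-ins, so please read it as an illustration of the idea only.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for MigrationState and its status values. */
typedef enum {
    STATUS_SETUP,
    STATUS_ACTIVE,
    STATUS_FAILED,
    STATUS_COMPLETED
} Status;

typedef struct {
    Status state;
    const char *error;
} State;

/* Migration-level helper: record the error and move to FAILED. */
static void migration_fail(State *s, const char *err)
{
    if (!s->error) {
        s->error = err;          /* keep only the first error */
    }
    if (s->state == STATUS_SETUP || s->state == STATUS_ACTIVE) {
        s->state = STATUS_FAILED;
    }
}

/* Multifd-level helper: only stops threads, knows nothing about state. */
static bool threads_stopped;

static void terminate_threads(void)
{
    threads_stopped = true;      /* stand-in for setting quit + posting sem */
}
```

The caller that detects the failure would then call migration_fail()
first and terminate_threads() afterwards, so the multifd code never
touches the state machine.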
> + }
> + }
> for (i = 0; i < multifd_send_state->count; i++) {
> MultiFDSendParams *p = &multifd_send_state->params[i];
>
> @@ -437,6 +450,7 @@ int multifd_save_cleanup(Error **errp)
> qemu_thread_join(&p->thread);
> qemu_mutex_destroy(&p->mutex);
> qemu_sem_destroy(&p->sem);
> + socket_send_channel_destroy(p->c);
> g_free(p->name);
> p->name = NULL;
> }
> @@ -447,9 +461,27 @@ int multifd_save_cleanup(Error **errp)
> return ret;
> }
>
> +typedef struct {
> + uint32_t version;
> + unsigned char uuid[16]; /* QemuUUID */
> + uint8_t id;
> +} __attribute__((packed)) MultiFDInit_t;
> +
> static void *multifd_send_thread(void *opaque)
> {
> MultiFDSendParams *p = opaque;
> + MultiFDInit_t msg;
> + Error *local_err = NULL;
> + size_t ret;
> +
> + msg.version = 1;
> + msg.id = p->id;
> + memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
I don't remember whether I asked this before, but do we need to handle
BE/LE conversion here? msg.version is a uint32_t that is written in
host byte order. Do we need to support multifd migration between BE
and LE hosts?
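If we do want that, one option is to fix the byte order on the wire. The
sketch below is standalone and only illustrates the idea: the helper
names and MULTIFD_INIT_WIRE_SIZE are made up, and it assumes the same
layout as the packed struct (4-byte version, 16-byte uuid, 1-byte id).
In QEMU itself I would expect cpu_to_be32()/be32_to_cpu() on the version
field to be enough.

```c
#include <stdint.h>
#include <string.h>

/* version (4) + uuid (16) + id (1), same as the packed struct */
#define MULTIFD_INIT_WIRE_SIZE (4 + 16 + 1)

/* Hypothetical helper: encode the init message, version in big-endian. */
static void multifd_init_encode(uint8_t *buf, uint32_t version,
                                const uint8_t uuid[16], uint8_t id)
{
    buf[0] = (uint8_t)(version >> 24);
    buf[1] = (uint8_t)(version >> 16);
    buf[2] = (uint8_t)(version >> 8);
    buf[3] = (uint8_t)version;
    memcpy(buf + 4, uuid, 16);   /* opaque bytes, no conversion needed */
    buf[20] = id;                /* single byte, no conversion needed */
}

/* Hypothetical helper: decode the init message on the receiving side. */
static void multifd_init_decode(const uint8_t *buf, uint32_t *version,
                                uint8_t uuid[16], uint8_t *id)
{
    *version = ((uint32_t)buf[0] << 24) | ((uint32_t)buf[1] << 16) |
               ((uint32_t)buf[2] << 8) | (uint32_t)buf[3];
    memcpy(uuid, buf + 4, 16);
    *id = buf[20];
}
```

Only the version field is affected; the uuid bytes and the one-byte id
read the same on both kinds of host.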
> + ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), &local_err);
> + if (ret != 0) {
> + terminate_multifd_send_threads(local_err);
Hmm... so we are in a thread handler, but we are calling the global
cleanup function for all the threads. Can this function be called
more than once? I think it will also be called from
multifd_save_cleanup() at the end (so up to N+1 times in total).
Shall we keep only the call in multifd_save_cleanup(), since we should
reach there anyway when an error happens?
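If we do need to keep the per-thread call, then at least the function
should tolerate being called repeatedly. A minimal standalone sketch of
such a guard, using C11 atomics rather than the QEMU atomic helpers;
the flag, the counter and the function name are all hypothetical:

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical flag: set once the send threads have been told to quit. */
static atomic_bool multifd_send_terminated;

/* Counts how often the real teardown body ran (for illustration only). */
static int terminate_runs;

/* Returns true only for the one caller that actually does the teardown. */
static bool terminate_send_threads_once(void)
{
    bool expected = false;

    if (!atomic_compare_exchange_strong(&multifd_send_terminated,
                                        &expected, true)) {
        return false;            /* somebody else already terminated */
    }
    terminate_runs++;            /* stand-in for p->quit = true + sem post */
    return true;
}
```

With this, the N send threads and multifd_save_cleanup() can all call
the function, and the teardown body still runs only once.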
> + return NULL;
> + }
>
> while (true) {
> qemu_mutex_lock(&p->mutex);
> @@ -464,6 +496,27 @@ static void *multifd_send_thread(void *opaque)
> return NULL;
> }
>
> +static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
> +{
> + MultiFDSendParams *p = opaque;
> + QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
> + Error *local_err = NULL;
> +
> + if (qio_task_propagate_error(task, &local_err)) {
> + if (multifd_save_cleanup(&local_err) != 0) {
Similar question here: we are calling multifd_save_cleanup() (which
cleans up everything related to multifd) from a per-channel handler.
Can it be called more than once?
> + migrate_set_error(migrate_get_current(), local_err);
> + }
> + } else {
> + p->c = QIO_CHANNEL(sioc);
> + qio_channel_set_delay(p->c, false);
> +
> + qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
> + QEMU_THREAD_JOINABLE);
> +
> + multifd_send_state->count++;
> + }
> +}
> +
> int multifd_save_setup(void)
> {
> int thread_count;
> @@ -484,10 +537,7 @@ int multifd_save_setup(void)
> p->quit = false;
> p->id = i;
> p->name = g_strdup_printf("multifdsend_%d", i);
> - qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
> - QEMU_THREAD_JOINABLE);
> -
> - multifd_send_state->count++;
> + socket_send_channel_create(multifd_new_send_channel_async, p);
> }
> return 0;
> }
> @@ -496,6 +546,7 @@ struct MultiFDRecvParams {
> uint8_t id;
> char *name;
> QemuThread thread;
> + QIOChannel *c;
> QemuSemaphore sem;
> QemuMutex mutex;
> bool quit;
> @@ -506,12 +557,25 @@ struct {
> MultiFDRecvParams *params;
> /* number of created threads */
> int count;
> + /* Should we finish */
> + bool quit;
> } *multifd_recv_state;
>
> static void terminate_multifd_recv_threads(Error *errp)
> {
> int i;
>
> + if (errp) {
> + MigrationState *s = migrate_get_current();
> + migrate_set_error(s, errp);
> + if (s->state == MIGRATION_STATUS_SETUP ||
> + s->state == MIGRATION_STATUS_ACTIVE) {
> + migrate_set_state(&s->state, s->state,
> + MIGRATION_STATUS_FAILED);
> + }
Similar question to [1] above. Would it be better to handle the
error before calling terminate_multifd_recv_threads(), so that we can
avoid touching the state machine in multifd code?
> + }
> + multifd_recv_state->quit = true;
> +
> for (i = 0; i < multifd_recv_state->count; i++) {
> MultiFDRecvParams *p = &multifd_recv_state->params[i];
>
> @@ -537,6 +601,7 @@ int multifd_load_cleanup(Error **errp)
> qemu_thread_join(&p->thread);
> qemu_mutex_destroy(&p->mutex);
> qemu_sem_destroy(&p->sem);
> + socket_recv_channel_unref(p->c);
> g_free(p->name);
> p->name = NULL;
> }
> @@ -548,27 +613,9 @@ int multifd_load_cleanup(Error **errp)
> return ret;
> }
>
> -static void *multifd_recv_thread(void *opaque)
> -{
> - MultiFDRecvParams *p = opaque;
> -
> - while (true) {
> - qemu_mutex_lock(&p->mutex);
> - if (p->quit) {
> - qemu_mutex_unlock(&p->mutex);
> - break;
> - }
> - qemu_mutex_unlock(&p->mutex);
> - qemu_sem_wait(&p->sem);
> - }
> -
> - return NULL;
> -}
> -
> int multifd_load_setup(void)
> {
> int thread_count;
> - uint8_t i;
>
> if (!migrate_use_multifd()) {
> return 0;
> @@ -577,21 +624,20 @@ int multifd_load_setup(void)
> multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
> multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> multifd_recv_state->count = 0;
> - for (i = 0; i < thread_count; i++) {
> - MultiFDRecvParams *p = &multifd_recv_state->params[i];
> -
> - qemu_mutex_init(&p->mutex);
> - qemu_sem_init(&p->sem, 0);
> - p->quit = false;
> - p->id = i;
> - p->name = g_strdup_printf("multifdrecv_%d", i);
> - qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> - QEMU_THREAD_JOINABLE);
> - multifd_recv_state->count++;
> - }
> + multifd_recv_state->quit = false;
> return 0;
> }
>
> +int multifd_created_channels(void)
> +{
> + return multifd_recv_state->count;
> +}
> +
> +void multifd_recv_new_channel(QIOChannel *ioc)
> +{
> + socket_recv_channel_unref(ioc);
> +}
> +
> /**
> * save_page_header: write page header to wire
> *
> diff --git a/migration/ram.h b/migration/ram.h
> index 64d81e9f1d..be7d09d0ec 100644
> --- a/migration/ram.h
> +++ b/migration/ram.h
> @@ -31,6 +31,7 @@
>
> #include "qemu-common.h"
> #include "exec/cpu-common.h"
> +#include "io/channel.h"
>
> extern MigrationStats ram_counters;
> extern XBZRLECacheStats xbzrle_counters;
> @@ -43,6 +44,8 @@ int multifd_save_setup(void);
> int multifd_save_cleanup(Error **errp);
> int multifd_load_setup(void);
> int multifd_load_cleanup(Error **errp);
> +int multifd_created_channels(void);
> +void multifd_recv_new_channel(QIOChannel *ioc);
>
> uint64_t ram_pagesize_summary(void);
> int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
> diff --git a/migration/socket.c b/migration/socket.c
> index 6d49903978..aedabee8a1 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -27,6 +27,39 @@
> #include "io/channel-socket.h"
> #include "trace.h"
>
> +int socket_recv_channel_ref(QIOChannel *recv)
> +{
> + object_ref(OBJECT(recv));
> + return 0;
> +}
> +
> +int socket_recv_channel_unref(QIOChannel *recv)
> +{
> + object_unref(OBJECT(recv));
> + return 0;
> +}
> +
> +struct SocketOutgoingArgs {
> + SocketAddress *saddr;
> +} outgoing_args;
> +
> +void socket_send_channel_create(void (*f)(QIOTask *, gpointer), void *data)
> +{
> + QIOChannelSocket *sioc = qio_channel_socket_new();
> + qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
> + f, data, NULL);
> +}
> +
> +int socket_send_channel_destroy(QIOChannel *send)
> +{
> + /* Remove channel */
> + object_unref(OBJECT(send));
> + if (outgoing_args.saddr) {
> + qapi_free_SocketAddress(outgoing_args.saddr);
> + outgoing_args.saddr = NULL;
I would still prefer to free global variables somewhere else rather
than in a per-channel destructor. IMHO, if we free globals in
socket_send_channel_destroy(), the function is not thread-safe, and we
could even get a double free here.
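Roughly what I mean, as a standalone sketch with stand-in types: Channel
replaces QIOChannel, outgoing_saddr replaces outgoing_args.saddr, and
socket_outgoing_cleanup() is a hypothetical new function, not something
that exists in the tree today.

```c
#include <stdlib.h>

/* Stand-in for QIOChannel: only the reference count matters here. */
typedef struct {
    int refcount;
} Channel;

/* Stand-in for the global outgoing_args.saddr. */
static char *outgoing_saddr;

/* Per-channel destructor: drops the channel reference, nothing global. */
static void send_channel_destroy(Channel *c)
{
    c->refcount--;
}

/* Global cleanup: called once, after all channels are gone. */
static void socket_outgoing_cleanup(void)
{
    free(outgoing_saddr);        /* free(NULL) is a no-op */
    outgoing_saddr = NULL;
}
```

The per-channel function then has no shared state to race on, and the
global free happens in exactly one place, e.g. at the end of
multifd_save_cleanup().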
Thanks,
--
Peter Xu