From: Peter Xu <peterx@redhat.com>
To: Juan Quintela <quintela@redhat.com>
Cc: qemu-devel@nongnu.org, dgilbert@redhat.com, lvivier@redhat.com
Subject: Re: [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work
Date: Mon, 22 Jan 2018 15:00:42 +0800
Message-ID: <20180122070042.GA29532@xz-mi>
In-Reply-To: <20180110124723.11879-5-quintela@redhat.com>
On Wed, Jan 10, 2018 at 01:47:13PM +0100, Juan Quintela wrote:
> We create a new channel for each new thread. Through each channel we
> send an initial packed struct, so we can check that the right channels
> are connected on both sides.
>
> Signed-off-by: Juan Quintela <quintela@redhat.com>
>
> --
> Split SocketArgs into incoming and outgoing args
>
> Use UUIDs in the initial message, so we are sure we are connecting to
> the right channel.
>
> Remove init semaphore. Now that we use UUIDs in the init message, we
> know that this is our channel.
>
> Fix recv socket destroy; we were destroying send channels.
> This was very interesting, because we were using an unreferenced object
> without problems.
>
> Move to struct of pointers
> init channel sooner.
> split recv thread creation.
> listen on main thread
> We count the number of created threads to know when we need to stop listening
> Use g_strdup_printf
> report channel id on errors
> Add name parameter
> Use local_err
> Add Error * parameter to socket_send_channel_create()
> Use qio_channel_*_all
> Use asynchronous connect
> Use a struct to send all fields
> Use default uuid
> Fix local_err = NULL (dave)
> Make lines at most 80 characters long (checkpatch)
> Move multifd_new_channel() and multifd_recv_thread() to later patches
> when used.
> Add __attribute__((packed))
> Use UUIDs as opaque bytes instead of the ASCII representation
> rename migrate_new_channel_async to migrate_new_send_channel_async
> rename recv_channel_destroy to _unref. And create the pairing _ref.
> ---
> migration/migration.c | 7 +++-
> migration/ram.c | 114 +++++++++++++++++++++++++++++++++++---------------
> migration/ram.h | 3 ++
> migration/socket.c | 39 ++++++++++++++++-
> migration/socket.h | 10 +++++
> 5 files changed, 137 insertions(+), 36 deletions(-)
>
> diff --git a/migration/migration.c b/migration/migration.c
> index e506b9c2c6..77fc17f723 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -426,7 +426,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
> QEMUFile *f = qemu_fopen_channel_input(ioc);
> migration_fd_process_incoming(f);
> }
> - /* We still only have a single channel. Nothing to do here yet */
> + multifd_recv_new_channel(ioc);
> }
>
> /**
> @@ -437,6 +437,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
> */
> bool migration_has_all_channels(void)
> {
> + if (migrate_use_multifd()) {
> + int thread_count = migrate_multifd_channels();
> +
> + return thread_count == multifd_created_channels();
> + }
> return true;
> }
>
> diff --git a/migration/ram.c b/migration/ram.c
> index 5a109efeda..aef5a323f3 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -36,6 +36,7 @@
> #include "xbzrle.h"
> #include "ram.h"
> #include "migration.h"
> +#include "socket.h"
> #include "migration/register.h"
> #include "migration/misc.h"
> #include "qemu-file.h"
> @@ -49,6 +50,8 @@
> #include "qemu/rcu_queue.h"
> #include "migration/colo.h"
> #include "migration/block.h"
> +#include "sysemu/sysemu.h"
> +#include "qemu/uuid.h"
>
> /***********************************************************/
> /* ram save/restore */
> @@ -396,6 +399,7 @@ struct MultiFDSendParams {
> uint8_t id;
> char *name;
> QemuThread thread;
> + QIOChannel *c;
> QemuSemaphore sem;
> QemuMutex mutex;
> bool quit;
> @@ -412,6 +416,15 @@ static void terminate_multifd_send_threads(Error *errp)
> {
> int i;
>
> + if (errp) {
> + MigrationState *s = migrate_get_current();
> + migrate_set_error(s, errp);
> + if (s->state == MIGRATION_STATUS_SETUP ||
> + s->state == MIGRATION_STATUS_ACTIVE) {
> + migrate_set_state(&s->state, s->state,
> + MIGRATION_STATUS_FAILED);
I'm fine with it, but could I ask why we explicitly pass the error
into this function and handle the state machine transition here?
I ask because IMHO the caller of terminate_multifd_send_threads()
already knows the error (it is exactly what we pass in as the
parameter), so why not handle it there?
[1]
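To make the question concrete, here is a minimal standalone sketch of the split I have in mind (plain C, not QEMU code; MigState, mig_report_error() and terminate_workers() are hypothetical names): the caller records the error and moves the state to FAILED, while the multifd helper only tells its threads to quit.

```c
#include <stddef.h>

/* Hypothetical stand-ins, not QEMU types: a tiny migration state machine. */
typedef enum { MIG_SETUP, MIG_ACTIVE, MIG_FAILED } MigStatus;

typedef struct {
    MigStatus state;
    const char *error;   /* first error reported, if any */
} MigState;

/* Caller side: record the error and move the state machine to FAILED. */
static void mig_report_error(MigState *s, const char *err)
{
    if (s->error == NULL) {
        s->error = err;              /* keep only the first error */
    }
    if (s->state == MIG_SETUP || s->state == MIG_ACTIVE) {
        s->state = MIG_FAILED;
    }
}

/* multifd side: only ask the worker threads to quit. There is no error
 * parameter, so this code never touches the migration state machine. */
static void terminate_workers(int *quit_flags, int count)
{
    for (int i = 0; i < count; i++) {
        quit_flags[i] = 1;           /* stands in for p->quit = true + sem post */
    }
}
```

With this split, terminate_multifd_send_threads() would presumably no longer need an Error * parameter at all.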
> + }
> + }
> for (i = 0; i < multifd_send_state->count; i++) {
> MultiFDSendParams *p = &multifd_send_state->params[i];
>
> @@ -437,6 +450,7 @@ int multifd_save_cleanup(Error **errp)
> qemu_thread_join(&p->thread);
> qemu_mutex_destroy(&p->mutex);
> qemu_sem_destroy(&p->sem);
> + socket_send_channel_destroy(p->c);
> g_free(p->name);
> p->name = NULL;
> }
> @@ -447,9 +461,27 @@ int multifd_save_cleanup(Error **errp)
> return ret;
> }
>
> +typedef struct {
> + uint32_t version;
> + unsigned char uuid[16]; /* QemuUUID */
> + uint8_t id;
> +} __attribute__((packed)) MultiFDInit_t;
> +
> static void *multifd_send_thread(void *opaque)
> {
> MultiFDSendParams *p = opaque;
> + MultiFDInit_t msg;
> + Error *local_err = NULL;
> + size_t ret;
> +
> + msg.version = 1;
> + msg.id = p->id;
> + memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
I don't remember whether I asked before, but... do we need to handle
the BE/LE conversion? That is, do we need to support multifd migration
between big-endian and little-endian hosts?
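If mixed-endian hosts are to be supported, only the 32-bit version field is byte-order sensitive: the UUID is an opaque byte array and id is a single byte. A minimal sketch, assuming a fixed 21-byte wire layout with version sent big-endian (init_msg_encode() and init_msg_decode() are hypothetical helpers; inside QEMU, cpu_to_be32()/be32_to_cpu() would presumably be the natural tools):

```c
#include <stdint.h>
#include <string.h>

/* Hypothetical wire layout mirroring MultiFDInit_t:
 * 4-byte version (big-endian), 16-byte UUID, 1-byte id. */
enum { INIT_MSG_LEN = 4 + 16 + 1 };

static void init_msg_encode(uint8_t out[INIT_MSG_LEN], uint32_t version,
                            const uint8_t uuid[16], uint8_t id)
{
    out[0] = (uint8_t)(version >> 24);   /* most significant byte first */
    out[1] = (uint8_t)(version >> 16);
    out[2] = (uint8_t)(version >> 8);
    out[3] = (uint8_t)version;
    memcpy(out + 4, uuid, 16);           /* opaque bytes: no swapping */
    out[20] = id;                        /* single byte: no swapping */
}

static void init_msg_decode(const uint8_t in[INIT_MSG_LEN], uint32_t *version,
                            uint8_t uuid[16], uint8_t *id)
{
    *version = ((uint32_t)in[0] << 24) | ((uint32_t)in[1] << 16) |
               ((uint32_t)in[2] << 8) | (uint32_t)in[3];
    memcpy(uuid, in + 4, 16);
    *id = in[20];
}
```

Both hosts then agree on the bytes regardless of their native order.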
> + ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), &local_err);
> + if (ret != 0) {
> + terminate_multifd_send_threads(local_err);
Hmm... so we are in a per-thread handler, but we are calling the global
cleanup function for all the threads. Could this function be called
more than once? I think it will also be called from
multifd_save_cleanup() at the end (so N+1 times in total?). Shall we
keep only the call in multifd_save_cleanup(), since we should reach
there anyway when an error happens?
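If the call from the worker threads is kept, one way to make the N+1 calls harmless is an idempotent guard. A minimal sketch, assuming C11 atomics (terminate_once() and teardown_runs are hypothetical names, not QEMU code): the first caller does the teardown and every later caller returns immediately.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Set by whichever caller performs the teardown first. */
static atomic_bool terminated = false;

/* Counts how often the teardown body really ran (for illustration). */
static int teardown_runs;

static bool terminate_once(void)
{
    /* atomic_exchange returns the previous value: only the first caller
     * sees false, so only that caller proceeds. */
    if (atomic_exchange(&terminated, true)) {
        return false;                /* already terminated elsewhere */
    }
    teardown_runs++;                 /* stands in for signalling all threads */
    return true;
}
```

Keeping a single call site in multifd_save_cleanup() avoids needing such a guard in the first place.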
> + return NULL;
> + }
>
> while (true) {
> qemu_mutex_lock(&p->mutex);
> @@ -464,6 +496,27 @@ static void *multifd_send_thread(void *opaque)
> return NULL;
> }
>
> +static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
> +{
> + MultiFDSendParams *p = opaque;
> + QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
> + Error *local_err = NULL;
> +
> + if (qio_task_propagate_error(task, &local_err)) {
> + if (multifd_save_cleanup(&local_err) != 0) {
Similar question here: we are calling multifd_save_cleanup() (which
cleans up everything related to multifd) from a per-channel handler.
Could it end up being called more than once?
> + migrate_set_error(migrate_get_current(), local_err);
> + }
> + } else {
> + p->c = QIO_CHANNEL(sioc);
> + qio_channel_set_delay(p->c, false);
> +
> + qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
> + QEMU_THREAD_JOINABLE);
> +
> + multifd_send_state->count++;
> + }
> +}
> +
> int multifd_save_setup(void)
> {
> int thread_count;
> @@ -484,10 +537,7 @@ int multifd_save_setup(void)
> p->quit = false;
> p->id = i;
> p->name = g_strdup_printf("multifdsend_%d", i);
> - qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
> - QEMU_THREAD_JOINABLE);
> -
> - multifd_send_state->count++;
> + socket_send_channel_create(multifd_new_send_channel_async, p);
> }
> return 0;
> }
> @@ -496,6 +546,7 @@ struct MultiFDRecvParams {
> uint8_t id;
> char *name;
> QemuThread thread;
> + QIOChannel *c;
> QemuSemaphore sem;
> QemuMutex mutex;
> bool quit;
> @@ -506,12 +557,25 @@ struct {
> MultiFDRecvParams *params;
> /* number of created threads */
> int count;
> + /* Should we finish */
> + bool quit;
> } *multifd_recv_state;
>
> static void terminate_multifd_recv_threads(Error *errp)
> {
> int i;
>
> + if (errp) {
> + MigrationState *s = migrate_get_current();
> + migrate_set_error(s, errp);
> + if (s->state == MIGRATION_STATUS_SETUP ||
> + s->state == MIGRATION_STATUS_ACTIVE) {
> + migrate_set_state(&s->state, s->state,
> + MIGRATION_STATUS_FAILED);
> + }
Similar question to [1] above. Would it be better to handle the
error before calling terminate_multifd_recv_threads(), so that the
multifd code does not need to touch the state machine?
> + }
> + multifd_recv_state->quit = true;
> +
> for (i = 0; i < multifd_recv_state->count; i++) {
> MultiFDRecvParams *p = &multifd_recv_state->params[i];
>
> @@ -537,6 +601,7 @@ int multifd_load_cleanup(Error **errp)
> qemu_thread_join(&p->thread);
> qemu_mutex_destroy(&p->mutex);
> qemu_sem_destroy(&p->sem);
> + socket_recv_channel_unref(p->c);
> g_free(p->name);
> p->name = NULL;
> }
> @@ -548,27 +613,9 @@ int multifd_load_cleanup(Error **errp)
> return ret;
> }
>
> -static void *multifd_recv_thread(void *opaque)
> -{
> - MultiFDRecvParams *p = opaque;
> -
> - while (true) {
> - qemu_mutex_lock(&p->mutex);
> - if (p->quit) {
> - qemu_mutex_unlock(&p->mutex);
> - break;
> - }
> - qemu_mutex_unlock(&p->mutex);
> - qemu_sem_wait(&p->sem);
> - }
> -
> - return NULL;
> -}
> -
> int multifd_load_setup(void)
> {
> int thread_count;
> - uint8_t i;
>
> if (!migrate_use_multifd()) {
> return 0;
> @@ -577,21 +624,20 @@ int multifd_load_setup(void)
> multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
> multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> multifd_recv_state->count = 0;
> - for (i = 0; i < thread_count; i++) {
> - MultiFDRecvParams *p = &multifd_recv_state->params[i];
> -
> - qemu_mutex_init(&p->mutex);
> - qemu_sem_init(&p->sem, 0);
> - p->quit = false;
> - p->id = i;
> - p->name = g_strdup_printf("multifdrecv_%d", i);
> - qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> - QEMU_THREAD_JOINABLE);
> - multifd_recv_state->count++;
> - }
> + multifd_recv_state->quit = false;
> return 0;
> }
>
> +int multifd_created_channels(void)
> +{
> + return multifd_recv_state->count;
> +}
> +
> +void multifd_recv_new_channel(QIOChannel *ioc)
> +{
> + socket_recv_channel_unref(ioc);
> +}
> +
> /**
> * save_page_header: write page header to wire
> *
> diff --git a/migration/ram.h b/migration/ram.h
> index 64d81e9f1d..be7d09d0ec 100644
> --- a/migration/ram.h
> +++ b/migration/ram.h
> @@ -31,6 +31,7 @@
>
> #include "qemu-common.h"
> #include "exec/cpu-common.h"
> +#include "io/channel.h"
>
> extern MigrationStats ram_counters;
> extern XBZRLECacheStats xbzrle_counters;
> @@ -43,6 +44,8 @@ int multifd_save_setup(void);
> int multifd_save_cleanup(Error **errp);
> int multifd_load_setup(void);
> int multifd_load_cleanup(Error **errp);
> +int multifd_created_channels(void);
> +void multifd_recv_new_channel(QIOChannel *ioc);
>
> uint64_t ram_pagesize_summary(void);
> int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
> diff --git a/migration/socket.c b/migration/socket.c
> index 6d49903978..aedabee8a1 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -27,6 +27,39 @@
> #include "io/channel-socket.h"
> #include "trace.h"
>
> +int socket_recv_channel_ref(QIOChannel *recv)
> +{
> + object_ref(OBJECT(recv));
> + return 0;
> +}
> +
> +int socket_recv_channel_unref(QIOChannel *recv)
> +{
> + object_unref(OBJECT(recv));
> + return 0;
> +}
> +
> +struct SocketOutgoingArgs {
> + SocketAddress *saddr;
> +} outgoing_args;
> +
> +void socket_send_channel_create(void (*f)(QIOTask *, gpointer), void *data)
> +{
> + QIOChannelSocket *sioc = qio_channel_socket_new();
> + qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
> + f, data, NULL);
> +}
> +
> +int socket_send_channel_destroy(QIOChannel *send)
> +{
> + /* Remove channel */
> + object_unref(OBJECT(send));
> + if (outgoing_args.saddr) {
> + qapi_free_SocketAddress(outgoing_args.saddr);
> + outgoing_args.saddr = NULL;
I would still prefer to free global variables somewhere else, rather
than in a per-channel destructor. IMHO, if we free globals in
socket_send_channel_destroy(), the function will not be thread-safe;
could we even get a double free here?
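As a sketch of the alternative (hypothetical names, not QEMU code; assumes C11 atomics): the per-channel destructor only drops its own channel reference, and the shared address is freed by a separate global cleanup. atomic_exchange() hands the pointer to exactly one caller, so a repeated or concurrent call cannot double free.

```c
#include <stdatomic.h>
#include <stdlib.h>

/* Hypothetical stand-ins, not QEMU types. */
typedef struct { atomic_int refcount; } Channel;
typedef struct { char host[64]; int port; } Address;

/* Global shared by all outgoing channels. */
static _Atomic(Address *) outgoing_saddr;

/* Per-channel destructor: touches only its own channel, never globals. */
static void send_channel_destroy(Channel *c)
{
    if (atomic_fetch_sub(&c->refcount, 1) == 1) {
        free(c);                     /* last reference dropped */
    }
}

/* Global cleanup, run once all channels are gone. Only one caller gets
 * the non-NULL pointer back; later callers get NULL. */
static void socket_cleanup_outgoing(void)
{
    Address *a = atomic_exchange(&outgoing_saddr, NULL);
    free(a);                         /* free(NULL) is a no-op */
}
```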
Thanks,
--
Peter Xu