From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
To: Juan Quintela <quintela@redhat.com>
Cc: qemu-devel@nongnu.org, lvivier@redhat.com, peterx@redhat.com
Subject: Re: [Qemu-devel] [PATCH v10 07/14] migration: Create thread infrastructure for multifd recv side
Date: Wed, 24 Jan 2018 13:34:58 +0000 [thread overview]
Message-ID: <20180124133458.GA2497@work-vm> (raw)
In-Reply-To: <20180110124723.11879-8-quintela@redhat.com>
* Juan Quintela (quintela@redhat.com) wrote:
> We make the locking and the transfer of information specific, even if we
> are still receiving things through the main thread.
>
> Signed-off-by: Juan Quintela <quintela@redhat.com>
It might be better to split the change to
migration_incoming_process/setup out into a separate patch.
I'm not too sure what it's trying to do here.
> --
>
> We split when we create the main channel and where we start the main
> migration thread, so we wait for the creation of the other threads.
>
> Use multifd_clear_pages().
> Don't remove object_unref()
> We use correctly the channel numbres
> Denife multifd_new_channel/multifd_recv_thread in this patch, that is
> where it is used.
> rename migrate_new_channel to migrate_new_send_channel
> Add ToDo comment
> Add trace
> ---
> migration/migration.c | 5 +-
> migration/migration.h | 1 +
> migration/ram.c | 129 ++++++++++++++++++++++++++++++++++++++++++++++---
> migration/socket.c | 3 ++
> migration/trace-events | 2 +
> 5 files changed, 132 insertions(+), 8 deletions(-)
>
> diff --git a/migration/migration.c b/migration/migration.c
> index 77fc17f723..1545f3a0b0 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -406,7 +406,7 @@ static void migration_incoming_setup(QEMUFile *f)
> qemu_file_set_blocking(f, false);
> }
>
> -static void migration_incoming_process(void)
> +void migration_incoming_process(void)
> {
> Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
> qemu_coroutine_enter(co);
> @@ -424,7 +424,8 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>
> if (!mis->from_src_file) {
> QEMUFile *f = qemu_fopen_channel_input(ioc);
> - migration_fd_process_incoming(f);
> + migration_incoming_setup(f);
> + return;
> }
> multifd_recv_new_channel(ioc);
> }
> diff --git a/migration/migration.h b/migration/migration.h
> index 29a7b79a39..7de193a9c0 100644
> --- a/migration/migration.h
> +++ b/migration/migration.h
> @@ -172,6 +172,7 @@ void migrate_set_state(int *state, int old_state, int new_state);
>
> void migration_fd_process_incoming(QEMUFile *f);
> void migration_ioc_process_incoming(QIOChannel *ioc);
> +void migration_incoming_process(void);
>
> bool migration_has_all_channels(void);
>
> diff --git a/migration/ram.c b/migration/ram.c
> index 19c8089c4b..8443806f12 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -660,13 +660,20 @@ static uint16_t multifd_send_page(RAMBlock *block, ram_addr_t offset,
> }
>
mostly same comments as the sent side:
> struct MultiFDRecvParams {
> + /* not changed */
?
> uint8_t id;
> char *name;
> QemuThread thread;
> QIOChannel *c;
> + QemuSemaphore ready;
> QemuSemaphore sem;
> QemuMutex mutex;
> + /* proteced by param mutex */
Typo: 't'
> bool quit;
> + /* how many patches has sent this channel */
s/patches/packets/
> + uint32_t packets_recv;
> + multifd_pages_t *pages;
> + bool done;
> };
> typedef struct MultiFDRecvParams MultiFDRecvParams;
>
> @@ -676,6 +683,7 @@ struct {
> int count;
> /* Should we finish */
> bool quit;
> + multifd_pages_t *pages;
> } *multifd_recv_state;
>
> static void terminate_multifd_recv_threads(Error *errp)
> @@ -721,15 +729,51 @@ int multifd_load_cleanup(Error **errp)
> socket_recv_channel_unref(p->c);
> g_free(p->name);
> p->name = NULL;
> + multifd_pages_clear(p->pages);
> + p->pages = NULL;
> }
> g_free(multifd_recv_state->params);
> multifd_recv_state->params = NULL;
> + multifd_pages_clear(multifd_recv_state->pages);
> + multifd_recv_state->pages = NULL;
> g_free(multifd_recv_state);
> multifd_recv_state = NULL;
>
> return ret;
> }
>
> +static void *multifd_recv_thread(void *opaque)
> +{
> + MultiFDRecvParams *p = opaque;
> +
> + qemu_sem_post(&p->ready);
> + while (true) {
> + qemu_mutex_lock(&p->mutex);
> + if (p->quit) {
> + qemu_mutex_unlock(&p->mutex);
> + break;
> + }
> + if (p->pages->used) {
> + p->pages->used = 0;
> +
> + trace_multifd_recv(p->id, p->pages->seq, p->pages->used);
Same as on send, ->used is 0 here
> + /* ToDo: receive pages here */
> +
> + p->done = true;
> + p->packets_recv++;
> + qemu_mutex_unlock(&p->mutex);
> + qemu_sem_post(&p->ready);
> + continue;
> + }
> + qemu_mutex_unlock(&p->mutex);
> + qemu_sem_wait(&p->sem);
> + }
> + trace_multifd_recv_thread(p->id, p->packets_recv);
> +
> + return NULL;
> +}
> +
> int multifd_load_setup(void)
> {
> int thread_count;
> @@ -742,6 +786,8 @@ int multifd_load_setup(void)
> multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> multifd_recv_state->count = 0;
> multifd_recv_state->quit = false;
> + multifd_pages_init(&multifd_recv_state->pages,
> + migrate_multifd_page_count());
What happens if the migrate parameter for multifd-page-count is
different on source and dest?
> return 0;
> }
>
> @@ -752,7 +798,80 @@ int multifd_created_channels(void)
>
> void multifd_recv_new_channel(QIOChannel *ioc)
> {
> - socket_recv_channel_unref(ioc);
> + MultiFDRecvParams *p;
> + MultiFDInit_t msg;
> + Error *local_err = NULL;
> + size_t ret;
> +
> + ret = qio_channel_read_all(ioc, (char *)&msg, sizeof(msg), &local_err);
> + if (ret != 0) {
> + terminate_multifd_recv_threads(local_err);
> + return;
> + }
> +
> + if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
> + char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
> + error_setg(&local_err, "multifd: received uuid '%s' and expected "
> + "uuid '%s' for channel %hhd", msg.uuid, uuid, msg.id);
> + g_free(uuid);
> + terminate_multifd_recv_threads(local_err);
> + return;
> + }
> +
> + p = &multifd_recv_state->params[msg.id];
msg.id needs to be checked against the size of params (thread_count ?)
before that array index.
> + if (p->id != 0) {
> + error_setg(&local_err, "multifd: received id '%d' already setup'",
> + msg.id);
> + terminate_multifd_recv_threads(local_err);
> + return;
> + }
> + qemu_mutex_init(&p->mutex);
> + qemu_sem_init(&p->sem, 0);
> + qemu_sem_init(&p->ready, 0);
> + p->quit = false;
> + p->id = msg.id;
> + p->done = false;
> + multifd_pages_init(&p->pages, migrate_multifd_page_count());
> + p->c = ioc;
> + multifd_recv_state->count++;
> + p->name = g_strdup_printf("multifdrecv_%d", msg.id);
> + socket_recv_channel_ref(ioc);
> +
> + qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> + QEMU_THREAD_JOINABLE);
> + if (multifd_recv_state->count == migrate_multifd_channels()) {
> + migration_incoming_process();
> + }
> +}
> +
> +static void multifd_recv_page(RAMBlock *block, ram_addr_t offset,
> + uint8_t *address, uint16_t fd_num)
> +{
> + int thread_count;
> + MultiFDRecvParams *p;
> + multifd_pages_t *pages = multifd_recv_state->pages;
> +
> + pages->iov[pages->used].iov_base = address;
> + pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
> + pages->used++;
> +
> + if (fd_num == MULTIFD_CONTINUE) {
> + return;
> + }
> +
> + thread_count = migrate_multifd_channels();
> + assert(fd_num < thread_count);
> + p = &multifd_recv_state->params[fd_num];
> +
> + qemu_sem_wait(&p->ready);
> +
> + qemu_mutex_lock(&p->mutex);
> + p->done = false;
> + p->pages->used = 0;
> + multifd_recv_state->pages = p->pages;
> + p->pages = pages;
This needs a comment; I'm a bit confused - too many things called
'pages' to know what is going where. I think you've just waited for
one of the threads to get data, then passed the received data
out into multifd_recv_state? But then the last p->pages = pages I'm
not sure.
> + qemu_mutex_unlock(&p->mutex);
> + qemu_sem_post(&p->sem);
> }
>
> /**
> @@ -3042,6 +3161,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
> }
>
> while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
> + RAMBlock *block = NULL;
> ram_addr_t addr, total_ram_bytes;
> void *host = NULL;
> uint16_t fd_num;
> @@ -3066,7 +3186,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
> if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
> RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
> RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> - RAMBlock *block = ram_block_from_stream(f, flags);
> + block = ram_block_from_stream(f, flags);
>
> host = host_from_ram_block_offset(block, addr);
> if (!host) {
> @@ -3157,10 +3277,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>
> case RAM_SAVE_FLAG_MULTIFD_PAGE:
> fd_num = qemu_get_be16(f);
> - if (fd_num != 0) {
> - /* this is yet an unused variable, changed later */
> - fd_num = fd_num;
> - }
> + multifd_recv_page(block, addr, host, fd_num);
> qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
> break;
>
> diff --git a/migration/socket.c b/migration/socket.c
> index aedabee8a1..ba23442175 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -192,6 +192,9 @@ out:
> if (migration_has_all_channels()) {
> /* Close listening socket as its no longer needed */
> qio_channel_close(ioc, NULL);
> + if (!migrate_use_multifd()) {
> + migration_incoming_process();
> + }
> return G_SOURCE_REMOVE;
> } else {
> return G_SOURCE_CONTINUE;
> diff --git a/migration/trace-events b/migration/trace-events
> index 61ee21a13e..06aef4d4a3 100644
> --- a/migration/trace-events
> +++ b/migration/trace-events
> @@ -77,7 +77,9 @@ ram_load_postcopy_loop(uint64_t addr, int flags) "@%" PRIx64 " %x"
> ram_postcopy_send_discard_bitmap(void) ""
> ram_save_page(const char *rbname, uint64_t offset, void *host) "%s: offset: 0x%" PRIx64 " host: %p"
> ram_save_queue_pages(const char *rbname, size_t start, size_t len) "%s: start: 0x%zx len: 0x%zx"
> +multifd_recv(char id, int seq, int num) "channel %d sequence %d num pages %d"
> multifd_send(char id, int seq, int num) "channel %d sequence %d num pages %d"
> +multifd_recv_thread(char id, uint32_t packets) "channel %d packets %d"
> multifd_send_thread(char id, uint32_t packets) "channel %d packets %d"
>
> # migration/migration.c
> --
> 2.14.3
>
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
next prev parent reply other threads:[~2018-01-24 13:35 UTC|newest]
Thread overview: 26+ messages / expand[flat|nested] mbox.gz Atom feed top
2018-01-10 12:47 [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 01/14] migration: Make migrate_fd_error() the owner of the Error Juan Quintela
2018-01-12 18:50 ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 02/14] migration: Rename initial_bytes Juan Quintela
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 03/14] migration: Drop current address parameter from save_zero_page() Juan Quintela
2018-01-12 18:56 ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work Juan Quintela
2018-01-22 7:00 ` Peter Xu
2018-01-23 19:52 ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 05/14] migration: Create ram_multifd_page Juan Quintela
2018-01-23 20:16 ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 06/14] migration: Send the fd number which we are going to use for this page Juan Quintela
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 07/14] migration: Create thread infrastructure for multifd recv side Juan Quintela
2018-01-24 13:34 ` Dr. David Alan Gilbert [this message]
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 08/14] migration: Transfer pages over new channels Juan Quintela
2018-01-24 13:46 ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 09/14] migration: Flush receive queue Juan Quintela
2018-01-24 14:12 ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 10/14] migration: Add multifd test Juan Quintela
2018-01-24 14:23 ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 11/14] LOCAL: use trace events for migration-test Juan Quintela
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 12/14] migration: Sent the page list over the normal thread Juan Quintela
2018-01-24 14:29 ` Dr. David Alan Gilbert
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 13/14] migration: Add multifd_send_packet trace Juan Quintela
2018-01-10 12:47 ` [Qemu-devel] [PATCH v10 14/14] all works Juan Quintela
2018-01-10 15:01 ` [Qemu-devel] [RFC 00/14] Multifd Juan Quintela
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20180124133458.GA2497@work-vm \
--to=dgilbert@redhat.com \
--cc=lvivier@redhat.com \
--cc=peterx@redhat.com \
--cc=qemu-devel@nongnu.org \
--cc=quintela@redhat.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.