From: Paolo Bonzini <pbonzini@redhat.com>
To: Umesh Deshpande <udeshpan@redhat.com>
Cc: kvm@vger.kernel.org, quintela@redhat.com, mtosatti@redhat.com
Subject: Re: [RFC PATCH v3 1/4] separate thread for VM migration
Date: Thu, 11 Aug 2011 18:18:54 +0200 [thread overview]
Message-ID: <4E4400EE.1030905@redhat.com> (raw)
In-Reply-To: <6ac256e1f481ea28678bae846a13714302f258db.1313076455.git.udeshpan@redhat.com>
On 08/11/2011 05:32 PM, Umesh Deshpande wrote:
> This patch creates a separate thread for the guest migration on the source side.
> migrate_cancel request from the iothread is handled asynchronously. That is,
> iothread submits migrate_cancel to the migration thread and returns, while the
> migration thread attends this request at the next iteration to terminate its
> execution.
Looks pretty good! I hope you agree. :) Just a few notes inside.
> Signed-off-by: Umesh Deshpande<udeshpan@redhat.com>
> ---
> buffered_file.c | 85 ++++++++++++++++++++++++++++++++----------------------
> buffered_file.h | 4 ++
> migration.c | 49 ++++++++++++++-----------------
> migration.h | 6 ++++
> 4 files changed, 82 insertions(+), 62 deletions(-)
>
> diff --git a/buffered_file.c b/buffered_file.c
> index 41b42c3..19932b6 100644
> --- a/buffered_file.c
> +++ b/buffered_file.c
> @@ -16,6 +16,8 @@
> #include "qemu-timer.h"
> #include "qemu-char.h"
> #include "buffered_file.h"
> +#include "migration.h"
> +#include "qemu-thread.h"
>
> //#define DEBUG_BUFFERED_FILE
>
> @@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered
> void *opaque;
> QEMUFile *file;
> int has_error;
> + int closed;
> int freeze_output;
> size_t bytes_xfer;
> size_t xfer_limit;
> uint8_t *buffer;
> size_t buffer_size;
> size_t buffer_capacity;
> - QEMUTimer *timer;
> + QemuThread thread;
> } QEMUFileBuffered;
>
> #ifdef DEBUG_BUFFERED_FILE
> @@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in
> offset = size;
> }
>
> - if (pos == 0&& size == 0) {
> - DPRINTF("file is ready\n");
> - if (s->bytes_xfer<= s->xfer_limit) {
> - DPRINTF("notifying client\n");
> - s->put_ready(s->opaque);
> - }
> - }
> -
> return offset;
> }
>
> @@ -175,20 +170,20 @@ static int buffered_close(void *opaque)
>
> while (!s->has_error&& s->buffer_size) {
> buffered_flush(s);
> - if (s->freeze_output)
> + if (s->freeze_output) {
> s->wait_for_unfreeze(s);
> + }
> }
This is racy: you might end up calling buffered_put_buffer concurrently
from two different threads (the caller of buffered_close and the migration thread).
> - ret = s->close(s->opaque);
> + s->closed = 1;
>
> - qemu_del_timer(s->timer);
> - qemu_free_timer(s->timer);
> + ret = s->close(s->opaque);
> qemu_free(s->buffer);
> - qemu_free(s);
... Similarly, the migration thread might still be using the buffer after
it is freed here. Just set s->closed here and wait for the thread to
complete; the migration thread can handle the flushes, free the buffer,
etc. Let the migration thread do as much as possible; it will simplify your life.
> return ret;
> }
>
> +
> static int buffered_rate_limit(void *opaque)
> {
> QEMUFileBuffered *s = opaque;
> @@ -228,34 +223,55 @@ static int64_t buffered_get_rate_limit(void *opaque)
> return s->xfer_limit;
> }
>
> -static void buffered_rate_tick(void *opaque)
> +static void *migrate_vm(void *opaque)
> {
> QEMUFileBuffered *s = opaque;
> + int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100;
> + struct timeval tv = { .tv_sec = 0, .tv_usec = 100000};
>
> - if (s->has_error) {
> - buffered_close(s);
> - return;
> - }
> + qemu_mutex_lock_iothread();
>
> - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
> + while (!s->closed) {
... This can in fact be
while (!s->closed || s->buffer_size)
and that alone will subsume the loop in buffered_close, no?
> + if (s->freeze_output) {
> + s->wait_for_unfreeze(s);
> + s->freeze_output = 0;
> + continue;
> + }
>
> - if (s->freeze_output)
> - return;
> + if (s->has_error) {
> + break;
> + }
> +
> + current_time = qemu_get_clock_ms(rt_clock);
> + if (!s->closed&& (expire_time> current_time)) {
> + tv.tv_usec = 1000 * (expire_time - current_time);
> + select(0, NULL, NULL, NULL,&tv);
> + continue;
> + }
>
> - s->bytes_xfer = 0;
> + s->bytes_xfer = 0;
> + buffered_flush(s);
>
> - buffered_flush(s);
> + expire_time = qemu_get_clock_ms(rt_clock) + 100;
> + s->put_ready(s->opaque);
> + }
>
> - /* Add some checks around this */
> - s->put_ready(s->opaque);
> + if (s->has_error) {
> + buffered_close(s);
> + }
> + qemu_free(s);
> +
> + qemu_mutex_unlock_iothread();
> +
> + return NULL;
> }
>
> QEMUFile *qemu_fopen_ops_buffered(void *opaque,
> - size_t bytes_per_sec,
> - BufferedPutFunc *put_buffer,
> - BufferedPutReadyFunc *put_ready,
> - BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
> - BufferedCloseFunc *close)
> + size_t bytes_per_sec,
> + BufferedPutFunc *put_buffer,
> + BufferedPutReadyFunc *put_ready,
> + BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
> + BufferedCloseFunc *close)
> {
> QEMUFileBuffered *s;
>
> @@ -267,15 +283,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque,
> s->put_ready = put_ready;
> s->wait_for_unfreeze = wait_for_unfreeze;
> s->close = close;
> + s->closed = 0;
>
> s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
> buffered_close, buffered_rate_limit,
> buffered_set_rate_limit,
> - buffered_get_rate_limit);
> -
> - s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
> + buffered_get_rate_limit);
>
> - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
> + qemu_thread_create(&s->thread, migrate_vm, s);
>
> return s->file;
> }
> diff --git a/buffered_file.h b/buffered_file.h
> index 98d358b..477bf7c 100644
> --- a/buffered_file.h
> +++ b/buffered_file.h
> @@ -17,9 +17,13 @@
> #include "hw/hw.h"
>
> typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
> +typedef void (BufferedBeginFunc)(void *opaque);
Unused typedef.
> typedef void (BufferedPutReadyFunc)(void *opaque);
> typedef void (BufferedWaitForUnfreezeFunc)(void *opaque);
> typedef int (BufferedCloseFunc)(void *opaque);
> +typedef void (BufferedWaitForCancelFunc)(void *opaque);
> +
> +void wait_for_cancel(void *opaque);
BufferedWaitForCancelFunc should go in patch 2; wait_for_cancel is unused.
> QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit,
> BufferedPutFunc *put_buffer,
> diff --git a/migration.c b/migration.c
> index af3a1f2..d8a0abb 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -284,8 +284,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
> {
> int ret = 0;
>
> - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
> -
> if (s->file) {
> DPRINTF("closing file\n");
> if (qemu_fclose(s->file) != 0) {
> @@ -307,14 +305,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
> return ret;
> }
>
> -void migrate_fd_put_notify(void *opaque)
> -{
> - FdMigrationState *s = opaque;
> -
> - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
> - qemu_file_put_notify(s->file);
> -}
> -
qemu_file_put_notify is also unused now and can be removed.
> ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
> {
> FdMigrationState *s = opaque;
> @@ -327,9 +317,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
> if (ret == -1)
> ret = -(s->get_error(s));
>
> - if (ret == -EAGAIN) {
> - qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
> - } else if (ret< 0) {
> + if (ret< 0&& ret != -EAGAIN) {
> if (s->mon) {
> monitor_resume(s->mon);
> }
> @@ -342,36 +330,40 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
>
> void migrate_fd_connect(FdMigrationState *s)
> {
> - int ret;
> -
> + s->begin = 1;
> s->file = qemu_fopen_ops_buffered(s,
> s->bandwidth_limit,
> migrate_fd_put_buffer,
> migrate_fd_put_ready,
> migrate_fd_wait_for_unfreeze,
> migrate_fd_close);
> -
> - DPRINTF("beginning savevm\n");
> - ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
> - s->mig_state.shared);
> - if (ret< 0) {
> - DPRINTF("failed, %d\n", ret);
> - migrate_fd_error(s);
> - return;
> - }
> -
> - migrate_fd_put_ready(s);
> }
>
> void migrate_fd_put_ready(void *opaque)
> {
> FdMigrationState *s = opaque;
> + int ret;
>
> if (s->state != MIG_STATE_ACTIVE) {
> DPRINTF("put_ready returning because of non-active state\n");
> + if (s->state == MIG_STATE_CANCELLED) {
> + migrate_fd_terminate(s);
> + }
> return;
> }
>
> + if (s->begin) {
> + DPRINTF("beginning savevm\n");
> + ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
> + s->mig_state.shared);
> + if (ret< 0) {
> + DPRINTF("failed, %d\n", ret);
> + migrate_fd_error(s);
> + return;
> + }
> + s->begin = 0;
> + }
> +
> DPRINTF("iterate\n");
> if (qemu_savevm_state_iterate(s->mon, s->file) == 1) {
> int state;
> @@ -415,6 +407,10 @@ void migrate_fd_cancel(MigrationState *mig_state)
> DPRINTF("cancelling migration\n");
>
> s->state = MIG_STATE_CANCELLED;
> +}
> +
> +void migrate_fd_terminate(FdMigrationState *s)
> +{
> notifier_list_notify(&migration_state_notifiers);
> qemu_savevm_state_cancel(s->mon, s->file);
>
> @@ -458,7 +454,6 @@ int migrate_fd_close(void *opaque)
> {
> FdMigrationState *s = opaque;
>
> - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
> return s->close(s);
> }
>
> diff --git a/migration.h b/migration.h
> index 050c56c..887f84c 100644
> --- a/migration.h
> +++ b/migration.h
> @@ -45,9 +45,11 @@ struct FdMigrationState
> int fd;
> Monitor *mon;
> int state;
> + int begin;
> int (*get_error)(struct FdMigrationState*);
> int (*close)(struct FdMigrationState*);
> int (*write)(struct FdMigrationState*, const void *, size_t);
> + void (*callback)(void *);
> void *opaque;
> };
>
> @@ -118,12 +120,16 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
>
> void migrate_fd_connect(FdMigrationState *s);
>
> +void migrate_fd_begin(void *opaque);
> +
> void migrate_fd_put_ready(void *opaque);
>
> int migrate_fd_get_status(MigrationState *mig_state);
>
> void migrate_fd_cancel(MigrationState *mig_state);
>
> +void migrate_fd_terminate(FdMigrationState *s);
> +
> void migrate_fd_release(MigrationState *mig_state);
>
> void migrate_fd_wait_for_unfreeze(void *opaque);
next prev parent reply other threads:[~2011-08-11 16:18 UTC|newest]
Thread overview: 19+ messages / expand[flat|nested] mbox.gz Atom feed top
2011-08-11 15:32 [RFC PATCH v3 0/4] Separate thread for VM migration Umesh Deshpande
2011-08-11 15:32 ` [RFC PATCH v3 1/4] separate " Umesh Deshpande
2011-08-11 16:18 ` Paolo Bonzini [this message]
2011-08-11 17:36 ` Umesh Deshpande
2011-08-12 6:40 ` Paolo Bonzini
2011-08-11 15:32 ` [RFC PATCH v3 2/4] Making iothread block for migrate_cancel Umesh Deshpande
2011-08-11 15:32 ` [RFC PATCH v3 3/4] lock to protect memslots Umesh Deshpande
2011-08-11 16:20 ` Paolo Bonzini
2011-08-12 6:45 ` Paolo Bonzini
2011-08-15 6:45 ` Umesh Deshpande
2011-08-15 14:10 ` Paolo Bonzini
2011-08-15 7:26 ` Marcelo Tosatti
2011-08-15 14:14 ` Paolo Bonzini
2011-08-15 20:27 ` Umesh Deshpande
2011-08-16 6:15 ` Paolo Bonzini
2011-08-16 7:56 ` Paolo Bonzini
2011-08-11 15:32 ` [RFC PATCH v3 4/4] Separate migration bitmap Umesh Deshpande
2011-08-11 16:23 ` [RFC PATCH v3 0/4] Separate thread for VM migration Paolo Bonzini
2011-08-11 18:25 ` Anthony Liguori
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=4E4400EE.1030905@redhat.com \
--to=pbonzini@redhat.com \
--cc=kvm@vger.kernel.org \
--cc=mtosatti@redhat.com \
--cc=quintela@redhat.com \
--cc=udeshpan@redhat.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.