public inbox for kvm@vger.kernel.org
 help / color / mirror / Atom feed
From: Paolo Bonzini <pbonzini@redhat.com>
To: Umesh Deshpande <udeshpan@redhat.com>
Cc: kvm@vger.kernel.org, quintela@redhat.com, mtosatti@redhat.com
Subject: Re: [RFC PATCH v3 1/4] separate thread for VM migration
Date: Thu, 11 Aug 2011 18:18:54 +0200	[thread overview]
Message-ID: <4E4400EE.1030905@redhat.com> (raw)
In-Reply-To: <6ac256e1f481ea28678bae846a13714302f258db.1313076455.git.udeshpan@redhat.com>

On 08/11/2011 05:32 PM, Umesh Deshpande wrote:
> This patch creates a separate thread for the guest migration on the source side.
> migrate_cancel request from the iothread is handled asynchronously. That is,
> iothread submits migrate_cancel to the migration thread and returns, while the
> migration thread attends this request at the next iteration to terminate its
> execution.

Looks pretty good!  I hope you agree. :)  Just one note inside.

> Signed-off-by: Umesh Deshpande<udeshpan@redhat.com>
> ---
>   buffered_file.c |   85 ++++++++++++++++++++++++++++++++----------------------
>   buffered_file.h |    4 ++
>   migration.c     |   49 ++++++++++++++-----------------
>   migration.h     |    6 ++++
>   4 files changed, 82 insertions(+), 62 deletions(-)
>
> diff --git a/buffered_file.c b/buffered_file.c
> index 41b42c3..19932b6 100644
> --- a/buffered_file.c
> +++ b/buffered_file.c
> @@ -16,6 +16,8 @@
>   #include "qemu-timer.h"
>   #include "qemu-char.h"
>   #include "buffered_file.h"
> +#include "migration.h"
> +#include "qemu-thread.h"
>
>   //#define DEBUG_BUFFERED_FILE
>
> @@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered
>       void *opaque;
>       QEMUFile *file;
>       int has_error;
> +    int closed;
>       int freeze_output;
>       size_t bytes_xfer;
>       size_t xfer_limit;
>       uint8_t *buffer;
>       size_t buffer_size;
>       size_t buffer_capacity;
> -    QEMUTimer *timer;
> +    QemuThread thread;
>   } QEMUFileBuffered;
>
>   #ifdef DEBUG_BUFFERED_FILE
> @@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in
>           offset = size;
>       }
>
> -    if (pos == 0&&  size == 0) {
> -        DPRINTF("file is ready\n");
> -        if (s->bytes_xfer<= s->xfer_limit) {
> -            DPRINTF("notifying client\n");
> -            s->put_ready(s->opaque);
> -        }
> -    }
> -
>       return offset;
>   }
>
> @@ -175,20 +170,20 @@ static int buffered_close(void *opaque)
>
>       while (!s->has_error&&  s->buffer_size) {
>           buffered_flush(s);
> -        if (s->freeze_output)
> +        if (s->freeze_output) {
>               s->wait_for_unfreeze(s);
> +        }
>       }

This is racy; you might end up calling buffered_put_buffer twice from 
two different threads.

> -    ret = s->close(s->opaque);
> +    s->closed = 1;
>
> -    qemu_del_timer(s->timer);
> -    qemu_free_timer(s->timer);
> +    ret = s->close(s->opaque);
>       qemu_free(s->buffer);
> -    qemu_free(s);

... similarly, here the migration thread might end up using the buffer.
Just set s->closed here and wait for thread completion; the migration
thread can handle the flushes, free the buffer, etc.  Let the migration
thread do as much as possible; it will simplify your life.

>       return ret;
>   }
>
> +
>   static int buffered_rate_limit(void *opaque)
>   {
>       QEMUFileBuffered *s = opaque;
> @@ -228,34 +223,55 @@ static int64_t buffered_get_rate_limit(void *opaque)
>       return s->xfer_limit;
>   }
>
> -static void buffered_rate_tick(void *opaque)
> +static void *migrate_vm(void *opaque)
>   {
>       QEMUFileBuffered *s = opaque;
> +    int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100;
> +    struct timeval tv = { .tv_sec = 0, .tv_usec = 100000};
>
> -    if (s->has_error) {
> -        buffered_close(s);
> -        return;
> -    }
> +    qemu_mutex_lock_iothread();
>
> -    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
> +    while (!s->closed) {

... This can in fact be

     while (!s->closed || s->buffer_size)

and that alone will subsume the loop in buffered_close, no?

> +        if (s->freeze_output) {
> +            s->wait_for_unfreeze(s);
> +            s->freeze_output = 0;
> +            continue;
> +        }
>
> -    if (s->freeze_output)
> -        return;
> +        if (s->has_error) {
> +            break;
> +        }
> +
> +        current_time = qemu_get_clock_ms(rt_clock);
> +        if (!s->closed&&  (expire_time>  current_time)) {
> +            tv.tv_usec = 1000 * (expire_time - current_time);
> +            select(0, NULL, NULL, NULL,&tv);
> +            continue;
> +        }
>
> -    s->bytes_xfer = 0;
> +        s->bytes_xfer = 0;
> +        buffered_flush(s);
>
> -    buffered_flush(s);
> +        expire_time = qemu_get_clock_ms(rt_clock) + 100;
> +        s->put_ready(s->opaque);
> +    }
>
> -    /* Add some checks around this */
> -    s->put_ready(s->opaque);
> +    if (s->has_error) {
> +        buffered_close(s);
> +    }
> +    qemu_free(s);
> +
> +    qemu_mutex_unlock_iothread();
> +
> +    return NULL;
>   }
>
>   QEMUFile *qemu_fopen_ops_buffered(void *opaque,
> -                                  size_t bytes_per_sec,
> -                                  BufferedPutFunc *put_buffer,
> -                                  BufferedPutReadyFunc *put_ready,
> -                                  BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
> -                                  BufferedCloseFunc *close)
> +        size_t bytes_per_sec,
> +        BufferedPutFunc *put_buffer,
> +        BufferedPutReadyFunc *put_ready,
> +        BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
> +        BufferedCloseFunc *close)
>   {
>       QEMUFileBuffered *s;
>
> @@ -267,15 +283,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque,
>       s->put_ready = put_ready;
>       s->wait_for_unfreeze = wait_for_unfreeze;
>       s->close = close;
> +    s->closed = 0;
>
>       s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
>                                buffered_close, buffered_rate_limit,
>                                buffered_set_rate_limit,
> -			     buffered_get_rate_limit);
> -
> -    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
> +                             buffered_get_rate_limit);
>
> -    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
> +    qemu_thread_create(&s->thread, migrate_vm, s);
>
>       return s->file;
>   }
> diff --git a/buffered_file.h b/buffered_file.h
> index 98d358b..477bf7c 100644
> --- a/buffered_file.h
> +++ b/buffered_file.h
> @@ -17,9 +17,13 @@
>   #include "hw/hw.h"
>
>   typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
> +typedef void (BufferedBeginFunc)(void *opaque);

Unused typedef.

>   typedef void (BufferedPutReadyFunc)(void *opaque);
>   typedef void (BufferedWaitForUnfreezeFunc)(void *opaque);
>   typedef int (BufferedCloseFunc)(void *opaque);
> +typedef void (BufferedWaitForCancelFunc)(void *opaque);
> +
> +void wait_for_cancel(void *opaque);

BufferedWaitForCancelFunc should go in patch 2; wait_for_cancel is unused.

>   QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit,
>                                     BufferedPutFunc *put_buffer,
> diff --git a/migration.c b/migration.c
> index af3a1f2..d8a0abb 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -284,8 +284,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
>   {
>       int ret = 0;
>
> -    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
> -
>       if (s->file) {
>           DPRINTF("closing file\n");
>           if (qemu_fclose(s->file) != 0) {
> @@ -307,14 +305,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
>       return ret;
>   }
>
> -void migrate_fd_put_notify(void *opaque)
> -{
> -    FdMigrationState *s = opaque;
> -
> -    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
> -    qemu_file_put_notify(s->file);
> -}
> -

qemu_file_put_notify is also unused now.

>   ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
>   {
>       FdMigrationState *s = opaque;
> @@ -327,9 +317,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
>       if (ret == -1)
>           ret = -(s->get_error(s));
>
> -    if (ret == -EAGAIN) {
> -        qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
> -    } else if (ret<  0) {
> +    if (ret<  0&&  ret != -EAGAIN) {
>           if (s->mon) {
>               monitor_resume(s->mon);
>           }
> @@ -342,36 +330,40 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
>
>   void migrate_fd_connect(FdMigrationState *s)
>   {
> -    int ret;
> -
> +    s->begin = 1;
>       s->file = qemu_fopen_ops_buffered(s,
>                                         s->bandwidth_limit,
>                                         migrate_fd_put_buffer,
>                                         migrate_fd_put_ready,
>                                         migrate_fd_wait_for_unfreeze,
>                                         migrate_fd_close);
> -
> -    DPRINTF("beginning savevm\n");
> -    ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
> -                                  s->mig_state.shared);
> -    if (ret<  0) {
> -        DPRINTF("failed, %d\n", ret);
> -        migrate_fd_error(s);
> -        return;
> -    }
> -
> -    migrate_fd_put_ready(s);
>   }
>
>   void migrate_fd_put_ready(void *opaque)
>   {
>       FdMigrationState *s = opaque;
> +    int ret;
>
>       if (s->state != MIG_STATE_ACTIVE) {
>           DPRINTF("put_ready returning because of non-active state\n");
> +        if (s->state == MIG_STATE_CANCELLED) {
> +            migrate_fd_terminate(s);
> +        }
>           return;
>       }
>
> +    if (s->begin) {
> +        DPRINTF("beginning savevm\n");
> +        ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
> +                s->mig_state.shared);
> +        if (ret<  0) {
> +            DPRINTF("failed, %d\n", ret);
> +            migrate_fd_error(s);
> +            return;
> +        }
> +        s->begin = 0;
> +    }
> +
>       DPRINTF("iterate\n");
>       if (qemu_savevm_state_iterate(s->mon, s->file) == 1) {
>           int state;
> @@ -415,6 +407,10 @@ void migrate_fd_cancel(MigrationState *mig_state)
>       DPRINTF("cancelling migration\n");
>
>       s->state = MIG_STATE_CANCELLED;
> +}
> +
> +void migrate_fd_terminate(FdMigrationState *s)
> +{
>       notifier_list_notify(&migration_state_notifiers);
>       qemu_savevm_state_cancel(s->mon, s->file);
>
> @@ -458,7 +454,6 @@ int migrate_fd_close(void *opaque)
>   {
>       FdMigrationState *s = opaque;
>
> -    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
>       return s->close(s);
>   }
>
> diff --git a/migration.h b/migration.h
> index 050c56c..887f84c 100644
> --- a/migration.h
> +++ b/migration.h
> @@ -45,9 +45,11 @@ struct FdMigrationState
>       int fd;
>       Monitor *mon;
>       int state;
> +    int begin;
>       int (*get_error)(struct FdMigrationState*);
>       int (*close)(struct FdMigrationState*);
>       int (*write)(struct FdMigrationState*, const void *, size_t);
> +    void (*callback)(void *);
>       void *opaque;
>   };
>
> @@ -118,12 +120,16 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
>
>   void migrate_fd_connect(FdMigrationState *s);
>
> +void migrate_fd_begin(void *opaque);
> +
>   void migrate_fd_put_ready(void *opaque);
>
>   int migrate_fd_get_status(MigrationState *mig_state);
>
>   void migrate_fd_cancel(MigrationState *mig_state);
>
> +void migrate_fd_terminate(FdMigrationState *s);
> +
>   void migrate_fd_release(MigrationState *mig_state);
>
>   void migrate_fd_wait_for_unfreeze(void *opaque);


  reply	other threads:[~2011-08-11 16:18 UTC|newest]

Thread overview: 19+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2011-08-11 15:32 [RFC PATCH v3 0/4] Separate thread for VM migration Umesh Deshpande
2011-08-11 15:32 ` [RFC PATCH v3 1/4] separate " Umesh Deshpande
2011-08-11 16:18   ` Paolo Bonzini [this message]
2011-08-11 17:36     ` Umesh Deshpande
2011-08-12  6:40       ` Paolo Bonzini
2011-08-11 15:32 ` [RFC PATCH v3 2/4] Making iothread block for migrate_cancel Umesh Deshpande
2011-08-11 15:32 ` [RFC PATCH v3 3/4] lock to protect memslots Umesh Deshpande
2011-08-11 16:20   ` Paolo Bonzini
2011-08-12  6:45     ` Paolo Bonzini
2011-08-15  6:45       ` Umesh Deshpande
2011-08-15 14:10         ` Paolo Bonzini
2011-08-15  7:26       ` Marcelo Tosatti
2011-08-15 14:14         ` Paolo Bonzini
2011-08-15 20:27           ` Umesh Deshpande
2011-08-16  6:15             ` Paolo Bonzini
2011-08-16  7:56               ` Paolo Bonzini
2011-08-11 15:32 ` [RFC PATCH v3 4/4] Separate migration bitmap Umesh Deshpande
2011-08-11 16:23 ` [RFC PATCH v3 0/4] Separate thread for VM migration Paolo Bonzini
2011-08-11 18:25 ` Anthony Liguori

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=4E4400EE.1030905@redhat.com \
    --to=pbonzini@redhat.com \
    --cc=kvm@vger.kernel.org \
    --cc=mtosatti@redhat.com \
    --cc=quintela@redhat.com \
    --cc=udeshpan@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox