From mboxrd@z Thu Jan 1 00:00:00 1970 From: Paolo Bonzini Subject: Re: [RFC PATCH v3 1/4] separate thread for VM migration Date: Thu, 11 Aug 2011 18:18:54 +0200 Message-ID: <4E4400EE.1030905@redhat.com> References: <6ac256e1f481ea28678bae846a13714302f258db.1313076455.git.udeshpan@redhat.com> Mime-Version: 1.0 Content-Type: text/plain; charset=ISO-8859-1; format=flowed Content-Transfer-Encoding: 7bit Cc: kvm@vger.kernel.org, quintela@redhat.com, mtosatti@redhat.com To: Umesh Deshpande Return-path: Received: from mx1.redhat.com ([209.132.183.28]:52406 "EHLO mx1.redhat.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1751251Ab1HKQS5 (ORCPT ); Thu, 11 Aug 2011 12:18:57 -0400 Received: from int-mx02.intmail.prod.int.phx2.redhat.com (int-mx02.intmail.prod.int.phx2.redhat.com [10.5.11.12]) by mx1.redhat.com (8.14.4/8.14.4) with ESMTP id p7BGIvds017513 (version=TLSv1/SSLv3 cipher=DHE-RSA-AES256-SHA bits=256 verify=OK) for ; Thu, 11 Aug 2011 12:18:57 -0400 In-Reply-To: <6ac256e1f481ea28678bae846a13714302f258db.1313076455.git.udeshpan@redhat.com> Sender: kvm-owner@vger.kernel.org List-ID: On 08/11/2011 05:32 PM, Umesh Deshpande wrote: > This patch creates a separate thread for the guest migration on the source side. > migrate_cancel request from the iothread is handled asynchronously. That is, > iothread submits migrate_cancel to the migration thread and returns, while the > migration thread attends this request at the next iteration to terminate its > execution. Looks pretty good! I hope you agree. :) Just one note inside. 
> Signed-off-by: Umesh Deshpande > --- > buffered_file.c | 85 ++++++++++++++++++++++++++++++++---------------------- > buffered_file.h | 4 ++ > migration.c | 49 ++++++++++++++----------------- > migration.h | 6 ++++ > 4 files changed, 82 insertions(+), 62 deletions(-) > > diff --git a/buffered_file.c b/buffered_file.c > index 41b42c3..19932b6 100644 > --- a/buffered_file.c > +++ b/buffered_file.c > @@ -16,6 +16,8 @@ > #include "qemu-timer.h" > #include "qemu-char.h" > #include "buffered_file.h" > +#include "migration.h" > +#include "qemu-thread.h" > > //#define DEBUG_BUFFERED_FILE > > @@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered > void *opaque; > QEMUFile *file; > int has_error; > + int closed; > int freeze_output; > size_t bytes_xfer; > size_t xfer_limit; > uint8_t *buffer; > size_t buffer_size; > size_t buffer_capacity; > - QEMUTimer *timer; > + QemuThread thread; > } QEMUFileBuffered; > > #ifdef DEBUG_BUFFERED_FILE > @@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in > offset = size; > } > > - if (pos == 0&& size == 0) { > - DPRINTF("file is ready\n"); > - if (s->bytes_xfer<= s->xfer_limit) { > - DPRINTF("notifying client\n"); > - s->put_ready(s->opaque); > - } > - } > - > return offset; > } > > @@ -175,20 +170,20 @@ static int buffered_close(void *opaque) > > while (!s->has_error&& s->buffer_size) { > buffered_flush(s); > - if (s->freeze_output) > + if (s->freeze_output) { > s->wait_for_unfreeze(s); > + } > } This is racy; you might end up calling buffered_put_buffer twice from two different threads. > - ret = s->close(s->opaque); > + s->closed = 1; > > - qemu_del_timer(s->timer); > - qemu_free_timer(s->timer); > + ret = s->close(s->opaque); > qemu_free(s->buffer); > - qemu_free(s); ... similarly, here the migration thread might end up using the buffer. Just set s->closed here and wait for thread completion; the migration thread can handle the flushes free the buffer etc. 
Let the migration thread do as much as possible; it will simplify your life. > return ret; > } > > + > static int buffered_rate_limit(void *opaque) > { > QEMUFileBuffered *s = opaque; > @@ -228,34 +223,55 @@ static int64_t buffered_get_rate_limit(void *opaque) > return s->xfer_limit; > } > > -static void buffered_rate_tick(void *opaque) > +static void *migrate_vm(void *opaque) > { > QEMUFileBuffered *s = opaque; > + int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100; > + struct timeval tv = { .tv_sec = 0, .tv_usec = 100000}; > > - if (s->has_error) { > - buffered_close(s); > - return; > - } > + qemu_mutex_lock_iothread(); > > - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); > + while (!s->closed) { ... This can in fact be while (!s->closed || s->buffer_size) and that alone will subsume the loop in buffered_close, no? > + if (s->freeze_output) { > + s->wait_for_unfreeze(s); > + s->freeze_output = 0; > + continue; > + } > > - if (s->freeze_output) > - return; > + if (s->has_error) { > + break; > + } > + > + current_time = qemu_get_clock_ms(rt_clock); > + if (!s->closed&& (expire_time> current_time)) { > + tv.tv_usec = 1000 * (expire_time - current_time); > + select(0, NULL, NULL, NULL,&tv); > + continue; > + } > > - s->bytes_xfer = 0; > + s->bytes_xfer = 0; > + buffered_flush(s); > > - buffered_flush(s); > + expire_time = qemu_get_clock_ms(rt_clock) + 100; > + s->put_ready(s->opaque); > + } > > - /* Add some checks around this */ > - s->put_ready(s->opaque); > + if (s->has_error) { > + buffered_close(s); > + } > + qemu_free(s); > + > + qemu_mutex_unlock_iothread(); > + > + return NULL; > } > > QEMUFile *qemu_fopen_ops_buffered(void *opaque, > - size_t bytes_per_sec, > - BufferedPutFunc *put_buffer, > - BufferedPutReadyFunc *put_ready, > - BufferedWaitForUnfreezeFunc *wait_for_unfreeze, > - BufferedCloseFunc *close) > + size_t bytes_per_sec, > + BufferedPutFunc *put_buffer, > + BufferedPutReadyFunc *put_ready, > + 
BufferedWaitForUnfreezeFunc *wait_for_unfreeze, > + BufferedCloseFunc *close) > { > QEMUFileBuffered *s; > > @@ -267,15 +283,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque, > s->put_ready = put_ready; > s->wait_for_unfreeze = wait_for_unfreeze; > s->close = close; > + s->closed = 0; > > s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL, > buffered_close, buffered_rate_limit, > buffered_set_rate_limit, > - buffered_get_rate_limit); > - > - s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s); > + buffered_get_rate_limit); > > - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); > + qemu_thread_create(&s->thread, migrate_vm, s); > > return s->file; > } > diff --git a/buffered_file.h b/buffered_file.h > index 98d358b..477bf7c 100644 > --- a/buffered_file.h > +++ b/buffered_file.h > @@ -17,9 +17,13 @@ > #include "hw/hw.h" > > typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size); > +typedef void (BufferedBeginFunc)(void *opaque); Unused typedef. > typedef void (BufferedPutReadyFunc)(void *opaque); > typedef void (BufferedWaitForUnfreezeFunc)(void *opaque); > typedef int (BufferedCloseFunc)(void *opaque); > +typedef void (BufferedWaitForCancelFunc)(void *opaque); > + > +void wait_for_cancel(void *opaque); BufferedWaitForCancelFunc should go in patch 2; wait_for_cancel is unused. 
> QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit, > BufferedPutFunc *put_buffer, > diff --git a/migration.c b/migration.c > index af3a1f2..d8a0abb 100644 > --- a/migration.c > +++ b/migration.c > @@ -284,8 +284,6 @@ int migrate_fd_cleanup(FdMigrationState *s) > { > int ret = 0; > > - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); > - > if (s->file) { > DPRINTF("closing file\n"); > if (qemu_fclose(s->file) != 0) { > @@ -307,14 +305,6 @@ int migrate_fd_cleanup(FdMigrationState *s) > return ret; > } > > -void migrate_fd_put_notify(void *opaque) > -{ > - FdMigrationState *s = opaque; > - > - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); > - qemu_file_put_notify(s->file); > -} > - qemu_file_put_notify is also unused now. > ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) > { > FdMigrationState *s = opaque; > @@ -327,9 +317,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) > if (ret == -1) > ret = -(s->get_error(s)); > > - if (ret == -EAGAIN) { > - qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s); > - } else if (ret< 0) { > + if (ret< 0&& ret != -EAGAIN) { > if (s->mon) { > monitor_resume(s->mon); > } > @@ -342,36 +330,40 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) > > void migrate_fd_connect(FdMigrationState *s) > { > - int ret; > - > + s->begin = 1; > s->file = qemu_fopen_ops_buffered(s, > s->bandwidth_limit, > migrate_fd_put_buffer, > migrate_fd_put_ready, > migrate_fd_wait_for_unfreeze, > migrate_fd_close); > - > - DPRINTF("beginning savevm\n"); > - ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, > - s->mig_state.shared); > - if (ret< 0) { > - DPRINTF("failed, %d\n", ret); > - migrate_fd_error(s); > - return; > - } > - > - migrate_fd_put_ready(s); > } > > void migrate_fd_put_ready(void *opaque) > { > FdMigrationState *s = opaque; > + int ret; > > if (s->state != MIG_STATE_ACTIVE) { > DPRINTF("put_ready returning 
because of non-active state\n"); > + if (s->state == MIG_STATE_CANCELLED) { > + migrate_fd_terminate(s); > + } > return; > } > > + if (s->begin) { > + DPRINTF("beginning savevm\n"); > + ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, > + s->mig_state.shared); > + if (ret< 0) { > + DPRINTF("failed, %d\n", ret); > + migrate_fd_error(s); > + return; > + } > + s->begin = 0; > + } > + > DPRINTF("iterate\n"); > if (qemu_savevm_state_iterate(s->mon, s->file) == 1) { > int state; > @@ -415,6 +407,10 @@ void migrate_fd_cancel(MigrationState *mig_state) > DPRINTF("cancelling migration\n"); > > s->state = MIG_STATE_CANCELLED; > +} > + > +void migrate_fd_terminate(FdMigrationState *s) > +{ > notifier_list_notify(&migration_state_notifiers); > qemu_savevm_state_cancel(s->mon, s->file); > > @@ -458,7 +454,6 @@ int migrate_fd_close(void *opaque) > { > FdMigrationState *s = opaque; > > - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); > return s->close(s); > } > > diff --git a/migration.h b/migration.h > index 050c56c..887f84c 100644 > --- a/migration.h > +++ b/migration.h > @@ -45,9 +45,11 @@ struct FdMigrationState > int fd; > Monitor *mon; > int state; > + int begin; > int (*get_error)(struct FdMigrationState*); > int (*close)(struct FdMigrationState*); > int (*write)(struct FdMigrationState*, const void *, size_t); > + void (*callback)(void *); > void *opaque; > }; > > @@ -118,12 +120,16 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size); > > void migrate_fd_connect(FdMigrationState *s); > > +void migrate_fd_begin(void *opaque); > + > void migrate_fd_put_ready(void *opaque); > > int migrate_fd_get_status(MigrationState *mig_state); > > void migrate_fd_cancel(MigrationState *mig_state); > > +void migrate_fd_terminate(FdMigrationState *s); > + > void migrate_fd_release(MigrationState *mig_state); > > void migrate_fd_wait_for_unfreeze(void *opaque);