From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from eggs.gnu.org ([2001:4830:134:3::10]:33464) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1XegPC-0004C3-48 for qemu-devel@nongnu.org; Thu, 16 Oct 2014 04:27:36 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1XegP5-0004fF-UZ for qemu-devel@nongnu.org; Thu, 16 Oct 2014 04:27:30 -0400 Received: from szxga03-in.huawei.com ([119.145.14.66]:62909) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1XegP4-0004bp-Ub for qemu-devel@nongnu.org; Thu, 16 Oct 2014 04:27:23 -0400 Message-ID: <543F814F.6030105@huawei.com> Date: Thu, 16 Oct 2014 16:26:55 +0800 From: zhanghailiang MIME-Version: 1.0 References: <1412358473-31398-1-git-send-email-dgilbert@redhat.com> <1412358473-31398-17-git-send-email-dgilbert@redhat.com> In-Reply-To: <1412358473-31398-17-git-send-email-dgilbert@redhat.com> Content-Type: text/plain; charset="windows-1252"; format=flowed Content-Transfer-Encoding: 7bit Subject: Re: [Qemu-devel] [PATCH v4 16/47] Return path: Source handling of return path List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: "Dr. David Alan Gilbert (git)" , qemu-devel@nongnu.org Cc: aarcange@redhat.com, yamahata@private.email.ne.jp, lilei@linux.vnet.ibm.com, quintela@redhat.com, cristian.klein@cs.umu.se, amit.shah@redhat.com, yanghy@cn.fujitsu.com On 2014/10/4 1:47, Dr. David Alan Gilbert (git) wrote: > From: "Dr. David Alan Gilbert" > > Open a return path, and handle messages that are received upon it. > > Signed-off-by: Dr. 
David Alan Gilbert > --- > include/migration/migration.h | 10 +++ > migration.c | 181 +++++++++++++++++++++++++++++++++++++++++- > 2 files changed, 190 insertions(+), 1 deletion(-) > > diff --git a/include/migration/migration.h b/include/migration/migration.h > index 12e640d..b87c289 100644 > --- a/include/migration/migration.h > +++ b/include/migration/migration.h > @@ -47,6 +47,14 @@ enum mig_rpcomm_cmd { > MIG_RPCOMM_ACK, /* data (seq: be32 ) */ > MIG_RPCOMM_AFTERLASTVALID > }; > + > +/* Source side RP state */ > +struct MigrationRetPathState { > + uint32_t latest_ack; > + QemuThread rp_thread; > + bool error; > +}; > + > typedef struct MigrationState MigrationState; > > /* State for the incoming migration */ > @@ -69,9 +77,11 @@ struct MigrationState > QemuThread thread; > QEMUBH *cleanup_bh; > QEMUFile *file; > + QEMUFile *return_path; > > int state; > MigrationParams params; > + struct MigrationRetPathState rp_state; > double mbps; > int64_t total_time; > int64_t downtime; > diff --git a/migration.c b/migration.c > index 5ba8f3e..ee6db1d 100644 > --- a/migration.c > +++ b/migration.c > @@ -246,6 +246,23 @@ MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp) > return head; > } > > +/* > + * Return true if we're already in the middle of a migration > + * (i.e. any of the active or setup states) > + */ > +static bool migration_already_active(MigrationState *ms) > +{ > + switch (ms->state) { > + case MIG_STATE_ACTIVE: > + case MIG_STATE_SETUP: > + return true; > + > + default: > + return false; > + > + } > +} > + > static void get_xbzrle_cache_stats(MigrationInfo *info) > { > if (migrate_use_xbzrle()) { > @@ -371,6 +388,21 @@ static void migrate_set_state(MigrationState *s, int old_state, int new_state) > } > } > > +static void migrate_fd_cleanup_src_rp(MigrationState *ms) > +{ > + QEMUFile *rp = ms->return_path; > + > + /* > + * When stuff goes wrong (e.g. 
failing destination) on the rp, it can get > + * cleaned up from a few threads; make sure not to do it twice in parallel > + */ > + rp = atomic_cmpxchg(&ms->return_path, rp, NULL); > + if (rp) { > + DPRINTF("cleaning up return path\n"); > + qemu_fclose(rp); > + } > +} > + > static void migrate_fd_cleanup(void *opaque) > { > MigrationState *s = opaque; > @@ -378,6 +410,8 @@ static void migrate_fd_cleanup(void *opaque) > qemu_bh_delete(s->cleanup_bh); > s->cleanup_bh = NULL; > > + migrate_fd_cleanup_src_rp(s); > + > if (s->file) { > trace_migrate_fd_cleanup(); > qemu_mutex_unlock_iothread(); > @@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s) > int old_state ; > trace_migrate_fd_cancel(); > > + if (s->return_path) { > + /* shutdown the rp socket, so causing the rp thread to shutdown */ > + qemu_file_shutdown(s->return_path); > + } > + > do { > old_state = s->state; > if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) { > @@ -655,8 +694,148 @@ int64_t migrate_xbzrle_cache_size(void) > return s->xbzrle_cache_size; > } > > -/* migration thread support */ > +/* > + * Something bad happened to the RP stream, mark an error > + * The caller shall print something to indicate why > + */ > +static void source_return_path_bad(MigrationState *s) > +{ > + s->rp_state.error = true; > + migrate_fd_cleanup_src_rp(s); > +} > > +/* > + * Handles messages sent on the return path towards the source VM > + * > + */ > +static void *source_return_path_thread(void *opaque) > +{ > + MigrationState *ms = opaque; > + QEMUFile *rp = ms->return_path; > + uint16_t expected_len, header_len, header_com; > + const int max_len = 512; > + uint8_t buf[max_len]; > + uint32_t tmp32; > + int res; > + > + DPRINTF("RP: %s entry", __func__); > + while (rp && !qemu_file_get_error(rp) && > + migration_already_active(ms)) { > + DPRINTF("RP: %s top of loop", __func__); > + header_com = qemu_get_be16(rp); > + header_len = qemu_get_be16(rp); > + > + switch (header_com) { > + 
case MIG_RPCOMM_SHUT: > + case MIG_RPCOMM_ACK: > + expected_len = 4; > + break; > + > + default: > + error_report("RP: Received invalid cmd 0x%04x length 0x%04x", > + header_com, header_len); > + source_return_path_bad(ms); > + goto out; > + } > + > + if (header_len > expected_len) { > + error_report("RP: Received command 0x%04x with" > + "incorrect length %d expecting %d", > + header_com, header_len, > + expected_len); > + source_return_path_bad(ms); > + goto out; > + } > + > + /* We know we've got a valid header by this point */ > + res = qemu_get_buffer(rp, buf, header_len); > + if (res != header_len) { > + DPRINTF("RP: Failed to read command data"); > + source_return_path_bad(ms); > + goto out; > + } > + > + /* OK, we have the command and the data */ > + switch (header_com) { > + case MIG_RPCOMM_SHUT: > + tmp32 = be32_to_cpup((uint32_t *)buf); > + if (tmp32) { > + error_report("RP: Sibling indicated error %d", tmp32); > + source_return_path_bad(ms); > + } else { > + DPRINTF("RP: SHUT received"); > + } > + /* > + * We'll let the main thread deal with closing the RP > + * we could do a shutdown(2) on it, but we're the only user > + * anyway, so there's nothing gained. 
> + */ > + goto out; > + > + case MIG_RPCOMM_ACK: > + tmp32 = be32_to_cpup((uint32_t *)buf); > + DPRINTF("RP: Received ACK 0x%x", tmp32); > + atomic_xchg(&ms->rp_state.latest_ack, tmp32); I didn't see *ms->rp_state.latest_ack* being used anywhere else; what is it used for? ;) > + break; > + > + default: > + /* This shouldn't happen because we should catch this above */ > + DPRINTF("RP: Bad header_com in dispatch"); > + } > + /* Latest command processed, now leave a gap for the next one */ > + header_com = MIG_RPCOMM_INVALID; > + } > + if (rp && qemu_file_get_error(rp)) { > + DPRINTF("%s: rp bad at end", __func__); > + source_return_path_bad(ms); > + } > + > + DPRINTF("%s: Bottom exit", __func__); > + > +out: > + return NULL; > +} > + > +__attribute__ (( unused )) /* Until later in patch series */ > +static int open_outgoing_return_path(MigrationState *ms) > +{ > + > + ms->return_path = qemu_file_get_return_path(ms->file); > + if (!ms->return_path) { > + return -1; > + } > + > + DPRINTF("%s: starting thread", __func__); > + qemu_thread_create(&ms->rp_state.rp_thread, "return path", > + source_return_path_thread, ms, QEMU_THREAD_JOINABLE); > + > + DPRINTF("%s: continuing", __func__); > + > + return 0; > +} > + > +__attribute__ (( unused )) /* Until later in patch series */ > +static void await_outgoing_return_path_close(MigrationState *ms) > +{ > + /* > + * If this is a normal exit then the destination will send a SHUT and the > + * rp_thread will exit, however if there's an error we need to cause > + * it to exit, which we can do by a shutdown. > + * (canceling must also shutdown to stop us getting stuck here if > + * the destination died at just the wrong place) > + */ > + if (qemu_file_get_error(ms->file) && ms->return_path) { > + qemu_file_shutdown(ms->return_path); > + } > + DPRINTF("%s: Joining", __func__); > + qemu_thread_join(&ms->rp_state.rp_thread); > + DPRINTF("%s: Exit", __func__); > +} > + > +/* > + * Master migration thread on the source VM. 
> + * It drives the migration and pumps the data down the outgoing channel. > + */ > static void *migration_thread(void *opaque) > { > MigrationState *s = opaque; >