From: Li Guang <lig.fnst@cn.fujitsu.com>
To: mrhines@linux.vnet.ibm.com
Cc: GILR@il.ibm.com, SADEKJ@il.ibm.com, quintela@redhat.com,
BIRAN@il.ibm.com, hinesmr@cn.ibm.com, qemu-devel@nongnu.org,
EREZH@il.ibm.com, owasserm@redhat.com, onom@us.ibm.com,
junqing.wang@cs2c.com.cn, "Michael R. Hines" <mrhines@us.ibm.com>,
gokul@us.ibm.com, dbulkow@gmail.com, pbonzini@redhat.com,
abali@us.ibm.com, isaku.yamahata@gmail.com
Subject: Re: [Qemu-devel] [RFC PATCH v2 08/12] mc: core logic
Date: Wed, 19 Feb 2014 09:07:53 +0800 [thread overview]
Message-ID: <530403E9.7060501@cn.fujitsu.com> (raw)
In-Reply-To: <1392713429-18201-9-git-send-email-mrhines@linux.vnet.ibm.com>
Hi,
mrhines@linux.vnet.ibm.com wrote:
> From: "Michael R. Hines"<mrhines@us.ibm.com>
>
> This implements the core logic,
> all described in the first patch (docs/mc.txt).
>
> Signed-off-by: Michael R. Hines<mrhines@us.ibm.com>
> ---
> migration-checkpoint.c | 1565 ++++++++++++++++++++++++++++++++++++++++++++++++
> 1 file changed, 1565 insertions(+)
> create mode 100644 migration-checkpoint.c
>
>
>
[big snip] ...
> +
> +/*
> + * Stop the VM, generate the micro checkpoint,
> + * but save the dirty memory into staging memory until
> + * we can re-activate the VM as soon as possible.
> + */
> +static int capture_checkpoint(MCParams *mc, MigrationState *s)
> +{
> + MCCopyset *copyset;
> + int idx, ret = 0;
> + uint64_t start, stop, copies = 0;
> + int64_t start_time;
> +
> + mc->total_copies = 0;
> + qemu_mutex_lock_iothread();
> + vm_stop_force_state(RUN_STATE_CHECKPOINT_VM);
> + start = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
> +
> + /*
> + * If buffering is enabled, insert a Qdisc plug here
> + * to hold packets for the *next* MC, (not this one,
> + * the packets for this one have already been plugged
> + * and will be released after the MC has been transmitted.
> + */
> + mc_start_buffer();
>
actually, I have a special request,
if QEMU started without netdev,
then don't bother me by Qdisc for network buffering. :-)
Thanks!
> +
> + qemu_savevm_state_begin(mc->staging,&s->params);
> + ret = qemu_file_get_error(s->file);
> +
> + if (ret< 0) {
> + migrate_set_state(s, MIG_STATE_CHECKPOINTING, MIG_STATE_ERROR);
> + }
> +
> + qemu_savevm_state_complete(mc->staging);
> +
> + ret = qemu_file_get_error(s->file);
> + if (ret< 0) {
> + migrate_set_state(s, MIG_STATE_CHECKPOINTING, MIG_STATE_ERROR);
> + goto out;
> + }
> +
> + /*
> + * The copied memory gets appended to the end of the snapshot, so let's
> + * remember where its going to go first and start a new slab.
> + */
> + mc_slab_next(mc, mc->curr_slab);
> + mc->start_copyset = mc->curr_slab->idx;
> +
> + start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
> +
> + /*
> + * Now perform the actual copy of memory into the tail end of the slab list.
> + */
> + QTAILQ_FOREACH(copyset,&mc->copy_head, node) {
> + if (!copyset->nb_copies) {
> + break;
> + }
> +
> + copies += copyset->nb_copies;
> +
> + DDDPRINTF("copyset %d copies: %" PRIu64 " total: %" PRIu64 "\n",
> + copyset->idx, copyset->nb_copies, copies);
> +
> + for (idx = 0; idx< copyset->nb_copies; idx++) {
> + uint8_t *addr;
> + long size;
> + mc->copy =©set->copies[idx];
> + addr = (uint8_t *) (mc->copy->host_addr + mc->copy->offset);
> + size = mc_put_buffer(mc, addr, mc->copy->offset, mc->copy->size);
> + if (size != mc->copy->size) {
> + fprintf(stderr, "Failure to initiate copyset %d index %d\n",
> + copyset->idx, idx);
> + migrate_set_state(s, MIG_STATE_CHECKPOINTING, MIG_STATE_ERROR);
> + vm_start();
> + goto out;
> + }
> +
> + DDDPRINTF("Success copyset %d index %d\n", copyset->idx, idx);
> + }
> +
> + copyset->nb_copies = 0;
> + }
> +
> + s->ram_copy_time = (qemu_clock_get_ms(QEMU_CLOCK_REALTIME) - start_time);
> +
> + mc->copy = NULL;
> + ram_control_before_iterate(mc->file, RAM_CONTROL_FLUSH);
> + assert(mc->total_copies == copies);
> +
> + stop = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
> +
> + /*
> + * MC is safe in staging area. Let the VM go.
> + */
> + vm_start();
> + qemu_fflush(mc->staging);
> +
> + s->downtime = stop - start;
> +out:
> + qemu_mutex_unlock_iothread();
> + return ret;
> +}
> +
> +/*
> + * Synchronously send a micro-checkpointing command
> + */
> +static int mc_send(QEMUFile *f, uint64_t request)
> +{
> + int ret = 0;
> +
> + qemu_put_be64(f, request);
> +
> + ret = qemu_file_get_error(f);
> + if (ret) {
> + fprintf(stderr, "transaction: send error while sending %" PRIu64 ", "
> + "bailing: %s\n", request, strerror(-ret));
> + } else {
> + DDPRINTF("transaction: sent: %s (%" PRIu64 ")\n",
> + mc_desc[request], request);
> + }
> +
> + qemu_fflush(f);
> +
> + return ret;
> +}
> +
> +/*
> + * Synchronously receive a micro-checkpointing command
> + */
> +static int mc_recv(QEMUFile *f, uint64_t request, uint64_t *action)
> +{
> + int ret = 0;
> + uint64_t got;
> +
> + got = qemu_get_be64(f);
> +
> + ret = qemu_file_get_error(f);
> + if (ret) {
> + fprintf(stderr, "transaction: recv error while expecting %s (%"
> + PRIu64 "), bailing: %s\n", mc_desc[request],
> + request, strerror(-ret));
> + } else {
> + if ((request != MC_TRANSACTION_ANY)&& request != got) {
> + fprintf(stderr, "transaction: was expecting %s (%" PRIu64
> + ") but got %" PRIu64 " instead\n",
> + mc_desc[request], request, got);
> + ret = -EINVAL;
> + } else {
> + DDPRINTF("transaction: recv: %s (%" PRIu64 ")\n",
> + mc_desc[got], got);
> + ret = 0;
> + if (action) {
> + *action = got;
> + }
> + }
> + }
> +
> + return ret;
> +}
> +
> +static MCSlab *mc_slab_start(MCParams *mc)
> +{
> + if (mc->nb_slabs> 2) {
> + if (mc->slab_strikes>= max_strikes) {
> + uint64_t nb_slabs_to_free = MAX(1, (((mc->nb_slabs - 1) / 2)));
> +
> + DPRINTF("MC has reached max strikes. Will free %"
> + PRIu64 " / %" PRIu64 " slabs max %d, "
> + "checkpoints %" PRIu64 "\n",
> + nb_slabs_to_free, mc->nb_slabs,
> + max_strikes, mc->checkpoints);
> +
> + mc->slab_strikes = 0;
> +
> + while (nb_slabs_to_free) {
> + MCSlab *slab = QTAILQ_LAST(&mc->slab_head, shead);
> + ram_control_remove(mc->file, (uint64_t) slab->buf);
> + QTAILQ_REMOVE(&mc->slab_head, slab, node);
> + g_free(slab);
> + nb_slabs_to_free--;
> + mc->nb_slabs--;
> + }
> +
> + goto skip;
> + } else if (((mc->slab_total<=
> + ((mc->nb_slabs - 1) * MC_SLAB_BUFFER_SIZE)))) {
> + mc->slab_strikes++;
> + DDPRINTF("MC has strike %d slabs %" PRIu64 " max %d\n",
> + mc->slab_strikes, mc->nb_slabs, max_strikes);
> + goto skip;
> + }
> + }
> +
> + if (mc->slab_strikes) {
> + DDPRINTF("MC used all slabs. Resetting strikes to zero.\n");
> + mc->slab_strikes = 0;
> + }
> +skip:
> +
> + mc->used_slabs = 1;
> + mc->slab_total = 0;
> + mc->curr_slab = QTAILQ_FIRST(&mc->slab_head);
> + SLAB_RESET(mc->curr_slab);
> +
> + return mc->curr_slab;
> +}
> +
> +static MCCopyset *mc_copy_start(MCParams *mc)
> +{
> + if (mc->nb_copysets>= 2) {
> + if (mc->copy_strikes>= max_strikes) {
> + int nb_copies_to_free = MAX(1, (((mc->nb_copysets - 1) / 2)));
> +
> + DPRINTF("MC has reached max strikes. Will free %d / %d copies max %d\n",
> + nb_copies_to_free, mc->nb_copysets, max_strikes);
> +
> + mc->copy_strikes = 0;
> +
> + while (nb_copies_to_free) {
> + MCCopyset * copyset = QTAILQ_LAST(&mc->copy_head, chead);
> + QTAILQ_REMOVE(&mc->copy_head, copyset, node);
> + g_free(copyset);
> + nb_copies_to_free--;
> + mc->nb_copysets--;
> + }
> +
> + goto skip;
> + } else if (((mc->total_copies<=
> + ((mc->nb_copysets - 1) * MC_MAX_SLAB_COPY_DESCRIPTORS)))) {
> + mc->copy_strikes++;
> + DDPRINTF("MC has strike %d copies %d max %d\n",
> + mc->copy_strikes, mc->nb_copysets, max_strikes);
> + goto skip;
> + }
> + }
> +
> + if (mc->copy_strikes) {
> + DDPRINTF("MC used all copies. Resetting strikes to zero.\n");
> + mc->copy_strikes = 0;
> + }
> +skip:
> +
> + mc->total_copies = 0;
> + mc->curr_copyset = QTAILQ_FIRST(&mc->copy_head);
> + mc->curr_copyset->nb_copies = 0;
> +
> + return mc->curr_copyset;
> +}
> +
> +/*
> + * Main MC loop. Stop the VM, dump the dirty memory
> + * into staging, restart the VM, transmit the MC,
> + * and then sleep for some milliseconds before
> + * starting the next MC.
> + */
> +static void *mc_thread(void *opaque)
> +{
> + MigrationState *s = opaque;
> + MCParams mc = { .file = s->file };
> + MCSlab * slab;
> + int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
> + int ret = 0, fd = qemu_get_fd(s->file), x;
> + QEMUFile *mc_control, *mc_staging = NULL;
> + uint64_t wait_time = 0;
> +
> + if (!(mc_control = qemu_fopen_socket(fd, "rb"))) {
> + fprintf(stderr, "Failed to setup read MC control\n");
> + goto err;
> + }
> +
> + if (!(mc_staging = qemu_fopen_mc(&mc, "wb"))) {
> + fprintf(stderr, "Failed to setup MC staging area\n");
> + goto err;
> + }
> +
> + mc.staging = mc_staging;
> +
> + qemu_set_block(fd);
> + socket_set_nodelay(fd);
> +
> + s->checkpoints = 0;
> +
> + while (s->state == MIG_STATE_CHECKPOINTING) {
> + int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
> + int64_t start_time, xmit_start, end_time;
> + bool commit_sent = false;
> + int nb_slab = 0;
> + (void)nb_slab;
> +
> + slab = mc_slab_start(&mc);
> + mc_copy_start(&mc);
> + acct_clear();
> + start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
> +
> + if (capture_checkpoint(&mc, s)< 0)
> + break;
> +
> + xmit_start = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
> +
> + if ((ret = mc_send(s->file, MC_TRANSACTION_START)< 0)) {
> + fprintf(stderr, "transaction start failed\n");
> + break;
> + }
> +
> + DDPRINTF("Sending checkpoint size %" PRId64
> + " copyset start: %" PRIu64 " nb slab %" PRIu64
> + " used slabs %" PRIu64 "\n",
> + mc.slab_total, mc.start_copyset, mc.nb_slabs, mc.used_slabs);
> +
> + mc.curr_slab = QTAILQ_FIRST(&mc.slab_head);
> +
> + qemu_put_be64(s->file, mc.slab_total);
> + qemu_put_be64(s->file, mc.start_copyset);
> + qemu_put_be64(s->file, mc.used_slabs);
> +
> + qemu_fflush(s->file);
> +
> + DDPRINTF("Transaction commit\n");
> +
> + /*
> + * The MC is safe, and VM is running again.
> + * Start a transaction and send it.
> + */
> + ram_control_before_iterate(s->file, RAM_CONTROL_ROUND);
> +
> + slab = QTAILQ_FIRST(&mc.slab_head);
> +
> + for (x = 0; x< mc.used_slabs; x++) {
> + DDPRINTF("Attempting write to slab #%d: %p"
> + " total size: %" PRId64 " / %" PRIu64 "\n",
> + nb_slab++, slab->buf, slab->size, MC_SLAB_BUFFER_SIZE);
> +
> + ret = ram_control_save_page(s->file, (uint64_t) slab->buf,
> + NULL, 0, slab->size, NULL);
> +
> + if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
> + if (!commit_sent) {
> + if ((ret = mc_send(s->file, MC_TRANSACTION_COMMIT)< 0)) {
> + fprintf(stderr, "transaction commit failed\n");
> + break;
> + }
> + commit_sent = true;
> + }
> +
> + qemu_put_be64(s->file, slab->size);
> + qemu_put_buffer_async(s->file, slab->buf, slab->size);
> + } else if ((ret< 0)&& (ret != RAM_SAVE_CONTROL_DELAYED)) {
> + fprintf(stderr, "failed 1, skipping send\n");
> + goto err;
> + }
> +
> + if (qemu_file_get_error(s->file)) {
> + fprintf(stderr, "failed 2, skipping send\n");
> + goto err;
> + }
> +
> + DDPRINTF("Sent %" PRId64 " all %ld\n", slab->size, mc.slab_total);
> +
> + slab = QTAILQ_NEXT(slab, node);
> + }
> +
> + if (!commit_sent) {
> + ram_control_after_iterate(s->file, RAM_CONTROL_ROUND);
> + slab = QTAILQ_FIRST(&mc.slab_head);
> +
> + for (x = 0; x< mc.used_slabs; x++) {
> + qemu_put_be64(s->file, slab->size);
> + slab = QTAILQ_NEXT(slab, node);
> + }
> + }
> +
> + qemu_fflush(s->file);
> +
> + if (commit_sent) {
> + DDPRINTF("Waiting for commit ACK\n");
> +
> + if ((ret = mc_recv(mc_control, MC_TRANSACTION_ACK, NULL))< 0) {
> + goto err;
> + }
> + }
> +
> + ret = qemu_file_get_error(s->file);
> + if (ret) {
> + fprintf(stderr, "Error sending checkpoint: %d\n", ret);
> + goto err;
> + }
> +
> + DDPRINTF("Memory transfer complete.\n");
> +
> + /*
> + * The MC is safe on the other side now,
> + * go along our merry way and release the network
> + * packets from the buffer if enabled.
> + */
> + mc_flush_oldest_buffer();
> +
> + end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
> + s->total_time = end_time - start_time;
> + s->xmit_time = end_time - xmit_start;
> + s->bitmap_time = norm_mig_bitmap_time();
> + s->log_dirty_time = norm_mig_log_dirty_time();
> + s->mbps = MBPS(mc.slab_total, s->xmit_time);
> + s->copy_mbps = MBPS(mc.slab_total, s->ram_copy_time);
> + s->bytes_xfer = mc.slab_total;
> + s->checkpoints = mc.checkpoints++;
> +
> + wait_time = (s->downtime<= freq_ms) ? (freq_ms - s->downtime) : 0;
> +
> + if (current_time>= initial_time + 1000) {
> + DPRINTF("bytes %" PRIu64 " xmit_mbps %0.1f xmit_time %" PRId64
> + " downtime %" PRIu64 " sync_time %" PRId64
> + " logdirty_time %" PRId64 " ram_copy_time %" PRId64
> + " copy_mbps %0.1f wait time %" PRIu64
> + " checkpoints %" PRId64 "\n",
> + s->bytes_xfer,
> + s->mbps,
> + s->xmit_time,
> + s->downtime,
> + s->bitmap_time,
> + s->log_dirty_time,
> + s->ram_copy_time,
> + s->copy_mbps,
> + wait_time,
> + s->checkpoints);
> + initial_time = current_time;
> + }
> +
> + /*
> + * Checkpoint frequency in microseconds.
> + *
> + * Sometimes, when checkpoints are very large,
> + * all of the wait time was dominated by the
> + * time taken to copy the checkpoint into the staging area,
> + * in which case wait_time, will probably be zero and we
> + * will end up diving right back into the next checkpoint
> + * as soon as the previous transmission completed.
> + */
> + if (wait_time) {
> + g_usleep(wait_time * 1000);
> + }
> + }
> +
> + goto out;
> +
> +err:
> + /*
> + * TODO: Possible split-brain scenario:
> + * Normally, this should never be reached unless there was a
> + * connection error or network partition - in which case
> + * only the management software can resume the VM safely
> + * when it knows the exact state of the MC destination.
> + *
> + * We need management to poll the source and destination to deterine
> + * if the destination has already taken control. If not, then
> + * we need to resume the source.
> + *
> + * If there was a connection error during checkpoint *transmission*
> + * then the destination VM will likely have already resumed,
> + * in which case we need to stop the current VM from running
> + * and throw away any buffered packets.
> + *
> + * Verify that "disable_buffering" below does not release any traffic.
> + */
> + migrate_set_state(s, MIG_STATE_CHECKPOINTING, MIG_STATE_ERROR);
> +out:
> + if (mc_staging) {
> + qemu_fclose(mc_staging);
> + }
> +
> + if (mc_control) {
> + qemu_fclose(mc_control);
> + }
> +
> + mc_disable_buffering();
> +
> + qemu_mutex_lock_iothread();
> +
> + if (s->state != MIG_STATE_ERROR) {
> + migrate_set_state(s, MIG_STATE_CHECKPOINTING, MIG_STATE_COMPLETED);
> + }
> +
> + qemu_bh_schedule(s->cleanup_bh);
> + qemu_mutex_unlock_iothread();
> +
> + return NULL;
> +}
> +
> +/*
> + * Get the next copyset in the list. If there is none, then make one.
> + */
> +static MCCopyset *mc_copy_next(MCParams *mc, MCCopyset *copyset)
> +{
> + if (!QTAILQ_NEXT(copyset, node)) {
> + int idx = mc->nb_copysets++;
> + DDPRINTF("Extending copysets by one: %d sets total, "
> + "%" PRIu64 " MB\n", mc->nb_copysets,
> + mc->nb_copysets * sizeof(MCCopyset) / 1024UL / 1024UL);
> + mc->curr_copyset = g_malloc(sizeof(MCCopyset));
> + mc->curr_copyset->idx = idx;
> + QTAILQ_INSERT_TAIL(&mc->copy_head, mc->curr_copyset, node);
> + copyset = mc->curr_copyset;
> + } else {
> + copyset = QTAILQ_NEXT(copyset, node);
> + }
> +
> + mc->curr_copyset = copyset;
> + copyset->nb_copies = 0;
> +
> + return copyset;
> +}
> +
> +void mc_process_incoming_checkpoints_if_requested(QEMUFile *f)
> +{
> + MCParams mc = { .file = f };
> + MCSlab *slab;
> + int fd = qemu_get_fd(f);
> + QEMUFile *mc_control, *mc_staging;
> + uint64_t checkpoint_size, action;
> + uint64_t slabs;
> + int got, x, ret, received = 0;
> + bool checkpoint_received;
> +
> + CALC_MAX_STRIKES();
> +
> + if (!mc_requested) {
> + DPRINTF("Source has not requested MC. Returning.\n");
> + return;
> + }
> +
> + if (!(mc_control = qemu_fopen_socket(fd, "wb"))) {
> + fprintf(stderr, "Could not make incoming MC control channel\n");
> + goto rollback;
> + }
> +
> + if (!(mc_staging = qemu_fopen_mc(&mc, "rb"))) {
> + fprintf(stderr, "Could not make outgoing MC staging area\n");
> + goto rollback;
> + }
> +
> + //qemu_set_block(fd);
> + socket_set_nodelay(fd);
> +
> + while (true) {
> + checkpoint_received = false;
> + ret = mc_recv(f, MC_TRANSACTION_ANY,&action);
> + if (ret< 0) {
> + goto rollback;
> + }
> +
> + switch(action) {
> + case MC_TRANSACTION_START:
> + checkpoint_size = qemu_get_be64(f);
> + mc.start_copyset = qemu_get_be64(f);
> + slabs = qemu_get_be64(f);
> +
> + DDPRINTF("Transaction start: size %" PRIu64
> + " copyset start: %" PRIu64 " slabs %" PRIu64 "\n",
> + checkpoint_size, mc.start_copyset, slabs);
> +
> + assert(checkpoint_size);
> + break;
> + case MC_TRANSACTION_COMMIT: /* tcp */
> + slab = mc_slab_start(&mc);
> + received = 0;
> +
> + while (received< checkpoint_size) {
> + int total = 0;
> + slab->size = qemu_get_be64(f);
> +
> + DDPRINTF("Expecting size: %" PRIu64 "\n", slab->size);
> +
> + while (total != slab->size) {
> + got = qemu_get_buffer(f, slab->buf + total, slab->size - total);
> + if (got<= 0) {
> + fprintf(stderr, "Error pre-filling checkpoint: %d\n", got);
> + goto rollback;
> + }
> + DDPRINTF("Received %d slab %d / %ld received %d total %"
> + PRIu64 "\n", got, total, slab->size,
> + received, checkpoint_size);
> + received += got;
> + total += got;
> + }
> +
> + if (received != checkpoint_size) {
> + slab = mc_slab_next(&mc, slab);
> + }
> + }
> +
> + DDPRINTF("Acknowledging successful commit\n");
> +
> + if (mc_send(mc_control, MC_TRANSACTION_ACK)< 0) {
> + goto rollback;
> + }
> +
> + checkpoint_received = true;
> + break;
> + case RAM_SAVE_FLAG_HOOK: /* rdma */
> + /*
> + * Must be RDMA registration handling. Preallocate
> + * the slabs (if not already done in a previous checkpoint)
> + * before allowing RDMA to register them.
> + */
> + slab = mc_slab_start(&mc);
> +
> + DDPRINTF("Pre-populating slabs %" PRIu64 "...\n", slabs);
> +
> + for(x = 1; x< slabs; x++) {
> + slab = mc_slab_next(&mc, slab);
> + }
> +
> + ram_control_load_hook(f, action);
> +
> + DDPRINTF("Hook complete.\n");
> +
> + slab = QTAILQ_FIRST(&mc.slab_head);
> +
> + for(x = 0; x< slabs; x++) {
> + slab->size = qemu_get_be64(f);
> + slab = QTAILQ_NEXT(slab, node);
> + }
> +
> + checkpoint_received = true;
> + break;
> + default:
> + fprintf(stderr, "Unknown MC action: %" PRIu64 "\n", action);
> + goto rollback;
> + }
> +
> + if (checkpoint_received) {
> + mc.curr_slab = QTAILQ_FIRST(&mc.slab_head);
> + mc.slab_total = checkpoint_size;
> +
> + DDPRINTF("Committed Loading MC state \n");
> +
> + mc_copy_start(&mc);
> +
> + if (qemu_loadvm_state(mc_staging)< 0) {
> + fprintf(stderr, "loadvm transaction failed\n");
> + /*
> + * This is fatal. No rollback possible because we have potentially
> + * applied only a subset of the checkpoint to main memory, potentially
> + * leaving the VM in an inconsistent state.
> + */
> + goto err;
> + }
> +
> + mc.slab_total = checkpoint_size;
> +
> + DDPRINTF("Transaction complete.\n");
> + mc.checkpoints++;
> + }
> + }
> +
> +rollback:
> + fprintf(stderr, "MC: checkpointing stopped. Recovering VM\n");
> + goto out;
> +err:
> + fprintf(stderr, "Micro Checkpointing Protocol Failed\n");
> + exit(1);
> +out:
> + if (mc_staging) {
> + qemu_fclose(mc_staging);
> + }
> +
> + if (mc_control) {
> + qemu_fclose(mc_control);
> + }
> +}
> +
> +static int mc_get_buffer_internal(void *opaque, uint8_t *buf, int64_t pos,
> + int size, MCSlab **curr_slab, uint64_t end_idx)
> +{
> + uint64_t len = size;
> + uint8_t *data = (uint8_t *) buf;
> + MCSlab *slab = *curr_slab;
> + MCParams *mc = opaque;
> +
> + assert(slab);
> +
> + DDDPRINTF("got request for %d bytes %p %p. idx %d\n",
> + size, slab, QTAILQ_FIRST(&mc->slab_head), slab->idx);
> +
> + while (len&& slab) {
> + uint64_t get = MIN(slab->size - slab->read, len);
> +
> + memcpy(data, slab->buf + slab->read, get);
> +
> + data += get;
> + slab->read += get;
> + len -= get;
> + mc->slab_total -= get;
> +
> + DDDPRINTF("got: %" PRIu64 " read: %" PRIu64
> + " len %" PRIu64 " slab_total %" PRIu64
> + " size %" PRIu64 " addr: %p slab %d"
> + " requested %d\n",
> + get, slab->read, len, mc->slab_total,
> + slab->size, slab->buf, slab->idx, size);
> +
> + if (len) {
> + if (slab->idx == end_idx) {
> + break;
> + }
> +
> + slab = QTAILQ_NEXT(slab, node);
> + }
> + }
> +
> + *curr_slab = slab;
> + DDDPRINTF("Returning %" PRIu64 " / %d bytes\n", size - len, size);
> +
> + return size - len;
> +}
> +static int mc_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size)
> +{
> + MCParams *mc = opaque;
> +
> + return mc_get_buffer_internal(mc, buf, pos, size,&mc->curr_slab,
> + mc->start_copyset - 1);
> +}
> +
> +static int mc_load_page(QEMUFile *f, void *opaque, void *host_addr, long size)
> +{
> + MCParams *mc = opaque;
> +
> + DDDPRINTF("Loading page into %p of size %" PRIu64 "\n", host_addr, size);
> +
> + return mc_get_buffer_internal(mc, host_addr, 0, size,&mc->mem_slab,
> + mc->nb_slabs - 1);
> +}
> +
> +/*
> + * Provide QEMUFile with an *local* RDMA-based way to do memcpy().
> + * This lowers cache pollution and allows the CPU pipeline to
> + * remain free for regular use by VMs (as well as by neighbors).
> + *
> + * In a future implementation, we may attempt to perform this
> + * copy *without* stopping the source VM - if the data shows
> + * that it can be done effectively.
> + */
> +static int mc_save_page(QEMUFile *f, void *opaque,
> + ram_addr_t block_offset,
> + uint8_t *host_addr,
> + ram_addr_t offset,
> + long size, int *bytes_sent)
> +{
> + MCParams *mc = opaque;
> + MCCopyset *copyset = mc->curr_copyset;
> + MCCopy *c;
> +
> + if (copyset->nb_copies>= MC_MAX_SLAB_COPY_DESCRIPTORS) {
> + copyset = mc_copy_next(mc, copyset);
> + }
> +
> + c =©set->copies[copyset->nb_copies++];
> + c->ramblock_offset = (uint64_t) block_offset;
> + c->host_addr = (uint64_t) host_addr;
> + c->offset = (uint64_t) offset;
> + c->size = (uint64_t) size;
> + mc->total_copies++;
> +
> + return RAM_SAVE_CONTROL_DELAYED;
> +}
> +
> +static ssize_t mc_writev_buffer(void *opaque, struct iovec *iov,
> + int iovcnt, int64_t pos)
> +{
> + ssize_t len = 0;
> + unsigned int i;
> +
> + for (i = 0; i< iovcnt; i++) {
> + DDDPRINTF("iov # %d, len: %" PRId64 "\n", i, iov[i].iov_len);
> + len += mc_put_buffer(opaque, iov[i].iov_base, 0, iov[i].iov_len);
> + }
> +
> + return len;
> +}
> +
> +static int mc_get_fd(void *opaque)
> +{
> + MCParams *mc = opaque;
> +
> + return qemu_get_fd(mc->file);
> +}
> +
> +static int mc_close(void *opaque)
> +{
> + MCParams *mc = opaque;
> + MCSlab *slab, *next;
> +
> + QTAILQ_FOREACH_SAFE(slab,&mc->slab_head, node, next) {
> + ram_control_remove(mc->file, (uint64_t) slab->buf);
> + QTAILQ_REMOVE(&mc->slab_head, slab, node);
> + g_free(slab);
> + }
> +
> + mc->curr_slab = NULL;
> +
> + return 0;
> +}
> +
> +static const QEMUFileOps mc_write_ops = {
> + .writev_buffer = mc_writev_buffer,
> + .put_buffer = mc_put_buffer,
> + .get_fd = mc_get_fd,
> + .close = mc_close,
> + .save_page = mc_save_page,
> +};
> +
> +static const QEMUFileOps mc_read_ops = {
> + .get_buffer = mc_get_buffer,
> + .get_fd = mc_get_fd,
> + .close = mc_close,
> + .load_page = mc_load_page,
> +};
> +
> +QEMUFile *qemu_fopen_mc(void *opaque, const char *mode)
> +{
> + MCParams *mc = opaque;
> + MCSlab *slab;
> + MCCopyset *copyset;
> +
> + if (qemu_file_mode_is_not_valid(mode)) {
> + return NULL;
> + }
> +
> + QTAILQ_INIT(&mc->slab_head);
> + QTAILQ_INIT(&mc->copy_head);
> +
> + slab = qemu_memalign(8, sizeof(MCSlab));
> + memset(slab, 0, sizeof(*slab));
> + slab->idx = 0;
> + QTAILQ_INSERT_HEAD(&mc->slab_head, slab, node);
> + mc->slab_total = 0;
> + mc->curr_slab = slab;
> + mc->nb_slabs = 1;
> + mc->slab_strikes = 0;
> +
> + ram_control_add(mc->file, slab->buf, (uint64_t) slab->buf, MC_SLAB_BUFFER_SIZE);
> +
> + copyset = g_malloc(sizeof(MCCopyset));
> + copyset->idx = 0;
> + QTAILQ_INSERT_HEAD(&mc->copy_head, copyset, node);
> + mc->total_copies = 0;
> + mc->curr_copyset = copyset;
> + mc->nb_copysets = 1;
> + mc->copy_strikes = 0;
> +
> + if (mode[0] == 'w') {
> + return qemu_fopen_ops(mc,&mc_write_ops);
> + }
> +
> + return qemu_fopen_ops(mc,&mc_read_ops);
> +}
> +
> +static void mc_start_checkpointer(void *opaque) {
> + MigrationState *s = opaque;
> +
> + if (checkpoint_bh) {
> + qemu_bh_delete(checkpoint_bh);
> + checkpoint_bh = NULL;
> + }
> +
> + qemu_mutex_unlock_iothread();
> + qemu_thread_join(s->thread);
> + g_free(s->thread);
> + qemu_mutex_lock_iothread();
> +
> + migrate_set_state(s, MIG_STATE_ACTIVE, MIG_STATE_CHECKPOINTING);
> + s->thread = g_malloc0(sizeof(*s->thread));
> + qemu_thread_create(s->thread, mc_thread, s, QEMU_THREAD_JOINABLE);
> +}
> +
> +void mc_init_checkpointer(MigrationState *s)
> +{
> + CALC_MAX_STRIKES();
> + checkpoint_bh = qemu_bh_new(mc_start_checkpointer, s);
> + qemu_bh_schedule(checkpoint_bh);
> +}
> +
> +void qmp_migrate_set_mc_delay(int64_t value, Error **errp)
> +{
> + freq_ms = value;
> + CALC_MAX_STRIKES();
> + DPRINTF("Setting checkpoint frequency to %" PRId64 " ms and "
> + "resetting strikes to %d based on a %d sec delay.\n",
> + freq_ms, max_strikes, max_strikes_delay_secs);
> +}
> +
> +int mc_info_load(QEMUFile *f, void *opaque, int version_id)
> +{
> + bool mc_enabled = qemu_get_byte(f);
> +
> + if (mc_enabled&& !mc_requested) {
> + DPRINTF("MC is requested\n");
> + mc_requested = true;
> + }
> +
> + max_strikes = qemu_get_be32(f);
> +
> + return 0;
> +}
> +
> +void mc_info_save(QEMUFile *f, void *opaque)
> +{
> + qemu_put_byte(f, migrate_use_mc());
> + qemu_put_be32(f, max_strikes);
> +}
>
next prev parent reply other threads:[~2014-02-19 1:08 UTC|newest]
Thread overview: 68+ messages / expand[flat|nested] mbox.gz Atom feed top
2014-02-18 8:50 [Qemu-devel] [RFC PATCH v2 00/12] mc: fault tolerante through micro-checkpointing mrhines
2014-02-18 8:50 ` [Qemu-devel] [RFC PATCH v2 01/12] mc: add documentation for micro-checkpointing mrhines
2014-02-18 12:45 ` Dr. David Alan Gilbert
2014-02-19 1:40 ` Michael R. Hines
2014-02-19 11:27 ` Dr. David Alan Gilbert
2014-02-20 1:17 ` Michael R. Hines
2014-02-20 10:09 ` Dr. David Alan Gilbert
2014-02-20 11:14 ` Li Guang
2014-02-20 14:58 ` Michael R. Hines
2014-02-20 14:57 ` Michael R. Hines
2014-02-20 16:32 ` Dr. David Alan Gilbert
2014-02-21 4:54 ` Michael R. Hines
2014-02-21 9:44 ` Dr. David Alan Gilbert
2014-03-03 6:08 ` Michael R. Hines
2014-02-18 8:50 ` [Qemu-devel] [RFC PATCH v2 02/12] mc: timestamp migration_bitmap and KVM logdirty usage mrhines
2014-02-18 10:32 ` Dr. David Alan Gilbert
2014-02-19 1:42 ` Michael R. Hines
2014-03-11 21:31 ` Juan Quintela
2014-04-04 3:08 ` Michael R. Hines
2014-02-18 8:50 ` [Qemu-devel] [RFC PATCH v2 03/12] mc: introduce a 'checkpointing' status check into the VCPU states mrhines
2014-03-11 21:36 ` Juan Quintela
2014-04-04 3:11 ` Michael R. Hines
2014-03-11 21:40 ` Eric Blake
2014-04-04 3:12 ` Michael R. Hines
2014-02-18 8:50 ` [Qemu-devel] [RFC PATCH v2 04/12] mc: support custom page loading and copying mrhines
2014-02-18 8:50 ` [Qemu-devel] [RFC PATCH v2 05/12] rdma: accelerated memcpy() support and better external RDMA user interfaces mrhines
2014-02-18 8:50 ` [Qemu-devel] [RFC PATCH v2 06/12] mc: introduce state machine changes for MC mrhines
2014-02-19 1:00 ` Li Guang
2014-02-19 2:14 ` Michael R. Hines
2014-02-20 5:03 ` Michael R. Hines
2014-02-21 8:13 ` Michael R. Hines
2014-02-24 6:48 ` Li Guang
2014-02-26 2:52 ` Li Guang
2014-03-11 21:57 ` Juan Quintela
2014-04-04 3:50 ` Michael R. Hines
2014-02-18 8:50 ` [Qemu-devel] [RFC PATCH v2 07/12] mc: introduce additional QMP statistics for micro-checkpointing mrhines
2014-03-11 21:45 ` Eric Blake
2014-04-04 3:15 ` Michael R. Hines
2014-04-04 4:22 ` Eric Blake
2014-03-11 21:59 ` Juan Quintela
2014-04-04 3:55 ` Michael R. Hines
2014-02-18 8:50 ` [Qemu-devel] [RFC PATCH v2 08/12] mc: core logic mrhines
2014-02-19 1:07 ` Li Guang [this message]
2014-02-19 2:16 ` Michael R. Hines
2014-02-19 2:53 ` Li Guang
2014-02-19 4:27 ` Michael R. Hines
2014-02-18 8:50 ` [Qemu-devel] [RFC PATCH v2 09/12] mc: configure and makefile support mrhines
2014-02-18 8:50 ` [Qemu-devel] [RFC PATCH v2 10/12] mc: expose tunable parameter for checkpointing frequency mrhines
2014-03-11 21:49 ` Eric Blake
2014-03-11 22:15 ` Juan Quintela
2014-03-11 22:49 ` Eric Blake
2014-04-04 5:29 ` Michael R. Hines
2014-04-04 14:56 ` Eric Blake
2014-04-11 6:10 ` Michael R. Hines
2014-04-04 16:28 ` Dr. David Alan Gilbert
2014-04-04 16:35 ` Eric Blake
2014-04-04 3:29 ` Michael R. Hines
2014-02-18 8:50 ` [Qemu-devel] [RFC PATCH v2 11/12] mc: introduce new capabilities to control micro-checkpointing mrhines
2014-03-11 21:57 ` Eric Blake
2014-04-04 3:38 ` Michael R. Hines
2014-04-04 4:25 ` Eric Blake
2014-03-11 22:02 ` Juan Quintela
2014-03-11 22:07 ` Eric Blake
2014-04-04 3:57 ` Michael R. Hines
2014-04-04 3:56 ` Michael R. Hines
2014-02-18 8:50 ` [Qemu-devel] [RFC PATCH v2 12/12] mc: activate and use MC if requested mrhines
2014-02-18 9:28 ` [Qemu-devel] [RFC PATCH v2 00/12] mc: fault tolerante through micro-checkpointing Li Guang
2014-02-19 1:29 ` Michael R. Hines
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=530403E9.7060501@cn.fujitsu.com \
--to=lig.fnst@cn.fujitsu.com \
--cc=BIRAN@il.ibm.com \
--cc=EREZH@il.ibm.com \
--cc=GILR@il.ibm.com \
--cc=SADEKJ@il.ibm.com \
--cc=abali@us.ibm.com \
--cc=dbulkow@gmail.com \
--cc=gokul@us.ibm.com \
--cc=hinesmr@cn.ibm.com \
--cc=isaku.yamahata@gmail.com \
--cc=junqing.wang@cs2c.com.cn \
--cc=mrhines@linux.vnet.ibm.com \
--cc=mrhines@us.ibm.com \
--cc=onom@us.ibm.com \
--cc=owasserm@redhat.com \
--cc=pbonzini@redhat.com \
--cc=qemu-devel@nongnu.org \
--cc=quintela@redhat.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.