qemu-devel.nongnu.org archive mirror
 help / color / mirror / Atom feed
From: "Michael R. Hines" <mrhines@linux.vnet.ibm.com>
To: Paolo Bonzini <pbonzini@redhat.com>
Cc: aliguori@us.ibm.com, mst@redhat.com, qemu-devel@nongnu.org,
	owasserm@redhat.com, abali@us.ibm.com, mrhines@us.ibm.com,
	gokul@us.ibm.com
Subject: Re: [Qemu-devel] [PULL 5/8] rdma: core rdma logic
Date: Tue, 16 Apr 2013 10:48:12 -0400	[thread overview]
Message-ID: <516D64AC.10009@linux.vnet.ibm.com> (raw)
In-Reply-To: <516CD851.7050705@redhat.com>

Incredible review, thank you for spending time on this. Much appreciated.

Will get started on it....

- Michael

On 04/16/2013 12:49 AM, Paolo Bonzini wrote:
> A bunch of comments, but nothing should be very hard to do.
>
> We're definitely converging.
>
> Paolo
>
> Il 16/04/2013 04:44, mrhines@linux.vnet.ibm.com ha scritto:
>> From: "Michael R. Hines" <mrhines@us.ibm.com>
>>
>> As requested, code that does need to be visible is kept
>> well contained inside this file and this is the only
>> new additional file to the entire patch - good
>> progress.
>>
>> This file includes the entire protocol and interfaces
>> required to perform RDMA migration.
>>
>> Also, the configure and Makefile modifications to link
>> this file are included.
>>
>> Full documentation is in docs/rdma.txt
>>
>> Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
>> ---
>>   Makefile.objs                 |    1 +
>>   configure                     |   29 +
>>   include/migration/migration.h |    4 +
>>   migration-rdma.c              | 2732 +++++++++++++++++++++++++++++++++++++++++
>>   migration.c                   |    8 +
>>   5 files changed, 2774 insertions(+)
>>   create mode 100644 migration-rdma.c
>>
>> diff --git a/Makefile.objs b/Makefile.objs
>> index a473348..d744827 100644
>> --- a/Makefile.objs
>> +++ b/Makefile.objs
>> @@ -49,6 +49,7 @@ common-obj-$(CONFIG_POSIX) += os-posix.o
>>   common-obj-$(CONFIG_LINUX) += fsdev/
>>   
>>   common-obj-y += migration.o migration-tcp.o
>> +common-obj-$(CONFIG_RDMA) += migration-rdma.o
>>   common-obj-y += qemu-char.o #aio.o
>>   common-obj-y += block-migration.o
>>   common-obj-y += page_cache.o xbzrle.o
>> diff --git a/configure b/configure
>> index 0788e27..48f067f 100755
>> --- a/configure
>> +++ b/configure
>> @@ -180,6 +180,7 @@ xfs=""
>>   
>>   vhost_net="no"
>>   kvm="no"
>> +rdma="yes"
>>   gprof="no"
>>   debug_tcg="no"
>>   debug="no"
>> @@ -920,6 +921,10 @@ for opt do
>>     ;;
>>     --enable-gtk) gtk="yes"
>>     ;;
>> +  --enable-rdma) rdma="yes"
>> +  ;;
>> +  --disable-rdma) rdma="no"
>> +  ;;
>>     --with-gtkabi=*) gtkabi="$optarg"
>>     ;;
>>     --enable-tpm) tpm="yes"
>> @@ -1128,6 +1133,8 @@ echo "  --enable-bluez           enable bluez stack connectivity"
>>   echo "  --disable-slirp          disable SLIRP userspace network connectivity"
>>   echo "  --disable-kvm            disable KVM acceleration support"
>>   echo "  --enable-kvm             enable KVM acceleration support"
>> +echo "  --disable-rdma           disable RDMA-based migration support"
>> +echo "  --enable-rdma            enable RDMA-based migration support"
>>   echo "  --enable-tcg-interpreter enable TCG with bytecode interpreter (TCI)"
>>   echo "  --disable-nptl           disable usermode NPTL support"
>>   echo "  --enable-nptl            enable usermode NPTL support"
>> @@ -1775,6 +1782,23 @@ EOF
>>     libs_softmmu="$sdl_libs $libs_softmmu"
>>   fi
>>   
>> +if test "$rdma" != "no" ; then
>> +  cat > $TMPC <<EOF
>> +#include <rdma/rdma_cma.h>
>> +int main(void) { return 0; }
>> +EOF
>> +  rdma_libs="-lrdmacm -libverbs"
>> +  if compile_prog "-Werror" "$rdma_libs" ; then
>> +    rdma="yes"
>> +    libs_softmmu="$libs_softmmu $rdma_libs"
>> +  else
>> +    if test "$rdma" = "yes" ; then
>> +      feature_not_found "rdma"
>> +    fi
>> +    rdma="no"
>> +  fi
>> +fi
>> +
>>   ##########################################
>>   # VNC TLS/WS detection
>>   if test "$vnc" = "yes" -a \( "$vnc_tls" != "no" -o "$vnc_ws" != "no" \) ; then
>> @@ -3500,6 +3524,7 @@ echo "Linux AIO support $linux_aio"
>>   echo "ATTR/XATTR support $attr"
>>   echo "Install blobs     $blobs"
>>   echo "KVM support       $kvm"
>> +echo "RDMA support      $rdma"
>>   echo "TCG interpreter   $tcg_interpreter"
>>   echo "fdt support       $fdt"
>>   echo "preadv support    $preadv"
>> @@ -4474,6 +4499,10 @@ if [ "$pixman" = "internal" ]; then
>>     echo "config-host.h: subdir-pixman" >> $config_host_mak
>>   fi
>>   
>> +if test "$rdma" = "yes" ; then
>> +echo "CONFIG_RDMA=y" >> $config_host_mak
>> +fi
>> +
>>   # build tree in object directory in case the source is not in the current directory
>>   DIRS="tests tests/tcg tests/tcg/cris tests/tcg/lm32"
>>   DIRS="$DIRS pc-bios/optionrom pc-bios/spapr-rtas"
>> diff --git a/include/migration/migration.h b/include/migration/migration.h
>> index e71ac12..c03f5dc 100644
>> --- a/include/migration/migration.h
>> +++ b/include/migration/migration.h
>> @@ -76,6 +76,10 @@ void fd_start_incoming_migration(const char *path, Error **errp);
>>   
>>   void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error **errp);
>>   
>> +void rdma_start_outgoing_migration(void *opaque, const char *host_port, Error **errp);
>> +
>> +void rdma_start_incoming_migration(const char *host_port, Error **errp);
>> +
>>   void migrate_fd_error(MigrationState *s);
>>   
>>   void migrate_fd_connect(MigrationState *s);
>> diff --git a/migration-rdma.c b/migration-rdma.c
>> new file mode 100644
>> index 0000000..9a23009
>> --- /dev/null
>> +++ b/migration-rdma.c
>> @@ -0,0 +1,2732 @@
>> +/*
>> + *  Copyright (C) 2013 Michael R. Hines <mrhines@us.ibm.com>
>> + *  Copyright (C) 2010 Jiuxing Liu <jl@us.ibm.com>
>> + *
>> + *  RDMA protocol and interfaces
>> + *
>> + *  This program is free software; you can redistribute it and/or modify
>> + *  it under the terms of the GNU General Public License as published by
>> + *  the Free Software Foundation; under version 2 of the License.
>> + *
>> + *  This program is distributed in the hope that it will be useful,
>> + *  but WITHOUT ANY WARRANTY; without even the implied warranty of
>> + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
>> + *  GNU General Public License for more details.
>> + *
>> + *  You should have received a copy of the GNU General Public License
>> + *  along with this program; if not, see <http://www.gnu.org/licenses/>.
>> + */
>> +#include "qemu-common.h"
>> +#include "migration/migration.h"
>> +#include "migration/qemu-file.h"
>> +#include "exec/cpu-common.h"
>> +#include "qemu/main-loop.h"
>> +#include "qemu/sockets.h"
>> +#include <stdio.h>
>> +#include <sys/types.h>
>> +#include <sys/socket.h>
>> +#include <netdb.h>
>> +#include <arpa/inet.h>
>> +#include <string.h>
>> +#include <rdma/rdma_cma.h>
>> +
>> +//#define DEBUG_RDMA
>> +//#define DEBUG_RDMA_VERBOSE
>> +
>> +#ifdef DEBUG_RDMA
>> +#define DPRINTF(fmt, ...) \
>> +    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
>> +#else
>> +#define DPRINTF(fmt, ...) \
>> +    do { } while (0)
>> +#endif
>> +
>> +#ifdef DEBUG_RDMA_VERBOSE
>> +#define DDPRINTF(fmt, ...) \
>> +    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
>> +#else
>> +#define DDPRINTF(fmt, ...) \
>> +    do { } while (0)
>> +#endif
>> +
>> +#define RDMA_RESOLVE_TIMEOUT_MS 10000
>> +
>> +#define RDMA_CHUNK_REGISTRATION
>> +#define RDMA_LAZY_CLIENT_REGISTRATION
> Please drop these; no dead code.
>
>> +/* Do not merge data if larger than this. */
>> +#define RDMA_MERGE_MAX (4 * 1024 * 1024)
>> +#define RDMA_UNSIGNALED_SEND_MAX 64
>> +
>> +#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
>> +//#define RDMA_REG_CHUNK_SHIFT 21 /* 2 MB */
>> +//#define RDMA_REG_CHUNK_SHIFT 22 /* 4 MB */
>> +//#define RDMA_REG_CHUNK_SHIFT 23 /* 8 MB */
>> +//#define RDMA_REG_CHUNK_SHIFT 24 /* 16 MB */
>> +//#define RDMA_REG_CHUNK_SHIFT 25 /* 32 MB */
>> +//#define RDMA_REG_CHUNK_SHIFT 26 /* 64 MB */
>> +//#define RDMA_REG_CHUNK_SHIFT 27 /* 128 MB */
>> +//#define RDMA_REG_CHUNK_SHIFT 28 /* 256 MB */
> IIUC this must be agreed upon by source and destination.  Either make it
> part of the protocol (e.g. using the extra available bits in the
> RAM_SAVE_HOOK 64-bit value), or drop the alternatives.
>
>> +#define RDMA_REG_CHUNK_SIZE (1UL << (RDMA_REG_CHUNK_SHIFT))
>> +#define RDMA_REG_CHUNK_INDEX(start_addr, host_addr) \
>> +            (((unsigned long)(host_addr) >> RDMA_REG_CHUNK_SHIFT) - \
>> +            ((unsigned long)(start_addr) >> RDMA_REG_CHUNK_SHIFT))
> This should be:
>
> #define RDMA_REG_CHUNK_INDEX(start_addr, host_addr) \
>              (((uintptr_t)(host_addr) - (uintptr_t)(start_addr)) \
>               >> RDMA_REG_CHUNK_SHIFT)
>
>> +#define RDMA_REG_NUM_CHUNKS(rdma_ram_block) \
>> +            (RDMA_REG_CHUNK_INDEX((rdma_ram_block)->local_host_addr,\
>> +                (rdma_ram_block)->local_host_addr +\
>> +                (rdma_ram_block)->length) + 1)
>> +#define RDMA_REG_CHUNK_START(rdma_ram_block, i) ((uint8_t *)\
>> +            ((((unsigned long)((rdma_ram_block)->local_host_addr) >> \
>> +                RDMA_REG_CHUNK_SHIFT) + (i)) << \
>> +                RDMA_REG_CHUNK_SHIFT))
> This should be:
>
> #define RDMA_REG_CHUNK_START(rdma_ram_block, i) ((uint8_t *)\
>              (((uintptr_t)(rdma_ram_block)->local_host_addr) \
>               + ((i) << RDMA_REG_CHUNK_SHIFT))
>
> Otherwise, the length of the first chunk might be different on the two
> sides.  You're probably not seeing because memory is aligned to 2MB by
> default, but with bigger chunk sizes you might get it.
>
>> +#define RDMA_REG_CHUNK_END(rdma_ram_block, i) \
>> +            (RDMA_REG_CHUNK_START(rdma_ram_block, i) + \
>> +             RDMA_REG_CHUNK_SIZE)
>> +
>> +/*
>> + * This is only for non-live state being migrated.
>> + * Instead of RDMA_WRITE messages, we use RDMA_SEND
>> + * messages for that state, which requires a different
>> + * delivery design than main memory.
>> + */
>> +#define RDMA_SEND_INCREMENT 32768
>> +
>> +#define RDMA_BLOCKING
> What's the difference?
>
>> +/*
>> + * Completion queue can be filled by both read and write work requests,
>> + * so must reflect the sum of both possible queue sizes.
>> + */
>> +#define RDMA_QP_SIZE 1000
>> +#define RDMA_CQ_SIZE (RDMA_QP_SIZE * 3)
>> +
>> +/*
>> + * Maximum size infiniband SEND message
>> + */
>> +#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
>> +#define RDMA_CONTROL_MAX_WR 2
>> +#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
>> +
>> +/*
>> + * Capabilities for negotiation.
>> + */
>> +#define RDMA_CAPABILITY_CHUNK_REGISTER 0x01
>> +#define RDMA_CAPABILITY_NEXT_FEATURE   0x02
> Please drop RDMA_CAPABILITY_NEXT_FEATURE and fail migration if unknown
> capabilities are transmitted.
>
>> +/*
>> + * RDMA migration protocol:
>> + * 1. RDMA Writes (data messages, i.e. RAM)
>> + * 2. IB Send/Recv (control channel messages)
>> + */
>> +enum {
>> +    RDMA_WRID_NONE = 0,
>> +    RDMA_WRID_RDMA_WRITE,
>> +    RDMA_WRID_SEND_CONTROL = 1000,
>> +    RDMA_WRID_RECV_CONTROL = 2000,
>> +};
>> +
>> +const char *wrid_desc[] = {
>> +        [RDMA_WRID_NONE] = "NONE",
>> +        [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
>> +        [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
>> +        [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
>> +};
>> +
>> +/*
>> + * SEND/RECV IB Control Messages.
>> + */
>> +enum {
>> +    RDMA_CONTROL_NONE = 0,
>> +    RDMA_CONTROL_READY,             /* ready to receive */
>> +    RDMA_CONTROL_QEMU_FILE,         /* QEMUFile-transmitted bytes */
>> +    RDMA_CONTROL_RAM_BLOCKS,        /* RAMBlock synchronization */
>> +    RDMA_CONTROL_COMPRESS,          /* page contains repeat values */
>> +    RDMA_CONTROL_REGISTER_REQUEST,  /* dynamic page registration */
>> +    RDMA_CONTROL_REGISTER_RESULT,   /* key to use after registration */
>> +    RDMA_CONTROL_REGISTER_FINISHED, /* current iteration finished */
>> +};
>> +
>> +const char *control_desc[] = {
>> +        [RDMA_CONTROL_NONE] = "NONE",
>> +        [RDMA_CONTROL_READY] = "READY",
>> +        [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
>> +        [RDMA_CONTROL_RAM_BLOCKS] = "REMOTE INFO",
>> +        [RDMA_CONTROL_COMPRESS] = "COMPRESS",
>> +        [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
>> +        [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
>> +        [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
>> +};
>> +
>> +/*
>> + * Memory and MR structures used to represent an IB Send/Recv work request.
>> + * This is *not* used for RDMA, only IB Send/Recv.
>> + */
>> +typedef struct {
>> +    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
>> +    struct   ibv_mr *control_mr;               /* registration metadata */
>> +    size_t   control_len;                      /* length of the message */
>> +    uint8_t *control_curr;                     /* start of unconsumed bytes */
>> +} RDMAWorkRequestData;
>> +
>> +/*
>> + * Negotiate RDMA capabilities during connection-setup time.
>> + */
>> +typedef struct {
>> +    uint32_t version;
>> +    uint32_t flags;
>> +} RDMACapabilities;
>> +
>> +/*
>> + * Main data structure for RDMA state.
>> + * While there is only one copy of this structure being allocated right now,
>> + * this is the place where one would start if you wanted to consider
>> + * having more than one RDMA connection open at the same time.
>> + */
>> +typedef struct RDMAContext {
>> +    char *host;
>> +    int port;
>> +
>> +    /* This is used by the migration protocol to transmit
>> +     * control messages (such as device state and registration commands)
>> +     *
>> +     * WR #0 is for control channel ready messages from the server.
>> +     * WR #1 is for control channel data messages from the server.
> Please use destination/source instead of server/client.
>
>> +     * WR #2 is for control channel send messages.
>> +     *
>> +     * We could use more WRs, but we have enough for now.
>> +     */
>> +    RDMAWorkRequestData wr_data[RDMA_CONTROL_MAX_WR + 1];
>> +
>> +    /*
>> +     * This is used by *_exchange_send() to figure out whether or not
>> +     * the initial "READY" message has already been received or not.
>> +     * This is because other functions may potentially poll() and detect
>> +     * the READY message before send() does, in which case we need to
>> +     * know if it completed.
>> +     */
>> +    int control_ready_expected;
>> +
>> +    /* The rest is only for the initiator of the migration. */
>> +    int client_init_done;
>> +
>> +    /* number of outstanding unsignaled send */
>> +    int num_unsignaled_send;
>> +
>> +    /* number of outstanding signaled send */
>> +    int num_signaled_send;
>> +
>> +    /* store info about current buffer so that we can
>> +       merge it with future sends */
>> +    uint64_t current_offset;
>> +    uint64_t current_length;
>> +    /* index of ram block the current buffer belongs to */
>> +    int current_index;
>> +    /* index of the chunk in the current ram block */
>> +    int current_chunk;
>> +
>> +    bool chunk_register_destination;
>> +
>> +    /*
>> +     * infiniband-specific variables for opening the device
>> +     * and maintaining connection state and so forth.
>> +     *
>> +     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
>> +     * cm_id->verbs, cm_id->channel, and cm_id->qp.
>> +     */
>> +    struct rdma_cm_id *cm_id;               /* connection manager ID */
>> +    struct rdma_cm_id *listen_id;
>> +
>> +    struct ibv_context *verbs;
>> +    struct rdma_event_channel *channel;
>> +    struct ibv_qp *qp;                      /* queue pair */
>> +    struct ibv_comp_channel *comp_channel;  /* completion channel */
>> +    struct ibv_pd *pd;                      /* protection domain */
>> +    struct ibv_cq *cq;                      /* completion queue */
>> +} RDMAContext;
>> +
>> +/*
>> + * Interface to the rest of the migration call stack.
>> + */
>> +typedef struct QEMUFileRDMA {
>> +    RDMAContext *rdma;
>> +    size_t len;
>> +    void *file;
>> +} QEMUFileRDMA;
>> +
>> +/*
>> + * Representation of a RAMBlock from an RDMA perspective.
>> + * This an subsequent structures cannot be linked lists
>> + * because we're using a single IB message to transmit
>> + * the information. It's small anyway, so a list is overkill.
>> + */
>> +typedef struct RDMALocalBlock {
>> +    uint8_t  *local_host_addr; /* local virtual address */
>> +    uint64_t remote_host_addr; /* remote virtual address */
>> +    uint64_t offset;
>> +    uint64_t length;
>> +    struct   ibv_mr **pmr;     /* MRs for chunk-level registration */
>> +    struct   ibv_mr *mr;       /* MR for non-chunk-level registration */
>> +    uint32_t *remote_keys;     /* rkeys for chunk-level registration */
>> +    uint32_t remote_rkey;      /* rkeys for non-chunk-level registration */
>> +} RDMALocalBlock;
>> +
>> +/*
>> + * Also represents a RAMblock, but only on the server.
>> + * This gets transmitted by the server during connection-time
>> + * to the client / primary VM and then is used to populate the
>> + * corresponding RDMALocalBlock with
>> + * the information needed to perform the actual RDMA.
>> + *
>> + */
>> +typedef struct RDMARemoteBlock {
>> +    uint64_t remote_host_addr;
>> +    uint64_t offset;
>> +    uint64_t length;
>> +    uint32_t remote_rkey;
> This has padding here, you cannot portably memcpy it to a buffer.
> Please add an uint32_t, and add QEMU_PACKED after the closing brace to
> mark that it is part of the wire protocol.
>
>> +} RDMARemoteBlock;
>> +
>> +/*
>> + * Virtual address of the above structures used for transmitting
>> + * the RAMBlock descriptions at connection-time.
>> + */
>> +typedef struct RDMALocalBlocks {
>> +    int num_blocks;
>> +    RDMALocalBlock *block;
>> +} RDMALocalBlocks;
>> +
>> +/*
>> + * Same as above
>> + */
>> +typedef struct RDMARemoteBlocks {
>> +    int *num_blocks;
>> +    RDMARemoteBlock *block;
>> +    void *remote_area;
>> +    int remote_size;
>> +} RDMARemoteBlocks;
>> +
>> +#define RDMA_CONTROL_VERSION_1      1
>> +//#define RDMA_CONTROL_VERSION_2      2  /* next version */
>> +#define RDMA_CONTROL_VERSION_MAX    1
> Drop these three...
>
>> +#define RDMA_CONTROL_VERSION_MIN    1    /* change on next version */
>> +#define RDMA_CONTROL_CURRENT_VERSION RDMA_CONTROL_VERSION_1
> ... rename this to RDMA_CONTROL_VERSION_CURRENT and #define it to 1.
>
>> +/*
>> + * Main structure for IB Send/Recv control messages.
>> + * This gets prepended at the beginning of every Send/Recv.
>> + */
>> +typedef struct {
>> +    uint32_t len;     /* Total length of data portion */
>> +    uint32_t type;    /* which control command to perform */
>> +    uint32_t version; /* version */
> The version is not needed, you can just use different types.  It is also
> wrong to use the same "namespace" for protocol and command versions,
> they are two different things.
>
>> +    uint32_t repeat;  /* number of commands in data portion of same type */
>> +} RDMAControlHeader;
> QEMU_PACKED.
>
>> +/*
>> + * Register a single Chunk.
>> + * Information sent by the primary VM to inform the server
>> + * to register an single chunk of memory before we can perform
>> + * the actual RDMA operation.
>> + */
>> +typedef struct {
>> +    uint32_t len;           /* length of the chunk to be registered */
>> +    uint32_t current_index; /* which ramblock the chunk belongs to */
>> +    uint64_t offset;        /* offset into the ramblock of the chunk */
>> +} RDMARegister;
> QEMU_PACKED.
>
>> +typedef struct {
>> +    uint32_t value;     /* if zero, we will madvise() */
>> +    uint64_t offset;    /* where in the remote ramblock this chunk */
>> +    uint32_t block_idx; /* which ram block index */
>> +    uint64_t length;    /* length of the chunk */
>> +} RDMACompress;
> Part of the protocol, must be QEMU_PACKED.  Please also move block_idx
> above offset (of value below offset) to keep everything nicely aligned.
>
>> +/*
>> + * The result of the server's memory registration produces an "rkey"
>> + * which the primary VM must reference in order to perform
>> + * the RDMA operation.
>> + */
>> +typedef struct {
>> +    uint32_t rkey;
>> +} RDMARegisterResult;
> QEMU_PACKED.
>
>> +#define RDMAControlHeaderSize sizeof(RDMAControlHeader)
> Just use the definition throughout.
>
>> +
>> +RDMALocalBlocks local_ram_blocks;
>> +RDMARemoteBlocks remote_ram_blocks;
> The first of these is (almost) always passed as a pointer, the other is
> used directly.  Make up your mind. :)  I would move these to RDMAContext.
>
>> +
>> +/*
>> + * Memory regions need to be registered with the device and queue pairs setup
>> + * in advanced before the migration starts. This tells us where the RAM blocks
>> + * are so that we can register them individually.
>> + */
>> +static void qemu_rdma_init_one_block(void *host_addr,
>> +    ram_addr_t offset, ram_addr_t length, void *opaque)
>> +{
>> +    RDMALocalBlocks *rdma_local_ram_blocks = opaque;
>> +    int num_blocks = rdma_local_ram_blocks->num_blocks;
>> +
>> +    rdma_local_ram_blocks->block[num_blocks].local_host_addr = host_addr;
>> +    rdma_local_ram_blocks->block[num_blocks].offset = (uint64_t)offset;
>> +    rdma_local_ram_blocks->block[num_blocks].length = (uint64_t)length;
>> +    rdma_local_ram_blocks->num_blocks++;
>> +
>> +}
>> +
>> +static void qemu_rdma_ram_block_counter(void *host_addr,
>> +            ram_addr_t offset, ram_addr_t length, void *opaque)
>> +{
>> +    int *num_blocks = opaque;
>> +    *num_blocks = *num_blocks + 1;
>> +}
>> +
>> +/*remote
>> + * Identify the RAMBlocks and their quantity. They will be references to
>> + * identify chunk boundaries inside each RAMBlock and also be referenced
>> + * during dynamic page registration.
>> + */
>> +static int qemu_rdma_init_ram_blocks(RDMALocalBlocks *rdma_local_ram_blocks)
>> +{
>> +    int num_blocks = 0;
>> +
>> +    qemu_ram_foreach_block(qemu_rdma_ram_block_counter, &num_blocks);
>> +
>> +    memset(rdma_local_ram_blocks, 0, sizeof *rdma_local_ram_blocks);
>> +    rdma_local_ram_blocks->block = g_malloc0(sizeof(RDMALocalBlock) *
>> +                                    num_blocks);
>> +
>> +    rdma_local_ram_blocks->num_blocks = 0;
>> +    qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma_local_ram_blocks);
>> +
>> +    DPRINTF("Allocated %d local ram block structures\n",
>> +                    rdma_local_ram_blocks->num_blocks);
>> +    return 0;
>> +}
>> +/*
>> + * Put in the log file which RDMA device was opened and the details
>> + * associated with that device.
>> + */
>> +static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
>> +{
>> +    printf("%s RDMA Device opened: kernel name %s "
>> +           "uverbs device name %s, "
>> +           "infiniband_verbs class device path %s,"
>> +           " infiniband class device path %s\n",
>> +                who,
>> +                verbs->device->name,
>> +                verbs->device->dev_name,
>> +                verbs->device->dev_path,
>> +                verbs->device->ibdev_path);
>> +}
>> +
>> +/*
>> + * Put in the log file the RDMA gid addressing information,
>> + * useful for folks who have trouble understanding the
>> + * RDMA device hierarchy in the kernel.
>> + */
>> +static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
>> +{
>> +    char sgid[33];
>> +    char dgid[33];
>> +    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
>> +    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
>> +    DPRINTF("%s Source GID: %s, Dest GID: %s\n", who, sgid, dgid);
>> +}
>> +
>> +/*
>> + * Figure out which RDMA device corresponds to the requested IP hostname
>> + * Also create the initial connection manager identifiers for opening
>> + * the connection.
>> + */
>> +static int qemu_rdma_resolve_host(RDMAContext *rdma)
>> +{
>> +    int ret;
>> +    struct addrinfo *res;
>> +    char port_str[16];
>> +    struct rdma_cm_event *cm_event;
>> +    char ip[40] = "unknown";
>> +
>> +    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
>> +        fprintf(stderr, "RDMA hostname has not been set\n");
>> +        return -1;
>> +    }
>> +
>> +    /* create CM channel */
>> +    rdma->channel = rdma_create_event_channel();
>> +    if (!rdma->channel) {
>> +        fprintf(stderr, "could not create CM channel\n");
>> +        return -1;
>> +    }
>> +
>> +    /* create CM id */
>> +    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
>> +    if (ret) {
>> +        fprintf(stderr, "could not create channel id\n");
>> +        goto err_resolve_create_id;
>> +    }
>> +
>> +    snprintf(port_str, 16, "%d", rdma->port);
>> +    port_str[15] = '\0';
>> +
>> +    ret = getaddrinfo(rdma->host, port_str, NULL, &res);
>> +    if (ret < 0) {
>> +        fprintf(stderr, "could not getaddrinfo destination address %s\n",
>> +                        rdma->host);
>> +        goto err_resolve_get_addr;
>> +    }
>> +
>> +    inet_ntop(AF_INET, &((struct sockaddr_in *) res->ai_addr)->sin_addr,
>> +                                ip, sizeof ip);
>> +    printf("%s => %s\n", rdma->host, ip);
> Drop this printf.
>> +
>> +    /* resolve the first address */
>> +    ret = rdma_resolve_addr(rdma->cm_id, NULL, res->ai_addr,
>> +            RDMA_RESOLVE_TIMEOUT_MS);
>> +    if (ret) {
>> +        fprintf(stderr, "could not resolve address %s\n", rdma->host);
>> +        goto err_resolve_get_addr;
>> +    }
>> +
>> +    qemu_rdma_dump_gid("client_resolve_addr", rdma->cm_id);
>> +
>> +    ret = rdma_get_cm_event(rdma->channel, &cm_event);
>> +    if (ret) {
>> +        fprintf(stderr, "could not perform event_addr_resolved\n");
>> +        goto err_resolve_get_addr;
>> +    }
>> +
>> +    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
>> +        fprintf(stderr, "result not equal to event_addr_resolved %s\n",
>> +                rdma_event_str(cm_event->event));
>> +        perror("rdma_resolve_addr");
>> +        rdma_ack_cm_event(cm_event);
>> +        goto err_resolve_get_addr;
>> +    }
>> +    rdma_ack_cm_event(cm_event);
> Move the rdma_ack_cm_event before the if, so that you can have it just once.
>
>> +
>> +    /* resolve route */
>> +    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
>> +    if (ret) {
>> +        fprintf(stderr, "could not resolve rdma route\n");
>> +        goto err_resolve_get_addr;
>> +    }
>> +
>> +    ret = rdma_get_cm_event(rdma->channel, &cm_event);
>> +    if (ret) {
>> +        fprintf(stderr, "could not perform event_route_resolved\n");
>> +        goto err_resolve_get_addr;
>> +    }
>> +    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
>> +        fprintf(stderr, "result not equal to event_route_resolved: %s\n",
>> +                        rdma_event_str(cm_event->event));
>> +        rdma_ack_cm_event(cm_event);
>> +        goto err_resolve_get_addr;
>> +    }
>> +    rdma_ack_cm_event(cm_event);
> Same here.
>
>> +    rdma->verbs = rdma->cm_id->verbs;
>> +    qemu_rdma_dump_id("client_resolve_host", rdma->cm_id->verbs);
>> +    qemu_rdma_dump_gid("client_resolve_host", rdma->cm_id);
>> +    return 0;
>> +
>> +err_resolve_get_addr:
>> +    rdma_destroy_id(rdma->cm_id);
>> +err_resolve_create_id:
>> +    rdma_destroy_event_channel(rdma->channel);
>> +    rdma->channel = NULL;
>> +
>> +    return -1;
>> +}
>> +
>> +/*
>> + * Create protection domain and completion queues
>> + */
>> +static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
>> +{
>> +    /* allocate pd */
>> +    rdma->pd = ibv_alloc_pd(rdma->verbs);
>> +    if (!rdma->pd) {
>> +        return -1;
>> +    }
>> +
>> +#ifdef RDMA_BLOCKING
>> +    /* create completion channel */
>> +    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
>> +    if (!rdma->comp_channel) {
>> +        goto err_alloc_pd_cq;
>> +    }
>> +#endif
>> +
>> +    /* create cq */
>> +    rdma->cq = ibv_create_cq(rdma->verbs, RDMA_CQ_SIZE,
>> +            NULL, rdma->comp_channel, 0);
>> +    if (!rdma->cq) {
>> +        goto err_alloc_pd_cq;
>> +    }
>> +
>> +    return 0;
>> +
>> +err_alloc_pd_cq:
>> +    if (rdma->pd) {
>> +        ibv_dealloc_pd(rdma->pd);
>> +    }
>> +    if (rdma->comp_channel) {
>> +        ibv_destroy_comp_channel(rdma->comp_channel);
>> +    }
>> +    rdma->pd = NULL;
>> +    rdma->comp_channel = NULL;
>> +    return -1;
>> +
>> +}
>> +
>> +/*
>> + * Create queue pairs.
>> + */
>> +static int qemu_rdma_alloc_qp(RDMAContext *rdma)
>> +{
>> +    struct ibv_qp_init_attr attr = { 0 };
>> +    int ret;
>> +
>> +    attr.cap.max_send_wr = RDMA_QP_SIZE;
>> +    attr.cap.max_recv_wr = 3;
>> +    attr.cap.max_send_sge = 1;
>> +    attr.cap.max_recv_sge = 1;
>> +    attr.send_cq = rdma->cq;
>> +    attr.recv_cq = rdma->cq;
>> +    attr.qp_type = IBV_QPT_RC;
>> +
>> +    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
>> +    if (ret) {
>> +        return -1;
>> +    }
>> +
>> +    rdma->qp = rdma->cm_id->qp;
>> +    return 0;
>> +}
>> +
>> +/*
>> + * This is probably dead code, but its here anyway for testing.
>> + * Sometimes nice to know the performance tradeoffs of pinning.
>> + */
> Please drop this function.
>
>> +#if !defined(RDMA_LAZY_CLIENT_REGISTRATION)
>> +static int qemu_rdma_reg_chunk_ram_blocks(RDMAContext *rdma,
>> +        RDMALocalBlocks *rdma_local_ram_blocks)
>> +{
>> +    int i, j;
>> +    for (i = 0; i < rdma_local_ram_blocks->num_blocks; i++) {
>> +        RDMALocalBlock *block = &(rdma_local_ram_blocks->block[i]);
>> +        int num_chunks = RDMA_REG_NUM_CHUNKS(block);
>> +        /* allocate memory to store chunk MRs */
>> +        rdma_local_ram_blocks->block[i].pmr = g_malloc0(
>> +                                num_chunks * sizeof(struct ibv_mr *));
>> +
>> +        if (!block->pmr) {
>> +            goto err_reg_chunk_ram_blocks;
>> +        }
>> +
>> +        for (j = 0; j < num_chunks; j++) {
>> +            uint8_t *start_addr = RDMA_REG_CHUNK_START(block, j);
>> +            uint8_t *end_addr = RDMA_REG_CHUNK_END(block, j);
>> +            if (start_addr < block->local_host_addr) {
>> +                start_addr = block->local_host_addr;
>> +            }
>> +            if (end_addr > block->local_host_addr + block->length) {
>> +                end_addr = block->local_host_addr + block->length;
>> +            }
>> +            block->pmr[j] = ibv_reg_mr(rdma->pd,
>> +                                start_addr,
>> +                                end_addr - start_addr,
>> +                                IBV_ACCESS_REMOTE_READ
>> +                                );
>> +            if (!block->pmr[j]) {
>> +                break;
>> +            }
>> +        }
>> +        if (j < num_chunks) {
>> +            for (j--; j >= 0; j--) {
>> +                ibv_dereg_mr(block->pmr[j]);
>> +            }
>> +            block->pmr[i] = NULL;
>> +            goto err_reg_chunk_ram_blocks;
>> +        }
>> +    }
>> +
>> +    return 0;
>> +
>> +err_reg_chunk_ram_blocks:
>> +    for (i--; i >= 0; i--) {
>> +        int num_chunks =
>> +            RDMA_REG_NUM_CHUNKS(&(rdma_local_ram_blocks->block[i]));
>> +        for (j = 0; j < num_chunks; j++) {
>> +            ibv_dereg_mr(rdma_local_ram_blocks->block[i].pmr[j]);
>> +        }
>> +        free(rdma_local_ram_blocks->block[i].pmr);
>> +        rdma_local_ram_blocks->block[i].pmr = NULL;
>> +    }
>> +
>> +    return -1;
>> +
>> +}
>> +#endif
>> +
>> +/*
>> + * Also probably dead code, but for the same reason, its nice
>> + * to know the performance tradeoffs of dynamic registration
>> + * on both sides of the connection.
>> + */
> Actually it is not dead code. :)
>
>> +static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma,
>> +                                RDMALocalBlocks *rdma_local_ram_blocks)
>> +{
>> +    int i;
>> +    for (i = 0; i < rdma_local_ram_blocks->num_blocks; i++) {
>> +        rdma_local_ram_blocks->block[i].mr =
>> +            ibv_reg_mr(rdma->pd,
>> +                    rdma_local_ram_blocks->block[i].local_host_addr,
>> +                    rdma_local_ram_blocks->block[i].length,
>> +                    IBV_ACCESS_LOCAL_WRITE |
>> +                    IBV_ACCESS_REMOTE_WRITE
>> +                    );
>> +        if (!rdma_local_ram_blocks->block[i].mr) {
>> +            fprintf(stderr, "Failed to register local server ram block!\n");
>> +            break;
>> +        }
>> +    }
>> +
>> +    if (i >= rdma_local_ram_blocks->num_blocks) {
>> +        return 0;
>> +    }
>> +
>> +    for (i--; i >= 0; i--) {
>> +        ibv_dereg_mr(rdma_local_ram_blocks->block[i].mr);
>> +    }
>> +
>> +    return -1;
>> +
>> +}
>> +
>> +static int qemu_rdma_client_reg_ram_blocks(RDMAContext *rdma,
>> +                                RDMALocalBlocks *rdma_local_ram_blocks)
>> +{
>> +#ifdef RDMA_CHUNK_REGISTRATION
>> +#ifdef RDMA_LAZY_CLIENT_REGISTRATION
>> +    return 0;
>> +#else
>> +    return qemu_rdma_reg_chunk_ram_blocks(rdma, rdma_local_ram_blocks);
>> +#endif
>> +#else
>> +    return qemu_rdma_reg_whole_ram_blocks(rdma, rdma_local_ram_blocks);
>> +#endif
>> +}
> Drop this function altogether, replace call with a comment saying that
> the client only does on-demand registration.
>
>> +
>> +static int qemu_rdma_server_reg_ram_blocks(RDMAContext *rdma,
>> +                                RDMALocalBlocks *rdma_local_ram_blocks)
>> +{
>> +    return qemu_rdma_reg_whole_ram_blocks(rdma, rdma_local_ram_blocks);
>> +}
> Drop this, just call qemu_rdma_reg_whole_ram_blocks.
>
>> +
>> +/*
>> + * Shutdown and clean things up.
>> + */
>> +static void qemu_rdma_dereg_ram_blocks(RDMALocalBlocks *rdma_local_ram_blocks)
>> +{
>> +    int i, j;
>> +    for (i = 0; i < rdma_local_ram_blocks->num_blocks; i++) {
>> +        int num_chunks;
>> +        if (!rdma_local_ram_blocks->block[i].pmr) {
>> +            continue;
>> +        }
>> +        num_chunks = RDMA_REG_NUM_CHUNKS(&(rdma_local_ram_blocks->block[i]));
>> +        for (j = 0; j < num_chunks; j++) {
>> +            if (!rdma_local_ram_blocks->block[i].pmr[j]) {
>> +                continue;
>> +            }
>> +            ibv_dereg_mr(rdma_local_ram_blocks->block[i].pmr[j]);
>> +        }
>> +        free(rdma_local_ram_blocks->block[i].pmr);
> g_free.
>
>> +        rdma_local_ram_blocks->block[i].pmr = NULL;
>> +    }
>> +    for (i = 0; i < rdma_local_ram_blocks->num_blocks; i++) {
>> +        if (!rdma_local_ram_blocks->block[i].mr) {
>> +            continue;
>> +        }
>> +        ibv_dereg_mr(rdma_local_ram_blocks->block[i].mr);
>> +        rdma_local_ram_blocks->block[i].mr = NULL;
>> +    }
>> +}
>> +
>> +/*
>> + * Server uses this to prepare to transmit the RAMBlock descriptions
>> + * to the primary VM after connection setup.
>> + * Both sides use the "remote" structure to communicate and update
>> + * their "local" descriptions with what was sent.
>> + */
>> +static void qemu_rdma_copy_to_remote_ram_blocks(RDMAContext *rdma,
>> +                                                RDMALocalBlocks *local,
>> +                                                RDMARemoteBlocks *remote)
>> +{
>> +    int i;
>> +    DPRINTF("Allocating %d remote ram block structures\n", local->num_blocks);
>> +    *remote->num_blocks = local->num_blocks;
>> +
>> +    for (i = 0; i < local->num_blocks; i++) {
>> +            remote->block[i].remote_host_addr =
>> +                (uint64_t)(local->block[i].local_host_addr);
>> +
>> +            if (rdma->chunk_register_destination == false) {
> Use ! instead of "== false".
>
>> +                remote->block[i].remote_rkey = local->block[i].mr->rkey;
>> +            }
>> +
>> +            remote->block[i].offset = local->block[i].offset;
>> +            remote->block[i].length = local->block[i].length;
>> +    }
>> +}
>> +
>> +/*
>> + * Client then propogates the remote ram block descriptions to his local copy.
>> + * Really, only the virtual addresses are useful, but we propogate everything
>> + * anyway.
>> + *
>> + * If we're using dynamic registration on the server side (the default), then
>> + * the 'rkeys' are not useful because we will re-ask for them later during
>> + * runtime.
>> + */
> What if we are not using it?  Should the comment be more like:
>
> /*
>   * Client then propagates the remote ram block descriptions to his
>   * local copy.  If we're using dynamic registration on the server side
>   * (the default), only the virtual addresses are actually useful
>   * because we will re-ask for the 'rkeys' later.  But for simplicity
>   * we propagate everything anyway.
>   */
>
> ?
>
>> +static int qemu_rdma_process_remote_ram_blocks(RDMALocalBlocks *local,
>> +                                               RDMARemoteBlocks *remote)
>> +{
>> +    int i, j;
>> +
>> +    if (local->num_blocks != *remote->num_blocks) {
>> +        fprintf(stderr, "local %d != remote %d\n",
>> +            local->num_blocks, *remote->num_blocks);
>> +        return -1;
>> +    }
>> +
>> +    for (i = 0; i < *remote->num_blocks; i++) {
>> +        /* search local ram blocks */
>> +        for (j = 0; j < local->num_blocks; j++) {
>> +            if (remote->block[i].offset != local->block[j].offset) {
>> +                continue;
>> +            }
>> +            if (remote->block[i].length != local->block[j].length) {
>> +                return -1;
>> +            }
>> +            local->block[j].remote_host_addr =
>> +                remote->block[i].remote_host_addr;
>> +            local->block[j].remote_rkey = remote->block[i].remote_rkey;
>> +            break;
>> +        }
>> +        if (j >= local->num_blocks) {
>> +            return -1;
>> +        }
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +/*
>> + * Find the ram block that corresponds to the page requested to be
>> + * transmitted by QEMU.
>> + *
>> + * Once the block is found, also identify which 'chunk' within that
>> + * block that the page belongs to.
>> + *
>> + * This search cannot fail or the migration will fail.
>> + */
>> +static int qemu_rdma_search_ram_block(uint64_t offset, uint64_t length,
>> +        RDMALocalBlocks *blocks, int *block_index, int *chunk_index)
>> +{
>> +    int i;
>> +    for (i = 0; i < blocks->num_blocks; i++) {
>> +        if (offset < blocks->block[i].offset) {
>> +            continue;
>> +        }
>> +        if (offset + length >
>> +                blocks->block[i].offset + blocks->block[i].length) {
>> +            continue;
>> +        }
>> +
>> +        *block_index = i;
>> +        if (chunk_index) {
> Always true.
>
>> +            uint8_t *host_addr = blocks->block[i].local_host_addr +
>> +                (offset - blocks->block[i].offset);
>> +            *chunk_index = RDMA_REG_CHUNK_INDEX(
>> +                    blocks->block[i].local_host_addr, host_addr);
>> +        }
>> +        return 0;
>> +    }
>> +    return -1;
>> +}
>> +
>> +/*
>> + * Register a chunk with IB. If the chunk was already registered
>> + * previously, then skip.
>> + *
>> + * Also return the keys associated with the registration needed
>> + * to perform the actual RDMA operation.
>> + */
>> +static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
>> +        RDMALocalBlock *block, uint64_t host_addr,
>> +        uint32_t *lkey, uint32_t *rkey)
>> +{
>> +    int chunk;
>> +    if (block->mr) {
>> +        if (lkey) {
>> +            *lkey = block->mr->lkey;
>> +        }
>> +        if (rkey) {
>> +            *rkey = block->mr->rkey;
>> +        }
>> +        return 0;
>> +    }
>> +
>> +    /* allocate memory to store chunk MRs */
>> +    if (!block->pmr) {
>> +        int num_chunks = RDMA_REG_NUM_CHUNKS(block);
>> +        block->pmr = g_malloc0(num_chunks *
>> +                sizeof(struct ibv_mr *));
>> +        if (!block->pmr) {
>> +            return -1;
>> +        }
>> +    }
>> +
>> +    /*
>> +     * If 'rkey', then we're the server performing a dynamic
>> +     * registration, so grant access to the client.
> s/performing a dynamic registration//
>
> Replace server with destination (right?)
>
>> +     * If 'lkey', then we're the primary VM performing a dynamic
> s/primary VM performing a dynamic registration/source/
>
>> +     * registration, so grant access only to ourselves.
>> +     */
>> +    chunk = RDMA_REG_CHUNK_INDEX(block->local_host_addr, host_addr);
>> +    if (!block->pmr[chunk]) {
>> +        uint8_t *start_addr = RDMA_REG_CHUNK_START(block, chunk);
>> +        uint8_t *end_addr = RDMA_REG_CHUNK_END(block, chunk);
>> +        if (start_addr < block->local_host_addr) {
>> +            start_addr = block->local_host_addr;
>> +        }
> This "if" is not needed.
>
>> +        if (end_addr > block->local_host_addr + block->length) {
>> +            end_addr = block->local_host_addr + block->length;
>> +        }
> This "if" is needed, but boy it is ugly! :)
>
> I think it's better if you change RDMA_REG_CHUNK_* to static inline
> functions.  This way, the clamping of RDMA_REG_CHUNK_END's return value
> can be placed in the function itself.
>
>> +        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
>> +                start_addr,
>> +                end_addr - start_addr,
>> +                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
>> +                            IBV_ACCESS_REMOTE_WRITE) : 0)
>> +                | IBV_ACCESS_REMOTE_READ);
> Here my lack of IB-fu is showing, but...  I would have thought of
> something like this
>
>       (rkey ? IBV_ACCESS_REMOTE_WRITE : 0) |
>       (lkey ? IBV_ACCESS_LOCAL_READ : 0)
>
> i.e. on the source IB will read, on the destination IB will write on
> behalf of the source?
>
> Why are the flags like that?
>
>> +        if (!block->pmr[chunk]) {
>> +            fprintf(stderr, "Failed to register chunk!\n");
>> +            return -1;
>> +        }
>> +    }
>> +
>> +    if (lkey) {
>> +        *lkey = block->pmr[chunk]->lkey;
>> +    }
>> +    if (rkey) {
>> +        *rkey = block->pmr[chunk]->rkey;
>> +    }
>> +    return 0;
>> +}
>> +
>> +/*
>> + * Register (at connection time) the memory used for control
>> + * channel messages.
>> + */
>> +static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
>> +{
>> +    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
>> +            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
>> +            IBV_ACCESS_LOCAL_WRITE |
>> +            IBV_ACCESS_REMOTE_WRITE |
>> +            IBV_ACCESS_REMOTE_READ);
>> +    if (rdma->wr_data[idx].control_mr) {
>> +        return 0;
>> +    }
>> +    return -1;
>> +}
>> +
>> +static int qemu_rdma_dereg_control(RDMAContext *rdma, int idx)
>> +{
>> +    return ibv_dereg_mr(rdma->wr_data[idx].control_mr);
>> +}
>> +
>> +#if defined(DEBUG_RDMA) || defined(DEBUG_RDMA_VERBOSE)
>> +static const char *print_wrid(int wrid)
>> +{
>> +    if (wrid >= RDMA_WRID_RECV_CONTROL) {
>> +        return wrid_desc[RDMA_WRID_RECV_CONTROL];
>> +    }
>> +    return wrid_desc[wrid];
>> +}
>> +#endif
>> +
>> +/*
>> + * Consult the connection manager to see a work request
>> + * (of any kind) has completed.
>> + * Return the work request ID that completed.
>> + */
>> +static int qemu_rdma_poll(RDMAContext *rdma)
>> +{
>> +    int ret;
>> +    struct ibv_wc wc;
>> +
>> +    ret = ibv_poll_cq(rdma->cq, 1, &wc);
>> +    if (!ret) {
>> +        return RDMA_WRID_NONE;
>> +    }
>> +    if (ret < 0) {
>> +        fprintf(stderr, "ibv_poll_cq return %d!\n", ret);
>> +        return ret;
>> +    }
>> +    if (wc.status != IBV_WC_SUCCESS) {
>> +        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
>> +                        wc.status, ibv_wc_status_str(wc.status));
>> +        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wc.wr_id]);
>> +
>> +        return -1;
>> +    }
>> +
>> +    if (rdma->control_ready_expected &&
>> +        (wc.wr_id >= RDMA_WRID_RECV_CONTROL)) {
>> +        DPRINTF("completion %s #%" PRId64 " received (%" PRId64 ")\n",
>> +            wrid_desc[RDMA_WRID_RECV_CONTROL], wc.wr_id -
>> +            RDMA_WRID_RECV_CONTROL, wc.wr_id);
>> +        rdma->control_ready_expected = 0;
>> +    }
>> +
>> +    if (wc.wr_id == RDMA_WRID_RDMA_WRITE) {
>> +        rdma->num_signaled_send--;
>> +        DPRINTF("completions %s (%" PRId64 ") left %d\n",
>> +            print_wrid(wc.wr_id), wc.wr_id, rdma->num_signaled_send);
>> +    } else {
>> +        DPRINTF("other completion %s (%" PRId64 ") received left %d\n",
>> +            print_wrid(wc.wr_id), wc.wr_id, rdma->num_signaled_send);
>> +    }
>> +
>> +    return  (int)wc.wr_id;
>> +}
>> +
>> +/*
>> + * Block until the next work request has completed.
>> + *
>> + * First poll to see if a work request has already completed,
>> + * otherwise block.
>> + *
>> + * If we encounter completed work requests for IDs other than
>> + * the one we're interested in, then that's generally an error.
>> + *
>> + * The only exception is actual RDMA Write completions. These
>> + * completions only need to be recorded, but do not actually
>> + * need further processing.
>> + */
>> +#ifdef RDMA_BLOCKING
>> +static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid)
> Rename to qemu_rdma_wait_for_wrid.
>
>> +{
>> +    int num_cq_events = 0;
>> +    int r = RDMA_WRID_NONE;
>> +    struct ibv_cq *cq;
>> +    void *cq_ctx;
>> +
>> +    if (ibv_req_notify_cq(rdma->cq, 0)) {
>> +        return -1;
>> +    }
>> +    /* poll cq first */
>> +    while (r != wrid) {
>> +        r = qemu_rdma_poll(rdma);
>> +        if (r < 0) {
>> +            return r;
>> +        }
>> +        if (r == RDMA_WRID_NONE) {
>> +            break;
>> +        }
>> +        if (r != wrid) {
>> +            DPRINTF("A Wanted wrid %s (%d) but got %s (%d)\n",
>> +                print_wrid(wrid), wrid, print_wrid(r), r);
>> +        }
>> +    }
>> +    if (r == wrid) {
>> +        return 0;
>> +    }
>> +
>> +    while (1) {
>> +        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
>> +            goto err_block_for_wrid;
>> +        }
>> +        num_cq_events++;
>> +        if (ibv_req_notify_cq(cq, 0)) {
>> +            goto err_block_for_wrid;
>> +        }
>> +        /* poll cq */
>> +        while (r != wrid) {
>> +            r = qemu_rdma_poll(rdma);
>> +            if (r < 0) {
>> +                goto err_block_for_wrid;
>> +            }
>> +            if (r == RDMA_WRID_NONE) {
>> +                break;
>> +            }
>> +            if (r != wrid) {
>> +                DPRINTF("B Wanted wrid %s (%d) but got %s (%d)\n",
>> +                    print_wrid(wrid), wrid, print_wrid(r), r);
>> +            }
>> +        }
>> +        if (r == wrid) {
>> +            goto success_block_for_wrid;
>> +        }
>> +    }
>> +
>> +success_block_for_wrid:
>> +    if (num_cq_events) {
>> +        ibv_ack_cq_events(cq, num_cq_events);
>> +    }
>> +    return 0;
>> +
>> +err_block_for_wrid:
>> +    if (num_cq_events) {
>> +        ibv_ack_cq_events(cq, num_cq_events);
>> +    }
>> +    return -1;
>> +}
>> +#else
>> +static int qemu_rdma_poll_for_wrid(RDMAContext *rdma, int wrid)
> Dead code, drop.
>
>> +{
>> +    int r = RDMA_WRID_NONE;
>> +    while (r != wrid) {
>> +        r = qemu_rdma_poll(rdma);
>> +        if (r < 0) {
>> +            return r;
>> +        }
>> +    }
>> +    return 0;
>> +}
>> +#endif
>> +
>> +
>> +static int wait_for_wrid(RDMAContext *rdma, int wrid)
> Drop, just call qemu_rdma_wait_for_wrid.
>
>> +{
>> +#ifdef RDMA_BLOCKING
>> +    return qemu_rdma_block_for_wrid(rdma, wrid);
>> +#else
>> +    return qemu_rdma_poll_for_wrid(rdma, wrid);
>> +#endif
>> +}
>> +
>> +static void control_to_network(RDMAControlHeader *control)
>> +{
>> +    control->version = htonl(control->version);
>> +    control->type = htonl(control->type);
>> +    control->len = htonl(control->len);
>> +    control->repeat = htonl(control->repeat);
>> +}
>> +
>> +static void network_to_control(RDMAControlHeader *control)
>> +{
>> +    control->version = ntohl(control->version);
>> +    control->type = ntohl(control->type);
>> +    control->len = ntohl(control->len);
>> +    control->repeat = ntohl(control->repeat);
>> +}
> Please group all network<->host conversion functions together in the
> file, closer to the top (and hence to the definition of the structs).
>
>> +/*
>> + * Post a SEND message work request for the control channel
>> + * containing some data and block until the post completes.
>> + */
>> +static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
>> +                                       RDMAControlHeader *head)
>> +{
>> +    int ret = 0;
>> +    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_CONTROL_MAX_WR];
>> +    struct ibv_send_wr *bad_wr;
>> +    struct ibv_sge sge = {
>> +                           .addr = (uint64_t)(wr->control),
>> +                           .length = head->len + RDMAControlHeaderSize,
>> +                           .lkey = wr->control_mr->lkey,
>> +                         };
>> +    struct ibv_send_wr send_wr = {
>> +                                   .wr_id = RDMA_WRID_SEND_CONTROL,
>> +                                   .opcode = IBV_WR_SEND,
>> +                                   .send_flags = IBV_SEND_SIGNALED,
>> +                                   .sg_list = &sge,
>> +                                   .num_sge = 1,
>> +                                };
>> +
>> +    if (head->version < RDMA_CONTROL_VERSION_MIN ||
>> +            head->version > RDMA_CONTROL_VERSION_MAX) {
>> +        fprintf(stderr, "SEND: Invalid control message version: %d,"
>> +                        " min: %d, max: %d\n",
>> +                        head->version, RDMA_CONTROL_VERSION_MIN,
>> +                        RDMA_CONTROL_VERSION_MAX);
>> +        return -1;
>> +    }
> As mentioned above, please drop versions here.
>
>> +    DPRINTF("CONTROL: sending %s..\n", control_desc[head->type]);
>> +
>> +    /*
>> +     * We don't actually need to do a memcpy() in here if we used
>> +     * the "sge" properly, but since we're only sending control messages
>> +     * (not RAM in a performance-critical path), then its OK for now.
>> +     *
>> +     * The copy makes the RDMAControlHeader simpler to manipulate
>> +     * for the time being.
>> +     */
>> +    memcpy(wr->control, head, RDMAControlHeaderSize);
>> +    control_to_network((void *) wr->control);
>> +
>> +    if (buf) {
>> +        memcpy(wr->control + RDMAControlHeaderSize, buf, head->len);
>> +    }
>> +
>> +
>> +    if (ibv_post_send(rdma->qp, &send_wr, &bad_wr)) {
>> +        return -1;
>> +    }
>> +
>> +    if (ret < 0) {
>> +        fprintf(stderr, "Failed to use post IB SEND for control!\n");
>> +        return ret;
>> +    }
>> +
>> +    ret = wait_for_wrid(rdma, RDMA_WRID_SEND_CONTROL);
>> +    if (ret < 0) {
>> +        fprintf(stderr, "rdma migration: polling control error!");
>> +    }
>> +
>> +    return ret;
>> +}
>> +
>> +/*
>> + * Post a RECV work request in anticipation of some future receipt
>> + * of data on the control channel.
>> + */
>> +static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
>> +{
>> +    struct ibv_recv_wr *bad_wr;
>> +    struct ibv_sge sge = {
>> +                            .addr = (uint64_t)(rdma->wr_data[idx].control),
>> +                            .length = RDMA_CONTROL_MAX_BUFFER,
>> +                            .lkey = rdma->wr_data[idx].control_mr->lkey,
>> +                         };
>> +
>> +    struct ibv_recv_wr recv_wr = {
>> +                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
>> +                                    .sg_list = &sge,
>> +                                    .num_sge = 1,
>> +                                 };
>> +
>> +
>> +    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
>> +        return -1;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +/*
>> + * Block and wait for a RECV control channel message to arrive.
>> + */
>> +static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
>> +                RDMAControlHeader *head, int expecting, int idx)
>> +{
>> +    int ret = wait_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx);
>> +
>> +    if (ret < 0) {
>> +        fprintf(stderr, "rdma migration: polling control error!\n");
>> +        return ret;
>> +    }
>> +
>> +    network_to_control((void *) rdma->wr_data[idx].control);
>> +    memcpy(head, rdma->wr_data[idx].control, RDMAControlHeaderSize);
>> +
>> +    if (head->version < RDMA_CONTROL_VERSION_MIN ||
>> +            head->version > RDMA_CONTROL_VERSION_MAX) {
>> +        fprintf(stderr, "RECV: Invalid control message version: %d,"
>> +                        " min: %d, max: %d\n",
>> +                        head->version, RDMA_CONTROL_VERSION_MIN,
>> +                        RDMA_CONTROL_VERSION_MAX);
>> +        return -1;
>> +    }
>> +
>> +    DPRINTF("CONTROL: %s received\n", control_desc[expecting]);
>> +
>> +    if (expecting != RDMA_CONTROL_NONE && head->type != expecting) {
>> +        fprintf(stderr, "Was expecting a %s (%d) control message"
>> +                ", but got: %s (%d), length: %d\n",
>> +                control_desc[expecting], expecting,
>> +                control_desc[head->type], head->type, head->len);
>> +        return -EIO;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +/*
>> + * When a RECV work request has completed, the work request's
>> + * buffer is pointed at the header.
>> + *
>> + * This will advance the pointer to the data portion
>> + * of the control message of the work request's buffer that
>> + * was populated after the work request finished.
>> + */
>> +static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
>> +                                  RDMAControlHeader *head)
>> +{
>> +    rdma->wr_data[idx].control_len = head->len;
>> +    rdma->wr_data[idx].control_curr =
>> +        rdma->wr_data[idx].control + RDMAControlHeaderSize;
>> +}
>> +
>> +/*
>> + * This is an 'atomic' high-level operation to deliver a single, unified
>> + * control-channel message.
>> + *
>> + * Additionally, if the user is expecting some kind of reply to this message,
>> + * they can request a 'resp' response message be filled in by posting an
>> + * additional work request on behalf of the user and waiting for an additional
>> + * completion.
>> + *
>> + * The extra (optional) response is used during registration to us from having
>> + * to perform an *additional* exchange of message just to provide a response by
>> + * instead piggy-backing on the acknowledgement.
>> + */
>> +static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
>> +                                   uint8_t *data, RDMAControlHeader *resp,
>> +                                   int *resp_idx)
>> +{
>> +    int ret = 0;
>> +    int idx = 0;
>> +
>> +    /*
>> +     * Wait until the server is ready before attempting to deliver the message
>> +     * by waiting for a READY message.
>> +     */
>> +    if (rdma->control_ready_expected) {
>> +        RDMAControlHeader resp;
>> +        ret = qemu_rdma_exchange_get_response(rdma,
>> +                                    &resp, RDMA_CONTROL_READY, idx);
>> +        if (ret < 0) {
>> +            return ret;
>> +        }
>> +    }
>> +
>> +    /*
>> +     * If the user is expecting a response, post a WR in anticipation of it.
>> +     */
>> +    if (resp) {
>> +        ret = qemu_rdma_post_recv_control(rdma, idx + 1);
>> +        if (ret) {
>> +            fprintf(stderr, "rdma migration: error posting"
>> +                    " extra control recv for anticipated result!");
>> +            return ret;
>> +        }
>> +    }
>> +
>> +    /*
>> +     * Post a WR to replace the one we just consumed for the READY message.
>> +     */
>> +    ret = qemu_rdma_post_recv_control(rdma, idx);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error posting first control recv!");
>> +        return ret;
>> +    }
>> +
>> +    /*
>> +     * Deliver the control message that was requested.
>> +     */
>> +    ret = qemu_rdma_post_send_control(rdma, data, head);
>> +
>> +    if (ret < 0) {
>> +        fprintf(stderr, "Failed to send control buffer!\n");
>> +        return ret;
>> +    }
>> +
>> +    /*
>> +     * If we're expecting a response, block and wait for it.
>> +     */
>> +    if (resp) {
>> +        DPRINTF("Waiting for response %s\n", control_desc[resp->type]);
>> +        ret = qemu_rdma_exchange_get_response(rdma, resp, resp->type, idx + 1);
>> +
>> +        if (ret < 0) {
>> +            return ret;
>> +        }
>> +
>> +        qemu_rdma_move_header(rdma, idx + 1, resp);
>> +        *resp_idx = idx + 1;
>> +        DPRINTF("Response %s received.\n", control_desc[resp->type]);
>> +    }
>> +
>> +    rdma->control_ready_expected = 1;
>> +
>> +    return 0;
>> +}
>> +
>> +/*
>> + * This is an 'atomic' high-level operation to receive a single, unified
>> + * control-channel message.
>> + */
>> +static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
>> +                                int expecting)
>> +{
>> +    RDMAControlHeader ready = {
>> +                                .len = 0,
>> +                                .type = RDMA_CONTROL_READY,
>> +                                .version = RDMA_CONTROL_CURRENT_VERSION,
>> +                                .repeat = 1,
>> +                              };
>> +    int ret;
>> +    int idx = 0;
>> +
>> +    /*
>> +     * Inform the client that we're ready to receive a message.
>> +     */
>> +    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
>> +
>> +    if (ret < 0) {
>> +        fprintf(stderr, "Failed to send control buffer!\n");
>> +        return ret;
>> +    }
>> +
>> +    /*
>> +     * Block and wait for the message.
>> +     */
>> +    ret = qemu_rdma_exchange_get_response(rdma, head, expecting, idx);
>> +
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>> +
>> +    qemu_rdma_move_header(rdma, idx, head);
>> +
>> +    /*
>> +     * Post a new RECV work request to replace the one we just consumed.
>> +     */
>> +    ret = qemu_rdma_post_recv_control(rdma, idx);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error posting second control recv!");
>> +        return ret;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +/*
>> + * Write an actual chunk of memory using RDMA.
>> + *
>> + * If we're using dynamic registration on the server-side, we have to
>> + * send a registration command first.
>> + */
>> +static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
>> +        int current_index,
>> +        uint64_t offset, uint64_t length,
>> +        uint64_t wr_id, enum ibv_send_flags flag)
>> +{
>> +    struct ibv_sge sge;
>> +    struct ibv_send_wr send_wr = { 0 };
>> +    struct ibv_send_wr *bad_wr;
>> +    RDMALocalBlock *block = &(local_ram_blocks.block[current_index]);
>> +    int chunk;
>> +    RDMARegister reg;
>> +    RDMARegisterResult *reg_result;
>> +    int reg_result_idx;
>> +    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
>> +    RDMAControlHeader head = { .len = sizeof(RDMARegister),
>> +                               .type = RDMA_CONTROL_REGISTER_REQUEST,
>> +                               .version = RDMA_CONTROL_CURRENT_VERSION,
>> +                               .repeat = 1,
>> +                             };
>> +    int ret;
>> +
>> +    sge.addr = (uint64_t)(block->local_host_addr + (offset - block->offset));
>> +    sge.length = length;
>> +
>> +    if (rdma->chunk_register_destination) {
>> +        chunk = RDMA_REG_CHUNK_INDEX(block->local_host_addr, sge.addr);
>> +        if (!block->remote_keys[chunk]) {
>> +            /*
>> +             * This page has not yet been registered, so first check to see
>> +             * if the entire chunk is zero. If so, tell the other size to
>> +             * memset() + madvise() the entire chunk without RDMA.
>> +             */
>> +            if (!can_use_buffer_find_nonzero_offset((void *)sge.addr, length)) {
>> +                fprintf(stderr, "Chunk cannot be checked for zero!!!! "
>> +                    "%d for %d bytes, index: %d, offset: %" PRId64 "...\n",
>> +                    chunk, sge.length, current_index, offset);
>> +                return -EIO;
>> +            }
> This shouldn't fail.  If it failed, it means that sge.addr is not
> suitably aligned.  But if it fails, just proceed with registration.
>
> That is, add
>
>      can_use_buffer_find_nonzero_offset((void *)sge.addr, length) &&
>
> to the "if" below.
>
>> +
>> +            if (buffer_find_nonzero_offset((void *)sge.addr,
>> +                                                    length) == length) {
>> +                RDMACompress comp = {
>> +                                        .offset = offset,
>> +                                        .value = 0,
>> +                                        .block_idx = current_index,
>> +                                        .length = length,
>> +                                    };
>> +
>> +                head.len = sizeof(comp);
>> +                head.type = RDMA_CONTROL_COMPRESS;
>> +
>> +                DPRINTF("Entire chunk is zero, sending compress: %d for %d "
>> +                    "bytes, index: %d, offset: %" PRId64 "...\n",
>> +                    chunk, sge.length, current_index, offset);
>> +
>> +                ret = qemu_rdma_exchange_send(rdma, &head,
>> +                                (uint8_t *) &comp, NULL, NULL);
>> +
>> +                if (ret < 0) {
>> +                    return -EIO;
>> +                }
>> +
>> +                return 1;
>> +            }
>> +
>> +            /*
>> +             * Otherwise, tell other side to register.
>> +             */
>> +            reg.len = sge.length;
>> +            reg.current_index = current_index;
>> +            reg.offset = offset;
>> +
>> +            DPRINTF("Sending registration request chunk %d for %d "
>> +                    "bytes, index: %d, offset: %" PRId64 "...\n",
>> +                    chunk, sge.length, current_index, offset);
>> +
>> +            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
>> +                                    &resp, &reg_result_idx);
>> +            if (ret < 0) {
>> +                return ret;
>> +            }
>> +
>> +            reg_result = (RDMARegisterResult *)
>> +                    rdma->wr_data[reg_result_idx].control_curr;
>> +
>> +            DPRINTF("Received registration result:"
>> +                    " my key: %x their key %x, chunk %d\n",
>> +                    block->remote_keys[chunk], reg_result->rkey, chunk);
>> +
>> +            block->remote_keys[chunk] = reg_result->rkey;
>> +        }
>> +
>> +        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
>> +    } else {
>> +        send_wr.wr.rdma.rkey = block->remote_rkey;
>> +    }
>> +
>> +    if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
>> +                                        &sge.lkey, NULL)) {
>> +        fprintf(stderr, "cannot get lkey!\n");
>> +        return -EINVAL;
>> +    }
>> +
>> +    send_wr.wr_id = wr_id;
>> +    send_wr.opcode = IBV_WR_RDMA_WRITE;
>> +    send_wr.send_flags = flag;
>> +    send_wr.sg_list = &sge;
>> +    send_wr.num_sge = 1;
>> +    send_wr.wr.rdma.remote_addr = block->remote_host_addr +
>> +        (offset - block->offset);
>> +
>> +    return ibv_post_send(rdma->qp, &send_wr, &bad_wr);
>> +}
>> +
>> +/*
>> + * Push out any unwritten RDMA operations.
>> + *
>> + * We support sending out multiple chunks at the same time.
>> + * Not all of them need to get signaled in the completion queue.
>> + */
>> +static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
>> +{
>> +    int ret;
>> +    enum ibv_send_flags flags = 0;
>> +
>> +    if (!rdma->current_length) {
>> +        return 0;
>> +    }
>> +    if (rdma->num_unsignaled_send >=
>> +            RDMA_UNSIGNALED_SEND_MAX) {
>> +        flags = IBV_SEND_SIGNALED;
>> +    }
>> +
>> +    while (1) {
>> +        ret = qemu_rdma_write_one(f, rdma,
>> +                rdma->current_index,
>> +                rdma->current_offset,
>> +                rdma->current_length,
>> +                RDMA_WRID_RDMA_WRITE, flags);
>> +        if (ret < 0) {
>> +            if (ret == ENOMEM) {
> This should be -ENOMEM at the very least, but I'm not sure who returns
> it.  Is it from IB?
>
>> +                DPRINTF("send queue is full. wait a little....\n");
>> +                ret = wait_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
>> +                if (ret < 0) {
>> +                    fprintf(stderr, "rdma migration: failed to make "
>> +                                    "room in full send queue! %d\n", ret);
>> +                    return -EIO;
>> +                }
>> +            } else {
>> +                 fprintf(stderr, "rdma migration: write flush error! %d\n",
>> +                                                                ret);
>> +                 perror("write flush error");
>> +                 return -EIO;
>> +            }
>> +        } else {
>> +                break;
>> +        }
>> +    }
> Goto works nicely here:
>
> retry:
>       ret = qemu_rdma_write_one(f, rdma,
>                rdma->current_index,
>                rdma->current_offset,
>                rdma->current_length,
>                RDMA_WRID_RDMA_WRITE, flags);
>       if (ret < 0) {
>           if (ret == -ENOMEM) {
>               DPRINTF("send queue is full. wait a little....\n");
>               ret = wait_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
>               if (ret >= 0) {
>                   goto retry;
>               }
>               if (ret < 0) {
>                   fprintf(stderr, "rdma migration: failed to make "
>                                   "room in full send queue! %d\n", ret);
>                   return ret;
>               }
>           }
>            perror("rdma migration: write flush error");
>            return ret;
>       }
>
>> +    if (ret == 0) {
>> +        if (rdma->num_unsignaled_send >=
>> +                RDMA_UNSIGNALED_SEND_MAX) {
>> +            rdma->num_unsignaled_send = 0;
>> +            rdma->num_signaled_send++;
>> +            DPRINTF("signaled total: %d\n", rdma->num_signaled_send);
>> +        } else {
>> +            rdma->num_unsignaled_send++;
>> +        }
>> +    }
>> +
>> +    rdma->current_length = 0;
>> +    rdma->current_offset = 0;
>> +
>> +    return 0;
>> +}
>> +
>> +static inline int qemu_rdma_in_current_block(RDMAContext *rdma,
>> +                uint64_t offset, uint64_t len)
>> +{
>> +    RDMALocalBlock *block =
>> +        &(local_ram_blocks.block[rdma->current_index]);
>> +    if (rdma->current_index < 0) {
>> +        return 0;
>> +    }
>> +    if (offset < block->offset) {
>> +        return 0;
>> +    }
>> +    if (offset + len > block->offset + block->length) {
>> +        return 0;
>> +    }
>> +    return 1;
>> +}
>> +
>> +static inline int qemu_rdma_in_current_chunk(RDMAContext *rdma,
>> +                uint64_t offset, uint64_t len)
>> +{
>> +    RDMALocalBlock *block = &(local_ram_blocks.block[rdma->current_index]);
>> +    uint8_t *chunk_start, *chunk_end, *host_addr;
>> +    if (rdma->current_chunk < 0) {
>> +        return 0;
>> +    }
>> +    host_addr = block->local_host_addr + (offset - block->offset);
>> +    chunk_start = RDMA_REG_CHUNK_START(block, rdma->current_chunk);
>> +    if (chunk_start < block->local_host_addr) {
>> +        chunk_start = block->local_host_addr;
>> +    }
> This if should never be true with the change to the macros.
>
>> +    if (host_addr < chunk_start) {
>> +        return 0;
>> +    }
>> +    chunk_end = RDMA_REG_CHUNK_END(block, rdma->current_chunk);
>> +    if (chunk_end > chunk_start + block->length) {
>> +        chunk_end = chunk_start + block->length;
>> +    }
> See above about doing this in RDMA_REG_CHUNK_END.
>
>> +    if (host_addr + len > chunk_end) {
>> +        return 0;
>> +    }
>> +    return 1;
>> +}
>> +
>> +static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
>> +                    uint64_t offset, uint64_t len)
>> +{
>> +    if (rdma->current_length == 0) {
>> +        return 0;
>> +    }
>> +    if (offset != rdma->current_offset + rdma->current_length) {
>> +        return 0;
>> +    }
>> +    if (!qemu_rdma_in_current_block(rdma, offset, len)) {
>> +        return 0;
>> +    }
>> +#ifdef RDMA_CHUNK_REGISTRATION
>> +    if (!qemu_rdma_in_current_chunk(rdma, offset, len)) {
>> +        return 0;
>> +    }
>> +#endif
>> +    return 1;
>> +}
>> +
>> +/*
>> + * We're not actually writing here, but doing three things:
>> + *
>> + * 1. Identify the chunk the buffer belongs to.
>> + * 2. If the chunk is full or the buffer doesn't belong to the current
>> + *    chunk, then start a new chunk and flush() the old chunk.
>> + * 3. To keep the hardware busy, we also group chunks into batches
>> + *    and only require that a batch gets acknowledged in the completion
>> + *    qeueue instead of each individual chunk.
>> + */
>> +static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
>> +                           uint64_t offset, uint64_t len)
>> +{
>> +    int index = rdma->current_index;
>> +    int chunk_index = rdma->current_chunk;
>> +    int ret;
>> +
>> +    /* If we cannot merge it, we flush the current buffer first. */
>> +    if (!qemu_rdma_buffer_mergable(rdma, offset, len)) {
>> +        ret = qemu_rdma_write_flush(f, rdma);
>> +        if (ret) {
>> +            return ret;
>> +        }
>> +        rdma->current_length = 0;
>> +        rdma->current_offset = offset;
>> +
>> +        ret = qemu_rdma_search_ram_block(offset, len,
>> +                    &local_ram_blocks, &index, &chunk_index);
>> +        if (ret) {
>> +            fprintf(stderr, "ram block search failed\n");
>> +            return ret;
>> +        }
>> +        rdma->current_index = index;
>> +        rdma->current_chunk = chunk_index;
>> +    }
>> +
>> +    /* merge it */
>> +    rdma->current_length += len;
>> +
>> +    /* flush it if buffer is too large */
>> +    if (rdma->current_length >= RDMA_MERGE_MAX) {
>> +        return qemu_rdma_write_flush(f, rdma);
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static void qemu_rdma_cleanup(RDMAContext *rdma)
>> +{
>> +    struct rdma_cm_event *cm_event;
>> +    int ret, idx;
>> +
>> +    if (rdma->cm_id) {
>> +        DPRINTF("Disconnecting...\n");
>> +        ret = rdma_disconnect(rdma->cm_id);
>> +        if (!ret) {
>> +            ret = rdma_get_cm_event(rdma->channel, &cm_event);
>> +            if (!ret) {
>> +                rdma_ack_cm_event(cm_event);
>> +            }
>> +        }
>> +        DPRINTF("Disconnected.\n");
>> +    }
>> +
>> +    if (remote_ram_blocks.remote_area) {
>> +        g_free(remote_ram_blocks.remote_area);
> No "if" around g_free.
>
>> +    }
>> +
>> +    for (idx = 0; idx < (RDMA_CONTROL_MAX_WR + 1); idx++) {
>> +        if (rdma->wr_data[idx].control_mr) {
>> +            qemu_rdma_dereg_control(rdma, idx);
>> +        }
>> +        rdma->wr_data[idx].control_mr = NULL;
>> +    }
>> +
>> +    qemu_rdma_dereg_ram_blocks(&local_ram_blocks);
>> +
>> +    if (local_ram_blocks.block) {
>> +        if (rdma->chunk_register_destination) {
> No need for this "if", just go through the loop unconditionally, it's cheap.
>
>> +            for (idx = 0; idx < local_ram_blocks.num_blocks; idx++) {
>> +                RDMALocalBlock *block = &(local_ram_blocks.block[idx]);
>> +                if (block->remote_keys) {
> No "if" around g_free.
>
>> +                    g_free(block->remote_keys);
>> +                }
>> +            }
>> +        }
>> +        g_free(local_ram_blocks.block);
>> +    }
>> +
>> +    if (rdma->qp) {
>> +        ibv_destroy_qp(rdma->qp);
>> +    }
>> +    if (rdma->cq) {
>> +        ibv_destroy_cq(rdma->cq);
>> +    }
>> +    if (rdma->comp_channel) {
>> +        ibv_destroy_comp_channel(rdma->comp_channel);
>> +    }
>> +    if (rdma->pd) {
>> +        ibv_dealloc_pd(rdma->pd);
>> +    }
>> +    if (rdma->listen_id) {
>> +        rdma_destroy_id(rdma->listen_id);
>> +    }
>> +    if (rdma->cm_id) {
>> +        rdma_destroy_id(rdma->cm_id);
>> +        rdma->cm_id = 0;
> Why set this to zero and not the others?
>
>> +    }
>> +    if (rdma->channel) {
>> +        rdma_destroy_event_channel(rdma->channel);
>> +    }
>> +}
>> +
>> +static void qemu_rdma_remote_ram_blocks_init(void)
>> +{
>> +    int remote_size = (sizeof(RDMARemoteBlock) *
>> +                        local_ram_blocks.num_blocks)
>> +                        +   sizeof(*remote_ram_blocks.num_blocks);
>> +
>> +    DPRINTF("Preparing %d bytes for remote info\n", remote_size);
>> +
>> +    remote_ram_blocks.remote_area = g_malloc0(remote_size);
>> +    remote_ram_blocks.remote_size = remote_size;
>> +    remote_ram_blocks.num_blocks = remote_ram_blocks.remote_area;
>> +    remote_ram_blocks.block = (void *) (remote_ram_blocks.num_blocks + 1);
> You cannot do this, it doesn't guarantee that remote_ram_blocks.block is
> correctly aligned.  Please use an extra dummy struct RDMARemoteBlock.
>
>> +}
>> +
>> +static int qemu_rdma_client_init(RDMAContext *rdma, Error **errp,
>> +                                 bool chunk_register_destination)
>> +{
>> +    int ret, idx;
>> +
>> +    if (rdma->client_init_done) {
>> +        return 0;
>> +    }
> Why would this function be called twice?
>
>> +    rdma->chunk_register_destination = chunk_register_destination;
> I think this is too early to set it.  If you need it in the rest of the
> initialization, make it true unconditionally here, with a comment that
> it will be changed to "false" later depending on the server's settings.
>
> (See the other comments below in qemu_rdma_connect).
>
>> +    ret = qemu_rdma_resolve_host(rdma);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error resolving host!");
>> +        goto err_rdma_client_init;
>> +    }
>> +
>> +    ret = qemu_rdma_alloc_pd_cq(rdma);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error allocating pd and cq!");
>> +        goto err_rdma_client_init;
>> +    }
>> +
>> +    ret = qemu_rdma_alloc_qp(rdma);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error allocating qp!");
>> +        goto err_rdma_client_init;
>> +    }
>> +
>> +    ret = qemu_rdma_init_ram_blocks(&local_ram_blocks);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error initializing ram blocks!");
>> +        goto err_rdma_client_init;
>> +    }
>> +
>> +    ret = qemu_rdma_client_reg_ram_blocks(rdma, &local_ram_blocks);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error client registering ram blocks!");
>> +        goto err_rdma_client_init;
>> +    }
>> +
>> +    for (idx = 0; idx < (RDMA_CONTROL_MAX_WR + 1); idx++) {
>> +        ret = qemu_rdma_reg_control(rdma, idx);
>> +        if (ret) {
>> +            fprintf(stderr, "rdma migration: error registering %d control!",
>> +                                                            idx);
>> +            goto err_rdma_client_init;
>> +        }
>> +    }
>> +
>> +    qemu_rdma_remote_ram_blocks_init();
>> +
>> +    rdma->client_init_done = 1;
>> +    return 0;
>> +
>> +err_rdma_client_init:
>> +    qemu_rdma_cleanup(rdma);
>> +    return -1;
>> +}
>> +
>> +static void caps_to_network(RDMACapabilities *cap)
>> +{
>> +    cap->version = htonl(cap->version);
>> +    cap->flags = htonl(cap->flags);
>> +}
>> +
>> +static void network_to_caps(RDMACapabilities *cap)
>> +{
>> +    cap->version = ntohl(cap->version);
>> +    cap->flags = ntohl(cap->flags);
>> +}
>> +
>> +static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
>> +{
>> +    RDMAControlHeader head;
>> +    RDMACapabilities cap = {
>> +                                .version = RDMA_CONTROL_CURRENT_VERSION,
>> +                                .flags = 0,
>> +                           };
>> +    struct rdma_conn_param conn_param = { .initiator_depth = 2,
>> +                                          .retry_count = 5,
>> +                                          .private_data = &cap,
>> +                                          .private_data_len = sizeof(cap),
>> +                                        };
>> +    struct rdma_cm_event *cm_event;
>> +    int ret;
>> +    int idx = 0;
>> +    int x;
>> +
>> +    if (rdma->chunk_register_destination) {
>> +        printf("Server dynamic registration requested.\n");
>> +        cap.flags |= RDMA_CAPABILITY_CHUNK_REGISTER;
>> +    }
>> +
>> +    caps_to_network(&cap);
>> +
>> +    ret = rdma_connect(rdma->cm_id, &conn_param);
>> +    if (ret) {
>> +        perror("rdma_connect");
>> +        fprintf(stderr, "rdma migration: error connecting!");
>> +        rdma_destroy_id(rdma->cm_id);
>> +        rdma->cm_id = 0;
>> +        goto err_rdma_client_connect;
>> +    }
>> +
>> +    ret = rdma_get_cm_event(rdma->channel, &cm_event);
>> +    if (ret) {
>> +        perror("rdma_get_cm_event after rdma_connect");
>> +        fprintf(stderr, "rdma migration: error connecting!");
>> +        rdma_ack_cm_event(cm_event);
> I think this rdma_ack_cm_event is wrong, rdma_get_cm_event has failed.
>
>> +        rdma_destroy_id(rdma->cm_id);
>> +        rdma->cm_id = 0;
>> +        goto err_rdma_client_connect;
>> +    }
>> +
>> +    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
>> +        perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
>> +        fprintf(stderr, "rdma migration: error connecting!");
>> +        rdma_ack_cm_event(cm_event);
>> +        rdma_destroy_id(rdma->cm_id);
>> +        rdma->cm_id = 0;
>> +        goto err_rdma_client_connect;
>> +    }
>> +
>> +    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
>> +    network_to_caps(&cap);
>> +
>> +    /*
>> +     * Verify that the destination can support the capabilities we requested.
>> +     */
>> +    if (!(cap.flags & RDMA_CAPABILITY_CHUNK_REGISTER) &&
>> +        rdma->chunk_register_destination) {
>> +        printf("Server cannot support dynamic registration. Will disable\n");
> Is it really "cannot support" or rather "has disabled"?  If so, this is
> not needed, the printf below already prints the same info.  Just make it
> like this:
>
>     rdma->chunk_register_destination =
>         !!(cap.flags & RDMA_CAPABILITY_CHUNK_REGISTER);
>
>> +        rdma->chunk_register_destination = false;
>> +    }
>> +
>> +    printf("Chunk registration %s\n",
>> +        rdma->chunk_register_destination ? "enabled" : "disabled");
>> +
>> +    rdma_ack_cm_event(cm_event);
> Move this above, before the "if (cm_event->event !=
> RDMA_CM_EVENT_ESTABLISHED)".
>
>> +
>> +    ret = qemu_rdma_post_recv_control(rdma, idx + 1);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error posting second control recv!");
>> +        goto err_rdma_client_connect;
>> +    }
>> +
>> +    ret = qemu_rdma_post_recv_control(rdma, idx);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error posting second control recv!");
>> +        goto err_rdma_client_connect;
>> +    }
>> +
>> +    ret = qemu_rdma_exchange_get_response(rdma,
>> +                                &head, RDMA_CONTROL_RAM_BLOCKS, idx + 1);
>> +
>> +    if (ret < 0) {
>> +        fprintf(stderr, "rdma migration: error sending remote info!");
>> +        goto err_rdma_client_connect;
>> +    }
>> +
>> +    qemu_rdma_move_header(rdma, idx + 1, &head);
>> +    memcpy(remote_ram_blocks.remote_area, rdma->wr_data[idx + 1].control_curr,
>> +                    remote_ram_blocks.remote_size);
>> +
>> +    ret = qemu_rdma_process_remote_ram_blocks(
>> +                            &local_ram_blocks, &remote_ram_blocks);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error processing"
>> +                        " remote ram blocks!\n");
>> +        goto err_rdma_client_connect;
>> +    }
>> +
>> +    if (rdma->chunk_register_destination) {
>> +        for (x = 0; x < local_ram_blocks.num_blocks; x++) {
>> +            RDMALocalBlock *block = &(local_ram_blocks.block[x]);
>> +            int num_chunks = RDMA_REG_NUM_CHUNKS(block);
>> +            /* allocate memory to store remote rkeys */
>> +            block->remote_keys = g_malloc0(num_chunks * sizeof(uint32_t));
>> +        }
>> +    }
>> +    rdma->control_ready_expected = 1;
>> +    rdma->num_signaled_send = 0;
>> +    return 0;
>> +
>> +err_rdma_client_connect:
>> +    qemu_rdma_cleanup(rdma);
>> +    return -1;
>> +}
>> +
>> +static int qemu_rdma_server_init(RDMAContext *rdma, Error **errp)
>> +{
>> +    int ret, idx;
>> +    struct sockaddr_in sin;
>> +    struct rdma_cm_id *listen_id;
>> +    char ip[40] = "unknown";
>> +
>> +    for (idx = 0; idx < RDMA_CONTROL_MAX_WR; idx++) {
>> +        rdma->wr_data[idx].control_len = 0;
>> +        rdma->wr_data[idx].control_curr = NULL;
>> +    }
>> +
>> +    if (rdma->host == NULL) {
>> +        fprintf(stderr, "Error: RDMA host is not set!");
>> +        return -1;
>> +    }
>> +    /* create CM channel */
>> +    rdma->channel = rdma_create_event_channel();
>> +    if (!rdma->channel) {
>> +        fprintf(stderr, "Error: could not create rdma event channel");
>> +        return -1;
>> +    }
>> +
>> +    /* create CM id */
>> +    ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
>> +    if (ret) {
>> +        fprintf(stderr, "Error: could not create cm_id!");
>> +        goto err_server_init_create_listen_id;
>> +    }
>> +
>> +    memset(&sin, 0, sizeof(sin));
>> +    sin.sin_family = AF_INET;
>> +    sin.sin_port = htons(rdma->port);
>> +
>> +    if (rdma->host && strcmp("", rdma->host)) {
>> +        struct hostent *server_addr;
>> +        server_addr = gethostbyname(rdma->host);
>> +        if (!server_addr) {
>> +            fprintf(stderr, "Error: migration could not gethostbyname!");
>> +            goto err_server_init_bind_addr;
>> +        }
>> +        memcpy(&sin.sin_addr.s_addr, server_addr->h_addr,
>> +                server_addr->h_length);
>> +        inet_ntop(AF_INET, server_addr->h_addr, ip, sizeof ip);
>> +    } else {
>> +        sin.sin_addr.s_addr = INADDR_ANY;
>> +    }
>> +
>> +    DPRINTF("%s => %s\n", rdma->host, ip);
>> +
>> +    ret = rdma_bind_addr(listen_id, (struct sockaddr *)&sin);
>> +    if (ret) {
>> +        fprintf(stderr, "Error: could not rdma_bind_addr!");
>> +        goto err_server_init_bind_addr;
>> +    }
>> +
>> +    rdma->listen_id = listen_id;
>> +    if (listen_id->verbs) {
>> +        rdma->verbs = listen_id->verbs;
>> +    }
>> +    qemu_rdma_dump_id("server_init", rdma->verbs);
>> +    qemu_rdma_dump_gid("server_init", listen_id);
>> +    return 0;
>> +
>> +err_server_init_bind_addr:
>> +    rdma_destroy_id(listen_id);
>> +err_server_init_create_listen_id:
>> +    rdma_destroy_event_channel(rdma->channel);
>> +    rdma->channel = NULL;
>> +    return -1;
>> +
>> +}
>> +
>> +static int qemu_rdma_server_prepare(RDMAContext *rdma, Error **errp)
>> +{
>> +    int ret;
>> +    int idx;
>> +
>> +    if (!rdma->verbs) {
>> +        fprintf(stderr, "rdma migration: no verbs context!");
>> +        return 0;
>> +    }
>> +
>> +    ret = qemu_rdma_alloc_pd_cq(rdma);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error allocating pd and cq!");
>> +        goto err_rdma_server_prepare;
>> +    }
>> +
>> +    ret = qemu_rdma_init_ram_blocks(&local_ram_blocks);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error initializing ram blocks!");
>> +        goto err_rdma_server_prepare;
>> +    }
>> +
>> +    qemu_rdma_remote_ram_blocks_init();
>> +
>> +    /* Extra one for the send buffer */
>> +    for (idx = 0; idx < (RDMA_CONTROL_MAX_WR + 1); idx++) {
>> +        ret = qemu_rdma_reg_control(rdma, idx);
>> +        if (ret) {
>> +            fprintf(stderr, "rdma migration: error registering %d control!",
>> +                        idx);
>> +            goto err_rdma_server_prepare;
>> +        }
>> +    }
>> +
>> +    ret = rdma_listen(rdma->listen_id, 5);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error listening on socket!");
>> +        goto err_rdma_server_prepare;
>> +    }
>> +
>> +    return 0;
>> +
>> +err_rdma_server_prepare:
>> +    qemu_rdma_cleanup(rdma);
>> +    return -1;
>> +}
>> +
>> +static void *qemu_rdma_data_init(const char *host_port, Error **errp)
>> +{
>> +    RDMAContext *rdma = NULL;
>> +    InetSocketAddress *addr;
>> +
>> +    if (host_port) {
>> +        rdma = g_malloc0(sizeof(RDMAContext));
>> +        memset(rdma, 0, sizeof(RDMAContext));
>> +        rdma->current_index = -1;
>> +        rdma->current_chunk = -1;
>> +
>> +        addr = inet_parse(host_port, errp);
>> +        if (addr != NULL) {
>> +            rdma->port = atoi(addr->port);
>> +            rdma->host = g_strdup(addr->host);
>> +            printf("rdma host: %s\n", rdma->host);
>> +            printf("rdma port: %d\n", rdma->port);
>> +        } else {
>> +            error_setg(errp, "bad RDMA migration address '%s'", host_port);
>> +            g_free(rdma);
>> +            return NULL;
>> +        }
>> +    }
>> +
>> +    return rdma;
>> +}
>> +
>> +/*
>> + * QEMUFile interface to the control channel.
>> + * SEND messages for control only.
>> + * pc.ram is handled with regular RDMA messages.
>> + */
>> +static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
>> +                                int64_t pos, int size)
>> +{
>> +    QEMUFileRDMA *r = opaque;
>> +    QEMUFile *f = r->file;
>> +    RDMAContext *rdma = r->rdma;
>> +    size_t remaining = size;
>> +    uint8_t * data = (void *) buf;
>> +    int ret;
>> +
>> +    /*
>> +     * Push out any writes that
>> +     * we're queued up for pc.ram.
>> +     */
>> +    if (qemu_rdma_write_flush(f, rdma) < 0) {
>> +        return -EIO;
>> +    }
>> +
>> +    while (remaining) {
>> +        RDMAControlHeader head;
>> +
>> +        r->len = MIN(remaining, RDMA_SEND_INCREMENT);
>> +        remaining -= r->len;
>> +
>> +        head.len = r->len;
>> +        head.type = RDMA_CONTROL_QEMU_FILE;
>> +        head.version = RDMA_CONTROL_CURRENT_VERSION;
>> +
>> +        ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL);
>> +
>> +        if (ret < 0) {
>> +            return ret;
>> +        }
>> +
>> +        data += r->len;
>> +    }
>> +
>> +    return size;
>> +}
>> +
>> +static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
>> +                             int size, int idx)
>> +{
>> +    size_t len = 0;
>> +
>> +    if (rdma->wr_data[idx].control_len) {
>> +        DPRINTF("RDMA %" PRId64 " of %d bytes already in buffer\n",
>> +                    rdma->wr_data[idx].control_len, size);
>> +
>> +        len = MIN(size, rdma->wr_data[idx].control_len);
>> +        memcpy(buf, rdma->wr_data[idx].control_curr, len);
>> +        rdma->wr_data[idx].control_curr += len;
>> +        rdma->wr_data[idx].control_len -= len;
>> +    }
>> +
>> +    return len;
>> +}
>> +
>> +/*
>> + * QEMUFile interface to the control channel.
>> + * RDMA links don't use bytestreams, so we have to
>> + * return bytes to QEMUFile opportunistically.
>> + */
>> +static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
>> +                                int64_t pos, int size)
>> +{
>> +    QEMUFileRDMA *r = opaque;
>> +    RDMAContext *rdma = r->rdma;
>> +    RDMAControlHeader head;
>> +    int ret = 0;
>> +
>> +    /*
>> +     * First, we hold on to the last SEND message we
>> +     * were given and dish out the bytes until we run
>> +     * out of bytes.
>> +     */
>> +    r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
>> +    if (r->len) {
>> +        return r->len;
>> +    }
>> +
>> +    /*
>> +     * Once we run out, we block and wait for another
>> +     * SEND message to arrive.
>> +     */
>> +    ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
>> +
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>> +
>> +    /*
>> +     * SEND was received with new bytes, now try again.
>> +     */
>> +    return qemu_rdma_fill(r->rdma, buf, size, 0);
>> +}
>> +
>> +/*
>> + * Block until all the outstanding chunks have been delivered by the hardware.
>> + */
>> +static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
>> +{
>> +    int ret;
>> +
>> +    if (qemu_rdma_write_flush(f, rdma) < 0) {
>> +        return -EIO;
>> +    }
>> +
>> +    while (rdma->num_signaled_send) {
>> +        ret = wait_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
>> +        if (ret < 0) {
>> +            fprintf(stderr, "rdma migration: complete polling error!\n");
>> +            return -EIO;
>> +        }
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static int qemu_rdma_close(void *opaque)
>> +{
>> +    QEMUFileRDMA *r = opaque;
>> +    if (r->rdma) {
>> +        qemu_rdma_cleanup(r->rdma);
>> +        g_free(r->rdma);
>> +    }
>> +    g_free(r);
>> +    return 0;
>> +}
>> +
>> +static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
>> +                   ram_addr_t block_offset, ram_addr_t offset, size_t size)
>> +{
>> +    ram_addr_t current_addr = block_offset + offset;
>> +    QEMUFileRDMA *rfile = opaque;
>> +    RDMAContext *rdma;
>> +    int ret;
>> +
>> +    if (rfile) {
>> +        rdma = rfile->rdma;
>> +    } else {
>> +        return -ENOTSUP;
>> +    }
> This "if" should always be true, no?
>
>> +
>> +    qemu_ftell(f);
> If this is a trick to call qemu_fflush, please export the function
> instead, or alternatively call it from ram_control_save_page before
> f->ops->save_page (and after testing that f->ops->save_page exists).
>
>> +    /*
>> +     * Add this page to the current 'chunk'. If the chunk
>> +     * is full, or the page doen't belong to the current chunk,
>> +     * an actual RDMA write will occur and a new chunk will be formed.
>> +     */
>> +    ret = qemu_rdma_write(f, rdma, current_addr, size);
>> +    if (ret < 0) {
>> +        fprintf(stderr, "rdma migration: write error! %d\n", ret);
>> +        return ret;
>> +    }
>> +
>> +    /*
>> +     * Drain the Completion Queue if possible, but do not block,
>> +     * just poll.
>> +     *
>> +     * If nothing to poll, the end of the iteration will do this
>> +     * again to make sure we don't overflow the request queue.
>> +     */
>> +    while (1) {
>> +        int ret = qemu_rdma_poll(rdma);
>> +        if (ret == RDMA_WRID_NONE) {
>> +            break;
>> +        }
>> +        if (ret < 0) {
>> +            fprintf(stderr, "rdma migration: polling error! %d\n", ret);
>> +            return ret;
>> +        }
>> +    }
>> +
>> +    return size;
>> +}
>> +
>> +static int qemu_rdma_accept(RDMAContext *rdma)
>> +{
>> +    RDMAControlHeader head = { .len = remote_ram_blocks.remote_size,
>> +                               .type = RDMA_CONTROL_RAM_BLOCKS,
>> +                               .version = RDMA_CONTROL_CURRENT_VERSION,
>> +                               .repeat = 1,
>> +                             };
>> +    RDMACapabilities cap;
>> +    uint32_t requested_flags;
>> +    struct rdma_conn_param conn_param = {
>> +                                            .responder_resources = 2,
>> +                                            .private_data = &cap,
>> +                                            .private_data_len = sizeof(cap),
>> +                                         };
>> +    struct rdma_cm_event *cm_event;
>> +    struct ibv_context *verbs;
>> +    int ret;
>> +
>> +    ret = rdma_get_cm_event(rdma->channel, &cm_event);
>> +    if (ret) {
>> +        goto err_rdma_server_wait;
>> +    }
>> +
>> +    if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
>> +        rdma_ack_cm_event(cm_event);
>> +        goto err_rdma_server_wait;
>> +    }
>> +
>> +    memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
>> +
>> +    network_to_caps(&cap);
>> +
>> +    if (cap.version < RDMA_CONTROL_VERSION_MIN ||
>> +            cap.version > RDMA_CONTROL_VERSION_MAX) {
>> +            fprintf(stderr, "Unknown client RDMA version: %d, bailing...\n",
>> +                            cap.version);
>> +            rdma_ack_cm_event(cm_event);
> As usual, please unify the calls to rdma_ack_cm_event (three in this case).
>
>> +            goto err_rdma_server_wait;
>> +    }
>> +
>> +    if (cap.version == RDMA_CONTROL_VERSION_1) {
>> +        if (cap.flags & RDMA_CAPABILITY_CHUNK_REGISTER) {
>> +            rdma->chunk_register_destination = true;
>> +        } else if (cap.flags & RDMA_CAPABILITY_NEXT_FEATURE) {
>> +            /* handle new capability */
>> +        }
> As mentioned above, please drop this "else if".  But in general, this
> "if" is useless.  Please replace it with an
>
>      /* We only support one version, and we rejected all
>       * others above.
>       */
>      assert(cap.version == RDMA_CONTROL_VERSION_CURRENT);
>
>
>> +    } else {
>> +        fprintf(stderr, "Unknown client RDMA version: %d, bailing...\n",
>> +                        cap.version);
>> +        rdma_ack_cm_event(cm_event);
>> +        goto err_rdma_server_wait;
>> +    }
>> +
>> +    rdma->cm_id = cm_event->id;
>> +    verbs = cm_event->id->verbs;
>> +
>> +    rdma_ack_cm_event(cm_event);
>> +
>> +    /*
>> +     * Respond to client with the capabilities we agreed to support.
>> +     */
>> +    requested_flags = cap.flags;
>> +    cap.flags = 0;
>> +
>> +    if (rdma->chunk_register_destination &&
>> +        (requested_flags & RDMA_CAPABILITY_CHUNK_REGISTER)) {
>> +        cap.flags |= RDMA_CAPABILITY_CHUNK_REGISTER;
>> +    }
>> +
>> +    printf("Chunk registration %s\n",
>> +        rdma->chunk_register_destination ? "enabled" : "disabled");
>> +
>> +    caps_to_network(&cap);
>> +
>> +    DPRINTF("verbs context after listen: %p\n", verbs);
>> +
>> +    if (!rdma->verbs) {
>> +        rdma->verbs = verbs;
>> +        ret = qemu_rdma_server_prepare(rdma, NULL);
>> +        if (ret) {
>> +            fprintf(stderr, "rdma migration: error preparing server!\n");
>> +            goto err_rdma_server_wait;
>> +        }
>> +    } else if (rdma->verbs != verbs) {
>> +            fprintf(stderr, "ibv context not matching %p, %p!\n",
>> +                    rdma->verbs, verbs);
>> +            goto err_rdma_server_wait;
>> +    }
>> +
>> +    /* xxx destroy listen_id ??? */
>> +
>> +    qemu_set_fd_handler2(rdma->channel->fd, NULL, NULL, NULL, NULL);
>> +
>> +    ret = qemu_rdma_alloc_qp(rdma);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error allocating qp!");
>> +        goto err_rdma_server_wait;
>> +    }
>> +
>> +    ret = rdma_accept(rdma->cm_id, &conn_param);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma_accept returns %d!\n", ret);
>> +        goto err_rdma_server_wait;
>> +    }
>> +
>> +    ret = rdma_get_cm_event(rdma->channel, &cm_event);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma_accept get_cm_event failed %d!\n", ret);
>> +        goto err_rdma_server_wait;
>> +    }
>> +
>> +    if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
>> +        fprintf(stderr, "rdma_accept not event established!\n");
>> +        rdma_ack_cm_event(cm_event);
>> +        goto err_rdma_server_wait;
>> +    }
>> +
>> +    rdma_ack_cm_event(cm_event);
> Usual comment on rdma_ack_cm_event.
>
>> +    ret = qemu_rdma_post_recv_control(rdma, 0);
>> +    if (ret) {
>> +        fprintf(stderr, "rdma migration: error posting second control recv!");
>> +        goto err_rdma_server_wait;
>> +    }
>> +
>> +    if (rdma->chunk_register_destination == false) {
> Use "!rdma->chunk_register_destination".
>
>> +        ret = qemu_rdma_server_reg_ram_blocks(rdma, &local_ram_blocks);
>> +        if (ret) {
>> +            fprintf(stderr, "rdma migration: error server "
>> +                            "registering ram blocks!");
>> +            goto err_rdma_server_wait;
>> +        }
>> +    }
>> +
>> +    qemu_rdma_copy_to_remote_ram_blocks(rdma,
>> +            &local_ram_blocks, &remote_ram_blocks);
>> +
>> +    ret = qemu_rdma_post_send_control(rdma,
>> +            (uint8_t *) remote_ram_blocks.remote_area, &head);
>> +
>> +    if (ret < 0) {
>> +        fprintf(stderr, "rdma migration: error sending remote info!");
>> +        goto err_rdma_server_wait;
>> +    }
>> +
>> +    qemu_rdma_dump_gid("server_connect", rdma->cm_id);
>> +
>> +    return 0;
>> +
>> +err_rdma_server_wait:
>> +    qemu_rdma_cleanup(rdma);
>> +    return ret;
>> +}
>> +
>> +/*
>> + * During each iteration of the migration, we listen for instructions
>> + * by the primary VM to perform dynamic page registrations before they
> Please use "server" or "client" instead of "primary VM".
>
> server == source, client == destination.
>
>> + * can perform RDMA operations.
>> + *
>> + * We respond with the 'rkey'.
>> + *
>> + * Keep doing this until the primary tells us to stop.
>> + */
>> +static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
>> +                                         uint32_t flags)
>> +{
>> +    RDMAControlHeader resp = { .len = sizeof(RDMARegisterResult),
>> +                               .type = RDMA_CONTROL_REGISTER_RESULT,
>> +                               .version = RDMA_CONTROL_CURRENT_VERSION,
>> +                               .repeat = 0,
>> +                             };
>> +    QEMUFileRDMA *rfile = opaque;
>> +    RDMAContext *rdma = rfile->rdma;
>> +    RDMAControlHeader head;
>> +    RDMARegister *reg, *registers;
>> +    RDMACompress *comp;
>> +    RDMARegisterResult *reg_result;
>> +    static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
>> +    RDMALocalBlock *block;
>> +    void *host_addr;
>> +    int ret = 0;
>> +    int idx = 0;
>> +    int count = 0;
>> +
>> +
>> +    do {
>> +        DPRINTF("Waiting for next registration %d...\n", flags);
>> +
>> +        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
>> +
>> +        if (ret < 0) {
>> +            break;
>> +        }
>> +
>> +        if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
>> +            printf("Too many requests in this message (%d). Bailing.\n",
>> +                head.repeat);
>> +            ret = -EIO;
>> +            goto out;
>> +        }
>> +
>> +        switch (head.type) {
>> +        case RDMA_CONTROL_COMPRESS:
>> +            comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
>> +
>> +            DPRINTF("Zapping zero chunk: %" PRId64
>> +                " bytes, index %d, offset %" PRId64 "\n",
>> +                comp->length, comp->block_idx, comp->offset);
>> +            comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
>> +            block = &(local_ram_blocks.block[comp->block_idx]);
>> +
>> +            host_addr = block->local_host_addr +
>> +                            (comp->offset - block->offset);
>> +
>> +            ram_handle_compressed(host_addr, comp->value, comp->length);
>> +            break;
>> +        case RDMA_CONTROL_REGISTER_FINISHED:
>> +            DPRINTF("Current registrations complete.\n");
>> +            goto out;
>> +        case RDMA_CONTROL_REGISTER_REQUEST:
>> +            DPRINTF("There are %d registration requests\n", head.repeat);
>> +
>> +            resp.repeat = head.repeat;
>> +            registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
>> +
>> +            for (count = 0; count < head.repeat; count++) {
>> +                reg = &registers[count];
>> +                reg_result = &results[count];
>> +
>> +                DPRINTF("Registration request (%d): %d"
>> +                    " bytes, index %d, offset %" PRId64 "\n",
>> +                    count, reg->len, reg->current_index, reg->offset);
>> +
>> +                block = &(local_ram_blocks.block[reg->current_index]);
>> +                host_addr = (block->local_host_addr +
>> +                            (reg->offset - block->offset));
>> +                if (qemu_rdma_register_and_get_keys(rdma, block,
>> +                            (uint64_t)host_addr, NULL, &reg_result->rkey)) {
>> +                    fprintf(stderr, "cannot get rkey!\n");
>> +                    ret = -EINVAL;
>> +                    goto out;
>> +                }
>> +
>> +                DPRINTF("Registered rkey for this request: %x\n",
>> +                                reg_result->rkey);
>> +            }
>> +
>> +            ret = qemu_rdma_post_send_control(rdma,
>> +                            (uint8_t *) results, &resp);
>> +
>> +            if (ret < 0) {
>> +                fprintf(stderr, "Failed to send control buffer!\n");
>> +                goto out;
>> +            }
>> +            break;
>> +        case RDMA_CONTROL_REGISTER_RESULT:
>> +            fprintf(stderr, "Invalid RESULT message at server.\n");
>> +            ret = -EIO;
>> +            goto out;
>> +        default:
>> +            fprintf(stderr, "Unknown control message %s\n",
>> +                                control_desc[head.type]);
>> +            ret = -EIO;
>> +            goto out;
>> +        }
>> +    } while (1);
>> +
>> +out:
>> +    return ret;
>> +}
>> +
>> +static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
>> +                                        uint32_t flags)
>> +{
>> +    DPRINTF("start section: %d\n", flags);
>> +    qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
>> +    qemu_ftell(f);
>> +    return 0;
>> +}
>> +
>> +/*
>> + * Inform server that dynamic registrations are done for now.
>> + * First, flush writes, if any.
>> + */
>> +static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
>> +                                       uint32_t flags)
>> +{
>> +    QEMUFileRDMA *rfile = opaque;
>> +    RDMAContext *rdma = rfile->rdma;
>> +    RDMAControlHeader head = { .len = 0,
>> +                               .type = RDMA_CONTROL_REGISTER_FINISHED,
>> +                               .version = RDMA_CONTROL_CURRENT_VERSION,
>> +                               .repeat = 1,
>> +                             };
>> +    qemu_ftell(f);
>> +    int ret = qemu_rdma_drain_cq(f, rdma);
>> +
>> +    if (ret >= 0) {
>> +        DPRINTF("Sending registration finish %d...\n", flags);
>> +
>> +        ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL);
>> +    }
>> +
>> +    return ret;
>> +}
>> +
>> +const QEMUFileOps rdma_read_ops = {
>> +    .get_buffer    = qemu_rdma_get_buffer,
>> +    .close         = qemu_rdma_close,
>> +    .hook_ram_load = qemu_rdma_registration_handle,
>> +};
>> +
>> +const QEMUFileOps rdma_write_ops = {
>> +    .put_buffer           = qemu_rdma_put_buffer,
>> +    .close                = qemu_rdma_close,
>> +    .before_ram_iterate   = qemu_rdma_registration_start,
>> +    .after_ram_iterate    = qemu_rdma_registration_stop,
>> +    .save_page            = qemu_rdma_save_page,
>> +};
>> +
>> +static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
>> +{
>> +    QEMUFileRDMA *r = g_malloc0(sizeof(QEMUFileRDMA));
>> +
>> +    if (qemu_file_mode_is_not_valid(mode)) {
>> +        return NULL;
>> +    }
>> +
>> +    r->rdma = rdma;
>> +
>> +    if (mode[0] == 'w') {
>> +        r->file = qemu_fopen_ops(r, &rdma_write_ops);
>> +    } else {
>> +        r->file = qemu_fopen_ops(r, &rdma_read_ops);
>> +    }
>> +
>> +    return r->file;
>> +}
>> +
>> +static void rdma_accept_incoming_migration(void *opaque)
>> +{
>> +    RDMAContext *rdma = opaque;
>> +    int ret;
>> +    QEMUFile *f;
>> +
>> +    DPRINTF("Accepting rdma connection...\n");
>> +    ret = qemu_rdma_accept(rdma);
>> +    if (ret) {
>> +        fprintf(stderr, "RDMA Migration initialization failed!\n");
>> +        goto err;
>> +    }
>> +
>> +    DPRINTF("Accepted migration\n");
>> +
>> +    f = qemu_fopen_rdma(rdma, "rb");
>> +    if (f == NULL) {
>> +        fprintf(stderr, "could not qemu_fopen_rdma!\n");
>> +        goto err;
>> +    }
>> +
>> +    process_incoming_migration(f);
>> +    return;
>> +
>> +err:
>> +    qemu_rdma_cleanup(rdma);
>> +}
>> +
>> +void rdma_start_incoming_migration(const char *host_port, Error **errp)
>> +{
>> +    int ret;
>> +    RDMAContext *rdma;
>> +
>> +    DPRINTF("Starting RDMA-based incoming migration\n");
>> +    rdma = qemu_rdma_data_init(host_port, errp);
>> +    if (rdma == NULL) {
>> +        return;
>> +    }
>> +
>> +    ret = qemu_rdma_server_init(rdma, NULL);
>> +
>> +    if (!ret) {
>> +        DPRINTF("qemu_rdma_server_init success\n");
>> +        ret = qemu_rdma_server_prepare(rdma, NULL);
>> +
>> +        if (!ret) {
>> +            DPRINTF("qemu_rdma_server_prepare success\n");
>> +
>> +            qemu_set_fd_handler2(rdma->channel->fd, NULL,
> I must have asked before, is it possible to use rdma->channel->fd or
> something similar also in qemu_rdma_block_for_wrid, before calling
> qemu_rdma_poll?
>
>> +                                 rdma_accept_incoming_migration, NULL,
>> +                                    (void *)(intptr_t) rdma);
>> +            return;
>> +        }
>> +    }
>> +
>> +    g_free(rdma);
>> +}
>> +
>> +void rdma_start_outgoing_migration(void *opaque,
>> +                            const char *host_port, Error **errp)
>> +{
>> +    MigrationState *s = opaque;
>> +    RDMAContext *rdma = NULL;
>> +    int ret;
>> +
>> +    rdma = qemu_rdma_data_init(host_port, errp);
>> +    if (rdma == NULL) {
>> +        goto err;
>> +    }
>> +
>> +    ret = qemu_rdma_client_init(rdma, NULL,
>> +        s->enabled_capabilities[MIGRATION_CAPABILITY_CHUNK_REGISTER_DESTINATION]);
>> +
>> +    if (!ret) {
>> +        DPRINTF("qemu_rdma_client_init success\n");
>> +        ret = qemu_rdma_connect(rdma, NULL);
>> +
>> +        if (!ret) {
>> +            DPRINTF("qemu_rdma_client_connect success\n");
>> +            s->file = qemu_fopen_rdma(rdma, "wb");
>> +            migrate_fd_connect(s);
>> +            return;
>> +        }
>> +    }
>> +err:
>> +    g_free(rdma);
>> +    migrate_fd_error(s);
>> +    error_setg(errp, "Error connecting using rdma! %d\n", ret);
>> +}
>> +
>> diff --git a/migration.c b/migration.c
>> index b46e103..0a428f0 100644
>> --- a/migration.c
>> +++ b/migration.c
>> @@ -78,6 +78,10 @@ void qemu_start_incoming_migration(const char *uri, Error **errp)
>>   
>>       if (strstart(uri, "tcp:", &p))
>>           tcp_start_incoming_migration(p, errp);
>> +#ifdef CONFIG_RDMA
>> +    else if (strstart(uri, "x-rdma:", &p))
>> +        rdma_start_incoming_migration(p, errp);
>> +#endif
>>   #if !defined(WIN32)
>>       else if (strstart(uri, "exec:", &p))
>>           exec_start_incoming_migration(p, errp);
>> @@ -405,6 +409,10 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
>>   
>>       if (strstart(uri, "tcp:", &p)) {
>>           tcp_start_outgoing_migration(s, p, &local_err);
>> +#ifdef CONFIG_RDMA
>> +    } else if (strstart(uri, "x-rdma:", &p)) {
>> +        rdma_start_outgoing_migration(s, p, &local_err);
>> +#endif
>>   #if !defined(WIN32)
>>       } else if (strstart(uri, "exec:", &p)) {
>>           exec_start_outgoing_migration(s, p, &local_err);
>>
>

  parent reply	other threads:[~2013-04-16 14:48 UTC|newest]

Thread overview: 28+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2013-04-16  2:44 [Qemu-devel] [PULL 0/8] rdma: migration support mrhines
2013-04-16  2:44 ` [Qemu-devel] [PULL 1/8] rdma: introduce qemu_ram_foreach_block mrhines
2013-04-16  2:44 ` [Qemu-devel] [PULL 2/8] rdma: new QEMUFileOps hooks mrhines
2013-04-16  3:24   ` Paolo Bonzini
2013-04-16  4:44     ` Michael R. Hines
2013-04-16  2:44 ` [Qemu-devel] [PULL 3/8] rdma: export ram_handle_compressed() mrhines
2013-04-16  2:44 ` [Qemu-devel] [PULL 4/8] rdma: introduce capability for chunk registration mrhines
2013-04-16  4:50   ` Paolo Bonzini
2013-04-16 12:58     ` Eric Blake
2013-04-16 14:54       ` Michael R. Hines
2013-04-16  2:44 ` [Qemu-devel] [PULL 5/8] rdma: core rdma logic mrhines
2013-04-16  4:49   ` Paolo Bonzini
2013-04-16  6:09     ` Paolo Bonzini
2013-04-16 14:48     ` Michael R. Hines [this message]
2013-04-17  3:27     ` Michael R. Hines
2013-04-17  8:58       ` Paolo Bonzini
2013-04-17 15:59         ` Michael R. Hines
2013-04-17 16:48           ` Paolo Bonzini
2013-04-16  2:44 ` [Qemu-devel] [PULL 6/8] rdma: send pc.ram mrhines
2013-04-16  3:25   ` Paolo Bonzini
2013-04-16  2:44 ` [Qemu-devel] [PULL 7/8] rdma: print out throughput while debugging mrhines
2013-04-16  2:44 ` [Qemu-devel] [PULL 8/8] rdma: add documentation mrhines
2013-04-16  3:23 ` [Qemu-devel] [PULL 0/8] rdma: migration support Paolo Bonzini
2013-04-16 14:32   ` Anthony Liguori
2013-04-16 14:34     ` Paolo Bonzini
2013-04-16 15:42       ` Anthony Liguori
2013-04-16 14:52     ` Michael R. Hines
2013-04-16 14:53       ` Paolo Bonzini

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=516D64AC.10009@linux.vnet.ibm.com \
    --to=mrhines@linux.vnet.ibm.com \
    --cc=abali@us.ibm.com \
    --cc=aliguori@us.ibm.com \
    --cc=gokul@us.ibm.com \
    --cc=mrhines@us.ibm.com \
    --cc=mst@redhat.com \
    --cc=owasserm@redhat.com \
    --cc=pbonzini@redhat.com \
    --cc=qemu-devel@nongnu.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).