qemu-devel.nongnu.org archive mirror
From: Anthony Liguori <anthony@codemonkey.ws>
To: Ori Mamluk <omamluk@zerto.com>
Cc: Kevin Wolf <kwolf@redhat.com>, dlaor@redhat.com, qemu-devel@nongnu.org
Subject: Re: [Qemu-devel] [RFC PATCH] replication agent module
Date: Tue, 07 Feb 2012 06:12:46 -0600
Message-ID: <4F31153E.9010205@codemonkey.ws>
In-Reply-To: <73865e0ce364c40e0eb65ec6b22b819d@mail.gmail.com>

Hi,

On 02/07/2012 04:29 AM, Ori Mamluk wrote:
> Repagent is a new module that allows an external replication system to
> replicate a volume of a Qemu VM.
>
> This RFC patch adds the repagent client module to Qemu.

Please read http://wiki.qemu.org/Contribute/SubmitAPatch

In particular, use a tool like git-send-email and split this patch up into more 
manageable chunks.

Is there an Open Source Rephub available?  As a project policy, adding external
APIs specifically for proprietary software is not something we're willing to do.

Regards,

Anthony Liguori

> Documentation of the module role and API is in the patch at
> replication/qemu-repagent.txt
>
>
>
> The main motivation behind the module is to allow replication of VMs in a
> virtualization environment like RhevM.
>
> To achieve this we need basic replication support in Qemu.
>
>
>
> This is the first submission of this module, which was written as a Proof
> Of Concept, and used successfully for replicating and recovering a Qemu VM.
>
> Points and open issues:
>
> *             The module interfaces the Qemu storage stack at block.c
> generic layer. Is this the right place to intercept/inject IOs?
>
> *             The patch contains performing IO reads invoked by a new
> thread (a TCP listener thread). See repaget_read_vol in repagent.c. It is
> not protected by any lock – is this OK?
>
> *             VM ID – the replication system implies an environment with
> several VMs connected to a central replication system (Rephub).
>
>                  This requires some sort of identification for a VM. The
> current patch does not include a VM ID – I did not find any adequate ID to
> use.
>
>                  Any suggestions?
>
>
>
> Appreciate any feedback or suggestions.  Thanks,
>
> Ori.
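
On the unlocked read from the listener thread: if the block layer turns out not to be safe to call from that thread, one common pattern is to have the listener only enqueue the request and let the thread that owns the block layer dequeue and issue it. The following is a minimal sketch of that hand-off using plain pthreads. It does not use any QEMU API, and ReadReq, ReqQueue and the req_queue_* functions are hypothetical names made up for illustration.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical read request, as the listener thread would receive it. */
typedef struct ReadReq {
    uint64_t volume_id;
    uint64_t offset_sectors;
    uint32_t size_sectors;
    struct ReadReq *next;
} ReadReq;

/* FIFO shared between the listener thread and the owning thread. */
typedef struct ReqQueue {
    pthread_mutex_t lock;
    ReadReq *head;
    ReadReq *tail;
} ReqQueue;

static void req_queue_init(ReqQueue *q)
{
    pthread_mutex_init(&q->lock, NULL);
    q->head = NULL;
    q->tail = NULL;
}

/* Called from the listener thread: only the queue is touched here. */
static void req_queue_push(ReqQueue *q, ReadReq *req)
{
    req->next = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->tail) {
        q->tail->next = req;
    } else {
        q->head = req;
    }
    q->tail = req;
    pthread_mutex_unlock(&q->lock);
}

/* Called from the thread that owns the block layer; NULL when empty. */
static ReadReq *req_queue_pop(ReqQueue *q)
{
    ReadReq *req;

    pthread_mutex_lock(&q->lock);
    req = q->head;
    if (req) {
        q->head = req->next;
        if (!q->head) {
            q->tail = NULL;
        }
    }
    pthread_mutex_unlock(&q->lock);
    return req;
}
```

With this shape the listener never touches a BlockDriverState; the owning thread pops each request and issues the read itself. How that thread gets woken up is left out of the sketch.
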
>
>
>
>
>
>  From 5a0d88689ddcf325f25fdfca2a2012f1bbf141b9 Mon Sep 17 00:00:00 2001
>
> From: Ori Mamluk<orim@orim-fedora.(none)>
>
> Date: Tue, 7 Feb 2012 11:12:12 +0200
>
> Subject: [PATCH] Added replication agent module (repagent) to Qemu under
>
> replication directory, added repagent configure and run
>
> options, and the repagent API usage in bloc
>
>
>
> Added build options to ./configure:  --enable-replication --disable-replicat
>
> Added a commandline option to enable: -repagent<rep hub IP>
>
> Added the module files under replication.
>
>
>
> Signed-off-by: Ori Mamluk<orim@zerto.com>
>
> ---
>
> Makefile                      |    9 +-
>
> Makefile.objs                 |    6 +
>
> block.c                       |   20 +++-
>
> configure                     |   11 ++
>
> qemu-options.hx               |    6 +
>
> replication/qemu-repagent.txt |  104 +++++++++++++
>
> replication/repagent.c        |  322
> +++++++++++++++++++++++++++++++++++++++++
>
> replication/repagent.h        |   46 ++++++
>
> replication/repagent_client.c |  138 ++++++++++++++++++
>
> replication/repagent_client.h |   36 +++++
>
> replication/repcmd.h          |   59 ++++++++
>
> replication/repcmd_listener.c |  137 +++++++++++++++++
>
> replication/repcmd_listener.h |   32 ++++
>
> replication/rephub_cmds.h     |  150 +++++++++++++++++++
>
> replication/rephub_defs.h     |   40 +++++
>
> vl.c                          |   10 ++
>
> 16 files changed, 1121 insertions(+), 5 deletions(-)
>
> mode change 100644 =>  100755 Makefile.objs
>
> mode change 100644 =>  100755 qemu-options.hx
>
> create mode 100755 replication/qemu-repagent.txt
>
> create mode 100644 replication/repagent.c
>
> create mode 100644 replication/repagent.h
>
> create mode 100644 replication/repagent_client.c
>
> create mode 100644 replication/repagent_client.h
>
> create mode 100644 replication/repcmd.h
>
> create mode 100644 replication/repcmd_listener.c
>
> create mode 100644 replication/repcmd_listener.h
>
> create mode 100644 replication/rephub_cmds.h
>
> create mode 100644 replication/rephub_defs.h
>
>
>
> diff --git a/Makefile b/Makefile
>
> index 4f6eaa4..a1b3701 100644
>
> --- a/Makefile
>
> +++ b/Makefile
>
> @@ -149,9 +149,9 @@ qemu-img.o qemu-tool.o qemu-nbd.o qemu-io.o cmd.o
> qemu-ga.o: $(GENERATED_HEADERS
>
> tools-obj-y = qemu-tool.o $(oslib-obj-y) $(trace-obj-y) \
>
>                 qemu-timer-common.o cutils.o
>
> -qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y)
>
> -qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y)
>
> -qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y)
>
> +qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y)
> $(replication-obj-y)
>
> +qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y)
> $(replication-obj-y)
>
> +qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y)
> $(replication-obj-y)
>
>   qemu-img-cmds.h: $(SRC_PATH)/qemu-img-cmds.hx
>
>                 $(call quiet-command,sh $(SRC_PATH)/scripts/hxtool -h<  $<  >
> $@,"  GEN   $@")
>
> @@ -228,6 +228,7 @@ clean:
>
>                 rm -f trace-dtrace.dtrace trace-dtrace.dtrace-timestamp
>
>                 rm -f trace-dtrace.h trace-dtrace.h-timestamp
>
>                 rm -rf $(qapi-dir)
>
> +             rm -f replication/*.{o,d}
>
>                 $(MAKE) -C tests clean
>
>                 for d in $(ALL_SUBDIRS) $(QEMULIBS) libcacard; do \
>
>                 if test -d $$d; then $(MAKE) -C $$d $@ || exit 1; fi; \
>
> @@ -387,4 +388,4 @@ tar:
>
>                 rm -rf /tmp/$(FILE)
>
>   # Include automatically generated dependency files
>
> --include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d
> qapi/*.d qga/*.d)
>
> +-include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d
> qapi/*.d qga/*.d replication/*.d)
>
> diff --git a/Makefile.objs b/Makefile.objs
>
> old mode 100644
>
> new mode 100755
>
> index d7a6539..dbd6f15
>
> --- a/Makefile.objs
>
> +++ b/Makefile.objs
>
> @@ -74,6 +74,7 @@ fsdev-obj-$(CONFIG_VIRTFS) += $(addprefix fsdev/,
> $(fsdev-nested-y))
>
> # CPUs and machines.
>
>   common-obj-y = $(block-obj-y) blockdev.o
>
> +common-obj-y += $(replication-obj-$(CONFIG_REPLICATION))
>
> common-obj-y += $(net-obj-y)
>
> common-obj-y += $(qobject-obj-y)
>
> common-obj-$(CONFIG_LINUX) += $(fsdev-obj-$(CONFIG_LINUX))
>
> @@ -413,6 +414,11 @@ common-obj-y += qmp-marshal.o qapi-visit.o
> qapi-types.o $(qapi-obj-y)
>
> common-obj-y += qmp.o hmp.o
>
>   ######################################################################
>
> +# replication
>
> +replication-nested-y = repagent_client.o  repagent.o  repcmd_listener.o
>
> +replication-obj-y = $(addprefix replication/, $(replication-nested-y))
>
> +
>
> +######################################################################
>
> # guest agent
>
>   qga-nested-y = guest-agent-commands.o guest-agent-command-state.o
>
> diff --git a/block.c b/block.c
>
> index 9bb236c..f3b8387 100644
>
> --- a/block.c
>
> +++ b/block.c
>
> @@ -31,6 +31,10 @@
>
> #include "qemu-coroutine.h"
>
> #include "qmp-commands.h"
>
> +#ifdef CONFIG_REPLICATION
>
> +#include "replication/repagent.h"
>
> +#endif
>
> +
>
> #ifdef CONFIG_BSD
>
> #include<sys/types.h>
>
> #include<sys/stat.h>
>
> @@ -640,6 +644,9 @@ int bdrv_open(BlockDriverState *bs, const char
> *filename, int flags,
>
>           goto unlink_and_fail;
>
>       }
>
> +#ifdef CONFIG_REPLICATION
>
> +    repagent_register_drive(filename,  bs);
>
> +#endif
>
>       /* Open the image */
>
>       ret = bdrv_open_common(bs, filename, flags, drv);
>
>       if (ret<  0) {
>
> @@ -1292,6 +1299,17 @@ static int coroutine_fn
> bdrv_co_do_writev(BlockDriverState *bs,
>
>       ret = drv->bdrv_co_writev(bs, sector_num, nb_sectors, qiov);
>
> +
>
> +#ifdef CONFIG_REPLICATION
>
> +    if (bs->device_name[0] != '\0') {
>
> +        /* We split the IO only at the highest stack driver layer.
>
> +           Currently we know that by checking device_name - only
>
> +           highest level (closest to the guest) has that name.
>
> +           */
>
> +           repagent_handle_protected_write(bs, sector_num,
>
> +                nb_sectors, qiov, ret);
>
> +    }
>
> +#endif
>
>       if (bs->dirty_bitmap) {
>
>           set_dirty_bitmap(bs, sector_num, nb_sectors, 1);
>
>       }
>
> @@ -1783,7 +1801,7 @@ int bdrv_has_zero_init(BlockDriverState *bs)
>
>    * 'nb_sectors' is the max value 'pnum' should be set to.
>
>    */
>
> int bdrv_is_allocated(BlockDriverState *bs, int64_t sector_num, int
> nb_sectors,
>
> -              int *pnum)
>
> +    int *pnum)
>
> {
>
>       int64_t n;
>
>       if (!bs->drv->bdrv_is_allocated) {
>
> diff --git a/configure b/configure
>
> index 9e5da44..93d600e 100755
>
> --- a/configure
>
> +++ b/configure
>
> @@ -179,6 +179,7 @@ spice=""
>
> rbd=""
>
> smartcard=""
>
> smartcard_nss=""
>
> +replication=""
>
> usb_redir=""
>
> opengl=""
>
> zlib="yes"
>
> @@ -772,6 +773,10 @@ for opt do
>
>     ;;
>
>     --enable-smartcard-nss) smartcard_nss="yes"
>
>     ;;
>
> +  --disable-replication) replication="no"
>
> +  ;;
>
> +  --enable-replication) replication="yes"
>
> +  ;;
>
>     --disable-usb-redir) usb_redir="no"
>
>     ;;
>
>     --enable-usb-redir) usb_redir="yes"
>
> @@ -1067,6 +1072,7 @@ echo "  --disable-usb-redir      disable usb network
> redirection support"
>
> echo "  --enable-usb-redir       enable usb network redirection support"
>
> echo "  --disable-guest-agent    disable building of the QEMU Guest Agent"
>
> echo "  --enable-guest-agent     enable building of the QEMU Guest Agent"
>
> +echo "  --enable-replication     enable replication support"
>
> echo ""
>
> echo "NOTE: The object files are built at the place where configure is
> launched"
>
> exit 1
>
> @@ -2733,6 +2739,7 @@ echo "curl support      $curl"
>
> echo "check support     $check_utests"
>
> echo "mingw32 support   $mingw32"
>
> echo "Audio drivers     $audio_drv_list"
>
> +echo "Replication          $replication"
>
> echo "Extra audio cards $audio_card_list"
>
> echo "Block whitelist   $block_drv_whitelist"
>
> echo "Mixer emulation   $mixemu"
>
> @@ -3080,6 +3087,10 @@ if test "$smartcard_nss" = "yes" ; then
>
>     echo "CONFIG_SMARTCARD_NSS=y">>  $config_host_mak
>
> fi
>
> +if test "$replication" = "yes" ; then
>
> +  echo "CONFIG_REPLICATION=y">>  $config_host_mak
>
> +fi
>
> +
>
> if test "$usb_redir" = "yes" ; then
>
>     echo "CONFIG_USB_REDIR=y">>  $config_host_mak
>
> fi
>
> diff --git a/qemu-options.hx b/qemu-options.hx
>
> old mode 100644
>
> new mode 100755
>
> index 681eaf1..c97e4f8
>
> --- a/qemu-options.hx
>
> +++ b/qemu-options.hx
>
> @@ -2602,3 +2602,9 @@ HXCOMM This is the last statement. Insert new options
> before this line!
>
> STEXI
>
> @end table
>
> ETEXI
>
> +
>
> +DEF("repagent", HAS_ARG, QEMU_OPTION_repagent,
>
> +    "-repagent [hub IP/name]\n"
>
> +    "                Enable replication support for disks\n"
>
> +    "                hub is the ip or name of the machine running the
> replication hub.\n",
>
> +    QEMU_ARCH_ALL)
>
> diff --git a/replication/qemu-repagent.txt b/replication/qemu-repagent.txt
>
> new file mode 100755
>
> index 0000000..e3b0c1e
>
> --- /dev/null
>
> +++ b/replication/qemu-repagent.txt
>
> @@ -0,0 +1,104 @@
>
> +             repagent - replication agent - a Qemu module for enabling
> continuous async replication of VM volumes
>
> +
>
> +Introduction
>
> +             This document describes a feature in Qemu - a replication
> agent (AKA Repagent).
>
> +             The Repagent is a new module that exposes an API to an
> external replication system (AKA Rephub).
>
> +             This API allows a Rephub to communicate with a Qemu VM and
> continuously replicate its volumes.
>
> +             The implementation of a Rephub is outside of the scope of this
> document. There may be several different Rephub
>
> +             implementations using the same repagent in Qemu.
>
> +
>
> +Main features of Repagent
>
> +             Repagent does the following:
>
> +             * Report volumes - report a list of all volumes in a VM to
> the Rephub.
>
> +             * Report writes to a volume - send all writes made to a
> protected volume to the Rephub.
>
> +                             The reporting of an IO is asynchronous - i.e.
> the IO is not delayed by the Repagent to get any acknowledgement from the
> Rephub.
>
> +                             It is only copied to the Rephub.
>
> +             * Read a protected volume - allows the Rephub to read a
> protected volume, to enable the hub to synchronize the content of
> a protected volume.
>
> +
>
> +Description of the Repagent module
>
> +
>
> +Build and run options
>
> +             New configure option: --enable-replication
>
> +             New command line option:
>
> +             -repagent [hub IP/name]
>
> +
> Enable replication support for disks
>
> +
> hub is the ip or name of the machine running the replication hub.
>
> +
>
> +Module APIs
>
> +             The Repagent module interfaces two main components:
>
> +             1. The Rephub - An external API based on socket messages
>
> +             2. The generic block layer- block.c
>
> +
>
> +             Rephub message API
>
> +                             The external replication API is a message
> based API.
>
> +                             We won't go into the structure of the
> messages here - just the semantics.
>
> +
>
> +                             Messages list
>
> +                                             (The updated list and
> comments are in rephub_cmds.h)
>
> +
>
> +                                             Messages from the Repagent to
> the Rephub:
>
> +                                             * Protected write
>
> +                                                             The Repagent
> sends each write to a protected volume to the hub with the IO status.
>
> +                                                             In case the
> status is bad the write content is not sent
>
> +                                             * Report VM volumes
>
> +                                                             The agent
> reports all the volumes of the VM to the hub.
>
> +                                             * Read Volume Response
>
> +                                                             A response to
> a Read Volume Request
>
> +                                                             Sends the
> data read from a protected volume to the hub
>
> +                                             * Agent shutdown
>
> +                                                             Notifies the
> hub that the agent is about to shut down.
>
> +                                                             This allows a
> graceful shutdown. Any disconnection of an agent without
>
> +                                                             sending this
> command will result in a full sync of the VM volumes.
>
> +
>
> +                                             Messages from the Rephub to
> the Repagent:
>
> +                                             * Start protect
>
> +                                                             The hub
> instructs the agent to start protecting a volume. When a volume is protected
>
> +                                                             all its
> writes are sent to the hub.
>
> +                                                             With this
> command the hub also assigns a volume ID to the given volume name.
>
> +                                             * Read volume request
>
> +                                                             The hub
> issues a read IO to a protected volume.
>
> +                                                             This command
> is used during sync - when the hub needs to read unsynchronized
>
> +                                                             sections of a
> protected volume.
>
> +                                                             This command
> is a request; the read data is returned by the read volume response message
> (see above).
>
> +             block.c API
>
> +                             The API to the generic block storage layer
> contains 3 functionalities:
>
> +                             1. Handle writes to protected volumes
>
> +                                             In bdrv_co_do_writev, each
> write is reported to the Repagent module.
>
> +                             2. Handle each new volume that registers
>
> +                                             In bdrv_open - each new
> bottom-level block driver that registers is reported.
>
> +                             3. Read from a volume
>
> +                                             Repagent calls bdrv_aio_readv
> to handle read requests coming from the hub.
>
> +
>
> +
>
> +General description of a Rephub  - a replication system the repagent
> connects to
>
> +             This section describes at a high level a sample Rephub - a
> replication system that uses the repagent API
>
> +             to replicate disks.
>
> +             It describes a simple Rephub that continuously maintains a
> mirror of the volumes of a VM.
>
> +
>
> +             Say we have a VM we want to protect - call it PVM, say it has
> 2 volumes - V1, V2.
>
> +             Our Rephub is called SingleRephub - a Rephub protecting a
> single VM.
>
> +
>
> +             Preparations
>
> +             1. The user chooses a host to run SingleRephub - a different
> host than PVM, call it Host2
>
> +             2. The user creates two volumes on Host2 - same sizes as V1
> and V2, call them V1R (V1 recovery) and V2R.
>
> +             3. The user runs SingleRephub process on Host2, and gives V1R
> and V2R as command line arguments.
>
> +                             From now on SingleRephub waits for the
> protected VM repagent to connect.
>
> +             4. The user runs the protected VM PVM - and uses the switch
> -repagent<Host2 IP>.
>
> +
>
> +             Runtime
>
> +             1. The repagent module connects to SingleRephub on startup.
>
> +             2. repagent reports V1 and V2 to SingleRephub.
>
> +             3. SingleRephub starts to perform an initial synchronization
> of the protected volumes-
>
> +                             it reads each protected volume (V1 and V2) -
> using read volume requests - and copies the data into the
>
> +                             recovery volumes V1R and V2R.
>
> +             4. SingleRephub enters 'protection' mode - each write to the
> protected volume is sent by the repagent to the Rephub,
>
> +                             and the Rephub performs the write on the
> matching recovery volume.
>
> +
>
> +             * Note that during stage 3 writes to the protected volumes
> are not ignored - they're kept in a bitmap,
>
> +                             and will be read again when stage 3 ends, in
> an iterative converging process.
>
> +
>
> +             This flow continuously maintains an updated recovery volume.
>
> +             If the protected system is damaged, the user can create a new
> VM on Host2 with the replicated volumes attached to it.
>
> +             The new VM is a replica of the protected system.
>
> +
>
> +
>
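
To make the "iterative converging process" from the note above concrete, here is a minimal sketch of a sync loop driven by a dirty bitmap. It is a toy model under stated assumptions: the volume is eight one-byte blocks held in memory, and SyncState, sync_begin, guest_write and sync_pass are hypothetical names, not anything from the patch or from QEMU.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define NUM_BLOCKS 8   /* hypothetical tiny volume, one byte per block */

typedef struct SyncState {
    unsigned char src[NUM_BLOCKS];   /* protected volume (V1) */
    unsigned char dst[NUM_BLOCKS];   /* recovery volume (V1R) */
    bool dirty[NUM_BLOCKS];          /* blocks that still need copying */
} SyncState;

/* Start of the initial sync: every block needs copying. */
static void sync_begin(SyncState *s)
{
    for (size_t i = 0; i < NUM_BLOCKS; i++) {
        s->dirty[i] = true;
    }
}

/* A guest write that arrives while the sync is running. */
static void guest_write(SyncState *s, size_t block, unsigned char value)
{
    s->src[block] = value;
    s->dirty[block] = true;
}

/* One pass: copy every dirty block, return how many were copied. */
static size_t sync_pass(SyncState *s)
{
    size_t copied = 0;

    for (size_t i = 0; i < NUM_BLOCKS; i++) {
        if (s->dirty[i]) {
            s->dirty[i] = false;   /* clear before reading the block */
            s->dst[i] = s->src[i];
            copied++;
        }
    }
    return copied;
}
```

The hub would call sync_pass() repeatedly until it returns 0 (or a small enough number), then switch to protection mode. Clearing the bit before the copy means a write that races with the copy marks the block dirty again and is picked up by the next pass.
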
> diff --git a/replication/repagent.c b/replication/repagent.c
>
> new file mode 100644
>
> index 0000000..c66eae7
>
> --- /dev/null
>
> +++ b/replication/repagent.c
>
> @@ -0,0 +1,322 @@
>
> +#include<string.h>
>
> +#include<stdlib.h>
>
> +#include<stdio.h>
>
> +#include<pthread.h>
>
> +#include<stdint.h>
>
> +
>
> +#include "block.h"
>
> +#include "rephub_defs.h"
>
> +#include "block_int.h"
>
> +#include "repagent_client.h"
>
> +#include "repagent.h"
>
> +#include "rephub_cmds.h"
>
> +
>
> +#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))
>
> +#define REPAGENT_MAX_NUM_VOLUMES (64)
>
> +#define REPAGENT_VOLUME_ID_NONE (0)
>
> +
>
> +typedef struct RepagentVolume {
>
> +    uint64_t vol_id;
>
> +    const char *vol_path;
>
> +    BlockDriverState *driver_ptr;
>
> +} RepagentVolume;
>
> +
>
> +struct RepAgentState {
>
> +    int is_init;
>
> +    int num_volumes;
>
> +    RepagentVolume * volumes[REPAGENT_MAX_NUM_VOLUMES];
>
> +};
>
> +
>
> +typedef struct RepagentReadVolIo {
>
> +    QEMUIOVector qiov;
>
> +    RepCmdReadVolReq rep_cmd;
>
> +    uint8_t *buf;
>
> +    struct timeval start_time;
>
> +} RepagentReadVolIo;
>
> +
>
> +static int repagent_get_volume_by_name(const char *name);
>
> +static void repagent_report_volumes_to_hub(void);
>
> +static void repagent_vol_read_done(void *opaque, int ret);
>
> +static struct timeval tsub(struct timeval t1, struct timeval t2);
>
> +
>
> +RepAgentState g_rep_agent = { 0 };
>
> +
>
> +void repagent_init(const char *hubname, int port)
>
> +{
>
> +    /* It is the responsibility of the thread to free this struct */
>
> +    rephub_params *pParams = (rephub_params
> *)g_malloc(sizeof(rephub_params));
>
> +    if (hubname == NULL) {
>
> +        hubname = "127.0.0.1";
>
> +    }
>
> +    if (port == 0) {
>
> +        port = 9010;
>
> +    }
>
> +
>
> +    printf("repagent_init %s\n", hubname);
>
> +
>
> +    pParams->port = port;
>
> +    pParams->name = g_strdup(hubname);
>
> +
>
> +    pthread_t thread_id = 0;
>
> +
>
> +    /* Create the repagent client listener thread */
>
> +    pthread_create(&thread_id, 0, repagent_listen, (void *) pParams);
>
> +    pthread_detach(thread_id);
>
> +}
>
> +
>
> +void repagent_register_drive(const char *drive_path,
>
> +        BlockDriverState *driver_ptr)
>
> +{
>
> +    int i;
>
> +    for (i = 0; i<  g_rep_agent.num_volumes ; i++) {
>
> +        RepagentVolume *vol = g_rep_agent.volumes[i];
>
> +        if (vol != NULL) {
>
> +            assert(
>
> +                    strcmp(drive_path, vol->vol_path) != 0
>
> +&&  driver_ptr != vol->driver_ptr);
>
> +        }
>
> +    }
>
> +
>
> +    assert(g_rep_agent.num_volumes<  REPAGENT_MAX_NUM_VOLUMES);
>
> +
>
> +    printf("zerto repagent: Registering drive. Num drives %d, path %s\n",
>
> +            g_rep_agent.num_volumes, drive_path);
>
> +    g_rep_agent.volumes[i] =
>
> +            (RepagentVolume *)g_malloc(sizeof(RepagentVolume));
>
> +    g_rep_agent.volumes[i]->driver_ptr = driver_ptr;
>
> +    /* orim todo strcpy? */
>
> +    g_rep_agent.volumes[i]->vol_path = drive_path;
>
> +
>
> +    /* Orim todo thread-safety? */
>
> +    g_rep_agent.num_volumes++;
>
> +
>
> +    repagent_report_volumes_to_hub();
>
> +}
>
> +
>
> +/* orim todo destruction? */
>
> +
>
> +static RepagentVolume *repagent_get_protected_volume_by_driver(
>
> +        BlockDriverState *bs)
>
> +{
>
> +    /* orim todo optimize search */
>
> +    int i = 0;
>
> +    for (i = 0; i<  g_rep_agent.num_volumes ; i++) {
>
> +        RepagentVolume *p_vol = g_rep_agent.volumes[i];
>
> +        if (p_vol != NULL&&  p_vol->driver_ptr == (void *) bs) {
>
> +            return p_vol;
>
> +        }
>
> +    }
>
> +    return NULL;
>
> +}
>
> +
>
> +void repagent_handle_protected_write(BlockDriverState *bs, int64_t
> sector_num,
>
> +        int nb_sectors, QEMUIOVector *qiov, int ret_status)
>
> +{
>
> +    printf("zerto Protected write offset %lld, size %d, IO return status
> %d",
>
> +            (long long int) sector_num, nb_sectors, ret_status);
>
> +    if (bs->filename != NULL) {
>
> +        printf(", filename %s", bs->filename);
>
> +    }
>
> +
>
> +    printf("\n");
>
> +
>
> +    RepagentVolume *p_vol = repagent_get_protected_volume_by_driver(bs);
>
> +    if (p_vol == NULL || p_vol->vol_id == REPAGENT_VOLUME_ID_NONE) {
>
> +        /* Unprotected */
>
> +        printf("Got a write to an unprotected volume.\n");
>
> +        return;
>
> +    }
>
> +
>
> +    /* Report IO to rephub */
>
> +
>
> +    int data_size = qiov->size;
>
> +    if (ret_status<  0) {
>
> +        /* On failed ios we don't send the data to the hub */
>
> +        data_size = 0;
>
> +    }
>
> +    uint8_t *pdata = NULL;
>
> +    RepCmdProtectedWrite *p_cmd = (RepCmdProtectedWrite *) repcmd_new(
>
> +            REPHUB_CMD_PROTECTED_WRITE, data_size, (uint8_t **)&pdata);
>
> +
>
> +    if (ret_status>= 0) {
>
> +        qemu_iovec_to_buffer(qiov, pdata);
>
> +    }
>
> +
>
> +    p_cmd->volume_id = p_vol->vol_id;
>
> +    p_cmd->offset_sectors = sector_num;
>
> +    p_cmd->size_sectors = nb_sectors;
>
> +    p_cmd->ret_status = ret_status;
>
> +
>
> +    if (repagent_client_send((RepCmd *) p_cmd) != 0) {
>
> +        printf("Error sending command\n");
>
> +    }
>
> +}
>
> +
>
> +static void repagent_report_volumes_to_hub(void)
>
> +{
>
> +    /* Report the VM's volumes to the rephub */
>
> +    int i;
>
> +    RepCmdDataReportVmVolumes *p_cmd_data = NULL;
>
> +    RepCmdReportVmVolumes *p_cmd = (RepCmdReportVmVolumes *) repcmd_new(
>
> +            REPHUB_CMD_REPORT_VM_VOLUMES,
>
> +            g_rep_agent.num_volumes * sizeof(RepVmVolumeInfo),
>
> +            (uint8_t **)&p_cmd_data);
>
> +    p_cmd->num_volumes = g_rep_agent.num_volumes;
>
> +    printf("reporting %u volumes\n", g_rep_agent.num_volumes);
>
> +    for (i = 0; i<  g_rep_agent.num_volumes ; i++) {
>
> +        assert(g_rep_agent.volumes[i] != NULL);
>
> +        printf("reporting volume %s size %u\n",
>
> +                g_rep_agent.volumes[i]->vol_path,
>
> +                (uint32_t) sizeof(p_cmd_data->volumes[i].name));
>
> +        strncpy((char *) p_cmd_data->volumes[i].name,
>
> +                g_rep_agent.volumes[i]->vol_path,
>
> +                sizeof(p_cmd_data->volumes[i].name));
>
> +        p_cmd_data->volumes[i].volume_id = g_rep_agent.volumes[i]->vol_id;
>
> +    }
>
> +    if (repagent_client_send((RepCmd *) p_cmd) != 0) {
>
> +        printf("Error sending command\n");
>
> +    }
>
> +}
>
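
A note on the strncpy() call above: strncpy() does not write a terminating NUL when the source is at least as long as the destination, so a long volume path would leave the name field unterminated. The size of that field is defined in rephub_cmds.h, which is not shown here, so the following is only a sketch of a bounded copy that always terminates and reports truncation. copy_name is a hypothetical helper, not part of the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Copy src into dst (dst_size bytes), always NUL-terminating.
 * Returns 0 on success, -1 if src was truncated or dst_size is 0.
 */
static int copy_name(char *dst, size_t dst_size, const char *src)
{
    size_t len;

    if (dst_size == 0) {
        return -1;
    }
    len = strlen(src);
    if (len >= dst_size) {
        /* Does not fit: keep the first dst_size - 1 bytes. */
        memcpy(dst, src, dst_size - 1);
        dst[dst_size - 1] = '\0';
        return -1;
    }
    memcpy(dst, src, len + 1);   /* len + 1 includes the NUL */
    return 0;
}
```

A caller could treat the -1 return as an error rather than reporting a silently truncated volume name to the hub.
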
> +
>
> +int repaget_start_protect(RepCmdStartProtect *pcmd,
>
> +        RepCmdDataStartProtect *pcmd_data)
>
> +{
>
> +    printf("Start protect vol %s, ID %llu\n", pcmd_data->volume_name,
>
> +            (unsigned long long) pcmd->volume_id);
>
> +    int vol_index = repagent_get_volume_by_name(pcmd_data->volume_name);
>
> +    if (vol_index<  0) {
>
> +        printf("The volume doesn't exist\n");
>
> +        return TRUE;
>
> +    }
>
> +    /* orim todo protect */
>
> +    g_rep_agent.volumes[vol_index]->vol_id = pcmd->volume_id;
>
> +
>
> +    return TRUE;
>
> +}
>
> +
>
> +static int repagent_get_volume_by_name(const char *name)
>
> +{
>
> +    int i = 0;
>
> +    for (i = 0; i<  g_rep_agent.num_volumes ; i++) {
>
> +        if (g_rep_agent.volumes[i] != NULL
>
> +&&  strcmp(name, g_rep_agent.volumes[i]->vol_path) == 0) {
>
> +            return i;
>
> +        }
>
> +    }
>
> +    return -1;
>
> +}
>
> +
>
> +static int repagent_get_volume_by_id(uint64_t vol_id)
>
> +{
>
> +    int i = 0;
>
> +    for (i = 0; i<  g_rep_agent.num_volumes ; i++) {
>
> +        if (g_rep_agent.volumes[i] != NULL
>
> +&&  g_rep_agent.volumes[i]->vol_id == vol_id) {
>
> +            return i;
>
> +        }
>
> +    }
>
> +    return -1;
>
> +}
>
> +
>
> +int repaget_read_vol(RepCmdReadVolReq *pcmd, uint8_t *pdata)
>
> +{
>
> +    int index = repagent_get_volume_by_id(pcmd->volume_id);
>
> +    int size_bytes = pcmd->size_sectors * 512;
>
> +    if (index<  0) {
>
> +        printf("Vol read - Could not find vol id %llu\n",
>
> +                (unsigned long long int) pcmd->volume_id);
>
> +        RepCmdReadVolRes *p_res_cmd = (RepCmdReadVolRes *) repcmd_new(
>
> +                REPHUB_CMD_READ_VOL_RES, 0, NULL);
>
> +        p_res_cmd->req_id = pcmd->req_id;
>
> +        p_res_cmd->volume_id = pcmd->volume_id;
>
> +        p_res_cmd->is_status_success = FALSE;
>
> +        repagent_client_send((RepCmd *) p_res_cmd);
>
> +        return TRUE;
>
> +    }
>
> +
>
> +    printf("Vol read - driver %p, volId %llu, offset %llu, size %u\n",
>
> +            g_rep_agent.volumes[index]->driver_ptr,
>
> +            (unsigned long long int) pcmd->volume_id,
>
> +            (unsigned long long int) pcmd->offset_sectors,
> pcmd->size_sectors);
>
> +
>
> +    {
>
> +        RepagentReadVolIo *read_xact = g_malloc0(
> sizeof(RepagentReadVolIo));
>
> +
>
> +/*        BlockDriverAIOCB *acb; */
>
> +
>
> +        ZERO_MEM_OBJ(read_xact);
>
> +
>
> +        qemu_iovec_init(&read_xact->qiov, 1);
>
> +
>
> +        /*read_xact->buf =
>
> +        qemu_blockalign(g_rep_agent.volumes[index]->driver_ptr,
> size_bytes); */
>
> +        read_xact->buf = (uint8_t *) g_malloc(size_bytes);
>
> +        read_xact->rep_cmd = *pcmd;
>
> +        qemu_iovec_add(&read_xact->qiov, read_xact->buf, size_bytes);
>
> +
>
> +        gettimeofday(&read_xact->start_time, NULL);
>
> +        /* orim TODO - use the returned acb to cancel the request on
> shutdown */
>
> +        /*acb = */bdrv_aio_readv(g_rep_agent.volumes[index]->driver_ptr,
>
> +                read_xact->rep_cmd.offset_sectors,&read_xact->qiov,
>
> +                read_xact->rep_cmd.size_sectors, repagent_vol_read_done,
>
> +                read_xact);
>
> +    }
>
> +
>
> +    return TRUE;
>
> +}
>
> +
>
> +static void repagent_vol_read_done(void *opaque, int ret)
>
> +{
>
> +    struct timeval t2;
>
> +    RepagentReadVolIo *read_xact = (RepagentReadVolIo *) opaque;
>
> +    uint8_t *pdata = NULL;
>
> +    RepCmdReadVolRes *pcmd = (RepCmdReadVolRes *) repcmd_new(
>
> +            REPHUB_CMD_READ_VOL_RES, read_xact->rep_cmd.size_sectors * 512,
>
> +&pdata);
>
> +    pcmd->req_id = read_xact->rep_cmd.req_id;
>
> +    pcmd->volume_id = read_xact->rep_cmd.volume_id;
>
> +    pcmd->is_status_success = FALSE;
>
> +
>
> +    printf("Protected vol read - volId %llu, offset %llu, size %u\n",
>
> +            (unsigned long long int) read_xact->rep_cmd.volume_id,
>
> +            (unsigned long long int) read_xact->rep_cmd.offset_sectors,
>
> +            read_xact->rep_cmd.size_sectors);
>
> +    gettimeofday(&t2, NULL);
>
> +
>
> +    if (ret>= 0) {
>
> +        /* Read response - send the data to the hub */
>
> +        t2 = tsub(t2, read_xact->start_time);
>
> +        printf("Read prot vol done. Took %u seconds, %u us.",
>
> +                (uint32_t) t2.tv_sec, (uint32_t) t2.tv_usec);
>
> +
>
> +        pcmd->is_status_success = TRUE;
>
> +        /* orim todo optimize - don't copy, use the qiov buffer */
>
> +        qemu_iovec_to_buffer(&read_xact->qiov, pdata);
>
> +    } else {
>
> +        printf("readv failed: %s\n", strerror(-ret));
>
> +    }
>
> +
>
> +    repagent_client_send((RepCmd *) pcmd);
>
> +
>
> +    /*qemu_vfree(read_xact->buf); */
>
> +    g_free(read_xact->buf);
>
> +
>
> +    g_free(read_xact);
>
> +}
>
> +
>
> +static struct timeval tsub(struct timeval t1, struct timeval t2)
>
> +{
>
> +    t1.tv_usec -= t2.tv_usec;
>
> +    if (t1.tv_usec<  0) {
>
> +        t1.tv_usec += 1000000;
>
> +        t1.tv_sec--;
>
> +    }
>
> +    t1.tv_sec -= t2.tv_sec;
>
> +    return t1;
>
> +}
>
> +
>
> +void repagent_client_connected(void)
>
> +{
>
> +    /* orim todo thread protection */
>
> +    repagent_report_volumes_to_hub();
>
> +}
>
> diff --git a/replication/repagent.h b/replication/repagent.h
>
> new file mode 100644
>
> index 0000000..98ccbf2
>
> --- /dev/null
>
> +++ b/replication/repagent.h
>
> @@ -0,0 +1,46 @@
>
> +/*
>
> + * QEMU System Emulator
>
> + *
>
> + * Copyright (c) 2003-2008 Fabrice Bellard
>
> + *
>
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>
> + * of this software and associated documentation files (the "Software"), to deal
>
> + * in the Software without restriction, including without limitation the rights
>
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>
> + * copies of the Software, and to permit persons to whom the Software is
>
> + * furnished to do so, subject to the following conditions:
>
> + *
>
> + * The above copyright notice and this permission notice shall be included in
>
> + * all copies or substantial portions of the Software.
>
> + *
>
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>
> + * THE SOFTWARE.
>
> + */
>
> +#ifndef REPAGENT_H
>
> +#define REPAGENT_H
>
> +#include <stdint.h>
>
> +
>
> +#include "qemu-common.h"
>
> +
>
> +typedef struct RepAgentState RepAgentState;
>
> +typedef struct RepCmdStartProtect RepCmdStartProtect;
>
> +typedef struct RepCmdDataStartProtect RepCmdDataStartProtect;
>
> +struct RepCmdReadVolReq;
>
> +
>
> +void repagent_init(const char *hubname, int port);
>
> +void repagent_handle_protected_write(BlockDriverState *bs,
>
> +        int64_t sector_num, int nb_sectors, QEMUIOVector *qiov, int ret_status);
>
> +void repagent_register_drive(const char *drive_path,
>
> +        BlockDriverState *driver_ptr);
>
> +int repaget_start_protect(RepCmdStartProtect *pcmd,
>
> +        RepCmdDataStartProtect *pcmd_data);
>
> +int repaget_read_vol(struct RepCmdReadVolReq *pcmd, uint8_t *pdata);
>
> +void repagent_client_connected(void);
>
> +
>
> +
>
> +#endif /* REPAGENT_H */
>
> diff --git a/replication/repagent_client.c b/replication/repagent_client.c
>
> new file mode 100644
>
> index 0000000..4dd9ea4
>
> --- /dev/null
>
> +++ b/replication/repagent_client.c
>
> @@ -0,0 +1,138 @@
>
> +#include "repcmd.h"
>
> +#include "rephub_cmds.h"
>
> +#include "repcmd_listener.h"
>
> +#include "repagent_client.h"
>
> +#include "repagent.h"
>
> +
>
> +#include <string.h>
>
> +#include <stdlib.h>
>
> +#include <errno.h>
>
> +#include <stdio.h>
>
> +#include <resolv.h>
>
> +#include <sys/socket.h>
>
> +#include <arpa/inet.h>
>
> +#include <netinet/in.h>
>
> +#include <unistd.h>
>
> +
>
> +#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))
>
> +
>
> +static void repagent_process_cmd(RepCmd *pCmd, uint8_t *pData, void *clientPtr);
>
> +
>
> +typedef struct repagent_client_state {
>
> +    int is_connected;
>
> +    int is_terminate_receive;
>
> +    int hsock;
>
> +} repagent_client_state;
>
> +
>
> +static repagent_client_state g_client_state = { 0 };
>
> +
>
> +void *repagent_listen(void *pParam)
>
> +{
>
> +    rephub_params *pServerParams = (rephub_params *) pParam;
>
> +    int host_port = pServerParams->port;
>
> +    const char *host_name = pServerParams->name;
>
> +
>
> +    printf("Creating repagent listener thread...\n");
>
> +    g_free(pServerParams);
>
> +
>
> +    struct sockaddr_in my_addr;
>
> +
>
> +    int err;
>
> +    int retries = 0;
>
> +
>
> +    g_client_state.hsock = socket(AF_INET, SOCK_STREAM, 0);
>
> +    if (g_client_state.hsock == -1) {
>
> +        printf("Error initializing socket %d\n", errno);
>
> +        return (void *) -1;
>
> +    }
>
> +
>
> +    int param = 1;
>
> +
>
> +    if ((setsockopt(g_client_state.hsock, SOL_SOCKET, SO_REUSEADDR,
>
> +            (char *)&param, sizeof(int)) == -1)
>
> +            || (setsockopt(g_client_state.hsock, SOL_SOCKET, SO_KEEPALIVE,
>
> +                    (char *)&param, sizeof(int)) == -1)) {
>
> +        printf("Error setting options %d\n", errno);
>
> +        return (void *) -1;
>
> +    }
>
> +
>
> +    my_addr.sin_family = AF_INET;
>
> +    my_addr.sin_port = htons(host_port);
>
> +    memset(&(my_addr.sin_zero), 0, 8);
>
> +
>
> +    my_addr.sin_addr.s_addr = inet_addr(host_name);
>
> +
>
> +    /* Reconnect loop */
>
> +    while (!g_client_state.is_terminate_receive) {
>
> +
>
> +        if (connect(g_client_state.hsock, (struct sockaddr *)&my_addr,
>
> +                sizeof(my_addr)) == -1) {
>
> +            err = errno;
>
> +            if (err != EINPROGRESS) {
>
> +                retries++;
>
> +                fprintf(
>
> +                        stderr,
>
> +                        "Error connecting socket %d. Host %s, port %u. Retry count %d\n",
>
> +                        errno, host_name, host_port, retries);
>
> +                usleep(5 * 1000 * 1000);
>
> +                continue;
>
> +            }
>
> +        }
>
> +        retries = 0;
>
> +
>
> +        g_client_state.is_connected = 1;
>
> +
>
> +        repagent_client_connected();
>
> +        repcmd_listener(g_client_state.hsock, repagent_process_cmd, NULL);
>
> +        close(g_client_state.hsock);
>
> +
>
> +        g_client_state.is_connected = 0;
>
> +    }
>
> +    return 0;
>
> +}
>
> +
>
> +void repagent_process_cmd(RepCmd *pcmd, uint8_t *pdata, void *clientPtr)
>
> +{
>
> +    int is_free_data = 1;
>
> +    printf("Repagent got cmd %d\n", pcmd->hdr.cmdid);
>
> +    switch (pcmd->hdr.cmdid) {
>
> +    case REPHUB_CMD_START_PROTECT: {
>
> +        is_free_data = repaget_start_protect((RepCmdStartProtect *) pcmd,
>
> +                (RepCmdDataStartProtect *) pdata);
>
> +    }
>
> +        break;
>
> +    case REPHUB_CMD_READ_VOL_REQ: {
>
> +        is_free_data = repaget_read_vol((RepCmdReadVolReq *) pcmd, pdata);
>
> +    }
>
> +        break;
>
> +    default:
>
> +        assert(0);
>
> +        break;
>
> +
>
> +    }
>
> +
>
> +    if (is_free_data) {
>
> +        g_free(pdata);
>
> +    }
>
> +}
>
> +
>
> +int repagent_client_send(RepCmd *p_cmd)
>
> +{
>
> +    int bytecount = 0;
>
> +    printf("Send cmd %u, data size %u\n", p_cmd->hdr.cmdid,
>
> +            p_cmd->hdr.data_size_bytes);
>
> +    if (!g_client_state.is_connected) {
>
> +        printf("Not connected to hub\n");
>
> +        return -1;
>
> +    }
>
> +
>
> +    bytecount = send(g_client_state.hsock, p_cmd,
>
> +            sizeof(RepCmd) + p_cmd->hdr.data_size_bytes, 0);
>
> +    if (bytecount < sizeof(RepCmd) + p_cmd->hdr.data_size_bytes) {
>
> +        printf("Bad send %d, errno %d\n", bytecount, errno);
>
> +        return bytecount;
>
> +    }
>
> +
>
> +    /* Success */
>
> +    return 0;
>
> +}
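
A note on repagent_client_send() above: send() on a stream socket may write fewer bytes than requested, and the check compares an int against a size_t, so a return value of -1 is converted to a large unsigned value and the "Bad send" branch is not taken. A minimal sketch of a send loop that handles both cases follows. The name send_all is hypothetical and not part of the patch.

```c
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/socket.h>

/* Hypothetical helper (not in the patch): keep calling send() until
 * len bytes have been written.
 * Returns 0 on success, -1 on error (errno is set by send()). */
static int send_all(int fd, const void *buf, size_t len)
{
    const char *p = buf;

    while (len > 0) {
        ssize_t n = send(fd, p, len, 0);
        if (n < 0) {
            if (errno == EINTR) {
                continue;   /* interrupted before any data was sent; retry */
            }
            return -1;
        }
        p += n;             /* skip past the bytes already written */
        len -= (size_t)n;
    }
    return 0;
}
```

Keeping the count in a signed ssize_t and testing n < 0 before any comparison with a size avoids the signed/unsigned conversion.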
>
> diff --git a/replication/repagent_client.h b/replication/repagent_client.h
>
> new file mode 100644
>
> index 0000000..62a5377
>
> --- /dev/null
>
> +++ b/replication/repagent_client.h
>
> @@ -0,0 +1,36 @@
>
> +/*
>
> + * QEMU System Emulator
>
> + *
>
> + * Copyright (c) 2003-2008 Fabrice Bellard
>
> + *
>
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>
> + * of this software and associated documentation files (the "Software"), to deal
>
> + * in the Software without restriction, including without limitation the rights
>
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>
> + * copies of the Software, and to permit persons to whom the Software is
>
> + * furnished to do so, subject to the following conditions:
>
> + *
>
> + * The above copyright notice and this permission notice shall be included in
>
> + * all copies or substantial portions of the Software.
>
> + *
>
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>
> + * THE SOFTWARE.
>
> + */
>
> +#ifndef REPAGENT_CLIENT_H
>
> +#define REPAGENT_CLIENT_H
>
> +#include "repcmd.h"
>
> +
>
> +typedef struct rephub_params {
>
> +    char *name;
>
> +    int port;
>
> +} rephub_params;
>
> +
>
> +void *repagent_listen(void *pParam);
>
> +int repagent_client_send(RepCmd *p_cmd);
>
> +
>
> +#endif /* REPAGENT_CLIENT_H */
>
> diff --git a/replication/repcmd.h b/replication/repcmd.h
>
> new file mode 100644
>
> index 0000000..8c6cf1b
>
> --- /dev/null
>
> +++ b/replication/repcmd.h
>
> @@ -0,0 +1,59 @@
>
> +/*
>
> + * QEMU System Emulator
>
> + *
>
> + * Copyright (c) 2003-2008 Fabrice Bellard
>
> + *
>
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>
> + * of this software and associated documentation files (the "Software"), to deal
>
> + * in the Software without restriction, including without limitation the rights
>
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>
> + * copies of the Software, and to permit persons to whom the Software is
>
> + * furnished to do so, subject to the following conditions:
>
> + *
>
> + * The above copyright notice and this permission notice shall be included in
>
> + * all copies or substantial portions of the Software.
>
> + *
>
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>
> + * THE SOFTWARE.
>
> + */
>
> +#ifndef REPCMD_H
>
> +#define REPCMD_H
>
> +
>
> +#include <stdint.h>
>
> +
>
> +#define REPCMD_MAGIC1 (0x1122)
>
> +#define REPCMD_MAGIC2 (0x3344)
>
> +#define REPCMD_NUM_U32_PARAMS (11)
>
> +
>
> +enum RepCmds {
>
> +    REPCMD_FIRST_INVALID                    = 0,
>
> +    REPCMD_FIRST_HUBCMD                     = 1,
>
> +    REPHUB_CMD_PROTECTED_WRITE              = 2,
>
> +    REPHUB_CMD_REPORT_VM_VOLUMES            = 3,
>
> +    REPHUB_CMD_START_PROTECT                = 4,
>
> +    REPHUB_CMD_READ_VOL_REQ                 = 5,
>
> +    REPHUB_CMD_READ_VOL_RES                 = 6,
>
> +    REPHUB_CMD_AGENT_SHUTDOWN               = 7,
>
> +};
>
> +
>
> +typedef struct RepCmdHdr {
>
> +    uint16_t magic1;
>
> +    uint16_t cmdid;
>
> +    uint32_t data_size_bytes;
>
> +} RepCmdHdr;
>
> +
>
> +typedef struct RepCmd {
>
> +    RepCmdHdr hdr;
>
> +    unsigned int parameters[REPCMD_NUM_U32_PARAMS];
>
> +    unsigned int magic2;
>
> +    uint8_t data[0];
>
> +} RepCmd;
>
> +
>
> +RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata);
>
> +
>
> +#endif /* REPCMD_H */
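
As far as the patch shows, RepCmd is sent as raw struct memory, so the wire format depends on the host byte order and on the width of unsigned int. A minimal sketch of an explicit encoding for the 8-byte RepCmdHdr follows. Big-endian (network) byte order is an assumption here, since the patch does not define one, and the names repcmd_hdr_encode, repcmd_hdr_decode and REPCMD_HDR_WIRE_SIZE are hypothetical.

```c
#include <stdint.h>
#include <string.h>
#include <arpa/inet.h>

#define REPCMD_MAGIC1 (0x1122)
#define REPCMD_HDR_WIRE_SIZE 8   /* 2 + 2 + 4 bytes, no padding (assumed) */

typedef struct RepCmdHdr {
    uint16_t magic1;
    uint16_t cmdid;
    uint32_t data_size_bytes;
} RepCmdHdr;

/* Hypothetical: serialize the header field by field in network order. */
static void repcmd_hdr_encode(const RepCmdHdr *h,
                              uint8_t out[REPCMD_HDR_WIRE_SIZE])
{
    uint16_t magic = htons(h->magic1);
    uint16_t cmdid = htons(h->cmdid);
    uint32_t size = htonl(h->data_size_bytes);

    memcpy(out, &magic, sizeof(magic));
    memcpy(out + 2, &cmdid, sizeof(cmdid));
    memcpy(out + 4, &size, sizeof(size));
}

/* Hypothetical: parse a header; returns 0 if the magic matches, else -1. */
static int repcmd_hdr_decode(const uint8_t in[REPCMD_HDR_WIRE_SIZE],
                             RepCmdHdr *h)
{
    uint16_t magic, cmdid;
    uint32_t size;

    memcpy(&magic, in, sizeof(magic));
    memcpy(&cmdid, in + 2, sizeof(cmdid));
    memcpy(&size, in + 4, sizeof(size));

    h->magic1 = ntohs(magic);
    h->cmdid = ntohs(cmdid);
    h->data_size_bytes = ntohl(size);
    return h->magic1 == REPCMD_MAGIC1 ? 0 : -1;
}
```

Encoding field by field keeps the agent and the hub compatible even when they are built for different architectures.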
>
> diff --git a/replication/repcmd_listener.c b/replication/repcmd_listener.c
>
> new file mode 100644
>
> index 0000000..a211927
>
> --- /dev/null
>
> +++ b/replication/repcmd_listener.c
>
> @@ -0,0 +1,137 @@
>
> +#include <fcntl.h>
>
> +#include <string.h>
>
> +#include <stdlib.h>
>
> +#include <errno.h>
>
> +#include <stdio.h>
>
> +#include <netinet/in.h>
>
> +#include <resolv.h>
>
> +#include <sys/socket.h>
>
> +#include <arpa/inet.h>
>
> +#include <unistd.h>
>
> +#include <pthread.h>
>
> +#include <assert.h>
>
> +
>
> +/* Use the CONFIG_REPLICATION flag to determine whether
>
> + * we're under a qemu build or a hub build. When under
>
> + * qemu, use g_malloc */
>
> +#ifdef CONFIG_REPLICATION
>
> +#include <glib.h>
>
> +#define REPCMD_MALLOC g_malloc
>
> +#else
>
> +#define REPCMD_MALLOC malloc
>
> +#endif
>
> +
>
> +#include "repcmd.h"
>
> +#include "repcmd_listener.h"
>
> +
>
> +#define ZERO_MEM_OBJ(pObj) memset((void *)pObj, 0, sizeof(*pObj))
>
> +
>
> +typedef struct RepCmdListenerState {
>
> +    int is_terminate_receive;
>
> +} RepCmdListenerState;
>
> +
>
> +static RepCmdListenerState g_listenerState = { 0 };
>
> +
>
> +/* Returns 0 for initiated termination or socket error value on error */
>
> +int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr)
>
> +{
>
> +    RepCmd curCmd;
>
> +    uint8_t *pReadBuf = (uint8_t *)&curCmd;
>
> +    int bytesToGet = sizeof(RepCmd);
>
> +    int bytesGotten = 0;
>
> +    int isGotHeader = 0;
>
> +    uint8_t *pdata = NULL;
>
> +
>
> +    assert(callback != NULL);
>
> +
>
> +    /* receive loop */
>
> +    while (!g_listenerState.is_terminate_receive) {
>
> +        int bytecount;
>
> +
>
> +        bytecount = recv(hsock, pReadBuf + bytesGotten,
>
> +                bytesToGet - bytesGotten, 0);
>
> +        if (bytecount == -1) {
>
> +            fprintf(stderr, "Error receiving data %d\n", errno);
>
> +            return errno;
>
> +        }
>
> +
>
> +        if (bytecount == 0) {
>
> +            printf("Disconnected\n");
>
> +            return 0;
>
> +        }
>
> +        bytesGotten += bytecount;
>
> +/*     printf("Received bytes %d, got %d/%d\n",
>
> +                bytecount, bytesGotten, bytesToGet); */
>
> +        /* print content */
>
> +        if (0) {
>
> +            int i;
>
> +            for (i = 0; i < bytecount ; i += 4) {
>
> +                /*printf("%d/%d", i, bytecount/4); */
>
> +                printf("%#x ",
>
> +                        *(int *) (&pReadBuf[bytesGotten - bytecount + i]));
>
> +
>
> +            }
>
> +            printf("\n");
>
> +        }
>
> +        assert(bytesGotten <= bytesToGet);
>
> +        if (bytesGotten == bytesToGet) {
>
> +            int isGotData = 0;
>
> +            bytesGotten = 0;
>
> +            if (!isGotHeader) {
>
> +                /* We just got the header */
>
> +                isGotHeader = 1;
>
> +
>
> +                assert(curCmd.hdr.magic1 == REPCMD_MAGIC1);
>
> +                assert(curCmd.magic2 == REPCMD_MAGIC2);
>
> +                if (curCmd.hdr.data_size_bytes > 0) {
>
> +                    pdata = (uint8_t *)REPCMD_MALLOC(
>
> +                                curCmd.hdr.data_size_bytes);
>
> +/*                    printf("malloc %p\n", pdata); */
>
> +                    pReadBuf = pdata;
>
> +                } else {
>
> +                    /* no data */
>
> +                    isGotData = 1;
>
> +                    pdata = NULL;
>
> +                }
>
> +                bytesToGet = curCmd.hdr.data_size_bytes;
>
> +            } else {
>
> +                isGotData = 1;
>
> +            }
>
> +
>
> +            if (isGotData) {
>
> +                /* Got command and data */
>
> +                (*callback)(&curCmd, pdata, clientPtr);
>
> +
>
> +                /* It's the callee's responsibility to free pdata */
>
> +                pdata = NULL;
>
> +                ZERO_MEM_OBJ(&curCmd);
>
> +                pReadBuf = (uint8_t *)&curCmd;
>
> +                bytesGotten = 0;
>
> +                bytesToGet = sizeof(RepCmd);
>
> +                isGotHeader = 0;
>
> +            }
>
> +        }
>
> +    }
>
> +    return 0;
>
> +}
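
The receive loop above accumulates a fixed-size RepCmd and then data_size_bytes of payload, tracking partial reads by hand. One way to factor that out is a helper that reads exactly N bytes, called once for the header and once for the payload. The sketch below is a suggestion only, and the name recv_exact is hypothetical. Separately, the magic values arrive from the network, so it may be safer to treat a mismatch as a protocol error and drop the connection than to assert() on it, since assert() aborts the process and is compiled out under NDEBUG.

```c
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/socket.h>

/* Hypothetical helper (not in the patch): read exactly len bytes from a
 * stream socket.
 * Returns 1 when len bytes were read, 0 if the peer closed the
 * connection first, and -1 on error (errno is set by recv()). */
static int recv_exact(int fd, void *buf, size_t len)
{
    char *p = buf;

    while (len > 0) {
        ssize_t n = recv(fd, p, len, 0);
        if (n == 0) {
            return 0;       /* orderly shutdown by the peer */
        }
        if (n < 0) {
            if (errno == EINTR) {
                continue;   /* interrupted before any data arrived; retry */
            }
            return -1;
        }
        p += n;             /* advance past the bytes received so far */
        len -= (size_t)n;
    }
    return 1;
}
```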
>
> +
>
> +RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata)
>
> +{
>
> +    RepCmd *p_cmd = (RepCmd *)REPCMD_MALLOC(sizeof(RepCmd) + data_size);
>
> +    assert(p_cmd != NULL);
>
> +
>
> +    /* Zero the CMD (not the data) */
>
> +    ZERO_MEM_OBJ(p_cmd);
>
> +
>
> +    p_cmd->hdr.cmdid = cmd_id;
>
> +    p_cmd->hdr.magic1 = REPCMD_MAGIC1;
>
> +    p_cmd->magic2 = REPCMD_MAGIC2;
>
> +    p_cmd->hdr.data_size_bytes = data_size;
>
> +
>
> +    if (p_out_pdata != NULL) {
>
> +        *p_out_pdata = p_cmd->data;
>
> +    }
>
> +
>
> +    return p_cmd;
>
> +}
>
> +
>
> diff --git a/replication/repcmd_listener.h b/replication/repcmd_listener.h
>
> new file mode 100644
>
> index 0000000..c09a12e
>
> --- /dev/null
>
> +++ b/replication/repcmd_listener.h
>
> @@ -0,0 +1,32 @@
>
> +/*
>
> + * QEMU System Emulator
>
> + *
>
> + * Copyright (c) 2003-2008 Fabrice Bellard
>
> + *
>
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>
> + * of this software and associated documentation files (the "Software"), to deal
>
> + * in the Software without restriction, including without limitation the rights
>
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>
> + * copies of the Software, and to permit persons to whom the Software is
>
> + * furnished to do so, subject to the following conditions:
>
> + *
>
> + * The above copyright notice and this permission notice shall be included in
>
> + * all copies or substantial portions of the Software.
>
> + *
>
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>
> + * THE SOFTWARE.
>
> + */
>
> +#ifndef REPCMD_LISTENER_H
>
> +#define REPCMD_LISTENER_H
>
> +#include <stdint.h>
>
> +typedef void (*pfn_received_cmd_cb)(RepCmd *pCmd,
>
> +                uint8_t *pData, void *clientPtr);
>
> +
>
> +int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr);
>
> +
>
> +#endif /* REPCMD_LISTENER_H */
>
> diff --git a/replication/rephub_cmds.h b/replication/rephub_cmds.h
>
> new file mode 100644
>
> index 0000000..820c37d
>
> --- /dev/null
>
> +++ b/replication/rephub_cmds.h
>
> @@ -0,0 +1,150 @@
>
> +/*
>
> + * QEMU System Emulator
>
> + *
>
> + * Copyright (c) 2003-2008 Fabrice Bellard
>
> + *
>
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>
> + * of this software and associated documentation files (the "Software"), to deal
>
> + * in the Software without restriction, including without limitation the rights
>
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>
> + * copies of the Software, and to permit persons to whom the Software is
>
> + * furnished to do so, subject to the following conditions:
>
> + *
>
> + * The above copyright notice and this permission notice shall be included in
>
> + * all copies or substantial portions of the Software.
>
> + *
>
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>
> + * THE SOFTWARE.
>
> + */
>
> +#ifndef REPHUB_CMDS_H
>
> +#define REPHUB_CMDS_H
>
> +
>
> +#include <stdint.h>
>
> +#include "repcmd.h"
>
> +#include "rephub_defs.h"
>
> +
>
> +/*********************************************************
>
> + * RepCmd Report a protected IO
>
> + *
>
> + * REPHUB_CMD_PROTECTED_WRITE
>
> + * Direction: agent->hub
>
> + *
>
> + * Any write to a protected volume is sent with this
>
> + * message to the hub, with its status.
>
> + * When the status is bad, no data is sent
>
> + *********************************************************/
>
> +typedef struct RepCmdProtectedWrite {
>
> +    RepCmdHdr hdr;
>
> +    uint64_t volume_id;
>
> +    uint64_t offset_sectors;
>
> +    /* The size field duplicates the RepCmd size,
>
> +     * but it is needed for reporting failed IOs' sizes */
>
> +    uint32_t size_sectors;
>
> +    int ret_status;
>
> +} RepCmdProtectedWrite;
>
> +
>
> +/*********************************************************
>
> + * RepCmd Report VM volumes
>
> + *
>
> + * REPHUB_CMD_REPORT_VM_VOLUMES
>
> + * Direction: agent->hub
>
> + *
>
> + * The agent reports all the volumes of the VM
>
> + * to the hub.
>
> + *********************************************************/
>
> +typedef struct RepVmVolumeInfo {
>
> +    char name[REPHUB_MAX_VOL_NAME_LEN];
>
> +    uint64_t volume_id;
>
> +    uint32_t size_mb;
>
> +} RepVmVolumeInfo;
>
> +
>
> +typedef struct RepCmdReportVmVolumes {
>
> +    RepCmdHdr hdr;
>
> +    int num_volumes;
>
> +} RepCmdReportVmVolumes;
>
> +
>
> +typedef struct RepCmdDataReportVmVolumes {
>
> +    RepVmVolumeInfo volumes[0];
>
> +} RepCmdDataReportVmVolumes;
>
> +
>
> +
>
> +/*********************************************************
>
> + * RepCmd Start protect
>
> + *
>
> + * REPHUB_CMD_START_PROTECT
>
> + * Direction: hub->agent
>
> + *
>
> + * The hub instructs the agent to start protecting
>
> + * a volume. When a volume is protected all its writes
>
> + * are sent to the hub.
>
> + * With this command the hub also assigns a volume ID to
>
> + * the given volume name.
>
> + *********************************************************/
>
> +typedef struct RepCmdStartProtect {
>
> +    RepCmdHdr hdr;
>
> +    uint64_t volume_id;
>
> +} RepCmdStartProtect;
>
> +
>
> +typedef struct RepCmdDataStartProtect {
>
> +    char volume_name[REPHUB_MAX_VOL_NAME_LEN];
>
> +} RepCmdDataStartProtect;
>
> +
>
> +
>
> +/*********************************************************
>
> + * RepCmd Read Volume Request
>
> + *
>
> + * REPHUB_CMD_READ_VOL_REQ
>
> + * Direction: hub->agent
>
> + *
>
> + * The hub issues a read IO to a protected volume.
>
> + * This command is used during sync - when the hub needs
>
> + * to read unsynchronized sections of a protected volume.
>
> + * This command is a request, the read data is returned
>
> + * by the response command REPHUB_CMD_READ_VOL_RES
>
> + *********************************************************/
>
> +typedef struct RepCmdReadVolReq {
>
> +    RepCmdHdr hdr;
>
> +    int req_id;
>
> +    int size_sectors;
>
> +    uint64_t volume_id;
>
> +    uint64_t offset_sectors;
>
> +} RepCmdReadVolReq;
>
> +
>
> +/*********************************************************
>
> + * RepCmd Read Volume Response
>
> + *
>
> + * REPHUB_CMD_READ_VOL_RES
>
> + * Direction: agent->hub
>
> + *
>
> + * A response to REPHUB_CMD_READ_VOL_REQ.
>
> + * Sends the data read from a protected volume
>
> + *********************************************************/
>
> +typedef struct RepCmdReadVolRes {
>
> +    RepCmdHdr hdr;
>
> +    int req_id;
>
> +    int is_status_success;
>
> +    uint64_t volume_id;
>
> +} RepCmdReadVolRes;
>
> +
>
> +/*********************************************************
>
> + * RepCmd Agent shutdown
>
> + *
>
> + * REPHUB_CMD_AGENT_SHUTDOWN
>
> + * Direction: agent->hub
>
> + *
>
> + * Notifies the hub that the agent is about to shutdown.
>
> + * This allows a graceful shutdown. Any disconnection
>
> + * of an agent without sending this command will result
>
> + * in a full sync of the VM volumes.
>
> + *********************************************************/
>
> +typedef struct RepCmdAgentShutdown {
>
> +    RepCmdHdr hdr;
>
> +} RepCmdAgentShutdown;
>
> +
>
> +
>
> +#endif /* REPHUB_CMDS_H */
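
The command structs above appear to be used as overlays on the generic RepCmd: the patch casts RepCmd pointers to and from them, and the listener checks magic2 at its RepCmd offset. If that reading is right, each command struct has to fit in front of magic2. A minimal sketch of a compile-time check follows. The macro REPCMD_CHECK_FITS is hypothetical, only one command struct is repeated here, and the zero-length data member is left out to keep the sketch standard C11.

```c
#include <stddef.h>
#include <stdint.h>

#define REPCMD_NUM_U32_PARAMS (11)

typedef struct RepCmdHdr {
    uint16_t magic1;
    uint16_t cmdid;
    uint32_t data_size_bytes;
} RepCmdHdr;

/* Generic command as in repcmd.h, without the trailing data[0] member. */
typedef struct RepCmd {
    RepCmdHdr hdr;
    unsigned int parameters[REPCMD_NUM_U32_PARAMS];
    unsigned int magic2;
} RepCmd;

typedef struct RepCmdProtectedWrite {
    RepCmdHdr hdr;
    uint64_t volume_id;
    uint64_t offset_sectors;
    uint32_t size_sectors;
    int ret_status;
} RepCmdProtectedWrite;

/* Hypothetical macro: fail the build if a command struct would reach
 * into (or past) the magic2 field of the generic RepCmd. */
#define REPCMD_CHECK_FITS(type) \
    _Static_assert(sizeof(type) <= offsetof(RepCmd, magic2), \
                   #type " overlaps RepCmd.magic2")

REPCMD_CHECK_FITS(RepCmdProtectedWrite);
```

A check like this would have to be repeated for every command struct, and it does not cover padding differences between compilers.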
>
> diff --git a/replication/rephub_defs.h b/replication/rephub_defs.h
>
> new file mode 100644
>
> index 0000000..e34e0ce
>
> --- /dev/null
>
> +++ b/replication/rephub_defs.h
>
> @@ -0,0 +1,40 @@
>
> +/*
>
> + * QEMU System Emulator
>
> + *
>
> + * Copyright (c) 2003-2008 Fabrice Bellard
>
> + *
>
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>
> + * of this software and associated documentation files (the "Software"), to deal
>
> + * in the Software without restriction, including without limitation the rights
>
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>
> + * copies of the Software, and to permit persons to whom the Software is
>
> + * furnished to do so, subject to the following conditions:
>
> + *
>
> + * The above copyright notice and this permission notice shall be included in
>
> + * all copies or substantial portions of the Software.
>
> + *
>
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>
> + * THE SOFTWARE.
>
> + */
>
> +#ifndef REP_HUB_DEFS_H
>
> +#define REP_HUB_DEFS_H
>
> +
>
> +#include <stdint.h>
>
> +
>
> +#define REPHUB_MAX_VOL_NAME_LEN (1024)
>
> +#define REPHUB_MAX_NUM_VOLUMES (512)
>
> +
>
> +#ifndef TRUE
>
> +    #define TRUE (1)
>
> +#endif
>
> +
>
> +#ifndef FALSE
>
> +    #define FALSE (0)
>
> +#endif
>
> +
>
> +#endif /* REP_HUB_DEFS_H */
>
> diff --git a/vl.c b/vl.c
>
> index 624da0f..506b5dc 100644
>
> --- a/vl.c
>
> +++ b/vl.c
>
> @@ -167,6 +167,7 @@ int main(int argc, char **argv)
>
>   #include "ui/qemu-spice.h"
>
> +#include "replication/repagent.h"
>
> //#define DEBUG_NET
>
> //#define DEBUG_SLIRP
>
> @@ -2307,6 +2308,15 @@ int main(int argc, char **argv, char **envp)
>
>                   drive_add(IF_DEFAULT, popt->index - QEMU_OPTION_hda, optarg,
>
>                             HD_OPTS);
>
>                   break;
>
> +            case QEMU_OPTION_repagent:
>
> +#ifdef CONFIG_REPLICATION
>
> +                repagent_init(optarg, 0);
>
> +#else
>
> +                fprintf(stderr, "Replication support is disabled. "
>
> +                    "Don't use -repagent option.\n");
>
> +                exit(1);
>
> +#endif
>
> +                break;
>
>               case QEMU_OPTION_drive:
>
>                   if (drive_def(optarg) == NULL) {
>
>                       exit(1);
>
