From: Anthony Liguori <anthony@codemonkey.ws>
To: Ori Mamluk <omamluk@zerto.com>
Cc: Kevin Wolf <kwolf@redhat.com>, dlaor@redhat.com, qemu-devel@nongnu.org
Subject: Re: [Qemu-devel] [RFC PATCH] replication agent module
Date: Tue, 07 Feb 2012 06:12:46 -0600
Message-ID: <4F31153E.9010205@codemonkey.ws>
In-Reply-To: <73865e0ce364c40e0eb65ec6b22b819d@mail.gmail.com>
Hi,
On 02/07/2012 04:29 AM, Ori Mamluk wrote:
> Repagent is a new module that allows an external replication system to
> replicate a volume of a Qemu VM.
>
> This RFC patch adds the repagent client module to Qemu.
Please read http://wiki.qemu.org/Contribute/SubmitAPatch
In particular, use a tool like git-send-email and split this patch up into more
manageable chunks.
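A rough sketch of that workflow follows. It assumes a throwaway repository, and the file names and commit subjects are made up for illustration rather than taken from the patch under review. The send step is left commented out because it needs a working SMTP configuration.

```shell
set -e

# Hypothetical scratch repository standing in for a QEMU checkout.
repo=$(mktemp -d)
cd "$repo"
git init -q
git config user.name "Example Author"
git config user.email "author@example.com"

# Base commit, standing in for upstream history.
echo "base" > README
git add README
git commit -q -m "base"

# One logical change per commit, so each becomes one reviewable patch.
# -s adds the Signed-off-by line.
echo "build glue" > configure-bits.txt
git add configure-bits.txt
git commit -q -s -m "configure: add --enable-replication"

echo "block hooks" > block-bits.txt
git add block-bits.txt
git commit -q -s -m "block: report writes to the replication agent"

# Turn the last two commits into a numbered series plus a cover letter.
git format-patch -q --cover-letter -o outgoing HEAD~2
patch_count=$(ls outgoing | wc -l | tr -d ' ')
ls outgoing

# Sending is not run here; with SMTP configured it would look like:
# git send-email --to=qemu-devel@nongnu.org outgoing/*.patch
```

The cover letter (0000-cover-letter.patch) is the natural home for the motivation and open questions, while each numbered patch carries one self-contained change.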
Is there an Open Source Rephub available? As a project policy, adding external
APIs specifically for proprietary software is not something we're willing to do.
Regards,
Anthony Liguori
> Documentation of the module role and API is in the patch at
> replication/qemu-repagent.txt
>
>
>
> The main motivation behind the module is to allow replication of VMs in a
> virtualization environment like RhevM.
>
> To achieve this we need basic replication support in Qemu.
>
>
>
> This is the first submission of this module, which was written as a Proof
> Of Concept, and used successfully for replicating and recovering a Qemu VM.
>
> Points and open issues:
>
> * The module interfaces with the Qemu storage stack at the generic
> block.c layer. Is this the right place to intercept/inject IOs?
>
> * The patch performs IO reads from a new thread (a TCP listener
> thread). See repaget_read_vol in repagent.c. The read is
> not protected by any lock – is this OK?
>
> * VM ID – the replication system implies an environment with
> several VMs connected to a central replication system (Rephub).
>
> This requires some sort of identification for a VM. The
> current patch does not include a VM ID – I did not find any adequate ID to
> use.
>
> Any suggestions?
>
>
>
> Appreciate any feedback or suggestions. Thanks,
>
> Ori.
>
>
>
>
>
> From 5a0d88689ddcf325f25fdfca2a2012f1bbf141b9 Mon Sep 17 00:00:00 2001
>
> From: Ori Mamluk<orim@orim-fedora.(none)>
>
> Date: Tue, 7 Feb 2012 11:12:12 +0200
>
> Subject: [PATCH] Added replication agent module (repagent) to Qemu under
>
> replication directory, added repagent configure and run
>
> options, and the repagent API usage in bloc
>
>
>
> Added build options to ./configure: --enable-replication --disable-replicat
>
> Added a commandline option to enable: -repagent<rep hub IP>
>
> Added the module files under replication.
>
>
>
> Signed-off-by: Ori Mamluk<orim@zerto.com>
>
> ---
>
> Makefile | 9 +-
>
> Makefile.objs | 6 +
>
> block.c | 20 +++-
>
> configure | 11 ++
>
> qemu-options.hx | 6 +
>
> replication/qemu-repagent.txt | 104 +++++++++++++
>
> replication/repagent.c | 322
> +++++++++++++++++++++++++++++++++++++++++
>
> replication/repagent.h | 46 ++++++
>
> replication/repagent_client.c | 138 ++++++++++++++++++
>
> replication/repagent_client.h | 36 +++++
>
> replication/repcmd.h | 59 ++++++++
>
> replication/repcmd_listener.c | 137 +++++++++++++++++
>
> replication/repcmd_listener.h | 32 ++++
>
> replication/rephub_cmds.h | 150 +++++++++++++++++++
>
> replication/rephub_defs.h | 40 +++++
>
> vl.c | 10 ++
>
> 16 files changed, 1121 insertions(+), 5 deletions(-)
>
> mode change 100644 => 100755 Makefile.objs
>
> mode change 100644 => 100755 qemu-options.hx
>
> create mode 100755 replication/qemu-repagent.txt
>
> create mode 100644 replication/repagent.c
>
> create mode 100644 replication/repagent.h
>
> create mode 100644 replication/repagent_client.c
>
> create mode 100644 replication/repagent_client.h
>
> create mode 100644 replication/repcmd.h
>
> create mode 100644 replication/repcmd_listener.c
>
> create mode 100644 replication/repcmd_listener.h
>
> create mode 100644 replication/rephub_cmds.h
>
> create mode 100644 replication/rephub_defs.h
>
>
>
> diff --git a/Makefile b/Makefile
>
> index 4f6eaa4..a1b3701 100644
>
> --- a/Makefile
>
> +++ b/Makefile
>
> @@ -149,9 +149,9 @@ qemu-img.o qemu-tool.o qemu-nbd.o qemu-io.o cmd.o
> qemu-ga.o: $(GENERATED_HEADERS
>
> tools-obj-y = qemu-tool.o $(oslib-obj-y) $(trace-obj-y) \
>
> qemu-timer-common.o cutils.o
>
> -qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y)
>
> -qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y)
>
> -qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y)
>
> +qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y)
> $(replication-obj-y)
>
> +qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y)
> $(replication-obj-y)
>
> +qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y)
> $(replication-obj-y)
>
> qemu-img-cmds.h: $(SRC_PATH)/qemu-img-cmds.hx
>
> $(call quiet-command,sh $(SRC_PATH)/scripts/hxtool -h< $< >
> $@," GEN $@")
>
> @@ -228,6 +228,7 @@ clean:
>
> rm -f trace-dtrace.dtrace trace-dtrace.dtrace-timestamp
>
> rm -f trace-dtrace.h trace-dtrace.h-timestamp
>
> rm -rf $(qapi-dir)
>
> + rm -f replication/*.{o,d}
>
> $(MAKE) -C tests clean
>
> for d in $(ALL_SUBDIRS) $(QEMULIBS) libcacard; do \
>
> if test -d $$d; then $(MAKE) -C $$d $@ || exit 1; fi; \
>
> @@ -387,4 +388,4 @@ tar:
>
> rm -rf /tmp/$(FILE)
>
> # Include automatically generated dependency files
>
> --include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d
> qapi/*.d qga/*.d)
>
> +-include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d
> qapi/*.d qga/*.d replication/*.d)
>
> diff --git a/Makefile.objs b/Makefile.objs
>
> old mode 100644
>
> new mode 100755
>
> index d7a6539..dbd6f15
>
> --- a/Makefile.objs
>
> +++ b/Makefile.objs
>
> @@ -74,6 +74,7 @@ fsdev-obj-$(CONFIG_VIRTFS) += $(addprefix fsdev/,
> $(fsdev-nested-y))
>
> # CPUs and machines.
>
> common-obj-y = $(block-obj-y) blockdev.o
>
> +common-obj-y += $(replication-obj-$(CONFIG_REPLICATION))
>
> common-obj-y += $(net-obj-y)
>
> common-obj-y += $(qobject-obj-y)
>
> common-obj-$(CONFIG_LINUX) += $(fsdev-obj-$(CONFIG_LINUX))
>
> @@ -413,6 +414,11 @@ common-obj-y += qmp-marshal.o qapi-visit.o
> qapi-types.o $(qapi-obj-y)
>
> common-obj-y += qmp.o hmp.o
>
> ######################################################################
>
> +# replication
>
> +replication-nested-y = repagent_client.o repagent.o repcmd_listener.o
>
> +replication-obj-y = $(addprefix replication/, $(replication-nested-y))
>
> +
>
> +######################################################################
>
> # guest agent
>
> qga-nested-y = guest-agent-commands.o guest-agent-command-state.o
>
> diff --git a/block.c b/block.c
>
> index 9bb236c..f3b8387 100644
>
> --- a/block.c
>
> +++ b/block.c
>
> @@ -31,6 +31,10 @@
>
> #include "qemu-coroutine.h"
>
> #include "qmp-commands.h"
>
> +#ifdef CONFIG_REPLICATION
>
> +#include "replication/repagent.h"
>
> +#endif
>
> +
>
> #ifdef CONFIG_BSD
>
> #include<sys/types.h>
>
> #include<sys/stat.h>
>
> @@ -640,6 +644,9 @@ int bdrv_open(BlockDriverState *bs, const char
> *filename, int flags,
>
> goto unlink_and_fail;
>
> }
>
> +#ifdef CONFIG_REPLICATION
>
> + repagent_register_drive(filename, bs);
>
> +#endif
>
> /* Open the image */
>
> ret = bdrv_open_common(bs, filename, flags, drv);
>
> if (ret< 0) {
>
> @@ -1292,6 +1299,17 @@ static int coroutine_fn
> bdrv_co_do_writev(BlockDriverState *bs,
>
> ret = drv->bdrv_co_writev(bs, sector_num, nb_sectors, qiov);
>
> +
>
> +#ifdef CONFIG_REPLICATION
>
> + if (bs->device_name[0] != '\0') {
>
> + /* We split the IO only at the highest stack driver layer.
>
> + Currently we know that by checking device_name - only
>
> + highest level (closest to the guest) has that name.
>
> + */
>
> + repagent_handle_protected_write(bs, sector_num,
>
> + nb_sectors, qiov, ret);
>
> + }
>
> +#endif
>
> if (bs->dirty_bitmap) {
>
> set_dirty_bitmap(bs, sector_num, nb_sectors, 1);
>
> }
>
> @@ -1783,7 +1801,7 @@ int bdrv_has_zero_init(BlockDriverState *bs)
>
> * 'nb_sectors' is the max value 'pnum' should be set to.
>
> */
>
> int bdrv_is_allocated(BlockDriverState *bs, int64_t sector_num, int
> nb_sectors,
>
> - int *pnum)
>
> + int *pnum)
>
> {
>
> int64_t n;
>
> if (!bs->drv->bdrv_is_allocated) {
>
> diff --git a/configure b/configure
>
> index 9e5da44..93d600e 100755
>
> --- a/configure
>
> +++ b/configure
>
> @@ -179,6 +179,7 @@ spice=""
>
> rbd=""
>
> smartcard=""
>
> smartcard_nss=""
>
> +replication=""
>
> usb_redir=""
>
> opengl=""
>
> zlib="yes"
>
> @@ -772,6 +773,10 @@ for opt do
>
> ;;
>
> --enable-smartcard-nss) smartcard_nss="yes"
>
> ;;
>
> + --disable-replication) replication="no"
>
> + ;;
>
> + --enable-replication) replication="yes"
>
> + ;;
>
> --disable-usb-redir) usb_redir="no"
>
> ;;
>
> --enable-usb-redir) usb_redir="yes"
>
> @@ -1067,6 +1072,7 @@ echo " --disable-usb-redir disable usb network
> redirection support"
>
> echo " --enable-usb-redir enable usb network redirection support"
>
> echo " --disable-guest-agent disable building of the QEMU Guest Agent"
>
> echo " --enable-guest-agent enable building of the QEMU Guest Agent"
>
> +echo " --enable-replication enable replication support"
>
> echo ""
>
> echo "NOTE: The object files are built at the place where configure is
> launched"
>
> exit 1
>
> @@ -2733,6 +2739,7 @@ echo "curl support $curl"
>
> echo "check support $check_utests"
>
> echo "mingw32 support $mingw32"
>
> echo "Audio drivers $audio_drv_list"
>
> +echo "Replication $replication"
>
> echo "Extra audio cards $audio_card_list"
>
> echo "Block whitelist $block_drv_whitelist"
>
> echo "Mixer emulation $mixemu"
>
> @@ -3080,6 +3087,10 @@ if test "$smartcard_nss" = "yes" ; then
>
> echo "CONFIG_SMARTCARD_NSS=y">> $config_host_mak
>
> fi
>
> +if test "$replication" = "yes" ; then
>
> + echo "CONFIG_REPLICATION=y">> $config_host_mak
>
> +fi
>
> +
>
> if test "$usb_redir" = "yes" ; then
>
> echo "CONFIG_USB_REDIR=y">> $config_host_mak
>
> fi
>
> diff --git a/qemu-options.hx b/qemu-options.hx
>
> old mode 100644
>
> new mode 100755
>
> index 681eaf1..c97e4f8
>
> --- a/qemu-options.hx
>
> +++ b/qemu-options.hx
>
> @@ -2602,3 +2602,9 @@ HXCOMM This is the last statement. Insert new options
> before this line!
>
> STEXI
>
> @end table
>
> ETEXI
>
> +
>
> +DEF("repagent", HAS_ARG, QEMU_OPTION_repagent,
>
> + "-repagent [hub IP/name]\n"
>
> + " Enable replication support for disks\n"
>
> + " hub is the ip or name of the machine running the
> replication hub.\n",
>
> + QEMU_ARCH_ALL)
>
> diff --git a/replication/qemu-repagent.txt b/replication/qemu-repagent.txt
>
> new file mode 100755
>
> index 0000000..e3b0c1e
>
> --- /dev/null
>
> +++ b/replication/qemu-repagent.txt
>
> @@ -0,0 +1,104 @@
>
> + repagent - replication agent - a Qemu module for enabling
> continuous async replication of VM volumes
>
> +
>
> +Introduction
>
> + This document describes a feature in Qemu - a replication
> agent (AKA Repagent).
>
> + The Repagent is a new module that exposes an API to an
> external replication system (AKA Rephub).
>
> + This API allows a Rephub to communicate with a Qemu VM and
> continuously replicate its volumes.
>
> + The implementation of a Rephub is outside of the scope of this
> document. There may be several different Rephub
>
> + implementations using the same repagent in Qemu.
>
> +
>
> +Main feature of Repagent
>
> + Repagent does the following:
>
> + * Report volumes - report a list of all volumes in a VM to
> the Rephub.
>
> + * Report writes to a volume - send all writes made to a
> protected volume to the Rephub.
>
> + The reporting of an IO is asynchronous - i.e.
> the IO is not delayed by the Repagent to get any acknowledgement from the
> Rephub.
>
> + It is only copied to the Rephub.
>
> + * Read a protected volume - allows the Rephub to read a
> protected volume, to enable the protected hub to synchronize the content of
> a protected volume.
>
> +
>
> +Description of the Repagent module
>
> +
>
> +Build and run options
>
> + New configure option: --enable-replication
>
> + New command line option:
>
> + -repagent [hub IP/name]
>
> +
> Enable replication support for disks
>
> +
> hub is the ip or name of the machine running the replication hub.
>
> +
>
> +Module APIs
>
> + The Repagent module interfaces two main components:
>
> + 1. The Rephub - An external API based on socket messages
>
> + 2. The generic block layer- block.c
>
> +
>
> + Rephub message API
>
> + The external replication API is a message
> based API.
>
> + We won't go into the structure of the
> messages here - just the semantics.
>
> +
>
> + Messages list
>
> + (The updated list and
> comments are in Rephub_cmds.h)
>
> +
>
> + Messages from the Repagent to
> the Rephub:
>
> + * Protected write
>
> + The Repagent
> sends each write to a protected volume to the hub with the IO status.
>
> + In case the
> status is bad the write content is not sent
>
> + * Report VM volumes
>
> + The agent
> reports all the volumes of the VM to the hub.
>
> + * Read Volume Response
>
> + A response to
> a Read Volume Request
>
> + Sends the
> data read from a protected volume to the hub
>
> + * Agent shutdown
>
> + Notifies the
> hub that the agent is about to shutdown.
>
> + This allows a
> graceful shutdown. Any disconnection of an agent without
>
> + sending this
> command will result in a full sync of the VM volumes.
>
> +
>
> + Messages from the Rephub to
> the Repagent:
>
> + * Start protect
>
> + The hub
> instructs the agent to start protecting a volume. When a volume is protected
>
> + all its
> writes are sent to the hub.
>
> + With this
> command the hub also assigns a volume ID to the given volume name.
>
> + * Read volume request
>
> + The hub
> issues a read IO to a protected volume.
>
> + This command
> is used during sync - when the hub needs to read unsynchronized
>
> + sections of a
> protected volume.
>
> + This command
> is a request, the read data is returned by the read volume response message
> (see above).
>
> + block.c API
>
> + The API to the generic block storage layer
> contains 3 functionalities:
>
> + 1. Handle writes to protected volumes
>
> + In bdrv_co_do_writev, each
> write is reported to the Repagent module.
>
> + 2. Handle each new volume that registers
>
> + In bdrv_open - each new
> bottom-level block driver that registers is reported.
>
> + 3. Read from a volume
>
> + Repagent calls bdrv_aio_readv
> to handle read requests coming from the hub.
>
> +
>
> +
>
> +General description of a Rephub - a replication system the repagent
> connects to
>
> + This section describes in high level a sample Rephub - a
> replication system that uses the repagent API
>
> + to replicate disks.
>
> + It describes a simple Rephub that continuously maintains a
> mirror of the volumes of a VM.
>
> +
>
> + Say we have a VM we want to protect - call it PVM, say it has
> 2 volumes - V1, V2.
>
> + Our Rephub is called SingleRephub - a Rephub protecting a
> single VM.
>
> +
>
> + Preparations
>
> + 1. The user chooses a host to run SingleRephub - a different
> host than PVM, call it Host2
>
> + 2. The user creates two volumes on Host2 - same sizes of V1
> and V2, call them V1R (V1 recovery) and V2R.
>
> + 3. The user runs SingleRephub process on Host2, and gives V1R
> and V2R as command line arguments.
>
> + From now on SingleRephub waits for the
> protected VM repagent to connect.
>
> + 4. The user runs the protected VM PVM - and uses the switch
> -repagent<Host2 IP>.
>
> +
>
> + Runtime
>
> + 1. The repagent module connects to SingleRephub on startup.
>
> + 2. repagent reports V1 and V2 to SingleRephub.
>
> + 3. SingleRephub starts to perform an initial synchronization
> of the protected volumes-
>
> + it reads each protected volume (V1 and V2) -
> using read volume requests - and copies the data into the
>
> + recovery volume V1R and V2R.
>
> + 4. SingleRephub enters 'protection' mode - each write to the
> protected volume is sent by the repagent to the Rephub,
>
> + and the Rephub performs the write on the
> matching recovery volume.
>
> +
>
> + * Note that during stage 3 writes to the protected volumes
> are not ignored - they're kept in a bitmap,
>
> + and will be read again when stage 3 ends, in
> an iterative converging process.
>
> +
>
> + This flow continuously maintains an updated recovery volume.
>
> + If the protected system is damaged, the user can create a new
> VM on Host2 with the replicated volumes attached to it.
>
> + The new VM is a replica of the protected system.
>
> +
>
> +
>
> diff --git a/replication/repagent.c b/replication/repagent.c
>
> new file mode 100644
>
> index 0000000..c66eae7
>
> --- /dev/null
>
> +++ b/replication/repagent.c
>
> @@ -0,0 +1,322 @@
>
> +#include<string.h>
>
> +#include<stdlib.h>
>
> +#include<stdio.h>
>
> +#include<pthread.h>
>
> +#include<stdint.h>
>
> +
>
> +#include "block.h"
>
> +#include "rephub_defs.h"
>
> +#include "block_int.h"
>
> +#include "repagent_client.h"
>
> +#include "repagent.h"
>
> +#include "rephub_cmds.h"
>
> +
>
> +#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))
>
> +#define REPAGENT_MAX_NUM_VOLUMES (64)
>
> +#define REPAGENT_VOLUME_ID_NONE (0)
>
> +
>
> +typedef struct RepagentVolume {
>
> + uint64_t vol_id;
>
> + const char *vol_path;
>
> + BlockDriverState *driver_ptr;
>
> +} RepagentVolume;
>
> +
>
> +struct RepAgentState {
>
> + int is_init;
>
> + int num_volumes;
>
> + RepagentVolume * volumes[REPAGENT_MAX_NUM_VOLUMES];
>
> +};
>
> +
>
> +typedef struct RepagentReadVolIo {
>
> + QEMUIOVector qiov;
>
> + RepCmdReadVolReq rep_cmd;
>
> + uint8_t *buf;
>
> + struct timeval start_time;
>
> +} RepagentReadVolIo;
>
> +
>
> +static int repagent_get_volume_by_name(const char *name);
>
> +static void repagent_report_volumes_to_hub(void);
>
> +static void repagent_vol_read_done(void *opaque, int ret);
>
> +static struct timeval tsub(struct timeval t1, struct timeval t2);
>
> +
>
> +RepAgentState g_rep_agent = { 0 };
>
> +
>
> +void repagent_init(const char *hubname, int port)
>
> +{
>
> + /* It is the responsibility of the thread to free this struct */
>
> + rephub_params *pParams = (rephub_params
> *)g_malloc(sizeof(rephub_params));
>
> + if (hubname == NULL) {
>
> + hubname = "127.0.0.1";
>
> + }
>
> + if (port == 0) {
>
> + port = 9010;
>
> + }
>
> +
>
> + printf("repagent_init %s\n", hubname);
>
> +
>
> + pParams->port = port;
>
> + pParams->name = g_strdup(hubname);
>
> +
>
> + pthread_t thread_id = 0;
>
> +
>
> + /* Create the repagent client listener thread */
>
> + pthread_create(&thread_id, 0, repagent_listen, (void *) pParams);
>
> + pthread_detach(thread_id);
>
> +}
>
> +
>
> +void repagent_register_drive(const char *drive_path,
>
> + BlockDriverState *driver_ptr)
>
> +{
>
> + int i;
>
> + for (i = 0; i< g_rep_agent.num_volumes ; i++) {
>
> + RepagentVolume *vol = g_rep_agent.volumes[i];
>
> + if (vol != NULL) {
>
> + assert(
>
> + strcmp(drive_path, vol->vol_path) != 0
>
> +&& driver_ptr != vol->driver_ptr);
>
> + }
>
> + }
>
> +
>
> + assert(g_rep_agent.num_volumes< REPAGENT_MAX_NUM_VOLUMES);
>
> +
>
> + printf("zerto repagent: Registering drive. Num drives %d, path %s\n",
>
> + g_rep_agent.num_volumes, drive_path);
>
> + g_rep_agent.volumes[i] =
>
> + (RepagentVolume *)g_malloc(sizeof(RepagentVolume));
>
> + g_rep_agent.volumes[i]->driver_ptr = driver_ptr;
>
> + /* orim todo strcpy? */
>
> + g_rep_agent.volumes[i]->vol_path = drive_path;
>
> +
>
> + /* Orim todo thread-safety? */
>
> + g_rep_agent.num_volumes++;
>
> +
>
> + repagent_report_volumes_to_hub();
>
> +}
>
> +
>
> +/* orim todo destruction? */
>
> +
>
> +static RepagentVolume *repagent_get_protected_volume_by_driver(
>
> + BlockDriverState *bs)
>
> +{
>
> + /* orim todo optimize search */
>
> + int i = 0;
>
> + for (i = 0; i< g_rep_agent.num_volumes ; i++) {
>
> + RepagentVolume *p_vol = g_rep_agent.volumes[i];
>
> + if (p_vol != NULL&& p_vol->driver_ptr == (void *) bs) {
>
> + return p_vol;
>
> + }
>
> + }
>
> + return NULL;
>
> +}
>
> +
>
> +void repagent_handle_protected_write(BlockDriverState *bs, int64_t
> sector_num,
>
> + int nb_sectors, QEMUIOVector *qiov, int ret_status)
>
> +{
>
> + printf("zerto Protected write offset %lld, size %d, IO return status
> %d",
>
> + (long long int) sector_num, nb_sectors, ret_status);
>
> + if (bs->filename != NULL) {
>
> + printf(", filename %s", bs->filename);
>
> + }
>
> +
>
> + printf("\n");
>
> +
>
> + RepagentVolume *p_vol = repagent_get_protected_volume_by_driver(bs);
>
> + if (p_vol == NULL || p_vol->vol_id == REPAGENT_VOLUME_ID_NONE) {
>
> + /* Unprotected */
>
> + printf("Got a write to an unprotected volume.\n");
>
> + return;
>
> + }
>
> +
>
> + /* Report IO to rephub */
>
> +
>
> + int data_size = qiov->size;
>
> + if (ret_status< 0) {
>
> + /* On failed ios we don't send the data to the hub */
>
> + data_size = 0;
>
> + }
>
> + uint8_t *pdata = NULL;
>
> + RepCmdProtectedWrite *p_cmd = (RepCmdProtectedWrite *) repcmd_new(
>
> + REPHUB_CMD_PROTECTED_WRITE, data_size, (uint8_t **)&pdata);
>
> +
>
> + if (ret_status>= 0) {
>
> + qemu_iovec_to_buffer(qiov, pdata);
>
> + }
>
> +
>
> + p_cmd->volume_id = p_vol->vol_id;
>
> + p_cmd->offset_sectors = sector_num;
>
> + p_cmd->size_sectors = nb_sectors;
>
> + p_cmd->ret_status = ret_status;
>
> +
>
> + if (repagent_client_send((RepCmd *) p_cmd) != 0) {
>
> + printf("Error sending command\n");
>
> + }
>
> +}
>
> +
>
> +static void repagent_report_volumes_to_hub(void)
>
> +{
>
> + /* Report IO to rephub */
>
> + int i;
>
> + RepCmdDataReportVmVolumes *p_cmd_data = NULL;
>
> + RepCmdReportVmVolumes *p_cmd = (RepCmdReportVmVolumes *) repcmd_new(
>
> + REPHUB_CMD_REPORT_VM_VOLUMES,
>
> + g_rep_agent.num_volumes * sizeof(RepVmVolumeInfo),
>
> + (uint8_t **)&p_cmd_data);
>
> + p_cmd->num_volumes = g_rep_agent.num_volumes;
>
> + printf("reporting %u volumes\n", g_rep_agent.num_volumes);
>
> + for (i = 0; i< g_rep_agent.num_volumes ; i++) {
>
> + assert(g_rep_agent.volumes[i] != NULL);
>
> + printf("reporting volume %s size %u\n",
>
> + g_rep_agent.volumes[i]->vol_path,
>
> + (uint32_t) sizeof(p_cmd_data->volumes[i].name));
>
> + strncpy((char *) p_cmd_data->volumes[i].name,
>
> + g_rep_agent.volumes[i]->vol_path,
>
> + sizeof(p_cmd_data->volumes[i].name));
>
> + p_cmd_data->volumes[i].volume_id = g_rep_agent.volumes[i]->vol_id;
>
> + }
>
> + if (repagent_client_send((RepCmd *) p_cmd) != 0) {
>
> + printf("Error sending command\n");
>
> + }
>
> +}
>
> +
>
> +int repaget_start_protect(RepCmdStartProtect *pcmd,
>
> + RepCmdDataStartProtect *pcmd_data)
>
> +{
>
> + printf("Start protect vol %s, ID %llu\n", pcmd_data->volume_name,
>
> + (unsigned long long) pcmd->volume_id);
>
> + int vol_index = repagent_get_volume_by_name(pcmd_data->volume_name);
>
> + if (vol_index< 0) {
>
> + printf("The volume doesn't exist\n");
>
> + return TRUE;
>
> + }
>
> + /* orim todo protect */
>
> + g_rep_agent.volumes[vol_index]->vol_id = pcmd->volume_id;
>
> +
>
> + return TRUE;
>
> +}
>
> +
>
> +static int repagent_get_volume_by_name(const char *name)
>
> +{
>
> + int i = 0;
>
> + for (i = 0; i< g_rep_agent.num_volumes ; i++) {
>
> + if (g_rep_agent.volumes[i] != NULL
>
> +&& strcmp(name, g_rep_agent.volumes[i]->vol_path) == 0) {
>
> + return i;
>
> + }
>
> + }
>
> + return -1;
>
> +}
>
> +
>
> +static int repagent_get_volume_by_id(uint64_t vol_id)
>
> +{
>
> + int i = 0;
>
> + for (i = 0; i< g_rep_agent.num_volumes ; i++) {
>
> + if (g_rep_agent.volumes[i] != NULL
>
> +&& g_rep_agent.volumes[i]->vol_id == vol_id) {
>
> + return i;
>
> + }
>
> + }
>
> + return -1;
>
> +}
>
> +
>
> +int repaget_read_vol(RepCmdReadVolReq *pcmd, uint8_t *pdata)
>
> +{
>
> + int index = repagent_get_volume_by_id(pcmd->volume_id);
>
> + int size_bytes = pcmd->size_sectors * 512;
>
> + if (index< 0) {
>
> + printf("Vol read - Could not find vol id %llu\n",
>
> + (unsigned long long int) pcmd->volume_id);
>
> + RepCmdReadVolRes *p_res_cmd = (RepCmdReadVolRes *) repcmd_new(
>
> + REPHUB_CMD_READ_VOL_RES, 0, NULL);
>
> + p_res_cmd->req_id = pcmd->req_id;
>
> + p_res_cmd->volume_id = pcmd->volume_id;
>
> + p_res_cmd->is_status_success = FALSE;
>
> + repagent_client_send((RepCmd *) p_res_cmd);
>
> + return TRUE;
>
> + }
>
> +
>
> + printf("Vol read - driver %p, volId %llu, offset %llu, size %u\n",
>
> + g_rep_agent.volumes[index]->driver_ptr,
>
> + (unsigned long long int) pcmd->volume_id,
>
> + (unsigned long long int) pcmd->offset_sectors,
> pcmd->size_sectors);
>
> +
>
> + {
>
> + RepagentReadVolIo *read_xact = calloc(1,
> sizeof(RepagentReadVolIo));
>
> +
>
> +/* BlockDriverAIOCB *acb; */
>
> +
>
> + ZERO_MEM_OBJ(read_xact);
>
> +
>
> + qemu_iovec_init(&read_xact->qiov, 1);
>
> +
>
> + /*read_xact->buf =
>
> + qemu_blockalign(g_rep_agent.volumes[index]->driver_ptr,
> size_bytes); */
>
> + read_xact->buf = (uint8_t *) g_malloc(size_bytes);
>
> + read_xact->rep_cmd = *pcmd;
>
> + qemu_iovec_add(&read_xact->qiov, read_xact->buf, size_bytes);
>
> +
>
> + gettimeofday(&read_xact->start_time, NULL);
>
> + /* orim TODO - use the returned acb to cancel the request on
> shutdown */
>
> + /*acb = */bdrv_aio_readv(g_rep_agent.volumes[index]->driver_ptr,
>
> + read_xact->rep_cmd.offset_sectors,&read_xact->qiov,
>
> + read_xact->rep_cmd.size_sectors, repagent_vol_read_done,
>
> + read_xact);
>
> + }
>
> +
>
> + return TRUE;
>
> +}
>
> +
>
> +static void repagent_vol_read_done(void *opaque, int ret)
>
> +{
>
> + struct timeval t2;
>
> + RepagentReadVolIo *read_xact = (RepagentReadVolIo *) opaque;
>
> + uint8_t *pdata = NULL;
>
> + RepCmdReadVolRes *pcmd = (RepCmdReadVolRes *) repcmd_new(
>
> + REPHUB_CMD_READ_VOL_RES, read_xact->rep_cmd.size_sectors * 512,
>
> +&pdata);
>
> + pcmd->req_id = read_xact->rep_cmd.req_id;
>
> + pcmd->volume_id = read_xact->rep_cmd.volume_id;
>
> + pcmd->is_status_success = FALSE;
>
> +
>
> + printf("Protected vol read - volId %llu, offset %llu, size %u\n",
>
> + (unsigned long long int) read_xact->rep_cmd.volume_id,
>
> + (unsigned long long int) read_xact->rep_cmd.offset_sectors,
>
> + read_xact->rep_cmd.size_sectors);
>
> + gettimeofday(&t2, NULL);
>
> +
>
> + if (ret>= 0) {
>
> + /* Read response - send the data to the hub */
>
> + t2 = tsub(t2, read_xact->start_time);
>
> + printf("Read prot vol done. Took %u seconds, %u us.",
>
> + (uint32_t) t2.tv_sec, (uint32_t) t2.tv_usec);
>
> +
>
> + pcmd->is_status_success = TRUE;
>
> + /* orim todo optimize - don't copy, use the qiov buffer */
>
> + qemu_iovec_to_buffer(&read_xact->qiov, pdata);
>
> + } else {
>
> + printf("readv failed: %s\n", strerror(-ret));
>
> + }
>
> +
>
> + repagent_client_send((RepCmd *) pcmd);
>
> +
>
> + /*qemu_vfree(read_xact->buf); */
>
> + g_free(read_xact->buf);
>
> +
>
> + g_free(read_xact);
>
> +}
>
> +
>
> +static struct timeval tsub(struct timeval t1, struct timeval t2)
>
> +{
>
> + t1.tv_usec -= t2.tv_usec;
>
> + if (t1.tv_usec< 0) {
>
> + t1.tv_usec += 1000000;
>
> + t1.tv_sec--;
>
> + }
>
> + t1.tv_sec -= t2.tv_sec;
>
> + return t1;
>
> +}
>
> +
>
> +void repagent_client_connected(void)
>
> +{
>
> + /* orim todo thread protection */
>
> + repagent_report_volumes_to_hub();
>
> +}
>
> diff --git a/replication/repagent.h b/replication/repagent.h
>
> new file mode 100644
>
> index 0000000..98ccbf2
>
> --- /dev/null
>
> +++ b/replication/repagent.h
>
> @@ -0,0 +1,46 @@
>
> +/*
>
> + * QEMU System Emulator
>
> + *
>
> + * Copyright (c) 2003-2008 Fabrice Bellard
>
> + *
>
> + * Permission is hereby granted, free of charge, to any person obtaining a
> copy
>
> + * of this software and associated documentation files (the "Software"),
> to deal
>
> + * in the Software without restriction, including without limitation the
> rights
>
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or
> sell
>
> + * copies of the Software, and to permit persons to whom the Software is
>
> + * furnished to do so, subject to the following conditions:
>
> + *
>
> + * The above copyright notice and this permission notice shall be included
> in
>
> + * all copies or substantial portions of the Software.
>
> + *
>
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
> OR
>
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
> OTHER
>
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
> FROM,
>
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
> IN
>
> + * THE SOFTWARE.
>
> + */
>
> +#ifndef REPAGENT_H
>
> +#define REPAGENT_H
>
> +#include<stdint.h>
>
> +
>
> +#include "qemu-common.h"
>
> +
>
> +typedef struct RepAgentState RepAgentState;
>
> +typedef struct RepCmdStartProtect RepCmdStartProtect;
>
> +typedef struct RepCmdDataStartProtect RepCmdDataStartProtect;
>
> +struct RepCmdReadVolReq;
>
> +
>
> +void repagent_init(const char *hubname, int port);
>
> +void repagent_handle_protected_write(BlockDriverState *bs,
>
> + int64_t sector_num, int nb_sectors, QEMUIOVector *qiov, int
> ret_status);
>
> +void repagent_register_drive(const char *drive_path,
>
> + BlockDriverState *driver_ptr);
>
> +int repaget_start_protect(RepCmdStartProtect *pcmd,
>
> + RepCmdDataStartProtect *pcmd_data);
>
> +int repaget_read_vol(struct RepCmdReadVolReq *pcmd, uint8_t *pdata);
>
> +void repagent_client_connected(void);
>
> +
>
> +
>
> +#endif /* REPAGENT_H */
>
> diff --git a/replication/repagent_client.c b/replication/repagent_client.c
>
> new file mode 100644
>
> index 0000000..4dd9ea4
>
> --- /dev/null
>
> +++ b/replication/repagent_client.c
>
> @@ -0,0 +1,138 @@
>
> +#include "repcmd.h"
>
> +#include "rephub_cmds.h"
>
> +#include "repcmd_listener.h"
>
> +#include "repagent_client.h"
>
> +#include "repagent.h"
>
> +
>
> +#include<string.h>
>
> +#include<stdlib.h>
>
> +#include<errno.h>
>
> +#include<stdio.h>
>
> +#include<resolv.h>
>
> +#include<sys/socket.h>
>
> +#include<arpa/inet.h>
>
> +#include<netinet/in.h>
>
> +#include<unistd.h>
>
> +
>
> +#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))
>
> +
>
> +static void repagent_process_cmd(RepCmd *pCmd, uint8_t *pData, void
> *clientPtr);
>
> +
>
> +typedef struct repagent_client_state {
>
> + int is_connected;
>
> + int is_terminate_receive;
>
> + int hsock;
>
> +} repagent_client_state;
>
> +
>
> +static repagent_client_state g_client_state = { 0 };
>
> +
>
> +void *repagent_listen(void *pParam)
>
> +{
>
> + rephub_params *pServerParams = (rephub_params *) pParam;
>
> + int host_port = pServerParams->port;
>
> + const char *host_name = pServerParams->name;
>
> +
>
> + printf("Creating repagent listener thread...\n");
>
> + g_free(pServerParams);
>
> +
>
> + struct sockaddr_in my_addr;
>
> +
>
> + int err;
>
> + int retries = 0;
>
> +
>
> + g_client_state.hsock = socket(AF_INET, SOCK_STREAM, 0);
>
> + if (g_client_state.hsock == -1) {
>
> + printf("Error initializing socket %d\n", errno);
>
> + return (void *) -1;
>
> + }
>
> +
>
> + int param = 1;
>
> +
>
> + if ((setsockopt(g_client_state.hsock, SOL_SOCKET, SO_REUSEADDR,
>
> + (char *)&param, sizeof(int)) == -1)
>
> + || (setsockopt(g_client_state.hsock, SOL_SOCKET, SO_KEEPALIVE,
>
> + (char *)&param, sizeof(int)) == -1)) {
>
> + printf("Error setting options %d\n", errno);
>
> + return (void *) -1;
>
> + }
>
> +
>
> + my_addr.sin_family = AF_INET;
>
> + my_addr.sin_port = htons(host_port);
>
> + memset(&(my_addr.sin_zero), 0, 8);
>
> +
>
> + my_addr.sin_addr.s_addr = inet_addr(host_name);
>
> +
>
> + /* Reconnect loop */
>
> + while (!g_client_state.is_terminate_receive) {
>
> +
>
> + if (connect(g_client_state.hsock, (struct sockaddr *)&my_addr,
>
> + sizeof(my_addr)) == -1) {
>
> + err = errno;
>
> + if (err != EINPROGRESS) {
>
> + retries++;
>
> + fprintf(
>
> + stderr,
>
> + "Error connecting socket %d. Host %s, port %u.
> Retry count %d\n",
>
> + errno, host_name, host_port, retries);
>
> + usleep(5 * 1000 * 1000);
>
> + continue;
>
> + }
>
> + }
>
> + retries = 0;
>
> +
>
> + g_client_state.is_connected = 1;
>
> +
>
> + repagent_client_connected();
>
> + repcmd_listener(g_client_state.hsock, repagent_process_cmd, NULL);
>
> + close(g_client_state.hsock);
>
> +
>
> + g_client_state.is_connected = 0;
>
> + }
>
> + return 0;
>
> +}
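A note on the reconnect loop above: the socket is created once before the loop, but close() runs at the end of every iteration, so the next connect() would be issued on a closed descriptor. One possible shape is to open a fresh socket per attempt. The following is only a minimal sketch under that assumption; connect_fresh is a hypothetical name that does not appear in the patch, and like the inet_addr() call above it accepts only a dotted-quad IPv4 string, not a host name.

```c
#include <assert.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper (not part of the patch): open a new TCP socket and
 * connect it to a dotted-quad IPv4 address.
 * Returns the connected descriptor, or -1 on any failure. */
static int connect_fresh(const char *ipv4, uint16_t port)
{
    struct sockaddr_in addr;
    int fd;

    assert(ipv4 != NULL);

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    if (inet_pton(AF_INET, ipv4, &addr.sin_addr) != 1) {
        return -1; /* not a valid dotted-quad address */
    }

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        return -1;
    }
    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
        close(fd); /* do not leak the descriptor on a failed attempt */
        return -1;
    }
    return fd;
}
```

A retry loop could then call connect_fresh() on each iteration, sleep on -1, and close the returned descriptor once the listener returns.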
>
> +
>
> +void repagent_process_cmd(RepCmd *pcmd, uint8_t *pdata, void *clientPtr)
>
> +{
>
> + int is_free_data = 1;
>
> + printf("Repagent got cmd %d\n", pcmd->hdr.cmdid);
>
> + switch (pcmd->hdr.cmdid) {
>
> + case REPHUB_CMD_START_PROTECT: {
>
> + is_free_data = repaget_start_protect((RepCmdStartProtect *) pcmd,
>
> + (RepCmdDataStartProtect *) pdata);
>
> + }
>
> + break;
>
> + case REPHUB_CMD_READ_VOL_REQ: {
>
> + is_free_data = repaget_read_vol((RepCmdReadVolReq *) pcmd, pdata);
>
> + }
>
> + break;
>
> + default:
>
> + assert(0);
>
> + break;
>
> +
>
> + }
>
> +
>
> + if (is_free_data) {
>
> + g_free(pdata);
>
> + }
>
> +}
>
> +
>
> +int repagent_client_send(RepCmd *p_cmd)
>
> +{
>
> + int bytecount = 0;
>
> + printf("Send cmd %u, data size %u\n", p_cmd->hdr.cmdid,
>
> + p_cmd->hdr.data_size_bytes);
>
> + if (!g_client_state.is_connected) {
>
> + printf("Not connected to hub\n");
>
> + return -1;
>
> + }
>
> +
>
> + bytecount = send(g_client_state.hsock, p_cmd,
>
> + sizeof(RepCmd) + p_cmd->hdr.data_size_bytes, 0);
>
> + if (bytecount < sizeof(RepCmd) + p_cmd->hdr.data_size_bytes) {
>
> + printf("Bad send %d, errno %d\n", bytecount, errno);
>
> + return bytecount;
>
> + }
>
> +
>
> + /* Success */
>
> + return 0;
>
> +}
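On the send path above: send() on a stream socket may write fewer bytes than requested, and since bytecount is an int compared against a size_t expression, a return value of -1 is converted to a very large unsigned value on common platforms. The "Bad send" branch is then skipped and the function falls through to "Success". A minimal sketch of a loop that avoids both problems follows; send_all is a hypothetical helper name, not something the patch defines.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Hypothetical helper (not part of the patch): keep calling send() until
 * all len bytes have been written.
 * Returns 0 on success, -1 on error with errno set by send(). */
static int send_all(int fd, const void *buf, size_t len)
{
    const unsigned char *p = buf;
    size_t sent = 0;

    assert(buf != NULL || len == 0);

    while (sent < len) {
        /* ssize_t keeps the -1 error value signed */
        ssize_t n = send(fd, p + sent, len - sent, 0);
        if (n == -1) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal, retry */
            }
            return -1;
        }
        sent += (size_t)n;
    }
    return 0;
}
```

On Linux, passing MSG_NOSIGNAL instead of 0 would avoid SIGPIPE when the hub has gone away; the sketch keeps the flags at 0 as in the patch.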
>
> diff --git a/replication/repagent_client.h b/replication/repagent_client.h
>
> new file mode 100644
>
> index 0000000..62a5377
>
> --- /dev/null
>
> +++ b/replication/repagent_client.h
>
> @@ -0,0 +1,36 @@
>
> +/*
>
> + * QEMU System Emulator
>
> + *
>
> + * Copyright (c) 2003-2008 Fabrice Bellard
>
> + *
>
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>
> + * of this software and associated documentation files (the "Software"), to deal
>
> + * in the Software without restriction, including without limitation the rights
>
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>
> + * copies of the Software, and to permit persons to whom the Software is
>
> + * furnished to do so, subject to the following conditions:
>
> + *
>
> + * The above copyright notice and this permission notice shall be included in
>
> + * all copies or substantial portions of the Software.
>
> + *
>
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>
> + * THE SOFTWARE.
>
> + */
>
> +#ifndef REPAGENT_CLIENT_H
>
> +#define REPAGENT_CLIENT_H
>
> +#include "repcmd.h"
>
> +
>
> +typedef struct rephub_params {
>
> + char *name;
>
> + int port;
>
> +} rephub_params;
>
> +
>
> +void *repagent_listen(void *pParam);
>
> +int repagent_client_send(RepCmd *p_cmd);
>
> +
>
> +#endif /* REPAGENT_CLIENT_H */
>
> diff --git a/replication/repcmd.h b/replication/repcmd.h
>
> new file mode 100644
>
> index 0000000..8c6cf1b
>
> --- /dev/null
>
> +++ b/replication/repcmd.h
>
> @@ -0,0 +1,59 @@
>
> +/*
>
> + * QEMU System Emulator
>
> + *
>
> + * Copyright (c) 2003-2008 Fabrice Bellard
>
> + *
>
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>
> + * of this software and associated documentation files (the "Software"), to deal
>
> + * in the Software without restriction, including without limitation the rights
>
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>
> + * copies of the Software, and to permit persons to whom the Software is
>
> + * furnished to do so, subject to the following conditions:
>
> + *
>
> + * The above copyright notice and this permission notice shall be included in
>
> + * all copies or substantial portions of the Software.
>
> + *
>
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>
> + * THE SOFTWARE.
>
> + */
>
> +#ifndef REPCMD_H
>
> +#define REPCMD_H
>
> +
>
> +#include <stdint.h>
>
> +
>
> +#define REPCMD_MAGIC1 (0x1122)
>
> +#define REPCMD_MAGIC2 (0x3344)
>
> +#define REPCMD_NUM_U32_PARAMS (11)
>
> +
>
> +enum RepCmds {
>
> + REPCMD_FIRST_INVALID = 0,
>
> + REPCMD_FIRST_HUBCMD = 1,
>
> + REPHUB_CMD_PROTECTED_WRITE = 2,
>
> + REPHUB_CMD_REPORT_VM_VOLUMES = 3,
>
> + REPHUB_CMD_START_PROTECT = 4,
>
> + REPHUB_CMD_READ_VOL_REQ = 5,
>
> + REPHUB_CMD_READ_VOL_RES = 6,
>
> + REPHUB_CMD_AGENT_SHUTDOWN = 7,
>
> +};
>
> +
>
> +typedef struct RepCmdHdr {
>
> + uint16_t magic1;
>
> + uint16_t cmdid;
>
> + uint32_t data_size_bytes;
>
> +} RepCmdHdr;
>
> +
>
> +typedef struct RepCmd {
>
> + RepCmdHdr hdr;
>
> + unsigned int parameters[REPCMD_NUM_U32_PARAMS];
>
> + unsigned int magic2;
>
> + uint8_t data[0];
>
> +} RepCmd;
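One observation on the layout above: repagent_client_send() passes the struct to send() as raw memory, so the bytes on the wire follow the host's byte order and rely on unsigned int being 32 bits wide. If the hub may be built for a different architecture, an explicit encoding is one option. The sketch below covers only the 8-byte header and assumes big-endian on the wire, which the patch does not specify; hdr_encode, hdr_decode and EXAMPLE_HDR_WIRE_SIZE are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed wire size of RepCmdHdr: two 16-bit fields and one 32-bit field. */
enum { EXAMPLE_HDR_WIRE_SIZE = 8 };

/* Hypothetical encoder: write the header fields in big-endian order. */
static void hdr_encode(uint8_t out[EXAMPLE_HDR_WIRE_SIZE], uint16_t magic1,
                       uint16_t cmdid, uint32_t data_size_bytes)
{
    assert(out != NULL);
    out[0] = (uint8_t)(magic1 >> 8);
    out[1] = (uint8_t)(magic1 & 0xff);
    out[2] = (uint8_t)(cmdid >> 8);
    out[3] = (uint8_t)(cmdid & 0xff);
    out[4] = (uint8_t)(data_size_bytes >> 24);
    out[5] = (uint8_t)((data_size_bytes >> 16) & 0xff);
    out[6] = (uint8_t)((data_size_bytes >> 8) & 0xff);
    out[7] = (uint8_t)(data_size_bytes & 0xff);
}

/* Hypothetical decoder: the inverse of hdr_encode(). */
static void hdr_decode(const uint8_t in[EXAMPLE_HDR_WIRE_SIZE],
                       uint16_t *magic1, uint16_t *cmdid,
                       uint32_t *data_size_bytes)
{
    assert(in != NULL && magic1 != NULL && cmdid != NULL);
    assert(data_size_bytes != NULL);
    *magic1 = (uint16_t)((in[0] << 8) | in[1]);
    *cmdid = (uint16_t)((in[2] << 8) | in[3]);
    *data_size_bytes = ((uint32_t)in[4] << 24) | ((uint32_t)in[5] << 16) |
                       ((uint32_t)in[6] << 8) | (uint32_t)in[7];
}
```

The same approach would extend to the parameters array and magic2 if the whole command were encoded this way.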
>
> +
>
> +RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata);
>
> +
>
> +#endif /* REPCMD_H */
>
> diff --git a/replication/repcmd_listener.c b/replication/repcmd_listener.c
>
> new file mode 100644
>
> index 0000000..a211927
>
> --- /dev/null
>
> +++ b/replication/repcmd_listener.c
>
> @@ -0,0 +1,137 @@
>
> +#include <fcntl.h>
>
> +#include <string.h>
>
> +#include <stdlib.h>
>
> +#include <errno.h>
>
> +#include <stdio.h>
>
> +#include <netinet/in.h>
>
> +#include <resolv.h>
>
> +#include <sys/socket.h>
>
> +#include <arpa/inet.h>
>
> +#include <unistd.h>
>
> +#include <pthread.h>
>
> +#include <assert.h>
>
> +
>
> +/* Use the CONFIG_REPLICATION flag to determine whether
>
> + * we're in a qemu build or a hub build. When under
>
> + * qemu, use g_malloc */
>
> +#ifdef CONFIG_REPLICATION
>
> +#include <glib.h>
>
> +#define REPCMD_MALLOC g_malloc
>
> +#else
>
> +#define REPCMD_MALLOC malloc
>
> +#endif
>
> +
>
> +#include "repcmd.h"
>
> +#include "repcmd_listener.h"
>
> +
>
> +#define ZERO_MEM_OBJ(pObj) memset((void *)pObj, 0, sizeof(*pObj))
>
> +
>
> +typedef struct RepCmdListenerState {
>
> + int is_terminate_receive;
>
> +} RepCmdListenerState;
>
> +
>
> +static RepCmdListenerState g_listenerState = { 0 };
>
> +
>
> +/* Returns 0 for initiated termination or socket error value on error */
>
> +int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr)
>
> +{
>
> + RepCmd curCmd;
>
> + uint8_t *pReadBuf = (uint8_t *)&curCmd;
>
> + int bytesToGet = sizeof(RepCmd);
>
> + int bytesGotten = 0;
>
> + int isGotHeader = 0;
>
> + uint8_t *pdata = NULL;
>
> +
>
> + assert(callback != NULL);
>
> +
>
> + /* receive loop */
>
> + while (!g_listenerState.is_terminate_receive) {
>
> + int bytecount;
>
> +
>
> + bytecount = recv(hsock, pReadBuf + bytesGotten,
>
> + bytesToGet - bytesGotten, 0);
>
> + if (bytecount == -1) {
>
> + fprintf(stderr, "Error receiving data %d\n", errno);
>
> + return errno;
>
> + }
>
> +
>
> + if (bytecount == 0) {
>
> + printf("Disconnected\n");
>
> + return 0;
>
> + }
>
> + bytesGotten += bytecount;
>
> +/* printf("Received bytes %d, got %d/%d\n",
>
> + bytecount, bytesGotten, bytesToGet); */
>
> + /* print content */
>
> + if (0) {
>
> + int i;
>
> + for (i = 0; i < bytecount; i += 4) {
>
> + /*printf("%d/%d", i, bytecount/4); */
>
> + printf("%#x ",
>
> + *(int *) (&pReadBuf[bytesGotten - bytecount + i]));
>
> +
>
> + }
>
> + printf("\n");
>
> + }
>
> + assert(bytesGotten <= bytesToGet);
>
> + if (bytesGotten == bytesToGet) {
>
> + int isGotData = 0;
>
> + bytesGotten = 0;
>
> + if (!isGotHeader) {
>
> + /* We just got the header */
>
> + isGotHeader = 1;
>
> +
>
> + assert(curCmd.hdr.magic1 == REPCMD_MAGIC1);
>
> + assert(curCmd.magic2 == REPCMD_MAGIC2);
>
> + if (curCmd.hdr.data_size_bytes > 0) {
>
> + pdata = (uint8_t *)REPCMD_MALLOC(
>
> + curCmd.hdr.data_size_bytes);
>
> +/* printf("malloc %p\n", pdata); */
>
> + pReadBuf = pdata;
>
> + } else {
>
> + /* no data */
>
> + isGotData = 1;
>
> + pdata = NULL;
>
> + }
>
> + bytesToGet = curCmd.hdr.data_size_bytes;
>
> + } else {
>
> + isGotData = 1;
>
> + }
>
> +
>
> + if (isGotData) {
>
> + /* Got command and data */
>
> + (*callback)(&curCmd, pdata, clientPtr);
>
> +
>
> + /* It's the callee's responsibility to free pdata */
>
> + pdata = NULL;
>
> + ZERO_MEM_OBJ(&curCmd);
>
> + pReadBuf = (uint8_t *)&curCmd;
>
> + bytesGotten = 0;
>
> + bytesToGet = sizeof(RepCmd);
>
> + isGotHeader = 0;
>
> + }
>
> + }
>
> + }
>
> + return 0;
>
> +}
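On the receive loop above: both the magic values and data_size_bytes come from the peer, yet the magics are checked with assert() and the size is handed straight to the allocator. A malformed header would therefore abort the process when assertions are enabled, or trigger an arbitrarily large allocation. A minimal sketch of one alternative: read fixed-size pieces with a helper and reject bad headers with an error return. recv_exact, hdr_is_valid and EXAMPLE_MAX_DATA_BYTES are hypothetical, and the 16 MiB cap is an arbitrary assumption, not a value taken from the patch.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/socket.h>
#include <sys/types.h>

#define REPCMD_MAGIC1 (0x1122)                     /* as in repcmd.h */
#define EXAMPLE_MAX_DATA_BYTES (16u * 1024 * 1024) /* assumed cap */

/* Hypothetical helper: read exactly len bytes from a stream socket.
 * Returns 1 on success, 0 if the peer disconnected first, -1 on error. */
static int recv_exact(int fd, void *buf, size_t len)
{
    unsigned char *p = buf;
    size_t got = 0;

    assert(buf != NULL || len == 0);

    while (got < len) {
        ssize_t n = recv(fd, p + got, len - got, 0);
        if (n == 0) {
            return 0; /* orderly shutdown by the peer */
        }
        if (n == -1) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal, retry */
            }
            return -1;
        }
        got += (size_t)n;
    }
    return 1;
}

/* Hypothetical check: returns 1 if the header fields are acceptable,
 * 0 otherwise, so the caller can drop the connection instead of aborting. */
static int hdr_is_valid(uint16_t magic1, uint32_t data_size_bytes)
{
    return magic1 == REPCMD_MAGIC1 &&
           data_size_bytes <= EXAMPLE_MAX_DATA_BYTES;
}
```

With these two pieces the listener could read sizeof(RepCmd) bytes, validate, then read data_size_bytes bytes, without tracking bytesGotten and isGotHeader by hand.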
>
> +
>
> +RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata)
>
> +{
>
> + RepCmd *p_cmd = (RepCmd *)REPCMD_MALLOC(sizeof(RepCmd) + data_size);
>
> + assert(p_cmd != NULL);
>
> +
>
> + /* Zero the CMD (not the data) */
>
> + ZERO_MEM_OBJ(p_cmd);
>
> +
>
> + p_cmd->hdr.cmdid = cmd_id;
>
> + p_cmd->hdr.magic1 = REPCMD_MAGIC1;
>
> + p_cmd->magic2 = REPCMD_MAGIC2;
>
> + p_cmd->hdr.data_size_bytes = data_size;
>
> +
>
> + if (p_out_pdata != NULL) {
>
> + *p_out_pdata = p_cmd->data;
>
> + }
>
> +
>
> + return p_cmd;
>
> +}
>
> +
>
> diff --git a/replication/repcmd_listener.h b/replication/repcmd_listener.h
>
> new file mode 100644
>
> index 0000000..c09a12e
>
> --- /dev/null
>
> +++ b/replication/repcmd_listener.h
>
> @@ -0,0 +1,32 @@
>
> +/*
>
> + * QEMU System Emulator
>
> + *
>
> + * Copyright (c) 2003-2008 Fabrice Bellard
>
> + *
>
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>
> + * of this software and associated documentation files (the "Software"), to deal
>
> + * in the Software without restriction, including without limitation the rights
>
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>
> + * copies of the Software, and to permit persons to whom the Software is
>
> + * furnished to do so, subject to the following conditions:
>
> + *
>
> + * The above copyright notice and this permission notice shall be included in
>
> + * all copies or substantial portions of the Software.
>
> + *
>
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>
> + * THE SOFTWARE.
>
> + */
>
> +#ifndef REPCMD_LISTENER_H
>
> +#define REPCMD_LISTENER_H
>
> +#include <stdint.h>
>
> +typedef void (*pfn_received_cmd_cb)(RepCmd *pCmd,
>
> + uint8_t *pData, void *clientPtr);
>
> +
>
> +int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr);
>
> +
>
> +#endif /* REPCMD_LISTENER_H */
>
> diff --git a/replication/rephub_cmds.h b/replication/rephub_cmds.h
>
> new file mode 100644
>
> index 0000000..820c37d
>
> --- /dev/null
>
> +++ b/replication/rephub_cmds.h
>
> @@ -0,0 +1,150 @@
>
> +/*
>
> + * QEMU System Emulator
>
> + *
>
> + * Copyright (c) 2003-2008 Fabrice Bellard
>
> + *
>
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>
> + * of this software and associated documentation files (the "Software"), to deal
>
> + * in the Software without restriction, including without limitation the rights
>
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>
> + * copies of the Software, and to permit persons to whom the Software is
>
> + * furnished to do so, subject to the following conditions:
>
> + *
>
> + * The above copyright notice and this permission notice shall be included in
>
> + * all copies or substantial portions of the Software.
>
> + *
>
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>
> + * THE SOFTWARE.
>
> + */
>
> +#ifndef REPHUB_CMDS_H
>
> +#define REPHUB_CMDS_H
>
> +
>
> +#include <stdint.h>
>
> +#include "repcmd.h"
>
> +#include "rephub_defs.h"
>
> +
>
> +/*********************************************************
>
> + * RepCmd Report a protected IO
>
> + *
>
> + * REPHUB_CMD_PROTECTED_WRITE
>
> + * Direction: agent->hub
>
> + *
>
> + * Any write to a protected volume is sent with this
>
> + * message to the hub, with its status.
>
> + * When the status is bad, no data is sent.
>
> + *********************************************************/
>
> +typedef struct RepCmdProtectedWrite {
>
> + RepCmdHdr hdr;
>
> + uint64_t volume_id;
>
> + uint64_t offset_sectors;
>
> + /* The size field duplicates the RepCmd size,
>
> + * but it is needed for reporting failed IOs' sizes */
>
> + uint32_t size_sectors;
>
> + int ret_status;
>
> +} RepCmdProtectedWrite;
>
> +
>
> +/*********************************************************
>
> + * RepCmd Report VM volumes
>
> + *
>
> + * REPHUB_CMD_REPORT_VM_VOLUMES
>
> + * Direction: agent->hub
>
> + *
>
> + * The agent reports all the volumes of the VM
>
> + * to the hub.
>
> + *********************************************************/
>
> +typedef struct RepVmVolumeInfo {
>
> + char name[REPHUB_MAX_VOL_NAME_LEN];
>
> + uint64_t volume_id;
>
> + uint32_t size_mb;
>
> +} RepVmVolumeInfo;
>
> +
>
> +typedef struct RepCmdReportVmVolumes {
>
> + RepCmdHdr hdr;
>
> + int num_volumes;
>
> +} RepCmdReportVmVolumes;
>
> +
>
> +typedef struct RepCmdDataReportVmVolumes {
>
> + RepVmVolumeInfo volumes[0];
>
> +} RepCmdDataReportVmVolumes;
>
> +
>
> +
>
> +/*********************************************************
>
> + * RepCmd Start protect
>
> + *
>
> + * REPHUB_CMD_START_PROTECT
>
> + * Direction: hub->agent
>
> + *
>
> + * The hub instructs the agent to start protecting
>
> + * a volume. When a volume is protected all its writes
>
> + * are sent to the hub.
>
> + * With this command the hub also assigns a volume ID to
>
> + * the given volume name.
>
> + *********************************************************/
>
> +typedef struct RepCmdStartProtect {
>
> + RepCmdHdr hdr;
>
> + uint64_t volume_id;
>
> +} RepCmdStartProtect;
>
> +
>
> +typedef struct RepCmdDataStartProtect {
>
> + char volume_name[REPHUB_MAX_VOL_NAME_LEN];
>
> +} RepCmdDataStartProtect;
>
> +
>
> +
>
> +/*********************************************************
>
> + * RepCmd Read Volume Request
>
> + *
>
> + * REPHUB_CMD_READ_VOL_REQ
>
> + * Direction: hub->agent
>
> + *
>
> + * The hub issues a read IO to a protected volume.
>
> + * This command is used during sync - when the hub needs
>
> + * to read unsynchronized sections of a protected volume.
>
> + * This command is a request, the read data is returned
>
> + * by the response command REPHUB_CMD_READ_VOL_RES
>
> + *********************************************************/
>
> +typedef struct RepCmdReadVolReq {
>
> + RepCmdHdr hdr;
>
> + int req_id;
>
> + int size_sectors;
>
> + uint64_t volume_id;
>
> + uint64_t offset_sectors;
>
> +} RepCmdReadVolReq;
>
> +
>
> +/*********************************************************
>
> + * RepCmd Read Volume Response
>
> + *
>
> + * REPHUB_CMD_READ_VOL_RES
>
> + * Direction: agent->hub
>
> + *
>
> + * A response to REPHUB_CMD_READ_VOL_REQ.
>
> + * Sends the data read from a protected volume
>
> + *********************************************************/
>
> +typedef struct RepCmdReadVolRes {
>
> + RepCmdHdr hdr;
>
> + int req_id;
>
> + int is_status_success;
>
> + uint64_t volume_id;
>
> +} RepCmdReadVolRes;
>
> +
>
> +/*********************************************************
>
> + * RepCmd Agent shutdown
>
> + *
>
> + * REPHUB_CMD_AGENT_SHUTDOWN
>
> + * Direction: agent->hub
>
> + *
>
> + * Notifies the hub that the agent is about to shut down.
>
> + * This allows a graceful shutdown. Any disconnection
>
> + * of an agent without sending this command will result
>
> + * in a full sync of the VM volumes.
>
> + *********************************************************/
>
> +typedef struct RepCmdAgentShutdown {
>
> + RepCmdHdr hdr;
>
> +} RepCmdAgentShutdown;
>
> +
>
> +
>
> +#endif /* REPHUB_CMDS_H */
>
> diff --git a/replication/rephub_defs.h b/replication/rephub_defs.h
>
> new file mode 100644
>
> index 0000000..e34e0ce
>
> --- /dev/null
>
> +++ b/replication/rephub_defs.h
>
> @@ -0,0 +1,40 @@
>
> +/*
>
> + * QEMU System Emulator
>
> + *
>
> + * Copyright (c) 2003-2008 Fabrice Bellard
>
> + *
>
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>
> + * of this software and associated documentation files (the "Software"), to deal
>
> + * in the Software without restriction, including without limitation the rights
>
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>
> + * copies of the Software, and to permit persons to whom the Software is
>
> + * furnished to do so, subject to the following conditions:
>
> + *
>
> + * The above copyright notice and this permission notice shall be included in
>
> + * all copies or substantial portions of the Software.
>
> + *
>
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>
> + * THE SOFTWARE.
>
> + */
>
> +#ifndef REP_HUB_DEFS_H
>
> +#define REP_HUB_DEFS_H
>
> +
>
> +#include <stdint.h>
>
> +
>
> +#define REPHUB_MAX_VOL_NAME_LEN (1024)
>
> +#define REPHUB_MAX_NUM_VOLUMES (512)
>
> +
>
> +#ifndef TRUE
>
> + #define TRUE (1)
>
> +#endif
>
> +
>
> +#ifndef FALSE
>
> + #define FALSE (0)
>
> +#endif
>
> +
>
> +#endif /* REP_HUB_DEFS_H */
>
> diff --git a/vl.c b/vl.c
>
> index 624da0f..506b5dc 100644
>
> --- a/vl.c
>
> +++ b/vl.c
>
> @@ -167,6 +167,7 @@ int main(int argc, char **argv)
>
> #include "ui/qemu-spice.h"
>
> +#include "replication/repagent.h"
>
> //#define DEBUG_NET
>
> //#define DEBUG_SLIRP
>
> @@ -2307,6 +2308,15 @@ int main(int argc, char **argv, char **envp)
>
> drive_add(IF_DEFAULT, popt->index - QEMU_OPTION_hda, optarg,
>
> HD_OPTS);
>
> break;
>
> + case QEMU_OPTION_repagent:
>
> +#ifdef CONFIG_REPLICATION
>
> + repagent_init(optarg, 0);
>
> +#else
>
> + fprintf(stderr, "Replication support is disabled. "
>
> + "Don't use -repagent option.\n");
>
> + exit(1);
>
> +#endif
>
> + break;
>
> case QEMU_OPTION_drive:
>
> if (drive_def(optarg) == NULL) {
>
> exit(1);
>