From mboxrd@z Thu Jan 1 00:00:00 1970
Received: from eggs.gnu.org ([140.186.70.92]:53404)
    by lists.gnu.org with esmtp (Exim 4.71) (envelope-from )
    id 1Rujuq-0003YL-RR
    for qemu-devel@nongnu.org; Tue, 07 Feb 2012 07:13:08 -0500
Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71)
    (envelope-from ) id 1Rujul-0006w4-G3
    for qemu-devel@nongnu.org; Tue, 07 Feb 2012 07:12:56 -0500
Received: from mail-tul01m020-f173.google.com ([209.85.214.173]:42411)
    by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from )
    id 1Rujul-0006vw-4Q
    for qemu-devel@nongnu.org; Tue, 07 Feb 2012 07:12:51 -0500
Received: by obbup16 with SMTP id up16so10474331obb.4
    for ; Tue, 07 Feb 2012 04:12:50 -0800 (PST)
Message-ID: <4F31153E.9010205@codemonkey.ws>
Date: Tue, 07 Feb 2012 06:12:46 -0600
From: Anthony Liguori
MIME-Version: 1.0
References: <73865e0ce364c40e0eb65ec6b22b819d@mail.gmail.com>
In-Reply-To: <73865e0ce364c40e0eb65ec6b22b819d@mail.gmail.com>
Content-Type: text/plain; charset=windows-1252; format=flowed
Content-Transfer-Encoding: 8bit
Subject: Re: [Qemu-devel] [RFC PATCH] replication agent module
List-Id:
List-Unsubscribe: ,
List-Archive:
List-Post:
List-Help:
List-Subscribe: ,
To: Ori Mamluk
Cc: Kevin Wolf , dlaor@redhat.com, qemu-devel@nongnu.org

Hi,

On 02/07/2012 04:29 AM, Ori Mamluk wrote:
> Repagent is a new module that allows an external replication system to
> replicate a volume of a Qemu VM.
>
> This RFC patch adds the repagent client module to Qemu.

Please read http://wiki.qemu.org/Contribute/SubmitAPatch

In particular, use a tool like git-send-email and split this patch up into
more manageable chunks.

Is there an Open Source Rephub available? As a project policy, adding
external APIs specifically for proprietary software is not something we're
willing to do.

Regards,

Anthony Liguori

> Documentation of the module role and API is in the patch at
> replication/qemu-repagent.txt
>
> The main motivation behind the module is to allow replication of VMs in a
> virtualization environment like RhevM.
>
> To achieve this we need basic replication support in Qemu.
>
> This is the first submission of this module, which was written as a Proof
> Of Concept, and used successfully for replicating and recovering a Qemu VM.
>
> Points and open issues:
>
> * The module interfaces the Qemu storage stack at block.c
> generic layer. Is this the right place to intercept/inject IOs?
>
> * The patch contains performing IO reads invoked by a new
> thread (a TCP listener thread). See repaget_read_vol in repagent.c. It is
> not protected by any lock – is this OK?
>
> * VM ID – the replication system implies an environment with
> several VMs connected to a central replication system (Rephub).
>
> This requires some sort of identification for a VM. The
> current patch does not include a VM ID – I did not find any adequate ID to
> use.
>
> Any suggestions?
>
> Appreciate any feedback or suggestions. Thanks,
>
> Ori.
>
> From 5a0d88689ddcf325f25fdfca2a2012f1bbf141b9 Mon Sep 17 00:00:00 2001
>
> From: Ori Mamluk
>
> Date: Tue, 7 Feb 2012 11:12:12 +0200
>
> Subject: [PATCH] Added replication agent module (repagent) to Qemu under
>
> replication directory, added repagent configure and run
>
> options, and the repagent API usage in bloc
>
> Added build options to ./configure: --enable-replication --disable-replicat
>
> Added a commandline option to enable: -repagent
>
> Added the module files under replication.
> > > > Signed-off-by: Ori Mamluk > > --- > > Makefile | 9 +- > > Makefile.objs | 6 + > > block.c | 20 +++- > > configure | 11 ++ > > qemu-options.hx | 6 + > > replication/qemu-repagent.txt | 104 +++++++++++++ > > replication/repagent.c | 322 > +++++++++++++++++++++++++++++++++++++++++ > > replication/repagent.h | 46 ++++++ > > replication/repagent_client.c | 138 ++++++++++++++++++ > > replication/repagent_client.h | 36 +++++ > > replication/repcmd.h | 59 ++++++++ > > replication/repcmd_listener.c | 137 +++++++++++++++++ > > replication/repcmd_listener.h | 32 ++++ > > replication/rephub_cmds.h | 150 +++++++++++++++++++ > > replication/rephub_defs.h | 40 +++++ > > vl.c | 10 ++ > > 16 files changed, 1121 insertions(+), 5 deletions(-) > > mode change 100644 => 100755 Makefile.objs > > mode change 100644 => 100755 qemu-options.hx > > create mode 100755 replication/qemu-repagent.txt > > create mode 100644 replication/repagent.c > > create mode 100644 replication/repagent.h > > create mode 100644 replication/repagent_client.c > > create mode 100644 replication/repagent_client.h > > create mode 100644 replication/repcmd.h > > create mode 100644 replication/repcmd_listener.c > > create mode 100644 replication/repcmd_listener.h > > create mode 100644 replication/rephub_cmds.h > > create mode 100644 replication/rephub_defs.h > > > > diff --git a/Makefile b/Makefile > > index 4f6eaa4..a1b3701 100644 > > --- a/Makefile > > +++ b/Makefile > > @@ -149,9 +149,9 @@ qemu-img.o qemu-tool.o qemu-nbd.o qemu-io.o cmd.o > qemu-ga.o: $(GENERATED_HEADERS > > tools-obj-y = qemu-tool.o $(oslib-obj-y) $(trace-obj-y) \ > > qemu-timer-common.o cutils.o > > -qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y) > > -qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y) > > -qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y) > > +qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y) > $(replication-obj-y) > > +qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) 
$(block-obj-y) > $(replication-obj-y) > > +qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y) > $(replication-obj-y) > > qemu-img-cmds.h: $(SRC_PATH)/qemu-img-cmds.hx > > $(call quiet-command,sh $(SRC_PATH)/scripts/hxtool -h< $< > > $@," GEN $@") > > @@ -228,6 +228,7 @@ clean: > > rm -f trace-dtrace.dtrace trace-dtrace.dtrace-timestamp > > rm -f trace-dtrace.h trace-dtrace.h-timestamp > > rm -rf $(qapi-dir) > > + rm -f replication/*.{o,d} > > $(MAKE) -C tests clean > > for d in $(ALL_SUBDIRS) $(QEMULIBS) libcacard; do \ > > if test -d $$d; then $(MAKE) -C $$d $@ || exit 1; fi; \ > > @@ -387,4 +388,4 @@ tar: > > rm -rf /tmp/$(FILE) > > # Include automatically generated dependency files > > --include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d > qapi/*.d qga/*.d) > > +-include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d > qapi/*.d qga/*.d replication/*.d) > > diff --git a/Makefile.objs b/Makefile.objs > > old mode 100644 > > new mode 100755 > > index d7a6539..dbd6f15 > > --- a/Makefile.objs > > +++ b/Makefile.objs > > @@ -74,6 +74,7 @@ fsdev-obj-$(CONFIG_VIRTFS) += $(addprefix fsdev/, > $(fsdev-nested-y)) > > # CPUs and machines. 
> > common-obj-y = $(block-obj-y) blockdev.o > > +common-obj-y += $(replication-obj-$(CONFIG_REPLICATION)) > > common-obj-y += $(net-obj-y) > > common-obj-y += $(qobject-obj-y) > > common-obj-$(CONFIG_LINUX) += $(fsdev-obj-$(CONFIG_LINUX)) > > @@ -413,6 +414,11 @@ common-obj-y += qmp-marshal.o qapi-visit.o > qapi-types.o $(qapi-obj-y) > > common-obj-y += qmp.o hmp.o > > ###################################################################### > > +# replication > > +replication-nested-y = repagent_client.o repagent.o repcmd_listener.o > > +replication-obj-y = $(addprefix replication/, $(replication-nested-y)) > > + > > +###################################################################### > > # guest agent > > qga-nested-y = guest-agent-commands.o guest-agent-command-state.o > > diff --git a/block.c b/block.c > > index 9bb236c..f3b8387 100644 > > --- a/block.c > > +++ b/block.c > > @@ -31,6 +31,10 @@ > > #include "qemu-coroutine.h" > > #include "qmp-commands.h" > > +#ifdef CONFIG_REPLICATION > > +#include "replication/repagent.h" > > +#endif > > + > > #ifdef CONFIG_BSD > > #include > > #include > > @@ -640,6 +644,9 @@ int bdrv_open(BlockDriverState *bs, const char > *filename, int flags, > > goto unlink_and_fail; > > } > > +#ifdef CONFIG_REPLICATION > > + repagent_register_drive(filename, bs); > > +#endif > > /* Open the image */ > > ret = bdrv_open_common(bs, filename, flags, drv); > > if (ret< 0) { > > @@ -1292,6 +1299,17 @@ static int coroutine_fn > bdrv_co_do_writev(BlockDriverState *bs, > > ret = drv->bdrv_co_writev(bs, sector_num, nb_sectors, qiov); > > + > > +#ifdef CONFIG_REPLICATION > > + if (bs->device_name[0] != '\0') { > > + /* We split the IO only at the highest stack driver layer. > > + Currently we know that by checking device_name - only > > + highest level (closest to the guest) has that name. 
> > + */ > > + repagent_handle_protected_write(bs, sector_num, > > + nb_sectors, qiov, ret); > > + } > > +#endif > > if (bs->dirty_bitmap) { > > set_dirty_bitmap(bs, sector_num, nb_sectors, 1); > > } > > @@ -1783,7 +1801,7 @@ int bdrv_has_zero_init(BlockDriverState *bs) > > * 'nb_sectors' is the max value 'pnum' should be set to. > > */ > > int bdrv_is_allocated(BlockDriverState *bs, int64_t sector_num, int > nb_sectors, > > - int *pnum) > > + int *pnum) > > { > > int64_t n; > > if (!bs->drv->bdrv_is_allocated) { > > diff --git a/configure b/configure > > index 9e5da44..93d600e 100755 > > --- a/configure > > +++ b/configure > > @@ -179,6 +179,7 @@ spice="" > > rbd="" > > smartcard="" > > smartcard_nss="" > > +replication="" > > usb_redir="" > > opengl="" > > zlib="yes" > > @@ -772,6 +773,10 @@ for opt do > > ;; > > --enable-smartcard-nss) smartcard_nss="yes" > > ;; > > + --disable-replication) replication="no" > > + ;; > > + --enable-replication) replication="yes" > > + ;; > > --disable-usb-redir) usb_redir="no" > > ;; > > --enable-usb-redir) usb_redir="yes" > > @@ -1067,6 +1072,7 @@ echo " --disable-usb-redir disable usb network > redirection support" > > echo " --enable-usb-redir enable usb network redirection support" > > echo " --disable-guest-agent disable building of the QEMU Guest Agent" > > echo " --enable-guest-agent enable building of the QEMU Guest Agent" > > +echo " --enable-replication enable replication support" > > echo "" > > echo "NOTE: The object files are built at the place where configure is > launched" > > exit 1 > > @@ -2733,6 +2739,7 @@ echo "curl support $curl" > > echo "check support $check_utests" > > echo "mingw32 support $mingw32" > > echo "Audio drivers $audio_drv_list" > > +echo "Replication $replication" > > echo "Extra audio cards $audio_card_list" > > echo "Block whitelist $block_drv_whitelist" > > echo "Mixer emulation $mixemu" > > @@ -3080,6 +3087,10 @@ if test "$smartcard_nss" = "yes" ; then > > echo "CONFIG_SMARTCARD_NSS=y">> 
$config_host_mak > > fi > > +if test "$replication" = "yes" ; then > > + echo "CONFIG_REPLICATION=y">> $config_host_mak > > +fi > > + > > if test "$usb_redir" = "yes" ; then > > echo "CONFIG_USB_REDIR=y">> $config_host_mak > > fi > > diff --git a/qemu-options.hx b/qemu-options.hx > > old mode 100644 > > new mode 100755 > > index 681eaf1..c97e4f8 > > --- a/qemu-options.hx > > +++ b/qemu-options.hx > > @@ -2602,3 +2602,9 @@ HXCOMM This is the last statement. Insert new options > before this line! > > STEXI > > @end table > > ETEXI > > + > > +DEF("repagent", HAS_ARG, QEMU_OPTION_repagent, > > + "-repagent [hub IP/name]\n" > > + " Enable replication support for disks\n" > > + " hub is the ip or name of the machine running the > replication hub.\n", > > + QEMU_ARCH_ALL) > > diff --git a/replication/qemu-repagent.txt b/replication/qemu-repagent.txt > > new file mode 100755 > > index 0000000..e3b0c1e > > --- /dev/null > > +++ b/replication/qemu-repagent.txt > > @@ -0,0 +1,104 @@ > > + repagent - replication agent - a Qemu module for enabling > continuous async replication of VM volumes > > + > > +Introduction > > + This document describes a feature in Qemu - a replication > agent (AKA Repagent). > > + The Repagent is a new module that exposes an API to an > external replication system (AKA Rephub). > > + This API allows a Rephub to communicate with a Qemu VM and > continuously replicate its volumes. > > + The implementation of a Rephub is outside of the scope of this > document. There may be several different Rephub > > + implementations using the same repagent in Qemu. > > + > > +Main features of Repagent > > + Repagent does the following: > > + * Report volumes - report a list of all volumes in a VM to > the Rephub. > > + * Report writes to a volume - send all writes made to a > protected volume to the Rephub. > > + The reporting of an IO is asynchronous - i.e. > the IO is not delayed by the Repagent to get any acknowledgement from the > Rephub.
> > + It is only copied to the Rephub. > > + * Read a protected volume - allows the Rephub to read a > protected volume, to enable the protected hub to synchronize the content of > a protected volume. > > + > > +Description of the Repagent module > > + > > +Build and run options > > + New configure option: --enable-replication > > + New command line option: > > + -repagent [hub IP/name] > > + > Enable replication support for disks > > + > hub is the ip or name of the machine running the replication hub. > > + > > +Module APIs > > + The Repagent module interfaces two main components: > > + 1. The Rephub - An external API based on socket messages > > + 2. The generic block layer - block.c > > + > > + Rephub message API > > + The external replication API is a message > based API. > > + We won't go into the structure of the > messages here - just the semantics. > > + > > + Messages list > > + (The updated list and > comments are in rephub_cmds.h) > > + > > + Messages from the Repagent to > the Rephub: > > + * Protected write > > + The Repagent > sends each write to a protected volume to the hub with the IO status. > > + In case the > status is bad the write content is not sent > > + * Report VM volumes > > + The agent > reports all the volumes of the VM to the hub. > > + * Read Volume Response > > + A response to > a Read Volume Request > > + Sends the > data read from a protected volume to the hub > > + * Agent shutdown > > + Notifies the > hub that the agent is about to shut down. > > + This allows a > graceful shutdown. Any disconnection of an agent without > > + sending this > command will result in a full sync of the VM volumes. > > + > > + Messages from the Rephub to > the Repagent: > > + * Start protect > > + The hub > instructs the agent to start protecting a volume. When a volume is protected > > + all its > writes are sent to the hub. > > + With this > command the hub also assigns a volume ID to the given volume name.
> > + * Read volume request > > + The hub > issues a read IO to a protected volume. > > + This command > is used during sync - when the hub needs to read unsynchronized > > + sections of a > protected volume. > > + This command > is a request; the read data is returned by the read volume response message > (see above). > > + block.c API > > + The API to the generic block storage layer > contains 3 functionalities: > > + 1. Handle writes to protected volumes > > + In bdrv_co_do_writev, each > write is reported to the Repagent module. > > + 2. Handle each new volume that registers > > + In bdrv_open - each new > bottom-level block driver that registers is reported. > > + 3. Read from a volume > > + Repagent calls bdrv_aio_readv > to handle read requests coming from the hub. > > + > > + > > +General description of a Rephub - a replication system the repagent > connects to > > + This section describes at a high level a sample Rephub - a > replication system that uses the repagent API > > + to replicate disks. > > + It describes a simple Rephub that continuously maintains a > mirror of the volumes of a VM. > > + > > + Say we have a VM we want to protect - call it PVM, say it has > 2 volumes - V1, V2. > > + Our Rephub is called SingleRephub - a Rephub protecting a > single VM. > > + > > + Preparations > > + 1. The user chooses a host to run SingleRephub - a different > host than PVM, call it Host2 > > + 2. The user creates two volumes on Host2 - the same sizes as V1 > and V2, call them V1R (V1 recovery) and V2R. > > + 3. The user runs SingleRephub process on Host2, and gives V1R > and V2R as command line arguments. > > + From now on SingleRephub waits for the > protected VM repagent to connect. > > + 4. The user runs the protected VM PVM - and uses the switch > -repagent. > > + > > + Runtime > > + 1. The repagent module connects to SingleRephub on startup. > > + 2. repagent reports V1 and V2 to SingleRephub. > > + 3. 
SingleRephub starts to perform an initial synchronization > of the protected volumes - > > + it reads each protected volume (V1 and V2) - > using read volume requests - and copies the data into the > > + recovery volumes V1R and V2R. > > + 4. SingleRephub enters 'protection' mode - each write to the > protected volume is sent by the repagent to the Rephub, > > + and the Rephub performs the write on the > matching recovery volume. > > + > > + * Note that during stage 3 writes to the protected volumes > are not ignored - they're kept in a bitmap, > > + and will be read again when stage 3 ends, in > an iterative converging process. > > + > > + This flow continuously maintains an updated recovery volume. > > + If the protected system is damaged, the user can create a new > VM on Host2 with the replicated volumes attached to it. > > + The new VM is a replica of the protected system. > > + > > + > > diff --git a/replication/repagent.c b/replication/repagent.c > > new file mode 100644 > > index 0000000..c66eae7 > > --- /dev/null > > +++ b/replication/repagent.c > > @@ -0,0 +1,322 @@ > > +#include > > +#include > > +#include > > +#include > > +#include > > + > > +#include "block.h" > > +#include "rephub_defs.h" > > +#include "block_int.h" > > +#include "repagent_client.h" > > +#include "repagent.h" > > +#include "rephub_cmds.h" > > + > > +#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj)) > > +#define REPAGENT_MAX_NUM_VOLUMES (64) > > +#define REPAGENT_VOLUME_ID_NONE (0) > > + > > +typedef struct RepagentVolume { > > + uint64_t vol_id; > > + const char *vol_path; > > + BlockDriverState *driver_ptr; > > +} RepagentVolume; > > + > > +struct RepAgentState { > > + int is_init; > > + int num_volumes; > > + RepagentVolume * volumes[REPAGENT_MAX_NUM_VOLUMES]; > > +}; > > + > > +typedef struct RepagentReadVolIo { > > + QEMUIOVector qiov; > > + RepCmdReadVolReq rep_cmd; > > + uint8_t *buf; > > + struct timeval start_time; > > +} RepagentReadVolIo; > > + > > +static int 
repagent_get_volume_by_name(const char *name); > > +static void repagent_report_volumes_to_hub(void); > > +static void repagent_vol_read_done(void *opaque, int ret); > > +static struct timeval tsub(struct timeval t1, struct timeval t2); > > + > > +RepAgentState g_rep_agent = { 0 }; > > + > > +void repagent_init(const char *hubname, int port) > > +{ > > + /* It is the responsibility of the thread to free this struct */ > > + rephub_params *pParams = (rephub_params > *)g_malloc(sizeof(rephub_params)); > > + if (hubname == NULL) { > > + hubname = "127.0.0.1"; > > + } > > + if (port == 0) { > > + port = 9010; > > + } > > + > > + printf("repagent_init %s\n", hubname); > > + > > + pParams->port = port; > > + pParams->name = g_strdup(hubname); > > + > > + pthread_t thread_id = 0; > > + > > + /* Create the repagent client listener thread */ > > + pthread_create(&thread_id, 0, repagent_listen, (void *) pParams); > > + pthread_detach(thread_id); > > +} > > + > > +void repagent_register_drive(const char *drive_path, > > + BlockDriverState *driver_ptr) > > +{ > > + int i; > > + for (i = 0; i< g_rep_agent.num_volumes ; i++) { > > + RepagentVolume *vol = g_rep_agent.volumes[i]; > > + if (vol != NULL) { > > + assert( > > + strcmp(drive_path, vol->vol_path) != 0 > > +&& driver_ptr != vol->driver_ptr); > > + } > > + } > > + > > + assert(g_rep_agent.num_volumes< REPAGENT_MAX_NUM_VOLUMES); > > + > > + printf("zerto repagent: Registering drive. Num drives %d, path %s\n", > > + g_rep_agent.num_volumes, drive_path); > > + g_rep_agent.volumes[i] = > > + (RepagentVolume *)g_malloc(sizeof(RepagentVolume)); > > + g_rep_agent.volumes[i]->driver_ptr = driver_ptr; > > + /* orim todo strcpy? */ > > + g_rep_agent.volumes[i]->vol_path = drive_path; > > + > > + /* Orim todo thread-safety? */ > > + g_rep_agent.num_volumes++; > > + > > + repagent_report_volumes_to_hub(); > > +} > > + > > +/* orim todo destruction? 
*/ > > + > > +static RepagentVolume *repagent_get_protected_volume_by_driver( > > + BlockDriverState *bs) > > +{ > > + /* orim todo optimize search */ > > + int i = 0; > > + for (i = 0; i< g_rep_agent.num_volumes ; i++) { > > + RepagentVolume *p_vol = g_rep_agent.volumes[i]; > > + if (p_vol != NULL&& p_vol->driver_ptr == (void *) bs) { > > + return p_vol; > > + } > > + } > > + return NULL; > > +} > > + > > +void repagent_handle_protected_write(BlockDriverState *bs, int64_t > sector_num, > > + int nb_sectors, QEMUIOVector *qiov, int ret_status) > > +{ > > + printf("zerto Protected write offset %lld, size %d, IO return status > %d", > > + (long long int) sector_num, nb_sectors, ret_status); > > + if (bs->filename != NULL) { > > + printf(", filename %s", bs->filename); > > + } > > + > > + printf("\n"); > > + > > + RepagentVolume *p_vol = repagent_get_protected_volume_by_driver(bs); > > + if (p_vol == NULL || p_vol->vol_id == REPAGENT_VOLUME_ID_NONE) { > > + /* Unprotected */ > > + printf("Got a write to an unprotected volume.\n"); > > + return; > > + } > > + > > + /* Report IO to rephub */ > > + > > + int data_size = qiov->size; > > + if (ret_status< 0) { > > + /* On failed ios we don't send the data to the hub */ > > + data_size = 0; > > + } > > + uint8_t *pdata = NULL; > > + RepCmdProtectedWrite *p_cmd = (RepCmdProtectedWrite *) repcmd_new( > > + REPHUB_CMD_PROTECTED_WRITE, data_size, (uint8_t **)&pdata); > > + > > + if (ret_status>= 0) { > > + qemu_iovec_to_buffer(qiov, pdata); > > + } > > + > > + p_cmd->volume_id = p_vol->vol_id; > > + p_cmd->offset_sectors = sector_num; > > + p_cmd->size_sectors = nb_sectors; > > + p_cmd->ret_status = ret_status; > > + > > + if (repagent_client_send((RepCmd *) p_cmd) != 0) { > > + printf("Error sending command\n"); > > + } > > +} > > + > > +static void repagent_report_volumes_to_hub(void) > > +{ > > + /* Report IO to rephub */ > > + int i; > > + RepCmdDataReportVmVolumes *p_cmd_data = NULL; > > + RepCmdReportVmVolumes *p_cmd = 
(RepCmdReportVmVolumes *) repcmd_new( > > + REPHUB_CMD_REPORT_VM_VOLUMES, > > + g_rep_agent.num_volumes * sizeof(RepVmVolumeInfo), > > + (uint8_t **)&p_cmd_data); > > + p_cmd->num_volumes = g_rep_agent.num_volumes; > > + printf("reporting %u volumes\n", g_rep_agent.num_volumes); > > + for (i = 0; i< g_rep_agent.num_volumes ; i++) { > > + assert(g_rep_agent.volumes[i] != NULL); > > + printf("reporting volume %s size %u\n", > > + g_rep_agent.volumes[i]->vol_path, > > + (uint32_t) sizeof(p_cmd_data->volumes[i].name)); > > + strncpy((char *) p_cmd_data->volumes[i].name, > > + g_rep_agent.volumes[i]->vol_path, > > + sizeof(p_cmd_data->volumes[i].name)); > > + p_cmd_data->volumes[i].volume_id = g_rep_agent.volumes[i]->vol_id; > > + } > > + if (repagent_client_send((RepCmd *) p_cmd) != 0) { > > + printf("Error sending command\n"); > > + } > > +} > > + > > +int repaget_start_protect(RepCmdStartProtect *pcmd, > > + RepCmdDataStartProtect *pcmd_data) > > +{ > > + printf("Start protect vol %s, ID %llu\n", pcmd_data->volume_name, > > + (unsigned long long) pcmd->volume_id); > > + int vol_index = repagent_get_volume_by_name(pcmd_data->volume_name); > > + if (vol_index< 0) { > > + printf("The volume doesn't exist\n"); > > + return TRUE; > > + } > > + /* orim todo protect */ > > + g_rep_agent.volumes[vol_index]->vol_id = pcmd->volume_id; > > + > > + return TRUE; > > +} > > + > > +static int repagent_get_volume_by_name(const char *name) > > +{ > > + int i = 0; > > + for (i = 0; i< g_rep_agent.num_volumes ; i++) { > > + if (g_rep_agent.volumes[i] != NULL > > +&& strcmp(name, g_rep_agent.volumes[i]->vol_path) == 0) { > > + return i; > > + } > > + } > > + return -1; > > +} > > + > > +static int repagent_get_volume_by_id(uint64_t vol_id) > > +{ > > + int i = 0; > > + for (i = 0; i< g_rep_agent.num_volumes ; i++) { > > + if (g_rep_agent.volumes[i] != NULL > > +&& g_rep_agent.volumes[i]->vol_id == vol_id) { > > + return i; > > + } > > + } > > + return -1; > > +} > > + > > +int 
repaget_read_vol(RepCmdReadVolReq *pcmd, uint8_t *pdata) > > +{ > > + int index = repagent_get_volume_by_id(pcmd->volume_id); > > + int size_bytes = pcmd->size_sectors * 512; > > + if (index< 0) { > > + printf("Vol read - Could not find vol id %llu\n", > > + (unsigned long long int) pcmd->volume_id); > > + RepCmdReadVolRes *p_res_cmd = (RepCmdReadVolRes *) repcmd_new( > > + REPHUB_CMD_READ_VOL_RES, 0, NULL); > > + p_res_cmd->req_id = pcmd->req_id; > > + p_res_cmd->volume_id = pcmd->volume_id; > > + p_res_cmd->is_status_success = FALSE; > > + repagent_client_send((RepCmd *) p_res_cmd); > > + return TRUE; > > + } > > + > > + printf("Vol read - driver %p, volId %llu, offset %llu, size %u\n", > > + g_rep_agent.volumes[index]->driver_ptr, > > + (unsigned long long int) pcmd->volume_id, > > + (unsigned long long int) pcmd->offset_sectors, > pcmd->size_sectors); > > + > > + { > > + RepagentReadVolIo *read_xact = calloc(1, > sizeof(RepagentReadVolIo)); > > + > > +/* BlockDriverAIOCB *acb; */ > > + > > + ZERO_MEM_OBJ(read_xact); > > + > > + qemu_iovec_init(&read_xact->qiov, 1); > > + > > + /*read_xact->buf = > > + qemu_blockalign(g_rep_agent.volumes[index]->driver_ptr, > size_bytes); */ > > + read_xact->buf = (uint8_t *) g_malloc(size_bytes); > > + read_xact->rep_cmd = *pcmd; > > + qemu_iovec_add(&read_xact->qiov, read_xact->buf, size_bytes); > > + > > + gettimeofday(&read_xact->start_time, NULL); > > + /* orim TODO - use the returned acb to cancel the request on > shutdown */ > > + /*acb = */bdrv_aio_readv(g_rep_agent.volumes[index]->driver_ptr, > > + read_xact->rep_cmd.offset_sectors,&read_xact->qiov, > > + read_xact->rep_cmd.size_sectors, repagent_vol_read_done, > > + read_xact); > > + } > > + > > + return TRUE; > > +} > > + > > +static void repagent_vol_read_done(void *opaque, int ret) > > +{ > > + struct timeval t2; > > + RepagentReadVolIo *read_xact = (RepagentReadVolIo *) opaque; > > + uint8_t *pdata = NULL; > > + RepCmdReadVolRes *pcmd = (RepCmdReadVolRes *) 
repcmd_new( > > + REPHUB_CMD_READ_VOL_RES, read_xact->rep_cmd.size_sectors * 512, > > +&pdata); > > + pcmd->req_id = read_xact->rep_cmd.req_id; > > + pcmd->volume_id = read_xact->rep_cmd.volume_id; > > + pcmd->is_status_success = FALSE; > > + > > + printf("Protected vol read - volId %llu, offset %llu, size %u\n", > > + (unsigned long long int) read_xact->rep_cmd.volume_id, > > + (unsigned long long int) read_xact->rep_cmd.offset_sectors, > > + read_xact->rep_cmd.size_sectors); > > + gettimeofday(&t2, NULL); > > + > > + if (ret>= 0) { > > + /* Read response - send the data to the hub */ > > + t2 = tsub(t2, read_xact->start_time); > > + printf("Read prot vol done. Took %u seconds, %u us.", > > + (uint32_t) t2.tv_sec, (uint32_t) t2.tv_usec); > > + > > + pcmd->is_status_success = TRUE; > > + /* orim todo optimize - don't copy, use the qiov buffer */ > > + qemu_iovec_to_buffer(&read_xact->qiov, pdata); > > + } else { > > + printf("readv failed: %s\n", strerror(-ret)); > > + } > > + > > + repagent_client_send((RepCmd *) pcmd); > > + > > + /*qemu_vfree(read_xact->buf); */ > > + g_free(read_xact->buf); > > + > > + g_free(read_xact); > > +} > > + > > +static struct timeval tsub(struct timeval t1, struct timeval t2) > > +{ > > + t1.tv_usec -= t2.tv_usec; > > + if (t1.tv_usec< 0) { > > + t1.tv_usec += 1000000; > > + t1.tv_sec--; > > + } > > + t1.tv_sec -= t2.tv_sec; > > + return t1; > > +} > > + > > +void repagent_client_connected(void) > > +{ > > + /* orim todo thread protection */ > > + repagent_report_volumes_to_hub(); > > +} > > diff --git a/replication/repagent.h b/replication/repagent.h > > new file mode 100644 > > index 0000000..98ccbf2 > > --- /dev/null > > +++ b/replication/repagent.h > > @@ -0,0 +1,46 @@ > > +/* > > + * QEMU System Emulator > > + * > > + * Copyright (c) 2003-2008 Fabrice Bellard > > + * > > + * Permission is hereby granted, free of charge, to any person obtaining a > copy > > + * of this software and associated documentation files (the "Software"), 
> to deal > > + * in the Software without restriction, including without limitation the > rights > > + * to use, copy, modify, merge, publish, distribute, sublicense, and/or > sell > > + * copies of the Software, and to permit persons to whom the Software is > > + * furnished to do so, subject to the following conditions: > > + * > > + * The above copyright notice and this permission notice shall be included > in > > + * all copies or substantial portions of the Software. > > + * > > + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS > OR > > + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, > > + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL > > + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR > OTHER > > + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING > FROM, > > + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS > IN > > + * THE SOFTWARE. 
> > + */ > > +#ifndef REPAGENT_H > > +#define REPAGENT_H > > +#include > > + > > +#include "qemu-common.h" > > + > > +typedef struct RepAgentState RepAgentState; > > +typedef struct RepCmdStartProtect RepCmdStartProtect; > > +typedef struct RepCmdDataStartProtect RepCmdDataStartProtect; > > +struct RepCmdReadVolReq; > > + > > +void repagent_init(const char *hubname, int port); > > +void repagent_handle_protected_write(BlockDriverState *bs, > > + int64_t sector_num, int nb_sectors, QEMUIOVector *qiov, int > ret_status); > > +void repagent_register_drive(const char *drive_path, > > + BlockDriverState *driver_ptr); > > +int repaget_start_protect(RepCmdStartProtect *pcmd, > > + RepCmdDataStartProtect *pcmd_data); > > +int repaget_read_vol(struct RepCmdReadVolReq *pcmd, uint8_t *pdata); > > +void repagent_client_connected(void); > > + > > + > > +#endif /* REPAGENT_H */ > > diff --git a/replication/repagent_client.c b/replication/repagent_client.c > > new file mode 100644 > > index 0000000..4dd9ea4 > > --- /dev/null > > +++ b/replication/repagent_client.c > > @@ -0,0 +1,138 @@ > > +#include "repcmd.h" > > +#include "rephub_cmds.h" > > +#include "repcmd_listener.h" > > +#include "repagent_client.h" > > +#include "repagent.h" > > + > > +#include > > +#include > > +#include > > +#include > > +#include > > +#include > > +#include > > +#include > > +#include > > + > > +#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj)) > > + > > +static void repagent_process_cmd(RepCmd *pCmd, uint8_t *pData, void > *clientPtr); > > + > > +typedef struct repagent_client_state { > > + int is_connected; > > + int is_terminate_receive; > > + int hsock; > > +} repagent_client_state; > > + > > +static repagent_client_state g_client_state = { 0 }; > > + > > +void *repagent_listen(void *pParam) > > +{ > > + rephub_params *pServerParams = (rephub_params *) pParam; > > + int host_port = pServerParams->port; > > + const char *host_name = pServerParams->name; > > + > > + printf("Creating repagent 
listener thread...\n"); > > + g_free(pServerParams); > > + > > + struct sockaddr_in my_addr; > > + > > + int err; > > + int retries = 0; > > + > > + g_client_state.hsock = socket(AF_INET, SOCK_STREAM, 0); > > + if (g_client_state.hsock == -1) { > > + printf("Error initializing socket %d\n", errno); > > + return (void *) -1; > > + } > > + > > + int param = 1; > > + > > + if ((setsockopt(g_client_state.hsock, SOL_SOCKET, SO_REUSEADDR, > > + (char *)&param, sizeof(int)) == -1) > > + || (setsockopt(g_client_state.hsock, SOL_SOCKET, SO_KEEPALIVE, > > + (char *)&param, sizeof(int)) == -1)) { > > + printf("Error setting options %d\n", errno); > > + return (void *) -1; > > + } > > + > > + my_addr.sin_family = AF_INET; > > + my_addr.sin_port = htons(host_port); > > + memset(&(my_addr.sin_zero), 0, 8); > > + > > + my_addr.sin_addr.s_addr = inet_addr(host_name); > > + > > + /* Reconnect loop */ > > + while (!g_client_state.is_terminate_receive) { > > + > > + if (connect(g_client_state.hsock, (struct sockaddr *)&my_addr, > > + sizeof(my_addr)) == -1) { > > + err = errno; > > + if (err != EINPROGRESS) { > > + retries++; > > + fprintf( > > + stderr, > > + "Error connecting socket %d. Host %s, port %u. 
> Retry count %d\n", > > + errno, host_name, host_port, retries); > > + usleep(5 * 1000 * 1000); > > + continue; > > + } > > + } > > + retries = 0; > > + > > + g_client_state.is_connected = 1; > > + > > + repagent_client_connected(); > > + repcmd_listener(g_client_state.hsock, repagent_process_cmd, NULL); > > + close(g_client_state.hsock); > > + > > + g_client_state.is_connected = 0; > > + } > > + return 0; > > +} > > + > > +void repagent_process_cmd(RepCmd *pcmd, uint8_t *pdata, void *clientPtr) > > +{ > > + int is_free_data = 1; > > + printf("Repagent got cmd %d\n", pcmd->hdr.cmdid); > > + switch (pcmd->hdr.cmdid) { > > + case REPHUB_CMD_START_PROTECT: { > > + is_free_data = repaget_start_protect((RepCmdStartProtect *) pcmd, > > + (RepCmdDataStartProtect *) pdata); > > + } > > + break; > > + case REPHUB_CMD_READ_VOL_REQ: { > > + is_free_data = repaget_read_vol((RepCmdReadVolReq *) pcmd, pdata); > > + } > > + break; > > + default: > > + assert(0); > > + break; > > + > > + } > > + > > + if (is_free_data) { > > + g_free(pdata); > > + } > > +} > > + > > +int repagent_client_send(RepCmd *p_cmd) > > +{ > > + int bytecount = 0; > > + printf("Send cmd %u, data size %u\n", p_cmd->hdr.cmdid, > > + p_cmd->hdr.data_size_bytes); > > + if (!g_client_state.is_connected) { > > + printf("Not connected to hub\n"); > > + return -1; > > + } > > + > > + bytecount = send(g_client_state.hsock, p_cmd, > > + sizeof(RepCmd) + p_cmd->hdr.data_size_bytes, 0); > > + if (bytecount< sizeof(RepCmd) + p_cmd->hdr.data_size_bytes) { > > + printf("Bad send %d, errno %d\n", bytecount, errno); > > + return bytecount; > > + } > > + > > + /* Success */ > > + return 0; > > +} > > diff --git a/replication/repagent_client.h b/replication/repagent_client.h > > new file mode 100644 > > index 0000000..62a5377 > > --- /dev/null > > +++ b/replication/repagent_client.h > > @@ -0,0 +1,36 @@ > > +/* > > + * QEMU System Emulator > > + * > > + * Copyright (c) 2003-2008 Fabrice Bellard > > + * > > + * Permission is 
hereby granted, free of charge, to any person obtaining a > copy > > + * of this software and associated documentation files (the "Software"), > to deal > > + * in the Software without restriction, including without limitation the > rights > > + * to use, copy, modify, merge, publish, distribute, sublicense, and/or > sell > > + * copies of the Software, and to permit persons to whom the Software is > > + * furnished to do so, subject to the following conditions: > > + * > > + * The above copyright notice and this permission notice shall be included > in > > + * all copies or substantial portions of the Software. > > + * > > + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS > OR > > + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, > > + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL > > + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR > OTHER > > + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING > FROM, > > + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS > IN > > + * THE SOFTWARE. 
> > + */ > > +#ifndef REPAGENT_CLIENT_H > > +#define REPAGENT_CLIENT_H > > +#include "repcmd.h" > > + > > +typedef struct rephub_params { > > + char *name; > > + int port; > > +} rephub_params; > > + > > +void *repagent_listen(void *pParam); > > +int repagent_client_send(RepCmd *p_cmd); > > + > > +#endif /* REPAGENT_CLIENT_H */ > > diff --git a/replication/repcmd.h b/replication/repcmd.h > > new file mode 100644 > > index 0000000..8c6cf1b > > --- /dev/null > > +++ b/replication/repcmd.h > > @@ -0,0 +1,59 @@ > > +/* > > + * QEMU System Emulator > > + * > > + * Copyright (c) 2003-2008 Fabrice Bellard > > + * > > + * Permission is hereby granted, free of charge, to any person obtaining a > copy > > + * of this software and associated documentation files (the "Software"), > to deal > > + * in the Software without restriction, including without limitation the > rights > > + * to use, copy, modify, merge, publish, distribute, sublicense, and/or > sell > > + * copies of the Software, and to permit persons to whom the Software is > > + * furnished to do so, subject to the following conditions: > > + * > > + * The above copyright notice and this permission notice shall be included > in > > + * all copies or substantial portions of the Software. > > + * > > + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS > OR > > + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, > > + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL > > + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR > OTHER > > + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING > FROM, > > + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS > IN > > + * THE SOFTWARE. 
> > + */ > > +#ifndef REPCMD_H > > +#define REPCMD_H > > + > > +#include > > + > > +#define REPCMD_MAGIC1 (0x1122) > > +#define REPCMD_MAGIC2 (0x3344) > > +#define REPCMD_NUM_U32_PARAMS (11) > > + > > +enum RepCmds { > > + REPCMD_FIRST_INVALID = 0, > > + REPCMD_FIRST_HUBCMD = 1, > > + REPHUB_CMD_PROTECTED_WRITE = 2, > > + REPHUB_CMD_REPORT_VM_VOLUMES = 3, > > + REPHUB_CMD_START_PROTECT = 4, > > + REPHUB_CMD_READ_VOL_REQ = 5, > > + REPHUB_CMD_READ_VOL_RES = 6, > > + REPHUB_CMD_AGENT_SHUTDOWN = 7, > > +}; > > + > > +typedef struct RepCmdHdr { > > + uint16_t magic1; > > + uint16_t cmdid; > > + uint32_t data_size_bytes; > > +} RepCmdHdr; > > + > > +typedef struct RepCmd { > > + RepCmdHdr hdr; > > + unsigned int parameters[REPCMD_NUM_U32_PARAMS]; > > + unsigned int magic2; > > + uint8_t data[0]; > > +} RepCmd; > > + > > +RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata); > > + > > +#endif /* REPCMD_H */ > > diff --git a/replication/repcmd_listener.c b/replication/repcmd_listener.c > > new file mode 100644 > > index 0000000..a211927 > > --- /dev/null > > +++ b/replication/repcmd_listener.c > > @@ -0,0 +1,137 @@ > > +#include > > +#include > > +#include > > +#include > > +#include > > +#include > > +#include > > +#include > > +#include > > +#include > > +#include > > +#include > > + > > +/* Use the CONFIG_REPLICATION flag to determine whether > > + * we're under qemu build or a hub When under > > + * qemu use g_malloc */ > > +#ifdef CONFIG_REPLICATION > > +#include > > +#define REPCMD_MALLOC g_malloc > > +#else > > +#define REPCMD_MALLOC malloc > > +#endif > > + > > +#include "repcmd.h" > > +#include "repcmd_listener.h" > > + > > +#define ZERO_MEM_OBJ(pObj) memset((void *)pObj, 0, sizeof(*pObj)) > > + > > +typedef struct RepCmdListenerState { > > + int is_terminate_receive; > > +} RepCmdListenerState; > > + > > +static RepCmdListenerState g_listenerState = { 0 }; > > + > > +/* Returns 0 for initiated termination or socket error value on error */ > > 
+int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void > *clientPtr) > > +{ > > + RepCmd curCmd; > > + uint8_t *pReadBuf = (uint8_t *)&curCmd; > > + int bytesToGet = sizeof(RepCmd); > > + int bytesGotten = 0; > > + int isGotHeader = 0; > > + uint8_t *pdata = NULL; > > + > > + assert(callback != NULL); > > + > > + /* receive loop */ > > + while (!g_listenerState.is_terminate_receive) { > > + int bytecount; > > + > > + bytecount = recv(hsock, pReadBuf + bytesGotten, > > + bytesToGet - bytesGotten, 0); > > + if (bytecount == -1) { > > + fprintf(stderr, "Error receiving data %d\n", errno); > > + return errno; > > + } > > + > > + if (bytecount == 0) { > > + printf("Disconnected\n"); > > + return 0; > > + } > > + bytesGotten += bytecount; > > +/* printf("Received bytes %d, got %d/%d\n", > > + bytecount, bytesGotten, bytesToGet); */ > > + /* print content */ > > + if (0) { > > + int i; > > + for (i = 0; i < bytecount ; i += 4) { > > + /*printf("%d/%d", i, bytecount/4); */ > > + printf("%#x ", > > + *(int *) (&pReadBuf[bytesGotten - bytecount + i])); > > + > > + } > > + printf("\n"); > > + } > > + assert(bytesGotten <= bytesToGet); > > + if (bytesGotten == bytesToGet) { > > + int isGotData = 0; > > + bytesGotten = 0; > > + if (!isGotHeader) { > > + /* We just got the header */ > > + isGotHeader = 1; > > + > > + assert(curCmd.hdr.magic1 == REPCMD_MAGIC1); > > + assert(curCmd.magic2 == REPCMD_MAGIC2); > > + if (curCmd.hdr.data_size_bytes > 0) { > > + pdata = (uint8_t *)REPCMD_MALLOC( > > + curCmd.hdr.data_size_bytes); > > +/* printf("malloc %p\n", pdata); */ > > + pReadBuf = pdata; > > + } else { > > + /* no data */ > > + isGotData = 1; > > + pdata = NULL; > > + } > > + bytesToGet = curCmd.hdr.data_size_bytes; > > + } else { > > + isGotData = 1; > > + } > > + > > + if (isGotData) { > > + /* Got command and data */ > > + (*callback)(&curCmd, pdata, clientPtr); > > + > > + /* It's the callee's responsibility to free pdata */ > > + pdata = NULL; > > + 
ZERO_MEM_OBJ(&curCmd); > > + pReadBuf = (uint8_t *)&curCmd; > > + bytesGotten = 0; > > + bytesToGet = sizeof(RepCmd); > > + isGotHeader = 0; > > + } > > + } > > + } > > + return 0; > > +} > > + > > +RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata) > > +{ > > + RepCmd *p_cmd = (RepCmd *)REPCMD_MALLOC(sizeof(RepCmd) + data_size); > > + assert(p_cmd != NULL); > > + > > + /* Zero the CMD (not the data) */ > > + ZERO_MEM_OBJ(p_cmd); > > + > > + p_cmd->hdr.cmdid = cmd_id; > > + p_cmd->hdr.magic1 = REPCMD_MAGIC1; > > + p_cmd->magic2 = REPCMD_MAGIC2; > > + p_cmd->hdr.data_size_bytes = data_size; > > + > > + if (p_out_pdata != NULL) { > > + *p_out_pdata = p_cmd->data; > > + } > > + > > + return p_cmd; > > +} > > + > > diff --git a/replication/repcmd_listener.h b/replication/repcmd_listener.h > > new file mode 100644 > > index 0000000..c09a12e > > --- /dev/null > > +++ b/replication/repcmd_listener.h > > @@ -0,0 +1,32 @@ > > +/* > > + * QEMU System Emulator > > + * > > + * Copyright (c) 2003-2008 Fabrice Bellard > > + * > > + * Permission is hereby granted, free of charge, to any person obtaining a > copy > > + * of this software and associated documentation files (the "Software"), > to deal > > + * in the Software without restriction, including without limitation the > rights > > + * to use, copy, modify, merge, publish, distribute, sublicense, and/or > sell > > + * copies of the Software, and to permit persons to whom the Software is > > + * furnished to do so, subject to the following conditions: > > + * > > + * The above copyright notice and this permission notice shall be included > in > > + * all copies or substantial portions of the Software. > > + * > > + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS > OR > > + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, > > + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL > > + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR > OTHER > > + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING > FROM, > > + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS > IN > > + * THE SOFTWARE. > > + */ > > +#ifndef REPCMD_LISTENER_H > > +#define REPCMD_LISTENER_H > > +#include > > +typedef void (*pfn_received_cmd_cb)(RepCmd *pCmd, > > + uint8_t *pData, void *clientPtr); > > + > > +int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void > *clientPtr); > > + > > +#endif /* REPCMD_LISTENER_H */ > > diff --git a/replication/rephub_cmds.h b/replication/rephub_cmds.h > > new file mode 100644 > > index 0000000..820c37d > > --- /dev/null > > +++ b/replication/rephub_cmds.h > > @@ -0,0 +1,150 @@ > > +/* > > + * QEMU System Emulator > > + * > > + * Copyright (c) 2003-2008 Fabrice Bellard > > + * > > + * Permission is hereby granted, free of charge, to any person obtaining a > copy > > + * of this software and associated documentation files (the "Software"), > to deal > > + * in the Software without restriction, including without limitation the > rights > > + * to use, copy, modify, merge, publish, distribute, sublicense, and/or > sell > > + * copies of the Software, and to permit persons to whom the Software is > > + * furnished to do so, subject to the following conditions: > > + * > > + * The above copyright notice and this permission notice shall be included > in > > + * all copies or substantial portions of the Software. > > + * > > + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS > OR > > + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, > > + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL > > + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR > OTHER > > + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING > FROM, > > + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS > IN > > + * THE SOFTWARE. > > + */ > > +#ifndef REPHUB_CMDS_H > > +#define REPHUB_CMDS_H > > + > > +#include > > +#include "repcmd.h" > > +#include "rephub_defs.h" > > + > > +/********************************************************* > > + * RepCmd Report a protected IO > > + * > > + * REPHUB_CMD_PROTECTED_WRITE > > + * Direction: agent->hub > > + * > > + * Any write to a protected volume is sent with this > > + * message to the hub, with its status. > > + * When the status is bad, no data is sent > > + *********************************************************/ > > +typedef struct RepCmdProtectedWrite { > > + RepCmdHdr hdr; > > + uint64_t volume_id; > > + uint64_t offset_sectors; > > + /* The size field duplicates the RepCmd size, > > + * but it is needed for reporting failed IOs' sizes */ > > + uint32_t size_sectors; > > + int ret_status; > > +} RepCmdProtectedWrite; > > + > > +/********************************************************* > > + * RepCmd Report VM volumes > > + * > > + * REPHUB_CMD_REPORT_VM_VOLUMES > > + * Direction: agent->hub > > + * > > + * The agent reports all the volumes of the VM > > + * to the hub. 
> > + *********************************************************/ > > +typedef struct RepVmVolumeInfo { > > + char name[REPHUB_MAX_VOL_NAME_LEN]; > > + uint64_t volume_id; > > + uint32_t size_mb; > > +} RepVmVolumeInfo; > > + > > +typedef struct RepCmdReportVmVolumes { > > + RepCmdHdr hdr; > > + int num_volumes; > > +} RepCmdReportVmVolumes; > > + > > +typedef struct RepCmdDataReportVmVolumes { > > + RepVmVolumeInfo volumes[0]; > > +} RepCmdDataReportVmVolumes; > > + > > + > > +/********************************************************* > > + * RepCmd Start protect > > + * > > + * REPHUB_CMD_START_PROTECT > > + * Direction: hub->agent > > + * > > + * The hub instructs the agent to start protecting > > + * a volume. When a volume is protected all its writes > > + * are sent to to the hub. > > + * With this command the hub also assigns a volume ID to > > + * the given volume name. > > + *********************************************************/ > > +typedef struct RepCmdStartProtect { > > + RepCmdHdr hdr; > > + uint64_t volume_id; > > +} RepCmdStartProtect; > > + > > +typedef struct RepCmdDataStartProtect { > > + char volume_name[REPHUB_MAX_VOL_NAME_LEN]; > > +} RepCmdDataStartProtect; > > + > > + > > +/********************************************************* > > + * RepCmd Read Volume Request > > + * > > + * REPHUB_CMD_READ_VOL_REQ > > + * Direction: hub->agent > > + * > > + * The hub issues a read IO to a protected volume. > > + * This command is used during sync - when the hub needs > > + * to read unsyncronized sections of a protected volume. 
> > + * This command is a request, the read data is returned > > + * by the response command REPHUB_CMD_READ_VOL_RES > > + *********************************************************/ > > +typedef struct RepCmdReadVolReq { > > + RepCmdHdr hdr; > > + int req_id; > > + int size_sectors; > > + uint64_t volume_id; > > + uint64_t offset_sectors; > > +} RepCmdReadVolReq; > > + > > +/********************************************************* > > + * RepCmd Read Volume Response > > + * > > + * REPHUB_CMD_READ_VOL_RES > > + * Direction: agent->hub > > + * > > + * A response to REPHUB_CMD_READ_VOL_REQ. > > + * Sends the data read from a protected volume > > + *********************************************************/ > > +typedef struct RepCmdReadVolRes { > > + RepCmdHdr hdr; > > + int req_id; > > + int is_status_success; > > + uint64_t volume_id; > > +} RepCmdReadVolRes; > > + > > +/********************************************************* > > + * RepCmd Agent shutdown > > + * > > + * REPHUB_CMD_AGENT_SHUTDOWN > > + * Direction: agent->hub > > + * > > + * Notifies the hub that the agent is about to shutdown. > > + * This allows a graceful shutdown. Any disconnection > > + * of an agent without sending this command will result > > + * in a full sync of the VM volumes. 
> > + *********************************************************/ > > +typedef struct RepCmdAgentShutdown { > > + RepCmdHdr hdr; > > +} RepCmdAgentShutdown; > > + > > + > > +#endif /* REPHUB_CMDS_H */ > > diff --git a/replication/rephub_defs.h b/replication/rephub_defs.h > > new file mode 100644 > > index 0000000..e34e0ce > > --- /dev/null > > +++ b/replication/rephub_defs.h > > @@ -0,0 +1,40 @@ > > +/* > > + * QEMU System Emulator > > + * > > + * Copyright (c) 2003-2008 Fabrice Bellard > > + * > > + * Permission is hereby granted, free of charge, to any person obtaining a > copy > > + * of this software and associated documentation files (the "Software"), > to deal > > + * in the Software without restriction, including without limitation the > rights > > + * to use, copy, modify, merge, publish, distribute, sublicense, and/or > sell > > + * copies of the Software, and to permit persons to whom the Software is > > + * furnished to do so, subject to the following conditions: > > + * > > + * The above copyright notice and this permission notice shall be included > in > > + * all copies or substantial portions of the Software. > > + * > > + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS > OR > > + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, > > + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL > > + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR > OTHER > > + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING > FROM, > > + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS > IN > > + * THE SOFTWARE. 
> > + */ > > +#ifndef REP_HUB_DEFS_H > > +#define REP_HUB_DEFS_H > > + > > +#include > > + > > +#define REPHUB_MAX_VOL_NAME_LEN (1024) > > +#define REPHUB_MAX_NUM_VOLUMES (512) > > + > > +#ifndef TRUE > > + #define TRUE (1) > > +#endif > > + > > +#ifndef FALSE > > + #define FALSE (0) > > +#endif > > + > > +#endif /* REP_HUB_DEFS_H */ > > diff --git a/vl.c b/vl.c > > index 624da0f..506b5dc 100644 > > --- a/vl.c > > +++ b/vl.c > > @@ -167,6 +167,7 @@ int main(int argc, char **argv) > > #include "ui/qemu-spice.h" > > +#include "replication/repagent.h" > > //#define DEBUG_NET > > //#define DEBUG_SLIRP > > @@ -2307,6 +2308,15 @@ int main(int argc, char **argv, char **envp) > > drive_add(IF_DEFAULT, popt->index - QEMU_OPTION_hda, > optarg, > > HD_OPTS); > > break; > > + case QEMU_OPTION_repagent: > > +#ifdef CONFIG_REPLICATION > > + repagent_init(optarg, 0); > > +#else > > + fprintf(stderr, "Replication support is disabled. " > > + "Don't use -repagent option.\n"); > > + exit(1); > > +#endif > > + break; > > case QEMU_OPTION_drive: > > if (drive_def(optarg) == NULL) { > > exit(1); >