From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from eggs.gnu.org ([140.186.70.92]:60156) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1RuiIn-0006W2-1s for qemu-devel@nongnu.org; Tue, 07 Feb 2012 05:29:44 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1RuiIa-00039J-R3 for qemu-devel@nongnu.org; Tue, 07 Feb 2012 05:29:33 -0500 Received: from mail-pw0-f45.google.com ([209.85.160.45]:50012) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1RuiIZ-00038Y-V1 for qemu-devel@nongnu.org; Tue, 07 Feb 2012 05:29:20 -0500 Received: by pbaa11 with SMTP id a11so7751087pba.4 for ; Tue, 07 Feb 2012 02:29:18 -0800 (PST) From: Ori Mamluk MIME-Version: 1.0 Date: Tue, 7 Feb 2012 12:29:14 +0200 Message-ID: <73865e0ce364c40e0eb65ec6b22b819d@mail.gmail.com> Content-Type: multipart/alternative; boundary=047d7b3396015fba3f04b85d40a9 Subject: [Qemu-devel] [RFC PATCH] replication agent module List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: qemu-devel@nongnu.org Cc: Kevin Wolf , dlaor@redhat.com --047d7b3396015fba3f04b85d40a9 Content-Type: text/plain; charset=windows-1252 Content-Transfer-Encoding: quoted-printable Repagent is a new module that allows an external replication system to replicate a volume of a Qemu VM. This RFC patch adds the repagent client module to Qemu. Documentation of the module role and API is in the patch at replication/qemu-repagent.txt The main motivation behind the module is to allow replication of VMs in a virtualization environment like RhevM. To achieve this we need basic replication support in Qemu. This is the first submission of this module, which was written as a Proof Of Concept, and used successfully for replicating and recovering a Qemu VM. Points and open issues: * The module interfaces the Qemu storage stack at block.c generic layer. Is this the right place to intercept/inject IOs? 
* The patch performs IO reads from a new thread (a TCP listener thread).
  See repaget_read_vol in repagent.c. It is not protected by any lock - is this OK?
* VM ID - the replication system implies an environment with several VMs
  connected to a central replication system (Rephub).
  This requires some sort of identification for a VM. The current patch
  does not include a VM ID - I did not find any adequate ID to use.
  Any suggestions?

Appreciate any feedback or suggestions.

Thanks,
Ori.

>>From 5a0d88689ddcf325f25fdfca2a2012f1bbf141b9 Mon Sep 17 00:00:00 2001
From: Ori Mamluk
Date: Tue, 7 Feb 2012 11:12:12 +0200
Subject: [PATCH] Added replication agent module (repagent) to Qemu under
 replication directory, added repagent configure and run options, and the
 repagent API usage in bloc

Added build options to ./configure: --enable-replication --disable-replicat
Added a commandline option to enable: -repagent
Added the module files under replication.

Signed-off-by: Ori Mamluk
---
 Makefile                      |    9 +-
 Makefile.objs                 |    6 +
 block.c                       |   20 +++-
 configure                     |   11 ++
 qemu-options.hx               |    6 +
 replication/qemu-repagent.txt |  104 +++++++++++++
 replication/repagent.c        |  322 +++++++++++++++++++++++++++++++++++++++++
 replication/repagent.h        |   46 ++++++
 replication/repagent_client.c |  138 ++++++++++++++++++
 replication/repagent_client.h |   36 +++++
 replication/repcmd.h          |   59 ++++++++
 replication/repcmd_listener.c |  137 +++++++++++++++++
 replication/repcmd_listener.h |   32 ++++
 replication/rephub_cmds.h     |  150 +++++++++++++++++++
 replication/rephub_defs.h     |   40 +++++
 vl.c                          |   10 ++
 16 files changed, 1121 insertions(+), 5 deletions(-)
 mode change 100644 => 100755 Makefile.objs
 mode change 100644 => 100755 qemu-options.hx
 create mode 100755 replication/qemu-repagent.txt
 create mode 100644 replication/repagent.c
 create mode 100644 replication/repagent.h
 create mode 100644 replication/repagent_client.c
 create mode 100644 replication/repagent_client.h
 create mode 100644
replication/repcmd.h create mode 100644 replication/repcmd_listener.c create mode 100644 replication/repcmd_listener.h create mode 100644 replication/rephub_cmds.h create mode 100644 replication/rephub_defs.h diff --git a/Makefile b/Makefile index 4f6eaa4..a1b3701 100644 --- a/Makefile +++ b/Makefile @@ -149,9 +149,9 @@ qemu-img.o qemu-tool.o qemu-nbd.o qemu-io.o cmd.o qemu-ga.o: $(GENERATED_HEADERS tools-obj-y =3D qemu-tool.o $(oslib-obj-y) $(trace-obj-y) \ qemu-timer-common.o cutils.o -qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y) -qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y) -qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y) +qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y) +qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y) +qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y) qemu-img-cmds.h: $(SRC_PATH)/qemu-img-cmds.hx $(call quiet-command,sh $(SRC_PATH)/scripts/hxtool -h < $< > $@," GEN $@") @@ -228,6 +228,7 @@ clean: rm -f trace-dtrace.dtrace trace-dtrace.dtrace-timestamp rm -f trace-dtrace.h trace-dtrace.h-timestamp rm -rf $(qapi-dir) + rm -f replication/*.{o,d} $(MAKE) -C tests clean for d in $(ALL_SUBDIRS) $(QEMULIBS) libcacard; do \ if test -d $$d; then $(MAKE) -C $$d $@ || exit 1; fi; \ @@ -387,4 +388,4 @@ tar: rm -rf /tmp/$(FILE) # Include automatically generated dependency files --include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d qapi/*.d qga/*.d) +-include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d qapi/*.d qga/*.d replication/*.d) diff --git a/Makefile.objs b/Makefile.objs old mode 100644 new mode 100755 index d7a6539..dbd6f15 --- a/Makefile.objs +++ b/Makefile.objs @@ -74,6 +74,7 @@ fsdev-obj-$(CONFIG_VIRTFS) +=3D $(addprefix fsdev/, $(fsdev-nested-y)) # CPUs and machines. 
common-obj-y =3D $(block-obj-y) blockdev.o +common-obj-y +=3D $(replication-obj-$(CONFIG_REPLICATION)) common-obj-y +=3D $(net-obj-y) common-obj-y +=3D $(qobject-obj-y) common-obj-$(CONFIG_LINUX) +=3D $(fsdev-obj-$(CONFIG_LINUX)) @@ -413,6 +414,11 @@ common-obj-y +=3D qmp-marshal.o qapi-visit.o qapi-types.o $(qapi-obj-y) common-obj-y +=3D qmp.o hmp.o ###################################################################### +# replication +replication-nested-y =3D repagent_client.o repagent.o repcmd_listener.o +replication-obj-y =3D $(addprefix replication/, $(replication-nested-y)) + +###################################################################### # guest agent qga-nested-y =3D guest-agent-commands.o guest-agent-command-state.o diff --git a/block.c b/block.c index 9bb236c..f3b8387 100644 --- a/block.c +++ b/block.c @@ -31,6 +31,10 @@ #include "qemu-coroutine.h" #include "qmp-commands.h" +#ifdef CONFIG_REPLICATION +#include "replication/repagent.h" +#endif + #ifdef CONFIG_BSD #include #include @@ -640,6 +644,9 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags, goto unlink_and_fail; } +#ifdef CONFIG_REPLICATION + repagent_register_drive(filename, bs); +#endif /* Open the image */ ret =3D bdrv_open_common(bs, filename, flags, drv); if (ret < 0) { @@ -1292,6 +1299,17 @@ static int coroutine_fn bdrv_co_do_writev(BlockDriverState *bs, ret =3D drv->bdrv_co_writev(bs, sector_num, nb_sectors, qiov); + +#ifdef CONFIG_REPLICATION + if (bs->device_name[0] !=3D '\0') { + /* We split the IO only at the highest stack driver layer. + Currently we know that by checking device_name - only + highest level (closest to the guest) has that name. + */ + repagent_handle_protected_write(bs, sector_num, + nb_sectors, qiov, ret); + } +#endif if (bs->dirty_bitmap) { set_dirty_bitmap(bs, sector_num, nb_sectors, 1); } @@ -1783,7 +1801,7 @@ int bdrv_has_zero_init(BlockDriverState *bs) * 'nb_sectors' is the max value 'pnum' should be set to. 
*/ int bdrv_is_allocated(BlockDriverState *bs, int64_t sector_num, int nb_sectors, - int *pnum) + int *pnum) { int64_t n; if (!bs->drv->bdrv_is_allocated) { diff --git a/configure b/configure index 9e5da44..93d600e 100755 --- a/configure +++ b/configure @@ -179,6 +179,7 @@ spice=3D"" rbd=3D"" smartcard=3D"" smartcard_nss=3D"" +replication=3D"" usb_redir=3D"" opengl=3D"" zlib=3D"yes" @@ -772,6 +773,10 @@ for opt do ;; --enable-smartcard-nss) smartcard_nss=3D"yes" ;; + --disable-replication) replication=3D"no" + ;; + --enable-replication) replication=3D"yes" + ;; --disable-usb-redir) usb_redir=3D"no" ;; --enable-usb-redir) usb_redir=3D"yes" @@ -1067,6 +1072,7 @@ echo " --disable-usb-redir disable usb network redirection support" echo " --enable-usb-redir enable usb network redirection support" echo " --disable-guest-agent disable building of the QEMU Guest Agent" echo " --enable-guest-agent enable building of the QEMU Guest Agent" +echo " --enable-replication enable replication support" echo "" echo "NOTE: The object files are built at the place where configure is launched" exit 1 @@ -2733,6 +2739,7 @@ echo "curl support $curl" echo "check support $check_utests" echo "mingw32 support $mingw32" echo "Audio drivers $audio_drv_list" +echo "Replication $replication" echo "Extra audio cards $audio_card_list" echo "Block whitelist $block_drv_whitelist" echo "Mixer emulation $mixemu" @@ -3080,6 +3087,10 @@ if test "$smartcard_nss" =3D "yes" ; then echo "CONFIG_SMARTCARD_NSS=3Dy" >> $config_host_mak fi +if test "$replication" =3D "yes" ; then + echo "CONFIG_REPLICATION=3Dy" >> $config_host_mak +fi + if test "$usb_redir" =3D "yes" ; then echo "CONFIG_USB_REDIR=3Dy" >> $config_host_mak fi diff --git a/qemu-options.hx b/qemu-options.hx old mode 100644 new mode 100755 index 681eaf1..c97e4f8 --- a/qemu-options.hx +++ b/qemu-options.hx @@ -2602,3 +2602,9 @@ HXCOMM This is the last statement. Insert new options before this line! 
 STEXI
 @end table
 ETEXI
+
+DEF("repagent", HAS_ARG, QEMU_OPTION_repagent,
+    "-repagent [hub IP/name]\n"
+    " Enable replication support for disks\n"
+    " hub is the ip or name of the machine running the replication hub.\n",
+    QEMU_ARCH_ALL)
diff --git a/replication/qemu-repagent.txt b/replication/qemu-repagent.txt
new file mode 100755
index 0000000..e3b0c1e
--- /dev/null
+++ b/replication/qemu-repagent.txt
@@ -0,0 +1,104 @@
+    repagent - replication agent - a Qemu module for enabling continuous async replication of VM volumes
+
+Introduction
+    This document describes a feature in Qemu - a replication agent (AKA Repagent).
+    The Repagent is a new module that exposes an API to an external replication system (AKA Rephub).
+    This API allows a Rephub to communicate with a Qemu VM and continuously replicate its volumes.
+    The implementation of a Rephub is outside the scope of this document. Several different Rephub
+    implementations may use the same repagent in Qemu.
+
+Main features of Repagent
+    Repagent does the following:
+    * Report volumes - report a list of all volumes in a VM to the Rephub.
+    * Report writes to a volume - send all writes made to a protected volume to the Rephub.
+        The reporting of an IO is asynchronous - i.e. the Repagent does not delay the IO to wait for an acknowledgement from the Rephub.
+        It is only copied to the Rephub.
+    * Read a protected volume - allows the Rephub to read a protected volume, so that the hub can synchronize the content of a protected volume.
+
+Description of the Repagent module
+
+Build and run options
+    New configure option: --enable-replication
+    New command line option:
+    -repagent [hub IP/name]
+        Enable replication support for disks
+        hub is the ip or name of the machine running the replication hub.
+
+Module APIs
+    The Repagent module interfaces with two main components:
+    1. The Rephub - An external API based on socket messages
+    2. The generic block layer - block.c
+
+    Rephub message API
+        The external replication API is a message based API.
+        We won't go into the structure of the messages here - just the semantics.
+
+        Messages list
+            (The updated list and comments are in rephub_cmds.h)
+
+            Messages from the Repagent to the Rephub:
+            * Protected write
+                The Repagent sends each write to a protected volume to the hub with the IO status.
+                If the status is bad, the write content is not sent.
+            * Report VM volumes
+                The agent reports all the volumes of the VM to the hub.
+            * Read Volume Response
+                A response to a Read Volume Request.
+                Sends the data read from a protected volume to the hub.
+            * Agent shutdown
+                Notifies the hub that the agent is about to shut down.
+                This allows a graceful shutdown. Any disconnection of an agent without
+                sending this command will result in a full sync of the VM volumes.
+
+            Messages from the Rephub to the Repagent:
+            * Start protect
+                The hub instructs the agent to start protecting a volume. When a volume is protected
+                all its writes are sent to the hub.
+                With this command the hub also assigns a volume ID to the given volume name.
+            * Read volume request
+                The hub issues a read IO to a protected volume.
+                This command is used during sync - when the hub needs to read unsynchronized
+                sections of a protected volume.
+                This command is a request; the read data is returned by the Read Volume Response message (see above).
+    block.c API
+        The API to the generic block storage layer contains 3 functionalities:
+        1. Handle writes to protected volumes
+            In bdrv_co_do_writev, each write is reported to the Repagent module.
+        2. Handle each new volume that registers
+            In bdrv_open - each new bottom-level block driver that registers is reported.
+        3. Read from a volume
+            Repagent calls bdrv_aio_readv to handle read requests coming from the hub.
+
+
+General description of a Rephub - a replication system the repagent connects to
+    This section describes, at a high level, a sample Rephub - a replication system that uses the repagent API
+    to replicate disks.
+    It describes a simple Rephub that continuously maintains a mirror of the volumes of a VM.
+
+    Say we have a VM we want to protect - call it PVM - with 2 volumes, V1 and V2.
+    Our Rephub is called SingleRephub - a Rephub protecting a single VM.
+
+    Preparations
+    1. The user chooses a host to run SingleRephub - a different host than PVM, call it Host2
+    2. The user creates two volumes on Host2 - the same sizes as V1 and V2, call them V1R (V1 recovery) and V2R.
+    3. The user runs the SingleRephub process on Host2, and gives V1R and V2R as command line arguments.
+        From now on SingleRephub waits for the protected VM repagent to connect.
+    4. The user runs the protected VM PVM - and uses the switch -repagent .
+
+    Runtime
+    1. The repagent module connects to SingleRephub on startup.
+    2. repagent reports V1 and V2 to SingleRephub.
+    3. SingleRephub starts to perform an initial synchronization of the protected volumes -
+        it reads each protected volume (V1 and V2) - using read volume requests - and copies the data into the
+        recovery volumes V1R and V2R.
+    4. SingleRephub enters 'protection' mode - each write to the protected volume is sent by the repagent to the Rephub,
+        and the Rephub performs the write on the matching recovery volume.
+
+    * Note that during stage 3 writes to the protected volumes are not ignored - they're kept in a bitmap,
+        and will be read again when stage 3 ends, in an iterative converging process.
+
+    This flow continuously maintains an updated recovery volume.
+    If the protected system is damaged, the user can create a new VM on Host2 with the replicated volumes attached to it.
+    The new VM is a replica of the protected system.
+ + diff --git a/replication/repagent.c b/replication/repagent.c new file mode 100644 index 0000000..c66eae7 --- /dev/null +++ b/replication/repagent.c @@ -0,0 +1,322 @@ +#include +#include +#include +#include +#include + +#include "block.h" +#include "rephub_defs.h" +#include "block_int.h" +#include "repagent_client.h" +#include "repagent.h" +#include "rephub_cmds.h" + +#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj)) +#define REPAGENT_MAX_NUM_VOLUMES (64) +#define REPAGENT_VOLUME_ID_NONE (0) + +typedef struct RepagentVolume { + uint64_t vol_id; + const char *vol_path; + BlockDriverState *driver_ptr; +} RepagentVolume; + +struct RepAgentState { + int is_init; + int num_volumes; + RepagentVolume * volumes[REPAGENT_MAX_NUM_VOLUMES]; +}; + +typedef struct RepagentReadVolIo { + QEMUIOVector qiov; + RepCmdReadVolReq rep_cmd; + uint8_t *buf; + struct timeval start_time; +} RepagentReadVolIo; + +static int repagent_get_volume_by_name(const char *name); +static void repagent_report_volumes_to_hub(void); +static void repagent_vol_read_done(void *opaque, int ret); +static struct timeval tsub(struct timeval t1, struct timeval t2); + +RepAgentState g_rep_agent =3D { 0 }; + +void repagent_init(const char *hubname, int port) +{ + /* It is the responsibility of the thread to free this struct */ + rephub_params *pParams =3D (rephub_params *)g_malloc(sizeof(rephub_params)); + if (hubname =3D=3D NULL) { + hubname =3D "127.0.0.1"; + } + if (port =3D=3D 0) { + port =3D 9010; + } + + printf("repagent_init %s\n", hubname); + + pParams->port =3D port; + pParams->name =3D g_strdup(hubname); + + pthread_t thread_id =3D 0; + + /* Create the repagent client listener thread */ + pthread_create(&thread_id, 0, repagent_listen, (void *) pParams); + pthread_detach(thread_id); +} + +void repagent_register_drive(const char *drive_path, + BlockDriverState *driver_ptr) +{ + int i; + for (i =3D 0; i < g_rep_agent.num_volumes ; i++) { + RepagentVolume *vol =3D g_rep_agent.volumes[i]; + if 
(vol != NULL) {
+            assert(
+                    strcmp(drive_path, vol->vol_path) != 0
+                            && driver_ptr != vol->driver_ptr);
+        }
+    }
+
+    assert(g_rep_agent.num_volumes < REPAGENT_MAX_NUM_VOLUMES);
+
+    printf("zerto repagent: Registering drive. Num drives %d, path %s\n",
+            g_rep_agent.num_volumes, drive_path);
+    g_rep_agent.volumes[i] =
+            (RepagentVolume *)g_malloc0(sizeof(RepagentVolume));
+    g_rep_agent.volumes[i]->driver_ptr = driver_ptr;
+    /* Keep our own copy - the caller owns drive_path */
+    g_rep_agent.volumes[i]->vol_path = g_strdup(drive_path);
+
+    /* Orim todo thread-safety? */
+    g_rep_agent.num_volumes++;
+
+    repagent_report_volumes_to_hub();
+}
+
+/* orim todo destruction? */
+
+static RepagentVolume *repagent_get_protected_volume_by_driver(
+        BlockDriverState *bs)
+{
+    /* orim todo optimize search */
+    int i = 0;
+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
+        RepagentVolume *p_vol = g_rep_agent.volumes[i];
+        if (p_vol != NULL && p_vol->driver_ptr == (void *) bs) {
+            return p_vol;
+        }
+    }
+    return NULL;
+}
+
+void repagent_handle_protected_write(BlockDriverState *bs, int64_t sector_num,
+        int nb_sectors, QEMUIOVector *qiov, int ret_status)
+{
+    printf("zerto Protected write offset %lld, size %d, IO return status %d",
+            (long long int) sector_num, nb_sectors, ret_status);
+    if (bs->filename != NULL) {
+        printf(", filename %s", bs->filename);
+    }
+
+    printf("\n");
+
+    RepagentVolume *p_vol = repagent_get_protected_volume_by_driver(bs);
+    if (p_vol == NULL || p_vol->vol_id == REPAGENT_VOLUME_ID_NONE) {
+        /* Unprotected */
+        printf("Got a write to an unprotected volume.\n");
+        return;
+    }
+
+    /* Report IO to rephub */
+
+    int data_size = qiov->size;
+    if (ret_status < 0) {
+        /* On failed ios we don't send the data to the hub */
+        data_size = 0;
+    }
+    uint8_t *pdata = NULL;
+    RepCmdProtectedWrite *p_cmd = (RepCmdProtectedWrite *) repcmd_new(
+            REPHUB_CMD_PROTECTED_WRITE, data_size, (uint8_t **) &pdata);
+
+    if (ret_status >= 0) {
+        qemu_iovec_to_buffer(qiov, pdata);
+
} + + p_cmd->volume_id =3D p_vol->vol_id; + p_cmd->offset_sectors =3D sector_num; + p_cmd->size_sectors =3D nb_sectors; + p_cmd->ret_status =3D ret_status; + + if (repagent_client_send((RepCmd *) p_cmd) !=3D 0) { + printf("Error sending command\n"); + } +} + +static void repagent_report_volumes_to_hub(void) +{ + /* Report IO to rephub */ + int i; + RepCmdDataReportVmVolumes *p_cmd_data =3D NULL; + RepCmdReportVmVolumes *p_cmd =3D (RepCmdReportVmVolumes *) repcmd_new( + REPHUB_CMD_REPORT_VM_VOLUMES, + g_rep_agent.num_volumes * sizeof(RepVmVolumeInfo), + (uint8_t **) &p_cmd_data); + p_cmd->num_volumes =3D g_rep_agent.num_volumes; + printf("reporting %u volumes\n", g_rep_agent.num_volumes); + for (i =3D 0; i < g_rep_agent.num_volumes ; i++) { + assert(g_rep_agent.volumes[i] !=3D NULL); + printf("reporting volume %s size %u\n", + g_rep_agent.volumes[i]->vol_path, + (uint32_t) sizeof(p_cmd_data->volumes[i].name)); + strncpy((char *) p_cmd_data->volumes[i].name, + g_rep_agent.volumes[i]->vol_path, + sizeof(p_cmd_data->volumes[i].name)); + p_cmd_data->volumes[i].volume_id =3D g_rep_agent.volumes[i]->vol_i= d; + } + if (repagent_client_send((RepCmd *) p_cmd) !=3D 0) { + printf("Error sending command\n"); + } +} + +int repaget_start_protect(RepCmdStartProtect *pcmd, + RepCmdDataStartProtect *pcmd_data) +{ + printf("Start protect vol %s, ID %llu\n", pcmd_data->volume_name, + (unsigned long long) pcmd->volume_id); + int vol_index =3D repagent_get_volume_by_name(pcmd_data->volume_name); + if (vol_index < 0) { + printf("The volume doesn't exist\n"); + return TRUE; + } + /* orim todo protect */ + g_rep_agent.volumes[vol_index]->vol_id =3D pcmd->volume_id; + + return TRUE; +} + +static int repagent_get_volume_by_name(const char *name) +{ + int i =3D 0; + for (i =3D 0; i < g_rep_agent.num_volumes ; i++) { + if (g_rep_agent.volumes[i] !=3D NULL + && strcmp(name, g_rep_agent.volumes[i]->vol_path) =3D=3D 0= ) { + return i; + } + } + return -1; +} + +static int 
repagent_get_volume_by_id(uint64_t vol_id)
+{
+    int i = 0;
+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
+        if (g_rep_agent.volumes[i] != NULL
+                && g_rep_agent.volumes[i]->vol_id == vol_id) {
+            return i;
+        }
+    }
+    return -1;
+}
+
+int repaget_read_vol(RepCmdReadVolReq *pcmd, uint8_t *pdata)
+{
+    int index = repagent_get_volume_by_id(pcmd->volume_id);
+    int size_bytes = pcmd->size_sectors * 512;
+    if (index < 0) {
+        printf("Vol read - Could not find vol id %llu\n",
+                (unsigned long long int) pcmd->volume_id);
+        RepCmdReadVolRes *p_res_cmd = (RepCmdReadVolRes *) repcmd_new(
+                REPHUB_CMD_READ_VOL_RES, 0, NULL);
+        p_res_cmd->req_id = pcmd->req_id;
+        p_res_cmd->volume_id = pcmd->volume_id;
+        p_res_cmd->is_status_success = FALSE;
+        repagent_client_send((RepCmd *) p_res_cmd);
+        return TRUE;
+    }
+
+    printf("Vol read - driver %p, volId %llu, offset %llu, size %u\n",
+            g_rep_agent.volumes[index]->driver_ptr,
+            (unsigned long long int) pcmd->volume_id,
+            (unsigned long long int) pcmd->offset_sectors, pcmd->size_sectors);
+
+    {
+        RepagentReadVolIo *read_xact = g_malloc0(sizeof(RepagentReadVolIo));
+
+/*        BlockDriverAIOCB *acb; */
+
+        ZERO_MEM_OBJ(read_xact);
+
+        qemu_iovec_init(&read_xact->qiov, 1);
+
+        /*read_xact->buf =
+            qemu_blockalign(g_rep_agent.volumes[index]->driver_ptr, size_bytes); */
+        read_xact->buf = (uint8_t *) g_malloc(size_bytes);
+        read_xact->rep_cmd = *pcmd;
+        qemu_iovec_add(&read_xact->qiov, read_xact->buf, size_bytes);
+
+        gettimeofday(&read_xact->start_time, NULL);
+        /* orim TODO - use the returned acb to cancel the request on shutdown */
+        /*acb = */bdrv_aio_readv(g_rep_agent.volumes[index]->driver_ptr,
+                read_xact->rep_cmd.offset_sectors, &read_xact->qiov,
+                read_xact->rep_cmd.size_sectors, repagent_vol_read_done,
+                read_xact);
+    }
+
+    return TRUE;
+}
+
+static void repagent_vol_read_done(void *opaque, int ret)
+{
+    struct timeval t2;
+    RepagentReadVolIo *read_xact = (RepagentReadVolIo *) opaque;
+    uint8_t *pdata =
NULL; + RepCmdReadVolRes *pcmd =3D (RepCmdReadVolRes *) repcmd_new( + REPHUB_CMD_READ_VOL_RES, read_xact->rep_cmd.size_sectors * 512= , + &pdata); + pcmd->req_id =3D read_xact->rep_cmd.req_id; + pcmd->volume_id =3D read_xact->rep_cmd.volume_id; + pcmd->is_status_success =3D FALSE; + + printf("Protected vol read - volId %llu, offset %llu, size %u\n", + (unsigned long long int) read_xact->rep_cmd.volume_id, + (unsigned long long int) read_xact->rep_cmd.offset_sectors, + read_xact->rep_cmd.size_sectors); + gettimeofday(&t2, NULL); + + if (ret >=3D 0) { + /* Read response - send the data to the hub */ + t2 =3D tsub(t2, read_xact->start_time); + printf("Read prot vol done. Took %u seconds, %u us.", + (uint32_t) t2.tv_sec, (uint32_t) t2.tv_usec); + + pcmd->is_status_success =3D TRUE; + /* orim todo optimize - don't copy, use the qiov buffer */ + qemu_iovec_to_buffer(&read_xact->qiov, pdata); + } else { + printf("readv failed: %s\n", strerror(-ret)); + } + + repagent_client_send((RepCmd *) pcmd); + + /*qemu_vfree(read_xact->buf); */ + g_free(read_xact->buf); + + g_free(read_xact); +} + +static struct timeval tsub(struct timeval t1, struct timeval t2) +{ + t1.tv_usec -=3D t2.tv_usec; + if (t1.tv_usec < 0) { + t1.tv_usec +=3D 1000000; + t1.tv_sec--; + } + t1.tv_sec -=3D t2.tv_sec; + return t1; +} + +void repagent_client_connected(void) +{ + /* orim todo thread protection */ + repagent_report_volumes_to_hub(); +} diff --git a/replication/repagent.h b/replication/repagent.h new file mode 100644 index 0000000..98ccbf2 --- /dev/null +++ b/replication/repagent.h @@ -0,0 +1,46 @@ +/* + * QEMU System Emulator + * + * Copyright (c) 2003-2008 Fabrice Bellard + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * 
copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY= , + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#ifndef REPAGENT_H +#define REPAGENT_H +#include + +#include "qemu-common.h" + +typedef struct RepAgentState RepAgentState; +typedef struct RepCmdStartProtect RepCmdStartProtect; +typedef struct RepCmdDataStartProtect RepCmdDataStartProtect; +struct RepCmdReadVolReq; + +void repagent_init(const char *hubname, int port); +void repagent_handle_protected_write(BlockDriverState *bs, + int64_t sector_num, int nb_sectors, QEMUIOVector *qiov, int ret_status); +void repagent_register_drive(const char *drive_path, + BlockDriverState *driver_ptr); +int repaget_start_protect(RepCmdStartProtect *pcmd, + RepCmdDataStartProtect *pcmd_data); +int repaget_read_vol(struct RepCmdReadVolReq *pcmd, uint8_t *pdata); +void repagent_client_connected(void); + + +#endif /* REPAGENT_H */ diff --git a/replication/repagent_client.c b/replication/repagent_client.c new file mode 100644 index 0000000..4dd9ea4 --- /dev/null +++ b/replication/repagent_client.c @@ -0,0 +1,138 @@ +#include "repcmd.h" +#include "rephub_cmds.h" +#include "repcmd_listener.h" +#include "repagent_client.h" +#include "repagent.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, 
sizeof(*pObj))
+
+static void repagent_process_cmd(RepCmd *pCmd, uint8_t *pData, void *clientPtr);
+
+typedef struct repagent_client_state {
+    int is_connected;
+    int is_terminate_receive;
+    int hsock;
+} repagent_client_state;
+
+static repagent_client_state g_client_state = { 0 };
+
+void *repagent_listen(void *pParam)
+{
+    rephub_params *pServerParams = (rephub_params *) pParam;
+    int host_port = pServerParams->port;
+    const char *host_name = pServerParams->name;
+
+    printf("Creating repagent listener thread...\n");
+    g_free(pServerParams);
+
+    struct sockaddr_in my_addr;
+
+    int err;
+    int retries = 0;
+
+    g_client_state.hsock = socket(AF_INET, SOCK_STREAM, 0);
+    if (g_client_state.hsock == -1) {
+        printf("Error initializing socket %d\n", errno);
+        return (void *) -1;
+    }
+
+    int param = 1;
+
+    if ((setsockopt(g_client_state.hsock, SOL_SOCKET, SO_REUSEADDR,
+            (char *) &param, sizeof(int)) == -1)
+            || (setsockopt(g_client_state.hsock, SOL_SOCKET, SO_KEEPALIVE,
+                    (char *) &param, sizeof(int)) == -1)) {
+        printf("Error setting options %d\n", errno);
+        return (void *) -1;
+    }
+
+    my_addr.sin_family = AF_INET;
+    my_addr.sin_port = htons(host_port);
+    memset(&(my_addr.sin_zero), 0, 8);
+
+    my_addr.sin_addr.s_addr = inet_addr(host_name);
+
+    /* Reconnect loop */
+    while (!g_client_state.is_terminate_receive) {
+
+        if (connect(g_client_state.hsock, (struct sockaddr *) &my_addr,
+                sizeof(my_addr)) == -1) {
+            err = errno;
+            if (err != EINPROGRESS) {
+                retries++;
+                fprintf(
+                        stderr,
+                        "Error connecting socket %d. Host %s, port %u. 
Retry count %d\n", + errno, host_name, host_port, retries); + usleep(5 * 1000 * 1000); + continue; + } + } + retries =3D 0; + + g_client_state.is_connected =3D 1; + + repagent_client_connected(); + repcmd_listener(g_client_state.hsock, repagent_process_cmd, NULL); + close(g_client_state.hsock); + + g_client_state.is_connected =3D 0; + } + return 0; +} + +void repagent_process_cmd(RepCmd *pcmd, uint8_t *pdata, void *clientPtr) +{ + int is_free_data =3D 1; + printf("Repagent got cmd %d\n", pcmd->hdr.cmdid); + switch (pcmd->hdr.cmdid) { + case REPHUB_CMD_START_PROTECT: { + is_free_data =3D repaget_start_protect((RepCmdStartProtect *) pcmd= , + (RepCmdDataStartProtect *) pdata); + } + break; + case REPHUB_CMD_READ_VOL_REQ: { + is_free_data =3D repaget_read_vol((RepCmdReadVolReq *) pcmd, pdata= ); + } + break; + default: + assert(0); + break; + + } + + if (is_free_data) { + g_free(pdata); + } +} + +int repagent_client_send(RepCmd *p_cmd) +{ + int bytecount =3D 0; + printf("Send cmd %u, data size %u\n", p_cmd->hdr.cmdid, + p_cmd->hdr.data_size_bytes); + if (!g_client_state.is_connected) { + printf("Not connected to hub\n"); + return -1; + } + + bytecount =3D send(g_client_state.hsock, p_cmd, + sizeof(RepCmd) + p_cmd->hdr.data_size_bytes, 0); + if (bytecount < sizeof(RepCmd) + p_cmd->hdr.data_size_bytes) { + printf("Bad send %d, errno %d\n", bytecount, errno); + return bytecount; + } + + /* Success */ + return 0; +} diff --git a/replication/repagent_client.h b/replication/repagent_client.h new file mode 100644 index 0000000..62a5377 --- /dev/null +++ b/replication/repagent_client.h @@ -0,0 +1,36 @@ +/* + * QEMU System Emulator + * + * Copyright (c) 2003-2008 Fabrice Bellard + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, 
sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY= , + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#ifndef REPAGENT_CLIENT_H +#define REPAGENT_CLIENT_H +#include "repcmd.h" + +typedef struct rephub_params { + char *name; + int port; +} rephub_params; + +void *repagent_listen(void *pParam); +int repagent_client_send(RepCmd *p_cmd); + +#endif /* REPAGENT_CLIENT_H */ diff --git a/replication/repcmd.h b/replication/repcmd.h new file mode 100644 index 0000000..8c6cf1b --- /dev/null +++ b/replication/repcmd.h @@ -0,0 +1,59 @@ +/* + * QEMU System Emulator + * + * Copyright (c) 2003-2008 Fabrice Bellard + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY= , + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#ifndef REPCMD_H +#define REPCMD_H + +#include + +#define REPCMD_MAGIC1 (0x1122) +#define REPCMD_MAGIC2 (0x3344) +#define REPCMD_NUM_U32_PARAMS (11) + +enum RepCmds { + REPCMD_FIRST_INVALID =3D 0, + REPCMD_FIRST_HUBCMD =3D 1, + REPHUB_CMD_PROTECTED_WRITE =3D 2, + REPHUB_CMD_REPORT_VM_VOLUMES =3D 3, + REPHUB_CMD_START_PROTECT =3D 4, + REPHUB_CMD_READ_VOL_REQ =3D 5, + REPHUB_CMD_READ_VOL_RES =3D 6, + REPHUB_CMD_AGENT_SHUTDOWN =3D 7, +}; + +typedef struct RepCmdHdr { + uint16_t magic1; + uint16_t cmdid; + uint32_t data_size_bytes; +} RepCmdHdr; + +typedef struct RepCmd { + RepCmdHdr hdr; + unsigned int parameters[REPCMD_NUM_U32_PARAMS]; + unsigned int magic2; + uint8_t data[0]; +} RepCmd; + +RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata); + +#endif /* REPCMD_H */ diff --git a/replication/repcmd_listener.c b/replication/repcmd_listener.c new file mode 100644 index 0000000..a211927 --- /dev/null +++ b/replication/repcmd_listener.c @@ -0,0 +1,137 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* Use the CONFIG_REPLICATION flag to determine whether + * we're under qemu build or a hub When under + * qemu use g_malloc */ +#ifdef CONFIG_REPLICATION +#include +#define REPCMD_MALLOC g_malloc +#else +#define REPCMD_MALLOC malloc +#endif + +#include "repcmd.h" +#include "repcmd_listener.h" + +#define ZERO_MEM_OBJ(pObj) memset((void *)pObj, 0, sizeof(*pObj)) + +typedef struct 
RepCmdListenerState { + int is_terminate_receive; +} RepCmdListenerState; + +static RepCmdListenerState g_listenerState =3D { 0 }; + +/* Returns 0 for initiated termination or socket error value on error */ +int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr) +{ + RepCmd curCmd; + uint8_t *pReadBuf =3D (uint8_t *) &curCmd; + int bytesToGet =3D sizeof(RepCmd); + int bytesGotten =3D 0; + int isGotHeader =3D 0; + uint8_t *pdata =3D NULL; + + assert(callback !=3D NULL); + + /* receive loop */ + while (!g_listenerState.is_terminate_receive) { + int bytecount; + + bytecount =3D recv(hsock, pReadBuf + bytesGotten, + bytesToGet - bytesGotten, 0); + if (bytecount =3D=3D -1) { + fprintf(stderr, "Error receiving data %d\n", errno); + return errno; + } + + if (bytecount =3D=3D 0) { + printf("Disconnected\n"); + return 0; + } + bytesGotten +=3D bytecount; +/* printf("Recieved bytes %d, got %d/%d\n", + bytecount, bytesGotten, bytesToGet); */ + /* print content */ + if (0) { + int i; + for (i =3D 0; i < bytecount ; i +=3D 4) { + /*printf("%d/%d", i, bytecount/4); */ + printf("%#x ", + *(int *) (&pReadBuf[bytesGotten - bytecount + i]))= ; + + } + printf("\n"); + } + assert(bytesGotten <=3D bytesToGet); + if (bytesGotten =3D=3D bytesToGet) { + int isGotData =3D 0; + bytesGotten =3D 0; + if (!isGotHeader) { + /* We just got the header */ + isGotHeader =3D 1; + + assert(curCmd.hdr.magic1 =3D=3D REPCMD_MAGIC1); + assert(curCmd.magic2 =3D=3D REPCMD_MAGIC2); + if (curCmd.hdr.data_size_bytes > 0) { + pdata =3D (uint8_t *)REPCMD_MALLOC( + curCmd.hdr.data_size_bytes); +/* printf("malloc %p\n", pdata); */ + pReadBuf =3D pdata; + } else { + /* no data */ + isGotData =3D 1; + pdata =3D NULL; + } + bytesToGet =3D curCmd.hdr.data_size_bytes; + } else { + isGotData =3D 1; + } + + if (isGotData) { + /* Got command and data */ + (*callback)(&curCmd, pdata, clientPtr); + + /* It's the callee responsibility to free pData */ + pdata =3D NULL; + ZERO_MEM_OBJ(&curCmd); + 
pReadBuf =3D (uint8_t *) &curCmd; + bytesGotten =3D 0; + bytesToGet =3D sizeof(RepCmd); + isGotHeader =3D 0; + } + } + } + return 0; +} + +RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata) +{ + RepCmd *p_cmd =3D (RepCmd *)REPCMD_MALLOC(sizeof(RepCmd) + data_size); + assert(p_cmd !=3D NULL); + + /* Zero the CMD (not the data) */ + ZERO_MEM_OBJ(p_cmd); + + p_cmd->hdr.cmdid =3D cmd_id; + p_cmd->hdr.magic1 =3D REPCMD_MAGIC1; + p_cmd->magic2 =3D REPCMD_MAGIC2; + p_cmd->hdr.data_size_bytes =3D data_size; + + if (p_out_pdata !=3D NULL) { + *p_out_pdata =3D p_cmd->data; + } + + return p_cmd; +} + diff --git a/replication/repcmd_listener.h b/replication/repcmd_listener.h new file mode 100644 index 0000000..c09a12e --- /dev/null +++ b/replication/repcmd_listener.h @@ -0,0 +1,32 @@ +/* + * QEMU System Emulator + * + * Copyright (c) 2003-2008 Fabrice Bellard + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY= , + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. 
+ */ +#ifndef REPCMD_LISTENER_H +#define REPCMD_LISTENER_H +#include +typedef void (*pfn_received_cmd_cb)(RepCmd *pCmd, + uint8_t *pData, void *clientPtr); + +int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr); + +#endif /* REPCMD_LISTENER_H */ diff --git a/replication/rephub_cmds.h b/replication/rephub_cmds.h new file mode 100644 index 0000000..820c37d --- /dev/null +++ b/replication/rephub_cmds.h @@ -0,0 +1,150 @@ +/* + * QEMU System Emulator + * + * Copyright (c) 2003-2008 Fabrice Bellard + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY= , + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#ifndef REPHUB_CMDS_H +#define REPHUB_CMDS_H + +#include +#include "repcmd.h" +#include "rephub_defs.h" + +/********************************************************* + * RepCmd Report a protected IO + * + * REPHUB_CMD_PROTECTED_WRITE + * Direction: agent->hub + * + * Any write of a protected volume is send with this + * message to the hub, with its status. 
+ * When case the status is bad no data is sent + *********************************************************/ +typedef struct RepCmdProtectedWrite { + RepCmdHdr hdr; + uint64_t volume_id; + uint64_t offset_sectors; + /* The size field duplicates the RepCmd size, + * but it is needed for reporting failed IOs' sizes */ + uint32_t size_sectors; + int ret_status; +} RepCmdProtectedWrite; + +/********************************************************* + * RepCmd Report VM volumes + * + * REPHUB_CMD_REPORT_VM_VOLUMES + * Direction: agent->hub + * + * The agent reports all the volumes of the VM + * to the hub. + *********************************************************/ +typedef struct RepVmVolumeInfo { + char name[REPHUB_MAX_VOL_NAME_LEN]; + uint64_t volume_id; + uint32_t size_mb; +} RepVmVolumeInfo; + +typedef struct RepCmdReportVmVolumes { + RepCmdHdr hdr; + int num_volumes; +} RepCmdReportVmVolumes; + +typedef struct RepCmdDataReportVmVolumes { + RepVmVolumeInfo volumes[0]; +} RepCmdDataReportVmVolumes; + + +/********************************************************* + * RepCmd Start protect + * + * REPHUB_CMD_START_PROTECT + * Direction: hub->agent + * + * The hub instructs the agent to start protecting + * a volume. When a volume is protected all its writes + * are sent to to the hub. + * With this command the hub also assigns a volume ID to + * the given volume name. + *********************************************************/ +typedef struct RepCmdStartProtect { + RepCmdHdr hdr; + uint64_t volume_id; +} RepCmdStartProtect; + +typedef struct RepCmdDataStartProtect { + char volume_name[REPHUB_MAX_VOL_NAME_LEN]; +} RepCmdDataStartProtect; + + +/********************************************************* + * RepCmd Read Volume Request + * + * REPHUB_CMD_READ_VOL_REQ + * Direction: hub->agent + * + * The hub issues a read IO to a protected volume. + * This command is used during sync - when the hub needs + * to read unsyncronized sections of a protected volume. 
+ * This command is a request, the read data is returned + * by the response command REPHUB_CMD_READ_VOL_RES + *********************************************************/ +typedef struct RepCmdReadVolReq { + RepCmdHdr hdr; + int req_id; + int size_sectors; + uint64_t volume_id; + uint64_t offset_sectors; +} RepCmdReadVolReq; + +/********************************************************* + * RepCmd Read Volume Response + * + * REPHUB_CMD_READ_VOL_RES + * Direction: agent->hub + * + * A response to REPHUB_CMD_READ_VOL_REQ. + * Sends the data read from a protected volume + *********************************************************/ +typedef struct RepCmdReadVolRes { + RepCmdHdr hdr; + int req_id; + int is_status_success; + uint64_t volume_id; +} RepCmdReadVolRes; + +/********************************************************* + * RepCmd Agent shutdown + * + * REPHUB_CMD_AGENT_SHUTDOWN + * Direction: agent->hub + * + * Notifies the hub that the agent is about to shutdown. + * This allows a graceful shutdown. Any disconnection + * of an agent without sending this command will result + * in a full sync of the VM volumes. 
+ *********************************************************/ +typedef struct RepCmdAgentShutdown { + RepCmdHdr hdr; +} RepCmdAgentShutdown; + + +#endif /* REPHUB_CMDS_H */ diff --git a/replication/rephub_defs.h b/replication/rephub_defs.h new file mode 100644 index 0000000..e34e0ce --- /dev/null +++ b/replication/rephub_defs.h @@ -0,0 +1,40 @@ +/* + * QEMU System Emulator + * + * Copyright (c) 2003-2008 Fabrice Bellard + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY= , + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. 
+ */
+#ifndef REP_HUB_DEFS_H
+#define REP_HUB_DEFS_H
+
+#include
+
+#define REPHUB_MAX_VOL_NAME_LEN (1024)
+#define REPHUB_MAX_NUM_VOLUMES (512)
+
+#ifndef TRUE
+    #define TRUE (1)
+#endif
+
+#ifndef FALSE
+    #define FALSE (0)
+#endif
+
+#endif /* REP_HUB_DEFS_H */
diff --git a/vl.c b/vl.c
index 624da0f..506b5dc 100644
--- a/vl.c
+++ b/vl.c
@@ -167,6 +167,7 @@ int main(int argc, char **argv)
 #include "ui/qemu-spice.h"
+#include "replication/repagent.h"
 //#define DEBUG_NET
 //#define DEBUG_SLIRP
@@ -2307,6 +2308,15 @@ int main(int argc, char **argv, char **envp)
                 drive_add(IF_DEFAULT, popt->index - QEMU_OPTION_hda, optarg, HD_OPTS);
                 break;
+            case QEMU_OPTION_repagent:
+#ifdef CONFIG_REPLICATION
+                repagent_init(optarg, 0);
+#else
+                fprintf(stderr, "Replication support is disabled. "
+                    "Don't use -repagent option.\n");
+                exit(1);
+#endif
+                break;
             case QEMU_OPTION_drive:
                 if (drive_def(optarg) == NULL) {
                     exit(1);
-- 
1.7.6.4

--047d7b3396015fba3f04b85d40a9
Content-Type: text/html; charset=windows-1252
Content-Transfer-Encoding: quoted-printable

Repagent is a new module that allows an external replication system to replicate a volume of a Qemu VM.

This RFC patch adds the repagent client module to Qemu.

Documentation of the module role and API is in the patch at replication/qemu-repagent.txt

The main motivation behind the module is to allow replication of VMs in a virtualization environment like RhevM.

To achieve this we need basic replication support in Qemu.

This is the first submission of this module, which was written as a Proof Of Concept, and used successfully for replicating and recovering a Qemu VM.

Points and open issues:

* The module interfaces with the Qemu storage stack at the generic block.c layer. Is this the right place to intercept/inject IOs?

* The patch performs IO reads from a new thread (a TCP listener thread). See repaget_read_vol in repagent.c. The read is not protected by any lock - is this OK?

* VM ID - the replication system implies an environment with several VMs connected to a central replication system (Rephub).
  This requires some sort of identification for a VM. The current patch does not include a VM ID - I did not find any adequate ID to use.
  Any suggestions?

Appreciate any feedback or suggestions. Thanks,

Ori.
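For orientation while reading the patch: the hub protocol frames every message as a fixed RepCmd header followed by data_size_bytes of payload (see replication/repcmd.h). The following is a minimal standalone sketch of that framing, not code from the patch. The helper names sketch_frame_new and sketch_frame_check are hypothetical, uint32_t stands in for the patch's unsigned int fields, and the check returns an error where the patch's listener uses assert(). Like the patch, it does no byte-order conversion.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Constants and layout mirror replication/repcmd.h from the patch. */
#define REPCMD_MAGIC1 0x1122
#define REPCMD_MAGIC2 0x3344
#define REPCMD_NUM_U32_PARAMS 11

typedef struct RepCmdHdr {
    uint16_t magic1;
    uint16_t cmdid;
    uint32_t data_size_bytes;
} RepCmdHdr;

typedef struct RepCmd {
    RepCmdHdr hdr;
    uint32_t parameters[REPCMD_NUM_U32_PARAMS]; /* assumption: fixed-width */
    uint32_t magic2;
} RepCmd;

/* Hypothetical helper: build header + payload in one heap buffer.
 * Returns NULL on allocation failure; caller frees the result. */
static uint8_t *sketch_frame_new(uint16_t cmd_id, const void *payload,
                                 uint32_t payload_len, size_t *out_len)
{
    size_t total = sizeof(RepCmd) + payload_len;
    uint8_t *buf = calloc(1, total);
    RepCmd cmd;

    if (buf == NULL) {
        return NULL;
    }
    memset(&cmd, 0, sizeof(cmd));
    cmd.hdr.magic1 = REPCMD_MAGIC1;
    cmd.hdr.cmdid = cmd_id;
    cmd.hdr.data_size_bytes = payload_len;
    cmd.magic2 = REPCMD_MAGIC2;

    memcpy(buf, &cmd, sizeof(cmd));
    if (payload_len > 0) {
        memcpy(buf + sizeof(cmd), payload, payload_len);
    }
    *out_len = total;
    return buf;
}

/* Hypothetical helper: validate a received frame.
 * Returns 0 and fills *out on success, -1 on a short or corrupt frame. */
static int sketch_frame_check(const uint8_t *buf, size_t len, RepCmd *out)
{
    if (len < sizeof(RepCmd)) {
        return -1;
    }
    memcpy(out, buf, sizeof(RepCmd));
    if (out->hdr.magic1 != REPCMD_MAGIC1 || out->magic2 != REPCMD_MAGIC2) {
        return -1;
    }
    if (len - sizeof(RepCmd) < out->hdr.data_size_bytes) {
        return -1;
    }
    return 0;
}
```

Returning an error instead of asserting lets a receiver drop a bad connection rather than abort the whole process; whether that is the right policy for Qemu is left open here.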

From 5a0d88689ddcf325f25fdfca2a2012f1bbf141b9 Mon Sep 17 00:00:00 2001
From: Ori Mamluk <orim@orim-fedora.(none)>
Date: Tue, 7 Feb 2012 11:12:12 +0200
Subject: [PATCH] Added replication agent module (repagent) to Qemu under
 replication directory, added repagent configure and run
 options, and the repagent API usage in bloc

Added build options to ./configure:  --enable-replication --disable-replicat
Added a commandline option to enable: -repagent <rep hub IP>
Added the module files under replication.

Signed-off-by: Ori Mamluk <orim@zerto.com>
---
 Makefile                      |    9 +-
 Makefile.objs                 |    6 +
 block.c                       |   20 +++-
 configure                     |   11 ++
 qemu-options.hx               |    6 +
 replication/qemu-repagent.txt |  104 +++++++++++++
 replication/repagent.c        |  322 +++++++++++++++++++++++++++++++++++++++++
 replication/repagent.h        |   46 ++++++
 replication/repagent_client.c |  138 ++++++++++++++++++
 replication/repagent_client.h |   36 +++++
 replication/repcmd.h          |   59 ++++++++
 replication/repcmd_listener.c |  137 +++++++++++++++++
 replication/repcmd_listener.h |   32 ++++
 replication/rephub_cmds.h     |  150 +++++++++++++++++++
 replication/rephub_defs.h     |   40 +++++
 vl.c                          |   10 ++
 16 files changed, 1121 insertions(+), 5 deletions(-)
 mode change 100644 => 100755 Makefile.objs
 mode change 100644 => 100755 qemu-options.hx
 create mode 100755 replication/qemu-repagent.txt
 create mode 100644 replication/repagent.c
 create mode 100644 replication/repagent.h
 create mode 100644 replication/repagent_client.c
 create mode 100644 replication/repagent_client.h
 create mode 100644 replication/repcmd.h
 create mode 100644 replication/repcmd_listener.c
 create mode 100644 replication/repcmd_listener.h
 create mode 100644 replication/rephub_cmds.h
 create mode 100644 replication/rephub_defs.h

diff --git a/Makefile b/Makefile
index 4f6eaa4..a1b3701 100644
--- a/Makefile
+++ b/Makefile
@@ -149,9 +149,9 @@ qemu-img.o qemu-tool.o qemu-nbd.o qemu-io.o cmd.o qemu-ga.o: $(GENERATED_HEADERS
 tools-obj-y = qemu-tool.o $(oslib-obj-y) $(trace-obj-y) \
 	qemu-timer-common.o cutils.o

-qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y)
-qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y)
-qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y)
+qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y)
+qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y)
+qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y)

 qemu-img-cmds.h: $(SRC_PATH)/qemu-img-cmds.hx
 	$(call quiet-command,sh $(SRC_PATH)/scripts/hxtool -h < $< > $@,"  GEN   $@")
@@ -228,6 +228,7 @@ clean:
 	rm -f trace-dtrace.dtrace trace-dtrace.dtrace-timestamp
 	rm -f trace-dtrace.h trace-dtrace.h-timestamp
 	rm -rf $(qapi-dir)
+	rm -f replication/*.{o,d}
 	$(MAKE) -C tests clean
 	for d in $(ALL_SUBDIRS) $(QEMULIBS) libcacard; do \
 	if test -d $$d; then $(MAKE) -C $$d $@ || exit 1; fi; \
@@ -387,4 +388,4 @@ tar:
 	rm -rf /tmp/$(FILE)

 # Include automatically generated dependency files
--include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d qapi/*.d qga/*.d)
+-include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d qapi/*.d qga/*.d replication/*.d)

diff --git a/Makefile.objs b/Makefile.objs
old mode 100644
new mode 100755
index d7a6539..dbd6f15
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -74,6 +74,7 @@ fsdev-obj-$(CONFIG_VIRTFS) += $(addprefix fsdev/, $(fsdev-nested-y))
 # CPUs and machines.

 common-obj-y = $(block-obj-y) blockdev.o
+common-obj-y += $(replication-obj-$(CONFIG_REPLICATION))
 common-obj-y += $(net-obj-y)
 common-obj-y += $(qobject-obj-y)
 common-obj-$(CONFIG_LINUX) += $(fsdev-obj-$(CONFIG_LINUX))
@@ -413,6 +414,11 @@ common-obj-y += qmp-marshal.o qapi-visit.o qapi-types.o $(qapi-obj-y)
 common-obj-y += qmp.o hmp.o

 ######################################################################
+# replication
+replication-nested-y = repagent_client.o  repagent.o  repcmd_listener.o
+replication-obj-y = $(addprefix replication/, $(replication-nested-y))
+
+######################################################################
 # guest agent

 qga-nested-y = guest-agent-commands.o guest-agent-command-state.o

diff --git a/block.c b/block.c
index 9bb236c..f3b8387 100644
--- a/block.c
+++ b/block.c
@@ -31,6 +31,10 @@
 #include "qemu-coroutine.h"
 #include "qmp-commands.h"

+#ifdef CONFIG_REPLICATION
+#include "replication/repagent.h"
+#endif
+
 #ifdef CONFIG_BSD
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -640,6 +644,9 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
         goto unlink_and_fail;
     }

+#ifdef CONFIG_REPLICATION
+    repagent_register_drive(filename,  bs);
+#endif
     /* Open the image */
     ret = bdrv_open_common(bs, filename, flags, drv);
     if (ret < 0) {
@@ -1292,6 +1299,17 @@ static int coroutine_fn bdrv_co_do_writev(BlockDriverState *bs,

     ret = drv->bdrv_co_writev(bs, sector_num, nb_sectors, qiov);

+
+#ifdef CONFIG_REPLICATION
+    if (bs->device_name[0] != '\0') {
+        /* We split the IO only at the highest stack driver layer.
+           Currently we know that by checking device_name - only
+           highest level (closest to the guest) has that name.
+           */
+           repagent_handle_protected_write(bs, sector_num,
+                nb_sectors, qiov, ret);
+    }
+#endif
     if (bs->dirty_bitmap) {
         set_dirty_bitmap(bs, sector_num, nb_sectors, 1);
     }
@@ -1783,7 +1801,7 @@ int bdrv_has_zero_init(BlockDriverState *bs)
  * 'nb_sectors' is the max value 'pnum' should be set to.
  */
 int bdrv_is_allocated(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
-        int *pnum)
+    int *pnum)
 {
     int64_t n;
     if (!bs->drv->bdrv_is_allocated) {

diff --git a/configure b/configure
index 9e5da44..93d600e 100755
--- a/configure
+++ b/configure
@@ -179,6 +179,7 @@ spice=""
 rbd=""
 smartcard=""
 smartcard_nss=""
+replication=""
 usb_redir=""
 opengl=""
 zlib="yes"
@@ -772,6 +773,10 @@ for opt do
   ;;
   --enable-smartcard-nss) smartcard_nss="yes"
   ;;
+  --disable-replication) replication="no"
+  ;;
+  --enable-replication) replication="yes"
+  ;;
   --disable-usb-redir) usb_redir="no"
   ;;
   --enable-usb-redir) usb_redir="yes"
@@ -1067,6 +1072,7 @@ echo "  --disable-usb-redir      disable usb network redirection support"
 echo "  --enable-usb-redir       enable usb network redirection support"
 echo "  --disable-guest-agent    disable building of the QEMU Guest Agent"
 echo "  --enable-guest-agent     enable building of the QEMU Guest Agent"
+echo "  --enable-replication     enable replication support"
 echo ""
 echo "NOTE: The object files are built at the place where configure is launched"
 exit 1
@@ -2733,6 +2739,7 @@ echo "curl support      $curl"
 echo "check support     $check_utests"
 echo "mingw32 support   $mingw32"
 echo "Audio drivers     $audio_drv_list"
+echo "Replication          $replication"
 echo "Extra audio cards $audio_card_list"
 echo "Block whitelist   $block_drv_whitelist"
 echo "Mixer emulation   $mixemu"
@@ -3080,6 +3087,10 @@ if test "$smartcard_nss" = "yes" ; then
   echo "CONFIG_SMARTCARD_NSS=y" >> $config_host_mak
 fi

+if test "$replication" = "yes" ; then
+  echo "CONFIG_REPLICATION=y" >> $config_host_mak
+fi
+
 if test "$usb_redir" = "yes" ; then
   echo "CONFIG_USB_REDIR=y" >> $config_host_mak
 fi

diff --git a/qemu-options.hx b/qemu-options.hx
old mode 100644
new mode 100755
index 681eaf1..c97e4f8
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -2602,3 +2602,9 @@ HXCOMM This is the last statement. Insert new options before this line!
 STEXI
 @end table
 ETEXI
+
+DEF("repagent", HAS_ARG, QEMU_OPTION_repagent,
+    "-repagent [hub IP/name]\n"
+    "                Enable replication support for disks\n"
+    "                hub is the ip or name of the machine running the replication hub.\n",
+    QEMU_ARCH_ALL)

diff --git a/replication/qemu-repagent.txt b/replication/qemu-repagent.txt
new file mode 100755
index 0000000..e3b0c1e
--- /dev/null
+++ b/replication/qemu-repagent.txt
@@ -0,0 +1,104 @@
+    repagent - replication agent - a Qemu module for enabling continuous async replication of VM volumes
+
+Introduction
+    This document describes a feature in Qemu - a replication agent (AKA Repagent).
+    The Repagent is a new module that exposes an API to an external replication system (AKA Rephub).
+    This API allows a Rephub to communicate with a Qemu VM and continuously replicate its volumes.
+    The implementation of a Rephub is outside of the scope of this document. There may be several different Rephub
+    implementations using the same repagent in Qemu.
+
+Main features of Repagent
+    Repagent does the following:
+    * Report volumes - report a list of all volumes in a VM to the Rephub.
+    * Report writes to a volume - send all writes made to a protected volume to the Rephub.
+        The reporting of an IO is asynchronous - i.e. the IO is not delayed by the Repagent to get any acknowledgement from the Rephub.
+        It is only copied to the Rephub.
+    * Read a protected volume - allows the Rephub to read a protected volume, so that the hub can synchronize the content of a protected volume.
+
+Description of the Repagent module
+
+Build and run options
+    New configure option: --enable-replication
+    New command line option:
+    -repagent [hub IP/name]
+                    Enable replication support for disks
+                    hub is the ip or name of the machine running the replication hub.
+
+Module APIs
+    The Repagent module interfaces with two main components:
+    1. The Rephub - An external API based on socket messages
+    2. The generic block layer - block.c
+
+    Rephub message API
+        The external replication API is a message based API.
+        We won't go into the structure of the messages here - just the semantics.
+
+        Messages list
+            (The updated list and comments are in rephub_cmds.h)
+
+            Messages from the Repagent to the Rephub:
+            * Protected write
+                The Repagent sends each write to a protected volume to the hub with the IO status.
+                If the status is bad, the write content is not sent.
+            * Report VM volumes
+                The agent reports all the volumes of the VM to the hub.
+            * Read Volume Response
+                A response to a Read Volume Request.
+                Sends the data read from a protected volume to the hub.
+            * Agent shutdown
+                Notifies the hub that the agent is about to shut down.
+                This allows a graceful shutdown. Any disconnection of an agent without
+                sending this command will result in a full sync of the VM volumes.
+
+            Messages from the Rephub to the Repagent:
+            * Start protect
+                The hub instructs the agent to start protecting a volume. When a volume is protected
+                all its writes are sent to the hub.
+                With this command the hub also assigns a volume ID to the given volume name.
+            * Read volume request
+                The hub issues a read IO to a protected volume.
+                This command is used during sync - when the hub needs to read unsynchronized
+                sections of a protected volume.
+                This command is a request, the read data is returned by the read volume response message (see above).

+    block.c API
+        The API to the generic block storage layer contains 3 functionalities:
+        1. Handle writes to protected volumes
+            In bdrv_co_do_writev, each write is reported to the Repagent module.
+        2. Handle each new volume that registers
+            In bdrv_open - each new bottom-level block driver that registers is reported.
+        3. Read from a volume
+            Repagent calls bdrv_aio_readv to handle read requests coming from the hub.
+
+

+General description of a Rephub - a replication system the repagent connects to
+    This section describes at a high level a sample Rephub - a replication system that uses the repagent API
+    to replicate disks.
+    It describes a simple Rephub that continuously maintains a mirror of the volumes of a VM.
+
+    Say we have a VM we want to protect - call it PVM, say it has 2 volumes - V1, V2.
+    Our Rephub is called SingleRephub - a Rephub protecting a single VM.
+

+    Preparations
+    1. The user chooses a host to run SingleRephub - a different host than PVM, call it Host2
+    2. The user creates two volumes on Host2 - the same sizes as V1 and V2, call them V1R (V1 recovery) and V2R.
+    3. The user runs the SingleRephub process on Host2, and gives V1R and V2R as command line arguments.
+        From now on SingleRephub waits for the protected VM repagent to connect.
+    4. The user runs the protected VM PVM - and uses the switch -repagent <Host2 IP>.
+

+    Runtime
+    1. The repagent module connects to SingleRephub on startup.
+    2. repagent reports V1 and V2 to SingleRephub.
+    3. SingleRephub starts to perform an initial synchronization of the protected volumes -
+        it reads each protected volume (V1 and V2) - using read volume requests - and copies the data into the
+        recovery volumes V1R and V2R.
+    4. SingleRephub enters 'protection' mode - each write to the protected volume is sent by the repagent to the Rephub,
+        and the Rephub performs the write on the matching recovery volume.
+
+    * Note that during stage 3 writes to the protected volumes are not ignored - they're kept in a bitmap,
+        and will be read again when stage 3 ends, in an iterative converging process.
+
+    This flow continuously maintains an updated recovery volume.
+    If the protected system is damaged, the user can create a new VM on Host2 with the replicated volumes attached to it.
+    The new VM is a replica of the protected system.
+
+
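Illustration only, not part of the patch: the Runtime stages 3 and 4 and the bitmap note can be sketched as a small in-memory simulation. Every name here (SketchVolume, SketchHub, the sketch_* functions, the block and volume sizes) is hypothetical and chosen for the example; a real Rephub would read blocks over the socket with read volume requests rather than from a local array.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define NUM_BLOCKS 8   /* hypothetical volume size, in blocks */
#define BLOCK_SIZE 4   /* hypothetical block size, in bytes */

typedef struct SketchVolume {
    uint8_t data[NUM_BLOCKS][BLOCK_SIZE];
} SketchVolume;

typedef struct SketchHub {
    SketchVolume recovery;      /* stands in for V1R */
    uint8_t dirty[NUM_BLOCKS];  /* 1 = block still has to be (re)read */
    int protecting;             /* 0 = stage 3 (sync), 1 = stage 4 */
} SketchHub;

/* Start of stage 3: every block counts as unsynchronized. */
static void sketch_hub_begin_sync(SketchHub *hub)
{
    memset(hub, 0, sizeof(*hub));
    memset(hub->dirty, 1, sizeof(hub->dirty));
}

/* A protected write reported by the agent. During sync it is only
 * remembered in the bitmap; in protection mode it is applied directly. */
static void sketch_hub_on_write(SketchHub *hub, int block, const uint8_t *buf)
{
    if (hub->protecting) {
        memcpy(hub->recovery.data[block], buf, BLOCK_SIZE);
    } else {
        hub->dirty[block] = 1;
    }
}

/* One sync pass: copy every dirty block from the protected volume.
 * Returns the number of blocks copied; 0 means the sync has converged. */
static int sketch_hub_sync_pass(SketchHub *hub, const SketchVolume *prot)
{
    int b;
    int copied = 0;
    for (b = 0; b < NUM_BLOCKS; b++) {
        if (hub->dirty[b]) {
            hub->dirty[b] = 0;
            memcpy(hub->recovery.data[b], prot->data[b], BLOCK_SIZE);
            copied++;
        }
    }
    return copied;
}

/* A guest write: lands on the protected volume and is reported to the hub. */
static void sketch_guest_write(SketchVolume *prot, SketchHub *hub,
                               int block, uint8_t value)
{
    memset(prot->data[block], value, BLOCK_SIZE);
    sketch_hub_on_write(hub, block, prot->data[block]);
}
```

In this sketch the hub repeats sketch_hub_sync_pass() until it returns 0, then sets protecting to 1; from that point each reported write is applied to the recovery volume as it arrives.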

diff --git a/replication/repagent.c b/replication/repagent.c
new file mode 100644
index 0000000..c66eae7
--- /dev/null
+++ b/replication/repagent.c
@@ -0,0 +1,322 @@

+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <pthread.h>
+#include <stdint.h>
+
+#include "block.h"
+#include "rephub_defs.h"
+#include "block_int.h"
+#include "repagent_client.h"
+#include "repagent.h"
+#include "rephub_cmds.h"
+
+#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))
+#define REPAGENT_MAX_NUM_VOLUMES (64)
+#define REPAGENT_VOLUME_ID_NONE (0)
+

+typedef struct RepagentVolume {
+    uint64_t vol_id;
+    const char *vol_path;
+    BlockDriverState *driver_ptr;
+} RepagentVolume;
+
+struct RepAgentState {
+    int is_init;
+    int num_volumes;
+    RepagentVolume *volumes[REPAGENT_MAX_NUM_VOLUMES];
+};
+
+typedef struct RepagentReadVolIo {
+    QEMUIOVector qiov;
+    RepCmdReadVolReq rep_cmd;
+    uint8_t *buf;
+    struct timeval start_time;
+} RepagentReadVolIo;
+
+static int repagent_get_volume_by_name(const char *name);
+static void repagent_report_volumes_to_hub(void);
+static void repagent_vol_read_done(void *opaque, int ret);
+static struct timeval tsub(struct timeval t1, struct timeval t2);
+
+RepAgentState g_rep_agent = { 0 };
+

+void repagent_init(const char *hubname, int port)
+{
+    /* It is the responsibility of the thread to free this struct */
+    rephub_params *pParams = (rephub_params *)g_malloc(sizeof(rephub_params));
+    if (hubname == NULL) {
+        hubname = "127.0.0.1";
+    }
+    if (port == 0) {
+        port = 9010;
+    }
+
+    printf("repagent_init %s\n", hubname);
+
+    pParams->port = port;
+    pParams->name = g_strdup(hubname);
+
+    pthread_t thread_id = 0;
+
+    /* Create the repagent client listener thread */
+    pthread_create(&thread_id, 0, repagent_listen, (void *) pParams);
+    pthread_detach(thread_id);
+}
+

+void repagent_register_drive(const char *drive_path,
+        BlockDriverState *driver_ptr)
+{
+    int i;
+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
+        RepagentVolume *vol = g_rep_agent.volumes[i];
+        if (vol != NULL) {
+            assert(
+                    strcmp(drive_path, vol->vol_path) != 0
+                    && driver_ptr != vol->driver_ptr);
+        }
+    }
+
+    assert(g_rep_agent.num_volumes < REPAGENT_MAX_NUM_VOLUMES);
+
+    printf("zerto repagent: Registering drive. Num drives %d, path %s\n",
+            g_rep_agent.num_volumes, drive_path);
+    g_rep_agent.volumes[i] =
+            (RepagentVolume *)g_malloc(sizeof(RepagentVolume));
+    g_rep_agent.volumes[i]->driver_ptr = driver_ptr;
+    /* orim todo strcpy? */
+    g_rep_agent.volumes[i]->vol_path = drive_path;
+
+    /* Orim todo thread-safety? */
+    g_rep_agent.num_volumes++;
+
+    repagent_report_volumes_to_hub();
+}
+
+/* orim todo destruction? */
+

+static RepagentVolume *repagent_get_protected_volume_by_driver(
+        BlockDriverState *bs)
+{
+    /* orim todo optimize search */
+    int i = 0;
+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
+        RepagentVolume *p_vol = g_rep_agent.volumes[i];
+        if (p_vol != NULL && p_vol->driver_ptr == (void *) bs) {
+            return p_vol;
+        }
+    }
+    return NULL;
+}
+

+void repagent_handle_protected_write(BlockDriverState *bs, int64_t sector_num,
+        int nb_sectors, QEMUIOVector *qiov, int ret_status)
+{
+    printf("zerto Protected write offset %lld, size %d, IO return status %d",
+            (long long int) sector_num, nb_sectors, ret_status);
+    if (bs->filename != NULL) {
+        printf(", filename %s", bs->filename);
+    }
+
+    printf("\n");
+
+    RepagentVolume *p_vol = repagent_get_protected_volume_by_driver(bs);
+    if (p_vol == NULL || p_vol->vol_id == REPAGENT_VOLUME_ID_NONE) {
+        /* Unprotected */
+        printf("Got a write to an unprotected volume.\n");
+        return;
+    }
+
+    /* Report IO to rephub */
+
+    int data_size = qiov->size;
+    if (ret_status < 0) {
+        /* On failed ios we don't send the data to the hub */
+        data_size = 0;
+    }
+    uint8_t *pdata = NULL;
+    RepCmdProtectedWrite *p_cmd = (RepCmdProtectedWrite *) repcmd_new(
+            REPHUB_CMD_PROTECTED_WRITE, data_size, (uint8_t **) &pdata);
+
+    if (ret_status >= 0) {
+        qemu_iovec_to_buffer(qiov, pdata);
+    }
+
+    p_cmd->volume_id = p_vol->vol_id;
+    p_cmd->offset_sectors = sector_num;
+    p_cmd->size_sectors = nb_sectors;
+    p_cmd->ret_status = ret_status;
+
+    if (repagent_client_send((RepCmd *) p_cmd) != 0) {
+        printf("Error sending command\n");
+    }
+}
+

+static void repagent_report_volumes_to_hub(void)
+{
+    /* Report the VM volumes to rephub */
+    int i;
+    RepCmdDataReportVmVolumes *p_cmd_data = NULL;
+    RepCmdReportVmVolumes *p_cmd = (RepCmdReportVmVolumes *) repcmd_new(
+            REPHUB_CMD_REPORT_VM_VOLUMES,
+            g_rep_agent.num_volumes * sizeof(RepVmVolumeInfo),
+            (uint8_t **) &p_cmd_data);
+    p_cmd->num_volumes = g_rep_agent.num_volumes;
+    printf("reporting %u volumes\n", g_rep_agent.num_volumes);
+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
+        assert(g_rep_agent.volumes[i] != NULL);
+        printf("reporting volume %s size %u\n",
+                g_rep_agent.volumes[i]->vol_path,
+                (uint32_t) sizeof(p_cmd_data->volumes[i].name));
+        strncpy((char *) p_cmd_data->volumes[i].name,
+                g_rep_agent.volumes[i]->vol_path,
+                sizeof(p_cmd_data->volumes[i].name));
+        p_cmd_data->volumes[i].volume_id = g_rep_agent.volumes[i]->vol_id;
+    }
+    if (repagent_client_send((RepCmd *) p_cmd) != 0) {
+        printf("Error sending command\n");
+    }
+}
+

+int repaget_start_protect(RepCmdStartProtect *pcmd,
+        RepCmdDataStartProtect *pcmd_data)
+{
+    printf("Start protect vol %s, ID %llu\n", pcmd_data->volume_name,
+            (unsigned long long) pcmd->volume_id);
+    int vol_index = repagent_get_volume_by_name(pcmd_data->volume_name);
+    if (vol_index < 0) {
+        printf("The volume doesn't exist\n");
+        return TRUE;
+    }
+    /* orim todo protect */
+    g_rep_agent.volumes[vol_index]->vol_id = pcmd->volume_id;
+
+    return TRUE;
+}
+

+static int repagent_get_volume_by_name(const char *name)
+{
+    int i = 0;
+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
+        if (g_rep_agent.volumes[i] != NULL
+                && strcmp(name, g_rep_agent.volumes[i]->vol_path) == 0) {
+            return i;
+        }
+    }
+    return -1;
+}
+
+static int repagent_get_volume_by_id(uint64_t vol_id)
+{
+    int i = 0;
+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {
+        if (g_rep_agent.volumes[i] != NULL
+                && g_rep_agent.volumes[i]->vol_id == vol_id) {
+            return i;
+        }
+    }
+    return -1;
+}
+

+int repaget_read_vol(RepCmdReadVolReq *pcmd, uint8_t *pdata)
+{
+    int index = repagent_get_volume_by_id(pcmd->volume_id);
+    int size_bytes = pcmd->size_sectors * 512;
+    if (index < 0) {
+        printf("Vol read - Could not find vol id %llu\n",
+                (unsigned long long int) pcmd->volume_id);
+        RepCmdReadVolRes *p_res_cmd = (RepCmdReadVolRes *) repcmd_new(
+                REPHUB_CMD_READ_VOL_RES, 0, NULL);
+        p_res_cmd->req_id = pcmd->req_id;
+        p_res_cmd->volume_id = pcmd->volume_id;
+        p_res_cmd->is_status_success = FALSE;
+        repagent_client_send((RepCmd *) p_res_cmd);
+        return TRUE;
+    }
+
+    printf("Vol read - driver %p, volId %llu, offset %llu, size %u\n",
+            g_rep_agent.volumes[index]->driver_ptr,
+            (unsigned long long int) pcmd->volume_id,
+            (unsigned long long int) pcmd->offset_sectors, pcmd->size_sectors);
+
+    {
+        RepagentReadVolIo *read_xact = calloc(1, sizeof(RepagentReadVolIo));
+
+/*        BlockDriverAIOCB *acb; */
+
+        ZERO_MEM_OBJ(read_xact);
+
+        qemu_iovec_init(&read_xact->qiov, 1);
+
+        /*read_xact->buf =
+        qemu_blockalign(g_rep_agent.volumes[index]->driver_ptr, size_bytes); */
+        read_xact->buf = (uint8_t *) g_malloc(size_bytes);
+        read_xact->rep_cmd = *pcmd;
+        qemu_iovec_add(&read_xact->qiov, read_xact->buf, size_bytes);
+
+        gettimeofday(&read_xact->start_time, NULL);
+        /* orim TODO - use the returned acb to cancel the request on shutdown */
+        /*acb = */bdrv_aio_readv(g_rep_agent.volumes[index]->driver_ptr,
+                read_xact->rep_cmd.offset_sectors, &read_xact->qiov,
+                read_xact->rep_cmd.size_sectors, repagent_vol_read_done,
+                read_xact);
+    }
+
+    return TRUE;
+}
+

+static void repagent_vol_read_done(void *opaque, int ret)
+{
+    struct timeval t2;
+    RepagentReadVolIo *read_xact = (RepagentReadVolIo *) opaque;
+    uint8_t *pdata = NULL;
+    RepCmdReadVolRes *pcmd = (RepCmdReadVolRes *) repcmd_new(
+            REPHUB_CMD_READ_VOL_RES, read_xact->rep_cmd.size_sectors * 512,
+            &pdata);
+    pcmd->req_id = read_xact->rep_cmd.req_id;
+    pcmd->volume_id = read_xact->rep_cmd.volume_id;
+    pcmd->is_status_success = FALSE;
+
+    printf("Protected vol read - volId %llu, offset %llu, size %u\n",
+            (unsigned long long int) read_xact->rep_cmd.volume_id,
+            (unsigned long long int) read_xact->rep_cmd.offset_sectors,
+            read_xact->rep_cmd.size_sectors);
+    gettimeofday(&t2, NULL);
+
+    if (ret >= 0) {
+        /* Read response - send the data to the hub */
+        t2 = tsub(t2, read_xact->start_time);
+        printf("Read prot vol done. Took %u seconds, %u us.",
+                (uint32_t) t2.tv_sec, (uint32_t) t2.tv_usec);
+
+        pcmd->is_status_success = TRUE;
+        /* orim todo optimize - don't copy, use the qiov buffer */
+        qemu_iovec_to_buffer(&read_xact->qiov, pdata);
+    } else {
+        printf("readv failed: %s\n", strerror(-ret));
+    }
+
+    repagent_client_send((RepCmd *) pcmd);
+
+    /*qemu_vfree(read_xact->buf); */
+    g_free(read_xact->buf);
+
+    g_free(read_xact);
+}
+

+static struct timeval tsub(struct timeval t1, struct timeval t2)
+{
+    t1.tv_usec -= t2.tv_usec;
+    if (t1.tv_usec < 0) {
+        t1.tv_usec += 1000000;
+        t1.tv_sec--;
+    }
+    t1.tv_sec -= t2.tv_sec;
+    return t1;
+}
+
+void repagent_client_connected(void)
+{
+    /* orim todo thread protection */
+    repagent_report_volumes_to_hub();
+}
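Illustration only, not part of the patch: repagent.c keeps a table of volumes that is filled on registration, searched by name when the hub sends "start protect", and checked on each write. The sketch below shows one possible shape for such a table under two assumptions that differ from the code above: the table owns a copy of each path (one way to settle the "strcpy?" todo), and the volume ID starts out explicitly as "none". All sketch_* and Sketch* names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_MAX_VOLUMES 64
#define SKETCH_VOLUME_ID_NONE 0

typedef struct SketchVol {
    uint64_t vol_id;   /* SKETCH_VOLUME_ID_NONE until the hub protects it */
    char *vol_path;    /* owned copy of the path */
    void *driver_ptr;  /* stands in for BlockDriverState * */
} SketchVol;

typedef struct SketchVolTable {
    int num_volumes;
    SketchVol volumes[SKETCH_MAX_VOLUMES];
} SketchVolTable;

/* Returns the index of the volume with this path, or -1. */
static int sketch_find_by_name(const SketchVolTable *t, const char *name)
{
    int i;
    for (i = 0; i < t->num_volumes; i++) {
        if (strcmp(name, t->volumes[i].vol_path) == 0) {
            return i;
        }
    }
    return -1;
}

/* Registers a volume; returns its index, or -1 if the table is full,
 * the path is already registered, or memory runs out. */
static int sketch_register(SketchVolTable *t, const char *path, void *drv)
{
    size_t len;
    char *copy;
    if (t->num_volumes >= SKETCH_MAX_VOLUMES
            || sketch_find_by_name(t, path) >= 0) {
        return -1;
    }
    len = strlen(path) + 1;
    copy = malloc(len);
    if (copy == NULL) {
        return -1;
    }
    memcpy(copy, path, len);
    t->volumes[t->num_volumes].vol_id = SKETCH_VOLUME_ID_NONE;
    t->volumes[t->num_volumes].vol_path = copy;
    t->volumes[t->num_volumes].driver_ptr = drv;
    return t->num_volumes++;
}

/* "Start protect": the hub assigns an ID to a volume it names. */
static int sketch_start_protect(SketchVolTable *t, const char *name,
                                uint64_t vol_id)
{
    int idx = sketch_find_by_name(t, name);
    if (idx < 0) {
        return -1;
    }
    t->volumes[idx].vol_id = vol_id;
    return 0;
}

/* Write path check: is the volume behind this driver protected? */
static int sketch_is_protected(const SketchVolTable *t, const void *drv)
{
    int i;
    for (i = 0; i < t->num_volumes; i++) {
        if (t->volumes[i].driver_ptr == drv) {
            return t->volumes[i].vol_id != SKETCH_VOLUME_ID_NONE;
        }
    }
    return 0;
}
```

Copying the path means the caller may reuse or free its buffer after registering; the sketch leaves out freeing the copies and any locking between the listener thread and the IO path.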

diff --git a/replication/repagent.h b/replication/repagent.h
new file mode 100644
index 0000000..98ccbf2
--- /dev/null
+++ b/replication/repagent.h
@@ -0,0 +1,46 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */

+#ifndef REPAGENT_H
+#define REPAGENT_H
+#include <stdint.h>
+
+#include "qemu-common.h"
+
+typedef struct RepAgentState RepAgentState;
+typedef struct RepCmdStartProtect RepCmdStartProtect;
+typedef struct RepCmdDataStartProtect RepCmdDataStartProtect;
+struct RepCmdReadVolReq;
+
+void repagent_init(const char *hubname, int port);
+void repagent_handle_protected_write(BlockDriverState *bs,
+        int64_t sector_num, int nb_sectors, QEMUIOVector *qiov, int ret_status);
+void repagent_register_drive(const char *drive_path,
+        BlockDriverState *driver_ptr);
+int repaget_start_protect(RepCmdStartProtect *pcmd,
+        RepCmdDataStartProtect *pcmd_data);
+int repaget_read_vol(struct RepCmdReadVolReq *pcmd, uint8_t *pdata);
+void repagent_client_connected(void);
+
+
+#endif /* REPAGENT_H */
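Illustration only, not part of the patch: repagent_handle_protected_write() takes the IO return status, which suggests the block layer calls it once the write has completed. The sketch below mimics that call order with stand-ins, and mirrors the rule from repagent.c that a failed write is reported without its data. SketchBlockState, the sketch_* functions and the fail_next_write knob are all hypothetical; the real hook also receives the QEMUIOVector.

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_SECTOR_SIZE 512

/* Hypothetical stand-in for BlockDriverState; only what the sketch needs. */
typedef struct SketchBlockState {
    const char *filename;
    int fail_next_write;   /* test knob: make the next write fail */
} SketchBlockState;

/* What the stub hook saw on its most recent call. */
typedef struct SketchReport {
    int calls;
    int64_t sector_num;
    int nb_sectors;
    int ret_status;
    int payload_bytes;
} SketchReport;

static SketchReport sketch_last_report;

/* Bytes sent to the hub: the written data on success, none on failure. */
static int sketch_payload_bytes(int nb_sectors, int ret_status)
{
    return ret_status < 0 ? 0 : nb_sectors * SKETCH_SECTOR_SIZE;
}

/* Stub hook; records its arguments instead of sending a command. */
static void sketch_handle_protected_write(SketchBlockState *bs,
        int64_t sector_num, int nb_sectors, int ret_status)
{
    (void) bs;
    sketch_last_report.calls++;
    sketch_last_report.sector_num = sector_num;
    sketch_last_report.nb_sectors = nb_sectors;
    sketch_last_report.ret_status = ret_status;
    sketch_last_report.payload_bytes =
            sketch_payload_bytes(nb_sectors, ret_status);
}

/* Stand-in for the write path: perform the write, then report it
 * together with its status. */
static int sketch_do_writev(SketchBlockState *bs, int64_t sector_num,
                            int nb_sectors)
{
    int ret = bs->fail_next_write ? -5 : 0;   /* -5 mimics -EIO */
    bs->fail_next_write = 0;
    sketch_handle_protected_write(bs, sector_num, nb_sectors, ret);
    return ret;
}
```

Reporting failed writes as well lets the hub know that a region may be in an undefined state, even though no data accompanies the report.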

diff --git a/replication/repagent_client.c b/replication/repagent_client.c
new file mode 100644
index 0000000..4dd9ea4
--- /dev/null
+++ b/replication/repagent_client.c
@@ -0,0 +1,138 @@

+#include "repcmd.h"
+#include "rephub_cmds.h"
+#include "repcmd_listener.h"
+#include "repagent_client.h"
+#include "repagent.h"
+
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <stdio.h>
+#include <resolv.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <unistd.h>
+
+#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))
+
+static void repagent_process_cmd(RepCmd *pCmd, uint8_t *pData, void *clientPtr);
+
+typedef struct repagent_client_state {
+    int is_connected;
+    int is_terminate_receive;
+    int hsock;
+} repagent_client_state;
+
+static repagent_client_state g_client_state = { 0 };
+

+void *repagent_listen(void *pParam)
+{
+    rephub_params *pServerParams = (rephub_params *) pParam;
+    int host_port = pServerParams->port;
+    const char *host_name = pServerParams->name;
+
+    printf("Creating repagent listener thread...\n");
+    g_free(pServerParams);
+
+    struct sockaddr_in my_addr;
+
+    int err;
+    int retries = 0;
+
+    g_client_state.hsock = socket(AF_INET, SOCK_STREAM, 0);
+    if (g_client_state.hsock == -1) {
+        printf("Error initializing socket %d\n", errno);
+        return (void *) -1;
+    }
+
+    int param = 1;
+
+    if ((setsockopt(g_client_state.hsock, SOL_SOCKET, SO_REUSEADDR,
+            (char *) &param, sizeof(int)) == -1)
+            || (setsockopt(g_client_state.hsock, SOL_SOCKET, SO_KEEPALIVE,
+                    (char *) &param, sizeof(int)) == -1)) {
+        printf("Error setting options %d\n", errno);
+        return (void *) -1;
+    }
+
+    my_addr.sin_family = AF_INET;
+    my_addr.sin_port = htons(host_port);
+    memset(&(my_addr.sin_zero), 0, 8);
+
+    my_addr.sin_addr.s_addr = inet_addr(host_name);
+
+    /* Reconnect loop */
+    while (!g_client_state.is_terminate_receive) {
+
+        if (connect(g_client_state.hsock, (struct sockaddr *) &my_addr,
+                sizeof(my_addr)) == -1) {
+            err = errno;
+            if (err != EINPROGRESS) {
+                retries++;
+                fprintf(
+                        stderr,
+                        "Error connecting socket %d. Host %s, port %u. Retry count %d\n",
+                        errno, host_name, host_port, retries);
+                usleep(5 * 1000 * 1000);
+                continue;
+            }
+        }
+        retries = 0;
+
+        g_client_state.is_connected = 1;
+
+        repagent_client_connected();
+        repcmd_listener(g_client_state.hsock, repagent_process_cmd, NULL);
+        close(g_client_state.hsock);
+
+        g_client_state.is_connected = 0;
+    }
+    return 0;
+}
+

+void repagent_process_cmd(RepCmd *pcmd, uint8_t *pdata, void *clientPtr)
+{
+    int is_free_data = 1;
+    printf("Repagent got cmd %d\n", pcmd->hdr.cmdid);
+    switch (pcmd->hdr.cmdid) {
+    case REPHUB_CMD_START_PROTECT: {
+        is_free_data = repaget_start_protect((RepCmdStartProtect *) pcmd,
+                (RepCmdDataStartProtect *) pdata);
+    }
+        break;
+    case REPHUB_CMD_READ_VOL_REQ: {
+        is_free_data = repaget_read_vol((RepCmdReadVolReq *) pcmd, pdata);
+    }
+        break;
+    default:
+        assert(0);
+        break;
+
+    }
+
+    if (is_free_data) {
+        g_free(pdata);
+    }
+}
+

+int repagent_client_send(RepCmd *p_cmd)
+{
+    int bytecount = 0;
+    printf("Send cmd %u, data size %u\n", p_cmd->hdr.cmdid,
+            p_cmd->hdr.data_size_bytes);
+    if (!g_client_state.is_connected) {
+        printf("Not connected to hub\n");
+        return -1;
+    }
+
+    bytecount = send(g_client_state.hsock, p_cmd,
+            sizeof(RepCmd) + p_cmd->hdr.data_size_bytes, 0);
+    if (bytecount < sizeof(RepCmd) + p_cmd->hdr.data_size_bytes) {
+        printf("Bad send %d, errno %d\n", bytecount, errno);
+        return bytecount;
+    }
+
+    /* Success */
+    return 0;
+}
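Illustration only, not part of the patch: on a stream socket send() may accept fewer bytes than requested, and it returns -1 on error, so its result is best kept in a signed type before being compared with a size. A minimal sketch of a "send everything" loop that a function like repagent_client_send() could delegate to; sketch_send_all is a hypothetical name, and the choice to retry on EINTR is an assumption.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Sends exactly len bytes from buf on fd.
 * Returns 0 on success, -1 on error (errno is left as set by send). */
static int sketch_send_all(int fd, const void *buf, size_t len)
{
    const unsigned char *p = buf;
    size_t sent = 0;

    while (sent < len) {
        /* ssize_t keeps the -1 error value distinct from a byte count */
        ssize_t n = send(fd, p + sent, len - sent, 0);
        if (n < 0) {
            if (errno == EINTR) {
                continue;   /* interrupted by a signal: retry */
            }
            return -1;
        }
        sent += (size_t) n;
    }
    return 0;
}
```

With a helper of this kind the caller only has to distinguish 0 from -1, and a short write on a busy connection is no longer treated as a failure.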

diff --git a/replication/repagent_client.h b/replication/repagent_client.h
new file mode 100644
index 0000000..62a5377
--- /dev/null
+++ b/replication/repagent_client.h
@@ -0,0 +1,36 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */

+#ifndef REPAGENT_CLIENT_H
+#define REPAGENT_CLIENT_H
+#include "repcmd.h"
+
+typedef struct rephub_params {
+    char *name;
+    int port;
+} rephub_params;
+
+void *repagent_listen(void *pParam);
+int repagent_client_send(RepCmd *p_cmd);
+
+#endif /* REPAGENT_CLIENT_H */

diff --git a/replication/repcmd.h b/replication/repcmd.h
new file mode 100644
index 0000000..8c6cf1b
--- /dev/null
+++ b/replication/repcmd.h
@@ -0,0 +1,59 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */

= +#ifndef REPCMD_H

+#define REPCMD_H

+

+#include <stdint.h>

+

+#define REPCMD_MAGIC1 (0x1122)

+#define REPCMD_MAGIC2 (0x3344)

+#defi= ne REPCMD_NUM_U32_PARAMS (11)

+

+enum RepCmds {

+=A0=A0=A0 REPCMD_FIRST_INVALID= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =3D 0,

+=A0=A0=A0 REPCMD_FIRST_HUBCMD=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =3D 1,

+=A0=A0= =A0 REPHUB_CMD_PROTECTED_WRITE=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =3D 2= ,

+=A0=A0=A0 REPHUB_CMD_REPORT_VM_VOLUMES=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0 =3D 3,

+=A0=A0=A0 REPHUB_CM= D_START_PROTECT=A0=A0 =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=3D 4,

+=A0=A0=A0 REPHUB_CMD_READ_VOL_REQ=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0 =3D 5,

+=A0=A0=A0 REPHUB_CMD_READ_VOL_RES=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =3D 6,

+=A0=A0=A0 = REPHUB_CMD_AGENT_SHUTDOWN=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =3D 7,<= /p>

+};

+

+typedef struct RepCmdHdr {

+=A0=A0=A0 uint16_t m= agic1;

+=A0=A0=A0 uint16_t cmdid;

+=A0=A0=A0 uint32_t data_size_bytes;

+= } RepCmdHdr;

+

+typedef struct RepCmd {

+=A0=A0=A0 RepCmdHdr hdr;

+=A0=A0=A0 unsigned= int parameters[REPCMD_NUM_U32_PARAMS];

+=A0=A0= =A0 unsigned int magic2;

+=A0=A0=A0 uint8_t data[0];

+} RepCmd;

+

+RepCmd = *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata);

+

+#endif /* REPCMD_H */
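Note on the wire format: the listener in this patch reads a RepCmd straight off the socket into the struct, so the header travels in the sender's native byte order and struct layout. If the agent and the hub could ever run on hosts with different endianness, one option would be an explicit fixed-width encoding. The sketch below only illustrates that idea for RepCmdHdr. The names repcmd_hdr_encode, repcmd_hdr_decode and REPCMD_HDR_WIRE_SIZE are hypothetical and are not part of the patch.

```c
#include <assert.h>
#include <stdint.h>

#define REPCMD_MAGIC1 (0x1122)
#define REPCMD_HDR_WIRE_SIZE 8 /* hypothetical name: 2 + 2 + 4 bytes */

typedef struct RepCmdHdr {
    uint16_t magic1;
    uint16_t cmdid;
    uint32_t data_size_bytes;
} RepCmdHdr;

/* Hypothetical helper: store hdr in buf, most significant byte first. */
static void repcmd_hdr_encode(const RepCmdHdr *hdr,
                              uint8_t buf[REPCMD_HDR_WIRE_SIZE])
{
    buf[0] = (uint8_t)(hdr->magic1 >> 8);
    buf[1] = (uint8_t)(hdr->magic1 & 0xff);
    buf[2] = (uint8_t)(hdr->cmdid >> 8);
    buf[3] = (uint8_t)(hdr->cmdid & 0xff);
    buf[4] = (uint8_t)(hdr->data_size_bytes >> 24);
    buf[5] = (uint8_t)((hdr->data_size_bytes >> 16) & 0xff);
    buf[6] = (uint8_t)((hdr->data_size_bytes >> 8) & 0xff);
    buf[7] = (uint8_t)(hdr->data_size_bytes & 0xff);
}

/* Hypothetical helper: returns 0 on success, -1 on a bad magic value. */
static int repcmd_hdr_decode(const uint8_t buf[REPCMD_HDR_WIRE_SIZE],
                             RepCmdHdr *hdr)
{
    uint16_t magic1 = (uint16_t)((buf[0] << 8) | buf[1]);

    if (magic1 != REPCMD_MAGIC1) {
        return -1;
    }
    hdr->magic1 = magic1;
    hdr->cmdid = (uint16_t)((buf[2] << 8) | buf[3]);
    hdr->data_size_bytes = ((uint32_t)buf[4] << 24) |
                           ((uint32_t)buf[5] << 16) |
                           ((uint32_t)buf[6] << 8) |
                           (uint32_t)buf[7];
    return 0;
}
```

A sender would call repcmd_hdr_encode() before writing the 8 bytes to the socket, and the receiver would call repcmd_hdr_decode() on the bytes it read. The parameters array and magic2 would need the same treatment, since unsigned int has no guaranteed width.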

diff --git a/replication/repcmd_listener.c b/replication/repcmd_listener.c
new file mode 100644
index 0000000..a211927
--- /dev/null
+++ b/replication/repcmd_listener.c
@@ -0,0 +1,137 @@
+#include <fcntl.h>
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <stdio.h>
+#include <netinet/in.h>
+#include <resolv.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <assert.h>
+
+/* Use the CONFIG_REPLICATION flag to determine whether
+ * we're under a qemu build or a hub build. When under
+ * qemu, use g_malloc */
+#ifdef CONFIG_REPLICATION
+#include <glib.h>
+#define REPCMD_MALLOC g_malloc
+#else
+#define REPCMD_MALLOC malloc
+#endif
+
+#include "repcmd.h"
+#include "repcmd_listener.h"
+
+#define ZERO_MEM_OBJ(pObj) memset((void *)pObj, 0, sizeof(*pObj))
+
+typedef struct RepCmdListenerState {
+    int is_terminate_receive;
+} RepCmdListenerState;
+
+static RepCmdListenerState g_listenerState = { 0 };
+

+/* Returns 0 for initiated termination or socket error value on error */
+int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr)
+{
+    RepCmd curCmd;
+    uint8_t *pReadBuf = (uint8_t *) &curCmd;
+    int bytesToGet = sizeof(RepCmd);
+    int bytesGotten = 0;
+    int isGotHeader = 0;
+    uint8_t *pdata = NULL;
+
+    assert(callback != NULL);
+
+    /* receive loop */
+    while (!g_listenerState.is_terminate_receive) {
+        int bytecount;
+
+        bytecount = recv(hsock, pReadBuf + bytesGotten,
+                bytesToGet - bytesGotten, 0);
+        if (bytecount == -1) {
+            fprintf(stderr, "Error receiving data %d\n", errno);
+            return errno;
+        }
+
+        if (bytecount == 0) {
+            printf("Disconnected\n");
+            return 0;
+        }
+        bytesGotten += bytecount;
+/*     printf("Received bytes %d, got %d/%d\n",
+                bytecount, bytesGotten, bytesToGet); */
+        /* print content */
+        if (0) {
+            int i;
+            for (i = 0; i < bytecount ; i += 4) {
+                /*printf("%d/%d", i, bytecount/4); */
+                printf("%#x ",
+                        *(int *) (&pReadBuf[bytesGotten - bytecount + i]));
+
+            }
+            printf("\n");
+        }

+        assert(bytesGotten <= bytesToGet);
+        if (bytesGotten == bytesToGet) {
+            int isGotData = 0;
+            bytesGotten = 0;
+            if (!isGotHeader) {
+                /* We just got the header */
+                isGotHeader = 1;
+
+                assert(curCmd.hdr.magic1 == REPCMD_MAGIC1);
+                assert(curCmd.magic2 == REPCMD_MAGIC2);
+                if (curCmd.hdr.data_size_bytes > 0) {
+                    pdata = (uint8_t *)REPCMD_MALLOC(
+                                curCmd.hdr.data_size_bytes);
+/*                    printf("malloc %p\n", pdata); */
+                    pReadBuf = pdata;
+                } else {
+                    /* no data */
+                    isGotData = 1;
+                    pdata = NULL;
+                }
+                bytesToGet = curCmd.hdr.data_size_bytes;
+            } else {
+                isGotData = 1;
+            }
+
+            if (isGotData) {
+                /* Got command and data */
+                (*callback)(&curCmd, pdata, clientPtr);
+
+                /* It's the callee's responsibility to free pData */
+                pdata = NULL;
+                ZERO_MEM_OBJ(&curCmd);
+                pReadBuf = (uint8_t *) &curCmd;
+                bytesGotten = 0;
+                bytesToGet = sizeof(RepCmd);
+                isGotHeader = 0;
+            }
+        }
+    }
+    return 0;
+}

+
+RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata)
+{
+    RepCmd *p_cmd = (RepCmd *)REPCMD_MALLOC(sizeof(RepCmd) + data_size);
+    assert(p_cmd != NULL);
+
+    /* Zero the CMD (not the data) */
+    ZERO_MEM_OBJ(p_cmd);
+
+    p_cmd->hdr.cmdid = cmd_id;
+    p_cmd->hdr.magic1 = REPCMD_MAGIC1;
+    p_cmd->magic2 = REPCMD_MAGIC2;
+    p_cmd->hdr.data_size_bytes = data_size;
+
+    if (p_out_pdata != NULL) {
+        *p_out_pdata = p_cmd->data;
+    }
+
+    return p_cmd;
+}
+
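Note on the receive path: repcmd_listener() checks the magic values from the peer with assert(), allocates hdr.data_size_bytes bytes without an upper bound, reads errno after fprintf() has run, and returns without freeing pdata if the peer disconnects in the middle of a payload. A more defensive shape might look like the sketch below. It is a sketch only: recv_exact, repcmd_hdr_ok and REPCMD_MAX_DATA_BYTES are hypothetical names, and the 16 MiB limit is an assumed value, not something the patch defines.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define REPCMD_MAGIC1 (0x1122)
#define REPCMD_MAGIC2 (0x3344)
#define REPCMD_MAX_DATA_BYTES (16u * 1024u * 1024u) /* assumed limit */

/* Hypothetical helper, not part of the patch.
 * Returns 0 when len bytes were read, 1 if the peer closed the
 * connection first, -1 on a socket error (errno is left untouched). */
static int recv_exact(int fd, void *buf, size_t len)
{
    uint8_t *p = buf;
    size_t got = 0;

    while (got < len) {
        ssize_t n = recv(fd, p + got, len - got, 0);

        if (n == 0) {
            return 1;               /* orderly shutdown by the peer */
        }
        if (n < 0) {
            if (errno == EINTR) {
                continue;           /* interrupted, retry */
            }
            return -1;
        }
        got += (size_t)n;
    }
    return 0;
}

/* Hypothetical check: 1 if the header fields look valid, 0 otherwise.
 * A caller can drop the connection instead of aborting the process. */
static int repcmd_hdr_ok(uint16_t magic1, unsigned int magic2,
                         uint32_t data_size_bytes)
{
    return magic1 == REPCMD_MAGIC1 && magic2 == REPCMD_MAGIC2 &&
           data_size_bytes <= REPCMD_MAX_DATA_BYTES;
}
```

With helpers like these the loop could read sizeof(RepCmd) bytes, validate them, allocate the payload, read it with a second recv_exact() call, and free the payload on any non-zero return before leaving the function.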

diff --git a/replication/repcmd_listener.h b/replication/repcmd_listener.h
new file mode 100644
index 0000000..c09a12e
--- /dev/null
+++ b/replication/repcmd_listener.h
@@ -0,0 +1,32 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+#ifndef REPCMD_LISTENER_H
+#define REPCMD_LISTENER_H
+#include <stdint.h>
+typedef void (*pfn_received_cmd_cb)(RepCmd *pCmd,
+                uint8_t *pData, void *clientPtr);
+
+int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr);
+
+#endif /* REPCMD_LISTENER_H */
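Note on the callback contract: repcmd_listener() hands each complete command to the pfn_received_cmd_cb callback, and the comment in the listener says the callee is responsible for freeing pData. The buffer comes from REPCMD_MALLOC, which is malloc outside a QEMU build and g_malloc inside one, so the matching release function differs between the two. The sketch below assumes the non-QEMU (hub) build, where free() is the right call. DemoClient and demo_received_cmd are hypothetical names, and the struct definitions are repeated only to keep the example self-contained.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

enum { REPHUB_CMD_START_PROTECT = 4, REPHUB_CMD_READ_VOL_REQ = 5 };

typedef struct RepCmdHdr {
    uint16_t magic1;
    uint16_t cmdid;
    uint32_t data_size_bytes;
} RepCmdHdr;

typedef struct RepCmd {
    RepCmdHdr hdr;
    unsigned int parameters[11];
    unsigned int magic2;
} RepCmd; /* trailing data[] member left out of this sketch */

/* Hypothetical per-connection state, passed through clientPtr. */
typedef struct DemoClient {
    int start_protect_seen;
    int read_req_seen;
    int unknown_seen;
} DemoClient;

/* Hypothetical callback matching pfn_received_cmd_cb. It dispatches
 * on the command id and always releases the payload buffer. */
static void demo_received_cmd(RepCmd *pCmd, uint8_t *pData, void *clientPtr)
{
    DemoClient *client = clientPtr;

    switch (pCmd->hdr.cmdid) {
    case REPHUB_CMD_START_PROTECT:
        client->start_protect_seen++;
        break;
    case REPHUB_CMD_READ_VOL_REQ:
        client->read_req_seen++;
        break;
    default:
        client->unknown_seen++;
        break;
    }

    /* Assumes a malloc-based build; free(NULL) is a no-op, so
     * commands without a payload need no special case. */
    free(pData);
}
```

Inside QEMU the same callback would need g_free() in place of free(), since the listener allocates with g_malloc there.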

diff --git a/replication/rephub_cmds.h b/replication/rephub_cmds.h
new file mode 100644
index 0000000..820c37d
--- /dev/null
+++ b/replication/rephub_cmds.h
@@ -0,0 +1,150 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+#ifndef REPHUB_CMDS_H
+#define REPHUB_CMDS_H
+
+#include <stdint.h>
+#include "repcmd.h"
+#include "rephub_defs.h"
+

+/*********************************************************
+ * RepCmd Report a protected IO
+ *
+ * REPHUB_CMD_PROTECTED_WRITE
+ * Direction: agent->hub
+ *
+ * Any write to a protected volume is sent with this
+ * message to the hub, with its status.
+ * When the status is bad, no data is sent
+ *********************************************************/
+typedef struct RepCmdProtectedWrite {
+    RepCmdHdr hdr;
+    uint64_t volume_id;
+    uint64_t offset_sectors;
+    /* The size field duplicates the RepCmd size,
+     * but it is needed for reporting failed IOs' sizes */
+    uint32_t size_sectors;
+    int ret_status;
+} RepCmdProtectedWrite;
+
+/*********************************************************
+ * RepCmd Report VM volumes
+ *
+ * REPHUB_CMD_REPORT_VM_VOLUMES
+ * Direction: agent->hub
+ *
+ * The agent reports all the volumes of the VM
+ * to the hub.
+ *********************************************************/
+typedef struct RepVmVolumeInfo {
+    char name[REPHUB_MAX_VOL_NAME_LEN];
+    uint64_t volume_id;
+    uint32_t size_mb;
+} RepVmVolumeInfo;
+
+typedef struct RepCmdReportVmVolumes {
+    RepCmdHdr hdr;
+    int num_volumes;
+} RepCmdReportVmVolumes;
+
+typedef struct RepCmdDataReportVmVolumes {
+    RepVmVolumeInfo volumes[0];
+} RepCmdDataReportVmVolumes;
+
+

+/*********************************************************
+ * RepCmd Start protect
+ *
+ * REPHUB_CMD_START_PROTECT
+ * Direction: hub->agent
+ *
+ * The hub instructs the agent to start protecting
+ * a volume. When a volume is protected all its writes
+ * are sent to the hub.
+ * With this command the hub also assigns a volume ID to
+ * the given volume name.
+ *********************************************************/
+typedef struct RepCmdStartProtect {
+    RepCmdHdr hdr;
+    uint64_t volume_id;
+} RepCmdStartProtect;
+
+typedef struct RepCmdDataStartProtect {
+    char volume_name[REPHUB_MAX_VOL_NAME_LEN];
+} RepCmdDataStartProtect;
+
+
+/*********************************************************
+ * RepCmd Read Volume Request
+ *
+ * REPHUB_CMD_READ_VOL_REQ
+ * Direction: hub->agent
+ *
+ * The hub issues a read IO to a protected volume.
+ * This command is used during sync - when the hub needs
+ * to read unsynchronized sections of a protected volume.
+ * This command is a request; the read data is returned
+ * by the response command REPHUB_CMD_READ_VOL_RES
+ *********************************************************/
+typedef struct RepCmdReadVolReq {
+    RepCmdHdr hdr;
+    int req_id;
+    int size_sectors;
+    uint64_t volume_id;
+    uint64_t offset_sectors;
+} RepCmdReadVolReq;
+

+/*********************************************************
+ * RepCmd Read Volume Response
+ *
+ * REPHUB_CMD_READ_VOL_RES
+ * Direction: agent->hub
+ *
+ * A response to REPHUB_CMD_READ_VOL_REQ.
+ * Sends the data read from a protected volume
+ *********************************************************/
+typedef struct RepCmdReadVolRes {
+    RepCmdHdr hdr;
+    int req_id;
+    int is_status_success;
+    uint64_t volume_id;
+} RepCmdReadVolRes;
+
+/*********************************************************
+ * RepCmd Agent shutdown
+ *
+ * REPHUB_CMD_AGENT_SHUTDOWN
+ * Direction: agent->hub
+ *
+ * Notifies the hub that the agent is about to shut down.
+ * This allows a graceful shutdown. Any disconnection
+ * of an agent without sending this command will result
+ * in a full sync of the VM volumes.
+ *********************************************************/
+typedef struct RepCmdAgentShutdown {
+    RepCmdHdr hdr;
+} RepCmdAgentShutdown;
+
+
+#endif /* REPHUB_CMDS_H */
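Note on the command structs: each struct in rephub_cmds.h starts with a RepCmdHdr, and the listener always reads sizeof(RepCmd) bytes per command. That suggests the command-specific fields are meant to occupy the parameters[] area of the generic RepCmd. This is an assumption, since the header does not say so explicitly. Under that assumption, a compile-time size check and a memcpy-based conversion could guard the layout, as sketched below. The helper names repcmd_from_protected_write and repcmd_to_protected_write are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define REPCMD_NUM_U32_PARAMS (11)

typedef struct RepCmdHdr {
    uint16_t magic1;
    uint16_t cmdid;
    uint32_t data_size_bytes;
} RepCmdHdr;

typedef struct RepCmd {
    RepCmdHdr hdr;
    unsigned int parameters[REPCMD_NUM_U32_PARAMS];
    unsigned int magic2;
} RepCmd; /* trailing data[] member left out of this sketch */

typedef struct RepCmdProtectedWrite {
    RepCmdHdr hdr;
    uint64_t volume_id;
    uint64_t offset_sectors;
    uint32_t size_sectors;
    int ret_status;
} RepCmdProtectedWrite;

/* Assumption: a specific command shares storage with RepCmd, so it
 * must end before magic2 or it would overwrite the trailing magic. */
_Static_assert(sizeof(RepCmdProtectedWrite) <= offsetof(RepCmd, magic2),
               "RepCmdProtectedWrite does not fit in RepCmd.parameters");

/* Hypothetical helpers: copy instead of casting, because RepCmd is
 * only guaranteed 4-byte alignment while the specific struct holds
 * uint64_t members. */
static void repcmd_from_protected_write(const RepCmdProtectedWrite *in,
                                        RepCmd *cmd)
{
    memcpy(cmd, in, sizeof(*in));
}

static void repcmd_to_protected_write(const RepCmd *cmd,
                                      RepCmdProtectedWrite *out)
{
    memcpy(out, cmd, sizeof(*out));
}
```

The same _Static_assert line could be repeated for RepCmdReadVolReq, RepCmdReadVolRes and the other command structs, so that a later change to REPCMD_NUM_U32_PARAMS or to a command's fields fails at build time.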

diff --git a/replication/rephub_defs.h b/replication/rephub_defs.h
new file mode 100644
index 0000000..e34e0ce
--- /dev/null
+++ b/replication/rephub_defs.h
@@ -0,0 +1,40 @@
+/*
+ * QEMU System Emulator
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+#ifndef REP_HUB_DEFS_H
+#define REP_HUB_DEFS_H
+
+#include <stdint.h>
+
+#define REPHUB_MAX_VOL_NAME_LEN (1024)
+#define REPHUB_MAX_NUM_VOLUMES (512)
+
+#ifndef TRUE
+    #define TRUE (1)
+#endif
+
+#ifndef FALSE
+    #define FALSE (0)
+#endif
+
+#endif /* REP_HUB_DEFS_H */

diff --git a/vl.c b/vl.c
index 624da0f..506b5dc 100644
--- a/vl.c
+++ b/vl.c
@@ -167,6 +167,7 @@ int main(int argc, char **argv)
 #include "ui/qemu-spice.h"
+#include "replication/repagent.h"
 //#define DEBUG_NET
 //#define DEBUG_SLIRP
@@ -2307,6 +2308,15 @@ int main(int argc, char **argv, char **envp)
                 drive_add(IF_DEFAULT, popt->index - QEMU_OPTION_hda, optarg,
                           HD_OPTS);
                 break;
+            case QEMU_OPTION_repagent:
+#ifdef CONFIG_REPLICATION
+                repagent_init(optarg, 0);
+#else
+                fprintf(stderr, "Replication support is disabled. "
+                    "Don't use -repagent option.\n");
+                exit(1);
+#endif
+                break;
             case QEMU_OPTION_drive:
                 if (drive_def(optarg) == NULL) {
                     exit(1);
--
1.7.6.4

--047d7b3396015fba3f04b85d40a9--