From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from eggs.gnu.org ([208.118.235.92]:57805) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1SFlds-0004uY-1M for qemu-devel@nongnu.org; Thu, 05 Apr 2012 08:18:27 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1SFlde-0002fF-5p for qemu-devel@nongnu.org; Thu, 05 Apr 2012 08:18:17 -0400 Received: from mail-qa0-f52.google.com ([209.85.216.52]:41881) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1SFldd-0002ew-UU for qemu-devel@nongnu.org; Thu, 05 Apr 2012 08:18:06 -0400 Received: by qabg40 with SMTP id g40so802265qab.4 for ; Thu, 05 Apr 2012 05:17:58 -0700 (PDT) From: Ori Mamluk MIME-Version: 1.0 Date: Thu, 5 Apr 2012 15:17:50 +0300 Message-ID: Content-Type: multipart/alternative; boundary=20cf3074b916ce7c3104bced874d Subject: [Qemu-devel] [RFC PATCH v3 2/9] repagent: Changed repagent socket to be based on set_fd_handler, converted parsing functions to be non-serial List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: qemu-devel@nongnu.org Cc: Kevin Wolf , Roni Luxenberg , Stefan Hajnoczi , dlaor@redhat.com, Anthony Liguori , Oded Kedem , Yair Kuszpet , Paolo Bonzini --20cf3074b916ce7c3104bced874d Content-Type: text/plain; charset=ISO-8859-1 Use set_fd_handler instead of a listening thread. Change the reading/parsing function to be state-machine based, because they no longer have their own thread. 
--- Makefile | 6 +- Makefile.objs | 10 +- configure | 3 +- replication/repagent_client.c | 28 ++++++- replication/repcmd_listener.c | 166 +++++++++++++++++++++++++---------------- replication/repcmd_listener.h | 8 +- 6 files changed, 141 insertions(+), 80 deletions(-) diff --git a/Makefile b/Makefile index fbd77df..b6379fb 100644 --- a/Makefile +++ b/Makefile @@ -156,9 +156,9 @@ tools-obj-y = $(oslib-obj-y) $(trace-obj-y) qemu-tool.o qemu-timer.o \ qemu-timer-common.o main-loop.o notify.o iohandler.o cutils.o async.o tools-obj-$(CONFIG_POSIX) += compatfd.o -qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y) -qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y) -qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y) +qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y) +qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y) +qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y) qemu-bridge-helper$(EXESUF): qemu-bridge-helper.o qemu-bridge-helper.o: $(GENERATED_HEADERS) diff --git a/Makefile.objs b/Makefile.objs index a28eefb..01413a2 100755 --- a/Makefile.objs +++ b/Makefile.objs @@ -30,6 +30,11 @@ block-obj-y += $(coroutine-obj-y) $(qobject-obj-y) $(version-obj-y) block-obj-$(CONFIG_POSIX) += posix-aio-compat.o block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o +# Replication agent +replication-nested-y = repagent_client.o repagent.o repcmd_listener.o +replication-obj-y = $(addprefix replication/, $(replication-nested-y)) +block-obj-y += $(replication-obj-y) + block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o @@ -423,11 +428,6 @@ common-obj-y += qmp-marshal.o qapi-visit.o qapi-types.o $(qapi-obj-y) common-obj-y += qmp.o hmp.o 
###################################################################### -# replication -replication-nested-y = repagent_client.o repagent.o repcmd_listener.o -replication-obj-y = $(addprefix replication/, $(replication-nested-y)) - -###################################################################### # guest agent qga-nested-y = commands.o guest-agent-command-state.o diff --git a/configure b/configure index f97394f..83b74c2 100755 --- a/configure +++ b/configure @@ -2883,7 +2883,6 @@ echo "curses support $curses" echo "curl support $curl" echo "mingw32 support $mingw32" echo "Audio drivers $audio_drv_list" -echo "Replication $replication" echo "Extra audio cards $audio_card_list" echo "Block whitelist $block_drv_whitelist" echo "Mixer emulation $mixemu" @@ -3904,3 +3903,5 @@ symlink $source_path/Makefile.user $d/Makefile if test "$docs" = "yes" ; then mkdir -p QMP fi + +echo "Replication $replication" diff --git a/replication/repagent_client.c b/replication/repagent_client.c index 4dd9ea4..eaa0a28 100644 --- a/replication/repagent_client.c +++ b/replication/repagent_client.c @@ -3,6 +3,7 @@ #include "repcmd_listener.h" #include "repagent_client.h" #include "repagent.h" +#include "main-loop.h" #include #include @@ -26,6 +27,15 @@ typedef struct repagent_client_state { static repagent_client_state g_client_state = { 0 }; +static void repagent_client_read(void *opaque) +{ + printf("repagent_client_read\n"); + int bytes_read = repcmd_listener_socket_read_next_buf(g_client_state.hsock); + if (bytes_read <= 0) { + g_client_state.is_connected = 0; + } +} + void *repagent_listen(void *pParam) { rephub_params *pServerParams = (rephub_params *) pParam; @@ -80,13 +90,25 @@ void *repagent_listen(void *pParam) } retries = 0; - g_client_state.is_connected = 1; repagent_client_connected(); - repcmd_listener(g_client_state.hsock, repagent_process_cmd, NULL); - close(g_client_state.hsock); + repcmd_listener_init(repagent_process_cmd, NULL); + g_client_state.is_connected = 1; + 
static int c; + /* repcmd_listener_socket_thread_listener(g_client_state.hsock); */ + qemu_set_fd_handler(g_client_state.hsock, repagent_client_read, NULL, + NULL); + while (g_client_state.is_connected) { + printf("Connected (%d)...\n", c++); + usleep(1 * 1000 * 1000); + } + /* Unregister */ + qemu_set_fd_handler(g_client_state.hsock, NULL, NULL, NULL); + printf("Disconnected\n"); g_client_state.is_connected = 0; + close(g_client_state.hsock); + } return 0; } diff --git a/replication/repcmd_listener.c b/replication/repcmd_listener.c index a211927..c1ce97f 100644 --- a/replication/repcmd_listener.c +++ b/replication/repcmd_listener.c @@ -26,93 +26,129 @@ #define ZERO_MEM_OBJ(pObj) memset((void *)pObj, 0, sizeof(*pObj)) + +typedef struct RepCmdRxCmdState { + RepCmd curCmd; + uint8_t *pReadBuf; + int bytesToGet; + int bytesGotten; + int isGotHeader; + uint8_t *pdata; +} RepCmdRxCmdState; + typedef struct RepCmdListenerState { int is_terminate_receive; + pfn_received_cmd_cb receive_cb; + void *opaque; + int hsock; + RepCmdRxCmdState cur_cmd; } RepCmdListenerState; static RepCmdListenerState g_listenerState = { 0 }; -/* Returns 0 for initiated termination or socket error value on error */ -int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr) +static int repcmd_listener_process_rx(int bytecount); + +void repcmd_listener_init(pfn_received_cmd_cb callback, void *opaque) { - RepCmd curCmd; - uint8_t *pReadBuf = (uint8_t *) &curCmd; - int bytesToGet = sizeof(RepCmd); - int bytesGotten = 0; - int isGotHeader = 0; - uint8_t *pdata = NULL; + ZERO_MEM_OBJ(&g_listenerState); + g_listenerState.receive_cb = callback; + g_listenerState.opaque = opaque; - assert(callback != NULL); + g_listenerState.cur_cmd.bytesToGet = sizeof(RepCmd); + g_listenerState.cur_cmd.pReadBuf = + (uint8_t *) &g_listenerState.cur_cmd.curCmd; +} +int repcmd_listener_socket_read_next_buf(int hsock) +{ + RepCmdRxCmdState *cmd_state = &g_listenerState.cur_cmd; + int bytecount = 
recv(hsock, cmd_state->pReadBuf + cmd_state->bytesGotten, + cmd_state->bytesToGet - cmd_state->bytesGotten, 0); + return repcmd_listener_process_rx(bytecount); +} + +/* Returns 0 for initiated termination or socket error value on error */ +int repcmd_listener_socket_thread_listener(int hsock) +{ + int ret = 0; /* receive loop */ while (!g_listenerState.is_terminate_receive) { - int bytecount; - - bytecount = recv(hsock, pReadBuf + bytesGotten, - bytesToGet - bytesGotten, 0); - if (bytecount == -1) { - fprintf(stderr, "Error receiving data %d\n", errno); - return errno; + ret = repcmd_listener_socket_read_next_buf(hsock); + if (ret <= 0) { + return ret; } + } + return 0; +} - if (bytecount == 0) { - printf("Disconnected\n"); - return 0; - } - bytesGotten += bytecount; +static int repcmd_listener_process_rx(int bytecount) +{ + RepCmdRxCmdState *cmd_state = &g_listenerState.cur_cmd; + if (bytecount == -1) { + fprintf(stderr, "Error receiving data %d\n", errno); + return errno; + } + + if (bytecount == 0) { + printf("Disconnected\n"); + return 0; + } + cmd_state->bytesGotten += bytecount; /* printf("Recieved bytes %d, got %d/%d\n", - bytecount, bytesGotten, bytesToGet); */ - /* print content */ - if (0) { - int i; - for (i = 0; i < bytecount ; i += 4) { - /*printf("%d/%d", i, bytecount/4); */ - printf("%#x ", - *(int *) (&pReadBuf[bytesGotten - bytecount + i])); + bytecount, cmd_state->bytesGotten, cmd_state->bytesToGet); */ + /* print content */ + if (0) { + int i; + for (i = 0; i < bytecount ; i += 4) { + /*printf("%d/%d", i, bytecount/4); */ + printf( + "%#x ", + *(int *) (&cmd_state->pReadBuf[cmd_state->bytesGotten + - bytecount + i])); - } - printf("\n"); } - assert(bytesGotten <= bytesToGet); - if (bytesGotten == bytesToGet) { - int isGotData = 0; - bytesGotten = 0; - if (!isGotHeader) { - /* We just got the header */ - isGotHeader = 1; - - assert(curCmd.hdr.magic1 == REPCMD_MAGIC1); - assert(curCmd.magic2 == REPCMD_MAGIC2); - if (curCmd.hdr.data_size_bytes > 0) 
{ - pdata = (uint8_t *)REPCMD_MALLOC( - curCmd.hdr.data_size_bytes); -/* printf("malloc %p\n", pdata); */ - pReadBuf = pdata; - } else { - /* no data */ - isGotData = 1; - pdata = NULL; - } - bytesToGet = curCmd.hdr.data_size_bytes; + printf("\n"); + } + assert(cmd_state->bytesGotten <= cmd_state->bytesToGet); + if (cmd_state->bytesGotten == cmd_state->bytesToGet) { + int isGotData = 0; + cmd_state->bytesGotten = 0; + if (!cmd_state->isGotHeader) { + /* We just got the header */ + cmd_state->isGotHeader = 1; + + assert(cmd_state->curCmd.hdr.magic1 == REPCMD_MAGIC1); + assert(cmd_state->curCmd.magic2 == REPCMD_MAGIC2); + if (cmd_state->curCmd.hdr.data_size_bytes > 0) { + cmd_state->pdata = (uint8_t *)REPCMD_MALLOC( + cmd_state->curCmd.hdr.data_size_bytes); +/* printf("malloc %p\n", cmd_state->pdata); */ + cmd_state->pReadBuf = cmd_state->pdata; } else { + /* no data */ isGotData = 1; + cmd_state->pdata = NULL; } + cmd_state->bytesToGet = cmd_state->curCmd.hdr.data_size_bytes; + } else { + isGotData = 1; + } - if (isGotData) { - /* Got command and data */ - (*callback)(&curCmd, pdata, clientPtr); - - /* It's the callee responsibility to free pData */ - pdata = NULL; - ZERO_MEM_OBJ(&curCmd); - pReadBuf = (uint8_t *) &curCmd; - bytesGotten = 0; - bytesToGet = sizeof(RepCmd); - isGotHeader = 0; - } + if (isGotData) { + /* Got command and data */ + (*g_listenerState.receive_cb)(&cmd_state->curCmd, cmd_state->pdata, + g_listenerState.opaque); + + /* It's the callee responsibility to free cmd_state->pdata */ + cmd_state->pdata = NULL; + ZERO_MEM_OBJ(&cmd_state->curCmd); + cmd_state->pReadBuf = (uint8_t *) &cmd_state->curCmd; + cmd_state->bytesGotten = 0; + cmd_state->bytesToGet = sizeof(RepCmd); + cmd_state->isGotHeader = 0; } } - return 0; + return bytecount; } RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata) diff --git a/replication/repcmd_listener.h b/replication/repcmd_listener.h index c09a12e..19b9ea9 100644 --- a/replication/repcmd_listener.h +++ 
b/replication/repcmd_listener.h @@ -24,9 +24,11 @@ #ifndef REPCMD_LISTENER_H #define REPCMD_LISTENER_H #include -typedef void (*pfn_received_cmd_cb)(RepCmd *pCmd, - uint8_t *pData, void *clientPtr); +typedef void (*pfn_received_cmd_cb)(RepCmd *pcmd, + uint8_t *pdata, void *opaque); -int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr); +void repcmd_listener_init(pfn_received_cmd_cb callback, void *opaque); +int repcmd_listener_socket_read_next_buf(int hsock); +int repcmd_listener_socket_thread_listener(int hsock); #endif /* REPCMD_LISTENER_H */ -- 1.7.6.5 --20cf3074b916ce7c3104bced874d Content-Type: text/html; charset=ISO-8859-1 Content-Transfer-Encoding: quoted-printable

Use set_fd_handler instead of a listening thread.

Change the reading/parsing functions to be state-machine based, because they no longer have their own thread.

---

Makefile=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 |=A0=A0=A0 6 +-

Makefile.objs=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0 |=A0=A0 10 +-

configure=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 |=A0=A0=A0 3 +-

replication/repagent_client.c |=A0=A0 28 ++++++-

replication/repcmd_listener.c |=A0 166 +++++++++++= ++++++++++++++----------------

replication/repcm= d_listener.h |=A0=A0=A0 8 +-

6 files changed, 141 insertions(+), 80 deletions(-)

=A0

diff --git a/Makef= ile b/Makefile

index fbd77df..b6379fb 100644

<= p class=3D"MsoNormal"> --- a/Makefile

+++ b/Makefile

@@ -156,9 +156,9 @@ tools-obj-y =3D $(oslib-obj-y) $(trace-obj-y) qe= mu-tool.o qemu-timer.o \

=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0 qemu-timer-common.o main-loop.o notify.o iohandler.o = cutils.o async.o

tools-obj-$(CONFIG_POSIX) +=3D compatfd.o

-qemu-img$(EXESUF): qemu-img.o= $(tools-obj-y) $(block-obj-y) $(replication-obj-y)

-qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y) $(replicati= on-obj-y)

-qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(= block-obj-y) $(replication-obj-y)

+qemu-img$(EXES= UF): qemu-img.o $(tools-obj-y) $(block-obj-y)

+qe= mu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y)

+qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(= block-obj-y)

=A0qemu-= bridge-helper$(EXESUF): qemu-bridge-helper.o

qem= u-bridge-helper.o: $(GENERATED_HEADERS)

diff --git a/Makefile.objs b/Makefile.objs

index a28eefb..01413a2 100755

--= - a/Makefile.objs

+++ b/Makefile.objs

@@ -30,6 +30,11 @@ block-obj-y +=3D $(coroutine-obj-y) $(qobject-obj-y) $(v= ersion-obj-y)

block-obj-$(CONFIG_POSIX) +=3D pos= ix-aio-compat.o

block-obj-$(CONFIG_LINUX_AIO) += =3D linux-aio.o

+# Replication agent

=

+replication-nested-y =3D repagent_client.o=A0 repag= ent.o=A0 repcmd_listener.o

+replication-obj-y =3D= $(addprefix replication/, $(replication-nested-y))

+block-obj-y +=3D $(replication-obj-y)

+

block-nested-y +=3D raw.o cow.o= qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o

block-nested-y +=3D qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2= -snapshot.o qcow2-cache.o

block-nested-y +=3D qed.o qed-gencb.o qed-l2-cache.= o qed-table.o qed-cluster.o

@@ -423,11 +428,6 @@ = common-obj-y +=3D qmp-marshal.o qapi-visit.o qapi-types.o $(qapi-obj-y)

=

common-obj-y +=3D qmp.o hmp.o

=A0##############################################################= ########

-# replication

-replication-nested-y =3D repagent_client.o=A0 repagent.o=A0 repcmd_listen= er.o

-replication-obj-y =3D $(addprefix replication/, $(r= eplication-nested-y))

-

-######################################################################

# guest agent

=A0qga= -nested-y =3D commands.o guest-agent-command-state.o

diff --git a/configure b/configure

index f973= 94f..83b74c2 100755

--- a/configure

+++ b/conf= igure

@@ -2883,7 +2883,6 @@ echo "curses sup= port=A0=A0=A0 $curses"

echo "curl supp= ort=A0=A0=A0=A0=A0 $curl"

echo "mingw32 support=A0=A0 $mingw32"

=

echo "Audio drivers=A0=A0=A0=A0 $audio_drv_lis= t"

-echo "Replication=A0=A0=A0=A0=A0 = =A0=A0=A0=A0 $replication"

echo "Extra audio cards $audio_card_list"=

echo "Block whitelist=A0=A0 $block_drv_whi= telist"

echo "Mixer emulation=A0=A0 $m= ixemu"

@@ -3904,3 +3903,5 @@ symlink $source_path/Makefile.= user $d/Makefile

if test "$docs" =3D &= quot;yes" ; then

=A0=A0 mkdir -p QMP

fi

+

+echo "Repli= cation=A0=A0=A0=A0=A0 =A0=A0=A0 $replication"

diff --git a/replication/repagent_client.c b/replication/repagent_client.= c

index 4dd9ea4..eaa0a28 100644

--- a/replication/r= epagent_client.c

+++ b/replication/repagent_clien= t.c

@@ -3,6 +3,7 @@

#i= nclude "repcmd_listener.h"

#include "repagent_client.h"

#include "repagent.h"

+= #include "main-loop.h"

=A0#include <string.h>

#include <stdlib= .h>

@@ -26,6 +27,15 @@ typedef struct repagent= _client_state {

=A0st= atic repagent_client_state g_client_state =3D { 0 };

+static void repagent_cl= ient_read(void *opaque)

+{

+=A0=A0=A0 printf("repagent_client_read\n");

+=A0=A0=A0 int bytes_read =3D repcmd_listener_socket_read_next_bu= f(g_client_state.hsock);

+=A0=A0=A0 if (bytes_read <=3D 0) {

+=A0=A0=A0=A0=A0=A0=A0 g_client_state.is_connected =3D 0;

+=A0=A0=A0 }

+}

+

void *repagent_listen(void *pParam)

{

=A0=A0=A0=A0 rephub_params *pServerParams =3D (rephub_para= ms *) pParam;

@@ -80,13 +90,25 @@ void *repagent_= listen(void *pParam)

=A0=A0=A0=A0=A0=A0=A0=A0 }

=A0=A0=A0=A0=A0=A0=A0=A0 retries =3D 0;

-=A0=A0=A0=A0=A0=A0=A0 g_client_state.is_connected =3D = 1;

=A0=A0=A0=A0=A0=A0= =A0=A0=A0repagent_client_connected();

-=A0=A0=A0=A0=A0=A0=A0 repcmd_listener(g_client_stat= e.hsock, repagent_process_cmd, NULL);

-=A0=A0=A0= =A0=A0=A0=A0 close(g_client_state.hsock);

+=A0=A0= =A0=A0=A0=A0=A0 repcmd_listener_init(repagent_process_cmd, NULL);

+=A0=A0=A0=A0=A0=A0=A0 g_client_state.is_connected = =3D 1;

+=A0=A0=A0=A0=A0=A0=A0 static int c;

+=A0=A0=A0=A0=A0=A0=A0 /* repcmd_listener_socket_threa= d_listener(g_client_state.hsock); */

+=A0=A0=A0=A0=A0=A0=A0 qemu_set_fd_handler(g_client_state.hsock, repagent_c= lient_read, NULL,

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0 NULL);

+=A0=A0=A0=A0=A0=A0=A0 whi= le (g_client_state.is_connected) {

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 printf("Connected (%d)...\n", = c++);

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 usleep(1= * 1000 * 1000);

+=A0=A0=A0=A0=A0=A0=A0 }

+=A0=A0=A0=A0=A0=A0=A0 /* Unregister */

+=A0=A0=A0=A0=A0=A0=A0 qemu_set_fd_handler(g_client_state.hsock, NULL, NULL= , NULL);

+=A0=A0=A0= =A0=A0=A0=A0 printf("Disconnected\n");

= =A0=A0=A0=A0=A0=A0=A0=A0 g_client_state.is_connected =3D 0;

+=A0=A0=A0=A0=A0=A0=A0 close(g_client_state.hsock);<= /p>

+

=A0=A0=A0=A0 }

=A0=A0=A0=A0 return 0;

}

=

diff --git a/replication/repcmd_listener.c b/replica= tion/repcmd_listener.c

index a211927..c1ce97f 100644

--- a/replication/repcmd_listener.c

+++ b/rep= lication/repcmd_listener.c

@@ -26,93 +26,129 @@

=A0#define ZERO_MEM_OBJ(pObj) memset((void *)pO= bj, 0, sizeof(*pObj))

+

+typedef struct RepCmdRxCmdState {

+=A0=A0=A0 RepCmd curCmd;

+=A0=A0=A0 uint8_t *pRe= adBuf;

+=A0=A0=A0 int bytesToGet;

+=A0=A0=A0 int bytesGotten;

+=A0=A0=A0= int isGotHeader;

+=A0=A0=A0 uint8_t *pdata;

+} RepCmdRxCmdState;

+

typedef struct RepCmd= ListenerState {

=A0=A0=A0=A0 int is_terminate_rec= eive;

+=A0=A0=A0 pfn_received_cmd_cb=A0 receive_cb;

+= =A0=A0=A0 void *opaque;

+=A0=A0=A0 int hsock;

=

+=A0=A0=A0 RepCmdRxCmdState cur_cmd;

} RepCmdListenerState;

=A0static RepCmdListener= State g_listenerState =3D { 0 };

-/* Returns 0 for initiated termination or socket error value o= n error */

-int repcmd_listener(int hsock, pfn_received_cmd_cb = callback, void *clientPtr)

+static int repcmd_lis= tener_process_rx(int bytecount);

+

+void repcmd_listener_init(pfn_received_cmd_cb callback, void *opaque)

<= p class=3D"MsoNormal"> {

-=A0=A0=A0 RepCmd curCmd= ;

-=A0=A0=A0 uint8_t *pReadBuf =3D (uint8_t *) &a= mp;curCmd;

-=A0=A0=A0 int bytesToGet =3D sizeof(RepCmd);

-= =A0=A0=A0 int bytesGotten =3D 0;

-=A0=A0=A0 int i= sGotHeader =3D 0;

-=A0=A0=A0 uint8_t *pdata =3D N= ULL;

+=A0=A0=A0 ZERO_MEM_OBJ(&g_listenerState= );

+=A0=A0=A0 g_listenerState.receive_cb =3D callback;<= /p>

+=A0=A0=A0 g_listenerState.opaque =3D opaque;

=

-=A0=A0=A0 assert(callba= ck !=3D NULL);

+=A0=A0=A0 g_listenerState.cur_cmd.bytesToGet =3D sizeof(RepCmd);

+=A0=A0=A0 g_listenerState.cur_cmd.pReadBuf =3D

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 (uint8_t *) &g_liste= nerState.cur_cmd.curCmd;

+}

+int repcmd_listen= er_socket_read_next_buf(int hsock)

+{

+=A0=A0=A0 RepCmdRxCmdState *cmd_state =3D &g_listenerSt= ate.cur_cmd;

+=A0=A0=A0 int bytecount =3D recv(hsock, cmd_state-&= gt;pReadBuf + cmd_state->bytesGotten,

+=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0 cmd_state->bytesToGet - cmd_state->bytesG= otten, 0);

+=A0=A0=A0 return repcmd_listener_process_rx(bytecount);

+}

+

+/* Return= s 0 for initiated termination or socket error value on error */

+int repcmd_listener_socket_thread_listener(int hsock)

+{

+=A0=A0=A0 int ret =3D 0;

=A0=A0=A0=A0 /* receive loop */

=A0= =A0=A0=A0 while (!g_listenerState.is_terminate_receive) {

-=A0=A0=A0=A0=A0=A0=A0 int bytecount;

-

-=A0=A0=A0=A0=A0=A0=A0 bytecount = =3D recv(hsock, pReadBuf + bytesGotten,

-=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 bytesToGet - bytesGotten, 0);

-=A0=A0=A0=A0=A0=A0=A0 if (bytecount =3D=3D -1) {

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 fprintf(stderr, = "Error receiving data %d\n", errno);

-= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 return errno;

+=A0=A0=A0=A0=A0=A0=A0 ret =3D repcmd_listener_socket_read_next_buf(hsock);=

+=A0=A0=A0=A0=A0=A0=A0 if (ret <=3D 0) {

<= p class=3D"MsoNormal">+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 return ret;

=A0=A0=A0=A0=A0=A0=A0=A0 }

+= =A0=A0=A0 }

+=A0 =A0=A0return 0;

+}

-=A0=A0=A0=A0=A0=A0=A0 = if (bytecount =3D=3D 0) {

-=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0 printf("Disconnected\n");

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 return 0;

-=A0= =A0=A0=A0=A0=A0=A0 }

-=A0=A0=A0=A0=A0=A0=A0 bytes= Gotten +=3D bytecount;

+static int repcmd_listene= r_process_rx(int bytecount)

+{

+=A0=A0=A0 RepCmdRxCmdState *cmd_state =3D &g_li= stenerState.cur_cmd;

+=A0=A0=A0 if (bytecount =3D= =3D -1) {

+=A0=A0=A0=A0=A0=A0=A0 fprintf(stderr, = "Error receiving data %d\n", errno);

+=A0=A0=A0=A0=A0=A0=A0 return errno;

+=A0=A0=A0 }

+

+=A0=A0=A0 if (bytecount =3D=3D 0) {

+=A0=A0= =A0=A0=A0=A0=A0 printf("Disconnected\n");

+=A0=A0=A0=A0=A0=A0=A0 return 0;

+=A0=A0=A0 }

+=A0=A0=A0 cmd_state->byte= sGotten +=3D bytecount;

/*=A0=A0=A0=A0 printf(&q= uot;Recieved bytes %d, got %d/%d\n",

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 bytec= ount, bytesGotten, bytesToGet); */

-=A0=A0=A0=A0= =A0=A0=A0 /* print content */

-=A0=A0=A0=A0=A0=A0= =A0 if (0) {

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 i= nt i;

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 for (i =3D 0; i < bytecount ; i +=3D = 4) {

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0 /*printf("%d/%d", i, bytecount/4); */

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 printf("%#x ",<= /p>

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 *(in= t *) (&pReadBuf[bytesGotten - bytecount + i]));

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 bytecount, cmd_state->bytesGotten,= cmd_state->bytesToGet); */

+=A0=A0=A0 /* prin= t content */

+=A0=A0=A0 if (0) {

+=A0= =A0=A0=A0=A0=A0=A0 int i;

+=A0=A0=A0=A0=A0=A0=A0 = for (i =3D 0; i < bytecount ; i +=3D 4) {

+=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 /*printf("%d/%d", i, bytecount/4);= */

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 printf(

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= "%#x ",

+=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 *(int *) (&cmd_state->pReadBuf[cmd_st= ate->bytesGotten

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0 - bytecount + i]));

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 }

-= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 printf("\n");

=A0=A0=A0=A0=A0=A0=A0=A0 }

-=A0=A0=A0=A0=A0=A0=A0 assert(bytesGotten <=3D bytesToGet);

-=A0=A0=A0=A0=A0=A0=A0 if (bytesGotten =3D=3D bytesToGet) {<= /p>

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 int isGotData = =3D 0;

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 bytesGo= tten =3D 0;

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 if (!isGotHeader)= {

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= /* We just got the header */

-=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0 isGotHeader =3D 1;

-<= /p>

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 assert(curCmd.hdr.magic1 =3D= =3D REPCMD_MAGIC1);

-=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0 assert(curCmd.magic2 =3D=3D REPCMD_MAGIC2);

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 if (curCmd.hd= r.data_size_bytes > 0) {

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0 pdata =3D (uint8_t *)REPCMD_MALLOC(

-=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0 curCmd.hdr.data_size_bytes);

-/*= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 printf("mall= oc %p\n", pdata); */

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0 pReadBuf =3D pdata;

-=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0 } else {

-=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 /* no data */

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 isGot= Data =3D 1;

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0 pdata =3D NULL;

-=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0 }

-=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0 bytesToGet =3D curCmd.hdr.data_size_bytes;

+=A0=A0=A0=A0=A0=A0=A0 printf("\n");

+=A0=A0=A0 }

+=A0=A0=A0 as= sert(cmd_state->bytesGotten <=3D cmd_state->bytesToGet);

+=A0=A0=A0 if (cmd_state->bytesGotten =3D=3D cmd_state-= >bytesToGet) {

+=A0=A0=A0=A0=A0=A0=A0 int isGotData =3D 0;

+=A0= =A0=A0=A0=A0=A0=A0 cmd_state->bytesGotten =3D 0;

+=A0=A0=A0=A0=A0=A0=A0 if (!cmd_state->isGotHeader) {

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 /* We just got the header */<= /p>

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 cmd_state->isG= otHeader =3D 1;

+

+=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 assert(cmd_state->curCmd.hdr.magic1 =3D= =3D REPCMD_MAGIC1);

+=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0 assert(cmd_state->curCmd.magic2 =3D=3D REPCMD_MAGIC2);

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 if (cmd_state->= ;curCmd.hdr.data_size_bytes > 0) {

+=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 cmd_state->pdata =3D (uint8_t *)REP= CMD_MALLOC(

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 cmd_state->curCmd.hdr.data_size_bytes)= ;

+/*=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0 printf("malloc %p\n", cmd_state->pdata); */

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 cmd_state-= >pReadBuf =3D cmd_state->pdata;

=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0 } else {

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 /* no= data */

=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0 isGotData =3D 1;

+=A0=A0=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0 cmd_state->pdata =3D NULL;

=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 }

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 cmd_state->bytesToGet =3D cmd_state-&= gt;curCmd.hdr.data_size_bytes;

+=A0=A0=A0=A0=A0= =A0=A0 } else {

+ =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0isGotData =3D 1;

+=A0=A0=A0=A0=A0=A0=A0 }

<= p class=3D"MsoNormal">

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 if (isGotDat= a) {

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0 /* Got command and data */

-=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 (*callback)(&curCmd, pdata, clientPtr);<= /p>

-

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 = /* It's the callee responsibility to free pData */

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 =A0pdata =3D NULL;

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 ZERO_MEM= _OBJ(&curCmd);

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 pRead= Buf =3D (uint8_t *) &curCmd;

-=A0=A0=A0=A0=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 bytesGotten =3D 0;

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 bytesToGet =3D sizeof(RepCm= d);

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 isGotHeader =3D 0;

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 }

+=A0=A0=A0=A0=A0=A0=A0 if (isGotData) {

+=A0= =A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 /* Got command and data */

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 (*g_listenerState.receive_cb)(&= amp;cmd_state->curCmd, cmd_state->pdata,

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0=A0 g_listenerState.opaque);

+

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 /* It's the callee respon= sibility to free cmd_state->pdata */

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 cmd_state->pdata =3D NULL;

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 ZERO_MEM_OBJ(&cmd_st= ate->curCmd);

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0 cmd_state->pReadBuf =3D (uint8_t *) &cmd_state->curCmd;

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 cmd_state->byt= esGotten =3D 0;

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0= =A0 cmd_state->bytesToGet =3D sizeof(RepCmd);

= +=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 cmd_state->isGotHeader =3D 0;

=A0=A0=A0=A0=A0=A0=A0=A0 }

=A0=A0=A0=A0 }

-=A0=A0=A0 return 0;

+=A0=A0= =A0 return bytecount;

}

=A0RepCmd *repcmd_new(int cmd_id, int data_si= ze, uint8_t **p_out_pdata)

diff --git a/replication/repcmd_listener.h b/replica= tion/repcmd_listener.h

index c09a12e..19b9ea9 100= 644

--- a/replication/repcmd_listener.h

+++ b/replication/repcmd_listener.h

@@ -24,9 +24,= 11 @@

#ifndef REPCMD_LISTENER_H

#define REPCMD_LISTENER_H

#include &l= t;stdint.h>

-typedef void (*pfn_received_cmd_cb)(RepCmd *pCmd,

-=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 uin= t8_t *pData, void *clientPtr);

+typedef void (*pf= n_received_cmd_cb)(RepCmd *pcmd,

+=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0=A0 uint8= _t *pdata, void *opaque);

-int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *cl= ientPtr);

+void repcmd_listener_init(pfn_received_cmd_cb callback, void *opaque);

=

+int repcmd_listener_socket_read_next_buf(int hsock)= ;

+int repcmd_listener_socket_thread_listener(int= hsock);

=A0#endif /* REPCMD_LIST= ENER_H */

--

1.7.6.5

--20cf3074b916ce7c3104bced874d--