From: Ori Mamluk <omamluk@zerto.com>
To: qemu-devel@nongnu.org
Cc: Kevin Wolf <kwolf@redhat.com>,
Roni Luxenberg <rluxenbe@redhat.com>,
Stefan Hajnoczi <stefanha@gmail.com>,
dlaor@redhat.com, Anthony Liguori <anthony@codemonkey.ws>,
Oded Kedem <oded@zerto.com>, Yair Kuszpet <yairk@zerto.com>,
Paolo Bonzini <pbonzini@redhat.com>
Subject: [Qemu-devel] [RFC PATCH v3 2/9] repagent: Changed repagent socket to be based on set_fd_handler, converted parsing functions to be non-serial
Date: Thu, 5 Apr 2012 15:17:50 +0300 [thread overview]
Message-ID: <cd0c4796377c415081de2534c4730532@mail.gmail.com> (raw)
[-- Attachment #1: Type: text/plain, Size: 12874 bytes --]
Use qemu_set_fd_handler() instead of a listening thread.
Change the reading/parsing functions to be state-machine based, because they
no longer have their own thread.
---
Makefile | 6 +-
Makefile.objs | 10 +-
configure | 3 +-
replication/repagent_client.c | 28 ++++++-
replication/repcmd_listener.c | 166
+++++++++++++++++++++++++----------------
replication/repcmd_listener.h | 8 +-
6 files changed, 141 insertions(+), 80 deletions(-)
diff --git a/Makefile b/Makefile
index fbd77df..b6379fb 100644
--- a/Makefile
+++ b/Makefile
@@ -156,9 +156,9 @@ tools-obj-y = $(oslib-obj-y) $(trace-obj-y) qemu-tool.o
qemu-timer.o \
qemu-timer-common.o main-loop.o notify.o iohandler.o
cutils.o async.o
tools-obj-$(CONFIG_POSIX) += compatfd.o
-qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y)
$(replication-obj-y)
-qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y)
$(replication-obj-y)
-qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y)
$(replication-obj-y)
+qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y)
+qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y)
+qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y)
qemu-bridge-helper$(EXESUF): qemu-bridge-helper.o
qemu-bridge-helper.o: $(GENERATED_HEADERS)
diff --git a/Makefile.objs b/Makefile.objs
index a28eefb..01413a2 100755
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -30,6 +30,11 @@ block-obj-y += $(coroutine-obj-y) $(qobject-obj-y)
$(version-obj-y)
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
+# Replication agent
+replication-nested-y = repagent_client.o repagent.o repcmd_listener.o
+replication-obj-y = $(addprefix replication/, $(replication-nested-y))
+block-obj-y += $(replication-obj-y)
+
block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o
vpc.o vvfat.o
block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o
qcow2-cache.o
block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
@@ -423,11 +428,6 @@ common-obj-y += qmp-marshal.o qapi-visit.o
qapi-types.o $(qapi-obj-y)
common-obj-y += qmp.o hmp.o
######################################################################
-# replication
-replication-nested-y = repagent_client.o repagent.o repcmd_listener.o
-replication-obj-y = $(addprefix replication/, $(replication-nested-y))
-
-######################################################################
# guest agent
qga-nested-y = commands.o guest-agent-command-state.o
diff --git a/configure b/configure
index f97394f..83b74c2 100755
--- a/configure
+++ b/configure
@@ -2883,7 +2883,6 @@ echo "curses support $curses"
echo "curl support $curl"
echo "mingw32 support $mingw32"
echo "Audio drivers $audio_drv_list"
-echo "Replication $replication"
echo "Extra audio cards $audio_card_list"
echo "Block whitelist $block_drv_whitelist"
echo "Mixer emulation $mixemu"
@@ -3904,3 +3903,5 @@ symlink $source_path/Makefile.user $d/Makefile
if test "$docs" = "yes" ; then
mkdir -p QMP
fi
+
+echo "Replication $replication"
diff --git a/replication/repagent_client.c b/replication/repagent_client.c
index 4dd9ea4..eaa0a28 100644
--- a/replication/repagent_client.c
+++ b/replication/repagent_client.c
@@ -3,6 +3,7 @@
#include "repcmd_listener.h"
#include "repagent_client.h"
#include "repagent.h"
+#include "main-loop.h"
#include <string.h>
#include <stdlib.h>
@@ -26,6 +27,15 @@ typedef struct repagent_client_state {
static repagent_client_state g_client_state = { 0 };
+static void repagent_client_read(void *opaque)
+{
+ printf("repagent_client_read\n");
+ int bytes_read =
repcmd_listener_socket_read_next_buf(g_client_state.hsock);
+ if (bytes_read <= 0) {
+ g_client_state.is_connected = 0;
+ }
+}
+
void *repagent_listen(void *pParam)
{
rephub_params *pServerParams = (rephub_params *) pParam;
@@ -80,13 +90,25 @@ void *repagent_listen(void *pParam)
}
retries = 0;
- g_client_state.is_connected = 1;
repagent_client_connected();
- repcmd_listener(g_client_state.hsock, repagent_process_cmd, NULL);
- close(g_client_state.hsock);
+ repcmd_listener_init(repagent_process_cmd, NULL);
+ g_client_state.is_connected = 1;
+ static int c;
+ /* repcmd_listener_socket_thread_listener(g_client_state.hsock); */
+ qemu_set_fd_handler(g_client_state.hsock, repagent_client_read,
NULL,
+ NULL);
+ while (g_client_state.is_connected) {
+ printf("Connected (%d)...\n", c++);
+ usleep(1 * 1000 * 1000);
+ }
+ /* Unregister */
+ qemu_set_fd_handler(g_client_state.hsock, NULL, NULL, NULL);
+ printf("Disconnected\n");
g_client_state.is_connected = 0;
+ close(g_client_state.hsock);
+
}
return 0;
}
diff --git a/replication/repcmd_listener.c b/replication/repcmd_listener.c
index a211927..c1ce97f 100644
--- a/replication/repcmd_listener.c
+++ b/replication/repcmd_listener.c
@@ -26,93 +26,129 @@
#define ZERO_MEM_OBJ(pObj) memset((void *)pObj, 0, sizeof(*pObj))
+
+typedef struct RepCmdRxCmdState {
+ RepCmd curCmd;
+ uint8_t *pReadBuf;
+ int bytesToGet;
+ int bytesGotten;
+ int isGotHeader;
+ uint8_t *pdata;
+} RepCmdRxCmdState;
+
typedef struct RepCmdListenerState {
int is_terminate_receive;
+ pfn_received_cmd_cb receive_cb;
+ void *opaque;
+ int hsock;
+ RepCmdRxCmdState cur_cmd;
} RepCmdListenerState;
static RepCmdListenerState g_listenerState = { 0 };
-/* Returns 0 for initiated termination or socket error value on error */
-int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void
*clientPtr)
+static int repcmd_listener_process_rx(int bytecount);
+
+void repcmd_listener_init(pfn_received_cmd_cb callback, void *opaque)
{
- RepCmd curCmd;
- uint8_t *pReadBuf = (uint8_t *) &curCmd;
- int bytesToGet = sizeof(RepCmd);
- int bytesGotten = 0;
- int isGotHeader = 0;
- uint8_t *pdata = NULL;
+ ZERO_MEM_OBJ(&g_listenerState);
+ g_listenerState.receive_cb = callback;
+ g_listenerState.opaque = opaque;
- assert(callback != NULL);
+ g_listenerState.cur_cmd.bytesToGet = sizeof(RepCmd);
+ g_listenerState.cur_cmd.pReadBuf =
+ (uint8_t *) &g_listenerState.cur_cmd.curCmd;
+}
+int repcmd_listener_socket_read_next_buf(int hsock)
+{
+ RepCmdRxCmdState *cmd_state = &g_listenerState.cur_cmd;
+ int bytecount = recv(hsock, cmd_state->pReadBuf +
cmd_state->bytesGotten,
+ cmd_state->bytesToGet - cmd_state->bytesGotten, 0);
+ return repcmd_listener_process_rx(bytecount);
+}
+
+/* Returns 0 for initiated termination or socket error value on error */
+int repcmd_listener_socket_thread_listener(int hsock)
+{
+ int ret = 0;
/* receive loop */
while (!g_listenerState.is_terminate_receive) {
- int bytecount;
-
- bytecount = recv(hsock, pReadBuf + bytesGotten,
- bytesToGet - bytesGotten, 0);
- if (bytecount == -1) {
- fprintf(stderr, "Error receiving data %d\n", errno);
- return errno;
+ ret = repcmd_listener_socket_read_next_buf(hsock);
+ if (ret <= 0) {
+ return ret;
}
+ }
+ return 0;
+}
- if (bytecount == 0) {
- printf("Disconnected\n");
- return 0;
- }
- bytesGotten += bytecount;
+static int repcmd_listener_process_rx(int bytecount)
+{
+ RepCmdRxCmdState *cmd_state = &g_listenerState.cur_cmd;
+ if (bytecount == -1) {
+ fprintf(stderr, "Error receiving data %d\n", errno);
+ return errno;
+ }
+
+ if (bytecount == 0) {
+ printf("Disconnected\n");
+ return 0;
+ }
+ cmd_state->bytesGotten += bytecount;
/* printf("Recieved bytes %d, got %d/%d\n",
- bytecount, bytesGotten, bytesToGet); */
- /* print content */
- if (0) {
- int i;
- for (i = 0; i < bytecount ; i += 4) {
- /*printf("%d/%d", i, bytecount/4); */
- printf("%#x ",
- *(int *) (&pReadBuf[bytesGotten - bytecount + i]));
+ bytecount, cmd_state->bytesGotten, cmd_state->bytesToGet); */
+ /* print content */
+ if (0) {
+ int i;
+ for (i = 0; i < bytecount ; i += 4) {
+ /*printf("%d/%d", i, bytecount/4); */
+ printf(
+ "%#x ",
+ *(int *) (&cmd_state->pReadBuf[cmd_state->bytesGotten
+ - bytecount + i]));
- }
- printf("\n");
}
- assert(bytesGotten <= bytesToGet);
- if (bytesGotten == bytesToGet) {
- int isGotData = 0;
- bytesGotten = 0;
- if (!isGotHeader) {
- /* We just got the header */
- isGotHeader = 1;
-
- assert(curCmd.hdr.magic1 == REPCMD_MAGIC1);
- assert(curCmd.magic2 == REPCMD_MAGIC2);
- if (curCmd.hdr.data_size_bytes > 0) {
- pdata = (uint8_t *)REPCMD_MALLOC(
- curCmd.hdr.data_size_bytes);
-/* printf("malloc %p\n", pdata); */
- pReadBuf = pdata;
- } else {
- /* no data */
- isGotData = 1;
- pdata = NULL;
- }
- bytesToGet = curCmd.hdr.data_size_bytes;
+ printf("\n");
+ }
+ assert(cmd_state->bytesGotten <= cmd_state->bytesToGet);
+ if (cmd_state->bytesGotten == cmd_state->bytesToGet) {
+ int isGotData = 0;
+ cmd_state->bytesGotten = 0;
+ if (!cmd_state->isGotHeader) {
+ /* We just got the header */
+ cmd_state->isGotHeader = 1;
+
+ assert(cmd_state->curCmd.hdr.magic1 == REPCMD_MAGIC1);
+ assert(cmd_state->curCmd.magic2 == REPCMD_MAGIC2);
+ if (cmd_state->curCmd.hdr.data_size_bytes > 0) {
+ cmd_state->pdata = (uint8_t *)REPCMD_MALLOC(
+ cmd_state->curCmd.hdr.data_size_bytes);
+/* printf("malloc %p\n", cmd_state->pdata); */
+ cmd_state->pReadBuf = cmd_state->pdata;
} else {
+ /* no data */
isGotData = 1;
+ cmd_state->pdata = NULL;
}
+ cmd_state->bytesToGet = cmd_state->curCmd.hdr.data_size_bytes;
+ } else {
+ isGotData = 1;
+ }
- if (isGotData) {
- /* Got command and data */
- (*callback)(&curCmd, pdata, clientPtr);
-
- /* It's the callee responsibility to free pData */
- pdata = NULL;
- ZERO_MEM_OBJ(&curCmd);
- pReadBuf = (uint8_t *) &curCmd;
- bytesGotten = 0;
- bytesToGet = sizeof(RepCmd);
- isGotHeader = 0;
- }
+ if (isGotData) {
+ /* Got command and data */
+ (*g_listenerState.receive_cb)(&cmd_state->curCmd,
cmd_state->pdata,
+ g_listenerState.opaque);
+
+ /* It's the callee's responsibility to free cmd_state->pdata */
+ cmd_state->pdata = NULL;
+ ZERO_MEM_OBJ(&cmd_state->curCmd);
+ cmd_state->pReadBuf = (uint8_t *) &cmd_state->curCmd;
+ cmd_state->bytesGotten = 0;
+ cmd_state->bytesToGet = sizeof(RepCmd);
+ cmd_state->isGotHeader = 0;
}
}
- return 0;
+ return bytecount;
}
RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata)
diff --git a/replication/repcmd_listener.h b/replication/repcmd_listener.h
index c09a12e..19b9ea9 100644
--- a/replication/repcmd_listener.h
+++ b/replication/repcmd_listener.h
@@ -24,9 +24,11 @@
#ifndef REPCMD_LISTENER_H
#define REPCMD_LISTENER_H
#include <stdint.h>
-typedef void (*pfn_received_cmd_cb)(RepCmd *pCmd,
- uint8_t *pData, void *clientPtr);
+typedef void (*pfn_received_cmd_cb)(RepCmd *pcmd,
+ uint8_t *pdata, void *opaque);
-int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void
*clientPtr);
+void repcmd_listener_init(pfn_received_cmd_cb callback, void *opaque);
+int repcmd_listener_socket_read_next_buf(int hsock);
+int repcmd_listener_socket_thread_listener(int hsock);
#endif /* REPCMD_LISTENER_H */
--
1.7.6.5
[-- Attachment #2: Type: text/html, Size: 22827 bytes --]
reply other threads:[~2012-04-05 12:18 UTC|newest]
Thread overview: [no followups] expand[flat|nested] mbox.gz Atom feed
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=cd0c4796377c415081de2534c4730532@mail.gmail.com \
--to=omamluk@zerto.com \
--cc=anthony@codemonkey.ws \
--cc=dlaor@redhat.com \
--cc=kwolf@redhat.com \
--cc=oded@zerto.com \
--cc=pbonzini@redhat.com \
--cc=qemu-devel@nongnu.org \
--cc=rluxenbe@redhat.com \
--cc=stefanha@gmail.com \
--cc=yairk@zerto.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).