* [Qemu-devel] [RFC PATCH v3 8/9] repagent: Work on review of patch V2 - moved to QTAILQ instead of array, replaced pthreads with qemu-thread
@ 2012-04-05 12:20 Ori Mamluk
0 siblings, 0 replies; only message in thread
From: Ori Mamluk @ 2012-04-05 12:20 UTC (permalink / raw)
To: qemu-devel
Cc: Kevin Wolf, Roni Luxenberg, Stefan Hajnoczi, dlaor,
Anthony Liguori, Oded Kedem, Yair Kuszpet, Paolo Bonzini
[-- Attachment #1: Type: text/plain, Size: 19959 bytes --]
Changed the volumes list to a QTAILQ instead of an array.
Replaced pthreads with qemu-thread.
Fixed the license text.
---
block.c | 9 ---
block/repagent/qemu-repagent.txt | 2 +
block/repagent/repagent.c | 142
++++++++++++++++++--------------------
block/repagent/repagent.h | 5 +-
block/repagent/repagent_client.c | 2 +-
block/repagent/repagent_client.h | 4 +-
block/repagent/repagent_drv.c | 6 +-
block/repagent/repcmd_listener.c | 2 +-
block/repagent/repcmd_listener.h | 2 +-
block/repagent/rephub_cmds.h | 2 +-
block/repagent/rephub_defs.h | 2 +-
11 files changed, 82 insertions(+), 96 deletions(-)
diff --git a/block.c b/block.c
index 8e11c03..b77ca0f 100644
--- a/block.c
+++ b/block.c
@@ -1486,13 +1486,6 @@ static int bdrv_rw_co(BlockDriverState *bs, int64_t
sector_num, uint8_t *buf,
qemu_aio_wait();
}
}
- /* orim todo remove */
- printf("IO done. is_write %d sec %lld size %d ", is_write,
- (long long int) sector_num, nb_sectors);
- if (bs->drv != NULL) {
- printf("Drv %s, ", bs->drv->format_name);
- }
- printf("file %s, ret %d\n", bs->filename, rwco.ret);
return rwco.ret;
}
@@ -1966,8 +1959,6 @@ int64_t bdrv_getlength(BlockDriverState *bs)
ret = drv->bdrv_getlength(bs);
}
}
- /* orim todo remove */
- printf("bdrv_getlength returned %d", (int)ret);
return ret;
}
diff --git a/block/repagent/qemu-repagent.txt
b/block/repagent/qemu-repagent.txt
index 0f9dc03..af1245c 100644
--- a/block/repagent/qemu-repagent.txt
+++ b/block/repagent/qemu-repagent.txt
@@ -1,5 +1,7 @@
repagent - replication agent - a Qemu module for enabling continuous
async replication of VM volumes
+Ori Mamluk, 2012
+
Introduction
This document describes a feature in Qemu - a replication agent (named
Repagent).
The Repagent is a new module that exposes an API to an external
replication system (AKA Rephub).
diff --git a/block/repagent/repagent.c b/block/repagent/repagent.c
index 2e70853..a5c0636 100644
--- a/block/repagent/repagent.c
+++ b/block/repagent/repagent.c
@@ -1,7 +1,7 @@
/*
* QEMU System Emulator replication agent
*
- * Copyright (c) 2003 Fabrice Bellard
+ * Copyright (c) 2012 Ori Mamluk
*
* Permission is hereby granted, free of charge, to any person obtaining a
copy
* of this software and associated documentation files (the "Software"),
to deal
@@ -24,7 +24,6 @@
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
-#include <pthread.h>
#include <stdint.h>
#include "block.h"
@@ -33,21 +32,24 @@
#include "repagent_client.h"
#include "repagent.h"
#include "rephub_cmds.h"
+#include "qemu-queue.h"
+#include "qemu-thread.h"
#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))
-#define REPAGENT_MAX_NUM_VOLUMES (64)
#define REPAGENT_VOLUME_ID_NONE (0)
typedef struct RepagentVolume {
uint64_t vol_id;
- const char *vol_path;
+ char *vol_path;
BlockDriverState *driver_ptr;
+ QTAILQ_ENTRY(RepagentVolume) list;
} RepagentVolume;
struct RepAgentState {
bool is_init;
int num_volumes;
- RepagentVolume *volumes[REPAGENT_MAX_NUM_VOLUMES];
+ QTAILQ_HEAD(RepagentVolumesList, RepagentVolume) volumes;
+ QemuThread disconnectThread;
};
typedef struct RepagentReadVolIo {
@@ -57,9 +59,9 @@ typedef struct RepagentReadVolIo {
struct timeval start_time;
} RepagentReadVolIo;
-static int repagent_get_volume_by_driver(
+static RepagentVolume *repagent_get_volume_by_driver(
BlockDriverState *bs);
-static int repagent_get_volume_by_name(const char *name);
+static RepagentVolume *repagent_get_volume_by_name(const char *name);
static void repagent_report_volumes_to_hub(void);
static void repagent_remote_io_done(void *opaque, int ret);
static struct timeval tsub(struct timeval t1, struct timeval t2);
@@ -68,6 +70,10 @@ RepAgentState g_rep_agent = { 0 };
void repagent_init(const char *hubname, int port)
{
+ QTAILQ_HEAD(RepagentVolumesList, RepagentVolume) tmpHead =
+ QTAILQ_HEAD_INITIALIZER(tmpHead);
+ memcpy(&g_rep_agent.volumes, &tmpHead, sizeof(g_rep_agent.volumes));
+
/* It is the responsibility of the thread to free this struct */
rephub_params *pParams = g_malloc(sizeof(rephub_params));
if (hubname == NULL) {
@@ -82,70 +88,57 @@ void repagent_init(const char *hubname, int port)
pParams->port = port;
pParams->name = g_strdup(hubname);
- pthread_t thread_id = 0;
/* Create the repagent client listener thread */
- pthread_create(&thread_id, 0, repagent_listen, (void *) pParams);
- pthread_detach(thread_id);
+ qemu_thread_create(&g_rep_agent.disconnectThread, repagent_listen,
+ (void *) pParams, QEMU_THREAD_DETACHED);
}
void repagent_register_drive(const char *drive_path,
BlockDriverState *driver_ptr)
{
/* Assert that the volume is not registered yet */
- int i = repagent_get_volume_by_name(drive_path);
- assert(i == -1);
-
- /*Add the volume at the last place*/
- assert(g_rep_agent.num_volumes < REPAGENT_MAX_NUM_VOLUMES);
-
- i = g_rep_agent.num_volumes;
- g_rep_agent.num_volumes++;
+ RepagentVolume *vol = repagent_get_volume_by_name(drive_path);
+ assert(vol == NULL);
printf("zerto repagent: Registering drive. Num drives %d, path %s\n",
g_rep_agent.num_volumes, drive_path);
- g_rep_agent.volumes[i] =
- (RepagentVolume *)g_malloc(sizeof(RepagentVolume));
- g_rep_agent.volumes[i]->driver_ptr = driver_ptr;
+ RepagentVolume *new_vol = g_malloc(sizeof(RepagentVolume));
+ new_vol->driver_ptr = driver_ptr;
/* orim todo strcpy? */
- g_rep_agent.volumes[i]->vol_path = drive_path;
+ new_vol->vol_path = g_strdup(drive_path);
+
+ QTAILQ_INSERT_HEAD(&g_rep_agent.volumes, new_vol, list);
/* Orim todo thread-safety? */
g_rep_agent.num_volumes++;
repagent_report_volumes_to_hub();
}
-void repagent_deregister_drive(const char *drive_path,
- BlockDriverState *driver_ptr)
+void repagent_deregister_drive(BlockDriverState *driver_ptr)
{
/* Orim todo thread-safety? */
- int i = repagent_get_volume_by_driver(driver_ptr);
- assert(i != -1);
-
- RepagentVolume *vol = g_rep_agent.volumes[i];
- /* Put the last volume in the cell of the removed volume to maintain a
- * contiguous array */
- g_rep_agent.volumes[i] = g_rep_agent.volumes[g_rep_agent.num_volumes -
1];
- g_rep_agent.volumes[g_rep_agent.num_volumes - 1] = NULL;
+ RepagentVolume *vol = repagent_get_volume_by_driver(driver_ptr);
+ assert(vol != NULL);
+
+ QTAILQ_REMOVE(&g_rep_agent.volumes, vol, list);
+ g_free(vol->vol_path);
g_rep_agent.num_volumes--;
g_free(vol);
-
}
/* orim todo destruction? */
-static int repagent_get_volume_by_driver(
+static RepagentVolume *repagent_get_volume_by_driver(
BlockDriverState *bs)
{
- /* orim todo optimize search */
- int i = 0;
- for (i = 0; i < g_rep_agent.num_volumes ; i++) {
- RepagentVolume *p_vol = g_rep_agent.volumes[i];
- if (p_vol != NULL && p_vol->driver_ptr == (void *) bs) {
- return i;
+ RepagentVolume *vol = NULL;
+ QTAILQ_FOREACH(vol, &g_rep_agent.volumes, list) {
+ if (vol != NULL && vol->driver_ptr == (void *)bs) {
+ return vol;
}
}
- return -1;
+ return NULL;
}
void repagent_handle_protected_write(BlockDriverState *bs, int64_t
sector_num,
@@ -160,15 +153,13 @@ void repagent_handle_protected_write(BlockDriverState
*bs, int64_t sector_num,
printf("\n");
/* orim todo thread safety? */
- int i = repagent_get_volume_by_driver(bs);
- if (i == -1 || g_rep_agent.volumes[i]->vol_id ==
REPAGENT_VOLUME_ID_NONE) {
+ RepagentVolume *vol = repagent_get_volume_by_driver(bs);
+ if (vol == NULL || vol->vol_id == REPAGENT_VOLUME_ID_NONE) {
/* Unprotected */
printf("Got a write to an unprotected volume.\n");
return;
}
- RepagentVolume *p_vol = g_rep_agent.volumes[i];
-
/* Report IO to rephub */
int data_size = qiov->size;
@@ -184,7 +175,7 @@ void repagent_handle_protected_write(BlockDriverState
*bs, int64_t sector_num,
qemu_iovec_to_buffer(qiov, pdata);
}
- p_cmd->volume_id = p_vol->vol_id;
+ p_cmd->volume_id = vol->vol_id;
p_cmd->offset_sectors = sector_num;
p_cmd->size_sectors = nb_sectors;
p_cmd->ret_status = ret_status;
@@ -197,23 +188,26 @@ void repagent_handle_protected_write(BlockDriverState
*bs, int64_t sector_num,
static void repagent_report_volumes_to_hub(void)
{
/* Report IO to rephub */
- int i;
RepCmdDataReportVmVolumes *p_cmd_data = NULL;
RepCmdReportVmVolumes *p_cmd = (RepCmdReportVmVolumes *) repcmd_new(
REPHUB_CMD_REPORT_VM_VOLUMES,
g_rep_agent.num_volumes * sizeof(RepVmVolumeInfo),
(uint8_t **) &p_cmd_data);
+ RepagentVolume *vol = QTAILQ_FIRST(&g_rep_agent.volumes);
p_cmd->num_volumes = g_rep_agent.num_volumes;
printf("reporting %u volumes\n", g_rep_agent.num_volumes);
+
+ int i;
for (i = 0; i < g_rep_agent.num_volumes ; i++) {
- assert(g_rep_agent.volumes[i] != NULL);
+ assert(vol != NULL);
printf("reporting volume %s size %u\n",
- g_rep_agent.volumes[i]->vol_path,
+ vol->vol_path,
(uint32_t) sizeof(p_cmd_data->volumes[i].name));
strncpy((char *) p_cmd_data->volumes[i].name,
- g_rep_agent.volumes[i]->vol_path,
+ vol->vol_path,
sizeof(p_cmd_data->volumes[i].name));
- p_cmd_data->volumes[i].volume_id = g_rep_agent.volumes[i]->vol_id;
+ p_cmd_data->volumes[i].volume_id = vol->vol_id;
+ vol = QTAILQ_NEXT(vol, list);
}
if (repagent_client_send((RepCmd *) p_cmd) != 0) {
printf("Error sending command\n");
@@ -225,51 +219,49 @@ bool repaget_start_protect(RepCmdStartProtect *pcmd,
{
printf("Start protect vol %s, ID %llu\n", pcmd_data->volume_name,
(unsigned long long) pcmd->volume_id);
- int vol_index = repagent_get_volume_by_name(pcmd_data->volume_name);
+ RepagentVolume *vol =
repagent_get_volume_by_name(pcmd_data->volume_name);
if (g_rep_agent.num_volumes > 0
&& strcmp(pcmd_data->volume_name, "stam") == 0) {
/* Choose the first one for rephub */
- vol_index = 0;
+ vol = QTAILQ_LAST(&g_rep_agent.volumes, RepagentVolumesList);
}
- if (vol_index < 0) {
+ if (vol == NULL) {
printf("The volume doesn't exist\n");
return true;
}
/* orim todo protect */
- g_rep_agent.volumes[vol_index]->vol_id = pcmd->volume_id;
+ vol->vol_id = pcmd->volume_id;
return true;
}
-static int repagent_get_volume_by_name(const char *name)
+static RepagentVolume *repagent_get_volume_by_name(const char *name)
{
- int i = 0;
- for (i = 0; i < g_rep_agent.num_volumes ; i++) {
- if (g_rep_agent.volumes[i] != NULL
- && strcmp(name, g_rep_agent.volumes[i]->vol_path) == 0) {
- return i;
+ RepagentVolume *vol = NULL;
+ QTAILQ_FOREACH(vol, &g_rep_agent.volumes, list) {
+ if (vol != NULL && strcmp(name, vol->vol_path) == 0) {
+ return vol;
}
}
- return -1;
+ return NULL;
}
-static int repagent_get_volume_by_id(uint64_t vol_id)
+static RepagentVolume *repagent_get_volume_by_id(uint64_t vol_id)
{
- int i = 0;
- for (i = 0; i < g_rep_agent.num_volumes ; i++) {
- if (g_rep_agent.volumes[i] != NULL
- && g_rep_agent.volumes[i]->vol_id == vol_id) {
- return i;
+ RepagentVolume *vol = NULL;
+ QTAILQ_FOREACH(vol, &g_rep_agent.volumes, list) {
+ if (vol != NULL && vol->vol_id == vol_id) {
+ return vol;
}
}
- return -1;
+ return NULL;
}
bool repagent_remote_io(RepCmdRemoteIoReq *pcmd, uint8_t *pdata)
{
- int index = repagent_get_volume_by_id(pcmd->volume_id);
+ RepagentVolume *vol = repagent_get_volume_by_id(pcmd->volume_id);
int size_bytes = pcmd->size_sectors * 512;
- if (index < 0) {
+ if (vol == NULL) {
printf("Vol read - Could not find vol id %llx\n",
(unsigned long long int) pcmd->volume_id);
RepCmdRemoteIoRes *p_res_cmd = (RepCmdRemoteIoRes *) repcmd_new(
@@ -282,7 +274,7 @@ bool repagent_remote_io(RepCmdRemoteIoReq *pcmd,
uint8_t *pdata)
}
printf("Vol read - driver %p, volId %llu, offset %llu, size %u\n",
- g_rep_agent.volumes[index]->driver_ptr,
+ vol->driver_ptr,
(unsigned long long int) pcmd->volume_id,
(unsigned long long int) pcmd->offset_sectors,
pcmd->size_sectors);
@@ -296,7 +288,7 @@ bool repagent_remote_io(RepCmdRemoteIoReq *pcmd,
uint8_t *pdata)
qemu_iovec_init(&io_xaction->qiov, 1);
/*read_xact->buf =
- qemu_blockalign(g_rep_agent.volumes[index]->driver_ptr,
size_bytes); */
+ qemu_blockalign(vol->driver_ptr, size_bytes); */
io_xaction->buf = (uint8_t *) g_malloc(size_bytes);
io_xaction->rep_cmd = *pcmd;
qemu_iovec_add(&io_xaction->qiov, io_xaction->buf, size_bytes);
@@ -305,12 +297,12 @@ bool repagent_remote_io(RepCmdRemoteIoReq *pcmd,
uint8_t *pdata)
/* orim TODO - use the returned acb to cancel the request on
shutdown */
/*acb = */
if (pcmd->is_read) {
- bdrv_aio_readv(g_rep_agent.volumes[index]->driver_ptr,
+ bdrv_aio_readv(vol->driver_ptr,
io_xaction->rep_cmd.offset_sectors, &io_xaction->qiov,
io_xaction->rep_cmd.size_sectors,
repagent_remote_io_done,
io_xaction);
} else {
- bdrv_aio_writev(g_rep_agent.volumes[index]->driver_ptr,
+ bdrv_aio_writev(vol->driver_ptr,
io_xaction->rep_cmd.offset_sectors, &io_xaction->qiov,
io_xaction->rep_cmd.size_sectors,
repagent_remote_io_done,
io_xaction);
diff --git a/block/repagent/repagent.h b/block/repagent/repagent.h
index 2863ffc..157a9b6 100644
--- a/block/repagent/repagent.h
+++ b/block/repagent/repagent.h
@@ -1,7 +1,7 @@
/*
* QEMU System Emulator
*
- * Copyright (c) 2003-2008 Fabrice Bellard
+ * Copyright (c) 2012 Ori Mamluk
*
* Permission is hereby granted, free of charge, to any person obtaining a
copy
* of this software and associated documentation files (the "Software"),
to deal
@@ -42,8 +42,7 @@ void repagent_handle_protected_write(BlockDriverState *bs,
int nb_sectors, QEMUIOVector *qiov, int ret_status);
void repagent_register_drive(const char *drive_path,
BlockDriverState *driver_ptr);
-void repagent_deregister_drive(const char *drive_path,
- BlockDriverState *driver_ptr);
+void repagent_deregister_drive(BlockDriverState *driver_ptr);
bool repaget_start_protect(RepCmdStartProtect *pcmd,
RepCmdDataStartProtect *pcmd_data);
bool repagent_remote_io(struct RepCmdRemoteIoReq *pcmd, uint8_t *pdata);
diff --git a/block/repagent/repagent_client.c
b/block/repagent/repagent_client.c
index 9d826c4..2e57ed0 100644
--- a/block/repagent/repagent_client.c
+++ b/block/repagent/repagent_client.c
@@ -1,7 +1,7 @@
/*
* QEMU System Emulator replication agent - socket client
*
- * Copyright (c) 2003 Fabrice Bellard
+ * Copyright (c) 2012 Ori Mamluk
*
* Permission is hereby granted, free of charge, to any person obtaining a
copy
* of this software and associated documentation files (the "Software"),
to deal
diff --git a/block/repagent/repagent_client.h
b/block/repagent/repagent_client.h
index 62a5377..6eaafed 100644
--- a/block/repagent/repagent_client.h
+++ b/block/repagent/repagent_client.h
@@ -1,7 +1,7 @@
/*
- * QEMU System Emulator
+ * QEMU System Emulator - replication agent client
*
- * Copyright (c) 2003-2008 Fabrice Bellard
+ * Copyright (c) 2012 Ori Mamluk
*
* Permission is hereby granted, free of charge, to any person obtaining a
copy
* of this software and associated documentation files (the "Software"),
to deal
diff --git a/block/repagent/repagent_drv.c b/block/repagent/repagent_drv.c
index 4775166..1795de1 100644
--- a/block/repagent/repagent_drv.c
+++ b/block/repagent/repagent_drv.c
@@ -1,7 +1,7 @@
/*
* QEMU System Emulator replication agent - repagent block driver
*
- * Copyright (c) 2003 Fabrice Bellard
+ * Copyright (c) 2012 Ori Mamluk
*
* Permission is hereby granted, free of charge, to any person obtaining a
copy
* of this software and associated documentation files (the "Software"),
to deal
@@ -68,6 +68,7 @@ static int coroutine_fn
repagent_co_writev(BlockDriverState *bs,
static void repagent_close(BlockDriverState *bs)
{
printf("%s\n", __func__);
+ repagent_deregister_drive(bs);
}
static int coroutine_fn repagent_co_flush(BlockDriverState *bs)
@@ -88,11 +89,12 @@ static int repagent_truncate(BlockDriverState *bs,
int64_t offset)
return bdrv_truncate(bs->file, offset);
}
+/* orim todo maybe use probe for planting repagent in every driver */
static int repagent_probe(const uint8_t *buf, int buf_size,
const char *filename)
{
printf("%s\n", __func__);
- return 1; /* everything can be opened as raw image */
+ return 0; /* everything can be opened as raw image */
}
static int coroutine_fn repagent_co_discard(BlockDriverState *bs,
diff --git a/block/repagent/repcmd_listener.c
b/block/repagent/repcmd_listener.c
index e6b4d74..0998f44 100644
--- a/block/repagent/repcmd_listener.c
+++ b/block/repagent/repcmd_listener.c
@@ -1,7 +1,7 @@
/*
* QEMU System Emulator replication agent - socket commands layer
*
- * Copyright (c) 2003 Fabrice Bellard
+ * Copyright (c) 2012 Ori Mamluk
*
* Permission is hereby granted, free of charge, to any person obtaining a
copy
* of this software and associated documentation files (the "Software"),
to deal
diff --git a/block/repagent/repcmd_listener.h
b/block/repagent/repcmd_listener.h
index 19b9ea9..629cdb5 100644
--- a/block/repagent/repcmd_listener.h
+++ b/block/repagent/repcmd_listener.h
@@ -1,7 +1,7 @@
/*
* QEMU System Emulator
*
- * Copyright (c) 2003-2008 Fabrice Bellard
+ * Copyright (c) 2012 Ori Mamluk
*
* Permission is hereby granted, free of charge, to any person obtaining a
copy
* of this software and associated documentation files (the "Software"),
to deal
diff --git a/block/repagent/rephub_cmds.h b/block/repagent/rephub_cmds.h
index cb737e6..5075f33 100644
--- a/block/repagent/rephub_cmds.h
+++ b/block/repagent/rephub_cmds.h
@@ -1,7 +1,7 @@
/*
* QEMU System Emulator
*
- * Copyright (c) 2003-2008 Fabrice Bellard
+ * Copyright (c) 2012 Ori Mamluk
*
* Permission is hereby granted, free of charge, to any person obtaining a
copy
* of this software and associated documentation files (the "Software"),
to deal
diff --git a/block/repagent/rephub_defs.h b/block/repagent/rephub_defs.h
index f036f58..575e9f8 100644
--- a/block/repagent/rephub_defs.h
+++ b/block/repagent/rephub_defs.h
@@ -1,7 +1,7 @@
/*
* QEMU System Emulator
*
- * Copyright (c) 2003-2008 Fabrice Bellard
+ * Copyright (c) 2012 Ori Mamluk
*
* Permission is hereby granted, free of charge, to any person obtaining a
copy
* of this software and associated documentation files (the "Software"),
to deal
--
1.7.6.5
[-- Attachment #2: Type: text/html, Size: 34128 bytes --]
^ permalink raw reply [flat|nested] only message in thread
only message in thread, other threads:[~2012-04-05 12:20 UTC | newest]
Thread overview: (only message) (download: mbox.gz, follow: Atom feed)
-- links below jump to the message on this page --
2012-04-05 12:20 [Qemu-devel] [RFC PATCH v3 8/9] repagent: Work on review of patch V2 - moved to QTAILQ instead of array, replaced pthreads with qemu-thread Ori Mamluk
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).