From: Wen Congyang <wency@cn.fujitsu.com>
To: xen devel <xen-devel@lists.xen.org>
Cc: Ian Campbell <Ian.Campbell@citrix.com>,
	Wen Congyang <wency@cn.fujitsu.com>,
	Ian Jackson <Ian.Jackson@eu.citrix.com>,
	Jiang Yunhong <yunhong.jiang@intel.com>,
	Dong Eddie <eddie.dong@intel.com>,
	Yang Hongyang <yanghy@cn.fujitsu.com>,
	Lai Jiangshan <laijs@cn.fujitsu.com>
Subject: [RFC Patch v2 38/45] blktap2: move ramdisk-related code to block-replication.c
Date: Fri, 8 Aug 2014 15:01:37 +0800	[thread overview]
Message-ID: <1407481305-19808-39-git-send-email-wency@cn.fujitsu.com> (raw)
In-Reply-To: <1407481305-19808-1-git-send-email-wency@cn.fujitsu.com>

 Move the ramdisk-related code from block-remus.c into block-replication.c
 and expose it as a small reusable interface: COLO will reuse it.

Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
---
 tools/blktap2/drivers/block-remus.c       | 485 ++----------------------------
 tools/blktap2/drivers/block-replication.c | 452 ++++++++++++++++++++++++++++
 tools/blktap2/drivers/block-replication.h |  48 +++
 3 files changed, 523 insertions(+), 462 deletions(-)
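
Note for reviewers: after this patch the ramdisk state machine lives behind
the small interface added to block-replication.h. The sketch below condenses
how block-remus.c drives that interface after this change; it is not new
code, just the call sequence pulled together in one place. "driver", "s"
(the tdremus_state), "remus_image", and the write parameters (sector,
nb_sectors, buf) stand for the usual tapdisk plumbing, and all error
handling is omitted:

    #include "block-replication.h"

    /* backup start: describe the ramdisk and allocate the per-epoch
     * hashtable that buffers the primary's replicated writes */
    s->ramdisk.sector_size = driver->info.sector_size;
    s->ramdisk.log_prefix = "remus";
    s->ramdisk.image = remus_image;
    ramdisk_init(&s->ramdisk);
    s->h = ramdisk_new_hashtable();

    /* replicated write received: queue it until the checkpoint commits */
    ramdisk_write_to_hashtable(s->h, sector, nb_sectors,
                               driver->info.sector_size, buf, "remus");

    /* checkpoint commit: hand the buffered writes to the ramdisk (s->h is
     * swapped for a fresh, empty hashtable) and start flushing to disk */
    ramdisk_start_flush(&s->ramdisk, &s->h);

    /* before servicing I/O in unprotected mode: kick remaining requests */
    if (ramdisk_writes_inflight(&s->ramdisk))
        ramdisk_flush(&s->ramdisk);

    /* close: tear down both the ramdisk and the write buffer */
    ramdisk_destroy(&s->ramdisk);
    ramdisk_destroy_hashtable(s->h);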

diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c
index 8b6f157..2713af1 100644
--- a/tools/blktap2/drivers/block-remus.c
+++ b/tools/blktap2/drivers/block-remus.c
@@ -37,9 +37,6 @@
 #include "tapdisk-server.h"
 #include "tapdisk-driver.h"
 #include "tapdisk-interface.h"
-#include "hashtable.h"
-#include "hashtable_itr.h"
-#include "hashtable_utility.h"
 #include "block-replication.h"
 
 #include <errno.h>
@@ -58,7 +55,6 @@
 
 /* timeout for reads and writes in ms */
 #define HEARTBEAT_MS 1000
-#define RAMDISK_HASHSIZE 128
 
 /* connect retry timeout (seconds) */
 #define REMUS_CONNRETRY_TIMEOUT 1
@@ -97,51 +93,6 @@ td_vbd_t *device_vbd = NULL;
 td_image_t *remus_image = NULL;
 struct tap_disk tapdisk_remus;
 
-struct ramdisk {
-	size_t sector_size;
-	struct hashtable* h;
-	/* when a ramdisk is flushed, h is given a new empty hash for writes
-	 * while the old ramdisk (prev) is drained asynchronously.
-	 */
-	struct hashtable* prev;
-	/* count of outstanding requests to the base driver */
-	size_t inflight;
-	/* prev holds the requests to be flushed, while inprogress holds
-	 * requests being flushed. When requests complete, they are removed
-	 * from inprogress.
-	 * Whenever a new flush is merged with ongoing flush (i.e, prev),
-	 * we have to make sure that none of the new requests overlap with
-	 * ones in "inprogress". If it does, keep it back in prev and dont issue
-	 * IO until the current one finishes. If we allow this IO to proceed,
-	 * we might end up with two "overlapping" requests in the disk's queue and
-	 * the disk may not offer any guarantee on which one is written first.
-	 * IOW, make sure we dont create a write-after-write time ordering constraint.
-	 * 
-	 */
-	struct hashtable* inprogress;
-};
-
-/* the ramdisk intercepts the original callback for reads and writes.
- * This holds the original data. */
-/* Might be worth making this a static array in struct ramdisk to avoid
- * a malloc per request */
-
-struct tdremus_state;
-
-struct ramdisk_cbdata {
-	td_callback_t cb;
-	void* private;
-	char* buf;
-	struct tdremus_state* state;
-};
-
-struct ramdisk_write_cbdata {
-	struct tdremus_state* state;
-	char* buf;
-};
-
-typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
-
 typedef struct poll_fd {
 	int        fd;
 	event_id_t id;
@@ -167,8 +118,14 @@ struct tdremus_state {
 	 */
 	struct req_ring queued_io;
 
-	/* ramdisk data*/
+	/* ramdisk data */
 	struct ramdisk ramdisk;
+	/*
+	 * The primary write request is queued in this
+	 * hashtable, and will be flushed to ramdisk when
+	 * the checkpoint finishes.
+	 */
+	struct hashtable *h;
 
 	/* mode methods */
 	enum tdremus_mode mode;
@@ -239,404 +196,20 @@ static void ring_add_request(struct req_ring *ring, const td_request_t *treq)
 	ring->prod = ring_next(ring->prod);
 }
 
-/* Prototype declarations */
-static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s);
-
-/* functions to create and sumbit treq's */
-
-static void
-replicated_write_callback(td_request_t treq, int err)
-{
-	struct tdremus_state *s = (struct tdremus_state *) treq.cb_data;
-	td_vbd_request_t *vreq;
-	int i;
-	uint64_t start;
-	vreq = (td_vbd_request_t *) treq.private;
-
-	/* the write failed for now, lets panic. this is very bad */
-	if (err) {
-		RPRINTF("ramdisk write failed, disk image is not consistent\n");
-		exit(-1);
-	}
-
-	/* The write succeeded. let's pull the vreq off whatever request list
-	 * it is on and free() it */
-	list_del(&vreq->next);
-	free(vreq);
-
-	s->ramdisk.inflight--;
-	start = treq.sec;
-	for (i = 0; i < treq.secs; i++) {
-		hashtable_remove(s->ramdisk.inprogress, &start);
-		start++;
-	}
-	free(treq.buf);
-
-	if (!s->ramdisk.inflight && !s->ramdisk.prev) {
-		/* TODO: the ramdisk has been flushed */
-	}
-}
-
-static inline int
-create_write_request(struct tdremus_state *state, td_sector_t sec, int secs, char *buf)
-{
-	td_request_t treq;
-	td_vbd_request_t *vreq;
-
-	treq.op      = TD_OP_WRITE;
-	treq.buf     = buf;
-	treq.sec     = sec;
-	treq.secs    = secs;
-	treq.image   = remus_image;
-	treq.cb      = replicated_write_callback;
-	treq.cb_data = state;
-	treq.id      = 0;
-	treq.sidx    = 0;
-
-	vreq         = calloc(1, sizeof(td_vbd_request_t));
-	treq.private = vreq;
-
-	if(!vreq)
-		return -1;
-
-	vreq->submitting = 1;
-	INIT_LIST_HEAD(&vreq->next);
-	tapdisk_vbd_move_request(treq.private, &device_vbd->pending_requests);
-
-	/* TODO:
-	 * we should probably leave it up to the caller to forward the request */
-	td_forward_request(treq);
-
-	vreq->submitting--;
-
-	return 0;
-}
-
-
-/* http://www.concentric.net/~Ttwang/tech/inthash.htm */
-static unsigned int uint64_hash(void* k)
-{
-	uint64_t key = *(uint64_t*)k;
-
-	key = (~key) + (key << 18);
-	key = key ^ (key >> 31);
-	key = key * 21;
-	key = key ^ (key >> 11);
-	key = key + (key << 6);
-	key = key ^ (key >> 22);
-
-	return (unsigned int)key;
-}
-
-static int rd_hash_equal(void* k1, void* k2)
-{
-	uint64_t key1, key2;
-
-	key1 = *(uint64_t*)k1;
-	key2 = *(uint64_t*)k2;
-
-	return key1 == key2;
-}
-
-static int ramdisk_read(struct ramdisk* ramdisk, uint64_t sector,
-			int nb_sectors, char* buf)
-{
-	int i;
-	char* v;
-	uint64_t key;
-
-	for (i = 0; i < nb_sectors; i++) {
-		key = sector + i;
-		/* check whether it is queued in a previous flush request */
-		if (!(ramdisk->prev && (v = hashtable_search(ramdisk->prev, &key)))) {
-			/* check whether it is an ongoing flush */
-			if (!(ramdisk->inprogress && (v = hashtable_search(ramdisk->inprogress, &key))))
-				return -1;
-		}
-		memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size);
-	}
-
-	return 0;
-}
-
-static int ramdisk_write_hash(struct hashtable* h, uint64_t sector, char* buf,
-			      size_t len)
-{
-	char* v;
-	uint64_t* key;
-
-	if ((v = hashtable_search(h, &sector))) {
-		memcpy(v, buf, len);
-		return 0;
-	}
-
-	if (!(v = malloc(len))) {
-		DPRINTF("ramdisk_write_hash: malloc failed\n");
-		return -1;
-	}
-	memcpy(v, buf, len);
-	if (!(key = malloc(sizeof(*key)))) {
-		DPRINTF("ramdisk_write_hash: error allocating key\n");
-		free(v);
-		return -1;
-	}
-	*key = sector;
-	if (!hashtable_insert(h, key, v)) {
-		DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector);
-		free(key);
-		free(v);
-		return -1;
-	}
-
-	return 0;
-}
-
-static inline int ramdisk_write(struct ramdisk* ramdisk, uint64_t sector,
-				int nb_sectors, char* buf)
-{
-	int i, rc;
-
-	for (i = 0; i < nb_sectors; i++) {
-		rc = ramdisk_write_hash(ramdisk->h, sector + i,
-					buf + i * ramdisk->sector_size,
-					ramdisk->sector_size);
-		if (rc)
-			return rc;
-	}
-
-	return 0;
-}
-
-static int uint64_compare(const void* k1, const void* k2)
-{
-	uint64_t u1 = *(uint64_t*)k1;
-	uint64_t u2 = *(uint64_t*)k2;
-
-	/* u1 - u2 is unsigned */
-	return u1 < u2 ? -1 : u1 > u2 ? 1 : 0;
-}
-
-/* set psectors to an array of the sector numbers in the hash, returning
- * the number of entries (or -1 on error) */
-static int ramdisk_get_sectors(struct hashtable* h, uint64_t** psectors)
-{
-	struct hashtable_itr* itr;
-	uint64_t* sectors;
-	int count;
-
-	if (!(count = hashtable_count(h)))
-		return 0;
-
-	if (!(*psectors = malloc(count * sizeof(uint64_t)))) {
-		DPRINTF("ramdisk_get_sectors: error allocating sector map\n");
-		return -1;
-	}
-	sectors = *psectors;
-
-	itr = hashtable_iterator(h);
-	count = 0;
-	do {
-		sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr);
-	} while (hashtable_iterator_advance(itr));
-	free(itr);
-
-	return count;
-}
-
-/*
-  return -1 for OOM
-  return -2 for merge lookup failure
-  return -3 for WAW race
-  return 0 on success.
-*/
-static int merge_requests(struct ramdisk* ramdisk, uint64_t start,
-			size_t count, char **mergedbuf)
-{
-	char* buf;
-	char* sector;
-	int i;
-	uint64_t *key;
-	int rc = 0;
-
-	if (!(buf = valloc(count * ramdisk->sector_size))) {
-		DPRINTF("merge_request: allocation failed\n");
-		return -1;
-	}
-
-	for (i = 0; i < count; i++) {
-		if (!(sector = hashtable_search(ramdisk->prev, &start))) {
-			DPRINTF("merge_request: lookup failed on %"PRIu64"\n", start);
-			free(buf);
-			rc = -2;
-			goto fail;
-		}
-
-		/* Check inprogress requests to avoid waw non-determinism */
-		if (hashtable_search(ramdisk->inprogress, &start)) {
-			DPRINTF("merge_request: WAR RACE on %"PRIu64"\n", start);
-			free(buf);
-			rc = -3;
-			goto fail;
-		}
-		/* Insert req into inprogress (brief period of duplication of hash entries until
-		 * they are removed from prev. Read tracking would not be reading wrong entries)
-		 */
-		if (!(key = malloc(sizeof(*key)))) {
-			DPRINTF("%s: error allocating key\n", __FUNCTION__);
-			free(buf);			
-			rc = -1;
-			goto fail;
-		}
-		*key = start;
-		if (!hashtable_insert(ramdisk->inprogress, key, NULL)) {
-			DPRINTF("%s failed to insert sector %" PRIu64 " into inprogress hash\n", 
-				__FUNCTION__, start);
-			free(key);
-			free(buf);
-			rc = -1;
-			goto fail;
-		}
-		memcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size);
-		start++;
-	}
-
-	*mergedbuf = buf;
-	return 0;
-fail:
-	for (start--; i >0; i--, start--)
-		hashtable_remove(ramdisk->inprogress, &start);
-	return rc;
-}
-
-/* The underlying driver may not handle having the whole ramdisk queued at
- * once. We queue what we can and let the callbacks attempt to queue more. */
-/* NOTE: may be called from callback, while dd->private still belongs to
- * the underlying driver */
-static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s)
-{
-	uint64_t* sectors;
-	char* buf = NULL;
-	uint64_t base, batchlen;
-	int i, j, count = 0;
-
-	// RPRINTF("ramdisk flush\n");
-
-	if ((count = ramdisk_get_sectors(s->ramdisk.prev, &sectors)) <= 0)
-		return count;
-
-	/* Create the inprogress table if empty */
-	if (!s->ramdisk.inprogress)
-		s->ramdisk.inprogress = create_hashtable(RAMDISK_HASHSIZE,
-							uint64_hash,
-							rd_hash_equal);
-	
-	/*
-	  RPRINTF("ramdisk: flushing %d sectors\n", count);
-	*/
-
-	/* sort and merge sectors to improve disk performance */
-	qsort(sectors, count, sizeof(*sectors), uint64_compare);
-
-	for (i = 0; i < count;) {
-		base = sectors[i++];
-		while (i < count && sectors[i] == sectors[i-1] + 1)
-			i++;
-		batchlen = sectors[i-1] - base + 1;
-
-		j = merge_requests(&s->ramdisk, base, batchlen, &buf);
-			
-		if (j) {
-			RPRINTF("ramdisk_flush: merge_requests failed:%s\n",
-				j == -1? "OOM": (j==-2? "missing sector" : "WAW race"));
-			if (j == -3) continue;
-			free(sectors);
-			return -1;
-		}
-
-		/* NOTE: create_write_request() creates a treq AND forwards it down
-		 * the driver chain */
-		// RPRINTF("forwarding write request at %" PRIu64 ", length: %" PRIu64 "\n", base, batchlen);
-		create_write_request(s, base, batchlen, buf);
-		//RPRINTF("write request at %" PRIu64 ", length: %" PRIu64 " forwarded\n", base, batchlen);
-
-		s->ramdisk.inflight++;
-
-		for (j = 0; j < batchlen; j++) {
-			buf = hashtable_search(s->ramdisk.prev, &base);
-			free(buf);
-			hashtable_remove(s->ramdisk.prev, &base);
-			base++;
-		}
-	}
-
-	if (!hashtable_count(s->ramdisk.prev)) {
-		/* everything is in flight */
-		hashtable_destroy(s->ramdisk.prev, 0);
-		s->ramdisk.prev = NULL;
-	}
-
-	free(sectors);
-
-	// RPRINTF("ramdisk flush done\n");
-	return 0;
-}
-
-/* flush ramdisk contents to disk */
-static int ramdisk_start_flush(td_driver_t *driver)
-{
-	struct tdremus_state *s = (struct tdremus_state *)driver->data;
-	uint64_t* key;
-	char* buf;
-	int rc = 0;
-	int i, j, count, batchlen;
-	uint64_t* sectors;
-
-	if (!hashtable_count(s->ramdisk.h)) {
-		/*
-		  RPRINTF("Nothing to flush\n");
-		*/
-		return 0;
-	}
-
-	if (s->ramdisk.prev) {
-		/* a flush request issued while a previous flush is still in progress
-		 * will merge with the previous request. If you want the previous
-		 * request to be consistent, wait for it to complete. */
-		if ((count = ramdisk_get_sectors(s->ramdisk.h, &sectors)) < 0)
-			return count;
-
-		for (i = 0; i < count; i++) {
-			buf = hashtable_search(s->ramdisk.h, sectors + i);
-			ramdisk_write_hash(s->ramdisk.prev, sectors[i], buf,
-					   s->ramdisk.sector_size);
-		}
-		free(sectors);
-
-		hashtable_destroy (s->ramdisk.h, 1);
-	} else
-		s->ramdisk.prev = s->ramdisk.h;
-
-	/* We create a new hashtable so that new writes can be performed before
-	 * the old hashtable is completely drained. */
-	s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
-					rd_hash_equal);
-
-	return ramdisk_flush(driver, s);
-}
-
-
 static int ramdisk_start(td_driver_t *driver)
 {
 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
 
-	if (s->ramdisk.h) {
+	if (s->h) {
 		RPRINTF("ramdisk already allocated\n");
 		return 0;
 	}
 
 	s->ramdisk.sector_size = driver->info.sector_size;
-	s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
-					rd_hash_equal);
+	s->ramdisk.log_prefix = "remus";
+	s->ramdisk.image = remus_image;
+	ramdisk_init(&s->ramdisk);
+	s->h = ramdisk_new_hashtable();
 
 	DPRINTF("Ramdisk started, %zu bytes/sector\n", s->ramdisk.sector_size);
 
@@ -1024,10 +597,7 @@ static inline int server_writes_inflight(td_driver_t *driver)
 {
 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
 
-	if (!s->ramdisk.inflight && !s->ramdisk.prev)
-		return 0;
-
-	return 1;
+	return ramdisk_writes_inflight(&s->ramdisk);
 }
 
 /* Due to block device prefetching this code may be called on the server side
@@ -1067,13 +637,9 @@ void backup_queue_write(td_driver_t *driver, td_request_t treq)
 static int server_flush(td_driver_t *driver)
 {
 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
-	/*
-	 * Nothing to flush in beginning.
-	 */
-	if (!s->ramdisk.prev)
-		return 0;
+
 	/* Try to flush any remaining requests */
-	return ramdisk_flush(driver, s);
+	return ramdisk_flush(&s->ramdisk);
 }
 
 static int backup_start(td_driver_t *driver)
@@ -1120,7 +686,9 @@ static void server_do_wreq(td_driver_t *driver)
 	if (mread(s->stream_fd.fd, buf, len) < 0)
 		goto err;
 
-	if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) {
+	if (ramdisk_write_to_hashtable(s->h, *sector, *sectors,
+				       driver->info.sector_size, buf,
+				       "remus") < 0) {
 		rc = ERROR_INTERNAL;
 		goto err;
 	}
@@ -1150,7 +718,7 @@ static void server_do_creq(td_driver_t *driver)
 
 	// RPRINTF("committing buffer\n");
 
-	ramdisk_start_flush(driver);
+	ramdisk_start_flush(&s->ramdisk, &s->h);
 
 	/* XXX this message should not be sent until flush completes! */
 	if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4)
@@ -1199,12 +767,7 @@ void unprotected_queue_read(td_driver_t *driver, td_request_t treq)
 
 	/* wait for previous ramdisk to flush  before servicing reads */
 	if (server_writes_inflight(driver)) {
-		/* for now lets just return EBUSY.
-		 * if there are any left-over requests in prev,
-		 * kick em again.
-		 */
-		if(!s->ramdisk.inflight) /* nothing in inprogress */
-			ramdisk_flush(driver, s);
+		ramdisk_flush(&s->ramdisk);
 
 		td_complete_request(treq, -EBUSY);
 	}
@@ -1222,8 +785,7 @@ void unprotected_queue_write(td_driver_t *driver, td_request_t treq)
 	/* wait for previous ramdisk to flush */
 	if (server_writes_inflight(driver)) {
 		RPRINTF("queue_write: waiting for queue to drain");
-		if(!s->ramdisk.inflight) /* nothing in inprogress. Kick prev */
-			ramdisk_flush(driver, s);
+		ramdisk_flush(&s->ramdisk);
 		td_complete_request(treq, -EBUSY);
 	}
 	else {
@@ -1532,9 +1094,8 @@ static int tdremus_close(td_driver_t *driver)
 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
 
 	RPRINTF("closing\n");
-	if (s->ramdisk.inprogress)
-		hashtable_destroy(s->ramdisk.inprogress, 0);
-
+	ramdisk_destroy(&s->ramdisk);
+	ramdisk_destroy_hashtable(s->h);
 	td_replication_connect_kill(&s->t);
 	ctl_unregister(s);
 	ctl_close(s);
diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c
index e4b2679..30eba8f 100644
--- a/tools/blktap2/drivers/block-replication.c
+++ b/tools/blktap2/drivers/block-replication.c
@@ -15,6 +15,10 @@
 
 #include "tapdisk-server.h"
 #include "block-replication.h"
+#include "tapdisk-interface.h"
+#include "hashtable.h"
+#include "hashtable_itr.h"
+#include "hashtable_utility.h"
 
 #include <string.h>
 #include <errno.h>
@@ -30,6 +34,8 @@
 #define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a)
 #define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a)
 
+#define RAMDISK_HASHSIZE 128
+
 /* connection status */
 enum {
 	connection_none,
@@ -466,3 +472,449 @@ static void td_replication_connect_event(event_id_t id, char mode,
 fail:
 	td_replication_client_failed(t, rc);
 }
+
+
+/* I/O replication */
+static void replicated_write_callback(td_request_t treq, int err)
+{
+	ramdisk_t *ramdisk = treq.cb_data;
+	td_vbd_request_t *vreq = treq.private;
+	int i;
+	uint64_t start;
+	const char *log_prefix = ramdisk->log_prefix;
+
+	/* The write failed. For now, let's just panic; this is very bad. */
+	if (err) {
+		EPRINTF("ramdisk write failed, disk image is not consistent\n");
+		exit(-1);
+	}
+
+	/*
+	 * The write succeeded. Let's pull the vreq off whatever request
+	 * list it is on and free() it.
+	 */
+	list_del(&vreq->next);
+	free(vreq);
+
+	ramdisk->inflight--;
+	start = treq.sec;
+	for (i = 0; i < treq.secs; i++) {
+		hashtable_remove(ramdisk->inprogress, &start);
+		start++;
+	}
+	free(treq.buf);
+
+	if (!ramdisk->inflight && ramdisk->prev)
+		ramdisk_flush(ramdisk);
+}
+
+static int
+create_write_request(ramdisk_t *ramdisk, td_sector_t sec, int secs, char *buf)
+{
+	td_request_t treq;
+	td_vbd_request_t *vreq;
+	td_vbd_t *vbd = ramdisk->image->private;
+
+	treq.op      = TD_OP_WRITE;
+	treq.buf     = buf;
+	treq.sec     = sec;
+	treq.secs    = secs;
+	treq.image   = ramdisk->image;
+	treq.cb      = replicated_write_callback;
+	treq.cb_data = ramdisk;
+	treq.id      = 0;
+	treq.sidx    = 0;
+
+	vreq         = calloc(1, sizeof(td_vbd_request_t));
+	treq.private = vreq;
+
+	if (!vreq)
+		return -1;
+
+	vreq->submitting = 1;
+	INIT_LIST_HEAD(&vreq->next);
+	tapdisk_vbd_move_request(treq.private, &vbd->pending_requests);
+
+	td_forward_request(treq);
+
+	vreq->submitting--;
+
+	return 0;
+}
+
+/* http://www.concentric.net/~Ttwang/tech/inthash.htm */
+static unsigned int uint64_hash(void *k)
+{
+	uint64_t key = *(uint64_t*)k;
+
+	key = (~key) + (key << 18);
+	key = key ^ (key >> 31);
+	key = key * 21;
+	key = key ^ (key >> 11);
+	key = key + (key << 6);
+	key = key ^ (key >> 22);
+
+	return (unsigned int)key;
+}
+
+static int rd_hash_equal(void *k1, void *k2)
+{
+	uint64_t key1, key2;
+
+	key1 = *(uint64_t*)k1;
+	key2 = *(uint64_t*)k2;
+
+	return key1 == key2;
+}
+
+static int uint64_compare(const void *k1, const void *k2)
+{
+	uint64_t u1 = *(uint64_t*)k1;
+	uint64_t u2 = *(uint64_t*)k2;
+
+	/* u1 - u2 is unsigned */
+	return u1 < u2 ? -1 : u1 > u2 ? 1 : 0;
+}
+
+/*
+ * set psectors to an array of the sector numbers in the hash, returning
+ * the number of entries (or -1 on error)
+ */
+static int ramdisk_get_sectors(struct hashtable *h, uint64_t **psectors,
+			       const char *log_prefix)
+{
+	struct hashtable_itr* itr;
+	uint64_t* sectors;
+	int count;
+
+	if (!(count = hashtable_count(h)))
+		return 0;
+
+	if (!(*psectors = malloc(count * sizeof(uint64_t)))) {
+		DPRINTF("ramdisk_get_sectors: error allocating sector map\n");
+		return -1;
+	}
+	sectors = *psectors;
+
+	itr = hashtable_iterator(h);
+	count = 0;
+	do {
+		sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr);
+	} while (hashtable_iterator_advance(itr));
+	free(itr);
+
+	return count;
+}
+
+static int ramdisk_write_hash(struct hashtable *h, uint64_t sector, char *buf,
+			      size_t len, const char *log_prefix)
+{
+	char *v;
+	uint64_t *key;
+
+	if ((v = hashtable_search(h, &sector))) {
+		memcpy(v, buf, len);
+		return 0;
+	}
+
+	if (!(v = malloc(len))) {
+		DPRINTF("ramdisk_write_hash: malloc failed\n");
+		return -1;
+	}
+	memcpy(v, buf, len);
+	if (!(key = malloc(sizeof(*key)))) {
+		DPRINTF("ramdisk_write_hash: error allocating key\n");
+		free(v);
+		return -1;
+	}
+	*key = sector;
+	if (!hashtable_insert(h, key, v)) {
+		DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector);
+		free(key);
+		free(v);
+		return -1;
+	}
+
+	return 0;
+}
+
+/*
+ * return -1 for OOM
+ * return -2 for merge lookup failure (should not happen)
+ * return -3 for WAW race
+ * return 0 on success.
+ */
+static int merge_requests(struct ramdisk *ramdisk, uint64_t start,
+			  size_t count, char **mergedbuf)
+{
+	char* buf;
+	char* sector;
+	int i;
+	uint64_t *key;
+	int rc = 0;
+	const char *log_prefix = ramdisk->log_prefix;
+
+	if (!(buf = valloc(count * ramdisk->sector_size))) {
+		DPRINTF("merge_request: allocation failed\n");
+		return -1;
+	}
+
+	for (i = 0; i < count; i++) {
+		if (!(sector = hashtable_search(ramdisk->prev, &start))) {
+			EPRINTF("merge_request: lookup failed on %"PRIu64"\n",
+				start);
+			free(buf);
+			rc = -2;
+			goto fail;
+		}
+
+		/* Check inprogress requests to avoid waw non-determinism */
+		if (hashtable_search(ramdisk->inprogress, &start)) {
+			DPRINTF("merge_request: WAR RACE on %"PRIu64"\n",
+				start);
+			free(buf);
+			rc = -3;
+			goto fail;
+		}
+
+		/*
+		 * Insert req into inprogress. There is a brief period during
+		 * which hash entries are duplicated until they are removed
+		 * from prev; read tracking will not pick up wrong entries.
+		 */
+		if (!(key = malloc(sizeof(*key)))) {
+			EPRINTF("%s: error allocating key\n", __FUNCTION__);
+			free(buf);
+			rc = -1;
+			goto fail;
+		}
+		*key = start;
+		if (!hashtable_insert(ramdisk->inprogress, key, NULL)) {
+			EPRINTF("%s failed to insert sector %" PRIu64 " into inprogress hash\n",
+				__FUNCTION__, start);
+			free(key);
+			free(buf);
+			rc = -1;
+			goto fail;
+		}
+
+		memcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size);
+		start++;
+	}
+
+	*mergedbuf = buf;
+	return 0;
+fail:
+	for (start--; i > 0; i--, start--)
+		hashtable_remove(ramdisk->inprogress, &start);
+	return rc;
+}
+
+int ramdisk_flush(ramdisk_t *ramdisk)
+{
+	uint64_t *sectors;
+	char *buf = NULL;
+	uint64_t base, batchlen;
+	int i, j, count = 0;
+	const char *log_prefix = ramdisk->log_prefix;
+
+	/* everything is in flight */
+	if (!ramdisk->prev)
+		return 0;
+
+	count = ramdisk_get_sectors(ramdisk->prev, &sectors, log_prefix);
+	if (count <= 0)
+		/* should not happen */
+		return count;
+
+	/* Create the inprogress table if empty */
+	if (!ramdisk->inprogress)
+		ramdisk->inprogress = ramdisk_new_hashtable();
+
+	/* sort and merge sectors to improve disk performance */
+	qsort(sectors, count, sizeof(*sectors), uint64_compare);
+
+	for (i = 0; i < count;) {
+		base = sectors[i++];
+		while (i < count && sectors[i] == sectors[i-1] + 1)
+			i++;
+		batchlen = sectors[i-1] - base + 1;
+
+		j = merge_requests(ramdisk, base, batchlen, &buf);
+		if (j) {
+			EPRINTF("ramdisk_flush: merge_requests failed:%s\n",
+				j == -1 ? "OOM" :
+					(j == -2 ? "missing sector" :
+						 "WAW race"));
+			if (j == -3)
+				continue;
+			free(sectors);
+			return -1;
+		}
+
+		/*
+		 * NOTE: create_write_request() creates a treq AND forwards
+		 * it down the driver chain
+		 *
+		 * TODO: handle create_write_request()'s error.
+		 */
+		create_write_request(ramdisk, base, batchlen, buf);
+
+		ramdisk->inflight++;
+
+		for (j = 0; j < batchlen; j++) {
+			buf = hashtable_search(ramdisk->prev, &base);
+			free(buf);
+			hashtable_remove(ramdisk->prev, &base);
+			base++;
+		}
+	}
+
+	if (!hashtable_count(ramdisk->prev)) {
+		/* everything is in flight */
+		hashtable_destroy(ramdisk->prev, 0);
+		ramdisk->prev = NULL;
+	}
+
+	free(sectors);
+	return 0;
+}
+
+int ramdisk_start_flush(ramdisk_t *ramdisk, struct hashtable **new)
+{
+	uint64_t *key;
+	char *buf;
+	int rc = 0;
+	int i, j, count, batchlen;
+	uint64_t *sectors;
+	const char *log_prefix = ramdisk->log_prefix;
+
+	if (!hashtable_count(*new))
+		return 0;
+
+	if (ramdisk->prev) {
+		/*
+		 * a flush request issued while a previous flush is still in
+		 * progress will merge with the previous request. If you want
+		 * the previous request to be consistent, wait for it to
+		 * complete.
+		 */
+		count = ramdisk_get_sectors(*new, &sectors, log_prefix);
+		if (count < 0)
+			return count;
+
+		for (i = 0; i < count; i++) {
+			buf = hashtable_search(*new, sectors + i);
+			ramdisk_write_hash(ramdisk->prev, sectors[i], buf,
+					   ramdisk->sector_size, log_prefix);
+		}
+		free(sectors);
+
+		hashtable_destroy(*new, 1);
+	} else
+		ramdisk->prev = *new;
+
+	/*
+	 * We create a new hashtable so that new writes can be performed before
+	 * the old hashtable is completely drained.
+	 */
+	*new = ramdisk_new_hashtable();
+
+	return ramdisk_flush(ramdisk);
+}
+
+void ramdisk_init(ramdisk_t *ramdisk)
+{
+	ramdisk->inflight = 0;
+	ramdisk->prev = NULL;
+	ramdisk->inprogress = NULL;
+}
+
+void ramdisk_destroy(ramdisk_t *ramdisk)
+{
+	const char *log_prefix = ramdisk->log_prefix;
+
+	/*
+	 * ramdisk_destroy() is called only when we are about to close the
+	 * tapdisk image, so there should be no pending requests in the vbd.
+	 *
+	 * If ramdisk->inflight is not 0, requests that we created are still
+	 * sitting in vbd->pending_requests.
+	 */
+	if (ramdisk->inflight) {
+		/* should not happen */
+		EPRINTF("cannot destroy ramdisk\n");
+		return;
+	}
+
+	if (ramdisk->inprogress) {
+		hashtable_destroy(ramdisk->inprogress, 0);
+		ramdisk->inprogress = NULL;
+	}
+
+	if (ramdisk->prev) {
+		hashtable_destroy(ramdisk->prev, 1);
+		ramdisk->prev = NULL;
+	}
+}
+
+int ramdisk_writes_inflight(ramdisk_t *ramdisk)
+{
+	if (!ramdisk->inflight && !ramdisk->prev)
+		return 0;
+
+	return 1;
+}
+
+int ramdisk_read(struct ramdisk *ramdisk, uint64_t sector,
+		 int nb_sectors, char *buf)
+{
+	int i;
+	char *v;
+	uint64_t key;
+
+	for (i = 0; i < nb_sectors; i++) {
+		key = sector + i;
+		/* check whether it is queued in a previous flush request */
+		if (!(ramdisk->prev &&
+		    (v = hashtable_search(ramdisk->prev, &key)))) {
+			/* check whether it is an ongoing flush */
+			if (!(ramdisk->inprogress &&
+			    (v = hashtable_search(ramdisk->inprogress, &key))))
+				return -1;
+		}
+		memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size);
+	}
+
+	return 0;
+}
+
+struct hashtable *ramdisk_new_hashtable(void)
+{
+	return create_hashtable(RAMDISK_HASHSIZE, uint64_hash, rd_hash_equal);
+}
+
+int ramdisk_write_to_hashtable(struct hashtable *h, uint64_t sector,
+			       int nb_sectors, size_t sector_size, char* buf,
+			       const char *log_prefix)
+{
+	int i, rc;
+
+	for (i = 0; i < nb_sectors; i++) {
+		rc = ramdisk_write_hash(h, sector + i,
+					buf + i * sector_size,
+					sector_size, log_prefix);
+		if (rc)
+			return rc;
+	}
+
+	return 0;
+}
+
+void ramdisk_destroy_hashtable(struct hashtable *h)
+{
+	if (!h)
+		return;
+
+	hashtable_destroy(h, 1);
+}
diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h
index 0bd6e71..fdc216e 100644
--- a/tools/blktap2/drivers/block-replication.h
+++ b/tools/blktap2/drivers/block-replication.h
@@ -110,4 +110,52 @@ int td_replication_server_restart(td_replication_connect_t *t);
  */
 int td_replication_client_start(td_replication_connect_t *t);
 
+/* I/O replication */
+typedef struct ramdisk ramdisk_t;
+struct ramdisk {
+	size_t sector_size;
+	const char *log_prefix;
+	td_image_t *image;
+
+	/* private */
+	/* count of outstanding requests to the base driver */
+	size_t inflight;
+	/* prev holds the requests to be flushed, while inprogress holds
+	 * requests being flushed. When requests complete, they are removed
+	 * from inprogress.
+	 * Whenever a new flush is merged with the ongoing flush (i.e. prev),
+	 * we have to make sure that none of the new requests overlaps with
+	 * ones in "inprogress". If one does, keep it back in prev and don't
+	 * issue IO until the current one finishes. If we allow this IO to
+	 * proceed, we might end up with two "overlapping" requests in the disk's
+	 * queue and the disk may not offer any guarantee on which one is written
+	 * first. IOW, make sure we don't create a write-after-write time ordering constraint.
+	 */
+	struct hashtable *prev;
+	struct hashtable *inprogress;
+};
+
+void ramdisk_init(ramdisk_t *ramdisk);
+void ramdisk_destroy(ramdisk_t *ramdisk);
+
+/* flush pending contents to disk */
+int ramdisk_flush(ramdisk_t *ramdisk);
+/* flush new contents to disk */
+int ramdisk_start_flush(ramdisk_t *ramdisk, struct hashtable **new);
+int ramdisk_writes_inflight(ramdisk_t *ramdisk);
+
+/*
+ * try to read from ramdisk. Return -1 if some sectors are not in
+ * ramdisk. Otherwise, return 0.
+ */
+int ramdisk_read(struct ramdisk *ramdisk, uint64_t sector,
+		 int nb_sectors, char *buf);
+
+/* create a new hashtable that can be used by ramdisk */
+struct hashtable *ramdisk_new_hashtable(void);
+int ramdisk_write_to_hashtable(struct hashtable *h, uint64_t sector,
+			       int nb_sectors, size_t sector_size, char* buf,
+			       const char *log_prefix);
+void ramdisk_destroy_hashtable(struct hashtable *h);
+
 #endif
-- 
1.9.3

Thread overview: 64+ messages
2014-08-08  7:00 [RFC Patch v2 00/45] COarse-grain LOck-stepping Virtual Machines for Non-stop Service Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 01/45] copy the correct page to memory Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 02/45] csum the correct page Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 03/45] don't zero out ioreq page Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 04/45] Refactor domain_suspend_callback_common() Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 05/45] Update libxl__domain_resume() for colo Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 06/45] Update libxl__domain_suspend_common_switch_qemu_logdirty() " Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 07/45] Introduce a new internal API libxl__domain_unpause() Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 08/45] Update libxl__domain_unpause() to support qemu-xen Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 09/45] support to resume uncooperative HVM guests Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 10/45] update datecopier to support sending data only Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 11/45] introduce a new API to aync read data from fd Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 12/45] move remus related codes to libxl_remus.c Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 13/45] rename remus device to checkpoint device Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 14/45] adjust the indentation Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 15/45] don't touch remus in checkpoint_device Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 16/45] Update libxl_save_msgs_gen.pl to support return data from xl to xc Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 17/45] Allow slave sends data to master Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 18/45] secondary vm suspend/resume/checkpoint code Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 19/45] primary vm suspend/get_dirty_pfn/resume/checkpoint code Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 20/45] xc_domain_save: flush cache before calling callbacks->postcopy() in colo mode Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 21/45] COLO: xc related codes Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 22/45] send store mfn and console mfn to xl before resuming secondary vm Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 23/45] implement the cmdline for COLO Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 24/45] HACK: do checkpoint per 20ms Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 25/45] colo: dynamic allocate aio_requests to avoid -EBUSY error Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 26/45] fix memory leak in block-remus Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 27/45] pass uuid to the callback td_open Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 28/45] return the correct dev path Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 29/45] blktap2: use correct way to get remus_image Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 30/45] don't call client_flush() when switching to unprotected mode Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 31/45] remus: fix bug in tdremus_close() Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 32/45] blktap2: use correct way to get free event id Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 33/45] blktap2: don't return negative " Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 34/45] blktap2: use correct way to define array Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 35/45] blktap2: connect to backup asynchronously Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 36/45] switch to unprotected mode before closing Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 37/45] blktap2: move async connect related codes to block-replication.c Wen Congyang
2014-08-08  7:01 ` Wen Congyang [this message]
2014-08-08  7:01 ` [RFC Patch v2 39/45] block-colo: implement colo disk replication Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 40/45] pass correct file to qemu if we use blktap2 Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 41/45] support blktap remus in xl Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 42/45] support blktap colo in xl: Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 43/45] update libxl__device_disk_from_xs_be() to support blktap device Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 44/45] libxl/colo: setup and control disk replication for blktap2 backends Wen Congyang
2014-08-08  7:01 ` [RFC Patch v2 45/45] x86/hvm: Always set pending event injection when loading VMC[BS] state Wen Congyang
2014-08-08  7:24   ` Jan Beulich
2014-08-08  7:29     ` Wen Congyang
2014-08-26 16:02   ` Jan Beulich
2014-08-27  0:46     ` Wen Congyang
2014-08-27 14:58       ` Aravind Gopalakrishnan
2014-08-28  1:04         ` Wen Congyang
2014-08-28  8:54           ` Andrew Cooper
2014-08-28 11:17             ` Wen Congyang
2014-08-28 11:31               ` Paul Durrant
2014-08-29  5:59                 ` Wen Congyang
2014-08-28  9:53         ` Tim Deegan
2014-08-27 23:24     ` Tian, Kevin
2014-08-27 15:02   ` Andrew Cooper
2014-08-08  7:01 ` [RFC Patch v2 46/45] Introduce "xen-load-devices-state" Wen Congyang
2014-08-08  7:19 ` [RFC Patch v2 00/45] COarse-grain LOck-stepping Virtual Machines for Non-stop Service Jan Beulich
2014-08-08  7:39   ` Wen Congyang
2014-08-08  8:21   ` Wen Congyang
2014-08-08  9:02     ` Jan Beulich
