From mboxrd@z Thu Jan 1 00:00:00 1970 From: Shriram Rajagopalan Subject: Re: [PATCH 15/17] tools: blktap2: move ramdisk related codes to block-replication.c Date: Sun, 19 Oct 2014 22:52:27 -0400 Message-ID: References: <1413252845-23433-1-git-send-email-wency@cn.fujitsu.com> <1413252845-23433-16-git-send-email-wency@cn.fujitsu.com> Reply-To: rshriram@cs.ubc.ca Mime-Version: 1.0 Content-Type: multipart/mixed; boundary="===============3653136825524742269==" Return-path: In-Reply-To: <1413252845-23433-16-git-send-email-wency@cn.fujitsu.com> List-Unsubscribe: , List-Post: List-Help: List-Subscribe: , Sender: xen-devel-bounces@lists.xen.org Errors-To: xen-devel-bounces@lists.xen.org To: Wen Congyang Cc: Lai Jiangshan , Ian Jackson , Jiang Yunhong , Dong Eddie , xen devel , Yang Hongyang , Ian Campbell List-Id: xen-devel@lists.xenproject.org --===============3653136825524742269== Content-Type: multipart/alternative; boundary=089e01183eda1e49750505d1ce3b --089e01183eda1e49750505d1ce3b Content-Type: text/plain; charset=UTF-8 On Oct 13, 2014 10:13 PM, "Wen Congyang" wrote: > > COLO will reuse them > > Signed-off-by: Wen Congyang > Cc: Shriram Rajagopalan > --- > tools/blktap2/drivers/block-remus.c | 480 +----------------------------- > tools/blktap2/drivers/block-replication.c | 460 ++++++++++++++++++++++++++++ > tools/blktap2/drivers/block-replication.h | 65 ++++ > 3 files changed, 539 insertions(+), 466 deletions(-) > > diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c > index 09dc46f..c7b429c 100644 > --- a/tools/blktap2/drivers/block-remus.c > +++ b/tools/blktap2/drivers/block-remus.c > @@ -37,9 +37,6 @@ > #include "tapdisk-server.h" > #include "tapdisk-driver.h" > #include "tapdisk-interface.h" > -#include "hashtable.h" > -#include "hashtable_itr.h" > -#include "hashtable_utility.h" > #include "block-replication.h" > > #include > @@ -58,7 +55,6 @@ > > /* timeout for reads and writes in ms */ > #define HEARTBEAT_MS 1000 > -#define 
RAMDISK_HASHSIZE 128 > > /* connect retry timeout (seconds) */ > #define REMUS_CONNRETRY_TIMEOUT 1 > @@ -97,51 +93,6 @@ td_vbd_t *device_vbd = NULL; > td_image_t *remus_image = NULL; > struct tap_disk tapdisk_remus; > > -struct ramdisk { > - size_t sector_size; > - struct hashtable* h; > - /* when a ramdisk is flushed, h is given a new empty hash for writes > - * while the old ramdisk (prev) is drained asynchronously. > - */ > - struct hashtable* prev; > - /* count of outstanding requests to the base driver */ > - size_t inflight; > - /* prev holds the requests to be flushed, while inprogress holds > - * requests being flushed. When requests complete, they are removed > - * from inprogress. > - * Whenever a new flush is merged with ongoing flush (i.e, prev), > - * we have to make sure that none of the new requests overlap with > - * ones in "inprogress". If it does, keep it back in prev and dont issue > - * IO until the current one finishes. If we allow this IO to proceed, > - * we might end up with two "overlapping" requests in the disk's queue and > - * the disk may not offer any guarantee on which one is written first. > - * IOW, make sure we dont create a write-after-write time ordering constraint. > - * > - */ > - struct hashtable* inprogress; > -}; > - > -/* the ramdisk intercepts the original callback for reads and writes. > - * This holds the original data. */ > -/* Might be worth making this a static array in struct ramdisk to avoid > - * a malloc per request */ > - > -struct tdremus_state; > - > -struct ramdisk_cbdata { > - td_callback_t cb; > - void* private; > - char* buf; > - struct tdremus_state* state; > -}; > - > -struct ramdisk_write_cbdata { > - struct tdremus_state* state; > - char* buf; > -}; > - > -typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq); > - > /* poll_fd type for blktap2 fd system. 
taken from block_log.c */ > typedef struct poll_fd { > int fd; > @@ -168,7 +119,7 @@ struct tdremus_state { > */ > struct req_ring queued_io; > > - /* ramdisk data*/ > + /* ramdisk data */ > struct ramdisk ramdisk; > > /* mode methods */ > @@ -239,404 +190,14 @@ static void ring_add_request(struct req_ring *ring, const td_request_t *treq) > ring->prod = ring_next(ring->prod); > } > > -/* Prototype declarations */ > -static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s); > - > -/* functions to create and sumbit treq's */ > - > -static void > -replicated_write_callback(td_request_t treq, int err) > -{ > - struct tdremus_state *s = (struct tdremus_state *) treq.cb_data; > - td_vbd_request_t *vreq; > - int i; > - uint64_t start; > - vreq = (td_vbd_request_t *) treq.private; > - > - /* the write failed for now, lets panic. this is very bad */ > - if (err) { > - RPRINTF("ramdisk write failed, disk image is not consistent\n"); > - exit(-1); > - } > - > - /* The write succeeded. 
let's pull the vreq off whatever request list > - * it is on and free() it */ > - list_del(&vreq->next); > - free(vreq); > - > - s->ramdisk.inflight--; > - start = treq.sec; > - for (i = 0; i < treq.secs; i++) { > - hashtable_remove(s->ramdisk.inprogress, &start); > - start++; > - } > - free(treq.buf); > - > - if (!s->ramdisk.inflight && !s->ramdisk.prev) { > - /* TODO: the ramdisk has been flushed */ > - } > -} > - > -static inline int > -create_write_request(struct tdremus_state *state, td_sector_t sec, int secs, char *buf) > -{ > - td_request_t treq; > - td_vbd_request_t *vreq; > - > - treq.op = TD_OP_WRITE; > - treq.buf = buf; > - treq.sec = sec; > - treq.secs = secs; > - treq.image = remus_image; > - treq.cb = replicated_write_callback; > - treq.cb_data = state; > - treq.id = 0; > - treq.sidx = 0; > - > - vreq = calloc(1, sizeof(td_vbd_request_t)); > - treq.private = vreq; > - > - if(!vreq) > - return -1; > - > - vreq->submitting = 1; > - INIT_LIST_HEAD(&vreq->next); > - tapdisk_vbd_move_request(treq.private, &device_vbd->pending_requests); > - > - /* TODO: > - * we should probably leave it up to the caller to forward the request */ > - td_forward_request(treq); > - > - vreq->submitting--; > - > - return 0; > -} > - > - > -/* http://www.concentric.net/~Ttwang/tech/inthash.htm */ > -static unsigned int uint64_hash(void* k) > -{ > - uint64_t key = *(uint64_t*)k; > - > - key = (~key) + (key << 18); > - key = key ^ (key >> 31); > - key = key * 21; > - key = key ^ (key >> 11); > - key = key + (key << 6); > - key = key ^ (key >> 22); > - > - return (unsigned int)key; > -} > - > -static int rd_hash_equal(void* k1, void* k2) > -{ > - uint64_t key1, key2; > - > - key1 = *(uint64_t*)k1; > - key2 = *(uint64_t*)k2; > - > - return key1 == key2; > -} > - > -static int ramdisk_read(struct ramdisk* ramdisk, uint64_t sector, > - int nb_sectors, char* buf) > -{ > - int i; > - char* v; > - uint64_t key; > - > - for (i = 0; i < nb_sectors; i++) { > - key = sector + i; > - /* 
check whether it is queued in a previous flush request */ > - if (!(ramdisk->prev && (v = hashtable_search(ramdisk->prev, &key)))) { > - /* check whether it is an ongoing flush */ > - if (!(ramdisk->inprogress && (v = hashtable_search(ramdisk->inprogress, &key)))) > - return -1; > - } > - memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size); > - } > - > - return 0; > -} > - > -static int ramdisk_write_hash(struct hashtable* h, uint64_t sector, char* buf, > - size_t len) > -{ > - char* v; > - uint64_t* key; > - > - if ((v = hashtable_search(h, &sector))) { > - memcpy(v, buf, len); > - return 0; > - } > - > - if (!(v = malloc(len))) { > - DPRINTF("ramdisk_write_hash: malloc failed\n"); > - return -1; > - } > - memcpy(v, buf, len); > - if (!(key = malloc(sizeof(*key)))) { > - DPRINTF("ramdisk_write_hash: error allocating key\n"); > - free(v); > - return -1; > - } > - *key = sector; > - if (!hashtable_insert(h, key, v)) { > - DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector); > - free(key); > - free(v); > - return -1; > - } > - > - return 0; > -} > - > -static inline int ramdisk_write(struct ramdisk* ramdisk, uint64_t sector, > - int nb_sectors, char* buf) > -{ > - int i, rc; > - > - for (i = 0; i < nb_sectors; i++) { > - rc = ramdisk_write_hash(ramdisk->h, sector + i, > - buf + i * ramdisk->sector_size, > - ramdisk->sector_size); > - if (rc) > - return rc; > - } > - > - return 0; > -} > - > -static int uint64_compare(const void* k1, const void* k2) > -{ > - uint64_t u1 = *(uint64_t*)k1; > - uint64_t u2 = *(uint64_t*)k2; > - > - /* u1 - u2 is unsigned */ > - return u1 < u2 ? -1 : u1 > u2 ? 
1 : 0; > -} > - > -/* set psectors to an array of the sector numbers in the hash, returning > - * the number of entries (or -1 on error) */ > -static int ramdisk_get_sectors(struct hashtable* h, uint64_t** psectors) > -{ > - struct hashtable_itr* itr; > - uint64_t* sectors; > - int count; > - > - if (!(count = hashtable_count(h))) > - return 0; > - > - if (!(*psectors = malloc(count * sizeof(uint64_t)))) { > - DPRINTF("ramdisk_get_sectors: error allocating sector map\n"); > - return -1; > - } > - sectors = *psectors; > - > - itr = hashtable_iterator(h); > - count = 0; > - do { > - sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr); > - } while (hashtable_iterator_advance(itr)); > - free(itr); > - > - return count; > -} > - > -/* > - return -1 for OOM > - return -2 for merge lookup failure > - return -3 for WAW race > - return 0 on success. > -*/ > -static int merge_requests(struct ramdisk* ramdisk, uint64_t start, > - size_t count, char **mergedbuf) > -{ > - char* buf; > - char* sector; > - int i; > - uint64_t *key; > - int rc = 0; > - > - if (!(buf = valloc(count * ramdisk->sector_size))) { > - DPRINTF("merge_request: allocation failed\n"); > - return -1; > - } > - > - for (i = 0; i < count; i++) { > - if (!(sector = hashtable_search(ramdisk->prev, &start))) { > - DPRINTF("merge_request: lookup failed on %"PRIu64"\n", start); > - free(buf); > - rc = -2; > - goto fail; > - } > - > - /* Check inprogress requests to avoid waw non-determinism */ > - if (hashtable_search(ramdisk->inprogress, &start)) { > - DPRINTF("merge_request: WAR RACE on %"PRIu64"\n", start); > - free(buf); > - rc = -3; > - goto fail; > - } > - /* Insert req into inprogress (brief period of duplication of hash entries until > - * they are removed from prev. 
Read tracking would not be reading wrong entries) > - */ > - if (!(key = malloc(sizeof(*key)))) { > - DPRINTF("%s: error allocating key\n", __FUNCTION__); > - free(buf); > - rc = -1; > - goto fail; > - } > - *key = start; > - if (!hashtable_insert(ramdisk->inprogress, key, NULL)) { > - DPRINTF("%s failed to insert sector %" PRIu64 " into inprogress hash\n", > - __FUNCTION__, start); > - free(key); > - free(buf); > - rc = -1; > - goto fail; > - } > - memcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size); > - start++; > - } > - > - *mergedbuf = buf; > - return 0; > -fail: > - for (start--; i >0; i--, start--) > - hashtable_remove(ramdisk->inprogress, &start); > - return rc; > -} > - > -/* The underlying driver may not handle having the whole ramdisk queued at > - * once. We queue what we can and let the callbacks attempt to queue more. */ > -/* NOTE: may be called from callback, while dd->private still belongs to > - * the underlying driver */ > -static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s) > -{ > - uint64_t* sectors; > - char* buf = NULL; > - uint64_t base, batchlen; > - int i, j, count = 0; > - > - // RPRINTF("ramdisk flush\n"); > - > - if ((count = ramdisk_get_sectors(s->ramdisk.prev, &sectors)) <= 0) > - return count; > - > - /* Create the inprogress table if empty */ > - if (!s->ramdisk.inprogress) > - s->ramdisk.inprogress = create_hashtable(RAMDISK_HASHSIZE, > - uint64_hash, > - rd_hash_equal); > - > - /* > - RPRINTF("ramdisk: flushing %d sectors\n", count); > - */ > - > - /* sort and merge sectors to improve disk performance */ > - qsort(sectors, count, sizeof(*sectors), uint64_compare); > - > - for (i = 0; i < count;) { > - base = sectors[i++]; > - while (i < count && sectors[i] == sectors[i-1] + 1) > - i++; > - batchlen = sectors[i-1] - base + 1; > - > - j = merge_requests(&s->ramdisk, base, batchlen, &buf); > - > - if (j) { > - RPRINTF("ramdisk_flush: merge_requests failed:%s\n", > - j == -1? "OOM": (j==-2? 
"missing sector" : "WAW race")); > - if (j == -3) continue; > - free(sectors); > - return -1; > - } > - > - /* NOTE: create_write_request() creates a treq AND forwards it down > - * the driver chain */ > - // RPRINTF("forwarding write request at %" PRIu64 ", length: %" PRIu64 "\n", base, batchlen); > - create_write_request(s, base, batchlen, buf); > - //RPRINTF("write request at %" PRIu64 ", length: %" PRIu64 " forwarded\n", base, batchlen); > - > - s->ramdisk.inflight++; > - > - for (j = 0; j < batchlen; j++) { > - buf = hashtable_search(s->ramdisk.prev, &base); > - free(buf); > - hashtable_remove(s->ramdisk.prev, &base); > - base++; > - } > - } > - > - if (!hashtable_count(s->ramdisk.prev)) { > - /* everything is in flight */ > - hashtable_destroy(s->ramdisk.prev, 0); > - s->ramdisk.prev = NULL; > - } > - > - free(sectors); > - > - // RPRINTF("ramdisk flush done\n"); > - return 0; > -} > - > -/* flush ramdisk contents to disk */ > -static int ramdisk_start_flush(td_driver_t *driver) > -{ > - struct tdremus_state *s = (struct tdremus_state *)driver->data; > - uint64_t* key; > - char* buf; > - int rc = 0; > - int i, j, count, batchlen; > - uint64_t* sectors; > - > - if (!hashtable_count(s->ramdisk.h)) { > - /* > - RPRINTF("Nothing to flush\n"); > - */ > - return 0; > - } > - > - if (s->ramdisk.prev) { > - /* a flush request issued while a previous flush is still in progress > - * will merge with the previous request. If you want the previous > - * request to be consistent, wait for it to complete. 
*/ > - if ((count = ramdisk_get_sectors(s->ramdisk.h, &sectors)) < 0) > - return count; > - > - for (i = 0; i < count; i++) { > - buf = hashtable_search(s->ramdisk.h, sectors + i); > - ramdisk_write_hash(s->ramdisk.prev, sectors[i], buf, > - s->ramdisk.sector_size); > - } > - free(sectors); > - > - hashtable_destroy (s->ramdisk.h, 1); > - } else > - s->ramdisk.prev = s->ramdisk.h; > - > - /* We create a new hashtable so that new writes can be performed before > - * the old hashtable is completely drained. */ > - s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash, > - rd_hash_equal); > - > - return ramdisk_flush(driver, s); > -} > - > - > static int ramdisk_start(td_driver_t *driver) > { > struct tdremus_state *s = (struct tdremus_state *)driver->data; > > - if (s->ramdisk.h) { > - RPRINTF("ramdisk already allocated\n"); > - return 0; > - } > - > s->ramdisk.sector_size = driver->info.sector_size; > - s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash, > - rd_hash_equal); > + s->ramdisk.log_prefix = "remus"; > + s->ramdisk.image = remus_image; > + ramdisk_init(&s->ramdisk); > > DPRINTF("Ramdisk started, %zu bytes/sector\n", s->ramdisk.sector_size); > > @@ -917,13 +478,9 @@ static int client_flush(td_driver_t *driver) > static int server_flush(td_driver_t *driver) > { > struct tdremus_state *s = (struct tdremus_state *)driver->data; > - /* > - * Nothing to flush in beginning. 
> - */ > - if (!s->ramdisk.prev) > - return 0; > + > /* Try to flush any remaining requests */ > - return ramdisk_flush(driver, s); > + return ramdisk_flush_pended_requests(&s->ramdisk); > } > > /* It is called when switching the mode from primary to unprotected */ > @@ -1030,10 +587,7 @@ static inline int server_writes_inflight(td_driver_t *driver) > { > struct tdremus_state *s = (struct tdremus_state *)driver->data; > > - if (!s->ramdisk.inflight && !s->ramdisk.prev) > - return 0; > - > - return 1; > + return ramdisk_writes_inflight(&s->ramdisk); > } > > /* Due to block device prefetching this code may be called on the server side > @@ -1116,7 +670,9 @@ static void server_do_wreq(td_driver_t *driver) > if (mread(s->stream_fd.fd, buf, len) < 0) > goto err; > > - if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) { > + if (ramdisk_cache_write_request(&s->ramdisk, *sector, *sectors, > + driver->info.sector_size, buf, > + "remus") < 0) { > rc = ERROR_INTERNAL; > goto err; > } > @@ -1137,7 +693,7 @@ static void server_do_creq(td_driver_t *driver) > > // RPRINTF("committing buffer\n"); > > - ramdisk_start_flush(driver); > + ramdisk_start_flush(&s->ramdisk); > > /* XXX this message should not be sent until flush completes! */ > if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4) > @@ -1184,12 +740,7 @@ void unprotected_queue_read(td_driver_t *driver, td_request_t treq) > > /* wait for previous ramdisk to flush before servicing reads */ > if (server_writes_inflight(driver)) { > - /* for now lets just return EBUSY. > - * if there are any left-over requests in prev, > - * kick em again. 
> - */ > - if(!s->ramdisk.inflight) /* nothing in inprogress */ > - ramdisk_flush(driver, s); > + ramdisk_flush_pended_requests(&s->ramdisk); > > td_complete_request(treq, -EBUSY); > } > @@ -1207,8 +758,7 @@ void unprotected_queue_write(td_driver_t *driver, td_request_t treq) > /* wait for previous ramdisk to flush */ > if (server_writes_inflight(driver)) { > RPRINTF("queue_write: waiting for queue to drain"); > - if(!s->ramdisk.inflight) /* nothing in inprogress. Kick prev */ > - ramdisk_flush(driver, s); > + ramdisk_flush_pended_requests(&s->ramdisk); > td_complete_request(treq, -EBUSY); > } > else { > @@ -1518,9 +1068,7 @@ static int tdremus_close(td_driver_t *driver) > struct tdremus_state *s = (struct tdremus_state *)driver->data; > > RPRINTF("closing\n"); > - if (s->ramdisk.inprogress) > - hashtable_destroy(s->ramdisk.inprogress, 0); > - > + ramdisk_destroy(&s->ramdisk); > td_replication_connect_kill(&s->t); > ctl_unregister(s); > ctl_close(s); > diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c > index e4b2679..82d7609 100644 > --- a/tools/blktap2/drivers/block-replication.c > +++ b/tools/blktap2/drivers/block-replication.c > @@ -15,6 +15,10 @@ > > #include "tapdisk-server.h" > #include "block-replication.h" > +#include "tapdisk-interface.h" > +#include "hashtable.h" > +#include "hashtable_itr.h" > +#include "hashtable_utility.h" > > #include > #include > @@ -30,6 +34,8 @@ > #define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a) > #define EPRINTF(_f, _a...) 
syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a) > > +#define RAMDISK_HASHSIZE 128 > + > /* connection status */ > enum { > connection_none, > @@ -466,3 +472,457 @@ static void td_replication_connect_event(event_id_t id, char mode, > fail: > td_replication_client_failed(t, rc); > } > + > + > +/* I/O replication */ > +static void replicated_write_callback(td_request_t treq, int err) > +{ > + ramdisk_t *ramdisk = treq.cb_data; > + td_vbd_request_t *vreq = treq.private; > + int i; > + uint64_t start; > + const char *log_prefix = ramdisk->log_prefix; > + > + /* the write failed for now, lets panic. this is very bad */ > + if (err) { > + EPRINTF("ramdisk write failed, disk image is not consistent\n"); > + exit(-1); > + } > + > + /* > + * The write succeeded. let's pull the vreq off whatever request list > + * it is on and free() it > + */ > + list_del(&vreq->next); > + free(vreq); > + > + ramdisk->inflight--; > + start = treq.sec; > + for (i = 0; i < treq.secs; i++) { > + hashtable_remove(ramdisk->inprogress, &start); > + start++; > + } > + free(treq.buf); > + > + if (!ramdisk->inflight && ramdisk->prev) > + ramdisk_flush_pended_requests(ramdisk); > +} > + > +static int > +create_write_request(ramdisk_t *ramdisk, td_sector_t sec, int secs, char *buf) > +{ > + td_request_t treq; > + td_vbd_request_t *vreq; > + td_vbd_t *vbd = ramdisk->image->private; > + > + treq.op = TD_OP_WRITE; > + treq.buf = buf; > + treq.sec = sec; > + treq.secs = secs; > + treq.image = ramdisk->image; > + treq.cb = replicated_write_callback; > + treq.cb_data = ramdisk; > + treq.id = 0; > + treq.sidx = 0; > + > + vreq = calloc(1, sizeof(td_vbd_request_t)); > + treq.private = vreq; > + > + if(!vreq) > + return -1; > + > + vreq->submitting = 1; > + INIT_LIST_HEAD(&vreq->next); > + tapdisk_vbd_move_request(treq.private, &vbd->pending_requests); > + > + td_forward_request(treq); > + > + vreq->submitting--; > + > + return 0; > +} > + > +/* http://www.concentric.net/~Ttwang/tech/inthash.htm */ > +static 
unsigned int uint64_hash(void *k) > +{ > + uint64_t key = *(uint64_t*)k; > + > + key = (~key) + (key << 18); > + key = key ^ (key >> 31); > + key = key * 21; > + key = key ^ (key >> 11); > + key = key + (key << 6); > + key = key ^ (key >> 22); > + > + return (unsigned int)key; > +} > + > +static int rd_hash_equal(void *k1, void *k2) > +{ > + uint64_t key1, key2; > + > + key1 = *(uint64_t*)k1; > + key2 = *(uint64_t*)k2; > + > + return key1 == key2; > +} > + > +static int uint64_compare(const void *k1, const void *k2) > +{ > + uint64_t u1 = *(uint64_t*)k1; > + uint64_t u2 = *(uint64_t*)k2; > + > + /* u1 - u2 is unsigned */ > + return u1 < u2 ? -1 : u1 > u2 ? 1 : 0; > +} > + > +static struct hashtable *ramdisk_new_hashtable(void) > +{ > + return create_hashtable(RAMDISK_HASHSIZE, uint64_hash, rd_hash_equal); > +} > + > +/* > + * set psectors to an array of the sector numbers in the hash, returning > + * the number of entries (or -1 on error) > + */ > +static int ramdisk_get_sectors(struct hashtable *h, uint64_t **psectors, > + const char *log_prefix) > +{ > + struct hashtable_itr* itr; > + uint64_t* sectors; > + int count; > + > + if (!(count = hashtable_count(h))) > + return 0; > + > + if (!(*psectors = malloc(count * sizeof(uint64_t)))) { > + DPRINTF("ramdisk_get_sectors: error allocating sector map\n"); > + return -1; > + } > + sectors = *psectors; > + > + itr = hashtable_iterator(h); > + count = 0; > + do { > + sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr); > + } while (hashtable_iterator_advance(itr)); > + free(itr); > + > + return count; > +} > + > +static int ramdisk_write_hash(struct hashtable *h, uint64_t sector, char *buf, > + size_t len, const char *log_prefix) > +{ > + char *v; > + uint64_t *key; > + > + if ((v = hashtable_search(h, §or))) { > + memcpy(v, buf, len); > + return 0; > + } > + > + if (!(v = malloc(len))) { > + DPRINTF("ramdisk_write_hash: malloc failed\n"); > + return -1; > + } > + memcpy(v, buf, len); > + if (!(key = 
malloc(sizeof(*key)))) { > + DPRINTF("ramdisk_write_hash: error allocating key\n"); > + free(v); > + return -1; > + } > + *key = sector; > + if (!hashtable_insert(h, key, v)) { > + DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector); > + free(key); > + free(v); > + return -1; > + } > + > + return 0; > +} > + > +/* > + * return -1 for OOM > + * return -2 for merge lookup failure(should not happen) > + * return -3 for WAW race > + * return 0 on success. > + */ > +static int merge_requests(ramdisk_t *ramdisk, uint64_t start, > + size_t count, char **mergedbuf) > +{ > + char* buf; > + char* sector; > + int i; > + uint64_t *key; > + int rc = 0; > + const char *log_prefix = ramdisk->log_prefix; > + > + if (!(buf = valloc(count * ramdisk->sector_size))) { > + DPRINTF("merge_request: allocation failed\n"); > + return -1; > + } > + > + for (i = 0; i < count; i++) { > + if (!(sector = hashtable_search(ramdisk->prev, &start))) { > + EPRINTF("merge_request: lookup failed on %"PRIu64"\n", > + start); > + free(buf); > + rc = -2; > + goto fail; > + } > + > + /* Check inprogress requests to avoid waw non-determinism */ > + if (hashtable_search(ramdisk->inprogress, &start)) { > + DPRINTF("merge_request: WAR RACE on %"PRIu64"\n", > + start); > + free(buf); > + rc = -3; > + goto fail; > + } > + > + /* > + * Insert req into inprogress (brief period of duplication of > + * hash entries until they are removed from prev. 
Read tracking > + * would not be reading wrong entries) > + */ > + if (!(key = malloc(sizeof(*key)))) { > + EPRINTF("%s: error allocating key\n", __FUNCTION__); > + free(buf); > + rc = -1; > + goto fail; > + } > + *key = start; > + if (!hashtable_insert(ramdisk->inprogress, key, NULL)) { > + EPRINTF("%s failed to insert sector %" PRIu64 " into inprogress hash\n", > + __FUNCTION__, start); > + free(key); > + free(buf); > + rc = -1; > + goto fail; > + } > + > + memcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size); > + start++; > + } > + > + *mergedbuf = buf; > + return 0; > +fail: > + for (start--; i > 0; i--, start--) > + hashtable_remove(ramdisk->inprogress, &start); > + return rc; > +} > + > +#define HASHTABLE_DESTROY(hashtable, free) \ > + do { \ > + if (hashtable) { \ > + hashtable_destroy(hashtable, free); \ > + hashtable = NULL; \ > + } \ > + } while (0) > + > +int ramdisk_init(ramdisk_t *ramdisk) > +{ > + ramdisk->inflight = 0; > + ramdisk->prev = NULL; > + ramdisk->inprogress = NULL; > + ramdisk->primary_cache = ramdisk_new_hashtable(); > + if (!ramdisk->primary_cache) > + return -1; > + > + return 0; > +} > + > +void ramdisk_destroy(ramdisk_t *ramdisk) > +{ > + const char *log_prefix = ramdisk->log_prefix; > + > + /* > + * ramdisk_destroy() is called only when we will close the tapdisk image. > + * In this case, there are no pending requests in vbd. > + * > + * If ramdisk->inflight is not 0, it means that the requests created by > + * us are still in vbd->pending_requests. 
> + */ > + if (ramdisk->inflight) { > + /* should not happen */ > + EPRINTF("cannot destroy ramdisk\n"); > + return; > + } > + > + HASHTABLE_DESTROY(ramdisk->inprogress, 0); > + HASHTABLE_DESTROY(ramdisk->prev, 1); > + HASHTABLE_DESTROY(ramdisk->primary_cache, 1); > +} > + > +int ramdisk_read(ramdisk_t *ramdisk, uint64_t sector, > + int nb_sectors, char *buf) > +{ > + int i; > + char *v; > + uint64_t key; > + > + for (i = 0; i < nb_sectors; i++) { > + key = sector + i; > + /* check whether it is queued in a previous flush request */ > + if (!(ramdisk->prev && > + (v = hashtable_search(ramdisk->prev, &key)))) { > + /* check whether it is an ongoing flush */ > + if (!(ramdisk->inprogress && > + (v = hashtable_search(ramdisk->inprogress, &key)))) > + return -1; > + } > + memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size); > + } > + > + return 0; > +} > + > +int ramdisk_cache_write_request(ramdisk_t *ramdisk, uint64_t sector, > + int nb_sectors, size_t sector_size, > + char *buf, const char *log_prefix) > +{ > + int i, rc; > + > + for (i = 0; i < nb_sectors; i++) { > + rc = ramdisk_write_hash(ramdisk->primary_cache, sector + i, > + buf + i * sector_size, > + sector_size, log_prefix); > + if (rc) > + return rc; > + } > + > + return 0; > +} > + > +int ramdisk_flush_pended_requests(ramdisk_t *ramdisk) > +{ > + uint64_t *sectors; > + char *buf = NULL; > + uint64_t base, batchlen; > + int i, j, count = 0; > + const char *log_prefix = ramdisk->log_prefix; > + > + /* everything is in flight */ > + if (!ramdisk->prev) > + return 0; > + > + count = ramdisk_get_sectors(ramdisk->prev, §ors, log_prefix); > + if (count <= 0) > + /* should not happen */ > + return count; > + > + /* Create the inprogress table if empty */ > + if (!ramdisk->inprogress) { > + ramdisk->inprogress = ramdisk_new_hashtable(); > + if (!ramdisk->inprogress) { > + EPRINTF("ramdisk_flush: creating the inprogress table failed:OOM\n"); > + return -1; > + } > + } > + > + /* sort and merge sectors to 
improve disk performance */ > + qsort(sectors, count, sizeof(*sectors), uint64_compare); > + > + for (i = 0; i < count;) { > + base = sectors[i++]; > + while (i < count && sectors[i] == sectors[i-1] + 1) > + i++; > + batchlen = sectors[i-1] - base + 1; > + > + j = merge_requests(ramdisk, base, batchlen, &buf); > + if (j) { > + EPRINTF("ramdisk_flush: merge_requests failed:%s\n", > + j == -1 ? "OOM" : > + (j == -2 ? "missing sector" : > + "WAW race")); > + if (j == -3) > + continue; > + free(sectors); > + return -1; > + } > + > + /* > + * NOTE: create_write_request() creates a treq AND forwards > + * it down the driver chain > + * > + * TODO: handle create_write_request()'s error. > + */ > + create_write_request(ramdisk, base, batchlen, buf); > + > + ramdisk->inflight++; > + > + for (j = 0; j < batchlen; j++) { > + buf = hashtable_search(ramdisk->prev, &base); > + free(buf); > + hashtable_remove(ramdisk->prev, &base); > + base++; > + } > + } > + > + if (!hashtable_count(ramdisk->prev)) > + /* everything is in flight */ > + HASHTABLE_DESTROY(ramdisk->prev, 0); > + > + free(sectors); > + return 0; > +} > + > +int ramdisk_start_flush(ramdisk_t *ramdisk) > +{ > + uint64_t *key; > + char *buf; > + int rc = 0; > + int i, j, count, batchlen; > + uint64_t *sectors; > + const char *log_prefix = ramdisk->log_prefix; > + struct hashtable *cache; > + > + cache = ramdisk->primary_cache; > + if (!hashtable_count(cache)) > + return 0; > + > + if (ramdisk->prev) { > + /* > + * a flush request issued while a previous flush is still in > + * progress will merge with the previous request. If you want > + * the previous request to be consistent, wait for it to > + * complete. 
> + */ > + count = ramdisk_get_sectors(cache, &sectors, log_prefix); > + if (count < 0 ) > + return count; > + > + for (i = 0; i < count; i++) { > + buf = hashtable_search(cache, sectors + i); > + ramdisk_write_hash(ramdisk->prev, sectors[i], buf, > + ramdisk->sector_size, log_prefix); > + } > + free(sectors); > + > + hashtable_destroy(cache, 1); > + } else > + ramdisk->prev = cache; > + > + /* > + * We create a new hashtable so that new writes can be performed before > + * the old hashtable is completely drained. > + */ > + ramdisk->primary_cache = ramdisk_new_hashtable(); > + if (!ramdisk->primary_cache) { > + EPRINTF("ramdisk_start_flush: creating cache table failed: OOM\n"); > + return -1; > + } > + > + return ramdisk_flush_pended_requests(ramdisk); > +} > + > +int ramdisk_writes_inflight(ramdisk_t *ramdisk) > +{ > + if (!ramdisk->inflight && !ramdisk->prev) > + return 0; > + > + return 1; > +} > diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h > index 358c08b..cbdac3c 100644 > --- a/tools/blktap2/drivers/block-replication.h > +++ b/tools/blktap2/drivers/block-replication.h > @@ -110,4 +110,69 @@ int td_replication_server_restart(td_replication_connect_t *t); > */ > int td_replication_client_start(td_replication_connect_t *t); > > +/* I/O replication */ > +typedef struct ramdisk ramdisk_t; > +struct ramdisk { > + size_t sector_size; > + const char *log_prefix; > + td_image_t *image; > + > + /* private */ > + /* count of outstanding requests to the base driver */ > + size_t inflight; > + /* prev holds the requests to be flushed, while inprogress holds > + * requests being flushed. When requests complete, they are removed > + * from inprogress. > + * Whenever a new flush is merged with ongoing flush (i.e, prev), > + * we have to make sure that none of the new requests overlap with > + * ones in "inprogress". If it does, keep it back in prev and dont issue > + * IO until the current one finishes. 
If we allow this IO to proceed, > + * we might end up with two "overlapping" requests in the disk's queue and > + * the disk may not offer any guarantee on which one is written first. > + * IOW, make sure we dont create a write-after-write time ordering constraint. > + */ > + struct hashtable *prev; > + struct hashtable *inprogress; > + /* > + * The primary write request is queued in this > + * hashtable, and will be flushed to ramdisk when > + * the checkpoint finishes. > + */ > + struct hashtable *primary_cache; > +}; > + > +int ramdisk_init(ramdisk_t *ramdisk); > +void ramdisk_destroy(ramdisk_t *ramdisk); > + > +/* > + * try to read from ramdisk. Return -1 if some sectors are not in > + * ramdisk. Otherwise, return 0. > + */ > +int ramdisk_read(ramdisk_t *ramdisk, uint64_t sector, > + int nb_sectors, char *buf); > + > +/* > + * cache the write requests, and it will be flushed after a > + * new checkpoint finishes > + */ > +int ramdisk_cache_write_request(ramdisk_t *ramdisk, uint64_t sector, > + int nb_sectors, size_t sector_size, > + char* buf, const char *log_prefix); > + > +/* flush pended write requests to disk */ > +int ramdisk_flush_pended_requests(ramdisk_t *ramdisk); > +/* > + * flush cached write requests to disk. If WAW is detected, the cached > + * write requests will be moved to pended queue. The pended write > + * requests will be auto flushed after all inprogress write requests > + * are flushed to disk. This function don't wait all write requests > + * are flushed to disk. > + */ > +int ramdisk_start_flush(ramdisk_t *ramdisk); > +/* > + * Return true if some write reqeusts are inprogress or pended, > + * otherwise return false > + */ > +int ramdisk_writes_inflight(ramdisk_t *ramdisk); > + > #endif > -- > 1.9.3 > Acked-by: Shriram Rajagopalan --089e01183eda1e49750505d1ce3b Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable

On Oct 13, 2014 10:13 PM, "Wen Congyang" <wency@cn.fujitsu.com> wrote:
>
> COLO will reuse them
>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> Cc: Shriram Rajagopalan <rshriram@cs.ubc.ca>
> ---
>  tools/blktap2/drivers/block-remus.c       | 480 +-----------------------------
>  tools/blktap2/drivers/block-replication.c | 460 ++++++++++++++++++++++++++++
>  tools/blktap2/drivers/block-replication.h |  65 ++++
>  3 files changed, 539 insertions(+), 466 deletions(-)
>
> diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c
> index 09dc46f..c7b429c 100644
> --- a/tools/blktap2/drivers/block-remus.c
> +++ b/tools/blktap2/drivers/block-remus.c
> @@ -37,9 +37,6 @@
>  #include "tapdisk-server.h"
>  #include "tapdisk-driver.h"
>  #include "tapdisk-interface.h"
> -#include "hashtable.h"
> -#include "hashtable_itr.h"
> -#include "hashtable_utility.h"
>  #include "block-replication.h"
>
>  #include <errno.h>
> @@ -58,7 +55,6 @@
>
>  /* timeout for reads and writes in ms */
>  #define HEARTBEAT_MS 1000
> -#define RAMDISK_HASHSIZE 128
>
>  /* connect retry timeout (seconds) */
>  #define REMUS_CONNRETRY_TIMEOUT 1
> @@ -97,51 +93,6 @@ td_vbd_t *device_vbd = NULL;
>  td_image_t *remus_image = NULL;
>  struct tap_disk tapdisk_remus;
>
> -struct ramdisk {
> -       size_t sector_size;
> -       struct hashtable* h;
> -       /* when a ramdisk is flushed, h is given a new empty hash for writes
> -        * while the old ramdisk (prev) is drained asynchronously.
> -        */
> -       struct hashtable* prev;
> -       /* count of outstanding requests to the base driver */
> -       size_t inflight;
> -       /* prev holds the requests to be flushed, while inprogress holds
> -        * requests being flushed. When requests complete, they are removed
> -        * from inprogress.
> -        * Whenever a new flush is merged with ongoing flush (i.e, prev),
> -        * we have to make sure that none of the new requests overlap with
> -        * ones in "inprogress". If it does, keep it back in prev and dont issue
> -        * IO until the current one finishes. If we allow this IO to proceed,
> -        * we might end up with two "overlapping" requests in the disk's queue and
> -        * the disk may not offer any guarantee on which one is written first.
> -        * IOW, make sure we dont create a write-after-write time ordering constraint.
> -        *
> -        */
> -       struct hashtable* inprogress;
> -};
> -
> -/* the ramdisk intercepts the original callback for reads and writes.
> - * This holds the original data. */
> -/* Might be worth making this a static array in struct ramdisk to avoid
> - * a malloc per request */
> -
> -struct tdremus_state;
> -
> -struct ramdisk_cbdata {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0td_callback_t cb;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0void* private;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0char* buf;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0struct tdremus_state* state;
> -};
> -
> -struct ramdisk_write_cbdata {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0struct tdremus_state* state;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0char* buf;
> -};
> -
> -typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
> -
> =C2=A0/* poll_fd type for blktap2 fd system. taken from block_log.c */=
> =C2=A0typedef struct poll_fd {
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 int=C2=A0 =C2=A0 =C2=A0 =C2=A0 fd;
> @@ -168,7 +119,7 @@ struct tdremus_state {
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0*/
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 struct req_ring queued_io;
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* ramdisk data*/
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* ramdisk data */
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 struct ramdisk ramdisk;
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 /* mode methods */
> @@ -239,404 +190,14 @@ static void ring_add_request(struct req_ring *r= ing, const td_request_t *treq)
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 ring->prod =3D ring_next(ring->prod)= ;
> =C2=A0}
>
> -/* Prototype declarations */
> -static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s= );
> -
> -/* functions to create and sumbit treq's */
> -
> -static void
> -replicated_write_callback(td_request_t treq, int err)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0struct tdremus_state *s =3D (struct tdremu= s_state *) treq.cb_data;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0td_vbd_request_t *vreq;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int i;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t start;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0vreq =3D (td_vbd_request_t *) treq.private= ;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* the write failed for now, lets panic. t= his is very bad */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (err) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= ramdisk write failed, disk image is not consistent\n");
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0exit(-1);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* The write succeeded. let's pull the= vreq off whatever request list
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * it is on and free() it */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0list_del(&vreq->next);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0free(vreq);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->ramdisk.inflight--;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0start =3D treq.sec;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0for (i =3D 0; i < treq.secs; i++) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0hashtable_remo= ve(s->ramdisk.inprogress, &start);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0start++;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0free(treq.buf);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!s->ramdisk.inflight && !s-= >ramdisk.prev) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* TODO: the r= amdisk has been flushed */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -}
> -
> -static inline int
> -create_write_request(struct tdremus_state *state, td_sector_t sec, in= t secs, char *buf)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0td_request_t treq;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0td_vbd_request_t *vreq;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.op=C2=A0 =C2=A0 =C2=A0 =3D TD_OP_WRIT= E;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.buf=C2=A0 =C2=A0 =C2=A0=3D buf;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.sec=C2=A0 =C2=A0 =C2=A0=3D sec;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.secs=C2=A0 =C2=A0 =3D secs;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.image=C2=A0 =C2=A0=3D remus_image; > -=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.cb=C2=A0 =C2=A0 =C2=A0 =3D replicated= _write_callback;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.cb_data =3D state;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.id=C2= =A0 =C2=A0 =C2=A0 =3D 0;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.sidx=C2=A0 =C2=A0 =3D 0;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0vreq=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0=3D = calloc(1, sizeof(td_vbd_request_t));
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.private =3D vreq;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if(!vreq)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0vreq->submitting =3D 1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0INIT_LIST_HEAD(&vreq->next);
> -       tapdisk_vbd_move_request(treq.private, &device_vbd->pending_requests);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* TODO:
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * we should probably leave it up to the c= aller to forward the request */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0td_forward_request(treq);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0vreq->submitting--;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -}
> -
> -
> -/* http://www.concentric.net/~Ttwang/tech/inthash.htm */
> -static unsigned int uint64_hash(void* k)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t key =3D *(uint64_t*)k;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0key =3D (~key) + (key << 18);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0key =3D key ^ (key >> 31);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0key =3D key * 21;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0key =3D key ^ (key >> 11);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0key =3D key + (key << 6);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0key =3D key ^ (key >> 22);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return (unsigned int)key;
> -}
> -
> -static int rd_hash_equal(void* k1, void* k2)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t key1, key2;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0key1 =3D *(uint64_t*)k1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0key2 =3D *(uint64_t*)k2;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return key1 =3D=3D key2;
> -}
> -
> -static int ramdisk_read(struct ramdisk* ramdisk, uint64_t sector,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0int nb_sectors, char* buf)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int i;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0char* v;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t key;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0for (i =3D 0; i < nb_sectors; i++) { > -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0key =3D sector= + i;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* check wheth= er it is queued in a previous flush request */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(ramdisk-= >prev && (v =3D hashtable_search(ramdisk->prev, &key)))) = {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0/* check whether it is an ongoing flush */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0if (!(ramdisk->inprogress && (v =3D hashtable_sear= ch(ramdisk->inprogress, &key))))
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0memcpy(buf + i= * ramdisk->sector_size, v, ramdisk->sector_size);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -}
> -
> -static int ramdisk_write_hash(struct hashtable* h, uint64_t sector, c= har* buf,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0size_t len)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0char* v;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t* key;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if ((v =3D hashtable_search(h, &sector= ))) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0memcpy(v, buf,= len);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(v =3D malloc(len))) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("= ramdisk_write_hash: malloc failed\n");
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0memcpy(v, buf, len);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(key =3D malloc(sizeof(*key)))) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("= ramdisk_write_hash: error allocating key\n");
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0free(v);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0*key =3D sector;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!hashtable_insert(h, key, v)) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("= ramdisk_write_hash failed on sector %" PRIu64 "\n", sector);=
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0free(key);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0free(v);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -}
> -
> -static inline int ramdisk_write(struct ramdisk* ramdisk, uint64_t sec= tor,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0int nb_sectors, char* buf)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int i, rc;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0for (i =3D 0; i < nb_sectors; i++) { > -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0rc =3D ramdisk= _write_hash(ramdisk->h, sector + i,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0buf += i * ramdisk->sector_size,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0ramdi= sk->sector_size);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (rc)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0return rc;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -}
> -
> -static int uint64_compare(const void* k1, const void* k2)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t u1 =3D *(uint64_t*)k1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t u2 =3D *(uint64_t*)k2;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* u1 - u2 is unsigned */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return u1 < u2 ? -1 : u1 > u2 ? 1 : = 0;
> -}
> -
> -/* set psectors to an array of the sector numbers in the hash, return= ing
> - * the number of entries (or -1 on error) */
> -static int ramdisk_get_sectors(struct hashtable* h, uint64_t** psecto= rs)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0struct hashtable_itr* itr;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t* sectors;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int count;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(count =3D hashtable_count(h)))
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(*psectors =3D malloc(count * sizeof(= uint64_t)))) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("= ramdisk_get_sectors: error allocating sector map\n");
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0sectors =3D *psectors;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0itr =3D hashtable_iterator(h);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0count =3D 0;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0do {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0sectors[count+= +] =3D *(uint64_t*)hashtable_iterator_key(itr);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0} while (hashtable_iterator_advance(itr));=
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0free(itr);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return count;
> -}
> -
> -/*
> -=C2=A0 return -1 for OOM
> -=C2=A0 return -2 for merge lookup failure
> -=C2=A0 return -3 for WAW race
> -=C2=A0 return 0 on success.
> -*/
> -static int merge_requests(struct ramdisk* ramdisk, uint64_t start, > -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0size_t count, char **mergedbuf)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0char* buf;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0char* sector;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int i;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t *key;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc =3D 0;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(buf =3D valloc(count * ramdisk->s= ector_size))) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("= merge_request: allocation failed\n");
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0for (i =3D 0; i < count; i++) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(sector = =3D hashtable_search(ramdisk->prev, &start))) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0DPRINTF("merge_request: lookup failed on %"PRIu64&q= uot;\n", start);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0free(buf);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D -2;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0goto fail;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* Check inpro= gress requests to avoid waw non-determinism */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (hashtable_= search(ramdisk->inprogress, &start)) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0DPRINTF("merge_request: WAR RACE on %"PRIu64"\= n", start);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0free(buf);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D -3;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0goto fail;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* Insert req = into inprogress (brief period of duplication of hash entries until
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * they are re= moved from prev. Read tracking would not be reading wrong entries)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(key =3D = malloc(sizeof(*key)))) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0DPRINTF("%s: error allocating key\n", __FUNCTION__)= ;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0free(buf);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0goto fail;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0*key =3D start= ;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (!hashtable= _insert(ramdisk->inprogress, key, NULL)) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0DPRINTF("%s failed to insert sector %" PRIu64 "= ; into inprogress hash\n",
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0__FUNCTION__, start);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0free(key);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0free(buf);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0goto fail;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0memcpy(buf + i= * ramdisk->sector_size, sector, ramdisk->sector_size);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0start++;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0*mergedbuf =3D buf;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -fail:
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0for (start--; i >0; i--, start--)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0hashtable_remo= ve(ramdisk->inprogress, &start);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return rc;
> -}
> -
> -/* The underlying driver may not handle having the whole ramdisk queu= ed at
> - * once. We queue what we can and let the callbacks attempt to queue = more. */
> -/* NOTE: may be called from callback, while dd->private still belo= ngs to
> - * the underlying driver */
> -static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s= )
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t* sectors;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0char* buf =3D NULL;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t base, batchlen;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int i, j, count =3D 0;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0// RPRINTF("ramdisk flush\n"); > -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if ((count =3D ramdisk_get_sectors(s->r= amdisk.prev, &sectors)) <=3D 0)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return count;<= br> > -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* Create the inprogress table if empty */=
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!s->ramdisk.inprogress)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0s->ramdisk.= inprogress =3D create_hashtable(RAMDISK_HASHSIZE,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_hash,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0rd_hash_equal);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("ramdisk: flushing %d = sectors\n", count);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0*/
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* sort and merge sectors to improve disk = performance */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0qsort(sectors, count, sizeof(*sectors), ui= nt64_compare);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0for (i =3D 0; i < count;) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0base =3D secto= rs[i++];
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0while (i < = count && sectors[i] =3D=3D sectors[i-1] + 1)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0i++;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0batchlen =3D s= ectors[i-1] - base + 1;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0j =3D merge_re= quests(&s->ramdisk, base, batchlen, &buf);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (j) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0RPRINTF("ramdisk_flush: merge_requests failed:%s\n"= ,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0j =3D=3D -1? "OOM": (j= =3D=3D-2? "missing sector" : "WAW race"));
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0if (j =3D=3D -3) continue;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0free(sectors);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0return -1;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* NOTE: creat= e_write_request() creates a treq AND forwards it down
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * the driver = chain */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0// RPRINTF(&qu= ot;forwarding write request at %" PRIu64 ", length: %" PRIu6= 4 "\n", base, batchlen);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0create_write_r= equest(s, base, batchlen, buf);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0//RPRINTF(&quo= t;write request at %" PRIu64 ", length: %" PRIu64 " for= warded\n", base, batchlen);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0s->ramdisk.= inflight++;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0for (j =3D 0; = j < batchlen; j++) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0buf =3D hashtable_search(s->ramdisk.prev, &base);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0free(buf);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0hashtable_remove(s->ramdisk.prev, &base);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0base++;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!hashtable_count(s->ramdisk.prev)) = {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* everything = is in flight */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0hashtable_dest= roy(s->ramdisk.prev, 0);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0s->ramdisk.= prev =3D NULL;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0free(sectors);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0// RPRINTF("ramdisk flush done\n"= ;);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -}
> -
> -/* flush ramdisk contents to disk */
> -static int ramdisk_start_flush(td_driver_t *driver)
> -{
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0struct tdremus_state *s =3D (struct tdremu= s_state *)driver->data;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t* key;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0char* buf;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc =3D 0;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0int i, j, count, batchlen;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t* sectors;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!hashtable_count(s->ramdisk.h)) { > -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF= ("Nothing to flush\n");
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0*/
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (s->ramdisk.prev) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* a flush req= uest issued while a previous flush is still in progress
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * will merge = with the previous request. If you want the previous
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * request to = be consistent, wait for it to complete. */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if ((count =3D= ramdisk_get_sectors(s->ramdisk.h, &sectors)) < 0)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0return count;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0for (i =3D 0; = i < count; i++) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0buf =3D hashtable_search(s->ramdisk.h, sectors + i);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0ramdisk_write_hash(s->ramdisk.prev, sectors[i], buf,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 s->ramdisk.sector_size);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0free(sectors);=
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0hashtable_dest= roy (s->ramdisk.h, 1);
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0} else
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0s->ramdisk.= prev =3D s->ramdisk.h;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/* We create a new hashtable so that new w= rites can be performed before
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * the old hashtable is completely drained= . */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->ramdisk.h =3D create_hashtable(RAMDI= SK_HASHSIZE, uint64_hash,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0rd_ha= sh_equal);
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return ramdisk_flush(driver, s);
> -}
> -
> -
> =C2=A0static int ramdisk_start(td_driver_t *driver)
> =C2=A0{
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 struct tdremus_state *s =3D (struct tdremu= s_state *)driver->data;
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (s->ramdisk.h) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0RPRINTF("= ramdisk already allocated\n");
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> -
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 s->ramdisk.sector_size =3D driver->i= nfo.sector_size;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0s->ramdisk.h =3D create_hashtable(RAMDI= SK_HASHSIZE, uint64_hash,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0rd_ha= sh_equal);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0s->ramdisk.log_prefix =3D "remus&q= uot;;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0s->ramdisk.image =3D remus_image;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk_init(&s->ramdisk);
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 DPRINTF("Ramdisk started, %zu bytes/s= ector\n", s->ramdisk.sector_size);
>
> @@ -917,13 +478,9 @@ static int client_flush(td_driver_t *driver)
> =C2=A0static int server_flush(td_driver_t *driver)
> =C2=A0{
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 struct tdremus_state *s =3D (struct tdremu= s_state *)driver->data;
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 * Nothing to flush in beginning.
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!s->ramdisk.prev)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 /* Try to flush any remaining requests */<= br> > -=C2=A0 =C2=A0 =C2=A0 =C2=A0return ramdisk_flush(driver, s);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return ramdisk_flush_pended_requests(&= s->ramdisk);
> =C2=A0}
>
> =C2=A0/* It is called when switching the mode from primary to unprotec= ted */
> @@ -1030,10 +587,7 @@ static inline int server_writes_inflight(td_driv= er_t *driver)
> =C2=A0{
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 struct tdremus_state *s =3D (struct tdremu= s_state *)driver->data;
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!s->ramdisk.inflight && !s-= >ramdisk.prev)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> -
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0return 1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return ramdisk_writes_inflight(&s->= ramdisk);
> =C2=A0}
>
> =C2=A0/* Due to block device prefetching this code may be called on th= e server side
> @@ -1116,7 +670,9 @@ static void server_do_wreq(td_driver_t *driver) > =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (mread(s->stream_fd.fd, buf, len) &l= t; 0)
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 goto err;
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (ramdisk_write(&s->ramdisk, *sec= tor, *sectors, buf) < 0) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (ramdisk_cache_write_request(&s->= ;ramdisk, *sector, *sectors,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0drive= r->info.sector_size, buf,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0"= ;remus") < 0) {
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 rc =3D ERROR_I= NTERNAL;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 goto err;
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
> @@ -1137,7 +693,7 @@ static void server_do_creq(td_driver_t *driver) >
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 // RPRINTF("committing buffer\n"= );
>
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk_start_flush(driver);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk_start_flush(&s->ramdisk); >
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 /* XXX this message should not be sent unt= il flush completes! */
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (write(s->stream_fd.fd, TDREMUS_DONE= , strlen(TDREMUS_DONE)) !=3D 4)
> @@ -1184,12 +740,7 @@ void unprotected_queue_read(td_driver_t *driver,= td_request_t treq)
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 /* wait for previous ramdisk to flush=C2= =A0 before servicing reads */
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (server_writes_inflight(driver)) {
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* for now let= s just return EBUSY.
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * if there ar= e any left-over requests in prev,
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * kick em aga= in.
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if(!s->ramd= isk.inflight) /* nothing in inprogress */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0ramdisk_flush(driver, s);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk_flush_= pended_requests(&s->ramdisk);
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 td_complete_re= quest(treq, -EBUSY);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
> @@ -1207,8 +758,7 @@ void unprotected_queue_write(td_driver_t *driver,= td_request_t treq)
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 /* wait for previous ramdisk to flush */ > =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (server_writes_inflight(driver)) {
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 RPRINTF("= queue_write: waiting for queue to drain");
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if(!s->ramd= isk.inflight) /* nothing in inprogress. Kick prev */
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0ramdisk_flush(driver, s);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk_flush_= pended_requests(&s->ramdisk);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 td_complete_re= quest(treq, -EBUSY);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 else {
> @@ -1518,9 +1068,7 @@ static int tdremus_close(td_driver_t *driver) > =C2=A0 =C2=A0 =C2=A0 =C2=A0 struct tdremus_state *s =3D (struct tdremu= s_state *)driver->data;
>
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 RPRINTF("closing\n");
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0if (s->ramdisk.inprogress)
> -=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0hashtable_dest= roy(s->ramdisk.inprogress, 0);
> -
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk_destroy(&s->ramdisk);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 td_replication_connect_kill(&s->t);=
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 ctl_unregister(s);
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 ctl_close(s);
> diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2= /drivers/block-replication.c
> index e4b2679..82d7609 100644
> --- a/tools/blktap2/drivers/block-replication.c
> +++ b/tools/blktap2/drivers/block-replication.c
> @@ -15,6 +15,10 @@
>
> =C2=A0#include "tapdisk-server.h"
> =C2=A0#include "block-replication.h"
> +#include "tapdisk-interface.h"
> +#include "hashtable.h"
> +#include "hashtable_itr.h"
> +#include "hashtable_utility.h"
>
> =C2=A0#include <string.h>
> =C2=A0#include <errno.h>
> @@ -30,6 +34,8 @@
> =C2=A0#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _= f, log_prefix, ## _a)
> =C2=A0#define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f,= log_prefix, ## _a)
>
> +#define RAMDISK_HASHSIZE 128
> +
> =C2=A0/* connection status */
> =C2=A0enum {
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 connection_none,
> @@ -466,3 +472,457 @@ static void td_replication_connect_event(event_i= d_t id, char mode,
> =C2=A0fail:
> =C2=A0 =C2=A0 =C2=A0 =C2=A0 td_replication_client_failed(t, rc);
> =C2=A0}
> +
> +
> +/* I/O replication */
> +static void replicated_write_callback(td_request_t treq, int err)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk_t *ramdisk =3D treq.cb_data;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_vbd_request_t *vreq =3D treq.private; > +=C2=A0 =C2=A0 =C2=A0 =C2=A0int i;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t start;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix =3D ramdisk->log= _prefix;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* the write failed for now, lets panic. t= his is very bad */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (err) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= ramdisk write failed, disk image is not consistent\n");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0exit(-1);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * The write succeeded. let's pull the= vreq off whatever request list
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * it is on and free() it
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0list_del(&vreq->next);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0free(vreq);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk->inflight--;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0start =3D treq.sec;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0for (i =3D 0; i < treq.secs; i++) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0hashtable_remo= ve(ramdisk->inprogress, &start);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0start++;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0free(treq.buf);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!ramdisk->inflight && ramdi= sk->prev)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk_flush_= pended_requests(ramdisk);
> +}
> +
> +static int
> +create_write_request(ramdisk_t *ramdisk, td_sector_t sec, int secs, c= har *buf)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_request_t treq;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_vbd_request_t *vreq;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_vbd_t *vbd =3D ramdisk->image->pr= ivate;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.op=C2=A0 =C2=A0 =C2=A0 =3D TD_OP_WRIT= E;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.buf=C2=A0 =C2=A0 =C2=A0=3D buf;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.sec=C2=A0 =C2=A0 =C2=A0=3D sec;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.secs=C2=A0 =C2=A0 =3D secs;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.image=C2=A0 =C2=A0=3D ramdisk->ima= ge;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.cb=C2=A0 =C2=A0 =C2=A0 =3D replicated= _write_callback;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.cb_data =3D ramdisk;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.id=C2= =A0 =C2=A0 =C2=A0 =3D 0;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.sidx=C2=A0 =C2=A0 =3D 0;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0vreq=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0=3D = calloc(1, sizeof(td_vbd_request_t));
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0treq.private =3D vreq;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if(!vreq)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0vreq->submitting =3D 1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0INIT_LIST_HEAD(&vreq->next);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0tapdisk_vbd_move_request(treq.private, &am= p;vbd->pending_requests);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_forward_request(treq);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0vreq->submitting--;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +}
> +
> +/* htt= p://www.concentric.net/~Ttwang/tech/inthash.htm */
> +static unsigned int uint64_hash(void *k)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t key =3D *(uint64_t*)k;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0key =3D (~key) + (key << 18);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0key =3D key ^ (key >> 31);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0key =3D key * 21;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0key =3D key ^ (key >> 11);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0key =3D key + (key << 6);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0key =3D key ^ (key >> 22);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return (unsigned int)key;
> +}
> +
> +static int rd_hash_equal(void *k1, void *k2)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t key1, key2;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0key1 =3D *(uint64_t*)k1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0key2 =3D *(uint64_t*)k2;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return key1 =3D=3D key2;
> +}
> +
> +static int uint64_compare(const void *k1, const void *k2)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t u1 =3D *(uint64_t*)k1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t u2 =3D *(uint64_t*)k2;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* u1 - u2 is unsigned */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return u1 < u2 ? -1 : u1 > u2 ? 1 : = 0;
> +}
> +
> +static struct hashtable *ramdisk_new_hashtable(void)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return create_hashtable(RAMDISK_HASHSIZE, = uint64_hash, rd_hash_equal);
> +}
> +
> +/*
> + * set psectors to an array of the sector numbers in the hash, return= ing
> + * the number of entries (or -1 on error)
> + */
> +static int ramdisk_get_sectors(struct hashtable *h, uint64_t **psecto= rs,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 const char *log_prefix)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0struct hashtable_itr* itr;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t* sectors;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int count;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(count =3D hashtable_count(h)))
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(*psectors =3D malloc(count * sizeof(= uint64_t)))) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("= ramdisk_get_sectors: error allocating sector map\n");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0sectors =3D *psectors;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0itr =3D hashtable_iterator(h);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0count =3D 0;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0do {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0sectors[count+= +] =3D *(uint64_t*)hashtable_iterator_key(itr);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0} while (hashtable_iterator_advance(itr));=
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0free(itr);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return count;
> +}
> +
> +static int ramdisk_write_hash(struct hashtable *h, uint64_t sector, c= har *buf,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0size_t len, const char *log_prefix)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0char *v;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t *key;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if ((v =3D hashtable_search(h, &sector= ))) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0memcpy(v, buf,= len);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(v =3D malloc(len))) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("= ramdisk_write_hash: malloc failed\n");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0memcpy(v, buf, len);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(key =3D malloc(sizeof(*key)))) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("= ramdisk_write_hash: error allocating key\n");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0free(v);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0*key =3D sector;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!hashtable_insert(h, key, v)) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("= ramdisk_write_hash failed on sector %" PRIu64 "\n", sector);=
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0free(key);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0free(v);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +}
> +
> +/*
> + * return -1 for OOM
> + * return -2 for merge lookup failure(should not happen)
> + * return -3 for WAW race
> + * return 0 on success.
> + */
> +static int merge_requests(ramdisk_t *ramdisk, uint64_t start,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0size_t count, char **mergedbuf)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0char* buf;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0char* sector;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int i;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t *key;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc =3D 0;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix =3D ramdisk->log= _prefix;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(buf =3D valloc(count * ramdisk->s= ector_size))) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0DPRINTF("= merge_request: allocation failed\n");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0for (i =3D 0; i < count; i++) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(sector = =3D hashtable_search(ramdisk->prev, &start))) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0EPRINTF("merge_request: lookup failed on %"PRIu64&q= uot;\n",
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0start);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0free(buf);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D -2;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0goto fail;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* Check inpro= gress requests to avoid waw non-determinism */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (hashtable_= search(ramdisk->inprogress, &start)) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0DPRINTF("merge_request: WAR RACE on %"PRIu64"\= n",
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0start);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0free(buf);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D -3;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0goto fail;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * Insert req = into inprogress (brief period of duplication of
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * hash entrie= s until they are removed from prev. Read tracking
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * would not b= e reading wrong entries)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(key =3D = malloc(sizeof(*key)))) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0EPRINTF("%s: error allocating key\n", __FUNCTION__)= ;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0free(buf);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0goto fail;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0*key =3D start= ;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (!hashtable= _insert(ramdisk->inprogress, key, NULL)) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0EPRINTF("%s failed to insert sector %" PRIu64 "= ; into inprogress hash\n",
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0__FUNCTION__, start);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0free(key);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0free(buf);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0rc =3D -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0goto fail;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0memcpy(buf + i= * ramdisk->sector_size, sector, ramdisk->sector_size);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0start++;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0*mergedbuf =3D buf;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +fail:
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0for (start--; i > 0; i--, start--)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0hashtable_remo= ve(ramdisk->inprogress, &start);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return rc;
> +}
> +
> +#define HASHTABLE_DESTROY(hashtable, free)=C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0\
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0do {=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (hashtable)= {=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 \
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0hashtable_destroy(hashtable, free);=C2=A0 =C2=A0 =C2=A0\
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0hashtable =3D NULL;=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0\
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}=C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0\
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0} while (0)
> +
> +int ramdisk_init(ramdisk_t *ramdisk)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk->inflight =3D 0;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk->prev =3D NULL;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk->inprogress =3D NULL;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk->primary_cache =3D ramdisk_new_= hashtable();
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!ramdisk->primary_cache)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +}
> +
> +void ramdisk_destroy(ramdisk_t *ramdisk)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix =3D ramdisk->log= _prefix;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * ramdisk_destroy() is called only when w= e will close the tapdisk image.
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * In this case, there are no pending requ= ests in vbd.
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 *
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * If ramdisk->inflight is not 0, it me= ans that the requests created by
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * us are still in vbd->pending_request= s.
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (ramdisk->inflight) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* should not = happen */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= cannot destroy ramdisk\n");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0HASHTABLE_DESTROY(ramdisk->inprogress, = 0);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0HASHTABLE_DESTROY(ramdisk->prev, 1); > +=C2=A0 =C2=A0 =C2=A0 =C2=A0HASHTABLE_DESTROY(ramdisk->primary_cach= e, 1);
> +}
> +
> +int ramdisk_read(ramdisk_t *ramdisk, uint64_t sector,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 int nb_sector= s, char *buf)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int i;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0char *v;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t key;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0for (i =3D 0; i < nb_sectors; i++) { > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0key =3D sector= + i;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* check wheth= er it is queued in a previous flush request */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (!(ramdisk-= >prev &&
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= (v =3D hashtable_search(ramdisk->prev, &key)))) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0/* check whether it is an ongoing flush */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0if (!(ramdisk->inprogress &&
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0(v =3D hashtable_search(ramdisk->inprogress,= &key))))
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0memcpy(buf + i= * ramdisk->sector_size, v, ramdisk->sector_size);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +}
> +
> +int ramdisk_cache_write_request(ramdisk_t *ramdisk, uint64_t sector,<= br> > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0int nb_sectors, size_t sector_siz= e,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0char *buf, const char *log_prefix= )
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int i, rc;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0for (i =3D 0; i < nb_sectors; i++) { > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0rc =3D ramdisk= _write_hash(ramdisk->primary_cache, sector + i,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0buf += i * sector_size,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0secto= r_size, log_prefix);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (rc)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0return rc;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +}
> +
> +int ramdisk_flush_pended_requests(ramdisk_t *ramdisk)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t *sectors;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0char *buf =3D NULL;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t base, batchlen;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int i, j, count =3D 0;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix =3D ramdisk->log= _prefix;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* everything is in flight */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!ramdisk->prev)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0count =3D ramdisk_get_sectors(ramdisk->= prev, &sectors, log_prefix);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (count <=3D 0)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* should not = happen */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return count;<= br> > +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* Create the inprogress table if empty */=
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!ramdisk->inprogress) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk->in= progress =3D ramdisk_new_hashtable();
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (!ramdisk-&= gt;inprogress) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0EPRINTF("ramdisk_flush: creating the inprogress table fa= iled:OOM\n");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0return -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* sort and merge sectors to improve disk = performance */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0qsort(sectors, count, sizeof(*sectors), ui= nt64_compare);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0for (i =3D 0; i < count;) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0base =3D secto= rs[i++];
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0while (i < = count && sectors[i] =3D=3D sectors[i-1] + 1)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0i++;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0batchlen =3D s= ectors[i-1] - base + 1;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0j =3D merge_re= quests(ramdisk, base, batchlen, &buf);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (j) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0EPRINTF("ramdisk_flush: merge_requests failed:%s\n"= ,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0j =3D=3D -1 ? "OOM" : > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0(j = =3D=3D -2 ? "missing sector" :
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 "WAW race"));
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0if (j =3D=3D -3)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0continue;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0free(sectors);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0return -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * NOTE: creat= e_write_request() creates a treq AND forwards
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * it down the= driver chain
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 *
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * TODO: handl= e create_write_request()'s error.
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0create_write_r= equest(ramdisk, base, batchlen, buf);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk->in= flight++;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0for (j =3D 0; = j < batchlen; j++) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0buf =3D hashtable_search(ramdisk->prev, &base);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0free(buf);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0hashtable_remove(ramdisk->prev, &base);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0base++;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!hashtable_count(ramdisk->prev)) > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/* everything = is in flight */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0HASHTABLE_DEST= ROY(ramdisk->prev, 0);
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0free(sectors);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +}
> +
> +int ramdisk_start_flush(ramdisk_t *ramdisk)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t *key;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0char *buf;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int rc =3D 0;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0int i, j, count, batchlen;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0uint64_t *sectors;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix =3D ramdisk->log= _prefix;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0struct hashtable *cache;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0cache =3D ramdisk->primary_cache;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!hashtable_count(cache))
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (ramdisk->prev) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * a flush req= uest issued while a previous flush is still in
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * progress wi= ll merge with the previous request. If you want
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * the previou= s request to be consistent, wait for it to
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 * complete. > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0count =3D ramd= isk_get_sectors(cache, &sectors, log_prefix);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0if (count <= 0 )
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0return count;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0for (i =3D 0; = i < count; i++) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0buf =3D hashtable_search(cache, sectors + i);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0ramdisk_write_hash(ramdisk->prev, sectors[i], buf,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 ramdisk->sector_size, log_prefix);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0free(sectors);=
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0hashtable_dest= roy(cache, 1);
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0} else
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk->pr= ev =3D cache;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * We create a new hashtable so that new w= rites can be performed before
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * the old hashtable is completely drained= .
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0ramdisk->primary_cache =3D ramdisk_new_= hashtable();
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!ramdisk->primary_cache) {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0EPRINTF("= ramdisk_start_flush: creating cache table failed: OOM\n");
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return -1;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0}
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return ramdisk_flush_pended_requests(ramdi= sk);
> +}
> +
> +int ramdisk_writes_inflight(ramdisk_t *ramdisk)
> +{
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0if (!ramdisk->inflight && !ramd= isk->prev)
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0return 0;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0return 1;
> +}
> diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2= /drivers/block-replication.h
> index 358c08b..cbdac3c 100644
> --- a/tools/blktap2/drivers/block-replication.h
> +++ b/tools/blktap2/drivers/block-replication.h
> @@ -110,4 +110,69 @@ int td_replication_server_restart(td_replication_= connect_t *t);
> =C2=A0 */
> =C2=A0int td_replication_client_start(td_replication_connect_t *t); >
> +/* I/O replication */
> +typedef struct ramdisk ramdisk_t;
> +struct ramdisk {
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0size_t sector_size;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0const char *log_prefix;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0td_image_t *image;
> +
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* private */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* count of outstanding requests to the ba= se driver */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0size_t inflight;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/* prev holds the requests to be flushed, = while inprogress holds
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * requests being flushed. When requests c= omplete, they are removed
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * from inprogress.
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * Whenever a new flush is merged with ong= oing flush (i.e, prev),
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * we have to make sure that none of the n= ew requests overlap with
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * ones in "inprogress". If it d= oes, keep it back in prev and dont issue
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * IO until the current one finishes. If w= e allow this IO to proceed,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * we might end up with two "overlapp= ing" requests in the disk's queue and
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * the disk may not offer any guarantee on= which one is written first.
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * IOW, make sure we dont create a write-a= fter-write time ordering constraint.
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0struct hashtable *prev;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0struct hashtable *inprogress;
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0/*
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * The primary write request is queued in = this
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * hashtable, and will be flushed to ramdi= sk when
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 * the checkpoint finishes.
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 */
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0struct hashtable *primary_cache;
> +};
> +
> +int ramdisk_init(ramdisk_t *ramdisk);
> +void ramdisk_destroy(ramdisk_t *ramdisk);
> +
> +/*
> + * try to read from ramdisk. Return -1 if some sectors are not in
> + * ramdisk. Otherwise, return 0.
> + */
> +int ramdisk_read(ramdisk_t *ramdisk, uint64_t sector,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 int nb_sector= s, char *buf);
> +
> +/*
> + * cache the write requests; they will be flushed after a
> + * new checkpoint finishes
> + */
> +int ramdisk_cache_write_request(ramdisk_t *ramdisk, uint64_t sector,<= br> > +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0int nb_sectors, size_t sector_siz= e,
> +=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0char* buf, const char *log_prefix= );
> +
> +/* flush pended write requests to disk */
> +int ramdisk_flush_pended_requests(ramdisk_t *ramdisk);
> +/*
> + * flush cached write requests to disk. If WAW is detected, the cache= d
> + * write requests will be moved to pended queue. The pended write
> + * requests will be auto flushed after all inprogress write requests
> + * are flushed to disk. This function does not wait for all write
> + * requests to be flushed to disk.
> + */
> +int ramdisk_start_flush(ramdisk_t *ramdisk);
> +/*
> + * Return true if some write requests are in progress or pending,
> + * otherwise return false
> + */
> +int ramdisk_writes_inflight(ramdisk_t *ramdisk);
> +
> =C2=A0#endif
> --
> 1.9.3
>

Acked-by: Shriram Rajagopalan <rshriram@cs.ubc.ca>

--089e01183eda1e49750505d1ce3b-- --===============3653136825524742269== Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Content-Disposition: inline _______________________________________________ Xen-devel mailing list Xen-devel@lists.xen.org http://lists.xen.org/xen-devel --===============3653136825524742269==--