* [Cluster-devel] cluster/cmirror/src cluster.c cluster.h functi ...
@ 2008-02-08 14:30 jbrassow
0 siblings, 0 replies; only message in thread
From: jbrassow @ 2008-02-08 14:30 UTC (permalink / raw)
To: cluster-devel.redhat.com
CVSROOT: /cvs/cluster
Module name: cluster
Branch: RHEL5
Changes by: jbrassow at sourceware.org 2008-02-08 14:30:10
Modified files:
cmirror/src : cluster.c cluster.h functions.c functions.h
local.c
Log message:
- stop delaying disk log writes
- stop placing requests into the startup queue before initial config
- added recovering_region to checkpoint data to prevent duplicate region
syncing assignment.
Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.15&r2=1.1.2.16
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.13&r2=1.1.2.14
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.4&r2=1.1.2.5
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/local.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.13&r2=1.1.2.14
--- cluster/cmirror/src/Attic/cluster.c 2008/02/06 23:03:05 1.1.2.15
+++ cluster/cmirror/src/Attic/cluster.c 2008/02/08 14:30:10 1.1.2.16
@@ -27,6 +27,13 @@
static SaCkptCallbacksT callbacks = { 0, 0 };
static SaVersionT version = { 'B', 1, 1 };
+#define DEBUGGING_HISTORY 20
+static char debugging[DEBUGGING_HISTORY][128];
+static int idx = 0;
+static int memberz = 0;
+static int doit = 0;
+
+
struct checkpoint_data {
uint32_t requester;
char uuid[CPG_MAX_NAME_LENGTH];
@@ -34,6 +41,7 @@
int bitmap_size; /* in bytes */
char *sync_bits;
char *clean_bits;
+ char *recovering_region;
struct checkpoint_data *next;
};
@@ -58,44 +66,18 @@
static struct list_head clog_cpg_list;
/*
- * flow_control
- * @handle
- *
- * Returns: 1 if flow control needed, 0 otherwise
- */
-static int flow_control(cpg_handle_t handle)
-{
- cpg_flow_control_state_t flow_control_state;
- cpg_error_t error;
-
- /* FIXME: no flow control for now (cmirror should self regulate) */
- return 0;
-
- error = cpg_flow_control_state_get(handle, &flow_control_state);
- if (error != CPG_OK) {
- LOG_ERROR("Failed to get flow control state. Reason: %d", error);
- /* FIXME: Better error handling */
- return 0;
- }
-
- return (flow_control_state == CPG_FLOW_CONTROL_ENABLED) ? 1 : 0;
-}
-
-/*
* cluster_send
* @tfr
*
* Returns: 0 on success, -Exxx on error
*/
-int cluster_send(struct clog_tfr *tfr)
+static int cluster_send(struct clog_tfr *tfr)
{
int r;
int found;
struct iovec iov;
struct clog_cpg *entry, *tmp;
- ENTER();
-
list_for_each_entry_safe(entry, tmp, &clog_cpg_list, list)
if (!strncmp(entry->name.value, tfr->uuid, CPG_MAX_NAME_LENGTH)) {
found = 1;
@@ -104,26 +86,35 @@
if (!found) {
tfr->error = -ENOENT;
- EXIT();
return -ENOENT;
}
iov.iov_base = tfr;
iov.iov_len = sizeof(struct clog_tfr) + tfr->data_size;
- while (flow_control(entry->handle)) {
- /*
- * FIXME: Don't need to sleep this long
- *
- * ... or, we could dispatch the queued messages here.
- */
- LOG_PRINT("Flow control enabled. Delaying msg [%s]",
- RQ_TYPE(tfr->request_type));
- sleep(1);
- }
+
r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
+ if (r == CPG_OK)
+ return 0;
+ if (r == SA_AIS_ERR_TRY_AGAIN)
+ return -EAGAIN;
+
+ LOG_ERROR("cpg_mcast_joined error: %d", r);
+
+ tfr->error = -EBADE;
+ return -EBADE;
+}
+
+int cluster_send_helper(struct clog_tfr *tfr, int line, char *file, const char *function)
+{
+ int r;
+
+ do {
+ r = cluster_send(tfr);
+ if (r)
+ LOG_ERROR("cluster_send failed at: %s:%d (%s)",
+ file, line, function);
+ } while (r == -EAGAIN);
- EXIT();
- tfr->error = r = (r == CPG_OK) ? 0 : -EBADE;
return r;
}
@@ -137,7 +128,7 @@
return r;
}
-static int handle_cluster_request(struct clog_tfr *tfr, int server)
+static int handle_cluster_request(struct clog_tfr *tfr, int server, int printz)
{
int r = 0;
@@ -152,27 +143,22 @@
*/
if ((tfr->request_type != DM_CLOG_RESUME) ||
(tfr->originator == my_cluster_id))
- r = do_request(tfr);
+ r = do_request(tfr, server);
if (server) {
- if (r)
- LOG_ERROR("do_request failed, unable to commit log");
- else
- r = commit_log(tfr);
-
tfr->request_type |= DM_CLOG_RESPONSE;
/*
* Errors from previous functions are in the tfr struct.
*/
-
- LOG_DBG("Sending response to %u on cluster: [%s/%llu]",
- tfr->originator,
- RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
- (unsigned long long)tfr->seq);
+ if (printz)
+ LOG_DBG("[%s] Sending response to %u on cluster: [%s/%llu]",
+ SHORT_UUID(tfr->uuid), tfr->originator,
+ RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
+ (unsigned long long)tfr->seq);
r = cluster_send(tfr);
if (r)
- LOG_ERROR("cluster_send failed");
+ LOG_ERROR("cluster_send failed: %s", strerror(-r));
}
EXIT();
@@ -209,6 +195,8 @@
INIT_LIST_HEAD(&l);
queue_remove_all(&l, cluster_queue);
LOG_ERROR("Current list:");
+ if (list_empty(&l))
+ LOG_ERROR(" [none]");
list_for_each_safe(p, n, &l) {
list_del_init(p);
t = (struct clog_tfr *)p;
@@ -257,6 +245,7 @@
static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
uint32_t cp_requester)
{
+ int r;
struct checkpoint_data *new;
new = malloc(sizeof(*new));
@@ -270,7 +259,7 @@
strncpy(new->uuid, entry->name.value, entry->name.length);
if (entry->valid) {
- new->bitmap_size = store_bits(entry->name.value, "clean_bits",
+ new->bitmap_size = push_state(entry->name.value, "clean_bits",
&new->clean_bits);
if (new->bitmap_size <= 0) {
LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
@@ -279,7 +268,7 @@
return NULL;
}
- new->bitmap_size = store_bits(entry->name.value,
+ new->bitmap_size = push_state(entry->name.value,
"sync_bits", &new->sync_bits);
if (new->bitmap_size <= 0) {
LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
@@ -288,6 +277,16 @@
free(new);
return NULL;
}
+
+ r = push_state(entry->name.value, "recovering_region", &new->recovering_region);
+ if (r <= 0) {
+ LOG_ERROR("Failed to store recovering_region to checkpoint for node %u",
+ new->requester);
+ free(new->sync_bits);
+ free(new->clean_bits);
+ free(new);
+ return NULL;
+ }
} else {
/*
* We can store bitmaps yet, because the log is not
@@ -309,6 +308,7 @@
*/
static void free_checkpoint(struct checkpoint_data *cp)
{
+ free(cp->recovering_region);
free(cp->sync_bits);
free(cp->clean_bits);
free(cp);
@@ -335,9 +335,9 @@
name.length = len;
attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
- attr.checkpointSize = cp->bitmap_size * 2;
+ attr.checkpointSize = cp->bitmap_size * 2 + strlen(cp->recovering_region) + 1;
attr.retentionDuration = SA_TIME_MAX;
- attr.maxSections = 3; /* don't know why we need +1 */
+ attr.maxSections = 4; /* don't know why we need +1 */
attr.maxSectionSize = cp->bitmap_size;
attr.maxSectionIdSize = 22;
@@ -363,6 +363,7 @@
EXIT();
return -EIO; /* FIXME: better error */
}
+
/*
* Add section for sync_bits
*/
@@ -408,7 +409,7 @@
}
if (rv == SA_AIS_ERR_EXIST) {
- LOG_ERROR("export_checkpoint: clean checkpoint section already exists");
+ LOG_DBG("export_checkpoint: clean checkpoint section already exists");
EXIT();
return -EEXIST;
}
@@ -419,6 +420,35 @@
return -EIO; /* FIXME: better error */
}
+ /*
+ * Add section for recovering_region
+ */
+ section_id.idLen = snprintf(buf, 32, "recovering_region");
+ section_id.id = (unsigned char *)buf;
+ section_attr.sectionId = §ion_id;
+ section_attr.expirationTime = SA_TIME_END;
+
+rr_create_retry:
+ rv = saCkptSectionCreate(h, §ion_attr, cp->recovering_region,
+ strlen(cp->recovering_region) + 1);
+ if (rv == SA_AIS_ERR_TRY_AGAIN) {
+ LOG_ERROR("export_checkpoint: RR create retry");
+ sleep(1);
+ goto rr_create_retry;
+ }
+
+ if (rv == SA_AIS_ERR_EXIST) {
+ LOG_DBG("export_checkpoint: RR checkpoint section already exists");
+ EXIT();
+ return -EEXIST;
+ }
+
+ if (rv != SA_AIS_OK) {
+ LOG_ERROR("export_checkpoint: RR checkpoint section creation failed");
+ EXIT();
+ return -EIO; /* FIXME: better error */
+ }
+
LOG_DBG("export_checkpoint: closing checkpoint");
saCkptCheckpointClose(h);
@@ -515,7 +545,7 @@
break;
}
saCkptSectionIterationFinalize(itr);
- if (len != 2) {
+ if (len != 3) {
LOG_ERROR("import_checkpoint: %d checkpoint sections found", len);
sleep(1);
goto init_retry;
@@ -572,8 +602,9 @@
*/
if (iov.readSize) {
- if (load_bits(entry->name.value, (char *)desc.sectionId.id, bitmap, iov.readSize)) {
- LOG_ERROR("Error loading bits");
+ if (pull_state(entry->name.value, (char *)desc.sectionId.id, bitmap,
+ iov.readSize)) {
+ LOG_ERROR("Error loading state");
rtn = -EIO;
goto fail;
}
@@ -645,6 +676,7 @@
int i;
int r = 0;
int i_am_server;
+ int response = 0;
struct clog_tfr *tfr = msg;
struct clog_tfr *startup_tfr = NULL;
struct clog_cpg *match;
@@ -665,7 +697,9 @@
(unsigned long long)tfr->seq);
if (my_cluster_id == 0xDEAD) {
- LOG_DBG("Message before init... ignoring.\n");
+ LOG_DBG("[%s] Message from %u before init [%s/%llu]",
+ SHORT_UUID(tfr->uuid), nodeid,
+ RQ_TYPE(tfr->request_type), (unsigned long long) tfr->seq);
return;
}
@@ -674,6 +708,14 @@
LOG_ERROR("Unable to find clog_cpg for cluster message");
return;
}
+
+ if (match->lowest_id == 0xDEAD) {
+ LOG_DBG("[%s] Message from %u before init* [%s/%llu]",
+ SHORT_UUID(tfr->uuid), nodeid,
+ RQ_TYPE(tfr->request_type), (unsigned long long) tfr->seq);
+ return;
+ }
+
i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;
if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) {
@@ -691,7 +733,7 @@
LOG_DBG("Processing delayed request %d: %s",
match->startup_queue->count,
RQ_TYPE(startup_tfr->request_type));
- r = handle_cluster_request(startup_tfr, i_am_server);
+ r = handle_cluster_request(startup_tfr, i_am_server, 1);
if (r) {
LOG_ERROR("Error while processing delayed CPG message");
@@ -732,9 +774,10 @@
match->checkpoint_list = new;
}
- if (tfr->request_type & DM_CLOG_RESPONSE)
+ if (tfr->request_type & DM_CLOG_RESPONSE) {
+ response = 1;
r = handle_cluster_response(tfr);
- else {
+ } else {
tfr->originator = nodeid;
if (!match->valid) {
@@ -757,15 +800,40 @@
goto out;
}
- r = handle_cluster_request(tfr, i_am_server);
+ r = handle_cluster_request(tfr, i_am_server,
+ ((memberz != 4) || (--doit > 0)));
}
out:
if (r) {
- LOG_ERROR("[%s] Error while processing CPG message, %s: %d",
+ LOG_ERROR("[%s] Error while processing CPG message, %s: %s",
SHORT_UUID(tfr->uuid),
RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
- r);
+ strerror(-r));
+ LOG_ERROR("[%s] Response : %s", SHORT_UUID(tfr->uuid),
+ (response) ? "YES" : "NO");
+ LOG_ERROR("[%s] Originator: %u", SHORT_UUID(tfr->uuid), tfr->originator);
+ if (response)
+ LOG_ERROR("[%s] Responder : %u", SHORT_UUID(tfr->uuid), nodeid);
+ LOG_ERROR("HISTORY::");
+
+ for (i = 0; i < DEBUGGING_HISTORY; i++) {
+ idx++;
+ idx = idx % DEBUGGING_HISTORY;
+ if (debugging[idx][0] == '\0')
+ continue;
+ LOG_ERROR("%d:%d) %s", i, idx, debugging[idx]);
+ }
+ } else if (!(tfr->request_type & DM_CLOG_RESPONSE)) {
+ int len;
+ idx++;
+ idx = idx % DEBUGGING_HISTORY;
+ len = sprintf(debugging[idx], "SEQ#=%llu, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
+ (unsigned long long)tfr->seq, SHORT_UUID(tfr->uuid),
+ RQ_TYPE(tfr->request_type),
+ tfr->originator, (response) ? "YES" : "NO");
+ if (response)
+ sprintf(debugging[idx] + len, ", RSPR=%u", nodeid);
}
EXIT();
}
@@ -779,10 +847,12 @@
int my_pid = getpid();
int found = 0;
struct clog_cpg *match, *tmp;
- uint32_t lowest;
+ uint32_t lowest = 0xDEAD;
ENTER();
+ memberz = member_list_entries;
+
LOG_DBG("****** CPG config callback **[%s]**",
SHORT_UUID(gname->value));
@@ -821,6 +891,8 @@
goto out;
}
+ lowest = match->lowest_id;
+
/* Am I leaving? */
for (i = 0; i < left_list_entries; i++)
if (my_cluster_id == left_list[i].nodeid) {
@@ -863,6 +935,7 @@
free(match->startup_queue);
match->free_me = 1;
+ match->lowest_id = 0xDEAD;
goto out;
}
@@ -871,8 +944,6 @@
if (!left_list_entries &&
(member_list_entries == 1) && (joined_list_entries == 1) &&
(member_list[0].nodeid == joined_list[0].nodeid)) {
- LOG_DBG("[%s] I am the log server (and first to join)",
- SHORT_UUID(match->name.value));
match->lowest_id = my_cluster_id = joined_list[0].nodeid;
match->valid = 1;
goto out;
@@ -894,17 +965,15 @@
}
}
- lowest = match->lowest_id;
+ if (member_list_entries)
+ match->lowest_id = member_list[0].nodeid;
+ else
+ match->lowest_id = 0xDEAD;
/* Find the lowest_id, i.e. the server */
- for (i = 0, match->lowest_id = member_list[0].nodeid;
- i < member_list_entries; i++)
+ for (i = 0; i < member_list_entries; i++)
if (match->lowest_id > member_list[i].nodeid)
match->lowest_id = member_list[i].nodeid;
- if (lowest != match->lowest_id)
- LOG_DBG("[%s] Server is now %u", SHORT_UUID(match->name.value),
- match->lowest_id);
-
/*
* If I am part of the joining list, I do not send checkpoints
* FIXME: What are the cases where multiple nodes can join?
@@ -920,6 +989,21 @@
match->checkpoints_needed += i;
out:
+ if (lowest != match->lowest_id)
+ LOG_DBG("[%s] Server change %u -> %u (%u %s)",
+ SHORT_UUID(match->name.value),
+ lowest, match->lowest_id,
+ (joined_list_entries) ? joined_list[0].nodeid : left_list[0].nodeid,
+ (joined_list_entries && (member_list_entries == 1)) ?
+ "is first to join" : (joined_list_entries) ? "joined" : "left");
+ else
+ LOG_DBG("[%s] Server unchanged at %u (%u %s)",
+ SHORT_UUID(match->name.value), lowest,
+ (joined_list_entries) ? joined_list[0].nodeid : left_list[0].nodeid,
+ (joined_list_entries) ? "joined" : "left");
+
+ if (joined_list_entries && (joined_list[0].nodeid == my_cluster_id))
+ doit = 25;
EXIT();
}
@@ -1019,6 +1103,12 @@
ENTER();
+ {
+ int i;
+ for(i = 0; i < DEBUGGING_HISTORY; i++)
+ debugging[i][0] = '\0';
+ }
+
INIT_LIST_HEAD(&clog_cpg_list);
rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);
--- cluster/cmirror/src/Attic/cluster.h 2008/01/23 21:21:06 1.1.2.2
+++ cluster/cmirror/src/Attic/cluster.h 2008/02/08 14:30:10 1.1.2.3
@@ -7,6 +7,7 @@
int create_cluster_cpg(char *str);
int destroy_cluster_cpg(char *str);
-int cluster_send(struct clog_tfr *tfr);
+int cluster_send_helper(struct clog_tfr *tfr, int line, char *file, const char *function);
+#define cluster_send(x) cluster_send_helper((x), __LINE__, __FILE__, __FUNCTION__)
#endif /* __CLUSTER_LOG_CLUSTER_DOT_H__ */
--- cluster/cmirror/src/Attic/functions.c 2008/02/06 23:03:05 1.1.2.13
+++ cluster/cmirror/src/Attic/functions.c 2008/02/08 14:30:10 1.1.2.14
@@ -33,14 +33,6 @@
uint64_t nr_regions;
};
-/*
- * Used by the 'touched' variable, these macros mean:
- * LOG_CHANGED - bits in the in-memory log have changed
- * LOG_FLUSH - log must be committed to disk
- */
-#define LOG_CHANGED 1
-#define LOG_FLUSH 2
-
struct log_c {
struct list_head list;
@@ -103,13 +95,13 @@
static void log_set_bit(struct log_c *lc, uint32_t *bs, unsigned bit)
{
ext2fs_set_bit(bit, (unsigned int *) bs);
- lc->touched |= LOG_CHANGED;
+ lc->touched = 1;
}
static void log_clear_bit(struct log_c *lc, uint32_t *bs, unsigned bit)
{
ext2fs_clear_bit(bit, (unsigned int *) bs);
- lc->touched |= LOG_CHANGED;
+ lc->touched = 1;
}
/* FIXME: Why aren't count and start the same type? */
@@ -205,7 +197,7 @@
if (r < 0) {
LOG_ERROR("rw_log: write failure: %s",
strerror(errno));
- return -EIO;
+ return -EIO; /* Failed disk write */
}
return 0;
}
@@ -216,7 +208,7 @@
LOG_ERROR("rw_log: read failure: %s",
strerror(errno));
if (r != lc->disk_size)
- return -EIO;
+ return -EIO; /* Failed disk read */
return 0;
}
@@ -239,7 +231,7 @@
memset(&lh, 0, sizeof(struct log_header));
if (rw_log(lc, 0))
- return -EIO;
+ return -EIO; /* Failed disk read */
header_from_disk(&lh, lc->disk_buffer);
if (lh.magic != MIRROR_MAGIC) {
@@ -285,8 +277,10 @@
bitset_size += (lc->region_count % 8) ? 1 : 0;
memcpy(lc->disk_buffer + 1024, lc->sync_bits, bitset_size);
- if (rw_log(lc, 1))
- return -EIO;
+ if (rw_log(lc, 1)) {
+ lc->log_dev_failed = 1;
+ return -EIO; /* Failed disk write */
+ }
return 0;
}
@@ -697,6 +691,7 @@
static int clog_resume(struct clog_tfr *tfr)
{
uint32_t i;
+ int commit_log = 0;
struct log_c *lc = get_log(tfr->uuid);
size_t size = lc->bitset_uint32_count * sizeof(uint32_t);
@@ -715,6 +710,7 @@
LOG_DBG("[%s] Master resume: reading disk log",
SHORT_UUID(lc->uuid));
lc->resume_override = 1000;
+ commit_log = 1;
break;
case 1:
LOG_ERROR("Error:: partial bit loading (just sync_bits)");
@@ -782,11 +778,14 @@
SHORT_UUID(lc->uuid), (unsigned long long)lc->sync_count);
lc->sync_search = 0;
- /*
- * We mark 'touched' as LOG_FLUSH so that only the master commits
- * the log via 'commit_log'
- */
- lc->touched = LOG_FLUSH;
+ if (commit_log && (lc->disk_fd >= 0)) {
+ tfr->error = write_log(lc);
+ if (tfr->error)
+ LOG_ERROR("Failed initial disk log write");
+ else
+ LOG_DBG("Disk log initialized");
+ lc->touched = 0;
+ }
out:
lc->state = LOG_RESUMED;
lc->recovery_halted = 0;
@@ -917,20 +916,34 @@
* @tfr
*
*/
-static int clog_flush(struct clog_tfr *tfr)
+static int clog_flush(struct clog_tfr *tfr, int server)
{
+ int r = 0;
struct log_c *lc = get_log(tfr->uuid);
-
+
if (!lc)
return -EINVAL;
- /*
- * Actual disk flush happens in 'commit_log()'
- * Clear LOG_CHANGED and set LOG_FLUSH
+ if (!lc->touched)
+ return 0;
+
+ /*
+ * Do the actual flushing of the log only
+ * if we are the server.
*/
- lc->touched = LOG_FLUSH;
+ if (server && (lc->disk_fd >= 0)) {
+ r = tfr->error = write_log(lc);
+ if (r) {
+ LOG_ERROR("Error writing to disk log");
+ return r;
+ }
+ LOG_DBG("[%s] Disk log written", SHORT_UUID(lc->uuid));
+ }
+
+ lc->touched = 0;
return 0;
+
}
/*
@@ -1179,14 +1192,18 @@
if (pkg->in_sync) {
if (log_test_bit(lc->sync_bits, pkg->region)) {
- LOG_PRINT(" Region already in-sync: %llu",
- (unsigned long long)pkg->region);
+ LOG_ERROR("[%s] Region already in-sync: region=%llu, seq=%llu, who=%u",
+ SHORT_UUID(lc->uuid),
+ (unsigned long long)pkg->region,
+ (unsigned long long)tfr->seq,
+ tfr->originator);
} else {
log_set_bit(lc, lc->sync_bits, pkg->region);
lc->sync_count++;
- LOG_DBG("[%s] sync_count = %llu, Region %llu marked in-sync by %u",
+ LOG_DBG("[%s] sync_count=%llu, Region %llu marked in-sync by %u, seq=%llu",
SHORT_UUID(lc->uuid), (unsigned long long)lc->sync_count,
- (unsigned long long)pkg->region, tfr->originator);
+ (unsigned long long)pkg->region, tfr->originator,
+ (unsigned long long)tfr->seq);
}
} else if (log_test_bit(lc->sync_bits, pkg->region)) {
lc->sync_count--;
@@ -1249,7 +1266,7 @@
tfr->data_size = sprintf(data, "3 clustered_disk %d:%d %c",
major(statbuf.st_rdev), minor(statbuf.st_rdev),
- 'A'); /* FIXME: detect dead device */
+ (lc->log_dev_failed) ? 'D' : 'A');
return 0;
}
@@ -1396,6 +1413,7 @@
/*
* do_request
* @tfr: the request
+ * @server: is this request performed by the server
*
* An inability to perform this function will return an error
* from this function. However, an inability to successfully
@@ -1403,7 +1421,7 @@
*
* Returns: 0 on success, -EXXX on error
*/
-int do_request(struct clog_tfr *tfr)
+int do_request(struct clog_tfr *tfr, int server)
{
int r;
@@ -1442,7 +1460,7 @@
r = clog_in_sync(tfr);
break;
case DM_CLOG_FLUSH:
- r = clog_flush(tfr);
+ r = clog_flush(tfr, server);
break;
case DM_CLOG_MARK_REGION:
r = clog_mark_region(tfr);
@@ -1489,52 +1507,6 @@
return 0;
}
-/*
- * commit_log
- * @tfr: commit log associated with this request
- *
- * This function will also set 'tfr->error' on failure
- *
- * Returns: 0 on success, -EXXX on error
- */
-int commit_log(struct clog_tfr *tfr)
-{
- int r = 0;
- struct log_c *lc;
-
- ENTER();
-
- lc = get_log(tfr->uuid);
-
- if (!lc) {
- LOG_DBG("No log found");
- tfr->error = -EINVAL;
- r = -EINVAL;
- goto out;
- }
-
- if (!(lc->touched & LOG_FLUSH))
- goto out;
-
- if (lc->disk_fd >= 0) {
- r = tfr->error = write_log(lc);
- if (r) {
- LOG_ERROR("Error writing to disk log");
- return -EIO;
- }
- LOG_DBG("[%s] Disk log written", SHORT_UUID(lc->uuid));
- }
-
- if (lc->touched & LOG_CHANGED)
- LOG_ERROR("WARNING: Log has changed during a flush operation");
-
- lc->touched &= ~LOG_FLUSH;
-
-out:
- EXIT();
- return 0;
-}
-
static void print_bits(char *buf, int size)
{
#ifdef DEBUG
@@ -1556,7 +1528,8 @@
#endif
}
-int store_bits(const char *uuid, const char *which, char **buf)
+/* int store_bits(const char *uuid, const char *which, char **buf)*/
+int push_state(const char *uuid, const char *which, char **buf)
{
int bitset_size;
struct log_c *lc;
@@ -1570,8 +1543,18 @@
return -EINVAL;
}
+ if (!strcmp(which, "recovering_region")) {
+ *buf = malloc(32); /* easily covers largest 64-bit int */
+ if (!*buf)
+ return -ENOMEM;
+ sprintf(*buf, "%llu", (unsigned long long)lc->recovering_region);
+
+ return 32;
+ }
+
bitset_size = lc->bitset_uint32_count * sizeof(*lc->clean_bits);
*buf = malloc(bitset_size);
+
if (!*buf) {
LOG_ERROR("store_bits: Unable to allocate memory");
return -ENOMEM;
@@ -1590,23 +1573,33 @@
return bitset_size;
}
-int load_bits(const char *uuid, const char *which, char *buf, int size)
+/*int load_bits(const char *uuid, const char *which, char *buf, int size)*/
+int pull_state(const char *uuid, const char *which, char *buf, int size)
{
int bitset_size;
struct log_c *lc;
if (!buf)
- LOG_ERROR("load_bits: buf == NULL");
+ LOG_ERROR("pull_state: buf == NULL");
lc = get_log(uuid);
if (!lc) {
- LOG_ERROR("load_bits: No log found for %s", uuid);
+ LOG_ERROR("pull_state: No log found for %s", uuid);
return -EINVAL;
}
+ if (!strncmp(which, "recovering_region", 17)) {
+ sscanf(buf, "%llu", (unsigned long long *)&lc->recovering_region);
+ LOG_DBG("[%s] recovering_region set to %llu",
+ SHORT_UUID(uuid),
+ (unsigned long long)lc->recovering_region);
+ return 0;
+ }
+
bitset_size = lc->bitset_uint32_count * sizeof(*lc->clean_bits);
if (bitset_size != size) {
- LOG_ERROR("load_bits: bad bitset_size");
+ LOG_ERROR("pull_state(%s): bad bitset_size (%d vs %d)",
+ which, size, bitset_size);
return -EINVAL;
}
--- cluster/cmirror/src/Attic/functions.h 2008/02/06 23:03:05 1.1.2.4
+++ cluster/cmirror/src/Attic/functions.h 2008/02/08 14:30:10 1.1.2.5
@@ -8,10 +8,16 @@
int local_resume(struct clog_tfr *tfr);
int cluster_postsuspend(char *);
-int do_request(struct clog_tfr *tfr);
-int commit_log(struct clog_tfr *tfr);
+
+int do_request(struct clog_tfr *tfr, int server);
+
+/*
int store_bits(const char *uuid, const char *which, char **buf);
int load_bits(const char *uuid, const char *which, char *buf, int size);
+*/
+int push_state(const char *uuid, const char *which, char **buf);
+int pull_state(const char *uuid, const char *which, char *buf, int size);
+
int log_get_state(struct clog_tfr *tfr);
int log_status(int);
#endif /* __CLOG_FUNCTIONS_DOT_H__ */
--- cluster/cmirror/src/Attic/local.c 2008/02/06 23:03:05 1.1.2.13
+++ cluster/cmirror/src/Attic/local.c 2008/02/08 14:30:10 1.1.2.14
@@ -166,7 +166,8 @@
case DM_CLOG_STATUS_INFO:
case DM_CLOG_STATUS_TABLE:
case DM_CLOG_PRESUSPEND:
- r = do_request(tfr);
+ /* We do not specify ourselves as server here */
+ r = do_request(tfr, 0);
if (r)
LOG_DBG("Returning failed request to kernel [%s]",
RQ_TYPE(tfr->request_type));
@@ -177,7 +178,8 @@
break;
case DM_CLOG_POSTSUSPEND:
- r = do_request(tfr);
+ /* We do not specify ourselves as server here */
+ r = do_request(tfr, 0);
if (r) {
LOG_DBG("Returning failed request to kernel [%s]",
RQ_TYPE(tfr->request_type));
@@ -212,6 +214,7 @@
LOG_ERROR("[%s] Unable to send %s to cluster: %s",
SHORT_UUID(tfr->uuid),
RQ_TYPE(tfr->request_type), strerror(-r));
+ tfr->data_size = 0;
tfr->error = r;
kernel_send(tfr);
} else {
^ permalink raw reply [flat|nested] only message in thread
only message in thread, other threads:[~2008-02-08 14:30 UTC | newest]
Thread overview: (only message) (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2008-02-08 14:30 [Cluster-devel] cluster/cmirror/src cluster.c cluster.h functi jbrassow
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.