All of lore.kernel.org
 help / color / mirror / Atom feed
From: jbrassow@sourceware.org <jbrassow@sourceware.org>
To: cluster-devel.redhat.com
Subject: [Cluster-devel] cluster/cmirror/src cluster.c functions.c func ...
Date: 5 Nov 2007 22:44:06 -0000	[thread overview]
Message-ID: <20071105224406.19189.qmail@sourceware.org> (raw)

CVSROOT:	/cvs/cluster
Module name:	cluster
Branch: 	RHEL5
Changes by:	jbrassow at sourceware.org	2007-11-05 22:44:04

Modified files:
	cmirror/src    : cluster.c functions.c functions.h link_mon.c 
	                 local.c 

Log message:
	- Fix problem with recovery work assignment (still need to add
	priority recovery... otherwise, I/O will stall for long periods
	during mirror resync)
	
	- Clean-up checkpointing code and fix a couple bugs there that
	prevented proper start-up.

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.3&r2=1.1.2.4
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/link_mon.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/local.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3

--- cluster/cmirror/src/Attic/cluster.c	2007/11/03 18:53:03	1.1.2.3
+++ cluster/cmirror/src/Attic/cluster.c	2007/11/05 22:44:03	1.1.2.4
@@ -49,6 +49,8 @@
 	int valid;
 	struct queue *startup_queue;
 
+	int checkpoints_needed;
+	uint32_t checkpoint_requesters[10];
 	struct checkpoint_data *checkpoint_list;
 };
 
@@ -384,7 +386,7 @@
 	return 0;
 }
 
-static int import_checkpoint(struct clog_cpg *entry)
+static int import_checkpoint(struct clog_cpg *entry, int no_read)
 {
 	int rtn = 0;
 	SaCkptCheckpointHandleT h;
@@ -422,6 +424,11 @@
 
 	saCkptCheckpointUnlink(ckpt_handle, &name);
 
+	if (no_read) {
+		LOG_DBG("Checkpoint for this log already recieved");
+		goto no_read;
+	}
+
 init_retry:
 	rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, 0, &itr);
 	if (rv == SA_AIS_ERR_TRY_AGAIN) {
@@ -516,7 +523,7 @@
 
 fail:
 	saCkptSectionIterationFinalize(itr);
-
+no_read:
 	saCkptCheckpointClose(h);
 
 	free(bitmap);
@@ -542,11 +549,18 @@
 			 * notice in tfr in export_checkpoint function
 			 * by setting tfr->error
 			 */
-			export_checkpoint(cp);
-
-			entry->checkpoint_list = cp->next;
-			free_checkpoint(cp);
-			cp = entry->checkpoint_list;
+			switch (export_checkpoint(cp)) {
+			case -EEXIST:
+				LOG_DBG("Checkpoint already handled by someone else");
+			case 0:
+				entry->checkpoint_list = cp->next;
+				free_checkpoint(cp);
+				cp = entry->checkpoint_list;
+				break;
+			default:
+				/* FIXME: Skipping will cause list corruption */
+				LOG_ERROR("Failed to export checkpoint");
+			}
 		}
 	}
 	EXIT();
@@ -556,7 +570,9 @@
 static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
 				 uint32_t nodeid, uint32_t pid, void *msg, int msg_len)
 {
+	int i;
 	int r = 0;
+	int i_am_server;
 	struct clog_tfr *tfr = msg;
 	struct clog_tfr *startup_tfr = NULL;
 	struct clog_tfr *cp_tfr = NULL;
@@ -567,9 +583,6 @@
 	if (msg_len != (sizeof(*tfr) + tfr->data_size))
 		LOG_ERROR("Badly sized message recieved from cluster.");
 
-	LOG_DBG("Message (len = %d) from node/pid %u/%d", msg_len,
-		  nodeid, pid);
-
 	if (tfr->request_type & DM_CLOG_RESPONSE)
 		LOG_DBG("Response from cluster recieved %s",
 			RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE));
@@ -587,13 +600,31 @@
 		LOG_ERROR("Unable to find clog_cpg for cluster message");
 		goto out;
 	}
+	i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;
 
 	if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) {
-		if ((!match->valid) && (my_cluster_id == tfr->originator)) {
-			switch (import_checkpoint(match)) {
+		/* Redundant checkpoints ignored due to match->valid */
+		if (my_cluster_id == tfr->originator) {
+			switch (import_checkpoint(match, match->valid)) {
 			case 0:
-				LOG_DBG("Checkpoint data recieved.  Log is now valid");
-				match->valid = 1;
+				if (!match->valid) {
+					LOG_DBG("Checkpoint data recieved.  Log is now valid");
+					match->valid = 1;
+					while ((startup_tfr = queue_remove(match->startup_queue))) {
+						LOG_DBG("Processing delayed request %d: %s",
+							match->startup_queue->count,
+							RQ_TYPE(startup_tfr->request_type));
+						r = handle_cluster_request(startup_tfr, i_am_server);
+
+						if (r) {
+							LOG_ERROR("Error while processing delayed CPG message");
+							goto out;
+						} else {
+							queue_add(startup_tfr, free_queue);
+						}
+					}
+				}
+
 				break;
 			case -EAGAIN:
 				LOG_PRINT("Checkpoint data empty.  Requesting new checkpoint.");
@@ -628,28 +659,28 @@
 		goto out;
 	}
 
-	if (tfr->request_type == DM_CLOG_CHECKPOINT_REQUEST) {
-		if (tfr->originator == my_cluster_id) {
-			/*
-			 * The checkpoint includes any request up to the
-			 * request for checkpoint.  So, we must clear any
-			 * previous requests we were storing.
-			 */
-			while ((startup_tfr = queue_remove(match->startup_queue)))
-				queue_add(startup_tfr, free_queue);
-		} else if (my_cluster_id == match->lowest_id) {
-			struct checkpoint_data *new;
-
-			new = prepare_checkpoint(match, tfr->originator);
-			if (!new) {
-				/* FIXME: Need better error handling */
-				LOG_ERROR("Failed to prepare checkpoint!!!");
-				goto out;
-			}
-			new->next = match->checkpoint_list;
-			match->checkpoint_list = new;
+	/*
+	 * If the log is now valid, we can queue the checkpoints
+	 */
+	for (i = match->checkpoints_needed; i;) {
+		struct checkpoint_data *new;
+
+		i--;
+		if (log_get_state(tfr) != LOG_RESUMED) {
+			LOG_DBG("Skipping checkpoint for %u, my log is not ready",
+				match->checkpoint_requesters[i]);
+			continue;
 		}
-		goto out;
+		new = prepare_checkpoint(match, match->checkpoint_requesters[i]);
+		if (!new) {
+			/* FIXME: Need better error handling */
+			LOG_ERROR("Failed to prepare checkpoint for %u!!!",
+				  match->checkpoint_requesters[i]);
+			break;
+		}
+		match->checkpoints_needed = i;
+		new->next = match->checkpoint_list;
+		match->checkpoint_list = new;
 	}
 
 	if (tfr->request_type & DM_CLOG_RESPONSE)
@@ -665,21 +696,7 @@
 			goto out;
 		}
 
-		while ((startup_tfr = queue_remove(match->startup_queue))) {
-			LOG_DBG("Processing delayed request %d: %s",
-				match->startup_queue->count,
-				RQ_TYPE(startup_tfr->request_type));
-			r = handle_cluster_request(startup_tfr,
-						   (my_cluster_id == match->lowest_id) ? 1 : 0);
-			if (r) {
-				LOG_ERROR("Error while processing delayed CPG message");
-				goto out;
-			} else {
-				queue_add(startup_tfr, free_queue);
-			}
-		}
-
-		r = handle_cluster_request(tfr, (my_cluster_id == match->lowest_id) ? 1 : 0);
+		r = handle_cluster_request(tfr, i_am_server);
 	}
 
 out:
@@ -694,16 +711,14 @@
 				struct cpg_address *left_list, int left_list_entries,
 				struct cpg_address *joined_list, int joined_list_entries)
 {
-	int i;
+	int i, j;
 	int my_pid = getpid();
 	int found = 0;
-	int i_was_server = 0;
-	int do_checkpoint = 0;
 	struct clog_cpg *match, *tmp;
 
 	ENTER();
 
-	LOG_PRINT("* CPG config callback *********************");
+	LOG_PRINT("****** CPG config callback ****************");
 
 	LOG_PRINT("* JOINING (%d):", joined_list_entries);
 	for (i = 0; i < joined_list_entries; i++)
@@ -751,6 +766,7 @@
 		goto out;
 	}
 
+	/* Assign my_cluster_id */
 	if (my_cluster_id == 0xDEAD) {
 		for (i = 0; i < joined_list_entries; i++) {
 			LOG_DBG("My pid = %d\t\t[%u/%d]", my_pid,
@@ -764,42 +780,30 @@
 				LOG_PRINT("Setting my cluster id: %u", my_cluster_id);
 			}
 		}
-	} else if (match->lowest_id == my_cluster_id) {
-		LOG_DBG("I was the server (lowest_id = %u, my_cluster_id = %u)",
-			  match->lowest_id, my_cluster_id);
-		i_was_server = 1;
+		goto out;
 	}
 
+	/* Find the lowest_id, i.e. the server */
 	for (i = 0, match->lowest_id = member_list[0].nodeid;
 	     i < member_list_entries; i++)
 		if (match->lowest_id > member_list[i].nodeid)
 			match->lowest_id = member_list[i].nodeid;
 
-	if (match->lowest_id == my_cluster_id) {
-		/* I am the server now */
-		if (!i_was_server)
-			LOG_PRINT("I am the new log server for %s", match->name.value);
-		else if (joined_list_entries) {
-			LOG_PRINT("I must send checkpoint data.");
-			do_checkpoint = 1;
-		}
-	} else if (i_was_server && joined_list_entries) {
-		LOG_PRINT("Giving server ownership to %u", match->lowest_id);
-		LOG_PRINT("I must send checkpoint data.");
-		do_checkpoint = 1;
-	}
+	LOG_PRINT("Server is now %u", match->lowest_id);
 
-	if (do_checkpoint) {
-		struct checkpoint_data *new;
+	/*
+	 * If I am part of the joining list, I do not send checkpoints
+	 * FIXME: What are the cases where multiple nodes can join?
+	 */
+	for (i = 0; i < joined_list_entries; i++)
+		if (joined_list[i].nodeid == my_cluster_id)
+			goto out;
 
-		for (i = 0; i < joined_list_entries; i++) {
-			new = prepare_checkpoint(match, joined_list[i].nodeid);
-			if (!new)
-				goto out;
-			new->next = match->checkpoint_list;
-			match->checkpoint_list = new;
-		}
+	for (i = 0, j = match->checkpoints_needed; i < joined_list_entries; i++) {
+		LOG_DBG("Joining node, %u needs checkpoint", joined_list[i].nodeid);
+		match->checkpoint_requesters[i + j] = joined_list[i].nodeid;
 	}
+	match->checkpoints_needed += i;
 
 out:
 	EXIT();
--- cluster/cmirror/src/Attic/functions.c	2007/11/03 18:37:48	1.1.2.2
+++ cluster/cmirror/src/Attic/functions.c	2007/11/05 22:44:03	1.1.2.3
@@ -56,6 +56,8 @@
                 FORCESYNC,      /* Force a sync to happen */
         } sync;
 
+	uint32_t state;         /* current operational state of the log */
+
 	int disk_fd;            /* -1 means no disk log */
 	int log_dev_failed;
 	uint64_t disk_nr_regions;
@@ -563,6 +565,8 @@
 	if (lc->touched)
 		LOG_DBG("WARNING: log still marked as 'touched' during suspend");
 
+	lc->state = LOG_SUSPENDED;
+
 	return 0;
 }
 
@@ -583,12 +587,12 @@
 }
 
 /*
- * _clog_resume
+ * clog_resume
  * @tfr
  *
  * Does the main work of resuming.
  */
-static int _clog_resume(struct clog_tfr *tfr)
+static int clog_resume(struct clog_tfr *tfr)
 {
 	uint32_t i;
 	struct log_c *lc = get_log(tfr->uuid);
@@ -617,6 +621,9 @@
 	case 3:
 		LOG_DBG("Non-master resume: bits pre-loaded");
 		lc->resume_override = 1000;
+		lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count);
+		LOG_DBG("sync_count = %llu", lc->sync_count);
+		goto out;
 		return 0;
 	default:
 		LOG_ERROR("Error:: multiple loading of bits (%d)", lc->resume_override);
@@ -662,36 +669,29 @@
 	/* copy clean across to sync */
 	memcpy(lc->sync_bits, lc->clean_bits, size);
 	lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count);
+	LOG_DBG("sync_count = %llu", lc->sync_count);
 	lc->sync_search = 0;
 
 	/*
-	tfr->error = write_log(lc);
-	if (tfr->error) {
-		lc->log_dev_failed = 1;
-		LOG_ERROR("Failed to write initial disk log");
-	} else
-		lc->log_dev_failed = 0;
-	*/
-	/*
 	 * We mark 'touched' so that only the master commits
 	 * the log via 'commit_log'
 	 */
 	lc->touched = 1;
-
+out:
+	lc->state = LOG_RESUMED;
+	
 	return tfr->error;
 }
 
 /*
- * clog_resume
+ * local_resume
  * @tfr
  *
  * If the log is pending, we must first join the cpg and
  * put the log in the official list.
  *
- * If the log is in the official list, then we call
- * _clog_resume.
  */
-static int clog_resume(struct clog_tfr *tfr)
+int local_resume(struct clog_tfr *tfr)
 {
 	int r;
 	struct log_c *lc = get_log(tfr->uuid);
@@ -714,12 +714,9 @@
 		/* move log to official list */
 		list_del_init(&lc->list);
 		list_add(&lc->list, &log_list);
-
-		return 0;
 	}
 
-	/* log is in the official list, try to resume */
-	return _clog_resume(tfr);
+	return 0;
 }
 
 /*
@@ -896,7 +893,6 @@
  */
 static int clog_get_resync_work(struct clog_tfr *tfr)
 {
-	int sync_search, conflict=0;
 	struct {int i; uint64_t r; } *pkg = (void *)tfr->data;
 	struct log_c *lc = get_log(tfr->uuid);
 
@@ -905,42 +901,35 @@
 
 	tfr->data_size = sizeof(*pkg);
 
-	/*
-	 * Check if we are done recovering, or if there
-	 * is already someone recovering.
-	 */
-	if ((lc->sync_search >= lc->region_count) ||
-	    (lc->recovering_region != (uint64_t)-1)) {
+	if (lc->sync_search >= lc->region_count) {
+		/*
+		 * FIXME: handle intermittent errors during recovery
+		 * by resetting sync_search... but not to many times.
+		 */
+		LOG_DBG("  Recovery has finished");
 		pkg->i = 0;
 		return 0;
 	}
 
-	for (sync_search = lc->sync_search;
-	     sync_search < lc->region_count;
-	     sync_search += (pkg->r + 1)) {
-		pkg->r = find_next_zero_bit(lc->sync_bits,
-					    lc->region_count,
-					    sync_search);
-
-		/*
-		 * If the region is currently marked, we cannot
-		 * recover it yet.
-		 */
-		if (!log_test_bit(lc->clean_bits, pkg->r))
-			conflict = 1;
-		else
-			break;
+	if (lc->recovering_region != (uint64_t)-1) {
+		LOG_DBG("Someone is already recovering region %Lu",
+			lc->recovering_region);
+		pkg->i = 0;
+		return 0;
 	}
 
-	if (!conflict)
-		lc->sync_search = pkg->r + 1;
+	pkg->r = find_next_zero_bit(lc->sync_bits,
+				    lc->region_count,
+				    lc->sync_search);
 
 	if (pkg->r >= lc->region_count) {
 		pkg->i = 0;
 		return 0;
 	}
 
-	LOG_DBG("Assigning resync work: region = %llu\n", pkg->r);
+	lc->sync_search = pkg->r + 1;
+
+	LOG_DBG("  Assigning resync work: region = %llu\n", pkg->r);
 	pkg->i = 1;
 	return 0;
 }
@@ -984,6 +973,8 @@
 		return -EINVAL;
 
 	*sync_count = lc->sync_count;
+	LOG_DBG("sync_count = %llu", *sync_count);
+
 	tfr->data_size = sizeof(*sync_count);
 
 	return 0;
@@ -1347,6 +1338,17 @@
 	return 0;
 }
 
+int log_get_state(struct clog_tfr *tfr)
+{
+	struct log_c *lc;
+
+	lc = get_log(tfr->uuid);
+	if (!lc)
+		return -EINVAL;
+
+	return lc->state;
+}
+
 int log_status(void)
 {
 	int found = 0;
@@ -1380,3 +1382,4 @@
 	}
 	return found;
 }
+
--- cluster/cmirror/src/Attic/functions.h	2007/08/23 19:57:31	1.1.2.1
+++ cluster/cmirror/src/Attic/functions.h	2007/11/05 22:44:03	1.1.2.2
@@ -3,9 +3,14 @@
 
 #include <linux/dm-clog-tfr.h>
 
+#define LOG_RESUMED   1
+#define LOG_SUSPENDED 2
+
+int local_resume(struct clog_tfr *tfr);
 int do_request(struct clog_tfr *tfr);
 int commit_log(struct clog_tfr *tfr);
 int store_bits(const char *uuid, const char *which, char **buf);
 int load_bits(const char *uuid, const char *which, char *buf, int size);
+int log_get_state(struct clog_tfr *tfr);
 int log_status(void);
 #endif /* __CLOG_FUNCTIONS_DOT_H__ */
--- cluster/cmirror/src/Attic/link_mon.c	2007/08/23 19:57:31	1.1.2.1
+++ cluster/cmirror/src/Attic/link_mon.c	2007/11/05 22:44:03	1.1.2.2
@@ -102,8 +102,6 @@
 	/* FIXME: handle POLLHUP */
 	for (i = 0; i < used_pfds; i++)
 		if (pfds[i].revents & POLLIN) {
-			LOG_DBG("Data waiting on file descriptor, %d.",
-				pfds[i].fd);
 			/* FIXME: Add this back return 1;*/
 			r++;
 		}
--- cluster/cmirror/src/Attic/local.c	2007/11/03 18:37:48	1.1.2.2
+++ cluster/cmirror/src/Attic/local.c	2007/11/05 22:44:03	1.1.2.3
@@ -170,7 +170,7 @@
 		 * component to join the CPG, and a cluster component
 		 * to handle the request.
 		 */
-		r = do_request(tfr);
+		r = local_resume(tfr);
 		if (r) {
 			LOG_DBG("Returning failed request to kernel [%s]",
 				RQ_TYPE(tfr->request_type));



             reply	other threads:[~2007-11-05 22:44 UTC|newest]

Thread overview: 2+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2007-11-05 22:44 jbrassow [this message]
  -- strict thread matches above, loose matches on Subject: below --
2008-01-14 22:52 [Cluster-devel] cluster/cmirror/src cluster.c functions.c func jbrassow

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20071105224406.19189.qmail@sourceware.org \
    --to=jbrassow@sourceware.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.