All of lore.kernel.org
 help / color / mirror / Atom feed
From: jbrassow@sourceware.org <jbrassow@sourceware.org>
To: cluster-devel.redhat.com
Subject: [Cluster-devel] cluster cmirror/src/Makefile cmirror/src/clogd ...
Date: 3 Nov 2007 18:37:51 -0000	[thread overview]
Message-ID: <20071103183751.5030.qmail@sourceware.org> (raw)

CVSROOT:	/cvs/cluster
Module name:	cluster
Branch: 	RHEL5
Changes by:	jbrassow at sourceware.org	2007-11-03 18:37:49

Modified files:
	cmirror/src    : Makefile clogd.c cluster.c common.h functions.c 
	                 local.c 
	cmirror-kernel/src: dm-clog-tfr.h dm-clog.c 

Log message:
	- Addition of disk logging
	- Add 'is_remote_recovering' function
	- Checkpoint clean-ups

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/Makefile.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.3.2.2&r2=1.3.2.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/clogd.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/common.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/local.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror-kernel/src/dm-clog-tfr.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror-kernel/src/dm-clog.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.2.2.3&r2=1.2.2.4

--- cluster/cmirror/src/Attic/Makefile	2007/08/30 15:49:32	1.3.2.2
+++ cluster/cmirror/src/Attic/Makefile	2007/11/03 18:37:48	1.3.2.3
@@ -30,7 +30,7 @@
 endif
 
 ifeq ($(DEBUG),log)
-CFLAGS += -DDEBUG_LOG
+CFLAGS += -DDEBUG
 endif
 
 CFLAGS += -g
--- cluster/cmirror/src/Attic/clogd.c	2007/08/23 19:57:31	1.1.2.1
+++ cluster/cmirror/src/Attic/clogd.c	2007/11/03 18:37:48	1.1.2.2
@@ -42,6 +42,7 @@
 
 	LOG_PRINT("Starting clogd:");
 	LOG_PRINT(" Built: "__DATE__" "__TIME__"\n");
+	LOG_DBG(" Compiled with debugging.");
 
 	while (!exit_now) {
 		links_monitor();
--- cluster/cmirror/src/Attic/cluster.c	2007/08/23 19:57:31	1.1.2.1
+++ cluster/cmirror/src/Attic/cluster.c	2007/11/03 18:37:48	1.1.2.2
@@ -21,6 +21,7 @@
 
 #define DM_CLOG_RESPONSE 0x1000 /* in last byte of 32-bit value */
 #define DM_CLOG_CHECKPOINT_READY ((uint32_t)-1)
+#define DM_CLOG_CHECKPOINT_REQUEST 0
 
 static uint32_t my_cluster_id = 0xDEAD;
 static SaCkptHandleT ckpt_handle;
@@ -102,11 +103,20 @@
 
 static int handle_cluster_request(struct clog_tfr *tfr, int server)
 {
-	int r;
+	int r = 0;
 
 	ENTER("%s", RQ_TYPE(tfr->request_type));
 
-	r = do_request(tfr);
+	/*
+	 * With resumes, we only handle our own.
+	 * Resume is a special case that requires
+	 * local action (to set up CPG), followed by
+	 * a cluster action to co-ordinate reading
+	 * the disk and checkpointing
+	 */
+	if ((tfr->request_type != DM_CLOG_RESUME) ||
+	    (tfr->originator == my_cluster_id))
+		r = do_request(tfr);
 
 	if (server) {
 		if (r)
@@ -176,6 +186,75 @@
 	return NULL;
 }
 
+/*
+ * prepare_checkpoint
+ * @entry: clog_cpg describing the log
+ * @cp_requester: nodeid requesting the checkpoint
+ *
+ * Creates and fills in a new checkpoint_data struct.
+ *
+ * Returns: checkpoint_data on success, NULL on error
+ */
+static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
+						  uint32_t cp_requester)
+{
+	struct checkpoint_data *new;
+
+	new = malloc(sizeof(*new));
+	if (!new) {
+		LOG_ERROR("Unable to create checkpoint data for %u",
+			  cp_requester);
+		return NULL;
+	}
+	memset(new, 0, sizeof(*new));
+	new->requester = cp_requester;
+	strncpy(new->uuid, entry->name.value, entry->name.length);
+
+	if (entry->valid) {
+		new->bitmap_size = store_bits(entry->name.value, "clean_bits",
+					      &new->clean_bits);
+		if (new->bitmap_size <= 0) {
+			LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
+				  new->requester);
+			free(new);
+			return NULL;
+		}
+
+		new->bitmap_size = store_bits(entry->name.value,
+					      "sync_bits", &new->sync_bits);
+		if (new->bitmap_size <= 0) {
+			LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
+				  new->requester);
+			free(new->clean_bits);
+			free(new);
+			return NULL;
+		}
+	} else {
+		/*
+		 * We can store bitmaps yet, because the log is not
+		 * valid yet.  The new machine will have to ask
+		 * specifically for a new checkpoint.
+		 */
+		LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet",
+			  new->requester);
+		new->bitmap_size = 0;
+	}
+
+	return new;
+}
+
+/*
+ * free_checkpoint
+ * @cp: the checkpoint_data struct to free
+ *
+ */
+static void free_checkpoint(struct checkpoint_data *cp)
+{
+	free(cp->sync_bits);
+	free(cp->clean_bits);
+	free(cp);
+}
+
 static int export_checkpoint(struct checkpoint_data *cp)
 {
 	SaCkptCheckpointCreationAttributesT attr;
@@ -307,6 +386,7 @@
 
 static int import_checkpoint(struct clog_cpg *entry)
 {
+	int rtn = 0;
 	SaCkptCheckpointHandleT h;
 	SaCkptSectionIterationHandleT itr;
 	SaCkptSectionDescriptorT desc;
@@ -385,8 +465,8 @@
 
 		if (rv != SA_AIS_OK) {
 			LOG_ERROR("import_checkpoint: clean checkpoint section creation failed");
-			EXIT();
-			return -EIO; /* FIXME: better error */
+			rtn = -EIO; /* FIXME: better error */
+			goto fail;
 		}
 
 		if (!desc.sectionSize) {
@@ -410,28 +490,38 @@
 
 		if (rv != SA_AIS_OK) {
 			LOG_ERROR("import_checkpoint: ckpt read error");
-			EXIT();
-			return -EIO; /* FIXME: better error */
+			rtn = -EIO; /* FIXME: better error */
+			goto fail;
 		}
 
+		/* FIXME: Is this catching something special?
 		if (!iov.readSize) {
 			LOG_ERROR("%s section empty", (char *)desc.sectionId.id);
 			continue;
 		}
+		*/
 
-		if (load_bits(entry->name.value, (char *)desc.sectionId.id, bitmap, iov.readSize)) {
-			LOG_ERROR("Error loading bits");
-			EXIT();
-			return -EIO;
+		if (iov.readSize) {
+			if (load_bits(entry->name.value, (char *)desc.sectionId.id, bitmap, iov.readSize)) {
+				LOG_ERROR("Error loading bits");
+				rtn = -EIO;
+				goto fail;
+			}
+		} else {
+			/* Need to request new checkpoint */
+			rtn = -EAGAIN;
+			goto fail;
 		}
 	}
+
+fail:
 	saCkptSectionIterationFinalize(itr);
 
 	saCkptCheckpointClose(h);
 
 	free(bitmap);
 	EXIT();
-	return 0;
+	return rtn;
 }
 
 static int do_cluster_work(void *data)
@@ -455,9 +545,7 @@
 			export_checkpoint(cp);
 
 			entry->checkpoint_list = cp->next;
-			free(cp->sync_bits);
-			free(cp->clean_bits);
-			free(cp);
+			free_checkpoint(cp);
 			cp = entry->checkpoint_list;
 		}
 	}
@@ -471,6 +559,7 @@
 	int r = 0;
 	struct clog_tfr *tfr = msg;
 	struct clog_tfr *startup_tfr = NULL;
+	struct clog_tfr *cp_tfr = NULL;
 	struct clog_cpg *match;
 
 	ENTER();
@@ -481,6 +570,13 @@
 	LOG_DBG("Message (len = %d) from node/pid %u/%d", msg_len,
 		  nodeid, pid);
 
+	if (tfr->request_type & DM_CLOG_RESPONSE)
+		LOG_DBG("Response from cluster recieved %s",
+			RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE));
+	else
+		LOG_DBG("Request from cluster recieved %s",
+			RQ_TYPE(tfr->request_type));
+
 	if (my_cluster_id == 0xDEAD) {
 		LOG_DBG("Message before init... ignoring.\n");
 		goto out;
@@ -494,10 +590,64 @@
 
 	if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) {
 		if ((!match->valid) && (my_cluster_id == tfr->originator)) {
-			if (import_checkpoint(match))
-				LOG_ERROR("Failed to import checkpoint");
-			else
+			switch (import_checkpoint(match)) {
+			case 0:
+				LOG_DBG("Checkpoint data recieved.  Log is now valid");
 				match->valid = 1;
+				break;
+			case -EAGAIN:
+				LOG_PRINT("Checkpoint data empty.  Requesting new checkpoint.");
+				
+				cp_tfr = queue_remove(free_queue);
+				if (!cp_tfr) {
+					/* FIXME: better error handling */
+					LOG_ERROR("No clog_tfr struct available");
+					goto out;
+				}
+				memset(cp_tfr, 0, sizeof(*cp_tfr));
+				cp_tfr->request_type = DM_CLOG_CHECKPOINT_REQUEST;
+
+				cp_tfr->originator = my_cluster_id;
+
+				strncpy(cp_tfr->uuid, tfr->uuid, CPG_MAX_NAME_LENGTH);
+
+				if ((r = cluster_send(cp_tfr))) {
+					/* FIXME: better error handling */
+					LOG_ERROR("Failed to send checkpoint ready notice");
+					queue_add(cp_tfr, free_queue);
+					goto out;
+				}
+				queue_add(cp_tfr, free_queue);
+
+				break;
+			default:
+				LOG_ERROR("Failed to import checkpoint");
+				/* Could we retry? */
+			}
+		}
+		goto out;
+	}
+
+	if (tfr->request_type == DM_CLOG_CHECKPOINT_REQUEST) {
+		if (tfr->originator == my_cluster_id) {
+			/*
+			 * The checkpoint includes any request up to the
+			 * request for checkpoint.  So, we must clear any
+			 * previous requests we were storing.
+			 */
+			while ((startup_tfr = queue_remove(match->startup_queue)))
+				queue_add(startup_tfr, free_queue);
+		} else if (my_cluster_id == match->lowest_id) {
+			struct checkpoint_data *new;
+
+			new = prepare_checkpoint(match, tfr->originator);
+			if (!new) {
+				/* FIXME: Need better error handling */
+				LOG_ERROR("Failed to prepare checkpoint!!!");
+				goto out;
+			}
+			new->next = match->checkpoint_list;
+			match->checkpoint_list = new;
 		}
 		goto out;
 	}
@@ -517,8 +667,8 @@
 
 		while ((startup_tfr = queue_remove(match->startup_queue))) {
 			LOG_DBG("Processing delayed request %d: %s",
-				  match->startup_queue->count,
-				  RQ_TYPE(startup_tfr->request_type));
+				match->startup_queue->count,
+				RQ_TYPE(startup_tfr->request_type));
 			r = handle_cluster_request(startup_tfr,
 						   (my_cluster_id == match->lowest_id) ? 1 : 0);
 			if (r) {
@@ -562,8 +712,12 @@
 
 	LOG_PRINT("* MEMBERS (%d):", member_list_entries);
 	for (i = 0; i < member_list_entries; i++)
+		/*
 		LOG_PRINT("*   [%d] nodeid: %d, pid: %d",
 			  i, member_list[i].nodeid, member_list[i].pid);
+		*/
+		syslog(LOG_NOTICE, "*   [%d] nodeid: %d, pid: %d",
+		       i, member_list[i].nodeid, member_list[i].pid);
 
 	LOG_PRINT("* LEAVING (%d):", left_list_entries);
 	for (i = 0; i < left_list_entries; i++)
@@ -641,33 +795,11 @@
 
 	if (do_checkpoint) {
 		struct checkpoint_data *new;
-		/* FIXME: might need to wait until cleared... */
+
 		for (i = 0; i < joined_list_entries; i++) {
-			new = malloc(sizeof(*new));
-			if (!new) {
-				LOG_ERROR("Unable to create checkpoint data for %u",
-					  joined_list[i].nodeid);
-				goto out;
-			}
-			memset(new, 0, sizeof(*new));
-			new->requester = joined_list[i].nodeid;
-			strncpy(new->uuid, match->name.value, match->name.length);
-			new->bitmap_size = store_bits(match->name.value, "clean_bits", &new->clean_bits);
-			if (new->bitmap_size <= 0) {
-				LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
-					  joined_list[i].nodeid);
-				free(new);
+			new = prepare_checkpoint(match, joined_list[i].nodeid);
+			if (!new)
 				goto out;
-			}
-
-			new->bitmap_size = store_bits(match->name.value,
-						      "sync_bits", &new->sync_bits);
-			if (new->bitmap_size <= 0) {
-				LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
-					  joined_list[i].nodeid);
-				free(new->clean_bits);
-				free(new);
-			}
 			new->next = match->checkpoint_list;
 			match->checkpoint_list = new;
 		}
--- cluster/cmirror/src/Attic/common.h	2007/08/23 19:57:31	1.1.2.1
+++ cluster/cmirror/src/Attic/common.h	2007/11/03 18:37:48	1.1.2.2
@@ -16,6 +16,7 @@
 
 #define EXIT_QUEUE_NOMEM           7
 
+/* Located in dm-clog-tfr.h
 #define RQ_TYPE(x) \
 	((x) == DM_CLOG_CTR) ? "DM_CLOG_CTR" : \
 	((x) == DM_CLOG_DTR) ? "DM_CLOG_DTR" : \
@@ -34,5 +35,6 @@
 	((x) == DM_CLOG_STATUS_INFO) ? "DM_CLOG_STATUS_INFO" : \
 	((x) == DM_CLOG_STATUS_TABLE) ? "DM_CLOG_STATUS_TABLE" : \
 	NULL
+*/
 
 #endif /* __CLUSTER_LOG_COMMON_DOT_H__ */
--- cluster/cmirror/src/Attic/functions.c	2007/08/23 19:57:31	1.1.2.1
+++ cluster/cmirror/src/Attic/functions.c	2007/11/03 18:37:48	1.1.2.2
@@ -1,8 +1,15 @@
 #include <stdint.h>
 #include <errno.h>
 #include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#include <unistd.h>
 #include <ext2fs/ext2_fs.h>
 #include <ext2fs/ext2fs.h>
+#include <linux/kdev_t.h>
+#define __USE_GNU /* for O_DIRECT */
+#include <fcntl.h>
 #include "functions.h"
 #include "queues.h"
 #include "common.h"
@@ -11,6 +18,21 @@
 
 #define BYTE_SHIFT 3
 
+/*
+ * Magic for persistent mirrors: "MiRr"
+ * Following on-disk header information is stolen from
+ * drivers/md/dm-log.c
+ */
+#define MIRROR_MAGIC 0x4D695272
+#define MIRROR_DISK_VERSION 2
+#define LOG_OFFSET 2
+
+struct log_header {
+        uint32_t magic;
+        uint32_t version;
+        uint64_t nr_regions;
+};
+
 struct log_c {
 	struct list_head list;
 	char uuid[DM_UUID_LEN];
@@ -26,6 +48,8 @@
 	uint64_t recovering_region; /* -1 means not recovering */
 	int sync_search;
 
+	int resume_override;
+
         enum sync {
                 DEFAULTSYNC,    /* Synchronize if necessary */
                 NOSYNC,         /* Devices known to be already in sync */
@@ -33,9 +57,14 @@
         } sync;
 
 	int disk_fd;            /* -1 means no disk log */
+	int log_dev_failed;
+	uint64_t disk_nr_regions;
+	size_t disk_size;       /* size of disk_buffer in bytes */
+	void *disk_buffer;      /* aligned memory for O_DIRECT */
 };
 
 static struct list_head log_list = LIST_HEAD_INIT(log_list);
+static struct list_head log_pending_list = LIST_HEAD_INIT(log_pending_list);
 
 static int log_test_bit(uint32_t *bs, unsigned bit)
 {
@@ -61,6 +90,21 @@
 	return start;
 }
 
+static uint64_t count_bits32(uint32_t *addr, uint32_t count)
+{
+	int j;
+	uint32_t i;
+	uint64_t rtn = 0;
+
+	for (i = 0; i < count; i++) {
+		if (!addr[i])
+			continue;
+		for (j = 0; j < 32; j++)
+			rtn += (addr[i] & (1<<j)) ? 1 : 0;
+	}
+	return rtn;
+}
+
 /*
  * get_log
  * @tfr
@@ -82,9 +126,191 @@
 	return NULL;
 }
 
+/*
+ * get_pending_log
+ * @tfr
+ *
+ * Pending logs are logs that have been 'clog_ctr'ed, but
+ * have not joined the CPG (via clog_resume).
+ *
+ * Returns: log if found, NULL otherwise
+ */
+static struct log_c *get_pending_log(const char *uuid)
+{
+	struct list_head *l;
+	struct log_c *lc;
+
+	/* FIXME: Need prefetch to do this right */
+	__list_for_each(l, &log_pending_list) {
+		lc = list_entry(l, struct log_c, list);
+		if (!strcmp(lc->uuid, uuid))
+			return lc;
+	}
+
+	return NULL;
+}
+
+static void header_to_disk(struct log_header *mem, struct log_header *disk)
+{
+	memcpy(disk, mem, sizeof(struct log_header));
+}
+
+static void header_from_disk(struct log_header *mem, struct log_header *disk)
+{
+	memcpy(mem, disk, sizeof(struct log_header));
+}
+
+static int rw_log(struct log_c *lc, int do_write)
+{
+	int r;
+
+	r = lseek(lc->disk_fd, 0, SEEK_SET);
+	if (r < 0) {
+		LOG_ERROR("rw_log:  lseek failure: %s",
+			  strerror(errno));
+		return -errno;
+	}
+
+	if (do_write) {
+		r = write(lc->disk_fd, lc->disk_buffer, lc->disk_size);
+		if (r < 0) {
+			LOG_ERROR("rw_log:  write failure: %s",
+				  strerror(errno));
+			return -EIO;
+		}
+		return 0;
+	}
+
+	/* Read */
+	r = read(lc->disk_fd, lc->disk_buffer, lc->disk_size);
+	if (r < 0)
+		LOG_ERROR("rw_log:  read failure: %s",
+			  strerror(errno));
+	if (r != lc->disk_size)
+		return -EIO;
+	return 0;
+}
+
+/*
+ * read_log
+ * @lc
+ *
+ * Valid return codes:
+ *   -EINVAL:  Invalid header, bits not copied
+ *   -EIO:     Unable to read disk log
+ *    0:       Valid header, disk bit -> lc->sync_bits
+ *
+ * Returns: 0 on success, -EXXX on failure
+ */
+static int read_log(struct log_c *lc)
+{
+	struct log_header lh;
+	size_t bitset_size;
+
+	memset(&lh, 0, sizeof(struct log_header));
+
+	if (rw_log(lc, 0))
+		return -EIO;
+
+	header_from_disk(&lh, lc->disk_buffer);
+	if (lh.magic != MIRROR_MAGIC) {
+		LOG_ERROR("Header not valid");
+		LOG_ERROR("  magic     : %x  (expected: %x)",
+			  lh.magic, MIRROR_MAGIC);
+		LOG_ERROR("  version   : %u", lh.version);
+		LOG_ERROR("  nr_regions: %llu", lh.nr_regions);
+		LOG_ERROR("*** %s ***", strerror(EINVAL));
+		return -EINVAL;
+	}
+
+	lc->disk_nr_regions = lh.nr_regions;
+
+	/* Read disk bits into sync_bits */
+	bitset_size = lc->region_count / 8;
+	bitset_size += (lc->region_count % 8) ? 1 : 0;
+	memcpy(lc->clean_bits, lc->disk_buffer + 1024, bitset_size);
+
+	return 0;
+}
+
+/*
+ * write_log
+ * @lc
+ *
+ * Returns: 0 on success, -EIO on failure
+ */
+static int write_log(struct log_c *lc)
+{
+	struct log_header lh;
+	size_t bitset_size;
+
+	lh.magic = MIRROR_MAGIC;
+	lh.version = MIRROR_DISK_VERSION;
+	lh.nr_regions = lc->region_count;
+
+	header_to_disk(&lh, lc->disk_buffer);
+
+	/* Write disk bits from clean_bits */
+	bitset_size = lc->region_count / 8;
+	bitset_size += (lc->region_count % 8) ? 1 : 0;
+	memcpy(lc->disk_buffer + 1024, lc->sync_bits, bitset_size);
+
+	if (rw_log(lc, 1))
+		return -EIO;
+	return 0;
+}
+
+static int find_disk_path(char *major_minor_str, char *path_rtn)
+{
+	int r;
+	DIR *dp;
+	struct dirent *dep;
+	struct stat statbuf;
+	int major, minor;
+
+	r = sscanf(major_minor_str, "%d:%d", &major, &minor);
+	if (r != 2)
+		return -EINVAL;
+
+	LOG_DBG("Checking /dev/mapper for device %d:%d", major, minor);
+	/* Check /dev/mapper dir */
+	dp = opendir("/dev/mapper");
+	if (!dp)
+		return -ENOENT;
+
+	while ((dep = readdir(dp)) != NULL) {
+		/*
+		 * FIXME: This is racy.  By the time the path is used,
+		 * it may point to something else.  'fstat' will be
+		 * required upon opening to ensure we got what we
+		 * wanted.
+		 */
+
+		sprintf(path_rtn, "/dev/mapper/%s", dep->d_name);
+		stat(path_rtn, &statbuf);
+		if (S_ISBLK(statbuf.st_mode) &&
+		    (major(statbuf.st_rdev) == major) &&
+		    (minor(statbuf.st_rdev) == minor)) {
+			LOG_DBG("  %s: YES", dep->d_name);
+			closedir(dp);
+			return 0;
+		} else {
+			LOG_DBG("  %s: NO", dep->d_name);
+		}
+	}
+
+	closedir(dp);
+
+	LOG_DBG("Path not found for %d/%d", major, minor);
+	LOG_DBG("Creating /dev/mapper/%d-%d", major, minor);
+	sprintf(path_rtn, "/dev/mapper/%d-%d", major, minor);
+	r = mknod(path_rtn, S_IFBLK | S_IRUSR | S_IWUSR, MKDEV(major, minor));
+
+	return r ? -errno : 0;
+}
+
 static int _clog_ctr(int argc, char **argv, uint64_t device_size)
 {
-	int disk_log = 0;
 	int r = 0;
 	char *p;
 	uint64_t region_size;
@@ -92,6 +318,12 @@
 	uint32_t bitset_size;
 	struct log_c *lc = NULL;
 	enum sync sync = DEFAULTSYNC;
+
+	int disk_log = 0;
+	char disk_path[128];
+	size_t page_size;
+	int pages;
+
 	ENTER();
 
 	/* If core log request, then argv[0] will be region_size */
@@ -104,6 +336,13 @@
 			r = -EINVAL;
 			goto fail;
 		}
+
+		r = find_disk_path(argv[0], disk_path);
+		if (r) {
+			LOG_ERROR("Unable to find path to device %s", argv[0]);
+			goto fail;
+		}
+		LOG_DBG("Clustered log disk is %s", disk_path);
 	} else {
 		disk_log = 0;
 
@@ -147,7 +386,11 @@
 	lc->region_size = region_size;
 	lc->region_count = region_count;
 	lc->sync = sync;
+	lc->sync_search = 0;
+	lc->recovering_region = (uint64_t)-1;
 	strncpy(lc->uuid, argv[1 + disk_log], DM_UUID_LEN);
+	lc->disk_fd = -1;
+	lc->log_dev_failed = 0;
 
 	lc->bitset_uint32_count = region_count / (sizeof(*lc->clean_bits) << BYTE_SHIFT);
 	if (region_count % (sizeof(*lc->clean_bits) << BYTE_SHIFT))
@@ -155,10 +398,9 @@
 
 	bitset_size = lc->bitset_uint32_count * sizeof(*lc->clean_bits);
 
-	lc->clean_bits = malloc(bitset_size);
+	lc->clean_bits = malloc(bitset_size);	
 	if (!lc->clean_bits) {
 		LOG_ERROR("Unable to allocate clean bitset");
-		free(lc);
 		r = -ENOMEM;
 		goto fail;
 	}
@@ -167,36 +409,52 @@
 	lc->sync_bits = malloc(bitset_size);
 	if (!lc->sync_bits) {
 		LOG_ERROR("Unable to allocate sync bitset");
-		free(lc->clean_bits);
-		free(lc);
 		r = -ENOMEM;
 		goto fail;
 	}
 	memset(lc->sync_bits, (sync == NOSYNC) ? -1 : 0, bitset_size);
 	lc->sync_count = (sync == NOSYNC) ? region_count : 0;
-
-	lc->sync_search = 0;
-	lc->disk_fd = -1;
-	lc->recovering_region = (uint64_t)-1;
-
 	if (disk_log) {
-		/*
-		 * FIXME:
-		 *	- open device
-		 *	- allocate direct I/O space
-		 *	- test read before resume?
-		 */
-		LOG_ERROR("clustered_disk log not implemented");
-		free(lc->sync_bits);
-		free(lc->clean_bits);
-		free(lc);
-		r = -ENOSYS;
-		goto fail;
+		page_size = sysconf(_SC_PAGESIZE);
+		pages = bitset_size/page_size;
+		pages += bitset_size%page_size ? 1 : 0;
+		pages += 1; /* for header */
+
+		r = open(disk_path, O_RDWR | O_DIRECT);
+		if (r < 0) {
+			LOG_ERROR("Unable to open log device, %s: %s",
+				  disk_path, strerror(errno));
+			r = errno;
+			goto fail;
+		}
+		lc->disk_fd = r;
+		lc->disk_size = pages * page_size;
+
+		r = posix_memalign(&(lc->disk_buffer), page_size,
+			lc->disk_size);
+		if (r) {
+			LOG_ERROR("Unable to allocate memory for disk_buffer");
+			goto fail;
+		}
+		LOG_DBG("Disk log ready");
 	}
 
-	list_add(&lc->list, &log_list);
+	list_add(&lc->list, &log_pending_list);
 
+	EXIT();
+	return 0;
 fail:
+	if (lc) {
+		if (lc->clean_bits)
+			free(lc->clean_bits);
+		if (lc->sync_bits)
+			free(lc->sync_bits);
+		if (lc->disk_buffer)
+			free(lc->disk_buffer);
+		if (lc->disk_fd >= 0)
+			close(lc->disk_fd);
+		free(lc);
+	}
 	EXIT();
 	return r;
 }
@@ -227,6 +485,9 @@
 
 	if (strlen(tfr->data) != tfr->data_size) {
 		LOG_ERROR("Received constructor request with bad data");
+		LOG_DBG("strlen(tfr->data)[%d] != tfr->data_size[%d]",
+			strlen(tfr->data), tfr->data_size);
+		LOG_DBG("tfr->data = %s", tfr->data);
 		return -EINVAL;
 	}
 
@@ -249,12 +510,8 @@
 
 	argc--;  /* We pass in the device_size separate */
 	r = _clog_ctr(argc, argv, device_size);
-	if (!r) {
-		r = create_cluster_cpg(tfr->uuid);
 
-		if (r)
-			clog_dtr(tfr);
-	}
+	/* We join the CPG when we resume */
 
 	/* No returning data */
 	tfr->data_size = 0;
@@ -321,27 +578,151 @@
 	if (!lc)
 		return -EINVAL;
 
+	lc->resume_override = 0;
 	return 0;
 }
 
 /*
- * clog_resume
+ * _clog_resume
  * @tfr
  *
+ * Does the main work of resuming.
  */
-static int clog_resume(struct clog_tfr *tfr)
+static int _clog_resume(struct clog_tfr *tfr)
 {
+	uint32_t i;
 	struct log_c *lc = get_log(tfr->uuid);
+	size_t size = lc->bitset_uint32_count * sizeof(uint32_t);
 
 	if (!lc)
 		return -EINVAL;
 
-	if (lc->disk_fd != -1)
-		tfr->error = -ENOSYS;
+	if (lc->disk_fd == -1)
+		return 0;
+
+	switch (lc->resume_override) {
+	case 1000:
+		LOG_ERROR("ERROR:: Additional resume issued before suspend");
+		return 0;
+	case 0:
+		LOG_PRINT("Master resume: reading disk log");
+		lc->resume_override = 1000;
+		break;
+	case 1:
+		LOG_ERROR("Error:: partial bit loading (just sync_bits)");
+		return -EINVAL;
+	case 2:
+		LOG_ERROR("Error:: partial bit loading (just clean_bits)");
+		return -EINVAL;
+	case 3:
+		LOG_DBG("Non-master resume: bits pre-loaded");
+		lc->resume_override = 1000;
+		return 0;
+	default:
+		LOG_ERROR("Error:: multiple loading of bits (%d)", lc->resume_override);
+		return -EINVAL;
+	}
+
+	if (lc->log_dev_failed) {
+		LOG_ERROR("Log device has failed, unable to read bits");
+		tfr->error = 0;  /* We can handle this so far */
+		lc->disk_nr_regions = 0;
+	} else
+		tfr->error = read_log(lc);
+
+	switch (tfr->error) {
+	case 0:
+		if (lc->disk_nr_regions < lc->region_count)
+			LOG_PRINT("Mirror has grown, updating log bits");
+		else if (lc->disk_nr_regions > lc->region_count)
+			LOG_PRINT("Mirror has shrunk, updating log bits");
+		break;		
+	case -EINVAL:
+		LOG_DBG("Read log failed: not yet initialized");
+		lc->disk_nr_regions = 0;
+		break;
+	default:
+		LOG_ERROR("Failed to read disk log");
+		lc->disk_nr_regions = 0;
+		break;
+	}
+
+	/* If mirror has grown, set bits appropriately */
+	if (lc->sync == NOSYNC)
+		for (i = lc->disk_nr_regions; i < lc->region_count; i++)
+			log_set_bit(lc, lc->clean_bits, i);
+	else
+		for (i = lc->disk_nr_regions; i < lc->region_count; i++)
+			log_clear_bit(lc, lc->clean_bits, i);
+
+	/* Clear any old bits if device has shrunk */
+	for (i = lc->region_count; i % 32; i++)
+		log_clear_bit(lc, lc->clean_bits, i);
+
+	/* copy clean across to sync */
+	memcpy(lc->sync_bits, lc->clean_bits, size);
+	lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count);
+	lc->sync_search = 0;
+
+	/*
+	tfr->error = write_log(lc);
+	if (tfr->error) {
+		lc->log_dev_failed = 1;
+		LOG_ERROR("Failed to write initial disk log");
+	} else
+		lc->log_dev_failed = 0;
+	*/
+	/*
+	 * We mark 'touched' so that only the master commits
+	 * the log via 'commit_log'
+	 */
+	lc->touched = 1;
+
 	return tfr->error;
 }
 
 /*
+ * clog_resume
+ * @tfr
+ *
+ * If the log is pending, we must first join the cpg and
+ * put the log in the official list.
+ *
+ * If the log is in the official list, then we call
+ * _clog_resume.
+ */
+static int clog_resume(struct clog_tfr *tfr)
+{
+	int r;
+	struct log_c *lc = get_log(tfr->uuid);
+
+	if (!lc) {
+		/* Is the log in the pending list? */
+		lc = get_pending_log(tfr->uuid);
+		if (!lc) {
+			LOG_ERROR("clog_resume called on log that is not official or pending");
+			return -EINVAL;
+		}
+
+		/* Join the CPG */
+		r = create_cluster_cpg(tfr->uuid);
+		if (r) {
+			LOG_ERROR("clog_resume:  Failed to create cluster CPG");
+			return r;
+		}
+
+		/* move log to official list */
+		list_del_init(&lc->list);
+		list_add(&lc->list, &log_list);
+
+		return 0;
+	}
+
+	/* log is in the official list, try to resume */
+	return _clog_resume(tfr);
+}
+
+/*
  * clog_get_region_size
  * @tfr
  *
@@ -619,8 +1000,19 @@
 
 static int disk_status_info(struct log_c *lc, struct clog_tfr *tfr)
 {
-	tfr->error = -ENOSYS;
-	return -ENOSYS;
+	char *data = (char *)tfr->data;
+	struct stat statbuf;
+
+	if(fstat(lc->disk_fd, &statbuf)) {
+		tfr->error = -errno;
+		return -errno;
+	}
+
+	tfr->data_size = sprintf(data, "3 clustered_disk %d:%d %c",
+				 major(statbuf.st_rdev), minor(statbuf.st_rdev),
+				 'A'); /* FIXME: detect dead device */
+
+	return 0;
 }
 
 /*
@@ -649,9 +1041,9 @@
 	int params;
 	char *data = (char *)tfr->data;
 
-	params = (lc->sync == DEFAULTSYNC) ? 2 : 3;
-	tfr->data_size = sprintf(data, "clustered_core %d %u %sblock_on_error ",
-				 params, lc->region_size,
+	params = (lc->sync == DEFAULTSYNC) ? 3 : 4;
+	tfr->data_size = sprintf(data, "clustered_core %d %u %s %sblock_on_error ",
+				 params, lc->region_size, lc->uuid,
 				 (lc->sync == DEFAULTSYNC) ? "" :
 				 (lc->sync == NOSYNC) ? "nosync " : "sync ");
 	return 0;
@@ -659,8 +1051,23 @@
 
 static int disk_status_table(struct log_c *lc, struct clog_tfr *tfr)
 {
-	tfr->error = -ENOSYS;
-	return -ENOSYS;
+	int params;
+	char *data = (char *)tfr->data;
+	struct stat statbuf;
+
+	if(fstat(lc->disk_fd, &statbuf)) {
+		tfr->error = -errno;
+		return -errno;
+	}
+
+	params = (lc->sync == DEFAULTSYNC) ? 4 : 5;
+	tfr->data_size = sprintf(data, "clustered_disk %d %u %d:%d %s %sblock_on_error ",
+				 params, lc->region_size,
+				 major(statbuf.st_rdev), minor(statbuf.st_rdev),
+				 lc->uuid,
+				 (lc->sync == DEFAULTSYNC) ? "" :
+				 (lc->sync == NOSYNC) ? "nosync " : "sync ");
+	return 0;
 }
 
 /*
@@ -685,6 +1092,30 @@
 }
 
 /*
+ * clog_is_remote_recovering
+ * @tfr
+ *
+ */
+static int clog_is_remote_recovering(struct clog_tfr *tfr)
+{
+	int *rtn = (int *)tfr->data;
+	uint64_t region = *((uint64_t *)(tfr->data));
+	struct log_c *lc = get_log(tfr->uuid);
+
+	if (!lc)
+		return -EINVAL;
+
+	if (region > lc->region_count)
+		return -EINVAL;
+
+	*rtn = !log_test_bit(lc->sync_bits, region);
+	tfr->data_size = sizeof(*rtn);
+
+	return 0;	
+}
+
+
+/*
  * do_request
  * @tfr: the request
  *
@@ -757,7 +1188,12 @@
 	case DM_CLOG_STATUS_TABLE:
 		r = clog_status_table(tfr);
 		break;
+	case DM_CLOG_IS_REMOTE_RECOVERING:
+		r = clog_is_remote_recovering(tfr);
+		break;
 	default:
+		LOG_ERROR("Unknown request");
+		r = tfr->error = -EINVAL;
 		break;
 	}
 
@@ -805,9 +1241,12 @@
 		goto out;
 
 	if (lc->disk_fd >= 0) {
-		/* FIXME: implement */
-		tfr->error = -ENOSYS;
-		r = -ENOSYS;
+		r = tfr->error = write_log(lc);
+		if (r) {
+			LOG_ERROR("Error writing to disk log");
+			return -EIO;
+		}
+		LOG_DBG("Disk log written");
 	}
 
 	lc->touched = 0;
@@ -894,10 +1333,12 @@
 	}
 
 	if (!strncmp(which, "sync_bits", 9)) {
+		lc->resume_override += 1;
 		memcpy(lc->sync_bits, buf, bitset_size);
 		LOG_DBG("loading sync_bits:");
 		print_bits((char *)lc->sync_bits, bitset_size);
 	} else if (!strncmp(which, "clean_bits", 9)) {
+		lc->resume_override += 2;
 		memcpy(lc->clean_bits, buf, bitset_size);
 		LOG_DBG("loading clean_bits:");
 		print_bits((char *)lc->clean_bits, bitset_size);
@@ -913,6 +1354,7 @@
 	struct log_c *lc;
 
 	/* FIXME: Need prefetch to do this right */
+	LOG_DBG("Official log list:");
 	__list_for_each(l, &log_list) {
 		found = 1;
 		lc = list_entry(l, struct log_c, list);
@@ -924,5 +1366,17 @@
 		print_bits((char *)lc->clean_bits,
 			   lc->bitset_uint32_count * sizeof(*lc->clean_bits));
 	}
+	LOG_DBG("Pending log list:");
+	__list_for_each(l, &log_pending_list) {
+		found = 1;
+		lc = list_entry(l, struct log_c, list);
+		LOG_DBG("%s", lc->uuid);
+		LOG_DBG("sync_bits:");
+		print_bits((char *)lc->sync_bits,
+			   lc->bitset_uint32_count * sizeof(*lc->sync_bits));
+		LOG_DBG("clean_bits:");
+		print_bits((char *)lc->clean_bits,
+			   lc->bitset_uint32_count * sizeof(*lc->clean_bits));
+	}
 	return found;
 }
--- cluster/cmirror/src/Attic/local.c	2007/08/23 19:57:31	1.1.2.1
+++ cluster/cmirror/src/Attic/local.c	2007/11/03 18:37:48	1.1.2.2
@@ -164,6 +164,23 @@
 				  RQ_TYPE(tfr->request_type));
 			
 		break;
+	case DM_CLOG_RESUME:
+		/*
+		 * Resume is a special case that requires a local
+		 * component to join the CPG, and a cluster component
+		 * to handle the request.
+		 */
+		r = do_request(tfr);
+		if (r) {
+			LOG_DBG("Returning failed request to kernel [%s]",
+				RQ_TYPE(tfr->request_type));
+			r = kernel_send(tfr);
+			if (r)
+				LOG_ERROR("Failed to respond to kernel [%s]",
+					  RQ_TYPE(tfr->request_type));
+			break;
+		}
+		/* ELSE, fall through to default */
 	default:
 		/* Add before send_to_cluster, so cluster code can find it */
 		queue_add_tail(tfr, cluster_queue);
--- cluster/cmirror-kernel/src/dm-clog-tfr.h	2007/08/23 19:54:57	1.1.2.1
+++ cluster/cmirror-kernel/src/dm-clog-tfr.h	2007/11/03 18:37:48	1.1.2.2
@@ -27,6 +27,7 @@
 #define DM_CLOG_GET_SYNC_COUNT        14
 #define DM_CLOG_STATUS_INFO           15
 #define DM_CLOG_STATUS_TABLE          16
+#define DM_CLOG_IS_REMOTE_RECOVERING  17
 
 #define RQ_TYPE(x) \
 	((x) == DM_CLOG_CTR) ? "DM_CLOG_CTR" : \
@@ -45,7 +46,8 @@
 	((x) == DM_CLOG_GET_SYNC_COUNT) ? "DM_CLOG_GET_SYNC_COUNT" : \
 	((x) == DM_CLOG_STATUS_INFO) ? "DM_CLOG_STATUS_INFO" : \
 	((x) == DM_CLOG_STATUS_TABLE) ? "DM_CLOG_STATUS_TABLE" : \
-	NULL
+	((x) == DM_CLOG_IS_REMOTE_RECOVERING) ? \
+	"DM_CLOG_IS_REMOTE_RECOVERING" : NULL
 
 struct clog_tfr {
 	uint64_t private[2];
--- cluster/cmirror-kernel/src/dm-clog.c	2007/08/30 18:26:22	1.2.2.3
+++ cluster/cmirror-kernel/src/dm-clog.c	2007/11/03 18:37:48	1.2.2.4
@@ -560,6 +560,20 @@
 	return DMLOG_IOERR_BLOCK;
 }
 
+static int cluster_is_remote_recovering(struct dirty_log *log, region_t region)
+{
+	int r;
+	int is_recovering;
+	int rdata_size;
+	struct log_c *lc = (struct log_c *)log->context;
+
+	rdata_size = sizeof(is_recovering);
+	r = dm_clog_consult_server(lc->uuid, DM_CLOG_IS_REMOTE_RECOVERING,
+				   (char *)&region, sizeof(region),
+				   (char *)&is_recovering, &rdata_size);
+	return (r) ? 1 : is_recovering;
+}
+
 static struct dirty_log_type _clustered_core_type = {
 	.name = "clustered_core",
 	.module = THIS_MODULE,
@@ -579,6 +593,7 @@
 	.get_sync_count = cluster_get_sync_count,
 	.status = cluster_status,
 	.get_failure_response = cluster_get_failure_response,
+	.is_remote_recovering = cluster_is_remote_recovering,
 };
 
 static struct dirty_log_type _clustered_disk_type = {
@@ -600,6 +615,7 @@
 	.get_sync_count = cluster_get_sync_count,
 	.status = cluster_status,
 	.get_failure_response = cluster_get_failure_response,
+	.is_remote_recovering = cluster_is_remote_recovering,
 };
 
 static int __init cluster_dirty_log_init(void)
@@ -638,7 +654,7 @@
 		return r;
 	}
 
-	DMINFO("dm-clulog (built %s %s) installed", __DATE__, __TIME__);
+	DMINFO("dm-log-clustered (built %s %s) installed", __DATE__, __TIME__);
 	return 0;
 }
 
@@ -648,7 +664,7 @@
 	dm_unregister_dirty_log_type(&_clustered_core_type);
 	dm_clog_tfr_exit();
 	mempool_destroy(flush_entry_pool);
-	DMINFO("dm-clulog (built %s %s) removed", __DATE__, __TIME__);
+	DMINFO("dm-log-clustered (built %s %s) removed", __DATE__, __TIME__);
 	return;
 }
 



                 reply	other threads:[~2007-11-03 18:37 UTC|newest]

Thread overview: [no followups] expand[flat|nested]  mbox.gz  Atom feed

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20071103183751.5030.qmail@sourceware.org \
    --to=jbrassow@sourceware.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.