All of lore.kernel.org
 help / color / mirror / Atom feed
From: jbrassow@sourceware.org <jbrassow@sourceware.org>
To: cluster-devel.redhat.com
Subject: [Cluster-devel] cluster/cmirror-kernel/src dm-cmirror-client.c ...
Date: 3 Apr 2007 18:21:12 -0000	[thread overview]
Message-ID: <20070403182112.4454.qmail@sourceware.org> (raw)

CVSROOT:	/cvs/cluster
Module name:	cluster
Branch: 	RHEL4
Changes by:	jbrassow at sourceware.org	2007-04-03 19:21:10

Modified files:
	cmirror-kernel/src: dm-cmirror-client.c dm-cmirror-common.h 
	                    dm-cmirror-server.c dm-cmirror-xfr.h 

Log message:
	Bug 234539: multiple streams of I/O can cause system to lock up
	
	This bug provoked an audit of the communications exchange, locking,
	and memory allocations/stack usage.
	
	Communication fixes include:
	1) Added sequence numbers to ensure that replies from the server
	correctly correspond to client requests.  It was found that if
	a client timed out waiting for a server to respond, it would send
	the request again.  However, the server may have simply been too
	busy to respond in a timely fashion.  It ends up responding to
	both the original request and the resent request - causing the
	client and server to become out-of-sync WRT log requests.
	
	Locking fixes include:
	1) A semaphore was being "up"ed twice in some cases, rendering
	the lock impotent.
	
	2) A spin lock controlling region status lists was being held
	across blocking operations - sometimes causing deadlocks.  The
	spin lock was changed to a per-log lock, and some logging
	operations were restructured to better suit the way locking
	needed to be done.  A side-effect of this fix is a 20%
	improvement in write operations.
	
	3) The log list protection lock needed to change from a spin lock
	to a semaphore to allow blocking operations.
	
	Memory allocation fixes include:
	1) Wrong flags to kmalloc could cause deadlock.  Use GFP_NOFS
	instead of GFP_KERNEL.
	
	2) Mempools needed more reserves for low memory conditions.
	
	3) Server now allocates a communication structure instead of having
	it on the stack.  This reduces the likelihood of stack corruption.

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror-kernel/src/dm-cmirror-client.c.diff?cvsroot=cluster&only_with_tag=RHEL4&r1=1.1.2.42&r2=1.1.2.43
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror-kernel/src/dm-cmirror-common.h.diff?cvsroot=cluster&only_with_tag=RHEL4&r1=1.1.2.12&r2=1.1.2.13
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror-kernel/src/dm-cmirror-server.c.diff?cvsroot=cluster&only_with_tag=RHEL4&r1=1.1.2.27&r2=1.1.2.28
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror-kernel/src/dm-cmirror-xfr.h.diff?cvsroot=cluster&only_with_tag=RHEL4&r1=1.1.2.3&r2=1.1.2.4

--- cluster/cmirror-kernel/src/Attic/dm-cmirror-client.c	2007/03/22 22:21:59	1.1.2.42
+++ cluster/cmirror-kernel/src/Attic/dm-cmirror-client.c	2007/04/03 18:21:10	1.1.2.43
@@ -28,20 +28,16 @@
 #include "dm-cmirror-server.h"
 #include "dm-cmirror-cman.h"
 
-spinlock_t log_list_lock;
+DECLARE_MUTEX(log_list_lock);
 LIST_HEAD(log_list_head);
 
 struct region_state {
-	struct log_c *rs_lc;
+	int rs_mark_logged;
 	region_t rs_region;
 	struct list_head rs_list;
 };
 
 static mempool_t *region_state_pool = NULL;
-static spinlock_t region_state_lock;
-static int clear_region_count=0;
-static struct list_head clear_region_list;
-static struct list_head marked_region_list;
 
 static int shutting_down=0;
 static atomic_t suspend_client;
@@ -145,15 +141,7 @@
 	memset(lc->sync_bits, (sync == NOSYNC) ? -1 : 0, bitset_size);
 	lc->sync_count = (sync == NOSYNC) ? region_count : 0;
 
-	lc->recovering_bits = vmalloc(bitset_size);
-	if (!lc->recovering_bits) {
-		DMWARN("couldn't allocate sync bitset");
-		vfree(lc->sync_bits);
-		vfree(lc->clean_bits);
-		kfree(lc);
-		return -ENOMEM;
-	}
-	memset(lc->recovering_bits, 0, bitset_size);
+	lc->recovering_region = (uint64_t)-1;
 	lc->sync_search = 0;
 	log->context = lc;
 	return 0;
@@ -164,7 +152,6 @@
 	struct log_c *lc = (struct log_c *) log->context;
 	vfree(lc->clean_bits);
 	vfree(lc->sync_bits);
-	vfree(lc->recovering_bits);
 	kfree(lc);
 }
 
@@ -321,8 +308,9 @@
 
 	request_count++;
 
-	lr = kmalloc(sizeof(struct log_request), GFP_KERNEL);
+	lr = kmalloc(sizeof(struct log_request), GFP_NOFS);
 	if(!lr){
+		BUG();
 		error = -ENOMEM;
 		*retry = 1;
 		goto fail;
@@ -404,15 +392,15 @@
 	}
     
 	if (seq != lr->lr_seq) {
-		DMERR("Message sequence number mismatch: %d/%d",
+		DMDEBUG("Message sequence number mismatch: %d/%d",
 		      seq, lr->lr_seq);
 		if (seq > lr->lr_seq) {
-			DMERR(" Skipping.  Listening again for response to %s",
+			DMDEBUG(" Skipping.  Listening again for response to %s",
 			      RQ_STRING(type));
 			memset(lr, 0, sizeof(struct log_request));
 			goto rerecv;
 		}
-		DMERR(" Must try to resend request, %s", RQ_STRING(type));
+		DMERR(" Seq# mismatch: Must try to resend request, %s", RQ_STRING(type));
 		error = -EBADE;
 		*retry = 1;
 		seq++;
@@ -509,91 +497,43 @@
 			new_server = 1;
 		}
 
-		spin_lock(&region_state_lock);
+		spin_lock(&lc->state_lock);
 		if(new_server && 
-		   (!list_empty(&clear_region_list) ||
-		    !list_empty(&marked_region_list))){
+		   !list_empty(&lc->mark_logged)){
 			int i=0;
-			struct region_state *tmp_rs;
+			LIST_HEAD(mark);
 
 			DMINFO("Clean-up required due to new server");
-			DMINFO(" - Wiping clear region list");
-			list_for_each_entry_safe(rs, tmp_rs,
-						 &clear_region_list, rs_list){
-				/* Remove only those associated with referenced log */
-				if (rs->rs_lc != lc)
-					continue;
-				i++;
-				list_del_init(&rs->rs_list);
-				mempool_free(rs, region_state_pool);
-			}
-			clear_region_count -= i;
-			DMINFO(" - %d clear region requests wiped", i);
-			i=0;
 			DMINFO(" - Resending all mark region requests");
-			list_for_each_entry(rs, &marked_region_list, rs_list){
-				/* Resend only those associated with referenced log */
-				if (rs->rs_lc != lc)
-					continue;
+			list_splice_init(&lc->mark_logged, &mark);
+
+			spin_unlock(&lc->state_lock);
+
+			list_for_each_entry(rs, &mark, rs_list){
 				do {
 					retry = 0;
-					i++;
-					rtn = _consult_server(rs->rs_lc, rs->rs_region,
+					rtn = _consult_server(lc, rs->rs_region,
 							      LRT_MARK_REGION, NULL, &retry);
 					if (lc->server_id == 0xDEAD) {
-						spin_unlock(&region_state_lock);
 						goto election;
 					}
 				} while(retry);
+				i++;
 			}
+
+			spin_lock(&lc->state_lock);
+			list_splice_init(&mark, &lc->mark_logged);
+
 			DMINFO(" - %d mark region requests resent", i);
 			DMINFO("Clean-up complete");
-			if(type == LRT_MARK_REGION){
-				/* we just handled all marks */
-				DMWARN("Mark request ignored.\n");
-				spin_unlock(&region_state_lock);
-				goto out;
-			} else {
-				DMINFO("Continuing request type, %d (%s)", type,
-				       RQ_STRING(type));
-			}
+			DMINFO("Continuing request type, %d (%s)", type,
+			       RQ_STRING(type));
 			new_server = 0;
 		}
-
-		rs = NULL;
-
-		if(!list_empty(&clear_region_list)){
-			rs = list_entry(clear_region_list.next,
-					struct region_state, rs_list);
-			list_del_init(&rs->rs_list);
-			clear_region_count--;
-		}
-
-		spin_unlock(&region_state_lock);
-		
-		/* ATTENTION -- it may be possible to remove a clear region **
-		** request from the list.  Then, have a mark region happen  **
-		** while we are here.  If the clear region request fails, it**
-		** would be re-added - perhaps prematurely clearing the bit */
+		spin_unlock(&lc->state_lock);
 		
-		if(rs && !rs->rs_lc->log_dev_failed){
-			_consult_server(rs->rs_lc, rs->rs_region,
-					LRT_CLEAR_REGION, NULL, &retry);
-
-			if(retry){
-				spin_lock(&region_state_lock);
-				list_add(&rs->rs_list, &clear_region_list);
-				clear_region_count++;
-				spin_unlock(&region_state_lock);
-
-			} else {
-				mempool_free(rs, region_state_pool);
-			}
-		}
 		retry = 0;
-		
 		rtn = _consult_server(lc, region, type, result, &retry);
-		schedule();
 	} while(retry);
 out:
 	up(&consult_server_lock);
@@ -640,7 +580,7 @@
 	atomic_set(&lc->in_sync, -1);
 	lc->uuid_ref = 1;
 
-	spin_lock(&log_list_lock);
+	down(&log_list_lock);
 	list_for_each_entry(tmp_lc, &log_list_head, log_list){
 		if(!strncmp(tmp_lc->uuid, lc->uuid, MAX_NAME_LEN)){
 			lc->uuid_ref = (lc->uuid_ref > tmp_lc->uuid_ref) ?
@@ -649,12 +589,16 @@
 	}
 
 	list_add(&lc->log_list, &log_list_head);
-	spin_unlock(&log_list_lock);
+	up(&log_list_lock);
 	DMDEBUG("Creating %s (%d)",
 	       lc->uuid + (strlen(lc->uuid) - 8),
 	       lc->uuid_ref);
 
 	INIT_LIST_HEAD(&lc->region_users);
+	INIT_LIST_HEAD(&lc->clear_waiting);
+	INIT_LIST_HEAD(&lc->mark_waiting);
+	INIT_LIST_HEAD(&lc->mark_logged);
+	spin_lock_init(&lc->state_lock);
 
 	lc->server_id = 0xDEAD;
 
@@ -761,31 +705,44 @@
 	       lc->uuid + (strlen(lc->uuid) - 8),
 	       lc->uuid_ref);
 
-	if (!list_empty(&clear_region_list))
-		DMINFO("Leaving while clear region requests remain.");
-
-	spin_lock(&log_list_lock);
+	down(&log_list_lock);
 	list_del_init(&lc->log_list);
-	spin_unlock(&log_list_lock);
+	up(&log_list_lock);
 
 	if ((lc->server_id == my_id) && !atomic_read(&lc->suspended))
 		consult_server(lc, 0, LRT_MASTER_LEAVING, NULL);
 
 	sock_release(lc->client_sock);
 
-	spin_lock(&region_state_lock);
+	spin_lock(&lc->state_lock);
 
-	list_for_each_entry_safe(rs, tmp_rs, &clear_region_list, rs_list) {
-		if (lc == rs->rs_lc)
+	if (!list_empty(&lc->clear_waiting)) {
+		DMINFO("Clear requests remain at cluster log deactivation");
+		list_for_each_entry_safe(rs, tmp_rs, &lc->clear_waiting, rs_list) {
 			list_del_init(&rs->rs_list);
+			DMINFO(" - Ignoring clear request: %Lu", rs->rs_region);
+			mempool_free(rs, region_state_pool);
+		}
 	}
 
-	list_for_each_entry_safe(rs, tmp_rs, &marked_region_list, rs_list) {
-		if (lc == rs->rs_lc)
-			list_del_init(&rs->rs_list);
+	if (!list_empty(&lc->mark_waiting)) {
+		DMERR("Pending mark requests remain at cluster_dtr");
+		BUG();
+	}
+
+	if (!list_empty(&lc->mark_logged)) {
+		DMERR("Mark requests remain at cluster log deactivation");
+		/*
+		 * Should I BUG() this?
+		 * No.  In the worst case, they will get cleaned up later
+		 */
+	}
+	list_for_each_entry_safe(rs, tmp_rs, &lc->mark_logged, rs_list) {
+		list_del_init(&rs->rs_list);
+		mempool_free(rs, region_state_pool);
 	}
 
-	spin_unlock(&region_state_lock);
+	spin_unlock(&lc->state_lock);
 
 	if (lc->log_dev)
 		disk_dtr(log);
@@ -803,19 +760,27 @@
 
 static int cluster_postsuspend(struct dirty_log *log)
 {
+	struct region_state *rs, *tmp_rs;
 	struct log_c *lc = (struct log_c *) log->context;
 
-	while (1) {
-		spin_lock(&region_state_lock);
-		if (list_empty(&clear_region_list)) {
-			spin_unlock(&region_state_lock);
-			break;
-		}
-		spin_unlock(&region_state_lock);
+	spin_lock(&lc->state_lock);
+	if (!list_empty(&lc->mark_waiting)) {
+		DMERR("Mark requests remain at postsuspend!");
+		BUG();
+	}
 
-		/* Just an unnessesary call to clear out regions */
-		consult_server(lc, 0, LRT_IN_SYNC, NULL);
+	if (!list_empty(&lc->clear_waiting)) {
+		DMERR("Clear requests remain at postsuspend!");
+
+		list_for_each_entry_safe(rs, tmp_rs, &lc->clear_waiting, rs_list) {
+			list_del_init(&rs->rs_list);
+			DMERR(" - Ignoring clear request: %Lu", rs->rs_region);
+			mempool_free(rs, region_state_pool);
+		}
 	}
+
+	spin_unlock(&lc->state_lock);
+
 	atomic_set(&lc->suspended, 1);
 	if(lc->server_id == my_id) {
 		while (1) {
@@ -903,103 +868,162 @@
 
 static int cluster_flush(struct dirty_log *log)
 {
+	int r = 0;
+	int clear_count = 0;
+	int mark_count = 0;
 	struct log_c *lc = (struct log_c *) log->context;
+	struct region_state *rs, *tmp_rs;
+	LIST_HEAD(mark);
+	LIST_HEAD(clear);
+
+	/*
+	 * It should never be a problem to temporarily have
+	 * the mark requests in limbo.  The only functions
+	 * that call cluster_flush are rh_update_states and
+	 * do_writes, and they are in the same thread as
+	 * those changing the region states
+	 */
+	spin_lock(&lc->state_lock);
+	list_splice_init(&lc->clear_waiting, &clear);
+	list_splice_init(&lc->mark_waiting, &mark);
+	spin_unlock(&lc->state_lock);
+
+	list_for_each_entry_safe(rs, tmp_rs, &clear, rs_list) {
+		/* don't really care if LRT_CLEAR_REGION fails */
+		consult_server(lc, rs->rs_region, LRT_CLEAR_REGION, NULL);
+		list_del_init(&rs->rs_list);
+		mempool_free(rs, region_state_pool);
+		clear_count++;
+	}
+
+	list_for_each_entry_safe(rs, tmp_rs, &mark, rs_list) {
+		while (1) {
+			r = consult_server(lc, rs->rs_region,
+					   LRT_MARK_REGION, NULL);
+			if (!r)
+				break;
+
+			if (r == -EBUSY) {
+				DMDEBUG("Delaying mark to region %Lu, due to recovery",
+					rs->rs_region);
+				set_current_state(TASK_INTERRUPTIBLE);
+				schedule_timeout(HZ/2);
+				continue;
+			}
 
-	/* FIXME:  flush all clear_region requests to server */
-	return (lc->log_dev_failed) ? -EIO : 0;
+			if (r == -EIO)
+				goto fail;
+
+			DMWARN("unable to get server (%u) to mark region (%Lu)",
+			       lc->server_id, rs->rs_region);
+			DMWARN("Reason :: %d", r);
+		}
+		mark_count++;
+	}
+
+	/* No flush work? */
+	if (!clear_count && !mark_count)
+		return 0;
+
+	spin_lock(&lc->state_lock);
+	list_splice_init(&mark, &lc->mark_logged);
+	spin_unlock(&lc->state_lock);
+
+	while ((r = consult_server(lc, 0, LRT_FLUSH, NULL))) {
+		if (r == -EBUSY) {
+			DMDEBUG("Delaying flush due to recovery");
+			set_current_state(TASK_INTERRUPTIBLE);
+			schedule_timeout(HZ/2);
+			continue;
+		}
+
+		if (r == -EIO)
+			break;
+	}
+
+fail:
+	if (r) {
+		DMERR("Log flush failure: %d%s", r,
+		      (r == -EIO) ? " -EIO" : "");
+		dm_table_event(lc->ti->table);
+		lc->log_dev_failed = 1;
+	}
+
+	return r;
 }
 
 static void cluster_mark_region(struct dirty_log *log, region_t region)
 {
-	int error = 0;
 	struct region_state *rs, *tmp_rs, *rs_new;
 	struct log_c *lc = (struct log_c *) log->context;
 
-	rs_new = mempool_alloc(region_state_pool, GFP_KERNEL);
+	spin_lock(&lc->state_lock);
 
-	memset(rs_new, 0, sizeof(struct region_state));
 
-	spin_lock(&region_state_lock);
-	list_for_each_entry_safe(rs, tmp_rs, &clear_region_list, rs_list){
-		if(lc == rs->rs_lc && region == rs->rs_region){
-			/*
-			DMDEBUG("Mark pre-empting clear (%Lu/%s)",
-				region, lc->uuid + (strlen(lc->uuid) - 8));
-			*/
+	/*
+	 * It is possible for the following in the mirror code:
+	 *  0) Mark is already logged for a region
+	 *  1) rh_dec, sets region state to RH_CLEAN (asynchronous)
+	 *  2) rh_update_states (DOESN'T FLUSH!!!, bug #235040)
+	 *  3) do_writes, trys to mark region
+	 *
+	 * The following shouldn't have to be handled b/c of the flush
+	 *  0) Region finishes recovery
+	 *  1) rh_update_states clears region (DOES FLUSH)
+	 *  2) do_writes, trys to mark region
+	 *
+	 * This can lead to this next case being valid.
+	 */
+	list_for_each_entry_safe(rs, tmp_rs, &lc->clear_waiting, rs_list) {
+		if (region == rs->rs_region) {
+			if (!rs->rs_mark_logged) {
+				DMERR("Moving region(%Lu/%s) from clear_waiting -> mark_waiting",
+				      region, lc->uuid + (strlen(lc->uuid) - 8));
+			}
 			list_del_init(&rs->rs_list);
-			list_add(&rs->rs_list, &marked_region_list);
-			clear_region_count--;
-			spin_unlock(&region_state_lock);
-			if (rs_new)
-				mempool_free(rs_new, region_state_pool);
-
-			return;
+			list_add(&rs->rs_list,
+				 (rs->rs_mark_logged) ?
+				 &lc->mark_logged : &lc->mark_waiting);
+			goto out;
 		}
 	}
+
 	/*
-	 * In the mirroring code, it is possible for a write
-	 * to complete and call rh_dec - putting the region on
-	 * the clear_region list.  However, before the actual
-	 * clear request is issued to the log (rh_update_states)
-	 * another mark happens.  So, we check for and remove
-	 * duplicates.
+	 * It is possible for the following in the mirror code:
+	 *  0) Mark is already logged for a region
+	 *  1) rh_update_states
+	 *  2) rh_dec, sets region state to RH_CLEAN (asynchronous)
+	 *  3) do_writes, trys to mark region
+	 *
+	 * This can lead to this next case being valid.
 	 */
-	list_for_each_entry(rs, &marked_region_list, rs_list){
-		if(lc == rs->rs_lc && region == rs->rs_region){
-#ifdef DEBUG
-			DMINFO("Double mark on region ("
-			       SECTOR_FORMAT ")", region);
-#endif
-			spin_unlock(&region_state_lock);
-			if (rs_new)
-				mempool_free(rs_new, region_state_pool);
-
-			return;
+	list_for_each_entry_safe(rs, tmp_rs, &lc->mark_logged, rs_list){
+		if (region == rs->rs_region) {
+			goto out;
 		}
 	}
 
-	if(!rs_new){
-		DMERR("Unable to allocate region_state for mark.");
-		BUG();
+	list_for_each_entry_safe(rs, tmp_rs, &lc->mark_waiting, rs_list){
+		if (region == rs->rs_region) {
+			DMERR("Mark already waiting (%Lu/%s)",
+			      region, lc->uuid + (strlen(lc->uuid) - 8));
+			BUG();
+		}
 	}
+	spin_unlock(&lc->state_lock);
 
-	rs_new->rs_lc = lc;
+	rs_new = mempool_alloc(region_state_pool, GFP_NOFS);
+	BUG_ON(!rs_new);
+	memset(rs_new, 0, sizeof(struct region_state));
+
+	spin_lock(&lc->state_lock);
+	rs_new->rs_mark_logged = 1;
 	rs_new->rs_region = region;
 	INIT_LIST_HEAD(&rs_new->rs_list);
-	list_add(&rs_new->rs_list, &marked_region_list);
-
-	spin_unlock(&region_state_lock);
-
-	if (!lc->log_dev_failed) {
-		while((error = consult_server(lc, region, LRT_MARK_REGION, NULL))){
-			if (error == -EBUSY) {
-				/* Remote recovering delay and try again */
-				DMDEBUG("Delaying mark to region %Lu, due to recovery",
-					region);
-				set_current_state(TASK_INTERRUPTIBLE);
-				schedule_timeout(HZ/2);
-				continue;
-			}
-
-			if (error == -EIO) {
-				lc->log_dev_failed = 1;
-				break;
-			}
-			DMWARN("unable to get server (%u) to mark region (%Lu)",
-			       lc->server_id, region);
-			DMWARN("Reason :: %d", error);
-		}
+	list_add(&rs_new->rs_list, &lc->mark_waiting);
+out:
+	spin_unlock(&lc->state_lock);
 
-		if (lc->log_dev_failed) {
-			dm_table_event(lc->ti->table);
-			/*
-			  DMERR("Write failed on mirror log device, %s",
-			  lc->log_dev->name);
-			  if (!atomic_read(&lc->suspended))
-			  wait_for_completion(&lc->failure_completion);
-			*/
-		}
-	}
 	return;
 }
 
@@ -1008,53 +1032,48 @@
 	struct log_c *lc = (struct log_c *) log->context;
 	struct region_state *rs, *tmp_rs, *rs_new;
 
-	rs_new = mempool_alloc(region_state_pool, GFP_ATOMIC);
+	spin_lock(&lc->state_lock);
 
-	memset(rs_new, 0, sizeof(struct region_state));
+	/* Should find match in this list, or no lists at all */
+	list_for_each_entry_safe(rs, tmp_rs, &lc->mark_logged, rs_list){
+		if(region == rs->rs_region){
+			list_del_init(&rs->rs_list);
+			list_add(&rs->rs_list, &lc->clear_waiting);
+			goto out;
+		}
+	}
 
-	spin_lock(&region_state_lock);
 
-	list_for_each_entry_safe(rs, tmp_rs, &clear_region_list, rs_list){
-		if(lc == rs->rs_lc && region == rs->rs_region){
-			DMINFO("%d) Double clear on region ("
-			      SECTOR_FORMAT ")", __LINE__, region);
-			spin_unlock(&region_state_lock);
-			if (rs_new)
-				mempool_free(rs_new, region_state_pool);
-			return;
+	list_for_each_entry_safe(rs, tmp_rs, &lc->mark_waiting, rs_list){
+		if(region == rs->rs_region){
+			DMERR("Clear pre-empting mark (%Lu/%s)",
+			       region, lc->uuid + (strlen(lc->uuid) - 8));
+			BUG();
 		}
 	}
 
-	list_for_each_entry_safe(rs, tmp_rs, &marked_region_list, rs_list){
-		if(lc == rs->rs_lc && region == rs->rs_region){
-			list_del_init(&rs->rs_list);
-			list_add(&rs->rs_list, &clear_region_list);
-			clear_region_count++;
-			spin_unlock(&region_state_lock);
-			if (rs_new)
-				mempool_free(rs_new, region_state_pool);
-			return;
+	list_for_each_entry_safe(rs, tmp_rs, &lc->clear_waiting, rs_list){
+		if(region == rs->rs_region){
+			DMERR("%d) Double clear on region ("
+			      SECTOR_FORMAT ")", __LINE__, region);
+			BUG();
 		}
 	}
-
-	/* We can get here because we my be doing resync_work, and therefore, **
+	/* We can get here because we may be doing resync_work, and therefore,**
 	** clearing without ever marking..................................... */
 
-	if(!rs_new){
-		DMERR("Unable to allocate region_state for clear.");
-		BUG();
-	}
+	/* Don't need to spin_unlock, because allocation is non-blocking */
+	rs_new = mempool_alloc(region_state_pool, GFP_ATOMIC);
+	BUG_ON(!rs_new);
+	memset(rs_new, 0, sizeof(struct region_state));
 
-	rs_new->rs_lc = lc;
 	rs_new->rs_region = region;
 	INIT_LIST_HEAD(&rs_new->rs_list);
-	list_add(&rs_new->rs_list, &clear_region_list);
-	clear_region_count++;
-	if(!(clear_region_count & 0x7F)){
-		DMINFO("clear_region_count :: %d", clear_region_count);
-	}
+	list_add(&rs_new->rs_list, &lc->clear_waiting);
+
+out:
+	spin_unlock(&lc->state_lock);
 
-	spin_unlock(&region_state_lock);
 	return;
 }
 
@@ -1122,27 +1141,6 @@
 
 	switch(status){
 	case STATUSTYPE_INFO:
-/*
-		spin_lock(&region_state_lock);
-		i = clear_region_count;
-		list_for_each_entry(rs, &marked_region_list, rs_list){
-			j++;
-		}
-		spin_unlock(&region_state_lock);
-
-		DMINFO("CLIENT OUTPUT::");
-		DMINFO("  My ID            : %u", my_id);
-		DMINFO("  Server ID        : %u", lc->server_id);
-
-		DMINFO("  In-sync          : %s", (atomic_read(&lc->in_sync)>0)?
-		       "YES" : "NO");
-		DMINFO("  Regions marked   : %d", j);
-		DMINFO("  Regions clearing : %d", i);
-
-		if(lc->server_id == my_id){
-			print_server_status(lc);
-		}
-*/
 		if(lc->sync != DEFAULTSYNC)
 			arg_count++;
 
@@ -1195,11 +1193,11 @@
 
 	atomic_set(&suspend_client, 1);
 
-	spin_lock(&log_list_lock);
+	down(&log_list_lock);
 	list_for_each_entry(lc, &log_list_head, log_list) {
 		atomic_set(&lc->in_sync, 0);
 	}
-	spin_unlock(&log_list_lock);
+	up(&log_list_lock);
 
 	/*
 	if (likely(!shutting_down))
@@ -1221,7 +1219,12 @@
 	global_nodeids = nodeids;
 	global_count = count;
 
-	kcl_get_node_by_nodeid(0, &node);
+	for (i = 0; kcl_get_node_by_nodeid(0, &node); i++) {
+		if (i > 10)
+			BUG();
+		else
+			DMERR("Bad call to kcl_get_node_by_nodeid");
+	}
 	my_id = node.node_id;
 
 	/* Wait for any outstanding starts to complete */
@@ -1233,7 +1236,7 @@
 	switch(type){
 	case SERVICE_NODE_LEAVE:
 	case SERVICE_NODE_FAILED:
-		spin_lock(&log_list_lock);
+		down(&log_list_lock);
 		list_for_each_entry(lc, &log_list_head, log_list){
 			for(i=0, server = 0xDEAD; i < count; i++){
 				if(lc->server_id == nodeids[i]){
@@ -1243,7 +1246,7 @@
 			/* ATTENTION -- need locking around this ? */
 			lc->server_id = server;
 		}
-		spin_unlock(&log_list_lock);
+		up(&log_list_lock);
 
 		break;
 	case SERVICE_NODE_JOIN:
@@ -1279,10 +1282,8 @@
 
 	down(&cmirror_register_lock);
 
-	if (mirror_set_count++) {
-		up(&cmirror_register_lock);
+	if (mirror_set_count++)
 		goto out;
-	}
 
 	r = kcl_register_service("clustered_log", 13, SERVICE_LEVEL_GDLM, &clog_ops,
 				 1, NULL, &local_id);
@@ -1383,12 +1384,7 @@
         DMINFO("dm-cmirror %s (built %s %s) installed",
                CMIRROR_RELEASE_NAME, __DATE__, __TIME__);
 
-	INIT_LIST_HEAD(&clear_region_list);
-	INIT_LIST_HEAD(&marked_region_list);
-
-	spin_lock_init(&region_state_lock);
-	spin_lock_init(&log_list_lock);
-	region_state_pool = mempool_create(20, region_state_alloc,
+	region_state_pool = mempool_create(500, region_state_alloc,
 					   region_state_free, NULL);
 	if(!region_state_pool){
 		DMWARN("couldn't create region state pool");
@@ -1424,6 +1420,8 @@
 	}
 	dm_unregister_dirty_log_type(&_clustered_core_type);
 	dm_unregister_dirty_log_type(&_clustered_disk_type);
+        DMINFO("dm-cmirror %s (built %s %s) removed",
+               CMIRROR_RELEASE_NAME, __DATE__, __TIME__);
 }
 
 module_init(cluster_dirty_log_init);
--- cluster/cmirror-kernel/src/Attic/dm-cmirror-common.h	2007/02/21 17:14:44	1.1.2.12
+++ cluster/cmirror-kernel/src/Attic/dm-cmirror-common.h	2007/04/03 18:21:10	1.1.2.13
@@ -97,7 +97,7 @@
 	unsigned bitset_uint32_count;
 	uint32_t *clean_bits;
 	uint32_t *sync_bits;
-	uint32_t *recovering_bits;	/* FIXME: this seems excessive */
+	uint64_t recovering_region;
 
 	int sync_pass;          /* number of passes attempting to resync */
 	int sync_search;
@@ -134,7 +134,12 @@
 	atomic_t in_sync;  /* like sync_count, except all or nothing */
 
 	struct list_head log_list;
-	struct list_head region_users;
+	struct list_head region_users;  /* Used by Server */
+
+	spinlock_t state_lock;
+	struct list_head clear_waiting;
+	struct list_head mark_waiting;
+	struct list_head mark_logged;
 
 	uint32_t server_id;
 	struct socket *client_sock;
--- cluster/cmirror-kernel/src/Attic/dm-cmirror-server.c	2007/03/22 22:21:59	1.1.2.27
+++ cluster/cmirror-kernel/src/Attic/dm-cmirror-server.c	2007/04/03 18:21:10	1.1.2.28
@@ -47,7 +47,7 @@
 static atomic_t _do_requests;
 
 static int debug_disk_write = 0;
-extern spinlock_t log_list_lock;
+extern struct semaphore log_list_lock;
 extern struct list_head log_list_head;
 
 static void *region_user_alloc(int gfp_mask, void *pool_data){
@@ -225,6 +225,11 @@
 
 static int _core_get_resync_work(struct log_c *lc, region_t *region)
 {
+	if (lc->recovering_region != (uint64_t)-1) {
+		DMDEBUG("Someone is already recovering (%Lu)", lc->recovering_region);
+		return 0;
+	}
+
 	if (lc->sync_search >= lc->region_count) {
 		/*
 		 * FIXME: pvmove is not supported yet, but when it is,
@@ -237,18 +242,16 @@
 			return 0;
 		}
 	}
-	do {
-		*region = ext2_find_next_zero_bit((unsigned long *) lc->sync_bits,
-						  lc->region_count,
-						  lc->sync_search);
-		lc->sync_search = *region + 1;
-
-		if (*region >= lc->region_count)
-			return 0;
+	*region = ext2_find_next_zero_bit((unsigned long *) lc->sync_bits,
+					  lc->region_count,
+					  lc->sync_search);
+	lc->sync_search = *region + 1;
 
-	} while (log_test_bit(lc->recovering_bits, *region));
+	if (*region >= lc->region_count)
+		return 0;
 
-	log_set_bit(lc, lc->recovering_bits, *region);
+	lc->recovering_region = *region;
+	DMDEBUG("Assigning recovery work: %Lu", *region);
 	return 1;
 }
 
@@ -371,7 +374,7 @@
 			bad_count++;
 			log_clear_bit(lc, lc->sync_bits, ru->ru_region);
 			if (ru->ru_rw == RU_RECOVER) {
-				log_clear_bit(lc, lc->recovering_bits, ru->ru_region);
+				lc->recovering_region = (uint64_t)-1;
 			}
 			list_del(&ru->ru_list);
 			mempool_free(ru, region_user_pool);
@@ -506,10 +509,9 @@
 
 static int server_mark_region(struct log_c *lc, struct log_request *lr, uint32_t who)
 {
-	int r = 0;
 	struct region_user *ru, *new;
 
-	new = mempool_alloc(region_user_pool, GFP_KERNEL);
+	new = mempool_alloc(region_user_pool, GFP_NOFS);
 	if(!new){
 		return -ENOMEM;
 	}
@@ -519,21 +521,13 @@
     
 	if (!(ru = find_ru_by_region(lc, lr->u.lr_region))) {
 		log_clear_bit(lc, lc->clean_bits, lr->u.lr_region);
-		r = write_bits(lc);
-
 		list_add(&new->ru_list, &lc->region_users);
-		if (!r) {
-			lc->touched = 0;
-			lc->log_dev_failed = 0;
-		} else {
-			lc->log_dev_failed = 1;
-		}
 	} else if (ru->ru_rw == RU_RECOVER) {
-		DMINFO("Attempt to mark a region " SECTOR_FORMAT 
+		DMDEBUG("Attempt to mark a region " SECTOR_FORMAT
 		      "/%s which is being recovered.",
 		       lr->u.lr_region, lc->uuid + (strlen(lc->uuid) - 8));
-		DMINFO("Current recoverer: %u", ru->ru_nodeid);
-		DMINFO("Mark requester   : %u", who);
+		DMDEBUG("Current recoverer: %u", ru->ru_nodeid);
+		DMDEBUG("Mark requester   : %u", who);
 
 		mempool_free(new, region_user_pool);
 		return -EBUSY;
@@ -547,7 +541,7 @@
 		mempool_free(new, region_user_pool);
 	}
 
-	return (lc->log_dev_failed) ? -EIO : 0;
+	return 0;
 }
 
 
@@ -567,28 +561,34 @@
 
 	if(!find_ru_by_region(lc, lr->u.lr_region)){
 		log_set_bit(lc, lc->clean_bits, lr->u.lr_region);
-		write_bits(lc);
-		/*
-		if (write_bits(lc))
-			DMERR("Write bits failed on mirror log device, %s",
-			      lc->log_dev->name);
-		*/
 	}
 	return 0;
 }
 
 
+static int server_flush(struct log_c *lc)
+{
+	int r = 0;
+
+	r = write_bits(lc);
+	if (!r) {
+		lc->touched = 0;
+		lc->log_dev_failed = 0;
+	} else {
+		lc->log_dev_failed = 1;
+	}
+
+	return (lc->log_dev_failed) ? -EIO : 0;
+}
+
+
 static int server_get_resync_work(struct log_c *lc, struct log_request *lr, uint32_t who)
 {
 	struct region_user *new;
 
-/* We now have the ability to use remote_recovering
-	if (my_id != who)
-		return 0;
-*/
-
-	new = mempool_alloc(region_user_pool, GFP_KERNEL);
+	new = mempool_alloc(region_user_pool, GFP_NOFS);
 	if(!new){
+		lr->u.lr_int_rtn = 0;
 		return -ENOMEM;
 	}
 	
@@ -610,9 +610,15 @@
 		return -EINVAL;
 	}
 
-	log_clear_bit(lc, lc->recovering_bits, lr->u.lr_region);
-
 	if (success) {
+		if (lr->u.lr_region != lc->recovering_region) {
+			DMERR("Told to clear recovery on wrong region %Lu/%Lu",
+			      lr->u.lr_region, lc->recovering_region);
+			return -EINVAL;
+		}
+
+		lc->recovering_region = (uint64_t)-1;
+
 		/* We could receive multiple identical request due to network failure */
 		if(!log_test_bit(lc->sync_bits, lr->u.lr_region)) {
 			log_set_bit(lc, lc->sync_bits, lr->u.lr_region);
@@ -650,7 +656,7 @@
 static struct log_c *get_log_context(char *uuid, int uuid_ref){
 	struct log_c *lc, *r = NULL;
 
-	spin_lock(&log_list_lock);
+	down(&log_list_lock);
 	list_for_each_entry(lc, &log_list_head, log_list){
 		if (!strncmp(lc->uuid, uuid, MAX_NAME_LEN) &&
 		    (uuid_ref == lc->uuid_ref)) {
@@ -660,7 +666,7 @@
 				r = lc;
 		}
 	}
-	spin_unlock(&log_list_lock);
+	up(&log_list_lock);
 
 	return r;
 }
@@ -838,6 +844,7 @@
  * Returns: 0 on success, -1 on error
  */
 static int process_log_request(struct socket *sock){
+	static struct log_request *lr = NULL;
 	int error;
 	uint32_t nodeid;
 	struct msghdr msg;
@@ -845,9 +852,13 @@
 	struct sockaddr_in saddr_in;
 	mm_segment_t fs;
 	struct log_c *lc;
-	struct log_request lr; /* ATTENTION -- could be too much on the stack */
 
-	memset(&lr, 0, sizeof(struct log_request));
+	if (unlikely(!lr))
+		lr = kmalloc(sizeof(*lr), GFP_KERNEL);
+	if (!lr)
+		return -1;
+
+	memset(lr, 0, sizeof(struct log_request));
 	memset(&saddr_in, 0, sizeof(saddr_in));
 		
 	msg.msg_control = NULL;
@@ -858,7 +869,7 @@
 	msg.msg_name = &saddr_in;
 	msg.msg_namelen = sizeof(saddr_in);
 	iov.iov_len = sizeof(struct log_request);
-	iov.iov_base = &lr;
+	iov.iov_base = lr;
 		
 	fs = get_fs();
 	set_fs(get_ds());
@@ -871,14 +882,14 @@
 		if(error < sizeof(struct log_request)){
 			DMERR("Cluster mirror log server received incomplete message.");
 		}
-		lc = get_log_context(lr.lr_uuid, lr.lr_uuid_ref);
+		lc = get_log_context(lr->lr_uuid, lr->lr_uuid_ref);
 
-		if(lr.lr_type == LRT_ELECTION ||
-		   lr.lr_type == LRT_SELECTION ||
-		   lr.lr_type == LRT_MASTER_ASSIGN ||
-		   lr.lr_type == LRT_MASTER_LEAVING){
+		if(lr->lr_type == LRT_ELECTION ||
+		   lr->lr_type == LRT_SELECTION ||
+		   lr->lr_type == LRT_MASTER_ASSIGN ||
+		   lr->lr_type == LRT_MASTER_LEAVING){
 			uint32_t old = (lc)?lc->server_id: 0xDEAD;
-			if(process_election(&lr, lc, &saddr_in)){
+			if(process_election(lr, lc, &saddr_in)){
 				DMERR("Election processing failed.");
 				return -1;
 			}
@@ -896,12 +907,12 @@
 		}
 
 		if(!lc){
-			lr.u.lr_int_rtn = -ENXIO;
+			lr->u.lr_int_rtn = -ENXIO;
 			goto reply;
 		}
 
 		if (lc->server_id != my_id) {
-			lr.u.lr_int_rtn = -ENXIO;
+			lr->u.lr_int_rtn = -ENXIO;
 			goto reply;
 		}
 
@@ -911,23 +922,23 @@
 			DMDEBUG("Getting request while server (%u) is suspended:", my_id);
 			DMDEBUG(" - Requester :: %u", nodeid);
 			DMDEBUG(" - log uuid  :: %s", lc->uuid + (strlen(lc->uuid) - 8));
-			DMDEBUG(" - req type  :: %s", RQ_STRING(lr.lr_type));
+			DMDEBUG(" - req type  :: %s", RQ_STRING(lr->lr_type));
 			*/
 			if (my_id != nodeid) {
-				lr.u.lr_int_rtn = -ENXIO;
+				lr->u.lr_int_rtn = -ENXIO;
 				goto reply;
 			}
 		}			
 
-		switch(lr.lr_type){
+		switch(lr->lr_type){
 		case LRT_IS_CLEAN:
-			error = server_is_clean(lc, &lr);
+			error = server_is_clean(lc, lr);
 			break;
 		case LRT_IS_REMOTE_RECOVERING:
-			error = server_is_remote_recovering(lc, &lr);
+			error = server_is_remote_recovering(lc, lr);
 			break;
 		case LRT_IN_SYNC:
-			error = server_in_sync(lc, &lr);
+			error = server_in_sync(lc, lr);
 			break;
 		case LRT_MARK_REGION:
 			if(!(nodeid = 
@@ -935,8 +946,8 @@
 				error = -ENXIO;
 				break;
 			}
-			error = server_mark_region(lc, &lr, nodeid);
-			lr.u.lr_int_rtn = 0;
+			error = server_mark_region(lc, lr, nodeid);
+			lr->u.lr_int_rtn = 0;
 			break;
 		case LRT_CLEAR_REGION:
 			if(!(nodeid = 
@@ -944,7 +955,10 @@
 				error = -ENXIO;
 				break;
 			}
-			error = server_clear_region(lc, &lr, nodeid);
+			error = server_clear_region(lc, lr, nodeid);
+			break;
+		case LRT_FLUSH:
+			error = server_flush(lc);
 			break;
 		case LRT_GET_RESYNC_WORK:
 			if(!(nodeid = 
@@ -952,14 +966,14 @@
 				error = -ENXIO;
 				break;
 			}
-			error = server_get_resync_work(lc, &lr, nodeid);
+			error = server_get_resync_work(lc, lr, nodeid);
 			break;
 		case LRT_COMPLETE_RESYNC_WORK:
-			error = server_complete_resync_work(lc, &lr, lr.u.lr_int_rtn);
-			lr.u.lr_int_rtn = 0;
+			error = server_complete_resync_work(lc, lr, lr->u.lr_int_rtn);
+			lr->u.lr_int_rtn = 0;
 			break;
 		case LRT_GET_SYNC_COUNT:
-			error = server_get_sync_count(lc, &lr);
+			error = server_get_sync_count(lc, lr);
 			break;
 		default:
 			DMWARN("unknown request type received");
@@ -971,15 +985,15 @@
 		if(error){
 /*
 			DMWARN("Error (%d) while processing request (%s)",
-			       error, RQ_STRING(lr.lr_type));
+			       error, RQ_STRING(lr->lr_type));
 */
-			lr.u.lr_int_rtn = error;
+			lr->u.lr_int_rtn = error;
 		}
 	reply:
     
 		/* Why do we need to reset this? */
 		iov.iov_len = sizeof(struct log_request);
-		iov.iov_base = &lr;
+		iov.iov_base = lr;
 		msg.msg_name = &saddr_in;
 		msg.msg_namelen = sizeof(saddr_in);
 
@@ -991,7 +1005,7 @@
 		set_fs(fs);
 		if(error < 0){
 			DMWARN("unable to sendmsg to client (type = %s, error = %d)",
-			       RQ_STRING(lr.lr_type), error);
+			       RQ_STRING(lr->lr_type), error);
 			return error;
 		}
 	} else if(error == -EAGAIN || error == -ETIMEDOUT){
@@ -1052,7 +1066,7 @@
 			if (atomic_read(&restart_event_type) == SERVICE_NODE_FAILED)
 				DMINFO("A cluster mirror log member has failed.");
 			
-			spin_lock(&log_list_lock);
+			down(&log_list_lock);
 			list_for_each_entry(lc, &log_list_head, log_list){
 				if(lc->server_id == my_id){
 					if (atomic_read(&lc->suspended)) {
@@ -1062,7 +1076,7 @@
 					}
 				}
 			}
-			spin_unlock(&log_list_lock);
+			up(&log_list_lock);
 
 			break;
 		default:
@@ -1150,7 +1164,7 @@
 int start_server(void /* log_devices ? */){
 	int error;
 
-	region_user_pool = mempool_create(100, region_user_alloc,
+	region_user_pool = mempool_create(1000, region_user_alloc,
 					  region_user_free, NULL);
 	if(!region_user_pool){
 		DMWARN("unable to allocate region user pool for server");
--- cluster/cmirror-kernel/src/Attic/dm-cmirror-xfr.h	2007/03/22 22:21:59	1.1.2.3
+++ cluster/cmirror-kernel/src/Attic/dm-cmirror-xfr.h	2007/04/03 18:21:10	1.1.2.4
@@ -14,14 +14,15 @@
 #define LRT_IN_SYNC             	3
 #define LRT_MARK_REGION         	4
 #define LRT_CLEAR_REGION        	5
-#define LRT_GET_RESYNC_WORK     	6
-#define LRT_COMPLETE_RESYNC_WORK        7
-#define LRT_GET_SYNC_COUNT      	8
-
-#define LRT_ELECTION			9
-#define LRT_SELECTION			10
-#define LRT_MASTER_ASSIGN		11
-#define LRT_MASTER_LEAVING		12
+#define LRT_FLUSH                       6
+#define LRT_GET_RESYNC_WORK     	7
+#define LRT_COMPLETE_RESYNC_WORK        8
+#define LRT_GET_SYNC_COUNT      	9
+
+#define LRT_ELECTION			10
+#define LRT_SELECTION			11
+#define LRT_MASTER_ASSIGN		12
+#define LRT_MASTER_LEAVING		13
 
 #define CLUSTER_LOG_PORT 51005
 
@@ -29,6 +30,7 @@
 	((x) == LRT_IS_CLEAN) ? "LRT_IS_CLEAN": \
 	((x) == LRT_IN_SYNC) ? "LRT_IN_SYNC": \
 	((x) == LRT_MARK_REGION) ? "LRT_MARK_REGION": \
+	((x) == LRT_FLUSH) ? "LRT_FLUSH": \
 	((x) == LRT_GET_RESYNC_WORK) ? "LRT_GET_RESYNC_WORK": \
 	((x) == LRT_GET_SYNC_COUNT) ? "LRT_GET_SYNC_COUNT": \
 	((x) == LRT_CLEAR_REGION) ? "LRT_CLEAR_REGION": \



             reply	other threads:[~2007-04-03 18:21 UTC|newest]

Thread overview: 40+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2007-04-03 18:21 jbrassow [this message]
  -- strict thread matches above, loose matches on Subject: below --
2007-10-03 19:02 [Cluster-devel] cluster/cmirror-kernel/src dm-cmirror-client.c jbrassow
2007-09-27 20:31 jbrassow
2007-09-26  3:15 jbrassow
2007-09-21 20:07 jbrassow
2007-09-13 15:24 jbrassow
2007-07-11 16:18 jbrassow
2007-04-26 16:55 jbrassow
2007-04-26 16:54 jbrassow
2007-04-24 20:10 jbrassow
2007-04-24 20:08 jbrassow
2007-04-10  7:13 jbrassow
2007-04-10  7:12 jbrassow
2007-04-05 21:33 jbrassow
2007-04-05 21:32 jbrassow
2007-04-03 18:23 jbrassow
2007-03-22 22:34 jbrassow
2007-03-22 22:22 jbrassow
2007-03-14  4:28 jbrassow
2007-02-26 17:38 jbrassow
2007-02-20 19:35 jbrassow
2007-02-19 16:29 jbrassow
2007-02-14 17:44 jbrassow
2007-02-02 17:22 jbrassow
2007-01-08 19:28 jbrassow
2006-12-07 18:58 jbrassow
2006-09-05 17:50 jbrassow
2006-09-05 17:48 jbrassow
2006-07-27 23:11 jbrassow
2006-07-27 23:11 jbrassow
2006-07-22 22:19 jbrassow
2006-07-22 22:19 jbrassow
2006-07-22 22:12 jbrassow
2006-06-29 19:49 jbrassow
2006-06-29 19:48 jbrassow
2006-06-29 19:46 jbrassow
2006-06-27 20:19 jbrassow
2006-06-15 19:48 jbrassow
2006-06-15 19:34 jbrassow
2006-06-13 16:26 jbrassow

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20070403182112.4454.qmail@sourceware.org \
    --to=jbrassow@sourceware.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.
  Be sure your reply has a Subject: header at the top and a blank line
  before the message body.
This is an external index of several public inboxes;
see the mirroring instructions for how to clone and mirror
all data and code used by this external index.