All of lore.kernel.org
 help / color / mirror / Atom feed
* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
@ 2010-11-12  1:01 Goldwyn Rodrigues
  2010-11-12  2:16 ` Wengang Wang
  0 siblings, 1 reply; 13+ messages in thread
From: Goldwyn Rodrigues @ 2010-11-12  1:01 UTC (permalink / raw)
  To: ocfs2-devel

ocfs2_recover_node is a new data structure to include all information required
to recover one node. The structure is maintained as a list in the super block.

ocfs2_recover_node replaces the recovery_map

Signed-off-by: Goldwyn Rodrigues <rgoldwyn@suse.de>
---
 fs/ocfs2/journal.c |  170 +++++++++++++++++++++------------------------------
 fs/ocfs2/journal.h |    9 ++-
 fs/ocfs2/ocfs2.h   |    3 +-
 fs/ocfs2/super.c   |   16 -----
 4 files changed, 78 insertions(+), 120 deletions(-)

diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index faa2303..6c378d6 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
 #define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000

 static int ocfs2_force_read_journal(struct inode *inode);
-static int ocfs2_recover_node(struct ocfs2_super *osb,
-			      int node_num, int slot_num);
+static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn);
 static int __ocfs2_recovery_thread(void *arg);
 static int ocfs2_commit_cache(struct ocfs2_super *osb);
 static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
@@ -179,24 +178,11 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)

 int ocfs2_recovery_init(struct ocfs2_super *osb)
 {
-	struct ocfs2_recovery_map *rm;
-
 	mutex_init(&osb->recovery_lock);
 	osb->disable_recovery = 0;
 	osb->recovery_thread_task = NULL;
 	init_waitqueue_head(&osb->recovery_event);
-
-	rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
-		     osb->max_slots * sizeof(unsigned int),
-		     GFP_KERNEL);
-	if (!rm) {
-		mlog_errno(-ENOMEM);
-		return -ENOMEM;
-	}
-
-	rm->rm_entries = (unsigned int *)((char *)rm +
-					  sizeof(struct ocfs2_recovery_map));
-	osb->recovery_map = rm;
+	INIT_LIST_HEAD(&osb->s_recovery_nodes);

 	return 0;
 }
@@ -212,8 +198,6 @@ static int ocfs2_recovery_thread_running(struct
ocfs2_super *osb)

 void ocfs2_recovery_exit(struct ocfs2_super *osb)
 {
-	struct ocfs2_recovery_map *rm;
-
 	/* disable any new recovery threads and wait for any currently
 	 * running ones to exit. Do this before setting the vol_state. */
 	mutex_lock(&osb->recovery_lock);
@@ -226,29 +210,20 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
 	 * complete. */
 	flush_workqueue(ocfs2_wq);

-	/*
-	 * Now that recovery is shut down, and the osb is about to be
-	 * freed,  the osb_lock is not taken here.
-	 */
-	rm = osb->recovery_map;
-	/* XXX: Should we bug if there are dirty entries? */
-
-	kfree(rm);
 }

 static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
 				     unsigned int node_num)
 {
-	int i;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
+	struct ocfs2_recover_node *rn;
+	struct list_head *iter;

 	assert_spin_locked(&osb->osb_lock);
-
-	for (i = 0; i < rm->rm_used; i++) {
-		if (rm->rm_entries[i] == node_num)
+	list_for_each(iter, &osb->s_recovery_nodes) {
+		rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
+		if (rn->rn_node_num == node_num)
 			return 1;
 	}
-
 	return 0;
 }

@@ -256,45 +231,36 @@ static int __ocfs2_recovery_map_test(struct
ocfs2_super *osb,
 static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
 				  unsigned int node_num)
 {
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
-
-	spin_lock(&osb->osb_lock);
-	if (__ocfs2_recovery_map_test(osb, node_num)) {
-		spin_unlock(&osb->osb_lock);
-		return 1;
-	}
+	struct list_head *iter;
+	struct ocfs2_recover_node *rn = NULL, *trn;

-	/* XXX: Can this be exploited? Not from o2dlm... */
-	BUG_ON(rm->rm_used >= osb->max_slots);
+	trn = kzalloc(sizeof(struct ocfs2_recover_node), GFP_KERNEL);

-	rm->rm_entries[rm->rm_used] = node_num;
-	rm->rm_used++;
+	spin_lock(&osb->osb_lock);
+	list_for_each(iter, &osb->s_recovery_nodes) {
+			rn = list_entry(iter, struct ocfs2_recover_node,
+					rn_list);
+			if (rn->rn_node_num == node_num) {
+				spin_unlock(&osb->osb_lock);
+				kfree(trn);
+				return 1;
+			}
+	}
+	trn->rn_node_num = node_num;
+	trn->rn_osb = osb;
+	list_add_tail(&trn->rn_list, &osb->s_recovery_nodes);
 	spin_unlock(&osb->osb_lock);

 	return 0;
 }

 static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
-				     unsigned int node_num)
+			struct ocfs2_recover_node *rn)
 {
-	int i;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
-
 	spin_lock(&osb->osb_lock);
-
-	for (i = 0; i < rm->rm_used; i++) {
-		if (rm->rm_entries[i] == node_num)
-			break;
-	}
-
-	if (i < rm->rm_used) {
-		/* XXX: be careful with the pointer math */
-		memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
-			(rm->rm_used - i - 1) * sizeof(unsigned int));
-		rm->rm_used--;
-	}
-
+	list_del(&rn->rn_list);
 	spin_unlock(&osb->osb_lock);
+	kfree(rn);
 }

 static int ocfs2_commit_cache(struct ocfs2_super *osb)
@@ -1092,10 +1058,9 @@ bail:
 static int ocfs2_recovery_completed(struct ocfs2_super *osb)
 {
 	int empty;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;

 	spin_lock(&osb->osb_lock);
-	empty = (rm->rm_used == 0);
+	empty = list_empty(&osb->s_recovery_nodes);
 	spin_unlock(&osb->osb_lock);

 	return empty;
@@ -1332,12 +1297,13 @@ void ocfs2_complete_quota_recovery(struct
ocfs2_super *osb)

 static int __ocfs2_recovery_thread(void *arg)
 {
-	int status, node_num, slot_num;
+	int status;
 	struct ocfs2_super *osb = arg;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
 	int *rm_quota = NULL;
 	int rm_quota_used = 0, i;
 	struct ocfs2_quota_recovery *qrec;
+	struct list_head *iter;
+	struct ocfs2_recover_node *rn = NULL;

 	mlog_entry_void();

@@ -1367,20 +1333,21 @@ restart:
 					NULL, NULL);

 	spin_lock(&osb->osb_lock);
-	while (rm->rm_used) {
+	list_for_each(iter, &osb->s_recovery_nodes) {
+		rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
 		/* It's always safe to remove entry zero, as we won't
 		 * clear it until ocfs2_recover_node() has succeeded. */
-		node_num = rm->rm_entries[0];
 		spin_unlock(&osb->osb_lock);
-		mlog(0, "checking node %d\n", node_num);
-		slot_num = ocfs2_node_num_to_slot(osb, node_num);
-		if (slot_num == -ENOENT) {
+		mlog(0, "checking node %d\n", rn->rn_node_num);
+		rn->rn_slot_num = ocfs2_node_num_to_slot(osb, rn->rn_node_num);
+		if (rn->rn_slot_num == -ENOENT) {
 			status = 0;
 			mlog(0, "no slot for this node, so no recovery"
 			     "required.\n");
 			goto skip_recovery;
 		}
-		mlog(0, "node %d was using slot %d\n", node_num, slot_num);
+		mlog(0, "node %d was using slot %d\n",
+				rn->rn_node_num, rn->rn_slot_num);

 		/* It is a bit subtle with quota recovery. We cannot do it
 		 * immediately because we have to obtain cluster locks from
@@ -1388,18 +1355,19 @@ restart:
 		 * then quota usage would be out of sync until some node takes
 		 * the slot. So we remember which nodes need quota recovery
 		 * and when everything else is done, we recover quotas. */
-		for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
+		for (i = 0; i < rm_quota_used && rm_quota[i] != rn->rn_slot_num;
+					i++);
 		if (i == rm_quota_used)
-			rm_quota[rm_quota_used++] = slot_num;
+			rm_quota[rm_quota_used++] = rn->rn_slot_num;

-		status = ocfs2_recover_node(osb, node_num, slot_num);
+		status = ocfs2_recover_one_node(rn);
 skip_recovery:
 		if (!status) {
-			ocfs2_recovery_map_clear(osb, node_num);
+			ocfs2_recovery_map_clear(osb, rn);
 		} else {
 			mlog(ML_ERROR,
 			     "Error %d recovering node %d on device (%u,%u)!\n",
-			     status, node_num,
+			     status, rn->rn_node_num,
 			     MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
 			mlog(ML_ERROR, "Volume requires unmount.\n");
 		}
@@ -1681,62 +1649,64 @@ done:
  * second part of a nodes recovery process (local alloc recovery) is
  * far less concerning.
  */
-static int ocfs2_recover_node(struct ocfs2_super *osb,
-			      int node_num, int slot_num)
+static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn)
 {
-	int status = 0;
 	struct ocfs2_dinode *la_copy = NULL;
 	struct ocfs2_dinode *tl_copy = NULL;
+	struct ocfs2_super *osb = rn->rn_osb;

 	mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
-		   node_num, slot_num, osb->node_num);
+		   rn->rn_node_num, rn->rn_slot_num, osb->node_num);

 	/* Should not ever be called to recover ourselves -- in that
 	 * case we should've called ocfs2_journal_load instead. */
-	BUG_ON(osb->node_num == node_num);
+	BUG_ON(osb->node_num == rn->rn_node_num);

-	status = ocfs2_replay_journal(osb, node_num, slot_num);
-	if (status < 0) {
-		if (status == -EBUSY) {
+	rn->rn_status = ocfs2_replay_journal(osb, rn->rn_node_num,
+			rn->rn_slot_num);
+	if (rn->rn_status < 0) {
+		if (rn->rn_status == -EBUSY) {
 			mlog(0, "Skipping recovery for slot %u (node %u) "
-			     "as another node has recovered it\n", slot_num,
-			     node_num);
-			status = 0;
+			     "as another node has recovered it\n",
+			     rn->rn_slot_num, rn->rn_node_num);
+			rn->rn_status = 0;
 			goto done;
 		}
-		mlog_errno(status);
+		mlog_errno(rn->rn_status);
 		goto done;
 	}

 	/* Stamp a clean local alloc file AFTER recovering the journal... */
-	status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy);
-	if (status < 0) {
-		mlog_errno(status);
+	rn->rn_status = ocfs2_begin_local_alloc_recovery(osb, rn->rn_slot_num,
+			&la_copy);
+	if (rn->rn_status < 0) {
+		mlog_errno(rn->rn_status);
 		goto done;
 	}

 	/* An error from begin_truncate_log_recovery is not
 	 * serious enough to warrant halting the rest of
 	 * recovery. */
-	status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy);
-	if (status < 0)
-		mlog_errno(status);
+	rn->rn_status = ocfs2_begin_truncate_log_recovery(osb,
+			rn->rn_slot_num, &tl_copy);
+	if (rn->rn_status < 0)
+		mlog_errno(rn->rn_status);

 	/* Likewise, this would be a strange but ultimately not so
 	 * harmful place to get an error... */
-	status = ocfs2_clear_slot(osb, slot_num);
-	if (status < 0)
-		mlog_errno(status);
+	rn->rn_status = ocfs2_clear_slot(osb, rn->rn_slot_num);
+	if (rn->rn_status < 0)
+		mlog_errno(rn->rn_status);

 	/* This will kfree the memory pointed to by la_copy and tl_copy */
-	ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
-					tl_copy, NULL);
+	ocfs2_queue_recovery_completion(osb->journal, rn->rn_slot_num,
+				la_copy, tl_copy, NULL);

-	status = 0;
+	rn->rn_status = 0;
 done:

-	mlog_exit(status);
-	return status;
+	mlog_exit(rn->rn_status);
+	return rn->rn_status;
 }

 /* Test node liveness by trylocking his journal. If we get the lock,
diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
index 43e56b9..0325d81 100644
--- a/fs/ocfs2/journal.h
+++ b/fs/ocfs2/journal.h
@@ -43,9 +43,12 @@ struct ocfs2_dinode;
  * It is protected by the recovery_lock.
  */

-struct ocfs2_recovery_map {
-	unsigned int rm_used;
-	unsigned int *rm_entries;
+struct ocfs2_recover_node {
+	struct ocfs2_super *rn_osb;
+	int rn_node_num;
+	int rn_slot_num;
+	int rn_status;
+	struct list_head rn_list;
 };


diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index d840821..318caac 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -337,7 +337,8 @@ struct ocfs2_super

 	atomic_t vol_state;
 	struct mutex recovery_lock;
-	struct ocfs2_recovery_map *recovery_map;
+	struct list_head s_recovery_nodes;
+
 	struct ocfs2_replay_map *replay_map;
 	struct task_struct *recovery_thread_task;
 	int disable_recovery;
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index f02c0ef..478715b 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -220,7 +220,6 @@ static const match_table_t tokens = {
 static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len)
 {
 	struct ocfs2_cluster_connection *cconn = osb->cconn;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
 	struct ocfs2_orphan_scan *os = &osb->osb_orphan_scan;
 	int i, out = 0;

@@ -267,21 +266,6 @@ static int ocfs2_osb_dump(struct ocfs2_super
*osb, char *buf, int len)
 			osb->dc_work_sequence);
 	spin_unlock(&osb->dc_task_lock);

-	spin_lock(&osb->osb_lock);
-	out += snprintf(buf + out, len - out, "%10s => Pid: %d  Nodes:",
-			"Recovery",
-			(osb->recovery_thread_task ?
-			 task_pid_nr(osb->recovery_thread_task) : -1));
-	if (rm->rm_used == 0)
-		out += snprintf(buf + out, len - out, " None\n");
-	else {
-		for (i = 0; i < rm->rm_used; i++)
-			out += snprintf(buf + out, len - out, " %d",
-					rm->rm_entries[i]);
-		out += snprintf(buf + out, len - out, "\n");
-	}
-	spin_unlock(&osb->osb_lock);
-
 	out += snprintf(buf + out, len - out,
 			"%10s => Pid: %d  Interval: %lu  Needs: %d\n", "Commit",
 			(osb->commit_task ? task_pid_nr(osb->commit_task) : -1),
-- 
1.7.1


-- 
Goldwyn

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
  2010-11-12  1:01 [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node Goldwyn Rodrigues
@ 2010-11-12  2:16 ` Wengang Wang
  2010-11-12 19:22   ` Goldwyn Rodrigues
  0 siblings, 1 reply; 13+ messages in thread
From: Wengang Wang @ 2010-11-12  2:16 UTC (permalink / raw)
  To: ocfs2-devel

On 10-11-11 19:01, Goldwyn Rodrigues wrote:
> ocfs2_recover_node is a new data structure to include all information required
> to recover one node. The structure is maintained as a list in the super block.
> 
> ocfs2_recover_node replaces the recovery_map
> 
> Signed-off-by: Goldwyn Rodrigues <rgoldwyn@suse.de>
> ---
>  fs/ocfs2/journal.c |  170 +++++++++++++++++++++------------------------------
>  fs/ocfs2/journal.h |    9 ++-
>  fs/ocfs2/ocfs2.h   |    3 +-
>  fs/ocfs2/super.c   |   16 -----
>  4 files changed, 78 insertions(+), 120 deletions(-)
> 
> diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
> index faa2303..6c378d6 100644
> --- a/fs/ocfs2/journal.c
> +++ b/fs/ocfs2/journal.c
> @@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
>  #define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
> 
>  static int ocfs2_force_read_journal(struct inode *inode);
> -static int ocfs2_recover_node(struct ocfs2_super *osb,
> -			      int node_num, int slot_num);
> +static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn);
>  static int __ocfs2_recovery_thread(void *arg);
>  static int ocfs2_commit_cache(struct ocfs2_super *osb);
>  static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
> @@ -179,24 +178,11 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)
> 
>  int ocfs2_recovery_init(struct ocfs2_super *osb)
>  {
> -	struct ocfs2_recovery_map *rm;
> -
>  	mutex_init(&osb->recovery_lock);
>  	osb->disable_recovery = 0;
>  	osb->recovery_thread_task = NULL;
>  	init_waitqueue_head(&osb->recovery_event);
> -
> -	rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
> -		     osb->max_slots * sizeof(unsigned int),
> -		     GFP_KERNEL);
> -	if (!rm) {
> -		mlog_errno(-ENOMEM);
> -		return -ENOMEM;
> -	}
> -
> -	rm->rm_entries = (unsigned int *)((char *)rm +
> -					  sizeof(struct ocfs2_recovery_map));
> -	osb->recovery_map = rm;
> +	INIT_LIST_HEAD(&osb->s_recovery_nodes);
> 
>  	return 0;
>  }
> @@ -212,8 +198,6 @@ static int ocfs2_recovery_thread_running(struct
> ocfs2_super *osb)
> 
>  void ocfs2_recovery_exit(struct ocfs2_super *osb)
>  {
> -	struct ocfs2_recovery_map *rm;
> -
>  	/* disable any new recovery threads and wait for any currently
>  	 * running ones to exit. Do this before setting the vol_state. */
>  	mutex_lock(&osb->recovery_lock);
> @@ -226,29 +210,20 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
>  	 * complete. */
>  	flush_workqueue(ocfs2_wq);
> 
> -	/*
> -	 * Now that recovery is shut down, and the osb is about to be
> -	 * freed,  the osb_lock is not taken here.
> -	 */
> -	rm = osb->recovery_map;
> -	/* XXX: Should we bug if there are dirty entries? */
> -
> -	kfree(rm);
>  }

Doesn't this need to free any ocfs2_recover_node entries still on the osb->s_recovery_nodes list?

> 
>  static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
>  				     unsigned int node_num)

For the function names, would you also change every *map* to *node* to match the new structure?

>  {
> -	int i;
> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
> +	struct ocfs2_recover_node *rn;
> +	struct list_head *iter;
> 
>  	assert_spin_locked(&osb->osb_lock);
> -
> -	for (i = 0; i < rm->rm_used; i++) {
> -		if (rm->rm_entries[i] == node_num)
> +	list_for_each(iter, &osb->s_recovery_nodes) {
> +		rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
> +		if (rn->rn_node_num == node_num)
>  			return 1;
>  	}
> -
>  	return 0;
>  }
> 
> @@ -256,45 +231,36 @@ static int __ocfs2_recovery_map_test(struct
> ocfs2_super *osb,
>  static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
>  				  unsigned int node_num)
>  {
> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
> -
> -	spin_lock(&osb->osb_lock);
> -	if (__ocfs2_recovery_map_test(osb, node_num)) {
> -		spin_unlock(&osb->osb_lock);
> -		return 1;
> -	}
> +	struct list_head *iter;
> +	struct ocfs2_recover_node *rn = NULL, *trn;
> 
> -	/* XXX: Can this be exploited? Not from o2dlm... */
> -	BUG_ON(rm->rm_used >= osb->max_slots);
> +	trn = kzalloc(sizeof(struct ocfs2_recover_node), GFP_KERNEL);
> 

Would GFP_NOFS be better than GFP_KERNEL in this case?
Also, the kzalloc() result needs a NULL check before trn is dereferenced below.

> -	rm->rm_entries[rm->rm_used] = node_num;
> -	rm->rm_used++;
> +	spin_lock(&osb->osb_lock);
> +	list_for_each(iter, &osb->s_recovery_nodes) {
> +			rn = list_entry(iter, struct ocfs2_recover_node,
> +					rn_list);
> +			if (rn->rn_node_num == node_num) {
> +				spin_unlock(&osb->osb_lock);
> +				kfree(trn);
> +				return 1;
> +			}
> +	}
> +	trn->rn_node_num = node_num;
> +	trn->rn_osb = osb;
> +	list_add_tail(&trn->rn_list, &osb->s_recovery_nodes);
>  	spin_unlock(&osb->osb_lock);
> 
>  	return 0;
>  }
> 
>  static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
> -				     unsigned int node_num)
> +			struct ocfs2_recover_node *rn)
>  {
> -	int i;
> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
> -
>  	spin_lock(&osb->osb_lock);
> -
> -	for (i = 0; i < rm->rm_used; i++) {
> -		if (rm->rm_entries[i] == node_num)
> -			break;
> -	}
> -
> -	if (i < rm->rm_used) {
> -		/* XXX: be careful with the pointer math */
> -		memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
> -			(rm->rm_used - i - 1) * sizeof(unsigned int));
> -		rm->rm_used--;
> -	}
> -
> +	list_del(&rn->rn_list);
>  	spin_unlock(&osb->osb_lock);
> +	kfree(rn);
>  }
> 
>  static int ocfs2_commit_cache(struct ocfs2_super *osb)
> @@ -1092,10 +1058,9 @@ bail:
>  static int ocfs2_recovery_completed(struct ocfs2_super *osb)
>  {
>  	int empty;
> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
> 
>  	spin_lock(&osb->osb_lock);
> -	empty = (rm->rm_used == 0);
> +	empty = list_empty(&osb->s_recovery_nodes);
>  	spin_unlock(&osb->osb_lock);
> 
>  	return empty;
> @@ -1332,12 +1297,13 @@ void ocfs2_complete_quota_recovery(struct
> ocfs2_super *osb)
> 
>  static int __ocfs2_recovery_thread(void *arg)
>  {
> -	int status, node_num, slot_num;
> +	int status;
>  	struct ocfs2_super *osb = arg;
> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
>  	int *rm_quota = NULL;
>  	int rm_quota_used = 0, i;
>  	struct ocfs2_quota_recovery *qrec;
> +	struct list_head *iter;
> +	struct ocfs2_recover_node *rn = NULL;
> 
>  	mlog_entry_void();
> 
> @@ -1367,20 +1333,21 @@ restart:
>  					NULL, NULL);
> 
>  	spin_lock(&osb->osb_lock);
> -	while (rm->rm_used) {
> +	list_for_each(iter, &osb->s_recovery_nodes) {
> +		rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
>  		/* It's always safe to remove entry zero, as we won't
>  		 * clear it until ocfs2_recover_node() has succeeded. */
> -		node_num = rm->rm_entries[0];
>  		spin_unlock(&osb->osb_lock);
> -		mlog(0, "checking node %d\n", node_num);
> -		slot_num = ocfs2_node_num_to_slot(osb, node_num);
> -		if (slot_num == -ENOENT) {
> +		mlog(0, "checking node %d\n", rn->rn_node_num);
> +		rn->rn_slot_num = ocfs2_node_num_to_slot(osb, rn->rn_node_num);
> +		if (rn->rn_slot_num == -ENOENT) {
>  			status = 0;
>  			mlog(0, "no slot for this node, so no recovery"
>  			     "required.\n");
>  			goto skip_recovery;
>  		}
> -		mlog(0, "node %d was using slot %d\n", node_num, slot_num);
> +		mlog(0, "node %d was using slot %d\n",
> +				rn->rn_node_num, rn->rn_slot_num);
> 
>  		/* It is a bit subtle with quota recovery. We cannot do it
>  		 * immediately because we have to obtain cluster locks from
> @@ -1388,18 +1355,19 @@ restart:
>  		 * then quota usage would be out of sync until some node takes
>  		 * the slot. So we remember which nodes need quota recovery
>  		 * and when everything else is done, we recover quotas. */
> -		for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
> +		for (i = 0; i < rm_quota_used && rm_quota[i] != rn->rn_slot_num;
> +					i++);
>  		if (i == rm_quota_used)
> -			rm_quota[rm_quota_used++] = slot_num;
> +			rm_quota[rm_quota_used++] = rn->rn_slot_num;
> 
> -		status = ocfs2_recover_node(osb, node_num, slot_num);
> +		status = ocfs2_recover_one_node(rn);
>  skip_recovery:
>  		if (!status) {
> -			ocfs2_recovery_map_clear(osb, node_num);
> +			ocfs2_recovery_map_clear(osb, rn);

kfree() is called with the spinlock held here.

>  		} else {
>  			mlog(ML_ERROR,
>  			     "Error %d recovering node %d on device (%u,%u)!\n",
> -			     status, node_num,
> +			     status, rn->rn_node_num,
>  			     MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
>  			mlog(ML_ERROR, "Volume requires unmount.\n");
>  		}
> @@ -1681,62 +1649,64 @@ done:
>   * second part of a nodes recovery process (local alloc recovery) is
>   * far less concerning.
>   */
> -static int ocfs2_recover_node(struct ocfs2_super *osb,
> -			      int node_num, int slot_num)
> +static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn)
>  {
> -	int status = 0;
>  	struct ocfs2_dinode *la_copy = NULL;
>  	struct ocfs2_dinode *tl_copy = NULL;
> +	struct ocfs2_super *osb = rn->rn_osb;
> 
>  	mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
> -		   node_num, slot_num, osb->node_num);
> +		   rn->rn_node_num, rn->rn_slot_num, osb->node_num);
> 
>  	/* Should not ever be called to recover ourselves -- in that
>  	 * case we should've called ocfs2_journal_load instead. */
> -	BUG_ON(osb->node_num == node_num);
> +	BUG_ON(osb->node_num == rn->rn_node_num);
> 
> -	status = ocfs2_replay_journal(osb, node_num, slot_num);
> -	if (status < 0) {
> -		if (status == -EBUSY) {
> +	rn->rn_status = ocfs2_replay_journal(osb, rn->rn_node_num,
> +			rn->rn_slot_num);
> +	if (rn->rn_status < 0) {
> +		if (rn->rn_status == -EBUSY) {
>  			mlog(0, "Skipping recovery for slot %u (node %u) "
> -			     "as another node has recovered it\n", slot_num,
> -			     node_num);
> -			status = 0;
> +			     "as another node has recovered it\n",
> +			     rn->rn_slot_num, rn->rn_node_num);
> +			rn->rn_status = 0;
>  			goto done;
>  		}
> -		mlog_errno(status);
> +		mlog_errno(rn->rn_status);
>  		goto done;
>  	}
> 
>  	/* Stamp a clean local alloc file AFTER recovering the journal... */
> -	status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy);
> -	if (status < 0) {
> -		mlog_errno(status);
> +	rn->rn_status = ocfs2_begin_local_alloc_recovery(osb, rn->rn_slot_num,
> +			&la_copy);
> +	if (rn->rn_status < 0) {
> +		mlog_errno(rn->rn_status);
>  		goto done;
>  	}
> 
>  	/* An error from begin_truncate_log_recovery is not
>  	 * serious enough to warrant halting the rest of
>  	 * recovery. */
> -	status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy);
> -	if (status < 0)
> -		mlog_errno(status);
> +	rn->rn_status = ocfs2_begin_truncate_log_recovery(osb,
> +			rn->rn_slot_num, &tl_copy);
> +	if (rn->rn_status < 0)
> +		mlog_errno(rn->rn_status);
> 
>  	/* Likewise, this would be a strange but ultimately not so
>  	 * harmful place to get an error... */
> -	status = ocfs2_clear_slot(osb, slot_num);
> -	if (status < 0)
> -		mlog_errno(status);
> +	rn->rn_status = ocfs2_clear_slot(osb, rn->rn_slot_num);
> +	if (rn->rn_status < 0)
> +		mlog_errno(rn->rn_status);
> 
>  	/* This will kfree the memory pointed to by la_copy and tl_copy */
> -	ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
> -					tl_copy, NULL);
> +	ocfs2_queue_recovery_completion(osb->journal, rn->rn_slot_num,
> +				la_copy, tl_copy, NULL);
> 
> -	status = 0;
> +	rn->rn_status = 0;
>  done:
> 
> -	mlog_exit(status);
> -	return status;
> +	mlog_exit(rn->rn_status);
> +	return rn->rn_status;
>  }
> 
>  /* Test node liveness by trylocking his journal. If we get the lock,
> diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
> index 43e56b9..0325d81 100644
> --- a/fs/ocfs2/journal.h
> +++ b/fs/ocfs2/journal.h
> @@ -43,9 +43,12 @@ struct ocfs2_dinode;
>   * It is protected by the recovery_lock.
>   */
> 
> -struct ocfs2_recovery_map {
> -	unsigned int rm_used;
> -	unsigned int *rm_entries;
> +struct ocfs2_recover_node {
> +	struct ocfs2_super *rn_osb;
> +	int rn_node_num;
> +	int rn_slot_num;
> +	int rn_status;
> +	struct list_head rn_list;
>  };
> 
> 
> diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
> index d840821..318caac 100644
> --- a/fs/ocfs2/ocfs2.h
> +++ b/fs/ocfs2/ocfs2.h
> @@ -337,7 +337,8 @@ struct ocfs2_super
> 
>  	atomic_t vol_state;
>  	struct mutex recovery_lock;
> -	struct ocfs2_recovery_map *recovery_map;
> +	struct list_head s_recovery_nodes;
> +
>  	struct ocfs2_replay_map *replay_map;
>  	struct task_struct *recovery_thread_task;
>  	int disable_recovery;
> diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
> index f02c0ef..478715b 100644
> --- a/fs/ocfs2/super.c
> +++ b/fs/ocfs2/super.c
> @@ -220,7 +220,6 @@ static const match_table_t tokens = {
>  static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len)
>  {
>  	struct ocfs2_cluster_connection *cconn = osb->cconn;
> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
>  	struct ocfs2_orphan_scan *os = &osb->osb_orphan_scan;
>  	int i, out = 0;
> 
> @@ -267,21 +266,6 @@ static int ocfs2_osb_dump(struct ocfs2_super
> *osb, char *buf, int len)
>  			osb->dc_work_sequence);
>  	spin_unlock(&osb->dc_task_lock);
> 
> -	spin_lock(&osb->osb_lock);
> -	out += snprintf(buf + out, len - out, "%10s => Pid: %d  Nodes:",
> -			"Recovery",
> -			(osb->recovery_thread_task ?
> -			 task_pid_nr(osb->recovery_thread_task) : -1));
> -	if (rm->rm_used == 0)
> -		out += snprintf(buf + out, len - out, " None\n");
> -	else {
> -		for (i = 0; i < rm->rm_used; i++)
> -			out += snprintf(buf + out, len - out, " %d",
> -					rm->rm_entries[i]);
> -		out += snprintf(buf + out, len - out, "\n");
> -	}
> -	spin_unlock(&osb->osb_lock);
> -
>  	out += snprintf(buf + out, len - out,
>  			"%10s => Pid: %d  Interval: %lu  Needs: %d\n", "Commit",
>  			(osb->commit_task ? task_pid_nr(osb->commit_task) : -1),
> -- 
> 1.7.1
> 
> 
> -- 
> Goldwyn
> 
> _______________________________________________
> Ocfs2-devel mailing list
> Ocfs2-devel at oss.oracle.com
> http://oss.oracle.com/mailman/listinfo/ocfs2-devel

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
  2010-11-12  2:16 ` Wengang Wang
@ 2010-11-12 19:22   ` Goldwyn Rodrigues
  2010-11-13  5:03     ` Wengang Wang
  0 siblings, 1 reply; 13+ messages in thread
From: Goldwyn Rodrigues @ 2010-11-12 19:22 UTC (permalink / raw)
  To: ocfs2-devel

Hi Wengang,

Thanks for the review.

On Thu, Nov 11, 2010 at 8:16 PM, Wengang Wang <wen.gang.wang@oracle.com> wrote:
> On 10-11-11 19:01, Goldwyn Rodrigues wrote:
>> ocfs2_recover_node is a new data structure to include all information required
>> to recover one node. The structure is maintained as a list in the super block.
>>
>> ocfs2_recover_node replaces the recovery_map
>>
>> Signed-off-by: Goldwyn Rodrigues <rgoldwyn@suse.de>
>> ---
>>  fs/ocfs2/journal.c |  170 +++++++++++++++++++++------------------------------
>>  fs/ocfs2/journal.h |    9 ++-
>>  fs/ocfs2/ocfs2.h   |    3 +-
>>  fs/ocfs2/super.c   |   16 -----
>>  4 files changed, 78 insertions(+), 120 deletions(-)
>>
>> diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
>> index faa2303..6c378d6 100644
>> --- a/fs/ocfs2/journal.c
>> +++ b/fs/ocfs2/journal.c
>> @@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
>> ?#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
>>
>> ?static int ocfs2_force_read_journal(struct inode *inode);
>> -static int ocfs2_recover_node(struct ocfs2_super *osb,
>> - ? ? ? ? ? ? ? ? ? ? ? ? ? int node_num, int slot_num);
>> +static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn);
>> ?static int __ocfs2_recovery_thread(void *arg);
>> ?static int ocfs2_commit_cache(struct ocfs2_super *osb);
>> ?static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
>> @@ -179,24 +178,11 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)
>>
>> ?int ocfs2_recovery_init(struct ocfs2_super *osb)
>> ?{
>> - ? ? struct ocfs2_recovery_map *rm;
>> -
>> ? ? ? mutex_init(&osb->recovery_lock);
>> ? ? ? osb->disable_recovery = 0;
>> ? ? ? osb->recovery_thread_task = NULL;
>> ? ? ? init_waitqueue_head(&osb->recovery_event);
>> -
>> - ? ? rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
>> - ? ? ? ? ? ? ? ? ?osb->max_slots * sizeof(unsigned int),
>> - ? ? ? ? ? ? ? ? ?GFP_KERNEL);
>> - ? ? if (!rm) {
>> - ? ? ? ? ? ? mlog_errno(-ENOMEM);
>> - ? ? ? ? ? ? return -ENOMEM;
>> - ? ? }
>> -
>> - ? ? rm->rm_entries = (unsigned int *)((char *)rm +
>> - ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? sizeof(struct ocfs2_recovery_map));
>> - ? ? osb->recovery_map = rm;
>> + ? ? INIT_LIST_HEAD(&osb->s_recovery_nodes);
>>
>> ? ? ? return 0;
>> ?}
>> @@ -212,8 +198,6 @@ static int ocfs2_recovery_thread_running(struct
>> ocfs2_super *osb)
>>
>> ?void ocfs2_recovery_exit(struct ocfs2_super *osb)
>> ?{
>> - ? ? struct ocfs2_recovery_map *rm;
>> -
>> ? ? ? /* disable any new recovery threads and wait for any currently
>> ? ? ? ?* running ones to exit. Do this before setting the vol_state. */
>> ? ? ? mutex_lock(&osb->recovery_lock);
>> @@ -226,29 +210,20 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
>> ? ? ? ?* complete. */
>> ? ? ? flush_workqueue(ocfs2_wq);
>>
>> - ? ? /*
>> - ? ? ?* Now that recovery is shut down, and the osb is about to be
>> - ? ? ?* freed, ?the osb_lock is not taken here.
>> - ? ? ?*/
>> - ? ? rm = osb->recovery_map;
>> - ? ? /* XXX: Should we bug if there are dirty entries? */
>> -
>> - ? ? kfree(rm);
>> ?}
>
> needs to free ocfs2_recover_nodes in list osb->s_recovery_nodes?
>

Hmm. Good catch.


>>
>> ?static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
>> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?unsigned int node_num)
>
> for function names, would you also change all *map* to *node* accordingly?
>
>> ?{
>> - ? ? int i;
>> - ? ? struct ocfs2_recovery_map *rm = osb->recovery_map;
>> + ? ? struct ocfs2_recover_node *rn;
>> + ? ? struct list_head *iter;
>>
>> ? ? ? assert_spin_locked(&osb->osb_lock);
>> -
>> - ? ? for (i = 0; i < rm->rm_used; i++) {
>> - ? ? ? ? ? ? if (rm->rm_entries[i] == node_num)
>> + ? ? list_for_each(iter, &osb->s_recovery_nodes) {
>> + ? ? ? ? ? ? rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
>> + ? ? ? ? ? ? if (rn->rn_node_num == node_num)
>> ? ? ? ? ? ? ? ? ? ? ? return 1;
>> ? ? ? }
>> -
>> ? ? ? return 0;
>> ?}
>>
>> @@ -256,45 +231,36 @@ static int __ocfs2_recovery_map_test(struct
>> ocfs2_super *osb,
>> ?static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
>> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? unsigned int node_num)
>> ?{
>> - ? ? struct ocfs2_recovery_map *rm = osb->recovery_map;
>> -
>> - ? ? spin_lock(&osb->osb_lock);
>> - ? ? if (__ocfs2_recovery_map_test(osb, node_num)) {
>> - ? ? ? ? ? ? spin_unlock(&osb->osb_lock);
>> - ? ? ? ? ? ? return 1;
>> - ? ? }
>> + ? ? struct list_head *iter;
>> + ? ? struct ocfs2_recover_node *rn = NULL, *trn;
>>
>> - ? ? /* XXX: Can this be exploited? Not from o2dlm... */
>> - ? ? BUG_ON(rm->rm_used >= osb->max_slots);
>> + ? ? trn = kzalloc(sizeof(struct ocfs2_recover_node), GFP_KERNEL);
>>
>
> GFP_NOFS is better in this case?
> NULL check?
>

Yes, I will change that.

>> - ? ? rm->rm_entries[rm->rm_used] = node_num;
>> - ? ? rm->rm_used++;
>> + ? ? spin_lock(&osb->osb_lock);
>> + ? ? list_for_each(iter, &osb->s_recovery_nodes) {
>> + ? ? ? ? ? ? ? ? ? ? rn = list_entry(iter, struct ocfs2_recover_node,
>> + ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? rn_list);
>> + ? ? ? ? ? ? ? ? ? ? if (rn->rn_node_num == node_num) {
>> + ? ? ? ? ? ? ? ? ? ? ? ? ? ? spin_unlock(&osb->osb_lock);
>> + ? ? ? ? ? ? ? ? ? ? ? ? ? ? kfree(trn);
>> + ? ? ? ? ? ? ? ? ? ? ? ? ? ? return 1;
>> + ? ? ? ? ? ? ? ? ? ? }
>> + ? ? }
>> + ? ? trn->rn_node_num = node_num;
>> + ? ? trn->rn_osb = osb;
>> + ? ? list_add_tail(&trn->rn_list, &osb->s_recovery_nodes);
>> ? ? ? spin_unlock(&osb->osb_lock);
>>
>> ? ? ? return 0;
>> ?}
>>
>> ?static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
>> - ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?unsigned int node_num)
>> + ? ? ? ? ? ? ? ? ? ? struct ocfs2_recover_node *rn)
>> ?{
>> - ? ? int i;
>> - ? ? struct ocfs2_recovery_map *rm = osb->recovery_map;
>> -
>> ? ? ? spin_lock(&osb->osb_lock);
>> -
>> - ? ? for (i = 0; i < rm->rm_used; i++) {
>> - ? ? ? ? ? ? if (rm->rm_entries[i] == node_num)
>> - ? ? ? ? ? ? ? ? ? ? break;
>> - ? ? }
>> -
>> - ? ? if (i < rm->rm_used) {
>> - ? ? ? ? ? ? /* XXX: be careful with the pointer math */
>> - ? ? ? ? ? ? memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
>> - ? ? ? ? ? ? ? ? ? ? (rm->rm_used - i - 1) * sizeof(unsigned int));
>> - ? ? ? ? ? ? rm->rm_used--;
>> - ? ? }
>> -
>> + ? ? list_del(&rn->rn_list);
>> ? ? ? spin_unlock(&osb->osb_lock);
>> + ? ? kfree(rn);
>> ?}
>>
>> ?static int ocfs2_commit_cache(struct ocfs2_super *osb)
>> @@ -1092,10 +1058,9 @@ bail:
>> ?static int ocfs2_recovery_completed(struct ocfs2_super *osb)
>> ?{
>> ? ? ? int empty;
>> - ? ? struct ocfs2_recovery_map *rm = osb->recovery_map;
>>
>> ? ? ? spin_lock(&osb->osb_lock);
>> - ? ? empty = (rm->rm_used == 0);
>> + ? ? empty = list_empty(&osb->s_recovery_nodes);
>> ? ? ? spin_unlock(&osb->osb_lock);
>>
>> ? ? ? return empty;
>> @@ -1332,12 +1297,13 @@ void ocfs2_complete_quota_recovery(struct
>> ocfs2_super *osb)
>>
>> ?static int __ocfs2_recovery_thread(void *arg)
>> ?{
>> - ? ? int status, node_num, slot_num;
>> + ? ? int status;
>> ? ? ? struct ocfs2_super *osb = arg;
>> - ? ? struct ocfs2_recovery_map *rm = osb->recovery_map;
>> ? ? ? int *rm_quota = NULL;
>> ? ? ? int rm_quota_used = 0, i;
>> ? ? ? struct ocfs2_quota_recovery *qrec;
>> + ? ? struct list_head *iter;
>> + ? ? struct ocfs2_recover_node *rn = NULL;
>>
>> ? ? ? mlog_entry_void();
>>
>> @@ -1367,20 +1333,21 @@ restart:
>> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? NULL, NULL);
>>
>> ? ? ? spin_lock(&osb->osb_lock);
>> - ? ? while (rm->rm_used) {
>> + ? ? list_for_each(iter, &osb->s_recovery_nodes) {
>> + ? ? ? ? ? ? rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
>> ? ? ? ? ? ? ? /* It's always safe to remove entry zero, as we won't
>> ? ? ? ? ? ? ? ?* clear it until ocfs2_recover_node() has succeeded. */
>> - ? ? ? ? ? ? node_num = rm->rm_entries[0];
>> ? ? ? ? ? ? ? spin_unlock(&osb->osb_lock);
>> - ? ? ? ? ? ? mlog(0, "checking node %d\n", node_num);
>> - ? ? ? ? ? ? slot_num = ocfs2_node_num_to_slot(osb, node_num);
>> - ? ? ? ? ? ? if (slot_num == -ENOENT) {
>> + ? ? ? ? ? ? mlog(0, "checking node %d\n", rn->rn_node_num);
>> + ? ? ? ? ? ? rn->rn_slot_num = ocfs2_node_num_to_slot(osb, rn->rn_node_num);
>> + ? ? ? ? ? ? if (rn->rn_slot_num == -ENOENT) {
>> ? ? ? ? ? ? ? ? ? ? ? status = 0;
>> ? ? ? ? ? ? ? ? ? ? ? mlog(0, "no slot for this node, so no recovery"
>> ? ? ? ? ? ? ? ? ? ? ? ? ? ?"required.\n");
>> ? ? ? ? ? ? ? ? ? ? ? goto skip_recovery;
>> ? ? ? ? ? ? ? }
>> - ? ? ? ? ? ? mlog(0, "node %d was using slot %d\n", node_num, slot_num);
>> + ? ? ? ? ? ? mlog(0, "node %d was using slot %d\n",
>> + ? ? ? ? ? ? ? ? ? ? ? ? ? ? rn->rn_node_num, rn->rn_slot_num);
>>
>> ? ? ? ? ? ? ? /* It is a bit subtle with quota recovery. We cannot do it
>> ? ? ? ? ? ? ? ?* immediately because we have to obtain cluster locks from
>> @@ -1388,18 +1355,19 @@ restart:
>> ? ? ? ? ? ? ? ?* then quota usage would be out of sync until some node takes
>> ? ? ? ? ? ? ? ?* the slot. So we remember which nodes need quota recovery
>> ? ? ? ? ? ? ? ?* and when everything else is done, we recover quotas. */
>> - ? ? ? ? ? ? for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
>> + ? ? ? ? ? ? for (i = 0; i < rm_quota_used && rm_quota[i] != rn->rn_slot_num;
>> + ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? i++);
>> ? ? ? ? ? ? ? if (i == rm_quota_used)
>> - ? ? ? ? ? ? ? ? ? ? rm_quota[rm_quota_used++] = slot_num;
>> + ? ? ? ? ? ? ? ? ? ? rm_quota[rm_quota_used++] = rn->rn_slot_num;
>>
>> - ? ? ? ? ? ? status = ocfs2_recover_node(osb, node_num, slot_num);
>> + ? ? ? ? ? ? status = ocfs2_recover_one_node(rn);
>> ?skip_recovery:
>> ? ? ? ? ? ? ? if (!status) {
>> - ? ? ? ? ? ? ? ? ? ? ocfs2_recovery_map_clear(osb, node_num);
>> + ? ? ? ? ? ? ? ? ? ? ocfs2_recovery_map_clear(osb, rn);
>
> kfree with spinlocked here.
>

What is the problem with that?
In any case, this part of the code moves to thread.

Regards,

-- 
Goldwyn

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
  2010-11-12 19:22   ` Goldwyn Rodrigues
@ 2010-11-13  5:03     ` Wengang Wang
  0 siblings, 0 replies; 13+ messages in thread
From: Wengang Wang @ 2010-11-13  5:03 UTC (permalink / raw)
  To: ocfs2-devel

Hi,

On 10-11-12 13:22, Goldwyn Rodrigues wrote:
> Hi Wengang,
> 
> Thanks for the review.
> 
> On Thu, Nov 11, 2010 at 8:16 PM, Wengang Wang <wen.gang.wang@oracle.com> wrote:
> > On 10-11-11 19:01, Goldwyn Rodrigues wrote:
> >> ocfs2_recover_node is a new data structure to include all information required
> >> to recover one node. The structure is maintained as a list in the super block.
> >>
> >> ocfs2_recover_node replaces the recovery_map
> >>
> >> Signed-off-by: Goldwyn Rodrigues <rgoldwyn@suse.de>
> >> - ? ? ? ? ? ? ? ? ? ? rm_quota[rm_quota_used++] = slot_num;
> >> + ? ? ? ? ? ? ? ? ? ? rm_quota[rm_quota_used++] = rn->rn_slot_num;
> >>
> >> - ? ? ? ? ? ? status = ocfs2_recover_node(osb, node_num, slot_num);
> >> + ? ? ? ? ? ? status = ocfs2_recover_one_node(rn);
> >> ?skip_recovery:
> >> ? ? ? ? ? ? ? if (!status) {
> >> - ? ? ? ? ? ? ? ? ? ? ocfs2_recovery_map_clear(osb, node_num);
> >> + ? ? ? ? ? ? ? ? ? ? ocfs2_recovery_map_clear(osb, rn);
> >
> > kfree with spinlocked here.
> >
> 
> What is the problem with that?

hm... seems no block in kfree() in current code.
please ignore that comment!

regards,
wengang.

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
@ 2010-11-16 22:59 Goldwyn Rodrigues
  2010-11-17  1:49 ` Wengang Wang
  0 siblings, 1 reply; 13+ messages in thread
From: Goldwyn Rodrigues @ 2010-11-16 22:59 UTC (permalink / raw)
  To: ocfs2-devel

Changes: Wengang's review comments incorporated.

ocfs2_recover_node is a new data structure to include all information required
to recover one node. The structure is maintained as a list in the super block.

recovery_map is no longer required with ocfs2_recover_node.

Signed-off-by: Goldwyn Rodrigues <rgoldwyn@suse.de>
---
 fs/ocfs2/journal.c |  166 ++++++++++++++++++++++++----------------------------
 fs/ocfs2/journal.h |    9 ++-
 fs/ocfs2/ocfs2.h   |    3 +-
 fs/ocfs2/super.c   |   16 -----
 4 files changed, 85 insertions(+), 109 deletions(-)

diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index faa2303..71987d4 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
 #define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000

 static int ocfs2_force_read_journal(struct inode *inode);
-static int ocfs2_recover_node(struct ocfs2_super *osb,
-			      int node_num, int slot_num);
+static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn);
 static int __ocfs2_recovery_thread(void *arg);
 static int ocfs2_commit_cache(struct ocfs2_super *osb);
 static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
@@ -179,24 +178,11 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)

 int ocfs2_recovery_init(struct ocfs2_super *osb)
 {
-	struct ocfs2_recovery_map *rm;
-
 	mutex_init(&osb->recovery_lock);
 	osb->disable_recovery = 0;
 	osb->recovery_thread_task = NULL;
 	init_waitqueue_head(&osb->recovery_event);
-
-	rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
-		     osb->max_slots * sizeof(unsigned int),
-		     GFP_KERNEL);
-	if (!rm) {
-		mlog_errno(-ENOMEM);
-		return -ENOMEM;
-	}
-
-	rm->rm_entries = (unsigned int *)((char *)rm +
-					  sizeof(struct ocfs2_recovery_map));
-	osb->recovery_map = rm;
+	INIT_LIST_HEAD(&osb->s_recovery_nodes);

 	return 0;
 }
@@ -212,8 +198,8 @@ static int ocfs2_recovery_thread_running(struct
ocfs2_super *osb)

 void ocfs2_recovery_exit(struct ocfs2_super *osb)
 {
-	struct ocfs2_recovery_map *rm;
-
+	struct list_head *iter;
+	struct ocfs2_recover_node *rn;
 	/* disable any new recovery threads and wait for any currently
 	 * running ones to exit. Do this before setting the vol_state. */
 	mutex_lock(&osb->recovery_lock);
@@ -226,75 +212,66 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
 	 * complete. */
 	flush_workqueue(ocfs2_wq);

-	/*
-	 * Now that recovery is shut down, and the osb is about to be
-	 * freed,  the osb_lock is not taken here.
-	 */
-	rm = osb->recovery_map;
-	/* XXX: Should we bug if there are dirty entries? */
-
-	kfree(rm);
+	spin_lock(&osb->osb_lock);
+	list_for_each(iter, &osb->s_recovery_nodes) {
+			rn = list_entry(iter, struct ocfs2_recover_node,
+					rn_list);
+			list_del(&rn->rn_list);
+	}
+	spin_unlock(&osb->osb_lock);
 }

 static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
 				     unsigned int node_num)
 {
-	int i;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
+	struct ocfs2_recover_node *rn;
+	struct list_head *iter;

 	assert_spin_locked(&osb->osb_lock);
-
-	for (i = 0; i < rm->rm_used; i++) {
-		if (rm->rm_entries[i] == node_num)
+	list_for_each(iter, &osb->s_recovery_nodes) {
+		rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
+		if (rn->rn_node_num == node_num)
 			return 1;
 	}
-
 	return 0;
 }

 /* Behaves like test-and-set.  Returns the previous value */
-static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
+static int ocfs2_recovery_node_set(struct ocfs2_super *osb,
 				  unsigned int node_num)
 {
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
-
-	spin_lock(&osb->osb_lock);
-	if (__ocfs2_recovery_map_test(osb, node_num)) {
-		spin_unlock(&osb->osb_lock);
-		return 1;
-	}
+	struct list_head *iter;
+	struct ocfs2_recover_node *rn = NULL, *trn;

-	/* XXX: Can this be exploited? Not from o2dlm... */
-	BUG_ON(rm->rm_used >= osb->max_slots);
+	trn = kzalloc(sizeof(struct ocfs2_recover_node), GFP_NOFS);
+	if (!trn)
+		return -ENOMEM;

-	rm->rm_entries[rm->rm_used] = node_num;
-	rm->rm_used++;
+	spin_lock(&osb->osb_lock);
+	list_for_each(iter, &osb->s_recovery_nodes) {
+			rn = list_entry(iter, struct ocfs2_recover_node,
+					rn_list);
+			if (rn->rn_node_num == node_num) {
+				spin_unlock(&osb->osb_lock);
+				kfree(trn);
+				return 1;
+			}
+	}
+	trn->rn_node_num = node_num;
+	trn->rn_osb = osb;
+	list_add_tail(&trn->rn_list, &osb->s_recovery_nodes);
 	spin_unlock(&osb->osb_lock);

 	return 0;
 }

-static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
-				     unsigned int node_num)
+static void ocfs2_recovery_node_clear(struct ocfs2_super *osb,
+			struct ocfs2_recover_node *rn)
 {
-	int i;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
-
 	spin_lock(&osb->osb_lock);
-
-	for (i = 0; i < rm->rm_used; i++) {
-		if (rm->rm_entries[i] == node_num)
-			break;
-	}
-
-	if (i < rm->rm_used) {
-		/* XXX: be careful with the pointer math */
-		memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
-			(rm->rm_used - i - 1) * sizeof(unsigned int));
-		rm->rm_used--;
-	}
-
+	list_del(&rn->rn_list);
 	spin_unlock(&osb->osb_lock);
+	kfree(rn);
 }

 static int ocfs2_commit_cache(struct ocfs2_super *osb)
@@ -1092,10 +1069,9 @@ bail:
 static int ocfs2_recovery_completed(struct ocfs2_super *osb)
 {
 	int empty;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;

 	spin_lock(&osb->osb_lock);
-	empty = (rm->rm_used == 0);
+	empty = list_empty(&osb->s_recovery_nodes);
 	spin_unlock(&osb->osb_lock);

 	return empty;
@@ -1332,12 +1308,13 @@ void ocfs2_complete_quota_recovery(struct
ocfs2_super *osb)

 static int __ocfs2_recovery_thread(void *arg)
 {
-	int status, node_num, slot_num;
+	int status;
 	struct ocfs2_super *osb = arg;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
 	int *rm_quota = NULL;
 	int rm_quota_used = 0, i;
 	struct ocfs2_quota_recovery *qrec;
+	struct list_head *iter;
+	struct ocfs2_recover_node *rn = NULL;

 	mlog_entry_void();

@@ -1367,20 +1344,21 @@ restart:
 					NULL, NULL);

 	spin_lock(&osb->osb_lock);
-	while (rm->rm_used) {
+	list_for_each(iter, &osb->s_recovery_nodes) {
+		rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
 		/* It's always safe to remove entry zero, as we won't
 		 * clear it until ocfs2_recover_node() has succeeded. */
-		node_num = rm->rm_entries[0];
 		spin_unlock(&osb->osb_lock);
-		mlog(0, "checking node %d\n", node_num);
-		slot_num = ocfs2_node_num_to_slot(osb, node_num);
-		if (slot_num == -ENOENT) {
+		mlog(0, "checking node %d\n", rn->rn_node_num);
+		rn->rn_slot_num = ocfs2_node_num_to_slot(osb, rn->rn_node_num);
+		if (rn->rn_slot_num == -ENOENT) {
 			status = 0;
 			mlog(0, "no slot for this node, so no recovery"
 			     "required.\n");
 			goto skip_recovery;
 		}
-		mlog(0, "node %d was using slot %d\n", node_num, slot_num);
+		mlog(0, "node %d was using slot %d\n",
+				rn->rn_node_num, rn->rn_slot_num);

 		/* It is a bit subtle with quota recovery. We cannot do it
 		 * immediately because we have to obtain cluster locks from
@@ -1388,18 +1366,19 @@ restart:
 		 * then quota usage would be out of sync until some node takes
 		 * the slot. So we remember which nodes need quota recovery
 		 * and when everything else is done, we recover quotas. */
-		for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
+		for (i = 0; i < rm_quota_used && rm_quota[i] != rn->rn_slot_num;
+					i++);
 		if (i == rm_quota_used)
-			rm_quota[rm_quota_used++] = slot_num;
+			rm_quota[rm_quota_used++] = rn->rn_slot_num;

-		status = ocfs2_recover_node(osb, node_num, slot_num);
+		status = ocfs2_recover_one_node(rn);
 skip_recovery:
 		if (!status) {
-			ocfs2_recovery_map_clear(osb, node_num);
+			ocfs2_recovery_node_clear(osb, rn);
 		} else {
 			mlog(ML_ERROR,
 			     "Error %d recovering node %d on device (%u,%u)!\n",
-			     status, node_num,
+			     status, rn->rn_node_num,
 			     MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
 			mlog(ML_ERROR, "Volume requires unmount.\n");
 		}
@@ -1461,6 +1440,7 @@ bail:

 void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
 {
+	int ret = 0;
 	mlog_entry("(node_num=%d, osb->node_num = %d)\n",
 		   node_num, osb->node_num);

@@ -1470,7 +1450,11 @@ void ocfs2_recovery_thread(struct ocfs2_super
*osb, int node_num)

 	/* People waiting on recovery will wait on
 	 * the recovery map to empty. */
-	if (ocfs2_recovery_map_set(osb, node_num))
+	ret = ocfs2_recovery_node_set(osb, node_num);
+	if (ret == -ENOMEM) {
+		mlog_errno(ret);
+		goto out;
+	} else if (ret)
 		mlog(0, "node %d already in recovery map.\n", node_num);

 	mlog(0, "starting recovery thread...\n");
@@ -1681,26 +1665,27 @@ done:
  * second part of a nodes recovery process (local alloc recovery) is
  * far less concerning.
  */
-static int ocfs2_recover_node(struct ocfs2_super *osb,
-			      int node_num, int slot_num)
+static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn)
 {
 	int status = 0;
 	struct ocfs2_dinode *la_copy = NULL;
 	struct ocfs2_dinode *tl_copy = NULL;
+	struct ocfs2_super *osb = rn->rn_osb;

 	mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
-		   node_num, slot_num, osb->node_num);
+		   rn->rn_node_num, rn->rn_slot_num, osb->node_num);

 	/* Should not ever be called to recover ourselves -- in that
 	 * case we should've called ocfs2_journal_load instead. */
-	BUG_ON(osb->node_num == node_num);
+	BUG_ON(osb->node_num == rn->rn_node_num);

-	status = ocfs2_replay_journal(osb, node_num, slot_num);
+	status = ocfs2_replay_journal(osb, rn->rn_node_num,
+			rn->rn_slot_num);
 	if (status < 0) {
 		if (status == -EBUSY) {
 			mlog(0, "Skipping recovery for slot %u (node %u) "
-			     "as another node has recovered it\n", slot_num,
-			     node_num);
+			     "as another node has recovered it\n",
+			     rn->rn_slot_num, rn->rn_node_num);
 			status = 0;
 			goto done;
 		}
@@ -1709,7 +1694,8 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
 	}

 	/* Stamp a clean local alloc file AFTER recovering the journal... */
-	status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy);
+	status = ocfs2_begin_local_alloc_recovery(osb, rn->rn_slot_num,
+			&la_copy);
 	if (status < 0) {
 		mlog_errno(status);
 		goto done;
@@ -1718,22 +1704,24 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
 	/* An error from begin_truncate_log_recovery is not
 	 * serious enough to warrant halting the rest of
 	 * recovery. */
-	status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy);
+	status = ocfs2_begin_truncate_log_recovery(osb,
+			rn->rn_slot_num, &tl_copy);
 	if (status < 0)
 		mlog_errno(status);

 	/* Likewise, this would be a strange but ultimately not so
 	 * harmful place to get an error... */
-	status = ocfs2_clear_slot(osb, slot_num);
+	status = ocfs2_clear_slot(osb, rn->rn_slot_num);
 	if (status < 0)
 		mlog_errno(status);

 	/* This will kfree the memory pointed to by la_copy and tl_copy */
-	ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
-					tl_copy, NULL);
+	ocfs2_queue_recovery_completion(osb->journal, rn->rn_slot_num,
+				la_copy, tl_copy, NULL);

 	status = 0;
 done:
+	rn->rn_status = status;

 	mlog_exit(status);
 	return status;
diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
index 43e56b9..0325d81 100644
--- a/fs/ocfs2/journal.h
+++ b/fs/ocfs2/journal.h
@@ -43,9 +43,12 @@ struct ocfs2_dinode;
  * It is protected by the recovery_lock.
  */

-struct ocfs2_recovery_map {
-	unsigned int rm_used;
-	unsigned int *rm_entries;
+struct ocfs2_recover_node {
+	struct ocfs2_super *rn_osb;
+	int rn_node_num;
+	int rn_slot_num;
+	int rn_status;
+	struct list_head rn_list;
 };


diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index d840821..318caac 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -337,7 +337,8 @@ struct ocfs2_super

 	atomic_t vol_state;
 	struct mutex recovery_lock;
-	struct ocfs2_recovery_map *recovery_map;
+	struct list_head s_recovery_nodes;
+
 	struct ocfs2_replay_map *replay_map;
 	struct task_struct *recovery_thread_task;
 	int disable_recovery;
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index f02c0ef..478715b 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -220,7 +220,6 @@ static const match_table_t tokens = {
 static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len)
 {
 	struct ocfs2_cluster_connection *cconn = osb->cconn;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
 	struct ocfs2_orphan_scan *os = &osb->osb_orphan_scan;
 	int i, out = 0;

@@ -267,21 +266,6 @@ static int ocfs2_osb_dump(struct ocfs2_super
*osb, char *buf, int len)
 			osb->dc_work_sequence);
 	spin_unlock(&osb->dc_task_lock);

-	spin_lock(&osb->osb_lock);
-	out += snprintf(buf + out, len - out, "%10s => Pid: %d  Nodes:",
-			"Recovery",
-			(osb->recovery_thread_task ?
-			 task_pid_nr(osb->recovery_thread_task) : -1));
-	if (rm->rm_used == 0)
-		out += snprintf(buf + out, len - out, " None\n");
-	else {
-		for (i = 0; i < rm->rm_used; i++)
-			out += snprintf(buf + out, len - out, " %d",
-					rm->rm_entries[i]);
-		out += snprintf(buf + out, len - out, "\n");
-	}
-	spin_unlock(&osb->osb_lock);
-
 	out += snprintf(buf + out, len - out,
 			"%10s => Pid: %d  Interval: %lu  Needs: %d\n", "Commit",
 			(osb->commit_task ? task_pid_nr(osb->commit_task) : -1),
-- 
1.7.1


-- 
Goldwyn

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
  2010-11-16 22:59 Goldwyn Rodrigues
@ 2010-11-17  1:49 ` Wengang Wang
  2010-11-17  2:00   ` Wengang Wang
  0 siblings, 1 reply; 13+ messages in thread
From: Wengang Wang @ 2010-11-17  1:49 UTC (permalink / raw)
  To: ocfs2-devel

On 10-11-16 16:59, Goldwyn Rodrigues wrote:

<snip>
> @@ -212,8 +198,8 @@ static int ocfs2_recovery_thread_running(struct
> ocfs2_super *osb)
> 
>  void ocfs2_recovery_exit(struct ocfs2_super *osb)
>  {
> -	struct ocfs2_recovery_map *rm;
> -
> +	struct list_head *iter;
> +	struct ocfs2_recover_node *rn;
>  	/* disable any new recovery threads and wait for any currently
>  	 * running ones to exit. Do this before setting the vol_state. */
>  	mutex_lock(&osb->recovery_lock);
> @@ -226,75 +212,66 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
>  	 * complete. */
>  	flush_workqueue(ocfs2_wq);
> 
> -	/*
> -	 * Now that recovery is shut down, and the osb is about to be
> -	 * freed,  the osb_lock is not taken here.
> -	 */
> -	rm = osb->recovery_map;
> -	/* XXX: Should we bug if there are dirty entries? */
> -
> -	kfree(rm);
> +	spin_lock(&osb->osb_lock);
> +	list_for_each(iter, &osb->s_recovery_nodes) {

list_for_each_safe()?

regards,
wengang.
> +			rn = list_entry(iter, struct ocfs2_recover_node,
> +					rn_list);
> +			list_del(&rn->rn_list);
> +	}
> +	spin_unlock(&osb->osb_lock);
>  }
> 

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
  2010-11-17  1:49 ` Wengang Wang
@ 2010-11-17  2:00   ` Wengang Wang
  2010-11-17  2:04     ` Wengang Wang
  0 siblings, 1 reply; 13+ messages in thread
From: Wengang Wang @ 2010-11-17  2:00 UTC (permalink / raw)
  To: ocfs2-devel

On 10-11-17 09:49, Wengang Wang wrote:
> On 10-11-16 16:59, Goldwyn Rodrigues wrote:
> 
> <snip>
> > @@ -212,8 +198,8 @@ static int ocfs2_recovery_thread_running(struct
> > ocfs2_super *osb)
> > 
> >  void ocfs2_recovery_exit(struct ocfs2_super *osb)
> >  {
> > -	struct ocfs2_recovery_map *rm;
> > -
> > +	struct list_head *iter;
> > +	struct ocfs2_recover_node *rn;
> >  	/* disable any new recovery threads and wait for any currently
> >  	 * running ones to exit. Do this before setting the vol_state. */
> >  	mutex_lock(&osb->recovery_lock);
> > @@ -226,75 +212,66 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
> >  	 * complete. */
> >  	flush_workqueue(ocfs2_wq);
> > 
> > -	/*
> > -	 * Now that recovery is shut down, and the osb is about to be
> > -	 * freed,  the osb_lock is not taken here.
> > -	 */
> > -	rm = osb->recovery_map;
> > -	/* XXX: Should we bug if there are dirty entries? */
> > -
> > -	kfree(rm);
> > +	spin_lock(&osb->osb_lock);
> > +	list_for_each(iter, &osb->s_recovery_nodes) {
> 
> list_for_each_safe()?
> 
> regards,
> wengang.
> > +			rn = list_entry(iter, struct ocfs2_recover_node,
> > +					rn_list);
> > +			list_del(&rn->rn_list);

If you need to do this here, I guess you may also want to free rm.

regards,
wengang.
> > +	}
> > +	spin_unlock(&osb->osb_lock);
> >  }
> > 

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
  2010-11-17  2:00   ` Wengang Wang
@ 2010-11-17  2:04     ` Wengang Wang
  0 siblings, 0 replies; 13+ messages in thread
From: Wengang Wang @ 2010-11-17  2:04 UTC (permalink / raw)
  To: ocfs2-devel

On 10-11-17 10:00, Wengang Wang wrote:
> On 10-11-17 09:49, Wengang Wang wrote:
> > On 10-11-16 16:59, Goldwyn Rodrigues wrote:
> > 
> > <snip>
> > > @@ -212,8 +198,8 @@ static int ocfs2_recovery_thread_running(struct
> > > ocfs2_super *osb)
> > > 
> > >  void ocfs2_recovery_exit(struct ocfs2_super *osb)
> > >  {
> > > -	struct ocfs2_recovery_map *rm;
> > > -
> > > +	struct list_head *iter;
> > > +	struct ocfs2_recover_node *rn;
> > >  	/* disable any new recovery threads and wait for any currently
> > >  	 * running ones to exit. Do this before setting the vol_state. */
> > >  	mutex_lock(&osb->recovery_lock);
> > > @@ -226,75 +212,66 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
> > >  	 * complete. */
> > >  	flush_workqueue(ocfs2_wq);
> > > 
> > > -	/*
> > > -	 * Now that recovery is shut down, and the osb is about to be
> > > -	 * freed,  the osb_lock is not taken here.
> > > -	 */
> > > -	rm = osb->recovery_map;
> > > -	/* XXX: Should we bug if there are dirty entries? */
> > > -
> > > -	kfree(rm);
> > > +	spin_lock(&osb->osb_lock);
> > > +	list_for_each(iter, &osb->s_recovery_nodes) {
> > 
> > list_for_each_safe()?
> > 
> > regards,
> > wengang.
> > > +			rn = list_entry(iter, struct ocfs2_recover_node,
> > > +					rn_list);
> > > +			list_del(&rn->rn_list);
> 
> If you need to do this here, I guess you may also want to free rm.

Sorry, typo. s/rm/rn
> 
> regards,
> wengang.
> > > +	}
> > > +	spin_unlock(&osb->osb_lock);
> > >  }
> > > 

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
@ 2010-11-17 15:50 Goldwyn Rodrigues
  2010-12-02  4:49 ` Sunil Mushran
  2010-12-02  5:29 ` Joel Becker
  0 siblings, 2 replies; 13+ messages in thread
From: Goldwyn Rodrigues @ 2010-11-17 15:50 UTC (permalink / raw)
  To: ocfs2-devel

Review comments by Wengang incorporated.

ocfs2_recover_node is a new data structure to include all information required
to recover one node. The structure is maintained as a list in the super block.

recovery_map is no longer required with ocfs2_recover_node.

Signed-off-by: Goldwyn Rodrigues <rgoldwyn@suse.de>
---
 fs/ocfs2/journal.c |  171 ++++++++++++++++++++++++----------------------------
 fs/ocfs2/journal.h |    9 ++-
 fs/ocfs2/ocfs2.h   |    3 +-
 fs/ocfs2/super.c   |   16 -----
 4 files changed, 88 insertions(+), 111 deletions(-)

diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index faa2303..277b810 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
 #define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000

 static int ocfs2_force_read_journal(struct inode *inode);
-static int ocfs2_recover_node(struct ocfs2_super *osb,
-			      int node_num, int slot_num);
+static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn);
 static int __ocfs2_recovery_thread(void *arg);
 static int ocfs2_commit_cache(struct ocfs2_super *osb);
 static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
@@ -179,24 +178,11 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)

 int ocfs2_recovery_init(struct ocfs2_super *osb)
 {
-	struct ocfs2_recovery_map *rm;
-
 	mutex_init(&osb->recovery_lock);
 	osb->disable_recovery = 0;
 	osb->recovery_thread_task = NULL;
 	init_waitqueue_head(&osb->recovery_event);
-
-	rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
-		     osb->max_slots * sizeof(unsigned int),
-		     GFP_KERNEL);
-	if (!rm) {
-		mlog_errno(-ENOMEM);
-		return -ENOMEM;
-	}
-
-	rm->rm_entries = (unsigned int *)((char *)rm +
-					  sizeof(struct ocfs2_recovery_map));
-	osb->recovery_map = rm;
+	INIT_LIST_HEAD(&osb->s_recovery_nodes);

 	return 0;
 }
@@ -212,8 +198,8 @@ static int ocfs2_recovery_thread_running(struct
ocfs2_super *osb)

 void ocfs2_recovery_exit(struct ocfs2_super *osb)
 {
-	struct ocfs2_recovery_map *rm;
-
+	struct list_head *iter, *next;
+	struct ocfs2_recover_node *rn;
 	/* disable any new recovery threads and wait for any currently
 	 * running ones to exit. Do this before setting the vol_state. */
 	mutex_lock(&osb->recovery_lock);
@@ -226,75 +212,67 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
 	 * complete. */
 	flush_workqueue(ocfs2_wq);

-	/*
-	 * Now that recovery is shut down, and the osb is about to be
-	 * freed,  the osb_lock is not taken here.
-	 */
-	rm = osb->recovery_map;
-	/* XXX: Should we bug if there are dirty entries? */
-
-	kfree(rm);
+	spin_lock(&osb->osb_lock);
+	list_for_each_safe(iter, next, &osb->s_recovery_nodes) {
+			rn = list_entry(iter, struct ocfs2_recover_node,
+					rn_list);
+			list_del(&rn->rn_list);
+			kfree(rn);
+	}
+	spin_unlock(&osb->osb_lock);
 }

-static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
+static int __ocfs2_recovery_node_test(struct ocfs2_super *osb,
 				     unsigned int node_num)
 {
-	int i;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
+	struct ocfs2_recover_node *rn;
+	struct list_head *iter;

 	assert_spin_locked(&osb->osb_lock);
-
-	for (i = 0; i < rm->rm_used; i++) {
-		if (rm->rm_entries[i] == node_num)
+	list_for_each(iter, &osb->s_recovery_nodes) {
+		rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
+		if (rn->rn_node_num == node_num)
 			return 1;
 	}
-
 	return 0;
 }

 /* Behaves like test-and-set.  Returns the previous value */
-static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
+static int ocfs2_recovery_node_set(struct ocfs2_super *osb,
 				  unsigned int node_num)
 {
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
-
-	spin_lock(&osb->osb_lock);
-	if (__ocfs2_recovery_map_test(osb, node_num)) {
-		spin_unlock(&osb->osb_lock);
-		return 1;
-	}
+	struct list_head *iter;
+	struct ocfs2_recover_node *rn = NULL, *trn;

-	/* XXX: Can this be exploited? Not from o2dlm... */
-	BUG_ON(rm->rm_used >= osb->max_slots);
+	trn = kzalloc(sizeof(struct ocfs2_recover_node), GFP_NOFS);
+	if (!trn)
+		return -ENOMEM;

-	rm->rm_entries[rm->rm_used] = node_num;
-	rm->rm_used++;
+	spin_lock(&osb->osb_lock);
+	list_for_each(iter, &osb->s_recovery_nodes) {
+			rn = list_entry(iter, struct ocfs2_recover_node,
+					rn_list);
+			if (rn->rn_node_num == node_num) {
+				spin_unlock(&osb->osb_lock);
+				kfree(trn);
+				return 1;
+			}
+	}
+	trn->rn_node_num = node_num;
+	trn->rn_osb = osb;
+	list_add_tail(&trn->rn_list, &osb->s_recovery_nodes);
 	spin_unlock(&osb->osb_lock);

 	return 0;
 }

-static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
-				     unsigned int node_num)
+static void ocfs2_recovery_node_clear(struct ocfs2_super *osb,
+			struct ocfs2_recover_node *rn)
 {
-	int i;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
-
 	spin_lock(&osb->osb_lock);
-
-	for (i = 0; i < rm->rm_used; i++) {
-		if (rm->rm_entries[i] == node_num)
-			break;
-	}
-
-	if (i < rm->rm_used) {
-		/* XXX: be careful with the pointer math */
-		memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
-			(rm->rm_used - i - 1) * sizeof(unsigned int));
-		rm->rm_used--;
-	}
-
+	list_del(&rn->rn_list);
 	spin_unlock(&osb->osb_lock);
+	kfree(rn);
 }

 static int ocfs2_commit_cache(struct ocfs2_super *osb)
@@ -1092,10 +1070,9 @@ bail:
 static int ocfs2_recovery_completed(struct ocfs2_super *osb)
 {
 	int empty;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;

 	spin_lock(&osb->osb_lock);
-	empty = (rm->rm_used == 0);
+	empty = list_empty(&osb->s_recovery_nodes);
 	spin_unlock(&osb->osb_lock);

 	return empty;
@@ -1332,12 +1309,13 @@ void ocfs2_complete_quota_recovery(struct
ocfs2_super *osb)

 static int __ocfs2_recovery_thread(void *arg)
 {
-	int status, node_num, slot_num;
+	int status;
 	struct ocfs2_super *osb = arg;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
 	int *rm_quota = NULL;
 	int rm_quota_used = 0, i;
 	struct ocfs2_quota_recovery *qrec;
+	struct list_head *iter;
+	struct ocfs2_recover_node *rn = NULL;

 	mlog_entry_void();

@@ -1367,20 +1345,21 @@ restart:
 					NULL, NULL);

 	spin_lock(&osb->osb_lock);
-	while (rm->rm_used) {
+	list_for_each(iter, &osb->s_recovery_nodes) {
+		rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
 		/* It's always safe to remove entry zero, as we won't
 		 * clear it until ocfs2_recover_node() has succeeded. */
-		node_num = rm->rm_entries[0];
 		spin_unlock(&osb->osb_lock);
-		mlog(0, "checking node %d\n", node_num);
-		slot_num = ocfs2_node_num_to_slot(osb, node_num);
-		if (slot_num == -ENOENT) {
+		mlog(0, "checking node %d\n", rn->rn_node_num);
+		rn->rn_slot_num = ocfs2_node_num_to_slot(osb, rn->rn_node_num);
+		if (rn->rn_slot_num == -ENOENT) {
 			status = 0;
 			mlog(0, "no slot for this node, so no recovery"
 			     "required.\n");
 			goto skip_recovery;
 		}
-		mlog(0, "node %d was using slot %d\n", node_num, slot_num);
+		mlog(0, "node %d was using slot %d\n",
+				rn->rn_node_num, rn->rn_slot_num);

 		/* It is a bit subtle with quota recovery. We cannot do it
 		 * immediately because we have to obtain cluster locks from
@@ -1388,18 +1367,19 @@ restart:
 		 * then quota usage would be out of sync until some node takes
 		 * the slot. So we remember which nodes need quota recovery
 		 * and when everything else is done, we recover quotas. */
-		for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
+		for (i = 0; i < rm_quota_used && rm_quota[i] != rn->rn_slot_num;
+					i++);
 		if (i == rm_quota_used)
-			rm_quota[rm_quota_used++] = slot_num;
+			rm_quota[rm_quota_used++] = rn->rn_slot_num;

-		status = ocfs2_recover_node(osb, node_num, slot_num);
+		status = ocfs2_recover_one_node(rn);
 skip_recovery:
 		if (!status) {
-			ocfs2_recovery_map_clear(osb, node_num);
+			ocfs2_recovery_node_clear(osb, rn);
 		} else {
 			mlog(ML_ERROR,
 			     "Error %d recovering node %d on device (%u,%u)!\n",
-			     status, node_num,
+			     status, rn->rn_node_num,
 			     MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
 			mlog(ML_ERROR, "Volume requires unmount.\n");
 		}
@@ -1461,6 +1441,7 @@ bail:

 void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
 {
+	int ret = 0;
 	mlog_entry("(node_num=%d, osb->node_num = %d)\n",
 		   node_num, osb->node_num);

@@ -1470,7 +1451,11 @@ void ocfs2_recovery_thread(struct ocfs2_super
*osb, int node_num)

 	/* People waiting on recovery will wait on
 	 * the recovery map to empty. */
-	if (ocfs2_recovery_map_set(osb, node_num))
+	ret = ocfs2_recovery_node_set(osb, node_num);
+	if (ret == -ENOMEM) {
+		mlog_errno(ret);
+		goto out;
+	} else if (ret)
 		mlog(0, "node %d already in recovery map.\n", node_num);

 	mlog(0, "starting recovery thread...\n");
@@ -1681,26 +1666,27 @@ done:
  * second part of a nodes recovery process (local alloc recovery) is
  * far less concerning.
  */
-static int ocfs2_recover_node(struct ocfs2_super *osb,
-			      int node_num, int slot_num)
+static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn)
 {
 	int status = 0;
 	struct ocfs2_dinode *la_copy = NULL;
 	struct ocfs2_dinode *tl_copy = NULL;
+	struct ocfs2_super *osb = rn->rn_osb;

 	mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
-		   node_num, slot_num, osb->node_num);
+		   rn->rn_node_num, rn->rn_slot_num, osb->node_num);

 	/* Should not ever be called to recover ourselves -- in that
 	 * case we should've called ocfs2_journal_load instead. */
-	BUG_ON(osb->node_num == node_num);
+	BUG_ON(osb->node_num == rn->rn_node_num);

-	status = ocfs2_replay_journal(osb, node_num, slot_num);
+	status = ocfs2_replay_journal(osb, rn->rn_node_num,
+			rn->rn_slot_num);
 	if (status < 0) {
 		if (status == -EBUSY) {
 			mlog(0, "Skipping recovery for slot %u (node %u) "
-			     "as another node has recovered it\n", slot_num,
-			     node_num);
+			     "as another node has recovered it\n",
+			     rn->rn_slot_num, rn->rn_node_num);
 			status = 0;
 			goto done;
 		}
@@ -1709,7 +1695,8 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
 	}

 	/* Stamp a clean local alloc file AFTER recovering the journal... */
-	status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy);
+	status = ocfs2_begin_local_alloc_recovery(osb, rn->rn_slot_num,
+			&la_copy);
 	if (status < 0) {
 		mlog_errno(status);
 		goto done;
@@ -1718,22 +1705,24 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
 	/* An error from begin_truncate_log_recovery is not
 	 * serious enough to warrant halting the rest of
 	 * recovery. */
-	status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy);
+	status = ocfs2_begin_truncate_log_recovery(osb,
+			rn->rn_slot_num, &tl_copy);
 	if (status < 0)
 		mlog_errno(status);

 	/* Likewise, this would be a strange but ultimately not so
 	 * harmful place to get an error... */
-	status = ocfs2_clear_slot(osb, slot_num);
+	status = ocfs2_clear_slot(osb, rn->rn_slot_num);
 	if (status < 0)
 		mlog_errno(status);

 	/* This will kfree the memory pointed to by la_copy and tl_copy */
-	ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
-					tl_copy, NULL);
+	ocfs2_queue_recovery_completion(osb->journal, rn->rn_slot_num,
+				la_copy, tl_copy, NULL);

 	status = 0;
 done:
+	rn->rn_status = status;

 	mlog_exit(status);
 	return status;
@@ -1822,7 +1811,7 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
 			continue;
 		}

-		if (__ocfs2_recovery_map_test(osb, node_num)) {
+		if (__ocfs2_recovery_node_test(osb, node_num)) {
 			spin_unlock(&osb->osb_lock);
 			continue;
 		}
diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
index 43e56b9..0325d81 100644
--- a/fs/ocfs2/journal.h
+++ b/fs/ocfs2/journal.h
@@ -43,9 +43,12 @@ struct ocfs2_dinode;
  * It is protected by the recovery_lock.
  */

-struct ocfs2_recovery_map {
-	unsigned int rm_used;
-	unsigned int *rm_entries;
+struct ocfs2_recover_node {
+	struct ocfs2_super *rn_osb;
+	int rn_node_num;
+	int rn_slot_num;
+	int rn_status;
+	struct list_head rn_list;
 };


diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 1efea36..f70c25a 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -337,7 +337,8 @@ struct ocfs2_super

 	atomic_t vol_state;
 	struct mutex recovery_lock;
-	struct ocfs2_recovery_map *recovery_map;
+	struct list_head s_recovery_nodes;
+
 	struct ocfs2_replay_map *replay_map;
 	struct task_struct *recovery_thread_task;
 	int disable_recovery;
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index f02c0ef..478715b 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -220,7 +220,6 @@ static const match_table_t tokens = {
 static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len)
 {
 	struct ocfs2_cluster_connection *cconn = osb->cconn;
-	struct ocfs2_recovery_map *rm = osb->recovery_map;
 	struct ocfs2_orphan_scan *os = &osb->osb_orphan_scan;
 	int i, out = 0;

@@ -267,21 +266,6 @@ static int ocfs2_osb_dump(struct ocfs2_super
*osb, char *buf, int len)
 			osb->dc_work_sequence);
 	spin_unlock(&osb->dc_task_lock);

-	spin_lock(&osb->osb_lock);
-	out += snprintf(buf + out, len - out, "%10s => Pid: %d  Nodes:",
-			"Recovery",
-			(osb->recovery_thread_task ?
-			 task_pid_nr(osb->recovery_thread_task) : -1));
-	if (rm->rm_used == 0)
-		out += snprintf(buf + out, len - out, " None\n");
-	else {
-		for (i = 0; i < rm->rm_used; i++)
-			out += snprintf(buf + out, len - out, " %d",
-					rm->rm_entries[i]);
-		out += snprintf(buf + out, len - out, "\n");
-	}
-	spin_unlock(&osb->osb_lock);
-
 	out += snprintf(buf + out, len - out,
 			"%10s => Pid: %d  Interval: %lu  Needs: %d\n", "Commit",
 			(osb->commit_task ? task_pid_nr(osb->commit_task) : -1),
-- 
1.7.1


-- 
Goldwyn

^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
  2010-11-17 15:50 Goldwyn Rodrigues
@ 2010-12-02  4:49 ` Sunil Mushran
  2010-12-02  5:29 ` Joel Becker
  1 sibling, 0 replies; 13+ messages in thread
From: Sunil Mushran @ 2010-12-02  4:49 UTC (permalink / raw)
  To: ocfs2-devel

On 11/17/2010 07:50 AM, Goldwyn Rodrigues wrote:
> Review comments by Wengang incorporated.
>
> ocfs2_recover_node is a new data structure to include all information required
> to recover one node. The structure is maintained as a list in the super block.

ocfs2_recover_node suggests action. How about calling it
ocfs2_recovery_list?

> recovery_map is no longer required with ocfs2_recover_node.
>
> Signed-off-by: Goldwyn Rodrigues<rgoldwyn@suse.de>
> ---
>   fs/ocfs2/journal.c |  171 ++++++++++++++++++++++++----------------------------
>   fs/ocfs2/journal.h |    9 ++-
>   fs/ocfs2/ocfs2.h   |    3 +-
>   fs/ocfs2/super.c   |   16 -----
>   4 files changed, 88 insertions(+), 111 deletions(-)
>
> diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
> index faa2303..277b810 100644
> --- a/fs/ocfs2/journal.c
> +++ b/fs/ocfs2/journal.c
> @@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
>   #define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
>
>   static int ocfs2_force_read_journal(struct inode *inode);
> -static int ocfs2_recover_node(struct ocfs2_super *osb,
> -			      int node_num, int slot_num);
> +static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn);
>   static int __ocfs2_recovery_thread(void *arg);
>   static int ocfs2_commit_cache(struct ocfs2_super *osb);
>   static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
> @@ -179,24 +178,11 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)
>
>   int ocfs2_recovery_init(struct ocfs2_super *osb)
>   {
> -	struct ocfs2_recovery_map *rm;
> -
>   	mutex_init(&osb->recovery_lock);
>   	osb->disable_recovery = 0;
>   	osb->recovery_thread_task = NULL;
>   	init_waitqueue_head(&osb->recovery_event);
> -
> -	rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
> -		     osb->max_slots * sizeof(unsigned int),
> -		     GFP_KERNEL);
> -	if (!rm) {
> -		mlog_errno(-ENOMEM);
> -		return -ENOMEM;
> -	}
> -
> -	rm->rm_entries = (unsigned int *)((char *)rm +
> -					  sizeof(struct ocfs2_recovery_map));
> -	osb->recovery_map = rm;
> +	INIT_LIST_HEAD(&osb->s_recovery_nodes);
>
>   	return 0;
>   }
> @@ -212,8 +198,8 @@ static int ocfs2_recovery_thread_running(struct
> ocfs2_super *osb)
>
>   void ocfs2_recovery_exit(struct ocfs2_super *osb)
>   {
> -	struct ocfs2_recovery_map *rm;
> -
> +	struct list_head *iter, *next;
> +	struct ocfs2_recover_node *rn;
>   	/* disable any new recovery threads and wait for any currently
>   	 * running ones to exit. Do this before setting the vol_state. */
>   	mutex_lock(&osb->recovery_lock);
> @@ -226,75 +212,67 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
>   	 * complete. */
>   	flush_workqueue(ocfs2_wq);
>
> -	/*
> -	 * Now that recovery is shut down, and the osb is about to be
> -	 * freed,  the osb_lock is not taken here.
> -	 */
> -	rm = osb->recovery_map;
> -	/* XXX: Should we bug if there are dirty entries? */

This comment is still relevant.

> -
> -	kfree(rm);
> +	spin_lock(&osb->osb_lock);
> +	list_for_each_safe(iter, next,&osb->s_recovery_nodes) {
> +			rn = list_entry(iter, struct ocfs2_recover_node,
> +					rn_list);
> +			list_del(&rn->rn_list);
> +			kfree(rn);
> +	}
> +	spin_unlock(&osb->osb_lock);

extra indentation

>   }
>
> -static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
> +static int __ocfs2_recovery_node_test(struct ocfs2_super *osb,
>   				     unsigned int node_num)
>   {
> -	int i;
> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
> +	struct ocfs2_recover_node *rn;
> +	struct list_head *iter;
>
>   	assert_spin_locked(&osb->osb_lock);
> -
> -	for (i = 0; i<  rm->rm_used; i++) {
> -		if (rm->rm_entries[i] == node_num)
> +	list_for_each(iter,&osb->s_recovery_nodes) {
> +		rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
> +		if (rn->rn_node_num == node_num)
>   			return 1;
>   	}
> -
>   	return 0;
>   }
>
>   /* Behaves like test-and-set.  Returns the previous value */
> -static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
> +static int ocfs2_recovery_node_set(struct ocfs2_super *osb,
>   				  unsigned int node_num)
>   {
> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
> -
> -	spin_lock(&osb->osb_lock);
> -	if (__ocfs2_recovery_map_test(osb, node_num)) {
> -		spin_unlock(&osb->osb_lock);
> -		return 1;
> -	}
> +	struct list_head *iter;
> +	struct ocfs2_recover_node *rn = NULL, *trn;
>
> -	/* XXX: Can this be exploited? Not from o2dlm... */
> -	BUG_ON(rm->rm_used>= osb->max_slots);
> +	trn = kzalloc(sizeof(struct ocfs2_recover_node), GFP_NOFS);
> +	if (!trn)
> +		return -ENOMEM;

Is this enomem case handled properly up the stack? This is new.
Previously we allocated memory upfront. Wondering why we cannot
do the same here.

> -	rm->rm_entries[rm->rm_used] = node_num;
> -	rm->rm_used++;
> +	spin_lock(&osb->osb_lock);
> +	list_for_each(iter,&osb->s_recovery_nodes) {
> +			rn = list_entry(iter, struct ocfs2_recover_node,
> +					rn_list);
> +			if (rn->rn_node_num == node_num) {
> +				spin_unlock(&osb->osb_lock);
> +				kfree(trn);
> +				return 1;
> +			}
> +	}

extra indentation

> +	trn->rn_node_num = node_num;
> +	trn->rn_osb = osb;
> +	list_add_tail(&trn->rn_list,&osb->s_recovery_nodes);
>   	spin_unlock(&osb->osb_lock);
>
>   	return 0;
>   }
>
> -static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
> -				     unsigned int node_num)
> +static void ocfs2_recovery_node_clear(struct ocfs2_super *osb,
> +			struct ocfs2_recover_node *rn)
>   {
> -	int i;
> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
> -
>   	spin_lock(&osb->osb_lock);
> -
> -	for (i = 0; i<  rm->rm_used; i++) {
> -		if (rm->rm_entries[i] == node_num)
> -			break;
> -	}
> -
> -	if (i<  rm->rm_used) {
> -		/* XXX: be careful with the pointer math */
> -		memmove(&(rm->rm_entries[i]),&(rm->rm_entries[i + 1]),
> -			(rm->rm_used - i - 1) * sizeof(unsigned int));
> -		rm->rm_used--;
> -	}
> -
> +	list_del(&rn->rn_list);
>   	spin_unlock(&osb->osb_lock);
> +	kfree(rn);

Aah...we did memmove. That won't work here. So list manipulation it is.

>   }
>
>   static int ocfs2_commit_cache(struct ocfs2_super *osb)
> @@ -1092,10 +1070,9 @@ bail:
>   static int ocfs2_recovery_completed(struct ocfs2_super *osb)
>   {
>   	int empty;
> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
>
>   	spin_lock(&osb->osb_lock);
> -	empty = (rm->rm_used == 0);
> +	empty = list_empty(&osb->s_recovery_nodes);
>   	spin_unlock(&osb->osb_lock);
>
>   	return empty;
> @@ -1332,12 +1309,13 @@ void ocfs2_complete_quota_recovery(struct
> ocfs2_super *osb)
>
>   static int __ocfs2_recovery_thread(void *arg)
>   {
> -	int status, node_num, slot_num;
> +	int status;
>   	struct ocfs2_super *osb = arg;
> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
>   	int *rm_quota = NULL;
>   	int rm_quota_used = 0, i;
>   	struct ocfs2_quota_recovery *qrec;
> +	struct list_head *iter;
> +	struct ocfs2_recover_node *rn = NULL;
>
>   	mlog_entry_void();
>
> @@ -1367,20 +1345,21 @@ restart:
>   					NULL, NULL);
>
>   	spin_lock(&osb->osb_lock);
> -	while (rm->rm_used) {
> +	list_for_each(iter,&osb->s_recovery_nodes) {
> +		rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
>   		/* It's always safe to remove entry zero, as we won't
>   		 * clear it until ocfs2_recover_node() has succeeded. */
> -		node_num = rm->rm_entries[0];
>   		spin_unlock(&osb->osb_lock);
> -		mlog(0, "checking node %d\n", node_num);
> -		slot_num = ocfs2_node_num_to_slot(osb, node_num);
> -		if (slot_num == -ENOENT) {
> +		mlog(0, "checking node %d\n", rn->rn_node_num);
> +		rn->rn_slot_num = ocfs2_node_num_to_slot(osb, rn->rn_node_num);
> +		if (rn->rn_slot_num == -ENOENT) {
>   			status = 0;
>   			mlog(0, "no slot for this node, so no recovery"
>   			     "required.\n");
>   			goto skip_recovery;
>   		}
> -		mlog(0, "node %d was using slot %d\n", node_num, slot_num);
> +		mlog(0, "node %d was using slot %d\n",
> +				rn->rn_node_num, rn->rn_slot_num);
>
>   		/* It is a bit subtle with quota recovery. We cannot do it
>   		 * immediately because we have to obtain cluster locks from
> @@ -1388,18 +1367,19 @@ restart:
>   		 * then quota usage would be out of sync until some node takes
>   		 * the slot. So we remember which nodes need quota recovery
>   		 * and when everything else is done, we recover quotas. */
> -		for (i = 0; i<  rm_quota_used&&  rm_quota[i] != slot_num; i++);
> +		for (i = 0; i<  rm_quota_used&&  rm_quota[i] != rn->rn_slot_num;
> +					i++);
>   		if (i == rm_quota_used)
> -			rm_quota[rm_quota_used++] = slot_num;
> +			rm_quota[rm_quota_used++] = rn->rn_slot_num;
>
> -		status = ocfs2_recover_node(osb, node_num, slot_num);
> +		status = ocfs2_recover_one_node(rn);
>   skip_recovery:
>   		if (!status) {
> -			ocfs2_recovery_map_clear(osb, node_num);
> +			ocfs2_recovery_node_clear(osb, rn);
>   		} else {
>   			mlog(ML_ERROR,
>   			     "Error %d recovering node %d on device (%u,%u)!\n",
> -			     status, node_num,
> +			     status, rn->rn_node_num,
>   			     MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
>   			mlog(ML_ERROR, "Volume requires unmount.\n");

Hmm... this mlog could do with some clean up. But that can be
a separate change.

>   		}
> @@ -1461,6 +1441,7 @@ bail:
>
>   void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
>   {
> +	int ret = 0;
>   	mlog_entry("(node_num=%d, osb->node_num = %d)\n",
>   		   node_num, osb->node_num);
>
> @@ -1470,7 +1451,11 @@ void ocfs2_recovery_thread(struct ocfs2_super
> *osb, int node_num)
>
>   	/* People waiting on recovery will wait on
>   	 * the recovery map to empty. */
> -	if (ocfs2_recovery_map_set(osb, node_num))
> +	ret = ocfs2_recovery_node_set(osb, node_num);
> +	if (ret == -ENOMEM) {
> +		mlog_errno(ret);
> +		goto out;
> +	} else if (ret)
>   		mlog(0, "node %d already in recovery map.\n", node_num);
>
>   	mlog(0, "starting recovery thread...\n");
> @@ -1681,26 +1666,27 @@ done:
>    * second part of a nodes recovery process (local alloc recovery) is
>    * far less concerning.
>    */
> -static int ocfs2_recover_node(struct ocfs2_super *osb,
> -			      int node_num, int slot_num)
> +static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn)
>   {
>   	int status = 0;
>   	struct ocfs2_dinode *la_copy = NULL;
>   	struct ocfs2_dinode *tl_copy = NULL;
> +	struct ocfs2_super *osb = rn->rn_osb;
>
>   	mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
> -		   node_num, slot_num, osb->node_num);
> +		   rn->rn_node_num, rn->rn_slot_num, osb->node_num);
>
>   	/* Should not ever be called to recover ourselves -- in that
>   	 * case we should've called ocfs2_journal_load instead. */
> -	BUG_ON(osb->node_num == node_num);
> +	BUG_ON(osb->node_num == rn->rn_node_num);
>
> -	status = ocfs2_replay_journal(osb, node_num, slot_num);
> +	status = ocfs2_replay_journal(osb, rn->rn_node_num,
> +			rn->rn_slot_num);
>   	if (status<  0) {
>   		if (status == -EBUSY) {
>   			mlog(0, "Skipping recovery for slot %u (node %u) "
> -			     "as another node has recovered it\n", slot_num,
> -			     node_num);
> +			     "as another node has recovered it\n",
> +			     rn->rn_slot_num, rn->rn_node_num);
>   			status = 0;
>   			goto done;
>   		}
> @@ -1709,7 +1695,8 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
>   	}
>
>   	/* Stamp a clean local alloc file AFTER recovering the journal... */
> -	status = ocfs2_begin_local_alloc_recovery(osb, slot_num,&la_copy);
> +	status = ocfs2_begin_local_alloc_recovery(osb, rn->rn_slot_num,
> +			&la_copy);
>   	if (status<  0) {
>   		mlog_errno(status);
>   		goto done;
> @@ -1718,22 +1705,24 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
>   	/* An error from begin_truncate_log_recovery is not
>   	 * serious enough to warrant halting the rest of
>   	 * recovery. */
> -	status = ocfs2_begin_truncate_log_recovery(osb, slot_num,&tl_copy);
> +	status = ocfs2_begin_truncate_log_recovery(osb,
> +			rn->rn_slot_num,&tl_copy);
>   	if (status<  0)
>   		mlog_errno(status);
>
>   	/* Likewise, this would be a strange but ultimately not so
>   	 * harmful place to get an error... */
> -	status = ocfs2_clear_slot(osb, slot_num);
> +	status = ocfs2_clear_slot(osb, rn->rn_slot_num);
>   	if (status<  0)
>   		mlog_errno(status);
>
>   	/* This will kfree the memory pointed to by la_copy and tl_copy */
> -	ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
> -					tl_copy, NULL);
> +	ocfs2_queue_recovery_completion(osb->journal, rn->rn_slot_num,
> +				la_copy, tl_copy, NULL);
>
>   	status = 0;
>   done:
> +	rn->rn_status = status;
>
>   	mlog_exit(status);
>   	return status;
> @@ -1822,7 +1811,7 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
>   			continue;
>   		}
>
> -		if (__ocfs2_recovery_map_test(osb, node_num)) {
> +		if (__ocfs2_recovery_node_test(osb, node_num)) {
>   			spin_unlock(&osb->osb_lock);
>   			continue;
>   		}
> diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
> index 43e56b9..0325d81 100644
> --- a/fs/ocfs2/journal.h
> +++ b/fs/ocfs2/journal.h
> @@ -43,9 +43,12 @@ struct ocfs2_dinode;
>    * It is protected by the recovery_lock.
>    */
>
> -struct ocfs2_recovery_map {
> -	unsigned int rm_used;
> -	unsigned int *rm_entries;
> +struct ocfs2_recover_node {
> +	struct ocfs2_super *rn_osb;
> +	int rn_node_num;
> +	int rn_slot_num;
> +	int rn_status;
> +	struct list_head rn_list;
>   };

See comment above.

>
> diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
> index 1efea36..f70c25a 100644
> --- a/fs/ocfs2/ocfs2.h
> +++ b/fs/ocfs2/ocfs2.h
> @@ -337,7 +337,8 @@ struct ocfs2_super
>
>   	atomic_t vol_state;
>   	struct mutex recovery_lock;
> -	struct ocfs2_recovery_map *recovery_map;
> +	struct list_head s_recovery_nodes;

s_recovery_list will fit better.

> +
>   	struct ocfs2_replay_map *replay_map;
>   	struct task_struct *recovery_thread_task;
>   	int disable_recovery;
> diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
> index f02c0ef..478715b 100644
> --- a/fs/ocfs2/super.c
> +++ b/fs/ocfs2/super.c
> @@ -220,7 +220,6 @@ static const match_table_t tokens = {
>   static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len)
>   {
>   	struct ocfs2_cluster_connection *cconn = osb->cconn;
> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
>   	struct ocfs2_orphan_scan *os =&osb->osb_orphan_scan;
>   	int i, out = 0;
>
> @@ -267,21 +266,6 @@ static int ocfs2_osb_dump(struct ocfs2_super
> *osb, char *buf, int len)
>   			osb->dc_work_sequence);
>   	spin_unlock(&osb->dc_task_lock);
>
> -	spin_lock(&osb->osb_lock);
> -	out += snprintf(buf + out, len - out, "%10s =>  Pid: %d  Nodes:",
> -			"Recovery",
> -			(osb->recovery_thread_task ?
> -			 task_pid_nr(osb->recovery_thread_task) : -1));
> -	if (rm->rm_used == 0)
> -		out += snprintf(buf + out, len - out, " None\n");
> -	else {
> -		for (i = 0; i<  rm->rm_used; i++)
> -			out += snprintf(buf + out, len - out, " %d",
> -					rm->rm_entries[i]);
> -		out += snprintf(buf + out, len - out, "\n");
> -	}
> -	spin_unlock(&osb->osb_lock);
> -
>   	out += snprintf(buf + out, len - out,
>   			"%10s =>  Pid: %d  Interval: %lu  Needs: %d\n", "Commit",
>   			(osb->commit_task ? task_pid_nr(osb->commit_task) : -1),

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
  2010-11-17 15:50 Goldwyn Rodrigues
  2010-12-02  4:49 ` Sunil Mushran
@ 2010-12-02  5:29 ` Joel Becker
  2010-12-02 19:05   ` Sunil Mushran
  1 sibling, 1 reply; 13+ messages in thread
From: Joel Becker @ 2010-12-02  5:29 UTC (permalink / raw)
  To: ocfs2-devel

On Wed, Nov 17, 2010 at 09:50:04AM -0600, Goldwyn Rodrigues wrote:
> @@ -1470,7 +1451,11 @@ void ocfs2_recovery_thread(struct ocfs2_super
> *osb, int node_num)
> 
>  	/* People waiting on recovery will wait on
>  	 * the recovery map to empty. */
> -	if (ocfs2_recovery_map_set(osb, node_num))
> +	ret = ocfs2_recovery_node_set(osb, node_num);
> +	if (ret == -ENOMEM) {
> +		mlog_errno(ret);
> +		goto out;
> +	} else if (ret)
>  		mlog(0, "node %d already in recovery map.\n", node_num);

	This is a broken change.  If we get -ENOMEM, we won't block
other processes.  We can't have that happen.  There are two possible
solutions.  First, like Sunil said, we can preallocate max_slots recovery
entries.  Seems pretty sane.  The other solution would be to set an
in-recovery flag that others can check, so even when the recovery list
is empty because of a failed allocation, other processes still block.  I
prefer the preallocation because it doesn't fail recovery.

Joel

-- 

"In a crisis, don't hide behind anything or anybody. They're going
 to find you anyway."
	- Paul "Bear" Bryant

Joel Becker
Senior Development Manager
Oracle
E-mail: joel.becker at oracle.com
Phone: (650) 506-8127

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
  2010-12-02  5:29 ` Joel Becker
@ 2010-12-02 19:05   ` Sunil Mushran
  2010-12-02 23:49     ` Joel Becker
  0 siblings, 1 reply; 13+ messages in thread
From: Sunil Mushran @ 2010-12-02 19:05 UTC (permalink / raw)
  To: ocfs2-devel

On 12/01/2010 09:29 PM, Joel Becker wrote:
> On Wed, Nov 17, 2010 at 09:50:04AM -0600, Goldwyn Rodrigues wrote:
>> @@ -1470,7 +1451,11 @@ void ocfs2_recovery_thread(struct ocfs2_super
>> *osb, int node_num)
>>
>>   	/* People waiting on recovery will wait on
>>   	 * the recovery map to empty. */
>> -	if (ocfs2_recovery_map_set(osb, node_num))
>> +	ret = ocfs2_recovery_node_set(osb, node_num);
>> +	if (ret == -ENOMEM) {
>> +		mlog_errno(ret);
>> +		goto out;
>> +	} else if (ret)
>>   		mlog(0, "node %d already in recovery map.\n", node_num);
> 	This is a broken change.  If we get -ENOMEM, we won't block
> other processes.  We can't have that happen.  There are two possible
> solutions.  First, like Sunil said, we can preallocate max_slots recovery
> entries.  Seems pretty sane.  The other solution would be to set an
> in-recovery flag that others can check, so even when the recovery list
> is empty because of a failed allocation, other processes still block.  I
> prefer the preallocation because it doesn't fail recovery.

You could stick with the list but preallocate max_slots items during init.
Attach all those to the s_init_reco_list. During set, move an item to
s_active_reco_list. Clear moves it back.

^ permalink raw reply	[flat|nested] 13+ messages in thread

* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
  2010-12-02 19:05   ` Sunil Mushran
@ 2010-12-02 23:49     ` Joel Becker
  0 siblings, 0 replies; 13+ messages in thread
From: Joel Becker @ 2010-12-02 23:49 UTC (permalink / raw)
  To: ocfs2-devel

On Thu, Dec 02, 2010 at 11:05:01AM -0800, Sunil Mushran wrote:
> On 12/01/2010 09:29 PM, Joel Becker wrote:
> > On Wed, Nov 17, 2010 at 09:50:04AM -0600, Goldwyn Rodrigues wrote:
> >> @@ -1470,7 +1451,11 @@ void ocfs2_recovery_thread(struct ocfs2_super
> >> *osb, int node_num)
> >>
> >>   	/* People waiting on recovery will wait on
> >>   	 * the recovery map to empty. */
> >> -	if (ocfs2_recovery_map_set(osb, node_num))
> >> +	ret = ocfs2_recovery_node_set(osb, node_num);
> >> +	if (ret == -ENOMEM) {
> >> +		mlog_errno(ret);
> >> +		goto out;
> >> +	} else if (ret)
> >>   		mlog(0, "node %d already in recovery map.\n", node_num);
> > 	This is a broken change.  If we get -ENOMEM, we won't block
> > other processes.  We can't have that happen.  There are two possible
> > solutions.  First, like Sunil said, we can preallocate max_slots recovery
> > entries.  Seems pretty sane.  The other solution would be to set an
> > in-recovery flag that others can check, so even when the recovery list
> > is empty because of a failed allocation, other processes still block.  I
> > prefer the preallocation because it doesn't fail recovery.
> 
> You could stick with the list but preallocate max_slots items during init.
> Attach all those to the s_init_reco_list. During set, move an item to
> s_active_reco_list. Clear moves it back.

	Exactly.

Joel

-- 

"You must remember this:
 A kiss is just a kiss,
 A sigh is just a sigh.
 The fundamental rules apply
 As time goes by."

Joel Becker
Senior Development Manager
Oracle
E-mail: joel.becker at oracle.com
Phone: (650) 506-8127

^ permalink raw reply	[flat|nested] 13+ messages in thread

end of thread, other threads:[~2010-12-02 23:49 UTC | newest]

Thread overview: 13+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2010-11-12  1:01 [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node Goldwyn Rodrigues
2010-11-12  2:16 ` Wengang Wang
2010-11-12 19:22   ` Goldwyn Rodrigues
2010-11-13  5:03     ` Wengang Wang
  -- strict thread matches above, loose matches on Subject: below --
2010-11-16 22:59 Goldwyn Rodrigues
2010-11-17  1:49 ` Wengang Wang
2010-11-17  2:00   ` Wengang Wang
2010-11-17  2:04     ` Wengang Wang
2010-11-17 15:50 Goldwyn Rodrigues
2010-12-02  4:49 ` Sunil Mushran
2010-12-02  5:29 ` Joel Becker
2010-12-02 19:05   ` Sunil Mushran
2010-12-02 23:49     ` Joel Becker

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.