* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
@ 2010-11-12 1:01 Goldwyn Rodrigues
2010-11-12 2:16 ` Wengang Wang
0 siblings, 1 reply; 13+ messages in thread
From: Goldwyn Rodrigues @ 2010-11-12 1:01 UTC (permalink / raw)
To: ocfs2-devel
ocfs2_recover_node is a new data structure that holds all the information
required to recover one node. The structures are kept on a list in the super
block (osb->s_recovery_nodes).
ocfs2_recover_node replaces the recovery_map.
Signed-off-by: Goldwyn Rodrigues <rgoldwyn@suse.de>
---
fs/ocfs2/journal.c | 170 +++++++++++++++++++++------------------------------
fs/ocfs2/journal.h | 9 ++-
fs/ocfs2/ocfs2.h | 3 +-
fs/ocfs2/super.c | 16 -----
4 files changed, 78 insertions(+), 120 deletions(-)
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index faa2303..6c378d6 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
static int ocfs2_force_read_journal(struct inode *inode);
-static int ocfs2_recover_node(struct ocfs2_super *osb,
- int node_num, int slot_num);
+static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn);
static int __ocfs2_recovery_thread(void *arg);
static int ocfs2_commit_cache(struct ocfs2_super *osb);
static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
@@ -179,24 +178,11 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)
int ocfs2_recovery_init(struct ocfs2_super *osb)
{
- struct ocfs2_recovery_map *rm;
-
mutex_init(&osb->recovery_lock);
osb->disable_recovery = 0;
osb->recovery_thread_task = NULL;
init_waitqueue_head(&osb->recovery_event);
-
- rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
- osb->max_slots * sizeof(unsigned int),
- GFP_KERNEL);
- if (!rm) {
- mlog_errno(-ENOMEM);
- return -ENOMEM;
- }
-
- rm->rm_entries = (unsigned int *)((char *)rm +
- sizeof(struct ocfs2_recovery_map));
- osb->recovery_map = rm;
+ INIT_LIST_HEAD(&osb->s_recovery_nodes);
return 0;
}
@@ -212,8 +198,6 @@ static int ocfs2_recovery_thread_running(struct
ocfs2_super *osb)
void ocfs2_recovery_exit(struct ocfs2_super *osb)
{
- struct ocfs2_recovery_map *rm;
-
/* disable any new recovery threads and wait for any currently
* running ones to exit. Do this before setting the vol_state. */
mutex_lock(&osb->recovery_lock);
@@ -226,29 +210,20 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
* complete. */
flush_workqueue(ocfs2_wq);
- /*
- * Now that recovery is shut down, and the osb is about to be
- * freed, the osb_lock is not taken here.
- */
- rm = osb->recovery_map;
- /* XXX: Should we bug if there are dirty entries? */
-
- kfree(rm);
}
static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
unsigned int node_num)
{
- int i;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
+ struct ocfs2_recover_node *rn;
+ struct list_head *iter;
assert_spin_locked(&osb->osb_lock);
-
- for (i = 0; i < rm->rm_used; i++) {
- if (rm->rm_entries[i] == node_num)
+ list_for_each(iter, &osb->s_recovery_nodes) {
+ rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
+ if (rn->rn_node_num == node_num)
return 1;
}
-
return 0;
}
@@ -256,45 +231,36 @@ static int __ocfs2_recovery_map_test(struct
ocfs2_super *osb,
static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
unsigned int node_num)
{
- struct ocfs2_recovery_map *rm = osb->recovery_map;
-
- spin_lock(&osb->osb_lock);
- if (__ocfs2_recovery_map_test(osb, node_num)) {
- spin_unlock(&osb->osb_lock);
- return 1;
- }
+ struct list_head *iter;
+ struct ocfs2_recover_node *rn = NULL, *trn;
- /* XXX: Can this be exploited? Not from o2dlm... */
- BUG_ON(rm->rm_used >= osb->max_slots);
+ trn = kzalloc(sizeof(struct ocfs2_recover_node), GFP_KERNEL);
- rm->rm_entries[rm->rm_used] = node_num;
- rm->rm_used++;
+ spin_lock(&osb->osb_lock);
+ list_for_each(iter, &osb->s_recovery_nodes) {
+ rn = list_entry(iter, struct ocfs2_recover_node,
+ rn_list);
+ if (rn->rn_node_num == node_num) {
+ spin_unlock(&osb->osb_lock);
+ kfree(trn);
+ return 1;
+ }
+ }
+ trn->rn_node_num = node_num;
+ trn->rn_osb = osb;
+ list_add_tail(&trn->rn_list, &osb->s_recovery_nodes);
spin_unlock(&osb->osb_lock);
return 0;
}
static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
- unsigned int node_num)
+ struct ocfs2_recover_node *rn)
{
- int i;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
-
spin_lock(&osb->osb_lock);
-
- for (i = 0; i < rm->rm_used; i++) {
- if (rm->rm_entries[i] == node_num)
- break;
- }
-
- if (i < rm->rm_used) {
- /* XXX: be careful with the pointer math */
- memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
- (rm->rm_used - i - 1) * sizeof(unsigned int));
- rm->rm_used--;
- }
-
+ list_del(&rn->rn_list);
spin_unlock(&osb->osb_lock);
+ kfree(rn);
}
static int ocfs2_commit_cache(struct ocfs2_super *osb)
@@ -1092,10 +1058,9 @@ bail:
static int ocfs2_recovery_completed(struct ocfs2_super *osb)
{
int empty;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
spin_lock(&osb->osb_lock);
- empty = (rm->rm_used == 0);
+ empty = list_empty(&osb->s_recovery_nodes);
spin_unlock(&osb->osb_lock);
return empty;
@@ -1332,12 +1297,13 @@ void ocfs2_complete_quota_recovery(struct
ocfs2_super *osb)
static int __ocfs2_recovery_thread(void *arg)
{
- int status, node_num, slot_num;
+ int status;
struct ocfs2_super *osb = arg;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
int *rm_quota = NULL;
int rm_quota_used = 0, i;
struct ocfs2_quota_recovery *qrec;
+ struct list_head *iter;
+ struct ocfs2_recover_node *rn = NULL;
mlog_entry_void();
@@ -1367,20 +1333,21 @@ restart:
NULL, NULL);
spin_lock(&osb->osb_lock);
- while (rm->rm_used) {
+ list_for_each(iter, &osb->s_recovery_nodes) {
+ rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
/* It's always safe to remove entry zero, as we won't
* clear it until ocfs2_recover_node() has succeeded. */
- node_num = rm->rm_entries[0];
spin_unlock(&osb->osb_lock);
- mlog(0, "checking node %d\n", node_num);
- slot_num = ocfs2_node_num_to_slot(osb, node_num);
- if (slot_num == -ENOENT) {
+ mlog(0, "checking node %d\n", rn->rn_node_num);
+ rn->rn_slot_num = ocfs2_node_num_to_slot(osb, rn->rn_node_num);
+ if (rn->rn_slot_num == -ENOENT) {
status = 0;
mlog(0, "no slot for this node, so no recovery"
"required.\n");
goto skip_recovery;
}
- mlog(0, "node %d was using slot %d\n", node_num, slot_num);
+ mlog(0, "node %d was using slot %d\n",
+ rn->rn_node_num, rn->rn_slot_num);
/* It is a bit subtle with quota recovery. We cannot do it
* immediately because we have to obtain cluster locks from
@@ -1388,18 +1355,19 @@ restart:
* then quota usage would be out of sync until some node takes
* the slot. So we remember which nodes need quota recovery
* and when everything else is done, we recover quotas. */
- for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
+ for (i = 0; i < rm_quota_used && rm_quota[i] != rn->rn_slot_num;
+ i++);
if (i == rm_quota_used)
- rm_quota[rm_quota_used++] = slot_num;
+ rm_quota[rm_quota_used++] = rn->rn_slot_num;
- status = ocfs2_recover_node(osb, node_num, slot_num);
+ status = ocfs2_recover_one_node(rn);
skip_recovery:
if (!status) {
- ocfs2_recovery_map_clear(osb, node_num);
+ ocfs2_recovery_map_clear(osb, rn);
} else {
mlog(ML_ERROR,
"Error %d recovering node %d on device (%u,%u)!\n",
- status, node_num,
+ status, rn->rn_node_num,
MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
mlog(ML_ERROR, "Volume requires unmount.\n");
}
@@ -1681,62 +1649,64 @@ done:
* second part of a nodes recovery process (local alloc recovery) is
* far less concerning.
*/
-static int ocfs2_recover_node(struct ocfs2_super *osb,
- int node_num, int slot_num)
+static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn)
{
- int status = 0;
struct ocfs2_dinode *la_copy = NULL;
struct ocfs2_dinode *tl_copy = NULL;
+ struct ocfs2_super *osb = rn->rn_osb;
mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
- node_num, slot_num, osb->node_num);
+ rn->rn_node_num, rn->rn_slot_num, osb->node_num);
/* Should not ever be called to recover ourselves -- in that
* case we should've called ocfs2_journal_load instead. */
- BUG_ON(osb->node_num == node_num);
+ BUG_ON(osb->node_num == rn->rn_node_num);
- status = ocfs2_replay_journal(osb, node_num, slot_num);
- if (status < 0) {
- if (status == -EBUSY) {
+ rn->rn_status = ocfs2_replay_journal(osb, rn->rn_node_num,
+ rn->rn_slot_num);
+ if (rn->rn_status < 0) {
+ if (rn->rn_status == -EBUSY) {
mlog(0, "Skipping recovery for slot %u (node %u) "
- "as another node has recovered it\n", slot_num,
- node_num);
- status = 0;
+ "as another node has recovered it\n",
+ rn->rn_slot_num, rn->rn_node_num);
+ rn->rn_status = 0;
goto done;
}
- mlog_errno(status);
+ mlog_errno(rn->rn_status);
goto done;
}
/* Stamp a clean local alloc file AFTER recovering the journal... */
- status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy);
- if (status < 0) {
- mlog_errno(status);
+ rn->rn_status = ocfs2_begin_local_alloc_recovery(osb, rn->rn_slot_num,
+ &la_copy);
+ if (rn->rn_status < 0) {
+ mlog_errno(rn->rn_status);
goto done;
}
/* An error from begin_truncate_log_recovery is not
* serious enough to warrant halting the rest of
* recovery. */
- status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy);
- if (status < 0)
- mlog_errno(status);
+ rn->rn_status = ocfs2_begin_truncate_log_recovery(osb,
+ rn->rn_slot_num, &tl_copy);
+ if (rn->rn_status < 0)
+ mlog_errno(rn->rn_status);
/* Likewise, this would be a strange but ultimately not so
* harmful place to get an error... */
- status = ocfs2_clear_slot(osb, slot_num);
- if (status < 0)
- mlog_errno(status);
+ rn->rn_status = ocfs2_clear_slot(osb, rn->rn_slot_num);
+ if (rn->rn_status < 0)
+ mlog_errno(rn->rn_status);
/* This will kfree the memory pointed to by la_copy and tl_copy */
- ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
- tl_copy, NULL);
+ ocfs2_queue_recovery_completion(osb->journal, rn->rn_slot_num,
+ la_copy, tl_copy, NULL);
- status = 0;
+ rn->rn_status = 0;
done:
- mlog_exit(status);
- return status;
+ mlog_exit(rn->rn_status);
+ return rn->rn_status;
}
/* Test node liveness by trylocking his journal. If we get the lock,
diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
index 43e56b9..0325d81 100644
--- a/fs/ocfs2/journal.h
+++ b/fs/ocfs2/journal.h
@@ -43,9 +43,12 @@ struct ocfs2_dinode;
* It is protected by the recovery_lock.
*/
-struct ocfs2_recovery_map {
- unsigned int rm_used;
- unsigned int *rm_entries;
+struct ocfs2_recover_node {
+ struct ocfs2_super *rn_osb;
+ int rn_node_num;
+ int rn_slot_num;
+ int rn_status;
+ struct list_head rn_list;
};
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index d840821..318caac 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -337,7 +337,8 @@ struct ocfs2_super
atomic_t vol_state;
struct mutex recovery_lock;
- struct ocfs2_recovery_map *recovery_map;
+ struct list_head s_recovery_nodes;
+
struct ocfs2_replay_map *replay_map;
struct task_struct *recovery_thread_task;
int disable_recovery;
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index f02c0ef..478715b 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -220,7 +220,6 @@ static const match_table_t tokens = {
static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len)
{
struct ocfs2_cluster_connection *cconn = osb->cconn;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
struct ocfs2_orphan_scan *os = &osb->osb_orphan_scan;
int i, out = 0;
@@ -267,21 +266,6 @@ static int ocfs2_osb_dump(struct ocfs2_super
*osb, char *buf, int len)
osb->dc_work_sequence);
spin_unlock(&osb->dc_task_lock);
- spin_lock(&osb->osb_lock);
- out += snprintf(buf + out, len - out, "%10s => Pid: %d Nodes:",
- "Recovery",
- (osb->recovery_thread_task ?
- task_pid_nr(osb->recovery_thread_task) : -1));
- if (rm->rm_used == 0)
- out += snprintf(buf + out, len - out, " None\n");
- else {
- for (i = 0; i < rm->rm_used; i++)
- out += snprintf(buf + out, len - out, " %d",
- rm->rm_entries[i]);
- out += snprintf(buf + out, len - out, "\n");
- }
- spin_unlock(&osb->osb_lock);
-
out += snprintf(buf + out, len - out,
"%10s => Pid: %d Interval: %lu Needs: %d\n", "Commit",
(osb->commit_task ? task_pid_nr(osb->commit_task) : -1),
--
1.7.1
--
Goldwyn
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
2010-11-12 1:01 Goldwyn Rodrigues
@ 2010-11-12 2:16 ` Wengang Wang
2010-11-12 19:22 ` Goldwyn Rodrigues
0 siblings, 1 reply; 13+ messages in thread
From: Wengang Wang @ 2010-11-12 2:16 UTC (permalink / raw)
To: ocfs2-devel
On 10-11-11 19:01, Goldwyn Rodrigues wrote:
> ocfs2_recover_node is a new data structure to include all information required
> to recover one node. The structure is maintained as a list in the super block.
>
> ocfs2_recover_node replaces the recovery_map
>
> Signed-off-by: Goldwyn Rodrigues <rgoldwyn@suse.de>
> ---
> fs/ocfs2/journal.c | 170 +++++++++++++++++++++------------------------------
> fs/ocfs2/journal.h | 9 ++-
> fs/ocfs2/ocfs2.h | 3 +-
> fs/ocfs2/super.c | 16 -----
> 4 files changed, 78 insertions(+), 120 deletions(-)
>
> diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
> index faa2303..6c378d6 100644
> --- a/fs/ocfs2/journal.c
> +++ b/fs/ocfs2/journal.c
> @@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
> #define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
>
> static int ocfs2_force_read_journal(struct inode *inode);
> -static int ocfs2_recover_node(struct ocfs2_super *osb,
> - int node_num, int slot_num);
> +static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn);
> static int __ocfs2_recovery_thread(void *arg);
> static int ocfs2_commit_cache(struct ocfs2_super *osb);
> static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
> @@ -179,24 +178,11 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)
>
> int ocfs2_recovery_init(struct ocfs2_super *osb)
> {
> - struct ocfs2_recovery_map *rm;
> -
> mutex_init(&osb->recovery_lock);
> osb->disable_recovery = 0;
> osb->recovery_thread_task = NULL;
> init_waitqueue_head(&osb->recovery_event);
> -
> - rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
> - osb->max_slots * sizeof(unsigned int),
> - GFP_KERNEL);
> - if (!rm) {
> - mlog_errno(-ENOMEM);
> - return -ENOMEM;
> - }
> -
> - rm->rm_entries = (unsigned int *)((char *)rm +
> - sizeof(struct ocfs2_recovery_map));
> - osb->recovery_map = rm;
> + INIT_LIST_HEAD(&osb->s_recovery_nodes);
>
> return 0;
> }
> @@ -212,8 +198,6 @@ static int ocfs2_recovery_thread_running(struct
> ocfs2_super *osb)
>
> void ocfs2_recovery_exit(struct ocfs2_super *osb)
> {
> - struct ocfs2_recovery_map *rm;
> -
> /* disable any new recovery threads and wait for any currently
> * running ones to exit. Do this before setting the vol_state. */
> mutex_lock(&osb->recovery_lock);
> @@ -226,29 +210,20 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
> * complete. */
> flush_workqueue(ocfs2_wq);
>
> - /*
> - * Now that recovery is shut down, and the osb is about to be
> - * freed, the osb_lock is not taken here.
> - */
> - rm = osb->recovery_map;
> - /* XXX: Should we bug if there are dirty entries? */
> -
> - kfree(rm);
> }
Don't we need to free the ocfs2_recover_node entries still on the osb->s_recovery_nodes list here?
>
> static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
> unsigned int node_num)
For the function names, would you also rename all *map* to *node* accordingly?
> {
> - int i;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> + struct ocfs2_recover_node *rn;
> + struct list_head *iter;
>
> assert_spin_locked(&osb->osb_lock);
> -
> - for (i = 0; i < rm->rm_used; i++) {
> - if (rm->rm_entries[i] == node_num)
> + list_for_each(iter, &osb->s_recovery_nodes) {
> + rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
> + if (rn->rn_node_num == node_num)
> return 1;
> }
> -
> return 0;
> }
>
> @@ -256,45 +231,36 @@ static int __ocfs2_recovery_map_test(struct
> ocfs2_super *osb,
> static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
> unsigned int node_num)
> {
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> -
> - spin_lock(&osb->osb_lock);
> - if (__ocfs2_recovery_map_test(osb, node_num)) {
> - spin_unlock(&osb->osb_lock);
> - return 1;
> - }
> + struct list_head *iter;
> + struct ocfs2_recover_node *rn = NULL, *trn;
>
> - /* XXX: Can this be exploited? Not from o2dlm... */
> - BUG_ON(rm->rm_used >= osb->max_slots);
> + trn = kzalloc(sizeof(struct ocfs2_recover_node), GFP_KERNEL);
>
Wouldn't GFP_NOFS be better in this case?
Also, the kzalloc() result needs a NULL check before trn is dereferenced.
> - rm->rm_entries[rm->rm_used] = node_num;
> - rm->rm_used++;
> + spin_lock(&osb->osb_lock);
> + list_for_each(iter, &osb->s_recovery_nodes) {
> + rn = list_entry(iter, struct ocfs2_recover_node,
> + rn_list);
> + if (rn->rn_node_num == node_num) {
> + spin_unlock(&osb->osb_lock);
> + kfree(trn);
> + return 1;
> + }
> + }
> + trn->rn_node_num = node_num;
> + trn->rn_osb = osb;
> + list_add_tail(&trn->rn_list, &osb->s_recovery_nodes);
> spin_unlock(&osb->osb_lock);
>
> return 0;
> }
>
> static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
> - unsigned int node_num)
> + struct ocfs2_recover_node *rn)
> {
> - int i;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> -
> spin_lock(&osb->osb_lock);
> -
> - for (i = 0; i < rm->rm_used; i++) {
> - if (rm->rm_entries[i] == node_num)
> - break;
> - }
> -
> - if (i < rm->rm_used) {
> - /* XXX: be careful with the pointer math */
> - memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
> - (rm->rm_used - i - 1) * sizeof(unsigned int));
> - rm->rm_used--;
> - }
> -
> + list_del(&rn->rn_list);
> spin_unlock(&osb->osb_lock);
> + kfree(rn);
> }
>
> static int ocfs2_commit_cache(struct ocfs2_super *osb)
> @@ -1092,10 +1058,9 @@ bail:
> static int ocfs2_recovery_completed(struct ocfs2_super *osb)
> {
> int empty;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
>
> spin_lock(&osb->osb_lock);
> - empty = (rm->rm_used == 0);
> + empty = list_empty(&osb->s_recovery_nodes);
> spin_unlock(&osb->osb_lock);
>
> return empty;
> @@ -1332,12 +1297,13 @@ void ocfs2_complete_quota_recovery(struct
> ocfs2_super *osb)
>
> static int __ocfs2_recovery_thread(void *arg)
> {
> - int status, node_num, slot_num;
> + int status;
> struct ocfs2_super *osb = arg;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> int *rm_quota = NULL;
> int rm_quota_used = 0, i;
> struct ocfs2_quota_recovery *qrec;
> + struct list_head *iter;
> + struct ocfs2_recover_node *rn = NULL;
>
> mlog_entry_void();
>
> @@ -1367,20 +1333,21 @@ restart:
> NULL, NULL);
>
> spin_lock(&osb->osb_lock);
> - while (rm->rm_used) {
> + list_for_each(iter, &osb->s_recovery_nodes) {
> + rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
> /* It's always safe to remove entry zero, as we won't
> * clear it until ocfs2_recover_node() has succeeded. */
> - node_num = rm->rm_entries[0];
> spin_unlock(&osb->osb_lock);
> - mlog(0, "checking node %d\n", node_num);
> - slot_num = ocfs2_node_num_to_slot(osb, node_num);
> - if (slot_num == -ENOENT) {
> + mlog(0, "checking node %d\n", rn->rn_node_num);
> + rn->rn_slot_num = ocfs2_node_num_to_slot(osb, rn->rn_node_num);
> + if (rn->rn_slot_num == -ENOENT) {
> status = 0;
> mlog(0, "no slot for this node, so no recovery"
> "required.\n");
> goto skip_recovery;
> }
> - mlog(0, "node %d was using slot %d\n", node_num, slot_num);
> + mlog(0, "node %d was using slot %d\n",
> + rn->rn_node_num, rn->rn_slot_num);
>
> /* It is a bit subtle with quota recovery. We cannot do it
> * immediately because we have to obtain cluster locks from
> @@ -1388,18 +1355,19 @@ restart:
> * then quota usage would be out of sync until some node takes
> * the slot. So we remember which nodes need quota recovery
> * and when everything else is done, we recover quotas. */
> - for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
> + for (i = 0; i < rm_quota_used && rm_quota[i] != rn->rn_slot_num;
> + i++);
> if (i == rm_quota_used)
> - rm_quota[rm_quota_used++] = slot_num;
> + rm_quota[rm_quota_used++] = rn->rn_slot_num;
>
> - status = ocfs2_recover_node(osb, node_num, slot_num);
> + status = ocfs2_recover_one_node(rn);
> skip_recovery:
> if (!status) {
> - ocfs2_recovery_map_clear(osb, node_num);
> + ocfs2_recovery_map_clear(osb, rn);
kfree() is called with the spinlock held here.
> } else {
> mlog(ML_ERROR,
> "Error %d recovering node %d on device (%u,%u)!\n",
> - status, node_num,
> + status, rn->rn_node_num,
> MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
> mlog(ML_ERROR, "Volume requires unmount.\n");
> }
> @@ -1681,62 +1649,64 @@ done:
> * second part of a nodes recovery process (local alloc recovery) is
> * far less concerning.
> */
> -static int ocfs2_recover_node(struct ocfs2_super *osb,
> - int node_num, int slot_num)
> +static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn)
> {
> - int status = 0;
> struct ocfs2_dinode *la_copy = NULL;
> struct ocfs2_dinode *tl_copy = NULL;
> + struct ocfs2_super *osb = rn->rn_osb;
>
> mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
> - node_num, slot_num, osb->node_num);
> + rn->rn_node_num, rn->rn_slot_num, osb->node_num);
>
> /* Should not ever be called to recover ourselves -- in that
> * case we should've called ocfs2_journal_load instead. */
> - BUG_ON(osb->node_num == node_num);
> + BUG_ON(osb->node_num == rn->rn_node_num);
>
> - status = ocfs2_replay_journal(osb, node_num, slot_num);
> - if (status < 0) {
> - if (status == -EBUSY) {
> + rn->rn_status = ocfs2_replay_journal(osb, rn->rn_node_num,
> + rn->rn_slot_num);
> + if (rn->rn_status < 0) {
> + if (rn->rn_status == -EBUSY) {
> mlog(0, "Skipping recovery for slot %u (node %u) "
> - "as another node has recovered it\n", slot_num,
> - node_num);
> - status = 0;
> + "as another node has recovered it\n",
> + rn->rn_slot_num, rn->rn_node_num);
> + rn->rn_status = 0;
> goto done;
> }
> - mlog_errno(status);
> + mlog_errno(rn->rn_status);
> goto done;
> }
>
> /* Stamp a clean local alloc file AFTER recovering the journal... */
> - status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy);
> - if (status < 0) {
> - mlog_errno(status);
> + rn->rn_status = ocfs2_begin_local_alloc_recovery(osb, rn->rn_slot_num,
> + &la_copy);
> + if (rn->rn_status < 0) {
> + mlog_errno(rn->rn_status);
> goto done;
> }
>
> /* An error from begin_truncate_log_recovery is not
> * serious enough to warrant halting the rest of
> * recovery. */
> - status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy);
> - if (status < 0)
> - mlog_errno(status);
> + rn->rn_status = ocfs2_begin_truncate_log_recovery(osb,
> + rn->rn_slot_num, &tl_copy);
> + if (rn->rn_status < 0)
> + mlog_errno(rn->rn_status);
>
> /* Likewise, this would be a strange but ultimately not so
> * harmful place to get an error... */
> - status = ocfs2_clear_slot(osb, slot_num);
> - if (status < 0)
> - mlog_errno(status);
> + rn->rn_status = ocfs2_clear_slot(osb, rn->rn_slot_num);
> + if (rn->rn_status < 0)
> + mlog_errno(rn->rn_status);
>
> /* This will kfree the memory pointed to by la_copy and tl_copy */
> - ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
> - tl_copy, NULL);
> + ocfs2_queue_recovery_completion(osb->journal, rn->rn_slot_num,
> + la_copy, tl_copy, NULL);
>
> - status = 0;
> + rn->rn_status = 0;
> done:
>
> - mlog_exit(status);
> - return status;
> + mlog_exit(rn->rn_status);
> + return rn->rn_status;
> }
>
> /* Test node liveness by trylocking his journal. If we get the lock,
> diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
> index 43e56b9..0325d81 100644
> --- a/fs/ocfs2/journal.h
> +++ b/fs/ocfs2/journal.h
> @@ -43,9 +43,12 @@ struct ocfs2_dinode;
> * It is protected by the recovery_lock.
> */
>
> -struct ocfs2_recovery_map {
> - unsigned int rm_used;
> - unsigned int *rm_entries;
> +struct ocfs2_recover_node {
> + struct ocfs2_super *rn_osb;
> + int rn_node_num;
> + int rn_slot_num;
> + int rn_status;
> + struct list_head rn_list;
> };
>
>
> diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
> index d840821..318caac 100644
> --- a/fs/ocfs2/ocfs2.h
> +++ b/fs/ocfs2/ocfs2.h
> @@ -337,7 +337,8 @@ struct ocfs2_super
>
> atomic_t vol_state;
> struct mutex recovery_lock;
> - struct ocfs2_recovery_map *recovery_map;
> + struct list_head s_recovery_nodes;
> +
> struct ocfs2_replay_map *replay_map;
> struct task_struct *recovery_thread_task;
> int disable_recovery;
> diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
> index f02c0ef..478715b 100644
> --- a/fs/ocfs2/super.c
> +++ b/fs/ocfs2/super.c
> @@ -220,7 +220,6 @@ static const match_table_t tokens = {
> static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len)
> {
> struct ocfs2_cluster_connection *cconn = osb->cconn;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> struct ocfs2_orphan_scan *os = &osb->osb_orphan_scan;
> int i, out = 0;
>
> @@ -267,21 +266,6 @@ static int ocfs2_osb_dump(struct ocfs2_super
> *osb, char *buf, int len)
> osb->dc_work_sequence);
> spin_unlock(&osb->dc_task_lock);
>
> - spin_lock(&osb->osb_lock);
> - out += snprintf(buf + out, len - out, "%10s => Pid: %d Nodes:",
> - "Recovery",
> - (osb->recovery_thread_task ?
> - task_pid_nr(osb->recovery_thread_task) : -1));
> - if (rm->rm_used == 0)
> - out += snprintf(buf + out, len - out, " None\n");
> - else {
> - for (i = 0; i < rm->rm_used; i++)
> - out += snprintf(buf + out, len - out, " %d",
> - rm->rm_entries[i]);
> - out += snprintf(buf + out, len - out, "\n");
> - }
> - spin_unlock(&osb->osb_lock);
> -
> out += snprintf(buf + out, len - out,
> "%10s => Pid: %d Interval: %lu Needs: %d\n", "Commit",
> (osb->commit_task ? task_pid_nr(osb->commit_task) : -1),
> --
> 1.7.1
>
>
> --
> Goldwyn
>
> _______________________________________________
> Ocfs2-devel mailing list
> Ocfs2-devel at oss.oracle.com
> http://oss.oracle.com/mailman/listinfo/ocfs2-devel
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
2010-11-12 2:16 ` Wengang Wang
@ 2010-11-12 19:22 ` Goldwyn Rodrigues
2010-11-13 5:03 ` Wengang Wang
0 siblings, 1 reply; 13+ messages in thread
From: Goldwyn Rodrigues @ 2010-11-12 19:22 UTC (permalink / raw)
To: ocfs2-devel
Hi Wengang,
Thanks for the review.
On Thu, Nov 11, 2010 at 8:16 PM, Wengang Wang <wen.gang.wang@oracle.com> wrote:
> On 10-11-11 19:01, Goldwyn Rodrigues wrote:
>> ocfs2_recover_node is a new data structure to include all information required
>> to recover one node. The structure is maintained as a list in the super block.
>>
>> ocfs2_recover_node replaces the recovery_map
>>
>> Signed-off-by: Goldwyn Rodrigues <rgoldwyn@suse.de>
>> ---
>> ?fs/ocfs2/journal.c | ?170 +++++++++++++++++++++------------------------------
>> ?fs/ocfs2/journal.h | ? ?9 ++-
>> ?fs/ocfs2/ocfs2.h ? | ? ?3 +-
>> ?fs/ocfs2/super.c ? | ? 16 -----
>> ?4 files changed, 78 insertions(+), 120 deletions(-)
>>
>> diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
>> index faa2303..6c378d6 100644
>> --- a/fs/ocfs2/journal.c
>> +++ b/fs/ocfs2/journal.c
>> @@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
>> ?#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
>>
>> ?static int ocfs2_force_read_journal(struct inode *inode);
>> -static int ocfs2_recover_node(struct ocfs2_super *osb,
>> - ? ? ? ? ? ? ? ? ? ? ? ? ? int node_num, int slot_num);
>> +static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn);
>> ?static int __ocfs2_recovery_thread(void *arg);
>> ?static int ocfs2_commit_cache(struct ocfs2_super *osb);
>> ?static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
>> @@ -179,24 +178,11 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)
>>
>> ?int ocfs2_recovery_init(struct ocfs2_super *osb)
>> ?{
>> - ? ? struct ocfs2_recovery_map *rm;
>> -
>> ? ? ? mutex_init(&osb->recovery_lock);
>> ? ? ? osb->disable_recovery = 0;
>> ? ? ? osb->recovery_thread_task = NULL;
>> ? ? ? init_waitqueue_head(&osb->recovery_event);
>> -
>> - ? ? rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
>> - ? ? ? ? ? ? ? ? ?osb->max_slots * sizeof(unsigned int),
>> - ? ? ? ? ? ? ? ? ?GFP_KERNEL);
>> - ? ? if (!rm) {
>> - ? ? ? ? ? ? mlog_errno(-ENOMEM);
>> - ? ? ? ? ? ? return -ENOMEM;
>> - ? ? }
>> -
>> - ? ? rm->rm_entries = (unsigned int *)((char *)rm +
>> - ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? sizeof(struct ocfs2_recovery_map));
>> - ? ? osb->recovery_map = rm;
>> + ? ? INIT_LIST_HEAD(&osb->s_recovery_nodes);
>>
>> ? ? ? return 0;
>> ?}
>> @@ -212,8 +198,6 @@ static int ocfs2_recovery_thread_running(struct
>> ocfs2_super *osb)
>>
>> ?void ocfs2_recovery_exit(struct ocfs2_super *osb)
>> ?{
>> - ? ? struct ocfs2_recovery_map *rm;
>> -
>> ? ? ? /* disable any new recovery threads and wait for any currently
>> ? ? ? ?* running ones to exit. Do this before setting the vol_state. */
>> ? ? ? mutex_lock(&osb->recovery_lock);
>> @@ -226,29 +210,20 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
>> ? ? ? ?* complete. */
>> ? ? ? flush_workqueue(ocfs2_wq);
>>
>> - ? ? /*
>> - ? ? ?* Now that recovery is shut down, and the osb is about to be
>> - ? ? ?* freed, ?the osb_lock is not taken here.
>> - ? ? ?*/
>> - ? ? rm = osb->recovery_map;
>> - ? ? /* XXX: Should we bug if there are dirty entries? */
>> -
>> - ? ? kfree(rm);
>> ?}
>
> needs to free ocfs2_recover_nodes in list osb->s_recovery_nodes?
>
Hmm. Good catch.
>>
>> ?static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
>> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?unsigned int node_num)
>
> for function names, would you also change all *map* to *node* accordingly?
>
>> ?{
>> - ? ? int i;
>> - ? ? struct ocfs2_recovery_map *rm = osb->recovery_map;
>> + ? ? struct ocfs2_recover_node *rn;
>> + ? ? struct list_head *iter;
>>
>> ? ? ? assert_spin_locked(&osb->osb_lock);
>> -
>> - ? ? for (i = 0; i < rm->rm_used; i++) {
>> - ? ? ? ? ? ? if (rm->rm_entries[i] == node_num)
>> + ? ? list_for_each(iter, &osb->s_recovery_nodes) {
>> + ? ? ? ? ? ? rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
>> + ? ? ? ? ? ? if (rn->rn_node_num == node_num)
>> ? ? ? ? ? ? ? ? ? ? ? return 1;
>> ? ? ? }
>> -
>> ? ? ? return 0;
>> ?}
>>
>> @@ -256,45 +231,36 @@ static int __ocfs2_recovery_map_test(struct
>> ocfs2_super *osb,
>> ?static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
>> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? unsigned int node_num)
>> ?{
>> - ? ? struct ocfs2_recovery_map *rm = osb->recovery_map;
>> -
>> - ? ? spin_lock(&osb->osb_lock);
>> - ? ? if (__ocfs2_recovery_map_test(osb, node_num)) {
>> - ? ? ? ? ? ? spin_unlock(&osb->osb_lock);
>> - ? ? ? ? ? ? return 1;
>> - ? ? }
>> + ? ? struct list_head *iter;
>> + ? ? struct ocfs2_recover_node *rn = NULL, *trn;
>>
>> - ? ? /* XXX: Can this be exploited? Not from o2dlm... */
>> - ? ? BUG_ON(rm->rm_used >= osb->max_slots);
>> + ? ? trn = kzalloc(sizeof(struct ocfs2_recover_node), GFP_KERNEL);
>>
>
> GFP_NOFS is better in this case?
> NULL check?
>
Yes, I will change that.
>> - ? ? rm->rm_entries[rm->rm_used] = node_num;
>> - ? ? rm->rm_used++;
>> + ? ? spin_lock(&osb->osb_lock);
>> + ? ? list_for_each(iter, &osb->s_recovery_nodes) {
>> + ? ? ? ? ? ? ? ? ? ? rn = list_entry(iter, struct ocfs2_recover_node,
>> + ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? rn_list);
>> + ? ? ? ? ? ? ? ? ? ? if (rn->rn_node_num == node_num) {
>> + ? ? ? ? ? ? ? ? ? ? ? ? ? ? spin_unlock(&osb->osb_lock);
>> + ? ? ? ? ? ? ? ? ? ? ? ? ? ? kfree(trn);
>> + ? ? ? ? ? ? ? ? ? ? ? ? ? ? return 1;
>> + ? ? ? ? ? ? ? ? ? ? }
>> + ? ? }
>> + ? ? trn->rn_node_num = node_num;
>> + ? ? trn->rn_osb = osb;
>> + ? ? list_add_tail(&trn->rn_list, &osb->s_recovery_nodes);
>> ? ? ? spin_unlock(&osb->osb_lock);
>>
>> ? ? ? return 0;
>> ?}
>>
>>  static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
>> -				     unsigned int node_num)
>> +			struct ocfs2_recover_node *rn)
>>  {
>> -	int i;
>> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
>> -
>>  	spin_lock(&osb->osb_lock);
>> -
>> -	for (i = 0; i < rm->rm_used; i++) {
>> -		if (rm->rm_entries[i] == node_num)
>> -			break;
>> -	}
>> -
>> -	if (i < rm->rm_used) {
>> -		/* XXX: be careful with the pointer math */
>> -		memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
>> -			(rm->rm_used - i - 1) * sizeof(unsigned int));
>> -		rm->rm_used--;
>> -	}
>> -
>> +	list_del(&rn->rn_list);
>>  	spin_unlock(&osb->osb_lock);
>> +	kfree(rn);
>>  }
>>
>> ?static int ocfs2_commit_cache(struct ocfs2_super *osb)
>> @@ -1092,10 +1058,9 @@ bail:
>>  static int ocfs2_recovery_completed(struct ocfs2_super *osb)
>>  {
>>  	int empty;
>> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
>>
>>  	spin_lock(&osb->osb_lock);
>> -	empty = (rm->rm_used == 0);
>> +	empty = list_empty(&osb->s_recovery_nodes);
>>  	spin_unlock(&osb->osb_lock);
>>
>>  	return empty;
>> @@ -1332,12 +1297,13 @@ void ocfs2_complete_quota_recovery(struct
>> ocfs2_super *osb)
>>
>>  static int __ocfs2_recovery_thread(void *arg)
>>  {
>> -	int status, node_num, slot_num;
>> +	int status;
>>  	struct ocfs2_super *osb = arg;
>> -	struct ocfs2_recovery_map *rm = osb->recovery_map;
>>  	int *rm_quota = NULL;
>>  	int rm_quota_used = 0, i;
>>  	struct ocfs2_quota_recovery *qrec;
>> +	struct list_head *iter;
>> +	struct ocfs2_recover_node *rn = NULL;
>>
>>  	mlog_entry_void();
>>
>> @@ -1367,20 +1333,21 @@ restart:
>> ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? NULL, NULL);
>>
>>  	spin_lock(&osb->osb_lock);
>> -	while (rm->rm_used) {
>> +	list_for_each(iter, &osb->s_recovery_nodes) {
>> +		rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
>>  		/* It's always safe to remove entry zero, as we won't
>>  		 * clear it until ocfs2_recover_node() has succeeded. */
>> -		node_num = rm->rm_entries[0];
>>  		spin_unlock(&osb->osb_lock);
>> -		mlog(0, "checking node %d\n", node_num);
>> -		slot_num = ocfs2_node_num_to_slot(osb, node_num);
>> -		if (slot_num == -ENOENT) {
>> +		mlog(0, "checking node %d\n", rn->rn_node_num);
>> +		rn->rn_slot_num = ocfs2_node_num_to_slot(osb, rn->rn_node_num);
>> +		if (rn->rn_slot_num == -ENOENT) {
>>  			status = 0;
>>  			mlog(0, "no slot for this node, so no recovery"
>>  			     "required.\n");
>>  			goto skip_recovery;
>>  		}
>> -		mlog(0, "node %d was using slot %d\n", node_num, slot_num);
>> +		mlog(0, "node %d was using slot %d\n",
>> +				rn->rn_node_num, rn->rn_slot_num);
>>
>>  		/* It is a bit subtle with quota recovery. We cannot do it
>>  		 * immediately because we have to obtain cluster locks from
>> @@ -1388,18 +1355,19 @@ restart:
>>  		 * then quota usage would be out of sync until some node takes
>>  		 * the slot. So we remember which nodes need quota recovery
>>  		 * and when everything else is done, we recover quotas. */
>> -		for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
>> +		for (i = 0; i < rm_quota_used && rm_quota[i] != rn->rn_slot_num;
>> +					i++);
>>  		if (i == rm_quota_used)
>> -			rm_quota[rm_quota_used++] = slot_num;
>> +			rm_quota[rm_quota_used++] = rn->rn_slot_num;
>>
>> -		status = ocfs2_recover_node(osb, node_num, slot_num);
>> +		status = ocfs2_recover_one_node(rn);
>>  skip_recovery:
>>  		if (!status) {
>> -			ocfs2_recovery_map_clear(osb, node_num);
>> +			ocfs2_recovery_map_clear(osb, rn);
>
> kfree with spinlocked here.
>
What is the problem with that?
In any case, this part of the code moves to thread.
Regards,
--
Goldwyn
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
2010-11-12 19:22 ` Goldwyn Rodrigues
@ 2010-11-13 5:03 ` Wengang Wang
0 siblings, 0 replies; 13+ messages in thread
From: Wengang Wang @ 2010-11-13 5:03 UTC (permalink / raw)
To: ocfs2-devel
Hi,
On 10-11-12 13:22, Goldwyn Rodrigues wrote:
> Hi Wengang,
>
> Thanks for the review.
>
> On Thu, Nov 11, 2010 at 8:16 PM, Wengang Wang <wen.gang.wang@oracle.com> wrote:
> > On 10-11-11 19:01, Goldwyn Rodrigues wrote:
> >> ocfs2_recover_node is a new data structure to include all information required
> >> to recover one node. The structure is maintained as a list in the super block.
> >>
> >> ocfs2_recover_node replaces the recovery_map
> >>
> >> Signed-off-by: Goldwyn Rodrigues <rgoldwyn@suse.de>
> >> -			rm_quota[rm_quota_used++] = slot_num;
> >> +			rm_quota[rm_quota_used++] = rn->rn_slot_num;
> >>
> >> -		status = ocfs2_recover_node(osb, node_num, slot_num);
> >> +		status = ocfs2_recover_one_node(rn);
> >>  skip_recovery:
> >>  		if (!status) {
> >> -			ocfs2_recovery_map_clear(osb, node_num);
> >> +			ocfs2_recovery_map_clear(osb, rn);
> >
> > kfree with spinlocked here.
> >
>
> What is the problem with that?
Hm... it seems kfree() does not block in the current code.
please ignore that comment!
regards,
wengang.
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
@ 2010-11-16 22:59 Goldwyn Rodrigues
2010-11-17 1:49 ` Wengang Wang
0 siblings, 1 reply; 13+ messages in thread
From: Goldwyn Rodrigues @ 2010-11-16 22:59 UTC (permalink / raw)
To: ocfs2-devel
Changes: Wengang's review comments incorporated.
ocfs2_recover_node is a new data structure to include all information required
to recover one node. The structure is maintained as a list in the super block.
recovery_map is no longer required with ocfs2_recover_node.
Signed-off-by: Goldwyn Rodrigues <rgoldwyn@suse.de>
---
fs/ocfs2/journal.c | 166 ++++++++++++++++++++++++----------------------------
fs/ocfs2/journal.h | 9 ++-
fs/ocfs2/ocfs2.h | 3 +-
fs/ocfs2/super.c | 16 -----
4 files changed, 85 insertions(+), 109 deletions(-)
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index faa2303..71987d4 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
static int ocfs2_force_read_journal(struct inode *inode);
-static int ocfs2_recover_node(struct ocfs2_super *osb,
- int node_num, int slot_num);
+static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn);
static int __ocfs2_recovery_thread(void *arg);
static int ocfs2_commit_cache(struct ocfs2_super *osb);
static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
@@ -179,24 +178,11 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)
int ocfs2_recovery_init(struct ocfs2_super *osb)
{
- struct ocfs2_recovery_map *rm;
-
mutex_init(&osb->recovery_lock);
osb->disable_recovery = 0;
osb->recovery_thread_task = NULL;
init_waitqueue_head(&osb->recovery_event);
-
- rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
- osb->max_slots * sizeof(unsigned int),
- GFP_KERNEL);
- if (!rm) {
- mlog_errno(-ENOMEM);
- return -ENOMEM;
- }
-
- rm->rm_entries = (unsigned int *)((char *)rm +
- sizeof(struct ocfs2_recovery_map));
- osb->recovery_map = rm;
+ INIT_LIST_HEAD(&osb->s_recovery_nodes);
return 0;
}
@@ -212,8 +198,8 @@ static int ocfs2_recovery_thread_running(struct
ocfs2_super *osb)
void ocfs2_recovery_exit(struct ocfs2_super *osb)
{
- struct ocfs2_recovery_map *rm;
-
+ struct list_head *iter;
+ struct ocfs2_recover_node *rn;
/* disable any new recovery threads and wait for any currently
* running ones to exit. Do this before setting the vol_state. */
mutex_lock(&osb->recovery_lock);
@@ -226,75 +212,66 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
* complete. */
flush_workqueue(ocfs2_wq);
- /*
- * Now that recovery is shut down, and the osb is about to be
- * freed, the osb_lock is not taken here.
- */
- rm = osb->recovery_map;
- /* XXX: Should we bug if there are dirty entries? */
-
- kfree(rm);
+ spin_lock(&osb->osb_lock);
+ list_for_each(iter, &osb->s_recovery_nodes) {
+ rn = list_entry(iter, struct ocfs2_recover_node,
+ rn_list);
+ list_del(&rn->rn_list);
+ }
+ spin_unlock(&osb->osb_lock);
}
static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
unsigned int node_num)
{
- int i;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
+ struct ocfs2_recover_node *rn;
+ struct list_head *iter;
assert_spin_locked(&osb->osb_lock);
-
- for (i = 0; i < rm->rm_used; i++) {
- if (rm->rm_entries[i] == node_num)
+ list_for_each(iter, &osb->s_recovery_nodes) {
+ rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
+ if (rn->rn_node_num == node_num)
return 1;
}
-
return 0;
}
/* Behaves like test-and-set. Returns the previous value */
-static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
+static int ocfs2_recovery_node_set(struct ocfs2_super *osb,
unsigned int node_num)
{
- struct ocfs2_recovery_map *rm = osb->recovery_map;
-
- spin_lock(&osb->osb_lock);
- if (__ocfs2_recovery_map_test(osb, node_num)) {
- spin_unlock(&osb->osb_lock);
- return 1;
- }
+ struct list_head *iter;
+ struct ocfs2_recover_node *rn = NULL, *trn;
- /* XXX: Can this be exploited? Not from o2dlm... */
- BUG_ON(rm->rm_used >= osb->max_slots);
+ trn = kzalloc(sizeof(struct ocfs2_recover_node), GFP_NOFS);
+ if (!trn)
+ return -ENOMEM;
- rm->rm_entries[rm->rm_used] = node_num;
- rm->rm_used++;
+ spin_lock(&osb->osb_lock);
+ list_for_each(iter, &osb->s_recovery_nodes) {
+ rn = list_entry(iter, struct ocfs2_recover_node,
+ rn_list);
+ if (rn->rn_node_num == node_num) {
+ spin_unlock(&osb->osb_lock);
+ kfree(trn);
+ return 1;
+ }
+ }
+ trn->rn_node_num = node_num;
+ trn->rn_osb = osb;
+ list_add_tail(&trn->rn_list, &osb->s_recovery_nodes);
spin_unlock(&osb->osb_lock);
return 0;
}
-static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
- unsigned int node_num)
+static void ocfs2_recovery_node_clear(struct ocfs2_super *osb,
+ struct ocfs2_recover_node *rn)
{
- int i;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
-
spin_lock(&osb->osb_lock);
-
- for (i = 0; i < rm->rm_used; i++) {
- if (rm->rm_entries[i] == node_num)
- break;
- }
-
- if (i < rm->rm_used) {
- /* XXX: be careful with the pointer math */
- memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
- (rm->rm_used - i - 1) * sizeof(unsigned int));
- rm->rm_used--;
- }
-
+ list_del(&rn->rn_list);
spin_unlock(&osb->osb_lock);
+ kfree(rn);
}
static int ocfs2_commit_cache(struct ocfs2_super *osb)
@@ -1092,10 +1069,9 @@ bail:
static int ocfs2_recovery_completed(struct ocfs2_super *osb)
{
int empty;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
spin_lock(&osb->osb_lock);
- empty = (rm->rm_used == 0);
+ empty = list_empty(&osb->s_recovery_nodes);
spin_unlock(&osb->osb_lock);
return empty;
@@ -1332,12 +1308,13 @@ void ocfs2_complete_quota_recovery(struct
ocfs2_super *osb)
static int __ocfs2_recovery_thread(void *arg)
{
- int status, node_num, slot_num;
+ int status;
struct ocfs2_super *osb = arg;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
int *rm_quota = NULL;
int rm_quota_used = 0, i;
struct ocfs2_quota_recovery *qrec;
+ struct list_head *iter;
+ struct ocfs2_recover_node *rn = NULL;
mlog_entry_void();
@@ -1367,20 +1344,21 @@ restart:
NULL, NULL);
spin_lock(&osb->osb_lock);
- while (rm->rm_used) {
+ list_for_each(iter, &osb->s_recovery_nodes) {
+ rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
/* It's always safe to remove entry zero, as we won't
* clear it until ocfs2_recover_node() has succeeded. */
- node_num = rm->rm_entries[0];
spin_unlock(&osb->osb_lock);
- mlog(0, "checking node %d\n", node_num);
- slot_num = ocfs2_node_num_to_slot(osb, node_num);
- if (slot_num == -ENOENT) {
+ mlog(0, "checking node %d\n", rn->rn_node_num);
+ rn->rn_slot_num = ocfs2_node_num_to_slot(osb, rn->rn_node_num);
+ if (rn->rn_slot_num == -ENOENT) {
status = 0;
mlog(0, "no slot for this node, so no recovery"
"required.\n");
goto skip_recovery;
}
- mlog(0, "node %d was using slot %d\n", node_num, slot_num);
+ mlog(0, "node %d was using slot %d\n",
+ rn->rn_node_num, rn->rn_slot_num);
/* It is a bit subtle with quota recovery. We cannot do it
* immediately because we have to obtain cluster locks from
@@ -1388,18 +1366,19 @@ restart:
* then quota usage would be out of sync until some node takes
* the slot. So we remember which nodes need quota recovery
* and when everything else is done, we recover quotas. */
- for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
+ for (i = 0; i < rm_quota_used && rm_quota[i] != rn->rn_slot_num;
+ i++);
if (i == rm_quota_used)
- rm_quota[rm_quota_used++] = slot_num;
+ rm_quota[rm_quota_used++] = rn->rn_slot_num;
- status = ocfs2_recover_node(osb, node_num, slot_num);
+ status = ocfs2_recover_one_node(rn);
skip_recovery:
if (!status) {
- ocfs2_recovery_map_clear(osb, node_num);
+ ocfs2_recovery_node_clear(osb, rn);
} else {
mlog(ML_ERROR,
"Error %d recovering node %d on device (%u,%u)!\n",
- status, node_num,
+ status, rn->rn_node_num,
MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
mlog(ML_ERROR, "Volume requires unmount.\n");
}
@@ -1461,6 +1440,7 @@ bail:
void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
{
+ int ret = 0;
mlog_entry("(node_num=%d, osb->node_num = %d)\n",
node_num, osb->node_num);
@@ -1470,7 +1450,11 @@ void ocfs2_recovery_thread(struct ocfs2_super
*osb, int node_num)
/* People waiting on recovery will wait on
* the recovery map to empty. */
- if (ocfs2_recovery_map_set(osb, node_num))
+ ret = ocfs2_recovery_node_set(osb, node_num);
+ if (ret == -ENOMEM) {
+ mlog_errno(ret);
+ goto out;
+ } else if (ret)
mlog(0, "node %d already in recovery map.\n", node_num);
mlog(0, "starting recovery thread...\n");
@@ -1681,26 +1665,27 @@ done:
* second part of a nodes recovery process (local alloc recovery) is
* far less concerning.
*/
-static int ocfs2_recover_node(struct ocfs2_super *osb,
- int node_num, int slot_num)
+static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn)
{
int status = 0;
struct ocfs2_dinode *la_copy = NULL;
struct ocfs2_dinode *tl_copy = NULL;
+ struct ocfs2_super *osb = rn->rn_osb;
mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
- node_num, slot_num, osb->node_num);
+ rn->rn_node_num, rn->rn_slot_num, osb->node_num);
/* Should not ever be called to recover ourselves -- in that
* case we should've called ocfs2_journal_load instead. */
- BUG_ON(osb->node_num == node_num);
+ BUG_ON(osb->node_num == rn->rn_node_num);
- status = ocfs2_replay_journal(osb, node_num, slot_num);
+ status = ocfs2_replay_journal(osb, rn->rn_node_num,
+ rn->rn_slot_num);
if (status < 0) {
if (status == -EBUSY) {
mlog(0, "Skipping recovery for slot %u (node %u) "
- "as another node has recovered it\n", slot_num,
- node_num);
+ "as another node has recovered it\n",
+ rn->rn_slot_num, rn->rn_node_num);
status = 0;
goto done;
}
@@ -1709,7 +1694,8 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
}
/* Stamp a clean local alloc file AFTER recovering the journal... */
- status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy);
+ status = ocfs2_begin_local_alloc_recovery(osb, rn->rn_slot_num,
+ &la_copy);
if (status < 0) {
mlog_errno(status);
goto done;
@@ -1718,22 +1704,24 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
/* An error from begin_truncate_log_recovery is not
* serious enough to warrant halting the rest of
* recovery. */
- status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy);
+ status = ocfs2_begin_truncate_log_recovery(osb,
+ rn->rn_slot_num, &tl_copy);
if (status < 0)
mlog_errno(status);
/* Likewise, this would be a strange but ultimately not so
* harmful place to get an error... */
- status = ocfs2_clear_slot(osb, slot_num);
+ status = ocfs2_clear_slot(osb, rn->rn_slot_num);
if (status < 0)
mlog_errno(status);
/* This will kfree the memory pointed to by la_copy and tl_copy */
- ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
- tl_copy, NULL);
+ ocfs2_queue_recovery_completion(osb->journal, rn->rn_slot_num,
+ la_copy, tl_copy, NULL);
status = 0;
done:
+ rn->rn_status = status;
mlog_exit(status);
return status;
diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
index 43e56b9..0325d81 100644
--- a/fs/ocfs2/journal.h
+++ b/fs/ocfs2/journal.h
@@ -43,9 +43,12 @@ struct ocfs2_dinode;
* It is protected by the recovery_lock.
*/
-struct ocfs2_recovery_map {
- unsigned int rm_used;
- unsigned int *rm_entries;
+struct ocfs2_recover_node {
+ struct ocfs2_super *rn_osb;
+ int rn_node_num;
+ int rn_slot_num;
+ int rn_status;
+ struct list_head rn_list;
};
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index d840821..318caac 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -337,7 +337,8 @@ struct ocfs2_super
atomic_t vol_state;
struct mutex recovery_lock;
- struct ocfs2_recovery_map *recovery_map;
+ struct list_head s_recovery_nodes;
+
struct ocfs2_replay_map *replay_map;
struct task_struct *recovery_thread_task;
int disable_recovery;
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index f02c0ef..478715b 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -220,7 +220,6 @@ static const match_table_t tokens = {
static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len)
{
struct ocfs2_cluster_connection *cconn = osb->cconn;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
struct ocfs2_orphan_scan *os = &osb->osb_orphan_scan;
int i, out = 0;
@@ -267,21 +266,6 @@ static int ocfs2_osb_dump(struct ocfs2_super
*osb, char *buf, int len)
osb->dc_work_sequence);
spin_unlock(&osb->dc_task_lock);
- spin_lock(&osb->osb_lock);
- out += snprintf(buf + out, len - out, "%10s => Pid: %d Nodes:",
- "Recovery",
- (osb->recovery_thread_task ?
- task_pid_nr(osb->recovery_thread_task) : -1));
- if (rm->rm_used == 0)
- out += snprintf(buf + out, len - out, " None\n");
- else {
- for (i = 0; i < rm->rm_used; i++)
- out += snprintf(buf + out, len - out, " %d",
- rm->rm_entries[i]);
- out += snprintf(buf + out, len - out, "\n");
- }
- spin_unlock(&osb->osb_lock);
-
out += snprintf(buf + out, len - out,
"%10s => Pid: %d Interval: %lu Needs: %d\n", "Commit",
(osb->commit_task ? task_pid_nr(osb->commit_task) : -1),
--
1.7.1
--
Goldwyn
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
2010-11-16 22:59 Goldwyn Rodrigues
@ 2010-11-17 1:49 ` Wengang Wang
2010-11-17 2:00 ` Wengang Wang
0 siblings, 1 reply; 13+ messages in thread
From: Wengang Wang @ 2010-11-17 1:49 UTC (permalink / raw)
To: ocfs2-devel
On 10-11-16 16:59, Goldwyn Rodrigues wrote:
<snip>
> @@ -212,8 +198,8 @@ static int ocfs2_recovery_thread_running(struct
> ocfs2_super *osb)
>
> void ocfs2_recovery_exit(struct ocfs2_super *osb)
> {
> - struct ocfs2_recovery_map *rm;
> -
> + struct list_head *iter;
> + struct ocfs2_recover_node *rn;
> /* disable any new recovery threads and wait for any currently
> * running ones to exit. Do this before setting the vol_state. */
> mutex_lock(&osb->recovery_lock);
> @@ -226,75 +212,66 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
> * complete. */
> flush_workqueue(ocfs2_wq);
>
> - /*
> - * Now that recovery is shut down, and the osb is about to be
> - * freed, the osb_lock is not taken here.
> - */
> - rm = osb->recovery_map;
> - /* XXX: Should we bug if there are dirty entries? */
> -
> - kfree(rm);
> + spin_lock(&osb->osb_lock);
> + list_for_each(iter, &osb->s_recovery_nodes) {
list_for_each_safe()?
regards,
wengang.
> + rn = list_entry(iter, struct ocfs2_recover_node,
> + rn_list);
> + list_del(&rn->rn_list);
> + }
> + spin_unlock(&osb->osb_lock);
> }
>
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
2010-11-17 1:49 ` Wengang Wang
@ 2010-11-17 2:00 ` Wengang Wang
2010-11-17 2:04 ` Wengang Wang
0 siblings, 1 reply; 13+ messages in thread
From: Wengang Wang @ 2010-11-17 2:00 UTC (permalink / raw)
To: ocfs2-devel
On 10-11-17 09:49, Wengang Wang wrote:
> On 10-11-16 16:59, Goldwyn Rodrigues wrote:
>
> <snip>
> > @@ -212,8 +198,8 @@ static int ocfs2_recovery_thread_running(struct
> > ocfs2_super *osb)
> >
> > void ocfs2_recovery_exit(struct ocfs2_super *osb)
> > {
> > - struct ocfs2_recovery_map *rm;
> > -
> > + struct list_head *iter;
> > + struct ocfs2_recover_node *rn;
> > /* disable any new recovery threads and wait for any currently
> > * running ones to exit. Do this before setting the vol_state. */
> > mutex_lock(&osb->recovery_lock);
> > @@ -226,75 +212,66 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
> > * complete. */
> > flush_workqueue(ocfs2_wq);
> >
> > - /*
> > - * Now that recovery is shut down, and the osb is about to be
> > - * freed, the osb_lock is not taken here.
> > - */
> > - rm = osb->recovery_map;
> > - /* XXX: Should we bug if there are dirty entries? */
> > -
> > - kfree(rm);
> > + spin_lock(&osb->osb_lock);
> > + list_for_each(iter, &osb->s_recovery_nodes) {
>
> list_for_each_safe()?
>
> regards,
> wengang.
> > + rn = list_entry(iter, struct ocfs2_recover_node,
> > + rn_list);
> > + list_del(&rn->rn_list);
If you need to do this here, I guess you may also want to free rm.
regards,
wengang.
> > + }
> > + spin_unlock(&osb->osb_lock);
> > }
> >
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
2010-11-17 2:00 ` Wengang Wang
@ 2010-11-17 2:04 ` Wengang Wang
0 siblings, 0 replies; 13+ messages in thread
From: Wengang Wang @ 2010-11-17 2:04 UTC (permalink / raw)
To: ocfs2-devel
On 10-11-17 10:00, Wengang Wang wrote:
> On 10-11-17 09:49, Wengang Wang wrote:
> > On 10-11-16 16:59, Goldwyn Rodrigues wrote:
> >
> > <snip>
> > > @@ -212,8 +198,8 @@ static int ocfs2_recovery_thread_running(struct
> > > ocfs2_super *osb)
> > >
> > > void ocfs2_recovery_exit(struct ocfs2_super *osb)
> > > {
> > > - struct ocfs2_recovery_map *rm;
> > > -
> > > + struct list_head *iter;
> > > + struct ocfs2_recover_node *rn;
> > > /* disable any new recovery threads and wait for any currently
> > > * running ones to exit. Do this before setting the vol_state. */
> > > mutex_lock(&osb->recovery_lock);
> > > @@ -226,75 +212,66 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
> > > * complete. */
> > > flush_workqueue(ocfs2_wq);
> > >
> > > - /*
> > > - * Now that recovery is shut down, and the osb is about to be
> > > - * freed, the osb_lock is not taken here.
> > > - */
> > > - rm = osb->recovery_map;
> > > - /* XXX: Should we bug if there are dirty entries? */
> > > -
> > > - kfree(rm);
> > > + spin_lock(&osb->osb_lock);
> > > + list_for_each(iter, &osb->s_recovery_nodes) {
> >
> > list_for_each_safe()?
> >
> > regards,
> > wengang.
> > > + rn = list_entry(iter, struct ocfs2_recover_node,
> > > + rn_list);
> > > + list_del(&rn->rn_list);
>
> If you need to do this here, I guess you may also want to free rm.
Sorry, typo. s/rm/rn
>
> regards,
> wengang.
> > > + }
> > > + spin_unlock(&osb->osb_lock);
> > > }
> > >
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
@ 2010-11-17 15:50 Goldwyn Rodrigues
2010-12-02 4:49 ` Sunil Mushran
2010-12-02 5:29 ` Joel Becker
0 siblings, 2 replies; 13+ messages in thread
From: Goldwyn Rodrigues @ 2010-11-17 15:50 UTC (permalink / raw)
To: ocfs2-devel
Review comments by Wengang incorporated.
ocfs2_recover_node is a new data structure to include all information required
to recover one node. The structure is maintained as a list in the super block.
recovery_map is no longer required with ocfs2_recover_node.
Signed-off-by: Goldwyn Rodrigues <rgoldwyn@suse.de>
---
fs/ocfs2/journal.c | 171 ++++++++++++++++++++++++----------------------------
fs/ocfs2/journal.h | 9 ++-
fs/ocfs2/ocfs2.h | 3 +-
fs/ocfs2/super.c | 16 -----
4 files changed, 88 insertions(+), 111 deletions(-)
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index faa2303..277b810 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
#define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
static int ocfs2_force_read_journal(struct inode *inode);
-static int ocfs2_recover_node(struct ocfs2_super *osb,
- int node_num, int slot_num);
+static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn);
static int __ocfs2_recovery_thread(void *arg);
static int ocfs2_commit_cache(struct ocfs2_super *osb);
static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
@@ -179,24 +178,11 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)
int ocfs2_recovery_init(struct ocfs2_super *osb)
{
- struct ocfs2_recovery_map *rm;
-
mutex_init(&osb->recovery_lock);
osb->disable_recovery = 0;
osb->recovery_thread_task = NULL;
init_waitqueue_head(&osb->recovery_event);
-
- rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
- osb->max_slots * sizeof(unsigned int),
- GFP_KERNEL);
- if (!rm) {
- mlog_errno(-ENOMEM);
- return -ENOMEM;
- }
-
- rm->rm_entries = (unsigned int *)((char *)rm +
- sizeof(struct ocfs2_recovery_map));
- osb->recovery_map = rm;
+ INIT_LIST_HEAD(&osb->s_recovery_nodes);
return 0;
}
@@ -212,8 +198,8 @@ static int ocfs2_recovery_thread_running(struct
ocfs2_super *osb)
void ocfs2_recovery_exit(struct ocfs2_super *osb)
{
- struct ocfs2_recovery_map *rm;
-
+ struct list_head *iter, *next;
+ struct ocfs2_recover_node *rn;
/* disable any new recovery threads and wait for any currently
* running ones to exit. Do this before setting the vol_state. */
mutex_lock(&osb->recovery_lock);
@@ -226,75 +212,67 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
* complete. */
flush_workqueue(ocfs2_wq);
- /*
- * Now that recovery is shut down, and the osb is about to be
- * freed, the osb_lock is not taken here.
- */
- rm = osb->recovery_map;
- /* XXX: Should we bug if there are dirty entries? */
-
- kfree(rm);
+ spin_lock(&osb->osb_lock);
+ list_for_each_safe(iter, next, &osb->s_recovery_nodes) {
+ rn = list_entry(iter, struct ocfs2_recover_node,
+ rn_list);
+ list_del(&rn->rn_list);
+ kfree(rn);
+ }
+ spin_unlock(&osb->osb_lock);
}
-static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
+static int __ocfs2_recovery_node_test(struct ocfs2_super *osb,
unsigned int node_num)
{
- int i;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
+ struct ocfs2_recover_node *rn;
+ struct list_head *iter;
assert_spin_locked(&osb->osb_lock);
-
- for (i = 0; i < rm->rm_used; i++) {
- if (rm->rm_entries[i] == node_num)
+ list_for_each(iter, &osb->s_recovery_nodes) {
+ rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
+ if (rn->rn_node_num == node_num)
return 1;
}
-
return 0;
}
/* Behaves like test-and-set. Returns the previous value */
-static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
+static int ocfs2_recovery_node_set(struct ocfs2_super *osb,
unsigned int node_num)
{
- struct ocfs2_recovery_map *rm = osb->recovery_map;
-
- spin_lock(&osb->osb_lock);
- if (__ocfs2_recovery_map_test(osb, node_num)) {
- spin_unlock(&osb->osb_lock);
- return 1;
- }
+ struct list_head *iter;
+ struct ocfs2_recover_node *rn = NULL, *trn;
- /* XXX: Can this be exploited? Not from o2dlm... */
- BUG_ON(rm->rm_used >= osb->max_slots);
+ trn = kzalloc(sizeof(struct ocfs2_recover_node), GFP_NOFS);
+ if (!trn)
+ return -ENOMEM;
- rm->rm_entries[rm->rm_used] = node_num;
- rm->rm_used++;
+ spin_lock(&osb->osb_lock);
+ list_for_each(iter, &osb->s_recovery_nodes) {
+ rn = list_entry(iter, struct ocfs2_recover_node,
+ rn_list);
+ if (rn->rn_node_num == node_num) {
+ spin_unlock(&osb->osb_lock);
+ kfree(trn);
+ return 1;
+ }
+ }
+ trn->rn_node_num = node_num;
+ trn->rn_osb = osb;
+ list_add_tail(&trn->rn_list, &osb->s_recovery_nodes);
spin_unlock(&osb->osb_lock);
return 0;
}
-static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
- unsigned int node_num)
+static void ocfs2_recovery_node_clear(struct ocfs2_super *osb,
+ struct ocfs2_recover_node *rn)
{
- int i;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
-
spin_lock(&osb->osb_lock);
-
- for (i = 0; i < rm->rm_used; i++) {
- if (rm->rm_entries[i] == node_num)
- break;
- }
-
- if (i < rm->rm_used) {
- /* XXX: be careful with the pointer math */
- memmove(&(rm->rm_entries[i]), &(rm->rm_entries[i + 1]),
- (rm->rm_used - i - 1) * sizeof(unsigned int));
- rm->rm_used--;
- }
-
+ list_del(&rn->rn_list);
spin_unlock(&osb->osb_lock);
+ kfree(rn);
}
static int ocfs2_commit_cache(struct ocfs2_super *osb)
@@ -1092,10 +1070,9 @@ bail:
static int ocfs2_recovery_completed(struct ocfs2_super *osb)
{
int empty;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
spin_lock(&osb->osb_lock);
- empty = (rm->rm_used == 0);
+ empty = list_empty(&osb->s_recovery_nodes);
spin_unlock(&osb->osb_lock);
return empty;
@@ -1332,12 +1309,13 @@ void ocfs2_complete_quota_recovery(struct
ocfs2_super *osb)
static int __ocfs2_recovery_thread(void *arg)
{
- int status, node_num, slot_num;
+ int status;
struct ocfs2_super *osb = arg;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
int *rm_quota = NULL;
int rm_quota_used = 0, i;
struct ocfs2_quota_recovery *qrec;
+ struct list_head *iter;
+ struct ocfs2_recover_node *rn = NULL;
mlog_entry_void();
@@ -1367,20 +1345,21 @@ restart:
NULL, NULL);
spin_lock(&osb->osb_lock);
- while (rm->rm_used) {
+ list_for_each(iter, &osb->s_recovery_nodes) {
+ rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
/* It's always safe to remove entry zero, as we won't
* clear it until ocfs2_recover_node() has succeeded. */
- node_num = rm->rm_entries[0];
spin_unlock(&osb->osb_lock);
- mlog(0, "checking node %d\n", node_num);
- slot_num = ocfs2_node_num_to_slot(osb, node_num);
- if (slot_num == -ENOENT) {
+ mlog(0, "checking node %d\n", rn->rn_node_num);
+ rn->rn_slot_num = ocfs2_node_num_to_slot(osb, rn->rn_node_num);
+ if (rn->rn_slot_num == -ENOENT) {
status = 0;
mlog(0, "no slot for this node, so no recovery"
"required.\n");
goto skip_recovery;
}
- mlog(0, "node %d was using slot %d\n", node_num, slot_num);
+ mlog(0, "node %d was using slot %d\n",
+ rn->rn_node_num, rn->rn_slot_num);
/* It is a bit subtle with quota recovery. We cannot do it
* immediately because we have to obtain cluster locks from
@@ -1388,18 +1367,19 @@ restart:
* then quota usage would be out of sync until some node takes
* the slot. So we remember which nodes need quota recovery
* and when everything else is done, we recover quotas. */
- for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
+ for (i = 0; i < rm_quota_used && rm_quota[i] != rn->rn_slot_num;
+ i++);
if (i == rm_quota_used)
- rm_quota[rm_quota_used++] = slot_num;
+ rm_quota[rm_quota_used++] = rn->rn_slot_num;
- status = ocfs2_recover_node(osb, node_num, slot_num);
+ status = ocfs2_recover_one_node(rn);
skip_recovery:
if (!status) {
- ocfs2_recovery_map_clear(osb, node_num);
+ ocfs2_recovery_node_clear(osb, rn);
} else {
mlog(ML_ERROR,
"Error %d recovering node %d on device (%u,%u)!\n",
- status, node_num,
+ status, rn->rn_node_num,
MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
mlog(ML_ERROR, "Volume requires unmount.\n");
}
@@ -1461,6 +1441,7 @@ bail:
void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
{
+ int ret = 0;
mlog_entry("(node_num=%d, osb->node_num = %d)\n",
node_num, osb->node_num);
@@ -1470,7 +1451,11 @@ void ocfs2_recovery_thread(struct ocfs2_super
*osb, int node_num)
/* People waiting on recovery will wait on
* the recovery map to empty. */
- if (ocfs2_recovery_map_set(osb, node_num))
+ ret = ocfs2_recovery_node_set(osb, node_num);
+ if (ret == -ENOMEM) {
+ mlog_errno(ret);
+ goto out;
+ } else if (ret)
mlog(0, "node %d already in recovery map.\n", node_num);
mlog(0, "starting recovery thread...\n");
@@ -1681,26 +1666,27 @@ done:
* second part of a nodes recovery process (local alloc recovery) is
* far less concerning.
*/
-static int ocfs2_recover_node(struct ocfs2_super *osb,
- int node_num, int slot_num)
+static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn)
{
int status = 0;
struct ocfs2_dinode *la_copy = NULL;
struct ocfs2_dinode *tl_copy = NULL;
+ struct ocfs2_super *osb = rn->rn_osb;
mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
- node_num, slot_num, osb->node_num);
+ rn->rn_node_num, rn->rn_slot_num, osb->node_num);
/* Should not ever be called to recover ourselves -- in that
* case we should've called ocfs2_journal_load instead. */
- BUG_ON(osb->node_num == node_num);
+ BUG_ON(osb->node_num == rn->rn_node_num);
- status = ocfs2_replay_journal(osb, node_num, slot_num);
+ status = ocfs2_replay_journal(osb, rn->rn_node_num,
+ rn->rn_slot_num);
if (status < 0) {
if (status == -EBUSY) {
mlog(0, "Skipping recovery for slot %u (node %u) "
- "as another node has recovered it\n", slot_num,
- node_num);
+ "as another node has recovered it\n",
+ rn->rn_slot_num, rn->rn_node_num);
status = 0;
goto done;
}
@@ -1709,7 +1695,8 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
}
/* Stamp a clean local alloc file AFTER recovering the journal... */
- status = ocfs2_begin_local_alloc_recovery(osb, slot_num, &la_copy);
+ status = ocfs2_begin_local_alloc_recovery(osb, rn->rn_slot_num,
+ &la_copy);
if (status < 0) {
mlog_errno(status);
goto done;
@@ -1718,22 +1705,24 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
/* An error from begin_truncate_log_recovery is not
* serious enough to warrant halting the rest of
* recovery. */
- status = ocfs2_begin_truncate_log_recovery(osb, slot_num, &tl_copy);
+ status = ocfs2_begin_truncate_log_recovery(osb,
+ rn->rn_slot_num, &tl_copy);
if (status < 0)
mlog_errno(status);
/* Likewise, this would be a strange but ultimately not so
* harmful place to get an error... */
- status = ocfs2_clear_slot(osb, slot_num);
+ status = ocfs2_clear_slot(osb, rn->rn_slot_num);
if (status < 0)
mlog_errno(status);
/* This will kfree the memory pointed to by la_copy and tl_copy */
- ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
- tl_copy, NULL);
+ ocfs2_queue_recovery_completion(osb->journal, rn->rn_slot_num,
+ la_copy, tl_copy, NULL);
status = 0;
done:
+ rn->rn_status = status;
mlog_exit(status);
return status;
@@ -1822,7 +1811,7 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
continue;
}
- if (__ocfs2_recovery_map_test(osb, node_num)) {
+ if (__ocfs2_recovery_node_test(osb, node_num)) {
spin_unlock(&osb->osb_lock);
continue;
}
diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
index 43e56b9..0325d81 100644
--- a/fs/ocfs2/journal.h
+++ b/fs/ocfs2/journal.h
@@ -43,9 +43,12 @@ struct ocfs2_dinode;
* It is protected by the recovery_lock.
*/
-struct ocfs2_recovery_map {
- unsigned int rm_used;
- unsigned int *rm_entries;
+struct ocfs2_recover_node {
+ struct ocfs2_super *rn_osb;
+ int rn_node_num;
+ int rn_slot_num;
+ int rn_status;
+ struct list_head rn_list;
};
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 1efea36..f70c25a 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -337,7 +337,8 @@ struct ocfs2_super
atomic_t vol_state;
struct mutex recovery_lock;
- struct ocfs2_recovery_map *recovery_map;
+ struct list_head s_recovery_nodes;
+
struct ocfs2_replay_map *replay_map;
struct task_struct *recovery_thread_task;
int disable_recovery;
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index f02c0ef..478715b 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -220,7 +220,6 @@ static const match_table_t tokens = {
static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len)
{
struct ocfs2_cluster_connection *cconn = osb->cconn;
- struct ocfs2_recovery_map *rm = osb->recovery_map;
struct ocfs2_orphan_scan *os = &osb->osb_orphan_scan;
int i, out = 0;
@@ -267,21 +266,6 @@ static int ocfs2_osb_dump(struct ocfs2_super
*osb, char *buf, int len)
osb->dc_work_sequence);
spin_unlock(&osb->dc_task_lock);
- spin_lock(&osb->osb_lock);
- out += snprintf(buf + out, len - out, "%10s => Pid: %d Nodes:",
- "Recovery",
- (osb->recovery_thread_task ?
- task_pid_nr(osb->recovery_thread_task) : -1));
- if (rm->rm_used == 0)
- out += snprintf(buf + out, len - out, " None\n");
- else {
- for (i = 0; i < rm->rm_used; i++)
- out += snprintf(buf + out, len - out, " %d",
- rm->rm_entries[i]);
- out += snprintf(buf + out, len - out, "\n");
- }
- spin_unlock(&osb->osb_lock);
-
out += snprintf(buf + out, len - out,
"%10s => Pid: %d Interval: %lu Needs: %d\n", "Commit",
(osb->commit_task ? task_pid_nr(osb->commit_task) : -1),
--
1.7.1
--
Goldwyn
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
2010-11-17 15:50 [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node Goldwyn Rodrigues
@ 2010-12-02 4:49 ` Sunil Mushran
2010-12-02 5:29 ` Joel Becker
1 sibling, 0 replies; 13+ messages in thread
From: Sunil Mushran @ 2010-12-02 4:49 UTC (permalink / raw)
To: ocfs2-devel
On 11/17/2010 07:50 AM, Goldwyn Rodrigues wrote:
> Review comments by Wengang incorporated.
>
> ocfs2_recover_node is a new data structure to include all information required
> to recover one node. The structure is maintained as a list in the super block.
ocfs2_recover_node suggests an action. How about calling it
ocfs2_recovery_list?
> recovery_map is no longer required with ocfs2_recover_node.
>
> Signed-off-by: Goldwyn Rodrigues<rgoldwyn@suse.de>
> ---
> fs/ocfs2/journal.c | 171 ++++++++++++++++++++++++----------------------------
> fs/ocfs2/journal.h | 9 ++-
> fs/ocfs2/ocfs2.h | 3 +-
> fs/ocfs2/super.c | 16 -----
> 4 files changed, 88 insertions(+), 111 deletions(-)
>
> diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
> index faa2303..277b810 100644
> --- a/fs/ocfs2/journal.c
> +++ b/fs/ocfs2/journal.c
> @@ -58,8 +58,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
> #define ORPHAN_SCAN_SCHEDULE_TIMEOUT 300000
>
> static int ocfs2_force_read_journal(struct inode *inode);
> -static int ocfs2_recover_node(struct ocfs2_super *osb,
> - int node_num, int slot_num);
> +static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn);
> static int __ocfs2_recovery_thread(void *arg);
> static int ocfs2_commit_cache(struct ocfs2_super *osb);
> static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
> @@ -179,24 +178,11 @@ void ocfs2_free_replay_slots(struct ocfs2_super *osb)
>
> int ocfs2_recovery_init(struct ocfs2_super *osb)
> {
> - struct ocfs2_recovery_map *rm;
> -
> mutex_init(&osb->recovery_lock);
> osb->disable_recovery = 0;
> osb->recovery_thread_task = NULL;
> init_waitqueue_head(&osb->recovery_event);
> -
> - rm = kzalloc(sizeof(struct ocfs2_recovery_map) +
> - osb->max_slots * sizeof(unsigned int),
> - GFP_KERNEL);
> - if (!rm) {
> - mlog_errno(-ENOMEM);
> - return -ENOMEM;
> - }
> -
> - rm->rm_entries = (unsigned int *)((char *)rm +
> - sizeof(struct ocfs2_recovery_map));
> - osb->recovery_map = rm;
> + INIT_LIST_HEAD(&osb->s_recovery_nodes);
>
> return 0;
> }
> @@ -212,8 +198,8 @@ static int ocfs2_recovery_thread_running(struct
> ocfs2_super *osb)
>
> void ocfs2_recovery_exit(struct ocfs2_super *osb)
> {
> - struct ocfs2_recovery_map *rm;
> -
> + struct list_head *iter, *next;
> + struct ocfs2_recover_node *rn;
> /* disable any new recovery threads and wait for any currently
> * running ones to exit. Do this before setting the vol_state. */
> mutex_lock(&osb->recovery_lock);
> @@ -226,75 +212,67 @@ void ocfs2_recovery_exit(struct ocfs2_super *osb)
> * complete. */
> flush_workqueue(ocfs2_wq);
>
> - /*
> - * Now that recovery is shut down, and the osb is about to be
> - * freed, the osb_lock is not taken here.
> - */
> - rm = osb->recovery_map;
> - /* XXX: Should we bug if there are dirty entries? */
This comment is still relevant.
> -
> - kfree(rm);
> + spin_lock(&osb->osb_lock);
> + list_for_each_safe(iter, next,&osb->s_recovery_nodes) {
> + rn = list_entry(iter, struct ocfs2_recover_node,
> + rn_list);
> + list_del(&rn->rn_list);
> + kfree(rn);
> + }
> + spin_unlock(&osb->osb_lock);
extra indentation
> }
>
> -static int __ocfs2_recovery_map_test(struct ocfs2_super *osb,
> +static int __ocfs2_recovery_node_test(struct ocfs2_super *osb,
> unsigned int node_num)
> {
> - int i;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> + struct ocfs2_recover_node *rn;
> + struct list_head *iter;
>
> assert_spin_locked(&osb->osb_lock);
> -
> - for (i = 0; i< rm->rm_used; i++) {
> - if (rm->rm_entries[i] == node_num)
> + list_for_each(iter,&osb->s_recovery_nodes) {
> + rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
> + if (rn->rn_node_num == node_num)
> return 1;
> }
> -
> return 0;
> }
>
> /* Behaves like test-and-set. Returns the previous value */
> -static int ocfs2_recovery_map_set(struct ocfs2_super *osb,
> +static int ocfs2_recovery_node_set(struct ocfs2_super *osb,
> unsigned int node_num)
> {
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> -
> - spin_lock(&osb->osb_lock);
> - if (__ocfs2_recovery_map_test(osb, node_num)) {
> - spin_unlock(&osb->osb_lock);
> - return 1;
> - }
> + struct list_head *iter;
> + struct ocfs2_recover_node *rn = NULL, *trn;
>
> - /* XXX: Can this be exploited? Not from o2dlm... */
> - BUG_ON(rm->rm_used>= osb->max_slots);
> + trn = kzalloc(sizeof(struct ocfs2_recover_node), GFP_NOFS);
> + if (!trn)
> + return -ENOMEM;
Is this ENOMEM case handled properly up the stack? This is new.
Previously we allocated memory upfront. Wondering why we cannot
do the same here.
> - rm->rm_entries[rm->rm_used] = node_num;
> - rm->rm_used++;
> + spin_lock(&osb->osb_lock);
> + list_for_each(iter,&osb->s_recovery_nodes) {
> + rn = list_entry(iter, struct ocfs2_recover_node,
> + rn_list);
> + if (rn->rn_node_num == node_num) {
> + spin_unlock(&osb->osb_lock);
> + kfree(trn);
> + return 1;
> + }
> + }
extra indentation
> + trn->rn_node_num = node_num;
> + trn->rn_osb = osb;
> + list_add_tail(&trn->rn_list,&osb->s_recovery_nodes);
> spin_unlock(&osb->osb_lock);
>
> return 0;
> }
>
> -static void ocfs2_recovery_map_clear(struct ocfs2_super *osb,
> - unsigned int node_num)
> +static void ocfs2_recovery_node_clear(struct ocfs2_super *osb,
> + struct ocfs2_recover_node *rn)
> {
> - int i;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> -
> spin_lock(&osb->osb_lock);
> -
> - for (i = 0; i< rm->rm_used; i++) {
> - if (rm->rm_entries[i] == node_num)
> - break;
> - }
> -
> - if (i< rm->rm_used) {
> - /* XXX: be careful with the pointer math */
> - memmove(&(rm->rm_entries[i]),&(rm->rm_entries[i + 1]),
> - (rm->rm_used - i - 1) * sizeof(unsigned int));
> - rm->rm_used--;
> - }
> -
> + list_del(&rn->rn_list);
> spin_unlock(&osb->osb_lock);
> + kfree(rn);
Aah...we did memmove. That won't work here. So list manipulation it is.
> }
>
> static int ocfs2_commit_cache(struct ocfs2_super *osb)
> @@ -1092,10 +1070,9 @@ bail:
> static int ocfs2_recovery_completed(struct ocfs2_super *osb)
> {
> int empty;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
>
> spin_lock(&osb->osb_lock);
> - empty = (rm->rm_used == 0);
> + empty = list_empty(&osb->s_recovery_nodes);
> spin_unlock(&osb->osb_lock);
>
> return empty;
> @@ -1332,12 +1309,13 @@ void ocfs2_complete_quota_recovery(struct
> ocfs2_super *osb)
>
> static int __ocfs2_recovery_thread(void *arg)
> {
> - int status, node_num, slot_num;
> + int status;
> struct ocfs2_super *osb = arg;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> int *rm_quota = NULL;
> int rm_quota_used = 0, i;
> struct ocfs2_quota_recovery *qrec;
> + struct list_head *iter;
> + struct ocfs2_recover_node *rn = NULL;
>
> mlog_entry_void();
>
> @@ -1367,20 +1345,21 @@ restart:
> NULL, NULL);
>
> spin_lock(&osb->osb_lock);
> - while (rm->rm_used) {
> + list_for_each(iter,&osb->s_recovery_nodes) {
> + rn = list_entry(iter, struct ocfs2_recover_node, rn_list);
> /* It's always safe to remove entry zero, as we won't
> * clear it until ocfs2_recover_node() has succeeded. */
> - node_num = rm->rm_entries[0];
> spin_unlock(&osb->osb_lock);
> - mlog(0, "checking node %d\n", node_num);
> - slot_num = ocfs2_node_num_to_slot(osb, node_num);
> - if (slot_num == -ENOENT) {
> + mlog(0, "checking node %d\n", rn->rn_node_num);
> + rn->rn_slot_num = ocfs2_node_num_to_slot(osb, rn->rn_node_num);
> + if (rn->rn_slot_num == -ENOENT) {
> status = 0;
> mlog(0, "no slot for this node, so no recovery"
> "required.\n");
> goto skip_recovery;
> }
> - mlog(0, "node %d was using slot %d\n", node_num, slot_num);
> + mlog(0, "node %d was using slot %d\n",
> + rn->rn_node_num, rn->rn_slot_num);
>
> /* It is a bit subtle with quota recovery. We cannot do it
> * immediately because we have to obtain cluster locks from
> @@ -1388,18 +1367,19 @@ restart:
> * then quota usage would be out of sync until some node takes
> * the slot. So we remember which nodes need quota recovery
> * and when everything else is done, we recover quotas. */
> - for (i = 0; i< rm_quota_used&& rm_quota[i] != slot_num; i++);
> + for (i = 0; i< rm_quota_used&& rm_quota[i] != rn->rn_slot_num;
> + i++);
> if (i == rm_quota_used)
> - rm_quota[rm_quota_used++] = slot_num;
> + rm_quota[rm_quota_used++] = rn->rn_slot_num;
>
> - status = ocfs2_recover_node(osb, node_num, slot_num);
> + status = ocfs2_recover_one_node(rn);
> skip_recovery:
> if (!status) {
> - ocfs2_recovery_map_clear(osb, node_num);
> + ocfs2_recovery_node_clear(osb, rn);
> } else {
> mlog(ML_ERROR,
> "Error %d recovering node %d on device (%u,%u)!\n",
> - status, node_num,
> + status, rn->rn_node_num,
> MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
> mlog(ML_ERROR, "Volume requires unmount.\n");
Hmm... this mlog could do with some clean up. But that can be
a separate change.
> }
> @@ -1461,6 +1441,7 @@ bail:
>
> void ocfs2_recovery_thread(struct ocfs2_super *osb, int node_num)
> {
> + int ret = 0;
> mlog_entry("(node_num=%d, osb->node_num = %d)\n",
> node_num, osb->node_num);
>
> @@ -1470,7 +1451,11 @@ void ocfs2_recovery_thread(struct ocfs2_super
> *osb, int node_num)
>
> /* People waiting on recovery will wait on
> * the recovery map to empty. */
> - if (ocfs2_recovery_map_set(osb, node_num))
> + ret = ocfs2_recovery_node_set(osb, node_num);
> + if (ret == -ENOMEM) {
> + mlog_errno(ret);
> + goto out;
> + } else if (ret)
> mlog(0, "node %d already in recovery map.\n", node_num);
>
> mlog(0, "starting recovery thread...\n");
> @@ -1681,26 +1666,27 @@ done:
> * second part of a nodes recovery process (local alloc recovery) is
> * far less concerning.
> */
> -static int ocfs2_recover_node(struct ocfs2_super *osb,
> - int node_num, int slot_num)
> +static int ocfs2_recover_one_node(struct ocfs2_recover_node *rn)
> {
> int status = 0;
> struct ocfs2_dinode *la_copy = NULL;
> struct ocfs2_dinode *tl_copy = NULL;
> + struct ocfs2_super *osb = rn->rn_osb;
>
> mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
> - node_num, slot_num, osb->node_num);
> + rn->rn_node_num, rn->rn_slot_num, osb->node_num);
>
> /* Should not ever be called to recover ourselves -- in that
> * case we should've called ocfs2_journal_load instead. */
> - BUG_ON(osb->node_num == node_num);
> + BUG_ON(osb->node_num == rn->rn_node_num);
>
> - status = ocfs2_replay_journal(osb, node_num, slot_num);
> + status = ocfs2_replay_journal(osb, rn->rn_node_num,
> + rn->rn_slot_num);
> if (status< 0) {
> if (status == -EBUSY) {
> mlog(0, "Skipping recovery for slot %u (node %u) "
> - "as another node has recovered it\n", slot_num,
> - node_num);
> + "as another node has recovered it\n",
> + rn->rn_slot_num, rn->rn_node_num);
> status = 0;
> goto done;
> }
> @@ -1709,7 +1695,8 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
> }
>
> /* Stamp a clean local alloc file AFTER recovering the journal... */
> - status = ocfs2_begin_local_alloc_recovery(osb, slot_num,&la_copy);
> + status = ocfs2_begin_local_alloc_recovery(osb, rn->rn_slot_num,
> + &la_copy);
> if (status< 0) {
> mlog_errno(status);
> goto done;
> @@ -1718,22 +1705,24 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
> /* An error from begin_truncate_log_recovery is not
> * serious enough to warrant halting the rest of
> * recovery. */
> - status = ocfs2_begin_truncate_log_recovery(osb, slot_num,&tl_copy);
> + status = ocfs2_begin_truncate_log_recovery(osb,
> + rn->rn_slot_num,&tl_copy);
> if (status< 0)
> mlog_errno(status);
>
> /* Likewise, this would be a strange but ultimately not so
> * harmful place to get an error... */
> - status = ocfs2_clear_slot(osb, slot_num);
> + status = ocfs2_clear_slot(osb, rn->rn_slot_num);
> if (status< 0)
> mlog_errno(status);
>
> /* This will kfree the memory pointed to by la_copy and tl_copy */
> - ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
> - tl_copy, NULL);
> + ocfs2_queue_recovery_completion(osb->journal, rn->rn_slot_num,
> + la_copy, tl_copy, NULL);
>
> status = 0;
> done:
> + rn->rn_status = status;
>
> mlog_exit(status);
> return status;
> @@ -1822,7 +1811,7 @@ int ocfs2_mark_dead_nodes(struct ocfs2_super *osb)
> continue;
> }
>
> - if (__ocfs2_recovery_map_test(osb, node_num)) {
> + if (__ocfs2_recovery_node_test(osb, node_num)) {
> spin_unlock(&osb->osb_lock);
> continue;
> }
> diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
> index 43e56b9..0325d81 100644
> --- a/fs/ocfs2/journal.h
> +++ b/fs/ocfs2/journal.h
> @@ -43,9 +43,12 @@ struct ocfs2_dinode;
> * It is protected by the recovery_lock.
> */
>
> -struct ocfs2_recovery_map {
> - unsigned int rm_used;
> - unsigned int *rm_entries;
> +struct ocfs2_recover_node {
> + struct ocfs2_super *rn_osb;
> + int rn_node_num;
> + int rn_slot_num;
> + int rn_status;
> + struct list_head rn_list;
> };
See comment above.
>
> diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
> index 1efea36..f70c25a 100644
> --- a/fs/ocfs2/ocfs2.h
> +++ b/fs/ocfs2/ocfs2.h
> @@ -337,7 +337,8 @@ struct ocfs2_super
>
> atomic_t vol_state;
> struct mutex recovery_lock;
> - struct ocfs2_recovery_map *recovery_map;
> + struct list_head s_recovery_nodes;
s_recovery_list will fit better.
> +
> struct ocfs2_replay_map *replay_map;
> struct task_struct *recovery_thread_task;
> int disable_recovery;
> diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
> index f02c0ef..478715b 100644
> --- a/fs/ocfs2/super.c
> +++ b/fs/ocfs2/super.c
> @@ -220,7 +220,6 @@ static const match_table_t tokens = {
> static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len)
> {
> struct ocfs2_cluster_connection *cconn = osb->cconn;
> - struct ocfs2_recovery_map *rm = osb->recovery_map;
> struct ocfs2_orphan_scan *os =&osb->osb_orphan_scan;
> int i, out = 0;
>
> @@ -267,21 +266,6 @@ static int ocfs2_osb_dump(struct ocfs2_super
> *osb, char *buf, int len)
> osb->dc_work_sequence);
> spin_unlock(&osb->dc_task_lock);
>
> - spin_lock(&osb->osb_lock);
> - out += snprintf(buf + out, len - out, "%10s => Pid: %d Nodes:",
> - "Recovery",
> - (osb->recovery_thread_task ?
> - task_pid_nr(osb->recovery_thread_task) : -1));
> - if (rm->rm_used == 0)
> - out += snprintf(buf + out, len - out, " None\n");
> - else {
> - for (i = 0; i< rm->rm_used; i++)
> - out += snprintf(buf + out, len - out, " %d",
> - rm->rm_entries[i]);
> - out += snprintf(buf + out, len - out, "\n");
> - }
> - spin_unlock(&osb->osb_lock);
> -
> out += snprintf(buf + out, len - out,
> "%10s => Pid: %d Interval: %lu Needs: %d\n", "Commit",
> (osb->commit_task ? task_pid_nr(osb->commit_task) : -1),
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
2010-11-17 15:50 [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node Goldwyn Rodrigues
2010-12-02 4:49 ` Sunil Mushran
@ 2010-12-02 5:29 ` Joel Becker
2010-12-02 19:05 ` Sunil Mushran
1 sibling, 1 reply; 13+ messages in thread
From: Joel Becker @ 2010-12-02 5:29 UTC (permalink / raw)
To: ocfs2-devel
On Wed, Nov 17, 2010 at 09:50:04AM -0600, Goldwyn Rodrigues wrote:
> @@ -1470,7 +1451,11 @@ void ocfs2_recovery_thread(struct ocfs2_super
> *osb, int node_num)
>
> /* People waiting on recovery will wait on
> * the recovery map to empty. */
> - if (ocfs2_recovery_map_set(osb, node_num))
> + ret = ocfs2_recovery_node_set(osb, node_num);
> + if (ret == -ENOMEM) {
> + mlog_errno(ret);
> + goto out;
> + } else if (ret)
> mlog(0, "node %d already in recovery map.\n", node_num);
This is a broken change. If we get -ENOMEM, we won't block
other processes. We can't have that happen. There are two possible
solutions. First, like Sunil said, we can preallocate max_slots recovery
entries. Seems pretty sane. The other solution would be to set an
in-recovery flag that others can check, so even when the recovery list
is empty because of a failed allocation, other processes still block. I
prefer the preallocation because it doesn't fail recovery.
Joel
--
"In a crisis, don't hide behind anything or anybody. They're going
to find you anyway."
- Paul "Bear" Bryant
Joel Becker
Senior Development Manager
Oracle
E-mail: joel.becker at oracle.com
Phone: (650) 506-8127
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
2010-12-02 5:29 ` Joel Becker
@ 2010-12-02 19:05 ` Sunil Mushran
2010-12-02 23:49 ` Joel Becker
0 siblings, 1 reply; 13+ messages in thread
From: Sunil Mushran @ 2010-12-02 19:05 UTC (permalink / raw)
To: ocfs2-devel
On 12/01/2010 09:29 PM, Joel Becker wrote:
> On Wed, Nov 17, 2010 at 09:50:04AM -0600, Goldwyn Rodrigues wrote:
>> @@ -1470,7 +1451,11 @@ void ocfs2_recovery_thread(struct ocfs2_super
>> *osb, int node_num)
>>
>> /* People waiting on recovery will wait on
>> * the recovery map to empty. */
>> - if (ocfs2_recovery_map_set(osb, node_num))
>> + ret = ocfs2_recovery_node_set(osb, node_num);
>> + if (ret == -ENOMEM) {
>> + mlog_errno(ret);
>> + goto out;
>> + } else if (ret)
>> mlog(0, "node %d already in recovery map.\n", node_num);
> This is a broken change. If we get -ENOMEM, we won't block
> other processes. We can't have that happen. There are two possible
> solutions. First, like Sunil said, we can preallocate max_slots recovery
> entries. Seems pretty sane. The other solution would be to set an
> in-recovery flag that others can check, so even when the recovery list
> is empty because of a failed allocation, other processes still block. I
> prefer the preallocation because it doesn't fail recovery.
You could stick with the list but preallocate max_slots items during init.
Attach all those to the s_init_reco_list. During set, move an item to
s_active_reco_list. Clear moves it back.
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node
2010-12-02 19:05 ` Sunil Mushran
@ 2010-12-02 23:49 ` Joel Becker
0 siblings, 0 replies; 13+ messages in thread
From: Joel Becker @ 2010-12-02 23:49 UTC (permalink / raw)
To: ocfs2-devel
On Thu, Dec 02, 2010 at 11:05:01AM -0800, Sunil Mushran wrote:
> On 12/01/2010 09:29 PM, Joel Becker wrote:
> > On Wed, Nov 17, 2010 at 09:50:04AM -0600, Goldwyn Rodrigues wrote:
> >> @@ -1470,7 +1451,11 @@ void ocfs2_recovery_thread(struct ocfs2_super
> >> *osb, int node_num)
> >>
> >> /* People waiting on recovery will wait on
> >> * the recovery map to empty. */
> >> - if (ocfs2_recovery_map_set(osb, node_num))
> >> + ret = ocfs2_recovery_node_set(osb, node_num);
> >> + if (ret == -ENOMEM) {
> >> + mlog_errno(ret);
> >> + goto out;
> >> + } else if (ret)
> >> mlog(0, "node %d already in recovery map.\n", node_num);
> > This is a broken change. If we get -ENOMEM, we won't block
> > other processes. We can't have that happen. There are two possible
> > solutions. First, like Sunil said, we can preallocate max_slots recovery
> > entries. Seems pretty sane. The other solution would be to set an
> > in-recovery flag that others can check, so even when the recovery list
> > is empty because of a failed allocation, other processes still block. I
> > prefer the preallocation because it doesn't fail recovery.
>
> You could stick with the list but preallocate max_slots items during init.
> Attach all those to the s_init_reco_list. During set, move an item to
> s_active_reco_list. Clear moves it back.
Exactly.
Joel
--
"You must remember this:
A kiss is just a kiss,
A sigh is just a sigh.
The fundamental rules apply
As time goes by."
Joel Becker
Senior Development Manager
Oracle
E-mail: joel.becker at oracle.com
Phone: (650) 506-8127
^ permalink raw reply [flat|nested] 13+ messages in thread
end of thread, other threads:[~2010-12-02 23:49 UTC | newest]
Thread overview: 13+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2010-11-17 15:50 [Ocfs2-devel] [PATCH 1/2] Introduce ocfs2_recover_node Goldwyn Rodrigues
2010-12-02 4:49 ` Sunil Mushran
2010-12-02 5:29 ` Joel Becker
2010-12-02 19:05 ` Sunil Mushran
2010-12-02 23:49 ` Joel Becker
-- strict thread matches above, loose matches on Subject: below --
2010-11-16 22:59 Goldwyn Rodrigues
2010-11-17 1:49 ` Wengang Wang
2010-11-17 2:00 ` Wengang Wang
2010-11-17 2:04 ` Wengang Wang
2010-11-12 1:01 Goldwyn Rodrigues
2010-11-12 2:16 ` Wengang Wang
2010-11-12 19:22 ` Goldwyn Rodrigues
2010-11-13 5:03 ` Wengang Wang
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.