From mboxrd@z Thu Jan 1 00:00:00 1970 From: jbrassow@sourceware.org Date: 5 Nov 2007 22:44:06 -0000 Subject: [Cluster-devel] cluster/cmirror/src cluster.c functions.c func ... Message-ID: <20071105224406.19189.qmail@sourceware.org> List-Id: To: cluster-devel.redhat.com MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit CVSROOT: /cvs/cluster Module name: cluster Branch: RHEL5 Changes by: jbrassow at sourceware.org 2007-11-05 22:44:04 Modified files: cmirror/src : cluster.c functions.c functions.h link_mon.c local.c Log message: - Fix a problem with recovery work assignment (priority recovery still needs to be added; otherwise, I/O will stall for long periods during mirror resync) - Clean up the checkpointing code and fix a couple of bugs there that prevented proper start-up. Patches: http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.3&r2=1.1.2.4 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/link_mon.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.1&r2=1.1.2.2 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/local.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3 --- cluster/cmirror/src/Attic/cluster.c 2007/11/03 18:53:03 1.1.2.3 +++ cluster/cmirror/src/Attic/cluster.c 2007/11/05 22:44:03 1.1.2.4 @@ -49,6 +49,8 @@ int valid; struct queue *startup_queue; + int checkpoints_needed; + uint32_t checkpoint_requesters[10]; struct checkpoint_data *checkpoint_list; }; @@ -384,7 +386,7 @@ return 0; } -static int import_checkpoint(struct clog_cpg *entry) +static int import_checkpoint(struct clog_cpg *entry, int no_read) { int rtn = 0; SaCkptCheckpointHandleT h; @@ 
-422,6 +424,11 @@ saCkptCheckpointUnlink(ckpt_handle, &name); + if (no_read) { + LOG_DBG("Checkpoint for this log already recieved"); + goto no_read; + } + init_retry: rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, 0, &itr); if (rv == SA_AIS_ERR_TRY_AGAIN) { @@ -516,7 +523,7 @@ fail: saCkptSectionIterationFinalize(itr); - +no_read: saCkptCheckpointClose(h); free(bitmap); @@ -542,11 +549,18 @@ * notice in tfr in export_checkpoint function * by setting tfr->error */ - export_checkpoint(cp); - - entry->checkpoint_list = cp->next; - free_checkpoint(cp); - cp = entry->checkpoint_list; + switch (export_checkpoint(cp)) { + case -EEXIST: + LOG_DBG("Checkpoint already handled by someone else"); + case 0: + entry->checkpoint_list = cp->next; + free_checkpoint(cp); + cp = entry->checkpoint_list; + break; + default: + /* FIXME: Skipping will cause list corruption */ + LOG_ERROR("Failed to export checkpoint"); + } } } EXIT(); @@ -556,7 +570,9 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname, uint32_t nodeid, uint32_t pid, void *msg, int msg_len) { + int i; int r = 0; + int i_am_server; struct clog_tfr *tfr = msg; struct clog_tfr *startup_tfr = NULL; struct clog_tfr *cp_tfr = NULL; @@ -567,9 +583,6 @@ if (msg_len != (sizeof(*tfr) + tfr->data_size)) LOG_ERROR("Badly sized message recieved from cluster."); - LOG_DBG("Message (len = %d) from node/pid %u/%d", msg_len, - nodeid, pid); - if (tfr->request_type & DM_CLOG_RESPONSE) LOG_DBG("Response from cluster recieved %s", RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE)); @@ -587,13 +600,31 @@ LOG_ERROR("Unable to find clog_cpg for cluster message"); goto out; } + i_am_server = (my_cluster_id == match->lowest_id) ? 
1 : 0; if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) { - if ((!match->valid) && (my_cluster_id == tfr->originator)) { - switch (import_checkpoint(match)) { + /* Redundant checkpoints ignored due to match->valid */ + if (my_cluster_id == tfr->originator) { + switch (import_checkpoint(match, match->valid)) { case 0: - LOG_DBG("Checkpoint data recieved. Log is now valid"); - match->valid = 1; + if (!match->valid) { + LOG_DBG("Checkpoint data recieved. Log is now valid"); + match->valid = 1; + while ((startup_tfr = queue_remove(match->startup_queue))) { + LOG_DBG("Processing delayed request %d: %s", + match->startup_queue->count, + RQ_TYPE(startup_tfr->request_type)); + r = handle_cluster_request(startup_tfr, i_am_server); + + if (r) { + LOG_ERROR("Error while processing delayed CPG message"); + goto out; + } else { + queue_add(startup_tfr, free_queue); + } + } + } + break; case -EAGAIN: LOG_PRINT("Checkpoint data empty. Requesting new checkpoint."); @@ -628,28 +659,28 @@ goto out; } - if (tfr->request_type == DM_CLOG_CHECKPOINT_REQUEST) { - if (tfr->originator == my_cluster_id) { - /* - * The checkpoint includes any request up to the - * request for checkpoint. So, we must clear any - * previous requests we were storing. 
- */ - while ((startup_tfr = queue_remove(match->startup_queue))) - queue_add(startup_tfr, free_queue); - } else if (my_cluster_id == match->lowest_id) { - struct checkpoint_data *new; - - new = prepare_checkpoint(match, tfr->originator); - if (!new) { - /* FIXME: Need better error handling */ - LOG_ERROR("Failed to prepare checkpoint!!!"); - goto out; - } - new->next = match->checkpoint_list; - match->checkpoint_list = new; + /* + * If the log is now valid, we can queue the checkpoints + */ + for (i = match->checkpoints_needed; i;) { + struct checkpoint_data *new; + + i--; + if (log_get_state(tfr) != LOG_RESUMED) { + LOG_DBG("Skipping checkpoint for %u, my log is not ready", + match->checkpoint_requesters[i]); + continue; } - goto out; + new = prepare_checkpoint(match, match->checkpoint_requesters[i]); + if (!new) { + /* FIXME: Need better error handling */ + LOG_ERROR("Failed to prepare checkpoint for %u!!!", + match->checkpoint_requesters[i]); + break; + } + match->checkpoints_needed = i; + new->next = match->checkpoint_list; + match->checkpoint_list = new; } if (tfr->request_type & DM_CLOG_RESPONSE) @@ -665,21 +696,7 @@ goto out; } - while ((startup_tfr = queue_remove(match->startup_queue))) { - LOG_DBG("Processing delayed request %d: %s", - match->startup_queue->count, - RQ_TYPE(startup_tfr->request_type)); - r = handle_cluster_request(startup_tfr, - (my_cluster_id == match->lowest_id) ? 1 : 0); - if (r) { - LOG_ERROR("Error while processing delayed CPG message"); - goto out; - } else { - queue_add(startup_tfr, free_queue); - } - } - - r = handle_cluster_request(tfr, (my_cluster_id == match->lowest_id) ? 
1 : 0); + r = handle_cluster_request(tfr, i_am_server); } out: @@ -694,16 +711,14 @@ struct cpg_address *left_list, int left_list_entries, struct cpg_address *joined_list, int joined_list_entries) { - int i; + int i, j; int my_pid = getpid(); int found = 0; - int i_was_server = 0; - int do_checkpoint = 0; struct clog_cpg *match, *tmp; ENTER(); - LOG_PRINT("* CPG config callback *********************"); + LOG_PRINT("****** CPG config callback ****************"); LOG_PRINT("* JOINING (%d):", joined_list_entries); for (i = 0; i < joined_list_entries; i++) @@ -751,6 +766,7 @@ goto out; } + /* Assign my_cluster_id */ if (my_cluster_id == 0xDEAD) { for (i = 0; i < joined_list_entries; i++) { LOG_DBG("My pid = %d\t\t[%u/%d]", my_pid, @@ -764,42 +780,30 @@ LOG_PRINT("Setting my cluster id: %u", my_cluster_id); } } - } else if (match->lowest_id == my_cluster_id) { - LOG_DBG("I was the server (lowest_id = %u, my_cluster_id = %u)", - match->lowest_id, my_cluster_id); - i_was_server = 1; + goto out; } + /* Find the lowest_id, i.e. the server */ for (i = 0, match->lowest_id = member_list[0].nodeid; i < member_list_entries; i++) if (match->lowest_id > member_list[i].nodeid) match->lowest_id = member_list[i].nodeid; - if (match->lowest_id == my_cluster_id) { - /* I am the server now */ - if (!i_was_server) - LOG_PRINT("I am the new log server for %s", match->name.value); - else if (joined_list_entries) { - LOG_PRINT("I must send checkpoint data."); - do_checkpoint = 1; - } - } else if (i_was_server && joined_list_entries) { - LOG_PRINT("Giving server ownership to %u", match->lowest_id); - LOG_PRINT("I must send checkpoint data."); - do_checkpoint = 1; - } + LOG_PRINT("Server is now %u", match->lowest_id); - if (do_checkpoint) { - struct checkpoint_data *new; + /* + * If I am part of the joining list, I do not send checkpoints + * FIXME: What are the cases where multiple nodes can join? 
+ */ + for (i = 0; i < joined_list_entries; i++) + if (joined_list[i].nodeid == my_cluster_id) + goto out; - for (i = 0; i < joined_list_entries; i++) { - new = prepare_checkpoint(match, joined_list[i].nodeid); - if (!new) - goto out; - new->next = match->checkpoint_list; - match->checkpoint_list = new; - } + for (i = 0, j = match->checkpoints_needed; i < joined_list_entries; i++) { + LOG_DBG("Joining node, %u needs checkpoint", joined_list[i].nodeid); + match->checkpoint_requesters[i + j] = joined_list[i].nodeid; } + match->checkpoints_needed += i; out: EXIT(); --- cluster/cmirror/src/Attic/functions.c 2007/11/03 18:37:48 1.1.2.2 +++ cluster/cmirror/src/Attic/functions.c 2007/11/05 22:44:03 1.1.2.3 @@ -56,6 +56,8 @@ FORCESYNC, /* Force a sync to happen */ } sync; + uint32_t state; /* current operational state of the log */ + int disk_fd; /* -1 means no disk log */ int log_dev_failed; uint64_t disk_nr_regions; @@ -563,6 +565,8 @@ if (lc->touched) LOG_DBG("WARNING: log still marked as 'touched' during suspend"); + lc->state = LOG_SUSPENDED; + return 0; } @@ -583,12 +587,12 @@ } /* - * _clog_resume + * clog_resume * @tfr * * Does the main work of resuming. 
*/ -static int _clog_resume(struct clog_tfr *tfr) +static int clog_resume(struct clog_tfr *tfr) { uint32_t i; struct log_c *lc = get_log(tfr->uuid); @@ -617,6 +621,9 @@ case 3: LOG_DBG("Non-master resume: bits pre-loaded"); lc->resume_override = 1000; + lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count); + LOG_DBG("sync_count = %llu", lc->sync_count); + goto out; return 0; default: LOG_ERROR("Error:: multiple loading of bits (%d)", lc->resume_override); @@ -662,36 +669,29 @@ /* copy clean across to sync */ memcpy(lc->sync_bits, lc->clean_bits, size); lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count); + LOG_DBG("sync_count = %llu", lc->sync_count); lc->sync_search = 0; /* - tfr->error = write_log(lc); - if (tfr->error) { - lc->log_dev_failed = 1; - LOG_ERROR("Failed to write initial disk log"); - } else - lc->log_dev_failed = 0; - */ - /* * We mark 'touched' so that only the master commits * the log via 'commit_log' */ lc->touched = 1; - +out: + lc->state = LOG_RESUMED; + return tfr->error; } /* - * clog_resume + * local_resume * @tfr * * If the log is pending, we must first join the cpg and * put the log in the official list. * - * If the log is in the official list, then we call - * _clog_resume. */ -static int clog_resume(struct clog_tfr *tfr) +int local_resume(struct clog_tfr *tfr) { int r; struct log_c *lc = get_log(tfr->uuid); @@ -714,12 +714,9 @@ /* move log to official list */ list_del_init(&lc->list); list_add(&lc->list, &log_list); - - return 0; } - /* log is in the official list, try to resume */ - return _clog_resume(tfr); + return 0; } /* @@ -896,7 +893,6 @@ */ static int clog_get_resync_work(struct clog_tfr *tfr) { - int sync_search, conflict=0; struct {int i; uint64_t r; } *pkg = (void *)tfr->data; struct log_c *lc = get_log(tfr->uuid); @@ -905,42 +901,35 @@ tfr->data_size = sizeof(*pkg); - /* - * Check if we are done recovering, or if there - * is already someone recovering. 
- */ - if ((lc->sync_search >= lc->region_count) || - (lc->recovering_region != (uint64_t)-1)) { + if (lc->sync_search >= lc->region_count) { + /* + * FIXME: handle intermittent errors during recovery + * by resetting sync_search... but not too many times. + */ + LOG_DBG(" Recovery has finished"); pkg->i = 0; return 0; } - for (sync_search = lc->sync_search; - sync_search < lc->region_count; - sync_search += (pkg->r + 1)) { - pkg->r = find_next_zero_bit(lc->sync_bits, - lc->region_count, - sync_search); - - /* - * If the region is currently marked, we cannot - * recover it yet. - */ - if (!log_test_bit(lc->clean_bits, pkg->r)) - conflict = 1; - else - break; + if (lc->recovering_region != (uint64_t)-1) { + LOG_DBG("Someone is already recovering region %Lu", + lc->recovering_region); + pkg->i = 0; + return 0; } - if (!conflict) - lc->sync_search = pkg->r + 1; + pkg->r = find_next_zero_bit(lc->sync_bits, + lc->region_count, + lc->sync_search); if (pkg->r >= lc->region_count) { pkg->i = 0; return 0; } - LOG_DBG("Assigning resync work: region = %llu\n", pkg->r); + lc->sync_search = pkg->r + 1; + + LOG_DBG(" Assigning resync work: region = %llu\n", pkg->r); pkg->i = 1; return 0; } @@ -984,6 +973,8 @@ return -EINVAL; *sync_count = lc->sync_count; + LOG_DBG("sync_count = %llu", *sync_count); + tfr->data_size = sizeof(*sync_count); return 0; @@ -1347,6 +1338,17 @@ return 0; } +int log_get_state(struct clog_tfr *tfr) +{ + struct log_c *lc; + + lc = get_log(tfr->uuid); + if (!lc) + return -EINVAL; + + return lc->state; +} + int log_status(void) { int found = 0; @@ -1380,3 +1382,4 @@ } return found; } + --- cluster/cmirror/src/Attic/functions.h 2007/08/23 19:57:31 1.1.2.1 +++ cluster/cmirror/src/Attic/functions.h 2007/11/05 22:44:03 1.1.2.2 @@ -3,9 +3,14 @@ #include +#define LOG_RESUMED 1 +#define LOG_SUSPENDED 2 + +int local_resume(struct clog_tfr *tfr); int do_request(struct clog_tfr *tfr); int commit_log(struct clog_tfr *tfr); int store_bits(const char *uuid, const char 
*which, char **buf); int load_bits(const char *uuid, const char *which, char *buf, int size); +int log_get_state(struct clog_tfr *tfr); int log_status(void); #endif /* __CLOG_FUNCTIONS_DOT_H__ */ --- cluster/cmirror/src/Attic/link_mon.c 2007/08/23 19:57:31 1.1.2.1 +++ cluster/cmirror/src/Attic/link_mon.c 2007/11/05 22:44:03 1.1.2.2 @@ -102,8 +102,6 @@ /* FIXME: handle POLLHUP */ for (i = 0; i < used_pfds; i++) if (pfds[i].revents & POLLIN) { - LOG_DBG("Data waiting on file descriptor, %d.", - pfds[i].fd); /* FIXME: Add this back return 1;*/ r++; } --- cluster/cmirror/src/Attic/local.c 2007/11/03 18:37:48 1.1.2.2 +++ cluster/cmirror/src/Attic/local.c 2007/11/05 22:44:03 1.1.2.3 @@ -170,7 +170,7 @@ * component to join the CPG, and a cluster component * to handle the request. */ - r = do_request(tfr); + r = local_resume(tfr); if (r) { LOG_DBG("Returning failed request to kernel [%s]", RQ_TYPE(tfr->request_type));