From mboxrd@z Thu Jan 1 00:00:00 1970 From: lhh@sourceware.org Date: 7 Aug 2006 22:05:03 -0000 Subject: [Cluster-devel] cluster/rgmanager ChangeLog include/cman-priva ... Message-ID: <20060807220503.7437.qmail@sourceware.org> List-Id: To: cluster-devel.redhat.com MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit CVSROOT: /cvs/cluster Module name: cluster Changes by: lhh at sourceware.org 2006-08-07 22:05:02 Modified files: rgmanager : ChangeLog rgmanager/include: cman-private.h message.h rgmanager/src/clulib: alloc.c cman.c message.c msg_cluster.c msgtest.c vft.c rgmanager/src/daemons: main.c rg_forward.c rg_state.c rgmanager/src/utils: clustat.c Added files: rgmanager/src/clulib: ckpt_state.c Log message: * src/clulib/ckpt_state.c: Preliminary implementation of replacement for VF using AIS CKPT B.02.01 (w/ built-in test program) * include/cman-private.h: Clean up APIs (cman APIs return cman_handle_t, which is void*, should be using void ** all over) * include/message.h: Bump context count to 128, add destination node ID in header of packets. * src/clulib/alloc.c: If we alloc the same size, return the same block * src/clulib/cman.c: API cleanups * src/clulib/message.c: Add error checking to msg_print * src/clulib/msg_cluster.c: Check destination in header before processing message; remove dup #define for MAX_CONTEXTS, add proto_error() macro for displaying protocol errors. Use 'max' instead of 'fd' for select(). Use correct var when assigning contexts. Fix CMAN handles. Return correct size from msg_send() requests. * src/clulib/msgtest.c: Fix CMAN handles * src/clulib/vft.c: Don't handle VF_CURRENT inside comms thread * src/daemons/main.c: Check to see if nodes are listening on our port before we consider them running. Handle VF_CURRENT requests from other nodes. Fail if we can't determine local node ID * src/daemons/rg_forward.c: Give 10 minutes for responses to forwarded requests. 
* src/daemons/rg_state.c: Shorten RG state names. Fix 'Uncertain' output line. * src/utils/clustat.c: Fix ccs_member_list() function. Patches: http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/ChangeLog.diff?cvsroot=cluster&r1=1.17&r2=1.18 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/include/cman-private.h.diff?cvsroot=cluster&r1=1.1&r2=1.2 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/include/message.h.diff?cvsroot=cluster&r1=1.2&r2=1.3 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/ckpt_state.c.diff?cvsroot=cluster&r1=NONE&r2=1.1 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/alloc.c.diff?cvsroot=cluster&r1=1.8&r2=1.9 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/cman.c.diff?cvsroot=cluster&r1=1.1&r2=1.2 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/message.c.diff?cvsroot=cluster&r1=1.2&r2=1.3 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msg_cluster.c.diff?cvsroot=cluster&r1=1.1&r2=1.2 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msgtest.c.diff?cvsroot=cluster&r1=1.1&r2=1.2 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/vft.c.diff?cvsroot=cluster&r1=1.14&r2=1.15 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/main.c.diff?cvsroot=cluster&r1=1.26&r2=1.27 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_forward.c.diff?cvsroot=cluster&r1=1.5&r2=1.6 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_state.c.diff?cvsroot=cluster&r1=1.18&r2=1.19 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/utils/clustat.c.diff?cvsroot=cluster&r1=1.18&r2=1.19 --- cluster/rgmanager/ChangeLog 2006/06/02 17:37:10 1.17 +++ cluster/rgmanager/ChangeLog 2006/08/07 22:05:01 1.18 @@ -1,3 +1,31 @@ +2006-08-07 Lon Hohberger + * src/clulib/ckpt_state.c: Preliminary implementation of replacement + for VF using AIS 
CKPT B.02.01 (w/ built-in test program) + * include/cman-private.h: Clean up APIs (cman APIs return + cman_handle_t, which is void*, should be using void ** all over) + * include/message.h: Bump context count to 128, add destination + node ID in header of packets. + * src/clulib/alloc.c: If we alloc the same size, return the same + block + * src/clulib/cman.c: API cleanups + * src/clulib/message.c: Add error checking to msg_print + * src/clulib/msg_cluster.c: Check destination in header before + processing message remove dup #define for MAX_CONTEXTS, add + proto_error() macro for displaying protocol errors. Use 'max' + instead of 'fd' for select(). Use correct var when assigning + contexts. Fix CMAN handles. Return correct size from msg_send() + requests. + * src/clulib/msgtest.c: Fix CMAN handles + * src/clulib/vft.c: Don't handle VF_CURRENT inside comms thread + * src/daemons/main.c: Check to see if nodes are listening on our + port before we consider them running. Handle VF_CURRENT requests + from other nodes. Fail if we can't determine local node ID + * src/daemons/rg_forward.c: Give 10 minutes for responses to + forwarded requests. + * src/daemons/rg_state.c: Shorten RG state names. Fix 'Uncertain' + output line. + * src/utils/clustat.c: Fix ccs_member_list() function. 
+ 2006-05-23 Lon Hohberger * src/daemons/members.c: Zap pad fields on copy-out * src/daemons/main.c: Give notice if skipping an event because of --- cluster/rgmanager/include/cman-private.h 2006/07/12 14:04:06 1.1 +++ cluster/rgmanager/include/cman-private.h 2006/08/07 22:05:01 1.2 @@ -3,11 +3,11 @@ #include -int cman_init_subsys(cman_handle_t *ch); -cman_handle_t *cman_lock(int block, int sig); -cman_handle_t *cman_lock_preemptible(int block, int *fd); +int cman_init_subsys(cman_handle_t ch); +cman_handle_t cman_lock(int block, int sig); +cman_handle_t cman_lock_preemptible(int block, int *fd); int cman_cleanup_subsys(void); -int cman_unlock(cman_handle_t *ch); +int cman_unlock(cman_handle_t ch); int cman_send_data_unlocked(void *buf, int len, int flags, uint8_t port, int nodeid); --- cluster/rgmanager/include/message.h 2006/07/12 14:04:06 1.2 +++ cluster/rgmanager/include/message.h 2006/08/07 22:05:01 1.3 @@ -30,14 +30,17 @@ /* Header is never presented to applications */ typedef struct { - uint32_t dest_ctx; uint32_t src_ctx; - /* 8 */ uint32_t src_nodeid; + /* 8 */ + uint32_t dest_ctx; + uint32_t dest_nodeid; + /* 16 */ uint8_t msg_control; uint8_t msg_port; uint8_t pad[2]; - /* 16 */ + /* 20 */ + uint8_t msg_reserved[12]; } cluster_msg_hdr_t; /* Header is never presented to applications */ @@ -62,6 +65,7 @@ swab32((ptr)->dest_ctx);\ swab32((ptr)->src_ctx);\ swab32((ptr)->src_nodeid);\ + swab32((ptr)->dest_nodeid);\ } @@ -123,7 +127,7 @@ /* Ripped from ccsd's setup_local_socket */ -#define MAX_CONTEXTS 32 /* Testing; production should be 1024-ish */ +#define MAX_CONTEXTS 128 /* Testing; production should be 1024-ish */ #define SKF_LISTEN (1<<0) #define SKF_READ (1<<1) /cvs/cluster/cluster/rgmanager/src/clulib/ckpt_state.c,v --> standard output revision 1.1 --- cluster/rgmanager/src/clulib/ckpt_state.c +++ - 2006-08-07 22:05:02.499001000 +0000 @@ -0,0 +1,550 @@ +/* + Copyright Red Hat, Inc. 
2002-2006 + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; either version 2, or (at your option) any + later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 675 Mass Ave, Cambridge, + MA 02139, USA. +*/ +//#define DEBUG +/** @file + * Distributed states using saCkpt interface + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +typedef struct _key_node { + struct _key_node *kn_next; + char *kn_keyid; + SaTimeT kn_timeout; + uint16_t kn_ready; + SaNameT kn_cpname; + SaCkptCheckpointHandleT kn_cph; +} key_node_t; + + +static key_node_t *key_list = NULL; +static SaCkptHandleT ds_ckpt; +static int ds_ready = 0; +static pthread_mutex_t ds_mutex = PTHREAD_MUTEX_INITIALIZER; + + +int ais_to_posix(SaAisErrorT err); + + +static key_node_t * +kn_find_key(char *keyid) +{ + key_node_t *cur; + + for (cur = key_list; cur; cur = cur->kn_next) + if (!strcmp(cur->kn_keyid,keyid)) + return cur; + + return NULL; +} + + +/** + * Adds a key to key node list and sets up callback functions. 
+ */ +static SaAisErrorT +ds_key_init_nt(char *keyid, int maxsize, int timeout) +{ + SaCkptCheckpointCreationAttributesT attrs; + SaCkptCheckpointOpenFlagsT flags; + SaCkptCheckpointDescriptorT status; + SaAisErrorT err; + key_node_t *newnode = NULL; + + newnode = kn_find_key(keyid); + if (newnode) { + printf("Key %s already initialized\n", keyid); + return SA_AIS_OK; + } + + newnode = malloc(sizeof(*newnode)); + memset(newnode,0,sizeof(*newnode)); + snprintf((char *)newnode->kn_cpname.value, SA_MAX_NAME_LENGTH-1, + "%s", keyid); + newnode->kn_cpname.length = strlen(keyid); + newnode->kn_keyid = (char *)newnode->kn_cpname.value; + newnode->kn_ready = 0; + + if (timeout < 5) { + /* Join View message timeout must exceed the + coordinator timeout */ + timeout = 5; + } + newnode->kn_timeout = timeout * SA_TIME_ONE_SECOND; + + flags = SA_CKPT_CHECKPOINT_READ | + SA_CKPT_CHECKPOINT_WRITE; + + err = saCkptCheckpointOpen(ds_ckpt, + &newnode->kn_cpname, + NULL, + flags, + newnode->kn_timeout, + &newnode->kn_cph); + + if (err == SA_AIS_OK) { + saCkptCheckpointStatusGet(newnode->kn_cph, + &status); + + printf("Checkpoint Size = %d bytes\n", (int) + status.checkpointCreationAttributes.checkpointSize); + printf("Flags = "); + if (status.checkpointCreationAttributes.creationFlags & + SA_CKPT_WR_ALL_REPLICAS) { + printf("%s ", "SA_CKPT_WR_ALL_REPLICAS"); + } + if (status.checkpointCreationAttributes.creationFlags & + SA_CKPT_WR_ACTIVE_REPLICA) { + printf("%s ", "SA_CKPT_WR_ACTIVE_REPLICA"); + } + if (status.checkpointCreationAttributes.creationFlags & + SA_CKPT_WR_ACTIVE_REPLICA_WEAK) { + printf("%s ", "SA_CKPT_WR_ACTIVE_REPLICA_WEAK"); + } + if (status.checkpointCreationAttributes.creationFlags & + SA_CKPT_CHECKPOINT_COLLOCATED) { + printf("%s ", "SA_CKPT_CHECKPOINT_COLLOCATED"); + } + printf("\nMax sections = %d\n", + (int)status.checkpointCreationAttributes.maxSections); + printf("Max section size = %d\n", + (int)status.checkpointCreationAttributes.maxSectionSize); + 
printf("Max section ID size = %d\n", + (int)status.checkpointCreationAttributes.maxSectionIdSize); + printf("Section count = %d\n", status.numberOfSections); + printf("\n"); + + goto good; + } + + printf("Retrying w/ create\n"); + + attrs.creationFlags = SA_CKPT_WR_ALL_REPLICAS; + attrs.checkpointSize = (SaSizeT)maxsize; + attrs.retentionDuration = SA_TIME_ONE_HOUR; + attrs.maxSections = 1; + attrs.maxSectionSize = (SaSizeT)maxsize; + attrs.maxSectionIdSize = (SaSizeT)32; + + flags = SA_CKPT_CHECKPOINT_READ | + SA_CKPT_CHECKPOINT_WRITE | + SA_CKPT_CHECKPOINT_CREATE; + + err = saCkptCheckpointOpen(ds_ckpt, + &newnode->kn_cpname, + &attrs, + flags, + newnode->kn_timeout, + &newnode->kn_cph); + if (err == SA_AIS_OK) + goto good; + + /* No checkpoint */ + free(newnode); + return err; +good: + + newnode->kn_ready = 1; + newnode->kn_next = key_list; + key_list = newnode; + printf("Opened ckpt %s\n", keyid); + + return err; +} + + +int +ds_key_init(char *keyid, int maxsize, int timeout) +{ + SaAisErrorT err; + + pthread_mutex_lock(&ds_mutex); + err = ds_key_init_nt(keyid, maxsize, timeout); + pthread_mutex_unlock(&ds_mutex); + + errno = ais_to_posix(err); + if (errno) + return -1; + return 0; +} + + +static SaAisErrorT +ds_key_cleanup(key_node_t *node) +{ + if (!node || !node->kn_ready) { + printf("Key %s already freed\n", node->kn_keyid); + return SA_AIS_OK; + } + + return saCkptCheckpointClose(node->kn_cph); +} + + + +static SaAisErrorT +ds_key_finish_nt(char *keyid) +{ + key_node_t *node; + + node = kn_find_key(keyid); + /* TODO: Free list entry */ + + return ds_key_cleanup(node); +} + + +int +ds_key_finish(char *keyid) +{ + SaAisErrorT err; + + pthread_mutex_lock(&ds_mutex); + err = ds_key_finish_nt(keyid); + pthread_mutex_unlock(&ds_mutex); + + errno = ais_to_posix(err); + if (errno) + return -1; + return 0; +} + + + +static void +open_callback(SaInvocationT invocation, + SaCkptCheckpointHandleT handle, + SaAisErrorT error) +{ + /* Do Open callback here. 
Since we use sync calls instead + of async calls, this is never used. */ +} + + +static void +sync_callback(SaInvocationT invocation, + SaAisErrorT error) +{ + /* Do Sync callback here. Since we use sync calls instead + of async calls, this is never used. */ +} + + +int +ais_to_posix(SaAisErrorT err) +{ + switch (err) { + case SA_AIS_OK: + return 0; + case SA_AIS_ERR_LIBRARY: + return ELIBBAD; + case SA_AIS_ERR_VERSION: + return EPROTONOSUPPORT; //XXX + case SA_AIS_ERR_INIT: + return EFAULT; //XXX + case SA_AIS_ERR_TIMEOUT: + return ETIMEDOUT; + case SA_AIS_ERR_TRY_AGAIN: + return EAGAIN; + case SA_AIS_ERR_INVALID_PARAM: + return EINVAL; + case SA_AIS_ERR_NO_MEMORY: + return ENOMEM; + case SA_AIS_ERR_BAD_HANDLE: + return EBADF; + case SA_AIS_ERR_BUSY: + return EBUSY; + case SA_AIS_ERR_ACCESS: + return EACCES; + case SA_AIS_ERR_NOT_EXIST: + return ENOENT; + case SA_AIS_ERR_NAME_TOO_LONG: + return ENAMETOOLONG; + case SA_AIS_ERR_EXIST: + return EEXIST; + case SA_AIS_ERR_NO_SPACE: + return ENOSPC; + case SA_AIS_ERR_INTERRUPT: + return EINTR; + case SA_AIS_ERR_NAME_NOT_FOUND: + return ENOENT; + case SA_AIS_ERR_NO_RESOURCES: + return ENOMEM; //XXX + case SA_AIS_ERR_NOT_SUPPORTED: + return ENOSYS; + case SA_AIS_ERR_BAD_OPERATION: + return EINVAL; //XXX + case SA_AIS_ERR_FAILED_OPERATION: + return EIO; //XXX + case SA_AIS_ERR_MESSAGE_ERROR: + return EIO; // XXX + case SA_AIS_ERR_QUEUE_FULL: + return ENOBUFS; + case SA_AIS_ERR_QUEUE_NOT_AVAILABLE: + return ENOENT; + case SA_AIS_ERR_BAD_FLAGS: + return EINVAL; + case SA_AIS_ERR_TOO_BIG: + return E2BIG; + case SA_AIS_ERR_NO_SECTIONS: + return ENOENT; // XXX + } + + return -1; +} + + +int +ds_init(void) +{ + int ret = 0; + SaAisErrorT err; + SaVersionT ver; + SaCkptCallbacksT callbacks; + + pthread_mutex_lock(&ds_mutex); + if (ds_ready) { + pthread_mutex_unlock(&ds_mutex); + return 0; + } + + ver.releaseCode = 'B'; + ver.majorVersion = 1; + ver.minorVersion = 1; + + callbacks.saCkptCheckpointOpenCallback = open_callback; + 
callbacks.saCkptCheckpointSynchronizeCallback = sync_callback; + + err = saCkptInitialize(&ds_ckpt, &callbacks, &ver); + + if (err != SA_AIS_OK) + ret = -1; + else + ds_ready= 1; + + pthread_mutex_unlock(&ds_mutex); + + if (ret != 0) + errno = ais_to_posix(err); + return ret; +} + + +int +ds_write(char *keyid, void *buf, size_t maxlen) +{ + key_node_t *node; + SaCkptIOVectorElementT iov = {SA_CKPT_DEFAULT_SECTION_ID, + NULL, 0, 0, 0}; + SaAisErrorT err; + + //printf("writing to ckpt %s\n", keyid); + + pthread_mutex_lock(&ds_mutex); + + while ((node = kn_find_key(keyid)) == NULL) { + + err = ds_key_init_nt(keyid, + (maxlen>DS_MIN_SIZE?maxlen:DS_MIN_SIZE), 5); + if (err != SA_AIS_OK) + goto out; + } + + iov.dataBuffer = buf; + iov.dataSize = (SaSizeT)maxlen; + iov.dataOffset = 0; + iov.readSize = 0; + + err = saCkptCheckpointWrite(node->kn_cph, &iov, 1, NULL); + + if (err == SA_AIS_OK) + saCkptCheckpointSynchronize(node->kn_cph, node->kn_timeout); + +out: + pthread_mutex_unlock(&ds_mutex); + + errno = ais_to_posix(err); + if (errno) + return -1; + return maxlen; /* XXX */ +} + + +int +ds_read(char *keyid, void *buf, size_t maxlen) +{ + key_node_t *node; + SaCkptIOVectorElementT iov = {SA_CKPT_DEFAULT_SECTION_ID, + NULL, 0, 0, 0}; + SaAisErrorT err; + + //printf("reading ckpt %s\n", keyid); + + pthread_mutex_lock(&ds_mutex); + + node = kn_find_key(keyid); + if (!node) { + pthread_mutex_unlock(&ds_mutex); + errno = ENOENT; + return -1; + } + + iov.dataBuffer = buf; + iov.dataSize = (SaSizeT)maxlen; + iov.dataOffset = 0; + iov.readSize = 0; + + err = saCkptCheckpointRead(node->kn_cph, &iov, 1, NULL); + + pthread_mutex_unlock(&ds_mutex); + + errno = ais_to_posix(err); + if (errno) + return -1; + return iov.readSize; /* XXX */ +} + + +int +ds_finish(void) +{ + int ret = 0; + SaAisErrorT err; + key_node_t *node; + + pthread_mutex_lock(&ds_mutex); + if (!ds_ready) { + pthread_mutex_unlock(&ds_mutex); + return 0; + } + + /* Zap all the checkpoints */ + for (node = key_list; 
node; node = node->kn_next) { + ds_key_cleanup(node); + } + + err = saCkptFinalize(ds_ckpt); + + if (err != SA_AIS_OK) + ret = -1; + else + ds_ready = 0; + + pthread_mutex_unlock(&ds_mutex); + + if (ret != 0) + errno = ais_to_posix(err); + return ret; +} + + +#ifdef STANDALONE +void +usage(int ret) +{ + printf("usage: ckpt <-r key|-w key -d data>\n"); + exit(ret); +} + +int +main(int argc, char **argv) +{ + char *keyid = "testing"; + char *val; + char buf[DS_MIN_SIZE]; + int ret; + int op = 0; + + while((ret = getopt(argc, argv, "w:r:d:j?")) != EOF) { + switch(ret) { + case 'w': + op = 'w'; + keyid = optarg; + break; + case 'r': + op = 'r'; + keyid = optarg; + break; + case 'd': + val = optarg; + break; + case '?': + case 'h': + usage(0); + default: + usage(1); + } + } + + if (!op) { + usage(1); + } + + if (!keyid) { + usage(1); + } + + if (ds_init() < 0) { + perror("ds_init"); + return 1; + } + + if (ds_key_init(keyid, DS_MIN_SIZE, 5) < 0) { + perror("ds_key_init"); + return 1; + } + + if (op == 'w') { + if (ds_write(keyid, val, strlen(val)+1) < 0) { + perror("ds_write"); + return 1; + } + } else if (op == 'r') { + ret = ds_read(keyid, buf, sizeof(buf)); + if (ret < 0) { + perror("ds_write"); + return 1; + } + + printf("%d bytes\nDATA for '%s':\n%s\n", ret, keyid, + buf); + } + + ds_key_finish(keyid); + + if (ds_finish() < 0) { + perror("ds_finish"); + return 0; + } + + return 0; +} +#endif --- cluster/rgmanager/src/clulib/alloc.c 2006/07/11 23:52:41 1.8 +++ cluster/rgmanager/src/clulib/alloc.c 2006/08/07 22:05:01 1.9 @@ -885,6 +885,12 @@ #endif void *newp; + if (oldp) { + oldb = block(oldp); + if (newsize <= oldb->mb_size) + return oldp; + } + newp = malloc(newsize); if (!newp) { --- cluster/rgmanager/src/clulib/cman.c 2006/07/11 23:52:41 1.1 +++ cluster/rgmanager/src/clulib/cman.c 2006/08/07 22:05:01 1.2 @@ -32,7 +32,7 @@ #include #include -static cman_handle_t *_chandle = NULL; +static cman_handle_t _chandle = NULL; static pthread_mutex_t _chandle_lock = 
PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t _chandle_cond = PTHREAD_COND_INITIALIZER; static pthread_t _chandle_holder = 0; @@ -60,7 +60,7 @@ @return NULL / errno on failure; the global CMAN handle on success. */ -cman_handle_t * +cman_handle_t cman_lock(int block, int preempt) { pthread_t tid; @@ -110,7 +110,7 @@ @return NULL / errno on failure; the global CMAN handle on success. */ -cman_handle_t * +cman_handle_t cman_lock_preemptible(int block, int *preempt_fd) { pthread_t tid; @@ -160,7 +160,7 @@ @return -1 on failure, 0 on success */ int -cman_unlock(cman_handle_t *ch) +cman_unlock(cman_handle_t ch) { int ret = -1; char c; --- cluster/rgmanager/src/clulib/message.c 2006/07/11 23:52:41 1.2 +++ cluster/rgmanager/src/clulib/message.c 2006/08/07 22:05:01 1.3 @@ -251,8 +251,17 @@ void msg_print(msgctx_t *ctx) { + if (!ctx) { + printf("Attempt to call %s on NULL\n", __FUNCTION__); + return; + } + if (ctx->ops && ctx->ops->mo_print) return ctx->ops->mo_print(ctx); + + printf("Warning: Attempt to call %s on uninitialized context %p\n", + __FUNCTION__, ctx); + printf(" ctx->type = %d\n", ctx->type); } --- cluster/rgmanager/src/clulib/msg_cluster.c 2006/07/11 23:52:41 1.1 +++ cluster/rgmanager/src/clulib/msg_cluster.c 2006/08/07 22:05:01 1.2 @@ -31,10 +31,10 @@ #include #include #include +#include #include /* Ripped from ccsd's setup_local_socket */ -#define MAX_CONTEXTS 32 /* Testing; production should be 1024-ish */ int cluster_msg_close(msgctx_t *ctx); @@ -55,6 +55,24 @@ (ctx->u.local_info.sockfd != -1))) static msg_ops_t cluster_msg_ops; +static void cluster_msg_print(msgctx_t *ctx); + + +#define proto_error(ctx, msg, str) \ +do { \ + printf("<<< CUT HERE >>>\n"); \ + printf("[%d] PROTOCOL ERROR in %s: %s\n", gettid(), __FUNCTION__, str); \ + msg_print(ctx); \ + if (msg) { \ + printf(" msg->msg_control = %d\n", ((cluster_msg_hdr_t *)msg)->msg_control); \ + printf(" msg->src_ctx = %d\n", ((cluster_msg_hdr_t *)msg)->src_ctx); \ + printf(" msg->dest_ctx = %d\n", 
((cluster_msg_hdr_t *)msg)->dest_ctx); \ + printf(" msg->src_nodeid = %d\n", ((cluster_msg_hdr_t *)msg)->src_nodeid); \ + printf(" msg->msg_port = %d\n", ((cluster_msg_hdr_t *)msg)->msg_port); \ + } \ + printf(">>> CUT HERE <<<\n"); \ +} while(0) + static int @@ -76,6 +94,7 @@ } h->msg_control = M_DATA; + h->dest_nodeid = ctx->u.cluster_info.nodeid; h->src_ctx = ctx->u.cluster_info.local_ctx; h->dest_ctx = ctx->u.cluster_info.remote_ctx; h->msg_port = ctx->u.cluster_info.port; @@ -84,7 +103,8 @@ /* printf("sending cluster message, length = %d to nodeid %d port %d\n", - len + sizeof(*h), ctx->u.cluster_info.nodeid, ctx->u.cluster_info.port); + len + sizeof(*h), ctx->u.cluster_info.nodeid, + ctx->u.cluster_info.port); */ swab_cluster_msg_hdr_t(h); @@ -93,7 +113,14 @@ ctx->u.cluster_info.nodeid, ctx->u.cluster_info.port, 0); - return len + sizeof(h); + if (ret < 0) + return ret; + + if (ret >= (len + sizeof(*h))) + return len; + + errno = EAGAIN; + return -1; } @@ -116,8 +143,8 @@ context_index = 1; if (!contexts[context_index]) { - contexts[start] = ctx; - ctx->u.cluster_info.local_ctx = start; + contexts[context_index] = ctx; + ctx->u.cluster_info.local_ctx = context_index; pthread_mutex_unlock(&context_lock); return 0; } @@ -141,7 +168,7 @@ int fd, lfd, max; struct timeval tv; struct timeval *p = NULL; - cman_handle_t *ch; + cman_handle_t ch; if (timeout >= 0) { p = &tv; @@ -171,7 +198,7 @@ FD_SET(lfd, &rfds); max = (lfd > fd ? 
lfd : fd); - if (select(fd + 1, &rfds, NULL, NULL, p) > 0) { + if (select(max + 1, &rfds, NULL, NULL, p) > 0) { /* Someone woke us up */ if (FD_ISSET(lfd, &rfds)) { cman_unlock(ch); @@ -199,6 +226,7 @@ int ret; cm.msg_control = (uint8_t)type; + cm.dest_nodeid = ctx->u.cluster_info.nodeid; cm.src_nodeid = _me; cm.dest_ctx = ctx->u.cluster_info.remote_ctx; cm.src_ctx = ctx->u.cluster_info.local_ctx; @@ -515,7 +543,15 @@ free(n); return 0; default: - printf("PROTOCOL ERROR: Received %d\n", req); + pthread_mutex_lock(&ctx->u.cluster_info.mutex); + n = ctx->u.cluster_info.queue; + list_remove(&ctx->u.cluster_info.queue, n); + pthread_mutex_unlock(&ctx->u.cluster_info.mutex); + + proto_error(ctx, n->message, "Illegal request on established pchannel"); + if (n->message) + free(n->message); + free(n); return -1; } @@ -570,6 +606,7 @@ /* Send open */ //printf(" Sending control message M_OPEN\n"); if (cluster_send_control_msg(ctx, M_OPEN) < 0) { + printf("Error sending control message\n"); return -1; } @@ -590,8 +627,7 @@ case M_NONE: continue; default: - printf("PROTO ERROR: M_OPEN_ACK not received: %d %d\n", - ret, errno); + proto_error(ctx, NULL, "M_OPEN_ACK not received\n"); } } @@ -630,6 +666,7 @@ return -1; } + pthread_mutex_lock(&context_lock); /* Other threads should not be able to see this again */ if (contexts[ctx->u.cluster_info.local_ctx] && @@ -663,6 +700,8 @@ } ctx->type = MSG_NONE; ctx->ops = NULL; + + return 0; } @@ -672,6 +711,11 @@ { msg_q_t *node; + if (ctx->type == MSG_NONE) { + printf("Queue_for_context called w/o valid context\n"); + raise(SIGSEGV); + } + while ((node = malloc(sizeof(*node))) == NULL) { sleep(1); } @@ -744,6 +788,14 @@ return; } + if (m->dest_nodeid != 0 && m->dest_nodeid != _me) { +#ifdef DEBUG + printf("Skipping message meant for node %d (I am %d)\n", + m->dest_nodeid, _me); +#endif + return; + } + pthread_mutex_lock(&context_lock); if (m->dest_ctx == 0 && m->msg_control == M_DATA) { @@ -762,7 +814,19 @@ queue_for_context(contexts[x], 
buf, len); } } else if (contexts[m->dest_ctx]) { - /* Normal receive */ + +#if 0 + if (m->msg_control == M_OPEN_ACK) { + for (x = 0; x < MAX_CONTEXTS; x++) { + if (contexts[x] && + contexts[x]->dest_ctx == m->src_ctx) { + proto_error(contexts[x], m, + "Duplicate M_OPEN_ACK"); + } + } + } +#endif + queue_for_context(contexts[m->dest_ctx], buf, len); } /* If none of the above, then we msg for something we've already @@ -964,9 +1028,9 @@ { int e; pthread_attr_t attrs; - cman_handle_t *ch = NULL; + cman_handle_t ch = NULL; msgctx_t *ctx; - int port; + uint8_t port; errno = EINVAL; if (!portp) @@ -988,12 +1052,12 @@ memset(contexts, 0, sizeof(contexts)); - *cluster_ctx = ctx; - if (cman_start_recv_data(ch, process_cman_msg, - port) != 0) { + if (cman_start_recv_data(ch, process_cman_msg, port) != 0) { e = errno; cman_unlock(ch); - msg_free_ctx((msgctx_t *)*cluster_ctx); + msg_free_ctx((msgctx_t *)ctx); + + printf("Doom\n"); errno = e; return -1; } @@ -1001,8 +1065,9 @@ if (cman_start_notification(ch, process_cman_event) != 0) { e = errno; cman_unlock(ch); - msg_free_ctx((msgctx_t *)*cluster_ctx); + msg_free_ctx((msgctx_t *)ctx); errno = e; + return -1; } cman_unlock(ch); @@ -1027,6 +1092,8 @@ ctx->flags = SKF_LISTEN | SKF_READ | SKF_WRITE | SKF_MCAST; pthread_mutex_unlock(&context_lock); + *cluster_ctx = ctx; + pthread_attr_init(&attrs); pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED); pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED); @@ -1057,7 +1124,7 @@ int cluster_msg_shutdown(void) { - cman_handle_t *ch; + cman_handle_t ch; ch = cman_lock(1, SIGUSR2); cman_end_recv_data(ch); --- cluster/rgmanager/src/clulib/msgtest.c 2006/07/11 23:52:41 1.1 +++ cluster/rgmanager/src/clulib/msgtest.c 2006/08/07 22:05:01 1.2 @@ -28,7 +28,7 @@ #include #include -#define MYPORT 190 +#define MYPORT 67 int my_node_id = 0; int running = 1; @@ -121,7 +121,7 @@ void -clu_initialize(cman_handle_t **ch) +clu_initialize(cman_handle_t *ch) { if (!ch) exit(1); @@ -198,23 
+198,34 @@ pthread_t piggy, priv; fd_set rfds; int max = 0; - int port = MYPORT; - cman_handle_t *clu = NULL; + uint8_t port = MYPORT; + cman_handle_t clu = NULL; clu_initialize(&clu); + if (clu == NULL) { + printf("Failed to connect to CMAN\n"); + } + if (cman_init_subsys(clu) < 0) { perror("cman_init_subsys"); return -1; } - cman_get_node(clu, CMAN_NODEID_US, &me); + + memset(&me, 0, sizeof(me)); + + if (cman_get_node(clu, CMAN_NODEID_US, &me) < 0) { + perror("cman_get_node"); + return -1; + } my_node_id = me.cn_nodeid; + printf("I am node ID %d\n", my_node_id); - if (msg_listen(MSG_CLUSTER, (void *)&port, - me.cn_nodeid, &cluster_ctx) < 0) { - printf("Couldn't set up cluster message system\n"); + if (msg_listen(MSG_CLUSTER, (void *)&port, me.cn_nodeid, &cluster_ctx) < 0) { + printf("Couldn't set up cluster message system: %s\n", + strerror(errno)); return -1; } --- cluster/rgmanager/src/clulib/vft.c 2006/07/11 23:52:41 1.14 +++ cluster/rgmanager/src/clulib/vft.c 2006/08/07 22:05:01 1.15 @@ -819,7 +819,8 @@ } swab_generic_msg_hdr(hdrp); - if (hdrp->gh_command == VF_MESSAGE) { + if (hdrp->gh_command == VF_MESSAGE && + hdrp->gh_arg1 != VF_CURRENT) { if (vf_process_msg(ctx, 0, hdrp, n) == VFR_COMMIT) { #ifdef DEBUG printf("VFT: View committed\n"); --- cluster/rgmanager/src/daemons/main.c 2006/07/11 23:52:41 1.26 +++ cluster/rgmanager/src/daemons/main.c 2006/08/07 22:05:01 1.27 @@ -58,6 +58,7 @@ int shutdown_pending = 0, running = 1, need_reconfigure = 0; char debug = 0; /* XXX* */ static int signalled = 0; +static int port = RG_PORT; uint64_t next_node_id(cluster_member_list_t *membership, uint64_t me); int rg_event_q(char *svcName, uint32_t state, int owner); @@ -135,6 +136,38 @@ old_membership = member_list(); new_ml = get_member_list(h); + + for (x = 0; x < new_ml->cml_count; x++) { + + if (new_ml->cml_members[x].cn_member == 0) + continue; + if (new_ml->cml_members[x].cn_nodeid == my_id()) + continue; + + do { + quorate = cman_is_listening(h, + 
new_ml->cml_members[x].cn_nodeid, + port); + if (quorate == 0) { + clulog(LOG_DEBUG, "Node %d is not listening\n", + new_ml->cml_members[x].cn_nodeid); + new_ml->cml_members[x].cn_member = 0; + } else if (quorate == -1 && errno == EBUSY) { + usleep(50000); + continue; + } + + if (quorate < 0) { + perror("cman_is_listening"); + } + + if (quorate > 0) { + printf("Node %d IS listenin\n", new_ml->cml_members[x].cn_nodeid); + } + + } while(0); + } + member_list_update(new_ml); cman_finish(h); @@ -295,11 +328,21 @@ { int ret; char state; +#ifdef OPENAIS + msgctx_t everyone; +#else cluster_member_list_t *m = member_list(); +#endif state = (req==RG_LOCK)?1:0; + +#ifdef OPENAIS + ret = ds_write("rg_lockdown", &state, 1); + clulog(LOG_INFO, "FIXME: send RG_LOCK update to all!\n"); +#else ret = vf_write(m, VFF_IGN_CONN_ERRORS, "rg_lockdown", &state, 1); free_member_list(m); +#endif if (ret == 0) { msg_send_simple(ctx, RG_SUCCESS, 0, 0); @@ -331,7 +374,7 @@ memset(msgbuf, 0, sizeof(msgbuf)); /* Peek-a-boo */ - sz = msg_receive(ctx, msg_hdr, sizeof(msgbuf), 10); + sz = msg_receive(ctx, msg_hdr, sizeof(msgbuf), 1); if (sz < sizeof (generic_msg_hdr)) { clulog(LOG_ERR, "#37: Error receiving message header (%d)\n", sz); goto out; @@ -447,7 +490,10 @@ break; case VF_MESSAGE: - /* Ignore; our VF thread handles these */ + /* Ignore; our VF thread handles these + - except for VF_CURRENT XXX (bad design) */ + if (msg_hdr->gh_arg1 == VF_CURRENT) + vf_process_msg(ctx, 0, msg_hdr, sz); break; default: @@ -478,7 +524,6 @@ ret = msg_wait(ctx, 0); - switch(ret) { case M_PORTOPENED: msg_receive(ctx, NULL, 0, 0); @@ -706,7 +751,7 @@ void -clu_initialize(cman_handle_t **ch) +clu_initialize(cman_handle_t *ch) { if (!ch) exit(1); @@ -765,10 +810,8 @@ cman_node_t me; msgctx_t *cluster_ctx; msgctx_t *local_ctx; - int port = RG_PORT; pthread_t th; - - cman_handle_t *clu = NULL; + cman_handle_t clu = NULL; while ((rv = getopt(argc, argv, "fd")) != EOF) { switch (rv) { @@ -809,8 +852,16 @@ memset(&me, 
0, sizeof(me)); cman_get_node(clu, CMAN_NODEID_US, &me); + + if (me.cn_nodeid == 0) { + printf("Unable to determine local node ID\n"); + perror("cman_get_node"); + return -1; + } set_my_id(me.cn_nodeid); + clulog(LOG_INFO, "I am node #%d\n", my_id()); + /* We know we're quorate. At this point, we need to read the resource group trees from ccsd. @@ -842,7 +893,7 @@ return -1; } - if (msg_listen(MSG_CLUSTER, &port , me.cn_nodeid, &cluster_ctx) < 0) { + if (msg_listen(MSG_CLUSTER, &port, me.cn_nodeid, &cluster_ctx) < 0) { clulog(LOG_CRIT, "#10b: Couldn't set up cluster message system: %s\n", strerror(errno)); @@ -859,12 +910,21 @@ /* Initialize the VF stuff. */ - if (vf_init(me.cn_nodeid, RG_PORT, NULL, NULL) != 0) { +#ifdef OPENAIS + if (ds_init() < 0) { + clulog(LOG_CRIT, "#11b: Couldn't initialize SAI AIS CKPT\n"); + return -1; + } + + ds_key_init("rg_lockdown", 32, 10); +#else + if (vf_init(me.cn_nodeid, port, NULL, NULL) != 0) { clulog(LOG_CRIT, "#11: Couldn't set up VF listen socket\n"); return -1; } vf_key_init("rg_lockdown", 10, NULL, lock_commit_cb); +#endif /* Do everything useful --- cluster/rgmanager/src/daemons/rg_forward.c 2006/07/11 23:52:41 1.5 +++ cluster/rgmanager/src/daemons/rg_forward.c 2006/08/07 22:05:01 1.6 @@ -91,7 +91,7 @@ pthread_exit(NULL); } - if (msg_receive(&ctx, &msg, sizeof(msg),10) != sizeof(msg)) { + if (msg_receive(&ctx, &msg, sizeof(msg), 600) != sizeof(msg)) { msg_close(&ctx); msg_close(req->rr_resp_ctx); msg_free_ctx(req->rr_resp_ctx); --- cluster/rgmanager/src/daemons/rg_state.c 2006/07/19 18:43:32 1.18 +++ cluster/rgmanager/src/daemons/rg_state.c 2006/08/07 22:05:01 1.19 @@ -21,7 +21,11 @@ #include #include #include +#ifdef OPENAIS +#include +#else #include +#endif #include #include #include @@ -147,7 +151,7 @@ { char res[256]; - snprintf(res, sizeof(res), "usrm::rg=\"%s\"", name); + snprintf(res, sizeof(res), "rg=\"%s\"", name); return clu_lock(LKM_EXMODE, p, 0, res); } @@ -228,8 +232,9 @@ swab_SmMessageSt(msgp); 
msg_send(req->rr_resp_ctx, msgp, sizeof(*msgp)); - /* :) */ + /* :( */ msg_close(req->rr_resp_ctx); + msg_free_ctx(req->rr_resp_ctx); req->rr_resp_ctx = NULL; } @@ -237,19 +242,27 @@ int set_rg_state(char *name, rg_state_t *svcblk) { - cluster_member_list_t *membership; char res[256]; +#ifndef OPENAIS + cluster_member_list_t *membership; int ret; +#endif if (name) strncpy(svcblk->rs_name, name, sizeof(svcblk->rs_name)); + snprintf(res, sizeof(res), "rg=\"%s\"", name); +#ifdef OPENAIS + if (ds_write(res, svcblk, sizeof(*svcblk)) < 0) + return -1; + return 0; +#else membership = member_list(); - snprintf(res, sizeof(res), "usrm::rg=\"%s\"", name); ret = vf_write(membership, VFF_IGN_CONN_ERRORS, res, svcblk, sizeof(*svcblk)); free_member_list(membership); return ret; +#endif } @@ -272,18 +285,50 @@ { char res[256]; int ret; - void *data = NULL; - uint32_t datalen = 0; +#ifdef OPENAIS + char data[DS_MIN_SIZE]; + int datalen; +#else uint64_t viewno; + void *data = NULL; cluster_member_list_t *membership; + uint32_t datalen = 0; +#endif /* ... */ if (name) strncpy(svcblk->rs_name, name, sizeof(svcblk->rs_name)); - membership = member_list(); + snprintf(res, sizeof(res),"rg=\"%s\"", svcblk->rs_name); - snprintf(res, sizeof(res),"usrm::rg=\"%s\"", svcblk->rs_name); +#ifdef OPENAIS + while((datalen = ds_read(res, data, sizeof(data))) < 0) { + if (errno == ENOENT) { + ds_key_init(res, DS_MIN_SIZE, 10); + } else { + return -1; + } + } + + if (datalen < 0) { + + ret = init_rg(name, svcblk); + if (ret < 0) { + printf("Couldn't initialize rg %s!\n", name); + return RG_EFAIL; + } + + datalen = ds_read(res, &data, sizeof(data)); + if (ret < 0) { + printf("Couldn't reread rg %s! 
(%d)\n", name, ret); + return RG_EFAIL; + } + } + + memcpy(svcblk, data, sizeof(*svcblk)); + return 0; +#else + membership = member_list(); ret = vf_read(membership, res, &viewno, &data, &datalen); if (ret != VFR_OK || datalen == 0) { @@ -307,7 +352,7 @@ } } - if (datalen != sizeof(*svcblk)) { + if (datalen < sizeof(*svcblk)) { printf("Size mismatch; expected %d got %d\n", (int)sizeof(*svcblk), datalen); if (data) @@ -322,6 +367,7 @@ free_member_list(membership); return 0; +#endif } @@ -331,22 +377,32 @@ { char res[256]; int ret; +#ifdef OPENAIS + char data[1024]; + int datalen; +#else void *data = NULL; - uint32_t datalen = 0; uint64_t viewno; + uint32_t datalen; +#endif /* ... */ if (name) strncpy(svcblk->rs_name, name, sizeof(svcblk->rs_name)); - snprintf(res, sizeof(res),"usrm::rg=\"%s\"", svcblk->rs_name); + snprintf(res, sizeof(res),"rg=\"%s\"", svcblk->rs_name); + +#ifdef OPENAIS + ret = ds_read(res, data, sizeof(data)); + if (ret <= 0) { +#else ret = vf_read_local(res, &viewno, &data, &datalen); if (ret != VFR_OK || datalen == 0 || datalen != sizeof(*svcblk)) { if (data) free(data); - +#endif svcblk->rs_owner = 0; svcblk->rs_last_owner = 0; svcblk->rs_state = RG_STATE_UNINITIALIZED; @@ -359,7 +415,9 @@ /* Copy out the data. 
*/ memcpy(svcblk, data, sizeof(*svcblk)); +#ifndef OPENAIS free(data); +#endif return 0; } @@ -693,7 +751,7 @@ else svcStatus.rs_restarts = 0; - if (set_rg_state(svcName, &svcStatus) != 0) { + if (set_rg_state(svcName, &svcStatus) < 0) { clulog(LOG_ERR, "#47: Failed changing service status\n"); rg_unlock(&lockp); @@ -1318,9 +1376,8 @@ /* state uncertain */ free_member_list(allowed_nodes); clulog(LOG_DEBUG, "State Uncertain: svc:%s " - "nid:%08x%08x req:%d\n", svcName, - (uint32_t)(target>>32)&0xffffffff, - (uint32_t)(target&0xffffffff), request); + "nid:%08x req:%d\n", svcName, + target, request); return 0; case 0: *new_owner = target; --- cluster/rgmanager/src/utils/clustat.c 2006/07/11 23:52:41 1.18 +++ cluster/rgmanager/src/utils/clustat.c 2006/08/07 22:05:01 1.19 @@ -168,24 +168,31 @@ while ((ret = malloc(sizeof(*ret))) == NULL) sleep(1); - x = 1; - while (1) { + x = 0; + while (++x) { + name = NULL; snprintf(buf, sizeof(buf), "/cluster/clusternodes/clusternode[%d]/@name", x); if (ccs_get(desc, buf, &name) != 0) break; + if (!name) + break; + if (!strlen(name)) { + free(name); + continue; + } + if (!nodes) { nodes = malloc(x * sizeof(cman_node_t)); - if (!nodes ) { + if (!nodes) { perror("malloc"); ccs_disconnect(desc); exit(1); } - memset(nodes, 0, x * sizeof(cman_node_t)); } else { - nodes = realloc(ret, x * sizeof(cman_node_t)); + nodes = realloc(nodes, x * sizeof(cman_node_t)); if (!nodes) { perror("realloc"); ccs_disconnect(desc); @@ -207,14 +214,11 @@ } ret->cml_count = x; - ++x; } ccs_disconnect(desc); - ret->cml_members = nodes; - return ret; } @@ -615,7 +619,6 @@ /* Flag online nodes */ flag_nodes(all, part, FLAG_UP); - free_member_list(part); } else { /* not root - keep it simple for the next block */