From mboxrd@z Thu Jan 1 00:00:00 1970
From: lhh@sourceware.org
Date: 11 Jul 2006 23:52:46 -0000
Subject: [Cluster-devel] cluster/rgmanager/src clulib/Makefile clulib/a ...
Message-ID: <20060711235246.32496.qmail@sourceware.org>
List-Id:
To: cluster-devel.redhat.com
MIME-Version: 1.0
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit

CVSROOT:	/cvs/cluster
Module name:	cluster
Changes by:	lhh at sourceware.org	2006-07-11 23:52:42

Modified files:
	rgmanager/src/clulib: Makefile alloc.c lock.c lockspace.c members.c message.c msgsimple.c rg_strings.c signals.c vft.c
	rgmanager/src/daemons: Makefile fo_domain.c groups.c main.c nodeevent.c reslist.c restree.c rg_forward.c rg_state.c test.c
	rgmanager/src/resources: service.sh
	rgmanager/src/utils: clustat.c clusvcadm.c
Added files:
	rgmanager/src/clulib: cman.c locktest.c msg_cluster.c msg_socket.c msgtest.c
	rgmanager/src/daemons: rg_event.c

Log message:
	- Make rgmanager actually do things.
	- Finish port of rgmanager to CMAN messaging.
	- Add feature to wait for nodes to be fenced prior to handling a
	  node-down event.
	- Add direct DLM lock support.
	- Fix local communication.
	- Optimize VF data distribution algorithm to use CMAN/Totem's
	  broadcast mode; this should make rgmanager much more scalable.
	- Add multiplexing for CMAN communications so threads can have
	  pseudo-private channels over the one CMAN socket.
	- Add service->service dependencies based on service events.
	- Add node ID display to clustat text-mode output.

Patches: http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/cman.c.diff?cvsroot=cluster&r1=NONE&r2=1.1 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/locktest.c.diff?cvsroot=cluster&r1=NONE&r2=1.1 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msg_cluster.c.diff?cvsroot=cluster&r1=NONE&r2=1.1 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msg_socket.c.diff?cvsroot=cluster&r1=NONE&r2=1.1 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msgtest.c.diff?cvsroot=cluster&r1=NONE&r2=1.1 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/Makefile.diff?cvsroot=cluster&r1=1.8&r2=1.9 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/alloc.c.diff?cvsroot=cluster&r1=1.7&r2=1.8 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lock.c.diff?cvsroot=cluster&r1=1.1&r2=1.2 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lockspace.c.diff?cvsroot=cluster&r1=1.1&r2=1.2 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/members.c.diff?cvsroot=cluster&r1=1.1&r2=1.2 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/message.c.diff?cvsroot=cluster&r1=1.1&r2=1.2 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msgsimple.c.diff?cvsroot=cluster&r1=1.5&r2=1.6 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/rg_strings.c.diff?cvsroot=cluster&r1=1.3&r2=1.4 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/signals.c.diff?cvsroot=cluster&r1=1.1&r2=1.2 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/vft.c.diff?cvsroot=cluster&r1=1.13&r2=1.14 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_event.c.diff?cvsroot=cluster&r1=NONE&r2=1.1 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/Makefile.diff?cvsroot=cluster&r1=1.12&r2=1.13 
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/fo_domain.c.diff?cvsroot=cluster&r1=1.8&r2=1.9 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/groups.c.diff?cvsroot=cluster&r1=1.18&r2=1.19 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/main.c.diff?cvsroot=cluster&r1=1.25&r2=1.26 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/nodeevent.c.diff?cvsroot=cluster&r1=1.2&r2=1.3 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/reslist.c.diff?cvsroot=cluster&r1=1.13&r2=1.14 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/restree.c.diff?cvsroot=cluster&r1=1.19&r2=1.20 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_forward.c.diff?cvsroot=cluster&r1=1.4&r2=1.5 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_state.c.diff?cvsroot=cluster&r1=1.16&r2=1.17 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/test.c.diff?cvsroot=cluster&r1=1.4&r2=1.5 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/resources/service.sh.diff?cvsroot=cluster&r1=1.5&r2=1.6 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/utils/clustat.c.diff?cvsroot=cluster&r1=1.17&r2=1.18 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/utils/clusvcadm.c.diff?cvsroot=cluster&r1=1.8&r2=1.9 /cvs/cluster/cluster/rgmanager/src/clulib/cman.c,v --> standard output revision 1.1 --- cluster/rgmanager/src/clulib/cman.c +++ - 2006-07-11 23:52:43.661578000 +0000 @@ -0,0 +1,264 @@ +/* + Copyright Red Hat, Inc. 2006 + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; either version 2, or (at your option) any + later version. 
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING. If not, write to the
+  Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+  MA 02139, USA.
+*/
+/**
+  pthread mutex wrapper for a global CMAN handle.
+
+  At most one thread holds the handle at a time.  A holder that took it
+  with cman_lock_preemptible() can be asked to give it back early: a
+  thread calling cman_lock(x, 1) writes a byte to a wakeup pipe whose
+  read end that holder includes in its select(2) set.
+ */
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+static cman_handle_t *_chandle = NULL;
+static pthread_mutex_t _chandle_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t _chandle_cond = PTHREAD_COND_INITIALIZER;
+/* pthread_t is opaque: track "held" with a separate flag and compare
+   owners only with pthread_equal(). */
+static pthread_t _chandle_holder;
+static int _chandle_held = 0;
+static int _chandle_preempt = 0;
+static int _wakeup_pipe[2] = { -1, -1 };
+
+
+/* Put @fd into non-blocking mode; failures are reported, not fatal. */
+static void
+_set_nonblock(int fd)
+{
+	int flags;
+
+	flags = fcntl(fd, F_GETFL, 0);
+	if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
+		perror("fcntl");
+}
+
+
+/* Nonzero if the calling thread holds the handle.
+   Caller must hold _chandle_lock. */
+static int
+_held_by_me(void)
+{
+	return _chandle_held && pthread_equal(_chandle_holder, pthread_self());
+}
+
+
+/**
+  Lock / return the global CMAN handle.
+
+  @param block		If nonzero, we wait until the handle is released
+  @param preempt	If nonzero, *try* to wake up the holder who has
+			taken the lock with cman_lock_preemptible.  Will not
+			wake up holders which took it with cman_lock().
+  @return		NULL / errno on failure (ENOSYS: no handle,
+			EDEADLK: caller already holds it, EAGAIN: busy
+			and !block); the global CMAN handle on success.
+ */
+cman_handle_t *
+cman_lock(int block, int preempt)
+{
+	cman_handle_t *ret = NULL;
+
+	pthread_mutex_lock(&_chandle_lock);
+	if (_chandle == NULL) {
+		errno = ENOSYS;
+		goto out_unlock;
+	}
+
+	if (_held_by_me()) {
+		errno = EDEADLK;
+		goto out_unlock;
+	}
+
+	if (_chandle_held) {
+		if (!block) {
+			errno = EAGAIN;
+			goto out_unlock;
+		}
+
+		/* Try to wake up the holder -- only a preemptible holder
+		   watches the pipe, so don't leave a stale byte for
+		   anyone else.  The write end is non-blocking; a full
+		   pipe just means a wakeup is already pending. */
+		if (preempt && _chandle_preempt &&
+		    write(_wakeup_pipe[1], "!", 1) < 0 && errno != EAGAIN)
+			perror("cman_lock: write");
+
+		/* cman_unlock() broadcasts to every waiter and
+		   pthread_cond_wait() may wake spuriously, so re-check
+		   ownership after every wakeup. */
+		while (_chandle_held)
+			pthread_cond_wait(&_chandle_cond, &_chandle_lock);
+
+		/* The subsystem may have been torn down while we slept. */
+		if (_chandle == NULL) {
+			errno = ENOSYS;
+			goto out_unlock;
+		}
+	}
+
+	_chandle_holder = pthread_self();
+	_chandle_held = 1;
+	ret = _chandle;
+out_unlock:
+	pthread_mutex_unlock(&_chandle_lock);
+	return ret;
+}
+
+
+/**
+  Lock / return the global CMAN handle, allowing other threads to
+  preempt us.
+
+  @param block		If nonzero, we wait until the handle is released
+  @param preempt_fd	Caller should include this file descriptor in
+			blocking calls to select(2), so that we can wake
+			it up if someone calls with cman_lock(xxx, 1);
+  @return		NULL / errno on failure; the global CMAN handle
+			on success.
+ */
+cman_handle_t *
+cman_lock_preemptible(int block, int *preempt_fd)
+{
+	cman_handle_t *ret = NULL;
+
+	if (preempt_fd == NULL) {
+		errno = EINVAL;
+		return NULL;
+	}
+
+	pthread_mutex_lock(&_chandle_lock);
+	if (_chandle == NULL) {
+		errno = ENOSYS;
+		goto out_unlock;
+	}
+
+	if (_held_by_me()) {
+		errno = EDEADLK;
+		goto out_unlock;
+	}
+
+	if (_chandle_held) {
+		if (!block) {
+			errno = EAGAIN;
+			goto out_unlock;
+		}
+
+		/* Re-check ownership after every (possibly spurious or
+		   broadcast) wakeup; see cman_lock(). */
+		while (_chandle_held)
+			pthread_cond_wait(&_chandle_cond, &_chandle_lock);
+
+		if (_chandle == NULL) {
+			errno = ENOSYS;
+			goto out_unlock;
+		}
+	}
+
+	*preempt_fd = _wakeup_pipe[0];
+	_chandle_holder = pthread_self();
+	_chandle_held = 1;
+	_chandle_preempt = 1;
+	ret = _chandle;
+out_unlock:
+	pthread_mutex_unlock(&_chandle_lock);
+	return ret;
+}
+
+
+/**
+  Release the global CMAN handle
+
+  @param ch		Should match the global handle
+  @return		-1 on failure (ENOSYS: no handle, EBUSY: caller
+			is not the holder, EINVAL: @ch mismatch), 0 on
+			success.
+ */
+int
+cman_unlock(cman_handle_t *ch)
+{
+	int ret = -1;
+	char c;
+
+	pthread_mutex_lock(&_chandle_lock);
+	if (_chandle == NULL) {
+		errno = ENOSYS;
+		goto out_unlock;
+	}
+
+	if (!_held_by_me()) {
+		errno = EBUSY;
+		goto out_unlock;
+	}
+
+	if (_chandle != ch) {
+		errno = EINVAL;
+		goto out_unlock;
+	}
+
+	/* Drain every pending wakeup byte so the next preemptible holder
+	   does not see a stale wakeup.  The read end is non-blocking, so
+	   this stops as soon as the pipe is empty. */
+	while (read(_wakeup_pipe[0], &c, 1) > 0)
+		;
+
+	_chandle_preempt = 0;
+	_chandle_held = 0;
+	ret = 0;
+
+out_unlock:
+	pthread_mutex_unlock(&_chandle_lock);
+	if (ret == 0)
+		pthread_cond_broadcast(&_chandle_cond);
+	return ret;
+}
+
+
+/**
+  Install @ch as the global CMAN handle and create the wakeup pipe.
+
+  @param ch		CMAN handle to share.
+  @return		0 on success; -1 / EAGAIN if already initialized
+			or @ch is NULL, -1 / pipe(2)'s errno otherwise.
+ */
+int
+cman_init_subsys(cman_handle_t *ch)
+{
+	int ret = -1;
+
+	pthread_mutex_lock(&_chandle_lock);
+	if (_chandle) {
+		errno = EAGAIN;
+		goto out_unlock;
+	}
+
+	if (!ch) {
+		errno = EAGAIN;
+		goto out_unlock;
+	}
+
+	if (pipe(_wakeup_pipe) < 0) {
+		goto out_unlock;
+	}
+
+	/* Both ends non-blocking: the holder can drain without stalling,
+	   and a preempting writer never blocks while holding
+	   _chandle_lock. */
+	_set_nonblock(_wakeup_pipe[0]);
+	_set_nonblock(_wakeup_pipe[1]);
+	_chandle = ch;
+	_chandle_held = 0;
+	_chandle_preempt = 0;
+	ret = 0;
+
+out_unlock:
+	pthread_mutex_unlock(&_chandle_lock);
+	return ret;
+}
+
+
+/**
+  Wait for any current holder to release the handle, then forget the
+  handle and close the wakeup pipe.
+
+  @return		0 on success; -1 / EAGAIN if not initialized,
+			-1 / EDEADLK if the caller itself holds the handle.
+ */
+int
+cman_cleanup_subsys(void)
+{
+	int ret = -1;
+
+	pthread_mutex_lock(&_chandle_lock);
+	if (!_chandle) {
+		errno = EAGAIN;
+		goto out_unlock;
+	}
+
+	/* Waiting for ourselves would never finish. */
+	if (_held_by_me()) {
+		errno = EDEADLK;
+		goto out_unlock;
+	}
+
+	while (_chandle_held)
+		pthread_cond_wait(&_chandle_cond, &_chandle_lock);
+
+	ret = 0;
+	_chandle = NULL;
+	_chandle_held = 0;
+	_chandle_preempt = 0;
+
+	close(_wakeup_pipe[0]);
+	close(_wakeup_pipe[1]);
+	_wakeup_pipe[0] = _wakeup_pipe[1] = -1;
+
+out_unlock:
+	pthread_mutex_unlock(&_chandle_lock);
+	/* Let any remaining waiters notice the handle is gone (ENOSYS). */
+	if (ret == 0)
+		pthread_cond_broadcast(&_chandle_cond);
+	return ret;
+}
+
+
+/**
+  Send data on the global CMAN handle WITHOUT taking _chandle_lock.
+
+  Argument order is (buf, len, flags, port, nodeid) -- the same order
+  cman_send_data() takes after the handle.
+ */
+int
+cman_send_data_unlocked(void *buf, int len, int flags,
+			uint8_t port, int nodeid)
+{
+	return cman_send_data(_chandle, buf, len, flags, port, nodeid);
+}
/cvs/cluster/cluster/rgmanager/src/clulib/locktest.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/locktest.c
+++ - 2006-07-11 23:52:43.746107000 +0000
@@ -0,0 +1,85 @@
+/*
+  Copyright Red Hat, Inc. 2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING. If not, write to the
+  Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+  MA 02139, USA.
+*/
+#include
+#include
+#include
+#include
+#include
+#include
+
+
+/**
+  Contention thread: repeatedly takes and drops an exclusive lock on the
+  resource named by @arg so that main() has something to fight with.
+
+  @param arg	Lock name (main() passes a strdup() of argv[1]).
+  @return	NULL once clu_lock() fails; otherwise loops forever.
+ */
+void *
+lock_thread(void *arg)
+{
+	struct dlm_lksb lksb;
+
+	while(1) {
+		/* main() zeroes its lksb before clu_lock(); do the same
+		   instead of handing the lock code stack garbage. */
+		memset(&lksb, 0, sizeof(lksb));
+		printf("Taking lock..\n");
+		if (clu_lock(LKM_EXMODE, &lksb, 0, arg) < 0) {
+			perror("clu_lock");
+			break;
+		}
+		printf("Thread acquired lock on %s\n", (char *)arg);
+		clu_unlock(&lksb);
+	}
+
+	return NULL;
+}
+
+
+
+
+/**
+  Usage: locktest <name> [x]
+
+  Takes an exclusive lock on <name> in lockspace "Testing" and holds it
+  until Enter is pressed.  With a second argument, a background thread
+  contends for the same lock.
+ */
+int
+main(int argc, char **argv)
+{
+	struct dlm_lksb lksb;
+	int ret;
+	pthread_t th;
+	int have_thread = 0;
+	char *name;
+
+	/* Validate arguments before creating the lockspace so no exit
+	   path skips clu_lock_finished(). */
+	if (argc < 2) {
+		printf("Lock what?\n");
+		return 1;
+	}
+
+	if (clu_lock_init("Testing") != 0) {
+		perror("clu_lock_init");
+		return 1;
+	}
+
+	if (argc == 3) {
+		name = strdup(argv[1]);
+		if (name && pthread_create(&th, NULL, lock_thread, name) == 0)
+			have_thread = 1;
+		else
+			fprintf(stderr, "Failed to start lock thread\n");
+	}
+
+	memset(&lksb,0,sizeof(lksb));
+	ret = clu_lock(LKM_EXMODE, &lksb, 0, argv[1]);
+	if (ret < 0) {
+		perror("clu_lock");
+		clu_lock_finished("Testing");
+		return 1;
+	}
+
+	printf("Acquired lock on %s; press enter to release\n", argv[1]);
+	getchar();
+
+	clu_unlock(&lksb);
+
+	if (have_thread) {
+		printf("Press enter to kill lock thread...\n");
+		getchar();
+		/* NOTE(review): unless SIGTERM is handled, its default
+		   action ends the whole process, so clu_lock_finished()
+		   below would not run. */
+		pthread_kill(th, SIGTERM);
+	}
+
+	clu_lock_finished("Testing");
+
+	return 0;
+}
/cvs/cluster/cluster/rgmanager/src/clulib/msg_cluster.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/msg_cluster.c
+++ - 2006-07-11 23:52:43.828222000 +0000
@@ -0,0 +1,1104 @@
+/*
+  Copyright Red Hat, Inc. 2006
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING. If not, write to the
+  Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+  MA 02139, USA.
+*/
+#define _MESSAGE_BUILD
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+/* Ripped from ccsd's setup_local_socket */
+#define MAX_CONTEXTS 32 /* Testing; production should be 1024-ish */
+
+int cluster_msg_close(msgctx_t *ctx);
+
+/* Context 0 is reserved for control messages */
+
+/* Local-ish contexts, indexed by local context ID */
+static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER;
+static msgctx_t *contexts[MAX_CONTEXTS];
+static int _me = 0;
+pthread_t comms_thread;
+int thread_running;
+
+/* True once a cluster context knows both its local and remote context
+   IDs, or a socket context has an open descriptor. */
+#define is_established(ctx) \
+	((((ctx)->type == MSG_CLUSTER) && \
+	  ((ctx)->u.cluster_info.remote_ctx && \
+	   (ctx)->u.cluster_info.local_ctx)) || \
+	 (((ctx)->type == MSG_SOCKET) && \
+	  ((ctx)->u.local_info.sockfd != -1)))
+
+static msg_ops_t cluster_msg_ops;
+
+
+/**
+  Send a data message over a cluster context.
+
+  Prepends a cluster_msg_hdr_t (M_DATA, our node ID, local/remote
+  context IDs, port) to @msg and hands the result to CMAN for delivery
+  to ctx->u.cluster_info.nodeid on ctx->u.cluster_info.port.
+
+  @param ctx		Cluster context opened with SKF_WRITE.
+  @param msg		Payload.
+  @param len		Payload length; header + payload must fit in
+			4096 bytes.
+  @return		Header + payload length on success;
+			-1 / errno (EINVAL, E2BIG, or the send error).
+ */
+static int
+cluster_msg_send(msgctx_t *ctx, void *msg, size_t len)
+{
+	char buf[4096];
+	cluster_msg_hdr_t *h = (void *)buf;
+	char *msgptr = (buf + sizeof(*h));
+
+	errno = EINVAL;
+	if (ctx->type != MSG_CLUSTER)
+		return -1;
+	if (!(ctx->flags & SKF_WRITE))
+		return -1;
+	if ((len + sizeof(*h)) > sizeof(buf)) {
+		errno = E2BIG;
+		return -1;
+	}
+
+	/* Don't put uninitialized stack bytes on the wire */
+	memset(h, 0, sizeof(*h));
+	h->msg_control = M_DATA;
+	h->src_ctx = ctx->u.cluster_info.local_ctx;
+	h->dest_ctx = ctx->u.cluster_info.remote_ctx;
+	h->msg_port = ctx->u.cluster_info.port;
+	h->src_nodeid = _me;
+	memcpy(msgptr, msg, len);
+
+	swab_cluster_msg_hdr_t(h);
+
+	/* cman_send_data_unlocked(buf, len, flags, port, nodeid) */
+	if (cman_send_data_unlocked((void *)h, len + sizeof(*h), 0,
+				    ctx->u.cluster_info.port,
+				    ctx->u.cluster_info.nodeid) < 0)
+		return -1;
+
+	return len + sizeof(*h);
+}
+
+
+/**
+  Assign a (free) cluster context ID to @ctx and register @ctx in
+  contexts[].
+
+  IDs range from 1 to MAX_CONTEXTS-1; slot 0 is never handed out here.
+  The search resumes after the most recently examined ID.
+
+  @param ctx	Context to register.
+  @return	0 on success (ctx->u.cluster_info.local_ctx is set);
+		-1 / EAGAIN if every slot is in use.
+ */
+static int
+assign_ctx(msgctx_t *ctx)
+{
+	static uint32_t context_index = 1;
+	uint32_t start;
+
+	pthread_mutex_lock(&context_lock);
+
+	start = context_index;
+	do {
+		context_index++;
+		if (context_index >= MAX_CONTEXTS)
+			context_index = 1;
+
+		if (!contexts[context_index]) {
+			/* Claim the slot we just found to be free */
+			contexts[context_index] = ctx;
+			ctx->u.cluster_info.local_ctx = context_index;
+			pthread_mutex_unlock(&context_lock);
+			return 0;
+		}
+	} while (context_index != start);
+
+	pthread_mutex_unlock(&context_lock);
+
+	errno = EAGAIN;
+	return -1;
+}
+
+
+/**
+  See if anything's on the cluster socket.  If so, dispatch it on to
+  the requisite queues (via cman_dispatch()).
+  XXX should be passed a connection arg!
+
+  The CMAN handle is taken preemptibly: if another thread calls
+  cman_lock(x, 1) while we sleep in select(), the wakeup fd becomes
+  readable and we hand the handle back immediately.
+
+  @param timeout	Seconds to wait; negative waits indefinitely.
+  @return		0 if messages were dispatched (or CMAN has no
+			fd); -1 otherwise, with errno EAGAIN if we were
+			preempted.
+ */
+static int
+poll_cluster_messages(int timeout)
+{
+	int ret = -1;
+	fd_set rfds;
+	int fd, lfd, max;
+	struct timeval tv;
+	struct timeval *p = NULL;
+	cman_handle_t *ch;
+
+	if (timeout >= 0) {
+		tv.tv_sec = timeout;
+		tv.tv_usec = 0;
+		p = &tv;
+	}
+
+	/* This sucks - it could cause other threads trying to get a
+	   membership list to block for a long time.  Now, that should not
+	   happen.  Basically, when we get a membership event, we generate
+	   a new membership list in a locally cached copy and reference
+	   that. */
+	ch = cman_lock_preemptible(1, &lfd);
+	if (!ch) {
+		printf("%s\n", strerror(errno));
+		return -1;
+	}
+
+	fd = cman_get_fd(ch);
+	if (fd < 0) {
+		cman_unlock(ch);
+		return 0;
+	}
+
+	FD_ZERO(&rfds);
+	FD_SET(fd, &rfds);
+	FD_SET(lfd, &rfds);
+
+	/* nfds must exceed the larger of the two descriptors */
+	max = (lfd > fd ? lfd : fd);
+	if (select(max + 1, &rfds, NULL, NULL, p) > 0) {
+		/* Someone woke us up */
+		if (FD_ISSET(lfd, &rfds)) {
+			cman_unlock(ch);
+			errno = EAGAIN;
+			return -1;
+		}
+
+		cman_dispatch(ch, 0);
+		ret = 0;
+	}
+	cman_unlock(ch);
+
+	return ret;
+}
+
+
+/**
+  This is used to establish and tear down pseudo-private
+  contexts which are shared with the cluster context.
+
+  Sends a header-only control message (e.g. M_OPEN, M_OPEN_ACK,
+  M_CLOSE) from @ctx's local context to its remote context.
+
+  @param ctx	Supplies destination node, port and context IDs.
+  @param type	msg_control value to send.
+  @return	Return value of cman_send_data_unlocked(); callers
+		treat < 0 as failure.
+ */
+static int
+cluster_send_control_msg(msgctx_t *ctx, int type)
+{
+	cluster_msg_hdr_t cm;
+
+	/* Don't put uninitialized stack bytes on the wire */
+	memset(&cm, 0, sizeof(cm));
+	cm.msg_control = (uint8_t)type;
+	cm.src_nodeid = _me;
+	cm.dest_ctx = ctx->u.cluster_info.remote_ctx;
+	cm.src_ctx = ctx->u.cluster_info.local_ctx;
+	cm.msg_port = ctx->u.cluster_info.port;
+
+	swab_cluster_msg_hdr_t(&cm);
+
+	/* cman_send_data_unlocked(buf, len, flags, port, nodeid) */
+	return cman_send_data_unlocked((void *)&cm, sizeof(cm), 0,
+				       ctx->u.cluster_info.port,
+				       ctx->u.cluster_info.nodeid);
+}
+
+
+/**
+  Wait for a message to be queued on a cluster context.
+
+  @param ctx		Context opened with SKF_READ or SKF_LISTEN.
+  @param timeout	Seconds to wait; 0 or negative only polls.
+  @return		msg_control of the message at the head of the
+			queue, M_NONE if nothing arrived in time,
+			-1 / EINVAL on a bad context.
+ */
+static int
+cluster_msg_wait(msgctx_t *ctx, int timeout)
+{
+	struct timespec ts = {0, 0};
+	struct timeval now;
+	int req = M_NONE;
+	int e;
+
+	errno = EINVAL;
+	if (!ctx)
+		return -1;
+	if (ctx->type != MSG_CLUSTER)
+		return -1;
+	if (!(ctx->flags & (SKF_READ | SKF_LISTEN)))
+		return -1;
+
+	if (timeout > 0) {
+		/* pthread_cond_timedwait() wants an absolute deadline;
+		   a default-initialized condvar uses CLOCK_REALTIME. */
+		gettimeofday(&now, NULL);
+		ts.tv_sec = now.tv_sec + timeout;
+		ts.tv_nsec = now.tv_usec * 1000;
+	}
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	while (1) {
+
+		/* See if we dispatched any messages on to our queue */
+		if (ctx->u.cluster_info.queue) {
+			req = ctx->u.cluster_info.queue->message->msg_control;
+			break;
+		}
+
+		/* Polling only: don't sleep */
+		if (timeout <= 0)
+			break;
+
+		/* Nothing queued yet: sleep until a producer signals the
+		   cond or the deadline passes.  Wakeups may be spurious,
+		   so loop back and re-check the queue. */
+		e = pthread_cond_timedwait(&ctx->u.cluster_info.cond,
+					   &ctx->u.cluster_info.mutex,
+					   &ts);
+		if (e != 0)
+			break;	/* ETIMEDOUT, or a hard error */
+	}
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+	return req;
+}
+
+
+/**
+  Add @ctx's select pipe to @fds, creating the pipe on first use.
+
+  Producers write one byte to the pipe per queued message, so callers
+  can select(2) on it instead of using msg_wait().
+
+  @param ctx	Cluster context.
+  @param fds	Set to add the pipe's read end to.
+  @param max	If non-NULL, raised to the pipe fd when that is larger.
+  @return	0 on success; -1 / errno on failure.
+ */
+static int
+cluster_msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max)
+{
+	int e;
+	msg_q_t *n;
+
+	errno = EINVAL;
+	if (!ctx || !fds)
+		return -1;
+	if (ctx->type != MSG_CLUSTER)
+		return -1;
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	if (ctx->u.cluster_info.select_pipe[0] < 0) {
+		if (pipe(ctx->u.cluster_info.select_pipe) < 0) {
+			e = errno;
+			pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+			errno = e;
+			return -1;
+		}
+
+		/* Ok, we just created the pipe.  Now, we need to write
+		   a char for every unprocessed event to the pipe, because
+		   events could be pending that would otherwise be unhandled
+		   by the caller because the caller is switching to select()
+		   semantics.  (as opposed to msg_wait() ) */
+		list_do(&ctx->u.cluster_info.queue, n) {
+			if (write(ctx->u.cluster_info.select_pipe[1], "", 1) < 0)
+				perror("cluster_msg_fd_set write");
+		} while (!list_done(&ctx->u.cluster_info.queue, n));
+	}
+
+	e = ctx->u.cluster_info.select_pipe[0];
+	FD_SET(e, fds);
+
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+	if (max && (e > *max))
+		*max = e;
+	return 0;
+}
+
+
+/**
+  @return	1 if @ctx's select pipe exists and is set in @fds,
+		0 if not, -1 / EINVAL on bad arguments.
+ */
+int
+cluster_msg_fd_isset(msgctx_t *ctx, fd_set *fds)
+{
+	errno = EINVAL;
+
+	if (!fds || !ctx)
+		return -1;
+
+	if (ctx->type != MSG_CLUSTER)
+		return -1;
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	if (ctx->u.cluster_info.select_pipe[0] >= 0 &&
+	    FD_ISSET(ctx->u.cluster_info.select_pipe[0], fds)) {
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+		return 1;
+	}
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+	return 0;
+}
+
+
+/**
+  Remove @ctx's select pipe from @fds.
+
+  @return	1 if a pipe exists (and was cleared), 0 if not,
+		-1 / EINVAL on bad arguments.
+ */
+int
+cluster_msg_fd_clr(msgctx_t *ctx, fd_set *fds)
+{
+	errno = EINVAL;
+
+	if (!fds || !ctx)
+		return -1;
+
+	if (ctx->type != MSG_CLUSTER)
+		return -1;
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+		FD_CLR(ctx->u.cluster_info.select_pipe[0], fds);
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+		return 1;
+	}
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+	return 0;
+}
+
+
+/**
+  Pop the message at the head of @ctx's queue and act on it.
+
+  M_CLOSE / M_OPEN_ACK update remote_ctx.  For M_DATA the header is
+  stripped in place and the buffer is handed back through @msg (the
+  caller frees it); with @msg NULL the payload is dropped.
+
+  @return	Payload length for M_DATA; 0 for M_CLOSE / M_OPEN_ACK;
+		-1 for any other message, -1 / EBADF or EAGAIN on error.
+ */
+static int
+_cluster_msg_receive(msgctx_t *ctx, void **msg, size_t *len)
+{
+	cluster_msg_hdr_t *m;
+	msg_q_t *n;
+	int ret = 0;
+	char foo;
+
+	if (msg)
+		*msg = NULL;
+	if (len)
+		*len = 0;
+
+	if (ctx->u.cluster_info.local_ctx < 0 ||
+	    ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
+		errno = EBADF;
+		return -1;
+	}
+
+	/* trigger receive here */
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+
+	n = ctx->u.cluster_info.queue;
+	if (n == NULL) {
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+		errno = EAGAIN;
+		return -1;
+	}
+
+	list_remove(&ctx->u.cluster_info.queue, n);
+
+	/* One pipe byte was written per queued message; consume it */
+	if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+		read(ctx->u.cluster_info.select_pipe[0],
+		     &foo, 1);
+	}
+
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+	m = n->message;
+	switch(m->msg_control) {
+	case M_CLOSE:
+		ctx->u.cluster_info.remote_ctx = 0;
+		break;
+	case M_OPEN_ACK:
+		/* Response to new connection */
+		ctx->u.cluster_info.remote_ctx = m->src_ctx;
+		break;
+	case M_DATA:
+		/* Kill the message control structure */
+		memmove(m, &m[1], n->len - sizeof(*m));
+		if (msg)
+			*msg = (void *)m;
+		else {
+			printf("Warning: dropping data message\n");
+			free(m);
+		}
+		if (len)
+			*len = (n->len - sizeof(*m));
+		ret = (n->len - sizeof(*m));
+		free(n);
+		return ret;
+	case M_OPEN:
+		/* Someone is trying to open a connection */
+	default:
+		/* ?!! */
+		ret = -1;
+		break;
+	}
+
+	free(m);
+	free(n);
+
+	return ret;
+}
+
+
+/**
+  Receive a message from a cluster context.
+
+  Waits up to @timeout seconds.  M_DATA payloads are copied into @msg
+  (truncated to @maxlen); CMAN event messages (M_STATECHANGE,
+  M_PORTOPENED, M_PORTCLOSED, M_TRY_SHUTDOWN) are dequeued and
+  discarded.
+
+  @return	Full payload length for M_DATA (may exceed @maxlen);
+		0 if nothing arrived or an event was discarded;
+		-1 / ECONNRESET if the peer closed; -1 on other errors.
+ */
+static int
+cluster_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+	int req;
+	msg_q_t *n;
+	void *priv_msg;
+	size_t priv_len;
+	char foo;
+
+	errno = EINVAL;
+	if (!ctx)
+		return -1;
+	if (ctx->type != MSG_CLUSTER)
+		return -1;
+	if (!(ctx->flags & SKF_READ))
+		return -1;
+
+	req = cluster_msg_wait(ctx, timeout);
+
+	switch (req) {
+	case M_DATA:
+		/* Copy out. */
+		req = _cluster_msg_receive(ctx, &priv_msg, &priv_len);
+		if (req < 0) {
+			printf("Ruh roh!\n");
+			return -1;
+		}
+
+		/* Truncate to the caller's buffer; req keeps full length */
+		priv_len = (priv_len < maxlen ? priv_len : maxlen);
+
+		if (msg && maxlen)
+			memcpy(msg, priv_msg, priv_len);
+		free(priv_msg);
+		return req;
+	case M_CLOSE:
+		errno = ECONNRESET;
+		return -1;
+	case M_NONE:
+		/* Nothing on queue */
+		return 0;
+	case M_STATECHANGE:
+	case M_PORTOPENED:
+	case M_PORTCLOSED:
+	case M_TRY_SHUTDOWN:
+		pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+
+		n = ctx->u.cluster_info.queue;
+		if (n == NULL) {
+			pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+			errno = EAGAIN;
+			return -1;
+		}
+
+		list_remove(&ctx->u.cluster_info.queue, n);
+
+		if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+			read(ctx->u.cluster_info.select_pipe[0],
+			     &foo, 1);
+		}
+
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+		free(n->message);
+		free(n);
+		return 0;
+	default:
+		printf("PROTOCOL ERROR: Received %d\n", req);
+		return -1;
+	}
+
+	printf("%s: CODE PATH ERROR\n", __FUNCTION__);
+	return -1;
+}
+
+
+/**
+  Open a pseudo-private cluster context to the specified node ID.
+  If the node ID is CMAN_NODEID_US, a broadcast (SKF_MCAST) context
+  is created instead and no handshake is performed.
+
+  @param type		Must be MSG_CLUSTER.
+  @param nodeid		Peer node ID (CMAN_NODEID_US: broadcast).
+  @param port		CMAN port to use.
+  @param ctx		Context to fill in and register.
+  @param timeout	Roughly how many seconds to wait for M_OPEN_ACK.
+  @return		0 on success; -1 / errno (EINVAL, EAGAIN,
+			ETIMEDOUT, or the send error) on failure.
+ */
+static int
+cluster_msg_open(int type, int nodeid, int port, msgctx_t *ctx, int timeout)
+{
+	int t = 0, ret, e;
+
+	errno = EINVAL;
+	if (!ctx)
+		return -1;
+
+	if (type != MSG_CLUSTER)
+		return -1;
+
+	ctx->type = MSG_CLUSTER;
+	ctx->ops = &cluster_msg_ops;
+	ctx->flags = 0;
+	ctx->u.cluster_info.nodeid = nodeid;
+	ctx->u.cluster_info.port = port;
+	ctx->u.cluster_info.local_ctx = -1;
+	ctx->u.cluster_info.remote_ctx = 0;
+	ctx->u.cluster_info.queue = NULL;
+
+	/* Assign context index */
+	if (assign_ctx(ctx) < 0) {
+		errno = EAGAIN;
+		return -1;
+	}
+	ctx->flags = SKF_READ | SKF_WRITE;
+
+	if (nodeid == CMAN_NODEID_US) {
+		/* Broadcast pseudo ctx; no handshake needed */
+		ctx->flags |= SKF_MCAST;
+		return 0;
+	}
+
+	/* Ask the peer to open a matching context */
+	if (cluster_send_control_msg(ctx, M_OPEN) < 0) {
+		/* Give back the context slot we just claimed; close()
+		   clobbers errno, so preserve the send error. */
+		e = errno;
+		cluster_msg_close(ctx);
+		errno = e;
+		return -1;
+	}
+
+	/* Ok, wait for a response (one wait of up to 1s per pass) */
+	while (!is_established(ctx)) {
+		++t;
+		if (t > timeout) {
+			cluster_msg_close(ctx);
+			errno = ETIMEDOUT;
+			return -1;
+		}
+
+		ret = cluster_msg_wait(ctx, 1);
+		switch(ret) {
+		case M_OPEN_ACK:
+			_cluster_msg_receive(ctx, NULL, NULL);
+			break;
+		case M_NONE:
+			continue;
+		default:
+			printf("PROTO ERROR: M_OPEN_ACK not received: %d %d\n",
+			       ret, errno);
+			/* Discard it; otherwise it stays at the head of
+			   the queue and hides the M_OPEN_ACK behind it. */
+			_cluster_msg_receive(ctx, NULL, NULL);
+		}
+	}
+
+	return 0;
+}
+
+
+/**
+  Close a cluster connection context.  We give the context ID back,
+  clear out the receive queue, tell the other end (M_CLOSE) that we're
+  done -- just in case it does not know yet ;) -- and close the
+  select pipe if one was created.
+
+  @param ctx	Cluster context to close.
+  @return	0 on success; -1 / EINVAL if @ctx is NULL, not a cluster
+		context, or has a context ID >= MAX_CONTEXTS.
+ */
+int
+cluster_msg_close(msgctx_t *ctx)
+{
+	msg_q_t *n = NULL;
+	int idx;
+
+	errno = EINVAL;
+
+	if (!ctx)
+		return -1;
+	if (ctx->type != MSG_CLUSTER)
+		return -1;
+
+	idx = ctx->u.cluster_info.local_ctx;
+	if (idx >= MAX_CONTEXTS) {
+		printf("Context invalid during close\n");
+		return -1;
+	}
+
+	/* A negative ID means assign_ctx() never succeeded: there is no
+	   table slot to release, but the rest of the cleanup applies. */
+	if (idx >= 0) {
+		pthread_mutex_lock(&context_lock);
+		/* Other threads should not be able to see this again */
+		if (contexts[idx] &&
+		    contexts[idx]->u.cluster_info.local_ctx == idx)
+			contexts[idx] = NULL;
+		pthread_mutex_unlock(&context_lock);
+	}
+
+	/* Clear receive queue */
+	while ((n = ctx->u.cluster_info.queue) != NULL) {
+		list_remove(&ctx->u.cluster_info.queue, n);
+		free(n->message);
+		free(n);
+	}
+
+	/* Send close message */
+	if (ctx->u.cluster_info.remote_ctx != 0) {
+		cluster_send_control_msg(ctx, M_CLOSE);
+	}
+
+	/* Close pipe if it's open */
+	if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+		close(ctx->u.cluster_info.select_pipe[0]);
+		ctx->u.cluster_info.select_pipe[0] = -1;
+	}
+	if (ctx->u.cluster_info.select_pipe[1] >= 0) {
+		close(ctx->u.cluster_info.select_pipe[1]);
+		ctx->u.cluster_info.select_pipe[1] = -1;
+	}
+	ctx->type = MSG_NONE;
+	ctx->ops = NULL;
+	return 0;
+}
+
+
+/**
+  Copy @buf (a raw cluster message, header included) onto @ctx's
+  receive queue and wake any waiter: signal the condvar and, if a
+  select pipe exists, write one byte to it.  Allocation failures are
+  retried once a second rather than dropping the message.
+ */
+static void
+queue_for_context(msgctx_t *ctx, char *buf, int len)
+{
+	msg_q_t *node;
+
+	while ((node = malloc(sizeof(*node))) == NULL) {
+		sleep(1);
+	}
+	memset(node, 0, sizeof(*node));
+	while ((node->message = malloc(len)) == NULL) {
+		sleep(1);
+	}
+	memcpy(node->message, buf, len);
+	node->len = len;
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	list_insert(&ctx->u.cluster_info.queue, node);
+	/* If a select pipe was set up, wake it up */
+	if (ctx->u.cluster_info.select_pipe[1] >= 0) {
+		if (write(ctx->u.cluster_info.select_pipe[1], "", 1) < 0)
+			perror("queue_for_context write");
+	}
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+	pthread_cond_signal(&ctx->u.cluster_info.cond);
+}
+
+
+/**
+  Called by cman_dispatch to deal with messages coming across the
+  cluster socket.  This function deals with fanning out the requests
+  and putting them on the per-context queues.  We don't have
+  the benefits of pre-configured buffers, so we need this.
+
+  Broadcast M_DATA (dest_ctx 0) is copied to every readable SKF_MCAST
+  cluster context; anything else goes to contexts[dest_ctx] if that
+  slot is still registered.
+ */
+static void
+process_cman_msg(cman_handle_t h, void *priv, char *buf, int len,
+		 uint8_t port, int nodeid)
+{
+	cluster_msg_hdr_t *m = (cluster_msg_hdr_t *)buf;
+	int x;
+
+	if (len < sizeof(*m)) {
+		printf("Message too short.\n");
+		return;
+	}
+
+	/* Undo the sender's swab_cluster_msg_hdr_t(), in place */
+	swab_cluster_msg_hdr_t(m);
+
+#ifdef DEBUG
+	printf("Processing ");
+	switch(m->msg_control) {
+	case M_NONE:
+		printf("M_NONE\n");
+		break;
+	case M_OPEN:
+		printf("M_OPEN\n");
+		break;
+	case M_OPEN_ACK:
+		printf("M_OPEN_ACK\n");
+		break;
+	case M_DATA:
+		printf("M_DATA\n");
+		break;
+	case M_CLOSE:
+		printf("M_CLOSE\n");
+		break;
+	}
+
+	printf(" Node ID: %d %d\n", m->src_nodeid, nodeid);
+	printf(" Remote CTX: %d Local CTX: %d\n", m->src_ctx, m->dest_ctx);
+#endif
+
+	if (m->dest_ctx >= MAX_CONTEXTS || m->dest_ctx < 0) {
+		printf("Context invalid; ignoring\n");
+		return;
+	}
+
+	pthread_mutex_lock(&context_lock);
+
+	if (m->dest_ctx == 0 && m->msg_control == M_DATA) {
+		/* Copy & place on all broadcast queues if it's a broadcast
+		   M_DATA message... */
+		for (x = 0; x < MAX_CONTEXTS; x++) {
+			if (!contexts[x])
+				continue;
+			if (contexts[x]->type != MSG_CLUSTER)
+				continue;
+			if (!(contexts[x]->flags & SKF_MCAST))
+				continue;
+			if (!(contexts[x]->flags & SKF_READ))
+				continue;
+
+			queue_for_context(contexts[x], buf, len);
+		}
+	} else if (contexts[m->dest_ctx]) {
+		/* Normal receive */
+		queue_for_context(contexts[m->dest_ctx], buf, len);
+	}
+	/* If none of the above, then we msg for something we've already
+	   detached from our list.  No big deal, just ignore. */
+
+	pthread_mutex_unlock(&context_lock);
+	return;
+}
+
+
+/**
+  Accept a new pseudo-private connection coming in over the
+  cluster socket.
+
+  Consumes the first M_OPEN queued on @listenctx, registers @acceptctx
+  under a fresh local context ID and answers the peer with M_OPEN_ACK.
+
+  @param listenctx	The listening cluster context (local ID 0).
+  @param acceptctx	Context to set up for the new connection.
+  @return		0 on success; -1 / EINVAL on bad arguments;
+			-1 / EAGAIN if no M_OPEN was queued or no context
+			ID was free (the peer is then not ACKed).
+ */
+static int
+cluster_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
+{
+	cluster_msg_hdr_t *m;
+	msg_q_t *n;
+	char done = 0;
+	char foo;
+	int ret = -1;
+
+	errno = EINVAL;
+	if (!listenctx || !acceptctx)
+		return -1;
+	if (listenctx->u.cluster_info.local_ctx != 0)
+		return -1;
+	if (!(listenctx->flags & SKF_LISTEN))
+		return -1;
+
+	listenctx->ops->mo_init(acceptctx);
+
+	pthread_mutex_lock(&listenctx->u.cluster_info.mutex);
+
+	n = listenctx->u.cluster_info.queue;
+	if (n == NULL) {
+		pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
+		errno = EAGAIN;
+		return -1;
+	}
+
+	/* the OPEN should be the first thing on the list; this loop
+	   is probably not necessary */
+	list_do(&listenctx->u.cluster_info.queue, n) {
+
+		m = n->message;
+		switch(m->msg_control) {
+		case M_OPEN:
+			list_remove(&listenctx->u.cluster_info.queue, n);
+
+			/* New connection */
+			pthread_mutex_init(&acceptctx->u.cluster_info.mutex,
+					   NULL);
+			pthread_cond_init(&acceptctx->u.cluster_info.cond,
+					  NULL);
+			acceptctx->u.cluster_info.queue = NULL;
+			acceptctx->u.cluster_info.remote_ctx = m->src_ctx;
+			acceptctx->u.cluster_info.nodeid = m->src_nodeid;
+			acceptctx->u.cluster_info.port = m->msg_port;
+			acceptctx->flags = (SKF_READ | SKF_WRITE);
+
+			if (assign_ctx(acceptctx) < 0) {
+				/* Don't ACK with a bogus context ID; the
+				   peer's open will time out instead. */
+				printf("FAILED TO ASSIGN CONTEXT\n");
+			} else {
+				cluster_send_control_msg(acceptctx,
+							 M_OPEN_ACK);
+				ret = 0;
+			}
+
+			/* We dequeued a message: consume its pipe byte */
+			if (listenctx->u.cluster_info.select_pipe[0] >= 0) {
+				read(listenctx->u.cluster_info.select_pipe[0],
+				     &foo, 1);
+			}
+
+			done = 1;
+			free(m);
+			free(n);
+
+			break;
+		case M_DATA:
+			/* Data messages (i.e. from broadcast msgs) are
+			   okay too!...  but we don't handle them here */
+			break;
+		default:
+			/* Other message?! */
+			printf("Odd... %d\n", m->msg_control);
+			break;
+		}
+
+		if (done)
+			break;
+
+	} while (!list_done(&listenctx->u.cluster_info.queue, n));
+
+	pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
+
+	if (ret < 0)
+		errno = EAGAIN;
+	return ret;
+}
+
+
+/**
+  This waits for events on the cluster comms FD and
+  dispatches them using cman_dispatch.  Initially,
+  the design had no permanent threads, but that model
+  proved difficult to implement correctly.
+
+  Runs until thread_running is cleared.  Each pass waits at most two
+  seconds, and a preempting cman_lock(x, 1) wakes it early.
+ */
+static void *
+cluster_comms_thread(void *arg)
+{
+	while (thread_running) {
+		poll_cluster_messages(2);
+	}
+
+	return NULL;
+}
+
+
+/*
+  Transliterates a CMAN event to a control message and queues it on
+  the listening cluster context (contexts[0]).  The event argument is
+  stored as an int right after the header.
+ */
+static void
+process_cman_event(cman_handle_t handle, void *private, int reason, int arg)
+{
+	cluster_msg_hdr_t *msg;
+	int *argp;
+	msg_q_t *node;
+	msgctx_t *ctx;
+	int control;
+
+	switch(reason) {
+#if defined(LIBCMAN_VERSION) && LIBCMAN_VERSION >= 2
+	case CMAN_REASON_PORTOPENED:
+		control = M_PORTOPENED;
+		break;
+	case CMAN_REASON_TRY_SHUTDOWN:
+		control = M_TRY_SHUTDOWN;
+		break;
+#endif
+	case CMAN_REASON_PORTCLOSED:
+		control = M_PORTCLOSED;
+		break;
+	case CMAN_REASON_STATECHANGE:
+		control = M_STATECHANGE;
+		break;
+	default:
+		/* Unmapped event: queuing it would leave msg_control at
+		   0, which receivers treat as "nothing queued"
+		   (presumably M_NONE), so it would never be consumed. */
+		return;
+	}
+
+	/* Allocate queue node */
+	while ((node = malloc(sizeof(*node))) == NULL) {
+		sleep(1);
+	}
+	memset(node, 0, sizeof(*node));
+
+	/* Allocate message: header + int (for arg) */
+	while ((msg = malloc(sizeof(int)*2 +
+			     sizeof(cluster_msg_hdr_t))) == NULL) {
+		sleep(1);
+	}
+	memset(msg, 0, sizeof(int)*2 +sizeof(cluster_msg_hdr_t));
+	msg->msg_control = control;
+
+	argp = ((void *)msg + sizeof(cluster_msg_hdr_t));
+	*argp = arg;
+
+	node->len = sizeof(cluster_msg_hdr_t) + sizeof(int);
+	node->message = msg;
+
+	pthread_mutex_lock(&context_lock);
+	ctx = contexts[0]; /* This is the cluster context... */
+	if (!ctx) {
+		/* No listening context (not set up, or already torn
+		   down): nobody to deliver the event to. */
+		free(node->message);
+		free(node);
+		pthread_mutex_unlock(&context_lock);
+		return;
+	}
+	pthread_mutex_unlock(&context_lock);
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	list_insert(&ctx->u.cluster_info.queue, node);
+	/* If a select pipe was set up, wake it up */
+	if (ctx->u.cluster_info.select_pipe[1] >= 0) {
+		if (write(ctx->u.cluster_info.select_pipe[1], "", 1) < 0)
+			perror("process_cman_event write");
+	}
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+	pthread_cond_signal(&ctx->u.cluster_info.cond);
+}
+
+
+/**
+  Set up the listening cluster context on CMAN port *@portp and start
+  the comms thread.
+
+  @param me		Our node ID (stamped into outgoing headers).
+  @param portp		Pointer to an int port number in [10, 254].
+  @param cluster_ctx	Out: the new listening context (slot 0).
+  @return		0 on success; -1 / errno on failure.
+ */
+int
+cluster_msg_listen(int me, void *portp, msgctx_t **cluster_ctx)
+{
+	int e;
+	pthread_attr_t attrs;
+	cman_handle_t *ch = NULL;
+	msgctx_t *ctx;
+	int port;
+
+	errno = EINVAL;
+	if (!portp || !cluster_ctx)
+		return -1;
+	port = *(int *)portp;
+	if (port < 10 || port > 254)
+		return -1;
+
+	ch = cman_lock(1, 0);
+	if (!ch)
+		return -1;	/* errno set by cman_lock() */
+	_me = me;
+
+	/* Set up cluster context */
+	ctx = msg_new_ctx();
+	if (!ctx) {
+		cman_unlock(ch);
+		errno = EINVAL;
+		return -1;
+	}
+
+	*cluster_ctx = ctx;
+	if (cman_start_recv_data(ch, process_cman_msg,
+				 port) != 0) {
+		e = errno;
+		cman_unlock(ch);
+		msg_free_ctx((msgctx_t *)*cluster_ctx);
+		*cluster_ctx = NULL;
+		errno = e;
+		return -1;
+	}
+
+	if (cman_start_notification(ch, process_cman_event) != 0) {
+		e = errno;
+		/* Undo cman_start_recv_data() before freeing the ctx */
+		cman_end_recv_data(ch);
+		cman_unlock(ch);
+		msg_free_ctx((msgctx_t *)*cluster_ctx);
+		*cluster_ctx = NULL;
+		errno = e;
+		return -1;
+	}
+
+	cman_unlock(ch);
+	/* Done with CMAN bits */
+
+	pthread_mutex_lock(&context_lock);
+
+	memset(contexts, 0, sizeof(contexts));
+	contexts[0] = ctx;
+
+	ctx->type = MSG_CLUSTER;
+	ctx->ops = &cluster_msg_ops;
+	ctx->u.cluster_info.local_ctx = 0;
+	ctx->u.cluster_info.remote_ctx = 0;
+	ctx->u.cluster_info.port = port; /* port! */
+	ctx->u.cluster_info.nodeid = 0; /* Broadcast! */
+	ctx->u.cluster_info.select_pipe[0] = -1;
+	ctx->u.cluster_info.select_pipe[1] = -1;
+	ctx->u.cluster_info.queue = NULL;
+	pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
+	pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
+	ctx->flags = SKF_LISTEN | SKF_READ | SKF_WRITE | SKF_MCAST;
+	pthread_mutex_unlock(&context_lock);
+
+	pthread_attr_init(&attrs);
+	pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED);
+	pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+
+	thread_running = 1;
+	pthread_create(&comms_thread, &attrs, cluster_comms_thread, NULL);
+
+	pthread_attr_destroy(&attrs);
+
+	return 0;
+}
+
+
+/* Dump @ctx's cluster-specific fields to stdout (debugging aid). */
+static void
+cluster_msg_print(msgctx_t *ctx)
+{
+	if (!ctx)
+		return;
+
+	printf("Cluster Message Context %p\n", ctx);
+	printf(" Flags %08x\n", ctx->flags);
+	printf(" Node ID %d\n", ctx->u.cluster_info.nodeid);
+	printf(" Local CTX %d\n", ctx->u.cluster_info.local_ctx);
+	printf(" Remote CTX %d\n", ctx->u.cluster_info.remote_ctx);
+}
+
+
+/**
+  Stop receiving cluster data and stop the comms thread.
+
+  @return	0 on success; -1 / errno if the CMAN handle can't be taken.
+ */
+int
+cluster_msg_shutdown(void)
+{
+	cman_handle_t *ch;
+
+	/* Preempting lock: wakes the comms thread out of select() */
+	ch = cman_lock(1, 1);
+	if (!ch)
+		return -1;
+	cman_end_recv_data(ch);
+	pthread_kill(comms_thread, SIGTERM);
+	/* Cleared only now, while we still hold the handle, so the
+	   detached comms thread is still alive for pthread_kill() above;
+	   it leaves its loop after we unlock. */
+	thread_running = 0;
+	cman_unlock(ch);
+
+	return 0;
+}
+
+
+/**
+  Reset @ctx to an unconnected cluster context: zeroed, with its
+  mutex/condvar initialized and no select pipe.
+
+  @return	0 on success; -1 / EINVAL if @ctx is NULL.
+ */
+int
+cluster_msg_init(msgctx_t *ctx)
+{
+	errno = EINVAL;
+	if (!ctx)
+		return -1;
+
+	memset(ctx, 0, sizeof(*ctx));
+	ctx->type = MSG_CLUSTER;
+	ctx->ops = &cluster_msg_ops;
+	pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
+	pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
+	ctx->u.cluster_info.select_pipe[0] = -1;
+	ctx->u.cluster_info.select_pipe[1] = -1;
+
+	return 0;
+}
+
+
+static msg_ops_t cluster_msg_ops = {
+	.mo_open = cluster_msg_open,
+	.mo_close = cluster_msg_close,
+	.mo_listen = cluster_msg_listen,
+	.mo_accept = cluster_msg_accept,
+	.mo_shutdown = cluster_msg_shutdown,
+	.mo_wait = cluster_msg_wait,
+	.mo_send = cluster_msg_send,
+	.mo_receive = cluster_msg_receive,
+	.mo_fd_set = cluster_msg_fd_set,
+	.mo_fd_isset = cluster_msg_fd_isset,
+	.mo_fd_clr = cluster_msg_fd_clr,
.mo_print = cluster_msg_print, + .mo_init = cluster_msg_init +}; /cvs/cluster/cluster/rgmanager/src/clulib/msg_socket.c,v --> standard output revision 1.1 --- cluster/rgmanager/src/clulib/msg_socket.c +++ - 2006-07-11 23:52:43.913279000 +0000 @@ -0,0 +1,432 @@ +/* + Copyright Red Hat, Inc. 2006 + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; either version 2, or (at your option) any + later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 675 Mass Ave, Cambridge, + MA 02139, USA. +*/ +#define _MESSAGE_BUILD +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* Ripped from ccsd's setup_local_socket */ +#define RGMGR_SOCK "/var/run/cluster/rgmanager.sk" + +static msg_ops_t sock_msg_ops; + +static int +sock_connect(void) +{ + struct sockaddr_un sun; + int sock = -1, error = 0; + + memset(&sun, 0, sizeof(sun)); + sun.sun_family = PF_LOCAL; + snprintf(sun.sun_path, sizeof(sun.sun_path), RGMGR_SOCK); + + sock = socket(PF_LOCAL, SOCK_STREAM, 0); + if (sock < 0) { + error = errno; + goto fail; + } + + error = connect(sock, (struct sockaddr *)&sun, sizeof(sun)); + if (error < 0) { + error = errno; + close(sock); + errno = error; + sock = -1; + goto fail; + } + +fail: + + return sock; +} + + +/** + Wrapper around write(2) + */ +static int +sock_msg_send(msgctx_t *ctx, void *msg, size_t len) +{ + char buf[4096]; + int ret; + local_msg_hdr_t *h = (local_msg_hdr_t *)buf; + char *msgptr = (buf + 
sizeof(*h)); + + if (!ctx) + return -1; + if (!(ctx->flags & SKF_WRITE)) + return -1; + + /* encapsulate ... ? */ + if ((len + sizeof(*h)) > sizeof(buf)) { + errno = E2BIG; + return -1; + } + + h->msg_control = M_DATA; + h->msg_len = len; + memcpy(msgptr, msg, len); + + ret = _write_retry(ctx->u.local_info.sockfd, buf, + len + sizeof(*h), NULL); + + if (ret >= sizeof(*h)) + return (ret - (sizeof(*h))); + + errno = EAGAIN; + return -1; +} + + +static int +peekypeeky(int fd) +{ + local_msg_hdr_t h; + int ret; + + while ((ret = recv(fd, (void *)&h, sizeof(h), MSG_PEEK)) < 0) { + if (errno == EINTR) + continue; + return -1; + } + + if (ret == sizeof(h)) + return h.msg_control; + + if (ret == 0) + /* Socket closed? */ + return M_CLOSE; + + /* XXX */ + printf("PROTOCOL ERROR: Invalid message\n"); + return M_CLOSE; +} + + +static int +sock_msg_wait(msgctx_t *ctx, int timeout) +{ + fd_set rfds; + struct timeval tv = {0, 0}; + struct timeval *p = NULL; + + if (timeout >= 0) { + tv.tv_sec = timeout; + p = &tv; + } + + FD_ZERO(&rfds); + FD_SET(ctx->u.local_info.sockfd, &rfds); + + if (_select_retry(ctx->u.local_info.sockfd + 1, &rfds, + NULL, NULL, p) == 1) { + return peekypeeky(ctx->u.local_info.sockfd); + } + + return M_NONE; +} + + +static int +sock_msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max) +{ + errno = EINVAL; + if (ctx->type != MSG_SOCKET) + return -1; + if (!fds) + return -1; + + if (ctx->u.local_info.sockfd >= 0) { + FD_SET(ctx->u.local_info.sockfd, fds); + if (ctx->u.local_info.sockfd > *max) + *max = ctx->u.local_info.sockfd; + return 0; + } + + return -1; +} + + +static int +sock_msg_fd_isset(msgctx_t *ctx, fd_set *fds) +{ + errno = EINVAL; + if (!fds || !ctx) + return -1; + if (ctx->type != MSG_SOCKET) + return -1; + + if (ctx->u.local_info.sockfd >= 0 && + FD_ISSET(ctx->u.local_info.sockfd, fds)) { + return 1; + } + return 0; +} + + +int +sock_msg_fd_clr(msgctx_t *ctx, fd_set *fds) +{ + errno = EINVAL; + if (!fds || !ctx) + return -1; + if (ctx->type != 
MSG_SOCKET) + return -1; + + if (ctx->u.local_info.sockfd >= 0) { + FD_CLR(ctx->u.local_info.sockfd, fds); + return 1; + } + return 0; +} + + +static int +_local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout) +{ + struct timeval tv = {0, 0}; + struct timeval *p = NULL; + local_msg_hdr_t h; + + if (timeout > 0) { + tv.tv_sec = timeout; + p = &tv; + } + + if (_read_retry(ctx->u.local_info.sockfd, &h, sizeof(h), p) < 0) + return -1; + + if (maxlen < h.msg_len) { + printf("WARNING: Buffer too small for message (%d vs %d)!\n", + h.msg_len, (int)maxlen); + h.msg_len = maxlen; + } + + return _read_retry(ctx->u.local_info.sockfd, msg, h.msg_len, p); +} + + +/** + Receive a message from a cluster-context. This copies out the contents + into the user-specified buffer, and does random other things. + */ +static int +sock_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout) +{ + int req; + char priv_msg[4096]; + size_t priv_len = sizeof(priv_msg); + + errno = EINVAL; + if (!msg || !maxlen) + return -1; + if (ctx->type != MSG_SOCKET) + return -1; + if (!(ctx->flags & SKF_READ)) + return -1; + + req = _local_msg_receive(ctx, priv_msg, priv_len, timeout); + + if (req == 0) { + errno = ECONNRESET; + return -1; + } + + if (req < 0) + return -1; + + /* Copy out. */ + priv_len = (priv_len < maxlen ? priv_len : maxlen); + + memcpy(msg, priv_msg, priv_len); + return req; + + printf("%s: CODE PATH ERROR\n", __FUNCTION__); + return -1; +} + + +/** + Open a connection to the specified node ID. + If the speficied node is 0, this connects via the socket in + /var/run/cluster... 
+ */ +int +sock_msg_open(int type, int nodeid, int port, msgctx_t *ctx, int timeout) +{ + errno = EINVAL; + if (!ctx || ctx->type != MSG_SOCKET) + return -1; + if (type != MSG_SOCKET) + return -1; + + if (nodeid != CMAN_NODEID_US) + return -1; + if ((ctx->u.local_info.sockfd = sock_connect()) < 0) + return -1; + ctx->flags = (SKF_READ | SKF_WRITE); + return 0; +} + + +/** + With a socket, the O/S cleans up the buffers for us. + */ +int +sock_msg_close(msgctx_t *ctx) +{ + errno = EINVAL; + if (ctx->type != MSG_SOCKET) + return -1; + + close(ctx->u.local_info.sockfd); + ctx->u.local_info.sockfd = -1; + ctx->flags = 0; + ctx->type = MSG_NONE; + return 0; +} + + +/** + Accept a new pseudo-private connection coming in over the + cluster socket. + */ +static int +sock_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx) +{ + errno = EINVAL; + + if (!listenctx || !acceptctx) + return -1; + if (listenctx->u.local_info.sockfd < 0) + return -1; + if (!(listenctx->flags & SKF_LISTEN)) + return -1; + + listenctx->ops->mo_init(acceptctx); + acceptctx->u.local_info.sockfd = + accept(listenctx->u.local_info.sockfd, NULL, NULL); + + if (acceptctx->u.local_info.sockfd < 0) + return -1; + + acceptctx->flags = (SKF_READ | SKF_WRITE); + return 0; +} + + +int +sock_msg_listen(int me, void *portp, msgctx_t **listen_ctx) +{ + int sock; + struct sockaddr_un su; + mode_t om; + msgctx_t *ctx = NULL; + char *path = (char *)portp; + + /* Set up cluster context */ + ctx = msg_new_ctx(); + if (!ctx) + return -1; + + sock = socket(PF_LOCAL, SOCK_STREAM, 0); + if (sock < 0) + return -1; + + unlink(RGMGR_SOCK); + om = umask(077); + su.sun_family = PF_LOCAL; + snprintf(su.sun_path, sizeof(su.sun_path), path); + + if (bind(sock, &su, sizeof(su)) < 0) { + umask(om); + goto fail; + } + umask(om); + + if (listen(sock, SOMAXCONN) < 0) + goto fail; + + ctx->type = MSG_SOCKET; + ctx->u.local_info.sockfd = sock; + ctx->flags = SKF_LISTEN; + ctx->ops = &sock_msg_ops; + *listen_ctx = ctx; + return 0; +fail: 
+ if (ctx) + free(ctx); + if (sock >= 0) + close(sock); + return -1; +} + + +/* XXX INCOMPLETE - no local_ctx setup*/ +int +sock_msg_init(msgctx_t *ctx) +{ + memset(ctx,0,sizeof(*ctx)); + ctx->type = MSG_SOCKET; + ctx->u.local_info.sockfd = -1; + ctx->ops = &sock_msg_ops; + return 0; +} + + +void +sock_msg_print(msgctx_t *ctx) +{ + printf("Socket Message Context; fd = %d\n", ctx->u.local_info.sockfd); +} + + +/* XXX INCOMPLETE */ +int +sock_msg_shutdown(void) +{ + return 0; +} + + +static msg_ops_t sock_msg_ops = { + .mo_open = sock_msg_open, + .mo_close = sock_msg_close, + .mo_listen = sock_msg_listen, + .mo_accept = sock_msg_accept, + .mo_shutdown = sock_msg_shutdown, + .mo_wait = sock_msg_wait, + .mo_send = sock_msg_send, + .mo_receive = sock_msg_receive, + .mo_fd_set = sock_msg_fd_set, + .mo_fd_isset = sock_msg_fd_isset, + .mo_fd_clr = sock_msg_fd_clr, + .mo_print = sock_msg_print, + .mo_init = sock_msg_init +}; /cvs/cluster/cluster/rgmanager/src/clulib/msgtest.c,v --> standard output revision 1.1 --- cluster/rgmanager/src/clulib/msgtest.c +++ - 2006-07-11 23:52:43.995452000 +0000 @@ -0,0 +1,286 @@ +/* + Copyright Red Hat, Inc. 2006 + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; either version 2, or (at your option) any + later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 675 Mass Ave, Cambridge, + MA 02139, USA. 
+*/ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define MYPORT 190 + +int my_node_id = 0; +int running = 1; + + +void +sighandler(int sig) +{ + running = 0; +} + + +void * +piggyback(void *arg) +{ + msgctx_t ctx; + char buf[4096]; + + if (msg_open(MSG_CLUSTER, 0, MYPORT, &ctx, 0) != 0) { + printf("Could not set up mcast socket!\n"); + return NULL; + } + + printf("PIGGYBACK CONTEXT\n"); + msg_print(&ctx); + printf("END PIGGYBACK CONTEXT\n"); + + while (running) { + if (msg_receive(&ctx, buf, sizeof(buf), 2) > 0) { + printf("Piggyback received: %s\n", buf); + } + } + + msg_close(&ctx); + + printf("PIGGY flies...\n"); + + return NULL; +} + + +void * +private(void *arg) +{ + msgctx_t ctx; + char buf[4096]; + + while (running) { + sleep(3); + + /* use pseudoprivate channel */ + if (msg_open(MSG_CLUSTER, my_node_id, MYPORT, &ctx, 1) != 0) { + printf("Could not set up virtual-socket!\n"); + return NULL; + } + + printf("=== pvt thread channel info ===\n"); + msg_print(&ctx); + printf("=== end pvt thread channel info ===\n"); + fflush(stdout); + + snprintf(buf, sizeof(buf), "Hello!\n"); + msg_send(&ctx, buf, strlen(buf)+1); + + if (msg_receive(&ctx, buf, sizeof(buf), 10) > 0) { + printf("PRIVATE: Received %s\n", buf); + fflush(stdout); + } + + msg_close(&ctx); + + if (msg_open(MSG_CLUSTER, 0, MYPORT, &ctx, 1) != 0) { + printf("Could not set up mcast socket!\n"); + return NULL; + } + + snprintf(buf, sizeof(buf), "Babble, babble\n"); + msg_send(&ctx, buf, strlen(buf)+1); + if (msg_receive(&ctx, buf, sizeof(buf), 1) > 0) { + printf("PRIVATE: Via MCAST %s\n", buf); + fflush(stdout); + } + msg_close(&ctx); + } + + printf("Private thread is outta here...\n"); + + return NULL; +} + + +void +clu_initialize(cman_handle_t **ch) +{ + if (!ch) + exit(1); + + *ch = cman_init(NULL); + if (!(*ch)) { + printf("Waiting for CMAN to start\n"); + + while (!(*ch = cman_init(NULL))) { + sleep(1); + } + } + + if 
(!cman_is_quorate(*ch)) { + /* + There are two ways to do this; this happens to be the simpler + of the two. The other method is to join with a NULL group + and log in -- this will cause the plugin to not select any + node group (if any exist). + */ + printf("Waiting for quorum to form\n"); + + while (cman_is_quorate(*ch) == 0) { + sleep(1); + } + printf("Quorum formed, starting\n"); + } +} + + +int +side_message(msgctx_t *ctx) +{ + msgctx_t actx; + char buf[1024]; + + if (msg_accept(ctx, &actx) < 0) + return -1; + + printf("=== MAIN: Handling side message ===\n"); + msg_print(&actx); + fflush(stdout); + + if (msg_receive(&actx, buf, sizeof(buf), 10) > 0) { + printf("MAIN: Received %s\n", buf); + snprintf(buf, sizeof(buf), "Goodbye!\n"); + msg_send(&actx, buf, strlen(buf)+1); + } + + msg_close(&actx); + + printf("=== MAIN: end side message ===\n"); + + return 0; +} + + +void +malloc_dump_table(int, int); + + +void +sigusr2_handler(int sig) +{ +} + +int +main(int argc, char **argv) +{ + msgctx_t *cluster_ctx; + char recvbuf[128]; + cman_node_t me; + int ret; + pthread_t piggy, priv; + fd_set rfds; + int max = 0; + int port = MYPORT; + cman_handle_t *clu = NULL; + + + clu_initialize(&clu); + + if (cman_init_subsys(clu) < 0) { + perror("cman_init_subsys"); + return -1; + } + cman_get_node(clu, CMAN_NODEID_US, &me); + + my_node_id = me.cn_nodeid; + + if (msg_listen(MSG_CLUSTER, (void *)&port, + me.cn_nodeid, &cluster_ctx) < 0) { + printf("Couldn't set up cluster message system\n"); + return -1; + } + + signal(SIGTERM, sigusr2_handler); + signal(SIGUSR2, sigusr2_handler); + + pthread_create(&piggy, NULL, piggyback, NULL); + pthread_create(&priv, NULL, private, NULL); + + msg_print(cluster_ctx); + while (running) { + max = 0; + FD_ZERO(&rfds); + FD_SET(STDIN_FILENO, &rfds); + msg_fd_set(cluster_ctx, &rfds, &max); + + select(max+1, &rfds, NULL, NULL, NULL); + + if (FD_ISSET(STDIN_FILENO, &rfds)) { + fgets(recvbuf, 128, stdin); + if (recvbuf[0] == 'q' || recvbuf[0] == 'Q') 
+ break; + if (msg_send(cluster_ctx, recvbuf, + strlen(recvbuf)+1) < 0) + perror("msg_send"); + FD_CLR(STDIN_FILENO, &rfds); + } + + if (!msg_fd_isset(cluster_ctx, &rfds)) + continue; + + ret = msg_wait(cluster_ctx, 1); + + switch(ret) { + case M_DATA: + msg_receive(cluster_ctx, recvbuf, 128, 10); + printf("MAIN: received %s\n", recvbuf); + break; + case M_OPEN: + printf("MAIN: private connection detected\n"); + side_message(cluster_ctx); + break; + case 0: + /* No data; probably a control msg */ + break; + default: + printf("Cluster EV: %d\n", ret); + /* Cluster events, etc. */ + msg_receive(cluster_ctx, recvbuf, 128, 0); + } + } + + printf("Shutting down...\n"); + + running = 0; + + pthread_join(piggy, NULL); + pthread_join(priv, NULL); + + msg_close(cluster_ctx); + msg_free_ctx(cluster_ctx); + msg_shutdown(); + + cman_finish(clu); + + malloc_dump_table(0, 1024); + + exit(0); +} --- cluster/rgmanager/src/clulib/Makefile 2006/06/02 17:37:10 1.8 +++ cluster/rgmanager/src/clulib/Makefile 2006/07/11 23:52:41 1.9 @@ -21,7 +21,7 @@ CFLAGS+= -g -Wstrict-prototypes -Wshadow -fPIC -D_GNU_SOURCE -TARGETS=libclulib.a liblalloc.a +TARGETS=libclulib.a liblalloc.a msgtest all: ${TARGETS} @@ -29,9 +29,12 @@ uninstall: +msgtest: msgtest.o libclulib.a + gcc -o msgtest msgtest.o -lcman -L. 
-lclulib -llalloc -lpthread + libclulib.a: clulog.o daemon_init.o signals.o msgsimple.o \ - gettid.o rg_strings.o message.o members.o fdops.o - # lock.o lockspace.o + gettid.o rg_strings.o message.o members.o fdops.o \ + lock.o cman.o vft.o msg_cluster.o msg_socket.o ${AR} cru $@ $^ ranlib $@ --- cluster/rgmanager/src/clulib/alloc.c 2006/01/20 16:27:24 1.7 +++ cluster/rgmanager/src/clulib/alloc.c 2006/07/11 23:52:41 1.8 @@ -129,8 +129,9 @@ #undef AGGR_RECLAIM /* consolidate_all on free (*slow*) */ #undef STACKSIZE /*4 backtrace to store if DEBUG is set */ +//#define STACKSIZE 4 -#undef GDB_HOOK /* Dump program addresses in malloc_table +#undef GDB_HOOK /* Dump program addresses in malloc_table using a fork/exec of gdb (SLOW but fun) building this defeats the purpose of a bounded memory allocator, and is only --- cluster/rgmanager/src/clulib/lock.c 2006/06/13 19:22:38 1.1 +++ cluster/rgmanager/src/clulib/lock.c 2006/07/11 23:52:41 1.2 @@ -31,6 +31,10 @@ #include #include +/* Default lockspace stuff */ +static dlm_lshandle_t _default_ls = NULL; +static pthread_mutex_t _default_lock = PTHREAD_MUTEX_INITIALIZER; + static void ast_function(void * __attribute__ ((unused)) arg) @@ -39,7 +43,7 @@ static int -wait_for_dlm_event(dlm_lshandle_t *ls) +wait_for_dlm_event(dlm_lshandle_t ls) { fd_set rfds; int fd = dlm_ls_get_fd(ls); @@ -55,15 +59,18 @@ int -clu_lock(dlm_lshandle_t ls, - int mode, - struct dlm_lksb *lksb, - int options, - char *resource) +clu_ls_lock(dlm_lshandle_t ls, + int mode, + struct dlm_lksb *lksb, + int options, + char *resource) { int ret; if (!ls || !lksb || !resource || !strlen(resource)) { + printf("%p %p %p %d\n", ls, lksb, resource, + (int)strlen(resource)); + printf("INVAL...\n"); errno = EINVAL; return -1; } @@ -91,15 +98,22 @@ dlm_lshandle_t -clu_acquire_lockspace(const char *lsname) +clu_open_lockspace(const char *lsname) { dlm_lshandle_t ls = NULL; + //printf("opening lockspace %s\n", lsname); + while (!ls) { ls = dlm_open_lockspace(lsname); if 
(ls) break; + /* + printf("Failed to open: %s; trying create.\n", + strerror(errno)); + */ + ls = dlm_create_lockspace(lsname, 0644); if (ls) break; @@ -119,9 +133,8 @@ } - int -clu_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb) +clu_ls_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb) { int ret; @@ -147,7 +160,146 @@ int -clu_release_lockspace(dlm_lshandle_t ls, char *name) +clu_close_lockspace(dlm_lshandle_t ls, const char *name) { return dlm_release_lockspace(name, ls, 0); } + + +int +clu_lock(int mode, + struct dlm_lksb *lksb, + int options, + char *resource) +{ + int ret = 0, block = 0, conv = 0, err; + + block = !(options & LKF_NOQUEUE); + + /* + Try to use a conversion lock mechanism when possible + If the caller calls explicitly with a NULL lock, then + assume the caller knows what it is doing. + + Only take the NULL lock if: + (a) the user isn't specifying CONVERT; if they are, they + know what they are doing. + + ...and one of... + + (b) This is a blocking call, or + (c) The user requested a NULL lock explicitly. In this case, + short-out early; there's no reason to convert a NULL lock + to a NULL lock. + */ + if (!(options & LKF_CONVERT) && + (block || (mode == LKM_NLMODE))) { + /* Acquire NULL lock */ + pthread_mutex_lock(&_default_lock); + ret = clu_ls_lock(_default_ls, LKM_NLMODE, lksb, + (options & ~LKF_NOQUEUE), + resource); + err = errno; + pthread_mutex_unlock(&_default_lock); + if (ret == 0) { + if (mode == LKM_NLMODE) { + /* User only wanted a NULL lock... */ + return 0; + } + /* + Ok, NULL lock was taken, rest of blocking + call should be done using lock conversions. 
+ */ + options |= LKF_CONVERT; + conv = 1; + } else { + switch(err) { + case EINVAL: + /* Oops, null locks don't work on this + plugin; use normal spam mode */ + break; + default: + errno = err; + return -1; + } + } + } + + while (1) { + pthread_mutex_lock(&_default_lock); + ret = clu_ls_lock(_default_ls, mode, lksb, + (options | LKF_NOQUEUE), + resource); + err = errno; + pthread_mutex_unlock(&_default_lock); + + if ((ret != 0) && (err == EAGAIN) && block) { + usleep(random()&32767); + continue; + } + + break; + } + + if (ret != 0 && conv) { + /* If we get some other error, release the NL lock we + took so we don't leak locks*/ + pthread_mutex_lock(&_default_lock); + clu_ls_unlock(_default_ls, lksb); + pthread_mutex_unlock(&_default_lock); + errno = err; + } + + return ret; +} + + +int +clu_unlock(struct dlm_lksb *lksb) +{ + int ret, err; + pthread_mutex_lock(&_default_lock); + ret = clu_ls_unlock(_default_ls, lksb); + err = errno; + pthread_mutex_unlock(&_default_lock); + + usleep(random()&32767); + errno = err; + return ret; +} + + +int +clu_lock_init(const char *dflt_lsname) +{ + int ret, err; + + pthread_mutex_lock(&_default_lock); + if (_default_ls) { + pthread_mutex_unlock(&_default_lock); + return 0; + } + + if (!dflt_lsname || !strlen(dflt_lsname)) { + pthread_mutex_unlock(&_default_lock); + errno = EINVAL; + return -1; + } + + _default_ls = clu_open_lockspace(dflt_lsname); + ret = (_default_ls == NULL); + err = errno; + pthread_mutex_unlock(&_default_lock); + + errno = err; + return ret; +} + +void +clu_lock_finished(const char *name) +{ + pthread_mutex_lock(&_default_lock); + if (_default_ls) + clu_close_lockspace(_default_ls, name); + pthread_mutex_unlock(&_default_lock); +} --- cluster/rgmanager/src/clulib/lockspace.c 2006/06/13 19:22:38 1.1 +++ cluster/rgmanager/src/clulib/lockspace.c 2006/07/11 23:52:41 1.2 @@ -1,3 +1,21 @@ +/* + Copyright Red Hat, Inc. 
2006 + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; either version 2, or (at your option) any + later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 675 Mass Ave, Cambridge, + MA 02139, USA. +*/ #include #include #include --- cluster/rgmanager/src/clulib/members.c 2006/06/13 19:22:38 1.1 +++ cluster/rgmanager/src/clulib/members.c 2006/07/11 23:52:41 1.2 @@ -1,3 +1,21 @@ +/* + Copyright Red Hat, Inc. 2006 + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; either version 2, or (at your option) any + later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 675 Mass Ave, Cambridge, + MA 02139, USA. +*/ #include #include #include @@ -8,10 +26,12 @@ #include #include #include +#include #include #include #include #include +#include static int my_node_id = -1; static pthread_rwlock_t memblock = PTHREAD_RWLOCK_INITIALIZER; @@ -73,29 +93,32 @@ { int count, x, y; char in_old = 0; - cluster_member_list_t *gained; + cluster_member_list_t *gained = NULL; /* No nodes in new? 
Then nothing could have been gained */ if (!new || !new->cml_count) return NULL; /* Nothing in old? Duplicate 'new' and return it. */ - if (!old || !old->cml_count) { - gained = cml_alloc(new->cml_count); - if (!gained) - return NULL; - memcpy(gained, new, cml_size(new->cml_count)); - return gained; - } + if (!old || !old->cml_count) + return member_list_dup(new); /* Use greatest possible count */ count = (old->cml_count > new->cml_count ? - cml_size(old->cml_count) : cml_size(new->cml_count)); + old->cml_count : new->cml_count); + count *= sizeof(cman_node_t); - gained = malloc(count); + gained = malloc(sizeof(cluster_member_list_t)); if (!gained) return NULL; - memset(gained, 0, count); + memset(gained, 0, sizeof(*gained)); + + gained->cml_members = malloc(count); + if (!gained->cml_members) { + free(gained); + return NULL; + } + memset(gained->cml_members, 0, count); for (x = 0; x < new->cml_count; x++) { @@ -121,6 +144,7 @@ } if (gained->cml_count == 0) { + free(gained->cml_members); free(gained); gained = NULL; } @@ -145,7 +169,7 @@ cluster_member_list_t * memb_lost(cluster_member_list_t *old, cluster_member_list_t *new) { - cluster_member_list_t *ret; + cluster_member_list_t *ret = NULL; int x; /* Reverse. 
;) */ @@ -213,26 +237,42 @@ get_member_list(cman_handle_t h) { int c; + int tries = 0, local = 0; cluster_member_list_t *ml = NULL; cman_node_t *nodes = NULL; - do { + if (h == NULL) { + local = 1; + h = cman_init(NULL); + if (!h) + return NULL; + } + + do { + ++tries; if (nodes) free(nodes); c = cman_get_node_count(h); - if (c <= 0) - return NULL; + if (c <= 0) { + if (errno == EINTR) + continue; + if (ml) + free(ml); + ml = NULL; + goto out; + } if (!ml) ml = malloc(sizeof(*ml)); if (!ml) - return NULL; + goto out; nodes = malloc(sizeof(*nodes) * c); if (!nodes) { free(ml); - return NULL; + ml = NULL; + goto out; } memset(ml, 0, sizeof(*ml)); @@ -244,6 +284,10 @@ ml->cml_members = nodes; ml->cml_count = c; + +out: + if (local) + cman_finish(h); return ml; } @@ -264,6 +308,9 @@ { int x = 0; + if (!ml) + return 0; + for (x = 0; x < ml->cml_count; x++) { if (ml->cml_members[x].cn_nodeid == nodeid) return ml->cml_members[x].cn_member; @@ -278,6 +325,9 @@ { int x = 0, count = 0; + if (!ml) + return 0; + for (x = 0; x < ml->cml_count; x++) { if (ml->cml_members[x].cn_member) ++count; @@ -292,6 +342,9 @@ { int x = 0; + if (!ml) + return -1; + for (x = 0; x < ml->cml_count; x++) { if (ml->cml_members[x].cn_nodeid == nodeid) ml->cml_members[x].cn_member = 0; @@ -306,13 +359,15 @@ memb_id_to_name(cluster_member_list_t *ml, int nodeid) { int x = 0; + if (!ml) + return NULL; for (x = 0; x < ml->cml_count; x++) { if (ml->cml_members[x].cn_nodeid == nodeid) return ml->cml_members[x].cn_name; } - return 0; + return NULL; } @@ -320,13 +375,15 @@ memb_id_to_p(cluster_member_list_t *ml, int nodeid) { int x = 0; + if (!ml) + return NULL; for (x = 0; x < ml->cml_count; x++) { if (ml->cml_members[x].cn_nodeid == nodeid) return &ml->cml_members[x]; } - return 0; + return NULL; } @@ -334,6 +391,8 @@ memb_online_name(cluster_member_list_t *ml, char *name) { int x = 0; + if (!ml) + return 0; for (x = 0; x < ml->cml_count; x++) { if (!strcasecmp(ml->cml_members[x].cn_name, name)) @@ 
-348,6 +407,8 @@ memb_name_to_id(cluster_member_list_t *ml, char *name) { int x = 0; + if (!ml) + return 0; for (x = 0; x < ml->cml_count; x++) { if (!strcasecmp(ml->cml_members[x].cn_name, name)) @@ -362,13 +423,15 @@ memb_name_to_p(cluster_member_list_t *ml, char *name) { int x = 0; + if (!ml) + return NULL; for (x = 0; x < ml->cml_count; x++) { if (!strcasecmp(ml->cml_members[x].cn_name, name)) return &ml->cml_members[x]; } - return 0; + return NULL; } /** @@ -388,9 +451,19 @@ if (!orig) return NULL; - ret = malloc(cml_size(orig->cml_count)); - memset(ret, 0, cml_size(orig->cml_count)); - memcpy(ret, orig, cml_size(orig->cml_count)); + ret = malloc(sizeof(cluster_member_list_t)); + if (!ret) + return NULL; + memset(ret, 0, sizeof(cluster_member_list_t)); + ret->cml_members = malloc(sizeof(cman_node_t) * orig->cml_count); + + if (!ret->cml_members) { + free(ret); + return NULL; + } + ret->cml_count = orig->cml_count; + memcpy(ret->cml_members, orig->cml_members, + orig->cml_count * sizeof(cman_node_t)); return ret; } --- cluster/rgmanager/src/clulib/message.c 2006/06/13 19:22:38 1.1 +++ cluster/rgmanager/src/clulib/message.c 2006/07/11 23:52:41 1.2 @@ -1,138 +1,41 @@ +/* + Copyright Red Hat, Inc. 2006 + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; either version 2, or (at your option) any + later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 675 Mass Ave, Cambridge, + MA 02139, USA. 
+*/ #define _MESSAGE_BUILD #include #include -#include -#include #include #include #include #include #include -#include -#include #include -#include -#include -#include #include -#include -#include -#include - - - -/* Ripped from ccsd's setup_local_socket */ -#define RGMGR_SOCK "/var/run/cluster/rgmanager.sk" -#define MAX_CONTEXTS 32 /* Testing; production should be 1024-ish */ - -/* Context 0 is reserved for control messages */ - -/* Local-ish contexts */ -static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER; -static msgctx_t *contexts[MAX_CONTEXTS]; -static uint32_t context_index = 1; -static chandle_t *gch; -pthread_t comms_thread; -int thread_running; - - -#define is_established(ctx) \ - (((ctx->type == MSG_CLUSTER) && \ - (ctx->u.cluster_info.remote_ctx && ctx->u.cluster_info.local_ctx)) || \ - ((ctx->type == MSG_SOCKET) && \ - (ctx->u.local_info.sockfd != -1))) - - -static int -local_connect(void) -{ - struct sockaddr_un sun; - int sock = -1, error = 0; - - memset(&sun, 0, sizeof(sun)); - sun.sun_family = PF_LOCAL; - snprintf(sun.sun_path, sizeof(sun.sun_path), RGMGR_SOCK); - - sock = socket(PF_LOCAL, SOCK_STREAM, 0); - if (sock < 0) { - error = errno; - goto fail; - } - - error = connect(sock, (struct sockaddr *)&sun, sizeof(sun)); - if (error < 0) { - error = errno; - goto fail; - } - - sock = error; -fail: - - return sock; -} - - -static int -send_cluster_message(msgctx_t *ctx, void *msg, size_t len) -{ - char buf[4096]; - cluster_msg_hdr_t *h = (void *)buf; - int ret; - char *msgptr = (buf + sizeof(*h)); - - if ((len + sizeof(*h)) > sizeof(buf)) { - errno = E2BIG; - return -1; - } - - h->msg_control = M_DATA; - h->src_ctx = ctx->u.cluster_info.local_ctx; - h->dest_ctx = ctx->u.cluster_info.remote_ctx; - h->msg_port = ctx->u.cluster_info.port; - memcpy(msgptr, msg, len); - /* - printf("sending cluster message, length = %d to nodeid %d port %d\n", - len + sizeof(*h), ctx->u.cluster_info.nodeid, ctx->u.cluster_info.port); - */ - - 
pthread_mutex_lock(&gch->c_lock); - h->src_nodeid = gch->c_nodeid; - - swab_cluster_msg_hdr_t(h); - - ret = cman_send_data(gch->c_cluster, (void *)h, len + sizeof(*h), - ctx->u.cluster_info.nodeid, - ctx->u.cluster_info.port, 0); - - pthread_mutex_unlock(&gch->c_lock); - - return len + sizeof(h); -} - - -/** - Wrapper around write(2) - */ -static int -send_socket_message(msgctx_t *ctx, void *msg, size_t len) -{ - char buf[4096]; - local_msg_hdr_t *h = (local_msg_hdr_t *)buf; - char *msgptr = (buf + sizeof(*h)); - - /* encapsulate ... ? */ - if ((len + sizeof(*h)) > sizeof(buf)) { - errno = E2BIG; - return -1; - } - - h->msg_control = M_DATA; - h->msg_len = len; - memcpy(msgptr, msg, len); - - return _write_retry(ctx->u.local_info.sockfd, msg, len + sizeof(*h), NULL); -} +/* From msg_cluster */ +int cluster_msg_init(msgctx_t *ctx); +int cluster_msg_listen(int me, void *, msgctx_t **ctx); +int cluster_msg_shutdown(void); + +/* From msg_socket */ +int sock_msg_init(msgctx_t *ctx); +int sock_msg_listen(int me, void *, msgctx_t **ctx); +int sock_msg_shutdown(void); /** @@ -142,238 +45,22 @@ int msg_send(msgctx_t *ctx, void *msg, size_t len) { - if (!ctx || !msg || !len) { - errno = EINVAL; - return -1; - } - - switch(ctx->type) { - case MSG_CLUSTER: - return send_cluster_message(ctx, msg, len); - case MSG_SOCKET: - return send_socket_message(ctx, msg, len); - default: - break; - } - errno = EINVAL; - return -1; -} - - -/** - Assign a (free) cluster context ID - */ -static int -assign_ctx(msgctx_t *ctx) -{ - int start; - - /* Assign context index */ - ctx->type = MSG_CLUSTER; - - pthread_mutex_lock(&context_lock); - start = context_index++; - if (context_index >= MAX_CONTEXTS || context_index <= 0) - context_index = 1; - do { - if (contexts[context_index]) { - ++context_index; - if (context_index >= MAX_CONTEXTS) - context_index = 1; - - if (context_index == start) { - pthread_mutex_unlock(&context_lock); - errno = EAGAIN; - return -1; - } - - continue; - } - - 
contexts[context_index] = ctx; - ctx->u.cluster_info.local_ctx = context_index; - - } while (0); - pthread_mutex_unlock(&context_lock); - - return 0; -} - - -/* See if anything's on the cluster socket. If so, dispatch it - on to the requisite queues - XXX should be passed a connection arg! */ -static int -poll_cluster_messages(int timeout) -{ - int ret = -1; - fd_set rfds; - int fd; - struct timeval tv; - struct timeval *p = NULL; - - if (timeout >= 0) { - p = &tv; - tv.tv_sec = tv.tv_usec = timeout; - } - printf("%s\n", __FUNCTION__); - - FD_ZERO(&rfds); - - //pthread_mutex_lock(&gch->c_lock); - fd = cman_get_fd(gch->c_cluster); - FD_SET(fd, &rfds); - - if (select(fd + 1, &rfds, NULL, NULL, p) == 1) { - cman_dispatch(gch->c_cluster, 0); - ret = 0; - } - //pthread_mutex_unlock(&gch->c_lock); - - return ret; -} - - -/** - This is used to establish and tear down pseudo-private - contexts which are shared with the cluster context. - */ -static int -cluster_send_control_msg(msgctx_t *ctx, int type) -{ - cluster_msg_hdr_t cm; - - cm.msg_control = (uint8_t)type; - cm.src_nodeid = gch->c_nodeid; - cm.dest_ctx = ctx->u.cluster_info.remote_ctx; - cm.src_ctx = ctx->u.cluster_info.local_ctx; - cm.msg_port = ctx->u.cluster_info.port; - - swab_cluster_msg_hdr_t(&cm); - - return (cman_send_data(gch->c_cluster, (void *)&cm, sizeof(cm), - ctx->u.cluster_info.nodeid, - ctx->u.cluster_info.port, 0)); -} - - -/** - Wait for a message on a context. 
- */ -static int -cluster_msg_wait(msgctx_t *ctx, int timeout) -{ - struct timespec ts = {0, 0}; - int req = M_NONE; - struct timeval start; - struct timeval now; - - - if (timeout > 0) - gettimeofday(&start, NULL); - - ts.tv_sec = !!timeout; - - pthread_mutex_lock(&ctx->u.cluster_info.mutex); - while (1) { - /* See if we dispatched any messages on to our queue */ - if (ctx->u.cluster_info.queue) { - req = ctx->u.cluster_info.queue->message->msg_control; - /*printf("Queue not empty CTX%d : %d\n", - ctx->u.cluster_info.local_ctx, req);*/ - break; - } - - if (timeout == 0) - break; - - /* Ok, someone else has the mutex on our FD. Go to - sleep on a cond; maybe they'll wake us up */ - if (pthread_cond_timedwait(&ctx->u.cluster_info.cond, - &ctx->u.cluster_info.mutex, - &ts) < 0) { - - /* Mutex held */ - if (errno == ETIMEDOUT) { - if (timeout < 0) { - ts.tv_sec = 1; - ts.tv_nsec = 0; - continue; - } - - ts.tv_sec = !!timeout; - - /* Done */ - break; - } - } - - if (timeout > 0) { - gettimeofday(&now, NULL); - /* XXX imprecise */ - if (now.tv_sec - start.tv_sec > timeout) - break; - } - } - pthread_mutex_unlock(&ctx->u.cluster_info.mutex); - - return req; -} - - -static int -peekypeeky(int fd) -{ - local_msg_hdr_t h; - int ret; - - while ((ret = recv(fd, (void *)&h, sizeof(h), MSG_PEEK)) < 0) { - if (errno == EINTR) - continue; + if (!ctx || !msg || !len) return -1; - } - - if (ret == sizeof(h)) - return h.msg_control; - - if (ret == 0) - /* Socket closed? 
*/ - return M_CLOSE; - /* XXX */ - printf("PROTOCOL ERROR: Invalid message\n"); - return M_CLOSE; -} - - -static int -local_msg_wait(msgctx_t *ctx, int timeout) -{ - fd_set rfds; - struct timeval tv = {0, 0}; - struct timeval *p = NULL; - - if (timeout >= 0) { - tv.tv_sec = timeout; - p = &tv; - } - - FD_ZERO(&rfds); - FD_SET(ctx->u.local_info.sockfd, &rfds); - - if (_select_retry(ctx->u.local_info.sockfd + 1, &rfds, - NULL, NULL, p) == 1) { - return peekypeeky(ctx->u.local_info.sockfd); - } - - return M_NONE; + if (ctx->ops && ctx->ops->mo_send) + return ctx->ops->mo_send(ctx, msg, len); + errno = ENOSYS; + return -1; } +/* XXX get API for this ready */ int msg_get_nodeid(msgctx_t *ctx) { + /* XXX */ switch(ctx->type) { case MSG_CLUSTER: return ctx->u.cluster_info.nodeid; @@ -390,49 +77,13 @@ int msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max) { - int e; - switch(ctx->type) { - case MSG_CLUSTER: - pthread_mutex_lock(&ctx->u.cluster_info.mutex); - if (ctx->u.cluster_info.select_pipe[0] < 0) { - if (pipe(ctx->u.cluster_info.select_pipe) < 0) { - e = errno; - pthread_mutex_unlock(&ctx->u.cluster_info.mutex); - errno = e; - return -1; - } - - printf("%s: Created cluster CTX select pipe " - "rd=%d wr=%d\n", __FUNCTION__, - ctx->u.cluster_info.select_pipe[0], - ctx->u.cluster_info.select_pipe[1]); - - } - - e = ctx->u.cluster_info.select_pipe[0]; - printf("%s: cluster %d\n", __FUNCTION__, e); - FD_SET(e, fds); - pthread_mutex_unlock(&ctx->u.cluster_info.mutex); - - if (e > *max) - *max = e; - return 0; - - case MSG_SOCKET: - if (ctx->u.local_info.sockfd >= 0) { - printf("%s: local %d\n", __FUNCTION__, - ctx->u.local_info.sockfd); - FD_SET(ctx->u.local_info.sockfd, fds); - - if (ctx->u.local_info.sockfd > *max) - *max = ctx->u.local_info.sockfd; - return 0; - } + errno = EINVAL; + if (!ctx) return -1; - default: - break; - } - + + if (ctx->ops && ctx->ops->mo_fd_set) + return ctx->ops->mo_fd_set(ctx, fds, max); + errno = ENOSYS; return -1; } @@ -441,30 +92,12 @@ 
msg_fd_isset(msgctx_t *ctx, fd_set *fds) { errno = EINVAL; - - if (!fds || !ctx) + if (!ctx) return -1; - - switch(ctx->type) { - case MSG_CLUSTER: - pthread_mutex_lock(&ctx->u.cluster_info.mutex); - if (ctx->u.cluster_info.select_pipe[0] >= 0 && - FD_ISSET(ctx->u.cluster_info.select_pipe[0], fds)) { - pthread_mutex_unlock(&ctx->u.cluster_info.mutex); - return 1; - } - pthread_mutex_unlock(&ctx->u.cluster_info.mutex); - return 0; - case MSG_SOCKET: - if (ctx->u.local_info.sockfd >= 0 && - FD_ISSET(ctx->u.local_info.sockfd, fds)) { - return 1; - } - return 0; - default: - break; - } - + + if (ctx->ops && ctx->ops->mo_fd_isset) + return ctx->ops->mo_fd_isset(ctx, fds); + errno = ENOSYS; return -1; } @@ -473,30 +106,12 @@ msg_fd_clr(msgctx_t *ctx, fd_set *fds) { errno = EINVAL; - - if (!fds || !ctx) + if (!ctx) return -1; - - switch(ctx->type) { - case MSG_CLUSTER: - pthread_mutex_lock(&ctx->u.cluster_info.mutex); - if (ctx->u.cluster_info.select_pipe[0] >= 0) { - FD_CLR(ctx->u.cluster_info.select_pipe[0], fds); - pthread_mutex_unlock(&ctx->u.cluster_info.mutex); - return 1; - } - pthread_mutex_unlock(&ctx->u.cluster_info.mutex); - return 0; - case MSG_SOCKET: - if (ctx->u.local_info.sockfd >= 0) { - FD_CLR(ctx->u.local_info.sockfd, fds); - return 1; - } - return 0; - default: - break; - } - + + if (ctx->ops && ctx->ops->mo_fd_clr) + return ctx->ops->mo_fd_clr(ctx, fds); + errno = ENOSYS; return -1; } @@ -519,232 +134,28 @@ int msg_wait(msgctx_t *ctx, int timeout) { - - if (!ctx) { - errno = EINVAL; - return -1; - } - - switch(ctx->type) { - case MSG_CLUSTER: - return cluster_msg_wait(ctx, timeout); - case MSG_SOCKET: - return local_msg_wait(ctx, timeout); - default: - break; - } - errno = EINVAL; - return -1; -} - - -int -_cluster_msg_receive(msgctx_t *ctx, void **msg, size_t *len) -{ - cluster_msg_hdr_t *m; - msg_q_t *n; - int ret = 0; - - if (msg) - *msg = NULL; - if (len) - *len = 0; - - if (ctx->u.cluster_info.local_ctx < 0 || - ctx->u.cluster_info.local_ctx >= 
MAX_CONTEXTS) { - errno = EBADF; - return -1; - } - - /* trigger receive here */ - pthread_mutex_lock(&ctx->u.cluster_info.mutex); - - n = ctx->u.cluster_info.queue; - if (n == NULL) { - pthread_mutex_unlock(&ctx->u.cluster_info.mutex); - errno = EAGAIN; - return -1; - } - - list_remove(&ctx->u.cluster_info.queue, n); - - pthread_mutex_unlock(&ctx->u.cluster_info.mutex); - - m = n->message; - switch(m->msg_control) { - case M_CLOSE: - ctx->u.cluster_info.remote_ctx = 0; - break; - case M_OPEN_ACK: - /* Response to new connection */ - ctx->u.cluster_info.remote_ctx = m->src_ctx; - break; - case M_DATA: - /* Kill the message control structure */ - memmove(m, &m[1], n->len - sizeof(*m)); - if (msg) - *msg = (void *)m; - else { - printf("Warning: dropping data message\n"); - free(m); - } - if (len) - *len = (n->len - sizeof(*m)); - ret = (n->len - sizeof(*m)); - free(n); - - printf("Message received\n"); - return ret; - case M_OPEN: - /* Someone is trying to open a connection */ - default: - /* ?!! */ - ret = -1; - break; - } - - free(m); - free(n); - - return ret; -} - - -/** - Receive a message from a cluster-context. This copies out the contents - into the user-specified buffer, and does random other things. - */ -static int -cluster_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout) -{ - int req; - void *priv_msg; - size_t priv_len; - - if (!msg || !maxlen) { - errno = EINVAL; - return -1; - } - - req = cluster_msg_wait(ctx, timeout); - - switch (req) { - case M_DATA: - /* Copy out. */ - req = _cluster_msg_receive(ctx, &priv_msg, &priv_len); - if (req < 0) { - printf("Ruh roh!\n"); - return -1; - } - - priv_len = (priv_len < maxlen ? 
priv_len : maxlen); - - memcpy(msg, priv_msg, priv_len); - free(priv_msg); - return req; - case M_CLOSE: - errno = ECONNRESET; - return -1; - case 0: - /*printf("Nothing on queue\n");*/ - return 0; - default: - printf("PROTOCOL ERROR: Received %d\n", req); + if (!ctx) return -1; - } - - printf("%s: CODE PATH ERROR\n", __FUNCTION__); + + if (ctx->ops && ctx->ops->mo_wait) + return ctx->ops->mo_wait(ctx, timeout); + errno = ENOSYS; return -1; } -static int -_local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout) -{ - struct timeval tv = {0, 0}; - struct timeval *p = NULL; - local_msg_hdr_t h; - - if (timeout >= 0) { - tv.tv_sec = timeout; - p = &tv; - } - - if (_read_retry(ctx->u.local_info.sockfd, &h, sizeof(h), p) < 0) - return -1; - - if (maxlen < h.msg_len) { - printf("WARNING: Buffer too small for message!\n"); - h.msg_len = maxlen; - } - - return _read_retry(ctx->u.local_info.sockfd, msg, h.msg_len, p); -} - - -/** - Receive a message from a cluster-context. This copies out the contents - into the user-specified buffer, and does random other things. - */ -static int -local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout) -{ - int req; - char priv_msg[4096]; - size_t priv_len; - - if (!msg || !maxlen) { - errno = EINVAL; - return -1; - } - - switch (req) { - case M_DATA: - /* Copy out. */ - req = _local_msg_receive(ctx, priv_msg, priv_len, timeout); - if (req <= 0) - return -1; - - priv_len = (priv_len < maxlen ? 
priv_len : maxlen); - - memcpy(msg, priv_msg, priv_len); - free(msg); - return req; - case M_CLOSE: - errno = ECONNRESET; - return -1; - case 0: - /*printf("Nothing on queue\n");*/ - return 0; - default: - printf("PROTOCOL ERROR: Received %d\n", req); - return -1; - } - - printf("%s: CODE PATH ERROR\n", __FUNCTION__); - return -1; -} - int msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout) { - if (!ctx || !msg || !maxlen) { - errno = EINVAL; + errno = EINVAL; + if (!ctx) return -1; - } - switch(ctx->type) { - case MSG_CLUSTER: - return cluster_msg_receive(ctx, msg, maxlen, timeout); - case MSG_SOCKET: - return local_msg_receive(ctx, msg, maxlen, timeout); - default: - break; - } - - errno = EINVAL; + if (ctx->ops && ctx->ops->mo_receive) + return ctx->ops->mo_receive(ctx, msg, maxlen, timeout); + errno = ENOSYS; return -1; } @@ -755,78 +166,28 @@ /var/run/cluster... */ int -msg_open(int nodeid, int port, msgctx_t *ctx, int timeout) +msg_open(int type, int nodeid, int port, msgctx_t *ctx, int timeout) { - int t = 0; - errno = EINVAL; if (!ctx) return -1; - - /*printf("Opening pseudo channel to node %d\n", nodeid);*/ - - memset(ctx, 0, sizeof(*ctx)); - if (nodeid == CMAN_NODEID_US) { - if ((ctx->u.local_info.sockfd = local_connect()) < 0) { - return -1; - } - ctx->type = MSG_SOCKET; - return 0; - } - - ctx->type = MSG_CLUSTER; - ctx->u.cluster_info.nodeid = nodeid; - ctx->u.cluster_info.port = port; - ctx->u.cluster_info.local_ctx = -1; - ctx->u.cluster_info.remote_ctx = 0; - ctx->u.cluster_info.queue = NULL; - ctx->u.cluster_info.select_pipe[0] = -1; - ctx->u.cluster_info.select_pipe[1] = -1; - pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL); - pthread_cond_init(&ctx->u.cluster_info.cond, NULL); - - /* Assign context index */ - if (assign_ctx(ctx) < 0) - return -1; - - //printf(" Local CTX: %d\n", ctx->u.cluster_info.local_ctx); - - /* Send open */ - - //printf(" Sending control message M_OPEN\n"); - if (cluster_send_control_msg(ctx, M_OPEN) < 0) 
{ + /* XXX SPECIAL CASE... ow. */ + switch(type) { + case MSG_SOCKET: + sock_msg_init(ctx); + break; + case MSG_CLUSTER: + cluster_msg_init(ctx); + break; + default: return -1; } - /* Ok, wait for a response */ - while (!is_established(ctx)) { - ++t; - if (t > timeout) { - msg_close(ctx); - errno = ETIMEDOUT; - return -1; - } - - switch(msg_wait(ctx, 1)) { - case M_OPEN_ACK: - _cluster_msg_receive(ctx, NULL, NULL); - break; - case M_NONE: - continue; - default: - printf("PROTO ERROR: M_OPEN_ACK not received \n"); - } - } - - /* - printf(" Remote CTX: %d\n", - ctx->u.cluster_info.remote_ctx); - printf(" Pseudo channel established!\n"); - */ - - - return 0; + if (ctx->ops && ctx->ops->mo_open) + return ctx->ops->mo_open(ctx->type, nodeid, port, ctx, timeout); + errno = ENOSYS; + return -1; } @@ -842,484 +203,65 @@ int msg_close(msgctx_t *ctx) { - msg_q_t *n = NULL; - - if (!ctx) { - errno = EINVAL; - return -1; - } - - switch (ctx->type) { - case MSG_CLUSTER: - if (ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) { - errno = EINVAL; - return -1; - } - pthread_mutex_lock(&context_lock); - /* Other threads should not be able to see this again */ - if (contexts[ctx->u.cluster_info.local_ctx]) - contexts[ctx->u.cluster_info.local_ctx] = NULL; - pthread_mutex_unlock(&context_lock); - /* Clear receive queue */ - while ((n = ctx->u.cluster_info.queue) != NULL) { - list_remove(&ctx->u.cluster_info.queue, n); - free(n->message); - free(n); - } - /* Send close message */ - if (ctx->u.cluster_info.remote_ctx != 0) { - cluster_send_control_msg(ctx, M_CLOSE); - } - - /* Close pipe if it's open */ - if (ctx->u.cluster_info.select_pipe[0] >= 0) { - close(ctx->u.cluster_info.select_pipe[0]); - ctx->u.cluster_info.select_pipe[0] = -1; - } - if (ctx->u.cluster_info.select_pipe[1] >= 0) { - close(ctx->u.cluster_info.select_pipe[1]); - ctx->u.cluster_info.select_pipe[1] = -1; - } - return 0; - case MSG_SOCKET: - close(ctx->u.local_info.sockfd); - ctx->u.local_info.sockfd = -1; - return 0; 
- default: - break; - } - errno = EINVAL; - return -1; -} - - -/** - Called by cman_dispatch to deal with messages coming across the - cluster socket. This function deals with fanning out the requests - and putting them on the per-context queues. We don't have - the benefits of pre-configured buffers, so we need this. - */ -static void -process_cman_msg(cman_handle_t h, void *priv, char *buf, int len, - uint8_t port, int nodeid) -{ - cluster_msg_hdr_t *m = (cluster_msg_hdr_t *)buf; - msg_q_t *node; - msgctx_t *ctx; - - if (len < sizeof(*m)) { - printf("Message too short.\n"); - return; - } - - swab_cluster_msg_hdr_t(m); - -#if 0 - printf("Processing "); - switch(m->msg_control) { - case M_NONE: - printf("M_NONE\n"); - break; - case M_OPEN: - printf("M_OPEN\n"); - break; - case M_OPEN_ACK: - printf("M_OPEN_ACK\n"); - break; - case M_DATA: - printf("M_DATA\n"); - break; - case M_CLOSE: - printf("M_CLOSE\n"); - break; - } - - printf(" Node ID: %d %d\n", m->src_nodeid, nodeid); - printf(" Remote CTX: %d Local CTX: %d\n", m->src_ctx, m->dest_ctx); -#endif - - if (m->dest_ctx >= MAX_CONTEXTS) { - printf("Context invalid; ignoring\n"); - return; - } - - while ((node = malloc(sizeof(*node))) == NULL) { - sleep(1); - } - memset(node, 0, sizeof(*node)); - while ((node->message = malloc(len)) == NULL) { - sleep(1); - } - memcpy(node->message, buf, len); - node->len = len; - - pthread_mutex_lock(&context_lock); - ctx = contexts[m->dest_ctx]; - if (!ctx) { - /* We received a close for something we've already - detached from our list. No big deal, just - ignore. 
*/ - free(node->message); - free(node); - pthread_mutex_unlock(&context_lock); - return; - } - - pthread_mutex_lock(&ctx->u.cluster_info.mutex); - list_insert(&ctx->u.cluster_info.queue, node); - pthread_mutex_unlock(&ctx->u.cluster_info.mutex); - /* If a select pipe was set up, wake it up */ - if (ctx->u.cluster_info.select_pipe[1] >= 0) - write(ctx->u.cluster_info.select_pipe[1], "", 1); - pthread_mutex_unlock(&context_lock); - - pthread_cond_signal(&ctx->u.cluster_info.cond); -} - - -/** - Accept a new pseudo-private connection coming in over the - cluster socket. - */ -static int -cluster_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx) -{ - errno = EINVAL; - cluster_msg_hdr_t *m; - msg_q_t *n; - char done = 0; - char foo; - - if (!listenctx || !acceptctx) - return -1; - if (listenctx->u.cluster_info.local_ctx != 0) - return -1; - - pthread_mutex_lock(&listenctx->u.cluster_info.mutex); - - n = listenctx->u.cluster_info.queue; - if (n == NULL) { - pthread_mutex_unlock(&listenctx->u.cluster_info.mutex); - errno = EAGAIN; + if (!ctx) return -1; - } - - /* the OPEN should be the first thing on the list; this loop - is probably not necessary */ - list_do(&listenctx->u.cluster_info.queue, n) { - - m = n->message; - switch(m->msg_control) { - case M_OPEN: - list_remove(&listenctx->u.cluster_info.queue, n); - /*printf("Accepting connection from %d %d\n", - m->src_nodeid, m->src_ctx);*/ - - /* New connection */ - pthread_mutex_init(&acceptctx->u.cluster_info.mutex, - NULL); - pthread_cond_init(&acceptctx->u.cluster_info.cond, - NULL); - acceptctx->u.cluster_info.queue = NULL; - acceptctx->u.cluster_info.remote_ctx = m->src_ctx; - acceptctx->u.cluster_info.nodeid = m->src_nodeid; - acceptctx->u.cluster_info.port = m->msg_port; - - assign_ctx(acceptctx); - cluster_send_control_msg(acceptctx, M_OPEN_ACK); - - if (listenctx->u.cluster_info.select_pipe[0] >= 0) { - read(listenctx->u.cluster_info.select_pipe[0], - &foo, 1); - } - - done = 1; - free(m); - free(n); - - 
break; - case M_DATA: - /* Data messages (i.e. from broadcast msgs) are - okay too!... but we don't handle them here */ - break; - default: - /* Other message?! */ - printf("Odd... %d\n", m->msg_control); - break; - } - - if (done) - break; - - } while (!list_done(&listenctx->u.cluster_info.queue, n)); - - pthread_mutex_unlock(&listenctx->u.cluster_info.mutex); - - return 0; + if (ctx->ops && ctx->ops->mo_close) + return ctx->ops->mo_close(ctx); + errno = ENOSYS; + return -1; } -/* XXX INCOMPLETE */ int msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx) { - switch(listenctx->type) { - case MSG_CLUSTER: - return cluster_msg_accept(listenctx, acceptctx); - case MSG_SOCKET: - return 0; - default: - break; - } - - return -1; -} - - -static int -local_listener_sk(void) -{ - int sock; - struct sockaddr_un su; - mode_t om; - - sock = socket(PF_LOCAL, SOCK_STREAM, 0); - if (sock < 0) + errno = EINVAL; + if (!listenctx || !acceptctx) return -1; - - unlink(RGMGR_SOCK); - om = umask(077); - su.sun_family = PF_LOCAL; - snprintf(su.sun_path, sizeof(su.sun_path), RGMGR_SOCK); - - if (bind(sock, &su, sizeof(su)) < 0) { - umask(om); - goto fail; - } - umask(om); - - if (listen(sock, SOMAXCONN) < 0) - goto fail; - - return sock; -fail: - if (sock >= 0) - close(sock); + if (listenctx->ops && listenctx->ops->mo_accept) + return listenctx->ops->mo_accept(listenctx, acceptctx); + errno = ENOSYS; return -1; } -/** - This waits for events on the cluster comms FD and - dispatches them using cman_dispatch. Initially, - the design had no permanent threads, but that model - proved difficult to implement correctly. 
- */ -static void * -cluster_comms_thread(void *arg) -{ - while (thread_running) { - poll_cluster_messages(2); - } - - return NULL; -} - - -/* - Transliterates a CMAN event to a control message - */ -static void -process_cman_event(cman_handle_t handle, void *private, int reason, int arg) -{ - cluster_msg_hdr_t *msg; - int *argp; - msg_q_t *node; - msgctx_t *ctx; - - /* Allocate queue node */ - while ((node = malloc(sizeof(*node))) == NULL) { - sleep(1); - } - memset(node, 0, sizeof(*node)); - - /* Allocate message: header + int (for arg) */ - while ((msg = malloc(sizeof(int) + - sizeof(cluster_msg_hdr_t))) == NULL) { - sleep(1); - } - memset(msg, 0, sizeof(int)+sizeof(cluster_msg_hdr_t)); - - - switch(reason) { -#if defined(LIBCMAN_VERSION) && LIBCMAN_VERSION >= 2 - case CMAN_REASON_PORTOPENED: - msg->msg_control = M_PORTOPENED; - break; - case CMAN_REASON_TRY_SHUTDOWN: - msg->msg_control = M_TRY_SHUTDOWN; - break; -#endif - case CMAN_REASON_PORTCLOSED: - msg->msg_control = M_PORTCLOSED; - break; - case CMAN_REASON_STATECHANGE: - msg->msg_control = M_STATECHANGE; - break; - } - - argp = ((void *)msg + sizeof(cluster_msg_hdr_t)); - *argp = arg; - - node->len = sizeof(cluster_msg_hdr_t) + sizeof(int); - node->message = msg; - - pthread_mutex_lock(&context_lock); - ctx = contexts[0]; /* This is the cluster context... */ - if (!ctx) { - /* We received a close for something we've already - detached from our list. No big deal, just - ignore. 
*/ - free(node->message); - free(node); - pthread_mutex_unlock(&context_lock); - return; - } - - pthread_mutex_lock(&ctx->u.cluster_info.mutex); - list_insert(&ctx->u.cluster_info.queue, node); - pthread_mutex_unlock(&ctx->u.cluster_info.mutex); - /* If a select pipe was set up, wake it up */ - if (ctx->u.cluster_info.select_pipe[1] >= 0) - write(ctx->u.cluster_info.select_pipe[1], "", 1); - pthread_mutex_unlock(&context_lock); - - pthread_cond_signal(&ctx->u.cluster_info.cond); -} - - -/* XXX INCOMPLETE */ +/* XXX Special case */ int -msg_init(chandle_t *ch) +msg_listen(int type, void *port, int me, msgctx_t **ctx) { - int e; - pthread_attr_t attrs; - msgctx_t *ctx; - - pthread_mutex_lock(&ch->c_lock); - - /* Set up local context */ - - ctx = msg_new_ctx(); - if (!ctx) { - pthread_mutex_unlock(&ch->c_lock); + errno = EINVAL; + if (!me) return -1; - } - - ctx->type = MSG_SOCKET; - ctx->u.local_info.sockfd = local_listener_sk(); - ctx->u.local_info.flags = SKF_LISTEN; - - ch->local_ctx = ctx; - - ctx = msg_new_ctx(); - - if (!ctx) { - pthread_mutex_unlock(&ch->c_lock); - msg_free_ctx((msgctx_t *)ch->local_ctx); + if (type == MSG_NONE) return -1; - } - - gch = ch; - - if (cman_start_recv_data(ch->c_cluster, process_cman_msg, - RG_PORT) != 0) { - e = errno; - msg_close(ch->local_ctx); - pthread_mutex_unlock(&ch->c_lock); - msg_free_ctx((msgctx_t *)ch->local_ctx); - msg_free_ctx((msgctx_t *)ch->cluster_ctx); - errno = e; + if (!ctx) return -1; - } - if (cman_start_notification(ch->c_cluster, process_cman_event) != 0) { - e = errno; - msg_close(ch->local_ctx); - pthread_mutex_unlock(&ch->c_lock); - msg_free_ctx((msgctx_t *)ch->local_ctx); - msg_free_ctx((msgctx_t *)ch->cluster_ctx); - errno = e; + if (type == MSG_CLUSTER) { + return cluster_msg_listen(me, port, ctx); + } else if (type == MSG_SOCKET) { + return sock_msg_listen(me, port, ctx); } - ch->cluster_ctx = ctx; - pthread_mutex_unlock(&ch->c_lock); - - pthread_mutex_lock(&context_lock); - - memset(contexts, 0, 
sizeof(contexts)); - contexts[0] = ctx; - - ctx->type = MSG_CLUSTER; - ctx->u.cluster_info.port = RG_PORT; /* port! */ - ctx->u.cluster_info.nodeid = 0; /* Broadcast! */ - ctx->u.cluster_info.select_pipe[0] = -1; - ctx->u.cluster_info.select_pipe[1] = -1; - pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL); - pthread_cond_init(&ctx->u.cluster_info.cond, NULL); - pthread_mutex_unlock(&context_lock); - - pthread_attr_init(&attrs); - pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED); - pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED); - - thread_running = 1; - pthread_create(&comms_thread, &attrs, cluster_comms_thread, NULL); - - - pthread_attr_destroy(&attrs); - - return 0; + return -1; } -int -msg_print_ctx(int ctx) +void +msg_print(msgctx_t *ctx) { - if (!contexts[ctx]) - return -1; - - printf("Cluster Message Context %d\n", ctx); - printf(" Node ID %d\n", contexts[ctx]->u.cluster_info.nodeid); - printf(" Remote %d\n", contexts[ctx]->u.cluster_info.remote_ctx); - return 0; + if (ctx->ops && ctx->ops->mo_print) + return ctx->ops->mo_print(ctx); } /* XXX INCOMPLETE */ int -msg_shutdown(chandle_t *ch) +msg_shutdown(void) { - if (!ch) { - errno = EINVAL; - return -1; - } - - while (pthread_kill(comms_thread, 0) == 0) - sleep(1); - - pthread_mutex_lock(&ch->c_lock); - - /* xxx purge everything */ - msg_close(ch->local_ctx); - cman_end_recv_data(ch->c_cluster); - - msg_free_ctx(ch->local_ctx); - msg_free_ctx(ch->cluster_ctx); - - - pthread_mutex_unlock(&ch->c_lock); + sock_msg_shutdown(); + cluster_msg_shutdown(); return 0; } @@ -1337,7 +279,6 @@ { msgctx_t *p; - printf("Alloc %d\n", sizeof(msgctx_t)); p = malloc(sizeof(msgctx_t)); if (!p) return NULL; @@ -1354,4 +295,3 @@ { free(dead); } - --- cluster/rgmanager/src/clulib/msgsimple.c 2006/06/02 17:37:10 1.5 +++ cluster/rgmanager/src/clulib/msgsimple.c 2006/07/11 23:52:41 1.6 @@ -76,9 +76,8 @@ msg_receive_simple(msgctx_t *ctx, generic_msg_hdr ** buf, int timeout) { int ret; - char msgbuf[16384]; 
+ char msgbuf[4096]; generic_msg_hdr *peek_msg = (generic_msg_hdr *)msgbuf; - int size; /* * Peek at the header. We need the size of the inbound buffer! @@ -102,6 +101,7 @@ return -1; } + /* Decode so we know how much to allocate */ swab_generic_msg_hdr(peek_msg); if (peek_msg->gh_magic != GENERIC_HDR_MAGIC) { fprintf(stderr, "Invalid magic: Wanted 0x%08x, got 0x%08x\n", @@ -113,14 +113,15 @@ * allocate enough memory to receive the header + diff buffer */ *buf = malloc(peek_msg->gh_length); - if (!*buf) { fprintf(stderr, "%s: malloc: %s", __FUNCTION__, strerror(errno)); return -1; } - memset(*buf, 0, peek_msg->gh_length); - memcpy(*buf, msgbuf + sizeof(generic_msg_hdr), peek_msg->gh_length); + memcpy(*buf, msgbuf, peek_msg->gh_length); + + /* Put it back into the original order... */ + swab_generic_msg_hdr((generic_msg_hdr *)(*buf)); return ret; } --- cluster/rgmanager/src/clulib/rg_strings.c 2005/03/07 17:25:12 1.3 +++ cluster/rgmanager/src/clulib/rg_strings.c 2006/07/11 23:52:41 1.4 @@ -1,3 +1,21 @@ +/* + Copyright Red Hat, Inc. 2004-2006 + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; either version 2, or (at your option) any + later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 675 Mass Ave, Cambridge, + MA 02139, USA. +*/ const char *rg_state_strings[] = { "stopped", "starting", --- cluster/rgmanager/src/clulib/signals.c 2004/08/13 15:36:51 1.1 +++ cluster/rgmanager/src/clulib/signals.c 2006/07/11 23:52:41 1.2 @@ -1,3 +1,21 @@ +/* + Copyright Red Hat, Inc. 
2003-2006 + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; either version 2, or (at your option) any + later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 675 Mass Ave, Cambridge, + MA 02139, USA. +*/ #include #include #include --- cluster/rgmanager/src/clulib/vft.c 2006/06/02 17:37:10 1.13 +++ cluster/rgmanager/src/clulib/vft.c 2006/07/11 23:52:41 1.14 @@ -1,5 +1,5 @@ /* - Copyright Red Hat, Inc. 2002-2003 + Copyright Red Hat, Inc. 2002-2006 This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the @@ -40,12 +40,8 @@ #include #include #include +#include - -int clu_lock_verbose(char *lockname, int flags, void **lockpp); - -static int vf_lfds[2]; -static int vf_lfd = 0; static key_node_t *key_list = NULL; /** List of key nodes. */ static int _node_id = (int)-1;/** Our node ID, set with vf_init. */ static uint16_t _port = 0; /** Our daemon ID, set with vf_init. 
*/ @@ -57,7 +53,7 @@ static pthread_mutex_t key_list_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t vf_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_t vf_thread = (pthread_t)-1; -static int thread_ready = 0; +static int vf_thread_ready = 0; static vf_vote_cb_t default_vote_cb = NULL; static vf_vote_cb_t default_commit_cb = NULL; @@ -65,43 +61,42 @@ /* * Internal Functions */ -static int send_to_all(msgctx_t *peer_ctx, int32_t command, int arg1, int arg2, - int log_errors); -static int vf_send_abort(msgctx_t *ctx); -static int vf_send_commit(msgctx_t *ctx); -static void close_all(msgctx_t *fds); +static int _send_simple(msgctx_t *ctx, int32_t command, int arg1, int arg2, + int log_errors); +static int vf_send_abort(msgctx_t *ctx, uint32_t trans); +static int vf_send_commit(msgctx_t *ctx, uint32_t trans); static key_node_t * kn_find_key(char *keyid); -static key_node_t * kn_find_fd(msgctx_t *ctx); -static int vf_handle_join_view_msg(msgctx_t *ctx, vf_msg_t * hdrp); +static key_node_t * kn_find_trans(uint32_t trans); +static int vf_handle_join_view_msg(msgctx_t *ctx, int nodeid, vf_msg_t * hdrp); static int vf_resolve_views(key_node_t *key_node); -static int vf_unanimous(msgctx_t *peer_ctx, int remain, int timeout); -static view_node_t * vn_new(msgctx_t *ctx, uint32_t nodeid, int viewno, +static int vf_unanimous(msgctx_t *ctx, int trans, int remain, int timeout); +static view_node_t * vn_new(uint32_t trans, uint32_t nodeid, int viewno, void *data, uint32_t datalen); static int vf_request_current(cluster_member_list_t *membership, char *keyid, - int *viewno, void **data, uint32_t *datalen); -static int _vf_purge(key_node_t *key_node, msgctx_t **fd); + uint64_t *viewno, void **data, uint32_t *datalen); +static int _vf_purge(key_node_t *key_node, uint32_t *trans); /* Join-view buffer list functions */ static int vn_cmp(view_node_t *left, view_node_t *right); static int vn_insert_sorted(view_node_t **head, view_node_t *node); -static view_node_t * 
vn_remove(view_node_t **head, msgctx_t *ctx); -static int vf_buffer_join_msg(int fd, vf_msg_t *hdr, +static view_node_t * vn_remove(view_node_t **head, uint32_t trans); +static int vf_buffer_join_msg(vf_msg_t *hdr, struct timeval *timeout); /* Commits buffer list functions */ static int vc_cmp(commit_node_t *left, commit_node_t *right); static int vc_insert_sorted(commit_node_t **head, commit_node_t *node); -static commit_node_t * vc_remove(commit_node_t **head, int fd); -static int vf_buffer_commit(int fd); +static commit_node_t * vc_remove(commit_node_t **head, uint32_t trans); +static int vf_buffer_commit(uint32_t trans); /* Simple functions which client calls to vote/abort */ -static int vf_vote_yes(msgctx_t *ctx); -static int vf_vote_no(msgctx_t *ctx); -static int vf_abort(msgctx_t *ctx); +static int vf_vote_yes(msgctx_t *ctx, uint32_t trans); +static int vf_vote_no(msgctx_t *ctx, uint32_t trans); +static int vf_abort(uint32_t trans); static int tv_cmp(struct timeval *left, struct timeval *right); /* Resolution */ -static int vf_try_commit(key_node_t *key_node); +static uint32_t vf_try_commit(key_node_t *key_node); int vf_init(int my_node_id, uint16_t my_port, vf_vote_cb_t vote_cb, vf_commit_cb_t commit_cb); @@ -111,7 +106,7 @@ vf_commit_cb_t commit_cb); int vf_write(cluster_member_list_t *memberhip, uint32_t flags, char *keyid, void *data, uint32_t datalen); -int vf_process_msg(int handle, generic_msg_hdr *msgp, int nbytes); +int vf_process_msg(msgctx_t *ctx, int nodeid, generic_msg_hdr *msgp, int nbytes); int vf_end(char *keyid); int vf_read(cluster_member_list_t *membership, char *keyid, uint64_t *view, void **data, uint32_t *datalen); @@ -123,14 +118,14 @@ struct vf_args { uint16_t port; int local_node_id; + msgctx_t *ctx; }; static int -send_to_all(msgctx_t *peer_ctx, int32_t command, int arg1, int arg2, int log_errors) +_send_simple(msgctx_t *ctx, int32_t command, int arg1, int arg2, int log_errors) { generic_msg_hdr hdr; - int x, rv = 0; hdr.gh_magic = 
GENERIC_HDR_MAGIC; hdr.gh_length = sizeof(hdr); @@ -140,51 +135,27 @@ swab_generic_msg_hdr(&hdr); - for (x=0; peer_ctx[x].type != -1; x++) { - if (msg_send(&peer_ctx[x], &hdr, sizeof(hdr)) == sizeof(hdr)) - continue; - - if (log_errors) { -#if 0 - clulog(LOG_ERR, "#14: Failed to send %d " - "bytes to %d!\n", sizeof(hdr), - x); -#endif - } - rv = -1; - } - - return rv; + return msg_send(ctx, &hdr, sizeof(hdr)); } static int -vf_send_abort(msgctx_t *ctx) +vf_send_abort(msgctx_t *ctx, uint32_t trans) { #ifdef DEBUG - printf("VF: Broadcasting ABORT\n"); + printf("VF: Broadcasting ABORT (X#%08x)\n", trans); #endif - return send_to_all(ctx, VF_MESSAGE, VF_ABORT, 0, 0); + return _send_simple(ctx, VF_MESSAGE, VF_ABORT, trans, 0); } static int -vf_send_commit(msgctx_t *ctx) +vf_send_commit(msgctx_t *ctx, uint32_t trans) { #ifdef DEBUG printf("VF: Broadcasting FORMED\n"); #endif - return send_to_all(ctx, VF_MESSAGE, VF_VIEW_FORMED, 0, 1); -} - - -static void -close_all(msgctx_t *ctx) -{ - int x; - for (x = 0; ctx[x].type != -1; x++) { - msg_close(&ctx[x]); - } + return _send_simple(ctx, VF_MESSAGE, VF_VIEW_FORMED, trans, 1); } @@ -202,14 +173,14 @@ static key_node_t * -kn_find_fd(msgctx_t *ctx) +kn_find_trans(uint32_t trans) { key_node_t *cur; view_node_t *curvn; for (cur = key_list; cur; cur = cur->kn_next) for (curvn = cur->kn_jvlist; curvn; curvn = curvn->vn_next) - if (curvn->vn_ctx == ctx) + if (curvn->vn_transaction == trans) return cur; return NULL; @@ -217,15 +188,17 @@ static int -vf_handle_join_view_msg(msgctx_t *ctx, vf_msg_t * hdrp) +vf_handle_join_view_msg(msgctx_t *ctx, int nodeid, vf_msg_t * hdrp) { struct timeval timeout; key_node_t *key_node; + uint32_t trans; + trans = hdrp->vm_msg.vf_transaction; #ifdef DEBUG - printf("VF_JOIN_VIEW from member #%d! Key: %s #%d\n", + printf("VF_JOIN_VIEW from member #%d! 
Key: %s #%d (X#%08x)\n", hdrp->vm_msg.vf_coordinator, hdrp->vm_msg.vf_keyid, - (int) hdrp->vm_msg.vf_view); + (int) hdrp->vm_msg.vf_view, trans); #endif pthread_mutex_lock(&key_list_mutex); @@ -241,7 +214,7 @@ pthread_mutex_unlock(&key_list_mutex); printf("VF: Error: Failed to initialize %s\n", hdrp->vm_msg.vf_keyid); - vf_vote_no(ctx); + vf_vote_no(ctx, trans); return VFR_ERROR; } @@ -258,7 +231,7 @@ #ifdef DEBUG printf("VF: Voting NO (via callback)\n"); #endif - vf_vote_no(ctx); + vf_vote_no(ctx, trans); return VFR_OK; } } @@ -269,12 +242,12 @@ timeout.tv_sec = key_node->kn_tsec; timeout.tv_usec = 0; - if (vf_buffer_join_msg(ctx, (vf_msg_t *) hdrp, &timeout)) { + if (vf_buffer_join_msg((vf_msg_t *) hdrp, &timeout)) { pthread_mutex_unlock(&key_list_mutex); #ifdef DEBUG - printf("VF: Voting YES\n"); + printf("VF: Voting YES (X#%08x)\n", trans); #endif - vf_vote_yes(ctx); + vf_vote_yes(ctx, trans); return VFR_OK; } @@ -282,7 +255,7 @@ #ifdef DEBUG printf("VF: Voting NO\n"); #endif - vf_vote_no(ctx); + vf_vote_no(ctx, trans); return VFR_NO; } @@ -297,13 +270,10 @@ int commits = 0; void *data; uint32_t datalen; - msgctx_t *ctx; + uint32_t trans; - while ((ctx = vf_try_commit(key_node)) != -1) { - - /* XXX in general, this shouldn't kill the fd... */ + while ((trans = vf_try_commit(key_node)) != 0) { commits++; - msg_close(commitfd); } if (key_node->kn_commit_cb) { @@ -327,12 +297,12 @@ static int -vf_unanimous(msgctx_t *peer_ctx, int remain, int timeout) +vf_unanimous(msgctx_t *mcast_ctx, int trans, int remain, + int timeout) { generic_msg_hdr response; struct timeval tv; - fd_set rfds; - int nready, x, max; + int x; /* Set up for the select */ tv.tv_sec = timeout; @@ -346,69 +316,62 @@ * Flag hosts which we received messages from so we don't * read a second message. 
*/ - while (remain) { - FD_ZERO(&rfds); - for (x = 0; peer_ctx[x].type != M_NONE; x++) - msg_fd_set(&peer_ctx[x], &rfds, &max); - - nready = select(max + 1, &rfds, NULL, NULL, &tv); - if (nready <= -1) { - if (nready == 0) - printf("VF Abort: Timed out!\n"); - else - printf("VF Abort: %s\n", - strerror(errno)); - return 0; + while (remain && timeout) { + + if (msg_wait(mcast_ctx, 5) <= 0) { + --timeout; + continue; } - for (x = 0; (peer_ctx[x].type != M_NONE) && nready; x++) { - if (!msg_fd_isset(&peer_fds[x], &rfds)) - continue; + x = msg_receive(mcast_ctx, &response, sizeof(response), 1); + if (x < sizeof(response)) + continue; + + /* + * Decode & validate message + */ + swab_generic_msg_hdr(&response); + if ((response.gh_magic != GENERIC_HDR_MAGIC) || + (response.gh_command != VF_MESSAGE)) { + /* Don't process anything but votes */ + continue; + } - remain--; - nready--; - /* - * Get reply from node x. XXX 1 second timeout? - */ - if (msg_receive(&peer_ctx[x], &response, - sizeof(response), - 1) == -1) { - printf("VF: Abort: Timed out during " - "receive from fd #%p\n", &peer_ctx[x]); - return 0; - } - - /* - * Decode & validate message - */ - swab_generic_msg_hdr(&response); - if ((response.gh_magic != GENERIC_HDR_MAGIC) || - (response.gh_command != VF_MESSAGE) || - (response.gh_arg1 != VF_VOTE)) { - printf("VF: Abort: Invalid header in" - " reply from fd #%p\n", &peer_fds[x]); - return 0; - } - + if (vf_command(response.gh_arg1) != VF_VOTE) + /* Don't process anything but votes */ + continue; + + if (response.gh_arg2 != trans) + continue; + + /* + * If we get a 'NO', we are done. + */ + if (!(vf_flags(response.gh_arg1) & VFMF_AFFIRM)) { /* - * If we get a 'NO', we are done. + * XXX ok, it might be a mangled message; + * treat it as no anyway! */ - if (response.gh_arg2 != 1) { - /* - * XXX ok, it might be a mangled message; - * treat it as no anyway! 
- */ - printf("VF: Abort: fd #%d voted NO\n", - peer_fds[x]); - return 0; - } - #ifdef DEBUG - printf("VF: fd #%d voted YES\n", peer_fds[x]); + printf("VF: Abort: someone voted NO\n"); #endif + return 0; } + +#ifdef DEBUG + printf("VF: YES\n"); +#endif + --remain; } + if (remain) { +#ifdef DEBUG + printf("VF: Timed out waiting for %d responses\n", remain); +#endif + return 0; + } + + /* * Whohoooooooo! */ @@ -420,7 +383,7 @@ * ... */ static view_node_t * -vn_new(msgctx_t *ctx, uint32_t nodeid, int viewno, void *data, +vn_new(uint32_t trans, uint32_t nodeid, int viewno, void *data, uint32_t datalen) { view_node_t *new; @@ -433,7 +396,7 @@ memset(new,0,totallen); - new->vn_ctx = ctx; + new->vn_transaction = trans; new->vn_nodeid = nodeid; new->vn_viewno = viewno; new->vn_datalen = datalen; @@ -502,7 +465,7 @@ static view_node_t * -vn_remove(view_node_t **head, msgctx_t *ctx) +vn_remove(view_node_t **head, uint32_t trans) { view_node_t *cur = *head, *back = NULL; @@ -510,7 +473,7 @@ return NULL; do { - if (cur->vn_ctx == ctx) { + if (cur->vn_transaction == trans) { if (back) { back->vn_next = cur->vn_next; cur->vn_next = NULL; @@ -537,7 +500,7 @@ * (b) we don't receive any messages. 
*/ static int -vf_buffer_join_msg(msgctx_t *ctx, vf_msg_t *hdr, struct timeval *timeout) +vf_buffer_join_msg(vf_msg_t *hdr, struct timeval *timeout) { key_node_t *key_node; view_node_t *newp; @@ -557,7 +520,8 @@ return 0; } - newp = vn_new(ctx, hdr->vm_msg.vf_coordinator, hdr->vm_msg.vf_view, + newp = vn_new(hdr->vm_msg.vf_transaction, hdr->vm_msg.vf_coordinator, + hdr->vm_msg.vf_view, hdr->vm_msg.vf_data, hdr->vm_msg.vf_datalen); if (timeout && (timeout->tv_sec || timeout->tv_usec)) { @@ -585,10 +549,10 @@ static int vc_cmp(commit_node_t *left, commit_node_t *right) { - if (left->vc_fd < right->vc_fd) + if (left->vc_transaction < right->vc_transaction) return -1; - if (left->vc_fd == right->vc_fd) + if (left->vc_transaction == right->vc_transaction) return 0; return 1; @@ -637,7 +601,7 @@ static commit_node_t * -vc_remove(commit_node_t **head, int fd) +vc_remove(commit_node_t **head, uint32_t trans) { commit_node_t *cur = *head, *back = NULL; @@ -645,7 +609,7 @@ return NULL; do { - if (cur->vc_fd == fd) { + if (cur->vc_transaction == trans) { if (back) { back->vc_next = cur->vc_next; cur->vc_next = NULL; @@ -671,13 +635,13 @@ * the last 'join-view' message. 
*/ static int -vf_buffer_commit(int fd) +vf_buffer_commit(uint32_t trans) { key_node_t *key_node; commit_node_t *newp; int rv; - key_node = kn_find_fd(fd); + key_node = kn_find_trans(trans); if (!key_node) return 0; @@ -686,7 +650,7 @@ return 0; newp->vc_next = NULL; - newp->vc_fd = fd; + newp->vc_transaction = trans; rv = vc_insert_sorted(&key_node->kn_clist, newp); if (!rv) @@ -697,31 +661,32 @@ static int -vf_vote_yes(msgctx_t *ctx) +vf_vote_yes(msgctx_t *ctx, uint32_t trans) { - return msg_send_simple(ctx, VF_MESSAGE, VF_VOTE, 1); - + /* XXX */ + return _send_simple(ctx, VF_MESSAGE, VF_VOTE | VFMF_AFFIRM, trans, 0); } static int -vf_vote_no(msgctx_t *ctx) +vf_vote_no(msgctx_t *ctx, uint32_t trans) { - return msg_send_simple(ctx, VF_MESSAGE, VF_VOTE, 0); + /* XXX */ + return _send_simple(ctx, VF_MESSAGE, VF_VOTE, trans, 0); } static int -vf_abort(msgctx_t *ctx) +vf_abort(uint32_t trans) { key_node_t *key_node; view_node_t *cur; - key_node = kn_find_fd(ctx); + key_node = kn_find_trans(trans); if (!key_node) return -1; - cur = vn_remove(&key_node->kn_jvlist, ctx); + cur = vn_remove(&key_node->kn_jvlist, trans); if (!cur) return -1; @@ -787,30 +752,30 @@ /** * Try to commit views in a given key_node. */ -static msgctx_t * +static uint32_t vf_try_commit(key_node_t *key_node) { view_node_t *vnp; commit_node_t *cmp; - msgctx_t *ctx = NULL; + uint32_t trans = 0; if (!key_node) - return NULL; + return 0; if (!key_node->kn_jvlist) - return NULL; + return 0; - ctx = key_node->kn_jvlist->vn_ctx; + trans = key_node->kn_jvlist->vn_transaction; - cmp = vc_remove(&key_node->kn_clist, ctx); + cmp = vc_remove(&key_node->kn_clist, trans); if (!cmp) { /*printf("VF: Commit for fd%d not received yet!", fd);*/ - return NULL; + return 0; } free(cmp); /* no need for it any longer */ - vnp = vn_remove(&key_node->kn_jvlist, ctx); + vnp = vn_remove(&key_node->kn_jvlist, trans); if (!vnp) { /* * But, we know it was the first element on the list?!! 
@@ -835,46 +800,27 @@ memcpy(key_node->kn_data, vnp->vn_data, vnp->vn_datalen); free(vnp); - return ctx; + return trans; } void -vf_event_loop(int my_node_id) +vf_event_loop(msgctx_t *ctx, int my_node_id) { - int max, nready, n, fd, flags; - struct timeval tv; - fd_set rfds; + int n; generic_msg_hdr *hdrp = NULL; - FD_ZERO(&rfds); - max = msg_fill_fdset(&rfds, MSG_ALL, MSGP_VFS); - - tv.tv_sec = 1; - tv.tv_usec = 0; - nready = select(max + 1, &rfds, NULL, NULL, &tv); - if (nready <= 0) - return; - - while (nready) { - fd = msg_next_fd(&rfds); - --nready; + if (msg_wait(ctx, 3) != 0) { - flags = msg_get_flags(fd); - - if (flags & MSG_LISTEN) - fd = msg_accept(fd, 1, NULL); - - n = msg_receive_simple(fd, &hdrp, 5); + n = msg_receive_simple(ctx, &hdrp, 2); if (n <= 0 || !hdrp) { - msg_close(fd); - continue; + return; } swab_generic_msg_hdr(hdrp); if (hdrp->gh_command == VF_MESSAGE) { - if (vf_process_msg(fd, hdrp, n) == VFR_COMMIT) { + if (vf_process_msg(ctx, 0, hdrp, n) == VFR_COMMIT) { #ifdef DEBUG printf("VFT: View committed\n"); #endif @@ -893,7 +839,7 @@ vf_wait_ready(void) { pthread_mutex_lock(&vf_mutex); - while (!thread_ready) { + while (!vf_thread_ready) { pthread_mutex_unlock(&vf_mutex); usleep(50000); pthread_mutex_lock(&vf_mutex); @@ -908,12 +854,14 @@ int my_node_id; uint16_t port; key_node_t *cur; - int fd; + uint32_t trans; + msgctx_t *ctx; block_all_signals(); port = ((struct vf_args *)arg)->port; my_node_id = ((struct vf_args *)arg)->local_node_id; + ctx = ((struct vf_args *)arg)->ctx; free(arg); #ifdef DEBUG @@ -921,29 +869,20 @@ #endif pthread_mutex_lock(&vf_mutex); -#if 0 - if ((vf_lfd = msg_listen(port, MSGP_VFS, vf_lfds, 2)) <= 0) { - printf("Unable to set up listen socket on port %d\n", - port); - pthread_mutex_unlock(&vf_mutex); - pthread_exit(NULL); - } - - thread_ready = 1; + vf_thread_ready = 1; pthread_mutex_unlock(&vf_mutex); -#endif - while (1) { + while (vf_thread_ready) { pthread_mutex_lock(&key_list_mutex); for (cur = key_list; cur; 
cur = cur->kn_next) { /* Destroy timed-out join views */ - while (_vf_purge(cur, &fd) != VFR_NO) { - msg_close(fd); - } + while (_vf_purge(cur, &trans) != VFR_NO); } pthread_mutex_unlock(&key_list_mutex); - vf_event_loop(my_node_id); + vf_event_loop(ctx, my_node_id); } + + msg_close(ctx); return NULL; } @@ -952,40 +891,43 @@ /** * Initialize VF. Initializes the View Formation sub system. * @param my_node_id The node ID of the caller. - * @param my_port The port of the caller. * @return 0 on success, -1 on failure. */ int vf_init(int my_node_id, uint16_t my_port, vf_vote_cb_t vcb, vf_commit_cb_t ccb) { - struct vf_args *va; - + struct vf_args *args; + msgctx_t *ctx; if (my_node_id == (int)-1) return -1; + + while((ctx = msg_new_ctx()) == NULL) + sleep(1); - if (my_port == 0) - return -1; + while((args = malloc(sizeof(*args))) == NULL) + sleep(1); - pthread_mutex_lock(&vf_mutex); - if (vf_thread != (pthread_t)-1) { - pthread_mutex_unlock(&vf_mutex); - return 0; + if (msg_open(MSG_CLUSTER, 0, my_port, ctx, 1) < 0) { + free(ctx); + free(args); + return -1; } - va = malloc(sizeof(*va)); - va->local_node_id = my_node_id; - va->port = my_port; + args->port = my_port; + args->local_node_id = my_node_id; + args->ctx = ctx; - pthread_create(&vf_thread, NULL, vf_server, va); - /* Write/read needs this */ + pthread_mutex_lock(&vf_mutex); _port = my_port; _node_id = my_node_id; default_vote_cb = vcb; default_commit_cb = ccb; pthread_mutex_unlock(&vf_mutex); + pthread_create(&vf_thread, NULL, vf_server, args); + vf_wait_ready(); return 0; @@ -998,20 +940,14 @@ int vf_shutdown(void) { - int x; key_node_t *c_key; view_node_t *c_jv; commit_node_t *c_cn; pthread_mutex_lock(&vf_mutex); + vf_thread_ready = 0; pthread_cancel(vf_thread); pthread_join(vf_thread, NULL); - thread_ready = 0; - vf_thread = (pthread_t)0; - - for (x = 0 ; x < vf_lfd; x++) - msg_close(vf_lfds[x]); - _port = 0; _node_id = (int)-1; pthread_mutex_lock(&key_list_mutex); @@ -1019,7 +955,6 @@ while ((c_key = 
key_list) != NULL) { while ((c_jv = c_key->kn_jvlist) != NULL) { - msg_close(c_jv->vn_ctx); key_list->kn_jvlist = c_jv->vn_next; free(c_jv); } @@ -1120,7 +1055,7 @@ vf_msg_t * build_vf_data_message(int cmd, char *keyid, void *data, uint32_t datalen, - int viewno, uint32_t *retlen) + int viewno, int trans, uint32_t *retlen) { uint32_t totallen; vf_msg_t *msg; @@ -1142,6 +1077,7 @@ /* Data */ strncpy(msg->vm_msg.vf_keyid,keyid,sizeof(msg->vm_msg.vf_keyid)); + msg->vm_msg.vf_transaction = trans; msg->vm_msg.vf_datalen = datalen; msg->vm_msg.vf_coordinator = _node_id; msg->vm_msg.vf_view = viewno; @@ -1171,89 +1107,52 @@ vf_write(cluster_member_list_t *membership, uint32_t flags, char *keyid, void *data, uint32_t datalen) { - int nodeid; - msgctx_t *peer_ctx; - int count; + msgctx_t everyone; key_node_t *key_node; vf_msg_t *join_view; - int remain = 0, x, y, rv = 1; + int remain = 0, x, y, rv = VFR_ERROR; uint32_t totallen; +#ifdef DEBUG struct timeval start, end, dif; - void *lockp = NULL; +#endif + struct dlm_lksb lockp; int l; char lock_name[256]; + static uint32_t trans = 0; if (!data || !datalen || !keyid || !strlen(keyid) || !membership) return -1; + pthread_mutex_lock(&vf_mutex); + if (!trans) { + trans = _node_id << 16; + } + ++trans; + /* Obtain cluster lock on it. */ snprintf(lock_name, sizeof(lock_name), "usrm::vf"); - l = clu_lock_verbose(lock_name, CLK_EX, &lockp); + l = clu_lock(LKM_EXMODE, &lockp, 0, lock_name); if (l < 0) { - clu_unlock(lock_name, lockp); + clu_unlock(&lockp); pthread_mutex_unlock(&vf_mutex); return l; } - /* set to -1 */ - count = sizeof(msgctx_t) * (membership->cml_count + 1); - peer_ctx = malloc(count); - if(!peer_ctx) { - pthread_mutex_unlock(&vf_mutex); - return -1; - } - - memset(peer_ctx, 0, sizeof(msgctx_t) * (membership->cml_count +1)); - for (x = 0; x < (membership->cml_count + 1); x++) { - peer_ctx[x].type = M_NONE; - } +#ifdef DEBUG getuptime(&start); +#endif -retry_top: - /* - * Connect to everyone, except ourself. 
We separate this from the - * initial send cycle because the connect cycle can cause timeouts - * within the code - ie, if a node is down, it is likely the connect - * will take longer than the client is expecting to wait for the - * commit/abort messages! - * - * We assume we're up. Since challenge-response needs both - * processes to be operational... - */ + remain = 0; for (x = 0, y = 0; x < membership->cml_count; x++) { - if (!memb_online(membership, - membership->cml_members[x].cn_nodeid)) { - continue; + if (membership->cml_members[x].cn_member) { + remain++; } + } - if (peer_fds[x].type != M_NONE) - continue; - - nodeid = membership->cml_members[x].cn_nodeid; -#ifdef DEBUG - printf("VF: Connecting to member #%d\n", (int)nodeid); - fflush(stdout); -#endif - - if (msg_open(nodeid, _port, &peer_ctx[y], 15) != 0) { #ifdef DEBUG - printf("VF: Connect to %d failed: %s\n", (int)nodeid, - strerror(errno)); + printf("aight, need responses from %d guys\n", remain); #endif - if (flags & VFF_RETRY) - goto retry_top; - if (flags & VFF_IGN_CONN_ERRORS) - continue; - free(peer_ctx); - - clu_unlock(lock_name, lockp); - pthread_mutex_unlock(&vf_mutex); - return -1; - } - - ++y; - } pthread_mutex_lock(&key_list_mutex); key_node = kn_find_key(keyid); @@ -1261,7 +1160,7 @@ if ((vf_key_init_nt(keyid, 10, NULL, NULL) < 0)) { pthread_mutex_unlock(&key_list_mutex); - clu_unlock(lock_name, lockp); + clu_unlock(&lockp); pthread_mutex_unlock(&vf_mutex); return -1; } @@ -1270,19 +1169,19 @@ } join_view = build_vf_data_message(VF_JOIN_VIEW, keyid, data, datalen, - key_node->kn_viewno+1, &totallen); + key_node->kn_viewno+1, trans, &totallen); pthread_mutex_unlock(&key_list_mutex); if (!join_view) { - clu_unlock(lock_name, lockp); + clu_unlock(&lockp); pthread_mutex_unlock(&vf_mutex); return -1; } #ifdef DEBUG - printf("VF: Push %d.%d #%d\n", (int)_node_id, getpid(), - (int)join_view->vm_msg.vf_view); + printf("VF: Push %d.%d #%d (X#%08x)\n", (int)_node_id, getpid(), + 
(int)join_view->vm_msg.vf_view, trans); #endif /* * Encode the package. @@ -1292,50 +1191,53 @@ /* * Send our message to everyone */ - for (x = 0; peer_ctx[x].type != M_NONE; x++) { - - if (msg_send(&peer_ctx[x], join_view, totallen) != totallen) { - vf_send_abort(peer_ctx); - close_all(peer_ctx); - - free(join_view); - clu_unlock(lock_name, lockp); - pthread_mutex_unlock(&vf_mutex); - return -1; - } - - remain++; + if (msg_open(MSG_CLUSTER, 0, _port, &everyone, 0) < 0) { + printf("msg_open: fail: %s\n", strerror(errno)); + return -1; } + x = msg_send(&everyone, join_view, totallen); + if (x < totallen) { + vf_send_abort(&everyone, trans); +#ifdef DEBUG + printf("VF: Aborted: Send failed (%d/%d)\n", x, totallen); +#endif + msg_close(&everyone); + free(join_view); + clu_unlock(&lockp); + pthread_mutex_unlock(&vf_mutex); + return -1; + } + #ifdef DEBUG printf("VF: Checking for consensus...\n"); #endif /* * See if we have a consensus =) */ - if ((rv = (vf_unanimous(peer_ctx, remain, VF_COORD_TIMEOUT)))) { - vf_send_commit(peer_ctx); + if ((rv = (vf_unanimous(&everyone, trans, remain, + 5)))) { + vf_send_commit(&everyone, trans); +#ifdef DEBUG + printf("VF: Consensus reached!\n"); +#endif } else { - vf_send_abort(peer_ctx); + vf_send_abort(&everyone, trans); #ifdef DEBUG printf("VF: Aborted!\n"); #endif } /* - * Clean up - */ - close_all(peer_fds); - - /* * unanimous returns 1 for true; 0 for false, so negate it and * return our value... */ + msg_close(&everyone); free(join_view); - free(peer_fds); - clu_unlock(lock_name, lockp); + clu_unlock(&lockp); pthread_mutex_unlock(&vf_mutex); +#ifdef DEBUG if (rv) { getuptime(&end); @@ -1347,11 +1249,10 @@ dif.tv_sec--; } -#ifdef DEBUG printf("VF: Converge Time: %d.%06d\n", (int)dif.tv_sec, (int)dif.tv_usec); -#endif } +#endif return (rv?0:-1); } @@ -1378,12 +1279,12 @@ * VFR_COMMIT if new views were committed. 
*/ static int -_vf_purge(key_node_t *key_node, msgctx_t **ctx) +_vf_purge(key_node_t *key_node, uint32_t *trans) { view_node_t *cur, *dead = NULL; struct timeval tv; - *fd = -1; + *trans = 0; if (!key_node) return VFR_NO; @@ -1401,11 +1302,11 @@ if (tv_cmp(&tv, &cur->vn_timeout) < 0) continue; - *ctx = cur->vn_ctx; - dead = vn_remove(&key_node->kn_jvlist, *ctx); + *trans = cur->vn_transaction; + dead = vn_remove(&key_node->kn_jvlist, *trans); free(dead); - printf("VF: Killed ctx %p\n", *ctx); + printf("VF: Killed transaction %08x\n", *trans); /* * returns the removed associated file descriptor * so that we can close it and get on with life @@ -1413,7 +1314,7 @@ break; } - if (*fd == -1) + if (*trans == 0) return VFR_NO; if (vf_resolve_views(key_node)) @@ -1425,13 +1326,13 @@ /** * Process a VF message. * - * @param handle File descriptor on which msgp was received. + * @param nodeid Node id from which msgp was received. * @param msgp Pointer to already-received message. * @param nbytes Length of msgp. * @return -1 on failure, 0 on success. */ int -vf_process_msg(int handle, generic_msg_hdr *msgp, int nbytes) +vf_process_msg(msgctx_t *ctx, int nodeid, generic_msg_hdr *msgp, int nbytes) { vf_msg_t *hdrp; int ret; @@ -1440,7 +1341,7 @@ (msgp->gh_command != VF_MESSAGE)) return VFR_ERROR; - switch(msgp->gh_arg1) { + switch(vf_command(msgp->gh_arg1)) { case VF_CURRENT: #ifdef DEBUG printf("VF: Received request for current data\n"); @@ -1455,7 +1356,7 @@ hdrp = (vf_msg_t *)msgp; swab_vf_msg_info_t(&hdrp->vm_msg); - return vf_send_current(handle, hdrp->vm_msg.vf_keyid); + return vf_send_current(ctx, hdrp->vm_msg.vf_keyid); case VF_JOIN_VIEW: /* Validate size... 
*/ @@ -1475,28 +1376,28 @@ return VFR_ERROR; } - return vf_handle_join_view_msg(handle, hdrp); + return vf_handle_join_view_msg(ctx, nodeid, hdrp); case VF_ABORT: - printf("VF: Received VF_ABORT, fd%d\n", handle); - vf_abort(handle); + printf("VF: Received VF_ABORT (X#%08x)\n", msgp->gh_arg2); + vf_abort(msgp->gh_arg2); return VFR_ABORT; case VF_VIEW_FORMED: #ifdef DEBUG - printf("VF: Received VF_VIEW_FORMED, fd%d\n", - handle); + printf("VF: Received VF_VIEW_FORMED, %d\n", + nodeid); #endif pthread_mutex_lock(&key_list_mutex); - vf_buffer_commit(handle); - ret = (vf_resolve_views(kn_find_fd(handle)) ? + vf_buffer_commit(msgp->gh_arg2); + ret = (vf_resolve_views(kn_find_trans(msgp->gh_arg2)) ? VFR_COMMIT : VFR_OK); pthread_mutex_unlock(&key_list_mutex); return ret; - + default: - printf("VF: Unknown msg type 0x%08x\n", - msgp->gh_arg1); + /* Ignore votes and the like from this part */ + break; } return VFR_OK; @@ -1521,15 +1422,15 @@ { key_node_t *key_node; char lock_name[256]; - void *lockp = NULL; + struct dlm_lksb lockp; int l; /* Obtain cluster lock on it. 
*/ pthread_mutex_lock(&vf_mutex); snprintf(lock_name, sizeof(lock_name), "usrm::vf"); - l = clu_lock_verbose(lock_name, CLK_EX, &lockp); + l = clu_lock(LKM_EXMODE, &lockp, 0, lock_name); if (l < 0) { - clu_unlock(lock_name, lockp); + clu_unlock(&lockp); pthread_mutex_unlock(&vf_mutex); printf("Couldn't lock %s\n", keyid); return l; @@ -1542,7 +1443,7 @@ if (!key_node) { if ((vf_key_init_nt(keyid, 10, NULL, NULL) < 0)) { pthread_mutex_unlock(&key_list_mutex); - clu_unlock(lock_name, lockp); + clu_unlock(&lockp); pthread_mutex_unlock(&vf_mutex); printf("Couldn't locate %s\n", keyid); return VFR_ERROR; @@ -1564,7 +1465,7 @@ pthread_mutex_unlock(&key_list_mutex); if (!membership) { - clu_unlock(lock_name, lockp); + clu_unlock(&lockp); //printf("Membership NULL, can't find %s\n", keyid); pthread_mutex_unlock(&vf_mutex); return VFR_ERROR; @@ -1573,7 +1474,7 @@ l = vf_request_current(membership, keyid, view, data, datalen); if (l == VFR_NODATA || l == VFR_ERROR) { - clu_unlock(lock_name, lockp); + clu_unlock(&lockp); //printf("Requesting current failed %s %d\n", keyid, l); pthread_mutex_unlock(&vf_mutex); return l; @@ -1583,7 +1484,7 @@ *data = malloc(key_node->kn_datalen); if (! *data) { pthread_mutex_unlock(&key_list_mutex); - clu_unlock(lock_name, lockp); + clu_unlock(&lockp); pthread_mutex_unlock(&vf_mutex); printf("Couldn't malloc %s\n", keyid); return VFR_ERROR; @@ -1594,7 +1495,7 @@ *view = key_node->kn_viewno; pthread_mutex_unlock(&key_list_mutex); - clu_unlock(lock_name, lockp); + clu_unlock(&lockp); pthread_mutex_unlock(&vf_mutex); return VFR_OK; @@ -1659,7 +1560,7 @@ if (!key_node || !key_node->kn_data || !key_node->kn_datalen) { pthread_mutex_unlock(&key_list_mutex); printf("VFT: No data for keyid %s\n", keyid); - return (msg_send_simple(ctx, VF_NACK, 0, 0) != -1)? + return (_send_simple(ctx, VF_NACK, 0, 0, 0) != -1)? 
VFR_OK : VFR_ERROR; } @@ -1670,11 +1571,12 @@ msg = build_vf_data_message(VF_ACK, keyid, key_node->kn_data, key_node->kn_datalen, key_node->kn_viewno, + 0, &totallen); pthread_mutex_unlock(&key_list_mutex); if (!msg) - return (msg_send_simple(ctx, VFR_ERROR, 0, 0) != -1)? + return (_send_simple(ctx, VFR_ERROR, 0, 0, 0) != -1)? VFR_OK : VFR_ERROR; swab_vf_msg_t(msg); @@ -1733,7 +1635,7 @@ */ static int vf_request_current(cluster_member_list_t *membership, char *keyid, - int *viewno, void **data, uint32_t *datalen) + uint64_t *viewno, void **data, uint32_t *datalen) { int x, n, rv = VFR_OK, port; msgctx_t ctx; @@ -1761,8 +1663,7 @@ swab_vf_msg_info_t(&(msg->vm_msg)); for (x = 0; x < membership->cml_count; x++) { - if (!memb_online(membership, - membership->cml_members[x].cn_nodeid)) + if (!membership->cml_members[x].cn_member) continue; /* Can't request from self. */ @@ -1770,10 +1671,11 @@ continue; rv = VFR_ERROR; - fd = msg_open(membership->cml_members[x].cn_nodeid, port, - &ctx, 15); - if (fd == -1) + if (msg_open(MSG_CLUSTER, + membership->cml_members[x].cn_nodeid, + port, &ctx, 15) < 0) { continue; + } msg = &rmsg; //printf("VF: Requesting current value of %s from %d\n", /cvs/cluster/cluster/rgmanager/src/daemons/rg_event.c,v --> standard output revision 1.1 --- cluster/rgmanager/src/daemons/rg_event.c +++ - 2006-07-11 23:52:45.033199000 +0000 @@ -0,0 +1,103 @@ +/* + Copyright Red Hat, Inc. 2006 + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; either version 2, or (at your option) any + later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 675 Mass Ave, Cambridge, + MA 02139, USA. +*/ +#include +#include +#include +#include +#include +#include +#include + +typedef struct __rge_q { + list_head(); + char rg_name[128]; + uint32_t rg_state; + int rg_owner; +} rgevent_t; + + +/** + * resource group event queue. + */ +static rgevent_t *rg_ev_queue = NULL; +static pthread_mutex_t rg_queue_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_t rg_ev_thread = 0; + +void group_event(char *name, uint32_t state, int owner); + + +void * +rg_event_thread(void *arg) +{ + rgevent_t *ev; + + while (1) { + pthread_mutex_lock(&rg_queue_mutex); + ev = rg_ev_queue; + if (ev) + list_remove(&rg_ev_queue, ev); + else + break; /* We're outta here */ + pthread_mutex_unlock(&rg_queue_mutex); + + group_event(ev->rg_name, ev->rg_state, ev->rg_owner); + + free(ev); + } + + /* Mutex held */ + rg_ev_thread = 0; + pthread_mutex_unlock(&rg_queue_mutex); + return NULL; +} + + +void +rg_event_q(char *name, uint32_t state, int owner) +{ + rgevent_t *ev; + pthread_attr_t attrs; + + while (1) { + ev = malloc(sizeof(rgevent_t)); + if (ev) { + break; + } + sleep(1); + } + + memset(ev,0,sizeof(*ev)); + + strncpy(ev->rg_name, name, 128); + ev->rg_state = state; + ev->rg_owner = owner; + + pthread_mutex_lock (&rg_queue_mutex); + list_insert(&rg_ev_queue, ev); + if (rg_ev_thread == 0) { + pthread_attr_init(&attrs); + pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED); + pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED); + pthread_attr_setstacksize(&attrs, 262144); + + pthread_create(&rg_ev_thread, &attrs, rg_event_thread, NULL); + pthread_attr_destroy(&attrs); + } + pthread_mutex_unlock (&rg_queue_mutex); +} --- cluster/rgmanager/src/daemons/Makefile 2006/06/02 17:37:10 1.12 +++ cluster/rgmanager/src/daemons/Makefile 2006/07/11 23:52:41 1.13 @@ -41,8 +41,8 
@@ clurgmgrd: rg_thread.o rg_locks.o main.o groups.o \ rg_queue.o rg_forward.o reslist.o \ resrules.o restree.o fo_domain.o nodeevent.o \ - watchdog.o rg_state.o - $(CC) -o $@ $^ $(INCLUDE) $(CFLAGS) $(LDFLAGS) -lccs -lcman + rg_event.o watchdog.o rg_state.o ../clulib/libclulib.a + $(CC) -o $@ $^ $(INCLUDE) $(CFLAGS) $(LDFLAGS) -lccs -lcman -lpthread -ldlm # # Our test program links against the local allocator so that --- cluster/rgmanager/src/daemons/fo_domain.c 2006/06/02 17:37:10 1.8 +++ cluster/rgmanager/src/daemons/fo_domain.c 2006/07/11 23:52:41 1.9 @@ -334,7 +334,7 @@ int found = 0; int owned_by_node = 0, started = 0, no_owner = 0; rg_state_t svc_state; - void *lockp; + struct dlm_lksb lockp; ENTER(); @@ -414,10 +414,10 @@ */ clulog(LOG_WARNING, "Problem getting state information for " "%s\n", rg_name); - rg_unlock(rg_name, lockp); + rg_unlock(&lockp); RETURN(FOD_BEST); } - rg_unlock(rg_name, lockp); + rg_unlock(&lockp); /* * Check to see if the service is started and if we are the owner in case of --- cluster/rgmanager/src/daemons/groups.c 2006/06/02 17:37:10 1.18 +++ cluster/rgmanager/src/daemons/groups.c 2006/07/11 23:52:41 1.19 @@ -29,9 +29,12 @@ #include #include -#define cm_svccount cm_pad[0] /* Theses are uint8_t size */ -#define cm_svcexcl cm_pad[1] +/* Use address field in this because we never use it internally, + and there is no extra space in the cman_node_t type. 
+ */ +#define cn_svccount cn_address.cna_address[0] /* Theses are uint8_t size */ +#define cn_svcexcl cn_address.cna_address[1] static int config_version = 0; static resource_t *_resources = NULL; @@ -42,6 +45,9 @@ pthread_mutex_t config_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_rwlock_t resource_lock = PTHREAD_RWLOCK_INITIALIZER; +void res_build_name(char *, size_t, resource_t *); +int get_rg_state_local(char *, rg_state_t *); + struct status_arg { msgctx_t *ctx; @@ -71,17 +77,16 @@ int count_resource_groups(cluster_member_list_t *ml) { -#if 0 resource_t *res; char *rgname, *val; int x; rg_state_t st; - void *lockp; + struct dlm_lksb lockp; cman_node_t *mp; for (x = 0; x < ml->cml_count; x++) { - ml->cml_members[x].cm_svccount = 0; - ml->cml_members[x].cm_svcexcl = 0; + ml->cml_members[x].cn_svccount = 0; + ml->cml_members[x].cn_svcexcl = 0; } pthread_rwlock_rdlock(&resource_lock); @@ -102,11 +107,11 @@ if (get_rg_state(rgname, &st) < 0) { clulog(LOG_ERR, "#34: Cannot get status " "for service %s\n", rgname); - rg_unlock(rgname, lockp); + rg_unlock(&lockp); continue; } - rg_unlock(rgname, lockp); + rg_unlock(&lockp); if (st.rs_state != RG_STATE_STARTED && st.rs_state != RG_STATE_STARTING) @@ -116,18 +121,17 @@ if (!mp) continue; - ++mp->cm_svccount; + ++mp->cn_svccount; val = res_attr_value(res, "exclusive"); if (val && ((!strcmp(val, "yes") || (atoi(val)>0))) ) { - ++mp->cm_svcexcl; + ++mp->cn_svcexcl; } } while (!list_done(&_resources, res)); pthread_rwlock_unlock(&resource_lock); -#endif return 0; } @@ -189,13 +193,13 @@ if (exclusive) { - if (0) {//(allowed->cml_members[x].cm_svccount > 0) { + if (allowed->cml_members[x].cn_svccount > 0) { /* Definitely not this guy */ continue; } else { score += 2; } - } else if (0) { //(allowed->cml_members[x].cm_svcexcl) { + } else if (allowed->cml_members[x].cn_svcexcl) { /* This guy has an exclusive resource group. Can't relocate / failover to him. 
*/ continue; @@ -212,6 +216,42 @@ } +int +check_depend(resource_t *res) +{ + char *val; + rg_state_t rs; + + val = res_attr_value(res, "depend"); + if (!val) + /* No dependency */ + return -1; + + if (get_rg_state_local(val, &rs) == 0) + return (rs.rs_state == RG_STATE_STARTED); + + return 1; +} + + +int +check_depend_safe(char *rg_name) +{ + resource_t *res; + int ret; + + pthread_rwlock_rdlock(&resource_lock); + res = find_root_by_ref(&_resources, rg_name); + if (!res) + return -1; + + ret = check_depend(res); + pthread_rwlock_unlock(&resource_lock); + + return ret; +} + + /** Start or failback a resource group: if it's not running, start it. If it is running and we're a better member to run it, then ask for @@ -224,7 +264,7 @@ char *val; cman_node_t *mp; int autostart, exclusive; - void *lockp; + struct dlm_lksb lockp; mp = memb_id_to_p(membership, my_id()); assert(mp); @@ -267,7 +307,7 @@ if (get_rg_state(svcName, svcStatus) != 0) { clulog(LOG_ERR, "#34: Cannot get status " "for service %s\n", svcName); - rg_unlock(svcName, lockp); + rg_unlock(&lockp); return; } @@ -277,17 +317,25 @@ set_rg_state(svcName, svcStatus); } - rg_unlock(svcName, lockp); + rg_unlock(&lockp); return; } } + /* See if service this one depends on is running. If not, + don't start it */ + if (check_depend(node->rn_resource) == 0) { + clulog(LOG_DEBUG, + "Skipping RG %s: Dependency missing\n", svcName); + return; + } + val = res_attr_value(node->rn_resource, "exclusive"); exclusive = val && ((!strcmp(val, "yes") || (atoi(val)>0))); - if (0) { //(exclusive && mp->cm_svccount) { - clulog(LOG_INFO, + if (exclusive && mp->cn_svccount) { + clulog(LOG_DEBUG, "Skipping RG %s: Exclusive and I am running services\n", svcName); return; @@ -297,8 +345,8 @@ Don't start other services if I'm running an exclusive service. 
*/ - if (0) { //(mp->cm_svcexcl) { - clulog(LOG_INFO, + if (mp->cn_svcexcl) { + clulog(LOG_DEBUG, "Skipping RG %s: I am running an exclusive service\n", svcName); return; @@ -363,8 +411,8 @@ int eval_groups(int local, uint64_t nodeid, int nodeStatus) { - void *lockp = NULL; - char *svcName, *nodeName; + struct dlm_lksb lockp; + char svcName[64], *nodeName; resource_node_t *node; rg_state_t svcStatus; cluster_member_list_t *membership; @@ -385,10 +433,7 @@ list_do(&_tree, node) { - if (node->rn_resource->r_rule->rr_root == 0) - continue; - - svcName = node->rn_resource->r_attrs->ra_value; + res_build_name(svcName, sizeof(svcName), node->rn_resource); /* * Lock the service information and get the current service @@ -407,11 +452,11 @@ clulog(LOG_ERR, "#34: Cannot get status for service %s\n", svcName); - rg_unlock(svcName, lockp); + rg_unlock(&lockp); continue; } - rg_unlock(svcName, lockp); + rg_unlock(&lockp); if (svcStatus.rs_owner == 0) nodeName = "none"; @@ -461,7 +506,7 @@ pthread_rwlock_unlock(&resource_lock); free_member_list(membership); - clulog(LOG_INFO, "Event (%d:%d:%d) Processed\n", local, + clulog(LOG_DEBUG, "Event (%d:%d:%d) Processed\n", local, (int)nodeid, nodeStatus); return 0; @@ -469,6 +514,105 @@ /** + * Called to decide what services to start locally after a service event. 
+ * + * @see eval_groups + */ +int +group_event(char *rg_name, uint32_t state, int owner) +{ + char svcName[64], *nodeName; + resource_node_t *node; + rg_state_t svcStatus; + cluster_member_list_t *membership; + int depend; + + if (rg_locked()) { + clulog(LOG_NOTICE, + "Resource groups locked; not evaluating\n"); + return -EAGAIN; + } + + membership = member_list(); + if (!membership) + return -1; + + pthread_rwlock_rdlock(&resource_lock); + + /* Requires read lock */ + count_resource_groups(membership); + + list_do(&_tree, node) { + + res_build_name(svcName, sizeof(svcName), node->rn_resource); + + if (get_rg_state_local(svcName, &svcStatus) != 0) + continue; + + if (svcStatus.rs_owner == 0) + nodeName = "none"; + else + nodeName = memb_id_to_name(membership, + svcStatus.rs_owner); + + /* Disabled/failed/in recovery? Do nothing */ + if ((svcStatus.rs_state == RG_STATE_DISABLED) || + (svcStatus.rs_state == RG_STATE_FAILED) || + (svcStatus.rs_state == RG_STATE_RECOVER)) { + continue; + } + + depend = check_depend(node->rn_resource); + + /* Skip if we have no dependency */ + if (depend == -1) + continue; + + /* + If we have: + (a) a met dependency + (b) we're in the STOPPED state, and + (c) our new service event is a started service + + Then see if we should start this other service as well. + */ + if (depend == 1 && + svcStatus.rs_state == RG_STATE_STOPPED && + state == RG_STATE_STARTED) { + + clulog(LOG_DEBUG, "Evaluating RG %s, state %s, owner " + "%s\n", svcName, + rg_state_str(svcStatus.rs_state), + nodeName); + consider_start(node, svcName, &svcStatus, membership); + continue; + } + + /* + If we lost a dependency for this service and it's running + locally, stop it. 
+ */ + if (depend == 0 && + svcStatus.rs_state == RG_STATE_STARTED && + svcStatus.rs_owner == my_id()) { + + clulog(LOG_WARNING, "Stopping service %s: Dependency missing\n", + svcName); + rt_enqueue_request(svcName, RG_STOP, NULL, 0, my_id(), + 0, 0); + } + + } while (!list_done(&_tree, node)); + + pthread_rwlock_unlock(&resource_lock); + free_member_list(membership); + + return 0; +} + + + +/** Perform an operation on a resource group. That is, walk down the tree for that resource group, performing the given operation on all children in the necessary order. @@ -576,12 +720,11 @@ @param rgname Resource group name whose state we want to send. @see send_rg_states */ -int get_rg_state_local(char *, rg_state_t *); void send_rg_state(msgctx_t *ctx, char *rgname, int fast) { rg_state_msg_t msg, *msgp = &msg; - void *lockp; + struct dlm_lksb lockp; msgp->rsm_hdr.gh_magic = GENERIC_HDR_MAGIC; msgp->rsm_hdr.gh_length = sizeof(msg); @@ -594,10 +737,10 @@ if (rg_lock(rgname, &lockp) < 0) return; if (get_rg_state(rgname, &msgp->rsm_state) < 0) { - rg_unlock(rgname, lockp); + rg_unlock(&lockp); return; } - rg_unlock(rgname, lockp); + rg_unlock(&lockp); } swab_rg_state_msg_t(msgp); @@ -616,19 +759,19 @@ { msgctx_t *ctx = ((struct status_arg *)arg)->ctx; int fast = ((struct status_arg *)arg)->fast; - resource_t *res; + resource_node_t *node; generic_msg_hdr hdr; + char rg[64]; free(arg); pthread_rwlock_rdlock(&resource_lock); - list_do(&_resources, res) { - if (res->r_rule->rr_root == 0) - continue; + list_do(&_tree, node) { - send_rg_state(ctx, res->r_attrs[0].ra_value, fast); - } while (!list_done(&_resources, res)); + res_build_name(rg, sizeof(rg), node->rn_resource); + send_rg_state(ctx, rg, fast); + } while (!list_done(&_tree, node)); pthread_rwlock_unlock(&resource_lock); @@ -637,8 +780,8 @@ /* XXX wait for client to tell us it's done; I don't know why this is needed when doing fast I/O, but it is. 
*/ msg_receive(ctx, &hdr, sizeof(hdr), 10); - msg_close(ctx); + msg_free_ctx(ctx); return NULL; } @@ -681,21 +824,20 @@ int svc_exists(char *svcname) { - resource_t *res; + resource_node_t *node; int ret = 0; + char rg[64]; pthread_rwlock_rdlock(&resource_lock); - list_do(&_resources, res) { - if (res->r_rule->rr_root == 0) - continue; + list_do(&_tree, node) { + res_build_name(rg, sizeof(rg), node->rn_resource); - if (strcmp(res->r_attrs[0].ra_value, - svcname) == 0) { + if (strcmp(rg, svcname) == 0) { ret = 1; break; } - } while (!list_done(&_resources, res)); + } while (!list_done(&_tree, node)); pthread_rwlock_unlock(&resource_lock); @@ -707,25 +849,22 @@ rg_doall(int request, int block, char *debugfmt) { resource_node_t *curr; - char *name; rg_state_t svcblk; + char rg[64]; pthread_rwlock_rdlock(&resource_lock); list_do(&_tree, curr) { - if (curr->rn_resource->r_rule->rr_root == 0) - continue; - /* Group name */ - name = curr->rn_resource->r_attrs->ra_value; + res_build_name(rg, sizeof(rg), curr->rn_resource); if (debugfmt) - clulog(LOG_DEBUG, debugfmt, name); + clulog(LOG_DEBUG, debugfmt, rg); /* Optimization: Don't bother even queueing the request during the exit case if we don't own it */ if (request == RG_STOP_EXITING) { - if (get_rg_state_local(name, &svcblk) < 0) + if (get_rg_state_local(rg, &svcblk) < 0) continue; /* Always run stop if we're the owner, regardless @@ -734,7 +873,7 @@ continue; } - rt_enqueue_request(name, request, NULL, 0, + rt_enqueue_request(rg, request, NULL, 0, 0, 0, 0); } while (!list_done(&_tree, curr)); @@ -755,21 +894,17 @@ q_status_checks(void *arg) { resource_node_t *curr; - char *name; rg_state_t svcblk; - void *lockp; + char rg[64]; pthread_rwlock_rdlock(&resource_lock); list_do(&_tree, curr) { - if (curr->rn_resource->r_rule->rr_root == 0) - continue; - /* Group name */ - name = curr->rn_resource->r_attrs->ra_value; + res_build_name(rg, sizeof(rg), curr->rn_resource); /* Local check - no one will make us take a service */ - 
if (get_rg_state_local(name, &svcblk) < 0) { + if (get_rg_state_local(rg, &svcblk) < 0) { continue; } @@ -777,7 +912,7 @@ svcblk.rs_state != RG_STATE_STARTED) continue; - rt_enqueue_request(name, RG_STATUS, + rt_enqueue_request(rg, RG_STATUS, NULL, 0, 0, 0, 0); } while (!list_done(&_tree, curr)); @@ -811,24 +946,20 @@ do_condstops(void) { resource_node_t *curr; - char *name; rg_state_t svcblk; int need_kill; - void *lockp; + char rg[64]; clulog(LOG_INFO, "Stopping changed resources.\n"); pthread_rwlock_rdlock(&resource_lock); list_do(&_tree, curr) { - if (curr->rn_resource->r_rule->rr_root == 0) - continue; - /* Group name */ - name = curr->rn_resource->r_attrs->ra_value; + res_build_name(rg, sizeof(rg), curr->rn_resource); /* If we're not running it, no need to CONDSTOP */ - if (get_rg_state_local(name, &svcblk) < 0) { + if (get_rg_state_local(rg, &svcblk) < 0) { continue; } @@ -839,10 +970,10 @@ need_kill = 0; if (curr->rn_resource->r_flags & RF_NEEDSTOP) { need_kill = 1; - clulog(LOG_DEBUG, "Removing %s\n", name); + clulog(LOG_DEBUG, "Removing %s\n", rg); } - rt_enqueue_request(name, need_kill ? RG_DISABLE : RG_CONDSTOP, + rt_enqueue_request(rg, need_kill ? RG_DISABLE : RG_CONDSTOP, NULL, 0, 0, 0, 0); } while (!list_done(&_tree, curr)); @@ -859,10 +990,10 @@ do_condstarts(void) { resource_node_t *curr; - char *name, *val; + char rg[64], *val; rg_state_t svcblk; int need_init, new_groups = 0, autostart; - void *lockp; + struct dlm_lksb lockp; clulog(LOG_INFO, "Starting changed resources.\n"); @@ -870,31 +1001,26 @@ pthread_rwlock_rdlock(&resource_lock); list_do(&_tree, curr) { - if (curr->rn_resource->r_rule->rr_root == 0) - continue; - /* Group name */ - name = curr->rn_resource->r_attrs->ra_value; + res_build_name(rg, sizeof(rg), curr->rn_resource); /* New RG. We'll need to initialize it. 
*/ need_init = 0; if (curr->rn_resource->r_flags & RF_NEEDSTART) need_init = 1; - if (get_rg_state_local(name, &svcblk) < 0) { + if (get_rg_state_local(rg, &svcblk) < 0) continue; - } - if (!need_init && svcblk.rs_owner != my_id()) { + if (!need_init && svcblk.rs_owner != my_id()) continue; - } if (need_init) { ++new_groups; - clulog(LOG_DEBUG, "Initializing %s\n", name); + clulog(LOG_DEBUG, "Initializing %s\n", rg); } - rt_enqueue_request(name, need_init ? RG_INIT : RG_CONDSTART, + rt_enqueue_request(rg, need_init ? RG_INIT : RG_CONDSTART, NULL, 0, 0, 0, 0); } while (!list_done(&_tree, curr)); @@ -909,21 +1035,18 @@ pthread_rwlock_rdlock(&resource_lock); list_do(&_tree, curr) { - if (curr->rn_resource->r_rule->rr_root == 0) - continue; - /* Group name */ - name = curr->rn_resource->r_attrs->ra_value; + res_build_name(rg, sizeof(rg), curr->rn_resource); /* New RG. We'll need to initialize it. */ if (!(curr->rn_resource->r_flags & RF_NEEDSTART)) continue; - if (rg_lock(name, &lockp) != 0) + if (rg_lock(rg, &lockp) != 0) continue; - if (get_rg_state(name, &svcblk) < 0) { - rg_unlock(name, lockp); + if (get_rg_state(rg, &svcblk) < 0) { + rg_unlock(&lockp); continue; } @@ -933,7 +1056,7 @@ a truly new service, it will be in the UNINITIALIZED state, which will be caught by eval_groups. 
*/ if (svcblk.rs_state != RG_STATE_DISABLED) { - rg_unlock(name, lockp); + rg_unlock(&lockp); continue; } @@ -946,9 +1069,9 @@ else svcblk.rs_state = RG_STATE_DISABLED; - set_rg_state(name, &svcblk); + set_rg_state(rg, &svcblk); - rg_unlock(name, lockp); + rg_unlock(&lockp); } while (!list_done(&_tree, curr)); pthread_rwlock_unlock(&resource_lock); --- cluster/rgmanager/src/daemons/main.c 2006/06/02 17:37:10 1.25 +++ cluster/rgmanager/src/daemons/main.c 2006/07/11 23:52:41 1.26 @@ -31,21 +31,23 @@ #include #include #include +#include #include #include +#include #define L_SYS (1<<1) #define L_USER (1<<0) -int configure_logging(int ccsfd); +int configure_logging(int ccsfd, int debug); +void node_event(int, uint64_t, int); void node_event_q(int, uint64_t, int); int daemon_init(char *); int init_resource_groups(int); void kill_resource_groups(void); -void set_my_id(uint64_t); +void set_my_id(int); int eval_groups(int, uint64_t, int); -void graceful_exit(int); void flag_shutdown(int sig); void hard_exit(void); int send_rg_states(msgctx_t *, int); @@ -58,6 +60,7 @@ static int signalled = 0; uint64_t next_node_id(cluster_member_list_t *membership, uint64_t me); +int rg_event_q(char *svcName, uint32_t state, int owner); void @@ -74,49 +77,9 @@ int -send_exit_msg(uint64_t nodeid) +send_exit_msg(msgctx_t *ctx) { - msgctx_t ctx; - - if (msg_open(nodeid, RG_PORT, &ctx, 5) < 0) { - printf("Failed to send exit message\n"); - return -1; - } - msg_send_simple(&ctx, RG_EXITING, 0, 0); - msg_close(&ctx); - - return 0; -} - - -/** - Notify other resource group managers that we're leaving, since - cluster membership is not necessarily tied to the members running - the rgmgr. 
- */ -int -notify_exiting(void) -{ - int x; - uint64_t partner; - cluster_member_list_t *membership; - - membership = member_list(); - if (!membership) - return 0; - - for (x = 0; x < membership->cml_count; x++) { - - partner = membership->cml_members[x].cn_nodeid; - - if (partner == my_id() || - !membership->cml_members[x].cn_member) - continue; - - send_exit_msg(partner); - } - - free_member_list(membership); + msg_send_simple(ctx, RG_EXITING, my_id(), 0); return 0; } @@ -130,60 +93,6 @@ /** - Called to handle the transition of a cluster member from up->down or - down->up. This handles initializing services (in the local node-up case), - exiting due to loss of quorum (local node-down), and service fail-over - (remote node down). - - @param nodeID ID of the member which has come up/gone down. - @param nodeStatus New state of the member in question. - @see eval_groups - */ -void -node_event(int local, uint64_t nodeID, int nodeStatus) -{ - if (!running) - return; - - if (local) { - - /* Local Node Event */ - if (nodeStatus == 0) - hard_exit(); - - if (!rg_initialized()) { - if (init_resource_groups(0) != 0) { - clulog(LOG_ERR, - "#36: Cannot initialize services\n"); - hard_exit(); - } - } - - if (shutdown_pending) { - clulog(LOG_NOTICE, "Processing delayed exit signal\n"); - graceful_exit(SIGINT); - } - setup_signal(SIGINT, graceful_exit); - setup_signal(SIGTERM, graceful_exit); - setup_signal(SIGHUP, flag_reconfigure); - - eval_groups(1, nodeID, 1); - return; - } - - /* - * Nothing to do for events from other nodes if we are not ready. - */ - if (!rg_initialized()) { - clulog(LOG_DEBUG, "Services not initialized.\n"); - return; - } - - eval_groups(0, nodeID, nodeStatus); -} - - -/** This updates our local membership view and handles whether or not we should exit, as well as determines node transitions (thus, calling node_event()). 
@@ -192,22 +101,43 @@ @return 0 */ int -membership_update(chandle_t *clu) +membership_update(void) { - cluster_member_list_t *new_ml, *node_delta, *old_membership; + cluster_member_list_t *new_ml = NULL, *node_delta = NULL, + *old_membership = NULL; int x; int me = 0; + cman_handle_t h; + int quorate; - if (!rg_quorate()) - return 0; + h = cman_init(NULL); + quorate = cman_is_quorate(h); + if (!quorate) { + cman_finish(h); + + if (!rg_quorate()) + return -1; + + clulog(LOG_EMERG, "#1: Quorum Dissolved\n"); + rg_set_inquorate(); + member_list_update(NULL);/* Clear member list */ + rg_lockall(L_SYS); + rg_doall(RG_INIT, 1, "Emergency stop of %s"); + rg_set_uninitialized(); + return -1; + } else if (!rg_quorate()) { - clulog(LOG_INFO, "Magma Event: Membership Change\n"); + rg_set_quorate(); + rg_unlockall(L_SYS); + rg_unlockall(L_USER); + clulog(LOG_NOTICE, "Quorum Formed\n"); + } old_membership = member_list(); - new_ml = get_member_list(clu->c_cluster); + new_ml = get_member_list(h); member_list_update(new_ml); + cman_finish(h); - clulog(LOG_DEBUG, "I am node #%lld\n", my_id()); /* * Handle nodes lost. Do our local node event first. 
@@ -218,7 +148,8 @@ if (me) { /* Should not happen */ clulog(LOG_INFO, "State change: LOCAL OFFLINE\n"); - free_member_list(node_delta); + if (node_delta) + free_member_list(node_delta); node_event(1, my_id(), 0); /* NOTREACHED */ } @@ -238,7 +169,6 @@ } } - /* Free nodes */ free_member_list(node_delta); /* @@ -263,8 +193,7 @@ clulog(LOG_INFO, "State change: %s UP\n", node_delta->cml_members[x].cn_name); - node_event_q(0, node_delta->cml_members[x].cn_nodeid, - 1); + node_event_q(0, node_delta->cml_members[x].cn_nodeid, 1); } free_member_list(node_delta); @@ -309,10 +238,61 @@ } -int +#if 0 +struct lr_arg { + msgctx_t *ctx; + int req; +}; + + +void * +lockreq_th(void *a) +{ + int ret; + char state; + struct lr_arg *lr_arg = (struct lr_arg *)a; + cluster_member_list_t *m = member_list(); + + state = (lr_arg->req==RG_LOCK)?1:0; + ret = vf_write(m, VFF_IGN_CONN_ERRORS, "rg_lockdown", &state, 1); + free_member_list(m); + + if (ret == 0) { + msg_send_simple(lr_arg->ctx, RG_SUCCESS, 0, 0); + } else { + msg_send_simple(lr_arg->ctx, RG_FAIL, 0, 0); + } + + msg_close(lr_arg->ctx); + msg_free_ctx(lr_arg->ctx); + free(lr_arg); + return NULL; +} + + +void +do_lockreq(msgctx_t *ctx, int req) +{ + pthread_t th; + struct lr_arg *arg; + + arg = malloc(sizeof(*arg)); + if (!arg) { + msg_send_simple(ctx, RG_FAIL, 0, 0); + msg_close(ctx); + msg_free_ctx(ctx); + return 0; + } + + arg->ctx = ctx; + arg->req = req; + + pthread_create(&th, NULL, lockreq_th, (void *)arg); +} +#else +void do_lockreq(msgctx_t *ctx, int req) { -#if 0 int ret; char state; cluster_member_list_t *m = member_list(); @@ -326,9 +306,9 @@ } else { msg_send_simple(ctx, RG_FAIL, 0, 0); } -#endif - return 0; } +#endif + /** @@ -341,20 +321,35 @@ * @see quorum_msg */ int -dispatch_msg(msgctx_t *ctx, uint64_t nodeid) +dispatch_msg(msgctx_t *ctx, int nodeid, int need_close) { - int ret = -1; + int ret = 0, sz = -1; char msgbuf[4096]; - generic_msg_hdr *msg_hdr = (generic_msg_hdr *)msg_hdr; + generic_msg_hdr *msg_hdr = 
(generic_msg_hdr *)msgbuf; SmMessageSt *msg_sm = (SmMessageSt *)msgbuf; + memset(msgbuf, 0, sizeof(msgbuf)); + /* Peek-a-boo */ - ret = msg_receive(ctx, msg_hdr, sizeof(msgbuf), 10); - if (ret < sizeof (generic_msg_hdr)) { - clulog(LOG_ERR, "#37: Error receiving message header\n"); + sz = msg_receive(ctx, msg_hdr, sizeof(msgbuf), 10); + if (sz < sizeof (generic_msg_hdr)) { + clulog(LOG_ERR, "#37: Error receiving message header (%d)\n", sz); goto out; } + if (sz < 0) + return -1; + + if (sz > sizeof(msgbuf)) { + raise(SIGSTOP); + } + + /* + printf("RECEIVED %d %d %d %p\n", sz, (int)sizeof(msgbuf), + (int)sizeof(generic_msg_hdr), ctx); + msg_print(ctx); + */ + /* Decode the header */ swab_generic_msg_hdr(msg_hdr); if ((msg_hdr->gh_magic != GENERIC_HDR_MAGIC)) { @@ -364,28 +359,23 @@ goto out; } - if (msg_hdr->gh_length != ret) { + if (msg_hdr->gh_length != sz) { clulog(LOG_ERR, "#XX: Read size mismatch: %d %d\n", ret, msg_hdr->gh_length); goto out; } - ret = 0; switch (msg_hdr->gh_command) { case RG_STATUS: - clulog(LOG_DEBUG, "Sending service states to ctx%p\n",ctx); + clulog(LOG_DEBUG, "Sending service states to CTX%p\n",ctx); send_rg_states(ctx, msg_hdr->gh_arg1); + need_close = 0; break; case RG_LOCK: - if (rg_quorate()) - do_lockreq(ctx, RG_LOCK); - msg_close(ctx); - break; - case RG_UNLOCK: if (rg_quorate()) - do_lockreq(ctx, RG_UNLOCK); + do_lockreq(ctx, msg_hdr->gh_command); break; case RG_QUERY_LOCK: @@ -395,19 +385,18 @@ } break; - case RG_ACTION_REQUEST: - if (ret != sizeof(msg_sm)) { + if (sz < sizeof(msg_sm)) { clulog(LOG_ERR, - "#39: Error receiving entire request\n"); + "#39: Error receiving entire request (%d/%d)\n", + ret, (int)sizeof(msg_sm)); ret = -1; goto out; } /* XXX perf: reencode header */ swab_generic_msg_hdr(msg_hdr); - /* Decode SmMessageSt message */ swab_SmMessageSt(msg_sm); @@ -430,21 +419,47 @@ ctx, 0, msg_sm->sm_data.d_svcOwner, 0, 0); return 0; + case RG_EVENT: + /* Service event. 
Run a dependency check */ + if (sz < sizeof(msg_sm)) { + clulog(LOG_ERR, + "#39: Error receiving entire request (%d/%d)\n", + ret, (int)sizeof(msg_sm)); + ret = -1; + goto out; + } + + /* XXX perf: reencode header */ + swab_generic_msg_hdr(msg_hdr); + /* Decode SmMessageSt message */ + swab_SmMessageSt(msg_sm); + + /* Send to our rg event handler */ + rg_event_q(msg_sm->sm_data.d_svcName, + msg_sm->sm_data.d_action, + msg_sm->sm_data.d_svcOwner); + break; + case RG_EXITING: clulog(LOG_NOTICE, "Member %d is now offline\n", (int)nodeid); - node_event(0, nodeid, 0); break; + case VF_MESSAGE: + /* Ignore; our VF thread handles these */ + break; + default: clulog(LOG_DEBUG, "unhandled message request %d\n", msg_hdr->gh_command); break; } - out: - msg_close(ctx); + if (need_close) { + msg_close(ctx); + msg_free_ctx(ctx); + } return ret; } @@ -455,47 +470,65 @@ @return Event */ int -handle_cluster_event(chandle_t *clu, msgctx_t *ctx) +handle_cluster_event(msgctx_t *ctx) { int ret; + msgctx_t *newctx; + int nodeid; ret = msg_wait(ctx, 0); + switch(ret) { case M_PORTOPENED: + msg_receive(ctx, NULL, 0, 0); + clulog(LOG_DEBUG, "Event: Port Opened\n"); + membership_update(); + break; case M_PORTCLOSED: /* Might want to handle powerclosed like membership change */ + msg_receive(ctx, NULL, 0, 0); + clulog(LOG_DEBUG, "Event: Port Closed\n"); + membership_update(); + break; case M_NONE: + msg_receive(ctx, NULL, 0, 0); clulog(LOG_DEBUG, "NULL cluster message\n"); break; case M_OPEN: + newctx = msg_new_ctx(); + if (msg_accept(ctx, newctx) >= 0 && + rg_quorate()) { + /* Handle message */ + /* When request completes, the fd is closed */ + nodeid = msg_get_nodeid(newctx); + dispatch_msg(newctx, nodeid, 1); + break; + } + break; + + case M_DATA: + dispatch_msg(ctx, nodeid, 0); + break; + case M_OPEN_ACK: case M_CLOSE: clulog(LOG_DEBUG, "I should NOT get here: %d\n", ret); break; case M_STATECHANGE: + msg_receive(ctx, NULL, 0, 0); clulog(LOG_DEBUG, "Membership Change Event\n"); if 
(rg_quorate() && running) { rg_unlockall(L_SYS); - membership_update(clu); + membership_update(); } break; - rg_set_quorate(); - rg_unlockall(L_SYS); - rg_unlockall(L_USER); - clulog(LOG_NOTICE, "Quorum Achieved\n"); - membership_update(clu); + case 998: break; case 999: - clulog(LOG_EMERG, "#1: Quorum Dissolved\n"); - rg_set_inquorate(); - member_list_update(NULL); /* Clear member list */ - rg_lockall(L_SYS); - rg_doall(RG_INIT, 1, "Emergency stop of %s"); - rg_set_uninitialized(); - break; case M_TRY_SHUTDOWN: + msg_receive(ctx, NULL, 0, 0); clulog(LOG_WARNING, "#67: Shutting down uncleanly\n"); rg_set_inquorate(); rg_doall(RG_INIT, 1, "Emergency stop of %s"); @@ -512,15 +545,13 @@ void dump_threads(void); int -event_loop(chandle_t *clu) +event_loop(msgctx_t *localctx, msgctx_t *clusterctx) { int n, max, ret; fd_set rfds; - msgctx_t newctx; + msgctx_t *newctx; struct timeval tv; int nodeid; - msgctx_t *localctx = clu->local_ctx; - msgctx_t *clusterctx = clu->cluster_ctx; tv.tv_sec = 10; tv.tv_usec = 0; @@ -545,7 +576,8 @@ break; if (msg_fd_isset(clusterctx, &rfds)) { - handle_cluster_event(clu, clusterctx); + msg_fd_clr(clusterctx, &rfds); + handle_cluster_event(clusterctx); continue; } @@ -553,7 +585,9 @@ continue; } - ret = msg_accept(localctx, &newctx); + msg_fd_clr(localctx, &rfds); + newctx = msg_new_ctx(); + ret = msg_accept(localctx, newctx); if (ret == -1) continue; @@ -561,25 +595,27 @@ if (rg_quorate()) { /* Handle message */ /* When request completes, the fd is closed */ - nodeid = msg_get_nodeid(&newctx); - dispatch_msg(&newctx, nodeid); + nodeid = msg_get_nodeid(newctx); + dispatch_msg(newctx, nodeid, 1); continue; } if (!rg_initialized()) { - msg_close(&newctx); + msg_close(newctx); + msg_free_ctx(newctx); continue; } if (!rg_quorate()) { printf("Dropping connect: NO QUORUM\n"); - msg_close(&newctx); + msg_close(newctx); + msg_free_ctx(newctx); } } if (need_reconfigure || check_config_update()) { need_reconfigure = 0; - configure_logging(-1); + 
configure_logging(-1, 0); init_resource_groups(1); return 0; } @@ -606,13 +642,6 @@ void -graceful_exit(int sig) -{ - running = 0; -} - - -void hard_exit(void) { rg_lockall(L_SYS); @@ -625,12 +654,9 @@ void cleanup(msgctx_t *clusterctx) { - rg_lockall(L_SYS); - rg_doall(RG_STOP_EXITING, 1, NULL); - //vf_shutdown(); kill_resource_groups(); member_list_update(NULL); - notify_exiting(); + send_exit_msg(clusterctx); } @@ -649,7 +675,7 @@ * Configure logging based on data in cluster.conf */ int -configure_logging(int ccsfd) +configure_logging(int ccsfd, int dbg) { char *v; char internal = 0; @@ -667,7 +693,8 @@ } if (ccs_get(ccsfd, "/cluster/rm/@log_level", &v) == 0) { - clu_set_loglevel(atoi(v)); + if (!dbg) + clu_set_loglevel(atoi(v)); free(v); } @@ -679,20 +706,21 @@ void -clu_initialize(chandle_t *clu) +clu_initialize(cman_handle_t **ch) { - cman_node_t me; + if (!ch) + exit(1); - clu->c_cluster = cman_init(NULL); - if (!clu->c_cluster) { + *ch = cman_init(NULL); + if (!(*ch)) { clulog(LOG_NOTICE, "Waiting for CMAN to start\n"); - while (!(clu->c_cluster = cman_init(NULL))) { + while (!(*ch = cman_init(NULL))) { sleep(1); } } - if (!cman_is_quorate(clu->c_cluster)) { + if (!cman_is_quorate(*ch)) { /* There are two ways to do this; this happens to be the simpler of the two. 
The other method is to join with a NULL group @@ -701,14 +729,11 @@ */ clulog(LOG_NOTICE, "Waiting for quorum to form\n"); - while (cman_is_quorate(clu->c_cluster) == 0) { + while (cman_is_quorate(*ch) == 0) { sleep(1); } clulog(LOG_NOTICE, "Quorum formed, starting\n"); } - - cman_get_node(clu->c_cluster, CMAN_NODEID_US, &me); - clu->c_nodeid = me.cn_nodeid; } @@ -722,15 +747,28 @@ } +void * +shutdown_thread(void *arg) +{ + rg_doall(RG_STOP_EXITING, 1, NULL); + running = 0; + + return 0; +} + + int main(int argc, char **argv) { int rv; char foreground = 0; - int quorate; - int listen_fds[2], listeners; - int myNodeID; - chandle_t clu; + cman_node_t me; + msgctx_t *cluster_ctx; + msgctx_t *local_ctx; + int port = RG_PORT; + pthread_t th; + + cman_handle_t *clu = NULL; while ((rv = getopt(argc, argv, "fd")) != EOF) { switch (rv) { @@ -759,13 +797,25 @@ } clu_initialize(&clu); - set_my_id(clu.c_nodeid); + if (cman_init_subsys(clu) < 0) { + perror("cman_init_subsys"); + return -1; + } + + if (clu_lock_init("rgmanager") != 0) { + printf("Locks not working!\n"); + return -1; + } + + memset(&me, 0, sizeof(me)); + cman_get_node(clu, CMAN_NODEID_US, &me); + set_my_id(me.cn_nodeid); /* We know we're quorate. At this point, we need to read the resource group trees from ccsd. 
*/ - configure_logging(-1); + configure_logging(-1, debug); clulog(LOG_NOTICE, "Resource Group Manager Starting\n"); if (init_resource_groups(0) != 0) { @@ -778,46 +828,60 @@ setup_signal(SIGUSR1, statedump); unblock_signal(SIGCHLD); setup_signal(SIGPIPE, SIG_IGN); + if (debug) { setup_signal(SIGSEGV, segfault); } else { unblock_signal(SIGSEGV); } - if (msg_init(&clu) < 0) { - clulog(LOG_CRIT, "#10: Couldn't set up message system\n"); + if (msg_listen(MSG_SOCKET, RGMGR_SOCK, me.cn_nodeid, &local_ctx) < 0) { + clulog(LOG_CRIT, + "#10: Couldn't set up cluster message system: %s\n", + strerror(errno)); + return -1; + } + + if (msg_listen(MSG_CLUSTER, &port , me.cn_nodeid, &cluster_ctx) < 0) { + clulog(LOG_CRIT, + "#10b: Couldn't set up cluster message system: %s\n", + strerror(errno)); return -1; } rg_set_quorate(); - set_my_id(clu.c_nodeid); + + /* + msg_print(local_ctx); + msg_print(cluster_ctx); + */ /* Initialize the VF stuff. */ -#if 0 - if (vf_init(clu.c_nodeid, RG_VF_PORT, NULL, NULL) != 0) { + if (vf_init(me.cn_nodeid, RG_PORT, NULL, NULL) != 0) { clulog(LOG_CRIT, "#11: Couldn't set up VF listen socket\n"); return -1; } vf_key_init("rg_lockdown", 10, NULL, lock_commit_cb); -#endif - - /* - Get an initial membership view. 
- */ - membership_update(&clu); /* Do everything useful */ - while (running) - event_loop(&clu); + while (running) { + event_loop(local_ctx, cluster_ctx); + + if (shutdown_pending == 1) { + ++shutdown_pending; + clulog(LOG_NOTICE, "Shutting down\n"); + pthread_create(&th, NULL, shutdown_thread, NULL); + } + } - clulog(LOG_NOTICE, "Shutting down\n"); - cleanup(&clu); + cleanup(cluster_ctx); clulog(LOG_NOTICE, "Shutdown complete, exiting\n"); + cman_finish(clu); /*malloc_dump_table(); */ /* Only works if alloc.c us used */ /*malloc_stats();*/ --- cluster/rgmanager/src/daemons/nodeevent.c 2006/06/02 17:37:10 1.2 +++ cluster/rgmanager/src/daemons/nodeevent.c 2006/07/11 23:52:41 1.3 @@ -20,6 +20,9 @@ #include #include #include +#include +#include +#include typedef struct __ne_q { list_head(); @@ -28,8 +31,6 @@ int ne_state; } nevent_t; -int node_event(int, uint64_t, int); - /** * Node event queue. */ @@ -38,11 +39,120 @@ static pthread_t ne_thread = 0; int ne_queue_request(int local, uint64_t nodeid, int state); +void hard_exit(void); +int init_resource_groups(int); +void flag_shutdown(int sig); +void flag_reconfigure(int sig); + +extern int running; +extern int shutdown_pending; + + +/** + Called to handle the transition of a cluster member from up->down or + down->up. This handles initializing services (in the local node-up case), + exiting due to loss of quorum (local node-down), and service fail-over + (remote node down). + + @param nodeID ID of the member which has come up/gone down. + @param nodeStatus New state of the member in question. 
+ @see eval_groups + */ +void +node_event(int local, uint64_t nodeID, int nodeStatus) +{ + if (!running) + return; + + if (local) { + + /* Local Node Event */ + if (nodeStatus == 0) + hard_exit(); + + if (!rg_initialized()) { + if (init_resource_groups(0) != 0) { + clulog(LOG_ERR, + "#36: Cannot initialize services\n"); + hard_exit(); + } + } + + if (shutdown_pending) { + clulog(LOG_NOTICE, "Processing delayed exit signal\n"); + running = 0; + } + setup_signal(SIGINT, flag_shutdown); + setup_signal(SIGTERM, flag_shutdown); + setup_signal(SIGHUP, flag_reconfigure); + + eval_groups(1, nodeID, 1); + return; + } + + /* + * Nothing to do for events from other nodes if we are not ready. + */ + if (!rg_initialized()) { + clulog(LOG_DEBUG, "Services not initialized.\n"); + return; + } + + eval_groups(0, nodeID, nodeStatus); +} + + +int +node_has_fencing(int nodeid) +{ + int ccs_desc; + char *val = NULL; + char buf[1024]; + int ret = 1; + + ccs_desc = ccs_connect(); + if (ccs_desc < 0) { + clulog(LOG_ERR, "Unable to connect to ccsd; cannot handle" + " node event!\n"); + /* Assume node has fencing */ + return 1; + } + + snprintf(buf, sizeof(buf), + "/cluster/clusternodes/clusternode[@nodeid=\"%d\"]" + "/fence/method/device/@name", nodeid); + + if (ccs_get(ccs_desc, buf, &val) != 0) + ret = 0; + if (val) + free(val); + ccs_disconnect(ccs_desc); + return ret; +} + + +int +node_fenced(int nodeid) +{ + cman_handle_t ch; + int fenced = 0; + uint64_t fence_time; + + ch = cman_init(NULL); + if (cman_get_fenceinfo(ch, nodeid, &fence_time, &fenced, NULL) < 0) + fenced = 0; + + cman_finish(ch); + + return fenced; +} + void * node_event_thread(void *arg) { nevent_t *ev; + int notice; while (1) { pthread_mutex_lock(&ne_queue_mutex); @@ -53,6 +163,22 @@ break; /* We're outta here */ pthread_mutex_unlock(&ne_queue_mutex); + if (ev->ne_state == 0 && node_has_fencing(ev->ne_nodeid)) { + notice = 0; + while (!node_fenced(ev->ne_nodeid)) { + if (!notice) { + notice = 1; + clulog(LOG_INFO, 
"Waiting for " + "node #%d to be fenced\n", + ev->ne_nodeid); + } + sleep(2); + } + if (notice) + clulog(LOG_INFO, "Node #%d fenced; " + "continuing\n", ev->ne_nodeid); + } + node_event(ev->ne_local, ev->ne_nodeid, ev->ne_state); free(ev); @@ -60,7 +186,6 @@ /* Mutex held */ ne_thread = 0; - rg_dec_threads(); pthread_mutex_unlock(&ne_queue_mutex); return NULL; } @@ -96,8 +221,6 @@ pthread_create(&ne_thread, &attrs, node_event_thread, NULL); pthread_attr_destroy(&attrs); - - rg_inc_threads(); } pthread_mutex_unlock (&ne_queue_mutex); } --- cluster/rgmanager/src/daemons/reslist.c 2006/06/02 17:37:10 1.13 +++ cluster/rgmanager/src/daemons/reslist.c 2006/07/11 23:52:41 1.14 @@ -33,6 +33,13 @@ char *attr_value(resource_node_t *node, char *attrname); char *rg_attr_value(resource_node_t *node, char *attrname); +void +res_build_name(char *buf, size_t buflen, resource_t *res) +{ + snprintf(buf, buflen, "%s:%s", res->r_rule->rr_type, + res->r_attrs[0].ra_value); +} + /** Find and determine an attribute's value. 
@@ -265,11 +272,23 @@ find_root_by_ref(resource_t **reslist, char *ref) { resource_t *curr; + char ref_buf[128]; + char *type; + char *name; int x; + snprintf(ref_buf, sizeof(ref_buf), "%s", ref); + + type = ref_buf; + if ((name = strchr(ref_buf, ':'))) { + *name = 0; + name++; + } else { + /* Default type */ + type = "service"; + } + list_do(reslist, curr) { - if (curr->r_rule->rr_root == 0) - continue; /* This should be one operation - the primary attr @@ -277,15 +296,18 @@ */ for (x = 0; curr->r_attrs && curr->r_attrs[x].ra_name; x++) { + if (strcmp(type, curr->r_rule->rr_type)) + continue; if (!(curr->r_attrs[x].ra_flags & RA_PRIMARY)) continue; - if (strcmp(ref, curr->r_attrs[x].ra_value)) + if (strcmp(name, curr->r_attrs[x].ra_value)) continue; return curr; } } while (!list_done(reslist, curr)); + return NULL; } @@ -447,8 +469,6 @@ int x; printf("Resource type: %s", res->r_rule->rr_type); - if (res->r_rule->rr_root) - printf(" [ROOT]"); if (res->r_flags & RF_INLINE) printf(" [INLINE]"); if (res->r_flags & RF_NEEDSTART) --- cluster/rgmanager/src/daemons/restree.c 2006/06/02 17:37:10 1.19 +++ cluster/rgmanager/src/daemons/restree.c 2006/07/11 23:52:41 1.20 @@ -1,5 +1,5 @@ /* - Copyright Red Hat, Inc. 2004 + Copyright Red Hat, Inc. 
2004-2006 This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the --- cluster/rgmanager/src/daemons/rg_forward.c 2006/06/02 17:37:10 1.4 +++ cluster/rgmanager/src/daemons/rg_forward.c 2006/07/11 23:52:41 1.5 @@ -47,24 +47,26 @@ { rg_state_t rgs; request_t *req = (request_t *)arg; - void *lockp; + struct dlm_lksb lockp; msgctx_t ctx; SmMessageSt msg; if (rg_lock(req->rr_group, &lockp) != 0) { msg_close(req->rr_resp_ctx); + msg_free_ctx(req->rr_resp_ctx); rq_free(req); pthread_exit(NULL); } if (get_rg_state(req->rr_group, &rgs) != 0) { - rg_unlock(req->rr_group, lockp); + rg_unlock(&lockp); msg_close(req->rr_resp_ctx); + msg_free_ctx(req->rr_resp_ctx); rq_free(req); pthread_exit(NULL); } - rg_unlock(req->rr_group, lockp); + rg_unlock(&lockp); /* Construct message */ build_message(&msg, req->rr_request, req->rr_group, req->rr_target); @@ -74,8 +76,9 @@ rg_req_str(req->rr_request), (int)rgs.rs_owner); */ - if (msg_open(rgs.rs_owner, RG_PORT, &ctx, 10) < 0) { + if (msg_open(MSG_CLUSTER, rgs.rs_owner, RG_PORT, &ctx, 10) < 0) { msg_close(req->rr_resp_ctx); + msg_free_ctx(req->rr_resp_ctx); rq_free(req); pthread_exit(NULL); } @@ -83,6 +86,7 @@ if (msg_send(&ctx, &msg, sizeof(msg)) != sizeof(msg)) { msg_close(&ctx); msg_close(req->rr_resp_ctx); + msg_free_ctx(req->rr_resp_ctx); rq_free(req); pthread_exit(NULL); } @@ -90,6 +94,7 @@ if (msg_receive(&ctx, &msg, sizeof(msg),10) != sizeof(msg)) { msg_close(&ctx); msg_close(req->rr_resp_ctx); + msg_free_ctx(req->rr_resp_ctx); rq_free(req); pthread_exit(NULL); } --- cluster/rgmanager/src/daemons/rg_state.c 2006/06/02 17:37:10 1.16 +++ cluster/rgmanager/src/daemons/rg_state.c 2006/07/11 23:52:41 1.17 @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +42,7 @@ int set_rg_state(char *servicename, rg_state_t *svcblk); int get_rg_state(char *servicename, rg_state_t *svcblk); void get_recovery_policy(char 
*rg_name, char *buf, size_t buflen); +int check_depend_safe(char *servicename); uint64_t @@ -67,11 +69,35 @@ } +void +broadcast_event(char *svcName, uint32_t state) +{ + SmMessageSt msgp; + msgctx_t everyone; + + msgp.sm_hdr.gh_magic = GENERIC_HDR_MAGIC; + msgp.sm_hdr.gh_command = RG_EVENT; + msgp.sm_hdr.gh_length = sizeof(msgp); + msgp.sm_data.d_action = state; + strncpy(msgp.sm_data.d_svcName, svcName, + sizeof(msgp.sm_data.d_svcName)); + msgp.sm_data.d_svcOwner = 0; + msgp.sm_data.d_ret = 0; + + swab_SmMessageSt(&msgp); + + if (msg_open(MSG_CLUSTER, 0, RG_PORT, &everyone, 0) < 0) + return; + + msg_send(&everyone, &msgp, sizeof(msgp)); + msg_close(&everyone); +} + + int svc_report_failure(char *svcName) { -#if 0 - void *lockp = NULL; + struct dlm_lksb lockp; rg_state_t svcStatus; char *nodeName; cluster_member_list_t *membership; @@ -85,10 +111,10 @@ if (get_rg_state(svcName, &svcStatus) != 0) { clulog(LOG_ERR, "#42: Couldn't obtain status for RG %s\n", svcName); - clu_unlock(svcName, lockp); + rg_unlock(&lockp); return -1; } - rg_unlock(svcName, lockp); + rg_unlock(&lockp); membership = member_list(); nodeName = memb_id_to_name(membership, svcStatus.rs_last_owner); @@ -107,104 +133,27 @@ "#4: Administrator intervention required.\n", svcName, nodeName); -#endif return 0; } int -clu_lock_verbose(char *resource, int dflt_flags, void **lockpp) -{ -#if 0 - int ret, timed_out = 0; - struct timeval start, now; - uint64_t nodeid, *p; - int flags; - int block = !(dflt_flags & CLK_NOWAIT); - - /* Holder not supported for this call */ - dflt_flags &= ~CLK_HOLDER; - - flags = dflt_flags; - - if (block) { - gettimeofday(&start, NULL); - start.tv_sec += 30; - } - while (1) { - if (block) { - gettimeofday(&now, NULL); - - if ((now.tv_sec > start.tv_sec) || - ((now.tv_sec == start.tv_sec) && - (now.tv_usec >= start.tv_usec))) { - - gettimeofday(&start, NULL); - start.tv_sec += 30; - - timed_out = 1; - flags |= CLK_HOLDER; - } - } - - *lockpp = NULL; - ret = clu_lock(resource, 
flags | CLK_NOWAIT, lockpp); - - if ((ret != 0) && (errno == EAGAIN) && block) { - if (timed_out) { - p = (uint64_t *)*lockpp; - if (p) { - nodeid = *p; - clulog(LOG_WARNING, "Node ID:%08x%08x" - " stuck with lock %s\n", - (uint32_t)(nodeid>>32&0xffffffff), - (uint32_t)nodeid&0xffffffff, - resource); - free(p); - } else { - clulog(LOG_WARNING, "Starving for lock" - " %s\n", resource); - } - flags = dflt_flags; - timed_out = 0; - } - usleep(random()&32767<<1); - continue; - - } else if (ret == 0) { - /* Success */ - return 0; - } - - break; - } - - return ret; -#endif - return -1; -} - - -int #ifdef DEBUG -_rg_lock(char *name, void **p) +_rg_lock(char *name, struct dlm_lksb *p) #else -rg_lock(char *name, void **p) +rg_lock(char *name, struct dlm_lksb *p) #endif { -#if 0 char res[256]; snprintf(res, sizeof(res), "usrm::rg=\"%s\"", name); - return clu_lock_verbose(res, CLK_EX, p); -#endif - return -1; + return clu_lock(LKM_EXMODE, p, 0, res); } #ifdef DEBUG int -_rg_lock_dbg(char *name, void **p, char *file, int line) +_rg_lock_dbg(char *name, struct dlm_lksb *p, char *file, int line) { dprintf("rg_lock(%s) @ %s:%d\n", name, file, line); return _rg_lock(name, p); @@ -215,27 +164,21 @@ int #ifdef DEBUG -_rg_unlock(char *name, void *p) +_rg_unlock(struct dlm_lksb *p) #else -rg_unlock(char *name, void *p) +rg_unlock(struct dlm_lksb *p) #endif { -#if 0 - char res[256]; - - snprintf(res, sizeof(res), "usrm::rg=\"%s\"", name); - return clu_unlock(res, p); -#endif - return -1; + return clu_unlock(p); } #ifdef DEBUG int -_rg_unlock_dbg(char *name, void *p, char *file, int line) +_rg_unlock_dbg(void *p, char *file, int line) { - dprintf("rg_unlock(%s) @ %s:%d\n", name, file, line); - return _rg_unlock(name, p); + dprintf("rg_unlock() @ %s:%d\n", file, line); + return _rg_unlock(p); } #endif @@ -293,7 +236,6 @@ int set_rg_state(char *name, rg_state_t *svcblk) { -#if 0 cluster_member_list_t *membership; char res[256]; int ret; @@ -307,8 +249,6 @@ sizeof(*svcblk)); 
free_member_list(membership); return ret; -#endif - return -1; } @@ -329,7 +269,6 @@ int get_rg_state(char *name, rg_state_t *svcblk) { -#if 0 char res[256]; int ret; void *data = NULL; @@ -381,8 +320,7 @@ free(data); free_member_list(membership); -#endif - return -1; + return 0; } @@ -390,7 +328,6 @@ int get_rg_state_local(char *name, rg_state_t *svcblk) { -#if 0 char res[256]; int ret; void *data = NULL; @@ -422,9 +359,7 @@ /* Copy out the data. */ memcpy(svcblk, data, sizeof(*svcblk)); free(data); - -#endif - return -1; + return 0; } @@ -714,7 +649,7 @@ int svc_start(char *svcName, int req) { - void *lockp = NULL; + struct dlm_lksb lockp; int ret; rg_state_t svcStatus; @@ -725,7 +660,7 @@ } if (get_rg_state(svcName, &svcStatus) != 0) { - rg_unlock(svcName, lockp); + rg_unlock(&lockp); clulog(LOG_ERR, "#46: Failed getting status for RG %s\n", svcName); return FAIL; @@ -734,13 +669,13 @@ /* LOCK HELD */ switch (svc_advise_start(&svcStatus, svcName, req)) { case 0: /* Don't start service, return FAIL */ - rg_unlock(svcName, lockp); + rg_unlock(&lockp); return FAIL; case 2: /* Don't start service, return 0 */ - rg_unlock(svcName, lockp); + rg_unlock(&lockp); return 0; case 3: - rg_unlock(svcName, lockp); + rg_unlock(&lockp); return RG_EAGAIN; default: break; @@ -760,11 +695,11 @@ if (set_rg_state(svcName, &svcStatus) != 0) { clulog(LOG_ERR, "#47: Failed changing service status\n"); - rg_unlock(svcName, lockp); + rg_unlock(&lockp); return FAIL; } - rg_unlock(svcName, lockp); + rg_unlock(&lockp); ret = group_op(svcName, RG_START); ret = !!ret; /* Either it worked or it didn't. 
Ignore all the @@ -780,19 +715,22 @@ if (set_rg_state(svcName, &svcStatus) != 0) { clulog(LOG_ERR, "#75: Failed changing service status\n"); - rg_unlock(svcName, lockp); + rg_unlock(&lockp); return FAIL; } - rg_unlock(svcName, lockp); + rg_unlock(&lockp); - if (ret == 0) + if (ret == 0) { clulog(LOG_NOTICE, "Service %s started\n", svcName); - else + + broadcast_event(svcName, RG_STATE_STARTED); + } else { clulog(LOG_WARNING, "#68: Failed to start %s; return value: %d\n", svcName, ret); + } return ret; } @@ -807,7 +745,7 @@ int svc_status(char *svcName) { - void *lockp = NULL; + struct dlm_lksb lockp; rg_state_t svcStatus; if (rg_lock(svcName, &lockp) < 0) { @@ -817,12 +755,12 @@ } if (get_rg_state(svcName, &svcStatus) != 0) { - rg_unlock(svcName, lockp); + rg_unlock(&lockp); clulog(LOG_ERR, "#49: Failed getting status for RG %s\n", svcName); return FAIL; } - rg_unlock(svcName, lockp); + rg_unlock(&lockp); if (svcStatus.rs_owner != my_id()) /* Don't check status for anything not owned */ @@ -847,7 +785,7 @@ static int _svc_stop(char *svcName, int req, int recover, uint32_t newstate) { - void *lockp = NULL; + struct dlm_lksb lockp; rg_state_t svcStatus; int ret; @@ -864,7 +802,7 @@ } if (get_rg_state(svcName, &svcStatus) != 0) { - rg_unlock(svcName, lockp); + rg_unlock(&lockp); clulog(LOG_ERR, "#51: Failed getting status for RG %s\n", svcName); return FAIL; @@ -872,18 +810,18 @@ switch (svc_advise_stop(&svcStatus, svcName, req)) { case 0: - rg_unlock(svcName, lockp); + rg_unlock(&lockp); clulog(LOG_DEBUG, "Unable to stop RG %s in %s state\n", svcName, rg_state_str(svcStatus.rs_state)); return FAIL; case 2: - rg_unlock(svcName, lockp); + rg_unlock(&lockp); return SUCCESS; case 3: - rg_unlock(svcName, lockp); + rg_unlock(&lockp); return RG_EFORWARD; case 4: - rg_unlock(svcName, lockp); + rg_unlock(&lockp); return RG_EAGAIN; default: break; @@ -900,11 +838,11 @@ //printf("rg state = %s\n", rg_state_str(svcStatus.rs_state)); if (set_rg_state(svcName, &svcStatus) != 0) { 
- rg_unlock(svcName, lockp); + rg_unlock(&lockp); clulog(LOG_ERR, "#52: Failed changing RG status\n"); return FAIL; } - rg_unlock(svcName, lockp); + rg_unlock(&lockp); ret = group_op(svcName, RG_STOP); @@ -918,7 +856,7 @@ _svc_stop_finish(char *svcName, int failed, uint32_t newstate) { rg_state_t svcStatus; - void *lockp; + struct dlm_lksb lockp; if (rg_lock(svcName, &lockp) == FAIL) { clulog(LOG_ERR, "#53: Unable to obtain cluster lock: %s\n", @@ -927,7 +865,7 @@ } if (get_rg_state(svcName, &svcStatus) != 0) { - rg_unlock(svcName, lockp); + rg_unlock(&lockp); clulog(LOG_ERR, "#54: Failed getting status for RG %s\n", svcName); return FAIL; @@ -935,7 +873,7 @@ if ((svcStatus.rs_state != RG_STATE_STOPPING) && (svcStatus.rs_state != RG_STATE_ERROR)) { - rg_unlock(svcName, lockp); + rg_unlock(&lockp); return 0; } @@ -945,11 +883,13 @@ if (failed) { clulog(LOG_CRIT, "#12: RG %s failed to stop; intervention " "required\n", svcName); - svcStatus.rs_state = RG_STATE_FAILED; - } else if (svcStatus.rs_state == RG_STATE_ERROR) + newstate = RG_STATE_FAILED; + } else if (svcStatus.rs_state == RG_STATE_ERROR) { svcStatus.rs_state = RG_STATE_RECOVER; - else - svcStatus.rs_state = newstate; + newstate = RG_STATE_RECOVER; + } + + svcStatus.rs_state = newstate; clulog(LOG_NOTICE, "Service %s is %s\n", svcName, rg_state_str(svcStatus.rs_state)); @@ -957,11 +897,13 @@ svcStatus.rs_transition = (uint64_t)time(NULL); if (set_rg_state(svcName, &svcStatus) != 0) { - rg_unlock(svcName, lockp); + rg_unlock(&lockp); clulog(LOG_ERR, "#55: Failed changing RG status\n"); return FAIL; } - rg_unlock(svcName, lockp); + rg_unlock(&lockp); + + broadcast_event(svcName, newstate); return 0; } @@ -999,7 +941,7 @@ int svc_fail(char *svcName) { - void *lockp = NULL; + struct dlm_lksb lockp; rg_state_t svcStatus; if (rg_lock(svcName, &lockp) == FAIL) { @@ -1011,7 +953,7 @@ clulog(LOG_DEBUG, "Handling failure request for RG %s\n", svcName); if (get_rg_state(svcName, &svcStatus) != 0) { - rg_unlock(svcName, 
lockp); + rg_unlock(&lockp); clulog(LOG_ERR, "#56: Failed getting status for RG %s\n", svcName); return FAIL; @@ -1019,7 +961,7 @@ if ((svcStatus.rs_state == RG_STATE_STARTED) && (svcStatus.rs_owner != my_id())) { - rg_unlock(svcName, lockp); + rg_unlock(&lockp); clulog(LOG_DEBUG, "Unable to disable RG %s in %s state\n", svcName, rg_state_str(svcStatus.rs_state)); return FAIL; @@ -1036,11 +978,13 @@ svcStatus.rs_transition = (uint64_t)time(NULL); svcStatus.rs_restarts = 0; if (set_rg_state(svcName, &svcStatus) != 0) { - rg_unlock(svcName, lockp); + rg_unlock(&lockp); clulog(LOG_ERR, "#57: Failed changing RG status\n"); return FAIL; } - rg_unlock(svcName, lockp); + rg_unlock(&lockp); + + broadcast_event(svcName, RG_STATE_FAILED); return 0; } @@ -1068,7 +1012,7 @@ /* Open a connection to the other node */ - if (msg_open(target, RG_PORT, &ctx, 2)< 0) { + if (msg_open(MSG_CLUSTER, target, RG_PORT, &ctx, 2)< 0) { clulog(LOG_ERR, "#58: Failed opening connection to member #%d\n", target); @@ -1355,6 +1299,11 @@ return FAIL; } free_member_list(membership); + + /* Check for dependency. We cannot start unless our + dependency is met */ + if (check_depend_safe(svcName) == 0) + return RG_EDEPEND; /* * This is a 'root' start request. We need to clear out our failure --- cluster/rgmanager/src/daemons/test.c 2005/03/21 22:00:31 1.4 +++ cluster/rgmanager/src/daemons/test.c 2006/07/11 23:52:41 1.5 @@ -1,3 +1,21 @@ +/* + Copyright Red Hat, Inc. 2004-2006 + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; either version 2, or (at your option) any + later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 675 Mass Ave, Cambridge, + MA 02139, USA. +*/ #include #include #include --- cluster/rgmanager/src/resources/service.sh 2006/06/02 17:37:10 1.5 +++ cluster/rgmanager/src/resources/service.sh 2006/07/11 23:52:41 1.6 @@ -125,6 +125,17 @@ + + + + Top-level service this depends on, in "service:name" format. + + + Service dependency; will not start without the specified + service running. + + + --- cluster/rgmanager/src/utils/clustat.c 2006/06/02 17:37:11 1.17 +++ cluster/rgmanager/src/utils/clustat.c 2006/07/11 23:52:41 1.18 @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -53,7 +54,7 @@ struct timeval tv; - if (msg_open(0, RG_PORT, &ctx, 10) < 0) { + if (msg_open(MSG_SOCKET, 0, 0, &ctx, 10) < 0) { return NULL; } @@ -107,6 +108,7 @@ } swab_generic_msg_hdr(msgp); + if (msgp->gh_command == RG_SUCCESS) { free(msgp); break; @@ -117,7 +119,6 @@ return NULL; } - rsmp = (rg_state_msg_t *)msgp; swab_rg_state_t(&rsmp->rsm_state); @@ -157,46 +158,63 @@ char buf[128]; char *name; cluster_member_list_t *ret = NULL; + cman_node_t *nodes = NULL; desc = ccs_connect(); if (desc < 0) { return NULL; } - x = 1; + while ((ret = malloc(sizeof(*ret))) == NULL) + sleep(1); - snprintf(buf, sizeof(buf), - "/cluster/clusternodes/clusternode[%d]/@name", x); - while (ccs_get(desc, buf, &name) == 0) { - if (!ret) { - ret = malloc(cml_size(x)); - if (!ret) { + x = 1; + while (1) { + snprintf(buf, sizeof(buf), + "/cluster/clusternodes/clusternode[%d]/@name", x); + + if (ccs_get(desc, buf, &name) != 0) + break; + + if (!nodes) { + nodes = malloc(x * sizeof(cman_node_t)); + if (!nodes ) { perror("malloc"); ccs_disconnect(desc); exit(1); } - memset(ret, 0, cml_size(x)); + memset(nodes, 0, x * sizeof(cman_node_t)); } else { - ret = realloc(ret, cml_size(x)); - if (!ret) { + nodes = realloc(ret, x * 
sizeof(cman_node_t)); + if (!nodes) { perror("realloc"); ccs_disconnect(desc); exit(1); } } - memset(&ret->cml_members[x-1], 0, sizeof(cman_node_t)); - strncpy(ret->cml_members[x-1].cn_name, name, - sizeof(ret->cml_members[x-1].cn_name)); + memset(&nodes[x-1], 0, sizeof(cman_node_t)); + strncpy(nodes[x-1].cn_name, name, + sizeof(nodes[x-1].cn_name)); free(name); + /* Add node ID */ + snprintf(buf, sizeof(buf), + "/cluster/clusternodes/clusternode[%d]/@nodeid", x); + if (ccs_get(desc, buf, &name) == 0) { + nodes[x-1].cn_nodeid = atoi(name); + free(name); + } + ret->cml_count = x; ++x; - snprintf(buf, sizeof(buf), - "/cluster/clusternodes/clusternode[%d]/@name", x); } ccs_disconnect(desc); + + ret->cml_members = nodes; + + return ret; } @@ -238,9 +256,12 @@ if (!m) { printf("%s not found\n", these->cml_members[x].cn_name); /* WTF? It's not in our config */ - printf("realloc %d\n", (int)cml_size((all->cml_count+1))); - all = realloc(all, cml_size((all->cml_count+1))); - if (!all) { + printf("realloc %d\n", (int)((all->cml_count+1) * + sizeof(cman_node_t))); + all->cml_members = realloc(all->cml_members, + (all->cml_count+1) * + sizeof(cman_node_t)); + if (!all->cml_members) { perror("realloc"); exit(1); } @@ -440,7 +461,8 @@ void txt_member_state(cman_node_t *node) { - printf(" %-40.40s ", node->cn_name); + printf(" %-34.34s %4d ", node->cn_name, + node->cn_nodeid); if (node->cn_member & FLAG_UP) printf("Online"); @@ -481,8 +503,8 @@ { int x; - printf(" %-40.40s %s\n", "Member Name", "Status"); - printf(" %-40.40s %s\n", "------ ----", "------"); + printf(" %-34.34s %-4.4s %s\n", "Member Name", "ID", "Status"); + printf(" %-34.34s %-4.4s %s\n", "------ ----", "----", "------"); for (x = 0; x < membership->cml_count; x++) { if (name && strcmp(membership->cml_members[x].cn_name, name)) --- cluster/rgmanager/src/utils/clusvcadm.c 2006/06/02 17:37:11 1.8 +++ cluster/rgmanager/src/utils/clusvcadm.c 2006/07/11 23:52:41 1.9 @@ -72,7 +72,7 @@ membership = get_member_list(ch); 
me = get_my_nodeid(ch); - if (msg_open(0, RG_PORT, &ctx, 5) < 0) { + if (msg_open(MSG_SOCKET, 0, RG_PORT, &ctx, 5) < 0) { printf("Could not connect to resource group manager\n"); goto out; } @@ -179,7 +179,7 @@ main(int argc, char **argv) { extern char *optarg; - char *svcname=NULL, nodename[256]; + char *svcname=NULL, nodename[256], realsvcname[64]; int opt; msgctx_t ctx; cman_handle_t ch; @@ -259,7 +259,12 @@ usage(basename(argv[0])); return 1; } - + + if (!strchr(svcname,':')) { + snprintf(realsvcname, sizeof(realsvcname), "service:%s", + svcname); + svcname = realsvcname; + } /* No login */ ch = cman_init(NULL); @@ -287,18 +292,19 @@ */ } + strcpy(nodename,"me"); build_message(&msg, action, svcname, svctarget); if (action != RG_RELOCATE) { printf("Member %s %s %s", nodename, actionstr, svcname); printf("..."); fflush(stdout); - msg_open(0, RG_PORT, &ctx, 5); + msg_open(MSG_SOCKET, 0, RG_PORT, &ctx, 5); } else { printf("Trying to relocate %s to %s", svcname, nodename); printf("..."); fflush(stdout); - msg_open(0, RG_PORT, &ctx, 5); + msg_open(MSG_SOCKET, 0, RG_PORT, &ctx, 5); } if (ctx.type < 0) { @@ -307,7 +313,9 @@ return 1; } - if (msg_send(&ctx, &msg, sizeof(msg)) != sizeof(msg)) { + opt = msg_send(&ctx, &msg, sizeof(msg)); + + if (opt < sizeof(msg)) { perror("msg_send"); fprintf(stderr, "Could not send entire message!\n"); return 1; @@ -340,6 +348,9 @@ case RG_EAGAIN: printf("failed: Try again (resource groups locked)\n"); break; + case RG_EDEPEND: + printf("failed: Operation would break dependency\n"); + break; default: printf("failed: unknown reason %d\n", msg.sm_data.d_ret); break;