* [Cluster-devel] cluster/rgmanager/src clulib/Makefile clulib/a ...
@ 2006-07-11 23:52 lhh
0 siblings, 0 replies; only message in thread
From: lhh @ 2006-07-11 23:52 UTC (permalink / raw)
To: cluster-devel.redhat.com
CVSROOT: /cvs/cluster
Module name: cluster
Changes by: lhh at sourceware.org 2006-07-11 23:52:42
Modified files:
rgmanager/src/clulib: Makefile alloc.c lock.c lockspace.c
members.c message.c msgsimple.c
rg_strings.c signals.c vft.c
rgmanager/src/daemons: Makefile fo_domain.c groups.c main.c
nodeevent.c reslist.c restree.c
rg_forward.c rg_state.c test.c
rgmanager/src/resources: service.sh
rgmanager/src/utils: clustat.c clusvcadm.c
Added files:
rgmanager/src/clulib: cman.c locktest.c msg_cluster.c
msg_socket.c msgtest.c
rgmanager/src/daemons: rg_event.c
Log message:
- Make rgmanager actually do things.
- Finish port of rgmanager to CMAN messaging.
- Add feature to wait for nodes to be fenced prior to handling a
node-down event.
- Add direct DLM lock support.
- Fix local communication.
- Optimize VF data distribution algorithm to use CMAN/Totem's broadcast
mode; this should make rgmanager much more scalable.
- Add multiplexing for CMAN communications so threads can have pseudo-private
channels over the single shared CMAN socket.
- Add service->service dependencies based on service events.
- Add node ID display to clustat text-mode output.
Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/cman.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/locktest.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msg_cluster.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msg_socket.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msgtest.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/Makefile.diff?cvsroot=cluster&r1=1.8&r2=1.9
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/alloc.c.diff?cvsroot=cluster&r1=1.7&r2=1.8
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lock.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lockspace.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/members.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/message.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msgsimple.c.diff?cvsroot=cluster&r1=1.5&r2=1.6
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/rg_strings.c.diff?cvsroot=cluster&r1=1.3&r2=1.4
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/signals.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/vft.c.diff?cvsroot=cluster&r1=1.13&r2=1.14
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_event.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/Makefile.diff?cvsroot=cluster&r1=1.12&r2=1.13
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/fo_domain.c.diff?cvsroot=cluster&r1=1.8&r2=1.9
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/groups.c.diff?cvsroot=cluster&r1=1.18&r2=1.19
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/main.c.diff?cvsroot=cluster&r1=1.25&r2=1.26
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/nodeevent.c.diff?cvsroot=cluster&r1=1.2&r2=1.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/reslist.c.diff?cvsroot=cluster&r1=1.13&r2=1.14
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/restree.c.diff?cvsroot=cluster&r1=1.19&r2=1.20
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_forward.c.diff?cvsroot=cluster&r1=1.4&r2=1.5
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_state.c.diff?cvsroot=cluster&r1=1.16&r2=1.17
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/test.c.diff?cvsroot=cluster&r1=1.4&r2=1.5
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/resources/service.sh.diff?cvsroot=cluster&r1=1.5&r2=1.6
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/utils/clustat.c.diff?cvsroot=cluster&r1=1.17&r2=1.18
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/utils/clusvcadm.c.diff?cvsroot=cluster&r1=1.8&r2=1.9
/cvs/cluster/cluster/rgmanager/src/clulib/cman.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/cman.c
+++ - 2006-07-11 23:52:43.661578000 +0000
@@ -0,0 +1,264 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
+/**
+ pthread mutex wrapper for a global CMAN handle
+ */
+#include <stdio.h>
+#include <pthread.h>
+#include <libcman.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+
+static cman_handle_t *_chandle = NULL;
+static pthread_mutex_t _chandle_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t _chandle_cond = PTHREAD_COND_INITIALIZER;
+static pthread_t _chandle_holder = 0;
+static int _chandle_preempt = 0;
+static int _wakeup_pipe[2] = { -1, -1 };
+
/**
  Put @fd into non-blocking mode, preserving its other status flags.

  Best effort: failures are reported with perror() and otherwise
  ignored.  If F_GETFL fails we stop there; the old code OR'ed
  O_NONBLOCK into the -1 error value and handed every flag bit to
  F_SETFL.
 */
static void
_set_nonblock(int fd)
{
	int flags;

	flags = fcntl(fd, F_GETFL, 0);
	if (flags < 0) {
		perror("fcntl");
		return;
	}

	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
		perror("fcntl");
}
+
+
+/**
+ Lock / return the global CMAN handle.
+
+ @param block If nonzero, we wait until the handle is released
+ @param preempt If nonzero, *try* to wake up the holder who has
+ taken the lock with cman_lock_preemptible. Will not
+ wake up holders which took it with cman_lock().
+ @return NULL / errno on failure; the global CMAN handle
+ on success.
+ */
+cman_handle_t *
+cman_lock(int block, int preempt)
+{
+ pthread_t tid;
+ cman_handle_t *ret = NULL;
+
+ pthread_mutex_lock(&_chandle_lock);
+ if (_chandle == NULL) {
+ errno = ENOSYS;
+ goto out_unlock;
+ }
+
+ tid = pthread_self();
+ if (_chandle_holder == tid) {
+ errno = EDEADLK;
+ goto out_unlock;
+ }
+
+ if (_chandle_holder > 0) {
+ if (!block) {
+ errno = EAGAIN;
+ goto out_unlock;
+ }
+
+ /* Try to wake up the holder! */
+ if (preempt)
+ write(_wakeup_pipe[1], "!", 1);
+
+ /* Blocking call; do the cond-thing */
+ pthread_cond_wait(&_chandle_cond, &_chandle_lock);
+ }
+
+ _chandle_holder = tid;
+ ret = _chandle;
+out_unlock:
+ pthread_mutex_unlock(&_chandle_lock);
+ return ret;
+}
+
+
+/**
+ Lock / return the global CMAN handle.
+
+ @param block If nonzero, we wait until the handle is released
+ @param preempt_fd Caller should include this file descriptor in
+ blocking calls to select(2), so that we can wake
+ it up if someone calls with cman_lock(xxx, 1);
+ @return NULL / errno on failure; the global CMAN handle
+ on success.
+ */
+cman_handle_t *
+cman_lock_preemptible(int block, int *preempt_fd)
+{
+ pthread_t tid;
+ cman_handle_t *ret = NULL;
+
+ if (preempt_fd == NULL) {
+ errno = EINVAL;
+ return NULL;
+ }
+
+ pthread_mutex_lock(&_chandle_lock);
+ if (_chandle == NULL) {
+ errno = ENOSYS;
+ goto out_unlock;
+ }
+
+ tid = pthread_self();
+ if (_chandle_holder == tid) {
+ errno = EDEADLK;
+ goto out_unlock;
+ }
+
+ if (_chandle_holder > 0) {
+ if (!block) {
+ errno = EAGAIN;
+ goto out_unlock;
+ }
+
+ /* Blocking call; do the cond-thing */
+ pthread_cond_wait(&_chandle_cond, &_chandle_lock);
+ }
+
+ *preempt_fd = _wakeup_pipe[0];
+ _chandle_holder = tid;
+ _chandle_preempt = 1;
+ ret = _chandle;
+out_unlock:
+ pthread_mutex_unlock(&_chandle_lock);
+ return ret;
+}
+
+
+/**
+ Release the global CMAN handle
+
+ @param ch Should match the global handle
+ @return -1 on failure, 0 on success
+ */
+int
+cman_unlock(cman_handle_t *ch)
+{
+ int ret = -1;
+ char c;
+
+ pthread_mutex_lock(&_chandle_lock);
+ if (_chandle == NULL) {
+ errno = ENOSYS;
+ goto out_unlock;
+ }
+
+ if (_chandle_holder != pthread_self() || !_chandle_holder) {
+ errno = EBUSY;
+ goto out_unlock;
+ }
+
+ if (_chandle != ch) {
+ errno = EINVAL;
+ goto out_unlock;
+ }
+
+ /* Empty wakeup pipe if we took it with the preempt flag */
+ if (_chandle_preempt)
+ read(_wakeup_pipe[0], &c, 1);
+
+ _chandle_preempt = 0;
+ _chandle_holder = 0;
+ ret = 0;
+
+out_unlock:
+ pthread_mutex_unlock(&_chandle_lock);
+ if (ret == 0)
+ pthread_cond_broadcast(&_chandle_cond);
+ return ret;
+}
+
+
+int
+cman_init_subsys(cman_handle_t *ch)
+{
+ int ret = -1;
+
+ pthread_mutex_lock(&_chandle_lock);
+ if (_chandle) {
+ errno = EAGAIN;
+ goto out_unlock;
+ }
+
+ if (!ch) {
+ errno = EAGAIN;
+ goto out_unlock;
+ }
+
+ if (pipe(_wakeup_pipe) < 0) {
+ goto out_unlock;
+ }
+
+ _set_nonblock(_wakeup_pipe[0]);
+ _chandle = ch;
+ _chandle_holder = 0;
+ ret = 0;
+
+out_unlock:
+ pthread_mutex_unlock(&_chandle_lock);
+ return ret;
+}
+
+
+int
+cman_cleanup_subsys(void)
+{
+ int ret = -1;
+
+ pthread_mutex_lock(&_chandle_lock);
+ if (!_chandle) {
+ errno = EAGAIN;
+ goto out_unlock;
+ }
+
+ if (_chandle_holder > 0) {
+ pthread_cond_wait(&_chandle_cond, &_chandle_lock);
+ }
+
+ ret = 0;
+ _chandle = NULL;
+ _chandle_holder = 0;
+
+ close(_wakeup_pipe[0]);
+ close(_wakeup_pipe[1]);
+
+out_unlock:
+ pthread_mutex_unlock(&_chandle_lock);
+ return ret;
+}
+
+
+int
+cman_send_data_unlocked(void *buf, int len, int flags,
+ uint8_t port, int nodeid)
+{
+ return cman_send_data(_chandle, buf, len, flags, port, nodeid);
+}
/cvs/cluster/cluster/rgmanager/src/clulib/locktest.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/locktest.c
+++ - 2006-07-11 23:52:43.746107000 +0000
@@ -0,0 +1,85 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
+#include <lock.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <pthread.h>
+#include <signal.h>
+
+
+void *
+lock_thread(void *arg)
+{
+ struct dlm_lksb lksb;
+
+ while(1) {
+ printf("Taking lock..\n");
+ clu_lock(LKM_EXMODE, &lksb, 0, arg);
+ printf("Thread acquired lock on %s\n", (char *)arg);
+ clu_unlock(&lksb);
+ }
+}
+
+
+
+
+int
+main(int argc, char **argv)
+{
+ struct dlm_lksb lksb;
+ int ret;
+ pthread_t th;
+
+ if (clu_lock_init("Testing") != 0) {
+ perror("clu_lock_init");
+ return 1;
+ }
+
+ if (argc < 2) {
+ printf("Lock what?\n");
+ return 1;
+ }
+
+ if (argc == 3) {
+ pthread_create(&th, NULL, lock_thread, strdup(argv[1]));
+ }
+
+ memset(&lksb,0,sizeof(lksb));
+ ret = clu_lock(LKM_EXMODE, &lksb, 0, argv[1]);
+ if (ret < 0) {
+ perror("clu_lock");
+ return 1;
+ }
+
+ printf("Acquired lock on %s; press enter to release\n", argv[1]);
+ getchar();
+
+ clu_unlock(&lksb);
+
+ if (argc == 3) {
+ printf("Press enter to kill lock thread...\n");
+ getchar();
+ pthread_kill(th, SIGTERM);
+ }
+
+ clu_lock_finished("Testing");
+
+ return 0;
+}
/cvs/cluster/cluster/rgmanager/src/clulib/msg_cluster.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/msg_cluster.c
+++ - 2006-07-11 23:52:43.828222000 +0000
@@ -0,0 +1,1104 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
+#define _MESSAGE_BUILD
+#include <message.h>
+#include <stdio.h>
+#include <pthread.h>
+#include <libcman.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/types.h>
+#include <time.h>
+#include <sys/time.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <signal.h>
+#include <signals.h>
+#include <cman-private.h>
+
+/* Ripped from ccsd's setup_local_socket */
+#define MAX_CONTEXTS 32 /* Testing; production should be 1024-ish */
+
+int cluster_msg_close(msgctx_t *ctx);
+
+/* Context 0 is reserved for control messages */
+
+/* Local-ish contexts */
+static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER;
+static msgctx_t *contexts[MAX_CONTEXTS];
+static int _me = 0;
+pthread_t comms_thread;
+int thread_running;
+
+#define is_established(ctx) \
+ (((ctx->type == MSG_CLUSTER) && \
+ (ctx->u.cluster_info.remote_ctx && \
+ ctx->u.cluster_info.local_ctx)) || \
+ ((ctx->type == MSG_SOCKET) && \
+ (ctx->u.local_info.sockfd != -1)))
+
+static msg_ops_t cluster_msg_ops;
+
+
+static int
+cluster_msg_send(msgctx_t *ctx, void *msg, size_t len)
+{
+ char buf[4096];
+ cluster_msg_hdr_t *h = (void *)buf;
+ int ret;
+ char *msgptr = (buf + sizeof(*h));
+
+ errno = EINVAL;
+ if (ctx->type != MSG_CLUSTER)
+ return -1;
+ if (!(ctx->flags & SKF_WRITE))
+ return -1;
+ if ((len + sizeof(*h)) > sizeof(buf)) {
+ errno = E2BIG;
+ return -1;
+ }
+
+ h->msg_control = M_DATA;
+ h->src_ctx = ctx->u.cluster_info.local_ctx;
+ h->dest_ctx = ctx->u.cluster_info.remote_ctx;
+ h->msg_port = ctx->u.cluster_info.port;
+ memcpy(msgptr, msg, len);
+ h->src_nodeid = _me;
+
+ /*
+ printf("sending cluster message, length = %d to nodeid %d port %d\n",
+ len + sizeof(*h), ctx->u.cluster_info.nodeid, ctx->u.cluster_info.port);
+ */
+
+ swab_cluster_msg_hdr_t(h);
+
+ ret = cman_send_data_unlocked((void *)h, len + sizeof(*h),
+ ctx->u.cluster_info.nodeid,
+ ctx->u.cluster_info.port, 0);
+
+ return len + sizeof(h);
+}
+
+
+/**
+ Assign a (free) cluster context ID if possible
+ */
+static int
+assign_ctx(msgctx_t *ctx)
+{
+ int start;
+ static uint32_t context_index = 1;
+
+ /* Assign context index */
+ pthread_mutex_lock(&context_lock);
+
+ start = context_index;
+ do {
+ context_index++;
+ if (context_index >= MAX_CONTEXTS)
+ context_index = 1;
+
+ if (!contexts[context_index]) {
+ contexts[start] = ctx;
+ ctx->u.cluster_info.local_ctx = start;
+ pthread_mutex_unlock(&context_lock);
+ return 0;
+ }
+ } while (context_index != start);
+
+ pthread_mutex_unlock(&context_lock);
+
+ errno = EAGAIN;
+ return -1;
+}
+
+
+/* See if anything's on the cluster socket. If so, dispatch it
+ on to the requisite queues
+ XXX should be passed a connection arg! */
+static int
+poll_cluster_messages(int timeout)
+{
+ int ret = -1;
+ fd_set rfds;
+ int fd, lfd, max;
+ struct timeval tv;
+ struct timeval *p = NULL;
+ cman_handle_t *ch;
+
+ if (timeout >= 0) {
+ p = &tv;
+ tv.tv_sec = tv.tv_usec = timeout;
+ }
+
+ FD_ZERO(&rfds);
+
+ /* This sucks - it could cause other threads trying to get a
+ membership list to block for a long time. Now, that should not
+ happen. Basically, when we get a membership event, we generate
+ a new membership list in a locally cached copy and reference
+ that.
+
+ */
+ ch = cman_lock_preemptible(1, &lfd);
+ if (!ch) {
+ printf("%s\n", strerror(errno));
+ }
+
+ fd = cman_get_fd(ch);
+ if (fd < 0) {
+ cman_unlock(ch);
+ return 0;
+ }
+ FD_SET(fd, &rfds);
+ FD_SET(lfd, &rfds);
+
+ max = (lfd > fd ? lfd : fd);
+ if (select(fd + 1, &rfds, NULL, NULL, p) > 0) {
+ /* Someone woke us up */
+ if (FD_ISSET(lfd, &rfds)) {
+ cman_unlock(ch);
+ errno = EAGAIN;
+ return -1;
+ }
+
+ cman_dispatch(ch, 0);
+ ret = 0;
+ }
+ cman_unlock(ch);
+
+ return ret;
+}
+
+
+/**
+ This is used to establish and tear down pseudo-private
+ contexts which are shared with the cluster context.
+ */
+static int
+cluster_send_control_msg(msgctx_t *ctx, int type)
+{
+ cluster_msg_hdr_t cm;
+ int ret;
+
+ cm.msg_control = (uint8_t)type;
+ cm.src_nodeid = _me;
+ cm.dest_ctx = ctx->u.cluster_info.remote_ctx;
+ cm.src_ctx = ctx->u.cluster_info.local_ctx;
+ cm.msg_port = ctx->u.cluster_info.port;
+
+ swab_cluster_msg_hdr_t(&cm);
+
+ ret = (cman_send_data_unlocked((void *)&cm, sizeof(cm),
+ ctx->u.cluster_info.nodeid,
+ ctx->u.cluster_info.port, 0));
+ return ret;
+}
+
+
+/**
+ Wait for a message on a context.
+ */
+static int
+cluster_msg_wait(msgctx_t *ctx, int timeout)
+{
+ struct timespec ts = {0, 0};
+ int req = M_NONE;
+ int e;
+
+ errno = EINVAL;
+ if (!ctx)
+ return -1;
+ if (ctx->type != MSG_CLUSTER)
+ return -1;
+ if (!(ctx->flags & (SKF_READ | SKF_LISTEN)))
+ return -1;
+
+ if (timeout > 0) {
+ gettimeofday((struct timeval *)&ts, NULL);
+ ts.tv_sec += timeout;
+ ts.tv_nsec *= 1000;
+ }
+
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+ while (1) {
+
+ /* See if we dispatched any messages on to our queue */
+ if (ctx->u.cluster_info.queue) {
+ req = ctx->u.cluster_info.queue->message->msg_control;
+ /*printf("Queue not empty CTX%d : %d\n",
+ ctx->u.cluster_info.local_ctx, req);*/
+ break;
+ }
+
+ /* Ok, someone else has the mutex on our FD. Go to
+ sleep on a cond; maybe they'll wake us up */
+ e = pthread_cond_timedwait(&ctx->u.cluster_info.cond,
+ &ctx->u.cluster_info.mutex,
+ &ts);
+
+ if (timeout == 0) {
+ break;
+ }
+
+ if (e == 0) {
+ continue;
+ }
+
+ if (e == ETIMEDOUT) {
+ break;
+ }
+ }
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+ return req;
+}
+
+
+static int
+cluster_msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max)
+{
+ int e;
+ msg_q_t *n;
+
+ errno = EINVAL;
+ if (!ctx || !fds)
+ return -1;
+ if (ctx->type != MSG_CLUSTER)
+ return -1;
+
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+ if (ctx->u.cluster_info.select_pipe[0] < 0) {
+ if (pipe(ctx->u.cluster_info.select_pipe) < 0) {
+ e = errno;
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ errno = e;
+ return -1;
+ }
+
+ /*
+ printf("%s: Created cluster CTX select pipe "
+ "rd=%d wr=%d\n", __FUNCTION__,
+ ctx->u.cluster_info.select_pipe[0],
+ ctx->u.cluster_info.select_pipe[1]);
+ */
+
+ /* Ok, we just created the pipe. Now, we need to write
+ a char for every unprocessed event to the pipe, because
+ events could be pending that would otherwise be unhandled
+ by the caller because the caller is switching to select()
+ semantics. (as opposed to msg_wait() ) */
+ list_do(&ctx->u.cluster_info.queue, n) {
+ write(ctx->u.cluster_info.select_pipe[1], "", 1);
+ } while (!list_done(&ctx->u.cluster_info.queue, n));
+ }
+
+ e = ctx->u.cluster_info.select_pipe[0];
+ //printf("%s: cluster %d\n", __FUNCTION__, e);
+ FD_SET(e, fds);
+
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+ if (max && (e > *max))
+ *max = e;
+ return 0;
+}
+
+
+int
+cluster_msg_fd_isset(msgctx_t *ctx, fd_set *fds)
+{
+ errno = EINVAL;
+
+ if (!fds || !ctx)
+ return -1;
+
+ if (ctx->type != MSG_CLUSTER)
+ return -1;
+
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+ if (ctx->u.cluster_info.select_pipe[0] >= 0 &&
+ FD_ISSET(ctx->u.cluster_info.select_pipe[0], fds)) {
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ return 1;
+ }
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ return 0;
+}
+
+
+int
+cluster_msg_fd_clr(msgctx_t *ctx, fd_set *fds)
+{
+ errno = EINVAL;
+
+ if (!fds || !ctx)
+ return -1;
+
+ if (ctx->type != MSG_CLUSTER)
+ return -1;
+
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+ if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+ FD_CLR(ctx->u.cluster_info.select_pipe[0], fds);
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ return 1;
+ }
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ return 0;
+}
+
+
+static int
+_cluster_msg_receive(msgctx_t *ctx, void **msg, size_t *len)
+{
+ cluster_msg_hdr_t *m;
+ msg_q_t *n;
+ int ret = 0;
+ char foo;
+
+ if (msg)
+ *msg = NULL;
+ if (len)
+ *len = 0;
+
+ if (ctx->u.cluster_info.local_ctx < 0 ||
+ ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
+ errno = EBADF;
+ return -1;
+ }
+
+ /* trigger receive here */
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+
+ n = ctx->u.cluster_info.queue;
+ if (n == NULL) {
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ errno = EAGAIN;
+ return -1;
+ }
+
+ list_remove(&ctx->u.cluster_info.queue, n);
+
+ if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+ //printf("%s read\n", __FUNCTION__);
+ read(ctx->u.cluster_info.select_pipe[0],
+ &foo, 1);
+ }
+
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+ m = n->message;
+ switch(m->msg_control) {
+ case M_CLOSE:
+ ctx->u.cluster_info.remote_ctx = 0;
+ break;
+ case M_OPEN_ACK:
+ /* Response to new connection */
+ ctx->u.cluster_info.remote_ctx = m->src_ctx;
+ break;
+ case M_DATA:
+ /* Kill the message control structure */
+ memmove(m, &m[1], n->len - sizeof(*m));
+ if (msg)
+ *msg = (void *)m;
+ else {
+ printf("Warning: dropping data message\n");
+ free(m);
+ }
+ if (len)
+ *len = (n->len - sizeof(*m));
+ ret = (n->len - sizeof(*m));
+ free(n);
+
+ //printf("Message received\n");
+ return ret;
+ case M_OPEN:
+ /* Someone is trying to open a connection */
+ default:
+ /* ?!! */
+ ret = -1;
+ break;
+ }
+
+ free(m);
+ free(n);
+
+ return ret;
+}
+
+
+/**
+ Receive a message from a cluster-context. This copies out the contents
+ into the user-specified buffer, and does random other things.
+ */
+static int
+cluster_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+ int req;
+ msg_q_t *n;
+ void *priv_msg;
+ size_t priv_len;
+ char foo;
+
+ errno = EINVAL;
+ if (!ctx)
+ return -1;
+ if (!(ctx->flags & SKF_READ))
+ return -1;
+
+ req = cluster_msg_wait(ctx, timeout);
+
+ switch (req) {
+ case M_DATA:
+ /* Copy out. */
+ req = _cluster_msg_receive(ctx, &priv_msg, &priv_len);
+ if (req < 0) {
+ printf("Ruh roh!\n");
+ return -1;
+ }
+
+ priv_len = (priv_len < maxlen ? priv_len : maxlen);
+
+ if (msg && maxlen)
+ memcpy(msg, priv_msg, priv_len);
+ free(priv_msg);
+ return req;
+ case M_CLOSE:
+ errno = ECONNRESET;
+ return -1;
+ case 0:
+ //printf("Nothing on queue\n");
+ return 0;
+ case M_STATECHANGE:
+ case M_PORTOPENED:
+ case M_PORTCLOSED:
+ case M_TRY_SHUTDOWN:
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+
+ n = ctx->u.cluster_info.queue;
+ if (n == NULL) {
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ errno = EAGAIN;
+ return -1;
+ }
+
+ list_remove(&ctx->u.cluster_info.queue, n);
+
+ if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+ //printf("%s read\n", __FUNCTION__);
+ read(ctx->u.cluster_info.select_pipe[0],
+ &foo, 1);
+ }
+
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+ if (n->message)
+ free(n->message);
+ free(n);
+ return 0;
+ default:
+ printf("PROTOCOL ERROR: Received %d\n", req);
+ return -1;
+ }
+
+ printf("%s: CODE PATH ERROR\n", __FUNCTION__);
+ return -1;
+}
+
+
+/**
+ Open a connection to the specified node ID.
+ If the speficied node is 0, this connects via the socket in
+ /var/run/cluster...
+ */
+static int
+cluster_msg_open(int type, int nodeid, int port, msgctx_t *ctx, int timeout)
+{
+ int t = 0, ret;
+
+ errno = EINVAL;
+ if (!ctx)
+ return -1;
+
+ if (type != MSG_CLUSTER)
+ return -1;
+
+ /*printf("Opening pseudo channel to node %d\n", nodeid);*/
+
+ ctx->type = MSG_CLUSTER;
+ ctx->ops = &cluster_msg_ops;
+ ctx->flags = 0;
+ ctx->u.cluster_info.nodeid = nodeid;
+ ctx->u.cluster_info.port = port;
+ ctx->u.cluster_info.local_ctx = -1;
+ ctx->u.cluster_info.remote_ctx = 0;
+ ctx->u.cluster_info.queue = NULL;
+
+ /* Assign context index */
+ if (assign_ctx(ctx) < 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+ ctx->flags = SKF_READ | SKF_WRITE;
+
+ if (nodeid == CMAN_NODEID_US) {
+ /* Broadcast pseudo ctx; no handshake needed */
+ ctx->flags |= SKF_MCAST;
+ return 0;
+ }
+
+ //printf(" Local CTX: %d\n", ctx->u.cluster_info.local_ctx);
+
+ /* Send open */
+ //printf(" Sending control message M_OPEN\n");
+ if (cluster_send_control_msg(ctx, M_OPEN) < 0) {
+ return -1;
+ }
+
+ /* Ok, wait for a response */
+ while (!is_established(ctx)) {
+ ++t;
+ if (t > timeout) {
+ cluster_msg_close(ctx);
+ errno = ETIMEDOUT;
+ return -1;
+ }
+
+ ret = cluster_msg_wait(ctx, 1);
+ switch(ret) {
+ case M_OPEN_ACK:
+ _cluster_msg_receive(ctx, NULL, NULL);
+ break;
+ case M_NONE:
+ continue;
+ default:
+ printf("PROTO ERROR: M_OPEN_ACK not received: %d %d\n",
+ ret, errno);
+ }
+ }
+
+ /*
+ printf(" Remote CTX: %d\n",
+ ctx->u.cluster_info.remote_ctx);
+ printf(" Pseudo channel established!\n");
+ */
+ return 0;
+}
+
+
+/**
+ Close a connection context (cluster or socket; it doesn't matter)
+ In the case of a cluster context, we need to clear out the
+ receive queue and what-not. This isn't a big deal. Also, we
+ need to tell the other end that we're done -- just in case it does
+ not know yet ;)
+
+ With a socket, the O/S cleans up the buffers for us.
+ */
+int
+cluster_msg_close(msgctx_t *ctx)
+{
+ msg_q_t *n = NULL;
+
+ errno = EINVAL;
+
+ if (!ctx)
+ return -1;
+ if (ctx->type != MSG_CLUSTER)
+ return -1;
+
+ if (ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
+ printf("Context invalid during close\n");
+ return -1;
+ }
+
+ pthread_mutex_lock(&context_lock);
+ /* Other threads should not be able to see this again */
+ if (contexts[ctx->u.cluster_info.local_ctx] &&
+ (contexts[ctx->u.cluster_info.local_ctx]->u.cluster_info.local_ctx ==
+ ctx->u.cluster_info.local_ctx)) {
+ //printf("reclaimed context %d\n",
+ //ctx->u.cluster_info.local_ctx);
+ contexts[ctx->u.cluster_info.local_ctx] = NULL;
+ }
+ pthread_mutex_unlock(&context_lock);
+
+ /* Clear receive queue */
+ while ((n = ctx->u.cluster_info.queue) != NULL) {
+ list_remove(&ctx->u.cluster_info.queue, n);
+ free(n->message);
+ free(n);
+ }
+ /* Send close message */
+ if (ctx->u.cluster_info.remote_ctx != 0) {
+ cluster_send_control_msg(ctx, M_CLOSE);
+ }
+
+ /* Close pipe if it's open */
+ if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+ close(ctx->u.cluster_info.select_pipe[0]);
+ ctx->u.cluster_info.select_pipe[0] = -1;
+ }
+ if (ctx->u.cluster_info.select_pipe[1] >= 0) {
+ close(ctx->u.cluster_info.select_pipe[1]);
+ ctx->u.cluster_info.select_pipe[1] = -1;
+ }
+ ctx->type = MSG_NONE;
+ ctx->ops = NULL;
+ return 0;
+}
+
+
+static void
+queue_for_context(msgctx_t *ctx, char *buf, int len)
+{
+ msg_q_t *node;
+
+ while ((node = malloc(sizeof(*node))) == NULL) {
+ sleep(1);
+ }
+ memset(node, 0, sizeof(*node));
+ while ((node->message = malloc(len)) == NULL) {
+ sleep(1);
+ }
+ memcpy(node->message, buf, len);
+ node->len = len;
+
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+ list_insert(&ctx->u.cluster_info.queue, node);
+ /* If a select pipe was set up, wake it up */
+ if (ctx->u.cluster_info.select_pipe[1] >= 0) {
+ //printf("QUEUE_FOR_CONTEXT write\n");
+ if (write(ctx->u.cluster_info.select_pipe[1], "", 1) < 0)
+ perror("queue_for_context write");
+ }
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ pthread_cond_signal(&ctx->u.cluster_info.cond);
+}
+
+
+/**
+ Called by cman_dispatch to deal with messages coming across the
+ cluster socket. This function deals with fanning out the requests
+ and putting them on the per-context queues. We don't have
+ the benefits of pre-configured buffers, so we need this.
+ */
+static void
+process_cman_msg(cman_handle_t h, void *priv, char *buf, int len,
+ uint8_t port, int nodeid)
+{
+ cluster_msg_hdr_t *m = (cluster_msg_hdr_t *)buf;
+ int x;
+
+ if (len < sizeof(*m)) {
+ printf("Message too short.\n");
+ return;
+ }
+
+ swab_cluster_msg_hdr_t(m);
+
+#ifdef DEBUG
+ printf("Processing ");
+ switch(m->msg_control) {
+ case M_NONE:
+ printf("M_NONE\n");
+ break;
+ case M_OPEN:
+ printf("M_OPEN\n");
+ break;
+ case M_OPEN_ACK:
+ printf("M_OPEN_ACK\n");
+ break;
+ case M_DATA:
+ printf("M_DATA\n");
+ break;
+ case M_CLOSE:
+ printf("M_CLOSE\n");
+ break;
+ }
+
+ printf(" Node ID: %d %d\n", m->src_nodeid, nodeid);
+ printf(" Remote CTX: %d Local CTX: %d\n", m->src_ctx, m->dest_ctx);
+#endif
+
+ if (m->dest_ctx >= MAX_CONTEXTS || m->dest_ctx < 0) {
+ printf("Context invalid; ignoring\n");
+ return;
+ }
+
+ pthread_mutex_lock(&context_lock);
+
+ if (m->dest_ctx == 0 && m->msg_control == M_DATA) {
+ /* Copy & place on all broadcast queues if it's a broadcast
+ M_DATA message... */
+ for (x = 0; x < MAX_CONTEXTS; x++) {
+ if (!contexts[x])
+ continue;
+ if (contexts[x]->type != MSG_CLUSTER)
+ continue;
+ if (!(contexts[x]->flags & SKF_MCAST))
+ continue;
+ if (!(contexts[x]->flags & SKF_READ))
+ continue;
+
+ queue_for_context(contexts[x], buf, len);
+ }
+ } else if (contexts[m->dest_ctx]) {
+ /* Normal receive */
+ queue_for_context(contexts[m->dest_ctx], buf, len);
+ }
+ /* If none of the above, then we msg for something we've already
+ detached from our list. No big deal, just ignore. */
+
+ pthread_mutex_unlock(&context_lock);
+ return;
+}
+
+
+/**
+  Accept a new pseudo-private connection coming in over the
+  cluster socket.
+
+  Scans the listening context's queue for an M_OPEN control message,
+  removes it, binds acceptctx to the sender (node ID, remote context,
+  port), registers acceptctx with assign_ctx() and replies M_OPEN_ACK.
+
+  @param listenctx	Listening cluster context (local_ctx 0, SKF_LISTEN).
+  @param acceptctx	Context to set up for the new connection.
+  @return		0 on success; -1 with errno EINVAL (bad arguments)
+			or EAGAIN (no pending M_OPEN in the queue).
+ */
+static int
+cluster_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
+{
+	cluster_msg_hdr_t *m;
+	msg_q_t *n;
+	char done = 0;
+	char foo;
+
+	errno = EINVAL;
+	if (!listenctx || !acceptctx)
+		return -1;
+	if (listenctx->u.cluster_info.local_ctx != 0)
+		return -1;
+	if (!(listenctx->flags & SKF_LISTEN))
+		return -1;
+
+	/* For cluster contexts mo_init is cluster_msg_init(), which already
+	   initializes acceptctx's mutex and condvar; initializing them a
+	   second time is undefined behavior, so it is not repeated below. */
+	listenctx->ops->mo_init(acceptctx);
+
+	pthread_mutex_lock(&listenctx->u.cluster_info.mutex);
+
+	n = listenctx->u.cluster_info.queue;
+	if (n == NULL) {
+		pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
+		errno = EAGAIN;
+		return -1;
+	}
+
+	/* the OPEN should be the first thing on the list; this loop
+	   is probably not necessary */
+	list_do(&listenctx->u.cluster_info.queue, n) {
+
+		m = n->message;
+		switch(m->msg_control) {
+		case M_OPEN:
+			list_remove(&listenctx->u.cluster_info.queue, n);
+			/*printf("Accepting connection from %d %d\n",
+			  m->src_nodeid, m->src_ctx);*/
+
+			/* New connection: bind acceptctx to the sender */
+			acceptctx->u.cluster_info.queue = NULL;
+			acceptctx->u.cluster_info.remote_ctx = m->src_ctx;
+			acceptctx->u.cluster_info.nodeid = m->src_nodeid;
+			acceptctx->u.cluster_info.port = m->msg_port;
+			acceptctx->flags = (SKF_READ | SKF_WRITE);
+
+			if (assign_ctx(acceptctx) < 0) {
+				/* NOTE(review): the open is still ACKed
+				   even though no local context was
+				   assigned. */
+				printf("FAILED TO ASSIGN CONTEXT\n");
+			}
+			cluster_send_control_msg(acceptctx, M_OPEN_ACK);
+
+			/* Drain one wakeup byte from the select pipe; queue
+			   producers (e.g. process_cman_event) write one byte
+			   per queued message. */
+			if (listenctx->u.cluster_info.select_pipe[0] >= 0) {
+				read(listenctx->u.cluster_info.select_pipe[0],
+				     &foo, 1);
+			}
+
+			done = 1;
+			free(m);
+			free(n);
+
+			break;
+		case M_DATA:
+			/* Data messages (i.e. from broadcast msgs) are
+			   okay too!... but we don't handle them here */
+			break;
+		default:
+			/* Other message?! */
+			printf("Odd... %d\n", m->msg_control);
+			break;
+		}
+
+		if (done)
+			break;
+
+	} while (!list_done(&listenctx->u.cluster_info.queue, n));
+
+	pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
+
+	if (!done) {
+		/* Only non-OPEN messages were queued: nothing to accept,
+		   and acceptctx is not a connected context. */
+		errno = EAGAIN;
+		return -1;
+	}
+
+	return 0;
+}
+
+
+/**
+  Body of the detached receive thread started by cluster_msg_listen().
+
+  Repeatedly calls poll_cluster_messages(2), which waits for and
+  dispatches CMAN traffic, until thread_running is cleared.  Per the
+  original design note, SIGUSR2 makes the underlying select() abort so
+  the flag is re-checked promptly.  (The design initially had no
+  permanent threads, but that model proved difficult to get right.)
+
+  @param arg	Unused.
+  @return	Always NULL.
+ */
+static void *
+cluster_comms_thread(void *arg)
+{
+	(void)arg;
+
+	for (;;) {
+		if (!thread_running)
+			break;
+		poll_cluster_messages(2);
+	}
+
+	return NULL;
+}
+
+
+/*
+  Transliterates a CMAN event to a control message.
+
+  Registered through cman_start_notification() in cluster_msg_listen().
+  Builds a queue node holding a cluster_msg_hdr_t whose msg_control is
+  derived from 'reason', followed by the int 'arg'. The node is appended
+  to the listen context contexts[0]. The function then writes one byte
+  to that context's select pipe, if any, and signals its condvar.
+  Reasons not handled by the switch are queued with msg_control 0.
+ */
+static void
+process_cman_event(cman_handle_t handle, void *private, int reason, int arg)
+{
+	/* header + int (for arg); allocation keeps the original slack */
+	const size_t alloc_len = sizeof(cluster_msg_hdr_t) + sizeof(int) * 2;
+	cluster_msg_hdr_t *msg;
+	msg_q_t *node;
+	msgctx_t *ctx;
+
+#if 0
+	printf("EVENT: %p %p %d %d\n", handle, private, reason, arg);
+#endif
+
+	/* Allocate queue node; retry rather than drop the event */
+	while ((node = malloc(sizeof(*node))) == NULL) {
+		sleep(1);
+	}
+	memset(node, 0, sizeof(*node));
+
+	while ((msg = malloc(alloc_len)) == NULL) {
+		sleep(1);
+	}
+	memset(msg, 0, alloc_len);
+
+	switch(reason) {
+#if defined(LIBCMAN_VERSION) && LIBCMAN_VERSION >= 2
+	case CMAN_REASON_PORTOPENED:
+		msg->msg_control = M_PORTOPENED;
+		break;
+	case CMAN_REASON_TRY_SHUTDOWN:
+		msg->msg_control = M_TRY_SHUTDOWN;
+		break;
+#endif
+	case CMAN_REASON_PORTCLOSED:
+		msg->msg_control = M_PORTCLOSED;
+		break;
+	case CMAN_REASON_STATECHANGE:
+		msg->msg_control = M_STATECHANGE;
+		break;
+	default:
+		/* Unhandled reason: msg_control stays 0 (from memset) */
+		break;
+	}
+
+	/* Store arg right after the header.  char * arithmetic avoids the
+	   GNU-only void * arithmetic, and memcpy avoids assuming that
+	   offset is suitably aligned for an int store. */
+	memcpy((char *)msg + sizeof(cluster_msg_hdr_t), &arg, sizeof(arg));
+
+	node->len = sizeof(cluster_msg_hdr_t) + sizeof(int);
+	node->message = msg;
+
+	pthread_mutex_lock(&context_lock);
+	ctx = contexts[0]; /* This is the cluster (listen) context... */
+	if (!ctx) {
+		/* No listen context registered (not set up yet, or
+		   already torn down): nobody to deliver to; drop it. */
+		free(node->message);
+		free(node);
+		pthread_mutex_unlock(&context_lock);
+		return;
+	}
+	pthread_mutex_unlock(&context_lock);
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	list_insert(&ctx->u.cluster_info.queue, node);
+	/* If a select pipe was set up, wake it up */
+	if (ctx->u.cluster_info.select_pipe[1] >= 0) {
+		//printf("PROCESS_CMAN_EVENT write\n");
+		if (write(ctx->u.cluster_info.select_pipe[1], "", 1) < 0)
+			perror("process_cman_event write");
+	}
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+	pthread_cond_signal(&ctx->u.cluster_info.cond);
+}
+
+
+/**
+  Set up the cluster-wide listening context on a CMAN port.
+
+  Registers process_cman_msg / process_cman_event with CMAN, installs
+  the new broadcast (nodeid 0) listen context as contexts[0] and starts
+  the detached cluster_comms_thread.
+
+  @param me		Local node ID (stored in _me).
+  @param portp		Pointer to an int CMAN port in [10, 254].
+  @param cluster_ctx	Out: the listen context; written only on success.
+  @return		0 on success, -1 with errno set on failure.
+ */
+int
+cluster_msg_listen(int me, void *portp, msgctx_t **cluster_ctx)
+{
+	int e;
+	pthread_attr_t attrs;
+	cman_handle_t *ch = NULL;
+	msgctx_t *ctx;
+	int port;
+
+	errno = EINVAL;
+	if (!portp || !cluster_ctx)
+		return -1;
+	port = *(int *)portp;
+	if (port < 10 || port > 254)
+		return -1;
+
+	ch = cman_lock(1, 0);
+	if (!ch)
+		return -1;
+	_me = me;
+
+	/* Set up cluster context */
+	ctx = msg_new_ctx();
+	if (!ctx) {
+		cman_unlock(ch);
+		errno = EINVAL;
+		return -1;
+	}
+
+	memset(contexts, 0, sizeof(contexts));
+
+	if (cman_start_recv_data(ch, process_cman_msg,
+				 port) != 0) {
+		e = errno;
+		cman_unlock(ch);
+		msg_free_ctx(ctx);
+		errno = e;
+		return -1;
+	}
+
+	if (cman_start_notification(ch, process_cman_event) != 0) {
+		e = errno;
+		/* Undo the data registration so process_cman_msg is not
+		   left installed without a listen context, and stop here:
+		   ctx is freed and the CMAN lock is released. */
+		cman_end_recv_data(ch);
+		cman_unlock(ch);
+		msg_free_ctx(ctx);
+		errno = e;
+		return -1;
+	}
+
+	cman_unlock(ch);
+	/* Done with CMAN bits */
+
+	pthread_mutex_lock(&context_lock);
+
+	memset(contexts, 0, sizeof(contexts));
+	contexts[0] = ctx;
+
+	ctx->type = MSG_CLUSTER;
+	ctx->ops = &cluster_msg_ops;
+	ctx->u.cluster_info.local_ctx = 0;
+	ctx->u.cluster_info.remote_ctx = 0;
+	ctx->u.cluster_info.port = port; /* port! */
+	ctx->u.cluster_info.nodeid = 0; /* Broadcast! */
+	ctx->u.cluster_info.select_pipe[0] = -1;
+	ctx->u.cluster_info.select_pipe[1] = -1;
+	ctx->u.cluster_info.queue = NULL;
+	pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
+	pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
+	ctx->flags = SKF_LISTEN | SKF_READ | SKF_WRITE | SKF_MCAST;
+	pthread_mutex_unlock(&context_lock);
+
+	pthread_attr_init(&attrs);
+	pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED);
+	pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+
+	thread_running = 1;
+	/* NOTE(review): pthread_create failure is not handled; without
+	   the comms thread no messages would be dispatched. */
+	pthread_create(&comms_thread, &attrs, cluster_comms_thread, NULL);
+
+	pthread_attr_destroy(&attrs);
+
+	/* Publish only once everything succeeded */
+	*cluster_ctx = ctx;
+	return 0;
+}
+
+
+/* Dump a cluster message context (flags, node ID, local and remote
+   context IDs) to stdout.  Does nothing when ctx is NULL. */
+static void
+cluster_msg_print(msgctx_t *ctx)
+{
+	if (ctx != NULL) {
+		printf("Cluster Message Context %p\n", (void *)ctx);
+		printf(" Flags %08x\n", ctx->flags);
+		printf(" Node ID %d\n", ctx->u.cluster_info.nodeid);
+		printf(" Local CTX %d\n", ctx->u.cluster_info.local_ctx);
+		printf(" Remote CTX %d\n", ctx->u.cluster_info.remote_ctx);
+	}
+}
+
+
+/**
+  Stop cluster message reception and the comms thread.
+
+  Takes the CMAN lock (passing SIGUSR2 to cman_lock, presumably to
+  interrupt the comms thread's wait - TODO confirm), stops data
+  delivery, sends SIGTERM to the comms thread and clears thread_running
+  so cluster_comms_thread() leaves its loop.
+
+  NOTE(review): a SIGTERM aimed at one thread still runs the
+  process-wide SIGTERM disposition; unless a handler is installed (as
+  msgtest does), the whole process terminates.
+
+  @return 0 always.
+ */
+int
+cluster_msg_shutdown(void)
+{
+	cman_handle_t *ch;
+
+	ch = cman_lock(1, SIGUSR2);
+	cman_end_recv_data(ch);
+	pthread_kill(comms_thread, SIGTERM);
+	/* Cleared only after pthread_kill: the detached thread cannot exit
+	   (and have its ID recycled) before it observes this flag. */
+	thread_running = 0;
+	cman_unlock(ch);
+
+	return 0;
+}
+
+
+/**
+  Reset ctx to an unconnected cluster context.
+
+  Zeroes ctx, attaches the cluster ops table, initializes its mutex and
+  condition variable, and marks both select-pipe ends unused (-1).
+  errno is left as EINVAL even on success.
+
+  @return 0 on success, -1 (errno EINVAL) when ctx is NULL.
+ */
+int
+cluster_msg_init(msgctx_t *ctx)
+{
+	errno = EINVAL;
+	if (ctx == NULL)
+		return -1;
+
+	memset(ctx, 0, sizeof(*ctx));
+	ctx->ops = &cluster_msg_ops;
+	ctx->type = MSG_CLUSTER;
+	ctx->u.cluster_info.select_pipe[0] =
+		ctx->u.cluster_info.select_pipe[1] = -1;
+	pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
+	pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
+
+	return 0;
+}
+
+
+/* Operations vector for MSG_CLUSTER contexts */
+static msg_ops_t cluster_msg_ops = {
+	/* lifecycle */
+	.mo_init	= cluster_msg_init,
+	.mo_listen	= cluster_msg_listen,
+	.mo_accept	= cluster_msg_accept,
+	.mo_open	= cluster_msg_open,
+	.mo_close	= cluster_msg_close,
+	.mo_shutdown	= cluster_msg_shutdown,
+	/* data transfer */
+	.mo_send	= cluster_msg_send,
+	.mo_receive	= cluster_msg_receive,
+	.mo_wait	= cluster_msg_wait,
+	/* select() integration */
+	.mo_fd_set	= cluster_msg_fd_set,
+	.mo_fd_isset	= cluster_msg_fd_isset,
+	.mo_fd_clr	= cluster_msg_fd_clr,
+	/* debugging */
+	.mo_print	= cluster_msg_print
+};
/cvs/cluster/cluster/rgmanager/src/clulib/msg_socket.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/msg_socket.c
+++ - 2006-07-11 23:52:43.913279000 +0000
@@ -0,0 +1,432 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
+#define _MESSAGE_BUILD
+#include <message.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/un.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <fdops.h>
+
+/* Ripped from ccsd's setup_local_socket */
+/* Path of rgmanager's local (PF_LOCAL) socket; sock_connect() always
+   connects here. */
+#define RGMGR_SOCK "/var/run/cluster/rgmanager.sk"
+
+/* Forward declaration: the ops table is defined at the end of this file
+   but is referenced earlier by sock_msg_listen() and sock_msg_init(). */
+static msg_ops_t sock_msg_ops;
+
+/**
+  Connect a new stream socket to rgmanager's local socket (RGMGR_SOCK).
+
+  @return	Connected socket fd, or -1 with errno from the failing
+		socket()/connect() call.
+ */
+static int
+sock_connect(void)
+{
+	struct sockaddr_un addr;
+	int fd, saved_errno;
+
+	memset(&addr, 0, sizeof(addr));
+	addr.sun_family = PF_LOCAL;
+	snprintf(addr.sun_path, sizeof(addr.sun_path), RGMGR_SOCK);
+
+	fd = socket(PF_LOCAL, SOCK_STREAM, 0);
+	if (fd < 0)
+		return -1;
+
+	if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
+		saved_errno = errno;	/* close() may clobber errno */
+		close(fd);
+		errno = saved_errno;
+		return -1;
+	}
+
+	return fd;
+}
+
+
+/**
+  Send one framed message over a local socket context.
+
+  The payload is prefixed with a local_msg_hdr_t (M_DATA, length) in a
+  4096-byte staging buffer and the frame is written with _write_retry().
+
+  @return	Payload bytes written, or -1 with errno set: EINVAL (bad
+		ctx/args or not writable), E2BIG (frame does not fit the
+		staging buffer), EAGAIN (header not fully written), or the
+		error left by _write_retry() (presumably from write(2)).
+ */
+static int
+sock_msg_send(msgctx_t *ctx, void *msg, size_t len)
+{
+	char buf[4096];
+	int ret;
+	local_msg_hdr_t *h = (local_msg_hdr_t *)buf;
+	char *msgptr = (buf + sizeof(*h));
+
+	errno = EINVAL;
+	if (!ctx)
+		return -1;
+	if (!(ctx->flags & SKF_WRITE))
+		return -1;
+	if (!msg && len)
+		return -1;
+
+	/* Compare against the remaining room instead of computing
+	   len + sizeof(*h), which can wrap for a huge len. */
+	if (len > sizeof(buf) - sizeof(*h)) {
+		errno = E2BIG;
+		return -1;
+	}
+
+	h->msg_control = M_DATA;
+	h->msg_len = len;
+	if (len)
+		memcpy(msgptr, msg, len);
+
+	ret = _write_retry(ctx->u.local_info.sockfd, buf,
+			   len + sizeof(*h), NULL);
+	if (ret < 0)
+		return -1;
+
+	/* The cast matters: compared with an unsigned sizeof, a negative
+	   ret would otherwise be treated as a huge value. */
+	if (ret >= (int)sizeof(*h))
+		return (ret - (int)sizeof(*h));
+
+	errno = EAGAIN;
+	return -1;
+}
+
+
+/**
+  Peek (without consuming) at the next local_msg_hdr_t on fd.
+
+  @return	The header's msg_control when a whole header is available;
+		M_CLOSE when the peer closed the socket or only part of a
+		header could be peeked; -1 on a recv() error other than EINTR.
+ */
+static int
+peekypeeky(int fd)
+{
+	local_msg_hdr_t hdr;
+	int n;
+
+	for (;;) {
+		n = recv(fd, (void *)&hdr, sizeof(hdr), MSG_PEEK);
+		if (n >= 0)
+			break;
+		if (errno != EINTR)
+			return -1;
+	}
+
+	if (n == (int)sizeof(hdr))
+		return hdr.msg_control;
+
+	if (n == 0)
+		return M_CLOSE;		/* Socket closed? */
+
+	/* XXX short peek is treated as a protocol error.  NOTE(review): on a
+	   stream socket the rest of the header may simply not have arrived. */
+	printf("PROTOCOL ERROR: Invalid message\n");
+	return M_CLOSE;
+}
+
+
+/**
+  Wait until the context's socket is readable, then report the type of
+  the pending message.
+
+  @param timeout	Seconds to wait; a negative value waits forever.
+  @return		peekypeeky()'s result when readable, else M_NONE.
+ */
+static int
+sock_msg_wait(msgctx_t *ctx, int timeout)
+{
+	int fd = ctx->u.local_info.sockfd;
+	struct timeval tv = {0, 0};
+	struct timeval *tvp = (timeout >= 0) ? &tv : NULL;
+	fd_set readable;
+
+	if (tvp != NULL)
+		tv.tv_sec = timeout;
+
+	FD_ZERO(&readable);
+	FD_SET(fd, &readable);
+
+	if (_select_retry(fd + 1, &readable, NULL, NULL, tvp) != 1)
+		return M_NONE;
+
+	return peekypeeky(fd);
+}
+
+
+/**
+  Add the context's socket to 'fds' and raise *max to it if larger.
+
+  @return	0 when the socket was added; -1 (errno EINVAL) for a NULL
+		or non-socket context, NULL fds/max, or a closed socket.
+ */
+static int
+sock_msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max)
+{
+	errno = EINVAL;
+	/* NULL checks match sock_msg_fd_isset()/sock_msg_fd_clr() */
+	if (!ctx || !fds || !max)
+		return -1;
+	if (ctx->type != MSG_SOCKET)
+		return -1;
+
+	if (ctx->u.local_info.sockfd >= 0) {
+		FD_SET(ctx->u.local_info.sockfd, fds);
+		if (ctx->u.local_info.sockfd > *max)
+			*max = ctx->u.local_info.sockfd;
+		return 0;
+	}
+
+	return -1;
+}
+
+
+/**
+  Test whether the context's socket is marked in 'fds'.
+
+  @return	1 if set, 0 if not set or the socket is closed, -1 (errno
+		EINVAL) for bad arguments.  errno is EINVAL on every path.
+ */
+static int
+sock_msg_fd_isset(msgctx_t *ctx, fd_set *fds)
+{
+	int fd;
+
+	errno = EINVAL;
+	if (fds == NULL || ctx == NULL || ctx->type != MSG_SOCKET)
+		return -1;
+
+	fd = ctx->u.local_info.sockfd;
+	return (fd >= 0 && FD_ISSET(fd, fds)) ? 1 : 0;
+}
+
+
+/**
+  Remove the context's socket from 'fds'.
+
+  @return	1 if cleared, 0 if the socket is closed, -1 (errno EINVAL)
+		for bad arguments.  errno is EINVAL on every path.
+ */
+int
+sock_msg_fd_clr(msgctx_t *ctx, fd_set *fds)
+{
+	int fd;
+
+	errno = EINVAL;
+	if (fds == NULL || ctx == NULL || ctx->type != MSG_SOCKET)
+		return -1;
+
+	fd = ctx->u.local_info.sockfd;
+	if (fd < 0)
+		return 0;
+
+	FD_CLR(fd, fds);
+	return 1;
+}
+
+
+/**
+  Read one framed message (local_msg_hdr_t + payload) from the socket.
+
+  @param maxlen		Capacity of 'msg'.  A longer payload is truncated to
+			maxlen bytes and the remainder is read and discarded,
+			so the next read starts at a frame header.
+  @param timeout	Seconds per read; <= 0 passes no timeout to
+			_read_retry() (presumably blocking).  Note this differs
+			from sock_msg_wait(), where 0 means "poll".
+  @return		Payload bytes stored in 'msg'; 0 when the header read
+			returned 0 bytes (presumably EOF); -1 on error (errno
+			EPROTO for a partially read header).
+ */
+static int
+_local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+	struct timeval tv = {0, 0};
+	struct timeval *p = NULL;
+	local_msg_hdr_t h;
+	char scratch[256];
+	size_t want, excess = 0, chunk;
+	int ret, drained;
+
+	if (timeout > 0) {
+		tv.tv_sec = timeout;
+		p = &tv;
+	}
+
+	/* Never use the header unless all of it was read */
+	ret = _read_retry(ctx->u.local_info.sockfd, &h, sizeof(h), p);
+	if (ret <= 0)
+		return ret;
+	if (ret != (int)sizeof(h)) {
+		errno = EPROTO;
+		return -1;
+	}
+
+	want = h.msg_len;
+	if (maxlen < want) {
+		printf("WARNING: Buffer too small for message (%d vs %d)!\n",
+		       (int)want, (int)maxlen);
+		excess = want - maxlen;
+		want = maxlen;
+	}
+
+	ret = _read_retry(ctx->u.local_info.sockfd, msg, want, p);
+	if (ret < 0 || excess == 0)
+		return ret;
+
+	/* Discard the part of the payload that did not fit so framing
+	   stays in sync for the next message. */
+	while (excess > 0) {
+		chunk = (excess < sizeof(scratch)) ? excess : sizeof(scratch);
+		drained = _read_retry(ctx->u.local_info.sockfd, scratch,
+				      chunk, p);
+		if (drained <= 0)
+			break;	/* connection broken; report what we have */
+		excess -= (size_t)drained;
+	}
+
+	return ret;
+}
+
+
+/**
+  Receive one message from a local socket context into 'msg'.
+
+  The message is first read into a 4096-byte private buffer by
+  _local_msg_receive(), then copied out.
+
+  @param timeout	Seconds to wait (see _local_msg_receive()).
+  @return		Bytes copied into 'msg' (at most maxlen), or -1 with
+			errno set: EINVAL (bad arguments/context), ECONNRESET
+			(nothing received, presumably peer closed), or the
+			underlying read error.
+ */
+static int
+sock_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+	int req;
+	char priv_msg[4096];
+	size_t copy_len;
+
+	errno = EINVAL;
+	if (!ctx || !msg || !maxlen)
+		return -1;
+	if (ctx->type != MSG_SOCKET)
+		return -1;
+	if (!(ctx->flags & SKF_READ))
+		return -1;
+
+	req = _local_msg_receive(ctx, priv_msg, sizeof(priv_msg), timeout);
+
+	if (req == 0) {
+		errno = ECONNRESET;
+		return -1;
+	}
+
+	if (req < 0)
+		return -1;
+
+	/* Copy out only bytes actually received (never uninitialized stack
+	   contents), bounded by the caller's buffer, and report that count. */
+	copy_len = ((size_t)req < maxlen) ? (size_t)req : maxlen;
+	memcpy(msg, priv_msg, copy_len);
+	return (int)copy_len;
+}
+
+
+/**
+  Open a local socket connection for ctx.
+
+  Only the local node (nodeid == CMAN_NODEID_US) is supported; the
+  connection is made to RGMGR_SOCK via sock_connect().  'port' and
+  'timeout' are currently unused.  ctx->u.local_info.sockfd receives
+  sock_connect()'s result even when it fails.
+
+  @return	0 on success; -1 with errno EINVAL (bad context, type or
+		node ID) or the errno left by sock_connect().
+ */
+int
+sock_msg_open(int type, int nodeid, int port, msgctx_t *ctx, int timeout)
+{
+	int fd;
+
+	errno = EINVAL;
+	if (ctx == NULL || ctx->type != MSG_SOCKET || type != MSG_SOCKET ||
+	    nodeid != CMAN_NODEID_US)
+		return -1;
+
+	fd = sock_connect();
+	ctx->u.local_info.sockfd = fd;
+	if (fd < 0)
+		return -1;
+
+	ctx->flags = (SKF_READ | SKF_WRITE);
+	return 0;
+}
+
+
+/**
+  Close a local socket context and reset it to MSG_NONE.
+  With a socket, the O/S cleans up the buffers for us.
+
+  @return 0 on success, -1 (errno EINVAL) for a NULL or non-socket ctx.
+ */
+int
+sock_msg_close(msgctx_t *ctx)
+{
+	errno = EINVAL;
+	if (!ctx || ctx->type != MSG_SOCKET)
+		return -1;
+
+	/* Skip close(-1) on a context whose socket is already closed */
+	if (ctx->u.local_info.sockfd >= 0)
+		close(ctx->u.local_info.sockfd);
+	ctx->u.local_info.sockfd = -1;
+	ctx->flags = 0;
+	ctx->type = MSG_NONE;
+	return 0;
+}
+
+
+/**
+  Accept a pending connection on a listening local socket context.
+
+  acceptctx is (re)initialized through the listener's mo_init; on
+  success it becomes a read/write context for the accepted socket.
+
+  @return	0 on success; -1 with errno EINVAL (bad contexts) or the
+		errno from accept(2).
+ */
+static int
+sock_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
+{
+	int fd;
+
+	errno = EINVAL;
+	if (!listenctx || !acceptctx ||
+	    listenctx->u.local_info.sockfd < 0 ||
+	    !(listenctx->flags & SKF_LISTEN))
+		return -1;
+
+	listenctx->ops->mo_init(acceptctx);
+
+	fd = accept(listenctx->u.local_info.sockfd, NULL, NULL);
+	acceptctx->u.local_info.sockfd = fd;
+	if (fd < 0)
+		return -1;
+
+	acceptctx->flags = (SKF_READ | SKF_WRITE);
+	return 0;
+}
+
+
+/**
+  Create a listening local (PF_LOCAL stream) socket context.
+
+  @param me		Unused.
+  @param portp		Filesystem path of the socket (char *).  Any stale
+			file at that path is unlinked before binding.
+  @param listen_ctx	Out: the new listen context; set on success only.
+  @return		0 on success; -1 with errno EINVAL (bad arguments),
+			ENAMETOOLONG (path does not fit in sun_path), or the
+			errno of the failing socket/bind/listen call.
+ */
+int
+sock_msg_listen(int me, void *portp, msgctx_t **listen_ctx)
+{
+	int sock = -1, e;
+	struct sockaddr_un su;
+	mode_t om;
+	msgctx_t *ctx = NULL;
+	char *path = (char *)portp;
+
+	(void)me;
+
+	errno = EINVAL;
+	if (!path || !listen_ctx)
+		return -1;
+
+	memset(&su, 0, sizeof(su));
+	su.sun_family = PF_LOCAL;
+	/* path is data, never a format string; refuse silent truncation */
+	if (snprintf(su.sun_path, sizeof(su.sun_path), "%s", path) >=
+	    (int)sizeof(su.sun_path)) {
+		errno = ENAMETOOLONG;
+		return -1;
+	}
+
+	/* Set up cluster context */
+	ctx = msg_new_ctx();
+	if (!ctx)
+		return -1;
+
+	sock = socket(PF_LOCAL, SOCK_STREAM, 0);
+	if (sock < 0)
+		goto fail;
+
+	/* Remove a stale socket at the path we are about to bind */
+	unlink(path);
+	om = umask(077);
+	if (bind(sock, (struct sockaddr *)&su, sizeof(su)) < 0) {
+		umask(om);
+		goto fail;
+	}
+	umask(om);
+
+	if (listen(sock, SOMAXCONN) < 0)
+		goto fail;
+
+	ctx->type = MSG_SOCKET;
+	ctx->u.local_info.sockfd = sock;
+	ctx->flags = SKF_LISTEN;
+	ctx->ops = &sock_msg_ops;
+	*listen_ctx = ctx;
+	return 0;
+
+fail:
+	e = errno;
+	free(ctx);
+	if (sock >= 0)
+		close(sock);
+	errno = e;
+	return -1;
+}
+
+
+/**
+  Reset ctx to an unconnected local-socket context: zeroed, socket ops
+  attached, type MSG_SOCKET and no descriptor (-1).  ctx must not be
+  NULL.
+  XXX INCOMPLETE - no local_ctx setup.
+
+  @return 0.
+ */
+int
+sock_msg_init(msgctx_t *ctx)
+{
+	memset(ctx, 0, sizeof(*ctx));
+	ctx->ops = &sock_msg_ops;
+	ctx->type = MSG_SOCKET;
+	ctx->u.local_info.sockfd = -1;
+	return 0;
+}
+
+
+/* Print the socket descriptor of a local-socket context.  Does nothing
+   for a NULL ctx, matching the cluster transport's print routine. */
+void
+sock_msg_print(msgctx_t *ctx)
+{
+	if (!ctx)
+		return;
+
+	printf("Socket Message Context; fd = %d\n", ctx->u.local_info.sockfd);
+}
+
+
+/* XXX INCOMPLETE */
+/*
+  Transport shutdown hook for local sockets.  Currently a no-op.
+
+  @return 0 always.
+ */
+int
+sock_msg_shutdown(void)
+{
+ return 0;
+}
+
+
+/* Operations vector for MSG_SOCKET (local PF_LOCAL) contexts */
+static msg_ops_t sock_msg_ops = {
+	/* lifecycle */
+	.mo_init	= sock_msg_init,
+	.mo_listen	= sock_msg_listen,
+	.mo_accept	= sock_msg_accept,
+	.mo_open	= sock_msg_open,
+	.mo_close	= sock_msg_close,
+	.mo_shutdown	= sock_msg_shutdown,
+	/* data transfer */
+	.mo_send	= sock_msg_send,
+	.mo_receive	= sock_msg_receive,
+	.mo_wait	= sock_msg_wait,
+	/* select() integration */
+	.mo_fd_set	= sock_msg_fd_set,
+	.mo_fd_isset	= sock_msg_fd_isset,
+	.mo_fd_clr	= sock_msg_fd_clr,
+	/* debugging */
+	.mo_print	= sock_msg_print
+};
/cvs/cluster/cluster/rgmanager/src/clulib/msgtest.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/msgtest.c
+++ - 2006-07-11 23:52:43.995452000 +0000
@@ -0,0 +1,286 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
+#include <message.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/select.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <cman-private.h>
+
+/* CMAN port used by every context in this test program */
+#define MYPORT 190
+
+/* Local node ID; main() fills it in from cman_get_node(CMAN_NODEID_US) */
+int my_node_id = 0;
+/* Cleared to make the main loop and test threads stop.
+   NOTE(review): it is written by a signal handler and read by several
+   threads; volatile sig_atomic_t or an atomic would be the portable type. */
+int running = 1;
+
+
+/* Signal handler that asks every loop in this program to stop.
+   NOTE(review): main() installs sigusr2_handler, not this function. */
+void
+sighandler(int sig)
+{
+	(void)sig;
+	running = 0;
+}
+
+
+/**
+  Test thread: open a broadcast (nodeid 0) cluster context on MYPORT and
+  print every message received on it until 'running' is cleared.
+ */
+void *
+piggyback(void *arg)
+{
+	msgctx_t ctx;
+	char buf[4096];
+	int len;
+
+	if (msg_open(MSG_CLUSTER, 0, MYPORT, &ctx, 0) != 0) {
+		printf("Could not set up mcast socket!\n");
+		return NULL;
+	}
+
+	printf("PIGGYBACK CONTEXT\n");
+	msg_print(&ctx);
+	printf("END PIGGYBACK CONTEXT\n");
+
+	while (running) {
+		/* Reserve room for a terminator: the payload is printed
+		   with %s and need not be NUL-terminated. */
+		len = msg_receive(&ctx, buf, sizeof(buf) - 1, 2);
+		if (len > 0) {
+			if ((size_t)len >= sizeof(buf))
+				len = sizeof(buf) - 1;
+			buf[len] = '\0';
+			printf("Piggyback received: %s\n", buf);
+		}
+	}
+
+	msg_close(&ctx);
+
+	printf("PIGGY flies...\n");
+
+	return NULL;
+}
+
+
+/**
+  Test thread: every 3 seconds, exchange a message with this node over a
+  pseudo-private channel, then send and try to receive one broadcast
+  message on MYPORT, until 'running' is cleared.
+ */
+void *
+private(void *arg)
+{
+	msgctx_t ctx;
+	char buf[4096];
+	int len;
+
+	while (running) {
+		sleep(3);
+
+		/* use pseudoprivate channel */
+		if (msg_open(MSG_CLUSTER, my_node_id, MYPORT, &ctx, 1) != 0) {
+			printf("Could not set up virtual-socket!\n");
+			return NULL;
+		}
+
+		printf("=== pvt thread channel info ===\n");
+		msg_print(&ctx);
+		printf("=== end pvt thread channel info ===\n");
+		fflush(stdout);
+
+		snprintf(buf, sizeof(buf), "Hello!\n");
+		msg_send(&ctx, buf, strlen(buf)+1);
+
+		/* Keep a byte for the terminator before printing with %s */
+		len = msg_receive(&ctx, buf, sizeof(buf) - 1, 10);
+		if (len > 0) {
+			if ((size_t)len >= sizeof(buf))
+				len = sizeof(buf) - 1;
+			buf[len] = '\0';
+			printf("PRIVATE: Received %s\n", buf);
+			fflush(stdout);
+		}
+
+		msg_close(&ctx);
+
+		if (msg_open(MSG_CLUSTER, 0, MYPORT, &ctx, 1) != 0) {
+			printf("Could not set up mcast socket!\n");
+			return NULL;
+		}
+
+		snprintf(buf, sizeof(buf), "Babble, babble\n");
+		msg_send(&ctx, buf, strlen(buf)+1);
+		len = msg_receive(&ctx, buf, sizeof(buf) - 1, 1);
+		if (len > 0) {
+			if ((size_t)len >= sizeof(buf))
+				len = sizeof(buf) - 1;
+			buf[len] = '\0';
+			printf("PRIVATE: Via MCAST %s\n", buf);
+			fflush(stdout);
+		}
+		msg_close(&ctx);
+	}
+
+	printf("Private thread is outta here...\n");
+
+	return NULL;
+}
+
+
+/**
+  Store a CMAN handle in *ch, retrying once per second until cman_init()
+  succeeds, then block until cman_is_quorate() reports quorum.
+  Exits the process with status 1 if ch is NULL.
+ */
+void
+clu_initialize(cman_handle_t **ch)
+{
+	if (ch == NULL)
+		exit(1);
+
+	if ((*ch = cman_init(NULL)) == NULL) {
+		printf("Waiting for CMAN to start\n");
+		while ((*ch = cman_init(NULL)) == NULL)
+			sleep(1);
+	}
+
+	if (cman_is_quorate(*ch))
+		return;
+
+	/*
+	  There are two ways to do this; this happens to be the simpler
+	  of the two. The other method is to join with a NULL group
+	  and log in -- this will cause the plugin to not select any
+	  node group (if any exist).
+	*/
+	printf("Waiting for quorum to form\n");
+	while (cman_is_quorate(*ch) == 0)
+		sleep(1);
+	printf("Quorum formed, starting\n");
+}
+
+
+/**
+  Accept a pending pseudo-private connection on ctx, print it, echo a
+  "Goodbye!" reply to the first message received (10s timeout), then
+  close the connection.
+
+  @return 0 when a connection was accepted, -1 if msg_accept() failed.
+ */
+int
+side_message(msgctx_t *ctx)
+{
+	msgctx_t actx;
+	char buf[1024];
+	int len;
+
+	if (msg_accept(ctx, &actx) < 0)
+		return -1;
+
+	printf("=== MAIN: Handling side message ===\n");
+	msg_print(&actx);
+	fflush(stdout);
+
+	/* Keep a byte for the terminator before printing with %s */
+	len = msg_receive(&actx, buf, sizeof(buf) - 1, 10);
+	if (len > 0) {
+		if ((size_t)len >= sizeof(buf))
+			len = sizeof(buf) - 1;
+		buf[len] = '\0';
+		printf("MAIN: Received %s\n", buf);
+		snprintf(buf, sizeof(buf), "Goodbye!\n");
+		msg_send(&actx, buf, strlen(buf)+1);
+	}
+
+	msg_close(&actx);
+
+	printf("=== MAIN: end side message ===\n");
+
+	return 0;
+}
+
+
+/* From liblalloc (linked via -llalloc); presumably dumps the allocator's
+   table of live allocations - confirm in alloc.c. */
+void
+malloc_dump_table(int, int);
+
+
+/* Intentionally empty.  Installing it for SIGUSR2/SIGTERM replaces their
+   default (terminating) disposition, so signals used to interrupt
+   blocking calls do not kill the program. */
+void
+sigusr2_handler(int sig)
+{
+	(void)sig;
+}
+
+/**
+  Message-layer test driver.
+
+  Joins CMAN and listens on MYPORT. It then starts the piggyback and
+  private test threads and multiplexes between two sources: stdin lines,
+  which are broadcast (a line starting with 'q' or 'Q' quits), and the
+  cluster context, where it prints data, handles private opens and logs
+  events. Finally it shuts everything down and dumps the allocator table.
+ */
+int
+main(int argc, char **argv)
+{
+	msgctx_t *cluster_ctx;
+	char recvbuf[128];
+	cman_node_t me;
+	int ret, len;
+	pthread_t piggy, priv;
+	fd_set rfds;
+	int max = 0;
+	int port = MYPORT;
+	cman_handle_t *clu = NULL;
+
+
+	clu_initialize(&clu);
+
+	if (cman_init_subsys(clu) < 0) {
+		perror("cman_init_subsys");
+		return -1;
+	}
+	memset(&me, 0, sizeof(me));
+	if (cman_get_node(clu, CMAN_NODEID_US, &me) < 0) {
+		perror("cman_get_node");
+		return -1;
+	}
+
+	my_node_id = me.cn_nodeid;
+
+	if (msg_listen(MSG_CLUSTER, (void *)&port,
+		       me.cn_nodeid, &cluster_ctx) < 0) {
+		printf("Couldn't set up cluster message system\n");
+		return -1;
+	}
+
+	signal(SIGTERM, sigusr2_handler);
+	signal(SIGUSR2, sigusr2_handler);
+
+	pthread_create(&piggy, NULL, piggyback, NULL);
+	pthread_create(&priv, NULL, private, NULL);
+
+	msg_print(cluster_ctx);
+	while (running) {
+		max = 0;
+		FD_ZERO(&rfds);
+		FD_SET(STDIN_FILENO, &rfds);
+		msg_fd_set(cluster_ctx, &rfds, &max);
+
+		/* rfds is unspecified after a failed select() (e.g. EINTR
+		   from SIGUSR2/SIGTERM); rebuild it and try again. */
+		if (select(max+1, &rfds, NULL, NULL, NULL) < 0)
+			continue;
+
+		if (FD_ISSET(STDIN_FILENO, &rfds)) {
+			/* On EOF/error stop rather than send a stale or
+			   uninitialized buffer forever. */
+			if (fgets(recvbuf, sizeof(recvbuf), stdin) == NULL)
+				break;
+			if (recvbuf[0] == 'q' || recvbuf[0] == 'Q')
+				break;
+			if (msg_send(cluster_ctx, recvbuf,
+				     strlen(recvbuf)+1) < 0)
+				perror("msg_send");
+			FD_CLR(STDIN_FILENO, &rfds);
+		}
+
+		if (!msg_fd_isset(cluster_ctx, &rfds))
+			continue;
+
+		ret = msg_wait(cluster_ctx, 1);
+
+		switch(ret) {
+		case M_DATA:
+			/* Keep a byte for the terminator; print only what
+			   was actually received. */
+			len = msg_receive(cluster_ctx, recvbuf,
+					  sizeof(recvbuf) - 1, 10);
+			if (len > 0) {
+				if ((size_t)len >= sizeof(recvbuf))
+					len = sizeof(recvbuf) - 1;
+				recvbuf[len] = '\0';
+				printf("MAIN: received %s\n", recvbuf);
+			}
+			break;
+		case M_OPEN:
+			printf("MAIN: private connection detected\n");
+			side_message(cluster_ctx);
+			break;
+		case 0:
+			/* No data; probably a control msg */
+			break;
+		default:
+			printf("Cluster EV: %d\n", ret);
+			/* Cluster events, etc.: consume and discard */
+			msg_receive(cluster_ctx, recvbuf, sizeof(recvbuf), 0);
+			break;
+		}
+	}
+
+	printf("Shutting down...\n");
+
+	running = 0;
+
+	pthread_join(piggy, NULL);
+	pthread_join(priv, NULL);
+
+	msg_close(cluster_ctx);
+	msg_free_ctx(cluster_ctx);
+	msg_shutdown();
+
+	cman_finish(clu);
+
+	malloc_dump_table(0, 1024);
+
+	exit(0);
+}
--- cluster/rgmanager/src/clulib/Makefile 2006/06/02 17:37:10 1.8
+++ cluster/rgmanager/src/clulib/Makefile 2006/07/11 23:52:41 1.9
@@ -21,7 +21,7 @@
CFLAGS+= -g -Wstrict-prototypes -Wshadow -fPIC -D_GNU_SOURCE
-TARGETS=libclulib.a liblalloc.a
+TARGETS=libclulib.a liblalloc.a msgtest
all: ${TARGETS}
@@ -29,9 +29,12 @@
uninstall:
+msgtest: msgtest.o libclulib.a
+ gcc -o msgtest msgtest.o -lcman -L. -lclulib -llalloc -lpthread
+
libclulib.a: clulog.o daemon_init.o signals.o msgsimple.o \
- gettid.o rg_strings.o message.o members.o fdops.o
- # lock.o lockspace.o
+ gettid.o rg_strings.o message.o members.o fdops.o \
+ lock.o cman.o vft.o msg_cluster.o msg_socket.o
${AR} cru $@ $^
ranlib $@
--- cluster/rgmanager/src/clulib/alloc.c 2006/01/20 16:27:24 1.7
+++ cluster/rgmanager/src/clulib/alloc.c 2006/07/11 23:52:41 1.8
@@ -129,8 +129,9 @@
#undef AGGR_RECLAIM /* consolidate_all on free (*slow*) */
#undef STACKSIZE /*4 backtrace to store if DEBUG is set */
+//#define STACKSIZE 4
-#undef GDB_HOOK /* Dump program addresses in malloc_table
+#undef GDB_HOOK /* Dump program addresses in malloc_table
using a fork/exec of gdb (SLOW but fun)
building this defeats the purpose of
a bounded memory allocator, and is only
--- cluster/rgmanager/src/clulib/lock.c 2006/06/13 19:22:38 1.1
+++ cluster/rgmanager/src/clulib/lock.c 2006/07/11 23:52:41 1.2
@@ -31,6 +31,10 @@
#include <sys/select.h>
#include <pthread.h>
+/* Default lockspace stuff */
+static dlm_lshandle_t _default_ls = NULL;
+static pthread_mutex_t _default_lock = PTHREAD_MUTEX_INITIALIZER;
+
static void
ast_function(void * __attribute__ ((unused)) arg)
@@ -39,7 +43,7 @@
static int
-wait_for_dlm_event(dlm_lshandle_t *ls)
+wait_for_dlm_event(dlm_lshandle_t ls)
{
fd_set rfds;
int fd = dlm_ls_get_fd(ls);
@@ -55,15 +59,18 @@
int
-clu_lock(dlm_lshandle_t ls,
- int mode,
- struct dlm_lksb *lksb,
- int options,
- char *resource)
+clu_ls_lock(dlm_lshandle_t ls,
+ int mode,
+ struct dlm_lksb *lksb,
+ int options,
+ char *resource)
{
int ret;
if (!ls || !lksb || !resource || !strlen(resource)) {
+ printf("%p %p %p %d\n", ls, lksb, resource,
+ (int)strlen(resource));
+ printf("INVAL...\n");
errno = EINVAL;
return -1;
}
@@ -91,15 +98,22 @@
dlm_lshandle_t
-clu_acquire_lockspace(const char *lsname)
+clu_open_lockspace(const char *lsname)
{
dlm_lshandle_t ls = NULL;
+ //printf("opening lockspace %s\n", lsname);
+
while (!ls) {
ls = dlm_open_lockspace(lsname);
if (ls)
break;
+ /*
+ printf("Failed to open: %s; trying create.\n",
+ strerror(errno));
+ */
+
ls = dlm_create_lockspace(lsname, 0644);
if (ls)
break;
@@ -119,9 +133,8 @@
}
-
int
-clu_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb)
+clu_ls_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb)
{
int ret;
@@ -147,7 +160,146 @@
int
-clu_release_lockspace(dlm_lshandle_t ls, char *name)
+clu_close_lockspace(dlm_lshandle_t ls, const char *name)
{
return dlm_release_lockspace(name, ls, 0);
}
+
+
+/**
+  Acquire 'resource' in the default lockspace opened by clu_lock_init().
+
+  Unless LKF_CONVERT is given, the function first takes a NULL
+  (LKM_NLMODE) lock for blocking requests (no LKF_NOQUEUE) and for
+  explicit NULL-mode requests. For a non-NULL mode it then converts that
+  lock to 'mode'. The request for 'mode' is always issued with
+  LKF_NOQUEUE; when blocking, EAGAIN is retried after a random sleep of
+  up to ~32ms.
+
+  @return 0 on success; otherwise clu_ls_lock()'s result with errno set
+          from the failing lock call.  If the conversion fails, the NULL
+          lock taken here is released first.
+ */
+int
+clu_lock(int mode,
+	 struct dlm_lksb *lksb,
+	 int options,
+	 char *resource)
+{
+	int ret = 0, block = 0, conv = 0, err = 0;
+
+	block = !(options & LKF_NOQUEUE);
+
+	/*
+	   Try to use a conversion lock mechanism when possible
+	   If the caller calls explicitly with a NULL lock, then
+	   assume the caller knows what it is doing.
+
+	   Only take the NULL lock if:
+	   (a) the user isn't specifying CONVERT; if they are, they
+	       know what they are doing.
+
+	   ...and one of...
+
+	   (b) This is a blocking call, or
+	   (c) The user requested a NULL lock explicitly. In this case,
+	       short-out early; there's no reason to convert a NULL lock
+	       to a NULL lock.
+	 */
+	if (!(options & LKF_CONVERT) &&
+	    (block || (mode == LKM_NLMODE))) {
+		/* Acquire NULL lock */
+		pthread_mutex_lock(&_default_lock);
+		ret = clu_ls_lock(_default_ls, LKM_NLMODE, lksb,
+				  (options & ~LKF_NOQUEUE),
+				  resource);
+		err = errno;
+		pthread_mutex_unlock(&_default_lock);
+		if (ret == 0) {
+			if (mode == LKM_NLMODE) {
+				/* User only wanted a NULL lock... */
+				return 0;
+			}
+			/*
+			   Ok, NULL lock was taken, rest of blocking
+			   call should be done using lock conversions.
+			 */
+			options |= LKF_CONVERT;
+			conv = 1;
+		} else {
+			switch(err) {
+			case EINVAL:
+				/* Oops, null locks don't work on this
+				   plugin; use normal spam mode */
+				break;
+			default:
+				errno = err;
+				return -1;
+			}
+		}
+	}
+
+	while (1) {
+		pthread_mutex_lock(&_default_lock);
+		ret = clu_ls_lock(_default_ls, mode, lksb,
+				  (options | LKF_NOQUEUE),
+				  resource);
+		err = errno;
+		pthread_mutex_unlock(&_default_lock);
+
+		if ((ret != 0) && (err == EAGAIN) && block) {
+			usleep(random()&32767);
+			continue;
+		}
+
+		break;
+	}
+
+	if (ret != 0) {
+		if (conv) {
+			/* If we get some other error, release the NL lock
+			   we took so we don't leak locks */
+			pthread_mutex_lock(&_default_lock);
+			clu_ls_unlock(_default_ls, lksb);
+			pthread_mutex_unlock(&_default_lock);
+		}
+		/* Report the lock error even if later calls touched errno */
+		errno = err;
+	}
+
+	return ret;
+}
+
+
+/**
+  Release the lock described by lksb in the default lockspace, then
+  sleep for a random interval of up to ~32ms (presumably to let other
+  contenders in - confirm intent).
+
+  @return clu_ls_unlock()'s result; its errno survives the sleep.
+ */
+int
+clu_unlock(struct dlm_lksb *lksb)
+{
+	int rv, saved_errno;
+
+	pthread_mutex_lock(&_default_lock);
+	rv = clu_ls_unlock(_default_ls, lksb);
+	saved_errno = errno;
+	pthread_mutex_unlock(&_default_lock);
+
+	usleep(random() & 32767);
+
+	errno = saved_errno;
+	return rv;
+}
+
+
+/**
+  Open (or create) the default lockspace used by clu_lock()/clu_unlock().
+  A second call is a no-op while a lockspace is already open.
+
+  @param dflt_lsname	Non-empty lockspace name.
+  @return		0 on success or if already initialized; -1 with
+			errno set on failure (EINVAL for a NULL/empty name).
+ */
+int
+clu_lock_init(const char *dflt_lsname)
+{
+	int ret, err;
+
+	pthread_mutex_lock(&_default_lock);
+	if (_default_ls) {
+		pthread_mutex_unlock(&_default_lock);
+		return 0;
+	}
+
+	if (!dflt_lsname || !strlen(dflt_lsname)) {
+		pthread_mutex_unlock(&_default_lock);
+		errno = EINVAL;
+		return -1;
+	}
+
+	_default_ls = clu_open_lockspace(dflt_lsname);
+	/* -1, not 1, so failures are reported like the EINVAL path above */
+	ret = (_default_ls == NULL) ? -1 : 0;
+	err = errno;
+	pthread_mutex_unlock(&_default_lock);
+
+	errno = err;
+	return ret;
+}
+
+/**
+  Release the default lockspace opened by clu_lock_init().
+
+  On success the stored handle is cleared, so clu_lock_init() can open a
+  fresh one and clu_lock() never uses a released handle.  If the release
+  fails the handle is kept (presumably still valid - confirm against
+  dlm_release_lockspace semantics).
+
+  @param name	Lockspace name passed to clu_close_lockspace().
+ */
+void
+clu_lock_finished(const char *name)
+{
+	pthread_mutex_lock(&_default_lock);
+	if (_default_ls && clu_close_lockspace(_default_ls, name) == 0)
+		_default_ls = NULL;
+	pthread_mutex_unlock(&_default_lock);
+}
--- cluster/rgmanager/src/clulib/lockspace.c 2006/06/13 19:22:38 1.1
+++ cluster/rgmanager/src/clulib/lockspace.c 2006/07/11 23:52:41 1.2
@@ -1,3 +1,21 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
--- cluster/rgmanager/src/clulib/members.c 2006/06/13 19:22:38 1.1
+++ cluster/rgmanager/src/clulib/members.c 2006/07/11 23:52:41 1.2
@@ -1,3 +1,21 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
#include <sys/types.h>
#include <arpa/inet.h>
#include <stdint.h>
@@ -8,10 +26,12 @@
#include <members.h>
#include <stdlib.h>
#include <unistd.h>
+#include <signal.h>
#include <string.h>
#include <sys/socket.h>
#include <rg_types.h>
#include <pthread.h>
+#include <errno.h>
static int my_node_id = -1;
static pthread_rwlock_t memblock = PTHREAD_RWLOCK_INITIALIZER;
@@ -73,29 +93,32 @@
{
int count, x, y;
char in_old = 0;
- cluster_member_list_t *gained;
+ cluster_member_list_t *gained = NULL;
/* No nodes in new? Then nothing could have been gained */
if (!new || !new->cml_count)
return NULL;
/* Nothing in old? Duplicate 'new' and return it. */
- if (!old || !old->cml_count) {
- gained = cml_alloc(new->cml_count);
- if (!gained)
- return NULL;
- memcpy(gained, new, cml_size(new->cml_count));
- return gained;
- }
+ if (!old || !old->cml_count)
+ return member_list_dup(new);
/* Use greatest possible count */
count = (old->cml_count > new->cml_count ?
- cml_size(old->cml_count) : cml_size(new->cml_count));
+ old->cml_count : new->cml_count);
+ count *= sizeof(cman_node_t);
- gained = malloc(count);
+ gained = malloc(sizeof(cluster_member_list_t));
if (!gained)
return NULL;
- memset(gained, 0, count);
+ memset(gained, 0, sizeof(*gained));
+
+ gained->cml_members = malloc(count);
+ if (!gained->cml_members) {
+ free(gained);
+ return NULL;
+ }
+ memset(gained->cml_members, 0, count);
for (x = 0; x < new->cml_count; x++) {
@@ -121,6 +144,7 @@
}
if (gained->cml_count == 0) {
+ free(gained->cml_members);
free(gained);
gained = NULL;
}
@@ -145,7 +169,7 @@
cluster_member_list_t *
memb_lost(cluster_member_list_t *old, cluster_member_list_t *new)
{
- cluster_member_list_t *ret;
+ cluster_member_list_t *ret = NULL;
int x;
/* Reverse. ;) */
@@ -213,26 +237,42 @@
get_member_list(cman_handle_t h)
{
int c;
+ int tries = 0, local = 0;
cluster_member_list_t *ml = NULL;
cman_node_t *nodes = NULL;
- do {
+ if (h == NULL) {
+ local = 1;
+ h = cman_init(NULL);
+ if (!h)
+ return NULL;
+ }
+
+ do {
+ ++tries;
if (nodes)
free(nodes);
c = cman_get_node_count(h);
- if (c <= 0)
- return NULL;
+ if (c <= 0) {
+ if (errno == EINTR)
+ continue;
+ if (ml)
+ free(ml);
+ ml = NULL;
+ goto out;
+ }
if (!ml)
ml = malloc(sizeof(*ml));
if (!ml)
- return NULL;
+ goto out;
nodes = malloc(sizeof(*nodes) * c);
if (!nodes) {
free(ml);
- return NULL;
+ ml = NULL;
+ goto out;
}
memset(ml, 0, sizeof(*ml));
@@ -244,6 +284,10 @@
ml->cml_members = nodes;
ml->cml_count = c;
+
+out:
+ if (local)
+ cman_finish(h);
return ml;
}
@@ -264,6 +308,9 @@
{
int x = 0;
+ if (!ml)
+ return 0;
+
for (x = 0; x < ml->cml_count; x++) {
if (ml->cml_members[x].cn_nodeid == nodeid)
return ml->cml_members[x].cn_member;
@@ -278,6 +325,9 @@
{
int x = 0, count = 0;
+ if (!ml)
+ return 0;
+
for (x = 0; x < ml->cml_count; x++) {
if (ml->cml_members[x].cn_member)
++count;
@@ -292,6 +342,9 @@
{
int x = 0;
+ if (!ml)
+ return -1;
+
for (x = 0; x < ml->cml_count; x++) {
if (ml->cml_members[x].cn_nodeid == nodeid)
ml->cml_members[x].cn_member = 0;
@@ -306,13 +359,15 @@
memb_id_to_name(cluster_member_list_t *ml, int nodeid)
{
int x = 0;
+ if (!ml)
+ return NULL;
for (x = 0; x < ml->cml_count; x++) {
if (ml->cml_members[x].cn_nodeid == nodeid)
return ml->cml_members[x].cn_name;
}
- return 0;
+ return NULL;
}
@@ -320,13 +375,15 @@
memb_id_to_p(cluster_member_list_t *ml, int nodeid)
{
int x = 0;
+ if (!ml)
+ return NULL;
for (x = 0; x < ml->cml_count; x++) {
if (ml->cml_members[x].cn_nodeid == nodeid)
return &ml->cml_members[x];
}
- return 0;
+ return NULL;
}
@@ -334,6 +391,8 @@
memb_online_name(cluster_member_list_t *ml, char *name)
{
int x = 0;
+ if (!ml)
+ return 0;
for (x = 0; x < ml->cml_count; x++) {
if (!strcasecmp(ml->cml_members[x].cn_name, name))
@@ -348,6 +407,8 @@
memb_name_to_id(cluster_member_list_t *ml, char *name)
{
int x = 0;
+ if (!ml)
+ return 0;
for (x = 0; x < ml->cml_count; x++) {
if (!strcasecmp(ml->cml_members[x].cn_name, name))
@@ -362,13 +423,15 @@
memb_name_to_p(cluster_member_list_t *ml, char *name)
{
int x = 0;
+ if (!ml)
+ return NULL;
for (x = 0; x < ml->cml_count; x++) {
if (!strcasecmp(ml->cml_members[x].cn_name, name))
return &ml->cml_members[x];
}
- return 0;
+ return NULL;
}
/**
@@ -388,9 +451,19 @@
if (!orig)
return NULL;
- ret = malloc(cml_size(orig->cml_count));
- memset(ret, 0, cml_size(orig->cml_count));
- memcpy(ret, orig, cml_size(orig->cml_count));
+ ret = malloc(sizeof(cluster_member_list_t));
+ if (!ret)
+ return NULL;
+ memset(ret, 0, sizeof(cluster_member_list_t));
+ ret->cml_members = malloc(sizeof(cman_node_t) * orig->cml_count);
+
+ if (!ret->cml_members) {
+ free(ret);
+ return NULL;
+ }
+ ret->cml_count = orig->cml_count;
+ memcpy(ret->cml_members, orig->cml_members,
+ orig->cml_count * sizeof(cman_node_t));
return ret;
}
--- cluster/rgmanager/src/clulib/message.c 2006/06/13 19:22:38 1.1
+++ cluster/rgmanager/src/clulib/message.c 2006/07/11 23:52:41 1.2
@@ -1,138 +1,41 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
#define _MESSAGE_BUILD
#include <message.h>
#include <stdio.h>
-#include <pthread.h>
-#include <libcman.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <stdlib.h>
-#include <rg_types.h>
-#include <sys/socket.h>
#include <sys/stat.h>
-#include <sys/un.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
#include <errno.h>
-#include <sys/time.h>
-#include <fdops.h>
-#include <resgroup.h>
-
-
-
-/* Ripped from ccsd's setup_local_socket */
-#define RGMGR_SOCK "/var/run/cluster/rgmanager.sk"
-#define MAX_CONTEXTS 32 /* Testing; production should be 1024-ish */
-
-/* Context 0 is reserved for control messages */
-
-/* Local-ish contexts */
-static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER;
-static msgctx_t *contexts[MAX_CONTEXTS];
-static uint32_t context_index = 1;
-static chandle_t *gch;
-pthread_t comms_thread;
-int thread_running;
-
-
-#define is_established(ctx) \
- (((ctx->type == MSG_CLUSTER) && \
- (ctx->u.cluster_info.remote_ctx && ctx->u.cluster_info.local_ctx)) || \
- ((ctx->type == MSG_SOCKET) && \
- (ctx->u.local_info.sockfd != -1)))
-
-
-static int
-local_connect(void)
-{
- struct sockaddr_un sun;
- int sock = -1, error = 0;
-
- memset(&sun, 0, sizeof(sun));
- sun.sun_family = PF_LOCAL;
- snprintf(sun.sun_path, sizeof(sun.sun_path), RGMGR_SOCK);
-
- sock = socket(PF_LOCAL, SOCK_STREAM, 0);
- if (sock < 0) {
- error = errno;
- goto fail;
- }
-
- error = connect(sock, (struct sockaddr *)&sun, sizeof(sun));
- if (error < 0) {
- error = errno;
- goto fail;
- }
-
- sock = error;
-fail:
-
- return sock;
-}
-
-
-static int
-send_cluster_message(msgctx_t *ctx, void *msg, size_t len)
-{
- char buf[4096];
- cluster_msg_hdr_t *h = (void *)buf;
- int ret;
- char *msgptr = (buf + sizeof(*h));
-
- if ((len + sizeof(*h)) > sizeof(buf)) {
- errno = E2BIG;
- return -1;
- }
-
- h->msg_control = M_DATA;
- h->src_ctx = ctx->u.cluster_info.local_ctx;
- h->dest_ctx = ctx->u.cluster_info.remote_ctx;
- h->msg_port = ctx->u.cluster_info.port;
- memcpy(msgptr, msg, len);
- /*
- printf("sending cluster message, length = %d to nodeid %d port %d\n",
- len + sizeof(*h), ctx->u.cluster_info.nodeid, ctx->u.cluster_info.port);
- */
-
- pthread_mutex_lock(&gch->c_lock);
- h->src_nodeid = gch->c_nodeid;
-
- swab_cluster_msg_hdr_t(h);
-
- ret = cman_send_data(gch->c_cluster, (void *)h, len + sizeof(*h),
- ctx->u.cluster_info.nodeid,
- ctx->u.cluster_info.port, 0);
-
- pthread_mutex_unlock(&gch->c_lock);
-
- return len + sizeof(h);
-}
-
-
-/**
- Wrapper around write(2)
- */
-static int
-send_socket_message(msgctx_t *ctx, void *msg, size_t len)
-{
- char buf[4096];
- local_msg_hdr_t *h = (local_msg_hdr_t *)buf;
- char *msgptr = (buf + sizeof(*h));
-
- /* encapsulate ... ? */
- if ((len + sizeof(*h)) > sizeof(buf)) {
- errno = E2BIG;
- return -1;
- }
-
- h->msg_control = M_DATA;
- h->msg_len = len;
- memcpy(msgptr, msg, len);
-
- return _write_retry(ctx->u.local_info.sockfd, msg, len + sizeof(*h), NULL);
-}
+/* From msg_cluster */
+int cluster_msg_init(msgctx_t *ctx);
+int cluster_msg_listen(int me, void *, msgctx_t **ctx);
+int cluster_msg_shutdown(void);
+
+/* From msg_socket */
+int sock_msg_init(msgctx_t *ctx);
+int sock_msg_listen(int me, void *, msgctx_t **ctx);
+int sock_msg_shutdown(void);
/**
@@ -142,238 +45,22 @@
int
msg_send(msgctx_t *ctx, void *msg, size_t len)
{
- if (!ctx || !msg || !len) {
- errno = EINVAL;
- return -1;
- }
-
- switch(ctx->type) {
- case MSG_CLUSTER:
- return send_cluster_message(ctx, msg, len);
- case MSG_SOCKET:
- return send_socket_message(ctx, msg, len);
- default:
- break;
- }
-
errno = EINVAL;
- return -1;
-}
-
-
-/**
- Assign a (free) cluster context ID
- */
-static int
-assign_ctx(msgctx_t *ctx)
-{
- int start;
-
- /* Assign context index */
- ctx->type = MSG_CLUSTER;
-
- pthread_mutex_lock(&context_lock);
- start = context_index++;
- if (context_index >= MAX_CONTEXTS || context_index <= 0)
- context_index = 1;
- do {
- if (contexts[context_index]) {
- ++context_index;
- if (context_index >= MAX_CONTEXTS)
- context_index = 1;
-
- if (context_index == start) {
- pthread_mutex_unlock(&context_lock);
- errno = EAGAIN;
- return -1;
- }
-
- continue;
- }
-
- contexts[context_index] = ctx;
- ctx->u.cluster_info.local_ctx = context_index;
-
- } while (0);
- pthread_mutex_unlock(&context_lock);
-
- return 0;
-}
-
-
-/* See if anything's on the cluster socket. If so, dispatch it
- on to the requisite queues
- XXX should be passed a connection arg! */
-static int
-poll_cluster_messages(int timeout)
-{
- int ret = -1;
- fd_set rfds;
- int fd;
- struct timeval tv;
- struct timeval *p = NULL;
-
- if (timeout >= 0) {
- p = &tv;
- tv.tv_sec = tv.tv_usec = timeout;
- }
- printf("%s\n", __FUNCTION__);
-
- FD_ZERO(&rfds);
-
- //pthread_mutex_lock(&gch->c_lock);
- fd = cman_get_fd(gch->c_cluster);
- FD_SET(fd, &rfds);
-
- if (select(fd + 1, &rfds, NULL, NULL, p) == 1) {
- cman_dispatch(gch->c_cluster, 0);
- ret = 0;
- }
- //pthread_mutex_unlock(&gch->c_lock);
-
- return ret;
-}
-
-
-/**
- This is used to establish and tear down pseudo-private
- contexts which are shared with the cluster context.
- */
-static int
-cluster_send_control_msg(msgctx_t *ctx, int type)
-{
- cluster_msg_hdr_t cm;
-
- cm.msg_control = (uint8_t)type;
- cm.src_nodeid = gch->c_nodeid;
- cm.dest_ctx = ctx->u.cluster_info.remote_ctx;
- cm.src_ctx = ctx->u.cluster_info.local_ctx;
- cm.msg_port = ctx->u.cluster_info.port;
-
- swab_cluster_msg_hdr_t(&cm);
-
- return (cman_send_data(gch->c_cluster, (void *)&cm, sizeof(cm),
- ctx->u.cluster_info.nodeid,
- ctx->u.cluster_info.port, 0));
-}
-
-
-/**
- Wait for a message on a context.
- */
-static int
-cluster_msg_wait(msgctx_t *ctx, int timeout)
-{
- struct timespec ts = {0, 0};
- int req = M_NONE;
- struct timeval start;
- struct timeval now;
-
-
- if (timeout > 0)
- gettimeofday(&start, NULL);
-
- ts.tv_sec = !!timeout;
-
- pthread_mutex_lock(&ctx->u.cluster_info.mutex);
- while (1) {
- /* See if we dispatched any messages on to our queue */
- if (ctx->u.cluster_info.queue) {
- req = ctx->u.cluster_info.queue->message->msg_control;
- /*printf("Queue not empty CTX%d : %d\n",
- ctx->u.cluster_info.local_ctx, req);*/
- break;
- }
-
- if (timeout == 0)
- break;
-
- /* Ok, someone else has the mutex on our FD. Go to
- sleep on a cond; maybe they'll wake us up */
- if (pthread_cond_timedwait(&ctx->u.cluster_info.cond,
- &ctx->u.cluster_info.mutex,
- &ts) < 0) {
-
- /* Mutex held */
- if (errno == ETIMEDOUT) {
- if (timeout < 0) {
- ts.tv_sec = 1;
- ts.tv_nsec = 0;
- continue;
- }
-
- ts.tv_sec = !!timeout;
-
- /* Done */
- break;
- }
- }
-
- if (timeout > 0) {
- gettimeofday(&now, NULL);
- /* XXX imprecise */
- if (now.tv_sec - start.tv_sec > timeout)
- break;
- }
- }
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-
- return req;
-}
-
-
-static int
-peekypeeky(int fd)
-{
- local_msg_hdr_t h;
- int ret;
-
- while ((ret = recv(fd, (void *)&h, sizeof(h), MSG_PEEK)) < 0) {
- if (errno == EINTR)
- continue;
+ if (!ctx || !msg || !len)
return -1;
- }
-
- if (ret == sizeof(h))
- return h.msg_control;
-
- if (ret == 0)
- /* Socket closed? */
- return M_CLOSE;
- /* XXX */
- printf("PROTOCOL ERROR: Invalid message\n");
- return M_CLOSE;
-}
-
-
-static int
-local_msg_wait(msgctx_t *ctx, int timeout)
-{
- fd_set rfds;
- struct timeval tv = {0, 0};
- struct timeval *p = NULL;
-
- if (timeout >= 0) {
- tv.tv_sec = timeout;
- p = &tv;
- }
-
- FD_ZERO(&rfds);
- FD_SET(ctx->u.local_info.sockfd, &rfds);
-
- if (_select_retry(ctx->u.local_info.sockfd + 1, &rfds,
- NULL, NULL, p) == 1) {
- return peekypeeky(ctx->u.local_info.sockfd);
- }
-
- return M_NONE;
+ if (ctx->ops && ctx->ops->mo_send)
+ return ctx->ops->mo_send(ctx, msg, len);
+ errno = ENOSYS;
+ return -1;
}
+/* XXX get API for this ready */
int
msg_get_nodeid(msgctx_t *ctx)
{
+ /* XXX */
switch(ctx->type) {
case MSG_CLUSTER:
return ctx->u.cluster_info.nodeid;
@@ -390,49 +77,13 @@
int
msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max)
{
- int e;
- switch(ctx->type) {
- case MSG_CLUSTER:
- pthread_mutex_lock(&ctx->u.cluster_info.mutex);
- if (ctx->u.cluster_info.select_pipe[0] < 0) {
- if (pipe(ctx->u.cluster_info.select_pipe) < 0) {
- e = errno;
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- errno = e;
- return -1;
- }
-
- printf("%s: Created cluster CTX select pipe "
- "rd=%d wr=%d\n", __FUNCTION__,
- ctx->u.cluster_info.select_pipe[0],
- ctx->u.cluster_info.select_pipe[1]);
-
- }
-
- e = ctx->u.cluster_info.select_pipe[0];
- printf("%s: cluster %d\n", __FUNCTION__, e);
- FD_SET(e, fds);
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-
- if (e > *max)
- *max = e;
- return 0;
-
- case MSG_SOCKET:
- if (ctx->u.local_info.sockfd >= 0) {
- printf("%s: local %d\n", __FUNCTION__,
- ctx->u.local_info.sockfd);
- FD_SET(ctx->u.local_info.sockfd, fds);
-
- if (ctx->u.local_info.sockfd > *max)
- *max = ctx->u.local_info.sockfd;
- return 0;
- }
+ errno = EINVAL;
+ if (!ctx)
return -1;
- default:
- break;
- }
-
+
+ if (ctx->ops && ctx->ops->mo_fd_set)
+ return ctx->ops->mo_fd_set(ctx, fds, max);
+ errno = ENOSYS;
return -1;
}
@@ -441,30 +92,12 @@
msg_fd_isset(msgctx_t *ctx, fd_set *fds)
{
errno = EINVAL;
-
if (!fds || !ctx)
return -1;
-
- switch(ctx->type) {
- case MSG_CLUSTER:
- pthread_mutex_lock(&ctx->u.cluster_info.mutex);
- if (ctx->u.cluster_info.select_pipe[0] >= 0 &&
- FD_ISSET(ctx->u.cluster_info.select_pipe[0], fds)) {
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- return 1;
- }
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- return 0;
- case MSG_SOCKET:
- if (ctx->u.local_info.sockfd >= 0 &&
- FD_ISSET(ctx->u.local_info.sockfd, fds)) {
- return 1;
- }
- return 0;
- default:
- break;
- }
-
+
+ if (ctx->ops && ctx->ops->mo_fd_isset)
+ return ctx->ops->mo_fd_isset(ctx, fds);
+ errno = ENOSYS;
return -1;
}
@@ -473,30 +106,12 @@
msg_fd_clr(msgctx_t *ctx, fd_set *fds)
{
errno = EINVAL;
-
if (!fds || !ctx)
return -1;
-
- switch(ctx->type) {
- case MSG_CLUSTER:
- pthread_mutex_lock(&ctx->u.cluster_info.mutex);
- if (ctx->u.cluster_info.select_pipe[0] >= 0) {
- FD_CLR(ctx->u.cluster_info.select_pipe[0], fds);
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- return 1;
- }
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- return 0;
- case MSG_SOCKET:
- if (ctx->u.local_info.sockfd >= 0) {
- FD_CLR(ctx->u.local_info.sockfd, fds);
- return 1;
- }
- return 0;
- default:
- break;
- }
-
+
+ if (ctx->ops && ctx->ops->mo_fd_clr)
+ return ctx->ops->mo_fd_clr(ctx, fds);
+ errno = ENOSYS;
return -1;
}
@@ -519,232 +134,28 @@
int
msg_wait(msgctx_t *ctx, int timeout)
{
-
- if (!ctx) {
- errno = EINVAL;
- return -1;
- }
-
- switch(ctx->type) {
- case MSG_CLUSTER:
- return cluster_msg_wait(ctx, timeout);
- case MSG_SOCKET:
- return local_msg_wait(ctx, timeout);
- default:
- break;
- }
-
errno = EINVAL;
- return -1;
-}
-
-
-int
-_cluster_msg_receive(msgctx_t *ctx, void **msg, size_t *len)
-{
- cluster_msg_hdr_t *m;
- msg_q_t *n;
- int ret = 0;
-
- if (msg)
- *msg = NULL;
- if (len)
- *len = 0;
-
- if (ctx->u.cluster_info.local_ctx < 0 ||
- ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
- errno = EBADF;
- return -1;
- }
-
- /* trigger receive here */
- pthread_mutex_lock(&ctx->u.cluster_info.mutex);
-
- n = ctx->u.cluster_info.queue;
- if (n == NULL) {
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- errno = EAGAIN;
- return -1;
- }
-
- list_remove(&ctx->u.cluster_info.queue, n);
-
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-
- m = n->message;
- switch(m->msg_control) {
- case M_CLOSE:
- ctx->u.cluster_info.remote_ctx = 0;
- break;
- case M_OPEN_ACK:
- /* Response to new connection */
- ctx->u.cluster_info.remote_ctx = m->src_ctx;
- break;
- case M_DATA:
- /* Kill the message control structure */
- memmove(m, &m[1], n->len - sizeof(*m));
- if (msg)
- *msg = (void *)m;
- else {
- printf("Warning: dropping data message\n");
- free(m);
- }
- if (len)
- *len = (n->len - sizeof(*m));
- ret = (n->len - sizeof(*m));
- free(n);
-
- printf("Message received\n");
- return ret;
- case M_OPEN:
- /* Someone is trying to open a connection */
- default:
- /* ?!! */
- ret = -1;
- break;
- }
-
- free(m);
- free(n);
-
- return ret;
-}
-
-
-/**
- Receive a message from a cluster-context. This copies out the contents
- into the user-specified buffer, and does random other things.
- */
-static int
-cluster_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
-{
- int req;
- void *priv_msg;
- size_t priv_len;
-
- if (!msg || !maxlen) {
- errno = EINVAL;
- return -1;
- }
-
- req = cluster_msg_wait(ctx, timeout);
-
- switch (req) {
- case M_DATA:
- /* Copy out. */
- req = _cluster_msg_receive(ctx, &priv_msg, &priv_len);
- if (req < 0) {
- printf("Ruh roh!\n");
- return -1;
- }
-
- priv_len = (priv_len < maxlen ? priv_len : maxlen);
-
- memcpy(msg, priv_msg, priv_len);
- free(priv_msg);
- return req;
- case M_CLOSE:
- errno = ECONNRESET;
- return -1;
- case 0:
- /*printf("Nothing on queue\n");*/
- return 0;
- default:
- printf("PROTOCOL ERROR: Received %d\n", req);
+ if (!ctx)
return -1;
- }
-
- printf("%s: CODE PATH ERROR\n", __FUNCTION__);
+
+ if (ctx->ops && ctx->ops->mo_wait)
+ return ctx->ops->mo_wait(ctx, timeout);
+ errno = ENOSYS;
return -1;
}
-static int
-_local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
-{
- struct timeval tv = {0, 0};
- struct timeval *p = NULL;
- local_msg_hdr_t h;
-
- if (timeout >= 0) {
- tv.tv_sec = timeout;
- p = &tv;
- }
-
- if (_read_retry(ctx->u.local_info.sockfd, &h, sizeof(h), p) < 0)
- return -1;
-
- if (maxlen < h.msg_len) {
- printf("WARNING: Buffer too small for message!\n");
- h.msg_len = maxlen;
- }
-
- return _read_retry(ctx->u.local_info.sockfd, msg, h.msg_len, p);
-}
-
-
-/**
- Receive a message from a cluster-context. This copies out the contents
- into the user-specified buffer, and does random other things.
- */
-static int
-local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
-{
- int req;
- char priv_msg[4096];
- size_t priv_len;
-
- if (!msg || !maxlen) {
- errno = EINVAL;
- return -1;
- }
-
- switch (req) {
- case M_DATA:
- /* Copy out. */
- req = _local_msg_receive(ctx, priv_msg, priv_len, timeout);
- if (req <= 0)
- return -1;
-
- priv_len = (priv_len < maxlen ? priv_len : maxlen);
-
- memcpy(msg, priv_msg, priv_len);
- free(msg);
- return req;
- case M_CLOSE:
- errno = ECONNRESET;
- return -1;
- case 0:
- /*printf("Nothing on queue\n");*/
- return 0;
- default:
- printf("PROTOCOL ERROR: Received %d\n", req);
- return -1;
- }
-
- printf("%s: CODE PATH ERROR\n", __FUNCTION__);
- return -1;
-}
-
int
msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
{
- if (!ctx || !msg || !maxlen) {
- errno = EINVAL;
+ errno = EINVAL;
+ if (!ctx || !msg || !maxlen) /* same argument contract as msg_send() */
return -1;
- }
- switch(ctx->type) {
- case MSG_CLUSTER:
- return cluster_msg_receive(ctx, msg, maxlen, timeout);
- case MSG_SOCKET:
- return local_msg_receive(ctx, msg, maxlen, timeout);
- default:
- break;
- }
-
- errno = EINVAL;
+ if (ctx->ops && ctx->ops->mo_receive)
+ return ctx->ops->mo_receive(ctx, msg, maxlen, timeout);
+ errno = ENOSYS;
return -1;
}
@@ -755,78 +166,28 @@
/var/run/cluster...
*/
int
-msg_open(int nodeid, int port, msgctx_t *ctx, int timeout)
+msg_open(int type, int nodeid, int port, msgctx_t *ctx, int timeout)
{
- int t = 0;
-
errno = EINVAL;
if (!ctx)
return -1;
-
- /*printf("Opening pseudo channel to node %d\n", nodeid);*/
-
- memset(ctx, 0, sizeof(*ctx));
- if (nodeid == CMAN_NODEID_US) {
- if ((ctx->u.local_info.sockfd = local_connect()) < 0) {
- return -1;
- }
- ctx->type = MSG_SOCKET;
- return 0;
- }
-
- ctx->type = MSG_CLUSTER;
- ctx->u.cluster_info.nodeid = nodeid;
- ctx->u.cluster_info.port = port;
- ctx->u.cluster_info.local_ctx = -1;
- ctx->u.cluster_info.remote_ctx = 0;
- ctx->u.cluster_info.queue = NULL;
- ctx->u.cluster_info.select_pipe[0] = -1;
- ctx->u.cluster_info.select_pipe[1] = -1;
- pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
- pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
-
- /* Assign context index */
- if (assign_ctx(ctx) < 0)
- return -1;
-
- //printf(" Local CTX: %d\n", ctx->u.cluster_info.local_ctx);
-
- /* Send open */
-
- //printf(" Sending control message M_OPEN\n");
- if (cluster_send_control_msg(ctx, M_OPEN) < 0) {
+ /* XXX SPECIAL CASE... ow. */
+ switch(type) {
+ case MSG_SOCKET:
+ sock_msg_init(ctx);
+ break;
+ case MSG_CLUSTER:
+ cluster_msg_init(ctx);
+ break;
+ default:
return -1;
}
- /* Ok, wait for a response */
- while (!is_established(ctx)) {
- ++t;
- if (t > timeout) {
- msg_close(ctx);
- errno = ETIMEDOUT;
- return -1;
- }
-
- switch(msg_wait(ctx, 1)) {
- case M_OPEN_ACK:
- _cluster_msg_receive(ctx, NULL, NULL);
- break;
- case M_NONE:
- continue;
- default:
- printf("PROTO ERROR: M_OPEN_ACK not received \n");
- }
- }
-
- /*
- printf(" Remote CTX: %d\n",
- ctx->u.cluster_info.remote_ctx);
- printf(" Pseudo channel established!\n");
- */
-
-
- return 0;
+ if (ctx->ops && ctx->ops->mo_open)
+ return ctx->ops->mo_open(ctx->type, nodeid, port, ctx, timeout);
+ errno = ENOSYS;
+ return -1;
}
@@ -842,484 +203,65 @@
int
msg_close(msgctx_t *ctx)
{
- msg_q_t *n = NULL;
-
- if (!ctx) {
- errno = EINVAL;
- return -1;
- }
-
- switch (ctx->type) {
- case MSG_CLUSTER:
- if (ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
- errno = EINVAL;
- return -1;
- }
- pthread_mutex_lock(&context_lock);
- /* Other threads should not be able to see this again */
- if (contexts[ctx->u.cluster_info.local_ctx])
- contexts[ctx->u.cluster_info.local_ctx] = NULL;
- pthread_mutex_unlock(&context_lock);
- /* Clear receive queue */
- while ((n = ctx->u.cluster_info.queue) != NULL) {
- list_remove(&ctx->u.cluster_info.queue, n);
- free(n->message);
- free(n);
- }
- /* Send close message */
- if (ctx->u.cluster_info.remote_ctx != 0) {
- cluster_send_control_msg(ctx, M_CLOSE);
- }
-
- /* Close pipe if it's open */
- if (ctx->u.cluster_info.select_pipe[0] >= 0) {
- close(ctx->u.cluster_info.select_pipe[0]);
- ctx->u.cluster_info.select_pipe[0] = -1;
- }
- if (ctx->u.cluster_info.select_pipe[1] >= 0) {
- close(ctx->u.cluster_info.select_pipe[1]);
- ctx->u.cluster_info.select_pipe[1] = -1;
- }
- return 0;
- case MSG_SOCKET:
- close(ctx->u.local_info.sockfd);
- ctx->u.local_info.sockfd = -1;
- return 0;
- default:
- break;
- }
-
errno = EINVAL;
- return -1;
-}
-
-
-/**
- Called by cman_dispatch to deal with messages coming across the
- cluster socket. This function deals with fanning out the requests
- and putting them on the per-context queues. We don't have
- the benefits of pre-configured buffers, so we need this.
- */
-static void
-process_cman_msg(cman_handle_t h, void *priv, char *buf, int len,
- uint8_t port, int nodeid)
-{
- cluster_msg_hdr_t *m = (cluster_msg_hdr_t *)buf;
- msg_q_t *node;
- msgctx_t *ctx;
-
- if (len < sizeof(*m)) {
- printf("Message too short.\n");
- return;
- }
-
- swab_cluster_msg_hdr_t(m);
-
-#if 0
- printf("Processing ");
- switch(m->msg_control) {
- case M_NONE:
- printf("M_NONE\n");
- break;
- case M_OPEN:
- printf("M_OPEN\n");
- break;
- case M_OPEN_ACK:
- printf("M_OPEN_ACK\n");
- break;
- case M_DATA:
- printf("M_DATA\n");
- break;
- case M_CLOSE:
- printf("M_CLOSE\n");
- break;
- }
-
- printf(" Node ID: %d %d\n", m->src_nodeid, nodeid);
- printf(" Remote CTX: %d Local CTX: %d\n", m->src_ctx, m->dest_ctx);
-#endif
-
- if (m->dest_ctx >= MAX_CONTEXTS) {
- printf("Context invalid; ignoring\n");
- return;
- }
-
- while ((node = malloc(sizeof(*node))) == NULL) {
- sleep(1);
- }
- memset(node, 0, sizeof(*node));
- while ((node->message = malloc(len)) == NULL) {
- sleep(1);
- }
- memcpy(node->message, buf, len);
- node->len = len;
-
- pthread_mutex_lock(&context_lock);
- ctx = contexts[m->dest_ctx];
- if (!ctx) {
- /* We received a close for something we've already
- detached from our list. No big deal, just
- ignore. */
- free(node->message);
- free(node);
- pthread_mutex_unlock(&context_lock);
- return;
- }
-
- pthread_mutex_lock(&ctx->u.cluster_info.mutex);
- list_insert(&ctx->u.cluster_info.queue, node);
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- /* If a select pipe was set up, wake it up */
- if (ctx->u.cluster_info.select_pipe[1] >= 0)
- write(ctx->u.cluster_info.select_pipe[1], "", 1);
- pthread_mutex_unlock(&context_lock);
-
- pthread_cond_signal(&ctx->u.cluster_info.cond);
-}
-
-
-/**
- Accept a new pseudo-private connection coming in over the
- cluster socket.
- */
-static int
-cluster_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
-{
- errno = EINVAL;
- cluster_msg_hdr_t *m;
- msg_q_t *n;
- char done = 0;
- char foo;
-
- if (!listenctx || !acceptctx)
- return -1;
- if (listenctx->u.cluster_info.local_ctx != 0)
- return -1;
-
- pthread_mutex_lock(&listenctx->u.cluster_info.mutex);
-
- n = listenctx->u.cluster_info.queue;
- if (n == NULL) {
- pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
- errno = EAGAIN;
+ if (!ctx)
return -1;
- }
-
- /* the OPEN should be the first thing on the list; this loop
- is probably not necessary */
- list_do(&listenctx->u.cluster_info.queue, n) {
-
- m = n->message;
- switch(m->msg_control) {
- case M_OPEN:
- list_remove(&listenctx->u.cluster_info.queue, n);
- /*printf("Accepting connection from %d %d\n",
- m->src_nodeid, m->src_ctx);*/
-
- /* New connection */
- pthread_mutex_init(&acceptctx->u.cluster_info.mutex,
- NULL);
- pthread_cond_init(&acceptctx->u.cluster_info.cond,
- NULL);
- acceptctx->u.cluster_info.queue = NULL;
- acceptctx->u.cluster_info.remote_ctx = m->src_ctx;
- acceptctx->u.cluster_info.nodeid = m->src_nodeid;
- acceptctx->u.cluster_info.port = m->msg_port;
-
- assign_ctx(acceptctx);
- cluster_send_control_msg(acceptctx, M_OPEN_ACK);
-
- if (listenctx->u.cluster_info.select_pipe[0] >= 0) {
- read(listenctx->u.cluster_info.select_pipe[0],
- &foo, 1);
- }
-
- done = 1;
- free(m);
- free(n);
-
- break;
- case M_DATA:
- /* Data messages (i.e. from broadcast msgs) are
- okay too!... but we don't handle them here */
- break;
- default:
- /* Other message?! */
- printf("Odd... %d\n", m->msg_control);
- break;
- }
-
- if (done)
- break;
-
- } while (!list_done(&listenctx->u.cluster_info.queue, n));
-
- pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
-
- return 0;
+ if (ctx->ops && ctx->ops->mo_close)
+ return ctx->ops->mo_close(ctx);
+ errno = ENOSYS;
+ return -1;
}
-/* XXX INCOMPLETE */
int
msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
{
- switch(listenctx->type) {
- case MSG_CLUSTER:
- return cluster_msg_accept(listenctx, acceptctx);
- case MSG_SOCKET:
- return 0;
- default:
- break;
- }
-
- return -1;
-}
-
-
-static int
-local_listener_sk(void)
-{
- int sock;
- struct sockaddr_un su;
- mode_t om;
-
- sock = socket(PF_LOCAL, SOCK_STREAM, 0);
- if (sock < 0)
+ errno = EINVAL;
+ if (!listenctx || !acceptctx)
return -1;
-
- unlink(RGMGR_SOCK);
- om = umask(077);
- su.sun_family = PF_LOCAL;
- snprintf(su.sun_path, sizeof(su.sun_path), RGMGR_SOCK);
-
- if (bind(sock, &su, sizeof(su)) < 0) {
- umask(om);
- goto fail;
- }
- umask(om);
-
- if (listen(sock, SOMAXCONN) < 0)
- goto fail;
-
- return sock;
-fail:
- if (sock >= 0)
- close(sock);
+ if (listenctx->ops && listenctx->ops->mo_accept)
+ return listenctx->ops->mo_accept(listenctx, acceptctx);
+ errno = ENOSYS;
return -1;
}
-/**
- This waits for events on the cluster comms FD and
- dispatches them using cman_dispatch. Initially,
- the design had no permanent threads, but that model
- proved difficult to implement correctly.
- */
-static void *
-cluster_comms_thread(void *arg)
-{
- while (thread_running) {
- poll_cluster_messages(2);
- }
-
- return NULL;
-}
-
-
-/*
- Transliterates a CMAN event to a control message
- */
-static void
-process_cman_event(cman_handle_t handle, void *private, int reason, int arg)
-{
- cluster_msg_hdr_t *msg;
- int *argp;
- msg_q_t *node;
- msgctx_t *ctx;
-
- /* Allocate queue node */
- while ((node = malloc(sizeof(*node))) == NULL) {
- sleep(1);
- }
- memset(node, 0, sizeof(*node));
-
- /* Allocate message: header + int (for arg) */
- while ((msg = malloc(sizeof(int) +
- sizeof(cluster_msg_hdr_t))) == NULL) {
- sleep(1);
- }
- memset(msg, 0, sizeof(int)+sizeof(cluster_msg_hdr_t));
-
-
- switch(reason) {
-#if defined(LIBCMAN_VERSION) && LIBCMAN_VERSION >= 2
- case CMAN_REASON_PORTOPENED:
- msg->msg_control = M_PORTOPENED;
- break;
- case CMAN_REASON_TRY_SHUTDOWN:
- msg->msg_control = M_TRY_SHUTDOWN;
- break;
-#endif
- case CMAN_REASON_PORTCLOSED:
- msg->msg_control = M_PORTCLOSED;
- break;
- case CMAN_REASON_STATECHANGE:
- msg->msg_control = M_STATECHANGE;
- break;
- }
-
- argp = ((void *)msg + sizeof(cluster_msg_hdr_t));
- *argp = arg;
-
- node->len = sizeof(cluster_msg_hdr_t) + sizeof(int);
- node->message = msg;
-
- pthread_mutex_lock(&context_lock);
- ctx = contexts[0]; /* This is the cluster context... */
- if (!ctx) {
- /* We received a close for something we've already
- detached from our list. No big deal, just
- ignore. */
- free(node->message);
- free(node);
- pthread_mutex_unlock(&context_lock);
- return;
- }
-
- pthread_mutex_lock(&ctx->u.cluster_info.mutex);
- list_insert(&ctx->u.cluster_info.queue, node);
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- /* If a select pipe was set up, wake it up */
- if (ctx->u.cluster_info.select_pipe[1] >= 0)
- write(ctx->u.cluster_info.select_pipe[1], "", 1);
- pthread_mutex_unlock(&context_lock);
-
- pthread_cond_signal(&ctx->u.cluster_info.cond);
-}
-
-
-/* XXX INCOMPLETE */
+/* XXX Special case */
int
-msg_init(chandle_t *ch)
+msg_listen(int type, void *port, int me, msgctx_t **ctx)
{
- int e;
- pthread_attr_t attrs;
- msgctx_t *ctx;
-
- pthread_mutex_lock(&ch->c_lock);
-
- /* Set up local context */
-
- ctx = msg_new_ctx();
- if (!ctx) {
- pthread_mutex_unlock(&ch->c_lock);
+ errno = EINVAL;
+ if (!me)
return -1;
- }
-
- ctx->type = MSG_SOCKET;
- ctx->u.local_info.sockfd = local_listener_sk();
- ctx->u.local_info.flags = SKF_LISTEN;
-
- ch->local_ctx = ctx;
-
- ctx = msg_new_ctx();
-
- if (!ctx) {
- pthread_mutex_unlock(&ch->c_lock);
- msg_free_ctx((msgctx_t *)ch->local_ctx);
+ if (type == MSG_NONE)
return -1;
- }
-
- gch = ch;
-
- if (cman_start_recv_data(ch->c_cluster, process_cman_msg,
- RG_PORT) != 0) {
- e = errno;
- msg_close(ch->local_ctx);
- pthread_mutex_unlock(&ch->c_lock);
- msg_free_ctx((msgctx_t *)ch->local_ctx);
- msg_free_ctx((msgctx_t *)ch->cluster_ctx);
- errno = e;
+ if (!ctx)
return -1;
- }
- if (cman_start_notification(ch->c_cluster, process_cman_event) != 0) {
- e = errno;
- msg_close(ch->local_ctx);
- pthread_mutex_unlock(&ch->c_lock);
- msg_free_ctx((msgctx_t *)ch->local_ctx);
- msg_free_ctx((msgctx_t *)ch->cluster_ctx);
- errno = e;
+ if (type == MSG_CLUSTER) {
+ return cluster_msg_listen(me, port, ctx);
+ } else if (type == MSG_SOCKET) {
+ return sock_msg_listen(me, port, ctx);
}
- ch->cluster_ctx = ctx;
- pthread_mutex_unlock(&ch->c_lock);
-
- pthread_mutex_lock(&context_lock);
-
- memset(contexts, 0, sizeof(contexts));
- contexts[0] = ctx;
-
- ctx->type = MSG_CLUSTER;
- ctx->u.cluster_info.port = RG_PORT; /* port! */
- ctx->u.cluster_info.nodeid = 0; /* Broadcast! */
- ctx->u.cluster_info.select_pipe[0] = -1;
- ctx->u.cluster_info.select_pipe[1] = -1;
- pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
- pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
- pthread_mutex_unlock(&context_lock);
-
- pthread_attr_init(&attrs);
- pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED);
- pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
-
- thread_running = 1;
- pthread_create(&comms_thread, &attrs, cluster_comms_thread, NULL);
-
-
- pthread_attr_destroy(&attrs);
-
- return 0;
+ return -1;
}
-int
-msg_print_ctx(int ctx)
+void
+msg_print(msgctx_t *ctx)
{
- if (!contexts[ctx])
- return -1;
-
- printf("Cluster Message Context %d\n", ctx);
- printf(" Node ID %d\n", contexts[ctx]->u.cluster_info.nodeid);
- printf(" Remote %d\n", contexts[ctx]->u.cluster_info.remote_ctx);
- return 0;
+ if (ctx && ctx->ops && ctx->ops->mo_print)
+ ctx->ops->mo_print(ctx); /* void fn: ISO C forbids 'return expr;' here */
}
/* XXX INCOMPLETE */
int
-msg_shutdown(chandle_t *ch)
+msg_shutdown(void)
{
- if (!ch) {
- errno = EINVAL;
- return -1;
- }
-
- while (pthread_kill(comms_thread, 0) == 0)
- sleep(1);
-
- pthread_mutex_lock(&ch->c_lock);
-
- /* xxx purge everything */
- msg_close(ch->local_ctx);
- cman_end_recv_data(ch->c_cluster);
-
- msg_free_ctx(ch->local_ctx);
- msg_free_ctx(ch->cluster_ctx);
-
-
- pthread_mutex_unlock(&ch->c_lock);
+ sock_msg_shutdown();
+ cluster_msg_shutdown();
return 0;
}
@@ -1337,7 +279,6 @@
{
msgctx_t *p;
- printf("Alloc %d\n", sizeof(msgctx_t));
p = malloc(sizeof(msgctx_t));
if (!p)
return NULL;
@@ -1354,4 +295,3 @@
{
free(dead);
}
-
--- cluster/rgmanager/src/clulib/msgsimple.c 2006/06/02 17:37:10 1.5
+++ cluster/rgmanager/src/clulib/msgsimple.c 2006/07/11 23:52:41 1.6
@@ -76,9 +76,8 @@
msg_receive_simple(msgctx_t *ctx, generic_msg_hdr ** buf, int timeout)
{
int ret;
- char msgbuf[16384];
+ char msgbuf[4096];
generic_msg_hdr *peek_msg = (generic_msg_hdr *)msgbuf;
- int size;
/*
* Peek at the header. We need the size of the inbound buffer!
@@ -102,6 +101,7 @@
return -1;
}
+ /* Decode so we know how much to allocate */
swab_generic_msg_hdr(peek_msg);
if (peek_msg->gh_magic != GENERIC_HDR_MAGIC) {
fprintf(stderr, "Invalid magic: Wanted 0x%08x, got 0x%08x\n",
@@ -113,14 +113,15 @@
* allocate enough memory to receive the header + diff buffer
*/
*buf = malloc(peek_msg->gh_length);
-
if (!*buf) {
fprintf(stderr, "%s: malloc: %s", __FUNCTION__,
strerror(errno));
return -1;
}
- memset(*buf, 0, peek_msg->gh_length);
- memcpy(*buf, msgbuf + sizeof(generic_msg_hdr), peek_msg->gh_length);
+ memcpy(*buf, msgbuf, peek_msg->gh_length);
+
+ /* Put it back into the original order... */
+ swab_generic_msg_hdr((generic_msg_hdr *)(*buf));
return ret;
}
--- cluster/rgmanager/src/clulib/rg_strings.c 2005/03/07 17:25:12 1.3
+++ cluster/rgmanager/src/clulib/rg_strings.c 2006/07/11 23:52:41 1.4
@@ -1,3 +1,21 @@
+/*
+ Copyright Red Hat, Inc. 2004-2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
const char *rg_state_strings[] = {
"stopped",
"starting",
--- cluster/rgmanager/src/clulib/signals.c 2004/08/13 15:36:51 1.1
+++ cluster/rgmanager/src/clulib/signals.c 2006/07/11 23:52:41 1.2
@@ -1,3 +1,21 @@
+/*
+ Copyright Red Hat, Inc. 2003-2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
#include <signal.h>
#include <stdlib.h>
#include <string.h>
--- cluster/rgmanager/src/clulib/vft.c 2006/06/02 17:37:10 1.13
+++ cluster/rgmanager/src/clulib/vft.c 2006/07/11 23:52:41 1.14
@@ -1,5 +1,5 @@
/*
- Copyright Red Hat, Inc. 2002-2003
+ Copyright Red Hat, Inc. 2002-2006
This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
@@ -40,12 +40,8 @@
#include <stdio.h>
#include <assert.h>
#include <signals.h>
+#include <lock.h>
-
-int clu_lock_verbose(char *lockname, int flags, void **lockpp);
-
-static int vf_lfds[2];
-static int vf_lfd = 0;
static key_node_t *key_list = NULL; /** List of key nodes. */
static int _node_id = (int)-1;/** Our node ID, set with vf_init. */
static uint16_t _port = 0; /** Our daemon ID, set with vf_init. */
@@ -57,7 +53,7 @@
static pthread_mutex_t key_list_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t vf_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_t vf_thread = (pthread_t)-1;
-static int thread_ready = 0;
+static int vf_thread_ready = 0;
static vf_vote_cb_t default_vote_cb = NULL;
static vf_vote_cb_t default_commit_cb = NULL;
@@ -65,43 +61,42 @@
/*
* Internal Functions
*/
-static int send_to_all(msgctx_t *peer_ctx, int32_t command, int arg1, int arg2,
- int log_errors);
-static int vf_send_abort(msgctx_t *ctx);
-static int vf_send_commit(msgctx_t *ctx);
-static void close_all(msgctx_t *fds);
+static int _send_simple(msgctx_t *ctx, int32_t command, int arg1, int arg2,
+ int log_errors);
+static int vf_send_abort(msgctx_t *ctx, uint32_t trans);
+static int vf_send_commit(msgctx_t *ctx, uint32_t trans);
static key_node_t * kn_find_key(char *keyid);
-static key_node_t * kn_find_fd(msgctx_t *ctx);
-static int vf_handle_join_view_msg(msgctx_t *ctx, vf_msg_t * hdrp);
+static key_node_t * kn_find_trans(uint32_t trans);
+static int vf_handle_join_view_msg(msgctx_t *ctx, int nodeid, vf_msg_t * hdrp);
static int vf_resolve_views(key_node_t *key_node);
-static int vf_unanimous(msgctx_t *peer_ctx, int remain, int timeout);
-static view_node_t * vn_new(msgctx_t *ctx, uint32_t nodeid, int viewno,
+static int vf_unanimous(msgctx_t *ctx, int trans, int remain, int timeout);
+static view_node_t * vn_new(uint32_t trans, uint32_t nodeid, int viewno,
void *data, uint32_t datalen);
static int vf_request_current(cluster_member_list_t *membership, char *keyid,
- int *viewno, void **data, uint32_t *datalen);
-static int _vf_purge(key_node_t *key_node, msgctx_t **fd);
+ uint64_t *viewno, void **data, uint32_t *datalen);
+static int _vf_purge(key_node_t *key_node, uint32_t *trans);
/* Join-view buffer list functions */
static int vn_cmp(view_node_t *left, view_node_t *right);
static int vn_insert_sorted(view_node_t **head, view_node_t *node);
-static view_node_t * vn_remove(view_node_t **head, msgctx_t *ctx);
-static int vf_buffer_join_msg(int fd, vf_msg_t *hdr,
+static view_node_t * vn_remove(view_node_t **head, uint32_t trans);
+static int vf_buffer_join_msg(vf_msg_t *hdr,
struct timeval *timeout);
/* Commits buffer list functions */
static int vc_cmp(commit_node_t *left, commit_node_t *right);
static int vc_insert_sorted(commit_node_t **head, commit_node_t *node);
-static commit_node_t * vc_remove(commit_node_t **head, int fd);
-static int vf_buffer_commit(int fd);
+static commit_node_t * vc_remove(commit_node_t **head, uint32_t trans);
+static int vf_buffer_commit(uint32_t trans);
/* Simple functions which client calls to vote/abort */
-static int vf_vote_yes(msgctx_t *ctx);
-static int vf_vote_no(msgctx_t *ctx);
-static int vf_abort(msgctx_t *ctx);
+static int vf_vote_yes(msgctx_t *ctx, uint32_t trans);
+static int vf_vote_no(msgctx_t *ctx, uint32_t trans);
+static int vf_abort(uint32_t trans);
static int tv_cmp(struct timeval *left, struct timeval *right);
/* Resolution */
-static int vf_try_commit(key_node_t *key_node);
+static uint32_t vf_try_commit(key_node_t *key_node);
int vf_init(int my_node_id, uint16_t my_port,
vf_vote_cb_t vote_cb, vf_commit_cb_t commit_cb);
@@ -111,7 +106,7 @@
vf_commit_cb_t commit_cb);
int vf_write(cluster_member_list_t *memberhip, uint32_t flags,
char *keyid, void *data, uint32_t datalen);
-int vf_process_msg(int handle, generic_msg_hdr *msgp, int nbytes);
+int vf_process_msg(msgctx_t *ctx, int nodeid, generic_msg_hdr *msgp, int nbytes);
int vf_end(char *keyid);
int vf_read(cluster_member_list_t *membership, char *keyid, uint64_t *view,
void **data, uint32_t *datalen);
@@ -123,14 +118,14 @@
struct vf_args {
uint16_t port;
int local_node_id;
+ msgctx_t *ctx;
};
static int
-send_to_all(msgctx_t *peer_ctx, int32_t command, int arg1, int arg2, int log_errors)
+_send_simple(msgctx_t *ctx, int32_t command, int arg1, int arg2, int log_errors)
{
generic_msg_hdr hdr;
- int x, rv = 0;
hdr.gh_magic = GENERIC_HDR_MAGIC;
hdr.gh_length = sizeof(hdr);
@@ -140,51 +135,27 @@
swab_generic_msg_hdr(&hdr);
- for (x=0; peer_ctx[x].type != -1; x++) {
- if (msg_send(&peer_ctx[x], &hdr, sizeof(hdr)) == sizeof(hdr))
- continue;
-
- if (log_errors) {
-#if 0
- clulog(LOG_ERR, "#14: Failed to send %d "
- "bytes to %d!\n", sizeof(hdr),
- x);
-#endif
- }
- rv = -1;
- }
-
- return rv;
+ return msg_send(ctx, &hdr, sizeof(hdr));
}
static int
-vf_send_abort(msgctx_t *ctx)
+vf_send_abort(msgctx_t *ctx, uint32_t trans)
{
#ifdef DEBUG
- printf("VF: Broadcasting ABORT\n");
+ printf("VF: Broadcasting ABORT (X#%08x)\n", trans);
#endif
- return send_to_all(ctx, VF_MESSAGE, VF_ABORT, 0, 0);
+ return _send_simple(ctx, VF_MESSAGE, VF_ABORT, trans, 0);
}
static int
-vf_send_commit(msgctx_t *ctx)
+vf_send_commit(msgctx_t *ctx, uint32_t trans)
{
#ifdef DEBUG
printf("VF: Broadcasting FORMED\n");
#endif
- return send_to_all(ctx, VF_MESSAGE, VF_VIEW_FORMED, 0, 1);
-}
-
-
-static void
-close_all(msgctx_t *ctx)
-{
- int x;
- for (x = 0; ctx[x].type != -1; x++) {
- msg_close(&ctx[x]);
- }
+ return _send_simple(ctx, VF_MESSAGE, VF_VIEW_FORMED, trans, 1);
}
@@ -202,14 +173,14 @@
static key_node_t *
-kn_find_fd(msgctx_t *ctx)
+kn_find_trans(uint32_t trans)
{
key_node_t *cur;
view_node_t *curvn;
for (cur = key_list; cur; cur = cur->kn_next)
for (curvn = cur->kn_jvlist; curvn; curvn = curvn->vn_next)
- if (curvn->vn_ctx == ctx)
+ if (curvn->vn_transaction == trans)
return cur;
return NULL;
@@ -217,15 +188,17 @@
static int
-vf_handle_join_view_msg(msgctx_t *ctx, vf_msg_t * hdrp)
+vf_handle_join_view_msg(msgctx_t *ctx, int nodeid, vf_msg_t * hdrp)
{
struct timeval timeout;
key_node_t *key_node;
+ uint32_t trans;
+ trans = hdrp->vm_msg.vf_transaction;
#ifdef DEBUG
- printf("VF_JOIN_VIEW from member #%d! Key: %s #%d\n",
+ printf("VF_JOIN_VIEW from member #%d! Key: %s #%d (X#%08x)\n",
hdrp->vm_msg.vf_coordinator, hdrp->vm_msg.vf_keyid,
- (int) hdrp->vm_msg.vf_view);
+ (int) hdrp->vm_msg.vf_view, trans);
#endif
pthread_mutex_lock(&key_list_mutex);
@@ -241,7 +214,7 @@
pthread_mutex_unlock(&key_list_mutex);
printf("VF: Error: Failed to initialize %s\n",
hdrp->vm_msg.vf_keyid);
- vf_vote_no(ctx);
+ vf_vote_no(ctx, trans);
return VFR_ERROR;
}
@@ -258,7 +231,7 @@
#ifdef DEBUG
printf("VF: Voting NO (via callback)\n");
#endif
- vf_vote_no(ctx);
+ vf_vote_no(ctx, trans);
return VFR_OK;
}
}
@@ -269,12 +242,12 @@
timeout.tv_sec = key_node->kn_tsec;
timeout.tv_usec = 0;
- if (vf_buffer_join_msg(ctx, (vf_msg_t *) hdrp, &timeout)) {
+ if (vf_buffer_join_msg((vf_msg_t *) hdrp, &timeout)) {
pthread_mutex_unlock(&key_list_mutex);
#ifdef DEBUG
- printf("VF: Voting YES\n");
+ printf("VF: Voting YES (X#%08x)\n", trans);
#endif
- vf_vote_yes(ctx);
+ vf_vote_yes(ctx, trans);
return VFR_OK;
}
@@ -282,7 +255,7 @@
#ifdef DEBUG
printf("VF: Voting NO\n");
#endif
- vf_vote_no(ctx);
+ vf_vote_no(ctx, trans);
return VFR_NO;
}
@@ -297,13 +270,10 @@
int commits = 0;
void *data;
uint32_t datalen;
- msgctx_t *ctx;
+ uint32_t trans;
- while ((ctx = vf_try_commit(key_node)) != -1) {
-
- /* XXX in general, this shouldn't kill the fd... */
+ while ((trans = vf_try_commit(key_node)) != 0) {
commits++;
- msg_close(commitfd);
}
if (key_node->kn_commit_cb) {
@@ -327,12 +297,12 @@
static int
-vf_unanimous(msgctx_t *peer_ctx, int remain, int timeout)
+vf_unanimous(msgctx_t *mcast_ctx, int trans, int remain,
+ int timeout)
{
generic_msg_hdr response;
struct timeval tv;
- fd_set rfds;
- int nready, x, max;
+ int x;
/* Set up for the select */
tv.tv_sec = timeout;
@@ -346,69 +316,62 @@
* Flag hosts which we received messages from so we don't
* read a second message.
*/
- while (remain) {
- FD_ZERO(&rfds);
- for (x = 0; peer_ctx[x].type != M_NONE; x++)
- msg_fd_set(&peer_ctx[x], &rfds, &max);
-
- nready = select(max + 1, &rfds, NULL, NULL, &tv);
- if (nready <= -1) {
- if (nready == 0)
- printf("VF Abort: Timed out!\n");
- else
- printf("VF Abort: %s\n",
- strerror(errno));
- return 0;
+ while (remain && timeout) {
+
+ if (msg_wait(mcast_ctx, 5) <= 0) {
+ --timeout;
+ continue;
}
- for (x = 0; (peer_ctx[x].type != M_NONE) && nready; x++) {
- if (!msg_fd_isset(&peer_fds[x], &rfds))
- continue;
+ x = msg_receive(mcast_ctx, &response, sizeof(response), 1);
+ /*
+ * Compare as int: msg_receive() can return -1, which would be
+ * promoted to a huge size_t by a comparison against sizeof and
+ * slip through, leaving 'response' uninitialized.
+ */
+ if (x < (int)sizeof(response))
+ continue;
+
+ /*
+ * Decode & validate message
+ */
+ swab_generic_msg_hdr(&response);
+ if ((response.gh_magic != GENERIC_HDR_MAGIC) ||
+ (response.gh_command != VF_MESSAGE)) {
+ /* Don't process anything but votes */
+ continue;
+ }
- remain--;
- nready--;
- /*
- * Get reply from node x. XXX 1 second timeout?
- */
- if (msg_receive(&peer_ctx[x], &response,
- sizeof(response),
- 1) == -1) {
- printf("VF: Abort: Timed out during "
- "receive from fd #%p\n", &peer_ctx[x]);
- return 0;
- }
-
- /*
- * Decode & validate message
- */
- swab_generic_msg_hdr(&response);
- if ((response.gh_magic != GENERIC_HDR_MAGIC) ||
- (response.gh_command != VF_MESSAGE) ||
- (response.gh_arg1 != VF_VOTE)) {
- printf("VF: Abort: Invalid header in"
- " reply from fd #%p\n", &peer_fds[x]);
- return 0;
- }
-
+ if (vf_command(response.gh_arg1) != VF_VOTE)
+ /* Don't process anything but votes */
+ continue;
+
+ if (response.gh_arg2 != trans)
+ continue;
+
+ /*
+ * If we get a 'NO', we are done.
+ */
+ if (!(vf_flags(response.gh_arg1) & VFMF_AFFIRM)) {
/*
- * If we get a 'NO', we are done.
+ * XXX ok, it might be a mangled message;
+ * treat it as no anyway!
*/
- if (response.gh_arg2 != 1) {
- /*
- * XXX ok, it might be a mangled message;
- * treat it as no anyway!
- */
- printf("VF: Abort: fd #%d voted NO\n",
- peer_fds[x]);
- return 0;
- }
-
#ifdef DEBUG
- printf("VF: fd #%d voted YES\n", peer_fds[x]);
+ printf("VF: Abort: someone voted NO\n");
#endif
+ return 0;
}
+
+#ifdef DEBUG
+ printf("VF: YES\n");
+#endif
+ --remain;
}
+ if (remain) {
+#ifdef DEBUG
+ printf("VF: Timed out waiting for %d responses\n", remain);
+#endif
+ return 0;
+ }
+
+
/*
* Whohoooooooo!
*/
@@ -420,7 +383,7 @@
* ...
*/
static view_node_t *
-vn_new(msgctx_t *ctx, uint32_t nodeid, int viewno, void *data,
+vn_new(uint32_t trans, uint32_t nodeid, int viewno, void *data,
uint32_t datalen)
{
view_node_t *new;
@@ -433,7 +396,7 @@
memset(new,0,totallen);
- new->vn_ctx = ctx;
+ new->vn_transaction = trans;
new->vn_nodeid = nodeid;
new->vn_viewno = viewno;
new->vn_datalen = datalen;
@@ -502,7 +465,7 @@
static view_node_t *
-vn_remove(view_node_t **head, msgctx_t *ctx)
+vn_remove(view_node_t **head, uint32_t trans)
{
view_node_t *cur = *head, *back = NULL;
@@ -510,7 +473,7 @@
return NULL;
do {
- if (cur->vn_ctx == ctx) {
+ if (cur->vn_transaction == trans) {
if (back) {
back->vn_next = cur->vn_next;
cur->vn_next = NULL;
@@ -537,7 +500,7 @@
* (b) we don't receive any messages.
*/
static int
-vf_buffer_join_msg(msgctx_t *ctx, vf_msg_t *hdr, struct timeval *timeout)
+vf_buffer_join_msg(vf_msg_t *hdr, struct timeval *timeout)
{
key_node_t *key_node;
view_node_t *newp;
@@ -557,7 +520,8 @@
return 0;
}
- newp = vn_new(ctx, hdr->vm_msg.vf_coordinator, hdr->vm_msg.vf_view,
+ newp = vn_new(hdr->vm_msg.vf_transaction, hdr->vm_msg.vf_coordinator,
+ hdr->vm_msg.vf_view,
hdr->vm_msg.vf_data, hdr->vm_msg.vf_datalen);
if (timeout && (timeout->tv_sec || timeout->tv_usec)) {
@@ -585,10 +549,10 @@
static int
vc_cmp(commit_node_t *left, commit_node_t *right)
{
- if (left->vc_fd < right->vc_fd)
+ if (left->vc_transaction < right->vc_transaction)
return -1;
- if (left->vc_fd == right->vc_fd)
+ if (left->vc_transaction == right->vc_transaction)
return 0;
return 1;
@@ -637,7 +601,7 @@
static commit_node_t *
-vc_remove(commit_node_t **head, int fd)
+vc_remove(commit_node_t **head, uint32_t trans)
{
commit_node_t *cur = *head, *back = NULL;
@@ -645,7 +609,7 @@
return NULL;
do {
- if (cur->vc_fd == fd) {
+ if (cur->vc_transaction == trans) {
if (back) {
back->vc_next = cur->vc_next;
cur->vc_next = NULL;
@@ -671,13 +635,13 @@
* the last 'join-view' message.
*/
static int
-vf_buffer_commit(int fd)
+vf_buffer_commit(uint32_t trans)
{
key_node_t *key_node;
commit_node_t *newp;
int rv;
- key_node = kn_find_fd(fd);
+ key_node = kn_find_trans(trans);
if (!key_node)
return 0;
@@ -686,7 +650,7 @@
return 0;
newp->vc_next = NULL;
- newp->vc_fd = fd;
+ newp->vc_transaction = trans;
rv = vc_insert_sorted(&key_node->kn_clist, newp);
if (!rv)
@@ -697,31 +661,32 @@
static int
-vf_vote_yes(msgctx_t *ctx)
+vf_vote_yes(msgctx_t *ctx, uint32_t trans)
{
- return msg_send_simple(ctx, VF_MESSAGE, VF_VOTE, 1);
-
+ /* XXX */
+ return _send_simple(ctx, VF_MESSAGE, VF_VOTE | VFMF_AFFIRM, trans, 0);
}
static int
-vf_vote_no(msgctx_t *ctx)
+vf_vote_no(msgctx_t *ctx, uint32_t trans)
{
- return msg_send_simple(ctx, VF_MESSAGE, VF_VOTE, 0);
+ /* XXX */
+ return _send_simple(ctx, VF_MESSAGE, VF_VOTE, trans, 0);
}
static int
-vf_abort(msgctx_t *ctx)
+vf_abort(uint32_t trans)
{
key_node_t *key_node;
view_node_t *cur;
- key_node = kn_find_fd(ctx);
+ key_node = kn_find_trans(trans);
if (!key_node)
return -1;
- cur = vn_remove(&key_node->kn_jvlist, ctx);
+ cur = vn_remove(&key_node->kn_jvlist, trans);
if (!cur)
return -1;
@@ -787,30 +752,30 @@
/**
* Try to commit views in a given key_node.
*/
-static msgctx_t *
+static uint32_t
vf_try_commit(key_node_t *key_node)
{
view_node_t *vnp;
commit_node_t *cmp;
- msgctx_t *ctx = NULL;
+ uint32_t trans = 0;
if (!key_node)
- return NULL;
+ return 0;
if (!key_node->kn_jvlist)
- return NULL;
+ return 0;
- ctx = key_node->kn_jvlist->vn_ctx;
+ trans = key_node->kn_jvlist->vn_transaction;
- cmp = vc_remove(&key_node->kn_clist, ctx);
+ cmp = vc_remove(&key_node->kn_clist, trans);
if (!cmp) {
/*printf("VF: Commit for fd%d not received yet!", fd);*/
- return NULL;
+ return 0;
}
free(cmp); /* no need for it any longer */
- vnp = vn_remove(&key_node->kn_jvlist, ctx);
+ vnp = vn_remove(&key_node->kn_jvlist, trans);
if (!vnp) {
/*
* But, we know it was the first element on the list?!!
@@ -835,46 +800,27 @@
memcpy(key_node->kn_data, vnp->vn_data, vnp->vn_datalen);
free(vnp);
- return ctx;
+ return trans;
}
void
-vf_event_loop(int my_node_id)
+vf_event_loop(msgctx_t *ctx, int my_node_id)
{
- int max, nready, n, fd, flags;
- struct timeval tv;
- fd_set rfds;
+ int n;
generic_msg_hdr *hdrp = NULL;
- FD_ZERO(&rfds);
- max = msg_fill_fdset(&rfds, MSG_ALL, MSGP_VFS);
-
- tv.tv_sec = 1;
- tv.tv_usec = 0;
- nready = select(max + 1, &rfds, NULL, NULL, &tv);
- if (nready <= 0)
- return;
-
- while (nready) {
- fd = msg_next_fd(&rfds);
- --nready;
+ if (msg_wait(ctx, 3) != 0) {
- flags = msg_get_flags(fd);
-
- if (flags & MSG_LISTEN)
- fd = msg_accept(fd, 1, NULL);
-
- n = msg_receive_simple(fd, &hdrp, 5);
+ n = msg_receive_simple(ctx, &hdrp, 2);
if (n <= 0 || !hdrp) {
- msg_close(fd);
- continue;
+ return;
}
swab_generic_msg_hdr(hdrp);
if (hdrp->gh_command == VF_MESSAGE) {
- if (vf_process_msg(fd, hdrp, n) == VFR_COMMIT) {
+ if (vf_process_msg(ctx, 0, hdrp, n) == VFR_COMMIT) {
#ifdef DEBUG
printf("VFT: View committed\n");
#endif
@@ -893,7 +839,7 @@
vf_wait_ready(void)
{
pthread_mutex_lock(&vf_mutex);
- while (!thread_ready) {
+ while (!vf_thread_ready) {
pthread_mutex_unlock(&vf_mutex);
usleep(50000);
pthread_mutex_lock(&vf_mutex);
@@ -908,12 +854,14 @@
int my_node_id;
uint16_t port;
key_node_t *cur;
- int fd;
+ uint32_t trans;
+ msgctx_t *ctx;
block_all_signals();
port = ((struct vf_args *)arg)->port;
my_node_id = ((struct vf_args *)arg)->local_node_id;
+ ctx = ((struct vf_args *)arg)->ctx;
free(arg);
#ifdef DEBUG
@@ -921,29 +869,20 @@
#endif
pthread_mutex_lock(&vf_mutex);
-#if 0
- if ((vf_lfd = msg_listen(port, MSGP_VFS, vf_lfds, 2)) <= 0) {
- printf("Unable to set up listen socket on port %d\n",
- port);
- pthread_mutex_unlock(&vf_mutex);
- pthread_exit(NULL);
- }
-
- thread_ready = 1;
+ vf_thread_ready = 1;
pthread_mutex_unlock(&vf_mutex);
-#endif
- while (1) {
+ while (vf_thread_ready) {
pthread_mutex_lock(&key_list_mutex);
for (cur = key_list; cur; cur = cur->kn_next) {
/* Destroy timed-out join views */
- while (_vf_purge(cur, &fd) != VFR_NO) {
- msg_close(fd);
- }
+ while (_vf_purge(cur, &trans) != VFR_NO);
}
pthread_mutex_unlock(&key_list_mutex);
- vf_event_loop(my_node_id);
+ vf_event_loop(ctx, my_node_id);
}
+
+ msg_close(ctx);
return NULL;
}
@@ -952,40 +891,43 @@
/**
* Initialize VF. Initializes the View Formation sub system.
* @param my_node_id The node ID of the caller.
- * @param my_port The port of the caller.
* @return 0 on success, -1 on failure.
*/
int
vf_init(int my_node_id, uint16_t my_port, vf_vote_cb_t vcb,
vf_commit_cb_t ccb)
{
- struct vf_args *va;
-
+ struct vf_args *args;
+ msgctx_t *ctx;
if (my_node_id == (int)-1)
return -1;
+
+ while((ctx = msg_new_ctx()) == NULL)
+ sleep(1);
- if (my_port == 0)
- return -1;
+ while((args = malloc(sizeof(*args))) == NULL)
+ sleep(1);
- pthread_mutex_lock(&vf_mutex);
- if (vf_thread != (pthread_t)-1) {
- pthread_mutex_unlock(&vf_mutex);
- return 0;
+ if (msg_open(MSG_CLUSTER, 0, my_port, ctx, 1) < 0) {
+ free(ctx);
+ free(args);
+ return -1;
}
- va = malloc(sizeof(*va));
- va->local_node_id = my_node_id;
- va->port = my_port;
+ args->port = my_port;
+ args->local_node_id = my_node_id;
+ args->ctx = ctx;
- pthread_create(&vf_thread, NULL, vf_server, va);
- /* Write/read needs this */
+ pthread_mutex_lock(&vf_mutex);
_port = my_port;
_node_id = my_node_id;
default_vote_cb = vcb;
default_commit_cb = ccb;
pthread_mutex_unlock(&vf_mutex);
+ if (pthread_create(&vf_thread, NULL, vf_server, args) != 0) {
+ /*
+ * Only vf_server() sets vf_thread_ready, so without the
+ * thread vf_wait_ready() below would spin forever.
+ */
+ msg_close(ctx);
+ free(ctx);
+ free(args);
+ return -1;
+ }
+
 vf_wait_ready();
return 0;
@@ -998,20 +940,14 @@
int
vf_shutdown(void)
{
- int x;
key_node_t *c_key;
view_node_t *c_jv;
commit_node_t *c_cn;
pthread_mutex_lock(&vf_mutex);
+ vf_thread_ready = 0;
pthread_cancel(vf_thread);
pthread_join(vf_thread, NULL);
- thread_ready = 0;
- vf_thread = (pthread_t)0;
-
- for (x = 0 ; x < vf_lfd; x++)
- msg_close(vf_lfds[x]);
-
_port = 0;
_node_id = (int)-1;
pthread_mutex_lock(&key_list_mutex);
@@ -1019,7 +955,6 @@
while ((c_key = key_list) != NULL) {
while ((c_jv = c_key->kn_jvlist) != NULL) {
- msg_close(c_jv->vn_ctx);
key_list->kn_jvlist = c_jv->vn_next;
free(c_jv);
}
@@ -1120,7 +1055,7 @@
vf_msg_t *
build_vf_data_message(int cmd, char *keyid, void *data, uint32_t datalen,
- int viewno, uint32_t *retlen)
+ int viewno, int trans, uint32_t *retlen)
{
uint32_t totallen;
vf_msg_t *msg;
@@ -1142,6 +1077,7 @@
/* Data */
strncpy(msg->vm_msg.vf_keyid,keyid,sizeof(msg->vm_msg.vf_keyid));
+ msg->vm_msg.vf_transaction = trans;
msg->vm_msg.vf_datalen = datalen;
msg->vm_msg.vf_coordinator = _node_id;
msg->vm_msg.vf_view = viewno;
@@ -1171,89 +1107,52 @@
vf_write(cluster_member_list_t *membership, uint32_t flags, char *keyid,
void *data, uint32_t datalen)
{
- int nodeid;
- msgctx_t *peer_ctx;
- int count;
+ msgctx_t everyone;
key_node_t *key_node;
vf_msg_t *join_view;
- int remain = 0, x, y, rv = 1;
+ int remain = 0, x, y, rv = VFR_ERROR;
uint32_t totallen;
+#ifdef DEBUG
struct timeval start, end, dif;
- void *lockp = NULL;
+#endif
+ struct dlm_lksb lockp;
int l;
char lock_name[256];
+ static uint32_t trans = 0;
if (!data || !datalen || !keyid || !strlen(keyid) || !membership)
return -1;
+
pthread_mutex_lock(&vf_mutex);
+ if (!trans) {
+ /*
+ * Seed the per-node transaction counter from our node ID.
+ * Shift as unsigned: _node_id is a signed int (-1 until
+ * vf_init), and left-shifting a negative value, or shifting
+ * into the sign bit, is undefined behavior.
+ */
+ trans = (uint32_t)_node_id << 16;
+ }
+ ++trans;
+
/* Obtain cluster lock on it. */
snprintf(lock_name, sizeof(lock_name), "usrm::vf");
- l = clu_lock_verbose(lock_name, CLK_EX, &lockp);
+ l = clu_lock(LKM_EXMODE, &lockp, 0, lock_name);
if (l < 0) {
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
return l;
}
- /* set to -1 */
- count = sizeof(msgctx_t) * (membership->cml_count + 1);
- peer_ctx = malloc(count);
- if(!peer_ctx) {
- pthread_mutex_unlock(&vf_mutex);
- return -1;
- }
-
- memset(peer_ctx, 0, sizeof(msgctx_t) * (membership->cml_count +1));
- for (x = 0; x < (membership->cml_count + 1); x++) {
- peer_ctx[x].type = M_NONE;
- }
+#ifdef DEBUG
getuptime(&start);
+#endif
-retry_top:
- /*
- * Connect to everyone, except ourself. We separate this from the
- * initial send cycle because the connect cycle can cause timeouts
- * within the code - ie, if a node is down, it is likely the connect
- * will take longer than the client is expecting to wait for the
- * commit/abort messages!
- *
- * We assume we're up. Since challenge-response needs both
- * processes to be operational...
- */
+ remain = 0;
for (x = 0, y = 0; x < membership->cml_count; x++) {
- if (!memb_online(membership,
- membership->cml_members[x].cn_nodeid)) {
- continue;
+ if (membership->cml_members[x].cn_member) {
+ remain++;
}
+ }
- if (peer_fds[x].type != M_NONE)
- continue;
-
- nodeid = membership->cml_members[x].cn_nodeid;
-#ifdef DEBUG
- printf("VF: Connecting to member #%d\n", (int)nodeid);
- fflush(stdout);
-#endif
-
- if (msg_open(nodeid, _port, &peer_ctx[y], 15) != 0) {
#ifdef DEBUG
- printf("VF: Connect to %d failed: %s\n", (int)nodeid,
- strerror(errno));
+ printf("aight, need responses from %d guys\n", remain);
#endif
- if (flags & VFF_RETRY)
- goto retry_top;
- if (flags & VFF_IGN_CONN_ERRORS)
- continue;
- free(peer_ctx);
-
- clu_unlock(lock_name, lockp);
- pthread_mutex_unlock(&vf_mutex);
- return -1;
- }
-
- ++y;
- }
pthread_mutex_lock(&key_list_mutex);
key_node = kn_find_key(keyid);
@@ -1261,7 +1160,7 @@
if ((vf_key_init_nt(keyid, 10, NULL, NULL) < 0)) {
pthread_mutex_unlock(&key_list_mutex);
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
return -1;
}
@@ -1270,19 +1169,19 @@
}
join_view = build_vf_data_message(VF_JOIN_VIEW, keyid, data, datalen,
- key_node->kn_viewno+1, &totallen);
+ key_node->kn_viewno+1, trans, &totallen);
pthread_mutex_unlock(&key_list_mutex);
if (!join_view) {
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
return -1;
}
#ifdef DEBUG
- printf("VF: Push %d.%d #%d\n", (int)_node_id, getpid(),
- (int)join_view->vm_msg.vf_view);
+ printf("VF: Push %d.%d #%d (X#%08x)\n", (int)_node_id, getpid(),
+ (int)join_view->vm_msg.vf_view, trans);
#endif
/*
* Encode the package.
@@ -1292,50 +1191,53 @@
/*
* Send our message to everyone
*/
- for (x = 0; peer_ctx[x].type != M_NONE; x++) {
-
- if (msg_send(&peer_ctx[x], join_view, totallen) != totallen) {
- vf_send_abort(peer_ctx);
- close_all(peer_ctx);
-
- free(join_view);
- clu_unlock(lock_name, lockp);
- pthread_mutex_unlock(&vf_mutex);
- return -1;
- }
-
- remain++;
+ if (msg_open(MSG_CLUSTER, 0, _port, &everyone, 0) < 0) {
+ printf("msg_open: fail: %s\n", strerror(errno));
+ /*
+ * Release everything acquired above; returning with vf_mutex
+ * and the cluster lock held would deadlock the next
+ * vf_write()/vf_read().
+ */
+ free(join_view);
+ clu_unlock(&lockp);
+ pthread_mutex_unlock(&vf_mutex);
+ return -1;
 }
+ x = msg_send(&everyone, join_view, totallen);
+ /*
+ * x is a signed int and totallen is uint32_t: test for the -1
+ * error return explicitly, otherwise it is converted to
+ * UINT32_MAX by the comparison and treated as a full send.
+ */
+ if (x < 0 || (uint32_t)x < totallen) {
+ vf_send_abort(&everyone, trans);
+#ifdef DEBUG
+ printf("VF: Aborted: Send failed (%d/%d)\n", x, totallen);
+#endif
+ msg_close(&everyone);
+ free(join_view);
+ clu_unlock(&lockp);
+ pthread_mutex_unlock(&vf_mutex);
+ return -1;
+ }
+
#ifdef DEBUG
printf("VF: Checking for consensus...\n");
#endif
/*
* See if we have a consensus =)
*/
- if ((rv = (vf_unanimous(peer_ctx, remain, VF_COORD_TIMEOUT)))) {
- vf_send_commit(peer_ctx);
+ if ((rv = (vf_unanimous(&everyone, trans, remain,
+ 5)))) {
+ vf_send_commit(&everyone, trans);
+#ifdef DEBUG
+ printf("VF: Consensus reached!\n");
+#endif
} else {
- vf_send_abort(peer_ctx);
+ vf_send_abort(&everyone, trans);
#ifdef DEBUG
printf("VF: Aborted!\n");
#endif
}
/*
- * Clean up
- */
- close_all(peer_fds);
-
- /*
* unanimous returns 1 for true; 0 for false, so negate it and
* return our value...
*/
+ msg_close(&everyone);
free(join_view);
- free(peer_fds);
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
+#ifdef DEBUG
if (rv) {
getuptime(&end);
@@ -1347,11 +1249,10 @@
dif.tv_sec--;
}
-#ifdef DEBUG
printf("VF: Converge Time: %d.%06d\n", (int)dif.tv_sec,
(int)dif.tv_usec);
-#endif
}
+#endif
return (rv?0:-1);
}
@@ -1378,12 +1279,12 @@
* VFR_COMMIT if new views were committed.
*/
static int
-_vf_purge(key_node_t *key_node, msgctx_t **ctx)
+_vf_purge(key_node_t *key_node, uint32_t *trans)
{
view_node_t *cur, *dead = NULL;
struct timeval tv;
- *fd = -1;
+ *trans = 0;
if (!key_node)
return VFR_NO;
@@ -1401,11 +1302,11 @@
if (tv_cmp(&tv, &cur->vn_timeout) < 0)
continue;
- *ctx = cur->vn_ctx;
- dead = vn_remove(&key_node->kn_jvlist, *ctx);
+ *trans = cur->vn_transaction;
+ dead = vn_remove(&key_node->kn_jvlist, *trans);
free(dead);
- printf("VF: Killed ctx %p\n", *ctx);
+ printf("VF: Killed transaction %08x\n", *trans);
/*
* returns the removed associated file descriptor
* so that we can close it and get on with life
@@ -1413,7 +1314,7 @@
break;
}
- if (*fd == -1)
+ if (*trans == 0)
return VFR_NO;
if (vf_resolve_views(key_node))
@@ -1425,13 +1326,13 @@
/**
* Process a VF message.
*
- * @param handle File descriptor on which msgp was received.
+ * @param nodeid Node id from which msgp was received.
* @param msgp Pointer to already-received message.
* @param nbytes Length of msgp.
* @return -1 on failure, 0 on success.
*/
int
-vf_process_msg(int handle, generic_msg_hdr *msgp, int nbytes)
+vf_process_msg(msgctx_t *ctx, int nodeid, generic_msg_hdr *msgp, int nbytes)
{
vf_msg_t *hdrp;
int ret;
@@ -1440,7 +1341,7 @@
(msgp->gh_command != VF_MESSAGE))
return VFR_ERROR;
- switch(msgp->gh_arg1) {
+ switch(vf_command(msgp->gh_arg1)) {
case VF_CURRENT:
#ifdef DEBUG
printf("VF: Received request for current data\n");
@@ -1455,7 +1356,7 @@
hdrp = (vf_msg_t *)msgp;
swab_vf_msg_info_t(&hdrp->vm_msg);
- return vf_send_current(handle, hdrp->vm_msg.vf_keyid);
+ return vf_send_current(ctx, hdrp->vm_msg.vf_keyid);
case VF_JOIN_VIEW:
/* Validate size... */
@@ -1475,28 +1376,28 @@
return VFR_ERROR;
}
- return vf_handle_join_view_msg(handle, hdrp);
+ return vf_handle_join_view_msg(ctx, nodeid, hdrp);
case VF_ABORT:
- printf("VF: Received VF_ABORT, fd%d\n", handle);
- vf_abort(handle);
+ printf("VF: Received VF_ABORT (X#%08x)\n", msgp->gh_arg2);
+ vf_abort(msgp->gh_arg2);
return VFR_ABORT;
case VF_VIEW_FORMED:
#ifdef DEBUG
- printf("VF: Received VF_VIEW_FORMED, fd%d\n",
- handle);
+ printf("VF: Received VF_VIEW_FORMED, %d\n",
+ nodeid);
#endif
pthread_mutex_lock(&key_list_mutex);
- vf_buffer_commit(handle);
- ret = (vf_resolve_views(kn_find_fd(handle)) ?
+ vf_buffer_commit(msgp->gh_arg2);
+ ret = (vf_resolve_views(kn_find_trans(msgp->gh_arg2)) ?
VFR_COMMIT : VFR_OK);
pthread_mutex_unlock(&key_list_mutex);
return ret;
-
+
default:
- printf("VF: Unknown msg type 0x%08x\n",
- msgp->gh_arg1);
+ /* Ignore votes and the like from this part */
+ break;
}
return VFR_OK;
@@ -1521,15 +1422,15 @@
{
key_node_t *key_node;
char lock_name[256];
- void *lockp = NULL;
+ struct dlm_lksb lockp;
int l;
/* Obtain cluster lock on it. */
pthread_mutex_lock(&vf_mutex);
snprintf(lock_name, sizeof(lock_name), "usrm::vf");
- l = clu_lock_verbose(lock_name, CLK_EX, &lockp);
+ l = clu_lock(LKM_EXMODE, &lockp, 0, lock_name);
if (l < 0) {
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
printf("Couldn't lock %s\n", keyid);
return l;
@@ -1542,7 +1443,7 @@
if (!key_node) {
if ((vf_key_init_nt(keyid, 10, NULL, NULL) < 0)) {
pthread_mutex_unlock(&key_list_mutex);
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
printf("Couldn't locate %s\n", keyid);
return VFR_ERROR;
@@ -1564,7 +1465,7 @@
pthread_mutex_unlock(&key_list_mutex);
if (!membership) {
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
//printf("Membership NULL, can't find %s\n", keyid);
pthread_mutex_unlock(&vf_mutex);
return VFR_ERROR;
@@ -1573,7 +1474,7 @@
l = vf_request_current(membership, keyid, view, data,
datalen);
if (l == VFR_NODATA || l == VFR_ERROR) {
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
//printf("Requesting current failed %s %d\n", keyid, l);
pthread_mutex_unlock(&vf_mutex);
return l;
@@ -1583,7 +1484,7 @@
*data = malloc(key_node->kn_datalen);
if (! *data) {
pthread_mutex_unlock(&key_list_mutex);
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
printf("Couldn't malloc %s\n", keyid);
return VFR_ERROR;
@@ -1594,7 +1495,7 @@
*view = key_node->kn_viewno;
pthread_mutex_unlock(&key_list_mutex);
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
return VFR_OK;
@@ -1659,7 +1560,7 @@
if (!key_node || !key_node->kn_data || !key_node->kn_datalen) {
pthread_mutex_unlock(&key_list_mutex);
printf("VFT: No data for keyid %s\n", keyid);
- return (msg_send_simple(ctx, VF_NACK, 0, 0) != -1)?
+ return (_send_simple(ctx, VF_NACK, 0, 0, 0) != -1)?
VFR_OK : VFR_ERROR;
}
@@ -1670,11 +1571,12 @@
msg = build_vf_data_message(VF_ACK, keyid, key_node->kn_data,
key_node->kn_datalen,
key_node->kn_viewno,
+ 0,
&totallen);
pthread_mutex_unlock(&key_list_mutex);
if (!msg)
- return (msg_send_simple(ctx, VFR_ERROR, 0, 0) != -1)?
+ return (_send_simple(ctx, VFR_ERROR, 0, 0, 0) != -1)?
VFR_OK : VFR_ERROR;
swab_vf_msg_t(msg);
@@ -1733,7 +1635,7 @@
*/
static int
vf_request_current(cluster_member_list_t *membership, char *keyid,
- int *viewno, void **data, uint32_t *datalen)
+ uint64_t *viewno, void **data, uint32_t *datalen)
{
int x, n, rv = VFR_OK, port;
msgctx_t ctx;
@@ -1761,8 +1663,7 @@
swab_vf_msg_info_t(&(msg->vm_msg));
for (x = 0; x < membership->cml_count; x++) {
- if (!memb_online(membership,
- membership->cml_members[x].cn_nodeid))
+ if (!membership->cml_members[x].cn_member)
continue;
/* Can't request from self. */
@@ -1770,10 +1671,11 @@
continue;
rv = VFR_ERROR;
- fd = msg_open(membership->cml_members[x].cn_nodeid, port,
- &ctx, 15);
- if (fd == -1)
+ if (msg_open(MSG_CLUSTER,
+ membership->cml_members[x].cn_nodeid,
+ port, &ctx, 15) < 0) {
continue;
+ }
msg = &rmsg;
//printf("VF: Requesting current value of %s from %d\n",
/cvs/cluster/cluster/rgmanager/src/daemons/rg_event.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/daemons/rg_event.c
+++ - 2006-07-11 23:52:45.033199000 +0000
@@ -0,0 +1,103 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
+#include <resgroup.h>
+#include <rg_locks.h>
+#include <gettid.h>
+#include <assert.h>
+#include <libcman.h>
+#include <ccs.h>
+#include <clulog.h>
+
+typedef struct rge_q {		/* one queued service event */
+	list_head();		/* linkage for list_insert()/list_remove() */
+	char rg_name[128];	/* service name, copied in rg_event_q() */
+	uint32_t rg_state;	/* state value handed to group_event() */
+	int rg_owner;		/* owner node ID carried by the event */
+} rgevent_t;
+
+
+/**
+ * Resource group event queue; rg_ev_queue/rg_ev_thread use rg_queue_mutex.
+ */
+static rgevent_t *rg_ev_queue = NULL;
+static pthread_mutex_t rg_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_t rg_ev_thread = 0;	/* 0: no worker thread running */
+/* Must match the int-returning definition in groups.c */
+int group_event(char *name, uint32_t state, int owner);
+
+/* Worker: drain rg_ev_queue through group_event(), then exit. */
+void *
+rg_event_thread(void *arg)
+{
+	rgevent_t *next;
+
+	for (;;) {
+		pthread_mutex_lock(&rg_queue_mutex);
+		next = rg_ev_queue;
+		if (next == NULL)
+			break;	/* queue empty: leave with the mutex held */
+		list_remove(&rg_ev_queue, next);
+		pthread_mutex_unlock(&rg_queue_mutex);
+
+		/* The handler runs without the queue lock held */
+		group_event(next->rg_name, next->rg_state, next->rg_owner);
+		free(next);
+	}
+
+	/* Still holding rg_queue_mutex: reset the handle so the next
+	   rg_event_q() call spawns a new worker (no lost wakeup). */
+	rg_ev_thread = 0;
+	pthread_mutex_unlock(&rg_queue_mutex);
+	return NULL;
+}
+
+/* Queue a service event, spawning rg_event_thread() if idle. Returns 0. */
+int
+rg_event_q(char *name, uint32_t state, int owner)
+{
+	rgevent_t *ev;
+	pthread_attr_t attrs;
+
+	/* Never drop an event: retry the allocation until it succeeds */
+	while ((ev = malloc(sizeof(*ev))) == NULL)
+		sleep(1);
+	memset(ev, 0, sizeof(*ev));
+
+	/* Copy one byte short; memset left the final byte NUL */
+	strncpy(ev->rg_name, name, sizeof(ev->rg_name) - 1);
+	ev->rg_state = state;
+	ev->rg_owner = owner;
+
+	pthread_mutex_lock(&rg_queue_mutex);
+	list_insert(&rg_ev_queue, ev);
+	if (rg_ev_thread == 0) {
+		pthread_attr_init(&attrs);
+		pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED);
+		pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+		pthread_attr_setstacksize(&attrs, 262144);
+
+		/* Handle is unspecified on failure: restore the 0 sentinel so
+		   the next call retries; the event stays queued meanwhile. */
+		if (pthread_create(&rg_ev_thread, &attrs, rg_event_thread, NULL))
+			rg_ev_thread = 0;
+		pthread_attr_destroy(&attrs);
+	}
+	pthread_mutex_unlock(&rg_queue_mutex);
+	return 0;
+}
--- cluster/rgmanager/src/daemons/Makefile 2006/06/02 17:37:10 1.12
+++ cluster/rgmanager/src/daemons/Makefile 2006/07/11 23:52:41 1.13
@@ -41,8 +41,8 @@
clurgmgrd: rg_thread.o rg_locks.o main.o groups.o \
rg_queue.o rg_forward.o reslist.o \
resrules.o restree.o fo_domain.o nodeevent.o \
- watchdog.o rg_state.o
- $(CC) -o $@ $^ $(INCLUDE) $(CFLAGS) $(LDFLAGS) -lccs -lcman
+ rg_event.o watchdog.o rg_state.o ../clulib/libclulib.a
+ $(CC) -o $@ $^ $(INCLUDE) $(CFLAGS) $(LDFLAGS) -lccs -lcman -lpthread -ldlm
#
# Our test program links against the local allocator so that
--- cluster/rgmanager/src/daemons/fo_domain.c 2006/06/02 17:37:10 1.8
+++ cluster/rgmanager/src/daemons/fo_domain.c 2006/07/11 23:52:41 1.9
@@ -334,7 +334,7 @@
int found = 0;
int owned_by_node = 0, started = 0, no_owner = 0;
rg_state_t svc_state;
- void *lockp;
+ struct dlm_lksb lockp;
ENTER();
@@ -414,10 +414,10 @@
*/
clulog(LOG_WARNING, "Problem getting state information for "
"%s\n", rg_name);
- rg_unlock(rg_name, lockp);
+ rg_unlock(&lockp);
RETURN(FOD_BEST);
}
- rg_unlock(rg_name, lockp);
+ rg_unlock(&lockp);
/*
* Check to see if the service is started and if we are the owner in case of
--- cluster/rgmanager/src/daemons/groups.c 2006/06/02 17:37:10 1.18
+++ cluster/rgmanager/src/daemons/groups.c 2006/07/11 23:52:41 1.19
@@ -29,9 +29,12 @@
#include <reslist.h>
#include <assert.h>
-#define cm_svccount cm_pad[0] /* Theses are uint8_t size */
-#define cm_svcexcl cm_pad[1]
+/* Use address field in this because we never use it internally,
+ and there is no extra space in the cman_node_t type.
+ */
+#define cn_svccount cn_address.cna_address[0] /* Theses are uint8_t size */
+#define cn_svcexcl cn_address.cna_address[1]
static int config_version = 0;
static resource_t *_resources = NULL;
@@ -42,6 +45,9 @@
pthread_mutex_t config_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_rwlock_t resource_lock = PTHREAD_RWLOCK_INITIALIZER;
+void res_build_name(char *, size_t, resource_t *);
+int get_rg_state_local(char *, rg_state_t *);
+
struct status_arg {
msgctx_t *ctx;
@@ -71,17 +77,16 @@
int
count_resource_groups(cluster_member_list_t *ml)
{
-#if 0
resource_t *res;
char *rgname, *val;
int x;
rg_state_t st;
- void *lockp;
+ struct dlm_lksb lockp;
cman_node_t *mp;
for (x = 0; x < ml->cml_count; x++) {
- ml->cml_members[x].cm_svccount = 0;
- ml->cml_members[x].cm_svcexcl = 0;
+ ml->cml_members[x].cn_svccount = 0;
+ ml->cml_members[x].cn_svcexcl = 0;
}
pthread_rwlock_rdlock(&resource_lock);
@@ -102,11 +107,11 @@
if (get_rg_state(rgname, &st) < 0) {
clulog(LOG_ERR, "#34: Cannot get status "
"for service %s\n", rgname);
- rg_unlock(rgname, lockp);
+ rg_unlock(&lockp);
continue;
}
- rg_unlock(rgname, lockp);
+ rg_unlock(&lockp);
if (st.rs_state != RG_STATE_STARTED &&
st.rs_state != RG_STATE_STARTING)
@@ -116,18 +121,17 @@
if (!mp)
continue;
- ++mp->cm_svccount;
+ ++mp->cn_svccount;
val = res_attr_value(res, "exclusive");
if (val && ((!strcmp(val, "yes") ||
(atoi(val)>0))) ) {
- ++mp->cm_svcexcl;
+ ++mp->cn_svcexcl;
}
} while (!list_done(&_resources, res));
pthread_rwlock_unlock(&resource_lock);
-#endif
return 0;
}
@@ -189,13 +193,13 @@
if (exclusive) {
- if (0) {//(allowed->cml_members[x].cm_svccount > 0) {
+ if (allowed->cml_members[x].cn_svccount > 0) {
/* Definitely not this guy */
continue;
} else {
score += 2;
}
- } else if (0) { //(allowed->cml_members[x].cm_svcexcl) {
+ } else if (allowed->cml_members[x].cn_svcexcl) {
/* This guy has an exclusive resource group.
Can't relocate / failover to him. */
continue;
@@ -212,6 +216,42 @@
}
+/*
+ * Result: -1 no "depend" attribute; 1 dependency STARTED, or its local
+ * state could not be read; 0 dependency known but not STARTED.
+ */
+int
+check_depend(resource_t *res)
+{
+	char *dep = res_attr_value(res, "depend");
+	rg_state_t dep_state;
+
+	if (dep == NULL)
+		return -1;
+	if (get_rg_state_local(dep, &dep_state) != 0)
+		return 1;	/* NOTE(review): unreadable state counts as met */
+	return (dep_state.rs_state == RG_STATE_STARTED) ? 1 : 0;
+}
+
+
+/* check_depend() under resource_lock (read). -1 if find_root_by_ref()
+   finds nothing for rg_name; the lock is released on every path. */
+int
+check_depend_safe(char *rg_name)
+{
+	resource_t *res;
+	int ret = -1;
+
+	pthread_rwlock_rdlock(&resource_lock);
+	res = find_root_by_ref(&_resources, rg_name);
+	if (res != NULL)
+		ret = check_depend(res);
+	pthread_rwlock_unlock(&resource_lock);
+
+	return ret;
+}
+
+
/**
Start or failback a resource group: if it's not running, start it.
If it is running and we're a better member to run it, then ask for
@@ -224,7 +264,7 @@
char *val;
cman_node_t *mp;
int autostart, exclusive;
- void *lockp;
+ struct dlm_lksb lockp;
mp = memb_id_to_p(membership, my_id());
assert(mp);
@@ -267,7 +307,7 @@
if (get_rg_state(svcName, svcStatus) != 0) {
clulog(LOG_ERR, "#34: Cannot get status "
"for service %s\n", svcName);
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return;
}
@@ -277,17 +317,25 @@
set_rg_state(svcName, svcStatus);
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return;
}
}
+ /* See if service this one depends on is running. If not,
+ don't start it */
+ if (check_depend(node->rn_resource) == 0) {
+ clulog(LOG_DEBUG,
+ "Skipping RG %s: Dependency missing\n", svcName);
+ return;
+ }
+
val = res_attr_value(node->rn_resource, "exclusive");
exclusive = val && ((!strcmp(val, "yes") || (atoi(val)>0)));
- if (0) { //(exclusive && mp->cm_svccount) {
- clulog(LOG_INFO,
+ if (exclusive && mp->cn_svccount) {
+ clulog(LOG_DEBUG,
"Skipping RG %s: Exclusive and I am running services\n",
svcName);
return;
@@ -297,8 +345,8 @@
Don't start other services if I'm running an exclusive
service.
*/
- if (0) { //(mp->cm_svcexcl) {
- clulog(LOG_INFO,
+ if (mp->cn_svcexcl) {
+ clulog(LOG_DEBUG,
"Skipping RG %s: I am running an exclusive service\n",
svcName);
return;
@@ -363,8 +411,8 @@
int
eval_groups(int local, uint64_t nodeid, int nodeStatus)
{
- void *lockp = NULL;
- char *svcName, *nodeName;
+ struct dlm_lksb lockp;
+ char svcName[64], *nodeName;
resource_node_t *node;
rg_state_t svcStatus;
cluster_member_list_t *membership;
@@ -385,10 +433,7 @@
list_do(&_tree, node) {
- if (node->rn_resource->r_rule->rr_root == 0)
- continue;
-
- svcName = node->rn_resource->r_attrs->ra_value;
+ res_build_name(svcName, sizeof(svcName), node->rn_resource);
/*
* Lock the service information and get the current service
@@ -407,11 +452,11 @@
clulog(LOG_ERR,
"#34: Cannot get status for service %s\n",
svcName);
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
continue;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
if (svcStatus.rs_owner == 0)
nodeName = "none";
@@ -461,7 +506,7 @@
pthread_rwlock_unlock(&resource_lock);
free_member_list(membership);
- clulog(LOG_INFO, "Event (%d:%d:%d) Processed\n", local,
+ clulog(LOG_DEBUG, "Event (%d:%d:%d) Processed\n", local,
(int)nodeid, nodeStatus);
return 0;
@@ -469,6 +514,105 @@
/**
+ * Called to decide what services to start locally after a service event.
+ *
+ * @see eval_groups
+ */
+int
+group_event(char *rg_name, uint32_t state, int owner)
+{
+	char svcName[64];
+	char *nodeName;
+	resource_node_t *node;
+	rg_state_t svcStatus;
+	cluster_member_list_t *membership;
+	int depend;
+
+	/* rg_locked(): skip evaluation entirely */
+	if (rg_locked()) {
+		clulog(LOG_NOTICE,
+		       "Resource groups locked; not evaluating\n");
+		return -EAGAIN;
+	}
+
+	membership = member_list();
+	if (membership == NULL)
+		return -1;
+
+	pthread_rwlock_rdlock(&resource_lock);
+
+	/* Refresh the per-node counters that consider_start() reads.
+	   NOTE(review): count_resource_groups() itself read-locks
+	   resource_lock again, i.e. the read lock is taken recursively. */
+	count_resource_groups(membership);
+
+	list_do(&_tree, node) {
+		/* Build this service's name from its resource */
+		res_build_name(svcName, sizeof(svcName), node->rn_resource);
+
+		/* Local state lookup (no rg_lock()); skip on failure */
+		if (get_rg_state_local(svcName, &svcStatus) != 0)
+			continue;
+
+		/* Owner name; only used by the debug message below */
+		nodeName = (svcStatus.rs_owner != 0) ?
+			   memb_id_to_name(membership, svcStatus.rs_owner) :
+			   "none";
+
+		/* Leave disabled, failed and recovering services alone */
+		switch (svcStatus.rs_state) {
+		case RG_STATE_DISABLED:
+		case RG_STATE_FAILED:
+		case RG_STATE_RECOVER:
+			continue;
+		default:
+			break;
+		}
+
+		/* -1: no "depend" attribute, nothing to evaluate */
+		depend = check_depend(node->rn_resource);
+		if (depend == -1)
+			continue;
+
+		/*
+		 * Dependency met, this service is STOPPED and the event
+		 * reports a STARTED service: maybe start this one too.
+		 */
+		if (depend == 1 &&
+		    svcStatus.rs_state == RG_STATE_STOPPED &&
+		    state == RG_STATE_STARTED) {
+			clulog(LOG_DEBUG, "Evaluating RG %s, state %s, owner "
+			       "%s\n", svcName,
+			       rg_state_str(svcStatus.rs_state),
+			       nodeName);
+			consider_start(node, svcName, &svcStatus, membership);
+			continue;
+		}
+
+		/*
+		 * Dependency lost while this node runs the service:
+		 * queue a stop request.
+		 */
+		if (depend == 0 &&
+		    svcStatus.rs_state == RG_STATE_STARTED &&
+		    svcStatus.rs_owner == my_id()) {
+			clulog(LOG_WARNING,
+			       "Stopping service %s: Dependency missing\n",
+			       svcName);
+			rt_enqueue_request(svcName, RG_STOP, NULL, 0, my_id(),
+					   0, 0);
+		}
+	} while (!list_done(&_tree, node));
+
+	pthread_rwlock_unlock(&resource_lock);
+	free_member_list(membership);
+
+	return 0;
+}
+
+
+
+/**
Perform an operation on a resource group. That is, walk down the
tree for that resource group, performing the given operation on
all children in the necessary order.
@@ -576,12 +720,11 @@
@param rgname Resource group name whose state we want to send.
@see send_rg_states
*/
-int get_rg_state_local(char *, rg_state_t *);
void
send_rg_state(msgctx_t *ctx, char *rgname, int fast)
{
rg_state_msg_t msg, *msgp = &msg;
- void *lockp;
+ struct dlm_lksb lockp;
msgp->rsm_hdr.gh_magic = GENERIC_HDR_MAGIC;
msgp->rsm_hdr.gh_length = sizeof(msg);
@@ -594,10 +737,10 @@
if (rg_lock(rgname, &lockp) < 0)
return;
if (get_rg_state(rgname, &msgp->rsm_state) < 0) {
- rg_unlock(rgname, lockp);
+ rg_unlock(&lockp);
return;
}
- rg_unlock(rgname, lockp);
+ rg_unlock(&lockp);
}
swab_rg_state_msg_t(msgp);
@@ -616,19 +759,19 @@
{
msgctx_t *ctx = ((struct status_arg *)arg)->ctx;
int fast = ((struct status_arg *)arg)->fast;
- resource_t *res;
+ resource_node_t *node;
generic_msg_hdr hdr;
+ char rg[64];
free(arg);
pthread_rwlock_rdlock(&resource_lock);
- list_do(&_resources, res) {
- if (res->r_rule->rr_root == 0)
- continue;
+ list_do(&_tree, node) {
- send_rg_state(ctx, res->r_attrs[0].ra_value, fast);
- } while (!list_done(&_resources, res));
+ res_build_name(rg, sizeof(rg), node->rn_resource);
+ send_rg_state(ctx, rg, fast);
+ } while (!list_done(&_tree, node));
pthread_rwlock_unlock(&resource_lock);
@@ -637,8 +780,8 @@
/* XXX wait for client to tell us it's done; I don't know why
this is needed when doing fast I/O, but it is. */
msg_receive(ctx, &hdr, sizeof(hdr), 10);
-
msg_close(ctx);
+ msg_free_ctx(ctx);
return NULL;
}
@@ -681,21 +824,20 @@
int
svc_exists(char *svcname)
{
- resource_t *res;
+ resource_node_t *node;
int ret = 0;
+ char rg[64];
pthread_rwlock_rdlock(&resource_lock);
- list_do(&_resources, res) {
- if (res->r_rule->rr_root == 0)
- continue;
+ list_do(&_tree, node) {
+ res_build_name(rg, sizeof(rg), node->rn_resource);
- if (strcmp(res->r_attrs[0].ra_value,
- svcname) == 0) {
+ if (strcmp(rg, svcname) == 0) {
ret = 1;
break;
}
- } while (!list_done(&_resources, res));
+ } while (!list_done(&_tree, node));
pthread_rwlock_unlock(&resource_lock);
@@ -707,25 +849,22 @@
rg_doall(int request, int block, char *debugfmt)
{
resource_node_t *curr;
- char *name;
rg_state_t svcblk;
+ char rg[64];
pthread_rwlock_rdlock(&resource_lock);
list_do(&_tree, curr) {
- if (curr->rn_resource->r_rule->rr_root == 0)
- continue;
-
/* Group name */
- name = curr->rn_resource->r_attrs->ra_value;
+ res_build_name(rg, sizeof(rg), curr->rn_resource);
if (debugfmt)
- clulog(LOG_DEBUG, debugfmt, name);
+ clulog(LOG_DEBUG, debugfmt, rg);
/* Optimization: Don't bother even queueing the request
during the exit case if we don't own it */
if (request == RG_STOP_EXITING) {
- if (get_rg_state_local(name, &svcblk) < 0)
+ if (get_rg_state_local(rg, &svcblk) < 0)
continue;
/* Always run stop if we're the owner, regardless
@@ -734,7 +873,7 @@
continue;
}
- rt_enqueue_request(name, request, NULL, 0,
+ rt_enqueue_request(rg, request, NULL, 0,
0, 0, 0);
} while (!list_done(&_tree, curr));
@@ -755,21 +894,17 @@
q_status_checks(void *arg)
{
resource_node_t *curr;
- char *name;
rg_state_t svcblk;
- void *lockp;
+ char rg[64];
pthread_rwlock_rdlock(&resource_lock);
list_do(&_tree, curr) {
- if (curr->rn_resource->r_rule->rr_root == 0)
- continue;
-
/* Group name */
- name = curr->rn_resource->r_attrs->ra_value;
+ res_build_name(rg, sizeof(rg), curr->rn_resource);
/* Local check - no one will make us take a service */
- if (get_rg_state_local(name, &svcblk) < 0) {
+ if (get_rg_state_local(rg, &svcblk) < 0) {
continue;
}
@@ -777,7 +912,7 @@
svcblk.rs_state != RG_STATE_STARTED)
continue;
- rt_enqueue_request(name, RG_STATUS,
+ rt_enqueue_request(rg, RG_STATUS,
NULL, 0, 0, 0, 0);
} while (!list_done(&_tree, curr));
@@ -811,24 +946,20 @@
do_condstops(void)
{
resource_node_t *curr;
- char *name;
rg_state_t svcblk;
int need_kill;
- void *lockp;
+ char rg[64];
clulog(LOG_INFO, "Stopping changed resources.\n");
pthread_rwlock_rdlock(&resource_lock);
list_do(&_tree, curr) {
- if (curr->rn_resource->r_rule->rr_root == 0)
- continue;
-
/* Group name */
- name = curr->rn_resource->r_attrs->ra_value;
+ res_build_name(rg, sizeof(rg), curr->rn_resource);
/* If we're not running it, no need to CONDSTOP */
- if (get_rg_state_local(name, &svcblk) < 0) {
+ if (get_rg_state_local(rg, &svcblk) < 0) {
continue;
}
@@ -839,10 +970,10 @@
need_kill = 0;
if (curr->rn_resource->r_flags & RF_NEEDSTOP) {
need_kill = 1;
- clulog(LOG_DEBUG, "Removing %s\n", name);
+ clulog(LOG_DEBUG, "Removing %s\n", rg);
}
- rt_enqueue_request(name, need_kill ? RG_DISABLE : RG_CONDSTOP,
+ rt_enqueue_request(rg, need_kill ? RG_DISABLE : RG_CONDSTOP,
NULL, 0, 0, 0, 0);
} while (!list_done(&_tree, curr));
@@ -859,10 +990,10 @@
do_condstarts(void)
{
resource_node_t *curr;
- char *name, *val;
+ char rg[64], *val;
rg_state_t svcblk;
int need_init, new_groups = 0, autostart;
- void *lockp;
+ struct dlm_lksb lockp;
clulog(LOG_INFO, "Starting changed resources.\n");
@@ -870,31 +1001,26 @@
pthread_rwlock_rdlock(&resource_lock);
list_do(&_tree, curr) {
- if (curr->rn_resource->r_rule->rr_root == 0)
- continue;
-
/* Group name */
- name = curr->rn_resource->r_attrs->ra_value;
+ res_build_name(rg, sizeof(rg), curr->rn_resource);
/* New RG. We'll need to initialize it. */
need_init = 0;
if (curr->rn_resource->r_flags & RF_NEEDSTART)
need_init = 1;
- if (get_rg_state_local(name, &svcblk) < 0) {
+ if (get_rg_state_local(rg, &svcblk) < 0)
continue;
- }
- if (!need_init && svcblk.rs_owner != my_id()) {
+ if (!need_init && svcblk.rs_owner != my_id())
continue;
- }
if (need_init) {
++new_groups;
- clulog(LOG_DEBUG, "Initializing %s\n", name);
+ clulog(LOG_DEBUG, "Initializing %s\n", rg);
}
- rt_enqueue_request(name, need_init ? RG_INIT : RG_CONDSTART,
+ rt_enqueue_request(rg, need_init ? RG_INIT : RG_CONDSTART,
NULL, 0, 0, 0, 0);
} while (!list_done(&_tree, curr));
@@ -909,21 +1035,18 @@
pthread_rwlock_rdlock(&resource_lock);
list_do(&_tree, curr) {
- if (curr->rn_resource->r_rule->rr_root == 0)
- continue;
-
/* Group name */
- name = curr->rn_resource->r_attrs->ra_value;
+ res_build_name(rg, sizeof(rg), curr->rn_resource);
/* New RG. We'll need to initialize it. */
if (!(curr->rn_resource->r_flags & RF_NEEDSTART))
continue;
- if (rg_lock(name, &lockp) != 0)
+ if (rg_lock(rg, &lockp) != 0)
continue;
- if (get_rg_state(name, &svcblk) < 0) {
- rg_unlock(name, lockp);
+ if (get_rg_state(rg, &svcblk) < 0) {
+ rg_unlock(&lockp);
continue;
}
@@ -933,7 +1056,7 @@
a truly new service, it will be in the UNINITIALIZED
state, which will be caught by eval_groups. */
if (svcblk.rs_state != RG_STATE_DISABLED) {
- rg_unlock(name, lockp);
+ rg_unlock(&lockp);
continue;
}
@@ -946,9 +1069,9 @@
else
svcblk.rs_state = RG_STATE_DISABLED;
- set_rg_state(name, &svcblk);
+ set_rg_state(rg, &svcblk);
- rg_unlock(name, lockp);
+ rg_unlock(&lockp);
} while (!list_done(&_tree, curr));
pthread_rwlock_unlock(&resource_lock);
--- cluster/rgmanager/src/daemons/main.c 2006/06/02 17:37:10 1.25
+++ cluster/rgmanager/src/daemons/main.c 2006/07/11 23:52:41 1.26
@@ -31,21 +31,23 @@
#include <members.h>
#include <msgsimple.h>
#include <vf.h>
+#include <lock.h>
#include <rg_queue.h>
#include <malloc.h>
+#include <cman-private.h>
#define L_SYS (1<<1)
#define L_USER (1<<0)
-int configure_logging(int ccsfd);
+int configure_logging(int ccsfd, int debug);
+void node_event(int, uint64_t, int);
void node_event_q(int, uint64_t, int);
int daemon_init(char *);
int init_resource_groups(int);
void kill_resource_groups(void);
-void set_my_id(uint64_t);
+void set_my_id(int);
int eval_groups(int, uint64_t, int);
-void graceful_exit(int);
void flag_shutdown(int sig);
void hard_exit(void);
int send_rg_states(msgctx_t *, int);
@@ -58,6 +60,7 @@
static int signalled = 0;
uint64_t next_node_id(cluster_member_list_t *membership, uint64_t me);
+int rg_event_q(char *svcName, uint32_t state, int owner);
void
@@ -74,49 +77,9 @@
int
-send_exit_msg(uint64_t nodeid)
+send_exit_msg(msgctx_t *ctx)
{
- msgctx_t ctx;
-
- if (msg_open(nodeid, RG_PORT, &ctx, 5) < 0) {
- printf("Failed to send exit message\n");
- return -1;
- }
- msg_send_simple(&ctx, RG_EXITING, 0, 0);
- msg_close(&ctx);
-
- return 0;
-}
-
-
-/**
- Notify other resource group managers that we're leaving, since
- cluster membership is not necessarily tied to the members running
- the rgmgr.
- */
-int
-notify_exiting(void)
-{
- int x;
- uint64_t partner;
- cluster_member_list_t *membership;
-
- membership = member_list();
- if (!membership)
- return 0;
-
- for (x = 0; x < membership->cml_count; x++) {
-
- partner = membership->cml_members[x].cn_nodeid;
-
- if (partner == my_id() ||
- !membership->cml_members[x].cn_member)
- continue;
-
- send_exit_msg(partner);
- }
-
- free_member_list(membership);
+ msg_send_simple(ctx, RG_EXITING, my_id(), 0);
return 0;
}
@@ -130,60 +93,6 @@
/**
- Called to handle the transition of a cluster member from up->down or
- down->up. This handles initializing services (in the local node-up case),
- exiting due to loss of quorum (local node-down), and service fail-over
- (remote node down).
-
- @param nodeID ID of the member which has come up/gone down.
- @param nodeStatus New state of the member in question.
- @see eval_groups
- */
-void
-node_event(int local, uint64_t nodeID, int nodeStatus)
-{
- if (!running)
- return;
-
- if (local) {
-
- /* Local Node Event */
- if (nodeStatus == 0)
- hard_exit();
-
- if (!rg_initialized()) {
- if (init_resource_groups(0) != 0) {
- clulog(LOG_ERR,
- "#36: Cannot initialize services\n");
- hard_exit();
- }
- }
-
- if (shutdown_pending) {
- clulog(LOG_NOTICE, "Processing delayed exit signal\n");
- graceful_exit(SIGINT);
- }
- setup_signal(SIGINT, graceful_exit);
- setup_signal(SIGTERM, graceful_exit);
- setup_signal(SIGHUP, flag_reconfigure);
-
- eval_groups(1, nodeID, 1);
- return;
- }
-
- /*
- * Nothing to do for events from other nodes if we are not ready.
- */
- if (!rg_initialized()) {
- clulog(LOG_DEBUG, "Services not initialized.\n");
- return;
- }
-
- eval_groups(0, nodeID, nodeStatus);
-}
-
-
-/**
This updates our local membership view and handles whether or not we
should exit, as well as determines node transitions (thus, calling
node_event()).
@@ -192,22 +101,43 @@
@return 0
*/
int
-membership_update(chandle_t *clu)
+membership_update(void)
{
- cluster_member_list_t *new_ml, *node_delta, *old_membership;
+ cluster_member_list_t *new_ml = NULL, *node_delta = NULL,
+ *old_membership = NULL;
int x;
int me = 0;
+ cman_handle_t h;
+ int quorate;
- if (!rg_quorate())
- return 0;
+ h = cman_init(NULL);
+ quorate = cman_is_quorate(h);
+ if (!quorate) {
+ cman_finish(h);
+
+ if (!rg_quorate())
+ return -1;
+
+ clulog(LOG_EMERG, "#1: Quorum Dissolved\n");
+ rg_set_inquorate();
+ member_list_update(NULL);/* Clear member list */
+ rg_lockall(L_SYS);
+ rg_doall(RG_INIT, 1, "Emergency stop of %s");
+ rg_set_uninitialized();
+ return -1;
+ } else if (!rg_quorate()) {
- clulog(LOG_INFO, "Magma Event: Membership Change\n");
+ rg_set_quorate();
+ rg_unlockall(L_SYS);
+ rg_unlockall(L_USER);
+ clulog(LOG_NOTICE, "Quorum Formed\n");
+ }
old_membership = member_list();
- new_ml = get_member_list(clu->c_cluster);
+ new_ml = get_member_list(h);
member_list_update(new_ml);
+ cman_finish(h);
- clulog(LOG_DEBUG, "I am node #%lld\n", my_id());
/*
* Handle nodes lost. Do our local node event first.
@@ -218,7 +148,8 @@
if (me) {
/* Should not happen */
clulog(LOG_INFO, "State change: LOCAL OFFLINE\n");
- free_member_list(node_delta);
+ if (node_delta)
+ free_member_list(node_delta);
node_event(1, my_id(), 0);
/* NOTREACHED */
}
@@ -238,7 +169,6 @@
}
}
- /* Free nodes */
free_member_list(node_delta);
/*
@@ -263,8 +193,7 @@
clulog(LOG_INFO, "State change: %s UP\n",
node_delta->cml_members[x].cn_name);
- node_event_q(0, node_delta->cml_members[x].cn_nodeid,
- 1);
+ node_event_q(0, node_delta->cml_members[x].cn_nodeid, 1);
}
free_member_list(node_delta);
@@ -309,10 +238,61 @@
}
-int
+#if 0	/* NOTE(review): dispatch_msg() also closes ctx; fix before enabling */
+struct lr_arg {	/* do_lockreq() -> lockreq_th() handoff */
+ msgctx_t *ctx;	/* client connection; lockreq_th() closes and frees it */
+ int req;	/* RG_LOCK or RG_UNLOCK (from dispatch_msg()) */
+};
+
+
+/* Worker: write the "rg_lockdown" VF key, reply, release ctx and arg. */
+void *
+lockreq_th(void *a)
+{
+	struct lr_arg *lr_arg = a;
+	msgctx_t *ctx = lr_arg->ctx;
+	cluster_member_list_t *m = member_list();
+	char state = (lr_arg->req == RG_LOCK);	/* 1 = lock, 0 = unlock */
+	int reply;
+
+	if (vf_write(m, VFF_IGN_CONN_ERRORS, "rg_lockdown", &state, 1) == 0)
+		reply = RG_SUCCESS;
+	else
+		reply = RG_FAIL;
+	free_member_list(m);
+
+	msg_send_simple(ctx, reply, 0, 0);
+	msg_close(ctx);
+	msg_free_ctx(ctx);
+	free(lr_arg);
+
+	return NULL;
+}
+
+
+/* Hand ctx to lockreq_th(), which replies and then closes/frees it. */
+void
+do_lockreq(msgctx_t *ctx, int req)
+{
+	pthread_t th;
+	struct lr_arg *arg = malloc(sizeof(*arg));
+	if (arg != NULL) {
+		arg->ctx = ctx;
+		arg->req = req;
+		if (pthread_create(&th, NULL, lockreq_th, arg) == 0) {
+			pthread_detach(th);	/* never joined */
+			return;
+		}
+		free(arg);
+	}
+	msg_send_simple(ctx, RG_FAIL, 0, 0);	/* could not hand off */
+	msg_close(ctx);
+	msg_free_ctx(ctx);
+}
+#else
+void
do_lockreq(msgctx_t *ctx, int req)
{
-#if 0
int ret;
char state;
cluster_member_list_t *m = member_list();
@@ -326,9 +306,9 @@
} else {
msg_send_simple(ctx, RG_FAIL, 0, 0);
}
-#endif
- return 0;
}
+#endif
+
/**
@@ -341,20 +321,35 @@
* @see quorum_msg
*/
int
-dispatch_msg(msgctx_t *ctx, uint64_t nodeid)
+dispatch_msg(msgctx_t *ctx, int nodeid, int need_close)
{
- int ret = -1;
+ int ret = 0, sz = -1;
char msgbuf[4096];
- generic_msg_hdr *msg_hdr = (generic_msg_hdr *)msg_hdr;
+ generic_msg_hdr *msg_hdr = (generic_msg_hdr *)msgbuf;
SmMessageSt *msg_sm = (SmMessageSt *)msgbuf;
+ memset(msgbuf, 0, sizeof(msgbuf));
+
/* Peek-a-boo */
- ret = msg_receive(ctx, msg_hdr, sizeof(msgbuf), 10);
- if (ret < sizeof (generic_msg_hdr)) {
- clulog(LOG_ERR, "#37: Error receiving message header\n");
+ sz = msg_receive(ctx, msg_hdr, sizeof(msgbuf), 10);
+ if (sz < sizeof (generic_msg_hdr)) {
+ clulog(LOG_ERR, "#37: Error receiving message header (%d)\n", sz);
goto out;
}
+ if (sz < 0)
+ return -1;
+
+ if (sz > sizeof(msgbuf)) {
+ raise(SIGSTOP);
+ }
+
+ /*
+ printf("RECEIVED %d %d %d %p\n", sz, (int)sizeof(msgbuf),
+ (int)sizeof(generic_msg_hdr), ctx);
+ msg_print(ctx);
+ */
+
/* Decode the header */
swab_generic_msg_hdr(msg_hdr);
if ((msg_hdr->gh_magic != GENERIC_HDR_MAGIC)) {
@@ -364,28 +359,23 @@
goto out;
}
- if (msg_hdr->gh_length != ret) {
+ if (msg_hdr->gh_length != sz) {
clulog(LOG_ERR, "#XX: Read size mismatch: %d %d\n",
ret, msg_hdr->gh_length);
goto out;
}
- ret = 0;
switch (msg_hdr->gh_command) {
case RG_STATUS:
- clulog(LOG_DEBUG, "Sending service states to ctx%p\n",ctx);
+ clulog(LOG_DEBUG, "Sending service states to CTX%p\n",ctx);
send_rg_states(ctx, msg_hdr->gh_arg1);
+ need_close = 0;
break;
case RG_LOCK:
- if (rg_quorate())
- do_lockreq(ctx, RG_LOCK);
- msg_close(ctx);
- break;
-
case RG_UNLOCK:
if (rg_quorate())
- do_lockreq(ctx, RG_UNLOCK);
+ do_lockreq(ctx, msg_hdr->gh_command);
break;
case RG_QUERY_LOCK:
@@ -395,19 +385,18 @@
}
break;
-
case RG_ACTION_REQUEST:
- if (ret != sizeof(msg_sm)) {
+ if (sz < sizeof(msg_sm)) {
clulog(LOG_ERR,
- "#39: Error receiving entire request\n");
+ "#39: Error receiving entire request (%d/%d)\n",
+ ret, (int)sizeof(msg_sm));
ret = -1;
goto out;
}
/* XXX perf: reencode header */
swab_generic_msg_hdr(msg_hdr);
-
/* Decode SmMessageSt message */
swab_SmMessageSt(msg_sm);
@@ -430,21 +419,47 @@
ctx, 0, msg_sm->sm_data.d_svcOwner, 0, 0);
return 0;
+ case RG_EVENT:
+ /* Service event. Run a dependency check */
+ if (sz < sizeof(msg_sm)) {
+ clulog(LOG_ERR,
+ "#39: Error receiving entire request (%d/%d)\n",
+ ret, (int)sizeof(msg_sm));
+ ret = -1;
+ goto out;
+ }
+
+ /* XXX perf: reencode header */
+ swab_generic_msg_hdr(msg_hdr);
+ /* Decode SmMessageSt message */
+ swab_SmMessageSt(msg_sm);
+
+ /* Send to our rg event handler */
+ rg_event_q(msg_sm->sm_data.d_svcName,
+ msg_sm->sm_data.d_action,
+ msg_sm->sm_data.d_svcOwner);
+ break;
+
case RG_EXITING:
clulog(LOG_NOTICE, "Member %d is now offline\n", (int)nodeid);
-
node_event(0, nodeid, 0);
break;
+ case VF_MESSAGE:
+ /* Ignore; our VF thread handles these */
+ break;
+
default:
clulog(LOG_DEBUG, "unhandled message request %d\n",
msg_hdr->gh_command);
break;
}
-
out:
- msg_close(ctx);
+ if (need_close) {
+ msg_close(ctx);
+ msg_free_ctx(ctx);
+ }
return ret;
}
@@ -455,47 +470,65 @@
@return Event
*/
int
-handle_cluster_event(chandle_t *clu, msgctx_t *ctx)
+handle_cluster_event(msgctx_t *ctx)
{
int ret;
+ msgctx_t *newctx;
+ int nodeid;
ret = msg_wait(ctx, 0);
+
switch(ret) {
case M_PORTOPENED:
+ msg_receive(ctx, NULL, 0, 0);
+ clulog(LOG_DEBUG, "Event: Port Opened\n");
+ membership_update();
+ break;
case M_PORTCLOSED:
/* Might want to handle powerclosed like membership change */
+ msg_receive(ctx, NULL, 0, 0);
+ clulog(LOG_DEBUG, "Event: Port Closed\n");
+ membership_update();
+ break;
case M_NONE:
+ msg_receive(ctx, NULL, 0, 0);
clulog(LOG_DEBUG, "NULL cluster message\n");
break;
case M_OPEN:
+ newctx = msg_new_ctx();
+ if (msg_accept(ctx, newctx) >= 0 &&
+ rg_quorate()) {
+ /* Handle message */
+ /* When request completes, the fd is closed */
+ nodeid = msg_get_nodeid(newctx);
+ dispatch_msg(newctx, nodeid, 1);
+ break;
+ }
+ break;
+
+ case M_DATA:
+ dispatch_msg(ctx, nodeid, 0);
+ break;
+
case M_OPEN_ACK:
case M_CLOSE:
clulog(LOG_DEBUG, "I should NOT get here: %d\n",
ret);
break;
case M_STATECHANGE:
+ msg_receive(ctx, NULL, 0, 0);
clulog(LOG_DEBUG, "Membership Change Event\n");
if (rg_quorate() && running) {
rg_unlockall(L_SYS);
- membership_update(clu);
+ membership_update();
}
break;
- rg_set_quorate();
- rg_unlockall(L_SYS);
- rg_unlockall(L_USER);
- clulog(LOG_NOTICE, "Quorum Achieved\n");
- membership_update(clu);
+ case 998:
break;
case 999:
- clulog(LOG_EMERG, "#1: Quorum Dissolved\n");
- rg_set_inquorate();
- member_list_update(NULL); /* Clear member list */
- rg_lockall(L_SYS);
- rg_doall(RG_INIT, 1, "Emergency stop of %s");
- rg_set_uninitialized();
- break;
case M_TRY_SHUTDOWN:
+ msg_receive(ctx, NULL, 0, 0);
clulog(LOG_WARNING, "#67: Shutting down uncleanly\n");
rg_set_inquorate();
rg_doall(RG_INIT, 1, "Emergency stop of %s");
@@ -512,15 +545,13 @@
void dump_threads(void);
int
-event_loop(chandle_t *clu)
+event_loop(msgctx_t *localctx, msgctx_t *clusterctx)
{
int n, max, ret;
fd_set rfds;
- msgctx_t newctx;
+ msgctx_t *newctx;
struct timeval tv;
int nodeid;
- msgctx_t *localctx = clu->local_ctx;
- msgctx_t *clusterctx = clu->cluster_ctx;
tv.tv_sec = 10;
tv.tv_usec = 0;
@@ -545,7 +576,8 @@
break;
if (msg_fd_isset(clusterctx, &rfds)) {
- handle_cluster_event(clu, clusterctx);
+ msg_fd_clr(clusterctx, &rfds);
+ handle_cluster_event(clusterctx);
continue;
}
@@ -553,7 +585,9 @@
continue;
}
- ret = msg_accept(localctx, &newctx);
+ msg_fd_clr(localctx, &rfds);
+ newctx = msg_new_ctx();
+ ret = msg_accept(localctx, newctx);
if (ret == -1)
continue;
@@ -561,25 +595,27 @@
if (rg_quorate()) {
/* Handle message */
/* When request completes, the fd is closed */
- nodeid = msg_get_nodeid(&newctx);
- dispatch_msg(&newctx, nodeid);
+ nodeid = msg_get_nodeid(newctx);
+ dispatch_msg(newctx, nodeid, 1);
continue;
}
if (!rg_initialized()) {
- msg_close(&newctx);
+ msg_close(newctx);
+ msg_free_ctx(newctx);
continue;
}
if (!rg_quorate()) {
printf("Dropping connect: NO QUORUM\n");
- msg_close(&newctx);
+ msg_close(newctx);
+ msg_free_ctx(newctx);
}
}
if (need_reconfigure || check_config_update()) {
need_reconfigure = 0;
- configure_logging(-1);
+ configure_logging(-1, 0);
init_resource_groups(1);
return 0;
}
@@ -606,13 +642,6 @@
void
-graceful_exit(int sig)
-{
- running = 0;
-}
-
-
-void
hard_exit(void)
{
rg_lockall(L_SYS);
@@ -625,12 +654,9 @@
void
cleanup(msgctx_t *clusterctx)
{
- rg_lockall(L_SYS);
- rg_doall(RG_STOP_EXITING, 1, NULL);
- //vf_shutdown();
kill_resource_groups();
member_list_update(NULL);
- notify_exiting();
+ send_exit_msg(clusterctx);
}
@@ -649,7 +675,7 @@
* Configure logging based on data in cluster.conf
*/
int
-configure_logging(int ccsfd)
+configure_logging(int ccsfd, int dbg)
{
char *v;
char internal = 0;
@@ -667,7 +693,8 @@
}
if (ccs_get(ccsfd, "/cluster/rm/@log_level", &v) == 0) {
- clu_set_loglevel(atoi(v));
+ if (!dbg)
+ clu_set_loglevel(atoi(v));
free(v);
}
@@ -679,20 +706,21 @@
void
-clu_initialize(chandle_t *clu)
+clu_initialize(cman_handle_t **ch)
{
- cman_node_t me;
+ if (!ch)
+ exit(1);
- clu->c_cluster = cman_init(NULL);
- if (!clu->c_cluster) {
+ *ch = cman_init(NULL);
+ if (!(*ch)) {
clulog(LOG_NOTICE, "Waiting for CMAN to start\n");
- while (!(clu->c_cluster = cman_init(NULL))) {
+ while (!(*ch = cman_init(NULL))) {
sleep(1);
}
}
- if (!cman_is_quorate(clu->c_cluster)) {
+ if (!cman_is_quorate(*ch)) {
/*
There are two ways to do this; this happens to be the simpler
of the two. The other method is to join with a NULL group
@@ -701,14 +729,11 @@
*/
clulog(LOG_NOTICE, "Waiting for quorum to form\n");
- while (cman_is_quorate(clu->c_cluster) == 0) {
+ while (cman_is_quorate(*ch) == 0) {
sleep(1);
}
clulog(LOG_NOTICE, "Quorum formed, starting\n");
}
-
- cman_get_node(clu->c_cluster, CMAN_NODEID_US, &me);
- clu->c_nodeid = me.cn_nodeid;
}
@@ -722,15 +747,28 @@
}
+/*
+ * Shutdown worker thread.  main() spawns it once, the first time it sees
+ * shutdown_pending == 1.  It stops every resource group with
+ * RG_STOP_EXITING and only then clears the global 'running' flag, which
+ * lets main()'s event loop finish and proceed to cleanup().
+ * The thread argument is unused.
+ */
+void *
+shutdown_thread(void *arg)
+{
+	(void)arg;
+
+	rg_doall(RG_STOP_EXITING, 1, NULL);
+
+	/* Must come last: main() tears down state once this is cleared. */
+	running = 0;
+	return NULL;
+}
+
+
int
main(int argc, char **argv)
{
int rv;
char foreground = 0;
- int quorate;
- int listen_fds[2], listeners;
- int myNodeID;
- chandle_t clu;
+ cman_node_t me;
+ msgctx_t *cluster_ctx;
+ msgctx_t *local_ctx;
+ int port = RG_PORT;
+ pthread_t th;
+
+ cman_handle_t *clu = NULL;
while ((rv = getopt(argc, argv, "fd")) != EOF) {
switch (rv) {
@@ -759,13 +797,25 @@
}
clu_initialize(&clu);
- set_my_id(clu.c_nodeid);
+ if (cman_init_subsys(clu) < 0) {
+ perror("cman_init_subsys");
+ return -1;
+ }
+
+ if (clu_lock_init("rgmanager") != 0) {
+ printf("Locks not working!\n");
+ return -1;
+ }
+
+ memset(&me, 0, sizeof(me));
+ cman_get_node(clu, CMAN_NODEID_US, &me);
+ set_my_id(me.cn_nodeid);
/*
We know we're quorate. At this point, we need to
read the resource group trees from ccsd.
*/
- configure_logging(-1);
+ configure_logging(-1, debug);
clulog(LOG_NOTICE, "Resource Group Manager Starting\n");
if (init_resource_groups(0) != 0) {
@@ -778,46 +828,60 @@
setup_signal(SIGUSR1, statedump);
unblock_signal(SIGCHLD);
setup_signal(SIGPIPE, SIG_IGN);
+
if (debug) {
setup_signal(SIGSEGV, segfault);
} else {
unblock_signal(SIGSEGV);
}
- if (msg_init(&clu) < 0) {
- clulog(LOG_CRIT, "#10: Couldn't set up message system\n");
+ if (msg_listen(MSG_SOCKET, RGMGR_SOCK, me.cn_nodeid, &local_ctx) < 0) {
+ clulog(LOG_CRIT,
+ "#10: Couldn't set up cluster message system: %s\n",
+ strerror(errno));
+ return -1;
+ }
+
+ if (msg_listen(MSG_CLUSTER, &port , me.cn_nodeid, &cluster_ctx) < 0) {
+ clulog(LOG_CRIT,
+ "#10b: Couldn't set up cluster message system: %s\n",
+ strerror(errno));
return -1;
}
rg_set_quorate();
- set_my_id(clu.c_nodeid);
+
+ /*
+ msg_print(local_ctx);
+ msg_print(cluster_ctx);
+ */
/*
Initialize the VF stuff.
*/
-#if 0
- if (vf_init(clu.c_nodeid, RG_VF_PORT, NULL, NULL) != 0) {
+ if (vf_init(me.cn_nodeid, RG_PORT, NULL, NULL) != 0) {
clulog(LOG_CRIT, "#11: Couldn't set up VF listen socket\n");
return -1;
}
vf_key_init("rg_lockdown", 10, NULL, lock_commit_cb);
-#endif
-
- /*
- Get an initial membership view.
- */
- membership_update(&clu);
/*
Do everything useful
*/
- while (running)
- event_loop(&clu);
+ while (running) {
+ event_loop(local_ctx, cluster_ctx);
+
+ if (shutdown_pending == 1) {
+ ++shutdown_pending;
+ clulog(LOG_NOTICE, "Shutting down\n");
+ pthread_create(&th, NULL, shutdown_thread, NULL);
+ }
+ }
- clulog(LOG_NOTICE, "Shutting down\n");
- cleanup(&clu);
+ cleanup(cluster_ctx);
clulog(LOG_NOTICE, "Shutdown complete, exiting\n");
+ cman_finish(clu);
/*malloc_dump_table(); */ /* Only works if alloc.c us used */
/*malloc_stats();*/
--- cluster/rgmanager/src/daemons/nodeevent.c 2006/06/02 17:37:10 1.2
+++ cluster/rgmanager/src/daemons/nodeevent.c 2006/07/11 23:52:41 1.3
@@ -20,6 +20,9 @@
#include <rg_locks.h>
#include <gettid.h>
#include <assert.h>
+#include <libcman.h>
+#include <ccs.h>
+#include <clulog.h>
typedef struct __ne_q {
list_head();
@@ -28,8 +31,6 @@
int ne_state;
} nevent_t;
-int node_event(int, uint64_t, int);
-
/**
* Node event queue.
*/
@@ -38,11 +39,120 @@
static pthread_t ne_thread = 0;
int ne_queue_request(int local, uint64_t nodeid, int state);
+void hard_exit(void);
+int init_resource_groups(int);
+void flag_shutdown(int sig);
+void flag_reconfigure(int sig);
+
+extern int running;
+extern int shutdown_pending;
+
+
+/*
+ * Local-node half of node_event(): a down transition calls hard_exit();
+ * otherwise make sure resource groups are initialized, honour a delayed
+ * exit request, install the SIGINT/SIGTERM/SIGHUP handlers and evaluate
+ * the groups as a local node-up.
+ */
+static void
+ne_handle_local_event(uint64_t nodeID, int nodeStatus)
+{
+	if (nodeStatus == 0)
+		hard_exit();
+
+	if (!rg_initialized() && init_resource_groups(0) != 0) {
+		clulog(LOG_ERR,
+		       "#36: Cannot initialize services\n");
+		hard_exit();
+	}
+
+	/* A shutdown signal arrived before we were ready; act on it now. */
+	if (shutdown_pending) {
+		clulog(LOG_NOTICE, "Processing delayed exit signal\n");
+		running = 0;
+	}
+
+	setup_signal(SIGINT, flag_shutdown);
+	setup_signal(SIGTERM, flag_shutdown);
+	setup_signal(SIGHUP, flag_reconfigure);
+
+	eval_groups(1, nodeID, 1);
+}
+
+
+/**
+ * Handle a cluster member transitioning up->down or down->up.
+ *
+ * Local events initialize services (node-up) or exit via hard_exit()
+ * (node-down).  Remote events are handed to eval_groups() so services can
+ * fail over, but only once our own resource groups are initialized.
+ * Nothing is done once the daemon has stopped running.
+ *
+ * @param local		Non-zero if the event concerns this node.
+ * @param nodeID	ID of the member which has come up/gone down.
+ * @param nodeStatus	New state of the member in question (0 = down).
+ * @see eval_groups
+ */
+void
+node_event(int local, uint64_t nodeID, int nodeStatus)
+{
+	if (!running)
+		return;
+
+	if (local) {
+		ne_handle_local_event(nodeID, nodeStatus);
+		return;
+	}
+
+	/* Nothing to do for other nodes' events until we are ready. */
+	if (!rg_initialized()) {
+		clulog(LOG_DEBUG, "Services not initialized.\n");
+		return;
+	}
+
+	eval_groups(0, nodeID, nodeStatus);
+}
+
+
+/*
+ * Report whether cluster.conf lists a fence device for a node, i.e.
+ * whether ccs_get() can resolve
+ *   /cluster/clusternodes/clusternode[@nodeid="N"]/fence/method/device/@name
+ *
+ * Returns 1 if such a device is configured and 0 otherwise.  If ccsd
+ * cannot be contacted, an error is logged and 1 is returned: we assume
+ * the node has fencing.
+ */
+int
+node_has_fencing(int nodeid)
+{
+	char query[1024];
+	char *devname = NULL;
+	int desc;
+	int has_fence;
+
+	desc = ccs_connect();
+	if (desc < 0) {
+		clulog(LOG_ERR, "Unable to connect to ccsd; cannot handle"
+		       " node event!\n");
+		/* Assume node has fencing */
+		return 1;
+	}
+
+	snprintf(query, sizeof(query),
+		 "/cluster/clusternodes/clusternode[@nodeid=\"%d\"]"
+		 "/fence/method/device/@name", nodeid);
+
+	has_fence = (ccs_get(desc, query, &devname) == 0);
+	free(devname);	/* free(NULL) is harmless if the lookup failed */
+
+	ccs_disconnect(desc);
+	return has_fence;
+}
+
+
+/*
+ * Ask CMAN whether a node has been fenced.
+ *
+ * Returns the 'fenced' flag reported by cman_get_fenceinfo().  Returns 0
+ * if no CMAN handle can be obtained or the query fails.  The caller polls
+ * until this is non-zero, so an error result simply means "check again
+ * later" instead of proceeding as if the node were fenced.
+ */
+int
+node_fenced(int nodeid)
+{
+	cman_handle_t ch;
+	int fenced = 0;
+	uint64_t fence_time;
+
+	/* cman_init() yields NULL when CMAN is unavailable; never hand a
+	   NULL handle to cman_get_fenceinfo()/cman_finish(). */
+	ch = cman_init(NULL);
+	if (!ch)
+		return 0;
+
+	if (cman_get_fenceinfo(ch, nodeid, &fence_time, &fenced, NULL) < 0)
+		fenced = 0;
+
+	cman_finish(ch);
+	return fenced;
+}
+
void *
node_event_thread(void *arg)
{
nevent_t *ev;
+ int notice;
while (1) {
pthread_mutex_lock(&ne_queue_mutex);
@@ -53,6 +163,22 @@
break; /* We're outta here */
pthread_mutex_unlock(&ne_queue_mutex);
+ if (ev->ne_state == 0 && node_has_fencing(ev->ne_nodeid)) {
+ notice = 0;
+ while (!node_fenced(ev->ne_nodeid)) {
+ if (!notice) {
+ notice = 1;
+ clulog(LOG_INFO, "Waiting for "
+ "node #%d to be fenced\n",
+ ev->ne_nodeid);
+ }
+ sleep(2);
+ }
+ if (notice)
+ clulog(LOG_INFO, "Node #%d fenced; "
+ "continuing\n", ev->ne_nodeid);
+ }
+
node_event(ev->ne_local, ev->ne_nodeid, ev->ne_state);
free(ev);
@@ -60,7 +186,6 @@
/* Mutex held */
ne_thread = 0;
- rg_dec_threads();
pthread_mutex_unlock(&ne_queue_mutex);
return NULL;
}
@@ -96,8 +221,6 @@
pthread_create(&ne_thread, &attrs, node_event_thread, NULL);
pthread_attr_destroy(&attrs);
-
- rg_inc_threads();
}
pthread_mutex_unlock (&ne_queue_mutex);
}
--- cluster/rgmanager/src/daemons/reslist.c 2006/06/02 17:37:10 1.13
+++ cluster/rgmanager/src/daemons/reslist.c 2006/07/11 23:52:41 1.14
@@ -33,6 +33,13 @@
char *attr_value(resource_node_t *node, char *attrname);
char *rg_attr_value(resource_node_t *node, char *attrname);
+/*
+ * Write a resource's reference name, "<rule type>:<value of r_attrs[0]>",
+ * into buf (at most buflen bytes, truncated by snprintf).  This is the
+ * same "type:name" form that find_root_by_ref() splits apart.
+ * NOTE(review): assumes r_attrs[0] is the primary attribute -- confirm.
+ */
+void
+res_build_name(char *buf, size_t buflen, resource_t *res)
+{
+	const char *type = res->r_rule->rr_type;
+	const char *value = res->r_attrs[0].ra_value;
+
+	snprintf(buf, buflen, "%s:%s", type, value);
+}
+
/**
Find and determine an attribute's value.
@@ -265,11 +272,23 @@
find_root_by_ref(resource_t **reslist, char *ref)
{
resource_t *curr;
+ char ref_buf[128];
+ char *type;
+ char *name;
int x;
+ snprintf(ref_buf, sizeof(ref_buf), "%s", ref);
+
+ type = ref_buf;
+ if ((name = strchr(ref_buf, ':'))) {
+ *name = 0;
+ name++;
+ } else {
+ /* Default type */
+ type = "service";
+ }
+
list_do(reslist, curr) {
- if (curr->r_rule->rr_root == 0)
- continue;
/*
This should be one operation - the primary attr
@@ -277,15 +296,18 @@
*/
for (x = 0; curr->r_attrs && curr->r_attrs[x].ra_name;
x++) {
+ if (strcmp(type, curr->r_rule->rr_type))
+ continue;
if (!(curr->r_attrs[x].ra_flags & RA_PRIMARY))
continue;
- if (strcmp(ref, curr->r_attrs[x].ra_value))
+ if (strcmp(name, curr->r_attrs[x].ra_value))
continue;
return curr;
}
} while (!list_done(reslist, curr));
+
return NULL;
}
@@ -447,8 +469,6 @@
int x;
printf("Resource type: %s", res->r_rule->rr_type);
- if (res->r_rule->rr_root)
- printf(" [ROOT]");
if (res->r_flags & RF_INLINE)
printf(" [INLINE]");
if (res->r_flags & RF_NEEDSTART)
--- cluster/rgmanager/src/daemons/restree.c 2006/06/02 17:37:10 1.19
+++ cluster/rgmanager/src/daemons/restree.c 2006/07/11 23:52:41 1.20
@@ -1,5 +1,5 @@
/*
- Copyright Red Hat, Inc. 2004
+ Copyright Red Hat, Inc. 2004-2006
This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
--- cluster/rgmanager/src/daemons/rg_forward.c 2006/06/02 17:37:10 1.4
+++ cluster/rgmanager/src/daemons/rg_forward.c 2006/07/11 23:52:41 1.5
@@ -47,24 +47,26 @@
{
rg_state_t rgs;
request_t *req = (request_t *)arg;
- void *lockp;
+ struct dlm_lksb lockp;
msgctx_t ctx;
SmMessageSt msg;
if (rg_lock(req->rr_group, &lockp) != 0) {
msg_close(req->rr_resp_ctx);
+ msg_free_ctx(req->rr_resp_ctx);
rq_free(req);
pthread_exit(NULL);
}
if (get_rg_state(req->rr_group, &rgs) != 0) {
- rg_unlock(req->rr_group, lockp);
+ rg_unlock(&lockp);
msg_close(req->rr_resp_ctx);
+ msg_free_ctx(req->rr_resp_ctx);
rq_free(req);
pthread_exit(NULL);
}
- rg_unlock(req->rr_group, lockp);
+ rg_unlock(&lockp);
/* Construct message */
build_message(&msg, req->rr_request, req->rr_group, req->rr_target);
@@ -74,8 +76,9 @@
rg_req_str(req->rr_request), (int)rgs.rs_owner);
*/
- if (msg_open(rgs.rs_owner, RG_PORT, &ctx, 10) < 0) {
+ if (msg_open(MSG_CLUSTER, rgs.rs_owner, RG_PORT, &ctx, 10) < 0) {
msg_close(req->rr_resp_ctx);
+ msg_free_ctx(req->rr_resp_ctx);
rq_free(req);
pthread_exit(NULL);
}
@@ -83,6 +86,7 @@
if (msg_send(&ctx, &msg, sizeof(msg)) != sizeof(msg)) {
msg_close(&ctx);
msg_close(req->rr_resp_ctx);
+ msg_free_ctx(req->rr_resp_ctx);
rq_free(req);
pthread_exit(NULL);
}
@@ -90,6 +94,7 @@
if (msg_receive(&ctx, &msg, sizeof(msg),10) != sizeof(msg)) {
msg_close(&ctx);
msg_close(req->rr_resp_ctx);
+ msg_free_ctx(req->rr_resp_ctx);
rq_free(req);
pthread_exit(NULL);
}
--- cluster/rgmanager/src/daemons/rg_state.c 2006/06/02 17:37:10 1.16
+++ cluster/rgmanager/src/daemons/rg_state.c 2006/07/11 23:52:41 1.17
@@ -26,6 +26,7 @@
#include <string.h>
#include <resgroup.h>
#include <clulog.h>
+#include <lock.h>
#include <rg_locks.h>
#include <ccs.h>
#include <rg_queue.h>
@@ -41,6 +42,7 @@
int set_rg_state(char *servicename, rg_state_t *svcblk);
int get_rg_state(char *servicename, rg_state_t *svcblk);
void get_recovery_policy(char *rg_name, char *buf, size_t buflen);
+int check_depend_safe(char *servicename);
uint64_t
@@ -67,11 +69,35 @@
}
+/*
+ * Broadcast a service state change to the cluster as an RG_EVENT message.
+ *
+ * @param svcName	Service name; truncated to fit d_svcName and always
+ *			NUL-terminated.
+ * @param state		New service state, carried in d_action.
+ *
+ * Best effort: if the cluster channel cannot be opened the event is
+ * dropped, and the msg_send() result is not checked.
+ */
+void
+broadcast_event(char *svcName, uint32_t state)
+{
+	SmMessageSt msgp;
+	msgctx_t everyone;
+
+	/* Zero the whole message first so no uninitialized stack bytes
+	   (fields not set below, struct padding) go out on the wire. */
+	memset(&msgp, 0, sizeof(msgp));
+
+	msgp.sm_hdr.gh_magic = GENERIC_HDR_MAGIC;
+	msgp.sm_hdr.gh_command = RG_EVENT;
+	msgp.sm_hdr.gh_length = sizeof(msgp);
+	msgp.sm_data.d_action = state;
+	/* Bounded copy that always NUL-terminates (strncpy with the full
+	   buffer size would not for a maximum-length name). */
+	snprintf(msgp.sm_data.d_svcName, sizeof(msgp.sm_data.d_svcName),
+		 "%s", svcName);
+	msgp.sm_data.d_svcOwner = 0;
+	msgp.sm_data.d_ret = 0;
+
+	swab_SmMessageSt(&msgp);
+
+	/* NOTE(review): target node 0 presumably means "all nodes" for
+	   MSG_CLUSTER -- confirm against msg_open() in msg_cluster.c. */
+	if (msg_open(MSG_CLUSTER, 0, RG_PORT, &everyone, 0) < 0)
+		return;
+
+	msg_send(&everyone, &msgp, sizeof(msgp));
+	msg_close(&everyone);
+}
+
+
int
svc_report_failure(char *svcName)
{
-#if 0
- void *lockp = NULL;
+ struct dlm_lksb lockp;
rg_state_t svcStatus;
char *nodeName;
cluster_member_list_t *membership;
@@ -85,10 +111,10 @@
if (get_rg_state(svcName, &svcStatus) != 0) {
clulog(LOG_ERR, "#42: Couldn't obtain status for RG %s\n",
svcName);
- clu_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return -1;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
membership = member_list();
nodeName = memb_id_to_name(membership, svcStatus.rs_last_owner);
@@ -107,104 +133,27 @@
"#4: Administrator intervention required.\n",
svcName, nodeName);
-#endif
return 0;
}
int
-clu_lock_verbose(char *resource, int dflt_flags, void **lockpp)
-{
-#if 0
- int ret, timed_out = 0;
- struct timeval start, now;
- uint64_t nodeid, *p;
- int flags;
- int block = !(dflt_flags & CLK_NOWAIT);
-
- /* Holder not supported for this call */
- dflt_flags &= ~CLK_HOLDER;
-
- flags = dflt_flags;
-
- if (block) {
- gettimeofday(&start, NULL);
- start.tv_sec += 30;
- }
- while (1) {
- if (block) {
- gettimeofday(&now, NULL);
-
- if ((now.tv_sec > start.tv_sec) ||
- ((now.tv_sec == start.tv_sec) &&
- (now.tv_usec >= start.tv_usec))) {
-
- gettimeofday(&start, NULL);
- start.tv_sec += 30;
-
- timed_out = 1;
- flags |= CLK_HOLDER;
- }
- }
-
- *lockpp = NULL;
- ret = clu_lock(resource, flags | CLK_NOWAIT, lockpp);
-
- if ((ret != 0) && (errno == EAGAIN) && block) {
- if (timed_out) {
- p = (uint64_t *)*lockpp;
- if (p) {
- nodeid = *p;
- clulog(LOG_WARNING, "Node ID:%08x%08x"
- " stuck with lock %s\n",
- (uint32_t)(nodeid>>32&0xffffffff),
- (uint32_t)nodeid&0xffffffff,
- resource);
- free(p);
- } else {
- clulog(LOG_WARNING, "Starving for lock"
- " %s\n", resource);
- }
- flags = dflt_flags;
- timed_out = 0;
- }
- usleep(random()&32767<<1);
- continue;
-
- } else if (ret == 0) {
- /* Success */
- return 0;
- }
-
- break;
- }
-
- return ret;
-#endif
- return -1;
-}
-
-
-int
#ifdef DEBUG
-_rg_lock(char *name, void **p)
+_rg_lock(char *name, struct dlm_lksb *p)
#else
-rg_lock(char *name, void **p)
+rg_lock(char *name, struct dlm_lksb *p)
#endif
{
-#if 0
char res[256];
snprintf(res, sizeof(res), "usrm::rg=\"%s\"", name);
- return clu_lock_verbose(res, CLK_EX, p);
-#endif
- return -1;
+ return clu_lock(LKM_EXMODE, p, 0, res);
}
#ifdef DEBUG
int
-_rg_lock_dbg(char *name, void **p, char *file, int line)
+_rg_lock_dbg(char *name, struct dlm_lksb *p, char *file, int line)
{
dprintf("rg_lock(%s) @ %s:%d\n", name, file, line);
return _rg_lock(name, p);
@@ -215,27 +164,21 @@
int
#ifdef DEBUG
-_rg_unlock(char *name, void *p)
+_rg_unlock(struct dlm_lksb *p)
#else
-rg_unlock(char *name, void *p)
+rg_unlock(struct dlm_lksb *p)
#endif
{
-#if 0
- char res[256];
-
- snprintf(res, sizeof(res), "usrm::rg=\"%s\"", name);
- return clu_unlock(res, p);
-#endif
- return -1;
+ return clu_unlock(p);
}
#ifdef DEBUG
int
-_rg_unlock_dbg(char *name, void *p, char *file, int line)
+_rg_unlock_dbg(void *p, char *file, int line)
{
- dprintf("rg_unlock(%s) @ %s:%d\n", name, file, line);
- return _rg_unlock(name, p);
+ dprintf("rg_unlock() @ %s:%d\n", file, line);
+ return _rg_unlock(p);
}
#endif
@@ -293,7 +236,6 @@
int
set_rg_state(char *name, rg_state_t *svcblk)
{
-#if 0
cluster_member_list_t *membership;
char res[256];
int ret;
@@ -307,8 +249,6 @@
sizeof(*svcblk));
free_member_list(membership);
return ret;
-#endif
- return -1;
}
@@ -329,7 +269,6 @@
int
get_rg_state(char *name, rg_state_t *svcblk)
{
-#if 0
char res[256];
int ret;
void *data = NULL;
@@ -381,8 +320,7 @@
free(data);
free_member_list(membership);
-#endif
- return -1;
+ return 0;
}
@@ -390,7 +328,6 @@
int
get_rg_state_local(char *name, rg_state_t *svcblk)
{
-#if 0
char res[256];
int ret;
void *data = NULL;
@@ -422,9 +359,7 @@
/* Copy out the data. */
memcpy(svcblk, data, sizeof(*svcblk));
free(data);
-
-#endif
- return -1;
+ return 0;
}
@@ -714,7 +649,7 @@
int
svc_start(char *svcName, int req)
{
- void *lockp = NULL;
+ struct dlm_lksb lockp;
int ret;
rg_state_t svcStatus;
@@ -725,7 +660,7 @@
}
if (get_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#46: Failed getting status for RG %s\n",
svcName);
return FAIL;
@@ -734,13 +669,13 @@
/* LOCK HELD */
switch (svc_advise_start(&svcStatus, svcName, req)) {
case 0: /* Don't start service, return FAIL */
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return FAIL;
case 2: /* Don't start service, return 0 */
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return 0;
case 3:
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return RG_EAGAIN;
default:
break;
@@ -760,11 +695,11 @@
if (set_rg_state(svcName, &svcStatus) != 0) {
clulog(LOG_ERR,
"#47: Failed changing service status\n");
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return FAIL;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
ret = group_op(svcName, RG_START);
ret = !!ret; /* Either it worked or it didn't. Ignore all the
@@ -780,19 +715,22 @@
if (set_rg_state(svcName, &svcStatus) != 0) {
clulog(LOG_ERR,
"#75: Failed changing service status\n");
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return FAIL;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
- if (ret == 0)
+ if (ret == 0) {
clulog(LOG_NOTICE,
"Service %s started\n",
svcName);
- else
+
+ broadcast_event(svcName, RG_STATE_STARTED);
+ } else {
clulog(LOG_WARNING,
"#68: Failed to start %s; return value: %d\n",
svcName, ret);
+ }
return ret;
}
@@ -807,7 +745,7 @@
int
svc_status(char *svcName)
{
- void *lockp = NULL;
+ struct dlm_lksb lockp;
rg_state_t svcStatus;
if (rg_lock(svcName, &lockp) < 0) {
@@ -817,12 +755,12 @@
}
if (get_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#49: Failed getting status for RG %s\n",
svcName);
return FAIL;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
if (svcStatus.rs_owner != my_id())
/* Don't check status for anything not owned */
@@ -847,7 +785,7 @@
static int
_svc_stop(char *svcName, int req, int recover, uint32_t newstate)
{
- void *lockp = NULL;
+ struct dlm_lksb lockp;
rg_state_t svcStatus;
int ret;
@@ -864,7 +802,7 @@
}
if (get_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#51: Failed getting status for RG %s\n",
svcName);
return FAIL;
@@ -872,18 +810,18 @@
switch (svc_advise_stop(&svcStatus, svcName, req)) {
case 0:
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_DEBUG, "Unable to stop RG %s in %s state\n",
svcName, rg_state_str(svcStatus.rs_state));
return FAIL;
case 2:
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return SUCCESS;
case 3:
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return RG_EFORWARD;
case 4:
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return RG_EAGAIN;
default:
break;
@@ -900,11 +838,11 @@
//printf("rg state = %s\n", rg_state_str(svcStatus.rs_state));
if (set_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#52: Failed changing RG status\n");
return FAIL;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
ret = group_op(svcName, RG_STOP);
@@ -918,7 +856,7 @@
_svc_stop_finish(char *svcName, int failed, uint32_t newstate)
{
rg_state_t svcStatus;
- void *lockp;
+ struct dlm_lksb lockp;
if (rg_lock(svcName, &lockp) == FAIL) {
clulog(LOG_ERR, "#53: Unable to obtain cluster lock: %s\n",
@@ -927,7 +865,7 @@
}
if (get_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#54: Failed getting status for RG %s\n",
svcName);
return FAIL;
@@ -935,7 +873,7 @@
if ((svcStatus.rs_state != RG_STATE_STOPPING) &&
(svcStatus.rs_state != RG_STATE_ERROR)) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return 0;
}
@@ -945,11 +883,13 @@
if (failed) {
clulog(LOG_CRIT, "#12: RG %s failed to stop; intervention "
"required\n", svcName);
- svcStatus.rs_state = RG_STATE_FAILED;
- } else if (svcStatus.rs_state == RG_STATE_ERROR)
+ newstate = RG_STATE_FAILED;
+ } else if (svcStatus.rs_state == RG_STATE_ERROR) {
svcStatus.rs_state = RG_STATE_RECOVER;
- else
- svcStatus.rs_state = newstate;
+ newstate = RG_STATE_RECOVER;
+ }
+
+ svcStatus.rs_state = newstate;
clulog(LOG_NOTICE, "Service %s is %s\n", svcName,
rg_state_str(svcStatus.rs_state));
@@ -957,11 +897,13 @@
svcStatus.rs_transition = (uint64_t)time(NULL);
if (set_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#55: Failed changing RG status\n");
return FAIL;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
+
+ broadcast_event(svcName, newstate);
return 0;
}
@@ -999,7 +941,7 @@
int
svc_fail(char *svcName)
{
- void *lockp = NULL;
+ struct dlm_lksb lockp;
rg_state_t svcStatus;
if (rg_lock(svcName, &lockp) == FAIL) {
@@ -1011,7 +953,7 @@
clulog(LOG_DEBUG, "Handling failure request for RG %s\n", svcName);
if (get_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#56: Failed getting status for RG %s\n",
svcName);
return FAIL;
@@ -1019,7 +961,7 @@
if ((svcStatus.rs_state == RG_STATE_STARTED) &&
(svcStatus.rs_owner != my_id())) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_DEBUG, "Unable to disable RG %s in %s state\n",
svcName, rg_state_str(svcStatus.rs_state));
return FAIL;
@@ -1036,11 +978,13 @@
svcStatus.rs_transition = (uint64_t)time(NULL);
svcStatus.rs_restarts = 0;
if (set_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#57: Failed changing RG status\n");
return FAIL;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
+
+ broadcast_event(svcName, RG_STATE_FAILED);
return 0;
}
@@ -1068,7 +1012,7 @@
/* Open a connection to the other node */
- if (msg_open(target, RG_PORT, &ctx, 2)< 0) {
+ if (msg_open(MSG_CLUSTER, target, RG_PORT, &ctx, 2)< 0) {
clulog(LOG_ERR,
"#58: Failed opening connection to member #%d\n",
target);
@@ -1355,6 +1299,11 @@
return FAIL;
}
free_member_list(membership);
+
+ /* Check for dependency. We cannot start unless our
+ dependency is met */
+ if (check_depend_safe(svcName) == 0)
+ return RG_EDEPEND;
/*
* This is a 'root' start request. We need to clear out our failure
--- cluster/rgmanager/src/daemons/test.c 2005/03/21 22:00:31 1.4
+++ cluster/rgmanager/src/daemons/test.c 2006/07/11 23:52:41 1.5
@@ -1,3 +1,21 @@
+/*
+ Copyright Red Hat, Inc. 2004-2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
#include <libxml/parser.h>
#include <libxml/xmlmemory.h>
#include <libxml/xpath.h>
--- cluster/rgmanager/src/resources/service.sh 2006/06/02 17:37:10 1.5
+++ cluster/rgmanager/src/resources/service.sh 2006/07/11 23:52:41 1.6
@@ -125,6 +125,17 @@
</shortdesc>
<content type="string"/>
</parameter>
+
+ <parameter name="depend">
+ <longdesc lang="en">
+ Top-level service this depends on, in "service:name" format.
+ </longdesc>
+ <shortdesc lang="en">
+ Service dependency; will not start without the specified
+ service running.
+ </shortdesc>
+ <content type="string"/>
+ </parameter>
</parameters>
<actions>
--- cluster/rgmanager/src/utils/clustat.c 2006/06/02 17:37:11 1.17
+++ cluster/rgmanager/src/utils/clustat.c 2006/07/11 23:52:41 1.18
@@ -6,6 +6,7 @@
#include <libgen.h>
#include <ncurses.h>
#include <term.h>
+#include <rg_types.h>
#include <termios.h>
#include <ccs.h>
#include <libcman.h>
@@ -53,7 +54,7 @@
struct timeval tv;
- if (msg_open(0, RG_PORT, &ctx, 10) < 0) {
+ if (msg_open(MSG_SOCKET, 0, 0, &ctx, 10) < 0) {
return NULL;
}
@@ -107,6 +108,7 @@
}
swab_generic_msg_hdr(msgp);
+
if (msgp->gh_command == RG_SUCCESS) {
free(msgp);
break;
@@ -117,7 +119,6 @@
return NULL;
}
-
rsmp = (rg_state_msg_t *)msgp;
swab_rg_state_t(&rsmp->rsm_state);
@@ -157,46 +158,63 @@
char buf[128];
char *name;
cluster_member_list_t *ret = NULL;
+ cman_node_t *nodes = NULL;
desc = ccs_connect();
if (desc < 0) {
return NULL;
}
- x = 1;
+ while ((ret = malloc(sizeof(*ret))) == NULL)
+ sleep(1);
- snprintf(buf, sizeof(buf),
- "/cluster/clusternodes/clusternode[%d]/@name", x);
- while (ccs_get(desc, buf, &name) == 0) {
- if (!ret) {
- ret = malloc(cml_size(x));
- if (!ret) {
+ x = 1;
+ while (1) {
+ snprintf(buf, sizeof(buf),
+ "/cluster/clusternodes/clusternode[%d]/@name", x);
+
+ if (ccs_get(desc, buf, &name) != 0)
+ break;
+
+ if (!nodes) {
+ nodes = malloc(x * sizeof(cman_node_t));
+ if (!nodes ) {
perror("malloc");
ccs_disconnect(desc);
exit(1);
}
- memset(ret, 0, cml_size(x));
+ memset(nodes, 0, x * sizeof(cman_node_t));
} else {
- ret = realloc(ret, cml_size(x));
- if (!ret) {
+ nodes = realloc(ret, x * sizeof(cman_node_t));
+ if (!nodes) {
perror("realloc");
ccs_disconnect(desc);
exit(1);
}
}
- memset(&ret->cml_members[x-1], 0, sizeof(cman_node_t));
- strncpy(ret->cml_members[x-1].cn_name, name,
- sizeof(ret->cml_members[x-1].cn_name));
+ memset(&nodes[x-1], 0, sizeof(cman_node_t));
+ strncpy(nodes[x-1].cn_name, name,
+ sizeof(nodes[x-1].cn_name));
free(name);
+ /* Add node ID */
+ snprintf(buf, sizeof(buf),
+ "/cluster/clusternodes/clusternode[%d]/@nodeid", x);
+ if (ccs_get(desc, buf, &name) == 0) {
+ nodes[x-1].cn_nodeid = atoi(name);
+ free(name);
+ }
+
ret->cml_count = x;
++x;
- snprintf(buf, sizeof(buf),
- "/cluster/clusternodes/clusternode[%d]/@name", x);
}
ccs_disconnect(desc);
+
+ ret->cml_members = nodes;
+
+
return ret;
}
@@ -238,9 +256,12 @@
if (!m) {
printf("%s not found\n", these->cml_members[x].cn_name);
/* WTF? It's not in our config */
- printf("realloc %d\n", (int)cml_size((all->cml_count+1)));
- all = realloc(all, cml_size((all->cml_count+1)));
- if (!all) {
+ printf("realloc %d\n", (int)((all->cml_count+1) *
+ sizeof(cman_node_t)));
+ all->cml_members = realloc(all->cml_members,
+ (all->cml_count+1) *
+ sizeof(cman_node_t));
+ if (!all->cml_members) {
perror("realloc");
exit(1);
}
@@ -440,7 +461,8 @@
void
txt_member_state(cman_node_t *node)
{
- printf(" %-40.40s ", node->cn_name);
+ printf(" %-34.34s %4d ", node->cn_name,
+ node->cn_nodeid);
if (node->cn_member & FLAG_UP)
printf("Online");
@@ -481,8 +503,8 @@
{
int x;
- printf(" %-40.40s %s\n", "Member Name", "Status");
- printf(" %-40.40s %s\n", "------ ----", "------");
+ printf(" %-34.34s %-4.4s %s\n", "Member Name", "ID", "Status");
+ printf(" %-34.34s %-4.4s %s\n", "------ ----", "----", "------");
for (x = 0; x < membership->cml_count; x++) {
if (name && strcmp(membership->cml_members[x].cn_name, name))
--- cluster/rgmanager/src/utils/clusvcadm.c 2006/06/02 17:37:11 1.8
+++ cluster/rgmanager/src/utils/clusvcadm.c 2006/07/11 23:52:41 1.9
@@ -72,7 +72,7 @@
membership = get_member_list(ch);
me = get_my_nodeid(ch);
- if (msg_open(0, RG_PORT, &ctx, 5) < 0) {
+ if (msg_open(MSG_SOCKET, 0, RG_PORT, &ctx, 5) < 0) {
printf("Could not connect to resource group manager\n");
goto out;
}
@@ -179,7 +179,7 @@
main(int argc, char **argv)
{
extern char *optarg;
- char *svcname=NULL, nodename[256];
+ char *svcname=NULL, nodename[256], realsvcname[64];
int opt;
msgctx_t ctx;
cman_handle_t ch;
@@ -259,7 +259,12 @@
usage(basename(argv[0]));
return 1;
}
-
+
+ if (!strchr(svcname,':')) {
+ snprintf(realsvcname, sizeof(realsvcname), "service:%s",
+ svcname);
+ svcname = realsvcname;
+ }
/* No login */
ch = cman_init(NULL);
@@ -287,18 +292,19 @@
*/
}
+ strcpy(nodename,"me");
build_message(&msg, action, svcname, svctarget);
if (action != RG_RELOCATE) {
printf("Member %s %s %s", nodename, actionstr, svcname);
printf("...");
fflush(stdout);
- msg_open(0, RG_PORT, &ctx, 5);
+ msg_open(MSG_SOCKET, 0, RG_PORT, &ctx, 5);
} else {
printf("Trying to relocate %s to %s", svcname, nodename);
printf("...");
fflush(stdout);
- msg_open(0, RG_PORT, &ctx, 5);
+ msg_open(MSG_SOCKET, 0, RG_PORT, &ctx, 5);
}
if (ctx.type < 0) {
@@ -307,7 +313,9 @@
return 1;
}
- if (msg_send(&ctx, &msg, sizeof(msg)) != sizeof(msg)) {
+ opt = msg_send(&ctx, &msg, sizeof(msg));
+
+ if (opt < sizeof(msg)) {
perror("msg_send");
fprintf(stderr, "Could not send entire message!\n");
return 1;
@@ -340,6 +348,9 @@
case RG_EAGAIN:
printf("failed: Try again (resource groups locked)\n");
break;
+ case RG_EDEPEND:
+ printf("failed: Operation would break dependency\n");
+ break;
default:
printf("failed: unknown reason %d\n", msg.sm_data.d_ret);
break;
^ permalink raw reply [flat|nested] only message in thread
only message in thread, other threads:[~2006-07-11 23:52 UTC | newest]
Thread overview: (only message) (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2006-07-11 23:52 [Cluster-devel] cluster/rgmanager/src clulib/Makefile clulib/a lhh
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.