From: lhh@sourceware.org <lhh@sourceware.org>
To: cluster-devel.redhat.com
Subject: [Cluster-devel] cluster/rgmanager/src clulib/Makefile clulib/a ...
Date: 11 Jul 2006 23:52:46 -0000
Message-ID: <20060711235246.32496.qmail@sourceware.org>
CVSROOT: /cvs/cluster
Module name: cluster
Changes by: lhh at sourceware.org 2006-07-11 23:52:42
Modified files:
rgmanager/src/clulib: Makefile alloc.c lock.c lockspace.c
members.c message.c msgsimple.c
rg_strings.c signals.c vft.c
rgmanager/src/daemons: Makefile fo_domain.c groups.c main.c
nodeevent.c reslist.c restree.c
rg_forward.c rg_state.c test.c
rgmanager/src/resources: service.sh
rgmanager/src/utils: clustat.c clusvcadm.c
Added files:
rgmanager/src/clulib: cman.c locktest.c msg_cluster.c
msg_socket.c msgtest.c
rgmanager/src/daemons: rg_event.c
Log message:
- Make rgmanager actually do things.
- Finish port of rgmanager to CMAN messaging.
- Add feature to wait for nodes to be fenced prior to handling a
node-down event.
- Add direct DLM lock support.
- Fix local communication.
- Optimize VF data distribution algorithm to use CMAN/Totem's broadcast
mode; this should make rgmanager much more scalable.
- Add multiplexing for CMAN communications so threads can have
pseudo-private channels over the single CMAN socket.
- Add service->service dependencies based on service events.
- Add node ID display to clustat text-mode output.
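The multiplexing described above works by prefixing every datagram sent over the shared CMAN port with a small header naming the source and destination contexts, then fanning received datagrams out to per-context queues (see assign_ctx() and process_cman_msg() in msg_cluster.c). What follows is a minimal, single-threaded sketch of that idea, not the rgmanager code. Every name in it (struct mux_hdr, mux_alloc(), mux_dispatch(), the NCTX/QLEN sizes) is hypothetical. The header layout is simplified, there is no locking or byte-swapping, and only headers are queued. Unlike process_cman_msg(), which copies broadcast data only to contexts flagged SKF_MCAST, the sketch delivers broadcasts to every open context.

```c
#include <stdint.h>

/* Hypothetical sizes; slot 0 is reserved for the broadcast channel. */
#define NCTX 8
#define QLEN 4

/* Simplified wire header carried in front of every payload. */
struct mux_hdr {
	uint8_t control;	/* e.g. open / open-ack / data / close */
	int32_t src_ctx;	/* sender's local context id */
	int32_t dest_ctx;	/* receiver's local context id; 0 = broadcast */
};

/* One pseudo-private channel: a slot in the table plus a tiny queue. */
struct mux_ctx {
	int in_use;
	int count;		/* headers currently queued */
	struct mux_hdr q[QLEN];
};

static struct mux_ctx table[NCTX];

/* Claim a free slot in 1..NCTX-1, scanning round-robin from the slot
 * after the last one handed out. Returns -1 if every slot is busy. */
int mux_alloc(void)
{
	static int next = 1;
	int i;

	for (i = 0; i < NCTX - 1; i++) {
		int slot = 1 + (next - 1 + i) % (NCTX - 1);

		if (!table[slot].in_use) {
			table[slot].in_use = 1;
			table[slot].count = 0;
			next = 1 + slot % (NCTX - 1);
			return slot;
		}
	}
	return -1;
}

void mux_free(int slot)
{
	if (slot > 0 && slot < NCTX)
		table[slot].in_use = 0;
}

/* Enqueue on one context; drops the header if closed or full. */
static int mux_enqueue(struct mux_ctx *c, const struct mux_hdr *h)
{
	if (!c->in_use || c->count >= QLEN)
		return 0;
	c->q[c->count++] = *h;
	return 1;
}

/* Route one received header; returns how many queues got a copy. */
int mux_dispatch(const struct mux_hdr *h)
{
	int slot, delivered = 0;

	if (h->dest_ctx < 0 || h->dest_ctx >= NCTX)
		return 0;		/* bogus context id: drop */

	if (h->dest_ctx != 0)		/* unicast to one pseudo channel */
		return mux_enqueue(&table[h->dest_ctx], h);

	for (slot = 1; slot < NCTX; slot++)	/* broadcast fan-out */
		delivered += mux_enqueue(&table[slot], h);
	return delivered;
}
```

A header addressed to a context that has already been freed is simply dropped, which mirrors how process_cman_msg() ignores messages for contexts it has already detached.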
Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/cman.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/locktest.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msg_cluster.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msg_socket.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msgtest.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/Makefile.diff?cvsroot=cluster&r1=1.8&r2=1.9
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/alloc.c.diff?cvsroot=cluster&r1=1.7&r2=1.8
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lock.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lockspace.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/members.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/message.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/msgsimple.c.diff?cvsroot=cluster&r1=1.5&r2=1.6
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/rg_strings.c.diff?cvsroot=cluster&r1=1.3&r2=1.4
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/signals.c.diff?cvsroot=cluster&r1=1.1&r2=1.2
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/vft.c.diff?cvsroot=cluster&r1=1.13&r2=1.14
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_event.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/Makefile.diff?cvsroot=cluster&r1=1.12&r2=1.13
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/fo_domain.c.diff?cvsroot=cluster&r1=1.8&r2=1.9
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/groups.c.diff?cvsroot=cluster&r1=1.18&r2=1.19
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/main.c.diff?cvsroot=cluster&r1=1.25&r2=1.26
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/nodeevent.c.diff?cvsroot=cluster&r1=1.2&r2=1.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/reslist.c.diff?cvsroot=cluster&r1=1.13&r2=1.14
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/restree.c.diff?cvsroot=cluster&r1=1.19&r2=1.20
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_forward.c.diff?cvsroot=cluster&r1=1.4&r2=1.5
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/rg_state.c.diff?cvsroot=cluster&r1=1.16&r2=1.17
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/test.c.diff?cvsroot=cluster&r1=1.4&r2=1.5
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/resources/service.sh.diff?cvsroot=cluster&r1=1.5&r2=1.6
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/utils/clustat.c.diff?cvsroot=cluster&r1=1.17&r2=1.18
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/utils/clusvcadm.c.diff?cvsroot=cluster&r1=1.8&r2=1.9
/cvs/cluster/cluster/rgmanager/src/clulib/cman.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/cman.c
+++ - 2006-07-11 23:52:43.661578000 +0000
@@ -0,0 +1,264 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
+/**
+ pthread mutex wrapper for a global CMAN handle
+ */
+#include <stdio.h>
+#include <pthread.h>
+#include <libcman.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/types.h>
+#include <errno.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+
+static cman_handle_t *_chandle = NULL;
+static pthread_mutex_t _chandle_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t _chandle_cond = PTHREAD_COND_INITIALIZER;
+static pthread_t _chandle_holder = 0;
+static int _chandle_preempt = 0;
+static int _wakeup_pipe[2] = { -1, -1 };
+
+static void
+_set_nonblock(int fd)
+{
+ int flags;
+
+ flags = fcntl(fd, F_GETFL, 0);
+ if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
+ perror("fcntl");
+}
+
+
+/**
+ Lock / return the global CMAN handle.
+
+ @param block If nonzero, we wait until the handle is released
+ @param preempt If nonzero, *try* to wake up the holder who has
+ taken the lock with cman_lock_preemptible. Will not
+ wake up holders which took it with cman_lock().
+ @return NULL / errno on failure; the global CMAN handle
+ on success.
+ */
+cman_handle_t *
+cman_lock(int block, int preempt)
+{
+ pthread_t tid;
+ cman_handle_t *ret = NULL;
+
+ pthread_mutex_lock(&_chandle_lock);
+ if (_chandle == NULL) {
+ errno = ENOSYS;
+ goto out_unlock;
+ }
+
+ tid = pthread_self();
+ if (_chandle_holder == tid) {
+ errno = EDEADLK;
+ goto out_unlock;
+ }
+
+ while (_chandle_holder > 0) {
+ if (!block) {
+ errno = EAGAIN;
+ goto out_unlock;
+ }
+
+ /* Try to wake up the holder! */
+ if (preempt)
+ write(_wakeup_pipe[1], "!", 1);
+
+ /* Blocking call; do the cond-thing */
+ pthread_cond_wait(&_chandle_cond, &_chandle_lock);
+ }
+
+ _chandle_holder = tid;
+ ret = _chandle;
+out_unlock:
+ pthread_mutex_unlock(&_chandle_lock);
+ return ret;
+}
+
+
+/**
+ Lock / return the global CMAN handle, allowing other callers to
+ ask us (via preempt_fd) to give it up.
+
+ @param block If nonzero, we wait until the handle is released
+ @param preempt_fd Caller should include this file descriptor in
+ blocking calls to select(2), so that we can wake
+ it up if someone calls with cman_lock(xxx, 1);
+ @return NULL / errno on failure; the global CMAN handle
+ on success.
+ */
+cman_handle_t *
+cman_lock_preemptible(int block, int *preempt_fd)
+{
+ pthread_t tid;
+ cman_handle_t *ret = NULL;
+
+ if (preempt_fd == NULL) {
+ errno = EINVAL;
+ return NULL;
+ }
+
+ pthread_mutex_lock(&_chandle_lock);
+ if (_chandle == NULL) {
+ errno = ENOSYS;
+ goto out_unlock;
+ }
+
+ tid = pthread_self();
+ if (_chandle_holder == tid) {
+ errno = EDEADLK;
+ goto out_unlock;
+ }
+
+ while (_chandle_holder > 0) {
+ if (!block) {
+ errno = EAGAIN;
+ goto out_unlock;
+ }
+
+ /* Blocking call; do the cond-thing */
+ pthread_cond_wait(&_chandle_cond, &_chandle_lock);
+ }
+
+ *preempt_fd = _wakeup_pipe[0];
+ _chandle_holder = tid;
+ _chandle_preempt = 1;
+ ret = _chandle;
+out_unlock:
+ pthread_mutex_unlock(&_chandle_lock);
+ return ret;
+}
+
+
+/**
+ Release the global CMAN handle
+
+ @param ch Should match the global handle
+ @return -1 on failure, 0 on success
+ */
+int
+cman_unlock(cman_handle_t *ch)
+{
+ int ret = -1;
+ char c;
+
+ pthread_mutex_lock(&_chandle_lock);
+ if (_chandle == NULL) {
+ errno = ENOSYS;
+ goto out_unlock;
+ }
+
+ if (_chandle_holder != pthread_self() || !_chandle_holder) {
+ errno = EBUSY;
+ goto out_unlock;
+ }
+
+ if (_chandle != ch) {
+ errno = EINVAL;
+ goto out_unlock;
+ }
+
+ /* Empty wakeup pipe if we took it with the preempt flag */
+ if (_chandle_preempt)
+ read(_wakeup_pipe[0], &c, 1);
+
+ _chandle_preempt = 0;
+ _chandle_holder = 0;
+ ret = 0;
+
+out_unlock:
+ pthread_mutex_unlock(&_chandle_lock);
+ if (ret == 0)
+ pthread_cond_broadcast(&_chandle_cond);
+ return ret;
+}
+
+
+int
+cman_init_subsys(cman_handle_t *ch)
+{
+ int ret = -1;
+
+ pthread_mutex_lock(&_chandle_lock);
+ if (_chandle) {
+ errno = EAGAIN;
+ goto out_unlock;
+ }
+
+ if (!ch) {
+ errno = EINVAL;
+ goto out_unlock;
+ }
+
+ if (pipe(_wakeup_pipe) < 0) {
+ goto out_unlock;
+ }
+
+ _set_nonblock(_wakeup_pipe[0]);
+ _chandle = ch;
+ _chandle_holder = 0;
+ ret = 0;
+
+out_unlock:
+ pthread_mutex_unlock(&_chandle_lock);
+ return ret;
+}
+
+
+int
+cman_cleanup_subsys(void)
+{
+ int ret = -1;
+
+ pthread_mutex_lock(&_chandle_lock);
+ if (!_chandle) {
+ errno = EAGAIN;
+ goto out_unlock;
+ }
+
+ while (_chandle_holder > 0) {
+ pthread_cond_wait(&_chandle_cond, &_chandle_lock);
+ }
+
+ ret = 0;
+ _chandle = NULL;
+ _chandle_holder = 0;
+
+ close(_wakeup_pipe[0]);
+ close(_wakeup_pipe[1]);
+
+out_unlock:
+ pthread_mutex_unlock(&_chandle_lock);
+ return ret;
+}
+
+
+int
+cman_send_data_unlocked(void *buf, int len, int flags,
+ uint8_t port, int nodeid)
+{
+ return cman_send_data(_chandle, buf, len, flags, port, nodeid);
+}
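cman.c above serializes access to the single CMAN handle with a mutex, a condition variable and a self-pipe. A thread that takes the handle with cman_lock_preemptible() includes the pipe's read end in its select(2) set, and a caller of cman_lock(block, 1) writes a byte to that pipe to ask the holder to let go. Below is a minimal sketch of the same hand-off pattern, not a drop-in replacement. The names (struct token, token_init(), token_acquire(), token_release()) are hypothetical. Owner tracking and the EDEADLK check are omitted, the wait re-checks the owner flag in a loop around pthread_cond_wait(), and release drains every pending wakeup byte rather than just one.

```c
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>

/* Hypothetical token guarding one shared handle. */
struct token {
	pthread_mutex_t lock;
	pthread_cond_t cond;
	int held;		/* nonzero while some thread owns the token */
	int wake_pipe[2];	/* [0] watched by holder, [1] poked by waiters */
};

int token_init(struct token *t)
{
	if (pipe(t->wake_pipe) < 0)
		return -1;
	/* Non-blocking read end, so draining never stalls the releaser. */
	fcntl(t->wake_pipe[0], F_SETFL,
	      fcntl(t->wake_pipe[0], F_GETFL, 0) | O_NONBLOCK);
	pthread_mutex_init(&t->lock, NULL);
	pthread_cond_init(&t->cond, NULL);
	t->held = 0;
	return 0;
}

/* Acquire the token. If block is zero and it is busy, fail with EAGAIN.
 * If preempt is nonzero, poke the current holder through the pipe. */
int token_acquire(struct token *t, int block, int preempt)
{
	pthread_mutex_lock(&t->lock);
	if (t->held && !block) {
		pthread_mutex_unlock(&t->lock);
		errno = EAGAIN;
		return -1;
	}
	if (t->held && preempt) {
		ssize_t n = write(t->wake_pipe[1], "!", 1);
		(void)n;	/* best effort: the holder may not be watching */
	}
	/* Loop: wakeups can be spurious, and after a broadcast another
	 * waiter may have taken the token before we reacquired the mutex. */
	while (t->held)
		pthread_cond_wait(&t->cond, &t->lock);
	t->held = 1;
	pthread_mutex_unlock(&t->lock);
	return 0;
}

void token_release(struct token *t)
{
	char c;

	pthread_mutex_lock(&t->lock);
	/* Drain pending preemption requests before handing off. */
	while (read(t->wake_pipe[0], &c, 1) == 1)
		;
	t->held = 0;
	pthread_mutex_unlock(&t->lock);
	pthread_cond_broadcast(&t->cond);
}
```

On the holder side, t.wake_pipe[0] would sit in the same fd_set as the real socket; when select() reports it readable, the holder finishes up and calls token_release() so the waiter can proceed.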
/cvs/cluster/cluster/rgmanager/src/clulib/locktest.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/locktest.c
+++ - 2006-07-11 23:52:43.746107000 +0000
@@ -0,0 +1,85 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
+#include <lock.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <pthread.h>
+#include <signal.h>
+
+
+void *
+lock_thread(void *arg)
+{
+ struct dlm_lksb lksb;
+
+ while(1) {
+ memset(&lksb, 0, sizeof(lksb));
+ printf("Taking lock...\n");
+ clu_lock(LKM_EXMODE, &lksb, 0, arg);
+ printf("Thread acquired lock on %s\n", (char *)arg);
+ clu_unlock(&lksb);
+ }
+}
+
+
+
+
+int
+main(int argc, char **argv)
+{
+ struct dlm_lksb lksb;
+ int ret;
+ pthread_t th;
+
+ if (argc < 2) {
+ printf("Lock what?\n");
+ return 1;
+ }
+
+ if (clu_lock_init("Testing") != 0) {
+ perror("clu_lock_init");
+ return 1;
+ }
+
+ if (argc == 3) {
+ pthread_create(&th, NULL, lock_thread, strdup(argv[1]));
+ }
+
+ memset(&lksb,0,sizeof(lksb));
+ ret = clu_lock(LKM_EXMODE, &lksb, 0, argv[1]);
+ if (ret < 0) {
+ perror("clu_lock");
+ return 1;
+ }
+
+ printf("Acquired lock on %s; press enter to release\n", argv[1]);
+ getchar();
+
+ clu_unlock(&lksb);
+
+ if (argc == 3) {
+ printf("Press enter to kill lock thread...\n");
+ getchar();
+ pthread_kill(th, SIGTERM);
+ }
+
+ clu_lock_finished("Testing");
+
+ return 0;
+}
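msg_cluster.c below lets callers select(2) on a cluster context even though its messages live in an in-memory queue. Each context gets a pipe, queue_for_context() writes one byte per queued message, and the receive paths read one byte back per dequeued message, so the pipe's read end is readable exactly while the queue is non-empty. A minimal sketch of that technique with an integer queue follows. The names (struct selq, selq_push(), selq_pop(), selq_ready()) are hypothetical and error handling is reduced to return codes.

```c
#include <fcntl.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

struct sq_node {
	struct sq_node *next;
	int value;
};

/* Hypothetical FIFO whose readiness can be watched with select(2). */
struct selq {
	pthread_mutex_t lock;
	struct sq_node *head, *tail;
	int pipefd[2];		/* one byte in the pipe per queued item */
};

int selq_init(struct selq *q)
{
	if (pipe(q->pipefd) < 0)
		return -1;
	fcntl(q->pipefd[0], F_SETFL,
	      fcntl(q->pipefd[0], F_GETFL, 0) | O_NONBLOCK);
	pthread_mutex_init(&q->lock, NULL);
	q->head = q->tail = NULL;
	return 0;
}

/* Append an item and write its readiness token. A full pipe would leave
 * the item queued without a token; the sketch just reports that. */
int selq_push(struct selq *q, int value)
{
	struct sq_node *n = malloc(sizeof(*n));
	ssize_t w;

	if (!n)
		return -1;
	n->next = NULL;
	n->value = value;

	pthread_mutex_lock(&q->lock);
	if (q->tail)
		q->tail->next = n;
	else
		q->head = n;
	q->tail = n;
	w = write(q->pipefd[1], "", 1);
	pthread_mutex_unlock(&q->lock);
	return w == 1 ? 0 : -1;
}

/* Pop the oldest item into *value; returns -1 if the queue is empty. */
int selq_pop(struct selq *q, int *value)
{
	struct sq_node *n;
	char c;
	ssize_t r;

	pthread_mutex_lock(&q->lock);
	n = q->head;
	if (!n) {
		pthread_mutex_unlock(&q->lock);
		return -1;
	}
	q->head = n->next;
	if (!q->head)
		q->tail = NULL;
	r = read(q->pipefd[0], &c, 1);	/* consume this item's token */
	(void)r;
	pthread_mutex_unlock(&q->lock);

	*value = n->value;
	free(n);
	return 0;
}

/* Poll readiness with a zero timeout, as an event loop would. */
int selq_ready(struct selq *q)
{
	fd_set rfds;
	struct timeval tv = { 0, 0 };

	FD_ZERO(&rfds);
	FD_SET(q->pipefd[0], &rfds);
	return select(q->pipefd[0] + 1, &rfds, NULL, NULL, &tv) > 0;
}
```

This is also why cluster_msg_fd_set() writes one byte per already-queued message when it first creates the pipe: otherwise messages queued before the caller switched to select() would never make the descriptor readable.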
/cvs/cluster/cluster/rgmanager/src/clulib/msg_cluster.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/msg_cluster.c
+++ - 2006-07-11 23:52:43.828222000 +0000
@@ -0,0 +1,1104 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
+#define _MESSAGE_BUILD
+#include <message.h>
+#include <stdio.h>
+#include <pthread.h>
+#include <libcman.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/types.h>
+#include <time.h>
+#include <sys/time.h>
+#include <errno.h>
+#include <signal.h>
+#include <signals.h>
+#include <cman-private.h>
+
+/* Ripped from ccsd's setup_local_socket */
+#define MAX_CONTEXTS 32 /* Testing; production should be 1024-ish */
+
+int cluster_msg_close(msgctx_t *ctx);
+
+/* Context 0 is reserved for control messages */
+
+/* Local-ish contexts */
+static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER;
+static msgctx_t *contexts[MAX_CONTEXTS];
+static int _me = 0;
+pthread_t comms_thread;
+int thread_running;
+
+#define is_established(ctx) \
+ (((ctx->type == MSG_CLUSTER) && \
+ (ctx->u.cluster_info.remote_ctx && \
+ ctx->u.cluster_info.local_ctx)) || \
+ ((ctx->type == MSG_SOCKET) && \
+ (ctx->u.local_info.sockfd != -1)))
+
+static msg_ops_t cluster_msg_ops;
+
+
+static int
+cluster_msg_send(msgctx_t *ctx, void *msg, size_t len)
+{
+ char buf[4096];
+ cluster_msg_hdr_t *h = (void *)buf;
+ int ret;
+ char *msgptr = (buf + sizeof(*h));
+
+ errno = EINVAL;
+ if (ctx->type != MSG_CLUSTER)
+ return -1;
+ if (!(ctx->flags & SKF_WRITE))
+ return -1;
+ if ((len + sizeof(*h)) > sizeof(buf)) {
+ errno = E2BIG;
+ return -1;
+ }
+
+ h->msg_control = M_DATA;
+ h->src_ctx = ctx->u.cluster_info.local_ctx;
+ h->dest_ctx = ctx->u.cluster_info.remote_ctx;
+ h->msg_port = ctx->u.cluster_info.port;
+ memcpy(msgptr, msg, len);
+ h->src_nodeid = _me;
+
+ /*
+ printf("sending cluster message, length = %d to nodeid %d port %d\n",
+ len + sizeof(*h), ctx->u.cluster_info.nodeid, ctx->u.cluster_info.port);
+ */
+
+ swab_cluster_msg_hdr_t(h);
+
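+ /* cman_send_data_unlocked() takes (buf, len, flags, port, nodeid) */
+ ret = cman_send_data_unlocked((void *)h, len + sizeof(*h), 0,
+ ctx->u.cluster_info.port,
+ ctx->u.cluster_info.nodeid);
+ if (ret < 0)
+ return -1;
+
+ return len + sizeof(*h);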
+}
+
+
+/**
+ Assign a (free) cluster context ID if possible
+ */
+static int
+assign_ctx(msgctx_t *ctx)
+{
+ int start;
+ static uint32_t context_index = 1;
+
+ /* Assign context index */
+ pthread_mutex_lock(&context_lock);
+
+ start = context_index;
+ do {
+ context_index++;
+ if (context_index >= MAX_CONTEXTS)
+ context_index = 1;
+
+ if (!contexts[context_index]) {
+ contexts[context_index] = ctx;
+ ctx->u.cluster_info.local_ctx = context_index;
+ pthread_mutex_unlock(&context_lock);
+ return 0;
+ }
+ } while (context_index != start);
+
+ pthread_mutex_unlock(&context_lock);
+
+ errno = EAGAIN;
+ return -1;
+}
+
+
+/* See if anything's on the cluster socket. If so, dispatch it
+ on to the requisite queues
+ XXX should be passed a connection arg! */
+static int
+poll_cluster_messages(int timeout)
+{
+ int ret = -1;
+ fd_set rfds;
+ int fd, lfd, max;
+ struct timeval tv;
+ struct timeval *p = NULL;
+ cman_handle_t *ch;
+
+ if (timeout >= 0) {
+ p = &tv;
+ tv.tv_sec = timeout;
+ tv.tv_usec = 0;
+ }
+
+ FD_ZERO(&rfds);
+
+ /* Holding the CMAN handle while we sleep in select() could make
+ other threads that need a membership list block for a long
+ time. In practice that should not happen: when we get a
+ membership event, we generate a new membership list in a
+ locally cached copy and reference that.
+
+ */
+ ch = cman_lock_preemptible(1, &lfd);
+ if (!ch) {
+ printf("%s\n", strerror(errno));
+ return -1;
+ }
+
+ fd = cman_get_fd(ch);
+ if (fd < 0) {
+ cman_unlock(ch);
+ return 0;
+ }
+ FD_SET(fd, &rfds);
+ FD_SET(lfd, &rfds);
+
+ max = (lfd > fd ? lfd : fd);
+ if (select(max + 1, &rfds, NULL, NULL, p) > 0) {
+ /* Someone woke us up */
+ if (FD_ISSET(lfd, &rfds)) {
+ cman_unlock(ch);
+ errno = EAGAIN;
+ return -1;
+ }
+
+ cman_dispatch(ch, 0);
+ ret = 0;
+ }
+ cman_unlock(ch);
+
+ return ret;
+}
+
+
+/**
+ This is used to establish and tear down pseudo-private
+ contexts which are shared with the cluster context.
+ */
+static int
+cluster_send_control_msg(msgctx_t *ctx, int type)
+{
+ cluster_msg_hdr_t cm;
+ int ret;
+
+ cm.msg_control = (uint8_t)type;
+ cm.src_nodeid = _me;
+ cm.dest_ctx = ctx->u.cluster_info.remote_ctx;
+ cm.src_ctx = ctx->u.cluster_info.local_ctx;
+ cm.msg_port = ctx->u.cluster_info.port;
+
+ swab_cluster_msg_hdr_t(&cm);
+
+ /* cman_send_data_unlocked() takes (buf, len, flags, port, nodeid) */
+ ret = cman_send_data_unlocked((void *)&cm, sizeof(cm), 0,
+ ctx->u.cluster_info.port,
+ ctx->u.cluster_info.nodeid);
+ return ret;
+}
+
+
+/**
+ Wait for a message on a context.
+ */
+static int
+cluster_msg_wait(msgctx_t *ctx, int timeout)
+{
+ struct timespec ts = {0, 0};
+ int req = M_NONE;
+ int e;
+
+ errno = EINVAL;
+ if (!ctx)
+ return -1;
+ if (ctx->type != MSG_CLUSTER)
+ return -1;
+ if (!(ctx->flags & (SKF_READ | SKF_LISTEN)))
+ return -1;
+
+ if (timeout > 0) {
+ struct timeval now;
+
+ gettimeofday(&now, NULL);
+ ts.tv_sec = now.tv_sec + timeout;
+ ts.tv_nsec = now.tv_usec * 1000;
+ }
+
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+ while (1) {
+
+ /* See if we dispatched any messages on to our queue */
+ if (ctx->u.cluster_info.queue) {
+ req = ctx->u.cluster_info.queue->message->msg_control;
+ /*printf("Queue not empty CTX%d : %d\n",
+ ctx->u.cluster_info.local_ctx, req);*/
+ break;
+ }
+
+ /* Ok, someone else has the mutex on our FD. Go to
+ sleep on a cond; maybe they'll wake us up */
+ e = pthread_cond_timedwait(&ctx->u.cluster_info.cond,
+ &ctx->u.cluster_info.mutex,
+ &ts);
+
+ if (timeout == 0) {
+ break;
+ }
+
+ if (e == 0) {
+ continue;
+ }
+
+ if (e == ETIMEDOUT) {
+ break;
+ }
+ }
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+ return req;
+}
+
+
+static int
+cluster_msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max)
+{
+ int e;
+ msg_q_t *n;
+
+ errno = EINVAL;
+ if (!ctx || !fds)
+ return -1;
+ if (ctx->type != MSG_CLUSTER)
+ return -1;
+
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+ if (ctx->u.cluster_info.select_pipe[0] < 0) {
+ if (pipe(ctx->u.cluster_info.select_pipe) < 0) {
+ e = errno;
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ errno = e;
+ return -1;
+ }
+
+ /*
+ printf("%s: Created cluster CTX select pipe "
+ "rd=%d wr=%d\n", __FUNCTION__,
+ ctx->u.cluster_info.select_pipe[0],
+ ctx->u.cluster_info.select_pipe[1]);
+ */
+
+ /* Ok, we just created the pipe. Now, we need to write
+ a char for every unprocessed event to the pipe, because
+ events could be pending that would otherwise be unhandled
+ by the caller because the caller is switching to select()
+ semantics. (as opposed to msg_wait() ) */
+ list_do(&ctx->u.cluster_info.queue, n) {
+ write(ctx->u.cluster_info.select_pipe[1], "", 1);
+ } while (!list_done(&ctx->u.cluster_info.queue, n));
+ }
+
+ e = ctx->u.cluster_info.select_pipe[0];
+ //printf("%s: cluster %d\n", __FUNCTION__, e);
+ FD_SET(e, fds);
+
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+ if (max && (e > *max))
+ *max = e;
+ return 0;
+}
+
+
+int
+cluster_msg_fd_isset(msgctx_t *ctx, fd_set *fds)
+{
+ errno = EINVAL;
+
+ if (!fds || !ctx)
+ return -1;
+
+ if (ctx->type != MSG_CLUSTER)
+ return -1;
+
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+ if (ctx->u.cluster_info.select_pipe[0] >= 0 &&
+ FD_ISSET(ctx->u.cluster_info.select_pipe[0], fds)) {
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ return 1;
+ }
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ return 0;
+}
+
+
+int
+cluster_msg_fd_clr(msgctx_t *ctx, fd_set *fds)
+{
+ errno = EINVAL;
+
+ if (!fds || !ctx)
+ return -1;
+
+ if (ctx->type != MSG_CLUSTER)
+ return -1;
+
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+ if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+ FD_CLR(ctx->u.cluster_info.select_pipe[0], fds);
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ return 1;
+ }
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ return 0;
+}
+
+
+static int
+_cluster_msg_receive(msgctx_t *ctx, void **msg, size_t *len)
+{
+ cluster_msg_hdr_t *m;
+ msg_q_t *n;
+ int ret = 0;
+ char foo;
+
+ if (msg)
+ *msg = NULL;
+ if (len)
+ *len = 0;
+
+ if (ctx->u.cluster_info.local_ctx < 0 ||
+ ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
+ errno = EBADF;
+ return -1;
+ }
+
+ /* trigger receive here */
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+
+ n = ctx->u.cluster_info.queue;
+ if (n == NULL) {
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ errno = EAGAIN;
+ return -1;
+ }
+
+ list_remove(&ctx->u.cluster_info.queue, n);
+
+ if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+ //printf("%s read\n", __FUNCTION__);
+ read(ctx->u.cluster_info.select_pipe[0],
+ &foo, 1);
+ }
+
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+ m = n->message;
+ switch(m->msg_control) {
+ case M_CLOSE:
+ ctx->u.cluster_info.remote_ctx = 0;
+ break;
+ case M_OPEN_ACK:
+ /* Response to new connection */
+ ctx->u.cluster_info.remote_ctx = m->src_ctx;
+ break;
+ case M_DATA:
+ /* Kill the message control structure */
+ memmove(m, &m[1], n->len - sizeof(*m));
+ if (msg)
+ *msg = (void *)m;
+ else {
+ printf("Warning: dropping data message\n");
+ free(m);
+ }
+ if (len)
+ *len = (n->len - sizeof(*m));
+ ret = (n->len - sizeof(*m));
+ free(n);
+
+ //printf("Message received\n");
+ return ret;
+ case M_OPEN:
+ /* Someone is trying to open a connection */
+ default:
+ /* ?!! */
+ ret = -1;
+ break;
+ }
+
+ free(m);
+ free(n);
+
+ return ret;
+}
+
+
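+/**
+ Receive a message from a cluster context. Data messages are copied
+ out into the caller's buffer (truncated to maxlen). Pending CMAN
+ event messages (state change, port opened/closed, shutdown request)
+ are dequeued and discarded, returning 0; a close from the remote
+ end returns -1 with errno set to ECONNRESET.
+ */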
+static int
+cluster_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+ int req;
+ msg_q_t *n;
+ void *priv_msg;
+ size_t priv_len;
+ char foo;
+
+ errno = EINVAL;
+ if (!ctx)
+ return -1;
+ if (!(ctx->flags & SKF_READ))
+ return -1;
+
+ req = cluster_msg_wait(ctx, timeout);
+
+ switch (req) {
+ case M_DATA:
+ /* Copy out. */
+ req = _cluster_msg_receive(ctx, &priv_msg, &priv_len);
+ if (req < 0) {
+ printf("%s: failed to dequeue data message\n", __FUNCTION__);
+ return -1;
+ }
+
+ priv_len = (priv_len < maxlen ? priv_len : maxlen);
+
+ if (msg && maxlen)
+ memcpy(msg, priv_msg, priv_len);
+ free(priv_msg);
+ return req;
+ case M_CLOSE:
+ errno = ECONNRESET;
+ return -1;
+ case 0:
+ //printf("Nothing on queue\n");
+ return 0;
+ case M_STATECHANGE:
+ case M_PORTOPENED:
+ case M_PORTCLOSED:
+ case M_TRY_SHUTDOWN:
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+
+ n = ctx->u.cluster_info.queue;
+ if (n == NULL) {
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ errno = EAGAIN;
+ return -1;
+ }
+
+ list_remove(&ctx->u.cluster_info.queue, n);
+
+ if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+ //printf("%s read\n", __FUNCTION__);
+ read(ctx->u.cluster_info.select_pipe[0],
+ &foo, 1);
+ }
+
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+ if (n->message)
+ free(n->message);
+ free(n);
+ return 0;
+ default:
+ printf("PROTOCOL ERROR: Received %d\n", req);
+ return -1;
+ }
+
+ printf("%s: CODE PATH ERROR\n", __FUNCTION__);
+ return -1;
+}
+
+
+/**
+ Open a connection to the specified node ID.
+ If the specified node is 0, this connects via the socket in
+ /var/run/cluster...
+ */
+static int
+cluster_msg_open(int type, int nodeid, int port, msgctx_t *ctx, int timeout)
+{
+ int t = 0, ret;
+
+ errno = EINVAL;
+ if (!ctx)
+ return -1;
+
+ if (type != MSG_CLUSTER)
+ return -1;
+
+ /*printf("Opening pseudo channel to node %d\n", nodeid);*/
+
+ ctx->type = MSG_CLUSTER;
+ ctx->ops = &cluster_msg_ops;
+ ctx->flags = 0;
+ ctx->u.cluster_info.nodeid = nodeid;
+ ctx->u.cluster_info.port = port;
+ ctx->u.cluster_info.local_ctx = -1;
+ ctx->u.cluster_info.remote_ctx = 0;
+ ctx->u.cluster_info.queue = NULL;
+
+ /* Assign context index */
+ if (assign_ctx(ctx) < 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+ ctx->flags = SKF_READ | SKF_WRITE;
+
+ if (nodeid == CMAN_NODEID_US) {
+ /* Broadcast pseudo ctx; no handshake needed */
+ ctx->flags |= SKF_MCAST;
+ return 0;
+ }
+
+ //printf(" Local CTX: %d\n", ctx->u.cluster_info.local_ctx);
+
+ /* Send open */
+ //printf(" Sending control message M_OPEN\n");
+ if (cluster_send_control_msg(ctx, M_OPEN) < 0) {
+ return -1;
+ }
+
+ /* Ok, wait for a response */
+ while (!is_established(ctx)) {
+ ++t;
+ if (t > timeout) {
+ cluster_msg_close(ctx);
+ errno = ETIMEDOUT;
+ return -1;
+ }
+
+ ret = cluster_msg_wait(ctx, 1);
+ switch(ret) {
+ case M_OPEN_ACK:
+ _cluster_msg_receive(ctx, NULL, NULL);
+ break;
+ case M_NONE:
+ continue;
+ default:
+ printf("PROTO ERROR: M_OPEN_ACK not received: %d %d\n",
+ ret, errno);
+ }
+ }
+
+ /*
+ printf(" Remote CTX: %d\n",
+ ctx->u.cluster_info.remote_ctx);
+ printf(" Pseudo channel established!\n");
+ */
+ return 0;
+}
+
+
+/**
+ Close a cluster connection context. We need to clear out the
+ receive queue and what-not. This isn't a big deal. Also, we
+ need to tell the other end that we're done -- just in case it does
+ not know yet ;)
+
+ Socket contexts are not handled here; with a socket, the O/S
+ cleans up the buffers for us.
+ */
+int
+cluster_msg_close(msgctx_t *ctx)
+{
+ msg_q_t *n = NULL;
+
+ errno = EINVAL;
+
+ if (!ctx)
+ return -1;
+ if (ctx->type != MSG_CLUSTER)
+ return -1;
+
+ if (ctx->u.cluster_info.local_ctx < 0 ||
+ ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
+ printf("Context invalid during close\n");
+ return -1;
+ }
+
+ pthread_mutex_lock(&context_lock);
+ /* Other threads should not be able to see this again */
+ if (contexts[ctx->u.cluster_info.local_ctx] &&
+ (contexts[ctx->u.cluster_info.local_ctx]->u.cluster_info.local_ctx ==
+ ctx->u.cluster_info.local_ctx)) {
+ //printf("reclaimed context %d\n",
+ //ctx->u.cluster_info.local_ctx);
+ contexts[ctx->u.cluster_info.local_ctx] = NULL;
+ }
+ pthread_mutex_unlock(&context_lock);
+
+ /* Clear receive queue */
+ while ((n = ctx->u.cluster_info.queue) != NULL) {
+ list_remove(&ctx->u.cluster_info.queue, n);
+ free(n->message);
+ free(n);
+ }
+ /* Send close message */
+ if (ctx->u.cluster_info.remote_ctx != 0) {
+ cluster_send_control_msg(ctx, M_CLOSE);
+ }
+
+ /* Close pipe if it's open */
+ if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+ close(ctx->u.cluster_info.select_pipe[0]);
+ ctx->u.cluster_info.select_pipe[0] = -1;
+ }
+ if (ctx->u.cluster_info.select_pipe[1] >= 0) {
+ close(ctx->u.cluster_info.select_pipe[1]);
+ ctx->u.cluster_info.select_pipe[1] = -1;
+ }
+ ctx->type = MSG_NONE;
+ ctx->ops = NULL;
+ return 0;
+}
+
+
+static void
+queue_for_context(msgctx_t *ctx, char *buf, int len)
+{
+ msg_q_t *node;
+
+ while ((node = malloc(sizeof(*node))) == NULL) {
+ sleep(1);
+ }
+ memset(node, 0, sizeof(*node));
+ while ((node->message = malloc(len)) == NULL) {
+ sleep(1);
+ }
+ memcpy(node->message, buf, len);
+ node->len = len;
+
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+ list_insert(&ctx->u.cluster_info.queue, node);
+ /* If a select pipe was set up, wake it up */
+ if (ctx->u.cluster_info.select_pipe[1] >= 0) {
+ //printf("QUEUE_FOR_CONTEXT write\n");
+ if (write(ctx->u.cluster_info.select_pipe[1], "", 1) < 0)
+ perror("queue_for_context write");
+ }
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ pthread_cond_signal(&ctx->u.cluster_info.cond);
+}
+
+
+/**
+ Called by cman_dispatch to deal with messages coming across the
+ cluster socket. This function deals with fanning out the requests
+ and putting them on the per-context queues. We don't have
+ the benefits of pre-configured buffers, so we need this.
+ */
+static void
+process_cman_msg(cman_handle_t h, void *priv, char *buf, int len,
+ uint8_t port, int nodeid)
+{
+ cluster_msg_hdr_t *m = (cluster_msg_hdr_t *)buf;
+ int x;
+
+ if (len < sizeof(*m)) {
+ printf("Message too short.\n");
+ return;
+ }
+
+ swab_cluster_msg_hdr_t(m);
+
+#ifdef DEBUG
+ printf("Processing ");
+ switch(m->msg_control) {
+ case M_NONE:
+ printf("M_NONE\n");
+ break;
+ case M_OPEN:
+ printf("M_OPEN\n");
+ break;
+ case M_OPEN_ACK:
+ printf("M_OPEN_ACK\n");
+ break;
+ case M_DATA:
+ printf("M_DATA\n");
+ break;
+ case M_CLOSE:
+ printf("M_CLOSE\n");
+ break;
+ }
+
+ printf(" Node ID: %d %d\n", m->src_nodeid, nodeid);
+ printf(" Remote CTX: %d Local CTX: %d\n", m->src_ctx, m->dest_ctx);
+#endif
+
+ if (m->dest_ctx >= MAX_CONTEXTS || m->dest_ctx < 0) {
+ printf("Context invalid; ignoring\n");
+ return;
+ }
+
+ pthread_mutex_lock(&context_lock);
+
+ if (m->dest_ctx == 0 && m->msg_control == M_DATA) {
+ /* Copy & place on all broadcast queues if it's a broadcast
+ M_DATA message... */
+ for (x = 0; x < MAX_CONTEXTS; x++) {
+ if (!contexts[x])
+ continue;
+ if (contexts[x]->type != MSG_CLUSTER)
+ continue;
+ if (!(contexts[x]->flags & SKF_MCAST))
+ continue;
+ if (!(contexts[x]->flags & SKF_READ))
+ continue;
+
+ queue_for_context(contexts[x], buf, len);
+ }
+ } else if (contexts[m->dest_ctx]) {
+ /* Normal receive */
+ queue_for_context(contexts[m->dest_ctx], buf, len);
+ }
+ /* If none of the above, then we received a message for something
+ we've already detached from our list. No big deal, just ignore. */
+
+ pthread_mutex_unlock(&context_lock);
+ return;
+}
+
+
+/**
+ Accept a new pseudo-private connection coming in over the
+ cluster socket.
+ */
+static int
+cluster_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
+{
+ cluster_msg_hdr_t *m;
+ msg_q_t *n;
+ char done = 0;
+ char foo;
+
+ errno = EINVAL;
+
+ if (!listenctx || !acceptctx)
+ return -1;
+ if (listenctx->u.cluster_info.local_ctx != 0)
+ return -1;
+ if (!(listenctx->flags & SKF_LISTEN))
+ return -1;
+
+ listenctx->ops->mo_init(acceptctx);
+
+ pthread_mutex_lock(&listenctx->u.cluster_info.mutex);
+
+ n = listenctx->u.cluster_info.queue;
+ if (n == NULL) {
+ pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
+ errno = EAGAIN;
+ return -1;
+ }
+
+ /* the OPEN should be the first thing on the list; this loop
+ is probably not necessary */
+ list_do(&listenctx->u.cluster_info.queue, n) {
+
+ m = n->message;
+ switch(m->msg_control) {
+ case M_OPEN:
+ list_remove(&listenctx->u.cluster_info.queue, n);
+ /*printf("Accepting connection from %d %d\n",
+ m->src_nodeid, m->src_ctx);*/
+
+ /* New connection */
+ pthread_mutex_init(&acceptctx->u.cluster_info.mutex,
+ NULL);
+ pthread_cond_init(&acceptctx->u.cluster_info.cond,
+ NULL);
+ acceptctx->u.cluster_info.queue = NULL;
+ acceptctx->u.cluster_info.remote_ctx = m->src_ctx;
+ acceptctx->u.cluster_info.nodeid = m->src_nodeid;
+ acceptctx->u.cluster_info.port = m->msg_port;
+ acceptctx->flags = (SKF_READ | SKF_WRITE);
+
+ if (assign_ctx(acceptctx) < 0) {
+ printf("FAILED TO ASSIGN CONTEXT\n");
+ }
+ cluster_send_control_msg(acceptctx, M_OPEN_ACK);
+
+ if (listenctx->u.cluster_info.select_pipe[0] >= 0) {
+ //printf("%s read\n", __FUNCTION__);
+ read(listenctx->u.cluster_info.select_pipe[0],
+ &foo, 1);
+ }
+
+ done = 1;
+ free(m);
+ free(n);
+
+ break;
+ case M_DATA:
+ /* Data messages (i.e. from broadcast msgs) are
+ okay too!... but we don't handle them here */
+ break;
+ default:
+ /* Other message?! */
+ printf("Odd... %d\n", m->msg_control);
+ break;
+ }
+
+ if (done)
+ break;
+
+ } while (!list_done(&listenctx->u.cluster_info.queue, n));
+
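+ pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
+
+ if (!done) {
+ /* Only data/control messages were queued; nothing to accept */
+ errno = EAGAIN;
+ return -1;
+ }
+
+ return 0;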
+}
+
+
+/**
+ This waits for events on the cluster comms FD and
+ dispatches them using cman_dispatch. Initially,
+ the design had no permanent threads, but that model
+ proved difficult to implement correctly.
+ */
+static void *
+cluster_comms_thread(void *arg)
+{
+ /* SIGUSR2 will cause select() to abort */
+ while (thread_running) {
+ poll_cluster_messages(2);
+ }
+
+ return NULL;
+}
+
+
+/*
+ Transliterates a CMAN event to a control message
+ */
+static void
+process_cman_event(cman_handle_t handle, void *private, int reason, int arg)
+{
+ cluster_msg_hdr_t *msg;
+ int *argp;
+ msg_q_t *node;
+ msgctx_t *ctx;
+
+#if 0
+ printf("EVENT: %p %p %d %d\n", handle, private, reason, arg);
+#endif
+
+ /* Allocate queue node */
+ while ((node = malloc(sizeof(*node))) == NULL) {
+ sleep(1);
+ }
+ memset(node, 0, sizeof(*node));
+
+ /* Allocate message: header + int (for arg) */
+ while ((msg = malloc(sizeof(int)*2 +
+ sizeof(cluster_msg_hdr_t))) == NULL) {
+ sleep(1);
+ }
+ memset(msg, 0, sizeof(int)*2 +sizeof(cluster_msg_hdr_t));
+
+ switch(reason) {
+#if defined(LIBCMAN_VERSION) && LIBCMAN_VERSION >= 2
+ case CMAN_REASON_PORTOPENED:
+ msg->msg_control = M_PORTOPENED;
+ break;
+ case CMAN_REASON_TRY_SHUTDOWN:
+ msg->msg_control = M_TRY_SHUTDOWN;
+ break;
+#endif
+ case CMAN_REASON_PORTCLOSED:
+ msg->msg_control = M_PORTCLOSED;
+ break;
+ case CMAN_REASON_STATECHANGE:
+ msg->msg_control = M_STATECHANGE;
+ break;
+ }
+
+ argp = ((void *)msg + sizeof(cluster_msg_hdr_t));
+ *argp = arg;
+
+ node->len = sizeof(cluster_msg_hdr_t) + sizeof(int);
+ node->message = msg;
+
+ pthread_mutex_lock(&context_lock);
+ ctx = contexts[0]; /* This is the cluster context... */
+ if (!ctx) {
+ /* The cluster context isn't set up (or has
+ already been torn down). No big deal, just
+ ignore. */
+ free(node->message);
+ free(node);
+ pthread_mutex_unlock(&context_lock);
+ return;
+ }
+ pthread_mutex_unlock(&context_lock);
+
+ pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+ list_insert(&ctx->u.cluster_info.queue, node);
+ /* If a select pipe was set up, wake it up */
+ if (ctx->u.cluster_info.select_pipe[1] >= 0) {
+ //printf("PROCESS_CMAN_EVENT write\n");
+ if (write(ctx->u.cluster_info.select_pipe[1], "", 1) < 0)
+ perror("process_cman_event write");
+ }
+ pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+ pthread_cond_signal(&ctx->u.cluster_info.cond);
+}
+
+
+/* Set up the broadcast cluster context on *portp and start the comms thread */
+int
+cluster_msg_listen(int me, void *portp, msgctx_t **cluster_ctx)
+{
+ int e;
+ pthread_attr_t attrs;
+ cman_handle_t *ch = NULL;
+ msgctx_t *ctx;
+ int port;
+
+ errno = EINVAL;
+ if (!portp)
+ return -1;
+ port = *(int *)portp;
+ if (port < 10 || port > 254)
+ return -1;
+
+ ch = cman_lock(1, 0);
+ _me = me;
+
+ /* Set up cluster context */
+ ctx = msg_new_ctx();
+ if (!ctx) {
+ cman_unlock(ch);
+ errno = EINVAL;
+ return -1;
+ }
+
+ memset(contexts, 0, sizeof(contexts));
+
+ *cluster_ctx = ctx;
+ if (cman_start_recv_data(ch, process_cman_msg,
+ port) != 0) {
+ e = errno;
+ cman_unlock(ch);
+ msg_free_ctx((msgctx_t *)*cluster_ctx);
+ errno = e;
+ return -1;
+ }
+
+ if (cman_start_notification(ch, process_cman_event) != 0) {
+ e = errno;
+ cman_unlock(ch);
+ msg_free_ctx((msgctx_t *)*cluster_ctx);
+ errno = e;
+ return -1;
+ }
+
+ cman_unlock(ch);
+ /* Done with CMAN bits */
+
+ pthread_mutex_lock(&context_lock);
+
+ memset(contexts, 0, sizeof(contexts));
+ contexts[0] = ctx;
+
+ ctx->type = MSG_CLUSTER;
+ ctx->ops = &cluster_msg_ops;
+ ctx->u.cluster_info.local_ctx = 0;
+ ctx->u.cluster_info.remote_ctx = 0;
+ ctx->u.cluster_info.port = port; /* port! */
+ ctx->u.cluster_info.nodeid = 0; /* Broadcast! */
+ ctx->u.cluster_info.select_pipe[0] = -1;
+ ctx->u.cluster_info.select_pipe[1] = -1;
+ ctx->u.cluster_info.queue = NULL;
+ pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
+ pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
+ ctx->flags = SKF_LISTEN | SKF_READ | SKF_WRITE | SKF_MCAST;
+ pthread_mutex_unlock(&context_lock);
+
+ pthread_attr_init(&attrs);
+ pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED);
+ pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+
+ thread_running = 1;
+ pthread_create(&comms_thread, &attrs, cluster_comms_thread, NULL);
+
+ pthread_attr_destroy(&attrs);
+
+ return 0;
+}
+
+
+static void
+cluster_msg_print(msgctx_t *ctx)
+{
+ if (!ctx)
+ return;
+
+ printf("Cluster Message Context %p\n", ctx);
+ printf(" Flags %08x\n", ctx->flags);
+ printf(" Node ID %d\n", ctx->u.cluster_info.nodeid);
+ printf(" Local CTX %d\n", ctx->u.cluster_info.local_ctx);
+ printf(" Remote CTX %d\n", ctx->u.cluster_info.remote_ctx);
+}
+
+
+int
+cluster_msg_shutdown(void)
+{
+ cman_handle_t *ch;
+
+ ch = cman_lock(1, SIGUSR2);
+ cman_end_recv_data(ch);
+ pthread_kill(comms_thread, SIGTERM);
+ cman_unlock(ch);
+
+ return 0;
+}
+
+
+int
+cluster_msg_init(msgctx_t *ctx)
+{
+ errno = EINVAL;
+ if (!ctx)
+ return -1;
+
+ memset(ctx, 0, sizeof(*ctx));
+ ctx->type = MSG_CLUSTER;
+ ctx->ops = &cluster_msg_ops;
+ pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
+ pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
+ ctx->u.cluster_info.select_pipe[0] = -1;
+ ctx->u.cluster_info.select_pipe[1] = -1;
+
+ return 0;
+}
+
+
+static msg_ops_t cluster_msg_ops = {
+ .mo_open = cluster_msg_open,
+ .mo_close = cluster_msg_close,
+ .mo_listen = cluster_msg_listen,
+ .mo_accept = cluster_msg_accept,
+ .mo_shutdown = cluster_msg_shutdown,
+ .mo_wait = cluster_msg_wait,
+ .mo_send = cluster_msg_send,
+ .mo_receive = cluster_msg_receive,
+ .mo_fd_set = cluster_msg_fd_set,
+ .mo_fd_isset = cluster_msg_fd_isset,
+ .mo_fd_clr = cluster_msg_fd_clr,
+ .mo_print = cluster_msg_print,
+ .mo_init = cluster_msg_init
+};
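The multiplexing in msg_cluster.c (pseudo-private channels over the one CMAN socket) comes down to routing each incoming message to a per-context queue, with connection requests always landing on the listening context, which is where cluster_msg_accept() looks for M_OPEN (it only accepts on a context whose local_ctx is 0). A minimal sketch of that routing follows. demo_hdr_t, demo_dispatch(), demo_pop() and the fixed-size context table are hypothetical simplifications, not the real cluster_msg_hdr_t or contexts[] layout; locking and the select-pipe wakeup are left out.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical, simplified message header; names are illustrative only */
typedef struct demo_hdr {
	int control;	/* DEMO_OPEN or DEMO_DATA */
	int src_nodeid;	/* sending node */
	int src_ctx;	/* sender's context id */
	int dest_ctx;	/* receiving context id; 0 = listening context */
} demo_hdr_t;

enum { DEMO_OPEN = 1, DEMO_DATA = 2 };
#define DEMO_MAX_CTX 8

typedef struct demo_qnode {
	demo_hdr_t hdr;
	struct demo_qnode *next;
} demo_qnode_t;

/* One FIFO per local context id; slot 0 is the listening context */
static demo_qnode_t *demo_queue[DEMO_MAX_CTX];
static int demo_in_use[DEMO_MAX_CTX] = { 1 };

/* Mark a private context id as open so it can receive messages */
void demo_ctx_open(int ctx)
{
	assert(ctx > 0 && ctx < DEMO_MAX_CTX);
	demo_in_use[ctx] = 1;
}

/* Queue a copy of hdr on its destination context.
   Returns that context id, or -1 if the message was dropped. */
int demo_dispatch(const demo_hdr_t *hdr)
{
	demo_qnode_t *n, **tail;
	int dest;

	assert(hdr != NULL);
	/* Connection requests always go to the listening context */
	dest = (hdr->control == DEMO_OPEN) ? 0 : hdr->dest_ctx;
	if (dest < 0 || dest >= DEMO_MAX_CTX || !demo_in_use[dest])
		return -1;	/* unknown or already-closed context */

	n = malloc(sizeof(*n));
	if (!n)
		return -1;
	n->hdr = *hdr;
	n->next = NULL;
	for (tail = &demo_queue[dest]; *tail; tail = &(*tail)->next)
		;	/* walk to the end to keep FIFO order */
	*tail = n;
	return dest;
}

/* Dequeue the oldest message for ctx; 0 on success, -1 if empty */
int demo_pop(int ctx, demo_hdr_t *out)
{
	demo_qnode_t *n;

	if (ctx < 0 || ctx >= DEMO_MAX_CTX || !demo_queue[ctx])
		return -1;
	n = demo_queue[ctx];
	demo_queue[ctx] = n->next;
	*out = n->hdr;
	free(n);
	return 0;
}
```

In the patch the listening context also receives broadcast M_DATA messages (see the M_DATA case in cluster_msg_accept()); this sketch does not model them.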
/cvs/cluster/cluster/rgmanager/src/clulib/msg_socket.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/msg_socket.c
+++ - 2006-07-11 23:52:43.913279000 +0000
@@ -0,0 +1,432 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
+#define _MESSAGE_BUILD
+#include <message.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/un.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <fdops.h>
+
+/* Ripped from ccsd's setup_local_socket */
+#define RGMGR_SOCK "/var/run/cluster/rgmanager.sk"
+
+static msg_ops_t sock_msg_ops;
+
+static int
+sock_connect(void)
+{
+ struct sockaddr_un sun;
+ int sock = -1, error = 0;
+
+ memset(&sun, 0, sizeof(sun));
+ sun.sun_family = PF_LOCAL;
+ snprintf(sun.sun_path, sizeof(sun.sun_path), RGMGR_SOCK);
+
+ sock = socket(PF_LOCAL, SOCK_STREAM, 0);
+ if (sock < 0) {
+ error = errno;
+ goto fail;
+ }
+
+ error = connect(sock, (struct sockaddr *)&sun, sizeof(sun));
+ if (error < 0) {
+ error = errno;
+ close(sock);
+ errno = error;
+ sock = -1;
+ goto fail;
+ }
+
+fail:
+
+ return sock;
+}
+
+
+/**
+ Prepend a local_msg_hdr_t and write the message with _write_retry()
+ */
+static int
+sock_msg_send(msgctx_t *ctx, void *msg, size_t len)
+{
+ char buf[4096];
+ int ret;
+ local_msg_hdr_t *h = (local_msg_hdr_t *)buf;
+ char *msgptr = (buf + sizeof(*h));
+
+ if (!ctx)
+ return -1;
+ if (!(ctx->flags & SKF_WRITE))
+ return -1;
+
+ /* encapsulate ... ? */
+ if ((len + sizeof(*h)) > sizeof(buf)) {
+ errno = E2BIG;
+ return -1;
+ }
+
+ h->msg_control = M_DATA;
+ h->msg_len = len;
+ memcpy(msgptr, msg, len);
+
+ ret = _write_retry(ctx->u.local_info.sockfd, buf,
+ len + sizeof(*h), NULL);
+
+ if (ret >= (int)sizeof(*h))
+ return (ret - (sizeof(*h)));
+
+ errno = EAGAIN;
+ return -1;
+}
+
+
+static int
+peekypeeky(int fd)
+{
+ local_msg_hdr_t h;
+ int ret;
+
+ while ((ret = recv(fd, (void *)&h, sizeof(h), MSG_PEEK)) < 0) {
+ if (errno == EINTR)
+ continue;
+ return -1;
+ }
+
+ if (ret == sizeof(h))
+ return h.msg_control;
+
+ if (ret == 0)
+ /* Socket closed? */
+ return M_CLOSE;
+
+ /* XXX */
+ printf("PROTOCOL ERROR: Invalid message\n");
+ return M_CLOSE;
+}
+
+
+static int
+sock_msg_wait(msgctx_t *ctx, int timeout)
+{
+ fd_set rfds;
+ struct timeval tv = {0, 0};
+ struct timeval *p = NULL;
+
+ if (timeout >= 0) {
+ tv.tv_sec = timeout;
+ p = &tv;
+ }
+
+ FD_ZERO(&rfds);
+ FD_SET(ctx->u.local_info.sockfd, &rfds);
+
+ if (_select_retry(ctx->u.local_info.sockfd + 1, &rfds,
+ NULL, NULL, p) == 1) {
+ return peekypeeky(ctx->u.local_info.sockfd);
+ }
+
+ return M_NONE;
+}
+
+
+static int
+sock_msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max)
+{
+ errno = EINVAL;
+ if (!ctx || ctx->type != MSG_SOCKET)
+ return -1;
+ if (!fds || !max)
+ return -1;
+
+ if (ctx->u.local_info.sockfd >= 0) {
+ FD_SET(ctx->u.local_info.sockfd, fds);
+ if (ctx->u.local_info.sockfd > *max)
+ *max = ctx->u.local_info.sockfd;
+ return 0;
+ }
+
+ return -1;
+}
+
+
+static int
+sock_msg_fd_isset(msgctx_t *ctx, fd_set *fds)
+{
+ errno = EINVAL;
+ if (!fds || !ctx)
+ return -1;
+ if (ctx->type != MSG_SOCKET)
+ return -1;
+
+ if (ctx->u.local_info.sockfd >= 0 &&
+ FD_ISSET(ctx->u.local_info.sockfd, fds)) {
+ return 1;
+ }
+ return 0;
+}
+
+
+int
+sock_msg_fd_clr(msgctx_t *ctx, fd_set *fds)
+{
+ errno = EINVAL;
+ if (!fds || !ctx)
+ return -1;
+ if (ctx->type != MSG_SOCKET)
+ return -1;
+
+ if (ctx->u.local_info.sockfd >= 0) {
+ FD_CLR(ctx->u.local_info.sockfd, fds);
+ return 1;
+ }
+ return 0;
+}
+
+
+static int
+_local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+ struct timeval tv = {0, 0};
+ struct timeval *p = NULL;
+ local_msg_hdr_t h;
+ int ret;
+
+ if (timeout > 0) {
+ tv.tv_sec = timeout;
+ p = &tv;
+ }
+
+ /* 0 means the peer closed the socket; never use an unread header */
+ ret = _read_retry(ctx->u.local_info.sockfd, &h, sizeof(h), p);
+ if (ret <= 0)
+ return ret;
+
+ if (maxlen < h.msg_len) {
+ printf("WARNING: Buffer too small for message (%d vs %d)!\n",
+ h.msg_len, (int)maxlen);
+ h.msg_len = maxlen;
+ }
+
+ return _read_retry(ctx->u.local_info.sockfd, msg, h.msg_len, p);
+}
+
+
+/**
+ Receive a message from a local socket context. This copies the
+ payload into the user-specified buffer, truncated to maxlen.
+ */
+static int
+sock_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+ int req;
+ char priv_msg[4096];
+ size_t priv_len = sizeof(priv_msg);
+
+ errno = EINVAL;
+ if (!ctx || !msg || !maxlen)
+ return -1;
+ if (ctx->type != MSG_SOCKET)
+ return -1;
+ if (!(ctx->flags & SKF_READ))
+ return -1;
+
+ req = _local_msg_receive(ctx, priv_msg, priv_len, timeout);
+
+ if (req == 0) {
+ errno = ECONNRESET;
+ return -1;
+ }
+
+ if (req < 0)
+ return -1;
+
+ /* Copy out only what was received, bounded by maxlen. */
+ priv_len = ((size_t)req < maxlen ? (size_t)req : maxlen);
+
+ memcpy(msg, priv_msg, priv_len);
+ return (int)priv_len;
+}
+
+
+/**
+ Open a connection to the specified node ID.
+ Only CMAN_NODEID_US (the local node, 0) is supported; it connects
+ via the local rgmanager socket (RGMGR_SOCK) in /var/run/cluster.
+ */
+int
+sock_msg_open(int type, int nodeid, int port, msgctx_t *ctx, int timeout)
+{
+ errno = EINVAL;
+ if (!ctx || ctx->type != MSG_SOCKET)
+ return -1;
+ if (type != MSG_SOCKET)
+ return -1;
+
+ if (nodeid != CMAN_NODEID_US)
+ return -1;
+ if ((ctx->u.local_info.sockfd = sock_connect()) < 0)
+ return -1;
+ ctx->flags = (SKF_READ | SKF_WRITE);
+ return 0;
+}
+
+
+/**
+ With a socket, the O/S cleans up the buffers for us.
+ */
+int
+sock_msg_close(msgctx_t *ctx)
+{
+ errno = EINVAL;
+ if (!ctx || ctx->type != MSG_SOCKET)
+ return -1;
+
+ close(ctx->u.local_info.sockfd);
+ ctx->u.local_info.sockfd = -1;
+ ctx->flags = 0;
+ ctx->type = MSG_NONE;
+ return 0;
+}
+
+
+/**
+ Accept a new connection on the listening local (UNIX domain)
+ socket.
+ */
+static int
+sock_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
+{
+ errno = EINVAL;
+
+ if (!listenctx || !acceptctx)
+ return -1;
+ if (listenctx->u.local_info.sockfd < 0)
+ return -1;
+ if (!(listenctx->flags & SKF_LISTEN))
+ return -1;
+
+ listenctx->ops->mo_init(acceptctx);
+ acceptctx->u.local_info.sockfd =
+ accept(listenctx->u.local_info.sockfd, NULL, NULL);
+
+ if (acceptctx->u.local_info.sockfd < 0)
+ return -1;
+
+ acceptctx->flags = (SKF_READ | SKF_WRITE);
+ return 0;
+}
+
+
+int
+sock_msg_listen(int me, void *portp, msgctx_t **listen_ctx)
+{
+ int sock;
+ struct sockaddr_un su;
+ mode_t om;
+ msgctx_t *ctx = NULL;
+ char *path = (char *)portp;
+
+ errno = EINVAL;
+ if (!path || !listen_ctx)
+ return -1;
+
+ /* Set up listening context */
+ ctx = msg_new_ctx();
+ if (!ctx)
+ return -1;
+
+ sock = socket(PF_LOCAL, SOCK_STREAM, 0);
+ if (sock < 0)
+ goto fail;
+
+ unlink(path);
+ om = umask(077);
+ memset(&su, 0, sizeof(su));
+ su.sun_family = PF_LOCAL;
+ snprintf(su.sun_path, sizeof(su.sun_path), "%s", path);
+
+ if (bind(sock, (struct sockaddr *)&su, sizeof(su)) < 0) {
+ umask(om);
+ goto fail;
+ }
+ umask(om);
+
+ if (listen(sock, SOMAXCONN) < 0)
+ goto fail;
+
+ ctx->type = MSG_SOCKET;
+ ctx->u.local_info.sockfd = sock;
+ ctx->flags = SKF_LISTEN;
+ ctx->ops = &sock_msg_ops;
+ *listen_ctx = ctx;
+ return 0;
+fail:
+ if (ctx)
+ free(ctx);
+ if (sock >= 0)
+ close(sock);
+ return -1;
+}
+
+
+/* XXX INCOMPLETE - no local_ctx setup*/
+int
+sock_msg_init(msgctx_t *ctx)
+{
+ memset(ctx,0,sizeof(*ctx));
+ ctx->type = MSG_SOCKET;
+ ctx->u.local_info.sockfd = -1;
+ ctx->ops = &sock_msg_ops;
+ return 0;
+}
+
+
+void
+sock_msg_print(msgctx_t *ctx)
+{
+ printf("Socket Message Context; fd = %d\n", ctx->u.local_info.sockfd);
+}
+
+
+/* XXX INCOMPLETE */
+int
+sock_msg_shutdown(void)
+{
+ return 0;
+}
+
+
+static msg_ops_t sock_msg_ops = {
+ .mo_open = sock_msg_open,
+ .mo_close = sock_msg_close,
+ .mo_listen = sock_msg_listen,
+ .mo_accept = sock_msg_accept,
+ .mo_shutdown = sock_msg_shutdown,
+ .mo_wait = sock_msg_wait,
+ .mo_send = sock_msg_send,
+ .mo_receive = sock_msg_receive,
+ .mo_fd_set = sock_msg_fd_set,
+ .mo_fd_isset = sock_msg_fd_isset,
+ .mo_fd_clr = sock_msg_fd_clr,
+ .mo_print = sock_msg_print,
+ .mo_init = sock_msg_init
+};
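msg_socket.c frames every local message with a local_msg_hdr_t (msg_control plus msg_len) and reads the header before the payload. The sketch below shows the same length-prefixed framing over a SOCK_STREAM socket, assuming a simplified two-field header; frame_hdr_t, frame_send() and frame_recv() are hypothetical, and the real header layout comes from message.h. Unlike _local_msg_receive(), it reports an oversized payload as E2BIG instead of silently truncating it.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>

/* Illustrative header; the real local_msg_hdr_t may differ */
typedef struct frame_hdr {
	uint32_t control;	/* message type, e.g. a data message */
	uint32_t len;		/* payload length in bytes */
} frame_hdr_t;

/* Write all len bytes, retrying on short writes and EINTR */
static int write_all(int fd, const void *buf, size_t len)
{
	const char *p = buf;

	while (len > 0) {
		ssize_t n = write(fd, p, len);
		if (n < 0) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		p += n;
		len -= (size_t)n;
	}
	return 0;
}

/* Read exactly len bytes; 0 on success, -1 on error or EOF */
static int read_all(int fd, void *buf, size_t len)
{
	char *p = buf;

	while (len > 0) {
		ssize_t n = read(fd, p, len);
		if (n < 0) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		if (n == 0) {
			errno = ECONNRESET;	/* peer closed mid-message */
			return -1;
		}
		p += n;
		len -= (size_t)n;
	}
	return 0;
}

/* Send one framed message: header, then payload */
int frame_send(int fd, uint32_t control, const void *msg, uint32_t len)
{
	frame_hdr_t h = { control, len };

	if (write_all(fd, &h, sizeof(h)) < 0)
		return -1;
	return write_all(fd, msg, len);
}

/* Receive one framed message into buf; returns the payload length.
   On E2BIG the payload is still unread in the stream, so the caller
   should close the connection rather than keep reading from it. */
int frame_recv(int fd, uint32_t *control, void *buf, size_t maxlen)
{
	frame_hdr_t h;

	assert(control != NULL && buf != NULL);
	if (read_all(fd, &h, sizeof(h)) < 0)
		return -1;
	if (h.len > maxlen) {
		errno = E2BIG;
		return -1;
	}
	if (read_all(fd, buf, h.len) < 0)
		return -1;
	*control = h.control;
	return (int)h.len;
}
```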
/cvs/cluster/cluster/rgmanager/src/clulib/msgtest.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/clulib/msgtest.c
+++ - 2006-07-11 23:52:43.995452000 +0000
@@ -0,0 +1,286 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
+#include <message.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/select.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <pthread.h>
+#include <cman-private.h>
+
+#define MYPORT 190
+
+int my_node_id = 0;
+int running = 1;
+
+
+void
+sighandler(int sig)
+{
+ running = 0;
+}
+
+
+void *
+piggyback(void *arg)
+{
+ msgctx_t ctx;
+ char buf[4096];
+
+ if (msg_open(MSG_CLUSTER, 0, MYPORT, &ctx, 0) != 0) {
+ printf("Could not set up mcast socket!\n");
+ return NULL;
+ }
+
+ printf("PIGGYBACK CONTEXT\n");
+ msg_print(&ctx);
+ printf("END PIGGYBACK CONTEXT\n");
+
+ while (running) {
+ if (msg_receive(&ctx, buf, sizeof(buf), 2) > 0) {
+ printf("Piggyback received: %s\n", buf);
+ }
+ }
+
+ msg_close(&ctx);
+
+ printf("PIGGY flies...\n");
+
+ return NULL;
+}
+
+
+void *
+private(void *arg)
+{
+ msgctx_t ctx;
+ char buf[4096];
+
+ while (running) {
+ sleep(3);
+
+ /* use pseudoprivate channel */
+ if (msg_open(MSG_CLUSTER, my_node_id, MYPORT, &ctx, 1) != 0) {
+ printf("Could not set up virtual-socket!\n");
+ return NULL;
+ }
+
+ printf("=== pvt thread channel info ===\n");
+ msg_print(&ctx);
+ printf("=== end pvt thread channel info ===\n");
+ fflush(stdout);
+
+ snprintf(buf, sizeof(buf), "Hello!\n");
+ msg_send(&ctx, buf, strlen(buf)+1);
+
+ if (msg_receive(&ctx, buf, sizeof(buf), 10) > 0) {
+ printf("PRIVATE: Received %s\n", buf);
+ fflush(stdout);
+ }
+
+ msg_close(&ctx);
+
+ if (msg_open(MSG_CLUSTER, 0, MYPORT, &ctx, 1) != 0) {
+ printf("Could not set up mcast socket!\n");
+ return NULL;
+ }
+
+ snprintf(buf, sizeof(buf), "Babble, babble\n");
+ msg_send(&ctx, buf, strlen(buf)+1);
+ if (msg_receive(&ctx, buf, sizeof(buf), 1) > 0) {
+ printf("PRIVATE: Via MCAST %s\n", buf);
+ fflush(stdout);
+ }
+ msg_close(&ctx);
+ }
+
+ printf("Private thread is outta here...\n");
+
+ return NULL;
+}
+
+
+void
+clu_initialize(cman_handle_t **ch)
+{
+ if (!ch)
+ exit(1);
+
+ *ch = cman_init(NULL);
+ if (!(*ch)) {
+ printf("Waiting for CMAN to start\n");
+
+ while (!(*ch = cman_init(NULL))) {
+ sleep(1);
+ }
+ }
+
+ if (!cman_is_quorate(*ch)) {
+ /*
+ There are two ways to do this; this happens to be the simpler
+ of the two. The other method is to join with a NULL group
+ and log in -- this will cause the plugin to not select any
+ node group (if any exist).
+ */
+ printf("Waiting for quorum to form\n");
+
+ while (cman_is_quorate(*ch) == 0) {
+ sleep(1);
+ }
+ printf("Quorum formed, starting\n");
+ }
+}
+
+
+int
+side_message(msgctx_t *ctx)
+{
+ msgctx_t actx;
+ char buf[1024];
+
+ if (msg_accept(ctx, &actx) < 0)
+ return -1;
+
+ printf("=== MAIN: Handling side message ===\n");
+ msg_print(&actx);
+ fflush(stdout);
+
+ if (msg_receive(&actx, buf, sizeof(buf), 10) > 0) {
+ printf("MAIN: Received %s\n", buf);
+ snprintf(buf, sizeof(buf), "Goodbye!\n");
+ msg_send(&actx, buf, strlen(buf)+1);
+ }
+
+ msg_close(&actx);
+
+ printf("=== MAIN: end side message ===\n");
+
+ return 0;
+}
+
+
+void
+malloc_dump_table(int, int);
+
+
+void
+sigusr2_handler(int sig)
+{
+}
+
+int
+main(int argc, char **argv)
+{
+ msgctx_t *cluster_ctx;
+ char recvbuf[128];
+ cman_node_t me;
+ int ret;
+ pthread_t piggy, priv;
+ fd_set rfds;
+ int max = 0;
+ int port = MYPORT;
+ cman_handle_t *clu = NULL;
+
+
+ clu_initialize(&clu);
+
+ if (cman_init_subsys(clu) < 0) {
+ perror("cman_init_subsys");
+ return -1;
+ }
+ cman_get_node(clu, CMAN_NODEID_US, &me);
+
+ my_node_id = me.cn_nodeid;
+
+ if (msg_listen(MSG_CLUSTER, (void *)&port,
+ me.cn_nodeid, &cluster_ctx) < 0) {
+ printf("Couldn't set up cluster message system\n");
+ return -1;
+ }
+
+ signal(SIGTERM, sigusr2_handler);
+ signal(SIGUSR2, sigusr2_handler);
+
+ pthread_create(&piggy, NULL, piggyback, NULL);
+ pthread_create(&priv, NULL, private, NULL);
+
+ msg_print(cluster_ctx);
+ while (running) {
+ max = 0;
+ FD_ZERO(&rfds);
+ FD_SET(STDIN_FILENO, &rfds);
+ msg_fd_set(cluster_ctx, &rfds, &max);
+
+ select(max+1, &rfds, NULL, NULL, NULL);
+
+ if (FD_ISSET(STDIN_FILENO, &rfds)) {
+ if (fgets(recvbuf, sizeof(recvbuf), stdin) == NULL ||
+ recvbuf[0] == 'q' || recvbuf[0] == 'Q')
+ break;
+ if (msg_send(cluster_ctx, recvbuf,
+ strlen(recvbuf)+1) < 0)
+ perror("msg_send");
+ FD_CLR(STDIN_FILENO, &rfds);
+ }
+
+ if (!msg_fd_isset(cluster_ctx, &rfds))
+ continue;
+
+ ret = msg_wait(cluster_ctx, 1);
+
+ switch(ret) {
+ case M_DATA:
+ if (msg_receive(cluster_ctx, recvbuf, sizeof(recvbuf), 10) > 0)
+ printf("MAIN: received %s\n", recvbuf);
+ break;
+ case M_OPEN:
+ printf("MAIN: private connection detected\n");
+ side_message(cluster_ctx);
+ break;
+ case 0:
+ /* No data; probably a control msg */
+ break;
+ default:
+ printf("Cluster EV: %d\n", ret);
+ /* Cluster events, etc. */
+ msg_receive(cluster_ctx, recvbuf, sizeof(recvbuf), 0);
+ }
+ }
+
+ printf("Shutting down...\n");
+
+ running = 0;
+
+ pthread_join(piggy, NULL);
+ pthread_join(priv, NULL);
+
+ msg_close(cluster_ctx);
+ msg_free_ctx(cluster_ctx);
+ msg_shutdown();
+
+ cman_finish(clu);
+
+ malloc_dump_table(0, 1024);
+
+ exit(0);
+}
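msgtest's main loop can select() on the cluster context because msg_cluster.c pairs each context's in-memory queue with select_pipe: process_cman_event() writes a byte to select_pipe[1] when it queues an event, and cluster_msg_accept() reads one back from select_pipe[0] when it dequeues an M_OPEN. A minimal sketch of that self-pipe pattern follows; wake_pipe_t and the wake_*() functions are hypothetical and independent of the message.h API.

```c
#include <assert.h>
#include <fcntl.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

typedef struct wake_pipe {
	int fd[2];	/* fd[0]: consumer's read end, fd[1]: producer's write end */
} wake_pipe_t;

int wake_init(wake_pipe_t *w)
{
	assert(w != NULL);
	if (pipe(w->fd) < 0)
		return -1;
	/* Non-blocking, so a full pipe never stalls the producer */
	fcntl(w->fd[0], F_SETFL, O_NONBLOCK);
	fcntl(w->fd[1], F_SETFL, O_NONBLOCK);
	return 0;
}

/* Producer: call after queueing an item */
void wake_post(wake_pipe_t *w)
{
	char c = 0;

	if (write(w->fd[1], &c, 1) < 0) {
		/* Pipe full (EAGAIN): wakeups are already pending,
		   so this one is coalesced with them */
	}
}

/* Consumer: wait up to timeout_ms; returns 1 if a wakeup byte was
   consumed, 0 on timeout or error */
int wake_wait(wake_pipe_t *w, int timeout_ms)
{
	fd_set rfds;
	struct timeval tv;
	char c;

	FD_ZERO(&rfds);
	FD_SET(w->fd[0], &rfds);
	tv.tv_sec = timeout_ms / 1000;
	tv.tv_usec = (timeout_ms % 1000) * 1000;
	if (select(w->fd[0] + 1, &rfds, NULL, NULL, &tv) <= 0)
		return 0;
	return read(w->fd[0], &c, 1) == 1;
}
```

In the patch both the write and the read happen while holding the context's mutex, which is what keeps the number of pending bytes in step with the queue.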
--- cluster/rgmanager/src/clulib/Makefile 2006/06/02 17:37:10 1.8
+++ cluster/rgmanager/src/clulib/Makefile 2006/07/11 23:52:41 1.9
@@ -21,7 +21,7 @@
CFLAGS+= -g -Wstrict-prototypes -Wshadow -fPIC -D_GNU_SOURCE
-TARGETS=libclulib.a liblalloc.a
+TARGETS=libclulib.a liblalloc.a msgtest
all: ${TARGETS}
@@ -29,9 +29,12 @@
uninstall:
+msgtest: msgtest.o libclulib.a liblalloc.a
+ gcc -o msgtest msgtest.o -L. -lclulib -llalloc -lcman -lpthread
+
libclulib.a: clulog.o daemon_init.o signals.o msgsimple.o \
- gettid.o rg_strings.o message.o members.o fdops.o
- # lock.o lockspace.o
+ gettid.o rg_strings.o message.o members.o fdops.o \
+ lock.o cman.o vft.o msg_cluster.o msg_socket.o
${AR} cru $@ $^
ranlib $@
--- cluster/rgmanager/src/clulib/alloc.c 2006/01/20 16:27:24 1.7
+++ cluster/rgmanager/src/clulib/alloc.c 2006/07/11 23:52:41 1.8
@@ -129,8 +129,9 @@
#undef AGGR_RECLAIM /* consolidate_all on free (*slow*) */
#undef STACKSIZE /*4 backtrace to store if DEBUG is set */
+//#define STACKSIZE 4
-#undef GDB_HOOK /* Dump program addresses in malloc_table
+#undef GDB_HOOK /* Dump program addresses in malloc_table
using a fork/exec of gdb (SLOW but fun)
building this defeats the purpose of
a bounded memory allocator, and is only
--- cluster/rgmanager/src/clulib/lock.c 2006/06/13 19:22:38 1.1
+++ cluster/rgmanager/src/clulib/lock.c 2006/07/11 23:52:41 1.2
@@ -31,6 +31,10 @@
#include <sys/select.h>
#include <pthread.h>
+/* Default lockspace stuff */
+static dlm_lshandle_t _default_ls = NULL;
+static pthread_mutex_t _default_lock = PTHREAD_MUTEX_INITIALIZER;
+
static void
ast_function(void * __attribute__ ((unused)) arg)
@@ -39,7 +43,7 @@
static int
-wait_for_dlm_event(dlm_lshandle_t *ls)
+wait_for_dlm_event(dlm_lshandle_t ls)
{
fd_set rfds;
int fd = dlm_ls_get_fd(ls);
@@ -55,15 +59,18 @@
int
-clu_lock(dlm_lshandle_t ls,
- int mode,
- struct dlm_lksb *lksb,
- int options,
- char *resource)
+clu_ls_lock(dlm_lshandle_t ls,
+ int mode,
+ struct dlm_lksb *lksb,
+ int options,
+ char *resource)
{
int ret;
if (!ls || !lksb || !resource || !strlen(resource)) {
+ printf("%p %p %p %d\n", ls, lksb, resource,
+ resource ? (int)strlen(resource) : -1);
+ printf("INVAL...\n");
errno = EINVAL;
return -1;
}
@@ -91,15 +98,22 @@
dlm_lshandle_t
-clu_acquire_lockspace(const char *lsname)
+clu_open_lockspace(const char *lsname)
{
dlm_lshandle_t ls = NULL;
+ //printf("opening lockspace %s\n", lsname);
+
while (!ls) {
ls = dlm_open_lockspace(lsname);
if (ls)
break;
+ /*
+ printf("Failed to open: %s; trying create.\n",
+ strerror(errno));
+ */
+
ls = dlm_create_lockspace(lsname, 0644);
if (ls)
break;
@@ -119,9 +133,8 @@
}
-
int
-clu_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb)
+clu_ls_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb)
{
int ret;
@@ -147,7 +160,146 @@
int
-clu_release_lockspace(dlm_lshandle_t ls, char *name)
+clu_close_lockspace(dlm_lshandle_t ls, const char *name)
{
return dlm_release_lockspace(name, ls, 0);
}
+
+
+int
+clu_lock(int mode,
+ struct dlm_lksb *lksb,
+ int options,
+ char *resource)
+{
+ int ret = 0, block = 0, conv = 0, err;
+
+ block = !(options & LKF_NOQUEUE);
+
+ /*
+ Try to use a conversion lock mechanism when possible
+ If the caller calls explicitly with a NULL lock, then
+ assume the caller knows what it is doing.
+
+ Only take the NULL lock if:
+ (a) the user isn't specifying CONVERT; if they are, they
+ know what they are doing.
+
+ ...and one of...
+
+ (b) This is a blocking call, or
+ (c) The user requested a NULL lock explicitly. In this case,
+ short-out early; there's no reason to convert a NULL lock
+ to a NULL lock.
+ */
+ if (!(options & LKF_CONVERT) &&
+ (block || (mode == LKM_NLMODE))) {
+ /* Acquire NULL lock */
+ pthread_mutex_lock(&_default_lock);
+ ret = clu_ls_lock(_default_ls, LKM_NLMODE, lksb,
+ (options & ~LKF_NOQUEUE),
+ resource);
+ err = errno;
+ pthread_mutex_unlock(&_default_lock);
+ if (ret == 0) {
+ if (mode == LKM_NLMODE) {
+ /* User only wanted a NULL lock... */
+ return 0;
+ }
+ /*
+ Ok, NULL lock was taken, rest of blocking
+ call should be done using lock conversions.
+ */
+ options |= LKF_CONVERT;
+ conv = 1;
+ } else {
+ switch(err) {
+ case EINVAL:
+ /* Oops, null locks don't work on this
+ plugin; fall back to plain NOQUEUE retries */
+ break;
+ default:
+ errno = err;
+ return -1;
+ }
+ }
+ }
+
+ while (1) {
+ pthread_mutex_lock(&_default_lock);
+ ret = clu_ls_lock(_default_ls, mode, lksb,
+ (options | LKF_NOQUEUE),
+ resource);
+ err = errno;
+ pthread_mutex_unlock(&_default_lock);
+
+ if ((ret != 0) && (err == EAGAIN) && block) {
+ usleep(random()&32767);
+ continue;
+ }
+
+ break;
+ }
+
+ if (ret != 0 && conv) {
+ /* If we get some other error, release the NL lock we
+ took so we don't leak locks*/
+ pthread_mutex_lock(&_default_lock);
+ clu_ls_unlock(_default_ls, lksb);
+ pthread_mutex_unlock(&_default_lock);
+ errno = err;
+ }
+
+ return ret;
+}
+
+
+int
+clu_unlock(struct dlm_lksb *lksb)
+{
+ int ret, err;
+ pthread_mutex_lock(&_default_lock);
+ ret = clu_ls_unlock(_default_ls, lksb);
+ err = errno;
+ pthread_mutex_unlock(&_default_lock);
+
+ usleep(random()&32767);
+ errno = err;
+ return ret;
+}
+
+
+int
+clu_lock_init(const char *dflt_lsname)
+{
+ int ret, err;
+
+ pthread_mutex_lock(&_default_lock);
+ if (_default_ls) {
+ pthread_mutex_unlock(&_default_lock);
+ return 0;
+ }
+
+ if (!dflt_lsname || !strlen(dflt_lsname)) {
+ pthread_mutex_unlock(&_default_lock);
+ errno = EINVAL;
+ return -1;
+ }
+
+ _default_ls = clu_open_lockspace(dflt_lsname);
+ ret = (_default_ls == NULL) ? -1 : 0;
+ err = errno;
+ pthread_mutex_unlock(&_default_lock);
+
+ errno = err;
+ return ret;
+}
+
+void
+clu_lock_finished(const char *name)
+{
+ pthread_mutex_lock(&_default_lock);
+ if (_default_ls) {
+ clu_close_lockspace(_default_ls, name);
+ _default_ls = NULL;
+ }
+ pthread_mutex_unlock(&_default_lock);
+}
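clu_lock() avoids blocking inside the DLM by issuing LKF_NOQUEUE requests and sleeping a random 0 to 32767 microseconds after each EAGAIN. The sketch below isolates just that retry loop, with the lock request abstracted as a callback. trylock_fn, lock_with_backoff() and demo_trylock() are hypothetical names, and the NULL-lock/LKF_CONVERT step that clu_lock() performs first is not modelled.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>

/* One non-blocking attempt: 0 on success, -1 with errno set on failure
   (EAGAIN meaning "busy"). Hypothetical stand-in for an LKF_NOQUEUE request. */
typedef int (*trylock_fn)(void *arg);

/* Retry try_lock with a random 0..32767 us sleep after each EAGAIN.
   max_tries <= 0 means retry forever. Returns the number of attempts
   on success, or -1 with errno set (EAGAIN if max_tries ran out). */
int lock_with_backoff(trylock_fn try_lock, void *arg, int max_tries)
{
	int attempts = 0;

	assert(try_lock != NULL);
	for (;;) {
		++attempts;
		if (try_lock(arg) == 0)
			return attempts;
		if (errno != EAGAIN)
			return -1;	/* hard error: errno left as-is */
		if (max_tries > 0 && attempts >= max_tries)
			return -1;	/* errno is still EAGAIN */
		usleep(random() & 32767);
	}
}

/* Demo resource: reports EAGAIN while *busy > 0, decrementing each call */
int demo_trylock(void *arg)
{
	int *busy = arg;

	if (*busy > 0) {
		--*busy;
		errno = EAGAIN;
		return -1;
	}
	return 0;
}
```

Randomizing the delay keeps nodes that collided on the same resource from retrying in lockstep.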
--- cluster/rgmanager/src/clulib/lockspace.c 2006/06/13 19:22:38 1.1
+++ cluster/rgmanager/src/clulib/lockspace.c 2006/07/11 23:52:41 1.2
@@ -1,3 +1,21 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
--- cluster/rgmanager/src/clulib/members.c 2006/06/13 19:22:38 1.1
+++ cluster/rgmanager/src/clulib/members.c 2006/07/11 23:52:41 1.2
@@ -1,3 +1,21 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
#include <sys/types.h>
#include <arpa/inet.h>
#include <stdint.h>
@@ -8,10 +26,12 @@
#include <members.h>
#include <stdlib.h>
#include <unistd.h>
+#include <signal.h>
#include <string.h>
#include <sys/socket.h>
#include <rg_types.h>
#include <pthread.h>
+#include <errno.h>
static int my_node_id = -1;
static pthread_rwlock_t memblock = PTHREAD_RWLOCK_INITIALIZER;
@@ -73,29 +93,32 @@
{
int count, x, y;
char in_old = 0;
- cluster_member_list_t *gained;
+ cluster_member_list_t *gained = NULL;
/* No nodes in new? Then nothing could have been gained */
if (!new || !new->cml_count)
return NULL;
/* Nothing in old? Duplicate 'new' and return it. */
- if (!old || !old->cml_count) {
- gained = cml_alloc(new->cml_count);
- if (!gained)
- return NULL;
- memcpy(gained, new, cml_size(new->cml_count));
- return gained;
- }
+ if (!old || !old->cml_count)
+ return member_list_dup(new);
/* Use greatest possible count */
count = (old->cml_count > new->cml_count ?
- cml_size(old->cml_count) : cml_size(new->cml_count));
+ old->cml_count : new->cml_count);
+ count *= sizeof(cman_node_t);
- gained = malloc(count);
+ gained = malloc(sizeof(cluster_member_list_t));
if (!gained)
return NULL;
- memset(gained, 0, count);
+ memset(gained, 0, sizeof(*gained));
+
+ gained->cml_members = malloc(count);
+ if (!gained->cml_members) {
+ free(gained);
+ return NULL;
+ }
+ memset(gained->cml_members, 0, count);
for (x = 0; x < new->cml_count; x++) {
@@ -121,6 +144,7 @@
}
if (gained->cml_count == 0) {
+ free(gained->cml_members);
free(gained);
gained = NULL;
}
@@ -145,7 +169,7 @@
cluster_member_list_t *
memb_lost(cluster_member_list_t *old, cluster_member_list_t *new)
{
- cluster_member_list_t *ret;
+ cluster_member_list_t *ret = NULL;
int x;
/* Reverse. ;) */
@@ -213,26 +237,42 @@
get_member_list(cman_handle_t h)
{
int c;
+ int tries = 0, local = 0;
cluster_member_list_t *ml = NULL;
cman_node_t *nodes = NULL;
- do {
+ if (h == NULL) {
+ local = 1;
+ h = cman_init(NULL);
+ if (!h)
+ return NULL;
+ }
+
+ do {
+ ++tries;
if (nodes)
free(nodes);
c = cman_get_node_count(h);
- if (c <= 0)
- return NULL;
+ if (c <= 0) {
+ if (errno == EINTR)
+ continue;
+ if (ml)
+ free(ml);
+ ml = NULL;
+ goto out;
+ }
if (!ml)
ml = malloc(sizeof(*ml));
if (!ml)
- return NULL;
+ goto out;
nodes = malloc(sizeof(*nodes) * c);
if (!nodes) {
free(ml);
- return NULL;
+ ml = NULL;
+ goto out;
}
memset(ml, 0, sizeof(*ml));
@@ -244,6 +284,10 @@
ml->cml_members = nodes;
ml->cml_count = c;
+
+out:
+ if (local)
+ cman_finish(h);
return ml;
}
@@ -264,6 +308,9 @@
{
int x = 0;
+ if (!ml)
+ return 0;
+
for (x = 0; x < ml->cml_count; x++) {
if (ml->cml_members[x].cn_nodeid == nodeid)
return ml->cml_members[x].cn_member;
@@ -278,6 +325,9 @@
{
int x = 0, count = 0;
+ if (!ml)
+ return 0;
+
for (x = 0; x < ml->cml_count; x++) {
if (ml->cml_members[x].cn_member)
++count;
@@ -292,6 +342,9 @@
{
int x = 0;
+ if (!ml)
+ return -1;
+
for (x = 0; x < ml->cml_count; x++) {
if (ml->cml_members[x].cn_nodeid == nodeid)
ml->cml_members[x].cn_member = 0;
@@ -306,13 +359,15 @@
memb_id_to_name(cluster_member_list_t *ml, int nodeid)
{
int x = 0;
+ if (!ml)
+ return NULL;
for (x = 0; x < ml->cml_count; x++) {
if (ml->cml_members[x].cn_nodeid == nodeid)
return ml->cml_members[x].cn_name;
}
- return 0;
+ return NULL;
}
@@ -320,13 +375,15 @@
memb_id_to_p(cluster_member_list_t *ml, int nodeid)
{
int x = 0;
+ if (!ml)
+ return NULL;
for (x = 0; x < ml->cml_count; x++) {
if (ml->cml_members[x].cn_nodeid == nodeid)
return &ml->cml_members[x];
}
- return 0;
+ return NULL;
}
@@ -334,6 +391,8 @@
memb_online_name(cluster_member_list_t *ml, char *name)
{
int x = 0;
+ if (!ml)
+ return 0;
for (x = 0; x < ml->cml_count; x++) {
if (!strcasecmp(ml->cml_members[x].cn_name, name))
@@ -348,6 +407,8 @@
memb_name_to_id(cluster_member_list_t *ml, char *name)
{
int x = 0;
+ if (!ml)
+ return 0;
for (x = 0; x < ml->cml_count; x++) {
if (!strcasecmp(ml->cml_members[x].cn_name, name))
@@ -362,13 +423,15 @@
memb_name_to_p(cluster_member_list_t *ml, char *name)
{
int x = 0;
+ if (!ml)
+ return NULL;
for (x = 0; x < ml->cml_count; x++) {
if (!strcasecmp(ml->cml_members[x].cn_name, name))
return &ml->cml_members[x];
}
- return 0;
+ return NULL;
}
/**
@@ -388,9 +451,19 @@
if (!orig)
return NULL;
- ret = malloc(cml_size(orig->cml_count));
- memset(ret, 0, cml_size(orig->cml_count));
- memcpy(ret, orig, cml_size(orig->cml_count));
+ ret = malloc(sizeof(cluster_member_list_t));
+ if (!ret)
+ return NULL;
+ memset(ret, 0, sizeof(cluster_member_list_t));
+ ret->cml_members = malloc(sizeof(cman_node_t) * orig->cml_count);
+
+ if (!ret->cml_members) {
+ free(ret);
+ return NULL;
+ }
+ ret->cml_count = orig->cml_count;
+ memcpy(ret->cml_members, orig->cml_members,
+ orig->cml_count * sizeof(cman_node_t));
return ret;
}
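In the members.c hunks above, `memb_list_dup()` now treats a member list as a small header plus a separately allocated `cml_members` array. A copy therefore has to allocate both and copy the array contents. Every lookup helper also now returns a neutral value (0, -1, or NULL) when it is handed a NULL list.

The following is a minimal standalone sketch of that pattern, not rgmanager code. `node_t` and `member_list_t` are hypothetical stand-ins for `cman_node_t` and `cluster_member_list_t`, trimmed to the fields these hunks touch. `list_dup()`, `list_free()`, and `id_to_name()` are illustrative names.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for cman_node_t: only the fields used here. */
typedef struct {
	int  cn_nodeid;
	int  cn_member;
	char cn_name[64];
} node_t;

/* Hypothetical stand-in for cluster_member_list_t: header + array. */
typedef struct {
	int     cml_count;
	node_t *cml_members;	/* allocated separately from the header */
} member_list_t;

/* Deep copy: allocate the header and the member array separately,
   then copy the array contents rather than just the header. */
static member_list_t *
list_dup(const member_list_t *orig)
{
	member_list_t *ret;

	if (!orig)
		return NULL;

	ret = calloc(1, sizeof(*ret));
	if (!ret)
		return NULL;

	ret->cml_members = malloc(sizeof(node_t) * orig->cml_count);
	if (!ret->cml_members) {
		free(ret);
		return NULL;
	}
	ret->cml_count = orig->cml_count;
	memcpy(ret->cml_members, orig->cml_members,
	       sizeof(node_t) * orig->cml_count);
	return ret;
}

static void
list_free(member_list_t *ml)
{
	if (!ml)
		return;
	free(ml->cml_members);
	free(ml);
}

/* NULL-tolerant lookup, mirroring the "if (!ml) return NULL;" guards. */
static const char *
id_to_name(const member_list_t *ml, int nodeid)
{
	int x;

	if (!ml)
		return NULL;
	for (x = 0; x < ml->cml_count; x++) {
		if (ml->cml_members[x].cn_nodeid == nodeid)
			return ml->cml_members[x].cn_name;
	}
	return NULL;
}
```

The old code copied `cml_size(count)` bytes from the list header in a single `memcpy`. That only works when the members sit inline after the header. Once `cml_members` points at a separate allocation, as `ml->cml_members = nodes` above shows, only a two-step copy gives the duplicate its own array. The caller can then modify or free the duplicate without touching the original.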
--- cluster/rgmanager/src/clulib/message.c 2006/06/13 19:22:38 1.1
+++ cluster/rgmanager/src/clulib/message.c 2006/07/11 23:52:41 1.2
@@ -1,138 +1,41 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
#define _MESSAGE_BUILD
#include <message.h>
#include <stdio.h>
-#include <pthread.h>
-#include <libcman.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <stdlib.h>
-#include <rg_types.h>
-#include <sys/socket.h>
#include <sys/stat.h>
-#include <sys/un.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
#include <errno.h>
-#include <sys/time.h>
-#include <fdops.h>
-#include <resgroup.h>
-
-
-
-/* Ripped from ccsd's setup_local_socket */
-#define RGMGR_SOCK "/var/run/cluster/rgmanager.sk"
-#define MAX_CONTEXTS 32 /* Testing; production should be 1024-ish */
-
-/* Context 0 is reserved for control messages */
-
-/* Local-ish contexts */
-static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER;
-static msgctx_t *contexts[MAX_CONTEXTS];
-static uint32_t context_index = 1;
-static chandle_t *gch;
-pthread_t comms_thread;
-int thread_running;
-
-
-#define is_established(ctx) \
- (((ctx->type == MSG_CLUSTER) && \
- (ctx->u.cluster_info.remote_ctx && ctx->u.cluster_info.local_ctx)) || \
- ((ctx->type == MSG_SOCKET) && \
- (ctx->u.local_info.sockfd != -1)))
-
-
-static int
-local_connect(void)
-{
- struct sockaddr_un sun;
- int sock = -1, error = 0;
-
- memset(&sun, 0, sizeof(sun));
- sun.sun_family = PF_LOCAL;
- snprintf(sun.sun_path, sizeof(sun.sun_path), RGMGR_SOCK);
-
- sock = socket(PF_LOCAL, SOCK_STREAM, 0);
- if (sock < 0) {
- error = errno;
- goto fail;
- }
-
- error = connect(sock, (struct sockaddr *)&sun, sizeof(sun));
- if (error < 0) {
- error = errno;
- goto fail;
- }
-
- sock = error;
-fail:
-
- return sock;
-}
-
-
-static int
-send_cluster_message(msgctx_t *ctx, void *msg, size_t len)
-{
- char buf[4096];
- cluster_msg_hdr_t *h = (void *)buf;
- int ret;
- char *msgptr = (buf + sizeof(*h));
-
- if ((len + sizeof(*h)) > sizeof(buf)) {
- errno = E2BIG;
- return -1;
- }
-
- h->msg_control = M_DATA;
- h->src_ctx = ctx->u.cluster_info.local_ctx;
- h->dest_ctx = ctx->u.cluster_info.remote_ctx;
- h->msg_port = ctx->u.cluster_info.port;
- memcpy(msgptr, msg, len);
- /*
- printf("sending cluster message, length = %d to nodeid %d port %d\n",
- len + sizeof(*h), ctx->u.cluster_info.nodeid, ctx->u.cluster_info.port);
- */
-
- pthread_mutex_lock(&gch->c_lock);
- h->src_nodeid = gch->c_nodeid;
-
- swab_cluster_msg_hdr_t(h);
-
- ret = cman_send_data(gch->c_cluster, (void *)h, len + sizeof(*h),
- ctx->u.cluster_info.nodeid,
- ctx->u.cluster_info.port, 0);
-
- pthread_mutex_unlock(&gch->c_lock);
-
- return len + sizeof(h);
-}
-
-
-/**
- Wrapper around write(2)
- */
-static int
-send_socket_message(msgctx_t *ctx, void *msg, size_t len)
-{
- char buf[4096];
- local_msg_hdr_t *h = (local_msg_hdr_t *)buf;
- char *msgptr = (buf + sizeof(*h));
-
- /* encapsulate ... ? */
- if ((len + sizeof(*h)) > sizeof(buf)) {
- errno = E2BIG;
- return -1;
- }
-
- h->msg_control = M_DATA;
- h->msg_len = len;
- memcpy(msgptr, msg, len);
-
- return _write_retry(ctx->u.local_info.sockfd, msg, len + sizeof(*h), NULL);
-}
+/* From msg_cluster */
+int cluster_msg_init(msgctx_t *ctx);
+int cluster_msg_listen(int me, void *, msgctx_t **ctx);
+int cluster_msg_shutdown(void);
+
+/* From msg_socket */
+int sock_msg_init(msgctx_t *ctx);
+int sock_msg_listen(int me, void *, msgctx_t **ctx);
+int sock_msg_shutdown(void);
/**
@@ -142,238 +45,22 @@
int
msg_send(msgctx_t *ctx, void *msg, size_t len)
{
- if (!ctx || !msg || !len) {
- errno = EINVAL;
- return -1;
- }
-
- switch(ctx->type) {
- case MSG_CLUSTER:
- return send_cluster_message(ctx, msg, len);
- case MSG_SOCKET:
- return send_socket_message(ctx, msg, len);
- default:
- break;
- }
-
errno = EINVAL;
- return -1;
-}
-
-
-/**
- Assign a (free) cluster context ID
- */
-static int
-assign_ctx(msgctx_t *ctx)
-{
- int start;
-
- /* Assign context index */
- ctx->type = MSG_CLUSTER;
-
- pthread_mutex_lock(&context_lock);
- start = context_index++;
- if (context_index >= MAX_CONTEXTS || context_index <= 0)
- context_index = 1;
- do {
- if (contexts[context_index]) {
- ++context_index;
- if (context_index >= MAX_CONTEXTS)
- context_index = 1;
-
- if (context_index == start) {
- pthread_mutex_unlock(&context_lock);
- errno = EAGAIN;
- return -1;
- }
-
- continue;
- }
-
- contexts[context_index] = ctx;
- ctx->u.cluster_info.local_ctx = context_index;
-
- } while (0);
- pthread_mutex_unlock(&context_lock);
-
- return 0;
-}
-
-
-/* See if anything's on the cluster socket. If so, dispatch it
- on to the requisite queues
- XXX should be passed a connection arg! */
-static int
-poll_cluster_messages(int timeout)
-{
- int ret = -1;
- fd_set rfds;
- int fd;
- struct timeval tv;
- struct timeval *p = NULL;
-
- if (timeout >= 0) {
- p = &tv;
- tv.tv_sec = tv.tv_usec = timeout;
- }
- printf("%s\n", __FUNCTION__);
-
- FD_ZERO(&rfds);
-
- //pthread_mutex_lock(&gch->c_lock);
- fd = cman_get_fd(gch->c_cluster);
- FD_SET(fd, &rfds);
-
- if (select(fd + 1, &rfds, NULL, NULL, p) == 1) {
- cman_dispatch(gch->c_cluster, 0);
- ret = 0;
- }
- //pthread_mutex_unlock(&gch->c_lock);
-
- return ret;
-}
-
-
-/**
- This is used to establish and tear down pseudo-private
- contexts which are shared with the cluster context.
- */
-static int
-cluster_send_control_msg(msgctx_t *ctx, int type)
-{
- cluster_msg_hdr_t cm;
-
- cm.msg_control = (uint8_t)type;
- cm.src_nodeid = gch->c_nodeid;
- cm.dest_ctx = ctx->u.cluster_info.remote_ctx;
- cm.src_ctx = ctx->u.cluster_info.local_ctx;
- cm.msg_port = ctx->u.cluster_info.port;
-
- swab_cluster_msg_hdr_t(&cm);
-
- return (cman_send_data(gch->c_cluster, (void *)&cm, sizeof(cm),
- ctx->u.cluster_info.nodeid,
- ctx->u.cluster_info.port, 0));
-}
-
-
-/**
- Wait for a message on a context.
- */
-static int
-cluster_msg_wait(msgctx_t *ctx, int timeout)
-{
- struct timespec ts = {0, 0};
- int req = M_NONE;
- struct timeval start;
- struct timeval now;
-
-
- if (timeout > 0)
- gettimeofday(&start, NULL);
-
- ts.tv_sec = !!timeout;
-
- pthread_mutex_lock(&ctx->u.cluster_info.mutex);
- while (1) {
- /* See if we dispatched any messages on to our queue */
- if (ctx->u.cluster_info.queue) {
- req = ctx->u.cluster_info.queue->message->msg_control;
- /*printf("Queue not empty CTX%d : %d\n",
- ctx->u.cluster_info.local_ctx, req);*/
- break;
- }
-
- if (timeout == 0)
- break;
-
- /* Ok, someone else has the mutex on our FD. Go to
- sleep on a cond; maybe they'll wake us up */
- if (pthread_cond_timedwait(&ctx->u.cluster_info.cond,
- &ctx->u.cluster_info.mutex,
- &ts) < 0) {
-
- /* Mutex held */
- if (errno == ETIMEDOUT) {
- if (timeout < 0) {
- ts.tv_sec = 1;
- ts.tv_nsec = 0;
- continue;
- }
-
- ts.tv_sec = !!timeout;
-
- /* Done */
- break;
- }
- }
-
- if (timeout > 0) {
- gettimeofday(&now, NULL);
- /* XXX imprecise */
- if (now.tv_sec - start.tv_sec > timeout)
- break;
- }
- }
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-
- return req;
-}
-
-
-static int
-peekypeeky(int fd)
-{
- local_msg_hdr_t h;
- int ret;
-
- while ((ret = recv(fd, (void *)&h, sizeof(h), MSG_PEEK)) < 0) {
- if (errno == EINTR)
- continue;
+ if (!ctx || !msg || !len)
return -1;
- }
-
- if (ret == sizeof(h))
- return h.msg_control;
-
- if (ret == 0)
- /* Socket closed? */
- return M_CLOSE;
- /* XXX */
- printf("PROTOCOL ERROR: Invalid message\n");
- return M_CLOSE;
-}
-
-
-static int
-local_msg_wait(msgctx_t *ctx, int timeout)
-{
- fd_set rfds;
- struct timeval tv = {0, 0};
- struct timeval *p = NULL;
-
- if (timeout >= 0) {
- tv.tv_sec = timeout;
- p = &tv;
- }
-
- FD_ZERO(&rfds);
- FD_SET(ctx->u.local_info.sockfd, &rfds);
-
- if (_select_retry(ctx->u.local_info.sockfd + 1, &rfds,
- NULL, NULL, p) == 1) {
- return peekypeeky(ctx->u.local_info.sockfd);
- }
-
- return M_NONE;
+ if (ctx->ops && ctx->ops->mo_send)
+ return ctx->ops->mo_send(ctx, msg, len);
+ errno = ENOSYS;
+ return -1;
}
+/* XXX get API for this ready */
int
msg_get_nodeid(msgctx_t *ctx)
{
+ /* XXX */
switch(ctx->type) {
case MSG_CLUSTER:
return ctx->u.cluster_info.nodeid;
@@ -390,49 +77,13 @@
int
msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max)
{
- int e;
- switch(ctx->type) {
- case MSG_CLUSTER:
- pthread_mutex_lock(&ctx->u.cluster_info.mutex);
- if (ctx->u.cluster_info.select_pipe[0] < 0) {
- if (pipe(ctx->u.cluster_info.select_pipe) < 0) {
- e = errno;
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- errno = e;
- return -1;
- }
-
- printf("%s: Created cluster CTX select pipe "
- "rd=%d wr=%d\n", __FUNCTION__,
- ctx->u.cluster_info.select_pipe[0],
- ctx->u.cluster_info.select_pipe[1]);
-
- }
-
- e = ctx->u.cluster_info.select_pipe[0];
- printf("%s: cluster %d\n", __FUNCTION__, e);
- FD_SET(e, fds);
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-
- if (e > *max)
- *max = e;
- return 0;
-
- case MSG_SOCKET:
- if (ctx->u.local_info.sockfd >= 0) {
- printf("%s: local %d\n", __FUNCTION__,
- ctx->u.local_info.sockfd);
- FD_SET(ctx->u.local_info.sockfd, fds);
-
- if (ctx->u.local_info.sockfd > *max)
- *max = ctx->u.local_info.sockfd;
- return 0;
- }
+ errno = EINVAL;
+ if (!ctx)
return -1;
- default:
- break;
- }
-
+
+ if (ctx->ops && ctx->ops->mo_fd_set)
+ return ctx->ops->mo_fd_set(ctx, fds, max);
+ errno = ENOSYS;
return -1;
}
@@ -441,30 +92,12 @@
msg_fd_isset(msgctx_t *ctx, fd_set *fds)
{
errno = EINVAL;
-
- if (!fds || !ctx)
+ if (!ctx)
return -1;
-
- switch(ctx->type) {
- case MSG_CLUSTER:
- pthread_mutex_lock(&ctx->u.cluster_info.mutex);
- if (ctx->u.cluster_info.select_pipe[0] >= 0 &&
- FD_ISSET(ctx->u.cluster_info.select_pipe[0], fds)) {
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- return 1;
- }
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- return 0;
- case MSG_SOCKET:
- if (ctx->u.local_info.sockfd >= 0 &&
- FD_ISSET(ctx->u.local_info.sockfd, fds)) {
- return 1;
- }
- return 0;
- default:
- break;
- }
-
+
+ if (ctx->ops && ctx->ops->mo_fd_isset)
+ return ctx->ops->mo_fd_isset(ctx, fds);
+ errno = ENOSYS;
return -1;
}
@@ -473,30 +106,12 @@
msg_fd_clr(msgctx_t *ctx, fd_set *fds)
{
errno = EINVAL;
-
- if (!fds || !ctx)
+ if (!ctx)
return -1;
-
- switch(ctx->type) {
- case MSG_CLUSTER:
- pthread_mutex_lock(&ctx->u.cluster_info.mutex);
- if (ctx->u.cluster_info.select_pipe[0] >= 0) {
- FD_CLR(ctx->u.cluster_info.select_pipe[0], fds);
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- return 1;
- }
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- return 0;
- case MSG_SOCKET:
- if (ctx->u.local_info.sockfd >= 0) {
- FD_CLR(ctx->u.local_info.sockfd, fds);
- return 1;
- }
- return 0;
- default:
- break;
- }
-
+
+ if (ctx->ops && ctx->ops->mo_fd_clr)
+ return ctx->ops->mo_fd_clr(ctx, fds);
+ errno = ENOSYS;
return -1;
}
@@ -519,232 +134,28 @@
int
msg_wait(msgctx_t *ctx, int timeout)
{
-
- if (!ctx) {
- errno = EINVAL;
- return -1;
- }
-
- switch(ctx->type) {
- case MSG_CLUSTER:
- return cluster_msg_wait(ctx, timeout);
- case MSG_SOCKET:
- return local_msg_wait(ctx, timeout);
- default:
- break;
- }
-
errno = EINVAL;
- return -1;
-}
-
-
-int
-_cluster_msg_receive(msgctx_t *ctx, void **msg, size_t *len)
-{
- cluster_msg_hdr_t *m;
- msg_q_t *n;
- int ret = 0;
-
- if (msg)
- *msg = NULL;
- if (len)
- *len = 0;
-
- if (ctx->u.cluster_info.local_ctx < 0 ||
- ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
- errno = EBADF;
- return -1;
- }
-
- /* trigger receive here */
- pthread_mutex_lock(&ctx->u.cluster_info.mutex);
-
- n = ctx->u.cluster_info.queue;
- if (n == NULL) {
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- errno = EAGAIN;
- return -1;
- }
-
- list_remove(&ctx->u.cluster_info.queue, n);
-
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
-
- m = n->message;
- switch(m->msg_control) {
- case M_CLOSE:
- ctx->u.cluster_info.remote_ctx = 0;
- break;
- case M_OPEN_ACK:
- /* Response to new connection */
- ctx->u.cluster_info.remote_ctx = m->src_ctx;
- break;
- case M_DATA:
- /* Kill the message control structure */
- memmove(m, &m[1], n->len - sizeof(*m));
- if (msg)
- *msg = (void *)m;
- else {
- printf("Warning: dropping data message\n");
- free(m);
- }
- if (len)
- *len = (n->len - sizeof(*m));
- ret = (n->len - sizeof(*m));
- free(n);
-
- printf("Message received\n");
- return ret;
- case M_OPEN:
- /* Someone is trying to open a connection */
- default:
- /* ?!! */
- ret = -1;
- break;
- }
-
- free(m);
- free(n);
-
- return ret;
-}
-
-
-/**
- Receive a message from a cluster-context. This copies out the contents
- into the user-specified buffer, and does random other things.
- */
-static int
-cluster_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
-{
- int req;
- void *priv_msg;
- size_t priv_len;
-
- if (!msg || !maxlen) {
- errno = EINVAL;
- return -1;
- }
-
- req = cluster_msg_wait(ctx, timeout);
-
- switch (req) {
- case M_DATA:
- /* Copy out. */
- req = _cluster_msg_receive(ctx, &priv_msg, &priv_len);
- if (req < 0) {
- printf("Ruh roh!\n");
- return -1;
- }
-
- priv_len = (priv_len < maxlen ? priv_len : maxlen);
-
- memcpy(msg, priv_msg, priv_len);
- free(priv_msg);
- return req;
- case M_CLOSE:
- errno = ECONNRESET;
- return -1;
- case 0:
- /*printf("Nothing on queue\n");*/
- return 0;
- default:
- printf("PROTOCOL ERROR: Received %d\n", req);
+ if (!ctx)
return -1;
- }
-
- printf("%s: CODE PATH ERROR\n", __FUNCTION__);
+
+ if (ctx->ops && ctx->ops->mo_wait)
+ return ctx->ops->mo_wait(ctx, timeout);
+ errno = ENOSYS;
return -1;
}
-static int
-_local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
-{
- struct timeval tv = {0, 0};
- struct timeval *p = NULL;
- local_msg_hdr_t h;
-
- if (timeout >= 0) {
- tv.tv_sec = timeout;
- p = &tv;
- }
-
- if (_read_retry(ctx->u.local_info.sockfd, &h, sizeof(h), p) < 0)
- return -1;
-
- if (maxlen < h.msg_len) {
- printf("WARNING: Buffer too small for message!\n");
- h.msg_len = maxlen;
- }
-
- return _read_retry(ctx->u.local_info.sockfd, msg, h.msg_len, p);
-}
-
-
-/**
- Receive a message from a cluster-context. This copies out the contents
- into the user-specified buffer, and does random other things.
- */
-static int
-local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
-{
- int req;
- char priv_msg[4096];
- size_t priv_len;
-
- if (!msg || !maxlen) {
- errno = EINVAL;
- return -1;
- }
-
- switch (req) {
- case M_DATA:
- /* Copy out. */
- req = _local_msg_receive(ctx, priv_msg, priv_len, timeout);
- if (req <= 0)
- return -1;
-
- priv_len = (priv_len < maxlen ? priv_len : maxlen);
-
- memcpy(msg, priv_msg, priv_len);
- free(msg);
- return req;
- case M_CLOSE:
- errno = ECONNRESET;
- return -1;
- case 0:
- /*printf("Nothing on queue\n");*/
- return 0;
- default:
- printf("PROTOCOL ERROR: Received %d\n", req);
- return -1;
- }
-
- printf("%s: CODE PATH ERROR\n", __FUNCTION__);
- return -1;
-}
-
int
msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
{
- if (!ctx || !msg || !maxlen) {
- errno = EINVAL;
+ errno = EINVAL;
+ if (!ctx)
return -1;
- }
- switch(ctx->type) {
- case MSG_CLUSTER:
- return cluster_msg_receive(ctx, msg, maxlen, timeout);
- case MSG_SOCKET:
- return local_msg_receive(ctx, msg, maxlen, timeout);
- default:
- break;
- }
-
- errno = EINVAL;
+ if (ctx->ops && ctx->ops->mo_receive)
+ return ctx->ops->mo_receive(ctx, msg, maxlen, timeout);
+ errno = ENOSYS;
return -1;
}
@@ -755,78 +166,28 @@
/var/run/cluster...
*/
int
-msg_open(int nodeid, int port, msgctx_t *ctx, int timeout)
+msg_open(int type, int nodeid, int port, msgctx_t *ctx, int timeout)
{
- int t = 0;
-
errno = EINVAL;
if (!ctx)
return -1;
-
- /*printf("Opening pseudo channel to node %d\n", nodeid);*/
-
- memset(ctx, 0, sizeof(*ctx));
- if (nodeid == CMAN_NODEID_US) {
- if ((ctx->u.local_info.sockfd = local_connect()) < 0) {
- return -1;
- }
- ctx->type = MSG_SOCKET;
- return 0;
- }
-
- ctx->type = MSG_CLUSTER;
- ctx->u.cluster_info.nodeid = nodeid;
- ctx->u.cluster_info.port = port;
- ctx->u.cluster_info.local_ctx = -1;
- ctx->u.cluster_info.remote_ctx = 0;
- ctx->u.cluster_info.queue = NULL;
- ctx->u.cluster_info.select_pipe[0] = -1;
- ctx->u.cluster_info.select_pipe[1] = -1;
- pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
- pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
-
- /* Assign context index */
- if (assign_ctx(ctx) < 0)
- return -1;
-
- //printf(" Local CTX: %d\n", ctx->u.cluster_info.local_ctx);
-
- /* Send open */
-
- //printf(" Sending control message M_OPEN\n");
- if (cluster_send_control_msg(ctx, M_OPEN) < 0) {
+ /* XXX SPECIAL CASE... ow. */
+ switch(type) {
+ case MSG_SOCKET:
+ sock_msg_init(ctx);
+ break;
+ case MSG_CLUSTER:
+ cluster_msg_init(ctx);
+ break;
+ default:
return -1;
}
- /* Ok, wait for a response */
- while (!is_established(ctx)) {
- ++t;
- if (t > timeout) {
- msg_close(ctx);
- errno = ETIMEDOUT;
- return -1;
- }
-
- switch(msg_wait(ctx, 1)) {
- case M_OPEN_ACK:
- _cluster_msg_receive(ctx, NULL, NULL);
- break;
- case M_NONE:
- continue;
- default:
- printf("PROTO ERROR: M_OPEN_ACK not received \n");
- }
- }
-
- /*
- printf(" Remote CTX: %d\n",
- ctx->u.cluster_info.remote_ctx);
- printf(" Pseudo channel established!\n");
- */
-
-
- return 0;
+ if (ctx->ops && ctx->ops->mo_open)
+ return ctx->ops->mo_open(ctx->type, nodeid, port, ctx, timeout);
+ errno = ENOSYS;
+ return -1;
}
@@ -842,484 +203,65 @@
int
msg_close(msgctx_t *ctx)
{
- msg_q_t *n = NULL;
-
- if (!ctx) {
- errno = EINVAL;
- return -1;
- }
-
- switch (ctx->type) {
- case MSG_CLUSTER:
- if (ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
- errno = EINVAL;
- return -1;
- }
- pthread_mutex_lock(&context_lock);
- /* Other threads should not be able to see this again */
- if (contexts[ctx->u.cluster_info.local_ctx])
- contexts[ctx->u.cluster_info.local_ctx] = NULL;
- pthread_mutex_unlock(&context_lock);
- /* Clear receive queue */
- while ((n = ctx->u.cluster_info.queue) != NULL) {
- list_remove(&ctx->u.cluster_info.queue, n);
- free(n->message);
- free(n);
- }
- /* Send close message */
- if (ctx->u.cluster_info.remote_ctx != 0) {
- cluster_send_control_msg(ctx, M_CLOSE);
- }
-
- /* Close pipe if it's open */
- if (ctx->u.cluster_info.select_pipe[0] >= 0) {
- close(ctx->u.cluster_info.select_pipe[0]);
- ctx->u.cluster_info.select_pipe[0] = -1;
- }
- if (ctx->u.cluster_info.select_pipe[1] >= 0) {
- close(ctx->u.cluster_info.select_pipe[1]);
- ctx->u.cluster_info.select_pipe[1] = -1;
- }
- return 0;
- case MSG_SOCKET:
- close(ctx->u.local_info.sockfd);
- ctx->u.local_info.sockfd = -1;
- return 0;
- default:
- break;
- }
-
errno = EINVAL;
- return -1;
-}
-
-
-/**
- Called by cman_dispatch to deal with messages coming across the
- cluster socket. This function deals with fanning out the requests
- and putting them on the per-context queues. We don't have
- the benefits of pre-configured buffers, so we need this.
- */
-static void
-process_cman_msg(cman_handle_t h, void *priv, char *buf, int len,
- uint8_t port, int nodeid)
-{
- cluster_msg_hdr_t *m = (cluster_msg_hdr_t *)buf;
- msg_q_t *node;
- msgctx_t *ctx;
-
- if (len < sizeof(*m)) {
- printf("Message too short.\n");
- return;
- }
-
- swab_cluster_msg_hdr_t(m);
-
-#if 0
- printf("Processing ");
- switch(m->msg_control) {
- case M_NONE:
- printf("M_NONE\n");
- break;
- case M_OPEN:
- printf("M_OPEN\n");
- break;
- case M_OPEN_ACK:
- printf("M_OPEN_ACK\n");
- break;
- case M_DATA:
- printf("M_DATA\n");
- break;
- case M_CLOSE:
- printf("M_CLOSE\n");
- break;
- }
-
- printf(" Node ID: %d %d\n", m->src_nodeid, nodeid);
- printf(" Remote CTX: %d Local CTX: %d\n", m->src_ctx, m->dest_ctx);
-#endif
-
- if (m->dest_ctx >= MAX_CONTEXTS) {
- printf("Context invalid; ignoring\n");
- return;
- }
-
- while ((node = malloc(sizeof(*node))) == NULL) {
- sleep(1);
- }
- memset(node, 0, sizeof(*node));
- while ((node->message = malloc(len)) == NULL) {
- sleep(1);
- }
- memcpy(node->message, buf, len);
- node->len = len;
-
- pthread_mutex_lock(&context_lock);
- ctx = contexts[m->dest_ctx];
- if (!ctx) {
- /* We received a close for something we've already
- detached from our list. No big deal, just
- ignore. */
- free(node->message);
- free(node);
- pthread_mutex_unlock(&context_lock);
- return;
- }
-
- pthread_mutex_lock(&ctx->u.cluster_info.mutex);
- list_insert(&ctx->u.cluster_info.queue, node);
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- /* If a select pipe was set up, wake it up */
- if (ctx->u.cluster_info.select_pipe[1] >= 0)
- write(ctx->u.cluster_info.select_pipe[1], "", 1);
- pthread_mutex_unlock(&context_lock);
-
- pthread_cond_signal(&ctx->u.cluster_info.cond);
-}
-
-
-/**
- Accept a new pseudo-private connection coming in over the
- cluster socket.
- */
-static int
-cluster_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
-{
- errno = EINVAL;
- cluster_msg_hdr_t *m;
- msg_q_t *n;
- char done = 0;
- char foo;
-
- if (!listenctx || !acceptctx)
- return -1;
- if (listenctx->u.cluster_info.local_ctx != 0)
- return -1;
-
- pthread_mutex_lock(&listenctx->u.cluster_info.mutex);
-
- n = listenctx->u.cluster_info.queue;
- if (n == NULL) {
- pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
- errno = EAGAIN;
+ if (!ctx)
return -1;
- }
-
- /* the OPEN should be the first thing on the list; this loop
- is probably not necessary */
- list_do(&listenctx->u.cluster_info.queue, n) {
-
- m = n->message;
- switch(m->msg_control) {
- case M_OPEN:
- list_remove(&listenctx->u.cluster_info.queue, n);
- /*printf("Accepting connection from %d %d\n",
- m->src_nodeid, m->src_ctx);*/
-
- /* New connection */
- pthread_mutex_init(&acceptctx->u.cluster_info.mutex,
- NULL);
- pthread_cond_init(&acceptctx->u.cluster_info.cond,
- NULL);
- acceptctx->u.cluster_info.queue = NULL;
- acceptctx->u.cluster_info.remote_ctx = m->src_ctx;
- acceptctx->u.cluster_info.nodeid = m->src_nodeid;
- acceptctx->u.cluster_info.port = m->msg_port;
-
- assign_ctx(acceptctx);
- cluster_send_control_msg(acceptctx, M_OPEN_ACK);
-
- if (listenctx->u.cluster_info.select_pipe[0] >= 0) {
- read(listenctx->u.cluster_info.select_pipe[0],
- &foo, 1);
- }
-
- done = 1;
- free(m);
- free(n);
-
- break;
- case M_DATA:
- /* Data messages (i.e. from broadcast msgs) are
- okay too!... but we don't handle them here */
- break;
- default:
- /* Other message?! */
- printf("Odd... %d\n", m->msg_control);
- break;
- }
-
- if (done)
- break;
-
- } while (!list_done(&listenctx->u.cluster_info.queue, n));
-
- pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
-
- return 0;
+ if (ctx->ops && ctx->ops->mo_close)
+ return ctx->ops->mo_close(ctx);
+ errno = ENOSYS;
+ return -1;
}
-/* XXX INCOMPLETE */
int
msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
{
- switch(listenctx->type) {
- case MSG_CLUSTER:
- return cluster_msg_accept(listenctx, acceptctx);
- case MSG_SOCKET:
- return 0;
- default:
- break;
- }
-
- return -1;
-}
-
-
-static int
-local_listener_sk(void)
-{
- int sock;
- struct sockaddr_un su;
- mode_t om;
-
- sock = socket(PF_LOCAL, SOCK_STREAM, 0);
- if (sock < 0)
+ errno = EINVAL;
+ if (!listenctx || !acceptctx)
return -1;
-
- unlink(RGMGR_SOCK);
- om = umask(077);
- su.sun_family = PF_LOCAL;
- snprintf(su.sun_path, sizeof(su.sun_path), RGMGR_SOCK);
-
- if (bind(sock, &su, sizeof(su)) < 0) {
- umask(om);
- goto fail;
- }
- umask(om);
-
- if (listen(sock, SOMAXCONN) < 0)
- goto fail;
-
- return sock;
-fail:
- if (sock >= 0)
- close(sock);
+ if (listenctx->ops && listenctx->ops->mo_accept)
+ return listenctx->ops->mo_accept(listenctx, acceptctx);
+ errno = ENOSYS;
return -1;
}
-/**
- This waits for events on the cluster comms FD and
- dispatches them using cman_dispatch. Initially,
- the design had no permanent threads, but that model
- proved difficult to implement correctly.
- */
-static void *
-cluster_comms_thread(void *arg)
-{
- while (thread_running) {
- poll_cluster_messages(2);
- }
-
- return NULL;
-}
-
-
-/*
- Transliterates a CMAN event to a control message
- */
-static void
-process_cman_event(cman_handle_t handle, void *private, int reason, int arg)
-{
- cluster_msg_hdr_t *msg;
- int *argp;
- msg_q_t *node;
- msgctx_t *ctx;
-
- /* Allocate queue node */
- while ((node = malloc(sizeof(*node))) == NULL) {
- sleep(1);
- }
- memset(node, 0, sizeof(*node));
-
- /* Allocate message: header + int (for arg) */
- while ((msg = malloc(sizeof(int) +
- sizeof(cluster_msg_hdr_t))) == NULL) {
- sleep(1);
- }
- memset(msg, 0, sizeof(int)+sizeof(cluster_msg_hdr_t));
-
-
- switch(reason) {
-#if defined(LIBCMAN_VERSION) && LIBCMAN_VERSION >= 2
- case CMAN_REASON_PORTOPENED:
- msg->msg_control = M_PORTOPENED;
- break;
- case CMAN_REASON_TRY_SHUTDOWN:
- msg->msg_control = M_TRY_SHUTDOWN;
- break;
-#endif
- case CMAN_REASON_PORTCLOSED:
- msg->msg_control = M_PORTCLOSED;
- break;
- case CMAN_REASON_STATECHANGE:
- msg->msg_control = M_STATECHANGE;
- break;
- }
-
- argp = ((void *)msg + sizeof(cluster_msg_hdr_t));
- *argp = arg;
-
- node->len = sizeof(cluster_msg_hdr_t) + sizeof(int);
- node->message = msg;
-
- pthread_mutex_lock(&context_lock);
- ctx = contexts[0]; /* This is the cluster context... */
- if (!ctx) {
- /* We received a close for something we've already
- detached from our list. No big deal, just
- ignore. */
- free(node->message);
- free(node);
- pthread_mutex_unlock(&context_lock);
- return;
- }
-
- pthread_mutex_lock(&ctx->u.cluster_info.mutex);
- list_insert(&ctx->u.cluster_info.queue, node);
- pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
- /* If a select pipe was set up, wake it up */
- if (ctx->u.cluster_info.select_pipe[1] >= 0)
- write(ctx->u.cluster_info.select_pipe[1], "", 1);
- pthread_mutex_unlock(&context_lock);
-
- pthread_cond_signal(&ctx->u.cluster_info.cond);
-}
-
-
-/* XXX INCOMPLETE */
+/* XXX Special case */
int
-msg_init(chandle_t *ch)
+msg_listen(int type, void *port, int me, msgctx_t **ctx)
{
- int e;
- pthread_attr_t attrs;
- msgctx_t *ctx;
-
- pthread_mutex_lock(&ch->c_lock);
-
- /* Set up local context */
-
- ctx = msg_new_ctx();
- if (!ctx) {
- pthread_mutex_unlock(&ch->c_lock);
+ errno = EINVAL;
+ if (!me)
return -1;
- }
-
- ctx->type = MSG_SOCKET;
- ctx->u.local_info.sockfd = local_listener_sk();
- ctx->u.local_info.flags = SKF_LISTEN;
-
- ch->local_ctx = ctx;
-
- ctx = msg_new_ctx();
-
- if (!ctx) {
- pthread_mutex_unlock(&ch->c_lock);
- msg_free_ctx((msgctx_t *)ch->local_ctx);
+ if (type == MSG_NONE)
return -1;
- }
-
- gch = ch;
-
- if (cman_start_recv_data(ch->c_cluster, process_cman_msg,
- RG_PORT) != 0) {
- e = errno;
- msg_close(ch->local_ctx);
- pthread_mutex_unlock(&ch->c_lock);
- msg_free_ctx((msgctx_t *)ch->local_ctx);
- msg_free_ctx((msgctx_t *)ch->cluster_ctx);
- errno = e;
+ if (!ctx)
return -1;
- }
- if (cman_start_notification(ch->c_cluster, process_cman_event) != 0) {
- e = errno;
- msg_close(ch->local_ctx);
- pthread_mutex_unlock(&ch->c_lock);
- msg_free_ctx((msgctx_t *)ch->local_ctx);
- msg_free_ctx((msgctx_t *)ch->cluster_ctx);
- errno = e;
+ if (type == MSG_CLUSTER) {
+ return cluster_msg_listen(me, port, ctx);
+ } else if (type == MSG_SOCKET) {
+ return sock_msg_listen(me, port, ctx);
}
- ch->cluster_ctx = ctx;
- pthread_mutex_unlock(&ch->c_lock);
-
- pthread_mutex_lock(&context_lock);
-
- memset(contexts, 0, sizeof(contexts));
- contexts[0] = ctx;
-
- ctx->type = MSG_CLUSTER;
- ctx->u.cluster_info.port = RG_PORT; /* port! */
- ctx->u.cluster_info.nodeid = 0; /* Broadcast! */
- ctx->u.cluster_info.select_pipe[0] = -1;
- ctx->u.cluster_info.select_pipe[1] = -1;
- pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
- pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
- pthread_mutex_unlock(&context_lock);
-
- pthread_attr_init(&attrs);
- pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED);
- pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
-
- thread_running = 1;
- pthread_create(&comms_thread, &attrs, cluster_comms_thread, NULL);
-
-
- pthread_attr_destroy(&attrs);
-
- return 0;
+ return -1;
}
-int
-msg_print_ctx(int ctx)
+void
+msg_print(msgctx_t *ctx)
{
- if (!contexts[ctx])
- return -1;
-
- printf("Cluster Message Context %d\n", ctx);
- printf(" Node ID %d\n", contexts[ctx]->u.cluster_info.nodeid);
- printf(" Remote %d\n", contexts[ctx]->u.cluster_info.remote_ctx);
- return 0;
+ if (ctx->ops && ctx->ops->mo_print)
+ ctx->ops->mo_print(ctx);
}
/* XXX INCOMPLETE */
int
-msg_shutdown(chandle_t *ch)
+msg_shutdown(void)
{
- if (!ch) {
- errno = EINVAL;
- return -1;
- }
-
- while (pthread_kill(comms_thread, 0) == 0)
- sleep(1);
-
- pthread_mutex_lock(&ch->c_lock);
-
- /* xxx purge everything */
- msg_close(ch->local_ctx);
- cman_end_recv_data(ch->c_cluster);
-
- msg_free_ctx(ch->local_ctx);
- msg_free_ctx(ch->cluster_ctx);
-
-
- pthread_mutex_unlock(&ch->c_lock);
+ sock_msg_shutdown();
+ cluster_msg_shutdown();
return 0;
}
@@ -1337,7 +279,6 @@
{
msgctx_t *p;
- printf("Alloc %d\n", sizeof(msgctx_t));
p = malloc(sizeof(msgctx_t));
if (!p)
return NULL;
@@ -1354,4 +295,3 @@
{
free(dead);
}
-
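With this revision, message.c becomes a thin front end. Each public call (`msg_send()`, `msg_wait()`, `msg_close()`, and so on) checks its context and then forwards through the context's operations table. If the transport leaves an entry unset, the call returns -1 with `errno` set to `ENOSYS`. The cluster and local-socket code moves into the new msg_cluster.c and msg_socket.c.

The sketch below isolates that dispatch pattern. The real operations structure is defined outside this excerpt. `msg_ops_t`, `ctx_t`, `ctx_open()`, `ctx_send()`, `ctx_close()`, and the fake transport are therefore hypothetical, reduced to two operations.

```c
#include <errno.h>
#include <stddef.h>

typedef struct ctx ctx_t;

/* Hypothetical ops table: one function pointer per operation. */
typedef struct {
	int (*mo_send)(ctx_t *ctx, void *msg, size_t len);
	int (*mo_close)(ctx_t *ctx);
} msg_ops_t;

struct ctx {
	const msg_ops_t *ops;	/* chosen by ctx_open() */
	int sent;		/* bytes accepted by the fake transport */
};

enum { T_NONE, T_FAKE };	/* hypothetical transport types */

/* Fake transport: implements send, leaves close unset. */
static int
fake_send(ctx_t *ctx, void *msg, size_t len)
{
	(void)msg;
	ctx->sent += (int)len;
	return (int)len;
}

static const msg_ops_t fake_ops = { fake_send, NULL };

/* Pick a transport by type, like the switch in msg_open(). */
int
ctx_open(ctx_t *ctx, int type)
{
	errno = EINVAL;
	if (!ctx)
		return -1;
	switch (type) {
	case T_FAKE:
		ctx->ops = &fake_ops;
		ctx->sent = 0;
		return 0;
	default:
		return -1;
	}
}

/* Front end: validate arguments, then forward to the transport. */
int
ctx_send(ctx_t *ctx, void *msg, size_t len)
{
	errno = EINVAL;
	if (!ctx || !msg || !len)
		return -1;
	if (ctx->ops && ctx->ops->mo_send)
		return ctx->ops->mo_send(ctx, msg, len);
	errno = ENOSYS;		/* transport lacks this operation */
	return -1;
}

int
ctx_close(ctx_t *ctx)
{
	errno = EINVAL;
	if (!ctx)
		return -1;
	if (ctx->ops && ctx->ops->mo_close)
		return ctx->ops->mo_close(ctx);
	errno = ENOSYS;
	return -1;
}
```

Transport selection lives in one place: the `switch` in `msg_open()` here, or `ctx_open()` in the sketch. Adding a transport then only means supplying another ops table, and the generic entry points stay unchanged.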
--- cluster/rgmanager/src/clulib/msgsimple.c 2006/06/02 17:37:10 1.5
+++ cluster/rgmanager/src/clulib/msgsimple.c 2006/07/11 23:52:41 1.6
@@ -76,9 +76,8 @@
msg_receive_simple(msgctx_t *ctx, generic_msg_hdr ** buf, int timeout)
{
int ret;
- char msgbuf[16384];
+ char msgbuf[4096];
generic_msg_hdr *peek_msg = (generic_msg_hdr *)msgbuf;
- int size;
/*
* Peek at the header. We need the size of the inbound buffer!
@@ -102,6 +101,7 @@
return -1;
}
+ /* Decode so we know how much to allocate */
swab_generic_msg_hdr(peek_msg);
if (peek_msg->gh_magic != GENERIC_HDR_MAGIC) {
fprintf(stderr, "Invalid magic: Wanted 0x%08x, got 0x%08x\n",
@@ -113,14 +113,15 @@
* allocate enough memory to receive the header + diff buffer
*/
*buf = malloc(peek_msg->gh_length);
-
if (!*buf) {
fprintf(stderr, "%s: malloc: %s", __FUNCTION__,
strerror(errno));
return -1;
}
- memset(*buf, 0, peek_msg->gh_length);
- memcpy(*buf, msgbuf + sizeof(generic_msg_hdr), peek_msg->gh_length);
+ memcpy(*buf, msgbuf, peek_msg->gh_length);
+
+ /* Put it back into the original order... */
+ swab_generic_msg_hdr((generic_msg_hdr *)(*buf));
return ret;
}
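The msgsimple.c change fixes how `msg_receive_simple()` hands a message to its caller. The header is byte-swapped only so its length can be read. The whole message, header included, is then copied out, and the copy's header is swapped back into its original received byte order. The previous code started copying `sizeof(generic_msg_hdr)` bytes into the buffer, which dropped the header while still copying `gh_length` bytes.

Below is a minimal sketch of the same idea under stated assumptions:

- `hdr_t`, `HDR_MAGIC`, `swab_hdr()`, and `copy_message()` are hypothetical names.
- The swap is unconditional, so it can be demonstrated on any host.
- Instead of swapping the receive buffer in place and swapping the copy back, the sketch decodes a private copy of the header. The copied message still ends up in wire order.
- It also rejects a length larger than the bytes actually received. The lines shown in the hunk do not include such a check against the now 4096-byte `msgbuf`, though the elided context may.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define HDR_MAGIC 0x12345678u	/* hypothetical magic value */

/* Hypothetical stand-in for generic_msg_hdr. */
typedef struct {
	uint32_t magic;
	uint32_t length;	/* total length: header + payload */
} hdr_t;

static uint32_t
bswap32(uint32_t v)
{
	return ((v & 0x000000ffu) << 24) | ((v & 0x0000ff00u) << 8) |
	       ((v & 0x00ff0000u) >> 8)  | ((v & 0xff000000u) >> 24);
}

/* Swap the header in place; applying it twice restores the original. */
static void
swab_hdr(hdr_t *h)
{
	h->magic = bswap32(h->magic);
	h->length = bswap32(h->length);
}

/*
 * wire/got: the bytes already received. Returns a malloc'd copy of
 * the whole message with its header left in wire order, or NULL.
 */
void *
copy_message(const void *wire, size_t got)
{
	hdr_t h;
	void *out;

	if (got < sizeof(h))
		return NULL;
	memcpy(&h, wire, sizeof(h));	/* decode a private copy only */
	swab_hdr(&h);
	if (h.magic != HDR_MAGIC)
		return NULL;
	/* Never copy more than was actually received. */
	if (h.length < sizeof(h) || h.length > got)
		return NULL;

	out = malloc(h.length);
	if (!out)
		return NULL;
	memcpy(out, wire, h.length);	/* header + payload, wire order */
	return out;
}
```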
--- cluster/rgmanager/src/clulib/rg_strings.c 2005/03/07 17:25:12 1.3
+++ cluster/rgmanager/src/clulib/rg_strings.c 2006/07/11 23:52:41 1.4
@@ -1,3 +1,21 @@
+/*
+ Copyright Red Hat, Inc. 2004-2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
const char *rg_state_strings[] = {
"stopped",
"starting",
--- cluster/rgmanager/src/clulib/signals.c 2004/08/13 15:36:51 1.1
+++ cluster/rgmanager/src/clulib/signals.c 2006/07/11 23:52:41 1.2
@@ -1,3 +1,21 @@
+/*
+ Copyright Red Hat, Inc. 2003-2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
#include <signal.h>
#include <stdlib.h>
#include <string.h>
--- cluster/rgmanager/src/clulib/vft.c 2006/06/02 17:37:10 1.13
+++ cluster/rgmanager/src/clulib/vft.c 2006/07/11 23:52:41 1.14
@@ -1,5 +1,5 @@
/*
- Copyright Red Hat, Inc. 2002-2003
+ Copyright Red Hat, Inc. 2002-2006
This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
@@ -40,12 +40,8 @@
#include <stdio.h>
#include <assert.h>
#include <signals.h>
+#include <lock.h>
-
-int clu_lock_verbose(char *lockname, int flags, void **lockpp);
-
-static int vf_lfds[2];
-static int vf_lfd = 0;
static key_node_t *key_list = NULL; /** List of key nodes. */
static int _node_id = (int)-1;/** Our node ID, set with vf_init. */
static uint16_t _port = 0; /** Our daemon ID, set with vf_init. */
@@ -57,7 +53,7 @@
static pthread_mutex_t key_list_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t vf_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_t vf_thread = (pthread_t)-1;
-static int thread_ready = 0;
+static int vf_thread_ready = 0;
static vf_vote_cb_t default_vote_cb = NULL;
static vf_vote_cb_t default_commit_cb = NULL;
@@ -65,43 +61,42 @@
/*
* Internal Functions
*/
-static int send_to_all(msgctx_t *peer_ctx, int32_t command, int arg1, int arg2,
- int log_errors);
-static int vf_send_abort(msgctx_t *ctx);
-static int vf_send_commit(msgctx_t *ctx);
-static void close_all(msgctx_t *fds);
+static int _send_simple(msgctx_t *ctx, int32_t command, int arg1, int arg2,
+ int log_errors);
+static int vf_send_abort(msgctx_t *ctx, uint32_t trans);
+static int vf_send_commit(msgctx_t *ctx, uint32_t trans);
static key_node_t * kn_find_key(char *keyid);
-static key_node_t * kn_find_fd(msgctx_t *ctx);
-static int vf_handle_join_view_msg(msgctx_t *ctx, vf_msg_t * hdrp);
+static key_node_t * kn_find_trans(uint32_t trans);
+static int vf_handle_join_view_msg(msgctx_t *ctx, int nodeid, vf_msg_t * hdrp);
static int vf_resolve_views(key_node_t *key_node);
-static int vf_unanimous(msgctx_t *peer_ctx, int remain, int timeout);
-static view_node_t * vn_new(msgctx_t *ctx, uint32_t nodeid, int viewno,
+static int vf_unanimous(msgctx_t *ctx, int trans, int remain, int timeout);
+static view_node_t * vn_new(uint32_t trans, uint32_t nodeid, int viewno,
void *data, uint32_t datalen);
static int vf_request_current(cluster_member_list_t *membership, char *keyid,
- int *viewno, void **data, uint32_t *datalen);
-static int _vf_purge(key_node_t *key_node, msgctx_t **fd);
+ uint64_t *viewno, void **data, uint32_t *datalen);
+static int _vf_purge(key_node_t *key_node, uint32_t *trans);
/* Join-view buffer list functions */
static int vn_cmp(view_node_t *left, view_node_t *right);
static int vn_insert_sorted(view_node_t **head, view_node_t *node);
-static view_node_t * vn_remove(view_node_t **head, msgctx_t *ctx);
-static int vf_buffer_join_msg(int fd, vf_msg_t *hdr,
+static view_node_t * vn_remove(view_node_t **head, uint32_t trans);
+static int vf_buffer_join_msg(vf_msg_t *hdr,
struct timeval *timeout);
/* Commits buffer list functions */
static int vc_cmp(commit_node_t *left, commit_node_t *right);
static int vc_insert_sorted(commit_node_t **head, commit_node_t *node);
-static commit_node_t * vc_remove(commit_node_t **head, int fd);
-static int vf_buffer_commit(int fd);
+static commit_node_t * vc_remove(commit_node_t **head, uint32_t trans);
+static int vf_buffer_commit(uint32_t trans);
/* Simple functions which client calls to vote/abort */
-static int vf_vote_yes(msgctx_t *ctx);
-static int vf_vote_no(msgctx_t *ctx);
-static int vf_abort(msgctx_t *ctx);
+static int vf_vote_yes(msgctx_t *ctx, uint32_t trans);
+static int vf_vote_no(msgctx_t *ctx, uint32_t trans);
+static int vf_abort(uint32_t trans);
static int tv_cmp(struct timeval *left, struct timeval *right);
/* Resolution */
-static int vf_try_commit(key_node_t *key_node);
+static uint32_t vf_try_commit(key_node_t *key_node);
int vf_init(int my_node_id, uint16_t my_port,
vf_vote_cb_t vote_cb, vf_commit_cb_t commit_cb);
@@ -111,7 +106,7 @@
vf_commit_cb_t commit_cb);
int vf_write(cluster_member_list_t *memberhip, uint32_t flags,
char *keyid, void *data, uint32_t datalen);
-int vf_process_msg(int handle, generic_msg_hdr *msgp, int nbytes);
+int vf_process_msg(msgctx_t *ctx, int nodeid, generic_msg_hdr *msgp, int nbytes);
int vf_end(char *keyid);
int vf_read(cluster_member_list_t *membership, char *keyid, uint64_t *view,
void **data, uint32_t *datalen);
@@ -123,14 +118,14 @@
struct vf_args {
uint16_t port;
int local_node_id;
+ msgctx_t *ctx;
};
static int
-send_to_all(msgctx_t *peer_ctx, int32_t command, int arg1, int arg2, int log_errors)
+_send_simple(msgctx_t *ctx, int32_t command, int arg1, int arg2, int log_errors)
{
generic_msg_hdr hdr;
- int x, rv = 0;
hdr.gh_magic = GENERIC_HDR_MAGIC;
hdr.gh_length = sizeof(hdr);
@@ -140,51 +135,27 @@
swab_generic_msg_hdr(&hdr);
- for (x=0; peer_ctx[x].type != -1; x++) {
- if (msg_send(&peer_ctx[x], &hdr, sizeof(hdr)) == sizeof(hdr))
- continue;
-
- if (log_errors) {
-#if 0
- clulog(LOG_ERR, "#14: Failed to send %d "
- "bytes to %d!\n", sizeof(hdr),
- x);
-#endif
- }
- rv = -1;
- }
-
- return rv;
+ return msg_send(ctx, &hdr, sizeof(hdr));
}
static int
-vf_send_abort(msgctx_t *ctx)
+vf_send_abort(msgctx_t *ctx, uint32_t trans)
{
#ifdef DEBUG
- printf("VF: Broadcasting ABORT\n");
+ printf("VF: Broadcasting ABORT (X#%08x)\n", trans);
#endif
- return send_to_all(ctx, VF_MESSAGE, VF_ABORT, 0, 0);
+ return _send_simple(ctx, VF_MESSAGE, VF_ABORT, trans, 0);
}
static int
-vf_send_commit(msgctx_t *ctx)
+vf_send_commit(msgctx_t *ctx, uint32_t trans)
{
#ifdef DEBUG
printf("VF: Broadcasting FORMED\n");
#endif
- return send_to_all(ctx, VF_MESSAGE, VF_VIEW_FORMED, 0, 1);
-}
-
-
-static void
-close_all(msgctx_t *ctx)
-{
- int x;
- for (x = 0; ctx[x].type != -1; x++) {
- msg_close(&ctx[x]);
- }
+ return _send_simple(ctx, VF_MESSAGE, VF_VIEW_FORMED, trans, 1);
}
@@ -202,14 +173,14 @@
static key_node_t *
-kn_find_fd(msgctx_t *ctx)
+kn_find_trans(uint32_t trans)
{
key_node_t *cur;
view_node_t *curvn;
for (cur = key_list; cur; cur = cur->kn_next)
for (curvn = cur->kn_jvlist; curvn; curvn = curvn->vn_next)
- if (curvn->vn_ctx == ctx)
+ if (curvn->vn_transaction == trans)
return cur;
return NULL;
@@ -217,15 +188,17 @@
static int
-vf_handle_join_view_msg(msgctx_t *ctx, vf_msg_t * hdrp)
+vf_handle_join_view_msg(msgctx_t *ctx, int nodeid, vf_msg_t * hdrp)
{
struct timeval timeout;
key_node_t *key_node;
+ uint32_t trans;
+ trans = hdrp->vm_msg.vf_transaction;
#ifdef DEBUG
- printf("VF_JOIN_VIEW from member #%d! Key: %s #%d\n",
+ printf("VF_JOIN_VIEW from member #%d! Key: %s #%d (X#%08x)\n",
hdrp->vm_msg.vf_coordinator, hdrp->vm_msg.vf_keyid,
- (int) hdrp->vm_msg.vf_view);
+ (int) hdrp->vm_msg.vf_view, trans);
#endif
pthread_mutex_lock(&key_list_mutex);
@@ -241,7 +214,7 @@
pthread_mutex_unlock(&key_list_mutex);
printf("VF: Error: Failed to initialize %s\n",
hdrp->vm_msg.vf_keyid);
- vf_vote_no(ctx);
+ vf_vote_no(ctx, trans);
return VFR_ERROR;
}
@@ -258,7 +231,7 @@
#ifdef DEBUG
printf("VF: Voting NO (via callback)\n");
#endif
- vf_vote_no(ctx);
+ vf_vote_no(ctx, trans);
return VFR_OK;
}
}
@@ -269,12 +242,12 @@
timeout.tv_sec = key_node->kn_tsec;
timeout.tv_usec = 0;
- if (vf_buffer_join_msg(ctx, (vf_msg_t *) hdrp, &timeout)) {
+ if (vf_buffer_join_msg((vf_msg_t *) hdrp, &timeout)) {
pthread_mutex_unlock(&key_list_mutex);
#ifdef DEBUG
- printf("VF: Voting YES\n");
+ printf("VF: Voting YES (X#%08x)\n", trans);
#endif
- vf_vote_yes(ctx);
+ vf_vote_yes(ctx, trans);
return VFR_OK;
}
@@ -282,7 +255,7 @@
#ifdef DEBUG
printf("VF: Voting NO\n");
#endif
- vf_vote_no(ctx);
+ vf_vote_no(ctx, trans);
return VFR_NO;
}
@@ -297,13 +270,10 @@
int commits = 0;
void *data;
uint32_t datalen;
- msgctx_t *ctx;
+ uint32_t trans;
- while ((ctx = vf_try_commit(key_node)) != -1) {
-
- /* XXX in general, this shouldn't kill the fd... */
+ while ((trans = vf_try_commit(key_node)) != 0) {
commits++;
- msg_close(commitfd);
}
if (key_node->kn_commit_cb) {
@@ -327,12 +297,12 @@
static int
-vf_unanimous(msgctx_t *peer_ctx, int remain, int timeout)
+vf_unanimous(msgctx_t *mcast_ctx, int trans, int remain,
+ int timeout)
{
generic_msg_hdr response;
struct timeval tv;
- fd_set rfds;
- int nready, x, max;
+ int x;
/* Set up for the select */
tv.tv_sec = timeout;
@@ -346,69 +316,62 @@
* Flag hosts which we received messages from so we don't
* read a second message.
*/
- while (remain) {
- FD_ZERO(&rfds);
- for (x = 0; peer_ctx[x].type != M_NONE; x++)
- msg_fd_set(&peer_ctx[x], &rfds, &max);
-
- nready = select(max + 1, &rfds, NULL, NULL, &tv);
- if (nready <= -1) {
- if (nready == 0)
- printf("VF Abort: Timed out!\n");
- else
- printf("VF Abort: %s\n",
- strerror(errno));
- return 0;
+ while (remain && timeout) {
+
+ if (msg_wait(mcast_ctx, 5) <= 0) {
+ --timeout;
+ continue;
}
- for (x = 0; (peer_ctx[x].type != M_NONE) && nready; x++) {
- if (!msg_fd_isset(&peer_fds[x], &rfds))
- continue;
+ x = msg_receive(mcast_ctx, &response, sizeof(response), 1);
+ if (x < sizeof(response))
+ continue;
+
+ /*
+ * Decode & validate message
+ */
+ swab_generic_msg_hdr(&response);
+ if ((response.gh_magic != GENERIC_HDR_MAGIC) ||
+ (response.gh_command != VF_MESSAGE)) {
+ /* Don't process anything but votes */
+ continue;
+ }
- remain--;
- nready--;
- /*
- * Get reply from node x. XXX 1 second timeout?
- */
- if (msg_receive(&peer_ctx[x], &response,
- sizeof(response),
- 1) == -1) {
- printf("VF: Abort: Timed out during "
- "receive from fd #%p\n", &peer_ctx[x]);
- return 0;
- }
-
- /*
- * Decode & validate message
- */
- swab_generic_msg_hdr(&response);
- if ((response.gh_magic != GENERIC_HDR_MAGIC) ||
- (response.gh_command != VF_MESSAGE) ||
- (response.gh_arg1 != VF_VOTE)) {
- printf("VF: Abort: Invalid header in"
- " reply from fd #%p\n", &peer_fds[x]);
- return 0;
- }
-
+ if (vf_command(response.gh_arg1) != VF_VOTE)
+ /* Don't process anything but votes */
+ continue;
+
+ if (response.gh_arg2 != trans)
+ continue;
+
+ /*
+ * If we get a 'NO', we are done.
+ */
+ if (!(vf_flags(response.gh_arg1) & VFMF_AFFIRM)) {
/*
- * If we get a 'NO', we are done.
+ * XXX ok, it might be a mangled message;
+ * treat it as no anyway!
*/
- if (response.gh_arg2 != 1) {
- /*
- * XXX ok, it might be a mangled message;
- * treat it as no anyway!
- */
- printf("VF: Abort: fd #%d voted NO\n",
- peer_fds[x]);
- return 0;
- }
-
#ifdef DEBUG
- printf("VF: fd #%d voted YES\n", peer_fds[x]);
+ printf("VF: Abort: someone voted NO\n");
#endif
+ return 0;
}
+
+#ifdef DEBUG
+ printf("VF: YES\n");
+#endif
+ --remain;
}
+ if (remain) {
+#ifdef DEBUG
+ printf("VF: Timed out waiting for %d responses\n", remain);
+#endif
+ return 0;
+ }
+
+
/*
* Whohoooooooo!
*/
@@ -420,7 +383,7 @@
* ...
*/
static view_node_t *
-vn_new(msgctx_t *ctx, uint32_t nodeid, int viewno, void *data,
+vn_new(uint32_t trans, uint32_t nodeid, int viewno, void *data,
uint32_t datalen)
{
view_node_t *new;
@@ -433,7 +396,7 @@
memset(new,0,totallen);
- new->vn_ctx = ctx;
+ new->vn_transaction = trans;
new->vn_nodeid = nodeid;
new->vn_viewno = viewno;
new->vn_datalen = datalen;
@@ -502,7 +465,7 @@
static view_node_t *
-vn_remove(view_node_t **head, msgctx_t *ctx)
+vn_remove(view_node_t **head, uint32_t trans)
{
view_node_t *cur = *head, *back = NULL;
@@ -510,7 +473,7 @@
return NULL;
do {
- if (cur->vn_ctx == ctx) {
+ if (cur->vn_transaction == trans) {
if (back) {
back->vn_next = cur->vn_next;
cur->vn_next = NULL;
@@ -537,7 +500,7 @@
* (b) we don't receive any messages.
*/
static int
-vf_buffer_join_msg(msgctx_t *ctx, vf_msg_t *hdr, struct timeval *timeout)
+vf_buffer_join_msg(vf_msg_t *hdr, struct timeval *timeout)
{
key_node_t *key_node;
view_node_t *newp;
@@ -557,7 +520,8 @@
return 0;
}
- newp = vn_new(ctx, hdr->vm_msg.vf_coordinator, hdr->vm_msg.vf_view,
+ newp = vn_new(hdr->vm_msg.vf_transaction, hdr->vm_msg.vf_coordinator,
+ hdr->vm_msg.vf_view,
hdr->vm_msg.vf_data, hdr->vm_msg.vf_datalen);
if (timeout && (timeout->tv_sec || timeout->tv_usec)) {
@@ -585,10 +549,10 @@
static int
vc_cmp(commit_node_t *left, commit_node_t *right)
{
- if (left->vc_fd < right->vc_fd)
+ if (left->vc_transaction < right->vc_transaction)
return -1;
- if (left->vc_fd == right->vc_fd)
+ if (left->vc_transaction == right->vc_transaction)
return 0;
return 1;
@@ -637,7 +601,7 @@
static commit_node_t *
-vc_remove(commit_node_t **head, int fd)
+vc_remove(commit_node_t **head, uint32_t trans)
{
commit_node_t *cur = *head, *back = NULL;
@@ -645,7 +609,7 @@
return NULL;
do {
- if (cur->vc_fd == fd) {
+ if (cur->vc_transaction == trans) {
if (back) {
back->vc_next = cur->vc_next;
cur->vc_next = NULL;
@@ -671,13 +635,13 @@
* the last 'join-view' message.
*/
static int
-vf_buffer_commit(int fd)
+vf_buffer_commit(uint32_t trans)
{
key_node_t *key_node;
commit_node_t *newp;
int rv;
- key_node = kn_find_fd(fd);
+ key_node = kn_find_trans(trans);
if (!key_node)
return 0;
@@ -686,7 +650,7 @@
return 0;
newp->vc_next = NULL;
- newp->vc_fd = fd;
+ newp->vc_transaction = trans;
rv = vc_insert_sorted(&key_node->kn_clist, newp);
if (!rv)
@@ -697,31 +661,32 @@
static int
-vf_vote_yes(msgctx_t *ctx)
+vf_vote_yes(msgctx_t *ctx, uint32_t trans)
{
- return msg_send_simple(ctx, VF_MESSAGE, VF_VOTE, 1);
-
+ /* XXX */
+ return _send_simple(ctx, VF_MESSAGE, VF_VOTE | VFMF_AFFIRM, trans, 0);
}
static int
-vf_vote_no(msgctx_t *ctx)
+vf_vote_no(msgctx_t *ctx, uint32_t trans)
{
- return msg_send_simple(ctx, VF_MESSAGE, VF_VOTE, 0);
+ /* XXX */
+ return _send_simple(ctx, VF_MESSAGE, VF_VOTE, trans, 0);
}
static int
-vf_abort(msgctx_t *ctx)
+vf_abort(uint32_t trans)
{
key_node_t *key_node;
view_node_t *cur;
- key_node = kn_find_fd(ctx);
+ key_node = kn_find_trans(trans);
if (!key_node)
return -1;
- cur = vn_remove(&key_node->kn_jvlist, ctx);
+ cur = vn_remove(&key_node->kn_jvlist, trans);
if (!cur)
return -1;
@@ -787,30 +752,30 @@
/**
* Try to commit views in a given key_node.
*/
-static msgctx_t *
+static uint32_t
vf_try_commit(key_node_t *key_node)
{
view_node_t *vnp;
commit_node_t *cmp;
- msgctx_t *ctx = NULL;
+ uint32_t trans = 0;
if (!key_node)
- return NULL;
+ return 0;
if (!key_node->kn_jvlist)
- return NULL;
+ return 0;
- ctx = key_node->kn_jvlist->vn_ctx;
+ trans = key_node->kn_jvlist->vn_transaction;
- cmp = vc_remove(&key_node->kn_clist, ctx);
+ cmp = vc_remove(&key_node->kn_clist, trans);
if (!cmp) {
/*printf("VF: Commit for fd%d not received yet!", fd);*/
- return NULL;
+ return 0;
}
free(cmp); /* no need for it any longer */
- vnp = vn_remove(&key_node->kn_jvlist, ctx);
+ vnp = vn_remove(&key_node->kn_jvlist, trans);
if (!vnp) {
/*
* But, we know it was the first element on the list?!!
@@ -835,46 +800,27 @@
memcpy(key_node->kn_data, vnp->vn_data, vnp->vn_datalen);
free(vnp);
- return ctx;
+ return trans;
}
void
-vf_event_loop(int my_node_id)
+vf_event_loop(msgctx_t *ctx, int my_node_id)
{
- int max, nready, n, fd, flags;
- struct timeval tv;
- fd_set rfds;
+ int n;
generic_msg_hdr *hdrp = NULL;
- FD_ZERO(&rfds);
- max = msg_fill_fdset(&rfds, MSG_ALL, MSGP_VFS);
-
- tv.tv_sec = 1;
- tv.tv_usec = 0;
- nready = select(max + 1, &rfds, NULL, NULL, &tv);
- if (nready <= 0)
- return;
-
- while (nready) {
- fd = msg_next_fd(&rfds);
- --nready;
+ if (msg_wait(ctx, 3) != 0) {
- flags = msg_get_flags(fd);
-
- if (flags & MSG_LISTEN)
- fd = msg_accept(fd, 1, NULL);
-
- n = msg_receive_simple(fd, &hdrp, 5);
+ n = msg_receive_simple(ctx, &hdrp, 2);
if (n <= 0 || !hdrp) {
- msg_close(fd);
- continue;
+ return;
}
swab_generic_msg_hdr(hdrp);
if (hdrp->gh_command == VF_MESSAGE) {
- if (vf_process_msg(fd, hdrp, n) == VFR_COMMIT) {
+ if (vf_process_msg(ctx, 0, hdrp, n) == VFR_COMMIT) {
#ifdef DEBUG
printf("VFT: View committed\n");
#endif
@@ -893,7 +839,7 @@
vf_wait_ready(void)
{
pthread_mutex_lock(&vf_mutex);
- while (!thread_ready) {
+ while (!vf_thread_ready) {
pthread_mutex_unlock(&vf_mutex);
usleep(50000);
pthread_mutex_lock(&vf_mutex);
@@ -908,12 +854,14 @@
int my_node_id;
uint16_t port;
key_node_t *cur;
- int fd;
+ uint32_t trans;
+ msgctx_t *ctx;
block_all_signals();
port = ((struct vf_args *)arg)->port;
my_node_id = ((struct vf_args *)arg)->local_node_id;
+ ctx = ((struct vf_args *)arg)->ctx;
free(arg);
#ifdef DEBUG
@@ -921,29 +869,20 @@
#endif
pthread_mutex_lock(&vf_mutex);
-#if 0
- if ((vf_lfd = msg_listen(port, MSGP_VFS, vf_lfds, 2)) <= 0) {
- printf("Unable to set up listen socket on port %d\n",
- port);
- pthread_mutex_unlock(&vf_mutex);
- pthread_exit(NULL);
- }
-
- thread_ready = 1;
+ vf_thread_ready = 1;
pthread_mutex_unlock(&vf_mutex);
-#endif
- while (1) {
+ while (vf_thread_ready) {
pthread_mutex_lock(&key_list_mutex);
for (cur = key_list; cur; cur = cur->kn_next) {
/* Destroy timed-out join views */
- while (_vf_purge(cur, &fd) != VFR_NO) {
- msg_close(fd);
- }
+ while (_vf_purge(cur, &trans) != VFR_NO);
}
pthread_mutex_unlock(&key_list_mutex);
- vf_event_loop(my_node_id);
+ vf_event_loop(ctx, my_node_id);
}
+
+ msg_close(ctx);
return NULL;
}
@@ -952,40 +891,43 @@
/**
* Initialize VF. Initializes the View Formation sub system.
* @param my_node_id The node ID of the caller.
- * @param my_port The port of the caller.
* @return 0 on success, -1 on failure.
*/
int
vf_init(int my_node_id, uint16_t my_port, vf_vote_cb_t vcb,
vf_commit_cb_t ccb)
{
- struct vf_args *va;
-
+ struct vf_args *args;
+ msgctx_t *ctx;
if (my_node_id == (int)-1)
return -1;
+
+ while((ctx = msg_new_ctx()) == NULL)
+ sleep(1);
- if (my_port == 0)
- return -1;
+ while((args = malloc(sizeof(*args))) == NULL)
+ sleep(1);
- pthread_mutex_lock(&vf_mutex);
- if (vf_thread != (pthread_t)-1) {
- pthread_mutex_unlock(&vf_mutex);
- return 0;
+ if (msg_open(MSG_CLUSTER, 0, my_port, ctx, 1) < 0) {
+ free(ctx);
+ free(args);
+ return -1;
}
- va = malloc(sizeof(*va));
- va->local_node_id = my_node_id;
- va->port = my_port;
+ args->port = my_port;
+ args->local_node_id = my_node_id;
+ args->ctx = ctx;
- pthread_create(&vf_thread, NULL, vf_server, va);
- /* Write/read needs this */
+ pthread_mutex_lock(&vf_mutex);
_port = my_port;
_node_id = my_node_id;
default_vote_cb = vcb;
default_commit_cb = ccb;
pthread_mutex_unlock(&vf_mutex);
+ pthread_create(&vf_thread, NULL, vf_server, args);
+
vf_wait_ready();
return 0;
@@ -998,20 +940,14 @@
int
vf_shutdown(void)
{
- int x;
key_node_t *c_key;
view_node_t *c_jv;
commit_node_t *c_cn;
pthread_mutex_lock(&vf_mutex);
+ vf_thread_ready = 0;
pthread_cancel(vf_thread);
pthread_join(vf_thread, NULL);
- thread_ready = 0;
- vf_thread = (pthread_t)0;
-
- for (x = 0 ; x < vf_lfd; x++)
- msg_close(vf_lfds[x]);
-
_port = 0;
_node_id = (int)-1;
pthread_mutex_lock(&key_list_mutex);
@@ -1019,7 +955,6 @@
while ((c_key = key_list) != NULL) {
while ((c_jv = c_key->kn_jvlist) != NULL) {
- msg_close(c_jv->vn_ctx);
key_list->kn_jvlist = c_jv->vn_next;
free(c_jv);
}
@@ -1120,7 +1055,7 @@
vf_msg_t *
build_vf_data_message(int cmd, char *keyid, void *data, uint32_t datalen,
- int viewno, uint32_t *retlen)
+ int viewno, int trans, uint32_t *retlen)
{
uint32_t totallen;
vf_msg_t *msg;
@@ -1142,6 +1077,7 @@
/* Data */
strncpy(msg->vm_msg.vf_keyid,keyid,sizeof(msg->vm_msg.vf_keyid));
+ msg->vm_msg.vf_transaction = trans;
msg->vm_msg.vf_datalen = datalen;
msg->vm_msg.vf_coordinator = _node_id;
msg->vm_msg.vf_view = viewno;
@@ -1171,89 +1107,52 @@
vf_write(cluster_member_list_t *membership, uint32_t flags, char *keyid,
void *data, uint32_t datalen)
{
- int nodeid;
- msgctx_t *peer_ctx;
- int count;
+ msgctx_t everyone;
key_node_t *key_node;
vf_msg_t *join_view;
- int remain = 0, x, y, rv = 1;
+ int remain = 0, x, y, rv = VFR_ERROR;
uint32_t totallen;
+#ifdef DEBUG
struct timeval start, end, dif;
- void *lockp = NULL;
+#endif
+ struct dlm_lksb lockp;
int l;
char lock_name[256];
+ static uint32_t trans = 0;
if (!data || !datalen || !keyid || !strlen(keyid) || !membership)
return -1;
+
pthread_mutex_lock(&vf_mutex);
+ if (!trans) {
+ trans = _node_id << 16;
+ }
+ ++trans;
+
/* Obtain cluster lock on it. */
snprintf(lock_name, sizeof(lock_name), "usrm::vf");
- l = clu_lock_verbose(lock_name, CLK_EX, &lockp);
+ l = clu_lock(LKM_EXMODE, &lockp, 0, lock_name);
if (l < 0) {
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
return l;
}
- /* set to -1 */
- count = sizeof(msgctx_t) * (membership->cml_count + 1);
- peer_ctx = malloc(count);
- if(!peer_ctx) {
- pthread_mutex_unlock(&vf_mutex);
- return -1;
- }
-
- memset(peer_ctx, 0, sizeof(msgctx_t) * (membership->cml_count +1));
- for (x = 0; x < (membership->cml_count + 1); x++) {
- peer_ctx[x].type = M_NONE;
- }
+#ifdef DEBUG
getuptime(&start);
+#endif
-retry_top:
- /*
- * Connect to everyone, except ourself. We separate this from the
- * initial send cycle because the connect cycle can cause timeouts
- * within the code - ie, if a node is down, it is likely the connect
- * will take longer than the client is expecting to wait for the
- * commit/abort messages!
- *
- * We assume we're up. Since challenge-response needs both
- * processes to be operational...
- */
+ remain = 0;
for (x = 0, y = 0; x < membership->cml_count; x++) {
- if (!memb_online(membership,
- membership->cml_members[x].cn_nodeid)) {
- continue;
+ if (membership->cml_members[x].cn_member) {
+ remain++;
}
+ }
- if (peer_fds[x].type != M_NONE)
- continue;
-
- nodeid = membership->cml_members[x].cn_nodeid;
-#ifdef DEBUG
- printf("VF: Connecting to member #%d\n", (int)nodeid);
- fflush(stdout);
-#endif
-
- if (msg_open(nodeid, _port, &peer_ctx[y], 15) != 0) {
#ifdef DEBUG
- printf("VF: Connect to %d failed: %s\n", (int)nodeid,
- strerror(errno));
+ printf("aight, need responses from %d guys\n", remain);
#endif
- if (flags & VFF_RETRY)
- goto retry_top;
- if (flags & VFF_IGN_CONN_ERRORS)
- continue;
- free(peer_ctx);
-
- clu_unlock(lock_name, lockp);
- pthread_mutex_unlock(&vf_mutex);
- return -1;
- }
-
- ++y;
- }
pthread_mutex_lock(&key_list_mutex);
key_node = kn_find_key(keyid);
@@ -1261,7 +1160,7 @@
if ((vf_key_init_nt(keyid, 10, NULL, NULL) < 0)) {
pthread_mutex_unlock(&key_list_mutex);
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
return -1;
}
@@ -1270,19 +1169,19 @@
}
join_view = build_vf_data_message(VF_JOIN_VIEW, keyid, data, datalen,
- key_node->kn_viewno+1, &totallen);
+ key_node->kn_viewno+1, trans, &totallen);
pthread_mutex_unlock(&key_list_mutex);
if (!join_view) {
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
return -1;
}
#ifdef DEBUG
- printf("VF: Push %d.%d #%d\n", (int)_node_id, getpid(),
- (int)join_view->vm_msg.vf_view);
+ printf("VF: Push %d.%d #%d (X#%08x)\n", (int)_node_id, getpid(),
+ (int)join_view->vm_msg.vf_view, trans);
#endif
/*
* Encode the package.
@@ -1292,50 +1191,53 @@
/*
* Send our message to everyone
*/
- for (x = 0; peer_ctx[x].type != M_NONE; x++) {
-
- if (msg_send(&peer_ctx[x], join_view, totallen) != totallen) {
- vf_send_abort(peer_ctx);
- close_all(peer_ctx);
-
- free(join_view);
- clu_unlock(lock_name, lockp);
- pthread_mutex_unlock(&vf_mutex);
- return -1;
- }
-
- remain++;
+ if (msg_open(MSG_CLUSTER, 0, _port, &everyone, 0) < 0) {
+ printf("msg_open: fail: %s\n", strerror(errno));
+ return -1;
}
+ x = msg_send(&everyone, join_view, totallen);
+ if (x < totallen) {
+ vf_send_abort(&everyone, trans);
+#ifdef DEBUG
+ printf("VF: Aborted: Send failed (%d/%d)\n", x, totallen);
+#endif
+ msg_close(&everyone);
+ free(join_view);
+ clu_unlock(&lockp);
+ pthread_mutex_unlock(&vf_mutex);
+ return -1;
+ }
+
#ifdef DEBUG
printf("VF: Checking for consensus...\n");
#endif
/*
* See if we have a consensus =)
*/
- if ((rv = (vf_unanimous(peer_ctx, remain, VF_COORD_TIMEOUT)))) {
- vf_send_commit(peer_ctx);
+ if ((rv = (vf_unanimous(&everyone, trans, remain,
+ 5)))) {
+ vf_send_commit(&everyone, trans);
+#ifdef DEBUG
+ printf("VF: Consensus reached!\n");
+#endif
} else {
- vf_send_abort(peer_ctx);
+ vf_send_abort(&everyone, trans);
#ifdef DEBUG
printf("VF: Aborted!\n");
#endif
}
/*
- * Clean up
- */
- close_all(peer_fds);
-
- /*
* unanimous returns 1 for true; 0 for false, so negate it and
* return our value...
*/
+ msg_close(&everyone);
free(join_view);
- free(peer_fds);
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
+#ifdef DEBUG
if (rv) {
getuptime(&end);
@@ -1347,11 +1249,10 @@
dif.tv_sec--;
}
-#ifdef DEBUG
printf("VF: Converge Time: %d.%06d\n", (int)dif.tv_sec,
(int)dif.tv_usec);
-#endif
}
+#endif
return (rv?0:-1);
}
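The per-peer sockets are gone, so every vote now arrives on one shared cluster channel. vf_write() therefore tags each round with a transaction ID, seeded from _node_id << 16 and pre-incremented on every call. vf_unanimous() discards anything that is not a VF_VOTE for that ID, fails on the first vote lacking VFMF_AFFIRM, and succeeds once `remain` YES votes have arrived.

The sketch below models only that bookkeeping, over an array of already-decoded messages. The bit values and all demo_* names are hypothetical, because the real VF_VOTE, VFMF_AFFIRM, vf_command() and vf_flags() definitions live in a header this patch does not show. The msg_wait() timeout loop and byte swapping are left out.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical encodings; the real ones are not part of this patch. */
#define DEMO_VF_VOTE		0x00000003u
#define DEMO_VFMF_AFFIRM	0x80000000u
#define DEMO_CMD_MASK		0x0000ffffu
#define demo_vf_command(a)	((uint32_t)(a) & DEMO_CMD_MASK)
#define demo_vf_flags(a)	((uint32_t)(a) & ~DEMO_CMD_MASK)

/* One decoded message from the shared channel: gh_arg1 and gh_arg2 only. */
typedef struct {
	uint32_t arg1;		/* command | flags */
	uint32_t arg2;		/* transaction ID */
} demo_vote_t;

/*
 * Same scheme as vf_write(): seed with node_id << 16 on first use, then
 * pre-increment.  IDs from different nodes stay disjoint until one node
 * issues more than 65536 writes, and 0 is never returned short of 32-bit
 * wraparound, so it can keep meaning "no transaction".
 */
static uint32_t
demo_next_trans(uint32_t *counter, int node_id)
{
	if (*counter == 0)
		*counter = (uint32_t)node_id << 16;
	return ++(*counter);
}

/*
 * Scan n received messages the way the patched vf_unanimous() does:
 * skip non-votes and votes for other transactions, fail on the first NO,
 * succeed once 'remain' YES votes are seen.  Returns 1 or 0.
 */
static int
demo_unanimous(const demo_vote_t *msgs, size_t n, uint32_t trans, int remain)
{
	size_t i;

	for (i = 0; i < n && remain > 0; i++) {
		if (demo_vf_command(msgs[i].arg1) != DEMO_VF_VOTE)
			continue;	/* not a vote */
		if (msgs[i].arg2 != trans)
			continue;	/* another coordinator's round */
		if (!(demo_vf_flags(msgs[i].arg1) & DEMO_VFMF_AFFIRM))
			return 0;	/* any NO aborts the view */
		--remain;
	}
	return remain == 0;	/* still waiting: treated as a timeout */
}
```

The model shares one property with the loop in the hunks shown: nothing records which node sent a vote. A duplicated YES from the same node would therefore be counted twice.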
@@ -1378,12 +1279,12 @@
* VFR_COMMIT if new views were committed.
*/
static int
-_vf_purge(key_node_t *key_node, msgctx_t **ctx)
+_vf_purge(key_node_t *key_node, uint32_t *trans)
{
view_node_t *cur, *dead = NULL;
struct timeval tv;
- *fd = -1;
+ *trans = 0;
if (!key_node)
return VFR_NO;
@@ -1401,11 +1302,11 @@
if (tv_cmp(&tv, &cur->vn_timeout) < 0)
continue;
- *ctx = cur->vn_ctx;
- dead = vn_remove(&key_node->kn_jvlist, *ctx);
+ *trans = cur->vn_transaction;
+ dead = vn_remove(&key_node->kn_jvlist, *trans);
free(dead);
- printf("VF: Killed ctx %p\n", *ctx);
+ printf("VF: Killed transaction %08x\n", *trans);
/*
* returns the removed associated file descriptor
* so that we can close it and get on with life
@@ -1413,7 +1314,7 @@
break;
}
- if (*fd == -1)
+ if (*trans == 0)
return VFR_NO;
if (vf_resolve_views(key_node))
@@ -1425,13 +1326,13 @@
/**
* Process a VF message.
*
- * @param handle File descriptor on which msgp was received.
+ * @param nodeid Node id from which msgp was received.
* @param msgp Pointer to already-received message.
* @param nbytes Length of msgp.
* @return -1 on failure, 0 on success.
*/
int
-vf_process_msg(int handle, generic_msg_hdr *msgp, int nbytes)
+vf_process_msg(msgctx_t *ctx, int nodeid, generic_msg_hdr *msgp, int nbytes)
{
vf_msg_t *hdrp;
int ret;
@@ -1440,7 +1341,7 @@
(msgp->gh_command != VF_MESSAGE))
return VFR_ERROR;
- switch(msgp->gh_arg1) {
+ switch(vf_command(msgp->gh_arg1)) {
case VF_CURRENT:
#ifdef DEBUG
printf("VF: Received request for current data\n");
@@ -1455,7 +1356,7 @@
hdrp = (vf_msg_t *)msgp;
swab_vf_msg_info_t(&hdrp->vm_msg);
- return vf_send_current(handle, hdrp->vm_msg.vf_keyid);
+ return vf_send_current(ctx, hdrp->vm_msg.vf_keyid);
case VF_JOIN_VIEW:
/* Validate size... */
@@ -1475,28 +1376,28 @@
return VFR_ERROR;
}
- return vf_handle_join_view_msg(handle, hdrp);
+ return vf_handle_join_view_msg(ctx, nodeid, hdrp);
case VF_ABORT:
- printf("VF: Received VF_ABORT, fd%d\n", handle);
- vf_abort(handle);
+ printf("VF: Received VF_ABORT (X#%08x)\n", msgp->gh_arg2);
+ vf_abort(msgp->gh_arg2);
return VFR_ABORT;
case VF_VIEW_FORMED:
#ifdef DEBUG
- printf("VF: Received VF_VIEW_FORMED, fd%d\n",
- handle);
+ printf("VF: Received VF_VIEW_FORMED, %d\n",
+ nodeid);
#endif
pthread_mutex_lock(&key_list_mutex);
- vf_buffer_commit(handle);
- ret = (vf_resolve_views(kn_find_fd(handle)) ?
+ vf_buffer_commit(msgp->gh_arg2);
+ ret = (vf_resolve_views(kn_find_trans(msgp->gh_arg2)) ?
VFR_COMMIT : VFR_OK);
pthread_mutex_unlock(&key_list_mutex);
return ret;
-
+
default:
- printf("VF: Unknown msg type 0x%08x\n",
- msgp->gh_arg1);
+ /* Ignore votes and the like from this part */
+ break;
}
return VFR_OK;
@@ -1521,15 +1422,15 @@
{
key_node_t *key_node;
char lock_name[256];
- void *lockp = NULL;
+ struct dlm_lksb lockp;
int l;
/* Obtain cluster lock on it. */
pthread_mutex_lock(&vf_mutex);
snprintf(lock_name, sizeof(lock_name), "usrm::vf");
- l = clu_lock_verbose(lock_name, CLK_EX, &lockp);
+ l = clu_lock(LKM_EXMODE, &lockp, 0, lock_name);
if (l < 0) {
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
printf("Couldn't lock %s\n", keyid);
return l;
@@ -1542,7 +1443,7 @@
if (!key_node) {
if ((vf_key_init_nt(keyid, 10, NULL, NULL) < 0)) {
pthread_mutex_unlock(&key_list_mutex);
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
printf("Couldn't locate %s\n", keyid);
return VFR_ERROR;
@@ -1564,7 +1465,7 @@
pthread_mutex_unlock(&key_list_mutex);
if (!membership) {
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
//printf("Membership NULL, can't find %s\n", keyid);
pthread_mutex_unlock(&vf_mutex);
return VFR_ERROR;
@@ -1573,7 +1474,7 @@
l = vf_request_current(membership, keyid, view, data,
datalen);
if (l == VFR_NODATA || l == VFR_ERROR) {
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
//printf("Requesting current failed %s %d\n", keyid, l);
pthread_mutex_unlock(&vf_mutex);
return l;
@@ -1583,7 +1484,7 @@
*data = malloc(key_node->kn_datalen);
if (! *data) {
pthread_mutex_unlock(&key_list_mutex);
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
printf("Couldn't malloc %s\n", keyid);
return VFR_ERROR;
@@ -1594,7 +1495,7 @@
*view = key_node->kn_viewno;
pthread_mutex_unlock(&key_list_mutex);
- clu_unlock(lock_name, lockp);
+ clu_unlock(&lockp);
pthread_mutex_unlock(&vf_mutex);
return VFR_OK;
@@ -1659,7 +1560,7 @@
if (!key_node || !key_node->kn_data || !key_node->kn_datalen) {
pthread_mutex_unlock(&key_list_mutex);
printf("VFT: No data for keyid %s\n", keyid);
- return (msg_send_simple(ctx, VF_NACK, 0, 0) != -1)?
+ return (_send_simple(ctx, VF_NACK, 0, 0, 0) != -1)?
VFR_OK : VFR_ERROR;
}
@@ -1670,11 +1571,12 @@
msg = build_vf_data_message(VF_ACK, keyid, key_node->kn_data,
key_node->kn_datalen,
key_node->kn_viewno,
+ 0,
&totallen);
pthread_mutex_unlock(&key_list_mutex);
if (!msg)
- return (msg_send_simple(ctx, VFR_ERROR, 0, 0) != -1)?
+ return (_send_simple(ctx, VFR_ERROR, 0, 0, 0) != -1)?
VFR_OK : VFR_ERROR;
swab_vf_msg_t(msg);
@@ -1733,7 +1635,7 @@
*/
static int
vf_request_current(cluster_member_list_t *membership, char *keyid,
- int *viewno, void **data, uint32_t *datalen)
+ uint64_t *viewno, void **data, uint32_t *datalen)
{
int x, n, rv = VFR_OK, port;
msgctx_t ctx;
@@ -1761,8 +1663,7 @@
swab_vf_msg_info_t(&(msg->vm_msg));
for (x = 0; x < membership->cml_count; x++) {
- if (!memb_online(membership,
- membership->cml_members[x].cn_nodeid))
+ if (!membership->cml_members[x].cn_member)
continue;
/* Can't request from self. */
@@ -1770,10 +1671,11 @@
continue;
rv = VFR_ERROR;
- fd = msg_open(membership->cml_members[x].cn_nodeid, port,
- &ctx, 15);
- if (fd == -1)
+ if (msg_open(MSG_CLUSTER,
+ membership->cml_members[x].cn_nodeid,
+ port, &ctx, 15) < 0) {
continue;
+ }
msg = &rmsg;
//printf("VF: Requesting current value of %s from %d\n",
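The new daemons/rg_event.c that follows queues resource-group events under rg_queue_mutex and hands them to group_event() on a detached worker thread. rg_event_q() starts that thread only when rg_ev_thread is 0. The thread clears rg_ev_thread while it still holds the mutex after finding the queue empty. A concurrent rg_event_q() therefore either sees the worker still running, and its event gets drained, or sees 0 and starts a new one.

A self-contained sketch of the same lazily started, self-terminating worker follows. It differs from the file in these ways, each an assumption or substitution:

- It uses a plain singly linked FIFO instead of rgmanager's list_insert()/list_remove() macros.
- A hypothetical demo_handler hook stands in for group_event().
- A running flag replaces the stored pthread_t.
- It copies the name with room for a terminating NUL. strncpy(ev->rg_name, name, 128) does not guarantee one for names of 128 bytes or more.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

typedef struct demo_ev {
	struct demo_ev *next;
	char name[128];
	unsigned state;
} demo_ev_t;

static void demo_noop(const char *name, unsigned state)
{
	(void)name;
	(void)state;
}

/* Hypothetical stand-in for group_event(); set it before queueing. */
static void (*demo_handler)(const char *, unsigned) = demo_noop;

static demo_ev_t *q_head, *q_tail;	/* FIFO, guarded by q_mutex */
static pthread_mutex_t q_mutex = PTHREAD_MUTEX_INITIALIZER;
static int worker_running;		/* guarded by q_mutex */

static void *
demo_worker(void *arg)
{
	demo_ev_t *ev;

	(void)arg;
	pthread_mutex_lock(&q_mutex);
	while ((ev = q_head) != NULL) {
		q_head = ev->next;
		if (!q_head)
			q_tail = NULL;
		pthread_mutex_unlock(&q_mutex);

		demo_handler(ev->name, ev->state);	/* runs unlocked */
		free(ev);

		pthread_mutex_lock(&q_mutex);
	}
	/* Empty queue seen with the lock held: clear the flag before
	 * unlocking so the next enqueue starts a fresh worker. */
	worker_running = 0;
	pthread_mutex_unlock(&q_mutex);
	return NULL;
}

/* Queue one event; returns 0, or -1 if it could not be queued or started. */
static int
demo_event_q(const char *name, unsigned state)
{
	pthread_attr_t attrs;
	pthread_t tid;
	demo_ev_t *ev;
	int rv = 0;

	ev = calloc(1, sizeof(*ev));
	if (!ev)
		return -1;
	strncpy(ev->name, name, sizeof(ev->name) - 1);	/* stays NUL-terminated */
	ev->state = state;

	pthread_mutex_lock(&q_mutex);
	if (q_tail)
		q_tail->next = ev;
	else
		q_head = ev;
	q_tail = ev;

	if (!worker_running) {
		pthread_attr_init(&attrs);
		pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
		if (pthread_create(&tid, &attrs, demo_worker, NULL) == 0)
			worker_running = 1;
		else
			rv = -1;	/* event stays queued for the next start */
		pthread_attr_destroy(&attrs);
	}
	pthread_mutex_unlock(&q_mutex);
	return rv;
}
```

Running the handler with the lock dropped keeps slow service transitions from blocking producers. This is the same reason rg_event_thread() unlocks rg_queue_mutex around group_event().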
/cvs/cluster/cluster/rgmanager/src/daemons/rg_event.c,v --> standard output
revision 1.1
--- cluster/rgmanager/src/daemons/rg_event.c
+++ - 2006-07-11 23:52:45.033199000 +0000
@@ -0,0 +1,103 @@
+/*
+ Copyright Red Hat, Inc. 2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
+#include <resgroup.h>
+#include <rg_locks.h>
+#include <gettid.h>
+#include <assert.h>
+#include <libcman.h>
+#include <ccs.h>
+#include <clulog.h>
+
+typedef struct __rge_q {
+ list_head();
+ char rg_name[128];
+ uint32_t rg_state;
+ int rg_owner;
+} rgevent_t;
+
+
+/**
+ * resource group event queue.
+ */
+static rgevent_t *rg_ev_queue = NULL;
+static pthread_mutex_t rg_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_t rg_ev_thread = 0;
+
+void group_event(char *name, uint32_t state, int owner);
+
+
+void *
+rg_event_thread(void *arg)
+{
+ rgevent_t *ev;
+
+ while (1) {
+ pthread_mutex_lock(&rg_queue_mutex);
+ ev = rg_ev_queue;
+ if (ev)
+ list_remove(&rg_ev_queue, ev);
+ else
+ break; /* We're outta here */
+ pthread_mutex_unlock(&rg_queue_mutex);
+
+ group_event(ev->rg_name, ev->rg_state, ev->rg_owner);
+
+ free(ev);
+ }
+
+ /* Mutex held */
+ rg_ev_thread = 0;
+ pthread_mutex_unlock(&rg_queue_mutex);
+ return NULL;
+}
+
+
+void
+rg_event_q(char *name, uint32_t state, int owner)
+{
+ rgevent_t *ev;
+ pthread_attr_t attrs;
+
+ while (1) {
+ ev = malloc(sizeof(rgevent_t));
+ if (ev) {
+ break;
+ }
+ sleep(1);
+ }
+
+ memset(ev,0,sizeof(*ev));
+
+ strncpy(ev->rg_name, name, 128);
+ ev->rg_state = state;
+ ev->rg_owner = owner;
+
+ pthread_mutex_lock (&rg_queue_mutex);
+ list_insert(&rg_ev_queue, ev);
+ if (rg_ev_thread == 0) {
+ pthread_attr_init(&attrs);
+ pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED);
+ pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+ pthread_attr_setstacksize(&attrs, 262144);
+
+ pthread_create(&rg_ev_thread, &attrs, rg_event_thread, NULL);
+ pthread_attr_destroy(&attrs);
+ }
+ pthread_mutex_unlock (&rg_queue_mutex);
+}
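rg_event.c queues service events and drains them from a detached worker thread. The thread is created on demand and exits once the queue is empty.

Below is a minimal sketch of the same pattern. It assumes plain POSIX threads and uses a hypothetical singly linked FIFO in place of rgmanager's list_head()/list_insert() helpers. ev_t, event_q(), handle_event() and worker_running are illustrative names, not rgmanager API. The detail it mirrors from rg_event_thread() is that the worker clears its "running" marker while still holding the queue mutex. A producer that enqueues after that point therefore always starts a fresh worker, and no event is stranded.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical event record (rgmanager's rgevent_t embeds list_head()). */
typedef struct ev {
	struct ev *next;
	char name[128];
	unsigned state;
	int owner;
} ev_t;

static ev_t *q_head, *q_tail;
static pthread_mutex_t q_mutex = PTHREAD_MUTEX_INITIALIZER;
static int worker_running;   /* protected by q_mutex */
static int handled;          /* events processed; protected by q_mutex */

/* Stand-in for group_event(); this sketch only counts events. */
static void handle_event(ev_t *ev)
{
	(void)ev;
	pthread_mutex_lock(&q_mutex);
	handled++;
	pthread_mutex_unlock(&q_mutex);
}

static void *worker(void *arg)
{
	(void)arg;
	for (;;) {
		pthread_mutex_lock(&q_mutex);
		ev_t *ev = q_head;
		if (!ev) {
			/* Clear the marker before dropping the mutex so the
			   next producer knows it must spawn a new worker. */
			worker_running = 0;
			pthread_mutex_unlock(&q_mutex);
			return NULL;
		}
		q_head = ev->next;
		if (!q_head)
			q_tail = NULL;
		pthread_mutex_unlock(&q_mutex);

		handle_event(ev);  /* runs without the queue mutex held */
		free(ev);
	}
}

/* Append an event; start a detached worker if none is running. */
int event_q(const char *name, unsigned state, int owner)
{
	ev_t *ev = calloc(1, sizeof(*ev));
	if (!ev)
		return -1;
	snprintf(ev->name, sizeof(ev->name), "%s", name); /* always NUL-terminated */
	ev->state = state;
	ev->owner = owner;

	pthread_mutex_lock(&q_mutex);
	if (q_tail)
		q_tail->next = ev;
	else
		q_head = ev;
	q_tail = ev;
	if (!worker_running) {
		pthread_t th;
		pthread_attr_t attrs;

		pthread_attr_init(&attrs);
		pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
		/* On failure the event stays queued for the next caller. */
		if (pthread_create(&th, &attrs, worker, NULL) == 0)
			worker_running = 1;
		pthread_attr_destroy(&attrs);
	}
	pthread_mutex_unlock(&q_mutex);
	return 0;
}
```

The sketch makes two deliberate choices that differ from the posted code. It uses snprintf() because strncpy(ev->rg_name, name, 128) leaves rg_name unterminated when the name is 128 bytes or longer. It also returns -1 on allocation failure instead of retrying with sleep(1). A caller such as the RG_EVENT branch of dispatch_msg() returns immediately, and the handler runs serially on the worker thread.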
--- cluster/rgmanager/src/daemons/Makefile 2006/06/02 17:37:10 1.12
+++ cluster/rgmanager/src/daemons/Makefile 2006/07/11 23:52:41 1.13
@@ -41,8 +41,8 @@
clurgmgrd: rg_thread.o rg_locks.o main.o groups.o \
rg_queue.o rg_forward.o reslist.o \
resrules.o restree.o fo_domain.o nodeevent.o \
- watchdog.o rg_state.o
- $(CC) -o $@ $^ $(INCLUDE) $(CFLAGS) $(LDFLAGS) -lccs -lcman
+ rg_event.o watchdog.o rg_state.o ../clulib/libclulib.a
+ $(CC) -o $@ $^ $(INCLUDE) $(CFLAGS) $(LDFLAGS) -lccs -lcman -lpthread -ldlm
#
# Our test program links against the local allocator so that
--- cluster/rgmanager/src/daemons/fo_domain.c 2006/06/02 17:37:10 1.8
+++ cluster/rgmanager/src/daemons/fo_domain.c 2006/07/11 23:52:41 1.9
@@ -334,7 +334,7 @@
int found = 0;
int owned_by_node = 0, started = 0, no_owner = 0;
rg_state_t svc_state;
- void *lockp;
+ struct dlm_lksb lockp;
ENTER();
@@ -414,10 +414,10 @@
*/
clulog(LOG_WARNING, "Problem getting state information for "
"%s\n", rg_name);
- rg_unlock(rg_name, lockp);
+ rg_unlock(&lockp);
RETURN(FOD_BEST);
}
- rg_unlock(rg_name, lockp);
+ rg_unlock(&lockp);
/*
* Check to see if the service is started and if we are the owner in case of
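The fo_domain.c hunk above replaces the opaque void *lockp handle with a caller-owned struct dlm_lksb. rg_unlock() now takes only that block rather than the resource group name as well.

The toy below illustrates why the name becomes redundant: the acquire call records a lock id in the caller's status block, and release needs nothing else. It is an in-memory, single-threaded sketch, not DLM. lksb_sketch, sketch_lock() and sketch_unlock() are hypothetical, and the two fields only loosely follow libdlm's sb_status/sb_lkid. A request for a name that is already held fails immediately instead of blocking.

```c
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical status block, loosely modelled on libdlm's struct dlm_lksb. */
struct lksb_sketch {
	int      sb_status;  /* 0 on success, -1 on failure */
	uint32_t sb_lkid;    /* id of the granted lock; 0 = none */
};

#define MAX_LOCKS 16
/* held[id] is the lock name for that id; "" means free; id 0 is unused. */
static char held[MAX_LOCKS][64];

/* Grant a lock on 'name' and store its id in the caller's block. */
static int sketch_lock(const char *name, struct lksb_sketch *lksb)
{
	uint32_t id, free_id = 0;

	memset(lksb, 0, sizeof(*lksb));
	for (id = 1; id < MAX_LOCKS; id++) {
		if (held[id][0] == '\0') {
			if (!free_id)
				free_id = id;
		} else if (strcmp(held[id], name) == 0) {
			lksb->sb_status = -1;  /* already held: fail, do not block */
			return -1;
		}
	}
	if (!free_id) {
		lksb->sb_status = -1;          /* table full */
		return -1;
	}
	snprintf(held[free_id], sizeof(held[free_id]), "%s", name);
	lksb->sb_lkid = free_id;
	return 0;
}

/* Release using only the status block; no name argument is needed. */
static int sketch_unlock(struct lksb_sketch *lksb)
{
	uint32_t id = lksb->sb_lkid;

	if (id == 0 || id >= MAX_LOCKS || held[id][0] == '\0')
		return -1;
	held[id][0] = '\0';
	lksb->sb_lkid = 0;
	return 0;
}
```

Because the block lives on the caller's stack, as with the struct dlm_lksb lockp locals in this commit, each lock/unlock pair carries its own state and no heap handle needs to be freed.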
--- cluster/rgmanager/src/daemons/groups.c 2006/06/02 17:37:10 1.18
+++ cluster/rgmanager/src/daemons/groups.c 2006/07/11 23:52:41 1.19
@@ -29,9 +29,12 @@
#include <reslist.h>
#include <assert.h>
-#define cm_svccount cm_pad[0] /* Theses are uint8_t size */
-#define cm_svcexcl cm_pad[1]
+/* Use address field in this because we never use it internally,
+ and there is no extra space in the cman_node_t type.
+ */
+#define cn_svccount cn_address.cna_address[0] /* Theses are uint8_t size */
+#define cn_svcexcl cn_address.cna_address[1]
static int config_version = 0;
static resource_t *_resources = NULL;
@@ -42,6 +45,9 @@
pthread_mutex_t config_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_rwlock_t resource_lock = PTHREAD_RWLOCK_INITIALIZER;
+void res_build_name(char *, size_t, resource_t *);
+int get_rg_state_local(char *, rg_state_t *);
+
struct status_arg {
msgctx_t *ctx;
@@ -71,17 +77,16 @@
int
count_resource_groups(cluster_member_list_t *ml)
{
-#if 0
resource_t *res;
char *rgname, *val;
int x;
rg_state_t st;
- void *lockp;
+ struct dlm_lksb lockp;
cman_node_t *mp;
for (x = 0; x < ml->cml_count; x++) {
- ml->cml_members[x].cm_svccount = 0;
- ml->cml_members[x].cm_svcexcl = 0;
+ ml->cml_members[x].cn_svccount = 0;
+ ml->cml_members[x].cn_svcexcl = 0;
}
pthread_rwlock_rdlock(&resource_lock);
@@ -102,11 +107,11 @@
if (get_rg_state(rgname, &st) < 0) {
clulog(LOG_ERR, "#34: Cannot get status "
"for service %s\n", rgname);
- rg_unlock(rgname, lockp);
+ rg_unlock(&lockp);
continue;
}
- rg_unlock(rgname, lockp);
+ rg_unlock(&lockp);
if (st.rs_state != RG_STATE_STARTED &&
st.rs_state != RG_STATE_STARTING)
@@ -116,18 +121,17 @@
if (!mp)
continue;
- ++mp->cm_svccount;
+ ++mp->cn_svccount;
val = res_attr_value(res, "exclusive");
if (val && ((!strcmp(val, "yes") ||
(atoi(val)>0))) ) {
- ++mp->cm_svcexcl;
+ ++mp->cn_svcexcl;
}
} while (!list_done(&_resources, res));
pthread_rwlock_unlock(&resource_lock);
-#endif
return 0;
}
@@ -189,13 +193,13 @@
if (exclusive) {
- if (0) {//(allowed->cml_members[x].cm_svccount > 0) {
+ if (allowed->cml_members[x].cn_svccount > 0) {
/* Definitely not this guy */
continue;
} else {
score += 2;
}
- } else if (0) { //(allowed->cml_members[x].cm_svcexcl) {
+ } else if (allowed->cml_members[x].cn_svcexcl) {
/* This guy has an exclusive resource group.
Can't relocate / failover to him. */
continue;
@@ -212,6 +216,42 @@
}
+int
+check_depend(resource_t *res)
+{
+ char *val;
+ rg_state_t rs;
+
+ val = res_attr_value(res, "depend");
+ if (!val)
+ /* No dependency */
+ return -1;
+
+ if (get_rg_state_local(val, &rs) == 0)
+ return (rs.rs_state == RG_STATE_STARTED);
+
+ return 1;
+}
+
+
+int
+check_depend_safe(char *rg_name)
+{
+ resource_t *res;
+ int ret;
+
+ pthread_rwlock_rdlock(&resource_lock);
+ res = find_root_by_ref(&_resources, rg_name);
+ if (!res)
+ return -1;
+
+ ret = check_depend(res);
+ pthread_rwlock_unlock(&resource_lock);
+
+ return ret;
+}
+
+
/**
Start or failback a resource group: if it's not running, start it.
If it is running and we're a better member to run it, then ask for
@@ -224,7 +264,7 @@
char *val;
cman_node_t *mp;
int autostart, exclusive;
- void *lockp;
+ struct dlm_lksb lockp;
mp = memb_id_to_p(membership, my_id());
assert(mp);
@@ -267,7 +307,7 @@
if (get_rg_state(svcName, svcStatus) != 0) {
clulog(LOG_ERR, "#34: Cannot get status "
"for service %s\n", svcName);
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return;
}
@@ -277,17 +317,25 @@
set_rg_state(svcName, svcStatus);
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return;
}
}
+ /* See if service this one depends on is running. If not,
+ don't start it */
+ if (check_depend(node->rn_resource) == 0) {
+ clulog(LOG_DEBUG,
+ "Skipping RG %s: Dependency missing\n", svcName);
+ return;
+ }
+
val = res_attr_value(node->rn_resource, "exclusive");
exclusive = val && ((!strcmp(val, "yes") || (atoi(val)>0)));
- if (0) { //(exclusive && mp->cm_svccount) {
- clulog(LOG_INFO,
+ if (exclusive && mp->cn_svccount) {
+ clulog(LOG_DEBUG,
"Skipping RG %s: Exclusive and I am running services\n",
svcName);
return;
@@ -297,8 +345,8 @@
Don't start other services if I'm running an exclusive
service.
*/
- if (0) { //(mp->cm_svcexcl) {
- clulog(LOG_INFO,
+ if (mp->cn_svcexcl) {
+ clulog(LOG_DEBUG,
"Skipping RG %s: I am running an exclusive service\n",
svcName);
return;
@@ -363,8 +411,8 @@
int
eval_groups(int local, uint64_t nodeid, int nodeStatus)
{
- void *lockp = NULL;
- char *svcName, *nodeName;
+ struct dlm_lksb lockp;
+ char svcName[64], *nodeName;
resource_node_t *node;
rg_state_t svcStatus;
cluster_member_list_t *membership;
@@ -385,10 +433,7 @@
list_do(&_tree, node) {
- if (node->rn_resource->r_rule->rr_root == 0)
- continue;
-
- svcName = node->rn_resource->r_attrs->ra_value;
+ res_build_name(svcName, sizeof(svcName), node->rn_resource);
/*
* Lock the service information and get the current service
@@ -407,11 +452,11 @@
clulog(LOG_ERR,
"#34: Cannot get status for service %s\n",
svcName);
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
continue;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
if (svcStatus.rs_owner == 0)
nodeName = "none";
@@ -461,7 +506,7 @@
pthread_rwlock_unlock(&resource_lock);
free_member_list(membership);
- clulog(LOG_INFO, "Event (%d:%d:%d) Processed\n", local,
+ clulog(LOG_DEBUG, "Event (%d:%d:%d) Processed\n", local,
(int)nodeid, nodeStatus);
return 0;
@@ -469,6 +514,105 @@
/**
+ * Called to decide what services to start locally after a service event.
+ *
+ * @see eval_groups
+ */
+int
+group_event(char *rg_name, uint32_t state, int owner)
+{
+ char svcName[64], *nodeName;
+ resource_node_t *node;
+ rg_state_t svcStatus;
+ cluster_member_list_t *membership;
+ int depend;
+
+ if (rg_locked()) {
+ clulog(LOG_NOTICE,
+ "Resource groups locked; not evaluating\n");
+ return -EAGAIN;
+ }
+
+ membership = member_list();
+ if (!membership)
+ return -1;
+
+ pthread_rwlock_rdlock(&resource_lock);
+
+ /* Requires read lock */
+ count_resource_groups(membership);
+
+ list_do(&_tree, node) {
+
+ res_build_name(svcName, sizeof(svcName), node->rn_resource);
+
+ if (get_rg_state_local(svcName, &svcStatus) != 0)
+ continue;
+
+ if (svcStatus.rs_owner == 0)
+ nodeName = "none";
+ else
+ nodeName = memb_id_to_name(membership,
+ svcStatus.rs_owner);
+
+ /* Disabled/failed/in recovery? Do nothing */
+ if ((svcStatus.rs_state == RG_STATE_DISABLED) ||
+ (svcStatus.rs_state == RG_STATE_FAILED) ||
+ (svcStatus.rs_state == RG_STATE_RECOVER)) {
+ continue;
+ }
+
+ depend = check_depend(node->rn_resource);
+
+ /* Skip if we have no dependency */
+ if (depend == -1)
+ continue;
+
+ /*
+ If we have:
+ (a) a met dependency
+ (b) we're in the STOPPED state, and
+ (c) our new service event is a started service
+
+ Then see if we should start this other service as well.
+ */
+ if (depend == 1 &&
+ svcStatus.rs_state == RG_STATE_STOPPED &&
+ state == RG_STATE_STARTED) {
+
+ clulog(LOG_DEBUG, "Evaluating RG %s, state %s, owner "
+ "%s\n", svcName,
+ rg_state_str(svcStatus.rs_state),
+ nodeName);
+ consider_start(node, svcName, &svcStatus, membership);
+ continue;
+ }
+
+ /*
+ If we lost a dependency for this service and it's running
+ locally, stop it.
+ */
+ if (depend == 0 &&
+ svcStatus.rs_state == RG_STATE_STARTED &&
+ svcStatus.rs_owner == my_id()) {
+
+ clulog(LOG_WARNING, "Stopping service %s: Dependency missing\n",
+ svcName);
+ rt_enqueue_request(svcName, RG_STOP, NULL, 0, my_id(),
+ 0, 0);
+ }
+
+ } while (!list_done(&_tree, node));
+
+ pthread_rwlock_unlock(&resource_lock);
+ free_member_list(membership);
+
+ return 0;
+}
+
+
+
+/**
Perform an operation on a resource group. That is, walk down the
tree for that resource group, performing the given operation on
all children in the necessary order.
@@ -576,12 +720,11 @@
@param rgname Resource group name whose state we want to send.
@see send_rg_states
*/
-int get_rg_state_local(char *, rg_state_t *);
void
send_rg_state(msgctx_t *ctx, char *rgname, int fast)
{
rg_state_msg_t msg, *msgp = &msg;
- void *lockp;
+ struct dlm_lksb lockp;
msgp->rsm_hdr.gh_magic = GENERIC_HDR_MAGIC;
msgp->rsm_hdr.gh_length = sizeof(msg);
@@ -594,10 +737,10 @@
if (rg_lock(rgname, &lockp) < 0)
return;
if (get_rg_state(rgname, &msgp->rsm_state) < 0) {
- rg_unlock(rgname, lockp);
+ rg_unlock(&lockp);
return;
}
- rg_unlock(rgname, lockp);
+ rg_unlock(&lockp);
}
swab_rg_state_msg_t(msgp);
@@ -616,19 +759,19 @@
{
msgctx_t *ctx = ((struct status_arg *)arg)->ctx;
int fast = ((struct status_arg *)arg)->fast;
- resource_t *res;
+ resource_node_t *node;
generic_msg_hdr hdr;
+ char rg[64];
free(arg);
pthread_rwlock_rdlock(&resource_lock);
- list_do(&_resources, res) {
- if (res->r_rule->rr_root == 0)
- continue;
+ list_do(&_tree, node) {
- send_rg_state(ctx, res->r_attrs[0].ra_value, fast);
- } while (!list_done(&_resources, res));
+ res_build_name(rg, sizeof(rg), node->rn_resource);
+ send_rg_state(ctx, rg, fast);
+ } while (!list_done(&_tree, node));
pthread_rwlock_unlock(&resource_lock);
@@ -637,8 +780,8 @@
/* XXX wait for client to tell us it's done; I don't know why
this is needed when doing fast I/O, but it is. */
msg_receive(ctx, &hdr, sizeof(hdr), 10);
-
msg_close(ctx);
+ msg_free_ctx(ctx);
return NULL;
}
@@ -681,21 +824,20 @@
int
svc_exists(char *svcname)
{
- resource_t *res;
+ resource_node_t *node;
int ret = 0;
+ char rg[64];
pthread_rwlock_rdlock(&resource_lock);
- list_do(&_resources, res) {
- if (res->r_rule->rr_root == 0)
- continue;
+ list_do(&_tree, node) {
+ res_build_name(rg, sizeof(rg), node->rn_resource);
- if (strcmp(res->r_attrs[0].ra_value,
- svcname) == 0) {
+ if (strcmp(rg, svcname) == 0) {
ret = 1;
break;
}
- } while (!list_done(&_resources, res));
+ } while (!list_done(&_tree, node));
pthread_rwlock_unlock(&resource_lock);
@@ -707,25 +849,22 @@
rg_doall(int request, int block, char *debugfmt)
{
resource_node_t *curr;
- char *name;
rg_state_t svcblk;
+ char rg[64];
pthread_rwlock_rdlock(&resource_lock);
list_do(&_tree, curr) {
- if (curr->rn_resource->r_rule->rr_root == 0)
- continue;
-
/* Group name */
- name = curr->rn_resource->r_attrs->ra_value;
+ res_build_name(rg, sizeof(rg), curr->rn_resource);
if (debugfmt)
- clulog(LOG_DEBUG, debugfmt, name);
+ clulog(LOG_DEBUG, debugfmt, rg);
/* Optimization: Don't bother even queueing the request
during the exit case if we don't own it */
if (request == RG_STOP_EXITING) {
- if (get_rg_state_local(name, &svcblk) < 0)
+ if (get_rg_state_local(rg, &svcblk) < 0)
continue;
/* Always run stop if we're the owner, regardless
@@ -734,7 +873,7 @@
continue;
}
- rt_enqueue_request(name, request, NULL, 0,
+ rt_enqueue_request(rg, request, NULL, 0,
0, 0, 0);
} while (!list_done(&_tree, curr));
@@ -755,21 +894,17 @@
q_status_checks(void *arg)
{
resource_node_t *curr;
- char *name;
rg_state_t svcblk;
- void *lockp;
+ char rg[64];
pthread_rwlock_rdlock(&resource_lock);
list_do(&_tree, curr) {
- if (curr->rn_resource->r_rule->rr_root == 0)
- continue;
-
/* Group name */
- name = curr->rn_resource->r_attrs->ra_value;
+ res_build_name(rg, sizeof(rg), curr->rn_resource);
/* Local check - no one will make us take a service */
- if (get_rg_state_local(name, &svcblk) < 0) {
+ if (get_rg_state_local(rg, &svcblk) < 0) {
continue;
}
@@ -777,7 +912,7 @@
svcblk.rs_state != RG_STATE_STARTED)
continue;
- rt_enqueue_request(name, RG_STATUS,
+ rt_enqueue_request(rg, RG_STATUS,
NULL, 0, 0, 0, 0);
} while (!list_done(&_tree, curr));
@@ -811,24 +946,20 @@
do_condstops(void)
{
resource_node_t *curr;
- char *name;
rg_state_t svcblk;
int need_kill;
- void *lockp;
+ char rg[64];
clulog(LOG_INFO, "Stopping changed resources.\n");
pthread_rwlock_rdlock(&resource_lock);
list_do(&_tree, curr) {
- if (curr->rn_resource->r_rule->rr_root == 0)
- continue;
-
/* Group name */
- name = curr->rn_resource->r_attrs->ra_value;
+ res_build_name(rg, sizeof(rg), curr->rn_resource);
/* If we're not running it, no need to CONDSTOP */
- if (get_rg_state_local(name, &svcblk) < 0) {
+ if (get_rg_state_local(rg, &svcblk) < 0) {
continue;
}
@@ -839,10 +970,10 @@
need_kill = 0;
if (curr->rn_resource->r_flags & RF_NEEDSTOP) {
need_kill = 1;
- clulog(LOG_DEBUG, "Removing %s\n", name);
+ clulog(LOG_DEBUG, "Removing %s\n", rg);
}
- rt_enqueue_request(name, need_kill ? RG_DISABLE : RG_CONDSTOP,
+ rt_enqueue_request(rg, need_kill ? RG_DISABLE : RG_CONDSTOP,
NULL, 0, 0, 0, 0);
} while (!list_done(&_tree, curr));
@@ -859,10 +990,10 @@
do_condstarts(void)
{
resource_node_t *curr;
- char *name, *val;
+ char rg[64], *val;
rg_state_t svcblk;
int need_init, new_groups = 0, autostart;
- void *lockp;
+ struct dlm_lksb lockp;
clulog(LOG_INFO, "Starting changed resources.\n");
@@ -870,31 +1001,26 @@
pthread_rwlock_rdlock(&resource_lock);
list_do(&_tree, curr) {
- if (curr->rn_resource->r_rule->rr_root == 0)
- continue;
-
/* Group name */
- name = curr->rn_resource->r_attrs->ra_value;
+ res_build_name(rg, sizeof(rg), curr->rn_resource);
/* New RG. We'll need to initialize it. */
need_init = 0;
if (curr->rn_resource->r_flags & RF_NEEDSTART)
need_init = 1;
- if (get_rg_state_local(name, &svcblk) < 0) {
+ if (get_rg_state_local(rg, &svcblk) < 0)
continue;
- }
- if (!need_init && svcblk.rs_owner != my_id()) {
+ if (!need_init && svcblk.rs_owner != my_id())
continue;
- }
if (need_init) {
++new_groups;
- clulog(LOG_DEBUG, "Initializing %s\n", name);
+ clulog(LOG_DEBUG, "Initializing %s\n", rg);
}
- rt_enqueue_request(name, need_init ? RG_INIT : RG_CONDSTART,
+ rt_enqueue_request(rg, need_init ? RG_INIT : RG_CONDSTART,
NULL, 0, 0, 0, 0);
} while (!list_done(&_tree, curr));
@@ -909,21 +1035,18 @@
pthread_rwlock_rdlock(&resource_lock);
list_do(&_tree, curr) {
- if (curr->rn_resource->r_rule->rr_root == 0)
- continue;
-
/* Group name */
- name = curr->rn_resource->r_attrs->ra_value;
+ res_build_name(rg, sizeof(rg), curr->rn_resource);
/* New RG. We'll need to initialize it. */
if (!(curr->rn_resource->r_flags & RF_NEEDSTART))
continue;
- if (rg_lock(name, &lockp) != 0)
+ if (rg_lock(rg, &lockp) != 0)
continue;
- if (get_rg_state(name, &svcblk) < 0) {
- rg_unlock(name, lockp);
+ if (get_rg_state(rg, &svcblk) < 0) {
+ rg_unlock(&lockp);
continue;
}
@@ -933,7 +1056,7 @@
a truly new service, it will be in the UNINITIALIZED
state, which will be caught by eval_groups. */
if (svcblk.rs_state != RG_STATE_DISABLED) {
- rg_unlock(name, lockp);
+ rg_unlock(&lockp);
continue;
}
@@ -946,9 +1069,9 @@
else
svcblk.rs_state = RG_STATE_DISABLED;
- set_rg_state(name, &svcblk);
+ set_rg_state(rg, &svcblk);
- rg_unlock(name, lockp);
+ rg_unlock(&lockp);
} while (!list_done(&_tree, curr));
pthread_rwlock_unlock(&resource_lock);
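group_event() is the new handler for service events. For each service it combines three inputs:

* the service's local state;
* the tri-state result of check_depend(): -1 means no "depend" attribute; 0 means the dependency's state was read and it is not started; 1 means it is started, or its state could not be read locally;
* the state carried by the event.

The per-service decision can be restated as a pure function, which makes each case easy to check in isolation. This is a sketch under assumptions. The ST_* constants are hypothetical stand-ins for the RG_STATE_* values in resgroup.h. The real handler goes on to call consider_start(), which repeats the dependency and exclusivity checks, or rt_enqueue_request(..., RG_STOP, ...).

```c
/* Hypothetical stand-ins for the RG_STATE_* constants. */
enum rg_state { ST_STOPPED, ST_STARTING, ST_STARTED,
		ST_DISABLED, ST_FAILED, ST_RECOVER };

enum dep_result { DEP_NONE = -1, DEP_UNMET = 0, DEP_MET = 1 };

enum action { ACT_NONE, ACT_CONSIDER_START, ACT_STOP };

/* Mirrors check_depend(): an unreadable dependency state counts as met,
   as in the posted code (which returns 1 in that case). */
static enum dep_result dep_check(int has_depend, int state_readable,
				 enum rg_state dep_state)
{
	if (!has_depend)
		return DEP_NONE;
	if (state_readable)
		return dep_state == ST_STARTED ? DEP_MET : DEP_UNMET;
	return DEP_MET;
}

/* Mirrors the per-service branch of group_event(). */
static enum action decide(enum dep_result dep, enum rg_state local,
			  int local_owner, int my_id, enum rg_state event_state)
{
	/* Disabled, failed or recovering services are left alone. */
	if (local == ST_DISABLED || local == ST_FAILED || local == ST_RECOVER)
		return ACT_NONE;
	/* Services without a dependency are not affected by events. */
	if (dep == DEP_NONE)
		return ACT_NONE;
	/* Dependency met, we are stopped, and a service just started. */
	if (dep == DEP_MET && local == ST_STOPPED && event_state == ST_STARTED)
		return ACT_CONSIDER_START;
	/* Dependency lost while we run the service locally. */
	if (dep == DEP_UNMET && local == ST_STARTED && local_owner == my_id)
		return ACT_STOP;
	return ACT_NONE;
}
```

A related detail in the posted check_depend_safe() affects callers on its error path. When find_root_by_ref() fails, the function returns -1 while still holding resource_lock for reading. Calling pthread_rwlock_unlock() before that return would keep the lock balanced.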
--- cluster/rgmanager/src/daemons/main.c 2006/06/02 17:37:10 1.25
+++ cluster/rgmanager/src/daemons/main.c 2006/07/11 23:52:41 1.26
@@ -31,21 +31,23 @@
#include <members.h>
#include <msgsimple.h>
#include <vf.h>
+#include <lock.h>
#include <rg_queue.h>
#include <malloc.h>
+#include <cman-private.h>
#define L_SYS (1<<1)
#define L_USER (1<<0)
-int configure_logging(int ccsfd);
+int configure_logging(int ccsfd, int debug);
+void node_event(int, uint64_t, int);
void node_event_q(int, uint64_t, int);
int daemon_init(char *);
int init_resource_groups(int);
void kill_resource_groups(void);
-void set_my_id(uint64_t);
+void set_my_id(int);
int eval_groups(int, uint64_t, int);
-void graceful_exit(int);
void flag_shutdown(int sig);
void hard_exit(void);
int send_rg_states(msgctx_t *, int);
@@ -58,6 +60,7 @@
static int signalled = 0;
uint64_t next_node_id(cluster_member_list_t *membership, uint64_t me);
+int rg_event_q(char *svcName, uint32_t state, int owner);
void
@@ -74,49 +77,9 @@
int
-send_exit_msg(uint64_t nodeid)
+send_exit_msg(msgctx_t *ctx)
{
- msgctx_t ctx;
-
- if (msg_open(nodeid, RG_PORT, &ctx, 5) < 0) {
- printf("Failed to send exit message\n");
- return -1;
- }
- msg_send_simple(&ctx, RG_EXITING, 0, 0);
- msg_close(&ctx);
-
- return 0;
-}
-
-
-/**
- Notify other resource group managers that we're leaving, since
- cluster membership is not necessarily tied to the members running
- the rgmgr.
- */
-int
-notify_exiting(void)
-{
- int x;
- uint64_t partner;
- cluster_member_list_t *membership;
-
- membership = member_list();
- if (!membership)
- return 0;
-
- for (x = 0; x < membership->cml_count; x++) {
-
- partner = membership->cml_members[x].cn_nodeid;
-
- if (partner == my_id() ||
- !membership->cml_members[x].cn_member)
- continue;
-
- send_exit_msg(partner);
- }
-
- free_member_list(membership);
+ msg_send_simple(ctx, RG_EXITING, my_id(), 0);
return 0;
}
@@ -130,60 +93,6 @@
/**
- Called to handle the transition of a cluster member from up->down or
- down->up. This handles initializing services (in the local node-up case),
- exiting due to loss of quorum (local node-down), and service fail-over
- (remote node down).
-
- @param nodeID ID of the member which has come up/gone down.
- @param nodeStatus New state of the member in question.
- @see eval_groups
- */
-void
-node_event(int local, uint64_t nodeID, int nodeStatus)
-{
- if (!running)
- return;
-
- if (local) {
-
- /* Local Node Event */
- if (nodeStatus == 0)
- hard_exit();
-
- if (!rg_initialized()) {
- if (init_resource_groups(0) != 0) {
- clulog(LOG_ERR,
- "#36: Cannot initialize services\n");
- hard_exit();
- }
- }
-
- if (shutdown_pending) {
- clulog(LOG_NOTICE, "Processing delayed exit signal\n");
- graceful_exit(SIGINT);
- }
- setup_signal(SIGINT, graceful_exit);
- setup_signal(SIGTERM, graceful_exit);
- setup_signal(SIGHUP, flag_reconfigure);
-
- eval_groups(1, nodeID, 1);
- return;
- }
-
- /*
- * Nothing to do for events from other nodes if we are not ready.
- */
- if (!rg_initialized()) {
- clulog(LOG_DEBUG, "Services not initialized.\n");
- return;
- }
-
- eval_groups(0, nodeID, nodeStatus);
-}
-
-
-/**
This updates our local membership view and handles whether or not we
should exit, as well as determines node transitions (thus, calling
node_event()).
@@ -192,22 +101,43 @@
@return 0
*/
int
-membership_update(chandle_t *clu)
+membership_update(void)
{
- cluster_member_list_t *new_ml, *node_delta, *old_membership;
+ cluster_member_list_t *new_ml = NULL, *node_delta = NULL,
+ *old_membership = NULL;
int x;
int me = 0;
+ cman_handle_t h;
+ int quorate;
- if (!rg_quorate())
- return 0;
+ h = cman_init(NULL);
+ quorate = cman_is_quorate(h);
+ if (!quorate) {
+ cman_finish(h);
+
+ if (!rg_quorate())
+ return -1;
+
+ clulog(LOG_EMERG, "#1: Quorum Dissolved\n");
+ rg_set_inquorate();
+ member_list_update(NULL);/* Clear member list */
+ rg_lockall(L_SYS);
+ rg_doall(RG_INIT, 1, "Emergency stop of %s");
+ rg_set_uninitialized();
+ return -1;
+ } else if (!rg_quorate()) {
- clulog(LOG_INFO, "Magma Event: Membership Change\n");
+ rg_set_quorate();
+ rg_unlockall(L_SYS);
+ rg_unlockall(L_USER);
+ clulog(LOG_NOTICE, "Quorum Formed\n");
+ }
old_membership = member_list();
- new_ml = get_member_list(clu->c_cluster);
+ new_ml = get_member_list(h);
member_list_update(new_ml);
+ cman_finish(h);
- clulog(LOG_DEBUG, "I am node #%lld\n", my_id());
/*
* Handle nodes lost. Do our local node event first.
@@ -218,7 +148,8 @@
if (me) {
/* Should not happen */
clulog(LOG_INFO, "State change: LOCAL OFFLINE\n");
- free_member_list(node_delta);
+ if (node_delta)
+ free_member_list(node_delta);
node_event(1, my_id(), 0);
/* NOTREACHED */
}
@@ -238,7 +169,6 @@
}
}
- /* Free nodes */
free_member_list(node_delta);
/*
@@ -263,8 +193,7 @@
clulog(LOG_INFO, "State change: %s UP\n",
node_delta->cml_members[x].cn_name);
- node_event_q(0, node_delta->cml_members[x].cn_nodeid,
- 1);
+ node_event_q(0, node_delta->cml_members[x].cn_nodeid, 1);
}
free_member_list(node_delta);
@@ -309,10 +238,61 @@
}
-int
+#if 0
+struct lr_arg {
+ msgctx_t *ctx;
+ int req;
+};
+
+
+void *
+lockreq_th(void *a)
+{
+ int ret;
+ char state;
+ struct lr_arg *lr_arg = (struct lr_arg *)a;
+ cluster_member_list_t *m = member_list();
+
+ state = (lr_arg->req==RG_LOCK)?1:0;
+ ret = vf_write(m, VFF_IGN_CONN_ERRORS, "rg_lockdown", &state, 1);
+ free_member_list(m);
+
+ if (ret == 0) {
+ msg_send_simple(lr_arg->ctx, RG_SUCCESS, 0, 0);
+ } else {
+ msg_send_simple(lr_arg->ctx, RG_FAIL, 0, 0);
+ }
+
+ msg_close(lr_arg->ctx);
+ msg_free_ctx(lr_arg->ctx);
+ free(lr_arg);
+ return NULL;
+}
+
+
+void
+do_lockreq(msgctx_t *ctx, int req)
+{
+ pthread_t th;
+ struct lr_arg *arg;
+
+ arg = malloc(sizeof(*arg));
+ if (!arg) {
+ msg_send_simple(ctx, RG_FAIL, 0, 0);
+ msg_close(ctx);
+ msg_free_ctx(ctx);
+ return 0;
+ }
+
+ arg->ctx = ctx;
+ arg->req = req;
+
+ pthread_create(&th, NULL, lockreq_th, (void *)arg);
+}
+#else
+void
do_lockreq(msgctx_t *ctx, int req)
{
-#if 0
int ret;
char state;
cluster_member_list_t *m = member_list();
@@ -326,9 +306,9 @@
} else {
msg_send_simple(ctx, RG_FAIL, 0, 0);
}
-#endif
- return 0;
}
+#endif
+
/**
@@ -341,20 +321,35 @@
* @see quorum_msg
*/
int
-dispatch_msg(msgctx_t *ctx, uint64_t nodeid)
+dispatch_msg(msgctx_t *ctx, int nodeid, int need_close)
{
- int ret = -1;
+ int ret = 0, sz = -1;
char msgbuf[4096];
- generic_msg_hdr *msg_hdr = (generic_msg_hdr *)msg_hdr;
+ generic_msg_hdr *msg_hdr = (generic_msg_hdr *)msgbuf;
SmMessageSt *msg_sm = (SmMessageSt *)msgbuf;
+ memset(msgbuf, 0, sizeof(msgbuf));
+
/* Peek-a-boo */
- ret = msg_receive(ctx, msg_hdr, sizeof(msgbuf), 10);
- if (ret < sizeof (generic_msg_hdr)) {
- clulog(LOG_ERR, "#37: Error receiving message header\n");
+ sz = msg_receive(ctx, msg_hdr, sizeof(msgbuf), 10);
+ if (sz < sizeof (generic_msg_hdr)) {
+ clulog(LOG_ERR, "#37: Error receiving message header (%d)\n", sz);
goto out;
}
+ if (sz < 0)
+ return -1;
+
+ if (sz > sizeof(msgbuf)) {
+ raise(SIGSTOP);
+ }
+
+ /*
+ printf("RECEIVED %d %d %d %p\n", sz, (int)sizeof(msgbuf),
+ (int)sizeof(generic_msg_hdr), ctx);
+ msg_print(ctx);
+ */
+
/* Decode the header */
swab_generic_msg_hdr(msg_hdr);
if ((msg_hdr->gh_magic != GENERIC_HDR_MAGIC)) {
@@ -364,28 +359,23 @@
goto out;
}
- if (msg_hdr->gh_length != ret) {
+ if (msg_hdr->gh_length != sz) {
clulog(LOG_ERR, "#XX: Read size mismatch: %d %d\n",
ret, msg_hdr->gh_length);
goto out;
}
- ret = 0;
switch (msg_hdr->gh_command) {
case RG_STATUS:
- clulog(LOG_DEBUG, "Sending service states to ctx%p\n",ctx);
+ clulog(LOG_DEBUG, "Sending service states to CTX%p\n",ctx);
send_rg_states(ctx, msg_hdr->gh_arg1);
+ need_close = 0;
break;
case RG_LOCK:
- if (rg_quorate())
- do_lockreq(ctx, RG_LOCK);
- msg_close(ctx);
- break;
-
case RG_UNLOCK:
if (rg_quorate())
- do_lockreq(ctx, RG_UNLOCK);
+ do_lockreq(ctx, msg_hdr->gh_command);
break;
case RG_QUERY_LOCK:
@@ -395,19 +385,18 @@
}
break;
-
case RG_ACTION_REQUEST:
- if (ret != sizeof(msg_sm)) {
+ if (sz < sizeof(msg_sm)) {
clulog(LOG_ERR,
- "#39: Error receiving entire request\n");
+ "#39: Error receiving entire request (%d/%d)\n",
+ ret, (int)sizeof(msg_sm));
ret = -1;
goto out;
}
/* XXX perf: reencode header */
swab_generic_msg_hdr(msg_hdr);
-
/* Decode SmMessageSt message */
swab_SmMessageSt(msg_sm);
@@ -430,21 +419,47 @@
ctx, 0, msg_sm->sm_data.d_svcOwner, 0, 0);
return 0;
+ case RG_EVENT:
+ /* Service event. Run a dependency check */
+ if (sz < sizeof(msg_sm)) {
+ clulog(LOG_ERR,
+ "#39: Error receiving entire request (%d/%d)\n",
+ ret, (int)sizeof(msg_sm));
+ ret = -1;
+ goto out;
+ }
+
+ /* XXX perf: reencode header */
+ swab_generic_msg_hdr(msg_hdr);
+ /* Decode SmMessageSt message */
+ swab_SmMessageSt(msg_sm);
+
+ /* Send to our rg event handler */
+ rg_event_q(msg_sm->sm_data.d_svcName,
+ msg_sm->sm_data.d_action,
+ msg_sm->sm_data.d_svcOwner);
+ break;
+
case RG_EXITING:
clulog(LOG_NOTICE, "Member %d is now offline\n", (int)nodeid);
-
node_event(0, nodeid, 0);
break;
+ case VF_MESSAGE:
+ /* Ignore; our VF thread handles these */
+ break;
+
default:
clulog(LOG_DEBUG, "unhandled message request %d\n",
msg_hdr->gh_command);
break;
}
-
out:
- msg_close(ctx);
+ if (need_close) {
+ msg_close(ctx);
+ msg_free_ctx(ctx);
+ }
return ret;
}
@@ -455,47 +470,65 @@
@return Event
*/
int
-handle_cluster_event(chandle_t *clu, msgctx_t *ctx)
+handle_cluster_event(msgctx_t *ctx)
{
int ret;
+ msgctx_t *newctx;
+ int nodeid;
ret = msg_wait(ctx, 0);
+
switch(ret) {
case M_PORTOPENED:
+ msg_receive(ctx, NULL, 0, 0);
+ clulog(LOG_DEBUG, "Event: Port Opened\n");
+ membership_update();
+ break;
case M_PORTCLOSED:
/* Might want to handle powerclosed like membership change */
+ msg_receive(ctx, NULL, 0, 0);
+ clulog(LOG_DEBUG, "Event: Port Closed\n");
+ membership_update();
+ break;
case M_NONE:
+ msg_receive(ctx, NULL, 0, 0);
clulog(LOG_DEBUG, "NULL cluster message\n");
break;
case M_OPEN:
+ newctx = msg_new_ctx();
+ if (msg_accept(ctx, newctx) >= 0 &&
+ rg_quorate()) {
+ /* Handle message */
+ /* When request completes, the fd is closed */
+ nodeid = msg_get_nodeid(newctx);
+ dispatch_msg(newctx, nodeid, 1);
+ break;
+ }
+ break;
+
+ case M_DATA:
+ dispatch_msg(ctx, nodeid, 0);
+ break;
+
case M_OPEN_ACK:
case M_CLOSE:
clulog(LOG_DEBUG, "I should NOT get here: %d\n",
ret);
break;
case M_STATECHANGE:
+ msg_receive(ctx, NULL, 0, 0);
clulog(LOG_DEBUG, "Membership Change Event\n");
if (rg_quorate() && running) {
rg_unlockall(L_SYS);
- membership_update(clu);
+ membership_update();
}
break;
- rg_set_quorate();
- rg_unlockall(L_SYS);
- rg_unlockall(L_USER);
- clulog(LOG_NOTICE, "Quorum Achieved\n");
- membership_update(clu);
+ case 998:
break;
case 999:
- clulog(LOG_EMERG, "#1: Quorum Dissolved\n");
- rg_set_inquorate();
- member_list_update(NULL); /* Clear member list */
- rg_lockall(L_SYS);
- rg_doall(RG_INIT, 1, "Emergency stop of %s");
- rg_set_uninitialized();
- break;
case M_TRY_SHUTDOWN:
+ msg_receive(ctx, NULL, 0, 0);
clulog(LOG_WARNING, "#67: Shutting down uncleanly\n");
rg_set_inquorate();
rg_doall(RG_INIT, 1, "Emergency stop of %s");
@@ -512,15 +545,13 @@
void dump_threads(void);
int
-event_loop(chandle_t *clu)
+event_loop(msgctx_t *localctx, msgctx_t *clusterctx)
{
int n, max, ret;
fd_set rfds;
- msgctx_t newctx;
+ msgctx_t *newctx;
struct timeval tv;
int nodeid;
- msgctx_t *localctx = clu->local_ctx;
- msgctx_t *clusterctx = clu->cluster_ctx;
tv.tv_sec = 10;
tv.tv_usec = 0;
@@ -545,7 +576,8 @@
break;
if (msg_fd_isset(clusterctx, &rfds)) {
- handle_cluster_event(clu, clusterctx);
+ msg_fd_clr(clusterctx, &rfds);
+ handle_cluster_event(clusterctx);
continue;
}
@@ -553,7 +585,9 @@
continue;
}
- ret = msg_accept(localctx, &newctx);
+ msg_fd_clr(localctx, &rfds);
+ newctx = msg_new_ctx();
+ ret = msg_accept(localctx, newctx);
if (ret == -1)
continue;
@@ -561,25 +595,27 @@
if (rg_quorate()) {
/* Handle message */
/* When request completes, the fd is closed */
- nodeid = msg_get_nodeid(&newctx);
- dispatch_msg(&newctx, nodeid);
+ nodeid = msg_get_nodeid(newctx);
+ dispatch_msg(newctx, nodeid, 1);
continue;
}
if (!rg_initialized()) {
- msg_close(&newctx);
+ msg_close(newctx);
+ msg_free_ctx(newctx);
continue;
}
if (!rg_quorate()) {
printf("Dropping connect: NO QUORUM\n");
- msg_close(&newctx);
+ msg_close(newctx);
+ msg_free_ctx(newctx);
}
}
if (need_reconfigure || check_config_update()) {
need_reconfigure = 0;
- configure_logging(-1);
+ configure_logging(-1, 0);
init_resource_groups(1);
return 0;
}
@@ -606,13 +642,6 @@
void
-graceful_exit(int sig)
-{
- running = 0;
-}
-
-
-void
hard_exit(void)
{
rg_lockall(L_SYS);
@@ -625,12 +654,9 @@
void
cleanup(msgctx_t *clusterctx)
{
- rg_lockall(L_SYS);
- rg_doall(RG_STOP_EXITING, 1, NULL);
- //vf_shutdown();
kill_resource_groups();
member_list_update(NULL);
- notify_exiting();
+ send_exit_msg(clusterctx);
}
@@ -649,7 +675,7 @@
* Configure logging based on data in cluster.conf
*/
int
-configure_logging(int ccsfd)
+configure_logging(int ccsfd, int dbg)
{
char *v;
char internal = 0;
@@ -667,7 +693,8 @@
}
if (ccs_get(ccsfd, "/cluster/rm/@log_level", &v) == 0) {
- clu_set_loglevel(atoi(v));
+ if (!dbg)
+ clu_set_loglevel(atoi(v));
free(v);
}
@@ -679,20 +706,21 @@
void
-clu_initialize(chandle_t *clu)
+clu_initialize(cman_handle_t **ch)
{
- cman_node_t me;
+ if (!ch)
+ exit(1);
- clu->c_cluster = cman_init(NULL);
- if (!clu->c_cluster) {
+ *ch = cman_init(NULL);
+ if (!(*ch)) {
clulog(LOG_NOTICE, "Waiting for CMAN to start\n");
- while (!(clu->c_cluster = cman_init(NULL))) {
+ while (!(*ch = cman_init(NULL))) {
sleep(1);
}
}
- if (!cman_is_quorate(clu->c_cluster)) {
+ if (!cman_is_quorate(*ch)) {
/*
There are two ways to do this; this happens to be the simpler
of the two. The other method is to join with a NULL group
@@ -701,14 +729,11 @@
*/
clulog(LOG_NOTICE, "Waiting for quorum to form\n");
- while (cman_is_quorate(clu->c_cluster) == 0) {
+ while (cman_is_quorate(*ch) == 0) {
sleep(1);
}
clulog(LOG_NOTICE, "Quorum formed, starting\n");
}
-
- cman_get_node(clu->c_cluster, CMAN_NODEID_US, &me);
- clu->c_nodeid = me.cn_nodeid;
}
@@ -722,15 +747,28 @@
}
+void *
+shutdown_thread(void *arg)
+{
+ rg_doall(RG_STOP_EXITING, 1, NULL);
+ running = 0;
+
+ return 0;
+}
+
+
int
main(int argc, char **argv)
{
int rv;
char foreground = 0;
- int quorate;
- int listen_fds[2], listeners;
- int myNodeID;
- chandle_t clu;
+ cman_node_t me;
+ msgctx_t *cluster_ctx;
+ msgctx_t *local_ctx;
+ int port = RG_PORT;
+ pthread_t th;
+
+ cman_handle_t *clu = NULL;
while ((rv = getopt(argc, argv, "fd")) != EOF) {
switch (rv) {
@@ -759,13 +797,25 @@
}
clu_initialize(&clu);
- set_my_id(clu.c_nodeid);
+ if (cman_init_subsys(clu) < 0) {
+ perror("cman_init_subsys");
+ return -1;
+ }
+
+ if (clu_lock_init("rgmanager") != 0) {
+ printf("Locks not working!\n");
+ return -1;
+ }
+
+ memset(&me, 0, sizeof(me));
+ cman_get_node(clu, CMAN_NODEID_US, &me);
+ set_my_id(me.cn_nodeid);
/*
We know we're quorate. At this point, we need to
read the resource group trees from ccsd.
*/
- configure_logging(-1);
+ configure_logging(-1, debug);
clulog(LOG_NOTICE, "Resource Group Manager Starting\n");
if (init_resource_groups(0) != 0) {
@@ -778,46 +828,60 @@
setup_signal(SIGUSR1, statedump);
unblock_signal(SIGCHLD);
setup_signal(SIGPIPE, SIG_IGN);
+
if (debug) {
setup_signal(SIGSEGV, segfault);
} else {
unblock_signal(SIGSEGV);
}
- if (msg_init(&clu) < 0) {
- clulog(LOG_CRIT, "#10: Couldn't set up message system\n");
+ if (msg_listen(MSG_SOCKET, RGMGR_SOCK, me.cn_nodeid, &local_ctx) < 0) {
+ clulog(LOG_CRIT,
+ "#10: Couldn't set up local message system: %s\n",
+ strerror(errno));
+ return -1;
+ }
+
+ if (msg_listen(MSG_CLUSTER, &port, me.cn_nodeid, &cluster_ctx) < 0) {
+ clulog(LOG_CRIT,
+ "#10b: Couldn't set up cluster message system: %s\n",
+ strerror(errno));
return -1;
}
rg_set_quorate();
- set_my_id(clu.c_nodeid);
+
+ /*
+ msg_print(local_ctx);
+ msg_print(cluster_ctx);
+ */
/*
Initialize the VF stuff.
*/
-#if 0
- if (vf_init(clu.c_nodeid, RG_VF_PORT, NULL, NULL) != 0) {
+ if (vf_init(me.cn_nodeid, RG_PORT, NULL, NULL) != 0) {
clulog(LOG_CRIT, "#11: Couldn't set up VF listen socket\n");
return -1;
}
vf_key_init("rg_lockdown", 10, NULL, lock_commit_cb);
-#endif
-
- /*
- Get an initial membership view.
- */
- membership_update(&clu);
/*
Do everything useful
*/
- while (running)
- event_loop(&clu);
+ while (running) {
+ event_loop(local_ctx, cluster_ctx);
+
+ if (shutdown_pending == 1) {
+ ++shutdown_pending;
+ clulog(LOG_NOTICE, "Shutting down\n");
+ pthread_create(&th, NULL, shutdown_thread, NULL);
+ }
+ }
- clulog(LOG_NOTICE, "Shutting down\n");
- cleanup(&clu);
+ cleanup(cluster_ctx);
clulog(LOG_NOTICE, "Shutdown complete, exiting\n");
+ cman_finish(clu);
/*malloc_dump_table(); */ /* Only works if alloc.c us used */
/*malloc_stats();*/
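In the new main(), a shutdown signal no longer stops services directly. flag_shutdown() (declared in nodeevent.c; its body is not part of this patch) presumably only sets shutdown_pending. The main loop then starts shutdown_thread() once, moving shutdown_pending from 1 to 2, so event_loop() keeps serving requests while services stop. The following is a minimal, self-contained sketch of that pattern. All names are hypothetical stand-ins, and no rgmanager or CMAN calls are made:

```c
#include <pthread.h>
#include <signal.h>
#include <stdatomic.h>

/* Hypothetical stand-ins for rgmanager's globals. */
static volatile sig_atomic_t shutdown_pending = 0; /* set by the handler */
static atomic_int running = 1;                     /* cleared by the thread */

/* Signal handler: only records the request (async-signal-safe). */
static void flag_shutdown(int sig)
{
	(void)sig;
	if (shutdown_pending == 0)
		shutdown_pending = 1;
}

/* Plays the role of shutdown_thread(): stop services, then end the loop. */
static void *stop_services_thread(void *arg)
{
	(void)arg;
	/* a real daemon would stop its services here */
	atomic_store(&running, 0);
	return NULL;
}

/*
 * One pass of the main loop. The 1 -> 2 transition ensures the shutdown
 * thread starts exactly once, however many signals arrive.
 * Returns 1 if this pass started the thread.
 */
static int main_loop_pass(pthread_t *th)
{
	/* event_loop(...) would run here and keep serving requests */
	if (shutdown_pending == 1) {
		shutdown_pending = 2;
		return pthread_create(th, NULL, stop_services_thread, NULL) == 0;
	}
	return 0;
}
```

The real loop is `while (running) { ...; main_loop_pass(&th); }`. Once the thread clears `running`, the loop exits and cleanup proceeds, as in the patched main().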
--- cluster/rgmanager/src/daemons/nodeevent.c 2006/06/02 17:37:10 1.2
+++ cluster/rgmanager/src/daemons/nodeevent.c 2006/07/11 23:52:41 1.3
@@ -20,6 +20,9 @@
#include <rg_locks.h>
#include <gettid.h>
#include <assert.h>
+#include <libcman.h>
+#include <ccs.h>
+#include <clulog.h>
typedef struct __ne_q {
list_head();
@@ -28,8 +31,6 @@
int ne_state;
} nevent_t;
-int node_event(int, uint64_t, int);
-
/**
* Node event queue.
*/
@@ -38,11 +39,120 @@
static pthread_t ne_thread = 0;
int ne_queue_request(int local, uint64_t nodeid, int state);
+void hard_exit(void);
+int init_resource_groups(int);
+void flag_shutdown(int sig);
+void flag_reconfigure(int sig);
+
+extern int running;
+extern int shutdown_pending;
+
+
+/**
+ Called to handle the transition of a cluster member from up->down or
+ down->up. This handles initializing services (in the local node-up case),
+ exiting due to loss of quorum (local node-down), and service fail-over
+ (remote node down).
+
+ @param nodeID ID of the member which has come up/gone down.
+ @param nodeStatus New state of the member in question.
+ @see eval_groups
+ */
+void
+node_event(int local, uint64_t nodeID, int nodeStatus)
+{
+ if (!running)
+ return;
+
+ if (local) {
+
+ /* Local Node Event */
+ if (nodeStatus == 0)
+ hard_exit();
+
+ if (!rg_initialized()) {
+ if (init_resource_groups(0) != 0) {
+ clulog(LOG_ERR,
+ "#36: Cannot initialize services\n");
+ hard_exit();
+ }
+ }
+
+ if (shutdown_pending) {
+ clulog(LOG_NOTICE, "Processing delayed exit signal\n");
+ running = 0;
+ }
+ setup_signal(SIGINT, flag_shutdown);
+ setup_signal(SIGTERM, flag_shutdown);
+ setup_signal(SIGHUP, flag_reconfigure);
+
+ eval_groups(1, nodeID, 1);
+ return;
+ }
+
+ /*
+ * Nothing to do for events from other nodes if we are not ready.
+ */
+ if (!rg_initialized()) {
+ clulog(LOG_DEBUG, "Services not initialized.\n");
+ return;
+ }
+
+ eval_groups(0, nodeID, nodeStatus);
+}
+
+
+int
+node_has_fencing(int nodeid)
+{
+ int ccs_desc;
+ char *val = NULL;
+ char buf[1024];
+ int ret = 1;
+
+ ccs_desc = ccs_connect();
+ if (ccs_desc < 0) {
+ clulog(LOG_ERR, "Unable to connect to ccsd; cannot handle"
+ " node event!\n");
+ /* Assume node has fencing */
+ return 1;
+ }
+
+ snprintf(buf, sizeof(buf),
+ "/cluster/clusternodes/clusternode[@nodeid=\"%d\"]"
+ "/fence/method/device/@name", nodeid);
+
+ if (ccs_get(ccs_desc, buf, &val) != 0)
+ ret = 0;
+ if (val)
+ free(val);
+ ccs_disconnect(ccs_desc);
+ return ret;
+}
+
+
+int
+node_fenced(int nodeid)
+{
+ cman_handle_t ch;
+ int fenced = 0;
+ uint64_t fence_time;
+
+ ch = cman_init(NULL);
+ if (cman_get_fenceinfo(ch, nodeid, &fence_time, &fenced, NULL) < 0)
+ fenced = 0;
+
+ cman_finish(ch);
+
+ return fenced;
+}
+
void *
node_event_thread(void *arg)
{
nevent_t *ev;
+ int notice;
while (1) {
pthread_mutex_lock(&ne_queue_mutex);
@@ -53,6 +163,22 @@
break; /* We're outta here */
pthread_mutex_unlock(&ne_queue_mutex);
+ if (ev->ne_state == 0 && node_has_fencing(ev->ne_nodeid)) {
+ notice = 0;
+ while (!node_fenced(ev->ne_nodeid)) {
+ if (!notice) {
+ notice = 1;
+ clulog(LOG_INFO, "Waiting for "
+ "node #%d to be fenced\n",
+ ev->ne_nodeid);
+ }
+ sleep(2);
+ }
+ if (notice)
+ clulog(LOG_INFO, "Node #%d fenced; "
+ "continuing\n", ev->ne_nodeid);
+ }
+
node_event(ev->ne_local, ev->ne_nodeid, ev->ne_state);
free(ev);
@@ -60,7 +186,6 @@
/* Mutex held */
ne_thread = 0;
- rg_dec_threads();
pthread_mutex_unlock(&ne_queue_mutex);
return NULL;
}
@@ -96,8 +221,6 @@
pthread_create(&ne_thread, &attrs, node_event_thread, NULL);
pthread_attr_destroy(&attrs);
-
- rg_inc_threads();
}
pthread_mutex_unlock (&ne_queue_mutex);
}
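node_event_thread() now holds off a node-down event until the node has been fenced. If node_has_fencing() finds a fence device for the node in cluster.conf, the thread polls node_fenced(), which wraps cman_get_fenceinfo(), every two seconds and logs only the first wait. The following is a sketch of that loop. The CMAN query is abstracted behind a hypothetical callback so the logic can run without a cluster:

```c
#include <stdio.h>
#include <unistd.h>

/* Hypothetical predicate: returns nonzero once nodeid has been fenced. */
typedef int (*fence_check_fn)(int nodeid, void *ctx);

/*
 * Poll is_fenced() every interval seconds until it reports success.
 * The "waiting" notice is printed once, not on every poll.
 * Returns the number of polls that found the node not yet fenced.
 */
static int wait_for_fence(int nodeid, fence_check_fn is_fenced, void *ctx,
			  unsigned int interval)
{
	int waits = 0;

	while (!is_fenced(nodeid, ctx)) {
		if (waits++ == 0)
			printf("Waiting for node #%d to be fenced\n", nodeid);
		sleep(interval);
	}
	if (waits)
		printf("Node #%d fenced; continuing\n", nodeid);
	return waits;
}
```

Like the patch, this sketch waits indefinitely. Passing the check as a function pointer only serves to make the loop testable in isolation.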
--- cluster/rgmanager/src/daemons/reslist.c 2006/06/02 17:37:10 1.13
+++ cluster/rgmanager/src/daemons/reslist.c 2006/07/11 23:52:41 1.14
@@ -33,6 +33,13 @@
char *attr_value(resource_node_t *node, char *attrname);
char *rg_attr_value(resource_node_t *node, char *attrname);
+void
+res_build_name(char *buf, size_t buflen, resource_t *res)
+{
+ snprintf(buf, buflen, "%s:%s", res->r_rule->rr_type,
+ res->r_attrs[0].ra_value);
+}
+
/**
Find and determine an attribute's value.
@@ -265,11 +272,23 @@
find_root_by_ref(resource_t **reslist, char *ref)
{
resource_t *curr;
+ char ref_buf[128];
+ char *type;
+ char *name;
int x;
+ snprintf(ref_buf, sizeof(ref_buf), "%s", ref);
+
+ type = ref_buf;
+ if ((name = strchr(ref_buf, ':'))) {
+ *name = 0;
+ name++;
+ } else {
+ name = ref_buf;
+ type = "service"; /* Default type */
+ }
+
list_do(reslist, curr) {
- if (curr->r_rule->rr_root == 0)
- continue;
/*
This should be one operation - the primary attr
@@ -277,15 +296,18 @@
*/
for (x = 0; curr->r_attrs && curr->r_attrs[x].ra_name;
x++) {
+ if (strcmp(type, curr->r_rule->rr_type))
+ continue;
if (!(curr->r_attrs[x].ra_flags & RA_PRIMARY))
continue;
- if (strcmp(ref, curr->r_attrs[x].ra_value))
+ if (strcmp(name, curr->r_attrs[x].ra_value))
continue;
return curr;
}
} while (!list_done(reslist, curr));
+
return NULL;
}
@@ -447,8 +469,6 @@
int x;
printf("Resource type: %s", res->r_rule->rr_type);
- if (res->r_rule->rr_root)
- printf(" [ROOT]");
if (res->r_flags & RF_INLINE)
printf(" [INLINE]");
if (res->r_flags & RF_NEEDSTART)
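find_root_by_ref() now accepts references of the form type:name, the same form that res_build_name() produces. When no colon is present, it defaults to the service type. The following is a standalone sketch of that split, using a hypothetical split_ref() helper. For a bare name, the name pointer must refer to the whole copied buffer:

```c
#include <stdio.h>
#include <string.h>

/*
 * Split "type:name" into its parts, working on a private copy in buf.
 * A reference without a colon is treated as "service:<ref>".
 */
static void split_ref(const char *ref, char *buf, size_t buflen,
		      const char **type, const char **name)
{
	char *colon;

	snprintf(buf, buflen, "%s", ref);
	colon = strchr(buf, ':');
	if (colon) {
		*colon = '\0';
		*type = buf;
		*name = colon + 1;
	} else {
		*type = "service";	/* default type */
		*name = buf;		/* whole reference is the name */
	}
}
```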
--- cluster/rgmanager/src/daemons/restree.c 2006/06/02 17:37:10 1.19
+++ cluster/rgmanager/src/daemons/restree.c 2006/07/11 23:52:41 1.20
@@ -1,5 +1,5 @@
/*
- Copyright Red Hat, Inc. 2004
+ Copyright Red Hat, Inc. 2004-2006
This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
--- cluster/rgmanager/src/daemons/rg_forward.c 2006/06/02 17:37:10 1.4
+++ cluster/rgmanager/src/daemons/rg_forward.c 2006/07/11 23:52:41 1.5
@@ -47,24 +47,26 @@
{
rg_state_t rgs;
request_t *req = (request_t *)arg;
- void *lockp;
+ struct dlm_lksb lockp;
msgctx_t ctx;
SmMessageSt msg;
if (rg_lock(req->rr_group, &lockp) != 0) {
msg_close(req->rr_resp_ctx);
+ msg_free_ctx(req->rr_resp_ctx);
rq_free(req);
pthread_exit(NULL);
}
if (get_rg_state(req->rr_group, &rgs) != 0) {
- rg_unlock(req->rr_group, lockp);
+ rg_unlock(&lockp);
msg_close(req->rr_resp_ctx);
+ msg_free_ctx(req->rr_resp_ctx);
rq_free(req);
pthread_exit(NULL);
}
- rg_unlock(req->rr_group, lockp);
+ rg_unlock(&lockp);
/* Construct message */
build_message(&msg, req->rr_request, req->rr_group, req->rr_target);
@@ -74,8 +76,9 @@
rg_req_str(req->rr_request), (int)rgs.rs_owner);
*/
- if (msg_open(rgs.rs_owner, RG_PORT, &ctx, 10) < 0) {
+ if (msg_open(MSG_CLUSTER, rgs.rs_owner, RG_PORT, &ctx, 10) < 0) {
msg_close(req->rr_resp_ctx);
+ msg_free_ctx(req->rr_resp_ctx);
rq_free(req);
pthread_exit(NULL);
}
@@ -83,6 +86,7 @@
if (msg_send(&ctx, &msg, sizeof(msg)) != sizeof(msg)) {
msg_close(&ctx);
msg_close(req->rr_resp_ctx);
+ msg_free_ctx(req->rr_resp_ctx);
rq_free(req);
pthread_exit(NULL);
}
@@ -90,6 +94,7 @@
if (msg_receive(&ctx, &msg, sizeof(msg),10) != sizeof(msg)) {
msg_close(&ctx);
msg_close(req->rr_resp_ctx);
+ msg_free_ctx(req->rr_resp_ctx);
rq_free(req);
pthread_exit(NULL);
}
--- cluster/rgmanager/src/daemons/rg_state.c 2006/06/02 17:37:10 1.16
+++ cluster/rgmanager/src/daemons/rg_state.c 2006/07/11 23:52:41 1.17
@@ -26,6 +26,7 @@
#include <string.h>
#include <resgroup.h>
#include <clulog.h>
+#include <lock.h>
#include <rg_locks.h>
#include <ccs.h>
#include <rg_queue.h>
@@ -41,6 +42,7 @@
int set_rg_state(char *servicename, rg_state_t *svcblk);
int get_rg_state(char *servicename, rg_state_t *svcblk);
void get_recovery_policy(char *rg_name, char *buf, size_t buflen);
+int check_depend_safe(char *servicename);
uint64_t
@@ -67,11 +69,35 @@
}
+void
+broadcast_event(char *svcName, uint32_t state)
+{
+ SmMessageSt msgp;
+ msgctx_t everyone;
+
+ msgp.sm_hdr.gh_magic = GENERIC_HDR_MAGIC;
+ msgp.sm_hdr.gh_command = RG_EVENT;
+ msgp.sm_hdr.gh_length = sizeof(msgp);
+ msgp.sm_data.d_action = state;
+ strncpy(msgp.sm_data.d_svcName, svcName,
+ sizeof(msgp.sm_data.d_svcName));
+ msgp.sm_data.d_svcOwner = 0;
+ msgp.sm_data.d_ret = 0;
+
+ swab_SmMessageSt(&msgp);
+
+ if (msg_open(MSG_CLUSTER, 0, RG_PORT, &everyone, 0) < 0)
+ return;
+
+ msg_send(&everyone, &msgp, sizeof(msgp));
+ msg_close(&everyone);
+}
+
+
int
svc_report_failure(char *svcName)
{
-#if 0
- void *lockp = NULL;
+ struct dlm_lksb lockp;
rg_state_t svcStatus;
char *nodeName;
cluster_member_list_t *membership;
@@ -85,10 +111,10 @@
if (get_rg_state(svcName, &svcStatus) != 0) {
clulog(LOG_ERR, "#42: Couldn't obtain status for RG %s\n",
svcName);
- clu_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return -1;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
membership = member_list();
nodeName = memb_id_to_name(membership, svcStatus.rs_last_owner);
@@ -107,104 +133,27 @@
"#4: Administrator intervention required.\n",
svcName, nodeName);
-#endif
return 0;
}
int
-clu_lock_verbose(char *resource, int dflt_flags, void **lockpp)
-{
-#if 0
- int ret, timed_out = 0;
- struct timeval start, now;
- uint64_t nodeid, *p;
- int flags;
- int block = !(dflt_flags & CLK_NOWAIT);
-
- /* Holder not supported for this call */
- dflt_flags &= ~CLK_HOLDER;
-
- flags = dflt_flags;
-
- if (block) {
- gettimeofday(&start, NULL);
- start.tv_sec += 30;
- }
- while (1) {
- if (block) {
- gettimeofday(&now, NULL);
-
- if ((now.tv_sec > start.tv_sec) ||
- ((now.tv_sec == start.tv_sec) &&
- (now.tv_usec >= start.tv_usec))) {
-
- gettimeofday(&start, NULL);
- start.tv_sec += 30;
-
- timed_out = 1;
- flags |= CLK_HOLDER;
- }
- }
-
- *lockpp = NULL;
- ret = clu_lock(resource, flags | CLK_NOWAIT, lockpp);
-
- if ((ret != 0) && (errno == EAGAIN) && block) {
- if (timed_out) {
- p = (uint64_t *)*lockpp;
- if (p) {
- nodeid = *p;
- clulog(LOG_WARNING, "Node ID:%08x%08x"
- " stuck with lock %s\n",
- (uint32_t)(nodeid>>32&0xffffffff),
- (uint32_t)nodeid&0xffffffff,
- resource);
- free(p);
- } else {
- clulog(LOG_WARNING, "Starving for lock"
- " %s\n", resource);
- }
- flags = dflt_flags;
- timed_out = 0;
- }
- usleep(random()&32767<<1);
- continue;
-
- } else if (ret == 0) {
- /* Success */
- return 0;
- }
-
- break;
- }
-
- return ret;
-#endif
- return -1;
-}
-
-
-int
#ifdef DEBUG
-_rg_lock(char *name, void **p)
+_rg_lock(char *name, struct dlm_lksb *p)
#else
-rg_lock(char *name, void **p)
+rg_lock(char *name, struct dlm_lksb *p)
#endif
{
-#if 0
char res[256];
snprintf(res, sizeof(res), "usrm::rg=\"%s\"", name);
- return clu_lock_verbose(res, CLK_EX, p);
-#endif
- return -1;
+ return clu_lock(LKM_EXMODE, p, 0, res);
}
#ifdef DEBUG
int
-_rg_lock_dbg(char *name, void **p, char *file, int line)
+_rg_lock_dbg(char *name, struct dlm_lksb *p, char *file, int line)
{
dprintf("rg_lock(%s) @ %s:%d\n", name, file, line);
return _rg_lock(name, p);
@@ -215,27 +164,21 @@
int
#ifdef DEBUG
-_rg_unlock(char *name, void *p)
+_rg_unlock(struct dlm_lksb *p)
#else
-rg_unlock(char *name, void *p)
+rg_unlock(struct dlm_lksb *p)
#endif
{
-#if 0
- char res[256];
-
- snprintf(res, sizeof(res), "usrm::rg=\"%s\"", name);
- return clu_unlock(res, p);
-#endif
- return -1;
+ return clu_unlock(p);
}
#ifdef DEBUG
int
-_rg_unlock_dbg(char *name, void *p, char *file, int line)
+_rg_unlock_dbg(void *p, char *file, int line)
{
- dprintf("rg_unlock(%s) @ %s:%d\n", name, file, line);
- return _rg_unlock(name, p);
+ dprintf("rg_unlock() @ %s:%d\n", file, line);
+ return _rg_unlock(p);
}
#endif
@@ -293,7 +236,6 @@
int
set_rg_state(char *name, rg_state_t *svcblk)
{
-#if 0
cluster_member_list_t *membership;
char res[256];
int ret;
@@ -307,8 +249,6 @@
sizeof(*svcblk));
free_member_list(membership);
return ret;
-#endif
- return -1;
}
@@ -329,7 +269,6 @@
int
get_rg_state(char *name, rg_state_t *svcblk)
{
-#if 0
char res[256];
int ret;
void *data = NULL;
@@ -381,8 +320,7 @@
free(data);
free_member_list(membership);
-#endif
- return -1;
+ return 0;
}
@@ -390,7 +328,6 @@
int
get_rg_state_local(char *name, rg_state_t *svcblk)
{
-#if 0
char res[256];
int ret;
void *data = NULL;
@@ -422,9 +359,7 @@
/* Copy out the data. */
memcpy(svcblk, data, sizeof(*svcblk));
free(data);
-
-#endif
- return -1;
+ return 0;
}
@@ -714,7 +649,7 @@
int
svc_start(char *svcName, int req)
{
- void *lockp = NULL;
+ struct dlm_lksb lockp;
int ret;
rg_state_t svcStatus;
@@ -725,7 +660,7 @@
}
if (get_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#46: Failed getting status for RG %s\n",
svcName);
return FAIL;
@@ -734,13 +669,13 @@
/* LOCK HELD */
switch (svc_advise_start(&svcStatus, svcName, req)) {
case 0: /* Don't start service, return FAIL */
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return FAIL;
case 2: /* Don't start service, return 0 */
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return 0;
case 3:
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return RG_EAGAIN;
default:
break;
@@ -760,11 +695,11 @@
if (set_rg_state(svcName, &svcStatus) != 0) {
clulog(LOG_ERR,
"#47: Failed changing service status\n");
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return FAIL;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
ret = group_op(svcName, RG_START);
ret = !!ret; /* Either it worked or it didn't. Ignore all the
@@ -780,19 +715,22 @@
if (set_rg_state(svcName, &svcStatus) != 0) {
clulog(LOG_ERR,
"#75: Failed changing service status\n");
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return FAIL;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
- if (ret == 0)
+ if (ret == 0) {
clulog(LOG_NOTICE,
"Service %s started\n",
svcName);
- else
+
+ broadcast_event(svcName, RG_STATE_STARTED);
+ } else {
clulog(LOG_WARNING,
"#68: Failed to start %s; return value: %d\n",
svcName, ret);
+ }
return ret;
}
@@ -807,7 +745,7 @@
int
svc_status(char *svcName)
{
- void *lockp = NULL;
+ struct dlm_lksb lockp;
rg_state_t svcStatus;
if (rg_lock(svcName, &lockp) < 0) {
@@ -817,12 +755,12 @@
}
if (get_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#49: Failed getting status for RG %s\n",
svcName);
return FAIL;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
if (svcStatus.rs_owner != my_id())
/* Don't check status for anything not owned */
@@ -847,7 +785,7 @@
static int
_svc_stop(char *svcName, int req, int recover, uint32_t newstate)
{
- void *lockp = NULL;
+ struct dlm_lksb lockp;
rg_state_t svcStatus;
int ret;
@@ -864,7 +802,7 @@
}
if (get_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#51: Failed getting status for RG %s\n",
svcName);
return FAIL;
@@ -872,18 +810,18 @@
switch (svc_advise_stop(&svcStatus, svcName, req)) {
case 0:
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_DEBUG, "Unable to stop RG %s in %s state\n",
svcName, rg_state_str(svcStatus.rs_state));
return FAIL;
case 2:
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return SUCCESS;
case 3:
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return RG_EFORWARD;
case 4:
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return RG_EAGAIN;
default:
break;
@@ -900,11 +838,11 @@
//printf("rg state = %s\n", rg_state_str(svcStatus.rs_state));
if (set_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#52: Failed changing RG status\n");
return FAIL;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
ret = group_op(svcName, RG_STOP);
@@ -918,7 +856,7 @@
_svc_stop_finish(char *svcName, int failed, uint32_t newstate)
{
rg_state_t svcStatus;
- void *lockp;
+ struct dlm_lksb lockp;
if (rg_lock(svcName, &lockp) == FAIL) {
clulog(LOG_ERR, "#53: Unable to obtain cluster lock: %s\n",
@@ -927,7 +865,7 @@
}
if (get_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#54: Failed getting status for RG %s\n",
svcName);
return FAIL;
@@ -935,7 +873,7 @@
if ((svcStatus.rs_state != RG_STATE_STOPPING) &&
(svcStatus.rs_state != RG_STATE_ERROR)) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
return 0;
}
@@ -945,11 +883,13 @@
if (failed) {
clulog(LOG_CRIT, "#12: RG %s failed to stop; intervention "
"required\n", svcName);
- svcStatus.rs_state = RG_STATE_FAILED;
- } else if (svcStatus.rs_state == RG_STATE_ERROR)
+ newstate = RG_STATE_FAILED;
+ } else if (svcStatus.rs_state == RG_STATE_ERROR) {
svcStatus.rs_state = RG_STATE_RECOVER;
- else
- svcStatus.rs_state = newstate;
+ newstate = RG_STATE_RECOVER;
+ }
+
+ svcStatus.rs_state = newstate;
clulog(LOG_NOTICE, "Service %s is %s\n", svcName,
rg_state_str(svcStatus.rs_state));
@@ -957,11 +897,13 @@
svcStatus.rs_transition = (uint64_t)time(NULL);
if (set_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#55: Failed changing RG status\n");
return FAIL;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
+
+ broadcast_event(svcName, newstate);
return 0;
}
@@ -999,7 +941,7 @@
int
svc_fail(char *svcName)
{
- void *lockp = NULL;
+ struct dlm_lksb lockp;
rg_state_t svcStatus;
if (rg_lock(svcName, &lockp) == FAIL) {
@@ -1011,7 +953,7 @@
clulog(LOG_DEBUG, "Handling failure request for RG %s\n", svcName);
if (get_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#56: Failed getting status for RG %s\n",
svcName);
return FAIL;
@@ -1019,7 +961,7 @@
if ((svcStatus.rs_state == RG_STATE_STARTED) &&
(svcStatus.rs_owner != my_id())) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_DEBUG, "Unable to disable RG %s in %s state\n",
svcName, rg_state_str(svcStatus.rs_state));
return FAIL;
@@ -1036,11 +978,13 @@
svcStatus.rs_transition = (uint64_t)time(NULL);
svcStatus.rs_restarts = 0;
if (set_rg_state(svcName, &svcStatus) != 0) {
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
clulog(LOG_ERR, "#57: Failed changing RG status\n");
return FAIL;
}
- rg_unlock(svcName, lockp);
+ rg_unlock(&lockp);
+
+ broadcast_event(svcName, RG_STATE_FAILED);
return 0;
}
@@ -1068,7 +1012,7 @@
/* Open a connection to the other node */
- if (msg_open(target, RG_PORT, &ctx, 2)< 0) {
+ if (msg_open(MSG_CLUSTER, target, RG_PORT, &ctx, 2)< 0) {
clulog(LOG_ERR,
"#58: Failed opening connection to member #%d\n",
target);
@@ -1355,6 +1299,11 @@
return FAIL;
}
free_member_list(membership);
+
+ /* Check for dependency. We cannot start unless our
+ dependency is met */
+ if (check_depend_safe(svcName) == 0)
+ return RG_EDEPEND;
/*
* This is a 'root' start request. We need to clear out our failure
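In rg_state.c, _svc_stop_finish() now computes newstate first. It then uses that value both for the stored rs_state and for broadcast_event(), so the RG_EVENT sent to other nodes matches what was written. The function only reaches this point when the service is STOPPING or ERROR, and the decision itself reduces to a small pure function. The following sketch uses hypothetical enum names in place of the RG_STATE_* constants:

```c
/* Hypothetical stand-ins for rgmanager's RG_STATE_* codes. */
enum svc_state {
	ST_STOPPED,
	ST_STOPPING,
	ST_ERROR,
	ST_RECOVER,
	ST_FAILED,
	ST_DISABLED
};

/*
 * Choose the state recorded after a stop attempt. The caller stores and
 * broadcasts this same value.
 */
static enum svc_state stop_final_state(int failed, enum svc_state current,
				       enum svc_state requested)
{
	if (failed)
		return ST_FAILED;	/* stop failed: intervention required */
	if (current == ST_ERROR)
		return ST_RECOVER;	/* stopped after an error: recover */
	return requested;		/* normal stop: caller's target state */
}
```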
--- cluster/rgmanager/src/daemons/test.c 2005/03/21 22:00:31 1.4
+++ cluster/rgmanager/src/daemons/test.c 2006/07/11 23:52:41 1.5
@@ -1,3 +1,21 @@
+/*
+ Copyright Red Hat, Inc. 2004-2006
+
+ This program is free software; you can redistribute it and/or modify it
+ under the terms of the GNU General Public License as published by the
+ Free Software Foundation; either version 2, or (at your option) any
+ later version.
+
+ This program is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
+ MA 02139, USA.
+*/
#include <libxml/parser.h>
#include <libxml/xmlmemory.h>
#include <libxml/xpath.h>
--- cluster/rgmanager/src/resources/service.sh 2006/06/02 17:37:10 1.5
+++ cluster/rgmanager/src/resources/service.sh 2006/07/11 23:52:41 1.6
@@ -125,6 +125,17 @@
</shortdesc>
<content type="string"/>
</parameter>
+
+ <parameter name="depend">
+ <longdesc lang="en">
+ Top-level service this depends on, in "service:name" format.
+ </longdesc>
+ <shortdesc lang="en">
+ Service dependency; will not start without the specified
+ service running.
+ </shortdesc>
+ <content type="string"/>
+ </parameter>
</parameters>
<actions>
--- cluster/rgmanager/src/utils/clustat.c 2006/06/02 17:37:11 1.17
+++ cluster/rgmanager/src/utils/clustat.c 2006/07/11 23:52:41 1.18
@@ -6,6 +6,7 @@
#include <libgen.h>
#include <ncurses.h>
#include <term.h>
+#include <rg_types.h>
#include <termios.h>
#include <ccs.h>
#include <libcman.h>
@@ -53,7 +54,7 @@
struct timeval tv;
- if (msg_open(0, RG_PORT, &ctx, 10) < 0) {
+ if (msg_open(MSG_SOCKET, 0, 0, &ctx, 10) < 0) {
return NULL;
}
@@ -107,6 +108,7 @@
}
swab_generic_msg_hdr(msgp);
+
if (msgp->gh_command == RG_SUCCESS) {
free(msgp);
break;
@@ -117,7 +119,6 @@
return NULL;
}
-
rsmp = (rg_state_msg_t *)msgp;
swab_rg_state_t(&rsmp->rsm_state);
@@ -157,46 +158,63 @@
char buf[128];
char *name;
cluster_member_list_t *ret = NULL;
+ cman_node_t *nodes = NULL;
desc = ccs_connect();
if (desc < 0) {
return NULL;
}
- x = 1;
+ while ((ret = calloc(1, sizeof(*ret))) == NULL)
+ sleep(1);
- snprintf(buf, sizeof(buf),
- "/cluster/clusternodes/clusternode[%d]/@name", x);
- while (ccs_get(desc, buf, &name) == 0) {
- if (!ret) {
- ret = malloc(cml_size(x));
- if (!ret) {
+ x = 1;
+ while (1) {
+ snprintf(buf, sizeof(buf),
+ "/cluster/clusternodes/clusternode[%d]/@name", x);
+
+ if (ccs_get(desc, buf, &name) != 0)
+ break;
+
+ if (!nodes) {
+ nodes = malloc(x * sizeof(cman_node_t));
+ if (!nodes) {
perror("malloc");
ccs_disconnect(desc);
exit(1);
}
- memset(ret, 0, cml_size(x));
+ memset(nodes, 0, x * sizeof(cman_node_t));
} else {
- ret = realloc(ret, cml_size(x));
- if (!ret) {
+ nodes = realloc(nodes, x * sizeof(cman_node_t));
+ if (!nodes) {
perror("realloc");
ccs_disconnect(desc);
exit(1);
}
}
- memset(&ret->cml_members[x-1], 0, sizeof(cman_node_t));
- strncpy(ret->cml_members[x-1].cn_name, name,
- sizeof(ret->cml_members[x-1].cn_name));
+ memset(&nodes[x-1], 0, sizeof(cman_node_t));
+ strncpy(nodes[x-1].cn_name, name,
+ sizeof(nodes[x-1].cn_name));
free(name);
+ /* Add node ID */
+ snprintf(buf, sizeof(buf),
+ "/cluster/clusternodes/clusternode[%d]/@nodeid", x);
+ if (ccs_get(desc, buf, &name) == 0) {
+ nodes[x-1].cn_nodeid = atoi(name);
+ free(name);
+ }
+
ret->cml_count = x;
++x;
- snprintf(buf, sizeof(buf),
- "/cluster/clusternodes/clusternode[%d]/@name", x);
}
ccs_disconnect(desc);
+
+ ret->cml_members = nodes;
+
+
return ret;
}
@@ -238,9 +256,12 @@
if (!m) {
printf("%s not found\n", these->cml_members[x].cn_name);
/* WTF? It's not in our config */
- printf("realloc %d\n", (int)cml_size((all->cml_count+1)));
- all = realloc(all, cml_size((all->cml_count+1)));
- if (!all) {
+ printf("realloc %d\n", (int)((all->cml_count+1) *
+ sizeof(cman_node_t)));
+ all->cml_members = realloc(all->cml_members,
+ (all->cml_count+1) *
+ sizeof(cman_node_t));
+ if (!all->cml_members) {
perror("realloc");
exit(1);
}
@@ -440,7 +461,8 @@
void
txt_member_state(cman_node_t *node)
{
- printf(" %-40.40s ", node->cn_name);
+ printf(" %-34.34s %4d ", node->cn_name,
+ node->cn_nodeid);
if (node->cn_member & FLAG_UP)
printf("Online");
@@ -481,8 +503,8 @@
{
int x;
- printf(" %-40.40s %s\n", "Member Name", "Status");
- printf(" %-40.40s %s\n", "------ ----", "------");
+ printf(" %-34.34s %-4.4s %s\n", "Member Name", "ID", "Status");
+ printf(" %-34.34s %-4.4s %s\n", "------ ----", "----", "------");
for (x = 0; x < membership->cml_count; x++) {
if (name && strcmp(membership->cml_members[x].cn_name, name))
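The reworked member-list loader in clustat.c grows a cman_node_t array by one entry for each clusternode it finds in cluster.conf, recording both the name and the node ID. The following sketch implements that growth step as a standalone helper, using a hypothetical struct node in place of cman_node_t. Because realloc(NULL, n) behaves like malloc(n), a single call handles both the first entry and later ones. Assigning through a temporary keeps the old array valid if allocation fails:

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for cman_node_t: only the fields clustat fills. */
struct node {
	char name[65];
	int nodeid;
};

struct node_list {
	int count;
	struct node *nodes;	/* NULL while count == 0 */
};

/*
 * Append one entry, growing the array by one element.
 * Returns 0 on success, -1 on allocation failure (list unchanged).
 */
static int node_list_append(struct node_list *l, const char *name, int nodeid)
{
	struct node *tmp;

	tmp = realloc(l->nodes, (size_t)(l->count + 1) * sizeof(*tmp));
	if (!tmp)
		return -1;
	l->nodes = tmp;

	memset(&l->nodes[l->count], 0, sizeof(l->nodes[0]));
	/* copy at most size-1 bytes; memset already supplied the NUL */
	strncpy(l->nodes[l->count].name, name, sizeof(l->nodes[0].name) - 1);
	l->nodes[l->count].nodeid = nodeid;
	l->count++;
	return 0;
}
```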
--- cluster/rgmanager/src/utils/clusvcadm.c 2006/06/02 17:37:11 1.8
+++ cluster/rgmanager/src/utils/clusvcadm.c 2006/07/11 23:52:41 1.9
@@ -72,7 +72,7 @@
membership = get_member_list(ch);
me = get_my_nodeid(ch);
- if (msg_open(0, RG_PORT, &ctx, 5) < 0) {
+ if (msg_open(MSG_SOCKET, 0, RG_PORT, &ctx, 5) < 0) {
printf("Could not connect to resource group manager\n");
goto out;
}
@@ -179,7 +179,7 @@
main(int argc, char **argv)
{
extern char *optarg;
- char *svcname=NULL, nodename[256];
+ char *svcname=NULL, nodename[256], realsvcname[64];
int opt;
msgctx_t ctx;
cman_handle_t ch;
@@ -259,7 +259,12 @@
usage(basename(argv[0]));
return 1;
}
-
+
+ if (!strchr(svcname,':')) {
+ snprintf(realsvcname, sizeof(realsvcname), "service:%s",
+ svcname);
+ svcname = realsvcname;
+ }
/* No login */
ch = cman_init(NULL);
@@ -287,18 +292,19 @@
*/
}
+ strcpy(nodename,"me");
build_message(&msg, action, svcname, svctarget);
if (action != RG_RELOCATE) {
printf("Member %s %s %s", nodename, actionstr, svcname);
printf("...");
fflush(stdout);
- msg_open(0, RG_PORT, &ctx, 5);
+ msg_open(MSG_SOCKET, 0, RG_PORT, &ctx, 5);
} else {
printf("Trying to relocate %s to %s", svcname, nodename);
printf("...");
fflush(stdout);
- msg_open(0, RG_PORT, &ctx, 5);
+ msg_open(MSG_SOCKET, 0, RG_PORT, &ctx, 5);
}
if (ctx.type < 0) {
@@ -307,7 +313,9 @@
return 1;
}
- if (msg_send(&ctx, &msg, sizeof(msg)) != sizeof(msg)) {
+ opt = msg_send(&ctx, &msg, sizeof(msg));
+
+ if (opt < (int)sizeof(msg)) {
perror("msg_send");
fprintf(stderr, "Could not send entire message!\n");
return 1;
@@ -340,6 +348,9 @@
case RG_EAGAIN:
printf("failed: Try again (resource groups locked)\n");
break;
+ case RG_EDEPEND:
+ printf("failed: Operation would break dependency\n");
+ break;
default:
printf("failed: unknown reason %d\n", msg.sm_data.d_ret);
break;
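clusvcadm now stores the msg_send() result in the int variable opt before checking it. Because sizeof(msg) has type size_t, a plain `opt < sizeof(msg)` comparison converts opt to an unsigned type. An error return of -1 then becomes a very large value, and the short-send check does not catch it. The following sketch shows a small helper, with a hypothetical name, that tests the sign before comparing sizes:

```c
#include <stddef.h>

/*
 * Return 1 only if a send-style call reported the whole message as sent.
 * The sign is checked first: comparing a negative int directly with a
 * size_t converts it to a huge unsigned value.
 */
static int sent_whole_message(int ret, size_t want)
{
	if (ret < 0)
		return 0;	/* error return, e.g. -1 */
	return (size_t)ret == want;
}
```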