All of lore.kernel.org
 help / color / mirror / Atom feed
From: lhh@sourceware.org <lhh@sourceware.org>
To: cluster-devel.redhat.com
Subject: [Cluster-devel] cluster/rgmanager/src clulib/fdops.c clulib/lo ...
Date: 13 Jun 2006 19:22:39 -0000	[thread overview]
Message-ID: <20060613192239.27217.qmail@sourceware.org> (raw)

CVSROOT:	/cvs/cluster
Module name:	cluster
Changes by:	lhh at sourceware.org	2006-06-13 19:22:38

Added files:
	rgmanager/src/clulib: fdops.c lock.c lockspace.c members.c 
	                      message.c 
Removed files:
	rgmanager/src/daemons: members.c 

Log message:
	Include missing .c files in src/clulib; remove defunct src/daemons/members.c

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/fdops.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lock.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lockspace.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/members.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/message.c.diff?cvsroot=cluster&r1=NONE&r2=1.1
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/members.c.diff?cvsroot=cluster&r1=1.4&r2=NONE

/cvs/cluster/cluster/rgmanager/src/clulib/fdops.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/fdops.c
+++ -	2006-06-13 19:22:38.586032000 +0000
@@ -0,0 +1,193 @@
+/*
+  Copyright Red Hat, Inc. 2002-2003
+
+  This program is free software; you can redistribute it and/or modify it
+  under the terms of the GNU General Public License as published by the
+  Free Software Foundation; either version 2, or (at your option) any
+  later version.
+
+  This program is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  General Public License for more details.
+
+  You should have received a copy of the GNU General Public License
+  along with this program; see the file COPYING.  If not, write to the
+  Free Software Foundation, Inc.,  675 Mass Ave, Cambridge,
+  MA 02139, USA.
+*/
+/** @file
+ * Wrapper functions around read/write/select to retry in the event
+ * of interrupts.
+ */
+#include <unistd.h>
+#include <sys/types.h>
+#include <fcntl.h>
+#include <errno.h>
+
+/**
+ * select(2) wrapper that restarts the call whenever it fails with EINTR.
+ * Every other outcome (ready descriptors, timeout, or any other error) is
+ * returned to the caller untouched.  _read_retry and _write_retry rely on
+ * this so that a signal arriving during the wait does not abort them.
+ *
+ * See select(2) for description of parameters and return value.
+ */
+int
+_select_retry(int fdmax, fd_set * rfds, fd_set * wfds, fd_set * xfds,
+	       struct timeval *timeout)
+{
+	int ready;
+
+	do {
+		ready = select(fdmax, rfds, wfds, xfds, timeout);
+		/* EBADF/EINVAL/ENOMEM leave the loop; only EINTR repeats */
+	} while ((ready == -1) && (errno == EINTR));
+
+	return ready;
+}
+
+/**
+ * Write exactly count bytes to fd, retrying after short writes, EAGAIN and
+ * EINTR.  Before each write the descriptor is select()ed for writability.
+ *
+ * @param fd		File descriptor to which we are writing.
+ * @param buf		Data buffer to send.
+ * @param count		Number of bytes in buf to send.
+ * @param timeout	(struct timeval) passed to every select() call;
+ *			NULL means wait without a time limit.
+ * @return		The number of bytes written to the file descriptor
+ *			(count on success), or -1 on error with errno set:
+ *			ETIMEDOUT if select() timed out, EPIPE if the
+ *			descriptor reported an exceptional condition or
+ *			write() returned 0, otherwise the errno left by
+ *			select()/write().
+ */
+ssize_t
+_write_retry(int fd, void *buf, int count, struct timeval * timeout)
+{
+	/* Byte-typed cursor: arithmetic on void * is not standard C */
+	const char *data = buf;
+	int n, total = 0, remain = count, rv = 0;
+	fd_set wfds, xfds;
+
+	while (total < count) {
+
+		/* Sets must be rebuilt each pass; select() overwrites them */
+		FD_ZERO(&wfds);
+		FD_SET(fd, &wfds);
+		FD_ZERO(&xfds);
+		FD_SET(fd, &xfds);
+
+		/* wait for the fd to be available for writing */
+		rv = _select_retry(fd + 1, NULL, &wfds, &xfds, timeout);
+		if (rv == -1)
+			return -1;
+		else if (rv == 0) {
+			errno = ETIMEDOUT;
+			return -1;
+		}
+
+		if (FD_ISSET(fd, &xfds)) {
+			errno = EPIPE;
+			return -1;
+		}
+
+		/*
+		 * Attempt to write the unsent tail of the buffer
+		 */
+		n = write(fd, data + total, remain);
+
+		/*
+		 * When we know our fd was select()ed and we receive 0 bytes
+		 * when we write, the fd was closed.
+		 */
+		if ((n == 0) && (rv == 1)) {
+			errno = EPIPE;
+			return -1;
+		}
+
+		if (n == -1) {
+			if ((errno == EAGAIN) || (errno == EINTR)) {
+				/*
+				 * Not ready?  Go back to select().
+				 */
+				continue;
+			}
+
+			/* Other errors: EIO, EINVAL, etc */
+			return -1;
+		}
+
+		total += n;
+		remain -= n;
+	}
+
+	return total;
+}
+
+/**
+ * Retry reads until we (a) time out or (b) get our data.  Of course, if
+ * timeout is NULL, it'll wait forever.  Short reads, EAGAIN and EINTR all
+ * cause another select()/read() round.
+ *
+ * @param sockfd	File descriptor we want to read from.
+ * @param buf		Preallocated buffer into which we will read data.
+ * @param count		Number of bytes to read.
+ * @param timeout	(struct timeval) passed to every select() call.
+ * @return 		The number of bytes read on success, or -1 on failure.
+ *			Note that we will always return (count) or (-1).
+ *			errno is ETIMEDOUT when select() timed out, EPIPE
+ *			when the descriptor reported an exceptional condition
+ *			or read() returned 0, otherwise whatever
+ *			select()/read() left behind.
+ */
+ssize_t
+_read_retry(int sockfd, void *buf, int count, struct timeval * timeout)
+{
+	/* Byte-typed cursor: arithmetic on void * is not standard C */
+	char *data = buf;
+	int n, total = 0, remain = count, rv = 0;
+	fd_set rfds, xfds;
+
+	while (total < count) {
+		/* Sets must be rebuilt each pass; select() overwrites them */
+		FD_ZERO(&rfds);
+		FD_SET(sockfd, &rfds);
+		FD_ZERO(&xfds);
+		FD_SET(sockfd, &xfds);
+
+		/*
+		 * Select on the socket, in case it closes while we're not
+		 * looking...
+		 */
+		rv = _select_retry(sockfd + 1, &rfds, NULL, &xfds, timeout);
+		if (rv == -1)
+			return -1;
+		else if (rv == 0) {
+			errno = ETIMEDOUT;
+			return -1;
+		}
+
+		if (FD_ISSET(sockfd, &xfds)) {
+			errno = EPIPE;
+			return -1;
+		}
+
+		/*
+		 * Attempt to read into the unfilled tail of the buffer
+		 */
+		n = read(sockfd, data + total, remain);
+
+		/*
+		 * When we know our socket was select()ed and we receive 0 bytes
+		 * when we read, the socket was closed.
+		 */
+		if ((n == 0) && (rv == 1)) {
+			errno = EPIPE;
+			return -1;
+		}
+
+		if (n == -1) {
+			if ((errno == EAGAIN) || (errno == EINTR)) {
+				/*
+				 * Not ready? Wait for data to become available
+				 */
+				continue;
+			}
+
+			/* Other errors: EPIPE, EINVAL, etc */
+			return -1;
+		}
+
+		total += n;
+		remain -= n;
+	}
+
+	return total;
+}
/cvs/cluster/cluster/rgmanager/src/clulib/lock.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/lock.c
+++ -	2006-06-13 19:22:38.669122000 +0000
@@ -0,0 +1,153 @@
+/*
+  Copyright Red Hat, Inc. 2004-2006
+
+  The Magma Cluster API Library is free software; you can redistribute
+  it and/or modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either version
+  2.1 of the License, or (at your option) any later version.
+
+  The Magma Cluster API Library is distributed in the hope that it will
+  be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+  of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+  USA.
+ */
+/** @file
+ * Locking.
+ */
+#include <errno.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <stdio.h>
+#include <assert.h>
+#include <sys/ioctl.h>
+#include <lock.h>
+#include <sys/types.h>
+#include <sys/select.h>
+#include <pthread.h>
+
+
+/* Callback handed to dlm_ls_lock() by clu_lock(); deliberately a no-op. */
+static void
+ast_function(void * __attribute__ ((unused)) arg)
+{
+}
+
+
+/**
+ * Block until the lockspace's file descriptor is readable, then hand the
+ * pending event to dlm_dispatch().
+ *
+ * The parameter is the lockspace handle itself (not a pointer to it), which
+ * is what clu_lock() and clu_unlock() pass in.
+ *
+ * @param ls		Lockspace handle.
+ * @return		Whatever dlm_dispatch() returns, or -1 when select()
+ *			did not report exactly one ready descriptor.
+ *
+ * NOTE(review): select() is not restarted on EINTR here, so a signal makes
+ * this return -1 while the request may still be in flight -- confirm
+ * whether callers depend on that.
+ */
+static int
+wait_for_dlm_event(dlm_lshandle_t ls)
+{
+	fd_set rfds;
+	int fd = dlm_ls_get_fd(ls);
+
+	FD_ZERO(&rfds);
+	FD_SET(fd, &rfds);
+
+	if (select(fd + 1, &rfds, NULL, NULL, NULL) == 1)
+		return dlm_dispatch(fd);
+
+	return -1;
+}
+
+
+/**
+ * Request a DLM lock on a named resource and wait for the request's event
+ * to be dispatched.
+ *
+ * @param ls		Lockspace handle.
+ * @param mode		Lock mode, passed through to dlm_ls_lock().
+ * @param lksb		Lock status block; also used as the AST argument.
+ * @param options	Flags passed through to dlm_ls_lock().
+ * @param resource	Non-empty, NUL-terminated resource name.
+ * @return		0 once the event has been dispatched; -1 on error
+ *			(errno is EINVAL for bad arguments, otherwise as left
+ *			by libdlm / select()).
+ *
+ * NOTE(review): lksb->sb_status is not inspected here; presumably callers
+ * check it themselves -- confirm.
+ */
+int
+clu_lock(dlm_lshandle_t ls,
+	 int mode,
+	 struct dlm_lksb *lksb,
+	 int options,
+	 char *resource)
+{
+	int ret;
+
+	if (!ls || !lksb || !resource || !strlen(resource)) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	ret = dlm_ls_lock(ls, mode, lksb, options, resource,
+			  strlen(resource), 0, ast_function, lksb,
+			  NULL, NULL);
+
+	if (ret < 0) {
+		/* ENOENT from dlm_ls_lock is treated as a fatal condition */
+		if (errno == ENOENT)
+			assert(0);
+
+		return -1;
+	}
+
+	/*
+	 * Keep the real return value: the original stored the result of the
+	 * "< 0" comparison in ret, so the message below always printed 1.
+	 */
+	ret = wait_for_dlm_event(ls);
+	if (ret < 0) {
+		fprintf(stderr, "wait_for_dlm_event: %d / %d\n",
+			ret, errno);
+		return -1;
+	}
+
+	return 0;
+}
+
+
+
+/**
+ * Open the named lockspace, creating it (mode 0644) if opening fails.
+ * When both attempts fail with ENOENT the whole sequence is retried;
+ * any other failure is reported on stderr.
+ *
+ * @param lsname	Lockspace name.
+ * @return		Lockspace handle, or NULL on failure.
+ */
+dlm_lshandle_t
+clu_acquire_lockspace(const char *lsname)
+{
+	dlm_lshandle_t handle;
+
+	for (;;) {
+		handle = dlm_open_lockspace(lsname);
+		if (handle)
+			return handle;
+
+		handle = dlm_create_lockspace(lsname, 0644);
+		if (handle)
+			return handle;
+
+		if (errno != ENOENT) {
+			fprintf(stderr, "failed acquiring lockspace: %s\n",
+				strerror(errno));
+			return NULL;
+		}
+
+		/* Work around race: Someone was closing lockspace as
+		   we were trying to open it.  Retry. */
+	}
+}
+
+
+
+/**
+ * Release the lock described by lksb and wait for the unlock event.
+ *
+ * @param ls		Lockspace handle.
+ * @param lksb		Lock status block of the lock to drop.
+ * @return		0 on success; the non-zero dlm_ls_unlock() result if
+ *			the request itself failed; -1 with errno set to
+ *			lksb->sb_status if waiting for the event failed.
+ */
+int
+clu_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb)
+{
+	int rv;
+
+	if (ls == NULL || lksb == NULL) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	rv = dlm_ls_unlock(ls, lksb->sb_lkid, 0, lksb, NULL);
+	if (rv != 0)
+		return rv;
+
+	/* lksb->sb_status should be EINPROG at this point */
+
+	if (wait_for_dlm_event(ls) >= 0)
+		return 0;
+
+	errno = lksb->sb_status;
+	return -1;
+}
+
+
+/* Thin wrapper: forwards to dlm_release_lockspace(name, ls, 0) and returns
+   its result unchanged. */
+int
+clu_release_lockspace(dlm_lshandle_t ls, char *name)
+{
+        return dlm_release_lockspace(name, ls, 0);
+}
/cvs/cluster/cluster/rgmanager/src/clulib/lockspace.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/lockspace.c
+++ -	2006-06-13 19:22:38.753795000 +0000
@@ -0,0 +1,75 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <stdio.h>
+#include <assert.h>
+#include <sys/ioctl.h>
+#include <sys/select.h>
+#include <lock.h>
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <clulog.h>
+#include <signal.h>
+#include <gettid.h>
+#include <libdlm.h>
+#include <errno.h>
+
+
+/* Lockspace name passed to clu_acquire_lockspace() by _init_lockspace() */
+#define RGMGR_LOCKSPACE "rgmanager"
+
+/* Held by ls_hold_default()/ls_release_default() while they examine or
+   change _init and _holder_id */
+static pthread_mutex_t _default_lock = PTHREAD_MUTEX_INITIALIZER;
+/* Set to 1 once _init_lockspace() has succeeded */
+static char _init = 0;
+/* gettid() of the current holder of the default lockspace; 0 means free */
+static pid_t _holder_id = 0;
+/* Handle returned by clu_acquire_lockspace() */
+static dlm_lshandle_t _default_ls;
+
+
+/* Acquire the rgmanager lockspace and record it in _default_ls.
+   Returns 0 and sets _init on success, -1 if no handle was obtained. */
+static int
+_init_lockspace(void)
+{
+	dlm_lshandle_t ls = clu_acquire_lockspace(RGMGR_LOCKSPACE);
+
+	_default_ls = ls;
+	if (ls == NULL)
+		return -1;
+
+	_init = 1;
+	return 0;
+}
+
+
+/**
+ * Claim the default lockspace for the calling thread, initialising it on
+ * first use.
+ *
+ * @return	The default lockspace handle, or NULL with errno set to
+ *		ENOLCK (lockspace could not be initialised) or EAGAIN
+ *		(someone already holds it).
+ */
+dlm_lshandle_t 
+ls_hold_default(void)
+{
+	dlm_lshandle_t held = NULL;
+	int err = 0;
+
+	pthread_mutex_lock(&_default_lock);
+	if (!_init && (_init_lockspace() < 0)) {
+		err = ENOLCK;
+	} else if (_holder_id != 0) {
+		err = EAGAIN;
+	} else {
+		_holder_id = gettid();
+		held = _default_ls;
+	}
+	pthread_mutex_unlock(&_default_lock);
+
+	if (err)
+		errno = err;
+	return held;
+}
+
+
+/**
+ * Give up the default lockspace claimed with ls_hold_default().
+ *
+ * If the caller's gettid() does not match the recorded holder, an error is
+ * logged and the process stops itself with SIGSTOP; once execution resumes
+ * the holder is cleared regardless.
+ */
+void
+ls_release_default(void)
+{
+	pthread_mutex_lock(&_default_lock);
+	if (_holder_id != gettid()) {
+		/* Trailing space keeps the concatenated literals from
+		   running together as "notthe" */
+		clulog(LOG_ERR, "Attempt to release lockspace when I am not "
+		       "the holder!\n");
+		raise(SIGSTOP);
+	}
+
+	_holder_id = 0;
+	pthread_mutex_unlock(&_default_lock);
+}
+
+
/cvs/cluster/cluster/rgmanager/src/clulib/members.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/members.c
+++ -	2006-06-13 19:22:38.840838000 +0000
@@ -0,0 +1,397 @@
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <stdint.h>
+#include <malloc.h>
+#include <libcman.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <members.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <rg_types.h>
+#include <pthread.h>
+
+/* Local node ID as stored by set_my_id(); -1 until then */
+static int my_node_id = -1;
+/* Taken by member_list_update(), member_list() and member_name() around
+   every access to 'membership' */
+static pthread_rwlock_t memblock = PTHREAD_RWLOCK_INITIALIZER;
+/* Private copy of the most recent list given to member_list_update() */
+static cluster_member_list_t *membership = NULL;
+
+
+/**
+  Return the stored node ID.  Since this should never
+  change during the duration of running rgmanager, it is
+  not protected by a lock.
+
+  @return		The value last given to set_my_id(), or -1 if it
+  			has not been called.
+ */
+int
+my_id(void)
+{
+	return my_node_id;
+}
+
+
+/**
+  Store the local node ID for later retrieval with my_id().  No locking
+  is performed.
+
+  @param id		Node ID to store.
+  @return		Always 0.
+ */
+int
+set_my_id(int id)
+{
+	my_node_id = id;
+	return 0;
+}
+
+
+/**
+  Ask cman for the local node's ID.  The value is only returned; use
+  set_my_id() to store it.
+
+  @param h		Open cman handle.
+  @return		The local node ID, or -1 if cman_get_node() fails.
+ */
+int
+get_my_nodeid(cman_handle_t h)
+{
+	cman_node_t node;
+
+	/* Do not hand stack garbage to libcman: presumably cman_get_node()
+	   also reads node.cn_name as a lookup key -- zero it so only the
+	   node ID is used (confirm against libcman.h). */
+	memset(&node, 0, sizeof(node));
+
+	if (cman_get_node(h, CMAN_NODEID_US, &node) != 0)
+		return -1;
+
+	return node.cn_nodeid;
+}
+
+
+
+/**
+  Generate and return a list of members which are now online in a new
+  membership list, given the old membership list.  User must call
+  @ref free_member_list
+  to free the returned
+  @ref cluster_member_list_t
+  structure.
+
+  A node counts as gained when it is marked as a member in 'new' and there
+  is no entry with the same node ID in 'old' that is also marked as a member.
+
+  @param old		Old membership list
+  @param new		New membership list
+  @return		NULL if no members were gained (or allocation
+  			failed), or a newly allocated cluster_member_list_t
+  			structure.
+ */
+cluster_member_list_t *
+memb_gained(cluster_member_list_t *old, cluster_member_list_t *new)
+{
+	int count, x, y;
+	char in_old = 0;
+	cluster_member_list_t *gained;
+
+	/* No nodes in new?  Then nothing could have been gained */
+	if (!new || !new->cml_count)
+		return NULL;
+
+	/* Nothing in old?  Duplicate 'new' and return it. */
+	if (!old || !old->cml_count) {
+		gained = cml_alloc(new->cml_count);
+		if (!gained)
+			return NULL;
+		memcpy(gained, new, cml_size(new->cml_count));
+		return gained;
+	}
+
+	/* Allocation size in bytes, for the larger of the two lists */
+	count = (old->cml_count > new->cml_count ?
+		 cml_size(old->cml_count) : cml_size(new->cml_count));
+
+	gained = malloc(count);
+	if (!gained)
+		return NULL;
+	memset(gained, 0, count);
+
+	for (x = 0; x < new->cml_count; x++) {
+
+		/* This one isn't active at the moment; it could not have
+		   been gained. */
+		if (!new->cml_members[x].cn_member)
+			continue;
+
+		in_old = 0;
+		for (y = 0; y < old->cml_count; y++) {
+			if ((new->cml_members[x].cn_nodeid !=
+			     old->cml_members[y].cn_nodeid) ||
+			     !old->cml_members[y].cn_member)
+				continue;
+			in_old = 1;
+			break;
+		}
+
+		if (in_old)
+			continue;
+
+		/* Append, and advance the count: without the increment every
+		   gained node overwrote slot 0 and the result below was
+		   always discarded as empty. */
+		memcpy(&gained->cml_members[gained->cml_count],
+		       &new->cml_members[x], sizeof(cman_node_t));
+		gained->cml_count++;
+	}
+
+	if (gained->cml_count == 0) {
+		free(gained);
+		gained = NULL;
+	}
+
+	return gained;
+}
+
+
+/**
+  Build the list of members that are present in the old membership list
+  but not online in the new one.  Implemented as memb_gained() with the
+  arguments swapped; every returned entry then has cn_member cleared.
+  User must call
+  @ref free_member_list
+  to free the returned
+  @ref cluster_member_list_t
+  structure.
+
+  @param old		Old membership list
+  @param new		New membership list
+  @return		NULL if no members were lost, or a newly 
+  			allocated cluster_member_list_t structure.
+ */
+cluster_member_list_t *
+memb_lost(cluster_member_list_t *old, cluster_member_list_t *new)
+{
+	cluster_member_list_t *lost = memb_gained(new, old);
+	int idx = 0;
+
+	if (lost == NULL)
+		return NULL;
+
+	/* Lost nodes are reported as offline */
+	while (idx < lost->cml_count) {
+		lost->cml_members[idx].cn_member = 0;
+		idx++;
+	}
+
+	return lost;
+}
+
+
+
+/**
+  Replace the stored membership list with a private copy of new_ml
+  (or with nothing when new_ml is NULL).  The previous copy is freed.
+  Runs under the membership write lock.
+
+  @param new_ml		List to copy; the caller keeps ownership of it.
+ */
+void
+member_list_update(cluster_member_list_t *new_ml)
+{
+	pthread_rwlock_wrlock(&memblock);
+
+	/* free_member_list() ignores NULL */
+	free_member_list(membership);
+	membership = new_ml ? member_list_dup(new_ml) : NULL;
+
+	pthread_rwlock_unlock(&memblock);
+}
+
+
+/**
+  Return a copy of the stored membership list, taken under the
+  membership read lock.
+
+  @return		Result of member_list_dup() on the stored list, or
+  			NULL when no list is stored.
+ */
+cluster_member_list_t *
+member_list(void)
+{
+	cluster_member_list_t *copy;
+
+	pthread_rwlock_rdlock(&memblock);
+	copy = membership ? member_list_dup(membership) : NULL;
+	pthread_rwlock_unlock(&memblock);
+
+	return copy;
+}
+
+
+/**
+  Copy the name of node 'id' from the stored membership list into buf.
+
+  @param id		Node ID to look up.
+  @param buf		Destination buffer.
+  @param buflen		Size of buf in bytes.
+  @return		buf (always NUL-terminated; empty when the node is
+  			unknown or no membership list is stored), or NULL
+  			if buf is NULL or buflen is not positive.
+ */
+char *
+member_name(uint64_t id, char *buf, int buflen)
+{
+	char *n = NULL;
+
+	if (!buf || buflen <= 0)
+		return NULL;
+
+	pthread_rwlock_rdlock(&memblock);
+	/* memb_id_to_name() dereferences its list argument, so only call
+	   it once a membership list has actually been stored */
+	if (membership)
+		n = memb_id_to_name(membership, id);
+	if (n) {
+		/* strncpy() does not terminate on truncation; do it here */
+		strncpy(buf, n, buflen - 1);
+		buf[buflen - 1] = 0;
+	} else {
+		buf[0] = 0;
+	}
+	pthread_rwlock_unlock(&memblock);
+	return buf;
+}
+
+
+
+/**
+  Fetch the current node list from cman.  The node count is queried, a
+  matching array is allocated and filled, and the whole sequence repeats
+  until the number of nodes returned equals the number that was counted.
+
+  @param h		Open cman handle.
+  @return		Newly allocated list (release with free_member_list),
+  			or NULL if the node count is not positive or an
+  			allocation fails.
+
+  NOTE(review): the return value of cman_get_nodes() is not examined; if it
+  can fail persistently this loop never terminates -- confirm.
+ */
+cluster_member_list_t *
+get_member_list(cman_handle_t h)
+{
+	int c;
+	cluster_member_list_t *ml = NULL;
+	cman_node_t *nodes = NULL;
+
+	do {
+		/* Drop the array from a previous, mismatched pass */
+		free(nodes);
+		nodes = NULL;
+
+		c = cman_get_node_count(h);
+		if (c <= 0) {
+			/* ml may survive from an earlier pass; don't leak it */
+			free(ml);
+			return NULL;
+		}
+
+		if (!ml)
+			ml = malloc(sizeof(*ml));
+		if (!ml)
+			return NULL;
+
+		nodes = malloc(sizeof(*nodes) * c);
+		if (!nodes) {
+			free(ml);
+			return NULL;
+		}
+
+		memset(ml, 0, sizeof(*ml));
+		memset(nodes, 0, sizeof(*nodes)*c);
+
+		cman_get_nodes(h, c, &ml->cml_count, nodes);
+
+	} while (ml->cml_count != c);
+
+	ml->cml_members = nodes;
+	ml->cml_count = c;
+	return ml;
+}
+
+
+/**
+  Release a membership list together with its cml_members storage.
+  A NULL list is ignored.
+
+  @param ml		List to free.
+ */
+void
+free_member_list(cluster_member_list_t *ml)
+{
+	if (!ml)
+		return;
+
+	/* free(NULL) is a no-op, so no separate test is needed */
+	free(ml->cml_members);
+	free(ml);
+}
+
+
+/**
+  Look up a node by ID and report its membership flag.
+
+  @param ml		Membership list to search (must not be NULL).
+  @param nodeid		Node ID to find.
+  @return		cn_member of the first matching entry, or 0 if the
+  			node is not in the list.
+ */
+int
+memb_online(cluster_member_list_t *ml, int nodeid)
+{
+	int idx = 0;
+
+	while (idx < ml->cml_count) {
+		if (ml->cml_members[idx].cn_nodeid == nodeid)
+			return ml->cml_members[idx].cn_member;
+		idx++;
+	}
+
+	return 0;
+}
+
+
+/**
+  Count the entries whose cn_member flag is set.
+
+  @param ml		Membership list (must not be NULL).
+  @return		Number of entries with a non-zero cn_member.
+ */
+int
+memb_count(cluster_member_list_t *ml)
+{
+	int idx, online = 0;
+
+	for (idx = 0; idx < ml->cml_count; idx++) {
+		if (!ml->cml_members[idx].cn_member)
+			continue;
+		online++;
+	}
+
+	return online;
+}
+
+
+/**
+  Clear the cn_member flag of every entry with the given node ID.
+
+  @param ml		Membership list to modify (must not be NULL).
+  @param nodeid		Node ID to mark as down.
+  @return		Always 0.
+ */
+int
+memb_mark_down(cluster_member_list_t *ml, int nodeid)
+{
+	int idx = 0;
+
+	while (idx < ml->cml_count) {
+		if (ml->cml_members[idx].cn_nodeid == nodeid)
+			ml->cml_members[idx].cn_member = 0;
+		idx++;
+	}
+
+	return 0;
+}
+
+
+
+/**
+  Translate a node ID into the node's name.
+
+  @param ml		Membership list to search (must not be NULL).
+  @param nodeid		Node ID to find.
+  @return		Pointer to the cn_name stored inside the list (not a
+  			copy), or a null pointer if the node is not found.
+ */
+char *
+memb_id_to_name(cluster_member_list_t *ml, int nodeid)
+{
+	int idx = 0;
+
+	while (idx < ml->cml_count) {
+		if (ml->cml_members[idx].cn_nodeid == nodeid)
+			return ml->cml_members[idx].cn_name;
+		idx++;
+	}
+
+	return 0;
+}
+
+
+/**
+  Find the list entry for a node ID.
+
+  @param ml		Membership list to search (must not be NULL).
+  @param nodeid		Node ID to find.
+  @return		Pointer to the entry inside the list, or a null
+  			pointer if the node is not found.
+ */
+cman_node_t *
+memb_id_to_p(cluster_member_list_t *ml, int nodeid)
+{
+	int idx = 0;
+
+	while (idx < ml->cml_count) {
+		if (ml->cml_members[idx].cn_nodeid == nodeid)
+			return &ml->cml_members[idx];
+		idx++;
+	}
+
+	return 0;
+}
+
+
+/**
+  Look up a node by name (case-insensitive) and report its membership flag.
+
+  @param ml		Membership list to search (must not be NULL).
+  @param name		Node name to find.
+  @return		cn_member of the first matching entry, or 0 if no
+  			entry has that name.
+ */
+int
+memb_online_name(cluster_member_list_t *ml, char *name)
+{
+	int idx = 0;
+
+	while (idx < ml->cml_count) {
+		if (strcasecmp(ml->cml_members[idx].cn_name, name) == 0)
+			return ml->cml_members[idx].cn_member;
+		idx++;
+	}
+
+	return 0;
+}
+
+
+/**
+  Translate a node name (case-insensitive) into its node ID.
+
+  @param ml		Membership list to search (must not be NULL).
+  @param name		Node name to find.
+  @return		cn_nodeid of the first matching entry, or 0 if no
+  			entry has that name.
+ */
+int
+memb_name_to_id(cluster_member_list_t *ml, char *name)
+{
+	int idx = 0;
+
+	while (idx < ml->cml_count) {
+		if (strcasecmp(ml->cml_members[idx].cn_name, name) == 0)
+			return ml->cml_members[idx].cn_nodeid;
+		idx++;
+	}
+
+	return 0;
+}
+
+
+/**
+  Find the list entry for a node name (case-insensitive).
+
+  @param ml		Membership list to search (must not be NULL).
+  @param name		Node name to find.
+  @return		Pointer to the entry inside the list, or a null
+  			pointer if no entry has that name.
+ */
+cman_node_t *
+memb_name_to_p(cluster_member_list_t *ml, char *name)
+{
+	int idx = 0;
+
+	while (idx < ml->cml_count) {
+		if (strcasecmp(ml->cml_members[idx].cn_name, name) == 0)
+			return &ml->cml_members[idx];
+		idx++;
+	}
+
+	return 0;
+}
+
+/**
+  Duplicate and return a cluster member list structure.  The copy is a
+  single block of cml_size(orig->cml_count) bytes.
+
+  @param orig		List to duplicate.
+  @return		NULL if there is nothing to duplicate or duplication
+  			fails, or a newly allocated cluster_member_list_t
+			structure.
+
+  NOTE(review): this is a flat byte copy; whether it also captures the
+  member array depends on how cluster_member_list_t lays out cml_members --
+  confirm against members.h.
+ */
+cluster_member_list_t *
+member_list_dup(cluster_member_list_t *orig)
+{
+	cluster_member_list_t *ret = NULL;
+
+	if (!orig)
+		return NULL;
+
+	ret = malloc(cml_size(orig->cml_count));
+	if (!ret)
+		return NULL;
+
+	/* The copy overwrites the whole block, so no separate zeroing */
+	memcpy(ret, orig, cml_size(orig->cml_count));
+
+	return ret;
+}
+
/cvs/cluster/cluster/rgmanager/src/clulib/message.c,v  -->  standard output
revision 1.1
--- cluster/rgmanager/src/clulib/message.c
+++ -	2006-06-13 19:22:38.921541000 +0000
@@ -0,0 +1,1357 @@
+#define _MESSAGE_BUILD
+#include <message.h>
+#include <stdio.h>
+#include <pthread.h>
+#include <libcman.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/types.h>
+#include <stdlib.h>
+#include <rg_types.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <fdops.h>
+#include <resgroup.h>
+
+
+
+/* Ripped from ccsd's setup_local_socket */
+/* Path of the local socket that local_connect() connects to */
+#define RGMGR_SOCK "/var/run/cluster/rgmanager.sk"
+/* Size of the contexts[] table below */
+#define MAX_CONTEXTS 32  /* Testing; production should be 1024-ish */
+
+/* Context 0 is reserved for control messages */
+
+/* Local-ish contexts */
+/* Held by assign_ctx() while it searches and updates contexts[] */
+static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER;
+/* Table of allocated contexts, indexed by local context ID */
+static msgctx_t *contexts[MAX_CONTEXTS];
+/* Cursor used by assign_ctx() when searching for a free slot */
+static uint32_t context_index = 1;
+/* Cluster connection state (c_lock, c_nodeid, c_cluster) used by the
+   send and poll functions */
+static chandle_t *gch;
+pthread_t comms_thread;
+int thread_running;
+
+
+/* True when a context is usable: a cluster context needs both a remote
+   and a local context ID, a socket context needs a valid descriptor.
+   The argument is parenthesised so that expressions such as *pp or
+   &local expand correctly. */
+#define is_established(ctx) \
+	((((ctx)->type == MSG_CLUSTER) && \
+	  ((ctx)->u.cluster_info.remote_ctx && \
+	   (ctx)->u.cluster_info.local_ctx)) || \
+	 (((ctx)->type == MSG_SOCKET) && \
+	  ((ctx)->u.local_info.sockfd != -1)))
+
+
+/**
+  Connect to the local rgmanager socket (RGMGR_SOCK).
+
+  @return		Connected socket descriptor, or -1 on failure with
+  			errno left as set by socket()/connect().
+ */
+static int
+local_connect(void)
+{
+	struct sockaddr_un sun;
+	int sock, saved_errno;
+
+	memset(&sun, 0, sizeof(sun));
+	sun.sun_family = PF_LOCAL;
+	snprintf(sun.sun_path, sizeof(sun.sun_path), RGMGR_SOCK);
+
+	sock = socket(PF_LOCAL, SOCK_STREAM, 0);
+	if (sock < 0)
+		return -1;
+
+	if (connect(sock, (struct sockaddr *)&sun, sizeof(sun)) < 0) {
+		/* Don't leak the descriptor, and don't let close() clobber
+		   the errno the caller is going to look at */
+		saved_errno = errno;
+		close(sock);
+		errno = saved_errno;
+		return -1;
+	}
+
+	/* Return the socket itself; the original overwrote it with
+	   connect()'s 0 result and so returned descriptor 0 */
+	return sock;
+}
+
+
+/**
+  Prefix a payload with a cluster_msg_hdr_t and send it through cman to
+  the node and port recorded in the context.
+
+  @param ctx		Cluster context describing the destination.
+  @param msg		Payload.
+  @param len		Payload length; header plus payload must fit in the
+  			4096-byte staging buffer.
+  @return		Header size plus payload size on success, or -1 on
+  			failure (errno E2BIG if the message is too large,
+  			otherwise as left by cman_send_data()).
+ */
+static int
+send_cluster_message(msgctx_t *ctx, void *msg, size_t len)
+{
+	char buf[4096];
+	cluster_msg_hdr_t *h = (void *)buf;
+	int ret;
+	char *msgptr = (buf + sizeof(*h));
+
+	if ((len + sizeof(*h)) > sizeof(buf)) {
+		errno = E2BIG;
+		return -1;
+	}
+
+	h->msg_control = M_DATA;
+	h->src_ctx = ctx->u.cluster_info.local_ctx;
+	h->dest_ctx = ctx->u.cluster_info.remote_ctx;
+	h->msg_port = ctx->u.cluster_info.port;
+	memcpy(msgptr, msg, len);
+
+	pthread_mutex_lock(&gch->c_lock);
+	h->src_nodeid = gch->c_nodeid;
+
+	/* Byte-swap only after every header field has been filled in */
+	swab_cluster_msg_hdr_t(h);
+
+	ret = cman_send_data(gch->c_cluster, (void *)h, len + sizeof(*h),
+			       ctx->u.cluster_info.nodeid,
+			       ctx->u.cluster_info.port, 0);
+
+	pthread_mutex_unlock(&gch->c_lock);
+
+	/* Report send failures instead of claiming success */
+	if (ret < 0)
+		return -1;
+
+	/* sizeof(*h) is the header size; sizeof(h) is only a pointer's */
+	return len + sizeof(*h);
+}
+
+
+/**
+  Frame a payload with a local_msg_hdr_t (control code M_DATA plus the
+  payload length) and write the result to the context's socket.
+
+  @param ctx		Socket context.
+  @param msg		Payload.
+  @param len		Payload length; header plus payload must fit in the
+  			4096-byte staging buffer.
+  @return		Result of _write_retry() (header size plus payload
+  			size on success), or -1 with errno E2BIG if the
+  			message is too large.
+ */
+static int
+send_socket_message(msgctx_t *ctx, void *msg, size_t len)
+{
+	char buf[4096];
+	local_msg_hdr_t *h = (local_msg_hdr_t *)buf;
+	char *msgptr = (buf + sizeof(*h));
+
+	/* encapsulate ... ? */
+	if ((len + sizeof(*h)) > sizeof(buf)) {
+		errno = E2BIG;
+		return -1;
+	}
+
+	h->msg_control = M_DATA;
+	h->msg_len = len;
+	memcpy(msgptr, msg, len);
+
+	/* Send the framed buffer.  Writing 'msg' with the framed length
+	   dropped the header and read past the end of the caller's data. */
+	return _write_retry(ctx->u.local_info.sockfd, buf, len + sizeof(*h), NULL);
+}
+
+
+/**
+  Message sending API.  Dispatches on the context type: cluster contexts
+  go through send_cluster_message(), socket contexts through
+  send_socket_message().
+
+  @param ctx		Destination context.
+  @param msg		Payload.
+  @param len		Payload length (must be non-zero).
+  @return		Result of the underlying send function, or -1 with
+  			errno EINVAL for missing arguments or an unknown
+  			context type.
+ */
+int
+msg_send(msgctx_t *ctx, void *msg, size_t len)
+{
+	if (ctx && msg && len) {
+		if (ctx->type == MSG_CLUSTER)
+			return send_cluster_message(ctx, msg, len);
+		if (ctx->type == MSG_SOCKET)
+			return send_socket_message(ctx, msg, len);
+	}
+
+	errno = EINVAL;
+	return -1;
+}
+
+
+/**
+  Assign a (free) cluster context ID.
+
+  Marks the context as MSG_CLUSTER, then walks contexts[] circularly,
+  starting just after the current cursor and skipping slot 0 (reserved for
+  control messages), until a free slot is found.
+
+  @param ctx		Context to register.
+  @return		0 with ctx->u.cluster_info.local_ctx set to the slot
+  			index, or -1 with errno EAGAIN if every slot is
+  			taken.
+ */
+static int
+assign_ctx(msgctx_t *ctx)
+{
+	uint32_t start;
+	int found = 0;
+
+	/* Assign context index */
+	ctx->type = MSG_CLUSTER;
+
+	pthread_mutex_lock(&context_lock);
+	start = context_index;
+	do {
+		if (++context_index >= MAX_CONTEXTS)
+			context_index = 1;
+
+		if (!contexts[context_index]) {
+			contexts[context_index] = ctx;
+			ctx->u.cluster_info.local_ctx = context_index;
+			found = 1;
+			break;
+		}
+
+		/* A real loop condition: the old "do { continue; } while (0)"
+		   fell out after one busy slot and reported success without
+		   assigning anything */
+	} while (context_index != start);
+	pthread_mutex_unlock(&context_lock);
+
+	if (!found) {
+		errno = EAGAIN;
+		return -1;
+	}
+
+	return 0;
+}
+
+
+/* See if anything's on the cluster socket.  If so, dispatch it
+   on to the requisite queues
+   XXX should be passed a connection arg!
+
+   timeout is in seconds; a negative value blocks until the descriptor is
+   readable.  Returns 0 if cman_dispatch() was called, -1 otherwise. */
+static int
+poll_cluster_messages(int timeout)
+{
+	int ret = -1;
+	fd_set rfds;
+	int fd;
+	struct timeval tv;
+	struct timeval *p = NULL;
+
+	if (timeout >= 0) {
+		p = &tv;
+		/* Whole seconds only; copying the value into tv_usec as
+		   well added 'timeout' microseconds to the wait */
+		tv.tv_sec = timeout;
+		tv.tv_usec = 0;
+	}
+	printf("%s\n", __FUNCTION__);
+
+	FD_ZERO(&rfds);
+
+	//pthread_mutex_lock(&gch->c_lock);
+	fd = cman_get_fd(gch->c_cluster);
+	FD_SET(fd, &rfds);
+
+	if (select(fd + 1, &rfds, NULL, NULL, p) == 1) {
+		cman_dispatch(gch->c_cluster, 0);
+		ret = 0;
+	}
+	//pthread_mutex_unlock(&gch->c_lock);
+
+	return ret;
+}
+
+
+/**
+  Send a bare control header (no payload) to the peer recorded in the
+  context.  This is used to establish and tear down pseudo-private
+  contexts which are shared with the cluster context.
+
+  @param ctx		Cluster context naming the peer node, port and
+  			context IDs.
+  @param type		Control code; stored in the header as a uint8_t.
+  @return		Result of cman_send_data().
+ */
+static int
+cluster_send_control_msg(msgctx_t *ctx, int type)
+{
+	cluster_msg_hdr_t hdr;
+	int rv;
+
+	hdr.msg_control = (uint8_t)type;
+	hdr.src_nodeid = gch->c_nodeid;
+	hdr.dest_ctx = ctx->u.cluster_info.remote_ctx;
+	hdr.src_ctx = ctx->u.cluster_info.local_ctx;
+	hdr.msg_port = ctx->u.cluster_info.port;
+
+	/* Convert the filled-in header before it goes out */
+	swab_cluster_msg_hdr_t(&hdr);
+
+	rv = cman_send_data(gch->c_cluster, (void *)&hdr, sizeof(hdr),
+			    ctx->u.cluster_info.nodeid,
+			    ctx->u.cluster_info.port, 0);
+	return rv;
+}
+
+
+/**
+  Wait for a message on a context.
+
+  @param ctx		Cluster context whose queue is watched.
+  @param timeout	Seconds to wait: 0 only polls the queue, a negative
+  			value waits until a message arrives.
+  @return		msg_control of the message at the head of the queue,
+  			or M_NONE if nothing arrived in time.  The message
+  			is left on the queue.
+
+  The wait is done in slices of at most one second so the queue is
+  re-examined regularly even if a wake-up is missed.
+  pthread_cond_timedwait() takes an ABSOLUTE time and reports failure
+  through its return value (not errno), so each slice's end is computed
+  from the current time.
+ */
+static int
+cluster_msg_wait(msgctx_t *ctx, int timeout)
+{
+	struct timespec deadline = {0, 0};
+	struct timespec slice = {0, 0};
+	struct timeval now;
+	int req = M_NONE;
+	int rv;
+
+	if (timeout > 0) {
+		gettimeofday(&now, NULL);
+		deadline.tv_sec = now.tv_sec + timeout;
+		deadline.tv_nsec = now.tv_usec * 1000;
+	}
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	while (1) {
+		/* See if we dispatched any messages on to our queue */
+		if (ctx->u.cluster_info.queue) {
+			req = ctx->u.cluster_info.queue->message->msg_control;
+			break;
+		}
+
+		if (timeout == 0)
+			break;
+
+		/* This slice ends one second from now ... */
+		gettimeofday(&now, NULL);
+		slice.tv_sec = now.tv_sec + 1;
+		slice.tv_nsec = now.tv_usec * 1000;
+
+		if (timeout > 0) {
+			/* Overall deadline already reached? */
+			if ((now.tv_sec > deadline.tv_sec) ||
+			    ((now.tv_sec == deadline.tv_sec) &&
+			     (now.tv_usec * 1000 >= deadline.tv_nsec)))
+				break;
+
+			/* ... or at the overall deadline, if sooner */
+			if ((slice.tv_sec > deadline.tv_sec) ||
+			    ((slice.tv_sec == deadline.tv_sec) &&
+			     (slice.tv_nsec > deadline.tv_nsec)))
+				slice = deadline;
+		}
+
+		/* Go to sleep on the cond; maybe someone will wake us up.
+		   The mutex is held again when the call returns. */
+		rv = pthread_cond_timedwait(&ctx->u.cluster_info.cond,
+					    &ctx->u.cluster_info.mutex,
+					    &slice);
+
+		/* 0 (woken) and ETIMEDOUT (slice over) both loop back to
+		   re-check the queue; anything else is a hard error */
+		if (rv != 0 && rv != ETIMEDOUT)
+			break;
+	}
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+	return req;
+}
+
+
+/**
+  Look at the next local message header without consuming it (MSG_PEEK)
+  and report its control code.  recv() is restarted on EINTR.
+
+  @param fd		Socket to peek at.
+  @return		msg_control of the pending header; M_CLOSE if recv()
+  			returned 0 or a partial header; -1 on any other
+  			recv() error.
+ */
+static int
+peekypeeky(int fd)
+{
+	local_msg_hdr_t hdr;
+	int got;
+
+	for (;;) {
+		got = recv(fd, (void *)&hdr, sizeof(hdr), MSG_PEEK);
+		if (got >= 0)
+			break;
+		if (errno != EINTR)
+			return -1;
+	}
+
+	if (got == sizeof(hdr))
+		return hdr.msg_control;
+
+	if (got != 0) {
+		/* XXX */
+		printf("PROTOCOL ERROR: Invalid message\n");
+	}
+
+	/* Either the socket closed (0 bytes) or the header was short */
+	return M_CLOSE;
+}
+
+
+/**
+  Wait for the context's socket to become readable and classify what is
+  pending.
+
+  @param ctx		Socket context.
+  @param timeout	Seconds to wait; negative waits without limit.
+  @return		Result of peekypeeky() if the socket became readable,
+  			otherwise M_NONE.
+ */
+static int
+local_msg_wait(msgctx_t *ctx, int timeout)
+{
+	struct timeval tv = {0, 0};
+	struct timeval *tvp = NULL;
+	fd_set readable;
+	int ready;
+
+	if (timeout >= 0) {
+		tv.tv_sec = timeout;
+		tvp = &tv;
+	}
+
+	FD_ZERO(&readable);
+	FD_SET(ctx->u.local_info.sockfd, &readable);
+
+	ready = _select_retry(ctx->u.local_info.sockfd + 1, &readable,
+			      NULL, NULL, tvp);
+	if (ready != 1)
+		return M_NONE;
+
+	return peekypeeky(ctx->u.local_info.sockfd);
+}
+
+
+/**
+  Report which node a context talks to.
+
+  @param ctx		Context to query (must not be NULL).
+  @return		The peer node ID for cluster contexts, 0 for socket
+  			contexts, -1 for any other context type.
+ */
+int
+msg_get_nodeid(msgctx_t *ctx)
+{
+	if (ctx->type == MSG_CLUSTER)
+		return ctx->u.cluster_info.nodeid;
+
+	if (ctx->type == MSG_SOCKET)
+		return 0;
+
+	return -1;
+}
+
+
+/**
+  Add the descriptor that represents a context to an fd_set, for use
+  with select().
+
+  For cluster contexts the descriptor is the read end of the context's
+  select pipe, which is created on first use.  For socket contexts it is
+  the socket itself.
+
+  @param ctx		Context to add.
+  @param fds		Set to add the descriptor to.
+  @param max		In/out: raised to the descriptor if it is larger.
+  @return		0 on success; -1 on failure (errno EINVAL for NULL
+  			arguments or an unknown context type, or as left by
+  			pipe(); a socket context without a valid descriptor
+  			also yields -1).
+ */
+int
+msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max)
+{
+	int e;
+
+	/* Same argument validation as msg_fd_isset()/msg_fd_clr() */
+	if (!ctx || !fds || !max) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	switch(ctx->type) {
+	case MSG_CLUSTER:
+		pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+		if (ctx->u.cluster_info.select_pipe[0] < 0) {
+			if (pipe(ctx->u.cluster_info.select_pipe) < 0) {
+				/* Preserve pipe()'s errno across the unlock */
+				e = errno;
+				pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+				errno = e;
+				return -1;
+			}
+
+			printf("%s: Created cluster CTX select pipe "
+			       "rd=%d wr=%d\n", __FUNCTION__,
+			       ctx->u.cluster_info.select_pipe[0],
+			       ctx->u.cluster_info.select_pipe[1]);
+
+		}
+
+		e = ctx->u.cluster_info.select_pipe[0];
+		printf("%s: cluster %d\n", __FUNCTION__,  e);
+		FD_SET(e, fds);
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+		if (e > *max)
+			*max = e;
+		return 0;
+
+	case MSG_SOCKET:
+		if (ctx->u.local_info.sockfd >= 0) {
+			printf("%s: local %d\n", __FUNCTION__,
+			       ctx->u.local_info.sockfd);
+			FD_SET(ctx->u.local_info.sockfd, fds);
+
+			if (ctx->u.local_info.sockfd > *max)
+				*max = ctx->u.local_info.sockfd;
+			return 0;
+		}
+		return -1;
+	default:
+		break;
+	}
+
+	errno = EINVAL;
+	return -1;
+}
+
+
+/**
+  Test whether the descriptor that represents a context is set in fds.
+  errno is set to EINVAL on entry and not cleared afterwards.
+
+  @param ctx		Context to test.
+  @param fds		Set to look in.
+  @return		1 if the context's descriptor is valid and set, 0 if
+  			not, -1 for NULL arguments or an unknown context
+  			type.
+ */
+int
+msg_fd_isset(msgctx_t *ctx, fd_set *fds)
+{
+	int fd, hit;
+
+	errno = EINVAL;
+
+	if (!fds || !ctx)
+		return -1;
+
+	if (ctx->type == MSG_CLUSTER) {
+		/* The select pipe is read under the context mutex */
+		pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+		fd = ctx->u.cluster_info.select_pipe[0];
+		hit = (fd >= 0 && FD_ISSET(fd, fds)) ? 1 : 0;
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+		return hit;
+	}
+
+	if (ctx->type == MSG_SOCKET) {
+		fd = ctx->u.local_info.sockfd;
+		return (fd >= 0 && FD_ISSET(fd, fds)) ? 1 : 0;
+	}
+
+	return -1;
+}
+
+
+/**
+  Remove the descriptor that represents a context from fds.
+  errno is set to EINVAL on entry and not cleared afterwards.
+
+  @param ctx		Context whose descriptor is to be cleared.
+  @param fds		Set to modify.
+  @return		1 if the context had a valid descriptor (which has
+  			been cleared), 0 if it had none, -1 for NULL
+  			arguments or an unknown context type.
+ */
+int
+msg_fd_clr(msgctx_t *ctx, fd_set *fds)
+{
+	int fd, had_fd = 0;
+
+	errno = EINVAL;
+
+	if (!fds || !ctx)
+		return -1;
+
+	if (ctx->type == MSG_CLUSTER) {
+		/* The select pipe is read under the context mutex */
+		pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+		fd = ctx->u.cluster_info.select_pipe[0];
+		if (fd >= 0) {
+			FD_CLR(fd, fds);
+			had_fd = 1;
+		}
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+		return had_fd;
+	}
+
+	if (ctx->type == MSG_SOCKET) {
+		fd = ctx->u.local_info.sockfd;
+		if (fd < 0)
+			return 0;
+		FD_CLR(fd, fds);
+		return 1;
+	}
+
+	return -1;
+}
+
+
+/**
+  This polls the context for 'timeout' seconds waiting for data
+  to become available.  Return codes are M_DATA, M_CLOSE, and M_OPEN
+
+  M_DATA - data available
+  M_OPEN - needs msg_accept()
+  M_CLOSE - context / socket closed by remote host
+  M_NONE - nothing available
+
+  For the cluster connection, the return code could also map to one of
+  the CMAN return codes
+
+  M_STATECHANGE - node has changed state
+
+  Cluster contexts are handled by cluster_msg_wait(), socket contexts by
+  local_msg_wait().  A NULL context or an unknown context type yields -1
+  with errno set to EINVAL.
+ */
+int
+msg_wait(msgctx_t *ctx, int timeout)
+{
+	if (ctx) {
+		if (ctx->type == MSG_CLUSTER)
+			return cluster_msg_wait(ctx, timeout);
+		if (ctx->type == MSG_SOCKET)
+			return local_msg_wait(ctx, timeout);
+	}
+
+	errno = EINVAL;
+	return -1;
+}
+
+
+/**
+  Take the message at the head of a cluster context's queue and act on
+  its control code.
+
+  @param ctx		Cluster context to receive from.
+  @param msg		If non-NULL, receives a pointer to the payload of an
+  			M_DATA message; the caller must free() it.  Set to
+  			NULL for every other outcome.
+  @param len		If non-NULL, receives the payload length of an M_DATA
+  			message, 0 otherwise.
+  @return		Payload length for M_DATA; 0 for M_CLOSE and
+  			M_OPEN_ACK (which only update remote_ctx); -1 for
+  			M_OPEN or unknown control codes, for an out-of-range
+  			local context ID (errno EBADF), or for an empty queue
+  			(errno EAGAIN).
+ */
+int
+_cluster_msg_receive(msgctx_t *ctx, void **msg, size_t *len)
+{
+	cluster_msg_hdr_t *m;
+	msg_q_t *n;
+	int ret = 0;
+
+	/* Outputs start out cleared so every early return leaves them sane */
+	if (msg)
+		*msg = NULL;
+	if (len)
+		*len = 0;
+
+	if (ctx->u.cluster_info.local_ctx < 0 ||
+	    ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
+		errno = EBADF;
+		return -1;
+	}
+
+	/* trigger receive here */
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+
+	n = ctx->u.cluster_info.queue;
+	if (n == NULL) {
+		pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+		errno = EAGAIN;
+		return -1;
+	}
+
+	/* Unlink under the mutex; from here on the node is ours alone */
+	list_remove(&ctx->u.cluster_info.queue, n);
+
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+
+	m = n->message;
+	switch(m->msg_control) {
+	case M_CLOSE:
+		/* Peer closed: forget its context ID */
+		ctx->u.cluster_info.remote_ctx = 0;
+		break;
+	case M_OPEN_ACK:
+		/* Response to new connection */
+		ctx->u.cluster_info.remote_ctx = m->src_ctx;
+		break;
+	case M_DATA:
+		/* Kill the message control structure */
+		/* (slide the payload down over the header, in place) */
+		memmove(m, &m[1], n->len - sizeof(*m));
+		if (msg)
+			*msg = (void *)m;
+		else {
+			printf("Warning: dropping data message\n");
+			free(m);
+		}
+		if (len)
+			*len = (n->len - sizeof(*m));
+		ret = (n->len - sizeof(*m));
+		/* Only the queue node is freed here; the buffer now belongs
+		   to the caller (or was freed just above) */
+		free(n);
+
+		printf("Message received\n");
+		return ret;
+	case M_OPEN:
+		/* Someone is trying to open a connection */
+	default:
+		/* ?!! */
+		ret = -1;
+		break;
+	}
+
+	/* Non-data messages are consumed entirely */
+	free(m);
+	free(n);
+
+	return ret;
+}
+
+
+/**
+  Receive one message from a cluster context into a caller-supplied
+  buffer.
+
+  Waits via cluster_msg_wait(ctx, timeout).  For a data message, at most
+  maxlen bytes of the payload are copied into msg; any excess is
+  discarded.
+
+  @param ctx      Cluster context to receive on.
+  @param msg      Destination buffer (must not be NULL).
+  @param maxlen   Size of the destination buffer (must not be 0).
+  @param timeout  Passed through to cluster_msg_wait().
+  @return         For a data message, the value returned by
+                  _cluster_msg_receive() (the full payload length, which
+                  may exceed maxlen); 0 if the wait reported nothing;
+                  -1 on error (errno = EINVAL for a bad buffer,
+                  ECONNRESET if the wait reported M_CLOSE).
+ */
+static int
+cluster_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+	int req;
+	void *priv_msg = NULL;
+	size_t priv_len = 0;
+
+	if (!msg || !maxlen) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	req = cluster_msg_wait(ctx, timeout);
+
+	switch (req) {
+	case M_DATA:
+		req = _cluster_msg_receive(ctx, &priv_msg, &priv_len);
+		if (req < 0) {
+			printf("Ruh roh!\n");
+			return -1;
+		}
+
+		/* priv_msg stays NULL when the dequeued message turned out
+		   not to be data; never hand memcpy() a NULL source */
+		if (priv_msg) {
+			if (priv_len > maxlen)
+				priv_len = maxlen;
+			memcpy(msg, priv_msg, priv_len);
+			free(priv_msg);
+		}
+		return req;
+	case M_CLOSE:
+		errno = ECONNRESET;
+		return -1;
+	case 0:
+		/* Nothing on queue */
+		return 0;
+	default:
+		printf("PROTOCOL ERROR: Received %d\n", req);
+		return -1;
+	}
+}
+
+
+/*
+  Read one framed message -- a local_msg_hdr_t followed by msg_len bytes
+  of payload -- from the socket of a local context into 'msg'.
+
+  A timeout >= 0 is given to _read_retry() as a number of seconds; a
+  negative timeout passes no timeval at all.
+
+  Returns the result of the payload _read_retry(), 0 if the header read
+  returned 0, or -1 on error (errno = EIO for a truncated header).
+
+  NOTE(review): when the payload is larger than maxlen only maxlen bytes
+  are consumed; the remainder appears to stay in the socket and would be
+  parsed as the next header -- confirm whether callers can hit this.
+ */
+static int
+_local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+	struct timeval tv = {0, 0};
+	struct timeval *p = NULL;
+	local_msg_hdr_t h;
+	int n;
+
+	if (timeout >= 0) {
+		tv.tv_sec = timeout;
+		p = &tv;
+	}
+
+	/* Do not look at h unless the whole header actually arrived */
+	n = _read_retry(ctx->u.local_info.sockfd, &h, sizeof(h), p);
+	if (n < 0)
+		return -1;
+	if (n == 0)
+		return 0;
+	if ((size_t)n < sizeof(h)) {
+		errno = EIO;
+		return -1;
+	}
+
+	if (maxlen < h.msg_len) {
+		printf("WARNING: Buffer too small for message!\n");
+		h.msg_len = maxlen;
+	}
+
+	return _read_retry(ctx->u.local_info.sockfd, msg, h.msg_len, p);
+}
+
+
+/**
+  Receive one message from a local (socket) context into a
+  caller-supplied buffer.
+
+  Mirrors cluster_msg_receive(): wait for activity, then read the
+  message into a private buffer and copy out at most maxlen bytes.
+
+  @param ctx      Socket context to receive on.
+  @param msg      Destination buffer (must not be NULL).
+  @param maxlen   Size of the destination buffer (must not be 0).
+  @param timeout  Passed to local_msg_wait() and _local_msg_receive().
+  @return         Number of bytes read for a data message; 0 if the wait
+                  reported nothing; -1 on error (errno = EINVAL for a
+                  bad buffer, ECONNRESET if the wait reported M_CLOSE).
+
+  NOTE(review): assumes local_msg_wait() reports the same M_* codes that
+  the msg_wait() documentation lists -- confirm.
+ */
+static int
+local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+	int req;
+	char priv_msg[4096];
+	size_t copy_len;
+
+	if (!msg || !maxlen) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	req = local_msg_wait(ctx, timeout);
+
+	switch (req) {
+	case M_DATA:
+		/* Read into the private buffer, bounded by its real size */
+		req = _local_msg_receive(ctx, priv_msg, sizeof(priv_msg),
+					 timeout);
+		if (req <= 0)
+			return -1;
+
+		/* Copy out only what was read and what the caller can hold */
+		copy_len = ((size_t)req < maxlen ? (size_t)req : maxlen);
+		memcpy(msg, priv_msg, copy_len);
+
+		/* msg belongs to the caller and priv_msg lives on the
+		   stack: there is nothing to free here */
+		return req;
+	case M_CLOSE:
+		errno = ECONNRESET;
+		return -1;
+	case 0:
+		/* Nothing on queue */
+		return 0;
+	default:
+		printf("PROTOCOL ERROR: Received %d\n", req);
+		return -1;
+	}
+}
+
+
+/*
+  Receive a message on any context type, copying at most maxlen bytes
+  into msg.  Fails with -1 / EINVAL for a NULL context, a NULL or
+  zero-length buffer, or a context type with no receive implementation;
+  otherwise returns whatever the transport-specific receiver returns.
+ */
+int
+msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout)
+{
+	if (ctx != NULL && msg != NULL && maxlen != 0) {
+		if (ctx->type == MSG_CLUSTER)
+			return cluster_msg_receive(ctx, msg, maxlen, timeout);
+		if (ctx->type == MSG_SOCKET)
+			return local_msg_receive(ctx, msg, maxlen, timeout);
+	}
+
+	errno = EINVAL;
+	return -1;
+}
+
+
+/**
+  Open a connection to the specified node ID.
+  If the specified node is CMAN_NODEID_US, this connects through
+  local_connect() instead of going over the cluster transport.
+
+  @param nodeid   Target node, or CMAN_NODEID_US for the local socket.
+  @param port     Port stored in the cluster context.
+  @param ctx      Caller-provided context; it is zeroed and initialized
+                  here.
+  @param timeout  Maximum number of msg_wait(ctx, 1) rounds to wait for
+                  the connection to become established.
+  @return         0 on success, -1 on failure (errno = EINVAL for a NULL
+                  context, ETIMEDOUT if the peer never answered).
+ */
+int
+msg_open(int nodeid, int port, msgctx_t *ctx, int timeout)
+{
+	int t = 0;
+	int e;
+
+	errno = EINVAL;
+	if (!ctx)
+		return -1;
+
+	memset(ctx, 0, sizeof(*ctx));
+	if (nodeid == CMAN_NODEID_US) {
+		if ((ctx->u.local_info.sockfd = local_connect()) < 0) {
+			return -1;
+		}
+		ctx->type = MSG_SOCKET;
+		return 0;
+	}
+
+	ctx->type = MSG_CLUSTER;
+	ctx->u.cluster_info.nodeid = nodeid;
+	ctx->u.cluster_info.port = port;
+	ctx->u.cluster_info.local_ctx = -1;
+	ctx->u.cluster_info.remote_ctx = 0;
+	ctx->u.cluster_info.queue = NULL;
+	ctx->u.cluster_info.select_pipe[0] = -1;
+	ctx->u.cluster_info.select_pipe[1] = -1;
+	pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL);
+	pthread_cond_init(&ctx->u.cluster_info.cond, NULL);
+
+	/* Assign context index */
+	if (assign_ctx(ctx) < 0)
+		return -1;
+
+	/* Send open */
+	if (cluster_send_control_msg(ctx, M_OPEN) < 0) {
+		/* Give back the context index taken above, as the timeout
+		   path below does, so no stale pointer to the caller's
+		   context stays registered.  remote_ctx is still 0, so
+		   msg_close() will not try to send an M_CLOSE. */
+		e = errno;
+		msg_close(ctx);
+		errno = e;
+		return -1;
+	}
+
+	/* Ok, wait for a response */
+	while (!is_established(ctx)) {
+		++t;
+		if (t > timeout) {
+			msg_close(ctx);
+			errno = ETIMEDOUT;
+			return -1;
+		}
+
+		switch(msg_wait(ctx, 1)) {
+		case M_OPEN_ACK:
+			/* Consuming the ACK records the remote context */
+			_cluster_msg_receive(ctx, NULL, NULL);
+			break;
+		case M_NONE:
+			continue;
+		default:
+			printf("PROTO ERROR: M_OPEN_ACK not received \n");
+		}
+	}
+
+	return 0;
+}
+
+
+/**
+  Close a connection context (cluster or socket; it doesn't matter)
+  In the case of a cluster context, we need to clear out the
+  receive queue and what-not.  This isn't a big deal.  Also, we
+  need to tell the other end that we're done -- just in case it does
+  not know yet ;)
+
+  With a socket, the O/S cleans up the buffers for us.
+
+  @param ctx  Context to close.
+  @return     0 on success; -1 with errno = EINVAL for a NULL context,
+              an unknown context type, or a cluster context whose local
+              index is outside [0, MAX_CONTEXTS).
+ */
+int
+msg_close(msgctx_t *ctx)
+{
+	msg_q_t *n = NULL;
+
+	if (!ctx) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	switch (ctx->type) {
+	case MSG_CLUSTER:
+		/* msg_open() leaves local_ctx at -1 until an index has been
+		   assigned, so the lower bound has to be checked as well
+		   before indexing contexts[] */
+		if (ctx->u.cluster_info.local_ctx < 0 ||
+		    ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) {
+			errno = EINVAL;
+			return -1;
+		}
+		pthread_mutex_lock(&context_lock);
+		/* Other threads should not be able to see this again */
+		if (contexts[ctx->u.cluster_info.local_ctx])
+			contexts[ctx->u.cluster_info.local_ctx] = NULL;
+		pthread_mutex_unlock(&context_lock);
+		/* Clear receive queue */
+		while ((n = ctx->u.cluster_info.queue) != NULL) {
+			list_remove(&ctx->u.cluster_info.queue, n);
+			free(n->message);
+			free(n);
+		}
+		/* Send close message */
+		if (ctx->u.cluster_info.remote_ctx != 0) {
+			cluster_send_control_msg(ctx, M_CLOSE);
+		}
+
+		/* Close pipe if it's open */
+		if (ctx->u.cluster_info.select_pipe[0] >= 0) {
+			close(ctx->u.cluster_info.select_pipe[0]);
+			ctx->u.cluster_info.select_pipe[0] = -1;
+		}
+		if (ctx->u.cluster_info.select_pipe[1] >= 0) {
+			close(ctx->u.cluster_info.select_pipe[1]);
+			ctx->u.cluster_info.select_pipe[1] = -1;
+		}
+		return 0;
+	case MSG_SOCKET:
+		close(ctx->u.local_info.sockfd);
+		ctx->u.local_info.sockfd = -1;
+		return 0;
+	default:
+		break;
+	}
+
+	errno = EINVAL;
+	return -1;
+}
+
+
+/**
+  Called by cman_dispatch to deal with messages coming across the
+  cluster socket.  This function deals with fanning out the requests
+  and putting them on the per-context queues.  We don't have
+  the benefits of pre-configured buffers, so we need this.
+
+  The header in 'buf' is byte-swapped in place, the whole message is
+  copied to a freshly allocated queue node, and the node is appended to
+  the queue of the context named by the header's dest_ctx.  Messages
+  for an out-of-range or unregistered context are dropped.
+
+  @param h       CMAN handle (unused here).
+  @param priv    Private data (unused here).
+  @param buf     Raw message, starting with a cluster_msg_hdr_t.
+  @param len     Length of buf in bytes.
+  @param port    Port the message arrived on (unused here).
+  @param nodeid  Sending node (unused here).
+ */
+static void
+process_cman_msg(cman_handle_t h, void *priv, char *buf, int len,
+	    uint8_t port, int nodeid)
+{
+	cluster_msg_hdr_t *m = (cluster_msg_hdr_t *)buf;
+	msg_q_t *node;
+	msgctx_t *ctx;
+
+	/* len is signed: a negative value would be converted to a huge
+	   unsigned number by the sizeof comparison and slip through */
+	if (len < 0 || (size_t)len < sizeof(*m)) {
+		printf("Message too short.\n");
+		return;
+	}
+
+	swab_cluster_msg_hdr_t(m);
+
+	if (m->dest_ctx >= MAX_CONTEXTS) {
+		printf("Context invalid; ignoring\n");
+		return;
+	}
+
+	/* Allocate before taking any lock; retry until memory shows up */
+	while ((node = malloc(sizeof(*node))) == NULL) {
+		sleep(1);
+	}
+	memset(node, 0, sizeof(*node));
+	while ((node->message = malloc(len)) == NULL) {
+		sleep(1);
+	}
+	memcpy(node->message, buf, len);
+	node->len = len;
+
+	pthread_mutex_lock(&context_lock);
+	ctx = contexts[m->dest_ctx];
+	if (!ctx) {
+		/* We received a close for something we've already
+		   detached from our list.  No big deal, just
+		   ignore. */
+		free(node->message);
+		free(node);
+		pthread_mutex_unlock(&context_lock);
+		return;
+	}
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	list_insert(&ctx->u.cluster_info.queue, node);
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+	/* If a select pipe was set up, wake it up */
+	if (ctx->u.cluster_info.select_pipe[1] >= 0)
+		write(ctx->u.cluster_info.select_pipe[1], "", 1);
+
+	/* Signal while context_lock is still held: msg_close() unhooks a
+	   context from contexts[] under that lock, after which its owner
+	   may free it, so ctx must not be touched once we let go. */
+	pthread_cond_signal(&ctx->u.cluster_info.cond);
+	pthread_mutex_unlock(&context_lock);
+}
+
+
+/**
+  Accept a new pseudo-private connection coming in over the
+  cluster socket.
+
+  Scans the listening context's queue for the first M_OPEN request,
+  removes it, initializes 'acceptctx' from its header, registers the new
+  context and answers with M_OPEN_ACK.
+
+  @param listenctx  Listening cluster context (local index must be 0).
+  @param acceptctx  Context to initialize for the new connection.
+  @return           0 if a connection was accepted.  -1 otherwise:
+                    errno = EINVAL for bad arguments, EAGAIN if no
+                    M_OPEN request is queued; if the new context could
+                    not be registered, errno is whatever assign_ctx()
+                    left behind.
+
+  NOTE(review): assign_ctx() is called with the listen queue mutex held;
+  if it takes context_lock, that is the reverse of the order used by
+  process_cman_msg() -- confirm there is no deadlock window.
+ */
+static int
+cluster_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
+{
+	cluster_msg_hdr_t *m;
+	msg_q_t *n;
+	char done = 0;
+	char foo;
+	int ret = 0;
+
+	errno = EINVAL;
+	if (!listenctx || !acceptctx)
+		return -1;
+	if (listenctx->u.cluster_info.local_ctx != 0)
+		return -1;
+
+	pthread_mutex_lock(&listenctx->u.cluster_info.mutex);
+
+	n = listenctx->u.cluster_info.queue;
+	if (n == NULL) {
+		pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
+		errno = EAGAIN;
+		return -1;
+	}
+
+	/* the OPEN should be the first thing on the list; this loop
+	   is probably not necessary */
+	list_do(&listenctx->u.cluster_info.queue, n) {
+
+		m = n->message;
+		switch(m->msg_control) {
+		case M_OPEN:
+			list_remove(&listenctx->u.cluster_info.queue, n);
+
+			/* New connection */
+			pthread_mutex_init(&acceptctx->u.cluster_info.mutex,
+					   NULL);
+			pthread_cond_init(&acceptctx->u.cluster_info.cond,
+					  NULL);
+			acceptctx->u.cluster_info.queue = NULL;
+			acceptctx->u.cluster_info.remote_ctx = m->src_ctx;
+			acceptctx->u.cluster_info.nodeid = m->src_nodeid;
+			acceptctx->u.cluster_info.port = m->msg_port;
+
+			/* Only acknowledge once the new context really has
+			   a local index the peer can address */
+			if (assign_ctx(acceptctx) < 0)
+				ret = -1;
+			else
+				cluster_send_control_msg(acceptctx,
+							 M_OPEN_ACK);
+
+			/* The request is consumed either way, so take its
+			   wake-up byte off the select pipe */
+			if (listenctx->u.cluster_info.select_pipe[0] >= 0) {
+				read(listenctx->u.cluster_info.select_pipe[0],
+				     &foo, 1);
+			}
+
+			done = 1;
+			free(m);
+			free(n);
+
+			break;
+		case M_DATA:
+			/* Data messages (i.e. from broadcast msgs) are
+			   okay too!...  but we don't handle them here */
+			break;
+		default:
+			/* Other message?! */
+			printf("Odd... %d\n", m->msg_control);
+			break;
+		}
+
+		/* n has been freed at this point; leave before the loop
+		   condition looks at it again */
+		if (done)
+			break;
+
+	} while (!list_done(&listenctx->u.cluster_info.queue, n));
+
+	pthread_mutex_unlock(&listenctx->u.cluster_info.mutex);
+
+	if (!done) {
+		/* Nothing was accepted: acceptctx was never touched, so do
+		   not report success */
+		errno = EAGAIN;
+		return -1;
+	}
+
+	return ret;
+}
+
+
+/* XXX INCOMPLETE */
+/*
+  Accept a pending connection on a listening context.
+
+  Cluster contexts are handled by cluster_msg_accept().  Socket contexts
+  are not implemented yet: the call returns 0 without touching
+  acceptctx.
+
+  Returns -1 for an unknown context type, and -1 with errno = EINVAL for
+  a NULL listenctx.
+ */
+int
+msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx)
+{
+	/* listenctx is dereferenced right below, so reject NULL here
+	   like the other public entry points do */
+	if (!listenctx) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	switch(listenctx->type) {
+	case MSG_CLUSTER:
+		return cluster_msg_accept(listenctx, acceptctx);
+	case MSG_SOCKET:
+		return 0;
+	default:
+		break;
+	}
+
+	return -1;
+}
+
+
+/*
+  Create the local listening socket at RGMGR_SOCK.
+
+  Any existing file at that path is unlinked first.  The socket is bound
+  with the umask temporarily set to 077, and the previous umask is
+  restored afterwards.
+
+  Returns the listening descriptor, or -1 on failure with errno left as
+  set by the call that failed.
+ */
+static int
+local_listener_sk(void)
+{
+	int sock;
+	int ret;
+	int e;
+	struct sockaddr_un su;
+	mode_t om;
+
+	sock = socket(PF_LOCAL, SOCK_STREAM, 0);
+	if (sock < 0)
+		return -1;
+
+	unlink(RGMGR_SOCK);
+
+	/* Start from a fully zeroed address; the path goes through "%s"
+	   so it is never interpreted as a format string */
+	memset(&su, 0, sizeof(su));
+	su.sun_family = AF_LOCAL;
+	snprintf(su.sun_path, sizeof(su.sun_path), "%s", RGMGR_SOCK);
+
+	om = umask(077);
+	ret = bind(sock, (struct sockaddr *)&su, sizeof(su));
+	umask(om);
+	if (ret < 0)
+		goto fail;
+
+	if (listen(sock, SOMAXCONN) < 0)
+		goto fail;
+
+	return sock;
+fail:
+	/* close() may overwrite errno; keep the original failure reason */
+	e = errno;
+	close(sock);
+	errno = e;
+	return -1;
+}
+
+
+/**
+  Body of the permanent communications thread: keeps calling
+  poll_cluster_messages() for as long as thread_running stays set.
+  (An earlier design had no permanent threads, but that model proved
+  difficult to implement correctly.)
+
+  The argument is unused and the thread always returns NULL.
+ */
+static void *
+cluster_comms_thread(void *arg)
+{
+	for (;;) {
+		if (!thread_running)
+			break;
+		/* presumably a timeout in seconds -- TODO confirm */
+		poll_cluster_messages(2);
+	}
+
+	return NULL;
+}
+
+
+/*
+   Transliterates a CMAN event to a control message
+
+   The event is turned into a queue node holding a cluster_msg_hdr_t
+   followed by one int carrying 'arg', and appended to the queue of the
+   cluster context in contexts[0].  The event is dropped if that
+   context is not registered.
+
+   NOTE(review): an unrecognized 'reason' is still queued, with
+   msg_control left at the zero from memset -- confirm that this is
+   intended.
+ */
+static void
+process_cman_event(cman_handle_t handle, void *private, int reason, int arg)
+{
+	cluster_msg_hdr_t *msg;
+	msg_q_t *node;
+	msgctx_t *ctx;
+
+	/* Allocate queue node */
+	while ((node = malloc(sizeof(*node))) == NULL) {
+		sleep(1);
+	}
+	memset(node, 0, sizeof(*node));
+
+	/* Allocate message: header + int (for arg) */
+	while ((msg = malloc(sizeof(int) +
+			     sizeof(cluster_msg_hdr_t))) == NULL) {
+		sleep(1);
+	}
+	memset(msg, 0, sizeof(int)+sizeof(cluster_msg_hdr_t));
+
+
+	switch(reason) {
+#if defined(LIBCMAN_VERSION) && LIBCMAN_VERSION >= 2
+	case CMAN_REASON_PORTOPENED:
+		msg->msg_control = M_PORTOPENED;
+		break;
+	case CMAN_REASON_TRY_SHUTDOWN:
+		msg->msg_control = M_TRY_SHUTDOWN;
+		break;
+#endif
+	case CMAN_REASON_PORTCLOSED:
+		msg->msg_control = M_PORTCLOSED;
+		break;
+	case CMAN_REASON_STATECHANGE:
+		msg->msg_control = M_STATECHANGE;
+		break;
+	}
+
+	/* Store arg right behind the header.  memcpy avoids both void
+	   pointer arithmetic and any alignment assumption about the
+	   header size. */
+	memcpy(msg + 1, &arg, sizeof(arg));
+
+	node->len = sizeof(cluster_msg_hdr_t) + sizeof(int);
+	node->message = msg;
+
+	pthread_mutex_lock(&context_lock);
+	ctx = contexts[0]; /* This is the cluster context... */
+	if (!ctx) {
+		/* The cluster context is not registered; nobody could
+		   receive this event, so drop it. */
+		free(node->message);
+		free(node);
+		pthread_mutex_unlock(&context_lock);
+		return;
+	}
+
+	pthread_mutex_lock(&ctx->u.cluster_info.mutex);
+	list_insert(&ctx->u.cluster_info.queue, node);
+	pthread_mutex_unlock(&ctx->u.cluster_info.mutex);
+	/* If a select pipe was set up, wake it up */
+	if (ctx->u.cluster_info.select_pipe[1] >= 0)
+		write(ctx->u.cluster_info.select_pipe[1], "", 1);
+
+	/* Signal before dropping context_lock: once the lock is gone the
+	   context may be unregistered and freed by its owner. */
+	pthread_cond_signal(&ctx->u.cluster_info.cond);
+	pthread_mutex_unlock(&context_lock);
+}
+
+
+/* XXX INCOMPLETE */
+/*
+  Set up messaging for a cluster handle: create the local listening
+  socket context, register the CMAN data and notification callbacks,
+  install the cluster context as contexts[0] and start the detached
+  communications thread.
+
+  @param ch  Cluster handle; ch->local_ctx and ch->cluster_ctx are set
+             on success.
+  @return    0 on success.  -1 on failure, with errno preserved from the
+             failing call, both contexts released and ch->local_ctx
+             reset to NULL.
+
+  NOTE(review): failures of local_listener_sk() and pthread_create() are
+  still not reported, as before.
+ */
+int
+msg_init(chandle_t *ch)
+{
+	int e;
+	pthread_attr_t attrs;
+	msgctx_t *lctx;
+	msgctx_t *cctx = NULL;
+
+	if (!ch) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	pthread_mutex_lock(&ch->c_lock);
+
+	/* Set up local context */
+	lctx = msg_new_ctx();
+	if (!lctx) {
+		pthread_mutex_unlock(&ch->c_lock);
+		return -1;
+	}
+
+	lctx->type = MSG_SOCKET;
+	lctx->u.local_info.sockfd = local_listener_sk();
+	lctx->u.local_info.flags = SKF_LISTEN;
+
+	ch->local_ctx = lctx;
+
+	/* Set up cluster context */
+	cctx = msg_new_ctx();
+	if (!cctx) {
+		e = errno;
+		goto fail;
+	}
+
+	gch = ch;
+
+	if (cman_start_recv_data(ch->c_cluster, process_cman_msg,
+				 RG_PORT) != 0) {
+		e = errno;
+		goto fail;
+	}
+
+	if (cman_start_notification(ch->c_cluster, process_cman_event) != 0) {
+		e = errno;
+		/* Undo the data callback registered just above */
+		cman_end_recv_data(ch->c_cluster);
+		goto fail;
+	}
+
+	ch->cluster_ctx = cctx;
+	pthread_mutex_unlock(&ch->c_lock);
+
+	pthread_mutex_lock(&context_lock);
+
+	memset(contexts, 0, sizeof(contexts));
+	contexts[0] = cctx;
+
+	cctx->type = MSG_CLUSTER;
+	cctx->u.cluster_info.port = RG_PORT; /* port! */
+	cctx->u.cluster_info.nodeid = 0; /* Broadcast! */
+	/* This context lives in slot 0; set the remaining fields
+	   explicitly instead of relying on the allocator */
+	cctx->u.cluster_info.local_ctx = 0;
+	cctx->u.cluster_info.remote_ctx = 0;
+	cctx->u.cluster_info.queue = NULL;
+	cctx->u.cluster_info.select_pipe[0] = -1;
+	cctx->u.cluster_info.select_pipe[1] = -1;
+	pthread_mutex_init(&cctx->u.cluster_info.mutex, NULL);
+	pthread_cond_init(&cctx->u.cluster_info.cond, NULL);
+	pthread_mutex_unlock(&context_lock);
+
+	pthread_attr_init(&attrs);
+	pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED);
+	pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
+
+	thread_running = 1;
+	pthread_create(&comms_thread, &attrs, cluster_comms_thread, NULL);
+
+	pthread_attr_destroy(&attrs);
+
+	return 0;
+
+fail:
+	/* Close the listener socket, then release both contexts.  cctx is
+	   NULL if its allocation was the step that failed, which
+	   msg_free_ctx() passes straight to free(). */
+	msg_close(lctx);
+	msg_free_ctx(lctx);
+	msg_free_ctx(cctx);
+	ch->local_ctx = NULL;
+	pthread_mutex_unlock(&ch->c_lock);
+	errno = e;
+	return -1;
+}
+
+
+/*
+  Print the node ID and remote context of the cluster context stored at
+  index 'ctx' of the contexts[] table.
+
+  Returns 0 if something was printed, -1 if the index is out of range or
+  no context is registered there.
+ */
+int
+msg_print_ctx(int ctx)
+{
+	/* Never index contexts[] with an unchecked caller value */
+	if (ctx < 0 || ctx >= MAX_CONTEXTS)
+		return -1;
+
+	if (!contexts[ctx])
+		return -1;
+
+	printf("Cluster Message Context %d\n", ctx);
+	printf("  Node ID %d\n", contexts[ctx]->u.cluster_info.nodeid);
+	printf("  Remote %d\n", contexts[ctx]->u.cluster_info.remote_ctx);
+	return 0;
+}
+
+
+/* XXX INCOMPLETE */
+/*
+  Stop the communications thread and release the contexts created by
+  msg_init().
+
+  @param ch  Cluster handle that was passed to msg_init().
+  @return    0 on success, -1 with errno = EINVAL for a NULL handle.
+
+  NOTE(review): the comms thread is created detached, so it cannot be
+  joined; probing it with pthread_kill() after it has exited is not
+  guaranteed to be safe -- consider a joinable thread instead.
+ */
+int
+msg_shutdown(chandle_t *ch)
+{
+	if (!ch) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	/* Tell the comms thread to leave its loop; without this the wait
+	   below has nothing that would ever end it */
+	thread_running = 0;
+
+	while (pthread_kill(comms_thread, 0) == 0)
+		sleep(1);
+
+	pthread_mutex_lock(&ch->c_lock);
+
+	/* xxx purge everything */
+	msg_close(ch->local_ctx);
+	cman_end_recv_data(ch->c_cluster);
+
+	/* Unhook the cluster context before freeing it so contexts[0]
+	   does not keep pointing at released memory */
+	pthread_mutex_lock(&context_lock);
+	if (contexts[0] == ch->cluster_ctx)
+		contexts[0] = NULL;
+	pthread_mutex_unlock(&context_lock);
+
+	msg_free_ctx(ch->local_ctx);
+	msg_free_ctx(ch->cluster_ctx);
+
+
+	pthread_mutex_unlock(&ch->c_lock);
+
+	return 0;
+}
+
+
+/*
+  Report the size in bytes of a message context, for callers that need
+  to allocate one themselves.
+ */
+inline int
+msgctx_size(void)
+{
+	const size_t bytes = sizeof(msgctx_t);
+
+	return (int)bytes;
+}
+
+
+/*
+  Allocate a new message context.
+
+  The whole structure is zero-filled and its type is set to MSG_NONE.
+  Release it with msg_free_ctx().
+
+  Returns the new context, or NULL if the allocation failed.
+ */
+msgctx_t *
+msg_new_ctx(void)
+{
+	msgctx_t *p;
+
+	/* sizeof yields a size_t; convert it to match the %d format */
+	printf("Alloc %d\n", (int)sizeof(msgctx_t));
+
+	/* calloc clears the entire object, not just sizeof(pointer)
+	   bytes of it */
+	p = calloc(1, sizeof(*p));
+	if (!p)
+		return NULL;
+
+	p->type = MSG_NONE;
+
+	return p;
+}
+
+
+/*
+  Release the memory of a message context.  This only calls free() on
+  the pointer; it does not close the context or touch the contexts[]
+  table.
+ */
+void
+msg_free_ctx(msgctx_t *dead)
+{
+	free(dead);
+}
+



                 reply	other threads:[~2006-06-13 19:22 UTC|newest]

Thread overview: [no followups] expand[flat|nested]  mbox.gz  Atom feed

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20060613192239.27217.qmail@sourceware.org \
    --to=lhh@sourceware.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.