From mboxrd@z Thu Jan 1 00:00:00 1970 From: lhh@sourceware.org Date: 13 Jun 2006 19:22:39 -0000 Subject: [Cluster-devel] cluster/rgmanager/src clulib/fdops.c clulib/lo ... Message-ID: <20060613192239.27217.qmail@sourceware.org> List-Id: To: cluster-devel.redhat.com MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit CVSROOT: /cvs/cluster Module name: cluster Changes by: lhh at sourceware.org 2006-06-13 19:22:38 Added files: rgmanager/src/clulib: fdops.c lock.c lockspace.c members.c message.c Removed files: rgmanager/src/daemons: members.c Log message: Include missing .c files in src/clulib; remove defunct src/daemons/members.c Patches: http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/fdops.c.diff?cvsroot=cluster&r1=NONE&r2=1.1 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lock.c.diff?cvsroot=cluster&r1=NONE&r2=1.1 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/lockspace.c.diff?cvsroot=cluster&r1=NONE&r2=1.1 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/members.c.diff?cvsroot=cluster&r1=NONE&r2=1.1 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/clulib/message.c.diff?cvsroot=cluster&r1=NONE&r2=1.1 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/rgmanager/src/daemons/members.c.diff?cvsroot=cluster&r1=1.4&r2=NONE /cvs/cluster/cluster/rgmanager/src/clulib/fdops.c,v --> standard output revision 1.1 --- cluster/rgmanager/src/clulib/fdops.c +++ - 2006-06-13 19:22:38.586032000 +0000 @@ -0,0 +1,193 @@ +/* + Copyright Red Hat, Inc. 2002-2003 + + This program is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation; either version 2, or (at your option) any + later version. 
/*
   (License notice of fdops.c, continued from the text above.)

   This program is distributed in the hope that it will be useful, but
   WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to the
   Free Software Foundation, Inc., 675 Mass Ave, Cambridge,
   MA 02139, USA.
*/
/** @file
 * Wrapper functions around read/write/select to retry in the event
 * of interrupts.
 */
/*
 * NOTE(review): the archived copy lost the header names of the four
 * #include directives that stood here.  The headers below are the standard
 * ones the code in this file needs.
 */
#include <errno.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

/**
 * Tell whether a descriptor may be stored in an fd_set.
 *
 * FD_SET()/FD_ISSET() on a negative descriptor or on one that is
 * >= FD_SETSIZE is undefined behaviour, so callers check first.
 *
 * @param fd		Descriptor to test.
 * @return		1 if usable; 0 (with errno set to EBADF) if not.
 */
static int
fd_selectable(int fd)
{
	if (fd < 0 || fd >= FD_SETSIZE) {
		errno = EBADF;
		return 0;
	}
	return 1;
}

/**
 * This is a wrapper around select which will retry in the case we receive
 * EINTR.  This is necessary for _read_retry, since it wouldn't make sense
 * to have _read_retry terminate if and only if two EINTRs were received
 * in a row - one during the read() call, one during the select call...
 *
 * See select(2) for description of parameters.
 *
 * @return		Whatever select() returned last; -1 is only returned
 *			for errors other than EINTR (EBADF/EINVAL/ENOMEM).
 */
int
_select_retry(int fdmax, fd_set * rfds, fd_set * wfds, fd_set * xfds,
	      struct timeval *timeout)
{
	int rv;

	do {
		rv = select(fdmax, rfds, wfds, xfds, timeout);
	} while (rv == -1 && errno == EINTR);

	return rv;
}

/**
 * Retries a write in the event of a non-blocked interrupt signal.
 *
 * @param fd		File descriptor to which we are writing.
 * @param buf		Data buffer to send.
 * @param count		Number of bytes in buf to send.
 * @param timeout	(struct timeval) telling us how long we should retry.
 *			NULL waits without limit.
 * @return		The number of bytes written to the file descriptor,
 * 			or -1 on error (with errno set appropriately):
 *			EBADF if fd cannot be used with select(), ETIMEDOUT
 *			if select() timed out, EPIPE if the descriptor shows
 *			an exceptional condition or accepts 0 bytes.
 */
ssize_t
_write_retry(int fd, void *buf, int count, struct timeval * timeout)
{
	/* Byte pointer: arithmetic on void * is a GNU extension */
	const char *src = buf;
	ssize_t n;
	int total = 0, remain = count, rv = 0;
	fd_set wfds, xfds;

	if (count > 0 && !fd_selectable(fd))
		return -1;

	while (total < count) {

		/* Create the write FD set of 1... */
		FD_ZERO(&wfds);
		FD_SET(fd, &wfds);
		FD_ZERO(&xfds);
		FD_SET(fd, &xfds);

		/* wait for the fd to be available for writing */
		rv = _select_retry(fd + 1, NULL, &wfds, &xfds, timeout);
		if (rv == -1)
			return -1;
		if (rv == 0) {
			errno = ETIMEDOUT;
			return -1;
		}

		if (FD_ISSET(fd, &xfds)) {
			errno = EPIPE;
			return -1;
		}

		/*
		 * Attempt to write to fd
		 */
		n = write(fd, src + total, remain);

		/*
		 * When we know our fd was select()ed and we receive 0 bytes
		 * when we write, the fd was closed.
		 */
		if ((n == 0) && (rv == 1)) {
			errno = EPIPE;
			return -1;
		}

		if (n == -1) {
			/* Not ready?  Go around and select() again. */
			if ((errno == EAGAIN) || (errno == EINTR))
				continue;

			/* Other errors: EIO, EINVAL, etc */
			return -1;
		}

		total += (int)n;
		remain -= (int)n;
	}

	return total;
}

/**
 * Retry reads until we (a) time out or (b) get our data.  Of course, if
 * timeout is NULL, it'll wait forever.
 *
 * @param sockfd	File descriptor we want to read from.
 * @param buf		Preallocated buffer into which we will read data.
 * @param count		Number of bytes to read.
 * @param timeout	(struct timeval) describing how long we should retry.
 * @return 		The number of bytes read on success, or -1 on failure.
 *			Note that we will always return (count) or (-1).
 *			errno is EBADF if sockfd cannot be used with select(),
 *			ETIMEDOUT on timeout and EPIPE if the peer closed or
 *			the descriptor shows an exceptional condition.
 */
ssize_t
_read_retry(int sockfd, void *buf, int count, struct timeval * timeout)
{
	/* Byte pointer: arithmetic on void * is a GNU extension */
	char *dst = buf;
	ssize_t n;
	int total = 0, remain = count, rv = 0;
	fd_set rfds, xfds;

	if (count > 0 && !fd_selectable(sockfd))
		return -1;

	while (total < count) {
		FD_ZERO(&rfds);
		FD_SET(sockfd, &rfds);
		FD_ZERO(&xfds);
		FD_SET(sockfd, &xfds);

		/*
		 * Select on the socket, in case it closes while we're not
		 * looking...
		 */
		rv = _select_retry(sockfd + 1, &rfds, NULL, &xfds, timeout);
		if (rv == -1)
			return -1;
		if (rv == 0) {
			errno = ETIMEDOUT;
			return -1;
		}

		if (FD_ISSET(sockfd, &xfds)) {
			errno = EPIPE;
			return -1;
		}

		/*
		 * Attempt to read off the socket
		 */
		n = read(sockfd, dst + total, remain);

		/*
		 * When we know our socket was select()ed and we receive 0 bytes
		 * when we read, the socket was closed.
		 */
		if ((n == 0) && (rv == 1)) {
			errno = EPIPE;
			return -1;
		}

		if (n == -1) {
			/* Not ready?  Wait for data to become available */
			if ((errno == EAGAIN) || (errno == EINTR))
				continue;

			/* Other errors: EPIPE, EINVAL, etc */
			return -1;
		}

		total += (int)n;
		remain -= (int)n;
	}

	return total;
}

/*
 * ---- next file in this commit mail ----
 * /cvs/cluster/cluster/rgmanager/src/clulib/lock.c,v  -->  standard output
 * revision 1.1
 * --- cluster/rgmanager/src/clulib/lock.c
 * +++ - 2006-06-13 19:22:38.669122000 +0000
 * @@ -0,0 +1,153 @@
 */
/*
  Copyright Red Hat, Inc. 2004-2006

  The Magma Cluster API Library is free software; you can redistribute
  it and/or modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either version
  2.1 of the License, or (at your option) any later version.

  The Magma Cluster API Library is distributed in the hope that it will
  be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
  of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with this library; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
  USA.
 */
/** @file
 * Locking.
 */
+ */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +static void +ast_function(void * __attribute__ ((unused)) arg) +{ +} + + +static int +wait_for_dlm_event(dlm_lshandle_t *ls) +{ + fd_set rfds; + int fd = dlm_ls_get_fd(ls); + + FD_ZERO(&rfds); + FD_SET(fd, &rfds); + + if (select(fd + 1, &rfds, NULL, NULL, NULL) == 1) + return dlm_dispatch(fd); + + return -1; +} + + +int +clu_lock(dlm_lshandle_t ls, + int mode, + struct dlm_lksb *lksb, + int options, + char *resource) +{ + int ret; + + if (!ls || !lksb || !resource || !strlen(resource)) { + errno = EINVAL; + return -1; + } + + ret = dlm_ls_lock(ls, mode, lksb, options, resource, + strlen(resource), 0, ast_function, lksb, + NULL, NULL); + + if (ret < 0) { + if (errno == ENOENT) + assert(0); + + return -1; + } + + if ((ret = (wait_for_dlm_event(ls) < 0))) { + fprintf(stderr, "wait_for_dlm_event: %d / %d\n", + ret, errno); + return -1; + } + + return 0; +} + + + +dlm_lshandle_t +clu_acquire_lockspace(const char *lsname) +{ + dlm_lshandle_t ls = NULL; + + while (!ls) { + ls = dlm_open_lockspace(lsname); + if (ls) + break; + + ls = dlm_create_lockspace(lsname, 0644); + if (ls) + break; + + /* Work around race: Someone was closing lockspace as + we were trying to open it. Retry. 
*/ + if (errno == ENOENT) + continue; + + fprintf(stderr, "failed acquiring lockspace: %s\n", + strerror(errno)); + + return NULL; + } + + return ls; +} + + + +int +clu_unlock(dlm_lshandle_t ls, struct dlm_lksb *lksb) +{ + int ret; + + if (!ls || !lksb) { + errno = EINVAL; + return -1; + } + + ret = dlm_ls_unlock(ls, lksb->sb_lkid, 0, lksb, NULL); + + if (ret != 0) + return ret; + + /* lksb->sb_status should be EINPROG at this point */ + + if (wait_for_dlm_event(ls) < 0) { + errno = lksb->sb_status; + return -1; + } + + return 0; +} + + +int +clu_release_lockspace(dlm_lshandle_t ls, char *name) +{ + return dlm_release_lockspace(name, ls, 0); +} /cvs/cluster/cluster/rgmanager/src/clulib/lockspace.c,v --> standard output revision 1.1 --- cluster/rgmanager/src/clulib/lockspace.c +++ - 2006-06-13 19:22:38.753795000 +0000 @@ -0,0 +1,75 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#define RGMGR_LOCKSPACE "rgmanager" + +static pthread_mutex_t _default_lock = PTHREAD_MUTEX_INITIALIZER; +static char _init = 0; +static pid_t _holder_id = 0; +static dlm_lshandle_t _default_ls; + + +static int +_init_lockspace(void) +{ + _default_ls = clu_acquire_lockspace(RGMGR_LOCKSPACE); + if (!_default_ls) { + return -1; + } + _init = 1; + return 0; +} + + +dlm_lshandle_t +ls_hold_default(void) +{ + pthread_mutex_lock(&_default_lock); + if (!_init && (_init_lockspace() < 0)) { + pthread_mutex_unlock(&_default_lock); + errno = ENOLCK; + return NULL; + } + + if (_holder_id != 0) { + pthread_mutex_unlock(&_default_lock); + errno = EAGAIN; + return NULL; + } + + _holder_id = gettid(); + pthread_mutex_unlock(&_default_lock); + return _default_ls; +} + + +void +ls_release_default(void) +{ + pthread_mutex_lock(&_default_lock); + if (_holder_id != gettid()) { + clulog(LOG_ERR, "Attempt to release lockspace when I am not" + "the holder!\n"); + raise(SIGSTOP); + } + + _holder_id 
= 0; + pthread_mutex_unlock(&_default_lock); +} + + /cvs/cluster/cluster/rgmanager/src/clulib/members.c,v --> standard output revision 1.1 --- cluster/rgmanager/src/clulib/members.c +++ - 2006-06-13 19:22:38.840838000 +0000 @@ -0,0 +1,397 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static int my_node_id = -1; +static pthread_rwlock_t memblock = PTHREAD_RWLOCK_INITIALIZER; +static cluster_member_list_t *membership = NULL; + + +/** + Return the stored node ID. Since this should never + change during the duration of running rgmanager, it is + not protected by a lock. + */ +int +my_id(void) +{ + return my_node_id; +} + + +int +set_my_id(int id) +{ + my_node_id = id; + return 0; +} + + +/** + Determine and store the local node ID. This should + only ever be called once during initialization. + */ +int +get_my_nodeid(cman_handle_t h) +{ + cman_node_t node; + + if (cman_get_node(h, CMAN_NODEID_US, &node) != 0) + return -1; + + return node.cn_nodeid; +} + + + +/** + Generate and return a list of members which are now online in a new + membership list, given the old membership list. User must call + @ref free_member_list + to free the returned + @ref cluster_member_list_t + structure. + + @param old Old membership list + @param new New membership list + @return NULL if no members were gained, or a newly + allocated cluster_member_list_t structure. + */ +cluster_member_list_t * +memb_gained(cluster_member_list_t *old, cluster_member_list_t *new) +{ + int count, x, y; + char in_old = 0; + cluster_member_list_t *gained; + + /* No nodes in new? Then nothing could have been gained */ + if (!new || !new->cml_count) + return NULL; + + /* Nothing in old? Duplicate 'new' and return it. 
*/ + if (!old || !old->cml_count) { + gained = cml_alloc(new->cml_count); + if (!gained) + return NULL; + memcpy(gained, new, cml_size(new->cml_count)); + return gained; + } + + /* Use greatest possible count */ + count = (old->cml_count > new->cml_count ? + cml_size(old->cml_count) : cml_size(new->cml_count)); + + gained = malloc(count); + if (!gained) + return NULL; + memset(gained, 0, count); + + for (x = 0; x < new->cml_count; x++) { + + /* This one isn't active at the moment; it could not have + been gained. */ + if (!new->cml_members[x].cn_member) + continue; + + in_old = 0; + for (y = 0; y < old->cml_count; y++) { + if ((new->cml_members[x].cn_nodeid != + old->cml_members[y].cn_nodeid) || + !old->cml_members[y].cn_member) + continue; + in_old = 1; + break; + } + + if (in_old) + continue; + memcpy(&gained->cml_members[gained->cml_count], + &new->cml_members[x], sizeof(cman_node_t)); + } + + if (gained->cml_count == 0) { + free(gained); + gained = NULL; + } + + return gained; +} + + +/** + Generate and return a list of members which are lost or no longer online + in a new membership list, given the old membership list. User must call + @ref free_member_list + to free the returned + @ref cluster_member_list_t + structure. + + @param old Old membership list + @param new New membership list + @return NULL if no members were lost, or a newly + allocated cluster_member_list_t structure. + */ +cluster_member_list_t * +memb_lost(cluster_member_list_t *old, cluster_member_list_t *new) +{ + cluster_member_list_t *ret; + int x; + + /* Reverse. 
;) */ + ret = memb_gained(new, old); + if (!ret) + return NULL; + + for (x = 0; x < ret->cml_count; x++) { + ret->cml_members[x].cn_member = 0; + } + + return ret; +} + + + +void +member_list_update(cluster_member_list_t *new_ml) +{ + pthread_rwlock_wrlock(&memblock); + if (membership) + free_member_list(membership); + if (new_ml) + membership = member_list_dup(new_ml); + else + membership = NULL; + pthread_rwlock_unlock(&memblock); +} + + +cluster_member_list_t * +member_list(void) +{ + cluster_member_list_t *ret = NULL; + pthread_rwlock_rdlock(&memblock); + if (membership) + ret = member_list_dup(membership); + pthread_rwlock_unlock(&memblock); + return ret; +} + + +char * +member_name(uint64_t id, char *buf, int buflen) +{ + char *n; + + if (!buf || !buflen) + return NULL; + + pthread_rwlock_rdlock(&memblock); + n = memb_id_to_name(membership, id); + if (n) { + strncpy(buf, n, buflen); + } else { + buf[0] = 0; + } + pthread_rwlock_unlock(&memblock); + return buf; +} + + + +cluster_member_list_t * +get_member_list(cman_handle_t h) +{ + int c; + cluster_member_list_t *ml = NULL; + cman_node_t *nodes = NULL; + + do { + if (nodes) + free(nodes); + + c = cman_get_node_count(h); + if (c <= 0) + return NULL; + + if (!ml) + ml = malloc(sizeof(*ml)); + if (!ml) + return NULL; + + nodes = malloc(sizeof(*nodes) * c); + if (!nodes) { + free(ml); + return NULL; + } + + memset(ml, 0, sizeof(*ml)); + memset(nodes, 0, sizeof(*nodes)*c); + + cman_get_nodes(h, c, &ml->cml_count, nodes); + + } while (ml->cml_count != c); + + ml->cml_members = nodes; + ml->cml_count = c; + return ml; +} + + +void +free_member_list(cluster_member_list_t *ml) +{ + if (ml) { + if (ml->cml_members) + free(ml->cml_members); + free(ml); + } +} + + +int +memb_online(cluster_member_list_t *ml, int nodeid) +{ + int x = 0; + + for (x = 0; x < ml->cml_count; x++) { + if (ml->cml_members[x].cn_nodeid == nodeid) + return ml->cml_members[x].cn_member; + } + + return 0; +} + + +int 
+memb_count(cluster_member_list_t *ml) +{ + int x = 0, count = 0; + + for (x = 0; x < ml->cml_count; x++) { + if (ml->cml_members[x].cn_member) + ++count; + } + + return count; +} + + +int +memb_mark_down(cluster_member_list_t *ml, int nodeid) +{ + int x = 0; + + for (x = 0; x < ml->cml_count; x++) { + if (ml->cml_members[x].cn_nodeid == nodeid) + ml->cml_members[x].cn_member = 0; + } + + return 0; +} + + + +char * +memb_id_to_name(cluster_member_list_t *ml, int nodeid) +{ + int x = 0; + + for (x = 0; x < ml->cml_count; x++) { + if (ml->cml_members[x].cn_nodeid == nodeid) + return ml->cml_members[x].cn_name; + } + + return 0; +} + + +cman_node_t * +memb_id_to_p(cluster_member_list_t *ml, int nodeid) +{ + int x = 0; + + for (x = 0; x < ml->cml_count; x++) { + if (ml->cml_members[x].cn_nodeid == nodeid) + return &ml->cml_members[x]; + } + + return 0; +} + + +int +memb_online_name(cluster_member_list_t *ml, char *name) +{ + int x = 0; + + for (x = 0; x < ml->cml_count; x++) { + if (!strcasecmp(ml->cml_members[x].cn_name, name)) + return ml->cml_members[x].cn_member; + } + + return 0; +} + + +int +memb_name_to_id(cluster_member_list_t *ml, char *name) +{ + int x = 0; + + for (x = 0; x < ml->cml_count; x++) { + if (!strcasecmp(ml->cml_members[x].cn_name, name)) + return ml->cml_members[x].cn_nodeid; + } + + return 0; +} + + +cman_node_t * +memb_name_to_p(cluster_member_list_t *ml, char *name) +{ + int x = 0; + + for (x = 0; x < ml->cml_count; x++) { + if (!strcasecmp(ml->cml_members[x].cn_name, name)) + return &ml->cml_members[x]; + } + + return 0; +} + +/** + Duplicate and return a cluster member list structure, sans the DNS resolution + information. + + @param orig List to duplicate. + @return NULL if there is nothing to duplicate or duplication + fails, or a newly allocated cluster_member_list_t + structure. 
+ */ +cluster_member_list_t * +member_list_dup(cluster_member_list_t *orig) +{ + cluster_member_list_t *ret = NULL; + + if (!orig) + return NULL; + + ret = malloc(cml_size(orig->cml_count)); + memset(ret, 0, cml_size(orig->cml_count)); + memcpy(ret, orig, cml_size(orig->cml_count)); + + return ret; +} + /cvs/cluster/cluster/rgmanager/src/clulib/message.c,v --> standard output revision 1.1 --- cluster/rgmanager/src/clulib/message.c +++ - 2006-06-13 19:22:38.921541000 +0000 @@ -0,0 +1,1357 @@ +#define _MESSAGE_BUILD +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + + +/* Ripped from ccsd's setup_local_socket */ +#define RGMGR_SOCK "/var/run/cluster/rgmanager.sk" +#define MAX_CONTEXTS 32 /* Testing; production should be 1024-ish */ + +/* Context 0 is reserved for control messages */ + +/* Local-ish contexts */ +static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER; +static msgctx_t *contexts[MAX_CONTEXTS]; +static uint32_t context_index = 1; +static chandle_t *gch; +pthread_t comms_thread; +int thread_running; + + +#define is_established(ctx) \ + (((ctx->type == MSG_CLUSTER) && \ + (ctx->u.cluster_info.remote_ctx && ctx->u.cluster_info.local_ctx)) || \ + ((ctx->type == MSG_SOCKET) && \ + (ctx->u.local_info.sockfd != -1))) + + +static int +local_connect(void) +{ + struct sockaddr_un sun; + int sock = -1, error = 0; + + memset(&sun, 0, sizeof(sun)); + sun.sun_family = PF_LOCAL; + snprintf(sun.sun_path, sizeof(sun.sun_path), RGMGR_SOCK); + + sock = socket(PF_LOCAL, SOCK_STREAM, 0); + if (sock < 0) { + error = errno; + goto fail; + } + + error = connect(sock, (struct sockaddr *)&sun, sizeof(sun)); + if (error < 0) { + error = errno; + goto fail; + } + + sock = error; +fail: + + return sock; +} + + +static int +send_cluster_message(msgctx_t *ctx, void *msg, size_t len) +{ + char buf[4096]; + cluster_msg_hdr_t *h = (void 
*)buf; + int ret; + char *msgptr = (buf + sizeof(*h)); + + if ((len + sizeof(*h)) > sizeof(buf)) { + errno = E2BIG; + return -1; + } + + h->msg_control = M_DATA; + h->src_ctx = ctx->u.cluster_info.local_ctx; + h->dest_ctx = ctx->u.cluster_info.remote_ctx; + h->msg_port = ctx->u.cluster_info.port; + memcpy(msgptr, msg, len); + + /* + printf("sending cluster message, length = %d to nodeid %d port %d\n", + len + sizeof(*h), ctx->u.cluster_info.nodeid, ctx->u.cluster_info.port); + */ + + pthread_mutex_lock(&gch->c_lock); + h->src_nodeid = gch->c_nodeid; + + swab_cluster_msg_hdr_t(h); + + ret = cman_send_data(gch->c_cluster, (void *)h, len + sizeof(*h), + ctx->u.cluster_info.nodeid, + ctx->u.cluster_info.port, 0); + + pthread_mutex_unlock(&gch->c_lock); + + return len + sizeof(h); +} + + +/** + Wrapper around write(2) + */ +static int +send_socket_message(msgctx_t *ctx, void *msg, size_t len) +{ + char buf[4096]; + local_msg_hdr_t *h = (local_msg_hdr_t *)buf; + char *msgptr = (buf + sizeof(*h)); + + /* encapsulate ... ? */ + if ((len + sizeof(*h)) > sizeof(buf)) { + errno = E2BIG; + return -1; + } + + h->msg_control = M_DATA; + h->msg_len = len; + memcpy(msgptr, msg, len); + + return _write_retry(ctx->u.local_info.sockfd, msg, len + sizeof(*h), NULL); +} + + +/** + Message sending API. Sends to the cluster or a socket, depending on + the context. 
+ */ +int +msg_send(msgctx_t *ctx, void *msg, size_t len) +{ + if (!ctx || !msg || !len) { + errno = EINVAL; + return -1; + } + + switch(ctx->type) { + case MSG_CLUSTER: + return send_cluster_message(ctx, msg, len); + case MSG_SOCKET: + return send_socket_message(ctx, msg, len); + default: + break; + } + + errno = EINVAL; + return -1; +} + + +/** + Assign a (free) cluster context ID + */ +static int +assign_ctx(msgctx_t *ctx) +{ + int start; + + /* Assign context index */ + ctx->type = MSG_CLUSTER; + + pthread_mutex_lock(&context_lock); + start = context_index++; + if (context_index >= MAX_CONTEXTS || context_index <= 0) + context_index = 1; + do { + if (contexts[context_index]) { + ++context_index; + if (context_index >= MAX_CONTEXTS) + context_index = 1; + + if (context_index == start) { + pthread_mutex_unlock(&context_lock); + errno = EAGAIN; + return -1; + } + + continue; + } + + contexts[context_index] = ctx; + ctx->u.cluster_info.local_ctx = context_index; + + } while (0); + pthread_mutex_unlock(&context_lock); + + return 0; +} + + +/* See if anything's on the cluster socket. If so, dispatch it + on to the requisite queues + XXX should be passed a connection arg! */ +static int +poll_cluster_messages(int timeout) +{ + int ret = -1; + fd_set rfds; + int fd; + struct timeval tv; + struct timeval *p = NULL; + + if (timeout >= 0) { + p = &tv; + tv.tv_sec = tv.tv_usec = timeout; + } + printf("%s\n", __FUNCTION__); + + FD_ZERO(&rfds); + + //pthread_mutex_lock(&gch->c_lock); + fd = cman_get_fd(gch->c_cluster); + FD_SET(fd, &rfds); + + if (select(fd + 1, &rfds, NULL, NULL, p) == 1) { + cman_dispatch(gch->c_cluster, 0); + ret = 0; + } + //pthread_mutex_unlock(&gch->c_lock); + + return ret; +} + + +/** + This is used to establish and tear down pseudo-private + contexts which are shared with the cluster context. 
+ */ +static int +cluster_send_control_msg(msgctx_t *ctx, int type) +{ + cluster_msg_hdr_t cm; + + cm.msg_control = (uint8_t)type; + cm.src_nodeid = gch->c_nodeid; + cm.dest_ctx = ctx->u.cluster_info.remote_ctx; + cm.src_ctx = ctx->u.cluster_info.local_ctx; + cm.msg_port = ctx->u.cluster_info.port; + + swab_cluster_msg_hdr_t(&cm); + + return (cman_send_data(gch->c_cluster, (void *)&cm, sizeof(cm), + ctx->u.cluster_info.nodeid, + ctx->u.cluster_info.port, 0)); +} + + +/** + Wait for a message on a context. + */ +static int +cluster_msg_wait(msgctx_t *ctx, int timeout) +{ + struct timespec ts = {0, 0}; + int req = M_NONE; + struct timeval start; + struct timeval now; + + + if (timeout > 0) + gettimeofday(&start, NULL); + + ts.tv_sec = !!timeout; + + pthread_mutex_lock(&ctx->u.cluster_info.mutex); + while (1) { + /* See if we dispatched any messages on to our queue */ + if (ctx->u.cluster_info.queue) { + req = ctx->u.cluster_info.queue->message->msg_control; + /*printf("Queue not empty CTX%d : %d\n", + ctx->u.cluster_info.local_ctx, req);*/ + break; + } + + if (timeout == 0) + break; + + /* Ok, someone else has the mutex on our FD. Go to + sleep on a cond; maybe they'll wake us up */ + if (pthread_cond_timedwait(&ctx->u.cluster_info.cond, + &ctx->u.cluster_info.mutex, + &ts) < 0) { + + /* Mutex held */ + if (errno == ETIMEDOUT) { + if (timeout < 0) { + ts.tv_sec = 1; + ts.tv_nsec = 0; + continue; + } + + ts.tv_sec = !!timeout; + + /* Done */ + break; + } + } + + if (timeout > 0) { + gettimeofday(&now, NULL); + /* XXX imprecise */ + if (now.tv_sec - start.tv_sec > timeout) + break; + } + } + pthread_mutex_unlock(&ctx->u.cluster_info.mutex); + + return req; +} + + +static int +peekypeeky(int fd) +{ + local_msg_hdr_t h; + int ret; + + while ((ret = recv(fd, (void *)&h, sizeof(h), MSG_PEEK)) < 0) { + if (errno == EINTR) + continue; + return -1; + } + + if (ret == sizeof(h)) + return h.msg_control; + + if (ret == 0) + /* Socket closed? 
*/ + return M_CLOSE; + + /* XXX */ + printf("PROTOCOL ERROR: Invalid message\n"); + return M_CLOSE; +} + + +static int +local_msg_wait(msgctx_t *ctx, int timeout) +{ + fd_set rfds; + struct timeval tv = {0, 0}; + struct timeval *p = NULL; + + if (timeout >= 0) { + tv.tv_sec = timeout; + p = &tv; + } + + FD_ZERO(&rfds); + FD_SET(ctx->u.local_info.sockfd, &rfds); + + if (_select_retry(ctx->u.local_info.sockfd + 1, &rfds, + NULL, NULL, p) == 1) { + return peekypeeky(ctx->u.local_info.sockfd); + } + + return M_NONE; +} + + +int +msg_get_nodeid(msgctx_t *ctx) +{ + switch(ctx->type) { + case MSG_CLUSTER: + return ctx->u.cluster_info.nodeid; + case MSG_SOCKET: + return 0; + default: + break; + } + + return -1; +} + + +int +msg_fd_set(msgctx_t *ctx, fd_set *fds, int *max) +{ + int e; + switch(ctx->type) { + case MSG_CLUSTER: + pthread_mutex_lock(&ctx->u.cluster_info.mutex); + if (ctx->u.cluster_info.select_pipe[0] < 0) { + if (pipe(ctx->u.cluster_info.select_pipe) < 0) { + e = errno; + pthread_mutex_unlock(&ctx->u.cluster_info.mutex); + errno = e; + return -1; + } + + printf("%s: Created cluster CTX select pipe " + "rd=%d wr=%d\n", __FUNCTION__, + ctx->u.cluster_info.select_pipe[0], + ctx->u.cluster_info.select_pipe[1]); + + } + + e = ctx->u.cluster_info.select_pipe[0]; + printf("%s: cluster %d\n", __FUNCTION__, e); + FD_SET(e, fds); + pthread_mutex_unlock(&ctx->u.cluster_info.mutex); + + if (e > *max) + *max = e; + return 0; + + case MSG_SOCKET: + if (ctx->u.local_info.sockfd >= 0) { + printf("%s: local %d\n", __FUNCTION__, + ctx->u.local_info.sockfd); + FD_SET(ctx->u.local_info.sockfd, fds); + + if (ctx->u.local_info.sockfd > *max) + *max = ctx->u.local_info.sockfd; + return 0; + } + return -1; + default: + break; + } + + return -1; +} + + +int +msg_fd_isset(msgctx_t *ctx, fd_set *fds) +{ + errno = EINVAL; + + if (!fds || !ctx) + return -1; + + switch(ctx->type) { + case MSG_CLUSTER: + pthread_mutex_lock(&ctx->u.cluster_info.mutex); + if 
(ctx->u.cluster_info.select_pipe[0] >= 0 && + FD_ISSET(ctx->u.cluster_info.select_pipe[0], fds)) { + pthread_mutex_unlock(&ctx->u.cluster_info.mutex); + return 1; + } + pthread_mutex_unlock(&ctx->u.cluster_info.mutex); + return 0; + case MSG_SOCKET: + if (ctx->u.local_info.sockfd >= 0 && + FD_ISSET(ctx->u.local_info.sockfd, fds)) { + return 1; + } + return 0; + default: + break; + } + + return -1; +} + + +int +msg_fd_clr(msgctx_t *ctx, fd_set *fds) +{ + errno = EINVAL; + + if (!fds || !ctx) + return -1; + + switch(ctx->type) { + case MSG_CLUSTER: + pthread_mutex_lock(&ctx->u.cluster_info.mutex); + if (ctx->u.cluster_info.select_pipe[0] >= 0) { + FD_CLR(ctx->u.cluster_info.select_pipe[0], fds); + pthread_mutex_unlock(&ctx->u.cluster_info.mutex); + return 1; + } + pthread_mutex_unlock(&ctx->u.cluster_info.mutex); + return 0; + case MSG_SOCKET: + if (ctx->u.local_info.sockfd >= 0) { + FD_CLR(ctx->u.local_info.sockfd, fds); + return 1; + } + return 0; + default: + break; + } + + return -1; +} + + +/** + This polls the context for 'timeout' seconds waiting for data + to become available. 
Return codes are M_DATA, M_CLOSE, and M_OPEN + + M_DATA - data available + M_OPEN - needs msg_accept( + M_CLOSE - context / socket closed by remote host + M_NONE - nothing available + + For the cluster connection, the return code could also map to one of + the CMAN return codes + + M_STATECHANGE - node has changed state + + */ +int +msg_wait(msgctx_t *ctx, int timeout) +{ + + if (!ctx) { + errno = EINVAL; + return -1; + } + + switch(ctx->type) { + case MSG_CLUSTER: + return cluster_msg_wait(ctx, timeout); + case MSG_SOCKET: + return local_msg_wait(ctx, timeout); + default: + break; + } + + errno = EINVAL; + return -1; +} + + +int +_cluster_msg_receive(msgctx_t *ctx, void **msg, size_t *len) +{ + cluster_msg_hdr_t *m; + msg_q_t *n; + int ret = 0; + + if (msg) + *msg = NULL; + if (len) + *len = 0; + + if (ctx->u.cluster_info.local_ctx < 0 || + ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) { + errno = EBADF; + return -1; + } + + /* trigger receive here */ + pthread_mutex_lock(&ctx->u.cluster_info.mutex); + + n = ctx->u.cluster_info.queue; + if (n == NULL) { + pthread_mutex_unlock(&ctx->u.cluster_info.mutex); + errno = EAGAIN; + return -1; + } + + list_remove(&ctx->u.cluster_info.queue, n); + + pthread_mutex_unlock(&ctx->u.cluster_info.mutex); + + m = n->message; + switch(m->msg_control) { + case M_CLOSE: + ctx->u.cluster_info.remote_ctx = 0; + break; + case M_OPEN_ACK: + /* Response to new connection */ + ctx->u.cluster_info.remote_ctx = m->src_ctx; + break; + case M_DATA: + /* Kill the message control structure */ + memmove(m, &m[1], n->len - sizeof(*m)); + if (msg) + *msg = (void *)m; + else { + printf("Warning: dropping data message\n"); + free(m); + } + if (len) + *len = (n->len - sizeof(*m)); + ret = (n->len - sizeof(*m)); + free(n); + + printf("Message received\n"); + return ret; + case M_OPEN: + /* Someone is trying to open a connection */ + default: + /* ?!! 
*/ + ret = -1; + break; + } + + free(m); + free(n); + + return ret; +} + + +/** + Receive a message from a cluster-context. This copies out the contents + into the user-specified buffer, and does random other things. + */ +static int +cluster_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout) +{ + int req; + void *priv_msg; + size_t priv_len; + + if (!msg || !maxlen) { + errno = EINVAL; + return -1; + } + + req = cluster_msg_wait(ctx, timeout); + + switch (req) { + case M_DATA: + /* Copy out. */ + req = _cluster_msg_receive(ctx, &priv_msg, &priv_len); + if (req < 0) { + printf("Ruh roh!\n"); + return -1; + } + + priv_len = (priv_len < maxlen ? priv_len : maxlen); + + memcpy(msg, priv_msg, priv_len); + free(priv_msg); + return req; + case M_CLOSE: + errno = ECONNRESET; + return -1; + case 0: + /*printf("Nothing on queue\n");*/ + return 0; + default: + printf("PROTOCOL ERROR: Received %d\n", req); + return -1; + } + + printf("%s: CODE PATH ERROR\n", __FUNCTION__); + return -1; +} + + +static int +_local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout) +{ + struct timeval tv = {0, 0}; + struct timeval *p = NULL; + local_msg_hdr_t h; + + if (timeout >= 0) { + tv.tv_sec = timeout; + p = &tv; + } + + if (_read_retry(ctx->u.local_info.sockfd, &h, sizeof(h), p) < 0) + return -1; + + if (maxlen < h.msg_len) { + printf("WARNING: Buffer too small for message!\n"); + h.msg_len = maxlen; + } + + return _read_retry(ctx->u.local_info.sockfd, msg, h.msg_len, p); +} + + +/** + Receive a message from a cluster-context. This copies out the contents + into the user-specified buffer, and does random other things. + */ +static int +local_msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout) +{ + int req; + char priv_msg[4096]; + size_t priv_len; + + if (!msg || !maxlen) { + errno = EINVAL; + return -1; + } + + switch (req) { + case M_DATA: + /* Copy out. 
*/ + req = _local_msg_receive(ctx, priv_msg, priv_len, timeout); + if (req <= 0) + return -1; + + priv_len = (priv_len < maxlen ? priv_len : maxlen); + + memcpy(msg, priv_msg, priv_len); + free(msg); + return req; + case M_CLOSE: + errno = ECONNRESET; + return -1; + case 0: + /*printf("Nothing on queue\n");*/ + return 0; + default: + printf("PROTOCOL ERROR: Received %d\n", req); + return -1; + } + + printf("%s: CODE PATH ERROR\n", __FUNCTION__); + return -1; +} + + +int +msg_receive(msgctx_t *ctx, void *msg, size_t maxlen, int timeout) +{ + if (!ctx || !msg || !maxlen) { + errno = EINVAL; + return -1; + } + + switch(ctx->type) { + case MSG_CLUSTER: + return cluster_msg_receive(ctx, msg, maxlen, timeout); + case MSG_SOCKET: + return local_msg_receive(ctx, msg, maxlen, timeout); + default: + break; + } + + errno = EINVAL; + return -1; +} + + +/** + Open a connection to the specified node ID. + If the speficied node is 0, this connects via the socket in + /var/run/cluster... + */ +int +msg_open(int nodeid, int port, msgctx_t *ctx, int timeout) +{ + int t = 0; + + errno = EINVAL; + if (!ctx) + return -1; + + + /*printf("Opening pseudo channel to node %d\n", nodeid);*/ + + memset(ctx, 0, sizeof(*ctx)); + if (nodeid == CMAN_NODEID_US) { + if ((ctx->u.local_info.sockfd = local_connect()) < 0) { + return -1; + } + ctx->type = MSG_SOCKET; + return 0; + } + + ctx->type = MSG_CLUSTER; + ctx->u.cluster_info.nodeid = nodeid; + ctx->u.cluster_info.port = port; + ctx->u.cluster_info.local_ctx = -1; + ctx->u.cluster_info.remote_ctx = 0; + ctx->u.cluster_info.queue = NULL; + ctx->u.cluster_info.select_pipe[0] = -1; + ctx->u.cluster_info.select_pipe[1] = -1; + pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL); + pthread_cond_init(&ctx->u.cluster_info.cond, NULL); + + /* Assign context index */ + if (assign_ctx(ctx) < 0) + return -1; + + //printf(" Local CTX: %d\n", ctx->u.cluster_info.local_ctx); + + /* Send open */ + + //printf(" Sending control message M_OPEN\n"); + if 
(cluster_send_control_msg(ctx, M_OPEN) < 0) { + return -1; + } + + /* Ok, wait for a response */ + while (!is_established(ctx)) { + ++t; + if (t > timeout) { + msg_close(ctx); + errno = ETIMEDOUT; + return -1; + } + + switch(msg_wait(ctx, 1)) { + case M_OPEN_ACK: + _cluster_msg_receive(ctx, NULL, NULL); + break; + case M_NONE: + continue; + default: + printf("PROTO ERROR: M_OPEN_ACK not received \n"); + } + } + + /* + printf(" Remote CTX: %d\n", + ctx->u.cluster_info.remote_ctx); + printf(" Pseudo channel established!\n"); + */ + + + return 0; +} + + +/** + Close a connection context (cluster or socket; it doesn't matter) + In the case of a cluster context, we need to clear out the + receive queue and what-not. This isn't a big deal. Also, we + need to tell the other end that we're done -- just in case it does + not know yet ;) + + With a socket, the O/S cleans up the buffers for us. + */ +int +msg_close(msgctx_t *ctx) +{ + msg_q_t *n = NULL; + + if (!ctx) { + errno = EINVAL; + return -1; + } + + switch (ctx->type) { + case MSG_CLUSTER: + if (ctx->u.cluster_info.local_ctx >= MAX_CONTEXTS) { + errno = EINVAL; + return -1; + } + pthread_mutex_lock(&context_lock); + /* Other threads should not be able to see this again */ + if (contexts[ctx->u.cluster_info.local_ctx]) + contexts[ctx->u.cluster_info.local_ctx] = NULL; + pthread_mutex_unlock(&context_lock); + /* Clear receive queue */ + while ((n = ctx->u.cluster_info.queue) != NULL) { + list_remove(&ctx->u.cluster_info.queue, n); + free(n->message); + free(n); + } + /* Send close message */ + if (ctx->u.cluster_info.remote_ctx != 0) { + cluster_send_control_msg(ctx, M_CLOSE); + } + + /* Close pipe if it's open */ + if (ctx->u.cluster_info.select_pipe[0] >= 0) { + close(ctx->u.cluster_info.select_pipe[0]); + ctx->u.cluster_info.select_pipe[0] = -1; + } + if (ctx->u.cluster_info.select_pipe[1] >= 0) { + close(ctx->u.cluster_info.select_pipe[1]); + ctx->u.cluster_info.select_pipe[1] = -1; + } + return 0; + case 
MSG_SOCKET: + close(ctx->u.local_info.sockfd); + ctx->u.local_info.sockfd = -1; + return 0; + default: + break; + } + + errno = EINVAL; + return -1; +} + + +/** + Called by cman_dispatch to deal with messages coming across the + cluster socket. This function deals with fanning out the requests + and putting them on the per-context queues. We don't have + the benefits of pre-configured buffers, so we need this. + */ +static void +process_cman_msg(cman_handle_t h, void *priv, char *buf, int len, + uint8_t port, int nodeid) +{ + cluster_msg_hdr_t *m = (cluster_msg_hdr_t *)buf; + msg_q_t *node; + msgctx_t *ctx; + + if (len < sizeof(*m)) { + printf("Message too short.\n"); + return; + } + + swab_cluster_msg_hdr_t(m); + +#if 0 + printf("Processing "); + switch(m->msg_control) { + case M_NONE: + printf("M_NONE\n"); + break; + case M_OPEN: + printf("M_OPEN\n"); + break; + case M_OPEN_ACK: + printf("M_OPEN_ACK\n"); + break; + case M_DATA: + printf("M_DATA\n"); + break; + case M_CLOSE: + printf("M_CLOSE\n"); + break; + } + + printf(" Node ID: %d %d\n", m->src_nodeid, nodeid); + printf(" Remote CTX: %d Local CTX: %d\n", m->src_ctx, m->dest_ctx); +#endif + + if (m->dest_ctx >= MAX_CONTEXTS) { + printf("Context invalid; ignoring\n"); + return; + } + + while ((node = malloc(sizeof(*node))) == NULL) { + sleep(1); + } + memset(node, 0, sizeof(*node)); + while ((node->message = malloc(len)) == NULL) { + sleep(1); + } + memcpy(node->message, buf, len); + node->len = len; + + pthread_mutex_lock(&context_lock); + ctx = contexts[m->dest_ctx]; + if (!ctx) { + /* We received a close for something we've already + detached from our list. No big deal, just + ignore. 
*/ + free(node->message); + free(node); + pthread_mutex_unlock(&context_lock); + return; + } + + pthread_mutex_lock(&ctx->u.cluster_info.mutex); + list_insert(&ctx->u.cluster_info.queue, node); + pthread_mutex_unlock(&ctx->u.cluster_info.mutex); + /* If a select pipe was set up, wake it up */ + if (ctx->u.cluster_info.select_pipe[1] >= 0) + write(ctx->u.cluster_info.select_pipe[1], "", 1); + pthread_mutex_unlock(&context_lock); + + pthread_cond_signal(&ctx->u.cluster_info.cond); +} + + +/** + Accept a new pseudo-private connection coming in over the + cluster socket. + */ +static int +cluster_msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx) +{ + errno = EINVAL; + cluster_msg_hdr_t *m; + msg_q_t *n; + char done = 0; + char foo; + + if (!listenctx || !acceptctx) + return -1; + if (listenctx->u.cluster_info.local_ctx != 0) + return -1; + + pthread_mutex_lock(&listenctx->u.cluster_info.mutex); + + n = listenctx->u.cluster_info.queue; + if (n == NULL) { + pthread_mutex_unlock(&listenctx->u.cluster_info.mutex); + errno = EAGAIN; + return -1; + } + + /* the OPEN should be the first thing on the list; this loop + is probably not necessary */ + list_do(&listenctx->u.cluster_info.queue, n) { + + m = n->message; + switch(m->msg_control) { + case M_OPEN: + list_remove(&listenctx->u.cluster_info.queue, n); + /*printf("Accepting connection from %d %d\n", + m->src_nodeid, m->src_ctx);*/ + + /* New connection */ + pthread_mutex_init(&acceptctx->u.cluster_info.mutex, + NULL); + pthread_cond_init(&acceptctx->u.cluster_info.cond, + NULL); + acceptctx->u.cluster_info.queue = NULL; + acceptctx->u.cluster_info.remote_ctx = m->src_ctx; + acceptctx->u.cluster_info.nodeid = m->src_nodeid; + acceptctx->u.cluster_info.port = m->msg_port; + + assign_ctx(acceptctx); + cluster_send_control_msg(acceptctx, M_OPEN_ACK); + + if (listenctx->u.cluster_info.select_pipe[0] >= 0) { + read(listenctx->u.cluster_info.select_pipe[0], + &foo, 1); + } + + done = 1; + free(m); + free(n); + + break; + case 
M_DATA: + /* Data messages (i.e. from broadcast msgs) are + okay too!... but we don't handle them here */ + break; + default: + /* Other message?! */ + printf("Odd... %d\n", m->msg_control); + break; + } + + if (done) + break; + + } while (!list_done(&listenctx->u.cluster_info.queue, n)); + + pthread_mutex_unlock(&listenctx->u.cluster_info.mutex); + + return 0; +} + + +/* XXX INCOMPLETE */ +int +msg_accept(msgctx_t *listenctx, msgctx_t *acceptctx) +{ + switch(listenctx->type) { + case MSG_CLUSTER: + return cluster_msg_accept(listenctx, acceptctx); + case MSG_SOCKET: + return 0; + default: + break; + } + + return -1; +} + + +static int +local_listener_sk(void) +{ + int sock; + struct sockaddr_un su; + mode_t om; + + sock = socket(PF_LOCAL, SOCK_STREAM, 0); + if (sock < 0) + return -1; + + unlink(RGMGR_SOCK); + om = umask(077); + su.sun_family = PF_LOCAL; + snprintf(su.sun_path, sizeof(su.sun_path), RGMGR_SOCK); + + if (bind(sock, &su, sizeof(su)) < 0) { + umask(om); + goto fail; + } + umask(om); + + if (listen(sock, SOMAXCONN) < 0) + goto fail; + + return sock; +fail: + if (sock >= 0) + close(sock); + return -1; +} + + +/** + This waits for events on the cluster comms FD and + dispatches them using cman_dispatch. Initially, + the design had no permanent threads, but that model + proved difficult to implement correctly. 
+ */ +static void * +cluster_comms_thread(void *arg) +{ + while (thread_running) { + poll_cluster_messages(2); + } + + return NULL; +} + + +/* + Transliterates a CMAN event to a control message + */ +static void +process_cman_event(cman_handle_t handle, void *private, int reason, int arg) +{ + cluster_msg_hdr_t *msg; + int *argp; + msg_q_t *node; + msgctx_t *ctx; + + /* Allocate queue node */ + while ((node = malloc(sizeof(*node))) == NULL) { + sleep(1); + } + memset(node, 0, sizeof(*node)); + + /* Allocate message: header + int (for arg) */ + while ((msg = malloc(sizeof(int) + + sizeof(cluster_msg_hdr_t))) == NULL) { + sleep(1); + } + memset(msg, 0, sizeof(int)+sizeof(cluster_msg_hdr_t)); + + + switch(reason) { +#if defined(LIBCMAN_VERSION) && LIBCMAN_VERSION >= 2 + case CMAN_REASON_PORTOPENED: + msg->msg_control = M_PORTOPENED; + break; + case CMAN_REASON_TRY_SHUTDOWN: + msg->msg_control = M_TRY_SHUTDOWN; + break; +#endif + case CMAN_REASON_PORTCLOSED: + msg->msg_control = M_PORTCLOSED; + break; + case CMAN_REASON_STATECHANGE: + msg->msg_control = M_STATECHANGE; + break; + } + + argp = ((void *)msg + sizeof(cluster_msg_hdr_t)); + *argp = arg; + + node->len = sizeof(cluster_msg_hdr_t) + sizeof(int); + node->message = msg; + + pthread_mutex_lock(&context_lock); + ctx = contexts[0]; /* This is the cluster context... */ + if (!ctx) { + /* We received a close for something we've already + detached from our list. No big deal, just + ignore. 
*/ + free(node->message); + free(node); + pthread_mutex_unlock(&context_lock); + return; + } + + pthread_mutex_lock(&ctx->u.cluster_info.mutex); + list_insert(&ctx->u.cluster_info.queue, node); + pthread_mutex_unlock(&ctx->u.cluster_info.mutex); + /* If a select pipe was set up, wake it up */ + if (ctx->u.cluster_info.select_pipe[1] >= 0) + write(ctx->u.cluster_info.select_pipe[1], "", 1); + pthread_mutex_unlock(&context_lock); + + pthread_cond_signal(&ctx->u.cluster_info.cond); +} + + +/* XXX INCOMPLETE */ +int +msg_init(chandle_t *ch) +{ + int e; + pthread_attr_t attrs; + msgctx_t *ctx; + + pthread_mutex_lock(&ch->c_lock); + + /* Set up local context */ + + ctx = msg_new_ctx(); + if (!ctx) { + pthread_mutex_unlock(&ch->c_lock); + return -1; + } + + ctx->type = MSG_SOCKET; + ctx->u.local_info.sockfd = local_listener_sk(); + ctx->u.local_info.flags = SKF_LISTEN; + + ch->local_ctx = ctx; + + ctx = msg_new_ctx(); + + if (!ctx) { + pthread_mutex_unlock(&ch->c_lock); + msg_free_ctx((msgctx_t *)ch->local_ctx); + return -1; + } + + gch = ch; + + if (cman_start_recv_data(ch->c_cluster, process_cman_msg, + RG_PORT) != 0) { + e = errno; + msg_close(ch->local_ctx); + pthread_mutex_unlock(&ch->c_lock); + msg_free_ctx((msgctx_t *)ch->local_ctx); + msg_free_ctx((msgctx_t *)ch->cluster_ctx); + errno = e; + return -1; + } + + if (cman_start_notification(ch->c_cluster, process_cman_event) != 0) { + e = errno; + msg_close(ch->local_ctx); + pthread_mutex_unlock(&ch->c_lock); + msg_free_ctx((msgctx_t *)ch->local_ctx); + msg_free_ctx((msgctx_t *)ch->cluster_ctx); + errno = e; + } + + ch->cluster_ctx = ctx; + pthread_mutex_unlock(&ch->c_lock); + + pthread_mutex_lock(&context_lock); + + memset(contexts, 0, sizeof(contexts)); + contexts[0] = ctx; + + ctx->type = MSG_CLUSTER; + ctx->u.cluster_info.port = RG_PORT; /* port! */ + ctx->u.cluster_info.nodeid = 0; /* Broadcast! 
*/ + ctx->u.cluster_info.select_pipe[0] = -1; + ctx->u.cluster_info.select_pipe[1] = -1; + pthread_mutex_init(&ctx->u.cluster_info.mutex, NULL); + pthread_cond_init(&ctx->u.cluster_info.cond, NULL); + pthread_mutex_unlock(&context_lock); + + pthread_attr_init(&attrs); + pthread_attr_setinheritsched(&attrs, PTHREAD_INHERIT_SCHED); + pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED); + + thread_running = 1; + pthread_create(&comms_thread, &attrs, cluster_comms_thread, NULL); + + + pthread_attr_destroy(&attrs); + + return 0; +} + + +int +msg_print_ctx(int ctx) +{ + if (!contexts[ctx]) + return -1; + + printf("Cluster Message Context %d\n", ctx); + printf(" Node ID %d\n", contexts[ctx]->u.cluster_info.nodeid); + printf(" Remote %d\n", contexts[ctx]->u.cluster_info.remote_ctx); + return 0; +} + + +/* XXX INCOMPLETE */ +int +msg_shutdown(chandle_t *ch) +{ + if (!ch) { + errno = EINVAL; + return -1; + } + + while (pthread_kill(comms_thread, 0) == 0) + sleep(1); + + pthread_mutex_lock(&ch->c_lock); + + /* xxx purge everything */ + msg_close(ch->local_ctx); + cman_end_recv_data(ch->c_cluster); + + msg_free_ctx(ch->local_ctx); + msg_free_ctx(ch->cluster_ctx); + + + pthread_mutex_unlock(&ch->c_lock); + + return 0; +} + + +inline int +msgctx_size(void) +{ + return sizeof(msgctx_t); +} + + +msgctx_t * +msg_new_ctx(void) +{ + msgctx_t *p; + + printf("Alloc %d\n", sizeof(msgctx_t)); + p = malloc(sizeof(msgctx_t)); + if (!p) + return NULL; + + memset(p, 0, sizeof(p)); + p->type = MSG_NONE; + + return p; +} + + +void +msg_free_ctx(msgctx_t *dead) +{ + free(dead); +} +