* Re: the evils of the CLD api
2010-01-16 21:38 ` Colin McCabe
2010-01-16 23:21 ` Pete Zaitcev
@ 2010-01-19 21:54 ` Pete Zaitcev
1 sibling, 0 replies; 8+ messages in thread
From: Pete Zaitcev @ 2010-01-19 21:54 UTC (permalink / raw)
To: Colin McCabe; +Cc: Jeff Garzik, Project Hail List
On Sat, 16 Jan 2010 13:38:26 -0800
Colin McCabe <cmccabe@alumni.cmu.edu> wrote:
> Looks promising...
Here's a version that works:
diff --git a/include/ncld.h b/include/ncld.h
new file mode 100644
index 0000000..067e6ff
--- /dev/null
+++ b/include/ncld.h
@@ -0,0 +1,88 @@
+#ifndef __NCLD_H__
+#define __NCLD_H__
+
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+/*
+ * The ncld.h API is a replacement for cldc.h. Do not include both.
+ *
+ * We do not believe in making "internal" structures "opaque"
+ * with pointers to void. Therefore, this header might want to include
+ * some legacy definitions or whatnot, but users do not need to.
+ */
+#include <stdbool.h>
+#include <glib.h>
+#include <cldc.h>
+#include <libtimer.h>
+
+struct ncld_sess {
+ char *host;
+ unsigned short port;
+ GMutex *mutex;
+ GCond *cond;
+ GThread *thread;
+ bool is_up;
+ int errc;
+ GList *handles;
+ int to_thread[2];
+ struct cldc_udp *udp;
+ struct cld_timer udp_timer;
+ struct cld_timer_list tlist;
+ void (*event)(void *, unsigned int);
+ void *event_arg;
+};
+
+struct ncld_fh {
+ struct ncld_sess *ses;
+ struct cldc_fh *fh; /* FIXME cldc_open2 take direct & */
+ bool is_open;
+ int errc;
+ int nios;
+ unsigned int event_mask;
+ void (*event_func)(void *, unsigned int);
+ void *event_arg;
+};
+
+struct ncld_read {
+ /* public to application */
+ const void *ptr;
+ long length;
+
+ struct ncld_fh *fh;
+ /* GCond *cond; -- abusing the conditional of file handle for now */
+ bool is_done;
+ int errc;
+};
+
+extern struct ncld_sess *ncld_sess_open(const char *host, const char *port,
+ int *error, void (*event)(void *, unsigned int), void *ev_arg,
+ const char *cld_user, const char *cld_key, struct hail_log *hlog);
+extern struct ncld_fh *ncld_open(struct ncld_sess *s, const char *fname,
+ unsigned int mode, int *error, unsigned int events,
+ void (*event)(void *, unsigned int), void *ev_arg);
+extern int ncld_del(struct ncld_sess *nsp, const char *fname);
+extern struct ncld_read *ncld_get(struct ncld_fh *fhp, int *error);
+extern void ncld_read_free(struct ncld_read *rp);
+extern int ncld_write(struct ncld_fh *, const void *data, long len);
+extern int ncld_trylock(struct ncld_fh *);
+extern int ncld_unlock(struct ncld_fh *);
+extern void ncld_close(struct ncld_fh *);
+extern void ncld_sess_close(struct ncld_sess *s);
+extern void ncld_init(void);
+
+#endif /* __NCLD_H__ */
diff --git a/lib/cldc.c b/lib/cldc.c
index bc4b48c..0bd2ec3 100644
--- a/lib/cldc.c
+++ b/lib/cldc.c
@@ -30,12 +30,17 @@
#include <errno.h>
#include <time.h>
#include <stdarg.h>
+#include <syslog.h>
+#include <poll.h>
#include <openssl/sha.h>
#include <openssl/hmac.h>
#include <glib.h>
+#include <libtimer.h>
#include <cld-private.h>
#include <cldc.h>
-#include <syslog.h>
+#include <ncld.h>
+
+#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
enum {
CLDC_MSG_EXPIRE = 5 * 60,
@@ -640,7 +645,11 @@ static void sess_expire(struct cldc_session *sess)
sess->ops->timer_ctl(sess->private, false, NULL, NULL, 0);
sess->ops->event(sess->private, sess, NULL, CE_SESS_FAILED);
- /* FIXME why not sess_free here */
+ /*
+ * Do not do sess_free here, or else the application is bound to
+ * step into use-after-free. Only call sess_free when requested.
+ * However, it may be requested right from this callback. Or later.
+ */
}
static int sess_send_pkt(struct cldc_session *sess,
@@ -1379,6 +1388,831 @@ char *cldc_dirent_name(struct cld_dirent_cur *dc)
}
/*
+ * On error, return the code (not negated code like a kernel function would).
+ */
+static int ncld_getsrv(char **hostp, unsigned short *portp,
+		       struct hail_log *hlog)
+{
+	enum { hostsz = 64 };
+	char hostb[hostsz];
+	GList *host_list = NULL;
+	GList *tmp;
+	struct cldc_host *hp;
+
+	if (gethostname(hostb, hostsz-1) < 0)
+		return errno;
+	hostb[hostsz-1] = 0;	/* gethostname may leave the name unterminated */
+
+	if (cldc_getaddr(&host_list, hostb, hlog))
+		return 1001;
+
+	/*
+	 * No weight/priority comparison yet: just take the first host. Hand
+	 * out a private copy of its name, since every list entry (including
+	 * its host string) is freed below; the caller frees *hostp.
+	 */
+	hp = host_list->data;	/* cannot be NULL if success above */
+	*hostp = strdup(hp->host);
+	*portp = hp->port;
+
+	for (tmp = host_list; tmp; tmp = tmp->next) {
+		hp = tmp->data;
+		free(hp->host);
+		free(hp);
+	}
+	g_list_free(host_list);
+	return *hostp ? 0 : ENOMEM;	/* strdup failure is reported here */
+}
+
+static int ncld_gethost(char **hostp, unsigned short *portp,
+			const char *hostname, const char *portstr)
+{
+	char *end;	/* first character strtol did not consume */
+	long n = strtol(portstr, &end, 10);
+
+	if (end == portstr || *end != '\0' || n <= 0 || n >= 65536)
+		return EINVAL;	/* empty, trailing junk, or out of range */
+	*portp = n;
+
+	if (!(*hostp = strdup(hostname)))	/* caller owns and frees *hostp */
+		return ENOMEM;
+
+	return 0;
+}
+
+static void ncld_udp_timer_event(struct cld_timer *timer)
+{
+ struct ncld_sess *nsp = timer->userdata;
+ struct cldc_udp *udp = nsp->udp;
+
+ if (udp->cb)
+ udp->cb(udp->sess, udp->cb_private);
+}
+
+enum {
+ NCLD_CMD_END = 0,
+ NCLD_CMD_SESEV /* argument - 4 bytes in host order */
+};
+
+/*
+ * Read and execute one command from the pipe. Error printouts are likely
+ * to be lost for daemons, but it's not a big deal. We abort instead of
+ * exit to indicate that something went wrong, so system features should
+ * report it (usually as a core). strace or -F mode captures the output.
+ */
+static void ncld_thread_command(struct ncld_sess *nsp)
+{
+	ssize_t rrc;
+	unsigned char cmd;
+	uint32_t what;
+
+	rrc = read(nsp->to_thread[0], &cmd, 1);
+	if (rrc < 0) {
+		fprintf(stderr, "read error: %s\n", strerror(errno));
+		abort();
+	}
+	if (rrc < 1) {
+		fprintf(stderr, "short read\n");
+		abort();
+	}
+
+	if (cmd == NCLD_CMD_END) {
+		/* No answer to requestor. Wait with g_thread_join. */
+		g_thread_exit(NULL);
+	} else if (cmd == NCLD_CMD_SESEV) {
+		rrc = read(nsp->to_thread[0], &what, sizeof(uint32_t));
+		if (rrc < 0 || (size_t)rrc < sizeof(uint32_t)) { /* -1 is signed */
+			fprintf(stderr, "bad read param\n");
+			g_thread_exit(NULL);
+		}
+		if (nsp->event)
+			(*nsp->event)(nsp->event_arg, what);
+	} else {
+		fprintf(stderr, "bad command 0x%x\n", cmd);
+		abort();
+	}
+}
+
+static gpointer ncld_sess_thr(gpointer data)
+{
+	struct ncld_sess *nsp = data;
+	struct pollfd pfd[2];
+	time_t tmo;
+	int i;
+	int rc;
+
+	for (;;) {	/* helper thread loop: run timers, then wait for input */
+		g_mutex_lock(nsp->mutex);
+		tmo = cld_timers_run(&nsp->tlist);
+		g_mutex_unlock(nsp->mutex);
+
+		memset(pfd, 0, sizeof(pfd));
+		pfd[0].fd = nsp->to_thread[0];	/* command pipe, read end */
+		pfd[0].events = POLLIN;
+		pfd[1].fd = nsp->udp->fd;	/* UDP socket of the session */
+		pfd[1].events = POLLIN;
+
+		rc = poll(pfd, 2, tmo ? tmo*1000 : -1);	/* tmo 0: no timeout */
+		if (rc == 0 || (rc < 0 && errno == EINTR))	/* signal: retry */
+			continue;
+		if (rc < 0) {
+			fprintf(stderr, "poll error: %s\n", strerror(errno));
+			abort();
+		}
+
+		for (i = 0; i < (int) ARRAY_SIZE(pfd); i++) {
+			if (pfd[i].revents) {
+				if (i == 0) {
+					ncld_thread_command(nsp);
+				} else {
+					g_mutex_lock(nsp->mutex);
+					cldc_udp_receive_pkt(nsp->udp);
+					g_mutex_unlock(nsp->mutex);
+				}
+			}
+		}
+	}
+
+	return NULL;
+}
+
+/*
+ * Ask the thread to exit and wait until it does.
+ */
+static void ncld_thr_end(struct ncld_sess *nsp)
+{
+ unsigned char cmd;
+
+ cmd = NCLD_CMD_END;
+ write(nsp->to_thread[1], &cmd, 1);
+ g_thread_join(nsp->thread);
+}
+
+static bool ncld_p_timer_ctl(void *priv, bool add,
+ int (*cb)(struct cldc_session *, void *),
+ void *cb_priv, time_t secs)
+{
+ struct ncld_sess *nsp = priv;
+ struct cldc_udp *udp = nsp->udp;
+
+ if (add) {
+ udp->cb = cb;
+ udp->cb_private = cb_priv;
+ cld_timer_add(&nsp->tlist, &nsp->udp_timer, time(NULL) + secs);
+ } else {
+ cld_timer_del(&nsp->tlist, &nsp->udp_timer);
+ }
+ return true;
+}
+
+static int ncld_p_pkt_send(void *priv, const void *addr, size_t addrlen,
+ const void *buf, size_t buflen)
+{
+ struct ncld_sess *nsp = priv;
+ return cldc_udp_pkt_send(nsp->udp, addr, addrlen, buf, buflen);
+}
+
+static void ncld_p_event(void *priv, struct cldc_session *csp,
+ struct cldc_fh *fh, uint32_t what)
+{
+ struct ncld_sess *nsp = priv;
+ unsigned char cmd;
+
+ if (what == CE_SESS_FAILED) { /* the only event acted upon here */
+ if (nsp->udp->sess != csp)
+ abort();
+ nsp->is_up = false;
+ /* XXX wake up all I/O waiters here */
+ /*
+ * This is a trick. As a direct callback from the cldc layer,
+ * we are running under nsp->mutex, so we cannot call back
+ * into a user of ncld. If we do, it may invoke another
+ * ncld operation and deadlock. So, bump session callbacks
+ * into the part of the helper thread that runs unlocked.
+ */
+ cmd = NCLD_CMD_SESEV;
+ write(nsp->to_thread[1], &cmd, 1); /* command byte first */
+ write(nsp->to_thread[1], &what, sizeof(what)); /* then its argument */
+ }
+}
+
+static struct cldc_ops ncld_ops = {
+ .timer_ctl = ncld_p_timer_ctl,
+ .pkt_send = ncld_p_pkt_send,
+ .event = ncld_p_event,
+ .errlog = NULL,
+};
+
+static int ncld_new_sess(struct cldc_call_opts *copts, enum cle_err_codes errc)
+{
+ struct ncld_sess *nsp = copts->private;
+
+ /*
+ * All callbacks from cldc layer run on the context of the thread
+ * with nsp->mutex locked, so we don't lock it again here.
+ */
+ nsp->errc = errc;
+ nsp->is_up = true;
+ g_cond_broadcast(nsp->cond);
+ return 0;
+}
+
+static int ncld_wait_session(struct ncld_sess *nsp)
+{
+ g_mutex_lock(nsp->mutex);
+ while (!nsp->is_up)
+ g_cond_wait(nsp->cond, nsp->mutex);
+ g_mutex_unlock(nsp->mutex);
+ return nsp->errc;
+}
+
+/*
+ * Initiating session may take a while, since we retry failures.
+ * We promise that eventually we return from this function.
+ *
+ * On error, returns NULL and sets the error code (system errno if below 1000,
+ * otherwise our own mysterious codes that you should just print in decimal).
+ *
+ * Keep in mind that the (*event)() is likely to be called in the context
+ * of the extra thread that we spawn. At least we won't steal some random
+ * context and monkey with signals. Also, (*event)() may be called before
+ * this function returns, with the session going down, for example.
+ * This is kind of dirty, but oh well. Maybe we'll fix this later.
+ *
+ * The lifetime of the hlog must be longer than that of the session.
+ *
+ * @param host Host name (NULL if resolving SRV records)
+ * @param port Port string (NULL if resolving SRV records)
+ * @param error Buffer for the error code
+ * @param ev_func Session event function (ok to be NULL)
+ * @param ev_arg User-supplied argument to the session event function
+ * @param cld_user The user identifier to be used for authentication
+ * @param cld_key The user key to be used for authentication
+ * @param hlog The log to which we (sadly) write, because we use cldc.
+ */
+struct ncld_sess *ncld_sess_open(const char *host, const char *port, int *error,
+	void (*ev_func)(void *, unsigned int), void *ev_arg,
+	const char *cld_user, const char *cld_key, struct hail_log *hlog)
+{
+	struct ncld_sess *nsp;
+	struct cldc_call_opts copts;
+	int err;
+	GError *gerr = NULL;	/* g_thread_create requires a NULL GError */
+	int rc;
+
+	err = ENOMEM;
+	nsp = malloc(sizeof(struct ncld_sess));
+	if (!nsp)
+		goto out_sesalloc;
+	memset(nsp, 0, sizeof(struct ncld_sess));
+	cld_timer_init(&nsp->udp_timer, "nsp-udp-timer", ncld_udp_timer_event,
+		       nsp);
+	nsp->mutex = g_mutex_new();
+	if (!nsp->mutex)
+		goto out_mutex;
+	nsp->cond = g_cond_new();
+	if (!nsp->cond)
+		goto out_cond;
+
+	if (!host) {
+		err = EINVAL;
+		if (port)
+			goto out_portarg;
+		err = ncld_getsrv(&nsp->host, &nsp->port, hlog);
+		if (err)
+			goto out_srv;
+	} else {
+		err = EINVAL;
+		if (!port)
+			goto out_portarg;
+		err = ncld_gethost(&nsp->host, &nsp->port, host, port);
+		if (err)
+			goto out_srv;
+	}
+
+	nsp->event = ev_func;
+	nsp->event_arg = ev_arg;
+
+	if (pipe(nsp->to_thread) < 0) {
+		err = errno;
+		goto out_pipe_to;
+	}
+
+	if (cldc_udp_new(nsp->host, nsp->port, &nsp->udp)) {
+		err = 1023;
+		goto out_udp;
+	}
+
+	nsp->thread = g_thread_create(ncld_sess_thr, nsp, TRUE, &gerr);
+	if (nsp->thread == NULL) {
+		HAIL_ERR(hlog, "Failed to start replication thread: %s",
+			 gerr->message);
+		err = 1022;
+		goto out_thread;
+	}
+
+	/*
+	 * FIXME If anyone ever gets 2 sessions in one application,
+	 * their logs' lifetimes will overlap. We need to move the errlog
+	 * from ops to the session itself.
+	 */
+	ncld_ops.errlog = hlog->func;
+
+	memset(&copts, 0, sizeof(copts));
+	copts.cb = ncld_new_sess;
+	copts.private = nsp;
+	if (cldc_new_sess(&ncld_ops, &copts, nsp->udp->addr, nsp->udp->addr_len,
+			  cld_user, cld_key, nsp, &nsp->udp->sess)) {
+		err = 1024;
+		goto out_session;
+	}
+
+	/* nsp->udp->sess->log.verbose = hlog->verbose; */
+
+	rc = ncld_wait_session(nsp);
+	if (rc) {
+		err = 1100 + rc % 1000;
+		goto out_start;
+	}
+
+	return nsp;
+
+out_start:
+	cldc_kill_sess(nsp->udp->sess);
+out_session:
+	ncld_thr_end(nsp);
+out_thread:
+	cldc_udp_free(nsp->udp);
+out_udp:
+	close(nsp->to_thread[0]);
+	close(nsp->to_thread[1]);
+out_pipe_to:
+	free(nsp->host);
+out_srv:
+out_portarg:
+	g_cond_free(nsp->cond);
+out_cond:
+	g_mutex_free(nsp->mutex);
+out_mutex:
+	free(nsp);
+out_sesalloc:
+	*error = err;
+	return NULL;
+}
+
+static int ncld_open_cb(struct cldc_call_opts *copts, enum cle_err_codes errc)
+{
+ struct ncld_fh *fhp = copts->private;
+
+ fhp->errc = errc;
+ fhp->is_open = true;
+ g_cond_broadcast(fhp->ses->cond);
+ return 0;
+}
+
+static int ncld_wait_open(struct ncld_fh *fhp)
+{
+ struct ncld_sess *nsp = fhp->ses;
+
+ g_mutex_lock(nsp->mutex);
+ while (!fhp->is_open)
+ g_cond_wait(nsp->cond, nsp->mutex);
+ g_mutex_unlock(nsp->mutex);
+ return fhp->errc;
+}
+
+/*
+ * We do not provide anything like ncld_set_callback because it inevitably
+ * leads to lost events. Unused arguments are not too onerous, so just
+ * put zero into the events mask if you don't want notifications.
+ *
+ * FIXME It's a moot point for now anyway, since we don't have unlock-notify,
+ * or a waiting lock anyway.
+ *
+ * On error, return NULL and set the error code (can be errno or our own code).
+ */
+struct ncld_fh *ncld_open(struct ncld_sess *nsp, const char *fname,
+ unsigned int mode, int *error, unsigned int events,
+ void (*ev_func)(void *, unsigned int), void *ev_arg)
+{
+ struct ncld_fh *fhp;
+ struct cldc_call_opts copts;
+ int err;
+ int rc;
+
+ err = EBUSY;
+ if (!nsp->is_up)
+ goto out_session;
+
+ err = ENOMEM;
+ fhp = malloc(sizeof(struct ncld_fh));
+ if (!fhp)
+ goto out_alloc;
+ memset(fhp, 0, sizeof(struct ncld_fh));
+ fhp->ses = nsp;
+ fhp->event_mask = events;
+ fhp->event_func = ev_func;
+ fhp->event_arg = ev_arg;
+
+ g_mutex_lock(nsp->mutex);
+ memset(&copts, 0, sizeof(copts));
+ copts.cb = ncld_open_cb;
+ copts.private = fhp;
+ rc = cldc_open(nsp->udp->sess, &copts, fname, mode, events, &fhp->fh);
+ if (rc) {
+ err = -rc;
+ g_mutex_unlock(nsp->mutex);
+ goto out_open;
+ }
+ g_mutex_unlock(nsp->mutex);
+
+ rc = ncld_wait_open(fhp);
+ if (rc) {
+ err = 1100 + rc;
+ goto out_start;
+ }
+
+ nsp->handles = g_list_prepend(nsp->handles, fhp);
+ return fhp;
+
+out_start:
+ /*
+ * This is screwy, but we do nothing here. There is no way to free
+ * a cldc file handle that failed its open. It is stuck in the
+ * array of handles until the session closes (although it can be
+ * garbage-collected if we're lucky).
+ */
+out_open:
+ free(fhp);
+out_alloc:
+out_session:
+ *error = err;
+ return NULL;
+}
+
+struct ncld_delio {
+ struct ncld_sess *ses;
+ bool is_done;
+ int errc;
+};
+
+static int ncld_del_cb(struct cldc_call_opts *copts, enum cle_err_codes errc)
+{
+ struct ncld_delio *dp = copts->private;
+ struct ncld_sess *nsp = dp->ses;
+
+ dp->errc = errc;
+ dp->is_done = true;
+ g_cond_broadcast(nsp->cond);
+ return 0;
+}
+
+static int ncld_wait_del(struct ncld_delio *dp)
+{
+ struct ncld_sess *nsp = dp->ses;
+
+ g_mutex_lock(nsp->mutex);
+ while (!dp->is_done)
+ g_cond_wait(nsp->cond, nsp->mutex);
+ g_mutex_unlock(nsp->mutex);
+ return dp->errc;
+}
+
+int ncld_del(struct ncld_sess *nsp, const char *fname)
+{
+	struct cldc_call_opts copts;
+	struct ncld_delio dpb;	/* completion record shared with ncld_del_cb */
+	int rc;
+
+	if (!nsp->is_up)
+		return -EBUSY;
+
+	memset(&dpb, 0, sizeof(struct ncld_delio));
+	dpb.ses = nsp;
+
+	g_mutex_lock(nsp->mutex);
+	memset(&copts, 0, sizeof(copts));
+	copts.cb = ncld_del_cb;
+	copts.private = &dpb;	/* ncld_del_cb casts this to struct ncld_delio */
+	rc = cldc_del(nsp->udp->sess, &copts, fname);
+	if (rc) {
+		g_mutex_unlock(nsp->mutex);
+		return -rc;
+	}
+	/* XXX A delete operation is not accounted for end-session */
+	g_mutex_unlock(nsp->mutex);
+
+	rc = ncld_wait_del(&dpb);	/* blocks until the callback sets is_done */
+	if (rc)
+		return rc + 1100;	/* CLD error codes are offset by 1100 */
+
+	return 0;
+}
+
+static int ncld_read_cb(struct cldc_call_opts *copts, enum cle_err_codes errc)
+{
+ struct ncld_read *rp = copts->private;
+ struct ncld_fh *fhp = rp->fh;
+
+ if (errc) {
+ rp->errc = errc;
+ } else {
+ rp->ptr = copts->u.get.buf;
+ rp->length = copts->u.get.size;
+ }
+ rp->is_done = true;
+ g_cond_broadcast(fhp->ses->cond);
+ return 0;
+}
+
+static int ncld_wait_read(struct ncld_read *rp)
+{
+ struct ncld_fh *fhp = rp->fh;
+ struct ncld_sess *nsp = fhp->ses;
+
+ g_mutex_lock(nsp->mutex);
+ while (!rp->is_done)
+ g_cond_wait(nsp->cond, nsp->mutex);
+ --fhp->nios;
+ g_mutex_unlock(nsp->mutex);
+ return rp->errc;
+}
+
+/*
+ * @error Error code buffer.
+ * @return Pointer to struct ncld_read or NULL if error.
+ */
+struct ncld_read *ncld_get(struct ncld_fh *fhp, int *error)
+{
+ struct ncld_sess *nsp = fhp->ses;
+ struct ncld_read *rp;
+ struct cldc_call_opts copts;
+ int rc;
+
+ if (!fhp->is_open) {
+ *error = EBUSY;
+ return NULL;
+ }
+
+ rp = malloc(sizeof(struct ncld_read));
+ if (!rp) {
+ *error = ENOMEM;
+ return NULL;
+ }
+ memset(rp, 0, sizeof(struct ncld_read));
+ rp->fh = fhp;
+
+ g_mutex_lock(nsp->mutex);
+ memset(&copts, 0, sizeof(copts));
+ copts.cb = ncld_read_cb;
+ copts.private = rp;
+ rc = cldc_get(fhp->fh, &copts, false);
+ if (rc) {
+ g_mutex_unlock(nsp->mutex);
+ free(rp);
+ *error = -rc;
+ return NULL;
+ }
+ fhp->nios++;
+ g_mutex_unlock(nsp->mutex);
+
+ rc = ncld_wait_read(rp);
+ if (rc) {
+ free(rp);
+ *error = rc + 1100;
+ return NULL;
+ }
+
+ return rp;
+}
+
+void ncld_read_free(struct ncld_read *rp)
+{
+
+ /*
+ * FIXME: Actually the rp->ptr points to sess->msg_buf, so we
+ * cannot issue 2 cldc_get independently.
+ */
+ /* free(rp->ptr); */
+
+ free(rp);
+}
+
+struct ncld_genio {
+ struct ncld_fh *fh;
+ bool is_done;
+ int errc;
+};
+
+static int ncld_genio_cb(struct cldc_call_opts *copts, enum cle_err_codes errc)
+{
+ struct ncld_genio *ap = copts->private;
+ struct ncld_fh *fhp = ap->fh;
+
+ ap->errc = errc;
+ ap->is_done = true;
+ g_cond_broadcast(fhp->ses->cond);
+ return 0;
+}
+
+static int ncld_wait_genio(struct ncld_genio *ap)
+{
+ struct ncld_fh *fhp = ap->fh;
+ struct ncld_sess *nsp = fhp->ses;
+
+ g_mutex_lock(nsp->mutex);
+ while (!ap->is_done)
+ g_cond_wait(nsp->cond, nsp->mutex);
+ --fhp->nios;
+ g_mutex_unlock(nsp->mutex);
+ return ap->errc;
+}
+
+/*
+ * @return: Zero or error code.
+ */
+int ncld_write(struct ncld_fh *fhp, const void *data, long len)
+{
+ struct ncld_sess *nsp = fhp->ses;
+ struct cldc_call_opts copts;
+ struct ncld_genio apb;
+ int rc;
+
+ if (!fhp->is_open)
+ return -EBUSY;
+
+ memset(&apb, 0, sizeof(struct ncld_genio));
+ apb.fh = fhp;
+
+ g_mutex_lock(nsp->mutex);
+ memset(&copts, 0, sizeof(copts));
+ copts.cb = ncld_genio_cb;
+ copts.private = &apb;
+ rc = cldc_put(fhp->fh, &copts, data, len);
+ if (rc) {
+ g_mutex_unlock(nsp->mutex);
+ return -rc;
+ }
+ fhp->nios++;
+ g_mutex_unlock(nsp->mutex);
+
+ rc = ncld_wait_genio(&apb);
+ if (rc)
+ return rc + 1100;
+
+ return 0;
+}
+
+/*
+ * FIXME There is no "wait for lock".
+ */
+int ncld_trylock(struct ncld_fh *fhp)
+{
+ struct ncld_sess *nsp = fhp->ses;
+ struct cldc_call_opts copts;
+ struct ncld_genio apb;
+ int rc;
+
+ if (!fhp->is_open)
+ return -EBUSY;
+
+ memset(&apb, 0, sizeof(struct ncld_genio));
+ apb.fh = fhp;
+
+ g_mutex_lock(nsp->mutex);
+ memset(&copts, 0, sizeof(copts));
+ copts.cb = ncld_genio_cb;
+ copts.private = &apb;
+ rc = cldc_lock(fhp->fh, &copts, 0, false);
+ if (rc) {
+ g_mutex_unlock(nsp->mutex);
+ return -rc;
+ }
+ fhp->nios++;
+ g_mutex_unlock(nsp->mutex);
+
+ rc = ncld_wait_genio(&apb);
+ if (rc)
+ return rc + 1100;
+
+ return 0;
+}
+
+int ncld_unlock(struct ncld_fh *fhp)
+{
+ struct ncld_sess *nsp = fhp->ses;
+ struct cldc_call_opts copts;
+ struct ncld_genio apb;
+ int rc;
+
+ if (!fhp->is_open)
+ return -EBUSY;
+
+ memset(&apb, 0, sizeof(struct ncld_genio));
+ apb.fh = fhp;
+
+ g_mutex_lock(nsp->mutex);
+ memset(&copts, 0, sizeof(copts));
+ copts.cb = ncld_genio_cb;
+ copts.private = &apb;
+ rc = cldc_unlock(fhp->fh, &copts);
+ if (rc) {
+ g_mutex_unlock(nsp->mutex);
+ return -rc;
+ }
+ fhp->nios++;
+ g_mutex_unlock(nsp->mutex);
+
+ rc = ncld_wait_genio(&apb);
+ if (rc)
+ return rc + 1100;
+
+ return 0;
+}
+
+static int ncld_close_cb(struct cldc_call_opts *copts, enum cle_err_codes errc)
+{
+ struct ncld_fh *fhp = copts->private;
+
+ fhp->errc = errc;
+ fhp->is_open = false;
+ g_cond_broadcast(fhp->ses->cond);
+ return 0;
+}
+
+/*
+ * The poisoning in ncld_close is mostly for bug-catching. Do not try to
+ * force-close file handles if other threads bomb them with I/Os. This cannot
+ * work for us: users keep pointers, not file descriptor numbers.
+ * Applications should stop application I/O first, then close.
+ */
+void ncld_close(struct ncld_fh *fhp)
+{
+ struct ncld_sess *nsp = fhp->ses;
+ struct cldc_call_opts copts;
+ int rc;
+
+ if (!fhp->is_open) /* closing a handle that is not open is a caller bug */
+ abort();
+
+ g_mutex_lock(nsp->mutex);
+ memset(&copts, 0, sizeof(copts));
+ copts.cb = ncld_close_cb;
+ copts.private = fhp;
+ rc = cldc_close(fhp->fh, &copts);
+ g_mutex_unlock(nsp->mutex);
+
+ if (rc == 0) { /* request was issued: wait for ncld_close_cb */
+ g_mutex_lock(nsp->mutex);
+ while (fhp->is_open)
+ g_cond_wait(nsp->cond, nsp->mutex);
+ g_mutex_unlock(nsp->mutex);
+ /* At this point, we ignore fhp->errc. */
+ }
+
+ /*
+ * The cldc layer does not allow us to kill handles willy-nilly,
+ * so the only thing we can do is to wait for I/Os to terminate.
+ *
+ * N.B.: this is making use of the fact that we only have one
+ * conditional per session, and therefore end-of-I/O pokes us here.
+ */
+ g_mutex_lock(nsp->mutex);
+ while (fhp->nios)
+ g_cond_wait(nsp->cond, nsp->mutex);
+ g_mutex_unlock(nsp->mutex);
+
+ nsp->handles = g_list_remove_all(nsp->handles, fhp);
+ free(fhp);
+}
+
+static void ncld_func_close(gpointer data, gpointer priv)
+{
+ ncld_close(data);
+}
+
+void ncld_sess_close(struct ncld_sess *nsp)
+{
+
+ g_list_foreach(nsp->handles, ncld_func_close, NULL);
+ g_list_free(nsp->handles);
+
+ cldc_kill_sess(nsp->udp->sess);
+ ncld_thr_end(nsp);
+ cldc_udp_free(nsp->udp);
+ close(nsp->to_thread[0]);
+ close(nsp->to_thread[1]);
+ g_cond_free(nsp->cond);
+ g_mutex_free(nsp->mutex);
+ free(nsp->host);
+ free(nsp);
+}
+
+void ncld_init(void)
+{
+ cldc_init();
+}
+
+/*
* For extra safety, call cldc_init after g_thread_init, if present.
* Currently we just call srand(), but since we use GLib, we may need
* to add some Glib stuff here and that must come after g_thread_init.
diff --git a/test/lock-file-event.c b/test/lock-file-event.c
index 0772e4e..f2d7c5e 100644
--- a/test/lock-file-event.c
+++ b/test/lock-file-event.c
@@ -25,267 +25,91 @@
#include "cld-config.h"
#include <sys/types.h>
-#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <stdio.h>
-#include <string.h>
-#include <libtimer.h>
-#include <cldc.h>
+#include <stdarg.h>
+#include <ncld.h>
#include "test.h"
-struct run {
- struct cldc_udp *udp;
- struct timer tmr_test;
- struct timer tmr_udp;
- struct cldc_fh *fh;
- char buf[LOCKLEN];
-};
-
-static int new_sess_cb(struct cldc_call_opts *copts, enum cle_err_codes errc);
-static int open_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc);
-static int write_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc);
-static int lock_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc);
-static void timer_1(struct run *rp);
-static int close_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc);
-static int end_sess_cb(struct cldc_call_opts *copts, enum cle_err_codes errc);
-
-static bool do_timer_ctl(void *priv, bool add,
- int (*cb)(struct cldc_session *, void *),
- void *cb_priv, time_t secs)
+static void applog(int prio, const char *fmt, ...)
{
- struct run *rp = priv;
+ char buf[200];
+ va_list ap;
- if (add) {
- rp->udp->cb = cb;
- rp->udp->cb_private = cb_priv;
- timer_add(&rp->tmr_udp, time(NULL) + secs);
- } else {
- timer_del(&rp->tmr_udp);
- }
-
- return true;
+ va_start(ap, fmt);
+ snprintf(buf, 200, "%s\n", fmt);
+ vfprintf(stderr, buf, ap);
+ va_end(ap);
}
-static int do_pkt_send(void *priv, const void *addr, size_t addrlen,
- const void *buf, size_t buflen)
-{
- struct run *rp = priv;
- return cldc_udp_pkt_send(rp->udp, addr, addrlen, buf, buflen);
-}
-
-static void timer_udp_event(struct timer *timer)
-{
- struct run *rp = timer->userdata;
- struct cldc_udp *udp = rp->udp;
-
- if (udp->cb)
- udp->cb(udp->sess, udp->cb_private);
-}
-
-static void do_event(void *private, struct cldc_session *sess,
- struct cldc_fh *fh, uint32_t event_mask)
-{
- fprintf(stderr, "EVENT(0x%x)\n", event_mask);
-}
+static struct hail_log test_log = {
+ .func = applog,
+ .verbose = 1,
+};
-static int new_sess_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc)
+int main (int argc, char *argv[])
{
- struct run *rp = coptarg->private;
- struct cldc_call_opts copts;
+ struct ncld_sess *nsp;
+ struct ncld_fh *fhp;
+ int port;
+ char portstr[6];
+ struct timespec tm;
+ int error;
int rc;
- if (errc != CLE_OK) {
- fprintf(stderr, "new-sess failed: %d\n", errc);
- exit(1);
- }
-
- /* We use a fixed file name because we contact a private copy of CLD */
- memset(&copts, 0, sizeof(copts));
- copts.cb = open_1_cb;
- copts.private = rp;
- rc = cldc_open(rp->udp->sess, &copts, TLNAME,
- COM_WRITE | COM_LOCK | COM_CREATE,
- CE_SESS_FAILED, &rp->fh);
- if (rc) {
- fprintf(stderr, "cldc_open call error %d\n", rc);
- exit(1);
- }
- return 0;
-}
-
-static int open_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc)
-{
- struct run *rp = coptarg->private;
- struct cldc_call_opts copts;
- int rc;
+ g_thread_init(NULL);
+ ncld_init();
- if (errc != CLE_OK) {
- fprintf(stderr, "first-open failed: %d\n", errc);
- exit(1);
- }
- if (rp->fh == NULL) {
- fprintf(stderr, "first-open NULL fh\n");
- exit(1);
- }
- if (!rp->fh->valid) {
- fprintf(stderr, "first-open invalid fh\n");
- exit(1);
- }
+ port = cld_readport(TEST_PORTFILE_CLD);
+ if (port < 0)
+ return port;
+ if (port == 0)
+ return -1;
+ snprintf(portstr, sizeof(portstr), "%u", port);
- memset(&copts, 0, sizeof(copts));
- copts.cb = write_1_cb;
- copts.private = rp;
- rc = cldc_put(rp->fh, &copts, rp->buf, LOCKLEN);
- if (rc) {
- fprintf(stderr, "cldc_put call error %d\n", rc);
+ nsp = ncld_sess_open(TEST_HOST, portstr, &error, NULL, NULL,
+ TEST_USER, TEST_USER_KEY, &test_log);
+ if (!nsp) {
+ fprintf(stderr, "ncld_sess_open(host %s port %s) failed: %d\n",
+ TEST_HOST, portstr, error);
exit(1);
}
- return 0;
-}
-
-static int write_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc)
-{
- struct run *rp = coptarg->private;
- struct cldc_call_opts copts;
- int rc;
- if (errc != CLE_OK) {
- fprintf(stderr, "first-put failed: %d\n", errc);
+ fhp = ncld_open(nsp, TLNAME, COM_WRITE | COM_LOCK | COM_CREATE,
+ &error, 0, NULL, NULL);
+ if (!fhp) {
+ fprintf(stderr, "ncld_open(%s) failed: %d\n", TLNAME, error);
exit(1);
}
- memset(&copts, 0, sizeof(copts));
- copts.cb = lock_1_cb;
- copts.private = rp;
- rc = cldc_lock(rp->fh, &copts, 0, false);
+ rc = ncld_write(fhp, LOCKSTR, LOCKLEN);
if (rc) {
- fprintf(stderr, "cldc_lock call error %d\n", rc);
- exit(1);
- }
- return 0;
-}
-
-static int lock_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc)
-{
- struct run *rp = coptarg->private;
-
- if (errc != CLE_OK) {
- fprintf(stderr, "first-lock failed: %d\n", errc);
+ fprintf(stderr, "ncld_write failed: %d\n", rc);
exit(1);
}
- /* Idle for 40s to verify that session sustains a protocol ping. */
- timer_add(&rp->tmr_test, time(NULL) + 40);
- return 0;
-}
-
-static void timer_test_event(struct timer *timer)
-{
- struct run *rp = timer->userdata;
-
- timer_1(rp);
-}
-
-static void timer_1(struct run *rp)
-{
- struct cldc_call_opts copts;
- int rc;
-
- memset(&copts, 0, sizeof(copts));
- copts.cb = close_1_cb;
- copts.private = rp;
- rc = cldc_close(rp->fh, &copts);
+ rc = ncld_trylock(fhp);
if (rc) {
- fprintf(stderr, "cldc_close call error %d\n", rc);
+ fprintf(stderr, "ncld_trylock failed: %d\n", rc);
exit(1);
}
-}
-static int close_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc)
-{
- struct run *rp = coptarg->private;
- struct cldc_call_opts copts;
- int rc;
-
- if (errc != CLE_OK) {
- fprintf(stderr, "first-close failed: %d\n", errc);
- exit(1);
- }
- rp->fh = NULL;
+ printf("idling 40s...\n"); fflush(stdout);
+ /* Idle for 40s to verify that session sustains a protocol ping. */
+ tm.tv_sec = 40;
+ tm.tv_nsec = 0;
+ nanosleep(&tm, NULL);
- memset(&copts, 0, sizeof(copts));
- copts.cb = end_sess_cb;
- copts.private = rp;
- rc = cldc_end_sess(rp->udp->sess, &copts);
+ rc = ncld_unlock(fhp);
if (rc) {
- fprintf(stderr, "cldc_end_sess call error %d\n", rc);
+ fprintf(stderr, "ncld_unlock failed: %d\n", rc);
exit(1);
}
- return 0;
-}
-
-static int end_sess_cb(struct cldc_call_opts *copts, enum cle_err_codes errc)
-{
- if (errc != CLE_OK) {
- fprintf(stderr, "end-sess failed: %d\n", errc);
- exit(1);
- }
-
- /* session ended; success */
- exit(0);
- return 0;
-}
-
-static struct run run;
-
-static struct cldc_ops ops = {
- .timer_ctl = do_timer_ctl,
- .pkt_send = do_pkt_send,
- .event = do_event,
-};
-
-static int init(void)
-{
- int rc;
- int port;
- struct cldc_call_opts copts;
-
- memcpy(run.buf, LOCKSTR, LOCKLEN);
-
- port = cld_readport(TEST_PORTFILE_CLD);
- if (port < 0)
- return port;
- if (port == 0)
- return -1;
-
- timer_init(&run.tmr_test, "lock-timer", timer_test_event, &run);
- timer_init(&run.tmr_udp, "udp-timer", timer_udp_event, &run);
-
- rc = cldc_udp_new(TEST_HOST, port, &run.udp);
- if (rc)
- return rc;
-
- memset(&copts, 0, sizeof(copts));
- copts.cb = new_sess_cb;
- copts.private = &run;
- rc = cldc_new_sess(&ops, &copts, run.udp->addr, run.udp->addr_len,
- TEST_USER, TEST_USER_KEY, &run, &run.udp->sess);
- if (rc)
- return rc;
-
- // run.udp->sess->verbose = true;
- return 0;
-}
-
-int main (int argc, char *argv[])
-{
- g_thread_init(NULL);
- cldc_init();
- if (init())
- return 1;
- test_loop(run.udp);
+ /* These two are perfect places to hang or crash, so don't just exit. */
+ ncld_close(fhp);
+ ncld_sess_close(nsp);
return 0;
}
I am going to work on this gradually and flip over cldcli, the rest of the
tests, and cldu.c in chunkd and tabled; then we'll see how it works.
-- Pete
^ permalink raw reply related [flat|nested] 8+ messages in thread