From: Pete Zaitcev <zaitcev@redhat.com>
To: Colin McCabe <cmccabe@alumni.cmu.edu>
Cc: Jeff Garzik <jeff@garzik.org>,
Project Hail List <hail-devel@vger.kernel.org>
Subject: Re: the evils of the CLD api
Date: Tue, 19 Jan 2010 14:54:05 -0700 [thread overview]
Message-ID: <20100119145405.16faaa24@redhat.com> (raw)
In-Reply-To: <436f52801001161338w7c6ec624s3c07b7e950904b9d@mail.gmail.com>
On Sat, 16 Jan 2010 13:38:26 -0800
Colin McCabe <cmccabe@alumni.cmu.edu> wrote:
> Looks promising...
Here's a version that works:
diff --git a/include/ncld.h b/include/ncld.h
new file mode 100644
index 0000000..067e6ff
--- /dev/null
+++ b/include/ncld.h
@@ -0,0 +1,88 @@
+#ifndef __NCLD_H__
+#define __NCLD_H__
+
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+/*
+ * The ncld.h API is a replacement for cldc.h. Do not include both.
+ *
+ * We do not believe in making "internal" structures "opaque"
+ * with pointers to void. Therefore, this header might want to include
+ * some legacy definitions or whatnot, but users do not need to.
+ */
+#include <stdbool.h>
+#include <glib.h>
+#include <cldc.h>
+#include <libtimer.h>
+
+/*
+ * State of one CLD session as seen by the ncld layer.
+ */
+struct ncld_sess {
+ char *host; /* server host name; released with free() when the session closes */
+ unsigned short port; /* server port */
+ GMutex *mutex; /* serializes the helper thread with callers into the cldc layer */
+ GCond *cond; /* broadcast by completion callbacks; shared by all waiters */
+ GThread *thread; /* helper thread that runs timers and receives packets */
+ bool is_up; /* set by the new-session callback, cleared on CE_SESS_FAILED */
+ int errc; /* code reported by the new-session callback */
+ GList *handles; /* struct ncld_fh pointers opened on this session */
+ int to_thread[2]; /* pipe: [0] is read by the helper thread, [1] is written by requesters */
+ struct cldc_udp *udp;
+ struct cld_timer udp_timer; /* fires ncld_udp_timer_event */
+ struct cld_timer_list tlist; /* timers run by the helper thread */
+ void (*event)(void *, unsigned int); /* session event callback, may be NULL */
+ void *event_arg; /* first argument handed to (*event)() */
+};
+
+/*
+ * A file handle, tied to the session in `ses`.
+ */
+struct ncld_fh {
+ struct ncld_sess *ses; /* owning session */
+ struct cldc_fh *fh; /* FIXME cldc_open2 take direct & */
+ bool is_open; /* set by the open callback, cleared by the close callback */
+ int errc; /* code reported by the open or close callback */
+ int nios; /* I/Os submitted but not yet collected by their waiters */
+ unsigned int event_mask; /* events mask given to ncld_open() */
+ void (*event_func)(void *, unsigned int); /* stored by ncld_open() */
+ void *event_arg; /* stored by ncld_open() */
+};
+
+/*
+ * Result of ncld_get(). Release it with ncld_read_free().
+ */
+struct ncld_read {
+ /* public to application */
+ const void *ptr; /* data buffer, filled in by the read callback on success */
+ long length; /* size of the data at ptr */
+
+ struct ncld_fh *fh; /* handle the read was issued on */
+ /* GCond *cond; -- abusing the conditional of file handle for now */
+ bool is_done; /* set by the read callback when the answer arrived */
+ int errc; /* nonzero if the read callback reported an error */
+};
+
+extern struct ncld_sess *ncld_sess_open(const char *host, const char *port,
+ int *error, void (*event)(void *, unsigned int), void *ev_arg,
+ const char *cld_user, const char *cld_key, struct hail_log *hlog);
+extern struct ncld_fh *ncld_open(struct ncld_sess *s, const char *fname,
+ unsigned int mode, int *error, unsigned int events,
+ void (*event)(void *, unsigned int), void *ev_arg);
+extern int ncld_del(struct ncld_sess *nsp, const char *fname);
+extern struct ncld_read *ncld_get(struct ncld_fh *fhp, int *error);
+extern void ncld_read_free(struct ncld_read *rp);
+extern int ncld_write(struct ncld_fh *, const void *data, long len);
+extern int ncld_trylock(struct ncld_fh *);
+extern int ncld_unlock(struct ncld_fh *);
+extern void ncld_close(struct ncld_fh *);
+extern void ncld_sess_close(struct ncld_sess *s);
+extern void ncld_init(void);
+
+#endif /* __NCLD_H__ */
diff --git a/lib/cldc.c b/lib/cldc.c
index bc4b48c..0bd2ec3 100644
--- a/lib/cldc.c
+++ b/lib/cldc.c
@@ -30,12 +30,17 @@
#include <errno.h>
#include <time.h>
#include <stdarg.h>
+#include <syslog.h>
+#include <poll.h>
#include <openssl/sha.h>
#include <openssl/hmac.h>
#include <glib.h>
+#include <libtimer.h>
#include <cld-private.h>
#include <cldc.h>
-#include <syslog.h>
+#include <ncld.h>
+
+#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
enum {
CLDC_MSG_EXPIRE = 5 * 60,
@@ -640,7 +645,11 @@ static void sess_expire(struct cldc_session *sess)
sess->ops->timer_ctl(sess->private, false, NULL, NULL, 0);
sess->ops->event(sess->private, sess, NULL, CE_SESS_FAILED);
- /* FIXME why not sess_free here */
+ /*
+ * Do not do sess_free here, or else the application is bound to
+ * step into use-after-free. Only call sess_free when requested.
+ * However, it may be requested right from this callback. Or later.
+ */
}
static int sess_send_pkt(struct cldc_session *sess,
@@ -1379,6 +1388,831 @@ char *cldc_dirent_name(struct cld_dirent_cur *dc)
}
/*
+ * Find a CLD server for the local host by asking cldc_getaddr().
+ *
+ * On success, returns zero, stores the port in *portp and stores the host
+ * name in *hostp. The string in *hostp is handed over to the caller, who
+ * must release it with free().
+ *
+ * On error, return the code (not negated code like a kernel function would).
+ */
+static int ncld_getsrv(char **hostp, unsigned short *portp,
+	struct hail_log *hlog)
+{
+	enum { hostsz = 64 };
+	char hostb[hostsz];
+	GList *host_list = NULL;
+	GList *tmp;
+	struct cldc_host *hp;
+
+	if (gethostname(hostb, hostsz-1) < 0)
+		return errno;
+	hostb[hostsz-1] = 0;
+
+	if (cldc_getaddr(&host_list, hostb, hlog))
+		return 1001;
+
+	/*
+	 * This is a good place to compare weights and priorities, maybe later.
+	 * For now, just grab first host in the list.
+	 */
+	hp = host_list->data;	/* cannot be NULL if success above */
+	*hostp = hp->host;
+	*portp = hp->port;
+	/*
+	 * The caller now owns the name. Detach it from the list entry so
+	 * that the cleanup loop below does not free it from under the caller
+	 * (free(NULL) is a no-op).
+	 */
+	hp->host = NULL;
+
+	for (tmp = host_list; tmp; tmp = tmp->next) {
+		hp = tmp->data;
+		free(hp->host);
+		free(hp);
+	}
+	g_list_free(host_list);
+	return 0;
+}
+
+/*
+ * Fill in the host and the port from strings supplied by the application.
+ *
+ * The port string must be a decimal number in the range 1..65535 with
+ * nothing after the digits. On success the host name is duplicated into
+ * *hostp and the caller must free() it.
+ *
+ * Returns zero, or EINVAL for a bad port, or ENOMEM.
+ * Nothing is stored through hostp or portp on error.
+ */
+static int ncld_gethost(char **hostp, unsigned short *portp,
+	const char *hostname, const char *portstr)
+{
+	char *end;
+	char *copy;
+	long n;
+
+	n = strtol(portstr, &end, 10);
+	/* Reject an empty string and trailing garbage such as "80abc". */
+	if (end == portstr || *end != '\0')
+		return EINVAL;
+	if (n <= 0 || n >= 65536)
+		return EINVAL;
+
+	copy = strdup(hostname);
+	if (!copy)
+		return ENOMEM;
+
+	*portp = n;
+	*hostp = copy;
+	return 0;
+}
+
+/*
+ * Timer callback: forward the expiry to whatever function the cldc layer
+ * registered through timer_ctl, if any.
+ */
+static void ncld_udp_timer_event(struct cld_timer *timer)
+{
+	struct ncld_sess *sess = timer->userdata;
+	struct cldc_udp *link = sess->udp;
+
+	if (!link->cb)
+		return;
+	link->cb(link->sess, link->cb_private);
+}
+
+/* One-byte commands written into the to_thread pipe for the helper thread. */
+enum {
+ NCLD_CMD_END = 0, /* helper thread exits */
+ NCLD_CMD_SESEV /* argument - 4 bytes in host order */
+};
+
+/*
+ * Read one command from the to_thread pipe and execute it. Runs on the
+ * helper thread without nsp->mutex, so it may call back into the user.
+ *
+ * All the error printouts are likely to be lost for daemons, but it's
+ * not a big deal. We abort instead of exit to indicate that something
+ * went wrong, so system features should report it (usually as a core).
+ * When debugging, strace or -F mode will capture the output.
+ */
+static void ncld_thread_command(struct ncld_sess *nsp)
+{
+	ssize_t rrc;
+	unsigned char cmd;
+	uint32_t what;
+
+	rrc = read(nsp->to_thread[0], &cmd, 1);
+	if (rrc < 0) {
+		fprintf(stderr, "read error: %s\n", strerror(errno));
+		abort();
+	}
+	if (rrc < 1) {
+		fprintf(stderr, "short read\n");
+		abort();
+	}
+
+	if (cmd == NCLD_CMD_END) {
+		/* No answer to requestor. Wait with g_thread_join. */
+		g_thread_exit(NULL);
+	} else if (cmd == NCLD_CMD_SESEV) {
+		rrc = read(nsp->to_thread[0], &what, sizeof(what));
+		/*
+		 * Compare as signed. A failed read returns -1, which turns
+		 * into a huge value when converted to size_t and would pass
+		 * a "less than" test, leaving 'what' uninitialized.
+		 */
+		if (rrc != (ssize_t) sizeof(what)) {
+			fprintf(stderr, "bad read param\n");
+			g_thread_exit(NULL);
+		}
+		if (nsp->event)
+			(*nsp->event)(nsp->event_arg, what);
+	} else {
+		fprintf(stderr, "bad command 0x%x\n", cmd);
+		abort();
+	}
+}
+
+/*
+ * Body of the session helper thread.
+ *
+ * Each iteration runs the expired timers under nsp->mutex, then polls the
+ * command pipe and the UDP socket. Commands are executed unlocked; packets
+ * are fed to the cldc layer under nsp->mutex. The thread leaves only
+ * through g_thread_exit() in ncld_thread_command().
+ */
+static gpointer ncld_sess_thr(gpointer data)
+{
+	struct ncld_sess *nsp = data;
+	struct pollfd pfd[2];
+	time_t tmo;
+	size_t i;
+	int rc;
+
+	for (;;) {
+		g_mutex_lock(nsp->mutex);
+		tmo = cld_timers_run(&nsp->tlist);
+		g_mutex_unlock(nsp->mutex);
+
+		memset(pfd, 0, sizeof(pfd));
+		pfd[0].fd = nsp->to_thread[0];
+		pfd[0].events = POLLIN;
+		pfd[1].fd = nsp->udp->fd;
+		pfd[1].events = POLLIN;
+
+		rc = poll(pfd, ARRAY_SIZE(pfd), tmo ? tmo*1000 : -1);
+		if (rc == 0)
+			continue;
+		if (rc < 0) {
+			/*
+			 * A signal delivered to this thread is not a failure
+			 * of the session: rerun the timers and poll again.
+			 */
+			if (errno == EINTR)
+				continue;
+			fprintf(stderr, "poll error: %s\n", strerror(errno));
+			abort();
+		}
+
+		for (i = 0; i < ARRAY_SIZE(pfd); i++) {
+			if (!pfd[i].revents)
+				continue;
+			if (i == 0) {
+				ncld_thread_command(nsp);
+			} else {
+				g_mutex_lock(nsp->mutex);
+				cldc_udp_receive_pkt(nsp->udp);
+				g_mutex_unlock(nsp->mutex);
+			}
+		}
+	}
+
+	return NULL;
+}
+
+/*
+ * Send the helper thread its termination command, then block until the
+ * thread is gone.
+ */
+static void ncld_thr_end(struct ncld_sess *nsp)
+{
+	unsigned char stop = NCLD_CMD_END;
+
+	write(nsp->to_thread[1], &stop, 1);
+	g_thread_join(nsp->thread);
+}
+
+/*
+ * cldc_ops.timer_ctl: arm the session's UDP timer to call cb after secs
+ * seconds, or disarm it when add is false. Always reports success.
+ */
+static bool ncld_p_timer_ctl(void *priv, bool add,
+	int (*cb)(struct cldc_session *, void *),
+	void *cb_priv, time_t secs)
+{
+	struct ncld_sess *sess = priv;
+
+	if (!add) {
+		cld_timer_del(&sess->tlist, &sess->udp_timer);
+		return true;
+	}
+
+	sess->udp->cb = cb;
+	sess->udp->cb_private = cb_priv;
+	cld_timer_add(&sess->tlist, &sess->udp_timer, time(NULL) + secs);
+	return true;
+}
+
+/*
+ * cldc_ops.pkt_send: pass the packet to the session's UDP endpoint.
+ */
+static int ncld_p_pkt_send(void *priv, const void *addr, size_t addrlen,
+	const void *buf, size_t buflen)
+{
+	struct ncld_sess *sess = priv;
+	struct cldc_udp *link = sess->udp;
+
+	return cldc_udp_pkt_send(link, addr, addrlen, buf, buflen);
+}
+
+/*
+ * cldc_ops.event: only CE_SESS_FAILED is acted upon. The session is marked
+ * down and the event is queued to the helper thread for delivery.
+ */
+static void ncld_p_event(void *priv, struct cldc_session *csp,
+	struct cldc_fh *fh, uint32_t what)
+{
+	struct ncld_sess *sess = priv;
+	unsigned char op;
+
+	if (what != CE_SESS_FAILED)
+		return;
+
+	if (sess->udp->sess != csp)
+		abort();
+	sess->is_up = false;
+	/* XXX wake up all I/O waiters here */
+
+	/*
+	 * This is a trick. As a direct callback from the cldc layer,
+	 * we are running under the session mutex, so we cannot call back
+	 * into a user of ncld. If we do, it may invoke another
+	 * ncld operation and deadlock. So, bump session callbacks
+	 * into the part of the helper thread that runs unlocked.
+	 */
+	op = NCLD_CMD_SESEV;
+	write(sess->to_thread[1], &op, 1);
+	write(sess->to_thread[1], &what, sizeof(what));
+}
+
+/*
+ * Callbacks that ncld hands to the cldc layer. The errlog member is filled
+ * in by ncld_sess_open() from the caller's log, so it is shared by all
+ * sessions in the process.
+ */
+static struct cldc_ops ncld_ops = {
+ .timer_ctl = ncld_p_timer_ctl,
+ .pkt_send = ncld_p_pkt_send,
+ .event = ncld_p_event,
+ .errlog = NULL,
+};
+
+/*
+ * Completion callback for cldc_new_sess: record the result and wake up
+ * ncld_wait_session(). Note that is_up is raised even when errc is nonzero;
+ * the waiter looks at errc to tell success from failure.
+ */
+static int ncld_new_sess(struct cldc_call_opts *copts, enum cle_err_codes errc)
+{
+	struct ncld_sess *sess = copts->private;
+
+	/*
+	 * Callbacks from the cldc layer arrive on the helper thread with
+	 * the session mutex already held, so it is not taken again here.
+	 */
+	sess->errc = errc;
+	sess->is_up = true;
+	g_cond_broadcast(sess->cond);
+
+	return 0;
+}
+
+/*
+ * Sleep until the new-session callback has run, then return its code.
+ */
+static int ncld_wait_session(struct ncld_sess *nsp)
+{
+	g_mutex_lock(nsp->mutex);
+	for (;;) {
+		if (nsp->is_up)
+			break;
+		g_cond_wait(nsp->cond, nsp->mutex);
+	}
+	g_mutex_unlock(nsp->mutex);
+
+	return nsp->errc;
+}
+
+/*
+ * Open a session with a CLD server and wait until it is established.
+ *
+ * Initiating session may take a while, since we retry failures.
+ * We promise that eventually we return from this function.
+ *
+ * On error, returns NULL and sets the error code (system errno if below 1000,
+ * otherwise our own mysterious codes that you should just print in decimal).
+ *
+ * Keep in mind that the (*event)() is likely to be called on a context
+ * of the extra thread that we spawn. At least we won't steal some random
+ * context and monkey with signals. Also, (*event)() may be called before
+ * this function returns, with the session going down, for example.
+ * This is kind of dirty, but oh well. Maybe we'll fix this later.
+ *
+ * The lifetime of the hlog must be longer than that of the session.
+ *
+ * @param host Host name (NULL if resolving SRV records)
+ * @param port Port string (NULL if resolving SRV records)
+ * @param error Buffer for the error code
+ * @param ev_func Session event function (ok to be NULL)
+ * @param ev_arg User-supplied argument to the session event function
+ * @param cld_user The user identifier to be used for authentication
+ * @param cld_key The user key to be used for authentication
+ * @param hlog The log to which we (sadly) write, because we use cldc.
+ * @return The new session, or NULL with *error set.
+ */
+struct ncld_sess *ncld_sess_open(const char *host, const char *port, int *error,
+	void (*ev_func)(void *, unsigned int), void *ev_arg,
+	const char *cld_user, const char *cld_key, struct hail_log *hlog)
+{
+	struct ncld_sess *nsp;
+	struct cldc_call_opts copts;
+	int err;
+	GError *gerr = NULL;	/* GLib requires a GError to start out NULL */
+	int rc;
+
+	err = ENOMEM;
+	nsp = malloc(sizeof(struct ncld_sess));
+	if (!nsp)
+		goto out_sesalloc;
+	memset(nsp, 0, sizeof(struct ncld_sess));
+	cld_timer_init(&nsp->udp_timer, "nsp-udp-timer", ncld_udp_timer_event,
+		       nsp);
+	nsp->mutex = g_mutex_new();
+	if (!nsp->mutex)
+		goto out_mutex;
+	nsp->cond = g_cond_new();
+	if (!nsp->cond)
+		goto out_cond;
+
+	/* Host and port must be given together or omitted together. */
+	if (!host) {
+		err = EINVAL;
+		if (port)
+			goto out_portarg;
+		err = ncld_getsrv(&nsp->host, &nsp->port, hlog);
+		if (err)
+			goto out_srv;
+	} else {
+		err = EINVAL;
+		if (!port)
+			goto out_portarg;
+		err = ncld_gethost(&nsp->host, &nsp->port, host, port);
+		if (err)
+			goto out_srv;
+	}
+
+	nsp->event = ev_func;
+	nsp->event_arg = ev_arg;
+
+	if (pipe(nsp->to_thread) < 0) {
+		err = errno;
+		goto out_pipe_to;
+	}
+
+	if (cldc_udp_new(nsp->host, nsp->port, &nsp->udp)) {
+		err = 1023;
+		goto out_udp;
+	}
+
+	nsp->thread = g_thread_create(ncld_sess_thr, nsp, TRUE, &gerr);
+	if (nsp->thread == NULL) {
+		HAIL_ERR(hlog, "Failed to start replication thread: %s",
+			 gerr ? gerr->message : "unknown error");
+		if (gerr)
+			g_error_free(gerr);
+		err = 1022;
+		goto out_thread;
+	}
+
+	/*
+	 * FIXME If anyone ever gets 2 sessions in one application,
+	 * their logs' lifetimes will overlap. We need to move the errlog
+	 * from ops to the session itself.
+	 */
+	ncld_ops.errlog = hlog->func;
+
+	/*
+	 * The helper thread is already running and touches the timer list
+	 * under nsp->mutex, so enter the cldc layer with the mutex held,
+	 * the same way every other ncld operation does.
+	 */
+	g_mutex_lock(nsp->mutex);
+	memset(&copts, 0, sizeof(copts));
+	copts.cb = ncld_new_sess;
+	copts.private = nsp;
+	rc = cldc_new_sess(&ncld_ops, &copts, nsp->udp->addr,
+			   nsp->udp->addr_len, cld_user, cld_key, nsp,
+			   &nsp->udp->sess);
+	g_mutex_unlock(nsp->mutex);
+	if (rc) {
+		err = 1024;
+		goto out_session;
+	}
+
+	/* nsp->udp->sess->log.verbose = hlog->verbose; */
+
+	rc = ncld_wait_session(nsp);
+	if (rc) {
+		err = 1100 + rc % 1000;
+		goto out_start;
+	}
+
+	return nsp;
+
+out_start:
+	/* NOTE(review): runs unlocked while the helper thread is alive. */
+	cldc_kill_sess(nsp->udp->sess);
+out_session:
+	ncld_thr_end(nsp);
+out_thread:
+	cldc_udp_free(nsp->udp);
+out_udp:
+	close(nsp->to_thread[0]);
+	close(nsp->to_thread[1]);
+out_pipe_to:
+	free(nsp->host);
+out_srv:
+out_portarg:
+	g_cond_free(nsp->cond);
+out_cond:
+	g_mutex_free(nsp->mutex);
+out_mutex:
+	free(nsp);
+out_sesalloc:
+	*error = err;
+	return NULL;
+}
+
+/*
+ * Completion callback for cldc_open: store the result in the handle and
+ * wake up ncld_wait_open(). is_open is raised regardless of errc.
+ */
+static int ncld_open_cb(struct cldc_call_opts *copts, enum cle_err_codes errc)
+{
+	struct ncld_fh *handle = copts->private;
+
+	handle->errc = errc;
+	handle->is_open = true;
+	g_cond_broadcast(handle->ses->cond);
+
+	return 0;
+}
+
+/*
+ * Sleep until the open callback has run for fhp, then return its code.
+ */
+static int ncld_wait_open(struct ncld_fh *fhp)
+{
+	struct ncld_sess *sess = fhp->ses;
+
+	g_mutex_lock(sess->mutex);
+	for (;;) {
+		if (fhp->is_open)
+			break;
+		g_cond_wait(sess->cond, sess->mutex);
+	}
+	g_mutex_unlock(sess->mutex);
+
+	return fhp->errc;
+}
+
+/*
+ * Open a file on the session and wait for the server's answer.
+ *
+ * We do not provide anything like ncld_set_callback because it inevitably
+ * leads to lost events. Unused arguments are not too onerous, so just
+ * put zero into the events mask if you don't want notifications.
+ *
+ * FIXME It's a moot point for now anyway, since we don't have unlock-notify,
+ * or a waiting lock anyway.
+ *
+ * On error, return NULL and set the error code (can be errno or our own code).
+ *
+ * @param nsp Session, must be up (EBUSY otherwise)
+ * @param fname File name, passed to cldc_open
+ * @param mode Open mode, passed to cldc_open
+ * @param error Buffer for the error code, written only on failure
+ * @param events Event mask, passed to cldc_open and stored in the handle
+ * @param ev_func Event function, stored in the handle
+ * @param ev_arg Argument for the event function, stored in the handle
+ */
+struct ncld_fh *ncld_open(struct ncld_sess *nsp, const char *fname,
+ unsigned int mode, int *error, unsigned int events,
+ void (*ev_func)(void *, unsigned int), void *ev_arg)
+{
+ struct ncld_fh *fhp;
+ struct cldc_call_opts copts;
+ int err;
+ int rc;
+
+ err = EBUSY;
+ if (!nsp->is_up)
+ goto out_session;
+
+ err = ENOMEM;
+ fhp = malloc(sizeof(struct ncld_fh));
+ if (!fhp)
+ goto out_alloc;
+ memset(fhp, 0, sizeof(struct ncld_fh));
+ fhp->ses = nsp;
+ fhp->event_mask = events;
+ fhp->event_func = ev_func;
+ fhp->event_arg = ev_arg;
+
+ /* Submit the request under the session mutex, then wait unlocked. */
+ g_mutex_lock(nsp->mutex);
+ memset(&copts, 0, sizeof(copts));
+ copts.cb = ncld_open_cb;
+ copts.private = fhp;
+ rc = cldc_open(nsp->udp->sess, &copts, fname, mode, events, &fhp->fh);
+ if (rc) {
+ err = -rc;
+ g_mutex_unlock(nsp->mutex);
+ goto out_open;
+ }
+ g_mutex_unlock(nsp->mutex);
+
+ rc = ncld_wait_open(fhp);
+ if (rc) {
+ err = 1100 + rc; /* code from the open callback, offset by 1100 */
+ goto out_start;
+ }
+
+ /* NOTE(review): the handle list is updated without nsp->mutex. */
+ nsp->handles = g_list_prepend(nsp->handles, fhp);
+ return fhp;
+
+out_start:
+ /*
+ * This is screwy, but we do nothing here. There is no way to free
+ * a cldc file handle that failed its open. It is stuck in the
+ * array of handles until the session closes (although it can be
+ * garbage-collected if we're lucky).
+ */
+out_open:
+ free(fhp);
+out_alloc:
+out_session:
+ *error = err;
+ return NULL;
+}
+
+/* Bookkeeping for one delete request in flight; see ncld_del(). */
+struct ncld_delio {
+ struct ncld_sess *ses; /* session whose cond is broadcast on completion */
+ bool is_done; /* set by ncld_del_cb */
+ int errc; /* code reported by ncld_del_cb */
+};
+
+/*
+ * Completion callback for cldc_del: record the result in the struct
+ * ncld_delio passed as the private pointer and wake up ncld_wait_del().
+ */
+static int ncld_del_cb(struct cldc_call_opts *copts, enum cle_err_codes errc)
+{
+	struct ncld_delio *req = copts->private;
+	struct ncld_sess *sess = req->ses;
+
+	req->errc = errc;
+	req->is_done = true;
+	g_cond_broadcast(sess->cond);
+
+	return 0;
+}
+
+/*
+ * Sleep until the delete callback has run for dp, then return its code.
+ */
+static int ncld_wait_del(struct ncld_delio *dp)
+{
+	struct ncld_sess *sess = dp->ses;
+
+	g_mutex_lock(sess->mutex);
+	for (;;) {
+		if (dp->is_done)
+			break;
+		g_cond_wait(sess->cond, sess->mutex);
+	}
+	g_mutex_unlock(sess->mutex);
+
+	return dp->errc;
+}
+
+/*
+ * Delete a file by name and wait for the server's answer.
+ *
+ * @param nsp Session, must be up
+ * @param fname Name of the file to delete, passed to cldc_del
+ * @return Zero on success; -EBUSY if the session is down; the negated
+ *	return of cldc_del if the request could not be submitted;
+ *	otherwise 1100 plus the code reported by the delete callback.
+ */
+int ncld_del(struct ncld_sess *nsp, const char *fname)
+{
+	struct cldc_call_opts copts;
+	struct ncld_delio dpb;
+	int rc;
+
+	if (!nsp->is_up)
+		return -EBUSY;
+
+	memset(&dpb, 0, sizeof(struct ncld_delio));
+	dpb.ses = nsp;
+
+	g_mutex_lock(nsp->mutex);
+	memset(&copts, 0, sizeof(copts));
+	copts.cb = ncld_del_cb;
+	/*
+	 * ncld_del_cb interprets the private pointer as struct ncld_delio.
+	 * Passing the session here would make the callback scribble over
+	 * the session and leave ncld_wait_del() asleep forever.
+	 */
+	copts.private = &dpb;
+	rc = cldc_del(nsp->udp->sess, &copts, fname);
+	if (rc) {
+		g_mutex_unlock(nsp->mutex);
+		return -rc;
+	}
+	/* XXX A delete operation is not accounted for end-session */
+	g_mutex_unlock(nsp->mutex);
+
+	rc = ncld_wait_del(&dpb);
+	if (rc)
+		return rc + 1100;
+
+	return 0;
+}
+
+/*
+ * Completion callback for cldc_get: on success publish the buffer and its
+ * size, on failure store the error code; then wake up ncld_wait_read().
+ */
+static int ncld_read_cb(struct cldc_call_opts *copts, enum cle_err_codes errc)
+{
+	struct ncld_read *res = copts->private;
+	struct ncld_fh *handle = res->fh;
+
+	if (!errc) {
+		res->ptr = copts->u.get.buf;
+		res->length = copts->u.get.size;
+	} else {
+		res->errc = errc;
+	}
+	res->is_done = true;
+	g_cond_broadcast(handle->ses->cond);
+
+	return 0;
+}
+
+/*
+ * Sleep until the read callback has run for rp, drop the handle's count
+ * of outstanding I/Os, and return the callback's code.
+ */
+static int ncld_wait_read(struct ncld_read *rp)
+{
+	struct ncld_fh *handle = rp->fh;
+	struct ncld_sess *sess = handle->ses;
+
+	g_mutex_lock(sess->mutex);
+	for (;;) {
+		if (rp->is_done)
+			break;
+		g_cond_wait(sess->cond, sess->mutex);
+	}
+	handle->nios--;
+	g_mutex_unlock(sess->mutex);
+
+	return rp->errc;
+}
+
+/*
+ * Fetch the contents of an open file and wait for the answer.
+ *
+ * @error Error code buffer, written only on failure.
+ * @return Pointer to struct ncld_read or NULL if error.
+ */
+struct ncld_read *ncld_get(struct ncld_fh *fhp, int *error)
+{
+	struct ncld_sess *sess = fhp->ses;
+	struct cldc_call_opts opts;
+	struct ncld_read *res;
+	int err;
+
+	if (!fhp->is_open) {
+		*error = EBUSY;
+		return NULL;
+	}
+
+	res = malloc(sizeof(struct ncld_read));
+	if (!res) {
+		*error = ENOMEM;
+		return NULL;
+	}
+	memset(res, 0, sizeof(struct ncld_read));
+	res->fh = fhp;
+
+	/* Submit under the session mutex; count the I/O only if accepted. */
+	g_mutex_lock(sess->mutex);
+	memset(&opts, 0, sizeof(opts));
+	opts.cb = ncld_read_cb;
+	opts.private = res;
+	err = cldc_get(fhp->fh, &opts, false);
+	if (!err)
+		fhp->nios++;
+	g_mutex_unlock(sess->mutex);
+
+	if (err) {
+		free(res);
+		*error = -err;
+		return NULL;
+	}
+
+	err = ncld_wait_read(res);
+	if (err) {
+		free(res);
+		*error = err + 1100;
+		return NULL;
+	}
+
+	return res;
+}
+
+/*
+ * Release a result returned by ncld_get(). Only the descriptor is freed;
+ * the data buffer is left alone for the reason given below.
+ */
+void ncld_read_free(struct ncld_read *rp)
+{
+
+ /*
+ * FIXME: Actually the rp->ptr points to sess->msg_buf, so we
+ * cannot issue 2 cldc_get independently.
+ */
+ /* free(rp->ptr); */
+
+ free(rp);
+}
+
+/* Bookkeeping for one write, lock or unlock request in flight. */
+struct ncld_genio {
+ struct ncld_fh *fh; /* handle the request was issued on */
+ bool is_done; /* set by ncld_genio_cb */
+ int errc; /* code reported by ncld_genio_cb */
+};
+
+/*
+ * Completion callback shared by write, lock and unlock: record the result
+ * and wake up ncld_wait_genio().
+ */
+static int ncld_genio_cb(struct cldc_call_opts *copts, enum cle_err_codes errc)
+{
+	struct ncld_genio *req = copts->private;
+	struct ncld_fh *handle = req->fh;
+
+	req->errc = errc;
+	req->is_done = true;
+	g_cond_broadcast(handle->ses->cond);
+
+	return 0;
+}
+
+/*
+ * Sleep until the generic I/O callback has run for ap, drop the handle's
+ * count of outstanding I/Os, and return the callback's code.
+ */
+static int ncld_wait_genio(struct ncld_genio *ap)
+{
+	struct ncld_fh *handle = ap->fh;
+	struct ncld_sess *sess = handle->ses;
+
+	g_mutex_lock(sess->mutex);
+	for (;;) {
+		if (ap->is_done)
+			break;
+		g_cond_wait(sess->cond, sess->mutex);
+	}
+	handle->nios--;
+	g_mutex_unlock(sess->mutex);
+
+	return ap->errc;
+}
+
+/*
+ * Store len bytes of data into an open file and wait for the answer.
+ *
+ * @return: Zero or error code.
+ */
+int ncld_write(struct ncld_fh *fhp, const void *data, long len)
+{
+	struct ncld_sess *sess = fhp->ses;
+	struct cldc_call_opts opts;
+	struct ncld_genio io;
+	int err;
+
+	if (!fhp->is_open)
+		return -EBUSY;
+
+	memset(&io, 0, sizeof(struct ncld_genio));
+	io.fh = fhp;
+
+	/* Submit under the session mutex; count the I/O only if accepted. */
+	g_mutex_lock(sess->mutex);
+	memset(&opts, 0, sizeof(opts));
+	opts.cb = ncld_genio_cb;
+	opts.private = &io;
+	err = cldc_put(fhp->fh, &opts, data, len);
+	if (!err)
+		fhp->nios++;
+	g_mutex_unlock(sess->mutex);
+	if (err)
+		return -err;
+
+	err = ncld_wait_genio(&io);
+	return err ? err + 1100 : 0;
+}
+
+/*
+ * Try to take the lock on an open file and wait for the answer.
+ * Returns zero or an error code.
+ *
+ * FIXME There is no "wait for lock".
+ */
+int ncld_trylock(struct ncld_fh *fhp)
+{
+	struct ncld_sess *sess = fhp->ses;
+	struct cldc_call_opts opts;
+	struct ncld_genio io;
+	int err;
+
+	if (!fhp->is_open)
+		return -EBUSY;
+
+	memset(&io, 0, sizeof(struct ncld_genio));
+	io.fh = fhp;
+
+	/* Submit under the session mutex; count the I/O only if accepted. */
+	g_mutex_lock(sess->mutex);
+	memset(&opts, 0, sizeof(opts));
+	opts.cb = ncld_genio_cb;
+	opts.private = &io;
+	err = cldc_lock(fhp->fh, &opts, 0, false);
+	if (!err)
+		fhp->nios++;
+	g_mutex_unlock(sess->mutex);
+	if (err)
+		return -err;
+
+	err = ncld_wait_genio(&io);
+	return err ? err + 1100 : 0;
+}
+
+/*
+ * Release the lock on an open file and wait for the answer.
+ * Returns zero or an error code.
+ */
+int ncld_unlock(struct ncld_fh *fhp)
+{
+	struct ncld_sess *sess = fhp->ses;
+	struct cldc_call_opts opts;
+	struct ncld_genio io;
+	int err;
+
+	if (!fhp->is_open)
+		return -EBUSY;
+
+	memset(&io, 0, sizeof(struct ncld_genio));
+	io.fh = fhp;
+
+	/* Submit under the session mutex; count the I/O only if accepted. */
+	g_mutex_lock(sess->mutex);
+	memset(&opts, 0, sizeof(opts));
+	opts.cb = ncld_genio_cb;
+	opts.private = &io;
+	err = cldc_unlock(fhp->fh, &opts);
+	if (!err)
+		fhp->nios++;
+	g_mutex_unlock(sess->mutex);
+	if (err)
+		return -err;
+
+	err = ncld_wait_genio(&io);
+	return err ? err + 1100 : 0;
+}
+
+/*
+ * Completion callback for cldc_close: store the result, mark the handle
+ * closed and wake up the waiter in ncld_close().
+ */
+static int ncld_close_cb(struct cldc_call_opts *copts, enum cle_err_codes errc)
+{
+	struct ncld_fh *handle = copts->private;
+
+	handle->errc = errc;
+	handle->is_open = false;
+	g_cond_broadcast(handle->ses->cond);
+
+	return 0;
+}
+
+/*
+ * Close a file handle, wait for outstanding I/Os on it to finish, and
+ * free it. The handle must be open; calling this on a handle that is not
+ * open aborts the process. fhp is invalid after return.
+ *
+ * The poisoning in ncld_close is mostly for bug-catching. Do not try to
+ * force-close file handles if other threads bomb them I/Os. This cannot work
+ * for us, because users keep pointers, not file descriptor numbers.
+ * Applications should stop application I/O first, then close.
+ */
+void ncld_close(struct ncld_fh *fhp)
+{
+ struct ncld_sess *nsp = fhp->ses;
+ struct cldc_call_opts copts;
+ int rc;
+
+ if (!fhp->is_open)
+ abort();
+
+ g_mutex_lock(nsp->mutex);
+ memset(&copts, 0, sizeof(copts));
+ copts.cb = ncld_close_cb;
+ copts.private = fhp;
+ rc = cldc_close(fhp->fh, &copts);
+ g_mutex_unlock(nsp->mutex);
+
+ /* Wait for the close callback only if the request was accepted. */
+ if (rc == 0) {
+ g_mutex_lock(nsp->mutex);
+ while (fhp->is_open)
+ g_cond_wait(nsp->cond, nsp->mutex);
+ g_mutex_unlock(nsp->mutex);
+ /* At this point, we ignore fhp->errc. */
+ }
+
+ /*
+ * The cldc layer does not allow us to kill handles willy-nilly,
+ * so the only thing we can do is to wait for I/Os to terminate.
+ *
+ * N.B.: this is making use of the fact that we only have one
+ * conditional per session, and therefore end-of-I/O pokes us here.
+ */
+ g_mutex_lock(nsp->mutex);
+ while (fhp->nios)
+ g_cond_wait(nsp->cond, nsp->mutex);
+ g_mutex_unlock(nsp->mutex);
+
+ /* NOTE(review): the handle list is updated without nsp->mutex. */
+ nsp->handles = g_list_remove_all(nsp->handles, fhp);
+ free(fhp);
+}
+
+/* g_list_foreach() adapter: close one handle; priv is unused. */
+static void ncld_func_close(gpointer data, gpointer priv)
+{
+ ncld_close(data);
+}
+
+/*
+ * Close every handle still on the session, kill the cldc session, stop
+ * the helper thread and release all session resources. nsp is invalid
+ * after return.
+ */
+void ncld_sess_close(struct ncld_sess *nsp)
+{
+
+ /*
+ * NOTE(review): ncld_close() unlinks the element being visited while
+ * g_list_foreach() walks the list -- confirm that the GLib in use
+ * tolerates removal of the current element.
+ */
+ g_list_foreach(nsp->handles, ncld_func_close, NULL);
+ g_list_free(nsp->handles);
+
+ /* NOTE(review): called unlocked while the helper thread still runs. */
+ cldc_kill_sess(nsp->udp->sess);
+ ncld_thr_end(nsp);
+ cldc_udp_free(nsp->udp);
+ close(nsp->to_thread[0]);
+ close(nsp->to_thread[1]);
+ g_cond_free(nsp->cond);
+ g_mutex_free(nsp->mutex);
+ free(nsp->host);
+ free(nsp);
+}
+
+/* Library initialization for ncld users; currently just cldc_init(). */
+void ncld_init(void)
+{
+ cldc_init();
+}
+
+/*
* For extra safety, call cldc_init after g_thread_init, if present.
* Currently we just call srand(), but since we use GLib, we may need
* to add some Glib stuff here and that must come after g_thread_init.
diff --git a/test/lock-file-event.c b/test/lock-file-event.c
index 0772e4e..f2d7c5e 100644
--- a/test/lock-file-event.c
+++ b/test/lock-file-event.c
@@ -25,267 +25,91 @@
#include "cld-config.h"
#include <sys/types.h>
-#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <stdio.h>
-#include <string.h>
-#include <libtimer.h>
-#include <cldc.h>
+#include <stdarg.h>
+#include <ncld.h>
#include "test.h"
-struct run {
- struct cldc_udp *udp;
- struct timer tmr_test;
- struct timer tmr_udp;
- struct cldc_fh *fh;
- char buf[LOCKLEN];
-};
-
-static int new_sess_cb(struct cldc_call_opts *copts, enum cle_err_codes errc);
-static int open_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc);
-static int write_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc);
-static int lock_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc);
-static void timer_1(struct run *rp);
-static int close_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc);
-static int end_sess_cb(struct cldc_call_opts *copts, enum cle_err_codes errc);
-
-static bool do_timer_ctl(void *priv, bool add,
- int (*cb)(struct cldc_session *, void *),
- void *cb_priv, time_t secs)
+static void applog(int prio, const char *fmt, ...)
{
- struct run *rp = priv;
+ char buf[200];
+ va_list ap;
- if (add) {
- rp->udp->cb = cb;
- rp->udp->cb_private = cb_priv;
- timer_add(&rp->tmr_udp, time(NULL) + secs);
- } else {
- timer_del(&rp->tmr_udp);
- }
-
- return true;
+ va_start(ap, fmt);
+ snprintf(buf, 200, "%s\n", fmt);
+ vfprintf(stderr, buf, ap);
+ va_end(ap);
}
-static int do_pkt_send(void *priv, const void *addr, size_t addrlen,
- const void *buf, size_t buflen)
-{
- struct run *rp = priv;
- return cldc_udp_pkt_send(rp->udp, addr, addrlen, buf, buflen);
-}
-
-static void timer_udp_event(struct timer *timer)
-{
- struct run *rp = timer->userdata;
- struct cldc_udp *udp = rp->udp;
-
- if (udp->cb)
- udp->cb(udp->sess, udp->cb_private);
-}
-
-static void do_event(void *private, struct cldc_session *sess,
- struct cldc_fh *fh, uint32_t event_mask)
-{
- fprintf(stderr, "EVENT(0x%x)\n", event_mask);
-}
+static struct hail_log test_log = {
+ .func = applog,
+ .verbose = 1,
+};
-static int new_sess_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc)
+int main (int argc, char *argv[])
{
- struct run *rp = coptarg->private;
- struct cldc_call_opts copts;
+ struct ncld_sess *nsp;
+ struct ncld_fh *fhp;
+ int port;
+ char portstr[6];
+ struct timespec tm;
+ int error;
int rc;
- if (errc != CLE_OK) {
- fprintf(stderr, "new-sess failed: %d\n", errc);
- exit(1);
- }
-
- /* We use a fixed file name because we contact a private copy of CLD */
- memset(&copts, 0, sizeof(copts));
- copts.cb = open_1_cb;
- copts.private = rp;
- rc = cldc_open(rp->udp->sess, &copts, TLNAME,
- COM_WRITE | COM_LOCK | COM_CREATE,
- CE_SESS_FAILED, &rp->fh);
- if (rc) {
- fprintf(stderr, "cldc_open call error %d\n", rc);
- exit(1);
- }
- return 0;
-}
-
-static int open_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc)
-{
- struct run *rp = coptarg->private;
- struct cldc_call_opts copts;
- int rc;
+ g_thread_init(NULL);
+ ncld_init();
- if (errc != CLE_OK) {
- fprintf(stderr, "first-open failed: %d\n", errc);
- exit(1);
- }
- if (rp->fh == NULL) {
- fprintf(stderr, "first-open NULL fh\n");
- exit(1);
- }
- if (!rp->fh->valid) {
- fprintf(stderr, "first-open invalid fh\n");
- exit(1);
- }
+ port = cld_readport(TEST_PORTFILE_CLD);
+ if (port < 0)
+ return port;
+ if (port == 0)
+ return -1;
+ snprintf(portstr, sizeof(portstr), "%u", port);
- memset(&copts, 0, sizeof(copts));
- copts.cb = write_1_cb;
- copts.private = rp;
- rc = cldc_put(rp->fh, &copts, rp->buf, LOCKLEN);
- if (rc) {
- fprintf(stderr, "cldc_put call error %d\n", rc);
+ nsp = ncld_sess_open(TEST_HOST, portstr, &error, NULL, NULL,
+ TEST_USER, TEST_USER_KEY, &test_log);
+ if (!nsp) {
+ fprintf(stderr, "ncld_sess_open(host %s port %s) failed: %d\n",
+ TEST_HOST, portstr, error);
exit(1);
}
- return 0;
-}
-
-static int write_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc)
-{
- struct run *rp = coptarg->private;
- struct cldc_call_opts copts;
- int rc;
- if (errc != CLE_OK) {
- fprintf(stderr, "first-put failed: %d\n", errc);
+ fhp = ncld_open(nsp, TLNAME, COM_WRITE | COM_LOCK | COM_CREATE,
+ &error, 0, NULL, NULL);
+ if (!fhp) {
+ fprintf(stderr, "ncld_open(%s) failed: %d\n", TLNAME, error);
exit(1);
}
- memset(&copts, 0, sizeof(copts));
- copts.cb = lock_1_cb;
- copts.private = rp;
- rc = cldc_lock(rp->fh, &copts, 0, false);
+ rc = ncld_write(fhp, LOCKSTR, LOCKLEN);
if (rc) {
- fprintf(stderr, "cldc_lock call error %d\n", rc);
- exit(1);
- }
- return 0;
-}
-
-static int lock_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc)
-{
- struct run *rp = coptarg->private;
-
- if (errc != CLE_OK) {
- fprintf(stderr, "first-lock failed: %d\n", errc);
+ fprintf(stderr, "ncld_write failed: %d\n", rc);
exit(1);
}
- /* Idle for 40s to verify that session sustains a protocol ping. */
- timer_add(&rp->tmr_test, time(NULL) + 40);
- return 0;
-}
-
-static void timer_test_event(struct timer *timer)
-{
- struct run *rp = timer->userdata;
-
- timer_1(rp);
-}
-
-static void timer_1(struct run *rp)
-{
- struct cldc_call_opts copts;
- int rc;
-
- memset(&copts, 0, sizeof(copts));
- copts.cb = close_1_cb;
- copts.private = rp;
- rc = cldc_close(rp->fh, &copts);
+ rc = ncld_trylock(fhp);
if (rc) {
- fprintf(stderr, "cldc_close call error %d\n", rc);
+ fprintf(stderr, "ncld_trylock failed: %d\n", rc);
exit(1);
}
-}
-static int close_1_cb(struct cldc_call_opts *coptarg, enum cle_err_codes errc)
-{
- struct run *rp = coptarg->private;
- struct cldc_call_opts copts;
- int rc;
-
- if (errc != CLE_OK) {
- fprintf(stderr, "first-close failed: %d\n", errc);
- exit(1);
- }
- rp->fh = NULL;
+ printf("idling 40s...\n"); fflush(stdout);
+ /* Idle for 40s to verify that session sustains a protocol ping. */
+ tm.tv_sec = 40;
+ tm.tv_nsec = 0;
+ nanosleep(&tm, NULL);
- memset(&copts, 0, sizeof(copts));
- copts.cb = end_sess_cb;
- copts.private = rp;
- rc = cldc_end_sess(rp->udp->sess, &copts);
+ rc = ncld_unlock(fhp);
if (rc) {
- fprintf(stderr, "cldc_end_sess call error %d\n", rc);
+ fprintf(stderr, "ncld_unlock failed: %d\n", rc);
exit(1);
}
- return 0;
-}
-
-static int end_sess_cb(struct cldc_call_opts *copts, enum cle_err_codes errc)
-{
- if (errc != CLE_OK) {
- fprintf(stderr, "end-sess failed: %d\n", errc);
- exit(1);
- }
-
- /* session ended; success */
- exit(0);
- return 0;
-}
-
-static struct run run;
-
-static struct cldc_ops ops = {
- .timer_ctl = do_timer_ctl,
- .pkt_send = do_pkt_send,
- .event = do_event,
-};
-
-static int init(void)
-{
- int rc;
- int port;
- struct cldc_call_opts copts;
-
- memcpy(run.buf, LOCKSTR, LOCKLEN);
-
- port = cld_readport(TEST_PORTFILE_CLD);
- if (port < 0)
- return port;
- if (port == 0)
- return -1;
-
- timer_init(&run.tmr_test, "lock-timer", timer_test_event, &run);
- timer_init(&run.tmr_udp, "udp-timer", timer_udp_event, &run);
-
- rc = cldc_udp_new(TEST_HOST, port, &run.udp);
- if (rc)
- return rc;
-
- memset(&copts, 0, sizeof(copts));
- copts.cb = new_sess_cb;
- copts.private = &run;
- rc = cldc_new_sess(&ops, &copts, run.udp->addr, run.udp->addr_len,
- TEST_USER, TEST_USER_KEY, &run, &run.udp->sess);
- if (rc)
- return rc;
-
- // run.udp->sess->verbose = true;
- return 0;
-}
-
-int main (int argc, char *argv[])
-{
- g_thread_init(NULL);
- cldc_init();
- if (init())
- return 1;
- test_loop(run.udp);
+ /* These two are perfect places to hang or crash, so don't just exit. */
+ ncld_close(fhp);
+ ncld_sess_close(nsp);
return 0;
}
I am going to work on this gradually and flip over cldcli, the rest of the tests,
and cldu.c in chunkd and tabled; then we'll see how it works.
-- Pete
next prev parent reply other threads:[~2010-01-19 21:54 UTC|newest]
Thread overview: 8+ messages / expand[flat|nested] mbox.gz Atom feed top
2010-01-16 2:40 the evils of the CLD api Colin McCabe
2010-01-16 6:53 ` Pete Zaitcev
2010-01-16 21:38 ` Colin McCabe
2010-01-16 23:21 ` Pete Zaitcev
2010-01-17 1:54 ` Colin McCabe
2010-01-17 5:55 ` Pete Zaitcev
2010-01-19 21:54 ` Pete Zaitcev [this message]
2010-01-18 21:58 ` Jeff Garzik
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20100119145405.16faaa24@redhat.com \
--to=zaitcev@redhat.com \
--cc=cmccabe@alumni.cmu.edu \
--cc=hail-devel@vger.kernel.org \
--cc=jeff@garzik.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.