All of lore.kernel.org
 help / color / mirror / Atom feed
* [Patch 1/1] tabled: switch to ncld
@ 2010-02-09  4:42 Pete Zaitcev
  2010-02-09  5:06 ` Jeff Garzik
  2010-02-14  2:44 ` Jeff Garzik
  0 siblings, 2 replies; 3+ messages in thread
From: Pete Zaitcev @ 2010-02-09  4:42 UTC (permalink / raw)
  To: Jeff Garzik; +Cc: Project Hail List

No new functionality just yet, only a switch-over.

Signed-Off-By: Pete Zaitcev <zaitcev@redhat.com>

---
 server/cldu.c |  789 +++++++++++++-----------------------------------
 1 file changed, 215 insertions(+), 574 deletions(-)

diff --git a/server/cldu.c b/server/cldu.c
index 7e176d4..aecf336 100644
--- a/server/cldu.c
+++ b/server/cldu.c
@@ -29,8 +29,8 @@
 #include <unistd.h>
 #include <event.h>
 #include <errno.h>
-#include <cldc.h>
 #include <elist.h>
+#include <ncld.h>
 #include "tabled.h"
 
 #define ALIGN8(n)	((8 - ((n) & 7)) & 7)
@@ -49,60 +49,38 @@ struct cld_host {
 
 struct cld_session {
 	bool forced_hosts;		/* Administrator overrode default CLD */
-	bool sess_open;
-	struct cldc_udp *lib;		/* library state */
-	struct event lib_timer;
-	int retry_cnt;
-	int last_recv_err;
+	bool is_dead;
+	struct ncld_sess *nsp;		/* library state */
 
 	/*
 	 * For code sanity and being isomorphic with conventional programming
 	 * using sleep(), neither of the timers must ever be active simultane-
 	 * ously with any other. But using one timer structure is too annoying.
 	 */
-	struct event tm_retry;
+	// struct event tm_relock;
 	struct event tm_rescan;
-	struct event tm_reopen;
 
 	int actx;		/* Active host cldv[actx] */
 	struct cld_host cldv[N_CLD];
 
 	char *thisgroup;
 	char *thishost;
-	struct event ev;	/* Associated with fd */
 	char *cfname;		/* /tabled-group directory */
-	struct cldc_fh *cfh;	/* /tabled-group directory, keep open for scan */
+	struct ncld_fh *cfh;	/* /tabled-group directory, keep open for scan */
 	char *ffname;		/* /tabled-group/thishost */
-	struct cldc_fh *ffh;	/* /tabled-group/thishost, keep open for lock */
+	struct ncld_fh *ffh;	/* /tabled-group/thishost, keep open for lock */
 	char *xfname;		/* /chunk-GROUP directory */
-	struct cldc_fh *xfh;	/* /chunk-GROUP directory */
-	char *yfname;		/* /chunk-GROUP/NID file */
-	struct cldc_fh *yfh;	/* /chunk-GROUP/NID file */
 
 	struct list_head chunks;	/* found in xfname, struct chunk_node */
 };
 
 static int cldu_set_cldc(struct cld_session *sp, int newactive);
-static int cldu_new_sess(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_open_f_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_put_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static void try_open_x(struct cld_session *sp);
-static int cldu_open_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_close_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static void next_chunk(struct cld_session *sp);
-static int cldu_open_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_get_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static int cldu_close_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
+static int scan_chunks(struct cld_session *sp);
+static void next_chunk(struct cld_session *sp, struct chunk_node *np);
 static void add_remote(const char *name);
 static void add_chunk_node(struct cld_session *sp, const char *name);
 
-static struct timeval cldu_retry_delay = { 5, 0 };
 static struct timeval cldu_rescan_delay = { 50, 0 };
-static struct timeval cldu_reopen_delay = { 3, 0 };
 
 struct hail_log cldu_hail_log = {
 	.func		= applog,
@@ -169,170 +147,68 @@ err_oom:
 	return 0;
 }
 
-static void cldu_tm_retry(int fd, short events, void *userdata)
-{
-	struct cld_session *sp = userdata;
-
-	if (++sp->retry_cnt >= 5) {
-		applog(LOG_INFO, "Out of retries for %s, bailing", sp->xfname);
-		exit(1);
-	}
-	if (debugging)
-		applog(LOG_DEBUG, "Trying to open %s", sp->xfname);
-	try_open_x(sp);
-}
-
 static void cldu_tm_rescan(int fd, short events, void *userdata)
 {
 	struct cld_session *sp = userdata;
+	int newactive;
 
 	/* Add rescanning for tabled nodes as well. FIXME */
 	if (debugging)
 		applog(LOG_DEBUG, "Rescanning for Chunks in %s", sp->xfname);
-	try_open_x(sp);
-}
-
-static void cldu_tm_reopen(int fd, short events, void *userdata)
-{
-	struct cld_session *sp = userdata;
-
-	if (debugging)
-		applog(LOG_DEBUG, "Trying to reopen %d storage nodes",
-		       tabled_srv.num_stor);
-	if (stor_update_cb() < 1)
-		evtimer_add(&sp->tm_reopen, &cldu_reopen_delay);
-}
 
-static void cldu_event(int fd, short events, void *userdata)
-{
-	struct cld_session *sp = userdata;
-	int rc;
-
-	if (!sp->lib) {
-		applog(LOG_WARNING, "Stray UDP event");
-		return;
-	}
-
-	rc = cldc_udp_receive_pkt(sp->lib);
-	if (rc) {
-		if (rc != sp->last_recv_err) {
-			if (rc < -1000)		/* our internal code */
-				applog(LOG_INFO,
-				       "cldc_udp_receive_pkt failed: %d", rc);
-			else
-				applog(LOG_INFO,
-				       "cldc_udp_receive_pkt failed: %s",
-				       strerror(-rc));
-			sp->last_recv_err = rc;
-		}
-		/*
-		 * Reacting to ICMP messages is a bad idea, because
-		 *  - it makes us loop hard in case CLD is down, unless we
-		 *    insert additional tricky timeouts
-		 *  - it deals poorly with transient problems like CLD reboots
-		 */
-#if 0
-		if (rc == -ECONNREFUSED) {	/* ICMP tells us */
-			int newactive;
-			// evtimer_del(&sp->tm);
-			cldc_kill_sess(sp->lib->sess);
-			sp->lib->sess = NULL;
-			newactive = cldu_nextactive(sp);
-			if (cldu_set_cldc(sp, newactive))
-				return;
-			// evtimer_add(&sp->tm, &cldc_to_delay);
+	if (sp->is_dead) {
+		ncld_sess_close(sp->nsp);
+		sp->nsp = NULL;
+		sp->is_dead = 0;
+		newactive = cldu_nextactive(sp);
+		if (cldu_set_cldc(sp, newactive)) {
+			evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
+			return;
 		}
-		return;
-#endif
-	}
-}
-
-static bool cldu_p_timer_ctl(void *priv, bool add,
-			     int (*cb)(struct cldc_session *, void *),
-			     void *cb_priv, time_t secs)
-{
-	struct cld_session *sp = priv;
-	struct cldc_udp *udp = sp->lib;
-	struct timeval tv = { secs, 0 };
-
-	if (add) {
-		udp->cb = cb;
-		udp->cb_private = cb_priv;
-		return evtimer_add(&sp->lib_timer, &tv) == 0;
-	} else {
-		return evtimer_del(&sp->lib_timer) == 0;
 	}
-}
 
-static int cldu_p_pkt_send(void *priv, const void *addr, size_t addrlen,
-			       const void *buf, size_t buflen)
-{
-	struct cld_session *sp = priv;
-	return cldc_udp_pkt_send(sp->lib, addr, addrlen, buf, buflen);
+	scan_chunks(sp);
+	evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
 }
 
-static void cldu_udp_timer_event(int fd, short events, void *userdata)
-
-{
-	struct cld_session *sp = userdata;
-	struct cldc_udp *udp = sp->lib;
-
-	if (udp->cb)
-		udp->cb(udp->sess, udp->cb_private);
-}
-
-static void cldu_p_event(void *priv, struct cldc_session *csp,
-			 struct cldc_fh *fh, uint32_t what)
+static void cldu_sess_event(void *priv, uint32_t what)
 {
 	struct cld_session *sp = priv;
-	int newactive;
 
 	if (what == CE_SESS_FAILED) {
-		sp->sess_open = false;
-		if (sp->lib->sess != csp)
-			applog(LOG_ERR, "Stray session failed, sid " SIDFMT,
-			       SIDARG(csp->sid));
-		else
-			applog(LOG_ERR, "Session failed, sid " SIDFMT,
-			       SIDARG(csp->sid));
-		// evtimer_del(&sp->tm);
-		sp->lib->sess = NULL;
-		newactive = cldu_nextactive(sp);
-		if (cldu_set_cldc(sp, newactive))
-			return;
-		// evtimer_add(&sp->tm, &cldc_to_delay);
+		applog(LOG_ERR, "Session failed, sid " SIDFMT,
+			       SIDARG(sp->nsp->udp->sess->sid));
+		sp->is_dead = true;
 	} else {
-		if (csp)
+		if (sp->nsp)
 			applog(LOG_INFO, "cldc event 0x%x sid " SIDFMT,
-			       what, SIDARG(csp->sid));
+			       what, SIDARG(sp->nsp->udp->sess->sid));
 		else
 			applog(LOG_INFO, "cldc event 0x%x no sid", what);
 	}
 }
 
-static struct cldc_ops cld_ops = {
-	.timer_ctl =	cldu_p_timer_ctl,
-	.pkt_send =	cldu_p_pkt_send,
-	.event =	cldu_p_event,
-	.errlog =	applog,
-};
-
 /*
- * Open the library, start its session, and reguster its socket with libevent.
+ * Open the library, start its session, pre-open files, and set timers.
  * Our session remains consistent in case of an error in this function,
  * so that we can continue and retry meaningfuly.
  */
 static int cldu_set_cldc(struct cld_session *sp, int newactive)
 {
 	struct cldc_host *hp;
-	struct cldc_udp *lib;
-	struct cldc_call_opts copts;
+	struct ncld_read *nrp;
+	char buf[100];
+	const char *ptr;
+	int dir_len;
+	int total_len, rec_len, name_len;
+	int len;
+	struct timespec tm;
+	int error;
 	int rc;
 
-	if (sp->lib) {
-		event_del(&sp->ev);
-		cldc_udp_free(sp->lib);
-		sp->lib = NULL;
+	if (sp->nsp) {
+		ncld_sess_close(sp->nsp);
+		sp->nsp = NULL;
 	}
 
 	sp->actx = newactive;
@@ -342,105 +218,36 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
 	}
 	hp = &sp->cldv[sp->actx].h;
 
-	evtimer_set(&sp->lib_timer, cldu_udp_timer_event, sp);
-
-	rc = cldc_udp_new(hp->host, hp->port, &sp->lib);
-	if (rc) {
-		applog(LOG_ERR, "cldc_udp_new(%s,%u) error: %d",
-		       hp->host, hp->port, rc);
-		goto err_lib_new;
-	}
-	lib = sp->lib;
-
 	if (debugging)
 		applog(LOG_INFO, "Selected CLD host %s port %u",
 		       hp->host, hp->port);
 
-	/*
-	 * This is a little iffy: we assume that it's ok to re-issue
-	 * event_set() for an event that was unregistered with event_del().
-	 * In any case, there's no other way to set the file descriptor.
-	 */
-	event_set(&sp->ev, sp->lib->fd, EV_READ | EV_PERSIST, cldu_event, sp);
-
-	if (event_add(&sp->ev, NULL) < 0) {
-		applog(LOG_INFO, "Failed to add CLD event");
-		goto err_event;
-	}
-
-	memset(&copts, 0, sizeof(struct cldc_call_opts));
-	copts.cb = cldu_new_sess;
-	copts.private = sp;
-	rc = cldc_new_sess(&cld_ops, &copts, lib->addr, lib->addr_len,
-			   "tabled", "tabled", sp, &lib->sess);
-	if (rc) {
-		applog(LOG_INFO,
-		       "Failed to start CLD session on host %s port %u",
-		       hp->host, hp->port);
-		goto err_sess;
-	}
-
-	// if (debugging)
-	//	lib->sess->verbose = true;
-
-	return 0;
-
-err_sess:
-err_event:
-	cldc_udp_free(sp->lib);
-	sp->lib = NULL;
-err_lib_new:
-err_addr:
-	return -1;
-}
-
-static int cldu_new_sess(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-
-	if (errc != CLE_OK) {
-		applog(LOG_INFO, "New CLD session creation failed: %d", errc);
-		return 0;
+	sp->nsp = ncld_sess_open(hp->host, hp->port, &error,
+				 cldu_sess_event, sp, "tabled", "tabled");
+	if (sp->nsp == NULL) {
+		if (error < 1000) {
+			applog(LOG_ERR, "ncld_sess_open(%s,%u) error: %s",
+			       hp->host, hp->port, strerror(error));
+		} else {
+			applog(LOG_ERR, "ncld_sess_open(%s,%u) error: %d",
+			       hp->host, hp->port, error);
+		}
+		goto err_nsess;
 	}
 
-	sp->sess_open = true;
 	applog(LOG_INFO, "New CLD session created, sid " SIDFMT,
-	       SIDARG(sp->lib->sess->sid));
+	       SIDARG(sp->nsp->udp->sess->sid));
 
 	/*
 	 * First, make sure the base directory exists.
 	 */
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_open_c_cb;
-	copts.private = sp;
-	rc = cldc_open(sp->lib->sess, &copts, sp->cfname,
-		       COM_READ | COM_WRITE | COM_CREATE | COM_DIRECTORY,
-		       CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->cfh);
-	if (rc) {
-		applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->cfname, rc);
-	}
-	return 0;
-}
-
-static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD open(%s) failed: %d", sp->cfname, errc);
-		return 0;
-	}
-	if (sp->cfh == NULL) {
-		applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->cfname);
-		return 0;
-	}
-	if (!sp->cfh->valid) {
-		applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->cfname);
-		return 0;
+	sp->cfh = ncld_open(sp->nsp, sp->cfname,
+			    COM_READ | COM_WRITE | COM_CREATE | COM_DIRECTORY,
+			    &error, 0 /* CE_MASTER_FAILOVER | CE_SESS_FAILED */,
+			    NULL, NULL);
+	if (!sp->cfh) {
+		applog(LOG_ERR, "CLD open(%s) failed: %d", sp->cfname, error);
+		goto err_copen;
 	}
 
 	if (debugging)
@@ -449,65 +256,38 @@ static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
 	/*
 	 * Then, create the membership file for us.
 	 */
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_open_f_cb;
-	copts.private = sp;
-	rc = cldc_open(sp->lib->sess, &copts, sp->ffname,
-		       COM_WRITE | COM_LOCK | COM_CREATE,
-		       CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->ffh);
-	if (rc) {
-		applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->ffname, rc);
-	}
-	return 0;
-}
-
-static int cldu_open_f_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD open(%s) failed: %d", sp->ffname, errc);
-		return 0;
-	}
-	if (sp->ffh == NULL) {
-		applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->ffname);
-		return 0;
-	}
-	if (!sp->ffh->valid) {
-		applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->ffname);
-		return 0;
+	sp->ffh = ncld_open(sp->nsp, sp->ffname,
+			    COM_WRITE | COM_LOCK | COM_CREATE,
+			    &error, 0, NULL, NULL);
+	if (!sp->ffh) {
+		applog(LOG_ERR, "CLD open(%s) failed: %d", sp->ffname, error);
+		goto err_fopen;
 	}
 
 	if (debugging)
 		applog(LOG_DEBUG, "CLD file \"%s\" created", sp->ffname);
 
-	/*
-	 * Lock the file, in case two hosts got the same hostname.
-	 */
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_lock_cb;
-	copts.private = sp;
-	rc = cldc_lock(sp->ffh, &copts, 0, false);
-	if (rc) {
-		applog(LOG_ERR, "cldc_lock call error %d", rc);
-	}
+	for (;;) {
+		rc = ncld_trylock(sp->ffh);
+		if (!rc)
+			break;
 
-	return 0;
-}
+		applog(LOG_ERR, "CLD lock(%s) failed: %d", sp->ffname, rc);
+		if (rc != CLE_LOCK_CONFLICT + 1100)
+			goto err_lock;
 
-static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	char buf[100];
-	int len;
-	struct cldc_call_opts copts;
-	int rc;
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD lock(%s) failed: %d", sp->ffname, errc);
-		return 0;
+		/*
+		 * The usual reason why we get a lock conflict is
+		 * restarting too quickly and hitting the previous lock
+		 * that is going to disappear soon.
+		 *
+		 * FIXME: However, it may also be that a master
+		 * is ok and we should become a slave, e.g. start TDB.
+		 * We do not support multi-node, but we should.
+		 */
+		tm.tv_sec = 10;
+		tm.tv_nsec = 0;
+		nanosleep(&tm, NULL);
 	}
 
 	/*
@@ -515,65 +295,30 @@ static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
 	 */
 	len = snprintf(buf, sizeof(buf), "port: %u\n", tabled_srv.rep_port);
 	if (len >= sizeof(buf)) {
-		applog(LOG_ERR,
-		       "internal error: overflow in cldu_lock_cb (%d)", len);
-		return 0;
+		applog(LOG_ERR, "internal error: overflow for port (%d)", len);
+		goto err_wmem;
 	}
 
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_put_cb;
-	copts.private = sp;
-	rc = cldc_put(sp->ffh, &copts, buf, len);
+	rc = ncld_write(sp->ffh, buf, len);
 	if (rc) {
-		applog(LOG_ERR, "cldc_put(%s) call error: %d", sp->ffname, rc);
-	}
-
-	return 0;
-}
-
-static int cldu_put_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD put(%s) failed: %d", sp->ffname, errc);
-		return 0;
+		applog(LOG_ERR, "CLD put(%s) failed: %d", sp->ffname, rc);
+		goto err_write;
 	}
 
 	/*
 	 * Read the directory.
 	 */
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_get_1_cb;
-	copts.private = sp;
-	rc = cldc_get(sp->cfh, &copts, false);
-	if (rc) {
-		applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->cfname, rc);
-	}
-
-	return 0;
-}
-
-static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	const char *ptr;
-	int dir_len;
-	int total_len, rec_len, name_len;
-	char buf[65];
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD get(%s) failed: %d", sp->cfname, errc);
-		return 0;
+	nrp = ncld_get(sp->cfh, &error);
+	if (!nrp) {
+		applog(LOG_ERR, "CLD get(%s) failed: %d", sp->cfname, error);
+		goto err_dread;
 	}
 
 	if (debugging)
 		applog(LOG_DEBUG, "Known tabled nodes");
 
-	ptr = carg->u.get.buf;
-	dir_len = carg->u.get.size;
+	ptr = nrp->ptr;
+	dir_len = nrp->length;
 	while (dir_len) {
 		name_len = GUINT16_FROM_LE(*(uint16_t *)ptr);
 		rec_len = name_len + 2;
@@ -598,6 +343,8 @@ static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
 		dir_len -= total_len;
 	}
 
+	ncld_read_free(nrp);
+
 	/*
 	 * If configuration gives us storage nodes, we shortcut scanning
 	 * of CLD, because:
@@ -609,96 +356,78 @@ static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
 		if (debugging)
 			applog(LOG_DEBUG, "Trying to open %d storage nodes",
 			       tabled_srv.num_stor);
-		if (stor_update_cb() < 1) {
-			evtimer_add(&sp->tm_reopen, &cldu_reopen_delay);
+		while (stor_update_cb() < 1) {
+			tm.tv_sec = 3;
+			tm.tv_nsec = 0;
+			nanosleep(&tm, NULL);
+			if (debugging)
+				applog(LOG_DEBUG,
+				       "Trying to reopen %d storage nodes",
+				       tabled_srv.num_stor);
 		}
 		return 0;
 	}
 
-	sp->retry_cnt = 0;
-	try_open_x(sp);
 	return 0;
-}
-
-/*
- * Open the xfname, so we can collect registered Chunk servers.
- */
-static void try_open_x(struct cld_session *sp)
-{
-	struct cldc_call_opts copts;
-	int rc;
 
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_open_x_cb;
-	copts.private = sp;
-	rc = cldc_open(sp->lib->sess, &copts, sp->xfname,
-		       COM_READ | COM_DIRECTORY,
-		       CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->xfh);
-	if (rc) {
-		applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->xfname, rc);
-	}
+err_dread:
+err_write:
+err_wmem:
+err_lock:
+	ncld_close(sp->ffh);	/* session-close closes these, maybe drop */
+err_fopen:
+	ncld_close(sp->cfh);
+err_copen:
+	ncld_sess_close(sp->nsp);
+	sp->nsp = NULL;
+err_nsess:
+err_addr:
+	return -1;
 }
 
-static int cldu_open_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
+static int scan_chunks(struct cld_session *sp)
 {
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-
-	if (errc != CLE_OK) {
-		if (errc == CLE_INODE_INVAL || errc == CLE_NAME_INVAL) {
+	struct ncld_fh *xfh;	/* /chunk-GROUP directory */
+	struct ncld_read *nrp;
+	struct chunk_node *np;
+	const char *ptr;
+	int dir_len;
+	int total_len, rec_len, name_len;
+	char buf[65];
+	int error;
+
+	xfh = ncld_open(sp->nsp, sp->xfname, COM_READ | COM_DIRECTORY,
+			&error, 0 /* CE_MASTER_FAILOVER | CE_SESS_FAILED */,
+			NULL, NULL);
+	if (!xfh) {
+		if (error == CLE_INODE_INVAL + 1100 ||
+		    error == CLE_NAME_INVAL + 1100) {
 			applog(LOG_ERR, "%s: open failed, retrying",
 			       sp->xfname);
-			evtimer_add(&sp->tm_retry, &cldu_retry_delay);
+			return 1;
 		} else {
 			applog(LOG_ERR, "CLD open(%s) failed: %d",
-			       sp->xfname, errc);
+			       sp->xfname, error);
 			/* XXX we're dead, why not exit(1) right away? */
+			return -1;
 		}
-		return 0;
-	}
-	if (sp->xfh == NULL) {
-		applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->xfname);
-		return 0;
-	}
-	if (!sp->xfh->valid) {
-		applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->xfname);
-		return 0;
 	}
 
 	/*
 	 * Read the directory.
 	 */
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_get_x_cb;
-	copts.private = sp;
-	rc = cldc_get(sp->xfh, &copts, false);
-	if (rc) {
-		applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->cfname, rc);
-	}
-	return 0;
-}
-
-static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-	const char *ptr;
-	int dir_len;
-	int total_len, rec_len, name_len;
-	char buf[65];
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD get(%s) failed: %d", sp->xfname, errc);
-		return 0;
+	nrp = ncld_get(xfh, &error);
+	if (!nrp) {
+		ncld_close(xfh);
+		applog(LOG_ERR, "CLD get(%s) failed: %d", sp->xfname, error);
+		return -1;
 	}
 
 	if (debugging)
 		applog(LOG_DEBUG, "Known Chunk nodes");
 
-	ptr = carg->u.get.buf;
-	dir_len = carg->u.get.size;
+	ptr = nrp->ptr;
+	dir_len = nrp->length;
 	while (dir_len) {
 		name_len = GUINT16_FROM_LE(*(uint16_t *)ptr);
 		rec_len = name_len + 2;
@@ -718,176 +447,82 @@ static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
 		dir_len -= total_len;
 	}
 
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_close_x_cb;
-	copts.private = sp;
-	rc = cldc_close(sp->xfh, &copts);
-	if (rc) {
-		applog(LOG_ERR, "cldc_close call error %d", rc);
-	}
-	return 0;
-}
-
-static int cldu_close_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	// struct cldc_call_opts copts;
-	// int rc;
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD close(%s) failed: %d", sp->xfname, errc);
-		return 0;
-	}
+	ncld_read_free(nrp);
+	ncld_close(xfh);
 
+	/*
+	 * Scan the collected directory contents and fill the entries.
+	 */
 	if (list_empty(&sp->chunks)) {
 		applog(LOG_INFO, "%s: No Chunk nodes found, retrying",
 		       sp->xfname);
-		if (evtimer_add(&sp->tm_retry, &cldu_retry_delay) != 0) {
-			applog(LOG_ERR, "evtimer_add error %s",
-			       strerror(errno));
-		}
-	} else {
-		next_chunk(sp);
+		return 1;
 	}
-	return 0;
-}
-
-static void next_chunk(struct cld_session *sp)
-{
-	struct chunk_node *np;
-	char *mem;
-	struct cldc_call_opts copts;
-	int rc;
-
-	np = list_entry(sp->chunks.next, struct chunk_node, link);
-
-	if (asprintf(&mem, "/chunk-%s/%s", sp->thisgroup, np->name) == -1) {
-		applog(LOG_WARNING, "OOM in cldu");
-		return;
-	}
-	sp->yfname = mem;
-
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_open_y_cb;
-	copts.private = sp;
-	rc = cldc_open(sp->lib->sess, &copts, sp->yfname,
-		       COM_READ,
-		       CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->yfh);
-	if (rc) {
-		applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->yfname, rc);
-	}
-}
-
-static int cldu_open_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD open(%s) failed: %d", sp->yfname, errc);
-		free(sp->yfname);
-		sp->yfname = NULL;
-		return 0;
-	}
-	if (sp->yfh == NULL) {
-		applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->yfname);
-		free(sp->yfname);
-		sp->yfname = NULL;
-		return 0;
-	}
-	if (!sp->yfh->valid) {
-		applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->yfname);
-		free(sp->yfname);
-		sp->yfname = NULL;
-		return 0;
+	while (!list_empty(&sp->chunks)) {
+		np = list_entry(sp->chunks.next, struct chunk_node, link);
+		next_chunk(sp, np);
+		list_del(&np->link);
 	}
 
 	/*
-	 * Read the Chunk's parameter file.
+	 * Poke the dispatch about the possible changes in the
+	 * configuration of Chunk.
+	 *
+	 * It's possible that the CLD directories have many entries,
+	 * but no useable Chunk servers. In that case, treat everything
+	 * like a usual retry.
+	 *
+	 * For the case of normal operation, we also set up a rescan, for now.
+	 * In the future, we'll subscribe for change notification. FIXME.
 	 */
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_get_y_cb;
-	copts.private = sp;
-	rc = cldc_get(sp->yfh, &copts, false);
-	if (rc) {
-		applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->yfname, rc);
-	}
-	return 0;
-}
+	if (!stor_update_cb())
+		return 1;
 
-static int cldu_get_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
-{
-	struct cld_session *sp = carg->private;
-	struct cldc_call_opts copts;
-	int rc;
-	const char *ptr;
-	int len;
-
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD get(%s) failed: %d", sp->yfname, errc);
-		goto close_and_next;	/* spaghetti */
-	}
-
-	ptr = carg->u.get.buf;
-	len = carg->u.get.size;
-	stor_parse(sp->yfname, ptr, len);
-
-close_and_next:
-	memset(&copts, 0, sizeof(copts));
-	copts.cb = cldu_close_y_cb;
-	copts.private = sp;
-	rc = cldc_close(sp->yfh, &copts);
-	if (rc) {
-		applog(LOG_ERR, "cldc_close call error %d", rc);
-	}
 	return 0;
 }
 
-static int cldu_close_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
+static void next_chunk(struct cld_session *sp, struct chunk_node *np)
 {
-	struct cld_session *sp = carg->private;
-	struct chunk_node *np;
-	// struct cldc_call_opts copts;
-	// int rc;
+	char *mem;
+	char *yfname;		/* /chunk-GROUP/NID file */
+	struct ncld_fh *yfh;	/* /chunk-GROUP/NID file */
+	struct ncld_read *nrp;
+	int error;
 
-	if (errc != CLE_OK) {
-		applog(LOG_ERR, "CLD close(%s) failed: %d", sp->yfname, errc);
-		return 0;
+	if (asprintf(&mem, "/chunk-%s/%s", sp->thisgroup, np->name) == -1) {
+		applog(LOG_WARNING, "OOM in cldu");
+		goto err_mem;
 	}
+	yfname = mem;
 
-	free(sp->yfname);
-	sp->yfname = NULL;
-
-	np = list_entry(sp->chunks.next, struct chunk_node, link);
-	list_del(&np->link);
-
-	if (!list_empty(&sp->chunks)) {
-		next_chunk(sp);
-		return 0;
+	yfh = ncld_open(sp->nsp, yfname, COM_READ, &error,
+			0 /* CE_MASTER_FAILOVER | CE_SESS_FAILED */,
+			NULL, NULL);
+	if (!yfh) {
+		applog(LOG_ERR, "CLD open(%s) failed: %d", yfname, error);
+		goto err_open;
 	}
 
 	/*
-	 * No more chunks to consider in this cycle, we're all done.
-	 * Now, poke the dispatch about the possible changes in the
-	 * configuration of Chunk.
-	 *
-	 * It's possible that the CLD directories are full of all garbage,
-	 * but no useable Chunk servers. In that case, treat everything
-	 * like a usual retry.
-	 *
-	 * For the case of normal operation, we also set up a rescan, for now.
-	 * In the future, we'll subscribe for change notification. FIXME.
+	 * Read the Chunk's parameter file.
 	 */
-	if (stor_update_cb()) {
-		evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
-	} else {
-		if (evtimer_add(&sp->tm_retry, &cldu_retry_delay) != 0) {
-			applog(LOG_ERR, "evtimer_add error %s",
-			       strerror(errno));
-		}
-	}
-	return 0;
+	nrp = ncld_get(yfh, &error);
+	if (!nrp) {
+		applog(LOG_ERR, "CLD get(%s) failed: %d", yfname, error);
+		goto err_get;
+	}
+	stor_parse(yfname, nrp->ptr, nrp->length);
+	ncld_read_free(nrp);
+	ncld_close(yfh);
+	free(yfname);
+	return;
+
+err_get:
+	ncld_close(yfh);
+err_open:
+	free(yfname);
+err_mem:
+	return;
 }
 
 /*
@@ -937,7 +572,7 @@ static struct cld_session ses;
  */
 void cld_init()
 {
-	cldc_init();
+	ncld_init();
 
 	// memset(&ses, 0, sizeof(struct cld_session));
 	INIT_LIST_HEAD(&ses.chunks);
@@ -949,10 +584,10 @@ void cld_init()
 int cld_begin(const char *thishost, const char *thisgroup)
 {
 	static struct cld_session *sp = &ses;
+	struct timespec tm;
+	int retry_cnt;
 
-	evtimer_set(&ses.tm_retry, cldu_tm_retry, &ses);
 	evtimer_set(&ses.tm_rescan, cldu_tm_rescan, &ses);
-	evtimer_set(&ses.tm_reopen, cldu_tm_reopen, &ses);
 
 	if (cldu_setgroup(sp, thisgroup, thishost)) {
 		/* Already logged error */
@@ -999,8 +634,21 @@ int cld_begin(const char *thishost, const char *thisgroup)
 		goto err_net;
 	}
 
+	retry_cnt = 0;
+	for (;;) {
+		if (!scan_chunks(sp))
+			break;
+		if (++retry_cnt == 5)
+			goto err_scan;
+		tm.tv_sec = 5;
+		tm.tv_nsec = 0;
+		nanosleep(&tm, NULL);
+	}
+
+	evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
 	return 0;
 
+err_scan:
 err_net:
 err_addr:
 err_group:
@@ -1034,12 +682,9 @@ void cld_end(void)
 	static struct cld_session *sp = &ses;
 	int i;
 
-	if (sp->lib) {
-		event_del(&sp->ev);
-		// if (sp->sess_open)	/* kill it always, include half-open */
-		cldc_kill_sess(sp->lib->sess);
-		cldc_udp_free(sp->lib);
-		sp->lib = NULL;
+	if (sp->nsp) {
+		ncld_sess_close(sp->nsp);
+		sp->nsp = NULL;
 	}
 
 	if (!sp->forced_hosts) {
@@ -1051,9 +696,7 @@ void cld_end(void)
 		}
 	}
 
-	evtimer_del(&sp->tm_retry);
 	evtimer_del(&sp->tm_rescan);
-	evtimer_del(&sp->tm_reopen);
 
 	free(sp->cfname);
 	sp->cfname = NULL;
@@ -1061,8 +704,6 @@ void cld_end(void)
 	sp->ffname = NULL;
 	free(sp->xfname);
 	sp->xfname = NULL;
-	free(sp->yfname);
-	sp->yfname = NULL;
 	free(sp->thisgroup);
 	sp->thisgroup = NULL;
 	free(sp->thishost);

^ permalink raw reply related	[flat|nested] 3+ messages in thread

* Re: [Patch 1/1] tabled: switch to ncld
  2010-02-09  4:42 [Patch 1/1] tabled: switch to ncld Pete Zaitcev
@ 2010-02-09  5:06 ` Jeff Garzik
  2010-02-14  2:44 ` Jeff Garzik
  1 sibling, 0 replies; 3+ messages in thread
From: Jeff Garzik @ 2010-02-09  5:06 UTC (permalink / raw)
  To: Pete Zaitcev; +Cc: Project Hail List

On 02/08/2010 11:42 PM, Pete Zaitcev wrote:
> No new function just yet, only a switch-over.
>
> Signed-Off-By: Pete Zaitcev<zaitcev@redhat.com>
>
> ---
>   server/cldu.c |  789 +++++++++++++-----------------------------------
>   1 file changed, 215 insertions(+), 574 deletions(-)

Looks good, but needs a rediff against the latest.  Colin's patch for 
the recent CLD API update went in on Sunday, though the repo was not 
pushed until recently.

	Jeff



^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [Patch 1/1] tabled: switch to ncld
  2010-02-09  4:42 [Patch 1/1] tabled: switch to ncld Pete Zaitcev
  2010-02-09  5:06 ` Jeff Garzik
@ 2010-02-14  2:44 ` Jeff Garzik
  1 sibling, 0 replies; 3+ messages in thread
From: Jeff Garzik @ 2010-02-14  2:44 UTC (permalink / raw)
  To: Pete Zaitcev; +Cc: Project Hail List

On 02/08/2010 11:42 PM, Pete Zaitcev wrote:
> No new function just yet, only a switch-over.
>
> Signed-Off-By: Pete Zaitcev<zaitcev@redhat.com>
>
> ---
>   server/cldu.c |  789 +++++++++++++-----------------------------------
>   1 file changed, 215 insertions(+), 574 deletions(-)

I went back and used git to apply this patch to an earlier commit, and 
then merge the result into main branch of tabled.git.  It should be 
applied now, but please do verify that I merged everything OK.

^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2010-02-14  2:44 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2010-02-09  4:42 [Patch 1/1] tabled: switch to ncld Pete Zaitcev
2010-02-09  5:06 ` Jeff Garzik
2010-02-14  2:44 ` Jeff Garzik

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.