All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH 1/3] CLD: convert back to libevent
@ 2010-12-31 10:56 Jeff Garzik
  2010-12-31 10:57 ` [PATCH 2/3] CLD: switch network proto from UDP to TCP Jeff Garzik
  0 siblings, 1 reply; 6+ messages in thread
From: Jeff Garzik @ 2010-12-31 10:56 UTC (permalink / raw)
  To: hail-devel


Switch CLD from hand-rolled server poll code, to libevent.  Follows
similar techniques and rationale as chunkd commit
c1aed7464f237e5a6309351bf003162c77d69e27.  This reverts ancient commit
90b3b5edcf5aa00577f4395fdbb490ed7e9be824.

Signed-off-by: Jeff Garzik <jgarzik@redhat.com>
---
 cld/Makefile.am |    3 -
 cld/cld.h       |   22 +++----
 cld/server.c    |  161 ++++++++++++++++++++------------------------------------
 cld/session.c   |   69 ++++++++++++++++--------
 4 files changed, 118 insertions(+), 137 deletions(-)

diff --git a/cld/Makefile.am b/cld/Makefile.am
index 9a13ce0..30eea0b 100644
--- a/cld/Makefile.am
+++ b/cld/Makefile.am
@@ -12,7 +12,8 @@ cld_SOURCES	= cldb.h cld.h \
 		  cldb.c msg.c server.c session.c util.c
 cld_LDADD	= \
 		  ../lib/libhail.la @GLIB_LIBS@ @CRYPTO_LIBS@ \
-		  @SSL_LIBS@ @DB4_LIBS@ @XML_LIBS@ @LIBCURL@
+		  @SSL_LIBS@ @DB4_LIBS@ @XML_LIBS@ @LIBCURL@ \
+		  @EVENT_LIBS@
 
 cldbadm_SOURCES	= cldb.h cldbadm.c
 cldbadm_LDADD	= @CRYPTO_LIBS@ @GLIB_LIBS@ @DB4_LIBS@
diff --git a/cld/cld.h b/cld/cld.h
index 4c0099f..17f14b8 100644
--- a/cld/cld.h
+++ b/cld/cld.h
@@ -22,8 +22,9 @@
 
 #include <netinet/in.h>
 #include <sys/time.h>
-#include <poll.h>
+#include <event.h>
 #include <glib.h>
+#include <elist.h>
 #include "cldb.h"
 #include <cld_msg_rpc.h>
 #include <cld_common.h>
@@ -59,13 +60,13 @@ struct session {
 
 	uint64_t		last_contact;
 	uint64_t		next_fh;
-	struct cld_timer	timer;
+	struct event		timer;
 
 	uint64_t		next_seqid_in;
 	uint64_t		next_seqid_out;
 
 	GList			*out_q;		/* outgoing pkts (to client) */
-	struct cld_timer	retry_timer;
+	struct event		retry_timer;
 
 	char			user[CLD_MAX_USERNAME];
 
@@ -85,10 +86,10 @@ struct server_stats {
 	unsigned long		garbage;	/* num. garbage pkts dropped */
 };
 
-struct server_poll {
+struct server_socket {
 	int			fd;
-	bool			(*cb)(int fd, short events, void *userdata);
-	void			*userdata;
+	struct event		ev;
+	struct list_head	sockets_node;
 };
 
 struct server {
@@ -103,14 +104,13 @@ struct server {
 
 	struct cldb		cldb;		/* database info */
 
-	GArray			*polls;
-	GArray			*poll_data;
+	struct event_base	*evbase_main;
 
-	GHashTable		*sessions;
+	struct list_head	sockets;
 
-	struct cld_timer_list	timers;
+	GHashTable		*sessions;
 
-	struct cld_timer	chkpt_timer;	/* db4 checkpoint timer */
+	struct event		chkpt_timer;	/* db4 checkpoint timer */
 
 	struct server_stats	stats;		/* global statistics */
 };
diff --git a/cld/server.c b/cld/server.c
index 7a57785..aed501b 100644
--- a/cld/server.c
+++ b/cld/server.c
@@ -559,7 +559,7 @@ static void simple_sendresp(int sock_fd, const struct client *cli,
 		       info->op);
 }
 
-static bool udp_srv_event(int fd, short events, void *userdata)
+static void udp_srv_event(int fd, short events, void *userdata)
 {
 	struct client cli;
 	char host[64];
@@ -586,7 +586,7 @@ static bool udp_srv_event(int fd, short events, void *userdata)
 	rrc = recvmsg(fd, &hdr, 0);
 	if (rrc < 0) {
 		syslogerr("UDP recvmsg");
-		return true; /* continue main loop; do NOT terminate server */
+		return;
 	}
 	cli.addr_len = hdr.msg_namelen;
 
@@ -601,59 +601,60 @@ static bool udp_srv_event(int fd, short events, void *userdata)
 
 	if (!parse_pkt_header(raw_pkt, rrc, &pkt, &hdr_len)) {
 		cld_srv.stats.garbage++;
-		return true;
+		return;
 	}
 
 	if (!get_pkt_info(&pkt, raw_pkt, rrc, hdr_len, &info)) {
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
 		cld_srv.stats.garbage++;
-		return true;
+		return;
 	}
 
 	if (packet_is_dupe(&info)) {
 		/* silently drop dupes */
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-		return true;
+		return;
 	}
 
 	err = validate_pkt_session(&info, &cli);
 	if (err) {
 		simple_sendresp(fd, &cli, &info, err);
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-		return true;
+		return;
 	}
 
 	err = pkt_chk_sig(raw_pkt, rrc, &pkt);
 	if (err) {
 		simple_sendresp(fd, &cli, &info, err);
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-		return true;
+		return;
 	}
 
 	if (!(cld_srv.cldb.is_master && cld_srv.cldb.up)) {
 		simple_sendmsg(fd, &cli, pkt.sid, pkt.user, 0xdeadbeef,
 			       (xdrproc_t)xdr_void, NULL, CMO_NOT_MASTER);
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-		return true;
+		return;
 	}
 
 	err = udp_rx(fd, &cli, &info, raw_pkt, rrc);
 	if (err) {
 		simple_sendresp(fd, &cli, &info, err);
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-		return true;
+		return;
 	}
 	xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-	return true;
 }
 
 static void add_chkpt_timer(void)
 {
-	cld_timer_add(&cld_srv.timers, &cld_srv.chkpt_timer,
-		      time(NULL) + CLD_CHKPT_SEC);
+	struct timeval tv = { .tv_sec = CLD_CHKPT_SEC };
+
+	if (evtimer_add(&cld_srv.chkpt_timer, &tv) < 0)
+		HAIL_WARN(&srv_log, "chkpt timer add failed");
 }
 
-static void cldb_checkpoint(struct cld_timer *timer)
+static void cldb_checkpoint(int fd, short events, void *userdata)
 {
 	DB_ENV *dbenv = cld_srv.cldb.env;
 	int rc;
@@ -690,28 +691,28 @@ static int net_write_port(const char *port_file, const char *port_str)
 
 static void net_close(void)
 {
-	struct pollfd *pfd;
-	int i;
-
-	if (!cld_srv.polls)
-		return;
-
-	for (i = 0; i < cld_srv.polls->len; i++) {
-		pfd = &g_array_index(cld_srv.polls, struct pollfd, i);
-		if (pfd->fd >= 0) {
-			if (close(pfd->fd) < 0)
-				HAIL_WARN(&srv_log, "%s(%d): %s",
-					  __func__, pfd->fd, strerror(errno));
-			pfd->fd = -1;
+	struct server_socket *tmp, *iter;
+
+	list_for_each_entry_safe(tmp, iter, &cld_srv.sockets, sockets_node) {
+		if (tmp->fd >= 0) {
+			if (event_del(&tmp->ev) < 0)
+				HAIL_WARN(&srv_log, "Event delete(%d) failed",
+					  tmp->fd);
+			if (close(tmp->fd) < 0)
+				HAIL_WARN(&srv_log, "Close(%d) failed: %s",
+					  tmp->fd, strerror(errno));
+			tmp->fd = -1;
 		}
+
+		list_del(&tmp->sockets_node);
+		free(tmp);
 	}
 }
 
 static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
 			   int addr_len, void *addr_ptr)
 {
-	struct server_poll sp;
-	struct pollfd pfd;
+	struct server_socket *sock;
 	int fd, rc;
 
 	fd = socket(addr_fam, sock_type, sock_prot);
@@ -732,15 +733,25 @@ static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
 		return -errno;
 	}
 
-	sp.fd = fd;
-	sp.cb = udp_srv_event;
-	sp.userdata = NULL;
-	g_array_append_val(cld_srv.poll_data, sp);
+	sock = calloc(1, sizeof(*sock));
+	if (!sock) {
+		close(fd);
+		return -ENOMEM;
+	}
+
+	sock->fd = fd;
+	INIT_LIST_HEAD(&sock->sockets_node);
+
+	event_set(&sock->ev, fd, EV_READ | EV_PERSIST,
+		  udp_srv_event, sock);
+
+	if (event_add(&sock->ev, NULL) < 0) {
+		free(sock);
+		close(fd);
+		return -EIO;
+	}
 
-	pfd.fd = fd;
-	pfd.events = POLLIN;
-	pfd.revents = 0;
-	g_array_append_val(cld_srv.polls, pfd);
+	list_add_tail(&sock->sockets_node, &cld_srv.sockets);
 
 	return fd;
 }
@@ -891,11 +902,13 @@ static void segv_signal(int signo)
 static void term_signal(int signo)
 {
 	server_running = false;
+	event_loopbreak();
 }
 
 static void stats_signal(int signo)
 {
 	dump_stats = true;
+	event_loopbreak();
 }
 
 #define X(stat) \
@@ -975,73 +988,16 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
 
 static int main_loop(void)
 {
-	time_t next_timeout;
-
-	next_timeout = cld_timers_run(&cld_srv.timers);
-
 	while (server_running) {
-		struct pollfd *pfd;
-		int i, fired, rc;
-
 		cld_srv.stats.poll++;
-
-		/* poll for fd activity, or next timer event */
-		rc = poll(&g_array_index(cld_srv.polls, struct pollfd, 0),
-			  cld_srv.polls->len,
-			  next_timeout ? (next_timeout * 1000) : -1);
-		if (rc < 0) {
-			syslogerr("poll");
-			if (errno != EINTR)
-				break;
-		}
+		event_dispatch();
 
 		gettimeofday(&current_time, NULL);
 
-		/* determine which fd's fired; call their callbacks */
-		fired = 0;
-		for (i = 0; i < cld_srv.polls->len; i++) {
-			struct server_poll *sp;
-			bool runrunrun;
-			short revents;
-
-			/* ref pollfd struct */
-			pfd = &g_array_index(cld_srv.polls, struct pollfd, i);
-
-			/* if no events fired, move on to next */
-			if (!pfd->revents)
-				continue;
-
-			fired++;
-
-			revents = pfd->revents;
-			pfd->revents = 0;
-
-			/* ref 1:1 matching server_poll struct */
-			sp = &g_array_index(cld_srv.poll_data,
-					    struct server_poll, i);
-
-			cld_srv.stats.event++;
-
-			/* call callback, shutting down server if requested */
-			runrunrun = sp->cb(sp->fd, revents, sp->userdata);
-			if (!runrunrun) {
-				server_running = false;
-				break;
-			}
-
-			/* if we reached poll(2) activity count, it is
-			 * pointless to continue looping
-			 */
-			if (fired == rc)
-				break;
-		}
-
 		if (dump_stats) {
 			dump_stats = false;
 			stats_dump();
 		}
-
-		next_timeout = cld_timers_run(&cld_srv.timers);
 	}
 
 	return 0;
@@ -1052,6 +1008,8 @@ int main (int argc, char *argv[])
 	error_t aprc;
 	int rc = 1;
 
+	INIT_LIST_HEAD(&cld_srv.sockets);
+
 	/* isspace() and strcasecmp() consistency requires this */
 	setlocale(LC_ALL, "C");
 
@@ -1075,6 +1033,8 @@ int main (int argc, char *argv[])
 	if (use_syslog)
 		openlog(PROGRAM_NAME, LOG_PID, LOG_LOCAL3);
 
+	cld_srv.evbase_main = event_init();
+
 	if (!(cld_srv.flags & SFL_FOREGROUND) && (daemon(1, !use_syslog) < 0)) {
 		syslogerr("daemon");
 		goto err_out;
@@ -1103,17 +1063,13 @@ int main (int argc, char *argv[])
 
 	ensure_root();
 
-	cld_timer_init(&cld_srv.chkpt_timer, "db4-checkpoint",
-		       cldb_checkpoint, NULL);
+	evtimer_set(&cld_srv.chkpt_timer, cldb_checkpoint, NULL);
 	add_chkpt_timer();
 
 	rc = 1;
 
 	cld_srv.sessions = g_hash_table_new(sess_hash, sess_equal);
-	cld_srv.poll_data = g_array_sized_new(FALSE, FALSE,
-					   sizeof(struct server_poll), 4);
-	cld_srv.polls = g_array_sized_new(FALSE,FALSE,sizeof(struct pollfd), 4);
-	if (!cld_srv.sessions || !cld_srv.poll_data || !cld_srv.polls)
+	if (!cld_srv.sessions)
 		goto err_out_pid;
 
 	if (sess_load(cld_srv.sessions) != 0)
@@ -1137,7 +1093,8 @@ int main (int argc, char *argv[])
 	HAIL_INFO(&srv_log, "shutting down");
 
 	if (strict_free)
-		cld_timer_del(&cld_srv.timers, &cld_srv.chkpt_timer);
+		if (evtimer_del(&cld_srv.chkpt_timer) < 0)
+			HAIL_WARN(&srv_log, "chkpt timer del failed");
 
 	if (cld_srv.cldb.up)
 		cldb_down(&cld_srv.cldb);
@@ -1149,8 +1106,6 @@ err_out_pid:
 err_out:
 	if (strict_free) {
 		net_close();
-		g_array_free(cld_srv.polls, TRUE);
-		g_array_free(cld_srv.poll_data, TRUE);
 		sessions_free();
 		g_hash_table_unref(cld_srv.sessions);
 	}
diff --git a/cld/session.c b/cld/session.c
index d76186b..9887aaa 100644
--- a/cld/session.c
+++ b/cld/session.c
@@ -43,8 +43,8 @@ struct session_outpkt {
 	void			*done_data;
 };
 
-static void session_retry(struct cld_timer *);
-static void session_timeout(struct cld_timer *);
+static void session_retry(int, short, void *);
+static void session_timeout(int, short, void *);
 static int sess_load_db(GHashTable *ss, DB_TXN *txn);
 static void op_unref(struct session_outpkt *op);
 
@@ -87,8 +87,8 @@ static struct session *session_new(void)
 
 	cld_rand64(&sess->next_seqid_out);
 
-	cld_timer_init(&sess->timer, "session-timeout", session_timeout, sess);
-	cld_timer_init(&sess->retry_timer, "session-retry", session_retry, sess);
+	evtimer_set(&sess->timer, session_timeout, sess);
+	evtimer_set(&sess->retry_timer, session_retry, sess);
 
 	return sess;
 }
@@ -103,8 +103,10 @@ static void session_free(struct session *sess, bool hash_remove)
 	if (hash_remove)
 		g_hash_table_remove(cld_srv.sessions, sess->sid);
 
-	cld_timer_del(&cld_srv.timers, &sess->timer);
-	cld_timer_del(&cld_srv.timers, &sess->retry_timer);
+	if (evtimer_del(&sess->timer) < 0)
+		HAIL_ERR(&srv_log, "sess timer del failed");
+	if (evtimer_del(&sess->retry_timer) < 0)
+		HAIL_ERR(&srv_log, "sess retry timer del failed");
 
 	tmp = sess->out_q;
 	while (tmp) {
@@ -376,9 +378,9 @@ static void session_ping_done(struct session_outpkt *outpkt)
 	outpkt->sess->ping_open = false;
 }
 
-static void session_timeout(struct cld_timer *timer)
+static void session_timeout(int fd, short events, void *userdata)
 {
-	struct session *sess = timer->userdata;
+	struct session *sess = userdata;
 	uint64_t sess_expire;
 	int rc;
 	DB_ENV *dbenv = cld_srv.cldb.env;
@@ -387,6 +389,8 @@ static void session_timeout(struct cld_timer *timer)
 
 	sess_expire = sess->last_contact + CLD_SESS_TIMEOUT;
 	if (!sess->dead && (sess_expire > now)) {
+		struct timeval tv;
+
 		if (!sess->ping_open &&
 		    (sess_expire > (sess->last_contact + (CLD_SESS_TIMEOUT / 2) &&
 		    (sess->sock_fd > 0)))) {
@@ -396,9 +400,12 @@ static void session_timeout(struct cld_timer *timer)
 				     session_ping_done, NULL);
 		}
 
-		cld_timer_add(&cld_srv.timers, &sess->timer,
-			      now + ((sess_expire - now) / 2) + 1);
-		return;	/* timer added; do not time out session */
+		tv.tv_sec = ((sess_expire - now) / 2) + 1;
+		tv.tv_usec = 0;
+		if (evtimer_add(&sess->timer, &tv) < 0)
+			HAIL_ERR(&srv_log, "timer add failed, sid " SIDFMT,
+				 SIDARG(sess->sid));
+		return;		/* timer added; do not time out session */
 	}
 
 	HAIL_INFO(&srv_log, "session %s, addr %s sid " SIDFMT,
@@ -554,25 +561,33 @@ static int sess_retry_output(struct session *sess, time_t *next_retry_out)
 	return rc;
 }
 
-static void session_retry(struct cld_timer *timer)
+static void session_retry(int fd, short events, void *userdata)
 {
-	struct session *sess = timer->userdata;
+	struct session *sess = userdata;
 	time_t next_retry;
+	time_t now = time(NULL);
+	struct timeval tv;
 
 	if (!sess->out_q)
 		return;
 
 	sess_retry_output(sess, &next_retry);
 
-	cld_timer_add(&cld_srv.timers, &sess->retry_timer, next_retry);
+	tv.tv_sec = next_retry - now;
+	tv.tv_usec = 0;
+
+	if (evtimer_add(&sess->retry_timer, &tv) < 0)
+		HAIL_ERR(&srv_log, "retry timer re-add failed");
 }
 
 static void session_outq(struct session *sess, GList *new_pkts)
 {
 	/* if out_q empty, start retry timer */
-	if (!sess->out_q)
-		cld_timer_add(&cld_srv.timers, &sess->retry_timer,
-			      time(NULL) + CLD_RETRY_START);
+	if (!sess->out_q) {
+		struct timeval tv = { .tv_sec = CLD_RETRY_START };
+		if (evtimer_add(&sess->retry_timer, &tv) < 0)
+			HAIL_ERR(&srv_log, "retry timer start failed");
+	}
 
 	sess->out_q = g_list_concat(sess->out_q, new_pkts);
 }
@@ -766,7 +781,8 @@ void msg_ack(struct session *sess, uint64_t seqid)
 	}
 
 	if (!sess->out_q)
-		cld_timer_del(&cld_srv.timers, &sess->retry_timer);
+		if (evtimer_del(&sess->retry_timer) < 0)
+			HAIL_ERR(&srv_log, "sess retry timer del 2 failed");
 }
 
 void msg_new_sess(int sock_fd, const struct client *cli,
@@ -780,6 +796,7 @@ void msg_new_sess(int sock_fd, const struct client *cli,
 	int rc;
 	enum cle_err_codes resp_rc = CLE_OK;
 	struct cld_msg_generic_resp resp;
+	struct timeval tv;
 
 	sess = session_new();
 	if (!sess) {
@@ -832,8 +849,10 @@ void msg_new_sess(int sock_fd, const struct client *cli,
 	g_hash_table_insert(cld_srv.sessions, sess->sid, sess);
 
 	/* begin session timer */
-	cld_timer_add(&cld_srv.timers, &sess->timer,
-		      time(NULL) + (CLD_SESS_TIMEOUT / 2));
+	tv.tv_sec = CLD_SESS_TIMEOUT / 2;
+	tv.tv_usec = 0;
+	if (evtimer_add(&sess->timer, &tv) < 0)
+		HAIL_ERR(&srv_log, "sess timer start failed");
 
 	/* send new-sess reply */
 	resp.code = CLE_OK;
@@ -933,6 +952,8 @@ static int sess_load_db(GHashTable *ss, DB_TXN *txn)
 	val.flags = DB_DBT_USERMEM;
 
 	while (1) {
+		struct timeval tv;
+
 		rc = cur->get(cur, &key, &val, DB_NEXT);
 		if (rc == DB_NOTFOUND)
 			break;
@@ -960,8 +981,12 @@ static int sess_load_db(GHashTable *ss, DB_TXN *txn)
 		g_hash_table_insert(ss, sess->sid, sess);
 
 		/* begin session timer */
-		cld_timer_add(&cld_srv.timers, &sess->timer,
-			      time(NULL) + (CLD_SESS_TIMEOUT / 2));
+		tv.tv_sec = CLD_SESS_TIMEOUT / 2;
+		tv.tv_usec = 0;
+		if (evtimer_add(&sess->timer, &tv) < 0) {
+			HAIL_ERR(&srv_log, "sess timer loop start failed");
+			break;
+		}
 	}
 
 	cur->close(cur);

^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [PATCH 2/3] CLD: switch network proto from UDP to TCP
  2010-12-31 10:56 [PATCH 1/3] CLD: convert back to libevent Jeff Garzik
@ 2010-12-31 10:57 ` Jeff Garzik
  2010-12-31 10:58   ` [PATCH 3/3] CLD: enable replication on server and client Jeff Garzik
  2011-01-02 23:32   ` [PATCH 2/3] CLD: switch network proto from UDP to TCP Pete Zaitcev
  0 siblings, 2 replies; 6+ messages in thread
From: Jeff Garzik @ 2010-12-31 10:57 UTC (permalink / raw)
  To: hail-devel


Convert CLD network protocol from UDP to TCP.  Server, client lib,
and chunkd's cldu module are all updated.  tabled's cldu module must
be updated also.

The original rationale for UDP use was following Google's lead, based
on the advice in the original Chubby paper, describing TCP's back-off
policies and other behavior during times of high network congestion.

This seems a bit dubious without further third-party evidence, and TCP
vastly simplifies our lives.  While the code remains open and modular
enough to support other protocols (hopefully RDMA or SCTP one day),
this upgrade from UDP to TCP promises to make the current codebase
much easier to use, while avoiding the "reinvent TCP, by using UDP"
problem, which was a rabbit hole threatening CLD.

Signed-off-by: Jeff Garzik <jgarzik@redhat.com>
---
 chunkd/cldu.c        |    6 
 cld/cld.h            |   43 ++++++
 cld/msg.c            |    4 
 cld/server.c         |  356 ++++++++++++++++++++++++++++++++++++---------------
 cld/session.c        |    4 
 configure.ac         |    1 
 include/Makefile.am  |    2 
 include/cld_common.h |    4 
 include/cldc.h       |   24 ++-
 include/ncld.h       |    4 
 include/ubbp.h       |   52 +++++++
 lib/Makefile.am      |    2 
 lib/cldc-dns.c       |    2 
 lib/cldc-tcp.c       |  185 ++++++++++++++++++++++++++
 lib/cldc-udp.c       |  141 --------------------
 lib/cldc.c           |   54 +++----
 16 files changed, 595 insertions(+), 289 deletions(-)

diff --git a/chunkd/cldu.c b/chunkd/cldu.c
index 026c523..41f94b5 100644
--- a/chunkd/cldu.c
+++ b/chunkd/cldu.c
@@ -165,7 +165,7 @@ static void cldu_sess_event(void *priv, uint32_t what)
 		 */
 		if (cs->nsess) {
 			applog(LOG_ERR, "Session failed, sid " SIDFMT,
-			       SIDARG(cs->nsess->udp->sess->sid));
+			       SIDARG(cs->nsess->tcp->sess->sid));
 		} else {
 			applog(LOG_ERR, "Session open failed");
 		}
@@ -177,7 +177,7 @@ static void cldu_sess_event(void *priv, uint32_t what)
 	} else {
 		if (cs)
 			applog(LOG_INFO, "cldc event 0x%x sid " SIDFMT,
-			       what, SIDARG(cs->nsess->udp->sess->sid));
+			       what, SIDARG(cs->nsess->tcp->sess->sid));
 		else
 			applog(LOG_INFO, "cldc event 0x%x no sid", what);
 	}
@@ -372,7 +372,7 @@ static int cldu_set_cldc(struct cld_session *cs, int newactive)
 	}
 
 	applog(LOG_INFO, "New CLD session created, sid " SIDFMT,
-	       SIDARG(cs->nsess->udp->sess->sid));
+	       SIDARG(cs->nsess->tcp->sess->sid));
 
 	/*
 	 * First, make sure the base directory exists.
diff --git a/cld/cld.h b/cld/cld.h
index 17f14b8..b1f9bbf 100644
--- a/cld/cld.h
+++ b/cld/cld.h
@@ -30,6 +30,7 @@
 #include <cld_common.h>
 #include <hail_log.h>
 #include <hail_private.h>
+#include <ubbp.h>
 
 struct client;
 struct session_outpkt;
@@ -43,10 +44,39 @@ enum {
 	SFL_FOREGROUND		= (1 << 0),	/* run in foreground */
 };
 
+struct atcp_read {
+	void			*buf;		/* destination buffer */
+	unsigned int		buf_size;	/* total bytes to read into buf */
+	unsigned int		bytes_wanted;	/* bytes still outstanding */
+	unsigned int		bytes_read;	/* bytes received so far */
+
+	void			(*cb)(void *, bool);	/* completion: (cb_data, success) */
+	void			*cb_data;
+
+	struct list_head	node;		/* link in atcp_read_state.q */
+};
+
+struct atcp_read_state {
+	struct list_head	q;		/* pending struct atcp_read entries */
+};
+
 struct client {
+	int			fd;
+
+	struct event		ev;
+	short			ev_mask;	/* EV_READ and/or EV_WRITE */
+
 	struct sockaddr_in6	addr;		/* inet address */
 	socklen_t		addr_len;	/* inet address len */
 	char			addr_host[64];	/* ASCII version of inet addr */
+	char			addr_port[16];	/* ASCII version of inet addr */
+
+	struct atcp_read_state	rst;
+
+	struct ubbp_header	ubbp;
+
+	char			raw_pkt[CLD_RAW_MSG_SZ];
+	unsigned int		raw_size;
 };
 
 struct session {
@@ -124,6 +154,17 @@ struct pkt_info {
 	size_t			hdr_len;
 };
 
+#define ___constant_swab32(x) ((uint32_t)(                       \
+        (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) |            \
+        (((uint32_t)(x) & (uint32_t)0x0000ff00UL) <<  8) |            \
+        (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >>  8) |            \
+        (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24)))
+/* Byte-swap a 32-bit value: RETURNS the result, does not modify v */
+static inline uint32_t swab32(uint32_t v)
+{
+	return ___constant_swab32(v);
+}
+
 /* msg.c */
 extern int inode_lock_rescan(DB_TXN *txn, cldino_t inum);
 extern void msg_get(struct session *sess, const void *v);
@@ -178,7 +219,7 @@ extern int sess_load(GHashTable *ss);
 extern struct server cld_srv;
 extern struct hail_log srv_log;
 extern struct timeval current_time;
-extern int udp_tx(int sock_fd, struct sockaddr *, socklen_t,
+extern int tcp_tx(int sock_fd, struct sockaddr *, socklen_t,
 		  const void *, size_t);
 extern const char *user_key(const char *user);
 
diff --git a/cld/msg.c b/cld/msg.c
index dd8cf03..8d83e74 100644
--- a/cld/msg.c
+++ b/cld/msg.c
@@ -29,10 +29,6 @@
 #include <cld_msg_rpc.h>
 #include "cld.h"
 
-enum {
-	CLD_MAX_UDP_SEG		= 1024,
-};
-
 struct pathname_info {
 	const char	*dir;
 	size_t		dir_len;
diff --git a/cld/server.c b/cld/server.c
index aed501b..5a73e54 100644
--- a/cld/server.c
+++ b/cld/server.c
@@ -33,6 +33,7 @@
 #include <netdb.h>
 #include <signal.h>
 #include <netinet/in.h>
+#include <netinet/tcp.h>
 #include <openssl/sha.h>
 #include <openssl/hmac.h>
 #include <cld-private.h>
@@ -46,10 +47,6 @@
 
 const char *argp_program_version = PACKAGE_VERSION;
 
-enum {
-	CLD_RAW_MSG_SZ		= 4096,
-};
-
 static struct argp_option options[] = {
 	{ "data", 'd', "DIRECTORY", 0,
 	  "Store database environment in DIRECTORY.  Default: "
@@ -61,7 +58,7 @@ static struct argp_option options[] = {
 	{ "foreground", 'F', NULL, 0,
 	  "Run in foreground, do not fork" },
 	{ "port", 'p', "PORT", 0,
-	  "Bind to UDP port PORT.  Default: " CLD_DEF_PORT },
+	  "Bind to TCP port PORT.  Default: " CLD_DEF_PORT },
 	{ "pid", 'P', "FILE", 0,
 	  "Write daemon process id to FILE.  Default: " CLD_DEF_PIDFN },
 	{ "strict-free", 1001, NULL, 0,
@@ -93,6 +90,11 @@ struct server cld_srv = {
 };
 
 static void ensure_root(void);
+static bool atcp_read(struct atcp_read_state *rst,
+		      void *buf, unsigned int buf_size,
+		      void (*cb)(void *, bool), void *cb_data);
+static void cli_free(struct client *cli);
+static void cli_rd_ubbp(void *userdata, bool success);
 
 static void applog(int prio, const char *fmt, ...)
 {
@@ -119,12 +121,28 @@ struct hail_log srv_log = {
 	.func = applog,
 };
 
-int udp_tx(int sock_fd, struct sockaddr *addr, socklen_t addr_len,
+int tcp_tx(int sock_fd, struct sockaddr *addr, socklen_t addr_len,
 	   const void *data, size_t data_len)
 {
 	ssize_t src;
+	struct ubbp_header ubbp;
+
+	memcpy(ubbp.magic, "CLD1", 4);
+	ubbp.op_size = (data_len << 8) | 2;
+#ifdef WORDS_BIGENDIAN
+	swab32(ubbp.op_size);
+#endif
+
+	src = write(sock_fd, &ubbp, sizeof(ubbp));
+	if (src < 0 && errno != EAGAIN)
+		HAIL_ERR(&srv_log, "%s sendto (fd %d, data_len %u): %s",
+			 __func__, sock_fd, (unsigned int) data_len,
+			 strerror(errno));
 
-	src = sendto(sock_fd, data, data_len, 0, addr, addr_len);
+	if (src < 0)
+		return -errno;
+	
+	src = write(sock_fd, data, data_len);
 	if (src < 0 && errno != EAGAIN)
 		HAIL_ERR(&srv_log, "%s sendto (fd %d, data_len %u): %s",
 			 __func__, sock_fd, (unsigned int) data_len,
@@ -148,7 +166,7 @@ const char *user_key(const char *user)
 	return user;	/* our secret key */
 }
 
-static int udp_rx_handle(struct session *sess,
+static int tcp_rx_handle(struct session *sess,
 			 void (*msg_handler)(struct session *, const void *),
 			 xdrproc_t xdrproc, void *xdrdata)
 {
@@ -167,9 +185,9 @@ static int udp_rx_handle(struct session *sess,
 	return 0;
 }
 
-/** Recieve a UDP packet
+/** Receive a TCP packet
  *
- * @param sock_fd	The UDP socket we received the packet on
+ * @param sock_fd	The TCP socket we received the packet on
  * @param cli		Client address data
  * @param info		Packet information
  * @param raw_pkt	The raw packet buffer
@@ -178,7 +196,7 @@ static int udp_rx_handle(struct session *sess,
  * @return		An error code if we should send an error message
  *			response. CLE_OK if we are done.
  */
-static enum cle_err_codes udp_rx(int sock_fd, const struct client *cli,
+static enum cle_err_codes tcp_rx(int sock_fd, const struct client *cli,
 				 struct pkt_info *info, const char *raw_pkt,
 				 size_t raw_len)
 {
@@ -230,39 +248,39 @@ static enum cle_err_codes udp_rx(int sock_fd, const struct client *cli,
 		/* fall through */
 	case CMO_GET_META: {
 		struct cld_msg_get get = {0};
-		return udp_rx_handle(sess, msg_get,
+		return tcp_rx_handle(sess, msg_get,
 				     (xdrproc_t)xdr_cld_msg_get, &get);
 	}
 	case CMO_OPEN: {
 		struct cld_msg_open open_msg = {0};
-		return udp_rx_handle(sess, msg_open,
+		return tcp_rx_handle(sess, msg_open,
 				     (xdrproc_t)xdr_cld_msg_open, &open_msg);
 	}
 	case CMO_PUT: {
 		struct cld_msg_put put = {0};
-		return udp_rx_handle(sess, msg_put,
+		return tcp_rx_handle(sess, msg_put,
 				     (xdrproc_t)xdr_cld_msg_put, &put);
 	}
 	case CMO_CLOSE: {
 		struct cld_msg_close close_msg = {0};
-		return udp_rx_handle(sess, msg_close,
+		return tcp_rx_handle(sess, msg_close,
 				     (xdrproc_t)xdr_cld_msg_close, &close_msg);
 	}
 	case CMO_DEL: {
 		struct cld_msg_del del = {0};
-		return udp_rx_handle(sess, msg_del,
+		return tcp_rx_handle(sess, msg_del,
 				     (xdrproc_t)xdr_cld_msg_del, &del);
 	}
 	case CMO_UNLOCK: {
 		struct cld_msg_unlock unlock = {0};
-		return udp_rx_handle(sess, msg_unlock,
+		return tcp_rx_handle(sess, msg_unlock,
 				     (xdrproc_t)xdr_cld_msg_unlock, &unlock);
 	}
 	case CMO_TRYLOCK:
 		/* fall through */
 	case CMO_LOCK: {
 		struct cld_msg_lock lock = {0};
-		return udp_rx_handle(sess, msg_lock,
+		return tcp_rx_handle(sess, msg_lock,
 				     (xdrproc_t)xdr_cld_msg_lock, &lock);
 	}
 	case CMO_ACK:
@@ -452,31 +470,6 @@ static enum cle_err_codes pkt_chk_sig(const char *raw_pkt, int raw_len,
 	return 0;
 }
 
-/** Check if this packet is a duplicate
- *
- * @param info		Packet info
- *
- * @return		true only if the packet is a duplicate
- */
-static bool packet_is_dupe(const struct pkt_info *info)
-{
-	if (!info->sess)
-		return false;
-	if (info->op == CMO_ACK)
-		return false;
-
-	/* Check sequence IDs to discover if this packet is a dupe */
-	if (info->seqid != info->sess->next_seqid_in) {
-		HAIL_DEBUG(&srv_log, "dropping dup with seqid %llu "
-			   "(expected %llu).",
-			   (unsigned long long) info->seqid,
-			   (unsigned long long) info->sess->next_seqid_in);
-		return true;
-	}
-
-	return false;
-}
-
 void simple_sendmsg(int fd, const struct client *cli,
 		    uint64_t sid, const char *user, uint64_t seqid,
 		    xdrproc_t xdrproc, const void *xdrdata, enum cld_msg_op op)
@@ -541,7 +534,7 @@ void simple_sendmsg(int fd, const struct client *cli,
 		HAIL_ERR(&srv_log, "%s: authsign failed: %d",
 			 __func__, auth_rc);
 
-	udp_tx(fd, (struct sockaddr *) &cli->addr, cli->addr_len,
+	tcp_tx(fd, (struct sockaddr *) &cli->addr, cli->addr_len,
 		buf, buf_len);
 }
 
@@ -559,91 +552,88 @@ static void simple_sendresp(int sock_fd, const struct client *cli,
 		       info->op);
 }
 
-static void udp_srv_event(int fd, short events, void *userdata)
+static void cli_rd_pkt(void *userdata, bool success)
 {
-	struct client cli;
-	char host[64];
+	struct client *cli = userdata;
+	int fd = cli->fd;
 	ssize_t rrc, hdr_len;
-	struct msghdr hdr;
-	struct iovec iov[2];
-	char raw_pkt[CLD_RAW_MSG_SZ], ctl_msg[CLD_RAW_MSG_SZ];
 	struct cld_pkt_hdr pkt;
 	struct pkt_info info;
 	enum cle_err_codes err;
 
-	memset(&cli, 0, sizeof(cli));
-
-	iov[0].iov_base = raw_pkt;
-	iov[0].iov_len = sizeof(raw_pkt);
-
-	hdr.msg_name = &cli.addr;
-	hdr.msg_namelen = sizeof(cli.addr);
-	hdr.msg_iov = iov;
-	hdr.msg_iovlen = 1;
-	hdr.msg_control = ctl_msg;
-	hdr.msg_controllen = sizeof(ctl_msg);
-
-	rrc = recvmsg(fd, &hdr, 0);
-	if (rrc < 0) {
-		syslogerr("UDP recvmsg");
-		return;
-	}
-	cli.addr_len = hdr.msg_namelen;
-
-	/* pretty-print incoming cxn info */
-	getnameinfo((struct sockaddr *) &cli.addr, cli.addr_len,
-		    host, sizeof(host), NULL, 0, NI_NUMERICHOST);
-	host[sizeof(host) - 1] = 0;
+	rrc = cli->raw_size;
 
-	strcpy(cli.addr_host, host);
+	HAIL_DEBUG(&srv_log, "client %s message (%d bytes)",
+		   cli->addr_host, (int) rrc);
 
-	HAIL_DEBUG(&srv_log, "client %s message (%d bytes)", host, (int) rrc);
-
-	if (!parse_pkt_header(raw_pkt, rrc, &pkt, &hdr_len)) {
+	if (!parse_pkt_header(cli->raw_pkt, rrc, &pkt, &hdr_len)) {
 		cld_srv.stats.garbage++;
 		return;
 	}
 
-	if (!get_pkt_info(&pkt, raw_pkt, rrc, hdr_len, &info)) {
+	if (!get_pkt_info(&pkt, cli->raw_pkt, rrc, hdr_len, &info)) {
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
 		cld_srv.stats.garbage++;
 		return;
 	}
 
-	if (packet_is_dupe(&info)) {
-		/* silently drop dupes */
-		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
-		return;
-	}
-
-	err = validate_pkt_session(&info, &cli);
+	err = validate_pkt_session(&info, cli);
 	if (err) {
-		simple_sendresp(fd, &cli, &info, err);
+		simple_sendresp(fd, cli, &info, err);
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
 		return;
 	}
 
-	err = pkt_chk_sig(raw_pkt, rrc, &pkt);
+	err = pkt_chk_sig(cli->raw_pkt, rrc, &pkt);
 	if (err) {
-		simple_sendresp(fd, &cli, &info, err);
+		simple_sendresp(fd, cli, &info, err);
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
 		return;
 	}
 
 	if (!(cld_srv.cldb.is_master && cld_srv.cldb.up)) {
-		simple_sendmsg(fd, &cli, pkt.sid, pkt.user, 0xdeadbeef,
+		simple_sendmsg(fd, cli, pkt.sid, pkt.user, 0xdeadbeef,
 			       (xdrproc_t)xdr_void, NULL, CMO_NOT_MASTER);
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
 		return;
 	}
 
-	err = udp_rx(fd, &cli, &info, raw_pkt, rrc);
+	err = tcp_rx(fd, cli, &info, cli->raw_pkt, rrc);
 	if (err) {
-		simple_sendresp(fd, &cli, &info, err);
+		simple_sendresp(fd, cli, &info, err);
 		xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
 		return;
 	}
 	xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
+
+	atcp_read(&cli->rst, &cli->ubbp, sizeof(cli->ubbp),
+		  cli_rd_ubbp, cli);
+}
+
+static void cli_rd_ubbp(void *userdata, bool success)
+{
+	struct client *cli = userdata;
+	uint32_t sz;
+
+	if (!success)			/* atcp_read() could not queue */
+		goto err_out;
+#ifdef WORDS_BIGENDIAN
+	cli->ubbp.op_size = swab32(cli->ubbp.op_size);
+#endif
+	if (memcmp(cli->ubbp.magic, "CLD1", 4))
+		goto err_out;
+	if (UBBP_OP(cli->ubbp.op_size) != 1)
+		goto err_out;
+	sz = UBBP_SIZE(cli->ubbp.op_size);
+	if (sz == 0 || sz > CLD_RAW_MSG_SZ)	/* 0-byte read looks like EOF */
+		goto err_out;
+	/* header OK: read exactly sz payload bytes into raw_pkt */
+	cli->raw_size = sz;
+	atcp_read(&cli->rst, cli->raw_pkt, sz, cli_rd_pkt, cli);
+	return;
+
+err_out:
+	cli_free(cli);
+}
 
 static void add_chkpt_timer(void)
@@ -672,6 +662,170 @@ static void cldb_checkpoint(int fd, short events, void *userdata)
 	add_chkpt_timer();
 }
 
+/* Reset a read-state object: clear every field and start with an empty
+ * request queue.
+ */
+static void atcp_read_init(struct atcp_read_state *state)
+{
+	memset(state, 0, sizeof(struct atcp_read_state));
+	INIT_LIST_HEAD(&state->q);
+}
+
+/*
+ * Queue a request to fill @buf with exactly @buf_size bytes from the
+ * connection.  atcp_read_event() invokes @cb(@cb_data, true) once the
+ * buffer is full.  If the request cannot be allocated, @cb(@cb_data,
+ * false) is invoked immediately and false is returned.
+ */
+static bool atcp_read(struct atcp_read_state *rst,
+		      void *buf, unsigned int buf_size,
+		      void (*cb)(void *, bool), void *cb_data)
+{
+	struct atcp_read *req = calloc(1, sizeof(*req));
+
+	if (req == NULL) {
+		cb(cb_data, false);
+		return false;
+	}
+
+	req->buf = buf;
+	req->buf_size = buf_size;
+	req->bytes_wanted = buf_size;
+	req->cb = cb;
+	req->cb_data = cb_data;
+	INIT_LIST_HEAD(&req->node);
+	list_add_tail(&req->node, &rst->q);
+
+	return true;
+}
+
+/*
+ * Service a readable event on @fd for the reads queued in @rst.
+ *
+ * Requests are satisfied strictly in queue order: bytes are only ever
+ * read into the head request, and a short read simply waits for the
+ * next event.  When the head request is full it is unlinked and its
+ * callback runs; we then return at once, because the callback may queue
+ * more reads or free the object that owns @rst.  The event is
+ * registered without EV_ET (level-triggered), so any remaining data
+ * makes it fire again.
+ *
+ * Returns true while the connection is usable.  Returns false on a read
+ * error, on EOF, or when data arrived with no read queued (nobody will
+ * ever consume it).
+ */
+static bool atcp_read_event(struct atcp_read_state *rst, int fd)
+{
+	struct atcp_read *rd;
+	ssize_t rrc;
+
+	if (list_empty(&rst->q))
+		return false;
+
+	rd = list_entry(rst->q.next, struct atcp_read, node);
+
+	/* zero-length requests complete without touching the socket */
+	if (rd->bytes_wanted > 0) {
+		rrc = read(fd, (char *) rd->buf + rd->bytes_read,
+			   rd->bytes_wanted);
+		if (rrc < 0)
+			return (errno == EAGAIN || errno == EINTR);
+		if (rrc == 0)
+			return false;	/* peer closed the connection */
+
+		rd->bytes_read += rrc;
+		rd->bytes_wanted -= rrc;
+	}
+
+	if (rd->bytes_read == rd->buf_size) {
+		list_del_init(&rd->node);
+		rd->cb(rd->cb_data, true);	/* may free rst's owner */
+		free(rd);
+	}
+
+	return true;
+}
+
+/*
+ * Allocate a zeroed client with an empty read queue.  fd starts at -1
+ * so that cli_free() on a client that never received a socket does not
+ * close descriptor 0.  Returns NULL on allocation failure.
+ */
+static struct client *cli_alloc(void)
+{
+	struct client *cli;
+
+	cli = calloc(1, sizeof(*cli));
+	if (!cli)
+		return NULL;
+
+	cli->fd = -1;
+	cli->addr_len = sizeof(cli->addr);
+
+	atcp_read_init(&cli->rst);
+
+	return cli;
+}
+
+/*
+ * Tear down a client: remove its libevent registration, close the
+ * socket, and release any reads still queued on cli->rst (their
+ * callbacks are not invoked) before freeing the client.  NULL-safe.
+ */
+static void cli_free(struct client *cli)
+{
+	struct atcp_read *rd, *tmp;
+
+	if (!cli)
+		return;
+
+	if (cli->fd >= 0) {
+		event_del(&cli->ev);
+		close(cli->fd);
+		cli->fd = -1;
+	}
+
+	/* requests are heap-allocated by atcp_read(); don't leak them */
+	list_for_each_entry_safe(rd, tmp, &cli->rst.q, node) {
+		list_del_init(&rd->node);
+		free(rd);
+	}
+
+	free(cli);
+}
+
+/*
+ * libevent callback for a readable client socket.  Feeds pending data
+ * to the client's queued reads, and drops the client once
+ * atcp_read_event() reports that the connection is no longer usable.
+ */
+static void tcp_cli_event(int fd, short events, void *userdata)
+{
+	struct client *cli = userdata;
+
+	if (!atcp_read_event(&cli->rst, fd))
+		cli_free(cli);
+}
+
+/*
+ * libevent callback for a listening socket.  Accepts one connection,
+ * makes it non-blocking with TCP_NODELAY, and registers it for read
+ * events via tcp_cli_event().  It then queues the first UBBP header
+ * read.
+ *
+ * Running out of memory stops the server.  Any other per-connection
+ * failure just drops the new connection.
+ */
+static void tcp_srv_event(int fd, short events, void *userdata)
+{
+	struct server_socket *sock = userdata;
+	struct client *cli;
+	char host[64];
+	char port[16];
+	int on = 1;
+
+	cli = cli_alloc();
+	if (!cli) {
+		applog(LOG_ERR, "out of memory");
+		server_running = false;
+		event_loopbreak();
+		return;
+	}
+
+	/* receive TCP connection from kernel */
+	cli->fd = accept(sock->fd, (struct sockaddr *) &cli->addr,
+			 &cli->addr_len);
+	if (cli->fd < 0) {
+		syslogerr("tcp accept");
+		goto err_out;
+	}
+
+	/* mark non-blocking, for libevent-driven reads */
+	if (fsetflags("tcp client", cli->fd, O_NONBLOCK) < 0)
+		goto err_out;
+
+	/* disable delay of small output packets */
+	if (setsockopt(cli->fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) < 0)
+		applog(LOG_WARNING, "TCP_NODELAY failed: %s",
+		       strerror(errno));
+
+	event_set(&cli->ev, cli->fd, EV_READ | EV_PERSIST,
+		  tcp_cli_event, cli);
+
+	/* pretty-print incoming cxn info.  On failure getnameinfo() does
+	 * not guarantee the output buffers were written, so use
+	 * placeholders rather than logging/copying uninitialized memory.
+	 */
+	if (getnameinfo((struct sockaddr *) &cli->addr, cli->addr_len,
+			host, sizeof(host), port, sizeof(port),
+			NI_NUMERICHOST | NI_NUMERICSERV) != 0) {
+		strcpy(host, "?");
+		strcpy(port, "?");
+	}
+	host[sizeof(host) - 1] = 0;
+	port[sizeof(port) - 1] = 0;
+	applog(LOG_INFO, "client host %s port %s connected%s", host, port,
+		/* cli->ssl ? " via SSL" : */ "");
+
+	/* NOTE(review): assumes cli->addr_host / cli->addr_port are at
+	 * least as large as host[] / port[] — confirm in struct client.
+	 */
+	strcpy(cli->addr_host, host);
+	strcpy(cli->addr_port, port);
+
+	if (event_add(&cli->ev, NULL) < 0) {
+		applog(LOG_ERR, "unable to ready srv fd for polling");
+		goto err_out;
+	}
+	cli->ev_mask = EV_READ;
+
+	atcp_read(&cli->rst, &cli->ubbp, sizeof(cli->ubbp),
+		  cli_rd_ubbp, cli);
+
+	return;
+
+err_out:
+	cli_free(cli);
+}
+
 static int net_write_port(const char *port_file, const char *port_str)
 {
 	FILE *portf;
@@ -717,17 +871,23 @@ static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
 
 	fd = socket(addr_fam, sock_type, sock_prot);
 	if (fd < 0) {
-		syslogerr("udp socket");
+		syslogerr("tcp socket");
 		return -errno;
 	}
 
 	if (bind(fd, addr_ptr, addr_len) < 0) {
-		syslogerr("udp bind");
+		syslogerr("tcp bind");
+		close(fd);
+		return -errno;
+	}
+
+	if (listen(fd, 100) < 0) {
+		syslogerr("tcp listen");
 		close(fd);
 		return -errno;
 	}
 
-	rc = fsetflags("udp server", fd, O_NONBLOCK);
+	rc = fsetflags("tcp server", fd, O_NONBLOCK);
 	if (rc) {
 		close(fd);
 		return -errno;
@@ -743,7 +903,7 @@ static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
 	INIT_LIST_HEAD(&sock->sockets_node);
 
 	event_set(&sock->ev, fd, EV_READ | EV_PERSIST,
-		  udp_srv_event, sock);
+		  tcp_srv_event, sock);
 
 	if (event_add(&sock->ev, NULL) < 0) {
 		free(sock);
@@ -771,7 +931,7 @@ static int net_open_any(void)
 	memset(&addr6, 0, sizeof(addr6));
 	addr6.sin6_family = AF_INET6;
 	memcpy(&addr6.sin6_addr, &in6addr_any, sizeof(struct in6_addr));
-	fd6 = net_open_socket(AF_INET6, SOCK_DGRAM, 0, sizeof(addr6), &addr6);
+	fd6 = net_open_socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, sizeof(addr6), &addr6);
 
 	if (fd6 >= 0) {
 		addr_len = sizeof(addr6);
@@ -790,7 +950,7 @@ static int net_open_any(void)
 	/* If IPv6 worked, we must use the same port number for IPv4 */
 	if (port)
 		addr4.sin_port = port;
-	fd4 = net_open_socket(AF_INET, SOCK_DGRAM, 0, sizeof(addr4), &addr4);
+	fd4 = net_open_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP, sizeof(addr4), &addr4);
 
 	if (!port) {
 		if (fd4 < 0)
@@ -824,7 +984,7 @@ static int net_open_known(const char *portstr)
 
 	memset(&hints, 0, sizeof(hints));
 	hints.ai_family = PF_UNSPEC;
-	hints.ai_socktype = SOCK_DGRAM;
+	hints.ai_socktype = SOCK_STREAM;
 	hints.ai_flags = AI_PASSIVE;
 
 	rc = getaddrinfo(NULL, portstr, &hints, &res0);
diff --git a/cld/session.c b/cld/session.c
index 9887aaa..a38874a 100644
--- a/cld/session.c
+++ b/cld/session.c
@@ -550,7 +550,7 @@ static int sess_retry_output(struct session *sess, time_t *next_retry_out)
 				  			op->pkt_len));
 		}
 
-		rc = udp_tx(sess->sock_fd, (struct sockaddr *) &sess->addr,
+		rc = tcp_tx(sess->sock_fd, (struct sockaddr *) &sess->addr,
 			    sess->addr_len, op->pkt_data, op->pkt_len);
 		if (rc)
 			break;
@@ -715,7 +715,7 @@ bool sess_sendmsg(struct session *sess,
 	     tmp_list = g_list_next(tmp_list)) {
 		struct session_outpkt *op =
 			(struct session_outpkt *) tmp_list->data;
-		udp_tx(sess->sock_fd, (struct sockaddr *) &sess->addr,
+		tcp_tx(sess->sock_fd, (struct sockaddr *) &sess->addr,
 			sess->addr_len, op->pkt_data, op->pkt_len);
 	}
 
diff --git a/configure.ac b/configure.ac
index 9cfad23..d93a9a5 100644
--- a/configure.ac
+++ b/configure.ac
@@ -69,6 +69,7 @@ AC_CHECK_HEADERS(sys/sendfile.h sys/filio.h)
 AC_CHECK_HEADER(db.h,[],exit 1)
 
 dnl Checks for typedefs, structures, and compiler characteristics.
+AC_C_BIGENDIAN
 dnl AC_TYPE_SIZE_T
 dnl AC_TYPE_PID_T
 
diff --git a/include/Makefile.am b/include/Makefile.am
index 967352a..70f56a9 100644
--- a/include/Makefile.am
+++ b/include/Makefile.am
@@ -4,6 +4,6 @@ EXTRA_DIST =		\
 	elist.h chunk_msg.h chunksrv.h chunk-private.h objcache.h
 
 include_HEADERS =	\
-	cldc.h cld_common.h ncld.h chunkc.h chunk_msg.h		\
+	ubbp.h cldc.h cld_common.h ncld.h chunkc.h chunk_msg.h		\
 	hail_log.h hstor.h anet.h
 
diff --git a/include/cld_common.h b/include/cld_common.h
index 84b1ec6..efb5911 100644
--- a/include/cld_common.h
+++ b/include/cld_common.h
@@ -31,6 +31,10 @@
 
 #define CLD_ALIGN8(n) ((8 - ((n) & 7)) & 7)
 
+/* Largest packet payload (bytes) accepted in one UBBP-framed CLD
+ * message; both the server and cldc-tcp reject larger frames.
+ */
+enum {
+	CLD_RAW_MSG_SZ		= 4096,
+};
+
 struct cld_timer {
 	bool			fired;
 	bool			on_list;
diff --git a/include/cldc.h b/include/cldc.h
index c64eef9..f98d151 100644
--- a/include/cldc.h
+++ b/include/cldc.h
@@ -25,6 +25,7 @@
 #include <cld_msg_rpc.h>
 #include <cld_common.h>
 #include <hail_log.h>
+#include <ubbp.h>
 
 struct cldc_session;
 
@@ -142,13 +143,20 @@ struct cldc_host {
 	unsigned short	port;
 };
 
-/** A UDP implementation of the CLD client protocol */
-struct cldc_udp {
+/** A TCP implementation of the CLD client protocol */
+struct cldc_tcp {
 	uint8_t		addr[64];		/* server address */
 	size_t		addr_len;
 
 	int		fd;
 
+	struct ubbp_header ubbp;
+	unsigned int	ubbp_read;
+
+	char		raw_pkt[CLD_RAW_MSG_SZ];
+	unsigned int	raw_size;
+	unsigned int	raw_read;
+
 	struct cldc_session *sess;
 
 	int		(*cb)(struct cldc_session *, void *);
@@ -215,12 +223,12 @@ extern void cldc_copts_get_data(const struct cldc_call_opts *copts,
 extern void cldc_copts_get_metadata(const struct cldc_call_opts *copts,
 				    struct cldc_node_metadata *md);
 
-/* cldc-udp */
-extern void cldc_udp_free(struct cldc_udp *udp);
-extern int cldc_udp_new(const char *hostname, int port,
-		 struct cldc_udp **udp_out);
-extern int cldc_udp_receive_pkt(struct cldc_udp *udp);
-extern int cldc_udp_pkt_send(void *private,
+/* cldc-tcp */
+extern void cldc_tcp_free(struct cldc_tcp *tcp);
+extern int cldc_tcp_new(const char *hostname, int port,
+		 struct cldc_tcp **tcp_out);
+extern int cldc_tcp_receive_pkt_data(struct cldc_tcp *tcp);
+extern int cldc_tcp_pkt_send(void *private,
 			  const void *addr, size_t addrlen,
 			  const void *buf, size_t buflen);
 
diff --git a/include/ncld.h b/include/ncld.h
index aad88b7..21b6e36 100644
--- a/include/ncld.h
+++ b/include/ncld.h
@@ -40,8 +40,8 @@ struct ncld_sess {
 	int			errc;
 	GList			*handles;
 	int			to_thread[2];
-	struct cldc_udp		*udp;
-	struct cld_timer	udp_timer;
+	struct cldc_tcp		*tcp;
+	struct cld_timer	tcp_timer;
 	struct cld_timer_list	tlist;
 	void			(*event)(void *, unsigned int);
 	void			*event_arg;
diff --git a/include/ubbp.h b/include/ubbp.h
new file mode 100644
index 0000000..94dbbab
--- /dev/null
+++ b/include/ubbp.h
@@ -0,0 +1,52 @@
+#ifndef __UBBP_H__
+#define __UBBP_H__
+
+/* 
+
+   Universal Base Binary Protocol (UBBP) - a universal message header
+
+   Copyright 2010 Red Hat, Inc.
+
+   Permission is hereby granted, free of charge, to any person
+   obtaining a copy of this software and associated documentation
+   files (the "Software"), to deal in the Software without restriction,
+   including without limitation the rights to use, copy, modify, merge,
+   publish, distribute, sublicense, and/or sell copies of the Software,
+   and to permit persons to whom the Software is furnished to do so,
+   subject to the following conditions:
+
+   The above copyright notice and this permission notice shall be
+   included in all copies or substantial portions of the Software.
+
+   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
+   KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
+   WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+   NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+   BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
+   AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+   IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+   THE SOFTWARE.
+
+*/
+
+
+#include <stdint.h>
+
+/*
+ * Header that precedes every UBBP message on the wire.  It is followed
+ * by UBBP_SIZE(op_size) bytes of protocol-specific payload.
+ */
+struct ubbp_header {
+	/* magic number of your protocol */
+	unsigned char	magic[4];
+
+	/*
+	 * size: upper 24 bits (size = op_size >> 8)
+	 * op: lower 8 bits (op = op_size & 0xff)
+	 *
+	 * Byte order: little endian
+	 */
+	uint32_t	op_size;
+};
+
+/* Opcode (low 8 bits) of an op_size word already in host byte order. */
+#define UBBP_OP(op_size) ((op_size) & 0xff)
+/* Payload size (high 24 bits) of an op_size word in host byte order. */
+#define UBBP_SIZE(op_size) ((op_size) >> 8)
+
+#endif /* __UBBP_H__ */
+
diff --git a/lib/Makefile.am b/lib/Makefile.am
index 616b881..8be1835 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -23,7 +23,7 @@ lib_LTLIBRARIES		= libhail.la
 libhail_la_SOURCES	=	\
 	atcp.c			\
 	cldc.c			\
-	cldc-udp.c		\
+	cldc-tcp.c		\
 	cldc-dns.c		\
 	common.c		\
 	libtimer.c		\
diff --git a/lib/cldc-dns.c b/lib/cldc-dns.c
index 92f875d..e987c01 100644
--- a/lib/cldc-dns.c
+++ b/lib/cldc-dns.c
@@ -63,7 +63,7 @@ int cldc_saveaddr(struct cldc_host *hp,
 
 	memset(&hints, 0, sizeof(struct addrinfo));
 	hints.ai_family = PF_UNSPEC;
-	hints.ai_socktype = SOCK_DGRAM;
+	hints.ai_socktype = SOCK_STREAM;
 
 	rc = getaddrinfo(hostname, portstr, &hints, &res0);
 	if (rc) {
diff --git a/lib/cldc-tcp.c b/lib/cldc-tcp.c
new file mode 100644
index 0000000..63a753b
--- /dev/null
+++ b/lib/cldc-tcp.c
@@ -0,0 +1,185 @@
+
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING.  If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define _GNU_SOURCE
+#include "hail-config.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <errno.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <cldc.h>
+
+/* Close the server connection (if one is open) and release @tcp.
+ * Passing NULL is a no-op.
+ */
+void cldc_tcp_free(struct cldc_tcp *tcp)
+{
+	if (tcp) {
+		if (tcp->fd >= 0)
+			close(tcp->fd);
+		free(tcp);
+	}
+}
+
+/*
+ * Resolve @hostname:@port and open a TCP connection to the first
+ * address that accepts one.
+ *
+ * On success *@tcp_out receives a new cldc_tcp (release it with
+ * cldc_tcp_free()) holding the connected socket and peer address, and
+ * 0 is returned.  Returns -ENOENT if resolution or every connection
+ * attempt fails, and -ENOMEM on allocation failure.
+ */
+int cldc_tcp_new(const char *hostname, int port,
+		 struct cldc_tcp **tcp_out)
+{
+	struct cldc_tcp *tcp;
+	struct addrinfo hints, *res, *rp;
+	char port_s[32];
+	int rc, fd = -1, on = 1;
+
+	*tcp_out = NULL;
+
+	snprintf(port_s, sizeof(port_s), "%d", port);
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_protocol = IPPROTO_TCP;
+
+	rc = getaddrinfo(hostname, port_s, &hints, &res);
+	if (rc)
+		return -ENOENT;
+
+	for (rp = res; rp; rp = rp->ai_next) {
+		/* the peer address is copied into fixed-size tcp->addr */
+		if (rp->ai_addrlen > sizeof(tcp->addr))
+			continue;
+
+		fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+		if (fd < 0)
+			continue;
+
+		if (connect(fd, rp->ai_addr, rp->ai_addrlen) != -1)
+			break;	/* success */
+
+		close(fd);
+		fd = -1;
+	}
+
+	if (!rp) {
+		freeaddrinfo(res);
+		return -ENOENT;
+	}
+
+	/* Requests are small; don't let Nagle hold them back (the server
+	 * sets the same option).  Failure only costs latency.
+	 */
+	(void) setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
+
+	tcp = calloc(1, sizeof(*tcp));
+	if (!tcp) {
+		freeaddrinfo(res);
+		close(fd);
+		return -ENOMEM;
+	}
+
+	memcpy(tcp->addr, rp->ai_addr, rp->ai_addrlen);
+	tcp->addr_len = rp->ai_addrlen;
+
+	tcp->fd = fd;
+
+	freeaddrinfo(res);
+
+	*tcp_out = tcp;
+
+	return 0;
+}
+
+/*
+ * Read whatever is currently available of the next packet from the
+ * server.  Each packet is a UBBP header (magic "CLD1", op 2) followed by
+ * the payload, which is accumulated in the per-connection tcp->raw_pkt.
+ * Once the payload is complete, it is handed to cldc_receive_pkt() and
+ * the framing state is reset for the next packet.
+ *
+ * Returns 0 if more data is needed or the packet was processed without
+ * error, or the non-zero result of cldc_receive_pkt().  Otherwise it
+ * returns a negative errno:
+ *   -EPIPE    the server closed the connection
+ *   -EIO      malformed header
+ *   -ENXIO    no session is attached
+ *   other     a read(2) failure
+ */
+int cldc_tcp_receive_pkt_data(struct cldc_tcp *tcp)
+{
+	ssize_t rc;
+
+	if (tcp->ubbp_read < sizeof(tcp->ubbp)) {
+		rc = read(tcp->fd, (char *) &tcp->ubbp + tcp->ubbp_read,
+			  sizeof(tcp->ubbp) - tcp->ubbp_read);
+		if (rc < 0)
+			return (errno == EAGAIN || errno == EINTR) ? 0 : -errno;
+		if (rc == 0)
+			return -EPIPE;		/* server closed connection */
+
+		tcp->ubbp_read += rc;
+		if (tcp->ubbp_read < sizeof(tcp->ubbp))
+			return 0;		/* header still incomplete */
+
+#ifdef WORDS_BIGENDIAN
+		/* NOTE(review): converts only if swab32() swaps in place;
+		 * confirm its definition.
+		 */
+		swab32(tcp->ubbp.op_size);
+#endif
+		if (memcmp(tcp->ubbp.magic, "CLD1", 4))
+			return -EIO;
+		if (UBBP_OP(tcp->ubbp.op_size) != 2)
+			return -EIO;
+		tcp->raw_read = 0;
+		tcp->raw_size = UBBP_SIZE(tcp->ubbp.op_size);
+		if (tcp->raw_size > CLD_RAW_MSG_SZ)
+			return -EIO;
+	}
+
+	if (tcp->raw_read < tcp->raw_size) {
+		rc = read(tcp->fd, tcp->raw_pkt + tcp->raw_read,
+			  tcp->raw_size - tcp->raw_read);
+		if (rc < 0)
+			return (errno == EAGAIN || errno == EINTR) ? 0 : -errno;
+		if (rc == 0)
+			return -EPIPE;		/* server closed connection */
+
+		tcp->raw_read += rc;
+		if (tcp->raw_read < tcp->raw_size)
+			return 0;		/* payload still incomplete */
+	}
+
+	/* whole packet in hand: re-arm header parsing for the next one */
+	tcp->ubbp_read = 0;
+
+	if (!tcp->sess)
+		return -ENXIO;
+
+	return cldc_receive_pkt(tcp->sess, tcp->addr, tcp->addr_len,
+				tcp->raw_pkt, tcp->raw_size);
+}
+
+#ifndef MSG_NOSIGNAL
+#define MSG_NOSIGNAL 0	/* unavailable: the caller must handle SIGPIPE */
+#endif
+
+/*
+ * Send one packet to the server, framed by a UBBP header (magic "CLD1",
+ * op 1, payload size).  Header and payload are copied into one buffer so
+ * they are written together, and short writes are retried until
+ * everything has been sent.  @addr/@addrlen are ignored because the
+ * socket is already connected.
+ *
+ * Returns 0 on success.  Returns -EMSGSIZE if @buflen exceeds
+ * CLD_RAW_MSG_SZ (the server drops connections that send larger
+ * frames), or a negative errno from send(2).
+ */
+int cldc_tcp_pkt_send(void *private,
+			const void *addr, size_t addrlen,
+			const void *buf, size_t buflen)
+{
+	struct cldc_tcp *tcp = private;
+	struct ubbp_header ubbp;
+	char msg[sizeof(struct ubbp_header) + CLD_RAW_MSG_SZ];
+	size_t len, done = 0;
+	ssize_t rc;
+
+	if (buflen > CLD_RAW_MSG_SZ)
+		return -EMSGSIZE;
+
+	memcpy(ubbp.magic, "CLD1", 4);
+	ubbp.op_size = (buflen << 8) | 1;
+#ifdef WORDS_BIGENDIAN
+	swab32(ubbp.op_size);
+#endif
+
+	memcpy(msg, &ubbp, sizeof(ubbp));
+	if (buflen)
+		memcpy(msg + sizeof(ubbp), buf, buflen);
+	len = sizeof(ubbp) + buflen;
+
+	while (done < len) {
+		/* MSG_NOSIGNAL: a dead server yields -EPIPE, not SIGPIPE,
+		 * so the host process is not killed.
+		 */
+		rc = send(tcp->fd, msg + done, len - done, MSG_NOSIGNAL);
+		if (rc < 0) {
+			if (errno == EINTR)
+				continue;
+			return -errno;
+		}
+		done += rc;
+	}
+
+	return 0;
+}
+
diff --git a/lib/cldc-udp.c b/lib/cldc-udp.c
deleted file mode 100644
index 3042746..0000000
--- a/lib/cldc-udp.c
+++ /dev/null
@@ -1,141 +0,0 @@
-
-/*
- * Copyright 2009 Red Hat, Inc.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; see the file COPYING.  If not, write to
- * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
- *
- */
-
-#define _GNU_SOURCE
-#include "hail-config.h"
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <string.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <stdio.h>
-#include <errno.h>
-#include <netinet/in.h>
-#include <netdb.h>
-#include <cldc.h>
-
-void cldc_udp_free(struct cldc_udp *udp)
-{
-	if (!udp)
-		return;
-
-	if (udp->fd >= 0)
-		close(udp->fd);
-
-	free(udp);
-}
-
-int cldc_udp_new(const char *hostname, int port,
-		 struct cldc_udp **udp_out)
-{
-	struct cldc_udp *udp;
-	struct addrinfo hints, *res, *rp;
-	char port_s[32];
-	int rc, fd = -1;
-
-	*udp_out = NULL;
-
-	sprintf(port_s, "%d", port);
-
-	memset(&hints, 0, sizeof(hints));
-	hints.ai_family = AF_UNSPEC;
-	hints.ai_socktype = SOCK_DGRAM;
-	hints.ai_protocol = IPPROTO_UDP;
-
-	rc = getaddrinfo(hostname, port_s, &hints, &res);
-	if (rc)
-		return -ENOENT;
-
-	for (rp = res; rp; rp = rp->ai_next) {
-		fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
-		if (fd < 0)
-			continue;
-
-		if (connect(fd, rp->ai_addr, rp->ai_addrlen) != -1)
-			break;	/* success */
-
-		close(fd);
-		fd = -1;
-	}
-
-	if (!rp) {
-		freeaddrinfo(res);
-		return -ENOENT;
-	}
-
-	udp = calloc(1, sizeof(*udp));
-	if (!udp) {
-		freeaddrinfo(res);
-		close(fd);
-		return -ENOMEM;
-	}
-
-	memcpy(udp->addr, rp->ai_addr, rp->ai_addrlen);
-	udp->addr_len = rp->ai_addrlen;
-
-	udp->fd = fd;
-
-	freeaddrinfo(res);
-
-	*udp_out = udp;
-
-	return 0;
-}
-
-int cldc_udp_receive_pkt(struct cldc_udp *udp)
-{
-	char buf[2048];
-	ssize_t rc, crc;
-
-	rc = recv(udp->fd, buf, sizeof(buf), MSG_DONTWAIT);
-	if (rc < 0) {
-		if (errno != EAGAIN)
-			return -errno;
-	}
-	if (rc <= 0)
-		return 0;
-
-	if (!udp->sess)
-		return -ENXIO;
-
-	crc = cldc_receive_pkt(udp->sess, udp->addr, udp->addr_len, buf, rc);
-	if (crc)
-		return crc;
-
-	return 0;
-}
-
-int cldc_udp_pkt_send(void *private,
-			const void *addr, size_t addrlen,
-			const void *buf, size_t buflen)
-{
-	struct cldc_udp *udp = private;
-	ssize_t rc;
-
-	/* we are connected, so we ignore addr and addrlen args */
-	rc = send(udp->fd, buf, buflen, MSG_DONTWAIT);
-	if (rc < 0)
-		return -errno;
-	if (rc != buflen)
-		return -EILSEQ;
-
-	return 0;
-}
-
diff --git a/lib/cldc.c b/lib/cldc.c
index 5209ea2..f0c3b59 100644
--- a/lib/cldc.c
+++ b/lib/cldc.c
@@ -1375,13 +1375,13 @@ static int ncld_gethost(char **hostp, unsigned short *portp,
 	return 0;
 }
 
-static void ncld_udp_timer_event(struct cld_timer *timer)
+static void ncld_tcp_timer_event(struct cld_timer *timer)
 {
 	struct ncld_sess *nsess = timer->userdata;
-	struct cldc_udp *udp = nsess->udp;
+	struct cldc_tcp *tcp = nsess->tcp;
 
-	if (udp->cb)
-		udp->cb(udp->sess, udp->cb_private);
+	if (tcp->cb)
+		tcp->cb(tcp->sess, tcp->cb_private);
 }
 
 enum {
@@ -1447,7 +1447,7 @@ static gpointer ncld_sess_thr(gpointer data)
 		memset(pfd, 0, sizeof(pfd));
 		pfd[0].fd = nsess->to_thread[0];
 		pfd[0].events = POLLIN;
-		pfd[1].fd = nsess->udp->fd;
+		pfd[1].fd = nsess->tcp->fd;
 		pfd[1].events = POLLIN;
 
 		rc = poll(pfd, 2, tmo ? tmo*1000 : -1);
@@ -1464,7 +1464,7 @@ static gpointer ncld_sess_thr(gpointer data)
 					ncld_thread_command(nsess);
 				} else {
 					g_mutex_lock(nsess->mutex);
-					cldc_udp_receive_pkt(nsess->udp);
+					cldc_tcp_receive_pkt_data(nsess->tcp);
 					g_mutex_unlock(nsess->mutex);
 				}
 			}
@@ -1491,14 +1491,14 @@ static bool ncld_p_timer_ctl(void *priv, bool add,
 			     void *cb_priv, time_t secs)
 {
 	struct ncld_sess *nsess = priv;
-	struct cldc_udp *udp = nsess->udp;
+	struct cldc_tcp *tcp = nsess->tcp;
 
 	if (add) {
-		udp->cb = cb;
-		udp->cb_private = cb_priv;
-		cld_timer_add(&nsess->tlist, &nsess->udp_timer, time(NULL) + secs);
+		tcp->cb = cb;
+		tcp->cb_private = cb_priv;
+		cld_timer_add(&nsess->tlist, &nsess->tcp_timer, time(NULL) + secs);
 	} else {
-		cld_timer_del(&nsess->tlist, &nsess->udp_timer);
+		cld_timer_del(&nsess->tlist, &nsess->tcp_timer);
 	}
 	return true;
 }
@@ -1507,7 +1507,7 @@ static int ncld_p_pkt_send(void *priv, const void *addr, size_t addrlen,
 			       const void *buf, size_t buflen)
 {
 	struct ncld_sess *nsess = priv;
-	return cldc_udp_pkt_send(nsess->udp, addr, addrlen, buf, buflen);
+	return cldc_tcp_pkt_send(nsess->tcp, addr, addrlen, buf, buflen);
 }
 
 static void ncld_p_event(void *priv, struct cldc_session *csp,
@@ -1517,7 +1517,7 @@ static void ncld_p_event(void *priv, struct cldc_session *csp,
 	unsigned char cmd;
 
 	if (what == CE_SESS_FAILED) {
-		if (nsess->udp->sess != csp)
+		if (nsess->tcp->sess != csp)
 			abort();
 		if (!nsess->is_up)
 			return;
@@ -1542,7 +1542,7 @@ static void ncld_p_event(void *priv, struct cldc_session *csp,
 		 * Notice that we are already running on the context of the
 		 * thread that will deliver the event, so pipe really is not
 		 * needed: could as well set a flag and test it right after
-		 * the call to cldc_udp_receive_pkt(). But pipe also provides
+		 * the call to cldc_tcp_receive_pkt_data(). But pipe also provides
 		 * a queue of events, just in case. It's not like these events
 		 * are super-performance critical.
 		 */
@@ -1634,8 +1634,8 @@ struct ncld_sess *ncld_sess_open(const char *host, int port, int *error,
 	if (!nsess)
 		goto out_sesalloc;
 	memset(nsess, 0, sizeof(struct ncld_sess));
-	cld_timer_init(&nsess->udp_timer, "nsess-udp-timer",
-		       ncld_udp_timer_event, nsess);
+	cld_timer_init(&nsess->tcp_timer, "nsess-tcp-timer",
+		       ncld_tcp_timer_event, nsess);
 	nsess->mutex = g_mutex_new();
 	if (!nsess->mutex)
 		goto out_mutex;
@@ -1661,9 +1661,9 @@ struct ncld_sess *ncld_sess_open(const char *host, int port, int *error,
 		goto out_pipe_to;
 	}
 
-	if (cldc_udp_new(nsess->host, nsess->port, &nsess->udp)) {
+	if (cldc_tcp_new(nsess->host, nsess->port, &nsess->tcp)) {
 		err = 1023;
-		goto out_udp;
+		goto out_tcp;
 	}
 
 	nsess->thread = g_thread_create(ncld_sess_thr, nsess, TRUE, &gerr);
@@ -1677,9 +1677,9 @@ struct ncld_sess *ncld_sess_open(const char *host, int port, int *error,
 	copts.cb = ncld_new_sess;
 	copts.private = nsess;
 	if (cldc_new_sess_log(&ncld_ops, &copts,
-			      nsess->udp->addr, nsess->udp->addr_len,
+			      nsess->tcp->addr, nsess->tcp->addr_len,
 			      cld_user, cld_key, nsess, log,
-			      &nsess->udp->sess)) {
+			      &nsess->tcp->sess)) {
 		g_mutex_unlock(nsess->mutex);
 		err = 1024;
 		goto out_session;
@@ -1695,12 +1695,12 @@ struct ncld_sess *ncld_sess_open(const char *host, int port, int *error,
 	return nsess;
 
 out_start:
-	cldc_kill_sess(nsess->udp->sess);
+	cldc_kill_sess(nsess->tcp->sess);
 out_session:
 	ncld_thr_end(nsess);
 out_thread:
-	cldc_udp_free(nsess->udp);
-out_udp:
+	cldc_tcp_free(nsess->tcp);
+out_tcp:
 	close(nsess->to_thread[0]);
 	close(nsess->to_thread[1]);
 out_pipe_to:
@@ -1774,7 +1774,7 @@ struct ncld_fh *ncld_open(struct ncld_sess *nsess, const char *fname,
 	memset(&copts, 0, sizeof(copts));
 	copts.cb = ncld_open_cb;
 	copts.private = fh;
-	rc = cldc_open(nsess->udp->sess, &copts, fname, mode, events, &fh->fh);
+	rc = cldc_open(nsess->tcp->sess, &copts, fname, mode, events, &fh->fh);
 	if (rc) {
 		err = -rc;
 		g_mutex_unlock(nsess->mutex);
@@ -1850,7 +1850,7 @@ int ncld_del(struct ncld_sess *nsess, const char *fname)
 	memset(&copts, 0, sizeof(copts));
 	copts.cb = ncld_del_cb;
 	copts.private = &dpb;
-	rc = cldc_del(nsess->udp->sess, &copts, fname);
+	rc = cldc_del(nsess->tcp->sess, &copts, fname);
 	if (rc) {
 		g_mutex_unlock(nsess->mutex);
 		return -rc;
@@ -2266,9 +2266,9 @@ void ncld_sess_close(struct ncld_sess *nsess)
 		ncld_close(p);
 	g_list_free(nsess->handles);
 
-	cldc_kill_sess(nsess->udp->sess);
+	cldc_kill_sess(nsess->tcp->sess);
 	ncld_thr_end(nsess);
-	cldc_udp_free(nsess->udp);
+	cldc_tcp_free(nsess->tcp);
 	close(nsess->to_thread[0]);
 	close(nsess->to_thread[1]);
 	g_cond_free(nsess->cond);

^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [PATCH 3/3] CLD: enable replication on server and client
  2010-12-31 10:57 ` [PATCH 2/3] CLD: switch network proto from UDP to TCP Jeff Garzik
@ 2010-12-31 10:58   ` Jeff Garzik
  2011-01-02 23:32   ` [PATCH 2/3] CLD: switch network proto from UDP to TCP Pete Zaitcev
  1 sibling, 0 replies; 6+ messages in thread
From: Jeff Garzik @ 2010-12-31 10:58 UTC (permalink / raw)
  To: hail-devel


Enable master/slave replication in CLD, by db4's replication manager.

On the server side, you may input a list of peer servers via DNS SRV
records, or manually on the command line for testing.  Each peer
participates in replication elections, and functions as a slave until
the replication engine decides it is to be elected master.  Under the
hood, db4 does use PAXOS in versions >= 4.7.  All CLD network operations
are already wrapped inside db4 transactions, so this merely extends
current reliability.  The server requires a quorum of 5 peers by
default, but this is easily changeable.

For the most part, this piece of code has been around for a little
while.  It has now been updated for all recent hail changes.  The new
bits are client side work that enables replication/failover to work
smoothly and reliably.

On the client side, the main change is passing around a list of
cldc_host.  All users of the lib built this internally, and then hobbled
themselves by calling a singleton hostname/port interface.  This
limitation has been removed, properly exposing the entire list of CLD
servers to the CLD client API.

CLD clients will attempt TCP connections to the CLD servers in priority
order; if a connection fails, the client moves on to the next server.
The same happens if a server dies mid-session: the client moves back
into the seeking-a-master state.  CLD clients always seek the master for
all operations; no attempt is made to divide the load by sending
clients to slave servers.

This is all moving towards the goal of having clients connect to a cell
(aka a cluster), rather than a host.  CLD users will specify

	cld://mycluster.example.com/tabled/1234/ABCD

and the client lib will perform a DNS SRV lookup on
"mycluster.example.com".  The DNS SRV lookup returns one or more hosts
and priorities; each host is in turn looked up with getaddrinfo(3).

As an added reliability feature, stolen from many current P2P
applications, if you reach at least one CLD server, you can download a
host list from that server.  This makes sure you can bootstrap into the
cloud regardless of how poorly your systems are running :)  Bittorrent
users are already familiar with "peer exchange."

Unlike the previous two patches, this code is hot off the presses, and
probably still has bugs (hence the "NOT-signed-off-by" below).

NOT-Signed-off-by: Jeff Garzik <jgarzik@redhat.com>
---
 chunkd/cldu.c   |   13 +-
 cld/cld.h       |   22 +++
 cld/cldb.c      |   69 ++++++++++--
 cld/cldb.h      |    9 +
 cld/cldbadm.c   |    6 -
 cld/server.c    |  313 ++++++++++++++++++++++++++++++++++++++++++++++++++++----
 include/cldc.h  |    3 
 include/ncld.h  |    3 
 lib/cldc-tcp.c  |   21 +++
 lib/cldc.c      |   22 ---
 tools/cldcli.c  |    4 
 tools/cldfuse.c |    5 
 12 files changed, 423 insertions(+), 67 deletions(-)

diff --git a/chunkd/cldu.c b/chunkd/cldu.c
index 41f94b5..b17643e 100644
--- a/chunkd/cldu.c
+++ b/chunkd/cldu.c
@@ -47,6 +47,8 @@ struct cld_session {
 	int			actx;		/* Active host cldv[actx] */
 	struct cld_host		cldv[N_CLD];
 
+	GList			*host_list;
+
 	int			event_pipe[2];
 	struct event		ev;
 	struct event		ev_timer;
@@ -357,7 +359,7 @@ static int cldu_set_cldc(struct cld_session *cs, int newactive)
 		applog(LOG_INFO, "Selected CLD host %s port %u",
 		       hp->host, hp->port);
 
-	cs->nsess = ncld_sess_open(hp->host, hp->port, &error,
+	cs->nsess = ncld_sess_open(cs->host_list, &error,
 				   cldu_sess_event, cs, "tabled", "tabled",
 				   &cldu_hail_log);
 	if (cs->nsess == NULL) {
@@ -505,8 +507,7 @@ int cld_begin(const char *thishost, uint32_t nid, char *infopath,
 			goto err_addr;
 		}
 
-		/* copy host_list into cld_session host array,
-		 * taking ownership of alloc'd strings along the way
+		/* copy host_list into cld_session host array
 		 */
 		i = 0;
 		for (tmp = host_list; tmp; tmp = tmp->next) {
@@ -514,15 +515,13 @@ int cld_begin(const char *thishost, uint32_t nid, char *infopath,
 			if (i < N_CLD) {
 				memcpy(&cs->cldv[i].h, hp,
 				       sizeof(struct cldc_host));
+				cs->cldv[i].h.host = strdup(hp->host);
 				cs->cldv[i].known = 1;
 				i++;
-			} else {
-				free(hp->host);
 			}
-			free(hp);
 		}
 
-		g_list_free(host_list);
+		cs->host_list = host_list;
 	}
 
 	if (pipe(cs->event_pipe) < 0) {
diff --git a/cld/cld.h b/cld/cld.h
index b1f9bbf..c5c38b5 100644
--- a/cld/cld.h
+++ b/cld/cld.h
@@ -44,6 +44,15 @@ enum {
 	SFL_FOREGROUND		= (1 << 0),	/* run in foreground */
 };
 
+/* Database & replication state of this server (struct server.state_cldb). */
+enum st_cldb {
+	ST_CLDB_INIT,
+	ST_CLDB_OPEN,
+	ST_CLDB_ACTIVE,
+	ST_CLDB_MASTER,
+	ST_CLDB_SLAVE,
+	ST_CLDBNUM	/* presumably the number of states; keep last — confirm */
+};
+
 struct atcp_read {
 	void			*buf;
 	unsigned int		buf_size;
@@ -142,6 +151,19 @@ struct server {
 
 	struct event		chkpt_timer;	/* db4 checkpoint timer */
 
+	unsigned short		rep_port;	/* db4 replication port */
+
+	char			*myhost;
+	char			*force_myhost;
+	GList			*rep_remotes;
+
+	unsigned int		n_peers;	/* total peers in cell */
+
+	int			ev_pipe[2];	/* internal event pipe */
+	struct event		pipe_ev;
+
+	enum st_cldb		state_cldb;	/* db & replication state */
+
 	struct server_stats	stats;		/* global statistics */
 };
 
diff --git a/cld/cldb.c b/cld/cldb.c
index f8afb61..671a4c2 100644
--- a/cld/cldb.c
+++ b/cld/cldb.c
@@ -29,8 +29,6 @@
 #include <cld-private.h>
 #include "cld.h"
 
-static int cldb_up(struct cldb *cldb, unsigned int flags);
-
 /*
  * db4 page sizes for our various databases.  Filesystem block size
  * is recommended, so 4096 was chosen (default ext3 block size).
@@ -206,6 +204,30 @@ err_out:
 	return -EIO;
 }
 
+/*
+ * Register each struct db_remote in @remotes with the db4 replication
+ * manager.  *@nsites counts the sites added successfully.  On the first
+ * failure, the db4 error is reported through dbenv->err() and its code
+ * is returned.
+ */
+static int add_remote_sites(DB_ENV *dbenv, GList *remotes, int *nsites)
+{
+	GList *node;
+	int rc;
+
+	*nsites = 0;
+
+	for (node = remotes; node != NULL; node = node->next) {
+		struct db_remote *remote = node->data;
+
+		rc = dbenv->repmgr_add_remote_site(dbenv, remote->host,
+						   remote->port, NULL, 0);
+		if (rc != 0) {
+			dbenv->err(dbenv, rc,
+				   "dbenv->add.remote.site host %s port %u",
+				   remote->host, remote->port);
+			return rc;
+		}
+
+		*nsites += 1;
+	}
+
+	return 0;
+}
+
 static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
 {
 	struct cldb *cldb = dbenv->app_private;
@@ -233,12 +255,13 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
 
 int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
 	      unsigned int env_flags, const char *errpfx, bool do_syslog,
-	      unsigned int flags, void (*cb)(enum db_event))
+	      GList *remotes, char *rep_host, unsigned short rep_port,
+	      int n_peers, void (*cb)(enum db_event))
 {
-	int rc;
+	int rc, nsites = 0;
 	DB_ENV *dbenv;
 
-	cldb->is_master = true;
+	cldb->is_master = false;
 	cldb->home = db_home;
 	cldb->state_cb = cb;
 
@@ -285,25 +308,55 @@ int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
 		cldb->keyed = true;
 	}
 
+	rc = dbenv->repmgr_set_local_site(dbenv, rep_host, rep_port, 0);
+	if (rc) {
+		dbenv->err(dbenv, rc, "dbenv->set_local_site");
+		goto err_out;
+	}
+
 	rc = dbenv->set_event_notify(dbenv, db4_event);
 	if (rc) {
 		dbenv->err(dbenv, rc, "dbenv->set_event_notify");
 		goto err_out;
 	}
 
+	rc = dbenv->rep_set_priority(dbenv, 100);
+	if (rc) {
+		dbenv->err(dbenv, rc, "dbenv->rep_set_priority");
+		goto err_out;
+	}
+
+	rc = dbenv->rep_set_nsites(dbenv, n_peers);
+	if (rc) {
+		dbenv->err(dbenv, rc, "dbenv->rep_set_nsites");
+		goto err_out;
+	}
+
+	rc = dbenv->repmgr_set_ack_policy(dbenv, DB_REPMGR_ACKS_QUORUM);
+	if (rc) {
+		dbenv->err(dbenv, rc, "dbenv->rep_ack_policy");
+		goto err_out;
+	}
+
 	/* init DB transactional environment, stored in directory db_home */
 	env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
-	env_flags |= DB_INIT_TXN;
+	env_flags |= DB_INIT_TXN | DB_INIT_REP;
 	rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
 	if (rc) {
 		dbenv->err(dbenv, rc, "dbenv->open");
 		goto err_out;
 	}
 
-	rc = cldb_up(cldb, flags);
+	rc = add_remote_sites(dbenv, remotes, &nsites);
 	if (rc)
 		goto err_out;
 
+	rc = dbenv->repmgr_start(dbenv, 2, DB_REP_ELECTION);
+	if (rc) {
+		dbenv->err(dbenv, rc, "dbenv->repmgr_start");
+		goto err_out;
+	}
+
 	return 0;
 
 err_out:
@@ -314,7 +367,7 @@ err_out:
 /*
  * open databases
  */
-static int cldb_up(struct cldb *cldb, unsigned int flags)
+int cldb_up(struct cldb *cldb, unsigned int flags)
 {
 	DB_ENV *dbenv = cldb->env;
 	int rc;
diff --git a/cld/cldb.h b/cld/cldb.h
index 0d8e618..0fec486 100644
--- a/cld/cldb.h
+++ b/cld/cldb.h
@@ -104,6 +104,11 @@ enum db_event {
 	CLDB_EV_NONE, CLDB_EV_CLIENT, CLDB_EV_MASTER, CLDB_EV_ELECTED
 };
 
+struct db_remote {	/* one replication peer, passed to cldb_init */
+	char		*host;		/* peer hostname */
+	unsigned short	port;		/* peer replication port */
+};
+
 struct cldb {
 	bool		is_master;
 	bool		keyed;			/* using encryption? */
@@ -130,7 +135,9 @@ struct cldb {
 
 extern int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
 	      unsigned int env_flags, const char *errpfx, bool do_syslog,
-	      unsigned int flags, void (*cb)(enum db_event));
+	      GList *remotes, char *rep_host, unsigned short rep_port,
+	      int n_peers, void (*cb)(enum db_event));
+extern int cldb_up(struct cldb *cldb, unsigned int flags);
 extern void cldb_down(struct cldb *cldb);
 extern void cldb_fini(struct cldb *cldb);
 
diff --git a/cld/cldbadm.c b/cld/cldbadm.c
index 7176f3b..1c0f26a 100644
--- a/cld/cldbadm.c
+++ b/cld/cldbadm.c
@@ -78,7 +78,8 @@ int main(int argc, char *argv[])
 	}
 
 	if (cldb_init(&cld_adm.cldb, cld_adm.data_dir, NULL,
-		      DB_RECOVER, "cldbadm", false, 0, NULL))
+		      DB_RECOVER, "cldbadm", false,
+		      NULL, NULL, 0, 0, NULL))
 		goto err_dbopen;
 
 	switch (cld_adm.mode) {
@@ -143,7 +144,8 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state)
  */
 int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
 	      unsigned int env_flags, const char *errpfx, bool do_syslog,
-	      unsigned int flags, void (*cb)(enum db_event))
+	      GList *remotes, char *rep_host, unsigned short rep_port,
+	      int n_peers, void (*cb)(enum db_event))
 {
 
 	return 0;
diff --git a/cld/server.c b/cld/server.c
index 5a73e54..f248f92 100644
--- a/cld/server.c
+++ b/cld/server.c
@@ -45,6 +45,21 @@
 #define CLD_DEF_PIDFN	CLD_LOCAL_STATE_DIR "/run/cld.pid"
 #define CLD_DEF_DATADIR	CLD_LIBDIR "/cld/lib"
 
+enum {
+	CLD_DEF_REP_PORT	= 9081,		/* default --rep-port */
+
+	CLD_DEF_PEERS		= 5,		/* default --cell-size */
+	CLD_MIN_PEERS		= 3,		/* smallest --cell-size accepted */
+	CLD_MAX_PEERS		= 400,		/* arbitrary "sanity" limit (exclusive) */
+};
+
+enum int_event_cmd {	/* command bytes sent over cld_srv.ev_pipe */
+	IEC_NONE,		/* invalid / no command */
+	IEC_DUMP,		/* statistics dump */
+	IEC_NOW_MASTER,		/* replication state -> master */
+	IEC_NOW_SLAVE,		/* replication state -> slave */
+};
+
 const char *argp_program_version = PACKAGE_VERSION;
 
 static struct argp_option options[] = {
@@ -57,10 +72,18 @@ static struct argp_option options[] = {
 	  "Switch the log to standard error" },
 	{ "foreground", 'F', NULL, 0,
 	  "Run in foreground, do not fork" },
+	{ "myhost", 'm', "HOST", 0,
+	  "Force local hostname to HOST (def: autodetect)" },
 	{ "port", 'p', "PORT", 0,
 	  "Bind to TCP port PORT.  Default: " CLD_DEF_PORT },
 	{ "pid", 'P', "FILE", 0,
 	  "Write daemon process id to FILE.  Default: " CLD_DEF_PIDFN },
+	{ "rep-port", 'r', "PORT", 0,
+	  "bind replication engine to port PORT (def: 9081)" },
+	{ "remote", 'R', "HOST:PORT", 0,
+	  "Add a HOST:PORT pair to list of remote hosts.  Use this argument multiple times to build cell's peer list." },
+	{ "cell-size", 'S', "PEERS", 0,
+	  "Total number of PEERS in cell. (PEERS/2)+1 required for quorum.  Must be an odd number (def: 5)" },
 	{ "strict-free", 1001, NULL, 0,
 	  "For memory-checker runs.  When shutting down server, free local "
 	  "heap, rather than simply exit(2)ing and letting OS clean up." },
@@ -78,17 +101,25 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state);
 static const struct argp argp = { options, parse_opt, NULL, doc };
 
 static bool server_running = true;
-static bool dump_stats;
 static bool use_syslog = true;
 static bool strict_free = false;
 struct timeval current_time;
 
+static const char *state_name_cldb[ST_CLDBNUM] = {	/* by enum st_cldb */
+	"Init", "Open", "Active", "Master", "Slave"
+};
+
 struct server cld_srv = {
 	.data_dir		= CLD_DEF_DATADIR,
 	.pid_file		= CLD_DEF_PIDFN,
 	.port			= CLD_DEF_PORT,
+	.rep_port		= CLD_DEF_REP_PORT,
+	.n_peers		= CLD_DEF_PEERS,
+	.state_cldb		= ST_CLDB_INIT,
 };
 
+static void cldb_state_process(enum st_cldb new_state);
+static void stats_dump(void);
 static void ensure_root(void);
 static bool atcp_read(struct atcp_read_state *rst,
 		      void *buf, unsigned int buf_size,
@@ -117,6 +148,33 @@ static void applog(int prio, const char *fmt, ...)
 	va_end(ap);
 }
 
+/*
+ * Find out own hostname (strdup'd; exits on failure).  It is needed for:
+ *  - the db4 replication local site, unless --myhost overrides it
+ *  - finding the local domain and its SRV records
+ * Do this before our state machines start ticking, so we can quit with
+ * a meaningful message easily.
+ */
+static char *get_hostname(void)
+{
+	enum { hostsz = 64 };
+	char hostb[hostsz];
+	char *ret;
+
+	if (gethostname(hostb, hostsz-1) < 0) {
+		HAIL_ERR(&srv_log, "get_hostname: gethostname error (%d): %s",
+			 errno, strerror(errno));
+		exit(1);
+	}
+	hostb[hostsz-1] = 0;
+	if ((ret = strdup(hostb)) == NULL) {
+		HAIL_ERR(&srv_log, "get_hostname: no core (%ld)",
+			 (long)strlen(hostb));
+		exit(1);
+	}
+	return ret;
+}
+
 struct hail_log srv_log = {
 	.func = applog,
 };
@@ -863,6 +921,119 @@ static void net_close(void)
 	}
 }
 
+static void cldb_state_cb(enum db_event event)
+{
+	/* passed to cldb_init(); NOTE(review): touches state_cldb unlocked */
+	switch (event) {
+	case CLDB_EV_ELECTED:
+		/*
+		 * Safe to stop ignoring bogus client indication,
+		 * so unmute us by advancing the state.
+		 */
+		if (cld_srv.state_cldb == ST_CLDB_OPEN)
+			cld_srv.state_cldb = ST_CLDB_ACTIVE;
+		break;
+	case CLDB_EV_CLIENT:
+	case CLDB_EV_MASTER:
+		/*
+		 * This callback runs on the context of the replication
+		 * manager thread, and calling any of our functions thus
+		 * turns our program into a multi-threaded one. Instead
+		 * we write a command byte to ev_pipe and postpone the processing.
+		 */
+		if (cld_srv.state_cldb != ST_CLDB_INIT &&
+		    cld_srv.state_cldb != ST_CLDB_OPEN) {
+			unsigned char cmd;
+
+			if (event == CLDB_EV_MASTER)
+				cmd = IEC_NOW_MASTER;
+			else
+				cmd = IEC_NOW_SLAVE;
+
+			/* wake up main loop (handled by internal_event) */
+			write(cld_srv.ev_pipe[1], &cmd, 1);
+		}
+		break;
+	default:
+		HAIL_WARN(&srv_log, "API confusion with CLDB, event 0x%x",
+			  event);
+		cld_srv.state_cldb = ST_CLDB_OPEN;  /* wrong, stub for now */
+		break;
+	}
+}
+
+/*
+ * libevent callback on ev_pipe[0]: consume one command byte queued by
+ * stats_signal() or cldb_state_cb() and act on it from the main loop.
+ */
+static void internal_event(int fd, short events, void *userdata)
+{
+	enum st_cldb target;
+	unsigned char cmd;
+	ssize_t nread = read(cld_srv.ev_pipe[0], &cmd, 1);
+
+	if (nread < 0) {
+		HAIL_WARN(&srv_log, "pipe read error: %s", strerror(errno));
+		abort();
+	}
+	if (nread == 0) {
+		HAIL_WARN(&srv_log, "pipe short read");
+		abort();
+	}
+
+	switch (cmd) {
+	case IEC_DUMP:
+		stats_dump();
+		return;
+	case IEC_NOW_MASTER:
+		target = ST_CLDB_MASTER;
+		break;
+	case IEC_NOW_SLAVE:
+		target = ST_CLDB_SLAVE;
+		break;
+	default:
+		HAIL_WARN(&srv_log, "%s BUG: command 0x%x", __func__, cmd);
+		return;
+	}
+
+	/* repeated notification of the current role: nothing to do */
+	if (cld_srv.state_cldb == target)
+		return;
+
+	cldb_state_process(target);
+	cld_srv.state_cldb = target;
+
+	HAIL_DEBUG(&srv_log, "CLDB state > %s",
+		   state_name_cldb[cld_srv.state_cldb]);
+}
+
+/* On the first ACTIVE -> MASTER/SLAVE step, open DBs and load sessions. */
+static void cldb_state_process(enum st_cldb new_state)
+{
+	if ((new_state == ST_CLDB_MASTER || new_state == ST_CLDB_SLAVE) &&
+	    cld_srv.state_cldb == ST_CLDB_ACTIVE) {
+		/* this node now has a replication role: open the databases */
+		if (cldb_up(&cld_srv.cldb, DB_CREATE | DB_THREAD)) {
+			HAIL_ERR(&srv_log, "cldb_up failed");
+			return;
+		}
+
+		ensure_root();
+
+		if (sess_load(cld_srv.sessions) != 0) {
+			HAIL_ERR(&srv_log, "session load failed. "
+				 "FIXME: I want error handling");
+			return;
+		}
+
+		add_chkpt_timer();
+	} else {
+		if (srv_log.verbose)
+			HAIL_DEBUG(&srv_log, "unhandled state transition %d -> %d",
+				   cld_srv.state_cldb, new_state);
+	}
+}
+
 static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
 			   int addr_len, void *addr_ptr)
 {
@@ -1067,8 +1238,15 @@ static void term_signal(int signo)
 
 static void stats_signal(int signo)
 {
-	dump_stats = true;
-	event_loopbreak();
+	/*
+	 * Signal context: only queue a command byte for internal_event().
+	 * NOTE(review): main() installs this handler before creating ev_pipe.
+	 */
+	static const unsigned char cmd = IEC_DUMP;
+	int saved_errno = errno;	/* write(2) may clobber errno */
+
+	write(cld_srv.ev_pipe[1], &cmd, 1);
+	errno = saved_errno;
 }
 
 #define X(stat) \
@@ -1083,6 +1254,53 @@ static void stats_dump(void)
 
 #undef X
 
+/* Parse "HOST:PORT" and append it to cld_srv.rep_remotes; false if bad/OOM. */
+static bool add_remote(const char *arg)
+{
+	size_t i, arg_len = strlen(arg);
+	long port;
+	struct db_remote *rp;
+	char *s_port, *colon, *end;
+
+	if (!arg_len)
+		return false;
+
+	/* verify no whitespace in input (isspace needs an unsigned char) */
+	for (i = 0; i < arg_len; i++)
+		if (isspace((unsigned char) arg[i]))
+			return false;
+
+	/* find colon delimiter */
+	colon = strchr(arg, ':');
+	if (!colon || (colon == arg))
+		return false;
+	s_port = colon + 1;
+
+	/* parse replication port number; reject empty or trailing junk */
+	port = strtol(s_port, &end, 10);
+	if (end == s_port || *end != '\0' || port < 1 || port > 65535)
+		return false;
+
+	/* alloc and fill in remote-host record */
+	rp = malloc(sizeof(*rp));
+	if (!rp)
+		return false;
+	rp->port = port;
+	rp->host = strdup(arg);
+	if (!rp->host) {
+		free(rp);
+		return false;
+	}
+
+	/* truncate string down to simply hostname portion */
+	rp->host[colon - arg] = 0;
+
+	/* add remote host to global list */
+	cld_srv.rep_remotes = g_list_append(cld_srv.rep_remotes, rp);
+
+	return true;
+}
+
 static error_t parse_opt (int key, char *arg, struct argp_state *state)
 {
 	int v;
@@ -1108,6 +1326,15 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
 	case 'F':
 		cld_srv.flags |= SFL_FOREGROUND;
 		break;
+	case 'm':
+		if ((strlen(arg) > 3) && (strlen(arg) < 64) &&
+		    (strchr(arg, '.')))
+			cld_srv.force_myhost = arg;
+		else {
+			fprintf(stderr, "invalid myhost: '%s'\n", arg);
+			argp_usage(state);
+		}
+		break;
 	case 'p':
 		/*
 		 * We do not permit "0" as an argument in order to be safer
@@ -1126,6 +1353,31 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
 	case 'P':
 		cld_srv.pid_file = arg;
 		break;
+	case 'r':
+		if (atoi(arg) > 0 && atoi(arg) < 65536)
+			cld_srv.rep_port = atoi(arg);
+		else {
+			fprintf(stderr, "invalid rep-port: '%s'\n", arg);
+			argp_usage(state);
+		}
+		break;
+	case 'R':
+		if (!add_remote(arg)) {
+			fprintf(stderr, "invalid remote host:port: '%s'\n", arg);
+			argp_usage(state);
+		}
+		break;
+	case 'S': {
+		int n_peers = atoi(arg);
+		if ((n_peers >= CLD_MIN_PEERS) && (n_peers < CLD_MAX_PEERS) &&
+		    (n_peers & 0x01))
+			cld_srv.n_peers = atoi(arg);
+		else {
+			fprintf(stderr, "invalid peer count: '%s'\n", arg);
+			argp_usage(state);
+		}
+		break;
+		}
 
 	case 1001:			/* --strict-free */
 		strict_free = true;
@@ -1153,11 +1405,6 @@ static int main_loop(void)
 		event_dispatch();
 
 		gettimeofday(&current_time, NULL);
-
-		if (dump_stats) {
-			dump_stats = false;
-			stats_dump();
-		}
 	}
 
 	return 0;
@@ -1166,7 +1413,7 @@ static int main_loop(void)
 int main (int argc, char *argv[])
 {
 	error_t aprc;
-	int rc = 1;
+	int rc = 1, env_flags;
 
 	INIT_LIST_HEAD(&cld_srv.sockets);
 
@@ -1195,6 +1442,20 @@ int main (int argc, char *argv[])
 
 	cld_srv.evbase_main = event_init();
 
+	if (cld_srv.force_myhost)
+		cld_srv.myhost = strdup(cld_srv.force_myhost);
+	else
+		cld_srv.myhost = get_hostname();
+
+	if (srv_log.verbose)
+		HAIL_DEBUG(&srv_log, "our hostname: %s", cld_srv.myhost);
+
+	/* remotes file should list all in peer group, except for us */
+	if ((cld_srv.n_peers - 1) != g_list_length(cld_srv.rep_remotes)) {
+		HAIL_ERR(&srv_log, "n_peers does not match remotes file loaded");
+		goto err_out;
+	}
+
 	if (!(cld_srv.flags & SFL_FOREGROUND) && (daemon(1, !use_syslog) < 0)) {
 		syslogerr("daemon");
 		goto err_out;
@@ -1215,16 +1476,7 @@ int main (int argc, char *argv[])
 	signal(SIGTERM, term_signal);
 	signal(SIGUSR1, stats_signal);
 
-	if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL,
-		      DB_CREATE | DB_THREAD | DB_RECOVER,
-		      "cld", use_syslog,
-		      DB_CREATE | DB_THREAD, NULL))
-		exit(1);
-
-	ensure_root();
-
 	evtimer_set(&cld_srv.chkpt_timer, cldb_checkpoint, NULL);
-	add_chkpt_timer();
 
 	rc = 1;
 
@@ -1232,18 +1484,38 @@ int main (int argc, char *argv[])
 	if (!cld_srv.sessions)
 		goto err_out_pid;
 
-	if (sess_load(cld_srv.sessions) != 0)
+	if (pipe(cld_srv.ev_pipe) < 0) {
+		syslogerr("pipe");
 		goto err_out_pid;
+	}
 
 	/* set up server networking */
 	rc = net_open();
 	if (rc)
 		goto err_out_pid;
 
+	event_set(&cld_srv.pipe_ev, cld_srv.ev_pipe[0], EV_READ | EV_PERSIST,
+		  internal_event, NULL);
+	if (event_add(&cld_srv.pipe_ev, NULL) < 0)
+		goto err_out_pid;
+
+	env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+	if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL,
+		    env_flags, "cld", true,
+		    cld_srv.rep_remotes,
+		    cld_srv.myhost, cld_srv.rep_port,
+		    cld_srv.n_peers, cldb_state_cb)) {
+		HAIL_ERR(&srv_log, "Failed to open CLDB, limping");
+	} else
+		cld_srv.state_cldb = ST_CLDB_OPEN;
+
 	HAIL_INFO(&srv_log, "initialized: %s%s%s",
 		  srv_log.debug ? "debug" : "nodebug",
 		  srv_log.verbose ? ", verbose" : "",
 		  strict_free ? ", strict-free" : "");
+	HAIL_INFO(&srv_log, "replication: %s:%u",
+		cld_srv.myhost,
+		cld_srv.rep_port);
 
 	/*
 	 * execute main loop
@@ -1258,7 +1530,8 @@ int main (int argc, char *argv[])
 
 	if (cld_srv.cldb.up)
 		cldb_down(&cld_srv.cldb);
-	cldb_fini(&cld_srv.cldb);
+	if (cld_srv.state_cldb >= ST_CLDB_OPEN)
+		cldb_fini(&cld_srv.cldb);
 
 err_out_pid:
 	unlink(cld_srv.pid_file);
diff --git a/include/cldc.h b/include/cldc.h
index f98d151..c4f0a64 100644
--- a/include/cldc.h
+++ b/include/cldc.h
@@ -225,8 +225,7 @@ extern void cldc_copts_get_metadata(const struct cldc_call_opts *copts,
 
 /* cldc-tcp */
 extern void cldc_tcp_free(struct cldc_tcp *tcp);
-extern int cldc_tcp_new(const char *hostname, int port,
-		 struct cldc_tcp **tcp_out);
+extern int cldc_tcp_new(GList *host_list, struct cldc_tcp **tcp_out);
 extern int cldc_tcp_receive_pkt_data(struct cldc_tcp *tcp);
 extern int cldc_tcp_pkt_send(void *private,
 			  const void *addr, size_t addrlen,
diff --git a/include/ncld.h b/include/ncld.h
index 21b6e36..e46a4e7 100644
--- a/include/ncld.h
+++ b/include/ncld.h
@@ -30,7 +30,6 @@
 #include <cldc.h>
 
 struct ncld_sess {
-	char			*host;
 	unsigned short		port;
 	GMutex			*mutex;
 	GCond			*cond;
@@ -71,7 +70,7 @@ struct ncld_read {
 	int		errc;
 };
 
-extern struct ncld_sess *ncld_sess_open(const char *host, int port,
+extern struct ncld_sess *ncld_sess_open(GList *host_list,
 	int *error, void (*event)(void *, unsigned int), void *ev_arg,
 	const char *cld_user, const char *cld_key, struct hail_log *log);
 extern struct ncld_fh *ncld_open(struct ncld_sess *s, const char *fname,
diff --git a/lib/cldc-tcp.c b/lib/cldc-tcp.c
index 63a753b..96d78e2 100644
--- a/lib/cldc-tcp.c
+++ b/lib/cldc-tcp.c
@@ -42,7 +42,7 @@ void cldc_tcp_free(struct cldc_tcp *tcp)
 	free(tcp);
 }
 
-int cldc_tcp_new(const char *hostname, int port,
+static int cldc_tcp_new_cxn(const char *hostname, int port,
 		 struct cldc_tcp **tcp_out)
 {
 	struct cldc_tcp *tcp;
@@ -99,6 +99,25 @@ int cldc_tcp_new(const char *hostname, int port,
 	return 0;
 }
 
+/* Connect to the first reachable entry (struct cldc_host *) of host_list. */
+int cldc_tcp_new(GList *host_list, struct cldc_tcp **tcp_out)
+{
+	GList *node;
+	int rc = -ENOENT;	/* result when host_list is empty */
+
+	for (node = host_list; node != NULL; node = node->next) {
+		struct cldc_host *hp = node->data;
+
+		/* first successful connection wins */
+		rc = cldc_tcp_new_cxn(hp->host, hp->port, tcp_out);
+		if (rc == 0)
+			break;
+	}
+
+	/* on total failure, rc holds the last host's error */
+	return rc;
+}
+
 int cldc_tcp_receive_pkt_data(struct cldc_tcp *tcp)
 {
 	static char buf[CLD_RAW_MSG_SZ];	/* BUG: static buf */
diff --git a/lib/cldc.c b/lib/cldc.c
index f0c3b59..8d99bf2 100644
--- a/lib/cldc.c
+++ b/lib/cldc.c
@@ -1326,6 +1326,7 @@ char *cldc_dirent_name(struct cld_dirent_cur *dc)
 	return s;
 }
 
+#if 0
 /*
  * On error, return the code (not negated code like a kernel function would).
  */
@@ -1374,6 +1375,7 @@ static int ncld_gethost(char **hostp, unsigned short *portp,
 
 	return 0;
 }
+#endif /* 0 */
 
 static void ncld_tcp_timer_event(struct cld_timer *timer)
 {
@@ -1600,8 +1602,7 @@ static int ncld_wait_session(struct ncld_sess *nsess)
  * this function returns, with the session going down, for example.
  * This is kind of dirty, but oh well. Maybe we'll fix this later.
  *
- * @param host Host name (NULL if resolving SRV records)
- * @param port Port
+ * @param host_list Host list
  * @param error Buffer for the error code
  * @param ev_func Session event function (ok to be NULL)
  * @param ev_arg User-supplied argument to the session event function
@@ -1609,7 +1610,7 @@ static int ncld_wait_session(struct ncld_sess *nsess)
  * @param cld_key The user key to be used to authentication
  * @param log The application log descriptor (ok to be NULL)
  */
-struct ncld_sess *ncld_sess_open(const char *host, int port, int *error,
+struct ncld_sess *ncld_sess_open(GList *host_list, int *error,
 				 void (*ev_func)(void *, unsigned int),
 				 void *ev_arg,
 				 const char *cld_user, const char *cld_key,
@@ -1643,16 +1644,6 @@ struct ncld_sess *ncld_sess_open(const char *host, int port, int *error,
 	if (!nsess->cond)
 		goto out_cond;
 
-	if (!host) {
-		err = ncld_getsrv(&nsess->host, &nsess->port, log);
-		if (err)
-			goto out_srv;
-	} else {
-		err = ncld_gethost(&nsess->host, &nsess->port, host, port);
-		if (err)
-			goto out_srv;
-	}
-
 	nsess->event = ev_func;
 	nsess->event_arg = ev_arg;
 
@@ -1661,7 +1652,7 @@ struct ncld_sess *ncld_sess_open(const char *host, int port, int *error,
 		goto out_pipe_to;
 	}
 
-	if (cldc_tcp_new(nsess->host, nsess->port, &nsess->tcp)) {
+	if (cldc_tcp_new(host_list, &nsess->tcp)) {
 		err = 1023;
 		goto out_tcp;
 	}
@@ -1704,8 +1695,6 @@ out_tcp:
 	close(nsess->to_thread[0]);
 	close(nsess->to_thread[1]);
 out_pipe_to:
-	free(nsess->host);
-out_srv:
 	g_cond_free(nsess->cond);
 out_cond:
 	g_mutex_free(nsess->mutex);
@@ -2273,7 +2262,6 @@ void ncld_sess_close(struct ncld_sess *nsess)
 	close(nsess->to_thread[1]);
 	g_cond_free(nsess->cond);
 	g_mutex_free(nsess->mutex);
-	free(nsess->host);
 	free(nsess);
 }
 
diff --git a/tools/cldcli.c b/tools/cldcli.c
index d347bf3..5321986 100644
--- a/tools/cldcli.c
+++ b/tools/cldcli.c
@@ -682,7 +682,6 @@ static void prompt(void)
 int main (int argc, char *argv[])
 {
 	char linebuf[CLD_PATH_MAX + 1];
-	struct cldc_host *dr;
 	error_t aprc;
 	int error;
 
@@ -720,9 +719,8 @@ int main (int argc, char *argv[])
 
 	printf("Waiting for session startup...\n");
 	fflush(stdout);
-	dr = host_list->data;
 
-	nsess = ncld_sess_open(dr->host, dr->port, &error, sess_event, NULL,
+	nsess = ncld_sess_open(host_list, &error, sess_event, NULL,
 			     our_user, our_user, &cli_log);
 	if (!nsess) {
 		if (error < 1000) {
diff --git a/tools/cldfuse.c b/tools/cldfuse.c
index 2cba917..395f4a3 100644
--- a/tools/cldfuse.c
+++ b/tools/cldfuse.c
@@ -348,7 +348,6 @@ static int cldfuse_process_arg(void *data, const char *arg, int key,
 int main(int argc, char *argv[])
 {
 	struct fuse_args args = FUSE_ARGS_INIT(argc, argv);
-	struct cldc_host *dr;
 	int error;
 
 	if (fuse_opt_parse(&args, NULL, cldfuse_opts, cldfuse_process_arg)) {
@@ -372,9 +371,7 @@ int main(int argc, char *argv[])
 		}
 	}
 
-	dr = param.host_list->data;
-
-	sess = ncld_sess_open(dr->host, dr->port, &error, sess_event, NULL,
+	sess = ncld_sess_open(param.host_list, &error, sess_event, NULL,
 			     "cldfuse", "cldfuse", &cldfuse_log);
 	if (!sess) {
 		if (error < 1000) {

^ permalink raw reply related	[flat|nested] 6+ messages in thread

* Re: [PATCH 2/3] CLD: switch network proto from UDP to TCP
  2010-12-31 10:57 ` [PATCH 2/3] CLD: switch network proto from UDP to TCP Jeff Garzik
  2010-12-31 10:58   ` [PATCH 3/3] CLD: enable replication on server and client Jeff Garzik
@ 2011-01-02 23:32   ` Pete Zaitcev
  2011-01-03 10:43     ` Jim Meyering
  2011-01-03 18:00     ` Jeff Garzik
  1 sibling, 2 replies; 6+ messages in thread
From: Pete Zaitcev @ 2011-01-02 23:32 UTC (permalink / raw)
  To: Jeff Garzik; +Cc: hail-devel

On Fri, 31 Dec 2010 05:57:28 -0500
Jeff Garzik <jeff@garzik.org> wrote:

> +	struct cldc_tcp *tcp = private;
> +	ssize_t rc;
> +	struct ubbp_header ubbp;
> +
> +	memcpy(ubbp.magic, "CLD1", 4);
> +	ubbp.op_size = (buflen << 8) | 1;
> +#ifdef WORDS_BIGENDIAN
> +	swab32(ubbp.op_size);
> +#endif
> +
> +	rc = write(tcp->fd, &ubbp, sizeof(ubbp));

Why not this:

	unsigned int n;

	n = (buflen << 8) | 1;
	ubbp.op_size = GUINT32_TO_LE(n);

-- P

^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [PATCH 2/3] CLD: switch network proto from UDP to TCP
  2011-01-02 23:32   ` [PATCH 2/3] CLD: switch network proto from UDP to TCP Pete Zaitcev
@ 2011-01-03 10:43     ` Jim Meyering
  2011-01-03 18:00     ` Jeff Garzik
  1 sibling, 0 replies; 6+ messages in thread
From: Jim Meyering @ 2011-01-03 10:43 UTC (permalink / raw)
  To: Pete Zaitcev; +Cc: Jeff Garzik, hail-devel

Pete Zaitcev wrote:
> On Fri, 31 Dec 2010 05:57:28 -0500
> Jeff Garzik <jeff@garzik.org> wrote:
>
>> +	struct cldc_tcp *tcp = private;
>> +	ssize_t rc;
>> +	struct ubbp_header ubbp;
>> +
>> +	memcpy(ubbp.magic, "CLD1", 4);
>> +	ubbp.op_size = (buflen << 8) | 1;
>> +#ifdef WORDS_BIGENDIAN
>> +	swab32(ubbp.op_size);
>> +#endif
>> +
>> +	rc = write(tcp->fd, &ubbp, sizeof(ubbp));
>
> Why not this:
>
> 	unsigned int n;
>
> 	n = (buflen << 8) | 1;
> 	ubbp.op_size = GUINT32_TO_LE(n);

Nice.
Avoiding those pesky in-function #ifdefs makes the code more readable.

IMHO, this is kinder still on the eyes of reviewers, since the
types of "n" and "rc" stay even closer to each definition/first-use:

  	unsigned int n = (buflen << 8) | 1;
  	ubbp.op_size = GUINT32_TO_LE(n);

        ssize_t rc = write(tcp->fd, &ubbp, sizeof(ubbp));

But if your coding guidelines require the all-vars-decl'd-at-top style
(seriously anachronistic and more prone to merge conflicts), then I guess
that's not an option...  Though you may want to reconsider the policy,
if your goal is portability:

    For the record, compilers that reject decl-after-stmt are not
    relevant any more.  At least, there are so few that I've been
    able to keep that sort of construct in coreutils for the past several
    years.  I used to maintain a c99-to-c89 patch, but removed even that
    almost two years ago.

^ permalink raw reply	[flat|nested] 6+ messages in thread

* Re: [PATCH 2/3] CLD: switch network proto from UDP to TCP
  2011-01-02 23:32   ` [PATCH 2/3] CLD: switch network proto from UDP to TCP Pete Zaitcev
  2011-01-03 10:43     ` Jim Meyering
@ 2011-01-03 18:00     ` Jeff Garzik
  1 sibling, 0 replies; 6+ messages in thread
From: Jeff Garzik @ 2011-01-03 18:00 UTC (permalink / raw)
  To: Pete Zaitcev; +Cc: hail-devel

On 01/02/2011 06:32 PM, Pete Zaitcev wrote:
> On Fri, 31 Dec 2010 05:57:28 -0500
> Jeff Garzik<jeff@garzik.org>  wrote:
>
>> +	struct cldc_tcp *tcp = private;
>> +	ssize_t rc;
>> +	struct ubbp_header ubbp;
>> +
>> +	memcpy(ubbp.magic, "CLD1", 4);
>> +	ubbp.op_size = (buflen<<  8) | 1;
>> +#ifdef WORDS_BIGENDIAN
>> +	swab32(ubbp.op_size);
>> +#endif
>> +
>> +	rc = write(tcp->fd,&ubbp, sizeof(ubbp));
>
> Why not this:
>
> 	unsigned int n;
>
> 	n = (buflen<<  8) | 1;
> 	ubbp.op_size = GUINT32_TO_LE(n);

Yep.

I used the #ifdef on the read(2) side, where I did not want to create an 
additional var...  then I copied that onto the write(2) side, where it 
is less efficient as you point out.

	Jeff



^ permalink raw reply	[flat|nested] 6+ messages in thread

end of thread, other threads:[~2011-01-03 18:00 UTC | newest]

Thread overview: 6+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2010-12-31 10:56 [PATCH 1/3] CLD: convert back to libevent Jeff Garzik
2010-12-31 10:57 ` [PATCH 2/3] CLD: switch network proto from UDP to TCP Jeff Garzik
2010-12-31 10:58   ` [PATCH 3/3] CLD: enable replication on server and client Jeff Garzik
2011-01-02 23:32   ` [PATCH 2/3] CLD: switch network proto from UDP to TCP Pete Zaitcev
2011-01-03 10:43     ` Jim Meyering
2011-01-03 18:00     ` Jeff Garzik

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.