* [PATCH v1] CLD replication (WIP)
@ 2009-07-31 10:40 Jeff Garzik
2009-07-31 18:14 ` Sage Weil
0 siblings, 1 reply; 7+ messages in thread
From: Jeff Garzik @ 2009-07-31 10:40 UTC (permalink / raw)
To: hail-devel
Below is the current CLD replication patch, which takes CLD from being
a single-node service to a fully replicated, highly available service.
The server implementation should be complete.
The current merge blocker is missing code in libcldc, which does not
yet properly "hunt" for a master among a group of peer CLD replicas
in a CLD cell.
Once merged, this will be a big milestone for CLD. The next milestone
will be adding the needed strict-cache-coherence caching semantics
to the CLD server and client library.
This patch was generated against git commit
511b8dafb233ee85e60ddf7eda212f87963e150c.
---
server/cld.h | 20 +++
server/cldb.c | 69 +++++++++++--
server/cldb.h | 9 +
server/cldbadm.c | 8 -
server/server.c | 286 +++++++++++++++++++++++++++++++++++++++++++++++++++---
test/pid-exists | 13 +-
test/prep-db | 19 ++-
test/start-daemon | 26 ++++
test/stop-daemon | 32 ++++--
9 files changed, 428 insertions(+), 54 deletions(-)
diff --git a/server/cld.h b/server/cld.h
index 21f103d..08e6b12 100644
--- a/server/cld.h
+++ b/server/cld.h
@@ -91,6 +91,15 @@ struct msg_params {
size_t msg_len;
};
+/*
+ * Lifecycle of the local CLD database, as driven by server.c
+ * (printable names live in state_name_cldb[]).
+ */
+enum st_cldb {
+	ST_CLDB_INIT	= 0,	/* initial; also kept if cldb_init() fails */
+	ST_CLDB_OPEN,		/* cldb_init() succeeded, replication started */
+	ST_CLDB_ACTIVE,		/* CLDB_EV_ELECTED seen while OPEN */
+	ST_CLDB_MASTER,		/* CLDB_EV_MASTER: this node is master */
+	ST_CLDB_SLAVE,		/* CLDB_EV_CLIENT: this node is a client */
+	ST_CLDBNUM		/* number of states; must stay last */
+};
+
struct server_stats {
unsigned long poll; /* number polls */
unsigned long event; /* events dispatched */
@@ -114,6 +123,17 @@ struct server {
int pid_fd;
char *port; /* bind port */
+ unsigned short rep_port; /* db4 replication port */
+
+ char *myhost;
+ char *force_myhost;
+ GList *rep_remotes;
+
+ unsigned int n_peers; /* total peers in cell */
+
+ int rep_pipe[2];
+
+ enum st_cldb state_cldb, state_cldb_new;
struct cldb cldb; /* database info */
diff --git a/server/cldb.c b/server/cldb.c
index 3e7c95c..254decd 100644
--- a/server/cldb.c
+++ b/server/cldb.c
@@ -25,8 +25,6 @@
#include <glib.h>
#include "cld.h"
-static int cldb_up(struct cldb *cldb, unsigned int flags);
-
/*
* db4 page sizes for our various databases. Filesystem block size
* is recommended, so 4096 was chosen (default ext3 block size).
@@ -202,6 +200,30 @@ err_out:
return -EIO;
}
+/*
+ * Register every struct db_remote found in @remotes with the db4
+ * replication manager of @dbenv.
+ *
+ * Returns 0 with *nsites set to the number of sites registered, or the
+ * first non-zero db4 error code, in which case *nsites counts only the
+ * sites registered before the failure.
+ */
+static int add_remote_sites(DB_ENV *dbenv, GList *remotes, int *nsites)
+{
+	GList *node = remotes;
+
+	*nsites = 0;
+	while (node != NULL) {
+		struct db_remote *site = node->data;
+		int err = dbenv->repmgr_add_remote_site(dbenv, site->host,
+							 site->port, NULL, 0);
+
+		if (err != 0) {
+			dbenv->err(dbenv, err,
+				   "dbenv->add.remote.site host %s port %u",
+				   site->host, site->port);
+			return err;
+		}
+
+		++*nsites;
+		node = node->next;
+	}
+
+	return 0;
+}
+
static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
{
struct cldb *cldb = dbenv->app_private;
@@ -229,12 +251,13 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
unsigned int env_flags, const char *errpfx, bool do_syslog,
- unsigned int flags, void (*cb)(enum db_event))
+ GList *remotes, char *rep_host, unsigned short rep_port,
+ int n_peers, void (*cb)(enum db_event))
{
- int rc;
+ int rc, nsites = 0;
DB_ENV *dbenv;
- cldb->is_master = true;
+ cldb->is_master = false;
cldb->home = db_home;
cldb->state_cb = cb;
@@ -281,25 +304,55 @@ int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
cldb->keyed = true;
}
+ rc = dbenv->repmgr_set_local_site(dbenv, rep_host, rep_port, 0);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->set_local_site");
+ goto err_out;
+ }
+
rc = dbenv->set_event_notify(dbenv, db4_event);
if (rc) {
dbenv->err(dbenv, rc, "dbenv->set_event_notify");
goto err_out;
}
+ rc = dbenv->rep_set_priority(dbenv, 100);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->rep_set_priority");
+ goto err_out;
+ }
+
+ rc = dbenv->rep_set_nsites(dbenv, n_peers);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->rep_set_nsites");
+ goto err_out;
+ }
+
+ rc = dbenv->repmgr_set_ack_policy(dbenv, DB_REPMGR_ACKS_QUORUM);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->rep_ack_policy");
+ goto err_out;
+ }
+
/* init DB transactional environment, stored in directory db_home */
env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
- env_flags |= DB_INIT_TXN;
+ env_flags |= DB_INIT_TXN | DB_INIT_REP;
rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
if (rc) {
dbenv->err(dbenv, rc, "dbenv->open");
goto err_out;
}
- rc = cldb_up(cldb, flags);
+ rc = add_remote_sites(dbenv, remotes, &nsites);
if (rc)
goto err_out;
+ rc = dbenv->repmgr_start(dbenv, 2, DB_REP_ELECTION);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->repmgr_start");
+ goto err_out;
+ }
+
return 0;
err_out:
@@ -310,7 +363,7 @@ err_out:
/*
* open databases
*/
-static int cldb_up(struct cldb *cldb, unsigned int flags)
+int cldb_up(struct cldb *cldb, unsigned int flags)
{
DB_ENV *dbenv = cldb->env;
int rc;
diff --git a/server/cldb.h b/server/cldb.h
index d28f732..f8f26db 100644
--- a/server/cldb.h
+++ b/server/cldb.h
@@ -107,6 +107,11 @@ enum db_event {
CLDB_EV_NONE, CLDB_EV_CLIENT, CLDB_EV_MASTER, CLDB_EV_ELECTED
};
+/*
+ * One remote replication peer.  server.c's add_remote() builds these
+ * from --remote HOST:PORT arguments; cldb_init() hands each one to
+ * dbenv->repmgr_add_remote_site().
+ */
+struct db_remote { /* remotes for cldb_init */
+ char *host; /* peer hostname, strdup()'d by add_remote() */
+ unsigned short port; /* peer's db4 replication port */
+};
+
struct cldb {
bool is_master;
bool keyed; /* using encryption? */
@@ -133,7 +138,9 @@ struct cldb {
extern int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
unsigned int env_flags, const char *errpfx, bool do_syslog,
- unsigned int flags, void (*cb)(enum db_event));
+ GList *remotes, char *rep_host, unsigned short rep_port,
+ int n_peers, void (*cb)(enum db_event));
+extern int cldb_up(struct cldb *cldb, unsigned int flags);
extern void cldb_down(struct cldb *cldb);
extern void cldb_fini(struct cldb *cldb);
diff --git a/server/cldbadm.c b/server/cldbadm.c
index 37e8e36..9342f66 100644
--- a/server/cldbadm.c
+++ b/server/cldbadm.c
@@ -78,7 +78,8 @@ int main(int argc, char *argv[])
}
if (cldb_init(&cld_adm.cldb, cld_adm.data_dir, NULL,
- DB_RECOVER, "cldbadm", false, 0, NULL))
+ DB_RECOVER, "cldbadm", false,
+ NULL, NULL, 0, 0, NULL))
goto err_dbopen;
switch (cld_adm.mode) {
@@ -142,8 +143,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state)
* Stubs for contents of cldb.c
*/
int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
- unsigned int env_flags, const char *errpfx, bool do_syslog,
- unsigned int flags, void (*cb)(enum db_event))
+ unsigned int env_flags, const char *errpfx, bool do_syslog,
+ GList *remotes, char *rep_host, unsigned short rep_port,
+ int n_peers, void (*cb)(enum db_event))
{
return 0;
diff --git a/server/server.c b/server/server.c
index 02e6231..fb51a42 100644
--- a/server/server.c
+++ b/server/server.c
@@ -29,6 +29,7 @@
#include <errno.h>
#include <syslog.h>
#include <locale.h>
+#include <ctype.h>
#include <argp.h>
#include <netdb.h>
#include <signal.h>
@@ -46,6 +47,12 @@ const char *argp_program_version = PACKAGE_VERSION;
enum {
CLD_RAW_MSG_SZ = 4096,
+
+	/* default for --rep-port; passed to cldb_init() as the local site port */
+ CLD_DEF_REP_PORT = 9081,
+
+	/*
+	 * --cell-size default and bounds.  parse_opt() treats
+	 * CLD_MAX_PEERS as an exclusive limit and also requires an odd count.
+	 */
+ CLD_DEF_PEERS = 5,
+ CLD_MIN_PEERS = 3,
+ CLD_MAX_PEERS = 400, /* arbitrary "sanity" limit */
};
static struct argp_option options[] = {
@@ -58,10 +65,18 @@ static struct argp_option options[] = {
"Switch the log to standard error" },
{ "foreground", 'F', NULL, 0,
"Run in foreground, do not fork" },
+ { "myhost", 'm', "HOST", 0,
+ "Force local hostname to HOST (def: autodetect)" },
{ "port", 'p', "PORT", 0,
"bind to UDP port PORT. Default: " CLD_DEF_PORT },
{ "pid", 'P', "FILE", 0,
"Write daemon process id to FILE. Default: " CLD_DEF_PIDFN },
+ { "rep-port", 'r', "PORT", 0,
+ "bind replication engine to port PORT (def: 9081)" },
+ { "remote", 'R', "HOST:PORT", 0,
+ "Add a HOST:PORT pair to list of remote hosts. Use this argument multiple times to build cell's peer list." },
+ { "cell-size", 'S', "PEERS", 0,
+ "Total number of PEERS in cell. (PEERS/2)+1 required for quorum. Must be an odd number (def: 5)" },
{ }
};
@@ -79,10 +94,15 @@ static bool use_syslog = true;
int debugging = 0;
struct timeval current_time;
+/* printable names for enum st_cldb, indexed by state; used in log output */
+static const char *state_name_cldb[ST_CLDBNUM] = {
+	[ST_CLDB_INIT]		= "Init",
+	[ST_CLDB_OPEN]		= "Open",
+	[ST_CLDB_ACTIVE]	= "Active",
+	[ST_CLDB_MASTER]	= "Master",
+	[ST_CLDB_SLAVE]		= "Slave",
+};
+/* global server state; path defaults use CLD_DEF_* so they match --help */
struct server cld_srv = {
.data_dir = CLD_DEF_DATADIR,
.pid_file = CLD_DEF_PIDFN,
.port = CLD_DEF_PORT,
+	.rep_port		= CLD_DEF_REP_PORT,
+	.n_peers		= CLD_DEF_PEERS,
};
static void ensure_root(void);
@@ -108,6 +128,33 @@ void cldlog(int prio, const char *fmt, ...)
va_end(ap);
}
+/*
+ * Find out own hostname.
+ * Unless --myhost overrides it, the result becomes cld_srv.myhost, which
+ * main() passes to cldb_init() as the host of our local replication
+ * site.  Do this before our state machines start ticking, so we can
+ * quit with a meaningful message easily.
+ *
+ * Returns a malloc'd string; exits the process on any failure.
+ */
+static char *get_hostname(void)
+{
+	/*
+	 * Large enough for Linux's 64-byte HOST_NAME_MAX and the 255-byte
+	 * SUSv2 limit.  A 64-byte buffer made gethostname() fail with
+	 * ENAMETOOLONG for 63- and 64-character Linux hostnames.
+	 */
+	enum { hostsz = 256 };
+	char hostb[hostsz];
+	char *ret;
+
+	if (gethostname(hostb, hostsz-1) < 0) {
+		cldlog(LOG_ERR, "get_hostname: gethostname error (%d): %s",
+		       errno, strerror(errno));
+		exit(1);
+	}
+	/* POSIX leaves termination unspecified on truncation */
+	hostb[hostsz-1] = 0;
+	if ((ret = strdup(hostb)) == NULL) {
+		cldlog(LOG_ERR, "get_hostname: no core (%ld)",
+		       (long)strlen(hostb));
+		exit(1);
+	}
+	return ret;
+}
+
int udp_tx(struct server_socket *sock, struct sockaddr *addr,
socklen_t addr_len, const void *data, size_t data_len)
{
@@ -484,6 +531,55 @@ static void cldb_checkpoint(struct timer *timer)
add_chkpt_timer();
}
+/*
+ * Replication event callback handed to cldb_init().
+ *
+ * This callback runs on the context of the replication manager thread,
+ * so role changes are not processed here.  It only records the
+ * requested state in state_cldb_new and writes a byte to rep_pipe.  The
+ * main loop wakes up from poll(), notices state_cldb_new != state_cldb
+ * and calls cldb_state_process().
+ *
+ * NOTE(review): state_cldb/state_cldb_new are shared with the main
+ * thread without locking or atomics.  The CLDB_EV_ELECTED and default
+ * branches also write state_cldb directly from this thread.  In
+ * addition, main() sets ST_CLDB_OPEN only after cldb_init() returns,
+ * but repmgr_start() has already run inside cldb_init().  A
+ * CLDB_EV_ELECTED delivered before that point is therefore ignored.
+ * TODO: revisit.
+ */
+static void cldb_state_cb(enum db_event event)
+{
+	switch (event) {
+	case CLDB_EV_ELECTED:
+		/*
+		 * Safe to stop ignoring bogus client indication,
+		 * so unmute us by advancing the state.
+		 */
+		if (cld_srv.state_cldb == ST_CLDB_OPEN)
+			cld_srv.state_cldb = ST_CLDB_ACTIVE;
+		break;
+	case CLDB_EV_CLIENT:
+	case CLDB_EV_MASTER:
+		if (cld_srv.state_cldb != ST_CLDB_INIT &&
+		    cld_srv.state_cldb != ST_CLDB_OPEN) {
+			char c = 0x42;
+
+			if (event == CLDB_EV_MASTER)
+				cld_srv.state_cldb_new = ST_CLDB_MASTER;
+			else
+				cld_srv.state_cldb_new = ST_CLDB_SLAVE;
+			if (debugging) {
+				cldlog(LOG_DEBUG, "CLDB state > %s",
+				       state_name_cldb[cld_srv.state_cldb_new]);
+			}
+
+			/*
+			 * Wake up the main loop through the self-pipe.  If
+			 * this fails, the transition is only noticed on the
+			 * next unrelated wakeup, so at least report it.
+			 */
+			if (write(cld_srv.rep_pipe[1], &c, 1) != 1)
+				cldlog(LOG_WARNING,
+				       "CLDB: main loop wakeup failed: %s",
+				       strerror(errno));
+		}
+		break;
+	default:
+		cldlog(LOG_WARNING, "API confusion with CLDB, event 0x%x", event);
+		cld_srv.state_cldb = ST_CLDB_OPEN;	/* wrong, stub for now */
+		cld_srv.state_cldb_new = ST_CLDB_INIT;
+		break;
+	}
+}
+
+/*
+ * Poll callback for the read end of rep_pipe.  cldb_state_cb() writes
+ * one byte per role change.  The wakeup itself is all the main loop
+ * needs, because it compares state_cldb_new with state_cldb after each
+ * loop iteration.
+ *
+ * The wakeup bytes must be consumed here.  poll() keeps reporting
+ * POLLIN while data remains, which would spin the main loop.  A pipe
+ * that is never drained also eventually fills up and blocks the
+ * replication thread's write().  Only one read() is issued, so it
+ * cannot block once POLLIN is reported; any leftover bytes simply
+ * cause another wakeup.
+ * NOTE(review): assumes the main loop passes the fd's revents as
+ * @events and only calls this when revents is non-zero.
+ */
+static bool noop_event(int fd, short events, void *userdata)
+{
+	char buf[64];
+
+	if ((events & POLLIN) &&
+	    read(fd, buf, sizeof(buf)) < 0 &&
+	    errno != EINTR && errno != EAGAIN)
+		cldlog(LOG_WARNING, "rep_pipe read failed: %s",
+		       strerror(errno));
+
+	return true;	/* continue main loop; do NOT terminate server */
+}
+
static int net_open(void)
{
int ipv6_found;
@@ -575,6 +671,32 @@ err_addr:
return rc;
}
+/*
+ * Apply a CLDB state transition requested by cldb_state_cb().  Runs in
+ * the main loop whenever state_cldb_new differs from state_cldb.  The
+ * caller assigns state_cldb = new_state afterwards, whether or not the
+ * work below succeeded.
+ *
+ * Only ACTIVE -> MASTER/SLAVE is acted on.  That transition opens the
+ * databases via cldb_up(), calls ensure_root(), reloads sessions and
+ * arms the checkpoint timer.  Every other transition is only logged
+ * (when debugging).
+ */
+static void cldb_state_process(enum st_cldb new_state)
+{
+	unsigned int db_flags;
+
+	if ((new_state == ST_CLDB_MASTER || new_state == ST_CLDB_SLAVE) &&
+	    cld_srv.state_cldb == ST_CLDB_ACTIVE) {
+
+		db_flags = DB_CREATE | DB_THREAD;
+		if (cldb_up(&cld_srv.cldb, db_flags)) {
+			/* previously a silent return; leave a trace */
+			cldlog(LOG_ERR, "CLDB: failed to open databases (%s)",
+			       state_name_cldb[new_state]);
+			return;
+		}
+
+		ensure_root();
+
+		if (sess_load(cld_srv.sessions) != 0) {
+			cldlog(LOG_ERR, "session load failed. FIXME: I want error handling");
+			return;
+		}
+
+		add_chkpt_timer();
+	} else {
+		if (debugging)
+			cldlog(LOG_DEBUG, "unhandled state transition %d -> %d",
+			       cld_srv.state_cldb, new_state);
+	}
+}
+
static void segv_signal(int signal)
{
cldlog(LOG_ERR, "SIGSEGV");
@@ -598,10 +720,59 @@ static void stats_dump(void)
{
X(poll);
X(event);
+ cldlog(LOG_INFO, "State: CLDB %s",
+ state_name_cldb[cld_srv.state_cldb]);
}
#undef X
+/*
+ * Parse one --remote HOST:PORT argument and append a struct db_remote
+ * for it to cld_srv.rep_remotes.
+ *
+ * Rejects the following: empty input, any whitespace, a missing or
+ * empty HOST, and a PORT that is not a plain decimal number in
+ * 1..65535.  Returns true on success.  Returns false on a parse or
+ * allocation failure, in which case nothing is added.
+ */
+static bool add_remote(const char *arg)
+{
+	size_t i, arg_len = strlen(arg);
+	struct db_remote *rp;
+	const char *colon;
+	char *end;
+	long port;
+
+	if (!arg_len)
+		return false;
+
+	/* verify no whitespace; the cast avoids UB for negative chars */
+	for (i = 0; i < arg_len; i++)
+		if (isspace((unsigned char) arg[i]))
+			return false;
+
+	/* find colon delimiter */
+	colon = strchr(arg, ':');
+	if (!colon || (colon == arg))
+		return false;
+
+	/*
+	 * Parse the replication port number.  Unlike atoi(), strtol() lets
+	 * us reject trailing junk such as "host:80x" or "host:1:2".
+	 */
+	errno = 0;
+	port = strtol(colon + 1, &end, 10);
+	if (errno || end == colon + 1 || *end != '\0' ||
+	    port < 1 || port > 65535)
+		return false;
+
+	/* alloc and fill in remote-host record */
+	rp = malloc(sizeof(*rp));
+	if (!rp)
+		return false;
+
+	rp->port = port;
+	rp->host = strdup(arg);
+	if (!rp->host) {
+		free(rp);
+		return false;
+	}
+
+	/* truncate string down to simply hostname portion */
+	rp->host[colon - arg] = 0;
+
+	/* add remote host to global list */
+	cld_srv.rep_remotes = g_list_append(cld_srv.rep_remotes, rp);
+
+	return true;
+}
+
static error_t parse_opt (int key, char *arg, struct argp_state *state)
{
switch(key) {
@@ -622,6 +793,15 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
case 'F':
cld_srv.flags |= SFL_FOREGROUND;
break;
+ case 'm':
+ if ((strlen(arg) > 3) && (strlen(arg) < 64) &&
+ (strchr(arg, '.')))
+ cld_srv.force_myhost = arg;
+ else {
+ fprintf(stderr, "invalid myhost: '%s'\n", arg);
+ argp_usage(state);
+ }
+ break;
case 'p':
if (atoi(arg) > 0 && atoi(arg) < 65536)
cld_srv.port = arg;
@@ -633,6 +813,31 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
case 'P':
cld_srv.pid_file = arg;
break;
+ case 'r':
+ if (atoi(arg) > 0 && atoi(arg) < 65536)
+ cld_srv.rep_port = atoi(arg);
+ else {
+ fprintf(stderr, "invalid rep-port: '%s'\n", arg);
+ argp_usage(state);
+ }
+ break;
+ case 'R':
+ if (!add_remote(arg)) {
+ fprintf(stderr, "invalid remote host:port: '%s'\n", arg);
+ argp_usage(state);
+ }
+ break;
+ case 'S': {
+ int n_peers = atoi(arg);
+ if ((n_peers >= CLD_MIN_PEERS) && (n_peers < CLD_MAX_PEERS) &&
+ (n_peers & 0x01))
+ cld_srv.n_peers = atoi(arg);
+ else {
+ fprintf(stderr, "invalid peer count: '%s'\n", arg);
+ argp_usage(state);
+ }
+ break;
+ }
case ARGP_KEY_ARG:
argp_usage(state); /* too many args */
break;
@@ -648,9 +853,12 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
int main (int argc, char *argv[])
{
error_t aprc;
- int rc = 1;
+ int rc = 1, env_flags;
time_t next_timeout;
+ cld_srv.state_cldb =
+ cld_srv.state_cldb_new = ST_CLDB_INIT;
+
/* isspace() and strcasecmp() consistency requires this */
setlocale(LC_ALL, "C");
@@ -674,6 +882,20 @@ int main (int argc, char *argv[])
if (use_syslog)
openlog(PROGRAM_NAME, LOG_PID, LOG_LOCAL3);
+ if (cld_srv.force_myhost)
+ cld_srv.myhost = strdup(cld_srv.force_myhost);
+ else
+ cld_srv.myhost = get_hostname();
+
+ if (debugging)
+ cldlog(LOG_DEBUG, "our hostname: %s", cld_srv.myhost);
+
+ /* remotes file should list all in peer group, except for us */
+ if ((cld_srv.n_peers - 1) != g_list_length(cld_srv.rep_remotes)) {
+ cldlog(LOG_ERR, "n_peers does not match remotes file loaded");
+ goto err_out;
+ }
+
if (!(cld_srv.flags & SFL_FOREGROUND) && (daemon(1, !use_syslog) < 0)) {
syslogerr("daemon");
goto err_out;
@@ -694,16 +916,7 @@ int main (int argc, char *argv[])
signal(SIGTERM, term_signal);
signal(SIGUSR1, stats_signal);
- if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL,
- DB_CREATE | DB_THREAD | DB_RECOVER,
- "cld", use_syslog,
- DB_CREATE | DB_THREAD, NULL))
- exit(1);
-
- ensure_root();
-
timer_init(&cld_srv.chkpt_timer, cldb_checkpoint, NULL);
- add_chkpt_timer();
rc = 1;
@@ -716,17 +929,53 @@ int main (int argc, char *argv[])
!cld_srv.polls)
goto err_out_pid;
- if (sess_load(cld_srv.sessions) != 0)
- goto err_out_pid;
+ /* init pipe for replication manager notifications to us */
+ if (pipe(cld_srv.rep_pipe) < 0) {
+ syslogerr("pipe");
+ goto err_out;
+ }
/* set up server networking */
rc = net_open();
if (rc)
goto err_out_pid;
+ {
+ struct pollfd pfd;
+ struct server_poll sp;
+
+ /*
+ * add pipe to poll list, after doing so with our net sockets
+ */
+ sp.fd = cld_srv.rep_pipe[0];
+ sp.cb = noop_event;
+ sp.userdata = NULL;
+ g_array_append_val(cld_srv.poll_data, sp);
+
+ pfd.fd = cld_srv.rep_pipe[0];
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+ g_array_append_val(cld_srv.polls, pfd);
+ }
+
+ env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+ if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL,
+ env_flags, "cld", true,
+ cld_srv.rep_remotes,
+ cld_srv.myhost, cld_srv.rep_port,
+ cld_srv.n_peers, cldb_state_cb)) {
+ cldlog(LOG_ERR, "Failed to open CLDB, limping");
+ } else {
+ cld_srv.state_cldb =
+ cld_srv.state_cldb_new = ST_CLDB_OPEN;
+ }
+
cldlog(LOG_INFO, "initialized: cport %s, dbg %u",
cld_srv.port,
debugging);
+ cldlog(LOG_INFO, "replication: %s:%u",
+ cld_srv.myhost,
+ cld_srv.rep_port);
next_timeout = timers_run();
@@ -789,13 +1038,20 @@ int main (int argc, char *argv[])
}
next_timeout = timers_run();
+
+ if (cld_srv.state_cldb_new != ST_CLDB_INIT &&
+ cld_srv.state_cldb_new != cld_srv.state_cldb) {
+ cldb_state_process(cld_srv.state_cldb_new);
+ cld_srv.state_cldb = cld_srv.state_cldb_new;
+ }
}
cldlog(LOG_INFO, "shutting down");
if (cld_srv.cldb.up)
cldb_down(&cld_srv.cldb);
- cldb_fini(&cld_srv.cldb);
+ if (cld_srv.state_cldb >= ST_CLDB_OPEN)
+ cldb_fini(&cld_srv.cldb);
rc = 0;
diff --git a/test/pid-exists b/test/pid-exists
index 351b4f1..4fa2275 100755
--- a/test/pid-exists
+++ b/test/pid-exists
@@ -1,9 +1,12 @@
#!/bin/sh
-if [ ! -f cld.pid ]
-then
- echo "pid file not found."
- exit 1
-fi
+for n in 1 2 3
+do
+ if [ ! -f cld$n.pid ]
+ then
+ echo "cld$n.pid not found."
+ exit 1
+ fi
+done
exit 0
diff --git a/test/prep-db b/test/prep-db
index 353ca4a..3e4fb60 100755
--- a/test/prep-db
+++ b/test/prep-db
@@ -2,13 +2,16 @@
DATADIR=data
-mkdir -p $DATADIR
-
-if [ ! -d $DATADIR ]
-then
- rm -rf $DATADIR
- echo "test database dir not found."
- exit 1
-fi
+for n in 1 2 3
+do
+ mkdir -p $DATADIR/n$n/data
+
+ if [ ! -d $DATADIR/n$n/data ]
+ then
+ rm -rf $DATADIR
+ echo "test database dir for node $n not found."
+ exit 1
+ fi
+done
exit 0
diff --git a/test/start-daemon b/test/start-daemon
index 4cb9fd7..06b3250 100755
--- a/test/start-daemon
+++ b/test/start-daemon
@@ -1,13 +1,31 @@
#!/bin/sh
-if [ -f cld.pid ]
+if [ -f cld1.pid -o -f cld2.pid -o -f cld3.pid ]
then
- echo "pid file found. daemon still running?"
+ echo "pid file found. daemons still running?"
exit 1
fi
-../server/cld -P cld.pid -d "$PWD/data" -p 18181 -E
+../server/cld -d "$PWD/data/n1/data" -p 18181 -r 19181 -P cld1.pid -E \
+ -D 2 -S 3 \
+ -m localhost.localdomain \
+ -R localhost.localdomain:19182 \
+ -R localhost.localdomain:19183
-sleep 3
+../server/cld -d "$PWD/data/n2/data" -p 18182 -r 19182 -P cld2.pid -E \
+ -D 2 -S 3 \
+ -m localhost.localdomain \
+ -R localhost.localdomain:19181 \
+ -R localhost.localdomain:19183
+
+../server/cld -d "$PWD/data/n3/data" -p 18183 -r 19183 -P cld3.pid -E \
+ -D 2 -S 3 \
+ -m localhost.localdomain \
+ -R localhost.localdomain:19181 \
+ -R localhost.localdomain:19182
+sleep 1
+
+echo " start-daemon: Waiting 20s, for daemons to start up..."
+sleep 20
exit 0
diff --git a/test/stop-daemon b/test/stop-daemon
index 221dc46..d00fda6 100755
--- a/test/stop-daemon
+++ b/test/stop-daemon
@@ -1,23 +1,35 @@
#!/bin/sh
-if [ ! -f cld.pid ]
-then
- echo no daemon pid file found.
- exit 1
-fi
+for n in 1 2 3
+do
+ if [ ! -f cld$n.pid ]
+ then
+ echo " stop-daemon: cld$n.pid not found."
+ exit 1
+ fi
+done
+
-kill `cat cld.pid`
+kill `cat cld1.pid cld2.pid cld3.pid`
for ((n = 0; n < 10; n++))
do
- if [ ! -f cld.pid ]
+ if [ -f cld1.pid -o -f cld2.pid -o -f cld3.pid ]
then
+ sleep 1
+ else
exit 0
fi
- sleep 1
done
-echo "PID file not removed, after signal sent."
-rm -f cld.pid
+for n in 1 2 3
+do
+ if [ -f cld$n.pid ]
+ then
+ echo " stop-daemon: cld$n.pid found, after signal sent."
+ fi
+done
+
+rm -f cld?.pid
exit 1
^ permalink raw reply related [flat|nested] 7+ messages in thread
* [PATCH v1] CLD replication (WIP)
@ 2009-07-31 11:15 Jeff Garzik
2009-08-06 5:49 ` [PATCH v2] " Jeff Garzik
0 siblings, 1 reply; 7+ messages in thread
From: Jeff Garzik @ 2009-07-31 11:15 UTC (permalink / raw)
To: Project Hail
[-- Attachment #1: Type: text/plain, Size: 1058 bytes --]
[resend #1... not sure where the first try went]
Attached is the current CLD replication patch, which takes CLD from
being a single-node service to a fully replicated, highly available service.
The server implementation should be complete.
The current merge blocker is missing code in libcldc, which does not yet
properly "hunt" for a master among a group of peer CLD replicas in a
CLD cell.
Once merged, this will be a big milestone for CLD. The next milestone
will be adding the needed strict-cache-coherence caching semantics to
the CLD server and client library.
This patch was generated against git commit
511b8dafb233ee85e60ddf7eda212f87963e150c.
> server/cld.h | 20 +++
> server/cldb.c | 69 +++++++++++--
> server/cldb.h | 9 +
> server/cldbadm.c | 8 -
> server/server.c | 286 +++++++++++++++++++++++++++++++++++++++++++++++++++---
> test/pid-exists | 13 +-
> test/prep-db | 19 ++-
> test/start-daemon | 26 ++++
> test/stop-daemon | 32 ++++--
> 9 files changed, 428 insertions(+), 54 deletions(-)
[-- Attachment #2: patch.cld-rep --]
[-- Type: text/plain, Size: 19366 bytes --]
diff --git a/server/cld.h b/server/cld.h
index 21f103d..08e6b12 100644
--- a/server/cld.h
+++ b/server/cld.h
@@ -91,6 +91,15 @@ struct msg_params {
size_t msg_len;
};
+/*
+ * Lifecycle of the local CLD database, as driven by server.c
+ * (printable names live in state_name_cldb[]).
+ */
+enum st_cldb {
+	ST_CLDB_INIT	= 0,	/* initial; also kept if cldb_init() fails */
+	ST_CLDB_OPEN,		/* cldb_init() succeeded, replication started */
+	ST_CLDB_ACTIVE,		/* CLDB_EV_ELECTED seen while OPEN */
+	ST_CLDB_MASTER,		/* CLDB_EV_MASTER: this node is master */
+	ST_CLDB_SLAVE,		/* CLDB_EV_CLIENT: this node is a client */
+	ST_CLDBNUM		/* number of states; must stay last */
+};
+
struct server_stats {
unsigned long poll; /* number polls */
unsigned long event; /* events dispatched */
@@ -114,6 +123,17 @@ struct server {
int pid_fd;
char *port; /* bind port */
+ unsigned short rep_port; /* db4 replication port */
+
+ char *myhost;
+ char *force_myhost;
+ GList *rep_remotes;
+
+ unsigned int n_peers; /* total peers in cell */
+
+ int rep_pipe[2];
+
+ enum st_cldb state_cldb, state_cldb_new;
struct cldb cldb; /* database info */
diff --git a/server/cldb.c b/server/cldb.c
index 3e7c95c..254decd 100644
--- a/server/cldb.c
+++ b/server/cldb.c
@@ -25,8 +25,6 @@
#include <glib.h>
#include "cld.h"
-static int cldb_up(struct cldb *cldb, unsigned int flags);
-
/*
* db4 page sizes for our various databases. Filesystem block size
* is recommended, so 4096 was chosen (default ext3 block size).
@@ -202,6 +200,30 @@ err_out:
return -EIO;
}
+/*
+ * Register every struct db_remote found in @remotes with the db4
+ * replication manager of @dbenv.
+ *
+ * Returns 0 with *nsites set to the number of sites registered, or the
+ * first non-zero db4 error code, in which case *nsites counts only the
+ * sites registered before the failure.
+ */
+static int add_remote_sites(DB_ENV *dbenv, GList *remotes, int *nsites)
+{
+	GList *node = remotes;
+
+	*nsites = 0;
+	while (node != NULL) {
+		struct db_remote *site = node->data;
+		int err = dbenv->repmgr_add_remote_site(dbenv, site->host,
+							 site->port, NULL, 0);
+
+		if (err != 0) {
+			dbenv->err(dbenv, err,
+				   "dbenv->add.remote.site host %s port %u",
+				   site->host, site->port);
+			return err;
+		}
+
+		++*nsites;
+		node = node->next;
+	}
+
+	return 0;
+}
+
static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
{
struct cldb *cldb = dbenv->app_private;
@@ -229,12 +251,13 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
unsigned int env_flags, const char *errpfx, bool do_syslog,
- unsigned int flags, void (*cb)(enum db_event))
+ GList *remotes, char *rep_host, unsigned short rep_port,
+ int n_peers, void (*cb)(enum db_event))
{
- int rc;
+ int rc, nsites = 0;
DB_ENV *dbenv;
- cldb->is_master = true;
+ cldb->is_master = false;
cldb->home = db_home;
cldb->state_cb = cb;
@@ -281,25 +304,55 @@ int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
cldb->keyed = true;
}
+ rc = dbenv->repmgr_set_local_site(dbenv, rep_host, rep_port, 0);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->set_local_site");
+ goto err_out;
+ }
+
rc = dbenv->set_event_notify(dbenv, db4_event);
if (rc) {
dbenv->err(dbenv, rc, "dbenv->set_event_notify");
goto err_out;
}
+ rc = dbenv->rep_set_priority(dbenv, 100);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->rep_set_priority");
+ goto err_out;
+ }
+
+ rc = dbenv->rep_set_nsites(dbenv, n_peers);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->rep_set_nsites");
+ goto err_out;
+ }
+
+ rc = dbenv->repmgr_set_ack_policy(dbenv, DB_REPMGR_ACKS_QUORUM);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->rep_ack_policy");
+ goto err_out;
+ }
+
/* init DB transactional environment, stored in directory db_home */
env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
- env_flags |= DB_INIT_TXN;
+ env_flags |= DB_INIT_TXN | DB_INIT_REP;
rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
if (rc) {
dbenv->err(dbenv, rc, "dbenv->open");
goto err_out;
}
- rc = cldb_up(cldb, flags);
+ rc = add_remote_sites(dbenv, remotes, &nsites);
if (rc)
goto err_out;
+ rc = dbenv->repmgr_start(dbenv, 2, DB_REP_ELECTION);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->repmgr_start");
+ goto err_out;
+ }
+
return 0;
err_out:
@@ -310,7 +363,7 @@ err_out:
/*
* open databases
*/
-static int cldb_up(struct cldb *cldb, unsigned int flags)
+int cldb_up(struct cldb *cldb, unsigned int flags)
{
DB_ENV *dbenv = cldb->env;
int rc;
diff --git a/server/cldb.h b/server/cldb.h
index d28f732..f8f26db 100644
--- a/server/cldb.h
+++ b/server/cldb.h
@@ -107,6 +107,11 @@ enum db_event {
CLDB_EV_NONE, CLDB_EV_CLIENT, CLDB_EV_MASTER, CLDB_EV_ELECTED
};
+/*
+ * One remote replication peer.  server.c's add_remote() builds these
+ * from --remote HOST:PORT arguments; cldb_init() hands each one to
+ * dbenv->repmgr_add_remote_site().
+ */
+struct db_remote { /* remotes for cldb_init */
+ char *host; /* peer hostname, strdup()'d by add_remote() */
+ unsigned short port; /* peer's db4 replication port */
+};
+
struct cldb {
bool is_master;
bool keyed; /* using encryption? */
@@ -133,7 +138,9 @@ struct cldb {
extern int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
unsigned int env_flags, const char *errpfx, bool do_syslog,
- unsigned int flags, void (*cb)(enum db_event));
+ GList *remotes, char *rep_host, unsigned short rep_port,
+ int n_peers, void (*cb)(enum db_event));
+extern int cldb_up(struct cldb *cldb, unsigned int flags);
extern void cldb_down(struct cldb *cldb);
extern void cldb_fini(struct cldb *cldb);
diff --git a/server/cldbadm.c b/server/cldbadm.c
index 37e8e36..9342f66 100644
--- a/server/cldbadm.c
+++ b/server/cldbadm.c
@@ -78,7 +78,8 @@ int main(int argc, char *argv[])
}
if (cldb_init(&cld_adm.cldb, cld_adm.data_dir, NULL,
- DB_RECOVER, "cldbadm", false, 0, NULL))
+ DB_RECOVER, "cldbadm", false,
+ NULL, NULL, 0, 0, NULL))
goto err_dbopen;
switch (cld_adm.mode) {
@@ -142,8 +143,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state)
* Stubs for contents of cldb.c
*/
int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
- unsigned int env_flags, const char *errpfx, bool do_syslog,
- unsigned int flags, void (*cb)(enum db_event))
+ unsigned int env_flags, const char *errpfx, bool do_syslog,
+ GList *remotes, char *rep_host, unsigned short rep_port,
+ int n_peers, void (*cb)(enum db_event))
{
return 0;
diff --git a/server/server.c b/server/server.c
index 02e6231..fb51a42 100644
--- a/server/server.c
+++ b/server/server.c
@@ -29,6 +29,7 @@
#include <errno.h>
#include <syslog.h>
#include <locale.h>
+#include <ctype.h>
#include <argp.h>
#include <netdb.h>
#include <signal.h>
@@ -46,6 +47,12 @@ const char *argp_program_version = PACKAGE_VERSION;
enum {
CLD_RAW_MSG_SZ = 4096,
+
+	/* default for --rep-port; passed to cldb_init() as the local site port */
+ CLD_DEF_REP_PORT = 9081,
+
+	/*
+	 * --cell-size default and bounds.  parse_opt() treats
+	 * CLD_MAX_PEERS as an exclusive limit and also requires an odd count.
+	 */
+ CLD_DEF_PEERS = 5,
+ CLD_MIN_PEERS = 3,
+ CLD_MAX_PEERS = 400, /* arbitrary "sanity" limit */
};
static struct argp_option options[] = {
@@ -58,10 +65,18 @@ static struct argp_option options[] = {
"Switch the log to standard error" },
{ "foreground", 'F', NULL, 0,
"Run in foreground, do not fork" },
+ { "myhost", 'm', "HOST", 0,
+ "Force local hostname to HOST (def: autodetect)" },
{ "port", 'p', "PORT", 0,
"bind to UDP port PORT. Default: " CLD_DEF_PORT },
{ "pid", 'P', "FILE", 0,
"Write daemon process id to FILE. Default: " CLD_DEF_PIDFN },
+ { "rep-port", 'r', "PORT", 0,
+ "bind replication engine to port PORT (def: 9081)" },
+ { "remote", 'R', "HOST:PORT", 0,
+ "Add a HOST:PORT pair to list of remote hosts. Use this argument multiple times to build cell's peer list." },
+ { "cell-size", 'S', "PEERS", 0,
+ "Total number of PEERS in cell. (PEERS/2)+1 required for quorum. Must be an odd number (def: 5)" },
{ }
};
@@ -79,10 +94,15 @@ static bool use_syslog = true;
int debugging = 0;
struct timeval current_time;
+/* printable names for enum st_cldb, indexed by state; used in log output */
+static const char *state_name_cldb[ST_CLDBNUM] = {
+	[ST_CLDB_INIT]		= "Init",
+	[ST_CLDB_OPEN]		= "Open",
+	[ST_CLDB_ACTIVE]	= "Active",
+	[ST_CLDB_MASTER]	= "Master",
+	[ST_CLDB_SLAVE]		= "Slave",
+};
+/* global server state; path defaults use CLD_DEF_* so they match --help */
struct server cld_srv = {
.data_dir = CLD_DEF_DATADIR,
.pid_file = CLD_DEF_PIDFN,
.port = CLD_DEF_PORT,
+	.rep_port		= CLD_DEF_REP_PORT,
+	.n_peers		= CLD_DEF_PEERS,
};
static void ensure_root(void);
@@ -108,6 +128,33 @@ void cldlog(int prio, const char *fmt, ...)
va_end(ap);
}
+/*
+ * Find out own hostname.
+ * Unless --myhost overrides it, the result becomes cld_srv.myhost, which
+ * main() passes to cldb_init() as the host of our local replication
+ * site.  Do this before our state machines start ticking, so we can
+ * quit with a meaningful message easily.
+ *
+ * Returns a malloc'd string; exits the process on any failure.
+ */
+static char *get_hostname(void)
+{
+	/*
+	 * Large enough for Linux's 64-byte HOST_NAME_MAX and the 255-byte
+	 * SUSv2 limit.  A 64-byte buffer made gethostname() fail with
+	 * ENAMETOOLONG for 63- and 64-character Linux hostnames.
+	 */
+	enum { hostsz = 256 };
+	char hostb[hostsz];
+	char *ret;
+
+	if (gethostname(hostb, hostsz-1) < 0) {
+		cldlog(LOG_ERR, "get_hostname: gethostname error (%d): %s",
+		       errno, strerror(errno));
+		exit(1);
+	}
+	/* POSIX leaves termination unspecified on truncation */
+	hostb[hostsz-1] = 0;
+	if ((ret = strdup(hostb)) == NULL) {
+		cldlog(LOG_ERR, "get_hostname: no core (%ld)",
+		       (long)strlen(hostb));
+		exit(1);
+	}
+	return ret;
+}
+
int udp_tx(struct server_socket *sock, struct sockaddr *addr,
socklen_t addr_len, const void *data, size_t data_len)
{
@@ -484,6 +531,55 @@ static void cldb_checkpoint(struct timer *timer)
add_chkpt_timer();
}
+/*
+ * Replication event callback handed to cldb_init().
+ *
+ * This callback runs on the context of the replication manager thread,
+ * so role changes are not processed here.  It only records the
+ * requested state in state_cldb_new and writes a byte to rep_pipe.  The
+ * main loop wakes up from poll(), notices state_cldb_new != state_cldb
+ * and calls cldb_state_process().
+ *
+ * NOTE(review): state_cldb/state_cldb_new are shared with the main
+ * thread without locking or atomics.  The CLDB_EV_ELECTED and default
+ * branches also write state_cldb directly from this thread.  In
+ * addition, main() sets ST_CLDB_OPEN only after cldb_init() returns,
+ * but repmgr_start() has already run inside cldb_init().  A
+ * CLDB_EV_ELECTED delivered before that point is therefore ignored.
+ * TODO: revisit.
+ */
+static void cldb_state_cb(enum db_event event)
+{
+	switch (event) {
+	case CLDB_EV_ELECTED:
+		/*
+		 * Safe to stop ignoring bogus client indication,
+		 * so unmute us by advancing the state.
+		 */
+		if (cld_srv.state_cldb == ST_CLDB_OPEN)
+			cld_srv.state_cldb = ST_CLDB_ACTIVE;
+		break;
+	case CLDB_EV_CLIENT:
+	case CLDB_EV_MASTER:
+		if (cld_srv.state_cldb != ST_CLDB_INIT &&
+		    cld_srv.state_cldb != ST_CLDB_OPEN) {
+			char c = 0x42;
+
+			if (event == CLDB_EV_MASTER)
+				cld_srv.state_cldb_new = ST_CLDB_MASTER;
+			else
+				cld_srv.state_cldb_new = ST_CLDB_SLAVE;
+			if (debugging) {
+				cldlog(LOG_DEBUG, "CLDB state > %s",
+				       state_name_cldb[cld_srv.state_cldb_new]);
+			}
+
+			/*
+			 * Wake up the main loop through the self-pipe.  If
+			 * this fails, the transition is only noticed on the
+			 * next unrelated wakeup, so at least report it.
+			 */
+			if (write(cld_srv.rep_pipe[1], &c, 1) != 1)
+				cldlog(LOG_WARNING,
+				       "CLDB: main loop wakeup failed: %s",
+				       strerror(errno));
+		}
+		break;
+	default:
+		cldlog(LOG_WARNING, "API confusion with CLDB, event 0x%x", event);
+		cld_srv.state_cldb = ST_CLDB_OPEN;	/* wrong, stub for now */
+		cld_srv.state_cldb_new = ST_CLDB_INIT;
+		break;
+	}
+}
+
+/*
+ * Poll callback for the read end of rep_pipe.  cldb_state_cb() writes
+ * one byte per role change.  The wakeup itself is all the main loop
+ * needs, because it compares state_cldb_new with state_cldb after each
+ * loop iteration.
+ *
+ * The wakeup bytes must be consumed here.  poll() keeps reporting
+ * POLLIN while data remains, which would spin the main loop.  A pipe
+ * that is never drained also eventually fills up and blocks the
+ * replication thread's write().  Only one read() is issued, so it
+ * cannot block once POLLIN is reported; any leftover bytes simply
+ * cause another wakeup.
+ * NOTE(review): assumes the main loop passes the fd's revents as
+ * @events and only calls this when revents is non-zero.
+ */
+static bool noop_event(int fd, short events, void *userdata)
+{
+	char buf[64];
+
+	if ((events & POLLIN) &&
+	    read(fd, buf, sizeof(buf)) < 0 &&
+	    errno != EINTR && errno != EAGAIN)
+		cldlog(LOG_WARNING, "rep_pipe read failed: %s",
+		       strerror(errno));
+
+	return true;	/* continue main loop; do NOT terminate server */
+}
+
static int net_open(void)
{
int ipv6_found;
@@ -575,6 +671,32 @@ err_addr:
return rc;
}
+/*
+ * Apply a CLDB state transition requested by cldb_state_cb().  Runs in
+ * the main loop whenever state_cldb_new differs from state_cldb.  The
+ * caller assigns state_cldb = new_state afterwards, whether or not the
+ * work below succeeded.
+ *
+ * Only ACTIVE -> MASTER/SLAVE is acted on.  That transition opens the
+ * databases via cldb_up(), calls ensure_root(), reloads sessions and
+ * arms the checkpoint timer.  Every other transition is only logged
+ * (when debugging).
+ */
+static void cldb_state_process(enum st_cldb new_state)
+{
+	unsigned int db_flags;
+
+	if ((new_state == ST_CLDB_MASTER || new_state == ST_CLDB_SLAVE) &&
+	    cld_srv.state_cldb == ST_CLDB_ACTIVE) {
+
+		db_flags = DB_CREATE | DB_THREAD;
+		if (cldb_up(&cld_srv.cldb, db_flags)) {
+			/* previously a silent return; leave a trace */
+			cldlog(LOG_ERR, "CLDB: failed to open databases (%s)",
+			       state_name_cldb[new_state]);
+			return;
+		}
+
+		ensure_root();
+
+		if (sess_load(cld_srv.sessions) != 0) {
+			cldlog(LOG_ERR, "session load failed. FIXME: I want error handling");
+			return;
+		}
+
+		add_chkpt_timer();
+	} else {
+		if (debugging)
+			cldlog(LOG_DEBUG, "unhandled state transition %d -> %d",
+			       cld_srv.state_cldb, new_state);
+	}
+}
+
static void segv_signal(int signal)
{
cldlog(LOG_ERR, "SIGSEGV");
@@ -598,10 +720,59 @@ static void stats_dump(void)
{
X(poll);
X(event);
+ cldlog(LOG_INFO, "State: CLDB %s",
+ state_name_cldb[cld_srv.state_cldb]);
}
#undef X
+/*
+ * Parse one --remote HOST:PORT argument and append a struct db_remote
+ * for it to cld_srv.rep_remotes.
+ *
+ * Rejects the following: empty input, any whitespace, a missing or
+ * empty HOST, and a PORT that is not a plain decimal number in
+ * 1..65535.  Returns true on success.  Returns false on a parse or
+ * allocation failure, in which case nothing is added.
+ */
+static bool add_remote(const char *arg)
+{
+	size_t i, arg_len = strlen(arg);
+	struct db_remote *rp;
+	const char *colon;
+	char *end;
+	long port;
+
+	if (!arg_len)
+		return false;
+
+	/* verify no whitespace; the cast avoids UB for negative chars */
+	for (i = 0; i < arg_len; i++)
+		if (isspace((unsigned char) arg[i]))
+			return false;
+
+	/* find colon delimiter */
+	colon = strchr(arg, ':');
+	if (!colon || (colon == arg))
+		return false;
+
+	/*
+	 * Parse the replication port number.  Unlike atoi(), strtol() lets
+	 * us reject trailing junk such as "host:80x" or "host:1:2".
+	 */
+	errno = 0;
+	port = strtol(colon + 1, &end, 10);
+	if (errno || end == colon + 1 || *end != '\0' ||
+	    port < 1 || port > 65535)
+		return false;
+
+	/* alloc and fill in remote-host record */
+	rp = malloc(sizeof(*rp));
+	if (!rp)
+		return false;
+
+	rp->port = port;
+	rp->host = strdup(arg);
+	if (!rp->host) {
+		free(rp);
+		return false;
+	}
+
+	/* truncate string down to simply hostname portion */
+	rp->host[colon - arg] = 0;
+
+	/* add remote host to global list */
+	cld_srv.rep_remotes = g_list_append(cld_srv.rep_remotes, rp);
+
+	return true;
+}
+
static error_t parse_opt (int key, char *arg, struct argp_state *state)
{
switch(key) {
@@ -622,6 +793,15 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
case 'F':
cld_srv.flags |= SFL_FOREGROUND;
break;
+ case 'm':
+ if ((strlen(arg) > 3) && (strlen(arg) < 64) &&
+ (strchr(arg, '.')))
+ cld_srv.force_myhost = arg;
+ else {
+ fprintf(stderr, "invalid myhost: '%s'\n", arg);
+ argp_usage(state);
+ }
+ break;
case 'p':
if (atoi(arg) > 0 && atoi(arg) < 65536)
cld_srv.port = arg;
@@ -633,6 +813,31 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
case 'P':
cld_srv.pid_file = arg;
break;
+ case 'r':
+ if (atoi(arg) > 0 && atoi(arg) < 65536)
+ cld_srv.rep_port = atoi(arg);
+ else {
+ fprintf(stderr, "invalid rep-port: '%s'\n", arg);
+ argp_usage(state);
+ }
+ break;
+ case 'R':
+ if (!add_remote(arg)) {
+ fprintf(stderr, "invalid remote host:port: '%s'\n", arg);
+ argp_usage(state);
+ }
+ break;
+ case 'S': {
+ int n_peers = atoi(arg);
+ if ((n_peers >= CLD_MIN_PEERS) && (n_peers < CLD_MAX_PEERS) &&
+ (n_peers & 0x01))
+ cld_srv.n_peers = atoi(arg);
+ else {
+ fprintf(stderr, "invalid peer count: '%s'\n", arg);
+ argp_usage(state);
+ }
+ break;
+ }
case ARGP_KEY_ARG:
argp_usage(state); /* too many args */
break;
@@ -648,9 +853,12 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
int main (int argc, char *argv[])
{
error_t aprc;
- int rc = 1;
+ int rc = 1, env_flags;
time_t next_timeout;
+ cld_srv.state_cldb =
+ cld_srv.state_cldb_new = ST_CLDB_INIT;
+
/* isspace() and strcasecmp() consistency requires this */
setlocale(LC_ALL, "C");
@@ -674,6 +882,20 @@ int main (int argc, char *argv[])
if (use_syslog)
openlog(PROGRAM_NAME, LOG_PID, LOG_LOCAL3);
+ if (cld_srv.force_myhost)
+ cld_srv.myhost = strdup(cld_srv.force_myhost);
+ else
+ cld_srv.myhost = get_hostname();
+
+ if (debugging)
+ cldlog(LOG_DEBUG, "our hostname: %s", cld_srv.myhost);
+
+ /* remotes file should list all in peer group, except for us */
+ if ((cld_srv.n_peers - 1) != g_list_length(cld_srv.rep_remotes)) {
+ cldlog(LOG_ERR, "n_peers does not match remotes file loaded");
+ goto err_out;
+ }
+
if (!(cld_srv.flags & SFL_FOREGROUND) && (daemon(1, !use_syslog) < 0)) {
syslogerr("daemon");
goto err_out;
@@ -694,16 +916,7 @@ int main (int argc, char *argv[])
signal(SIGTERM, term_signal);
signal(SIGUSR1, stats_signal);
- if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL,
- DB_CREATE | DB_THREAD | DB_RECOVER,
- "cld", use_syslog,
- DB_CREATE | DB_THREAD, NULL))
- exit(1);
-
- ensure_root();
-
timer_init(&cld_srv.chkpt_timer, cldb_checkpoint, NULL);
- add_chkpt_timer();
rc = 1;
@@ -716,17 +929,53 @@ int main (int argc, char *argv[])
!cld_srv.polls)
goto err_out_pid;
- if (sess_load(cld_srv.sessions) != 0)
- goto err_out_pid;
+ /* init pipe for replication manager notifications to us */
+ if (pipe(cld_srv.rep_pipe) < 0) {
+ syslogerr("pipe");
+ goto err_out;
+ }
/* set up server networking */
rc = net_open();
if (rc)
goto err_out_pid;
+ {
+ struct pollfd pfd;
+ struct server_poll sp;
+
+ /*
+ * add pipe to poll list, after doing so with our net sockets
+ */
+ sp.fd = cld_srv.rep_pipe[0];
+ sp.cb = noop_event;
+ sp.userdata = NULL;
+ g_array_append_val(cld_srv.poll_data, sp);
+
+ pfd.fd = cld_srv.rep_pipe[0];
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+ g_array_append_val(cld_srv.polls, pfd);
+ }
+
+ env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+ if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL,
+ env_flags, "cld", true,
+ cld_srv.rep_remotes,
+ cld_srv.myhost, cld_srv.rep_port,
+ cld_srv.n_peers, cldb_state_cb)) {
+ cldlog(LOG_ERR, "Failed to open CLDB, limping");
+ } else {
+ cld_srv.state_cldb =
+ cld_srv.state_cldb_new = ST_CLDB_OPEN;
+ }
+
cldlog(LOG_INFO, "initialized: cport %s, dbg %u",
cld_srv.port,
debugging);
+ cldlog(LOG_INFO, "replication: %s:%u",
+ cld_srv.myhost,
+ cld_srv.rep_port);
next_timeout = timers_run();
@@ -789,13 +1038,20 @@ int main (int argc, char *argv[])
}
next_timeout = timers_run();
+
+ if (cld_srv.state_cldb_new != ST_CLDB_INIT &&
+ cld_srv.state_cldb_new != cld_srv.state_cldb) {
+ cldb_state_process(cld_srv.state_cldb_new);
+ cld_srv.state_cldb = cld_srv.state_cldb_new;
+ }
}
cldlog(LOG_INFO, "shutting down");
if (cld_srv.cldb.up)
cldb_down(&cld_srv.cldb);
- cldb_fini(&cld_srv.cldb);
+ if (cld_srv.state_cldb >= ST_CLDB_OPEN)
+ cldb_fini(&cld_srv.cldb);
rc = 0;
diff --git a/test/pid-exists b/test/pid-exists
index 351b4f1..4fa2275 100755
--- a/test/pid-exists
+++ b/test/pid-exists
@@ -1,9 +1,12 @@
#!/bin/sh
-if [ ! -f cld.pid ]
-then
- echo "pid file not found."
- exit 1
-fi
+# every member of the 3-node test cell must have written its pid file
+for n in 1 2 3; do
+	[ -f cld$n.pid ] && continue
+	echo "cld$n.pid not found."
+	exit 1
+done
exit 0
diff --git a/test/prep-db b/test/prep-db
index 353ca4a..3e4fb60 100755
--- a/test/prep-db
+++ b/test/prep-db
@@ -2,13 +2,16 @@
DATADIR=data
-mkdir -p $DATADIR
-
-if [ ! -d $DATADIR ]
-then
- rm -rf $DATADIR
- echo "test database dir not found."
- exit 1
-fi
+# create one database directory per cell member; on failure wipe the
+# whole test data tree so the next run starts clean
+for node in 1 2 3; do
+	node_dir=$DATADIR/n$node/data
+	mkdir -p $node_dir
+	if [ -d $node_dir ]; then
+		continue
+	fi
+	rm -rf $DATADIR
+	echo "test database dir for node $node not found."
+	exit 1
+done
exit 0
diff --git a/test/start-daemon b/test/start-daemon
index 4cb9fd7..06b3250 100755
--- a/test/start-daemon
+++ b/test/start-daemon
@@ -1,13 +1,31 @@
#!/bin/sh
-if [ -f cld.pid ]
+if [ -f cld1.pid -o -f cld2.pid -o -f cld3.pid ]
then
- echo "pid file found. daemon still running?"
+ echo "pid file found. daemons still running?"
exit 1
fi
-../server/cld -P cld.pid -d "$PWD/data" -p 18181 -E
+# start_node N RPORT...: launch cell member N with client port 1818N,
+# replication port 1918N and data dir data/nN/data, listing each RPORT
+# on localhost.localdomain as a remote replication peer.
+start_node () {
+	node=$1
+	shift
+	remotes=
+	for rport in "$@"; do
+		remotes="$remotes -R localhost.localdomain:$rport"
+	done
+	../server/cld -d "$PWD/data/n$node/data" -p 1818$node -r 1918$node \
+		-P cld$node.pid -E -D 2 -S 3 -m localhost.localdomain $remotes
+}
+
+start_node 1 19182 19183
-sleep 3
+start_node 2 19181 19183
+start_node 3 19181 19182
+sleep 1
+
+echo " start-daemon: Waiting 20s, for daemons to start up..."
+sleep 20
exit 0
diff --git a/test/stop-daemon b/test/stop-daemon
index 221dc46..d00fda6 100755
--- a/test/stop-daemon
+++ b/test/stop-daemon
@@ -1,23 +1,35 @@
#!/bin/sh
-if [ ! -f cld.pid ]
-then
- echo no daemon pid file found.
- exit 1
-fi
+for n in 1 2 3
+do
+ if [ ! -f cld$n.pid ]
+ then
+ echo " stop-daemon: cld$n.pid not found."
+ exit 1
+ fi
+done
+
-kill `cat cld.pid`
+kill `cat cld1.pid cld2.pid cld3.pid`
-for ((n = 0; n < 10; n++))
+# Give the daemons up to ~10s to exit and remove their pid files.
+# POSIX loop: "for ((...))" is bash-only and is a syntax error under a
+# strict /bin/sh such as dash.
+n=0
+while [ $n -lt 10 ]
do
- if [ ! -f cld.pid ]
+ if [ -f cld1.pid -o -f cld2.pid -o -f cld3.pid ]
then
+ sleep 1
+ else
exit 0
fi
- sleep 1
+ n=$((n + 1))
done
-echo "PID file not removed, after signal sent."
-rm -f cld.pid
+for n in 1 2 3
+do
+ if [ -f cld$n.pid ]
+ then
+ echo " stop-daemon: cld$n.pid found, after signal sent."
+ fi
+done
+
+rm -f cld?.pid
exit 1
^ permalink raw reply related [flat|nested] 7+ messages in thread
* Re: [PATCH v1] CLD replication (WIP)
2009-07-31 10:40 [PATCH v1] " Jeff Garzik
@ 2009-07-31 18:14 ` Sage Weil
2009-07-31 18:38 ` Jeff Garzik
0 siblings, 1 reply; 7+ messages in thread
From: Sage Weil @ 2009-07-31 18:14 UTC (permalink / raw)
To: Jeff Garzik; +Cc: hail-devel
Hi Jeff,
Do you still plan to replace bdb (and its replication) with something
based on paxos? I'm considering replacing the Ceph monitors (which
currently implement paxos, but in a very ceph-specific way) with cld if it
can meet the basic requirements.
What I'd kind of like to see is a clean implementation of a paxos
library--one that leaves out message transport and storage--to build a
replicated write-ahead log. And then a separate library for handling the
database/namespace served up by cld (be it regular files, bdb, whatever)
that leaves replication up to paxos. It looks like Google ended up doing
something similar with Chubby (see
http://labs.google.com/papers/paxos_made_live.html).
Does this sound like the direction you guys are heading in?
sage
On Fri, 31 Jul 2009, Jeff Garzik wrote:
>
> Below is the current CLD replication patch, which takes CLD from being
> a single-node service to a fully replicated, highly available service.
>
> The server implementation should be complete.
>
> The current merge blocker is needed code in libcldc, which does not
> yet properly "hunt" for a master, among a group of peer CLD replicas
> in a CLD cell.
>
> This will be a big milestone for CLD, when merged. The next milestone
> will be adding the needed strict-cache-coherence caching semantics
> to CLD server and client lib.
>
> This patch was generated against git commit
> 511b8dafb233ee85e60ddf7eda212f87963e150c.
>
> ---
> server/cld.h | 20 +++
> server/cldb.c | 69 +++++++++++--
> server/cldb.h | 9 +
> server/cldbadm.c | 8 -
> server/server.c | 286 +++++++++++++++++++++++++++++++++++++++++++++++++++---
> test/pid-exists | 13 +-
> test/prep-db | 19 ++-
> test/start-daemon | 26 ++++
> test/stop-daemon | 32 ++++--
> 9 files changed, 428 insertions(+), 54 deletions(-)
>
> diff --git a/server/cld.h b/server/cld.h
> index 21f103d..08e6b12 100644
> --- a/server/cld.h
> +++ b/server/cld.h
> @@ -91,6 +91,15 @@ struct msg_params {
> size_t msg_len;
> };
>
> +enum st_cldb {
> + ST_CLDB_INIT,
> + ST_CLDB_OPEN,
> + ST_CLDB_ACTIVE,
> + ST_CLDB_MASTER,
> + ST_CLDB_SLAVE,
> + ST_CLDBNUM
> +};
> +
> struct server_stats {
> unsigned long poll; /* number polls */
> unsigned long event; /* events dispatched */
> @@ -114,6 +123,17 @@ struct server {
> int pid_fd;
>
> char *port; /* bind port */
> + unsigned short rep_port; /* db4 replication port */
> +
> + char *myhost;
> + char *force_myhost;
> + GList *rep_remotes;
> +
> + unsigned int n_peers; /* total peers in cell */
> +
> + int rep_pipe[2];
> +
> + enum st_cldb state_cldb, state_cldb_new;
>
> struct cldb cldb; /* database info */
>
> diff --git a/server/cldb.c b/server/cldb.c
> index 3e7c95c..254decd 100644
> --- a/server/cldb.c
> +++ b/server/cldb.c
> @@ -25,8 +25,6 @@
> #include <glib.h>
> #include "cld.h"
>
> -static int cldb_up(struct cldb *cldb, unsigned int flags);
> -
> /*
> * db4 page sizes for our various databases. Filesystem block size
> * is recommended, so 4096 was chosen (default ext3 block size).
> @@ -202,6 +200,30 @@ err_out:
> return -EIO;
> }
>
> +static int add_remote_sites(DB_ENV *dbenv, GList *remotes, int *nsites)
> +{
> + int rc;
> + struct db_remote *rp;
> + GList *tmp;
> +
> + *nsites = 0;
> + for (tmp = remotes; tmp; tmp = tmp->next) {
> + rp = tmp->data;
> +
> + rc = dbenv->repmgr_add_remote_site(dbenv, rp->host, rp->port,
> + NULL, 0);
> + if (rc) {
> + dbenv->err(dbenv, rc,
> + "dbenv->add.remote.site host %s port %u",
> + rp->host, rp->port);
> + return rc;
> + }
> + (*nsites)++;
> + }
> +
> + return 0;
> +}
> +
> static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
> {
> struct cldb *cldb = dbenv->app_private;
> @@ -229,12 +251,13 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
>
> int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
> unsigned int env_flags, const char *errpfx, bool do_syslog,
> - unsigned int flags, void (*cb)(enum db_event))
> + GList *remotes, char *rep_host, unsigned short rep_port,
> + int n_peers, void (*cb)(enum db_event))
> {
> - int rc;
> + int rc, nsites = 0;
> DB_ENV *dbenv;
>
> - cldb->is_master = true;
> + cldb->is_master = false;
> cldb->home = db_home;
> cldb->state_cb = cb;
>
> @@ -281,25 +304,55 @@ int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
> cldb->keyed = true;
> }
>
> + rc = dbenv->repmgr_set_local_site(dbenv, rep_host, rep_port, 0);
> + if (rc) {
> + dbenv->err(dbenv, rc, "dbenv->set_local_site");
> + goto err_out;
> + }
> +
> rc = dbenv->set_event_notify(dbenv, db4_event);
> if (rc) {
> dbenv->err(dbenv, rc, "dbenv->set_event_notify");
> goto err_out;
> }
>
> + rc = dbenv->rep_set_priority(dbenv, 100);
> + if (rc) {
> + dbenv->err(dbenv, rc, "dbenv->rep_set_priority");
> + goto err_out;
> + }
> +
> + rc = dbenv->rep_set_nsites(dbenv, n_peers);
> + if (rc) {
> + dbenv->err(dbenv, rc, "dbenv->rep_set_nsites");
> + goto err_out;
> + }
> +
> + rc = dbenv->repmgr_set_ack_policy(dbenv, DB_REPMGR_ACKS_QUORUM);
> + if (rc) {
> + dbenv->err(dbenv, rc, "dbenv->rep_ack_policy");
> + goto err_out;
> + }
> +
> /* init DB transactional environment, stored in directory db_home */
> env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
> - env_flags |= DB_INIT_TXN;
> + env_flags |= DB_INIT_TXN | DB_INIT_REP;
> rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
> if (rc) {
> dbenv->err(dbenv, rc, "dbenv->open");
> goto err_out;
> }
>
> - rc = cldb_up(cldb, flags);
> + rc = add_remote_sites(dbenv, remotes, &nsites);
> if (rc)
> goto err_out;
>
> + rc = dbenv->repmgr_start(dbenv, 2, DB_REP_ELECTION);
> + if (rc) {
> + dbenv->err(dbenv, rc, "dbenv->repmgr_start");
> + goto err_out;
> + }
> +
> return 0;
>
> err_out:
> @@ -310,7 +363,7 @@ err_out:
> /*
> * open databases
> */
> -static int cldb_up(struct cldb *cldb, unsigned int flags)
> +int cldb_up(struct cldb *cldb, unsigned int flags)
> {
> DB_ENV *dbenv = cldb->env;
> int rc;
> diff --git a/server/cldb.h b/server/cldb.h
> index d28f732..f8f26db 100644
> --- a/server/cldb.h
> +++ b/server/cldb.h
> @@ -107,6 +107,11 @@ enum db_event {
> CLDB_EV_NONE, CLDB_EV_CLIENT, CLDB_EV_MASTER, CLDB_EV_ELECTED
> };
>
> +struct db_remote { /* remotes for cldb_init */
> + char *host;
> + unsigned short port;
> +};
> +
> struct cldb {
> bool is_master;
> bool keyed; /* using encryption? */
> @@ -133,7 +138,9 @@ struct cldb {
>
> extern int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
> unsigned int env_flags, const char *errpfx, bool do_syslog,
> - unsigned int flags, void (*cb)(enum db_event));
> + GList *remotes, char *rep_host, unsigned short rep_port,
> + int n_peers, void (*cb)(enum db_event));
> +extern int cldb_up(struct cldb *cldb, unsigned int flags);
> extern void cldb_down(struct cldb *cldb);
> extern void cldb_fini(struct cldb *cldb);
>
> diff --git a/server/cldbadm.c b/server/cldbadm.c
> index 37e8e36..9342f66 100644
> --- a/server/cldbadm.c
> +++ b/server/cldbadm.c
> @@ -78,7 +78,8 @@ int main(int argc, char *argv[])
> }
>
> if (cldb_init(&cld_adm.cldb, cld_adm.data_dir, NULL,
> - DB_RECOVER, "cldbadm", false, 0, NULL))
> + DB_RECOVER, "cldbadm", false,
> + NULL, NULL, 0, 0, NULL))
> goto err_dbopen;
>
> switch (cld_adm.mode) {
> @@ -142,8 +143,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state)
> * Stubs for contents of cldb.c
> */
> int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
> - unsigned int env_flags, const char *errpfx, bool do_syslog,
> - unsigned int flags, void (*cb)(enum db_event))
> + unsigned int env_flags, const char *errpfx, bool do_syslog,
> + GList *remotes, char *rep_host, unsigned short rep_port,
> + int n_peers, void (*cb)(enum db_event))
> {
>
> return 0;
> diff --git a/server/server.c b/server/server.c
> index 02e6231..fb51a42 100644
> --- a/server/server.c
> +++ b/server/server.c
> @@ -29,6 +29,7 @@
> #include <errno.h>
> #include <syslog.h>
> #include <locale.h>
> +#include <ctype.h>
> #include <argp.h>
> #include <netdb.h>
> #include <signal.h>
> @@ -46,6 +47,12 @@ const char *argp_program_version = PACKAGE_VERSION;
>
> enum {
> CLD_RAW_MSG_SZ = 4096,
> +
> + CLD_DEF_REP_PORT = 9081,
> +
> + CLD_DEF_PEERS = 5,
> + CLD_MIN_PEERS = 3,
> + CLD_MAX_PEERS = 400, /* arbitrary "sanity" limit */
> };
>
> static struct argp_option options[] = {
> @@ -58,10 +65,18 @@ static struct argp_option options[] = {
> "Switch the log to standard error" },
> { "foreground", 'F', NULL, 0,
> "Run in foreground, do not fork" },
> + { "myhost", 'm', "HOST", 0,
> + "Force local hostname to HOST (def: autodetect)" },
> { "port", 'p', "PORT", 0,
> "bind to UDP port PORT. Default: " CLD_DEF_PORT },
> { "pid", 'P', "FILE", 0,
> "Write daemon process id to FILE. Default: " CLD_DEF_PIDFN },
> + { "rep-port", 'r', "PORT", 0,
> + "bind replication engine to port PORT (def: 9081)" },
> + { "remote", 'R', "HOST:PORT", 0,
> + "Add a HOST:PORT pair to list of remote hosts. Use this argument multiple times to build cell's peer list." },
> + { "cell-size", 'S', "PEERS", 0,
> + "Total number of PEERS in cell. (PEERS/2)+1 required for quorum. Must be an odd number (def: 5)" },
> { }
> };
>
> @@ -79,10 +94,15 @@ static bool use_syslog = true;
> int debugging = 0;
> struct timeval current_time;
>
> +static const char *state_name_cldb[ST_CLDBNUM] = {
> + "Init", "Open", "Active", "Master", "Slave"
> +};
> struct server cld_srv = {
> - .data_dir = CLD_DEF_DATADIR,
> - .pid_file = CLD_DEF_PIDFN,
> + .data_dir = "/spare/tmp/cld/lib",
> + .pid_file = "/var/run/cld.pid",
> .port = CLD_DEF_PORT,
> + .rep_port = CLD_DEF_REP_PORT,
> + .n_peers = CLD_DEF_PEERS,
> };
>
> static void ensure_root(void);
> @@ -108,6 +128,33 @@ void cldlog(int prio, const char *fmt, ...)
> va_end(ap);
> }
>
> +/*
> + * Find out own hostname.
> + * This is needed for:
> + * - finding the local domain and its SRV records
> + * Do this before our state machines start ticking, so we can quit with
> + * a meaningful message easily.
> + */
> +static char *get_hostname(void)
> +{
> + enum { hostsz = 64 };
> + char hostb[hostsz];
> + char *ret;
> +
> + if (gethostname(hostb, hostsz-1) < 0) {
> + cldlog(LOG_ERR, "get_hostname: gethostname error (%d): %s",
> + errno, strerror(errno));
> + exit(1);
> + }
> + hostb[hostsz-1] = 0;
> + if ((ret = strdup(hostb)) == NULL) {
> + cldlog(LOG_ERR, "get_hostname: no core (%ld)",
> + (long)strlen(hostb));
> + exit(1);
> + }
> + return ret;
> +}
> +
> int udp_tx(struct server_socket *sock, struct sockaddr *addr,
> socklen_t addr_len, const void *data, size_t data_len)
> {
> @@ -484,6 +531,55 @@ static void cldb_checkpoint(struct timer *timer)
> add_chkpt_timer();
> }
>
> +static void cldb_state_cb(enum db_event event)
> +{
> +
> + switch (event) {
> + case CLDB_EV_ELECTED:
> + /*
> + * Safe to stop ignoring bogus client indication,
> + * so unmute us by advancing the state.
> + */
> + if (cld_srv.state_cldb == ST_CLDB_OPEN)
> + cld_srv.state_cldb = ST_CLDB_ACTIVE;
> + break;
> + case CLDB_EV_CLIENT:
> + case CLDB_EV_MASTER:
> + /*
> + * This callback runs on the context of the replication
> + * manager thread, and calling any of our functions thus
> + * turns our program into a multi-threaded one. Instead
> + * we do a loopbreak and postpone the processing.
> + */
> + if (cld_srv.state_cldb != ST_CLDB_INIT &&
> + cld_srv.state_cldb != ST_CLDB_OPEN) {
> + char c = 0x42;
> +
> + if (event == CLDB_EV_MASTER)
> + cld_srv.state_cldb_new = ST_CLDB_MASTER;
> + else
> + cld_srv.state_cldb_new = ST_CLDB_SLAVE;
> + if (debugging) {
> + cldlog(LOG_DEBUG, "CLDB state > %s",
> + state_name_cldb[cld_srv.state_cldb_new]);
> + }
> +
> + /* wake up main loop */
> + write(cld_srv.rep_pipe[1], &c, 1);
> + }
> + break;
> + default:
> + cldlog(LOG_WARNING, "API confusion with CLDB, event 0x%x", event);
> + cld_srv.state_cldb = ST_CLDB_OPEN; /* wrong, stub for now */
> + cld_srv.state_cldb_new = ST_CLDB_INIT;
> + }
> +}
> +
> +static bool noop_event(int fd, short events, void *userdata)
> +{
> + return true; /* continue main loop; do NOT terminate server */
> +}
> +
> static int net_open(void)
> {
> int ipv6_found;
> @@ -575,6 +671,32 @@ err_addr:
> return rc;
> }
>
> +static void cldb_state_process(enum st_cldb new_state)
> +{
> + unsigned int db_flags;
> +
> + if ((new_state == ST_CLDB_MASTER || new_state == ST_CLDB_SLAVE) &&
> + cld_srv.state_cldb == ST_CLDB_ACTIVE) {
> +
> + db_flags = DB_CREATE | DB_THREAD;
> + if (cldb_up(&cld_srv.cldb, db_flags))
> + return;
> +
> + ensure_root();
> +
> + if (sess_load(cld_srv.sessions) != 0) {
> + cldlog(LOG_ERR, "session load failed. FIXME: I want error handling");
> + return;
> + }
> +
> + add_chkpt_timer();
> + } else {
> + if (debugging)
> + cldlog(LOG_DEBUG, "unhandled state transition %d -> %d",
> + cld_srv.state_cldb, new_state);
> + }
> +}
> +
> static void segv_signal(int signal)
> {
> cldlog(LOG_ERR, "SIGSEGV");
> @@ -598,10 +720,59 @@ static void stats_dump(void)
> {
> X(poll);
> X(event);
> + cldlog(LOG_INFO, "State: CLDB %s",
> + state_name_cldb[cld_srv.state_cldb]);
> }
>
> #undef X
>
> +static bool add_remote(const char *arg)
> +{
> + size_t arg_len = strlen(arg);
> + int i, port;
> + struct db_remote *rp;
> + char *s_port, *colon;
> +
> + if (!arg_len)
> + return false;
> +
> + /* verify no whitespace in input */
> + for (i = 0; i < arg_len; i++)
> + if (isspace(arg[i]))
> + return false;
> +
> + /* find colon delimiter */
> + colon = strchr(arg, ':');
> + if (!colon || (colon == arg))
> + return false;
> + s_port = colon + 1;
> +
> + /* parse replication port number */
> + port = atoi(s_port);
> + if (port < 1 || port > 65535)
> + return false;
> +
> + /* alloc and fill in remote-host record */
> + rp = malloc(sizeof(*rp));
> + if (!rp)
> + return false;
> +
> + rp->port = port;
> + rp->host = strdup(arg);
> + if (!rp->host) {
> + free(rp);
> + return false;
> + }
> +
> + /* truncate string down to simply hostname portion */
> + rp->host[colon - arg] = 0;
> +
> + /* add remote host to global list */
> + cld_srv.rep_remotes = g_list_append(cld_srv.rep_remotes, rp);
> +
> + return true;
> +}
> +
> static error_t parse_opt (int key, char *arg, struct argp_state *state)
> {
> switch(key) {
> @@ -622,6 +793,15 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
> case 'F':
> cld_srv.flags |= SFL_FOREGROUND;
> break;
> + case 'm':
> + if ((strlen(arg) > 3) && (strlen(arg) < 64) &&
> + (strchr(arg, '.')))
> + cld_srv.force_myhost = arg;
> + else {
> + fprintf(stderr, "invalid myhost: '%s'\n", arg);
> + argp_usage(state);
> + }
> + break;
> case 'p':
> if (atoi(arg) > 0 && atoi(arg) < 65536)
> cld_srv.port = arg;
> @@ -633,6 +813,31 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
> case 'P':
> cld_srv.pid_file = arg;
> break;
> + case 'r':
> + if (atoi(arg) > 0 && atoi(arg) < 65536)
> + cld_srv.rep_port = atoi(arg);
> + else {
> + fprintf(stderr, "invalid rep-port: '%s'\n", arg);
> + argp_usage(state);
> + }
> + break;
> + case 'R':
> + if (!add_remote(arg)) {
> + fprintf(stderr, "invalid remote host:port: '%s'\n", arg);
> + argp_usage(state);
> + }
> + break;
> + case 'S': {
> + int n_peers = atoi(arg);
> + if ((n_peers >= CLD_MIN_PEERS) && (n_peers < CLD_MAX_PEERS) &&
> + (n_peers & 0x01))
> + cld_srv.n_peers = atoi(arg);
> + else {
> + fprintf(stderr, "invalid peer count: '%s'\n", arg);
> + argp_usage(state);
> + }
> + break;
> + }
> case ARGP_KEY_ARG:
> argp_usage(state); /* too many args */
> break;
> @@ -648,9 +853,12 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
> int main (int argc, char *argv[])
> {
> error_t aprc;
> - int rc = 1;
> + int rc = 1, env_flags;
> time_t next_timeout;
>
> + cld_srv.state_cldb =
> + cld_srv.state_cldb_new = ST_CLDB_INIT;
> +
> /* isspace() and strcasecmp() consistency requires this */
> setlocale(LC_ALL, "C");
>
> @@ -674,6 +882,20 @@ int main (int argc, char *argv[])
> if (use_syslog)
> openlog(PROGRAM_NAME, LOG_PID, LOG_LOCAL3);
>
> + if (cld_srv.force_myhost)
> + cld_srv.myhost = strdup(cld_srv.force_myhost);
> + else
> + cld_srv.myhost = get_hostname();
> +
> + if (debugging)
> + cldlog(LOG_DEBUG, "our hostname: %s", cld_srv.myhost);
> +
> + /* remotes file should list all in peer group, except for us */
> + if ((cld_srv.n_peers - 1) != g_list_length(cld_srv.rep_remotes)) {
> + cldlog(LOG_ERR, "n_peers does not match remotes file loaded");
> + goto err_out;
> + }
> +
> if (!(cld_srv.flags & SFL_FOREGROUND) && (daemon(1, !use_syslog) < 0)) {
> syslogerr("daemon");
> goto err_out;
> @@ -694,16 +916,7 @@ int main (int argc, char *argv[])
> signal(SIGTERM, term_signal);
> signal(SIGUSR1, stats_signal);
>
> - if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL,
> - DB_CREATE | DB_THREAD | DB_RECOVER,
> - "cld", use_syslog,
> - DB_CREATE | DB_THREAD, NULL))
> - exit(1);
> -
> - ensure_root();
> -
> timer_init(&cld_srv.chkpt_timer, cldb_checkpoint, NULL);
> - add_chkpt_timer();
>
> rc = 1;
>
> @@ -716,17 +929,53 @@ int main (int argc, char *argv[])
> !cld_srv.polls)
> goto err_out_pid;
>
> - if (sess_load(cld_srv.sessions) != 0)
> - goto err_out_pid;
> + /* init pipe for replication manager notifications to us */
> + if (pipe(cld_srv.rep_pipe) < 0) {
> + syslogerr("pipe");
> + goto err_out;
> + }
>
> /* set up server networking */
> rc = net_open();
> if (rc)
> goto err_out_pid;
>
> + {
> + struct pollfd pfd;
> + struct server_poll sp;
> +
> + /*
> + * add pipe to poll list, after doing so with our net sockets
> + */
> + sp.fd = cld_srv.rep_pipe[0];
> + sp.cb = noop_event;
> + sp.userdata = NULL;
> + g_array_append_val(cld_srv.poll_data, sp);
> +
> + pfd.fd = cld_srv.rep_pipe[0];
> + pfd.events = POLLIN;
> + pfd.revents = 0;
> + g_array_append_val(cld_srv.polls, pfd);
> + }
> +
> + env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
> + if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL,
> + env_flags, "cld", true,
> + cld_srv.rep_remotes,
> + cld_srv.myhost, cld_srv.rep_port,
> + cld_srv.n_peers, cldb_state_cb)) {
> + cldlog(LOG_ERR, "Failed to open CLDB, limping");
> + } else {
> + cld_srv.state_cldb =
> + cld_srv.state_cldb_new = ST_CLDB_OPEN;
> + }
> +
> cldlog(LOG_INFO, "initialized: cport %s, dbg %u",
> cld_srv.port,
> debugging);
> + cldlog(LOG_INFO, "replication: %s:%u",
> + cld_srv.myhost,
> + cld_srv.rep_port);
>
> next_timeout = timers_run();
>
> @@ -789,13 +1038,20 @@ int main (int argc, char *argv[])
> }
>
> next_timeout = timers_run();
> +
> + if (cld_srv.state_cldb_new != ST_CLDB_INIT &&
> + cld_srv.state_cldb_new != cld_srv.state_cldb) {
> + cldb_state_process(cld_srv.state_cldb_new);
> + cld_srv.state_cldb = cld_srv.state_cldb_new;
> + }
> }
>
> cldlog(LOG_INFO, "shutting down");
>
> if (cld_srv.cldb.up)
> cldb_down(&cld_srv.cldb);
> - cldb_fini(&cld_srv.cldb);
> + if (cld_srv.state_cldb >= ST_CLDB_OPEN)
> + cldb_fini(&cld_srv.cldb);
>
> rc = 0;
>
> diff --git a/test/pid-exists b/test/pid-exists
> index 351b4f1..4fa2275 100755
> --- a/test/pid-exists
> +++ b/test/pid-exists
> @@ -1,9 +1,12 @@
> #!/bin/sh
>
> -if [ ! -f cld.pid ]
> -then
> - echo "pid file not found."
> - exit 1
> -fi
> +for n in 1 2 3
> +do
> + if [ ! -f cld$n.pid ]
> + then
> + echo "cld$n.pid not found."
> + exit 1
> + fi
> +done
>
> exit 0
> diff --git a/test/prep-db b/test/prep-db
> index 353ca4a..3e4fb60 100755
> --- a/test/prep-db
> +++ b/test/prep-db
> @@ -2,13 +2,16 @@
>
> DATADIR=data
>
> -mkdir -p $DATADIR
> -
> -if [ ! -d $DATADIR ]
> -then
> - rm -rf $DATADIR
> - echo "test database dir not found."
> - exit 1
> -fi
> +for n in 1 2 3
> +do
> + mkdir -p $DATADIR/n$n/data
> +
> + if [ ! -d $DATADIR/n$n/data ]
> + then
> + rm -rf $DATADIR
> + echo "test database dir for node $n not found."
> + exit 1
> + fi
> +done
>
> exit 0
> diff --git a/test/start-daemon b/test/start-daemon
> index 4cb9fd7..06b3250 100755
> --- a/test/start-daemon
> +++ b/test/start-daemon
> @@ -1,13 +1,31 @@
> #!/bin/sh
>
> -if [ -f cld.pid ]
> +if [ -f cld1.pid -o -f cld2.pid -o -f cld3.pid ]
> then
> - echo "pid file found. daemon still running?"
> + echo "pid file found. daemons still running?"
> exit 1
> fi
>
> -../server/cld -P cld.pid -d "$PWD/data" -p 18181 -E
> +../server/cld -d "$PWD/data/n1/data" -p 18181 -r 19181 -P cld1.pid -E \
> + -D 2 -S 3 \
> + -m localhost.localdomain \
> + -R localhost.localdomain:19182 \
> + -R localhost.localdomain:19183
>
> -sleep 3
> +../server/cld -d "$PWD/data/n2/data" -p 18182 -r 19182 -P cld2.pid -E \
> + -D 2 -S 3 \
> + -m localhost.localdomain \
> + -R localhost.localdomain:19181 \
> + -R localhost.localdomain:19183
> +
> +../server/cld -d "$PWD/data/n3/data" -p 18183 -r 19183 -P cld3.pid -E \
> + -D 2 -S 3 \
> + -m localhost.localdomain \
> + -R localhost.localdomain:19181 \
> + -R localhost.localdomain:19182
> +sleep 1
> +
> +echo " start-daemon: Waiting 20s, for daemons to start up..."
> +sleep 20
>
> exit 0
> diff --git a/test/stop-daemon b/test/stop-daemon
> index 221dc46..d00fda6 100755
> --- a/test/stop-daemon
> +++ b/test/stop-daemon
> @@ -1,23 +1,35 @@
> #!/bin/sh
>
> -if [ ! -f cld.pid ]
> -then
> - echo no daemon pid file found.
> - exit 1
> -fi
> +for n in 1 2 3
> +do
> + if [ ! -f cld$n.pid ]
> + then
> + echo " stop-daemon: cld$n.pid not found."
> + exit 1
> + fi
> +done
> +
>
> -kill `cat cld.pid`
> +kill `cat cld1.pid cld2.pid cld3.pid`
>
> for ((n = 0; n < 10; n++))
> do
> - if [ ! -f cld.pid ]
> + if [ -f cld1.pid -o -f cld2.pid -o -f cld3.pid ]
> then
> + sleep 1
> + else
> exit 0
> fi
>
> - sleep 1
> done
>
> -echo "PID file not removed, after signal sent."
> -rm -f cld.pid
> +for n in 1 2 3
> +do
> + if [ -f cld$n.pid ]
> + then
> + echo " stop-daemon: cld$n.pid found, after signal sent."
> + fi
> +done
> +
> +rm -f cld?.pid
> exit 1
> --
> To unsubscribe from this list: send the line "unsubscribe hail-devel" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at http://vger.kernel.org/majordomo-info.html
>
>
^ permalink raw reply [flat|nested] 7+ messages in thread
* Re: [PATCH v1] CLD replication (WIP)
2009-07-31 18:14 ` Sage Weil
@ 2009-07-31 18:38 ` Jeff Garzik
2009-07-31 18:52 ` Jeff Garzik
2009-07-31 19:44 ` Sage Weil
0 siblings, 2 replies; 7+ messages in thread
From: Jeff Garzik @ 2009-07-31 18:38 UTC (permalink / raw)
To: Sage Weil; +Cc: hail-devel
Sage Weil wrote:
> Hi Jeff,
>
> Do you still plan to replace bdb (and it's replication) with a something
> based on paxos? I'm considering replacing the Ceph monitors (which
> currently implement paxos, but in a very ceph-specific way) with cld if it
> can meet the basic requirements.
>
> What I'd kind of like to see is a clean implementation of a paxos
> library--one that leaves out message transport and storage--to build a
> replicated write-ahead log. And then a separate library for handling the
> database/namespace served up by cld (be it regular files, bdb, whatever)
> that leaves replication up to paxos. It looks like Google ended up doing
> something similar with Chubby (see
> http://labs.google.com/papers/paxos_made_live.html).
>
> Does this sound like the direction you guys are heading in?
You mean something like http://linux.yyz.us/misc/paxreg.c ? :)
Yes, a straight PAXOS implementation is definitely in the plans, for
similar reasons as Google describes: CLD just doesn't need full db4
replicated transactions, when a PAXOS replicated, write-ahead logging
database would work just fine.
That said, the urgency of this task is lowered, because current db4
(after much trial, tribulation and debugging) did eventually start
supporting PAXOS. The Google paper hints at this, noting
that Sleepycat/Oracle "eventually" fixed all the problems -- but by then
it was too late, Google had moved on and done their own db.
Most importantly, from the view of a CLD client (libcldc user), CLD will
provide the necessary guarantees today. When CLD switches to native
PAXOS, the CLD client API will not change at all. So, the switchover
should be transparent from the client's point of view.
Jeff
^ permalink raw reply [flat|nested] 7+ messages in thread
* Re: [PATCH v1] CLD replication (WIP)
2009-07-31 18:38 ` Jeff Garzik
@ 2009-07-31 18:52 ` Jeff Garzik
2009-07-31 19:44 ` Sage Weil
1 sibling, 0 replies; 7+ messages in thread
From: Jeff Garzik @ 2009-07-31 18:52 UTC (permalink / raw)
To: Sage Weil; +Cc: hail-devel
Jeff Garzik wrote:
> Sage Weil wrote:
>> Hi Jeff,
>>
>> Do you still plan to replace bdb (and its replication) with
>> something based on paxos? I'm considering replacing the Ceph monitors
>> (which currently implement paxos, but in a very ceph-specific way)
>> with cld if it can meet the basic requirements.
>>
>> What I'd kind of like to see is a clean implementation of a paxos
>> library--one that leaves out message transport and storage--to build a
>> replicated write-ahead log. And then a separate library for handling
>> the database/namespace served up by cld (be it regular files, bdb,
>> whatever) that leaves replication up to paxos. It looks like Google
>> ended up doing something similar with Chubby (see
>> http://labs.google.com/papers/paxos_made_live.html).
>>
>> Does this sound like the direction you guys are heading in?
>
> You mean something like http://linux.yyz.us/misc/paxreg.c ? :)
Oh, and the paper related to this is "The Paxos Register",
http://www.cs.utexas.edu/~harry/papers/Li07Paxos.pdf
Jeff
^ permalink raw reply [flat|nested] 7+ messages in thread
* Re: [PATCH v1] CLD replication (WIP)
2009-07-31 18:38 ` Jeff Garzik
2009-07-31 18:52 ` Jeff Garzik
@ 2009-07-31 19:44 ` Sage Weil
1 sibling, 0 replies; 7+ messages in thread
From: Sage Weil @ 2009-07-31 19:44 UTC (permalink / raw)
To: Jeff Garzik; +Cc: hail-devel
On Fri, 31 Jul 2009, Jeff Garzik wrote:
> Sage Weil wrote:
> > Hi Jeff,
> >
> > Do you still plan to replace bdb (and its replication) with something
> > based on paxos? I'm considering replacing the Ceph monitors (which
> > currently implement paxos, but in a very ceph-specific way) with cld if it
> > can meet the basic requirements.
> >
> > What I'd kind of like to see is a clean implementation of a paxos
> > library--one that leaves out message transport and storage--to build a
> > replicated write-ahead log. And then a separate library for handling the
> > database/namespace served up by cld (be it regular files, bdb, whatever)
> > that leaves replication up to paxos. It looks like Google ended up doing
> > something similar with Chubby (see
> > http://labs.google.com/papers/paxos_made_live.html).
> >
> > Does this sound like the direction you guys are heading in?
>
> You mean something like http://linux.yyz.us/misc/paxreg.c ? :)
Yeah, for starters. I'm thinking of the larger problem of integrating
master elections, adding leasing and timeouts (to avoid querying peers for
reads), and so forth, to make core paxos usable in a practical environment.
And the glue to bind it to the database (snapshotting and log trimming,
catch-up, etc.).
> Yes, a straight PAXOS implementation is definitely in the plans, for similar
> reasons as Google describes: CLD just doesn't need full db4 replicated
> transactions, when a PAXOS replicated, write-ahead logging database would work
> just fine.
>
> That said, the urgency of this task is lowered, because current db4 (after
> much trial, tribulation and debugging) did eventually start supporting PAXOS.
> Some hints in the Google paper point to this, noting that Sleepycat/Oracle
> "eventually" fixed all the problems -- but by then it was too late, Google had
> moved on and done their own db.
Oh okay, that makes db4 entirely usable today then!
> Most importantly, from the view of a CLD client (libcldc user), CLD will
> provide the necessary guarantees today. When CLD switches to native PAXOS,
> the CLD client API will not change at all. So, the switchover should be
> transparent from the client's point of view.
Of course. :)
I'm going to look a bit more closely at what it'll take to move ceph to
cld, then. Among other things, it'll mean putting part of cldc in the kernel,
but it should be a net architectural improvement.
sage
^ permalink raw reply [flat|nested] 7+ messages in thread
* [PATCH v2] CLD replication (WIP)
2009-07-31 11:15 [PATCH v1] CLD replication (WIP) Jeff Garzik
@ 2009-08-06 5:49 ` Jeff Garzik
0 siblings, 0 replies; 7+ messages in thread
From: Jeff Garzik @ 2009-08-06 5:49 UTC (permalink / raw)
To: Project Hail
[-- Attachment #1: Type: text/plain, Size: 1070 bytes --]
Attached is the current CLD replication patch, which takes CLD from
being a single-node service to a fully replicated, highly available service.
The server implementation should be complete.
The current merge blocker is missing code in libcldc, which does not yet
properly "hunt" for a master, among a group of peer CLD replicas in a
CLD cell.
This will be a big milestone for CLD, when merged. The next milestone
will be adding the needed strict-cache-coherence caching semantics to
CLD server and client lib.
This patch was generated against git commit
924b04dde2792c19dbace6e66aae456eff8a8829.
Changes since last posting:
- rediff for current upstream
> server/cld.h | 20 +++
> server/cldb.c | 69 +++++++++++--
> server/cldb.h | 9 +
> server/cldbadm.c | 8 -
> server/server.c | 286 +++++++++++++++++++++++++++++++++++++++++++++++++++---
> test/pid-exists | 13 +-
> test/prep-db | 19 ++-
> test/start-daemon | 26 ++++
> test/stop-daemon | 16 +--
> 9 files changed, 414 insertions(+), 52 deletions(-)
[-- Attachment #2: patch.cld-rep --]
[-- Type: text/plain, Size: 19162 bytes --]
diff --git a/server/cld.h b/server/cld.h
index 21f103d..08e6b12 100644
--- a/server/cld.h
+++ b/server/cld.h
@@ -91,6 +91,15 @@ struct msg_params {
size_t msg_len;
};
+enum st_cldb {
+ ST_CLDB_INIT,
+ ST_CLDB_OPEN,
+ ST_CLDB_ACTIVE,
+ ST_CLDB_MASTER,
+ ST_CLDB_SLAVE,
+ ST_CLDBNUM
+};
+
struct server_stats {
unsigned long poll; /* number polls */
unsigned long event; /* events dispatched */
@@ -114,6 +123,17 @@ struct server {
int pid_fd;
char *port; /* bind port */
+ unsigned short rep_port; /* db4 replication port */
+
+ char *myhost;
+ char *force_myhost;
+ GList *rep_remotes;
+
+ unsigned int n_peers; /* total peers in cell */
+
+ int rep_pipe[2];
+
+ enum st_cldb state_cldb, state_cldb_new;
struct cldb cldb; /* database info */
diff --git a/server/cldb.c b/server/cldb.c
index 3e7c95c..254decd 100644
--- a/server/cldb.c
+++ b/server/cldb.c
@@ -25,8 +25,6 @@
#include <glib.h>
#include "cld.h"
-static int cldb_up(struct cldb *cldb, unsigned int flags);
-
/*
* db4 page sizes for our various databases. Filesystem block size
* is recommended, so 4096 was chosen (default ext3 block size).
@@ -202,6 +200,30 @@ err_out:
return -EIO;
}
+static int add_remote_sites(DB_ENV *dbenv, GList *remotes, int *nsites)
+{
+ int rc;
+ struct db_remote *rp;
+ GList *tmp;
+
+ *nsites = 0;
+ for (tmp = remotes; tmp; tmp = tmp->next) {
+ rp = tmp->data;
+
+ rc = dbenv->repmgr_add_remote_site(dbenv, rp->host, rp->port,
+ NULL, 0);
+ if (rc) {
+ dbenv->err(dbenv, rc,
+ "dbenv->add.remote.site host %s port %u",
+ rp->host, rp->port);
+ return rc;
+ }
+ (*nsites)++;
+ }
+
+ return 0;
+}
+
static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
{
struct cldb *cldb = dbenv->app_private;
@@ -229,12 +251,13 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
unsigned int env_flags, const char *errpfx, bool do_syslog,
- unsigned int flags, void (*cb)(enum db_event))
+ GList *remotes, char *rep_host, unsigned short rep_port,
+ int n_peers, void (*cb)(enum db_event))
{
- int rc;
+ int rc, nsites = 0;
DB_ENV *dbenv;
- cldb->is_master = true;
+ cldb->is_master = false;
cldb->home = db_home;
cldb->state_cb = cb;
@@ -281,25 +304,55 @@ int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
cldb->keyed = true;
}
+ rc = dbenv->repmgr_set_local_site(dbenv, rep_host, rep_port, 0);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->set_local_site");
+ goto err_out;
+ }
+
rc = dbenv->set_event_notify(dbenv, db4_event);
if (rc) {
dbenv->err(dbenv, rc, "dbenv->set_event_notify");
goto err_out;
}
+ rc = dbenv->rep_set_priority(dbenv, 100);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->rep_set_priority");
+ goto err_out;
+ }
+
+ rc = dbenv->rep_set_nsites(dbenv, n_peers);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->rep_set_nsites");
+ goto err_out;
+ }
+
+ rc = dbenv->repmgr_set_ack_policy(dbenv, DB_REPMGR_ACKS_QUORUM);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->rep_ack_policy");
+ goto err_out;
+ }
+
/* init DB transactional environment, stored in directory db_home */
env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
- env_flags |= DB_INIT_TXN;
+ env_flags |= DB_INIT_TXN | DB_INIT_REP;
rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
if (rc) {
dbenv->err(dbenv, rc, "dbenv->open");
goto err_out;
}
- rc = cldb_up(cldb, flags);
+ rc = add_remote_sites(dbenv, remotes, &nsites);
if (rc)
goto err_out;
+ rc = dbenv->repmgr_start(dbenv, 2, DB_REP_ELECTION);
+ if (rc) {
+ dbenv->err(dbenv, rc, "dbenv->repmgr_start");
+ goto err_out;
+ }
+
return 0;
err_out:
@@ -310,7 +363,7 @@ err_out:
/*
* open databases
*/
-static int cldb_up(struct cldb *cldb, unsigned int flags)
+int cldb_up(struct cldb *cldb, unsigned int flags)
{
DB_ENV *dbenv = cldb->env;
int rc;
diff --git a/server/cldb.h b/server/cldb.h
index d28f732..f8f26db 100644
--- a/server/cldb.h
+++ b/server/cldb.h
@@ -107,6 +107,11 @@ enum db_event {
CLDB_EV_NONE, CLDB_EV_CLIENT, CLDB_EV_MASTER, CLDB_EV_ELECTED
};
+struct db_remote { /* remotes for cldb_init */
+ char *host;
+ unsigned short port;
+};
+
struct cldb {
bool is_master;
bool keyed; /* using encryption? */
@@ -133,7 +138,9 @@ struct cldb {
extern int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
unsigned int env_flags, const char *errpfx, bool do_syslog,
- unsigned int flags, void (*cb)(enum db_event));
+ GList *remotes, char *rep_host, unsigned short rep_port,
+ int n_peers, void (*cb)(enum db_event));
+extern int cldb_up(struct cldb *cldb, unsigned int flags);
extern void cldb_down(struct cldb *cldb);
extern void cldb_fini(struct cldb *cldb);
diff --git a/server/cldbadm.c b/server/cldbadm.c
index 37e8e36..9342f66 100644
--- a/server/cldbadm.c
+++ b/server/cldbadm.c
@@ -78,7 +78,8 @@ int main(int argc, char *argv[])
}
if (cldb_init(&cld_adm.cldb, cld_adm.data_dir, NULL,
- DB_RECOVER, "cldbadm", false, 0, NULL))
+ DB_RECOVER, "cldbadm", false,
+ NULL, NULL, 0, 0, NULL))
goto err_dbopen;
switch (cld_adm.mode) {
@@ -142,8 +143,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state)
* Stubs for contents of cldb.c
*/
int cldb_init(struct cldb *cldb, const char *db_home, const char *db_password,
- unsigned int env_flags, const char *errpfx, bool do_syslog,
- unsigned int flags, void (*cb)(enum db_event))
+ unsigned int env_flags, const char *errpfx, bool do_syslog,
+ GList *remotes, char *rep_host, unsigned short rep_port,
+ int n_peers, void (*cb)(enum db_event))
{
return 0;
diff --git a/server/server.c b/server/server.c
index b6e1332..9e15284 100644
--- a/server/server.c
+++ b/server/server.c
@@ -29,6 +29,7 @@
#include <errno.h>
#include <syslog.h>
#include <locale.h>
+#include <ctype.h>
#include <argp.h>
#include <netdb.h>
#include <signal.h>
@@ -46,6 +47,12 @@ const char *argp_program_version = PACKAGE_VERSION;
enum {
CLD_RAW_MSG_SZ = 4096,
+
+ CLD_DEF_REP_PORT = 9081,
+
+ CLD_DEF_PEERS = 5,
+ CLD_MIN_PEERS = 3,
+ CLD_MAX_PEERS = 400, /* arbitrary "sanity" limit */
};
static struct argp_option options[] = {
@@ -58,10 +65,18 @@ static struct argp_option options[] = {
"Switch the log to standard error" },
{ "foreground", 'F', NULL, 0,
"Run in foreground, do not fork" },
+ { "myhost", 'm', "HOST", 0,
+ "Force local hostname to HOST (def: autodetect)" },
{ "port", 'p', "PORT", 0,
"bind to UDP port PORT. Default: " CLD_DEF_PORT },
{ "pid", 'P', "FILE", 0,
"Write daemon process id to FILE. Default: " CLD_DEF_PIDFN },
+ { "rep-port", 'r', "PORT", 0,
+ "bind replication engine to port PORT (def: 9081)" },
+ { "remote", 'R', "HOST:PORT", 0,
+ "Add a HOST:PORT pair to list of remote hosts. Use this argument multiple times to build cell's peer list." },
+ { "cell-size", 'S', "PEERS", 0,
+ "Total number of PEERS in cell. (PEERS/2)+1 required for quorum. Must be an odd number (def: 5)" },
{ }
};
@@ -79,10 +94,15 @@ static bool use_syslog = true;
int debugging = 0;
struct timeval current_time;
+static const char *state_name_cldb[ST_CLDBNUM] = {
+ "Init", "Open", "Active", "Master", "Slave"
+};
struct server cld_srv = {
- .data_dir = CLD_DEF_DATADIR,
- .pid_file = CLD_DEF_PIDFN,
+ .data_dir = "/spare/tmp/cld/lib",
+ .pid_file = "/var/run/cld.pid",
.port = CLD_DEF_PORT,
+ .rep_port = CLD_DEF_REP_PORT,
+ .n_peers = CLD_DEF_PEERS,
};
static void ensure_root(void);
@@ -108,6 +128,33 @@ void cldlog(int prio, const char *fmt, ...)
va_end(ap);
}
+/*
+ * Find out own hostname.
+ * This is needed for:
+ * - finding the local domain and its SRV records
+ * Do this before our state machines start ticking, so we can quit with
+ * a meaningful message easily.
+ */
+static char *get_hostname(void)
+{
+ enum { hostsz = 64 };
+ char hostb[hostsz];
+ char *ret;
+
+ if (gethostname(hostb, hostsz-1) < 0) {
+ cldlog(LOG_ERR, "get_hostname: gethostname error (%d): %s",
+ errno, strerror(errno));
+ exit(1);
+ }
+ hostb[hostsz-1] = 0;
+ if ((ret = strdup(hostb)) == NULL) {
+ cldlog(LOG_ERR, "get_hostname: no core (%ld)",
+ (long)strlen(hostb));
+ exit(1);
+ }
+ return ret;
+}
+
int udp_tx(struct server_socket *sock, struct sockaddr *addr,
socklen_t addr_len, const void *data, size_t data_len)
{
@@ -486,6 +533,55 @@ static void cldb_checkpoint(struct timer *timer)
add_chkpt_timer();
}
+static void cldb_state_cb(enum db_event event)
+{
+
+ switch (event) {
+ case CLDB_EV_ELECTED:
+ /*
+ * Safe to stop ignoring bogus client indication,
+ * so unmute us by advancing the state.
+ */
+ if (cld_srv.state_cldb == ST_CLDB_OPEN)
+ cld_srv.state_cldb = ST_CLDB_ACTIVE;
+ break;
+ case CLDB_EV_CLIENT:
+ case CLDB_EV_MASTER:
+ /*
+ * This callback runs on the context of the replication
+ * manager thread, and calling any of our functions thus
+ * turns our program into a multi-threaded one. Instead
+ * we do a loopbreak and postpone the processing.
+ */
+ if (cld_srv.state_cldb != ST_CLDB_INIT &&
+ cld_srv.state_cldb != ST_CLDB_OPEN) {
+ char c = 0x42;
+
+ if (event == CLDB_EV_MASTER)
+ cld_srv.state_cldb_new = ST_CLDB_MASTER;
+ else
+ cld_srv.state_cldb_new = ST_CLDB_SLAVE;
+ if (debugging) {
+ cldlog(LOG_DEBUG, "CLDB state > %s",
+ state_name_cldb[cld_srv.state_cldb_new]);
+ }
+
+ /* wake up main loop */
+ write(cld_srv.rep_pipe[1], &c, 1);
+ }
+ break;
+ default:
+ cldlog(LOG_WARNING, "API confusion with CLDB, event 0x%x", event);
+ cld_srv.state_cldb = ST_CLDB_OPEN; /* wrong, stub for now */
+ cld_srv.state_cldb_new = ST_CLDB_INIT;
+ }
+}
+
+static bool noop_event(int fd, short events, void *userdata)
+{
+ return true; /* continue main loop; do NOT terminate server */
+}
+
static int net_open(void)
{
int ipv6_found = 0;
@@ -587,6 +683,33 @@ err_addr:
return rc;
}
+static void cldb_state_process(enum st_cldb new_state)
+{
+ unsigned int db_flags;
+
+ if ((new_state == ST_CLDB_MASTER || new_state == ST_CLDB_SLAVE) &&
+ cld_srv.state_cldb == ST_CLDB_ACTIVE) {
+
+ db_flags = DB_CREATE | DB_THREAD;
+ if (cldb_up(&cld_srv.cldb, db_flags))
+ return;
+
+ ensure_root();
+
+ if (sess_load(cld_srv.sessions) != 0) {
+ cldlog(LOG_ERR, "session load failed. "
+ "FIXME: I want error handling");
+ return;
+ }
+
+ add_chkpt_timer();
+ } else {
+ if (debugging)
+ cldlog(LOG_DEBUG, "unhandled state transition %d -> %d",
+ cld_srv.state_cldb, new_state);
+ }
+}
+
static void segv_signal(int signo)
{
cldlog(LOG_ERR, "SIGSEGV");
@@ -610,10 +733,59 @@ static void stats_dump(void)
{
X(poll);
X(event);
+ cldlog(LOG_INFO, "State: CLDB %s",
+ state_name_cldb[cld_srv.state_cldb]);
}
#undef X
+static bool add_remote(const char *arg)
+{
+ size_t arg_len = strlen(arg);
+ int i, port;
+ struct db_remote *rp;
+ char *s_port, *colon;
+
+ if (!arg_len)
+ return false;
+
+ /* verify no whitespace in input */
+ for (i = 0; i < arg_len; i++)
+ if (isspace(arg[i]))
+ return false;
+
+ /* find colon delimiter */
+ colon = strchr(arg, ':');
+ if (!colon || (colon == arg))
+ return false;
+ s_port = colon + 1;
+
+ /* parse replication port number */
+ port = atoi(s_port);
+ if (port < 1 || port > 65535)
+ return false;
+
+ /* alloc and fill in remote-host record */
+ rp = malloc(sizeof(*rp));
+ if (!rp)
+ return false;
+
+ rp->port = port;
+ rp->host = strdup(arg);
+ if (!rp->host) {
+ free(rp);
+ return false;
+ }
+
+ /* truncate string down to simply hostname portion */
+ rp->host[colon - arg] = 0;
+
+ /* add remote host to global list */
+ cld_srv.rep_remotes = g_list_append(cld_srv.rep_remotes, rp);
+
+ return true;
+}
+
static error_t parse_opt (int key, char *arg, struct argp_state *state)
{
switch(key) {
@@ -634,6 +806,15 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
case 'F':
cld_srv.flags |= SFL_FOREGROUND;
break;
+ case 'm':
+ if ((strlen(arg) > 3) && (strlen(arg) < 64) &&
+ (strchr(arg, '.')))
+ cld_srv.force_myhost = arg;
+ else {
+ fprintf(stderr, "invalid myhost: '%s'\n", arg);
+ argp_usage(state);
+ }
+ break;
case 'p':
if (atoi(arg) > 0 && atoi(arg) < 65536)
cld_srv.port = arg;
@@ -645,6 +826,31 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
case 'P':
cld_srv.pid_file = arg;
break;
+ case 'r':
+ if (atoi(arg) > 0 && atoi(arg) < 65536)
+ cld_srv.rep_port = atoi(arg);
+ else {
+ fprintf(stderr, "invalid rep-port: '%s'\n", arg);
+ argp_usage(state);
+ }
+ break;
+ case 'R':
+ if (!add_remote(arg)) {
+ fprintf(stderr, "invalid remote host:port: '%s'\n", arg);
+ argp_usage(state);
+ }
+ break;
+ case 'S': {
+ int n_peers = atoi(arg);
+ if ((n_peers >= CLD_MIN_PEERS) && (n_peers < CLD_MAX_PEERS) &&
+ (n_peers & 0x01))
+ cld_srv.n_peers = atoi(arg);
+ else {
+ fprintf(stderr, "invalid peer count: '%s'\n", arg);
+ argp_usage(state);
+ }
+ break;
+ }
case ARGP_KEY_ARG:
argp_usage(state); /* too many args */
break;
@@ -660,9 +866,12 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
int main (int argc, char *argv[])
{
error_t aprc;
- int rc = 1;
+ int rc = 1, env_flags;
time_t next_timeout;
+ cld_srv.state_cldb =
+ cld_srv.state_cldb_new = ST_CLDB_INIT;
+
/* isspace() and strcasecmp() consistency requires this */
setlocale(LC_ALL, "C");
@@ -686,6 +895,20 @@ int main (int argc, char *argv[])
if (use_syslog)
openlog(PROGRAM_NAME, LOG_PID, LOG_LOCAL3);
+ if (cld_srv.force_myhost)
+ cld_srv.myhost = strdup(cld_srv.force_myhost);
+ else
+ cld_srv.myhost = get_hostname();
+
+ if (debugging)
+ cldlog(LOG_DEBUG, "our hostname: %s", cld_srv.myhost);
+
+ /* remotes file should list all in peer group, except for us */
+ if ((cld_srv.n_peers - 1) != g_list_length(cld_srv.rep_remotes)) {
+ cldlog(LOG_ERR, "n_peers does not match remotes file loaded");
+ goto err_out;
+ }
+
if (!(cld_srv.flags & SFL_FOREGROUND) && (daemon(1, !use_syslog) < 0)) {
syslogerr("daemon");
goto err_out;
@@ -706,16 +929,7 @@ int main (int argc, char *argv[])
signal(SIGTERM, term_signal);
signal(SIGUSR1, stats_signal);
- if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL,
- DB_CREATE | DB_THREAD | DB_RECOVER,
- "cld", use_syslog,
- DB_CREATE | DB_THREAD, NULL))
- exit(1);
-
- ensure_root();
-
timer_init(&cld_srv.chkpt_timer, cldb_checkpoint, NULL);
- add_chkpt_timer();
rc = 1;
@@ -728,16 +942,51 @@ int main (int argc, char *argv[])
!cld_srv.polls)
goto err_out_pid;
- if (sess_load(cld_srv.sessions) != 0)
- goto err_out_pid;
+ if (pipe(cld_srv.rep_pipe) < 0) {
+ syslogerr("pipe");
+ goto err_out;
+ }
/* set up server networking */
rc = net_open();
if (rc)
goto err_out_pid;
+ {
+ struct pollfd pfd;
+ struct server_poll sp;
+
+ /*
+ * add pipe to poll list, after doing so with our net sockets
+ */
+ sp.fd = cld_srv.rep_pipe[0];
+ sp.cb = noop_event;
+ sp.userdata = NULL;
+ g_array_append_val(cld_srv.poll_data, sp);
+
+ pfd.fd = cld_srv.rep_pipe[0];
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+ g_array_append_val(cld_srv.polls, pfd);
+ }
+
+ env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+ if (cldb_init(&cld_srv.cldb, cld_srv.data_dir, NULL,
+ env_flags, "cld", true,
+ cld_srv.rep_remotes,
+ cld_srv.myhost, cld_srv.rep_port,
+ cld_srv.n_peers, cldb_state_cb)) {
+ cldlog(LOG_ERR, "Failed to open CLDB, limping");
+ } else {
+ cld_srv.state_cldb =
+ cld_srv.state_cldb_new = ST_CLDB_OPEN;
+ }
+
cldlog(LOG_INFO, "initialized: dbg %u",
debugging);
+ cldlog(LOG_INFO, "replication: %s:%u",
+ cld_srv.myhost,
+ cld_srv.rep_port);
next_timeout = timers_run();
@@ -800,13 +1049,20 @@ int main (int argc, char *argv[])
}
next_timeout = timers_run();
+
+ if (cld_srv.state_cldb_new != ST_CLDB_INIT &&
+ cld_srv.state_cldb_new != cld_srv.state_cldb) {
+ cldb_state_process(cld_srv.state_cldb_new);
+ cld_srv.state_cldb = cld_srv.state_cldb_new;
+ }
}
cldlog(LOG_INFO, "shutting down");
if (cld_srv.cldb.up)
cldb_down(&cld_srv.cldb);
- cldb_fini(&cld_srv.cldb);
+ if (cld_srv.state_cldb >= ST_CLDB_OPEN)
+ cldb_fini(&cld_srv.cldb);
rc = 0;
diff --git a/test/pid-exists b/test/pid-exists
index 351b4f1..4fa2275 100755
--- a/test/pid-exists
+++ b/test/pid-exists
@@ -1,9 +1,12 @@
#!/bin/sh
-if [ ! -f cld.pid ]
-then
- echo "pid file not found."
- exit 1
-fi
+for n in 1 2 3
+do
+ if [ ! -f cld$n.pid ]
+ then
+ echo "cld$n.pid not found."
+ exit 1
+ fi
+done
exit 0
diff --git a/test/prep-db b/test/prep-db
index 353ca4a..3e4fb60 100755
--- a/test/prep-db
+++ b/test/prep-db
@@ -2,13 +2,16 @@
DATADIR=data
-mkdir -p $DATADIR
-
-if [ ! -d $DATADIR ]
-then
- rm -rf $DATADIR
- echo "test database dir not found."
- exit 1
-fi
+for n in 1 2 3
+do
+ mkdir -p $DATADIR/n$n/data
+
+ if [ ! -d $DATADIR/n$n/data ]
+ then
+ rm -rf $DATADIR
+ echo "test database dir for node $n not found."
+ exit 1
+ fi
+done
exit 0
diff --git a/test/start-daemon b/test/start-daemon
index 4cb9fd7..06b3250 100755
--- a/test/start-daemon
+++ b/test/start-daemon
@@ -1,13 +1,31 @@
#!/bin/sh
-if [ -f cld.pid ]
+if [ -f cld1.pid -o -f cld2.pid -o -f cld3.pid ]
then
- echo "pid file found. daemon still running?"
+ echo "pid file found. daemons still running?"
exit 1
fi
-../server/cld -P cld.pid -d "$PWD/data" -p 18181 -E
+../server/cld -d "$PWD/data/n1/data" -p 18181 -r 19181 -P cld1.pid -E \
+ -D 2 -S 3 \
+ -m localhost.localdomain \
+ -R localhost.localdomain:19182 \
+ -R localhost.localdomain:19183
-sleep 3
+../server/cld -d "$PWD/data/n2/data" -p 18182 -r 19182 -P cld2.pid -E \
+ -D 2 -S 3 \
+ -m localhost.localdomain \
+ -R localhost.localdomain:19181 \
+ -R localhost.localdomain:19183
+
+../server/cld -d "$PWD/data/n3/data" -p 18183 -r 19183 -P cld3.pid -E \
+ -D 2 -S 3 \
+ -m localhost.localdomain \
+ -R localhost.localdomain:19181 \
+ -R localhost.localdomain:19182
+sleep 1
+
+echo " start-daemon: Waiting 20s, for daemons to start up..."
+sleep 20
exit 0
diff --git a/test/stop-daemon b/test/stop-daemon
index 27d985e..7cb4a6c 100755
--- a/test/stop-daemon
+++ b/test/stop-daemon
@@ -1,23 +1,23 @@
#!/bin/sh
-if [ ! -f cld.pid ]
+if [ ! -f cld1.pid -o ! -f cld2.pid -o ! -f cld3.pid ]
then
- echo no daemon pid file found.
+ echo at least one daemon pid file missing.
exit 1
fi
-kill `cat cld.pid`
+kill `cat cld*.pid`
for n in 0 1 2 3 4 5 6 7 8 9
do
- if [ ! -f cld.pid ]
+ if [ -f cld1.pid -o -f cld2.pid -o -f cld3.pid ]
then
+ sleep 1
+ else
exit 0
fi
-
- sleep 1
done
-echo "PID file not removed, after signal sent."
-rm -f cld.pid
+echo "PID file(s) not removed, after signal sent."
+rm -f cld*.pid
exit 1
^ permalink raw reply related [flat|nested] 7+ messages in thread
end of thread, other threads:[~2009-08-06 5:49 UTC | newest]
Thread overview: 7+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2009-07-31 11:15 [PATCH v1] CLD replication (WIP) Jeff Garzik
2009-08-06 5:49 ` [PATCH v2] " Jeff Garzik
-- strict thread matches above, loose matches on Subject: below --
2009-07-31 10:40 [PATCH v1] " Jeff Garzik
2009-07-31 18:14 ` Sage Weil
2009-07-31 18:38 ` Jeff Garzik
2009-07-31 18:52 ` Jeff Garzik
2009-07-31 19:44 ` Sage Weil
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.