* [tabled patch 3/3] Fix metadata replication
@ 2010-08-06 3:40 Pete Zaitcev
2010-08-10 19:14 ` Jeff Garzik
From: Pete Zaitcev @ 2010-08-06 3:40 UTC
To: Jeff Garzik; +Cc: Project Hail List
The metadata replication in tabled nominally existed, but did not
work. There were a couple of small bugs (for example, an attempt to
boot directly into the Slave state would hang). However, the biggest
problem was that the Replication Manager API required the identity of
each node to be the same as its hostname. As a result, the repmgr code
bound to the hostname instead of a wildcard socket. On any stock Fedora
or RHEL system it would then end up listening on the loopback only,
because /etc/hosts aliases the hostname to the loopback address. Thus
running any replication required additional host configuration, which
can have all kinds of unexpected consequences. In addition, it was
impossible to run two nodes on one host for testing.
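For illustration, here is a minimal POSIX sketch (not code from this
patch) of the wildcard binding that the hostname-based setup lacked.
Binding to INADDR_ANY accepts connections on every interface, so an
/etc/hosts entry that maps the hostname to 127.0.0.1 no longer matters.
The helper names listen_wildcard() and bound_addr() are hypothetical.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Hypothetical helper: open a TCP listener on the wildcard address
 * (INADDR_ANY) instead of on whatever address the hostname resolves to.
 * Port 0 asks the kernel for an ephemeral port.
 * Returns the listening fd, or -1 on error.
 */
static int listen_wildcard(unsigned short port)
{
	struct sockaddr_in sin;
	int fd, on = 1;

	fd = socket(AF_INET, SOCK_STREAM, 0);
	if (fd < 0)
		return -1;
	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_addr.s_addr = htonl(INADDR_ANY);	/* all interfaces */
	sin.sin_port = htons(port);

	if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) < 0 ||
	    listen(fd, 16) < 0) {
		close(fd);
		return -1;
	}
	return fd;
}

/*
 * Hypothetical helper: report the address the socket is bound to,
 * in host byte order, or 0xffffffff on error.
 */
static unsigned long bound_addr(int fd)
{
	struct sockaddr_in sin;
	socklen_t len = sizeof(sin);

	assert(fd >= 0);
	if (getsockname(fd, (struct sockaddr *)&sin, &len) < 0)
		return 0xffffffffUL;
	return ntohl(sin.sin_addr.s_addr);
}
```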
This patch does away with the Replication Manager and uses the Base API
instead. This way the host aliasing issues are avoided, and state
transitions occur much faster because there is no voting.
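With the Base API, the application decides who is master instead of
holding an election. In this patch the decision comes from CLD:
cldu_set_master() tries to lock /tabled-GROUP/MASTER, the winner writes
its name, host and port there, and the losers become clients. The
sketch below models only that decision and the record format; the enum
rep_role, pick_role(), and the lock_ok flag (standing in for the result
of ncld_trylock()) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical role enum, mirroring tdb_init()'s we_are_master flag. */
enum rep_role { ROLE_MASTER, ROLE_CLIENT };

/*
 * Decide the replication role from the outcome of a non-blocking lock
 * on the MASTER file (lock_ok).  The lock holder serializes its
 * connection parameters so that clients can find it.  If the record
 * cannot be produced, give up mastership, much as cldu_set_master()
 * closes (and thereby unlocks) the file on a write failure.
 * On ROLE_MASTER, buf holds the record; otherwise buf is empty.
 */
static enum rep_role pick_role(bool lock_ok, const char *name,
			       const char *host, unsigned int port,
			       char *buf, size_t buflen)
{
	int len;

	assert(buf != NULL && buflen > 0);
	buf[0] = '\0';
	if (!lock_ok)
		return ROLE_CLIENT;	/* another node holds the lock */

	len = snprintf(buf, buflen, "name: %s\nhost: %s\nport: %u\n",
		       name, host, port);
	if (len < 0 || (size_t)len >= buflen) {
		buf[0] = '\0';		/* truncated record: do not claim master */
		return ROLE_CLIENT;
	}
	return ROLE_MASTER;
}
```

The resulting role would then map onto tdb_init()'s we_are_master
argument, which selects DB_REP_MASTER or DB_REP_CLIENT for rep_start().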
Note that a provision is added to run peers on the same host, using
the configuration clause TDBRepName. I was unable to come up with a
reliable way to make persistent, non-conflicting identifiers that
would replace hostnames. Fortunately, TDBRepName should only be needed
for build tests, where we can probably live with it.
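The naming scheme can be sketched as follows, assuming the path layout
in cldu_setgroup(): the group defaults to "default", the node name
defaults to the hostname unless TDBRepName is set, and every node
contends for the same MASTER file. Two instances on one box therefore
share a hostname but get distinct membership files. build_paths() and
struct cld_paths are hypothetical, and the check that rejects the name
MASTER is an addition of this sketch (scan_peers() merely skips that
entry).

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define MASTER_FILE "MASTER"

/* Hypothetical container for the CLD paths of one tabled instance. */
struct cld_paths {
	char cfname[128];	/* /tabled-GROUP:        group directory */
	char ffname[128];	/* /tabled-GROUP/NAME:   our membership file */
	char mfname[128];	/* /tabled-GROUP/MASTER: master lock file */
};

/*
 * Build the paths.  group == NULL selects "default"; rep_name == NULL
 * (no TDBRepName configured) falls back to the hostname.
 * Returns 0 on success, -1 on overflow or on a node name that would
 * collide with the MASTER entry (a check added by this sketch).
 */
static int build_paths(struct cld_paths *p, const char *group,
		       const char *host, const char *rep_name)
{
	const char *name = rep_name ? rep_name : host;
	int n;

	assert(p != NULL && host != NULL);
	if (!group)
		group = "default";
	if (strcmp(name, MASTER_FILE) == 0)
		return -1;

	n = snprintf(p->cfname, sizeof(p->cfname), "/tabled-%s", group);
	if (n < 0 || (size_t)n >= sizeof(p->cfname))
		return -1;
	n = snprintf(p->ffname, sizeof(p->ffname), "/tabled-%s/%s",
		     group, name);
	if (n < 0 || (size_t)n >= sizeof(p->ffname))
		return -1;
	n = snprintf(p->mfname, sizeof(p->mfname), "/tabled-%s/%s",
		     group, MASTER_FILE);
	if (n < 0 || (size_t)n >= sizeof(p->mfname))
		return -1;
	return 0;
}
```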
The resulting replication feature was tested and works. I am not sure
it is good enough to trust with one's data, but it is better than before.
Signed-off-by: Pete Zaitcev <zaitcev@redhat.com>
---
doc/etc.tabled.conf | 8
doc/setup.txt | 13 +
include/tdb.h | 15 -
lib/tdb.c | 130 ++++++-------
server/Makefile.am | 2
server/bucket.c | 34 +--
server/cldu.c | 416 ++++++++++++++++++++++++++++++++++++------
server/config.c | 10 +
server/object.c | 22 +-
server/replica.c | 8
server/server.c | 404 ++++++++++++++++++++++++++++++++++++----
server/tabled.h | 97 +++++++++
server/tdbadm.c | 51 +----
13 files changed, 963 insertions(+), 247 deletions(-)
commit 27a50dfeb3dec834b8f07dd95d0ec3d9c3963de3
Author: Pete Zaitcev <zaitcev@yahoo.com>
Date: Thu Aug 5 21:17:17 2010 -0600
Metadata replication.
diff --git a/doc/etc.tabled.conf b/doc/etc.tabled.conf
index 22d20a7..c3b1d1d 100644
--- a/doc/etc.tabled.conf
+++ b/doc/etc.tabled.conf
@@ -13,12 +13,12 @@
<!--
One group per DB, don't skimp on groups. Also, make sure the replication
- ports do not conflict when you make hosts to host several groups.
- Unfortunately, the diagnostics are not very good if they do.
- Most likely you'll see database corruption in such cases.
+ ports do not conflict when you make boxes host several groups or use
+ replication instances with TDBRepName.
-->
<Group>ultracart2</Group>
-<TDB>/path/tabled/tdb</TDB>
+<TDB>/path/tabled-uc2/</TDB> <!-- mkdir -p /path/tabled-uc2 -->
+<!-- <TDBRepName>12345.my_local_node_name.example.com</TDBRepName> -->
<TDBRepPort>8083</TDBRepPort>
<!--
diff --git a/doc/setup.txt b/doc/setup.txt
index ac0dfb0..c7a4c6a 100644
--- a/doc/setup.txt
+++ b/doc/setup.txt
@@ -15,7 +15,9 @@ _cld._udp.phx2.ex.com has SRV record 10 50 8081 maika.phx2.ex.com.
Also, make sure that your hostname has a domain. We don't want to search
for CLD in the world-wide DNS root, do we?
- Make sure CLD is up (run "cldcli" to verify).
+ Once you know that CLD is running, verify that tabled can talk to
+ it by running "cldcli". UDP traffic must be allowed on port 8081, or
+ on whatever other port is specified in the SRV record.
*) Another thing to set up in DNS is a wildcard host for the system where
tabled will run. Unlike the SRV records of CLD, this is optional, but
@@ -30,6 +32,10 @@ emus3 IN A 192.168.128.9
All examples on Google say FQDN is required, and most presume aliasing
of A and AAAA records, but BIND 9 eats the above fine.
+*) Speaking of FQDN, it is possible to force tabled to use a non-default
+ hostname with the ForceHost tag. In practice this is only useful when
+ the DNS is broken.
+
*) Copy configuration file from doc/etc.tabled.conf to /etc/tabled.conf
and edit to suit (see configurable items below). Notice that the file
looks like XML, but is not really. In particular, names of elements are
@@ -53,6 +59,11 @@ emus3 IN A 192.168.128.9
Group name defaults to "default", so you can leave this element unset,
but don't do it. Any name, even "qwerty", is better than the default.
+*) In each group, tabled uses its hostname to identify itself. However,
+ if you ever wish to run two tabled processes that serve the same group
+ on one host, you can do so by setting TDBRepName. N.B.: A power loss on
+ the host will knock out all of them, so never use this in production.
+
*) Select the port to listen, if desired. This is done using the <Listen>
element:
diff --git a/include/tdb.h b/include/tdb.h
index 8895704..ff3b4b5 100644
--- a/include/tdb.h
+++ b/include/tdb.h
@@ -109,15 +109,12 @@ struct tabledb {
DB *oids; /* object ID db */
};
-struct db_remote { /* remotes for tdb_init */
- char *host;
- unsigned short port;
-};
-
-extern int tdb_init(struct tabledb *tdb, const char *home, const char *pass,
- unsigned int env_flags, const char *errpfx, bool do_syslog,
- GList *remotes, char *rep_host, unsigned short rep_port,
- void (*cb)(enum db_event));
+extern int tdb_init(struct tabledb *tdb, const char *db_home,
+ const char *db_password, const char *errpfx, bool do_syslog,
+ int rep_our_id,
+ int (*rep_send)(DB_ENV *dbenv, const DBT *ctl, const DBT *rec,
+ const DB_LSN *lsnp, int envid, uint32_t flags),
+ bool we_are_master, void (*cb)(enum db_event));
extern int tdb_up(struct tabledb *tdb, unsigned int open_flags);
extern void tdb_down(struct tabledb *tdb);
extern void tdb_fini(struct tabledb *tdb);
diff --git a/lib/tdb.c b/lib/tdb.c
index bc5e50a..29a18f0 100644
--- a/lib/tdb.c
+++ b/lib/tdb.c
@@ -1,6 +1,6 @@
/*
- * Copyright 2008-2009 Red Hat, Inc.
+ * Copyright 2008-2010 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -148,35 +148,15 @@ err_out:
return -EIO;
}
-static int add_remote_sites(DB_ENV *dbenv, GList *remotes, int *nsites)
-{
- int rc;
- struct db_remote *rp;
- GList *tmp;
-
- *nsites = 0;
- for (tmp = remotes; tmp; tmp = tmp->next) {
- rp = tmp->data;
-
- rc = dbenv->repmgr_add_remote_site(dbenv, rp->host, rp->port,
- NULL, 0);
- if (rc) {
- dbenv->err(dbenv, rc,
- "dbenv->add.remote.site host %s port %u",
- rp->host, rp->port);
- return rc;
- }
- (*nsites)++;
- }
-
- return 0;
-}
-
static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
{
struct tabledb *tdb = dbenv->app_private;
switch (event) {
+ case DB_EVENT_PANIC:
+ dbenv->errx(dbenv, "PANIC event is reported, exiting");
+ exit(2);
+ break;
case DB_EVENT_REP_CLIENT:
tdb->is_master = false;
if (tdb->state_cb)
@@ -191,6 +171,14 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
if (tdb->state_cb)
(*tdb->state_cb)(TDB_EV_ELECTED);
break;
+ case DB_EVENT_REP_NEWMASTER:
+ dbenv->errx(dbenv, "New master is reported: %d",
+ *(int *)event_info);
+ /* XXX Need to verify that it's the same master as before. */
+ break;
+ case DB_EVENT_REP_STARTUPDONE:
+ dbenv->errx(dbenv, "Client start-up complete");
+ break;
default:
/* do nothing */
break;
@@ -202,15 +190,18 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
* db_password, cb can be NULL
*/
int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
- unsigned int env_flags, const char *errpfx, bool do_syslog,
- GList *remotes, char *rep_host, unsigned short rep_port,
+ const char *errpfx, bool do_syslog, int rep_ourid,
+ int (*rep_send)(DB_ENV *dbenv, const DBT *ctl, const DBT *rec,
+ const DB_LSN *lsnp, int envid, uint32_t flags),
+ bool we_are_master,
void (*cb)(enum db_event))
{
- int nsites;
+ unsigned int env_flags;
+ unsigned int rep_flags;
int rc;
DB_ENV *dbenv;
- tdb->is_master = false;
+ tdb->is_master = we_are_master;
tdb->home = db_home;
tdb->state_cb = cb;
@@ -258,12 +249,6 @@ int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
tdb->keyed = true;
}
- rc = dbenv->repmgr_set_local_site(dbenv, rep_host, rep_port, 0);
- if (rc) {
- dbenv->err(dbenv, rc, "repmgr_set_local_site");
- goto err_out;
- }
-
rc = dbenv->set_event_notify(dbenv, db4_event);
if (rc) {
dbenv->err(dbenv, rc, "set_event_notify");
@@ -283,42 +268,65 @@ int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
// goto err_out;
// }
- rc = dbenv->rep_set_priority(dbenv, 100);
- if (rc) {
- dbenv->err(dbenv, rc, "rep_set_priority");
- goto err_out;
- }
+ if (rep_send) {
+ rc = dbenv->rep_set_transport(dbenv, rep_ourid, rep_send);
+ if (rc) {
+ dbenv->err(dbenv, rc, "rep_set_transport");
+ goto err_out;
+ }
- /* init DB transactional environment, stored in directory db_home */
- env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
- env_flags |= DB_INIT_TXN | DB_INIT_REP;
- rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
- if (rc) {
- dbenv->err(dbenv, rc, "open(dbenv)");
- goto err_out;
- }
+ // /*
+ // * Fix the derbies. This is the only way, since passing of
+ // * DB_REP_MASTER to rep_start() after a failover will end in:
+ // * "DB_REP_UNAVAIL: Unable to elect a master" (and a hang).
+ // */
+ // rc = dbenv->rep_set_priority(dbenv, we_are_master ? 100 : 10);
+ // if (rc) {
+ // dbenv->err(dbenv, rc, "rep_set_priority");
+ // goto err_out;
+ // }
+
+ env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+ env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
+ env_flags |= DB_INIT_TXN | DB_INIT_REP;
+ rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
+ if (rc) {
+ dbenv->err(dbenv, rc, "open rep");
+ goto err_out;
+ }
- rc = add_remote_sites(dbenv, remotes, &nsites);
- if (rc)
- goto err_out;
+ rep_flags = we_are_master ? DB_REP_MASTER : DB_REP_CLIENT;
+ rc = dbenv->rep_start(dbenv, NULL, rep_flags);
+ if (rc) {
+ dbenv->err(dbenv, rc, "rep_start");
+ goto err_out;
+ }
- // rc = dbenv->rep_set_nsites(dbenv, nsites + 1);
- // if (rc) {
- // dbenv->err(dbenv, rc, "rep_set_nsites");
- // goto err_out;
- // }
+ } else {
+ env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+ env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
+ env_flags |= DB_INIT_TXN;
+ rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
+ if (rc) {
+ dbenv->err(dbenv, rc, "open norep");
+ goto err_out;
+ }
- rc = dbenv->repmgr_start(dbenv, 2, DB_REP_ELECTION);
- if (rc) {
- dbenv->err(dbenv, rc, "repmgr_start");
- goto err_out;
+ /* XXX rip this out from tdbadm.c */
+ /*
+ * The db4 only delivers callbacks if replication was ordered.
+ * Since we force-set master, we ought to deliver them here
+ * for the universal code to work as if a master was elected.
+ */
+ if (cb)
+ (*cb)(we_are_master ? TDB_EV_MASTER : TDB_EV_CLIENT);
}
return 0;
err_out:
dbenv->close(dbenv, 0);
- return rc;
+ return -1;
}
/*
diff --git a/server/Makefile.am b/server/Makefile.am
index 6397245..5b53a0a 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -4,7 +4,7 @@ INCLUDES = -I$(top_srcdir)/include @GLIB_CFLAGS@ @HAIL_CFLAGS@
sbin_PROGRAMS = tabled tdbadm
tabled_SOURCES = tabled.h \
- bucket.c cldu.c config.c object.c replica.c \
+ bucket.c cldu.c config.c metarep.c object.c replica.c \
server.c status.c storage.c storparse.c util.c
tabled_LDADD = ../lib/libtdb.a \
@HAIL_LIBS@ @PCRE_LIBS@ @GLIB_LIBS@ \
diff --git a/server/bucket.c b/server/bucket.c
index a95d23e..eb03e03 100644
--- a/server/bucket.c
+++ b/server/bucket.c
@@ -43,11 +43,11 @@ bool has_access(const char *user, const char *bucket, const char *key,
size_t alloc_len, key_len = 0;
struct db_acl_key *acl_key;
struct db_acl_ent *acl;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
DBT pkey, pval;
DBC *cur = NULL;
- DB *acls = tdb.acls;
+ DB *acls = tdbrep.tdb.acls;
if (user == NULL)
user = DB_ACL_ANON;
@@ -132,7 +132,7 @@ err_out:
static int add_access_user(DB_TXN *txn, const char *bucket, const char *key,
const char *user, const char *perms)
{
- DB *acls = tdb.acls;
+ DB *acls = tdbrep.tdb.acls;
int key_len;
int acl_len;
struct db_acl_ent *acl;
@@ -203,8 +203,8 @@ bool service_list(struct client *cli, const char *user)
bool rcb;
DB_TXN *txn = NULL;
DBC *cur = NULL;
- DB_ENV *dbenv = tdb.env;
- DB *bidx = tdb.buckets_idx;
+ DB_ENV *dbenv = tdbrep.tdb.env;
+ DB *bidx = tdbrep.tdb.buckets_idx;
DBT skey, pkey, pval;
if (asprintf(&s,
@@ -348,7 +348,7 @@ bool bucket_valid(const char *bucket)
static int bucket_find(DB_TXN *txn, const char *bucket, char *owner,
int owner_len)
{
- DB *buckets = tdb.buckets;
+ DB *buckets = tdbrep.tdb.buckets;
DBT key, val;
struct db_bucket_ent ent;
int rc;
@@ -455,9 +455,9 @@ bool bucket_add(struct client *cli, const char *user, const char *bucket)
struct db_bucket_ent ent;
bool setacl; /* is ok to put pre-existing bucket */
enum ReqACLC canacl;
- DB *buckets = tdb.buckets;
- DB *acls = tdb.acls;
- DB_ENV *dbenv = tdb.env;
+ DB *buckets = tdbrep.tdb.buckets;
+ DB *acls = tdbrep.tdb.acls;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
DBT key, val;
@@ -589,11 +589,11 @@ bool bucket_del(struct client *cli, const char *user, const char *bucket)
enum errcode err = InternalError;
int rc;
struct db_bucket_ent ent;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
- DB *buckets = tdb.buckets;
- DB *acls = tdb.acls;
- DB *objs = tdb.objs;
+ DB *buckets = tdbrep.tdb.buckets;
+ DB *acls = tdbrep.tdb.acls;
+ DB *objs = tdbrep.tdb.objs;
DBC *cur = NULL;
DBT key, val;
char structbuf[sizeof(struct db_acl_key) + 32];
@@ -922,9 +922,9 @@ static bool bucket_list_keys(struct client *cli, const char *user,
size_t pfx_len;
struct bucket_list_info bli;
bool rcb;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
DBC *cur = NULL;
DBT pkey, pval;
struct db_obj_key *obj_key;
@@ -1159,8 +1159,8 @@ bool access_list(struct client *cli, const char *bucket, const char *key,
GHashTable *param;
enum errcode err = InternalError;
- DB_ENV *dbenv = tdb.env;
- DB *acls = tdb.acls;
+ DB_ENV *dbenv = tdbrep.tdb.env;
+ DB *acls = tdbrep.tdb.acls;
int alloc_len;
char owner[64];
GList *res;
diff --git a/server/cldu.c b/server/cldu.c
index 5f3631b..45a6a83 100644
--- a/server/cldu.c
+++ b/server/cldu.c
@@ -35,6 +35,8 @@
#define ALIGN8(n) ((8 - ((n) & 7)) & 7)
+#define MASTER_FILE "MASTER"
+
struct chunk_node {
struct list_head link;
char name[65];
@@ -63,18 +65,22 @@ struct cld_session {
int actx; /* Active host cldv[actx] */
struct cld_host cldv[N_CLD];
+ char *thisname;
char *thisgroup;
char *thishost;
char *cfname; /* /tabled-group directory */
struct ncld_fh *cfh; /* /tabled-group directory, keep open for scan */
- char *ffname; /* /tabled-group/thishost */
- struct ncld_fh *ffh; /* /tabled-group/thishost, keep open for lock */
+ char *ffname; /* /tabled-group/thisname */
+ struct ncld_fh *ffh; /* /tabled-group/thisname, keep open for lock */
+ char *mfname; /* /tabled-group/MASTER */
+ struct ncld_fh *mfh; /* /tabled-group/MASTER, keep open for lock */
char *xfname; /* /chunk-GROUP directory */
struct list_head chunks; /* found in xfname, struct chunk_node */
};
static int cldu_set_cldc(struct cld_session *sp, int newactive);
+static int scan_peers(struct cld_session *sp);
static int scan_chunks(struct cld_session *sp);
static void next_chunk(struct cld_session *sp, struct chunk_node *np);
static void add_remote(const char *name);
@@ -113,13 +119,17 @@ static int cldu_nextactive(struct cld_session *sp)
* chunkservers that it uses, so this function only takes one group argument.
*/
static int cldu_setgroup(struct cld_session *sp,
- const char *thisgroup, const char *thishost)
+ const char *thisgroup, const char *thishost,
+ const char *thisname)
{
char *mem;
if (thisgroup == NULL) {
thisgroup = "default";
}
+ if (thisname == NULL) {
+ thisname = thishost;
+ }
sp->thisgroup = strdup(thisgroup);
if (!sp->thisgroup)
@@ -127,15 +137,22 @@ static int cldu_setgroup(struct cld_session *sp,
sp->thishost = strdup(thishost);
if (!sp->thishost)
goto err_oom;
+ sp->thisname = strdup(thisname);
+ if (!sp->thisname)
+ goto err_oom;
if (asprintf(&mem, "/tabled-%s", thisgroup) == -1)
goto err_oom;
sp->cfname = mem;
- if (asprintf(&mem, "/tabled-%s/%s", thisgroup, thishost) == -1)
+ if (asprintf(&mem, "/tabled-%s/%s", thisgroup, thisname) == -1)
goto err_oom;
sp->ffname = mem;
+ if (asprintf(&mem, "/tabled-%s/%s", thisgroup, MASTER_FILE) == -1)
+ goto err_oom;
+ sp->mfname = mem;
+
if (asprintf(&mem, "/chunk-%s", thisgroup) == -1)
goto err_oom;
sp->xfname = mem;
@@ -147,6 +164,259 @@ err_oom:
return 0;
}
+/*
+ * Ugh, side effects on tabled_srv.rep_master.
+ */
+static void cldu_parse_master(const char *mfname, const char *mfile, long len)
+{
+ enum lex_state { lex_tag, lex_colon, lex_val };
+ const char *tag, *val;
+ int taglen;
+ const char *name, *host, *port;
+ int namelen, hostlen, portlen;
+ char namebuf[65], hostbuf[65], portbuf[15];
+ long portnum;
+ enum lex_state state;
+ struct db_remote *rp;
+ const char *p;
+ char c;
+
+ name = NULL;
+ namelen = 0;
+ host = NULL;
+ hostlen = 0;
+ port = NULL;
+ portlen = 0;
+
+ p = mfile;
+ tag = p;
+ val = NULL;
+ state = lex_tag;
+ for (;;) {
+ if (p >= mfile+len)
+ break;
+ c = *p++;
+ if (state == lex_tag) {
+ if (c == ':') {
+ val = p;
+ state = lex_colon;
+ taglen = (p-1) - tag;
+ } else if (c == '\n') {
+ if (debugging)
+ applog(LOG_DEBUG,
+ "%s: No colon", mfname);
+ tag = p;
+ val = NULL;
+ state = lex_tag;
+ }
+ } else if (state == lex_colon) {
+ if (c == ' ') {
+ val = p;
+ } else if (c == '\n') {
+ if (debugging)
+ applog(LOG_DEBUG,
+ "%s: Empty value", mfname);
+ tag = p;
+ val = NULL;
+ state = lex_tag;
+ } else {
+ state = lex_val;
+ }
+ } else if (state == lex_val) {
+ if (c == '\n') {
+ if (taglen == sizeof("name")-1 &&
+ memcmp(tag, "name", taglen) == 0) {
+ name = val;
+ namelen = (p-1) - val;
+ } else if (taglen == sizeof("host")-1 &&
+ memcmp(tag, "host", taglen) == 0) {
+ host = val;
+ hostlen = (p-1) - val;
+ } else if (taglen == sizeof("port")-1 &&
+ memcmp(tag, "port", taglen) == 0) {
+ port = val;
+ portlen = (p-1) - val;
+ } else {
+ if (debugging)
+ applog(LOG_DEBUG,
+ "%s: Unknown tag %c[%d]",
+ mfname, tag[0], taglen);
+ }
+ tag = p;
+ val = NULL;
+ state = lex_tag;
+ }
+ } else {
+ return;
+ }
+ }
+
+ if (!name || !namelen) {
+ if (debugging)
+ applog(LOG_DEBUG, "%s: No name", mfname);
+ return;
+ }
+ if (!host || !hostlen) {
+ if (debugging)
+ applog(LOG_DEBUG, "%s: No host", mfname);
+ return;
+ }
+ if (!port || !portlen) {
+ if (debugging)
+ applog(LOG_DEBUG, "%s: No port", mfname);
+ return;
+ }
+
+ if (namelen >= sizeof(namebuf)) {
+ applog(LOG_ERR, "Long master name");
+ return;
+ }
+ memcpy(namebuf, name, namelen);
+ namebuf[namelen] = 0;
+
+ if (hostlen >= sizeof(hostbuf)) {
+ applog(LOG_ERR, "Long host");
+ return;
+ }
+ memcpy(hostbuf, host, hostlen);
+ hostbuf[hostlen] = 0;
+
+ if (portlen >= sizeof(portbuf)) {
+ applog(LOG_ERR, "Long port");
+ return;
+ }
+ memcpy(portbuf, port, portlen);
+ portbuf[portlen] = 0;
+ portnum = strtol(portbuf, NULL, 10);
+ if (portnum <= 0 || portnum >= 65536) {
+ applog(LOG_ERR, "Bad port %s", portbuf);
+ return;
+ }
+
+ rp = tdb_find_remote_byname(namebuf);
+ if (!rp) {
+ if (debugging)
+ applog(LOG_DEBUG, "%s: Not found master %s",
+ mfname, namebuf);
+ return;
+ }
+
+ if (debugging)
+ applog(LOG_DEBUG, "Found master %s host %s port %ld",
+ namebuf, hostbuf, portnum);
+
+ rp->host = strdup(hostbuf);
+ rp->port = portnum;
+ if (!rp->host)
+ return;
+ tabled_srv.rep_master = rp;
+}
+
+static void cldu_get_master(const char *mfname, struct ncld_fh *mfh)
+{
+ struct ncld_read *nrp;
+ struct timespec tm;
+ int error;
+
+ nrp = ncld_get(mfh, &error);
+ if (!nrp) {
+ applog(LOG_ERR, "CLD get(%s) failed: %d", mfname, error);
+ return;
+ }
+
+ if (nrp->length < 3) {
+ ncld_read_free(nrp);
+
+ /*
+ * Since master opens, locks, and writes, in that order,
+ * there's a gap between the lock and write. So, unrace a bit.
+ */
+ tm.tv_sec = 2;
+ tm.tv_nsec = 0;
+ nanosleep(&tm, NULL);
+
+ nrp = ncld_get(mfh, &error);
+ if (!nrp) {
+ applog(LOG_ERR, "CLD get(%s) failed: %d", mfname, error);
+ return;
+ }
+
+ if (nrp->length < 3) {
+ applog(LOG_ERR, "CLD master(%s) is empty", mfname);
+ ncld_read_free(nrp);
+ return;
+ }
+ }
+
+ cldu_parse_master(mfname, nrp->ptr, nrp->length);
+ ncld_read_free(nrp);
+}
+
+/*
+ * Lock the MASTER file, write or read it as needed.
+ * N.B. Only call this if you know that mfh is closed or never open:
+ * right after cldu_set_cldc (disposing of session closes handles),
+ * or when we were slave and so should not have kept mfh ...
+ * FIXME this will become more interesting when we keep mfh open in slave
+ * state so we can have outstanding locks for master failover notification.
+ */
+static int cldu_set_master(struct cld_session *sp)
+{
+ char *buf;
+ int len;
+ int error;
+ int rc;
+
+ if (!sp->nsp)
+ return -1;
+
+ /* Maybe drop this later, after notifications work. */
+ if (debugging) {
+ rc = g_list_length(sp->nsp->handles);
+ applog(LOG_DEBUG, "open handles %d", rc);
+ }
+
+ sp->mfh = ncld_open(sp->nsp, sp->mfname,
+ COM_READ | COM_WRITE | COM_LOCK | COM_CREATE,
+ &error, 0, NULL, NULL);
+ if (!sp->mfh) {
+ applog(LOG_ERR, "CLD open(%s) failed: %d", sp->mfname, error);
+ goto err_open;
+ }
+
+ error = ncld_trylock(sp->mfh);
+ if (error) {
+ applog(LOG_INFO, "CLD lock(%s) failed: %d", sp->mfname, error);
+ cldu_get_master(sp->mfname, sp->mfh);
+ goto err_lock;
+ }
+
+ len = asprintf(&buf, "name: %s\nhost: %s\nport: %u\n",
+ sp->thisname, sp->thishost, tabled_srv.rep_port);
+ if (len < 0) {
+ applog(LOG_ERR, "internal error: no core");
+ goto err_wmem;
+ }
+
+ rc = ncld_write(sp->mfh, buf, len);
+ if (rc) {
+ applog(LOG_ERR, "CLD put(%s) failed: %d", sp->mfname, rc);
+ goto err_write;
+ }
+
+ free(buf);
+ return 0;
+
+err_write:
+ free(buf);
+err_wmem:
+ /* ncld_unlock() - close will unlock */
+err_lock:
+ ncld_close(sp->mfh);
+err_open:
+ return -1;
+}
+
static void cldu_tm_rescan(int fd, short events, void *userdata)
{
struct cld_session *sp = userdata;
@@ -162,14 +432,37 @@ static void cldu_tm_rescan(int fd, short events, void *userdata)
sp->nsp = NULL;
}
newactive = cldu_nextactive(sp);
- if (cldu_set_cldc(sp, newactive)) {
- evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
- return;
+ if (cldu_set_cldc(sp, newactive))
+ goto out;
+
+ if (cldu_set_master(sp) == 0) {
+ tabled_srv.state_want = ST_W_MASTER;
+ } else {
+ if (debugging)
+ applog(LOG_DEBUG, "Unable to relock %s",
+ sp->mfname);
+ tabled_srv.state_want = ST_W_SLAVE;
}
+ cld_update_cb();
+
sp->is_dead = false;
+ } else {
+ if (tabled_srv.state_want == ST_W_SLAVE) {
+ if (cldu_set_master(sp) == 0) {
+ tabled_srv.state_want = ST_W_MASTER;
+ } else {
+ if (debugging)
+ applog(LOG_DEBUG, "Unable to lock %s",
+ sp->mfname);
+ }
+ }
}
+ if (scan_peers(sp) != 0)
+ goto out;
scan_chunks(sp);
+
+ out:
evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
}
@@ -201,12 +494,6 @@ static void cldu_sess_event(void *priv, uint32_t what)
static int cldu_set_cldc(struct cld_session *sp, int newactive)
{
struct cldc_host *hp;
- struct ncld_read *nrp;
- char buf[100];
- const char *ptr;
- int dir_len;
- int total_len, rec_len, name_len;
- int len;
struct timespec tm;
int error;
int rc;
@@ -261,6 +548,7 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
/*
* Then, create the membership file for us.
+ * We lock it in case two tabled instances run with the same name by mistake.
*/
sp->ffh = ncld_open(sp->nsp, sp->ffname,
COM_WRITE | COM_LOCK | COM_CREATE,
@@ -285,11 +573,7 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
/*
* The usual reason why we get a lock conflict is
* restarting too quickly and hitting the previous lock
- * that is going to disappear soon.
- *
- * FIXME: However, it may also be that a master
- * is ok we we should become a slave, e.g. start TDB.
- * We do not support multi-node, but we should.
+ * that is going to disappear soon. Just wait it out.
*/
tm.tv_sec = 10;
tm.tv_nsec = 0;
@@ -299,21 +583,43 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
/*
* Write the file with our connection parameters.
*/
- len = snprintf(buf, sizeof(buf), "port: %u\n", tabled_srv.rep_port);
- if (len >= sizeof(buf)) {
- applog(LOG_ERR, "internal error: overflow for port (%d)", len);
- goto err_wmem;
- }
-
- rc = ncld_write(sp->ffh, buf, len);
+ rc = ncld_write(sp->ffh, "-\n", 2);
if (rc) {
applog(LOG_ERR, "CLD put(%s) failed: %d", sp->ffname, rc);
goto err_write;
}
/*
- * Read the directory.
+ * Finally, scan cfh to find peers, add with global effects.
*/
+ if (scan_peers(sp) != 0)
+ goto err_pscan;
+
+ return 0;
+
+err_pscan:
+err_write:
+err_lock:
+ ncld_close(sp->ffh); /* session-close closes these, maybe drop */
+err_fopen:
+ ncld_close(sp->cfh);
+err_copen:
+ ncld_sess_close(sp->nsp);
+ sp->nsp = NULL;
+err_nsess:
+err_addr:
+ return -1;
+}
+
+static int scan_peers(struct cld_session *sp)
+{
+ struct ncld_read *nrp;
+ char buf[65];
+ const char *ptr;
+ int dir_len;
+ int total_len, rec_len, name_len;
+ int error;
+
nrp = ncld_get(sp->cfh, &error);
if (!nrp) {
applog(LOG_ERR, "CLD get(%s) failed: %d", sp->cfname, error);
@@ -336,13 +642,20 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
else
buf[64] = 0;
- if (!strcmp(buf, sp->thishost)) {
+ if (!strcmp(buf, MASTER_FILE)) {
+ ; /* ignore special entry */
+ } else if (!strcmp(buf, sp->thisname)) {
if (debugging)
applog(LOG_DEBUG, " %s (ourselves)", buf);
} else {
- if (debugging)
- applog(LOG_DEBUG, " %s", buf);
- add_remote(buf);
+ if (tdb_find_remote_byname(buf)) {
+ if (debugging)
+ applog(LOG_DEBUG, " %s", buf);
+ } else {
+ if (debugging)
+ applog(LOG_DEBUG, " %s (new)", buf);
+ add_remote(buf);
+ }
}
ptr += total_len;
@@ -350,21 +663,9 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
}
ncld_read_free(nrp);
-
return 0;
err_dread:
-err_write:
-err_wmem:
-err_lock:
- ncld_close(sp->ffh); /* session-close closes these, maybe drop */
-err_fopen:
- ncld_close(sp->cfh);
-err_copen:
- ncld_sess_close(sp->nsp);
- sp->nsp = NULL;
-err_nsess:
-err_addr:
return -1;
}
@@ -508,9 +809,6 @@ err_mem:
return;
}
-/*
- * FIXME need to read port number from the file (port:<space>num).
- */
static void add_remote(const char *name)
{
struct db_remote *rp;
@@ -518,10 +816,15 @@ static void add_remote(const char *name)
rp = malloc(sizeof(struct db_remote));
if (!rp)
return;
+ memset(rp, 0, sizeof(struct db_remote));
+
+ /*
+ * Master assigns global IDs now, distributes them in login protocol.
+ */
+ rp->dbid = DBID_NONE;
- rp->port = 8083;
- rp->host = strdup(name);
- if (!rp->host) {
+ rp->name = strdup(name);
+ if (!rp->name) {
free(rp);
return;
}
@@ -564,7 +867,8 @@ void cld_init()
/*
* This initiates our sole session with a CLD instance.
*/
-int cld_begin(const char *thishost, const char *thisgroup, int verbose)
+int cld_begin(const char *thishost, const char *thisgroup,
+ const char *thisname, int verbose)
{
static struct cld_session *sp = &ses;
struct timespec tm;
@@ -575,7 +879,7 @@ int cld_begin(const char *thishost, const char *thisgroup, int verbose)
evtimer_set(&ses.tm_rescan, cldu_tm_rescan, &ses);
- if (cldu_setgroup(sp, thisgroup, thishost)) {
+ if (cldu_setgroup(sp, thisgroup, thishost, thisname)) {
/* Already logged error */
goto err_group;
}
@@ -626,6 +930,14 @@ int cld_begin(const char *thishost, const char *thisgroup, int verbose)
newactive = cldu_nextactive(sp);
}
+ if (cldu_set_master(sp) == 0) {
+ if (debugging)
+ applog(LOG_DEBUG, "Locked %s", sp->mfname);
+ tabled_srv.state_want = ST_W_MASTER;
+ } else {
+ tabled_srv.state_want = ST_W_SLAVE;
+ }
+
retry_cnt = 0;
for (;;) {
if (!scan_chunks(sp))
@@ -696,8 +1008,12 @@ void cld_end(void)
sp->ffname = NULL;
free(sp->xfname);
sp->xfname = NULL;
+ free(sp->mfname);
+ sp->mfname = NULL;
free(sp->thisgroup);
sp->thisgroup = NULL;
free(sp->thishost);
sp->thishost = NULL;
+ free(sp->thisname);
+ sp->thisname = NULL;
}
diff --git a/server/config.c b/server/config.c
index ff4d876..293a5dd 100644
--- a/server/config.c
+++ b/server/config.c
@@ -224,6 +224,16 @@ static void cfg_elm_end (GMarkupParseContext *context,
cc->text = NULL;
}
+ else if (!strcmp(element_name, "TDBRepName")) {
+ if (!cc->text) {
+ applog(LOG_WARNING, "TDBRepName element empty");
+ return;
+ }
+ free(tabled_srv.rep_name);
+ tabled_srv.rep_name = cc->text;
+ cc->text = NULL;
+ }
+
else if (!strcmp(element_name, "StatusPort")) {
if (!cc->text) {
applog(LOG_WARNING, "StatusPort element empty");
diff --git a/server/object.c b/server/object.c
index f8e7b12..3801e94 100644
--- a/server/object.c
+++ b/server/object.c
@@ -39,7 +39,7 @@
static int object_find(DB_TXN *txn, const char *bucket, const char *key,
struct db_obj_ent *pobj)
{
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
struct db_obj_key *okey;
size_t alloc_len;
DBT pkey, pval;
@@ -72,7 +72,7 @@ static int object_find(DB_TXN *txn, const char *bucket, const char *key,
static bool __object_del(DB_TXN *txn, const char *bucket, const char *key)
{
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
struct db_obj_key *okey;
size_t okey_len;
DBT pkey;
@@ -100,7 +100,7 @@ static bool __object_del(DB_TXN *txn, const char *bucket, const char *key)
bool object_del_acls(DB_TXN *txn, const char *bucket, const char *key)
{
- DB *acls = tdb.acls;
+ DB *acls = tdbrep.tdb.acls;
struct db_acl_key *akey;
size_t alloc_len;
DBT pkey;
@@ -163,8 +163,8 @@ bool object_del(struct client *cli, const char *user,
int rc;
enum errcode err = InternalError;
size_t alloc_len;
- DB_ENV *dbenv = tdb.env;
- DB *objs = tdb.objs;
+ DB_ENV *dbenv = tdbrep.tdb.env;
+ DB *objs = tdbrep.tdb.objs;
struct db_obj_key *okey;
struct db_obj_ent obje;
DBT pkey, pval;
@@ -326,9 +326,9 @@ static bool object_put_end(struct client *cli)
struct db_obj_ent oldobj;
bool delobj;
size_t alloc_len;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DBT pkey, pval;
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
DB_TXN *txn = NULL;
GByteArray *string_data;
GArray *string_lens;
@@ -786,7 +786,7 @@ static bool object_put_body(struct client *cli, const char *user,
return cli_err(cli, InternalError);
}
- objid = objid_next(&tabled_srv.object_count, &tdb);
+ objid = objid_next(&tabled_srv.object_count, &tdbrep.tdb);
rc = open_chunks(&cli->out_ch, &tabled_srv.all_stor,
cli, objid, content_len);
@@ -865,9 +865,9 @@ static bool object_put_acls(struct client *cli, const char *user,
{
enum errcode err = InternalError;
enum ReqACLC canacl;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
char *hdr;
char timestr[64];
int rc;
@@ -1130,7 +1130,7 @@ static bool object_get_body(struct client *cli, const char *user,
bool access_ok, modified = true;
GString *extra_hdr;
size_t alloc_len;
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
struct db_obj_key *okey;
struct db_obj_ent *obj = NULL;
DBT pkey, pval;
diff --git a/server/replica.c b/server/replica.c
index ac14cb2..1b5e832 100644
--- a/server/replica.c
+++ b/server/replica.c
@@ -612,8 +612,8 @@ static void rep_scan_verify(struct rep_arg *arg,
static void rep_add_nid(unsigned int klen, struct db_obj_key *key, uint32_t nid)
{
- DB_ENV *db_env = tdb.env;
- DB *db_objs = tdb.objs;
+ DB_ENV *db_env = tdbrep.tdb.env;
+ DB *db_objs = tdbrep.tdb.objs;
DB_TXN *db_txn;
DBT pkey, pval;
struct db_obj_ent *obj;
@@ -749,8 +749,8 @@ static void rep_scan(struct rep_arg *arg)
g_mutex_unlock(kscan_mutex);
memset(&cur, 0, sizeof(struct cursor)); /* enough to construct */
- cur.db_env = tdb.env;
- cur.db_objs = tdb.objs;
+ cur.db_env = tdbrep.tdb.env;
+ cur.db_objs = tdbrep.tdb.objs;
kcnt = 0;
for (;;) {
diff --git a/server/server.c b/server/server.c
index 814afec..8859847 100644
--- a/server/server.c
+++ b/server/server.c
@@ -97,12 +97,15 @@ struct server tabled_srv = {
.config = "/etc/tabled.conf",
};
-struct tabledb tdb;
+struct tablerep tdbrep;
enum {
TT_CMD_DUMP,
TT_CMD_TDBST_MASTER,
- TT_CMD_TDBST_SLAVE
+ TT_CMD_TDBST_SLAVE,
+ TT_CMD_MASTER_LINK_RESET,
+ TT_CMD_LINK_SCRUB,
+ TT_CMDNUM
};
struct compiled_pat patterns[] = {
@@ -114,7 +117,11 @@ struct compiled_pat patterns[] = {
};
static char *state_name_tdb[ST_TDBNUM] = {
- "Init", "Open", "Active", "Master", "Slave"
+ "Init", "Open", "Master", "Slave"
+};
+
+static char *cmd_name_tdb[TT_CMDNUM] = {
+ "Dump", "GoMaster", "GoSlave", "MasterLinkReset", "LinkScrub"
};
static struct {
@@ -340,7 +347,7 @@ static int authcheck(struct http_req *req, char *extra_bucket,
* not match.
*/
- rc = tdb.passwd->get(tdb.passwd, NULL, &key, &val, 0);
+ rc = tdbrep.tdb.passwd->get(tdbrep.tdb.passwd, NULL, &key, &val, 0);
if (rc) {
pass = strdup("");
@@ -350,7 +357,7 @@ static int authcheck(struct http_req *req, char *extra_bucket,
char s[64];
snprintf(s, 64, "get user '%s'", user);
- tdb.passwd->err(tdb.passwd, rc, s);
+ tdbrep.tdb.passwd->err(tdbrep.tdb.passwd, rc, s);
}
} else {
pass = val.data;
@@ -387,8 +394,22 @@ static void stats_signal(int signo)
static void stats_dump(void)
{
- applog(LOG_INFO, "STATE: TDB %s",
- state_name_tdb[tabled_srv.state_tdb]);
+ struct db_remote *rp;
+ GList *tmp;
+
+ applog(LOG_INFO, "TDB: group %s state %s host %s rep_port %d dbid %d%s",
+ tabled_srv.group, state_name_tdb[tabled_srv.state_tdb],
+ tabled_srv.ourhost, tabled_srv.rep_port, tdbrep.thisid,
+ (tabled_srv.mc_delay)? " mc_delay": "");
+ for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+ rp = tmp->data;
+ applog(LOG_INFO, "PN: name %s dbid %d", rp->name, rp->dbid);
+ if (rp->host)
+ applog(LOG_INFO, "PN: host %s port %d",
+ rp->host, rp->port);
+ if (rp == tabled_srv.rep_master)
+ applog(LOG_INFO, "PN (master)");
+ }
applog(LOG_INFO,
"STATS: poll %lu event %lu tcp_accept %lu opt_write %lu",
tabled_srv.stats.poll,
@@ -403,11 +424,17 @@ static void stats_dump(void)
bool stat_status(struct client *cli, GList *content)
{
+ struct db_remote *rp;
+ GList *tmp;
char *str;
+ int rc;
/*
* The loadavg is system dependent, we'll figure it out later.
* On Linux, applications read from /proc/loadavg.
+ *
+ * The listening info duplicates the hostname until we split
+ * the replication identifier from hostname.
*/
if (asprintf(&str,
"<h1>Status</h1>"
@@ -415,11 +442,50 @@ bool stat_status(struct client *cli, GList *content)
tabled_srv.ourhost, tabled_srv.port) < 0)
return false;
content = g_list_append(content, str);
+
if (asprintf(&str,
- "<p>State: TDB %s</p>\r\n",
- state_name_tdb[tabled_srv.state_tdb]) < 0)
+ "<p>TDB: group %s "
+ "state %s host %s rep_port %d dbid %d%s</p>\r\n",
+ tabled_srv.group, state_name_tdb[tabled_srv.state_tdb],
+ tabled_srv.ourhost, tabled_srv.rep_port, tdbrep.thisid,
+ (tabled_srv.mc_delay)? " mc_delay": "") < 0)
return false;
content = g_list_append(content, str);
+
+ if (tabled_srv.rep_remotes) {
+ if (asprintf(&str, "<p>") < 0)
+ return false;
+ content = g_list_append(content, str);
+ for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+ rp = tmp->data;
+ rc = asprintf(&str, "Peer: name %s dbid %d",
+ rp->name, rp->dbid);
+ if (rc < 0)
+ return false;
+ content = g_list_append(content, str);
+ if (rp->host) {
+ rc = asprintf(&str, " host %s port %d",
+ rp->host, rp->port);
+ if (rc < 0)
+ return false;
+ content = g_list_append(content, str);
+ }
+ if (rp == tabled_srv.rep_master) {
+ str = strdup(" (master)");
+ if (!str)
+ return false;
+ content = g_list_append(content, str);
+ }
+ rc = asprintf(&str, "<br />\r\n");
+ if (rc < 0)
+ return false;
+ content = g_list_append(content, str);
+ }
+ if (asprintf(&str, "</p>\r\n") < 0)
+ return false;
+ content = g_list_append(content, str);
+ }
+
if (asprintf(&str,
"<p>Stats: "
"poll %lu event %lu tcp_accept %lu opt_write %lu</p>\r\n"
@@ -1421,7 +1487,7 @@ static void add_chkpt_timer(void)
static void tdb_checkpoint(int fd, short events, void *userdata)
{
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
int rc;
if (debugging)
@@ -1436,29 +1502,50 @@ static void tdb_checkpoint(int fd, short events, void *userdata)
add_chkpt_timer();
}
+static void add_reup_timer(void)
+{
+ static const struct timeval tv = { TABLED_REUP_SEC, 0 };
+
+ if (evtimer_add(&tabled_srv.reup_timer, &tv) < 0)
+ applog(LOG_WARNING, "unable to add reup timer");
+}
+
+static void tdb_reup(int fd, short events, void *userdata)
+{
+
+ if (tabled_srv.state_want == ST_W_MASTER &&
+ tabled_srv.state_tdb == ST_TDB_MASTER) {
+ /*
+ * An upgrade failed, retry.
+ */
+ if (rtdb_restart(&tdbrep, true)) {
+ applog(LOG_WARNING, "Cannot restart to master");
+ add_reup_timer();
+ }
+ }
+}
+
static void tdb_state_cb(enum db_event event)
{
unsigned char cmd;
switch (event) {
case TDB_EV_ELECTED:
- /*
- * Safe to stop ignoring bogus client indication,
- * so unmute us by advancing the state.
- */
- if (tabled_srv.state_tdb == ST_TDB_OPEN)
- tabled_srv.state_tdb = ST_TDB_ACTIVE;
+ /* Just ignore this, we only care for the end state. */
break;
case TDB_EV_CLIENT:
+ /* P3 */ applog(LOG_INFO, "TDB event: slave, state %s", state_name_tdb[tabled_srv.state_tdb]);
+ goto overmsg;
case TDB_EV_MASTER:
+ /* P3 */ applog(LOG_INFO, "TDB event: master, state %s", state_name_tdb[tabled_srv.state_tdb]);
+ overmsg:
/*
* This callback runs on the context of the replication
* manager thread, and calling any of our functions thus
* turns our program into a multi-threaded one. Instead
* we signal the main thread to do the processing.
*/
- if (tabled_srv.state_tdb != ST_TDB_INIT &&
- tabled_srv.state_tdb != ST_TDB_OPEN) {
+ if (tabled_srv.state_tdb != ST_TDB_INIT) {
if (event == TDB_EV_MASTER)
cmd = TT_CMD_TDBST_MASTER;
else
@@ -1472,6 +1559,55 @@ static void tdb_state_cb(enum db_event event)
}
}
+void cld_update_cb(void)
+{
+ switch (tabled_srv.state_want) {
+ case ST_W_MASTER:
+ if (tabled_srv.state_tdb == ST_TDB_MASTER) {
+ ; /* CLD caught up to DB, better late than never */
+ } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ /* CLD tells us to upgrade, do it */
+ if (rtdb_restart(&tdbrep, true)) {
+ applog(LOG_WARNING,
+ "Unable to restart to master");
+ /*
+ * Don't try rtdb_fini here, it will end in a hang.
+ * Instead, retry endlessly until it succeeds.
+ */
+ add_reup_timer();
+ }
+ } else {
+ applog(LOG_WARNING, "Want Master while in state %s",
+ state_name_tdb[tabled_srv.state_tdb]);
+ }
+ break;
+ case ST_W_SLAVE:
+ if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ ; /* all good */
+ } else if (tabled_srv.state_tdb == ST_TDB_MASTER) {
+ /*
+ * OK, this is bad. We lost our CLD session and some
+ * other node went master on us. Even if we downgrade
+ * the database now, some clients may have done some
+ * operations while CLD was bouncing. Complain loudly.
+ */
+ applog(LOG_WARNING,
+ "Downgrading the database,"
+ " data loss is possible");
+ if (rtdb_restart(&tdbrep, false)) {
+ tabled_srv.state_tdb = ST_TDB_INIT;
+ rtdb_fini(&tdbrep);
+ }
+ } else {
+ applog(LOG_WARNING, "Want Slave while in state %s",
+ state_name_tdb[tabled_srv.state_tdb]);
+ }
+ break;
+ default:
+ ;
+ }
+}
+
/*
* Due to the way storage_node management is tightly woven into the
* server, the management of nodes is not in storage.c, which deals
@@ -1485,7 +1621,6 @@ int stor_update_cb(void)
{
int num_up;
struct storage_node *stn;
- unsigned int env_flags;
if (debugging)
applog(LOG_DEBUG, "Know of potential %d storage node(s)",
@@ -1518,15 +1653,13 @@ int stor_update_cb(void)
* We initiate operations even if there's no redundancy in order
* to permit bootstrapping and build-time self-checking.
*/
+/* P3 */ applog(LOG_INFO, "storage updated, TDB state %s", state_name_tdb[tabled_srv.state_tdb]);
if (tabled_srv.state_tdb == ST_TDB_INIT) {
tabled_srv.state_tdb = ST_TDB_OPEN;
-
- env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
- if (tdb_init(&tdb, tabled_srv.tdb_dir, NULL,
- env_flags, "tabled", true,
- tabled_srv.rep_remotes,
- tabled_srv.ourhost, tabled_srv.rep_port,
- tdb_state_cb)) {
+ if (rtdb_start(&tdbrep, tabled_srv.tdb_dir,
+ tabled_srv.state_want == ST_W_MASTER,
+ tabled_srv.rep_master,
+ tabled_srv.rep_port, tdb_state_cb)) {
tabled_srv.state_tdb = ST_TDB_INIT;
applog(LOG_ERR, "Failed to open TDB, limping");
}
@@ -1535,10 +1668,122 @@ int stor_update_cb(void)
* FIXME This is where we should process redundancy decreases.
*/
;
+ } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ if (tabled_srv.state_want == ST_W_MASTER) {
+ if (rtdb_restart(&tdbrep, true)) {
+ applog(LOG_WARNING,
+ "Failed to restart to master");
+ add_reup_timer();
+ }
+ }
}
return num_up;
}
+int tdb_slave_login_cb(int srcid)
+{
+ struct db_remote *master;
+
+ master = tabled_srv.rep_master;
+ if (!master) {
+ applog(LOG_INFO, "No master at login");
+ return -1;
+ }
+ if (master->dbid == 0) {
+ applog(LOG_INFO, "Master dbid %d", srcid);
+ } else {
+ if (master->dbid != srcid) {
+ /*
+ * This is probably bad news. Perhaps the master rebooted
+ * on the other side of the network partition and yet
+ * somehow won a lock in CLD, or something even weirder.
+ * But we don't know.
+ */
+ applog(LOG_INFO,
+ "Master switch from dbid %d to dbid %d",
+ master->dbid, srcid);
+ }
+ }
+ master->dbid = srcid;
+
+ if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+ applog(LOG_INFO, "Established link, master %s dbid %d",
+ master->name, master->dbid);
+ if (tabled_srv.state_want != ST_W_SLAVE) {
+ applog(LOG_ERR, "Unexpected TDB state %s, limping",
+ state_name_tdb[tabled_srv.state_tdb]);
+ rtdb_fini(&tdbrep);
+ tabled_srv.state_tdb = ST_TDB_INIT;
+ return -1;
+ }
+ if (rtdb_start(&tdbrep, tabled_srv.tdb_dir,
+ false,
+ master,
+ tabled_srv.rep_port, tdb_state_cb)) {
+ tabled_srv.state_tdb = ST_TDB_INIT;
+ applog(LOG_ERR, "Failed to open TDB, limping");
+ return -1;
+ }
+ } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ applog(LOG_INFO, "Recovered master connection");
+ } else {
+ applog(LOG_INFO, "Confused about connections");
+ }
+ return 0;
+}
+
+void tdb_slave_disc_cb(void)
+{
+ static const struct timeval tv = { TABLED_MCWAIT_SEC, 0 };
+
+ if (tabled_srv.mc_delay)
+ return;
+ evtimer_add(&tabled_srv.mc_timer, &tv);
+ tabled_srv.mc_delay = true;
+}
+
+static void tdb_mc_delay(int fd, short events, void *userdata)
+{
+ static const unsigned char cmd = TT_CMD_MASTER_LINK_RESET;
+
+ tabled_srv.mc_delay = false;
+ write(tabled_srv.ev_pipe[1], &cmd, 1);
+}
+
+void tdb_conn_scrub_cb(void)
+{
+ unsigned char cmd;
+
+ cmd = TT_CMD_LINK_SCRUB;
+ write(tabled_srv.ev_pipe[1], &cmd, 1);
+}
+
+struct db_remote *tdb_find_remote_byname(const char *name)
+{
+ struct db_remote *rp;
+ GList *tmp;
+
+ for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+ rp = tmp->data;
+ if (strcmp(rp->name, name) == 0)
+ return rp;
+ }
+ return NULL;
+}
+
+struct db_remote *tdb_find_remote_byid(int id)
+{
+ struct db_remote *rp;
+ GList *tmp;
+
+ for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+ rp = tmp->data;
+ if (rp->dbid == id)
+ return rp;
+ }
+ return NULL;
+}
+
static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
int addr_len, void *addr_ptr, bool is_status)
{
@@ -1833,26 +2078,66 @@ static void compile_patterns(void)
}
}
-static void tdb_state_process(enum st_tdb new_state)
+static void tdb_startup(void)
{
unsigned int db_flags;
- if (debugging)
- applog(LOG_DEBUG, "TDB state > %s", state_name_tdb[new_state]);
- if ((new_state == ST_TDB_MASTER || new_state == ST_TDB_SLAVE) &&
- tabled_srv.state_tdb == ST_TDB_ACTIVE) {
+ db_flags = DB_CREATE | DB_THREAD;
+ if (tdb_up(&tdbrep.tdb, db_flags))
+ return;
+ if (objid_init(&tabled_srv.object_count, &tdbrep.tdb)) {
+ tdb_down(&tdbrep.tdb);
+ return;
+ }
+ add_chkpt_timer();
+ rep_start();
+ net_listen_client();
+}
- db_flags = DB_CREATE | DB_THREAD;
- if (tdb_up(&tdb, db_flags))
- return;
+static void tdb_state_process(enum st_tdb new_state)
+{
- if (objid_init(&tabled_srv.object_count, &tdb)) {
- tdb_down(&tdb);
- return;
+ applog(LOG_INFO, "TDB state %s > %s",
+ state_name_tdb[tabled_srv.state_tdb], state_name_tdb[new_state]);
+
+ if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+ if (new_state == ST_TDB_MASTER) {
+ if (tabled_srv.state_want == ST_W_MASTER) {
+ tdb_startup();
+ } else {
+ /*
+ * We want slave if we cannot connect to CLD,
+ * or we cannot lock the master file, which
+ * means that another master may exist.
+ * But the db goes master on us, so
+ * either the other master is dead or we're
+ * misconfigured so DBs cannot talk.
+ * Either way, we should poke db until the
+ * desired result is accomplished. XXX
+ */
+ applog(LOG_INFO, "TDB went Master on us");
+ }
+ } else if (new_state == ST_TDB_SLAVE) {
+ applog(LOG_INFO, "TDB went Slave, so whatever");
+ ;
+ } else {
+ applog(LOG_ERR, "TDB went to unexpected state");
+ }
+ } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ if (new_state == ST_TDB_MASTER) {
+ if (tabled_srv.state_want == ST_W_MASTER) {
+ tdb_startup();
+ } else {
+ /*
+ * This is either a net split or CLD is doing
+ * its timeouts and so we do not want to be
+ * a master yet.
+ */
+ applog(LOG_ERR, "TDB upgraded on us");
+ }
+ } else {
+ applog(LOG_ERR, "TDB is confused");
}
- add_chkpt_timer();
- rep_start();
- net_listen_client();
}
}
@@ -1871,6 +2156,11 @@ static void internal_event(int fd, short events, void *userdata)
abort();
}
+ if (debugging) {
+ applog(LOG_DEBUG, "Context Event %s, TDB state %s",
+ cmd_name_tdb[cmd], state_name_tdb[tabled_srv.state_tdb]);
+ }
+
switch (cmd) {
case TT_CMD_DUMP:
stats_dump();
@@ -1890,6 +2180,15 @@ static void internal_event(int fd, short events, void *userdata)
}
break;
+ case TT_CMD_MASTER_LINK_RESET:
+ rtdb_mc_reset(&tdbrep, tabled_srv.state_want == ST_W_MASTER,
+ tabled_srv.rep_master, tabled_srv.rep_port);
+ break;
+
+ case TT_CMD_LINK_SCRUB:
+ rtdb_dbc_scrub(&tdbrep);
+ break;
+
default:
applog(LOG_WARNING, "%s BUG: command 0x%x", __func__, cmd);
break;
@@ -1905,6 +2204,7 @@ int main (int argc, char *argv[])
INIT_LIST_HEAD(&tabled_srv.all_stor);
INIT_LIST_HEAD(&tabled_srv.write_compl_q);
tabled_srv.state_tdb = ST_TDB_INIT;
+ tabled_srv.rep_next_id = DBID_MIN;
/* isspace() and strcasecmp() consistency requires this */
setlocale(LC_ALL, "C");
@@ -1978,6 +2278,8 @@ int main (int argc, char *argv[])
tabled_srv.evbase_main = event_init();
event_base_rep = event_base_new();
evtimer_set(&tabled_srv.chkpt_timer, tdb_checkpoint, NULL);
+ evtimer_set(&tabled_srv.mc_timer, tdb_mc_delay, NULL);
+ evtimer_set(&tabled_srv.reup_timer, tdb_reup, NULL);
/* set up internal communication pipe */
if (pipe(tabled_srv.ev_pipe) < 0) {
@@ -1991,6 +2293,13 @@ int main (int argc, char *argv[])
goto err_pevt;
}
+ /* late-construct structures with allocations */
+ if (rtdb_init(&tdbrep, tabled_srv.ourhost)) {
+ applog(LOG_WARNING, "rtdb_init");
+ rc = 1;
+ goto err_rtdb;
+ }
+
/* set up server networking */
if (tabled_srv.status_port) {
if (net_open_known(tabled_srv.status_port, true) == 0)
@@ -2000,7 +2309,8 @@ int main (int argc, char *argv[])
if (rc)
goto err_out_net;
- if (cld_begin(tabled_srv.ourhost, tabled_srv.group, verbose) != 0) {
+ if (cld_begin(tabled_srv.ourhost, tabled_srv.group,
+ tabled_srv.rep_name, verbose) != 0) {
rc = 1;
goto err_cld_session;
}
@@ -2023,13 +2333,13 @@ err_cld_session:
err_out_net:
if (tabled_srv.state_tdb == ST_TDB_MASTER ||
tabled_srv.state_tdb == ST_TDB_SLAVE) {
- tdb_down(&tdb);
- tdb_fini(&tdb);
- } else if (tabled_srv.state_tdb == ST_TDB_OPEN ||
- tabled_srv.state_tdb == ST_TDB_ACTIVE) {
- tdb_fini(&tdb);
+ tdb_down(&tdbrep.tdb);
+ rtdb_fini(&tdbrep);
+ } else if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+ rtdb_fini(&tdbrep);
}
-/* err_tdb_init: */
+err_rtdb:
+ event_del(&tabled_srv.pevt);
err_pevt:
close(tabled_srv.ev_pipe[0]);
close(tabled_srv.ev_pipe[1]);
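The comment in tdb_state_cb() describes a self-pipe: a callback that runs outside the main thread does not call server code. It writes a one-byte command to tabled_srv.ev_pipe[1], and internal_event() dispatches it on the main event loop; tdb_mc_delay() and tdb_conn_scrub_cb() use the same channel. A minimal standalone sketch of that pattern in plain POSIX C follows. struct evpipe and the evpipe_*() functions are hypothetical names, not tabled code, and draining every pending byte at once is a choice made only for this sketch.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical wrapper; tabled keeps the same pair in tabled_srv.ev_pipe. */
struct evpipe {
	int fd[2];	/* fd[0]: read end (main loop), fd[1]: write end (any thread) */
};

int evpipe_init(struct evpipe *p)
{
	int flags;

	if (pipe(p->fd) < 0)
		return -1;
	/* Non-blocking read end, so draining stops once the pipe is empty. */
	flags = fcntl(p->fd[0], F_GETFL);
	if (flags < 0 || fcntl(p->fd[0], F_SETFL, flags | O_NONBLOCK) < 0) {
		close(p->fd[0]);
		close(p->fd[1]);
		return -1;
	}
	return 0;
}

/* Safe from any thread: pipe writes of at most PIPE_BUF bytes are atomic. */
int evpipe_post(struct evpipe *p, unsigned char cmd)
{
	ssize_t n;

	do {
		n = write(p->fd[1], &cmd, 1);
	} while (n < 0 && errno == EINTR);
	return n == 1 ? 0 : -1;
}

/* Main loop only: read up to max pending commands, return how many. */
int evpipe_drain(struct evpipe *p, unsigned char *out, int max)
{
	int cnt = 0;
	ssize_t n;

	assert(max >= 0);
	while (cnt < max) {
		n = read(p->fd[0], &out[cnt], 1);
		if (n == 1) {
			cnt++;
			continue;
		}
		if (n < 0 && errno == EINTR)
			continue;
		break;	/* EAGAIN: pipe empty; 0: writer closed; else error */
	}
	return cnt;
}

void evpipe_close(struct evpipe *p)
{
	close(p->fd[0]);
	close(p->fd[1]);
}
```

In tabled the read end appears to be watched by the libevent event tabled_srv.pevt, which calls internal_event(), so the main loop wakes up whenever a command byte arrives and no locking is needed around server state.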
diff --git a/server/tabled.h b/server/tabled.h
index ff419e3..c90511c 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -45,6 +45,8 @@ enum {
TABLED_CHKPT_SEC = 60 * 5, /* secs between db4 chkpt */
TABLED_RESCAN_SEC = 60*3 + 7, /* secs btw key rescans */
+ TABLED_MCWAIT_SEC = 35, /* secs to moderate reconn. */
+ TABLED_REUP_SEC = 35, /* secs to retry rtdb_restart */
CHUNK_REBOOT_TIME = 3*60, /* secs to declare chunk dead */
@@ -200,8 +202,12 @@ struct client {
char req_buf[CLI_REQ_BUF_SZ]; /* input buffer */
};
+enum st_want {
+ ST_W_INIT, ST_W_MASTER, ST_W_SLAVE
+};
+
enum st_tdb {
- ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_ACTIVE, ST_TDB_MASTER, ST_TDB_SLAVE,
+ ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_MASTER, ST_TDB_SLAVE,
ST_TDBNUM
};
@@ -218,6 +224,17 @@ struct server_stats {
unsigned long max_write_buf;
};
+#define DBID_NONE 0
+#define DBID_MIN 2
+#define DBID_MAX 105
+
+struct db_remote { /* other DB nodes */
+ char *name; /* do not resolve as a host */
+ char *host;
+ unsigned short port;
+ int dbid; /* signed in db4, traditional */
+};
+
struct listen_cfg {
/* bool encrypt; */
/* char *host; */
@@ -233,6 +250,8 @@ struct server {
int ev_pipe[2];
struct event pevt;
struct list_head write_compl_q; /* list of done writes */
+ bool mc_delay;
+ struct event mc_timer;
char *config; /* config file (static) */
@@ -242,6 +261,7 @@ struct server {
char *port_file;
char *chunk_user; /* username for stc_new */
char *chunk_key; /* key for stc_new */
+ char *rep_name; /* db4 replication name */
unsigned short rep_port; /* db4 replication port */
char *status_port; /* status webserver */
char *group; /* our group (both T and Ch) */
@@ -249,12 +269,16 @@ struct server {
char *ourhost; /* use this if DB master */
struct database *db; /* database handle */
GList *rep_remotes;
+ struct db_remote *rep_master; /* if we're slave */
+ int rep_next_id;
+ struct event reup_timer;
GList *sockets;
struct list_head all_stor; /* struct storage_node */
int num_stor; /* number of storage_node's */
uint64_t object_count;
+ enum st_want state_want;
enum st_tdb state_tdb;
enum st_net state_net;
@@ -263,7 +287,55 @@ struct server {
struct server_stats stats; /* global statistics */
};
-extern struct tabledb tdb;
+/*
+ * Low-level channel, for both sides.
+ *
+ * The combined link state confuses session (e.g. login) and the framing, which
+ * is not pretty but works. At least we have a separate link-state struct.
+ *
+ * In a settled state, db_conn corresponds 1:1 to db_remote, but
+ * it's not necessarily so when connections are being established.
+ */
+enum dbc_state { DBC_INIT, DBC_LOGIN, DBC_OPEN, DBC_DEAD };
+
+struct db_link {
+ int fd;
+ enum dbc_state state;
+
+ bool writing;
+ struct event wrev; /* when writing */
+ unsigned char *obuf;
+ int obuflen;
+ int done, togo;
+
+ struct event rcev; /* whenever fd >= 0 */
+ unsigned char *ibuf;
+ int ibuflen; /* currently allocated ibuf */
+ int cnt; /* currently in ibuf */
+ int explen; /* expected length */
+};
+
+struct db_conn { /* a connection with other DB node */
+ struct tablerep *rtdb;
+ struct db_remote *remote;
+ struct list_head link;
+
+ struct db_link lk;
+};
+
+struct tablerep {
+ struct tabledb tdb;
+ const char *thisname;
+ int thisid;
+
+ int sockfd4, sockfd6;
+ struct event lsev4, lsev6;
+ struct list_head conns; // struct db_conn
+
+ struct db_conn *mdbc;
+};
+
+extern struct tablerep tdbrep;
/* bucket.c */
extern bool has_access(const char *user, const char *bucket, const char *key,
@@ -295,7 +367,8 @@ extern void cli_in_end(struct client *cli);
/* cldu.c */
extern void cld_init(void);
-extern int cld_begin(const char *fqdn, const char *group, int verbose);
+extern int cld_begin(const char *fqdn, const char *group, const char *name,
+ int verbose);
extern void cldu_add_host(const char *host, unsigned int port);
extern void cld_end(void);
@@ -332,7 +405,13 @@ extern bool cli_write_start(struct client *cli);
extern bool cli_write_run_compl(void);
extern int cli_req_avail(struct client *cli);
extern void applog(int prio, const char *fmt, ...);
+extern void cld_update_cb(void);
extern int stor_update_cb(void);
+extern int tdb_slave_login_cb(int srcid);
+extern void tdb_slave_disc_cb(void);
+extern void tdb_conn_scrub_cb(void);
+extern struct db_remote *tdb_find_remote_byname(const char *name);
+extern struct db_remote *tdb_find_remote_byid(int id);
/* status.c */
extern bool stat_evt_http_req(struct client *cli, unsigned int events);
@@ -374,4 +453,16 @@ extern void rep_start(void);
extern void rep_stats(void);
extern bool rep_status(struct client *cli, GList *content);
+/* metarep.c */
+extern int rtdb_init(struct tablerep *rtdb, const char *thishost);
+extern int rtdb_start(struct tablerep *rtdb, const char *db_home,
+ bool we_are_master,
+ struct db_remote *rep_master, unsigned short rep_port,
+ void (*cb)(enum db_event));
+extern void rtdb_mc_reset(struct tablerep *rtdb, bool we_are_master,
+ struct db_remote *rep_master, unsigned short rep_port);
+extern void rtdb_dbc_scrub(struct tablerep *rtdb);
+extern int rtdb_restart(struct tablerep *rtdb, bool we_are_master);
+extern void rtdb_fini(struct tablerep *rtdb);
+
#endif /* __TABLED_H__ */
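The new enum st_want splits the role CLD assigned to this node from the role the database actually reached (enum st_tdb), and cld_update_cb() reconciles the two. Its decision table can be sketched as a pure function. enum rep_action and rep_reconcile() are hypothetical; the real callback performs the restart, the reup_timer retry, and the logging inline instead of returning a code.

```c
#include <assert.h>

/* Copies of the enums that this patch adds or changes in server/tabled.h. */
enum st_want { ST_W_INIT, ST_W_MASTER, ST_W_SLAVE };
enum st_tdb {
	ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_MASTER, ST_TDB_SLAVE,
	ST_TDBNUM
};

/* Hypothetical action codes; cld_update_cb() acts inline instead. */
enum rep_action {
	ACT_NONE,	/* wanted and actual roles agree, or nothing wanted */
	ACT_UPGRADE,	/* rtdb_restart(&tdbrep, true), retried via reup_timer */
	ACT_DOWNGRADE,	/* rtdb_restart(&tdbrep, false), data loss possible */
	ACT_WARN,	/* unexpected combination, only logged */
};

enum rep_action rep_reconcile(enum st_want want, enum st_tdb tdb)
{
	assert(tdb < ST_TDBNUM);

	switch (want) {
	case ST_W_MASTER:
		if (tdb == ST_TDB_MASTER)
			return ACT_NONE;	/* CLD caught up with the DB */
		if (tdb == ST_TDB_SLAVE)
			return ACT_UPGRADE;
		return ACT_WARN;		/* still Init or Open */
	case ST_W_SLAVE:
		if (tdb == ST_TDB_SLAVE)
			return ACT_NONE;
		if (tdb == ST_TDB_MASTER)
			return ACT_DOWNGRADE;	/* another node went master */
		return ACT_WARN;
	default:
		return ACT_NONE;		/* ST_W_INIT: no role requested */
	}
}
```

Keeping the decision apart from the side effects makes the table easy to compare with tdb_state_process(), which handles the opposite direction: the database reporting a new role while CLD's wish stays fixed.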
diff --git a/server/tdbadm.c b/server/tdbadm.c
index 86fa4b3..4bd26cc 100644
--- a/server/tdbadm.c
+++ b/server/tdbadm.c
@@ -45,11 +45,10 @@ enum various_modes {
static int mode_adm;
static unsigned long invalid_lines;
static char *tdb_dir;
-static unsigned short rep_port;
static char *config = "/etc/tabled.conf";
-static char *ourhost;
static struct tabledb tdb;
+static bool tdb_is_master;
const char *argp_program_version = PACKAGE_VERSION;
@@ -110,7 +109,6 @@ static void cfg_elm_end(GMarkupParseContext *context,
{
struct config_context *cc = user_data;
struct stat statb;
- int n;
if (!strcmp(element_name, "TDB") && cc->text) {
if (!tdb_dir) {
@@ -134,25 +132,6 @@ static void cfg_elm_end(GMarkupParseContext *context,
cc->text = NULL;
}
- else if (!strcmp(element_name, "ForceHost") && cc->text) {
- free(ourhost);
- ourhost = cc->text;
- cc->text = NULL;
- }
-
- else if (!strcmp(element_name, "TDBRepPort") && cc->text) {
- n = strtol(cc->text, NULL, 10);
- if (n <= 0 || n >= 65536) {
- fprintf(stderr, "warning: "
- "TDBRepPort '%s' invalid, ignoring", cc->text);
- free(cc->text);
- cc->text = NULL;
- return;
- }
- rep_port = n;
- free(cc->text);
- cc->text = NULL;
- }
}
static bool str_n_isspace(const char *s, size_t n)
@@ -198,8 +177,6 @@ static void read_config(void)
memset(&ctx, 0, sizeof(struct config_context));
- rep_port = 8083;
-
if (!g_file_get_contents(config, &text, &len, NULL)) {
fprintf(stderr, "failed to read config file %s\n", config);
exit(1);
@@ -603,10 +580,15 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
return 0;
}
+static void tdb_state_cb(enum db_event event)
+{
+ if (event == TDB_EV_MASTER)
+ tdb_is_master = true;
+}
+
int main(int argc, char *argv[])
{
- char hostname[64];
- unsigned int env_flags, db_flags;
+ unsigned int db_flags;
error_t aprc;
int rc = 1;
@@ -621,21 +603,12 @@ int main(int argc, char *argv[])
if (!tdb_dir)
die("no tdb dir (-t) specified\n");
- if (ourhost)
- strcpy(hostname, ourhost);
- else if (gethostname(hostname, sizeof(hostname)) < 0) {
- fprintf(stderr, "gethostname failed: %s\n", strerror(errno));
- return 1;
- }
-
- env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
- if (tdb_init(&tdb, tdb_dir, NULL, env_flags,
- "tdbadm", false, NULL, hostname, rep_port, NULL))
+ if (tdb_init(&tdb, tdb_dir, NULL, "tdbadm", false,
+ 0, NULL, true, tdb_state_cb))
goto err_dbinit;
- /* Usually takes about 12s */
- /* FIXME don't peek into private parts of tdb struct, use state_cb */
- while (!tdb.is_master)
+ /* Usually takes about 12s, if a vote is involved. */
+ while (!tdb_is_master)
sleep(2);
db_flags = DB_CREATE | DB_THREAD;
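In server/server.c above, tdb_slave_disc_cb() and tdb_mc_delay() rate-limit reconnection to the master: the first disconnect arms mc_timer for TABLED_MCWAIT_SEC seconds and sets mc_delay, further disconnects are ignored while the flag is set, and when the timer fires the flag is cleared and a single TT_CMD_MASTER_LINK_RESET is posted. The sketch below models that logic with an explicit clock instead of libevent; struct reconn_throttle and the throttle_*() functions are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

#define MCWAIT_SEC 35	/* same value as TABLED_MCWAIT_SEC */

/* Hypothetical stand-in for tabled_srv.mc_delay plus the mc_timer deadline. */
struct reconn_throttle {
	bool armed;		/* like mc_delay: a link reset is already scheduled */
	time_t deadline;	/* when the scheduled reset fires */
};

/* Like tdb_slave_disc_cb(): only the first disconnect arms the timer. */
void throttle_on_disconnect(struct reconn_throttle *t, time_t now)
{
	assert(t != NULL);
	if (t->armed)
		return;
	t->armed = true;
	t->deadline = now + MCWAIT_SEC;
}

/*
 * Like mc_timer expiring into tdb_mc_delay(): returns true once per armed
 * period, which is when the caller would post TT_CMD_MASTER_LINK_RESET.
 */
bool throttle_poll(struct reconn_throttle *t, time_t now)
{
	assert(t != NULL);
	if (!t->armed || now < t->deadline)
		return false;
	t->armed = false;
	return true;
}
```

The window is measured from the first disconnect and is not extended by later ones, so a flapping link produces at most one reset attempt per TABLED_MCWAIT_SEC.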
* Re: [tabled patch 3/3] Fix metadata replication
From: Jeff Garzik @ 2010-08-10 19:14 UTC
To: Pete Zaitcev; +Cc: Project Hail List
On 08/05/2010 11:40 PM, Pete Zaitcev wrote:
> ---
> doc/etc.tabled.conf | 8
> doc/setup.txt | 13 +
> include/tdb.h | 15 -
> lib/tdb.c | 130 ++++++-------
> server/Makefile.am | 2
> server/bucket.c | 34 +--
> server/cldu.c | 416 ++++++++++++++++++++++++++++++++++++------
> server/config.c | 10 +
> server/object.c | 22 +-
> server/replica.c | 8
> server/server.c | 404 ++++++++++++++++++++++++++++++++++++----
> server/tabled.h | 97 +++++++++
> server/tdbadm.c | 51 +----
> 13 files changed, 963 insertions(+), 247 deletions(-)
Including metarep.c would be helpful ;-)
Will wait on release for this...
* Re: [tabled patch 3/3] Fix metadata replication
From: Pete Zaitcev @ 2010-08-10 21:19 UTC
To: Jeff Garzik; +Cc: Project Hail List, zaitcev
On Tue, 10 Aug 2010 15:14:19 -0400
Jeff Garzik <jeff@garzik.org> wrote:
> Including metarep.c would be helpful ;-)
>
> Will wait on release for this...
Sorry!
Unfortunately this is getting a little behind, because I started
working on the tests and they require some changes (e.g. the listening
host and port may be unknown at the time the MASTER file is locked).
---
doc/etc.tabled.conf | 8
doc/setup.txt | 13
include/tdb.h | 15
lib/tdb.c | 130 ++--
server/Makefile.am | 2
server/bucket.c | 34 -
server/cldu.c | 416 ++++++++++++--
server/config.c | 10
server/metarep.c | 1245 ++++++++++++++++++++++++++++++++++++++++++
server/object.c | 22
server/replica.c | 8
server/server.c | 404 ++++++++++++-
server/tabled.h | 97 +++
server/tdbadm.c | 51 -
14 files changed, 2208 insertions(+), 247 deletions(-)
diff --git a/doc/etc.tabled.conf b/doc/etc.tabled.conf
index 22d20a7..c3b1d1d 100644
--- a/doc/etc.tabled.conf
+++ b/doc/etc.tabled.conf
@@ -13,12 +13,12 @@
<!--
One group per DB, don't skimp on groups. Also, make sure the replication
- ports do not conflict when you make hosts to host several groups.
- Unfortunately, the diagnostics are not very good if they do.
- Most likely you'll see database corruption in such cases.
+ ports do not conflict when you make boxes host several groups or use
+ replication instances with TDBRepName.
-->
<Group>ultracart2</Group>
-<TDB>/path/tabled/tdb</TDB>
+<TDB>/path/tabled-uc2/</TDB> <!-- mkdir -p /path/tabled-uc2 -->
+<!-- <TDBRepName>12345.my_local_node_name.example.com</TDBRepName> -->
<TDBRepPort>8083</TDBRepPort>
<!--
diff --git a/doc/setup.txt b/doc/setup.txt
index ac0dfb0..c7a4c6a 100644
--- a/doc/setup.txt
+++ b/doc/setup.txt
@@ -15,7 +15,9 @@ _cld._udp.phx2.ex.com has SRV record 10 50 8081 maika.phx2.ex.com.
Also, make sure that your hostname has a domain. We don't want to search
for CLD in the world-wide DNS root, do we?
- Make sure CLD is up (run "cldcli" to verify).
+ Once you know that CLD is running, verify that tabled can talk to
+ it by running "cldcli". UDP traffic must be allowed to port 8081, or
+ to whatever other port the SRV record specifies.
*) Another thing to set up in DNS is a wildcard host for the system where
tabled will run. Unlike the SRV records of CLD, this is optional, but
@@ -30,6 +32,10 @@ emus3 IN A 192.168.128.9
All examples on Google say FQDN is required, and most presume aliasing
of A and AAAA records, but BIND 9 eats the above fine.
+*) Speaking of FQDN, it is possible to force tabled to use a non-default
+ hostname with the ForceHost tag. In practice this is only useful when
+ the DNS is broken.
+
*) Copy configuration file from doc/etc.tabled.conf to /etc/tabled.conf
and edit to suit (see configurable items below). Notice that the file
looks like XML, but is not really. In particular, names of elements are
@@ -53,6 +59,11 @@ emus3 IN A 192.168.128.9
Group name defaults to "default", so you can leave this element unset,
but don't do it. Any name, even "qwerty", is better than the default.
+*) In each group, tabled uses its hostname to identify itself. However,
+ if you ever wish to run two tabled processes that serve the same group,
+ you can do so by setting TDBRepName. N.B.: A power loss on that host
+ will knock out all of them, so never use this in production.
+
*) Select the port to listen, if desired. This is done using the <Listen>
element:
diff --git a/include/tdb.h b/include/tdb.h
index 8895704..ff3b4b5 100644
--- a/include/tdb.h
+++ b/include/tdb.h
@@ -109,15 +109,12 @@ struct tabledb {
DB *oids; /* object ID db */
};
-struct db_remote { /* remotes for tdb_init */
- char *host;
- unsigned short port;
-};
-
-extern int tdb_init(struct tabledb *tdb, const char *home, const char *pass,
- unsigned int env_flags, const char *errpfx, bool do_syslog,
- GList *remotes, char *rep_host, unsigned short rep_port,
- void (*cb)(enum db_event));
+extern int tdb_init(struct tabledb *tdb, const char *db_home,
+ const char *db_password, const char *errpfx, bool do_syslog,
+ int rep_our_id,
+ int (*rep_send)(DB_ENV *dbenv, const DBT *ctl, const DBT *rec,
+ const DB_LSN *lsnp, int envid, uint32_t flags),
+ bool we_are_master, void (*cb)(enum db_event));
extern int tdb_up(struct tabledb *tdb, unsigned int open_flags);
extern void tdb_down(struct tabledb *tdb);
extern void tdb_fini(struct tabledb *tdb);
diff --git a/lib/tdb.c b/lib/tdb.c
index bc5e50a..29a18f0 100644
--- a/lib/tdb.c
+++ b/lib/tdb.c
@@ -1,6 +1,6 @@
/*
- * Copyright 2008-2009 Red Hat, Inc.
+ * Copyright 2008-2010 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -148,35 +148,15 @@ err_out:
return -EIO;
}
-static int add_remote_sites(DB_ENV *dbenv, GList *remotes, int *nsites)
-{
- int rc;
- struct db_remote *rp;
- GList *tmp;
-
- *nsites = 0;
- for (tmp = remotes; tmp; tmp = tmp->next) {
- rp = tmp->data;
-
- rc = dbenv->repmgr_add_remote_site(dbenv, rp->host, rp->port,
- NULL, 0);
- if (rc) {
- dbenv->err(dbenv, rc,
- "dbenv->add.remote.site host %s port %u",
- rp->host, rp->port);
- return rc;
- }
- (*nsites)++;
- }
-
- return 0;
-}
-
static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
{
struct tabledb *tdb = dbenv->app_private;
switch (event) {
+ case DB_EVENT_PANIC:
+ dbenv->errx(dbenv, "PANIC event is reported, exiting");
+ exit(2);
+ break;
case DB_EVENT_REP_CLIENT:
tdb->is_master = false;
if (tdb->state_cb)
@@ -191,6 +171,14 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
if (tdb->state_cb)
(*tdb->state_cb)(TDB_EV_ELECTED);
break;
+ case DB_EVENT_REP_NEWMASTER:
+ dbenv->errx(dbenv, "New master is reported: %d",
+ *(int *)event_info);
+ /* XXX Need to verify that it's the same master as before. */
+ break;
+ case DB_EVENT_REP_STARTUPDONE:
+ dbenv->errx(dbenv, "Client start-up complete");
+ break;
default:
/* do nothing */
break;
@@ -202,15 +190,18 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
* db_password, cb can be NULL
*/
int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
- unsigned int env_flags, const char *errpfx, bool do_syslog,
- GList *remotes, char *rep_host, unsigned short rep_port,
+ const char *errpfx, bool do_syslog, int rep_ourid,
+ int (*rep_send)(DB_ENV *dbenv, const DBT *ctl, const DBT *rec,
+ const DB_LSN *lsnp, int envid, uint32_t flags),
+ bool we_are_master,
void (*cb)(enum db_event))
{
- int nsites;
+ unsigned int env_flags;
+ unsigned int rep_flags;
int rc;
DB_ENV *dbenv;
- tdb->is_master = false;
+ tdb->is_master = we_are_master;
tdb->home = db_home;
tdb->state_cb = cb;
@@ -258,12 +249,6 @@ int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
tdb->keyed = true;
}
- rc = dbenv->repmgr_set_local_site(dbenv, rep_host, rep_port, 0);
- if (rc) {
- dbenv->err(dbenv, rc, "repmgr_set_local_site");
- goto err_out;
- }
-
rc = dbenv->set_event_notify(dbenv, db4_event);
if (rc) {
dbenv->err(dbenv, rc, "set_event_notify");
@@ -283,42 +268,65 @@ int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
// goto err_out;
// }
- rc = dbenv->rep_set_priority(dbenv, 100);
- if (rc) {
- dbenv->err(dbenv, rc, "rep_set_priority");
- goto err_out;
- }
+ if (rep_send) {
+ rc = dbenv->rep_set_transport(dbenv, rep_ourid, rep_send);
+ if (rc) {
+ dbenv->err(dbenv, rc, "rep_set_transport");
+ goto err_out;
+ }
- /* init DB transactional environment, stored in directory db_home */
- env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
- env_flags |= DB_INIT_TXN | DB_INIT_REP;
- rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
- if (rc) {
- dbenv->err(dbenv, rc, "open(dbenv)");
- goto err_out;
- }
+ // /*
+ // * Fix the derbies. This is the only way, since passing of
+ // * DB_REP_MASTER to rep_start() after a failover will end in:
+ // * "DB_REP_UNAVAIL: Unable to elect a master" (and a hang).
+ // */
+ // rc = dbenv->rep_set_priority(dbenv, we_are_master ? 100 : 10);
+ // if (rc) {
+ // dbenv->err(dbenv, rc, "rep_set_priority");
+ // goto err_out;
+ // }
+
+ env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+ env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
+ env_flags |= DB_INIT_TXN | DB_INIT_REP;
+ rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
+ if (rc) {
+ dbenv->err(dbenv, rc, "open rep");
+ goto err_out;
+ }
- rc = add_remote_sites(dbenv, remotes, &nsites);
- if (rc)
- goto err_out;
+ rep_flags = we_are_master ? DB_REP_MASTER : DB_REP_CLIENT;
+ rc = dbenv->rep_start(dbenv, NULL, rep_flags);
+ if (rc) {
+ dbenv->err(dbenv, rc, "rep_start");
+ goto err_out;
+ }
- // rc = dbenv->rep_set_nsites(dbenv, nsites + 1);
- // if (rc) {
- // dbenv->err(dbenv, rc, "rep_set_nsites");
- // goto err_out;
- // }
+ } else {
+ env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+ env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
+ env_flags |= DB_INIT_TXN;
+ rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
+ if (rc) {
+ dbenv->err(dbenv, rc, "open norep");
+ goto err_out;
+ }
- rc = dbenv->repmgr_start(dbenv, 2, DB_REP_ELECTION);
- if (rc) {
- dbenv->err(dbenv, rc, "repmgr_start");
- goto err_out;
+ /* XXX rip this out from tdbadm.c */
+ /*
+ * db4 only delivers these callbacks when replication is enabled.
+ * Since we force-set the role, we deliver one here ourselves so
+ * the common code works as if an election had taken place.
+ */
+ if (cb)
+ (*cb)(we_are_master ? TDB_EV_MASTER : TDB_EV_CLIENT);
}
return 0;
err_out:
dbenv->close(dbenv, 0);
- return rc;
+ return -1;
}
/*
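With this change tdb_init() hands db4 a transport callback through rep_set_transport(), and db4 then calls it either for one peer (envid) or for every peer (DB_EID_BROADCAST). The following is a minimal standalone sketch of the dispatch rule that db4_rep_send() in server/metarep.c implements: a unicast fails if the peer is unknown, down, or busy, while a broadcast fails only if no open link accepted the message. It is an illustration under stated assumptions, not the patch's code: struct sk_peer, sk_link_send(), sk_rep_send() and the SK_* constants are hypothetical stand-ins for struct db_conn, tdb_rep_send(), and db4's DB_EID_BROADCAST and DB_REP_UNAVAIL, whose real values come from <db.h>.

```c
#include <stddef.h>

/* Hypothetical stand-ins for db4's DB_EID_BROADCAST and DB_REP_UNAVAIL;
 * the real values come from <db.h> and are not reproduced here. */
#define SK_EID_BROADCAST (-1)
#define SK_UNAVAIL 1

enum sk_state { SK_OPEN, SK_DEAD };

/* Hypothetical peer link, standing in for struct db_conn. */
struct sk_peer {
	int dbid;
	enum sk_state state;
	int busy;	/* a previous message is still being written out */
	int broken;	/* simulate a socket write error */
	int sent;	/* messages accepted by this link */
};

/* Same result contract as tdb_rep_send(): 0 sent, 1 busy, -1 error. */
static int sk_link_send(struct sk_peer *p)
{
	if (p->busy)
		return 1;
	if (p->broken)
		return -1;
	p->sent++;
	return 0;
}

/* Unicast to envid, or broadcast to every open link. */
static int sk_rep_send(struct sk_peer *peers, size_t n, int envid)
{
	size_t i;
	int cnt = 0;
	int rc;

	for (i = 0; i < n; i++) {
		struct sk_peer *p = &peers[i];

		if (envid != SK_EID_BROADCAST && p->dbid != envid)
			continue;
		if (p->state != SK_OPEN) {
			if (envid != SK_EID_BROADCAST)
				return SK_UNAVAIL;	/* known peer, link down */
			continue;
		}
		rc = sk_link_send(p);
		if (rc < 0)
			p->state = SK_DEAD;	/* give up on this link */
		else if (rc == 0)
			cnt++;
		if (envid != SK_EID_BROADCAST)
			return rc == 0 ? 0 : SK_UNAVAIL;
	}
	/* Broadcast fails only if no link accepted the message. */
	return cnt ? 0 : SK_UNAVAIL;
}
```

Dropping a message on a busy link rather than queuing it mirrors the patch's stated choice: db4 tolerates lost messages, and a single-threaded server cannot afford to block.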
diff --git a/server/Makefile.am b/server/Makefile.am
index 6397245..5b53a0a 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -4,7 +4,7 @@ INCLUDES = -I$(top_srcdir)/include @GLIB_CFLAGS@ @HAIL_CFLAGS@
sbin_PROGRAMS = tabled tdbadm
tabled_SOURCES = tabled.h \
- bucket.c cldu.c config.c object.c replica.c \
+ bucket.c cldu.c config.c metarep.c object.c replica.c \
server.c status.c storage.c storparse.c util.c
tabled_LDADD = ../lib/libtdb.a \
@HAIL_LIBS@ @PCRE_LIBS@ @GLIB_LIBS@ \
diff --git a/server/bucket.c b/server/bucket.c
index a95d23e..eb03e03 100644
--- a/server/bucket.c
+++ b/server/bucket.c
@@ -43,11 +43,11 @@ bool has_access(const char *user, const char *bucket, const char *key,
size_t alloc_len, key_len = 0;
struct db_acl_key *acl_key;
struct db_acl_ent *acl;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
DBT pkey, pval;
DBC *cur = NULL;
- DB *acls = tdb.acls;
+ DB *acls = tdbrep.tdb.acls;
if (user == NULL)
user = DB_ACL_ANON;
@@ -132,7 +132,7 @@ err_out:
static int add_access_user(DB_TXN *txn, const char *bucket, const char *key,
const char *user, const char *perms)
{
- DB *acls = tdb.acls;
+ DB *acls = tdbrep.tdb.acls;
int key_len;
int acl_len;
struct db_acl_ent *acl;
@@ -203,8 +203,8 @@ bool service_list(struct client *cli, const char *user)
bool rcb;
DB_TXN *txn = NULL;
DBC *cur = NULL;
- DB_ENV *dbenv = tdb.env;
- DB *bidx = tdb.buckets_idx;
+ DB_ENV *dbenv = tdbrep.tdb.env;
+ DB *bidx = tdbrep.tdb.buckets_idx;
DBT skey, pkey, pval;
if (asprintf(&s,
@@ -348,7 +348,7 @@ bool bucket_valid(const char *bucket)
static int bucket_find(DB_TXN *txn, const char *bucket, char *owner,
int owner_len)
{
- DB *buckets = tdb.buckets;
+ DB *buckets = tdbrep.tdb.buckets;
DBT key, val;
struct db_bucket_ent ent;
int rc;
@@ -455,9 +455,9 @@ bool bucket_add(struct client *cli, const char *user, const char *bucket)
struct db_bucket_ent ent;
bool setacl; /* is ok to put pre-existing bucket */
enum ReqACLC canacl;
- DB *buckets = tdb.buckets;
- DB *acls = tdb.acls;
- DB_ENV *dbenv = tdb.env;
+ DB *buckets = tdbrep.tdb.buckets;
+ DB *acls = tdbrep.tdb.acls;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
DBT key, val;
@@ -589,11 +589,11 @@ bool bucket_del(struct client *cli, const char *user, const char *bucket)
enum errcode err = InternalError;
int rc;
struct db_bucket_ent ent;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
- DB *buckets = tdb.buckets;
- DB *acls = tdb.acls;
- DB *objs = tdb.objs;
+ DB *buckets = tdbrep.tdb.buckets;
+ DB *acls = tdbrep.tdb.acls;
+ DB *objs = tdbrep.tdb.objs;
DBC *cur = NULL;
DBT key, val;
char structbuf[sizeof(struct db_acl_key) + 32];
@@ -922,9 +922,9 @@ static bool bucket_list_keys(struct client *cli, const char *user,
size_t pfx_len;
struct bucket_list_info bli;
bool rcb;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
DBC *cur = NULL;
DBT pkey, pval;
struct db_obj_key *obj_key;
@@ -1159,8 +1159,8 @@ bool access_list(struct client *cli, const char *bucket, const char *key,
GHashTable *param;
enum errcode err = InternalError;
- DB_ENV *dbenv = tdb.env;
- DB *acls = tdb.acls;
+ DB_ENV *dbenv = tdbrep.tdb.env;
+ DB *acls = tdbrep.tdb.acls;
int alloc_len;
char owner[64];
GList *res;
diff --git a/server/cldu.c b/server/cldu.c
index 5f3631b..45a6a83 100644
--- a/server/cldu.c
+++ b/server/cldu.c
@@ -35,6 +35,8 @@
#define ALIGN8(n) ((8 - ((n) & 7)) & 7)
+#define MASTER_FILE "MASTER"
+
struct chunk_node {
struct list_head link;
char name[65];
@@ -63,18 +65,22 @@ struct cld_session {
int actx; /* Active host cldv[actx] */
struct cld_host cldv[N_CLD];
+ char *thisname;
char *thisgroup;
char *thishost;
char *cfname; /* /tabled-group directory */
struct ncld_fh *cfh; /* /tabled-group directory, keep open for scan */
- char *ffname; /* /tabled-group/thishost */
- struct ncld_fh *ffh; /* /tabled-group/thishost, keep open for lock */
+ char *ffname; /* /tabled-group/thisname */
+ struct ncld_fh *ffh; /* /tabled-group/thisname, keep open for lock */
+ char *mfname; /* /tabled-group/MASTER */
+ struct ncld_fh *mfh; /* /tabled-group/MASTER, keep open for lock */
char *xfname; /* /chunk-GROUP directory */
struct list_head chunks; /* found in xfname, struct chunk_node */
};
static int cldu_set_cldc(struct cld_session *sp, int newactive);
+static int scan_peers(struct cld_session *sp);
static int scan_chunks(struct cld_session *sp);
static void next_chunk(struct cld_session *sp, struct chunk_node *np);
static void add_remote(const char *name);
@@ -113,13 +119,17 @@ static int cldu_nextactive(struct cld_session *sp)
* chunkservers that it uses, so this function only takes one group argument.
*/
static int cldu_setgroup(struct cld_session *sp,
- const char *thisgroup, const char *thishost)
+ const char *thisgroup, const char *thishost,
+ const char *thisname)
{
char *mem;
if (thisgroup == NULL) {
thisgroup = "default";
}
+ if (thisname == NULL) {
+ thisname = thishost;
+ }
sp->thisgroup = strdup(thisgroup);
if (!sp->thisgroup)
@@ -127,15 +137,22 @@ static int cldu_setgroup(struct cld_session *sp,
sp->thishost = strdup(thishost);
if (!sp->thishost)
goto err_oom;
+ sp->thisname = strdup(thisname);
+ if (!sp->thisname)
+ goto err_oom;
if (asprintf(&mem, "/tabled-%s", thisgroup) == -1)
goto err_oom;
sp->cfname = mem;
- if (asprintf(&mem, "/tabled-%s/%s", thisgroup, thishost) == -1)
+ if (asprintf(&mem, "/tabled-%s/%s", thisgroup, thisname) == -1)
goto err_oom;
sp->ffname = mem;
+ if (asprintf(&mem, "/tabled-%s/%s", thisgroup, MASTER_FILE) == -1)
+ goto err_oom;
+ sp->mfname = mem;
+
if (asprintf(&mem, "/chunk-%s", thisgroup) == -1)
goto err_oom;
sp->xfname = mem;
@@ -147,6 +164,259 @@ err_oom:
return 0;
}
+/*
+ * Ugh, side effects on tabled_srv.rep_master.
+ */
+static void cldu_parse_master(const char *mfname, const char *mfile, long len)
+{
+ enum lex_state { lex_tag, lex_colon, lex_val };
+ const char *tag, *val;
+ int taglen;
+ const char *name, *host, *port;
+ int namelen, hostlen, portlen;
+ char namebuf[65], hostbuf[65], portbuf[15];
+ long portnum;
+ enum lex_state state;
+ struct db_remote *rp;
+ const char *p;
+ char c;
+
+ name = NULL;
+ namelen = 0;
+ host = NULL;
+ hostlen = 0;
+ port = NULL;
+ portlen = 0;
+
+ p = mfile;
+ tag = p;
+ val = NULL;
+ state = lex_tag;
+ for (;;) {
+ if (p >= mfile+len)
+ break;
+ c = *p++;
+ if (state == lex_tag) {
+ if (c == ':') {
+ val = p;
+ state = lex_colon;
+ taglen = (p-1) - tag;
+ } else if (c == '\n') {
+ if (debugging)
+ applog(LOG_DEBUG,
+ "%s: No colon", mfname);
+ tag = p;
+ val = NULL;
+ state = lex_tag;
+ }
+ } else if (state == lex_colon) {
+ if (c == ' ') {
+ val = p;
+ } else if (c == '\n') {
+ if (debugging)
+ applog(LOG_DEBUG,
+ "%s: Empty value", mfname);
+ tag = p;
+ val = NULL;
+ state = lex_tag;
+ } else {
+ state = lex_val;
+ }
+ } else if (state == lex_val) {
+ if (c == '\n') {
+ if (taglen == sizeof("name")-1 &&
+ memcmp(tag, "name", taglen) == 0) {
+ name = val;
+ namelen = (p-1) - val;
+ } else if (taglen == sizeof("host")-1 &&
+ memcmp(tag, "host", taglen) == 0) {
+ host = val;
+ hostlen = (p-1) - val;
+ } else if (taglen == sizeof("port")-1 &&
+ memcmp(tag, "port", taglen) == 0) {
+ port = val;
+ portlen = (p-1) - val;
+ } else {
+ if (debugging)
+ applog(LOG_DEBUG,
+ "%s: Unknown tag %c[%d]",
+ mfname, tag[0], taglen);
+ }
+ tag = p;
+ val = NULL;
+ state = lex_tag;
+ }
+ } else {
+ return;
+ }
+ }
+
+ if (!name || !namelen) {
+ if (debugging)
+ applog(LOG_DEBUG, "%s: No name", mfname);
+ return;
+ }
+ if (!host || !hostlen) {
+ if (debugging)
+ applog(LOG_DEBUG, "%s: No host", mfname);
+ return;
+ }
+ if (!port || !portlen) {
+ if (debugging)
+ applog(LOG_DEBUG, "%s: No port", mfname);
+ return;
+ }
+
+ if (namelen >= sizeof(namebuf)) {
+ applog(LOG_ERR, "Long master name");
+ return;
+ }
+ memcpy(namebuf, name, namelen);
+ namebuf[namelen] = 0;
+
+ if (hostlen >= sizeof(hostbuf)) {
+ applog(LOG_ERR, "Long host");
+ return;
+ }
+ memcpy(hostbuf, host, hostlen);
+ hostbuf[hostlen] = 0;
+
+ if (portlen >= sizeof(portbuf)) {
+ applog(LOG_ERR, "Long port");
+ return;
+ }
+ memcpy(portbuf, port, portlen);
+ portbuf[portlen] = 0;
+ portnum = strtol(portbuf, NULL, 10);
+ if (portnum <= 0 || portnum >= 65536) {
+ applog(LOG_ERR, "Bad port %s", portbuf);
+ return;
+ }
+
+ rp = tdb_find_remote_byname(namebuf);
+ if (!rp) {
+ if (debugging)
+ applog(LOG_DEBUG, "%s: Not found master %s",
+ mfname, namebuf);
+ return;
+ }
+
+ if (debugging)
+ applog(LOG_DEBUG, "Found master %s host %s port %ld",
+ namebuf, hostbuf, portnum);
+
+ free(rp->host); /* drop the address from any earlier scan */
+ rp->host = strdup(hostbuf);
+ rp->port = portnum;
+ if (!rp->host)
+ return;
+ tabled_srv.rep_master = rp;
+}
+
+static void cldu_get_master(const char *mfname, struct ncld_fh *mfh)
+{
+ struct ncld_read *nrp;
+ struct timespec tm;
+ int error;
+
+ nrp = ncld_get(mfh, &error);
+ if (!nrp) {
+ applog(LOG_ERR, "CLD get(%s) failed: %d", mfname, error);
+ return;
+ }
+
+ if (nrp->length < 3) {
+ ncld_read_free(nrp);
+
+ /*
+ * The master opens, locks, and writes, in that order, so there
+ * is a window between the lock and the write. Wait and re-read.
+ */
+ tm.tv_sec = 2;
+ tm.tv_nsec = 0;
+ nanosleep(&tm, NULL);
+
+ nrp = ncld_get(mfh, &error);
+ if (!nrp) {
+ applog(LOG_ERR, "CLD get(%s) failed: %d", mfname, error);
+ return;
+ }
+
+ if (nrp->length < 3) {
+ applog(LOG_ERR, "CLD master(%s) is empty", mfname);
+ ncld_read_free(nrp);
+ return;
+ }
+ }
+
+ cldu_parse_master(mfname, nrp->ptr, nrp->length);
+ ncld_read_free(nrp);
+}
+
+/*
+ * Lock the MASTER file, write or read it as needed.
+ * N.B. Only call this if you know that mfh is closed or never open:
+ * right after cldu_set_cldc (disposing of session closes handles),
+ * or when we were a slave and so should not have kept mfh open.
+ * FIXME this will become more interesting when we keep mfh open in slave
+ * state so we can have outstanding locks for master failover notification.
+ */
+static int cldu_set_master(struct cld_session *sp)
+{
+ char *buf;
+ int len;
+ int error;
+ int rc;
+
+ if (!sp->nsp)
+ return -1;
+
+ /* Maybe drop this later, after notifications work. */
+ if (debugging) {
+ rc = g_list_length(sp->nsp->handles);
+ applog(LOG_DEBUG, "open handles %d", rc);
+ }
+
+ sp->mfh = ncld_open(sp->nsp, sp->mfname,
+ COM_READ | COM_WRITE | COM_LOCK | COM_CREATE,
+ &error, 0, NULL, NULL);
+ if (!sp->mfh) {
+ applog(LOG_ERR, "CLD open(%s) failed: %d", sp->mfname, error);
+ goto err_open;
+ }
+
+ error = ncld_trylock(sp->mfh);
+ if (error) {
+ applog(LOG_INFO, "CLD lock(%s) failed: %d", sp->mfname, error);
+ cldu_get_master(sp->mfname, sp->mfh);
+ goto err_lock;
+ }
+
+ len = asprintf(&buf, "name: %s\nhost: %s\nport: %u\n",
+ sp->thisname, sp->thishost, tabled_srv.rep_port);
+ if (len < 0) {
+ applog(LOG_ERR, "internal error: no core");
+ goto err_wmem;
+ }
+
+ rc = ncld_write(sp->mfh, buf, len);
+ if (rc) {
+ applog(LOG_ERR, "CLD put(%s) failed: %d", sp->mfname, rc);
+ goto err_write;
+ }
+
+ free(buf);
+ return 0;
+
+err_write:
+ free(buf);
+err_wmem:
+ /* ncld_unlock() - close will unlock */
+err_lock:
+ ncld_close(sp->mfh);
+ sp->mfh = NULL;
+err_open:
+ return -1;
+}
+
static void cldu_tm_rescan(int fd, short events, void *userdata)
{
struct cld_session *sp = userdata;
@@ -162,14 +432,37 @@ static void cldu_tm_rescan(int fd, short events, void *userdata)
sp->nsp = NULL;
}
newactive = cldu_nextactive(sp);
- if (cldu_set_cldc(sp, newactive)) {
- evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
- return;
+ if (cldu_set_cldc(sp, newactive))
+ goto out;
+
+ if (cldu_set_master(sp) == 0) {
+ tabled_srv.state_want = ST_W_MASTER;
+ } else {
+ if (debugging)
+ applog(LOG_DEBUG, "Unable to relock %s",
+ sp->mfname);
+ tabled_srv.state_want = ST_W_SLAVE;
}
+ cld_update_cb();
+
sp->is_dead = false;
+ } else {
+ if (tabled_srv.state_want == ST_W_SLAVE) {
+ if (cldu_set_master(sp) == 0) {
+ tabled_srv.state_want = ST_W_MASTER;
+ } else {
+ if (debugging)
+ applog(LOG_DEBUG, "Unable to lock %s",
+ sp->mfname);
+ }
+ }
}
+ if (scan_peers(sp) != 0)
+ goto out;
scan_chunks(sp);
+
+ out:
evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
}
@@ -201,12 +494,6 @@ static void cldu_sess_event(void *priv, uint32_t what)
static int cldu_set_cldc(struct cld_session *sp, int newactive)
{
struct cldc_host *hp;
- struct ncld_read *nrp;
- char buf[100];
- const char *ptr;
- int dir_len;
- int total_len, rec_len, name_len;
- int len;
struct timespec tm;
int error;
int rc;
@@ -261,6 +548,7 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
/*
* Then, create the membership file for us.
+ * We lock it in case two tabled processes are run with the same name by mistake.
*/
sp->ffh = ncld_open(sp->nsp, sp->ffname,
COM_WRITE | COM_LOCK | COM_CREATE,
@@ -285,11 +573,7 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
/*
* The usual reason why we get a lock conflict is
* restarting too quickly and hitting the previous lock
- * that is going to disappear soon.
- *
- * FIXME: However, it may also be that a master
- * is ok we we should become a slave, e.g. start TDB.
- * We do not support multi-node, but we should.
+ * that is going to disappear soon. Just wait it out.
*/
tm.tv_sec = 10;
tm.tv_nsec = 0;
@@ -299,21 +583,43 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
/*
* Write the file with our connection parameters.
*/
- len = snprintf(buf, sizeof(buf), "port: %u\n", tabled_srv.rep_port);
- if (len >= sizeof(buf)) {
- applog(LOG_ERR, "internal error: overflow for port (%d)", len);
- goto err_wmem;
- }
-
- rc = ncld_write(sp->ffh, buf, len);
+ rc = ncld_write(sp->ffh, "-\n", 2);
if (rc) {
applog(LOG_ERR, "CLD put(%s) failed: %d", sp->ffname, rc);
goto err_write;
}
/*
- * Read the directory.
+ * Finally, scan cfh to find peers, add with global effects.
*/
+ if (scan_peers(sp) != 0)
+ goto err_pscan;
+
+ return 0;
+
+err_pscan:
+err_write:
+err_lock:
+ ncld_close(sp->ffh); /* session-close closes these, maybe drop */
+err_fopen:
+ ncld_close(sp->cfh);
+err_copen:
+ ncld_sess_close(sp->nsp);
+ sp->nsp = NULL;
+err_nsess:
+err_addr:
+ return -1;
+}
+
+static int scan_peers(struct cld_session *sp)
+{
+ struct ncld_read *nrp;
+ char buf[65];
+ const char *ptr;
+ int dir_len;
+ int total_len, rec_len, name_len;
+ int error;
+
nrp = ncld_get(sp->cfh, &error);
if (!nrp) {
applog(LOG_ERR, "CLD get(%s) failed: %d", sp->cfname, error);
@@ -336,13 +642,20 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
else
buf[64] = 0;
- if (!strcmp(buf, sp->thishost)) {
+ if (!strcmp(buf, MASTER_FILE)) {
+ ; /* ignore special entry */
+ } else if (!strcmp(buf, sp->thisname)) {
if (debugging)
applog(LOG_DEBUG, " %s (ourselves)", buf);
} else {
- if (debugging)
- applog(LOG_DEBUG, " %s", buf);
- add_remote(buf);
+ if (tdb_find_remote_byname(buf)) {
+ if (debugging)
+ applog(LOG_DEBUG, " %s", buf);
+ } else {
+ if (debugging)
+ applog(LOG_DEBUG, " %s (new)", buf);
+ add_remote(buf);
+ }
}
ptr += total_len;
@@ -350,21 +663,9 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
}
ncld_read_free(nrp);
-
return 0;
err_dread:
-err_write:
-err_wmem:
-err_lock:
- ncld_close(sp->ffh); /* session-close closes these, maybe drop */
-err_fopen:
- ncld_close(sp->cfh);
-err_copen:
- ncld_sess_close(sp->nsp);
- sp->nsp = NULL;
-err_nsess:
-err_addr:
return -1;
}
@@ -508,9 +809,6 @@ err_mem:
return;
}
-/*
- * FIXME need to read port number from the file (port:<space>num).
- */
static void add_remote(const char *name)
{
struct db_remote *rp;
@@ -518,10 +816,15 @@ static void add_remote(const char *name)
rp = malloc(sizeof(struct db_remote));
if (!rp)
return;
+ memset(rp, 0, sizeof(struct db_remote));
+
+ /*
+ * Master assigns global IDs now, distributes them in login protocol.
+ */
+ rp->dbid = DBID_NONE;
- rp->port = 8083;
- rp->host = strdup(name);
- if (!rp->host) {
+ rp->name = strdup(name);
+ if (!rp->name) {
free(rp);
return;
}
@@ -564,7 +867,8 @@ void cld_init()
/*
* This initiates our sole session with a CLD instance.
*/
-int cld_begin(const char *thishost, const char *thisgroup, int verbose)
+int cld_begin(const char *thishost, const char *thisgroup,
+ const char *thisname, int verbose)
{
static struct cld_session *sp = &ses;
struct timespec tm;
@@ -575,7 +879,7 @@ int cld_begin(const char *thishost, const char *thisgroup, int verbose)
evtimer_set(&ses.tm_rescan, cldu_tm_rescan, &ses);
- if (cldu_setgroup(sp, thisgroup, thishost)) {
+ if (cldu_setgroup(sp, thisgroup, thishost, thisname)) {
/* Already logged error */
goto err_group;
}
@@ -626,6 +930,14 @@ int cld_begin(const char *thishost, const char *thisgroup, int verbose)
newactive = cldu_nextactive(sp);
}
+ if (cldu_set_master(sp) == 0) {
+ if (debugging)
+ applog(LOG_DEBUG, "Locked %s", sp->mfname);
+ tabled_srv.state_want = ST_W_MASTER;
+ } else {
+ tabled_srv.state_want = ST_W_SLAVE;
+ }
+
retry_cnt = 0;
for (;;) {
if (!scan_chunks(sp))
@@ -696,8 +1008,12 @@ void cld_end(void)
sp->ffname = NULL;
free(sp->xfname);
sp->xfname = NULL;
+ free(sp->mfname);
+ sp->mfname = NULL;
free(sp->thisgroup);
sp->thisgroup = NULL;
free(sp->thishost);
sp->thishost = NULL;
+ free(sp->thisname);
+ sp->thisname = NULL;
}
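cldu_set_master() writes, and cldu_parse_master() reads, the MASTER file as three "tag: value" lines (name, host, port). The following is a minimal standalone sketch of the same record format, assuming only what the patch shows: it uses snprintf() instead of asprintf(), and a line-at-a-time parser instead of the character state machine in cldu_parse_master(), so edge-case behavior differs slightly. struct master_info, master_format(), master_parse() and copy_val() are hypothetical names.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical container for the three fields of the MASTER file. */
struct master_info {
	char name[65];
	char host[65];
	long port;
};

/* Format the record the way cldu_set_master() does. */
static int master_format(char *buf, size_t size, const char *name,
			 const char *host, unsigned int port)
{
	int len = snprintf(buf, size, "name: %s\nhost: %s\nport: %u\n",
			   name, host, port);
	return (len < 0 || (size_t)len >= size) ? -1 : len;
}

/* Copy a non-empty value into dst; fail if it does not fit. */
static int copy_val(char *dst, size_t dstsize, const char *val, size_t len)
{
	if (len == 0 || len >= dstsize)
		return -1;
	memcpy(dst, val, len);
	dst[len] = '\0';
	return 0;
}

/* Parse "tag: value" lines; unknown tags and lines without a colon
 * are skipped. All three tags and a valid port are required. */
static int master_parse(const char *buf, size_t len, struct master_info *mi)
{
	const char *p = buf, *end = buf + len;
	char portbuf[15] = "";
	int have = 0;	/* bit 0 name, bit 1 host, bit 2 port */

	while (p < end) {
		const char *nl = memchr(p, '\n', end - p);
		const char *colon, *val;
		size_t taglen, vlen;

		if (!nl)
			break;	/* ignore an unterminated last line */
		colon = memchr(p, ':', nl - p);
		if (colon) {
			taglen = colon - p;
			val = colon + 1;
			while (val < nl && *val == ' ')
				val++;
			vlen = nl - val;
			if (taglen == 4 && !memcmp(p, "name", 4) &&
			    !copy_val(mi->name, sizeof(mi->name), val, vlen))
				have |= 1;
			else if (taglen == 4 && !memcmp(p, "host", 4) &&
				 !copy_val(mi->host, sizeof(mi->host), val, vlen))
				have |= 2;
			else if (taglen == 4 && !memcmp(p, "port", 4) &&
				 !copy_val(portbuf, sizeof(portbuf), val, vlen))
				have |= 4;
		}
		p = nl + 1;
	}
	if (have != 7)
		return -1;
	mi->port = strtol(portbuf, NULL, 10);
	if (mi->port <= 0 || mi->port >= 65536)
		return -1;
	return 0;
}
```

Because the file is only written while the writer holds the CLD lock on MASTER, a reader that finds it empty or incomplete is most likely racing the writer, which is why cldu_get_master() waits and reads a second time.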
diff --git a/server/config.c b/server/config.c
index ff4d876..293a5dd 100644
--- a/server/config.c
+++ b/server/config.c
@@ -224,6 +224,16 @@ static void cfg_elm_end (GMarkupParseContext *context,
cc->text = NULL;
}
+ else if (!strcmp(element_name, "TDBRepName")) {
+ if (!cc->text) {
+ applog(LOG_WARNING, "TDBRepName element empty");
+ return;
+ }
+ free(tabled_srv.rep_name);
+ tabled_srv.rep_name = cc->text;
+ cc->text = NULL;
+ }
+
else if (!strcmp(element_name, "StatusPort")) {
if (!cc->text) {
applog(LOG_WARNING, "StatusPort element empty");
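The new server/metarep.c frames every replication message with a fixed 16-byte struct rep_msg_hdr: a flags word carrying the protocol version in bits 31:28 and the message type in bits 7:0, the lengths of the db4 control and record DBTs that follow, and the destination and source dbids, all in big-endian order. The following is a minimal sketch of packing and checking such a header, assuming that layout; it uses POSIX htonl()/ntohl() where the patch uses GLib's GUINT32_TO_BE() and friends, and the SK_DBID_MIN/SK_DBID_MAX bounds are placeholder values, since the real DBID_MIN and DBID_MAX are not shown here. struct hdr_sketch and the hdr_* functions are hypothetical.

```c
#include <arpa/inet.h>
#include <stdint.h>
#include <string.h>

/* Same layout as struct rep_msg_hdr; all fields are big-endian. */
struct hdr_sketch {
	uint32_t flags;		/* <31:28> version, <7:0> message type */
	uint32_t lenctl;	/* bytes of the db4 control DBT that follow */
	uint32_t lendata;	/* bytes of the db4 record DBT that follow */
	uint16_t dst, src;	/* dbids of the receiver and the sender */
};

enum { SK_MSG_NOP, SK_MSG_LOGIN, SK_MSG_LOGOK, SK_MSG_DATA };
#define SK_VERSION 1u
#define SK_DBID_MIN 1		/* placeholder, not the real DBID_MIN */
#define SK_DBID_MAX 1000	/* placeholder, not the real DBID_MAX */

static void hdr_pack(struct hdr_sketch *h, unsigned int type, int dst,
		     int src, uint32_t lenctl, uint32_t lendata)
{
	memset(h, 0, sizeof(*h));
	h->flags = htonl((SK_VERSION << 28) | (type & 0xff));
	h->lenctl = htonl(lenctl);
	h->lendata = htonl(lendata);
	h->dst = htons((uint16_t)dst);
	h->src = htons((uint16_t)src);
}

/* Checks in the spirit of dbl_hdr_validate(); thisid 0 means unknown. */
static int hdr_check(const struct hdr_sketch *h, int thisid)
{
	uint32_t flags = ntohl(h->flags);
	int src = ntohs(h->src), dst = ntohs(h->dst);

	if ((flags >> 28) != SK_VERSION)
		return -1;
	if (src == dst)
		return -1;	/* a message to ourselves means loopback */
	if (src < SK_DBID_MIN || src > SK_DBID_MAX ||
	    dst < SK_DBID_MIN || dst > SK_DBID_MAX)
		return -1;
	if (thisid != 0 && dst != thisid)
		return -1;	/* misdirected */
	return 0;
}

/* Total frame size a reader must accumulate: header plus payloads. */
static size_t hdr_frame_len(const struct hdr_sketch *h)
{
	return sizeof(*h) + ntohl(h->lenctl) + ntohl(h->lendata);
}
```

A reader in this scheme first accumulates sizeof(struct rep_msg_hdr) bytes, validates them, and only then grows its buffer to the full frame length, which is the role dbl_expect() and dbl_irealloc() play in the patch.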
diff --git a/server/metarep.c b/server/metarep.c
new file mode 100644
index 0000000..d3eec49
--- /dev/null
+++ b/server/metarep.c
@@ -0,0 +1,1245 @@
+
+/*
+ * Copyright 2008-2009 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define _GNU_SOURCE
+#include "tabled-config.h"
+
+#include <sys/types.h>
+#include <sys/ioctl.h>
+#include <stddef.h>
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <syslog.h>
+#include <glib.h>
+#include <tdb.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include "tabled.h"
+
+/* #define offsetof(type, member) \
+ (((unsigned char *)&((type *)0)->member) - (unsigned char *)0) */
+
+/*
+ * flags:
+ * <31:28> version (currently 1)
+ * <27:8> unused
+ * <7:0> rep_msg_type
+ */
+enum rep_msg_type { REP_MSG_NOP, REP_MSG_LOGIN, REP_MSG_LOGOK, REP_MSG_DATA };
+struct rep_msg_hdr {
+ unsigned int flags;
+ unsigned int lenctl;
+ unsigned int lendata;
+ unsigned short dst, src;
+};
+
+/*
+ * The naming convention is to identify the context in which the function runs.
+ */
+static int rtdb_master_login_reply(struct db_conn *dbc, unsigned char *msgbuf);
+
+/*
+ * Note that the invalid dbid is zero, not -1.
+ */
+static int make_remote_id(void)
+{
+ int id;
+
+ for (;;) {
+ id = rand() % (DBID_MAX+1 - DBID_MIN) + DBID_MIN;
+ if (!tdb_find_remote_byid(id))
+ return id;
+ }
+}
+
+static int dbl_init(struct db_link *dbl)
+{
+ dbl->fd = -1;
+ dbl->state = DBC_INIT;
+
+ dbl->obuflen = 500;
+ dbl->obuf = malloc(dbl->obuflen);
+ if (!dbl->obuf)
+ return -1;
+
+ dbl->ibuflen = sizeof(struct rep_msg_hdr);
+ dbl->ibuf = malloc(dbl->ibuflen);
+ if (!dbl->ibuf) {
+ free(dbl->obuf);
+ return -1;
+ }
+ dbl->cnt = 0;
+ dbl->explen = 1;
+
+ return 0;
+}
+
+static int dbl_irealloc(struct db_link *dbl, int len)
+{
+ unsigned char *newbuf;
+
+ if (len > dbl->ibuflen) {
+ if (!(newbuf = malloc(len)))
+ return -1;
+ memcpy(newbuf, dbl->ibuf, dbl->ibuflen);
+ free(dbl->ibuf);
+ dbl->ibuf = newbuf;
+ dbl->ibuflen = len;
+ }
+ return 0;
+}
+
+/*
+ * Expect the dbl->explen, return accumulated dbl->cnt.
+ */
+static int dbl_expect(struct db_link *dbl)
+{
+ int rc;
+
+ rc = read(dbl->fd, dbl->ibuf + dbl->cnt, dbl->explen - dbl->cnt);
+ if (rc < 0) {
+ if (errno == EAGAIN)
+ return dbl->cnt;
+ applog(LOG_ERR, "network read: %s", strerror(errno));
+ return -1;
+ }
+ if (rc == 0) {
+ applog(LOG_ERR, "EOF from peer"); /* P3 */
+ return -1;
+ }
+ dbl->cnt += rc;
+ return dbl->cnt;
+}
+
+static int dbl_hdr_validate(struct rep_msg_hdr *hdr, int thisid)
+{
+ unsigned int msgflags;
+ int srcid, dstid;
+
+ msgflags = GUINT32_FROM_BE(hdr->flags);
+ if ((msgflags >> 28) != 1) {
+ applog(LOG_ERR, "Link: bad protocol, flags 0x%08x", msgflags);
+ return -1;
+ }
+
+ srcid = GUINT16_FROM_BE(hdr->src);
+ dstid = GUINT16_FROM_BE(hdr->dst);
+ if (srcid == dstid) {
+ applog(LOG_ERR, "Link: loopback, dbid %d", dstid);
+ return -1;
+ }
+ if (srcid < DBID_MIN || srcid > DBID_MAX) {
+ applog(LOG_ERR, "Link: bad src dbid %d", srcid);
+ return -1;
+ }
+ if (dstid < DBID_MIN || dstid > DBID_MAX) {
+ applog(LOG_ERR, "Link: bad dst dbid %d", dstid);
+ return -1;
+ }
+
+ if (thisid != 0 && dstid != thisid) {
+ applog(LOG_ERR, "Link: misdirected, dst dbid %d our dbid %d",
+ dstid, thisid);
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * The login message differs in two ways:
+ * - src and/or dst may not be set
+ * - lenctl is the length of the dst name, lendata that of the src name
+ * (but the names are not available here for validation)
+ */
+static int dbl_hdr_validate_login(struct rep_msg_hdr *hdr, int thisid)
+{
+ unsigned int msgflags;
+ unsigned int len;
+
+ msgflags = GUINT32_FROM_BE(hdr->flags);
+ if ((msgflags >> 28) != 1) {
+ applog(LOG_ERR, "Link: bad protocol, flags 0x%08x", msgflags);
+ return -1;
+ }
+ if ((msgflags & 0xff) != REP_MSG_LOGIN) {
+ applog(LOG_ERR, "Link: bad login request, flags 0x%08x",
+ msgflags);
+ return -1;
+ }
+
+#if 0 /* A bad idea as long as names are the persistent identifiers. */
+ int dstid;
+ dstid = GUINT16_FROM_BE(hdr->dst);
+ if (dstid && dstid != thisid) {
+ applog(LOG_ERR, "Link: login to wrong dbid %d", dstid);
+ return -1;
+ }
+#endif
+
+ len = GUINT32_FROM_BE(hdr->lenctl);
+ if (len == 0 || len > 64) {
+ applog(LOG_ERR, "Link: bad login dst len %u", len);
+ return -1;
+ }
+
+ len = GUINT32_FROM_BE(hdr->lendata);
+ if (len == 0 || len > 64) {
+ applog(LOG_ERR, "Link: bad login src len %u", len);
+ return -1;
+ }
+
+ return 0;
+}
+
+static void dbl_fini(struct db_link *dbl)
+{
+ if (dbl->writing) {
+ event_del(&dbl->wrev);
+ dbl->writing = false;
+ }
+ if (dbl->fd >= 0) {
+ event_del(&dbl->rcev);
+ close(dbl->fd);
+ }
+ if (dbl->ibuf)
+ free(dbl->ibuf);
+ if (dbl->obuf)
+ free(dbl->obuf);
+}
+
+static struct db_conn *tdb_find_byid(struct tablerep *rtdb, int id)
+{
+ struct db_conn *dbc;
+
+ list_for_each_entry(dbc, &rtdb->conns, link) {
+ if (dbc->remote && dbc->remote->dbid == id)
+ return dbc;
+ }
+ return NULL;
+}
+
+static struct db_conn *dbc_alloc(struct tablerep *rtdb, struct db_remote *rem)
+{
+ struct db_conn *dbc;
+
+ dbc = malloc(sizeof(*dbc));
+ if (!dbc)
+ goto out_mem;
+ memset(dbc, 0, sizeof(*dbc));
+ dbc->rtdb = rtdb;
+ dbc->remote = rem;
+ if (dbl_init(&dbc->lk))
+ goto out_dbl;
+ return dbc;
+
+ out_dbl:
+ free(dbc);
+ out_mem:
+ return NULL;
+}
+
+static void dbc_free(struct db_conn *dbc)
+{
+ dbl_fini(&dbc->lk);
+ free(dbc);
+}
+
+/*
+ * The dbc->remote is known here, see callers.
+ *
+ * The db4 code assumes that it is all right to block when sending. Of course
+ * in our case that means blocking the whole (single-threaded) server.
+ * It is also all right to drop messages, which is said to hurt performance
+ * in other ways. Still, as long as tabled is single-threaded we have no choice.
+ *
+ * Since we can only send complete messages, and even blocking sockets can
+ * return short writes, we must buffer output. But we do not create any
+ * additional queues beyond what is required for the atomicity.
+ */
+static int tdb_rep_send(struct tablerep *rtdb, struct db_link *dbl,
+ int dstid, const DBT *ctl, const DBT *rec,
+ bool easydrop)
+{
+ unsigned char *p;
+ struct rep_msg_hdr *hdr;
+ unsigned int msgflags;
+ ssize_t len;
+ ssize_t rc;
+
+ if (dbl->togo) {
+ /* Maybe poke the output here? Should not be necessary. */
+ return 1;
+ }
+
+ len = sizeof(struct rep_msg_hdr) + ctl->size + rec->size;
+ if (dbl->obuflen < len) {
+ free(dbl->obuf);
+ dbl->obuflen = 0;
+ dbl->obuf = malloc(len);
+ if (!dbl->obuf) {
+ applog(LOG_WARNING, "No core (%ld)", (long) len);
+ return -1;
+ }
+ dbl->obuflen = len;
+ }
+
+ hdr = (struct rep_msg_hdr *) dbl->obuf;
+ p = dbl->obuf;
+
+ memset(hdr, 0, sizeof(struct rep_msg_hdr));
+ msgflags = (1 << 28) | (REP_MSG_DATA);
+ hdr->flags = GUINT32_TO_BE(msgflags);
+ hdr->dst = GUINT16_TO_BE((unsigned short)dstid);
+ hdr->src = GUINT16_TO_BE((unsigned short)rtdb->thisid);
+ p += sizeof(struct rep_msg_hdr);
+ if (ctl->size) {
+ hdr->lenctl = GUINT32_TO_BE(ctl->size);
+ memcpy(p, ctl->data, ctl->size);
+ p += ctl->size;
+ }
+ if (rec->size) {
+ hdr->lendata = GUINT32_TO_BE(rec->size);
+ memcpy(p, rec->data, rec->size);
+ p += rec->size;
+ }
+
+ dbl->done = 0;
+ dbl->togo = p - dbl->obuf;
+
+ rc = write(dbl->fd, dbl->obuf + dbl->done, dbl->togo);
+ if (rc < 0) {
+ if (errno != EAGAIN) {
+ applog(LOG_ERR, "socket write error, peer dbid %d: %s",
+ dstid, strerror(errno));
+ dbl->done = 0;
+ dbl->togo = 0;
+ return -1;
+ }
+ rc = 0; /* socket full: finish from the write event */
+ }
+ if (rc < dbl->togo) {
+ if (!dbl->writing) {
+ if (event_add(&dbl->wrev, NULL))
+ applog(LOG_ERR, "event_add failed (write)");
+ else
+ dbl->writing = true;
+ }
+ }
+ dbl->done += rc;
+ dbl->togo -= rc;
+ return 0;
+}
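For reference, the framing that tdb_rep_send() produces (one header, then the ctl bytes, then the rec bytes) can be sketched outside of tabled. This is a minimal sketch assuming a hypothetical 16-byte header holding flags, lenctl, lendata, dst and src in big-endian order; the real struct rep_msg_hdr is defined elsewhere in the patch and may be laid out differently, and the meaning of bit 28 in the flags word is not spelled out in this excerpt. Serializing byte by byte, rather than casting a struct, avoids any dependence on padding or alignment.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical wire header, 16 bytes, all fields big-endian:
 *   flags(4) lenctl(4) lendata(4) dst(2) src(2)
 */
#define FRAME_HDR_LEN 16

static void put_be32(unsigned char *p, uint32_t v)
{
	p[0] = (unsigned char)(v >> 24);
	p[1] = (unsigned char)(v >> 16);
	p[2] = (unsigned char)(v >> 8);
	p[3] = (unsigned char)v;
}

static void put_be16(unsigned char *p, uint16_t v)
{
	p[0] = (unsigned char)(v >> 8);
	p[1] = (unsigned char)v;
}

/*
 * Build one complete message: header, ctl bytes, rec bytes.
 * Returns a malloc'ed buffer (caller frees) and its length in *lenp,
 * or NULL if out of memory.
 */
static unsigned char *frame_encode(uint32_t flags, uint16_t dst, uint16_t src,
				   const void *ctl, uint32_t ctllen,
				   const void *rec, uint32_t reclen,
				   size_t *lenp)
{
	size_t len = FRAME_HDR_LEN + (size_t)ctllen + reclen;
	unsigned char *buf = malloc(len);

	if (!buf)
		return NULL;
	put_be32(buf + 0, flags);
	put_be32(buf + 4, ctllen);
	put_be32(buf + 8, reclen);
	put_be16(buf + 12, dst);
	put_be16(buf + 14, src);
	if (ctllen)
		memcpy(buf + FRAME_HDR_LEN, ctl, ctllen);
	if (reclen)
		memcpy(buf + FRAME_HDR_LEN + ctllen, rec, reclen);
	*lenp = len;
	return buf;
}
```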
+
+static int db4_rep_send(DB_ENV *dbenv, const DBT *ctl, const DBT *rec,
+ const DB_LSN *lsnp, int envid, uint32_t flags)
+{
+ struct tablerep *rtdb;
+ struct db_conn *dbc;
+ int cnt;
+ int rc;
+
+ rtdb = (struct tablerep *)
+ ((char *)dbenv->app_private - offsetof(struct tablerep, tdb));
+
+ if (envid == DB_EID_BROADCAST) {
+ cnt = 0;
+ list_for_each_entry(dbc, &rtdb->conns, link) {
+ if (dbc->lk.state == DBC_OPEN) {
+ rc = tdb_rep_send(rtdb, &dbc->lk,
+ dbc->remote->dbid,
+ ctl, rec, true);
+ if (!rc)
+ cnt++;
+ if (rc < 0)
+ dbc->lk.state = DBC_DEAD;
+ }
+ }
+ if (!cnt)
+ return DB_REP_UNAVAIL;
+ } else {
+ dbc = tdb_find_byid(rtdb, envid);
+ if (dbc && dbc->lk.state == DBC_OPEN) {
+ rc = tdb_rep_send(rtdb, &dbc->lk,
+ dbc->remote->dbid, ctl, rec, false);
+ if (rc < 0) {
+ dbc->lk.state = DBC_DEAD;
+ return DB_REP_UNAVAIL;
+ }
+ if (rc)
+ return DB_REP_UNAVAIL;
+ } else {
+ applog(LOG_INFO, "Send: dbid %d not found", envid);
+ return DB_REP_UNAVAIL;
+ }
+ }
+ return 0;
+}
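The broadcast branch of db4_rep_send() counts how many open peers accepted the message, marks peers whose write failed as dead, skips peers that are still busy with an earlier message, and reports DB_REP_UNAVAIL only when nobody took it. The model below isolates that counting logic; struct link, link_send() and the LINK_* states are hypothetical stand-ins for struct db_conn, tdb_rep_send() and the DBC_* states.

```c
#include <assert.h>

/* Hypothetical stand-ins for the DBC_* link states. */
enum link_state { LINK_INIT, LINK_OPEN, LINK_DEAD };

struct link {
	enum link_state state;
	int busy;	/* earlier message still pending (togo != 0) */
	int broken;	/* next write fails */
};

/* Models tdb_rep_send(): 1 = dropped while busy, -1 = error, 0 = sent. */
static int link_send(const struct link *l)
{
	if (l->busy)
		return 1;
	if (l->broken)
		return -1;
	return 0;
}

/*
 * Offer one message to every open link. Returns how many accepted it;
 * zero corresponds to the DB_REP_UNAVAIL case in db4_rep_send().
 */
static int broadcast(struct link *links, int n)
{
	int i, rc, cnt = 0;

	for (i = 0; i < n; i++) {
		if (links[i].state != LINK_OPEN)
			continue;
		rc = link_send(&links[i]);
		if (rc == 0)
			cnt++;
		else if (rc < 0)
			links[i].state = LINK_DEAD;
	}
	return cnt;
}
```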
+
+static int rtdb_process(struct db_conn *dbc, unsigned char *msgbuf)
+{
+ struct rep_msg_hdr *hdr = (struct rep_msg_hdr *) msgbuf;
+ DB_ENV *dbenv = dbc->rtdb->tdb.env;
+ DBT pctl, prec;
+ DB_LSN lsn;
+ struct db_remote *peer;
+ int rc;
+
+ peer = tdb_find_remote_byid(GUINT16_FROM_BE(hdr->src));
+ if (!peer) {
+ applog(LOG_INFO, "Unknown peer dbid %d",
+ GUINT16_FROM_BE(hdr->src));
+ return -1;
+ }
+
+ memset(&pctl, 0, sizeof(pctl));
+ pctl.data = msgbuf + sizeof(struct rep_msg_hdr);
+ pctl.size = GUINT32_FROM_BE(hdr->lenctl);
+ memset(&prec, 0, sizeof(prec));
+ prec.data = (unsigned char *) pctl.data + pctl.size;
+ prec.size = GUINT32_FROM_BE(hdr->lendata);
+ rc = dbenv->rep_process_message(dbenv, &pctl, &prec, peer->dbid, &lsn);
+ switch (rc) {
+ case DB_REP_ISPERM:
+ /*
+ * DB_REP_ISPERM ("record is written") is normal in db4
+ * operation and shows up so often that we do not print it,
+ * even when debugging.
+ */
+ break;
+ case DB_REP_DUPMASTER: /* DB thinks we have 2 */
+ case DB_REP_HANDLE_DEAD: /* what handle? */
+ case DB_REP_HOLDELECTION: /* maybe just rep_init it */
+ case DB_REP_IGNORE: /* well, whatever */
+ case DB_REP_JOIN_FAILURE:
+ case DB_REP_LEASE_EXPIRED:
+ case DB_REP_LOCKOUT:
+ case DB_REP_NEWSITE:
+ case DB_REP_NOTPERM:
+ case DB_REP_UNAVAIL:
+ default:
+ if (rc) {
+ applog(LOG_INFO, "rep_process_message: %d (%s)",
+ rc, db_strerror(rc));
+ }
+ }
+
+ return 0;
+}
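rtdb_process() trusts lenctl and lendata because its caller has already sized the buffer from the same header. A receiver that wants to validate a message on its own could split it as sketched below. The sketch assumes a hypothetical 16-byte header with lenctl at offset 4 and lendata at offset 8, both big-endian; frame_split() and struct frame_view are illustrative names, not part of tabled.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define FRAME_HDR_LEN 16	/* hypothetical; lengths at offsets 4 and 8 */

static uint32_t get_be32(const unsigned char *p)
{
	return (uint32_t)p[0] << 24 | (uint32_t)p[1] << 16 |
	       (uint32_t)p[2] << 8 | (uint32_t)p[3];
}

/* Zero-copy view of the two payload parts of one message. */
struct frame_view {
	const unsigned char *ctl;
	uint32_t ctllen;
	const unsigned char *rec;
	uint32_t reclen;
};

/*
 * Split msg into its ctl and rec parts. Returns 0 on success, -1 if
 * the header lengths do not fit into the msglen bytes received.
 */
static int frame_split(const unsigned char *msg, size_t msglen,
		       struct frame_view *v)
{
	uint64_t need;

	if (msglen < FRAME_HDR_LEN)
		return -1;
	v->ctllen = get_be32(msg + 4);
	v->reclen = get_be32(msg + 8);
	/* Sum in 64 bits so two large 32-bit lengths cannot wrap. */
	need = (uint64_t)FRAME_HDR_LEN + v->ctllen + v->reclen;
	if (need > msglen)
		return -1;
	v->ctl = msg + FRAME_HDR_LEN;
	v->rec = v->ctl + v->ctllen;	/* unsigned char *, not void * */
	return 0;
}
```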
+
+static int rtdb_send_more(struct db_link *dbl)
+{
+ ssize_t rc;
+
+ if (!dbl->togo) {
+ /* P3 */ applog(LOG_INFO, "stray write event");
+ event_del(&dbl->wrev);
+ dbl->writing = false;
+ return 0;
+ }
+
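+ rc = write(dbl->fd, dbl->obuf + dbl->done, dbl->togo);
+ if (rc < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ return 0; /* spurious wakeup, wait for the next event */
+ applog(LOG_ERR, "socket write error: %s", strerror(errno));
+ dbl->done = 0;
+ dbl->togo = 0;
+ return -1;
+ }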
+ if (rc < dbl->togo) {
+ dbl->done += rc;
+ dbl->togo -= rc;
+ if (!dbl->writing) {
+ if (event_add(&dbl->wrev, NULL))
+ applog(LOG_ERR, "event_add failed (write)");
+ else
+ dbl->writing = true;
+ }
+ } else {
+ dbl->done = 0;
+ dbl->togo = 0;
+ if (dbl->writing) {
+ event_del(&dbl->wrev);
+ dbl->writing = false;
+ }
+ }
+ return 0;
+}
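rtdb_send_more() resumes a partially written message when the socket becomes writable again. On a non-blocking socket, write() can also fail with EAGAIN or EWOULDBLOCK, which only means the kernel buffer is full. The sketch below shows a flush helper that treats that case as "try later"; struct outbuf is a hypothetical stand-in for the obuf/done/togo fields of struct db_link, and unlike the patch it keeps writing until the buffer is empty or the socket is full.

```c
#define _POSIX_C_SOURCE 200112L
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical pending-output state for one connection. */
struct outbuf {
	const char *buf;
	size_t done;	/* bytes already written */
	size_t togo;	/* bytes still pending */
};

/*
 * Push as much of the pending message as the fd accepts.
 * Returns 1 when the message is complete, 0 if some remains (keep
 * waiting for writability), -1 on a hard error.
 */
static int outbuf_flush(int fd, struct outbuf *ob)
{
	ssize_t rc;

	while (ob->togo) {
		rc = write(fd, ob->buf + ob->done, ob->togo);
		if (rc < 0) {
			if (errno == EINTR)
				continue;
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				return 0;	/* buffer full, not an error */
			return -1;
		}
		ob->done += (size_t)rc;
		ob->togo -= (size_t)rc;
	}
	ob->done = 0;
	return 1;
}
```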
+
+static void rtdb_wr_event(int fd, short events, void *userdata)
+{
+ struct db_link *dbl = userdata;
+
+ if (rtdb_send_more(dbl))
+ dbl->state = DBC_DEAD;
+}
+
+static void rtdb_master_tcp_event(int fd, short events, void *userdata)
+{
+ struct db_conn *dbc = userdata;
+ struct rep_msg_hdr *hdr;
+ unsigned msgflags;
+ int ctllen, reclen;
+ int len;
+ int rc;
+
+ switch (dbc->lk.state) {
+ case DBC_LOGIN:
+ rc = dbl_expect(&dbc->lk);
+ if (rc < 0)
+ goto out_bad_dbc;
+ if (rc < dbc->lk.explen)
+ return;
+
+ if (dbc->lk.explen == sizeof(struct rep_msg_hdr)) {
+ hdr = (struct rep_msg_hdr *) dbc->lk.ibuf;
+ if (dbl_hdr_validate_login(hdr, dbc->rtdb->thisid))
+ goto out_bad_dbc;
+
+ ctllen = GUINT32_FROM_BE(hdr->lenctl);
+ reclen = GUINT32_FROM_BE(hdr->lendata);
+ len = sizeof(struct rep_msg_hdr) + ctllen + reclen;
+ if (dbl_irealloc(&dbc->lk, len) < 0) {
+ applog(LOG_ERR, "No core (%d)", len);
+ goto out_bad_dbc;
+ }
+ dbc->lk.explen = len;
+ } else {
+ if (rtdb_master_login_reply(dbc, dbc->lk.ibuf))
+ goto out_bad_dbc;
+
+ dbc->lk.state = DBC_OPEN;
+ dbc->lk.cnt = 0;
+ dbc->lk.explen = sizeof(struct rep_msg_hdr);
+ }
+ break;
+ case DBC_OPEN:
+ rc = dbl_expect(&dbc->lk);
+ if (rc < 0)
+ goto out_bad_dbc;
+ if (rc < dbc->lk.explen)
+ return;
+
+ if (dbc->lk.explen == sizeof(struct rep_msg_hdr)) {
+ hdr = (struct rep_msg_hdr *) dbc->lk.ibuf;
+ if (dbl_hdr_validate(hdr, dbc->rtdb->thisid))
+ goto out_bad_dbc;
+ msgflags = GUINT32_FROM_BE(hdr->flags);
+ if ((msgflags & 0xff) != REP_MSG_DATA) {
+ applog(LOG_ERR,
+ "Bad data message, flags 0x%08x",
+ msgflags);
+ goto out_bad_dbc;
+ }
+
+ ctllen = GUINT32_FROM_BE(hdr->lenctl);
+ reclen = GUINT32_FROM_BE(hdr->lendata);
+ len = sizeof(struct rep_msg_hdr) + ctllen + reclen;
+ if (dbl_irealloc(&dbc->lk, len) < 0) {
+ applog(LOG_ERR, "No core (%d)", len);
+ goto out_bad_dbc;
+ }
+ dbc->lk.explen = len;
+ } else {
+ if (rtdb_process(dbc, dbc->lk.ibuf))
+ goto out_bad_dbc;
+
+ dbc->lk.state = DBC_OPEN;
+ dbc->lk.cnt = 0;
+ dbc->lk.explen = sizeof(struct rep_msg_hdr);
+ }
+ break;
+ default: // DBC_DEAD
+ if (dbc->remote) {
+ applog(LOG_INFO,
+ "Event on a dead slave socket, slave %s",
+ dbc->remote->host);
+ } else {
+ applog(LOG_INFO,
+ "Event on a dead slave socket");
+ }
+ tdb_conn_scrub_cb();
+ }
+ return;
+
+ out_bad_dbc:
+ dbc->lk.state = DBC_DEAD;
+ tdb_conn_scrub_cb();
+ return;
+}
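Both TCP event handlers receive in two phases: first exactly one header, then, once lenctl and lendata are known, the whole message. dbl_expect() and dbl_irealloc() are not shown in this excerpt, so the sketch below is a self-contained reader built on plain read() and realloc(). FRAME_HDR_LEN, FRAME_MAX and struct inbuf are hypothetical, with an assumed 16-byte header carrying the two lengths at offsets 4 and 8. It also caps the announced size, since the lengths arrive from the network.

```c
#define _POSIX_C_SOURCE 200112L
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define FRAME_HDR_LEN 16		/* hypothetical header size */
#define FRAME_MAX (16u * 1024 * 1024)	/* hypothetical per-message cap */

struct inbuf {
	unsigned char *buf;	/* at least FRAME_HDR_LEN bytes to start */
	size_t cnt;		/* bytes received so far */
	size_t explen;		/* FRAME_HDR_LEN, then the full length */
};

static uint32_t get_be32(const unsigned char *p)
{
	return (uint32_t)p[0] << 24 | (uint32_t)p[1] << 16 |
	       (uint32_t)p[2] << 8 | (uint32_t)p[3];
}

/*
 * Returns 1 when a complete message of ib->explen bytes is in ib->buf,
 * 0 if more input is needed, -1 on error, EOF or an oversized message.
 * After consuming a message, reset cnt to 0 and explen to FRAME_HDR_LEN.
 */
static int inbuf_read(int fd, struct inbuf *ib)
{
	unsigned char *nb;
	uint64_t total;
	ssize_t rc;

	for (;;) {
		rc = read(fd, ib->buf + ib->cnt, ib->explen - ib->cnt);
		if (rc < 0) {
			if (errno == EINTR)
				continue;
			return (errno == EAGAIN || errno == EWOULDBLOCK) ? 0 : -1;
		}
		if (rc == 0)
			return -1;		/* peer closed */
		ib->cnt += (size_t)rc;
		if (ib->cnt < ib->explen)
			continue;
		if (ib->explen > FRAME_HDR_LEN)
			return 1;		/* phase 2 done: whole message */

		/* Phase 1 done: the header tells us the full length. */
		total = (uint64_t)FRAME_HDR_LEN + get_be32(ib->buf + 4) +
			get_be32(ib->buf + 8);
		if (total > FRAME_MAX)
			return -1;
		if (total == FRAME_HDR_LEN)
			return 1;		/* header-only message */
		nb = realloc(ib->buf, (size_t)total);
		if (!nb)
			return -1;
		ib->buf = nb;
		ib->explen = (size_t)total;
	}
}
```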
+
+static void tdb_conn_event(int fd, short events, void *userdata)
+{
+ struct tablerep *rtdb = userdata;
+ struct db_conn *dbc;
+ struct sockaddr_in6 addr;
+ socklen_t addrlen;
+ char host[65], port[15];
+
+ dbc = dbc_alloc(rtdb, NULL);
+ if (!dbc)
+ goto out_dbc;
+ dbc->lk.explen = sizeof(struct rep_msg_hdr);
+ dbc->lk.state = DBC_LOGIN;
+
+ addrlen = sizeof(addr);
+ dbc->lk.fd = accept(fd, (struct sockaddr *) &addr, &addrlen);
+ if (dbc->lk.fd < 0) {
+ applog(LOG_ERR, "accept: %s", strerror(errno));
+ goto out_accept;
+ }
+
+ if (getnameinfo((struct sockaddr *) &addr, addrlen,
+ host, sizeof(host), port, sizeof(port),
+ NI_NUMERICHOST|NI_NUMERICSERV) != 0) {
+ strcpy(host, "?");
+ strcpy(port, "?");
+ }
+ applog(LOG_INFO, "db slave host %s port %s", host, port);
+
+ if (fcntl(dbc->lk.fd, F_SETFL, O_NONBLOCK) < 0) {
+ applog(LOG_ERR, "fcntl: %s", strerror(errno));
+ goto out_flags;
+ }
+
+ event_set(&dbc->lk.rcev, dbc->lk.fd, EV_READ | EV_PERSIST,
+ rtdb_master_tcp_event, dbc);
+ event_set(&dbc->lk.wrev, dbc->lk.fd, EV_WRITE | EV_PERSIST,
+ rtdb_wr_event, &dbc->lk);
+ if (event_add(&dbc->lk.rcev, NULL) < 0) {
+ applog(LOG_ERR, "event_add failed");
+ goto out_add;
+ }
+ list_add_tail(&dbc->link, &rtdb->conns);
+ return;
+
+ out_add:
+ out_flags:
+ close(dbc->lk.fd);
+ out_accept:
+ dbc_free(dbc);
+ out_dbc:
+ return;
+}
+
+static int tdb_rep_listen_open(struct sockaddr_in *addr, int addr_len)
+{
+ int fd;
+ int on;
+ int rc;
+
+ fd = socket(addr->sin_family, SOCK_STREAM, 0);
+ if (fd < 0)
+ return -errno;
+
+ on = 1;
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
+ rc = -errno;
+ goto out_err;
+ }
+
+ if (bind(fd, (struct sockaddr *) addr, addr_len) < 0) {
+ rc = -errno;
+ goto out_err;
+ }
+
+ // rc = fsetflags("tcp server", fd, O_NONBLOCK);
+ // if (rc) {
+ // rc = -errno;
+ // goto out_err;
+ // }
+
+ if (listen(fd, 100) < 0) {
+ rc = -errno;
+ goto out_err;
+ }
+
+ return fd;
+
+ out_err:
+ close(fd);
+ return rc;
+}
+
+static int rtdb_rep_listen(struct tablerep *rtdb, unsigned short port)
+{
+ struct sockaddr_in addr4;
+ struct sockaddr_in6 addr6;
+ int rc;
+
+ memset(&addr6, 0, sizeof(addr6));
+ addr6.sin6_family = AF_INET6;
+ addr6.sin6_port = htons(port);
+ memcpy(&addr6.sin6_addr, &in6addr_any, sizeof(struct in6_addr));
+ rc = tdb_rep_listen_open((struct sockaddr_in *)&addr6, sizeof(addr6));
+ if (rc < 0) {
+ if (debugging)
+ applog(LOG_DEBUG,
+ "tdb_rep_listen_open(v6, %u) failed: %s",
+ port, strerror(-rc));
+ } else {
+ rtdb->sockfd6 = rc;
+ event_set(&rtdb->lsev6, rtdb->sockfd6, EV_READ | EV_PERSIST,
+ tdb_conn_event, rtdb);
+ if (event_add(&rtdb->lsev6, NULL) < 0)
+ applog(LOG_ERR, "event_add failed");
+ }
+
+ memset(&addr4, 0, sizeof(addr4));
+ addr4.sin_family = AF_INET;
+ addr4.sin_port = htons(port);
+ addr4.sin_addr.s_addr = htonl(INADDR_ANY);
+ rc = tdb_rep_listen_open((struct sockaddr_in *)&addr4, sizeof(addr4));
+ if (rc < 0) {
+ if (debugging)
+ applog(LOG_DEBUG,
+ "tdb_rep_listen_open(v4, %u) failed: %s",
+ port, strerror(-rc));
+ } else {
+ rtdb->sockfd4 = rc;
+ event_set(&rtdb->lsev4, rtdb->sockfd4, EV_READ | EV_PERSIST,
+ tdb_conn_event, rtdb);
+ if (event_add(&rtdb->lsev4, NULL) < 0)
+ applog(LOG_ERR, "event_add failed");
+ }
+
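+ if (rtdb->sockfd4 < 0 && rtdb->sockfd6 < 0)
+ applog(LOG_WARNING,
+ "Not listening for replication on port %u", port);
+ return 0;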
+}
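rtdb_rep_listen() opens an IPv6 wildcard listener and then an IPv4 one, logging failures only when debugging. On Linux with net.ipv6.bindv6only=0, the IPv6 socket already accepts IPv4 connections as v4-mapped addresses, so the second bind typically fails with EADDRINUSE, which is harmless. An alternative, not what the patch does, is to set IPV6_V6ONLY on the IPv6 socket so that the two listeners are independent. A sketch, with listen_any() as a hypothetical helper:

```c
#define _POSIX_C_SOURCE 200112L
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Open a TCP listener on the wildcard address of one family.
 * For AF_INET6 the socket is made v6-only, so an AF_INET listener
 * can bind the same port independently. Returns an fd or -errno.
 */
static int listen_any(int family, unsigned short port)
{
	struct sockaddr_storage ss;
	socklen_t sslen;
	int fd, err, on = 1;

	memset(&ss, 0, sizeof(ss));
	if (family == AF_INET6) {
		struct sockaddr_in6 *a6 = (struct sockaddr_in6 *)&ss;

		a6->sin6_family = AF_INET6;
		a6->sin6_port = htons(port);
		a6->sin6_addr = in6addr_any;
		sslen = sizeof(*a6);
	} else {
		struct sockaddr_in *a4 = (struct sockaddr_in *)&ss;

		a4->sin_family = AF_INET;
		a4->sin_port = htons(port);
		a4->sin_addr.s_addr = htonl(INADDR_ANY);
		sslen = sizeof(*a4);
	}

	fd = socket(family, SOCK_STREAM, 0);
	if (fd < 0)
		return -errno;
	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
		goto fail;
	if (family == AF_INET6 &&
	    setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)) < 0)
		goto fail;
	if (bind(fd, (struct sockaddr *)&ss, sslen) < 0)
		goto fail;
	if (listen(fd, 100) < 0)
		goto fail;
	return fd;

fail:
	err = errno;
	close(fd);
	return -err;
}
```

With the v6-only flag set, both families can share one port number, so neither bind failure needs to be tolerated as expected.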
+
+static void rtdb_slave_tcp_event(int fd, short events, void *userdata)
+{
+ struct db_conn *dbc = userdata;
+ struct tablerep *rtdb = dbc->rtdb;
+ struct rep_msg_hdr *hdr;
+ unsigned msgflags;
+ int srcid, dstid;
+ int ctllen, reclen;
+ int len;
+ int rc;
+
+ switch (dbc->lk.state) {
+ case DBC_LOGIN:
+ rc = dbl_expect(&dbc->lk);
+ if (rc < 0)
+ goto out_bad_dbc;
+ if (rc < dbc->lk.explen)
+ return;
+
+ hdr = (struct rep_msg_hdr *) dbc->lk.ibuf;
+ if (dbl_hdr_validate(hdr, rtdb->thisid))
+ goto out_bad_dbc;
+ msgflags = GUINT32_FROM_BE(hdr->flags);
+ if ((msgflags & 0xff) != REP_MSG_LOGOK) {
+ applog(LOG_ERR, "Bad login reply, flags 0x%08x",
+ msgflags);
+ goto out_bad_dbc;
+ }
+ srcid = GUINT16_FROM_BE(hdr->src);
+ dstid = GUINT16_FROM_BE(hdr->dst);
+
+ if (rtdb->thisid == 0) {
+ applog(LOG_INFO, "Assigned local dbid %d", dstid);
+ } else if (rtdb->thisid != dstid) {
+ /*
+ * Oracle people posted that db won't like this,
+ * but what can we do? At worst, blow away the
+ * local db on the slave by hand and let it resync.
+ */
+ applog(LOG_INFO,
+ "Reassigned local dbid from %d to %d",
+ rtdb->thisid, dstid);
+ }
+ rtdb->thisid = dstid;
+
+ dbc->lk.state = DBC_OPEN;
+ dbc->lk.cnt = 0;
+ dbc->lk.explen = sizeof(struct rep_msg_hdr);
+
+ if (tdb_slave_login_cb(srcid))
+ goto out_bad_dbc;
+ break;
+ case DBC_OPEN:
+ rc = dbl_expect(&dbc->lk);
+ if (rc < 0)
+ goto out_bad_dbc;
+ if (rc < dbc->lk.explen)
+ return;
+
+ if (dbc->lk.explen == sizeof(struct rep_msg_hdr)) {
+ hdr = (struct rep_msg_hdr *) dbc->lk.ibuf;
+ if (dbl_hdr_validate(hdr, rtdb->thisid))
+ goto out_bad_dbc;
+ msgflags = GUINT32_FROM_BE(hdr->flags);
+ if ((msgflags & 0xff) != REP_MSG_DATA) {
+ applog(LOG_ERR,
+ "Bad data message, flags 0x%08x",
+ msgflags);
+ goto out_bad_dbc;
+ }
+
+ ctllen = GUINT32_FROM_BE(hdr->lenctl);
+ reclen = GUINT32_FROM_BE(hdr->lendata);
+ len = sizeof(struct rep_msg_hdr) + ctllen + reclen;
+ if (dbl_irealloc(&dbc->lk, len) < 0) {
+ applog(LOG_ERR, "No core (%d)", len);
+ goto out_bad_dbc;
+ }
+ dbc->lk.explen = len;
+ } else {
+ if (rtdb_process(dbc, dbc->lk.ibuf))
+ goto out_bad_dbc;
+
+ dbc->lk.state = DBC_OPEN;
+ dbc->lk.cnt = 0;
+ dbc->lk.explen = sizeof(struct rep_msg_hdr);
+ }
+ break;
+ case DBC_DEAD:
+ tdb_slave_disc_cb();
+ break;
+ default:
+ /* P3 */ applog(LOG_INFO, "Event on an unready socket");
+ }
+ return;
+
+ out_bad_dbc:
+ dbc->lk.state = DBC_DEAD;
+ tdb_conn_scrub_cb();
+ return;
+}
+
+static int rtdb_master_login_reply(struct db_conn *dbc, unsigned char *msgbuf)
+{
+ struct tablerep *rtdb = dbc->rtdb;
+ struct rep_msg_hdr *hdr = (struct rep_msg_hdr *) msgbuf;
+ int srclen, dstlen;
+ char *srcname, *dstname;
+ struct db_conn *tmp;
+ struct db_remote *slave;
+ int newid;
+ struct rep_msg_hdr hdrb;
+ unsigned int msgflags;
+ int rc;
+
+ /*
+ * Before proceeding, extract and zero-terminate src and dst names.
+ */
+ dstlen = GUINT32_FROM_BE(hdr->lenctl);
+ dstname = malloc(dstlen + 1);
+ if (!dstname) {
+ applog(LOG_ERR, "No core");
+ return -1;
+ }
+ memcpy(dstname, msgbuf + sizeof(struct rep_msg_hdr), dstlen);
+ dstname[dstlen] = 0;
+
+ srclen = GUINT32_FROM_BE(hdr->lendata);
+ srcname = malloc(srclen + 1);
+ if (!srcname) {
+ applog(LOG_ERR, "No core");
+ free(dstname);
+ return -1;
+ }
+ memcpy(srcname, msgbuf + sizeof(struct rep_msg_hdr) + dstlen, srclen);
+ srcname[srclen] = 0;
+
+ if (dbc->remote) {
+ /* Never happens even with bad clients, our internal problem. */
+ applog(LOG_ERR, "Redone login for slave %s (src %s)",
+ dbc->remote->host, srcname);
+ goto out_err;
+ }
+
+ if (strcmp(srcname, rtdb->thisname) == 0) {
+ applog(LOG_ERR, "Login from aliasing slave %s", srcname);
+ goto out_err;
+ }
+
+ slave = tdb_find_remote_byname(srcname);
+ if (!slave) {
+ applog(LOG_INFO, "Unknown slave \"%s\"", srcname);
+ goto out_err;
+ }
+
+ if (slave->dbid == DBID_NONE) {
+ newid = GUINT16_FROM_BE(hdr->src);
+ if (newid == 0 || newid < DBID_MIN || newid > DBID_MAX) {
+ newid = make_remote_id();
+ }
+ slave->dbid = newid;
+ }
+ if (debugging)
+ applog(LOG_DEBUG, "Link login, slave %s dbid %d",
+ slave->host, slave->dbid);
+
+ /*
+ * Dispose of all existing connections. Our current implementation
+ * provides no security, so it is a proper thing to do. We assume
+ * that the slave knows what it's doing, maybe it detected a loss
+ * of TCP connection that we missed.
+ */
+ list_for_each_entry(tmp, &rtdb->conns, link) {
+ if (tmp->remote == slave)
+ tmp->lk.state = DBC_DEAD;
+ }
+
+ dbc->remote = slave;
+
+ memset(&hdrb, 0, sizeof(hdrb));
+ msgflags = (1 << 28) | (REP_MSG_LOGOK);
+ hdrb.flags = GUINT32_TO_BE(msgflags);
+ hdrb.dst = GUINT16_TO_BE((unsigned short)slave->dbid);
+ hdrb.src = GUINT16_TO_BE((unsigned short)rtdb->thisid);
+
+ rc = write(dbc->lk.fd, &hdrb, sizeof(hdrb));
+ if (rc < 0) {
+ applog(LOG_INFO, "Write error to peer %s: %s", slave->host,
+ strerror(errno));
+ goto out_err;
+ }
+ if ((size_t) rc < sizeof(hdrb)) {
+ applog(LOG_INFO, "Write short to peer %s: %d", slave->host, rc);
+ goto out_err;
+ }
+
+ free(srcname);
+ free(dstname);
+ return 0;
+
+ out_err:
+ free(srcname);
+ free(dstname);
+ return -1;
+}
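rtdb_master_login_reply() keeps the dbid a slave proposes when it lies within DBID_MIN..DBID_MAX and otherwise calls make_remote_id(). Neither make_remote_id() nor the bounds appear in this excerpt, so the sketch below uses hypothetical ID_MIN/ID_MAX values and a used[] table. It also refuses a proposed id that is already taken, a check the code shown here does not make; it tests only the range.

```c
#include <assert.h>

/* Hypothetical stand-ins for DBID_NONE, DBID_MIN and DBID_MAX. */
#define ID_NONE 0
#define ID_MIN  1
#define ID_MAX  1000

/*
 * Keep the id the slave proposed if it is in range and free,
 * otherwise hand out the lowest free id. used[] is indexed by id.
 * Returns ID_NONE if every id is taken.
 */
static int pick_id(int proposed, const unsigned char used[ID_MAX + 1])
{
	int id;

	if (proposed >= ID_MIN && proposed <= ID_MAX && !used[proposed])
		return proposed;
	for (id = ID_MIN; id <= ID_MAX; id++) {
		if (!used[id])
			return id;
	}
	return ID_NONE;
}
```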
+
+static int rtdb_slave_login(struct db_conn *dbc)
+{
+ struct rep_msg_hdr *hdr;
+ unsigned char *msgbuf;
+ unsigned int msgflags;
+ int dstlen, srclen;
+ int len;
+
+ dstlen = strlen(dbc->remote->host);
+ srclen = strlen(dbc->rtdb->thisname);
+ len = sizeof(struct rep_msg_hdr) + dstlen + srclen;
+ msgbuf = malloc(len);
+ if (!msgbuf)
+ return -1;
+
+ hdr = (struct rep_msg_hdr *) msgbuf;
+ memset(hdr, 0, sizeof(struct rep_msg_hdr)); /* clear any field not set below */
+ msgflags = (1 << 28) | (REP_MSG_LOGIN);
+ hdr->flags = GUINT32_TO_BE(msgflags);
+ hdr->lenctl = GUINT32_TO_BE(dstlen);
+ hdr->lendata = GUINT32_TO_BE(srclen);
+ hdr->dst = GUINT16_TO_BE((unsigned short)dbc->remote->dbid);
+ hdr->src = GUINT16_TO_BE((unsigned short)dbc->rtdb->thisid);
+ memcpy(msgbuf + sizeof(struct rep_msg_hdr), dbc->remote->host, dstlen);
+ memcpy(msgbuf + sizeof(struct rep_msg_hdr) + dstlen,
+ dbc->rtdb->thisname, srclen);
+
+ if (write(dbc->lk.fd, msgbuf, len) < len) {
+ dbc->lk.state = DBC_DEAD;
+ free(msgbuf);
+ return -1;
+ }
+ dbc->lk.state = DBC_LOGIN;
+ dbc->lk.explen = sizeof(struct rep_msg_hdr);
+ dbc->lk.cnt = 0;
+ free(msgbuf);
+ return 0;
+}
+
+static int tdb_rep_resolve(struct tablerep *rtdb, int *family,
+ int addrsize, unsigned char *addr, int *addrlen,
+ const char *hostname, unsigned short port)
+{
+ char portstr[15];
+ struct addrinfo hints;
+ struct addrinfo *res, *res0;
+ int rc;
+
+ snprintf(portstr, sizeof(portstr), "%u", port);
+
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = PF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+
+ rc = getaddrinfo(hostname, portstr, &hints, &res0);
+ if (rc) {
+ applog(LOG_WARNING, "getaddrinfo(%s:%s) failed: %s",
+ hostname, portstr, gai_strerror(rc));
+ return -1;
+ }
+
+ for (res = res0; res; res = res->ai_next) {
+ if (res->ai_family != AF_INET && res->ai_family != AF_INET6)
+ continue;
+
+ if (res->ai_addrlen > addrsize) /* should not happen */
+ continue;
+
+ memcpy(addr, res->ai_addr, res->ai_addrlen);
+ *addrlen = res->ai_addrlen;
+ *family = res->ai_family;
+
+ freeaddrinfo(res0);
+ return 0;
+ }
+
+ freeaddrinfo(res0);
+
+ applog(LOG_WARNING, "getaddrinfo(%s:%s): nothing suitable",
+ hostname, portstr);
+ return -1;
+}
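tdb_rep_resolve() takes the first IPv4 or IPv6 result from getaddrinfo() and copies it into a caller-supplied byte array. A sketch of the same idea uses struct sockaddr_storage, which is large enough and suitably aligned for any socket address, and asks for SOCK_STREAM because the result is passed to connect() on a TCP socket. resolve_stream() is a hypothetical name.

```c
#define _POSIX_C_SOURCE 200112L
#include <arpa/inet.h>
#include <assert.h>
#include <netdb.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>

/* Resolve host:port to the first IPv4/IPv6 TCP address. 0 on success. */
static int resolve_stream(const char *host, unsigned short port,
			  struct sockaddr_storage *ss, socklen_t *sslen)
{
	char portstr[8];
	struct addrinfo hints, *res, *res0;

	snprintf(portstr, sizeof(portstr), "%u", (unsigned int)port);
	memset(&hints, 0, sizeof(hints));
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;	/* result feeds a TCP connect() */

	if (getaddrinfo(host, portstr, &hints, &res0) != 0)
		return -1;
	for (res = res0; res; res = res->ai_next) {
		if (res->ai_family != AF_INET && res->ai_family != AF_INET6)
			continue;
		if (res->ai_addrlen > sizeof(*ss))
			continue;	/* not expected for INET/INET6 */
		memcpy(ss, res->ai_addr, res->ai_addrlen);
		*sslen = res->ai_addrlen;
		freeaddrinfo(res0);
		return 0;
	}
	freeaddrinfo(res0);
	return -1;
}
```

The filled-in storage can then be handed to connect() as `(struct sockaddr *)&ss` with `sslen`.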
+
+static int rtdb_rep_connect(struct db_conn *dbc)
+{
+ struct db_link *dbl = &dbc->lk;
+ struct db_remote *master = dbc->remote;
+ int family;
+ unsigned char addr[32];
+ int addrlen;
+ int rc;
+
+ rc = tdb_rep_resolve(dbc->rtdb, &family, sizeof(addr), addr, &addrlen,
+ master->host, master->port);
+ if (rc < 0)
+ return -1;
+
+ rc = socket(family, SOCK_STREAM, 0);
+ if (rc < 0) {
+ applog(LOG_WARNING, "socket: %s", strerror(errno));
+ return -1;
+ }
+ dbl->fd = rc;
+
+ if (connect(dbl->fd, (struct sockaddr *)addr, addrlen)) {
+ applog(LOG_WARNING, "connect(host %s port %u): %s",
+ master->host, master->port, strerror(errno));
+ close(dbl->fd);
+ return -1;
+ }
+
+ if (fcntl(dbl->fd, F_SETFL, O_NONBLOCK) < 0) {
+ applog(LOG_ERR, "fcntl: %s", strerror(errno));
+ close(dbl->fd);
+ return -1;
+ }
+
+ event_set(&dbl->rcev, dbl->fd, EV_READ | EV_PERSIST,
+ rtdb_slave_tcp_event, dbc);
+ if (event_add(&dbl->rcev, NULL) < 0) {
+ applog(LOG_ERR, "event_add failed");
+ close(dbl->fd);
+ return -1;
+ }
+ event_set(&dbl->wrev, dbl->fd, EV_WRITE | EV_PERSIST,
+ rtdb_wr_event, dbl);
+ return 0;
+}
+
+static void __rtdb_fini(struct tablerep *rtdb)
+{
+ struct db_conn *dbc;
+
+ if (rtdb->sockfd4 >= 0) {
+ event_del(&rtdb->lsev4);
+ close(rtdb->sockfd4);
+ rtdb->sockfd4 = -1;
+ }
+ if (rtdb->sockfd6 >= 0) {
+ event_del(&rtdb->lsev6);
+ close(rtdb->sockfd6);
+ rtdb->sockfd6 = -1;
+ }
+
+ while (!list_empty(&rtdb->conns)) {
+ dbc = list_entry(rtdb->conns.next, struct db_conn, link);
+ list_del(&dbc->link);
+ dbc_free(dbc);
+ }
+ rtdb->mdbc = NULL;
+}
+
+/*
+ * return:
+ * -1 - there was an error, things are in disarray, must call __rtdb_fini.
+ * 0 - all is up, may call tdb_init if desired.
+ * 1 - not done yet, just return to dispatch.
+ */
+static int __rtdb_start(struct tablerep *rtdb, bool we_are_master,
+ struct db_remote *rep_master, unsigned short rep_port)
+{
+ struct db_conn *dbc;
+
+ if (we_are_master) {
+ if (rtdb->thisid == DBID_NONE)
+ rtdb->thisid = make_remote_id();
+ if (rtdb_rep_listen(rtdb, rep_port))
+ return -1;
+ } else {
+ if (!rep_master) {
+ applog(LOG_INFO, "No master yet"); /* P3 */
+ return -1;
+ }
+ if (!rtdb->mdbc) {
+ dbc = dbc_alloc(rtdb, rep_master);
+ if (!dbc)
+ return -1;
+ dbc->lk.explen = sizeof(struct rep_msg_hdr);
+ dbc->lk.state = DBC_INIT;
+ list_add_tail(&dbc->link, &rtdb->conns);
+ rtdb->mdbc = dbc;
+ }
+ switch (rtdb->mdbc->lk.state) {
+ case DBC_OPEN:
+ break;
+ case DBC_INIT:
+ if (rtdb_rep_connect(rtdb->mdbc))
+ return -1;
+ if (rtdb_slave_login(rtdb->mdbc))
+ return -1;
+ return 1;
+ case DBC_LOGIN:
+ /* P3 */ applog(LOG_INFO, "start: no answer");
+ return -1;
+ default:
+ /* P3 */ applog(LOG_INFO, "start: confusion (state %d)",
+ rtdb->mdbc->lk.state);
+ return -1;
+ }
+ }
+ return 0;
+}
+
+int rtdb_init(struct tablerep *rtdb, const char *thisname)
+{
+ rtdb->thisname = thisname;
+
+ INIT_LIST_HEAD(&rtdb->conns);
+ rtdb->sockfd4 = -1;
+ rtdb->sockfd6 = -1;
+
+ // rtdb->mdbc = dbc_alloc(rtdb, NULL);
+ // if (!rtdb->mdbc)
+ // return -1;
+ // rtdb->mdbc->lk.explen = sizeof(struct rep_msg_hdr);
+ // rtdb->mdbc->lk.state = DBC_INIT;
+ // list_add_tail(&rtdb->mdbc.link, &rtdb->conns);
+ return 0;
+}
+
+int rtdb_start(struct tablerep *rtdb,
+ const char *db_home,
+ bool we_are_master,
+ struct db_remote *rep_master, unsigned short rep_port,
+ void (*cb)(enum db_event))
+{
+ int rc;
+
+ rc = __rtdb_start(rtdb, we_are_master, rep_master, rep_port);
+ if (rc < 0)
+ goto err_out;
+ if (rc > 0)
+ return 0;
+
+ /*
+ * Note that we only get here if either we're master, or slave
+ * and link is DBC_OPEN. In both cases rtdb->thisid must be set.
+ */
+ if (rtdb->thisid == 0) { /* never happens */
+ applog(LOG_WARNING, "Zero own dbid, master %d", we_are_master);
+ goto err_out;
+ }
+ if (tdb_init(&rtdb->tdb, db_home, NULL, "tabled", true,
+ rtdb->thisid, db4_rep_send, we_are_master, cb)) {
+ goto err_out;
+ }
+ return 0;
+
+err_out:
+ __rtdb_fini(rtdb);
+ return -1;
+}
+
+void rtdb_mc_reset(struct tablerep *rtdb, bool we_are_master,
+ struct db_remote *rep_master, unsigned short rep_port)
+{
+ int rc;
+
+ __rtdb_fini(rtdb);
+ rc = __rtdb_start(rtdb, we_are_master, rep_master, rep_port);
+ if (rc < 0) {
+ /*
+ * If we failed to reconnect immediately, we do not retry.
+ * This is because db4 has its own timeouts, so there's really
+ * no point in doing anything else: we would only interfere.
+ * From now on, rely on CLD to drive the attempts to reconnect.
+ */
+ /* P3 */ applog(LOG_INFO, "failed to reconnect (%d)", rc);
+ }
+}
+
+void rtdb_dbc_scrub(struct tablerep *rtdb)
+{
+ struct db_conn *dbc, *tmp;
+
+ list_for_each_entry_safe(dbc, tmp, &rtdb->conns, link) {
+ if (dbc->lk.state == DBC_DEAD) {
+ /*
+ * This printout is misleading, since every remote
+ * may have several connections. But how to fix it?
+ */
+ if (dbc->remote) {
+ applog(LOG_INFO, "Closing, peer %s",
+ dbc->remote->host);
+ } else {
+ applog(LOG_INFO, "Closing");
+ }
+ if (dbc == rtdb->mdbc)
+ rtdb->mdbc = NULL;
+ list_del(&dbc->link);
+ dbc_free(dbc);
+ }
+ }
+}
+
+/*
+ * This wants to be both in here and in tdb.c. Problem.
+ */
+int rtdb_restart(struct tablerep *rtdb, bool we_are_master)
+{
+ DB_ENV *dbenv = rtdb->tdb.env;
+ unsigned int rep_flags;
+ int rc;
+
+ rep_flags = we_are_master ? DB_REP_MASTER : DB_REP_CLIENT;
+ rc = dbenv->rep_start(dbenv, NULL, rep_flags);
+ if (rc) {
+ dbenv->err(dbenv, rc, "rep_start(0x%x)", rep_flags);
+ return -1;
+ }
+ return 0;
+}
+
+void rtdb_fini(struct tablerep *rtdb)
+{
+ __rtdb_fini(rtdb);
+ tdb_fini(&rtdb->tdb);
+}
+
diff --git a/server/object.c b/server/object.c
index f8e7b12..3801e94 100644
--- a/server/object.c
+++ b/server/object.c
@@ -39,7 +39,7 @@
static int object_find(DB_TXN *txn, const char *bucket, const char *key,
struct db_obj_ent *pobj)
{
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
struct db_obj_key *okey;
size_t alloc_len;
DBT pkey, pval;
@@ -72,7 +72,7 @@ static int object_find(DB_TXN *txn, const char *bucket, const char *key,
static bool __object_del(DB_TXN *txn, const char *bucket, const char *key)
{
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
struct db_obj_key *okey;
size_t okey_len;
DBT pkey;
@@ -100,7 +100,7 @@ static bool __object_del(DB_TXN *txn, const char *bucket, const char *key)
bool object_del_acls(DB_TXN *txn, const char *bucket, const char *key)
{
- DB *acls = tdb.acls;
+ DB *acls = tdbrep.tdb.acls;
struct db_acl_key *akey;
size_t alloc_len;
DBT pkey;
@@ -163,8 +163,8 @@ bool object_del(struct client *cli, const char *user,
int rc;
enum errcode err = InternalError;
size_t alloc_len;
- DB_ENV *dbenv = tdb.env;
- DB *objs = tdb.objs;
+ DB_ENV *dbenv = tdbrep.tdb.env;
+ DB *objs = tdbrep.tdb.objs;
struct db_obj_key *okey;
struct db_obj_ent obje;
DBT pkey, pval;
@@ -326,9 +326,9 @@ static bool object_put_end(struct client *cli)
struct db_obj_ent oldobj;
bool delobj;
size_t alloc_len;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DBT pkey, pval;
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
DB_TXN *txn = NULL;
GByteArray *string_data;
GArray *string_lens;
@@ -786,7 +786,7 @@ static bool object_put_body(struct client *cli, const char *user,
return cli_err(cli, InternalError);
}
- objid = objid_next(&tabled_srv.object_count, &tdb);
+ objid = objid_next(&tabled_srv.object_count, &tdbrep.tdb);
rc = open_chunks(&cli->out_ch, &tabled_srv.all_stor,
cli, objid, content_len);
@@ -865,9 +865,9 @@ static bool object_put_acls(struct client *cli, const char *user,
{
enum errcode err = InternalError;
enum ReqACLC canacl;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
char *hdr;
char timestr[64];
int rc;
@@ -1130,7 +1130,7 @@ static bool object_get_body(struct client *cli, const char *user,
bool access_ok, modified = true;
GString *extra_hdr;
size_t alloc_len;
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
struct db_obj_key *okey;
struct db_obj_ent *obj = NULL;
DBT pkey, pval;
diff --git a/server/replica.c b/server/replica.c
index ac14cb2..1b5e832 100644
--- a/server/replica.c
+++ b/server/replica.c
@@ -612,8 +612,8 @@ static void rep_scan_verify(struct rep_arg *arg,
static void rep_add_nid(unsigned int klen, struct db_obj_key *key, uint32_t nid)
{
- DB_ENV *db_env = tdb.env;
- DB *db_objs = tdb.objs;
+ DB_ENV *db_env = tdbrep.tdb.env;
+ DB *db_objs = tdbrep.tdb.objs;
DB_TXN *db_txn;
DBT pkey, pval;
struct db_obj_ent *obj;
@@ -749,8 +749,8 @@ static void rep_scan(struct rep_arg *arg)
g_mutex_unlock(kscan_mutex);
memset(&cur, 0, sizeof(struct cursor)); /* enough to construct */
- cur.db_env = tdb.env;
- cur.db_objs = tdb.objs;
+ cur.db_env = tdbrep.tdb.env;
+ cur.db_objs = tdbrep.tdb.objs;
kcnt = 0;
for (;;) {
diff --git a/server/server.c b/server/server.c
index 814afec..8859847 100644
--- a/server/server.c
+++ b/server/server.c
@@ -97,12 +97,15 @@ struct server tabled_srv = {
.config = "/etc/tabled.conf",
};
-struct tabledb tdb;
+struct tablerep tdbrep;
enum {
TT_CMD_DUMP,
TT_CMD_TDBST_MASTER,
- TT_CMD_TDBST_SLAVE
+ TT_CMD_TDBST_SLAVE,
+ TT_CMD_MASTER_LINK_RESET,
+ TT_CMD_LINK_SCRUB,
+ TT_CMDNUM
};
struct compiled_pat patterns[] = {
@@ -114,7 +117,11 @@ struct compiled_pat patterns[] = {
};
static char *state_name_tdb[ST_TDBNUM] = {
- "Init", "Open", "Active", "Master", "Slave"
+ "Init", "Open", "Master", "Slave"
+};
+
+static char *cmd_name_tdb[TT_CMDNUM] = {
+ "Dump", "GoMaster", "GoSlave", "MasterLinkReset", "LinkScrub"
};
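The patch keeps cmd_name_tdb[] in step with the TT_CMD_* enum by sizing it with TT_CMDNUM. With an explicit size, a forgotten initializer silently becomes NULL. One way to turn such a mismatch into a compile error is to leave the array unsized and compare its length with the sentinel using C11 _Static_assert; the CMD_* names below are hypothetical.

```c
#include <assert.h>
#include <string.h>

enum {
	CMD_DUMP,
	CMD_GO_MASTER,
	CMD_GO_SLAVE,
	CMD_MASTER_LINK_RESET,
	CMD_LINK_SCRUB,
	CMD_NUM		/* sentinel: number of commands */
};

/* Unsized on purpose, so its length reflects the initializers. */
static const char *cmd_name[] = {
	"Dump", "GoMaster", "GoSlave", "MasterLinkReset", "LinkScrub"
};

/* Fails to compile if a command is added without a name, or vice versa. */
_Static_assert(sizeof(cmd_name) / sizeof(cmd_name[0]) == CMD_NUM,
	       "cmd_name[] out of sync with the command enum");

/* Bounds-checked lookup for log messages. */
static const char *cmd_str(unsigned int cmd)
{
	return cmd < CMD_NUM ? cmd_name[cmd] : "Unknown";
}
```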
static struct {
@@ -340,7 +347,7 @@ static int authcheck(struct http_req *req, char *extra_bucket,
* not match.
*/
- rc = tdb.passwd->get(tdb.passwd, NULL, &key, &val, 0);
+ rc = tdbrep.tdb.passwd->get(tdbrep.tdb.passwd, NULL, &key, &val, 0);
if (rc) {
pass = strdup("");
@@ -350,7 +357,7 @@ static int authcheck(struct http_req *req, char *extra_bucket,
char s[64];
snprintf(s, 64, "get user '%s'", user);
- tdb.passwd->err(tdb.passwd, rc, s);
+ tdbrep.tdb.passwd->err(tdbrep.tdb.passwd, rc, s);
}
} else {
pass = val.data;
@@ -387,8 +394,22 @@ static void stats_signal(int signo)
static void stats_dump(void)
{
- applog(LOG_INFO, "STATE: TDB %s",
- state_name_tdb[tabled_srv.state_tdb]);
+ struct db_remote *rp;
+ GList *tmp;
+
+ applog(LOG_INFO, "TDB: group %s state %s host %s rep_port %d dbid %d%s",
+ tabled_srv.group, state_name_tdb[tabled_srv.state_tdb],
+ tabled_srv.ourhost, tabled_srv.rep_port, tdbrep.thisid,
+ (tabled_srv.mc_delay)? " mc_delay": "");
+ for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+ rp = tmp->data;
+ applog(LOG_INFO, "PN: name %s dbid %d", rp->name, rp->dbid);
+ if (rp->host)
+ applog(LOG_INFO, "PN: host %s port %d",
+ rp->host, rp->port);
+ if (rp == tabled_srv.rep_master)
+ applog(LOG_INFO, "PN (master)");
+ }
applog(LOG_INFO,
"STATS: poll %lu event %lu tcp_accept %lu opt_write %lu",
tabled_srv.stats.poll,
@@ -403,11 +424,17 @@ static void stats_dump(void)
bool stat_status(struct client *cli, GList *content)
{
+ struct db_remote *rp;
+ GList *tmp;
char *str;
+ int rc;
/*
* The loadavg is system dependent, we'll figure it out later.
* On Linux, applications read from /proc/loadavg.
+ *
+ * The listening info duplicates the hostname until we split
+ * the replication identifier from hostname.
*/
if (asprintf(&str,
"<h1>Status</h1>"
@@ -415,11 +442,50 @@ bool stat_status(struct client *cli, GList *content)
tabled_srv.ourhost, tabled_srv.port) < 0)
return false;
content = g_list_append(content, str);
+
if (asprintf(&str,
- "<p>State: TDB %s</p>\r\n",
- state_name_tdb[tabled_srv.state_tdb]) < 0)
+ "<p>TDB: group %s "
+ "state %s host %s rep_port %d dbid %d%s</p>\r\n",
+ tabled_srv.group, state_name_tdb[tabled_srv.state_tdb],
+ tabled_srv.ourhost, tabled_srv.rep_port, tdbrep.thisid,
+ (tabled_srv.mc_delay)? " mc_delay": "") < 0)
return false;
content = g_list_append(content, str);
+
+ if (tabled_srv.rep_remotes) {
+ if (asprintf(&str, "<p>") < 0)
+ return false;
+ content = g_list_append(content, str);
+ for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+ rp = tmp->data;
+ rc = asprintf(&str, "Peer: name %s dbid %d",
+ rp->name, rp->dbid);
+ if (rc < 0)
+ return false;
+ content = g_list_append(content, str);
+ if (rp->host) {
+ rc = asprintf(&str, " host %s port %d",
+ rp->host, rp->port);
+ if (rc < 0)
+ return false;
+ content = g_list_append(content, str);
+ }
+ if (rp == tabled_srv.rep_master) {
+ str = strdup(" (master)");
+ if (!str)
+ return false;
+ content = g_list_append(content, str);
+ }
+ rc = asprintf(&str, "<br />\r\n");
+ if (rc < 0)
+ return false;
+ content = g_list_append(content, str);
+ }
+ if (asprintf(&str, "</p>\r\n") < 0)
+ return false;
+ content = g_list_append(content, str);
+ }
+
if (asprintf(&str,
"<p>Stats: "
"poll %lu event %lu tcp_accept %lu opt_write %lu</p>\r\n"
@@ -1421,7 +1487,7 @@ static void add_chkpt_timer(void)
static void tdb_checkpoint(int fd, short events, void *userdata)
{
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
int rc;
if (debugging)
@@ -1436,29 +1502,50 @@ static void tdb_checkpoint(int fd, short events, void *userdata)
add_chkpt_timer();
}
+static void add_reup_timer(void)
+{
+ static const struct timeval tv = { TABLED_REUP_SEC, 0 };
+
+ if (evtimer_add(&tabled_srv.reup_timer, &tv) < 0)
+ applog(LOG_WARNING, "unable to add reup timer");
+}
+
+static void tdb_reup(int fd, short events, void *userdata)
+{
+ if (tabled_srv.state_want == ST_W_MASTER &&
+ tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ /*
+ * An earlier upgrade to master failed and we are
+ * still a slave; retry.
+ */
+ if (rtdb_restart(&tdbrep, true)) {
+ applog(LOG_WARNING, "Cannot restart to master");
+ add_reup_timer();
+ }
+ }
+}
+
static void tdb_state_cb(enum db_event event)
{
unsigned char cmd;
switch (event) {
case TDB_EV_ELECTED:
- /*
- * Safe to stop ignoring bogus client indication,
- * so unmute us by advancing the state.
- */
- if (tabled_srv.state_tdb == ST_TDB_OPEN)
- tabled_srv.state_tdb = ST_TDB_ACTIVE;
+ /* Just ignore this, we only care for the end state. */
break;
case TDB_EV_CLIENT:
+ /* P3 */ applog(LOG_INFO, "TDB event: slave, state %s", state_name_tdb[tabled_srv.state_tdb]);
+ goto overmsg;
case TDB_EV_MASTER:
+ /* P3 */ applog(LOG_INFO, "TDB event: master, state %s", state_name_tdb[tabled_srv.state_tdb]);
+ overmsg:
/*
* This callback runs on the context of the replication
* manager thread, and calling any of our functions thus
* turns our program into a multi-threaded one. Instead
* we signal the main thread to do the processing.
*/
- if (tabled_srv.state_tdb != ST_TDB_INIT &&
- tabled_srv.state_tdb != ST_TDB_OPEN) {
+ if (tabled_srv.state_tdb != ST_TDB_INIT) {
if (event == TDB_EV_MASTER)
cmd = TT_CMD_TDBST_MASTER;
else
@@ -1472,6 +1559,55 @@ static void tdb_state_cb(enum db_event event)
}
}
+void cld_update_cb(void)
+{
+ switch (tabled_srv.state_want) {
+ case ST_W_MASTER:
+ if (tabled_srv.state_tdb == ST_TDB_MASTER) {
+ ; /* CLD caught up to DB, better late than never */
+ } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ /* CLD tells us to upgrade, do it */
+ if (rtdb_restart(&tdbrep, true)) {
+ applog(LOG_WARNING,
+ "Unable to restart to master");
+ /*
+ * Don't try rtdb_fini here, will end in a hang.
+ * Instead, retry endlessly until it succeeds.
+ */
+ add_reup_timer();
+ }
+ } else {
+ applog(LOG_WARNING, "Want Master while in state %s",
+ state_name_tdb[tabled_srv.state_tdb]);
+ }
+ break;
+ case ST_W_SLAVE:
+ if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ ; /* all good */
+ } else if (tabled_srv.state_tdb == ST_TDB_MASTER) {
+ /*
+ * OK, this is bad. We lost our CLD session and some
+ * other node went master on us. Even if we downgrade
+ * the database now, some clients may have done some
+ * operations while CLD was bouncing. Complain loudly.
+ */
+ applog(LOG_WARNING,
+ "Downgrading the database,"
+ " data loss is possible");
+ if (rtdb_restart(&tdbrep, false)) {
+ tabled_srv.state_tdb = ST_TDB_INIT;
+ rtdb_fini(&tdbrep);
+ }
+ } else {
+ applog(LOG_WARNING, "Want Slave while in state %s",
+ state_name_tdb[tabled_srv.state_tdb]);
+ }
+ break;
+ default:
+ ;
+ }
+}
+
/*
* Due to the way storage_node management is tightly woven into the
* server, the management of nodes is not in storage.c, which deals
@@ -1485,7 +1621,6 @@ int stor_update_cb(void)
{
int num_up;
struct storage_node *stn;
- unsigned int env_flags;
if (debugging)
applog(LOG_DEBUG, "Know of potential %d storage node(s)",
@@ -1518,15 +1653,13 @@ int stor_update_cb(void)
* We initiate operations even if there's no redundancy in order
* to permit bootstrapping and build-time self-checking.
*/
+/* P3 */ applog(LOG_INFO, "storage updated, TDB state %s", state_name_tdb[tabled_srv.state_tdb]);
if (tabled_srv.state_tdb == ST_TDB_INIT) {
tabled_srv.state_tdb = ST_TDB_OPEN;
-
- env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
- if (tdb_init(&tdb, tabled_srv.tdb_dir, NULL,
- env_flags, "tabled", true,
- tabled_srv.rep_remotes,
- tabled_srv.ourhost, tabled_srv.rep_port,
- tdb_state_cb)) {
+ if (rtdb_start(&tdbrep, tabled_srv.tdb_dir,
+ tabled_srv.state_want == ST_W_MASTER,
+ tabled_srv.rep_master,
+ tabled_srv.rep_port, tdb_state_cb)) {
tabled_srv.state_tdb = ST_TDB_INIT;
applog(LOG_ERR, "Failed to open TDB, limping");
}
@@ -1535,10 +1668,122 @@ int stor_update_cb(void)
* FIXME This is where we should process redundancy decreases.
*/
;
+ } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ if (tabled_srv.state_want == ST_W_MASTER) {
+ if (rtdb_restart(&tdbrep, true)) {
+ applog(LOG_WARNING,
+ "Failed to restart to master");
+ add_reup_timer();
+ }
+ }
}
return num_up;
}
+int tdb_slave_login_cb(int srcid)
+{
+ struct db_remote *master;
+
+ master = tabled_srv.rep_master;
+ if (!master) {
+ applog(LOG_INFO, "No master at login");
+ return -1;
+ }
+ if (master->dbid == 0) {
+ applog(LOG_INFO, "Master dbid %d", srcid);
+ } else {
+ if (master->dbid != srcid) {
+ /*
+ * This is probably bad news. Perhaps the master rebooted
+ * on the other side of the network partition and yet
+ * somehow won a lock in CLD, or something even weirder.
+ * But we don't know.
+ */
+ applog(LOG_INFO,
+ "Master switch from dbid %d to dbid %d",
+ master->dbid, srcid);
+ }
+ }
+ master->dbid = srcid;
+
+ if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+ applog(LOG_INFO, "Established link, master %s dbid %d",
+ master->name, master->dbid);
+ if (tabled_srv.state_want != ST_W_SLAVE) {
+ applog(LOG_ERR, "Unexpected TDB state %s, limping",
+ state_name_tdb[tabled_srv.state_tdb]);
+ rtdb_fini(&tdbrep);
+ tabled_srv.state_tdb = ST_TDB_INIT;
+ return -1;
+ }
+ if (rtdb_start(&tdbrep, tabled_srv.tdb_dir,
+ false,
+ master,
+ tabled_srv.rep_port, tdb_state_cb)) {
+ tabled_srv.state_tdb = ST_TDB_INIT;
+ applog(LOG_ERR, "Failed to open TDB, limping");
+ return -1;
+ }
+ } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ applog(LOG_INFO, "Recovered master connection");
+ } else {
+ applog(LOG_INFO, "Confused about connections");
+ }
+ return 0;
+}
+
+void tdb_slave_disc_cb(void)
+{
+ static const struct timeval tv = { TABLED_MCWAIT_SEC, 0 };
+
+ if (tabled_srv.mc_delay)
+ return;
+ evtimer_add(&tabled_srv.mc_timer, &tv);
+ tabled_srv.mc_delay = true;
+}
+
+static void tdb_mc_delay(int fd, short events, void *userdata)
+{
+ static const unsigned char cmd = TT_CMD_MASTER_LINK_RESET;
+
+ tabled_srv.mc_delay = false;
+ write(tabled_srv.ev_pipe[1], &cmd, 1);
+}
+
+void tdb_conn_scrub_cb(void)
+{
+ unsigned char cmd;
+
+ cmd = TT_CMD_LINK_SCRUB;
+ write(tabled_srv.ev_pipe[1], &cmd, 1);
+}
+
+struct db_remote *tdb_find_remote_byname(const char *name)
+{
+ struct db_remote *rp;
+ GList *tmp;
+
+ for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+ rp = tmp->data;
+ if (strcmp(rp->name, name) == 0)
+ return rp;
+ }
+ return NULL;
+}
+
+struct db_remote *tdb_find_remote_byid(int id)
+{
+ struct db_remote *rp;
+ GList *tmp;
+
+ for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+ rp = tmp->data;
+ if (rp->dbid == id)
+ return rp;
+ }
+ return NULL;
+}
+
static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
int addr_len, void *addr_ptr, bool is_status)
{
@@ -1833,26 +2078,66 @@ static void compile_patterns(void)
}
}
-static void tdb_state_process(enum st_tdb new_state)
+static void tdb_startup(void)
{
unsigned int db_flags;
- if (debugging)
- applog(LOG_DEBUG, "TDB state > %s", state_name_tdb[new_state]);
- if ((new_state == ST_TDB_MASTER || new_state == ST_TDB_SLAVE) &&
- tabled_srv.state_tdb == ST_TDB_ACTIVE) {
+ db_flags = DB_CREATE | DB_THREAD;
+ if (tdb_up(&tdbrep.tdb, db_flags))
+ return;
+ if (objid_init(&tabled_srv.object_count, &tdbrep.tdb)) {
+ tdb_down(&tdbrep.tdb);
+ return;
+ }
+ add_chkpt_timer();
+ rep_start();
+ net_listen_client();
+}
- db_flags = DB_CREATE | DB_THREAD;
- if (tdb_up(&tdb, db_flags))
- return;
+static void tdb_state_process(enum st_tdb new_state)
+{
- if (objid_init(&tabled_srv.object_count, &tdb)) {
- tdb_down(&tdb);
- return;
+ applog(LOG_INFO, "TDB state %s > %s",
+ state_name_tdb[tabled_srv.state_tdb], state_name_tdb[new_state]);
+
+ if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+ if (new_state == ST_TDB_MASTER) {
+ if (tabled_srv.state_want == ST_W_MASTER) {
+ tdb_startup();
+ } else {
+ /*
+ * We want slave if we cannot connect to CLD,
+ * or we cannot lock the master file, which
+ * means that another master may exist.
+ * But the DB went master on us, so
+ * either the other master is dead or we're
+ * misconfigured so that DBs cannot talk.
+ * Either way, we should poke the DB until the
+ * desired result is accomplished. XXX
+ */
+ applog(LOG_INFO, "TDB went Master on us");
+ }
+ } else if (new_state == ST_TDB_SLAVE) {
+ applog(LOG_INFO, "TDB went Slave, so whatever");
+ ;
+ } else {
+ applog(LOG_ERR, "TDB went to unexpected state");
+ }
+ } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ if (new_state == ST_TDB_MASTER) {
+ if (tabled_srv.state_want == ST_W_MASTER) {
+ tdb_startup();
+ } else {
+ /*
+ * This is either a net split or CLD is doing
+ * its timeouts and so we do not want to be
+ * a master yet.
+ */
+ applog(LOG_ERR, "TDB upgraded on us");
+ }
+ } else {
+ applog(LOG_ERR, "TDB is confused");
}
- add_chkpt_timer();
- rep_start();
- net_listen_client();
}
}
@@ -1871,6 +2156,11 @@ static void internal_event(int fd, short events, void *userdata)
abort();
}
+ if (debugging) {
+ applog(LOG_DEBUG, "Context Event %s, TDB state %s",
+ cmd_name_tdb[cmd], state_name_tdb[tabled_srv.state_tdb]);
+ }
+
switch (cmd) {
case TT_CMD_DUMP:
stats_dump();
@@ -1890,6 +2180,15 @@ static void internal_event(int fd, short events, void *userdata)
}
break;
+ case TT_CMD_MASTER_LINK_RESET:
+ rtdb_mc_reset(&tdbrep, tabled_srv.state_want == ST_W_MASTER,
+ tabled_srv.rep_master, tabled_srv.rep_port);
+ break;
+
+ case TT_CMD_LINK_SCRUB:
+ rtdb_dbc_scrub(&tdbrep);
+ break;
+
default:
applog(LOG_WARNING, "%s BUG: command 0x%x", __func__, cmd);
break;
@@ -1905,6 +2204,7 @@ int main (int argc, char *argv[])
INIT_LIST_HEAD(&tabled_srv.all_stor);
INIT_LIST_HEAD(&tabled_srv.write_compl_q);
tabled_srv.state_tdb = ST_TDB_INIT;
+ tabled_srv.rep_next_id = DBID_MIN;
/* isspace() and strcasecmp() consistency requires this */
setlocale(LC_ALL, "C");
@@ -1978,6 +2278,8 @@ int main (int argc, char *argv[])
tabled_srv.evbase_main = event_init();
event_base_rep = event_base_new();
evtimer_set(&tabled_srv.chkpt_timer, tdb_checkpoint, NULL);
+ evtimer_set(&tabled_srv.mc_timer, tdb_mc_delay, NULL);
+ evtimer_set(&tabled_srv.reup_timer, tdb_reup, NULL);
/* set up internal communication pipe */
if (pipe(tabled_srv.ev_pipe) < 0) {
@@ -1991,6 +2293,13 @@ int main (int argc, char *argv[])
goto err_pevt;
}
+ /* late-construct structures with allocations */
+ if (rtdb_init(&tdbrep, tabled_srv.ourhost)) {
+ applog(LOG_WARNING, "rtdb_init");
+ rc = 1;
+ goto err_rtdb;
+ }
+
/* set up server networking */
if (tabled_srv.status_port) {
if (net_open_known(tabled_srv.status_port, true) == 0)
@@ -2000,7 +2309,8 @@ int main (int argc, char *argv[])
if (rc)
goto err_out_net;
- if (cld_begin(tabled_srv.ourhost, tabled_srv.group, verbose) != 0) {
+ if (cld_begin(tabled_srv.ourhost, tabled_srv.group,
+ tabled_srv.rep_name, verbose) != 0) {
rc = 1;
goto err_cld_session;
}
@@ -2023,13 +2333,13 @@ err_cld_session:
err_out_net:
if (tabled_srv.state_tdb == ST_TDB_MASTER ||
tabled_srv.state_tdb == ST_TDB_SLAVE) {
- tdb_down(&tdb);
- tdb_fini(&tdb);
- } else if (tabled_srv.state_tdb == ST_TDB_OPEN ||
- tabled_srv.state_tdb == ST_TDB_ACTIVE) {
- tdb_fini(&tdb);
+ tdb_down(&tdbrep.tdb);
+ rtdb_fini(&tdbrep);
+ } else if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+ rtdb_fini(&tdbrep);
}
-/* err_tdb_init: */
+err_rtdb:
+ event_del(&tabled_srv.pevt);
err_pevt:
close(tabled_srv.ev_pipe[0]);
close(tabled_srv.ev_pipe[1]);
diff --git a/server/tabled.h b/server/tabled.h
index ff419e3..c90511c 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -45,6 +45,8 @@ enum {
TABLED_CHKPT_SEC = 60 * 5, /* secs between db4 chkpt */
TABLED_RESCAN_SEC = 60*3 + 7, /* secs btw key rescans */
+ TABLED_MCWAIT_SEC = 35, /* secs to moderate reconn. */
+ TABLED_REUP_SEC = 35, /* secs to retry rtdb_restart */
CHUNK_REBOOT_TIME = 3*60, /* secs to declare chunk dead */
@@ -200,8 +202,12 @@ struct client {
char req_buf[CLI_REQ_BUF_SZ]; /* input buffer */
};
+enum st_want {
+ ST_W_INIT, ST_W_MASTER, ST_W_SLAVE
+};
+
enum st_tdb {
- ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_ACTIVE, ST_TDB_MASTER, ST_TDB_SLAVE,
+ ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_MASTER, ST_TDB_SLAVE,
ST_TDBNUM
};
@@ -218,6 +224,17 @@ struct server_stats {
unsigned long max_write_buf;
};
+#define DBID_NONE 0
+#define DBID_MIN 2
+#define DBID_MAX 105
+
+struct db_remote { /* other DB nodes */
+ char *name; /* do not resolve as a host */
+ char *host;
+ unsigned short port;
+ int dbid; /* signed in db4, traditional */
+};
+
struct listen_cfg {
/* bool encrypt; */
/* char *host; */
@@ -233,6 +250,8 @@ struct server {
int ev_pipe[2];
struct event pevt;
struct list_head write_compl_q; /* list of done writes */
+ bool mc_delay;
+ struct event mc_timer;
char *config; /* config file (static) */
@@ -242,6 +261,7 @@ struct server {
char *port_file;
char *chunk_user; /* username for stc_new */
char *chunk_key; /* key for stc_new */
+ char *rep_name; /* db4 replication name */
unsigned short rep_port; /* db4 replication port */
char *status_port; /* status webserver */
char *group; /* our group (both T and Ch) */
@@ -249,12 +269,16 @@ struct server {
char *ourhost; /* use this if DB master */
struct database *db; /* database handle */
GList *rep_remotes;
+ struct db_remote *rep_master; /* if we're slave */
+ int rep_next_id;
+ struct event reup_timer;
GList *sockets;
struct list_head all_stor; /* struct storage_node */
int num_stor; /* number of storage_node's */
uint64_t object_count;
+ enum st_want state_want;
enum st_tdb state_tdb;
enum st_net state_net;
@@ -263,7 +287,55 @@ struct server {
struct server_stats stats; /* global statistics */
};
-extern struct tabledb tdb;
+/*
+ * Low-level channel, for both sides.
+ *
+ * The combined link state confuses session (e.g. login) and the framing, which
+ * is not pretty but works. At least we have a separate link-state struct.
+ *
+ * In a settled state, db_conn corresponds 1:1 to db_remote, but
+ * it's not necessarily so when connections are being established.
+ */
+enum dbc_state { DBC_INIT, DBC_LOGIN, DBC_OPEN, DBC_DEAD };
+
+struct db_link {
+ int fd;
+ enum dbc_state state;
+
+ bool writing;
+ struct event wrev; /* when writing */
+ unsigned char *obuf;
+ int obuflen;
+ int done, togo;
+
+ struct event rcev; /* whenever fd >= 0 */
+ unsigned char *ibuf;
+ int ibuflen; /* currently allocated ibuf */
+ int cnt; /* currently in ibuf */
+ int explen; /* expected length */
+};
+
+struct db_conn { /* a connection with other DB node */
+ struct tablerep *rtdb;
+ struct db_remote *remote;
+ struct list_head link;
+
+ struct db_link lk;
+};
+
+struct tablerep {
+ struct tabledb tdb;
+ const char *thisname;
+ int thisid;
+
+ int sockfd4, sockfd6;
+ struct event lsev4, lsev6;
+ struct list_head conns; // struct db_conn
+
+ struct db_conn *mdbc;
+};
+
+extern struct tablerep tdbrep;
/* bucket.c */
extern bool has_access(const char *user, const char *bucket, const char *key,
@@ -295,7 +367,8 @@ extern void cli_in_end(struct client *cli);
/* cldu.c */
extern void cld_init(void);
-extern int cld_begin(const char *fqdn, const char *group, int verbose);
+extern int cld_begin(const char *fqdn, const char *group, const char *name,
+ int verbose);
extern void cldu_add_host(const char *host, unsigned int port);
extern void cld_end(void);
@@ -332,7 +405,13 @@ extern bool cli_write_start(struct client *cli);
extern bool cli_write_run_compl(void);
extern int cli_req_avail(struct client *cli);
extern void applog(int prio, const char *fmt, ...);
+extern void cld_update_cb(void);
extern int stor_update_cb(void);
+extern int tdb_slave_login_cb(int srcid);
+extern void tdb_slave_disc_cb(void);
+extern void tdb_conn_scrub_cb(void);
+extern struct db_remote *tdb_find_remote_byname(const char *name);
+extern struct db_remote *tdb_find_remote_byid(int id);
/* status.c */
extern bool stat_evt_http_req(struct client *cli, unsigned int events);
@@ -374,4 +453,16 @@ extern void rep_start(void);
extern void rep_stats(void);
extern bool rep_status(struct client *cli, GList *content);
+/* metarep.c */
+extern int rtdb_init(struct tablerep *rtdb, const char *thishost);
+extern int rtdb_start(struct tablerep *rtdb, const char *db_home,
+ bool we_are_master,
+ struct db_remote *rep_master, unsigned short rep_port,
+ void (*cb)(enum db_event));
+extern void rtdb_mc_reset(struct tablerep *rtdb, bool we_are_master,
+ struct db_remote *rep_master, unsigned short rep_port);
+extern void rtdb_dbc_scrub(struct tablerep *rtdb);
+extern int rtdb_restart(struct tablerep *rtdb, bool we_are_master);
+extern void rtdb_fini(struct tablerep *rtdb);
+
#endif /* __TABLED_H__ */
diff --git a/server/tdbadm.c b/server/tdbadm.c
index 86fa4b3..4bd26cc 100644
--- a/server/tdbadm.c
+++ b/server/tdbadm.c
@@ -45,11 +45,10 @@ enum various_modes {
static int mode_adm;
static unsigned long invalid_lines;
static char *tdb_dir;
-static unsigned short rep_port;
static char *config = "/etc/tabled.conf";
-static char *ourhost;
static struct tabledb tdb;
+static bool tdb_is_master;
const char *argp_program_version = PACKAGE_VERSION;
@@ -110,7 +109,6 @@ static void cfg_elm_end(GMarkupParseContext *context,
{
struct config_context *cc = user_data;
struct stat statb;
- int n;
if (!strcmp(element_name, "TDB") && cc->text) {
if (!tdb_dir) {
@@ -134,25 +132,6 @@ static void cfg_elm_end(GMarkupParseContext *context,
cc->text = NULL;
}
- else if (!strcmp(element_name, "ForceHost") && cc->text) {
- free(ourhost);
- ourhost = cc->text;
- cc->text = NULL;
- }
-
- else if (!strcmp(element_name, "TDBRepPort") && cc->text) {
- n = strtol(cc->text, NULL, 10);
- if (n <= 0 || n >= 65536) {
- fprintf(stderr, "warning: "
- "TDBRepPort '%s' invalid, ignoring", cc->text);
- free(cc->text);
- cc->text = NULL;
- return;
- }
- rep_port = n;
- free(cc->text);
- cc->text = NULL;
- }
}
static bool str_n_isspace(const char *s, size_t n)
@@ -198,8 +177,6 @@ static void read_config(void)
memset(&ctx, 0, sizeof(struct config_context));
- rep_port = 8083;
-
if (!g_file_get_contents(config, &text, &len, NULL)) {
fprintf(stderr, "failed to read config file %s\n", config);
exit(1);
@@ -603,10 +580,15 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
return 0;
}
+static void tdb_state_cb(enum db_event event)
+{
+ if (event == TDB_EV_MASTER)
+ tdb_is_master = true;
+}
+
int main(int argc, char *argv[])
{
- char hostname[64];
- unsigned int env_flags, db_flags;
+ unsigned int db_flags;
error_t aprc;
int rc = 1;
@@ -621,21 +603,12 @@ int main(int argc, char *argv[])
if (!tdb_dir)
die("no tdb dir (-t) specified\n");
- if (ourhost)
- strcpy(hostname, ourhost);
- else if (gethostname(hostname, sizeof(hostname)) < 0) {
- fprintf(stderr, "gethostname failed: %s\n", strerror(errno));
- return 1;
- }
-
- env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
- if (tdb_init(&tdb, tdb_dir, NULL, env_flags,
- "tdbadm", false, NULL, hostname, rep_port, NULL))
+ if (tdb_init(&tdb, tdb_dir, NULL, "tdbadm", false,
+ 0, NULL, true, tdb_state_cb))
goto err_dbinit;
- /* Usually takes about 12s */
- /* FIXME don't peek into private parts of tdb struct, use state_cb */
- while (!tdb.is_master)
+ /* Usually takes about 12s, if vote is involved. */
+ while (!tdb_is_master)
sleep(2);
db_flags = DB_CREATE | DB_THREAD;
* Re: [tabled patch 3/3] Fix metadata replication
2010-08-10 21:19 ` Pete Zaitcev
@ 2010-08-10 22:43 ` Jeff Garzik
2010-08-11 0:25 ` Pete Zaitcev
0 siblings, 1 reply; 5+ messages in thread
From: Jeff Garzik @ 2010-08-10 22:43 UTC (permalink / raw)
To: Pete Zaitcev; +Cc: Project Hail List
On 08/10/2010 05:19 PM, Pete Zaitcev wrote:
> Unfortunately this is getting a little behind, because I started
> working on the tests and they require some changes (e.g. the listening
> host and port may be unknown at the time MASTER file is locked).
What does that mean -- I just applied the patch, should I wait for
further updates before rolling a tarball?
Jeff
* Re: [tabled patch 3/3] Fix metadata replication
2010-08-10 22:43 ` Jeff Garzik
@ 2010-08-11 0:25 ` Pete Zaitcev
0 siblings, 0 replies; 5+ messages in thread
From: Pete Zaitcev @ 2010-08-11 0:25 UTC (permalink / raw)
To: Jeff Garzik; +Cc: Project Hail List
On Tue, 10 Aug 2010 18:43:37 -0400
Jeff Garzik <jeff@garzik.org> wrote:
> On 08/10/2010 05:19 PM, Pete Zaitcev wrote:
> > Unfortunately this is getting a little behind, because I started
> > working on the tests and they require some changes (e.g. the listening
> > host and port may be unknown at the time MASTER file is locked).
>
> What does that mean -- I just applied the patch, should I wait for
> further updates before rolling a tarball?
No, please don't wait. It's going to take all of today and
probably tomorrow too, perhaps even longer.
I meant that if you had not taken this yet, I would have updates soon.
But with "git pull" it really does not matter to me, I can easily
base any new patches on the current origin HEAD.
-- Pete
Thread overview: 5+ messages
2010-08-06 3:40 [tabled patch 3/3] Fix metadata replication Pete Zaitcev
2010-08-10 19:14 ` Jeff Garzik
2010-08-10 21:19 ` Pete Zaitcev
2010-08-10 22:43 ` Jeff Garzik
2010-08-11 0:25 ` Pete Zaitcev