* Re: [tabled patch 3/3] Fix metadata replication
2010-08-10 19:14 ` Jeff Garzik
@ 2010-08-10 21:19 ` Pete Zaitcev
2010-08-10 22:43 ` Jeff Garzik
0 siblings, 1 reply; 5+ messages in thread
From: Pete Zaitcev @ 2010-08-10 21:19 UTC
To: Jeff Garzik; +Cc: Project Hail List, zaitcev
On Tue, 10 Aug 2010 15:14:19 -0400
Jeff Garzik <jeff@garzik.org> wrote:
> On 08/05/2010 11:40 PM, Pete Zaitcev wrote:
> > The metadata replication in tabled nominally existed, but did not
> > work. There were a couple of small bugs (for example, an attempt to
> > boot directly into Slave state would lead to a hang). However, the
> > biggest problem was that the identity of nodes in the Replication
> > Manager API had to be the same as the hostname. Because of that, the
> > repmgr code used the hostname to bind instead of a wildcard socket.
> > On any stock Fedora or RHEL system it would then end up listening
> > on loopback only, because /etc/hosts aliases the hostname to the
> > loopback address. Thus running any replication required additional
> > host configuration that could have all kinds of unexpected consequences.
> > In addition, it was impossible to run two nodes on one host for testing.
> >
> > This patch does away with the Replication Manager and uses the Base API
> > instead. This way, the issues with host aliasing are addressed, and
> > the state transitions occur much faster because there is no voting.
> >
> > Note that a provision is added to run peers on the same host,
> > using the configuration clause TDBRepName. I was unable to come up with
> > a reliable way to make persistent, non-conflicting identifiers that
> > would replace hostnames. Fortunately, this should only be needed
> > for build tests, where we can probably live with it.
> >
> > The resulting replication feature was tested and works. I'm not sure
> > it is enough to trust it with one's data, but it's better than before.
> >
> > Signed-off-by: Pete Zaitcev <zaitcev@redhat.com>
> >
> > ---
> > doc/etc.tabled.conf | 8
> > doc/setup.txt | 13 +
> > include/tdb.h | 15 -
> > lib/tdb.c | 130 ++++++-------
> > server/Makefile.am | 2
> > server/bucket.c | 34 +--
> > server/cldu.c | 416 ++++++++++++++++++++++++++++++++++++------
> > server/config.c | 10 +
> > server/object.c | 22 +-
> > server/replica.c | 8
> > server/server.c | 404 ++++++++++++++++++++++++++++++++++++----
> > server/tabled.h | 97 +++++++++
> > server/tdbadm.c | 51 +----
> > 13 files changed, 963 insertions(+), 247 deletions(-)
>
> Including metarep.c would be helpful ;-)
>
> Will wait on release for this...
Sorry!
Unfortunately this is falling a little behind, because I started
working on the tests and they require some changes (e.g. the listening
host and port may be unknown at the time the MASTER file is locked).
---
doc/etc.tabled.conf | 8
doc/setup.txt | 13
include/tdb.h | 15
lib/tdb.c | 130 ++--
server/Makefile.am | 2
server/bucket.c | 34 -
server/cldu.c | 416 ++++++++++++--
server/config.c | 10
server/metarep.c | 1245 ++++++++++++++++++++++++++++++++++++++++++
server/object.c | 22
server/replica.c | 8
server/server.c | 404 ++++++++++++-
server/tabled.h | 97 +++
server/tdbadm.c | 51 -
14 files changed, 2208 insertions(+), 247 deletions(-)
diff --git a/doc/etc.tabled.conf b/doc/etc.tabled.conf
index 22d20a7..c3b1d1d 100644
--- a/doc/etc.tabled.conf
+++ b/doc/etc.tabled.conf
@@ -13,12 +13,12 @@
<!--
One group per DB, don't skimp on groups. Also, make sure the replication
- ports do not conflict when you make hosts to host several groups.
- Unfortunately, the diagnostics are not very good if they do.
- Most likely you'll see database corruption in such cases.
+ ports do not conflict when a box hosts several groups or when you run
+ several replication instances with TDBRepName.
-->
<Group>ultracart2</Group>
-<TDB>/path/tabled/tdb</TDB>
+<TDB>/path/tabled-uc2/</TDB> <!-- mkdir -p /path/tabled-uc2 -->
+<!-- <TDBRepName>12345.my_local_node_name.example.com</TDBRepName> -->
<TDBRepPort>8083</TDBRepPort>
<!--
diff --git a/doc/setup.txt b/doc/setup.txt
index ac0dfb0..c7a4c6a 100644
--- a/doc/setup.txt
+++ b/doc/setup.txt
@@ -15,7 +15,9 @@ _cld._udp.phx2.ex.com has SRV record 10 50 8081 maika.phx2.ex.com.
Also, make sure that your hostname has a domain. We don't want to search
for CLD in the world-wide DNS root, do we?
- Make sure CLD is up (run "cldcli" to verify).
+ Once you know that CLD is running, verify that tabled can talk to
+ it by running "cldcli". UDP traffic must be allowed to port 8081, or
+ to the other port specified in the SRV record.
*) Another thing to set up in DNS is a wildcard host for the system where
tabled will run. Unlike the SRV records of CLD, this is optional, but
@@ -30,6 +32,10 @@ emus3 IN A 192.168.128.9
All examples on Google say FQDN is required, and most presume aliasing
of A and AAAA records, but BIND 9 eats the above fine.
+*) Speaking of FQDN, it is possible to force tabled to use a non-default
+ hostname with the ForceHost tag. In practice this is only useful when
+ the DNS is broken.
+
*) Copy configuration file from doc/etc.tabled.conf to /etc/tabled.conf
and edit to suit (see configurable items below). Notice that the file
looks like XML, but is not really. In particular, names of elements are
@@ -53,6 +59,11 @@ emus3 IN A 192.168.128.9
Group name defaults to "default", so you can leave this element unset,
but don't do it. Any name, even "qwerty", is better than the default.
+*) In each group, tabled uses its hostname to identify itself. However,
+ if you ever wish to run two tabled processes for the same group on one
+ host, you can do so by setting TDBRepName. N.B.: A power loss on that
+ host will knock out all of them, so never use this in production.
+
*) Select the port to listen, if desired. This is done using the <Listen>
element:
diff --git a/include/tdb.h b/include/tdb.h
index 8895704..ff3b4b5 100644
--- a/include/tdb.h
+++ b/include/tdb.h
@@ -109,15 +109,12 @@ struct tabledb {
DB *oids; /* object ID db */
};
-struct db_remote { /* remotes for tdb_init */
- char *host;
- unsigned short port;
-};
-
-extern int tdb_init(struct tabledb *tdb, const char *home, const char *pass,
- unsigned int env_flags, const char *errpfx, bool do_syslog,
- GList *remotes, char *rep_host, unsigned short rep_port,
- void (*cb)(enum db_event));
+extern int tdb_init(struct tabledb *tdb, const char *db_home,
+ const char *db_password, const char *errpfx, bool do_syslog,
+ int rep_our_id,
+ int (*rep_send)(DB_ENV *dbenv, const DBT *ctl, const DBT *rec,
+ const DB_LSN *lsnp, int envid, uint32_t flags),
+ bool we_are_master, void (*cb)(enum db_event));
extern int tdb_up(struct tabledb *tdb, unsigned int open_flags);
extern void tdb_down(struct tabledb *tdb);
extern void tdb_fini(struct tabledb *tdb);
diff --git a/lib/tdb.c b/lib/tdb.c
index bc5e50a..29a18f0 100644
--- a/lib/tdb.c
+++ b/lib/tdb.c
@@ -1,6 +1,6 @@
/*
- * Copyright 2008-2009 Red Hat, Inc.
+ * Copyright 2008-2010 Red Hat, Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -148,35 +148,15 @@ err_out:
return -EIO;
}
-static int add_remote_sites(DB_ENV *dbenv, GList *remotes, int *nsites)
-{
- int rc;
- struct db_remote *rp;
- GList *tmp;
-
- *nsites = 0;
- for (tmp = remotes; tmp; tmp = tmp->next) {
- rp = tmp->data;
-
- rc = dbenv->repmgr_add_remote_site(dbenv, rp->host, rp->port,
- NULL, 0);
- if (rc) {
- dbenv->err(dbenv, rc,
- "dbenv->add.remote.site host %s port %u",
- rp->host, rp->port);
- return rc;
- }
- (*nsites)++;
- }
-
- return 0;
-}
-
static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
{
struct tabledb *tdb = dbenv->app_private;
switch (event) {
+ case DB_EVENT_PANIC:
+ dbenv->errx(dbenv, "PANIC event is reported, exiting");
+ exit(2);
+ break;
case DB_EVENT_REP_CLIENT:
tdb->is_master = false;
if (tdb->state_cb)
@@ -191,6 +171,14 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
if (tdb->state_cb)
(*tdb->state_cb)(TDB_EV_ELECTED);
break;
+ case DB_EVENT_REP_NEWMASTER:
+ dbenv->errx(dbenv, "New master is reported: %d",
+ *(int *)event_info);
+ /* XXX Need to verify that it's the same master as before. */
+ break;
+ case DB_EVENT_REP_STARTUPDONE:
+ dbenv->errx(dbenv, "Client start-up complete");
+ break;
default:
/* do nothing */
break;
@@ -202,15 +190,18 @@ static void db4_event(DB_ENV *dbenv, u_int32_t event, void *event_info)
* db_password, cb can be NULL
*/
int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
- unsigned int env_flags, const char *errpfx, bool do_syslog,
- GList *remotes, char *rep_host, unsigned short rep_port,
+ const char *errpfx, bool do_syslog, int rep_ourid,
+ int (*rep_send)(DB_ENV *dbenv, const DBT *ctl, const DBT *rec,
+ const DB_LSN *lsnp, int envid, uint32_t flags),
+ bool we_are_master,
void (*cb)(enum db_event))
{
- int nsites;
+ unsigned int env_flags;
+ unsigned int rep_flags;
int rc;
DB_ENV *dbenv;
- tdb->is_master = false;
+ tdb->is_master = we_are_master;
tdb->home = db_home;
tdb->state_cb = cb;
@@ -258,12 +249,6 @@ int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
tdb->keyed = true;
}
- rc = dbenv->repmgr_set_local_site(dbenv, rep_host, rep_port, 0);
- if (rc) {
- dbenv->err(dbenv, rc, "repmgr_set_local_site");
- goto err_out;
- }
-
rc = dbenv->set_event_notify(dbenv, db4_event);
if (rc) {
dbenv->err(dbenv, rc, "set_event_notify");
@@ -283,42 +268,65 @@ int tdb_init(struct tabledb *tdb, const char *db_home, const char *db_password,
// goto err_out;
// }
- rc = dbenv->rep_set_priority(dbenv, 100);
- if (rc) {
- dbenv->err(dbenv, rc, "rep_set_priority");
- goto err_out;
- }
+ if (rep_send) {
+ rc = dbenv->rep_set_transport(dbenv, rep_ourid, rep_send);
+ if (rc) {
+ dbenv->err(dbenv, rc, "rep_set_transport");
+ goto err_out;
+ }
- /* init DB transactional environment, stored in directory db_home */
- env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
- env_flags |= DB_INIT_TXN | DB_INIT_REP;
- rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
- if (rc) {
- dbenv->err(dbenv, rc, "open(dbenv)");
- goto err_out;
- }
+ // /*
+ // * Fix the derbies. This is the only way, since passing of
+ // * DB_REP_MASTER to rep_start() after a failover will end in:
+ // * "DB_REP_UNAVAIL: Unable to elect a master" (and a hang).
+ // */
+ // rc = dbenv->rep_set_priority(dbenv, we_are_master ? 100 : 10);
+ // if (rc) {
+ // dbenv->err(dbenv, rc, "rep_set_priority");
+ // goto err_out;
+ // }
+
+ env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+ env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
+ env_flags |= DB_INIT_TXN | DB_INIT_REP;
+ rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
+ if (rc) {
+ dbenv->err(dbenv, rc, "open rep");
+ goto err_out;
+ }
- rc = add_remote_sites(dbenv, remotes, &nsites);
- if (rc)
- goto err_out;
+ rep_flags = we_are_master ? DB_REP_MASTER : DB_REP_CLIENT;
+ rc = dbenv->rep_start(dbenv, NULL, rep_flags);
+ if (rc) {
+ dbenv->err(dbenv, rc, "rep_start");
+ goto err_out;
+ }
- // rc = dbenv->rep_set_nsites(dbenv, nsites + 1);
- // if (rc) {
- // dbenv->err(dbenv, rc, "rep_set_nsites");
- // goto err_out;
- // }
+ } else {
+ env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
+ env_flags |= DB_INIT_LOG | DB_INIT_LOCK | DB_INIT_MPOOL;
+ env_flags |= DB_INIT_TXN;
+ rc = dbenv->open(dbenv, db_home, env_flags, S_IRUSR | S_IWUSR);
+ if (rc) {
+ dbenv->err(dbenv, rc, "open norep");
+ goto err_out;
+ }
- rc = dbenv->repmgr_start(dbenv, 2, DB_REP_ELECTION);
- if (rc) {
- dbenv->err(dbenv, rc, "repmgr_start");
- goto err_out;
+ /* XXX rip this out from tdbadm.c */
+ /*
+ * The db4 only delivers callbacks if replication was ordered.
+ * Since we force-set master, we ought to deliver them here
+ * for the universal code to work as if a master was elected.
+ */
+ if (cb)
+ (*cb)(we_are_master ? TDB_EV_MASTER : TDB_EV_CLIENT);
}
return 0;
err_out:
dbenv->close(dbenv, 0);
- return rc;
+ return -1;
}
/*
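For readers following the tdb.c change: tdb_init() now takes one of two paths depending on whether a transport callback (rep_send) is supplied. A minimal sketch that restates that branching as a plain decision function, assuming nothing beyond what the hunk above shows; struct tdb_init_plan, plan_tdb_init() and the enum values are hypothetical stand-ins, not tabled or Berkeley DB API.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the tdb.h event names used in the patch. */
enum db_event { TDB_EV_CLIENT, TDB_EV_MASTER, TDB_EV_ELECTED };

/* Hypothetical summary of the choices tdb_init() makes. */
struct tdb_init_plan {
	bool init_rep;		/* DB_INIT_REP is added to env_flags */
	bool start_master;	/* rep_start() gets DB_REP_MASTER, else DB_REP_CLIENT */
	bool synth_event;	/* tdb_init() invokes the state callback itself */
	enum db_event event;	/* the synthesized event, if any */
};

static struct tdb_init_plan plan_tdb_init(bool have_rep_send,
					  bool we_are_master, bool have_cb)
{
	struct tdb_init_plan p = { false, false, false, TDB_EV_CLIENT };

	if (have_rep_send) {
		/* Base API: the role is forced by rep_start(), and db4
		 * reports it later through the db4_event() handler. */
		p.init_rep = true;
		p.start_master = we_are_master;
	} else {
		/* No replication: per the patch comment, db4 delivers no
		 * role callbacks, so tdb_init() reports the forced role. */
		p.synth_event = have_cb;
		p.event = we_are_master ? TDB_EV_MASTER : TDB_EV_CLIENT;
	}
	return p;
}
```

In other words, only a node started without a transport callback fakes the election result; a replicating node relies on db4_event() for that notification.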
diff --git a/server/Makefile.am b/server/Makefile.am
index 6397245..5b53a0a 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -4,7 +4,7 @@ INCLUDES = -I$(top_srcdir)/include @GLIB_CFLAGS@ @HAIL_CFLAGS@
sbin_PROGRAMS = tabled tdbadm
tabled_SOURCES = tabled.h \
- bucket.c cldu.c config.c object.c replica.c \
+ bucket.c cldu.c config.c metarep.c object.c replica.c \
server.c status.c storage.c storparse.c util.c
tabled_LDADD = ../lib/libtdb.a \
@HAIL_LIBS@ @PCRE_LIBS@ @GLIB_LIBS@ \
diff --git a/server/bucket.c b/server/bucket.c
index a95d23e..eb03e03 100644
--- a/server/bucket.c
+++ b/server/bucket.c
@@ -43,11 +43,11 @@ bool has_access(const char *user, const char *bucket, const char *key,
size_t alloc_len, key_len = 0;
struct db_acl_key *acl_key;
struct db_acl_ent *acl;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
DBT pkey, pval;
DBC *cur = NULL;
- DB *acls = tdb.acls;
+ DB *acls = tdbrep.tdb.acls;
if (user == NULL)
user = DB_ACL_ANON;
@@ -132,7 +132,7 @@ err_out:
static int add_access_user(DB_TXN *txn, const char *bucket, const char *key,
const char *user, const char *perms)
{
- DB *acls = tdb.acls;
+ DB *acls = tdbrep.tdb.acls;
int key_len;
int acl_len;
struct db_acl_ent *acl;
@@ -203,8 +203,8 @@ bool service_list(struct client *cli, const char *user)
bool rcb;
DB_TXN *txn = NULL;
DBC *cur = NULL;
- DB_ENV *dbenv = tdb.env;
- DB *bidx = tdb.buckets_idx;
+ DB_ENV *dbenv = tdbrep.tdb.env;
+ DB *bidx = tdbrep.tdb.buckets_idx;
DBT skey, pkey, pval;
if (asprintf(&s,
@@ -348,7 +348,7 @@ bool bucket_valid(const char *bucket)
static int bucket_find(DB_TXN *txn, const char *bucket, char *owner,
int owner_len)
{
- DB *buckets = tdb.buckets;
+ DB *buckets = tdbrep.tdb.buckets;
DBT key, val;
struct db_bucket_ent ent;
int rc;
@@ -455,9 +455,9 @@ bool bucket_add(struct client *cli, const char *user, const char *bucket)
struct db_bucket_ent ent;
bool setacl; /* is ok to put pre-existing bucket */
enum ReqACLC canacl;
- DB *buckets = tdb.buckets;
- DB *acls = tdb.acls;
- DB_ENV *dbenv = tdb.env;
+ DB *buckets = tdbrep.tdb.buckets;
+ DB *acls = tdbrep.tdb.acls;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
DBT key, val;
@@ -589,11 +589,11 @@ bool bucket_del(struct client *cli, const char *user, const char *bucket)
enum errcode err = InternalError;
int rc;
struct db_bucket_ent ent;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
- DB *buckets = tdb.buckets;
- DB *acls = tdb.acls;
- DB *objs = tdb.objs;
+ DB *buckets = tdbrep.tdb.buckets;
+ DB *acls = tdbrep.tdb.acls;
+ DB *objs = tdbrep.tdb.objs;
DBC *cur = NULL;
DBT key, val;
char structbuf[sizeof(struct db_acl_key) + 32];
@@ -922,9 +922,9 @@ static bool bucket_list_keys(struct client *cli, const char *user,
size_t pfx_len;
struct bucket_list_info bli;
bool rcb;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
DBC *cur = NULL;
DBT pkey, pval;
struct db_obj_key *obj_key;
@@ -1159,8 +1159,8 @@ bool access_list(struct client *cli, const char *bucket, const char *key,
GHashTable *param;
enum errcode err = InternalError;
- DB_ENV *dbenv = tdb.env;
- DB *acls = tdb.acls;
+ DB_ENV *dbenv = tdbrep.tdb.env;
+ DB *acls = tdbrep.tdb.acls;
int alloc_len;
char owner[64];
GList *res;
diff --git a/server/cldu.c b/server/cldu.c
index 5f3631b..45a6a83 100644
--- a/server/cldu.c
+++ b/server/cldu.c
@@ -35,6 +35,8 @@
#define ALIGN8(n) ((8 - ((n) & 7)) & 7)
+#define MASTER_FILE "MASTER"
+
struct chunk_node {
struct list_head link;
char name[65];
@@ -63,18 +65,22 @@ struct cld_session {
int actx; /* Active host cldv[actx] */
struct cld_host cldv[N_CLD];
+ char *thisname;
char *thisgroup;
char *thishost;
char *cfname; /* /tabled-group directory */
struct ncld_fh *cfh; /* /tabled-group directory, keep open for scan */
- char *ffname; /* /tabled-group/thishost */
- struct ncld_fh *ffh; /* /tabled-group/thishost, keep open for lock */
+ char *ffname; /* /tabled-group/thisname */
+ struct ncld_fh *ffh; /* /tabled-group/thisname, keep open for lock */
+ char *mfname; /* /tabled-group/MASTER */
+ struct ncld_fh *mfh; /* /tabled-group/MASTER, keep open for lock */
char *xfname; /* /chunk-GROUP directory */
struct list_head chunks; /* found in xfname, struct chunk_node */
};
static int cldu_set_cldc(struct cld_session *sp, int newactive);
+static int scan_peers(struct cld_session *sp);
static int scan_chunks(struct cld_session *sp);
static void next_chunk(struct cld_session *sp, struct chunk_node *np);
static void add_remote(const char *name);
@@ -113,13 +119,17 @@ static int cldu_nextactive(struct cld_session *sp)
* chunkservers that it uses, so this function only takes one group argument.
*/
static int cldu_setgroup(struct cld_session *sp,
- const char *thisgroup, const char *thishost)
+ const char *thisgroup, const char *thishost,
+ const char *thisname)
{
char *mem;
if (thisgroup == NULL) {
thisgroup = "default";
}
+ if (thisname == NULL) {
+ thisname = thishost;
+ }
sp->thisgroup = strdup(thisgroup);
if (!sp->thisgroup)
@@ -127,15 +137,22 @@ static int cldu_setgroup(struct cld_session *sp,
sp->thishost = strdup(thishost);
if (!sp->thishost)
goto err_oom;
+ sp->thisname = strdup(thisname);
+ if (!sp->thisname)
+ goto err_oom;
if (asprintf(&mem, "/tabled-%s", thisgroup) == -1)
goto err_oom;
sp->cfname = mem;
- if (asprintf(&mem, "/tabled-%s/%s", thisgroup, thishost) == -1)
+ if (asprintf(&mem, "/tabled-%s/%s", thisgroup, thisname) == -1)
goto err_oom;
sp->ffname = mem;
+ if (asprintf(&mem, "/tabled-%s/%s", thisgroup, MASTER_FILE) == -1)
+ goto err_oom;
+ sp->mfname = mem;
+
if (asprintf(&mem, "/chunk-%s", thisgroup) == -1)
goto err_oom;
sp->xfname = mem;
@@ -147,6 +164,259 @@ err_oom:
return 0;
}
+/*
+ * Ugh, side effects on tabled_srv.rep_master.
+ */
+static void cldu_parse_master(const char *mfname, const char *mfile, long len)
+{
+ enum lex_state { lex_tag, lex_colon, lex_val };
+ const char *tag, *val;
+ int taglen;
+ const char *name, *host, *port;
+ int namelen, hostlen, portlen;
+ char namebuf[65], hostbuf[65], portbuf[15];
+ long portnum;
+ enum lex_state state;
+ struct db_remote *rp;
+ const char *p;
+ char c;
+
+ name = NULL;
+ namelen = 0;
+ host = NULL;
+ hostlen = 0;
+ port = NULL;
+ portlen = 0;
+
+ p = mfile;
+ tag = p;
+ val = NULL;
+ state = lex_tag;
+ for (;;) {
+ if (p >= mfile+len)
+ break;
+ c = *p++;
+ if (state == lex_tag) {
+ if (c == ':') {
+ val = p;
+ state = lex_colon;
+ taglen = (p-1) - tag;
+ } else if (c == '\n') {
+ if (debugging)
+ applog(LOG_DEBUG,
+ "%s: No colon", mfname);
+ tag = p;
+ val = NULL;
+ state = lex_tag;
+ }
+ } else if (state == lex_colon) {
+ if (c == ' ') {
+ val = p;
+ } else if (c == '\n') {
+ if (debugging)
+ applog(LOG_DEBUG,
+ "%s: Empty value", mfname);
+ tag = p;
+ val = NULL;
+ state = lex_tag;
+ } else {
+ state = lex_val;
+ }
+ } else if (state == lex_val) {
+ if (c == '\n') {
+ if (taglen == sizeof("name")-1 &&
+ memcmp(tag, "name", taglen) == 0) {
+ name = val;
+ namelen = (p-1) - val;
+ } else if (taglen == sizeof("host")-1 &&
+ memcmp(tag, "host", taglen) == 0) {
+ host = val;
+ hostlen = (p-1) - val;
+ } else if (taglen == sizeof("port")-1 &&
+ memcmp(tag, "port", taglen) == 0) {
+ port = val;
+ portlen = (p-1) - val;
+ } else {
+ if (debugging)
+ applog(LOG_DEBUG,
+ "%s: Unknown tag %c[%d]",
+ mfname, tag[0], taglen);
+ }
+ tag = p;
+ val = NULL;
+ state = lex_tag;
+ }
+ } else {
+ return;
+ }
+ }
+
+ if (!name || !namelen) {
+ if (debugging)
+ applog(LOG_DEBUG, "%s: No name", mfname);
+ return;
+ }
+ if (!host || !hostlen) {
+ if (debugging)
+ applog(LOG_DEBUG, "%s: No host", mfname);
+ return;
+ }
+ if (!port || !portlen) {
+ if (debugging)
+ applog(LOG_DEBUG, "%s: No port", mfname);
+ return;
+ }
+
+ if (namelen >= sizeof(namebuf)) {
+ applog(LOG_ERR, "Long master name");
+ return;
+ }
+ memcpy(namebuf, name, namelen);
+ namebuf[namelen] = 0;
+
+ if (hostlen >= sizeof(hostbuf)) {
+ applog(LOG_ERR, "Long host");
+ return;
+ }
+ memcpy(hostbuf, host, hostlen);
+ hostbuf[hostlen] = 0;
+
+ if (portlen >= sizeof(portbuf)) {
+ applog(LOG_ERR, "Long port");
+ return;
+ }
+ memcpy(portbuf, port, portlen);
+ portbuf[portlen] = 0;
+ portnum = strtol(portbuf, NULL, 10);
+ if (portnum <= 0 || portnum >= 65536) {
+ applog(LOG_ERR, "Bad port %s", portbuf);
+ return;
+ }
+
+ rp = tdb_find_remote_byname(namebuf);
+ if (!rp) {
+ if (debugging)
+ applog(LOG_DEBUG, "%s: Not found master %s",
+ mfname, namebuf);
+ return;
+ }
+
+ if (debugging)
+ applog(LOG_DEBUG, "Found master %s host %s port %ld",
+ namebuf, hostbuf, portnum);
+
+ rp->host = strdup(hostbuf);
+ rp->port = portnum;
+ if (!rp->host)
+ return;
+ tabled_srv.rep_master = rp;
+}
+
+static void cldu_get_master(const char *mfname, struct ncld_fh *mfh)
+{
+ struct ncld_read *nrp;
+ struct timespec tm;
+ int error;
+
+ nrp = ncld_get(mfh, &error);
+ if (!nrp) {
+ applog(LOG_ERR, "CLD get(%s) failed: %d", mfname, error);
+ return;
+ }
+
+ if (nrp->length < 3) {
+ ncld_read_free(nrp);
+
+ /*
+ * Since master opens, locks, and writes, in that order,
+ * there's a gap between the lock and write. So, unrace a bit.
+ */
+ tm.tv_sec = 2;
+ tm.tv_nsec = 0;
+ nanosleep(&tm, NULL);
+
+ nrp = ncld_get(mfh, &error);
+ if (!nrp) {
+ applog(LOG_ERR, "CLD get(%s) failed: %d", mfname, error);
+ return;
+ }
+
+ if (nrp->length < 3) {
+ applog(LOG_ERR, "CLD master(%s) is empty", mfname);
+ ncld_read_free(nrp);
+ return;
+ }
+ }
+
+ cldu_parse_master(mfname, nrp->ptr, nrp->length);
+ ncld_read_free(nrp);
+}
+
+/*
+ * Lock the MASTER file, write or read it as needed.
+ * N.B. Only call this if you know that mfh is closed or never open:
+ * right after cldu_set_cldc (disposing of session closes handles),
+ * or when we were a slave and so should not have kept mfh ...
+ * FIXME this will become more interesting when we keep mfh open in slave
+ * state so we can have outstanding locks for master failover notification.
+ */
+static int cldu_set_master(struct cld_session *sp)
+{
+ char *buf;
+ int len;
+ int error;
+ int rc;
+
+ if (!sp->nsp)
+ return -1;
+
+ /* Maybe drop this later, after notifications work. */
+ if (debugging) {
+ rc = g_list_length(sp->nsp->handles);
+ applog(LOG_DEBUG, "open handles %d", rc);
+ }
+
+ sp->mfh = ncld_open(sp->nsp, sp->mfname,
+ COM_READ | COM_WRITE | COM_LOCK | COM_CREATE,
+ &error, 0, NULL, NULL);
+ if (!sp->mfh) {
+ applog(LOG_ERR, "CLD open(%s) failed: %d", sp->mfname, error);
+ goto err_open;
+ }
+
+ error = ncld_trylock(sp->mfh);
+ if (error) {
+ applog(LOG_INFO, "CLD lock(%s) failed: %d", sp->mfname, error);
+ cldu_get_master(sp->mfname, sp->mfh);
+ goto err_lock;
+ }
+
+ len = asprintf(&buf, "name: %s\nhost: %s\nport: %u\n",
+ sp->thisname, sp->thishost, tabled_srv.rep_port);
+ if (len < 0) {
+ applog(LOG_ERR, "internal error: no core");
+ goto err_wmem;
+ }
+
+ rc = ncld_write(sp->mfh, buf, len);
+ if (rc) {
+ applog(LOG_ERR, "CLD put(%s) failed: %d", sp->mfname, rc);
+ goto err_write;
+ }
+
+ free(buf);
+ return 0;
+
+err_write:
+ free(buf);
+err_wmem:
+ /* ncld_unlock() - close will unlock */
+err_lock:
+ ncld_close(sp->mfh);
+err_open:
+ return -1;
+}
+
static void cldu_tm_rescan(int fd, short events, void *userdata)
{
struct cld_session *sp = userdata;
@@ -162,14 +432,37 @@ static void cldu_tm_rescan(int fd, short events, void *userdata)
sp->nsp = NULL;
}
newactive = cldu_nextactive(sp);
- if (cldu_set_cldc(sp, newactive)) {
- evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
- return;
+ if (cldu_set_cldc(sp, newactive))
+ goto out;
+
+ if (cldu_set_master(sp) == 0) {
+ tabled_srv.state_want = ST_W_MASTER;
+ } else {
+ if (debugging)
+ applog(LOG_DEBUG, "Unable to relock %s",
+ sp->mfname);
+ tabled_srv.state_want = ST_W_SLAVE;
}
+ cld_update_cb();
+
sp->is_dead = false;
+ } else {
+ if (tabled_srv.state_want == ST_W_SLAVE) {
+ if (cldu_set_master(sp) == 0) {
+ tabled_srv.state_want = ST_W_MASTER;
+ } else {
+ if (debugging)
+ applog(LOG_DEBUG, "Unable to lock %s",
+ sp->mfname);
+ }
+ }
}
+ if (scan_peers(sp) != 0)
+ goto out;
scan_chunks(sp);
+
+ out:
evtimer_add(&sp->tm_rescan, &cldu_rescan_delay);
}
@@ -201,12 +494,6 @@ static void cldu_sess_event(void *priv, uint32_t what)
static int cldu_set_cldc(struct cld_session *sp, int newactive)
{
struct cldc_host *hp;
- struct ncld_read *nrp;
- char buf[100];
- const char *ptr;
- int dir_len;
- int total_len, rec_len, name_len;
- int len;
struct timespec tm;
int error;
int rc;
@@ -261,6 +548,7 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
/*
* Then, create the membership file for us.
+ * We lock it in case two tabled instances run with the same name by mistake.
*/
sp->ffh = ncld_open(sp->nsp, sp->ffname,
COM_WRITE | COM_LOCK | COM_CREATE,
@@ -285,11 +573,7 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
/*
* The usual reason why we get a lock conflict is
* restarting too quickly and hitting the previous lock
- * that is going to disappear soon.
- *
- * FIXME: However, it may also be that a master
- * is ok we we should become a slave, e.g. start TDB.
- * We do not support multi-node, but we should.
+ * that is going to disappear soon. Just wait it out.
*/
tm.tv_sec = 10;
tm.tv_nsec = 0;
@@ -299,21 +583,43 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
/*
* Write the file with our connection parameters.
*/
- len = snprintf(buf, sizeof(buf), "port: %u\n", tabled_srv.rep_port);
- if (len >= sizeof(buf)) {
- applog(LOG_ERR, "internal error: overflow for port (%d)", len);
- goto err_wmem;
- }
-
- rc = ncld_write(sp->ffh, buf, len);
+ rc = ncld_write(sp->ffh, "-\n", 2);
if (rc) {
applog(LOG_ERR, "CLD put(%s) failed: %d", sp->ffname, rc);
goto err_write;
}
/*
- * Read the directory.
+ * Finally, scan cfh to find peers, add with global effects.
*/
+ if (scan_peers(sp) != 0)
+ goto err_pscan;
+
+ return 0;
+
+err_pscan:
+err_write:
+err_lock:
+ ncld_close(sp->ffh); /* session-close closes these, maybe drop */
+err_fopen:
+ ncld_close(sp->cfh);
+err_copen:
+ ncld_sess_close(sp->nsp);
+ sp->nsp = NULL;
+err_nsess:
+err_addr:
+ return -1;
+}
+
+static int scan_peers(struct cld_session *sp)
+{
+ struct ncld_read *nrp;
+ char buf[65];
+ const char *ptr;
+ int dir_len;
+ int total_len, rec_len, name_len;
+ int error;
+
nrp = ncld_get(sp->cfh, &error);
if (!nrp) {
applog(LOG_ERR, "CLD get(%s) failed: %d", sp->cfname, error);
@@ -336,13 +642,20 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
else
buf[64] = 0;
- if (!strcmp(buf, sp->thishost)) {
+ if (!strcmp(buf, MASTER_FILE)) {
+ ; /* ignore special entry */
+ } else if (!strcmp(buf, sp->thisname)) {
if (debugging)
applog(LOG_DEBUG, " %s (ourselves)", buf);
} else {
- if (debugging)
- applog(LOG_DEBUG, " %s", buf);
- add_remote(buf);
+ if (tdb_find_remote_byname(buf)) {
+ if (debugging)
+ applog(LOG_DEBUG, " %s", buf);
+ } else {
+ if (debugging)
+ applog(LOG_DEBUG, " %s (new)", buf);
+ add_remote(buf);
+ }
}
ptr += total_len;
@@ -350,21 +663,9 @@ static int cldu_set_cldc(struct cld_session *sp, int newactive)
}
ncld_read_free(nrp);
-
return 0;
err_dread:
-err_write:
-err_wmem:
-err_lock:
- ncld_close(sp->ffh); /* session-close closes these, maybe drop */
-err_fopen:
- ncld_close(sp->cfh);
-err_copen:
- ncld_sess_close(sp->nsp);
- sp->nsp = NULL;
-err_nsess:
-err_addr:
return -1;
}
@@ -508,9 +809,6 @@ err_mem:
return;
}
-/*
- * FIXME need to read port number from the file (port:<space>num).
- */
static void add_remote(const char *name)
{
struct db_remote *rp;
@@ -518,10 +816,15 @@ static void add_remote(const char *name)
rp = malloc(sizeof(struct db_remote));
if (!rp)
return;
+ memset(rp, 0, sizeof(struct db_remote));
+
+ /*
+ * Master assigns global IDs now, distributes them in login protocol.
+ */
+ rp->dbid = DBID_NONE;
- rp->port = 8083;
- rp->host = strdup(name);
- if (!rp->host) {
+ rp->name = strdup(name);
+ if (!rp->name) {
free(rp);
return;
}
@@ -564,7 +867,8 @@ void cld_init()
/*
* This initiates our sole session with a CLD instance.
*/
-int cld_begin(const char *thishost, const char *thisgroup, int verbose)
+int cld_begin(const char *thishost, const char *thisgroup,
+ const char *thisname, int verbose)
{
static struct cld_session *sp = &ses;
struct timespec tm;
@@ -575,7 +879,7 @@ int cld_begin(const char *thishost, const char *thisgroup, int verbose)
evtimer_set(&ses.tm_rescan, cldu_tm_rescan, &ses);
- if (cldu_setgroup(sp, thisgroup, thishost)) {
+ if (cldu_setgroup(sp, thisgroup, thishost, thisname)) {
/* Already logged error */
goto err_group;
}
@@ -626,6 +930,14 @@ int cld_begin(const char *thishost, const char *thisgroup, int verbose)
newactive = cldu_nextactive(sp);
}
+ if (cldu_set_master(sp) == 0) {
+ if (debugging)
+ applog(LOG_DEBUG, "Locked %s", sp->mfname);
+ tabled_srv.state_want = ST_W_MASTER;
+ } else {
+ tabled_srv.state_want = ST_W_SLAVE;
+ }
+
retry_cnt = 0;
for (;;) {
if (!scan_chunks(sp))
@@ -696,8 +1008,12 @@ void cld_end(void)
sp->ffname = NULL;
free(sp->xfname);
sp->xfname = NULL;
+ free(sp->mfname);
+ sp->mfname = NULL;
free(sp->thisgroup);
sp->thisgroup = NULL;
free(sp->thishost);
sp->thishost = NULL;
+ free(sp->thisname);
+ sp->thisname = NULL;
}
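The MASTER file that cldu_set_master() writes consists of three "tag: value" lines (name, host, port), and cldu_parse_master() reads it back with a small state machine. A minimal line-oriented sketch of the same parse, with the port range check done on the NUL-terminated copy; struct master_info, take_field() and parse_master() are hypothetical names, and unlike the patch this version fills a caller's struct instead of setting tabled_srv.rep_master.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical result type; the patch updates a struct db_remote instead. */
struct master_info {
	char name[65];
	char host[65];
	long port;
};

/*
 * If line (without its '\n') is "tag: value", copy value into dst.
 * Returns 1 on a match, 0 if the tag differs, -1 on empty/overlong value.
 */
static int take_field(const char *line, size_t linelen, const char *tag,
		      char *dst, size_t dstsize)
{
	size_t taglen = strlen(tag);
	const char *val;
	size_t vallen;

	if (linelen <= taglen || memcmp(line, tag, taglen) != 0 ||
	    line[taglen] != ':')
		return 0;
	val = line + taglen + 1;
	vallen = linelen - taglen - 1;
	while (vallen && *val == ' ') {		/* skip blanks after ':' */
		val++;
		vallen--;
	}
	if (vallen == 0 || vallen >= dstsize)
		return -1;
	memcpy(dst, val, vallen);
	dst[vallen] = '\0';
	return 1;
}

/* Returns 0 only if name, host and a port in 1..65535 were all found. */
static int parse_master(const char *buf, size_t len, struct master_info *mi)
{
	char portbuf[15] = "";
	const char *p = buf, *end = buf + len;
	char *ep;

	assert(buf != NULL && mi != NULL);
	mi->name[0] = mi->host[0] = '\0';
	while (p < end) {
		const char *nl = memchr(p, '\n', end - p);
		size_t linelen;

		if (!nl)
			break;	/* like the patch, ignore an unterminated tail */
		linelen = nl - p;
		if (take_field(p, linelen, "name", mi->name, sizeof(mi->name)) < 0 ||
		    take_field(p, linelen, "host", mi->host, sizeof(mi->host)) < 0 ||
		    take_field(p, linelen, "port", portbuf, sizeof(portbuf)) < 0)
			return -1;
		p = nl + 1;
	}
	if (!mi->name[0] || !mi->host[0] || !portbuf[0])
		return -1;
	mi->port = strtol(portbuf, &ep, 10);
	if (*ep != '\0' || mi->port <= 0 || mi->port >= 65536)
		return -1;
	return 0;
}
```

For example, parse_master() on "name: n1\nhost: h1.example.com\nport: 8083\n" yields n1, h1.example.com and 8083, and it fails if any of the three tags is missing or the port is out of range.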
diff --git a/server/config.c b/server/config.c
index ff4d876..293a5dd 100644
--- a/server/config.c
+++ b/server/config.c
@@ -224,6 +224,16 @@ static void cfg_elm_end (GMarkupParseContext *context,
cc->text = NULL;
}
+ else if (!strcmp(element_name, "TDBRepName")) {
+ if (!cc->text) {
+ applog(LOG_WARNING, "TDBRepName element empty");
+ return;
+ }
+ free(tabled_srv.rep_name);
+ tabled_srv.rep_name = cc->text;
+ cc->text = NULL;
+ }
+
else if (!strcmp(element_name, "StatusPort")) {
if (!cc->text) {
applog(LOG_WARNING, "StatusPort element empty");
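With TDBRepName parsed into tabled_srv.rep_name, the name presumably reaches cld_begin() as thisname (the server.c side is not in this excerpt), and cldu_setgroup() falls back to the hostname and derives the CLD paths from the group. A sketch of that derivation, assuming the path layout shown in the cldu.c hunk; struct cld_paths, make_path() and cld_paths_make() are hypothetical, and rejecting a node named MASTER is an extra check added here (scan_peers() would otherwise skip such a node as the lock file), not something the patch does.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define MASTER_FILE "MASTER"

/* Hypothetical container for the paths cldu_setgroup() builds. */
struct cld_paths {
	char *cfname;	/* /tabled-GROUP: membership directory */
	char *ffname;	/* /tabled-GROUP/NAME: our membership file */
	char *mfname;	/* /tabled-GROUP/MASTER: master lock file */
	char *xfname;	/* /chunk-GROUP: chunkserver directory */
};

/* Portable stand-in for asprintf(): builds "/PREFIX-GROUP[/LEAF]". */
static char *make_path(const char *prefix, const char *group, const char *leaf)
{
	int len = leaf ? snprintf(NULL, 0, "/%s-%s/%s", prefix, group, leaf)
		       : snprintf(NULL, 0, "/%s-%s", prefix, group);
	char *s;

	if (len < 0 || !(s = malloc((size_t)len + 1)))
		return NULL;
	if (leaf)
		snprintf(s, (size_t)len + 1, "/%s-%s/%s", prefix, group, leaf);
	else
		snprintf(s, (size_t)len + 1, "/%s-%s", prefix, group);
	return s;
}

static void cld_paths_free(struct cld_paths *cp)
{
	free(cp->cfname);
	free(cp->ffname);
	free(cp->mfname);
	free(cp->xfname);
	cp->cfname = cp->ffname = cp->mfname = cp->xfname = NULL;
}

static int cld_paths_make(struct cld_paths *cp, const char *group,
			  const char *host, const char *name)
{
	assert(cp != NULL && host != NULL);
	if (!group)
		group = "default";	/* same default as cldu_setgroup() */
	if (!name)
		name = host;		/* no TDBRepName: identify by hostname */
	if (strcmp(name, MASTER_FILE) == 0)
		return -1;		/* extra check: would clash with the lock file */

	cp->cfname = make_path("tabled", group, NULL);
	cp->ffname = make_path("tabled", group, name);
	cp->mfname = make_path("tabled", group, MASTER_FILE);
	cp->xfname = make_path("chunk", group, NULL);
	if (!cp->cfname || !cp->ffname || !cp->mfname || !cp->xfname) {
		cld_paths_free(cp);
		return -1;
	}
	return 0;
}
```

With group ultracart2 and no TDBRepName, a node on h1.example.com would lock /tabled-ultracart2/h1.example.com and contend for /tabled-ultracart2/MASTER.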
diff --git a/server/metarep.c b/server/metarep.c
new file mode 100644
index 0000000..d3eec49
--- /dev/null
+++ b/server/metarep.c
@@ -0,0 +1,1245 @@
+
+/*
+ * Copyright 2008-2009 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define _GNU_SOURCE
+#include "tabled-config.h"
+
+#include <sys/types.h>
+#include <sys/ioctl.h>
+#include <stddef.h>
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <syslog.h>
+#include <glib.h>
+#include <tdb.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include "tabled.h"
+
+/* #define offsetof(type, member) \
+ (((unsigned char *)&((type *)0)->member) - (unsigned char *)0) */
+
+/*
+ * flags:
+ * <31:28> version (currently 1)
+ * <27:8> unused
+ * <7:0> rep_msg_type
+ */
+enum rep_msg_type { REP_MSG_NOP, REP_MSG_LOGIN, REP_MSG_LOGOK, REP_MSG_DATA };
+struct rep_msg_hdr {
+ unsigned int flags;
+ unsigned int lenctl;
+ unsigned int lendata;
+ unsigned short dst, src;
+};
+
+/*
+ * The naming convention is to identify the context in which the function runs.
+ */
+static int rtdb_master_login_reply(struct db_conn *dbc, unsigned char *msgbuf);
+
+/*
+ * Note that the invalid dbid is zero, not -1.
+ */
+static int make_remote_id(void)
+{
+ int id;
+
+ for (;;) {
+ id = rand() % (DBID_MAX+1 - DBID_MIN) + DBID_MIN;
+ if (!tdb_find_remote_byid(id))
+ return id;
+ }
+}
+
+static int dbl_init(struct db_link *dbl)
+{
+ dbl->fd = -1;
+ dbl->state = DBC_INIT;
+
+ dbl->obuflen = 500;
+ dbl->obuf = malloc(dbl->obuflen);
+ if (!dbl->obuf)
+ return -1;
+
+ dbl->ibuflen = sizeof(struct rep_msg_hdr);
+ dbl->ibuf = malloc(dbl->ibuflen);
+ if (!dbl->ibuf) {
+ free(dbl->obuf);
+ return -1;
+ }
+ dbl->cnt = 0;
+ dbl->explen = 1;
+
+ return 0;
+}
+
+static int dbl_irealloc(struct db_link *dbl, int len)
+{
+ unsigned char *newbuf;
+
+ if (len > dbl->ibuflen) {
+ if (!(newbuf = malloc(len)))
+ return -1;
+ memcpy(newbuf, dbl->ibuf, dbl->ibuflen);
+ free(dbl->ibuf);
+ dbl->ibuf = newbuf;
+ dbl->ibuflen = len;
+ }
+ return 0;
+}
+
+/*
+ * Expect the dbl->explen, return accumulated dbl->cnt.
+ */
+static int dbl_expect(struct db_link *dbl)
+{
+ int rc;
+
+ rc = read(dbl->fd, dbl->ibuf + dbl->cnt, dbl->explen - dbl->cnt);
+ if (rc < 0) {
+ if (errno == EAGAIN)
+ return dbl->cnt;
+ applog(LOG_ERR, "network read: %s", strerror(errno));
+ return -1;
+ }
+ if (rc == 0) {
+ applog(LOG_ERR, "EOF from peer"); /* P3 */
+ return -1;
+ }
+ dbl->cnt += rc;
+ return dbl->cnt;
+}
+
+static int dbl_hdr_validate(struct rep_msg_hdr *hdr, int thisid)
+{
+ unsigned int msgflags;
+ int srcid, dstid;
+
+ msgflags = GUINT32_FROM_BE(hdr->flags);
+ if ((msgflags >> 28) != 1) {
+ applog(LOG_ERR, "Link: bad protocol, flags 0x%08x", msgflags);
+ return -1;
+ }
+
+ srcid = GUINT16_FROM_BE(hdr->src);
+ dstid = GUINT16_FROM_BE(hdr->dst);
+ if (srcid == dstid) {
+ applog(LOG_ERR, "Link: loopback, dbid %d", dstid);
+ return -1;
+ }
+ if (srcid < DBID_MIN || srcid > DBID_MAX) {
+ applog(LOG_ERR, "Link: bad src dbid %d", srcid);
+ return -1;
+ }
+ if (dstid < DBID_MIN || dstid > DBID_MAX) {
+ applog(LOG_ERR, "Link: bad dst dbid %d", dstid);
+ return -1;
+ }
+
+ if (thisid != 0 && dstid != thisid) {
+ applog(LOG_ERR, "Link: misdirected, dst dbid %d our dbid %d",
+ dstid, thisid);
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * The login message differs in two ways:
+ * - src and/or dst may not be set
+ * - lenctl is the length of the dst name, lendata of the src name
+ * (the names themselves are not available for validation here)
+ */
+static int dbl_hdr_validate_login(struct rep_msg_hdr *hdr, int thisid)
+{
+ unsigned int msgflags;
+ unsigned int len;
+
+ msgflags = GUINT32_FROM_BE(hdr->flags);
+ if ((msgflags >> 28) != 1) {
+ applog(LOG_ERR, "Link: bad protocol, flags 0x%08x", msgflags);
+ return -1;
+ }
+ if ((msgflags & 0xff) != REP_MSG_LOGIN) {
+ applog(LOG_ERR, "Link: bad login request, flags 0x%08x",
+ msgflags);
+ return -1;
+ }
+
+#if 0 /* A bad idea as long as names are the persistent identifiers. */
+ int dstid;
+ dstid = GUINT16_FROM_BE(hdr->dst);
+ if (dstid && dstid != thisid) {
+ applog(LOG_ERR, "Link: login to wrong dbid %d", dstid);
+ return -1;
+ }
+#endif
+
+ len = GUINT32_FROM_BE(hdr->lenctl);
+ if (len == 0 || len > 64) {
+ applog(LOG_ERR, "Link: bad login dst len %u", len);
+ return -1;
+ }
+
+ len = GUINT32_FROM_BE(hdr->lendata);
+ if (len == 0 || len > 64) {
+ applog(LOG_ERR, "Link: bad login src len %u", len);
+ return -1;
+ }
+
+ return 0;
+}
+
+static void dbl_fini(struct db_link *dbl)
+{
+ if (dbl->writing) {
+ event_del(&dbl->wrev);
+ dbl->writing = false;
+ }
+ if (dbl->fd >= 0) {
+ event_del(&dbl->rcev);
+ close(dbl->fd);
+ }
+ if (dbl->ibuf)
+ free(dbl->ibuf);
+ if (dbl->obuf)
+ free(dbl->obuf);
+}
+
+static struct db_conn *tdb_find_byid(struct tablerep *rtdb, int id)
+{
+ struct db_conn *dbc;
+
+ list_for_each_entry(dbc, &rtdb->conns, link) {
+ if (dbc->remote && dbc->remote->dbid == id)
+ return dbc;
+ }
+ return NULL;
+}
+
+static struct db_conn *dbc_alloc(struct tablerep *rtdb, struct db_remote *rem)
+{
+ struct db_conn *dbc;
+
+ dbc = malloc(sizeof(*dbc));
+ if (!dbc)
+ goto out_mem;
+ memset(dbc, 0, sizeof(*dbc));
+ dbc->rtdb = rtdb;
+ dbc->remote = rem;
+ if (dbl_init(&dbc->lk))
+ goto out_dbl;
+ return dbc;
+
+ out_dbl:
+ free(dbc);
+ out_mem:
+ return NULL;
+}
+
+static void dbc_free(struct db_conn *dbc)
+{
+ dbl_fini(&dbc->lk);
+ free(dbc);
+}
+
+/*
+ * The dbc->remote is known here, see callers.
+ *
+ * The db4 code assumes that it is all right to block when sending. Of course
+ * in our case that means blocking the whole (single-threaded) server.
+ * It is also all right to drop messages, which is said to hurt performance
+ * in other ways. Still, as long as tabled is single-threaded we have no choice.
+ *
+ * Since we can only send complete messages, and even blocking sockets can
+ * return short writes, we must buffer output. But we do not create any
+ * additional queues beyond what is required for the atomicity.
+ */
+static int tdb_rep_send(struct tablerep *rtdb, struct db_link *dbl,
+ int dstid, const DBT *ctl, const DBT *rec,
+ bool easydrop)
+{
+ unsigned char *p;
+ struct rep_msg_hdr *hdr;
+ unsigned int msgflags;
+ ssize_t len;
+ ssize_t rc;
+
+ if (dbl->togo) {
+ /* Maybe poke the output here? Should not be necessary. */
+ return 1;
+ }
+
+ len = sizeof(struct rep_msg_hdr) + ctl->size + rec->size;
+ if (dbl->obuflen < len) {
+ free(dbl->obuf);
+ dbl->obuflen = 0;
+ dbl->obuf = malloc(len);
+ if (!dbl->obuf) {
+ applog(LOG_WARNING, "No core (%ld)", (long) len);
+ return -1;
+ }
+ dbl->obuflen = len;
+ }
+
+ hdr = (struct rep_msg_hdr *) dbl->obuf;
+ p = dbl->obuf;
+
+ memset(hdr, 0, sizeof(struct rep_msg_hdr));
+ msgflags = (1 << 28) | (REP_MSG_DATA);
+ hdr->flags = GUINT32_TO_BE(msgflags);
+ hdr->dst = GUINT16_TO_BE((unsigned short)dstid);
+ hdr->src = GUINT16_TO_BE((unsigned short)rtdb->thisid);
+ p += sizeof(struct rep_msg_hdr);
+ if (ctl->size) {
+ hdr->lenctl = GUINT32_TO_BE(ctl->size);
+ memcpy(p, ctl->data, ctl->size);
+ p += ctl->size;
+ }
+ if (rec->size) {
+ hdr->lendata = GUINT32_TO_BE(rec->size);
+ memcpy(p, rec->data, rec->size);
+ p += rec->size;
+ }
+
+ dbl->done = 0;
+ dbl->togo = p - dbl->obuf;
+
+ rc = write(dbl->fd, dbl->obuf + dbl->done, dbl->togo);
+ if (rc < 0) {
+ if (errno != EAGAIN) {
+ dbl->done = 0;
+ dbl->togo = 0;
+ applog(LOG_ERR, "socket write error, peer dbid %d: %s",
+ dstid, strerror(errno));
+ return -1;
+ }
+ rc = 0; /* socket buffer full: the write event sends the rest */
+ }
+ if (rc < dbl->togo) {
+ if (!dbl->writing) {
+ if (event_add(&dbl->wrev, NULL))
+ applog(LOG_ERR, "event_add failed (write)");
+ else
+ dbl->writing = true;
+ }
+ }
+ dbl->done += rc;
+ dbl->togo -= rc;
+ return 0;
+}
+
+static int db4_rep_send(DB_ENV *dbenv, const DBT *ctl, const DBT *rec,
+ const DB_LSN *lsnp, int envid, uint32_t flags)
+{
+ struct tablerep *rtdb;
+ struct db_conn *dbc;
+ int cnt;
+ int rc;
+
+ rtdb = (struct tablerep *)
+ ((char *)dbenv->app_private - offsetof(struct tablerep, tdb));
+
+ if (envid == DB_EID_BROADCAST) {
+ cnt = 0;
+ list_for_each_entry(dbc, &rtdb->conns, link) {
+ if (dbc->lk.state == DBC_OPEN) {
+ rc = tdb_rep_send(rtdb, &dbc->lk,
+ dbc->remote->dbid,
+ ctl, rec, true);
+ if (!rc)
+ cnt++;
+ if (rc < 0)
+ dbc->lk.state = DBC_DEAD;
+ }
+ }
+ if (!cnt)
+ return DB_REP_UNAVAIL;
+ } else {
+ dbc = tdb_find_byid(rtdb, envid);
+ if (dbc && dbc->lk.state == DBC_OPEN) {
+ rc = tdb_rep_send(rtdb, &dbc->lk,
+ dbc->remote->dbid, ctl, rec, false);
+ if (rc < 0) {
+ dbc->lk.state = DBC_DEAD;
+ return DB_REP_UNAVAIL;
+ }
+ if (rc)
+ return DB_REP_UNAVAIL;
+ } else {
+ applog(LOG_INFO, "Send: dbid %d not found", envid);
+ return DB_REP_UNAVAIL;
+ }
+ }
+ return 0;
+}
+
+static int rtdb_process(struct db_conn *dbc, unsigned char *msgbuf)
+{
+ struct rep_msg_hdr *hdr = (struct rep_msg_hdr *) msgbuf;
+ DB_ENV *dbenv = dbc->rtdb->tdb.env;
+ DBT pctl, prec;
+ DB_LSN lsn;
+ struct db_remote *peer;
+ int rc;
+
+ peer = tdb_find_remote_byid(GUINT16_FROM_BE(hdr->src));
+ if (!peer) {
+ applog(LOG_INFO, "Unknown peer dbid %d",
+ GUINT16_FROM_BE(hdr->src));
+ return -1;
+ }
+
+ memset(&pctl, 0, sizeof(pctl));
+ pctl.data = msgbuf + sizeof(struct rep_msg_hdr);
+ pctl.size = GUINT32_FROM_BE(hdr->lenctl);
+ memset(&prec, 0, sizeof(prec));
+ prec.data = (unsigned char *)pctl.data + pctl.size;
+ prec.size = GUINT32_FROM_BE(hdr->lendata);
+ rc = dbenv->rep_process_message(dbenv, &pctl, &prec, peer->dbid, &lsn);
+ switch (rc) {
+ case DB_REP_ISPERM:
+ /*
+ * The "record is written" is normal in db4 operations,
+ * and shows up so much that we do not print it even under
+ * if (debugging).
+ */
+ break;
+ case DB_REP_DUPMASTER: /* DB thinks we have 2 */
+ case DB_REP_HANDLE_DEAD: /* what handle? */
+ case DB_REP_HOLDELECTION: /* maybe just rep_init it */
+ case DB_REP_IGNORE: /* well, whatever */
+ case DB_REP_JOIN_FAILURE:
+ case DB_REP_LEASE_EXPIRED:
+ case DB_REP_LOCKOUT:
+ case DB_REP_NEWSITE:
+ case DB_REP_NOTPERM:
+ case DB_REP_UNAVAIL:
+ default:
+ if (rc) {
+ applog(LOG_INFO, "rep_process_message: %d (%s)",
+ rc, db_strerror(rc));
+ }
+ }
+
+ return 0;
+}
+
+static int rtdb_send_more(struct db_link *dbl)
+{
+ ssize_t rc;
+
+ if (!dbl->togo) {
+ /* P3 */ applog(LOG_INFO, "stray write event");
+ event_del(&dbl->wrev);
+ dbl->writing = false;
+ return 0;
+ }
+
+ rc = write(dbl->fd, dbl->obuf + dbl->done, dbl->togo);
+ if (rc < 0) {
+ if (errno == EAGAIN)
+ return 0; /* not writable after all, wait for the next event */
+ applog(LOG_ERR, "socket write error: %s", strerror(errno));
+ dbl->done = 0;
+ dbl->togo = 0;
+ return -1;
+ }
+ if (rc < dbl->togo) {
+ dbl->done += rc;
+ dbl->togo -= rc;
+ if (!dbl->writing) {
+ if (event_add(&dbl->wrev, NULL))
+ applog(LOG_ERR, "event_add failed (write)");
+ else
+ dbl->writing = true;
+ }
+ } else {
+ dbl->done = 0;
+ dbl->togo = 0;
+ if (dbl->writing) {
+ event_del(&dbl->wrev);
+ dbl->writing = false;
+ }
+ }
+ return 0;
+}
+
+static void rtdb_wr_event(int fd, short events, void *userdata)
+{
+ struct db_link *dbl = userdata;
+
+ if (rtdb_send_more(dbl))
+ dbl->state = DBC_DEAD;
+}
+
+static void rtdb_master_tcp_event(int fd, short events, void *userdata)
+{
+ struct db_conn *dbc = userdata;
+ struct rep_msg_hdr *hdr;
+ unsigned msgflags;
+ int ctllen, reclen;
+ int len;
+ int rc;
+
+ switch (dbc->lk.state) {
+ case DBC_LOGIN:
+ rc = dbl_expect(&dbc->lk);
+ if (rc < 0)
+ goto out_bad_dbc;
+ if (rc < dbc->lk.explen)
+ return;
+
+ if (dbc->lk.explen == sizeof(struct rep_msg_hdr)) {
+ hdr = (struct rep_msg_hdr *) dbc->lk.ibuf;
+ if (dbl_hdr_validate_login(hdr, dbc->rtdb->thisid))
+ goto out_bad_dbc;
+
+ ctllen = GUINT32_FROM_BE(hdr->lenctl);
+ reclen = GUINT32_FROM_BE(hdr->lendata);
+ len = sizeof(struct rep_msg_hdr) + ctllen + reclen;
+ if (dbl_irealloc(&dbc->lk, len) < 0) {
+ applog(LOG_ERR, "No core (%d)", len);
+ goto out_bad_dbc;
+ }
+ dbc->lk.explen = len;
+ } else {
+ if (rtdb_master_login_reply(dbc, dbc->lk.ibuf))
+ goto out_bad_dbc;
+
+ dbc->lk.state = DBC_OPEN;
+ dbc->lk.cnt = 0;
+ dbc->lk.explen = sizeof(struct rep_msg_hdr);
+ }
+ break;
+ case DBC_OPEN:
+ rc = dbl_expect(&dbc->lk);
+ if (rc < 0)
+ goto out_bad_dbc;
+ if (rc < dbc->lk.explen)
+ return;
+
+ if (dbc->lk.explen == sizeof(struct rep_msg_hdr)) {
+ hdr = (struct rep_msg_hdr *) dbc->lk.ibuf;
+ if (dbl_hdr_validate(hdr, dbc->rtdb->thisid))
+ goto out_bad_dbc;
+ msgflags = GUINT32_FROM_BE(hdr->flags);
+ if ((msgflags & 0xff) != REP_MSG_DATA) {
+ applog(LOG_ERR,
+ "Bad data message, flags 0x%08x",
+ msgflags);
+ goto out_bad_dbc;
+ }
+
+ ctllen = GUINT32_FROM_BE(hdr->lenctl);
+ reclen = GUINT32_FROM_BE(hdr->lendata);
+ len = sizeof(struct rep_msg_hdr) + ctllen + reclen;
+ if (dbl_irealloc(&dbc->lk, len) < 0) {
+ applog(LOG_ERR, "No core (%d)", len);
+ goto out_bad_dbc;
+ }
+ dbc->lk.explen = len;
+ } else {
+ if (rtdb_process(dbc, dbc->lk.ibuf))
+ goto out_bad_dbc;
+
+ dbc->lk.state = DBC_OPEN;
+ dbc->lk.cnt = 0;
+ dbc->lk.explen = sizeof(struct rep_msg_hdr);
+ }
+ break;
+ default: // DBC_DEAD
+ if (dbc->remote) {
+ applog(LOG_INFO,
+ "Event on a dead slave socket, slave %s",
+ dbc->remote->host);
+ } else {
+ applog(LOG_INFO,
+ "Event on a dead slave socket");
+ }
+ tdb_conn_scrub_cb();
+ }
+ return;
+
+ out_bad_dbc:
+ dbc->lk.state = DBC_DEAD;
+ tdb_conn_scrub_cb();
+ return;
+}
+
+static void tdb_conn_event(int fd, short events, void *userdata)
+{
+ struct tablerep *rtdb = userdata;
+ struct db_conn *dbc;
+ struct sockaddr_in6 addr;
+ socklen_t addrlen;
+ char host[65], port[15];
+
+ dbc = dbc_alloc(rtdb, NULL);
+ if (!dbc)
+ goto out_dbc;
+ dbc->lk.explen = sizeof(struct rep_msg_hdr);
+ dbc->lk.state = DBC_LOGIN;
+
+ addrlen = sizeof(addr);
+ dbc->lk.fd = accept(fd, (struct sockaddr *) &addr, &addrlen);
+ if (dbc->lk.fd < 0) {
+ applog(LOG_ERR, "accept: %s", strerror(errno));
+ goto out_accept;
+ }
+
+ getnameinfo((struct sockaddr *) &addr, addrlen,
+ host, sizeof(host), port, sizeof(port),
+ NI_NUMERICHOST|NI_NUMERICSERV);
+ applog(LOG_INFO, "db slave host %s port %s", host, port);
+
+ if (fcntl(dbc->lk.fd, F_SETFL, O_NONBLOCK) < 0) {
+ applog(LOG_ERR, "fcntl: %s", strerror(errno));
+ goto out_flags;
+ }
+
+ event_set(&dbc->lk.rcev, dbc->lk.fd, EV_READ | EV_PERSIST,
+ rtdb_master_tcp_event, dbc);
+ event_set(&dbc->lk.wrev, dbc->lk.fd, EV_WRITE | EV_PERSIST,
+ rtdb_wr_event, &dbc->lk);
+ if (event_add(&dbc->lk.rcev, NULL) < 0) {
+ applog(LOG_ERR, "event_add failed");
+ goto out_add;
+ }
+ list_add_tail(&dbc->link, &rtdb->conns);
+ return;
+
+ out_add:
+ out_flags:
+ close(dbc->lk.fd);
+ dbc->lk.fd = -1; /* keep dbl_fini() from closing it again */
+ out_accept:
+ dbc_free(dbc);
+ out_dbc:
+ return;
+}
+
+static int tdb_rep_listen_open(struct sockaddr_in *addr, int addr_len)
+{
+ int fd;
+ int on;
+ int rc;
+
+ fd = socket(addr->sin_family, SOCK_STREAM, 0);
+ if (fd < 0)
+ return -errno;
+
+ on = 1;
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
+ rc = -errno;
+ goto out_err;
+ }
+
+ if (bind(fd, (struct sockaddr *) addr, addr_len) < 0) {
+ rc = -errno;
+ goto out_err;
+ }
+
+ // rc = fsetflags("tcp server", fd, O_NONBLOCK);
+ // if (rc) {
+ // rc = -errno;
+ // goto out_err;
+ // }
+
+ if (listen(fd, 100) < 0) {
+ rc = -errno;
+ goto out_err;
+ }
+
+ return fd;
+
+ out_err:
+ close(fd);
+ return rc;
+}
+
+static int rtdb_rep_listen(struct tablerep *rtdb, unsigned short port)
+{
+ struct sockaddr_in addr4;
+ struct sockaddr_in6 addr6;
+ int rc;
+
+ memset(&addr6, 0, sizeof(addr6));
+ addr6.sin6_family = AF_INET6;
+ addr6.sin6_port = htons(port);
+ memcpy(&addr6.sin6_addr, &in6addr_any, sizeof(struct in6_addr));
+ rc = tdb_rep_listen_open((struct sockaddr_in *)&addr6, sizeof(addr6));
+ if (rc < 0) {
+ if (debugging)
+ applog(LOG_DEBUG,
+ "tdb_rep_listen_open(v6, %u) failed: %s",
+ port, strerror(-rc));
+ } else {
+ rtdb->sockfd6 = rc;
+ event_set(&rtdb->lsev6, rtdb->sockfd6, EV_READ | EV_PERSIST,
+ tdb_conn_event, rtdb);
+ if (event_add(&rtdb->lsev6, NULL) < 0)
+ applog(LOG_ERR, "event_add failed");
+ }
+
+ memset(&addr4, 0, sizeof(addr4));
+ addr4.sin_family = AF_INET;
+ addr4.sin_port = htons(port);
+ addr4.sin_addr.s_addr = htonl(INADDR_ANY);
+ rc = tdb_rep_listen_open((struct sockaddr_in *)&addr4, sizeof(addr4));
+ if (rc < 0) {
+ if (debugging)
+ applog(LOG_DEBUG,
+ "tdb_rep_listen_open(v4, %u) failed: %s",
+ port, strerror(-rc));
+ } else {
+ rtdb->sockfd4 = rc;
+ event_set(&rtdb->lsev4, rtdb->sockfd4, EV_READ | EV_PERSIST,
+ tdb_conn_event, rtdb);
+ if (event_add(&rtdb->lsev4, NULL) < 0)
+ applog(LOG_ERR, "event_add failed");
+ }
+
+ return 0;
+}
+
+static void rtdb_slave_tcp_event(int fd, short events, void *userdata)
+{
+ struct db_conn *dbc = userdata;
+ struct tablerep *rtdb = dbc->rtdb;
+ struct rep_msg_hdr *hdr;
+ unsigned msgflags;
+ int srcid, dstid;
+ int ctllen, reclen;
+ int len;
+ int rc;
+
+ switch (dbc->lk.state) {
+ case DBC_LOGIN:
+ rc = dbl_expect(&dbc->lk);
+ if (rc < 0)
+ goto out_bad_dbc;
+ if (rc < dbc->lk.explen)
+ return;
+
+ hdr = (struct rep_msg_hdr *) dbc->lk.ibuf;
+ if (dbl_hdr_validate(hdr, rtdb->thisid))
+ goto out_bad_dbc;
+ msgflags = GUINT32_FROM_BE(hdr->flags);
+ if ((msgflags & 0xff) != REP_MSG_LOGOK) {
+ applog(LOG_ERR, "Bad login reply, flags 0x%08x",
+ msgflags);
+ goto out_bad_dbc;
+ }
+ srcid = GUINT16_FROM_BE(hdr->src);
+ dstid = GUINT16_FROM_BE(hdr->dst);
+
+ if (rtdb->thisid == 0) {
+ applog(LOG_INFO, "Assigned local dbid %d", dstid);
+ } else {
+ if (rtdb->thisid != dstid) {
+ /*
+ * Oracle people posted that db won't like this,
+ * but what can we do. At worst, blow away the
+ * local db on slave by hand and let it resync.
+ */
+ applog(LOG_INFO,
+ "Reassigned local dbid from %d to %d",
+ rtdb->thisid, dstid);
+ rtdb->thisid = dstid;
+ }
+ }
+ rtdb->thisid = dstid;
+
+ dbc->lk.state = DBC_OPEN;
+ dbc->lk.cnt = 0;
+ dbc->lk.explen = sizeof(struct rep_msg_hdr);
+
+ if (tdb_slave_login_cb(srcid))
+ goto out_bad_dbc;
+ break;
+ case DBC_OPEN:
+ rc = dbl_expect(&dbc->lk);
+ if (rc < 0)
+ goto out_bad_dbc;
+ if (rc < dbc->lk.explen)
+ return;
+
+ if (dbc->lk.explen == sizeof(struct rep_msg_hdr)) {
+ hdr = (struct rep_msg_hdr *) dbc->lk.ibuf;
+ if (dbl_hdr_validate(hdr, rtdb->thisid))
+ goto out_bad_dbc;
+ msgflags = GUINT32_FROM_BE(hdr->flags);
+ if ((msgflags & 0xff) != REP_MSG_DATA) {
+ applog(LOG_ERR,
+ "Bad data message, flags 0x%08x",
+ msgflags);
+ goto out_bad_dbc;
+ }
+
+ ctllen = GUINT32_FROM_BE(hdr->lenctl);
+ reclen = GUINT32_FROM_BE(hdr->lendata);
+ len = sizeof(struct rep_msg_hdr) + ctllen + reclen;
+ if (dbl_irealloc(&dbc->lk, len) < 0) {
+ applog(LOG_ERR, "No core (%d)", len);
+ goto out_bad_dbc;
+ }
+ dbc->lk.explen = len;
+ } else {
+ if (rtdb_process(dbc, dbc->lk.ibuf))
+ goto out_bad_dbc;
+
+ dbc->lk.state = DBC_OPEN;
+ dbc->lk.cnt = 0;
+ dbc->lk.explen = sizeof(struct rep_msg_hdr);
+ }
+ break;
+ case DBC_DEAD:
+ tdb_slave_disc_cb();
+ break;
+ default:
+ /* P3 */ applog(LOG_INFO, "Event on an unready socket");
+ }
+ return;
+
+ out_bad_dbc:
+ dbc->lk.state = DBC_DEAD;
+ tdb_conn_scrub_cb();
+ return;
+}
+
+static int rtdb_master_login_reply(struct db_conn *dbc, unsigned char *msgbuf)
+{
+ struct tablerep *rtdb = dbc->rtdb;
+ struct rep_msg_hdr *hdr = (struct rep_msg_hdr *) msgbuf;
+ int srclen, dstlen;
+ char *srcname, *dstname;
+ struct db_conn *tmp;
+ struct db_remote *slave;
+ int newid;
+ struct rep_msg_hdr hdrb;
+ unsigned int msgflags;
+ int rc;
+
+ /*
+ * Before proceeding, extract and zero-terminate src and dst names.
+ */
+ dstlen = GUINT32_FROM_BE(hdr->lenctl);
+ dstname = malloc(dstlen + 1);
+ if (!dstname) {
+ applog(LOG_ERR, "No core");
+ return -1;
+ }
+ memcpy(dstname, msgbuf + sizeof(struct rep_msg_hdr), dstlen);
+ dstname[dstlen] = 0;
+
+ srclen = GUINT32_FROM_BE(hdr->lendata);
+ srcname = malloc(srclen + 1);
+ if (!srcname) {
+ applog(LOG_ERR, "No core");
+ free(dstname);
+ return -1;
+ }
+ memcpy(srcname, msgbuf + sizeof(struct rep_msg_hdr) + dstlen, srclen);
+ srcname[srclen] = 0;
+
+ if (dbc->remote) {
+ /* Never happens even with bad clients, our internal problem. */
+ applog(LOG_ERR, "Redone login for slave %s (src %s)",
+ dbc->remote->host, srcname);
+ goto out_err;
+ }
+
+ if (strcmp(srcname, rtdb->thisname) == 0) {
+ applog(LOG_ERR, "Login from aliasing slave %s", srcname);
+ goto out_err;
+ }
+
+ slave = tdb_find_remote_byname(srcname);
+ if (!slave) {
+ applog(LOG_INFO, "Unknown slave \"%s\"", srcname);
+ goto out_err;
+ }
+
+ if (slave->dbid == DBID_NONE) {
+ newid = GUINT16_FROM_BE(hdr->src);
+ if (newid == 0 || newid < DBID_MIN || newid > DBID_MAX) {
+ newid = make_remote_id();
+ }
+ slave->dbid = newid;
+ }
+ if (debugging)
+ applog(LOG_DEBUG, "Link login, slave %s dbid %d",
+ slave->host, slave->dbid);
+
+ /*
+ * Dispose of all existing connections. Our current implementation
+ * provides no security, so it is a proper thing to do. We assume
+ * that the slave knows what it's doing, maybe it detected a loss
+ * of TCP connection that we missed.
+ */
+ list_for_each_entry(tmp, &rtdb->conns, link) {
+ if (tmp->remote == slave)
+ tmp->lk.state = DBC_DEAD;
+ }
+
+ dbc->remote = slave;
+
+ memset(&hdrb, 0, sizeof(hdrb));
+ msgflags = (1 << 28) | (REP_MSG_LOGOK);
+ hdrb.flags = GUINT32_TO_BE(msgflags);
+ hdrb.dst = GUINT16_TO_BE((unsigned short)slave->dbid);
+ hdrb.src = GUINT16_TO_BE((unsigned short)rtdb->thisid);
+
+ rc = write(dbc->lk.fd, &hdrb, sizeof(hdrb));
+ if (rc < 0) {
+ applog(LOG_INFO, "Write error to peer %s: %s", slave->host,
+ strerror(errno));
+ goto out_err;
+ }
+ if (rc < sizeof(hdrb)) {
+ applog(LOG_INFO, "Write short to peer %s: %d", slave->host, rc);
+ goto out_err;
+ }
+
+ free(srcname);
+ free(dstname);
+ return 0;
+
+ out_err:
+ free(srcname);
+ free(dstname);
+ return -1;
+}
+
+static int rtdb_slave_login(struct db_conn *dbc)
+{
+ struct rep_msg_hdr *hdr;
+ unsigned char *msgbuf;
+ unsigned int msgflags;
+ int dstlen, srclen;
+ int len;
+
+ dstlen = strlen(dbc->remote->host);
+ srclen = strlen(dbc->rtdb->thisname);
+ len = sizeof(struct rep_msg_hdr) + dstlen + srclen;
+ msgbuf = malloc(len);
+ if (!msgbuf)
+ return -1;
+
+ hdr = (struct rep_msg_hdr *) msgbuf;
+ // memset(hdr, 0, sizeof(struct rep_msg_hdr)); /* no holes */
+ msgflags = (1 << 28) | (REP_MSG_LOGIN);
+ hdr->flags = GUINT32_TO_BE(msgflags);
+ hdr->lenctl = GUINT32_TO_BE(dstlen);
+ hdr->lendata = GUINT32_TO_BE(srclen);
+ hdr->dst = GUINT16_TO_BE((unsigned short)dbc->remote->dbid);
+ hdr->src = GUINT16_TO_BE((unsigned short)dbc->rtdb->thisid);
+ memcpy(msgbuf + sizeof(struct rep_msg_hdr), dbc->remote->host, dstlen);
+ memcpy(msgbuf + sizeof(struct rep_msg_hdr) + dstlen,
+ dbc->rtdb->thisname, srclen);
+
+ if (write(dbc->lk.fd, msgbuf, len) < len) {
+ dbc->lk.state = DBC_DEAD;
+ free(msgbuf);
+ return -1;
+ }
+ dbc->lk.state = DBC_LOGIN;
+ dbc->lk.explen = sizeof(struct rep_msg_hdr);
+ dbc->lk.cnt = 0;
+ free(msgbuf);
+ return 0;
+}
+
+static int tdb_rep_resolve(struct tablerep *rtdb, int *family,
+ int addrsize, unsigned char *addr, int *addrlen,
+ const char *hostname, unsigned short port)
+{
+ char portstr[15];
+ struct addrinfo hints;
+ struct addrinfo *res, *res0;
+ int rc;
+
+ snprintf(portstr, sizeof(portstr), "%u", port);
+
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = PF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+
+ rc = getaddrinfo(hostname, portstr, &hints, &res0);
+ if (rc) {
+ applog(LOG_WARNING, "getaddrinfo(%s:%s) failed: %s",
+ hostname, portstr, gai_strerror(rc));
+ return -1;
+ }
+
+ for (res = res0; res; res = res->ai_next) {
+ if (res->ai_family != AF_INET && res->ai_family != AF_INET6)
+ continue;
+
+ if (res->ai_addrlen > addrsize) /* should not happen */
+ continue;
+
+ memcpy(addr, res->ai_addr, res->ai_addrlen);
+ *addrlen = res->ai_addrlen;
+ *family = res->ai_family;
+
+ freeaddrinfo(res0);
+ return 0;
+ }
+
+ freeaddrinfo(res0);
+
+ applog(LOG_WARNING, "getaddrinfo(%s:%s): nothing suitable",
+ hostname, portstr);
+ return -1;
+}
+
+static int rtdb_rep_connect(struct db_conn *dbc)
+{
+ struct db_link *dbl = &dbc->lk;
+ struct db_remote *master = dbc->remote;
+ int family;
+ unsigned char addr[32];
+ int addrlen;
+ int rc;
+
+ rc = tdb_rep_resolve(dbc->rtdb, &family, sizeof(addr), addr, &addrlen,
+ master->host, master->port);
+ if (rc < 0)
+ return -1;
+
+ rc = socket(family, SOCK_STREAM, 0);
+ if (rc < 0) {
+ applog(LOG_WARNING, "socket: %s", strerror(errno));
+ return -1;
+ }
+ dbl->fd = rc;
+
+ if (connect(dbl->fd, (struct sockaddr *)addr, addrlen)) {
+ applog(LOG_WARNING, "connect(host %s port %u): %s",
+ master->host, master->port, strerror(errno));
+ close(dbl->fd);
+ dbl->fd = -1; /* dbl_fini() must not close it again */
+ return -1;
+ }
+
+ if (fcntl(dbl->fd, F_SETFL, O_NONBLOCK) < 0) {
+ applog(LOG_ERR, "fcntl: %s", strerror(errno));
+ close(dbl->fd);
+ dbl->fd = -1;
+ return -1;
+ }
+
+ event_set(&dbl->rcev, dbl->fd, EV_READ | EV_PERSIST,
+ rtdb_slave_tcp_event, dbc);
+ if (event_add(&dbl->rcev, NULL) < 0) {
+ applog(LOG_ERR, "event_add failed");
+ close(dbl->fd);
+ dbl->fd = -1;
+ return -1;
+ }
+ event_set(&dbl->wrev, dbl->fd, EV_WRITE | EV_PERSIST,
+ rtdb_wr_event, dbl);
+ return 0;
+}
+
+static void __rtdb_fini(struct tablerep *rtdb)
+{
+ struct db_conn *dbc;
+
+ if (rtdb->sockfd4 >= 0) {
+ event_del(&rtdb->lsev4);
+ close(rtdb->sockfd4);
+ rtdb->sockfd4 = -1;
+ }
+ if (rtdb->sockfd6 >= 0) {
+ event_del(&rtdb->lsev6);
+ close(rtdb->sockfd6);
+ rtdb->sockfd6 = -1;
+ }
+
+ while (!list_empty(&rtdb->conns)) {
+ dbc = list_entry(rtdb->conns.next, struct db_conn, link);
+ list_del(&dbc->link);
+ dbc_free(dbc);
+ }
+ rtdb->mdbc = NULL;
+}
+
+/*
+ * return:
+ * -1 - there was an error, things are in disarray, must call __rtdb_fini.
+ * 0 - all is up, may call tdb_init if desired.
+ * 1 - not done yet, just return to dispatch.
+ */
+static int __rtdb_start(struct tablerep *rtdb, bool we_are_master,
+ struct db_remote *rep_master, unsigned short rep_port)
+{
+ struct db_conn *dbc;
+
+ if (we_are_master) {
+ if (rtdb->thisid == DBID_NONE)
+ rtdb->thisid = make_remote_id();
+ if (rtdb_rep_listen(rtdb, rep_port))
+ return -1;
+ } else {
+ if (!rep_master) {
+ applog(LOG_INFO, "No master yet"); /* P3 */
+ return -1;
+ }
+ if (!rtdb->mdbc) {
+ dbc = dbc_alloc(rtdb, rep_master);
+ if (!dbc)
+ return -1;
+ dbc->lk.explen = sizeof(struct rep_msg_hdr);
+ dbc->lk.state = DBC_INIT;
+ list_add_tail(&dbc->link, &rtdb->conns);
+ rtdb->mdbc = dbc;
+ }
+ switch (rtdb->mdbc->lk.state) {
+ case DBC_OPEN:
+ break;
+ case DBC_INIT:
+ if (rtdb_rep_connect(rtdb->mdbc))
+ return -1;
+ if (rtdb_slave_login(rtdb->mdbc))
+ return -1;
+ return 1;
+ case DBC_LOGIN:
+ /* P3 */ applog(LOG_INFO, "start: no answer");
+ return -1;
+ default:
+ /* P3 */ applog(LOG_INFO, "start: confusion (state %d)",
+ rtdb->mdbc->lk.state);
+ return -1;
+ }
+ }
+ return 0;
+}
+
+int rtdb_init(struct tablerep *rtdb, const char *thisname)
+{
+ rtdb->thisname = thisname;
+
+ INIT_LIST_HEAD(&rtdb->conns);
+ rtdb->sockfd4 = -1;
+ rtdb->sockfd6 = -1;
+
+ // rtdb->mdbc = dbc_alloc(rtdb, NULL);
+ // if (!rtdb->mdbc)
+ // return -1;
+ // rtdb->mdbc->lk.explen = sizeof(struct rep_msg_hdr);
+ // rtdb->mdbc->lk.state = DBC_INIT;
+ // list_add_tail(&rtdb->mdbc.link, &rtdb->conns);
+ return 0;
+}
+
+int rtdb_start(struct tablerep *rtdb,
+ const char *db_home,
+ bool we_are_master,
+ struct db_remote *rep_master, unsigned short rep_port,
+ void (*cb)(enum db_event))
+{
+ int rc;
+
+ rc = __rtdb_start(rtdb, we_are_master, rep_master, rep_port);
+ if (rc < 0)
+ goto err_out;
+ if (rc > 0)
+ return 0;
+
+ /*
+ * Note that we only get here if either we're master, or slave
+ * and link is DBC_OPEN. In both cases rtdb->thisid must be set.
+ */
+ if (rtdb->thisid == 0) { /* never happens */
+ applog(LOG_WARNING, "Zero own dbid, master %d", we_are_master);
+ goto err_out;
+ }
+ if (tdb_init(&rtdb->tdb, db_home, NULL, "tabled", true,
+ rtdb->thisid, db4_rep_send, we_are_master, cb)) {
+ goto err_out;
+ }
+ return 0;
+
+err_out:
+ __rtdb_fini(rtdb);
+ return -1;
+}
+
+void rtdb_mc_reset(struct tablerep *rtdb, bool we_are_master,
+ struct db_remote *rep_master, unsigned short rep_port)
+{
+ int rc;
+
+ __rtdb_fini(rtdb);
+ rc = __rtdb_start(rtdb, we_are_master, rep_master, rep_port);
+ if (rc < 0) {
+ /*
+ * If we failed to reconnect immediately, we do not retry.
+ * This is because db4 has its own timeouts, so there's really
+ * no point in doing anything else: we would only interfere.
+ * From now on, rely on CLD to drive the attempts to reconnect.
+ */
+ /* P3 */ applog(LOG_INFO, "failed to reconnect (%d)", rc);
+ }
+}
+
+void rtdb_dbc_scrub(struct tablerep *rtdb)
+{
+ struct db_conn *dbc, *tmp;
+
+ list_for_each_entry_safe(dbc, tmp, &rtdb->conns, link) {
+ if (dbc->lk.state == DBC_DEAD) {
+ /*
+ * This printout is misleading, since every remote
+ * may have several connections. But how to fix it?
+ */
+ if (dbc->remote) {
+ applog(LOG_INFO, "Closing, peer %s",
+ dbc->remote->host);
+ } else {
+ applog(LOG_INFO, "Closing");
+ }
+ if (dbc == rtdb->mdbc)
+ rtdb->mdbc = NULL;
+ list_del(&dbc->link);
+ dbc_free(dbc);
+ }
+ }
+}
+
+/*
+ * This wants to be both in here and in tdb.c. Problem.
+ */
+int rtdb_restart(struct tablerep *rtdb, bool we_are_master)
+{
+ DB_ENV *dbenv = rtdb->tdb.env;
+ unsigned int rep_flags;
+ int rc;
+
+ rep_flags = we_are_master ? DB_REP_MASTER : DB_REP_CLIENT;
+ rc = dbenv->rep_start(dbenv, NULL, rep_flags);
+ if (rc) {
+ dbenv->err(dbenv, rc, "rep_start(0x%x)", rep_flags);
+ return -1;
+ }
+ return 0;
+}
+
+void rtdb_fini(struct tablerep *rtdb)
+{
+ __rtdb_fini(rtdb);
+ tdb_fini(&rtdb->tdb);
+}
+
diff --git a/server/object.c b/server/object.c
index f8e7b12..3801e94 100644
--- a/server/object.c
+++ b/server/object.c
@@ -39,7 +39,7 @@
static int object_find(DB_TXN *txn, const char *bucket, const char *key,
struct db_obj_ent *pobj)
{
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
struct db_obj_key *okey;
size_t alloc_len;
DBT pkey, pval;
@@ -72,7 +72,7 @@ static int object_find(DB_TXN *txn, const char *bucket, const char *key,
static bool __object_del(DB_TXN *txn, const char *bucket, const char *key)
{
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
struct db_obj_key *okey;
size_t okey_len;
DBT pkey;
@@ -100,7 +100,7 @@ static bool __object_del(DB_TXN *txn, const char *bucket, const char *key)
bool object_del_acls(DB_TXN *txn, const char *bucket, const char *key)
{
- DB *acls = tdb.acls;
+ DB *acls = tdbrep.tdb.acls;
struct db_acl_key *akey;
size_t alloc_len;
DBT pkey;
@@ -163,8 +163,8 @@ bool object_del(struct client *cli, const char *user,
int rc;
enum errcode err = InternalError;
size_t alloc_len;
- DB_ENV *dbenv = tdb.env;
- DB *objs = tdb.objs;
+ DB_ENV *dbenv = tdbrep.tdb.env;
+ DB *objs = tdbrep.tdb.objs;
struct db_obj_key *okey;
struct db_obj_ent obje;
DBT pkey, pval;
@@ -326,9 +326,9 @@ static bool object_put_end(struct client *cli)
struct db_obj_ent oldobj;
bool delobj;
size_t alloc_len;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DBT pkey, pval;
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
DB_TXN *txn = NULL;
GByteArray *string_data;
GArray *string_lens;
@@ -786,7 +786,7 @@ static bool object_put_body(struct client *cli, const char *user,
return cli_err(cli, InternalError);
}
- objid = objid_next(&tabled_srv.object_count, &tdb);
+ objid = objid_next(&tabled_srv.object_count, &tdbrep.tdb);
rc = open_chunks(&cli->out_ch, &tabled_srv.all_stor,
cli, objid, content_len);
@@ -865,9 +865,9 @@ static bool object_put_acls(struct client *cli, const char *user,
{
enum errcode err = InternalError;
enum ReqACLC canacl;
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
DB_TXN *txn = NULL;
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
char *hdr;
char timestr[64];
int rc;
@@ -1130,7 +1130,7 @@ static bool object_get_body(struct client *cli, const char *user,
bool access_ok, modified = true;
GString *extra_hdr;
size_t alloc_len;
- DB *objs = tdb.objs;
+ DB *objs = tdbrep.tdb.objs;
struct db_obj_key *okey;
struct db_obj_ent *obj = NULL;
DBT pkey, pval;
diff --git a/server/replica.c b/server/replica.c
index ac14cb2..1b5e832 100644
--- a/server/replica.c
+++ b/server/replica.c
@@ -612,8 +612,8 @@ static void rep_scan_verify(struct rep_arg *arg,
static void rep_add_nid(unsigned int klen, struct db_obj_key *key, uint32_t nid)
{
- DB_ENV *db_env = tdb.env;
- DB *db_objs = tdb.objs;
+ DB_ENV *db_env = tdbrep.tdb.env;
+ DB *db_objs = tdbrep.tdb.objs;
DB_TXN *db_txn;
DBT pkey, pval;
struct db_obj_ent *obj;
@@ -749,8 +749,8 @@ static void rep_scan(struct rep_arg *arg)
g_mutex_unlock(kscan_mutex);
memset(&cur, 0, sizeof(struct cursor)); /* enough to construct */
- cur.db_env = tdb.env;
- cur.db_objs = tdb.objs;
+ cur.db_env = tdbrep.tdb.env;
+ cur.db_objs = tdbrep.tdb.objs;
kcnt = 0;
for (;;) {
diff --git a/server/server.c b/server/server.c
index 814afec..8859847 100644
--- a/server/server.c
+++ b/server/server.c
@@ -97,12 +97,15 @@ struct server tabled_srv = {
.config = "/etc/tabled.conf",
};
-struct tabledb tdb;
+struct tablerep tdbrep;
enum {
TT_CMD_DUMP,
TT_CMD_TDBST_MASTER,
- TT_CMD_TDBST_SLAVE
+ TT_CMD_TDBST_SLAVE,
+ TT_CMD_MASTER_LINK_RESET,
+ TT_CMD_LINK_SCRUB,
+ TT_CMDNUM
};
struct compiled_pat patterns[] = {
@@ -114,7 +117,11 @@ struct compiled_pat patterns[] = {
};
static char *state_name_tdb[ST_TDBNUM] = {
- "Init", "Open", "Active", "Master", "Slave"
+ "Init", "Open", "Master", "Slave"
+};
+
+static char *cmd_name_tdb[TT_CMDNUM] = {
+ "Dump", "GoMaster", "GoSlave", "MasterLinkReset", "LinkScrub"
};
static struct {
@@ -340,7 +347,7 @@ static int authcheck(struct http_req *req, char *extra_bucket,
* not match.
*/
- rc = tdb.passwd->get(tdb.passwd, NULL, &key, &val, 0);
+ rc = tdbrep.tdb.passwd->get(tdbrep.tdb.passwd, NULL, &key, &val, 0);
if (rc) {
pass = strdup("");
@@ -350,7 +357,7 @@ static int authcheck(struct http_req *req, char *extra_bucket,
char s[64];
snprintf(s, 64, "get user '%s'", user);
- tdb.passwd->err(tdb.passwd, rc, s);
+ tdbrep.tdb.passwd->err(tdbrep.tdb.passwd, rc, s);
}
} else {
pass = val.data;
@@ -387,8 +394,22 @@ static void stats_signal(int signo)
static void stats_dump(void)
{
- applog(LOG_INFO, "STATE: TDB %s",
- state_name_tdb[tabled_srv.state_tdb]);
+ struct db_remote *rp;
+ GList *tmp;
+
+ applog(LOG_INFO, "TDB: group %s state %s host %s rep_port %d dbid %d%s",
+ tabled_srv.group, state_name_tdb[tabled_srv.state_tdb],
+ tabled_srv.ourhost, tabled_srv.rep_port, tdbrep.thisid,
+ (tabled_srv.mc_delay)? " mc_delay": "");
+ for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+ rp = tmp->data;
+ applog(LOG_INFO, "PN: name %s dbid %d", rp->name, rp->dbid);
+ if (rp->host)
+ applog(LOG_INFO, "PN: host %s port %d",
+ rp->host, rp->port);
+ if (rp == tabled_srv.rep_master)
+ applog(LOG_INFO, "PN (master)");
+ }
applog(LOG_INFO,
"STATS: poll %lu event %lu tcp_accept %lu opt_write %lu",
tabled_srv.stats.poll,
@@ -403,11 +424,17 @@ static void stats_dump(void)
bool stat_status(struct client *cli, GList *content)
{
+ struct db_remote *rp;
+ GList *tmp;
char *str;
+ int rc;
/*
* The loadavg is system dependent, we'll figure it out later.
* On Linux, applications read from /proc/loadavg.
+ *
+ * The listening info duplicates the hostname until we split
+ * the replication identifier from the hostname.
*/
if (asprintf(&str,
"<h1>Status</h1>"
@@ -415,11 +442,50 @@ bool stat_status(struct client *cli, GList *content)
tabled_srv.ourhost, tabled_srv.port) < 0)
return false;
content = g_list_append(content, str);
+
if (asprintf(&str,
- "<p>State: TDB %s</p>\r\n",
- state_name_tdb[tabled_srv.state_tdb]) < 0)
+ "<p>TDB: group %s "
+ "state %s host %s rep_port %d dbid %d%s</p>\r\n",
+ tabled_srv.group, state_name_tdb[tabled_srv.state_tdb],
+ tabled_srv.ourhost, tabled_srv.rep_port, tdbrep.thisid,
+ (tabled_srv.mc_delay)? " mc_delay": "") < 0)
return false;
content = g_list_append(content, str);
+
+ if (tabled_srv.rep_remotes) {
+ if (asprintf(&str, "<p>") < 0)
+ return false;
+ content = g_list_append(content, str);
+ for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+ rp = tmp->data;
+ rc = asprintf(&str, "Peer: name %s dbid %d",
+ rp->name, rp->dbid);
+ if (rc < 0)
+ return false;
+ content = g_list_append(content, str);
+ if (rp->host) {
+ rc = asprintf(&str, " host %s port %d",
+ rp->host, rp->port);
+ if (rc < 0)
+ return false;
+ content = g_list_append(content, str);
+ }
+ if (rp == tabled_srv.rep_master) {
+ str = strdup(" (master)");
+ if (!str)
+ return false;
+ content = g_list_append(content, str);
+ }
+ rc = asprintf(&str, "<br />\r\n");
+ if (rc < 0)
+ return false;
+ content = g_list_append(content, str);
+ }
+ if (asprintf(&str, "</p>\r\n") < 0)
+ return false;
+ content = g_list_append(content, str);
+ }
+
if (asprintf(&str,
"<p>Stats: "
"poll %lu event %lu tcp_accept %lu opt_write %lu</p>\r\n"
@@ -1421,7 +1487,7 @@ static void add_chkpt_timer(void)
static void tdb_checkpoint(int fd, short events, void *userdata)
{
- DB_ENV *dbenv = tdb.env;
+ DB_ENV *dbenv = tdbrep.tdb.env;
int rc;
if (debugging)
@@ -1436,29 +1502,50 @@ static void tdb_checkpoint(int fd, short events, void *userdata)
add_chkpt_timer();
}
+static void add_reup_timer(void)
+{
+ static const struct timeval tv = { TABLED_REUP_SEC, 0 };
+
+ if (evtimer_add(&tabled_srv.reup_timer, &tv) < 0)
+ applog(LOG_WARNING, "unable to add reup timer");
+}
+
+static void tdb_reup(int fd, short events, void *userdata)
+{
+
+ if (tabled_srv.state_want == ST_W_MASTER &&
+ tabled_srv.state_tdb == ST_TDB_MASTER) {
+ /*
+ * An upgrade failed, retry.
+ */
+ if (rtdb_restart(&tdbrep, true)) {
+ applog(LOG_WARNING, "Cannot restart to master");
+ add_reup_timer();
+ }
+ }
+}
+
static void tdb_state_cb(enum db_event event)
{
unsigned char cmd;
switch (event) {
case TDB_EV_ELECTED:
- /*
- * Safe to stop ignoring bogus client indication,
- * so unmute us by advancing the state.
- */
- if (tabled_srv.state_tdb == ST_TDB_OPEN)
- tabled_srv.state_tdb = ST_TDB_ACTIVE;
+ /* Just ignore this, we only care about the end state. */
break;
case TDB_EV_CLIENT:
+ /* P3 */ applog(LOG_INFO, "TDB event: slave, state %s", state_name_tdb[tabled_srv.state_tdb]);
+ goto overmsg;
case TDB_EV_MASTER:
+ /* P3 */ applog(LOG_INFO, "TDB event: master, state %s", state_name_tdb[tabled_srv.state_tdb]);
+ overmsg:
/*
* This callback runs on the context of the replication
* manager thread, and calling any of our functions thus
* turns our program into a multi-threaded one. Instead
* we signal the main thread to do the processing.
*/
- if (tabled_srv.state_tdb != ST_TDB_INIT &&
- tabled_srv.state_tdb != ST_TDB_OPEN) {
+ if (tabled_srv.state_tdb != ST_TDB_INIT) {
if (event == TDB_EV_MASTER)
cmd = TT_CMD_TDBST_MASTER;
else
@@ -1472,6 +1559,55 @@ static void tdb_state_cb(enum db_event event)
}
}
+void cld_update_cb(void)
+{
+ switch (tabled_srv.state_want) {
+ case ST_W_MASTER:
+ if (tabled_srv.state_tdb == ST_TDB_MASTER) {
+ ; /* CLD caught up to DB, better late than never */
+ } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ /* CLD tells us to upgrade, do it */
+ if (rtdb_restart(&tdbrep, true)) {
+ applog(LOG_WARNING,
+ "Unable to restart to master");
+ /*
+ * Don't try rtdb_fini here, will end in a hang.
+ * Instead, retry endlessly until it succeeds.
+ */
+ add_reup_timer();
+ }
+ } else {
+ applog(LOG_WARNING, "Want Master while in state %s",
+ state_name_tdb[tabled_srv.state_tdb]);
+ }
+ break;
+ case ST_W_SLAVE:
+ if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ ; /* all good */
+ } else if (tabled_srv.state_tdb == ST_TDB_MASTER) {
+ /*
+ * OK, this is bad. We lost our CLD session and some
+ * other node went master on us. Even if we downgrade
+ * the database now, some clients may have done some
+ * operations while CLD was bouncing. Complain loudly.
+ */
+ applog(LOG_WARNING,
+ "Downgrading the database,"
+ " data loss is possible");
+ if (rtdb_restart(&tdbrep, false)) {
+ tabled_srv.state_tdb = ST_TDB_INIT;
+ rtdb_fini(&tdbrep);
+ }
+ } else {
+ applog(LOG_WARNING, "Want Slave while in state %s",
+ state_name_tdb[tabled_srv.state_tdb]);
+ }
+ break;
+ default:
+ ;
+ }
+}
+
/*
* Due to the way storage_node management is tightly woven into the
* server, the management of nodes is not in storage.c, which deals
@@ -1485,7 +1621,6 @@ int stor_update_cb(void)
{
int num_up;
struct storage_node *stn;
- unsigned int env_flags;
if (debugging)
applog(LOG_DEBUG, "Know of potential %d storage node(s)",
@@ -1518,15 +1653,13 @@ int stor_update_cb(void)
* We initiate operations even if there's no redundancy in order
* to permit bootstrapping and build-time self-checking.
*/
+/* P3 */ applog(LOG_INFO, "storage updated, TDB state %s", state_name_tdb[tabled_srv.state_tdb]);
if (tabled_srv.state_tdb == ST_TDB_INIT) {
tabled_srv.state_tdb = ST_TDB_OPEN;
-
- env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
- if (tdb_init(&tdb, tabled_srv.tdb_dir, NULL,
- env_flags, "tabled", true,
- tabled_srv.rep_remotes,
- tabled_srv.ourhost, tabled_srv.rep_port,
- tdb_state_cb)) {
+ if (rtdb_start(&tdbrep, tabled_srv.tdb_dir,
+ tabled_srv.state_want == ST_W_MASTER,
+ tabled_srv.rep_master,
+ tabled_srv.rep_port, tdb_state_cb)) {
tabled_srv.state_tdb = ST_TDB_INIT;
applog(LOG_ERR, "Failed to open TDB, limping");
}
@@ -1535,10 +1668,122 @@ int stor_update_cb(void)
* FIXME This is where we should process redundancy decreases.
*/
;
+ } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ if (tabled_srv.state_want == ST_W_MASTER) {
+ if (rtdb_restart(&tdbrep, true)) {
+ applog(LOG_WARNING,
+ "Failed to restart to master");
+ add_reup_timer();
+ }
+ }
}
return num_up;
}
+int tdb_slave_login_cb(int srcid)
+{
+ struct db_remote *master;
+
+ master = tabled_srv.rep_master;
+ if (!master) {
+ applog(LOG_INFO, "No master at login");
+ return -1;
+ }
+ if (master->dbid == 0) {
+ applog(LOG_INFO, "Master dbid %d", srcid);
+ } else {
+ if (master->dbid != srcid) {
+ /*
+ * This is probably bad news. Perhaps the master rebooted
+ * on the other side of the network partition and yet
+ * somehow won a lock in CLD, or something even weirder.
+ * But we don't know.
+ */
+ applog(LOG_INFO,
+ "Master switch from dbid %d to dbid %d",
+ master->dbid, srcid);
+ }
+ }
+ master->dbid = srcid;
+
+ if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+ applog(LOG_INFO, "Established link, master %s dbid %d",
+ master->name, master->dbid);
+ if (tabled_srv.state_want != ST_W_SLAVE) {
+ applog(LOG_ERR, "Unexpected TDB state %s, limping",
+ state_name_tdb[tabled_srv.state_tdb]);
+ rtdb_fini(&tdbrep);
+ tabled_srv.state_tdb = ST_TDB_INIT;
+ return -1;
+ }
+ if (rtdb_start(&tdbrep, tabled_srv.tdb_dir,
+ false,
+ master,
+ tabled_srv.rep_port, tdb_state_cb)) {
+ tabled_srv.state_tdb = ST_TDB_INIT;
+ applog(LOG_ERR, "Failed to open TDB, limping");
+ return -1;
+ }
+ } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ applog(LOG_INFO, "Recovered master connection");
+ } else {
+ applog(LOG_INFO, "Confused about connections");
+ }
+ return 0;
+}
+
+void tdb_slave_disc_cb(void)
+{
+ static const struct timeval tv = { TABLED_MCWAIT_SEC, 0 };
+
+ if (tabled_srv.mc_delay)
+ return;
+ evtimer_add(&tabled_srv.mc_timer, &tv);
+ tabled_srv.mc_delay = true;
+}
+
+static void tdb_mc_delay(int fd, short events, void *userdata)
+{
+ static const unsigned char cmd = TT_CMD_MASTER_LINK_RESET;
+
+ tabled_srv.mc_delay = false;
+ write(tabled_srv.ev_pipe[1], &cmd, 1);
+}
+
+void tdb_conn_scrub_cb(void)
+{
+ unsigned char cmd;
+
+ cmd = TT_CMD_LINK_SCRUB;
+ write(tabled_srv.ev_pipe[1], &cmd, 1);
+}
+
+struct db_remote *tdb_find_remote_byname(const char *name)
+{
+ struct db_remote *rp;
+ GList *tmp;
+
+ for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+ rp = tmp->data;
+ if (strcmp(rp->name, name) == 0)
+ return rp;
+ }
+ return NULL;
+}
+
+struct db_remote *tdb_find_remote_byid(int id)
+{
+ struct db_remote *rp;
+ GList *tmp;
+
+ for (tmp = tabled_srv.rep_remotes; tmp; tmp = tmp->next) {
+ rp = tmp->data;
+ if (rp->dbid == id)
+ return rp;
+ }
+ return NULL;
+}
+
static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
int addr_len, void *addr_ptr, bool is_status)
{
@@ -1833,26 +2078,66 @@ static void compile_patterns(void)
}
}
-static void tdb_state_process(enum st_tdb new_state)
+static void tdb_startup(void)
{
unsigned int db_flags;
- if (debugging)
- applog(LOG_DEBUG, "TDB state > %s", state_name_tdb[new_state]);
- if ((new_state == ST_TDB_MASTER || new_state == ST_TDB_SLAVE) &&
- tabled_srv.state_tdb == ST_TDB_ACTIVE) {
+ db_flags = DB_CREATE | DB_THREAD;
+ if (tdb_up(&tdbrep.tdb, db_flags))
+ return;
+ if (objid_init(&tabled_srv.object_count, &tdbrep.tdb)) {
+ tdb_down(&tdbrep.tdb);
+ return;
+ }
+ add_chkpt_timer();
+ rep_start();
+ net_listen_client();
+}
- db_flags = DB_CREATE | DB_THREAD;
- if (tdb_up(&tdb, db_flags))
- return;
+static void tdb_state_process(enum st_tdb new_state)
+{
- if (objid_init(&tabled_srv.object_count, &tdb)) {
- tdb_down(&tdb);
- return;
+ applog(LOG_INFO, "TDB state %s > %s",
+ state_name_tdb[tabled_srv.state_tdb], state_name_tdb[new_state]);
+
+ if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+ if (new_state == ST_TDB_MASTER) {
+ if (tabled_srv.state_want == ST_W_MASTER) {
+ tdb_startup();
+ } else {
+ /*
+ * We want slave if we cannot connect to CLD,
+ * or we cannot lock the master file, which
+ * means that another master may exist.
+ * But the db goes master on us, so
+ * either the other master is dead or we're
+ * misconfigured so DBs cannot talk.
+ * Either way, we should poke db until the
+ * desired result is accomplished. XXX
+ */
+ applog(LOG_INFO, "TDB went Master on us");
+ }
+ } else if (new_state == ST_TDB_SLAVE) {
+ applog(LOG_INFO, "TDB went Slave, so whatever");
+ ;
+ } else {
+ applog(LOG_ERR, "TDB went to unexpected state");
+ }
+ } else if (tabled_srv.state_tdb == ST_TDB_SLAVE) {
+ if (new_state == ST_TDB_MASTER) {
+ if (tabled_srv.state_want == ST_W_MASTER) {
+ tdb_startup();
+ } else {
+ /*
+ * This is either a net split or CLD is doing
+ * its timeouts and so we do not want to be
+ * a master yet.
+ */
+ applog(LOG_ERR, "TDB upgraded on us");
+ }
+ } else {
+ applog(LOG_ERR, "TDB is confused");
}
- add_chkpt_timer();
- rep_start();
- net_listen_client();
}
}
@@ -1871,6 +2156,11 @@ static void internal_event(int fd, short events, void *userdata)
abort();
}
+ if (debugging) {
+ applog(LOG_DEBUG, "Context Event %s, TDB state %s",
+ cmd_name_tdb[cmd], state_name_tdb[tabled_srv.state_tdb]);
+ }
+
switch (cmd) {
case TT_CMD_DUMP:
stats_dump();
@@ -1890,6 +2180,15 @@ static void internal_event(int fd, short events, void *userdata)
}
break;
+ case TT_CMD_MASTER_LINK_RESET:
+ rtdb_mc_reset(&tdbrep, tabled_srv.state_want == ST_W_MASTER,
+ tabled_srv.rep_master, tabled_srv.rep_port);
+ break;
+
+ case TT_CMD_LINK_SCRUB:
+ rtdb_dbc_scrub(&tdbrep);
+ break;
+
default:
applog(LOG_WARNING, "%s BUG: command 0x%x", __func__, cmd);
break;
@@ -1905,6 +2204,7 @@ int main (int argc, char *argv[])
INIT_LIST_HEAD(&tabled_srv.all_stor);
INIT_LIST_HEAD(&tabled_srv.write_compl_q);
tabled_srv.state_tdb = ST_TDB_INIT;
+ tabled_srv.rep_next_id = DBID_MIN;
/* isspace() and strcasecmp() consistency requires this */
setlocale(LC_ALL, "C");
@@ -1978,6 +2278,8 @@ int main (int argc, char *argv[])
tabled_srv.evbase_main = event_init();
event_base_rep = event_base_new();
evtimer_set(&tabled_srv.chkpt_timer, tdb_checkpoint, NULL);
+ evtimer_set(&tabled_srv.mc_timer, tdb_mc_delay, NULL);
+ evtimer_set(&tabled_srv.reup_timer, tdb_reup, NULL);
/* set up internal communication pipe */
if (pipe(tabled_srv.ev_pipe) < 0) {
@@ -1991,6 +2293,13 @@ int main (int argc, char *argv[])
goto err_pevt;
}
+ /* late-construct structures with allocations */
+ if (rtdb_init(&tdbrep, tabled_srv.ourhost)) {
+ applog(LOG_WARNING, "rtdb_init");
+ rc = 1;
+ goto err_rtdb;
+ }
+
/* set up server networking */
if (tabled_srv.status_port) {
if (net_open_known(tabled_srv.status_port, true) == 0)
@@ -2000,7 +2309,8 @@ int main (int argc, char *argv[])
if (rc)
goto err_out_net;
- if (cld_begin(tabled_srv.ourhost, tabled_srv.group, verbose) != 0) {
+ if (cld_begin(tabled_srv.ourhost, tabled_srv.group,
+ tabled_srv.rep_name, verbose) != 0) {
rc = 1;
goto err_cld_session;
}
@@ -2023,13 +2333,13 @@ err_cld_session:
err_out_net:
if (tabled_srv.state_tdb == ST_TDB_MASTER ||
tabled_srv.state_tdb == ST_TDB_SLAVE) {
- tdb_down(&tdb);
- tdb_fini(&tdb);
- } else if (tabled_srv.state_tdb == ST_TDB_OPEN ||
- tabled_srv.state_tdb == ST_TDB_ACTIVE) {
- tdb_fini(&tdb);
+ tdb_down(&tdbrep.tdb);
+ rtdb_fini(&tdbrep);
+ } else if (tabled_srv.state_tdb == ST_TDB_OPEN) {
+ rtdb_fini(&tdbrep);
}
-/* err_tdb_init: */
+err_rtdb:
+ event_del(&tabled_srv.pevt);
err_pevt:
close(tabled_srv.ev_pipe[0]);
close(tabled_srv.ev_pipe[1]);
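As the comment in tdb_state_cb says, the replication callback runs on another thread, so instead of touching server state it writes a one-byte command into tabled_srv.ev_pipe and lets internal_event dispatch it on the main thread; tdb_mc_delay and tdb_conn_scrub_cb reuse the same channel. Below is a minimal standalone sketch of that self-pipe pattern, assuming POSIX pipe() and pthreads. The CMD_* names, worker() and dispatch() are hypothetical stand-ins for the TT_CMD_* values and internal_event, and the blocking read loop replaces the libevent registration (pevt) that tabled actually uses.

```c
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical subset of the TT_CMD_* commands in server.c */
enum { CMD_GO_MASTER, CMD_GO_SLAVE, CMD_LINK_RESET };

static int ev_pipe[2];
static int master_cnt, slave_cnt, reset_cnt;

/* Foreign thread: posts commands, never touches server state */
static void *worker(void *arg)
{
	static const unsigned char cmds[] = {
		CMD_GO_SLAVE, CMD_GO_MASTER, CMD_LINK_RESET
	};
	size_t i;

	(void)arg;
	for (i = 0; i < sizeof(cmds); i++) {
		/* 1-byte writes are <= PIPE_BUF, hence atomic */
		if (write(ev_pipe[1], &cmds[i], 1) != 1)
			perror("write");
	}
	close(ev_pipe[1]);	/* EOF ends the main loop below */
	return NULL;
}

/* Main thread only: the one place that mutates state */
static void dispatch(unsigned char cmd)
{
	switch (cmd) {
	case CMD_GO_MASTER:
		master_cnt++;
		break;
	case CMD_GO_SLAVE:
		slave_cnt++;
		break;
	case CMD_LINK_RESET:
		reset_cnt++;
		break;
	default:
		fprintf(stderr, "BUG: command 0x%x\n", cmd);
	}
}

int run_pipe_demo(void)
{
	pthread_t tid;
	unsigned char cmd;
	ssize_t n;

	if (pipe(ev_pipe) < 0)
		return -1;
	if (pthread_create(&tid, NULL, worker, NULL) != 0)
		return -1;
	while ((n = read(ev_pipe[0], &cmd, 1)) == 1)
		dispatch(cmd);
	pthread_join(tid, NULL);
	close(ev_pipe[0]);
	return n == 0 ? 0 : -1;
}
```

Because POSIX makes pipe writes of at most PIPE_BUF bytes atomic, several threads can post commands concurrently without interleaving, and every state change still happens on the reading thread.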
diff --git a/server/tabled.h b/server/tabled.h
index ff419e3..c90511c 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -45,6 +45,8 @@ enum {
TABLED_CHKPT_SEC = 60 * 5, /* secs between db4 chkpt */
TABLED_RESCAN_SEC = 60*3 + 7, /* secs btw key rescans */
+ TABLED_MCWAIT_SEC = 35, /* secs to moderate reconn. */
+ TABLED_REUP_SEC = 35, /* secs to retry rtdb_restart */
CHUNK_REBOOT_TIME = 3*60, /* secs to declare chunk dead */
@@ -200,8 +202,12 @@ struct client {
char req_buf[CLI_REQ_BUF_SZ]; /* input buffer */
};
+enum st_want {
+ ST_W_INIT, ST_W_MASTER, ST_W_SLAVE
+};
+
enum st_tdb {
- ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_ACTIVE, ST_TDB_MASTER, ST_TDB_SLAVE,
+ ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_MASTER, ST_TDB_SLAVE,
ST_TDBNUM
};
@@ -218,6 +224,17 @@ struct server_stats {
unsigned long max_write_buf;
};
+#define DBID_NONE 0
+#define DBID_MIN 2
+#define DBID_MAX 105
+
+struct db_remote { /* other DB nodes */
+ char *name; /* do not resolve as a host */
+ char *host;
+ unsigned short port;
+ int dbid; /* signed in db4, traditional */
+};
+
struct listen_cfg {
/* bool encrypt; */
/* char *host; */
@@ -233,6 +250,8 @@ struct server {
int ev_pipe[2];
struct event pevt;
struct list_head write_compl_q; /* list of done writes */
+ bool mc_delay;
+ struct event mc_timer;
char *config; /* config file (static) */
@@ -242,6 +261,7 @@ struct server {
char *port_file;
char *chunk_user; /* username for stc_new */
char *chunk_key; /* key for stc_new */
+ char *rep_name; /* db4 replication name */
unsigned short rep_port; /* db4 replication port */
char *status_port; /* status webserver */
char *group; /* our group (both T and Ch) */
@@ -249,12 +269,16 @@ struct server {
char *ourhost; /* use this if DB master */
struct database *db; /* database handle */
GList *rep_remotes;
+ struct db_remote *rep_master; /* if we're slave */
+ int rep_next_id;
+ struct event reup_timer;
GList *sockets;
struct list_head all_stor; /* struct storage_node */
int num_stor; /* number of storage_node's */
uint64_t object_count;
+ enum st_want state_want;
enum st_tdb state_tdb;
enum st_net state_net;
@@ -263,7 +287,55 @@ struct server {
struct server_stats stats; /* global statistics */
};
-extern struct tabledb tdb;
+/*
+ * Low-level channel, for both sides.
+ *
+ * The combined link state conflates the session (e.g. login) with the framing,
+ * which is not pretty but works. At least we have a separate link-state struct.
+ *
+ * In a settled state, db_conn corresponds 1:1 to db_remote, but
+ * it's not necessarily so when connections are being established.
+ */
+enum dbc_state { DBC_INIT, DBC_LOGIN, DBC_OPEN, DBC_DEAD };
+
+struct db_link {
+ int fd;
+ enum dbc_state state;
+
+ bool writing;
+ struct event wrev; /* when writing */
+ unsigned char *obuf;
+ int obuflen;
+ int done, togo;
+
+ struct event rcev; /* whenever fd >= 0 */
+ unsigned char *ibuf;
+ int ibuflen; /* currently allocated ibuf */
+ int cnt; /* currently in ibuf */
+ int explen; /* expected length */
+};
+
+struct db_conn { /* a connection with other DB node */
+ struct tablerep *rtdb;
+ struct db_remote *remote;
+ struct list_head link;
+
+ struct db_link lk;
+};
+
+struct tablerep {
+ struct tabledb tdb;
+ const char *thisname;
+ int thisid;
+
+ int sockfd4, sockfd6;
+ struct event lsev4, lsev6;
+ struct list_head conns; // struct db_conn
+
+ struct db_conn *mdbc;
+};
+
+extern struct tablerep tdbrep;
/* bucket.c */
extern bool has_access(const char *user, const char *bucket, const char *key,
@@ -295,7 +367,8 @@ extern void cli_in_end(struct client *cli);
/* cldu.c */
extern void cld_init(void);
-extern int cld_begin(const char *fqdn, const char *group, int verbose);
+extern int cld_begin(const char *fqdn, const char *group, const char *name,
+ int verbose);
extern void cldu_add_host(const char *host, unsigned int port);
extern void cld_end(void);
@@ -332,7 +405,13 @@ extern bool cli_write_start(struct client *cli);
extern bool cli_write_run_compl(void);
extern int cli_req_avail(struct client *cli);
extern void applog(int prio, const char *fmt, ...);
+extern void cld_update_cb(void);
extern int stor_update_cb(void);
+extern int tdb_slave_login_cb(int srcid);
+extern void tdb_slave_disc_cb(void);
+extern void tdb_conn_scrub_cb(void);
+extern struct db_remote *tdb_find_remote_byname(const char *name);
+extern struct db_remote *tdb_find_remote_byid(int id);
/* status.c */
extern bool stat_evt_http_req(struct client *cli, unsigned int events);
@@ -374,4 +453,16 @@ extern void rep_start(void);
extern void rep_stats(void);
extern bool rep_status(struct client *cli, GList *content);
+/* metarep.c */
+extern int rtdb_init(struct tablerep *rtdb, const char *thishost);
+extern int rtdb_start(struct tablerep *rtdb, const char *db_home,
+ bool we_are_master,
+ struct db_remote *rep_master, unsigned short rep_port,
+ void (*cb)(enum db_event));
+extern void rtdb_mc_reset(struct tablerep *rtdb, bool we_are_master,
+ struct db_remote *rep_master, unsigned short rep_port);
+extern void rtdb_dbc_scrub(struct tablerep *rtdb);
+extern int rtdb_restart(struct tablerep *rtdb, bool we_are_master);
+extern void rtdb_fini(struct tablerep *rtdb);
+
#endif /* __TABLED_H__ */
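With enum st_want now sitting next to enum st_tdb, the reconciliation that cld_update_cb performs (the role CLD wants versus the role db4 is actually in) reads as a small decision table. The sketch below restates that table as a pure function. enum action and reconcile() are hypothetical names, and the side effects (rtdb_restart, arming reup_timer, falling back to rtdb_fini) are left to the caller, as they are in the patch.

```c
/* Copies of enum st_want / enum st_tdb from tabled.h */
enum st_want { ST_W_INIT, ST_W_MASTER, ST_W_SLAVE };
enum st_tdb { ST_TDB_INIT, ST_TDB_OPEN, ST_TDB_MASTER, ST_TDB_SLAVE, ST_TDBNUM };

/* Hypothetical action codes, not part of tabled */
enum action {
	ACT_NONE,	/* states already agree, or nothing wanted yet */
	ACT_UPGRADE,	/* rtdb_restart(&tdbrep, true), retry via reup_timer */
	ACT_DOWNGRADE,	/* rtdb_restart(&tdbrep, false), data loss possible */
	ACT_WARN	/* log "Want X while in state Y" */
};

static enum action reconcile(enum st_want want, enum st_tdb have)
{
	switch (want) {
	case ST_W_MASTER:
		if (have == ST_TDB_MASTER)
			return ACT_NONE;	/* CLD caught up to DB */
		if (have == ST_TDB_SLAVE)
			return ACT_UPGRADE;
		return ACT_WARN;
	case ST_W_SLAVE:
		if (have == ST_TDB_SLAVE)
			return ACT_NONE;
		if (have == ST_TDB_MASTER)
			return ACT_DOWNGRADE;	/* lost CLD session */
		return ACT_WARN;
	default:
		return ACT_NONE;
	}
}
```

Keeping the decision free of side effects makes it easy to check every (want, have) pair in isolation.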
diff --git a/server/tdbadm.c b/server/tdbadm.c
index 86fa4b3..4bd26cc 100644
--- a/server/tdbadm.c
+++ b/server/tdbadm.c
@@ -45,11 +45,10 @@ enum various_modes {
static int mode_adm;
static unsigned long invalid_lines;
static char *tdb_dir;
-static unsigned short rep_port;
static char *config = "/etc/tabled.conf";
-static char *ourhost;
static struct tabledb tdb;
+static bool tdb_is_master;
const char *argp_program_version = PACKAGE_VERSION;
@@ -110,7 +109,6 @@ static void cfg_elm_end(GMarkupParseContext *context,
{
struct config_context *cc = user_data;
struct stat statb;
- int n;
if (!strcmp(element_name, "TDB") && cc->text) {
if (!tdb_dir) {
@@ -134,25 +132,6 @@ static void cfg_elm_end(GMarkupParseContext *context,
cc->text = NULL;
}
- else if (!strcmp(element_name, "ForceHost") && cc->text) {
- free(ourhost);
- ourhost = cc->text;
- cc->text = NULL;
- }
-
- else if (!strcmp(element_name, "TDBRepPort") && cc->text) {
- n = strtol(cc->text, NULL, 10);
- if (n <= 0 || n >= 65536) {
- fprintf(stderr, "warning: "
- "TDBRepPort '%s' invalid, ignoring", cc->text);
- free(cc->text);
- cc->text = NULL;
- return;
- }
- rep_port = n;
- free(cc->text);
- cc->text = NULL;
- }
}
static bool str_n_isspace(const char *s, size_t n)
@@ -198,8 +177,6 @@ static void read_config(void)
memset(&ctx, 0, sizeof(struct config_context));
- rep_port = 8083;
-
if (!g_file_get_contents(config, &text, &len, NULL)) {
fprintf(stderr, "failed to read config file %s\n", config);
exit(1);
@@ -603,10 +580,15 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
return 0;
}
+static void tdb_state_cb(enum db_event event)
+{
+ if (event == TDB_EV_MASTER)
+ tdb_is_master = true;
+}
+
int main(int argc, char *argv[])
{
- char hostname[64];
- unsigned int env_flags, db_flags;
+ unsigned int db_flags;
error_t aprc;
int rc = 1;
@@ -621,21 +603,12 @@ int main(int argc, char *argv[])
if (!tdb_dir)
die("no tdb dir (-t) specified\n");
- if (ourhost)
- strcpy(hostname, ourhost);
- else if (gethostname(hostname, sizeof(hostname)) < 0) {
- fprintf(stderr, "gethostname failed: %s\n", strerror(errno));
- return 1;
- }
-
- env_flags = DB_RECOVER | DB_CREATE | DB_THREAD;
- if (tdb_init(&tdb, tdb_dir, NULL, env_flags,
- "tdbadm", false, NULL, hostname, rep_port, NULL))
+ if (tdb_init(&tdb, tdb_dir, NULL, "tdbadm", false,
+ 0, NULL, true, tdb_state_cb))
goto err_dbinit;
- /* Usually takes about 12s */
- /* FIXME don't peek into private parts of tdb struct, use state_cb */
- while (!tdb.is_master)
+ /* Usually takes about 12s, if vote is involved. */
+ while (!tdb_is_master)
sleep(2);
db_flags = DB_CREATE | DB_THREAD;
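The new tdbadm loop polls tdb_is_master, a plain static bool that tdb_state_cb sets. If that callback fires on a db4 thread rather than the main one (the comment in server.c's tdb_state_cb says so for tabled; the same is only assumed here for tdbadm), the unsynchronized write and read are formally a data race in C11. A minimal sketch of the same wait using a C11 atomic_bool follows. fake_rep_thread(), sleep_ms() and wait_for_master() are hypothetical helpers, and the enum db_event values are copied only for illustration; their real order is not known from the patch.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <time.h>

/* Illustrative copy of the event names used in the patch */
enum db_event { TDB_EV_ELECTED, TDB_EV_CLIENT, TDB_EV_MASTER };

/* Static atomic objects are zero-initialized, i.e. false */
static atomic_bool tdb_is_master;

static void sleep_ms(long ms)
{
	struct timespec ts = {
		.tv_sec = ms / 1000,
		.tv_nsec = (ms % 1000) * 1000000L,
	};

	nanosleep(&ts, NULL);
}

/* Same shape as tdbadm's tdb_state_cb, but with an atomic store */
static void tdb_state_cb(enum db_event event)
{
	if (event == TDB_EV_MASTER)
		atomic_store(&tdb_is_master, true);
}

/* Hypothetical stand-in for the thread that delivers db4 events */
static void *fake_rep_thread(void *arg)
{
	(void)arg;
	tdb_state_cb(TDB_EV_ELECTED);	/* ignored, as in tdbadm */
	sleep_ms(100);
	tdb_state_cb(TDB_EV_MASTER);
	return NULL;
}

/* Poll until master; returns the number of polls, or -1 on timeout */
static int wait_for_master(unsigned int max_polls, long interval_ms)
{
	unsigned int polls = 0;

	while (!atomic_load(&tdb_is_master)) {
		if (++polls > max_polls)
			return -1;
		sleep_ms(interval_ms);	/* tdbadm itself uses sleep(2) */
	}
	return (int)polls;
}

int run_wait_demo(void)
{
	pthread_t tid;
	int polls;

	if (pthread_create(&tid, NULL, fake_rep_thread, NULL) != 0)
		return -1;
	polls = wait_for_master(500, 20);
	pthread_join(tid, NULL);
	return polls;
}
```

In tdbadm itself the polling interval can stay at two seconds; the only change this sketch suggests is the atomic type for the flag.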