From: Jeff Garzik <jeff@garzik.org>
To: hail-devel@vger.kernel.org
Subject: [PATCH 1/3] CLD: convert back to libevent
Date: Fri, 31 Dec 2010 05:56:34 -0500 [thread overview]
Message-ID: <20101231105634.GA4210@havoc.gtf.org> (raw)
Switch CLD from hand-rolled server poll code to libevent. This follows
techniques and rationale similar to those of chunkd commit
c1aed7464f237e5a6309351bf003162c77d69e27, and reverts ancient commit
90b3b5edcf5aa00577f4395fdbb490ed7e9be824.
Signed-off-by: Jeff Garzik <jgarzik@redhat.com>
---
cld/Makefile.am | 3 -
cld/cld.h | 22 +++----
cld/server.c | 161 ++++++++++++++++++++------------------------------------
cld/session.c | 69 ++++++++++++++++--------
4 files changed, 118 insertions(+), 137 deletions(-)
diff --git a/cld/Makefile.am b/cld/Makefile.am
index 9a13ce0..30eea0b 100644
--- a/cld/Makefile.am
+++ b/cld/Makefile.am
@@ -12,7 +12,8 @@ cld_SOURCES = cldb.h cld.h \
cldb.c msg.c server.c session.c util.c
cld_LDADD = \
../lib/libhail.la @GLIB_LIBS@ @CRYPTO_LIBS@ \
- @SSL_LIBS@ @DB4_LIBS@ @XML_LIBS@ @LIBCURL@
+ @SSL_LIBS@ @DB4_LIBS@ @XML_LIBS@ @LIBCURL@ \
+ @EVENT_LIBS@
cldbadm_SOURCES = cldb.h cldbadm.c
cldbadm_LDADD = @CRYPTO_LIBS@ @GLIB_LIBS@ @DB4_LIBS@
diff --git a/cld/cld.h b/cld/cld.h
index 4c0099f..17f14b8 100644
--- a/cld/cld.h
+++ b/cld/cld.h
@@ -22,8 +22,9 @@
#include <netinet/in.h>
#include <sys/time.h>
-#include <poll.h>
+#include <event.h>
#include <glib.h>
+#include <elist.h>
#include "cldb.h"
#include <cld_msg_rpc.h>
#include <cld_common.h>
@@ -59,13 +60,13 @@ struct session {
uint64_t last_contact;
uint64_t next_fh;
- struct cld_timer timer;
+ struct event timer;
uint64_t next_seqid_in;
uint64_t next_seqid_out;
GList *out_q; /* outgoing pkts (to client) */
- struct cld_timer retry_timer;
+ struct event retry_timer;
char user[CLD_MAX_USERNAME];
@@ -85,10 +86,10 @@ struct server_stats {
unsigned long garbage; /* num. garbage pkts dropped */
};
-struct server_poll {
+struct server_socket {
int fd;
- bool (*cb)(int fd, short events, void *userdata);
- void *userdata;
+ struct event ev;
+ struct list_head sockets_node;
};
struct server {
@@ -103,14 +104,13 @@ struct server {
struct cldb cldb; /* database info */
- GArray *polls;
- GArray *poll_data;
+ struct event_base *evbase_main;
- GHashTable *sessions;
+ struct list_head sockets;
- struct cld_timer_list timers;
+ GHashTable *sessions;
- struct cld_timer chkpt_timer; /* db4 checkpoint timer */
+ struct event chkpt_timer; /* db4 checkpoint timer */
struct server_stats stats; /* global statistics */
};
diff --git a/cld/server.c b/cld/server.c
index 7a57785..aed501b 100644
--- a/cld/server.c
+++ b/cld/server.c
@@ -559,7 +559,7 @@ static void simple_sendresp(int sock_fd, const struct client *cli,
info->op);
}
-static bool udp_srv_event(int fd, short events, void *userdata)
+static void udp_srv_event(int fd, short events, void *userdata)
{
struct client cli;
char host[64];
@@ -586,7 +586,7 @@ static bool udp_srv_event(int fd, short events, void *userdata)
rrc = recvmsg(fd, &hdr, 0);
if (rrc < 0) {
syslogerr("UDP recvmsg");
- return true; /* continue main loop; do NOT terminate server */
+ return;
}
cli.addr_len = hdr.msg_namelen;
@@ -601,59 +601,60 @@ static bool udp_srv_event(int fd, short events, void *userdata)
if (!parse_pkt_header(raw_pkt, rrc, &pkt, &hdr_len)) {
cld_srv.stats.garbage++;
- return true;
+ return;
}
if (!get_pkt_info(&pkt, raw_pkt, rrc, hdr_len, &info)) {
xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
cld_srv.stats.garbage++;
- return true;
+ return;
}
if (packet_is_dupe(&info)) {
/* silently drop dupes */
xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
- return true;
+ return;
}
err = validate_pkt_session(&info, &cli);
if (err) {
simple_sendresp(fd, &cli, &info, err);
xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
- return true;
+ return;
}
err = pkt_chk_sig(raw_pkt, rrc, &pkt);
if (err) {
simple_sendresp(fd, &cli, &info, err);
xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
- return true;
+ return;
}
if (!(cld_srv.cldb.is_master && cld_srv.cldb.up)) {
simple_sendmsg(fd, &cli, pkt.sid, pkt.user, 0xdeadbeef,
(xdrproc_t)xdr_void, NULL, CMO_NOT_MASTER);
xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
- return true;
+ return;
}
err = udp_rx(fd, &cli, &info, raw_pkt, rrc);
if (err) {
simple_sendresp(fd, &cli, &info, err);
xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
- return true;
+ return;
}
xdr_free((xdrproc_t)xdr_cld_pkt_hdr, (char *)&pkt);
- return true;
}
static void add_chkpt_timer(void)
{
- cld_timer_add(&cld_srv.timers, &cld_srv.chkpt_timer,
- time(NULL) + CLD_CHKPT_SEC);
+ struct timeval tv = { .tv_sec = CLD_CHKPT_SEC };
+
+ if (evtimer_add(&cld_srv.chkpt_timer, &tv) < 0)
+ HAIL_WARN(&srv_log, "chkpt timer add failed");
}
-static void cldb_checkpoint(struct cld_timer *timer)
+static void cldb_checkpoint(int fd, short events, void *userdata)
{
DB_ENV *dbenv = cld_srv.cldb.env;
int rc;
@@ -690,28 +691,28 @@ static int net_write_port(const char *port_file, const char *port_str)
static void net_close(void)
{
- struct pollfd *pfd;
- int i;
-
- if (!cld_srv.polls)
- return;
-
- for (i = 0; i < cld_srv.polls->len; i++) {
- pfd = &g_array_index(cld_srv.polls, struct pollfd, i);
- if (pfd->fd >= 0) {
- if (close(pfd->fd) < 0)
- HAIL_WARN(&srv_log, "%s(%d): %s",
- __func__, pfd->fd, strerror(errno));
- pfd->fd = -1;
+ struct server_socket *tmp, *iter;
+
+ list_for_each_entry_safe(tmp, iter, &cld_srv.sockets, sockets_node) {
+ if (tmp->fd >= 0) {
+ if (event_del(&tmp->ev) < 0)
+ HAIL_WARN(&srv_log, "Event delete(%d) failed",
+ tmp->fd);
+ if (close(tmp->fd) < 0)
+ HAIL_WARN(&srv_log, "Close(%d) failed: %s",
+ tmp->fd, strerror(errno));
+ tmp->fd = -1;
}
+
+ list_del(&tmp->sockets_node);
+ free(tmp);
}
}
static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
int addr_len, void *addr_ptr)
{
- struct server_poll sp;
- struct pollfd pfd;
+ struct server_socket *sock;
int fd, rc;
fd = socket(addr_fam, sock_type, sock_prot);
@@ -732,15 +733,25 @@ static int net_open_socket(int addr_fam, int sock_type, int sock_prot,
return -errno;
}
- sp.fd = fd;
- sp.cb = udp_srv_event;
- sp.userdata = NULL;
- g_array_append_val(cld_srv.poll_data, sp);
+ sock = calloc(1, sizeof(*sock));
+ if (!sock) {
+ close(fd);
+ return -ENOMEM;
+ }
+
+ sock->fd = fd;
+ INIT_LIST_HEAD(&sock->sockets_node);
+
+ event_set(&sock->ev, fd, EV_READ | EV_PERSIST,
+ udp_srv_event, sock);
+
+ if (event_add(&sock->ev, NULL) < 0) {
+ free(sock);
+ close(fd);
+ return -EIO;
+ }
- pfd.fd = fd;
- pfd.events = POLLIN;
- pfd.revents = 0;
- g_array_append_val(cld_srv.polls, pfd);
+ list_add_tail(&sock->sockets_node, &cld_srv.sockets);
return fd;
}
@@ -891,11 +902,13 @@ static void segv_signal(int signo)
static void term_signal(int signo)
{
server_running = false;
+ event_loopbreak();
}
static void stats_signal(int signo)
{
dump_stats = true;
+ event_loopbreak();
}
#define X(stat) \
@@ -975,73 +988,16 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
static int main_loop(void)
{
- time_t next_timeout;
-
- next_timeout = cld_timers_run(&cld_srv.timers);
-
while (server_running) {
- struct pollfd *pfd;
- int i, fired, rc;
-
cld_srv.stats.poll++;
-
- /* poll for fd activity, or next timer event */
- rc = poll(&g_array_index(cld_srv.polls, struct pollfd, 0),
- cld_srv.polls->len,
- next_timeout ? (next_timeout * 1000) : -1);
- if (rc < 0) {
- syslogerr("poll");
- if (errno != EINTR)
- break;
- }
+ event_dispatch();
gettimeofday(&current_time, NULL);
- /* determine which fd's fired; call their callbacks */
- fired = 0;
- for (i = 0; i < cld_srv.polls->len; i++) {
- struct server_poll *sp;
- bool runrunrun;
- short revents;
-
- /* ref pollfd struct */
- pfd = &g_array_index(cld_srv.polls, struct pollfd, i);
-
- /* if no events fired, move on to next */
- if (!pfd->revents)
- continue;
-
- fired++;
-
- revents = pfd->revents;
- pfd->revents = 0;
-
- /* ref 1:1 matching server_poll struct */
- sp = &g_array_index(cld_srv.poll_data,
- struct server_poll, i);
-
- cld_srv.stats.event++;
-
- /* call callback, shutting down server if requested */
- runrunrun = sp->cb(sp->fd, revents, sp->userdata);
- if (!runrunrun) {
- server_running = false;
- break;
- }
-
- /* if we reached poll(2) activity count, it is
- * pointless to continue looping
- */
- if (fired == rc)
- break;
- }
-
if (dump_stats) {
dump_stats = false;
stats_dump();
}
-
- next_timeout = cld_timers_run(&cld_srv.timers);
}
return 0;
@@ -1052,6 +1008,8 @@ int main (int argc, char *argv[])
error_t aprc;
int rc = 1;
+ INIT_LIST_HEAD(&cld_srv.sockets);
+
/* isspace() and strcasecmp() consistency requires this */
setlocale(LC_ALL, "C");
@@ -1075,6 +1033,8 @@ int main (int argc, char *argv[])
if (use_syslog)
openlog(PROGRAM_NAME, LOG_PID, LOG_LOCAL3);
+ cld_srv.evbase_main = event_init();
+
if (!(cld_srv.flags & SFL_FOREGROUND) && (daemon(1, !use_syslog) < 0)) {
syslogerr("daemon");
goto err_out;
@@ -1103,17 +1063,13 @@ int main (int argc, char *argv[])
ensure_root();
- cld_timer_init(&cld_srv.chkpt_timer, "db4-checkpoint",
- cldb_checkpoint, NULL);
+ evtimer_set(&cld_srv.chkpt_timer, cldb_checkpoint, NULL);
add_chkpt_timer();
rc = 1;
cld_srv.sessions = g_hash_table_new(sess_hash, sess_equal);
- cld_srv.poll_data = g_array_sized_new(FALSE, FALSE,
- sizeof(struct server_poll), 4);
- cld_srv.polls = g_array_sized_new(FALSE,FALSE,sizeof(struct pollfd), 4);
- if (!cld_srv.sessions || !cld_srv.poll_data || !cld_srv.polls)
+ if (!cld_srv.sessions)
goto err_out_pid;
if (sess_load(cld_srv.sessions) != 0)
@@ -1137,7 +1093,8 @@ int main (int argc, char *argv[])
HAIL_INFO(&srv_log, "shutting down");
if (strict_free)
- cld_timer_del(&cld_srv.timers, &cld_srv.chkpt_timer);
+ if (evtimer_del(&cld_srv.chkpt_timer) < 0)
+ HAIL_WARN(&srv_log, "chkpt timer del failed");
if (cld_srv.cldb.up)
cldb_down(&cld_srv.cldb);
@@ -1149,8 +1106,6 @@ err_out_pid:
err_out:
if (strict_free) {
net_close();
- g_array_free(cld_srv.polls, TRUE);
- g_array_free(cld_srv.poll_data, TRUE);
sessions_free();
g_hash_table_unref(cld_srv.sessions);
}
diff --git a/cld/session.c b/cld/session.c
index d76186b..9887aaa 100644
--- a/cld/session.c
+++ b/cld/session.c
@@ -43,8 +43,8 @@ struct session_outpkt {
void *done_data;
};
-static void session_retry(struct cld_timer *);
-static void session_timeout(struct cld_timer *);
+static void session_retry(int, short, void *);
+static void session_timeout(int, short, void *);
static int sess_load_db(GHashTable *ss, DB_TXN *txn);
static void op_unref(struct session_outpkt *op);
@@ -87,8 +87,8 @@ static struct session *session_new(void)
cld_rand64(&sess->next_seqid_out);
- cld_timer_init(&sess->timer, "session-timeout", session_timeout, sess);
- cld_timer_init(&sess->retry_timer, "session-retry", session_retry, sess);
+ evtimer_set(&sess->timer, session_timeout, sess);
+ evtimer_set(&sess->retry_timer, session_retry, sess);
return sess;
}
@@ -103,8 +103,10 @@ static void session_free(struct session *sess, bool hash_remove)
if (hash_remove)
g_hash_table_remove(cld_srv.sessions, sess->sid);
- cld_timer_del(&cld_srv.timers, &sess->timer);
- cld_timer_del(&cld_srv.timers, &sess->retry_timer);
+ if (evtimer_del(&sess->timer) < 0)
+ HAIL_ERR(&srv_log, "sess timer del failed");
+ if (evtimer_del(&sess->retry_timer) < 0)
+ HAIL_ERR(&srv_log, "sess retry timer del failed");
tmp = sess->out_q;
while (tmp) {
@@ -376,9 +378,9 @@ static void session_ping_done(struct session_outpkt *outpkt)
outpkt->sess->ping_open = false;
}
-static void session_timeout(struct cld_timer *timer)
+static void session_timeout(int fd, short events, void *userdata)
{
- struct session *sess = timer->userdata;
+ struct session *sess = userdata;
uint64_t sess_expire;
int rc;
DB_ENV *dbenv = cld_srv.cldb.env;
@@ -387,6 +389,8 @@ static void session_timeout(struct cld_timer *timer)
sess_expire = sess->last_contact + CLD_SESS_TIMEOUT;
if (!sess->dead && (sess_expire > now)) {
+ struct timeval tv;
+
if (!sess->ping_open &&
(sess_expire > (sess->last_contact + (CLD_SESS_TIMEOUT / 2) &&
(sess->sock_fd > 0)))) {
@@ -396,9 +400,12 @@ static void session_timeout(struct cld_timer *timer)
session_ping_done, NULL);
}
- cld_timer_add(&cld_srv.timers, &sess->timer,
- now + ((sess_expire - now) / 2) + 1);
- return; /* timer added; do not time out session */
+ tv.tv_sec = ((sess_expire - now) / 2) + 1;
+ tv.tv_usec = 0;
+ if (evtimer_add(&sess->timer, &tv) < 0)
+ HAIL_ERR(&srv_log, "timer add failed, sid " SIDFMT,
+ SIDARG(sess->sid));
+ return; /* timer added; do not time out session */
}
HAIL_INFO(&srv_log, "session %s, addr %s sid " SIDFMT,
@@ -554,25 +561,33 @@ static int sess_retry_output(struct session *sess, time_t *next_retry_out)
return rc;
}
-static void session_retry(struct cld_timer *timer)
+static void session_retry(int fd, short events, void *userdata)
{
- struct session *sess = timer->userdata;
+ struct session *sess = userdata;
time_t next_retry;
+ time_t now = time(NULL);
+ struct timeval tv;
if (!sess->out_q)
return;
sess_retry_output(sess, &next_retry);
- cld_timer_add(&cld_srv.timers, &sess->retry_timer, next_retry);
+ tv.tv_sec = next_retry - now;
+ tv.tv_usec = 0;
+
+ if (evtimer_add(&sess->retry_timer, &tv) < 0)
+ HAIL_ERR(&srv_log, "retry timer re-add failed");
}
static void session_outq(struct session *sess, GList *new_pkts)
{
/* if out_q empty, start retry timer */
- if (!sess->out_q)
- cld_timer_add(&cld_srv.timers, &sess->retry_timer,
- time(NULL) + CLD_RETRY_START);
+ if (!sess->out_q) {
+ struct timeval tv = { .tv_sec = CLD_RETRY_START };
+ if (evtimer_add(&sess->retry_timer, &tv) < 0)
+ HAIL_ERR(&srv_log, "retry timer start failed");
+ }
sess->out_q = g_list_concat(sess->out_q, new_pkts);
}
@@ -766,7 +781,8 @@ void msg_ack(struct session *sess, uint64_t seqid)
}
if (!sess->out_q)
- cld_timer_del(&cld_srv.timers, &sess->retry_timer);
+ if (evtimer_del(&sess->retry_timer) < 0)
+ HAIL_ERR(&srv_log, "sess retry timer del 2 failed");
}
void msg_new_sess(int sock_fd, const struct client *cli,
@@ -780,6 +796,7 @@ void msg_new_sess(int sock_fd, const struct client *cli,
int rc;
enum cle_err_codes resp_rc = CLE_OK;
struct cld_msg_generic_resp resp;
+ struct timeval tv;
sess = session_new();
if (!sess) {
@@ -832,8 +849,10 @@ void msg_new_sess(int sock_fd, const struct client *cli,
g_hash_table_insert(cld_srv.sessions, sess->sid, sess);
/* begin session timer */
- cld_timer_add(&cld_srv.timers, &sess->timer,
- time(NULL) + (CLD_SESS_TIMEOUT / 2));
+ tv.tv_sec = CLD_SESS_TIMEOUT / 2;
+ tv.tv_usec = 0;
+ if (evtimer_add(&sess->timer, &tv) < 0)
+ HAIL_ERR(&srv_log, "sess timer start failed");
/* send new-sess reply */
resp.code = CLE_OK;
@@ -933,6 +952,8 @@ static int sess_load_db(GHashTable *ss, DB_TXN *txn)
val.flags = DB_DBT_USERMEM;
while (1) {
+ struct timeval tv;
+
rc = cur->get(cur, &key, &val, DB_NEXT);
if (rc == DB_NOTFOUND)
break;
@@ -960,8 +981,12 @@ static int sess_load_db(GHashTable *ss, DB_TXN *txn)
g_hash_table_insert(ss, sess->sid, sess);
/* begin session timer */
- cld_timer_add(&cld_srv.timers, &sess->timer,
- time(NULL) + (CLD_SESS_TIMEOUT / 2));
+ tv.tv_sec = CLD_SESS_TIMEOUT / 2;
+ tv.tv_usec = 0;
+ if (evtimer_add(&sess->timer, &tv) < 0) {
+ HAIL_ERR(&srv_log, "sess timer loop start failed");
+ break;
+ }
}
cur->close(cur);
next reply other threads:[~2010-12-31 10:56 UTC|newest]
Thread overview: 6+ messages / expand[flat|nested] mbox.gz Atom feed top
2010-12-31 10:56 Jeff Garzik [this message]
2010-12-31 10:57 ` [PATCH 2/3] CLD: switch network proto from UDP to TCP Jeff Garzik
2010-12-31 10:58 ` [PATCH 3/3] CLD: enable replication on server and client Jeff Garzik
2011-01-02 23:32 ` [PATCH 2/3] CLD: switch network proto from UDP to TCP Pete Zaitcev
2011-01-03 10:43 ` Jim Meyering
2011-01-03 18:00 ` Jeff Garzik
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20101231105634.GA4210@havoc.gtf.org \
--to=jeff@garzik.org \
--cc=hail-devel@vger.kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.