From: Jeff Garzik <jeff@garzik.org>
To: hail-devel@vger.kernel.org
Subject: [PATCH] tabled: use libhail's anet
Date: Thu, 23 Sep 2010 20:34:48 -0400 [thread overview]
Message-ID: <20100924003448.GA28284@havoc.gtf.org> (raw)
In-Reply-To: <20100924003237.GA28163@havoc.gtf.org>
Updated for move to libhail. Not committed on main branch until some
more time passes.
server/bucket.c | 8 -
server/object.c | 56 +++++-----
server/server.c | 294 ++++++++++++++------------------------------------------
server/status.c | 3
server/tabled.h | 45 ++------
5 files changed, 123 insertions(+), 283 deletions(-)
diff --git a/server/bucket.c b/server/bucket.c
index eb03e03..a81eca1 100644
--- a/server/bucket.c
+++ b/server/bucket.c
@@ -566,13 +566,13 @@ bool bucket_add(struct client *cli, const char *user, const char *bucket)
bucket) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
@@ -718,13 +718,13 @@ bool bucket_del(struct client *cli, const char *user, const char *bucket)
hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
diff --git a/server/object.c b/server/object.c
index 3801e94..64cc8b6 100644
--- a/server/object.c
+++ b/server/object.c
@@ -227,13 +227,13 @@ bool object_del(struct client *cli, const char *user,
hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
@@ -525,13 +525,13 @@ static bool object_put_end(struct client *cli)
return cli_err(cli, InternalError);
}
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_rb:
rc = txn->abort(txn);
@@ -618,7 +618,8 @@ static int object_put_buf(struct client *cli, struct open_chunk *ochunk,
return 0;
}
-bool cli_evt_http_data_in(struct client *cli, unsigned int events)
+bool cli_evt_http_data_in(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
ssize_t avail;
struct open_chunk *ochunk;
@@ -812,8 +813,8 @@ static bool object_put_body(struct client *cli, const char *user,
cli_out_end(cli);
return cli_err(cli, InternalError);
}
- cli_writeq(cli, cont, strlen(cont), cli_cb_free, cont);
- cli_write_start(cli);
+ atcp_writeq(&cli->wst, cont, strlen(cont), atcp_cb_free, cont);
+ atcp_write_start(&cli->wst);
}
avail = MIN(cli_req_avail(cli), content_len);
@@ -940,13 +941,13 @@ static bool object_put_acls(struct client *cli, const char *user,
return cli_err(cli, InternalError);
}
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_rb:
rc = txn->abort(txn);
@@ -990,10 +991,10 @@ void cli_in_end(struct client *cli)
cli->in_len = 0;
}
-static bool object_get_more(struct client *cli, void *cb_data, bool done);
+static bool object_get_more(struct atcp_wr_state *wst, void *cb_data, bool done);
/*
- * Return true iff cli_writeq was called. This is compatible with the
+ * Return true iff atcp_writeq was called. This is compatible with the
* convention for cli continuation callbacks, so object_get_more can call us.
*/
static bool object_get_poke(struct client *cli)
@@ -1026,7 +1027,7 @@ static bool object_get_poke(struct client *cli)
if (bytes == 0) {
if (!cli->in_len) {
cli_in_end(cli);
- cli_write_start(cli);
+ atcp_write_start(&cli->wst);
}
free(buf);
return false;
@@ -1034,15 +1035,15 @@ static bool object_get_poke(struct client *cli)
cli->in_len -= bytes;
if (!cli->in_len) {
- if (cli_writeq(cli, buf, bytes, cli_cb_free, buf))
+ if (atcp_writeq(&cli->wst, buf, bytes, atcp_cb_free, buf))
goto err_out;
cli_in_end(cli);
- cli_write_start(cli);
+ atcp_write_start(&cli->wst);
} else {
- if (cli_writeq(cli, buf, bytes, object_get_more, buf))
+ if (atcp_writeq(&cli->wst, buf, bytes, object_get_more, buf))
goto err_out;
- if (cli_wqueued(cli) >= CLI_DATA_BUF_SZ)
- cli_write_start(cli);
+ if (atcp_wqueued(&cli->wst) >= CLI_DATA_BUF_SZ)
+ atcp_write_start(&cli->wst);
}
return true;
@@ -1053,8 +1054,9 @@ err_out:
}
/* callback from the client side: a queued write is being disposed */
-static bool object_get_more(struct client *cli, void *cb_data, bool done)
+static bool object_get_more(struct atcp_wr_state *wst, void *cb_data, bool done)
{
+ struct client *cli = wst->priv;
/* free now-written buffer */
free(cb_data);
@@ -1071,8 +1073,10 @@ static bool object_get_more(struct client *cli, void *cb_data, bool done)
/* callback from the chunkd side: some data is available */
static void object_get_event(struct open_chunk *ochunk)
{
- object_get_poke(ochunk->cli);
- cli_write_run_compl();
+ struct client *cli = ochunk->cli;
+
+ object_get_poke(cli);
+ atcp_write_run_compl(&cli->wst);
}
static int object_node_count_up(struct db_obj_ent *obj)
@@ -1327,7 +1331,7 @@ static bool object_get_body(struct client *cli, const char *user,
if (!want_body) {
cli_in_end(cli);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
@@ -1347,7 +1351,7 @@ static bool object_get_body(struct client *cli, const char *user,
if (!cli->in_len)
cli_in_end(cli);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
goto err_out_in_end;
@@ -1365,21 +1369,21 @@ static bool object_get_body(struct client *cli, const char *user,
goto err_out_in_end;
memcpy(tmp, buf, bytes);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
free(tmp);
return true;
}
- if (cli_writeq(cli, tmp, bytes,
- cli->in_len ? object_get_more : cli_cb_free, tmp))
+ if (atcp_writeq(&cli->wst, tmp, bytes,
+ cli->in_len ? object_get_more : atcp_cb_free, tmp))
goto err_out_in_end;
start_write:
free(obj);
g_string_free(extra_hdr, TRUE);
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_in_end:
cli_in_end(cli);
diff --git a/server/server.c b/server/server.c
index 3398026..a4c9928 100644
--- a/server/server.c
+++ b/server/server.c
@@ -56,8 +56,6 @@
const char *argp_program_version = PACKAGE_VERSION;
enum {
- CLI_MAX_WR_IOV = 32, /* max iov per writev(2) */
-
SFL_FOREGROUND = (1 << 0), /* run in foreground */
};
@@ -488,65 +486,25 @@ bool stat_status(struct client *cli, GList *content)
if (asprintf(&str,
"<p>Stats: "
- "poll %lu event %lu tcp_accept %lu opt_write %lu</p>\r\n"
- "<p>Debug: max_write_buf %lu</p>\r\n",
- tabled_srv.stats.poll,
- tabled_srv.stats.event,
- tabled_srv.stats.tcp_accept,
- tabled_srv.stats.opt_write,
- tabled_srv.stats.max_write_buf) < 0)
+ "poll %llu event %llu tcp_accept %llu opt_write %llu</p>\r\n"
+ "<p>Debug: max_write_buf %llu</p>\r\n",
+ (unsigned long long) tabled_srv.stats.poll,
+ (unsigned long long) tabled_srv.stats.event,
+ (unsigned long long) tabled_srv.stats.tcp_accept,
+ (unsigned long long) tabled_srv.stats.opt_write,
+ (unsigned long long) tabled_srv.stats.max_write_buf) < 0)
return false;
content = g_list_append(content, str);
return true;
}
-static void cli_write_complete(struct client *cli, struct client_write *tmp)
-{
- list_del(&tmp->node);
- list_add_tail(&tmp->node, &tabled_srv.write_compl_q);
-}
-
-static bool cli_write_free(struct client_write *tmp, bool done)
-{
- struct client *cli = tmp->cb_cli;
- bool rcb = false;
-
- cli->write_cnt -= tmp->length;
- list_del(&tmp->node);
- if (tmp->cb)
- rcb = tmp->cb(cli, tmp->cb_data, done);
- free(tmp);
-
- return rcb;
-}
-
-static void cli_write_free_all(struct client *cli)
-{
- struct client_write *wr, *tmp;
-
- cli_write_run_compl();
- list_for_each_entry_safe(wr, tmp, &cli->write_q, node) {
- cli_write_free(wr, false);
- }
-}
-
-bool cli_write_run_compl(void)
-{
- struct client_write *wr;
- bool do_loop;
-
- do_loop = false;
- while (!list_empty(&tabled_srv.write_compl_q)) {
- wr = list_entry(tabled_srv.write_compl_q.next,
- struct client_write, node);
- do_loop |= cli_write_free(wr, true);
- }
- return do_loop;
-}
-
static void cli_free(struct client *cli)
{
- cli_write_free_all(cli);
+ if (cli->wst.write_cnt_max > tabled_srv.stats.max_write_buf)
+ tabled_srv.stats.max_write_buf = cli->wst.write_cnt_max;
+ tabled_srv.stats.opt_write += cli->wst.opt_write;
+
+ atcp_wr_exit(&cli->wst);
cli_out_end(cli);
cli_in_end(cli);
@@ -561,27 +519,28 @@ static void cli_free(struct client *cli)
hreq_free(&cli->req);
- if (cli->write_cnt_max > tabled_srv.stats.max_write_buf)
- tabled_srv.stats.max_write_buf = cli->write_cnt_max;
-
if (debugging)
applog(LOG_INFO, "client %s ended", cli->addr_host);
free(cli);
}
-static bool cli_evt_dispose(struct client *cli, unsigned int events)
+static bool cli_evt_dispose(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
/* if write queue is not empty, we should continue to get
* poll callbacks here until it is
*/
- if (list_empty(&cli->write_q))
+ if (atcp_wq_empty(&cli->wst)) {
cli_free(cli);
+ *invalidate_cli = true;
+ }
return false;
}
-static bool cli_evt_recycle(struct client *cli, unsigned int events)
+static bool cli_evt_recycle(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
unsigned int slop;
@@ -608,134 +567,6 @@ static bool cli_evt_recycle(struct client *cli, unsigned int events)
return true;
}
-static void cli_writable(struct client *cli)
-{
- int n_iov;
- struct client_write *tmp;
- ssize_t rc;
- struct iovec iov[CLI_MAX_WR_IOV];
-
- /* accumulate pending writes into iovec */
- n_iov = 0;
- list_for_each_entry(tmp, &cli->write_q, node) {
- if (n_iov == CLI_MAX_WR_IOV)
- break;
- /* bleh, struct iovec should declare iov_base const */
- iov[n_iov].iov_base = (void *) tmp->buf;
- iov[n_iov].iov_len = tmp->togo;
- n_iov++;
- }
-
- /* execute non-blocking write */
-do_write:
- rc = writev(cli->fd, iov, n_iov);
- if (rc < 0) {
- if (errno == EINTR)
- goto do_write;
- if (errno != EAGAIN)
- goto err_out;
- return;
- }
-
- /* iterate through write queue, issuing completions based on
- * amount of data written
- */
- while (rc > 0) {
- int sz;
-
- /* get pointer to first record on list */
- tmp = list_entry(cli->write_q.next, struct client_write, node);
-
- /* mark data consumed by decreasing tmp->len */
- sz = (tmp->togo < rc) ? tmp->togo : rc;
- tmp->togo -= sz;
- tmp->buf += sz;
- rc -= sz;
-
- /* if tmp->len reaches zero, write is complete,
- * so schedule it for clean up (cannot call callback
- * right away or an endless recursion will result)
- */
- if (tmp->togo == 0)
- cli_write_complete(cli, tmp);
- }
-
- /* if we emptied the queue, clear write notification */
- if (list_empty(&cli->write_q)) {
- cli->writing = false;
- if (event_del(&cli->write_ev) < 0) {
- applog(LOG_WARNING, "cli_writable event_del");
- goto err_out;
- }
- }
-
- return;
-
-err_out:
- cli->state = evt_dispose;
- cli_write_free_all(cli);
-}
-
-bool cli_write_start(struct client *cli)
-{
- if (list_empty(&cli->write_q))
- return true; /* loop, not poll */
-
- /* if write-poll already active, nothing further to do */
- if (cli->writing)
- return false; /* poll wait */
-
- /* attempt optimistic write, in hopes of avoiding poll,
- * or at least refill the write buffers so as to not
- * get -immediately- called again by the kernel
- */
- cli_writable(cli);
- if (list_empty(&cli->write_q)) {
- tabled_srv.stats.opt_write++;
- return true; /* loop, not poll */
- }
-
- if (event_add(&cli->write_ev, NULL) < 0) {
- applog(LOG_WARNING, "cli_write event_add");
- return true; /* loop, not poll */
- }
-
- cli->writing = true;
-
- return false; /* poll wait */
-}
-
-int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
- cli_write_func cb, void *cb_data)
-{
- struct client_write *wr;
-
- if (!buf || !buflen)
- return -EINVAL;
-
- wr = calloc(1, sizeof(struct client_write));
- if (!wr)
- return -ENOMEM;
-
- wr->buf = buf;
- wr->togo = buflen;
- wr->length = buflen;
- wr->cb = cb;
- wr->cb_data = cb_data;
- wr->cb_cli = cli;
- list_add_tail(&wr->node, &cli->write_q);
- cli->write_cnt += buflen;
- if (cli->write_cnt > cli->write_cnt_max)
- cli->write_cnt_max = cli->write_cnt;
-
- return 0;
-}
-
-size_t cli_wqueued(struct client *cli)
-{
- return cli->write_cnt;
-}
-
/*
* Return:
* 0: progress was NOT made (EOF)
@@ -771,12 +602,6 @@ do_read:
return rc != 0;
}
-bool cli_cb_free(struct client *cli, void *cb_data, bool done)
-{
- free(cb_data);
- return false;
-}
-
static int cli_write_list(struct client *cli, GList *list)
{
int rc = 0;
@@ -784,8 +609,8 @@ static int cli_write_list(struct client *cli, GList *list)
tmp = list;
while (tmp) {
- rc = cli_writeq(cli, tmp->data, strlen(tmp->data),
- cli_cb_free, tmp->data);
+ rc = atcp_writeq(&cli->wst, tmp->data, strlen(tmp->data),
+ atcp_cb_free, tmp->data);
if (rc)
goto out;
@@ -870,14 +695,14 @@ bool cli_err_write(struct client *cli, char *hdr, char *content)
cli->state = evt_dispose;
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc)
return true;
- rc = cli_writeq(cli, content, strlen(content), cli_cb_free, content);
+ rc = atcp_writeq(&cli->wst, content, strlen(content), atcp_cb_free, content);
if (rc)
return true;
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
}
static bool cli_resp(struct client *cli, int http_status,
@@ -911,7 +736,7 @@ static bool cli_resp(struct client *cli, int http_status,
else
cli->state = evt_recycle;
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
cli->state = evt_dispose;
@@ -924,7 +749,7 @@ static bool cli_resp(struct client *cli, int http_status,
return true;
}
- rcb = cli_write_start(cli);
+ rcb = atcp_write_start(&cli->wst);
if (cli->state == evt_recycle)
return true;
@@ -942,7 +767,8 @@ bool cli_resp_html(struct client *cli, int http_status, GList *content)
return cli_resp(cli, http_status, "text/html", content);
}
-static bool cli_evt_http_req(struct client *cli, unsigned int events)
+static bool cli_evt_http_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
struct http_req *req = &cli->req;
char *host, *auth, *content_len_str;
@@ -1195,7 +1021,8 @@ err_out:
return true;
}
-static bool cli_evt_parse_hdr(struct client *cli, unsigned int events)
+static bool cli_evt_parse_hdr(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
char *buf, *buf_eol;
bool eoh = false;
@@ -1252,7 +1079,8 @@ static bool cli_evt_parse_hdr(struct client *cli, unsigned int events)
return true;
}
-static bool cli_evt_read_hdr(struct client *cli, unsigned int events)
+static bool cli_evt_read_hdr(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
int rc = cli_read(cli);
if (rc <= 0) {
@@ -1268,7 +1096,8 @@ static bool cli_evt_read_hdr(struct client *cli, unsigned int events)
return true;
}
-static bool cli_evt_parse_req(struct client *cli, unsigned int events)
+static bool cli_evt_parse_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
char *sp1, *sp2, *buf;
enum errcode err_resp;
@@ -1336,7 +1165,8 @@ err_out:
return cli_err(cli, err_resp);
}
-static bool cli_evt_read_req(struct client *cli, unsigned int events)
+static bool cli_evt_read_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
int rc = cli_read(cli);
if (rc <= 0) {
@@ -1374,6 +1204,32 @@ static cli_evt_func evt_funcs_status[] = {
[evt_recycle] = cli_evt_recycle,
};
+static int cli_le_wset(void *ev_info, int fd, atcp_ev_func cb, void *cb_data)
+{
+ struct event *ev = ev_info;
+
+ event_set(ev, fd, EV_WRITE | EV_PERSIST, cb, cb_data);
+ return 0;
+}
+
+static int cli_le_add(void *ev_info, const struct timeval *tv)
+{
+ struct event *ev = ev_info;
+ return event_add(ev, tv);
+}
+
+static int cli_le_del(void *ev_info)
+{
+ struct event *ev = ev_info;
+ return event_del(ev);
+}
+
+static const struct atcp_wr_ops libevent_wr_ops = {
+ .ev_wset = cli_le_wset,
+ .ev_add = cli_le_add,
+ .ev_del = cli_le_del,
+};
+
static struct client *cli_alloc(bool is_status)
{
struct client *cli;
@@ -1385,9 +1241,10 @@ static struct client *cli_alloc(bool is_status)
return NULL;
}
+ atcp_wr_init(&cli->wst, &libevent_wr_ops, &cli->write_ev, cli);
+
cli->state = evt_read_req;
cli->evt_table = is_status? evt_funcs_status: evt_funcs_server;
- INIT_LIST_HEAD(&cli->write_q);
INIT_LIST_HEAD(&cli->out_ch);
cli->req_ptr = cli->req_buf;
memset(&cli->req, 0, sizeof(cli->req) - sizeof(cli->req.hdr));
@@ -1395,22 +1252,20 @@ static struct client *cli_alloc(bool is_status)
return cli;
}
-static void tcp_cli_wr_event(int fd, short events, void *userdata)
-{
- struct client *cli = userdata;
-
- cli_writable(cli);
- cli_write_run_compl();
-}
-
static void tcp_cli_event(int fd, short events, void *userdata)
{
struct client *cli = userdata;
bool loop;
+ bool invalidate_cli = false;
do {
- loop = cli->evt_table[cli->state](cli, events);
- loop |= cli_write_run_compl();
+ loop = cli->evt_table[cli->state](cli, events, &invalidate_cli);
+ if (invalidate_cli) {
+ cli = NULL;
+ break;
+ }
+
+ loop |= atcp_write_run_compl(&cli->wst);
} while (loop);
}
@@ -1438,11 +1293,11 @@ static void tcp_srv_event(int fd, short events, void *userdata)
goto err_out;
}
+ atcp_wr_set_fd(&cli->wst, cli->fd);
+
tabled_srv.stats.tcp_accept++;
event_set(&cli->ev, cli->fd, EV_READ | EV_PERSIST, tcp_cli_event, cli);
- event_set(&cli->write_ev, cli->fd, EV_WRITE | EV_PERSIST,
- tcp_cli_wr_event, cli);
/* mark non-blocking, for upcoming poll use */
if (fsetflags("tcp client", cli->fd, O_NONBLOCK) < 0)
@@ -2205,7 +2060,6 @@ int main (int argc, char *argv[])
struct event_base *event_base_rep;
INIT_LIST_HEAD(&tabled_srv.all_stor);
- INIT_LIST_HEAD(&tabled_srv.write_compl_q);
tabled_srv.state_tdb = ST_TDB_INIT;
tabled_srv.rep_next_id = DBID_MIN;
diff --git a/server/status.c b/server/status.c
index e9fbb38..9af60f7 100644
--- a/server/status.c
+++ b/server/status.c
@@ -150,7 +150,8 @@ out_err:
return cli_err(cli, InternalError);
}
-bool stat_evt_http_req(struct client *cli, unsigned int events)
+bool stat_evt_http_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
struct http_req *req = &cli->req;
char *method = req->method;
diff --git a/server/tabled.h b/server/tabled.h
index d4d2048..4d3a2d9 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -31,6 +31,7 @@
#include <elist.h>
#include <tdb.h>
#include <hail_log.h>
+#include <anet.h>
#ifndef ARRAY_SIZE
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
@@ -102,20 +103,8 @@ struct storage_node {
int ref; /* number of open_chunk or other */
};
-typedef bool (*cli_evt_func)(struct client *, unsigned int);
-typedef bool (*cli_write_func)(struct client *, void *, bool);
-
-struct client_write {
- const void *buf; /* write buffer pointer */
- int togo; /* write buffer remainder */
-
- int length; /* length for accounting */
- cli_write_func cb; /* callback */
- void *cb_data; /* data passed to cb */
- struct client *cb_cli; /* cli passed to cb */
-
- struct list_head node;
-};
+typedef bool (*cli_evt_func)(struct client *, unsigned int,
+ bool *invalidate_cli);
/* an open chunkd client */
struct open_chunk {
@@ -165,11 +154,8 @@ struct client {
struct event ev;
struct event write_ev;
- struct list_head write_q; /* list of async writes */
- size_t write_cnt; /* water level */
- bool writing;
+ struct atcp_wr_state wst;
/* some debugging stats */
- size_t write_cnt_max;
unsigned int req_used; /* amount of req_buf in use */
char *req_ptr; /* start of unexamined data */
@@ -216,12 +202,12 @@ enum st_net {
};
struct server_stats {
- unsigned long poll; /* number polls */
- unsigned long event; /* events dispatched */
- unsigned long tcp_accept; /* TCP accepted cxns */
- unsigned long opt_write; /* optimistic writes */
+ uint64_t poll; /* number polls */
+ uint64_t event; /* events dispatched */
+ uint64_t tcp_accept; /* TCP accepted cxns */
+ uint64_t opt_write; /* optimistic writes */
- unsigned long max_write_buf;
+ uint64_t max_write_buf;
};
#define DBID_NONE 0
@@ -249,7 +235,6 @@ struct server {
struct event_base *evbase_main;
int ev_pipe[2];
struct event pevt;
- struct list_head write_compl_q; /* list of done writes */
bool mc_delay;
struct event mc_timer;
@@ -361,7 +346,8 @@ extern bool object_put(struct client *cli, const char *user, const char *bucket,
const char *key, long content_len, bool expect_cont);
extern bool object_get(struct client *cli, const char *user, const char *bucket,
const char *key, bool want_body);
-extern bool cli_evt_http_data_in(struct client *cli, unsigned int events);
+extern bool cli_evt_http_data_in(struct client *cli, unsigned int events,
+ bool *invalidate_cli);
extern void cli_out_end(struct client *cli);
extern void cli_in_end(struct client *cli);
@@ -398,12 +384,6 @@ extern bool cli_err(struct client *cli, enum errcode code);
extern bool cli_err_write(struct client *cli, char *hdr, char *content);
extern bool cli_resp_xml(struct client *cli, int http_status, GList *content);
extern bool cli_resp_html(struct client *cli, int http_status, GList *content);
-extern int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
- cli_write_func cb, void *cb_data);
-extern size_t cli_wqueued(struct client *cli);
-extern bool cli_cb_free(struct client *cli, void *cb_data, bool done);
-extern bool cli_write_start(struct client *cli);
-extern bool cli_write_run_compl(void);
extern int cli_req_avail(struct client *cli);
extern void applog(int prio, const char *fmt, ...);
extern void cld_update_cb(void);
@@ -415,7 +395,8 @@ extern struct db_remote *tdb_find_remote_byname(const char *name);
extern struct db_remote *tdb_find_remote_byid(int id);
/* status.c */
-extern bool stat_evt_http_req(struct client *cli, unsigned int events);
+extern bool stat_evt_http_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli);
/* config.c */
extern void read_config(void);
next prev parent reply other threads:[~2010-09-24 0:34 UTC|newest]
Thread overview: 4+ messages / expand[flat|nested] mbox.gz Atom feed top
2010-09-24 0:32 [PATCH] libhail: add async TCP network writing API, atcp_wr* Jeff Garzik
2010-09-24 0:34 ` Jeff Garzik [this message]
2010-09-24 0:35 ` [PATCH] itd: use libhail's anet Jeff Garzik
2010-09-24 19:13 ` Jeff Garzik
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20100924003448.GA28284@havoc.gtf.org \
--to=jeff@garzik.org \
--cc=hail-devel@vger.kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.