* [tabled patch] abstract out TCP-write code
@ 2010-09-23 0:09 Jeff Garzik
2010-09-23 0:28 ` Pete Zaitcev
2010-09-23 21:09 ` [tabled patch v2] " Jeff Garzik
0 siblings, 2 replies; 11+ messages in thread
From: Jeff Garzik @ 2010-09-23 0:09 UTC (permalink / raw)
To: hail-devel
This is step #1: abstract tabled's TCP-write code into server/atcp.c. Other steps in the process:
2) update server/atcp.c and itd/*.c in tandem, until they have matching
TCP-write code. Should be straightforward, as both are based on the
same codebase (tabled).
3) move atcp to libhail
4) remove TCP-write code from tabled, itd
5) update atcp to support SSL, sendfile
6) update chunkd to support atcp (req. step #5)
All this code bears the same lineage, so it shouldn't be too difficult.
Also note, this is a first draft with embedded libevent dependencies. I
agree w/ zaitcev that the goal should be to eliminate these. atcp is
wonderfully generic at present; not even a GLib dependency, IIRC.
server/Makefile.am | 1
server/atcp.c | 243 +++++++++++++++++++++++++++++++++++++++++++++++++++++
server/atcp.h | 90 +++++++++++++++++++
server/bucket.c | 8 -
server/object.c | 56 ++++++------
server/server.c | 237 +++++----------------------------------------------
server/tabled.h | 37 +-------
7 files changed, 400 insertions(+), 272 deletions(-)
diff --git a/server/Makefile.am b/server/Makefile.am
index 5b53a0a..5e0abd5 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -4,6 +4,7 @@ INCLUDES = -I$(top_srcdir)/include @GLIB_CFLAGS@ @HAIL_CFLAGS@
sbin_PROGRAMS = tabled tdbadm
tabled_SOURCES = tabled.h \
+ atcp.c atcp.h \
bucket.c cldu.c config.c metarep.c object.c replica.c \
server.c status.c storage.c storparse.c util.c
tabled_LDADD = ../lib/libtdb.a \
diff --git a/server/atcp.c b/server/atcp.c
new file mode 100644
index 0000000..dac5b91
--- /dev/null
+++ b/server/atcp.c
@@ -0,0 +1,243 @@
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define _GNU_SOURCE
+#include "tabled-config.h"
+
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <sys/uio.h>
+#include "atcp.h"
+
+bool atcp_cb_free(void *cb_data1, void *cb_data2, bool done)
+{
+ /* in typical usage, cb_data1 is the owner of cb_data2,
+ and has a longer lifetime. Therefore, by convention,
+ cb_data2 is the buffer to be released.
+ */
+ free(cb_data2);
+ return false;
+}
+
+static void atcp_write_complete(struct atcp_write *tmp)
+{
+ struct atcp_wr_state *wst = tmp->wst;
+
+ list_del(&tmp->node);
+ list_add_tail(&tmp->node, &wst->write_compl_q);
+}
+
+static bool atcp_write_free(struct atcp_write *tmp, bool done)
+{
+ struct atcp_wr_state *wst = tmp->wst;
+ bool rcb = false;
+
+ wst->write_cnt -= tmp->length;
+ list_del_init(&tmp->node);
+ if (tmp->cb)
+ rcb = tmp->cb(tmp->cb_data1, tmp->cb_data2, done);
+ free(tmp);
+
+ return rcb;
+}
+
+bool atcp_write_run_compl(struct atcp_wr_state *wst)
+{
+ struct atcp_write *wr;
+ bool do_loop;
+
+ do_loop = false;
+ while (!list_empty(&wst->write_compl_q)) {
+ wr = list_entry(wst->write_compl_q.next,
+ struct atcp_write, node);
+ do_loop |= atcp_write_free(wr, true);
+ }
+ return do_loop;
+}
+
+void atcp_write_free_all(struct atcp_wr_state *wst)
+{
+ struct atcp_write *wr, *tmp;
+
+ atcp_write_run_compl(wst);
+ list_for_each_entry_safe(wr, tmp, &wst->write_q, node) {
+ atcp_write_free(wr, false);
+ }
+}
+
+size_t atcp_wqueued(struct atcp_wr_state *wst)
+{
+ return wst->write_cnt;
+}
+
+static bool atcp_writable(struct atcp_wr_state *wst)
+{
+ int n_iov;
+ struct atcp_write *tmp;
+ ssize_t rc;
+ struct iovec iov[ATCP_MAX_WR_IOV];
+
+ /* accumulate pending writes into iovec */
+ n_iov = 0;
+ list_for_each_entry(tmp, &wst->write_q, node) {
+ if (n_iov == ATCP_MAX_WR_IOV)
+ break;
+ /* bleh, struct iovec should declare iov_base const */
+ iov[n_iov].iov_base = (void *) tmp->buf;
+ iov[n_iov].iov_len = tmp->togo;
+ n_iov++;
+ }
+
+ /* execute non-blocking write */
+do_write:
+ rc = writev(wst->fd, iov, n_iov);
+ if (rc < 0) {
+ if (errno == EINTR)
+ goto do_write;
+ if (errno != EAGAIN)
+ goto err_out;
+ return true;
+ }
+
+ /* iterate through write queue, issuing completions based on
+ * amount of data written
+ */
+ while (rc > 0) {
+ int sz;
+
+ /* get pointer to first record on list */
+ tmp = list_entry(wst->write_q.next, struct atcp_write, node);
+
+ /* mark data consumed by decreasing tmp->len */
+ sz = (tmp->togo < rc) ? tmp->togo : rc;
+ tmp->togo -= sz;
+ tmp->buf += sz;
+ rc -= sz;
+
+ /* if tmp->len reaches zero, write is complete,
+ * so schedule it for clean up (cannot call callback
+ * right away or an endless recursion will result)
+ */
+ if (tmp->togo == 0)
+ atcp_write_complete(tmp);
+ }
+
+ /* if we emptied the queue, clear write notification */
+ if (list_empty(&wst->write_q)) {
+ wst->writing = false;
+ if (event_del(&wst->write_ev) < 0)
+ goto err_out;
+ }
+
+ return true;
+
+err_out:
+ atcp_write_free_all(wst);
+ return false;
+}
+
+static void atcp_wr_event(int fd, short events, void *userdata)
+{
+ struct atcp_wr_state *wst = userdata;
+
+ atcp_writable(wst);
+ atcp_write_run_compl(wst);
+}
+
+void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd)
+{
+ wst->fd = fd;
+
+ event_set(&wst->write_ev, fd, EV_WRITE | EV_PERSIST,
+ atcp_wr_event, wst);
+}
+
+bool atcp_write_start(struct atcp_wr_state *wst)
+{
+ if (list_empty(&wst->write_q))
+ return true; /* loop, not poll */
+
+ /* if write-poll already active, nothing further to do */
+ if (wst->writing)
+ return false; /* poll wait */
+
+ /* attempt optimistic write, in hopes of avoiding poll,
+ * or at least refill the write buffers so as to not
+ * get -immediately- called again by the kernel
+ */
+ atcp_writable(wst);
+ if (list_empty(&wst->write_q)) {
+ wst->opt_write++;
+ return true; /* loop, not poll */
+ }
+
+ if (event_add(&wst->write_ev, NULL) < 0)
+ return true; /* loop, not poll */
+
+ wst->writing = true;
+
+ return false; /* poll wait */
+}
+
+int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen,
+ atcp_write_func cb, void *cb_data1, void *cb_data2)
+{
+ struct atcp_write *wr;
+
+ if (!buf || !buflen)
+ return -EINVAL;
+
+ wr = calloc(1, sizeof(struct atcp_write));
+ if (!wr)
+ return -ENOMEM;
+
+ wr->buf = buf;
+ wr->togo = buflen;
+ wr->length = buflen;
+ wr->cb = cb;
+ wr->cb_data1 = cb_data1;
+ wr->cb_data2 = cb_data2;
+ wr->wst = wst;
+ list_add_tail(&wr->node, &wst->write_q);
+ wst->write_cnt += buflen;
+ if (wst->write_cnt > wst->write_cnt_max)
+ wst->write_cnt_max = wst->write_cnt;
+
+ return 0;
+}
+
+void atcp_wr_exit(struct atcp_wr_state *wst)
+{
+ if (!wst)
+ return;
+
+ atcp_write_free_all(wst);
+}
+
+void atcp_wr_init(struct atcp_wr_state *wst)
+{
+ memset(wst, 0, sizeof(*wst));
+
+ INIT_LIST_HEAD(&wst->write_q);
+ INIT_LIST_HEAD(&wst->write_compl_q);
+
+ wst->fd = -1;
+}
+
diff --git a/server/atcp.h b/server/atcp.h
new file mode 100644
index 0000000..996a59e
--- /dev/null
+++ b/server/atcp.h
@@ -0,0 +1,90 @@
+#ifndef __ATCP_H__
+#define __ATCP_H__
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <event.h>
+#include <elist.h>
+
+enum {
+ ATCP_MAX_WR_IOV = 32, /* max iov per writev(2) */
+};
+
+struct atcp_wr_state {
+ int fd; /* our socket */
+
+ bool writing; /* actively trying to write? */
+
+ size_t write_cnt; /* water level */
+ size_t write_cnt_max;
+
+ struct list_head write_q; /* list of async writes */
+ struct list_head write_compl_q; /* list of done writes */
+
+ struct event write_ev;
+
+ /* various statistics */
+ uint64_t opt_write; /* optimistic writes */
+};
+
+typedef bool (*atcp_write_func)(void *, void *, bool);
+
+struct atcp_write {
+ const void *buf; /* write buffer pointer */
+ int togo; /* write buffer remainder */
+
+ int length; /* length for accounting */
+ atcp_write_func cb; /* callback */
+ void *cb_data1; /* first data passed to cb */
+ void *cb_data2; /* second data passed to cb */
+
+ struct atcp_wr_state *wst; /* our parent */
+
+ struct list_head node; /* write_[compl_]q list node */
+};
+
+/* setup and teardown atcp write state */
+extern void atcp_wr_exit(struct atcp_wr_state *wst);
+extern void atcp_wr_init(struct atcp_wr_state *wst);
+
+/* generic write callback, that call free(cb_data2) */
+extern bool atcp_cb_free(void *cb_data1, void *cb_data2, bool done);
+
+/* clear all write queues immediately, even if not complete */
+extern void atcp_write_free_all(struct atcp_wr_state *wst);
+
+/* complete all writes found on completion queue */
+extern bool atcp_write_run_compl(struct atcp_wr_state *wst);
+
+/* total number of octets queued at this moment */
+extern size_t atcp_wqueued(struct atcp_wr_state *wst);
+
+/* initialize internal fd, event setup */
+extern void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd);
+
+/* add a buffer to the write queue */
+extern int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen,
+ atcp_write_func cb, void *cb_data1, void *cb_data2);
+
+/* begin pushing write queue to socket */
+extern bool atcp_write_start(struct atcp_wr_state *wst);
+
+#endif /* __ATCP_H__ */
diff --git a/server/bucket.c b/server/bucket.c
index eb03e03..982ed62 100644
--- a/server/bucket.c
+++ b/server/bucket.c
@@ -566,13 +566,13 @@ bool bucket_add(struct client *cli, const char *user, const char *bucket)
bucket) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
@@ -718,13 +718,13 @@ bool bucket_del(struct client *cli, const char *user, const char *bucket)
hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
diff --git a/server/object.c b/server/object.c
index 3801e94..b053ed9 100644
--- a/server/object.c
+++ b/server/object.c
@@ -227,13 +227,13 @@ bool object_del(struct client *cli, const char *user,
hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
@@ -525,13 +525,13 @@ static bool object_put_end(struct client *cli)
return cli_err(cli, InternalError);
}
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_rb:
rc = txn->abort(txn);
@@ -812,8 +812,8 @@ static bool object_put_body(struct client *cli, const char *user,
cli_out_end(cli);
return cli_err(cli, InternalError);
}
- cli_writeq(cli, cont, strlen(cont), cli_cb_free, cont);
- cli_write_start(cli);
+ atcp_writeq(&cli->wst, cont, strlen(cont), atcp_cb_free, cli, cont);
+ atcp_write_start(&cli->wst);
}
avail = MIN(cli_req_avail(cli), content_len);
@@ -940,13 +940,13 @@ static bool object_put_acls(struct client *cli, const char *user,
return cli_err(cli, InternalError);
}
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_rb:
rc = txn->abort(txn);
@@ -990,10 +990,10 @@ void cli_in_end(struct client *cli)
cli->in_len = 0;
}
-static bool object_get_more(struct client *cli, void *cb_data, bool done);
+static bool object_get_more(void *, void *, bool);
/*
- * Return true iff cli_writeq was called. This is compatible with the
+ * Return true iff atcp_writeq was called. This is compatible with the
* convention for cli continuation callbacks, so object_get_more can call us.
*/
static bool object_get_poke(struct client *cli)
@@ -1026,7 +1026,7 @@ static bool object_get_poke(struct client *cli)
if (bytes == 0) {
if (!cli->in_len) {
cli_in_end(cli);
- cli_write_start(cli);
+ atcp_write_start(&cli->wst);
}
free(buf);
return false;
@@ -1034,15 +1034,15 @@ static bool object_get_poke(struct client *cli)
cli->in_len -= bytes;
if (!cli->in_len) {
- if (cli_writeq(cli, buf, bytes, cli_cb_free, buf))
+ if (atcp_writeq(&cli->wst, buf, bytes, atcp_cb_free, cli, buf))
goto err_out;
cli_in_end(cli);
- cli_write_start(cli);
+ atcp_write_start(&cli->wst);
} else {
- if (cli_writeq(cli, buf, bytes, object_get_more, buf))
+ if (atcp_writeq(&cli->wst, buf, bytes, object_get_more, cli, buf))
goto err_out;
- if (cli_wqueued(cli) >= CLI_DATA_BUF_SZ)
- cli_write_start(cli);
+ if (atcp_wqueued(&cli->wst) >= CLI_DATA_BUF_SZ)
+ atcp_write_start(&cli->wst);
}
return true;
@@ -1053,11 +1053,12 @@ err_out:
}
/* callback from the client side: a queued write is being disposed */
-static bool object_get_more(struct client *cli, void *cb_data, bool done)
+static bool object_get_more(void *cb_data1, void *cb_data2, bool done)
{
+ struct client *cli = cb_data1;
/* free now-written buffer */
- free(cb_data);
+ free(cb_data2);
/* do not queue more, if !completion or fd was closed early */
if (!done) /* FIXME We used to test for input errors here. */
@@ -1071,8 +1072,10 @@ static bool object_get_more(struct client *cli, void *cb_data, bool done)
/* callback from the chunkd side: some data is available */
static void object_get_event(struct open_chunk *ochunk)
{
- object_get_poke(ochunk->cli);
- cli_write_run_compl();
+ struct client *cli = ochunk->cli;
+
+ object_get_poke(cli);
+ atcp_write_run_compl(&cli->wst);
}
static int object_node_count_up(struct db_obj_ent *obj)
@@ -1327,7 +1330,7 @@ static bool object_get_body(struct client *cli, const char *user,
if (!want_body) {
cli_in_end(cli);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
return true;
@@ -1347,7 +1350,7 @@ static bool object_get_body(struct client *cli, const char *user,
if (!cli->in_len)
cli_in_end(cli);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
goto err_out_in_end;
@@ -1365,21 +1368,22 @@ static bool object_get_body(struct client *cli, const char *user,
goto err_out_in_end;
memcpy(tmp, buf, bytes);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
free(tmp);
return true;
}
- if (cli_writeq(cli, tmp, bytes,
- cli->in_len ? object_get_more : cli_cb_free, tmp))
+ if (atcp_writeq(&cli->wst, tmp, bytes,
+ cli->in_len ? object_get_more : atcp_cb_free,
+ cli, tmp))
goto err_out_in_end;
start_write:
free(obj);
g_string_free(extra_hdr, TRUE);
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_in_end:
cli_in_end(cli);
diff --git a/server/server.c b/server/server.c
index 7a9fb7a..f8c4540 100644
--- a/server/server.c
+++ b/server/server.c
@@ -56,8 +56,6 @@
const char *argp_program_version = PACKAGE_VERSION;
enum {
- CLI_MAX_WR_IOV = 32, /* max iov per writev(2) */
-
SFL_FOREGROUND = (1 << 0), /* run in foreground */
};
@@ -488,65 +486,25 @@ bool stat_status(struct client *cli, GList *content)
if (asprintf(&str,
"<p>Stats: "
- "poll %lu event %lu tcp_accept %lu opt_write %lu</p>\r\n"
- "<p>Debug: max_write_buf %lu</p>\r\n",
- tabled_srv.stats.poll,
- tabled_srv.stats.event,
- tabled_srv.stats.tcp_accept,
- tabled_srv.stats.opt_write,
- tabled_srv.stats.max_write_buf) < 0)
+ "poll %llu event %llu tcp_accept %llu opt_write %llu</p>\r\n"
+ "<p>Debug: max_write_buf %llu</p>\r\n",
+ (unsigned long long) tabled_srv.stats.poll,
+ (unsigned long long) tabled_srv.stats.event,
+ (unsigned long long) tabled_srv.stats.tcp_accept,
+ (unsigned long long) tabled_srv.stats.opt_write,
+ (unsigned long long) tabled_srv.stats.max_write_buf) < 0)
return false;
content = g_list_append(content, str);
return true;
}
-static void cli_write_complete(struct client *cli, struct client_write *tmp)
-{
- list_del(&tmp->node);
- list_add_tail(&tmp->node, &tabled_srv.write_compl_q);
-}
-
-static bool cli_write_free(struct client_write *tmp, bool done)
-{
- struct client *cli = tmp->cb_cli;
- bool rcb = false;
-
- cli->write_cnt -= tmp->length;
- list_del(&tmp->node);
- if (tmp->cb)
- rcb = tmp->cb(cli, tmp->cb_data, done);
- free(tmp);
-
- return rcb;
-}
-
-static void cli_write_free_all(struct client *cli)
-{
- struct client_write *wr, *tmp;
-
- cli_write_run_compl();
- list_for_each_entry_safe(wr, tmp, &cli->write_q, node) {
- cli_write_free(wr, false);
- }
-}
-
-bool cli_write_run_compl(void)
-{
- struct client_write *wr;
- bool do_loop;
-
- do_loop = false;
- while (!list_empty(&tabled_srv.write_compl_q)) {
- wr = list_entry(tabled_srv.write_compl_q.next,
- struct client_write, node);
- do_loop |= cli_write_free(wr, true);
- }
- return do_loop;
-}
-
static void cli_free(struct client *cli)
{
- cli_write_free_all(cli);
+ if (cli->wst.write_cnt_max > tabled_srv.stats.max_write_buf)
+ tabled_srv.stats.max_write_buf = cli->wst.write_cnt_max;
+ tabled_srv.stats.opt_write += cli->wst.opt_write;
+
+ atcp_wr_exit(&cli->wst);
cli_out_end(cli);
cli_in_end(cli);
@@ -561,9 +519,6 @@ static void cli_free(struct client *cli)
hreq_free(&cli->req);
- if (cli->write_cnt_max > tabled_srv.stats.max_write_buf)
- tabled_srv.stats.max_write_buf = cli->write_cnt_max;
-
if (debugging)
applog(LOG_INFO, "client %s ended", cli->addr_host);
@@ -575,7 +530,7 @@ static bool cli_evt_dispose(struct client *cli, unsigned int events)
/* if write queue is not empty, we should continue to get
* poll callbacks here until it is
*/
- if (list_empty(&cli->write_q))
+ if (list_empty(&cli->wst.write_q))
cli_free(cli);
return false;
@@ -608,134 +563,6 @@ static bool cli_evt_recycle(struct client *cli, unsigned int events)
return true;
}
-static void cli_writable(struct client *cli)
-{
- int n_iov;
- struct client_write *tmp;
- ssize_t rc;
- struct iovec iov[CLI_MAX_WR_IOV];
-
- /* accumulate pending writes into iovec */
- n_iov = 0;
- list_for_each_entry(tmp, &cli->write_q, node) {
- if (n_iov == CLI_MAX_WR_IOV)
- break;
- /* bleh, struct iovec should declare iov_base const */
- iov[n_iov].iov_base = (void *) tmp->buf;
- iov[n_iov].iov_len = tmp->togo;
- n_iov++;
- }
-
- /* execute non-blocking write */
-do_write:
- rc = writev(cli->fd, iov, n_iov);
- if (rc < 0) {
- if (errno == EINTR)
- goto do_write;
- if (errno != EAGAIN)
- goto err_out;
- return;
- }
-
- /* iterate through write queue, issuing completions based on
- * amount of data written
- */
- while (rc > 0) {
- int sz;
-
- /* get pointer to first record on list */
- tmp = list_entry(cli->write_q.next, struct client_write, node);
-
- /* mark data consumed by decreasing tmp->len */
- sz = (tmp->togo < rc) ? tmp->togo : rc;
- tmp->togo -= sz;
- tmp->buf += sz;
- rc -= sz;
-
- /* if tmp->len reaches zero, write is complete,
- * so schedule it for clean up (cannot call callback
- * right away or an endless recursion will result)
- */
- if (tmp->togo == 0)
- cli_write_complete(cli, tmp);
- }
-
- /* if we emptied the queue, clear write notification */
- if (list_empty(&cli->write_q)) {
- cli->writing = false;
- if (event_del(&cli->write_ev) < 0) {
- applog(LOG_WARNING, "cli_writable event_del");
- goto err_out;
- }
- }
-
- return;
-
-err_out:
- cli->state = evt_dispose;
- cli_write_free_all(cli);
-}
-
-bool cli_write_start(struct client *cli)
-{
- if (list_empty(&cli->write_q))
- return true; /* loop, not poll */
-
- /* if write-poll already active, nothing further to do */
- if (cli->writing)
- return false; /* poll wait */
-
- /* attempt optimistic write, in hopes of avoiding poll,
- * or at least refill the write buffers so as to not
- * get -immediately- called again by the kernel
- */
- cli_writable(cli);
- if (list_empty(&cli->write_q)) {
- tabled_srv.stats.opt_write++;
- return true; /* loop, not poll */
- }
-
- if (event_add(&cli->write_ev, NULL) < 0) {
- applog(LOG_WARNING, "cli_write event_add");
- return true; /* loop, not poll */
- }
-
- cli->writing = true;
-
- return false; /* poll wait */
-}
-
-int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
- cli_write_func cb, void *cb_data)
-{
- struct client_write *wr;
-
- if (!buf || !buflen)
- return -EINVAL;
-
- wr = calloc(1, sizeof(struct client_write));
- if (!wr)
- return -ENOMEM;
-
- wr->buf = buf;
- wr->togo = buflen;
- wr->length = buflen;
- wr->cb = cb;
- wr->cb_data = cb_data;
- wr->cb_cli = cli;
- list_add_tail(&wr->node, &cli->write_q);
- cli->write_cnt += buflen;
- if (cli->write_cnt > cli->write_cnt_max)
- cli->write_cnt_max = cli->write_cnt;
-
- return 0;
-}
-
-size_t cli_wqueued(struct client *cli)
-{
- return cli->write_cnt;
-}
-
/*
* Return:
* 0: progress was NOT made (EOF)
@@ -771,12 +598,6 @@ do_read:
return rc != 0;
}
-bool cli_cb_free(struct client *cli, void *cb_data, bool done)
-{
- free(cb_data);
- return false;
-}
-
static int cli_write_list(struct client *cli, GList *list)
{
int rc = 0;
@@ -784,8 +605,8 @@ static int cli_write_list(struct client *cli, GList *list)
tmp = list;
while (tmp) {
- rc = cli_writeq(cli, tmp->data, strlen(tmp->data),
- cli_cb_free, tmp->data);
+ rc = atcp_writeq(&cli->wst, tmp->data, strlen(tmp->data),
+ atcp_cb_free, cli, tmp->data);
if (rc)
goto out;
@@ -870,14 +691,14 @@ bool cli_err_write(struct client *cli, char *hdr, char *content)
cli->state = evt_dispose;
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc)
return true;
- rc = cli_writeq(cli, content, strlen(content), cli_cb_free, content);
+ rc = atcp_writeq(&cli->wst, content, strlen(content), atcp_cb_free, cli, content);
if (rc)
return true;
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
}
static bool cli_resp(struct client *cli, int http_status,
@@ -911,7 +732,7 @@ static bool cli_resp(struct client *cli, int http_status,
else
cli->state = evt_recycle;
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, cli, hdr);
if (rc) {
free(hdr);
cli->state = evt_dispose;
@@ -924,7 +745,7 @@ static bool cli_resp(struct client *cli, int http_status,
return true;
}
- rcb = cli_write_start(cli);
+ rcb = atcp_write_start(&cli->wst);
if (cli->state == evt_recycle)
return true;
@@ -1385,9 +1206,10 @@ static struct client *cli_alloc(bool is_status)
return NULL;
}
+ atcp_wr_init(&cli->wst);
+
cli->state = evt_read_req;
cli->evt_table = is_status? evt_funcs_status: evt_funcs_server;
- INIT_LIST_HEAD(&cli->write_q);
INIT_LIST_HEAD(&cli->out_ch);
cli->req_ptr = cli->req_buf;
memset(&cli->req, 0, sizeof(cli->req) - sizeof(cli->req.hdr));
@@ -1395,14 +1217,6 @@ static struct client *cli_alloc(bool is_status)
return cli;
}
-static void tcp_cli_wr_event(int fd, short events, void *userdata)
-{
- struct client *cli = userdata;
-
- cli_writable(cli);
- cli_write_run_compl();
-}
-
static void tcp_cli_event(int fd, short events, void *userdata)
{
struct client *cli = userdata;
@@ -1410,7 +1224,7 @@ static void tcp_cli_event(int fd, short events, void *userdata)
do {
loop = cli->evt_table[cli->state](cli, events);
- loop |= cli_write_run_compl();
+ loop |= atcp_write_run_compl(&cli->wst);
} while (loop);
}
@@ -1438,11 +1252,11 @@ static void tcp_srv_event(int fd, short events, void *userdata)
goto err_out;
}
+ atcp_wr_set_fd(&cli->wst, cli->fd);
+
tabled_srv.stats.tcp_accept++;
event_set(&cli->ev, cli->fd, EV_READ | EV_PERSIST, tcp_cli_event, cli);
- event_set(&cli->write_ev, cli->fd, EV_WRITE | EV_PERSIST,
- tcp_cli_wr_event, cli);
/* mark non-blocking, for upcoming poll use */
if (fsetflags("tcp client", cli->fd, O_NONBLOCK) < 0)
@@ -2202,7 +2016,6 @@ int main (int argc, char *argv[])
struct event_base *event_base_rep;
INIT_LIST_HEAD(&tabled_srv.all_stor);
- INIT_LIST_HEAD(&tabled_srv.write_compl_q);
tabled_srv.state_tdb = ST_TDB_INIT;
tabled_srv.rep_next_id = DBID_MIN;
diff --git a/server/tabled.h b/server/tabled.h
index d4d2048..be6c9b4 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -31,6 +31,7 @@
#include <elist.h>
#include <tdb.h>
#include <hail_log.h>
+#include "atcp.h"
#ifndef ARRAY_SIZE
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
@@ -103,19 +104,6 @@ struct storage_node {
};
typedef bool (*cli_evt_func)(struct client *, unsigned int);
-typedef bool (*cli_write_func)(struct client *, void *, bool);
-
-struct client_write {
- const void *buf; /* write buffer pointer */
- int togo; /* write buffer remainder */
-
- int length; /* length for accounting */
- cli_write_func cb; /* callback */
- void *cb_data; /* data passed to cb */
- struct client *cb_cli; /* cli passed to cb */
-
- struct list_head node;
-};
/* an open chunkd client */
struct open_chunk {
@@ -163,13 +151,9 @@ struct client {
int fd; /* socket */
bool ev_active;
struct event ev;
- struct event write_ev;
- struct list_head write_q; /* list of async writes */
- size_t write_cnt; /* water level */
- bool writing;
+ struct atcp_wr_state wst;
/* some debugging stats */
- size_t write_cnt_max;
unsigned int req_used; /* amount of req_buf in use */
char *req_ptr; /* start of unexamined data */
@@ -216,12 +200,12 @@ enum st_net {
};
struct server_stats {
- unsigned long poll; /* number polls */
- unsigned long event; /* events dispatched */
- unsigned long tcp_accept; /* TCP accepted cxns */
- unsigned long opt_write; /* optimistic writes */
+ uint64_t poll; /* number polls */
+ uint64_t event; /* events dispatched */
+ uint64_t tcp_accept; /* TCP accepted cxns */
+ uint64_t opt_write; /* optimistic writes */
- unsigned long max_write_buf;
+ uint64_t max_write_buf;
};
#define DBID_NONE 0
@@ -249,7 +233,6 @@ struct server {
struct event_base *evbase_main;
int ev_pipe[2];
struct event pevt;
- struct list_head write_compl_q; /* list of done writes */
bool mc_delay;
struct event mc_timer;
@@ -398,12 +381,6 @@ extern bool cli_err(struct client *cli, enum errcode code);
extern bool cli_err_write(struct client *cli, char *hdr, char *content);
extern bool cli_resp_xml(struct client *cli, int http_status, GList *content);
extern bool cli_resp_html(struct client *cli, int http_status, GList *content);
-extern int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
- cli_write_func cb, void *cb_data);
-extern size_t cli_wqueued(struct client *cli);
-extern bool cli_cb_free(struct client *cli, void *cb_data, bool done);
-extern bool cli_write_start(struct client *cli);
-extern bool cli_write_run_compl(void);
extern int cli_req_avail(struct client *cli);
extern void applog(int prio, const char *fmt, ...);
extern void cld_update_cb(void);
^ permalink raw reply related [flat|nested] 11+ messages in thread
* Re: [tabled patch] abstract out TCP-write code
2010-09-23 0:09 [tabled patch] abstract out TCP-write code Jeff Garzik
@ 2010-09-23 0:28 ` Pete Zaitcev
2010-09-23 1:26 ` Jeff Garzik
2010-09-23 21:09 ` [tabled patch v2] " Jeff Garzik
1 sibling, 1 reply; 11+ messages in thread
From: Pete Zaitcev @ 2010-09-23 0:28 UTC (permalink / raw)
To: Jeff Garzik; +Cc: hail-devel
On Wed, 22 Sep 2010 20:09:08 -0400
Jeff Garzik <jeff@garzik.org> wrote:
> @@ -1410,7 +1224,7 @@ static void tcp_cli_event(int fd, short events, void *userdata)
>
> do {
> loop = cli->evt_table[cli->state](cli, events);
> - loop |= cli_write_run_compl();
> + loop |= atcp_write_run_compl(&cli->wst);
> } while (loop);
> }
This cannot be right. Please see commit
d1a45fca7908b7128ed4fe2ab611111f02ee938f:
tabled: fix running completions over disposed cli
Miraculously this never actually crashed on me, but I added unrelated
debugging printout into the dispatch routine and it printed weird
values. Then it dawned on me that a state change function may dispose
of the struct cli, in which case cli_write_run_compl is use-after-free.
It may seem that checking if the old state was evt_dispose before
running cli_write_run_compl is an expedient fix, but that does not
work, because we do not always dispose of the cli in such case.
If the cli to be disposed still has anything in the queue, we
need to continue to deliver events, and for that we have to
run outstanding completions.
So, we go a longer route and re-hook the list of completions
to a per-server global instead of a client. The patch is straight-
forward. The only thing we need to be careful is to make sure
that no outstanding completions are left in the queue before
freeing a client struct. This is ensured by force-running completions.
One other necessary change was to add a back pointer from a completion
to the current client. This is because one caller needed the client
pointer (object_get_more).
Speaking of backpointers, I think it would be much cleaner
to get rid of the two-argument format for the callback. It stinks of
special-casing. Either throw a back pointer into the first word
of buf, or create some kind of object/struct passed as
argument to atcp_writeq(), that's what I would do.
-- Pete
^ permalink raw reply [flat|nested] 11+ messages in thread
* Re: [tabled patch] abstract out TCP-write code
2010-09-23 0:28 ` Pete Zaitcev
@ 2010-09-23 1:26 ` Jeff Garzik
2010-09-23 2:37 ` Pete Zaitcev
0 siblings, 1 reply; 11+ messages in thread
From: Jeff Garzik @ 2010-09-23 1:26 UTC (permalink / raw)
To: Pete Zaitcev; +Cc: hail-devel
On 09/22/2010 08:28 PM, Pete Zaitcev wrote:
> On Wed, 22 Sep 2010 20:09:08 -0400
> Jeff Garzik<jeff@garzik.org> wrote:
>
>> @@ -1410,7 +1224,7 @@ static void tcp_cli_event(int fd, short events, void *userdata)
>>
>> do {
>> loop = cli->evt_table[cli->state](cli, events);
>> - loop |= cli_write_run_compl();
>> + loop |= atcp_write_run_compl(&cli->wst);
>> } while (loop);
>> }
>
> This cannot be right. Please see commit
> d1a45fca7908b7128ed4fe2ab611111f02ee938f:
>
> tabled: fix running completions over disposed cli
>
> Miraculously this never actually crashed on me, but I added unrelated
> debugging printout into the dispatch routine and it printed weird
> values. Then it dawned on me that a state change function may dispose
> of the struct cli, in which case cli_write_run_compl is use-after-free.
>
> It may seem that checking if the old state was evt_dispose before
> running cli_write_run_compl is an expedient fix, but that does not
> work, because we do not always dispose of the cli in such case.
> If the cli to be disposed still has anything in the queue, we
> need to continue to deliver events, and for that we have to
> run outstanding completions.
>
> So, we go a longer route and re-hook the list of completions
> to a per-server global instead of a client. The patch is straight-
> forward. The only thing we need to be careful is to make sure
> that no outstanding completions are left in the queue before
> freeing a client struct. This is ensured by force-running completions.
>
> One other necessary change was to add a back pointer from a completion
> to the current client. This is because one caller needed the client
> pointer (object_get_more).
Thanks for the reminder.
Looking at this change again, I don't see how this avoids
use-after-free. If completions exist after state change function leads
one to cli_evt_dispose() -> cli_free(), then cli_write_run_compl() still
calls cli_write_free() with the stale 'cli' pointer.
It seems like the real fix is to have the functions in the FSM loop
return an additional piece of information, indicating that 'cli' is no
longer valid.
client_write's object lifetime should always be a subset of client's
object lifetime.
> Speaking of backpointers, I think it would be much cleaner
> to get rid of two-argument format for callback. It stinks of
> special-casing. Either throw a back pointer into the first word
> of buf, or create some kind of object/struct passed as
> argument to atcp_writeq(), that's what I would do.
It is a common idiom even in GLib that callbacks receive two anonymous
pointers; witness the data type GFunc's 'data' and 'user_data'
arguments:
http://library.gnome.org/devel/glib/stable/glib-Doubly-Linked-Lists.html#GFunc
This is because typically one has two objects -- a parent and a child --
with different object lifetimes. When considering both tabled and itd,
you see an example of this in the need for struct client (tabled) or struct
session (itd) pointers on occasion, in cli_writeq calls.
Jeff
^ permalink raw reply [flat|nested] 11+ messages in thread
* Re: [tabled patch] abstract out TCP-write code
2010-09-23 1:26 ` Jeff Garzik
@ 2010-09-23 2:37 ` Pete Zaitcev
2010-09-23 4:32 ` Jeff Garzik
2010-09-23 23:51 ` Jeff Garzik
0 siblings, 2 replies; 11+ messages in thread
From: Pete Zaitcev @ 2010-09-23 2:37 UTC (permalink / raw)
To: Jeff Garzik; +Cc: hail-devel
On Wed, 22 Sep 2010 21:26:13 -0400
Jeff Garzik <jeff@garzik.org> wrote:
> > So, we go a longer route and re-hook the list of completions
> > to a per-server global instead of a client. The patch is straight-
> > forward. The only thing we need to be careful is to make sure
> > that no outstanding completions are left in the queue before
> > freeing a client struct. This is ensured by force-running completions.
> Looking at this change again, I don't see how this avoids
> use-after-free. If completions exist after state change function leads
> one to cli_evt_dispose() -> cli_free(), then cli_write_run_compl() still
> calls cli_write_free() with the stale 'cli' pointer.
We run completions before freeing in all cases. My patch was correct.
> It seems like the real fix is to have the functions in the FSM loop
> return an additional piece of information, indicating that 'cli' is no
> longer valid.
That's kinda backwards. Might as well add refcounts.
> client_write's object lifetime should always be a subset of client's
> object lifetime.
Sure. But that's an argument for refcounting struct cli.
> > Speaking of backpointers, I think it would be much cleaner
> > to get rid of two-argument format for callback. It stinks of
> > special-casing. Either throw a back pointer into the first word
> > of buf, or create some kind of object/struct passed as
> > argument to atcp_writeq(), that's what I would do.
>
> It is a common idiom even in GLib that callbacks receive two anonymous
> pointers; witness the data type GFunc's 'data' and 'user_data'
> arguments:
> http://library.gnome.org/devel/glib/stable/glib-Doubly-Linked-Lists.html#GFunc
There's a lot of badly designed code in GLib; just look at their lists.
If someone smarter had written GLib, we would not need struct list_head.
Heck, queue.h existed before GLib and they still blew it.
Oh god, and their hash table is no better.
> This is because typically one has two objects -- a parent and a child --
> with different object lifetime. []
I think your excuses are getting more sophisticated than the code
is worth. If you don't see how 2 arguments are special-casing, fine.
Just say you like it.
-- Pete
^ permalink raw reply [flat|nested] 11+ messages in thread
* Re: [tabled patch] abstract out TCP-write code
2010-09-23 2:37 ` Pete Zaitcev
@ 2010-09-23 4:32 ` Jeff Garzik
2010-09-23 13:57 ` Pete Zaitcev
2010-09-23 23:51 ` Jeff Garzik
1 sibling, 1 reply; 11+ messages in thread
From: Jeff Garzik @ 2010-09-23 4:32 UTC (permalink / raw)
To: Pete Zaitcev; +Cc: hail-devel
On 09/22/2010 10:37 PM, Pete Zaitcev wrote:
> On Wed, 22 Sep 2010 21:26:13 -0400
> Jeff Garzik<jeff@garzik.org> wrote:
>
>>> So, we go a longer route and re-hook the list of completions
>>> to a per-server global instead of a client. The patch is straight-
>>> forward. The only thing we need to be careful is to make sure
>>> that no outstanding completions are left in the queue before
>>> freeing a client struct. This is ensured by force-running completions.
>
>> Looking at this change again, I don't see how this avoids
>> use-after-free. If completions exist after state change function leads
>> one to cli_evt_dispose() -> cli_free(), then cli_write_run_compl() still
>> calls cli_write_free() with the stale 'cli' pointer.
>
> We run completions before freeing in all cases. My patch was correct.
Logically, if completions are run before freeing in all cases, there is
no need to make write_compl_q global. That was a red herring, which by
side effect avoided the bug with the stale 'cli' pointer.
Jeff
^ permalink raw reply [flat|nested] 11+ messages in thread
* Re: [tabled patch] abstract out TCP-write code
2010-09-23 4:32 ` Jeff Garzik
@ 2010-09-23 13:57 ` Pete Zaitcev
2010-09-23 15:28 ` Jim Meyering
2010-09-23 16:47 ` Jeff Garzik
0 siblings, 2 replies; 11+ messages in thread
From: Pete Zaitcev @ 2010-09-23 13:57 UTC (permalink / raw)
To: Jeff Garzik; +Cc: hail-devel, zaitcev
On Thu, 23 Sep 2010 00:32:09 -0400
Jeff Garzik <jeff@garzik.org> wrote:
> >>> So, we go a longer route and re-hook the list of completions
> >>> to a per-server global instead of a client. The patch is straight-
> >>> forward. The only thing we need to be careful is to make sure
> >>> that no outstanding completions are left in the queue before
> >>> freeing a client struct. This is ensured by force-running completions.
> >
> >> Looking at this change again, I don't see how this avoids
> >> use-after-free. If completions exist after state change function leads
> >> one to cli_evt_dispose() -> cli_free(), then cli_write_run_compl() still
> >> calls cli_write_free() with the stale 'cli' pointer.
> >
> > We run completions before freeing in all cases. My patch was correct.
>
> Logically, if completions are run before freeing in all cases, there is
> no need to make write_compl_q global. That was a red herring, which by
> side effect avoided the bug with the stale 'cli' pointer.
Side effect or not, if one applies your patch and executes
"export MALLOC_PERTURB_=43" command before "make check",
the result is a crash:
Core was generated by `../server/tabled -C ../test/tabled-test.conf -E'.
Program terminated with signal 11, Segmentation fault.
#0 atcp_write_free (tmp=0x2b2b2b2b2b2b2afb, done=true) at atcp.c:49
49 struct atcp_wr_state *wst = tmp->wst;
(gdb) where
#0 atcp_write_free (tmp=0x2b2b2b2b2b2b2afb, done=true) at atcp.c:49
#1 0x000000000040387e in atcp_write_run_compl (wst=0x15a9be8) at atcp.c:70
#2 0x000000000040f20a in tcp_cli_event (fd=<value optimized out>, events=2,
userdata=0x15a9af0) at server.c:1227
#3 0x00007f9320aec774 in event_base_loop () from /usr/lib64/libevent-1.4.so.2
#4 0x00000000004107a5 in main (argc=<value optimized out>,
argv=<value optimized out>) at server.c:2139
(gdb)
The existing code (with the commit that you criticized) produces no crash.
Granted, MALLOC_PERTURB_ is like the SLAB debug option -- it is not the normal
operating environment -- but IMHO it is completely legitimate.
-- Pete
^ permalink raw reply [flat|nested] 11+ messages in thread
* Re: [tabled patch] abstract out TCP-write code
2010-09-23 13:57 ` Pete Zaitcev
@ 2010-09-23 15:28 ` Jim Meyering
2010-09-23 23:48 ` Jeff Garzik
2010-09-23 16:47 ` Jeff Garzik
1 sibling, 1 reply; 11+ messages in thread
From: Jim Meyering @ 2010-09-23 15:28 UTC (permalink / raw)
To: Pete Zaitcev; +Cc: Jeff Garzik, hail-devel
Pete Zaitcev wrote:
> On Thu, 23 Sep 2010 00:32:09 -0400
> Jeff Garzik <jeff@garzik.org> wrote:
>
>> >>> So, we go a longer route and re-hook the list of completions
>> >>> to a per-server global instead of a client. The patch is straight-
>> >>> forward. The only thing we need to be careful is to make sure
>> >>> that no outstanding completions are left in the queue before
>> >>> freeing a client struct. This is ensured by force-running completions.
>> >
>> >> Looking at this change again, I don't see how this avoids
>> >> use-after-free. If completions exist after state change function leads
>> >> one to cli_evt_dispose() -> cli_free(), then cli_write_run_compl() still
>> >> calls cli_write_free() with the stale 'cli' pointer.
>> >
>> > We run completions before freeing in all cases. My patch was correct.
>>
>> Logically, if completions are run before freeing in all cases, there is
>> no need to make write_compl_q global. That was a red herring, which by
>> side effect avoided the bug with the stale 'cli' pointer.
>
> Side effect or not, if one applies your patch and executes
> "export MALLOC_PERTURB_=43" command before "make check",
> the result is a crash:
>
> Core was generated by `../server/tabled -C ../test/tabled-test.conf -E'.
> Program terminated with signal 11, Segmentation fault.
> #0 atcp_write_free (tmp=0x2b2b2b2b2b2b2afb, done=true) at atcp.c:49
> 49 struct atcp_wr_state *wst = tmp->wst;
> (gdb) where
> #0 atcp_write_free (tmp=0x2b2b2b2b2b2b2afb, done=true) at atcp.c:49
> #1 0x000000000040387e in atcp_write_run_compl (wst=0x15a9be8) at atcp.c:70
> #2 0x000000000040f20a in tcp_cli_event (fd=<value optimized out>, events=2,
> userdata=0x15a9af0) at server.c:1227
> #3 0x00007f9320aec774 in event_base_loop () from /usr/lib64/libevent-1.4.so.2
> #4 0x00000000004107a5 in main (argc=<value optimized out>,
> argv=<value optimized out>) at server.c:2139
> (gdb)
>
> The existing code (with the commit that you criticized) produces no crash.
> Granted, MALLOC_PERTURB_ is like the SLAB debug option -- it is not the normal
> operating environment -- but IMHO it is completely legitimate.
Every developer should have MALLOC_PERTURB_=N (N in 1..255) set in
his/her environment on glibc-based systems. Almost all the time.
Maybe I should push harder to get it added to rawhide's /etc/profile.
I proposed that a few months ago:
use MALLOC_PERTURB_ ... or lose
http://thread.gmane.org/gmane.linux.redhat.fedora.devel/132690
^ permalink raw reply [flat|nested] 11+ messages in thread
* Re: [tabled patch] abstract out TCP-write code
2010-09-23 13:57 ` Pete Zaitcev
2010-09-23 15:28 ` Jim Meyering
@ 2010-09-23 16:47 ` Jeff Garzik
1 sibling, 0 replies; 11+ messages in thread
From: Jeff Garzik @ 2010-09-23 16:47 UTC (permalink / raw)
To: Pete Zaitcev; +Cc: hail-devel
On 09/23/2010 09:57 AM, Pete Zaitcev wrote:
> Side effect or not, if one applies your patch and executes
> "export MALLOC_PERTURB_=43" command before "make check",
> the result is a crash:
Yes, it is clear my patch re-introduces the problem. I thanked you for
pointing that out in the original reply. :)
The point was, that does not invalidate the approach of storing
write_compl_q in struct atcp_wr_state.
It merely highlights that a further fix (to the FSM? refcount client?
etc.) is required.
Jeff
^ permalink raw reply [flat|nested] 11+ messages in thread
* [tabled patch v2] abstract out TCP-write code
2010-09-23 0:09 [tabled patch] abstract out TCP-write code Jeff Garzik
2010-09-23 0:28 ` Pete Zaitcev
@ 2010-09-23 21:09 ` Jeff Garzik
1 sibling, 0 replies; 11+ messages in thread
From: Jeff Garzik @ 2010-09-23 21:09 UTC (permalink / raw)
To: hail-devel
Changes from v1:
- avoid referencing dead struct client (grep for 'invalidate_cli'),
by changing FSM callback prototype.
- insert 'void *priv' member into struct atcp_wr_state, and replace
cb_data1/cb_data2 callback parameters with (struct atcp_wr_state *, void *).
struct client / struct session, or whatever, may be stored in
atcp_wr_state::priv.
- minor API polishing and further abstraction
server/Makefile.am | 1
server/atcp.c | 238 +++++++++++++++++++++++++++++++++++++++++++++++
server/atcp.h | 100 +++++++++++++++++++
server/bucket.c | 8 -
server/object.c | 56 +++++------
server/server.c | 268 +++++++++--------------------------------------------
server/status.c | 3
server/tabled.h | 46 ++-------
8 files changed, 436 insertions(+), 284 deletions(-)
diff --git a/server/Makefile.am b/server/Makefile.am
index 5b53a0a..5e0abd5 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -4,6 +4,7 @@ INCLUDES = -I$(top_srcdir)/include @GLIB_CFLAGS@ @HAIL_CFLAGS@
sbin_PROGRAMS = tabled tdbadm
tabled_SOURCES = tabled.h \
+ atcp.c atcp.h \
bucket.c cldu.c config.c metarep.c object.c replica.c \
server.c status.c storage.c storparse.c util.c
tabled_LDADD = ../lib/libtdb.a \
diff --git a/server/atcp.c b/server/atcp.c
new file mode 100644
index 0000000..0050a68
--- /dev/null
+++ b/server/atcp.c
@@ -0,0 +1,238 @@
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define _GNU_SOURCE
+#include "tabled-config.h"
+
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <sys/uio.h>
+#include "atcp.h"
+
+bool atcp_cb_free(struct atcp_wr_state *wst, void *cb_data, bool done)
+{
+ free(cb_data);
+ return false;
+}
+
+static void atcp_write_complete(struct atcp_write *tmp)
+{
+ struct atcp_wr_state *wst = tmp->wst;
+
+ list_del(&tmp->node);
+ list_add_tail(&tmp->node, &wst->write_compl_q);
+}
+
+static bool atcp_write_free(struct atcp_write *tmp, bool done)
+{
+ struct atcp_wr_state *wst = tmp->wst;
+ bool rcb = false;
+
+ wst->write_cnt -= tmp->length;
+ list_del_init(&tmp->node);
+ if (tmp->cb)
+ rcb = tmp->cb(wst, tmp->cb_data, done);
+ free(tmp);
+
+ return rcb;
+}
+
+bool atcp_write_run_compl(struct atcp_wr_state *wst)
+{
+ struct atcp_write *wr;
+ bool do_loop;
+
+ do_loop = false;
+ while (!list_empty(&wst->write_compl_q)) {
+ wr = list_entry(wst->write_compl_q.next,
+ struct atcp_write, node);
+ do_loop |= atcp_write_free(wr, true);
+ }
+ return do_loop;
+}
+
+void atcp_write_free_all(struct atcp_wr_state *wst)
+{
+ struct atcp_write *wr, *tmp;
+
+ atcp_write_run_compl(wst);
+ list_for_each_entry_safe(wr, tmp, &wst->write_q, node) {
+ atcp_write_free(wr, false);
+ }
+}
+
+static bool atcp_writable(struct atcp_wr_state *wst)
+{
+ int n_iov;
+ struct atcp_write *tmp;
+ ssize_t rc;
+ struct iovec iov[ATCP_MAX_WR_IOV];
+
+ /* accumulate pending writes into iovec */
+ n_iov = 0;
+ list_for_each_entry(tmp, &wst->write_q, node) {
+ if (n_iov == ATCP_MAX_WR_IOV)
+ break;
+ /* bleh, struct iovec should declare iov_base const */
+ iov[n_iov].iov_base = (void *) tmp->buf;
+ iov[n_iov].iov_len = tmp->togo;
+ n_iov++;
+ }
+
+ /* execute non-blocking write */
+do_write:
+ rc = writev(wst->fd, iov, n_iov);
+ if (rc < 0) {
+ if (errno == EINTR)
+ goto do_write;
+ if (errno != EAGAIN)
+ goto err_out;
+ return true;
+ }
+
+ /* iterate through write queue, issuing completions based on
+ * amount of data written
+ */
+ while (rc > 0) {
+ int sz;
+
+ /* get pointer to first record on list */
+ tmp = list_entry(wst->write_q.next, struct atcp_write, node);
+
+ /* mark data consumed by decreasing tmp->togo */
+ sz = (tmp->togo < rc) ? tmp->togo : rc;
+ tmp->togo -= sz;
+ tmp->buf += sz;
+ rc -= sz;
+
+ /* if tmp->togo reaches zero, write is complete,
+ * so schedule it for clean up (cannot call callback
+ * right away or an endless recursion will result)
+ */
+ if (tmp->togo == 0)
+ atcp_write_complete(tmp);
+ }
+
+ /* if we emptied the queue, clear write notification */
+ if (list_empty(&wst->write_q)) {
+ wst->writing = false;
+ if (event_del(&wst->write_ev) < 0)
+ goto err_out;
+ }
+
+ return true;
+
+err_out:
+ atcp_write_free_all(wst);
+ return false;
+}
+
+static void atcp_wr_event(int fd, short events, void *userdata)
+{
+ struct atcp_wr_state *wst = userdata;
+
+ atcp_writable(wst);
+ atcp_write_run_compl(wst);
+}
+
+void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd)
+{
+ wst->fd = fd;
+
+ event_set(&wst->write_ev, fd, EV_WRITE | EV_PERSIST,
+ atcp_wr_event, wst);
+}
+
+bool atcp_write_start(struct atcp_wr_state *wst)
+{
+ if (list_empty(&wst->write_q))
+ return true; /* loop, not poll */
+
+ /* if write-poll already active, nothing further to do */
+ if (wst->writing)
+ return false; /* poll wait */
+
+ /* attempt optimistic write, in hopes of avoiding poll,
+ * or at least refill the write buffers so as to not
+ * get -immediately- called again by the kernel
+ */
+ atcp_writable(wst);
+ if (list_empty(&wst->write_q)) {
+ wst->opt_write++;
+ return true; /* loop, not poll */
+ }
+
+ if (event_add(&wst->write_ev, NULL) < 0)
+ return true; /* loop, not poll */
+
+ wst->writing = true;
+
+ return false; /* poll wait */
+}
+
+int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen,
+ atcp_write_func cb, void *cb_data)
+{
+ struct atcp_write *wr;
+
+ if (!buf || !buflen)
+ return -EINVAL;
+
+ wr = calloc(1, sizeof(struct atcp_write));
+ if (!wr)
+ return -ENOMEM;
+
+ wr->buf = buf;
+ wr->togo = buflen;
+ wr->length = buflen;
+ wr->cb = cb;
+ wr->cb_data = cb_data;
+ wr->wst = wst;
+ list_add_tail(&wr->node, &wst->write_q);
+ wst->write_cnt += buflen;
+ if (wst->write_cnt > wst->write_cnt_max)
+ wst->write_cnt_max = wst->write_cnt;
+
+ return 0;
+}
+
+void atcp_wr_exit(struct atcp_wr_state *wst)
+{
+ if (!wst)
+ return;
+
+ if (wst->writing)
+ event_del(&wst->write_ev);
+
+ atcp_write_free_all(wst);
+}
+
+void atcp_wr_init(struct atcp_wr_state *wst, void *priv)
+{
+ memset(wst, 0, sizeof(*wst));
+
+ INIT_LIST_HEAD(&wst->write_q);
+ INIT_LIST_HEAD(&wst->write_compl_q);
+
+ wst->fd = -1;
+
+ wst->priv = priv;
+}
+
diff --git a/server/atcp.h b/server/atcp.h
new file mode 100644
index 0000000..e611aab
--- /dev/null
+++ b/server/atcp.h
@@ -0,0 +1,100 @@
+#ifndef __ATCP_H__
+#define __ATCP_H__
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <event.h>
+#include <elist.h>
+
+enum {
+ ATCP_MAX_WR_IOV = 32, /* max iov per writev(2) */
+};
+
+struct atcp_wr_state {
+ int fd; /* our socket */
+
+ bool writing; /* actively trying to write? */
+
+ size_t write_cnt; /* water level */
+ size_t write_cnt_max;
+
+ struct list_head write_q; /* list of async writes */
+ struct list_head write_compl_q; /* list of done writes */
+
+ struct event write_ev;
+
+ void *priv; /* untouched by atcp */
+
+ /* various statistics */
+ uint64_t opt_write; /* optimistic writes */
+};
+
+typedef bool (*atcp_write_func)(struct atcp_wr_state *, void *, bool);
+
+struct atcp_write {
+ const void *buf; /* write buffer pointer */
+ int togo; /* write buffer remainder */
+
+ int length; /* length for accounting */
+ atcp_write_func cb; /* callback */
+ void *cb_data; /* data passed to cb */
+
+ struct atcp_wr_state *wst; /* our parent */
+
+ struct list_head node; /* write_[compl_]q list node */
+};
+
+/* setup and teardown atcp write state */
+extern void atcp_wr_exit(struct atcp_wr_state *wst);
+extern void atcp_wr_init(struct atcp_wr_state *wst, void *priv);
+
+/* generic write callback that calls free(cb_data) */
+extern bool atcp_cb_free(struct atcp_wr_state *wst, void *cb_data, bool done);
+
+/* clear all write queues immediately, even if not complete */
+extern void atcp_write_free_all(struct atcp_wr_state *wst);
+
+/* complete all writes found on completion queue */
+extern bool atcp_write_run_compl(struct atcp_wr_state *wst);
+
+/* initialize internal fd, event setup */
+extern void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd);
+
+/* add a buffer to the write queue */
+extern int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen,
+ atcp_write_func cb, void *cb_data);
+
+/* begin pushing write queue to socket */
+extern bool atcp_write_start(struct atcp_wr_state *wst);
+
+/* is anything on the write queue at the moment? */
+static inline bool atcp_wq_empty(struct atcp_wr_state *wst)
+{
+ return list_empty(&wst->write_q) ? true : false;
+}
+
+/* total number of octets queued at this moment */
+static inline size_t atcp_wqueued(struct atcp_wr_state *wst)
+{
+ return wst->write_cnt;
+}
+
+#endif /* __ATCP_H__ */
diff --git a/server/bucket.c b/server/bucket.c
index eb03e03..a81eca1 100644
--- a/server/bucket.c
+++ b/server/bucket.c
@@ -566,13 +566,13 @@ bool bucket_add(struct client *cli, const char *user, const char *bucket)
bucket) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
@@ -718,13 +718,13 @@ bool bucket_del(struct client *cli, const char *user, const char *bucket)
hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
diff --git a/server/object.c b/server/object.c
index 3801e94..64cc8b6 100644
--- a/server/object.c
+++ b/server/object.c
@@ -227,13 +227,13 @@ bool object_del(struct client *cli, const char *user,
hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
@@ -525,13 +525,13 @@ static bool object_put_end(struct client *cli)
return cli_err(cli, InternalError);
}
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_rb:
rc = txn->abort(txn);
@@ -618,7 +618,8 @@ static int object_put_buf(struct client *cli, struct open_chunk *ochunk,
return 0;
}
-bool cli_evt_http_data_in(struct client *cli, unsigned int events)
+bool cli_evt_http_data_in(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
ssize_t avail;
struct open_chunk *ochunk;
@@ -812,8 +813,8 @@ static bool object_put_body(struct client *cli, const char *user,
cli_out_end(cli);
return cli_err(cli, InternalError);
}
- cli_writeq(cli, cont, strlen(cont), cli_cb_free, cont);
- cli_write_start(cli);
+ atcp_writeq(&cli->wst, cont, strlen(cont), atcp_cb_free, cont);
+ atcp_write_start(&cli->wst);
}
avail = MIN(cli_req_avail(cli), content_len);
@@ -940,13 +941,13 @@ static bool object_put_acls(struct client *cli, const char *user,
return cli_err(cli, InternalError);
}
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_rb:
rc = txn->abort(txn);
@@ -990,10 +991,10 @@ void cli_in_end(struct client *cli)
cli->in_len = 0;
}
-static bool object_get_more(struct client *cli, void *cb_data, bool done);
+static bool object_get_more(struct atcp_wr_state *wst, void *cb_data, bool done);
/*
- * Return true iff cli_writeq was called. This is compatible with the
+ * Return true iff atcp_writeq was called. This is compatible with the
* convention for cli continuation callbacks, so object_get_more can call us.
*/
static bool object_get_poke(struct client *cli)
@@ -1026,7 +1027,7 @@ static bool object_get_poke(struct client *cli)
if (bytes == 0) {
if (!cli->in_len) {
cli_in_end(cli);
- cli_write_start(cli);
+ atcp_write_start(&cli->wst);
}
free(buf);
return false;
@@ -1034,15 +1035,15 @@ static bool object_get_poke(struct client *cli)
cli->in_len -= bytes;
if (!cli->in_len) {
- if (cli_writeq(cli, buf, bytes, cli_cb_free, buf))
+ if (atcp_writeq(&cli->wst, buf, bytes, atcp_cb_free, buf))
goto err_out;
cli_in_end(cli);
- cli_write_start(cli);
+ atcp_write_start(&cli->wst);
} else {
- if (cli_writeq(cli, buf, bytes, object_get_more, buf))
+ if (atcp_writeq(&cli->wst, buf, bytes, object_get_more, buf))
goto err_out;
- if (cli_wqueued(cli) >= CLI_DATA_BUF_SZ)
- cli_write_start(cli);
+ if (atcp_wqueued(&cli->wst) >= CLI_DATA_BUF_SZ)
+ atcp_write_start(&cli->wst);
}
return true;
@@ -1053,8 +1054,9 @@ err_out:
}
/* callback from the client side: a queued write is being disposed */
-static bool object_get_more(struct client *cli, void *cb_data, bool done)
+static bool object_get_more(struct atcp_wr_state *wst, void *cb_data, bool done)
{
+ struct client *cli = wst->priv;
/* free now-written buffer */
free(cb_data);
@@ -1071,8 +1073,10 @@ static bool object_get_more(struct client *cli, void *cb_data, bool done)
/* callback from the chunkd side: some data is available */
static void object_get_event(struct open_chunk *ochunk)
{
- object_get_poke(ochunk->cli);
- cli_write_run_compl();
+ struct client *cli = ochunk->cli;
+
+ object_get_poke(cli);
+ atcp_write_run_compl(&cli->wst);
}
static int object_node_count_up(struct db_obj_ent *obj)
@@ -1327,7 +1331,7 @@ static bool object_get_body(struct client *cli, const char *user,
if (!want_body) {
cli_in_end(cli);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
@@ -1347,7 +1351,7 @@ static bool object_get_body(struct client *cli, const char *user,
if (!cli->in_len)
cli_in_end(cli);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
goto err_out_in_end;
@@ -1365,21 +1369,21 @@ static bool object_get_body(struct client *cli, const char *user,
goto err_out_in_end;
memcpy(tmp, buf, bytes);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
free(tmp);
return true;
}
- if (cli_writeq(cli, tmp, bytes,
- cli->in_len ? object_get_more : cli_cb_free, tmp))
+ if (atcp_writeq(&cli->wst, tmp, bytes,
+ cli->in_len ? object_get_more : atcp_cb_free, tmp))
goto err_out_in_end;
start_write:
free(obj);
g_string_free(extra_hdr, TRUE);
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_in_end:
cli_in_end(cli);
diff --git a/server/server.c b/server/server.c
index 3398026..6bdccad 100644
--- a/server/server.c
+++ b/server/server.c
@@ -56,8 +56,6 @@
const char *argp_program_version = PACKAGE_VERSION;
enum {
- CLI_MAX_WR_IOV = 32, /* max iov per writev(2) */
-
SFL_FOREGROUND = (1 << 0), /* run in foreground */
};
@@ -488,65 +486,25 @@ bool stat_status(struct client *cli, GList *content)
if (asprintf(&str,
"<p>Stats: "
- "poll %lu event %lu tcp_accept %lu opt_write %lu</p>\r\n"
- "<p>Debug: max_write_buf %lu</p>\r\n",
- tabled_srv.stats.poll,
- tabled_srv.stats.event,
- tabled_srv.stats.tcp_accept,
- tabled_srv.stats.opt_write,
- tabled_srv.stats.max_write_buf) < 0)
+ "poll %llu event %llu tcp_accept %llu opt_write %llu</p>\r\n"
+ "<p>Debug: max_write_buf %llu</p>\r\n",
+ (unsigned long long) tabled_srv.stats.poll,
+ (unsigned long long) tabled_srv.stats.event,
+ (unsigned long long) tabled_srv.stats.tcp_accept,
+ (unsigned long long) tabled_srv.stats.opt_write,
+ (unsigned long long) tabled_srv.stats.max_write_buf) < 0)
return false;
content = g_list_append(content, str);
return true;
}
-static void cli_write_complete(struct client *cli, struct client_write *tmp)
-{
- list_del(&tmp->node);
- list_add_tail(&tmp->node, &tabled_srv.write_compl_q);
-}
-
-static bool cli_write_free(struct client_write *tmp, bool done)
-{
- struct client *cli = tmp->cb_cli;
- bool rcb = false;
-
- cli->write_cnt -= tmp->length;
- list_del(&tmp->node);
- if (tmp->cb)
- rcb = tmp->cb(cli, tmp->cb_data, done);
- free(tmp);
-
- return rcb;
-}
-
-static void cli_write_free_all(struct client *cli)
-{
- struct client_write *wr, *tmp;
-
- cli_write_run_compl();
- list_for_each_entry_safe(wr, tmp, &cli->write_q, node) {
- cli_write_free(wr, false);
- }
-}
-
-bool cli_write_run_compl(void)
-{
- struct client_write *wr;
- bool do_loop;
-
- do_loop = false;
- while (!list_empty(&tabled_srv.write_compl_q)) {
- wr = list_entry(tabled_srv.write_compl_q.next,
- struct client_write, node);
- do_loop |= cli_write_free(wr, true);
- }
- return do_loop;
-}
-
static void cli_free(struct client *cli)
{
- cli_write_free_all(cli);
+ if (cli->wst.write_cnt_max > tabled_srv.stats.max_write_buf)
+ tabled_srv.stats.max_write_buf = cli->wst.write_cnt_max;
+ tabled_srv.stats.opt_write += cli->wst.opt_write;
+
+ atcp_wr_exit(&cli->wst);
cli_out_end(cli);
cli_in_end(cli);
@@ -561,27 +519,28 @@ static void cli_free(struct client *cli)
hreq_free(&cli->req);
- if (cli->write_cnt_max > tabled_srv.stats.max_write_buf)
- tabled_srv.stats.max_write_buf = cli->write_cnt_max;
-
if (debugging)
applog(LOG_INFO, "client %s ended", cli->addr_host);
free(cli);
}
-static bool cli_evt_dispose(struct client *cli, unsigned int events)
+static bool cli_evt_dispose(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
/* if write queue is not empty, we should continue to get
* poll callbacks here until it is
*/
- if (list_empty(&cli->write_q))
+ if (atcp_wq_empty(&cli->wst)) {
cli_free(cli);
+ *invalidate_cli = true;
+ }
return false;
}
-static bool cli_evt_recycle(struct client *cli, unsigned int events)
+static bool cli_evt_recycle(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
unsigned int slop;
@@ -608,134 +567,6 @@ static bool cli_evt_recycle(struct client *cli, unsigned int events)
return true;
}
-static void cli_writable(struct client *cli)
-{
- int n_iov;
- struct client_write *tmp;
- ssize_t rc;
- struct iovec iov[CLI_MAX_WR_IOV];
-
- /* accumulate pending writes into iovec */
- n_iov = 0;
- list_for_each_entry(tmp, &cli->write_q, node) {
- if (n_iov == CLI_MAX_WR_IOV)
- break;
- /* bleh, struct iovec should declare iov_base const */
- iov[n_iov].iov_base = (void *) tmp->buf;
- iov[n_iov].iov_len = tmp->togo;
- n_iov++;
- }
-
- /* execute non-blocking write */
-do_write:
- rc = writev(cli->fd, iov, n_iov);
- if (rc < 0) {
- if (errno == EINTR)
- goto do_write;
- if (errno != EAGAIN)
- goto err_out;
- return;
- }
-
- /* iterate through write queue, issuing completions based on
- * amount of data written
- */
- while (rc > 0) {
- int sz;
-
- /* get pointer to first record on list */
- tmp = list_entry(cli->write_q.next, struct client_write, node);
-
- /* mark data consumed by decreasing tmp->len */
- sz = (tmp->togo < rc) ? tmp->togo : rc;
- tmp->togo -= sz;
- tmp->buf += sz;
- rc -= sz;
-
- /* if tmp->len reaches zero, write is complete,
- * so schedule it for clean up (cannot call callback
- * right away or an endless recursion will result)
- */
- if (tmp->togo == 0)
- cli_write_complete(cli, tmp);
- }
-
- /* if we emptied the queue, clear write notification */
- if (list_empty(&cli->write_q)) {
- cli->writing = false;
- if (event_del(&cli->write_ev) < 0) {
- applog(LOG_WARNING, "cli_writable event_del");
- goto err_out;
- }
- }
-
- return;
-
-err_out:
- cli->state = evt_dispose;
- cli_write_free_all(cli);
-}
-
-bool cli_write_start(struct client *cli)
-{
- if (list_empty(&cli->write_q))
- return true; /* loop, not poll */
-
- /* if write-poll already active, nothing further to do */
- if (cli->writing)
- return false; /* poll wait */
-
- /* attempt optimistic write, in hopes of avoiding poll,
- * or at least refill the write buffers so as to not
- * get -immediately- called again by the kernel
- */
- cli_writable(cli);
- if (list_empty(&cli->write_q)) {
- tabled_srv.stats.opt_write++;
- return true; /* loop, not poll */
- }
-
- if (event_add(&cli->write_ev, NULL) < 0) {
- applog(LOG_WARNING, "cli_write event_add");
- return true; /* loop, not poll */
- }
-
- cli->writing = true;
-
- return false; /* poll wait */
-}
-
-int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
- cli_write_func cb, void *cb_data)
-{
- struct client_write *wr;
-
- if (!buf || !buflen)
- return -EINVAL;
-
- wr = calloc(1, sizeof(struct client_write));
- if (!wr)
- return -ENOMEM;
-
- wr->buf = buf;
- wr->togo = buflen;
- wr->length = buflen;
- wr->cb = cb;
- wr->cb_data = cb_data;
- wr->cb_cli = cli;
- list_add_tail(&wr->node, &cli->write_q);
- cli->write_cnt += buflen;
- if (cli->write_cnt > cli->write_cnt_max)
- cli->write_cnt_max = cli->write_cnt;
-
- return 0;
-}
-
-size_t cli_wqueued(struct client *cli)
-{
- return cli->write_cnt;
-}
-
/*
* Return:
* 0: progress was NOT made (EOF)
@@ -771,12 +602,6 @@ do_read:
return rc != 0;
}
-bool cli_cb_free(struct client *cli, void *cb_data, bool done)
-{
- free(cb_data);
- return false;
-}
-
static int cli_write_list(struct client *cli, GList *list)
{
int rc = 0;
@@ -784,8 +609,8 @@ static int cli_write_list(struct client *cli, GList *list)
tmp = list;
while (tmp) {
- rc = cli_writeq(cli, tmp->data, strlen(tmp->data),
- cli_cb_free, tmp->data);
+ rc = atcp_writeq(&cli->wst, tmp->data, strlen(tmp->data),
+ atcp_cb_free, tmp->data);
if (rc)
goto out;
@@ -870,14 +695,14 @@ bool cli_err_write(struct client *cli, char *hdr, char *content)
cli->state = evt_dispose;
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc)
return true;
- rc = cli_writeq(cli, content, strlen(content), cli_cb_free, content);
+ rc = atcp_writeq(&cli->wst, content, strlen(content), atcp_cb_free, content);
if (rc)
return true;
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
}
static bool cli_resp(struct client *cli, int http_status,
@@ -911,7 +736,7 @@ static bool cli_resp(struct client *cli, int http_status,
else
cli->state = evt_recycle;
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
cli->state = evt_dispose;
@@ -924,7 +749,7 @@ static bool cli_resp(struct client *cli, int http_status,
return true;
}
- rcb = cli_write_start(cli);
+ rcb = atcp_write_start(&cli->wst);
if (cli->state == evt_recycle)
return true;
@@ -942,7 +767,8 @@ bool cli_resp_html(struct client *cli, int http_status, GList *content)
return cli_resp(cli, http_status, "text/html", content);
}
-static bool cli_evt_http_req(struct client *cli, unsigned int events)
+static bool cli_evt_http_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
struct http_req *req = &cli->req;
char *host, *auth, *content_len_str;
@@ -1195,7 +1021,8 @@ err_out:
return true;
}
-static bool cli_evt_parse_hdr(struct client *cli, unsigned int events)
+static bool cli_evt_parse_hdr(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
char *buf, *buf_eol;
bool eoh = false;
@@ -1252,7 +1079,8 @@ static bool cli_evt_parse_hdr(struct client *cli, unsigned int events)
return true;
}
-static bool cli_evt_read_hdr(struct client *cli, unsigned int events)
+static bool cli_evt_read_hdr(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
int rc = cli_read(cli);
if (rc <= 0) {
@@ -1268,7 +1096,8 @@ static bool cli_evt_read_hdr(struct client *cli, unsigned int events)
return true;
}
-static bool cli_evt_parse_req(struct client *cli, unsigned int events)
+static bool cli_evt_parse_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
char *sp1, *sp2, *buf;
enum errcode err_resp;
@@ -1336,7 +1165,8 @@ err_out:
return cli_err(cli, err_resp);
}
-static bool cli_evt_read_req(struct client *cli, unsigned int events)
+static bool cli_evt_read_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
int rc = cli_read(cli);
if (rc <= 0) {
@@ -1385,9 +1215,10 @@ static struct client *cli_alloc(bool is_status)
return NULL;
}
+ atcp_wr_init(&cli->wst, cli);
+
cli->state = evt_read_req;
cli->evt_table = is_status? evt_funcs_status: evt_funcs_server;
- INIT_LIST_HEAD(&cli->write_q);
INIT_LIST_HEAD(&cli->out_ch);
cli->req_ptr = cli->req_buf;
memset(&cli->req, 0, sizeof(cli->req) - sizeof(cli->req.hdr));
@@ -1395,22 +1226,20 @@ static struct client *cli_alloc(bool is_status)
return cli;
}
-static void tcp_cli_wr_event(int fd, short events, void *userdata)
-{
- struct client *cli = userdata;
-
- cli_writable(cli);
- cli_write_run_compl();
-}
-
static void tcp_cli_event(int fd, short events, void *userdata)
{
struct client *cli = userdata;
bool loop;
+ bool invalidate_cli = false;
do {
- loop = cli->evt_table[cli->state](cli, events);
- loop |= cli_write_run_compl();
+ loop = cli->evt_table[cli->state](cli, events, &invalidate_cli);
+ if (invalidate_cli) {
+ cli = NULL;
+ break;
+ }
+
+ loop |= atcp_write_run_compl(&cli->wst);
} while (loop);
}
@@ -1438,11 +1267,11 @@ static void tcp_srv_event(int fd, short events, void *userdata)
goto err_out;
}
+ atcp_wr_set_fd(&cli->wst, cli->fd);
+
tabled_srv.stats.tcp_accept++;
event_set(&cli->ev, cli->fd, EV_READ | EV_PERSIST, tcp_cli_event, cli);
- event_set(&cli->write_ev, cli->fd, EV_WRITE | EV_PERSIST,
- tcp_cli_wr_event, cli);
/* mark non-blocking, for upcoming poll use */
if (fsetflags("tcp client", cli->fd, O_NONBLOCK) < 0)
@@ -2205,7 +2034,6 @@ int main (int argc, char *argv[])
struct event_base *event_base_rep;
INIT_LIST_HEAD(&tabled_srv.all_stor);
- INIT_LIST_HEAD(&tabled_srv.write_compl_q);
tabled_srv.state_tdb = ST_TDB_INIT;
tabled_srv.rep_next_id = DBID_MIN;
diff --git a/server/status.c b/server/status.c
index e9fbb38..9af60f7 100644
--- a/server/status.c
+++ b/server/status.c
@@ -150,7 +150,8 @@ out_err:
return cli_err(cli, InternalError);
}
-bool stat_evt_http_req(struct client *cli, unsigned int events)
+bool stat_evt_http_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
struct http_req *req = &cli->req;
char *method = req->method;
diff --git a/server/tabled.h b/server/tabled.h
index d4d2048..08fd9ad 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -31,6 +31,7 @@
#include <elist.h>
#include <tdb.h>
#include <hail_log.h>
+#include "atcp.h"
#ifndef ARRAY_SIZE
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
@@ -102,20 +103,8 @@ struct storage_node {
int ref; /* number of open_chunk or other */
};
-typedef bool (*cli_evt_func)(struct client *, unsigned int);
-typedef bool (*cli_write_func)(struct client *, void *, bool);
-
-struct client_write {
- const void *buf; /* write buffer pointer */
- int togo; /* write buffer remainder */
-
- int length; /* length for accounting */
- cli_write_func cb; /* callback */
- void *cb_data; /* data passed to cb */
- struct client *cb_cli; /* cli passed to cb */
-
- struct list_head node;
-};
+typedef bool (*cli_evt_func)(struct client *, unsigned int,
+ bool *invalidate_cli);
/* an open chunkd client */
struct open_chunk {
@@ -163,13 +152,9 @@ struct client {
int fd; /* socket */
bool ev_active;
struct event ev;
- struct event write_ev;
- struct list_head write_q; /* list of async writes */
- size_t write_cnt; /* water level */
- bool writing;
+ struct atcp_wr_state wst;
/* some debugging stats */
- size_t write_cnt_max;
unsigned int req_used; /* amount of req_buf in use */
char *req_ptr; /* start of unexamined data */
@@ -216,12 +201,12 @@ enum st_net {
};
struct server_stats {
- unsigned long poll; /* number polls */
- unsigned long event; /* events dispatched */
- unsigned long tcp_accept; /* TCP accepted cxns */
- unsigned long opt_write; /* optimistic writes */
+ uint64_t poll; /* number polls */
+ uint64_t event; /* events dispatched */
+ uint64_t tcp_accept; /* TCP accepted cxns */
+ uint64_t opt_write; /* optimistic writes */
- unsigned long max_write_buf;
+ uint64_t max_write_buf;
};
#define DBID_NONE 0
@@ -249,7 +234,6 @@ struct server {
struct event_base *evbase_main;
int ev_pipe[2];
struct event pevt;
- struct list_head write_compl_q; /* list of done writes */
bool mc_delay;
struct event mc_timer;
@@ -361,7 +345,8 @@ extern bool object_put(struct client *cli, const char *user, const char *bucket,
const char *key, long content_len, bool expect_cont);
extern bool object_get(struct client *cli, const char *user, const char *bucket,
const char *key, bool want_body);
-extern bool cli_evt_http_data_in(struct client *cli, unsigned int events);
+extern bool cli_evt_http_data_in(struct client *cli, unsigned int events,
+ bool *invalidate_cli);
extern void cli_out_end(struct client *cli);
extern void cli_in_end(struct client *cli);
@@ -398,12 +383,6 @@ extern bool cli_err(struct client *cli, enum errcode code);
extern bool cli_err_write(struct client *cli, char *hdr, char *content);
extern bool cli_resp_xml(struct client *cli, int http_status, GList *content);
extern bool cli_resp_html(struct client *cli, int http_status, GList *content);
-extern int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
- cli_write_func cb, void *cb_data);
-extern size_t cli_wqueued(struct client *cli);
-extern bool cli_cb_free(struct client *cli, void *cb_data, bool done);
-extern bool cli_write_start(struct client *cli);
-extern bool cli_write_run_compl(void);
extern int cli_req_avail(struct client *cli);
extern void applog(int prio, const char *fmt, ...);
extern void cld_update_cb(void);
@@ -415,7 +394,8 @@ extern struct db_remote *tdb_find_remote_byname(const char *name);
extern struct db_remote *tdb_find_remote_byid(int id);
/* status.c */
-extern bool stat_evt_http_req(struct client *cli, unsigned int events);
+extern bool stat_evt_http_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli);
/* config.c */
extern void read_config(void);
^ permalink raw reply related [flat|nested] 11+ messages in thread
* Re: [tabled patch] abstract out TCP-write code
2010-09-23 15:28 ` Jim Meyering
@ 2010-09-23 23:48 ` Jeff Garzik
0 siblings, 0 replies; 11+ messages in thread
From: Jeff Garzik @ 2010-09-23 23:48 UTC (permalink / raw)
To: Jim Meyering; +Cc: Pete Zaitcev, hail-devel
On 09/23/2010 11:28 AM, Jim Meyering wrote:
> Every developer should have MALLOC_PERTURB_=N (N in 1..255) set in
> his/her environment on glibc-based systems. Almost all the time.
I heard about it a while ago, and even submitted a Bugzilla bug to have it
documented adequately. But apparently it's absent from my .bash_profile.
Added.
Jeff
^ permalink raw reply [flat|nested] 11+ messages in thread
* Re: [tabled patch] abstract out TCP-write code
2010-09-23 2:37 ` Pete Zaitcev
2010-09-23 4:32 ` Jeff Garzik
@ 2010-09-23 23:51 ` Jeff Garzik
1 sibling, 0 replies; 11+ messages in thread
From: Jeff Garzik @ 2010-09-23 23:51 UTC (permalink / raw)
To: Pete Zaitcev; +Cc: hail-devel
On 09/22/2010 10:37 PM, Pete Zaitcev wrote:
> On Wed, 22 Sep 2010 21:26:13 -0400
> Jeff Garzik<jeff@garzik.org> wrote:
>> It is a common idiom even in GLib that callbacks receive two anonymous
>> pointers; witness the data type GFunc's 'data' and 'user_data'
>> arguments:
>> http://library.gnome.org/devel/glib/stable/glib-Doubly-Linked-Lists.html#GFunc
>
> There's a lot of retarged garbage in Glib, just look at their lists.
> If someone smarter wrote Glib, we would not need struct list_head.
I use both list types, because there's a use case for both. You don't
always have the luxury of having a struct in which to embed data+next
pointers. Allocated strings are an excellent example.
GFunc has two parameters for a reason :) See for example
http://library.gnome.org/devel/glib/stable/glib-Doubly-Linked-Lists.html#g-list-foreach
It really is a common idiom, based on a common need, not just my style
preference. :)
Jeff
^ permalink raw reply [flat|nested] 11+ messages in thread
end of thread, other threads:[~2010-09-23 23:51 UTC | newest]
Thread overview: 11+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2010-09-23 0:09 [tabled patch] abstract out TCP-write code Jeff Garzik
2010-09-23 0:28 ` Pete Zaitcev
2010-09-23 1:26 ` Jeff Garzik
2010-09-23 2:37 ` Pete Zaitcev
2010-09-23 4:32 ` Jeff Garzik
2010-09-23 13:57 ` Pete Zaitcev
2010-09-23 15:28 ` Jim Meyering
2010-09-23 23:48 ` Jeff Garzik
2010-09-23 16:47 ` Jeff Garzik
2010-09-23 23:51 ` Jeff Garzik
2010-09-23 21:09 ` [tabled patch v2] " Jeff Garzik
This is an external index of several public inboxes;
see the mirroring instructions for how to clone and mirror
all data and code used by this external index.