* [PATCH] libhail: add async TCP network writing API, atcp_wr*
@ 2010-09-24 0:32 Jeff Garzik
2010-09-24 0:34 ` [PATCH] tabled: use libhail's anet Jeff Garzik
2010-09-24 0:35 ` [PATCH] itd: " Jeff Garzik
0 siblings, 2 replies; 4+ messages in thread
From: Jeff Garzik @ 2010-09-24 0:32 UTC (permalink / raw)
To: hail-devel
Just committed into libhail... renamed the include to 'anet.h' for
'asynchronous networking'.
include/Makefile.am | 2
include/anet.h | 111 +++++++++++++++++++++++
lib/Makefile.am | 1
lib/atcp.c | 241 ++++++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 354 insertions(+), 1 deletion(-)
commit 22de683a8f0566852818fac8b54ca26ae46490f0
Author: Jeff Garzik <jeff@garzik.org>
Date: Thu Sep 23 20:17:56 2010 -0400
libhail: add async TCP network writing API, atcp_wr*
Signed-off-by: Jeff Garzik <jgarzik@redhat.com>
diff --git a/include/Makefile.am b/include/Makefile.am
index 234cf8a..967352a 100644
--- a/include/Makefile.am
+++ b/include/Makefile.am
@@ -5,5 +5,5 @@ EXTRA_DIST = \
include_HEADERS = \
cldc.h cld_common.h ncld.h chunkc.h chunk_msg.h \
- hail_log.h hstor.h
+ hail_log.h hstor.h anet.h
diff --git a/include/anet.h b/include/anet.h
new file mode 100644
index 0000000..5c216c7
--- /dev/null
+++ b/include/anet.h
@@ -0,0 +1,111 @@
+#ifndef __ANET_H__
+#define __ANET_H__
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#include <stdint.h>
+#include <stdbool.h>
+#include <sys/time.h>
+#include <elist.h>
+
+enum {
+ ATCP_MAX_WR_IOV = 32, /* max iov per writev(2) */
+};
+
+typedef void (*atcp_ev_func)(int, short, void *);
+
+struct atcp_wr_ops {
+ int (*ev_wset)(void *, int, atcp_ev_func, void *);
+ int (*ev_add)(void *, const struct timeval *);
+ int (*ev_del)(void *);
+};
+
+struct atcp_wr_state {
+ int fd; /* our socket */
+
+ bool writing; /* actively trying to write? */
+
+ size_t write_cnt; /* water level */
+ size_t write_cnt_max;
+
+ struct list_head write_q; /* list of async writes */
+ struct list_head write_compl_q; /* list of done writes */
+
+ void *priv; /* untouched by atcp */
+
+ /* various statistics */
+ uint64_t opt_write; /* optimistic writes */
+
+ const struct atcp_wr_ops *ops;
+ void *ev_info; /* passed to ops->ev_* */
+};
+
+typedef bool (*atcp_write_func)(struct atcp_wr_state *, void *, bool);
+
+struct atcp_write {
+ const void *buf; /* write buffer pointer */
+ int togo; /* write buffer remainder */
+
+ int length; /* length for accounting */
+ atcp_write_func cb; /* callback */
+ void *cb_data; /* data passed to cb */
+
+ struct atcp_wr_state *wst; /* our parent */
+
+ struct list_head node; /* write_[compl_]q list node */
+};
+
+/* setup and teardown atcp write state */
+extern void atcp_wr_exit(struct atcp_wr_state *wst);
+extern void atcp_wr_init(struct atcp_wr_state *wst,
+ const struct atcp_wr_ops *ops, void *ev_info,
+ void *priv);
+
+/* generic write callback that calls free(cb_data) */
+extern bool atcp_cb_free(struct atcp_wr_state *wst, void *cb_data, bool done);
+
+/* clear all write queues immediately, even if not complete */
+extern void atcp_write_free_all(struct atcp_wr_state *wst);
+
+/* complete all writes found on completion queue */
+extern bool atcp_write_run_compl(struct atcp_wr_state *wst);
+
+/* initialize internal fd, event setup */
+extern void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd);
+
+/* add a buffer to the write queue */
+extern int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen,
+ atcp_write_func cb, void *cb_data);
+
+/* begin pushing write queue to socket */
+extern bool atcp_write_start(struct atcp_wr_state *wst);
+
+/* is anything on the write queue at the moment? */
+static inline bool atcp_wq_empty(struct atcp_wr_state *wst)
+{
+ return list_empty(&wst->write_q) ? true : false;
+}
+
+/* total number of octets queued at this moment */
+static inline size_t atcp_wqueued(struct atcp_wr_state *wst)
+{
+ return wst->write_cnt;
+}
+
+#endif /* __ANET_H__ */
diff --git a/lib/Makefile.am b/lib/Makefile.am
index f7b27ff..616b881 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -21,6 +21,7 @@ LINK = $(LIBTOOL) --mode=link $(CC) $(CFLAGS) $(LDFLAGS) -o $@
lib_LTLIBRARIES = libhail.la
libhail_la_SOURCES = \
+ atcp.c \
cldc.c \
cldc-udp.c \
cldc-dns.c \
diff --git a/lib/atcp.c b/lib/atcp.c
new file mode 100644
index 0000000..dfdb954
--- /dev/null
+++ b/lib/atcp.c
@@ -0,0 +1,241 @@
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#include "hail-config.h"
+
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <sys/uio.h>
+#include <anet.h>
+
+bool atcp_cb_free(struct atcp_wr_state *wst, void *cb_data, bool done)
+{
+ free(cb_data);
+ return false;
+}
+
+static void atcp_write_complete(struct atcp_write *tmp)
+{
+ struct atcp_wr_state *wst = tmp->wst;
+
+ list_del(&tmp->node);
+ list_add_tail(&tmp->node, &wst->write_compl_q);
+}
+
+static bool atcp_write_free(struct atcp_write *tmp, bool done)
+{
+ struct atcp_wr_state *wst = tmp->wst;
+ bool rcb = false;
+
+ wst->write_cnt -= tmp->length;
+ list_del_init(&tmp->node);
+ if (tmp->cb)
+ rcb = tmp->cb(wst, tmp->cb_data, done);
+ free(tmp);
+
+ return rcb;
+}
+
+bool atcp_write_run_compl(struct atcp_wr_state *wst)
+{
+ struct atcp_write *wr;
+ bool do_loop;
+
+ do_loop = false;
+ while (!list_empty(&wst->write_compl_q)) {
+ wr = list_entry(wst->write_compl_q.next,
+ struct atcp_write, node);
+ do_loop |= atcp_write_free(wr, true);
+ }
+ return do_loop;
+}
+
+void atcp_write_free_all(struct atcp_wr_state *wst)
+{
+ struct atcp_write *wr, *tmp;
+
+ atcp_write_run_compl(wst);
+ list_for_each_entry_safe(wr, tmp, &wst->write_q, node) {
+ atcp_write_free(wr, false);
+ }
+}
+
+static bool atcp_writable(struct atcp_wr_state *wst)
+{
+ int n_iov;
+ struct atcp_write *tmp;
+ ssize_t rc;
+ struct iovec iov[ATCP_MAX_WR_IOV];
+
+ /* accumulate pending writes into iovec */
+ n_iov = 0;
+ list_for_each_entry(tmp, &wst->write_q, node) {
+ if (n_iov == ATCP_MAX_WR_IOV)
+ break;
+ /* bleh, struct iovec should declare iov_base const */
+ iov[n_iov].iov_base = (void *) tmp->buf;
+ iov[n_iov].iov_len = tmp->togo;
+ n_iov++;
+ }
+
+ /* execute non-blocking write */
+do_write:
+ rc = writev(wst->fd, iov, n_iov);
+ if (rc < 0) {
+ if (errno == EINTR)
+ goto do_write;
+ if (errno != EAGAIN)
+ goto err_out;
+ return true;
+ }
+
+ /* iterate through write queue, issuing completions based on
+ * amount of data written
+ */
+ while (rc > 0) {
+ int sz;
+
+ /* get pointer to first record on list */
+ tmp = list_entry(wst->write_q.next, struct atcp_write, node);
+
+ /* mark data consumed by decreasing tmp->togo */
+ sz = (tmp->togo < rc) ? tmp->togo : rc;
+ tmp->togo -= sz;
+ tmp->buf += sz;
+ rc -= sz;
+
+ /* if tmp->togo reaches zero, write is complete,
+ * so schedule it for clean up (cannot call callback
+ * right away or an endless recursion will result)
+ */
+ if (tmp->togo == 0)
+ atcp_write_complete(tmp);
+ }
+
+ /* if we emptied the queue, clear write notification */
+ if (atcp_wq_empty(wst)) {
+ wst->writing = false;
+ if (wst->ops->ev_del(wst->ev_info) < 0)
+ goto err_out;
+ }
+
+ return true;
+
+err_out:
+ atcp_write_free_all(wst);
+ return false;
+}
+
+static void atcp_wr_event(int fd, short events, void *userdata)
+{
+ struct atcp_wr_state *wst = userdata;
+
+ atcp_writable(wst);
+ atcp_write_run_compl(wst);
+}
+
+void atcp_wr_set_fd(struct atcp_wr_state *wst, int fd)
+{
+ wst->fd = fd;
+
+ wst->ops->ev_wset(wst->ev_info, wst->fd,
+ atcp_wr_event, wst);
+}
+
+bool atcp_write_start(struct atcp_wr_state *wst)
+{
+ if (atcp_wq_empty(wst))
+ return true; /* loop, not poll */
+
+ /* if write-poll already active, nothing further to do */
+ if (wst->writing)
+ return false; /* poll wait */
+
+ /* attempt optimistic write, in hopes of avoiding poll,
+ * or at least refill the write buffers so as to not
+ * get -immediately- called again by the kernel
+ */
+ atcp_writable(wst);
+ if (atcp_wq_empty(wst)) {
+ wst->opt_write++;
+ return true; /* loop, not poll */
+ }
+
+ if (wst->ops->ev_add(wst->ev_info, NULL) < 0)
+ return true; /* loop, not poll */
+
+ wst->writing = true;
+
+ return false; /* poll wait */
+}
+
+int atcp_writeq(struct atcp_wr_state *wst, const void *buf, unsigned int buflen,
+ atcp_write_func cb, void *cb_data)
+{
+ struct atcp_write *wr;
+
+ if (!buf || !buflen)
+ return -EINVAL;
+
+ wr = calloc(1, sizeof(struct atcp_write));
+ if (!wr)
+ return -ENOMEM;
+
+ wr->buf = buf;
+ wr->togo = buflen;
+ wr->length = buflen;
+ wr->cb = cb;
+ wr->cb_data = cb_data;
+ wr->wst = wst;
+ list_add_tail(&wr->node, &wst->write_q);
+ wst->write_cnt += buflen;
+ if (wst->write_cnt > wst->write_cnt_max)
+ wst->write_cnt_max = wst->write_cnt;
+
+ return 0;
+}
+
+void atcp_wr_exit(struct atcp_wr_state *wst)
+{
+ if (!wst)
+ return;
+
+ if (wst->writing)
+ wst->ops->ev_del(wst->ev_info);
+
+ atcp_write_free_all(wst);
+}
+
+void atcp_wr_init(struct atcp_wr_state *wst,
+ const struct atcp_wr_ops *ops, void *ev_info,
+ void *priv)
+{
+ memset(wst, 0, sizeof(*wst));
+
+ INIT_LIST_HEAD(&wst->write_q);
+ INIT_LIST_HEAD(&wst->write_compl_q);
+
+ wst->fd = -1;
+
+ wst->ops = ops;
+ wst->ev_info = ev_info;
+ wst->priv = priv;
+}
+
^ permalink raw reply related [flat|nested] 4+ messages in thread
* [PATCH] tabled: use libhail's anet
2010-09-24 0:32 [PATCH] libhail: add async TCP network writing API, atcp_wr* Jeff Garzik
@ 2010-09-24 0:34 ` Jeff Garzik
2010-09-24 0:35 ` [PATCH] itd: " Jeff Garzik
1 sibling, 0 replies; 4+ messages in thread
From: Jeff Garzik @ 2010-09-24 0:34 UTC (permalink / raw)
To: hail-devel
Updated for move to libhail. Not committed on main branch until some
more time passes.
server/bucket.c | 8 -
server/object.c | 56 +++++-----
server/server.c | 294 ++++++++++++++------------------------------------------
server/status.c | 3
server/tabled.h | 45 ++------
5 files changed, 123 insertions(+), 283 deletions(-)
diff --git a/server/bucket.c b/server/bucket.c
index eb03e03..a81eca1 100644
--- a/server/bucket.c
+++ b/server/bucket.c
@@ -566,13 +566,13 @@ bool bucket_add(struct client *cli, const char *user, const char *bucket)
bucket) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
@@ -718,13 +718,13 @@ bool bucket_del(struct client *cli, const char *user, const char *bucket)
hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
diff --git a/server/object.c b/server/object.c
index 3801e94..64cc8b6 100644
--- a/server/object.c
+++ b/server/object.c
@@ -227,13 +227,13 @@ bool object_del(struct client *cli, const char *user,
hutil_time2str(timestr, sizeof(timestr), time(NULL))) < 0)
return cli_err(cli, InternalError);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out:
rc = txn->abort(txn);
@@ -525,13 +525,13 @@ static bool object_put_end(struct client *cli)
return cli_err(cli, InternalError);
}
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_rb:
rc = txn->abort(txn);
@@ -618,7 +618,8 @@ static int object_put_buf(struct client *cli, struct open_chunk *ochunk,
return 0;
}
-bool cli_evt_http_data_in(struct client *cli, unsigned int events)
+bool cli_evt_http_data_in(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
ssize_t avail;
struct open_chunk *ochunk;
@@ -812,8 +813,8 @@ static bool object_put_body(struct client *cli, const char *user,
cli_out_end(cli);
return cli_err(cli, InternalError);
}
- cli_writeq(cli, cont, strlen(cont), cli_cb_free, cont);
- cli_write_start(cli);
+ atcp_writeq(&cli->wst, cont, strlen(cont), atcp_cb_free, cont);
+ atcp_write_start(&cli->wst);
}
avail = MIN(cli_req_avail(cli), content_len);
@@ -940,13 +941,13 @@ static bool object_put_acls(struct client *cli, const char *user,
return cli_err(cli, InternalError);
}
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
}
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_rb:
rc = txn->abort(txn);
@@ -990,10 +991,10 @@ void cli_in_end(struct client *cli)
cli->in_len = 0;
}
-static bool object_get_more(struct client *cli, void *cb_data, bool done);
+static bool object_get_more(struct atcp_wr_state *wst, void *cb_data, bool done);
/*
- * Return true iff cli_writeq was called. This is compatible with the
+ * Return true iff atcp_writeq was called. This is compatible with the
* convention for cli continuation callbacks, so object_get_more can call us.
*/
static bool object_get_poke(struct client *cli)
@@ -1026,7 +1027,7 @@ static bool object_get_poke(struct client *cli)
if (bytes == 0) {
if (!cli->in_len) {
cli_in_end(cli);
- cli_write_start(cli);
+ atcp_write_start(&cli->wst);
}
free(buf);
return false;
@@ -1034,15 +1035,15 @@ static bool object_get_poke(struct client *cli)
cli->in_len -= bytes;
if (!cli->in_len) {
- if (cli_writeq(cli, buf, bytes, cli_cb_free, buf))
+ if (atcp_writeq(&cli->wst, buf, bytes, atcp_cb_free, buf))
goto err_out;
cli_in_end(cli);
- cli_write_start(cli);
+ atcp_write_start(&cli->wst);
} else {
- if (cli_writeq(cli, buf, bytes, object_get_more, buf))
+ if (atcp_writeq(&cli->wst, buf, bytes, object_get_more, buf))
goto err_out;
- if (cli_wqueued(cli) >= CLI_DATA_BUF_SZ)
- cli_write_start(cli);
+ if (atcp_wqueued(&cli->wst) >= CLI_DATA_BUF_SZ)
+ atcp_write_start(&cli->wst);
}
return true;
@@ -1053,8 +1054,9 @@ err_out:
}
/* callback from the client side: a queued write is being disposed */
-static bool object_get_more(struct client *cli, void *cb_data, bool done)
+static bool object_get_more(struct atcp_wr_state *wst, void *cb_data, bool done)
{
+ struct client *cli = wst->priv;
/* free now-written buffer */
free(cb_data);
@@ -1071,8 +1073,10 @@ static bool object_get_more(struct client *cli, void *cb_data, bool done)
/* callback from the chunkd side: some data is available */
static void object_get_event(struct open_chunk *ochunk)
{
- object_get_poke(ochunk->cli);
- cli_write_run_compl();
+ struct client *cli = ochunk->cli;
+
+ object_get_poke(cli);
+ atcp_write_run_compl(&cli->wst);
}
static int object_node_count_up(struct db_obj_ent *obj)
@@ -1327,7 +1331,7 @@ static bool object_get_body(struct client *cli, const char *user,
if (!want_body) {
cli_in_end(cli);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
return true;
@@ -1347,7 +1351,7 @@ static bool object_get_body(struct client *cli, const char *user,
if (!cli->in_len)
cli_in_end(cli);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
goto err_out_in_end;
@@ -1365,21 +1369,21 @@ static bool object_get_body(struct client *cli, const char *user,
goto err_out_in_end;
memcpy(tmp, buf, bytes);
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
free(tmp);
return true;
}
- if (cli_writeq(cli, tmp, bytes,
- cli->in_len ? object_get_more : cli_cb_free, tmp))
+ if (atcp_writeq(&cli->wst, tmp, bytes,
+ cli->in_len ? object_get_more : atcp_cb_free, tmp))
goto err_out_in_end;
start_write:
free(obj);
g_string_free(extra_hdr, TRUE);
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
err_out_in_end:
cli_in_end(cli);
diff --git a/server/server.c b/server/server.c
index 3398026..a4c9928 100644
--- a/server/server.c
+++ b/server/server.c
@@ -56,8 +56,6 @@
const char *argp_program_version = PACKAGE_VERSION;
enum {
- CLI_MAX_WR_IOV = 32, /* max iov per writev(2) */
-
SFL_FOREGROUND = (1 << 0), /* run in foreground */
};
@@ -488,65 +486,25 @@ bool stat_status(struct client *cli, GList *content)
if (asprintf(&str,
"<p>Stats: "
- "poll %lu event %lu tcp_accept %lu opt_write %lu</p>\r\n"
- "<p>Debug: max_write_buf %lu</p>\r\n",
- tabled_srv.stats.poll,
- tabled_srv.stats.event,
- tabled_srv.stats.tcp_accept,
- tabled_srv.stats.opt_write,
- tabled_srv.stats.max_write_buf) < 0)
+ "poll %llu event %llu tcp_accept %llu opt_write %llu</p>\r\n"
+ "<p>Debug: max_write_buf %llu</p>\r\n",
+ (unsigned long long) tabled_srv.stats.poll,
+ (unsigned long long) tabled_srv.stats.event,
+ (unsigned long long) tabled_srv.stats.tcp_accept,
+ (unsigned long long) tabled_srv.stats.opt_write,
+ (unsigned long long) tabled_srv.stats.max_write_buf) < 0)
return false;
content = g_list_append(content, str);
return true;
}
-static void cli_write_complete(struct client *cli, struct client_write *tmp)
-{
- list_del(&tmp->node);
- list_add_tail(&tmp->node, &tabled_srv.write_compl_q);
-}
-
-static bool cli_write_free(struct client_write *tmp, bool done)
-{
- struct client *cli = tmp->cb_cli;
- bool rcb = false;
-
- cli->write_cnt -= tmp->length;
- list_del(&tmp->node);
- if (tmp->cb)
- rcb = tmp->cb(cli, tmp->cb_data, done);
- free(tmp);
-
- return rcb;
-}
-
-static void cli_write_free_all(struct client *cli)
-{
- struct client_write *wr, *tmp;
-
- cli_write_run_compl();
- list_for_each_entry_safe(wr, tmp, &cli->write_q, node) {
- cli_write_free(wr, false);
- }
-}
-
-bool cli_write_run_compl(void)
-{
- struct client_write *wr;
- bool do_loop;
-
- do_loop = false;
- while (!list_empty(&tabled_srv.write_compl_q)) {
- wr = list_entry(tabled_srv.write_compl_q.next,
- struct client_write, node);
- do_loop |= cli_write_free(wr, true);
- }
- return do_loop;
-}
-
static void cli_free(struct client *cli)
{
- cli_write_free_all(cli);
+ if (cli->wst.write_cnt_max > tabled_srv.stats.max_write_buf)
+ tabled_srv.stats.max_write_buf = cli->wst.write_cnt_max;
+ tabled_srv.stats.opt_write += cli->wst.opt_write;
+
+ atcp_wr_exit(&cli->wst);
cli_out_end(cli);
cli_in_end(cli);
@@ -561,27 +519,28 @@ static void cli_free(struct client *cli)
hreq_free(&cli->req);
- if (cli->write_cnt_max > tabled_srv.stats.max_write_buf)
- tabled_srv.stats.max_write_buf = cli->write_cnt_max;
-
if (debugging)
applog(LOG_INFO, "client %s ended", cli->addr_host);
free(cli);
}
-static bool cli_evt_dispose(struct client *cli, unsigned int events)
+static bool cli_evt_dispose(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
/* if write queue is not empty, we should continue to get
* poll callbacks here until it is
*/
- if (list_empty(&cli->write_q))
+ if (atcp_wq_empty(&cli->wst)) {
cli_free(cli);
+ *invalidate_cli = true;
+ }
return false;
}
-static bool cli_evt_recycle(struct client *cli, unsigned int events)
+static bool cli_evt_recycle(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
unsigned int slop;
@@ -608,134 +567,6 @@ static bool cli_evt_recycle(struct client *cli, unsigned int events)
return true;
}
-static void cli_writable(struct client *cli)
-{
- int n_iov;
- struct client_write *tmp;
- ssize_t rc;
- struct iovec iov[CLI_MAX_WR_IOV];
-
- /* accumulate pending writes into iovec */
- n_iov = 0;
- list_for_each_entry(tmp, &cli->write_q, node) {
- if (n_iov == CLI_MAX_WR_IOV)
- break;
- /* bleh, struct iovec should declare iov_base const */
- iov[n_iov].iov_base = (void *) tmp->buf;
- iov[n_iov].iov_len = tmp->togo;
- n_iov++;
- }
-
- /* execute non-blocking write */
-do_write:
- rc = writev(cli->fd, iov, n_iov);
- if (rc < 0) {
- if (errno == EINTR)
- goto do_write;
- if (errno != EAGAIN)
- goto err_out;
- return;
- }
-
- /* iterate through write queue, issuing completions based on
- * amount of data written
- */
- while (rc > 0) {
- int sz;
-
- /* get pointer to first record on list */
- tmp = list_entry(cli->write_q.next, struct client_write, node);
-
- /* mark data consumed by decreasing tmp->len */
- sz = (tmp->togo < rc) ? tmp->togo : rc;
- tmp->togo -= sz;
- tmp->buf += sz;
- rc -= sz;
-
- /* if tmp->len reaches zero, write is complete,
- * so schedule it for clean up (cannot call callback
- * right away or an endless recursion will result)
- */
- if (tmp->togo == 0)
- cli_write_complete(cli, tmp);
- }
-
- /* if we emptied the queue, clear write notification */
- if (list_empty(&cli->write_q)) {
- cli->writing = false;
- if (event_del(&cli->write_ev) < 0) {
- applog(LOG_WARNING, "cli_writable event_del");
- goto err_out;
- }
- }
-
- return;
-
-err_out:
- cli->state = evt_dispose;
- cli_write_free_all(cli);
-}
-
-bool cli_write_start(struct client *cli)
-{
- if (list_empty(&cli->write_q))
- return true; /* loop, not poll */
-
- /* if write-poll already active, nothing further to do */
- if (cli->writing)
- return false; /* poll wait */
-
- /* attempt optimistic write, in hopes of avoiding poll,
- * or at least refill the write buffers so as to not
- * get -immediately- called again by the kernel
- */
- cli_writable(cli);
- if (list_empty(&cli->write_q)) {
- tabled_srv.stats.opt_write++;
- return true; /* loop, not poll */
- }
-
- if (event_add(&cli->write_ev, NULL) < 0) {
- applog(LOG_WARNING, "cli_write event_add");
- return true; /* loop, not poll */
- }
-
- cli->writing = true;
-
- return false; /* poll wait */
-}
-
-int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
- cli_write_func cb, void *cb_data)
-{
- struct client_write *wr;
-
- if (!buf || !buflen)
- return -EINVAL;
-
- wr = calloc(1, sizeof(struct client_write));
- if (!wr)
- return -ENOMEM;
-
- wr->buf = buf;
- wr->togo = buflen;
- wr->length = buflen;
- wr->cb = cb;
- wr->cb_data = cb_data;
- wr->cb_cli = cli;
- list_add_tail(&wr->node, &cli->write_q);
- cli->write_cnt += buflen;
- if (cli->write_cnt > cli->write_cnt_max)
- cli->write_cnt_max = cli->write_cnt;
-
- return 0;
-}
-
-size_t cli_wqueued(struct client *cli)
-{
- return cli->write_cnt;
-}
-
/*
* Return:
* 0: progress was NOT made (EOF)
@@ -771,12 +602,6 @@ do_read:
return rc != 0;
}
-bool cli_cb_free(struct client *cli, void *cb_data, bool done)
-{
- free(cb_data);
- return false;
-}
-
static int cli_write_list(struct client *cli, GList *list)
{
int rc = 0;
@@ -784,8 +609,8 @@ static int cli_write_list(struct client *cli, GList *list)
tmp = list;
while (tmp) {
- rc = cli_writeq(cli, tmp->data, strlen(tmp->data),
- cli_cb_free, tmp->data);
+ rc = atcp_writeq(&cli->wst, tmp->data, strlen(tmp->data),
+ atcp_cb_free, tmp->data);
if (rc)
goto out;
@@ -870,14 +695,14 @@ bool cli_err_write(struct client *cli, char *hdr, char *content)
cli->state = evt_dispose;
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc)
return true;
- rc = cli_writeq(cli, content, strlen(content), cli_cb_free, content);
+ rc = atcp_writeq(&cli->wst, content, strlen(content), atcp_cb_free, content);
if (rc)
return true;
- return cli_write_start(cli);
+ return atcp_write_start(&cli->wst);
}
static bool cli_resp(struct client *cli, int http_status,
@@ -911,7 +736,7 @@ static bool cli_resp(struct client *cli, int http_status,
else
cli->state = evt_recycle;
- rc = cli_writeq(cli, hdr, strlen(hdr), cli_cb_free, hdr);
+ rc = atcp_writeq(&cli->wst, hdr, strlen(hdr), atcp_cb_free, hdr);
if (rc) {
free(hdr);
cli->state = evt_dispose;
@@ -924,7 +749,7 @@ static bool cli_resp(struct client *cli, int http_status,
return true;
}
- rcb = cli_write_start(cli);
+ rcb = atcp_write_start(&cli->wst);
if (cli->state == evt_recycle)
return true;
@@ -942,7 +767,8 @@ bool cli_resp_html(struct client *cli, int http_status, GList *content)
return cli_resp(cli, http_status, "text/html", content);
}
-static bool cli_evt_http_req(struct client *cli, unsigned int events)
+static bool cli_evt_http_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
struct http_req *req = &cli->req;
char *host, *auth, *content_len_str;
@@ -1195,7 +1021,8 @@ err_out:
return true;
}
-static bool cli_evt_parse_hdr(struct client *cli, unsigned int events)
+static bool cli_evt_parse_hdr(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
char *buf, *buf_eol;
bool eoh = false;
@@ -1252,7 +1079,8 @@ static bool cli_evt_parse_hdr(struct client *cli, unsigned int events)
return true;
}
-static bool cli_evt_read_hdr(struct client *cli, unsigned int events)
+static bool cli_evt_read_hdr(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
int rc = cli_read(cli);
if (rc <= 0) {
@@ -1268,7 +1096,8 @@ static bool cli_evt_read_hdr(struct client *cli, unsigned int events)
return true;
}
-static bool cli_evt_parse_req(struct client *cli, unsigned int events)
+static bool cli_evt_parse_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
char *sp1, *sp2, *buf;
enum errcode err_resp;
@@ -1336,7 +1165,8 @@ err_out:
return cli_err(cli, err_resp);
}
-static bool cli_evt_read_req(struct client *cli, unsigned int events)
+static bool cli_evt_read_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
int rc = cli_read(cli);
if (rc <= 0) {
@@ -1374,6 +1204,32 @@ static cli_evt_func evt_funcs_status[] = {
[evt_recycle] = cli_evt_recycle,
};
+static int cli_le_wset(void *ev_info, int fd, atcp_ev_func cb, void *cb_data)
+{
+ struct event *ev = ev_info;
+
+ event_set(ev, fd, EV_WRITE | EV_PERSIST, cb, cb_data);
+ return 0;
+}
+
+static int cli_le_add(void *ev_info, const struct timeval *tv)
+{
+ struct event *ev = ev_info;
+ return event_add(ev, tv);
+}
+
+static int cli_le_del(void *ev_info)
+{
+ struct event *ev = ev_info;
+ return event_del(ev);
+}
+
+static const struct atcp_wr_ops libevent_wr_ops = {
+ .ev_wset = cli_le_wset,
+ .ev_add = cli_le_add,
+ .ev_del = cli_le_del,
+};
+
static struct client *cli_alloc(bool is_status)
{
struct client *cli;
@@ -1385,9 +1241,10 @@ static struct client *cli_alloc(bool is_status)
return NULL;
}
+ atcp_wr_init(&cli->wst, &libevent_wr_ops, &cli->write_ev, cli);
+
cli->state = evt_read_req;
cli->evt_table = is_status? evt_funcs_status: evt_funcs_server;
- INIT_LIST_HEAD(&cli->write_q);
INIT_LIST_HEAD(&cli->out_ch);
cli->req_ptr = cli->req_buf;
memset(&cli->req, 0, sizeof(cli->req) - sizeof(cli->req.hdr));
@@ -1395,22 +1252,20 @@ static struct client *cli_alloc(bool is_status)
return cli;
}
-static void tcp_cli_wr_event(int fd, short events, void *userdata)
-{
- struct client *cli = userdata;
-
- cli_writable(cli);
- cli_write_run_compl();
-}
-
static void tcp_cli_event(int fd, short events, void *userdata)
{
struct client *cli = userdata;
bool loop;
+ bool invalidate_cli = false;
do {
- loop = cli->evt_table[cli->state](cli, events);
- loop |= cli_write_run_compl();
+ loop = cli->evt_table[cli->state](cli, events, &invalidate_cli);
+ if (invalidate_cli) {
+ cli = NULL;
+ break;
+ }
+
+ loop |= atcp_write_run_compl(&cli->wst);
} while (loop);
}
@@ -1438,11 +1293,11 @@ static void tcp_srv_event(int fd, short events, void *userdata)
goto err_out;
}
+ atcp_wr_set_fd(&cli->wst, cli->fd);
+
tabled_srv.stats.tcp_accept++;
event_set(&cli->ev, cli->fd, EV_READ | EV_PERSIST, tcp_cli_event, cli);
- event_set(&cli->write_ev, cli->fd, EV_WRITE | EV_PERSIST,
- tcp_cli_wr_event, cli);
/* mark non-blocking, for upcoming poll use */
if (fsetflags("tcp client", cli->fd, O_NONBLOCK) < 0)
@@ -2205,7 +2060,6 @@ int main (int argc, char *argv[])
struct event_base *event_base_rep;
INIT_LIST_HEAD(&tabled_srv.all_stor);
- INIT_LIST_HEAD(&tabled_srv.write_compl_q);
tabled_srv.state_tdb = ST_TDB_INIT;
tabled_srv.rep_next_id = DBID_MIN;
diff --git a/server/status.c b/server/status.c
index e9fbb38..9af60f7 100644
--- a/server/status.c
+++ b/server/status.c
@@ -150,7 +150,8 @@ out_err:
return cli_err(cli, InternalError);
}
-bool stat_evt_http_req(struct client *cli, unsigned int events)
+bool stat_evt_http_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli)
{
struct http_req *req = &cli->req;
char *method = req->method;
diff --git a/server/tabled.h b/server/tabled.h
index d4d2048..4d3a2d9 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -31,6 +31,7 @@
#include <elist.h>
#include <tdb.h>
#include <hail_log.h>
+#include <anet.h>
#ifndef ARRAY_SIZE
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
@@ -102,20 +103,8 @@ struct storage_node {
int ref; /* number of open_chunk or other */
};
-typedef bool (*cli_evt_func)(struct client *, unsigned int);
-typedef bool (*cli_write_func)(struct client *, void *, bool);
-
-struct client_write {
- const void *buf; /* write buffer pointer */
- int togo; /* write buffer remainder */
-
- int length; /* length for accounting */
- cli_write_func cb; /* callback */
- void *cb_data; /* data passed to cb */
- struct client *cb_cli; /* cli passed to cb */
-
- struct list_head node;
-};
+typedef bool (*cli_evt_func)(struct client *, unsigned int,
+ bool *invalidate_cli);
/* an open chunkd client */
struct open_chunk {
@@ -165,11 +154,8 @@ struct client {
struct event ev;
struct event write_ev;
- struct list_head write_q; /* list of async writes */
- size_t write_cnt; /* water level */
- bool writing;
+ struct atcp_wr_state wst;
/* some debugging stats */
- size_t write_cnt_max;
unsigned int req_used; /* amount of req_buf in use */
char *req_ptr; /* start of unexamined data */
@@ -216,12 +202,12 @@ enum st_net {
};
struct server_stats {
- unsigned long poll; /* number polls */
- unsigned long event; /* events dispatched */
- unsigned long tcp_accept; /* TCP accepted cxns */
- unsigned long opt_write; /* optimistic writes */
+ uint64_t poll; /* number polls */
+ uint64_t event; /* events dispatched */
+ uint64_t tcp_accept; /* TCP accepted cxns */
+ uint64_t opt_write; /* optimistic writes */
- unsigned long max_write_buf;
+ uint64_t max_write_buf;
};
#define DBID_NONE 0
@@ -249,7 +235,6 @@ struct server {
struct event_base *evbase_main;
int ev_pipe[2];
struct event pevt;
- struct list_head write_compl_q; /* list of done writes */
bool mc_delay;
struct event mc_timer;
@@ -361,7 +346,8 @@ extern bool object_put(struct client *cli, const char *user, const char *bucket,
const char *key, long content_len, bool expect_cont);
extern bool object_get(struct client *cli, const char *user, const char *bucket,
const char *key, bool want_body);
-extern bool cli_evt_http_data_in(struct client *cli, unsigned int events);
+extern bool cli_evt_http_data_in(struct client *cli, unsigned int events,
+ bool *invalidate_cli);
extern void cli_out_end(struct client *cli);
extern void cli_in_end(struct client *cli);
@@ -398,12 +384,6 @@ extern bool cli_err(struct client *cli, enum errcode code);
extern bool cli_err_write(struct client *cli, char *hdr, char *content);
extern bool cli_resp_xml(struct client *cli, int http_status, GList *content);
extern bool cli_resp_html(struct client *cli, int http_status, GList *content);
-extern int cli_writeq(struct client *cli, const void *buf, unsigned int buflen,
- cli_write_func cb, void *cb_data);
-extern size_t cli_wqueued(struct client *cli);
-extern bool cli_cb_free(struct client *cli, void *cb_data, bool done);
-extern bool cli_write_start(struct client *cli);
-extern bool cli_write_run_compl(void);
extern int cli_req_avail(struct client *cli);
extern void applog(int prio, const char *fmt, ...);
extern void cld_update_cb(void);
@@ -415,7 +395,8 @@ extern struct db_remote *tdb_find_remote_byname(const char *name);
extern struct db_remote *tdb_find_remote_byid(int id);
/* status.c */
-extern bool stat_evt_http_req(struct client *cli, unsigned int events);
+extern bool stat_evt_http_req(struct client *cli, unsigned int events,
+ bool *invalidate_cli);
/* config.c */
extern void read_config(void);
^ permalink raw reply related [flat|nested] 4+ messages in thread
* [PATCH] itd: use libhail's anet
2010-09-24 0:32 [PATCH] libhail: add async TCP network writing API, atcp_wr* Jeff Garzik
2010-09-24 0:34 ` [PATCH] tabled: use libhail's anet Jeff Garzik
@ 2010-09-24 0:35 ` Jeff Garzik
2010-09-24 19:13 ` Jeff Garzik
1 sibling, 1 reply; 4+ messages in thread
From: Jeff Garzik @ 2010-09-24 0:35 UTC (permalink / raw)
To: hail-devel
Updated for libhail's anet. Not yet committed, waiting for more time to
pass.
Makefile.am | 2
configure.ac | 1
iscsiutil.h | 52 +------------
target.c | 53 ++++++++++----
target.h | 3
util.c | 223 ++---------------------------------------------------------
6 files changed, 57 insertions(+), 277 deletions(-)
diff --git a/Makefile.am b/Makefile.am
index d559165..d99ab0c 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -9,7 +9,7 @@ sbin_PROGRAMS = itd
itd_SOURCES = \
elist.h scsi_cmd_codes.h iscsiutil.h iscsi.h parameters.h target.h \
main.c iscsi.c target.c util.c parameters.c
-itd_LDADD = @GLIB_LIBS@ @CRYPTO_LIBS@ @EVENT_LIBS@
+itd_LDADD = @GLIB_LIBS@ @CRYPTO_LIBS@ @EVENT_LIBS@ @HAIL_LIBS@
EXTRA_DIST = autogen.sh
diff --git a/configure.ac b/configure.ac
index 7cbbe3f..e2f6731 100644
--- a/configure.ac
+++ b/configure.ac
@@ -71,6 +71,7 @@ dnl autoconf output generation
dnl --------------------------
AM_PATH_GLIB_2_0(2.0.0)
+PKG_CHECK_MODULES(HAIL, libhail)
AC_SUBST(CRYPTO_LIBS)
AC_SUBST(EVENT_LIBS)
diff --git a/iscsiutil.h b/iscsiutil.h
index 5e5f5bf..2430c48 100644
--- a/iscsiutil.h
+++ b/iscsiutil.h
@@ -82,8 +82,8 @@
#endif
#include <string.h>
-#include <event.h>
#include "elist.h"
+#include <anet.h>
/*
* Debugging Levels
@@ -138,11 +138,11 @@ extern void iscsi_print_buffer(uint8_t *, const size_t);
*/
struct target_session;
-struct tcp_write_state;
+struct atcp_wr_state;
extern const char *sopstr(uint8_t op);
extern int fsetflags(const char *prefix, int fd, int or_flags);
-extern int iscsi_writev(struct tcp_write_state *st,
+extern int iscsi_writev(struct atcp_wr_state *wst,
void *header, unsigned header_len,
const void *data, unsigned data_len);
@@ -241,53 +241,11 @@ typedef struct name { \
extern size_t strlcpy(char *, const char *, size_t);
#endif
-enum {
- TCP_MAX_WR_IOV = 512, /* arbitrary, pick better one */
- TCP_MAX_WR_CNT = 10000,/* arbitrary, pick better one */
-};
-
-struct tcp_write_state {
- int fd;
- struct list_head write_q;
- struct list_head write_compl_q;
- size_t write_cnt; /* water level */
- size_t write_cnt_max;
- bool writing;
- struct event write_ev;
-
- void *priv; /* useable by any app */
-
- /* stats */
- unsigned long opt_write;
-};
-
-struct tcp_write {
- const void *buf; /* write buffer pointer */
- int togo; /* write buffer remainder */
-
- int length; /* length for accounting */
-
- /* callback */
- bool (*cb)(struct tcp_write_state *, void *, bool);
- void *cb_data; /* data passed to cb */
-
- struct list_head node;
-};
-
-extern int tcp_writeq(struct tcp_write_state *st, const void *buf, unsigned int buflen,
- bool (*cb)(struct tcp_write_state *, void *, bool),
- void *cb_data);
-extern bool tcp_wr_cb_free(struct tcp_write_state *st, void *cb_data, bool done);
-extern void tcp_write_init(struct tcp_write_state *st, int fd);
-extern void tcp_write_exit(struct tcp_write_state *st);
-extern bool tcp_write_start(struct tcp_write_state *st);
-extern bool tcp_write_run_compl(struct tcp_write_state *st);
-
-extern void send_padding(struct tcp_write_state *st, unsigned int len_out);
+extern void send_padding(struct atcp_wr_state *wst, unsigned int len_out);
extern void *header_get(void);
extern void header_put(void *mem);
-extern bool hdr_cb_free(struct tcp_write_state *st, void *cb_data, bool done);
+extern bool hdr_cb_free(struct atcp_wr_state *, void *, bool);
extern void hdrs_free_all(void);
static inline int padding_bytes(unsigned int len_out)
diff --git a/target.c b/target.c
index 7ef7d72..be6a88b 100644
--- a/target.c
+++ b/target.c
@@ -613,9 +613,9 @@ static int task_command_t(struct target_session *sess, const uint8_t *header)
goto err_out_hdr;
}
- tcp_writeq(&sess->wst, rsp_header, ISCSI_HEADER_LEN,
+ atcp_writeq(&sess->wst, rsp_header, ISCSI_HEADER_LEN,
hdr_cb_free, rsp_header);
- tcp_write_start(&sess->wst);
+ atcp_write_start(&sess->wst);
return 0;
@@ -839,18 +839,18 @@ static int text_command_t(struct target_session *sess, const uint8_t *header)
goto err_out_hdr;
}
- tcp_writeq(&sess->wst, rsp_header, ISCSI_HEADER_LEN,
+ atcp_writeq(&sess->wst, rsp_header, ISCSI_HEADER_LEN,
hdr_cb_free, rsp_header);
if (len_out) {
- tcp_writeq(&sess->wst, text_out, len_out,
- tcp_wr_cb_free, text_out);
+ atcp_writeq(&sess->wst, text_out, len_out,
+ atcp_cb_free, text_out);
text_out = NULL;
send_padding(&sess->wst, len_out);
}
- tcp_write_start(&sess->wst);
+ atcp_write_start(&sess->wst);
free(text_in);
free(text_out);
@@ -1228,9 +1228,9 @@ static int logout_command_t(struct target_session *sess, const uint8_t *header)
return -1;
}
- tcp_writeq(&sess->wst, rsp_header, ISCSI_HEADER_LEN,
+ atcp_writeq(&sess->wst, rsp_header, ISCSI_HEADER_LEN,
hdr_cb_free, rsp_header);
- tcp_write_start(&sess->wst);
+ atcp_write_start(&sess->wst);
iscsi_trace(TRACE_ISCSI_DEBUG, __FILE__, __LINE__,
"sent logout response OK\n");
@@ -1494,9 +1494,9 @@ static int send_r2t(struct target_session *sess)
sess->xfer.r2t.tag, sess->xfer.r2t.transfer_tag,
sess->xfer.r2t.length, sess->xfer.r2t.offset);
- tcp_writeq(&sess->wst, header, ISCSI_HEADER_LEN,
+ atcp_writeq(&sess->wst, header, ISCSI_HEADER_LEN,
hdr_cb_free, header);
- tcp_write_start(&sess->wst);
+ atcp_write_start(&sess->wst);
sess->xfer.r2t_flag = 1;
sess->xfer.r2t.R2TSN += 1;
@@ -1761,7 +1761,7 @@ int target_sess_cleanup(struct target_session *sess)
event_del(&sess->ev);
- tcp_write_exit(&sess->wst);
+ atcp_wr_exit(&sess->wst);
/* Terminate connection */
if (sess->fd >= 0)
@@ -1965,7 +1965,7 @@ restart:
break;
}
- tcp_write_run_compl(&sess->wst);
+ atcp_write_run_compl(&sess->wst);
return;
err_out:
@@ -1982,6 +1982,32 @@ static void target_tcp_evt(int fd, short events, void *userdata)
target_read_evt(sess);
}
+static int target_sess_le_wset(void *ev_info, int fd, atcp_ev_func cb, void *cb_data)
+{
+ struct event *ev = ev_info;
+
+ event_set(ev, fd, EV_WRITE | EV_PERSIST, cb, cb_data);
+ return 0;
+}
+
+static int target_sess_le_add(void *ev_info, const struct timeval *tv)
+{
+ struct event *ev = ev_info;
+ return event_add(ev, tv);
+}
+
+static int target_sess_le_del(void *ev_info)
+{
+ struct event *ev = ev_info;
+ return event_del(ev);
+}
+
+static const struct atcp_wr_ops libevent_wr_ops = {
+ .ev_wset = target_sess_le_wset,
+ .ev_add = target_sess_le_add,
+ .ev_del = target_sess_le_del,
+};
+
int target_accept(struct globals *gp, struct server_socket *sock)
{
struct target_session *sess;
@@ -2013,7 +2039,8 @@ int target_accept(struct globals *gp, struct server_socket *sock)
sess->globals = gp;
- tcp_write_init(&sess->wst, sess->fd);
+ atcp_wr_init(&sess->wst, &libevent_wr_ops, &sess->write_ev, sess);
+ atcp_wr_set_fd(&sess->wst, sess->fd);
event_set(&sess->ev, sess->fd, EV_READ | EV_PERSIST,
target_tcp_evt, sess);
diff --git a/target.h b/target.h
index ddedb0a..e764730 100644
--- a/target.h
+++ b/target.h
@@ -219,8 +219,9 @@ struct target_session {
int fd;
struct sockaddr addr;
struct event ev;
+ struct event write_ev;
- struct tcp_write_state wst;
+ struct atcp_wr_state wst;
struct session_xfer xfer;
diff --git a/util.c b/util.c
index c3e4455..eb41f84 100644
--- a/util.c
+++ b/util.c
@@ -307,7 +307,7 @@ void header_put(void *mem)
g_trash_stack_push(&free_headers, mem);
}
-bool hdr_cb_free(struct tcp_write_state *st, void *cb_data, bool done)
+bool hdr_cb_free(struct atcp_wr_state *wst, void *cb_data, bool done)
{
header_put(cb_data);
return false;
@@ -326,7 +326,7 @@ void hdrs_free_all(void)
}
}
-void send_padding(struct tcp_write_state *st, unsigned int len_out)
+void send_padding(struct atcp_wr_state *st, unsigned int len_out)
{
int pad_len;
static const char pad_buf[4] = { 0, 0, 0, 0 };
@@ -335,7 +335,7 @@ void send_padding(struct tcp_write_state *st, unsigned int len_out)
if (!pad_len)
return;
- tcp_writeq(st, pad_buf, pad_len, NULL, NULL);
+ atcp_writeq(st, pad_buf, pad_len, NULL, NULL);
}
/*
@@ -348,7 +348,7 @@ void send_padding(struct tcp_write_state *st, unsigned int len_out)
* data, else send as two separate messages.
*/
-int iscsi_writev(struct tcp_write_state *st,
+int iscsi_writev(struct atcp_wr_state *st,
void *header, unsigned header_len,
const void *data, unsigned data_len)
{
@@ -356,7 +356,7 @@ int iscsi_writev(struct tcp_write_state *st,
"NET: writing %u header bytes, %u data bytes\n",
header_len, data_len);
- tcp_writeq(st, header, header_len, hdr_cb_free, header);
+ atcp_writeq(st, header, header_len, hdr_cb_free, header);
if (data && data_len > 0) {
void *mem;
@@ -364,13 +364,13 @@ int iscsi_writev(struct tcp_write_state *st,
mem = g_memdup(data, data_len);
if (!mem)
return -1;
- tcp_writeq(st, mem, data_len,
- tcp_wr_cb_free, mem);
+ atcp_writeq(st, mem, data_len,
+ atcp_cb_free, mem);
}
send_padding(st, data_len);
- tcp_write_start(st);
+ atcp_write_start(st);
return header_len + data_len;
}
@@ -712,213 +712,6 @@ int fsetflags(const char *prefix, int fd, int or_flags)
return rc;
}
-static void tcp_write_complete(struct tcp_write_state *st, struct tcp_write *tmp)
-{
- list_del(&tmp->node);
- list_add_tail(&tmp->node, &st->write_compl_q);
-}
-
-bool tcp_wr_cb_free(struct tcp_write_state *st, void *cb_data, bool done)
-{
- free(cb_data);
- return false;
-}
-
-static bool tcp_write_free(struct tcp_write_state *st, struct tcp_write *tmp,
- bool done)
-{
- bool rcb = false;
-
- st->write_cnt -= tmp->length;
- list_del_init(&tmp->node);
- if (tmp->cb)
- rcb = tmp->cb(st, tmp->cb_data, done);
- free(tmp);
-
- return rcb;
-}
-
-static void tcp_write_free_all(struct tcp_write_state *st)
-{
- struct tcp_write *wr, *tmp;
-
- list_for_each_entry_safe(wr, tmp, &st->write_compl_q, node) {
- tcp_write_free(st, wr, true);
- }
- list_for_each_entry_safe(wr, tmp, &st->write_q, node) {
- tcp_write_free(st, wr, false);
- }
-}
-
-bool tcp_write_run_compl(struct tcp_write_state *st)
-{
- struct tcp_write *wr;
- bool do_loop;
-
- do_loop = false;
- while (!list_empty(&st->write_compl_q)) {
- wr = list_entry(st->write_compl_q.next, struct tcp_write,
- node);
- do_loop |= tcp_write_free(st, wr, true);
- }
- return do_loop;
-}
-
-static bool tcp_writable(struct tcp_write_state *st)
-{
- int n_iov;
- struct tcp_write *tmp;
- ssize_t rc;
- struct iovec iov[TCP_MAX_WR_IOV];
-
- /* accumulate pending writes into iovec */
- n_iov = 0;
- list_for_each_entry(tmp, &st->write_q, node) {
- if (n_iov == TCP_MAX_WR_IOV)
- break;
- /* bleh, struct iovec should declare iov_base const */
- iov[n_iov].iov_base = (void *) tmp->buf;
- iov[n_iov].iov_len = tmp->togo;
- n_iov++;
- }
-
- /* execute non-blocking write */
-do_write:
- rc = writev(st->fd, iov, n_iov);
- if (rc < 0) {
- if (errno == EINTR)
- goto do_write;
- if (errno != EAGAIN)
- goto err_out;
- return true;
- }
-
- /* iterate through write queue, issuing completions based on
- * amount of data written
- */
- while (rc > 0) {
- int sz;
-
- /* get pointer to first record on list */
- tmp = list_entry(st->write_q.next, struct tcp_write, node);
-
- /* mark data consumed by decreasing tmp->len */
- sz = (tmp->togo < rc) ? tmp->togo : rc;
- tmp->togo -= sz;
- tmp->buf += sz;
- rc -= sz;
-
- /* if tmp->len reaches zero, write is complete,
- * so schedule it for clean up (cannot call callback
- * right away or an endless recursion will result)
- */
- if (tmp->togo == 0)
- tcp_write_complete(st, tmp);
- }
-
- /* if we emptied the queue, clear write notification */
- if (list_empty(&st->write_q)) {
- st->writing = false;
- if (event_del(&st->write_ev) < 0)
- goto err_out;
- }
-
- return true;
-
-err_out:
- tcp_write_free_all(st);
- return false;
-}
-
-bool tcp_write_start(struct tcp_write_state *st)
-{
- if (list_empty(&st->write_q))
- return true; /* loop, not poll */
-
- /* if write-poll already active, nothing further to do */
- if (st->writing)
- return false; /* poll wait */
-
- /* attempt optimistic write, in hopes of avoiding poll,
- * or at least refill the write buffers so as to not
- * get -immediately- called again by the kernel
- */
- tcp_writable(st);
- if (list_empty(&st->write_q)) {
- st->opt_write++;
- return true; /* loop, not poll */
- }
-
- if (event_add(&st->write_ev, NULL) < 0)
- return true; /* loop, not poll */
-
- st->writing = true;
-
- return false; /* poll wait */
-}
-
-int tcp_writeq(struct tcp_write_state *st, const void *buf, unsigned int buflen,
- bool (*cb)(struct tcp_write_state *, void *, bool),
- void *cb_data)
-{
- struct tcp_write *wr;
-
- if (!buf || !buflen)
- return -EINVAL;
-
- wr = calloc(1, sizeof(struct tcp_write));
- if (!wr)
- return -ENOMEM;
-
- wr->buf = buf;
- wr->togo = buflen;
- wr->length = buflen;
- wr->cb = cb;
- wr->cb_data = cb_data;
- list_add_tail(&wr->node, &st->write_q);
- st->write_cnt += buflen;
- if (st->write_cnt > st->write_cnt_max)
- st->write_cnt_max = st->write_cnt;
-
- return 0;
-}
-
-size_t tcp_wqueued(struct tcp_write_state *st)
-{
- return st->write_cnt;
-}
-
-static void tcp_wr_evt(int fd, short events, void *userdata)
-{
- struct tcp_write_state *st = userdata;
-
- tcp_writable(st);
- tcp_write_run_compl(st);
-}
-
-void tcp_write_init(struct tcp_write_state *st, int fd)
-{
- memset(st, 0, sizeof(*st));
-
- st->fd = fd;
-
- INIT_LIST_HEAD(&st->write_q);
- INIT_LIST_HEAD(&st->write_compl_q);
-
- st->write_cnt_max = TCP_MAX_WR_CNT;
-
- event_set(&st->write_ev, fd, EV_WRITE | EV_PERSIST,
- tcp_wr_evt, st);
-}
-
-void tcp_write_exit(struct tcp_write_state *st)
-{
- if (st->writing)
- event_del(&st->write_ev);
-
- tcp_write_free_all(st);
-}
-
/*
* CRC32C chksum,
* as copied from Linux kernel's crypto/crc32c.c
^ permalink raw reply related [flat|nested] 4+ messages in thread
* Re: [PATCH] itd: use libhail's anet
2010-09-24 0:35 ` [PATCH] itd: " Jeff Garzik
@ 2010-09-24 19:13 ` Jeff Garzik
0 siblings, 0 replies; 4+ messages in thread
From: Jeff Garzik @ 2010-09-24 19:13 UTC (permalink / raw)
To: hail-devel
On 09/23/2010 08:35 PM, Jeff Garzik wrote:
>
> Updated for libhail's anet. Not yet committed, waiting for more time to
> pass.
>
> Makefile.am | 2
> configure.ac | 1
> iscsiutil.h | 52 +------------
> target.c | 53 ++++++++++----
> target.h | 3
> util.c | 223 ++---------------------------------------------------------
> 6 files changed, 57 insertions(+), 277 deletions(-)
This got pushed to itd.git late last night.
Jeff
^ permalink raw reply [flat|nested] 4+ messages in thread
end of thread, other threads:[~2010-09-24 19:13 UTC | newest]
Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2010-09-24 0:32 [PATCH] libhail: add async TCP network writing API, atcp_wr* Jeff Garzik
2010-09-24 0:34 ` [PATCH] tabled: use libhail's anet Jeff Garzik
2010-09-24 0:35 ` [PATCH] itd: " Jeff Garzik
2010-09-24 19:13 ` Jeff Garzik
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.