From mboxrd@z Thu Jan 1 00:00:00 1970 From: Jeff Garzik Subject: Re: iSCSI front-end for Hail Date: Sun, 02 May 2010 02:32:37 -0400 Message-ID: <4BDD1C85.1040307@garzik.org> References: <4BDCAB1A.6070109@garzik.org> <20100501205602.7d4dc32c@redhat.com> Mime-Version: 1.0 Content-Type: multipart/mixed; boundary="------------030104020400050106000108" Return-path: DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=gamma; h=domainkey-signature:received:received:sender:message-id:date:from :user-agent:mime-version:to:cc:subject:references:in-reply-to :content-type; bh=eIgqXf2LltslNAsj4i28bfrTpjzHiwZzHsrUWTfzw0Y=; b=gYlvDKWINMTJ1yHZTC1DTa0UQNAqzhiBTxqYoa0hBVDW09WG6SK9xkn7Co7fHErGZB xko4QKZu4QRoR4cs8EN2f7gWuvNiqej6F57SQCz/UvTdCT4kWSwPYoN/UPYoHzxw8ZNh J42LiT4jluqmxurLXU2nUO7DD3b1wFUnBAoNA= In-Reply-To: <20100501205602.7d4dc32c@redhat.com> Sender: hail-devel-owner@vger.kernel.org List-ID: To: Pete Zaitcev Cc: Project Hail This is a multi-part message in MIME format. --------------030104020400050106000108 Content-Type: text/plain; charset=ISO-8859-1; format=flowed Content-Transfer-Encoding: 7bit On 05/01/2010 10:56 PM, Pete Zaitcev wrote: > On Sat, 01 May 2010 18:28:42 -0400 > Jeff Garzik wrote: > >> As I write this email, I am borrowing a lot of networking code from >> tabled, to convert from GNet over to the more-flexible TCP server >> codebase found in tabled -- notably the asynchronous background TCP >> writing code in tabled. Hopefully will finish and commit this by the >> end of the weekend. > > This seems crying for a common repository or something like libhail, > not sure what. Remember the timer case. Eventually we'll make changes > to tabled that itd will need to copy. But I don't know what course > is best. I was definitely thinking along those lines, when I abstracted and modularized the code a bit. See attached... I put all the TCP write-related code into two structures, tcp_write_state and tcp_write. 
The code received s/cli_wr/tcp_wr/g and other obvious, cosmetic changes. libhail definitely seems like the direction to go. It would be easiest from a packaging perspective to put it into CLD. But maybe it deserves its own repo.? Jeff --------------030104020400050106000108 Content-Type: text/plain; name="util.c.txt" Content-Transfer-Encoding: 7bit Content-Disposition: attachment; filename="util.c.txt" ====================================SNIP CUT HERE SNIP========================= enum { TCP_MAX_WR_IOV = 512, /* arbitrary, pick better one */ TCP_MAX_WR_CNT = 10000,/* arbitrary, pick better one */ }; struct tcp_write_state { int fd; struct list_head write_q; struct list_head write_compl_q; size_t write_cnt; /* water level */ size_t write_cnt_max; bool writing; struct event write_ev; void *priv; /* useable by any app */ /* stats */ unsigned long opt_write; }; struct tcp_write { const void *buf; /* write buffer pointer */ int togo; /* write buffer remainder */ int length; /* length for accounting */ /* callback */ bool (*cb)(struct tcp_write_state *, void *, bool); void *cb_data; /* data passed to cb */ struct list_head node; }; extern int tcp_writeq(struct tcp_write_state *st, const void *buf, unsigned int buflen, bool (*cb)(struct tcp_write_state *, void *, bool), void *cb_data); extern bool tcp_wr_cb_free(struct tcp_write_state *st, void *cb_data, bool done); extern void tcp_write_init(struct tcp_write_state *st, int fd); extern void tcp_write_exit(struct tcp_write_state *st); extern bool tcp_write_start(struct tcp_write_state *st); ====================================SNIP CUT HERE SNIP========================= static void tcp_write_complete(struct tcp_write_state *st, struct tcp_write *tmp) { list_del(&tmp->node); list_add_tail(&tmp->node, &st->write_compl_q); } bool tcp_wr_cb_free(struct tcp_write_state *st, void *cb_data, bool done) { free(cb_data); return false; } static bool tcp_write_free(struct tcp_write_state *st, struct tcp_write *tmp, bool done) { bool rcb 
= false; st->write_cnt -= tmp->length; list_del(&tmp->node); if (tmp->cb) rcb = tmp->cb(st, tmp->cb_data, done); free(tmp); return rcb; } static void tcp_write_free_all(struct tcp_write_state *st) { struct tcp_write *wr, *tmp; list_for_each_entry_safe(wr, tmp, &st->write_compl_q, node) { tcp_write_free(st, wr, true); } list_for_each_entry_safe(wr, tmp, &st->write_q, node) { tcp_write_free(st, wr, false); } } bool tcp_write_run_compl(struct tcp_write_state *st) { struct tcp_write *wr; bool do_loop; do_loop = false; while (!list_empty(&st->write_compl_q)) { wr = list_entry(st->write_compl_q.next, struct tcp_write, node); do_loop |= tcp_write_free(st, wr, true); } return do_loop; } static bool tcp_writable(struct tcp_write_state *st) { int n_iov; struct tcp_write *tmp; ssize_t rc; struct iovec iov[TCP_MAX_WR_IOV]; /* accumulate pending writes into iovec */ n_iov = 0; list_for_each_entry(tmp, &st->write_q, node) { if (n_iov == TCP_MAX_WR_IOV) break; /* bleh, struct iovec should declare iov_base const */ iov[n_iov].iov_base = (void *) tmp->buf; iov[n_iov].iov_len = tmp->togo; n_iov++; } /* execute non-blocking write */ do_write: rc = writev(st->fd, iov, n_iov); if (rc < 0) { if (errno == EINTR) goto do_write; if (errno != EAGAIN) goto err_out; return true; } /* iterate through write queue, issuing completions based on * amount of data written */ while (rc > 0) { int sz; /* get pointer to first record on list */ tmp = list_entry(st->write_q.next, struct tcp_write, node); /* mark data consumed by decreasing tmp->len */ sz = (tmp->togo < rc) ? 
tmp->togo : rc; tmp->togo -= sz; tmp->buf += sz; rc -= sz; /* if tmp->len reaches zero, write is complete, * so schedule it for clean up (cannot call callback * right away or an endless recursion will result) */ if (tmp->togo == 0) tcp_write_complete(st, tmp); } /* if we emptied the queue, clear write notification */ if (list_empty(&st->write_q)) { st->writing = false; if (event_del(&st->write_ev) < 0) goto err_out; } return true; err_out: tcp_write_free_all(st); return false; } bool tcp_write_start(struct tcp_write_state *st) { if (list_empty(&st->write_q)) return true; /* loop, not poll */ /* if write-poll already active, nothing further to do */ if (st->writing) return false; /* poll wait */ /* attempt optimistic write, in hopes of avoiding poll, * or at least refill the write buffers so as to not * get -immediately- called again by the kernel */ tcp_writable(st); if (list_empty(&st->write_q)) { st->opt_write++; return true; /* loop, not poll */ } if (event_add(&st->write_ev, NULL) < 0) return true; /* loop, not poll */ st->writing = true; return false; /* poll wait */ } int tcp_writeq(struct tcp_write_state *st, const void *buf, unsigned int buflen, bool (*cb)(struct tcp_write_state *, void *, bool), void *cb_data) { struct tcp_write *wr; if (!buf || !buflen) return -EINVAL; wr = calloc(1, sizeof(struct tcp_write)); if (!wr) return -ENOMEM; wr->buf = buf; wr->togo = buflen; wr->length = buflen; wr->cb = cb; wr->cb_data = cb_data; list_add_tail(&wr->node, &st->write_q); st->write_cnt += buflen; if (st->write_cnt > st->write_cnt_max) st->write_cnt_max = st->write_cnt; return 0; } size_t tcp_wqueued(struct tcp_write_state *st) { return st->write_cnt; } static void tcp_wr_evt(int fd, short events, void *userdata) { struct tcp_write_state *st = userdata; tcp_writable(st); } void tcp_write_init(struct tcp_write_state *st, int fd) { memset(st, 0, sizeof(*st)); st->fd = fd; INIT_LIST_HEAD(&st->write_q); INIT_LIST_HEAD(&st->write_compl_q); st->write_cnt_max = 
TCP_MAX_WR_CNT; event_set(&st->write_ev, fd, EV_WRITE | EV_PERSIST, tcp_wr_evt, st); } void tcp_write_exit(struct tcp_write_state *st) { if (st->writing) event_del(&st->write_ev); tcp_write_free_all(st); } ====================================SNIP CUT HERE SNIP========================= --------------030104020400050106000108--