From: Alex Elder <elder@inktank.com>
To: ceph-devel@vger.kernel.org
Subject: [PATCH 4/8] libceph: start defining message data cursor
Date: Sun, 10 Mar 2013 14:16:11 -0500 [thread overview]
Message-ID: <513CDBFB.504@inktank.com> (raw)
In-Reply-To: <513CD9BE.1070505@inktank.com>
This patch lays out the foundation for using generic routines to
manage processing items of message data.
For simplicity, we'll start with just the trail portion of a
message, because it stands alone and is only present for outgoing
data.
First some basic concepts. We'll use the term "data item" to
represent one of the ceph_msg_data structures associated with a
message. There are currently four of those, with single-letter
field names p, l, b, and t. A data item is further broken into
"pieces" which always lie in a single page. A data item will
include a "cursor" that will track state as the memory defined by
the item is consumed by sending data from or receiving data into it.
We define three routines to manipulate a data item's cursor: the
"init" routine; the "next" routine; and the "advance" routine. The
"init" routine initializes the cursor so it points at the beginning
of the first piece in the item. The "next" routine returns the
page, page offset, and length (limited by both the page and item
size) of the next unconsumed piece in the item. It also indicates
to the caller whether the piece being returned is the last one in
the data item.
The "advance" routine consumes the requested number of bytes in the
item (advancing the cursor). This is used to record the number of
bytes from the current piece that were actually sent or received by
the network code. It returns an indication of whether the result
means the current piece has been fully consumed. This is used by
the message send code to determine whether it should calculate the
CRC for the next piece processed.
The trail of a message is implemented as a ceph pagelist. The
routines defined for it will be usable for non-trail pagelist data
as well.
Signed-off-by: Alex Elder <elder@inktank.com>
---
include/linux/ceph/messenger.h | 7 ++
net/ceph/messenger.c | 138
+++++++++++++++++++++++++++++++++++++---
2 files changed, 135 insertions(+), 10 deletions(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 5860dd0..1486243 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -95,6 +95,12 @@ static __inline__ bool ceph_msg_data_type_valid(enum
ceph_msg_data_type type)
}
}
+struct ceph_msg_data_cursor {
+ bool last_piece; /* now at last piece of data item */
+ struct page *page; /* current page in pagelist */
+ size_t offset; /* pagelist bytes consumed */
+};
+
struct ceph_msg_data {
enum ceph_msg_data_type type;
union {
@@ -112,6 +118,7 @@ struct ceph_msg_data {
};
struct ceph_pagelist *pagelist;
};
+ struct ceph_msg_data_cursor cursor; /* pagelist only */
};
/*
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index f256b4b..b978cf8 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -21,6 +21,9 @@
#include <linux/ceph/pagelist.h>
#include <linux/export.h>
+#define list_entry_next(pos, member) \
+ list_entry(pos->member.next, typeof(*pos), member)
+
/*
* Ceph uses the messenger to exchange ceph_msg messages with other
* hosts in the system. The messenger provides ordered and reliable
@@ -738,6 +741,109 @@ static void iter_bio_next(struct bio **bio_iter,
unsigned int *seg)
}
#endif
+/*
+ * Message data is handled (sent or received) in pieces, where each
+ * piece resides on a single page. The network layer might not
+ * consume an entire piece at once. A data item's cursor keeps
+ * track of which piece is next to process and how much remains to
+ * be processed in that piece. It also tracks whether the current
+ * piece is the last one in the data item.
+ */
+static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
+{
+ struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_pagelist *pagelist;
+ struct page *page;
+
+ if (data->type != CEPH_MSG_DATA_PAGELIST)
+ return;
+
+ pagelist = data->pagelist;
+ BUG_ON(!pagelist);
+ if (!pagelist->length)
+ return; /* pagelist can be assigned but empty */
+
+ BUG_ON(list_empty(&pagelist->head));
+ page = list_first_entry(&pagelist->head, struct page, lru);
+
+ cursor->page = page;
+ cursor->offset = 0;
+ cursor->last_piece = pagelist->length <= PAGE_SIZE;
+}
+
+/*
+ * Return the page containing the next piece to process for a given
+ * data item, and supply the page offset and length of that piece.
+ * Indicate whether this is the last piece in this data item.
+ */
+static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
+ size_t *page_offset,
+ size_t *length,
+ bool *last_piece)
+{
+ struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_pagelist *pagelist;
+ size_t piece_end;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+ pagelist = data->pagelist;
+ BUG_ON(!pagelist);
+
+ BUG_ON(!cursor->page);
+ BUG_ON(cursor->offset >= pagelist->length);
+
+ *last_piece = cursor->last_piece;
+ if (*last_piece) {
+ /* pagelist offset is always 0 */
+ piece_end = pagelist->length & ~PAGE_MASK;
+ if (!piece_end)
+ piece_end = PAGE_SIZE;
+ } else {
+ piece_end = PAGE_SIZE;
+ }
+ *page_offset = cursor->offset & ~PAGE_MASK;
+ *length = piece_end - *page_offset;
+
+ return data->cursor.page;
+}
+
+/*
+ * Returns true if the result moves the cursor on to the next piece
+ * (the next page) of the pagelist.
+ */
+static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
+{
+ struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_pagelist *pagelist;
+
+ BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+ pagelist = data->pagelist;
+ BUG_ON(!pagelist);
+ BUG_ON(!cursor->page);
+ BUG_ON(cursor->offset + bytes > pagelist->length);
+ BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
+
+ /* Advance the cursor offset */
+
+ cursor->offset += bytes;
+ /* pagelist offset is always 0 */
+ if (!bytes || cursor->offset & ~PAGE_MASK)
+ return false; /* more bytes to process in the current page */
+
+ /* Move on to the next page */
+
+ BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
+ cursor->page = list_entry_next(cursor->page, lru);
+
+ /* cursor offset is at page boundary; pagelist offset is always 0 */
+ if (pagelist->length - cursor->offset <= PAGE_SIZE)
+ cursor->last_piece = true;
+
+ return true;
+}
+
static void prepare_message_data(struct ceph_msg *msg,
struct ceph_msg_pos *msg_pos)
{
@@ -755,6 +861,12 @@ static void prepare_message_data(struct ceph_msg *msg,
init_bio_iter(msg->b.bio, &msg->b.bio_iter, &msg->b.bio_seg);
#endif
msg_pos->data_pos = 0;
+
+ /* If there's a trail, initialize its cursor */
+
+ if (ceph_msg_has_trail(msg))
+ ceph_msg_data_cursor_init(&msg->t);
+
msg_pos->did_page_crc = false;
}
@@ -1045,6 +1157,12 @@ static void out_msg_pos_next(struct
ceph_connection *con, struct page *page,
msg_pos->data_pos += sent;
msg_pos->page_pos += sent;
+ if (in_trail) {
+ bool need_crc;
+
+ need_crc = ceph_msg_data_advance(&msg->t, sent);
+ BUG_ON(need_crc && sent != len);
+ }
if (sent < len)
return;
@@ -1052,10 +1170,7 @@ static void out_msg_pos_next(struct
ceph_connection *con, struct page *page,
msg_pos->page_pos = 0;
msg_pos->page++;
msg_pos->did_page_crc = false;
- if (in_trail) {
- BUG_ON(!ceph_msg_has_trail(msg));
- list_rotate_left(&msg->t.pagelist->head);
- } else if (ceph_msg_has_pagelist(msg)) {
+ if (ceph_msg_has_pagelist(msg)) {
list_rotate_left(&msg->l.pagelist->head);
#ifdef CONFIG_BLOCK
} else if (ceph_msg_has_bio(msg)) {
@@ -1141,6 +1256,8 @@ static int write_partial_message_data(struct
ceph_connection *con)
size_t length;
int max_write = PAGE_SIZE;
int bio_offset = 0;
+ bool use_cursor = false;
+ bool last_piece = true; /* preserve existing behavior */
in_trail = in_trail || msg_pos->data_pos >= trail_off;
if (!in_trail)
@@ -1148,9 +1265,9 @@ static int write_partial_message_data(struct
ceph_connection *con)
if (in_trail) {
BUG_ON(!ceph_msg_has_trail(msg));
- total_max_write = data_len - msg_pos->data_pos;
- page = list_first_entry(&msg->t.pagelist->head,
- struct page, lru);
+ use_cursor = true;
+ page = ceph_msg_data_next(&msg->t, &page_offset,
+ &length, &last_piece);
} else if (ceph_msg_has_pages(msg)) {
page = msg->p.pages[msg_pos->page];
} else if (ceph_msg_has_pagelist(msg)) {
@@ -1168,8 +1285,9 @@ static int write_partial_message_data(struct
ceph_connection *con)
} else {
page = zero_page;
}
- length = min_t(int, max_write - msg_pos->page_pos,
- total_max_write);
+ if (!use_cursor)
+ length = min_t(int, max_write - msg_pos->page_pos,
+ total_max_write);
page_offset = msg_pos->page_pos + bio_offset;
if (do_datacrc && !msg_pos->did_page_crc) {
@@ -1180,7 +1298,7 @@ static int write_partial_message_data(struct
ceph_connection *con)
msg_pos->did_page_crc = true;
}
ret = ceph_tcp_sendpage(con->sock, page, page_offset,
- length, true);
+ length, last_piece);
if (ret <= 0)
goto out;
--
1.7.9.5
next prev parent reply other threads:[~2013-03-10 19:16 UTC|newest]
Thread overview: 12+ messages / expand[flat|nested] mbox.gz Atom feed top
2013-03-10 19:06 [PATCH 0/8] libceph: implement cursor for outgoing data items Alex Elder
2013-03-10 19:12 ` [PATCH 1/8] libceph: define ceph_msg_has_*() data macros Alex Elder
2013-03-10 19:14 ` [PATCH 2/8] libceph: be explicit about message data representation Alex Elder
2013-03-10 19:15 ` [PATCH 3/8] libceph: abstract message data Alex Elder
2013-03-10 19:16 ` Alex Elder [this message]
2013-03-10 19:16 ` [PATCH 5/8] libceph: prepare for other message data item types Alex Elder
2013-03-10 19:17 ` [PATCH 6/8] libceph: use data cursor for message pagelist Alex Elder
2013-03-10 19:17 ` [PATCH 7/8] libceph: implement bio message data item cursor Alex Elder
2013-03-10 19:17 ` [PATCH 8/8] libceph: implement pages array cursor Alex Elder
2013-03-10 19:19 ` [PATCH 0/8] libceph: implement cursor for outgoing data items Alex Elder
2013-03-11 22:24 ` Josh Durgin
2013-03-11 22:26 ` Alex Elder
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=513CDBFB.504@inktank.com \
--to=elder@inktank.com \
--cc=ceph-devel@vger.kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.