All of lore.kernel.org
 help / color / mirror / Atom feed
From: Alex Elder <elder@inktank.com>
To: ceph-devel@vger.kernel.org
Subject: [PATCH 8/8] libceph: implement pages array cursor
Date: Sun, 10 Mar 2013 14:17:53 -0500	[thread overview]
Message-ID: <513CDC61.5070004@inktank.com> (raw)
In-Reply-To: <513CD9BE.1070505@inktank.com>

Implement and use cursor routines for page array message data items
for outbound message data.

Signed-off-by: Alex Elder <elder@inktank.com>
---
 include/linux/ceph/messenger.h |    6 +++
 net/ceph/messenger.c           |   92 ++++++++++++++++++++++++++++++++++++++--
 2 files changed, 95 insertions(+), 3 deletions(-)

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 76b4645..b53b9ef 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -105,6 +105,12 @@ struct ceph_msg_data_cursor {
 			unsigned int	vector_offset;	/* bytes from vector */
 		};
 #endif /* CONFIG_BLOCK */
+		struct {				/* pages */
+			size_t		resid;		/* bytes from array */
+			unsigned int	page_offset;	/* offset in page */
+			unsigned short	page_index;	/* index in array */
+			unsigned short	page_count;	/* pages in array */
+		};
 		struct {				/* pagelist */
 			struct page	*page;		/* page from list */
 			size_t		offset;		/* bytes from list */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 5ddbe5d..12d416f 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -833,6 +833,81 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
 #endif

 /*
+ * For a page array, a piece comes from the first page in the array
+ * that has not already been fully consumed.
+ */
+static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data)
+{
+	struct ceph_msg_data_cursor *cursor = &data->cursor;
+	int page_count;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
+
+	BUG_ON(!data->pages);
+	BUG_ON(!data->length);
+
+	page_count = calc_pages_for(data->alignment, (u64)data->length);
+	BUG_ON(page_count > (int) USHRT_MAX);
+	cursor->resid = data->length;
+	cursor->page_offset = data->alignment & ~PAGE_MASK;
+	cursor->page_index = 0;
+	cursor->page_count = (unsigned short) page_count;
+	cursor->last_piece = cursor->page_count == 1;
+}
+
+static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
+					size_t *page_offset,
+					size_t *length,
+					bool *last_piece)
+{
+	struct ceph_msg_data_cursor *cursor = &data->cursor;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
+
+	BUG_ON(cursor->page_index >= cursor->page_count);
+	BUG_ON(cursor->page_offset >= PAGE_SIZE);
+	BUG_ON(!cursor->resid);
+
+	*page_offset = cursor->page_offset;
+	*last_piece = cursor->last_piece;
+	if (*last_piece) {
+		BUG_ON(*page_offset + cursor->resid > PAGE_SIZE);
+		*length = cursor->resid;
+	} else {
+		*length = PAGE_SIZE - *page_offset;
+	}
+
+	return data->pages[cursor->page_index];
+}
+
+static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
+						size_t bytes)
+{
+	struct ceph_msg_data_cursor *cursor = &data->cursor;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
+
+	BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
+	BUG_ON(bytes > cursor->resid);
+
+	/* Advance the cursor page offset */
+
+	cursor->resid -= bytes;
+	cursor->page_offset += bytes;
+	if (!bytes || cursor->page_offset & ~PAGE_MASK)
+		return false;	/* more bytes to process in the current page */
+
+	/* Move on to the next page */
+
+	BUG_ON(cursor->page_index >= cursor->page_count);
+	cursor->page_offset = 0;
+	cursor->page_index++;
+	cursor->last_piece = cursor->page_index == cursor->page_count - 1;
+
+	return true;
+}
+
+/*
  * For a pagelist, a piece is whatever remains to be consumed in the
  * first page in the list, or the front of the next page.
  */
@@ -934,7 +1009,9 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
 {
 	switch (data->type) {
 	case CEPH_MSG_DATA_NONE:
+		break;
 	case CEPH_MSG_DATA_PAGES:
+		ceph_msg_data_pages_cursor_init(data);
 		break;
 	case CEPH_MSG_DATA_PAGELIST:
 		ceph_msg_data_pagelist_cursor_init(data);
@@ -961,8 +1038,10 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
 {
 	switch (data->type) {
 	case CEPH_MSG_DATA_NONE:
-	case CEPH_MSG_DATA_PAGES:
 		break;
+	case CEPH_MSG_DATA_PAGES:
+		return ceph_msg_data_pages_next(data, page_offset, length,
+						last_piece);
 	case CEPH_MSG_DATA_PAGELIST:
 		return ceph_msg_data_pagelist_next(data, page_offset, length,
 						last_piece);
@@ -984,8 +1063,9 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
 {
 	switch (data->type) {
 	case CEPH_MSG_DATA_NONE:
-	case CEPH_MSG_DATA_PAGES:
 		break;
+	case CEPH_MSG_DATA_PAGES:
+		return ceph_msg_data_pages_advance(data, bytes);
 	case CEPH_MSG_DATA_PAGELIST:
 		return ceph_msg_data_pagelist_advance(data, bytes);
 #ifdef CONFIG_BLOCK
@@ -1021,6 +1101,8 @@ static void prepare_message_data(struct ceph_msg *msg,
 	if (ceph_msg_has_bio(msg))
 		ceph_msg_data_cursor_init(&msg->b);
 #endif /* CONFIG_BLOCK */
+	if (ceph_msg_has_pages(msg))
+		ceph_msg_data_cursor_init(&msg->p);
 	if (ceph_msg_has_pagelist(msg))
 		ceph_msg_data_cursor_init(&msg->l);
 	if (ceph_msg_has_trail(msg))
@@ -1319,6 +1401,8 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
 	msg_pos->page_pos += sent;
 	if (in_trail)
 		need_crc = ceph_msg_data_advance(&msg->t, sent);
+	else if (ceph_msg_has_pages(msg))
+		need_crc = ceph_msg_data_advance(&msg->p, sent);
 	else if (ceph_msg_has_pagelist(msg))
 		need_crc = ceph_msg_data_advance(&msg->l, sent);
 #ifdef CONFIG_BLOCK
@@ -1424,7 +1508,9 @@ static int write_partial_message_data(struct ceph_connection *con)
 			page = ceph_msg_data_next(&msg->t, &page_offset,
 							&length, &last_piece);
 		} else if (ceph_msg_has_pages(msg)) {
-			page = msg->p.pages[msg_pos->page];
+			use_cursor = true;
+			page = ceph_msg_data_next(&msg->p, &page_offset,
+							&length, &last_piece);
 		} else if (ceph_msg_has_pagelist(msg)) {
 			use_cursor = true;
 			page = ceph_msg_data_next(&msg->l, &page_offset,
-- 
1.7.9.5


  parent reply	other threads:[~2013-03-10 19:17 UTC|newest]

Thread overview: 12+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2013-03-10 19:06 [PATCH 0/8] libceph: implement cursor for outgoing data items Alex Elder
2013-03-10 19:12 ` [PATCH 1/8] libceph: define ceph_msg_has_*() data macros Alex Elder
2013-03-10 19:14 ` [PATCH 2/8] libceph: be explicit about message data representation Alex Elder
2013-03-10 19:15 ` [PATCH 3/8] libceph: abstract message data Alex Elder
2013-03-10 19:16 ` [PATCH 4/8] libceph: start defining message data cursor Alex Elder
2013-03-10 19:16 ` [PATCH 5/8] libceph: prepare for other message data item types Alex Elder
2013-03-10 19:17 ` [PATCH 6/8] libceph: use data cursor for message pagelist Alex Elder
2013-03-10 19:17 ` [PATCH 7/8] libceph: implement bio message data item cursor Alex Elder
2013-03-10 19:17 ` Alex Elder [this message]
2013-03-10 19:19 ` [PATCH 0/8] libceph: implement cursor for outgoing data items Alex Elder
2013-03-11 22:24 ` Josh Durgin
2013-03-11 22:26   ` Alex Elder

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=513CDC61.5070004@inktank.com \
    --to=elder@inktank.com \
    --cc=ceph-devel@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.