* [PATCH 1/6] libceph: record bio length
2013-04-05 22:24 [PATCH 0/6] Alex Elder
@ 2013-04-05 22:25 ` Alex Elder
2013-04-08 23:58 ` Josh Durgin
2013-04-05 22:25 ` [PATCH 2/6] libceph: move cursor into message Alex Elder
` (4 subsequent siblings)
5 siblings, 1 reply; 22+ messages in thread
From: Alex Elder @ 2013-04-05 22:25 UTC (permalink / raw)
To: ceph-devel@vger.kernel.org
The bio is the only data item type that doesn't record its full
length. Fix that.
Signed-off-by: Alex Elder <elder@inktank.com>
---
include/linux/ceph/messenger.h | 5 ++++-
net/ceph/messenger.c | 1 +
2 files changed, 5 insertions(+), 1 deletion(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index cdeebae..4fb870a 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -116,7 +116,10 @@ struct ceph_msg_data {
enum ceph_msg_data_type type;
union {
#ifdef CONFIG_BLOCK
- struct bio *bio;
+ struct {
+ struct bio *bio;
+ size_t bio_length;
+ };
#endif /* CONFIG_BLOCK */
struct {
struct page **pages; /* NOT OWNER. */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index ae825e44..9571d03 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -3030,6 +3030,7 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg,
struct bio *bio,
data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
BUG_ON(!data);
data->bio = bio;
+ data->bio_length = length;
msg->data = data;
msg->data_length = length;
--
1.7.9.5
^ permalink raw reply related [flat|nested] 22+ messages in thread
* Re: [PATCH 1/6] libceph: record bio length
2013-04-05 22:25 ` [PATCH 1/6] libceph: record bio length Alex Elder
@ 2013-04-08 23:58 ` Josh Durgin
0 siblings, 0 replies; 22+ messages in thread
From: Josh Durgin @ 2013-04-08 23:58 UTC (permalink / raw)
To: Alex Elder; +Cc: ceph-devel@vger.kernel.org
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
On 04/05/2013 03:25 PM, Alex Elder wrote:
> The bio is the only data item type that doesn't record its full
> length. Fix that.
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> include/linux/ceph/messenger.h | 5 ++++-
> net/ceph/messenger.c | 1 +
> 2 files changed, 5 insertions(+), 1 deletion(-)
>
> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
> index cdeebae..4fb870a 100644
> --- a/include/linux/ceph/messenger.h
> +++ b/include/linux/ceph/messenger.h
> @@ -116,7 +116,10 @@ struct ceph_msg_data {
> enum ceph_msg_data_type type;
> union {
> #ifdef CONFIG_BLOCK
> - struct bio *bio;
> + struct {
> + struct bio *bio;
> + size_t bio_length;
> + };
> #endif /* CONFIG_BLOCK */
> struct {
> struct page **pages; /* NOT OWNER. */
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index ae825e44..9571d03 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -3030,6 +3030,7 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg,
> struct bio *bio,
> data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
> BUG_ON(!data);
> data->bio = bio;
> + data->bio_length = length;
>
> msg->data = data;
> msg->data_length = length;
>
^ permalink raw reply [flat|nested] 22+ messages in thread
* [PATCH 2/6] libceph: move cursor into message
2013-04-05 22:24 [PATCH 0/6] Alex Elder
2013-04-05 22:25 ` [PATCH 1/6] libceph: record bio length Alex Elder
@ 2013-04-05 22:25 ` Alex Elder
2013-04-08 23:58 ` Josh Durgin
2013-04-05 22:25 ` [PATCH 3/6] libceph: have cursor point to data Alex Elder
` (3 subsequent siblings)
5 siblings, 1 reply; 22+ messages in thread
From: Alex Elder @ 2013-04-05 22:25 UTC (permalink / raw)
To: ceph-devel@vger.kernel.org
A message will only be processing a single data item at a time, so
there's no need for each data item to have its own cursor.
Move the cursor embedded in the message data structure into the
message itself. To minimize the impact, keep the data->cursor
field, but make it be a pointer to the cursor in the message.
Move the definition of ceph_msg_data above ceph_msg_data_cursor so
the cursor can point to the data without a forward declaration,
rather than vice-versa.
This and the upcoming patches are part of:
http://tracker.ceph.com/issues/3761
Signed-off-by: Alex Elder <elder@inktank.com>
---
include/linux/ceph/messenger.h | 43
++++++++++++++++++++--------------------
net/ceph/messenger.c | 35 +++++++++++++++++---------------
2 files changed, 41 insertions(+), 37 deletions(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 4fb870a..e755724 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -88,6 +88,25 @@ static __inline__ bool ceph_msg_data_type_valid(enum
ceph_msg_data_type type)
}
}
+struct ceph_msg_data {
+ enum ceph_msg_data_type type;
+ union {
+#ifdef CONFIG_BLOCK
+ struct {
+ struct bio *bio;
+ size_t bio_length;
+ };
+#endif /* CONFIG_BLOCK */
+ struct {
+ struct page **pages; /* NOT OWNER. */
+ size_t length; /* total # bytes */
+ unsigned int alignment; /* first page */
+ };
+ struct ceph_pagelist *pagelist;
+ };
+ struct ceph_msg_data_cursor *cursor;
+};
+
struct ceph_msg_data_cursor {
size_t resid; /* bytes not yet consumed */
bool last_piece; /* now at last piece of data item */
@@ -112,25 +131,6 @@ struct ceph_msg_data_cursor {
};
};
-struct ceph_msg_data {
- enum ceph_msg_data_type type;
- union {
-#ifdef CONFIG_BLOCK
- struct {
- struct bio *bio;
- size_t bio_length;
- };
-#endif /* CONFIG_BLOCK */
- struct {
- struct page **pages; /* NOT OWNER. */
- size_t length; /* total # bytes */
- unsigned int alignment; /* first page */
- };
- struct ceph_pagelist *pagelist;
- };
- struct ceph_msg_data_cursor cursor; /* pagelist only */
-};
-
/*
* a single message. it contains a header (src, dest, message type, etc.),
* footer (crc values, mainly), a "front" message body, and possibly a
@@ -142,8 +142,9 @@ struct ceph_msg {
struct kvec front; /* unaligned blobs of message */
struct ceph_buffer *middle;
- size_t data_length;
- struct ceph_msg_data *data; /* data payload */
+ size_t data_length;
+ struct ceph_msg_data *data;
+ struct ceph_msg_data_cursor cursor;
struct ceph_connection *con;
struct list_head list_head; /* links for connection lists */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 9571d03..287b7fb 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -725,7 +725,7 @@ static void con_out_kvec_add(struct ceph_connection
*con,
static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data,
size_t length)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data_cursor *cursor = data->cursor;
struct bio *bio;
BUG_ON(data->type != CEPH_MSG_DATA_BIO);
@@ -745,7 +745,7 @@ static struct page *ceph_msg_data_bio_next(struct
ceph_msg_data *data,
size_t *page_offset,
size_t *length)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data_cursor *cursor = data->cursor;
struct bio *bio;
struct bio_vec *bio_vec;
unsigned int index;
@@ -774,7 +774,7 @@ static struct page *ceph_msg_data_bio_next(struct
ceph_msg_data *data,
static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data,
size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data_cursor *cursor = data->cursor;
struct bio *bio;
struct bio_vec *bio_vec;
unsigned int index;
@@ -826,7 +826,7 @@ static bool ceph_msg_data_bio_advance(struct
ceph_msg_data *data, size_t bytes)
static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data,
size_t length)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data_cursor *cursor = data->cursor;
int page_count;
BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
@@ -849,7 +849,7 @@ static struct page *ceph_msg_data_pages_next(struct
ceph_msg_data *data,
size_t *page_offset,
size_t *length)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data_cursor *cursor = data->cursor;
BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
@@ -868,7 +868,7 @@ static struct page *ceph_msg_data_pages_next(struct
ceph_msg_data *data,
static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data_cursor *cursor = data->cursor;
BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
@@ -897,7 +897,7 @@ static bool ceph_msg_data_pages_advance(struct
ceph_msg_data *data,
static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data,
size_t length)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data_cursor *cursor = data->cursor;
struct ceph_pagelist *pagelist;
struct page *page;
@@ -923,7 +923,7 @@ static struct page
*ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
size_t *page_offset,
size_t *length)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data_cursor *cursor = data->cursor;
struct ceph_pagelist *pagelist;
BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
@@ -941,13 +941,13 @@ static struct page
*ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
else
*length = PAGE_SIZE - *page_offset;
- return data->cursor.page;
+ return data->cursor->page;
}
static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data_cursor *cursor = data->cursor;
struct ceph_pagelist *pagelist;
BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
@@ -1003,7 +1003,7 @@ static void ceph_msg_data_cursor_init(struct
ceph_msg_data *data,
/* BUG(); */
break;
}
- data->cursor.need_crc = true;
+ data->cursor->need_crc = true;
}
/*
@@ -1039,7 +1039,7 @@ static struct page *ceph_msg_data_next(struct
ceph_msg_data *data,
BUG_ON(*page_offset + *length > PAGE_SIZE);
BUG_ON(!*length);
if (last_piece)
- *last_piece = data->cursor.last_piece;
+ *last_piece = data->cursor->last_piece;
return page;
}
@@ -1050,7 +1050,7 @@ static struct page *ceph_msg_data_next(struct
ceph_msg_data *data,
*/
static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = &data->cursor;
+ struct ceph_msg_data_cursor *cursor = data->cursor;
bool new_piece;
BUG_ON(bytes > cursor->resid);
@@ -1071,7 +1071,7 @@ static bool ceph_msg_data_advance(struct
ceph_msg_data *data, size_t bytes)
BUG();
break;
}
- data->cursor.need_crc = new_piece;
+ data->cursor->need_crc = new_piece;
return new_piece;
}
@@ -1404,7 +1404,7 @@ static u32 ceph_crc32c_page(u32 crc, struct page
*page,
static int write_partial_message_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->out_msg;
- struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
+ struct ceph_msg_data_cursor *cursor = msg->data->cursor;
bool do_datacrc = !con->msgr->nocrc;
u32 crc;
@@ -2102,7 +2102,7 @@ static int read_partial_message_section(struct
ceph_connection *con,
static int read_partial_msg_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->in_msg;
- struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
+ struct ceph_msg_data_cursor *cursor = msg->data->cursor;
const bool do_datacrc = !con->msgr->nocrc;
struct page *page;
size_t page_offset;
@@ -2989,6 +2989,7 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg,
struct page **pages,
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
BUG_ON(!data);
+ data->cursor = &msg->cursor;
data->pages = pages;
data->length = length;
data->alignment = alignment & ~PAGE_MASK;
@@ -3010,6 +3011,7 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
BUG_ON(!data);
+ data->cursor = &msg->cursor;
data->pagelist = pagelist;
msg->data = data;
@@ -3029,6 +3031,7 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg,
struct bio *bio,
data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
BUG_ON(!data);
+ data->cursor = &msg->cursor;
data->bio = bio;
data->bio_length = length;
--
1.7.9.5
^ permalink raw reply related [flat|nested] 22+ messages in thread
* Re: [PATCH 2/6] libceph: move cursor into message
2013-04-05 22:25 ` [PATCH 2/6] libceph: move cursor into message Alex Elder
@ 2013-04-08 23:58 ` Josh Durgin
0 siblings, 0 replies; 22+ messages in thread
From: Josh Durgin @ 2013-04-08 23:58 UTC (permalink / raw)
To: Alex Elder; +Cc: ceph-devel@vger.kernel.org
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
On 04/05/2013 03:25 PM, Alex Elder wrote:
> A message will only be processing a single data item at a time, so
> there's no need for each data item to have its own cursor.
>
> Move the cursor embedded in the message data structure into the
> message itself. To minimize the impact, keep the data->cursor
> field, but make it be a pointer to the cursor in the message.
>
> Move the definition of ceph_msg_data above ceph_msg_data_cursor so
> the cursor can point to the data without a forward definition rather
> than vice-versa.
>
> This and the upcoming patches are part of:
> http://tracker.ceph.com/issues/3761
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> include/linux/ceph/messenger.h | 43
> ++++++++++++++++++++--------------------
> net/ceph/messenger.c | 35 +++++++++++++++++---------------
> 2 files changed, 41 insertions(+), 37 deletions(-)
>
> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
> index 4fb870a..e755724 100644
> --- a/include/linux/ceph/messenger.h
> +++ b/include/linux/ceph/messenger.h
> @@ -88,6 +88,25 @@ static __inline__ bool ceph_msg_data_type_valid(enum
> ceph_msg_data_type type)
> }
> }
>
> +struct ceph_msg_data {
> + enum ceph_msg_data_type type;
> + union {
> +#ifdef CONFIG_BLOCK
> + struct {
> + struct bio *bio;
> + size_t bio_length;
> + };
> +#endif /* CONFIG_BLOCK */
> + struct {
> + struct page **pages; /* NOT OWNER. */
> + size_t length; /* total # bytes */
> + unsigned int alignment; /* first page */
> + };
> + struct ceph_pagelist *pagelist;
> + };
> + struct ceph_msg_data_cursor *cursor;
> +};
> +
> struct ceph_msg_data_cursor {
> size_t resid; /* bytes not yet consumed */
> bool last_piece; /* now at last piece of data item */
> @@ -112,25 +131,6 @@ struct ceph_msg_data_cursor {
> };
> };
>
> -struct ceph_msg_data {
> - enum ceph_msg_data_type type;
> - union {
> -#ifdef CONFIG_BLOCK
> - struct {
> - struct bio *bio;
> - size_t bio_length;
> - };
> -#endif /* CONFIG_BLOCK */
> - struct {
> - struct page **pages; /* NOT OWNER. */
> - size_t length; /* total # bytes */
> - unsigned int alignment; /* first page */
> - };
> - struct ceph_pagelist *pagelist;
> - };
> - struct ceph_msg_data_cursor cursor; /* pagelist only */
> -};
> -
> /*
> * a single message. it contains a header (src, dest, message type, etc.),
> * footer (crc values, mainly), a "front" message body, and possibly a
> @@ -142,8 +142,9 @@ struct ceph_msg {
> struct kvec front; /* unaligned blobs of message */
> struct ceph_buffer *middle;
>
> - size_t data_length;
> - struct ceph_msg_data *data; /* data payload */
> + size_t data_length;
> + struct ceph_msg_data *data;
> + struct ceph_msg_data_cursor cursor;
>
> struct ceph_connection *con;
> struct list_head list_head; /* links for connection lists */
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index 9571d03..287b7fb 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -725,7 +725,7 @@ static void con_out_kvec_add(struct ceph_connection
> *con,
> static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data,
> size_t length)
> {
> - struct ceph_msg_data_cursor *cursor = &data->cursor;
> + struct ceph_msg_data_cursor *cursor = data->cursor;
> struct bio *bio;
>
> BUG_ON(data->type != CEPH_MSG_DATA_BIO);
> @@ -745,7 +745,7 @@ static struct page *ceph_msg_data_bio_next(struct
> ceph_msg_data *data,
> size_t *page_offset,
> size_t *length)
> {
> - struct ceph_msg_data_cursor *cursor = &data->cursor;
> + struct ceph_msg_data_cursor *cursor = data->cursor;
> struct bio *bio;
> struct bio_vec *bio_vec;
> unsigned int index;
> @@ -774,7 +774,7 @@ static struct page *ceph_msg_data_bio_next(struct
> ceph_msg_data *data,
>
> static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data,
> size_t bytes)
> {
> - struct ceph_msg_data_cursor *cursor = &data->cursor;
> + struct ceph_msg_data_cursor *cursor = data->cursor;
> struct bio *bio;
> struct bio_vec *bio_vec;
> unsigned int index;
> @@ -826,7 +826,7 @@ static bool ceph_msg_data_bio_advance(struct
> ceph_msg_data *data, size_t bytes)
> static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data,
> size_t length)
> {
> - struct ceph_msg_data_cursor *cursor = &data->cursor;
> + struct ceph_msg_data_cursor *cursor = data->cursor;
> int page_count;
>
> BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
> @@ -849,7 +849,7 @@ static struct page *ceph_msg_data_pages_next(struct
> ceph_msg_data *data,
> size_t *page_offset,
> size_t *length)
> {
> - struct ceph_msg_data_cursor *cursor = &data->cursor;
> + struct ceph_msg_data_cursor *cursor = data->cursor;
>
> BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
>
> @@ -868,7 +868,7 @@ static struct page *ceph_msg_data_pages_next(struct
> ceph_msg_data *data,
> static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
> size_t bytes)
> {
> - struct ceph_msg_data_cursor *cursor = &data->cursor;
> + struct ceph_msg_data_cursor *cursor = data->cursor;
>
> BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
>
> @@ -897,7 +897,7 @@ static bool ceph_msg_data_pages_advance(struct
> ceph_msg_data *data,
> static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data,
> size_t length)
> {
> - struct ceph_msg_data_cursor *cursor = &data->cursor;
> + struct ceph_msg_data_cursor *cursor = data->cursor;
> struct ceph_pagelist *pagelist;
> struct page *page;
>
> @@ -923,7 +923,7 @@ static struct page
> *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
> size_t *page_offset,
> size_t *length)
> {
> - struct ceph_msg_data_cursor *cursor = &data->cursor;
> + struct ceph_msg_data_cursor *cursor = data->cursor;
> struct ceph_pagelist *pagelist;
>
> BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
> @@ -941,13 +941,13 @@ static struct page
> *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
> else
> *length = PAGE_SIZE - *page_offset;
>
> - return data->cursor.page;
> + return data->cursor->page;
> }
>
> static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
> size_t bytes)
> {
> - struct ceph_msg_data_cursor *cursor = &data->cursor;
> + struct ceph_msg_data_cursor *cursor = data->cursor;
> struct ceph_pagelist *pagelist;
>
> BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
> @@ -1003,7 +1003,7 @@ static void ceph_msg_data_cursor_init(struct
> ceph_msg_data *data,
> /* BUG(); */
> break;
> }
> - data->cursor.need_crc = true;
> + data->cursor->need_crc = true;
> }
>
> /*
> @@ -1039,7 +1039,7 @@ static struct page *ceph_msg_data_next(struct
> ceph_msg_data *data,
> BUG_ON(*page_offset + *length > PAGE_SIZE);
> BUG_ON(!*length);
> if (last_piece)
> - *last_piece = data->cursor.last_piece;
> + *last_piece = data->cursor->last_piece;
>
> return page;
> }
> @@ -1050,7 +1050,7 @@ static struct page *ceph_msg_data_next(struct
> ceph_msg_data *data,
> */
> static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
> {
> - struct ceph_msg_data_cursor *cursor = &data->cursor;
> + struct ceph_msg_data_cursor *cursor = data->cursor;
> bool new_piece;
>
> BUG_ON(bytes > cursor->resid);
> @@ -1071,7 +1071,7 @@ static bool ceph_msg_data_advance(struct
> ceph_msg_data *data, size_t bytes)
> BUG();
> break;
> }
> - data->cursor.need_crc = new_piece;
> + data->cursor->need_crc = new_piece;
>
> return new_piece;
> }
> @@ -1404,7 +1404,7 @@ static u32 ceph_crc32c_page(u32 crc, struct page
> *page,
> static int write_partial_message_data(struct ceph_connection *con)
> {
> struct ceph_msg *msg = con->out_msg;
> - struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
> + struct ceph_msg_data_cursor *cursor = msg->data->cursor;
> bool do_datacrc = !con->msgr->nocrc;
> u32 crc;
>
> @@ -2102,7 +2102,7 @@ static int read_partial_message_section(struct
> ceph_connection *con,
> static int read_partial_msg_data(struct ceph_connection *con)
> {
> struct ceph_msg *msg = con->in_msg;
> - struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
> + struct ceph_msg_data_cursor *cursor = msg->data->cursor;
> const bool do_datacrc = !con->msgr->nocrc;
> struct page *page;
> size_t page_offset;
> @@ -2989,6 +2989,7 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg,
> struct page **pages,
>
> data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
> BUG_ON(!data);
> + data->cursor = &msg->cursor;
> data->pages = pages;
> data->length = length;
> data->alignment = alignment & ~PAGE_MASK;
> @@ -3010,6 +3011,7 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
>
> data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
> BUG_ON(!data);
> + data->cursor = &msg->cursor;
> data->pagelist = pagelist;
>
> msg->data = data;
> @@ -3029,6 +3031,7 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg,
> struct bio *bio,
>
> data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
> BUG_ON(!data);
> + data->cursor = &msg->cursor;
> data->bio = bio;
> data->bio_length = length;
>
^ permalink raw reply [flat|nested] 22+ messages in thread
* [PATCH 3/6] libceph: have cursor point to data
2013-04-05 22:24 [PATCH 0/6] Alex Elder
2013-04-05 22:25 ` [PATCH 1/6] libceph: record bio length Alex Elder
2013-04-05 22:25 ` [PATCH 2/6] libceph: move cursor into message Alex Elder
@ 2013-04-05 22:25 ` Alex Elder
2013-04-08 23:58 ` Josh Durgin
2013-04-05 22:26 ` [PATCH 4/6] libceph: replace message data pointer with list Alex Elder
` (2 subsequent siblings)
5 siblings, 1 reply; 22+ messages in thread
From: Alex Elder @ 2013-04-05 22:25 UTC (permalink / raw)
To: ceph-devel@vger.kernel.org
Rather than having a ceph message data item point to the cursor it's
associated with, have the cursor point to a data item. This will
allow a message cursor to be used for more than one data item.
Signed-off-by: Alex Elder <elder@inktank.com>
---
include/linux/ceph/messenger.h | 8 +--
net/ceph/messenger.c | 113
+++++++++++++++++++---------------------
2 files changed, 59 insertions(+), 62 deletions(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index e755724..8846ff6 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -104,13 +104,13 @@ struct ceph_msg_data {
};
struct ceph_pagelist *pagelist;
};
- struct ceph_msg_data_cursor *cursor;
};
struct ceph_msg_data_cursor {
- size_t resid; /* bytes not yet consumed */
- bool last_piece; /* now at last piece of data item */
- bool need_crc; /* new piece; crc update needed */
+ struct ceph_msg_data *data; /* data item this describes */
+ size_t resid; /* bytes not yet consumed */
+ bool last_piece; /* current is last piece */
+ bool need_crc; /* crc update needed */
union {
#ifdef CONFIG_BLOCK
struct { /* bio */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 287b7fb..50b4093 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -722,10 +722,10 @@ static void con_out_kvec_add(struct
ceph_connection *con,
* entry in the current bio iovec, or the first entry in the next
* bio in the list.
*/
-static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data,
+static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor
*cursor,
size_t length)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
+ struct ceph_msg_data *data = cursor->data;
struct bio *bio;
BUG_ON(data->type != CEPH_MSG_DATA_BIO);
@@ -741,11 +741,11 @@ static void ceph_msg_data_bio_cursor_init(struct
ceph_msg_data *data,
cursor->last_piece = length <= bio->bi_io_vec[0].bv_len;
}
-static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
+static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor
*cursor,
size_t *page_offset,
size_t *length)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
+ struct ceph_msg_data *data = cursor->data;
struct bio *bio;
struct bio_vec *bio_vec;
unsigned int index;
@@ -772,14 +772,14 @@ static struct page *ceph_msg_data_bio_next(struct
ceph_msg_data *data,
return bio_vec->bv_page;
}
-static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data,
size_t bytes)
+static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
struct bio *bio;
struct bio_vec *bio_vec;
unsigned int index;
- BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+ BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
bio = cursor->bio;
BUG_ON(!bio);
@@ -823,10 +823,10 @@ static bool ceph_msg_data_bio_advance(struct
ceph_msg_data *data, size_t bytes)
* For a page array, a piece comes from the first page in the array
* that has not already been fully consumed.
*/
-static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data,
+static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor
*cursor,
size_t length)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
+ struct ceph_msg_data *data = cursor->data;
int page_count;
BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
@@ -845,11 +845,11 @@ static void ceph_msg_data_pages_cursor_init(struct
ceph_msg_data *data,
cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
}
-static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
- size_t *page_offset,
- size_t *length)
+static struct page *
+ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
+ struct ceph_msg_data *data = cursor->data;
BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
@@ -865,12 +865,10 @@ static struct page
*ceph_msg_data_pages_next(struct ceph_msg_data *data,
return data->pages[cursor->page_index];
}
-static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
+static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor
*cursor,
size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
-
- BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
+ BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
@@ -894,10 +892,11 @@ static bool ceph_msg_data_pages_advance(struct
ceph_msg_data *data,
* For a pagelist, a piece is whatever remains to be consumed in the
* first page in the list, or the front of the next page.
*/
-static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data,
+static void
+ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
size_t length)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
+ struct ceph_msg_data *data = cursor->data;
struct ceph_pagelist *pagelist;
struct page *page;
@@ -919,11 +918,11 @@ static void
ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data,
cursor->last_piece = length <= PAGE_SIZE;
}
-static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
- size_t *page_offset,
- size_t *length)
+static struct page *
+ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
+ struct ceph_msg_data *data = cursor->data;
struct ceph_pagelist *pagelist;
BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
@@ -941,13 +940,13 @@ static struct page
*ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
else
*length = PAGE_SIZE - *page_offset;
- return data->cursor->page;
+ return cursor->page;
}
-static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
+static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor
*cursor,
size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
+ struct ceph_msg_data *data = cursor->data;
struct ceph_pagelist *pagelist;
BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
@@ -983,19 +982,21 @@ static bool ceph_msg_data_pagelist_advance(struct
ceph_msg_data *data,
* be processed in that piece. It also tracks whether the current
* piece is the last one in the data item.
*/
-static void ceph_msg_data_cursor_init(struct ceph_msg_data *data,
- size_t length)
+static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
{
- switch (data->type) {
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
+
+ cursor->data = msg->data;
+ switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
- ceph_msg_data_pagelist_cursor_init(data, length);
+ ceph_msg_data_pagelist_cursor_init(cursor, length);
break;
case CEPH_MSG_DATA_PAGES:
- ceph_msg_data_pages_cursor_init(data, length);
+ ceph_msg_data_pages_cursor_init(cursor, length);
break;
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
- ceph_msg_data_bio_cursor_init(data, length);
+ ceph_msg_data_bio_cursor_init(cursor, length);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
@@ -1003,7 +1004,7 @@ static void ceph_msg_data_cursor_init(struct
ceph_msg_data *data,
/* BUG(); */
break;
}
- data->cursor->need_crc = true;
+ cursor->need_crc = true;
}
/*
@@ -1011,23 +1012,22 @@ static void ceph_msg_data_cursor_init(struct
ceph_msg_data *data,
* data item, and supply the page offset and length of that piece.
* Indicate whether this is the last piece in this data item.
*/
-static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
- size_t *page_offset,
- size_t *length,
+static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
+ size_t *page_offset, size_t *length,
bool *last_piece)
{
struct page *page;
- switch (data->type) {
+ switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
- page = ceph_msg_data_pagelist_next(data, page_offset, length);
+ page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
break;
case CEPH_MSG_DATA_PAGES:
- page = ceph_msg_data_pages_next(data, page_offset, length);
+ page = ceph_msg_data_pages_next(cursor, page_offset, length);
break;
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
- page = ceph_msg_data_bio_next(data, page_offset, length);
+ page = ceph_msg_data_bio_next(cursor, page_offset, length);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
@@ -1039,7 +1039,7 @@ static struct page *ceph_msg_data_next(struct
ceph_msg_data *data,
BUG_ON(*page_offset + *length > PAGE_SIZE);
BUG_ON(!*length);
if (last_piece)
- *last_piece = data->cursor->last_piece;
+ *last_piece = cursor->last_piece;
return page;
}
@@ -1048,22 +1048,22 @@ static struct page *ceph_msg_data_next(struct
ceph_msg_data *data,
* Returns true if the result moves the cursor on to the next piece
* of the data item.
*/
-static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
+static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
+ size_t bytes)
{
- struct ceph_msg_data_cursor *cursor = data->cursor;
bool new_piece;
BUG_ON(bytes > cursor->resid);
- switch (data->type) {
+ switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
- new_piece = ceph_msg_data_pagelist_advance(data, bytes);
+ new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
break;
case CEPH_MSG_DATA_PAGES:
- new_piece = ceph_msg_data_pages_advance(data, bytes);
+ new_piece = ceph_msg_data_pages_advance(cursor, bytes);
break;
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
- new_piece = ceph_msg_data_bio_advance(data, bytes);
+ new_piece = ceph_msg_data_bio_advance(cursor, bytes);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_NONE:
@@ -1071,7 +1071,7 @@ static bool ceph_msg_data_advance(struct
ceph_msg_data *data, size_t bytes)
BUG();
break;
}
- data->cursor->need_crc = new_piece;
+ cursor->need_crc = new_piece;
return new_piece;
}
@@ -1083,7 +1083,7 @@ static void prepare_message_data(struct ceph_msg
*msg, u32 data_len)
/* Initialize data cursor */
- ceph_msg_data_cursor_init(msg->data, (size_t)data_len);
+ ceph_msg_data_cursor_init(msg, (size_t)data_len);
}
/*
@@ -1404,7 +1404,7 @@ static u32 ceph_crc32c_page(u32 crc, struct page
*page,
static int write_partial_message_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->out_msg;
- struct ceph_msg_data_cursor *cursor = msg->data->cursor;
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
bool do_datacrc = !con->msgr->nocrc;
u32 crc;
@@ -1430,7 +1430,7 @@ static int write_partial_message_data(struct
ceph_connection *con)
bool need_crc;
int ret;
- page = ceph_msg_data_next(msg->data, &page_offset, &length,
+ page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
&last_piece);
ret = ceph_tcp_sendpage(con->sock, page, page_offset,
length, last_piece);
@@ -1442,7 +1442,7 @@ static int write_partial_message_data(struct
ceph_connection *con)
}
if (do_datacrc && cursor->need_crc)
crc = ceph_crc32c_page(crc, page, page_offset, length);
- need_crc = ceph_msg_data_advance(msg->data, (size_t)ret);
+ need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
}
dout("%s %p msg %p done\n", __func__, con, msg);
@@ -2102,7 +2102,7 @@ static int read_partial_message_section(struct
ceph_connection *con,
static int read_partial_msg_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->in_msg;
- struct ceph_msg_data_cursor *cursor = msg->data->cursor;
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
const bool do_datacrc = !con->msgr->nocrc;
struct page *page;
size_t page_offset;
@@ -2117,7 +2117,7 @@ static int read_partial_msg_data(struct
ceph_connection *con)
if (do_datacrc)
crc = con->in_data_crc;
while (cursor->resid) {
- page = ceph_msg_data_next(msg->data, &page_offset, &length,
+ page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
NULL);
ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
if (ret <= 0) {
@@ -2129,7 +2129,7 @@ static int read_partial_msg_data(struct
ceph_connection *con)
if (do_datacrc)
crc = ceph_crc32c_page(crc, page, page_offset, ret);
- (void) ceph_msg_data_advance(msg->data, (size_t)ret);
+ (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
}
if (do_datacrc)
con->in_data_crc = crc;
@@ -2989,7 +2989,6 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg,
struct page **pages,
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
BUG_ON(!data);
- data->cursor = &msg->cursor;
data->pages = pages;
data->length = length;
data->alignment = alignment & ~PAGE_MASK;
@@ -3011,7 +3010,6 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
BUG_ON(!data);
- data->cursor = &msg->cursor;
data->pagelist = pagelist;
msg->data = data;
@@ -3031,7 +3029,6 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg,
struct bio *bio,
data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
BUG_ON(!data);
- data->cursor = &msg->cursor;
data->bio = bio;
data->bio_length = length;
--
1.7.9.5
^ permalink raw reply related [flat|nested] 22+ messages in thread
* Re: [PATCH 3/6] libceph: have cursor point to data
2013-04-05 22:25 ` [PATCH 3/6] libceph: have cursor point to data Alex Elder
@ 2013-04-08 23:58 ` Josh Durgin
0 siblings, 0 replies; 22+ messages in thread
From: Josh Durgin @ 2013-04-08 23:58 UTC (permalink / raw)
To: Alex Elder; +Cc: ceph-devel@vger.kernel.org
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
On 04/05/2013 03:25 PM, Alex Elder wrote:
> Rather than having a ceph message data item point to the cursor it's
> associated with, have the cursor point to a data item. This will
> allow a message cursor to be used for more than one data item.
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> include/linux/ceph/messenger.h | 8 +--
> net/ceph/messenger.c | 113
> +++++++++++++++++++---------------------
> 2 files changed, 59 insertions(+), 62 deletions(-)
>
> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
> index e755724..8846ff6 100644
> --- a/include/linux/ceph/messenger.h
> +++ b/include/linux/ceph/messenger.h
> @@ -104,13 +104,13 @@ struct ceph_msg_data {
> };
> struct ceph_pagelist *pagelist;
> };
> - struct ceph_msg_data_cursor *cursor;
> };
>
> struct ceph_msg_data_cursor {
> - size_t resid; /* bytes not yet consumed */
> - bool last_piece; /* now at last piece of data item */
> - bool need_crc; /* new piece; crc update needed */
> + struct ceph_msg_data *data; /* data item this describes */
> + size_t resid; /* bytes not yet consumed */
> + bool last_piece; /* current is last piece */
> + bool need_crc; /* crc update needed */
> union {
> #ifdef CONFIG_BLOCK
> struct { /* bio */
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index 287b7fb..50b4093 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -722,10 +722,10 @@ static void con_out_kvec_add(struct
> ceph_connection *con,
> * entry in the current bio iovec, or the first entry in the next
> * bio in the list.
> */
> -static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data,
> +static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor
> *cursor,
> size_t length)
> {
> - struct ceph_msg_data_cursor *cursor = data->cursor;
> + struct ceph_msg_data *data = cursor->data;
> struct bio *bio;
>
> BUG_ON(data->type != CEPH_MSG_DATA_BIO);
> @@ -741,11 +741,11 @@ static void ceph_msg_data_bio_cursor_init(struct
> ceph_msg_data *data,
> cursor->last_piece = length <= bio->bi_io_vec[0].bv_len;
> }
>
> -static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
> +static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor
> *cursor,
> size_t *page_offset,
> size_t *length)
> {
> - struct ceph_msg_data_cursor *cursor = data->cursor;
> + struct ceph_msg_data *data = cursor->data;
> struct bio *bio;
> struct bio_vec *bio_vec;
> unsigned int index;
> @@ -772,14 +772,14 @@ static struct page *ceph_msg_data_bio_next(struct
> ceph_msg_data *data,
> return bio_vec->bv_page;
> }
>
> -static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data,
> size_t bytes)
> +static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
> + size_t bytes)
> {
> - struct ceph_msg_data_cursor *cursor = data->cursor;
> struct bio *bio;
> struct bio_vec *bio_vec;
> unsigned int index;
>
> - BUG_ON(data->type != CEPH_MSG_DATA_BIO);
> + BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
>
> bio = cursor->bio;
> BUG_ON(!bio);
> @@ -823,10 +823,10 @@ static bool ceph_msg_data_bio_advance(struct
> ceph_msg_data *data, size_t bytes)
> * For a page array, a piece comes from the first page in the array
> * that has not already been fully consumed.
> */
> -static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data,
> +static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor
> *cursor,
> size_t length)
> {
> - struct ceph_msg_data_cursor *cursor = data->cursor;
> + struct ceph_msg_data *data = cursor->data;
> int page_count;
>
> BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
> @@ -845,11 +845,11 @@ static void ceph_msg_data_pages_cursor_init(struct
> ceph_msg_data *data,
> cursor->last_piece = (size_t)cursor->page_offset + length <= PAGE_SIZE;
> }
>
> -static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
> - size_t *page_offset,
> - size_t *length)
> +static struct page *
> +ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
> + size_t *page_offset, size_t *length)
> {
> - struct ceph_msg_data_cursor *cursor = data->cursor;
> + struct ceph_msg_data *data = cursor->data;
>
> BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
>
> @@ -865,12 +865,10 @@ static struct page
> *ceph_msg_data_pages_next(struct ceph_msg_data *data,
> return data->pages[cursor->page_index];
> }
>
> -static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
> +static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor
> *cursor,
> size_t bytes)
> {
> - struct ceph_msg_data_cursor *cursor = data->cursor;
> -
> - BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
> + BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);
>
> BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
>
> @@ -894,10 +892,11 @@ static bool ceph_msg_data_pages_advance(struct
> ceph_msg_data *data,
> * For a pagelist, a piece is whatever remains to be consumed in the
> * first page in the list, or the front of the next page.
> */
> -static void ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data,
> +static void
> +ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
> size_t length)
> {
> - struct ceph_msg_data_cursor *cursor = data->cursor;
> + struct ceph_msg_data *data = cursor->data;
> struct ceph_pagelist *pagelist;
> struct page *page;
>
> @@ -919,11 +918,11 @@ static void
> ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data *data,
> cursor->last_piece = length <= PAGE_SIZE;
> }
>
> -static struct page *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
> - size_t *page_offset,
> - size_t *length)
> +static struct page *
> +ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
> + size_t *page_offset, size_t *length)
> {
> - struct ceph_msg_data_cursor *cursor = data->cursor;
> + struct ceph_msg_data *data = cursor->data;
> struct ceph_pagelist *pagelist;
>
> BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
> @@ -941,13 +940,13 @@ static struct page
> *ceph_msg_data_pagelist_next(struct ceph_msg_data *data,
> else
> *length = PAGE_SIZE - *page_offset;
>
> - return data->cursor->page;
> + return cursor->page;
> }
>
> -static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data *data,
> +static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor
> *cursor,
> size_t bytes)
> {
> - struct ceph_msg_data_cursor *cursor = data->cursor;
> + struct ceph_msg_data *data = cursor->data;
> struct ceph_pagelist *pagelist;
>
> BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
> @@ -983,19 +982,21 @@ static bool ceph_msg_data_pagelist_advance(struct
> ceph_msg_data *data,
> * be processed in that piece. It also tracks whether the current
> * piece is the last one in the data item.
> */
> -static void ceph_msg_data_cursor_init(struct ceph_msg_data *data,
> - size_t length)
> +static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
> {
> - switch (data->type) {
> + struct ceph_msg_data_cursor *cursor = &msg->cursor;
> +
> + cursor->data = msg->data;
> + switch (cursor->data->type) {
> case CEPH_MSG_DATA_PAGELIST:
> - ceph_msg_data_pagelist_cursor_init(data, length);
> + ceph_msg_data_pagelist_cursor_init(cursor, length);
> break;
> case CEPH_MSG_DATA_PAGES:
> - ceph_msg_data_pages_cursor_init(data, length);
> + ceph_msg_data_pages_cursor_init(cursor, length);
> break;
> #ifdef CONFIG_BLOCK
> case CEPH_MSG_DATA_BIO:
> - ceph_msg_data_bio_cursor_init(data, length);
> + ceph_msg_data_bio_cursor_init(cursor, length);
> break;
> #endif /* CONFIG_BLOCK */
> case CEPH_MSG_DATA_NONE:
> @@ -1003,7 +1004,7 @@ static void ceph_msg_data_cursor_init(struct
> ceph_msg_data *data,
> /* BUG(); */
> break;
> }
> - data->cursor->need_crc = true;
> + cursor->need_crc = true;
> }
>
> /*
> @@ -1011,23 +1012,22 @@ static void ceph_msg_data_cursor_init(struct
> ceph_msg_data *data,
> * data item, and supply the page offset and length of that piece.
> * Indicate whether this is the last piece in this data item.
> */
> -static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
> - size_t *page_offset,
> - size_t *length,
> +static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
> + size_t *page_offset, size_t *length,
> bool *last_piece)
> {
> struct page *page;
>
> - switch (data->type) {
> + switch (cursor->data->type) {
> case CEPH_MSG_DATA_PAGELIST:
> - page = ceph_msg_data_pagelist_next(data, page_offset, length);
> + page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
> break;
> case CEPH_MSG_DATA_PAGES:
> - page = ceph_msg_data_pages_next(data, page_offset, length);
> + page = ceph_msg_data_pages_next(cursor, page_offset, length);
> break;
> #ifdef CONFIG_BLOCK
> case CEPH_MSG_DATA_BIO:
> - page = ceph_msg_data_bio_next(data, page_offset, length);
> + page = ceph_msg_data_bio_next(cursor, page_offset, length);
> break;
> #endif /* CONFIG_BLOCK */
> case CEPH_MSG_DATA_NONE:
> @@ -1039,7 +1039,7 @@ static struct page *ceph_msg_data_next(struct
> ceph_msg_data *data,
> BUG_ON(*page_offset + *length > PAGE_SIZE);
> BUG_ON(!*length);
> if (last_piece)
> - *last_piece = data->cursor->last_piece;
> + *last_piece = cursor->last_piece;
>
> return page;
> }
> @@ -1048,22 +1048,22 @@ static struct page *ceph_msg_data_next(struct
> ceph_msg_data *data,
> * Returns true if the result moves the cursor on to the next piece
> * of the data item.
> */
> -static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
> +static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
> + size_t bytes)
> {
> - struct ceph_msg_data_cursor *cursor = data->cursor;
> bool new_piece;
>
> BUG_ON(bytes > cursor->resid);
> - switch (data->type) {
> + switch (cursor->data->type) {
> case CEPH_MSG_DATA_PAGELIST:
> - new_piece = ceph_msg_data_pagelist_advance(data, bytes);
> + new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
> break;
> case CEPH_MSG_DATA_PAGES:
> - new_piece = ceph_msg_data_pages_advance(data, bytes);
> + new_piece = ceph_msg_data_pages_advance(cursor, bytes);
> break;
> #ifdef CONFIG_BLOCK
> case CEPH_MSG_DATA_BIO:
> - new_piece = ceph_msg_data_bio_advance(data, bytes);
> + new_piece = ceph_msg_data_bio_advance(cursor, bytes);
> break;
> #endif /* CONFIG_BLOCK */
> case CEPH_MSG_DATA_NONE:
> @@ -1071,7 +1071,7 @@ static bool ceph_msg_data_advance(struct
> ceph_msg_data *data, size_t bytes)
> BUG();
> break;
> }
> - data->cursor->need_crc = new_piece;
> + cursor->need_crc = new_piece;
>
> return new_piece;
> }
> @@ -1083,7 +1083,7 @@ static void prepare_message_data(struct ceph_msg
> *msg, u32 data_len)
>
> /* Initialize data cursor */
>
> - ceph_msg_data_cursor_init(msg->data, (size_t)data_len);
> + ceph_msg_data_cursor_init(msg, (size_t)data_len);
> }
>
> /*
> @@ -1404,7 +1404,7 @@ static u32 ceph_crc32c_page(u32 crc, struct page
> *page,
> static int write_partial_message_data(struct ceph_connection *con)
> {
> struct ceph_msg *msg = con->out_msg;
> - struct ceph_msg_data_cursor *cursor = msg->data->cursor;
> + struct ceph_msg_data_cursor *cursor = &msg->cursor;
> bool do_datacrc = !con->msgr->nocrc;
> u32 crc;
>
> @@ -1430,7 +1430,7 @@ static int write_partial_message_data(struct
> ceph_connection *con)
> bool need_crc;
> int ret;
>
> - page = ceph_msg_data_next(msg->data, &page_offset, &length,
> + page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
> &last_piece);
> ret = ceph_tcp_sendpage(con->sock, page, page_offset,
> length, last_piece);
> @@ -1442,7 +1442,7 @@ static int write_partial_message_data(struct
> ceph_connection *con)
> }
> if (do_datacrc && cursor->need_crc)
> crc = ceph_crc32c_page(crc, page, page_offset, length);
> - need_crc = ceph_msg_data_advance(msg->data, (size_t)ret);
> + need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
> }
>
> dout("%s %p msg %p done\n", __func__, con, msg);
> @@ -2102,7 +2102,7 @@ static int read_partial_message_section(struct
> ceph_connection *con,
> static int read_partial_msg_data(struct ceph_connection *con)
> {
> struct ceph_msg *msg = con->in_msg;
> - struct ceph_msg_data_cursor *cursor = msg->data->cursor;
> + struct ceph_msg_data_cursor *cursor = &msg->cursor;
> const bool do_datacrc = !con->msgr->nocrc;
> struct page *page;
> size_t page_offset;
> @@ -2117,7 +2117,7 @@ static int read_partial_msg_data(struct
> ceph_connection *con)
> if (do_datacrc)
> crc = con->in_data_crc;
> while (cursor->resid) {
> - page = ceph_msg_data_next(msg->data, &page_offset, &length,
> + page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
> NULL);
> ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
> if (ret <= 0) {
> @@ -2129,7 +2129,7 @@ static int read_partial_msg_data(struct
> ceph_connection *con)
>
> if (do_datacrc)
> crc = ceph_crc32c_page(crc, page, page_offset, ret);
> - (void) ceph_msg_data_advance(msg->data, (size_t)ret);
> + (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
> }
> if (do_datacrc)
> con->in_data_crc = crc;
> @@ -2989,7 +2989,6 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg,
> struct page **pages,
>
> data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
> BUG_ON(!data);
> - data->cursor = &msg->cursor;
> data->pages = pages;
> data->length = length;
> data->alignment = alignment & ~PAGE_MASK;
> @@ -3011,7 +3010,6 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
>
> data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
> BUG_ON(!data);
> - data->cursor = &msg->cursor;
> data->pagelist = pagelist;
>
> msg->data = data;
> @@ -3031,7 +3029,6 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg,
> struct bio *bio,
>
> data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
> BUG_ON(!data);
> - data->cursor = &msg->cursor;
> data->bio = bio;
> data->bio_length = length;
>
^ permalink raw reply [flat|nested] 22+ messages in thread
* [PATCH 4/6] libceph: replace message data pointer with list
2013-04-05 22:24 [PATCH 0/6] Alex Elder
` (2 preceding siblings ...)
2013-04-05 22:25 ` [PATCH 3/6] libceph: have cursor point to data Alex Elder
@ 2013-04-05 22:26 ` Alex Elder
2013-04-09 0:04 ` Josh Durgin
2013-04-05 22:26 ` [PATCH 5/6] libceph: implement multiple data items in a message Alex Elder
2013-04-05 22:26 ` [PATCH 6/6] libceph: add, don't set data for " Alex Elder
5 siblings, 1 reply; 22+ messages in thread
From: Alex Elder @ 2013-04-05 22:26 UTC (permalink / raw)
To: ceph-devel@vger.kernel.org
In place of the message data pointer, use a list head which links
through message data items. For now we only support a single entry
on that list.
Signed-off-by: Alex Elder <elder@inktank.com>
---
include/linux/ceph/messenger.h | 3 ++-
net/ceph/messenger.c | 46 +++++++++++++++++++++++++++-------------
2 files changed, 33 insertions(+), 16 deletions(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 8846ff6..318da01 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -89,6 +89,7 @@ static __inline__ bool ceph_msg_data_type_valid(enum
ceph_msg_data_type type)
}
struct ceph_msg_data {
+ struct list_head links; /* ceph_msg->data */
enum ceph_msg_data_type type;
union {
#ifdef CONFIG_BLOCK
@@ -143,7 +144,7 @@ struct ceph_msg {
struct ceph_buffer *middle;
size_t data_length;
- struct ceph_msg_data *data;
+ struct list_head data;
struct ceph_msg_data_cursor cursor;
struct ceph_connection *con;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 50b4093..9ce667e 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -985,8 +985,10 @@ static bool ceph_msg_data_pagelist_advance(struct
ceph_msg_data_cursor *cursor,
static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
{
struct ceph_msg_data_cursor *cursor = &msg->cursor;
+ struct ceph_msg_data *data;
- cursor->data = msg->data;
+ data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+ cursor->data = data;
switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
ceph_msg_data_pagelist_cursor_init(cursor, length);
@@ -1410,7 +1412,7 @@ static int write_partial_message_data(struct
ceph_connection *con)
dout("%s %p msg %p\n", __func__, con, msg);
- if (WARN_ON(!msg->data))
+ if (list_empty(&msg->data))
return -EINVAL;
/*
@@ -2111,7 +2113,7 @@ static int read_partial_msg_data(struct
ceph_connection *con)
int ret;
BUG_ON(!msg);
- if (!msg->data)
+ if (list_empty(&msg->data))
return -EIO;
if (do_datacrc)
@@ -2961,6 +2963,7 @@ static struct ceph_msg_data
*ceph_msg_data_create(enum ceph_msg_data_type type)
data = kzalloc(sizeof (*data), GFP_NOFS);
if (data)
data->type = type;
+ INIT_LIST_HEAD(&data->links);
return data;
}
@@ -2970,6 +2973,7 @@ static void ceph_msg_data_destroy(struct
ceph_msg_data *data)
if (!data)
return;
+ WARN_ON(!list_empty(&data->links));
if (data->type == CEPH_MSG_DATA_PAGELIST) {
ceph_pagelist_release(data->pagelist);
kfree(data->pagelist);
@@ -2985,7 +2989,7 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg,
struct page **pages,
BUG_ON(!pages);
BUG_ON(!length);
BUG_ON(msg->data_length);
- BUG_ON(msg->data != NULL);
+ BUG_ON(!list_empty(&msg->data));
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
BUG_ON(!data);
@@ -2993,8 +2997,9 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg,
struct page **pages,
data->length = length;
data->alignment = alignment & ~PAGE_MASK;
- msg->data = data;
- msg->data_length = length;
+ BUG_ON(!list_empty(&msg->data));
+ list_add_tail(&data->links, &msg->data);
+ msg->data_length += length;
}
EXPORT_SYMBOL(ceph_msg_data_set_pages);
@@ -3006,14 +3011,14 @@ void ceph_msg_data_set_pagelist(struct ceph_msg
*msg,
BUG_ON(!pagelist);
BUG_ON(!pagelist->length);
BUG_ON(msg->data_length);
- BUG_ON(msg->data != NULL);
+ BUG_ON(!list_empty(&msg->data));
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
BUG_ON(!data);
data->pagelist = pagelist;
- msg->data = data;
- msg->data_length = pagelist->length;
+ list_add_tail(&data->links, &msg->data);
+ msg->data_length += pagelist->length;
}
EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
@@ -3025,15 +3030,15 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg,
struct bio *bio,
BUG_ON(!bio);
BUG_ON(msg->data_length);
- BUG_ON(msg->data != NULL);
+ BUG_ON(!list_empty(&msg->data));
data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
BUG_ON(!data);
data->bio = bio;
data->bio_length = length;
- msg->data = data;
- msg->data_length = length;
+ list_add_tail(&data->links, &msg->data);
+ msg->data_length += length;
}
EXPORT_SYMBOL(ceph_msg_data_set_bio);
#endif /* CONFIG_BLOCK */
@@ -3057,6 +3062,7 @@ struct ceph_msg *ceph_msg_new(int type, int
front_len, gfp_t flags,
INIT_LIST_HEAD(&m->list_head);
kref_init(&m->kref);
+ INIT_LIST_HEAD(&m->data);
/* front */
m->front_max = front_len;
@@ -3202,6 +3208,9 @@ void ceph_msg_kfree(struct ceph_msg *m)
void ceph_msg_last_put(struct kref *kref)
{
struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
+ LIST_HEAD(data);
+ struct list_head *links;
+ struct list_head *next;
dout("ceph_msg_put last one on %p\n", m);
WARN_ON(!list_empty(&m->list_head));
@@ -3211,8 +3220,15 @@ void ceph_msg_last_put(struct kref *kref)
ceph_buffer_put(m->middle);
m->middle = NULL;
}
- ceph_msg_data_destroy(m->data);
- m->data = NULL;
+
+ list_splice_init(&m->data, &data);
+ list_for_each_safe(links, next, &data) {
+ struct ceph_msg_data *data;
+
+ data = list_entry(links, struct ceph_msg_data, links);
+ list_del_init(links);
+ ceph_msg_data_destroy(data);
+ }
m->data_length = 0;
if (m->pool)
@@ -3225,7 +3241,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
void ceph_msg_dump(struct ceph_msg *msg)
{
pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
- msg->front_max, msg->data->length);
+ msg->front_max, msg->data_length);
print_hex_dump(KERN_DEBUG, "header: ",
DUMP_PREFIX_OFFSET, 16, 1,
&msg->hdr, sizeof(msg->hdr), true);
--
1.7.9.5
^ permalink raw reply related [flat|nested] 22+ messages in thread
* Re: [PATCH 4/6] libceph: replace message data pointer with list
2013-04-05 22:26 ` [PATCH 4/6] libceph: replace message data pointer with list Alex Elder
@ 2013-04-09 0:04 ` Josh Durgin
0 siblings, 0 replies; 22+ messages in thread
From: Josh Durgin @ 2013-04-09 0:04 UTC (permalink / raw)
To: Alex Elder; +Cc: ceph-devel@vger.kernel.org
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
On 04/05/2013 03:26 PM, Alex Elder wrote:
> In place of the message data pointer, use a list head which links
> through message data items. For now we only support a single entry
> on that list.
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> include/linux/ceph/messenger.h | 3 ++-
> net/ceph/messenger.c | 46
> +++++++++++++++++++++++++++-------------
> 2 files changed, 33 insertions(+), 16 deletions(-)
>
> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
> index 8846ff6..318da01 100644
> --- a/include/linux/ceph/messenger.h
> +++ b/include/linux/ceph/messenger.h
> @@ -89,6 +89,7 @@ static __inline__ bool ceph_msg_data_type_valid(enum
> ceph_msg_data_type type)
> }
>
> struct ceph_msg_data {
> + struct list_head links; /* ceph_msg->data */
> enum ceph_msg_data_type type;
> union {
> #ifdef CONFIG_BLOCK
> @@ -143,7 +144,7 @@ struct ceph_msg {
> struct ceph_buffer *middle;
>
> size_t data_length;
> - struct ceph_msg_data *data;
> + struct list_head data;
> struct ceph_msg_data_cursor cursor;
>
> struct ceph_connection *con;
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index 50b4093..9ce667e 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -985,8 +985,10 @@ static bool ceph_msg_data_pagelist_advance(struct
> ceph_msg_data_cursor *cursor,
> static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
> {
> struct ceph_msg_data_cursor *cursor = &msg->cursor;
> + struct ceph_msg_data *data;
>
> - cursor->data = msg->data;
> + data = list_first_entry(&msg->data, struct ceph_msg_data, links);
> + cursor->data = data;
> switch (cursor->data->type) {
> case CEPH_MSG_DATA_PAGELIST:
> ceph_msg_data_pagelist_cursor_init(cursor, length);
> @@ -1410,7 +1412,7 @@ static int write_partial_message_data(struct
> ceph_connection *con)
>
> dout("%s %p msg %p\n", __func__, con, msg);
>
> - if (WARN_ON(!msg->data))
> + if (list_empty(&msg->data))
> return -EINVAL;
>
> /*
> @@ -2111,7 +2113,7 @@ static int read_partial_msg_data(struct
> ceph_connection *con)
> int ret;
>
> BUG_ON(!msg);
> - if (!msg->data)
> + if (list_empty(&msg->data))
> return -EIO;
>
> if (do_datacrc)
> @@ -2961,6 +2963,7 @@ static struct ceph_msg_data
> *ceph_msg_data_create(enum ceph_msg_data_type type)
> data = kzalloc(sizeof (*data), GFP_NOFS);
> if (data)
> data->type = type;
> + INIT_LIST_HEAD(&data->links);
>
> return data;
> }
> @@ -2970,6 +2973,7 @@ static void ceph_msg_data_destroy(struct
> ceph_msg_data *data)
> if (!data)
> return;
>
> + WARN_ON(!list_empty(&data->links));
> if (data->type == CEPH_MSG_DATA_PAGELIST) {
> ceph_pagelist_release(data->pagelist);
> kfree(data->pagelist);
> @@ -2985,7 +2989,7 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg,
> struct page **pages,
> BUG_ON(!pages);
> BUG_ON(!length);
> BUG_ON(msg->data_length);
> - BUG_ON(msg->data != NULL);
> + BUG_ON(!list_empty(&msg->data));
>
> data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
> BUG_ON(!data);
> @@ -2993,8 +2997,9 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg,
> struct page **pages,
> data->length = length;
> data->alignment = alignment & ~PAGE_MASK;
>
> - msg->data = data;
> - msg->data_length = length;
> + BUG_ON(!list_empty(&msg->data));
> + list_add_tail(&data->links, &msg->data);
> + msg->data_length += length;
> }
> EXPORT_SYMBOL(ceph_msg_data_set_pages);
>
> @@ -3006,14 +3011,14 @@ void ceph_msg_data_set_pagelist(struct ceph_msg
> *msg,
> BUG_ON(!pagelist);
> BUG_ON(!pagelist->length);
> BUG_ON(msg->data_length);
> - BUG_ON(msg->data != NULL);
> + BUG_ON(!list_empty(&msg->data));
>
> data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
> BUG_ON(!data);
> data->pagelist = pagelist;
>
> - msg->data = data;
> - msg->data_length = pagelist->length;
> + list_add_tail(&data->links, &msg->data);
> + msg->data_length += pagelist->length;
> }
> EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
>
> @@ -3025,15 +3030,15 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg,
> struct bio *bio,
>
> BUG_ON(!bio);
> BUG_ON(msg->data_length);
> - BUG_ON(msg->data != NULL);
> + BUG_ON(!list_empty(&msg->data));
>
> data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
> BUG_ON(!data);
> data->bio = bio;
> data->bio_length = length;
>
> - msg->data = data;
> - msg->data_length = length;
> + list_add_tail(&data->links, &msg->data);
> + msg->data_length += length;
> }
> EXPORT_SYMBOL(ceph_msg_data_set_bio);
> #endif /* CONFIG_BLOCK */
> @@ -3057,6 +3062,7 @@ struct ceph_msg *ceph_msg_new(int type, int
> front_len, gfp_t flags,
>
> INIT_LIST_HEAD(&m->list_head);
> kref_init(&m->kref);
> + INIT_LIST_HEAD(&m->data);
>
> /* front */
> m->front_max = front_len;
> @@ -3202,6 +3208,9 @@ void ceph_msg_kfree(struct ceph_msg *m)
> void ceph_msg_last_put(struct kref *kref)
> {
> struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
> + LIST_HEAD(data);
> + struct list_head *links;
> + struct list_head *next;
>
> dout("ceph_msg_put last one on %p\n", m);
> WARN_ON(!list_empty(&m->list_head));
> @@ -3211,8 +3220,15 @@ void ceph_msg_last_put(struct kref *kref)
> ceph_buffer_put(m->middle);
> m->middle = NULL;
> }
> - ceph_msg_data_destroy(m->data);
> - m->data = NULL;
> +
> + list_splice_init(&m->data, &data);
> + list_for_each_safe(links, next, &data) {
> + struct ceph_msg_data *data;
> +
> + data = list_entry(links, struct ceph_msg_data, links);
> + list_del_init(links);
> + ceph_msg_data_destroy(data);
> + }
> m->data_length = 0;
>
> if (m->pool)
> @@ -3225,7 +3241,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
> void ceph_msg_dump(struct ceph_msg *msg)
> {
> pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
> - msg->front_max, msg->data->length);
> + msg->front_max, msg->data_length);
> print_hex_dump(KERN_DEBUG, "header: ",
> DUMP_PREFIX_OFFSET, 16, 1,
> &msg->hdr, sizeof(msg->hdr), true);
>
^ permalink raw reply [flat|nested] 22+ messages in thread
* [PATCH 5/6] libceph: implement multiple data items in a message
2013-04-05 22:24 [PATCH 0/6] Alex Elder
` (3 preceding siblings ...)
2013-04-05 22:26 ` [PATCH 4/6] libceph: replace message data pointer with list Alex Elder
@ 2013-04-05 22:26 ` Alex Elder
2013-04-05 23:36 ` [PATCH 5/6, v2] " Alex Elder
2013-04-05 22:26 ` [PATCH 6/6] libceph: add, don't set data for " Alex Elder
5 siblings, 1 reply; 22+ messages in thread
From: Alex Elder @ 2013-04-05 22:26 UTC (permalink / raw)
To: ceph-devel@vger.kernel.org
This patch adds support to the messenger for more than one data item
in its data list.
A message data cursor has two more fields to support this:
- a count of the number of bytes left to be consumed across
all data items in the list, "total_resid"
- a pointer to the head of the list (for validation only)
The cursor initialization routine has been split into two parts: the
outer one, which initializes the cursor for traversing the entire
list of data items; and the inner one, which initializes the cursor
to start processing a single data item.
When a message cursor is first initialized, the outer initialization
routine sets total_resid to the length provided. The data pointer
is initialized to the first data item on the list. From there, the
inner initialization routine finishes by setting up to process the
data item the cursor points to.
Advancing the cursor consumes bytes in total_resid. If the resid
field reaches zero, it means the current data item is fully
consumed. If total_resid indicates there is more data, the cursor
is advanced to point to the next data item, and then the inner
initialization routine prepares for using that. (A check is made at
this point to make sure we don't wrap around the front of the list.)
The type-specific init routines are modified so they can be given a
length that's larger than what the data item can support. The resid
field is initialized to the smaller of the provided length and the
length of the entire data item.
When total_resid reaches zero, we're done.
This resolves:
http://tracker.ceph.com/issues/3761
Signed-off-by: Alex Elder <elder@inktank.com>
---
include/linux/ceph/messenger.h | 5 ++++-
net/ceph/messenger.c | 42 ++++++++++++++++++++++++++++++----------
2 files changed, 36 insertions(+), 11 deletions(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 318da01..de1d2e1 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -108,7 +108,10 @@ struct ceph_msg_data {
};
struct ceph_msg_data_cursor {
- struct ceph_msg_data *data; /* data item this describes */
+ size_t total_resid; /* across all data items */
+ struct list_head *data_head; /* = &ceph_msg->data */
+
+ struct ceph_msg_data *data; /* current data item */
size_t resid; /* bytes not yet consumed */
bool last_piece; /* current is last piece */
bool need_crc; /* crc update needed */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 9ce667e..3f78944 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -734,7 +734,7 @@ static void ceph_msg_data_bio_cursor_init(struct
ceph_msg_data_cursor *cursor,
BUG_ON(!bio);
BUG_ON(!bio->bi_vcnt);
- cursor->resid = length;
+ cursor->resid = min(length, data->bio_length);
cursor->bio = bio;
cursor->vector_index = 0;
cursor->vector_offset = 0;
@@ -833,9 +833,8 @@ static void ceph_msg_data_pages_cursor_init(struct
ceph_msg_data_cursor *cursor,
BUG_ON(!data->pages);
BUG_ON(!data->length);
- BUG_ON(length > data->length); /* short reads are OK */
- cursor->resid = length;
+ cursor->resid = min(length, data->length);
page_count = calc_pages_for(data->alignment, (u64)data->length);
cursor->page_offset = data->alignment & ~PAGE_MASK;
cursor->page_index = 0;
@@ -904,7 +903,6 @@ ceph_msg_data_pagelist_cursor_init(struct
ceph_msg_data_cursor *cursor,
pagelist = data->pagelist;
BUG_ON(!pagelist);
- BUG_ON(length > pagelist->length); /* short reads are OK */
if (!length)
return; /* pagelist can be assigned but empty */
@@ -912,7 +910,7 @@ ceph_msg_data_pagelist_cursor_init(struct
ceph_msg_data_cursor *cursor,
BUG_ON(list_empty(&pagelist->head));
page = list_first_entry(&pagelist->head, struct page, lru);
- cursor->resid = length;
+ cursor->resid = min(length, pagelist->length);
cursor->page = page;
cursor->offset = 0;
cursor->last_piece = length <= PAGE_SIZE;
@@ -982,13 +980,10 @@ static bool ceph_msg_data_pagelist_advance(struct
ceph_msg_data_cursor *cursor,
* be processed in that piece. It also tracks whether the current
* piece is the last one in the data item.
*/
-static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor
*cursor)
{
- struct ceph_msg_data_cursor *cursor = &msg->cursor;
- struct ceph_msg_data *data;
+ size_t length = cursor->total_resid;
- data = list_first_entry(&msg->data, struct ceph_msg_data, links);
- cursor->data = data;
switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
ceph_msg_data_pagelist_cursor_init(cursor, length);
@@ -1009,6 +1004,25 @@ static void ceph_msg_data_cursor_init(struct
ceph_msg *msg, size_t length)
cursor->need_crc = true;
}
+static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+{
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
+ struct ceph_msg_data *data;
+
+ BUG_ON(!length);
+ BUG_ON(length > msg->data_length);
+ BUG_ON(list_empty(&msg->data));
+
+ cursor->total_resid = length;
+ data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+
+ cursor->data_head = &msg->data;
+ cursor->data = data;
+ cursor->total_resid = msg->data_length;
+
+ __ceph_msg_data_cursor_init(cursor);
+}
+
/*
* Return the page containing the next piece to process for a given
* data item, and supply the page offset and length of that piece.
@@ -1073,8 +1087,16 @@ static bool ceph_msg_data_advance(struct
ceph_msg_data_cursor *cursor,
BUG();
break;
}
+ cursor->total_resid -= bytes;
cursor->need_crc = new_piece;
+ if (!cursor->resid && cursor->total_resid) {
+ WARN_ON(!new_piece);
+ BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
+ cursor->data = list_entry_next(cursor->data, links);
+ __ceph_msg_data_cursor_init(cursor);
+ }
+
return new_piece;
}
--
1.7.9.5
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 5/6, v2] libceph: implement multiple data items in a message
2013-04-05 22:26 ` [PATCH 5/6] libceph: implement multiple data items in a message Alex Elder
@ 2013-04-05 23:36 ` Alex Elder
2013-04-08 23:48 ` [PATCH 5/6, v3] " Alex Elder
0 siblings, 1 reply; 22+ messages in thread
From: Alex Elder @ 2013-04-05 23:36 UTC (permalink / raw)
To: ceph-devel@vger.kernel.org
I found a problem in the first version of this.
ceph_msg_data_cursor_init() was mistakenly initializing
total_resid a second time, to the wrong value. We want
to use the passed-in length, not the length of the data
available. For write requests they're the same, but
read requests can be short, so this was wrong.
-Alex
This patch adds support to the messenger for more than one data item
in its data list.
A message data cursor has two more fields to support this:
- a count of the number of bytes left to be consumed across
all data items in the list, "total_resid"
- a pointer to the head of the list (for validation only)
The cursor initialization routine has been split into two parts: the
outer one, which initializes the cursor for traversing the entire
list of data items; and the inner one, which initializes the cursor
to start processing a single data item.
When a message cursor is first initialized, the outer initialization
routine sets total_resid to the length provided. The data pointer
is initialized to the first data item on the list. From there, the
inner initialization routine finishes by setting up to process the
data item the cursor points to.
Advancing the cursor consumes bytes in total_resid. If the resid
field reaches zero, it means the current data item is fully
consumed. If total_resid indicates there is more data, the cursor
is advanced to point to the next data item, and then the inner
initialization routine prepares for using that. (A check is made at
this point to make sure we don't wrap around the front of the list.)
The type-specific init routines are modified so they can be given a
length that's larger than what the data item can support. The resid
field is initialized to the smaller of the provided length and the
length of the entire data item.
When total_resid reaches zero, we're done.
This resolves:
http://tracker.ceph.com/issues/3761
Signed-off-by: Alex Elder <elder@inktank.com>
---
v2: set total_resid to the passed-in length when initializing cursor
include/linux/ceph/messenger.h | 5 ++++-
net/ceph/messenger.c | 42 ++++++++++++++++++++++++++++++----------
2 files changed, 36 insertions(+), 11 deletions(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 318da01..de1d2e1 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -108,7 +108,10 @@ struct ceph_msg_data {
};
struct ceph_msg_data_cursor {
- struct ceph_msg_data *data; /* data item this describes */
+ size_t total_resid; /* across all data items */
+ struct list_head *data_head; /* = &ceph_msg->data */
+
+ struct ceph_msg_data *data; /* current data item */
size_t resid; /* bytes not yet consumed */
bool last_piece; /* current is last piece */
bool need_crc; /* crc update needed */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 9ce667e..bd1d804 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -734,7 +734,7 @@ static void ceph_msg_data_bio_cursor_init(struct
ceph_msg_data_cursor *cursor,
BUG_ON(!bio);
BUG_ON(!bio->bi_vcnt);
- cursor->resid = length;
+ cursor->resid = min(length, data->bio_length);
cursor->bio = bio;
cursor->vector_index = 0;
cursor->vector_offset = 0;
@@ -833,9 +833,8 @@ static void ceph_msg_data_pages_cursor_init(struct
ceph_msg_data_cursor *cursor,
BUG_ON(!data->pages);
BUG_ON(!data->length);
- BUG_ON(length > data->length); /* short reads are OK */
- cursor->resid = length;
+ cursor->resid = min(length, data->length);
page_count = calc_pages_for(data->alignment, (u64)data->length);
cursor->page_offset = data->alignment & ~PAGE_MASK;
cursor->page_index = 0;
@@ -904,7 +903,6 @@ ceph_msg_data_pagelist_cursor_init(struct
ceph_msg_data_cursor *cursor,
pagelist = data->pagelist;
BUG_ON(!pagelist);
- BUG_ON(length > pagelist->length); /* short reads are OK */
if (!length)
return; /* pagelist can be assigned but empty */
@@ -912,7 +910,7 @@ ceph_msg_data_pagelist_cursor_init(struct
ceph_msg_data_cursor *cursor,
BUG_ON(list_empty(&pagelist->head));
page = list_first_entry(&pagelist->head, struct page, lru);
- cursor->resid = length;
+ cursor->resid = min(length, pagelist->length);
cursor->page = page;
cursor->offset = 0;
cursor->last_piece = length <= PAGE_SIZE;
@@ -982,13 +980,10 @@ static bool ceph_msg_data_pagelist_advance(struct
ceph_msg_data_cursor *cursor,
* be processed in that piece. It also tracks whether the current
* piece is the last one in the data item.
*/
-static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor
*cursor)
{
- struct ceph_msg_data_cursor *cursor = &msg->cursor;
- struct ceph_msg_data *data;
+ size_t length = cursor->total_resid;
- data = list_first_entry(&msg->data, struct ceph_msg_data, links);
- cursor->data = data;
switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
ceph_msg_data_pagelist_cursor_init(cursor, length);
@@ -1009,6 +1004,25 @@ static void ceph_msg_data_cursor_init(struct
ceph_msg *msg, size_t length)
cursor->need_crc = true;
}
+static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+{
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
+ struct ceph_msg_data *data;
+
+ BUG_ON(!length);
+ BUG_ON(length > msg->data_length);
+ BUG_ON(list_empty(&msg->data));
+
+ data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+
+ cursor->data_head = &msg->data;
+ cursor->total_resid = length;
+ data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+ cursor->data = data;
+
+ __ceph_msg_data_cursor_init(cursor);
+}
+
/*
* Return the page containing the next piece to process for a given
* data item, and supply the page offset and length of that piece.
@@ -1073,8 +1087,16 @@ static bool ceph_msg_data_advance(struct
ceph_msg_data_cursor *cursor,
BUG();
break;
}
+ cursor->total_resid -= bytes;
cursor->need_crc = new_piece;
+ if (!cursor->resid && cursor->total_resid) {
+ WARN_ON(!new_piece);
+ BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
+ cursor->data = list_entry_next(cursor->data, links);
+ __ceph_msg_data_cursor_init(cursor);
+ }
+
return new_piece;
}
--
1.7.9.5
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [PATCH 5/6, v3] libceph: implement multiple data items in a message
2013-04-05 23:36 ` [PATCH 5/6, v2] " Alex Elder
@ 2013-04-08 23:48 ` Alex Elder
2013-04-09 0:27 ` Josh Durgin
0 siblings, 1 reply; 22+ messages in thread
From: Alex Elder @ 2013-04-08 23:48 UTC (permalink / raw)
To: ceph-devel@vger.kernel.org; +Cc: Josh Durgin
Sorry, I've found another problem with this patch.
I've updated it below, and have updated the patches
that followed it. The result is available in the
"review/wip-3761-5" branch of the ceph-client git
repository.
Previously there were assertions that caught attempts
to add more than one data item to a message. Those
need to go away, now that we support multiple
data items.
This patch adds support to the messenger for more than one data item
in its data list.
A message data cursor has two more fields to support this:
- a count of the number of bytes left to be consumed across
all data items in the list, "total_resid"
- a pointer to the head of the list (for validation only)
The cursor initialization routine has been split into two parts: the
outer one, which initializes the cursor for traversing the entire
list of data items; and the inner one, which initializes the cursor
to start processing a single data item.
When a message cursor is first initialized, the outer initialization
routine sets total_resid to the length provided. The data pointer
is initialized to the first data item on the list. From there, the
inner initialization routine finishes by setting up to process the
data item the cursor points to.
Advancing the cursor consumes bytes in total_resid. If the resid
field reaches zero, it means the current data item is fully
consumed. If total_resid indicates there is more data, the cursor
is advanced to point to the next data item, and then the inner
initialization routine prepares for using that. (A check is made at
this point to make sure we don't wrap around the front of the list.)
The type-specific init routines are modified so they can be given a
length that's larger than what the data item can support. The resid
field is initialized to the smaller of the provided length and the
length of the entire data item.
When total_resid reaches zero, we're done.
This resolves:
http://tracker.ceph.com/issues/3761
Signed-off-by: Alex Elder <elder@inktank.com>
---
v2: set total_resid to the passed-in length when initializing cursor
v3: turn off single data item assertions; it's ok now
include/linux/ceph/messenger.h | 5 ++++-
net/ceph/messenger.c | 48 ++++++++++++++++++++++++++--------------
2 files changed, 36 insertions(+), 17 deletions(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 318da01..de1d2e1 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -108,7 +108,10 @@ struct ceph_msg_data {
};
struct ceph_msg_data_cursor {
- struct ceph_msg_data *data; /* data item this describes */
+ size_t total_resid; /* across all data items */
+ struct list_head *data_head; /* = &ceph_msg->data */
+
+ struct ceph_msg_data *data; /* current data item */
size_t resid; /* bytes not yet consumed */
bool last_piece; /* current is last piece */
bool need_crc; /* crc update needed */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 8bfe7d3..84703e5 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -734,7 +734,7 @@ static void ceph_msg_data_bio_cursor_init(struct
ceph_msg_data_cursor *cursor,
BUG_ON(!bio);
BUG_ON(!bio->bi_vcnt);
- cursor->resid = length;
+ cursor->resid = min(length, data->bio_length);
cursor->bio = bio;
cursor->vector_index = 0;
cursor->vector_offset = 0;
@@ -833,9 +833,8 @@ static void ceph_msg_data_pages_cursor_init(struct
ceph_msg_data_cursor *cursor,
BUG_ON(!data->pages);
BUG_ON(!data->length);
- BUG_ON(length > data->length); /* short reads are OK */
- cursor->resid = length;
+ cursor->resid = min(length, data->length);
page_count = calc_pages_for(data->alignment, (u64)data->length);
cursor->page_offset = data->alignment & ~PAGE_MASK;
cursor->page_index = 0;
@@ -904,7 +903,6 @@ ceph_msg_data_pagelist_cursor_init(struct
ceph_msg_data_cursor *cursor,
pagelist = data->pagelist;
BUG_ON(!pagelist);
- BUG_ON(length > pagelist->length); /* short reads are OK */
if (!length)
return; /* pagelist can be assigned but empty */
@@ -912,7 +910,7 @@ ceph_msg_data_pagelist_cursor_init(struct
ceph_msg_data_cursor *cursor,
BUG_ON(list_empty(&pagelist->head));
page = list_first_entry(&pagelist->head, struct page, lru);
- cursor->resid = length;
+ cursor->resid = min(length, pagelist->length);
cursor->page = page;
cursor->offset = 0;
cursor->last_piece = length <= PAGE_SIZE;
@@ -982,13 +980,10 @@ static bool ceph_msg_data_pagelist_advance(struct
ceph_msg_data_cursor *cursor,
* be processed in that piece. It also tracks whether the current
* piece is the last one in the data item.
*/
-static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor
*cursor)
{
- struct ceph_msg_data_cursor *cursor = &msg->cursor;
- struct ceph_msg_data *data;
+ size_t length = cursor->total_resid;
- data = list_first_entry(&msg->data, struct ceph_msg_data, links);
- cursor->data = data;
switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
ceph_msg_data_pagelist_cursor_init(cursor, length);
@@ -1009,6 +1004,25 @@ static void ceph_msg_data_cursor_init(struct
ceph_msg *msg, size_t length)
cursor->need_crc = true;
}
+static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+{
+ struct ceph_msg_data_cursor *cursor = &msg->cursor;
+ struct ceph_msg_data *data;
+
+ BUG_ON(!length);
+ BUG_ON(length > msg->data_length);
+ BUG_ON(list_empty(&msg->data));
+
+ data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+
+ cursor->data_head = &msg->data;
+ cursor->total_resid = length;
+ data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+ cursor->data = data;
+
+ __ceph_msg_data_cursor_init(cursor);
+}
+
/*
* Return the page containing the next piece to process for a given
* data item, and supply the page offset and length of that piece.
@@ -1073,8 +1087,16 @@ static bool ceph_msg_data_advance(struct
ceph_msg_data_cursor *cursor,
BUG();
break;
}
+ cursor->total_resid -= bytes;
cursor->need_crc = new_piece;
+ if (!cursor->resid && cursor->total_resid) {
+ WARN_ON(!cursor->last_piece);
+ BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
+ cursor->data = list_entry_next(cursor->data, links);
+ __ceph_msg_data_cursor_init(cursor);
+ }
+
return new_piece;
}
@@ -2990,8 +3012,6 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg,
struct page **pages,
BUG_ON(!pages);
BUG_ON(!length);
- BUG_ON(msg->data_length);
- BUG_ON(!list_empty(&msg->data));
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
BUG_ON(!data);
@@ -3012,8 +3032,6 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
BUG_ON(!pagelist);
BUG_ON(!pagelist->length);
- BUG_ON(msg->data_length);
- BUG_ON(!list_empty(&msg->data));
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
BUG_ON(!data);
@@ -3031,8 +3049,6 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg,
struct bio *bio,
struct ceph_msg_data *data;
BUG_ON(!bio);
- BUG_ON(msg->data_length);
- BUG_ON(!list_empty(&msg->data));
data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
BUG_ON(!data);
--
1.7.9.5
On 04/05/2013 06:36 PM, Alex Elder wrote:
> I found a problem in the first version of this.
>
> ceph_msg_data_cursor_init() was mistakenly initializing
> total_resid a second time, to the wrong value. We want
> to use the passed-in length, not the length of the data
> available. For write requests they're the same, but
> read requests can be short, so this was wrong.
>
> -Alex
>
>
> This patch adds support to the messenger for more than one data item
> in its data list.
>
> A message data cursor has two more fields to support this:
> - a count of the number of bytes left to be consumed across
> all data items in the list, "total_resid"
> - a pointer to the head of the list (for validation only)
>
> The cursor initialization routine has been split into two parts: the
> outer one, which initializes the cursor for traversing the entire
> list of data items; and the inner one, which initializes the cursor
> to start processing a single data item.
>
> When a message cursor is first initialized, the outer initialization
> routine sets total_resid to the length provided. The data pointer
> is initialized to the first data item on the list. From there, the
> inner initialization routine finishes by setting up to process the
> data item the cursor points to.
>
> Advancing the cursor consumes bytes in total_resid. If the resid
> field reaches zero, it means the current data item is fully
> consumed. If total_resid indicates there is more data, the cursor
> is advanced to point to the next data item, and then the inner
> initialization routine prepares for using that. (A check is made at
> this point to make sure we don't wrap around the front of the list.)
>
> The type-specific init routines are modified so they can be given a
> length that's larger than what the data item can support. The resid
> field is initialized to the smaller of the provided length and the
> length of the entire data item.
>
> When total_resid reaches zero, we're done.
>
> This resolves:
> http://tracker.ceph.com/issues/3761
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> v2: set total_resid to the passed-in length when initializing cursor
>
> include/linux/ceph/messenger.h | 5 ++++-
> net/ceph/messenger.c | 42
> ++++++++++++++++++++++++++++++----------
> 2 files changed, 36 insertions(+), 11 deletions(-)
>
> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
> index 318da01..de1d2e1 100644
> --- a/include/linux/ceph/messenger.h
> +++ b/include/linux/ceph/messenger.h
> @@ -108,7 +108,10 @@ struct ceph_msg_data {
> };
>
> struct ceph_msg_data_cursor {
> - struct ceph_msg_data *data; /* data item this describes */
> + size_t total_resid; /* across all data items */
> + struct list_head *data_head; /* = &ceph_msg->data */
> +
> + struct ceph_msg_data *data; /* current data item */
> size_t resid; /* bytes not yet consumed */
> bool last_piece; /* current is last piece */
> bool need_crc; /* crc update needed */
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index 9ce667e..bd1d804 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -734,7 +734,7 @@ static void ceph_msg_data_bio_cursor_init(struct
> ceph_msg_data_cursor *cursor,
> BUG_ON(!bio);
> BUG_ON(!bio->bi_vcnt);
>
> - cursor->resid = length;
> + cursor->resid = min(length, data->bio_length);
> cursor->bio = bio;
> cursor->vector_index = 0;
> cursor->vector_offset = 0;
> @@ -833,9 +833,8 @@ static void ceph_msg_data_pages_cursor_init(struct
> ceph_msg_data_cursor *cursor,
>
> BUG_ON(!data->pages);
> BUG_ON(!data->length);
> - BUG_ON(length > data->length); /* short reads are OK */
>
> - cursor->resid = length;
> + cursor->resid = min(length, data->length);
> page_count = calc_pages_for(data->alignment, (u64)data->length);
> cursor->page_offset = data->alignment & ~PAGE_MASK;
> cursor->page_index = 0;
> @@ -904,7 +903,6 @@ ceph_msg_data_pagelist_cursor_init(struct
> ceph_msg_data_cursor *cursor,
>
> pagelist = data->pagelist;
> BUG_ON(!pagelist);
> - BUG_ON(length > pagelist->length); /* short reads are OK */
>
> if (!length)
> return; /* pagelist can be assigned but empty */
> @@ -912,7 +910,7 @@ ceph_msg_data_pagelist_cursor_init(struct
> ceph_msg_data_cursor *cursor,
> BUG_ON(list_empty(&pagelist->head));
> page = list_first_entry(&pagelist->head, struct page, lru);
>
> - cursor->resid = length;
> + cursor->resid = min(length, pagelist->length);
> cursor->page = page;
> cursor->offset = 0;
> cursor->last_piece = length <= PAGE_SIZE;
> @@ -982,13 +980,10 @@ static bool ceph_msg_data_pagelist_advance(struct
> ceph_msg_data_cursor *cursor,
> * be processed in that piece. It also tracks whether the current
> * piece is the last one in the data item.
> */
> -static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
> +static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor
> *cursor)
> {
> - struct ceph_msg_data_cursor *cursor = &msg->cursor;
> - struct ceph_msg_data *data;
> + size_t length = cursor->total_resid;
>
> - data = list_first_entry(&msg->data, struct ceph_msg_data, links);
> - cursor->data = data;
> switch (cursor->data->type) {
> case CEPH_MSG_DATA_PAGELIST:
> ceph_msg_data_pagelist_cursor_init(cursor, length);
> @@ -1009,6 +1004,25 @@ static void ceph_msg_data_cursor_init(struct
> ceph_msg *msg, size_t length)
> cursor->need_crc = true;
> }
>
> +static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
> +{
> + struct ceph_msg_data_cursor *cursor = &msg->cursor;
> + struct ceph_msg_data *data;
> +
> + BUG_ON(!length);
> + BUG_ON(length > msg->data_length);
> + BUG_ON(list_empty(&msg->data));
> +
> + data = list_first_entry(&msg->data, struct ceph_msg_data, links);
> +
> + cursor->data_head = &msg->data;
> + cursor->total_resid = length;
> + data = list_first_entry(&msg->data, struct ceph_msg_data, links);
> + cursor->data = data;
> +
> + __ceph_msg_data_cursor_init(cursor);
> +}
> +
> /*
> * Return the page containing the next piece to process for a given
> * data item, and supply the page offset and length of that piece.
> @@ -1073,8 +1087,16 @@ static bool ceph_msg_data_advance(struct
> ceph_msg_data_cursor *cursor,
> BUG();
> break;
> }
> + cursor->total_resid -= bytes;
> cursor->need_crc = new_piece;
>
> + if (!cursor->resid && cursor->total_resid) {
> + WARN_ON(!new_piece);
> + BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
> + cursor->data = list_entry_next(cursor->data, links);
> + __ceph_msg_data_cursor_init(cursor);
> + }
> +
> return new_piece;
> }
>
^ permalink raw reply related [flat|nested] 22+ messages in thread
* Re: [PATCH 5/6, v3] libceph: implement multiple data items in a message
2013-04-08 23:48 ` [PATCH 5/6, v3] " Alex Elder
@ 2013-04-09 0:27 ` Josh Durgin
0 siblings, 0 replies; 22+ messages in thread
From: Josh Durgin @ 2013-04-09 0:27 UTC (permalink / raw)
To: Alex Elder; +Cc: ceph-devel@vger.kernel.org
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
On 04/08/2013 04:48 PM, Alex Elder wrote:
> Sorry, I've found another problem with this patch.
> I've updated it below, and have updated the patches
> that followed it. The result is available in the
> "review/wip-3761-5" branch of the ceph-client git
> repository.
>
> Previously there were assertions that caught attempts
> to add more than one data item to a message. Those
> need to go away, now that we support multiple
> data items.
>
>
>
> This patch adds support to the messenger for more than one data item
> in its data list.
>
> A message data cursor has two more fields to support this:
> - a count of the number of bytes left to be consumed across
> all data items in the list, "total_resid"
> - a pointer to the head of the list (for validation only)
>
> The cursor initialization routine has been split into two parts: the
> outer one, which initializes the cursor for traversing the entire
> list of data items; and the inner one, which initializes the cursor
> to start processing a single data item.
>
> When a message cursor is first initialized, the outer initialization
> routine sets total_resid to the length provided. The data pointer
> is initialized to the first data item on the list. From there, the
> inner initialization routine finishes by setting up to process the
> data item the cursor points to.
>
> Advancing the cursor consumes bytes in total_resid. If the resid
> field reaches zero, it means the current data item is fully
> consumed. If total_resid indicates there is more data, the cursor
> is advanced to point to the next data item, and then the inner
> initialization routine prepares for using that. (A check is made at
> this point to make sure we don't wrap around the front of the list.)
>
> The type-specific init routines are modified so they can be given a
> length that's larger than what the data item can support. The resid
> field is initialized to the smaller of the provided length and the
> length of the entire data item.
>
> When total_resid reaches zero, we're done.
>
> This resolves:
> http://tracker.ceph.com/issues/3761
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> v2: set total_resid to the passed-in length when initializing cursor
> v3: turn off single data item assertions; it's ok now
>
> include/linux/ceph/messenger.h | 5 ++++-
> net/ceph/messenger.c | 48
> ++++++++++++++++++++++++++--------------
> 2 files changed, 36 insertions(+), 17 deletions(-)
>
> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
> index 318da01..de1d2e1 100644
> --- a/include/linux/ceph/messenger.h
> +++ b/include/linux/ceph/messenger.h
> @@ -108,7 +108,10 @@ struct ceph_msg_data {
> };
>
> struct ceph_msg_data_cursor {
> - struct ceph_msg_data *data; /* data item this describes */
> + size_t total_resid; /* across all data items */
> + struct list_head *data_head; /* = &ceph_msg->data */
> +
> + struct ceph_msg_data *data; /* current data item */
> size_t resid; /* bytes not yet consumed */
> bool last_piece; /* current is last piece */
> bool need_crc; /* crc update needed */
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index 8bfe7d3..84703e5 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -734,7 +734,7 @@ static void ceph_msg_data_bio_cursor_init(struct
> ceph_msg_data_cursor *cursor,
> BUG_ON(!bio);
> BUG_ON(!bio->bi_vcnt);
>
> - cursor->resid = length;
> + cursor->resid = min(length, data->bio_length);
> cursor->bio = bio;
> cursor->vector_index = 0;
> cursor->vector_offset = 0;
> @@ -833,9 +833,8 @@ static void ceph_msg_data_pages_cursor_init(struct
> ceph_msg_data_cursor *cursor,
>
> BUG_ON(!data->pages);
> BUG_ON(!data->length);
> - BUG_ON(length > data->length); /* short reads are OK */
>
> - cursor->resid = length;
> + cursor->resid = min(length, data->length);
> page_count = calc_pages_for(data->alignment, (u64)data->length);
> cursor->page_offset = data->alignment & ~PAGE_MASK;
> cursor->page_index = 0;
> @@ -904,7 +903,6 @@ ceph_msg_data_pagelist_cursor_init(struct
> ceph_msg_data_cursor *cursor,
>
> pagelist = data->pagelist;
> BUG_ON(!pagelist);
> - BUG_ON(length > pagelist->length); /* short reads are OK */
>
> if (!length)
> return; /* pagelist can be assigned but empty */
> @@ -912,7 +910,7 @@ ceph_msg_data_pagelist_cursor_init(struct
> ceph_msg_data_cursor *cursor,
> BUG_ON(list_empty(&pagelist->head));
> page = list_first_entry(&pagelist->head, struct page, lru);
>
> - cursor->resid = length;
> + cursor->resid = min(length, pagelist->length);
> cursor->page = page;
> cursor->offset = 0;
> cursor->last_piece = length <= PAGE_SIZE;
> @@ -982,13 +980,10 @@ static bool ceph_msg_data_pagelist_advance(struct
> ceph_msg_data_cursor *cursor,
> * be processed in that piece. It also tracks whether the current
> * piece is the last one in the data item.
> */
> -static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
> +static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor
> *cursor)
> {
> - struct ceph_msg_data_cursor *cursor = &msg->cursor;
> - struct ceph_msg_data *data;
> + size_t length = cursor->total_resid;
>
> - data = list_first_entry(&msg->data, struct ceph_msg_data, links);
> - cursor->data = data;
> switch (cursor->data->type) {
> case CEPH_MSG_DATA_PAGELIST:
> ceph_msg_data_pagelist_cursor_init(cursor, length);
> @@ -1009,6 +1004,25 @@ static void ceph_msg_data_cursor_init(struct
> ceph_msg *msg, size_t length)
> cursor->need_crc = true;
> }
>
> +static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
> +{
> + struct ceph_msg_data_cursor *cursor = &msg->cursor;
> + struct ceph_msg_data *data;
> +
> + BUG_ON(!length);
> + BUG_ON(length > msg->data_length);
> + BUG_ON(list_empty(&msg->data));
> +
> + data = list_first_entry(&msg->data, struct ceph_msg_data, links);
> +
> + cursor->data_head = &msg->data;
> + cursor->total_resid = length;
> + data = list_first_entry(&msg->data, struct ceph_msg_data, links);
> + cursor->data = data;
> +
> + __ceph_msg_data_cursor_init(cursor);
> +}
> +
> /*
> * Return the page containing the next piece to process for a given
> * data item, and supply the page offset and length of that piece.
> @@ -1073,8 +1087,16 @@ static bool ceph_msg_data_advance(struct
> ceph_msg_data_cursor *cursor,
> BUG();
> break;
> }
> + cursor->total_resid -= bytes;
> cursor->need_crc = new_piece;
>
> + if (!cursor->resid && cursor->total_resid) {
> + WARN_ON(!cursor->last_piece);
> + BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
> + cursor->data = list_entry_next(cursor->data, links);
> + __ceph_msg_data_cursor_init(cursor);
> + }
> +
> return new_piece;
> }
>
> @@ -2990,8 +3012,6 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg,
> struct page **pages,
>
> BUG_ON(!pages);
> BUG_ON(!length);
> - BUG_ON(msg->data_length);
> - BUG_ON(!list_empty(&msg->data));
>
> data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
> BUG_ON(!data);
> @@ -3012,8 +3032,6 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
>
> BUG_ON(!pagelist);
> BUG_ON(!pagelist->length);
> - BUG_ON(msg->data_length);
> - BUG_ON(!list_empty(&msg->data));
>
> data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
> BUG_ON(!data);
> @@ -3031,8 +3049,6 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg,
> struct bio *bio,
> struct ceph_msg_data *data;
>
> BUG_ON(!bio);
> - BUG_ON(msg->data_length);
> - BUG_ON(!list_empty(&msg->data));
>
> data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
> BUG_ON(!data);
>
^ permalink raw reply [flat|nested] 22+ messages in thread
* [PATCH 6/6] libceph: add, don't set data for a message
2013-04-05 22:24 [PATCH 0/6] Alex Elder
` (4 preceding siblings ...)
2013-04-05 22:26 ` [PATCH 5/6] libceph: implement multiple data items in a message Alex Elder
@ 2013-04-05 22:26 ` Alex Elder
2013-04-09 0:11 ` Josh Durgin
5 siblings, 1 reply; 22+ messages in thread
From: Alex Elder @ 2013-04-05 22:26 UTC (permalink / raw)
To: ceph-devel@vger.kernel.org
Change the names of the functions that attach data to a message to
reflect that we're adding to whatever's already there rather than
just setting it to the one thing. Currently only one data item is
ever added to a message, but that's about to change.
This resolves:
http://tracker.ceph.com/issues/2770
Signed-off-by: Alex Elder <elder@inktank.com>
---
fs/ceph/mds_client.c | 4 ++--
include/linux/ceph/messenger.h | 6 +++---
net/ceph/messenger.c | 12 ++++++------
net/ceph/osd_client.c | 16 ++++++++--------
4 files changed, 19 insertions(+), 19 deletions(-)
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index c07ab3c..15c265c 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1724,7 +1724,7 @@ static struct ceph_msg
*create_request_message(struct ceph_mds_client *mdsc,
if (req->r_data_len) {
/* outbound data set only by ceph_sync_setxattr() */
BUG_ON(!req->r_pages);
- ceph_msg_data_set_pages(msg, req->r_pages, req->r_data_len, 0);
+ ceph_msg_data_add_pages(msg, req->r_pages, req->r_data_len, 0);
}
msg->hdr.data_len = cpu_to_le32(req->r_data_len);
@@ -2608,7 +2608,7 @@ static void send_mds_reconnect(struct
ceph_mds_client *mdsc,
if (pagelist->length) {
/* set up outbound data if we have any */
reply->hdr.data_len = cpu_to_le32(pagelist->length);
- ceph_msg_data_set_pagelist(reply, pagelist);
+ ceph_msg_data_add_pagelist(reply, pagelist);
}
ceph_con_send(&session->s_con, reply);
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index de1d2e1..7c1420b 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -275,12 +275,12 @@ extern void ceph_msg_revoke_incoming(struct
ceph_msg *msg);
extern void ceph_con_keepalive(struct ceph_connection *con);
-extern void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page
**pages,
+extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page
**pages,
size_t length, size_t alignment);
-extern void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
+extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
struct ceph_pagelist *pagelist);
#ifdef CONFIG_BLOCK
-extern void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
+extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
size_t length);
#endif /* CONFIG_BLOCK */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3f78944..c46c4df 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -3003,7 +3003,7 @@ static void ceph_msg_data_destroy(struct
ceph_msg_data *data)
kfree(data);
}
-void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
+void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t alignment)
{
struct ceph_msg_data *data;
@@ -3023,9 +3023,9 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg,
struct page **pages,
list_add_tail(&data->links, &msg->data);
msg->data_length += length;
}
-EXPORT_SYMBOL(ceph_msg_data_set_pages);
+EXPORT_SYMBOL(ceph_msg_data_add_pages);
-void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
+void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
struct ceph_pagelist *pagelist)
{
struct ceph_msg_data *data;
@@ -3042,10 +3042,10 @@ void ceph_msg_data_set_pagelist(struct ceph_msg
*msg,
list_add_tail(&data->links, &msg->data);
msg->data_length += pagelist->length;
}
-EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
+EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
#ifdef CONFIG_BLOCK
-void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
size_t length)
{
struct ceph_msg_data *data;
@@ -3062,7 +3062,7 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg,
struct bio *bio,
list_add_tail(&data->links, &msg->data);
msg->data_length += length;
}
-EXPORT_SYMBOL(ceph_msg_data_set_bio);
+EXPORT_SYMBOL(ceph_msg_data_add_bio);
#endif /* CONFIG_BLOCK */
/*
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 2562e4e..7322785 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -550,7 +550,7 @@ void osd_req_op_watch_init(struct ceph_osd_request
*osd_req,
}
EXPORT_SYMBOL(osd_req_op_watch_init);
-static void ceph_osdc_msg_data_set(struct ceph_msg *msg,
+static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
struct ceph_osd_data *osd_data)
{
u64 length = ceph_osd_data_length(osd_data);
@@ -558,14 +558,14 @@ static void ceph_osdc_msg_data_set(struct ceph_msg
*msg,
if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
BUG_ON(length > (u64) SIZE_MAX);
if (length)
- ceph_msg_data_set_pages(msg, osd_data->pages,
+ ceph_msg_data_add_pages(msg, osd_data->pages,
length, osd_data->alignment);
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
BUG_ON(!length);
- ceph_msg_data_set_pagelist(msg, osd_data->pagelist);
+ ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
- ceph_msg_data_set_bio(msg, osd_data->bio, length);
+ ceph_msg_data_add_bio(msg, osd_data->bio, length);
#endif
} else {
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
@@ -600,18 +600,18 @@ static u64 osd_req_encode_op(struct
ceph_osd_request *req,
dst->extent.truncate_seq =
cpu_to_le32(src->extent.truncate_seq);
if (src->op == CEPH_OSD_OP_WRITE)
- ceph_osdc_msg_data_set(req->r_request,
+ ceph_osdc_msg_data_add(req->r_request,
&src->extent.osd_data);
else
- ceph_osdc_msg_data_set(req->r_reply,
+ ceph_osdc_msg_data_add(req->r_reply,
&src->extent.osd_data);
break;
case CEPH_OSD_OP_CALL:
dst->cls.class_len = src->cls.class_len;
dst->cls.method_len = src->cls.method_len;
dst->cls.indata_len = cpu_to_le32(src->cls.request_data_len);
- ceph_osdc_msg_data_set(req->r_reply, &src->cls.response_data);
- ceph_osdc_msg_data_set(req->r_request, &src->cls.request_info);
+ ceph_osdc_msg_data_add(req->r_reply, &src->cls.response_data);
+ ceph_osdc_msg_data_add(req->r_request, &src->cls.request_info);
BUG_ON(src->cls.request_info.type !=
CEPH_OSD_DATA_TYPE_PAGELIST);
request_data_len = src->cls.request_info.pagelist->length;
--
1.7.9.5
^ permalink raw reply related [flat|nested] 22+ messages in thread
* Re: [PATCH 6/6] libceph: add, don't set data for a message
2013-04-05 22:26 ` [PATCH 6/6] libceph: add, don't set data for " Alex Elder
@ 2013-04-09 0:11 ` Josh Durgin
0 siblings, 0 replies; 22+ messages in thread
From: Josh Durgin @ 2013-04-09 0:11 UTC (permalink / raw)
To: Alex Elder; +Cc: ceph-devel@vger.kernel.org
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
On 04/05/2013 03:26 PM, Alex Elder wrote:
> Change the names of the functions that put data on a pagelist to
> reflect that we're adding to whatever's already there rather than
> just setting it to the one thing. Currently only one data item is
> ever added to a message, but that's about to change.
>
> This resolves:
> http://tracker.ceph.com/issues/2770
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> fs/ceph/mds_client.c | 4 ++--
> include/linux/ceph/messenger.h | 6 +++---
> net/ceph/messenger.c | 12 ++++++------
> net/ceph/osd_client.c | 16 ++++++++--------
> 4 files changed, 19 insertions(+), 19 deletions(-)
>
> diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
> index c07ab3c..15c265c 100644
> --- a/fs/ceph/mds_client.c
> +++ b/fs/ceph/mds_client.c
> @@ -1724,7 +1724,7 @@ static struct ceph_msg
> *create_request_message(struct ceph_mds_client *mdsc,
> if (req->r_data_len) {
> /* outbound data set only by ceph_sync_setxattr() */
> BUG_ON(!req->r_pages);
> - ceph_msg_data_set_pages(msg, req->r_pages, req->r_data_len, 0);
> + ceph_msg_data_add_pages(msg, req->r_pages, req->r_data_len, 0);
> }
>
> msg->hdr.data_len = cpu_to_le32(req->r_data_len);
> @@ -2608,7 +2608,7 @@ static void send_mds_reconnect(struct
> ceph_mds_client *mdsc,
> if (pagelist->length) {
> /* set up outbound data if we have any */
> reply->hdr.data_len = cpu_to_le32(pagelist->length);
> - ceph_msg_data_set_pagelist(reply, pagelist);
> + ceph_msg_data_add_pagelist(reply, pagelist);
> }
> ceph_con_send(&session->s_con, reply);
>
> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
> index de1d2e1..7c1420b 100644
> --- a/include/linux/ceph/messenger.h
> +++ b/include/linux/ceph/messenger.h
> @@ -275,12 +275,12 @@ extern void ceph_msg_revoke_incoming(struct
> ceph_msg *msg);
>
> extern void ceph_con_keepalive(struct ceph_connection *con);
>
> -extern void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page
> **pages,
> +extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page
> **pages,
> size_t length, size_t alignment);
> -extern void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
> +extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
> struct ceph_pagelist *pagelist);
> #ifdef CONFIG_BLOCK
> -extern void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
> +extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
> size_t length);
> #endif /* CONFIG_BLOCK */
>
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index 3f78944..c46c4df 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -3003,7 +3003,7 @@ static void ceph_msg_data_destroy(struct
> ceph_msg_data *data)
> kfree(data);
> }
>
> -void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
> +void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
> size_t length, size_t alignment)
> {
> struct ceph_msg_data *data;
> @@ -3023,9 +3023,9 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg,
> struct page **pages,
> list_add_tail(&data->links, &msg->data);
> msg->data_length += length;
> }
> -EXPORT_SYMBOL(ceph_msg_data_set_pages);
> +EXPORT_SYMBOL(ceph_msg_data_add_pages);
>
> -void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
> +void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
> struct ceph_pagelist *pagelist)
> {
> struct ceph_msg_data *data;
> @@ -3042,10 +3042,10 @@ void ceph_msg_data_set_pagelist(struct ceph_msg
> *msg,
> list_add_tail(&data->links, &msg->data);
> msg->data_length += pagelist->length;
> }
> -EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
> +EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
>
> #ifdef CONFIG_BLOCK
> -void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
> +void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
> size_t length)
> {
> struct ceph_msg_data *data;
> @@ -3062,7 +3062,7 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg,
> struct bio *bio,
> list_add_tail(&data->links, &msg->data);
> msg->data_length += length;
> }
> -EXPORT_SYMBOL(ceph_msg_data_set_bio);
> +EXPORT_SYMBOL(ceph_msg_data_add_bio);
> #endif /* CONFIG_BLOCK */
>
> /*
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 2562e4e..7322785 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -550,7 +550,7 @@ void osd_req_op_watch_init(struct ceph_osd_request
> *osd_req,
> }
> EXPORT_SYMBOL(osd_req_op_watch_init);
>
> -static void ceph_osdc_msg_data_set(struct ceph_msg *msg,
> +static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
> struct ceph_osd_data *osd_data)
> {
> u64 length = ceph_osd_data_length(osd_data);
> @@ -558,14 +558,14 @@ static void ceph_osdc_msg_data_set(struct ceph_msg
> *msg,
> if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
> BUG_ON(length > (u64) SIZE_MAX);
> if (length)
> - ceph_msg_data_set_pages(msg, osd_data->pages,
> + ceph_msg_data_add_pages(msg, osd_data->pages,
> length, osd_data->alignment);
> } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
> BUG_ON(!length);
> - ceph_msg_data_set_pagelist(msg, osd_data->pagelist);
> + ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
> #ifdef CONFIG_BLOCK
> } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
> - ceph_msg_data_set_bio(msg, osd_data->bio, length);
> + ceph_msg_data_add_bio(msg, osd_data->bio, length);
> #endif
> } else {
> BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
> @@ -600,18 +600,18 @@ static u64 osd_req_encode_op(struct
> ceph_osd_request *req,
> dst->extent.truncate_seq =
> cpu_to_le32(src->extent.truncate_seq);
> if (src->op == CEPH_OSD_OP_WRITE)
> - ceph_osdc_msg_data_set(req->r_request,
> + ceph_osdc_msg_data_add(req->r_request,
> &src->extent.osd_data);
> else
> - ceph_osdc_msg_data_set(req->r_reply,
> + ceph_osdc_msg_data_add(req->r_reply,
> &src->extent.osd_data);
> break;
> case CEPH_OSD_OP_CALL:
> dst->cls.class_len = src->cls.class_len;
> dst->cls.method_len = src->cls.method_len;
> dst->cls.indata_len = cpu_to_le32(src->cls.request_data_len);
> - ceph_osdc_msg_data_set(req->r_reply, &src->cls.response_data);
> - ceph_osdc_msg_data_set(req->r_request, &src->cls.request_info);
> + ceph_osdc_msg_data_add(req->r_reply, &src->cls.response_data);
> + ceph_osdc_msg_data_add(req->r_request, &src->cls.request_info);
> BUG_ON(src->cls.request_info.type !=
> CEPH_OSD_DATA_TYPE_PAGELIST);
> request_data_len = src->cls.request_info.pagelist->length;
>
^ permalink raw reply [flat|nested] 22+ messages in thread