* [PATCH 00/11] continued messenger-related changes
@ 2012-06-06 3:24 Alex Elder
2012-06-06 3:30 ` [PATCH] libceph: osd_client: don't drop reply reference too early Alex Elder
` (10 more replies)
0 siblings, 11 replies; 23+ messages in thread
From: Alex Elder @ 2012-06-06 3:24 UTC (permalink / raw)
To: ceph-devel
This is a series of 11 related patches but I've grouped them into
smaller sub-sets as follows. A few of these were posted previously
but their changes--if any--were merely related to adjustments to
account for other patches inserted before them in the series. Two
of them are being submitted on behalf of Sage Weil.
These patches can also be viewed here:
https://github.com/ceph/ceph-client/commits/wip-messenger-review
-Alex
[PATCH] libceph: osd_client: don't drop reply reference too early
This fixes a potential bug found by inspection.
[PATCH] libceph: use con get/put ops from osd_client
This fixes a possibly serious problem with reference counting
that Sage found while reviewing my last series of patches.
[PATCH 1/2] libceph: embed ceph connection structure in mon_client
[PATCH 2/2] libceph: drop connection refcounting for mon_client
These two rearrange how the ceph connection is represented in
a mon_client structure, and make the reference counting
methods no-ops.
[PATCH 1/2] libceph: init monitor connection when opening
[PATCH 2/2] libceph: fully initialize connection in con_init()
This finishes a small sub-series from before that did not get
fully committed. It unifies the way ceph connections are
initialized so that ceph connection fields are manipulated by
the messenger code, not the caller.
[PATCH] libceph: tweak ceph_alloc_msg()
This is a standalone cleanup that aims to make the real purpose
of this function a bit more obvious.
[PATCH 1/4] libceph: have messages point to their connection
[PATCH 2/4] libceph: have messages take a connection reference
[PATCH 3/4] libceph: make ceph_con_revoke() a msg operation
[PATCH 4/4] libceph: make ceph_con_revoke_message() a msg op
These four patches make every message maintain a reference to
the connection it is associated with. Having this reference
allows the revoke routines to be called with a message alone
rather than having to supply a connection pointer as well.
* [PATCH] libceph: osd_client: don't drop reply reference too early
2012-06-06 3:24 [PATCH 00/11] continued messenger-related changes Alex Elder
@ 2012-06-06 3:30 ` Alex Elder
2012-06-06 16:18 ` Sage Weil
2012-06-06 3:30 ` [PATCH] libceph: use con get/put ops from osd_client Alex Elder
` (9 subsequent siblings)
10 siblings, 1 reply; 23+ messages in thread
From: Alex Elder @ 2012-06-06 3:30 UTC (permalink / raw)
To: ceph-devel
In ceph_osdc_release_request(), a reference to the r_reply message
is dropped. But just after that, that same message is revoked if it
was in use to receive an incoming reply. Reorder these so we are
sure we hold a reference until we're actually done with the message.
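As an illustration only (not part of this patch), the ordering problem can be sketched in plain userspace C. The struct msg type and the msg_new(), msg_put(), msg_revoke() and release_reply() helpers are hypothetical stand-ins, not the libceph API; the sketch assumes only that revoking touches the message and that the last put frees it.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical refcounted message; not the kernel's struct ceph_msg. */
struct msg {
    int refs;
    int revoked;
};

static int msgs_freed;  /* counts frees so the ordering can be checked */

static struct msg *msg_new(void)
{
    struct msg *m = calloc(1, sizeof(*m));

    if (m)
        m->refs = 1;
    return m;
}

/* Drop one reference; free the message when the last one goes away. */
static void msg_put(struct msg *m)
{
    if (--m->refs == 0) {
        free(m);
        msgs_freed++;
    }
}

/* Touches the message, so the caller must still hold a reference. */
static void msg_revoke(struct msg *m)
{
    assert(m->refs > 0);
    m->revoked = 1;
}

/* Safe ordering: finish using the message, then drop the reference. */
static int release_reply(struct msg *reply)
{
    int before = msgs_freed;

    msg_revoke(reply);
    msg_put(reply);
    return msgs_freed - before;  /* 1 if this call freed the message */
}
```

Swapping the two calls in release_reply() would let msg_revoke() run on memory that msg_put() may already have freed, which is the hazard the reordering avoids.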
Signed-off-by: Alex Elder <elder@inktank.com>
---
net/ceph/osd_client.c | 4 ++--
1 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index e30efbc..d8b6d31 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -139,8 +139,6 @@ void ceph_osdc_release_request(struct kref *kref)
if (req->r_request)
ceph_msg_put(req->r_request);
- if (req->r_reply)
- ceph_msg_put(req->r_reply);
if (req->r_con_filling_msg) {
dout("release_request revoking pages %p from con %p\n",
req->r_pages, req->r_con_filling_msg);
@@ -148,6 +146,8 @@ void ceph_osdc_release_request(struct kref *kref)
req->r_reply);
ceph_con_put(req->r_con_filling_msg);
}
+ if (req->r_reply)
+ ceph_msg_put(req->r_reply);
if (req->r_own_pages)
ceph_release_page_vector(req->r_pages,
req->r_num_pages);
--
1.7.5.4
* [PATCH] libceph: use con get/put ops from osd_client
2012-06-06 3:24 [PATCH 00/11] continued messenger-related changes Alex Elder
2012-06-06 3:30 ` [PATCH] libceph: osd_client: don't drop reply reference too early Alex Elder
@ 2012-06-06 3:30 ` Alex Elder
2012-06-06 3:30 ` [PATCH 1/2] libceph: embed ceph connection structure in mon_client Alex Elder
` (8 subsequent siblings)
10 siblings, 0 replies; 23+ messages in thread
From: Alex Elder @ 2012-06-06 3:30 UTC (permalink / raw)
To: ceph-devel
From Sage Weil <sage@inktank.com>
There were a few direct calls to ceph_con_{get,put}() instead of the con
ops from osd_client.c. This is a bug since those ops aren't defined to
be ceph_con_get/put.
This breaks refcounting on the ceph_osd structs that contain the
ceph_connections, and could lead to all manner of strangeness.
The purpose of the ->get and ->put methods in a ceph connection is
to allow the connection to indicate it has a reference to something
external to the messaging system, *not* to indicate something
external has a reference to the connection.
[elder@inktank.com: added that last sentence]
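A rough userspace sketch of the idea follows; it is not the libceph code. The conn, conn_ops and owner types and the owner_con_get()/owner_con_put() helpers are hypothetical names chosen for illustration. It assumes that the connection's private pointer refers to the object embedding it, and that the ops move the reference count of that object rather than of the connection.

```c
#include <assert.h>

struct conn;

/* Hypothetical ops table, loosely modeled on a connection's get/put ops. */
struct conn_ops {
    struct conn *(*get)(struct conn *con);
    void (*put)(struct conn *con);
};

struct conn {
    const struct conn_ops *ops;
    void *private;          /* the object that embeds this connection */
};

/* Hypothetical owner, standing in for something like an OSD structure. */
struct owner {
    int refs;
    struct conn con;
};

static struct conn *owner_con_get(struct conn *con)
{
    struct owner *o = con->private;

    o->refs++;              /* the reference lands on the owner */
    return con;
}

static void owner_con_put(struct conn *con)
{
    struct owner *o = con->private;

    o->refs--;
}

static const struct conn_ops owner_con_ops = {
    .get = owner_con_get,
    .put = owner_con_put,
};

static void owner_init(struct owner *o)
{
    o->refs = 1;
    o->con.ops = &owner_con_ops;
    o->con.private = o;
}
```

A caller that bypassed con->ops and adjusted a counter inside the connection itself would leave the owner's count untouched, which is the kind of mismatch described above.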
Signed-off-by: Sage Weil <sage@newdream.net>
Reviewed-by: Alex Elder <elder@inktank.com>
---
net/ceph/osd_client.c | 8 ++++----
1 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index d8b6d31..5b41a69 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -144,7 +144,7 @@ void ceph_osdc_release_request(struct kref *kref)
req->r_pages, req->r_con_filling_msg);
ceph_con_revoke_message(req->r_con_filling_msg,
req->r_reply);
- ceph_con_put(req->r_con_filling_msg);
+ req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
}
if (req->r_reply)
ceph_msg_put(req->r_reply);
@@ -1216,7 +1216,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
if (req->r_con_filling_msg == con && req->r_reply == msg) {
dout(" dropping con_filling_msg ref %p\n", con);
req->r_con_filling_msg = NULL;
- ceph_con_put(con);
+ con->ops->put(con);
}
if (!req->r_got_reply) {
@@ -2028,7 +2028,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
dout("get_reply revoking msg %p from old con %p\n",
req->r_reply, req->r_con_filling_msg);
ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
- ceph_con_put(req->r_con_filling_msg);
+ req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
req->r_con_filling_msg = NULL;
}
@@ -2063,7 +2063,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
#endif
}
*skip = 0;
- req->r_con_filling_msg = ceph_con_get(con);
+ req->r_con_filling_msg = con->ops->get(con);
dout("get_reply tid %lld %p\n", tid, m);
out:
--
1.7.5.4
* [PATCH 1/2] libceph: embed ceph connection structure in mon_client
2012-06-06 3:24 [PATCH 00/11] continued messenger-related changes Alex Elder
2012-06-06 3:30 ` [PATCH] libceph: osd_client: don't drop reply reference too early Alex Elder
2012-06-06 3:30 ` [PATCH] libceph: use con get/put ops from osd_client Alex Elder
@ 2012-06-06 3:30 ` Alex Elder
2012-06-06 5:22 ` Sage Weil
2012-06-06 16:19 ` Sage Weil
2012-06-06 3:31 ` [PATCH 2/2] libceph: drop connection refcounting for mon_client Alex Elder
` (7 subsequent siblings)
10 siblings, 2 replies; 23+ messages in thread
From: Alex Elder @ 2012-06-06 3:30 UTC (permalink / raw)
To: ceph-devel
A monitor client has a pointer to a ceph connection structure in it.
This is the only one of the three ceph client types that does it this
way; the OSD and MDS clients embed the connection into their main
structures. There is always exactly one ceph connection for a
monitor client, so there is no need to allocate it separately from the
monitor client structure.
So switch the ceph_mon_client structure to embed its
ceph_connection structure.
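For readers less familiar with the embedding pattern, here is a minimal userspace sketch. The conn and mon_client_embedded types and the client_of() helper are hypothetical; the patch itself keeps using the connection's private pointer rather than pointer arithmetic. The sketch only shows that an embedded member needs no separate allocation and shares its container's lifetime.

```c
#include <assert.h>
#include <stddef.h>

struct conn {
    int state;
};

/* Hypothetical client that embeds its single connection. */
struct mon_client_embedded {
    int cur_mon;
    struct conn con;        /* embedded: lives and dies with the client */
};

/* Recover the enclosing client from a pointer to its embedded connection. */
static struct mon_client_embedded *client_of(struct conn *con)
{
    return (struct mon_client_embedded *)
        ((char *)con - offsetof(struct mon_client_embedded, con));
}

static void client_init(struct mon_client_embedded *mc)
{
    mc->cur_mon = -1;
    mc->con.state = 0;      /* no allocation, so no failure path here */
}
```

Callers pass &mc->con where they previously passed a pointer member, and the error path that freed a separately allocated connection goes away.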
Signed-off-by: Alex Elder <elder@inktank.com>
---
include/linux/ceph/mon_client.h | 2 +-
net/ceph/mon_client.c | 47 ++++++++++++++++----------------------
2 files changed, 21 insertions(+), 28 deletions(-)
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index 545f859..2113e38 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -70,7 +70,7 @@ struct ceph_mon_client {
bool hunting;
int cur_mon; /* last monitor i contacted */
unsigned long sub_sent, sub_renew_after;
- struct ceph_connection *con;
+ struct ceph_connection con;
bool have_fsid;
/* pending generic requests */
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 704dc95..ac4d6b1 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -106,9 +106,9 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
monc->pending_auth = 1;
monc->m_auth->front.iov_len = len;
monc->m_auth->hdr.front_len = cpu_to_le32(len);
- ceph_con_revoke(monc->con, monc->m_auth);
+ ceph_con_revoke(&monc->con, monc->m_auth);
ceph_msg_get(monc->m_auth); /* keep our ref */
- ceph_con_send(monc->con, monc->m_auth);
+ ceph_con_send(&monc->con, monc->m_auth);
}
/*
@@ -117,8 +117,8 @@ static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
static void __close_session(struct ceph_mon_client *monc)
{
dout("__close_session closing mon%d\n", monc->cur_mon);
- ceph_con_revoke(monc->con, monc->m_auth);
- ceph_con_close(monc->con);
+ ceph_con_revoke(&monc->con, monc->m_auth);
+ ceph_con_close(&monc->con);
monc->cur_mon = -1;
monc->pending_auth = 0;
ceph_auth_reset(monc->auth);
@@ -142,9 +142,9 @@ static int __open_session(struct ceph_mon_client *monc)
monc->want_next_osdmap = !!monc->want_next_osdmap;
dout("open_session mon%d opening\n", monc->cur_mon);
- monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON;
- monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
- ceph_con_open(monc->con,
+ monc->con.peer_name.type = CEPH_ENTITY_TYPE_MON;
+ monc->con.peer_name.num = cpu_to_le64(monc->cur_mon);
+ ceph_con_open(&monc->con,
&monc->monmap->mon_inst[monc->cur_mon].addr);
/* initiatiate authentication handshake */
@@ -226,8 +226,8 @@ static void __send_subscribe(struct ceph_mon_client *monc)
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
- ceph_con_revoke(monc->con, msg);
- ceph_con_send(monc->con, ceph_msg_get(msg));
+ ceph_con_revoke(&monc->con, msg);
+ ceph_con_send(&monc->con, ceph_msg_get(msg));
monc->sub_sent = jiffies | 1; /* never 0 */
}
@@ -247,7 +247,7 @@ static void handle_subscribe_ack(struct ceph_mon_client *monc,
if (monc->hunting) {
pr_info("mon%d %s session established\n",
monc->cur_mon,
- ceph_pr_addr(&monc->con->peer_addr.in_addr));
+ ceph_pr_addr(&monc->con.peer_addr.in_addr));
monc->hunting = false;
}
dout("handle_subscribe_ack after %d seconds\n", seconds);
@@ -461,7 +461,7 @@ static int do_generic_request(struct ceph_mon_client *monc,
req->request->hdr.tid = cpu_to_le64(req->tid);
__insert_generic_request(monc, req);
monc->num_generic_requests++;
- ceph_con_send(monc->con, ceph_msg_get(req->request));
+ ceph_con_send(&monc->con, ceph_msg_get(req->request));
mutex_unlock(&monc->mutex);
err = wait_for_completion_interruptible(&req->completion);
@@ -684,8 +684,8 @@ static void __resend_generic_request(struct ceph_mon_client *monc)
for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
req = rb_entry(p, struct ceph_mon_generic_request, node);
- ceph_con_revoke(monc->con, req->request);
- ceph_con_send(monc->con, ceph_msg_get(req->request));
+ ceph_con_revoke(&monc->con, req->request);
+ ceph_con_send(&monc->con, ceph_msg_get(req->request));
}
}
@@ -705,7 +705,7 @@ static void delayed_work(struct work_struct *work)
__close_session(monc);
__open_session(monc); /* continue hunting */
} else {
- ceph_con_keepalive(monc->con);
+ ceph_con_keepalive(&monc->con);
__validate_auth(monc);
@@ -760,19 +760,16 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
goto out;
/* connection */
- monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
- if (!monc->con)
- goto out_monmap;
- ceph_con_init(&monc->client->msgr, monc->con);
- monc->con->private = monc;
- monc->con->ops = &mon_con_ops;
+ ceph_con_init(&monc->client->msgr, &monc->con);
+ monc->con.private = monc;
+ monc->con.ops = &mon_con_ops;
/* authentication */
monc->auth = ceph_auth_init(cl->options->name,
cl->options->key);
if (IS_ERR(monc->auth)) {
err = PTR_ERR(monc->auth);
- goto out_con;
+ goto out_monmap;
}
monc->auth->want_keys =
CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
@@ -824,8 +821,6 @@ out_subscribe_ack:
ceph_msg_put(monc->m_subscribe_ack);
out_auth:
ceph_auth_destroy(monc->auth);
-out_con:
- monc->con->ops->put(monc->con);
out_monmap:
kfree(monc->monmap);
out:
@@ -841,9 +836,7 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
mutex_lock(&monc->mutex);
__close_session(monc);
- monc->con->private = NULL;
- monc->con->ops->put(monc->con);
- monc->con = NULL;
+ monc->con.private = NULL;
mutex_unlock(&monc->mutex);
@@ -1021,7 +1014,7 @@ static void mon_fault(struct ceph_connection *con)
if (!monc->hunting)
pr_info("mon%d %s session lost, "
"hunting for new mon\n", monc->cur_mon,
- ceph_pr_addr(&monc->con->peer_addr.in_addr));
+ ceph_pr_addr(&monc->con.peer_addr.in_addr));
__close_session(monc);
if (!monc->hunting) {
--
1.7.5.4
* [PATCH 2/2] libceph: drop connection refcounting for mon_client
2012-06-06 3:24 [PATCH 00/11] continued messenger-related changes Alex Elder
` (2 preceding siblings ...)
2012-06-06 3:30 ` [PATCH 1/2] libceph: embed ceph connection structure in mon_client Alex Elder
@ 2012-06-06 3:31 ` Alex Elder
2012-06-06 3:31 ` [PATCH 1/2] libceph: init monitor connection when opening Alex Elder
` (6 subsequent siblings)
10 siblings, 0 replies; 23+ messages in thread
From: Alex Elder @ 2012-06-06 3:31 UTC (permalink / raw)
To: ceph-devel
From Sage Weil <sage@inktank.com>
All references to the embedded ceph_connection come from the msgr
workqueue, which is drained prior to mon_client destruction. That
means we can ignore con refcounting entirely.
Signed-off-by: Sage Weil <sage@newdream.net>
Reviewed-by: Alex Elder <elder@inktank.com>
---
net/ceph/mon_client.c | 18 ++++++++++++++++--
1 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index ac4d6b1..062b724 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -1029,9 +1029,23 @@ out:
mutex_unlock(&monc->mutex);
}
+/*
+ * We can ignore refcounting on the connection struct, as all references
+ * will come from the messenger workqueue, which is drained prior to
+ * mon_client destruction.
+ */
+static struct ceph_connection *con_get(struct ceph_connection *con)
+{
+ return con;
+}
+
+static void con_put(struct ceph_connection *con)
+{
+}
+
static const struct ceph_connection_operations mon_con_ops = {
- .get = ceph_con_get,
- .put = ceph_con_put,
+ .get = con_get,
+ .put = con_put,
.dispatch = dispatch,
.fault = mon_fault,
.alloc_msg = mon_alloc_msg,
--
1.7.5.4
* [PATCH 1/2] libceph: init monitor connection when opening
2012-06-06 3:24 [PATCH 00/11] continued messenger-related changes Alex Elder
` (3 preceding siblings ...)
2012-06-06 3:31 ` [PATCH 2/2] libceph: drop connection refcounting for mon_client Alex Elder
@ 2012-06-06 3:31 ` Alex Elder
2012-06-06 3:31 ` [PATCH 2/2] libceph: fully initialize connection in con_init() Alex Elder
` (5 subsequent siblings)
10 siblings, 0 replies; 23+ messages in thread
From: Alex Elder @ 2012-06-06 3:31 UTC (permalink / raw)
To: ceph-devel
Hold off initializing a monitor client's connection until just
before it gets opened for use.
Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Sage Weil <sage@inktank.com>
---
net/ceph/mon_client.c | 13 ++++++-------
1 files changed, 6 insertions(+), 7 deletions(-)
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 062b724..6adbea7 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -119,6 +119,7 @@ static void __close_session(struct ceph_mon_client *monc)
dout("__close_session closing mon%d\n", monc->cur_mon);
ceph_con_revoke(&monc->con, monc->m_auth);
ceph_con_close(&monc->con);
+ monc->con.private = NULL;
monc->cur_mon = -1;
monc->pending_auth = 0;
ceph_auth_reset(monc->auth);
@@ -141,9 +142,13 @@ static int __open_session(struct ceph_mon_client *monc)
monc->sub_renew_after = jiffies; /* i.e., expired */
monc->want_next_osdmap = !!monc->want_next_osdmap;
- dout("open_session mon%d opening\n", monc->cur_mon);
+ ceph_con_init(&monc->client->msgr, &monc->con);
+ monc->con.private = monc;
+ monc->con.ops = &mon_con_ops;
monc->con.peer_name.type = CEPH_ENTITY_TYPE_MON;
monc->con.peer_name.num = cpu_to_le64(monc->cur_mon);
+
+ dout("open_session mon%d opening\n", monc->cur_mon);
ceph_con_open(&monc->con,
&monc->monmap->mon_inst[monc->cur_mon].addr);
@@ -760,10 +765,6 @@ int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
goto out;
/* connection */
- ceph_con_init(&monc->client->msgr, &monc->con);
- monc->con.private = monc;
- monc->con.ops = &mon_con_ops;
-
/* authentication */
monc->auth = ceph_auth_init(cl->options->name,
cl->options->key);
@@ -836,8 +837,6 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
mutex_lock(&monc->mutex);
__close_session(monc);
- monc->con.private = NULL;
-
mutex_unlock(&monc->mutex);
ceph_auth_destroy(monc->auth);
--
1.7.5.4
* [PATCH 2/2] libceph: fully initialize connection in con_init()
2012-06-06 3:24 [PATCH 00/11] continued messenger-related changes Alex Elder
` (4 preceding siblings ...)
2012-06-06 3:31 ` [PATCH 1/2] libceph: init monitor connection when opening Alex Elder
@ 2012-06-06 3:31 ` Alex Elder
2012-06-06 3:31 ` [PATCH] libceph: tweak ceph_alloc_msg() Alex Elder
` (4 subsequent siblings)
10 siblings, 0 replies; 23+ messages in thread
From: Alex Elder @ 2012-06-06 3:31 UTC (permalink / raw)
To: ceph-devel
Move the initialization of a ceph connection's private pointer,
operations vector pointer, and peer name information into
ceph_con_init(). Rearrange the arguments so the connection pointer
is first. Hide the byte-swapping of the peer entity number inside
ceph_con_init().
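The shape of such a unified initializer can be sketched in userspace C as follows. The conn and conn_ops types, conn_init(), and to_le64() are hypothetical names for illustration, and to_le64() is a portable stand-in for the kernel's cpu_to_le64(); none of this is the actual libceph code.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

struct conn_ops {
    int unused;
};

struct conn {
    void *private;
    const struct conn_ops *ops;
    uint8_t peer_type;
    uint64_t peer_num_le;   /* held in little-endian byte order */
};

/* Portable stand-in for cpu_to_le64(): result bytes are little-endian. */
static uint64_t to_le64(uint64_t v)
{
    uint8_t b[8];
    uint64_t out;
    int i;

    for (i = 0; i < 8; i++)
        b[i] = (uint8_t)(v >> (8 * i));
    memcpy(&out, b, sizeof(out));
    return out;
}

/* One call sets every field; callers never touch the struct directly. */
static void conn_init(struct conn *con, void *private,
                      const struct conn_ops *ops,
                      uint8_t entity_type, uint64_t entity_num)
{
    memset(con, 0, sizeof(*con));
    con->private = private;
    con->ops = ops;
    con->peer_type = entity_type;
    con->peer_num_le = to_le64(entity_num);  /* byte swap hidden here */
}
```

Callers pass the entity number in host byte order, so the conversion lives in exactly one place.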
Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Sage Weil <sage@inktank.com>
---
fs/ceph/mds_client.c | 7 ++-----
include/linux/ceph/messenger.h | 6 ++++--
net/ceph/messenger.c | 9 ++++++++-
net/ceph/mon_client.c | 8 +++-----
net/ceph/osd_client.c | 7 ++-----
5 files changed, 19 insertions(+), 18 deletions(-)
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index ad30261..ecd7f15 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -394,11 +394,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
s->s_seq = 0;
mutex_init(&s->s_mutex);
- ceph_con_init(&mdsc->fsc->client->msgr, &s->s_con);
- s->s_con.private = s;
- s->s_con.ops = &mds_con_ops;
- s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
- s->s_con.peer_name.num = cpu_to_le64(mds);
+ ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr,
+ CEPH_ENTITY_TYPE_MDS, mds);
spin_lock_init(&s->s_gen_ttl_lock);
s->s_cap_gen = 0;
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 5e852f4..dd27837 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -227,8 +227,10 @@ extern void ceph_messenger_init(struct ceph_messenger *msgr,
u32 required_features,
bool nocrc);
-extern void ceph_con_init(struct ceph_messenger *msgr,
- struct ceph_connection *con);
+extern void ceph_con_init(struct ceph_connection *con, void *private,
+ const struct ceph_connection_operations *ops,
+ struct ceph_messenger *msgr, __u8 entity_type,
+ __u64 entity_num);
extern void ceph_con_open(struct ceph_connection *con,
struct ceph_entity_addr *addr);
extern bool ceph_con_opened(struct ceph_connection *con);
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 36b440a..3b65f6e 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -521,15 +521,22 @@ void ceph_con_put(struct ceph_connection *con)
/*
* initialize a new connection.
*/
-void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
+void ceph_con_init(struct ceph_connection *con, void *private,
+ const struct ceph_connection_operations *ops,
+ struct ceph_messenger *msgr, __u8 entity_type, __u64 entity_num)
{
dout("con_init %p\n", con);
memset(con, 0, sizeof(*con));
+ con->private = private;
+ con->ops = ops;
atomic_set(&con->nref, 1);
con->msgr = msgr;
con_sock_state_init(con);
+ con->peer_name.type = (__u8) entity_type;
+ con->peer_name.num = cpu_to_le64(entity_num);
+
mutex_init(&con->mutex);
INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 6adbea7..ab6b24a 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -142,11 +142,9 @@ static int __open_session(struct ceph_mon_client *monc)
monc->sub_renew_after = jiffies; /* i.e., expired */
monc->want_next_osdmap = !!monc->want_next_osdmap;
- ceph_con_init(&monc->client->msgr, &monc->con);
- monc->con.private = monc;
- monc->con.ops = &mon_con_ops;
- monc->con.peer_name.type = CEPH_ENTITY_TYPE_MON;
- monc->con.peer_name.num = cpu_to_le64(monc->cur_mon);
+ ceph_con_init(&monc->con, monc, &mon_con_ops,
+ &monc->client->msgr,
+ CEPH_ENTITY_TYPE_MON, monc->cur_mon);
dout("open_session mon%d opening\n", monc->cur_mon);
ceph_con_open(&monc->con,
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5b41a69..448c9da 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -640,11 +640,8 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
INIT_LIST_HEAD(&osd->o_osd_lru);
osd->o_incarnation = 1;
- ceph_con_init(&osdc->client->msgr, &osd->o_con);
- osd->o_con.private = osd;
- osd->o_con.ops = &osd_con_ops;
- osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
- osd->o_con.peer_name.num = cpu_to_le64(onum);
+ ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr,
+ CEPH_ENTITY_TYPE_OSD, onum);
INIT_LIST_HEAD(&osd->o_keepalive_item);
return osd;
--
1.7.5.4
* [PATCH] libceph: tweak ceph_alloc_msg()
2012-06-06 3:24 [PATCH 00/11] continued messenger-related changes Alex Elder
` (5 preceding siblings ...)
2012-06-06 3:31 ` [PATCH 2/2] libceph: fully initialize connection in con_init() Alex Elder
@ 2012-06-06 3:31 ` Alex Elder
2012-06-06 5:14 ` Sage Weil
2012-06-06 3:31 ` [PATCH 1/4] libceph: have messages point to their connection Alex Elder
` (3 subsequent siblings)
10 siblings, 1 reply; 23+ messages in thread
From: Alex Elder @ 2012-06-06 3:31 UTC (permalink / raw)
To: ceph-devel
The function ceph_alloc_msg() is only used to allocate a message
that will be assigned to a connection's in_msg pointer. Rename the
function so this implied usage is clearer.
In addition, make that assignment inside the function (again, since
that's precisely what it's intended to be used for). This allows us
to return what is now provided via the passed-in address of a "skip"
variable. The return type is now Boolean to be explicit that there
are only two possible outcomes.
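The calling convention described here can be sketched in userspace C. This is a simplified illustration, not the patched function: the conn and msg types, conn_in_msg_alloc(), and skip_hook() are hypothetical, and the sketch assumes a hook returns NULL whenever it sets its skip flag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

struct msg {
    int type;
};

struct conn {
    struct msg *in_msg;
    /* Optional allocator hook; sets *skip to ask that the message be dropped. */
    struct msg *(*alloc_msg)(struct conn *con, int type, int *skip);
};

/* Example hook that asks for every message to be skipped. */
static struct msg *skip_hook(struct conn *con, int type, int *skip)
{
    (void)con;
    (void)type;
    *skip = 1;
    return NULL;
}

/*
 * Store the new message in con->in_msg.  Returns true if the message
 * should be skipped; otherwise false, with con->in_msg left NULL only
 * when allocation failed.
 */
static bool conn_in_msg_alloc(struct conn *con, int type)
{
    assert(con->in_msg == NULL);

    if (con->alloc_msg) {
        int skip = 0;

        con->in_msg = con->alloc_msg(con, type, &skip);
        if (skip) {
            con->in_msg = NULL;
            return true;
        }
        return false;
    }

    con->in_msg = malloc(sizeof(*con->in_msg));
    if (con->in_msg)
        con->in_msg->type = type;
    return false;
}
```

The caller then distinguishes three outcomes from the return value and con->in_msg alone: skip, success, or allocation failure.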
Signed-off-by: Alex Elder <elder@inktank.com>
---
net/ceph/messenger.c | 58 ++++++++++++++++++++++++++-----------------------
1 files changed, 31 insertions(+), 27 deletions(-)
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3b65f6e..f7c9061 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1655,9 +1655,8 @@ static int read_partial_message_section(struct ceph_connection *con,
return 1;
}
-static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
- struct ceph_msg_header *hdr,
- int *skip);
+static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
+ struct ceph_msg_header *hdr);
static int read_partial_message_pages(struct ceph_connection *con,
@@ -1740,7 +1739,6 @@ static int read_partial_message(struct ceph_connection *con)
int ret;
unsigned front_len, middle_len, data_len;
bool do_datacrc = !con->msgr->nocrc;
- int skip;
u64 seq;
u32 crc;
@@ -1793,9 +1791,7 @@ static int read_partial_message(struct ceph_connection *con)
if (!con->in_msg) {
dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
con->in_hdr.front_len, con->in_hdr.data_len);
- skip = 0;
- con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
- if (skip) {
+ if (ceph_con_in_msg_alloc(con, &con->in_hdr)) {
/* skip this message */
dout("alloc_msg said skip message\n");
BUG_ON(con->in_msg);
@@ -2577,46 +2573,54 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
}
/*
- * Generic message allocator, for incoming messages.
+ * Allocate a message for receiving an incoming message on a
+ * connection, and save the result in con->in_msg. Uses the
+ * connection's private alloc_msg op if available.
+ *
+ * Returns true if the message should be skipped, false otherwise.
*/
-static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
- struct ceph_msg_header *hdr,
- int *skip)
+static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
+ struct ceph_msg_header *hdr)
{
int type = le16_to_cpu(hdr->type);
int front_len = le32_to_cpu(hdr->front_len);
int middle_len = le32_to_cpu(hdr->middle_len);
- struct ceph_msg *msg = NULL;
int ret;
+ BUG_ON(con->in_msg != NULL);
+
if (con->ops->alloc_msg) {
+ int skip = 0;
+
mutex_unlock(&con->mutex);
- msg = con->ops->alloc_msg(con, hdr, skip);
+ con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
mutex_lock(&con->mutex);
- if (!msg || *skip)
- return NULL;
+ if (skip)
+ con->in_msg = NULL;
+
+ if (!con->in_msg)
+ return skip != 0;
}
- if (!msg) {
- *skip = 0;
- msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
- if (!msg) {
+ if (!con->in_msg) {
+ con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
+ if (!con->in_msg) {
pr_err("unable to allocate msg type %d len %d\n",
type, front_len);
- return NULL;
+ return false;
}
- msg->page_alignment = le16_to_cpu(hdr->data_off);
+ con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
}
- memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
+ memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
- if (middle_len && !msg->middle) {
- ret = ceph_alloc_middle(con, msg);
+ if (middle_len && !con->in_msg->middle) {
+ ret = ceph_alloc_middle(con, con->in_msg);
if (ret < 0) {
- ceph_msg_put(msg);
- return NULL;
+ ceph_msg_put(con->in_msg);
+ con->in_msg = NULL;
}
}
- return msg;
+ return false;
}
--
1.7.5.4
* [PATCH 1/4] libceph: have messages point to their connection
2012-06-06 3:24 [PATCH 00/11] continued messenger-related changes Alex Elder
` (6 preceding siblings ...)
2012-06-06 3:31 ` [PATCH] libceph: tweak ceph_alloc_msg() Alex Elder
@ 2012-06-06 3:31 ` Alex Elder
2012-06-06 5:16 ` Sage Weil
2012-06-06 3:31 ` [PATCH 2/4] libceph: have messages take a connection reference Alex Elder
` (2 subsequent siblings)
10 siblings, 1 reply; 23+ messages in thread
From: Alex Elder @ 2012-06-06 3:31 UTC (permalink / raw)
To: ceph-devel
When a ceph message is queued for sending it is placed on a list of
pending messages (ceph_connection->out_queue). When it is
actually sent over the wire, it is moved from that list to
another (ceph_connection->out_sent). When acknowledgement for the
message is received, it is removed from the sent messages list.
During that entire time the message is "in the possession" of a
single ceph connection. Keep track of that connection in the
message. This will be used in the next patch (and is a helpful
bit of information for debugging anyway).
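The lifecycle above can be modeled as a small state machine in userspace C. This is only a sketch under simplifying assumptions: the two lists are reduced to a state field, and the conn and msg types and the con_send(), con_wrote() and con_acked() helpers are hypothetical names, not libceph functions.

```c
#include <assert.h>
#include <stddef.h>

struct conn {
    int id;
};

enum msg_state {
    MSG_IDLE,       /* owned by its allocator */
    MSG_QUEUED,     /* on the connection's pending list */
    MSG_SENT,       /* on the wire, waiting for an ack */
};

struct msg {
    struct conn *con;   /* non-NULL only while a connection holds the message */
    enum msg_state state;
};

/* Queue for sending: the message now belongs to exactly one connection. */
static void con_send(struct conn *con, struct msg *m)
{
    assert(m->con == NULL && m->state == MSG_IDLE);
    m->con = con;
    m->state = MSG_QUEUED;
}

/* Written to the socket: move from the pending list to the sent list. */
static void con_wrote(struct conn *con, struct msg *m)
{
    assert(m->con == con && m->state == MSG_QUEUED);
    m->state = MSG_SENT;
}

/* Acknowledged: the connection gives the message up. */
static void con_acked(struct conn *con, struct msg *m)
{
    assert(m->con == con && m->state == MSG_SENT);
    m->con = NULL;
    m->state = MSG_IDLE;
}
```

The assertions play the same role as the BUG_ON() checks in the diff: they catch a message that is handed to a second connection while the first still holds it.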
Signed-off-by: Alex Elder <elder@inktank.com>
---
include/linux/ceph/messenger.h | 3 +++
net/ceph/messenger.c | 27 +++++++++++++++++++++++++--
2 files changed, 28 insertions(+), 2 deletions(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index dd27837..6df837f 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -77,7 +77,10 @@ struct ceph_msg {
unsigned nr_pages; /* size of page array */
unsigned page_alignment; /* io offset in first page */
struct ceph_pagelist *pagelist; /* instead of pages */
+
+ struct ceph_connection *con;
struct list_head list_head;
+
struct kref kref;
struct bio *bio; /* instead of pages/pagelist */
struct bio *bio_iter; /* bio iterator */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index f7c9061..59fa5fb 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -414,6 +414,9 @@ static int con_close_socket(struct ceph_connection *con)
static void ceph_msg_remove(struct ceph_msg *msg)
{
list_del_init(&msg->list_head);
+ BUG_ON(msg->con == NULL);
+ msg->con = NULL;
+
ceph_msg_put(msg);
}
static void ceph_msg_remove_list(struct list_head *head)
@@ -433,6 +436,8 @@ static void reset_connection(struct ceph_connection *con)
ceph_msg_remove_list(&con->out_sent);
if (con->in_msg) {
+ BUG_ON(con->in_msg->con != con);
+ con->in_msg->con = NULL;
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
}
@@ -625,8 +630,10 @@ static void prepare_write_message(struct ceph_connection *con)
&con->out_temp_ack);
}
+ BUG_ON(list_empty(&con->out_queue));
m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
con->out_msg = m;
+ BUG_ON(m->con != con);
/* put message on sent list */
ceph_msg_get(m);
@@ -1806,6 +1813,8 @@ static int read_partial_message(struct ceph_connection *con)
"error allocating memory for incoming message";
return -ENOMEM;
}
+
+ BUG_ON(con->in_msg->con != con);
m = con->in_msg;
m->front.iov_len = 0; /* haven't read it yet */
if (m->middle)
@@ -1901,6 +1910,8 @@ static void process_message(struct ceph_connection *con)
{
struct ceph_msg *msg;
+ BUG_ON(con->in_msg->con != con);
+ con->in_msg->con = NULL;
msg = con->in_msg;
con->in_msg = NULL;
@@ -2260,6 +2271,8 @@ static void ceph_fault(struct ceph_connection *con)
con_close_socket(con);
if (con->in_msg) {
+ BUG_ON(con->in_msg->con != con);
+ con->in_msg->con = NULL;
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
}
@@ -2378,6 +2391,8 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
/* queue */
mutex_lock(&con->mutex);
+ BUG_ON(msg->con != NULL);
+ msg->con = con;
BUG_ON(!list_empty(&msg->list_head));
list_add_tail(&msg->list_head, &con->out_queue);
dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
@@ -2403,13 +2418,16 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
{
mutex_lock(&con->mutex);
if (!list_empty(&msg->list_head)) {
- dout("con_revoke %p msg %p - was on queue\n", con, msg);
+ dout("%s %p msg %p - was on queue\n", __func__, con, msg);
list_del_init(&msg->list_head);
+ BUG_ON(msg->con == NULL);
+ msg->con = NULL;
+
ceph_msg_put(msg);
msg->hdr.seq = 0;
}
if (con->out_msg == msg) {
- dout("con_revoke %p msg %p - was sending\n", con, msg);
+ dout("%s %p msg %p - was sending\n", __func__, con, msg);
con->out_msg = NULL;
if (con->out_kvec_is_msg) {
con->out_skip = con->out_kvec_bytes;
@@ -2478,6 +2496,8 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
if (m == NULL)
goto out;
kref_init(&m->kref);
+
+ m->con = NULL;
INIT_LIST_HEAD(&m->list_head);
m->hdr.tid = 0;
@@ -2595,6 +2615,8 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
mutex_unlock(&con->mutex);
con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
mutex_lock(&con->mutex);
+ if (con->in_msg)
+ con->in_msg->con = con;
if (skip)
con->in_msg = NULL;
@@ -2608,6 +2630,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
type, front_len);
return false;
}
+ con->in_msg->con = con;
con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
}
memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
--
1.7.5.4
* [PATCH 2/4] libceph: have messages take a connection reference
2012-06-06 3:24 [PATCH 00/11] continued messenger-related changes Alex Elder
` (7 preceding siblings ...)
2012-06-06 3:31 ` [PATCH 1/4] libceph: have messages point to their connection Alex Elder
@ 2012-06-06 3:31 ` Alex Elder
2012-06-06 17:06 ` Sage Weil
2012-06-06 3:31 ` [PATCH 3/4] libceph: make ceph_con_revoke() a msg operation Alex Elder
2012-06-06 3:31 ` [PATCH 4/4] libceph: make ceph_con_revoke_message() a msg op Alex Elder
10 siblings, 1 reply; 23+ messages in thread
From: Alex Elder @ 2012-06-06 3:31 UTC (permalink / raw)
To: ceph-devel
There are essentially two types of ceph messages: incoming and
outgoing. Outgoing messages are always allocated via ceph_msg_new(),
and at the time of their allocation they are not associated with any
particular connection. Incoming messages are always allocated via
ceph_con_in_msg_alloc(), and they are initially associated with the
connection from which incoming data will be placed into the message.
When an outgoing message gets sent, it becomes associated with a
connection and remains that way until the message is successfully
sent. The association of an incoming message goes away at the point
it is sent to an upper layer via a con->ops->dispatch method.
This patch implements connection reference counting for all ceph
messages, such that every message holds a reference (and a pointer)
to a connection if and only if it is associated with that connection
(as described above).
For background, here is an explanation of the ceph message
lifecycle, emphasizing when an association exists between a message
and a connection.
Outgoing Messages
An outgoing message is "owned" by its allocator, from the time it is
allocated in ceph_msg_new() up to the point it gets queued for
sending in ceph_con_send(). Prior to that point the message's
msg->con pointer is null; at the point it is queued for sending, its
msg->con pointer is assigned to refer to the connection. At that
time the message is inserted into the connection's out_queue list.
When a message on the out_queue list has been sent to the socket
layer to be put on the wire, it is transferred out of that list and
into the connection's out_sent list. At that point it is still owned
by the connection, and will remain so until an acknowledgement is
received from the recipient that indicates the message was
successfully transferred. When such an acknowledgement is received
(in process_ack()), the message is removed from its list (in
ceph_msg_remove()), at which point it is no longer associated with
the connection.
So basically, any time a message is on one of a connection's lists,
it is associated with that connection. The connection reference for
an outgoing message can thus be taken at the point the message is
added to the out_queue (in ceph_con_send()) and dropped at the point
it is removed from either of its two lists (in ceph_msg_remove()),
at which point its connection pointer becomes null.
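As a rough illustration of the outgoing rule above, here is a minimal
user-space sketch. The toy_* names and the plain integer counters are
assumptions made for this example only; they are not the kernel's
struct ceph_connection or struct ceph_msg, and assert() stands in for
BUG_ON().

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-ins for the kernel structures (hypothetical, for illustration). */
struct toy_con {
	int refs;	/* connection reference count */
	int queued;	/* messages currently on out_queue or out_sent */
};

struct toy_msg {
	struct toy_con *con;	/* NULL unless associated with a connection */
};

static struct toy_con *toy_con_get(struct toy_con *con)
{
	con->refs++;
	return con;
}

static void toy_con_put(struct toy_con *con)
{
	assert(con->refs > 0);
	con->refs--;
}

/* Modeled on ceph_con_send(): queueing creates the association. */
static void toy_con_send(struct toy_con *con, struct toy_msg *msg)
{
	assert(msg->con == NULL);	/* must not already be queued */
	msg->con = toy_con_get(con);
	con->queued++;
}

/* Modeled on ceph_msg_remove(): leaving the lists ends the association. */
static void toy_msg_remove(struct toy_msg *msg)
{
	assert(msg->con != NULL);
	msg->con->queued--;
	toy_con_put(msg->con);
	msg->con = NULL;
}
```

The point of the sketch is only the pairing: one get when the message
is queued, one put when it leaves the connection's lists.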
Incoming Messages
When an incoming message on a connection is getting read (in
read_partial_message()) and there is no message in con->in_msg,
a new one is allocated using ceph_con_in_msg_alloc(). At that
point the message is associated with the connection. Once that
message has been completely and successfully read, it is passed to
upper layer code using the connection's con->ops->dispatch method.
At that point the association between the message and the connection
no longer exists.
Reference counting of connections for incoming messages can be done
by taking a reference to the connection when the message gets
allocated, and releasing that reference when it gets handed off
using the dispatch method.
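The incoming case can be sketched the same way. Again this is only a
user-space model under stated assumptions: the toy_* types and
functions are hypothetical, the message storage is supplied by the
caller rather than allocated, and assert() stands in for BUG_ON().

```c
#include <assert.h>
#include <stddef.h>

struct toy_msg;

/* Toy stand-ins for the kernel structures (hypothetical, for illustration). */
struct toy_con {
	int refs;			/* connection reference count */
	struct toy_msg *in_msg;		/* message currently being read, if any */
};

struct toy_msg {
	struct toy_con *con;		/* NULL unless associated with a connection */
};

/* Modeled on ceph_con_in_msg_alloc(): the new message references con. */
static void toy_in_msg_alloc(struct toy_con *con, struct toy_msg *storage)
{
	assert(con->in_msg == NULL);
	storage->con = con;
	con->refs++;
	con->in_msg = storage;
}

/* Modeled on the hand-off in process_message(): dispatch ends the association. */
static struct toy_msg *toy_dispatch(struct toy_con *con)
{
	struct toy_msg *msg = con->in_msg;

	assert(msg != NULL && msg->con == con);
	msg->con = NULL;
	con->in_msg = NULL;
	con->refs--;
	return msg;
}
```

After toy_dispatch() returns, the message belongs to the caller and the
connection's count is back where it started.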
We should never fail to get a connection reference for a
message, since the caller should already hold one.
Signed-off-by: Alex Elder <elder@inktank.com>
---
net/ceph/messenger.c | 24 ++++++++++++++++++------
1 files changed, 18 insertions(+), 6 deletions(-)
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 59fa5fb..336707b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -415,6 +415,7 @@ static void ceph_msg_remove(struct ceph_msg *msg)
{
list_del_init(&msg->list_head);
BUG_ON(msg->con == NULL);
+ ceph_con_put(msg->con);
msg->con = NULL;
ceph_msg_put(msg);
@@ -440,6 +441,7 @@ static void reset_connection(struct ceph_connection
*con)
con->in_msg->con = NULL;
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
+ ceph_con_put(con);
}
con->connect_seq = 0;
@@ -1914,6 +1916,7 @@ static void process_message(struct ceph_connection
*con)
con->in_msg->con = NULL;
msg = con->in_msg;
con->in_msg = NULL;
+ ceph_con_put(con);
/* if first message, set peer_name */
if (con->peer_name.type == 0)
@@ -2275,6 +2278,7 @@ static void ceph_fault(struct ceph_connection *con)
con->in_msg->con = NULL;
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
+ ceph_con_put(con);
}
/* Requeue anything that hasn't been acked */
@@ -2391,8 +2395,11 @@ void ceph_con_send(struct ceph_connection *con,
struct ceph_msg *msg)
/* queue */
mutex_lock(&con->mutex);
+
BUG_ON(msg->con != NULL);
- msg->con = con;
+ msg->con = ceph_con_get(con);
+ BUG_ON(msg->con == NULL);
+
BUG_ON(!list_empty(&msg->list_head));
list_add_tail(&msg->list_head, &con->out_queue);
dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
@@ -2421,10 +2428,11 @@ void ceph_con_revoke(struct ceph_connection
*con, struct ceph_msg *msg)
dout("%s %p msg %p - was on queue\n", __func__, con, msg);
list_del_init(&msg->list_head);
BUG_ON(msg->con == NULL);
+ ceph_con_put(msg->con);
msg->con = NULL;
+ msg->hdr.seq = 0;
ceph_msg_put(msg);
- msg->hdr.seq = 0;
}
if (con->out_msg == msg) {
dout("%s %p msg %p - was sending\n", __func__, con, msg);
@@ -2433,8 +2441,9 @@ void ceph_con_revoke(struct ceph_connection *con,
struct ceph_msg *msg)
con->out_skip = con->out_kvec_bytes;
con->out_kvec_is_msg = false;
}
- ceph_msg_put(msg);
msg->hdr.seq = 0;
+
+ ceph_msg_put(msg);
}
mutex_unlock(&con->mutex);
}
@@ -2615,8 +2624,10 @@ static bool ceph_con_in_msg_alloc(struct
ceph_connection *con,
mutex_unlock(&con->mutex);
con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
mutex_lock(&con->mutex);
- if (con->in_msg)
- con->in_msg->con = con;
+ if (con->in_msg) {
+ con->in_msg->con = ceph_con_get(con);
+ BUG_ON(con->in_msg->con == NULL);
+ }
if (skip)
con->in_msg = NULL;
@@ -2630,7 +2641,8 @@ static bool ceph_con_in_msg_alloc(struct
ceph_connection *con,
type, front_len);
return false;
}
- con->in_msg->con = con;
+ con->in_msg->con = ceph_con_get(con);
+ BUG_ON(con->in_msg->con == NULL);
con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
}
memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
--
1.7.5.4
* [PATCH 3/4] libceph: make ceph_con_revoke() a msg operation
2012-06-06 3:24 [PATCH 00/11] continued messenger-related changes Alex Elder
` (8 preceding siblings ...)
2012-06-06 3:31 ` [PATCH 2/4] libceph: have messages take a connection reference Alex Elder
@ 2012-06-06 3:31 ` Alex Elder
2012-06-06 5:18 ` Sage Weil
2012-06-06 3:31 ` [PATCH 4/4] libceph: make ceph_con_revoke_message() a msg op Alex Elder
10 siblings, 1 reply; 23+ messages in thread
From: Alex Elder @ 2012-06-06 3:31 UTC (permalink / raw)
To: ceph-devel
ceph_con_revoke() is passed both a message and a ceph connection.
Now that any message associated with a connection holds a pointer
to that connection, there's no need to provide the connection when
revoking a message.
This has the added benefit of precluding the possibility of
providing the wrong connection pointer. If the message's connection
pointer is null, it is not being tracked by any connection, so
revoking it is a no-op. This is supported as a convenience for
upper layers, so they can revoke a message that is not actually
"in flight."
Rename the function to ceph_msg_revoke() to reflect that it is really
an operation on a message, not a connection.
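The no-op behavior for an unassociated message can be modeled as below.
This is a simplified sketch, not the patch itself: the toy_* names are
hypothetical, there is no locking, and the function returns a bool
purely so the two cases can be told apart (the real ceph_msg_revoke()
returns void).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-ins for the kernel structures (hypothetical, for illustration). */
struct toy_con {
	int refs;		/* connection reference count */
};

struct toy_msg {
	struct toy_con *con;	/* NULL unless queued on a connection */
};

/*
 * Revoke a message without being told which connection holds it.
 * Returns true if the message was detached, false if it was a no-op.
 */
static bool toy_msg_revoke(struct toy_msg *msg)
{
	struct toy_con *con = msg->con;

	if (!con)
		return false;	/* not in any connection's possession */

	msg->con = NULL;	/* end the association */
	assert(con->refs > 0);
	con->refs--;		/* drop the reference the message held */
	return true;
}
```

Because the connection is looked up through the message, a caller can
no longer pass a connection that does not match.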
Signed-off-by: Alex Elder <elder@inktank.com>
---
include/linux/ceph/messenger.h | 3 ++-
net/ceph/messenger.c | 7 ++++++-
net/ceph/mon_client.c | 8 ++++----
net/ceph/osd_client.c | 4 ++--
4 files changed, 14 insertions(+), 8 deletions(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 6df837f..9008f81 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -239,7 +239,8 @@ extern void ceph_con_open(struct ceph_connection *con,
extern bool ceph_con_opened(struct ceph_connection *con);
extern void ceph_con_close(struct ceph_connection *con);
extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg
*msg);
-extern void ceph_con_revoke(struct ceph_connection *con, struct
ceph_msg *msg);
+
+extern void ceph_msg_revoke(struct ceph_msg *msg);
extern void ceph_con_revoke_message(struct ceph_connection *con,
struct ceph_msg *msg);
extern void ceph_con_keepalive(struct ceph_connection *con);
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 336707b..79f3032 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -2421,8 +2421,13 @@ EXPORT_SYMBOL(ceph_con_send);
/*
* Revoke a message that was previously queued for send
*/
-void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
+void ceph_msg_revoke(struct ceph_msg *msg)
{
+ struct ceph_connection *con = msg->con;
+
+ if (!con)
+ return; /* Message not in our possession */
+
mutex_lock(&con->mutex);
if (!list_empty(&msg->list_head)) {
dout("%s %p msg %p - was on queue\n", __func__, con, msg);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index ab6b24a..cf82e04 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -106,7 +106,7 @@ static void __send_prepared_auth_request(struct
ceph_mon_client *monc, int len)
monc->pending_auth = 1;
monc->m_auth->front.iov_len = len;
monc->m_auth->hdr.front_len = cpu_to_le32(len);
- ceph_con_revoke(&monc->con, monc->m_auth);
+ ceph_msg_revoke(monc->m_auth);
ceph_msg_get(monc->m_auth); /* keep our ref */
ceph_con_send(&monc->con, monc->m_auth);
}
@@ -117,7 +117,7 @@ static void __send_prepared_auth_request(struct
ceph_mon_client *monc, int len)
static void __close_session(struct ceph_mon_client *monc)
{
dout("__close_session closing mon%d\n", monc->cur_mon);
- ceph_con_revoke(&monc->con, monc->m_auth);
+ ceph_msg_revoke(monc->m_auth);
ceph_con_close(&monc->con);
monc->con.private = NULL;
monc->cur_mon = -1;
@@ -229,7 +229,7 @@ static void __send_subscribe(struct ceph_mon_client
*monc)
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
- ceph_con_revoke(&monc->con, msg);
+ ceph_msg_revoke(msg);
ceph_con_send(&monc->con, ceph_msg_get(msg));
monc->sub_sent = jiffies | 1; /* never 0 */
@@ -687,7 +687,7 @@ static void __resend_generic_request(struct
ceph_mon_client *monc)
for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
req = rb_entry(p, struct ceph_mon_generic_request, node);
- ceph_con_revoke(&monc->con, req->request);
+ ceph_msg_revoke(req->request);
ceph_con_send(&monc->con, ceph_msg_get(req->request));
}
}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 448c9da..403fefb 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -852,7 +852,7 @@ static void __unregister_request(struct
ceph_osd_client *osdc,
if (req->r_osd) {
/* make sure the original request isn't in flight. */
- ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+ ceph_msg_revoke(req->r_request);
list_del_init(&req->r_osd_item);
if (list_empty(&req->r_osd->o_requests) &&
@@ -879,7 +879,7 @@ static void __unregister_request(struct
ceph_osd_client *osdc,
static void __cancel_request(struct ceph_osd_request *req)
{
if (req->r_sent && req->r_osd) {
- ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+ ceph_msg_revoke(req->r_request);
req->r_sent = 0;
}
}
--
1.7.5.4
* [PATCH 4/4] libceph: make ceph_con_revoke_message() a msg op
2012-06-06 3:24 [PATCH 00/11] continued messenger-related changes Alex Elder
` (9 preceding siblings ...)
2012-06-06 3:31 ` [PATCH 3/4] libceph: make ceph_con_revoke() a msg operation Alex Elder
@ 2012-06-06 3:31 ` Alex Elder
2012-06-06 5:22 ` Sage Weil
10 siblings, 1 reply; 23+ messages in thread
From: Alex Elder @ 2012-06-06 3:31 UTC (permalink / raw)
To: ceph-devel
ceph_con_revoke_message() is passed both a message and a ceph
connection. A ceph_msg allocated for incoming messages on a
connection always has a pointer to that connection, so there's no
need to provide the connection when revoking such a message.
Note that the existing logic does not preclude the message supplied
being a null/bogus message pointer. The only user of this interface
is the OSD client, and the only value an osd client passes is a
request's r_reply field. That is always non-null (except briefly in
an error path in ceph_osdc_alloc_request(), and that drops the
only reference so the request won't ever have a reply to revoke).
So we can safely assume the passed-in message is non-null, but add a
BUG_ON() to make it very obvious we are imposing this restriction.
Rename the function to ceph_msg_revoke_incoming() to reflect that it
is really an operation on an incoming message.
Signed-off-by: Alex Elder <elder@inktank.com>
---
include/linux/ceph/messenger.h | 4 ++--
net/ceph/messenger.c | 19 +++++++++++++------
net/ceph/osd_client.c | 9 ++++-----
3 files changed, 19 insertions(+), 13 deletions(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 9008f81..a334dbd 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -241,8 +241,8 @@ extern void ceph_con_close(struct ceph_connection *con);
extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg
*msg);
extern void ceph_msg_revoke(struct ceph_msg *msg);
-extern void ceph_con_revoke_message(struct ceph_connection *con,
- struct ceph_msg *msg);
+extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
+
extern void ceph_con_keepalive(struct ceph_connection *con);
extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
extern void ceph_con_put(struct ceph_connection *con);
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 79f3032..a036141 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -2456,17 +2456,24 @@ void ceph_msg_revoke(struct ceph_msg *msg)
/*
* Revoke a message that we may be reading data into
*/
-void ceph_con_revoke_message(struct ceph_connection *con, struct
ceph_msg *msg)
+void ceph_msg_revoke_incoming(struct ceph_msg *msg)
{
+ struct ceph_connection *con;
+
+ BUG_ON(msg == NULL);
+ if (!msg->con)
+ return; /* Message not in our possession */
+
+ con = msg->con;
mutex_lock(&con->mutex);
- if (con->in_msg && con->in_msg == msg) {
+ if (con->in_msg == msg) {
unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
/* skip rest of message */
- dout("con_revoke_pages %p msg %p revoked\n", con, msg);
- con->in_base_pos = con->in_base_pos -
+ dout("%s %p msg %p revoked\n", __func__, con, msg);
+ con->in_base_pos = con->in_base_pos -
sizeof(struct ceph_msg_header) -
front_len -
middle_len -
@@ -2477,8 +2484,8 @@ void ceph_con_revoke_message(struct
ceph_connection *con, struct ceph_msg *msg)
con->in_tag = CEPH_MSGR_TAG_READY;
con->in_seq++;
} else {
- dout("con_revoke_pages %p msg %p pages %p no-op\n",
- con, con->in_msg, msg);
+ dout("%s %p in_msg %p msg %p no-op\n",
+ __func__, con, con->in_msg, msg);
}
mutex_unlock(&con->mutex);
}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 403fefb..9f87edd 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -140,10 +140,9 @@ void ceph_osdc_release_request(struct kref *kref)
if (req->r_request)
ceph_msg_put(req->r_request);
if (req->r_con_filling_msg) {
- dout("release_request revoking pages %p from con %p\n",
+ dout("%s revoking pages %p from con %p\n", __func__,
req->r_pages, req->r_con_filling_msg);
- ceph_con_revoke_message(req->r_con_filling_msg,
- req->r_reply);
+ ceph_msg_revoke_incoming(req->r_reply);
req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
}
if (req->r_reply)
@@ -2022,9 +2021,9 @@ static struct ceph_msg *get_reply(struct
ceph_connection *con,
}
if (req->r_con_filling_msg) {
- dout("get_reply revoking msg %p from old con %p\n",
+ dout("%s revoking msg %p from old con %p\n", __func__,
req->r_reply, req->r_con_filling_msg);
- ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
+ ceph_msg_revoke_incoming(req->r_reply);
req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
req->r_con_filling_msg = NULL;
}
--
1.7.5.4
* Re: [PATCH] libceph: tweak ceph_alloc_msg()
2012-06-06 3:31 ` [PATCH] libceph: tweak ceph_alloc_msg() Alex Elder
@ 2012-06-06 5:14 ` Sage Weil
0 siblings, 0 replies; 23+ messages in thread
From: Sage Weil @ 2012-06-06 5:14 UTC (permalink / raw)
To: Alex Elder; +Cc: ceph-devel
On Tue, 5 Jun 2012, Alex Elder wrote:
> The function ceph_alloc_msg() is only used to allocate a message
> that will be assigned to a connection's in_msg pointer. Rename the
> function so this implied usage is more clear.
>
> In addition, make that assignment inside the function (again, since
> that's precisely what it's intended to be used for). This allows us
> to return what is now provided via the passed-in address of a "skip"
> variable. The return type is now Boolean to be explicit that there
> are only two possible outcomes.
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> net/ceph/messenger.c | 58
> ++++++++++++++++++++++++++-----------------------
> 1 files changed, 31 insertions(+), 27 deletions(-)
>
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index 3b65f6e..f7c9061 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -1655,9 +1655,8 @@ static int read_partial_message_section(struct
> ceph_connection *con,
> return 1;
> }
>
> -static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
> - struct ceph_msg_header *hdr,
> - int *skip);
> +static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
> + struct ceph_msg_header *hdr);
>
>
> static int read_partial_message_pages(struct ceph_connection *con,
> @@ -1740,7 +1739,6 @@ static int read_partial_message(struct
> ceph_connection *con)
> int ret;
> unsigned front_len, middle_len, data_len;
> bool do_datacrc = !con->msgr->nocrc;
> - int skip;
> u64 seq;
> u32 crc;
>
> @@ -1793,9 +1791,7 @@ static int read_partial_message(struct
> ceph_connection *con)
> if (!con->in_msg) {
> dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
> con->in_hdr.front_len, con->in_hdr.data_len);
> - skip = 0;
> - con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
> - if (skip) {
> + if (ceph_con_in_msg_alloc(con, &con->in_hdr)) {
> /* skip this message */
> dout("alloc_msg said skip message\n");
> BUG_ON(con->in_msg);
> @@ -2577,46 +2573,54 @@ static int ceph_alloc_middle(struct
> ceph_connection *con, struct ceph_msg *msg)
> }
>
> /*
> - * Generic message allocator, for incoming messages.
> + * Allocate a message for receiving an incoming message on a
> + * connection, and save the result in con->in_msg. Uses the
> + * connection's private alloc_msg op if available.
> + *
> + * Returns true if the message should be skipped, false otherwise.
I would document that there are three possible outcomes:
true and in_msg == NULL .. skip next msg
false and in_msg != NULL .. msg is ready to go
false and in_msg == NULL .. ENOMEM :(
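One way to spell out those three outcomes is a small classifier over
the return value and con->in_msg. The enum and function below are
hypothetical names for illustration only and are not part of the patch;
assert() stands in for BUG_ON().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* The three possible results of the allocation step (hypothetical names). */
enum toy_alloc_outcome {
	TOY_ALLOC_SKIP,		/* returned true,  in_msg == NULL */
	TOY_ALLOC_READY,	/* returned false, in_msg != NULL */
	TOY_ALLOC_ENOMEM,	/* returned false, in_msg == NULL */
};

/* Classify the (return value, in_msg) pair left behind by the allocator. */
static enum toy_alloc_outcome toy_classify(bool skip, const void *in_msg)
{
	if (skip) {
		/* skip together with a message would be a leak, so rule it out */
		assert(in_msg == NULL);
		return TOY_ALLOC_SKIP;
	}
	return in_msg ? TOY_ALLOC_READY : TOY_ALLOC_ENOMEM;
}
```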
> */
> -static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
> - struct ceph_msg_header *hdr,
> - int *skip)
> +static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
> + struct ceph_msg_header *hdr)
> {
> int type = le16_to_cpu(hdr->type);
> int front_len = le32_to_cpu(hdr->front_len);
> int middle_len = le32_to_cpu(hdr->middle_len);
> - struct ceph_msg *msg = NULL;
> int ret;
>
> + BUG_ON(con->in_msg != NULL);
> +
> if (con->ops->alloc_msg) {
> + int skip = 0;
> +
> mutex_unlock(&con->mutex);
> - msg = con->ops->alloc_msg(con, hdr, skip);
> + con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
> mutex_lock(&con->mutex);
> - if (!msg || *skip)
> - return NULL;
> + if (skip)
> + con->in_msg = NULL;
I think this can be a BUG_ON(con->in_msg)... the alloc_msg() op shouldn't
set skip = 1 *and* return a msg pointer. That would have leaked with the
old code, which is a bug.
> +
> + if (!con->in_msg)
> + return skip != 0;
or maybe
if (skip || !con->in_msg)
return skip;
?
Reviewed-by: Sage Weil <sage@inktank.com>
> }
> - if (!msg) {
> - *skip = 0;
> - msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
> - if (!msg) {
> + if (!con->in_msg) {
> + con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
> + if (!con->in_msg) {
> pr_err("unable to allocate msg type %d len %d\n",
> type, front_len);
> - return NULL;
> + return false;
> }
> - msg->page_alignment = le16_to_cpu(hdr->data_off);
> + con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
> }
> - memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
> + memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
>
> - if (middle_len && !msg->middle) {
> - ret = ceph_alloc_middle(con, msg);
> + if (middle_len && !con->in_msg->middle) {
> + ret = ceph_alloc_middle(con, con->in_msg);
> if (ret < 0) {
> - ceph_msg_put(msg);
> - return NULL;
> + ceph_msg_put(con->in_msg);
> + con->in_msg = NULL;
> }
> }
>
> - return msg;
> + return false;
> }
>
>
> --
> 1.7.5.4
>
> --
> To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at http://vger.kernel.org/majordomo-info.html
>
>
* Re: [PATCH 1/4] libceph: have messages point to their connection
2012-06-06 3:31 ` [PATCH 1/4] libceph: have messages point to their connection Alex Elder
@ 2012-06-06 5:16 ` Sage Weil
0 siblings, 0 replies; 23+ messages in thread
From: Sage Weil @ 2012-06-06 5:16 UTC (permalink / raw)
To: Alex Elder; +Cc: ceph-devel
Reviewed-by: Sage Weil <sage@inktank.com>
On Tue, 5 Jun 2012, Alex Elder wrote:
> When a ceph message is queued for sending it is placed on a list of
> pending messages (ceph_connection->out_queue). When they are
> actually sent over the wire, they are moved from that list to
> another (ceph_connection->out_sent). When acknowledgement for the
> message is received, it is removed from the sent messages list.
>
> During that entire time the message is "in the possession" of a
> single ceph connection. Keep track of that connection in the
> message. This will be used in the next patch (and is a helpful
> bit of information for debugging anyway).
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> include/linux/ceph/messenger.h | 3 +++
> net/ceph/messenger.c | 27 +++++++++++++++++++++++++--
> 2 files changed, 28 insertions(+), 2 deletions(-)
>
> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
> index dd27837..6df837f 100644
> --- a/include/linux/ceph/messenger.h
> +++ b/include/linux/ceph/messenger.h
> @@ -77,7 +77,10 @@ struct ceph_msg {
> unsigned nr_pages; /* size of page array */
> unsigned page_alignment; /* io offset in first page */
> struct ceph_pagelist *pagelist; /* instead of pages */
> +
> + struct ceph_connection *con;
> struct list_head list_head;
> +
> struct kref kref;
> struct bio *bio; /* instead of pages/pagelist */
> struct bio *bio_iter; /* bio iterator */
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index f7c9061..59fa5fb 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -414,6 +414,9 @@ static int con_close_socket(struct ceph_connection *con)
> static void ceph_msg_remove(struct ceph_msg *msg)
> {
> list_del_init(&msg->list_head);
> + BUG_ON(msg->con == NULL);
> + msg->con = NULL;
> +
> ceph_msg_put(msg);
> }
> static void ceph_msg_remove_list(struct list_head *head)
> @@ -433,6 +436,8 @@ static void reset_connection(struct ceph_connection
> *con)
> ceph_msg_remove_list(&con->out_sent);
>
> if (con->in_msg) {
> + BUG_ON(con->in_msg->con != con);
> + con->in_msg->con = NULL;
> ceph_msg_put(con->in_msg);
> con->in_msg = NULL;
> }
> @@ -625,8 +630,10 @@ static void prepare_write_message(struct
> ceph_connection *con)
> &con->out_temp_ack);
> }
>
> + BUG_ON(list_empty(&con->out_queue));
> m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
> con->out_msg = m;
> + BUG_ON(m->con != con);
>
> /* put message on sent list */
> ceph_msg_get(m);
> @@ -1806,6 +1813,8 @@ static int read_partial_message(struct
> ceph_connection *con)
> "error allocating memory for incoming message";
> return -ENOMEM;
> }
> +
> + BUG_ON(con->in_msg->con != con);
> m = con->in_msg;
> m->front.iov_len = 0; /* haven't read it yet */
> if (m->middle)
> @@ -1901,6 +1910,8 @@ static void process_message(struct ceph_connection
> *con)
> {
> struct ceph_msg *msg;
>
> + BUG_ON(con->in_msg->con != con);
> + con->in_msg->con = NULL;
> msg = con->in_msg;
> con->in_msg = NULL;
>
> @@ -2260,6 +2271,8 @@ static void ceph_fault(struct ceph_connection *con)
> con_close_socket(con);
>
> if (con->in_msg) {
> + BUG_ON(con->in_msg->con != con);
> + con->in_msg->con = NULL;
> ceph_msg_put(con->in_msg);
> con->in_msg = NULL;
> }
> @@ -2378,6 +2391,8 @@ void ceph_con_send(struct ceph_connection *con,
> struct ceph_msg *msg)
>
> /* queue */
> mutex_lock(&con->mutex);
> + BUG_ON(msg->con != NULL);
> + msg->con = con;
> BUG_ON(!list_empty(&msg->list_head));
> list_add_tail(&msg->list_head, &con->out_queue);
> dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
> @@ -2403,13 +2418,16 @@ void ceph_con_revoke(struct ceph_connection
> *con, struct ceph_msg *msg)
> {
> mutex_lock(&con->mutex);
> if (!list_empty(&msg->list_head)) {
> - dout("con_revoke %p msg %p - was on queue\n", con, msg);
> + dout("%s %p msg %p - was on queue\n", __func__, con, msg);
> list_del_init(&msg->list_head);
> + BUG_ON(msg->con == NULL);
> + msg->con = NULL;
> +
> ceph_msg_put(msg);
> msg->hdr.seq = 0;
> }
> if (con->out_msg == msg) {
> - dout("con_revoke %p msg %p - was sending\n", con, msg);
> + dout("%s %p msg %p - was sending\n", __func__, con, msg);
> con->out_msg = NULL;
> if (con->out_kvec_is_msg) {
> con->out_skip = con->out_kvec_bytes;
> @@ -2478,6 +2496,8 @@ struct ceph_msg *ceph_msg_new(int type, int
> front_len, gfp_t flags,
> if (m == NULL)
> goto out;
> kref_init(&m->kref);
> +
> + m->con = NULL;
> INIT_LIST_HEAD(&m->list_head);
>
> m->hdr.tid = 0;
> @@ -2595,6 +2615,8 @@ static bool ceph_con_in_msg_alloc(struct
> ceph_connection *con,
> mutex_unlock(&con->mutex);
> con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
> mutex_lock(&con->mutex);
> + if (con->in_msg)
> + con->in_msg->con = con;
> if (skip)
> con->in_msg = NULL;
>
> @@ -2608,6 +2630,7 @@ static bool ceph_con_in_msg_alloc(struct
> ceph_connection *con,
> type, front_len);
> return false;
> }
> + con->in_msg->con = con;
> con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
> }
> memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
> --
> 1.7.5.4
>
* Re: [PATCH 3/4] libceph: make ceph_con_revoke() a msg operation
2012-06-06 3:31 ` [PATCH 3/4] libceph: make ceph_con_revoke() a msg operation Alex Elder
@ 2012-06-06 5:18 ` Sage Weil
2012-06-06 11:51 ` Alex Elder
0 siblings, 1 reply; 23+ messages in thread
From: Sage Weil @ 2012-06-06 5:18 UTC (permalink / raw)
To: Alex Elder; +Cc: ceph-devel
Reviewed-by: Sage Weil <sage@inktank.com>
I'm mildly concerned that if the msg isn't on the con the upper layer
thinks it is, that is a bug, and we might be better off keeping this arg
and asserting that con == msg->con...
On Tue, 5 Jun 2012, Alex Elder wrote:
> ceph_con_revoke() is passed both a message and a ceph connection.
> Now that any message associated with a connection holds a pointer
> to that connection, there's no need to provide the connection when
> revoking a message.
>
> This has the added benefit of precluding the possibility of
> providing the wrong connection pointer. If the message's connection
> pointer is null, it is not being tracked by any connection, so
> revoking it is a no-op. This is supported as a convenience for
> upper layers, so they can revoke a message that is not actually
> "in flight."
>
> Rename the function to ceph_msg_revoke() to reflect that it is really
> an operation on a message, not a connection.
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> include/linux/ceph/messenger.h | 3 ++-
> net/ceph/messenger.c | 7 ++++++-
> net/ceph/mon_client.c | 8 ++++----
> net/ceph/osd_client.c | 4 ++--
> 4 files changed, 14 insertions(+), 8 deletions(-)
>
> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
> index 6df837f..9008f81 100644
> --- a/include/linux/ceph/messenger.h
> +++ b/include/linux/ceph/messenger.h
> @@ -239,7 +239,8 @@ extern void ceph_con_open(struct ceph_connection *con,
> extern bool ceph_con_opened(struct ceph_connection *con);
> extern void ceph_con_close(struct ceph_connection *con);
> extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg
> *msg);
> -extern void ceph_con_revoke(struct ceph_connection *con, struct
> ceph_msg *msg);
> +
> +extern void ceph_msg_revoke(struct ceph_msg *msg);
> extern void ceph_con_revoke_message(struct ceph_connection *con,
> struct ceph_msg *msg);
> extern void ceph_con_keepalive(struct ceph_connection *con);
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index 336707b..79f3032 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -2421,8 +2421,13 @@ EXPORT_SYMBOL(ceph_con_send);
> /*
> * Revoke a message that was previously queued for send
> */
> -void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
> +void ceph_msg_revoke(struct ceph_msg *msg)
> {
> + struct ceph_connection *con = msg->con;
> +
> + if (!con)
> + return; /* Message not in our possession */
> +
> mutex_lock(&con->mutex);
> if (!list_empty(&msg->list_head)) {
> dout("%s %p msg %p - was on queue\n", __func__, con, msg);
> diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
> index ab6b24a..cf82e04 100644
> --- a/net/ceph/mon_client.c
> +++ b/net/ceph/mon_client.c
> @@ -106,7 +106,7 @@ static void __send_prepared_auth_request(struct
> ceph_mon_client *monc, int len)
> monc->pending_auth = 1;
> monc->m_auth->front.iov_len = len;
> monc->m_auth->hdr.front_len = cpu_to_le32(len);
> - ceph_con_revoke(&monc->con, monc->m_auth);
> + ceph_msg_revoke(monc->m_auth);
> ceph_msg_get(monc->m_auth); /* keep our ref */
> ceph_con_send(&monc->con, monc->m_auth);
> }
> @@ -117,7 +117,7 @@ static void __send_prepared_auth_request(struct
> ceph_mon_client *monc, int len)
> static void __close_session(struct ceph_mon_client *monc)
> {
> dout("__close_session closing mon%d\n", monc->cur_mon);
> - ceph_con_revoke(&monc->con, monc->m_auth);
> + ceph_msg_revoke(monc->m_auth);
> ceph_con_close(&monc->con);
> monc->con.private = NULL;
> monc->cur_mon = -1;
> @@ -229,7 +229,7 @@ static void __send_subscribe(struct ceph_mon_client
> *monc)
>
> msg->front.iov_len = p - msg->front.iov_base;
> msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
> - ceph_con_revoke(&monc->con, msg);
> + ceph_msg_revoke(msg);
> ceph_con_send(&monc->con, ceph_msg_get(msg));
>
> monc->sub_sent = jiffies | 1; /* never 0 */
> @@ -687,7 +687,7 @@ static void __resend_generic_request(struct
> ceph_mon_client *monc)
>
> for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
> req = rb_entry(p, struct ceph_mon_generic_request, node);
> - ceph_con_revoke(&monc->con, req->request);
> + ceph_msg_revoke(req->request);
> ceph_con_send(&monc->con, ceph_msg_get(req->request));
> }
> }
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 448c9da..403fefb 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -852,7 +852,7 @@ static void __unregister_request(struct
> ceph_osd_client *osdc,
>
> if (req->r_osd) {
> /* make sure the original request isn't in flight. */
> - ceph_con_revoke(&req->r_osd->o_con, req->r_request);
> + ceph_msg_revoke(req->r_request);
>
> list_del_init(&req->r_osd_item);
> if (list_empty(&req->r_osd->o_requests) &&
> @@ -879,7 +879,7 @@ static void __unregister_request(struct
> ceph_osd_client *osdc,
> static void __cancel_request(struct ceph_osd_request *req)
> {
> if (req->r_sent && req->r_osd) {
> - ceph_con_revoke(&req->r_osd->o_con, req->r_request);
> + ceph_msg_revoke(req->r_request);
> req->r_sent = 0;
> }
> }
> --
> 1.7.5.4
>
> --
> To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at http://vger.kernel.org/majordomo-info.html
>
>
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [PATCH 4/4] libceph: make ceph_con_revoke_message() a msg op
2012-06-06 3:31 ` [PATCH 4/4] libceph: make ceph_con_revoke_message() a msg op Alex Elder
@ 2012-06-06 5:22 ` Sage Weil
2012-06-06 11:40 ` Alex Elder
0 siblings, 1 reply; 23+ messages in thread
From: Sage Weil @ 2012-06-06 5:22 UTC (permalink / raw)
To: Alex Elder; +Cc: ceph-devel
On Tue, 5 Jun 2012, Alex Elder wrote:
> ceph_con_revoke_message() is passed both a message and a ceph
> connection. A ceph_msg allocated for incoming messages on a
> connection always has a pointer to that connection, so there's no
> need to provide the connection when revoking such a message.
>
> Note that the existing logic does not preclude the message supplied
> being a null/bogus message pointer. The only user of this interface
> is the OSD client, and the only value an osd client passes is a
> request's r_reply field. That is always non-null (except briefly in
> an error path in ceph_osdc_alloc_request(), and that drops the
> only reference so the request won't ever have a reply to revoke).
> So we can safely assume the passed-in message is non-null, but add a
> BUG_ON() to make it very obvious we are imposing this restriction.
>
> Rename the function ceph_msg_revoke_incoming() to reflect that it is
> really an operation on an incoming message.
Yes
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> include/linux/ceph/messenger.h | 4 ++--
> net/ceph/messenger.c | 19 +++++++++++++------
> net/ceph/osd_client.c | 9 ++++-----
> 3 files changed, 19 insertions(+), 13 deletions(-)
>
> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
> index 9008f81..a334dbd 100644
> --- a/include/linux/ceph/messenger.h
> +++ b/include/linux/ceph/messenger.h
> @@ -241,8 +241,8 @@ extern void ceph_con_close(struct ceph_connection *con);
> extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg
> *msg);
>
> extern void ceph_msg_revoke(struct ceph_msg *msg);
> -extern void ceph_con_revoke_message(struct ceph_connection *con,
> - struct ceph_msg *msg);
> +extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
> +
> extern void ceph_con_keepalive(struct ceph_connection *con);
> extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
> extern void ceph_con_put(struct ceph_connection *con);
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index 79f3032..a036141 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -2456,17 +2456,24 @@ void ceph_msg_revoke(struct ceph_msg *msg)
> /*
> * Revoke a message that we may be reading data into
> */
> -void ceph_con_revoke_message(struct ceph_connection *con, struct
> ceph_msg *msg)
> +void ceph_msg_revoke_incoming(struct ceph_msg *msg)
> {
> + struct ceph_connection *con;
> +
> + BUG_ON(msg == NULL);
> + if (!msg->con)
> + return; /* Message not in our possession */
This case weirds me out. I think it can happen, but maybe we should at
least get a debug msg, like the no-op one below.
Reviewed-by: Sage Weil <sage@inktank.com>
> +
> + con = msg->con;
> mutex_lock(&con->mutex);
> - if (con->in_msg && con->in_msg == msg) {
> + if (con->in_msg == msg) {
> unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
> unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
> unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
>
> /* skip rest of message */
> - dout("con_revoke_pages %p msg %p revoked\n", con, msg);
> - con->in_base_pos = con->in_base_pos -
> + dout("%s %p msg %p revoked\n", __func__, con, msg);
> + con->in_base_pos = con->in_base_pos -
> sizeof(struct ceph_msg_header) -
> front_len -
> middle_len -
> @@ -2477,8 +2484,8 @@ void ceph_con_revoke_message(struct
> ceph_connection *con, struct ceph_msg *msg)
> con->in_tag = CEPH_MSGR_TAG_READY;
> con->in_seq++;
> } else {
> - dout("con_revoke_pages %p msg %p pages %p no-op\n",
> - con, con->in_msg, msg);
> + dout("%s %p in_msg %p msg %p no-op\n",
> + __func__, con, con->in_msg, msg);
> }
> mutex_unlock(&con->mutex);
> }
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 403fefb..9f87edd 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -140,10 +140,9 @@ void ceph_osdc_release_request(struct kref *kref)
> if (req->r_request)
> ceph_msg_put(req->r_request);
> if (req->r_con_filling_msg) {
> - dout("release_request revoking pages %p from con %p\n",
> + dout("%s revoking pages %p from con %p\n", __func__,
> req->r_pages, req->r_con_filling_msg);
> - ceph_con_revoke_message(req->r_con_filling_msg,
> - req->r_reply);
> + ceph_msg_revoke_incoming(req->r_reply);
> req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
> }
> if (req->r_reply)
> @@ -2022,9 +2021,9 @@ static struct ceph_msg *get_reply(struct
> ceph_connection *con,
> }
>
> if (req->r_con_filling_msg) {
> - dout("get_reply revoking msg %p from old con %p\n",
> + dout("%s revoking msg %p from old con %p\n", __func__,
> req->r_reply, req->r_con_filling_msg);
> - ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
> + ceph_msg_revoke_incoming(req->r_reply);
> req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
> req->r_con_filling_msg = NULL;
> }
> --
> 1.7.5.4
>
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [PATCH 1/2] libceph: embed ceph connection structure in mon_client
2012-06-06 3:30 ` [PATCH 1/2] libceph: embed ceph connection structure in mon_client Alex Elder
@ 2012-06-06 5:22 ` Sage Weil
2012-06-06 16:19 ` Sage Weil
1 sibling, 0 replies; 23+ messages in thread
From: Sage Weil @ 2012-06-06 5:22 UTC (permalink / raw)
To: Alex Elder; +Cc: ceph-devel
Reviewed-by: Sage Weil <sage@inktank.com>
On Tue, 5 Jun 2012, Alex Elder wrote:
> A monitor client has a pointer to a ceph connection structure in it.
> This is the only one of the three ceph client types that does it this
> way; the OSD and MDS clients embed the connection into their main
> structures. There is always exactly one ceph connection for a
> monitor client, so there is no need to allocate it separately from the
> monitor client structure.
>
> So switch the ceph_mon_client structure to embed its
> ceph_connection structure.
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> include/linux/ceph/mon_client.h | 2 +-
> net/ceph/mon_client.c | 47
> ++++++++++++++++----------------------
> 2 files changed, 21 insertions(+), 28 deletions(-)
>
> diff --git a/include/linux/ceph/mon_client.h
> b/include/linux/ceph/mon_client.h
> index 545f859..2113e38 100644
> --- a/include/linux/ceph/mon_client.h
> +++ b/include/linux/ceph/mon_client.h
> @@ -70,7 +70,7 @@ struct ceph_mon_client {
> bool hunting;
> int cur_mon; /* last monitor i contacted */
> unsigned long sub_sent, sub_renew_after;
> - struct ceph_connection *con;
> + struct ceph_connection con;
> bool have_fsid;
>
> /* pending generic requests */
> diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
> index 704dc95..ac4d6b1 100644
> --- a/net/ceph/mon_client.c
> +++ b/net/ceph/mon_client.c
> @@ -106,9 +106,9 @@ static void __send_prepared_auth_request(struct
> ceph_mon_client *monc, int len)
> monc->pending_auth = 1;
> monc->m_auth->front.iov_len = len;
> monc->m_auth->hdr.front_len = cpu_to_le32(len);
> - ceph_con_revoke(monc->con, monc->m_auth);
> + ceph_con_revoke(&monc->con, monc->m_auth);
> ceph_msg_get(monc->m_auth); /* keep our ref */
> - ceph_con_send(monc->con, monc->m_auth);
> + ceph_con_send(&monc->con, monc->m_auth);
> }
>
> /*
> @@ -117,8 +117,8 @@ static void __send_prepared_auth_request(struct
> ceph_mon_client *monc, int len)
> static void __close_session(struct ceph_mon_client *monc)
> {
> dout("__close_session closing mon%d\n", monc->cur_mon);
> - ceph_con_revoke(monc->con, monc->m_auth);
> - ceph_con_close(monc->con);
> + ceph_con_revoke(&monc->con, monc->m_auth);
> + ceph_con_close(&monc->con);
> monc->cur_mon = -1;
> monc->pending_auth = 0;
> ceph_auth_reset(monc->auth);
> @@ -142,9 +142,9 @@ static int __open_session(struct ceph_mon_client *monc)
> monc->want_next_osdmap = !!monc->want_next_osdmap;
>
> dout("open_session mon%d opening\n", monc->cur_mon);
> - monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON;
> - monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
> - ceph_con_open(monc->con,
> + monc->con.peer_name.type = CEPH_ENTITY_TYPE_MON;
> + monc->con.peer_name.num = cpu_to_le64(monc->cur_mon);
> + ceph_con_open(&monc->con,
> &monc->monmap->mon_inst[monc->cur_mon].addr);
>
> /* initiatiate authentication handshake */
> @@ -226,8 +226,8 @@ static void __send_subscribe(struct ceph_mon_client
> *monc)
>
> msg->front.iov_len = p - msg->front.iov_base;
> msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
> - ceph_con_revoke(monc->con, msg);
> - ceph_con_send(monc->con, ceph_msg_get(msg));
> + ceph_con_revoke(&monc->con, msg);
> + ceph_con_send(&monc->con, ceph_msg_get(msg));
>
> monc->sub_sent = jiffies | 1; /* never 0 */
> }
> @@ -247,7 +247,7 @@ static void handle_subscribe_ack(struct
> ceph_mon_client *monc,
> if (monc->hunting) {
> pr_info("mon%d %s session established\n",
> monc->cur_mon,
> - ceph_pr_addr(&monc->con->peer_addr.in_addr));
> + ceph_pr_addr(&monc->con.peer_addr.in_addr));
> monc->hunting = false;
> }
> dout("handle_subscribe_ack after %d seconds\n", seconds);
> @@ -461,7 +461,7 @@ static int do_generic_request(struct ceph_mon_client
> *monc,
> req->request->hdr.tid = cpu_to_le64(req->tid);
> __insert_generic_request(monc, req);
> monc->num_generic_requests++;
> - ceph_con_send(monc->con, ceph_msg_get(req->request));
> + ceph_con_send(&monc->con, ceph_msg_get(req->request));
> mutex_unlock(&monc->mutex);
>
> err = wait_for_completion_interruptible(&req->completion);
> @@ -684,8 +684,8 @@ static void __resend_generic_request(struct
> ceph_mon_client *monc)
>
> for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
> req = rb_entry(p, struct ceph_mon_generic_request, node);
> - ceph_con_revoke(monc->con, req->request);
> - ceph_con_send(monc->con, ceph_msg_get(req->request));
> + ceph_con_revoke(&monc->con, req->request);
> + ceph_con_send(&monc->con, ceph_msg_get(req->request));
> }
> }
>
> @@ -705,7 +705,7 @@ static void delayed_work(struct work_struct *work)
> __close_session(monc);
> __open_session(monc); /* continue hunting */
> } else {
> - ceph_con_keepalive(monc->con);
> + ceph_con_keepalive(&monc->con);
>
> __validate_auth(monc);
>
> @@ -760,19 +760,16 @@ int ceph_monc_init(struct ceph_mon_client *monc,
> struct ceph_client *cl)
> goto out;
>
> /* connection */
> - monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
> - if (!monc->con)
> - goto out_monmap;
> - ceph_con_init(&monc->client->msgr, monc->con);
> - monc->con->private = monc;
> - monc->con->ops = &mon_con_ops;
> + ceph_con_init(&monc->client->msgr, &monc->con);
> + monc->con.private = monc;
> + monc->con.ops = &mon_con_ops;
>
> /* authentication */
> monc->auth = ceph_auth_init(cl->options->name,
> cl->options->key);
> if (IS_ERR(monc->auth)) {
> err = PTR_ERR(monc->auth);
> - goto out_con;
> + goto out_monmap;
> }
> monc->auth->want_keys =
> CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
> @@ -824,8 +821,6 @@ out_subscribe_ack:
> ceph_msg_put(monc->m_subscribe_ack);
> out_auth:
> ceph_auth_destroy(monc->auth);
> -out_con:
> - monc->con->ops->put(monc->con);
> out_monmap:
> kfree(monc->monmap);
> out:
> @@ -841,9 +836,7 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
> mutex_lock(&monc->mutex);
> __close_session(monc);
>
> - monc->con->private = NULL;
> - monc->con->ops->put(monc->con);
> - monc->con = NULL;
> + monc->con.private = NULL;
>
> mutex_unlock(&monc->mutex);
>
> @@ -1021,7 +1014,7 @@ static void mon_fault(struct ceph_connection *con)
> if (!monc->hunting)
> pr_info("mon%d %s session lost, "
> "hunting for new mon\n", monc->cur_mon,
> - ceph_pr_addr(&monc->con->peer_addr.in_addr));
> + ceph_pr_addr(&monc->con.peer_addr.in_addr));
>
> __close_session(monc);
> if (!monc->hunting) {
> --
> 1.7.5.4
>
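The effect of embedding the connection, rather than pointing at a
separately allocated one, can be sketched in userspace. This is only a
model under stated assumptions: struct connection, struct mon_client,
con_init(), mon_client_init() and con_to_monc() are hypothetical
stand-ins, not the libceph definitions, and container_of() is written
out by hand here instead of coming from the kernel headers.

```c
#include <assert.h>
#include <stddef.h>

/* Userspace stand-ins; these are NOT the kernel structures. */
struct connection {
	void *private;	/* back-pointer to the owning client */
	int opened;
};

struct mon_client {
	int cur_mon;
	struct connection con;	/* embedded, not separately allocated */
};

/* Recover the enclosing structure from a pointer to one of its members. */
#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

static void con_init(struct connection *con, void *private)
{
	con->private = private;
	con->opened = 0;
}

static void mon_client_init(struct mon_client *monc)
{
	monc->cur_mon = -1;
	/* Nothing is allocated here, so there is no failure path to unwind. */
	con_init(&monc->con, monc);
}

/* With an embedded member the owner can be found without con->private. */
static struct mon_client *con_to_monc(struct connection *con)
{
	return container_of(con, struct mon_client, con);
}
```

In this model the connection lives exactly as long as the mon_client
that contains it, which is why the out_con error label and the final
put of the connection have nothing left to do once it is embedded.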
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [PATCH 4/4] libceph: make ceph_con_revoke_message() a msg op
2012-06-06 5:22 ` Sage Weil
@ 2012-06-06 11:40 ` Alex Elder
0 siblings, 0 replies; 23+ messages in thread
From: Alex Elder @ 2012-06-06 11:40 UTC (permalink / raw)
To: Sage Weil; +Cc: ceph-devel
On 06/06/2012 12:22 AM, Sage Weil wrote:
>> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
>> > index 79f3032..a036141 100644
>> > --- a/net/ceph/messenger.c
>> > +++ b/net/ceph/messenger.c
>> > @@ -2456,17 +2456,24 @@ void ceph_msg_revoke(struct ceph_msg *msg)
>> > /*
>> > * Revoke a message that we may be reading data into
>> > */
>> > -void ceph_con_revoke_message(struct ceph_connection *con, struct
>> > ceph_msg *msg)
>> > +void ceph_msg_revoke_incoming(struct ceph_msg *msg)
>> > {
>> > + struct ceph_connection *con;
>> > +
>> > + BUG_ON(msg == NULL);
>> > + if (!msg->con)
>> > + return; /* Message not in our possession */
> This case weirds me out. I think it can happen, but maybe we should at
> least get a debug msg, like the no-op one below.
>
> Reviewed-by: Sage Weil <sage@inktank.com>
>
I have not been inserting debug messages, but probably should be.
I'm just not accustomed to doing that.
I am not actually convinced this can happen, but did it this way to
match what was happening in the ceph_msg_revoke() case, where
the caller would blindly revoke a message when closing, in case
it was currently queued.
However given how this is used now (only called if an osd_client request
thinks it is waiting for the connection to fill the reply) I see
why it weirds you out--it's conceivable the osd client's notion of
which connection holds the message is out of synch with reality.
I truly want to do away with that osd client's req->r_con_filling_msg
field, which now is kept only for the benefit of reference counting the
ceph_osd...
I will put in a dout() call in this case.
-Alex
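A rough userspace model of what the function might look like with a
debug message in the null-connection case. Everything here is an
assumption made for illustration: the structures are cut down to the
fields involved, dout() is mapped onto fprintf(), locking and the
"skip rest of message" bookkeeping are left out, and the return code
exists only so the three paths can be told apart (the real function
returns void).

```c
#include <assert.h>
#include <stdio.h>
#include <stddef.h>

struct msg;
struct con {
	struct msg *in_msg;	/* message currently being read into */
	int in_seq;
};
struct msg {
	struct con *con;	/* NULL when the messenger does not hold it */
};

/* Stand-in for the kernel's dout(); here it just writes to stderr. */
#define dout(...) fprintf(stderr, __VA_ARGS__)

/* Hypothetical result codes, present only to make the paths testable. */
enum revoke_result { REVOKE_NO_CON, REVOKE_NOOP, REVOKE_DONE };

static enum revoke_result msg_revoke_incoming(struct msg *msg)
{
	struct con *con;

	assert(msg != NULL);	/* models BUG_ON(msg == NULL) */
	con = msg->con;
	if (!con) {
		/* The case discussed above: report it rather than stay silent. */
		dout("%s msg %p null con\n", __func__, (void *)msg);
		return REVOKE_NO_CON;
	}
	if (con->in_msg == msg) {
		dout("%s %p msg %p revoked\n", __func__,
		     (void *)con, (void *)msg);
		con->in_msg = NULL;	/* simplified: stop reading into msg */
		con->in_seq++;
		return REVOKE_DONE;
	}
	dout("%s %p in_msg %p msg %p no-op\n", __func__,
	     (void *)con, (void *)con->in_msg, (void *)msg);
	return REVOKE_NOOP;
}
```

With that in place, a caller whose idea of which connection holds the
reply has drifted from reality would at least leave a trace in the
debug log.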
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [PATCH 3/4] libceph: make ceph_con_revoke() a msg operation
2012-06-06 5:18 ` Sage Weil
@ 2012-06-06 11:51 ` Alex Elder
0 siblings, 0 replies; 23+ messages in thread
From: Alex Elder @ 2012-06-06 11:51 UTC (permalink / raw)
To: Sage Weil; +Cc: ceph-devel
On 06/06/2012 12:18 AM, Sage Weil wrote:
> I'm mildly concerned that if the msg isn't on the con the upper layer
> thinks it is on, that is a bug, and we might be better off keeping this
> arg and asserting that con == msg->con...
The patch I burned myself on over the weekend actually assumed that
the msg->con was available to use by the client code. I.e., it
assumed that it was not reset to NULL whenever the message left
the connection layer.
Making it null like that allows for some run-time verification of
things (e.g., BUG_ON(msg->con == NULL)) but those aren't strictly
necessary, so we could have msg->con hold whatever non-null value
it was last assigned. We could even add a flag to indicate the
notion of who "owns" it if we still wanted the verification.
I'll just keep it as-is unless you have a strong preference for
another option; I'm open.
-Alex
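For comparison, the two alternatives mentioned above can be sketched
side by side in userspace. Both functions, the con_owns flag and the
structures are hypothetical illustrations, not code from the series;
assert() stands in for BUG_ON(), and locking and list handling are
omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct con {
	int id;
};

struct msg {
	struct con *con;	/* connection the message was handed to */
	bool con_owns;		/* hypothetical flag: connection holds it */
};

/*
 * Option A: the caller still names the connection, and that is
 * cross-checked against what the message itself records.
 */
static bool msg_revoke_checked(struct con *con, struct msg *msg)
{
	if (!msg->con)
		return false;	/* not in the messenger's possession */
	assert(msg->con == con);	/* models BUG_ON(con != msg->con) */
	msg->con = NULL;
	return true;
}

/*
 * Option B: msg->con keeps whatever value it was last assigned, and a
 * separate flag carries the notion of ownership.
 */
static bool msg_revoke_flagged(struct msg *msg)
{
	if (!msg->con_owns)
		return false;
	msg->con_owns = false;	/* msg->con deliberately left alone */
	return true;
}
```

Option A catches a disagreement between the layers at the cost of
keeping the extra argument; option B lets client code keep reading
msg->con after the message has left the connection layer.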
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [PATCH] libceph: osd_client: don't drop reply reference too early
2012-06-06 3:30 ` [PATCH] libceph: osd_client: don't drop reply reference too early Alex Elder
@ 2012-06-06 16:18 ` Sage Weil
0 siblings, 0 replies; 23+ messages in thread
From: Sage Weil @ 2012-06-06 16:18 UTC (permalink / raw)
To: Alex Elder; +Cc: ceph-devel
On Tue, 5 Jun 2012, Alex Elder wrote:
> In ceph_osdc_release_request(), a reference to the r_reply message
> is dropped. But just after that, that same message is revoked if it
> was in use to receive an incoming reply. Reorder these so we are
> sure we hold a reference until we're actually done with the message.
>
> Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Sage Weil <sage@inktank.com>
Good catch.
> ---
> net/ceph/osd_client.c | 4 ++--
> 1 files changed, 2 insertions(+), 2 deletions(-)
>
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index e30efbc..d8b6d31 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -139,8 +139,6 @@ void ceph_osdc_release_request(struct kref *kref)
>
> if (req->r_request)
> ceph_msg_put(req->r_request);
> - if (req->r_reply)
> - ceph_msg_put(req->r_reply);
> if (req->r_con_filling_msg) {
> dout("release_request revoking pages %p from con %p\n",
> req->r_pages, req->r_con_filling_msg);
> @@ -148,6 +146,8 @@ void ceph_osdc_release_request(struct kref *kref)
> req->r_reply);
> ceph_con_put(req->r_con_filling_msg);
> }
> + if (req->r_reply)
> + ceph_msg_put(req->r_reply);
> if (req->r_own_pages)
> ceph_release_page_vector(req->r_pages,
> req->r_num_pages);
> --
> 1.7.5.4
>
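The ordering problem described in the commit message can be modelled
in a few lines of userspace C. The names below are hypothetical
stand-ins (they are not the osd_client functions), and "freeing" is
recorded in a flag rather than done for real so the result can be
inspected afterwards.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct msg {
	int refs;
	bool freed;	/* set instead of calling free() */
	bool revoked;
};

/* Drop one reference; the last put "frees" the message. */
static void msg_put(struct msg *m)
{
	assert(m->refs > 0);
	if (--m->refs == 0)
		m->freed = true;
}

/* Touches the message, so the caller must still hold a reference. */
static void msg_revoke_incoming(struct msg *m)
{
	assert(!m->freed);	/* would be a use-after-free otherwise */
	m->revoked = true;
}

struct request {
	struct msg *reply;
	bool con_filling_msg;	/* a connection may be reading into reply */
};

/* Corrected order: revoke first, drop our reference last. */
static void release_request(struct request *req)
{
	if (req->con_filling_msg)
		msg_revoke_incoming(req->reply);
	if (req->reply)
		msg_put(req->reply);
}
```

Swapping the two statements in release_request() makes the assert in
msg_revoke_incoming() fire whenever the request held the last
reference, which is the situation the patch guards against.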
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [PATCH 1/2] libceph: embed ceph connection structure in mon_client
2012-06-06 3:30 ` [PATCH 1/2] libceph: embed ceph connection structure in mon_client Alex Elder
2012-06-06 5:22 ` Sage Weil
@ 2012-06-06 16:19 ` Sage Weil
1 sibling, 0 replies; 23+ messages in thread
From: Sage Weil @ 2012-06-06 16:19 UTC (permalink / raw)
To: Alex Elder; +Cc: ceph-devel
Reviewed-by: Sage Weil <sage@inktank.com>
On Tue, 5 Jun 2012, Alex Elder wrote:
> A monitor client has a pointer to a ceph connection structure in it.
> This is the only one of the three ceph client types that does it this
> way; the OSD and MDS clients embed the connection into their main
> structures. There is always exactly one ceph connection for a
> monitor client, so there is no need to allocate it separately from the
> monitor client structure.
>
> So switch the ceph_mon_client structure to embed its
> ceph_connection structure.
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> include/linux/ceph/mon_client.h | 2 +-
> net/ceph/mon_client.c | 47
> ++++++++++++++++----------------------
> 2 files changed, 21 insertions(+), 28 deletions(-)
>
> diff --git a/include/linux/ceph/mon_client.h
> b/include/linux/ceph/mon_client.h
> index 545f859..2113e38 100644
> --- a/include/linux/ceph/mon_client.h
> +++ b/include/linux/ceph/mon_client.h
> @@ -70,7 +70,7 @@ struct ceph_mon_client {
> bool hunting;
> int cur_mon; /* last monitor i contacted */
> unsigned long sub_sent, sub_renew_after;
> - struct ceph_connection *con;
> + struct ceph_connection con;
> bool have_fsid;
>
> /* pending generic requests */
> diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
> index 704dc95..ac4d6b1 100644
> --- a/net/ceph/mon_client.c
> +++ b/net/ceph/mon_client.c
> @@ -106,9 +106,9 @@ static void __send_prepared_auth_request(struct
> ceph_mon_client *monc, int len)
> monc->pending_auth = 1;
> monc->m_auth->front.iov_len = len;
> monc->m_auth->hdr.front_len = cpu_to_le32(len);
> - ceph_con_revoke(monc->con, monc->m_auth);
> + ceph_con_revoke(&monc->con, monc->m_auth);
> ceph_msg_get(monc->m_auth); /* keep our ref */
> - ceph_con_send(monc->con, monc->m_auth);
> + ceph_con_send(&monc->con, monc->m_auth);
> }
>
> /*
> @@ -117,8 +117,8 @@ static void __send_prepared_auth_request(struct
> ceph_mon_client *monc, int len)
> static void __close_session(struct ceph_mon_client *monc)
> {
> dout("__close_session closing mon%d\n", monc->cur_mon);
> - ceph_con_revoke(monc->con, monc->m_auth);
> - ceph_con_close(monc->con);
> + ceph_con_revoke(&monc->con, monc->m_auth);
> + ceph_con_close(&monc->con);
> monc->cur_mon = -1;
> monc->pending_auth = 0;
> ceph_auth_reset(monc->auth);
> @@ -142,9 +142,9 @@ static int __open_session(struct ceph_mon_client *monc)
> monc->want_next_osdmap = !!monc->want_next_osdmap;
>
> dout("open_session mon%d opening\n", monc->cur_mon);
> - monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON;
> - monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
> - ceph_con_open(monc->con,
> + monc->con.peer_name.type = CEPH_ENTITY_TYPE_MON;
> + monc->con.peer_name.num = cpu_to_le64(monc->cur_mon);
> + ceph_con_open(&monc->con,
> &monc->monmap->mon_inst[monc->cur_mon].addr);
>
> /* initiatiate authentication handshake */
> @@ -226,8 +226,8 @@ static void __send_subscribe(struct ceph_mon_client
> *monc)
>
> msg->front.iov_len = p - msg->front.iov_base;
> msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
> - ceph_con_revoke(monc->con, msg);
> - ceph_con_send(monc->con, ceph_msg_get(msg));
> + ceph_con_revoke(&monc->con, msg);
> + ceph_con_send(&monc->con, ceph_msg_get(msg));
>
> monc->sub_sent = jiffies | 1; /* never 0 */
> }
> @@ -247,7 +247,7 @@ static void handle_subscribe_ack(struct
> ceph_mon_client *monc,
> if (monc->hunting) {
> pr_info("mon%d %s session established\n",
> monc->cur_mon,
> - ceph_pr_addr(&monc->con->peer_addr.in_addr));
> + ceph_pr_addr(&monc->con.peer_addr.in_addr));
> monc->hunting = false;
> }
> dout("handle_subscribe_ack after %d seconds\n", seconds);
> @@ -461,7 +461,7 @@ static int do_generic_request(struct ceph_mon_client
> *monc,
> req->request->hdr.tid = cpu_to_le64(req->tid);
> __insert_generic_request(monc, req);
> monc->num_generic_requests++;
> - ceph_con_send(monc->con, ceph_msg_get(req->request));
> + ceph_con_send(&monc->con, ceph_msg_get(req->request));
> mutex_unlock(&monc->mutex);
>
> err = wait_for_completion_interruptible(&req->completion);
> @@ -684,8 +684,8 @@ static void __resend_generic_request(struct
> ceph_mon_client *monc)
>
> for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
> req = rb_entry(p, struct ceph_mon_generic_request, node);
> - ceph_con_revoke(monc->con, req->request);
> - ceph_con_send(monc->con, ceph_msg_get(req->request));
> + ceph_con_revoke(&monc->con, req->request);
> + ceph_con_send(&monc->con, ceph_msg_get(req->request));
> }
> }
>
> @@ -705,7 +705,7 @@ static void delayed_work(struct work_struct *work)
> __close_session(monc);
> __open_session(monc); /* continue hunting */
> } else {
> - ceph_con_keepalive(monc->con);
> + ceph_con_keepalive(&monc->con);
>
> __validate_auth(monc);
>
> @@ -760,19 +760,16 @@ int ceph_monc_init(struct ceph_mon_client *monc,
> struct ceph_client *cl)
> goto out;
>
> /* connection */
> - monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
> - if (!monc->con)
> - goto out_monmap;
> - ceph_con_init(&monc->client->msgr, monc->con);
> - monc->con->private = monc;
> - monc->con->ops = &mon_con_ops;
> + ceph_con_init(&monc->client->msgr, &monc->con);
> + monc->con.private = monc;
> + monc->con.ops = &mon_con_ops;
>
> /* authentication */
> monc->auth = ceph_auth_init(cl->options->name,
> cl->options->key);
> if (IS_ERR(monc->auth)) {
> err = PTR_ERR(monc->auth);
> - goto out_con;
> + goto out_monmap;
> }
> monc->auth->want_keys =
> CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
> @@ -824,8 +821,6 @@ out_subscribe_ack:
> ceph_msg_put(monc->m_subscribe_ack);
> out_auth:
> ceph_auth_destroy(monc->auth);
> -out_con:
> - monc->con->ops->put(monc->con);
> out_monmap:
> kfree(monc->monmap);
> out:
> @@ -841,9 +836,7 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
> mutex_lock(&monc->mutex);
> __close_session(monc);
>
> - monc->con->private = NULL;
> - monc->con->ops->put(monc->con);
> - monc->con = NULL;
> + monc->con.private = NULL;
>
> mutex_unlock(&monc->mutex);
>
> @@ -1021,7 +1014,7 @@ static void mon_fault(struct ceph_connection *con)
> if (!monc->hunting)
> pr_info("mon%d %s session lost, "
> "hunting for new mon\n", monc->cur_mon,
> - ceph_pr_addr(&monc->con->peer_addr.in_addr));
> + ceph_pr_addr(&monc->con.peer_addr.in_addr));
>
> __close_session(monc);
> if (!monc->hunting) {
> --
> 1.7.5.4
>
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [PATCH 2/4] libceph: have messages take a connection reference
2012-06-06 3:31 ` [PATCH 2/4] libceph: have messages take a connection reference Alex Elder
@ 2012-06-06 17:06 ` Sage Weil
2012-06-06 17:34 ` Alex Elder
0 siblings, 1 reply; 23+ messages in thread
From: Sage Weil @ 2012-06-06 17:06 UTC (permalink / raw)
To: Alex Elder; +Cc: ceph-devel
On Tue, 5 Jun 2012, Alex Elder wrote:
> There are essentially two types of ceph messages: incoming and
> outgoing. Outgoing messages are always allocated via ceph_msg_new(),
> and at the time of their allocation they are not associated with any
> particular connection. Incoming messages are always allocated via
> ceph_con_in_msg_alloc(), and they are initially associated with the
> connection from which incoming data will be placed into the message.
>
> When an outgoing message gets sent, it becomes associated with a
> connection and remains that way until the message is successfully
> sent. The association of an incoming message goes away at the point
> it is sent to an upper layer via a con->ops->dispatch method.
>
> This patch implements reference counting for all ceph messages, such
> that every message holds a reference (and a pointer) to a connection
> if and only if it is associated with that connection (as described
> above).
>
>
> For background, here is an explanation of the ceph message
> lifecycle, emphasizing when an association exists between a message
> and a connection.
>
> Outgoing Messages
> An outgoing message is "owned" by its allocator, from the time it is
> allocated in ceph_msg_new() up to the point it gets queued for
> sending in ceph_con_send(). Prior to that point the message's
> msg->con pointer is null; at the point it is queued for sending its
> msg->con pointer is assigned to refer to the connection. At that
> time the message is inserted into a connection's out_queue list.
>
> When a message on the out_queue list has been sent to the socket
> layer to be put on the wire, it is transferred out of that list and
> into the connection's out_sent list. At that point it is still owned
> by the connection, and will remain so until an acknowledgement is
> received from the recipient that indicates the message was
> successfully transferred. When such an acknowledgement is received
> (in process_ack()), the message is removed from its list (in
> ceph_msg_remove()), at which point it is no longer associated with
> the connection.
>
> So basically, any time a message is on one of a connection's lists,
> it is associated with that connection. Reference counting outgoing
> messages can thus be done at the points a message is added to the
> out_queue (in ceph_con_send()) and the point it is removed from
> either of its two lists (in ceph_msg_remove())--at which point its
> connection pointer becomes null.
>
> Incoming Messages
> When an incoming message on a connection is getting read (in
> read_partial_message()) and there is no message in con->in_msg,
> a new one is allocated using ceph_con_in_msg_alloc(). At that
> point the message is associated with the connection. Once that
> message has been completely and successfully read, it is passed to
> upper layer code using the connection's con->ops->dispatch method.
> At that point the association between the message and the connection
> no longer exists.
>
> Reference counting of connections for incoming messages can be done
> by taking a reference to the connection when the message gets
> allocated, and releasing that reference when it gets handed off
> using the dispatch method.
>
> We should never fail to get a connection reference for a
> message--since the caller should already hold one.
>
> Signed-off-by: Alex Elder <elder@inktank.com>
I have some reservations about this one. Not because it's wrong, but
because I'm not sure what it will fix (now or later). Right now the con
has references to the messages. If we add refs the other way around, we
need to make sure those refs go away or else the con will leak. And in
order to do that, the upper layer has to properly clean up. Currently, if
it doesn't clean up, the messages would point to an invalid/released con,
but nothing would reference them; they'd leak. With this change, we
would also leak the con and its containing structure. I'm not sure
that's any better or worse.
The net effect here is that we're doing a lot of additional atomic
operations on the containing object's ref count. Not sure if that cost is
important.
What do you think?
> ---
> net/ceph/messenger.c | 24 ++++++++++++++++++------
> 1 files changed, 18 insertions(+), 6 deletions(-)
>
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index 59fa5fb..336707b 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -415,6 +415,7 @@ static void ceph_msg_remove(struct ceph_msg *msg)
> {
> list_del_init(&msg->list_head);
> BUG_ON(msg->con == NULL);
> + ceph_con_put(msg->con);
> msg->con = NULL;
>
> ceph_msg_put(msg);
> @@ -440,6 +441,7 @@ static void reset_connection(struct ceph_connection
> *con)
> con->in_msg->con = NULL;
> ceph_msg_put(con->in_msg);
> con->in_msg = NULL;
> + ceph_con_put(con);
> }
>
> con->connect_seq = 0;
> @@ -1914,6 +1916,7 @@ static void process_message(struct ceph_connection
> *con)
> con->in_msg->con = NULL;
> msg = con->in_msg;
> con->in_msg = NULL;
> + ceph_con_put(con);
>
> /* if first message, set peer_name */
> if (con->peer_name.type == 0)
> @@ -2275,6 +2278,7 @@ static void ceph_fault(struct ceph_connection *con)
> con->in_msg->con = NULL;
> ceph_msg_put(con->in_msg);
> con->in_msg = NULL;
> + ceph_con_put(con);
> }
>
> /* Requeue anything that hasn't been acked */
> @@ -2391,8 +2395,11 @@ void ceph_con_send(struct ceph_connection *con,
> struct ceph_msg *msg)
>
> /* queue */
> mutex_lock(&con->mutex);
> +
> BUG_ON(msg->con != NULL);
> - msg->con = con;
> + msg->con = ceph_con_get(con);
> + BUG_ON(msg->con == NULL);
> +
> BUG_ON(!list_empty(&msg->list_head));
> list_add_tail(&msg->list_head, &con->out_queue);
> dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
> @@ -2421,10 +2428,11 @@ void ceph_con_revoke(struct ceph_connection
> *con, struct ceph_msg *msg)
> dout("%s %p msg %p - was on queue\n", __func__, con, msg);
> list_del_init(&msg->list_head);
> BUG_ON(msg->con == NULL);
> + ceph_con_put(msg->con);
> msg->con = NULL;
> + msg->hdr.seq = 0;
>
> ceph_msg_put(msg);
> - msg->hdr.seq = 0;
> }
> if (con->out_msg == msg) {
> dout("%s %p msg %p - was sending\n", __func__, con, msg);
> @@ -2433,8 +2441,9 @@ void ceph_con_revoke(struct ceph_connection *con,
> struct ceph_msg *msg)
> con->out_skip = con->out_kvec_bytes;
> con->out_kvec_is_msg = false;
> }
> - ceph_msg_put(msg);
> msg->hdr.seq = 0;
> +
> + ceph_msg_put(msg);
> }
> mutex_unlock(&con->mutex);
> }
> @@ -2615,8 +2624,10 @@ static bool ceph_con_in_msg_alloc(struct
> ceph_connection *con,
> mutex_unlock(&con->mutex);
> con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
> mutex_lock(&con->mutex);
> - if (con->in_msg)
> - con->in_msg->con = con;
> + if (con->in_msg) {
> + con->in_msg->con = ceph_con_get(con);
> + BUG_ON(con->in_msg->con == NULL);
> + }
> if (skip)
> con->in_msg = NULL;
>
> @@ -2630,7 +2641,8 @@ static bool ceph_con_in_msg_alloc(struct
> ceph_connection *con,
> type, front_len);
> return false;
> }
> - con->in_msg->con = con;
> + con->in_msg->con = ceph_con_get(con);
> + BUG_ON(con->in_msg->con == NULL);
> con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
> }
> memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
> --
> 1.7.5.4
>
* Re: [PATCH 2/4] libceph: have messages take a connection reference
2012-06-06 17:06 ` Sage Weil
@ 2012-06-06 17:34 ` Alex Elder
0 siblings, 0 replies; 23+ messages in thread
From: Alex Elder @ 2012-06-06 17:34 UTC (permalink / raw)
To: Sage Weil; +Cc: ceph-devel
On 06/06/2012 12:06 PM, Sage Weil wrote:
> I have some reservations about this one. Not because it's wrong, but
> because I'm not sure what it will fix (now or later). Right now the con
> has references to the messages. If we add refs the other way around, we
> need to make sure those refs go away or else the con will leak. And in
> order to do that, the upper layer has to properly clean up. Currently, if
> it doesn't clean up, the messages would point to an invalid/released con,
> but nothing would reference them.. they'd leak. With this change, we
> would also leak the con and its containing structure. I'm not sure
> that's any better or worse.
>
> The net effect here is that we're doing a lot of additional atomic
> operations on the containing object's ref count. Not sure if that cost is
> important.
>
> What do you think?
I think you make good points.
I don't know that this change directly fixes anything, but it might
help verify things as we do ongoing development.
I also think that an upper layer not properly cleaning up is a bug,
which we will fix (and which the reference counting *could* help
identify). Also, I am not concerned about the cost of the atomic
operations, at least not now. We have much more important performance
concerns to address before we start worrying about that kind of extra
cost. If we do find that the cost is excessive, we can remove them
(or maybe make them a debug-only feature).
The main thing I want is to have each message point to its
connection, because if we are ever again debugging memory from
a crash it will be very helpful to have. Beyond that, though, the
reference counting is a nice way to keep things validated.
So I guess--as long as it is working--I'd like to keep it in there.
-Alex
Thread overview: 23+ messages
2012-06-06 3:24 [PATCH 00/11] continued messenger-related changes Alex Elder
2012-06-06 3:30 ` [PATCH] libceph: osd_client: don't drop reply reference too early Alex Elder
2012-06-06 16:18 ` Sage Weil
2012-06-06 3:30 ` [PATCH] libceph: use con get/put ops from osd_client Alex Elder
2012-06-06 3:30 ` [PATCH 1/2] libceph: embed ceph connection structure in mon_client Alex Elder
2012-06-06 5:22 ` Sage Weil
2012-06-06 16:19 ` Sage Weil
2012-06-06 3:31 ` [PATCH 2/2] libceph: drop connection refcounting for mon_client Alex Elder
2012-06-06 3:31 ` [PATCH 1/2] libceph: init monitor connection when opening Alex Elder
2012-06-06 3:31 ` [PATCH 2/2] libceph: fully initialize connection in con_init() Alex Elder
2012-06-06 3:31 ` [PATCH] libceph: tweak ceph_alloc_msg() Alex Elder
2012-06-06 5:14 ` Sage Weil
2012-06-06 3:31 ` [PATCH 1/4] libceph: have messages point to their connection Alex Elder
2012-06-06 5:16 ` Sage Weil
2012-06-06 3:31 ` [PATCH 2/4] libceph: have messages take a connection reference Alex Elder
2012-06-06 17:06 ` Sage Weil
2012-06-06 17:34 ` Alex Elder
2012-06-06 3:31 ` [PATCH 3/4] libceph: make ceph_con_revoke() a msg operation Alex Elder
2012-06-06 5:18 ` Sage Weil
2012-06-06 11:51 ` Alex Elder
2012-06-06 3:31 ` [PATCH 4/4] libceph: make ceph_con_revoke_message() a msg op Alex Elder
2012-06-06 5:22 ` Sage Weil
2012-06-06 11:40 ` Alex Elder