* [PATCH] fs/ceph/messenger: add separate lock for `out_queue`
@ 2026-07-03 19:38 Max Kellermann
0 siblings, 0 replies; only message in thread
From: Max Kellermann @ 2026-07-03 19:38 UTC (permalink / raw)
To: idryomov, amarkuze, ceph-devel, linux-kernel; +Cc: Max Kellermann
There is heavy lock contention on `ceph_connection.mutex` because
both the connection's work item (ceph_con_workfn(), run from a
workqueue) and ceph_con_send() take this mutex.
ceph_con_workfn() does the actual socket I/O and processes incoming
messages; it can hold the mutex for long periods of time.
ceph_con_send(), on the other hand, may be called by arbitrary
userspace processes issuing system calls for I/O on Ceph files, so
heavy contention on the mutex can slow down thousands of userspace
processes.
However, ceph_con_send() touches little besides con->out_queue; the
real work is done later by ceph_con_workfn(). It is therefore
straightforward to introduce a lightweight spinlock that protects
`out_queue` and `out_sent` (and is also taken when writing `state`),
which eliminates almost all of the contention.
Here is the /proc/lock_stat output I captured on one of our web
servers over 5 minutes without this patch:
class name con-bounces contentions waittime-min waittime-max waittime-total waittime-avg acq-bounces acquisitions holdtime-min holdtime-max holdtime-total holdtime-avg
&con->mutex: 15918 15962 0.40 37877.04 7853358.52 492.00 13430455 33994666 0.08 93555.76 446452667.84 13.13
-----------
&con->mutex 15902 [<00000000aacd116f>] ceph_con_send+0x5c/0x170
&con->mutex 51 [<000000001b111558>] ceph_con_in_msg_alloc+0x74/0x1c0
&con->mutex 5 [<0000000068ae10b2>] ceph_con_keepalive+0x28/0x80
&con->mutex 3 [<000000009208fc49>] ceph_con_workfn+0x30/0x418
-----------
&con->mutex 13253 [<00000000c9c4e344>] ceph_con_process_message+0x90/0xd0
&con->mutex 1792 [<000000009208fc49>] ceph_con_workfn+0x30/0x418
&con->mutex 897 [<000000001b111558>] ceph_con_in_msg_alloc+0x74/0x1c0
&con->mutex 20 [<00000000aacd116f>] ceph_con_send+0x5c/0x170
ceph_con_process_message() holds the mutex most of the time, and many
ceph_con_send() calls end up waiting for it. Each of these waits
takes 0.5 ms on average.
The same measurement with this patch applied:
class name con-bounces contentions waittime-min waittime-max waittime-total waittime-avg acq-bounces acquisitions holdtime-min holdtime-max holdtime-total holdtime-avg
&con->out_lock: 63452 64484 0.04 835.92 210874.20 3.27 13212464 36953693 0.04 902.60 16032508.44 0.43
--------------
&con->out_lock 42003 [<00000000e6da95fb>] ceph_con_send+0x64/0x1a0
&con->out_lock 4451 [<00000000981ab2f3>] ceph_con_discard_sent+0x48/0x188
&con->out_lock 18025 [<000000009ca1ffc1>] ceph_con_v1_try_write+0x2d8/0x768
&con->out_lock 5 [<0000000030330169>] ceph_con_get_out_msg+0xc0/0x210
--------------
&con->out_lock 34132 [<000000009ca1ffc1>] ceph_con_v1_try_write+0x2d8/0x768
&con->out_lock 23239 [<00000000e6da95fb>] ceph_con_send+0x64/0x1a0
&con->out_lock 7112 [<00000000981ab2f3>] ceph_con_discard_sent+0x48/0x188
&con->out_lock 1 [<0000000030330169>] ceph_con_get_out_msg+0xc0/0x210
&con->mutex: 16 16 7.96 7130.96 13680.60 855.04 4962257 28359208 0.08 237939.28 369261170.32 13.02
-----------
&con->mutex 16 [<00000000f3e4b849>] ceph_con_keepalive+0x28/0x80
-----------
&con->mutex 15 [<000000005fc149e2>] ceph_con_process_message+0x90/0xd0
&con->mutex 1 [<000000006b2ce95a>] ceph_con_in_msg_alloc+0x74/0x1c0
Almost no contention remains on the old `mutex`. The new `out_lock`
sees more contentions than the mutex did before, but they are much
shorter: the combined total wait time of both locks is reduced by a
factor of 35, and the average wait time drops from 492 us to 3 us.
Signed-off-by: Max Kellermann <max.kellermann@ionos.com>
---
include/linux/ceph/messenger.h | 19 +++++++-
net/ceph/messenger.c | 79 +++++++++++++++++++++++++++++-----
net/ceph/messenger_v1.c | 17 ++++++++
net/ceph/messenger_v2.c | 31 +++++++++++++
4 files changed, 135 insertions(+), 11 deletions(-)
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 6aa4c6478c9f..adfa4df2856a 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -476,7 +476,13 @@ struct ceph_connection {
struct ceph_messenger *msgr;
- int state; /* CEPH_CON_S_* */
+ /**
+ * CEPH_CON_S_*
+ *
+ * Protected by both #mutex and #out_lock. Writing must hold
+ * both, reading either one.
+ */
+ int state;
atomic_t sock_state;
struct socket *sock;
@@ -490,6 +496,12 @@ struct ceph_connection {
struct mutex mutex;
/* out queue */
+
+ /**
+ * Protects #out_queue and #out_sent.
+ */
+ spinlock_t out_lock;
+
struct list_head out_queue;
struct list_head out_sent; /* sending or sent but unacked */
u64 out_seq; /* last message queued for send */
@@ -550,6 +562,11 @@ void ceph_addr_set_port(struct ceph_entity_addr *addr, int p);
void ceph_con_process_message(struct ceph_connection *con);
int ceph_con_in_msg_alloc(struct ceph_connection *con,
struct ceph_msg_header *hdr, int *skip);
+
+/**
+ * Caller must hold both `ceph_connection.mutex` and `out_lock`; note
+ * that `out_lock` may be dropped and re-taken around ->reencode_message().
+ */
struct ceph_msg *ceph_con_get_out_msg(struct ceph_connection *con);
/* messenger_v1.c */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 34b3097b4c7b..3bd88054817a 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -554,12 +554,22 @@ static void ceph_msg_remove_list(struct list_head *head)
void ceph_con_reset_session(struct ceph_connection *con)
{
+ LIST_HEAD(tmp);
+
dout("%s con %p\n", __func__, con);
WARN_ON(con->in_msg);
WARN_ON(con->out_msg);
- ceph_msg_remove_list(&con->out_queue);
- ceph_msg_remove_list(&con->out_sent);
+
+ /* move all messages to the stack and call
+ * ceph_msg_remove_list() without holding the spinlock
+ */
+ spin_lock(&con->out_lock);
+ list_splice_init(&con->out_queue, &tmp);
+ list_splice_init(&con->out_sent, &tmp);
+ spin_unlock(&con->out_lock);
+ ceph_msg_remove_list(&tmp);
+
con->out_seq = 0;
con->in_seq = 0;
con->in_seq_acked = 0;
@@ -577,7 +587,9 @@ void ceph_con_close(struct ceph_connection *con)
{
mutex_lock(&con->mutex);
dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr));
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_CLOSED;
+ spin_unlock(&con->out_lock);
ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next
connect */
@@ -603,7 +615,9 @@ void ceph_con_open(struct ceph_connection *con,
dout("con_open %p %s\n", con, ceph_pr_addr(addr));
WARN_ON(con->state != CEPH_CON_S_CLOSED);
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_PREOPEN;
+ spin_unlock(&con->out_lock);
con->peer_name.type = (__u8) entity_type;
con->peer_name.num = cpu_to_le64(entity_num);
@@ -642,6 +656,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
con_sock_state_init(con);
mutex_init(&con->mutex);
+ spin_lock_init(&con->out_lock);
INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent);
INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
@@ -671,10 +686,16 @@ u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt)
*/
void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
{
+ LIST_HEAD(tmp);
struct ceph_msg *msg;
u64 seq;
dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq);
+
+ /* move all discarded messages to the stack and call
+ * ceph_msg_remove_list() without holding the spinlock
+ */
+ spin_lock(&con->out_lock);
while (!list_empty(&con->out_sent)) {
msg = list_first_entry(&con->out_sent, struct ceph_msg,
list_head);
@@ -685,8 +706,11 @@ void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
msg, seq);
- ceph_msg_remove(msg);
+ list_move_tail(&msg->list_head, &tmp);
}
+ spin_unlock(&con->out_lock);
+
+ ceph_msg_remove_list(&tmp);
}
/*
@@ -696,10 +720,16 @@ void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
*/
void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
{
+ LIST_HEAD(tmp);
struct ceph_msg *msg;
u64 seq;
dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
+
+ /* move all discarded messages to the stack and call
+ * ceph_msg_remove_list() without holding the spinlock
+ */
+ spin_lock(&con->out_lock);
while (!list_empty(&con->out_queue)) {
msg = list_first_entry(&con->out_queue, struct ceph_msg,
list_head);
@@ -711,8 +741,11 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
msg, seq);
- ceph_msg_remove(msg);
+ list_move_tail(&msg->list_head, &tmp);
}
+ spin_unlock(&con->out_lock);
+
+ ceph_msg_remove_list(&tmp);
}
#ifdef CONFIG_BLOCK
@@ -1626,9 +1659,12 @@ static void con_fault(struct ceph_connection *con)
ceph_con_reset_protocol(con);
+ spin_lock(&con->out_lock);
+
if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) {
dout("fault on LOSSYTX channel, marking CLOSED\n");
con->state = CEPH_CON_S_CLOSED;
+ spin_unlock(&con->out_lock);
return;
}
@@ -1642,6 +1678,7 @@ static void con_fault(struct ceph_connection *con)
dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
con->state = CEPH_CON_S_STANDBY;
+ spin_unlock(&con->out_lock);
} else {
/* retry after a delay. */
con->state = CEPH_CON_S_PREOPEN;
@@ -1653,6 +1690,7 @@ static void con_fault(struct ceph_connection *con)
con->delay = MAX_DELAY_INTERVAL;
}
ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
+ spin_unlock(&con->out_lock);
queue_con(con);
}
}
@@ -1716,7 +1754,9 @@ static void clear_standby(struct ceph_connection *con)
/* come back from STANDBY? */
if (con->state == CEPH_CON_S_STANDBY) {
dout("clear_standby %p\n", con);
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_PREOPEN;
+ spin_unlock(&con->out_lock);
if (!ceph_msgr2(from_msgr(con->msgr)))
con->v1.connect_seq++;
WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
@@ -1736,12 +1776,14 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
msg->needs_out_seq = true;
- mutex_lock(&con->mutex);
+ msg_con_set(msg, NULL);
+
+ spin_lock(&con->out_lock);
if (con->state == CEPH_CON_S_CLOSED) {
dout("con_send %p closed, dropping %p\n", con, msg);
+ spin_unlock(&con->out_lock);
ceph_msg_put(msg);
- mutex_unlock(&con->mutex);
return;
}
@@ -1756,8 +1798,14 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
le32_to_cpu(msg->hdr.middle_len),
le32_to_cpu(msg->hdr.data_len));
- clear_standby(con);
- mutex_unlock(&con->mutex);
+ if (con->state == CEPH_CON_S_STANDBY) {
+ spin_unlock(&con->out_lock);
+ mutex_lock(&con->mutex);
+ clear_standby(con);
+ mutex_unlock(&con->mutex);
+ } else {
+ spin_unlock(&con->out_lock);
+ }
/* if there wasn't anything waiting to send before, queue
* new work */
@@ -1779,16 +1827,21 @@ void ceph_msg_revoke(struct ceph_msg *msg)
}
mutex_lock(&con->mutex);
+ spin_lock(&con->out_lock);
if (list_empty(&msg->list_head)) {
WARN_ON(con->out_msg == msg);
dout("%s con %p msg %p not linked\n", __func__, con, msg);
+ spin_unlock(&con->out_lock);
mutex_unlock(&con->mutex);
return;
}
+ list_del_init(&msg->list_head);
+ spin_unlock(&con->out_lock);
+
dout("%s con %p msg %p was linked\n", __func__, con, msg);
msg->hdr.seq = 0;
- ceph_msg_remove(msg);
+ ceph_msg_put(msg);
if (con->out_msg == msg) {
WARN_ON(con->state != CEPH_CON_S_OPEN);
@@ -2114,6 +2167,9 @@ struct ceph_msg *ceph_con_get_out_msg(struct ceph_connection *con)
{
struct ceph_msg *msg;
+ lockdep_assert_held(&con->mutex);
+ lockdep_assert_held(&con->out_lock);
+
if (list_empty(&con->out_queue))
return NULL;
@@ -2134,8 +2190,11 @@ struct ceph_msg *ceph_con_get_out_msg(struct ceph_connection *con)
msg->hdr.seq = cpu_to_le64(++con->out_seq);
msg->needs_out_seq = false;
- if (con->ops->reencode_message)
+ if (con->ops->reencode_message) {
+ spin_unlock(&con->out_lock);
con->ops->reencode_message(msg);
+ spin_lock(&con->out_lock);
+ }
}
/*
diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c
index c9e002d96319..01b870927c93 100644
--- a/net/ceph/messenger_v1.c
+++ b/net/ceph/messenger_v1.c
@@ -888,7 +888,9 @@ static int process_connect(struct ceph_connection *con)
}
WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_OPEN;
+ spin_unlock(&con->out_lock);
con->v1.auth_retry = 0; /* we authenticated; clear flag */
con->v1.peer_global_seq =
le32_to_cpu(con->v1.in_reply.global_seq);
@@ -1343,7 +1345,9 @@ int ceph_con_v1_try_read(struct ceph_connection *con)
if (ret < 0)
goto out;
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_V1_CONNECT_MSG;
+ spin_unlock(&con->out_lock);
/*
* Received banner is good, exchange connection info.
@@ -1403,7 +1407,9 @@ int ceph_con_v1_try_read(struct ceph_connection *con)
break;
case CEPH_MSGR_TAG_CLOSE:
ceph_con_close_socket(con);
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_CLOSED;
+ spin_unlock(&con->out_lock);
goto out;
default:
goto bad_tag;
@@ -1481,7 +1487,9 @@ int ceph_con_v1_try_write(struct ceph_connection *con)
/* open the socket first? */
if (con->state == CEPH_CON_S_PREOPEN) {
BUG_ON(con->sock);
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_V1_BANNER;
+ spin_unlock(&con->out_lock);
con_out_kvec_reset(con);
prepare_write_banner(con);
@@ -1536,18 +1544,26 @@ int ceph_con_v1_try_write(struct ceph_connection *con)
}
do_next:
+ /* hold con->out_lock while checking for things to write and
+ * unlock it after clearing CEPH_CON_F_WRITE_PENDING to avoid
+ * racing with ceph_con_send()
+ */
+ spin_lock(&con->out_lock);
if (con->state == CEPH_CON_S_OPEN) {
if (ceph_con_flag_test_and_clear(con,
CEPH_CON_F_KEEPALIVE_PENDING)) {
+ spin_unlock(&con->out_lock);
prepare_write_keepalive(con);
goto more;
}
/* is anything else pending? */
if ((msg = ceph_con_get_out_msg(con)) != NULL) {
+ spin_unlock(&con->out_lock);
prepare_write_message(con, msg);
goto more;
}
if (con->in_seq > con->in_seq_acked) {
+ spin_unlock(&con->out_lock);
prepare_write_ack(con);
goto more;
}
@@ -1555,6 +1571,7 @@ int ceph_con_v1_try_write(struct ceph_connection *con)
/* Nothing to do! */
ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
+ spin_unlock(&con->out_lock);
dout("try_write nothing else to write.\n");
ret = 0;
out:
diff --git a/net/ceph/messenger_v2.c b/net/ceph/messenger_v2.c
index 05f6eea299fc..96a78e0345dd 100644
--- a/net/ceph/messenger_v2.c
+++ b/net/ceph/messenger_v2.c
@@ -1744,7 +1744,9 @@ static int prepare_read_banner_prefix(struct ceph_connection *con)
reset_in_kvecs(con);
add_in_kvec(con, buf, CEPH_BANNER_V2_PREFIX_LEN);
add_in_sign_kvec(con, buf, CEPH_BANNER_V2_PREFIX_LEN);
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_V2_BANNER_PREFIX;
+ spin_unlock(&con->out_lock);
return 0;
}
@@ -1760,7 +1762,9 @@ static int prepare_read_banner_payload(struct ceph_connection *con,
reset_in_kvecs(con);
add_in_kvec(con, buf, payload_len);
add_in_sign_kvec(con, buf, payload_len);
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_V2_BANNER_PAYLOAD;
+ spin_unlock(&con->out_lock);
return 0;
}
@@ -2181,7 +2185,9 @@ static int process_banner_payload(struct ceph_connection *con)
return ret;
}
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_V2_HELLO;
+ spin_unlock(&con->out_lock);
prepare_read_preamble(con);
return 0;
@@ -2249,7 +2255,9 @@ static int process_hello(struct ceph_connection *con, void *p, void *end)
return ret;
}
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_V2_AUTH;
+ spin_unlock(&con->out_lock);
return 0;
bad:
@@ -2409,7 +2417,9 @@ static int process_auth_done(struct ceph_connection *con, void *p, void *end)
goto out;
}
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_V2_AUTH_SIGNATURE;
+ spin_unlock(&con->out_lock);
out:
memzero_explicit(session_key, sizeof(session_key));
@@ -2451,7 +2461,9 @@ static int process_auth_signature(struct ceph_connection *con,
return ret;
}
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_V2_SESSION_CONNECT;
+ spin_unlock(&con->out_lock);
} else {
ret = prepare_session_reconnect(con);
if (ret) {
@@ -2459,7 +2471,9 @@ static int process_auth_signature(struct ceph_connection *con,
return ret;
}
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_V2_SESSION_RECONNECT;
+ spin_unlock(&con->out_lock);
}
return 0;
@@ -2544,7 +2558,9 @@ static int process_server_ident(struct ceph_connection *con,
free_conn_bufs(con);
con->delay = 0; /* reset backoff memory */
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_OPEN;
+ spin_unlock(&con->out_lock);
con->v2.out_state = OUT_S_GET_NEXT;
return 0;
@@ -2595,7 +2611,9 @@ static int process_session_reconnect_ok(struct ceph_connection *con,
free_conn_bufs(con);
con->delay = 0; /* reset backoff memory */
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_OPEN;
+ spin_unlock(&con->out_lock);
con->v2.out_state = OUT_S_GET_NEXT;
return 0;
@@ -2710,7 +2728,9 @@ static int process_session_reset(struct ceph_connection *con,
return ret;
}
+ spin_lock(&con->out_lock);
con->state = CEPH_CON_S_V2_SESSION_CONNECT;
+ spin_unlock(&con->out_lock);
return 0;
bad:
@@ -3285,6 +3305,7 @@ static int populate_out_iter(struct ceph_connection *con)
if (con->state != CEPH_CON_S_OPEN) {
WARN_ON(con->state < CEPH_CON_S_V2_BANNER_PREFIX ||
con->state > CEPH_CON_S_V2_SESSION_RECONNECT);
+ spin_lock(&con->out_lock);
goto nothing_pending;
}
@@ -3315,19 +3336,28 @@ static int populate_out_iter(struct ceph_connection *con)
}
WARN_ON(con->v2.out_state != OUT_S_GET_NEXT);
+
+ /* hold con->out_lock while checking for things to write and
+ * unlock it after clearing CEPH_CON_F_WRITE_PENDING to avoid
+ * racing with ceph_con_send()
+ */
+ spin_lock(&con->out_lock);
if (ceph_con_flag_test_and_clear(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
+ spin_unlock(&con->out_lock);
ret = prepare_keepalive2(con);
if (ret) {
pr_err("prepare_keepalive2 failed: %d\n", ret);
return ret;
}
} else if ((msg = ceph_con_get_out_msg(con)) != NULL) {
+ spin_unlock(&con->out_lock);
ret = prepare_message(con, msg);
if (ret) {
pr_err("prepare_message failed: %d\n", ret);
return ret;
}
} else if (con->in_seq > con->in_seq_acked) {
+ spin_unlock(&con->out_lock);
ret = prepare_ack(con);
if (ret) {
pr_err("prepare_ack failed: %d\n", ret);
@@ -3348,6 +3378,7 @@ static int populate_out_iter(struct ceph_connection *con)
WARN_ON(iov_iter_count(&con->v2.out_iter));
dout("%s con %p nothing pending\n", __func__, con);
ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
+ spin_unlock(&con->out_lock);
return 0;
}
--
2.47.3
^ permalink raw reply related [flat|nested] only message in thread
only message in thread, other threads:[~2026-07-03 19:38 UTC | newest]
Thread overview: (only message) (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2026-07-03 19:38 [PATCH] fs/ceph/messenger: add separate lock for `out_queue` Max Kellermann
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox