* [PATCH v2 0/8] RDS bug fix collection
@ 2025-04-11 18:01 allison.henderson
  2025-04-11 18:02 ` [PATCH v2 1/8] net/rds: Add per cp work queue allison.henderson
                   ` (7 more replies)
  0 siblings, 8 replies; 16+ messages in thread
From: allison.henderson @ 2025-04-11 18:01 UTC (permalink / raw)
  To: netdev

From: Allison Henderson <allison.henderson@oracle.com>

Hi all,

This is v2 of the RDS bug fix collection set.

This set is a collection of bug fixes I've been working on porting from
UEK RDS to upstream RDS.  We have some projects that are exploring the
idea of using upstream RDS, but under enough stress we've run into
dropped or out-of-sequence message bugs that we would like to fix.  We
have patches to address these, so I've ported them along with a handful
of other fixes they depend on.

Since v1, I've integrated the feedback from the last review, and have
since stabilized a few more patches that should be included as well.
Below is a summary of the changes:

net/rds: Avoid queuing superfluous send and recv work
   Removed

net/rds: Re-factor and avoid superfluous queuing of reconnect work
   Removed

net/rds: RDS/TCP does not initiate a connection
   Removed as a fix for "Re-factor and avoid superfluous queuing of reconnect work"
   
net/rds: rds_tcp_accept_one ought to not discard messages
   Fixed uninitialized inet found by kernel test robot
   Fixed smatch warning found by kernel test robot
   Moved kdoc for rds_transport into member-specific comments.
   Added comment for the new conn_slots_available member

I've also included a few other bug fixes that I wanted to include in v1,
but still needed work at the time:
   net/rds: rds_tcp_conn_path_shutdown must not discard messages
   net/rds: Kick-start TCP receiver after accept

These last two needed some additional queue management patches to get
them through the selftests, so those are included at the beginning of
the set:
   net/rds: Add per cp work queue
   net/rds: Introduce a pool of worker threads for connection management
   net/rds: Re-factor queuing of shutdown work

So that's pretty much the breakdown of changes in this version.
Questions, comments, flames appreciated!
Thanks everybody!

Allison

Allison Henderson (1):
  net/rds: Add per cp work queue

Gerd Rausch (5):
  net/rds: No shortcut out of RDS_CONN_ERROR
  net/rds: rds_tcp_accept_one ought to not discard messages
  net/rds: Encode cp_index in TCP source port
  net/rds: rds_tcp_conn_path_shutdown must not discard messages
  net/rds: Kick-start TCP receiver after accept

Håkon Bugge (2):
  net/rds: Introduce a pool of worker threads for connection management
  net/rds: Re-factor queuing of shutdown work

 net/rds/connection.c  |  20 +++-
 net/rds/ib.c          |   5 +
 net/rds/ib_recv.c     |   2 +-
 net/rds/ib_send.c     |   2 +-
 net/rds/message.c     |   1 +
 net/rds/rds.h         |  74 +++++++++------
 net/rds/recv.c        |  11 +++
 net/rds/send.c        |  13 ++-
 net/rds/tcp.c         |  28 +++---
 net/rds/tcp.h         |  27 +++++-
 net/rds/tcp_connect.c |  68 ++++++++++++-
 net/rds/tcp_listen.c  | 216 +++++++++++++++++++++++++++++++-----------
 net/rds/tcp_recv.c    |   6 +-
 net/rds/tcp_send.c    |   4 +-
 net/rds/threads.c     |  40 ++++++--
 15 files changed, 395 insertions(+), 122 deletions(-)

-- 
2.43.0



* [PATCH v2 1/8] net/rds: Add per cp work queue
  2025-04-11 18:01 [PATCH v2 0/8] RDS bug fix collection allison.henderson
@ 2025-04-11 18:02 ` allison.henderson
  2025-04-17  9:44   ` Simon Horman
  2025-04-11 18:02 ` [PATCH v2 2/8] net/rds: Introduce a pool of worker threads for connection management allison.henderson
                   ` (6 subsequent siblings)
  7 siblings, 1 reply; 16+ messages in thread
From: allison.henderson @ 2025-04-11 18:02 UTC (permalink / raw)
  To: netdev

From: Allison Henderson <allison.henderson@oracle.com>

This patch adds a per-connection-path work queue which can be
initialized and used independently of the globally shared rds_wq.

This patch is the first in a series that aims to address multiple bugs,
including following the appropriate shutdown sequence described in
RFC 793 (pg 23): https://www.ietf.org/rfc/rfc793.txt

This initial refactoring lays the groundwork needed to alleviate
queue congestion during heavy reads and writes.  The independently
managed queues will allow shutdowns and reconnects to respond more
quickly, before the peer times out waiting for the proper acks.
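
To illustrate the pattern (a minimal sketch mirroring the hunks below,
not additional code): callers now queue path work through the path's
own queue instead of the global rds_wq:

    /* cp->cp_wq is assigned in __rds_conn_create(); the usual
     * destroy-pending check stays the same, only the target
     * work queue changes.
     */
    rcu_read_lock();
    if (!rds_destroy_pending(cp->cp_conn))
            queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 0);
    rcu_read_unlock();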

Signed-off-by: Allison Henderson <allison.henderson@oracle.com>
---
 net/rds/connection.c |  5 +++--
 net/rds/ib_recv.c    |  2 +-
 net/rds/ib_send.c    |  2 +-
 net/rds/rds.h        |  1 +
 net/rds/send.c       |  8 ++++----
 net/rds/tcp_recv.c   |  2 +-
 net/rds/tcp_send.c   |  2 +-
 net/rds/threads.c    | 16 ++++++++--------
 8 files changed, 20 insertions(+), 18 deletions(-)

diff --git a/net/rds/connection.c b/net/rds/connection.c
index c749c5525b40..372542d67fdb 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -268,6 +268,7 @@ static struct rds_connection *__rds_conn_create(struct net *net,
 		__rds_conn_path_init(conn, &conn->c_path[i],
 				     is_outgoing);
 		conn->c_path[i].cp_index = i;
+		conn->c_path[i].cp_wq = rds_wq;
 	}
 	rcu_read_lock();
 	if (rds_destroy_pending(conn))
@@ -885,7 +886,7 @@ void rds_conn_path_drop(struct rds_conn_path *cp, bool destroy)
 		rcu_read_unlock();
 		return;
 	}
-	queue_work(rds_wq, &cp->cp_down_w);
+	queue_work(cp->cp_wq, &cp->cp_down_w);
 	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_conn_path_drop);
@@ -910,7 +911,7 @@ void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
 	}
 	if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
 	    !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags))
-		queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+		queue_delayed_work(cp->cp_wq, &cp->cp_conn_w, 0);
 	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down);
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index e53b7f266bd7..add0018da8f3 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -457,7 +457,7 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
 	    (must_wake ||
 	    (can_wait && rds_ib_ring_low(&ic->i_recv_ring)) ||
 	    rds_ib_ring_empty(&ic->i_recv_ring))) {
-		queue_delayed_work(rds_wq, &conn->c_recv_w, 1);
+		queue_delayed_work(conn->c_path->cp_wq, &conn->c_recv_w, 1);
 	}
 	if (can_wait)
 		cond_resched();
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 4190b90ff3b1..e35bbb6ffb68 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -419,7 +419,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
 
 	atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
 	if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-		queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+		queue_delayed_work(conn->c_path->cp_wq, &conn->c_send_w, 0);
 
 	WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
 
diff --git a/net/rds/rds.h b/net/rds/rds.h
index dc360252c515..1c51f3f6ed15 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -118,6 +118,7 @@ struct rds_conn_path {
 
 	void			*cp_transport_data;
 
+	struct workqueue_struct	*cp_wq;
 	atomic_t		cp_state;
 	unsigned long		cp_send_gen;
 	unsigned long		cp_flags;
diff --git a/net/rds/send.c b/net/rds/send.c
index 09a280110654..44a89d6c27a0 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -458,7 +458,7 @@ int rds_send_xmit(struct rds_conn_path *cp)
 			if (rds_destroy_pending(cp->cp_conn))
 				ret = -ENETUNREACH;
 			else
-				queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+				queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 1);
 			rcu_read_unlock();
 		} else if (raced) {
 			rds_stats_inc(s_send_lock_queue_raced);
@@ -1380,7 +1380,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 		if (rds_destroy_pending(cpath->cp_conn))
 			ret = -ENETUNREACH;
 		else
-			queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
+			queue_delayed_work(cpath->cp_wq, &cpath->cp_send_w, 1);
 		rcu_read_unlock();
 	}
 	if (ret)
@@ -1470,10 +1470,10 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
 	rds_stats_inc(s_send_queued);
 	rds_stats_inc(s_send_pong);
 
-	/* schedule the send work on rds_wq */
+	/* schedule the send work on cp_wq */
 	rcu_read_lock();
 	if (!rds_destroy_pending(cp->cp_conn))
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+		queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 1);
 	rcu_read_unlock();
 
 	rds_message_put(rm);
diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c
index 7997a19d1da3..b7cf7f451430 100644
--- a/net/rds/tcp_recv.c
+++ b/net/rds/tcp_recv.c
@@ -327,7 +327,7 @@ void rds_tcp_data_ready(struct sock *sk)
 	if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) {
 		rcu_read_lock();
 		if (!rds_destroy_pending(cp->cp_conn))
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+			queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0);
 		rcu_read_unlock();
 	}
 out:
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index 7d284ac7e81a..4e82c9644aa6 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -201,7 +201,7 @@ void rds_tcp_write_space(struct sock *sk)
 	rcu_read_lock();
 	if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf &&
 	    !rds_destroy_pending(cp->cp_conn))
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+		queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 0);
 	rcu_read_unlock();
 
 out:
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 1f424cbfcbb4..639302bab51e 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -89,8 +89,8 @@ void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
 	set_bit(0, &cp->cp_conn->c_map_queued);
 	rcu_read_lock();
 	if (!rds_destroy_pending(cp->cp_conn)) {
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
-		queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+		queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 0);
+		queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0);
 	}
 	rcu_read_unlock();
 	cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
@@ -140,7 +140,7 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
 		cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
 		rcu_read_lock();
 		if (!rds_destroy_pending(cp->cp_conn))
-			queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+			queue_delayed_work(cp->cp_wq, &cp->cp_conn_w, 0);
 		rcu_read_unlock();
 		return;
 	}
@@ -151,7 +151,7 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
 		 conn, &conn->c_laddr, &conn->c_faddr);
 	rcu_read_lock();
 	if (!rds_destroy_pending(cp->cp_conn))
-		queue_delayed_work(rds_wq, &cp->cp_conn_w,
+		queue_delayed_work(cp->cp_wq, &cp->cp_conn_w,
 				   rand % cp->cp_reconnect_jiffies);
 	rcu_read_unlock();
 
@@ -203,11 +203,11 @@ void rds_send_worker(struct work_struct *work)
 		switch (ret) {
 		case -EAGAIN:
 			rds_stats_inc(s_send_immediate_retry);
-			queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+			queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 0);
 			break;
 		case -ENOMEM:
 			rds_stats_inc(s_send_delayed_retry);
-			queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
+			queue_delayed_work(cp->cp_wq, &cp->cp_send_w, 2);
 			break;
 		default:
 			break;
@@ -228,11 +228,11 @@ void rds_recv_worker(struct work_struct *work)
 		switch (ret) {
 		case -EAGAIN:
 			rds_stats_inc(s_recv_immediate_retry);
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+			queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0);
 			break;
 		case -ENOMEM:
 			rds_stats_inc(s_recv_delayed_retry);
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
+			queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 2);
 			break;
 		default:
 			break;
-- 
2.43.0



* [PATCH v2 2/8] net/rds: Introduce a pool of worker threads for connection management
  2025-04-11 18:01 [PATCH v2 0/8] RDS bug fix collection allison.henderson
  2025-04-11 18:02 ` [PATCH v2 1/8] net/rds: Add per cp work queue allison.henderson
@ 2025-04-11 18:02 ` allison.henderson
  2025-04-15  0:46   ` Jakub Kicinski
  2025-04-11 18:02 ` [PATCH v2 3/8] net/rds: Re-factor queuing of shutdown work allison.henderson
                   ` (5 subsequent siblings)
  7 siblings, 1 reply; 16+ messages in thread
From: allison.henderson @ 2025-04-11 18:02 UTC (permalink / raw)
  To: netdev

From: Håkon Bugge <haakon.bugge@oracle.com>

RDS uses a single-threaded work queue for connection management. This
involves creation and deletion of QPs and CQs. On certain HCAs, such
as CX-3, these operations are para-virtualized, and part of the work
has to be conducted by the Physical Function (PF) driver.

In fail-over and fail-back situations, there might be 1000s of
connections to tear down and re-establish. Hence, expand the number
of work queues.
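
As a sketch of how a connection maps onto the pool (mirroring the hunk
in __rds_conn_create() below): the address pair and TOS are hashed so
that different connections spread across the pool, while each path
keeps using one ordered queue:

    /* Sketch only: pick one of the RDS_NMBR_CP_WQS ordered
     * queues for this connection.
     */
    int idx = jhash_3words(laddr->s6_addr32[3],
                           faddr->s6_addr32[3],
                           tos, 0) % RDS_NMBR_CP_WQS;
    conn->c_path[i].cp_wq = rds_cp_wqs[idx];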

Signed-off-by: Håkon Bugge <haakon.bugge@oracle.com>
Signed-off-by: Allison Henderson <allison.henderson@oracle.com>
---
 net/rds/connection.c |  8 +++++++-
 net/rds/ib.c         |  5 +++++
 net/rds/rds.h        |  4 ++++
 net/rds/threads.c    | 23 +++++++++++++++++++++++
 4 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/net/rds/connection.c b/net/rds/connection.c
index 372542d67fdb..f7f48abf7233 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -171,6 +171,10 @@ static struct rds_connection *__rds_conn_create(struct net *net,
 	unsigned long flags;
 	int ret, i;
 	int npaths = (trans->t_mp_capable ? RDS_MPATH_WORKERS : 1);
+	int cp_wqs_inx = jhash_3words(laddr->s6_addr32[3],
+				      faddr->s6_addr32[3],
+				      tos,
+				      0) % RDS_NMBR_CP_WQS;
 
 	rcu_read_lock();
 	conn = rds_conn_lookup(net, head, laddr, faddr, trans, tos, dev_if);
@@ -268,7 +272,9 @@ static struct rds_connection *__rds_conn_create(struct net *net,
 		__rds_conn_path_init(conn, &conn->c_path[i],
 				     is_outgoing);
 		conn->c_path[i].cp_index = i;
-		conn->c_path[i].cp_wq = rds_wq;
+
+		rdsdebug("using rds_cp_wqs index %d\n", cp_wqs_inx);
+		conn->c_path[i].cp_wq = rds_cp_wqs[cp_wqs_inx];
 	}
 	rcu_read_lock();
 	if (rds_destroy_pending(conn))
diff --git a/net/rds/ib.c b/net/rds/ib.c
index 9826fe7f9d00..6694d31e6cfd 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -491,9 +491,14 @@ static int rds_ib_laddr_check(struct net *net, const struct in6_addr *addr,
 
 static void rds_ib_unregister_client(void)
 {
+	int i;
+
 	ib_unregister_client(&rds_ib_client);
 	/* wait for rds_ib_dev_free() to complete */
 	flush_workqueue(rds_wq);
+
+	for (i = 0; i < RDS_NMBR_CP_WQS; ++i)
+		flush_workqueue(rds_cp_wqs[i]);
 }
 
 static void rds_ib_set_unloading(void)
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 1c51f3f6ed15..71a0020fe41d 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -40,6 +40,9 @@
 #ifdef ATOMIC64_INIT
 #define KERNEL_HAS_ATOMIC64
 #endif
+
+#define RDS_NMBR_CP_WQS 16
+
 #ifdef RDS_DEBUG
 #define rdsdebug(fmt, args...) pr_debug("%s(): " fmt, __func__ , ##args)
 #else
@@ -994,6 +997,7 @@ extern unsigned int  rds_sysctl_trace_level;
 int rds_threads_init(void);
 void rds_threads_exit(void);
 extern struct workqueue_struct *rds_wq;
+extern struct workqueue_struct *rds_cp_wqs[RDS_NMBR_CP_WQS];
 void rds_queue_reconnect(struct rds_conn_path *cp);
 void rds_connect_worker(struct work_struct *);
 void rds_shutdown_worker(struct work_struct *);
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 639302bab51e..f713c6d9cd32 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -33,6 +33,7 @@
 #include <linux/kernel.h>
 #include <linux/random.h>
 #include <linux/export.h>
+#include <linux/workqueue.h>
 
 #include "rds.h"
 
@@ -70,6 +71,8 @@
  */
 struct workqueue_struct *rds_wq;
 EXPORT_SYMBOL_GPL(rds_wq);
+struct workqueue_struct *rds_cp_wqs[RDS_NMBR_CP_WQS];
+EXPORT_SYMBOL_GPL(rds_cp_wqs);
 
 void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
 {
@@ -251,16 +254,36 @@ void rds_shutdown_worker(struct work_struct *work)
 
 void rds_threads_exit(void)
 {
+	int i;
+
 	destroy_workqueue(rds_wq);
+	for (i = 0; i < RDS_NMBR_CP_WQS; ++i)
+		destroy_workqueue(rds_cp_wqs[i]);
 }
 
 int rds_threads_init(void)
 {
+	int i, j;
+
 	rds_wq = create_singlethread_workqueue("krdsd");
 	if (!rds_wq)
 		return -ENOMEM;
 
+	for (i = 0; i < RDS_NMBR_CP_WQS; ++i) {
+		rds_cp_wqs[i] = alloc_ordered_workqueue("krds_cp_wq_%d",
+							WQ_MEM_RECLAIM, i);
+		if (!rds_cp_wqs[i])
+			goto err;
+	}
+
 	return 0;
+
+err:
+	destroy_workqueue(rds_wq);
+	for (j = 0; j < i; ++j)
+		destroy_workqueue(rds_cp_wqs[j]);
+
+	return -ENOMEM;
 }
 
 /* Compare two IPv6 addresses.  Return 0 if the two addresses are equal.
-- 
2.43.0



* [PATCH v2 3/8] net/rds: Re-factor queuing of shutdown work
  2025-04-11 18:01 [PATCH v2 0/8] RDS bug fix collection allison.henderson
  2025-04-11 18:02 ` [PATCH v2 1/8] net/rds: Add per cp work queue allison.henderson
  2025-04-11 18:02 ` [PATCH v2 2/8] net/rds: Introduce a pool of worker threads for connection management allison.henderson
@ 2025-04-11 18:02 ` allison.henderson
  2025-04-11 18:02 ` [PATCH v2 4/8] net/rds: No shortcut out of RDS_CONN_ERROR allison.henderson
                   ` (4 subsequent siblings)
  7 siblings, 0 replies; 16+ messages in thread
From: allison.henderson @ 2025-04-11 18:02 UTC (permalink / raw)
  To: netdev

From: Håkon Bugge <haakon.bugge@oracle.com>

When RDS receives an RDMA_CM_EVENT_ESTABLISHED in close proximity to
a reconnect timeout, the shutdown worker may be queued twice.

The reconnect timeout calls rds_conn_path_drop() which sets the
connection state to RDS_CONN_ERROR, before rds_connect_path_complete()
is able to set it to RDS_CONN_UP.

Fix this by queuing the shutdown worker conditionally.
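
Schematically, the guard is the usual test_and_set_bit()/clear_bit()
pairing (an illustration of the diff below):

    /* Queue the shutdown work only if it is not already in
     * flight; rds_shutdown_worker() clears the bit after
     * rds_conn_shutdown() has run, re-arming the guard.
     */
    if (!test_and_set_bit(RDS_SHUTDOWN_WORK_QUEUED, &cp->cp_flags))
            queue_work(cp->cp_wq, &cp->cp_down_w);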

Signed-off-by: Håkon Bugge <haakon.bugge@oracle.com>
Signed-off-by: Allison Henderson <allison.henderson@oracle.com>
---
 net/rds/connection.c | 5 +++--
 net/rds/rds.h        | 1 +
 net/rds/threads.c    | 1 +
 3 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/net/rds/connection.c b/net/rds/connection.c
index f7f48abf7233..17fd7f34ebbd 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -693,7 +693,6 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
 	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
-
 static void rds_walk_conn_path_info(struct socket *sock, unsigned int len,
 				    struct rds_info_iterator *iter,
 				    struct rds_info_lengths *lens,
@@ -892,7 +891,9 @@ void rds_conn_path_drop(struct rds_conn_path *cp, bool destroy)
 		rcu_read_unlock();
 		return;
 	}
-	queue_work(cp->cp_wq, &cp->cp_down_w);
+	if (!test_and_set_bit(RDS_SHUTDOWN_WORK_QUEUED, &cp->cp_flags))
+		queue_work(cp->cp_wq, &cp->cp_down_w);
+
 	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_conn_path_drop);
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 71a0020fe41d..d8d3abb2d5a6 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -93,6 +93,7 @@ enum {
 #define RDS_IN_XMIT		2
 #define RDS_RECV_REFILL		3
 #define	RDS_DESTROY_PENDING	4
+#define RDS_SHUTDOWN_WORK_QUEUED 7
 
 /* Max number of multipaths per RDS connection. Must be a power of 2 */
 #define	RDS_MPATH_WORKERS	8
diff --git a/net/rds/threads.c b/net/rds/threads.c
index f713c6d9cd32..915e57b06d73 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -250,6 +250,7 @@ void rds_shutdown_worker(struct work_struct *work)
 						cp_down_w);
 
 	rds_conn_shutdown(cp);
+	clear_bit(RDS_SHUTDOWN_WORK_QUEUED, &cp->cp_flags);
 }
 
 void rds_threads_exit(void)
-- 
2.43.0



* [PATCH v2 4/8] net/rds: No shortcut out of RDS_CONN_ERROR
  2025-04-11 18:01 [PATCH v2 0/8] RDS bug fix collection allison.henderson
                   ` (2 preceding siblings ...)
  2025-04-11 18:02 ` [PATCH v2 3/8] net/rds: Re-factor queuing of shutdown work allison.henderson
@ 2025-04-11 18:02 ` allison.henderson
  2025-04-15  0:47   ` Jakub Kicinski
  2025-04-11 18:02 ` [PATCH v2 5/8] net/rds: rds_tcp_accept_one ought to not discard messages allison.henderson
                   ` (3 subsequent siblings)
  7 siblings, 1 reply; 16+ messages in thread
From: allison.henderson @ 2025-04-11 18:02 UTC (permalink / raw)
  To: netdev

From: Gerd Rausch <gerd.rausch@oracle.com>

RDS connections carry a state, "rds_conn_path::cp_state", and
transitions from one state to another are conditional upon an
expected current state: see "rds_conn_path_transition".

There is one exception to this conditionality: "RDS_CONN_ERROR",
which can be enforced by "rds_conn_path_drop" regardless of what
state the connection is currently in.

But as soon as a connection enters state "RDS_CONN_ERROR",
the connection handling code expects it to go through the
shutdown-path.

The RDS/TCP multipath changes added a shortcut out of
"RDS_CONN_ERROR" straight back to "RDS_CONN_CONNECTING"
via "rds_tcp_accept_one_path" (e.g. after "rds_tcp_state_change").

A subsequent "rds_tcp_reset_callbacks" can then transition
the state to "RDS_CONN_RESETTING" with a shutdown-worker queued.

That'll trip up "rds_conn_shutdown", which was never
adjusted to handle "RDS_CONN_RESETTING" and subsequently
drops the connection with the dreaded "DR_INV_CONN_STATE",
which leaves "RDS_SHUTDOWN_WORK_QUEUED" set forever.

So we do two things here:

a) Don't shortcut "RDS_CONN_ERROR", but take the longer
   path through the shutdown code.

b) Add "RDS_CONN_RESETTING" to the expected states in
   "rds_conn_shutdown" so that we won't hit
   "DR_INV_CONN_STATE" and get stuck, if we ever
   hit weird state transitions like this again (see the
   sketch below).
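
For reference, "rds_conn_path_transition" is essentially an atomic
compare-and-swap on the path state (simplified from net/rds/rds.h),
which is why rds_conn_shutdown must enumerate every state it may
legitimately be entered from:

    /* The transition happens only if cp_state currently equals
     * "curr"; an unlisted state such as RDS_CONN_RESETTING would
     * otherwise fall through to the error path.
     */
    static inline int
    rds_conn_path_transition(struct rds_conn_path *cp, int curr, int next)
    {
            return atomic_cmpxchg(&cp->cp_state, curr, next) == curr;
    }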

Fixes:  ("RDS: TCP: fix race windows in send-path quiescence by rds_tcp_accept_one()")

Signed-off-by: Gerd Rausch <gerd.rausch@oracle.com>
Signed-off-by: Allison Henderson <allison.henderson@oracle.com>
---
 net/rds/connection.c | 2 ++
 net/rds/tcp_listen.c | 5 -----
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/net/rds/connection.c b/net/rds/connection.c
index 17fd7f34ebbd..cfea54740219 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -388,6 +388,8 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
 		if (!rds_conn_path_transition(cp, RDS_CONN_UP,
 					      RDS_CONN_DISCONNECTING) &&
 		    !rds_conn_path_transition(cp, RDS_CONN_ERROR,
+					      RDS_CONN_DISCONNECTING) &&
+		    !rds_conn_path_transition(cp, RDS_CONN_RESETTING,
 					      RDS_CONN_DISCONNECTING)) {
 			rds_conn_path_error(cp,
 					    "shutdown called in state %d\n",
diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
index d89bd8d0c354..fced9e286f79 100644
--- a/net/rds/tcp_listen.c
+++ b/net/rds/tcp_listen.c
@@ -59,9 +59,6 @@ void rds_tcp_keepalive(struct socket *sock)
  * socket and force a reconneect from smaller -> larger ip addr. The reason
  * we special case cp_index 0 is to allow the rds probe ping itself to itself
  * get through efficiently.
- * Since reconnects are only initiated from the node with the numerically
- * smaller ip address, we recycle conns in RDS_CONN_ERROR on the passive side
- * by moving them to CONNECTING in this function.
  */
 static
 struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn)
@@ -86,8 +83,6 @@ struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn)
 		struct rds_conn_path *cp = &conn->c_path[i];
 
 		if (rds_conn_path_transition(cp, RDS_CONN_DOWN,
-					     RDS_CONN_CONNECTING) ||
-		    rds_conn_path_transition(cp, RDS_CONN_ERROR,
 					     RDS_CONN_CONNECTING)) {
 			return cp->cp_transport_data;
 		}
-- 
2.43.0



* [PATCH v2 5/8] net/rds: rds_tcp_accept_one ought to not discard messages
  2025-04-11 18:01 [PATCH v2 0/8] RDS bug fix collection allison.henderson
                   ` (3 preceding siblings ...)
  2025-04-11 18:02 ` [PATCH v2 4/8] net/rds: No shortcut out of RDS_CONN_ERROR allison.henderson
@ 2025-04-11 18:02 ` allison.henderson
  2025-04-11 18:02 ` [PATCH v2 6/8] net/rds: Encode cp_index in TCP source port allison.henderson
                   ` (2 subsequent siblings)
  7 siblings, 0 replies; 16+ messages in thread
From: allison.henderson @ 2025-04-11 18:02 UTC (permalink / raw)
  To: netdev

From: Gerd Rausch <gerd.rausch@oracle.com>

RDS/TCP differs from RDS/RDMA in that message acknowledgment
is done based on TCP sequence numbers:
As soon as the last byte of a message has been acknowledged
by the TCP stack of a peer, "rds_tcp_write_space()" goes on
to discard prior messages from the send queue.

Which is fine, for as long as the receiver never throws any messages away.

Unfortunately, that is *not* the case since the introduction of MPRDS:
commit "RDS: TCP: Enable multipath RDS for TCP"

A new function "rds_tcp_accept_one_path" was introduced,
which is entitled to return "NULL" if no connection path is
currently available.

Unfortunately, this happens after the "->accept()" call, and the new socket
often already contains messages, since the peer already transitioned
to "RDS_CONN_UP" on behalf of "TCP_ESTABLISHED".

That's also the case after this [1]:
commit "RDS: TCP: Force every connection to be initiated by numerically
smaller IP address"

which tried to address the situation of pending data by only transitioning
connections from a smaller IP address to "RDS_CONN_UP".

But even in those cases, and in particular if the "RDS_EXTHDR_NPATHS"
handshake has not occurred yet (and we're therefore working with
"c_npaths <= 1"), "c_path[0]" may be in a state distinct from
"RDS_CONN_DOWN", and therefore all messages on the just accepted socket
will be tossed away.

This fix changes "rds_tcp_accept_one":

* If connected from a peer with a larger IP address, the new socket
  will continue to get closed right away.
  With commit [1] above, there should not be any messages
  in the socket receive buffer, since the peer never transitioned
  to "RDS_CONN_UP".
  Therefore it should be okay to not make any efforts to dispatch
  the socket receive buffer.

* If connected from a peer with a smaller IP address,
  we call "rds_tcp_accept_one_path" to find a free slot/"path".
  If found, business goes on as usual.
  If none was found, we save/stash the newly accepted socket
  into "rds_tcp_accepted_sock", in order to not lose any
  messages that may have arrived already.
  We then return from "rds_tcp_accept_one" with "-ENOBUFS".
  Later on, when a slot/"path" does become available again
  (e.g. state transitioned to "RDS_CONN_DOWN",
   or HS extension header was received with "c_npaths > 1")
  we call "rds_tcp_conn_slots_available", which simply re-issues
  the "rds_tcp_accept_one" worker callback. That picks up the
  stashed socket from "rds_tcp_accepted_sock" and thereby
  continues where it left off with "-ENOBUFS" last time.
  Since a new slot has become available, those messages
  won't be lost: processing proceeds as if that slot
  had been available the first time around (see the
  sketch after this list).
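
Schematically, the accepting side now works like this (a sketch with
the peer-address comparison and error handling elided; the full diff
follows):

    mutex_lock(&rtn->rds_tcp_accept_lock);
    new_sock = rtn->rds_tcp_accepted_sock;  /* stashed socket, if any */
    rtn->rds_tcp_accepted_sock = NULL;
    if (!new_sock) {
            /* ... sock_create_lite() + ->accept() as before ... */
    }
    rs_tcp = rds_tcp_accept_one_path(conn);
    if (!rs_tcp) {
            /* No free slot: keep the socket and its queued data
             * for the retry triggered by conn_slots_available.
             */
            rtn->rds_tcp_accepted_sock = new_sock;
            new_sock = NULL;        /* don't sock_release() it */
            ret = -ENOBUFS;
    }
    mutex_unlock(&rtn->rds_tcp_accept_lock);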

Signed-off-by: Gerd Rausch <gerd.rausch@oracle.com>
Signed-off-by: Jack Vogel <jack.vogel@oracle.com>
Signed-off-by: Allison Henderson <allison.henderson@oracle.com>
---
 net/rds/connection.c |   4 ++
 net/rds/rds.h        |  65 ++++++++++--------
 net/rds/recv.c       |   4 ++
 net/rds/tcp.c        |  27 +++-----
 net/rds/tcp.h        |  22 +++++-
 net/rds/tcp_listen.c | 156 ++++++++++++++++++++++++++++++-------------
 6 files changed, 186 insertions(+), 92 deletions(-)

diff --git a/net/rds/connection.c b/net/rds/connection.c
index cfea54740219..05adbcd3c7b3 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -435,6 +435,10 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
 	 * to the conn hash, so we never trigger a reconnect on this
 	 * conn - the reconnect is always triggered by the active peer. */
 	cancel_delayed_work_sync(&cp->cp_conn_w);
+
+	if (conn->c_trans->conn_slots_available)
+		conn->c_trans->conn_slots_available(conn);
+
 	rcu_read_lock();
 	if (!hlist_unhashed(&conn->c_hash_node)) {
 		rcu_read_unlock();
diff --git a/net/rds/rds.h b/net/rds/rds.h
index d8d3abb2d5a6..0d66d4f4cc63 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -510,33 +510,6 @@ struct rds_notifier {
  */
 #define	RDS_TRANS_LOOP	3
 
-/**
- * struct rds_transport -  transport specific behavioural hooks
- *
- * @xmit: .xmit is called by rds_send_xmit() to tell the transport to send
- *        part of a message.  The caller serializes on the send_sem so this
- *        doesn't need to be reentrant for a given conn.  The header must be
- *        sent before the data payload.  .xmit must be prepared to send a
- *        message with no data payload.  .xmit should return the number of
- *        bytes that were sent down the connection, including header bytes.
- *        Returning 0 tells the caller that it doesn't need to perform any
- *        additional work now.  This is usually the case when the transport has
- *        filled the sending queue for its connection and will handle
- *        triggering the rds thread to continue the send when space becomes
- *        available.  Returning -EAGAIN tells the caller to retry the send
- *        immediately.  Returning -ENOMEM tells the caller to retry the send at
- *        some point in the future.
- *
- * @conn_shutdown: conn_shutdown stops traffic on the given connection.  Once
- *                 it returns the connection can not call rds_recv_incoming().
- *                 This will only be called once after conn_connect returns
- *                 non-zero success and will The caller serializes this with
- *                 the send and connecting paths (xmit_* and conn_*).  The
- *                 transport is responsible for other serialization, including
- *                 rds_recv_incoming().  This is called in process context but
- *                 should try hard not to block.
- */
-
 struct rds_transport {
 	char			t_name[TRANSNAMSIZ];
 	struct list_head	t_item;
@@ -549,10 +522,48 @@ struct rds_transport {
 			   __u32 scope_id);
 	int (*conn_alloc)(struct rds_connection *conn, gfp_t gfp);
 	void (*conn_free)(void *data);
+
+	/*
+	 * conn_slots_available is invoked when a previously unavailable
+	 * connection slot becomes available again. rds_tcp_accept_one may
+	 * return -ENOBUFS if rds_tcp_accept_one_path cannot find a free slot,
+	 * stashing the new socket in "rds_tcp_accepted_sock". This hook
+	 * re-issues the accept work, which picks up the stashed socket and
+	 * continues where it left off with "-ENOBUFS" last time, so messages
+	 * already received on the new socket are not discarded.
+	 */
+	void (*conn_slots_available)(struct rds_connection *conn);
 	int (*conn_path_connect)(struct rds_conn_path *cp);
+
+	/*
+	 * conn_shutdown stops traffic on the given connection.  Once
+	 * it returns the connection can not call rds_recv_incoming().
+	 * This will only be called once after conn_connect returns
+	 * non-zero success and will The caller serializes this with
+	 * the send and connecting paths (xmit_* and conn_*).  The
+	 * transport is responsible for other serialization, including
+	 * rds_recv_incoming().  This is called in process context but
+	 * should try hard not to block.
+	 */
 	void (*conn_path_shutdown)(struct rds_conn_path *conn);
 	void (*xmit_path_prepare)(struct rds_conn_path *cp);
 	void (*xmit_path_complete)(struct rds_conn_path *cp);
+
+	/*
+	 * .xmit is called by rds_send_xmit() to tell the transport to send
+	 * part of a message.  The caller serializes on the send_sem so this
+	 * doesn't need to be reentrant for a given conn.  The header must be
+	 * sent before the data payload.  .xmit must be prepared to send a
+	 * message with no data payload.  .xmit should return the number of
+	 * bytes that were sent down the connection, including header bytes.
+	 * Returning 0 tells the caller that it doesn't need to perform any
+	 * additional work now.  This is usually the case when the transport has
+	 * filled the sending queue for its connection and will handle
+	 * triggering the rds thread to continue the send when space becomes
+	 * available.  Returning -EAGAIN tells the caller to retry the send
+	 * immediately.  Returning -ENOMEM tells the caller to retry the send at
+	 * some point in the future.
+	 */
 	int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
 		    unsigned int hdr_off, unsigned int sg, unsigned int off);
 	int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
diff --git a/net/rds/recv.c b/net/rds/recv.c
index 5627f80013f8..c0a89af29d1b 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -230,6 +230,10 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
 	conn->c_npaths = max_t(int, conn->c_npaths, 1);
 	conn->c_ping_triggered = 0;
 	rds_conn_peer_gen_update(conn, new_peer_gen_num);
+
+	if (conn->c_npaths > 1 &&
+	    conn->c_trans->conn_slots_available)
+		conn->c_trans->conn_slots_available(conn);
 }
 
 /* rds_start_mprds() will synchronously start multiple paths when appropriate.
diff --git a/net/rds/tcp.c b/net/rds/tcp.c
index 3cc2f303bf78..31e7425e2da9 100644
--- a/net/rds/tcp.c
+++ b/net/rds/tcp.c
@@ -213,6 +213,8 @@ void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp)
 		sock->sk->sk_data_ready = sock->sk->sk_user_data;
 
 	tc->t_sock = sock;
+	if (!tc->t_rtn)
+		tc->t_rtn = net_generic(sock_net(sock->sk), rds_tcp_netid);
 	tc->t_cpath = cp;
 	tc->t_orig_data_ready = sock->sk->sk_data_ready;
 	tc->t_orig_write_space = sock->sk->sk_write_space;
@@ -378,6 +380,7 @@ static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 		}
 		mutex_init(&tc->t_conn_path_lock);
 		tc->t_sock = NULL;
+		tc->t_rtn = NULL;
 		tc->t_tinc = NULL;
 		tc->t_tinc_hdr_rem = sizeof(struct rds_header);
 		tc->t_tinc_data_rem = 0;
@@ -458,6 +461,7 @@ struct rds_transport rds_tcp_transport = {
 	.recv_path		= rds_tcp_recv_path,
 	.conn_alloc		= rds_tcp_conn_alloc,
 	.conn_free		= rds_tcp_conn_free,
+	.conn_slots_available	= rds_tcp_conn_slots_available,
 	.conn_path_connect	= rds_tcp_conn_path_connect,
 	.conn_path_shutdown	= rds_tcp_conn_path_shutdown,
 	.inc_copy_to_user	= rds_tcp_inc_copy_to_user,
@@ -473,17 +477,7 @@ struct rds_transport rds_tcp_transport = {
 	.t_unloading		= rds_tcp_is_unloading,
 };
 
-static unsigned int rds_tcp_netid;
-
-/* per-network namespace private data for this module */
-struct rds_tcp_net {
-	struct socket *rds_tcp_listen_sock;
-	struct work_struct rds_tcp_accept_w;
-	struct ctl_table_header *rds_tcp_sysctl;
-	struct ctl_table *ctl_table;
-	int sndbuf_size;
-	int rcvbuf_size;
-};
+int rds_tcp_netid;
 
 /* All module specific customizations to the RDS-TCP socket should be done in
  * rds_tcp_tune() and applied after socket creation.
@@ -526,15 +520,12 @@ static void rds_tcp_accept_worker(struct work_struct *work)
 					       struct rds_tcp_net,
 					       rds_tcp_accept_w);
 
-	while (rds_tcp_accept_one(rtn->rds_tcp_listen_sock) == 0)
+	while (rds_tcp_accept_one(rtn) == 0)
 		cond_resched();
 }
 
-void rds_tcp_accept_work(struct sock *sk)
+void rds_tcp_accept_work(struct rds_tcp_net *rtn)
 {
-	struct net *net = sock_net(sk);
-	struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid);
-
 	queue_work(rds_wq, &rtn->rds_tcp_accept_w);
 }
 
@@ -546,6 +537,8 @@ static __net_init int rds_tcp_init_net(struct net *net)
 
 	memset(rtn, 0, sizeof(*rtn));
 
+	mutex_init(&rtn->rds_tcp_accept_lock);
+
 	/* {snd, rcv}buf_size default to 0, which implies we let the
 	 * stack pick the value, and permit auto-tuning of buffer size.
 	 */
@@ -609,6 +602,8 @@ static void rds_tcp_kill_sock(struct net *net)
 
 	rtn->rds_tcp_listen_sock = NULL;
 	rds_tcp_listen_stop(lsock, &rtn->rds_tcp_accept_w);
+	if (rtn->rds_tcp_accepted_sock)
+		sock_release(rtn->rds_tcp_accepted_sock);
 	spin_lock_irq(&rds_tcp_conn_lock);
 	list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) {
 		struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net);
diff --git a/net/rds/tcp.h b/net/rds/tcp.h
index 053aa7da87ef..2000f4acd57a 100644
--- a/net/rds/tcp.h
+++ b/net/rds/tcp.h
@@ -4,6 +4,21 @@
 
 #define RDS_TCP_PORT	16385
 
+/* per-network namespace private data for this module */
+struct rds_tcp_net {
+	/* serialize "rds_tcp_accept_one" with "rds_tcp_accept_lock"
+	 * to protect "rds_tcp_accepted_sock"
+	 */
+	struct mutex		rds_tcp_accept_lock;
+	struct socket		*rds_tcp_listen_sock;
+	struct socket		*rds_tcp_accepted_sock;
+	struct work_struct	rds_tcp_accept_w;
+	struct ctl_table_header	*rds_tcp_sysctl;
+	struct ctl_table	*ctl_table;
+	int			sndbuf_size;
+	int			rcvbuf_size;
+};
+
 struct rds_tcp_incoming {
 	struct rds_incoming	ti_inc;
 	struct sk_buff_head	ti_skb_list;
@@ -19,6 +34,7 @@ struct rds_tcp_connection {
 	 */
 	struct mutex		t_conn_path_lock;
 	struct socket		*t_sock;
+	struct rds_tcp_net	*t_rtn;
 	void			*t_orig_write_space;
 	void			*t_orig_data_ready;
 	void			*t_orig_state_change;
@@ -49,6 +65,7 @@ struct rds_tcp_statistics {
 };
 
 /* tcp.c */
+extern int rds_tcp_netid;
 bool rds_tcp_tune(struct socket *sock);
 void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp);
 void rds_tcp_reset_callbacks(struct socket *sock, struct rds_conn_path *cp);
@@ -57,7 +74,7 @@ void rds_tcp_restore_callbacks(struct socket *sock,
 u32 rds_tcp_write_seq(struct rds_tcp_connection *tc);
 u32 rds_tcp_snd_una(struct rds_tcp_connection *tc);
 extern struct rds_transport rds_tcp_transport;
-void rds_tcp_accept_work(struct sock *sk);
+void rds_tcp_accept_work(struct rds_tcp_net *rtn);
 int rds_tcp_laddr_check(struct net *net, const struct in6_addr *addr,
 			__u32 scope_id);
 /* tcp_connect.c */
@@ -69,7 +86,8 @@ void rds_tcp_state_change(struct sock *sk);
 struct socket *rds_tcp_listen_init(struct net *net, bool isv6);
 void rds_tcp_listen_stop(struct socket *sock, struct work_struct *acceptor);
 void rds_tcp_listen_data_ready(struct sock *sk);
-int rds_tcp_accept_one(struct socket *sock);
+void rds_tcp_conn_slots_available(struct rds_connection *conn);
+int rds_tcp_accept_one(struct rds_tcp_net *rtn);
 void rds_tcp_keepalive(struct socket *sock);
 void *rds_tcp_listen_sock_def_readable(struct net *net);
 
diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
index fced9e286f79..78d4990b2086 100644
--- a/net/rds/tcp_listen.c
+++ b/net/rds/tcp_listen.c
@@ -35,6 +35,8 @@
 #include <linux/in.h>
 #include <net/tcp.h>
 #include <trace/events/sock.h>
+#include <net/net_namespace.h>
+#include <net/netns/generic.h>
 
 #include "rds.h"
 #include "tcp.h"
@@ -66,32 +68,46 @@ struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn)
 	int i;
 	int npaths = max_t(int, 1, conn->c_npaths);
 
-	/* for mprds, all paths MUST be initiated by the peer
-	 * with the smaller address.
-	 */
-	if (rds_addr_cmp(&conn->c_faddr, &conn->c_laddr) >= 0) {
-		/* Make sure we initiate at least one path if this
-		 * has not already been done; rds_start_mprds() will
-		 * take care of additional paths, if necessary.
-		 */
-		if (npaths == 1)
-			rds_conn_path_connect_if_down(&conn->c_path[0]);
-		return NULL;
-	}
-
 	for (i = 0; i < npaths; i++) {
 		struct rds_conn_path *cp = &conn->c_path[i];
 
 		if (rds_conn_path_transition(cp, RDS_CONN_DOWN,
-					     RDS_CONN_CONNECTING)) {
+					     RDS_CONN_CONNECTING))
 			return cp->cp_transport_data;
-		}
 	}
 	return NULL;
 }
 
-int rds_tcp_accept_one(struct socket *sock)
+void rds_tcp_conn_slots_available(struct rds_connection *conn)
+{
+	struct rds_tcp_connection *tc;
+	struct rds_tcp_net *rtn;
+
+	if (rds_destroy_pending(conn))
+		return;
+
+	tc = conn->c_path->cp_transport_data;
+	rtn = tc->t_rtn;
+	if (!rtn)
+		return;
+
+	/* As soon as a connection went down,
+	 * it is safe to schedule a "rds_tcp_accept_one"
+	 * attempt even if there are no connections pending:
+	 * Function "rds_tcp_accept_one" won't block
+	 * but simply return -EAGAIN in that case.
+	 *
+	 * Doing so is necessary to address the case where an
+	 * incoming connection on "rds_tcp_listen_sock" is ready
+	 * to be accepted prior to a free slot being available:
+	 * the -ENOBUFS case in "rds_tcp_accept_one".
+	 */
+	rds_tcp_accept_work(rtn);
+}
+
+int rds_tcp_accept_one(struct rds_tcp_net *rtn)
 {
+	struct socket *listen_sock = rtn->rds_tcp_listen_sock;
 	struct socket *new_sock = NULL;
 	struct rds_connection *conn;
 	int ret;
@@ -109,33 +125,40 @@ int rds_tcp_accept_one(struct socket *sock)
 #endif
 	int dev_if = 0;
 
-	if (!sock) /* module unload or netns delete in progress */
+	if (!listen_sock)
 		return -ENETUNREACH;
 
-	ret = sock_create_lite(sock->sk->sk_family,
-			       sock->sk->sk_type, sock->sk->sk_protocol,
-			       &new_sock);
-	if (ret)
-		goto out;
-
-	ret = sock->ops->accept(sock, new_sock, &arg);
-	if (ret < 0)
-		goto out;
-
-	/* sock_create_lite() does not get a hold on the owner module so we
-	 * need to do it here.  Note that sock_release() uses sock->ops to
-	 * determine if it needs to decrement the reference count.  So set
-	 * sock->ops after calling accept() in case that fails.  And there's
-	 * no need to do try_module_get() as the listener should have a hold
-	 * already.
-	 */
-	new_sock->ops = sock->ops;
-	__module_get(new_sock->ops->owner);
+	mutex_lock(&rtn->rds_tcp_accept_lock);
+	new_sock = rtn->rds_tcp_accepted_sock;
+	rtn->rds_tcp_accepted_sock = NULL;
+
+	if (!new_sock) {
+		ret = sock_create_lite(listen_sock->sk->sk_family,
+				       listen_sock->sk->sk_type,
+				       listen_sock->sk->sk_protocol,
+				       &new_sock);
+		if (ret)
+			goto out;
+
+		ret = listen_sock->ops->accept(listen_sock, new_sock, &arg);
+		if (ret < 0)
+			goto out;
+
+		/* sock_create_lite() does not get a hold on the owner module so we
+		 * need to do it here.  Note that sock_release() uses sock->ops to
+		 * determine if it needs to decrement the reference count.  So set
+		 * sock->ops after calling accept() in case that fails.  And there's
+		 * no need to do try_module_get() as the listener should have a hold
+		 * already.
+		 */
+		new_sock->ops = listen_sock->ops;
+		__module_get(new_sock->ops->owner);
 
-	rds_tcp_keepalive(new_sock);
-	if (!rds_tcp_tune(new_sock)) {
-		ret = -EINVAL;
-		goto out;
+		rds_tcp_keepalive(new_sock);
+		if (!rds_tcp_tune(new_sock)) {
+			ret = -EINVAL;
+			goto out;
+		}
 	}
 
 	inet = inet_sk(new_sock->sk);
@@ -150,7 +173,7 @@ int rds_tcp_accept_one(struct socket *sock)
 	peer_addr = &daddr;
 #endif
 	rdsdebug("accepted family %d tcp %pI6c:%u -> %pI6c:%u\n",
-		 sock->sk->sk_family,
+		 listen_sock->sk->sk_family,
 		 my_addr, ntohs(inet->inet_sport),
 		 peer_addr, ntohs(inet->inet_dport));
 
@@ -170,13 +193,13 @@ int rds_tcp_accept_one(struct socket *sock)
 	}
 #endif
 
-	if (!rds_tcp_laddr_check(sock_net(sock->sk), peer_addr, dev_if)) {
+	if (!rds_tcp_laddr_check(sock_net(listen_sock->sk), peer_addr, dev_if)) {
 		/* local address connection is only allowed via loopback */
 		ret = -EOPNOTSUPP;
 		goto out;
 	}
 
-	conn = rds_conn_create(sock_net(sock->sk),
+	conn = rds_conn_create(sock_net(listen_sock->sk),
 			       my_addr, peer_addr,
 			       &rds_tcp_transport, 0, GFP_KERNEL, dev_if);
 
@@ -189,15 +212,51 @@ int rds_tcp_accept_one(struct socket *sock)
 	 * If the client reboots, this conn will need to be cleaned up.
 	 * rds_tcp_state_change() will do that cleanup
 	 */
-	rs_tcp = rds_tcp_accept_one_path(conn);
-	if (!rs_tcp)
+	if (rds_addr_cmp(&conn->c_faddr, &conn->c_laddr) < 0) {
+		/* Try to obtain a free connection slot.
+		 * If unsuccessful, we need to preserve "new_sock"
+		 * that we just accepted, since its "sk_receive_queue"
+		 * may contain messages already that have been acknowledged
+		 * to and discarded by the sender.
+		 * We must not throw those away!
+		 */
+		rs_tcp = rds_tcp_accept_one_path(conn);
+		if (!rs_tcp) {
+			/* It's okay to stash "new_sock", since
+			 * "rds_tcp_conn_slots_available" triggers "rds_tcp_accept_one"
+			 * again as soon as one of the connection slots
+			 * becomes available again
+			 */
+			rtn->rds_tcp_accepted_sock = new_sock;
+			new_sock = NULL;
+			ret = -ENOBUFS;
+			goto out;
+		}
+	} else {
+		/* This connection request came from a peer with
+		 * a larger address.
+		 * Function "rds_tcp_state_change" makes sure
+		 * that the connection doesn't transition
+		 * to state "RDS_CONN_UP", and therefore
+		 * we should not have received any messages
+		 * on this socket yet.
+		 * This is the only case where it's okay to
+		 * not dequeue messages from "sk_receive_queue".
+		 */
+		if (conn->c_npaths <= 1)
+			rds_conn_path_connect_if_down(&conn->c_path[0]);
+		rs_tcp = NULL;
 		goto rst_nsk;
+	}
+
 	mutex_lock(&rs_tcp->t_conn_path_lock);
 	cp = rs_tcp->t_cpath;
 	conn_state = rds_conn_path_state(cp);
 	WARN_ON(conn_state == RDS_CONN_UP);
-	if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_ERROR)
+	if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_ERROR) {
+		rds_conn_path_drop(cp, 0);
 		goto rst_nsk;
+	}
 	if (rs_tcp->t_sock) {
 		/* Duelling SYN has been handled in rds_tcp_accept_one() */
 		rds_tcp_reset_callbacks(new_sock, cp);
@@ -227,6 +286,9 @@ int rds_tcp_accept_one(struct socket *sock)
 		mutex_unlock(&rs_tcp->t_conn_path_lock);
 	if (new_sock)
 		sock_release(new_sock);
+
+	mutex_unlock(&rtn->rds_tcp_accept_lock);
+
 	return ret;
 }
 
@@ -254,7 +316,7 @@ void rds_tcp_listen_data_ready(struct sock *sk)
 	 * the listen socket is being torn down.
 	 */
 	if (sk->sk_state == TCP_LISTEN)
-		rds_tcp_accept_work(sk);
+		rds_tcp_accept_work(net_generic(sock_net(sk), rds_tcp_netid));
 	else
 		ready = rds_tcp_listen_sock_def_readable(sock_net(sk));
 
-- 
2.43.0



* [PATCH v2 6/8] net/rds: Encode cp_index in TCP source port
  2025-04-11 18:01 [PATCH v2 0/8] RDS bug fix collection allison.henderson
                   ` (4 preceding siblings ...)
  2025-04-11 18:02 ` [PATCH v2 5/8] net/rds: rds_tcp_accept_one ought to not discard messages allison.henderson
@ 2025-04-11 18:02 ` allison.henderson
  2025-04-11 18:02 ` [PATCH v2 7/8] net/rds: rds_tcp_conn_path_shutdown must not discard messages allison.henderson
  2025-04-11 18:02 ` [PATCH v2 8/8] net/rds: Kick-start TCP receiver after accept allison.henderson
  7 siblings, 0 replies; 16+ messages in thread
From: allison.henderson @ 2025-04-11 18:02 UTC (permalink / raw)
  To: netdev

From: Gerd Rausch <gerd.rausch@oracle.com>

Upon "sendmsg", RDS/TCP selects a backend connection based
on a hash calculated from the source-port ("RDS_MPATH_HASH").

However, "rds_tcp_accept_one" accepts connections
in the order they arrive, which is non-deterministic.

Therefore the mapping of the sender's "cp->cp_index"
to that of the receiver changes if the backend
connections are dropped and reconnected.

However, connection state that's preserved across reconnects
(e.g. "cp_next_rx_seq") relies on that sender<->receiver
mapping to never change.

So we make sure that client and server of the TCP connection
have the exact same "cp->cp_index" across reconnects by
encoding "cp->cp_index" in the lower three bits of the
client's TCP source port.

A new extension "RDS_EXTHDR_SPORT_IDX" is introduced
that allows the server to tell the difference between
clients that do the "cp->cp_index" encoding, and
legacy clients that pick source ports randomly.
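
As a worked example (assuming the default ip_local_port_range of
32768..60999 and RDS_MPATH_WORKERS == 8):

    /* Client: port_low is rounded up to a multiple of
     * RDS_MPATH_WORKERS, so ALIGN(32768, 8) == 32768 and
     *
     *     port = port_low + group * 8 + cp_index;
     *
     * cp_index 3 in port group 0 binds source port 32771.
     * Server: once RDS_EXTHDR_SPORT_IDX has been negotiated,
     *
     *     i = ntohs(sport) % npaths;      (32771 % 8 == 3)
     *
     * recovers the same index, across any number of reconnects.
     */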

Signed-off-by: Gerd Rausch <gerd.rausch@oracle.com>
Signed-off-by: Allison Henderson <allison.henderson@oracle.com>
---
 net/rds/message.c     |  1 +
 net/rds/rds.h         |  3 +++
 net/rds/recv.c        |  7 +++++++
 net/rds/send.c        |  5 +++++
 net/rds/tcp.h         |  1 +
 net/rds/tcp_connect.c | 22 ++++++++++++++++++++-
 net/rds/tcp_listen.c  | 45 +++++++++++++++++++++++++++++++++++++------
 7 files changed, 77 insertions(+), 7 deletions(-)

diff --git a/net/rds/message.c b/net/rds/message.c
index 7af59d2443e5..31990ac4f3ef 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -46,6 +46,7 @@ static unsigned int	rds_exthdr_size[__RDS_EXTHDR_MAX] = {
 [RDS_EXTHDR_RDMA_DEST]	= sizeof(struct rds_ext_header_rdma_dest),
 [RDS_EXTHDR_NPATHS]	= sizeof(u16),
 [RDS_EXTHDR_GEN_NUM]	= sizeof(u32),
+[RDS_EXTHDR_SPORT_IDX]  = 1,
 };
 
 void rds_message_addref(struct rds_message *rm)
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 0d66d4f4cc63..89235873eb71 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -151,6 +151,7 @@ struct rds_connection {
 				c_ping_triggered:1,
 				c_pad_to_32:29;
 	int			c_npaths;
+	bool			c_with_sport_idx;
 	struct rds_connection	*c_passive;
 	struct rds_transport	*c_trans;
 
@@ -267,8 +268,10 @@ struct rds_ext_header_rdma_dest {
  */
 #define RDS_EXTHDR_NPATHS	5
 #define RDS_EXTHDR_GEN_NUM	6
+#define RDS_EXTHDR_SPORT_IDX    8
 
 #define __RDS_EXTHDR_MAX	16 /* for now */
+
 #define RDS_RX_MAX_TRACES	(RDS_MSG_RX_DGRAM_TRACE_MAX + 1)
 #define	RDS_MSG_RX_HDR		0
 #define	RDS_MSG_RX_START	1
diff --git a/net/rds/recv.c b/net/rds/recv.c
index c0a89af29d1b..f33b4904073d 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -204,7 +204,9 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
 		struct rds_ext_header_version version;
 		u16 rds_npaths;
 		u32 rds_gen_num;
+		u8     dummy;
 	} buffer;
+	bool new_with_sport_idx = false;
 	u32 new_peer_gen_num = 0;
 
 	while (1) {
@@ -221,11 +223,16 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
 		case RDS_EXTHDR_GEN_NUM:
 			new_peer_gen_num = be32_to_cpu(buffer.rds_gen_num);
 			break;
+		case RDS_EXTHDR_SPORT_IDX:
+			new_with_sport_idx = true;
+			break;
 		default:
 			pr_warn_ratelimited("ignoring unknown exthdr type "
 					     "0x%x\n", type);
 		}
 	}
+
+	conn->c_with_sport_idx = new_with_sport_idx;
 	/* if RDS_EXTHDR_NPATHS was not found, default to a single-path */
 	conn->c_npaths = max_t(int, conn->c_npaths, 1);
 	conn->c_ping_triggered = 0;
diff --git a/net/rds/send.c b/net/rds/send.c
index 44a89d6c27a0..e173c6bcf7d5 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -1456,6 +1456,7 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
 	    cp->cp_conn->c_trans->t_mp_capable) {
 		u16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
 		u32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);
+		u8 dummy = 0;
 
 		rds_message_add_extension(&rm->m_inc.i_hdr,
 					  RDS_EXTHDR_NPATHS, &npaths,
@@ -1464,6 +1465,10 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
 					  RDS_EXTHDR_GEN_NUM,
 					  &my_gen_num,
 					  sizeof(u32));
+		rds_message_add_extension(&rm->m_inc.i_hdr,
+					  RDS_EXTHDR_SPORT_IDX,
+					  &dummy,
+					  sizeof(u8));
 	}
 	spin_unlock_irqrestore(&cp->cp_lock, flags);
 
diff --git a/net/rds/tcp.h b/net/rds/tcp.h
index 2000f4acd57a..3beb0557104e 100644
--- a/net/rds/tcp.h
+++ b/net/rds/tcp.h
@@ -34,6 +34,7 @@ struct rds_tcp_connection {
 	 */
 	struct mutex		t_conn_path_lock;
 	struct socket		*t_sock;
+	u32			t_client_port_group;
 	struct rds_tcp_net	*t_rtn;
 	void			*t_orig_write_space;
 	void			*t_orig_data_ready;
diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c
index a0046e99d6df..6b9d4776e504 100644
--- a/net/rds/tcp_connect.c
+++ b/net/rds/tcp_connect.c
@@ -93,6 +93,8 @@ int rds_tcp_conn_path_connect(struct rds_conn_path *cp)
 	struct sockaddr_in6 sin6;
 	struct sockaddr_in sin;
 	struct sockaddr *addr;
+	int port_low, port_high, port;
+	int port_groups, groups_left;
 	int addrlen;
 	bool isv6;
 	int ret;
@@ -145,7 +147,25 @@ int rds_tcp_conn_path_connect(struct rds_conn_path *cp)
 		addrlen = sizeof(sin);
 	}
 
-	ret = kernel_bind(sock, addr, addrlen);
+	/* encode cp->cp_index in lowest bits of source-port */
+	inet_get_local_port_range(rds_conn_net(conn), &port_low, &port_high);
+	port_low = ALIGN(port_low, RDS_MPATH_WORKERS);
+	port_groups = (port_high - port_low + 1) / RDS_MPATH_WORKERS;
+	ret = -EADDRINUSE;
+	groups_left = port_groups;
+	while (groups_left-- > 0 && ret) {
+		if (++tc->t_client_port_group >= port_groups)
+			tc->t_client_port_group = 0;
+		port =  port_low +
+			tc->t_client_port_group * RDS_MPATH_WORKERS +
+			cp->cp_index;
+
+		if (isv6)
+			sin6.sin6_port = htons(port);
+		else
+			sin.sin_port = htons(port);
+		ret = sock->ops->bind(sock, addr, addrlen);
+	}
 	if (ret) {
 		rdsdebug("bind failed with %d at address %pI6c\n",
 			 ret, &conn->c_laddr);
diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
index 78d4990b2086..30146204dc6c 100644
--- a/net/rds/tcp_listen.c
+++ b/net/rds/tcp_listen.c
@@ -62,19 +62,52 @@ void rds_tcp_keepalive(struct socket *sock)
  * we special case cp_index 0 is to allow the rds probe ping itself to itself
  * get through efficiently.
  */
-static
-struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn)
+static struct rds_tcp_connection *
+rds_tcp_accept_one_path(struct rds_connection *conn, struct socket *sock)
 {
-	int i;
-	int npaths = max_t(int, 1, conn->c_npaths);
+	union {
+		struct sockaddr_storage storage;
+		struct sockaddr addr;
+		struct sockaddr_in sin;
+		struct sockaddr_in6 sin6;
+	} saddr;
+	int sport, npaths, i_min, i_max, i;
+
+	if (conn->c_with_sport_idx &&
+	    kernel_getpeername(sock, &saddr.addr) == 0) {
+		/* cp->cp_index is encoded in lowest bits of source-port */
+		switch (saddr.addr.sa_family) {
+		case AF_INET:
+			sport = ntohs(saddr.sin.sin_port);
+			break;
+		case AF_INET6:
+			sport = ntohs(saddr.sin6.sin6_port);
+			break;
+		default:
+			sport = -1;
+		}
+	} else {
+		sport = -1;
+	}
+
+	npaths = max_t(int, 1, conn->c_npaths);
 
-	for (i = 0; i < npaths; i++) {
+	if (sport >= 0) {
+		i_min = sport % npaths;
+		i_max = i_min;
+	} else {
+		i_min = 0;
+		i_max = npaths - 1;
+	}
+
+	for (i = i_min; i <= i_max; i++) {
 		struct rds_conn_path *cp = &conn->c_path[i];
 
 		if (rds_conn_path_transition(cp, RDS_CONN_DOWN,
 					     RDS_CONN_CONNECTING))
 			return cp->cp_transport_data;
 	}
+
 	return NULL;
 }
 
@@ -220,7 +253,7 @@ int rds_tcp_accept_one(struct rds_tcp_net *rtn)
 		 * to and discarded by the sender.
 		 * We must not throw those away!
 		 */
-		rs_tcp = rds_tcp_accept_one_path(conn);
+		rs_tcp = rds_tcp_accept_one_path(conn, new_sock);
 		if (!rs_tcp) {
 			/* It's okay to stash "new_sock", since
 			 * "rds_tcp_conn_slots_available" triggers "rds_tcp_accept_one"
-- 
2.43.0



* [PATCH v2 7/8] net/rds: rds_tcp_conn_path_shutdown must not discard messages
  2025-04-11 18:01 [PATCH v2 0/8] RDS bug fix collection allison.henderson
                   ` (5 preceding siblings ...)
  2025-04-11 18:02 ` [PATCH v2 6/8] net/rds: Encode cp_index in TCP source port allison.henderson
@ 2025-04-11 18:02 ` allison.henderson
  2025-04-17  9:47   ` Simon Horman
  2025-04-11 18:02 ` [PATCH v2 8/8] net/rds: Kick-start TCP receiver after accept allison.henderson
  7 siblings, 1 reply; 16+ messages in thread
From: allison.henderson @ 2025-04-11 18:02 UTC (permalink / raw)
  To: netdev

From: Gerd Rausch <gerd.rausch@oracle.com>

RDS/TCP differs from RDS/RDMA in that message acknowledgment
is done based on TCP sequence numbers:
As soon as the last byte of a message has been acknowledged by the
TCP stack of a peer, rds_tcp_write_space() goes on to discard
prior messages from the send queue.

Which is fine, for as long as the receiver never throws any messages
away.

The dequeuing of messages in RDS/TCP is done either from the
"sk_data_ready" callback pointing to rds_tcp_data_ready()
(the most common case), or from the receive worker pointing
to rds_tcp_recv_path() which is called for as long as the
connection is "RDS_CONN_UP".

However, as soon as rds_conn_path_drop() is called for whatever reason,
including "DR_USER_RESET", "cp_state" transitions to "RDS_CONN_ERROR",
and rds_tcp_restore_callbacks() ends up restoring the callbacks
and thereby disabling message receipt.

So messages already acknowledged to the sender were dropped.

Furthermore, the "->shutdown" callback was always called
with an invalid parameter ("RCV_SHUTDOWN | SEND_SHUTDOWN == 3"),
instead of the correct pre-increment value ("SHUT_RDWR == 2").
inet_shutdown() returns "-EINVAL" in such cases, rendering
this call a NOOP.

So we change rds_tcp_conn_path_shutdown() to do the proper
"->shutdown(SHUT_WR)" call in order to signal EOF to the peer
and make it transition to "TCP_CLOSE_WAIT" (RFC 793).

This should make the peer also enter rds_tcp_conn_path_shutdown()
and do the same.

This allows us to dequeue all messages already received
and acknowledged to the peer.
We do so, until we know that the receive queue no longer has data
(skb_queue_empty()) and that we couldn't have any data
in flight anymore, because the socket transitioned to
any of the states "CLOSING", "TIME_WAIT", "CLOSE_WAIT",
"LAST_ACK", or "CLOSE" (RFC 793).

However, if we do just that, we suddenly see duplicate RDS
messages being delivered to the application.
So what gives?

Turns out that with MPRDS and its multitude of backend connections,
retransmitted messages ("RDS_FLAG_RETRANSMITTED") can outrace
the dequeuing of their original counterparts.

And the duplicate check implemented in rds_recv_local() only
discards duplicates if the "RDS_FLAG_RETRANSMITTED" flag is set.

Rather curious, because a duplicate is a duplicate; it shouldn't
matter which copy is looked at and delivered first.
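
The check in question looks roughly like this (paraphrased from
rds_recv_local() in net/rds/recv.c):

	/* Only copies carrying RDS_FLAG_RETRANSMITTED are treated
	 * as duplicates.  If the retransmitted copy wins the race,
	 * the original arrives late without the flag set and is
	 * delivered a second time.
	 */
	if (be64_to_cpu(inc->i_hdr.h_sequence) < cp->cp_next_rx_seq &&
	    (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
		rds_stats_inc(s_recv_drop_old_seq);
		goto out;
	}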

To avoid this entire situation, in recognition that we can't fix
all of this brokenness in a single commit, we simply make
the sender discard messages from the send-queue right from
within rds_tcp_conn_path_shutdown().
Just as rds_tcp_write_space() would have done, had it been
called in time or at all.

This makes sure that we no longer have messages that we know
the receiver already dequeued sitting in our send-queue,
and therefore avoid the entire "RDS_FLAG_RETRANSMITTED" fiasco.

Now we have gotten rid of the duplicate RDS message delivery, but we
still run into cases where RDS messages are dropped.

This time it is due to the delayed setting of the socket-callbacks
in rds_tcp_accept_one() via either rds_tcp_reset_callbacks()
or rds_tcp_set_callbacks().

By the time rds_tcp_accept_one() gets there, the socket
may already have transitioned into state "TCP_CLOSE_WAIT",
but rds_tcp_state_change() was never called.

Subsequently, "->shutdown(SHUT_WR)" did not happen either.
So the peer ends up getting stuck in state "TCP_FIN_WAIT2".

We fix that by checking for states "TCP_CLOSE_WAIT", "TCP_LAST_ACK",
or "TCP_CLOSE", and dropping the freshly accepted socket in that case.

This problem is observable by running "rds-stress --reset"
frequently on either side of an RDS connection, or on both,
while other "rds-stress" processes are exchanging data.
Those "rds-stress" processes reported out-of-sequence
errors, with the expected sequence number being smaller
than the one actually received (due to the dropped messages).

Signed-off-by: Gerd Rausch <gerd.rausch@oracle.com>
Signed-off-by: Allison Henderson <allison.henderson@oracle.com>
---
 net/rds/tcp.c         |  1 +
 net/rds/tcp.h         |  4 ++++
 net/rds/tcp_connect.c | 46 ++++++++++++++++++++++++++++++++++++++++++-
 net/rds/tcp_listen.c  | 14 +++++++++++++
 net/rds/tcp_recv.c    |  4 ++++
 net/rds/tcp_send.c    |  2 +-
 6 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/net/rds/tcp.c b/net/rds/tcp.c
index 31e7425e2da9..45484a93d75f 100644
--- a/net/rds/tcp.c
+++ b/net/rds/tcp.c
@@ -384,6 +384,7 @@ static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 		tc->t_tinc = NULL;
 		tc->t_tinc_hdr_rem = sizeof(struct rds_header);
 		tc->t_tinc_data_rem = 0;
+		init_waitqueue_head(&tc->t_recv_done_waitq);
 
 		conn->c_path[i].cp_transport_data = tc;
 		tc->t_cpath = &conn->c_path[i];
diff --git a/net/rds/tcp.h b/net/rds/tcp.h
index 3beb0557104e..1893bc4bd342 100644
--- a/net/rds/tcp.h
+++ b/net/rds/tcp.h
@@ -55,6 +55,9 @@ struct rds_tcp_connection {
 	u32			t_last_sent_nxt;
 	u32			t_last_expected_una;
 	u32			t_last_seen_una;
+
+	/* for rds_tcp_conn_path_shutdown */
+	wait_queue_head_t	t_recv_done_waitq;
 };
 
 struct rds_tcp_statistics {
@@ -105,6 +108,7 @@ void rds_tcp_xmit_path_prepare(struct rds_conn_path *cp);
 void rds_tcp_xmit_path_complete(struct rds_conn_path *cp);
 int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
 		 unsigned int hdr_off, unsigned int sg, unsigned int off);
+int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack);
 void rds_tcp_write_space(struct sock *sk);
 
 /* tcp_stats.c */
diff --git a/net/rds/tcp_connect.c b/net/rds/tcp_connect.c
index 6b9d4776e504..f832cdfe149b 100644
--- a/net/rds/tcp_connect.c
+++ b/net/rds/tcp_connect.c
@@ -75,8 +75,16 @@ void rds_tcp_state_change(struct sock *sk)
 			rds_connect_path_complete(cp, RDS_CONN_CONNECTING);
 		}
 		break;
+	case TCP_CLOSING:
+	case TCP_TIME_WAIT:
+		if (wq_has_sleeper(&tc->t_recv_done_waitq))
+			wake_up(&tc->t_recv_done_waitq);
+		break;
 	case TCP_CLOSE_WAIT:
+	case TCP_LAST_ACK:
 	case TCP_CLOSE:
+		if (wq_has_sleeper(&tc->t_recv_done_waitq))
+			wake_up(&tc->t_recv_done_waitq);
 		rds_conn_path_drop(cp, false);
 		break;
 	default:
@@ -225,6 +233,7 @@ void rds_tcp_conn_path_shutdown(struct rds_conn_path *cp)
 {
 	struct rds_tcp_connection *tc = cp->cp_transport_data;
 	struct socket *sock = tc->t_sock;
+	unsigned int rounds;
 
 	rdsdebug("shutting down conn %p tc %p sock %p\n",
 		 cp->cp_conn, tc, sock);
@@ -232,8 +241,43 @@ void rds_tcp_conn_path_shutdown(struct rds_conn_path *cp)
 	if (sock) {
 		if (rds_destroy_pending(cp->cp_conn))
 			sock_no_linger(sock->sk);
-		sock->ops->shutdown(sock, RCV_SHUTDOWN | SEND_SHUTDOWN);
+
+		sock->ops->shutdown(sock, SHUT_WR);
+
+		/* after sending FIN,
+		 * wait until we have processed all incoming messages
+		 * and we're sure that there won't be any more:
+		 * i.e. state CLOSING, TIME_WAIT, CLOSE_WAIT,
+		 * LAST_ACK, or CLOSE (RFC 793).
+		 *
+		 * Give up waiting after 5 seconds and allow messages
+		 * to theoretically get dropped, if the TCP transition
+		 * didn't happen.
+		 */
+		rounds = 0;
+		do {
+			/* we need to ensure messages are dequeued here
+			 * since "rds_recv_worker" only dispatches messages
+			 * while the connection is still in RDS_CONN_UP
+			 * and there is no guarantee that "rds_tcp_data_ready"
+			 * was called nor that "sk_data_ready" still points to it.
+			 */
+			rds_tcp_recv_path(cp);
+		} while (!wait_event_timeout(tc->t_recv_done_waitq,
+					     (sock->sk->sk_state == TCP_CLOSING ||
+					      sock->sk->sk_state == TCP_TIME_WAIT ||
+					      sock->sk->sk_state == TCP_CLOSE_WAIT ||
+					      sock->sk->sk_state == TCP_LAST_ACK ||
+					      sock->sk->sk_state == TCP_CLOSE) &&
+					     skb_queue_empty_lockless(&sock->sk->sk_receive_queue),
+					     msecs_to_jiffies(100)) &&
+			 ++rounds < 50);
 		lock_sock(sock->sk);
+
+		/* discard messages that the peer received already */
+		tc->t_last_seen_una = rds_tcp_snd_una(tc);
+		rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), rds_tcp_is_acked);
+
 		rds_tcp_restore_callbacks(sock, tc); /* tc->tc_sock = NULL */
 
 		release_sock(sock->sk);
diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
index 30146204dc6c..a9596440a456 100644
--- a/net/rds/tcp_listen.c
+++ b/net/rds/tcp_listen.c
@@ -299,6 +299,20 @@ int rds_tcp_accept_one(struct rds_tcp_net *rtn)
 		rds_tcp_set_callbacks(new_sock, cp);
 		rds_connect_path_complete(cp, RDS_CONN_CONNECTING);
 	}
+
+	/* Since "rds_tcp_set_callbacks" happens this late
+	 * the connection may already have been closed without
+	 * "rds_tcp_state_change" doing its due dilligence.
+	 *
+	 * If that's the case, we simply drop the path,
+	 * knowing that "rds_tcp_conn_path_shutdown" will
+	 * dequeue pending messages.
+	 */
+	if (new_sock->sk->sk_state == TCP_CLOSE_WAIT ||
+	    new_sock->sk->sk_state == TCP_LAST_ACK ||
+	    new_sock->sk->sk_state == TCP_CLOSE)
+		rds_conn_path_drop(cp, 0);
+
 	new_sock = NULL;
 	ret = 0;
 	if (conn->c_npaths == 0)
diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c
index b7cf7f451430..49f96ee0c40f 100644
--- a/net/rds/tcp_recv.c
+++ b/net/rds/tcp_recv.c
@@ -278,6 +278,10 @@ static int rds_tcp_read_sock(struct rds_conn_path *cp, gfp_t gfp)
 	rdsdebug("tcp_read_sock for tc %p gfp 0x%x returned %d\n", tc, gfp,
 		 desc.error);
 
+	if (skb_queue_empty_lockless(&sock->sk->sk_receive_queue) &&
+	    wq_has_sleeper(&tc->t_recv_done_waitq))
+		wake_up(&tc->t_recv_done_waitq);
+
 	return desc.error;
 }
 
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index 4e82c9644aa6..7c52acc749cf 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -169,7 +169,7 @@ int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
  * unacked byte of the TCP sequence space.  We have to do very careful
  * wrapping 32bit comparisons here.
  */
-static int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack)
+int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack)
 {
 	if (!test_bit(RDS_MSG_HAS_ACK_SEQ, &rm->m_flags))
 		return 0;
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* [PATCH v2 8/8] net/rds: Kick-start TCP receiver after accept
  2025-04-11 18:01 [PATCH v2 0/8] RDS bug fix collection allison.henderson
                   ` (6 preceding siblings ...)
  2025-04-11 18:02 ` [PATCH v2 7/8] net/rds: rds_tcp_conn_path_shutdown must not discard messages allison.henderson
@ 2025-04-11 18:02 ` allison.henderson
  7 siblings, 0 replies; 16+ messages in thread
From: allison.henderson @ 2025-04-11 18:02 UTC (permalink / raw)
  To: netdev

From: Gerd Rausch <gerd.rausch@oracle.com>

In cases where the server (the node with the higher IP-address)
in an RDS/TCP connection is overwhelmed
(e.g. by a client continuously issuing "rds-stress --reset"),
it can happen that the socket that was just accepted
is chock-full of messages, up to the limit of what
the socket receive buffer permits.

Subsequently, "rds_tcp_data_ready" won't be called anymore,
because there is no more space to receive additional messages.

Nor was it called prior to the point of calling "rds_tcp_set_callbacks",
because the "sk_data_ready" pointer didn't even point to
"rds_tcp_data_ready" yet.

We fix this by simply kick-starting the receive-worker
for all cases where the socket state is none of "TCP_CLOSE_WAIT",
"TCP_LAST_ACK", or "TCP_CLOSE".

Signed-off-by: Gerd Rausch <gerd.rausch@oracle.com>
Signed-off-by: Allison Henderson <allison.henderson@oracle.com>
---
 net/rds/tcp_listen.c | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
index a9596440a456..39e6cf071fb6 100644
--- a/net/rds/tcp_listen.c
+++ b/net/rds/tcp_listen.c
@@ -312,6 +312,8 @@ int rds_tcp_accept_one(struct rds_tcp_net *rtn)
 	    new_sock->sk->sk_state == TCP_LAST_ACK ||
 	    new_sock->sk->sk_state == TCP_CLOSE)
 		rds_conn_path_drop(cp, 0);
+	else
+		queue_delayed_work(cp->cp_wq, &cp->cp_recv_w, 0);
 
 	new_sock = NULL;
 	ret = 0;
-- 
2.43.0


^ permalink raw reply related	[flat|nested] 16+ messages in thread

* Re: [PATCH v2 2/8] net/rds: Introduce a pool of worker threads for connection management
  2025-04-11 18:02 ` [PATCH v2 2/8] net/rds: Introduce a pool of worker threads for connection management allison.henderson
@ 2025-04-15  0:46   ` Jakub Kicinski
  2025-04-15 21:33     ` Allison Henderson
  0 siblings, 1 reply; 16+ messages in thread
From: Jakub Kicinski @ 2025-04-15  0:46 UTC (permalink / raw)
  To: allison.henderson; +Cc: netdev

On Fri, 11 Apr 2025 11:02:01 -0700 allison.henderson@oracle.com wrote:
> RDS uses a single threaded work queue for connection management. This
> involves creation and deletion of QPs and CQs. On certain HCAs, such
> as CX-3, these operations are para-virtualized and some part of the
> work has to be conducted by the Physical Function (PF) driver.
> 
> In fail-over and fail-back situations, there might be 1000s of
> connections to tear down and re-establish. Hence, expand the number
> work queues.

sparse warnings here (C=1):

net/rds/connection.c:174:55: warning: incorrect type in argument 1 (different base types)
net/rds/connection.c:174:55:    expected unsigned int [usertype] a
net/rds/connection.c:174:55:    got restricted __be32 const
net/rds/connection.c:175:55: warning: incorrect type in argument 2 (different base types)
net/rds/connection.c:175:55:    expected unsigned int [usertype] b
net/rds/connection.c:175:55:    got restricted __be32 const
-- 
pw-bot: cr

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v2 4/8] net/rds: No shortcut out of RDS_CONN_ERROR
  2025-04-11 18:02 ` [PATCH v2 4/8] net/rds: No shortcut out of RDS_CONN_ERROR allison.henderson
@ 2025-04-15  0:47   ` Jakub Kicinski
  0 siblings, 0 replies; 16+ messages in thread
From: Jakub Kicinski @ 2025-04-15  0:47 UTC (permalink / raw)
  To: allison.henderson; +Cc: netdev

On Fri, 11 Apr 2025 11:02:03 -0700 allison.henderson@oracle.com wrote:
> Fixes:  ("RDS: TCP: fix race windows in send-path quiescence by rds_tcp_accept_one()")
> 

missing hash and unnecessary empty line

> Signed-off-by: Gerd Rausch <gerd.rausch@oracle.com>

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v2 2/8] net/rds: Introduce a pool of worker threads for connection management
  2025-04-15  0:46   ` Jakub Kicinski
@ 2025-04-15 21:33     ` Allison Henderson
  0 siblings, 0 replies; 16+ messages in thread
From: Allison Henderson @ 2025-04-15 21:33 UTC (permalink / raw)
  To: kuba@kernel.org; +Cc: netdev@vger.kernel.org

On Mon, 2025-04-14 at 17:46 -0700, Jakub Kicinski wrote:
> On Fri, 11 Apr 2025 11:02:01 -0700 allison.henderson@oracle.com wrote:
> > RDS uses a single threaded work queue for connection management. This
> > involves creation and deletion of QPs and CQs. On certain HCAs, such
> > as CX-3, these operations are para-virtualized and some part of the
> > work has to be conducted by the Physical Function (PF) driver.
> > 
> > In fail-over and fail-back situations, there might be 1000s of
> > connections to tear down and re-establish. Hence, expand the number
> > work queues.
> 
> sparse warnings here (C=1):
> 
> net/rds/connection.c:174:55: warning: incorrect type in argument 1 (different base types)
> net/rds/connection.c:174:55:    expected unsigned int [usertype] a
> net/rds/connection.c:174:55:    got restricted __be32 const
> net/rds/connection.c:175:55: warning: incorrect type in argument 2 (different base types)
> net/rds/connection.c:175:55:    expected unsigned int [usertype] b
> net/rds/connection.c:175:55:    got restricted __be32 const

Alrighty, will fix these.  Thanks for the catch!
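
The warnings come from feeding "__be32" address words into a helper
that expects host-order "u32"s, so the fix will likely be an explicit
byte-order conversion at the call site. A sketch (the exact call site
may differ, and "hash_conn_addrs" is a made-up name for illustration):

	u32 a = be32_to_cpu(conn->c_laddr.s6_addr32[3]);
	u32 b = be32_to_cpu(conn->c_faddr.s6_addr32[3]);

	bucket = hash_conn_addrs(a, b);	/* now matches "u32 a, u32 b" */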

Allison

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v2 1/8] net/rds: Add per cp work queue
  2025-04-11 18:02 ` [PATCH v2 1/8] net/rds: Add per cp work queue allison.henderson
@ 2025-04-17  9:44   ` Simon Horman
  2025-04-21 15:17     ` Allison Henderson
  0 siblings, 1 reply; 16+ messages in thread
From: Simon Horman @ 2025-04-17  9:44 UTC (permalink / raw)
  To: allison.henderson; +Cc: netdev

On Fri, Apr 11, 2025 at 11:02:00AM -0700, allison.henderson@oracle.com wrote:
> From: Allison Henderson <allison.henderson@oracle.com>
> 
> This patch adds a per connection path queue which can be initialized
> and used independently of the globally shared rds_wq.
> 
> This patch is the first in a series that aims to address multiple bugs
> including following the appropriate shutdown sequnce described in

nit: sequence

> rfc793 (pg 23) https://www.ietf.org/rfc/rfc793.txt
> 
> This initial refactoring lays the ground work needed to alleviate
> queue congestion during heavy reads and writes.  The independently
> managed queues will allow shutdowns and reconnects respond more quickly
> before the peer times out waiting for the proper acks.
> 
> Signed-off-by: Allison Henderson <allison.henderson@oracle.com>

...

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v2 7/8] net/rds: rds_tcp_conn_path_shutdown must not discard messages
  2025-04-11 18:02 ` [PATCH v2 7/8] net/rds: rds_tcp_conn_path_shutdown must not discard messages allison.henderson
@ 2025-04-17  9:47   ` Simon Horman
  2025-04-21 15:16     ` Allison Henderson
  0 siblings, 1 reply; 16+ messages in thread
From: Simon Horman @ 2025-04-17  9:47 UTC (permalink / raw)
  To: allison.henderson; +Cc: netdev

On Fri, Apr 11, 2025 at 11:02:06AM -0700, allison.henderson@oracle.com wrote:

...

> diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
> index 30146204dc6c..a9596440a456 100644
> --- a/net/rds/tcp_listen.c
> +++ b/net/rds/tcp_listen.c
> @@ -299,6 +299,20 @@ int rds_tcp_accept_one(struct rds_tcp_net *rtn)
>  		rds_tcp_set_callbacks(new_sock, cp);
>  		rds_connect_path_complete(cp, RDS_CONN_CONNECTING);
>  	}
> +
> +	/* Since "rds_tcp_set_callbacks" happens this late
> +	 * the connection may already have been closed without
> +	 * "rds_tcp_state_change" doing its due dilligence.

nit: diligence

checkpatch.pl --codespell is your friend :)

> +	 *
> +	 * If that's the case, we simply drop the path,
> +	 * knowing that "rds_tcp_conn_path_shutdown" will
> +	 * dequeue pending messages.
> +	 */
> +	if (new_sock->sk->sk_state == TCP_CLOSE_WAIT ||
> +	    new_sock->sk->sk_state == TCP_LAST_ACK ||
> +	    new_sock->sk->sk_state == TCP_CLOSE)
> +		rds_conn_path_drop(cp, 0);
> +
>  	new_sock = NULL;
>  	ret = 0;
>  	if (conn->c_npaths == 0)

^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v2 7/8] net/rds: rds_tcp_conn_path_shutdown must not discard messages
  2025-04-17  9:47   ` Simon Horman
@ 2025-04-21 15:16     ` Allison Henderson
  0 siblings, 0 replies; 16+ messages in thread
From: Allison Henderson @ 2025-04-21 15:16 UTC (permalink / raw)
  To: horms@kernel.org; +Cc: netdev@vger.kernel.org

On Thu, 2025-04-17 at 10:47 +0100, Simon Horman wrote:
> On Fri, Apr 11, 2025 at 11:02:06AM -0700, allison.henderson@oracle.com wrote:
> 
> ...
> 
> > diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
> > index 30146204dc6c..a9596440a456 100644
> > --- a/net/rds/tcp_listen.c
> > +++ b/net/rds/tcp_listen.c
> > @@ -299,6 +299,20 @@ int rds_tcp_accept_one(struct rds_tcp_net *rtn)
> >  		rds_tcp_set_callbacks(new_sock, cp);
> >  		rds_connect_path_complete(cp, RDS_CONN_CONNECTING);
> >  	}
> > +
> > +	/* Since "rds_tcp_set_callbacks" happens this late
> > +	 * the connection may already have been closed without
> > +	 * "rds_tcp_state_change" doing its due dilligence.
> 
> nit: diligence
> 
> checkpatch.pl --codespell is your friend :)

Got it, will update.  Thanks for the catch.  :-)

Allison
> 
> > +	 *
> > +	 * If that's the case, we simply drop the path,
> > +	 * knowing that "rds_tcp_conn_path_shutdown" will
> > +	 * dequeue pending messages.
> > +	 */
> > +	if (new_sock->sk->sk_state == TCP_CLOSE_WAIT ||
> > +	    new_sock->sk->sk_state == TCP_LAST_ACK ||
> > +	    new_sock->sk->sk_state == TCP_CLOSE)
> > +		rds_conn_path_drop(cp, 0);
> > +
> >  	new_sock = NULL;
> >  	ret = 0;
> >  	if (conn->c_npaths == 0)


^ permalink raw reply	[flat|nested] 16+ messages in thread

* Re: [PATCH v2 1/8] net/rds: Add per cp work queue
  2025-04-17  9:44   ` Simon Horman
@ 2025-04-21 15:17     ` Allison Henderson
  0 siblings, 0 replies; 16+ messages in thread
From: Allison Henderson @ 2025-04-21 15:17 UTC (permalink / raw)
  To: horms@kernel.org; +Cc: netdev@vger.kernel.org

On Thu, 2025-04-17 at 10:44 +0100, Simon Horman wrote:
> On Fri, Apr 11, 2025 at 11:02:00AM -0700, allison.henderson@oracle.com wrote:
> > From: Allison Henderson <allison.henderson@oracle.com>
> > 
> > This patch adds a per connection path queue which can be initialized
> > and used independently of the globally shared rds_wq.
> > 
> > This patch is the first in a series that aims to address multiple bugs
> > including following the appropriate shutdown sequnce described in
> 
> nit: sequence

Alrighty, will update.  Thanks!

Allison 
> 
> > rfc793 (pg 23) https://www.ietf.org/rfc/rfc793.txt
> > 
> > This initial refactoring lays the ground work needed to alleviate
> > queue congestion during heavy reads and writes.  The independently
> > managed queues will allow shutdowns and reconnects respond more quickly
> > before the peer times out waiting for the proper acks.
> > 
> > Signed-off-by: Allison Henderson <allison.henderson@oracle.com>
> 
> ...


^ permalink raw reply	[flat|nested] 16+ messages in thread

end of thread, other threads:[~2025-04-21 15:17 UTC | newest]

Thread overview: 16+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2025-04-11 18:01 [PATCH v2 0/8] RDS bug fix collection allison.henderson
2025-04-11 18:02 ` [PATCH v2 1/8] net/rds: Add per cp work queue allison.henderson
2025-04-17  9:44   ` Simon Horman
2025-04-21 15:17     ` Allison Henderson
2025-04-11 18:02 ` [PATCH v2 2/8] net/rds: Introduce a pool of worker threads for connection management allison.henderson
2025-04-15  0:46   ` Jakub Kicinski
2025-04-15 21:33     ` Allison Henderson
2025-04-11 18:02 ` [PATCH v2 3/8] net/rds: Re-factor queuing of shutdown work allison.henderson
2025-04-11 18:02 ` [PATCH v2 4/8] net/rds: No shortcut out of RDS_CONN_ERROR allison.henderson
2025-04-15  0:47   ` Jakub Kicinski
2025-04-11 18:02 ` [PATCH v2 5/8] net/rds: rds_tcp_accept_one ought to not discard messages allison.henderson
2025-04-11 18:02 ` [PATCH v2 6/8] net/rds: Encode cp_index in TCP source port allison.henderson
2025-04-11 18:02 ` [PATCH v2 7/8] net/rds: rds_tcp_conn_path_shutdown must not discard messages allison.henderson
2025-04-17  9:47   ` Simon Horman
2025-04-21 15:16     ` Allison Henderson
2025-04-11 18:02 ` [PATCH v2 8/8] net/rds: Kick-start TCP receiver after accept allison.henderson
