All of lore.kernel.org
 help / color / mirror / Atom feed
From: Christoph Paasch <cpaasch at apple.com>
To: mptcp at lists.01.org
Subject: Re: [MPTCP] [RFC PATCH v3 15/16] mptcp: Implement MPTCP receive path
Date: Mon, 08 Oct 2018 11:38:48 -0700	[thread overview]
Message-ID: <20181008183848.GZ36310@MacBook-Pro-19.local> (raw)
In-Reply-To: 20181005225918.9786-16-mathew.j.martineau@linux.intel.com

[-- Attachment #1: Type: text/plain, Size: 29655 bytes --]

On 05/10/18 - 15:59:17, Mat Martineau wrote:
> Parses incoming DSS options and populates outgoing MPTCP ACK
> fields. Clone incoming skbs containing MPTCP headers and propagate them
> to the MPTCP socket using the error queue. This avoids the need for TCP
> coalesce/collapse bypass for MPTCP, although cloning incoming skbs does
> prevent some coalescing.

I think it is acceptable to bypass TCP coalesce/collapse for MPTCP subflows
instead of cloning every incoming packet onto the error queue.

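As it stands, this puts a heavy load on the error queue, and I don't think
it handles packet reordering correctly: mptcp_queue_headers() runs before
tcp_data_queue(), so mappings are queued in arrival order rather than in
subflow-sequence order. If an out-of-order segment arrives first, its
mapping is dequeued first, and mptcp_recvmsg() then treats the in-order
data ahead of it as unmapped and discards it.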


Christoph

> 
> Keeping DSS map handling separate from data reading makes it easier to
> add multiple-subflow receiving, since the receive code can determine
> which subflow contains relevant data or data that needs to be purged
> without touching the subflow data queues.
> 
> MPTCP ACK values are now populated from an atomic value stored in the
> connection socket rather than carried in the extended control block,
> which makes use of the most up-to-date sequence number and allows the
> MPTCP ACK to be populated on TCP ACK packets that have no payload.
> 
> Signed-off-by: Mat Martineau <mathew.j.martineau(a)linux.intel.com>
> ---
>  include/linux/tcp.h           |  12 ++
>  include/net/mptcp.h           |  41 +++--
>  include/uapi/linux/errqueue.h |   1 +
>  net/ipv4/tcp_input.c          |   3 +
>  net/ipv4/tcp_output.c         | 123 +++++++-------
>  net/mptcp/options.c           |  90 +++++++++-
>  net/mptcp/protocol.c          | 304 +++++++++++++++++++++++++++++++---
>  net/mptcp/subflow.c           |  29 +++-
>  8 files changed, 508 insertions(+), 95 deletions(-)
> 
> diff --git a/include/linux/tcp.h b/include/linux/tcp.h
> index b109798482d3..845da5ee9c44 100644
> --- a/include/linux/tcp.h
> +++ b/include/linux/tcp.h
> @@ -112,6 +112,18 @@ struct tcp_options_received {
>  		u8      flags;
>  		u64     sndr_key;
>  		u64     rcvr_key;
> +		u64	ack;
> +		u64	seq;
> +		u32	subflow_seq;
> +		u16	dll;
> +		u16	checksum;
> +		u8	use_ack:1,
> +			ack64:1,
> +			use_map:1,
> +			dsn64:1,
> +			use_checksum:1,
> +			data_fin:1,
> +			__unused:2;
>  	} mptcp;
>  
>  };
> diff --git a/include/net/mptcp.h b/include/net/mptcp.h
> index 73f2d25cc42d..6c9516360de4 100644
> --- a/include/net/mptcp.h
> +++ b/include/net/mptcp.h
> @@ -38,14 +38,14 @@
>  /* MPTCP connection sock */
>  struct mptcp_sock {
>  	/* inet_connection_sock must be the first member */
> -	struct	inet_connection_sock sk;
> -	u64	local_key;
> -	u64	remote_key;
> -	u64	write_seq;
> -	u64	ack_seq;
> -	u32	token;
> -	struct	socket *connection_list; /* @@ needs to be a list */
> -	struct	socket *subflow; /* outgoing connect, listener or !mp_capable */
> +	struct inet_connection_sock sk;
> +	u64		local_key;
> +	u64		remote_key;
> +	u64		write_seq;
> +	atomic64_t	ack_seq;
> +	u32		token;
> +	struct socket	*connection_list; /* @@ needs to be a list */
> +	struct socket	*subflow; /* outgoing connect, listener or !mp_capable */
>  };
>  
>  static inline struct mptcp_sock *mptcp_sk(const struct sock *sk)
> @@ -56,7 +56,6 @@ static inline struct mptcp_sock *mptcp_sk(const struct sock *sk)
>  /* MPTCP sk_buff private control buffer */
>  struct mptcp_skb_cb {
>  	refcount_t	refcnt;
> -	u64		data_ack;
>  	u64		data_seq;
>  	u32		subflow_seq;
>  	u16		dll;
> @@ -65,9 +64,7 @@ struct mptcp_skb_cb {
>  			dsn64:1,
>  			use_checksum:1,
>  			data_fin:1,
> -			use_ack:1,
> -			ack64:1,
> -			__unused:2;
> +			__unused:4;
>  };
>  
>  static inline struct mptcp_skb_cb *mptcp_skb_priv_cb(struct sk_buff *skb)
> @@ -81,16 +78,23 @@ struct subflow_sock {
>  	/* tcp_sock must be the first member */
>  	struct	tcp_sock sk;
>  	u64	local_key;
> +	u64	map_seq;
> +	u32	map_subflow_seq;
>  	u32	token;
>  	u64	idsn;
>  	u64	remote_key;
>  	u32	rel_write_seq;
> +	u32	ssn_offset;
> +	u16	map_dll;
>  	bool	request_mptcp;	// send MP_CAPABLE
>  	bool	checksum;
>  	bool	version;
>  	bool	mp_capable;	// remote is MPTCP capable
>  	bool	fourth_ack;	// send initial DSS
> +	bool	conn_finished;
> +	bool	map_valid;
>  	struct	sock *conn;	// parent mptcp_sock
> +	void	(*tcp_sk_data_ready)(struct sock *sk);
>  };
>  
>  static inline struct subflow_sock *subflow_sk(const struct sock *sk)
> @@ -98,6 +102,16 @@ static inline struct subflow_sock *subflow_sk(const struct sock *sk)
>  	return (struct subflow_sock *)sk;
>  }
>  
> +static inline struct subflow_sock *subflow_tp(const struct tcp_sock *tp)
> +{
> +	return (struct subflow_sock *)tp;
> +}
> +
> +static inline struct sock *sock_sk(const struct subflow_sock *sk)
> +{
> +	return (struct sock *)sk;
> +}
> +
>  struct subflow_request_sock {
>  	struct	tcp_request_sock sk;
>  	u8	mp_capable : 1,
> @@ -109,6 +123,7 @@ struct subflow_request_sock {
>  	u32	token;
>  	u64	idsn;
>  	u64	remote_key;
> +	u32	ssn_offset;
>  };
>  
>  static inline
> @@ -133,6 +148,8 @@ void mptcp_subflow_exit(void);
>  void mptcp_get_options(const struct sk_buff *skb,
>  		       struct tcp_options_received *options);
>  
> +void mptcp_queue_headers(struct sock *sk, struct sk_buff *original_skb);
> +
>  extern const struct tcp_request_sock_ops tcp_request_sock_ipv4_ops;
>  
>  void token_init(void);
> diff --git a/include/uapi/linux/errqueue.h b/include/uapi/linux/errqueue.h
> index c0151200f7d1..1a1ec7362a55 100644
> --- a/include/uapi/linux/errqueue.h
> +++ b/include/uapi/linux/errqueue.h
> @@ -21,6 +21,7 @@ struct sock_extended_err {
>  #define SO_EE_ORIGIN_TXSTATUS	4
>  #define SO_EE_ORIGIN_ZEROCOPY	5
>  #define SO_EE_ORIGIN_TXTIME	6
> +#define SO_EE_ORIGIN_MPTCP	7
>  #define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS
>  
>  #define SO_EE_OFFENDER(ee)	((struct sockaddr*)((ee)+1))
> diff --git a/net/ipv4/tcp_input.c b/net/ipv4/tcp_input.c
> index 9a326729637f..0904d9baa685 100644
> --- a/net/ipv4/tcp_input.c
> +++ b/net/ipv4/tcp_input.c
> @@ -5642,6 +5642,9 @@ void tcp_rcv_established(struct sock *sk, struct sk_buff *skb)
>  	tcp_urg(sk, skb, th);
>  
>  	/* step 7: process the segment text */
> +	if (tcp_sk(sk)->is_mptcp)
> +		mptcp_queue_headers(sk, skb);
> +
>  	tcp_data_queue(sk, skb);
>  
>  	tcp_data_snd_check(sk);
> diff --git a/net/ipv4/tcp_output.c b/net/ipv4/tcp_output.c
> index 471f196711fc..1c9517db82c2 100644
> --- a/net/ipv4/tcp_output.c
> +++ b/net/ipv4/tcp_output.c
> @@ -422,8 +422,8 @@ static inline bool tcp_urg_mode(const struct tcp_sock *tp)
>  #define OPTION_MPTCP_MPC_SYN	(1 << 0)
>  #define OPTION_MPTCP_MPC_SYNACK	(1 << 1)
>  #define OPTION_MPTCP_MPC_ACK	(1 << 2)
> -#define OPTION_MPTCP_MPC_DSS_MAP	(1 << 6)
> -#define OPTION_MPTCP_MPC_DSS_ACK	(1 << 7)
> +#define OPTION_MPTCP_DSS_MAP	(1 << 6)
> +#define OPTION_MPTCP_DSS_ACK	(1 << 7)
>  
>  struct tcp_out_options {
>  	u16 options;		/* bit field of OPTION_* */
> @@ -472,16 +472,12 @@ static void mptcp_options_write(__be32 *ptr, struct sk_buff *skb,
>  		}
>  	}
>  
> -	if ((OPTION_MPTCP_MPC_DSS_MAP |
> -	     OPTION_MPTCP_MPC_DSS_ACK) & opts->suboptions) {
> -		struct mptcp_skb_cb *mcb = mptcp_skb_priv_cb(skb);
> -		bool write_ack = (OPTION_MPTCP_MPC_DSS_ACK & opts->suboptions) && skb->priv_used && mcb->use_ack;
> -		bool write_map = (OPTION_MPTCP_MPC_DSS_MAP & opts->suboptions) && skb->priv_used && mcb->use_map;
> +	if ((OPTION_MPTCP_DSS_MAP | OPTION_MPTCP_DSS_ACK) & opts->suboptions) {
> +		bool write_ack = !!(OPTION_MPTCP_DSS_ACK & opts->suboptions);
> +		bool write_map = (OPTION_MPTCP_DSS_MAP & opts->suboptions) &&
> +			skb->priv_used && mptcp_skb_priv_cb(skb)->use_map;
>  		u8 flags = 0;
>  		u8 len = 4;
> -		u8 *p = (u8 *)ptr;
> -
> -		BUG_ON(!write_ack && !write_map);
>  
>  		if (write_ack) {
>  			len += 8;
> @@ -489,46 +485,60 @@ static void mptcp_options_write(__be32 *ptr, struct sk_buff *skb,
>  		}
>  
>  		if (write_map) {
> +			pr_debug("Updating DSS length and flags for map");
>  			len += 14;
>  
> -			if (mcb->use_checksum)
> +			if (mptcp_skb_priv_cb(skb)->use_checksum)
>  				len += 2;
>  
>  			/* Use only 64-bit mapping flags for now, add
>  			 * support for optional 32-bit mappings later.
>  			 */
>  			flags |= 0x0c;
> -			if (mcb->data_fin)
> +			if (mptcp_skb_priv_cb(skb)->data_fin)
>  				flags |= 0x10;
>  		}
>  
> -		*p++ = 0x1e; // TCP option: Multipath TCP
> -		*p++ = len;  // length
> -		*p++ = 0x20; // subtype=DSS
> -		*p++ = flags;
> +		*ptr++ = htonl((0x1e << 24) |  // TCP option: Multipath TCP
> +			       (len  << 16) |  // length
> +			       (0x20 <<  8) |  // subtype=DSS
> +			       (flags));
>  
>  		if (write_ack) {
> -			*(__be64 *)p = cpu_to_be64(mcb->data_ack);
> -			p += 8;
> +			struct mptcp_sock *msk = mptcp_sk(subflow_tp(tp)->conn);
> +			u64 ack_seq;
> +			__be64 ack;
> +
> +			if (msk) {
> +				ack_seq = atomic64_read(&msk->ack_seq);
> +			} else {
> +				crypto_key_sha1(subflow_tp(tp)->remote_key,
> +						NULL, &ack_seq);
> +				ack_seq++;
> +			}
> +
> +			pr_debug("ack=%llu", ack_seq);
> +			ack = cpu_to_be64(ack_seq);
> +			memcpy((u8 *) ptr, (u8 *) &ack, 8);
> +			ptr += 2;
>  		}
>  
>  		if (write_map) {
> -			*(__be64 *)p = cpu_to_be64(mcb->data_seq);
> -			p += 8;
> -
> -			*(__be32 *)p = htonl(mcb->subflow_seq);
> -			p += 4;
> +			struct mptcp_skb_cb *mcb = mptcp_skb_priv_cb(skb);
> +			u16 checksum;
> +			__be64 dss;
>  
> -			*(__be16 *)p = htons(mcb->dll);
> -			p += 2;
> +			pr_debug("Writing map values");
> +			dss = cpu_to_be64(mcb->data_seq);
> +			memcpy((u8 *) ptr, &dss, 8);
> +			ptr += 2;
> +			*ptr++ = htonl(mcb->subflow_seq);
>  
> -			if (mcb->use_checksum) {
> -				*(__be16 *)p = htons(mcb->checksum);
> -				p += 2;
> -			} else {
> -				*p++ = TCPOPT_NOP;
> -				*p++ = TCPOPT_NOP;
> -			}
> +			if (mcb->use_checksum)
> +				checksum = (__force u16) mcb->checksum;
> +			else
> +				checksum = TCPOPT_NOP << 8 | TCPOPT_NOP;
> +			*ptr++ = htonl(mcb->dll << 16 | checksum);
>  		}
>  	}
>  }
> @@ -899,50 +909,44 @@ static unsigned int tcp_established_options(struct sock *sk, struct sk_buff *skb
>  					size += 20;
>  				}
>  				subflow->fourth_ack = 1;
> -			} else if (skb && skb->priv_used) {
> -				struct mptcp_skb_cb *cb;
> +			} else if (skb) {
>  				unsigned int dss_size = 0;
> -				u16 options = 0;
> +				unsigned int ack_size = 8;
> +				u16 suboptions = 0;
>  
> -				cb = mptcp_skb_priv_cb(skb);
> -
> -				if (skb->priv_used && cb->use_map) {
> +				if (skb->priv_used &&
> +				    mptcp_skb_priv_cb(skb)->use_map) {
>  					unsigned int map_size = 18;
>  
> -					if (cb->use_checksum)
> +					if (mptcp_skb_priv_cb(skb)->use_checksum)
>  						map_size += 2;
>  
>  					if (map_size <= remaining) {
>  						remaining -= map_size;
>  						dss_size = map_size;
> -						opts->options |= OPTION_MPTCP;
> -						opts->suboptions = OPTION_MPTCP_MPC_DSS_MAP;
> +						suboptions = OPTION_MPTCP_DSS_MAP;
>  					} else {
>  						WARN(1, "MPTCP: Map dropped");
>  					}
>  				}
>  
> -				if (cb->use_ack) {
> -					unsigned int ack_size = 8;
> -
> -					/* Add kind/length/subtype/flag
> -					 * overhead if mapping not populated
> -					 */
> -					if (dss_size == 0)
> -						ack_size += 4;
> -
> -					if (ack_size <= remaining) {
> -						dss_size += ack_size;
> -						opts->options |= OPTION_MPTCP;
> -						opts->suboptions |= OPTION_MPTCP_MPC_DSS_ACK;
> -					} else {
> -						WARN(1, "MPTCP: Ack dropped");
> -					}
> +				/* Add kind/length/subtype/flag
> +				 * overhead if mapping not populated
> +				 */
> +				if (dss_size == 0)
> +					ack_size += 4;
> +
> +				if (ack_size <= remaining) {
> +					dss_size += ack_size;
> +					suboptions |= OPTION_MPTCP_DSS_ACK;
> +				} else {
> +					WARN(1, "MPTCP: Ack dropped");
>  				}
>  
>  				if (dss_size) {
>  					size += ALIGN(dss_size, 4);
> -					opts->options |= options;
> +					opts->options |= OPTION_MPTCP;
> +					opts->suboptions = suboptions;
>  				}
>  			}
>  		}
> @@ -3804,6 +3808,9 @@ void __tcp_send_ack(struct sock *sk, u32 rcv_nxt)
>  	skb_set_tcp_pure_ack(buff);
>  
>  	/* Send it off, this clears delayed acks for us. */
> +	if (tcp_sk(sk)->is_mptcp)
> +		pr_debug("mptcp sk=%p", sk);
> +
>  	__tcp_transmit_skb(sk, buff, 0, (__force gfp_t)0, rcv_nxt);
>  }
>  EXPORT_SYMBOL_GPL(__tcp_send_ack);
> diff --git a/net/mptcp/options.c b/net/mptcp/options.c
> index 7e48d1d92aac..f5b7be888e84 100644
> --- a/net/mptcp/options.c
> +++ b/net/mptcp/options.c
> @@ -14,6 +14,7 @@
>   */
>  
>  #include <linux/kernel.h>
> +#include <linux/errqueue.h>
>  #include <net/tcp.h>
>  #include <net/mptcp.h>
>  
> @@ -21,9 +22,8 @@ void mptcp_parse_option(const unsigned char *ptr, int opsize,
>  			struct tcp_options_received *opt_rx)
>  {
>  	u8 subtype;
> -	u64 *p;
> +	int expected_opsize;
>  
> -	opsize -= 2;
>  	subtype = *ptr++;
>  
>  	/* MPTCPOPT_MP_CAPABLE
> @@ -82,6 +82,68 @@ void mptcp_parse_option(const unsigned char *ptr, int opsize,
>  	case 0x20:
>  		pr_debug("DSS");
>  		opt_rx->mptcp.dss = 1;
> +
> +		opt_rx->mptcp.flags = (*ptr++) & 0x1F;
> +		opt_rx->mptcp.data_fin = (opt_rx->mptcp.flags & 0x10) != 0;
> +		opt_rx->mptcp.dsn64 = (opt_rx->mptcp.flags & 0x08) != 0;
> +		opt_rx->mptcp.use_map = (opt_rx->mptcp.flags & 0x04) != 0;
> +		opt_rx->mptcp.ack64 = (opt_rx->mptcp.flags & 0x02) != 0;
> +		opt_rx->mptcp.use_ack = (opt_rx->mptcp.flags & 0x01);
> +
> +		pr_debug("data_fin=%d dsn64=%d use_map=%d ack64=%d use_ack=%d",
> +			 opt_rx->mptcp.data_fin, opt_rx->mptcp.dsn64,
> +			 opt_rx->mptcp.use_map, opt_rx->mptcp.ack64,
> +			 opt_rx->mptcp.use_ack);
> +
> +		expected_opsize = 0;
> +
> +		if (opt_rx->mptcp.use_ack) {
> +			expected_opsize = 4;
> +			if (opt_rx->mptcp.ack64)
> +				expected_opsize += 4;
> +
> +			if (opsize < expected_opsize)
> +				break;
> +
> +			if (opt_rx->mptcp.ack64) {
> +				opt_rx->mptcp.ack = get_unaligned_be64(ptr);
> +				ptr += 8;
> +			} else {
> +				opt_rx->mptcp.ack = get_unaligned_be32(ptr);
> +				ptr += 4;
> +			}
> +
> +			pr_debug("ack=%llu", opt_rx->mptcp.ack);
> +		}
> +
> +		if (opt_rx->mptcp.use_map) {
> +			expected_opsize += 12;
> +			if (opt_rx->mptcp.dsn64)
> +				expected_opsize += 4;
> +
> +			if (opsize < expected_opsize)
> +				break;
> +
> +			if (opt_rx->mptcp.dsn64) {
> +				opt_rx->mptcp.seq = get_unaligned_be64(ptr);
> +				ptr += 8;
> +			} else {
> +				opt_rx->mptcp.seq = get_unaligned_be32(ptr);
> +				ptr += 4;
> +			}
> +
> +			opt_rx->mptcp.subflow_seq = get_unaligned_be32(ptr);
> +			ptr += 4;
> +
> +			opt_rx->mptcp.dll = get_unaligned_be16(ptr);
> +			ptr += 2;
> +
> +			opt_rx->mptcp.checksum = get_unaligned_be16(ptr);
> +
> +			pr_debug("seq=%llu subflow_seq=%u dll=%u ck=%u",
> +				 opt_rx->mptcp.seq, opt_rx->mptcp.subflow_seq,
> +				 opt_rx->mptcp.dll, opt_rx->mptcp.checksum);
> +		}
>  		break;
>  
>  	/* MPTCPOPT_ADD_ADDR
> @@ -175,3 +237,27 @@ unsigned int mptcp_synack_options(struct request_sock *req, u64 *local_key,
>  	}
>  	return subflow_req->mp_capable;
>  }
> +
> +void mptcp_queue_headers(struct sock *sk, struct sk_buff *original_skb)
> +{
> +	struct sock_exterr_skb *serr;
> +	struct sk_buff *skb;
> +	int err;
> +
> +	BUILD_BUG_ON(sizeof(struct sock_exterr_skb) > sizeof(skb->cb));
> +
> +	skb = skb_clone(original_skb, GFP_ATOMIC);
> +	if (!skb) {
> +		pr_err("clone failed skb=%p sk=%p", original_skb, sk);
> +		return;
> +	}
> +
> +	serr = SKB_EXT_ERR(skb);
> +	memset(serr, 0, sizeof(*serr));
> +	serr->ee.ee_origin = SO_EE_ORIGIN_MPTCP;
> +
> +	err = sock_queue_err_skb(sk, skb);
> +
> +	if (err)
> +		kfree_skb(skb);
> +}
> diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
> index dcaaa0dd3784..911428122c9c 100644
> --- a/net/mptcp/protocol.c
> +++ b/net/mptcp/protocol.c
> @@ -16,6 +16,8 @@
>  #include <linux/kernel.h>
>  #include <linux/module.h>
>  #include <linux/netdevice.h>
> +#include <linux/sched/signal.h>
> +#include <linux/atomic.h>
>  #include <net/sock.h>
>  #include <net/inet_common.h>
>  #include <net/inet_hashtables.h>
> @@ -23,6 +25,12 @@
>  #include <net/tcp.h>
>  #include <net/mptcp.h>
>  
> +static inline bool before64(__u64 seq1, __u64 seq2)
> +{
> +	return (__s64)(seq1-seq2) < 0;
> +}
> +#define after64(seq2, seq1)	before64(seq1, seq2)
> +
>  static void mptcp_cb_copy(const struct sk_buff *from, struct sk_buff *to)
>  {
>  	struct mptcp_skb_cb *mcb = from->priv;
> @@ -131,15 +139,12 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
>  	skb = tcp_write_queue_tail(ssk);
>  
>  	refcount_set(&mcb->refcnt, 1);
> -	mcb->data_ack = msk->ack_seq;
>  	mcb->data_seq = msk->write_seq;
>  	mcb->subflow_seq = subflow_sk(ssk)->rel_write_seq;
>  	mcb->dll = ret;
>  	mcb->checksum = 0xbeef;
>  	mcb->use_map = 1;
>  	mcb->dsn64 = 1;
> -	mcb->use_ack = 1;
> -	mcb->ack64 = 1;
>  
>  	if (mcb->use_map) {
>  		pr_debug("data_seq=%llu subflow_seq=%u "
> @@ -170,21 +175,262 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
>  	return ret;
>  }
>  
> +struct mptcp_read_arg {
> +	struct msghdr *msg;
> +};
> +
> +static u64 expand_seq(u64 old_seq, u16 old_dll, u64 seq)
> +{
> +	if ((u32)seq == (u32)old_seq)
> +		return old_seq;
> +
> +	/* Assume map covers data not mapped yet. */
> +	return seq | ((old_seq + old_dll + 1) & ~0xFFFFFFFFULL);
> +}
> +
> +static u64 get_mapped_dsn(struct subflow_sock *subflow)
> +{
> +	u32 map_offset = (tcp_sk(sock_sk(subflow))->copied_seq -
> +			  subflow->ssn_offset -
> +			  subflow->map_subflow_seq);
> +
> +	return subflow->map_seq + map_offset;
> +}
> +
> +static int mptcp_read_actor(read_descriptor_t *desc, struct sk_buff *skb,
> +			    unsigned int offset, size_t len)
> +{
> +	struct mptcp_read_arg *arg = desc->arg.data;
> +	size_t copy_len;
> +
> +	copy_len = min(desc->count, len);
> +
> +	if (likely(arg->msg)) {
> +		int err;
> +
> +		err = skb_copy_datagram_msg(skb, offset, arg->msg, copy_len);
> +		if (err) {
> +			pr_debug("error path");
> +			desc->error = err;
> +			return err;
> +		}
> +	} else {
> +		pr_debug("Flushing skb payload");
> +	}
> +
> +	// MSG_PEEK support? Other flags? MSG_TRUNC?
> +
> +	desc->count -= copy_len;
> +
> +	pr_debug("consumed %lu bytes, %lu left", copy_len, desc->count);
> +	return copy_len;
> +}
> +
> +static bool mptcp_get_mapping(struct subflow_sock *subflow)
> +{
> +	struct tcp_options_received opt;
> +	struct sk_buff *skb;
> +	bool found_map = false;
> +
> +	do {
> +		pr_debug("Dequeue mapping skb");
> +		skb = sock_dequeue_err_skb(sock_sk(subflow));
> +		if (!skb) {
> +			pr_debug("Empty queue");
> +			break;
> +		}
> +
> +		opt.mptcp.dss = 0;
> +		opt.mptcp.use_map = 0;
> +		opt.mptcp.data_fin = 0;
> +		mptcp_get_options(skb, &opt);
> +
> +		if (opt.mptcp.dss && opt.mptcp.use_map) {
> +			pr_debug("seq=%llu is64=%d ssn=%u dll=%u ck=%u",
> +				 opt.mptcp.seq, opt.mptcp.dsn64,
> +				 opt.mptcp.subflow_seq,
> +				 opt.mptcp.dll, opt.mptcp.checksum);
> +
> +			if (opt.mptcp.dll == 0) {
> +				pr_err("Infinite mapping not handled");
> +			} else if (opt.mptcp.subflow_seq == 0 &&
> +				   opt.mptcp.data_fin == 1) {
> +				pr_debug("DATA_FIN with no payload");
> +				kfree_skb(skb);
> +				break;
> +			}
> +
> +			if (!opt.mptcp.dsn64) {
> +				subflow->map_seq = expand_seq(subflow->map_seq,
> +							      subflow->map_dll,
> +							      opt.mptcp.seq);
> +				pr_debug("expanded seq=%llu", subflow->map_seq);
> +			} else {
> +				subflow->map_seq = opt.mptcp.seq;
> +			}
> +
> +			subflow->map_subflow_seq = opt.mptcp.subflow_seq;
> +			subflow->map_dll = opt.mptcp.dll;
> +			subflow->map_valid = true;
> +			pr_debug("new map seq=%llu subflow_seq=%u dll=%u",
> +				 subflow->map_seq, subflow->map_subflow_seq,
> +				 subflow->map_dll);
> +			found_map = true;
> +		}
> +
> +		kfree_skb(skb);
> +	} while (!found_map);
> +
> +	return found_map;
> +}
> +
>  static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
>  			 int nonblock, int flags, int *addr_len)
>  {
>  	struct mptcp_sock *msk = mptcp_sk(sk);
> -	struct socket *subflow;
> +	struct subflow_sock *subflow;
> +	struct mptcp_read_arg arg;
> +	read_descriptor_t desc;
> +	struct tcp_sock *tp;
> +	struct sock *ssk;
> +	int copied = 0;
> +	long timeo;
>  
> -	if (msk->connection_list) {
> -		subflow = msk->connection_list;
> -		pr_debug("conn_list->subflow=%p", subflow->sk);
> -	} else {
> -		subflow = msk->subflow;
> -		pr_debug("subflow=%p", subflow->sk);
> +	if (!msk->connection_list) {
> +		pr_debug("fallback-read subflow=%p", msk->subflow->sk);
> +		return sock_recvmsg(msk->subflow, msg, flags);
>  	}
>  
> -	return sock_recvmsg(subflow, msg, flags);
> +	ssk = msk->connection_list->sk;
> +	subflow = subflow_sk(ssk);
> +	tp = tcp_sk(ssk);
> +
> +	desc.arg.data = &arg;
> +	desc.error = 0;
> +
> +	timeo = sock_rcvtimeo(sk, nonblock);
> +
> +	len = min_t(size_t, len, INT_MAX);
> +
> +	while (copied < len) {
> +		size_t discard_len = 0;
> +		int bytes_read;
> +		u64 ack_seq;
> +		u64 old_ack;
> +		u32 ssn;
> +
> +		pr_debug("map_valid=%d", subflow->map_valid);
> +		if (!subflow->map_valid && !mptcp_get_mapping(subflow))
> +			goto wait_for_data;
> +
> +		ssn = tcp_sk(ssk)->copied_seq - subflow->ssn_offset;
> +		old_ack = atomic64_read(&msk->ack_seq);
> +
> +		if (unlikely(before(ssn, subflow->map_subflow_seq))) {
> +			/* Mapping covers data later in the subflow stream,
> +			 * discard unmapped data. */
> +			pr_debug("Mapping covers data later in stream");
> +			discard_len = subflow->map_subflow_seq - ssn;
> +		} else if (unlikely(!before(ssn, (subflow->map_subflow_seq +
> +						  subflow->map_dll)))) {
> +			/* Mapping ends earlier in the subflow stream.
> +			 * Invalidate the mapping and try again.
> +			 */
> +			subflow->map_valid = false;
> +			pr_debug("Invalid mapping ssn=%d map_seq=%d map_dll=%d",
> +				 ssn, subflow->map_subflow_seq, subflow->map_dll);
> +			continue;
> +		} else {
> +			ack_seq = get_mapped_dsn(subflow);
> +
> +			if (before64(ack_seq, old_ack)) {
> +				/* Mapping covers data already received,
> +				 * discard data in the current mapping
> +				 * and invalidate the map
> +				 */
> +				u64 map_end_dsn = subflow->map_seq +
> +					subflow->map_dll;
> +				discard_len = min(map_end_dsn - ack_seq,
> +						  old_ack - ack_seq);
> +				subflow->map_valid = false;
> +				pr_debug("Duplicate MPTCP data found");
> +			}
> +		}
> +
> +		if (discard_len) {
> +			/* Discard data for the current mapping.
> +			 */
> +			pr_debug("Discard %lu bytes", discard_len);
> +
> +			arg.msg = NULL;
> +			desc.count = discard_len;
> +
> +			bytes_read = tcp_read_sock(ssk, &desc,
> +						   mptcp_read_actor);
> +
> +			if (bytes_read < 0)
> +				break;
> +			else if (bytes_read == discard_len)
> +				continue;
> +			else
> +				goto wait_for_data;
> +		}
> +
> +		/* Read mapped data */
> +		desc.count = ssn - subflow->map_subflow_seq + subflow->map_dll;
> +		arg.msg = msg;
> +		bytes_read = tcp_read_sock(ssk, &desc, mptcp_read_actor);
> +		if (bytes_read < 0)
> +			break;
> +
> +		/* Refresh current MPTCP sequence number based on subflow seq */
> +		ack_seq = get_mapped_dsn(subflow);
> +
> +		if (before64(old_ack, ack_seq)) {
> +			atomic64_set(&msk->ack_seq, ack_seq);
> +		}
> +
> +		if (!before(tcp_sk(ssk)->copied_seq - subflow->ssn_offset,
> +			    subflow->map_subflow_seq + subflow->map_dll)) {
> +			subflow->map_valid = false;
> +			pr_debug("Done with mapping: seq=%u dll=%u",
> +				 subflow->map_subflow_seq, subflow->map_dll);
> +		}
> +
> +		copied += bytes_read;
> +
> +wait_for_data:
> +		if (copied)
> +			break;
> +
> +		if (tp->urg_data && tp->urg_seq == tp->copied_seq) {
> +			pr_err("Urgent data present, cannot proceed");
> +			break;
> +		}
> +
> +		if (ssk->sk_err || ssk->sk_state == TCP_CLOSE ||
> +		    (ssk->sk_shutdown & RCV_SHUTDOWN) || !timeo ||
> +		    signal_pending(current)) {
> +			pr_debug("nonblock or error");
> +			break;
> +		}
> +
> +		/* Handle blocking and retry read if needed.
> +		 *
> +		 * Wait on MPTCP sock, the subflow will notify via data ready.
> +		 */
> +
> +		pr_debug("block");
> +		release_sock(ssk);
> +		sk_wait_data(sk, &timeo, NULL);
> +		lock_sock(ssk);
> +
> +	}
> +
> +	release_sock(ssk);
> +	release_sock(sk);
> +
> +	return copied;
>  }
>  
>  static int mptcp_init_sock(struct sock *sk)
> @@ -240,18 +486,27 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
>  	subflow->conn = mp->sk;
>  
>  	if (subflow->mp_capable) {
> +		u64 ack_seq;
> +
> +		msk->remote_key = subflow->remote_key;
>  		msk->local_key = subflow->local_key;
>  		msk->token = subflow->token;
> +		pr_debug("token=%u", msk->token);
>  		token_update_accept(new_sock->sk, mp->sk);
> +		msk->connection_list = new_sock;
> +
> +		crypto_key_sha1(msk->remote_key, NULL, &ack_seq);
>  		msk->write_seq = subflow->idsn + 1;
> +		ack_seq++;
> +		atomic64_set(&msk->ack_seq, ack_seq);
> +		subflow->map_seq = ack_seq;
> +		subflow->map_subflow_seq = 1;
>  		subflow->rel_write_seq = 1;
> -		msk->remote_key = subflow->remote_key;
> -		crypto_key_sha1(msk->remote_key, NULL, &msk->ack_seq);
> -		msk->ack_seq++;
> -		msk->connection_list = new_sock;
> +		subflow->conn = mp->sk;
>  	} else {
>  		msk->subflow = new_sock;
>  	}
> +	inet_sk_state_store(sk, TCP_ESTABLISHED);
>  
>  	return mp->sk;
>  }
> @@ -329,17 +584,24 @@ void mptcp_finish_connect(struct sock *sk, int mp_capable)
>  	pr_debug("msk=%p", msk);
>  
>  	if (mp_capable) {
> +		u64 ack_seq;
> +
> +		msk->remote_key = subflow->remote_key;
>  		msk->local_key = subflow->local_key;
>  		msk->token = subflow->token;
> -		msk->write_seq = subflow->idsn + 1;
> -		subflow->rel_write_seq = 1;
> -		msk->remote_key = subflow->remote_key;
> -		crypto_key_sha1(msk->remote_key, NULL, &msk->ack_seq);
> -		msk->ack_seq++;
> +		pr_debug("token=%u", msk->token);
>  		msk->connection_list = msk->subflow;
>  		msk->subflow = NULL;
> +
> +		crypto_key_sha1(msk->remote_key, NULL, &ack_seq);
> +		msk->write_seq = subflow->idsn + 1;
> +		ack_seq++;
> +		atomic64_set(&msk->ack_seq, ack_seq);
> +		subflow->map_seq = ack_seq;
> +		subflow->map_subflow_seq = 1;
> +		subflow->rel_write_seq = 1;
>  	}
> -	sk->sk_state = TCP_ESTABLISHED;
> +	inet_sk_state_store(sk, TCP_ESTABLISHED);
>  }
>  
>  static int subflow_create(struct sock *sock)
> diff --git a/net/mptcp/subflow.c b/net/mptcp/subflow.c
> index e0b4aa8fe51d..3c9dca4be26a 100644
> --- a/net/mptcp/subflow.c
> +++ b/net/mptcp/subflow.c
> @@ -107,6 +107,8 @@ static void subflow_v4_init_req(struct request_sock *req,
>  		subflow_req->remote_key = rx_opt.mptcp.sndr_key;
>  		pr_debug("remote_key=%llu", subflow_req->remote_key);
>  		token_new_request(req, skb);
> +		pr_debug("syn seq=%u", TCP_SKB_CB(skb)->seq);
> +		subflow_req->ssn_offset = TCP_SKB_CB(skb)->seq;
>  	} else {
>  		subflow_req->mp_capable = 0;
>  	}
> @@ -120,10 +122,15 @@ static void subflow_finish_connect(struct sock *sk, const struct sk_buff *skb)
>  
>  	pr_debug("subflow=%p", subflow);
>  
> -	if (subflow->conn) {
> +	if (!subflow->conn_finished) {
>  		pr_debug("remote_key=%llu", subflow->remote_key);
>  		mptcp_finish_connect(subflow->conn, subflow->mp_capable);
> -		subflow->conn = NULL;
> +		subflow->conn_finished = 1;
> +
> +		if (skb) {
> +			pr_debug("synack seq=%u", TCP_SKB_CB(skb)->seq);
> +			subflow->ssn_offset = TCP_SKB_CB(skb)->seq;
> +		}
>  	}
>  }
>  
> @@ -172,7 +179,9 @@ static struct sock *subflow_syn_recv_sock(const struct sock *sk,
>  			subflow->fourth_ack = 1;
>  			subflow->remote_key = subflow_req->remote_key;
>  			subflow->local_key = subflow_req->local_key;
> +			subflow->ssn_offset = subflow_req->ssn_offset;
>  			subflow->token = subflow_req->token;
> +			subflow->idsn = subflow_req->idsn;
>  			pr_debug("token=%u", subflow->token);
>  			token_new_accept(child);
>  		} else {
> @@ -202,6 +211,20 @@ const struct inet_connection_sock_af_ops subflow_specific = {
>  	.mtu_reduced	   = tcp_v4_mtu_reduced,
>  };
>  
> +static void subflow_data_ready(struct sock *sk)
> +{
> +	struct subflow_sock *subflow = subflow_sk(sk);
> +	struct sock *parent = subflow->conn;
> +
> +	pr_debug("sk=%p", sk);
> +	subflow->tcp_sk_data_ready(sk);
> +
> +	if (parent) {
> +		pr_debug("parent=%p", parent);
> +		parent->sk_data_ready(parent);
> +	}
> +}
> +
>  static int subflow_init_sock(struct sock *sk)
>  {
>  	struct subflow_sock *subflow = subflow_sk(sk);
> @@ -215,6 +238,8 @@ static int subflow_init_sock(struct sock *sk)
>  	if (!err) { // @@ AND mptcp is enabled
>  		tsk->is_mptcp = 1;
>  		icsk->icsk_af_ops = &subflow_specific;
> +		subflow->tcp_sk_data_ready = sk->sk_data_ready;
> +		sk->sk_data_ready = subflow_data_ready;
>  	}
>  
>  	return err;
> -- 
> 2.19.1
> 
> _______________________________________________
> mptcp mailing list
> mptcp(a)lists.01.org
> https://lists.01.org/mailman/listinfo/mptcp

             reply	other threads:[~2018-10-08 18:38 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-10-08 18:38 Christoph Paasch [this message]
  -- strict thread matches above, loose matches on Subject: below --
2018-10-11  0:17 [MPTCP] [RFC PATCH v3 15/16] mptcp: Implement MPTCP receive path Mat Martineau
2018-10-10 20:58 Christoph Paasch
2018-10-10 19:09 Mat Martineau
2018-10-09 16:31 Christoph Paasch
2018-10-09  0:48 Mat Martineau
2018-10-05 22:59 Mat Martineau

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20181008183848.GZ36310@MacBook-Pro-19.local \
    --to=unknown@example.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.