netdev.vger.kernel.org archive mirror
* [PATCH net-next 0/3] tipc: clean up socket message reception
@ 2015-07-22 14:11 Jon Maloy
  2015-07-22 14:11 ` [PATCH net-next 1/3] tipc: let function tipc_msg_reverse() expand header when needed Jon Maloy
                   ` (3 more replies)
  0 siblings, 4 replies; 5+ messages in thread
From: Jon Maloy @ 2015-07-22 14:11 UTC
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

Despite recent improvements, the message reception code in socket.c is
perceived as obscure and hard to follow, especially the logic for
message rejection. With the commits in this series we try to remedy
this situation.

Jon Maloy (3):
  tipc: let function tipc_msg_reverse() expand header when needed
  tipc: introduce new tipc_sk_respond() function
  tipc: clean up socket layer message reception

 net/tipc/msg.c    |  86 ++++++++-------
 net/tipc/msg.h    |   6 +-
 net/tipc/socket.c | 322 +++++++++++++++++++++++++++---------------------------
 net/tipc/socket.h |   2 +-
 4 files changed, 213 insertions(+), 203 deletions(-)

-- 
1.9.1


* [PATCH net-next 1/3] tipc: let function tipc_msg_reverse() expand header when needed
  2015-07-22 14:11 [PATCH net-next 0/3] tipc: clean up socket message reception Jon Maloy
@ 2015-07-22 14:11 ` Jon Maloy
  2015-07-22 14:11 ` [PATCH net-next 2/3] tipc: introduce new tipc_sk_respond() function Jon Maloy
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 5+ messages in thread
From: Jon Maloy @ 2015-07-22 14:11 UTC
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

The shortest TIPC message header, for cluster-local CONNECTED messages,
is 24 bytes long. With this format, the fields "dest_node" and
"orig_node" are optimized away, since they are redundant in this
particular case.

However, the absence of these fields leads to code inconsistencies
that are difficult to handle in some cases, especially when we need
to reverse or reject messages at the socket layer.

In this commit, we concentrate the handling of the absent fields
in one place, by letting the function tipc_msg_reverse() reallocate
the buffer and expand the header to 32 bytes when necessary. This
means that the socket code can now assume that the two previously
absent fields are present in the header when a message needs to be
rejected. This opens the way for some further simplifications of the
socket code.
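
The header expansion described above can be modelled in user space
roughly as follows. This is only an illustrative sketch, not the kernel
code: struct hx_buf and the hx_* helpers are hypothetical stand-ins for
sk_buff and the TIPC helpers, and only the 24 and 32 byte header sizes
are taken from the description above.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define HX_SHORT_HDR 24u	/* short header size, from the commit text */
#define HX_BASIC_HDR 32u	/* expanded header size, from the commit text */

/* Hypothetical stand-in for an sk_buff: header bytes followed by payload */
struct hx_buf {
	size_t hdr_len;
	size_t data_len;
	uint8_t bytes[];
};

/* Hypothetical allocator; the returned buffer is zero-filled */
static struct hx_buf *hx_buf_new(size_t hdr_len, size_t data_len)
{
	struct hx_buf *b = calloc(1, sizeof(*b) + hdr_len + data_len);

	if (!b)
		return NULL;
	b->hdr_len = hdr_len;
	b->data_len = data_len;
	return b;
}

/* Replace *buf with a copy that has a 32 byte header, if it is shorter.
 * As in the patch, the caller's pointer is updated, and the buffer is
 * consumed (freed, *buf set to NULL) on failure.
 * Returns 1 on success, 0 on failure.
 */
static int hx_expand_header(struct hx_buf **buf)
{
	struct hx_buf *old = *buf;
	struct hx_buf *new;

	if (old->hdr_len >= HX_BASIC_HDR)
		return 1;

	new = hx_buf_new(HX_BASIC_HDR, old->data_len);
	if (!new) {
		free(old);
		*buf = NULL;
		return 0;
	}
	/* Old header first; the added header bytes stay zeroed */
	memcpy(new->bytes, old->bytes, old->hdr_len);
	/* Payload moves to just after the larger header */
	memcpy(new->bytes + HX_BASIC_HDR, old->bytes + old->hdr_len,
	       old->data_len);
	free(old);
	*buf = new;
	return 1;
}
```

Passing a pointer to the buffer pointer is what lets the callee swap in
the replacement buffer, which is why the callers in the diff below
change from "skb" to "&skb".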

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/msg.c    | 67 ++++++++++++++++++++++++++++++++++---------------------
 net/tipc/msg.h    |  3 +--
 net/tipc/socket.c | 12 +++++-----
 3 files changed, 48 insertions(+), 34 deletions(-)

diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 08b4cc7..4339aab 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -463,43 +463,58 @@ bool tipc_msg_make_bundle(struct sk_buff **skb,  struct tipc_msg *msg,
 
 /**
  * tipc_msg_reverse(): swap source and destination addresses and add error code
- * @buf:  buffer containing message to be reversed
- * @dnode: return value: node where to send message after reversal
- * @err:  error code to be set in message
- * Consumes buffer if failure
+ * @own_node: originating node id for reversed message
+ * @skb:  buffer containing message to be reversed; may be replaced.
+ * @err:  error code to be set in message, if any
+ * Consumes buffer at failure
  * Returns true if success, otherwise false
  */
-bool tipc_msg_reverse(u32 own_addr,  struct sk_buff *buf, u32 *dnode,
-		      int err)
+bool tipc_msg_reverse(u32 own_node,  struct sk_buff **skb, u32 *dnode, int err)
 {
-	struct tipc_msg *msg = buf_msg(buf);
+	struct sk_buff *_skb = *skb;
+	struct tipc_msg *hdr = buf_msg(_skb);
 	struct tipc_msg ohdr;
-	uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE);
+	int dlen = min_t(uint, msg_data_sz(hdr), MAX_FORWARD_SIZE);
 
-	if (skb_linearize(buf))
+	if (skb_linearize(_skb))
 		goto exit;
-	msg = buf_msg(buf);
-	if (msg_dest_droppable(msg))
+	hdr = buf_msg(_skb);
+	if (msg_dest_droppable(hdr))
 		goto exit;
-	if (msg_errcode(msg))
+	if (msg_errcode(hdr))
 		goto exit;
-	memcpy(&ohdr, msg, msg_hdr_sz(msg));
-	msg_set_errcode(msg, err);
-	msg_set_origport(msg, msg_destport(&ohdr));
-	msg_set_destport(msg, msg_origport(&ohdr));
-	msg_set_prevnode(msg, own_addr);
-	if (!msg_short(msg)) {
-		msg_set_orignode(msg, msg_destnode(&ohdr));
-		msg_set_destnode(msg, msg_orignode(&ohdr));
+
+	/* Take a copy of original header before altering message */
+	memcpy(&ohdr, hdr, msg_hdr_sz(hdr));
+
+	/* Never return SHORT header; expand by replacing buffer if necessary */
+	if (msg_short(hdr)) {
+		*skb = tipc_buf_acquire(BASIC_H_SIZE + dlen);
+		if (!*skb)
+			goto exit;
+		memcpy((*skb)->data + BASIC_H_SIZE, msg_data(hdr), dlen);
+		kfree_skb(_skb);
+		_skb = *skb;
+		hdr = buf_msg(_skb);
+		memcpy(hdr, &ohdr, BASIC_H_SIZE);
+		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
 	}
-	msg_set_size(msg, msg_hdr_sz(msg) + rdsz);
-	skb_trim(buf, msg_size(msg));
-	skb_orphan(buf);
-	*dnode = msg_orignode(&ohdr);
+
+	/* Now reverse the concerned fields */
+	msg_set_errcode(hdr, err);
+	msg_set_origport(hdr, msg_destport(&ohdr));
+	msg_set_destport(hdr, msg_origport(&ohdr));
+	msg_set_destnode(hdr, msg_prevnode(&ohdr));
+	msg_set_prevnode(hdr, own_node);
+	msg_set_orignode(hdr, own_node);
+	msg_set_size(hdr, msg_hdr_sz(hdr) + dlen);
+	*dnode = msg_destnode(hdr);
+	skb_trim(_skb, msg_size(hdr));
+	skb_orphan(_skb);
 	return true;
 exit:
-	kfree_skb(buf);
-	*dnode = 0;
+	kfree_skb(_skb);
+	*skb = NULL;
 	return false;
 }
 
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 2f1563b..0e96f59 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -785,8 +785,7 @@ static inline bool msg_peer_is_up(struct tipc_msg *m)
 
 struct sk_buff *tipc_buf_acquire(u32 size);
 bool tipc_msg_validate(struct sk_buff *skb);
-bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,
-		      int err);
+bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, u32 *dnode, int err);
 void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type,
 		   u32 hsize, u32 destnode);
 struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 5b0b08d..e2d5b98 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -260,7 +260,7 @@ static void tsk_rej_rx_queue(struct sock *sk)
 	u32 own_node = tsk_own_node(tipc_sk(sk));
 
 	while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
-		if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
+		if (tipc_msg_reverse(own_node, &skb, &dnode, TIPC_ERR_NO_PORT))
 			tipc_node_xmit_skb(sock_net(sk), skb, dnode, 0);
 	}
 }
@@ -441,7 +441,7 @@ static int tipc_release(struct socket *sock)
 				tsk->connected = 0;
 				tipc_node_remove_conn(net, dnode, tsk->portid);
 			}
-			if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
+			if (tipc_msg_reverse(tsk_own_node(tsk), &skb, &dnode,
 					     TIPC_ERR_NO_PORT))
 				tipc_node_xmit_skb(net, skb, dnode, 0);
 		}
@@ -784,7 +784,7 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb)
 		if (conn_cong)
 			tsk->sk.sk_write_space(&tsk->sk);
 	} else if (msg_type(msg) == CONN_PROBE) {
-		if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) {
+		if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_OK)) {
 			msg_set_type(msg, CONN_PROBE_REPLY);
 			return;
 		}
@@ -1702,7 +1702,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 			atomic_add(truesize, dcnt);
 		return 0;
 	}
-	if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
+	if (!err || tipc_msg_reverse(tsk_own_node(tsk), &skb, &dnode, -err))
 		tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
 	return 0;
 }
@@ -1796,7 +1796,7 @@ int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 			goto xmit;
 		}
 		tn = net_generic(net, tipc_net_id);
-		if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
+		if (!tipc_msg_reverse(tn->own_addr, &skb, &dnode, -err))
 			continue;
 xmit:
 		tipc_node_xmit_skb(net, skb, dnode, dport);
@@ -2090,7 +2090,7 @@ restart:
 				kfree_skb(skb);
 				goto restart;
 			}
-			if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
+			if (tipc_msg_reverse(tsk_own_node(tsk), &skb, &dnode,
 					     TIPC_CONN_SHUTDOWN))
 				tipc_node_xmit_skb(net, skb, dnode,
 						   tsk->portid);
-- 
1.9.1


* [PATCH net-next 2/3] tipc: introduce new tipc_sk_respond() function
  2015-07-22 14:11 [PATCH net-next 0/3] tipc: clean up socket message reception Jon Maloy
  2015-07-22 14:11 ` [PATCH net-next 1/3] tipc: let function tipc_msg_reverse() expand header when needed Jon Maloy
@ 2015-07-22 14:11 ` Jon Maloy
  2015-07-22 14:11 ` [PATCH net-next 3/3] tipc: clean up socket layer message reception Jon Maloy
  2015-07-26 23:32 ` [PATCH net-next 0/3] tipc: clean up socket " David Miller
  3 siblings, 0 replies; 5+ messages in thread
From: Jon Maloy @ 2015-07-22 14:11 UTC
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

Currently, we use the code sequence

if (tipc_msg_reverse())
   tipc_node_xmit_skb()

at numerous locations in socket.c. The preparation of arguments
for these calls, as well as the sequence itself, makes the code
unnecessarily complex.

In this commit, we introduce a new function, tipc_sk_respond(),
that performs this call combination. We also replace some, but not
yet all, of these explicit call sequences with calls to the new
function. Notably, we let the function tipc_sk_proto_rcv() use
the new function to directly send out PROBE_REPLY messages,
instead of deferring this to the calling tipc_sk_rcv() function,
as we do now.
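
The idea of folding the reverse-then-transmit sequence into one helper
can be sketched in user space as below. This is a simplified model
under stated assumptions, not the kernel implementation: struct rsp_hdr
and the rsp_* functions are hypothetical, the transmit step only
records what would have been sent, and the reply is addressed to the
original sender's node rather than to the previous-hop node that the
real tipc_msg_reverse() uses.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, heavily reduced message header */
struct rsp_hdr {
	uint32_t orig_node, orig_port;
	uint32_t dest_node, dest_port;
	int errcode;
};

/* Stand-in for the transmit path: just record what was "sent" */
static int rsp_sent_count;
static uint32_t rsp_last_dnode;

static void rsp_xmit(const struct rsp_hdr *h, uint32_t dnode)
{
	(void)h;
	rsp_sent_count++;
	rsp_last_dnode = dnode;
}

/* Swap source and destination and set the error code.
 * A message that already carries an error is never bounced again.
 */
static bool rsp_reverse(uint32_t own_node, struct rsp_hdr *h, int err)
{
	struct rsp_hdr o = *h;	/* copy of the original header */

	if (o.errcode)
		return false;
	h->errcode = err;
	h->orig_node = own_node;
	h->orig_port = o.dest_port;
	h->dest_node = o.orig_node;
	h->dest_port = o.orig_port;
	return true;
}

/* The combined helper: callers no longer prepare a destination node
 * or check the result of the reversal themselves.
 */
static void rsp_respond(uint32_t own_node, struct rsp_hdr *h, int err)
{
	if (!rsp_reverse(own_node, h, err))
		return;
	rsp_xmit(h, h->dest_node);
}
```

With such a helper, each call site shrinks to a single call, which is
the shape the socket.c hunks below take with tipc_sk_respond().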

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/msg.c    |  3 +--
 net/tipc/msg.h    |  2 +-
 net/tipc/socket.c | 81 ++++++++++++++++++++++++++++++-------------------------
 3 files changed, 47 insertions(+), 39 deletions(-)

diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 4339aab..b6cc58e 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -469,7 +469,7 @@ bool tipc_msg_make_bundle(struct sk_buff **skb,  struct tipc_msg *msg,
  * Consumes buffer at failure
  * Returns true if success, otherwise false
  */
-bool tipc_msg_reverse(u32 own_node,  struct sk_buff **skb, u32 *dnode, int err)
+bool tipc_msg_reverse(u32 own_node,  struct sk_buff **skb, int err)
 {
 	struct sk_buff *_skb = *skb;
 	struct tipc_msg *hdr = buf_msg(_skb);
@@ -508,7 +508,6 @@ bool tipc_msg_reverse(u32 own_node,  struct sk_buff **skb, u32 *dnode, int err)
 	msg_set_prevnode(hdr, own_node);
 	msg_set_orignode(hdr, own_node);
 	msg_set_size(hdr, msg_hdr_sz(hdr) + dlen);
-	*dnode = msg_destnode(hdr);
 	skb_trim(_skb, msg_size(hdr));
 	skb_orphan(_skb);
 	return true;
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 0e96f59..d0834bc 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -785,7 +785,7 @@ static inline bool msg_peer_is_up(struct tipc_msg *m)
 
 struct sk_buff *tipc_buf_acquire(u32 size);
 bool tipc_msg_validate(struct sk_buff *skb);
-bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, u32 *dnode, int err);
+bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err);
 void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type,
 		   u32 hsize, u32 destnode);
 struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index e2d5b98..71d88ad 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -248,6 +248,22 @@ static void tsk_advance_rx_queue(struct sock *sk)
 	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
 }
 
+/* tipc_sk_respond() : send response message back to sender
+ */
+static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
+{
+	u32 selector;
+	u32 dnode;
+	u32 onode = tipc_own_addr(sock_net(sk));
+
+	if (!tipc_msg_reverse(onode, &skb, err))
+		return;
+
+	dnode = msg_destnode(buf_msg(skb));
+	selector = msg_origport(buf_msg(skb));
+	tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
+}
+
 /**
  * tsk_rej_rx_queue - reject all buffers in socket receive queue
  *
@@ -256,13 +272,9 @@ static void tsk_advance_rx_queue(struct sock *sk)
 static void tsk_rej_rx_queue(struct sock *sk)
 {
 	struct sk_buff *skb;
-	u32 dnode;
-	u32 own_node = tsk_own_node(tipc_sk(sk));
 
-	while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
-		if (tipc_msg_reverse(own_node, &skb, &dnode, TIPC_ERR_NO_PORT))
-			tipc_node_xmit_skb(sock_net(sk), skb, dnode, 0);
-	}
+	while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
+		tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
 }
 
 /* tsk_peer_msg - verify if message was sent by connected port's peer
@@ -441,9 +453,7 @@ static int tipc_release(struct socket *sock)
 				tsk->connected = 0;
 				tipc_node_remove_conn(net, dnode, tsk->portid);
 			}
-			if (tipc_msg_reverse(tsk_own_node(tsk), &skb, &dnode,
-					     TIPC_ERR_NO_PORT))
-				tipc_node_xmit_skb(net, skb, dnode, 0);
+			tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
 		}
 	}
 
@@ -764,35 +774,35 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 /**
  * tipc_sk_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
- * @skb: pointer to message buffer. Set to NULL if buffer is consumed.
+ * @skb: pointer to message buffer.
  */
-static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb)
+static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
 {
-	struct tipc_msg *msg = buf_msg(*skb);
+	struct sock *sk = &tsk->sk;
+	struct tipc_msg *hdr = buf_msg(skb);
+	int mtyp = msg_type(hdr);
 	int conn_cong;
-	u32 dnode;
-	u32 own_node = tsk_own_node(tsk);
+
 	/* Ignore if connection cannot be validated: */
-	if (!tsk_peer_msg(tsk, msg))
+	if (!tsk_peer_msg(tsk, hdr))
 		goto exit;
 
 	tsk->probing_state = TIPC_CONN_OK;
 
-	if (msg_type(msg) == CONN_ACK) {
+	if (mtyp == CONN_PROBE) {
+		msg_set_type(hdr, CONN_PROBE_REPLY);
+		tipc_sk_respond(sk, skb, TIPC_OK);
+		return;
+	} else if (mtyp == CONN_ACK) {
 		conn_cong = tsk_conn_cong(tsk);
-		tsk->sent_unacked -= msg_msgcnt(msg);
+		tsk->sent_unacked -= msg_msgcnt(hdr);
 		if (conn_cong)
-			tsk->sk.sk_write_space(&tsk->sk);
-	} else if (msg_type(msg) == CONN_PROBE) {
-		if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_OK)) {
-			msg_set_type(msg, CONN_PROBE_REPLY);
-			return;
-		}
+			sk->sk_write_space(sk);
+	} else if (mtyp != CONN_PROBE_REPLY) {
+		pr_warn("Received unknown CONN_PROTO msg\n");
 	}
-	/* Do nothing if msg_type() == CONN_PROBE_REPLY */
 exit:
-	kfree_skb(*skb);
-	*skb = NULL;
+	kfree_skb(skb);
 }
 
 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
@@ -1638,7 +1648,7 @@ static int filter_rcv(struct sock *sk, struct sk_buff **skb)
 	int rc = TIPC_OK;
 
 	if (unlikely(msg_user(msg) == CONN_MANAGER)) {
-		tipc_sk_proto_rcv(tsk, skb);
+		tipc_sk_proto_rcv(tsk, *skb);
 		return TIPC_OK;
 	}
 
@@ -1690,7 +1700,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 {
 	int err;
 	atomic_t *dcnt;
-	u32 dnode;
+	u32 dnode = msg_prevnode(buf_msg(skb));
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct net *net = sock_net(sk);
 	uint truesize = skb->truesize;
@@ -1702,7 +1712,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 			atomic_add(truesize, dcnt);
 		return 0;
 	}
-	if (!err || tipc_msg_reverse(tsk_own_node(tsk), &skb, &dnode, -err))
+	if (!err || tipc_msg_reverse(tsk_own_node(tsk), &skb, -err))
 		tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
 	return 0;
 }
@@ -1794,9 +1804,11 @@ int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 		if (!err) {
 			dnode = msg_destnode(buf_msg(skb));
 			goto xmit;
+		} else {
+			dnode = msg_prevnode(buf_msg(skb));
 		}
 		tn = net_generic(net, tipc_net_id);
-		if (!tipc_msg_reverse(tn->own_addr, &skb, &dnode, -err))
+		if (!tipc_msg_reverse(tn->own_addr, &skb, -err))
 			continue;
 xmit:
 		tipc_node_xmit_skb(net, skb, dnode, dport);
@@ -2083,6 +2095,8 @@ static int tipc_shutdown(struct socket *sock, int how)
 	case SS_CONNECTED:
 
 restart:
+		dnode = tsk_peer_node(tsk);
+
 		/* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
 		skb = __skb_dequeue(&sk->sk_receive_queue);
 		if (skb) {
@@ -2090,13 +2104,8 @@ restart:
 				kfree_skb(skb);
 				goto restart;
 			}
-			if (tipc_msg_reverse(tsk_own_node(tsk), &skb, &dnode,
-					     TIPC_CONN_SHUTDOWN))
-				tipc_node_xmit_skb(net, skb, dnode,
-						   tsk->portid);
+			tipc_sk_respond(sk, skb, TIPC_CONN_SHUTDOWN);
 		} else {
-			dnode = tsk_peer_node(tsk);
-
 			skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
 					      TIPC_CONN_MSG, SHORT_H_SIZE,
 					      0, dnode, tsk_own_node(tsk),
-- 
1.9.1


* [PATCH net-next 3/3] tipc: clean up socket layer message reception
  2015-07-22 14:11 [PATCH net-next 0/3] tipc: clean up socket message reception Jon Maloy
  2015-07-22 14:11 ` [PATCH net-next 1/3] tipc: let function tipc_msg_reverse() expand header when needed Jon Maloy
  2015-07-22 14:11 ` [PATCH net-next 2/3] tipc: introduce new tipc_sk_respond() function Jon Maloy
@ 2015-07-22 14:11 ` Jon Maloy
  2015-07-26 23:32 ` [PATCH net-next 0/3] tipc: clean up socket " David Miller
  3 siblings, 0 replies; 5+ messages in thread
From: Jon Maloy @ 2015-07-22 14:11 UTC
  To: davem
  Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion, Jon Maloy

When a message is received in a socket, one of the call chains
tipc_sk_rcv()->tipc_sk_enqueue()->filter_rcv()(->tipc_sk_proto_rcv())
or
tipc_backlog_rcv()->filter_rcv()(->tipc_sk_proto_rcv())
is followed. At each of these levels we may encounter situations
where the message needs to be rejected, or a new message
produced for transfer back to the sender. Despite recent
improvements, the current code for doing this is perceived
as awkward and hard to follow.

Leveraging the two previous commits in this series, we now
introduce a more uniform handling of such situations. We
let each of the functions in the chain itself produce/reverse
the message to be returned to the sender, and also perform the
actual forwarding. This simplifies the necessary logic within
each function.
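
A rough user-space sketch of this pattern follows, in which the filter
function rejects a message itself and only tells its caller whether the
message was queued. All flt_* names, fields and error values are
hypothetical and merely mirror the shape of the reworked filter_rcv();
the respond step is a stub that records the rejection.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical error codes, loosely modelled on the TIPC ones */
enum flt_err { FLT_OK, FLT_ERR_NO_PORT, FLT_ERR_OVERLOAD };

/* Hypothetical socket state, reduced to what the filter needs */
struct flt_sock {
	size_t queued;		/* bytes currently in the receive queue */
	size_t limit;		/* receive buffer limit */
	bool accepts_connected;	/* may connection-based messages arrive? */
	int rejected;		/* number of messages bounced so far */
	enum flt_err last_err;	/* error code of the latest rejection */
};

struct flt_msg {
	size_t size;
	bool connected;
};

/* Stub for the respond helper: record the rejection only */
static void flt_respond(struct flt_sock *sk, const struct flt_msg *m,
			enum flt_err err)
{
	(void)m;
	sk->rejected++;
	sk->last_err = err;
}

/* Returns true if the message was queued. On rejection the function
 * sends the response itself, so the caller never sees an error code
 * or a leftover buffer.
 */
static bool flt_filter_rcv(struct flt_sock *sk, const struct flt_msg *m)
{
	enum flt_err err;

	if (m->connected && !sk->accepts_connected) {
		err = FLT_ERR_NO_PORT;
		goto reject;
	}
	if (sk->queued + m->size >= sk->limit) {
		err = FLT_ERR_OVERLOAD;
		goto reject;
	}
	sk->queued += m->size;
	return true;

reject:
	flt_respond(sk, m, err);
	return false;
}
```

Because the rejection is handled where it is detected, a caller such as
a backlog handler only needs the boolean result, for instance to decide
whether to account for the queued bytes.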

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
 net/tipc/msg.c    |  20 ++---
 net/tipc/msg.h    |   3 +-
 net/tipc/socket.c | 255 ++++++++++++++++++++++++++----------------------------
 net/tipc/socket.h |   2 +-
 4 files changed, 134 insertions(+), 146 deletions(-)

diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index b6cc58e..562c926 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -520,17 +520,15 @@ exit:
 /**
  * tipc_msg_lookup_dest(): try to find new destination for named message
  * @skb: the buffer containing the message.
- * @dnode: return value: next-hop node, if destination found
- * @err: return value: error code to use, if message to be rejected
+ * @err: error code to be used by caller if lookup fails
  * Does not consume buffer
  * Returns true if a destination is found, false otherwise
  */
-bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb,
-			  u32 *dnode, int *err)
+bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)
 {
 	struct tipc_msg *msg = buf_msg(skb);
-	u32 dport;
-	u32 own_addr = tipc_own_addr(net);
+	u32 dport, dnode;
+	u32 onode = tipc_own_addr(net);
 
 	if (!msg_isdata(msg))
 		return false;
@@ -543,15 +541,15 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb,
 		return false;
 	if (msg_reroute_cnt(msg))
 		return false;
-	*dnode = addr_domain(net, msg_lookup_scope(msg));
+	dnode = addr_domain(net, msg_lookup_scope(msg));
 	dport = tipc_nametbl_translate(net, msg_nametype(msg),
-				       msg_nameinst(msg), dnode);
+				       msg_nameinst(msg), &dnode);
 	if (!dport)
 		return false;
 	msg_incr_reroute_cnt(msg);
-	if (*dnode != own_addr)
-		msg_set_prevnode(msg, own_addr);
-	msg_set_destnode(msg, *dnode);
+	if (dnode != onode)
+		msg_set_prevnode(msg, onode);
+	msg_set_destnode(msg, dnode);
 	msg_set_destport(msg, dport);
 	*err = TIPC_OK;
 	return true;
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index d0834bc..234fb05 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -798,8 +798,7 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg,
 bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
 int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
 		   int offset, int dsz, int mtu, struct sk_buff_head *list);
-bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode,
-			  int *err);
+bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
 struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
 
 static inline u16 buf_seqno(struct sk_buff *skb)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 71d88ad..1060d52 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1520,82 +1520,81 @@ static void tipc_data_ready(struct sock *sk)
  * @tsk: TIPC socket
  * @skb: pointer to message buffer. Set to NULL if buffer is consumed
  *
- * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
+ * Returns true if everything ok, false otherwise
  */
-static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb)
+static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
 {
 	struct sock *sk = &tsk->sk;
 	struct net *net = sock_net(sk);
 	struct socket *sock = sk->sk_socket;
-	struct tipc_msg *msg = buf_msg(*skb);
-	int retval = -TIPC_ERR_NO_PORT;
+	struct tipc_msg *hdr = buf_msg(skb);
 
-	if (msg_mcast(msg))
-		return retval;
+	if (unlikely(msg_mcast(hdr)))
+		return false;
 
 	switch ((int)sock->state) {
 	case SS_CONNECTED:
+
 		/* Accept only connection-based messages sent by peer */
-		if (tsk_peer_msg(tsk, msg)) {
-			if (unlikely(msg_errcode(msg))) {
-				sock->state = SS_DISCONNECTING;
-				tsk->connected = 0;
-				/* let timer expire on it's own */
-				tipc_node_remove_conn(net, tsk_peer_node(tsk),
-						      tsk->portid);
-			}
-			retval = TIPC_OK;
+		if (unlikely(!tsk_peer_msg(tsk, hdr)))
+			return false;
+
+		if (unlikely(msg_errcode(hdr))) {
+			sock->state = SS_DISCONNECTING;
+			tsk->connected = 0;
+			/* Let timer expire on it's own */
+			tipc_node_remove_conn(net, tsk_peer_node(tsk),
+					      tsk->portid);
 		}
-		break;
+		return true;
+
 	case SS_CONNECTING:
-		/* Accept only ACK or NACK message */
 
-		if (unlikely(!msg_connected(msg)))
-			break;
+		/* Accept only ACK or NACK message */
+		if (unlikely(!msg_connected(hdr)))
+			return false;
 
-		if (unlikely(msg_errcode(msg))) {
+		if (unlikely(msg_errcode(hdr))) {
 			sock->state = SS_DISCONNECTING;
 			sk->sk_err = ECONNREFUSED;
-			retval = TIPC_OK;
-			break;
+			return true;
 		}
 
-		if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) {
+		if (unlikely(!msg_isdata(hdr))) {
 			sock->state = SS_DISCONNECTING;
 			sk->sk_err = EINVAL;
-			retval = TIPC_OK;
-			break;
+			return true;
 		}
 
-		tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg));
-		msg_set_importance(&tsk->phdr, msg_importance(msg));
+		tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr));
+		msg_set_importance(&tsk->phdr, msg_importance(hdr));
 		sock->state = SS_CONNECTED;
 
-		/* If an incoming message is an 'ACK-', it should be
-		 * discarded here because it doesn't contain useful
-		 * data. In addition, we should try to wake up
-		 * connect() routine if sleeping.
-		 */
-		if (msg_data_sz(msg) == 0) {
-			kfree_skb(*skb);
-			*skb = NULL;
-			if (waitqueue_active(sk_sleep(sk)))
-				wake_up_interruptible(sk_sleep(sk));
-		}
-		retval = TIPC_OK;
-		break;
+		/* If 'ACK+' message, add to socket receive queue */
+		if (msg_data_sz(hdr))
+			return true;
+
+		/* If empty 'ACK-' message, wake up sleeping connect() */
+		if (waitqueue_active(sk_sleep(sk)))
+			wake_up_interruptible(sk_sleep(sk));
+
+		/* 'ACK-' message is neither accepted nor rejected: */
+		msg_set_dest_droppable(hdr, 1);
+		return false;
+
 	case SS_LISTENING:
 	case SS_UNCONNECTED:
+
 		/* Accept only SYN message */
-		if (!msg_connected(msg) && !(msg_errcode(msg)))
-			retval = TIPC_OK;
+		if (!msg_connected(hdr) && !(msg_errcode(hdr)))
+			return true;
 		break;
 	case SS_DISCONNECTING:
 		break;
 	default:
 		pr_err("Unknown socket state %u\n", sock->state);
 	}
-	return retval;
+	return false;
 }
 
 /**
@@ -1630,61 +1629,70 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
 /**
  * filter_rcv - validate incoming message
  * @sk: socket
- * @skb: pointer to message. Set to NULL if buffer is consumed.
+ * @skb: pointer to message.
  *
  * Enqueues message on receive queue if acceptable; optionally handles
  * disconnect indication for a connected socket.
  *
  * Called with socket lock already taken
  *
- * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected
+ * Returns true if message was added to socket receive queue, otherwise false
  */
-static int filter_rcv(struct sock *sk, struct sk_buff **skb)
+static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
 {
 	struct socket *sock = sk->sk_socket;
 	struct tipc_sock *tsk = tipc_sk(sk);
-	struct tipc_msg *msg = buf_msg(*skb);
-	unsigned int limit = rcvbuf_limit(sk, *skb);
-	int rc = TIPC_OK;
+	struct tipc_msg *hdr = buf_msg(skb);
+	unsigned int limit = rcvbuf_limit(sk, skb);
+	int err = TIPC_OK;
+	int usr = msg_user(hdr);
 
-	if (unlikely(msg_user(msg) == CONN_MANAGER)) {
-		tipc_sk_proto_rcv(tsk, *skb);
-		return TIPC_OK;
+	if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
+		tipc_sk_proto_rcv(tsk, skb);
+		return false;
 	}
 
-	if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
-		kfree_skb(*skb);
+	if (unlikely(usr == SOCK_WAKEUP)) {
+		kfree_skb(skb);
 		tsk->link_cong = 0;
 		sk->sk_write_space(sk);
-		*skb = NULL;
-		return TIPC_OK;
+		return false;
 	}
 
-	/* Reject message if it is wrong sort of message for socket */
-	if (msg_type(msg) > TIPC_DIRECT_MSG)
-		return -TIPC_ERR_NO_PORT;
+	/* Drop if illegal message type */
+	if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) {
+		kfree_skb(skb);
+		return false;
+	}
 
-	if (sock->state == SS_READY) {
-		if (msg_connected(msg))
-			return -TIPC_ERR_NO_PORT;
-	} else {
-		rc = filter_connect(tsk, skb);
-		if (rc != TIPC_OK || !*skb)
-			return rc;
+	/* Reject if wrong message type for current socket state */
+	if (unlikely(sock->state == SS_READY)) {
+		if (msg_connected(hdr)) {
+			err = TIPC_ERR_NO_PORT;
+			goto reject;
+		}
+	} else if (unlikely(!filter_connect(tsk, skb))) {
+		err = TIPC_ERR_NO_PORT;
+		goto reject;
 	}
 
 	/* Reject message if there isn't room to queue it */
-	if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit)
-		return -TIPC_ERR_OVERLOAD;
+	if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) {
+		err = TIPC_ERR_OVERLOAD;
+		goto reject;
+	}
 
 	/* Enqueue message */
-	TIPC_SKB_CB(*skb)->handle = NULL;
-	__skb_queue_tail(&sk->sk_receive_queue, *skb);
-	skb_set_owner_r(*skb, sk);
+	TIPC_SKB_CB(skb)->handle = NULL;
+	__skb_queue_tail(&sk->sk_receive_queue, skb);
+	skb_set_owner_r(skb, sk);
 
 	sk->sk_data_ready(sk);
-	*skb = NULL;
-	return TIPC_OK;
+	return true;
+
+reject:
+	tipc_sk_respond(sk, skb, err);
+	return false;
 }
 
 /**
@@ -1698,22 +1706,10 @@ static int filter_rcv(struct sock *sk, struct sk_buff **skb)
  */
 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 {
-	int err;
-	atomic_t *dcnt;
-	u32 dnode = msg_prevnode(buf_msg(skb));
-	struct tipc_sock *tsk = tipc_sk(sk);
-	struct net *net = sock_net(sk);
-	uint truesize = skb->truesize;
+	unsigned int truesize = skb->truesize;
 
-	err = filter_rcv(sk, &skb);
-	if (likely(!skb)) {
-		dcnt = &tsk->dupl_rcvcnt;
-		if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT)
-			atomic_add(truesize, dcnt);
-		return 0;
-	}
-	if (!err || tipc_msg_reverse(tsk_own_node(tsk), &skb, -err))
-		tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
+	if (likely(filter_rcv(sk, skb)))
+		atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
 	return 0;
 }
 
@@ -1723,45 +1719,43 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
  * @inputq: list of incoming buffers with potentially different destinations
  * @sk: socket where the buffers should be enqueued
  * @dport: port number for the socket
- * @_skb: returned buffer to be forwarded or rejected, if applicable
  *
  * Caller must hold socket lock
- *
- * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
- * or -TIPC_ERR_NO_PORT
  */
-static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
-			   u32 dport, struct sk_buff **_skb)
+static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
+			    u32 dport)
 {
 	unsigned int lim;
 	atomic_t *dcnt;
-	int err;
 	struct sk_buff *skb;
 	unsigned long time_limit = jiffies + 2;
 
 	while (skb_queue_len(inputq)) {
 		if (unlikely(time_after_eq(jiffies, time_limit)))
-			return TIPC_OK;
+			return;
+
 		skb = tipc_skb_dequeue(inputq, dport);
 		if (unlikely(!skb))
-			return TIPC_OK;
+			return;
+
+		/* Add message directly to receive queue if possible */
 		if (!sock_owned_by_user(sk)) {
-			err = filter_rcv(sk, &skb);
-			if (likely(!skb))
-				continue;
-			*_skb = skb;
-			return err;
+			filter_rcv(sk, skb);
+			continue;
 		}
+
+		/* Try backlog, compensating for double-counted bytes */
 		dcnt = &tipc_sk(sk)->dupl_rcvcnt;
 		if (sk->sk_backlog.len)
 			atomic_set(dcnt, 0);
 		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
 		if (likely(!sk_add_backlog(sk, skb, lim)))
 			continue;
-		*_skb = skb;
-		return -TIPC_ERR_OVERLOAD;
+
+		/* Overload => reject message back to sender */
+		tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD);
+		break;
 	}
-	return TIPC_OK;
 }
 
 /**
@@ -1769,51 +1763,46 @@ static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
  * @inputq: buffer list containing the buffers
  * Consumes all buffers in list until inputq is empty
  * Note: may be called in multiple threads referring to the same queue
- * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
- * Only node local calls check the return value, sending single-buffer queues
  */
-int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
+void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 {
 	u32 dnode, dport = 0;
 	int err;
-	struct sk_buff *skb;
 	struct tipc_sock *tsk;
-	struct tipc_net *tn;
 	struct sock *sk;
+	struct sk_buff *skb;
 
 	while (skb_queue_len(inputq)) {
-		err = -TIPC_ERR_NO_PORT;
-		skb = NULL;
 		dport = tipc_skb_peek_port(inputq, dport);
 		tsk = tipc_sk_lookup(net, dport);
+
 		if (likely(tsk)) {
 			sk = &tsk->sk;
 			if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
-				err = tipc_sk_enqueue(inputq, sk, dport, &skb);
+				tipc_sk_enqueue(inputq, sk, dport);
 				spin_unlock_bh(&sk->sk_lock.slock);
-				dport = 0;
 			}
 			sock_put(sk);
-		} else {
-			skb = tipc_skb_dequeue(inputq, dport);
-		}
-		if (likely(!skb))
 			continue;
-		if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
-			goto xmit;
-		if (!err) {
-			dnode = msg_destnode(buf_msg(skb));
-			goto xmit;
-		} else {
-			dnode = msg_prevnode(buf_msg(skb));
 		}
-		tn = net_generic(net, tipc_net_id);
-		if (!tipc_msg_reverse(tn->own_addr, &skb, -err))
+
+		/* No destination socket => dequeue skb if still there */
+		skb = tipc_skb_dequeue(inputq, dport);
+		if (!skb)
+			return;
+
+		/* Try secondary lookup if unresolved named message */
+		err = TIPC_ERR_NO_PORT;
+		if (tipc_msg_lookup_dest(net, skb, &err))
+			goto xmit;
+
+		/* Prepare for message rejection */
+		if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
 			continue;
 xmit:
+		dnode = msg_destnode(buf_msg(skb));
 		tipc_node_xmit_skb(net, skb, dnode, dport);
 	}
-	return err ? -EHOSTUNREACH : 0;
 }
 
 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
@@ -2082,7 +2071,10 @@ static int tipc_shutdown(struct socket *sock, int how)
 	struct net *net = sock_net(sk);
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct sk_buff *skb;
-	u32 dnode;
+	u32 dnode = tsk_peer_node(tsk);
+	u32 dport = tsk_peer_port(tsk);
+	u32 onode = tipc_own_addr(net);
+	u32 oport = tsk->portid;
 	int res;
 
 	if (how != SHUT_RDWR)
@@ -2108,9 +2100,8 @@ restart:
 		} else {
 			skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
 					      TIPC_CONN_MSG, SHORT_H_SIZE,
-					      0, dnode, tsk_own_node(tsk),
-					      tsk_peer_port(tsk),
-					      tsk->portid, TIPC_CONN_SHUTDOWN);
+					      0, dnode, onode, dport, oport,
+					      TIPC_CONN_SHUTDOWN);
 			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
 		}
 		tsk->connected = 0;
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index bf65513..4241f22 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -44,7 +44,7 @@
 				  SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
 int tipc_socket_init(void);
 void tipc_socket_stop(void);
-int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
+void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 		       struct sk_buff_head *inputq);
 void tipc_sk_reinit(struct net *net);
-- 
1.9.1


* Re: [PATCH net-next 0/3] tipc: clean up socket message reception
  2015-07-22 14:11 [PATCH net-next 0/3] tipc: clean up socket message reception Jon Maloy
                   ` (2 preceding siblings ...)
  2015-07-22 14:11 ` [PATCH net-next 3/3] tipc: clean up socket layer message reception Jon Maloy
@ 2015-07-26 23:32 ` David Miller
  3 siblings, 0 replies; 5+ messages in thread
From: David Miller @ 2015-07-26 23:32 UTC
  To: jon.maloy
  Cc: netdev, paul.gortmaker, erik.hugne, ying.xue, maloy,
	tipc-discussion

From: Jon Maloy <jon.maloy@ericsson.com>
Date: Wed, 22 Jul 2015 10:11:17 -0400

> Despite recent improvements the message reception code in socket.c is
> perceived as obscure and hard to follow, especially regarding the logics
> for message rejection. With the commits in this series we try to remedy
> this situation.

Series applied, thanks Jon.
