* [PATCH net-next 0/3] tipc: redesign socket-level flow control
@ 2016-05-02 14:22 Jon Maloy
2016-05-02 14:22 ` [PATCH net-next 1/3] tipc: re-enable compensation for socket receive buffer double counting Jon Maloy
` (3 more replies)
0 siblings, 4 replies; 6+ messages in thread
From: Jon Maloy @ 2016-05-02 14:22 UTC (permalink / raw)
To: davem; +Cc: Jon Maloy, netdev, Paul Gortmaker, tipc-discussion
The socket-level flow control in TIPC has long been due for a major
overhaul. This series fixes this.
Jon Maloy (3):
tipc: re-enable compensation for socket receive buffer double counting
tipc: propagate peer node capabilities to socket layer
tipc: redesign connection-level flow control
net/tipc/core.c | 8 ++-
net/tipc/msg.h | 14 +++++-
net/tipc/node.c | 21 +++++++-
net/tipc/node.h | 6 ++-
net/tipc/socket.c | 144 +++++++++++++++++++++++++++++++++++-------------------
net/tipc/socket.h | 17 +++++--
6 files changed, 145 insertions(+), 65 deletions(-)
--
1.9.1
------------------------------------------------------------------------------
Find and fix application performance issues faster with Applications Manager
Applications Manager provides deep performance insights into multiple tiers of
your business applications. It resolves application problems quickly and
reduces your MTTR. Get your free trial!
https://ad.doubleclick.net/ddm/clk/302982198;130105516;z
^ permalink raw reply [flat|nested] 6+ messages in thread
* [PATCH net-next 1/3] tipc: re-enable compensation for socket receive buffer double counting
2016-05-02 14:22 [PATCH net-next 0/3] tipc: redesign socket-level flow control Jon Maloy
@ 2016-05-02 14:22 ` Jon Maloy
2016-05-02 14:22 ` [PATCH net-next 2/3] tipc: propagate peer node capabilities to socket layer Jon Maloy
` (2 subsequent siblings)
3 siblings, 0 replies; 6+ messages in thread
From: Jon Maloy @ 2016-05-02 14:22 UTC (permalink / raw)
To: davem; +Cc: Jon Maloy, netdev, Paul Gortmaker, tipc-discussion
In the refactoring commit d570d86497ee ("tipc: enqueue arrived buffers
in socket in separate function") we did by accident replace the test
if (sk->sk_backlog.len == 0)
atomic_set(&tsk->dupl_rcvcnt, 0);
with
if (sk->sk_backlog.len)
atomic_set(&tsk->dupl_rcvcnt, 0);
This effectively disables the compensation we have for the double
receive buffer accounting that occurs temporarily when buffers are
moved from the backlog to the socket receive queue. Until now, this
has gone unnoticed because of the large receive buffer limits we are
applying, but becomes indispensable when we reduce this buffer limit
later in this series.
We now fix this by inverting the mentioned condition.
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/socket.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 3eeb50a..d37a940 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1748,7 +1748,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
/* Try backlog, compensating for double-counted bytes */
dcnt = &tipc_sk(sk)->dupl_rcvcnt;
- if (sk->sk_backlog.len)
+ if (!sk->sk_backlog.len)
atomic_set(dcnt, 0);
lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
if (likely(!sk_add_backlog(sk, skb, lim)))
--
1.9.1
------------------------------------------------------------------------------
Find and fix application performance issues faster with Applications Manager
Applications Manager provides deep performance insights into multiple tiers of
your business applications. It resolves application problems quickly and
reduces your MTTR. Get your free trial!
https://ad.doubleclick.net/ddm/clk/302982198;130105516;z
^ permalink raw reply related [flat|nested] 6+ messages in thread
* [PATCH net-next 2/3] tipc: propagate peer node capabilities to socket layer
2016-05-02 14:22 [PATCH net-next 0/3] tipc: redesign socket-level flow control Jon Maloy
2016-05-02 14:22 ` [PATCH net-next 1/3] tipc: re-enable compensation for socket receive buffer double counting Jon Maloy
@ 2016-05-02 14:22 ` Jon Maloy
2016-05-02 14:22 ` [PATCH net-next 3/3] tipc: redesign connection-level flow control Jon Maloy
2016-05-03 19:51 ` [PATCH net-next 0/3] tipc: redesign socket-level " David Miller
3 siblings, 0 replies; 6+ messages in thread
From: Jon Maloy @ 2016-05-02 14:22 UTC (permalink / raw)
To: davem; +Cc: Jon Maloy, netdev, Paul Gortmaker, tipc-discussion
During neighbor discovery, nodes advertise their capabilities as a bit
map in a dedicated 16-bit field in the discovery message header. This
bit map has so far only be stored in the node structure on the peer
nodes, but we now see the need to keep a copy even in the socket
structure.
This commit adds this functionality.
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/node.c | 21 +++++++++++++++++++--
net/tipc/node.h | 1 +
net/tipc/socket.c | 2 ++
3 files changed, 22 insertions(+), 2 deletions(-)
diff --git a/net/tipc/node.c b/net/tipc/node.c
index c299156..29cc853 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1,7 +1,7 @@
/*
* net/tipc/node.c: TIPC node management routines
*
- * Copyright (c) 2000-2006, 2012-2015, Ericsson AB
+ * Copyright (c) 2000-2006, 2012-2016, Ericsson AB
* Copyright (c) 2005-2006, 2010-2014, Wind River Systems
* All rights reserved.
*
@@ -191,6 +191,20 @@ int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel)
tipc_node_put(n);
return mtu;
}
+
+u16 tipc_node_get_capabilities(struct net *net, u32 addr)
+{
+ struct tipc_node *n;
+ u16 caps;
+
+ n = tipc_node_find(net, addr);
+ if (unlikely(!n))
+ return TIPC_NODE_CAPABILITIES;
+ caps = n->capabilities;
+ tipc_node_put(n);
+ return caps;
+}
+
/*
* A trivial power-of-two bitmask technique is used for speed, since this
* operation is done for every incoming TIPC packet. The number of hash table
@@ -304,8 +318,11 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
spin_lock_bh(&tn->node_list_lock);
n = tipc_node_find(net, addr);
- if (n)
+ if (n) {
+ /* Same node may come back with new capabilities */
+ n->capabilities = capabilities;
goto exit;
+ }
n = kzalloc(sizeof(*n), GFP_ATOMIC);
if (!n) {
pr_warn("Node creation failed, no memory\n");
diff --git a/net/tipc/node.h b/net/tipc/node.h
index f39d9d0..1823768 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -70,6 +70,7 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel);
+u16 tipc_node_get_capabilities(struct net *net, u32 addr);
int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);
int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb);
int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d37a940..94bd286 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -98,6 +98,7 @@ struct tipc_sock {
bool link_cong;
uint sent_unacked;
uint rcv_unacked;
+ u16 peer_caps;
struct sockaddr_tipc remote;
struct rhash_head node;
struct rcu_head rcu;
@@ -1118,6 +1119,7 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
+ tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
}
/**
--
1.9.1
------------------------------------------------------------------------------
Find and fix application performance issues faster with Applications Manager
Applications Manager provides deep performance insights into multiple tiers of
your business applications. It resolves application problems quickly and
reduces your MTTR. Get your free trial!
https://ad.doubleclick.net/ddm/clk/302982198;130105516;z
^ permalink raw reply related [flat|nested] 6+ messages in thread
* [PATCH net-next 3/3] tipc: redesign connection-level flow control
2016-05-02 14:22 [PATCH net-next 0/3] tipc: redesign socket-level flow control Jon Maloy
2016-05-02 14:22 ` [PATCH net-next 1/3] tipc: re-enable compensation for socket receive buffer double counting Jon Maloy
2016-05-02 14:22 ` [PATCH net-next 2/3] tipc: propagate peer node capabilities to socket layer Jon Maloy
@ 2016-05-02 14:22 ` Jon Maloy
2016-05-03 19:51 ` [PATCH net-next 0/3] tipc: redesign socket-level " David Miller
3 siblings, 0 replies; 6+ messages in thread
From: Jon Maloy @ 2016-05-02 14:22 UTC (permalink / raw)
To: davem; +Cc: Jon Maloy, netdev, Paul Gortmaker, tipc-discussion
There are two flow control mechanisms in TIPC; one at link level that
handles network congestion, burst control, and retransmission, and one
at connection level which' only remaining task is to prevent overflow
in the receiving socket buffer. In TIPC, the latter task has to be
solved end-to-end because messages can not be thrown away once they
have been accepted and delivered upwards from the link layer, i.e, we
can never permit the receive buffer to overflow.
Currently, this algorithm is message based. A counter in the receiving
socket keeps track of number of consumed messages, and sends a dedicated
acknowledge message back to the sender for each 256 consumed message.
A counter at the sending end keeps track of the sent, not yet
acknowledged messages, and blocks the sender if this number ever reaches
512 unacknowledged messages. When the missing acknowledge arrives, the
socket is then woken up for renewed transmission. This works well for
keeping the message flow running, as it almost never happens that a
sender socket is blocked this way.
A problem with the current mechanism is that it potentially is very
memory consuming. Since we don't distinguish bewteen small and large
messages, we have to dimension the socket receive buffer according
to a worst-case of both. I.e., the window size must be chosen large
enough to sustain a reasonable throughput even for the smallest
messages, while we must still consider a scenario where all messages
are of maximum size. Hence, the current fix window size of 512 messages
and a maximum message size of 66k results in a receive buffer of 66 MB
when truesize(66k) = 131k is taken into account. It is possible to do
much better.
This commit introduces an algorithm where we instead use 1024-byte
blocks as base unit. This unit, always rounded upwards from the
actual message size, is used when we advertise windows as well as when
we count and acknowledge transmitted data. The advertised window is
based on the configured receive buffer size in such a way that even
the worst-case truesize/msgsize ratio always is covered. Since the
smallest possible message size (from a flow control viewpoint) now is
1024 bytes, we can safely assume this ratio to be less than four, which
is the value we are now using.
This way, we have been able to reduce the default receive buffer size
from 66 MB to 2 MB with maintained performance.
In order to keep this solution backwards compatible, we introduce a
new capability bit in the discovery protocol, and use this throughout
the message sending/reception path to always select the right unit.
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/core.c | 8 ++--
net/tipc/msg.h | 14 +++++-
net/tipc/node.h | 5 +-
net/tipc/socket.c | 140 +++++++++++++++++++++++++++++++++++-------------------
net/tipc/socket.h | 17 +++++--
5 files changed, 122 insertions(+), 62 deletions(-)
diff --git a/net/tipc/core.c b/net/tipc/core.c
index e2bdb07a..fe1b062 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -112,11 +112,9 @@ static int __init tipc_init(void)
pr_info("Activated (version " TIPC_MOD_VER ")\n");
- sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
- TIPC_LOW_IMPORTANCE;
- sysctl_tipc_rmem[1] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
- TIPC_CRITICAL_IMPORTANCE;
- sysctl_tipc_rmem[2] = TIPC_CONN_OVERLOAD_LIMIT;
+ sysctl_tipc_rmem[0] = RCVBUF_MIN;
+ sysctl_tipc_rmem[1] = RCVBUF_DEF;
+ sysctl_tipc_rmem[2] = RCVBUF_MAX;
err = tipc_netlink_start();
if (err)
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 58bf515..024da8a 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -743,16 +743,26 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n)
msg_set_bits(m, 9, 16, 0xffff, n);
}
-static inline u32 msg_bcast_tag(struct tipc_msg *m)
+static inline u32 msg_conn_ack(struct tipc_msg *m)
{
return msg_bits(m, 9, 16, 0xffff);
}
-static inline void msg_set_bcast_tag(struct tipc_msg *m, u32 n)
+static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 9, 16, 0xffff, n);
}
+static inline u32 msg_adv_win(struct tipc_msg *m)
+{
+ return msg_bits(m, 9, 0, 0xffff);
+}
+
+static inline void msg_set_adv_win(struct tipc_msg *m, u32 n)
+{
+ msg_set_bits(m, 9, 0, 0xffff, n);
+}
+
static inline u32 msg_max_pkt(struct tipc_msg *m)
{
return msg_bits(m, 9, 16, 0xffff) * 4;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 1823768..8264b3d 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -45,10 +45,11 @@
/* Optional capabilities supported by this code version
*/
enum {
- TIPC_BCAST_SYNCH = (1 << 1)
+ TIPC_BCAST_SYNCH = (1 << 1),
+ TIPC_BLOCK_FLOWCTL = (2 << 1)
};
-#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH
+#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | TIPC_BLOCK_FLOWCTL)
#define INVALID_BEARER_ID -1
void tipc_node_stop(struct net *net);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 94bd286..1262889 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -96,9 +96,11 @@ struct tipc_sock {
uint conn_timeout;
atomic_t dupl_rcvcnt;
bool link_cong;
- uint sent_unacked;
- uint rcv_unacked;
+ u16 snt_unacked;
+ u16 snd_win;
u16 peer_caps;
+ u16 rcv_unacked;
+ u16 rcv_win;
struct sockaddr_tipc remote;
struct rhash_head node;
struct rcu_head rcu;
@@ -228,9 +230,29 @@ static struct tipc_sock *tipc_sk(const struct sock *sk)
return container_of(sk, struct tipc_sock, sk);
}
-static int tsk_conn_cong(struct tipc_sock *tsk)
+static bool tsk_conn_cong(struct tipc_sock *tsk)
{
- return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
+ return tsk->snt_unacked >= tsk->snd_win;
+}
+
+/* tsk_blocks(): translate a buffer size in bytes to number of
+ * advertisable blocks, taking into account the ratio truesize(len)/len
+ * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
+ */
+static u16 tsk_adv_blocks(int len)
+{
+ return len / FLOWCTL_BLK_SZ / 4;
+}
+
+/* tsk_inc(): increment counter for sent or received data
+ * - If block based flow control is not supported by peer we
+ * fall back to message based ditto, incrementing the counter
+ */
+static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
+{
+ if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
+ return ((msglen / FLOWCTL_BLK_SZ) + 1);
+ return 1;
}
/**
@@ -378,9 +400,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
sk->sk_write_space = tipc_write_space;
sk->sk_destruct = tipc_sock_destruct;
tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
- tsk->sent_unacked = 0;
atomic_set(&tsk->dupl_rcvcnt, 0);
+ /* Start out with safe limits until we receive an advertised window */
+ tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
+ tsk->rcv_win = tsk->snd_win;
+
if (sock->state == SS_READY) {
tsk_set_unreturnable(tsk, true);
if (sock->type == SOCK_DGRAM)
@@ -776,7 +801,7 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
struct sock *sk = &tsk->sk;
struct tipc_msg *hdr = buf_msg(skb);
int mtyp = msg_type(hdr);
- int conn_cong;
+ bool conn_cong;
/* Ignore if connection cannot be validated: */
if (!tsk_peer_msg(tsk, hdr))
@@ -790,7 +815,9 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
return;
} else if (mtyp == CONN_ACK) {
conn_cong = tsk_conn_cong(tsk);
- tsk->sent_unacked -= msg_msgcnt(hdr);
+ tsk->snt_unacked -= msg_conn_ack(hdr);
+ if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
+ tsk->snd_win = msg_adv_win(hdr);
if (conn_cong)
sk->sk_write_space(sk);
} else if (mtyp != CONN_PROBE_REPLY) {
@@ -1021,12 +1048,14 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
u32 dnode;
uint mtu, send, sent = 0;
struct iov_iter save;
+ int hlen = MIN_H_SIZE;
/* Handle implied connection establishment */
if (unlikely(dest)) {
rc = __tipc_sendmsg(sock, m, dsz);
+ hlen = msg_hdr_sz(mhdr);
if (dsz && (dsz == rc))
- tsk->sent_unacked = 1;
+ tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
return rc;
}
if (dsz > (uint)INT_MAX)
@@ -1055,7 +1084,7 @@ next:
if (likely(!tsk_conn_cong(tsk))) {
rc = tipc_node_xmit(net, &pktchain, dnode, portid);
if (likely(!rc)) {
- tsk->sent_unacked++;
+ tsk->snt_unacked += tsk_inc(tsk, send + hlen);
sent += send;
if (sent == dsz)
return dsz;
@@ -1120,6 +1149,12 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
+ if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
+ return;
+
+ /* Fall back to message based flow control */
+ tsk->rcv_win = FLOWCTL_MSG_WIN;
+ tsk->snd_win = FLOWCTL_MSG_WIN;
}
/**
@@ -1216,7 +1251,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
return 0;
}
-static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
+static void tipc_sk_send_ack(struct tipc_sock *tsk)
{
struct net *net = sock_net(&tsk->sk);
struct sk_buff *skb = NULL;
@@ -1232,7 +1267,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
if (!skb)
return;
msg = buf_msg(skb);
- msg_set_msgcnt(msg, ack);
+ msg_set_conn_ack(msg, tsk->rcv_unacked);
+ tsk->rcv_unacked = 0;
+
+ /* Adjust to and advertize the correct window limit */
+ if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
+ tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
+ msg_set_adv_win(msg, tsk->rcv_win);
+ }
tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
}
@@ -1290,7 +1332,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
long timeo;
unsigned int sz;
u32 err;
- int res;
+ int res, hlen;
/* Catch invalid receive requests */
if (unlikely(!buf_len))
@@ -1315,6 +1357,7 @@ restart:
buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf);
sz = msg_data_sz(msg);
+ hlen = msg_hdr_sz(msg);
err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */
@@ -1337,7 +1380,7 @@ restart:
sz = buf_len;
m->msg_flags |= MSG_TRUNC;
}
- res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
+ res = skb_copy_datagram_msg(buf, hlen, m, sz);
if (res)
goto exit;
res = sz;
@@ -1349,15 +1392,15 @@ restart:
res = -ECONNRESET;
}
- /* Consume received message (optional) */
- if (likely(!(flags & MSG_PEEK))) {
- if ((sock->state != SS_READY) &&
- (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
- tipc_sk_send_ack(tsk, tsk->rcv_unacked);
- tsk->rcv_unacked = 0;
- }
- tsk_advance_rx_queue(sk);
+ if (unlikely(flags & MSG_PEEK))
+ goto exit;
+
+ if (likely(sock->state != SS_READY)) {
+ tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
+ if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
+ tipc_sk_send_ack(tsk);
}
+ tsk_advance_rx_queue(sk);
exit:
release_sock(sk);
return res;
@@ -1386,7 +1429,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
int sz_to_copy, target, needed;
int sz_copied = 0;
u32 err;
- int res = 0;
+ int res = 0, hlen;
/* Catch invalid receive attempts */
if (unlikely(!buf_len))
@@ -1412,6 +1455,7 @@ restart:
buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf);
sz = msg_data_sz(msg);
+ hlen = msg_hdr_sz(msg);
err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */
@@ -1436,8 +1480,7 @@ restart:
needed = (buf_len - sz_copied);
sz_to_copy = (sz <= needed) ? sz : needed;
- res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
- m, sz_to_copy);
+ res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
if (res)
goto exit;
@@ -1459,20 +1502,18 @@ restart:
res = -ECONNRESET;
}
- /* Consume received message (optional) */
- if (likely(!(flags & MSG_PEEK))) {
- if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
- tipc_sk_send_ack(tsk, tsk->rcv_unacked);
- tsk->rcv_unacked = 0;
- }
- tsk_advance_rx_queue(sk);
- }
+ if (unlikely(flags & MSG_PEEK))
+ goto exit;
+
+ tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
+ if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
+ tipc_sk_send_ack(tsk);
+ tsk_advance_rx_queue(sk);
/* Loop around if more data is required */
if ((sz_copied < buf_len) && /* didn't get all requested data */
(!skb_queue_empty(&sk->sk_receive_queue) ||
(sz_copied < target)) && /* and more is ready or required */
- (!(flags & MSG_PEEK)) && /* and aren't just peeking at data */
(!err)) /* and haven't reached a FIN */
goto restart;
@@ -1604,30 +1645,33 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
/**
* rcvbuf_limit - get proper overload limit of socket receive queue
* @sk: socket
- * @buf: message
+ * @skb: message
*
- * For all connection oriented messages, irrespective of importance,
- * the default overload value (i.e. 67MB) is set as limit.
+ * For connection oriented messages, irrespective of importance,
+ * default queue limit is 2 MB.
*
- * For all connectionless messages, by default new queue limits are
- * as belows:
+ * For connectionless messages, queue limits are based on message
+ * importance as follows:
*
- * TIPC_LOW_IMPORTANCE (4 MB)
- * TIPC_MEDIUM_IMPORTANCE (8 MB)
- * TIPC_HIGH_IMPORTANCE (16 MB)
- * TIPC_CRITICAL_IMPORTANCE (32 MB)
+ * TIPC_LOW_IMPORTANCE (2 MB)
+ * TIPC_MEDIUM_IMPORTANCE (4 MB)
+ * TIPC_HIGH_IMPORTANCE (8 MB)
+ * TIPC_CRITICAL_IMPORTANCE (16 MB)
*
* Returns overload limit according to corresponding message importance
*/
-static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
+static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
{
- struct tipc_msg *msg = buf_msg(buf);
+ struct tipc_sock *tsk = tipc_sk(sk);
+ struct tipc_msg *hdr = buf_msg(skb);
+
+ if (unlikely(!msg_connected(hdr)))
+ return sk->sk_rcvbuf << msg_importance(hdr);
- if (msg_connected(msg))
- return sysctl_tipc_rmem[2];
+ if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
+ return sk->sk_rcvbuf;
- return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
- msg_importance(msg);
+ return FLOWCTL_MSG_LIM;
}
/**
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 4241f22..06fb594 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -1,6 +1,6 @@
/* net/tipc/socket.h: Include file for TIPC socket code
*
- * Copyright (c) 2014-2015, Ericsson AB
+ * Copyright (c) 2014-2016, Ericsson AB
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -38,10 +38,17 @@
#include <net/sock.h>
#include <net/genetlink.h>
-#define TIPC_CONNACK_INTV 256
-#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
-#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
- SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
+/* Compatibility values for deprecated message based flow control */
+#define FLOWCTL_MSG_WIN 512
+#define FLOWCTL_MSG_LIM ((FLOWCTL_MSG_WIN * 2 + 1) * SKB_TRUESIZE(MAX_MSG_SIZE))
+
+#define FLOWCTL_BLK_SZ 1024
+
+/* Socket receive buffer sizes */
+#define RCVBUF_MIN (FLOWCTL_BLK_SZ * 512)
+#define RCVBUF_DEF (FLOWCTL_BLK_SZ * 1024 * 2)
+#define RCVBUF_MAX (FLOWCTL_BLK_SZ * 1024 * 16)
+
int tipc_socket_init(void);
void tipc_socket_stop(void);
void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
--
1.9.1
------------------------------------------------------------------------------
Find and fix application performance issues faster with Applications Manager
Applications Manager provides deep performance insights into multiple tiers of
your business applications. It resolves application problems quickly and
reduces your MTTR. Get your free trial!
https://ad.doubleclick.net/ddm/clk/302982198;130105516;z
^ permalink raw reply related [flat|nested] 6+ messages in thread
* [PATCH net-next 3/3] tipc: redesign connection-level flow control
2016-05-02 15:58 Jon Maloy
@ 2016-05-02 15:58 ` Jon Maloy
0 siblings, 0 replies; 6+ messages in thread
From: Jon Maloy @ 2016-05-02 15:58 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, parthasarathy.bhuvaragan, richard.alpe,
ying.xue, maloy, tipc-discussion, Jon Maloy
There are two flow control mechanisms in TIPC; one at link level that
handles network congestion, burst control, and retransmission, and one
at connection level which' only remaining task is to prevent overflow
in the receiving socket buffer. In TIPC, the latter task has to be
solved end-to-end because messages can not be thrown away once they
have been accepted and delivered upwards from the link layer, i.e, we
can never permit the receive buffer to overflow.
Currently, this algorithm is message based. A counter in the receiving
socket keeps track of number of consumed messages, and sends a dedicated
acknowledge message back to the sender for each 256 consumed message.
A counter at the sending end keeps track of the sent, not yet
acknowledged messages, and blocks the sender if this number ever reaches
512 unacknowledged messages. When the missing acknowledge arrives, the
socket is then woken up for renewed transmission. This works well for
keeping the message flow running, as it almost never happens that a
sender socket is blocked this way.
A problem with the current mechanism is that it potentially is very
memory consuming. Since we don't distinguish between small and large
messages, we have to dimension the socket receive buffer according
to a worst-case of both. I.e., the window size must be chosen large
enough to sustain a reasonable throughput even for the smallest
messages, while we must still consider a scenario where all messages
are of maximum size. Hence, the current fix window size of 512 messages
and a maximum message size of 66k results in a receive buffer of 66 MB
when truesize(66k) = 131k is taken into account. It is possible to do
much better.
This commit introduces an algorithm where we instead use 1024-byte
blocks as base unit. This unit, always rounded upwards from the
actual message size, is used when we advertise windows as well as when
we count and acknowledge transmitted data. The advertised window is
based on the configured receive buffer size in such a way that even
the worst-case truesize/msgsize ratio always is covered. Since the
smallest possible message size (from a flow control viewpoint) now is
1024 bytes, we can safely assume this ratio to be less than four, which
is the value we are now using.
This way, we have been able to reduce the default receive buffer size
from 66 MB to 2 MB with maintained performance.
In order to keep this solution backwards compatible, we introduce a
new capability bit in the discovery protocol, and use this throughout
the message sending/reception path to always select the right unit.
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/core.c | 8 ++--
net/tipc/msg.h | 14 +++++-
net/tipc/node.h | 5 +-
net/tipc/socket.c | 140 +++++++++++++++++++++++++++++++++++-------------------
net/tipc/socket.h | 17 +++++--
5 files changed, 122 insertions(+), 62 deletions(-)
diff --git a/net/tipc/core.c b/net/tipc/core.c
index e2bdb07a..fe1b062 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -112,11 +112,9 @@ static int __init tipc_init(void)
pr_info("Activated (version " TIPC_MOD_VER ")\n");
- sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
- TIPC_LOW_IMPORTANCE;
- sysctl_tipc_rmem[1] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
- TIPC_CRITICAL_IMPORTANCE;
- sysctl_tipc_rmem[2] = TIPC_CONN_OVERLOAD_LIMIT;
+ sysctl_tipc_rmem[0] = RCVBUF_MIN;
+ sysctl_tipc_rmem[1] = RCVBUF_DEF;
+ sysctl_tipc_rmem[2] = RCVBUF_MAX;
err = tipc_netlink_start();
if (err)
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 58bf515..024da8a 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -743,16 +743,26 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n)
msg_set_bits(m, 9, 16, 0xffff, n);
}
-static inline u32 msg_bcast_tag(struct tipc_msg *m)
+static inline u32 msg_conn_ack(struct tipc_msg *m)
{
return msg_bits(m, 9, 16, 0xffff);
}
-static inline void msg_set_bcast_tag(struct tipc_msg *m, u32 n)
+static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 9, 16, 0xffff, n);
}
+static inline u32 msg_adv_win(struct tipc_msg *m)
+{
+ return msg_bits(m, 9, 0, 0xffff);
+}
+
+static inline void msg_set_adv_win(struct tipc_msg *m, u32 n)
+{
+ msg_set_bits(m, 9, 0, 0xffff, n);
+}
+
static inline u32 msg_max_pkt(struct tipc_msg *m)
{
return msg_bits(m, 9, 16, 0xffff) * 4;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 1823768..8264b3d 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -45,10 +45,11 @@
/* Optional capabilities supported by this code version
*/
enum {
- TIPC_BCAST_SYNCH = (1 << 1)
+ TIPC_BCAST_SYNCH = (1 << 1),
+ TIPC_BLOCK_FLOWCTL = (2 << 1)
};
-#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH
+#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | TIPC_BLOCK_FLOWCTL)
#define INVALID_BEARER_ID -1
void tipc_node_stop(struct net *net);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 94bd286..1262889 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -96,9 +96,11 @@ struct tipc_sock {
uint conn_timeout;
atomic_t dupl_rcvcnt;
bool link_cong;
- uint sent_unacked;
- uint rcv_unacked;
+ u16 snt_unacked;
+ u16 snd_win;
u16 peer_caps;
+ u16 rcv_unacked;
+ u16 rcv_win;
struct sockaddr_tipc remote;
struct rhash_head node;
struct rcu_head rcu;
@@ -228,9 +230,29 @@ static struct tipc_sock *tipc_sk(const struct sock *sk)
return container_of(sk, struct tipc_sock, sk);
}
-static int tsk_conn_cong(struct tipc_sock *tsk)
+static bool tsk_conn_cong(struct tipc_sock *tsk)
{
- return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
+ return tsk->snt_unacked >= tsk->snd_win;
+}
+
+/* tsk_blocks(): translate a buffer size in bytes to number of
+ * advertisable blocks, taking into account the ratio truesize(len)/len
+ * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
+ */
+static u16 tsk_adv_blocks(int len)
+{
+ return len / FLOWCTL_BLK_SZ / 4;
+}
+
+/* tsk_inc(): increment counter for sent or received data
+ * - If block based flow control is not supported by peer we
+ * fall back to message based ditto, incrementing the counter
+ */
+static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
+{
+ if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
+ return ((msglen / FLOWCTL_BLK_SZ) + 1);
+ return 1;
}
/**
@@ -378,9 +400,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
sk->sk_write_space = tipc_write_space;
sk->sk_destruct = tipc_sock_destruct;
tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
- tsk->sent_unacked = 0;
atomic_set(&tsk->dupl_rcvcnt, 0);
+ /* Start out with safe limits until we receive an advertised window */
+ tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
+ tsk->rcv_win = tsk->snd_win;
+
if (sock->state == SS_READY) {
tsk_set_unreturnable(tsk, true);
if (sock->type == SOCK_DGRAM)
@@ -776,7 +801,7 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
struct sock *sk = &tsk->sk;
struct tipc_msg *hdr = buf_msg(skb);
int mtyp = msg_type(hdr);
- int conn_cong;
+ bool conn_cong;
/* Ignore if connection cannot be validated: */
if (!tsk_peer_msg(tsk, hdr))
@@ -790,7 +815,9 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
return;
} else if (mtyp == CONN_ACK) {
conn_cong = tsk_conn_cong(tsk);
- tsk->sent_unacked -= msg_msgcnt(hdr);
+ tsk->snt_unacked -= msg_conn_ack(hdr);
+ if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
+ tsk->snd_win = msg_adv_win(hdr);
if (conn_cong)
sk->sk_write_space(sk);
} else if (mtyp != CONN_PROBE_REPLY) {
@@ -1021,12 +1048,14 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
u32 dnode;
uint mtu, send, sent = 0;
struct iov_iter save;
+ int hlen = MIN_H_SIZE;
/* Handle implied connection establishment */
if (unlikely(dest)) {
rc = __tipc_sendmsg(sock, m, dsz);
+ hlen = msg_hdr_sz(mhdr);
if (dsz && (dsz == rc))
- tsk->sent_unacked = 1;
+ tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
return rc;
}
if (dsz > (uint)INT_MAX)
@@ -1055,7 +1084,7 @@ next:
if (likely(!tsk_conn_cong(tsk))) {
rc = tipc_node_xmit(net, &pktchain, dnode, portid);
if (likely(!rc)) {
- tsk->sent_unacked++;
+ tsk->snt_unacked += tsk_inc(tsk, send + hlen);
sent += send;
if (sent == dsz)
return dsz;
@@ -1120,6 +1149,12 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
+ if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
+ return;
+
+ /* Fall back to message based flow control */
+ tsk->rcv_win = FLOWCTL_MSG_WIN;
+ tsk->snd_win = FLOWCTL_MSG_WIN;
}
/**
@@ -1216,7 +1251,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
return 0;
}
-static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
+static void tipc_sk_send_ack(struct tipc_sock *tsk)
{
struct net *net = sock_net(&tsk->sk);
struct sk_buff *skb = NULL;
@@ -1232,7 +1267,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
if (!skb)
return;
msg = buf_msg(skb);
- msg_set_msgcnt(msg, ack);
+ msg_set_conn_ack(msg, tsk->rcv_unacked);
+ tsk->rcv_unacked = 0;
+
+ /* Adjust to and advertize the correct window limit */
+ if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
+ tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
+ msg_set_adv_win(msg, tsk->rcv_win);
+ }
tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
}
@@ -1290,7 +1332,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
long timeo;
unsigned int sz;
u32 err;
- int res;
+ int res, hlen;
/* Catch invalid receive requests */
if (unlikely(!buf_len))
@@ -1315,6 +1357,7 @@ restart:
buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf);
sz = msg_data_sz(msg);
+ hlen = msg_hdr_sz(msg);
err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */
@@ -1337,7 +1380,7 @@ restart:
sz = buf_len;
m->msg_flags |= MSG_TRUNC;
}
- res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
+ res = skb_copy_datagram_msg(buf, hlen, m, sz);
if (res)
goto exit;
res = sz;
@@ -1349,15 +1392,15 @@ restart:
res = -ECONNRESET;
}
- /* Consume received message (optional) */
- if (likely(!(flags & MSG_PEEK))) {
- if ((sock->state != SS_READY) &&
- (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
- tipc_sk_send_ack(tsk, tsk->rcv_unacked);
- tsk->rcv_unacked = 0;
- }
- tsk_advance_rx_queue(sk);
+ if (unlikely(flags & MSG_PEEK))
+ goto exit;
+
+ if (likely(sock->state != SS_READY)) {
+ tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
+ if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
+ tipc_sk_send_ack(tsk);
}
+ tsk_advance_rx_queue(sk);
exit:
release_sock(sk);
return res;
@@ -1386,7 +1429,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
int sz_to_copy, target, needed;
int sz_copied = 0;
u32 err;
- int res = 0;
+ int res = 0, hlen;
/* Catch invalid receive attempts */
if (unlikely(!buf_len))
@@ -1412,6 +1455,7 @@ restart:
buf = skb_peek(&sk->sk_receive_queue);
msg = buf_msg(buf);
sz = msg_data_sz(msg);
+ hlen = msg_hdr_sz(msg);
err = msg_errcode(msg);
/* Discard an empty non-errored message & try again */
@@ -1436,8 +1480,7 @@ restart:
needed = (buf_len - sz_copied);
sz_to_copy = (sz <= needed) ? sz : needed;
- res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
- m, sz_to_copy);
+ res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
if (res)
goto exit;
@@ -1459,20 +1502,18 @@ restart:
res = -ECONNRESET;
}
- /* Consume received message (optional) */
- if (likely(!(flags & MSG_PEEK))) {
- if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
- tipc_sk_send_ack(tsk, tsk->rcv_unacked);
- tsk->rcv_unacked = 0;
- }
- tsk_advance_rx_queue(sk);
- }
+ if (unlikely(flags & MSG_PEEK))
+ goto exit;
+
+ tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
+ if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
+ tipc_sk_send_ack(tsk);
+ tsk_advance_rx_queue(sk);
/* Loop around if more data is required */
if ((sz_copied < buf_len) && /* didn't get all requested data */
(!skb_queue_empty(&sk->sk_receive_queue) ||
(sz_copied < target)) && /* and more is ready or required */
- (!(flags & MSG_PEEK)) && /* and aren't just peeking at data */
(!err)) /* and haven't reached a FIN */
goto restart;
@@ -1604,30 +1645,33 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
/**
* rcvbuf_limit - get proper overload limit of socket receive queue
* @sk: socket
- * @buf: message
+ * @skb: message
*
- * For all connection oriented messages, irrespective of importance,
- * the default overload value (i.e. 67MB) is set as limit.
+ * For connection oriented messages, irrespective of importance,
+ * default queue limit is 2 MB.
*
- * For all connectionless messages, by default new queue limits are
- * as belows:
+ * For connectionless messages, queue limits are based on message
+ * importance as follows:
*
- * TIPC_LOW_IMPORTANCE (4 MB)
- * TIPC_MEDIUM_IMPORTANCE (8 MB)
- * TIPC_HIGH_IMPORTANCE (16 MB)
- * TIPC_CRITICAL_IMPORTANCE (32 MB)
+ * TIPC_LOW_IMPORTANCE (2 MB)
+ * TIPC_MEDIUM_IMPORTANCE (4 MB)
+ * TIPC_HIGH_IMPORTANCE (8 MB)
+ * TIPC_CRITICAL_IMPORTANCE (16 MB)
*
* Returns overload limit according to corresponding message importance
*/
-static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
+static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
{
- struct tipc_msg *msg = buf_msg(buf);
+ struct tipc_sock *tsk = tipc_sk(sk);
+ struct tipc_msg *hdr = buf_msg(skb);
+
+ if (unlikely(!msg_connected(hdr)))
+ return sk->sk_rcvbuf << msg_importance(hdr);
- if (msg_connected(msg))
- return sysctl_tipc_rmem[2];
+ if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
+ return sk->sk_rcvbuf;
- return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
- msg_importance(msg);
+ return FLOWCTL_MSG_LIM;
}
/**
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 4241f22..06fb594 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -1,6 +1,6 @@
/* net/tipc/socket.h: Include file for TIPC socket code
*
- * Copyright (c) 2014-2015, Ericsson AB
+ * Copyright (c) 2014-2016, Ericsson AB
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -38,10 +38,17 @@
#include <net/sock.h>
#include <net/genetlink.h>
-#define TIPC_CONNACK_INTV 256
-#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
-#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
- SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
+/* Compatibility values for deprecated message based flow control */
+#define FLOWCTL_MSG_WIN 512
+#define FLOWCTL_MSG_LIM ((FLOWCTL_MSG_WIN * 2 + 1) * SKB_TRUESIZE(MAX_MSG_SIZE))
+
+#define FLOWCTL_BLK_SZ 1024
+
+/* Socket receive buffer sizes */
+#define RCVBUF_MIN (FLOWCTL_BLK_SZ * 512)
+#define RCVBUF_DEF (FLOWCTL_BLK_SZ * 1024 * 2)
+#define RCVBUF_MAX (FLOWCTL_BLK_SZ * 1024 * 16)
+
int tipc_socket_init(void);
void tipc_socket_stop(void);
void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
--
1.9.1
^ permalink raw reply related [flat|nested] 6+ messages in thread
* Re: [PATCH net-next 0/3] tipc: redesign socket-level flow control
2016-05-02 14:22 [PATCH net-next 0/3] tipc: redesign socket-level flow control Jon Maloy
` (2 preceding siblings ...)
2016-05-02 14:22 ` [PATCH net-next 3/3] tipc: redesign connection-level flow control Jon Maloy
@ 2016-05-03 19:51 ` David Miller
3 siblings, 0 replies; 6+ messages in thread
From: David Miller @ 2016-05-03 19:51 UTC (permalink / raw)
To: jon.maloy
Cc: netdev, paul.gortmaker, parthasarathy.bhuvaragan, richard.alpe,
ying.xue, maloy, tipc-discussion
From: Jon Maloy <jon.maloy@ericsson.com>
Date: Mon, 2 May 2016 10:22:33 -0400
> The socket-level flow control in TIPC has long been due for a major
> overhaul. This series fixes this.
Series applied, thanks Jon.
^ permalink raw reply [flat|nested] 6+ messages in thread
end of thread, other threads:[~2016-05-03 19:51 UTC | newest]
Thread overview: 6+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2016-05-02 14:22 [PATCH net-next 0/3] tipc: redesign socket-level flow control Jon Maloy
2016-05-02 14:22 ` [PATCH net-next 1/3] tipc: re-enable compensation for socket receive buffer double counting Jon Maloy
2016-05-02 14:22 ` [PATCH net-next 2/3] tipc: propagate peer node capabilities to socket layer Jon Maloy
2016-05-02 14:22 ` [PATCH net-next 3/3] tipc: redesign connection-level flow control Jon Maloy
2016-05-03 19:51 ` [PATCH net-next 0/3] tipc: redesign socket-level " David Miller
-- strict thread matches above, loose matches on Subject: below --
2016-05-02 15:58 Jon Maloy
2016-05-02 15:58 ` [PATCH net-next 3/3] tipc: redesign connection-level " Jon Maloy
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).