* [PATCH net-next 00/13] tipc: separate link and link aggregation layer
From: Jon Maloy @ 2015-07-16 20:54 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion, Jon Maloy
This is the first batch of a longer series that has two main objectives:
o Finer lock granularity during message sending and reception,
especially regarding usage of the node spinlock.
o Better separation between the link layer implementation and the link
aggregation layer, represented by node.c::struct tipc_node.
Hopefully these changes also make this part of the code somewhat easier
to comprehend and maintain.
Jon Maloy (13):
tipc: introduce link entry structure to struct tipc_node
tipc: move link creation from neighbor discoverer to node
tipc: move link input queue to tipc_node
tipc: use bearer index when looking up active links
tipc: change sk_buffer handling in tipc_link_xmit()
tipc: make media xmit call outside node spinlock context
tipc: clean up definitions and usage of link flags
tipc: introduce new link protocol msg create function
tipc: improve link FSM implementation
tipc: simplify link timer implementation
tipc: move link supervision timer to node level
tipc: introduce node contact FSM
tipc: reduce locking scope during packet reception
net/tipc/bcast.c | 31 +-
net/tipc/bcast.h | 1 +
net/tipc/bearer.c | 26 +
net/tipc/bearer.h | 3 +
net/tipc/core.h | 5 +
net/tipc/discover.c | 20 +-
net/tipc/link.c | 1517 ++++++++++++++++++++++++-------------------------
net/tipc/link.h | 74 +--
net/tipc/msg.h | 53 +-
net/tipc/name_distr.c | 6 +-
net/tipc/node.c | 549 ++++++++++++++----
net/tipc/node.h | 93 ++-
net/tipc/socket.c | 71 +--
13 files changed, 1431 insertions(+), 1018 deletions(-)
--
1.9.1
* [PATCH net-next 01/13] tipc: introduce link entry structure to struct tipc_node
From: Jon Maloy @ 2015-07-16 20:54 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion, Jon Maloy
struct 'tipc_node' currently contains two arrays for link attributes,
one for the link pointers, and one for the usable link MTUs.
We now group those into a new struct 'tipc_link_entry', and introduce a
single array consisting of such entries. Apart from being a cosmetic
improvement, this is a starting point for the strict master-slave
relation between node and link that we will introduce in the following
commits.
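For reference, a condensed view of the resulting layout, taken from the
node.h hunk below (surrounding members omitted):

  struct tipc_link_entry {
          struct tipc_link *link;
          u32 mtu;
  };

  struct tipc_node {
          /* ... */
          struct tipc_link_entry *active_links[2];
          struct tipc_link_entry links[MAX_BEARERS];
          /* ... */
  };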
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/bcast.c | 2 +-
net/tipc/discover.c | 2 +-
net/tipc/link.c | 60 ++++++++++---------
net/tipc/name_distr.c | 2 +-
net/tipc/node.c | 163 ++++++++++++++++++++++++--------------------------
net/tipc/node.h | 50 +++++++++-------
6 files changed, 143 insertions(+), 136 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index a816382..59b2f2a 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -413,7 +413,7 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
* all nodes in the cluster don't ACK at the same time
*/
if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
- tipc_link_proto_xmit(node->active_links[node->addr & 1],
+ tipc_link_proto_xmit(node_active_link(node, node->addr),
STATE_MSG, 0, 0, 0, 0);
tn->bcl->stats.sent_acks++;
}
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index 967e292..9334453 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -170,7 +170,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
return;
tipc_node_lock(node);
node->capabilities = caps;
- link = node->links[bearer->identity];
+ link = node->links[bearer->identity].link;
/* Prepare to validate requesting node's signature and media address */
sign_match = (signature == node->signature);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index eaa9fe5..03372a7 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -132,9 +132,11 @@ static void tipc_link_put(struct tipc_link *l_ptr)
static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
{
- if (l->owner->active_links[0] != l)
- return l->owner->active_links[0];
- return l->owner->active_links[1];
+ struct tipc_node *n = l->owner;
+
+ if (node_active_link(n, 0) != l)
+ return node_active_link(n, 0);
+ return node_active_link(n, 1);
}
/*
@@ -147,10 +149,11 @@ int tipc_link_is_up(struct tipc_link *l_ptr)
return link_working_working(l_ptr) || link_working_unknown(l_ptr);
}
-int tipc_link_is_active(struct tipc_link *l_ptr)
+int tipc_link_is_active(struct tipc_link *l)
{
- return (l_ptr->owner->active_links[0] == l_ptr) ||
- (l_ptr->owner->active_links[1] == l_ptr);
+ struct tipc_node *n = l->owner;
+
+ return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l);
}
/**
@@ -240,7 +243,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
return NULL;
}
- if (n_ptr->links[b_ptr->identity]) {
+ if (n_ptr->links[b_ptr->identity].link) {
tipc_addr_string_fill(addr_string, n_ptr->addr);
pr_err("Attempt to establish second link on <%s> to %s\n",
b_ptr->name, addr_string);
@@ -321,7 +324,7 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
rcu_read_lock();
list_for_each_entry_rcu(node, &tn->node_list, list) {
tipc_node_lock(node);
- link = node->links[bearer_id];
+ link = node->links[bearer_id].link;
if (link)
tipc_link_delete(link);
tipc_node_unlock(node);
@@ -446,7 +449,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
return;
- tipc_node_link_down(l_ptr->owner, l_ptr);
+ tipc_node_link_down(l_ptr->owner, l_ptr->bearer_id);
tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr);
if (was_active_link && tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) {
@@ -482,7 +485,7 @@ static void link_activate(struct tipc_link *link)
link->rcv_nxt = 1;
link->stats.recv_info = 1;
link->silent_intv_cnt = 0;
- tipc_node_link_up(node, link);
+ tipc_node_link_up(node, link->bearer_id);
tipc_bearer_add_dest(node->net, link->bearer_id, link->addr);
}
@@ -577,7 +580,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
case TRAFFIC_MSG_EVT:
break;
case ACTIVATE_MSG:
- other = l_ptr->owner->active_links[0];
+ other = node_active_link(l_ptr->owner, 0);
if (other && link_working_unknown(other))
break;
l_ptr->state = WORKING_WORKING;
@@ -606,7 +609,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
switch (event) {
case TRAFFIC_MSG_EVT:
case ACTIVATE_MSG:
- other = l_ptr->owner->active_links[0];
+ other = node_active_link(l_ptr->owner, 0);
if (other && link_working_unknown(other))
break;
l_ptr->state = WORKING_WORKING;
@@ -755,7 +758,7 @@ int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
node = tipc_node_find(net, dnode);
if (node) {
tipc_node_lock(node);
- link = node->active_links[selector & 1];
+ link = node_active_link(node, selector & 1);
if (link)
rc = __tipc_link_xmit(net, link, list);
tipc_node_unlock(node);
@@ -858,9 +861,9 @@ void tipc_link_reset_all(struct tipc_node *node)
tipc_addr_string_fill(addr_string, node->addr));
for (i = 0; i < MAX_BEARERS; i++) {
- if (node->links[i]) {
- link_print(node->links[i], "Resetting link\n");
- tipc_link_reset(node->links[i]);
+ if (node->links[i].link) {
+ link_print(node->links[i].link, "Resetting link\n");
+ tipc_link_reset(node->links[i].link);
}
}
@@ -1029,7 +1032,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
tipc_node_lock(n_ptr);
/* Locate unicast link endpoint that should handle message */
- l_ptr = n_ptr->links[b_ptr->identity];
+ l_ptr = n_ptr->links[b_ptr->identity].link;
if (unlikely(!l_ptr))
goto unlock;
@@ -1496,7 +1499,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
struct sk_buff *skb;
u32 length = msg_size(msg);
- tunnel = l_ptr->owner->active_links[selector & 1];
+ tunnel = node_active_link(l_ptr->owner, selector & 1);
if (!tipc_link_is_up(tunnel)) {
pr_warn("%stunnel link no longer available\n", link_co_err);
return;
@@ -1522,7 +1525,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
{
int msgcount;
- struct tipc_link *tunnel = l_ptr->owner->active_links[0];
+ struct tipc_link *tunnel = node_active_link(l_ptr->owner, 0);
struct tipc_msg tunnel_hdr;
struct sk_buff *skb;
int split_bundles;
@@ -1556,8 +1559,8 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
return;
}
- split_bundles = (l_ptr->owner->active_links[0] !=
- l_ptr->owner->active_links[1]);
+ split_bundles = (node_active_link(l_ptr->owner, 0) !=
+ node_active_link(l_ptr->owner, 1));
skb_queue_walk(&l_ptr->transmq, skb) {
struct tipc_msg *msg = buf_msg(skb);
@@ -1660,7 +1663,7 @@ static bool tipc_link_failover_rcv(struct tipc_link *link,
if (bearer_id == link->bearer_id)
goto exit;
- pl = link->owner->links[bearer_id];
+ pl = link->owner->links[bearer_id].link;
if (pl && tipc_link_is_up(pl))
tipc_link_reset(pl);
@@ -1743,7 +1746,7 @@ static struct tipc_node *tipc_link_find_owner(struct net *net,
list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {
tipc_node_lock(n_ptr);
for (i = 0; i < MAX_BEARERS; i++) {
- l_ptr = n_ptr->links[i];
+ l_ptr = n_ptr->links[i].link;
if (l_ptr && !strcmp(l_ptr->name, link_name)) {
*bearer_id = i;
found_node = n_ptr;
@@ -1865,7 +1868,7 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
tipc_node_lock(node);
- link = node->links[bearer_id];
+ link = node->links[bearer_id].link;
if (!link) {
res = -EINVAL;
goto out;
@@ -2055,10 +2058,11 @@ static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
for (i = *prev_link; i < MAX_BEARERS; i++) {
*prev_link = i;
- if (!node->links[i])
+ if (!node->links[i].link)
continue;
- err = __tipc_nl_add_link(net, msg, node->links[i], NLM_F_MULTI);
+ err = __tipc_nl_add_link(net, msg,
+ node->links[i].link, NLM_F_MULTI);
if (err)
return err;
}
@@ -2172,7 +2176,7 @@ int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
return -EINVAL;
tipc_node_lock(node);
- link = node->links[bearer_id];
+ link = node->links[bearer_id].link;
if (!link) {
tipc_node_unlock(node);
nlmsg_free(msg.skb);
@@ -2227,7 +2231,7 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
tipc_node_lock(node);
- link = node->links[bearer_id];
+ link = node->links[bearer_id].link;
if (!link) {
tipc_node_unlock(node);
return -EINVAL;
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 41e7b7e..3a1539e 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -96,7 +96,7 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb)
dnode = node->addr;
if (in_own_node(net, dnode))
continue;
- if (!tipc_node_active_links(node))
+ if (!tipc_node_is_up(node))
continue;
oskb = pskb_copy(skb, GFP_ATOMIC);
if (!oskb)
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 0b1d61a..db46e5d 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -224,126 +224,119 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
*
* Link becomes active (alone or shared) or standby, depending on its priority.
*/
-void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
+void tipc_node_link_up(struct tipc_node *n, int bearer_id)
{
- struct tipc_link **active = &n_ptr->active_links[0];
+ struct tipc_link_entry **actv = &n->active_links[0];
+ struct tipc_link_entry *le = &n->links[bearer_id];
+ struct tipc_link *l = le->link;
- n_ptr->working_links++;
- n_ptr->action_flags |= TIPC_NOTIFY_LINK_UP;
- n_ptr->link_id = l_ptr->peer_bearer_id << 16 | l_ptr->bearer_id;
+ /* Leave room for tunnel header when returning 'mtu' to users: */
+ n->links[bearer_id].mtu = l->mtu - INT_H_SIZE;
+
+ n->working_links++;
+ n->action_flags |= TIPC_NOTIFY_LINK_UP;
+ n->link_id = l->peer_bearer_id << 16 | l->bearer_id;
pr_debug("Established link <%s> on network plane %c\n",
- l_ptr->name, l_ptr->net_plane);
+ l->name, l->net_plane);
- if (!active[0]) {
- active[0] = active[1] = l_ptr;
- node_established_contact(n_ptr);
- goto exit;
+ /* No active links ? => take both active slots */
+ if (!actv[0]) {
+ actv[0] = le;
+ actv[1] = le;
+ node_established_contact(n);
+ return;
}
- if (l_ptr->priority < active[0]->priority) {
- pr_debug("New link <%s> becomes standby\n", l_ptr->name);
- goto exit;
+ if (l->priority < actv[0]->link->priority) {
+ pr_debug("New link <%s> becomes standby\n", l->name);
+ return;
}
- tipc_link_dup_queue_xmit(active[0], l_ptr);
- if (l_ptr->priority == active[0]->priority) {
- active[0] = l_ptr;
- goto exit;
+ tipc_link_dup_queue_xmit(actv[0]->link, l);
+
+ /* Take one active slot if applicable */
+ if (l->priority == actv[0]->link->priority) {
+ actv[0] = le;
+ return;
}
- pr_debug("Old link <%s> becomes standby\n", active[0]->name);
- if (active[1] != active[0])
- pr_debug("Old link <%s> becomes standby\n", active[1]->name);
- active[0] = active[1] = l_ptr;
-exit:
- /* Leave room for changeover header when returning 'mtu' to users: */
- n_ptr->act_mtus[0] = active[0]->mtu - INT_H_SIZE;
- n_ptr->act_mtus[1] = active[1]->mtu - INT_H_SIZE;
+ /* Higher prio than current active? => take both active slots */
+ pr_debug("Old l <%s> becomes standby\n", actv[0]->link->name);
+ if (actv[1] != actv[0])
+ pr_debug("Old link <%s> now standby\n", actv[1]->link->name);
+ actv[0] = le;
+ actv[1] = le;
}
/**
- * node_select_active_links - select active link
+ * node_select_active_links - select which working links should be active
*/
-static void node_select_active_links(struct tipc_node *n_ptr)
+static void node_select_active_links(struct tipc_node *n)
{
- struct tipc_link **active = &n_ptr->active_links[0];
- u32 i;
- u32 highest_prio = 0;
+ struct tipc_link_entry **actv = &n->active_links[0];
+ struct tipc_link *l;
+ u32 b, highest = 0;
- active[0] = active[1] = NULL;
-
- for (i = 0; i < MAX_BEARERS; i++) {
- struct tipc_link *l_ptr = n_ptr->links[i];
+ actv[0] = NULL;
+ actv[1] = NULL;
- if (!l_ptr || !tipc_link_is_up(l_ptr) ||
- (l_ptr->priority < highest_prio))
+ for (b = 0; b < MAX_BEARERS; b++) {
+ l = n->links[b].link;
+ if (!l || !tipc_link_is_up(l) || (l->priority < highest))
+ continue;
+ if (l->priority > highest) {
+ highest = l->priority;
+ actv[0] = &n->links[b];
+ actv[1] = &n->links[b];
continue;
-
- if (l_ptr->priority > highest_prio) {
- highest_prio = l_ptr->priority;
- active[0] = active[1] = l_ptr;
- } else {
- active[1] = l_ptr;
}
+ actv[1] = &n->links[b];
}
}
/**
* tipc_node_link_down - handle loss of link
*/
-void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
+void tipc_node_link_down(struct tipc_node *n, int bearer_id)
{
- struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
- struct tipc_link **active;
+ struct tipc_link_entry **actv = &n->active_links[0];
+ struct tipc_link_entry *le = &n->links[bearer_id];
+ struct tipc_link *l = le->link;
- n_ptr->working_links--;
- n_ptr->action_flags |= TIPC_NOTIFY_LINK_DOWN;
- n_ptr->link_id = l_ptr->peer_bearer_id << 16 | l_ptr->bearer_id;
+ n->working_links--;
+ n->action_flags |= TIPC_NOTIFY_LINK_DOWN;
+ n->link_id = l->peer_bearer_id << 16 | l->bearer_id;
- if (!tipc_link_is_active(l_ptr)) {
+ if (!tipc_link_is_active(l)) {
pr_debug("Lost standby link <%s> on network plane %c\n",
- l_ptr->name, l_ptr->net_plane);
+ l->name, l->net_plane);
return;
}
pr_debug("Lost link <%s> on network plane %c\n",
- l_ptr->name, l_ptr->net_plane);
-
- active = &n_ptr->active_links[0];
- if (active[0] == l_ptr)
- active[0] = active[1];
- if (active[1] == l_ptr)
- active[1] = active[0];
- if (active[0] == l_ptr)
- node_select_active_links(n_ptr);
- if (tipc_node_is_up(n_ptr))
- tipc_link_failover_send_queue(l_ptr);
- else
- node_lost_contact(n_ptr);
+ l->name, l->net_plane);
- /* Leave room for changeover header when returning 'mtu' to users: */
- if (active[0]) {
- n_ptr->act_mtus[0] = active[0]->mtu - INT_H_SIZE;
- n_ptr->act_mtus[1] = active[1]->mtu - INT_H_SIZE;
- return;
- }
- /* Loopback link went down? No fragmentation needed from now on. */
- if (n_ptr->addr == tn->own_addr) {
- n_ptr->act_mtus[0] = MAX_MSG_SIZE;
- n_ptr->act_mtus[1] = MAX_MSG_SIZE;
- }
-}
+ /* Redistribute active slots if applicable */
+ if (actv[0] == le)
+ actv[0] = actv[1];
+ if (actv[1] == le)
+ actv[1] = actv[0];
-int tipc_node_active_links(struct tipc_node *n_ptr)
-{
- return n_ptr->active_links[0] != NULL;
+ /* Last link of this priority? => select other ones if available */
+ if (actv[0] == le)
+ node_select_active_links(n);
+
+ if (tipc_node_is_up(n))
+ tipc_link_failover_send_queue(l);
+ else
+ node_lost_contact(n);
}
-int tipc_node_is_up(struct tipc_node *n_ptr)
+bool tipc_node_is_up(struct tipc_node *n)
{
- return tipc_node_active_links(n_ptr);
+ return n->active_links[0];
}
void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
{
- n_ptr->links[l_ptr->bearer_id] = l_ptr;
+ n_ptr->links[l_ptr->bearer_id].link = l_ptr;
n_ptr->link_cnt++;
}
@@ -352,9 +345,9 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
int i;
for (i = 0; i < MAX_BEARERS; i++) {
- if (l_ptr != n_ptr->links[i])
+ if (l_ptr != n_ptr->links[i].link)
continue;
- n_ptr->links[i] = NULL;
+ n_ptr->links[i].link = NULL;
n_ptr->link_cnt--;
}
}
@@ -396,7 +389,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
/* Abort any ongoing link failover */
for (i = 0; i < MAX_BEARERS; i++) {
- struct tipc_link *l_ptr = n_ptr->links[i];
+ struct tipc_link *l_ptr = n_ptr->links[i].link;
if (!l_ptr)
continue;
l_ptr->flags &= ~LINK_FAILINGOVER;
@@ -453,7 +446,7 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
goto exit;
tipc_node_lock(node);
- link = node->links[bearer_id];
+ link = node->links[bearer_id].link;
if (link) {
strncpy(linkname, link->name, len);
err = 0;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 5a834cf..320cea3 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -89,6 +89,11 @@ struct tipc_node_bclink {
bool recv_permitted;
};
+struct tipc_link_entry {
+ struct tipc_link *link;
+ u32 mtu;
+};
+
/**
* struct tipc_node - TIPC node structure
* @addr: network address of node
@@ -98,9 +103,8 @@ struct tipc_node_bclink {
* @hash: links to adjacent nodes in unsorted hash chain
* @inputq: pointer to input queue containing messages for msg event
* @namedq: pointer to name table input queue with name table messages
- * @curr_link: the link holding the node lock, if any
- * @active_links: pointers to active links to node
- * @links: pointers to all links to node
+ * @active_links: pointer into links[] array, identifying which links are active
+ * @links: array containing references to all links to node
* @action_flags: bit mask of different types of node actions
* @bclink: broadcast-related info
* @list: links to adjacent nodes in sorted list of cluster's nodes
@@ -120,9 +124,8 @@ struct tipc_node {
struct hlist_node hash;
struct sk_buff_head *inputq;
struct sk_buff_head *namedq;
- struct tipc_link *active_links[2];
- u32 act_mtus[2];
- struct tipc_link *links[MAX_BEARERS];
+ struct tipc_link_entry *active_links[2];
+ struct tipc_link_entry links[MAX_BEARERS];
int action_flags;
struct tipc_node_bclink bclink;
struct list_head list;
@@ -142,10 +145,9 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr);
void tipc_node_stop(struct net *net);
void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
-void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
-void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
-int tipc_node_active_links(struct tipc_node *n_ptr);
-int tipc_node_is_up(struct tipc_node *n_ptr);
+void tipc_node_link_down(struct tipc_node *n_ptr, int bearer_id);
+void tipc_node_link_up(struct tipc_node *n_ptr, int bearer_id);
+bool tipc_node_is_up(struct tipc_node *n);
int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
char *linkname, size_t len);
void tipc_node_unlock(struct tipc_node *node);
@@ -165,20 +167,28 @@ static inline bool tipc_node_blocked(struct tipc_node *node)
TIPC_NOTIFY_NODE_DOWN | TIPC_WAIT_OWN_LINKS_DOWN));
}
-static inline uint tipc_node_get_mtu(struct net *net, u32 addr, u32 selector)
+static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel)
{
- struct tipc_node *node;
- u32 mtu;
+ struct tipc_link_entry *le = n->active_links[sel & 1];
- node = tipc_node_find(net, addr);
+ if (likely(le))
+ return le->link;
+ return NULL;
+}
- if (likely(node)) {
- mtu = node->act_mtus[selector & 1];
- tipc_node_put(node);
- } else {
- mtu = MAX_MSG_SIZE;
- }
+static inline uint tipc_node_get_mtu(struct net *net, u32 addr, u32 selector)
+{
+ struct tipc_node *n;
+ struct tipc_link_entry *le;
+ unsigned int mtu = MAX_MSG_SIZE;
+ n = tipc_node_find(net, addr);
+ if (unlikely(!n))
+ return mtu;
+ le = n->active_links[selector & 1];
+ if (likely(le))
+ mtu = le->mtu;
+ tipc_node_put(n);
return mtu;
}
--
1.9.1
* [PATCH net-next 02/13] tipc: move link creation from neighbor discoverer to node
From: Jon Maloy @ 2015-07-16 20:54 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion, Jon Maloy
As a step towards turning links into node-internal entities, we move the
creation of links from the neighbor discovery logic to the node's link
control logic.
We also create an additional entry for the link's media address in the
newly introduced struct tipc_link_entry, since this is where it is
needed in the upcoming commits. The current copy in struct tipc_link
is kept for now, but will be removed later.
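Condensed from the discover.c hunk below, the receive path in the
discoverer now delegates all per-link state handling to the node
(flag evaluation and error handling omitted):

  tipc_node_lock(node);
  node->capabilities = caps;
  tipc_node_check_dest(node, bearer, &link_up, &addr_match, &maddr);
  /* ... evaluate the sign_match/addr_match/link_up permutations ... */
  if (accept_addr && !tipc_node_update_dest(node, bearer, &maddr))
          respond = false;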
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/discover.c | 20 ++++----------------
net/tipc/node.c | 27 +++++++++++++++++++++++++++
net/tipc/node.h | 6 ++++++
3 files changed, 37 insertions(+), 16 deletions(-)
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index 9334453..164d089 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -35,7 +35,7 @@
*/
#include "core.h"
-#include "link.h"
+#include "node.h"
#include "discover.h"
/* min delay during bearer start up */
@@ -125,7 +125,6 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_node *node;
- struct tipc_link *link;
struct tipc_media_addr maddr;
struct sk_buff *rbuf;
struct tipc_msg *msg = buf_msg(buf);
@@ -170,13 +169,10 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
return;
tipc_node_lock(node);
node->capabilities = caps;
- link = node->links[bearer->identity].link;
/* Prepare to validate requesting node's signature and media address */
sign_match = (signature == node->signature);
- addr_match = link && !memcmp(&link->media_addr, &maddr, sizeof(maddr));
- link_up = link && tipc_link_is_up(link);
-
+ tipc_node_check_dest(node, bearer, &link_up, &addr_match, &maddr);
/* These three flags give us eight permutations: */
@@ -239,16 +235,8 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
if (accept_sign)
node->signature = signature;
- if (accept_addr) {
- if (!link)
- link = tipc_link_create(node, bearer, &maddr);
- if (link) {
- memcpy(&link->media_addr, &maddr, sizeof(maddr));
- tipc_link_reset(link);
- } else {
- respond = false;
- }
- }
+ if (accept_addr && !tipc_node_update_dest(node, bearer, &maddr))
+ respond = false;
/* Send response, if necessary */
if (respond && (mtyp == DSC_REQ_MSG)) {
diff --git a/net/tipc/node.c b/net/tipc/node.c
index db46e5d..06f642a 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -334,6 +334,33 @@ bool tipc_node_is_up(struct tipc_node *n)
return n->active_links[0];
}
+void tipc_node_check_dest(struct tipc_node *n, struct tipc_bearer *b,
+ bool *link_up, bool *addr_match,
+ struct tipc_media_addr *maddr)
+{
+ struct tipc_link *l = n->links[b->identity].link;
+ struct tipc_media_addr *curr = &n->links[b->identity].maddr;
+
+ *link_up = l && tipc_link_is_up(l);
+ *addr_match = l && !memcmp(curr, maddr, sizeof(*maddr));
+}
+
+bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *b,
+ struct tipc_media_addr *maddr)
+{
+ struct tipc_link *l = n->links[b->identity].link;
+ struct tipc_media_addr *curr = &n->links[b->identity].maddr;
+
+ if (!l)
+ l = tipc_link_create(n, b, maddr);
+ if (!l)
+ return false;
+ memcpy(&l->media_addr, maddr, sizeof(*maddr));
+ memcpy(curr, maddr, sizeof(*maddr));
+ tipc_link_reset(l);
+ return true;
+}
+
void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
{
n_ptr->links[l_ptr->bearer_id].link = l_ptr;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 320cea3..68579c7 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -92,6 +92,7 @@ struct tipc_node_bclink {
struct tipc_link_entry {
struct tipc_link *link;
u32 mtu;
+ struct tipc_media_addr maddr;
};
/**
@@ -143,6 +144,11 @@ struct tipc_node *tipc_node_find(struct net *net, u32 addr);
void tipc_node_put(struct tipc_node *node);
struct tipc_node *tipc_node_create(struct net *net, u32 addr);
void tipc_node_stop(struct net *net);
+void tipc_node_check_dest(struct tipc_node *n, struct tipc_bearer *bearer,
+ bool *link_up, bool *addr_match,
+ struct tipc_media_addr *maddr);
+bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *bearer,
+ struct tipc_media_addr *maddr);
void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
void tipc_node_link_down(struct tipc_node *n_ptr, int bearer_id);
--
1.9.1
* [PATCH net-next 03/13] tipc: move link input queue to tipc_node
From: Jon Maloy @ 2015-07-16 20:54 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion, Jon Maloy
At present, the link input queue and the name distributor receive
queue are aggregated as fields in struct tipc_link. This is a hazard,
because a link might be deleted while a receiving socket still keeps
a reference to one of the queues.
This commit fixes this bug. However, rather than adding yet another
reference counter to the critical data path, we move the two queues
to safe ground inside struct tipc_node, which is already protected, and
let the link code only handle references to the queues. This is also
in line with planned later changes in this area.
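After this change the per-bearer input queue lives in the node's link
entry and the name table queue in the node's broadcast state, while the
link only keeps pointers to them (condensed from the node.h and link.h
hunks below):

  struct tipc_link_entry {
          struct tipc_link *link;
          u32 mtu;
          struct sk_buff_head inputq;     /* owned by the node */
          struct tipc_media_addr maddr;
  };

  /* in struct tipc_link: */
          struct sk_buff_head *inputq;    /* -> links[bearer_id].inputq */
          struct sk_buff_head *namedq;    /* -> node->bclink.namedq */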
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/link.c | 27 +++++++++++++++------------
net/tipc/link.h | 12 +++++++-----
net/tipc/node.c | 4 +++-
net/tipc/node.h | 3 ++-
4 files changed, 27 insertions(+), 19 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 03372a7..f8e0e2c 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -227,7 +227,9 @@ static void link_set_timer(struct tipc_link *link, unsigned long time)
*/
struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
struct tipc_bearer *b_ptr,
- const struct tipc_media_addr *media_addr)
+ const struct tipc_media_addr *media_addr,
+ struct sk_buff_head *inputq,
+ struct sk_buff_head *namedq)
{
struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
struct tipc_link *l_ptr;
@@ -289,8 +291,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
__skb_queue_head_init(&l_ptr->backlogq);
__skb_queue_head_init(&l_ptr->deferdq);
skb_queue_head_init(&l_ptr->wakeupq);
- skb_queue_head_init(&l_ptr->inputq);
- skb_queue_head_init(&l_ptr->namedq);
+ l_ptr->inputq = inputq;
+ l_ptr->namedq = namedq;
+ skb_queue_head_init(l_ptr->inputq);
link_reset_statistics(l_ptr);
tipc_node_attach_link(n_ptr, l_ptr);
setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr);
@@ -391,8 +394,8 @@ void link_prepare_wakeup(struct tipc_link *l)
if ((pnd[imp] + l->backlog[imp].len) >= lim)
break;
skb_unlink(skb, &l->wakeupq);
- skb_queue_tail(&l->inputq, skb);
- l->owner->inputq = &l->inputq;
+ skb_queue_tail(l->inputq, skb);
+ l->owner->inputq = l->inputq;
l->owner->action_flags |= TIPC_MSG_EVT;
}
}
@@ -465,7 +468,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
__skb_queue_purge(&l_ptr->transmq);
__skb_queue_purge(&l_ptr->deferdq);
if (!owner->inputq)
- owner->inputq = &l_ptr->inputq;
+ owner->inputq = l_ptr->inputq;
skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
if (!skb_queue_empty(owner->inputq))
owner->action_flags |= TIPC_MSG_EVT;
@@ -962,7 +965,7 @@ static bool link_synch(struct tipc_link *l)
/* Is it still in the input queue ? */
post_synch = mod(pl->rcv_nxt - l->synch_point) - 1;
- if (skb_queue_len(&pl->inputq) > post_synch)
+ if (skb_queue_len(pl->inputq) > post_synch)
return false;
synched:
l->flags &= ~LINK_SYNCHING;
@@ -1141,16 +1144,16 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
case TIPC_HIGH_IMPORTANCE:
case TIPC_CRITICAL_IMPORTANCE:
case CONN_MANAGER:
- if (tipc_skb_queue_tail(&link->inputq, skb, dport)) {
- node->inputq = &link->inputq;
+ if (tipc_skb_queue_tail(link->inputq, skb, dport)) {
+ node->inputq = link->inputq;
node->action_flags |= TIPC_MSG_EVT;
}
return true;
case NAME_DISTRIBUTOR:
node->bclink.recv_permitted = true;
- node->namedq = &link->namedq;
- skb_queue_tail(&link->namedq, skb);
- if (skb_queue_len(&link->namedq) == 1)
+ node->namedq = link->namedq;
+ skb_queue_tail(link->namedq, skb);
+ if (skb_queue_len(link->namedq) == 1)
node->action_flags |= TIPC_NAMED_MSG_EVT;
return true;
case MSG_BUNDLER:
diff --git a/net/tipc/link.h b/net/tipc/link.h
index ae0a0ea..9c71d9e 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -192,8 +192,8 @@ struct tipc_link {
u16 rcv_nxt;
u32 rcv_unacked;
struct sk_buff_head deferdq;
- struct sk_buff_head inputq;
- struct sk_buff_head namedq;
+ struct sk_buff_head *inputq;
+ struct sk_buff_head *namedq;
/* Congestion handling */
struct sk_buff_head wakeupq;
@@ -207,9 +207,11 @@ struct tipc_link {
struct tipc_port;
-struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
- struct tipc_bearer *b_ptr,
- const struct tipc_media_addr *media_addr);
+struct tipc_link *tipc_link_create(struct tipc_node *n,
+ struct tipc_bearer *b,
+ const struct tipc_media_addr *maddr,
+ struct sk_buff_head *inputq,
+ struct sk_buff_head *namedq);
void tipc_link_delete(struct tipc_link *link);
void tipc_link_delete_list(struct net *net, unsigned int bearer_id);
void tipc_link_failover_send_queue(struct tipc_link *l_ptr);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 06f642a..20ec61c 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -132,6 +132,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
INIT_LIST_HEAD(&n_ptr->list);
INIT_LIST_HEAD(&n_ptr->publ_list);
INIT_LIST_HEAD(&n_ptr->conn_sks);
+ skb_queue_head_init(&n_ptr->bclink.namedq);
__skb_queue_head_init(&n_ptr->bclink.deferdq);
hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
@@ -350,9 +351,10 @@ bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *b,
{
struct tipc_link *l = n->links[b->identity].link;
struct tipc_media_addr *curr = &n->links[b->identity].maddr;
+ struct sk_buff_head *inputq = &n->links[b->identity].inputq;
if (!l)
- l = tipc_link_create(n, b, maddr);
+ l = tipc_link_create(n, b, maddr, inputq, &n->bclink.namedq);
if (!l)
return false;
memcpy(&l->media_addr, maddr, sizeof(*maddr));
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 68579c7..0657cbf 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -85,13 +85,14 @@ struct tipc_node_bclink {
u32 deferred_size;
struct sk_buff_head deferdq;
struct sk_buff *reasm_buf;
- int inputq_map;
+ struct sk_buff_head namedq;
bool recv_permitted;
};
struct tipc_link_entry {
struct tipc_link *link;
u32 mtu;
+ struct sk_buff_head inputq;
struct tipc_media_addr maddr;
};
--
1.9.1
* [PATCH net-next 04/13] tipc: use bearer index when looking up active links
From: Jon Maloy @ 2015-07-16 20:54 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion, Jon Maloy
struct tipc_node currently holds two arrays of link pointers; one,
indexed by bearer identity, which contains all links irrespective of
current state, and one two-slot array for the currently active link
or links. The latter array contains direct pointers into the elements
of the former. This has the effect that we cannot know the bearer id of
a link when accessing it via the "active_links[]" array without actually
dereferencing the pointer, something we want to avoid in some cases.
In this commit, we instead store the bearer identity in the
"active_links" array, and use this as an index to find the right element
in the overall link entry array. This change should be seen as a
preparation for the later commits in this series.
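The lookup helper from the node.h hunk below illustrates the new
scheme; INVALID_BEARER_ID (-1) marks an empty slot:

  static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel)
  {
          int bearer_id = n->active_links[sel & 1];

          if (unlikely(bearer_id == INVALID_BEARER_ID))
                  return NULL;
          return n->links[bearer_id].link;
  }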
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/node.c | 106 +++++++++++++++++++++++---------------------------------
net/tipc/node.h | 26 ++++++++------
2 files changed, 59 insertions(+), 73 deletions(-)
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 20ec61c..1972964 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -142,6 +142,8 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
list_add_tail_rcu(&n_ptr->list, &temp_node->list);
n_ptr->action_flags = TIPC_WAIT_PEER_LINKS_DOWN;
n_ptr->signature = INVALID_NODE_SIG;
+ n_ptr->active_links[0] = INVALID_BEARER_ID;
+ n_ptr->active_links[1] = INVALID_BEARER_ID;
tipc_node_get(n_ptr);
exit:
spin_unlock_bh(&tn->node_list_lock);
@@ -227,12 +229,13 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
*/
void tipc_node_link_up(struct tipc_node *n, int bearer_id)
{
- struct tipc_link_entry **actv = &n->active_links[0];
- struct tipc_link_entry *le = &n->links[bearer_id];
- struct tipc_link *l = le->link;
+ int *slot0 = &n->active_links[0];
+ int *slot1 = &n->active_links[1];
+ struct tipc_link_entry *links = n->links;
+ struct tipc_link *l = n->links[bearer_id].link;
/* Leave room for tunnel header when returning 'mtu' to users: */
- n->links[bearer_id].mtu = l->mtu - INT_H_SIZE;
+ links[bearer_id].mtu = l->mtu - INT_H_SIZE;
n->working_links++;
n->action_flags |= TIPC_NOTIFY_LINK_UP;
@@ -242,55 +245,30 @@ void tipc_node_link_up(struct tipc_node *n, int bearer_id)
l->name, l->net_plane);
/* No active links ? => take both active slots */
- if (!actv[0]) {
- actv[0] = le;
- actv[1] = le;
+ if (*slot0 < 0) {
+ *slot0 = bearer_id;
+ *slot1 = bearer_id;
node_established_contact(n);
return;
}
- if (l->priority < actv[0]->link->priority) {
+
+ /* Lower prio than current active ? => no slot */
+ if (l->priority < links[*slot0].link->priority) {
pr_debug("New link <%s> becomes standby\n", l->name);
return;
}
- tipc_link_dup_queue_xmit(actv[0]->link, l);
+ tipc_link_dup_queue_xmit(links[*slot0].link, l);
- /* Take one active slot if applicable */
- if (l->priority == actv[0]->link->priority) {
- actv[0] = le;
+ /* Same prio as current active ? => take one slot */
+ if (l->priority == links[*slot0].link->priority) {
+ *slot0 = bearer_id;
return;
}
- /* Higher prio than current active? => take both active slots */
- pr_debug("Old l <%s> becomes standby\n", actv[0]->link->name);
- if (actv[1] != actv[0])
- pr_debug("Old link <%s> now standby\n", actv[1]->link->name);
- actv[0] = le;
- actv[1] = le;
-}
-
-/**
- * node_select_active_links - select which working links should be active
- */
-static void node_select_active_links(struct tipc_node *n)
-{
- struct tipc_link_entry **actv = &n->active_links[0];
- struct tipc_link *l;
- u32 b, highest = 0;
- actv[0] = NULL;
- actv[1] = NULL;
-
- for (b = 0; b < MAX_BEARERS; b++) {
- l = n->links[b].link;
- if (!l || !tipc_link_is_up(l) || (l->priority < highest))
- continue;
- if (l->priority > highest) {
- highest = l->priority;
- actv[0] = &n->links[b];
- actv[1] = &n->links[b];
- continue;
- }
- actv[1] = &n->links[b];
- }
+ /* Higher prio than current active => take both active slots */
+ pr_debug("Old link <%s> now standby\n", links[*slot0].link->name);
+ *slot0 = bearer_id;
+ *slot1 = bearer_id;
}
/**
@@ -298,32 +276,36 @@ static void node_select_active_links(struct tipc_node *n)
*/
void tipc_node_link_down(struct tipc_node *n, int bearer_id)
{
- struct tipc_link_entry **actv = &n->active_links[0];
- struct tipc_link_entry *le = &n->links[bearer_id];
- struct tipc_link *l = le->link;
+ int *slot0 = &n->active_links[0];
+ int *slot1 = &n->active_links[1];
+ int i, highest = 0;
+ struct tipc_link *l, *_l;
+ l = n->links[bearer_id].link;
n->working_links--;
n->action_flags |= TIPC_NOTIFY_LINK_DOWN;
n->link_id = l->peer_bearer_id << 16 | l->bearer_id;
- if (!tipc_link_is_active(l)) {
- pr_debug("Lost standby link <%s> on network plane %c\n",
- l->name, l->net_plane);
- return;
- }
pr_debug("Lost link <%s> on network plane %c\n",
l->name, l->net_plane);
- /* Redistribute active slots if applicable */
- if (actv[0] == le)
- actv[0] = actv[1];
- if (actv[1] == le)
- actv[1] = actv[0];
-
- /* Last link of this priority? => select other ones if available */
- if (actv[0] == le)
- node_select_active_links(n);
-
+ /* Select new active link if any available */
+ *slot0 = INVALID_BEARER_ID;
+ *slot1 = INVALID_BEARER_ID;
+ for (i = 0; i < MAX_BEARERS; i++) {
+ _l = n->links[i].link;
+ if (!_l || !tipc_link_is_up(_l))
+ continue;
+ if (_l->priority < highest)
+ continue;
+ if (_l->priority > highest) {
+ highest = _l->priority;
+ *slot0 = i;
+ *slot1 = i;
+ continue;
+ }
+ *slot1 = i;
+ }
if (tipc_node_is_up(n))
tipc_link_failover_send_queue(l);
else
@@ -332,7 +314,7 @@ void tipc_node_link_down(struct tipc_node *n, int bearer_id)
bool tipc_node_is_up(struct tipc_node *n)
{
- return n->active_links[0];
+ return n->active_links[0] != INVALID_BEARER_ID;
}
void tipc_node_check_dest(struct tipc_node *n, struct tipc_bearer *b,
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 0657cbf..74f278a 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -45,6 +45,8 @@
/* Out-of-range value for node signature */
#define INVALID_NODE_SIG 0x10000
+#define INVALID_BEARER_ID -1
+
/* Flags used to take different actions according to flag type
* TIPC_WAIT_PEER_LINKS_DOWN: wait to see that peer's links are down
* TIPC_WAIT_OWN_LINKS_DOWN: wait until peer node is declared down
@@ -105,7 +107,7 @@ struct tipc_link_entry {
* @hash: links to adjacent nodes in unsorted hash chain
* @inputq: pointer to input queue containing messages for msg event
* @namedq: pointer to name table input queue with name table messages
- * @active_links: pointer into links[] array, identifying which links are active
+ * @active_links: bearer ids of active links, used as index into links[] array
* @links: array containing references to all links to node
* @action_flags: bit mask of different types of node actions
* @bclink: broadcast-related info
@@ -126,7 +128,7 @@ struct tipc_node {
struct hlist_node hash;
struct sk_buff_head *inputq;
struct sk_buff_head *namedq;
- struct tipc_link_entry *active_links[2];
+ int active_links[2];
struct tipc_link_entry links[MAX_BEARERS];
int action_flags;
struct tipc_node_bclink bclink;
@@ -176,25 +178,27 @@ static inline bool tipc_node_blocked(struct tipc_node *node)
static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel)
{
- struct tipc_link_entry *le = n->active_links[sel & 1];
+ int bearer_id = n->active_links[sel & 1];
+
+ if (unlikely(bearer_id == INVALID_BEARER_ID))
+ return NULL;
- if (likely(le))
- return le->link;
- return NULL;
+ return n->links[bearer_id].link;
}
-static inline uint tipc_node_get_mtu(struct net *net, u32 addr, u32 selector)
+static inline unsigned int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel)
{
struct tipc_node *n;
- struct tipc_link_entry *le;
+ int bearer_id;
unsigned int mtu = MAX_MSG_SIZE;
n = tipc_node_find(net, addr);
if (unlikely(!n))
return mtu;
- le = n->active_links[selector & 1];
- if (likely(le))
- mtu = le->mtu;
+
+ bearer_id = n->active_links[sel & 1];
+ if (likely(bearer_id != INVALID_BEARER_ID))
+ mtu = n->links[bearer_id].mtu;
tipc_node_put(n);
return mtu;
}
--
1.9.1
* [PATCH net-next 05/13] tipc: change sk_buffer handling in tipc_link_xmit()
From: Jon Maloy @ 2015-07-16 20:54 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion, Jon Maloy
When the function tipc_link_xmit() is given a buffer list for
transmission, it currently consumes the list both when transmission
is successful and when it fails, except for the special case when
it encounters link congestion.
This behavior is inconsistent, and needs to be corrected if we want
to avoid problems in later commits in this series.
In this commit, we change this to let the function consume the list
only when transmission is successful, and leave the list with the
sender in all other cases. We also modify the socket code so that
it adapts to this change, i.e., purges the list when a non-congestion
error code is returned.
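The resulting caller-side pattern, condensed from the socket.c hunk
below (-EMSGSIZE handling omitted): on congestion the list survives and
the loop may retry, on any other error the sender purges it:

  do {
          rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
          if (likely(!rc))
                  return dsz;
          if (rc == -ELINKCONG) {
                  tsk->link_cong = 1;
                  rc = tipc_wait_for_sndmsg(sock, &timeo);
                  if (!rc)
                          continue;       /* list still intact, retry */
          }
          __skb_queue_purge(pktchain);    /* caller cleans up on failure */
          break;
  } while (1);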
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/bcast.c | 5 ++---
net/tipc/link.c | 23 +++++++++--------------
net/tipc/socket.c | 49 ++++++++++++++++++++++++++-----------------------
3 files changed, 37 insertions(+), 40 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 59b2f2a..295bdc2 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -358,10 +358,9 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
/* Prepare clone of message for local node */
skb = tipc_msg_reassemble(list);
- if (unlikely(!skb)) {
- __skb_queue_purge(list);
+ if (unlikely(!skb))
return -EHOSTUNREACH;
- }
+
/* Broadcast to all nodes */
if (likely(bclink)) {
tipc_bclink_lock(net);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index f8e0e2c..ea32679 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -340,7 +340,7 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
* @link: congested link
* @list: message that was attempted sent
* Create pseudo msg to send back to user when congestion abates
- * Only consumes message if there is an error
+ * Does not consume buffer list
*/
static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
{
@@ -354,7 +354,7 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
tipc_link_reset(link);
- goto err;
+ return -ENOBUFS;
}
/* Non-blocking sender: */
if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
@@ -364,15 +364,12 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
addr, addr, oport, 0, 0);
if (!skb)
- goto err;
+ return -ENOBUFS;
TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
TIPC_SKB_CB(skb)->chain_imp = imp;
skb_queue_tail(&link->wakeupq, skb);
link->stats.link_congs++;
return -ELINKCONG;
-err:
- __skb_queue_purge(list);
- return -ENOBUFS;
}
/**
@@ -641,8 +638,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
* @link: link to use
* @list: chain of buffers containing message
*
- * Consumes the buffer chain, except when returning -ELINKCONG,
- * since the caller then may want to make more send attempts.
+ * Consumes the buffer chain, except when returning an error code,
* Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
* Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
*/
@@ -666,10 +662,9 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
return link_schedule_user(link, list);
}
- if (unlikely(msg_size(msg) > mtu)) {
- __skb_queue_purge(list);
+ if (unlikely(msg_size(msg) > mtu))
return -EMSGSIZE;
- }
+
/* Prepare each packet for sending, and add to relevant queue: */
while (skb_queue_len(list)) {
skb = skb_peek(list);
@@ -722,7 +717,7 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
/* tipc_link_xmit_skb(): send single buffer to destination
* Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
- * messages, which will not be rejected
+ * messages, which will not cause link congestion
* The only exception is datagram messages rerouted after secondary
* lookup, which are rare and safe to dispose of anyway.
* TODO: Return real return value, and let callers use
@@ -736,7 +731,7 @@ int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
skb2list(skb, &head);
rc = tipc_link_xmit(net, &head, dnode, selector);
- if (rc == -ELINKCONG)
+ if (rc)
kfree_skb(skb);
return 0;
}
@@ -748,7 +743,7 @@ int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
* @dsz: amount of user data to be sent
* @dnode: address of destination node
* @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning -ELINKCONG
+ * Consumes the buffer chain, except when returning error
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 3a7567f..87fef25 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -686,21 +686,22 @@ new_mtu:
do {
rc = tipc_bclink_xmit(net, pktchain);
- if (likely(rc >= 0)) {
- rc = dsz;
- break;
+ if (likely(!rc))
+ return dsz;
+
+ if (rc == -ELINKCONG) {
+ tsk->link_cong = 1;
+ rc = tipc_wait_for_sndmsg(sock, &timeo);
+ if (!rc)
+ continue;
}
+ __skb_queue_purge(pktchain);
if (rc == -EMSGSIZE) {
msg->msg_iter = save;
goto new_mtu;
}
- if (rc != -ELINKCONG)
- break;
- tipc_sk(sk)->link_cong = 1;
- rc = tipc_wait_for_sndmsg(sock, &timeo);
- if (rc)
- __skb_queue_purge(pktchain);
- } while (!rc);
+ break;
+ } while (1);
return rc;
}
@@ -925,23 +926,24 @@ new_mtu:
skb = skb_peek(pktchain);
TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
- if (likely(rc >= 0)) {
+ if (likely(!rc)) {
if (sock->state != SS_READY)
sock->state = SS_CONNECTING;
- rc = dsz;
- break;
+ return dsz;
}
+ if (rc == -ELINKCONG) {
+ tsk->link_cong = 1;
+ rc = tipc_wait_for_sndmsg(sock, &timeo);
+ if (!rc)
+ continue;
+ }
+ __skb_queue_purge(pktchain);
if (rc == -EMSGSIZE) {
m->msg_iter = save;
goto new_mtu;
}
- if (rc != -ELINKCONG)
- break;
- tsk->link_cong = 1;
- rc = tipc_wait_for_sndmsg(sock, &timeo);
- if (rc)
- __skb_queue_purge(pktchain);
- } while (!rc);
+ break;
+ } while (1);
return rc;
}
@@ -1048,10 +1050,11 @@ next:
tsk->sent_unacked++;
sent += send;
if (sent == dsz)
- break;
+ return dsz;
goto next;
}
if (rc == -EMSGSIZE) {
+ __skb_queue_purge(pktchain);
tsk->max_pkt = tipc_node_get_mtu(net, dnode,
portid);
m->msg_iter = save;
@@ -1059,13 +1062,13 @@ next:
}
if (rc != -ELINKCONG)
break;
+
tsk->link_cong = 1;
}
rc = tipc_wait_for_sndpkt(sock, &timeo);
- if (rc)
- __skb_queue_purge(pktchain);
} while (!rc);
+ __skb_queue_purge(pktchain);
return sent ? sent : rc;
}
--
1.9.1
* [PATCH net-next 06/13] tipc: make media xmit call outside node spinlock context
From: Jon Maloy @ 2015-07-16 20:54 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion, Jon Maloy
Currently, message sending is performed through a deep call chain,
where the node spinlock is grabbed and held during a significant
part of the transmission time. This is clearly detrimental to
overall throughput performance; it would be better if we could send
the message after the spinlock has been released.
In this commit, we instead let the call return back up the stack after
the buffer chain has been added to the transmission queue, after which
clones of the buffers are transmitted to the device layer outside the
spinlock scope.
As a further step in our effort to separate the roles of the node
and link entities we also move the function tipc_link_xmit() to
node.c, and rename it to tipc_node_xmit().
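A sketch of the resulting send path (the tipc_node_xmit() body is
truncated in the hunk below, so the tail of this sketch is an
assumption based on the helpers introduced here): packets are queued
onto a local 'xmitq' while the node lock is held, and handed to the
bearer only after the lock has been released:

  struct sk_buff_head xmitq;

  __skb_queue_head_init(&xmitq);
  tipc_node_lock(n);
  l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
  if (likely(l))
          rc = tipc_link_xmit(l, list, &xmitq);   /* only queues clones */
  tipc_node_unlock(n);

  if (likely(l))
          tipc_bearer_xmit(net, bearer_id, &xmitq, maddr); /* outside the lock */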
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/bearer.c | 26 ++++++++++
net/tipc/bearer.h | 3 ++
net/tipc/link.c | 132 +++++++++++++++++++++++++++-----------------------
net/tipc/link.h | 6 +--
net/tipc/name_distr.c | 4 +-
net/tipc/node.c | 78 +++++++++++++++++++++++++++++
net/tipc/node.h | 4 ++
net/tipc/socket.c | 22 ++++-----
8 files changed, 198 insertions(+), 77 deletions(-)
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 00bc0e6..eae58a6 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -470,6 +470,32 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
rcu_read_unlock();
}
+/* tipc_bearer_xmit() -send buffer to destination over bearer
+ */
+void tipc_bearer_xmit(struct net *net, u32 bearer_id,
+ struct sk_buff_head *xmitq,
+ struct tipc_media_addr *dst)
+{
+ struct tipc_net *tn = net_generic(net, tipc_net_id);
+ struct tipc_bearer *b;
+ struct sk_buff *skb, *tmp;
+
+ if (skb_queue_empty(xmitq))
+ return;
+
+ rcu_read_lock();
+ b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
+ if (likely(b)) {
+ skb_queue_walk_safe(xmitq, skb, tmp) {
+ __skb_dequeue(xmitq);
+ b->media->send_msg(net, skb, b, dst);
+ /* Until we remove cloning in tipc_l2_send_msg(): */
+ kfree_skb(skb);
+ }
+ }
+ rcu_read_unlock();
+}
+
/**
* tipc_l2_rcv_msg - handle incoming TIPC message from an interface
* @buf: the received packet
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index dc714d9..6426f24 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -217,5 +217,8 @@ void tipc_bearer_cleanup(void);
void tipc_bearer_stop(struct net *net);
void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
struct tipc_media_addr *dest);
+void tipc_bearer_xmit(struct net *net, u32 bearer_id,
+ struct sk_buff_head *xmitq,
+ struct tipc_media_addr *dst);
#endif /* _TIPC_BEARER_H */
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ea32679..c052437 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -353,7 +353,6 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
/* This really cannot happen... */
if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
- tipc_link_reset(link);
return -ENOBUFS;
}
/* Non-blocking sender: */
@@ -701,6 +700,78 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
return 0;
}
+/**
+ * tipc_link_xmit(): enqueue buffer list according to queue situation
+ * @link: link to use
+ * @list: chain of buffers containing message
+ * @xmitq: returned list of packets to be sent by caller
+ *
+ * Consumes the buffer chain, except when returning -ELINKCONG,
+ * since the caller then may want to make more send attempts.
+ * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
+ * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
+ */
+int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
+ struct sk_buff_head *xmitq)
+{
+ struct tipc_msg *hdr = buf_msg(skb_peek(list));
+ unsigned int maxwin = l->window;
+ unsigned int i, imp = msg_importance(hdr);
+ unsigned int mtu = l->mtu;
+ u16 ack = l->rcv_nxt - 1;
+ u16 seqno = l->snd_nxt;
+ u16 bc_last_in = l->owner->bclink.last_in;
+ struct sk_buff_head *transmq = &l->transmq;
+ struct sk_buff_head *backlogq = &l->backlogq;
+ struct sk_buff *skb, *_skb, *bskb;
+
+ /* Match msg importance against this and all higher backlog limits: */
+ for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
+ if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
+ return link_schedule_user(l, list);
+ }
+ if (unlikely(msg_size(hdr) > mtu))
+ return -EMSGSIZE;
+
+ /* Prepare each packet for sending, and add to relevant queue: */
+ while (skb_queue_len(list)) {
+ skb = skb_peek(list);
+ hdr = buf_msg(skb);
+ msg_set_seqno(hdr, seqno);
+ msg_set_ack(hdr, ack);
+ msg_set_bcast_ack(hdr, bc_last_in);
+
+ if (likely(skb_queue_len(transmq) < maxwin)) {
+ _skb = skb_clone(skb, GFP_ATOMIC);
+ if (!_skb)
+ return -ENOBUFS;
+ __skb_dequeue(list);
+ __skb_queue_tail(transmq, skb);
+ __skb_queue_tail(xmitq, _skb);
+ l->rcv_unacked = 0;
+ seqno++;
+ continue;
+ }
+ if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
+ kfree_skb(__skb_dequeue(list));
+ l->stats.sent_bundled++;
+ continue;
+ }
+ if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
+ kfree_skb(__skb_dequeue(list));
+ __skb_queue_tail(backlogq, bskb);
+ l->backlog[msg_importance(buf_msg(bskb))].len++;
+ l->stats.sent_bundled++;
+ l->stats.sent_bundles++;
+ continue;
+ }
+ l->backlog[imp].len += skb_queue_len(list);
+ skb_queue_splice_tail_init(list, backlogq);
+ }
+ l->snd_nxt = seqno;
+ return 0;
+}
+
static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
{
skb_queue_head_init(list);
@@ -715,65 +786,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
return __tipc_link_xmit(link->owner->net, link, &head);
}
-/* tipc_link_xmit_skb(): send single buffer to destination
- * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
- * messages, which will not cause link congestion
- * The only exception is datagram messages rerouted after secondary
- * lookup, which are rare and safe to dispose of anyway.
- * TODO: Return real return value, and let callers use
- * tipc_wait_for_sendpkt() where applicable
- */
-int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
- u32 selector)
-{
- struct sk_buff_head head;
- int rc;
-
- skb2list(skb, &head);
- rc = tipc_link_xmit(net, &head, dnode, selector);
- if (rc)
- kfree_skb(skb);
- return 0;
-}
-
-/**
- * tipc_link_xmit() is the general link level function for message sending
- * @net: the applicable net namespace
- * @list: chain of buffers containing message
- * @dsz: amount of user data to be sent
- * @dnode: address of destination node
- * @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning error
- * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
- */
-int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
- u32 selector)
-{
- struct tipc_link *link = NULL;
- struct tipc_node *node;
- int rc = -EHOSTUNREACH;
-
- node = tipc_node_find(net, dnode);
- if (node) {
- tipc_node_lock(node);
- link = node_active_link(node, selector & 1);
- if (link)
- rc = __tipc_link_xmit(net, link, list);
- tipc_node_unlock(node);
- tipc_node_put(node);
- }
- if (link)
- return rc;
-
- if (likely(in_own_node(net, dnode))) {
- tipc_sk_rcv(net, list);
- return 0;
- }
-
- __skb_queue_purge(list);
- return rc;
-}
-
/*
* tipc_link_sync_xmit - synchronize broadcast link endpoints.
*
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 9c71d9e..7add2b9 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -223,12 +223,10 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr);
void tipc_link_purge_backlog(struct tipc_link *l);
void tipc_link_reset_all(struct tipc_node *node);
void tipc_link_reset(struct tipc_link *l_ptr);
-int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
- u32 selector);
-int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest,
- u32 selector);
int __tipc_link_xmit(struct net *net, struct tipc_link *link,
struct sk_buff_head *list);
+int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list,
+ struct sk_buff_head *xmitq);
void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
u32 gap, u32 tolerance, u32 priority);
void tipc_link_push_packets(struct tipc_link *l_ptr);
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 3a1539e..e6018b7 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -102,7 +102,7 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb)
if (!oskb)
break;
msg_set_destnode(buf_msg(oskb), dnode);
- tipc_link_xmit_skb(net, oskb, dnode, dnode);
+ tipc_node_xmit_skb(net, oskb, dnode, dnode);
}
rcu_read_unlock();
@@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
&tn->nametbl->publ_list[TIPC_ZONE_SCOPE]);
rcu_read_unlock();
- tipc_link_xmit(net, &head, dnode, dnode);
+ tipc_node_xmit(net, &head, dnode, dnode);
}
static void tipc_publ_subscribe(struct net *net, struct publication *publ,
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 1972964..ad759bb 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -563,6 +563,84 @@ msg_full:
return -EMSGSIZE;
}
+static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
+ int *bearer_id,
+ struct tipc_media_addr **maddr)
+{
+ int id = n->active_links[sel & 1];
+
+ if (unlikely(id < 0))
+ return NULL;
+
+ *bearer_id = id;
+ *maddr = &n->links[id].maddr;
+ return n->links[id].link;
+}
+
+/**
+ * tipc_node_xmit() is the general link level function for message sending
+ * @net: the applicable net namespace
+ * @list: chain of buffers containing message
+ * @dnode: address of destination node
+ * @selector: a number used for deterministic link selection
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ */
+int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
+ u32 dnode, int selector)
+{
+ struct tipc_link *l = NULL;
+ struct tipc_node *n;
+ struct sk_buff_head xmitq;
+ struct tipc_media_addr *maddr;
+ int bearer_id;
+ int rc = -EHOSTUNREACH;
+
+ __skb_queue_head_init(&xmitq);
+ n = tipc_node_find(net, dnode);
+ if (likely(n)) {
+ tipc_node_lock(n);
+ l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
+ if (likely(l))
+ rc = tipc_link_xmit(l, list, &xmitq);
+ if (unlikely(rc == -ENOBUFS))
+ tipc_link_reset(l);
+ tipc_node_unlock(n);
+ tipc_node_put(n);
+ }
+ if (likely(!rc)) {
+ tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
+ return 0;
+ }
+ if (likely(in_own_node(net, dnode))) {
+ tipc_sk_rcv(net, list);
+ return 0;
+ }
+ return rc;
+}
+
+/* tipc_node_xmit_skb(): send single buffer to destination
+ * Buffers sent via this function are generally TIPC_SYSTEM_IMPORTANCE
+ * messages, which will not be rejected
+ * The only exception is datagram messages rerouted after secondary
+ * lookup, which are rare and safe to dispose of anyway.
+ * TODO: Return real return value, and let callers use
+ * tipc_wait_for_sendpkt() where applicable
+ */
+int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
+ u32 selector)
+{
+ struct sk_buff_head head;
+ int rc;
+
+ skb_queue_head_init(&head);
+ __skb_queue_tail(&head, skb);
+ rc = tipc_node_xmit(net, &head, dnode, selector);
+ if (rc == -ELINKCONG)
+ kfree_skb(skb);
+ return 0;
+}
+
int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
{
int err;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 74f278a..86b7c74 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -160,6 +160,10 @@ bool tipc_node_is_up(struct tipc_node *n);
int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
char *linkname, size_t len);
void tipc_node_unlock(struct tipc_node *node);
+int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
+ int selector);
+int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
+ u32 selector);
int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 87fef25..5b0b08d 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -261,7 +261,7 @@ static void tsk_rej_rx_queue(struct sock *sk)
while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
- tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);
+ tipc_node_xmit_skb(sock_net(sk), skb, dnode, 0);
}
}
@@ -443,7 +443,7 @@ static int tipc_release(struct socket *sock)
}
if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
TIPC_ERR_NO_PORT))
- tipc_link_xmit_skb(net, skb, dnode, 0);
+ tipc_node_xmit_skb(net, skb, dnode, 0);
}
}
@@ -456,7 +456,7 @@ static int tipc_release(struct socket *sock)
tsk_own_node(tsk), tsk_peer_port(tsk),
tsk->portid, TIPC_ERR_NO_PORT);
if (skb)
- tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+ tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
tipc_node_remove_conn(net, dnode, tsk->portid);
}
@@ -925,7 +925,7 @@ new_mtu:
do {
skb = skb_peek(pktchain);
TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
- rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
+ rc = tipc_node_xmit(net, pktchain, dnode, tsk->portid);
if (likely(!rc)) {
if (sock->state != SS_READY)
sock->state = SS_CONNECTING;
@@ -1045,7 +1045,7 @@ next:
return rc;
do {
if (likely(!tsk_conn_cong(tsk))) {
- rc = tipc_link_xmit(net, pktchain, dnode, portid);
+ rc = tipc_node_xmit(net, pktchain, dnode, portid);
if (likely(!rc)) {
tsk->sent_unacked++;
sent += send;
@@ -1224,7 +1224,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
return;
msg = buf_msg(skb);
msg_set_msgcnt(msg, ack);
- tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg));
+ tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
}
static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1703,7 +1703,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
return 0;
}
if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
- tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+ tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
return 0;
}
@@ -1799,7 +1799,7 @@ int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
continue;
xmit:
- tipc_link_xmit_skb(net, skb, dnode, dport);
+ tipc_node_xmit_skb(net, skb, dnode, dport);
}
return err ? -EHOSTUNREACH : 0;
}
@@ -2092,7 +2092,7 @@ restart:
}
if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
TIPC_CONN_SHUTDOWN))
- tipc_link_xmit_skb(net, skb, dnode,
+ tipc_node_xmit_skb(net, skb, dnode,
tsk->portid);
} else {
dnode = tsk_peer_node(tsk);
@@ -2102,7 +2102,7 @@ restart:
0, dnode, tsk_own_node(tsk),
tsk_peer_port(tsk),
tsk->portid, TIPC_CONN_SHUTDOWN);
- tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+ tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
}
tsk->connected = 0;
sock->state = SS_DISCONNECTING;
@@ -2164,7 +2164,7 @@ static void tipc_sk_timeout(unsigned long data)
}
bh_unlock_sock(sk);
if (skb)
- tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
+ tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
exit:
sock_put(sk);
}
--
1.9.1
^ permalink raw reply related [flat|nested] 15+ messages in thread
* [PATCH net-next 07/13] tipc: clean up definitions and usage of link flags
2015-07-16 20:54 [PATCH net-next 00/13] tipc: separate link and link aggregation layer Jon Maloy
` (5 preceding siblings ...)
2015-07-16 20:54 ` [PATCH net-next 06/13] tipc: make media xmit call outside node spinlock context Jon Maloy
@ 2015-07-16 20:54 ` Jon Maloy
2015-07-16 20:54 ` [PATCH net-next 08/13] tipc: introduce new link protocol msg create function Jon Maloy
` (6 subsequent siblings)
13 siblings, 0 replies; 15+ messages in thread
From: Jon Maloy @ 2015-07-16 20:54 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion, Jon Maloy
The status flag LINK_STOPPED is not needed any more, since the
mechanism for delayed deletion of links has been removed.
Likewise, LINK_STARTED and the STARTING_EVT event are unnecessary,
because we can just as well start the link timer directly from
inside tipc_link_create().
We eliminate these flags in this commit.
Instead of the above flags, we now introduce three new link modes,
TIPC_LINK_OPEN, TIPC_LINK_BLOCKED and TIPC_LINK_TUNNEL. The value
indicates whether, and in the case of TIPC_LINK_TUNNEL which, messages
the link is allowed to receive in its current state. TIPC_LINK_BLOCKED
also prevents timer-driven protocol messages from being sent out, as
well as any change to the link FSM. Since the modes are mutually
exclusive, we represent them as state values, and rename the 'flags'
field in struct tipc_link to 'exec_mode'.
Finally, we move the #defines for link FSM states and events from link.h
into enums inside the file link.c, which is the real usage scope of
these definitions.
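To illustrate the idea, below is a minimal user-space sketch of how a
single, mutually exclusive mode field can gate both message reception
and timer-driven protocol sends. It only models the pattern described
above; the type and helper names are invented for the example and are
not part of TIPC.

#include <stdbool.h>
#include <stdio.h>

/* Simplified model of the three mutually exclusive receive modes */
enum link_exec_mode { LINK_OPEN, LINK_BLOCKED, LINK_TUNNEL };

struct model_link {
        enum link_exec_mode exec_mode;
};

/* Timer-driven protocol messages are suppressed while blocked */
static bool link_may_send_proto(const struct model_link *l)
{
        return l->exec_mode != LINK_BLOCKED;
}

/* In tunnel mode only tunnelled/duplicate traffic is admitted */
static bool link_may_rcv(const struct model_link *l, bool is_tunnel_pkt)
{
        switch (l->exec_mode) {
        case LINK_OPEN:
                return true;
        case LINK_TUNNEL:
                return is_tunnel_pkt;
        case LINK_BLOCKED:
        default:
                return false;
        }
}

int main(void)
{
        struct model_link l = { .exec_mode = LINK_TUNNEL };

        printf("proto send allowed: %d\n", link_may_send_proto(&l));
        printf("ordinary rcv allowed: %d\n", link_may_rcv(&l, false));
        printf("tunnel rcv allowed: %d\n", link_may_rcv(&l, true));
        return 0;
}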
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/bcast.c | 1 -
net/tipc/link.c | 98 +++++++++++++++++++++++++++++++++-----------------------
net/tipc/link.h | 44 +++++--------------------
net/tipc/node.c | 2 +-
4 files changed, 67 insertions(+), 78 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 295bdc2..aab4e8d 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -924,7 +924,6 @@ int tipc_bclink_init(struct net *net)
tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
bcl->bearer_id = MAX_BEARERS;
rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
- bcl->state = WORKING_WORKING;
bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;
msg_set_prevnode(bcl->pmsg, tn->own_addr);
strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index c052437..35a2da6 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -79,19 +79,49 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
/*
* Out-of-range value for link session numbers
*/
-#define INVALID_SESSION 0x10000
+#define WILDCARD_SESSION 0x10000
-/*
- * Link state events:
+/* State value stored in 'failover_pkts'
*/
-#define STARTING_EVT 856384768 /* link processing trigger */
-#define TRAFFIC_MSG_EVT 560815u /* rx'd ??? */
-#define SILENCE_EVT 560817u /* timer dicovered silence from peer */
+#define FIRST_FAILOVER 0xffffu
-/*
- * State value stored in 'failover_pkts'
+/* Link FSM states and events:
*/
-#define FIRST_FAILOVER 0xffffu
+enum {
+ WORKING_WORKING,
+ WORKING_UNKNOWN,
+ RESET_RESET,
+ RESET_UNKNOWN
+};
+
+enum {
+ PEER_RESET_EVT = RESET_MSG,
+ ACTIVATE_EVT = ACTIVATE_MSG,
+ TRAFFIC_EVT, /* Any other valid msg from peer */
+ SILENCE_EVT /* Peer was silent during last timer interval*/
+};
+
+/* Link FSM state checking routines
+ */
+static int link_working_working(struct tipc_link *l)
+{
+ return l->state == WORKING_WORKING;
+}
+
+static int link_working_unknown(struct tipc_link *l)
+{
+ return l->state == WORKING_UNKNOWN;
+}
+
+static int link_reset_unknown(struct tipc_link *l)
+{
+ return l->state == RESET_UNKNOWN;
+}
+
+static int link_reset_reset(struct tipc_link *l)
+{
+ return l->state == RESET_RESET;
+}
static void link_handle_out_of_seq_msg(struct tipc_link *link,
struct sk_buff *skb);
@@ -268,7 +298,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
/* note: peer i/f name is updated by reset/activate message */
memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
l_ptr->owner = n_ptr;
- l_ptr->peer_session = INVALID_SESSION;
+ l_ptr->peer_session = WILDCARD_SESSION;
l_ptr->bearer_id = b_ptr->identity;
link_set_supervision_props(l_ptr, b_ptr->tolerance);
l_ptr->state = RESET_UNKNOWN;
@@ -297,8 +327,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
link_reset_statistics(l_ptr);
tipc_node_attach_link(n_ptr, l_ptr);
setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr);
- link_state_event(l_ptr, STARTING_EVT);
-
+ link_set_timer(l_ptr, l_ptr->keepalive_intv);
return l_ptr;
}
@@ -311,7 +340,6 @@ void tipc_link_delete(struct tipc_link *l)
tipc_link_reset(l);
if (del_timer(&l->timer))
tipc_link_put(l);
- l->flags |= LINK_STOPPED;
/* Delete link now, or when timer is finished: */
tipc_link_reset_fragments(l);
tipc_node_detach_link(l->owner, l);
@@ -438,7 +466,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
/* Link is down, accept any session */
- l_ptr->peer_session = INVALID_SESSION;
+ l_ptr->peer_session = WILDCARD_SESSION;
/* Prepare for renewed mtu size negotiation */
l_ptr->mtu = l_ptr->advertised_mtu;
@@ -452,7 +480,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr);
if (was_active_link && tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) {
- l_ptr->flags |= LINK_FAILINGOVER;
+ l_ptr->exec_mode = TIPC_LINK_BLOCKED;
l_ptr->failover_checkpt = l_ptr->rcv_nxt;
pl->failover_pkts = FIRST_FAILOVER;
pl->failover_checkpt = l_ptr->rcv_nxt;
@@ -496,21 +524,14 @@ static void link_activate(struct tipc_link *link)
static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
{
struct tipc_link *other;
- unsigned long timer_intv = l_ptr->keepalive_intv;
-
- if (l_ptr->flags & LINK_STOPPED)
- return;
-
- if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))
- return; /* Not yet. */
- if (l_ptr->flags & LINK_FAILINGOVER)
+ if (l_ptr->exec_mode == TIPC_LINK_BLOCKED)
return;
switch (l_ptr->state) {
case WORKING_WORKING:
switch (event) {
- case TRAFFIC_MSG_EVT:
+ case TRAFFIC_EVT:
case ACTIVATE_MSG:
l_ptr->silent_intv_cnt = 0;
break;
@@ -538,7 +559,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
break;
case WORKING_UNKNOWN:
switch (event) {
- case TRAFFIC_MSG_EVT:
+ case TRAFFIC_EVT:
case ACTIVATE_MSG:
l_ptr->state = WORKING_WORKING;
l_ptr->silent_intv_cnt = 0;
@@ -576,7 +597,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
break;
case RESET_UNKNOWN:
switch (event) {
- case TRAFFIC_MSG_EVT:
+ case TRAFFIC_EVT:
break;
case ACTIVATE_MSG:
other = node_active_link(l_ptr->owner, 0);
@@ -593,10 +614,6 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
1, 0, 0, 0);
break;
- case STARTING_EVT:
- l_ptr->flags |= LINK_STARTED;
- link_set_timer(l_ptr, timer_intv);
- break;
case SILENCE_EVT:
tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0);
break;
@@ -606,7 +623,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
break;
case RESET_RESET:
switch (event) {
- case TRAFFIC_MSG_EVT:
+ case TRAFFIC_EVT:
case ACTIVATE_MSG:
other = node_active_link(l_ptr->owner, 0);
if (other && link_working_unknown(other))
@@ -975,7 +992,7 @@ static bool link_synch(struct tipc_link *l)
if (skb_queue_len(pl->inputq) > post_synch)
return false;
synched:
- l->flags &= ~LINK_SYNCHING;
+ l->exec_mode = TIPC_LINK_OPEN;
return true;
}
@@ -1091,7 +1108,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
}
/* Traffic message. Conditionally activate link */
- link_state_event(l_ptr, TRAFFIC_MSG_EVT);
+ link_state_event(l_ptr, TRAFFIC_EVT);
if (link_working_working(l_ptr)) {
/* Re-insert buffer in front of queue */
@@ -1112,7 +1129,8 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
l_ptr->silent_intv_cnt = 0;
/* Synchronize with parallel link if applicable */
- if (unlikely((l_ptr->flags & LINK_SYNCHING) && !msg_dup(msg))) {
+ if (unlikely((l_ptr->exec_mode == TIPC_LINK_TUNNEL) &&
+ !msg_dup(msg))) {
if (!link_synch(l_ptr))
goto unlock;
}
@@ -1193,7 +1211,7 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
switch (msg_user(msg)) {
case TUNNEL_PROTOCOL:
if (msg_dup(msg)) {
- link->flags |= LINK_SYNCHING;
+ link->exec_mode = TIPC_LINK_TUNNEL;
link->synch_point = msg_seqno(msg_get_wrapped(msg));
kfree_skb(skb);
break;
@@ -1315,7 +1333,7 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
u16 last_rcv;
/* Don't send protocol message during link failover */
- if (l_ptr->flags & LINK_FAILINGOVER)
+ if (l_ptr->exec_mode == TIPC_LINK_BLOCKED)
return;
/* Abort non-RESET send if communication with node is prohibited */
@@ -1390,7 +1408,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
u32 msg_tol;
struct tipc_msg *msg = buf_msg(buf);
- if (l_ptr->flags & LINK_FAILINGOVER)
+ if (l_ptr->exec_mode == TIPC_LINK_BLOCKED)
goto exit;
if (l_ptr->net_plane != msg_net_plane(msg))
@@ -1401,7 +1419,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
case RESET_MSG:
if (!link_working_unknown(l_ptr) &&
- (l_ptr->peer_session != INVALID_SESSION)) {
+ (l_ptr->peer_session != WILDCARD_SESSION)) {
if (less_eq(msg_session(msg), l_ptr->peer_session))
break; /* duplicate or old reset: ignore */
}
@@ -1465,7 +1483,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
/* Record reception; force mismatch at next timeout: */
l_ptr->silent_intv_cnt = 0;
- link_state_event(l_ptr, TRAFFIC_MSG_EVT);
+ link_state_event(l_ptr, TRAFFIC_EVT);
l_ptr->stats.recv_states++;
if (link_reset_unknown(l_ptr))
break;
@@ -1704,7 +1722,7 @@ static bool tipc_link_failover_rcv(struct tipc_link *link,
}
exit:
if (!link->failover_pkts && pl)
- pl->flags &= ~LINK_FAILINGOVER;
+ pl->exec_mode = TIPC_LINK_OPEN;
kfree_skb(*skb);
*skb = iskb;
return *skb;
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 7add2b9..0509c6d 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -49,19 +49,14 @@
*/
#define INVALID_LINK_SEQ 0x10000
-/* Link working states
- */
-#define WORKING_WORKING 560810u
-#define WORKING_UNKNOWN 560811u
-#define RESET_UNKNOWN 560812u
-#define RESET_RESET 560813u
-/* Link endpoint execution states
+/* Link endpoint receive states
*/
-#define LINK_STARTED 0x0001
-#define LINK_STOPPED 0x0002
-#define LINK_SYNCHING 0x0004
-#define LINK_FAILINGOVER 0x0008
+enum {
+ TIPC_LINK_OPEN,
+ TIPC_LINK_BLOCKED,
+ TIPC_LINK_TUNNEL
+};
/* Starting value for maximum packet size negotiation on unicast links
* (unless bearer MTU is less)
@@ -106,7 +101,6 @@ struct tipc_stats {
* @timer: link timer
* @owner: pointer to peer node
* @refcnt: reference counter for permanent references (owner node & timer)
- * @flags: execution state flags for link endpoint instance
* @peer_session: link session # being used by peer end of link
* @peer_bearer_id: bearer id used by link's peer endpoint
* @bearer_id: local bearer id used by link
@@ -119,6 +113,7 @@ struct tipc_stats {
* @pmsg: convenience pointer to "proto_msg" field
* @priority: current link priority
* @net_plane: current link network plane ('A' through 'H')
+ * @exec_mode: transmit/receive mode for link endpoint instance
* @backlog_limit: backlog queue congestion thresholds (indexed by importance)
* @exp_msg_count: # of tunnelled messages expected during link changeover
* @reset_rcv_checkpt: seq # of last acknowledged message at time of link reset
@@ -149,7 +144,6 @@ struct tipc_link {
struct kref ref;
/* Management and link supervision data */
- unsigned int flags;
u32 peer_session;
u32 peer_bearer_id;
u32 bearer_id;
@@ -165,6 +159,7 @@ struct tipc_link {
struct tipc_msg *pmsg;
u32 priority;
char net_plane;
+ u8 exec_mode;
u16 synch_point;
/* Failover */
@@ -249,27 +244,4 @@ static inline u32 link_own_addr(struct tipc_link *l)
return msg_prevnode(l->pmsg);
}
-/*
- * Link status checking routines
- */
-static inline int link_working_working(struct tipc_link *l_ptr)
-{
- return l_ptr->state == WORKING_WORKING;
-}
-
-static inline int link_working_unknown(struct tipc_link *l_ptr)
-{
- return l_ptr->state == WORKING_UNKNOWN;
-}
-
-static inline int link_reset_unknown(struct tipc_link *l_ptr)
-{
- return l_ptr->state == RESET_UNKNOWN;
-}
-
-static inline int link_reset_reset(struct tipc_link *l_ptr)
-{
- return l_ptr->state == RESET_RESET;
-}
-
#endif
diff --git a/net/tipc/node.c b/net/tipc/node.c
index ad759bb..b7a4457 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -403,7 +403,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
struct tipc_link *l_ptr = n_ptr->links[i].link;
if (!l_ptr)
continue;
- l_ptr->flags &= ~LINK_FAILINGOVER;
+ l_ptr->exec_mode = TIPC_LINK_OPEN;
l_ptr->failover_checkpt = 0;
l_ptr->failover_pkts = 0;
kfree_skb(l_ptr->failover_skb);
--
1.9.1
^ permalink raw reply related [flat|nested] 15+ messages in thread
* [PATCH net-next 08/13] tipc: introduce new link protocol msg create function
2015-07-16 20:54 [PATCH net-next 00/13] tipc: separate link and link aggregation layer Jon Maloy
` (6 preceding siblings ...)
2015-07-16 20:54 ` [PATCH net-next 07/13] tipc: clean up definitions and usage of link flags Jon Maloy
@ 2015-07-16 20:54 ` Jon Maloy
2015-07-16 20:54 ` [PATCH net-next 09/13] tipc: improve link FSM implementation Jon Maloy
` (5 subsequent siblings)
13 siblings, 0 replies; 15+ messages in thread
From: Jon Maloy @ 2015-07-16 20:54 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion, Jon Maloy
As a preparation for later changes, we introduce a new function
tipc_link_build_proto_msg(). Instead of actually sending the created
protocol message, it only creates it and adds it to the head of an
skb queue provided by the caller.
Since we still need the existing function tipc_link_proto_xmit()
for a while, we redesign it to make use of the new function.
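The resulting calling pattern, build the message into a caller-provided
queue first and transmit afterwards, can be sketched roughly as follows.
This is a simplified stand-alone model, not the kernel API; struct
proto_msg, struct msg_queue and the helper names are invented for the
illustration.

#include <stdio.h>
#include <stdlib.h>

/* Stand-in for a protocol message; fields are invented for the example */
struct proto_msg {
        int type;               /* RESET/ACTIVATE/STATE in the real code */
        unsigned short seqno;
};

/* Minimal stand-in for an skb queue holding at most one message */
struct msg_queue {
        struct proto_msg *head;
};

/* Model of the new build function: it only creates the message and
 * prepends it to the caller-provided queue; it never touches the bearer.
 */
static void build_proto_msg(struct msg_queue *xmitq, int type,
                            unsigned short snd_nxt)
{
        struct proto_msg *m = malloc(sizeof(*m));

        if (!m)
                return;
        m->type = type;
        m->seqno = (unsigned short)(snd_nxt + 0xffff / 2); /* out of sequence */
        xmitq->head = m;
}

/* Model of the redesigned xmit function: build first, then send */
static void proto_xmit(int type, unsigned short snd_nxt)
{
        struct msg_queue xmitq = { .head = NULL };

        build_proto_msg(&xmitq, type, snd_nxt);
        if (!xmitq.head)
                return;
        printf("sending proto msg type %d seqno %u\n",
               xmitq.head->type, xmitq.head->seqno);
        free(xmitq.head);
}

int main(void)
{
        proto_xmit(2 /* say, a STATE message */, 42);
        return 0;
}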
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/link.c | 144 ++++++++++++++++++++++++++++++--------------------------
1 file changed, 77 insertions(+), 67 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 35a2da6..657ba91 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -129,6 +129,9 @@ static void tipc_link_proto_rcv(struct tipc_link *link,
struct sk_buff *skb);
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol);
static void link_state_event(struct tipc_link *l_ptr, u32 event);
+static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
+ u16 rcvgap, int tolerance, int priority,
+ struct sk_buff_head *xmitq);
static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
static void tipc_link_sync_xmit(struct tipc_link *l);
@@ -1323,77 +1326,21 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
/*
* Send protocol message to the other endpoint.
*/
-void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
+void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
u32 gap, u32 tolerance, u32 priority)
{
- struct sk_buff *buf = NULL;
- struct tipc_msg *msg = l_ptr->pmsg;
- u32 msg_size = sizeof(l_ptr->proto_msg);
- int r_flag;
- u16 last_rcv;
+ struct sk_buff *skb = NULL;
+ struct sk_buff_head xmitq;
- /* Don't send protocol message during link failover */
- if (l_ptr->exec_mode == TIPC_LINK_BLOCKED)
- return;
-
- /* Abort non-RESET send if communication with node is prohibited */
- if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG))
- return;
-
- /* Create protocol message with "out-of-sequence" sequence number */
- msg_set_type(msg, msg_typ);
- msg_set_net_plane(msg, l_ptr->net_plane);
- msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
- msg_set_last_bcast(msg, tipc_bclink_get_last_sent(l_ptr->owner->net));
-
- if (msg_typ == STATE_MSG) {
- u16 next_sent = l_ptr->snd_nxt;
-
- if (!tipc_link_is_up(l_ptr))
- return;
- msg_set_next_sent(msg, next_sent);
- if (!skb_queue_empty(&l_ptr->deferdq)) {
- last_rcv = buf_seqno(skb_peek(&l_ptr->deferdq));
- gap = mod(last_rcv - l_ptr->rcv_nxt);
- }
- msg_set_seq_gap(msg, gap);
- if (gap)
- l_ptr->stats.sent_nacks++;
- msg_set_link_tolerance(msg, tolerance);
- msg_set_linkprio(msg, priority);
- msg_set_max_pkt(msg, l_ptr->mtu);
- msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1));
- msg_set_probe(msg, probe_msg != 0);
- if (probe_msg)
- l_ptr->stats.sent_probes++;
- l_ptr->stats.sent_states++;
- } else { /* RESET_MSG or ACTIVATE_MSG */
- msg_set_ack(msg, mod(l_ptr->failover_checkpt - 1));
- msg_set_seq_gap(msg, 0);
- msg_set_next_sent(msg, 1);
- msg_set_probe(msg, 0);
- msg_set_link_tolerance(msg, l_ptr->tolerance);
- msg_set_linkprio(msg, l_ptr->priority);
- msg_set_max_pkt(msg, l_ptr->advertised_mtu);
- }
-
- r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
- msg_set_redundant_link(msg, r_flag);
- msg_set_linkprio(msg, l_ptr->priority);
- msg_set_size(msg, msg_size);
-
- msg_set_seqno(msg, mod(l_ptr->snd_nxt + (0xffff / 2)));
-
- buf = tipc_buf_acquire(msg_size);
- if (!buf)
+ __skb_queue_head_init(&xmitq);
+ tipc_link_build_proto_msg(l, msg_typ, probe_msg, gap,
+ tolerance, priority, &xmitq);
+ skb = __skb_dequeue(&xmitq);
+ if (!skb)
return;
-
- skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
- buf->priority = TC_PRIO_CONTROL;
- tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf,
- &l_ptr->media_addr);
- l_ptr->rcv_unacked = 0;
- kfree_skb(buf);
+ tipc_bearer_send(l->owner->net, l->bearer_id, skb, &l->media_addr);
+ l->rcv_unacked = 0;
+ kfree_skb(skb);
}
/*
@@ -1514,6 +1461,69 @@ exit:
kfree_skb(buf);
}
+/* tipc_link_build_proto_msg: prepare link protocol message for transmission
+ */
+static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
+ u16 rcvgap, int tolerance, int priority,
+ struct sk_buff_head *xmitq)
+{
+ struct sk_buff *skb = NULL;
+ struct tipc_msg *hdr = l->pmsg;
+ u16 snd_nxt = l->snd_nxt;
+ u16 rcv_nxt = l->rcv_nxt;
+ u16 rcv_last = rcv_nxt - 1;
+ int node_up = l->owner->bclink.recv_permitted;
+
+ /* Don't send protocol message during reset or link failover */
+ if (l->exec_mode == TIPC_LINK_BLOCKED)
+ return;
+
+ /* Abort non-RESET send if communication with node is prohibited */
+ if ((tipc_node_blocked(l->owner)) && (mtyp != RESET_MSG))
+ return;
+
+ msg_set_type(hdr, mtyp);
+ msg_set_net_plane(hdr, l->net_plane);
+ msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
+ msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net));
+ msg_set_link_tolerance(hdr, tolerance);
+ msg_set_linkprio(hdr, priority);
+ msg_set_redundant_link(hdr, node_up);
+ msg_set_seq_gap(hdr, 0);
+
+ /* Compatibility: created msg must not be in sequence with pkt flow */
+ msg_set_seqno(hdr, snd_nxt + U16_MAX / 2);
+
+ if (mtyp == STATE_MSG) {
+ if (!tipc_link_is_up(l))
+ return;
+ msg_set_next_sent(hdr, snd_nxt);
+
+ /* Override rcvgap if there are packets in deferred queue */
+ if (!skb_queue_empty(&l->deferdq))
+ rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt;
+ if (rcvgap) {
+ msg_set_seq_gap(hdr, rcvgap);
+ l->stats.sent_nacks++;
+ }
+ msg_set_ack(hdr, rcv_last);
+ msg_set_probe(hdr, probe);
+ if (probe)
+ l->stats.sent_probes++;
+ l->stats.sent_states++;
+ } else {
+ /* RESET_MSG or ACTIVATE_MSG */
+ msg_set_max_pkt(hdr, l->advertised_mtu);
+ msg_set_ack(hdr, l->failover_checkpt - 1);
+ msg_set_next_sent(hdr, 1);
+ }
+ skb = tipc_buf_acquire(msg_size(hdr));
+ if (!skb)
+ return;
+ skb_copy_to_linear_data(skb, hdr, msg_size(hdr));
+ skb->priority = TC_PRIO_CONTROL;
+ __skb_queue_head(xmitq, skb);
+}
/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
* a different bearer. Owner node is locked.
--
1.9.1
^ permalink raw reply related [flat|nested] 15+ messages in thread
* [PATCH net-next 09/13] tipc: improve link FSM implementation
2015-07-16 20:54 [PATCH net-next 00/13] tipc: separate link and link aggregation layer Jon Maloy
` (7 preceding siblings ...)
2015-07-16 20:54 ` [PATCH net-next 08/13] tipc: introduce new link protocol msg create function Jon Maloy
@ 2015-07-16 20:54 ` Jon Maloy
2015-07-16 20:54 ` [PATCH net-next 10/13] tipc: simplify link timer implementation Jon Maloy
` (4 subsequent siblings)
13 siblings, 0 replies; 15+ messages in thread
From: Jon Maloy @ 2015-07-16 20:54 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion, Jon Maloy
The link FSM implementation is currently unnecessarily complex.
It sometimes checks conditions outside the FSM data before deciding
the next state, and often performs actions directly inside the FSM
logic.
In this commit, we create a second, simpler FSM implementation that,
as far as possible, acts only on the states and events it is strictly
defined for, and postpones any actions until it has finished making
its decisions. It also returns an event flag field and a buffer queue
which may contain a protocol message to be sent by the caller.
Unfortunately, we cannot yet make the FSM "clean", in the sense
that its decisions are only based on FSM state and event, and that
state changes happen only here. That will have to wait until the
activate/reset logic has been cleaned up in a future commit.
We also rename the link states as follows:
WORKING_WORKING -> TIPC_LINK_WORKING
WORKING_UNKNOWN -> TIPC_LINK_PROBING
RESET_UNKNOWN -> TIPC_LINK_RESETTING
RESET_RESET -> TIPC_LINK_ESTABLISHING
The existing FSM function, link_state_event(), is still needed for
a while, so we redesign it to make use of the new function.
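As a rough sketch of the resulting calling convention, where the FSM
only decides and the caller performs whatever actions the returned
flags ask for, consider the simplified, self-contained model below.
The state, event and flag names are stand-ins chosen for the example,
and the transition table is deliberately reduced.

#include <stdio.h>

enum { LINK_WORKING, LINK_PROBING, LINK_RESETTING, LINK_ESTABLISHING };
enum { EVT_TRAFFIC, EVT_SILENCE, EVT_PEER_RESET, EVT_ACTIVATE };

#define EVT_LINK_UP   (1 << 0)
#define EVT_LINK_DOWN (1 << 1)

struct model_link {
        int state;
        int silent_cnt;
        int abort_limit;
};

/* The FSM only decides: it updates the state and returns event flags */
static int link_fsm_evt(struct model_link *l, int evt)
{
        int rc = 0;

        switch (l->state) {
        case LINK_WORKING:
                if (evt == EVT_SILENCE)
                        l->state = LINK_PROBING;
                break;
        case LINK_PROBING:
                if (evt == EVT_TRAFFIC || evt == EVT_ACTIVATE) {
                        l->state = LINK_WORKING;
                } else if (evt == EVT_SILENCE &&
                           l->silent_cnt > l->abort_limit) {
                        l->state = LINK_RESETTING;
                        rc |= EVT_LINK_DOWN;
                }
                break;
        case LINK_RESETTING:
        case LINK_ESTABLISHING:
                if (evt == EVT_ACTIVATE) {
                        l->state = LINK_WORKING;
                        rc |= EVT_LINK_UP;
                }
                break;
        }
        return rc;
}

int main(void)
{
        struct model_link l = { .state = LINK_RESETTING };
        int rc = link_fsm_evt(&l, EVT_ACTIVATE);

        /* The caller, not the FSM, performs the actions the flags ask for */
        if (rc & EVT_LINK_UP)
                printf("activate link\n");
        if (rc & EVT_LINK_DOWN)
                printf("reset link\n");
        return 0;
}

The real implementation additionally returns any protocol message via
the caller-provided xmitq, which the caller then hands to the bearer.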
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/link.c | 344 +++++++++++++++++++++++++++++++-------------------------
net/tipc/link.h | 7 ++
2 files changed, 195 insertions(+), 156 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 657ba91..5d2f919 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -88,10 +88,10 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
/* Link FSM states and events:
*/
enum {
- WORKING_WORKING,
- WORKING_UNKNOWN,
- RESET_RESET,
- RESET_UNKNOWN
+ TIPC_LINK_WORKING,
+ TIPC_LINK_PROBING,
+ TIPC_LINK_RESETTING,
+ TIPC_LINK_ESTABLISHING
};
enum {
@@ -103,24 +103,24 @@ enum {
/* Link FSM state checking routines
*/
-static int link_working_working(struct tipc_link *l)
+static int link_working(struct tipc_link *l)
{
- return l->state == WORKING_WORKING;
+ return l->state == TIPC_LINK_WORKING;
}
-static int link_working_unknown(struct tipc_link *l)
+static int link_probing(struct tipc_link *l)
{
- return l->state == WORKING_UNKNOWN;
+ return l->state == TIPC_LINK_PROBING;
}
-static int link_reset_unknown(struct tipc_link *l)
+static int link_resetting(struct tipc_link *l)
{
- return l->state == RESET_UNKNOWN;
+ return l->state == TIPC_LINK_RESETTING;
}
-static int link_reset_reset(struct tipc_link *l)
+static int link_establishing(struct tipc_link *l)
{
- return l->state == RESET_RESET;
+ return l->state == TIPC_LINK_ESTABLISHING;
}
static void link_handle_out_of_seq_msg(struct tipc_link *link,
@@ -140,6 +140,8 @@ static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);
static void link_set_timer(struct tipc_link *link, unsigned long time);
+static void link_activate(struct tipc_link *link);
+
/*
* Simple link routines
*/
@@ -179,7 +181,7 @@ int tipc_link_is_up(struct tipc_link *l_ptr)
{
if (!l_ptr)
return 0;
- return link_working_working(l_ptr) || link_working_unknown(l_ptr);
+ return link_working(l_ptr) || link_probing(l_ptr);
}
int tipc_link_is_active(struct tipc_link *l)
@@ -234,8 +236,11 @@ static void link_timeout(unsigned long data)
}
/* do all other link processing performed on a periodic basis */
- if (l_ptr->silent_intv_cnt || tipc_bclink_acks_missing(l_ptr->owner))
+ if (l_ptr->silent_intv_cnt)
link_state_event(l_ptr, SILENCE_EVT);
+ else if (link_working(l_ptr) && tipc_bclink_acks_missing(l_ptr->owner))
+ tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
+
l_ptr->silent_intv_cnt++;
if (skb_queue_len(&l_ptr->backlogq))
tipc_link_push_packets(l_ptr);
@@ -304,7 +309,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
l_ptr->peer_session = WILDCARD_SESSION;
l_ptr->bearer_id = b_ptr->identity;
link_set_supervision_props(l_ptr, b_ptr->tolerance);
- l_ptr->state = RESET_UNKNOWN;
+ l_ptr->state = TIPC_LINK_RESETTING;
l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
msg = l_ptr->pmsg;
@@ -367,6 +372,134 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
}
/**
+ * tipc_link_fsm_evt - link finite state machine
+ * @l: pointer to link
+ * @evt: state machine event to be processed
+ * @xmitq: queue to prepend created protocol message, if any
+ */
+static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
+ struct sk_buff_head *xmitq)
+{
+ int mtyp = 0, rc = 0;
+ struct tipc_link *pl;
+ enum {
+ LINK_RESET = 1,
+ LINK_ACTIVATE = (1 << 1),
+ SND_PROBE = (1 << 2),
+ SND_STATE = (1 << 3),
+ SND_RESET = (1 << 4),
+ SND_ACTIVATE = (1 << 5)
+ } actions = 0;
+
+ if (l->exec_mode == TIPC_LINK_BLOCKED)
+ return rc;
+
+ switch (l->state) {
+ case TIPC_LINK_WORKING:
+ switch (evt) {
+ case TRAFFIC_EVT:
+ case ACTIVATE_EVT:
+ break;
+ case SILENCE_EVT:
+ l->state = TIPC_LINK_PROBING;
+ actions |= SND_PROBE;
+ break;
+ case PEER_RESET_EVT:
+ actions |= LINK_RESET | SND_ACTIVATE;
+ break;
+ default:
+ pr_debug("%s%u WORKING\n", link_unk_evt, evt);
+ }
+ break;
+ case TIPC_LINK_PROBING:
+ switch (evt) {
+ case TRAFFIC_EVT:
+ case ACTIVATE_EVT:
+ l->state = TIPC_LINK_WORKING;
+ break;
+ case PEER_RESET_EVT:
+ actions |= LINK_RESET | SND_ACTIVATE;
+ break;
+ case SILENCE_EVT:
+ if (l->silent_intv_cnt <= l->abort_limit) {
+ actions |= SND_PROBE;
+ break;
+ }
+ actions |= LINK_RESET | SND_RESET;
+ break;
+ default:
+ pr_err("%s%u PROBING\n", link_unk_evt, evt);
+ }
+ break;
+ case TIPC_LINK_RESETTING:
+ switch (evt) {
+ case TRAFFIC_EVT:
+ break;
+ case ACTIVATE_EVT:
+ pl = node_active_link(l->owner, 0);
+ if (pl && link_probing(pl))
+ break;
+ actions |= LINK_ACTIVATE;
+ if (l->owner->working_links == 1)
+ tipc_link_sync_xmit(l);
+ break;
+ case PEER_RESET_EVT:
+ l->state = TIPC_LINK_ESTABLISHING;
+ actions |= SND_ACTIVATE;
+ break;
+ case SILENCE_EVT:
+ actions |= SND_RESET;
+ break;
+ default:
+ pr_err("%s%u in RESETTING\n", link_unk_evt, evt);
+ }
+ break;
+ case TIPC_LINK_ESTABLISHING:
+ switch (evt) {
+ case TRAFFIC_EVT:
+ case ACTIVATE_EVT:
+ pl = node_active_link(l->owner, 0);
+ if (pl && link_probing(pl))
+ break;
+ actions |= LINK_ACTIVATE;
+ if (l->owner->working_links == 1)
+ tipc_link_sync_xmit(l);
+ break;
+ case PEER_RESET_EVT:
+ break;
+ case SILENCE_EVT:
+ actions |= SND_ACTIVATE;
+ break;
+ default:
+ pr_err("%s%u ESTABLISHING\n", link_unk_evt, evt);
+ }
+ break;
+ default:
+ pr_err("Unknown link state %u/%u\n", l->state, evt);
+ }
+
+ /* Perform actions as decided by FSM */
+ if (actions & LINK_RESET) {
+ l->exec_mode = TIPC_LINK_BLOCKED;
+ rc |= TIPC_LINK_DOWN_EVT;
+ }
+ if (actions & LINK_ACTIVATE) {
+ l->exec_mode = TIPC_LINK_OPEN;
+ rc |= TIPC_LINK_UP_EVT;
+ }
+ if (actions & (SND_STATE | SND_PROBE))
+ mtyp = STATE_MSG;
+ if (actions & SND_RESET)
+ mtyp = RESET_MSG;
+ if (actions & SND_ACTIVATE)
+ mtyp = ACTIVATE_MSG;
+ if (actions & (SND_PROBE | SND_STATE | SND_RESET | SND_ACTIVATE))
+ tipc_link_build_proto_msg(l, mtyp, actions & SND_PROBE,
+ 0, 0, 0, xmitq);
+ return rc;
+}
+
+/**
* link_schedule_user - schedule a message sender for wakeup after congestion
* @link: congested link
* @list: message that was attempted sent
@@ -474,9 +607,10 @@ void tipc_link_reset(struct tipc_link *l_ptr)
/* Prepare for renewed mtu size negotiation */
l_ptr->mtu = l_ptr->advertised_mtu;
- l_ptr->state = RESET_UNKNOWN;
+ l_ptr->state = TIPC_LINK_RESETTING;
- if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
+ if ((prev_state == TIPC_LINK_RESETTING) ||
+ (prev_state == TIPC_LINK_ESTABLISHING))
return;
tipc_node_link_down(l_ptr->owner, l_ptr->bearer_id);
@@ -515,6 +649,8 @@ static void link_activate(struct tipc_link *link)
link->rcv_nxt = 1;
link->stats.recv_info = 1;
link->silent_intv_cnt = 0;
+ link->state = TIPC_LINK_WORKING;
+ link->exec_mode = TIPC_LINK_OPEN;
tipc_node_link_up(node, link->bearer_id);
tipc_bearer_add_dest(node->net, link->bearer_id, link->addr);
}
@@ -524,132 +660,29 @@ static void link_activate(struct tipc_link *link)
* @l_ptr: pointer to link
* @event: state machine event to process
*/
-static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
+static void link_state_event(struct tipc_link *l, unsigned int evt)
{
- struct tipc_link *other;
+ int rc;
+ struct sk_buff_head xmitq;
+ struct sk_buff *skb;
- if (l_ptr->exec_mode == TIPC_LINK_BLOCKED)
+ if (l->exec_mode == TIPC_LINK_BLOCKED)
return;
- switch (l_ptr->state) {
- case WORKING_WORKING:
- switch (event) {
- case TRAFFIC_EVT:
- case ACTIVATE_MSG:
- l_ptr->silent_intv_cnt = 0;
- break;
- case SILENCE_EVT:
- if (!l_ptr->silent_intv_cnt) {
- if (tipc_bclink_acks_missing(l_ptr->owner))
- tipc_link_proto_xmit(l_ptr, STATE_MSG,
- 0, 0, 0, 0);
- break;
- }
- l_ptr->state = WORKING_UNKNOWN;
- tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);
- break;
- case RESET_MSG:
- pr_debug("%s<%s>, requested by peer\n",
- link_rst_msg, l_ptr->name);
- tipc_link_reset(l_ptr);
- l_ptr->state = RESET_RESET;
- tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
- 0, 0, 0, 0);
- break;
- default:
- pr_debug("%s%u in WW state\n", link_unk_evt, event);
- }
- break;
- case WORKING_UNKNOWN:
- switch (event) {
- case TRAFFIC_EVT:
- case ACTIVATE_MSG:
- l_ptr->state = WORKING_WORKING;
- l_ptr->silent_intv_cnt = 0;
- break;
- case RESET_MSG:
- pr_debug("%s<%s>, requested by peer while probing\n",
- link_rst_msg, l_ptr->name);
- tipc_link_reset(l_ptr);
- l_ptr->state = RESET_RESET;
- tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
- 0, 0, 0, 0);
- break;
- case SILENCE_EVT:
- if (!l_ptr->silent_intv_cnt) {
- l_ptr->state = WORKING_WORKING;
- if (tipc_bclink_acks_missing(l_ptr->owner))
- tipc_link_proto_xmit(l_ptr, STATE_MSG,
- 0, 0, 0, 0);
- } else if (l_ptr->silent_intv_cnt <
- l_ptr->abort_limit) {
- tipc_link_proto_xmit(l_ptr, STATE_MSG,
- 1, 0, 0, 0);
- } else { /* Link has failed */
- pr_debug("%s<%s>, peer not responding\n",
- link_rst_msg, l_ptr->name);
- tipc_link_reset(l_ptr);
- l_ptr->state = RESET_UNKNOWN;
- tipc_link_proto_xmit(l_ptr, RESET_MSG,
- 0, 0, 0, 0);
- }
- break;
- default:
- pr_err("%s%u in WU state\n", link_unk_evt, event);
- }
- break;
- case RESET_UNKNOWN:
- switch (event) {
- case TRAFFIC_EVT:
- break;
- case ACTIVATE_MSG:
- other = node_active_link(l_ptr->owner, 0);
- if (other && link_working_unknown(other))
- break;
- l_ptr->state = WORKING_WORKING;
- link_activate(l_ptr);
- tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);
- if (l_ptr->owner->working_links == 1)
- tipc_link_sync_xmit(l_ptr);
- break;
- case RESET_MSG:
- l_ptr->state = RESET_RESET;
- tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
- 1, 0, 0, 0);
- break;
- case SILENCE_EVT:
- tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0);
- break;
- default:
- pr_err("%s%u in RU state\n", link_unk_evt, event);
- }
- break;
- case RESET_RESET:
- switch (event) {
- case TRAFFIC_EVT:
- case ACTIVATE_MSG:
- other = node_active_link(l_ptr->owner, 0);
- if (other && link_working_unknown(other))
- break;
- l_ptr->state = WORKING_WORKING;
- link_activate(l_ptr);
- tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);
- if (l_ptr->owner->working_links == 1)
- tipc_link_sync_xmit(l_ptr);
- break;
- case RESET_MSG:
- break;
- case SILENCE_EVT:
- tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
- 0, 0, 0, 0);
- break;
- default:
- pr_err("%s%u in RR state\n", link_unk_evt, event);
- }
- break;
- default:
- pr_err("Unknown link state %u/%u\n", l_ptr->state, event);
- }
+ __skb_queue_head_init(&xmitq);
+
+ rc = tipc_link_fsm_evt(l, evt, &xmitq);
+
+ if (rc & TIPC_LINK_UP_EVT)
+ link_activate(l);
+
+ if (rc & TIPC_LINK_DOWN_EVT)
+ tipc_link_reset(l);
+
+ skb = __skb_dequeue(&xmitq);
+ if (!skb)
+ return;
+ tipc_bearer_send(l->owner->net, l->bearer_id, skb, &l->media_addr);
}
/**
@@ -1102,7 +1135,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
link_prepare_wakeup(l_ptr);
/* Process the incoming packet */
- if (unlikely(!link_working_working(l_ptr))) {
+ if (unlikely(!link_working(l_ptr))) {
if (msg_user(msg) == LINK_PROTOCOL) {
tipc_link_proto_rcv(l_ptr, skb);
link_retrieve_defq(l_ptr, &head);
@@ -1113,7 +1146,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
/* Traffic message. Conditionally activate link */
link_state_event(l_ptr, TRAFFIC_EVT);
- if (link_working_working(l_ptr)) {
+ if (link_working(l_ptr)) {
/* Re-insert buffer in front of queue */
__skb_queue_head(&head, skb);
skb = NULL;
@@ -1122,7 +1155,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
goto unlock;
}
- /* Link is now in state WORKING_WORKING */
+ /* Link is now in state TIPC_LINK_WORKING */
if (unlikely(seq_no != l_ptr->rcv_nxt)) {
link_handle_out_of_seq_msg(l_ptr, skb);
link_retrieve_defq(l_ptr, &head);
@@ -1365,16 +1398,15 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
switch (msg_type(msg)) {
case RESET_MSG:
- if (!link_working_unknown(l_ptr) &&
+ if (!link_probing(l_ptr) &&
(l_ptr->peer_session != WILDCARD_SESSION)) {
if (less_eq(msg_session(msg), l_ptr->peer_session))
break; /* duplicate or old reset: ignore */
}
- if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
- link_working_unknown(l_ptr))) {
- /*
- * peer has lost contact -- don't allow peer's links
+ if (!msg_redundant_link(msg) && (link_working(l_ptr) ||
+ link_probing(l_ptr))) {
+ /* peer has lost contact -- don't allow peer's links
* to reactivate before we recognize loss & clean up
*/
l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN;
@@ -1432,7 +1464,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
link_state_event(l_ptr, TRAFFIC_EVT);
l_ptr->stats.recv_states++;
- if (link_reset_unknown(l_ptr))
+ if (link_resetting(l_ptr))
break;
if (less_eq(l_ptr->rcv_nxt, msg_next_sent(msg)))
@@ -1822,14 +1854,14 @@ static void link_print(struct tipc_link *l_ptr, const char *str)
pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
rcu_read_unlock();
- if (link_working_unknown(l_ptr))
- pr_cont(":WU\n");
- else if (link_reset_reset(l_ptr))
- pr_cont(":RR\n");
- else if (link_reset_unknown(l_ptr))
- pr_cont(":RU\n");
- else if (link_working_working(l_ptr))
- pr_cont(":WW\n");
+ if (link_probing(l_ptr))
+ pr_cont(":P\n");
+ else if (link_establishing(l_ptr))
+ pr_cont(":E\n");
+ else if (link_resetting(l_ptr))
+ pr_cont(":R\n");
+ else if (link_working(l_ptr))
+ pr_cont(":W\n");
else
pr_cont("\n");
}
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 0509c6d..ef68424 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -58,6 +58,13 @@ enum {
TIPC_LINK_TUNNEL
};
+/* Events occurring at packet reception or at timeout
+ */
+enum {
+ TIPC_LINK_UP_EVT = 1,
+ TIPC_LINK_DOWN_EVT = (1 << 1)
+};
+
/* Starting value for maximum packet size negotiation on unicast links
* (unless bearer MTU is less)
*/
--
1.9.1
^ permalink raw reply related [flat|nested] 15+ messages in thread
* [PATCH net-next 10/13] tipc: simplify link timer implementation
2015-07-16 20:54 [PATCH net-next 00/13] tipc: separate link and link aggregation layer Jon Maloy
` (8 preceding siblings ...)
2015-07-16 20:54 ` [PATCH net-next 09/13] tipc: improve link FSM implementation Jon Maloy
@ 2015-07-16 20:54 ` Jon Maloy
2015-07-16 20:54 ` [PATCH net-next 11/13] tipc: move link supervision timer to node level Jon Maloy
` (3 subsequent siblings)
13 siblings, 0 replies; 15+ messages in thread
From: Jon Maloy @ 2015-07-16 20:54 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion, Jon Maloy
We create a second, simpler link timer function, tipc_link_timeout().
The new function makes use of the new FSM function introduced in the
previous commit, and just like it, takes a buffer queue as a parameter.
It returns an event bit field, and may hand a link protocol packet
back to the caller via that queue.
The existing timer function, link_timeout(), is still needed for a
while, so we redesign it to become a wrapper around the new function.
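The resulting wrapper pattern, a pure function that fills a queue and
returns event flags plus a timer callback that merely acts on them,
might look roughly like the simplified model below. All names and
types are invented stand-ins, not the kernel code.

#include <stdio.h>

#define EVT_LINK_DOWN (1 << 1)

/* Stand-ins for the real link and the packet it may want to send */
struct model_link { int silent_cnt; int abort_limit; };
struct model_pkt  { int valid; int type; };

/* Model of the new timeout function: computes, fills xmitq, returns flags */
static int link_timeout(struct model_link *l, struct model_pkt *xmitq)
{
        int rc = 0;

        if (l->silent_cnt > l->abort_limit) {
                rc |= EVT_LINK_DOWN;
        } else if (l->silent_cnt) {
                xmitq->valid = 1;
                xmitq->type = 1;        /* say, a probe/state message */
        }
        l->silent_cnt++;
        return rc;
}

/* Model of the old callback, reduced to a wrapper around the new function */
static void timer_cb(struct model_link *l)
{
        struct model_pkt xmitq = { 0 };
        int rc = link_timeout(l, &xmitq);

        if (rc & EVT_LINK_DOWN)
                printf("reset link\n");
        if (xmitq.valid)
                printf("send proto msg type %d\n", xmitq.type);
}

int main(void)
{
        struct model_link l = { .silent_cnt = 1, .abort_limit = 3 };

        timer_cb(&l);
        return 0;
}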
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/link.c | 116 ++++++++++++++++++++++++++++++++++----------------------
net/tipc/link.h | 1 +
2 files changed, 72 insertions(+), 45 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 5d2f919..f58bb43 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -193,60 +193,30 @@ int tipc_link_is_active(struct tipc_link *l)
/**
* link_timeout - handle expiration of link timer
- * @l_ptr: pointer to link
*/
static void link_timeout(unsigned long data)
{
- struct tipc_link *l_ptr = (struct tipc_link *)data;
+ struct tipc_link *l = (struct tipc_link *)data;
+ struct sk_buff_head xmitq;
struct sk_buff *skb;
+ int rc;
- tipc_node_lock(l_ptr->owner);
+ __skb_queue_head_init(&xmitq);
- /* update counters used in statistical profiling of send traffic */
- l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq);
- l_ptr->stats.queue_sz_counts++;
+ tipc_node_lock(l->owner);
- skb = skb_peek(&l_ptr->transmq);
- if (skb) {
- struct tipc_msg *msg = buf_msg(skb);
- u32 length = msg_size(msg);
-
- if ((msg_user(msg) == MSG_FRAGMENTER) &&
- (msg_type(msg) == FIRST_FRAGMENT)) {
- length = msg_size(msg_get_wrapped(msg));
- }
- if (length) {
- l_ptr->stats.msg_lengths_total += length;
- l_ptr->stats.msg_length_counts++;
- if (length <= 64)
- l_ptr->stats.msg_length_profile[0]++;
- else if (length <= 256)
- l_ptr->stats.msg_length_profile[1]++;
- else if (length <= 1024)
- l_ptr->stats.msg_length_profile[2]++;
- else if (length <= 4096)
- l_ptr->stats.msg_length_profile[3]++;
- else if (length <= 16384)
- l_ptr->stats.msg_length_profile[4]++;
- else if (length <= 32768)
- l_ptr->stats.msg_length_profile[5]++;
- else
- l_ptr->stats.msg_length_profile[6]++;
- }
- }
+ rc = tipc_link_timeout(l, &xmitq);
- /* do all other link processing performed on a periodic basis */
- if (l_ptr->silent_intv_cnt)
- link_state_event(l_ptr, SILENCE_EVT);
- else if (link_working(l_ptr) && tipc_bclink_acks_missing(l_ptr->owner))
- tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
+ if (rc & TIPC_LINK_DOWN_EVT)
+ tipc_link_reset(l);
- l_ptr->silent_intv_cnt++;
- if (skb_queue_len(&l_ptr->backlogq))
- tipc_link_push_packets(l_ptr);
- link_set_timer(l_ptr, l_ptr->keepalive_intv);
- tipc_node_unlock(l_ptr->owner);
- tipc_link_put(l_ptr);
+ skb = __skb_dequeue(&xmitq);
+ if (skb)
+ tipc_bearer_send(l->owner->net, l->bearer_id,
+ skb, &l->media_addr);
+ link_set_timer(l, l->keepalive_intv);
+ tipc_node_unlock(l->owner);
+ tipc_link_put(l);
}
static void link_set_timer(struct tipc_link *link, unsigned long time)
@@ -499,6 +469,62 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
return rc;
}
+/* link_profile_stats - update statistical profiling of traffic
+ */
+static void link_profile_stats(struct tipc_link *l)
+{
+ struct sk_buff *skb;
+ struct tipc_msg *msg;
+ int length;
+
+ /* Update counters used in statistical profiling of send traffic */
+ l->stats.accu_queue_sz += skb_queue_len(&l->transmq);
+ l->stats.queue_sz_counts++;
+
+ skb = skb_peek(&l->transmq);
+ if (!skb)
+ return;
+ msg = buf_msg(skb);
+ length = msg_size(msg);
+
+ if (msg_user(msg) == MSG_FRAGMENTER) {
+ if (msg_type(msg) != FIRST_FRAGMENT)
+ return;
+ length = msg_size(msg_get_wrapped(msg));
+ }
+ l->stats.msg_lengths_total += length;
+ l->stats.msg_length_counts++;
+ if (length <= 64)
+ l->stats.msg_length_profile[0]++;
+ else if (length <= 256)
+ l->stats.msg_length_profile[1]++;
+ else if (length <= 1024)
+ l->stats.msg_length_profile[2]++;
+ else if (length <= 4096)
+ l->stats.msg_length_profile[3]++;
+ else if (length <= 16384)
+ l->stats.msg_length_profile[4]++;
+ else if (length <= 32768)
+ l->stats.msg_length_profile[5]++;
+ else
+ l->stats.msg_length_profile[6]++;
+}
+
+/* tipc_link_timeout - perform periodic task as instructed from node timeout
+ */
+int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
+{
+ int rc = 0;
+
+ link_profile_stats(l);
+ if (l->silent_intv_cnt)
+ rc = tipc_link_fsm_evt(l, SILENCE_EVT, xmitq);
+ else if (link_working(l) && tipc_bclink_acks_missing(l->owner))
+ tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
+ l->silent_intv_cnt++;
+ return rc;
+}
+
/**
* link_schedule_user - schedule a message sender for wakeup after congestion
* @link: congested link
diff --git a/net/tipc/link.h b/net/tipc/link.h
index ef68424..98507b0 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -245,6 +245,7 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info);
int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info);
int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
void link_prepare_wakeup(struct tipc_link *l);
+int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
static inline u32 link_own_addr(struct tipc_link *l)
{
--
1.9.1
^ permalink raw reply related [flat|nested] 15+ messages in thread
* [PATCH net-next 11/13] tipc: move link supervision timer to node level
2015-07-16 20:54 [PATCH net-next 00/13] tipc: separate link and link aggregation layer Jon Maloy
` (9 preceding siblings ...)
2015-07-16 20:54 ` [PATCH net-next 10/13] tipc: simplify link timer implementation Jon Maloy
@ 2015-07-16 20:54 ` Jon Maloy
2015-07-16 20:54 ` [PATCH net-next 12/13] tipc: introduce node contact FSM Jon Maloy
` (2 subsequent siblings)
13 siblings, 0 replies; 15+ messages in thread
From: Jon Maloy @ 2015-07-16 20:54 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion, Jon Maloy
In our effort to move control of the links to the link aggregation
layer, we move the periodic link supervision timer to struct tipc_node.
The new timer is shared between all links belonging to the node, thus
saving resources, while still driving the FSM of each of the node's
links at every expiration.
The current link timer and corresponding functions are removed.
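A rough, self-contained model of the shared node timer may help
illustrate the idea: one timer per node, its interval derived from the
link with the lowest tolerance, and every expiration ticking each link
the node currently has. The types and helpers below are invented for
the sketch and only approximate the real code.

#include <stdio.h>

#define MAX_BEARERS 3

/* Stand-in types; the real code works on struct tipc_node/tipc_link */
struct model_link { unsigned long tolerance; int silent_cnt; };
struct model_node {
        struct model_link *links[MAX_BEARERS];
        unsigned long keepalive_intv;   /* in ms for this model */
};

/* The link with the lowest tolerance determines the node timer interval */
static void node_calculate_timer(struct model_node *n, struct model_link *l)
{
        unsigned long intv = (l->tolerance / 4 > 500) ? 500 : l->tolerance / 4;

        if (intv < n->keepalive_intv)
                n->keepalive_intv = intv;
}

/* One expiration of the shared node timer kicks every existing link */
static void node_timeout(struct model_node *n)
{
        int i;

        for (i = 0; i < MAX_BEARERS; i++) {
                struct model_link *l = n->links[i];

                if (!l)
                        continue;
                node_calculate_timer(n, l);
                l->silent_cnt++;        /* stands in for the link timeout work */
        }
        printf("re-arm node timer in %lu ms\n", n->keepalive_intv);
}

int main(void)
{
        struct model_link a = { .tolerance = 1500 };
        struct model_link b = { .tolerance = 4000 };
        struct model_node n = { .links = { &a, &b, NULL },
                                .keepalive_intv = (unsigned long)-1 };

        node_timeout(&n);
        return 0;
}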
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/link.c | 78 +++------------------------------------------------------
net/tipc/link.h | 2 --
net/tipc/node.c | 66 +++++++++++++++++++++++++++++++++++++++++++++---
net/tipc/node.h | 2 ++
4 files changed, 68 insertions(+), 80 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index f58bb43..5b4609b 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -127,7 +127,6 @@ static void link_handle_out_of_seq_msg(struct tipc_link *link,
struct sk_buff *skb);
static void tipc_link_proto_rcv(struct tipc_link *link,
struct sk_buff *skb);
-static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol);
static void link_state_event(struct tipc_link *l_ptr, u32 event);
static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
u16 rcvgap, int tolerance, int priority,
@@ -139,7 +138,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);
-static void link_set_timer(struct tipc_link *link, unsigned long time);
static void link_activate(struct tipc_link *link);
/*
@@ -150,21 +148,6 @@ static unsigned int align(unsigned int i)
return (i + 3) & ~3u;
}
-static void tipc_link_release(struct kref *kref)
-{
- kfree(container_of(kref, struct tipc_link, ref));
-}
-
-static void tipc_link_get(struct tipc_link *l_ptr)
-{
- kref_get(&l_ptr->ref);
-}
-
-static void tipc_link_put(struct tipc_link *l_ptr)
-{
- kref_put(&l_ptr->ref, tipc_link_release);
-}
-
static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
{
struct tipc_node *n = l->owner;
@@ -192,40 +175,6 @@ int tipc_link_is_active(struct tipc_link *l)
}
/**
- * link_timeout - handle expiration of link timer
- */
-static void link_timeout(unsigned long data)
-{
- struct tipc_link *l = (struct tipc_link *)data;
- struct sk_buff_head xmitq;
- struct sk_buff *skb;
- int rc;
-
- __skb_queue_head_init(&xmitq);
-
- tipc_node_lock(l->owner);
-
- rc = tipc_link_timeout(l, &xmitq);
-
- if (rc & TIPC_LINK_DOWN_EVT)
- tipc_link_reset(l);
-
- skb = __skb_dequeue(&xmitq);
- if (skb)
- tipc_bearer_send(l->owner->net, l->bearer_id,
- skb, &l->media_addr);
- link_set_timer(l, l->keepalive_intv);
- tipc_node_unlock(l->owner);
- tipc_link_put(l);
-}
-
-static void link_set_timer(struct tipc_link *link, unsigned long time)
-{
- if (!mod_timer(&link->timer, jiffies + time))
- tipc_link_get(link);
-}
-
-/**
* tipc_link_create - create a new link
* @n_ptr: pointer to associated node
* @b_ptr: pointer to associated bearer
@@ -265,7 +214,6 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
pr_warn("Link creation failed, no memory\n");
return NULL;
}
- kref_init(&l_ptr->ref);
l_ptr->addr = peer;
if_name = strchr(b_ptr->name, ':') + 1;
sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
@@ -278,7 +226,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
l_ptr->owner = n_ptr;
l_ptr->peer_session = WILDCARD_SESSION;
l_ptr->bearer_id = b_ptr->identity;
- link_set_supervision_props(l_ptr, b_ptr->tolerance);
+ l_ptr->tolerance = b_ptr->tolerance;
l_ptr->state = TIPC_LINK_RESETTING;
l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
@@ -304,8 +252,6 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
skb_queue_head_init(l_ptr->inputq);
link_reset_statistics(l_ptr);
tipc_node_attach_link(n_ptr, l_ptr);
- setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr);
- link_set_timer(l_ptr, l_ptr->keepalive_intv);
return l_ptr;
}
@@ -316,12 +262,8 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
void tipc_link_delete(struct tipc_link *l)
{
tipc_link_reset(l);
- if (del_timer(&l->timer))
- tipc_link_put(l);
- /* Delete link now, or when timer is finished: */
tipc_link_reset_fragments(l);
tipc_node_detach_link(l->owner, l);
- tipc_link_put(l);
}
void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
@@ -1447,7 +1389,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
msg_tol = msg_link_tolerance(msg);
if (msg_tol > l_ptr->tolerance)
- link_set_supervision_props(l_ptr, msg_tol);
+ l_ptr->tolerance = msg_tol;
if (msg_linkprio(msg) > l_ptr->priority)
l_ptr->priority = msg_linkprio(msg);
@@ -1473,7 +1415,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
msg_tol = msg_link_tolerance(msg);
if (msg_tol)
- link_set_supervision_props(l_ptr, msg_tol);
+ l_ptr->tolerance = msg_tol;
if (msg_linkprio(msg) &&
(msg_linkprio(msg) != l_ptr->priority)) {
@@ -1796,18 +1738,6 @@ exit:
return *skb;
}
-static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
-{
- unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
-
- if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
- return;
-
- l_ptr->tolerance = tol;
- l_ptr->keepalive_intv = msecs_to_jiffies(intv);
- l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->keepalive_intv));
-}
-
void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
{
int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
@@ -1984,7 +1914,7 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
u32 tol;
tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
- link_set_supervision_props(link, tol);
+ link->tolerance = tol;
tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0);
}
if (props[TIPC_NLA_PROP_PRIO]) {
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 98507b0..0cf7d2b 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -146,9 +146,7 @@ struct tipc_link {
u32 addr;
char name[TIPC_MAX_LINK_NAME];
struct tipc_media_addr media_addr;
- struct timer_list timer;
struct tipc_node *owner;
- struct kref ref;
/* Management and link supervision data */
u32 peer_session;
diff --git a/net/tipc/node.c b/net/tipc/node.c
index b7a4457..77effb2 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -44,6 +44,7 @@
static void node_lost_contact(struct tipc_node *n_ptr);
static void node_established_contact(struct tipc_node *n_ptr);
static void tipc_node_delete(struct tipc_node *node);
+static void tipc_node_timeout(unsigned long data);
struct tipc_sock_conn {
u32 port;
@@ -145,11 +146,27 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
n_ptr->active_links[0] = INVALID_BEARER_ID;
n_ptr->active_links[1] = INVALID_BEARER_ID;
tipc_node_get(n_ptr);
+ setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr);
+ n_ptr->keepalive_intv = U32_MAX;
exit:
spin_unlock_bh(&tn->node_list_lock);
return n_ptr;
}
+static void tipc_node_calculate_timer(struct tipc_node *n, struct tipc_link *l)
+{
+ unsigned long tol = l->tolerance;
+ unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
+ unsigned long keepalive_intv = msecs_to_jiffies(intv);
+
+ /* Link with lowest tolerance determines timer interval */
+ if (keepalive_intv < n->keepalive_intv)
+ n->keepalive_intv = keepalive_intv;
+
+ /* Ensure link's abort limit corresponds to current interval */
+ l->abort_limit = l->tolerance / jiffies_to_msecs(n->keepalive_intv);
+}
+
static void tipc_node_delete(struct tipc_node *node)
{
list_del_rcu(&node->list);
@@ -163,8 +180,11 @@ void tipc_node_stop(struct net *net)
struct tipc_node *node, *t_node;
spin_lock_bh(&tn->node_list_lock);
- list_for_each_entry_safe(node, t_node, &tn->node_list, list)
+ list_for_each_entry_safe(node, t_node, &tn->node_list, list) {
+ if (del_timer(&node->timer))
+ tipc_node_put(node);
tipc_node_put(node);
+ }
spin_unlock_bh(&tn->node_list_lock);
}
@@ -222,6 +242,38 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
tipc_node_put(node);
}
+/* tipc_node_timeout - handle expiration of node timer
+ */
+static void tipc_node_timeout(unsigned long data)
+{
+ struct tipc_node *n = (struct tipc_node *)data;
+ struct sk_buff_head xmitq;
+ struct tipc_link *l;
+ struct tipc_media_addr *maddr;
+ int bearer_id;
+ int rc = 0;
+
+ __skb_queue_head_init(&xmitq);
+
+ for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
+ tipc_node_lock(n);
+ l = n->links[bearer_id].link;
+ if (l) {
+ /* Link tolerance may change asynchronously: */
+ tipc_node_calculate_timer(n, l);
+ rc = tipc_link_timeout(l, &xmitq);
+ if (rc & TIPC_LINK_DOWN_EVT)
+ tipc_link_reset(l);
+ }
+ tipc_node_unlock(n);
+ maddr = &n->links[bearer_id].maddr;
+ tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
+ }
+ if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
+ tipc_node_get(n);
+ tipc_node_put(n);
+}
+
/**
* tipc_node_link_up - handle addition of link
*
@@ -335,10 +387,16 @@ bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *b,
struct tipc_media_addr *curr = &n->links[b->identity].maddr;
struct sk_buff_head *inputq = &n->links[b->identity].inputq;
- if (!l)
+ if (!l) {
l = tipc_link_create(n, b, maddr, inputq, &n->bclink.namedq);
- if (!l)
- return false;
+ if (!l)
+ return false;
+ tipc_node_calculate_timer(n, l);
+ if (n->link_cnt == 1) {
+ if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
+ tipc_node_get(n);
+ }
+ }
memcpy(&l->media_addr, maddr, sizeof(*maddr));
memcpy(curr, maddr, sizeof(*maddr));
tipc_link_reset(l);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 86b7c74..2d56344 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -140,6 +140,8 @@ struct tipc_node {
u32 link_id;
struct list_head publ_list;
struct list_head conn_sks;
+ unsigned long keepalive_intv;
+ struct timer_list timer;
struct rcu_head rcu;
};
--
1.9.1
^ permalink raw reply related [flat|nested] 15+ messages in thread
* [PATCH net-next 12/13] tipc: introduce node contact FSM
2015-07-16 20:54 [PATCH net-next 00/13] tipc: separate link and link aggregation layer Jon Maloy
` (10 preceding siblings ...)
2015-07-16 20:54 ` [PATCH net-next 11/13] tipc: move link supervision timer to node level Jon Maloy
@ 2015-07-16 20:54 ` Jon Maloy
2015-07-16 20:54 ` [PATCH net-next 13/13] tipc: reduce locking scope during packet reception Jon Maloy
2015-07-21 3:41 ` [PATCH net-next 00/13] tipc: separate link and link aggregation layer David Miller
13 siblings, 0 replies; 15+ messages in thread
From: Jon Maloy @ 2015-07-16 20:54 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion, Jon Maloy
The logic for determining when a node is permitted to establish
and maintain contact with its peer node becomes non-trivial in the
presence of multiple parallel links that may come and go independently.
A known failure scenario is that one endpoint registers both its links
to the peer as lost, cleans up its binding table, and prepares for a
table update once contact is re-established, while the other endpoint
may see its links reset and re-established one by one, hence seeing
no need to re-synchronize the binding table. To avoid this, a node
must not allow re-establishing contact until it has confirmation that
the peer has also lost both links.
Currently, the mechanism for handling this consists of setting and
resetting two state flags from different locations in the code. This
solution is hard to understand and maintain. A closer analysis also
reveals that it is not completely safe.
In this commit we instead introduce an FSM that keeps track of
the conditions under which the node can establish and maintain links.
It has six states and four events, and is strictly based on explicit
knowledge of its own and the peer node's contact states.
Only events leading to a state change are shown as edges in the figure
below.
                          +--------------+
                          | SELF_UP/     |
        +---------------->| PEER_COMING  |-------------------+
  SELF_ |                 +--------------+                   |PEER_
 ESTBL_ |                        |                           |ESTBL_
 CONTACT|      SELF_LOST_CONTACT |                           |CONTACT
        |                        v                           |
        |                 +--------------+                   |
        |       PEER_     | SELF_DOWN/   |     SELF_         |
        |       LOST_  +--| PEER_LEAVING |<--+ LOST_         v
+-------------+ CONTACT|  +--------------+   | CONTACT +-----------+
| SELF_DOWN/  |<-------+                     +---------| SELF_UP/  |
| PEER_DOWN   |<-------+                     +---------| PEER_UP   |
+-------------+ SELF_  |  +--------------+   | PEER_   +-----------+
        |       LOST_  +--| SELF_LEAVING/|<--+ LOST_         A
        |       CONTACT   | PEER_DOWN    |     CONTACT       |
        |                 +--------------+                   |
        |                        A                           |
  PEER_ |      PEER_LOST_CONTACT |                           |SELF_
 ESTBL_ |                        |                           |ESTBL_
 CONTACT|                 +--------------+                   |CONTACT
        +---------------->| PEER_UP/     |-------------------+
                          | SELF_COMING  |
                          +--------------+
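For illustration only (an editorial sketch, not part of this patch), the
same transition set can be exercised as a small standalone C program.
The state and event values are the ones added to node.h below; the
kernel code itself implements the transitions with nested switch
statements inside tipc_node_fsm_evt():

#include <stdio.h>

enum {
        SELF_DOWN_PEER_DOWN    = 0xdd,
        SELF_UP_PEER_UP        = 0xaa,
        SELF_DOWN_PEER_LEAVING = 0xd1,
        SELF_UP_PEER_COMING    = 0xac,
        SELF_COMING_PEER_UP    = 0xca,
        SELF_LEAVING_PEER_DOWN = 0x1d,
};

enum {
        SELF_ESTABL_CONTACT_EVT = 0xec,
        SELF_LOST_CONTACT_EVT   = 0x1c,
        PEER_ESTABL_CONTACT_EVT = 0xfec,
        PEER_LOST_CONTACT_EVT   = 0xf1c,
};

/* Return the next state; unknown combinations leave the state unchanged */
static int node_fsm_next(int state, int evt)
{
        switch (state) {
        case SELF_DOWN_PEER_DOWN:
                if (evt == SELF_ESTABL_CONTACT_EVT)
                        return SELF_UP_PEER_COMING;
                if (evt == PEER_ESTABL_CONTACT_EVT)
                        return SELF_COMING_PEER_UP;
                break;
        case SELF_UP_PEER_COMING:
                if (evt == PEER_ESTABL_CONTACT_EVT)
                        return SELF_UP_PEER_UP;
                if (evt == SELF_LOST_CONTACT_EVT)
                        return SELF_DOWN_PEER_LEAVING;
                break;
        case SELF_COMING_PEER_UP:
                if (evt == SELF_ESTABL_CONTACT_EVT)
                        return SELF_UP_PEER_UP;
                if (evt == PEER_LOST_CONTACT_EVT)
                        return SELF_LEAVING_PEER_DOWN;
                break;
        case SELF_UP_PEER_UP:
                if (evt == SELF_LOST_CONTACT_EVT)
                        return SELF_DOWN_PEER_LEAVING;
                if (evt == PEER_LOST_CONTACT_EVT)
                        return SELF_LEAVING_PEER_DOWN;
                break;
        case SELF_DOWN_PEER_LEAVING:
                if (evt == PEER_LOST_CONTACT_EVT)
                        return SELF_DOWN_PEER_DOWN;
                break;
        case SELF_LEAVING_PEER_DOWN:
                if (evt == SELF_LOST_CONTACT_EVT)
                        return SELF_DOWN_PEER_DOWN;
                break;
        }
        return state;
}

int main(void)
{
        /* Losing our links first only reaches SELF_DOWN_PEER_DOWN after
         * the peer's loss of contact has been confirmed as well.
         */
        int s = node_fsm_next(SELF_UP_PEER_UP, SELF_LOST_CONTACT_EVT);

        s = node_fsm_next(s, PEER_LOST_CONTACT_EVT);
        printf("final state: 0x%x\n", s);       /* prints 0xdd */
        return 0;
}

Running it walks a node from SELF_UP_PEER_UP back to SELF_DOWN_PEER_DOWN
only after both SELF_LOST_CONTACT and PEER_LOST_CONTACT have been seen,
which is exactly the guarantee described above.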
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/link.c | 74 ++++++++++++++------------------
net/tipc/msg.h | 7 +++
net/tipc/node.c | 130 +++++++++++++++++++++++++++++++++++++++++++++++++++++---
net/tipc/node.h | 28 +++++++++---
4 files changed, 185 insertions(+), 54 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 5b4609b..eaccf45 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -911,9 +911,13 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
if (l_ptr->addr) {
/* Handle failure on standard link */
- link_print(l_ptr, "Resetting link\n");
+ link_print(l_ptr, "Resetting link ");
+ pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
+ msg_user(msg), msg_type(msg), msg_size(msg),
+ msg_errcode(msg));
+ pr_info("sqno %u, prev: %x, src: %x\n",
+ msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));
tipc_link_reset(l_ptr);
-
} else {
/* Handle failure on broadcast link */
struct tipc_node *n_ptr;
@@ -1067,15 +1071,8 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
if (unlikely(!l_ptr))
goto unlock;
- /* Verify that communication with node is currently allowed */
- if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
- msg_user(msg) == LINK_PROTOCOL &&
- (msg_type(msg) == RESET_MSG ||
- msg_type(msg) == ACTIVATE_MSG) &&
- !msg_redundant_link(msg))
- n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
-
- if (tipc_node_blocked(n_ptr))
+ /* Is reception of this pkt permitted at the moment ? */
+ if (!tipc_node_filter_skb(n_ptr, msg))
goto unlock;
/* Validate message sequence number info */
@@ -1371,15 +1368,6 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
if (less_eq(msg_session(msg), l_ptr->peer_session))
break; /* duplicate or old reset: ignore */
}
-
- if (!msg_redundant_link(msg) && (link_working(l_ptr) ||
- link_probing(l_ptr))) {
- /* peer has lost contact -- don't allow peer's links
- * to reactivate before we recognize loss & clean up
- */
- l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN;
- }
-
link_state_event(l_ptr, RESET_MSG);
/* fall thru' */
@@ -1408,6 +1396,8 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
l_ptr->peer_session = msg_session(msg);
l_ptr->peer_bearer_id = msg_bearer_id(msg);
+ if (!msg_peer_is_up(msg))
+ tipc_node_fsm_evt(l_ptr->owner, PEER_LOST_CONTACT_EVT);
if (msg_type(msg) == ACTIVATE_MSG)
link_state_event(l_ptr, ACTIVATE_MSG);
break;
@@ -1419,11 +1409,11 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
if (msg_linkprio(msg) &&
(msg_linkprio(msg) != l_ptr->priority)) {
- pr_debug("%s<%s>, priority change %u->%u\n",
- link_rst_msg, l_ptr->name,
- l_ptr->priority, msg_linkprio(msg));
+ pr_info("%s<%s>, priority change %u->%u\n",
+ link_rst_msg, l_ptr->name,
+ l_ptr->priority, msg_linkprio(msg));
l_ptr->priority = msg_linkprio(msg);
- tipc_link_reset(l_ptr); /* Enforce change to take effect */
+ tipc_link_reset(l_ptr);
break;
}
@@ -1446,15 +1436,18 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
tipc_bclink_update_link_state(l_ptr->owner,
msg_last_bcast(msg));
- if (rec_gap || (msg_probe(msg))) {
+ if (rec_gap || (msg_probe(msg)))
tipc_link_proto_xmit(l_ptr, STATE_MSG, 0,
rec_gap, 0, 0);
- }
+
if (msg_seq_gap(msg)) {
l_ptr->stats.recv_nacks++;
tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
msg_seq_gap(msg));
}
+ if (tipc_link_is_up(l_ptr))
+ tipc_node_fsm_evt(l_ptr->owner,
+ PEER_ESTABL_CONTACT_EVT);
break;
}
exit:
@@ -1478,10 +1471,6 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
if (l->exec_mode == TIPC_LINK_BLOCKED)
return;
- /* Abort non-RESET send if communication with node is prohibited */
- if ((tipc_node_blocked(l->owner)) && (mtyp != RESET_MSG))
- return;
-
msg_set_type(hdr, mtyp);
msg_set_net_plane(hdr, l->net_plane);
msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
@@ -1799,27 +1788,28 @@ static void link_reset_statistics(struct tipc_link *l_ptr)
l_ptr->stats.recv_info = l_ptr->rcv_nxt;
}
-static void link_print(struct tipc_link *l_ptr, const char *str)
+static void link_print(struct tipc_link *l, const char *str)
{
- struct tipc_net *tn = net_generic(l_ptr->owner->net, tipc_net_id);
- struct tipc_bearer *b_ptr;
+ struct sk_buff *hskb = skb_peek(&l->transmq);
+ u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt;
+ u16 tail = l->snd_nxt - 1;
- rcu_read_lock();
- b_ptr = rcu_dereference_rtnl(tn->bearer_list[l_ptr->bearer_id]);
- if (b_ptr)
- pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
- rcu_read_unlock();
+ pr_info("%s Link <%s>:", str, l->name);
- if (link_probing(l_ptr))
+ if (link_probing(l))
pr_cont(":P\n");
- else if (link_establishing(l_ptr))
+ else if (link_establishing(l))
pr_cont(":E\n");
- else if (link_resetting(l_ptr))
+ else if (link_resetting(l))
pr_cont(":R\n");
- else if (link_working(l_ptr))
+ else if (link_working(l))
pr_cont(":W\n");
else
pr_cont("\n");
+
+ pr_info("XMTQ: %u [%u-%u], BKLGQ: %u, SNDNX: %u, RCVNX: %u\n",
+ skb_queue_len(&l->transmq), head, tail,
+ skb_queue_len(&l->backlogq), l->snd_nxt, l->rcv_nxt);
}
/* Parse and validate nested (link) properties valid for media, bearer and link
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 19c45fb..4dc66d9 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -766,6 +766,13 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
msg_set_bits(m, 9, 0, 0xffff, n);
}
+static inline bool msg_peer_is_up(struct tipc_msg *m)
+{
+ if (likely(msg_user(m) != LINK_PROTOCOL) || (msg_type(m) == STATE_MSG))
+ return true;
+ return msg_redundant_link(m);
+}
+
struct sk_buff *tipc_buf_acquire(u32 size);
bool tipc_msg_validate(struct sk_buff *skb);
bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 77effb2..9dbbb5d 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -141,7 +141,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
break;
}
list_add_tail_rcu(&n_ptr->list, &temp_node->list);
- n_ptr->action_flags = TIPC_WAIT_PEER_LINKS_DOWN;
+ n_ptr->state = SELF_DOWN_PEER_DOWN;
n_ptr->signature = INVALID_NODE_SIG;
n_ptr->active_links[0] = INVALID_BEARER_ID;
n_ptr->active_links[1] = INVALID_BEARER_ID;
@@ -421,8 +421,131 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
}
}
+/* tipc_node_fsm_evt - node finite state machine
+ * Determines when contact is allowed with peer node
+ */
+void tipc_node_fsm_evt(struct tipc_node *n, int evt)
+{
+ int state = n->state;
+
+ switch (state) {
+ case SELF_DOWN_PEER_DOWN:
+ switch (evt) {
+ case SELF_ESTABL_CONTACT_EVT:
+ state = SELF_UP_PEER_COMING;
+ break;
+ case PEER_ESTABL_CONTACT_EVT:
+ state = SELF_COMING_PEER_UP;
+ break;
+ case SELF_LOST_CONTACT_EVT:
+ case PEER_LOST_CONTACT_EVT:
+ break;
+ default:
+ pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+ }
+ break;
+ case SELF_UP_PEER_UP:
+ switch (evt) {
+ case SELF_LOST_CONTACT_EVT:
+ state = SELF_DOWN_PEER_LEAVING;
+ break;
+ case PEER_LOST_CONTACT_EVT:
+ state = SELF_LEAVING_PEER_DOWN;
+ break;
+ case SELF_ESTABL_CONTACT_EVT:
+ case PEER_ESTABL_CONTACT_EVT:
+ break;
+ default:
+ pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+ }
+ break;
+ case SELF_DOWN_PEER_LEAVING:
+ switch (evt) {
+ case PEER_LOST_CONTACT_EVT:
+ state = SELF_DOWN_PEER_DOWN;
+ break;
+ case SELF_ESTABL_CONTACT_EVT:
+ case PEER_ESTABL_CONTACT_EVT:
+ case SELF_LOST_CONTACT_EVT:
+ break;
+ default:
+ pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+ }
+ break;
+ case SELF_UP_PEER_COMING:
+ switch (evt) {
+ case PEER_ESTABL_CONTACT_EVT:
+ state = SELF_UP_PEER_UP;
+ break;
+ case SELF_LOST_CONTACT_EVT:
+ state = SELF_DOWN_PEER_LEAVING;
+ break;
+ case SELF_ESTABL_CONTACT_EVT:
+ case PEER_LOST_CONTACT_EVT:
+ break;
+ default:
+ pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+ }
+ break;
+ case SELF_COMING_PEER_UP:
+ switch (evt) {
+ case SELF_ESTABL_CONTACT_EVT:
+ state = SELF_UP_PEER_UP;
+ break;
+ case PEER_LOST_CONTACT_EVT:
+ state = SELF_LEAVING_PEER_DOWN;
+ break;
+ case SELF_LOST_CONTACT_EVT:
+ case PEER_ESTABL_CONTACT_EVT:
+ break;
+ default:
+ pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+ }
+ break;
+ case SELF_LEAVING_PEER_DOWN:
+ switch (evt) {
+ case SELF_LOST_CONTACT_EVT:
+ state = SELF_DOWN_PEER_DOWN;
+ break;
+ case SELF_ESTABL_CONTACT_EVT:
+ case PEER_ESTABL_CONTACT_EVT:
+ case PEER_LOST_CONTACT_EVT:
+ break;
+ default:
+ pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+ }
+ break;
+ default:
+ pr_err("Unknown node fsm state %x\n", state);
+ break;
+ }
+
+ n->state = state;
+}
+
+bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_msg *hdr)
+{
+ int state = n->state;
+
+ if (likely(state == SELF_UP_PEER_UP))
+ return true;
+ if (state == SELF_DOWN_PEER_DOWN)
+ return true;
+ if (state == SELF_UP_PEER_COMING)
+ return true;
+ if (state == SELF_COMING_PEER_UP)
+ return true;
+ if (state == SELF_LEAVING_PEER_DOWN)
+ return false;
+ if (state == SELF_DOWN_PEER_LEAVING)
+ if (!msg_peer_is_up(hdr))
+ return true;
+ return false;
+}
+
static void node_established_contact(struct tipc_node *n_ptr)
{
+ tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT);
n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP;
n_ptr->bclink.oos_state = 0;
n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net);
@@ -468,11 +591,8 @@ static void node_lost_contact(struct tipc_node *n_ptr)
l_ptr->failover_skb = NULL;
tipc_link_reset_fragments(l_ptr);
}
-
- n_ptr->action_flags &= ~TIPC_WAIT_OWN_LINKS_DOWN;
-
/* Prevent re-contact with node until cleanup is done */
- n_ptr->action_flags |= TIPC_WAIT_PEER_LINKS_DOWN;
+ tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT);
/* Notify publications from this node */
n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 2d56344..270256e 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -47,6 +47,24 @@
#define INVALID_BEARER_ID -1
+/* Node FSM states and events:
+ */
+enum {
+ SELF_DOWN_PEER_DOWN = 0xdd,
+ SELF_UP_PEER_UP = 0xaa,
+ SELF_DOWN_PEER_LEAVING = 0xd1,
+ SELF_UP_PEER_COMING = 0xac,
+ SELF_COMING_PEER_UP = 0xca,
+ SELF_LEAVING_PEER_DOWN = 0x1d,
+};
+
+enum {
+ SELF_ESTABL_CONTACT_EVT = 0xec,
+ SELF_LOST_CONTACT_EVT = 0x1c,
+ PEER_ESTABL_CONTACT_EVT = 0xfec,
+ PEER_LOST_CONTACT_EVT = 0xf1c
+};
+
/* Flags used to take different actions according to flag type
* TIPC_WAIT_PEER_LINKS_DOWN: wait to see that peer's links are down
* TIPC_WAIT_OWN_LINKS_DOWN: wait until peer node is declared down
@@ -56,8 +74,6 @@
*/
enum {
TIPC_MSG_EVT = 1,
- TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1),
- TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2),
TIPC_NOTIFY_NODE_DOWN = (1 << 3),
TIPC_NOTIFY_NODE_UP = (1 << 4),
TIPC_WAKEUP_BCAST_USERS = (1 << 5),
@@ -133,6 +149,7 @@ struct tipc_node {
int action_flags;
struct tipc_node_bclink bclink;
struct list_head list;
+ int state;
int link_cnt;
u16 working_links;
u16 capabilities;
@@ -176,11 +193,8 @@ static inline void tipc_node_lock(struct tipc_node *node)
spin_lock_bh(&node->lock);
}
-static inline bool tipc_node_blocked(struct tipc_node *node)
-{
- return (node->action_flags & (TIPC_WAIT_PEER_LINKS_DOWN |
- TIPC_NOTIFY_NODE_DOWN | TIPC_WAIT_OWN_LINKS_DOWN));
-}
+void tipc_node_fsm_evt(struct tipc_node *n, int evt);
+bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_msg *hdr);
static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel)
{
--
1.9.1
^ permalink raw reply related [flat|nested] 15+ messages in thread
* [PATCH net-next 13/13] tipc: reduce locking scope during packet reception
2015-07-16 20:54 [PATCH net-next 00/13] tipc: separate link and link aggregation layer Jon Maloy
` (11 preceding siblings ...)
2015-07-16 20:54 ` [PATCH net-next 12/13] tipc: introduce node contact FSM Jon Maloy
@ 2015-07-16 20:54 ` Jon Maloy
2015-07-21 3:41 ` [PATCH net-next 00/13] tipc: separate link and link aggregation layer David Miller
13 siblings, 0 replies; 15+ messages in thread
From: Jon Maloy @ 2015-07-16 20:54 UTC (permalink / raw)
To: davem
Cc: netdev, Paul Gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion, Jon Maloy
We convert packet/message reception according to the same principle
we have been using for message sending and timeout handling:
We move the function tipc_rcv() to node.c, hence handling the initial
packet reception at the link aggregation level. The function grabs
the node lock, selects the receiving link, and accesses it via a new
call tipc_link_rcv(). This function appends buffers to the input
queue for delivery upwards, but it may also append outgoing packets
to the xmit queue, just as we do during regular message sending. The
latter will happen when buffers are forwarded from the link backlog,
or when retransmission is requested.
Upon return from this function, and after having released the node lock,
tipc_rcv() delivers/transmits the contents of those queues, but it may
also perform actions such as link activation or reset, as indicated by
the return flags from the link.
This reduces the number of cpu cycles spent inside the node spinlock,
and reduces contention on that lock.
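In outline (a simplified editorial sketch, not the literal code added
below: error paths, broadcast handling and the reception filter are
omitted, while the function and field names are those appearing in the
node.c hunk further down), the new receive path is roughly:

void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
{
        struct sk_buff_head xmitq;
        struct tipc_node *n;
        struct tipc_link *l;
        int bearer_id = b->identity;
        int rc;

        __skb_queue_head_init(&xmitq);

        n = tipc_node_find(net, msg_prevnode(buf_msg(skb)));

        tipc_node_lock(n);
        l = n->links[bearer_id].link;
        rc = tipc_link_rcv(l, skb, &xmitq);     /* fills inputq and xmitq */
        if (rc & TIPC_LINK_UP_EVT)
                tipc_link_activate(l);
        if (rc & TIPC_LINK_DOWN_EVT)
                tipc_link_reset(l);
        tipc_node_unlock(n);

        /* Deliver upwards and transmit only after the node lock is released */
        tipc_sk_rcv(net, &n->links[bearer_id].inputq);
        tipc_bearer_xmit(net, bearer_id, &xmitq, &n->links[bearer_id].maddr);
        tipc_node_put(n);
}

Only the per-link state update runs inside the node spinlock; socket
delivery and bearer transmission happen after the unlock, which is where
the cycle and contention savings come from.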
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/bcast.c | 23 ++
net/tipc/bcast.h | 1 +
net/tipc/core.h | 5 +
net/tipc/link.c | 673 +++++++++++++++++++++++++------------------------------
net/tipc/link.h | 6 +-
net/tipc/msg.h | 50 ++++-
net/tipc/node.c | 105 ++++++++-
net/tipc/node.h | 4 -
8 files changed, 478 insertions(+), 389 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index aab4e8d..8b010c9 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -316,6 +316,29 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
}
}
+void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr)
+{
+ u16 last = msg_last_bcast(hdr);
+ int mtyp = msg_type(hdr);
+
+ if (unlikely(msg_user(hdr) != LINK_PROTOCOL))
+ return;
+ if (mtyp == STATE_MSG) {
+ tipc_bclink_update_link_state(n, last);
+ return;
+ }
+ /* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization,
+ * and transfer synch info in LINK_PROTOCOL messages.
+ */
+ if (tipc_node_is_up(n))
+ return;
+ if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
+ return;
+ n->bclink.last_sent = last;
+ n->bclink.last_in = last;
+ n->bclink.oos_state = 0;
+}
+
/**
* bclink_peek_nack - monitor retransmission requests sent by other nodes
*
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 3c290a48..d74c69b 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -133,5 +133,6 @@ void tipc_bclink_wakeup_users(struct net *net);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
void tipc_bclink_input(struct net *net);
+void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *msg);
#endif
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 0fcf133..f4ed677 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -129,6 +129,11 @@ static inline int less(u16 left, u16 right)
return less_eq(left, right) && (mod(right) != mod(left));
}
+static inline int in_range(u16 val, u16 min, u16 max)
+{
+ return !less(val, min) && !more(val, max);
+}
+
#ifdef CONFIG_SYSCTL
int tipc_register_sysctl(void);
void tipc_unregister_sysctl(void);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index eaccf45..55b675d 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -77,6 +77,10 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
};
/*
+ * Interval between NACKs when packets arrive out of order
+ */
+#define TIPC_NACK_INTV (TIPC_MIN_LINK_WIN * 2)
+/*
* Out-of-range value for link session numbers
*/
#define WILDCARD_SESSION 0x10000
@@ -123,22 +127,19 @@ static int link_establishing(struct tipc_link *l)
return l->state == TIPC_LINK_ESTABLISHING;
}
-static void link_handle_out_of_seq_msg(struct tipc_link *link,
- struct sk_buff *skb);
-static void tipc_link_proto_rcv(struct tipc_link *link,
- struct sk_buff *skb);
-static void link_state_event(struct tipc_link *l_ptr, u32 event);
+static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
+ struct sk_buff_head *xmitq);
static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
u16 rcvgap, int tolerance, int priority,
struct sk_buff_head *xmitq);
static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
-static void tipc_link_sync_xmit(struct tipc_link *l);
+static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
+ struct sk_buff_head *xmitq);
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);
-static void link_activate(struct tipc_link *link);
/*
* Simple link routines
@@ -283,6 +284,26 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
rcu_read_unlock();
}
+/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints.
+ *
+ * Give a newly added peer node the sequence number where it should
+ * start receiving and acking broadcast packets.
+ */
+static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
+ struct sk_buff_head *xmitq)
+{
+ struct sk_buff *skb;
+ struct sk_buff_head list;
+
+ skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE,
+ 0, l->addr, link_own_addr(l), 0, 0, 0);
+ if (!skb)
+ return;
+ __skb_queue_head_init(&list);
+ __skb_queue_tail(&list, skb);
+ tipc_link_xmit(l, &list, xmitq);
+}
+
/**
* tipc_link_fsm_evt - link finite state machine
* @l: pointer to link
@@ -295,12 +316,13 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
int mtyp = 0, rc = 0;
struct tipc_link *pl;
enum {
- LINK_RESET = 1,
- LINK_ACTIVATE = (1 << 1),
- SND_PROBE = (1 << 2),
- SND_STATE = (1 << 3),
- SND_RESET = (1 << 4),
- SND_ACTIVATE = (1 << 5)
+ LINK_RESET = 1,
+ LINK_ACTIVATE = (1 << 1),
+ SND_PROBE = (1 << 2),
+ SND_STATE = (1 << 3),
+ SND_RESET = (1 << 4),
+ SND_ACTIVATE = (1 << 5),
+ SND_BCAST_SYNC = (1 << 6)
} actions = 0;
if (l->exec_mode == TIPC_LINK_BLOCKED)
@@ -352,8 +374,8 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
if (pl && link_probing(pl))
break;
actions |= LINK_ACTIVATE;
- if (l->owner->working_links == 1)
- tipc_link_sync_xmit(l);
+ if (!l->owner->working_links)
+ actions |= SND_BCAST_SYNC;
break;
case PEER_RESET_EVT:
l->state = TIPC_LINK_ESTABLISHING;
@@ -374,8 +396,8 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
if (pl && link_probing(pl))
break;
actions |= LINK_ACTIVATE;
- if (l->owner->working_links == 1)
- tipc_link_sync_xmit(l);
+ if (!l->owner->working_links)
+ actions |= SND_BCAST_SYNC;
break;
case PEER_RESET_EVT:
break;
@@ -408,6 +430,8 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
if (actions & (SND_PROBE | SND_STATE | SND_RESET | SND_ACTIVATE))
tipc_link_build_proto_msg(l, mtyp, actions & SND_PROBE,
0, 0, 0, xmitq);
+ if (actions & SND_BCAST_SYNC)
+ tipc_link_build_bcast_sync_msg(l, xmitq);
return rc;
}
@@ -605,12 +629,14 @@ void tipc_link_reset(struct tipc_link *l_ptr)
l_ptr->reasm_buf = NULL;
l_ptr->rcv_unacked = 0;
l_ptr->snd_nxt = 1;
+ l_ptr->rcv_nxt = 1;
l_ptr->silent_intv_cnt = 0;
+ l_ptr->stats.recv_info = 0;
l_ptr->stale_count = 0;
link_reset_statistics(l_ptr);
}
-static void link_activate(struct tipc_link *link)
+void tipc_link_activate(struct tipc_link *link)
{
struct tipc_node *node = link->owner;
@@ -624,36 +650,6 @@ static void link_activate(struct tipc_link *link)
}
/**
- * link_state_event - link finite state machine
- * @l_ptr: pointer to link
- * @event: state machine event to process
- */
-static void link_state_event(struct tipc_link *l, unsigned int evt)
-{
- int rc;
- struct sk_buff_head xmitq;
- struct sk_buff *skb;
-
- if (l->exec_mode == TIPC_LINK_BLOCKED)
- return;
-
- __skb_queue_head_init(&xmitq);
-
- rc = tipc_link_fsm_evt(l, evt, &xmitq);
-
- if (rc & TIPC_LINK_UP_EVT)
- link_activate(l);
-
- if (rc & TIPC_LINK_DOWN_EVT)
- tipc_link_reset(l);
-
- skb = __skb_dequeue(&xmitq);
- if (!skb)
- return;
- tipc_bearer_send(l->owner->net, l->bearer_id, skb, &l->media_addr);
-}
-
-/**
* __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
* @link: link to use
* @list: chain of buffers containing message
@@ -808,30 +804,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
}
/*
- * tipc_link_sync_xmit - synchronize broadcast link endpoints.
- *
- * Give a newly added peer node the sequence number where it should
- * start receiving and acking broadcast packets.
- *
- * Called with node locked
- */
-static void tipc_link_sync_xmit(struct tipc_link *link)
-{
- struct sk_buff *skb;
- struct tipc_msg *msg;
-
- skb = tipc_buf_acquire(INT_H_SIZE);
- if (!skb)
- return;
-
- msg = buf_msg(skb);
- tipc_msg_init(link_own_addr(link), msg, BCAST_PROTOCOL, STATE_MSG,
- INT_H_SIZE, link->addr);
- msg_set_last_bcast(msg, link->owner->bclink.acked);
- __tipc_link_xmit_skb(link, skb);
-}
-
-/*
* tipc_link_sync_rcv - synchronize broadcast link endpoints.
* Receive the sequence number where we should start receiving and
* acking broadcast packets from a newly added peer node, and open
@@ -881,6 +853,34 @@ void tipc_link_push_packets(struct tipc_link *link)
link->snd_nxt = seqno;
}
+void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
+{
+ struct sk_buff *skb, *_skb;
+ struct tipc_msg *hdr;
+ u16 seqno = l->snd_nxt;
+ u16 ack = l->rcv_nxt - 1;
+
+ while (skb_queue_len(&l->transmq) < l->window) {
+ skb = skb_peek(&l->backlogq);
+ if (!skb)
+ break;
+ _skb = skb_clone(skb, GFP_ATOMIC);
+ if (!_skb)
+ break;
+ __skb_dequeue(&l->backlogq);
+ hdr = buf_msg(skb);
+ l->backlog[msg_importance(hdr)].len--;
+ __skb_queue_tail(&l->transmq, skb);
+ __skb_queue_tail(xmitq, _skb);
+ msg_set_ack(hdr, ack);
+ msg_set_seqno(hdr, seqno);
+ msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
+ l->rcv_unacked = 0;
+ seqno++;
+ }
+ l->snd_nxt = seqno;
+}
+
void tipc_link_reset_all(struct tipc_node *node)
{
char addr_string[16];
@@ -978,6 +978,41 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
}
}
+static int tipc_link_retransm(struct tipc_link *l, int retransm,
+ struct sk_buff_head *xmitq)
+{
+ struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
+ struct tipc_msg *hdr;
+
+ if (!skb)
+ return 0;
+
+ /* Detect repeated retransmit failures on same packet */
+ if (likely(l->last_retransm != buf_seqno(skb))) {
+ l->last_retransm = buf_seqno(skb);
+ l->stale_count = 1;
+ } else if (++l->stale_count > 100) {
+ link_retransmit_failure(l, skb);
+ return TIPC_LINK_DOWN_EVT;
+ }
+ skb_queue_walk(&l->transmq, skb) {
+ if (!retransm)
+ return 0;
+ hdr = buf_msg(skb);
+ _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
+ if (!_skb)
+ return 0;
+ hdr = buf_msg(_skb);
+ msg_set_ack(hdr, l->rcv_nxt - 1);
+ msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
+ _skb->priority = TC_PRIO_CONTROL;
+ __skb_queue_tail(xmitq, _skb);
+ retransm--;
+ l->stats.retransmitted++;
+ }
+ return 0;
+}
+
/* link_synch(): check if all packets arrived before the synch
* point have been consumed
* Returns true if the parallel links are synched, otherwise false
@@ -1004,155 +1039,6 @@ synched:
return true;
}
-static void link_retrieve_defq(struct tipc_link *link,
- struct sk_buff_head *list)
-{
- u16 seq_no;
-
- if (skb_queue_empty(&link->deferdq))
- return;
-
- seq_no = buf_seqno(skb_peek(&link->deferdq));
- if (seq_no == link->rcv_nxt)
- skb_queue_splice_tail_init(&link->deferdq, list);
-}
-
-/**
- * tipc_rcv - process TIPC packets/messages arriving from off-node
- * @net: the applicable net namespace
- * @skb: TIPC packet
- * @b_ptr: pointer to bearer message arrived on
- *
- * Invoked with no locks held. Bearer pointer must point to a valid bearer
- * structure (i.e. cannot be NULL), but bearer can be inactive.
- */
-void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
-{
- struct tipc_net *tn = net_generic(net, tipc_net_id);
- struct sk_buff_head head;
- struct tipc_node *n_ptr;
- struct tipc_link *l_ptr;
- struct sk_buff *skb1, *tmp;
- struct tipc_msg *msg;
- u16 seq_no;
- u16 ackd;
- u32 released;
-
- skb2list(skb, &head);
-
- while ((skb = __skb_dequeue(&head))) {
- /* Ensure message is well-formed */
- if (unlikely(!tipc_msg_validate(skb)))
- goto discard;
-
- /* Handle arrival of a non-unicast link message */
- msg = buf_msg(skb);
- if (unlikely(msg_non_seq(msg))) {
- if (msg_user(msg) == LINK_CONFIG)
- tipc_disc_rcv(net, skb, b_ptr);
- else
- tipc_bclink_rcv(net, skb);
- continue;
- }
-
- /* Discard unicast link messages destined for another node */
- if (unlikely(!msg_short(msg) &&
- (msg_destnode(msg) != tn->own_addr)))
- goto discard;
-
- /* Locate neighboring node that sent message */
- n_ptr = tipc_node_find(net, msg_prevnode(msg));
- if (unlikely(!n_ptr))
- goto discard;
-
- tipc_node_lock(n_ptr);
- /* Locate unicast link endpoint that should handle message */
- l_ptr = n_ptr->links[b_ptr->identity].link;
- if (unlikely(!l_ptr))
- goto unlock;
-
- /* Is reception of this pkt permitted at the moment ? */
- if (!tipc_node_filter_skb(n_ptr, msg))
- goto unlock;
-
- /* Validate message sequence number info */
- seq_no = msg_seqno(msg);
- ackd = msg_ack(msg);
-
- /* Release acked messages */
- if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg)))
- tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
-
- released = 0;
- skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) {
- if (more(buf_seqno(skb1), ackd))
- break;
- __skb_unlink(skb1, &l_ptr->transmq);
- kfree_skb(skb1);
- released = 1;
- }
-
- /* Try sending any messages link endpoint has pending */
- if (unlikely(skb_queue_len(&l_ptr->backlogq)))
- tipc_link_push_packets(l_ptr);
-
- if (released && !skb_queue_empty(&l_ptr->wakeupq))
- link_prepare_wakeup(l_ptr);
-
- /* Process the incoming packet */
- if (unlikely(!link_working(l_ptr))) {
- if (msg_user(msg) == LINK_PROTOCOL) {
- tipc_link_proto_rcv(l_ptr, skb);
- link_retrieve_defq(l_ptr, &head);
- skb = NULL;
- goto unlock;
- }
-
- /* Traffic message. Conditionally activate link */
- link_state_event(l_ptr, TRAFFIC_EVT);
-
- if (link_working(l_ptr)) {
- /* Re-insert buffer in front of queue */
- __skb_queue_head(&head, skb);
- skb = NULL;
- goto unlock;
- }
- goto unlock;
- }
-
- /* Link is now in state TIPC_LINK_WORKING */
- if (unlikely(seq_no != l_ptr->rcv_nxt)) {
- link_handle_out_of_seq_msg(l_ptr, skb);
- link_retrieve_defq(l_ptr, &head);
- skb = NULL;
- goto unlock;
- }
- l_ptr->silent_intv_cnt = 0;
-
- /* Synchronize with parallel link if applicable */
- if (unlikely((l_ptr->exec_mode == TIPC_LINK_TUNNEL) &&
- !msg_dup(msg))) {
- if (!link_synch(l_ptr))
- goto unlock;
- }
- l_ptr->rcv_nxt++;
- if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
- link_retrieve_defq(l_ptr, &head);
- if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
- l_ptr->stats.sent_acks++;
- tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
- }
- tipc_link_input(l_ptr, skb);
- skb = NULL;
-unlock:
- tipc_node_unlock(n_ptr);
- tipc_node_put(n_ptr);
-discard:
- if (unlikely(skb))
- kfree_skb(skb);
- }
-}
-
/* tipc_data_input - deliver data and name distr msgs to upper layer
*
* Consumes buffer if message is of right type
@@ -1206,9 +1092,6 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
struct sk_buff *iskb;
int pos = 0;
- if (likely(tipc_data_input(link, skb)))
- return;
-
switch (msg_user(msg)) {
case TUNNEL_PROTOCOL:
if (msg_dup(msg)) {
@@ -1247,6 +1130,110 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
};
}
+static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
+{
+ bool released = false;
+ struct sk_buff *skb, *tmp;
+
+ skb_queue_walk_safe(&l->transmq, skb, tmp) {
+ if (more(buf_seqno(skb), acked))
+ break;
+ __skb_unlink(skb, &l->transmq);
+ kfree_skb(skb);
+ released = true;
+ }
+ return released;
+}
+
+/* tipc_link_rcv - process TIPC packets/messages arriving from off-node
+ * @link: the link that should handle the message
+ * @skb: TIPC packet
+ * @xmitq: queue to place packets to be sent after this call
+ */
+int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
+ struct sk_buff_head *xmitq)
+{
+ struct sk_buff_head *arrvq = &l->deferdq;
+ struct sk_buff *tmp;
+ struct tipc_msg *hdr;
+ u16 seqno, rcv_nxt;
+ int rc = 0;
+
+ if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) {
+ if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV))
+ tipc_link_build_proto_msg(l, STATE_MSG, 0,
+ 0, 0, 0, xmitq);
+ return rc;
+ }
+
+ skb_queue_walk_safe(arrvq, skb, tmp) {
+ hdr = buf_msg(skb);
+
+ /* Verify and update link state */
+ if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) {
+ __skb_dequeue(arrvq);
+ rc |= tipc_link_proto_rcv(l, skb, xmitq);
+ continue;
+ }
+
+ if (unlikely(!link_working(l))) {
+ rc |= tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq);
+ if (!link_working(l)) {
+ kfree_skb(__skb_dequeue(arrvq));
+ return rc;
+ }
+ }
+
+ l->silent_intv_cnt = 0;
+
+ /* Forward queues and wake up waiting users */
+ if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) {
+ tipc_link_advance_backlog(l, xmitq);
+ if (unlikely(!skb_queue_empty(&l->wakeupq)))
+ link_prepare_wakeup(l);
+ }
+
+ /* Defer reception if there is a gap in the sequence */
+ seqno = msg_seqno(hdr);
+ rcv_nxt = l->rcv_nxt;
+ if (unlikely(less(rcv_nxt, seqno))) {
+ l->stats.deferred_recv++;
+ return rc;
+ }
+
+ __skb_dequeue(arrvq);
+
+ /* Drop if packet already received */
+ if (unlikely(more(rcv_nxt, seqno))) {
+ l->stats.duplicates++;
+ kfree_skb(skb);
+ return rc;
+ }
+
+ /* Synchronize with parallel link if applicable */
+ if (unlikely(l->exec_mode == TIPC_LINK_TUNNEL))
+ if (!msg_dup(hdr) && !link_synch(l)) {
+ kfree_skb(skb);
+ return rc;
+ }
+
+ /* Packet can be delivered */
+ l->rcv_nxt++;
+ l->stats.recv_info++;
+ if (unlikely(!tipc_data_input(l, skb)))
+ tipc_link_input(l, skb);
+
+ /* Ack at regular intervals */
+ if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
+ l->rcv_unacked = 0;
+ l->stats.sent_acks++;
+ tipc_link_build_proto_msg(l, STATE_MSG,
+ 0, 0, 0, 0, xmitq);
+ }
+ }
+ return rc;
+}
+
/**
* tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
*
@@ -1287,41 +1274,6 @@ u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
}
/*
- * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
- */
-static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
- struct sk_buff *buf)
-{
- u32 seq_no = buf_seqno(buf);
-
- if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
- tipc_link_proto_rcv(l_ptr, buf);
- return;
- }
-
- /* Record OOS packet arrival */
- l_ptr->silent_intv_cnt = 0;
-
- /*
- * Discard packet if a duplicate; otherwise add it to deferred queue
- * and notify peer of gap as per protocol specification
- */
- if (less(seq_no, l_ptr->rcv_nxt)) {
- l_ptr->stats.duplicates++;
- kfree_skb(buf);
- return;
- }
-
- if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {
- l_ptr->stats.deferred_recv++;
- if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1)
- tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
- } else {
- l_ptr->stats.duplicates++;
- }
-}
-
-/*
* Send protocol message to the other endpoint.
*/
void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
@@ -1341,119 +1293,6 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
kfree_skb(skb);
}
-/*
- * Receive protocol message :
- * Note that network plane id propagates through the network, and may
- * change at any time. The node with lowest address rules
- */
-static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
- struct sk_buff *buf)
-{
- u32 rec_gap = 0;
- u32 msg_tol;
- struct tipc_msg *msg = buf_msg(buf);
-
- if (l_ptr->exec_mode == TIPC_LINK_BLOCKED)
- goto exit;
-
- if (l_ptr->net_plane != msg_net_plane(msg))
- if (link_own_addr(l_ptr) > msg_prevnode(msg))
- l_ptr->net_plane = msg_net_plane(msg);
-
- switch (msg_type(msg)) {
-
- case RESET_MSG:
- if (!link_probing(l_ptr) &&
- (l_ptr->peer_session != WILDCARD_SESSION)) {
- if (less_eq(msg_session(msg), l_ptr->peer_session))
- break; /* duplicate or old reset: ignore */
- }
- link_state_event(l_ptr, RESET_MSG);
-
- /* fall thru' */
- case ACTIVATE_MSG:
- /* Update link settings according other endpoint's values */
- strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
-
- msg_tol = msg_link_tolerance(msg);
- if (msg_tol > l_ptr->tolerance)
- l_ptr->tolerance = msg_tol;
-
- if (msg_linkprio(msg) > l_ptr->priority)
- l_ptr->priority = msg_linkprio(msg);
-
- if (l_ptr->mtu > msg_max_pkt(msg))
- l_ptr->mtu = msg_max_pkt(msg);
-
- /* Synchronize broadcast link info, if not done previously */
- if (!tipc_node_is_up(l_ptr->owner)) {
- l_ptr->owner->bclink.last_sent =
- l_ptr->owner->bclink.last_in =
- msg_last_bcast(msg);
- l_ptr->owner->bclink.oos_state = 0;
- }
-
- l_ptr->peer_session = msg_session(msg);
- l_ptr->peer_bearer_id = msg_bearer_id(msg);
-
- if (!msg_peer_is_up(msg))
- tipc_node_fsm_evt(l_ptr->owner, PEER_LOST_CONTACT_EVT);
- if (msg_type(msg) == ACTIVATE_MSG)
- link_state_event(l_ptr, ACTIVATE_MSG);
- break;
- case STATE_MSG:
-
- msg_tol = msg_link_tolerance(msg);
- if (msg_tol)
- l_ptr->tolerance = msg_tol;
-
- if (msg_linkprio(msg) &&
- (msg_linkprio(msg) != l_ptr->priority)) {
- pr_info("%s<%s>, priority change %u->%u\n",
- link_rst_msg, l_ptr->name,
- l_ptr->priority, msg_linkprio(msg));
- l_ptr->priority = msg_linkprio(msg);
- tipc_link_reset(l_ptr);
- break;
- }
-
- /* Record reception; force mismatch at next timeout: */
- l_ptr->silent_intv_cnt = 0;
-
- link_state_event(l_ptr, TRAFFIC_EVT);
- l_ptr->stats.recv_states++;
- if (link_resetting(l_ptr))
- break;
-
- if (less_eq(l_ptr->rcv_nxt, msg_next_sent(msg)))
- rec_gap = mod(msg_next_sent(msg) - l_ptr->rcv_nxt);
-
- if (msg_probe(msg))
- l_ptr->stats.recv_probes++;
-
- /* Protocol message before retransmits, reduce loss risk */
- if (l_ptr->owner->bclink.recv_permitted)
- tipc_bclink_update_link_state(l_ptr->owner,
- msg_last_bcast(msg));
-
- if (rec_gap || (msg_probe(msg)))
- tipc_link_proto_xmit(l_ptr, STATE_MSG, 0,
- rec_gap, 0, 0);
-
- if (msg_seq_gap(msg)) {
- l_ptr->stats.recv_nacks++;
- tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
- msg_seq_gap(msg));
- }
- if (tipc_link_is_up(l_ptr))
- tipc_node_fsm_evt(l_ptr->owner,
- PEER_ESTABL_CONTACT_EVT);
- break;
- }
-exit:
- kfree_skb(buf);
-}
-
/* tipc_link_build_proto_msg: prepare link protocol message for transmission
*/
static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
@@ -1727,6 +1566,96 @@ exit:
return *skb;
}
+/* tipc_link_proto_rcv(): receive link level protocol message :
+ * Note that network plane id propagates through the network, and may
+ * change at any time. The node with lowest numerical id determines
+ * network plane
+ */
+static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
+ struct sk_buff_head *xmitq)
+{
+ struct tipc_msg *hdr = buf_msg(skb);
+ u16 rcvgap = 0;
+ u16 nacked_gap = msg_seq_gap(hdr);
+ u16 peers_snd_nxt = msg_next_sent(hdr);
+ u16 peers_tol = msg_link_tolerance(hdr);
+ u16 peers_prio = msg_linkprio(hdr);
+ char *if_name;
+ int rc = 0;
+
+ if (l->exec_mode == TIPC_LINK_BLOCKED)
+ goto exit;
+
+ if (link_own_addr(l) > msg_prevnode(hdr))
+ l->net_plane = msg_net_plane(hdr);
+
+ switch (msg_type(hdr)) {
+ case RESET_MSG:
+
+ /* Ignore duplicate RESET with old session number */
+ if ((less_eq(msg_session(hdr), l->peer_session)) &&
+ (l->peer_session != WILDCARD_SESSION))
+ break;
+ /* fall thru' */
+ case ACTIVATE_MSG:
+
+ /* Complete own link name with peer's interface name */
+ if_name = strrchr(l->name, ':') + 1;
+ if (sizeof(l->name) - (if_name - l->name) <= TIPC_MAX_IF_NAME)
+ break;
+ if (msg_data_sz(hdr) < TIPC_MAX_IF_NAME)
+ break;
+ strncpy(if_name, msg_data(hdr), TIPC_MAX_IF_NAME);
+
+ /* Update own tolerance if peer indicates a non-zero value */
+ if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL))
+ l->tolerance = peers_tol;
+
+ /* Update own priority if peer's priority is higher */
+ if (in_range(peers_prio, l->priority + 1, TIPC_MAX_LINK_PRI))
+ l->priority = peers_prio;
+
+ l->peer_session = msg_session(hdr);
+ l->peer_bearer_id = msg_bearer_id(hdr);
+ rc = tipc_link_fsm_evt(l, msg_type(hdr), xmitq);
+ if (l->mtu > msg_max_pkt(hdr))
+ l->mtu = msg_max_pkt(hdr);
+ break;
+ case STATE_MSG:
+ /* Update own tolerance if peer indicates a non-zero value */
+ if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL))
+ l->tolerance = peers_tol;
+
+ l->silent_intv_cnt = 0;
+ l->stats.recv_states++;
+ if (msg_probe(hdr))
+ l->stats.recv_probes++;
+ rc = tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq);
+ if (!tipc_link_is_up(l))
+ break;
+
+ /* Has peer sent packets we haven't received yet ? */
+ if (more(peers_snd_nxt, l->rcv_nxt))
+ rcvgap = peers_snd_nxt - l->rcv_nxt;
+ if (rcvgap || (msg_probe(hdr)))
+ tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap,
+ 0, l->mtu, xmitq);
+ tipc_link_release_pkts(l, msg_ack(hdr));
+
+ /* If NACK, retransmit will now start at right position */
+ if (nacked_gap) {
+ rc |= tipc_link_retransm(l, nacked_gap, xmitq);
+ l->stats.recv_nacks++;
+ }
+ tipc_link_advance_backlog(l, xmitq);
+ if (unlikely(!skb_queue_empty(&l->wakeupq)))
+ link_prepare_wakeup(l);
+ }
+exit:
+ kfree_skb(skb);
+ return rc;
+}
+
void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
{
int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 0cf7d2b..37cfd7d 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -58,7 +58,7 @@ enum {
TIPC_LINK_TUNNEL
};
-/* Events occurring at packet reception or at timeout
+/* Events returned from link at packet reception or at timeout
*/
enum {
TIPC_LINK_UP_EVT = 1,
@@ -223,6 +223,7 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr);
void tipc_link_purge_backlog(struct tipc_link *l);
void tipc_link_reset_all(struct tipc_node *node);
void tipc_link_reset(struct tipc_link *l_ptr);
+void tipc_link_activate(struct tipc_link *link);
int __tipc_link_xmit(struct net *net, struct tipc_link *link,
struct sk_buff_head *list);
int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list,
@@ -244,7 +245,8 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info);
int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
void link_prepare_wakeup(struct tipc_link *l);
int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
-
+int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
+ struct sk_buff_head *xmitq);
static inline u32 link_own_addr(struct tipc_link *l)
{
return msg_prevnode(l->pmsg);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 4dc66d9..2f1563b 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -38,6 +38,7 @@
#define _TIPC_MSG_H
#include <linux/tipc.h>
+#include "core.h"
/*
* Constants and routines used to read and write TIPC payload message headers
@@ -658,12 +659,12 @@ static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
/*
* Word 5
*/
-static inline u32 msg_session(struct tipc_msg *m)
+static inline u16 msg_session(struct tipc_msg *m)
{
return msg_bits(m, 5, 16, 0xffff);
}
-static inline void msg_set_session(struct tipc_msg *m, u32 n)
+static inline void msg_set_session(struct tipc_msg *m, u16 n)
{
msg_set_bits(m, 5, 16, 0xffff, n);
}
@@ -766,10 +767,19 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
msg_set_bits(m, 9, 0, 0xffff, n);
}
-static inline bool msg_peer_is_up(struct tipc_msg *m)
+static inline bool msg_is_traffic(struct tipc_msg *m)
{
- if (likely(msg_user(m) != LINK_PROTOCOL) || (msg_type(m) == STATE_MSG))
+ if (likely(msg_user(m) != LINK_PROTOCOL))
return true;
+ if ((msg_type(m) == RESET_MSG) || (msg_type(m) == ACTIVATE_MSG))
+ return false;
+ return true;
+}
+
+static inline bool msg_peer_is_up(struct tipc_msg *m)
+{
+ if (likely(msg_is_traffic(m)))
+ return false;
return msg_redundant_link(m);
}
@@ -886,4 +896,36 @@ static inline bool tipc_skb_queue_tail(struct sk_buff_head *list,
return rv;
}
+/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
+ * @list: list to be appended to
+ * @skb: buffer to add
+ * Returns true if queue should be treated further, otherwise false
+ */
+static inline bool __tipc_skb_queue_sorted(struct sk_buff_head *list,
+ struct sk_buff *skb)
+{
+ struct sk_buff *_skb, *tmp;
+ struct tipc_msg *hdr = buf_msg(skb);
+ u16 seqno = msg_seqno(hdr);
+
+ if (skb_queue_empty(list) || (msg_user(hdr) == LINK_PROTOCOL)) {
+ __skb_queue_head(list, skb);
+ return true;
+ }
+ if (likely(less(seqno, buf_seqno(skb_peek(list))))) {
+ __skb_queue_head(list, skb);
+ return true;
+ }
+ if (!more(seqno, buf_seqno(skb_peek_tail(list)))) {
+ skb_queue_walk_safe(list, _skb, tmp) {
+ if (likely(less(seqno, buf_seqno(_skb)))) {
+ __skb_queue_before(list, _skb, skb);
+ return true;
+ }
+ }
+ }
+ __skb_queue_tail(list, skb);
+ return false;
+}
+
#endif
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 9dbbb5d..e92f84a 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -40,11 +40,13 @@
#include "name_distr.h"
#include "socket.h"
#include "bcast.h"
+#include "discover.h"
static void node_lost_contact(struct tipc_node *n_ptr);
static void node_established_contact(struct tipc_node *n_ptr);
static void tipc_node_delete(struct tipc_node *node);
static void tipc_node_timeout(unsigned long data);
+static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
struct tipc_sock_conn {
u32 port;
@@ -141,7 +143,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
break;
}
list_add_tail_rcu(&n_ptr->list, &temp_node->list);
- n_ptr->state = SELF_DOWN_PEER_DOWN;
+ n_ptr->state = SELF_DOWN_PEER_LEAVING;
n_ptr->signature = INVALID_NODE_SIG;
n_ptr->active_links[0] = INVALID_BEARER_ID;
n_ptr->active_links[1] = INVALID_BEARER_ID;
@@ -424,7 +426,7 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
/* tipc_node_fsm_evt - node finite state machine
* Determines when contact is allowed with peer node
*/
-void tipc_node_fsm_evt(struct tipc_node *n, int evt)
+static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
{
int state = n->state;
@@ -523,23 +525,36 @@ void tipc_node_fsm_evt(struct tipc_node *n, int evt)
n->state = state;
}
-bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_msg *hdr)
+bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_link *l,
+ struct tipc_msg *hdr)
{
int state = n->state;
if (likely(state == SELF_UP_PEER_UP))
return true;
+
if (state == SELF_DOWN_PEER_DOWN)
return true;
- if (state == SELF_UP_PEER_COMING)
+
+ if (state == SELF_UP_PEER_COMING) {
+ /* If not traffic msg, peer may still be ESTABLISHING */
+ if (tipc_link_is_up(l) && msg_is_traffic(hdr))
+ tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
return true;
+ }
+
if (state == SELF_COMING_PEER_UP)
return true;
+
if (state == SELF_LEAVING_PEER_DOWN)
return false;
- if (state == SELF_DOWN_PEER_LEAVING)
- if (!msg_peer_is_up(hdr))
- return true;
+
+ if (state == SELF_DOWN_PEER_LEAVING) {
+ if (msg_peer_is_up(hdr))
+ return false;
+ tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
+ return true;
+ }
return false;
}
@@ -819,6 +834,82 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
return 0;
}
+/**
+ * tipc_rcv - process TIPC packets/messages arriving from off-node
+ * @net: the applicable net namespace
+ * @skb: TIPC packet
+ * @bearer: pointer to bearer message arrived on
+ *
+ * Invoked with no locks held. Bearer pointer must point to a valid bearer
+ * structure (i.e. cannot be NULL), but bearer can be inactive.
+ */
+void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
+{
+ struct sk_buff_head xmitq;
+ struct tipc_node *n;
+ struct tipc_link *l;
+ struct tipc_msg *hdr;
+ struct tipc_media_addr *maddr;
+ int bearer_id = b->identity;
+ int rc = 0;
+
+ __skb_queue_head_init(&xmitq);
+
+ /* Ensure message is well-formed */
+ if (unlikely(!tipc_msg_validate(skb)))
+ goto discard;
+
+ /* Handle arrival of a non-unicast link packet */
+ hdr = buf_msg(skb);
+ if (unlikely(msg_non_seq(hdr))) {
+ if (msg_user(hdr) == LINK_CONFIG)
+ tipc_disc_rcv(net, skb, b);
+ else
+ tipc_bclink_rcv(net, skb);
+ return;
+ }
+
+ /* Locate neighboring node that sent packet */
+ n = tipc_node_find(net, msg_prevnode(hdr));
+ if (unlikely(!n))
+ goto discard;
+ tipc_node_lock(n);
+
+ /* Locate link endpoint that should handle packet */
+ l = n->links[bearer_id].link;
+ if (unlikely(!l))
+ goto unlock;
+
+ /* Is reception of this packet permitted at the moment ? */
+ if (unlikely(n->state != SELF_UP_PEER_UP))
+ if (!tipc_node_filter_skb(n, l, hdr))
+ goto unlock;
+
+ if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
+ tipc_bclink_sync_state(n, hdr);
+
+ /* Release acked broadcast messages */
+ if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
+ tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
+
+ /* Check protocol and update link state */
+ rc = tipc_link_rcv(l, skb, &xmitq);
+
+ if (unlikely(rc & TIPC_LINK_UP_EVT))
+ tipc_link_activate(l);
+ if (unlikely(rc & TIPC_LINK_DOWN_EVT))
+ tipc_link_reset(l);
+ skb = NULL;
+unlock:
+ tipc_node_unlock(n);
+ tipc_sk_rcv(net, &n->links[bearer_id].inputq);
+ maddr = &n->links[bearer_id].maddr;
+ tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
+ tipc_node_put(n);
+discard:
+ kfree_skb(skb);
+}
+
int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
{
int err;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 270256e..5e70168 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -185,7 +185,6 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
u32 selector);
int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
-
int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);
static inline void tipc_node_lock(struct tipc_node *node)
@@ -193,9 +192,6 @@ static inline void tipc_node_lock(struct tipc_node *node)
spin_lock_bh(&node->lock);
}
-void tipc_node_fsm_evt(struct tipc_node *n, int evt);
-bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_msg *hdr);
-
static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel)
{
int bearer_id = n->active_links[sel & 1];
--
1.9.1
^ permalink raw reply related [flat|nested] 15+ messages in thread
* Re: [PATCH net-next 00/13] tipc: separate link and link aggregation layer
2015-07-16 20:54 [PATCH net-next 00/13] tipc: separate link and link aggregation layer Jon Maloy
` (12 preceding siblings ...)
2015-07-16 20:54 ` [PATCH net-next 13/13] tipc: reduce locking scope during packet reception Jon Maloy
@ 2015-07-21 3:41 ` David Miller
13 siblings, 0 replies; 15+ messages in thread
From: David Miller @ 2015-07-21 3:41 UTC (permalink / raw)
To: jon.maloy
Cc: netdev, paul.gortmaker, erik.hugne, ying.xue, maloy,
tipc-discussion
From: Jon Maloy <jon.maloy@ericsson.com>
Date: Thu, 16 Jul 2015 16:54:18 -0400
> This is the first batch of a longer series that has two main objectives:
>
> o Finer lock granularity during message sending and reception,
> especially regarding usage of the node spinlock.
>
> o Better separation between the link layer implementation and the link
> aggregation layer, represented by node.c::struct tipc_node.
>
> Hopefully these changes also make this part of code somewhat easier
> to comprehend and maintain.
Series applied, thanks Jon.
^ permalink raw reply [flat|nested] 15+ messages in thread