* [PATCH net-next 1/9] tipc: fix race/inefficiencies in poll/wait behaviour
2012-11-22 20:59 [PATCH net-next 0/9] tipc: updates for what will be v3.8 Paul Gortmaker
@ 2012-11-22 20:59 ` Paul Gortmaker
2012-11-22 20:59 ` [PATCH net-next 2/9] tipc: return POLLOUT for sockets in an unconnected state Paul Gortmaker
` (8 subsequent siblings)
9 siblings, 0 replies; 11+ messages in thread
From: Paul Gortmaker @ 2012-11-22 20:59 UTC (permalink / raw)
To: David Miller; +Cc: netdev, Ying Xue, Paul Gortmaker
From: Ying Xue <ying.xue@windriver.com>
When an application blocks at poll/select on a TIPC socket
while requesting a specific event mask, both the filter_rcv() and
wakeupdispatch() cases will wake it up unconditionally whenever
the state changes (i.e. an incoming message arrives, or congestion
has subsided). No mask is used.
To avoid this, we populate sk->sk_data_ready and sk->sk_write_space
with tipc_data_ready and tipc_write_space respectively, which makes
tipc more in alignment with the rest of the networking code. These
pass the exact set of possible events to the waker in fs/select.c
hence avoiding waking up blocked processes unnecessarily.
In doing so, we uncover another issue -- that there needs to be a
memory barrier in these poll/receive callbacks, otherwise we are
subject to the same race as documented above wq_has_sleeper()
[in commit a57de0b4 "net: adding memory barrier to the poll and
receive callbacks"]. So we need to replace poll_wait() with
sock_poll_wait() and use rcu protection for the sk->sk_wq pointer
in these two new functions.
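As a rough illustration of why passing an event key matters, here is a
small user-space model (not kernel code). The names struct waiter,
wake_unkeyed() and wake_keyed(), and the EV_* bits, are hypothetical and
exist only for this sketch; the assumption is that a keyed wake-up skips
any sleeper whose requested mask does not intersect the key.

```c
#include <assert.h>
#include <stddef.h>

/* Illustrative event bits; not the kernel's POLL* values. */
enum { EV_IN = 0x1, EV_OUT = 0x2 };

/* Hypothetical stand-in for one process sleeping in poll()/select(). */
struct waiter {
	unsigned int wanted;	/* events the sleeper asked for */
	int wakeups;		/* how many times it has been woken */
};

/* Unkeyed wake-up: every sleeper is woken, whatever it asked for. */
static int wake_unkeyed(struct waiter *w, size_t n)
{
	assert(w != NULL || n == 0);
	for (size_t i = 0; i < n; i++)
		w[i].wakeups++;
	return (int)n;
}

/* Keyed wake-up: only sleepers whose mask intersects the key are woken. */
static int wake_keyed(struct waiter *w, size_t n, unsigned int key)
{
	int woken = 0;

	assert(w != NULL || n == 0);
	for (size_t i = 0; i < n; i++) {
		if (!(w[i].wanted & key))
			continue;	/* not interested: keep sleeping */
		w[i].wakeups++;
		woken++;
	}
	return woken;
}
```

In this model a sleeper waiting only for EV_IN is left alone when the
key is EV_OUT, which is the effect we want from the two new callbacks.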
Signed-off-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
---
net/tipc/socket.c | 45 ++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 40 insertions(+), 5 deletions(-)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index fd5f042..b5fc8ed 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -62,6 +62,8 @@ struct tipc_sock {
static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
static void wakeupdispatch(struct tipc_port *tport);
+static void tipc_data_ready(struct sock *sk, int len);
+static void tipc_write_space(struct sock *sk);
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
@@ -221,6 +223,8 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
sock_init_data(sock, sk);
sk->sk_backlog_rcv = backlog_rcv;
sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2;
+ sk->sk_data_ready = tipc_data_ready;
+ sk->sk_write_space = tipc_write_space;
tipc_sk(sk)->p = tp_ptr;
tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
@@ -435,7 +439,7 @@ static unsigned int poll(struct file *file, struct socket *sock,
struct sock *sk = sock->sk;
u32 mask = 0;
- poll_wait(file, sk_sleep(sk), wait);
+ sock_poll_wait(file, sk_sleep(sk), wait);
switch ((int)sock->state) {
case SS_READY:
@@ -1126,6 +1130,39 @@ exit:
}
/**
+ * tipc_write_space - wake up thread if port congestion is released
+ * @sk: socket
+ */
+static void tipc_write_space(struct sock *sk)
+{
+ struct socket_wq *wq;
+
+ rcu_read_lock();
+ wq = rcu_dereference(sk->sk_wq);
+ if (wq_has_sleeper(wq))
+ wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
+ POLLWRNORM | POLLWRBAND);
+ rcu_read_unlock();
+}
+
+/**
+ * tipc_data_ready - wake up threads to indicate messages have been received
+ * @sk: socket
+ * @len: the length of messages
+ */
+static void tipc_data_ready(struct sock *sk, int len)
+{
+ struct socket_wq *wq;
+
+ rcu_read_lock();
+ wq = rcu_dereference(sk->sk_wq);
+ if (wq_has_sleeper(wq))
+ wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
+ POLLRDNORM | POLLRDBAND);
+ rcu_read_unlock();
+}
+
+/**
* rx_queue_full - determine if receive queue can accept another message
* @msg: message to be added to queue
* @queue_size: current size of queue
@@ -1222,8 +1259,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
tipc_disconnect_port(tipc_sk_port(sk));
}
- if (waitqueue_active(sk_sleep(sk)))
- wake_up_interruptible(sk_sleep(sk));
+ sk->sk_data_ready(sk, 0);
return TIPC_OK;
}
@@ -1290,8 +1326,7 @@ static void wakeupdispatch(struct tipc_port *tport)
{
struct sock *sk = (struct sock *)tport->usr_handle;
- if (waitqueue_active(sk_sleep(sk)))
- wake_up_interruptible(sk_sleep(sk));
+ sk->sk_write_space(sk);
}
/**
--
1.7.12.1
* [PATCH net-next 2/9] tipc: return POLLOUT for sockets in an unconnected state
2012-11-22 20:59 [PATCH net-next 0/9] tipc: updates for what will be v3.8 Paul Gortmaker
2012-11-22 20:59 ` [PATCH net-next 1/9] tipc: fix race/inefficiencies in poll/wait behaviour Paul Gortmaker
@ 2012-11-22 20:59 ` Paul Gortmaker
2012-11-22 20:59 ` [PATCH net-next 3/9] tipc: wake up all waiting threads at socket shutdown Paul Gortmaker
` (7 subsequent siblings)
9 siblings, 0 replies; 11+ messages in thread
From: Paul Gortmaker @ 2012-11-22 20:59 UTC (permalink / raw)
To: David Miller; +Cc: netdev, Erik Hugne, Paul Gortmaker
From: Erik Hugne <erik.hugne@ericsson.com>
If an implied connect is attempted on a nonblocking STREAM/SEQPACKET
socket during link congestion, the connect message will be discarded
and sendmsg will return EAGAIN. This is normal behavior, and the
application is expected to poll the socket until POLLOUT is set,
after which the connection attempt can be retried.
However, the POLLOUT flag is never set for unconnected sockets and
poll() always returns a zero mask. The application is then left without
a trigger for when it can make another attempt at sending the message.
The solution is to check if we're polling on an unconnected socket
and set the POLLOUT flag if the TIPC port owned by this socket
is not congested. The TIPC ports waiting on a specific link will be
marked as 'not congested' when the link congestion has abated.
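For reference, the application-side pattern described above might look
roughly like the sketch below. The helper name send_when_writable() and
its timeout_ms/max_tries parameters are made up for this example; it
uses only the generic send()/poll() calls and is not TIPC specific.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>

/*
 * Hypothetical helper: attempt a nonblocking send; on EAGAIN wait until
 * poll() reports POLLOUT (or the timeout expires) and then retry, for at
 * most max_tries attempts. Returns the number of bytes sent, or -1.
 */
static ssize_t send_when_writable(int fd, const void *buf, size_t len,
				  int timeout_ms, int max_tries)
{
	assert(buf != NULL || len == 0);

	for (int i = 0; i < max_tries; i++) {
		ssize_t n = send(fd, buf, len, MSG_DONTWAIT);

		if (n >= 0)
			return n;
		if (errno != EAGAIN && errno != EWOULDBLOCK)
			return -1;	/* a real error, give up */

		/* Sleep until the socket is writable, then try again. */
		struct pollfd pfd = { .fd = fd, .events = POLLOUT };
		int rc = poll(&pfd, 1, timeout_ms);

		if (rc < 0 && errno != EINTR)
			return -1;
	}
	errno = EAGAIN;
	return -1;
}
```

Without POLLOUT being reported for an unconnected socket, the poll()
call in such a loop can only ever return on timeout.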
Signed-off-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
---
net/tipc/socket.c | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index b5fc8ed..59adc76 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -412,7 +412,7 @@ static int get_name(struct socket *sock, struct sockaddr *uaddr,
* socket state flags set
* ------------ ---------
* unconnected no read flags
- * no write flags
+ * POLLOUT if port is not congested
*
* connecting POLLIN/POLLRDNORM if ACK/NACK in rx queue
* no write flags
@@ -442,6 +442,10 @@ static unsigned int poll(struct file *file, struct socket *sock,
sock_poll_wait(file, sk_sleep(sk), wait);
switch ((int)sock->state) {
+ case SS_UNCONNECTED:
+ if (!tipc_sk_port(sk)->congested)
+ mask |= POLLOUT;
+ break;
case SS_READY:
case SS_CONNECTED:
if (!tipc_sk_port(sk)->congested)
--
1.7.12.1
* [PATCH net-next 3/9] tipc: wake up all waiting threads at socket shutdown
2012-11-22 20:59 [PATCH net-next 0/9] tipc: updates for what will be v3.8 Paul Gortmaker
2012-11-22 20:59 ` [PATCH net-next 1/9] tipc: fix race/inefficiencies in poll/wait behaviour Paul Gortmaker
2012-11-22 20:59 ` [PATCH net-next 2/9] tipc: return POLLOUT for sockets in an unconnected state Paul Gortmaker
@ 2012-11-22 20:59 ` Paul Gortmaker
2012-11-22 20:59 ` [PATCH net-next 4/9] tipc: remove the bearer congestion mechanism Paul Gortmaker
` (6 subsequent siblings)
9 siblings, 0 replies; 11+ messages in thread
From: Paul Gortmaker @ 2012-11-22 20:59 UTC (permalink / raw)
To: David Miller; +Cc: netdev, Ying Xue, Jon Maloy, Paul Gortmaker
From: Ying Xue <ying.xue@windriver.com>
When a socket is shut down, we should wake up all threads sleeping on
it, instead of just one of them. Otherwise, when several threads are
polling the same socket, and one of them does shutdown(), the
remaining threads may end up sleeping forever.
Also, to align socket usage with common practice in other stacks, we
use one of the common socket callback handlers, sk_state_change(),
to wake up pending users. This is similar to the usage in e.g.
inet_shutdown() [net/ipv4/af_inet.c].
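To illustrate the difference between a wake-up that stops after one
exclusive waiter and one that wakes every sleeper, here is a simplified
user-space model. struct waiter and wake_up_model() are hypothetical
names, and the nr_exclusive convention (0 meaning "no limit") is an
assumption made for this sketch rather than a copy of the kernel code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for one entry on a wait queue. */
struct waiter {
	bool exclusive;	/* true: counts against the nr_exclusive limit */
	bool awake;
};

/*
 * Walk the queue in order and wake sleepers. Non-exclusive waiters are
 * always woken; the walk stops once nr_exclusive exclusive waiters have
 * been woken. Passing 0 never hits the limit, so everybody is woken.
 * Returns the number of waiters woken by this call.
 */
static int wake_up_model(struct waiter *q, size_t n, int nr_exclusive)
{
	int woken = 0;

	assert(q != NULL || n == 0);
	for (size_t i = 0; i < n; i++) {
		if (q[i].awake)
			continue;
		q[i].awake = true;
		woken++;
		if (q[i].exclusive && --nr_exclusive == 0)
			break;	/* limit reached: the rest keep sleeping */
	}
	return woken;
}
```

With a limit of 1, the second exclusive waiter in the queue stays
asleep; with 0 it is woken as well, which is what we want at shutdown.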
Signed-off-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
---
net/tipc/socket.c | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 59adc76..1a720c8 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1595,10 +1595,11 @@ restart:
case SS_DISCONNECTING:
- /* Discard any unreceived messages; wake up sleeping tasks */
+ /* Discard any unreceived messages */
discard_rx_queue(sk);
- if (waitqueue_active(sk_sleep(sk)))
- wake_up_interruptible(sk_sleep(sk));
+
+ /* Wake up anyone sleeping in poll */
+ sk->sk_state_change(sk);
res = 0;
break;
--
1.7.12.1
* [PATCH net-next 4/9] tipc: remove the bearer congestion mechanism
2012-11-22 20:59 [PATCH net-next 0/9] tipc: updates for what will be v3.8 Paul Gortmaker
` (2 preceding siblings ...)
2012-11-22 20:59 ` [PATCH net-next 3/9] tipc: wake up all waiting threads at socket shutdown Paul Gortmaker
@ 2012-11-22 20:59 ` Paul Gortmaker
2012-11-22 20:59 ` [PATCH net-next 5/9] tipc: remove supportable flag from bclink structure Paul Gortmaker
` (5 subsequent siblings)
9 siblings, 0 replies; 11+ messages in thread
From: Paul Gortmaker @ 2012-11-22 20:59 UTC (permalink / raw)
To: David Miller; +Cc: netdev, Ying Xue, Paul Gortmaker
From: Ying Xue <ying.xue@windriver.com>
Currently at the TIPC bearer layer there is the following congestion
mechanism:
Once sending packets has failed via that bearer, the bearer will be
flagged as being in congested state at once. During bearer congestion,
all packets arriving at the link will be queued on the link's outgoing
buffer. When we detect that the state of bearer congestion has
relaxed (e.g. some packets are received from the bearer) we will try
our best to push all packets in the link's outgoing buffer until the
buffer is empty, or until the bearer is congested again.
However, the TIPC bearer never receives any feedback from the device
layer on whether a send was successful or not, so it must always
assume it was successful. Therefore, the bearer congestion mechanism
as it exists currently is of no value.
But the bearer blocking state is still useful for us. For example,
when the physical media goes down/up, we need to change the state of
the links bound to the bearer. So the code maintaining the state
information is not removed.
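The resulting send path can be pictured with the small user-space model
below. The types bearer_model and link_model and the two functions are
hypothetical and only mirror the idea: check the blocked flag, otherwise
hand the packet to the media without looking at any return status.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for a bearer and a link bound to it. */
struct bearer_model {
	bool blocked;		/* e.g. physical media is down */
	int handed_to_media;	/* packets passed to the media layer */
};

struct link_model {
	struct bearer_model *b;
	int queued;		/* packets waiting in the outgoing buffer */
};

/* Returns true if the packet was handed to the media, false if queued. */
static bool link_send_model(struct link_model *l)
{
	assert(l != NULL && l->b != NULL);
	if (l->b->blocked) {
		l->queued++;
		return false;
	}
	/* No status is checked: the send is assumed to succeed. */
	l->b->handed_to_media++;
	return true;
}

/* Drain the link's queue for as long as the bearer is not blocked. */
static int link_push_model(struct link_model *l)
{
	int pushed = 0;

	assert(l != NULL && l->b != NULL);
	while (!l->b->blocked && l->queued > 0) {
		l->queued--;
		l->b->handed_to_media++;
		pushed++;
	}
	return pushed;
}
```

The only state consulted in this model is the blocked flag, which is
why that flag is kept while the congestion bookkeeping goes away.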
Signed-off-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
---
net/tipc/bcast.c | 21 ++++------
net/tipc/bearer.c | 110 +++++-------------------------------------------
net/tipc/bearer.h | 24 ++---------
net/tipc/discover.c | 2 +-
net/tipc/link.c | 117 +++++++++++++++++-----------------------------------
net/tipc/link.h | 4 --
6 files changed, 61 insertions(+), 217 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index e4e6d8c..40da098 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -619,16 +619,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf,
if (bcbearer->remains_new.count == bcbearer->remains.count)
continue; /* bearer pair doesn't add anything */
- if (p->blocked ||
- p->media->send_msg(buf, p, &p->media->bcast_addr)) {
+ if (!tipc_bearer_blocked(p))
+ tipc_bearer_send(p, buf, &p->media->bcast_addr);
+ else if (s && !tipc_bearer_blocked(s))
/* unable to send on primary bearer */
- if (!s || s->blocked ||
- s->media->send_msg(buf, s,
- &s->media->bcast_addr)) {
- /* unable to send on either bearer */
- continue;
- }
- }
+ tipc_bearer_send(s, buf, &s->media->bcast_addr);
+ else
+ /* unable to send on either bearer */
+ continue;
if (s) {
bcbearer->bpairs[bp_index].primary = s;
@@ -731,8 +729,8 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
" TX naks:%u acks:%u dups:%u\n",
s->sent_nacks, s->sent_acks, s->retransmitted);
ret += tipc_snprintf(buf + ret, buf_size - ret,
- " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n",
- s->bearer_congs, s->link_congs, s->max_queue_sz,
+ " Congestion link:%u Send queue max:%u avg:%u\n",
+ s->link_congs, s->max_queue_sz,
s->queue_sz_counts ?
(s->accu_queue_sz / s->queue_sz_counts) : 0);
@@ -766,7 +764,6 @@ int tipc_bclink_set_queue_limits(u32 limit)
void tipc_bclink_init(void)
{
- INIT_LIST_HEAD(&bcbearer->bearer.cong_links);
bcbearer->bearer.media = &bcbearer->media;
bcbearer->media.send_msg = tipc_bcbearer_send;
sprintf(bcbearer->media.name, "tipc-broadcast");
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 4ec5c80..aa62f93 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -279,116 +279,31 @@ void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest)
}
/*
- * bearer_push(): Resolve bearer congestion. Force the waiting
- * links to push out their unsent packets, one packet per link
- * per iteration, until all packets are gone or congestion reoccurs.
- * 'tipc_net_lock' is read_locked when this function is called
- * bearer.lock must be taken before calling
- * Returns binary true(1) ore false(0)
- */
-static int bearer_push(struct tipc_bearer *b_ptr)
-{
- u32 res = 0;
- struct tipc_link *ln, *tln;
-
- if (b_ptr->blocked)
- return 0;
-
- while (!list_empty(&b_ptr->cong_links) && (res != PUSH_FAILED)) {
- list_for_each_entry_safe(ln, tln, &b_ptr->cong_links, link_list) {
- res = tipc_link_push_packet(ln);
- if (res == PUSH_FAILED)
- break;
- if (res == PUSH_FINISHED)
- list_move_tail(&ln->link_list, &b_ptr->links);
- }
- }
- return list_empty(&b_ptr->cong_links);
-}
-
-void tipc_bearer_lock_push(struct tipc_bearer *b_ptr)
-{
- spin_lock_bh(&b_ptr->lock);
- bearer_push(b_ptr);
- spin_unlock_bh(&b_ptr->lock);
-}
-
-
-/*
- * Interrupt enabling new requests after bearer congestion or blocking:
+ * Interrupt enabling new requests after bearer blocking:
* See bearer_send().
*/
-void tipc_continue(struct tipc_bearer *b_ptr)
+void tipc_continue(struct tipc_bearer *b)
{
- spin_lock_bh(&b_ptr->lock);
- if (!list_empty(&b_ptr->cong_links))
- tipc_k_signal((Handler)tipc_bearer_lock_push, (unsigned long)b_ptr);
- b_ptr->blocked = 0;
- spin_unlock_bh(&b_ptr->lock);
+ spin_lock_bh(&b->lock);
+ b->blocked = 0;
+ spin_unlock_bh(&b->lock);
}
/*
- * Schedule link for sending of messages after the bearer
- * has been deblocked by 'continue()'. This method is called
- * when somebody tries to send a message via this link while
- * the bearer is congested. 'tipc_net_lock' is in read_lock here
- * bearer.lock is busy
+ * tipc_bearer_blocked - determines if bearer is currently blocked
*/
-static void tipc_bearer_schedule_unlocked(struct tipc_bearer *b_ptr,
- struct tipc_link *l_ptr)
+int tipc_bearer_blocked(struct tipc_bearer *b)
{
- list_move_tail(&l_ptr->link_list, &b_ptr->cong_links);
-}
-
-/*
- * Schedule link for sending of messages after the bearer
- * has been deblocked by 'continue()'. This method is called
- * when somebody tries to send a message via this link while
- * the bearer is congested. 'tipc_net_lock' is in read_lock here,
- * bearer.lock is free
- */
-void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
-{
- spin_lock_bh(&b_ptr->lock);
- tipc_bearer_schedule_unlocked(b_ptr, l_ptr);
- spin_unlock_bh(&b_ptr->lock);
-}
-
+ int res;
-/*
- * tipc_bearer_resolve_congestion(): Check if there is bearer congestion,
- * and if there is, try to resolve it before returning.
- * 'tipc_net_lock' is read_locked when this function is called
- */
-int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr,
- struct tipc_link *l_ptr)
-{
- int res = 1;
+ spin_lock_bh(&b->lock);
+ res = b->blocked;
+ spin_unlock_bh(&b->lock);
- if (list_empty(&b_ptr->cong_links))
- return 1;
- spin_lock_bh(&b_ptr->lock);
- if (!bearer_push(b_ptr)) {
- tipc_bearer_schedule_unlocked(b_ptr, l_ptr);
- res = 0;
- }
- spin_unlock_bh(&b_ptr->lock);
return res;
}
/**
- * tipc_bearer_congested - determines if bearer is currently congested
- */
-int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
-{
- if (unlikely(b_ptr->blocked))
- return 1;
- if (likely(list_empty(&b_ptr->cong_links)))
- return 0;
- return !tipc_bearer_resolve_congestion(b_ptr, l_ptr);
-}
-
-/**
* tipc_enable_bearer - enable bearer with the given name
*/
int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
@@ -489,7 +404,6 @@ restart:
b_ptr->net_plane = bearer_id + 'A';
b_ptr->active = 1;
b_ptr->priority = priority;
- INIT_LIST_HEAD(&b_ptr->cong_links);
INIT_LIST_HEAD(&b_ptr->links);
spin_lock_init(&b_ptr->lock);
@@ -528,7 +442,6 @@ int tipc_block_bearer(const char *name)
pr_info("Blocking bearer <%s>\n", name);
spin_lock_bh(&b_ptr->lock);
b_ptr->blocked = 1;
- list_splice_init(&b_ptr->cong_links, &b_ptr->links);
list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
struct tipc_node *n_ptr = l_ptr->owner;
@@ -555,7 +468,6 @@ static void bearer_disable(struct tipc_bearer *b_ptr)
spin_lock_bh(&b_ptr->lock);
b_ptr->blocked = 1;
b_ptr->media->disable_bearer(b_ptr);
- list_splice_init(&b_ptr->cong_links, &b_ptr->links);
list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
tipc_link_delete(l_ptr);
}
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index dd4c2ab..39f1192 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -120,7 +120,6 @@ struct tipc_media {
* @identity: array index of this bearer within TIPC bearer array
* @link_req: ptr to (optional) structure making periodic link setup requests
* @links: list of non-congested links associated with bearer
- * @cong_links: list of congested links associated with bearer
* @active: non-zero if bearer structure is represents a bearer
* @net_plane: network plane ('A' through 'H') currently associated with bearer
* @nodes: indicates which nodes in cluster can be reached through bearer
@@ -143,7 +142,6 @@ struct tipc_bearer {
u32 identity;
struct tipc_link_req *link_req;
struct list_head links;
- struct list_head cong_links;
int active;
char net_plane;
struct tipc_node_map nodes;
@@ -185,39 +183,23 @@ struct sk_buff *tipc_media_get_names(void);
struct sk_buff *tipc_bearer_get_names(void);
void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest);
void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest);
-void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr);
struct tipc_bearer *tipc_bearer_find(const char *name);
struct tipc_bearer *tipc_bearer_find_interface(const char *if_name);
struct tipc_media *tipc_media_find(const char *name);
-int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr,
- struct tipc_link *l_ptr);
-int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr);
+int tipc_bearer_blocked(struct tipc_bearer *b_ptr);
void tipc_bearer_stop(void);
-void tipc_bearer_lock_push(struct tipc_bearer *b_ptr);
-
/**
* tipc_bearer_send- sends buffer to destination over bearer
*
- * Returns true (1) if successful, or false (0) if unable to send
- *
* IMPORTANT:
* The media send routine must not alter the buffer being passed in
* as it may be needed for later retransmission!
- *
- * If the media send routine returns a non-zero value (indicating that
- * it was unable to send the buffer), it must:
- * 1) mark the bearer as blocked,
- * 2) call tipc_continue() once the bearer is able to send again.
- * Media types that are unable to meet these two critera must ensure their
- * send routine always returns success -- even if the buffer was not sent --
- * and let TIPC's link code deal with the undelivered message.
*/
-static inline int tipc_bearer_send(struct tipc_bearer *b_ptr,
- struct sk_buff *buf,
+static inline void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf,
struct tipc_media_addr *dest)
{
- return !b_ptr->media->send_msg(buf, b_ptr, dest);
+ b->media->send_msg(buf, b, dest);
}
#endif /* _TIPC_BEARER_H */
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index 50eaa40..1074b95 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -243,7 +243,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) {
rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr);
if (rbuf) {
- b_ptr->media->send_msg(rbuf, b_ptr, &media_addr);
+ tipc_bearer_send(b_ptr, rbuf, &media_addr);
kfree_skb(rbuf);
}
}
diff --git a/net/tipc/link.c b/net/tipc/link.c
index a79c755..0cc6480 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -872,17 +872,12 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
return link_send_long_buf(l_ptr, buf);
/* Packet can be queued or sent. */
- if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
+ if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) &&
!link_congested(l_ptr))) {
link_add_to_outqueue(l_ptr, buf, msg);
- if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
- l_ptr->unacked_window = 0;
- } else {
- tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
- l_ptr->stats.bearer_congs++;
- l_ptr->next_out = buf;
- }
+ tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ l_ptr->unacked_window = 0;
return dsz;
}
/* Congestion: can message be bundled ? */
@@ -891,10 +886,8 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
/* Try adding message to an existing bundle */
if (l_ptr->next_out &&
- link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
- tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
+ link_bundle_buf(l_ptr, l_ptr->last_out, buf))
return dsz;
- }
/* Try creating a new bundle */
if (size <= max_packet * 2 / 3) {
@@ -917,7 +910,6 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
if (!l_ptr->next_out)
l_ptr->next_out = buf;
link_add_to_outqueue(l_ptr, buf, msg);
- tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
return dsz;
}
@@ -1006,16 +998,11 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
if (likely(!link_congested(l_ptr))) {
if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
- if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
+ if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) {
link_add_to_outqueue(l_ptr, buf, msg);
- if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
- &l_ptr->media_addr))) {
- l_ptr->unacked_window = 0;
- return res;
- }
- tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
- l_ptr->stats.bearer_congs++;
- l_ptr->next_out = buf;
+ tipc_bearer_send(l_ptr->b_ptr, buf,
+ &l_ptr->media_addr);
+ l_ptr->unacked_window = 0;
return res;
}
} else
@@ -1106,7 +1093,7 @@ exit:
/* Exit if link (or bearer) is congested */
if (link_congested(l_ptr) ||
- !list_empty(&l_ptr->b_ptr->cong_links)) {
+ tipc_bearer_blocked(l_ptr->b_ptr)) {
res = link_schedule_port(l_ptr,
sender->ref, res);
goto exit;
@@ -1329,15 +1316,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
if (r_q_size && buf) {
msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
- if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
- l_ptr->retransm_queue_head = mod(++r_q_head);
- l_ptr->retransm_queue_size = --r_q_size;
- l_ptr->stats.retransmitted++;
- return 0;
- } else {
- l_ptr->stats.bearer_congs++;
- return PUSH_FAILED;
- }
+ tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ l_ptr->retransm_queue_head = mod(++r_q_head);
+ l_ptr->retransm_queue_size = --r_q_size;
+ l_ptr->stats.retransmitted++;
+ return 0;
}
/* Send deferred protocol message, if any: */
@@ -1345,15 +1328,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
if (buf) {
msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
- if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
- l_ptr->unacked_window = 0;
- kfree_skb(buf);
- l_ptr->proto_msg_queue = NULL;
- return 0;
- } else {
- l_ptr->stats.bearer_congs++;
- return PUSH_FAILED;
- }
+ tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ l_ptr->unacked_window = 0;
+ kfree_skb(buf);
+ l_ptr->proto_msg_queue = NULL;
+ return 0;
}
/* Send one deferred data message, if send window not full: */
@@ -1366,18 +1345,14 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
if (mod(next - first) < l_ptr->queue_limit[0]) {
msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
- if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
- if (msg_user(msg) == MSG_BUNDLER)
- msg_set_type(msg, CLOSED_MSG);
- l_ptr->next_out = buf->next;
- return 0;
- } else {
- l_ptr->stats.bearer_congs++;
- return PUSH_FAILED;
- }
+ tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ if (msg_user(msg) == MSG_BUNDLER)
+ msg_set_type(msg, CLOSED_MSG);
+ l_ptr->next_out = buf->next;
+ return 0;
}
}
- return PUSH_FINISHED;
+ return 1;
}
/*
@@ -1388,15 +1363,12 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)
{
u32 res;
- if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
+ if (tipc_bearer_blocked(l_ptr->b_ptr))
return;
do {
res = tipc_link_push_packet(l_ptr);
} while (!res);
-
- if (res == PUSH_FAILED)
- tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
}
static void link_reset_all(unsigned long addr)
@@ -1481,7 +1453,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
msg = buf_msg(buf);
- if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
+ if (tipc_bearer_blocked(l_ptr->b_ptr)) {
if (l_ptr->retransm_queue_size == 0) {
l_ptr->retransm_queue_head = msg_seqno(msg);
l_ptr->retransm_queue_size = retransmits;
@@ -1491,7 +1463,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
}
return;
} else {
- /* Detect repeated retransmit failures on uncongested bearer */
+ /* Detect repeated retransmit failures on unblocked bearer */
if (l_ptr->last_retransmitted == msg_seqno(msg)) {
if (++l_ptr->stale_count > 100) {
link_retransmit_failure(l_ptr, buf);
@@ -1507,17 +1479,10 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
msg = buf_msg(buf);
msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
- if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
- buf = buf->next;
- retransmits--;
- l_ptr->stats.retransmitted++;
- } else {
- tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
- l_ptr->stats.bearer_congs++;
- l_ptr->retransm_queue_head = buf_seqno(buf);
- l_ptr->retransm_queue_size = retransmits;
- return;
- }
+ tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+ buf = buf->next;
+ retransmits--;
+ l_ptr->stats.retransmitted++;
}
l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
@@ -1972,21 +1937,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
- /* Defer message if bearer is already congested */
- if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
- l_ptr->proto_msg_queue = buf;
- return;
- }
-
- /* Defer message if attempting to send results in bearer congestion */
- if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
- tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
+ /* Defer message if bearer is already blocked */
+ if (tipc_bearer_blocked(l_ptr->b_ptr)) {
l_ptr->proto_msg_queue = buf;
- l_ptr->stats.bearer_congs++;
return;
}
- /* Discard message if it was sent successfully */
+ tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
l_ptr->unacked_window = 0;
kfree_skb(buf);
}
@@ -2937,8 +2894,8 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
s->sent_nacks, s->sent_acks, s->retransmitted);
ret += tipc_snprintf(buf + ret, buf_size - ret,
- " Congestion bearer:%u link:%u Send queue"
- " max:%u avg:%u\n", s->bearer_congs, s->link_congs,
+ " Congestion link:%u Send queue"
+ " max:%u avg:%u\n", s->link_congs,
s->max_queue_sz, s->queue_sz_counts ?
(s->accu_queue_sz / s->queue_sz_counts) : 0);
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 6e92112..c048ed1 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -40,9 +40,6 @@
#include "msg.h"
#include "node.h"
-#define PUSH_FAILED 1
-#define PUSH_FINISHED 2
-
/*
* Out-of-range value for link sequence numbers
*/
@@ -82,7 +79,6 @@ struct tipc_stats {
u32 recv_fragmented;
u32 recv_fragments;
u32 link_congs; /* # port sends blocked by congestion */
- u32 bearer_congs;
u32 deferred_recv;
u32 duplicates;
u32 max_queue_sz; /* send queue size high water mark */
--
1.7.12.1
* [PATCH net-next 5/9] tipc: remove supportable flag from bclink structure
2012-11-22 20:59 [PATCH net-next 0/9] tipc: updates for what will be v3.8 Paul Gortmaker
` (3 preceding siblings ...)
2012-11-22 20:59 ` [PATCH net-next 4/9] tipc: remove the bearer congestion mechanism Paul Gortmaker
@ 2012-11-22 20:59 ` Paul Gortmaker
2012-11-22 20:59 ` [PATCH net-next 6/9] tipc: rename supported flag to recv_permitted Paul Gortmaker
` (4 subsequent siblings)
9 siblings, 0 replies; 11+ messages in thread
From: Paul Gortmaker @ 2012-11-22 20:59 UTC (permalink / raw)
To: David Miller; +Cc: netdev, Ying Xue, Jon Maloy, Paul Gortmaker
From: Ying Xue <ying.xue@windriver.com>
The "supportable" flag in bclink structure is a compatibility flag
indicating whether a peer node is capable of receiving TIPC broadcast
messages. However, all TIPC versions since tipc-1.5, and after the
inclusion in the upstream Linux kernel in 2006, support this capability.
It is highly unlikely that anybody is still using such an old
version of TIPC, let alone that they want to mix it with TIPC-2.0
nodes. Therefore, we now remove the "supportable" flag.
Signed-off-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
---
net/tipc/link.c | 4 +---
net/tipc/node.c | 8 +++-----
net/tipc/node.h | 2 --
3 files changed, 4 insertions(+), 10 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 0cc6480..f3a078f 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1426,8 +1426,7 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
tipc_addr_string_fill(addr_string, n_ptr->addr);
pr_info("Broadcast link info for %s\n", addr_string);
- pr_info("Supportable: %d, Supported: %d, Acked: %u\n",
- n_ptr->bclink.supportable,
+ pr_info("Supported: %d, Acked: %u\n",
n_ptr->bclink.supported,
n_ptr->bclink.acked);
pr_info("Last in: %u, Oos state: %u, Last sent: %u\n",
@@ -2014,7 +2013,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
} else {
l_ptr->max_pkt = l_ptr->max_pkt_target;
}
- l_ptr->owner->bclink.supportable = (max_pkt_info != 0);
/* Synchronize broadcast link info, if not done previously */
if (!tipc_node_is_up(l_ptr->owner)) {
diff --git a/net/tipc/node.c b/net/tipc/node.c
index d21db20..6f3e9b3 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -264,11 +264,9 @@ static void node_established_contact(struct tipc_node *n_ptr)
{
tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr);
- if (n_ptr->bclink.supportable) {
- n_ptr->bclink.acked = tipc_bclink_get_last_sent();
- tipc_bclink_add_node(n_ptr->addr);
- n_ptr->bclink.supported = 1;
- }
+ n_ptr->bclink.acked = tipc_bclink_get_last_sent();
+ tipc_bclink_add_node(n_ptr->addr);
+ n_ptr->bclink.supported = 1;
}
static void node_name_purge_complete(unsigned long node_addr)
diff --git a/net/tipc/node.h b/net/tipc/node.h
index cfcaf4d..3ac905f 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -67,7 +67,6 @@
* @permit_changeover: non-zero if node has redundant links to this system
* @signature: node instance identifier
* @bclink: broadcast-related info
- * @supportable: non-zero if node supports TIPC b'cast link capability
* @supported: non-zero if node supports TIPC b'cast capability
* @acked: sequence # of last outbound b'cast message acknowledged by node
* @last_in: sequence # of last in-sequence b'cast message received from node
@@ -92,7 +91,6 @@ struct tipc_node {
int permit_changeover;
u32 signature;
struct {
- u8 supportable;
u8 supported;
u32 acked;
u32 last_in;
--
1.7.12.1
* [PATCH net-next 6/9] tipc: rename supported flag to recv_permitted
2012-11-22 20:59 [PATCH net-next 0/9] tipc: updates for what will be v3.8 Paul Gortmaker
` (4 preceding siblings ...)
2012-11-22 20:59 ` [PATCH net-next 5/9] tipc: remove supportable flag from bclink structure Paul Gortmaker
@ 2012-11-22 20:59 ` Paul Gortmaker
2012-11-22 20:59 ` [PATCH net-next 7/9] tipc: introduce message to synchronize broadcast link Paul Gortmaker
` (3 subsequent siblings)
9 siblings, 0 replies; 11+ messages in thread
From: Paul Gortmaker @ 2012-11-22 20:59 UTC (permalink / raw)
To: David Miller; +Cc: netdev, Ying Xue, Jon Maloy, Paul Gortmaker
From: Ying Xue <ying.xue@windriver.com>
Rename the "supported" flag in the bclink structure to "recv_permitted"
to better reflect what it is used for. When this flag is set for a
given node, we are permitted to receive and acknowledge broadcast
messages from that node. Convert it to a bool at the same time,
since it is not used to store any numerical values.
Signed-off-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
---
net/tipc/bcast.c | 6 +++---
net/tipc/link.c | 8 ++++----
net/tipc/node.c | 6 +++---
net/tipc/node.h | 4 ++--
4 files changed, 12 insertions(+), 12 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 40da098..54f89f9 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -347,7 +347,7 @@ static void bclink_peek_nack(struct tipc_msg *msg)
tipc_node_lock(n_ptr);
- if (n_ptr->bclink.supported &&
+ if (n_ptr->bclink.recv_permitted &&
(n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
(n_ptr->bclink.last_in == msg_bcgap_after(msg)))
n_ptr->bclink.oos_state = 2;
@@ -429,7 +429,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
goto exit;
tipc_node_lock(node);
- if (unlikely(!node->bclink.supported))
+ if (unlikely(!node->bclink.recv_permitted))
goto unlock;
/* Handle broadcast protocol message */
@@ -564,7 +564,7 @@ exit:
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
{
- return (n_ptr->bclink.supported &&
+ return (n_ptr->bclink.recv_permitted &&
(tipc_bclink_get_last_sent() != n_ptr->bclink.acked));
}
diff --git a/net/tipc/link.c b/net/tipc/link.c
index f3a078f..20f128f 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1426,8 +1426,8 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
tipc_addr_string_fill(addr_string, n_ptr->addr);
pr_info("Broadcast link info for %s\n", addr_string);
- pr_info("Supported: %d, Acked: %u\n",
- n_ptr->bclink.supported,
+ pr_info("Reception permitted: %d, Acked: %u\n",
+ n_ptr->bclink.recv_permitted,
n_ptr->bclink.acked);
pr_info("Last in: %u, Oos state: %u, Last sent: %u\n",
n_ptr->bclink.last_in,
@@ -1640,7 +1640,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
ackd = msg_ack(msg);
/* Release acked messages */
- if (n_ptr->bclink.supported)
+ if (n_ptr->bclink.recv_permitted)
tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
crs = l_ptr->first_out;
@@ -2067,7 +2067,7 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
}
/* Protocol message before retransmits, reduce loss risk */
- if (l_ptr->owner->bclink.supported)
+ if (l_ptr->owner->bclink.recv_permitted)
tipc_bclink_update_link_state(l_ptr->owner,
msg_last_bcast(msg));
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6f3e9b3..283a59f 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -266,7 +266,7 @@ static void node_established_contact(struct tipc_node *n_ptr)
n_ptr->bclink.acked = tipc_bclink_get_last_sent();
tipc_bclink_add_node(n_ptr->addr);
- n_ptr->bclink.supported = 1;
+ n_ptr->bclink.recv_permitted = true;
}
static void node_name_purge_complete(unsigned long node_addr)
@@ -292,7 +292,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
tipc_addr_string_fill(addr_string, n_ptr->addr));
/* Flush broadcast link info associated with lost node */
- if (n_ptr->bclink.supported) {
+ if (n_ptr->bclink.recv_permitted) {
while (n_ptr->bclink.deferred_head) {
struct sk_buff *buf = n_ptr->bclink.deferred_head;
n_ptr->bclink.deferred_head = buf->next;
@@ -308,7 +308,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
tipc_bclink_remove_node(n_ptr->addr);
tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
- n_ptr->bclink.supported = 0;
+ n_ptr->bclink.recv_permitted = false;
}
/* Abort link changeover */
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 3ac905f..3c189b3 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -67,7 +67,6 @@
* @permit_changeover: non-zero if node has redundant links to this system
* @signature: node instance identifier
* @bclink: broadcast-related info
- * @supported: non-zero if node supports TIPC b'cast capability
* @acked: sequence # of last outbound b'cast message acknowledged by node
* @last_in: sequence # of last in-sequence b'cast message received from node
* @last_sent: sequence # of last b'cast message sent by node
@@ -76,6 +75,7 @@
* @deferred_head: oldest OOS b'cast message received from node
* @deferred_tail: newest OOS b'cast message received from node
* @defragm: list of partially reassembled b'cast message fragments from node
+ * @recv_permitted: true if node is allowed to receive b'cast messages
*/
struct tipc_node {
u32 addr;
@@ -91,7 +91,6 @@ struct tipc_node {
int permit_changeover;
u32 signature;
struct {
- u8 supported;
u32 acked;
u32 last_in;
u32 last_sent;
@@ -100,6 +99,7 @@ struct tipc_node {
struct sk_buff *deferred_head;
struct sk_buff *deferred_tail;
struct sk_buff *defragm;
+ bool recv_permitted;
} bclink;
};
--
1.7.12.1
* [PATCH net-next 7/9] tipc: introduce message to synchronize broadcast link
2012-11-22 20:59 [PATCH net-next 0/9] tipc: updates for what will be v3.8 Paul Gortmaker
` (5 preceding siblings ...)
2012-11-22 20:59 ` [PATCH net-next 6/9] tipc: rename supported flag to recv_permitted Paul Gortmaker
@ 2012-11-22 20:59 ` Paul Gortmaker
2012-11-22 20:59 ` [PATCH net-next 8/9] tipc: eliminate an unnecessary cast of node variable Paul Gortmaker
` (2 subsequent siblings)
9 siblings, 0 replies; 11+ messages in thread
From: Paul Gortmaker @ 2012-11-22 20:59 UTC (permalink / raw)
To: David Miller; +Cc: netdev, Jon Maloy, Ying Xue, Paul Gortmaker
From: Jon Maloy <jon.maloy@ericsson.com>
Upon establishing a first link between two nodes, there is
currently a risk that the two endpoints will disagree on exactly
which sequence number reception and acknowledging of broadcast
packets should start at.
The following scenarios may happen:
1: Node A sends an ACTIVATE message to B, telling it to start acking
packets from sequence number N.
2: Node A sends out broadcast N, but does not expect an acknowledgment
from B, since B is not yet in its broadcast receivers list.
3: Node A receives ACK for N from all nodes except B, and releases
packet N.
4: Node B receives the ACTIVATE, activates its link endpoint, and
stores the value N as sequence number of first expected packet.
5: Node B sends a NAME_DISTR message to A.
6: Node A receives the NAME_DISTR message, and activates its endpoint.
At this moment B is added to A's broadcast receivers list.
Node A also sets sequence number 0 as the first broadcast packet
to be received from B.
7: Node A sends broadcast N+1.
8: B receives N+1, determines there is a gap in the sequence, since
it is expecting N, and sends a NACK for N back to A.
9: Node A has already released N, so no retransmission is possible.
The broadcast link in direction A->B is stale.
In addition to, or instead of, 7-9 above, the following may happen:
10: Node B sends broadcast M > 0 to A.
11: Node A receives M, falsely decides there must be a gap, since
it is expecting packet 0, and asks for retransmission of packets
[0,M-1].
12: Node B has already released these packets, so the broadcast
link is stale in direction B->A.
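The two failure modes above come down to the same check: a receiver
reports a gap starting at the packet it expects, and the sender no
longer holds that packet. A minimal userspace sketch of that check
follows. The struct and function names are hypothetical and do not
exist in net/tipc, and sequence number wraparound is ignored for
brevity.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical sender-side view: packets in [oldest_kept, next_seqno)
 * are still buffered and can be retransmitted; older ones were released.
 */
struct bcast_sender {
	uint32_t oldest_kept;
	uint32_t next_seqno;
};

/* Hypothetical receiver-side view: next in-sequence packet expected. */
struct bcast_receiver {
	uint32_t next_expected;
};

/* True if the sender still holds packet seqno for retransmission. */
bool sender_can_retransmit(const struct bcast_sender *s, uint32_t seqno)
{
	assert(s->oldest_kept <= s->next_seqno);
	return seqno >= s->oldest_kept && seqno < s->next_seqno;
}

/* True if the arriving packet reveals a gap; *missing is set to the
 * first sequence number the receiver would ask to have retransmitted.
 */
bool receiver_sees_gap(const struct bcast_receiver *r, uint32_t arrived,
		       uint32_t *missing)
{
	if (arrived == r->next_expected)
		return false;
	*missing = r->next_expected;
	return true;
}
```

With N = 100, steps 7-9 correspond to a receiver expecting 100, a
packet 101 arriving, and a sender whose oldest kept packet is already
101: the gap is detected, but the retransmission request cannot be
served. Steps 10-12 are the same check with the receiver expecting 0.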
We solve this problem by introducing a new unicast message type,
BCAST_PROTOCOL/STATE, to convey the sequence number of the next
sent broadcast packet to the other endpoint, at exactly the moment
that endpoint is added to the local node's broadcast receivers list,
and before any other unicast messages are permitted to be sent.
Furthermore, we don't allow any node to start receiving and
processing broadcast packets until this new synchronization
message has been received.
To maintain backwards compatibility, we still open up for
broadcast reception if we receive a NAME_DISTR message without
any preceding broadcast sync message. In this case, we must
assume that the other end has an older code version, and will
never send out the new synchronization message. Hence, for mixed
old and new nodes, the issue arising in 7-12 of the above may
happen with the same probability as before.
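The reception rule and the compatibility fallback described above can
be sketched in userspace as follows. This is only an illustration
under simplifying assumptions: the type and function names are
hypothetical, sequence number wraparound is ignored, and out-of-order
packets are simply rejected rather than deferred.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical unicast message kinds relevant to opening reception. */
enum unicast_kind {
	MSG_BCAST_SYNC,		/* new synchronization message */
	MSG_NAME_DISTR,		/* name distribution, also sent by old nodes */
};

/* Hypothetical per-peer broadcast reception state. */
struct bcast_peer {
	bool recv_permitted;	/* may we receive/ack b'casts from the peer? */
	uint32_t last_in;	/* last in-sequence b'cast packet received */
};

/* Handle a unicast message that may open up broadcast reception. */
void peer_recv_unicast(struct bcast_peer *p, enum unicast_kind kind,
		       uint32_t last_bcast)
{
	switch (kind) {
	case MSG_BCAST_SYNC:
		/* New peer: adopt its starting point, then open up. */
		p->last_in = last_bcast;
		p->recv_permitted = true;
		break;
	case MSG_NAME_DISTR:
		/* Old peer never syncs: open up without a starting point. */
		p->recv_permitted = true;
		break;
	}
}

/* Returns true if the broadcast packet was accepted in sequence. */
bool peer_recv_bcast(struct bcast_peer *p, uint32_t seqno)
{
	if (!p->recv_permitted)
		return false;	/* not synchronized yet: drop */
	if (seqno != p->last_in + 1)
		return false;	/* gap: the real code would NACK/defer */
	p->last_in = seqno;
	return true;
}
```

A peer that sends the sync message with value 100 has its packet 101
accepted. A peer that only sends NAME_DISTR is opened up with last_in
still at its old value, so the first broadcast can still look like a
gap, which is the unchanged legacy behaviour the text refers to.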
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
---
net/tipc/link.c | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---
net/tipc/node.c | 5 ++---
2 files changed, 60 insertions(+), 6 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 20f128f..87bf5aa 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1,7 +1,7 @@
/*
* net/tipc/link.c: TIPC link code
*
- * Copyright (c) 1996-2007, Ericsson AB
+ * Copyright (c) 1996-2007, 2012, Ericsson AB
* Copyright (c) 2004-2007, 2010-2011, Wind River Systems
* All rights reserved.
*
@@ -103,6 +103,8 @@ static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
static void link_start(struct tipc_link *l_ptr);
static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
+static void tipc_link_send_sync(struct tipc_link *l);
+static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);
/*
* Simple link routines
@@ -712,6 +714,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
link_activate(l_ptr);
tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
l_ptr->fsm_msg_cnt++;
+ if (l_ptr->owner->working_links == 1)
+ tipc_link_send_sync(l_ptr);
link_set_timer(l_ptr, cont_intv);
break;
case RESET_MSG:
@@ -745,6 +749,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
link_activate(l_ptr);
tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
l_ptr->fsm_msg_cnt++;
+ if (l_ptr->owner->working_links == 1)
+ tipc_link_send_sync(l_ptr);
link_set_timer(l_ptr, cont_intv);
break;
case RESET_MSG:
@@ -941,7 +947,48 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
return res;
}
-/**
+/*
+ * tipc_link_send_sync - synchronize broadcast link endpoints.
+ *
+ * Give a newly added peer node the sequence number where it should
+ * start receiving and acking broadcast packets.
+ *
+ * Called with node locked
+ */
+static void tipc_link_send_sync(struct tipc_link *l)
+{
+ struct sk_buff *buf;
+ struct tipc_msg *msg;
+
+ buf = tipc_buf_acquire(INT_H_SIZE);
+ if (!buf)
+ return;
+
+ msg = buf_msg(buf);
+ tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
+ msg_set_last_bcast(msg, l->owner->bclink.acked);
+ link_add_chain_to_outqueue(l, buf, 0);
+ tipc_link_push_queue(l);
+}
+
+/*
+ * tipc_link_recv_sync - synchronize broadcast link endpoints.
+ * Receive the sequence number where we should start receiving and
+ * acking broadcast packets from a newly added peer node, and open
+ * up for reception of such packets.
+ *
+ * Called with node locked
+ */
+static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)
+{
+ struct tipc_msg *msg = buf_msg(buf);
+
+ n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
+ n->bclink.recv_permitted = true;
+ kfree_skb(buf);
+}
+
+/*
* tipc_link_send_names - send name table entries to new neighbor
*
* Send routine for bulk delivery of name table messages when contact
@@ -1691,9 +1738,14 @@ deliver:
tipc_link_recv_bundle(buf);
continue;
case NAME_DISTRIBUTOR:
+ n_ptr->bclink.recv_permitted = true;
tipc_node_unlock(n_ptr);
tipc_named_recv(buf);
continue;
+ case BCAST_PROTOCOL:
+ tipc_link_recv_sync(n_ptr, buf);
+ tipc_node_unlock(n_ptr);
+ continue;
case CONN_MANAGER:
tipc_node_unlock(n_ptr);
tipc_port_recv_proto_msg(buf);
@@ -1736,16 +1788,19 @@ deliver:
continue;
}
+ /* Link is not in state WORKING_WORKING */
if (msg_user(msg) == LINK_PROTOCOL) {
link_recv_proto_msg(l_ptr, buf);
head = link_insert_deferred_queue(l_ptr, head);
tipc_node_unlock(n_ptr);
continue;
}
+
+ /* Traffic message. Conditionally activate link */
link_state_event(l_ptr, TRAFFIC_MSG_EVT);
if (link_working_working(l_ptr)) {
- /* Re-insert in front of queue */
+ /* Re-insert buffer in front of queue */
buf->next = head;
head = buf;
tipc_node_unlock(n_ptr);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 283a59f..48f39dd 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1,7 +1,7 @@
/*
* net/tipc/node.c: TIPC node management routines
*
- * Copyright (c) 2000-2006, Ericsson AB
+ * Copyright (c) 2000-2006, 2012 Ericsson AB
* Copyright (c) 2005-2006, 2010-2011, Wind River Systems
* All rights reserved.
*
@@ -263,10 +263,9 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
static void node_established_contact(struct tipc_node *n_ptr)
{
tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr);
-
+ n_ptr->bclink.oos_state = 0;
n_ptr->bclink.acked = tipc_bclink_get_last_sent();
tipc_bclink_add_node(n_ptr->addr);
- n_ptr->bclink.recv_permitted = true;
}
static void node_name_purge_complete(unsigned long node_addr)
--
1.7.12.1
* [PATCH net-next 8/9] tipc: eliminate an unnecessary cast of node variable
2012-11-22 20:59 [PATCH net-next 0/9] tipc: updates for what will be v3.8 Paul Gortmaker
` (6 preceding siblings ...)
2012-11-22 20:59 ` [PATCH net-next 7/9] tipc: introduce message to synchronize broadcast link Paul Gortmaker
@ 2012-11-22 20:59 ` Paul Gortmaker
2012-11-22 20:59 ` [PATCH net-next 9/9] tipc: delete TIPC_ADVANCED Kconfig variable Paul Gortmaker
2012-11-23 19:10 ` [PATCH net-next 0/9] tipc: updates for what will be v3.8 David Miller
9 siblings, 0 replies; 11+ messages in thread
From: Paul Gortmaker @ 2012-11-22 20:59 UTC (permalink / raw)
To: David Miller; +Cc: netdev, Ying Xue, Paul Gortmaker
From: Ying Xue <ying.xue@windriver.com>
As the variable "node" is already declared as u32, it is
unnecessary to cast it to u32 again when using it.
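As a small userspace illustration of why the cast is redundant, the
sketch below narrows an unsigned long argument to u32 once and then
passes it on with and without a second cast. The function names and
the u32 typedef are stand-ins for this example only, and the single
narrowing at the top is an assumption about how the caller obtains
the value.

```c
#include <stdint.h>

typedef uint32_t u32;

/* Hypothetical consumer taking a u32 destination address. */
u32 send_names_to(u32 dest)
{
	return dest;
}

/* Old style: node is already u32, yet it is cast again at the call. */
u32 send_with_cast(unsigned long nodearg)
{
	u32 node = (u32)nodearg;	/* the only narrowing that matters */

	return send_names_to((u32)node);
}

/* New style: same behaviour, no redundant cast. */
u32 send_without_cast(unsigned long nodearg)
{
	u32 node = (u32)nodearg;

	return send_names_to(node);
}
```

Both variants pass exactly the same value, so dropping the cast is a
pure readability cleanup.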
Signed-off-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
---
net/tipc/name_distr.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 55d3928..e0d0805 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -262,7 +262,7 @@ void tipc_named_node_up(unsigned long nodearg)
named_distribute(&message_list, node, &publ_zone, max_item_buf);
read_unlock_bh(&tipc_nametbl_lock);
- tipc_link_send_names(&message_list, (u32)node);
+ tipc_link_send_names(&message_list, node);
}
/**
--
1.7.12.1
* [PATCH net-next 9/9] tipc: delete TIPC_ADVANCED Kconfig variable
2012-11-22 20:59 [PATCH net-next 0/9] tipc: updates for what will be v3.8 Paul Gortmaker
` (7 preceding siblings ...)
2012-11-22 20:59 ` [PATCH net-next 8/9] tipc: eliminate an unnecessary cast of node variable Paul Gortmaker
@ 2012-11-22 20:59 ` Paul Gortmaker
2012-11-23 19:10 ` [PATCH net-next 0/9] tipc: updates for what will be v3.8 David Miller
9 siblings, 0 replies; 11+ messages in thread
From: Paul Gortmaker @ 2012-11-22 20:59 UTC (permalink / raw)
To: David Miller; +Cc: netdev, Paul Gortmaker
There used to be a time when TIPC had lots of Kconfig knobs the
end user could alter, but they have all been made automatic or
obsolete, with the exception of CONFIG_TIPC_PORTS. This
previously existing set of options was all hidden under the
TIPC_ADVANCED setting, which does not exist in any code, but
only in Kconfig scope.
Keeping it now, just to hide the one remaining "advanced"
option, no longer makes sense. Remove it. Also get rid of the
ifdeffery in the TIPC code that allowed for TIPC_PORTS to be
possibly undefined.
Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
---
net/tipc/Kconfig | 13 +------------
net/tipc/core.c | 5 -----
2 files changed, 1 insertion(+), 17 deletions(-)
diff --git a/net/tipc/Kconfig b/net/tipc/Kconfig
index 5854601..bc41bd3 100644
--- a/net/tipc/Kconfig
+++ b/net/tipc/Kconfig
@@ -20,18 +20,9 @@ menuconfig TIPC
If in doubt, say N.
-if TIPC
-
-config TIPC_ADVANCED
- bool "Advanced TIPC configuration"
- default n
- help
- Saying Y here will open some advanced configuration for TIPC.
- Most users do not need to bother; if unsure, just say N.
-
config TIPC_PORTS
int "Maximum number of ports in a node"
- depends on TIPC_ADVANCED
+ depends on TIPC
range 127 65535
default "8191"
help
@@ -40,5 +31,3 @@ config TIPC_PORTS
Setting this to a smaller value saves some memory,
setting it to higher allows for more ports.
-
-endif # TIPC
diff --git a/net/tipc/core.c b/net/tipc/core.c
index bfe8af8..fc05cec 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -42,11 +42,6 @@
#include <linux/module.h>
-#ifndef CONFIG_TIPC_PORTS
-#define CONFIG_TIPC_PORTS 8191
-#endif
-
-
/* global variables used by multiple sub-systems within TIPC */
int tipc_random __read_mostly;
--
1.7.12.1
* Re: [PATCH net-next 0/9] tipc: updates for what will be v3.8
2012-11-22 20:59 [PATCH net-next 0/9] tipc: updates for what will be v3.8 Paul Gortmaker
` (8 preceding siblings ...)
2012-11-22 20:59 ` [PATCH net-next 9/9] tipc: delete TIPC_ADVANCED Kconfig variable Paul Gortmaker
@ 2012-11-23 19:10 ` David Miller
9 siblings, 0 replies; 11+ messages in thread
From: David Miller @ 2012-11-23 19:10 UTC (permalink / raw)
To: paul.gortmaker; +Cc: netdev
From: Paul Gortmaker <paul.gortmaker@windriver.com>
Date: Thu, 22 Nov 2012 15:59:45 -0500
> The most interesting thing here, at least from a user perspective,
> is the broadcast link fix -- where there was a corner case where
> two endpoints could get in a state where they disagree on where
> to start Rx and ack of broadcast packets.
>
> There is also the poll/wait changes which could also impact
> end users for certain use cases - the fixes there also better
> align tipc with the rest of the networking code.
>
> The rest largely falls into routine cleanup category, by getting
> rid of some unused routines, some Kconfig clutter, etc.
>
> Assuming there is nothing that jumps out as needing a rework,
> the full set can be found as per below.
Pulled, thanks.