netdev.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
* [PATCH net-next 0/8] tipc: convert name table read-write lock to RCU
@ 2014-12-02  7:00 Ying Xue
  2014-12-02  7:00 ` [PATCH net-next 1/8] tipc: remove size variable from publ_list struct Ying Xue
                   ` (8 more replies)
  0 siblings, 9 replies; 10+ messages in thread
From: Ying Xue @ 2014-12-02  7:00 UTC (permalink / raw)
  To: davem
  Cc: jon.maloy, Paul.Gortmaker, erik.hugne, richard.alpe, tero.aho,
	netdev, tipc-discussion

Now TIPC name table is statically allocated and is protected with a
Read-Write lock. To enhance the performance of TIPC name table lookup,
we are going to involve RCU lock to protect the name table. As a
consequence, it becomes lockless to concurrently look up name table on
read side. However, before the conversion can be successfully made,
the following two things must be first done:

- change allocation way of name table from static to dynamic
- fix several incorrect locking policy issues

Ying Xue (8):
  tipc: remove size variable from publ_list struct
  tipc: make name table allocated dynamically
  tipc: ensure all name sequences are released when name table is
    stopped
  tipc: ensure all name sequences are properly protected with its lock
  tipc: any name table member must be protected under name table lock
  tipc: simplify relationship between name table lock and node lock
  tipc: remove unnecessary INIT_LIST_HEAD
  tipc: convert name table read-write lock to RCU

 include/linux/rculist.h |    9 +++
 net/tipc/name_distr.c   |   83 +++++++------------
 net/tipc/name_table.c   |  202 ++++++++++++++++++++++++-----------------------
 net/tipc/name_table.h   |   20 ++++-
 net/tipc/subscr.c       |    1 -
 5 files changed, 157 insertions(+), 158 deletions(-)

-- 
1.7.9.5

^ permalink raw reply	[flat|nested] 10+ messages in thread

* [PATCH net-next 1/8] tipc: remove size variable from publ_list struct
  2014-12-02  7:00 [PATCH net-next 0/8] tipc: convert name table read-write lock to RCU Ying Xue
@ 2014-12-02  7:00 ` Ying Xue
  2014-12-02  7:00 ` [PATCH net-next 2/8] tipc: make name table allocated dynamically Ying Xue
                   ` (7 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Ying Xue @ 2014-12-02  7:00 UTC (permalink / raw)
  To: davem
  Cc: jon.maloy, Paul.Gortmaker, erik.hugne, richard.alpe, tero.aho,
	netdev, tipc-discussion

The size variable is introduced in publ_list struct to help us exactly
calculate SKB buffer sizes needed by publications when all publications
in name table are delivered in bulk in named_distribute(). But if
publication SKB buffer size is assumed to MTU, the size variable in
publ_list struct can be completely eliminated at the cost of wasting
a bit memory space for last SKB.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Tero Aho <tero.aho@coriant.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
Tested-by: Erik Hugne <erik.hugne@ericsson.com>
---
 net/tipc/name_distr.c |   19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)

diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 56248db..628cd85 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -41,26 +41,21 @@
 /**
  * struct publ_list - list of publications made by this node
  * @list: circular list of publications
- * @list_size: number of entries in list
  */
 struct publ_list {
 	struct list_head list;
-	u32 size;
 };
 
 static struct publ_list publ_zone = {
 	.list = LIST_HEAD_INIT(publ_zone.list),
-	.size = 0,
 };
 
 static struct publ_list publ_cluster = {
 	.list = LIST_HEAD_INIT(publ_cluster.list),
-	.size = 0,
 };
 
 static struct publ_list publ_node = {
 	.list = LIST_HEAD_INIT(publ_node.list),
-	.size = 0,
 };
 
 static struct publ_list *publ_lists[] = {
@@ -147,7 +142,6 @@ struct sk_buff *tipc_named_publish(struct publication *publ)
 	struct distr_item *item;
 
 	list_add_tail(&publ->local_list, &publ_lists[publ->scope]->list);
-	publ_lists[publ->scope]->size++;
 
 	if (publ->scope == TIPC_NODE_SCOPE)
 		return NULL;
@@ -172,7 +166,6 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
 	struct distr_item *item;
 
 	list_del(&publ->local_list);
-	publ_lists[publ->scope]->size--;
 
 	if (publ->scope == TIPC_NODE_SCOPE)
 		return NULL;
@@ -200,16 +193,12 @@ static void named_distribute(struct sk_buff_head *list, u32 dnode,
 	struct publication *publ;
 	struct sk_buff *skb = NULL;
 	struct distr_item *item = NULL;
-	uint dsz = pls->size * ITEM_SIZE;
 	uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
-	uint rem = dsz;
-	uint msg_rem = 0;
+	uint msg_rem = msg_dsz;
 
 	list_for_each_entry(publ, &pls->list, local_list) {
 		/* Prepare next buffer: */
 		if (!skb) {
-			msg_rem = min_t(uint, rem, msg_dsz);
-			rem -= msg_rem;
 			skb = named_prepare_buf(PUBLICATION, msg_rem, dnode);
 			if (!skb) {
 				pr_warn("Bulk publication failure\n");
@@ -227,8 +216,14 @@ static void named_distribute(struct sk_buff_head *list, u32 dnode,
 		if (!msg_rem) {
 			__skb_queue_tail(list, skb);
 			skb = NULL;
+			msg_rem = msg_dsz;
 		}
 	}
+	if (skb) {
+		msg_set_size(buf_msg(skb), INT_H_SIZE + (msg_dsz - msg_rem));
+		skb_trim(skb, INT_H_SIZE + (msg_dsz - msg_rem));
+		__skb_queue_tail(list, skb);
+	}
 }
 
 /**
-- 
1.7.9.5

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH net-next 2/8] tipc: make name table allocated dynamically
  2014-12-02  7:00 [PATCH net-next 0/8] tipc: convert name table read-write lock to RCU Ying Xue
  2014-12-02  7:00 ` [PATCH net-next 1/8] tipc: remove size variable from publ_list struct Ying Xue
@ 2014-12-02  7:00 ` Ying Xue
  2014-12-02  7:00 ` [PATCH net-next 3/8] tipc: ensure all name sequences are released when name table is stopped Ying Xue
                   ` (6 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Ying Xue @ 2014-12-02  7:00 UTC (permalink / raw)
  To: davem; +Cc: jon.maloy, netdev, Paul.Gortmaker, tipc-discussion

Name table locking policy is going to be adjusted from read-write
lock protection to RCU lock protection in the future commits. But
its essential precondition is to convert the allocation way of name
table from static to dynamic mode.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
Tested-by: Erik Hugne <erik.hugne@ericsson.com>
---
 net/tipc/name_distr.c |   44 +++++++++---------------------------
 net/tipc/name_table.c |   60 ++++++++++++++++++++++++-------------------------
 net/tipc/name_table.h |   16 ++++++++++++-
 3 files changed, 55 insertions(+), 65 deletions(-)

diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 628cd85..ed00929 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -38,34 +38,6 @@
 #include "link.h"
 #include "name_distr.h"
 
-/**
- * struct publ_list - list of publications made by this node
- * @list: circular list of publications
- */
-struct publ_list {
-	struct list_head list;
-};
-
-static struct publ_list publ_zone = {
-	.list = LIST_HEAD_INIT(publ_zone.list),
-};
-
-static struct publ_list publ_cluster = {
-	.list = LIST_HEAD_INIT(publ_cluster.list),
-};
-
-static struct publ_list publ_node = {
-	.list = LIST_HEAD_INIT(publ_node.list),
-};
-
-static struct publ_list *publ_lists[] = {
-	NULL,
-	&publ_zone,	/* publ_lists[TIPC_ZONE_SCOPE]		*/
-	&publ_cluster,	/* publ_lists[TIPC_CLUSTER_SCOPE]	*/
-	&publ_node	/* publ_lists[TIPC_NODE_SCOPE]		*/
-};
-
-
 int sysctl_tipc_named_timeout __read_mostly = 2000;
 
 /**
@@ -141,7 +113,8 @@ struct sk_buff *tipc_named_publish(struct publication *publ)
 	struct sk_buff *buf;
 	struct distr_item *item;
 
-	list_add_tail(&publ->local_list, &publ_lists[publ->scope]->list);
+	list_add_tail(&publ->local_list,
+		      &tipc_nametbl->publ_list[publ->scope]);
 
 	if (publ->scope == TIPC_NODE_SCOPE)
 		return NULL;
@@ -188,7 +161,7 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
  * @pls: linked list of publication items to be packed into buffer chain
  */
 static void named_distribute(struct sk_buff_head *list, u32 dnode,
-			     struct publ_list *pls)
+			     struct list_head *pls)
 {
 	struct publication *publ;
 	struct sk_buff *skb = NULL;
@@ -196,7 +169,7 @@ static void named_distribute(struct sk_buff_head *list, u32 dnode,
 	uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
 	uint msg_rem = msg_dsz;
 
-	list_for_each_entry(publ, &pls->list, local_list) {
+	list_for_each_entry(publ, pls, local_list) {
 		/* Prepare next buffer: */
 		if (!skb) {
 			skb = named_prepare_buf(PUBLICATION, msg_rem, dnode);
@@ -236,8 +209,10 @@ void tipc_named_node_up(u32 dnode)
 	__skb_queue_head_init(&head);
 
 	read_lock_bh(&tipc_nametbl_lock);
-	named_distribute(&head, dnode, &publ_cluster);
-	named_distribute(&head, dnode, &publ_zone);
+	named_distribute(&head, dnode,
+			 &tipc_nametbl->publ_list[TIPC_CLUSTER_SCOPE]);
+	named_distribute(&head, dnode,
+			 &tipc_nametbl->publ_list[TIPC_ZONE_SCOPE]);
 	read_unlock_bh(&tipc_nametbl_lock);
 
 	tipc_link_xmit(&head, dnode, dnode);
@@ -427,7 +402,8 @@ void tipc_named_reinit(void)
 	write_lock_bh(&tipc_nametbl_lock);
 
 	for (scope = TIPC_ZONE_SCOPE; scope <= TIPC_NODE_SCOPE; scope++)
-		list_for_each_entry(publ, &publ_lists[scope]->list, local_list)
+		list_for_each_entry(publ, &tipc_nametbl->publ_list[scope],
+				    local_list)
 			publ->node = tipc_own_addr;
 
 	write_unlock_bh(&tipc_nametbl_lock);
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 772be1c..df3da29 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -2,7 +2,7 @@
  * net/tipc/name_table.c: TIPC name table code
  *
  * Copyright (c) 2000-2006, 2014, Ericsson AB
- * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
+ * Copyright (c) 2004-2008, 2010-2014, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -103,18 +103,7 @@ struct name_seq {
 	spinlock_t lock;
 };
 
-/**
- * struct name_table - table containing all existing port name publications
- * @types: pointer to fixed-sized array of name sequence lists,
- *         accessed via hashing on 'type'; name sequence lists are *not* sorted
- * @local_publ_count: number of publications issued by this node
- */
-struct name_table {
-	struct hlist_head *types;
-	u32 local_publ_count;
-};
-
-static struct name_table table;
+struct name_table *tipc_nametbl;
 DEFINE_RWLOCK(tipc_nametbl_lock);
 
 static int hash(int x)
@@ -475,7 +464,7 @@ static struct name_seq *nametbl_find_seq(u32 type)
 	struct hlist_head *seq_head;
 	struct name_seq *ns;
 
-	seq_head = &table.types[hash(type)];
+	seq_head = &tipc_nametbl->seq_hlist[hash(type)];
 	hlist_for_each_entry(ns, seq_head, ns_list) {
 		if (ns->type == type)
 			return ns;
@@ -488,6 +477,7 @@ struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper,
 					     u32 scope, u32 node, u32 port, u32 key)
 {
 	struct name_seq *seq = nametbl_find_seq(type);
+	int index = hash(type);
 
 	if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE) ||
 	    (lower > upper)) {
@@ -497,7 +487,8 @@ struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper,
 	}
 
 	if (!seq)
-		seq = tipc_nameseq_create(type, &table.types[hash(type)]);
+		seq = tipc_nameseq_create(type,
+					  &tipc_nametbl->seq_hlist[index]);
 	if (!seq)
 		return NULL;
 
@@ -667,7 +658,7 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
 	struct publication *publ;
 	struct sk_buff *buf = NULL;
 
-	if (table.local_publ_count >= TIPC_MAX_PUBLICATIONS) {
+	if (tipc_nametbl->local_publ_count >= TIPC_MAX_PUBLICATIONS) {
 		pr_warn("Publication failed, local publication limit reached (%u)\n",
 			TIPC_MAX_PUBLICATIONS);
 		return NULL;
@@ -677,7 +668,7 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
 	publ = tipc_nametbl_insert_publ(type, lower, upper, scope,
 				   tipc_own_addr, port_ref, key);
 	if (likely(publ)) {
-		table.local_publ_count++;
+		tipc_nametbl->local_publ_count++;
 		buf = tipc_named_publish(publ);
 		/* Any pending external events? */
 		tipc_named_process_backlog();
@@ -700,7 +691,7 @@ int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
 	write_lock_bh(&tipc_nametbl_lock);
 	publ = tipc_nametbl_remove_publ(type, lower, tipc_own_addr, ref, key);
 	if (likely(publ)) {
-		table.local_publ_count--;
+		tipc_nametbl->local_publ_count--;
 		buf = tipc_named_withdraw(publ);
 		/* Any pending external events? */
 		tipc_named_process_backlog();
@@ -725,12 +716,14 @@ int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
 void tipc_nametbl_subscribe(struct tipc_subscription *s)
 {
 	u32 type = s->seq.type;
+	int index = hash(type);
 	struct name_seq *seq;
 
 	write_lock_bh(&tipc_nametbl_lock);
 	seq = nametbl_find_seq(type);
 	if (!seq)
-		seq = tipc_nameseq_create(type, &table.types[hash(type)]);
+		seq = tipc_nameseq_create(type,
+					  &tipc_nametbl->seq_hlist[index]);
 	if (seq) {
 		spin_lock_bh(&seq->lock);
 		tipc_nameseq_subscribe(seq, s);
@@ -882,7 +875,7 @@ static int nametbl_list(char *buf, int len, u32 depth_info,
 		lowbound = 0;
 		upbound = ~0;
 		for (i = 0; i < TIPC_NAMETBL_SIZE; i++) {
-			seq_head = &table.types[i];
+			seq_head = &tipc_nametbl->seq_hlist[i];
 			hlist_for_each_entry(seq, seq_head, ns_list) {
 				ret += nameseq_list(seq, buf + ret, len - ret,
 						   depth, seq->type,
@@ -898,7 +891,7 @@ static int nametbl_list(char *buf, int len, u32 depth_info,
 		}
 		ret += nametbl_header(buf + ret, len - ret, depth);
 		i = hash(type);
-		seq_head = &table.types[i];
+		seq_head = &tipc_nametbl->seq_hlist[i];
 		hlist_for_each_entry(seq, seq_head, ns_list) {
 			if (seq->type == type) {
 				ret += nameseq_list(seq, buf + ret, len - ret,
@@ -945,12 +938,18 @@ struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space)
 
 int tipc_nametbl_init(void)
 {
-	table.types = kcalloc(TIPC_NAMETBL_SIZE, sizeof(struct hlist_head),
-			      GFP_ATOMIC);
-	if (!table.types)
+	int i;
+
+	tipc_nametbl = kzalloc(sizeof(*tipc_nametbl), GFP_ATOMIC);
+	if (!tipc_nametbl)
 		return -ENOMEM;
 
-	table.local_publ_count = 0;
+	for (i = 0; i < TIPC_NAMETBL_SIZE; i++)
+		INIT_HLIST_HEAD(&tipc_nametbl->seq_hlist[i]);
+
+	INIT_LIST_HEAD(&tipc_nametbl->publ_list[TIPC_ZONE_SCOPE]);
+	INIT_LIST_HEAD(&tipc_nametbl->publ_list[TIPC_CLUSTER_SCOPE]);
+	INIT_LIST_HEAD(&tipc_nametbl->publ_list[TIPC_NODE_SCOPE]);
 	return 0;
 }
 
@@ -990,16 +989,17 @@ void tipc_nametbl_stop(void)
 	 */
 	write_lock_bh(&tipc_nametbl_lock);
 	for (i = 0; i < TIPC_NAMETBL_SIZE; i++) {
-		if (hlist_empty(&table.types[i]))
+		if (hlist_empty(&tipc_nametbl->seq_hlist[i]))
 			continue;
-		seq_head = &table.types[i];
+		seq_head = &tipc_nametbl->seq_hlist[i];
 		hlist_for_each_entry_safe(seq, safe, seq_head, ns_list) {
 			tipc_purge_publications(seq);
 		}
 	}
-	kfree(table.types);
-	table.types = NULL;
 	write_unlock_bh(&tipc_nametbl_lock);
+
+	kfree(tipc_nametbl);
+
 }
 
 static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg,
@@ -1113,7 +1113,7 @@ static int __tipc_nl_seq_list(struct tipc_nl_msg *msg, u32 *last_type,
 		i = 0;
 
 	for (; i < TIPC_NAMETBL_SIZE; i++) {
-		seq_head = &table.types[i];
+		seq_head = &tipc_nametbl->seq_hlist[i];
 
 		if (*last_type) {
 			seq = nametbl_find_seq(*last_type);
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index c628778..c1fd734 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -43,7 +43,9 @@ struct tipc_port_list;
 /*
  * TIPC name types reserved for internal TIPC use (both current and planned)
  */
-#define TIPC_ZM_SRV 3		/* zone master service name type */
+#define TIPC_ZM_SRV		3	/* zone master service name type */
+#define TIPC_PUBL_SCOPE_NUM	(TIPC_NODE_SCOPE + 1)
+#define TIPC_NAMETBL_SIZE	1024	/* must be a power of 2 */
 
 /**
  * struct publication - info about a published (name or) name sequence
@@ -79,8 +81,20 @@ struct publication {
 	struct list_head zone_list;
 };
 
+/**
+ * struct name_table - table containing all existing port name publications
+ * @seq_hlist: name sequence hash lists
+ * @publ_list: pulication lists
+ * @local_publ_count: number of publications issued by this node
+ */
+struct name_table {
+	struct hlist_head seq_hlist[TIPC_NAMETBL_SIZE];
+	struct list_head publ_list[TIPC_PUBL_SCOPE_NUM];
+	u32 local_publ_count;
+};
 
 extern rwlock_t tipc_nametbl_lock;
+extern struct name_table *tipc_nametbl;
 
 int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
 
-- 
1.7.9.5


------------------------------------------------------------------------------
Download BIRT iHub F-Type - The Free Enterprise-Grade BIRT Server
from Actuate! Instantly Supercharge Your Business Reports and Dashboards
with Interactivity, Sharing, Native Excel Exports, App Integration & more
Get technology previously reserved for billion-dollar corporations, FREE
http://pubads.g.doubleclick.net/gampad/clk?id=157005751&iu=/4140/ostg.clktrk

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH net-next 3/8] tipc: ensure all name sequences are released when name table is stopped
  2014-12-02  7:00 [PATCH net-next 0/8] tipc: convert name table read-write lock to RCU Ying Xue
  2014-12-02  7:00 ` [PATCH net-next 1/8] tipc: remove size variable from publ_list struct Ying Xue
  2014-12-02  7:00 ` [PATCH net-next 2/8] tipc: make name table allocated dynamically Ying Xue
@ 2014-12-02  7:00 ` Ying Xue
  2014-12-02  7:00 ` [PATCH net-next 4/8] tipc: ensure all name sequences are properly protected with its lock Ying Xue
                   ` (5 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Ying Xue @ 2014-12-02  7:00 UTC (permalink / raw)
  To: davem; +Cc: jon.maloy, netdev, Paul.Gortmaker, tipc-discussion

As TIPC subscriber server is terminated before name table, no user
depends on subscription list of name sequence when name table is
stopped. Therefore, all name sequences stored in name table should
be released whatever their subscriptions lists are empty or not,
otherwise, memory leak might happen.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
Tested-by: Erik Hugne <erik.hugne@ericsson.com>
---
 net/tipc/name_table.c |    7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index df3da29..ba0ee3e 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -964,10 +964,6 @@ static void tipc_purge_publications(struct name_seq *seq)
 	struct sub_seq *sseq;
 	struct name_info *info;
 
-	if (!seq->sseqs) {
-		nameseq_delete_empty(seq);
-		return;
-	}
 	sseq = seq->sseqs;
 	info = sseq->info;
 	list_for_each_entry_safe(publ, safe, &info->zone_list, zone_list) {
@@ -975,6 +971,9 @@ static void tipc_purge_publications(struct name_seq *seq)
 					 publ->ref, publ->key);
 		kfree(publ);
 	}
+	hlist_del_init(&seq->ns_list);
+	kfree(seq->sseqs);
+	kfree(seq);
 }
 
 void tipc_nametbl_stop(void)
-- 
1.7.9.5


------------------------------------------------------------------------------
Download BIRT iHub F-Type - The Free Enterprise-Grade BIRT Server
from Actuate! Instantly Supercharge Your Business Reports and Dashboards
with Interactivity, Sharing, Native Excel Exports, App Integration & more
Get technology previously reserved for billion-dollar corporations, FREE
http://pubads.g.doubleclick.net/gampad/clk?id=157005751&iu=/4140/ostg.clktrk

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH net-next 4/8] tipc: ensure all name sequences are properly protected with its lock
  2014-12-02  7:00 [PATCH net-next 0/8] tipc: convert name table read-write lock to RCU Ying Xue
                   ` (2 preceding siblings ...)
  2014-12-02  7:00 ` [PATCH net-next 3/8] tipc: ensure all name sequences are released when name table is stopped Ying Xue
@ 2014-12-02  7:00 ` Ying Xue
  2014-12-02  7:00 ` [PATCH net-next 5/8] tipc: any name table member must be protected under name table lock Ying Xue
                   ` (4 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Ying Xue @ 2014-12-02  7:00 UTC (permalink / raw)
  To: davem; +Cc: jon.maloy, netdev, Paul.Gortmaker, tipc-discussion

TIPC internally created a name table which is used to store name
sequences. Now there is a read-write lock - tipc_nametbl_lock to
protect the table, and each name sequence saved in the table is
protected with its private lock. When a name sequence is inserted
or removed to or from the table, its members might need to change.
Therefore, in normal case, the two locks must be held while TIPC
operates the table. However, there are still several places where
we only hold tipc_nametbl_lock without proprerly obtaining name
sequence lock, which might cause the corruption of name sequence.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
Tested-by: Erik Hugne <erik.hugne@ericsson.com>
---
 net/tipc/name_table.c |   48 +++++++++++++++++++++++++++---------------------
 1 file changed, 27 insertions(+), 21 deletions(-)

diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index ba0ee3e..0d61f58 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -172,18 +172,6 @@ static struct name_seq *tipc_nameseq_create(u32 type, struct hlist_head *seq_hea
 	return nseq;
 }
 
-/*
- * nameseq_delete_empty - deletes a name sequence structure if now unused
- */
-static void nameseq_delete_empty(struct name_seq *seq)
-{
-	if (!seq->first_free && list_empty(&seq->subscriptions)) {
-		hlist_del_init(&seq->ns_list);
-		kfree(seq->sseqs);
-		kfree(seq);
-	}
-}
-
 /**
  * nameseq_find_subseq - find sub-sequence (if any) matching a name instance
  *
@@ -476,6 +464,7 @@ static struct name_seq *nametbl_find_seq(u32 type)
 struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper,
 					     u32 scope, u32 node, u32 port, u32 key)
 {
+	struct publication *publ;
 	struct name_seq *seq = nametbl_find_seq(type);
 	int index = hash(type);
 
@@ -492,8 +481,11 @@ struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper,
 	if (!seq)
 		return NULL;
 
-	return tipc_nameseq_insert_publ(seq, type, lower, upper,
+	spin_lock_bh(&seq->lock);
+	publ = tipc_nameseq_insert_publ(seq, type, lower, upper,
 					scope, node, port, key);
+	spin_unlock_bh(&seq->lock);
+	return publ;
 }
 
 struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower,
@@ -505,8 +497,16 @@ struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower,
 	if (!seq)
 		return NULL;
 
+	spin_lock_bh(&seq->lock);
 	publ = tipc_nameseq_remove_publ(seq, lower, node, ref, key);
-	nameseq_delete_empty(seq);
+	if (!seq->first_free && list_empty(&seq->subscriptions)) {
+		hlist_del_init(&seq->ns_list);
+		spin_unlock_bh(&seq->lock);
+		kfree(seq->sseqs);
+		kfree(seq);
+		return publ;
+	}
+	spin_unlock_bh(&seq->lock);
 	return publ;
 }
 
@@ -539,10 +539,10 @@ u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
 	seq = nametbl_find_seq(type);
 	if (unlikely(!seq))
 		goto not_found;
+	spin_lock_bh(&seq->lock);
 	sseq = nameseq_find_subseq(seq, instance);
 	if (unlikely(!sseq))
-		goto not_found;
-	spin_lock_bh(&seq->lock);
+		goto no_match;
 	info = sseq->info;
 
 	/* Closest-First Algorithm */
@@ -624,7 +624,6 @@ int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit,
 		goto exit;
 
 	spin_lock_bh(&seq->lock);
-
 	sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
 	sseq_stop = seq->sseqs + seq->first_free;
 	for (; sseq != sseq_stop; sseq++) {
@@ -642,7 +641,6 @@ int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit,
 		if (info->cluster_list_size != info->node_list_size)
 			res = 1;
 	}
-
 	spin_unlock_bh(&seq->lock);
 exit:
 	read_unlock_bh(&tipc_nametbl_lock);
@@ -747,8 +745,14 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s)
 	if (seq != NULL) {
 		spin_lock_bh(&seq->lock);
 		list_del_init(&s->nameseq_list);
-		spin_unlock_bh(&seq->lock);
-		nameseq_delete_empty(seq);
+		if (!seq->first_free && list_empty(&seq->subscriptions)) {
+			hlist_del_init(&seq->ns_list);
+			spin_unlock_bh(&seq->lock);
+			kfree(seq->sseqs);
+			kfree(seq);
+		} else {
+			spin_unlock_bh(&seq->lock);
+		}
 	}
 	write_unlock_bh(&tipc_nametbl_lock);
 }
@@ -964,6 +968,7 @@ static void tipc_purge_publications(struct name_seq *seq)
 	struct sub_seq *sseq;
 	struct name_info *info;
 
+	spin_lock_bh(&seq->lock);
 	sseq = seq->sseqs;
 	info = sseq->info;
 	list_for_each_entry_safe(publ, safe, &info->zone_list, zone_list) {
@@ -972,6 +977,8 @@ static void tipc_purge_publications(struct name_seq *seq)
 		kfree(publ);
 	}
 	hlist_del_init(&seq->ns_list);
+	spin_lock_bh(&seq->lock);
+
 	kfree(seq->sseqs);
 	kfree(seq);
 }
@@ -1127,7 +1134,6 @@ static int __tipc_nl_seq_list(struct tipc_nl_msg *msg, u32 *last_type,
 
 		hlist_for_each_entry_from(seq, ns_list) {
 			spin_lock_bh(&seq->lock);
-
 			err = __tipc_nl_subseq_list(msg, seq, last_lower,
 						    last_publ);
 
-- 
1.7.9.5


------------------------------------------------------------------------------
Download BIRT iHub F-Type - The Free Enterprise-Grade BIRT Server
from Actuate! Instantly Supercharge Your Business Reports and Dashboards
with Interactivity, Sharing, Native Excel Exports, App Integration & more
Get technology previously reserved for billion-dollar corporations, FREE
http://pubads.g.doubleclick.net/gampad/clk?id=157005751&iu=/4140/ostg.clktrk

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH net-next 5/8] tipc: any name table member must be protected under name table lock
  2014-12-02  7:00 [PATCH net-next 0/8] tipc: convert name table read-write lock to RCU Ying Xue
                   ` (3 preceding siblings ...)
  2014-12-02  7:00 ` [PATCH net-next 4/8] tipc: ensure all name sequences are properly protected with its lock Ying Xue
@ 2014-12-02  7:00 ` Ying Xue
  2014-12-02  7:00 ` [PATCH net-next 6/8] tipc: simplify relationship between name table lock and node lock Ying Xue
                   ` (3 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Ying Xue @ 2014-12-02  7:00 UTC (permalink / raw)
  To: davem; +Cc: jon.maloy, netdev, Paul.Gortmaker, tipc-discussion

As tipc_nametbl_lock is used to protect name_table structure, the lock
must be held while all members of name_table structure are accessed.
However, the lock is not obtained while a member of name_table
structure - local_publ_count is read in tipc_nametbl_publish(), as
a consequence, an inconsistent value of local_publ_count might be got.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
Tested-by: Erik Hugne <erik.hugne@ericsson.com>
---
 net/tipc/name_table.c |    3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 0d61f58..93bac40 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -656,13 +656,14 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
 	struct publication *publ;
 	struct sk_buff *buf = NULL;
 
+	write_lock_bh(&tipc_nametbl_lock);
 	if (tipc_nametbl->local_publ_count >= TIPC_MAX_PUBLICATIONS) {
 		pr_warn("Publication failed, local publication limit reached (%u)\n",
 			TIPC_MAX_PUBLICATIONS);
+		write_unlock_bh(&tipc_nametbl_lock);
 		return NULL;
 	}
 
-	write_lock_bh(&tipc_nametbl_lock);
 	publ = tipc_nametbl_insert_publ(type, lower, upper, scope,
 				   tipc_own_addr, port_ref, key);
 	if (likely(publ)) {
-- 
1.7.9.5


------------------------------------------------------------------------------
Download BIRT iHub F-Type - The Free Enterprise-Grade BIRT Server
from Actuate! Instantly Supercharge Your Business Reports and Dashboards
with Interactivity, Sharing, Native Excel Exports, App Integration & more
Get technology previously reserved for billion-dollar corporations, FREE
http://pubads.g.doubleclick.net/gampad/clk?id=157005751&iu=/4140/ostg.clktrk

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH net-next 6/8] tipc: simplify relationship between name table lock and node lock
  2014-12-02  7:00 [PATCH net-next 0/8] tipc: convert name table read-write lock to RCU Ying Xue
                   ` (4 preceding siblings ...)
  2014-12-02  7:00 ` [PATCH net-next 5/8] tipc: any name table member must be protected under name table lock Ying Xue
@ 2014-12-02  7:00 ` Ying Xue
  2014-12-02  7:00 ` [PATCH net-next 7/8] tipc: remove unnecessary INIT_LIST_HEAD Ying Xue
                   ` (2 subsequent siblings)
  8 siblings, 0 replies; 10+ messages in thread
From: Ying Xue @ 2014-12-02  7:00 UTC (permalink / raw)
  To: davem; +Cc: jon.maloy, netdev, Paul.Gortmaker, tipc-discussion

When tipc name sequence is published, name table lock is released
before name sequence buffer is delivered to remote nodes through its
underlying unicast links. However, when name sequence is withdrawn,
the name table lock is held until the transmission of the removal
message of name sequence is finished. During the process, node lock
is nested in name table lock. To prevent node lock from being nested
in name table lock, while withdrawing name, we should adopt the same
locking policy of publishing name sequence: name table lock should
be released before message is sent.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
Tested-by: Erik Hugne <erik.hugne@ericsson.com>
---
 net/tipc/name_table.c |   19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 93bac40..bc3e7ed 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -685,27 +685,28 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
 int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
 {
 	struct publication *publ;
-	struct sk_buff *buf;
+	struct sk_buff *skb = NULL;
 
 	write_lock_bh(&tipc_nametbl_lock);
 	publ = tipc_nametbl_remove_publ(type, lower, tipc_own_addr, ref, key);
 	if (likely(publ)) {
 		tipc_nametbl->local_publ_count--;
-		buf = tipc_named_withdraw(publ);
+		skb = tipc_named_withdraw(publ);
 		/* Any pending external events? */
 		tipc_named_process_backlog();
-		write_unlock_bh(&tipc_nametbl_lock);
 		list_del_init(&publ->pport_list);
 		kfree(publ);
+	} else {
+		pr_err("Unable to remove local publication\n"
+		       "(type=%u, lower=%u, ref=%u, key=%u)\n",
+		       type, lower, ref, key);
+	}
+	write_unlock_bh(&tipc_nametbl_lock);
 
-		if (buf)
-			named_cluster_distribute(buf);
+	if (skb) {
+		named_cluster_distribute(skb);
 		return 1;
 	}
-	write_unlock_bh(&tipc_nametbl_lock);
-	pr_err("Unable to remove local publication\n"
-	       "(type=%u, lower=%u, ref=%u, key=%u)\n",
-	       type, lower, ref, key);
 	return 0;
 }
 
-- 
1.7.9.5


------------------------------------------------------------------------------
Download BIRT iHub F-Type - The Free Enterprise-Grade BIRT Server
from Actuate! Instantly Supercharge Your Business Reports and Dashboards
with Interactivity, Sharing, Native Excel Exports, App Integration & more
Get technology previously reserved for billion-dollar corporations, FREE
http://pubads.g.doubleclick.net/gampad/clk?id=157005751&iu=/4140/ostg.clktrk

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH net-next 7/8] tipc: remove unnecessary INIT_LIST_HEAD
  2014-12-02  7:00 [PATCH net-next 0/8] tipc: convert name table read-write lock to RCU Ying Xue
                   ` (5 preceding siblings ...)
  2014-12-02  7:00 ` [PATCH net-next 6/8] tipc: simplify relationship between name table lock and node lock Ying Xue
@ 2014-12-02  7:00 ` Ying Xue
  2014-12-02  7:00 ` [PATCH net-next 8/8] tipc: convert name table read-write lock to RCU Ying Xue
  2014-12-09  1:41 ` [PATCH net-next 0/8] " David Miller
  8 siblings, 0 replies; 10+ messages in thread
From: Ying Xue @ 2014-12-02  7:00 UTC (permalink / raw)
  To: davem; +Cc: jon.maloy, netdev, Paul.Gortmaker, tipc-discussion

When a list_head variable is seen as a new entry to be added to a
list head, it's unnecessary to be initialized with INIT_LIST_HEAD().

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
Tested-by: Erik Hugne <erik.hugne@ericsson.com>
---
 net/tipc/name_table.c |    2 --
 net/tipc/subscr.c     |    1 -
 2 files changed, 3 deletions(-)

diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index bc3e7ed..3c2e0c3 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -131,9 +131,7 @@ static struct publication *publ_create(u32 type, u32 lower, u32 upper,
 	publ->node = node;
 	publ->ref = port_ref;
 	publ->key = key;
-	INIT_LIST_HEAD(&publ->local_list);
 	INIT_LIST_HEAD(&publ->pport_list);
-	INIT_LIST_HEAD(&publ->nodesub_list);
 	return publ;
 }
 
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 31b5cb2..0344206 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -305,7 +305,6 @@ static int subscr_subscribe(struct tipc_subscr *s,
 		kfree(sub);
 		return -EINVAL;
 	}
-	INIT_LIST_HEAD(&sub->nameseq_list);
 	list_add(&sub->subscription_list, &subscriber->subscription_list);
 	sub->subscriber = subscriber;
 	sub->swap = swap;
-- 
1.7.9.5


------------------------------------------------------------------------------
Download BIRT iHub F-Type - The Free Enterprise-Grade BIRT Server
from Actuate! Instantly Supercharge Your Business Reports and Dashboards
with Interactivity, Sharing, Native Excel Exports, App Integration & more
Get technology previously reserved for billion-dollar corporations, FREE
http://pubads.g.doubleclick.net/gampad/clk?id=157005751&iu=/4140/ostg.clktrk

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* [PATCH net-next 8/8] tipc: convert name table read-write lock to RCU
  2014-12-02  7:00 [PATCH net-next 0/8] tipc: convert name table read-write lock to RCU Ying Xue
                   ` (6 preceding siblings ...)
  2014-12-02  7:00 ` [PATCH net-next 7/8] tipc: remove unnecessary INIT_LIST_HEAD Ying Xue
@ 2014-12-02  7:00 ` Ying Xue
  2014-12-09  1:41 ` [PATCH net-next 0/8] " David Miller
  8 siblings, 0 replies; 10+ messages in thread
From: Ying Xue @ 2014-12-02  7:00 UTC (permalink / raw)
  To: davem; +Cc: jon.maloy, netdev, Paul.Gortmaker, tipc-discussion

Convert tipc name table read-write lock to RCU. After this change,
a new spin lock is used to protect name table on write side while
RCU is applied on read side.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
Tested-by: Erik Hugne <erik.hugne@ericsson.com>
---
 include/linux/rculist.h |    9 +++++
 net/tipc/name_distr.c   |   28 +++++++--------
 net/tipc/name_table.c   |   87 +++++++++++++++++++++++------------------------
 net/tipc/name_table.h   |    4 ++-
 4 files changed, 69 insertions(+), 59 deletions(-)

diff --git a/include/linux/rculist.h b/include/linux/rculist.h
index 372ad5e..aa79b3c 100644
--- a/include/linux/rculist.h
+++ b/include/linux/rculist.h
@@ -542,6 +542,15 @@ static inline void hlist_add_behind_rcu(struct hlist_node *n,
 	     pos = hlist_entry_safe(rcu_dereference_bh((pos)->member.next),\
 			typeof(*(pos)), member))
 
+/**
+ * hlist_for_each_entry_from_rcu - iterate over a hlist continuing from current point
+ * @pos:	the type * to use as a loop cursor.
+ * @member:	the name of the hlist_node within the struct.
+ */
+#define hlist_for_each_entry_from_rcu(pos, member)			\
+	for (; pos;							\
+	     pos = hlist_entry_safe(rcu_dereference((pos)->member.next),\
+			typeof(*(pos)), member))
 
 #endif	/* __KERNEL__ */
 #endif
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index ed00929..ba6083d 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -113,8 +113,8 @@ struct sk_buff *tipc_named_publish(struct publication *publ)
 	struct sk_buff *buf;
 	struct distr_item *item;
 
-	list_add_tail(&publ->local_list,
-		      &tipc_nametbl->publ_list[publ->scope]);
+	list_add_tail_rcu(&publ->local_list,
+			  &tipc_nametbl->publ_list[publ->scope]);
 
 	if (publ->scope == TIPC_NODE_SCOPE)
 		return NULL;
@@ -208,12 +208,12 @@ void tipc_named_node_up(u32 dnode)
 
 	__skb_queue_head_init(&head);
 
-	read_lock_bh(&tipc_nametbl_lock);
+	rcu_read_lock();
 	named_distribute(&head, dnode,
 			 &tipc_nametbl->publ_list[TIPC_CLUSTER_SCOPE]);
 	named_distribute(&head, dnode,
 			 &tipc_nametbl->publ_list[TIPC_ZONE_SCOPE]);
-	read_unlock_bh(&tipc_nametbl_lock);
+	rcu_read_unlock();
 
 	tipc_link_xmit(&head, dnode, dnode);
 }
@@ -260,12 +260,12 @@ static void tipc_publ_purge(struct publication *publ, u32 addr)
 {
 	struct publication *p;
 
-	write_lock_bh(&tipc_nametbl_lock);
+	spin_lock_bh(&tipc_nametbl_lock);
 	p = tipc_nametbl_remove_publ(publ->type, publ->lower,
 				     publ->node, publ->ref, publ->key);
 	if (p)
 		tipc_publ_unsubscribe(p, addr);
-	write_unlock_bh(&tipc_nametbl_lock);
+	spin_unlock_bh(&tipc_nametbl_lock);
 
 	if (p != publ) {
 		pr_err("Unable to remove publication from failed node\n"
@@ -274,7 +274,7 @@ static void tipc_publ_purge(struct publication *publ, u32 addr)
 		       publ->key);
 	}
 
-	kfree(p);
+	kfree_rcu(p, rcu);
 }
 
 void tipc_publ_notify(struct list_head *nsub_list, u32 addr)
@@ -311,7 +311,7 @@ static bool tipc_update_nametbl(struct distr_item *i, u32 node, u32 dtype)
 						ntohl(i->key));
 		if (publ) {
 			tipc_publ_unsubscribe(publ, node);
-			kfree(publ);
+			kfree_rcu(publ, rcu);
 			return true;
 		}
 	} else {
@@ -376,14 +376,14 @@ void tipc_named_rcv(struct sk_buff *buf)
 	u32 count = msg_data_sz(msg) / ITEM_SIZE;
 	u32 node = msg_orignode(msg);
 
-	write_lock_bh(&tipc_nametbl_lock);
+	spin_lock_bh(&tipc_nametbl_lock);
 	while (count--) {
 		if (!tipc_update_nametbl(item, node, msg_type(msg)))
 			tipc_named_add_backlog(item, msg_type(msg), node);
 		item++;
 	}
 	tipc_named_process_backlog();
-	write_unlock_bh(&tipc_nametbl_lock);
+	spin_unlock_bh(&tipc_nametbl_lock);
 	kfree_skb(buf);
 }
 
@@ -399,12 +399,12 @@ void tipc_named_reinit(void)
 	struct publication *publ;
 	int scope;
 
-	write_lock_bh(&tipc_nametbl_lock);
+	spin_lock_bh(&tipc_nametbl_lock);
 
 	for (scope = TIPC_ZONE_SCOPE; scope <= TIPC_NODE_SCOPE; scope++)
-		list_for_each_entry(publ, &tipc_nametbl->publ_list[scope],
-				    local_list)
+		list_for_each_entry_rcu(publ, &tipc_nametbl->publ_list[scope],
+					local_list)
 			publ->node = tipc_own_addr;
 
-	write_unlock_bh(&tipc_nametbl_lock);
+	spin_unlock_bh(&tipc_nametbl_lock);
 }
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 3c2e0c3..aafa684c 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -92,6 +92,7 @@ struct sub_seq {
  * @ns_list: links to adjacent name sequences in hash chain
  * @subscriptions: list of subscriptions for this 'type'
  * @lock: spinlock controlling access to publication lists of all sub-sequences
+ * @rcu: RCU callback head used for deferred freeing
  */
 struct name_seq {
 	u32 type;
@@ -101,10 +102,11 @@ struct name_seq {
 	struct hlist_node ns_list;
 	struct list_head subscriptions;
 	spinlock_t lock;
+	struct rcu_head rcu;
 };
 
 struct name_table *tipc_nametbl;
-DEFINE_RWLOCK(tipc_nametbl_lock);
+DEFINE_SPINLOCK(tipc_nametbl_lock);
 
 static int hash(int x)
 {
@@ -166,7 +168,7 @@ static struct name_seq *tipc_nameseq_create(u32 type, struct hlist_head *seq_hea
 	nseq->alloc = 1;
 	INIT_HLIST_NODE(&nseq->ns_list);
 	INIT_LIST_HEAD(&nseq->subscriptions);
-	hlist_add_head(&nseq->ns_list, seq_head);
+	hlist_add_head_rcu(&nseq->ns_list, seq_head);
 	return nseq;
 }
 
@@ -451,7 +453,7 @@ static struct name_seq *nametbl_find_seq(u32 type)
 	struct name_seq *ns;
 
 	seq_head = &tipc_nametbl->seq_hlist[hash(type)];
-	hlist_for_each_entry(ns, seq_head, ns_list) {
+	hlist_for_each_entry_rcu(ns, seq_head, ns_list) {
 		if (ns->type == type)
 			return ns;
 	}
@@ -498,10 +500,10 @@ struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower,
 	spin_lock_bh(&seq->lock);
 	publ = tipc_nameseq_remove_publ(seq, lower, node, ref, key);
 	if (!seq->first_free && list_empty(&seq->subscriptions)) {
-		hlist_del_init(&seq->ns_list);
-		spin_unlock_bh(&seq->lock);
+		hlist_del_init_rcu(&seq->ns_list);
 		kfree(seq->sseqs);
-		kfree(seq);
+		spin_unlock_bh(&seq->lock);
+		kfree_rcu(seq, rcu);
 		return publ;
 	}
 	spin_unlock_bh(&seq->lock);
@@ -533,7 +535,7 @@ u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
 	if (!tipc_in_scope(*destnode, tipc_own_addr))
 		return 0;
 
-	read_lock_bh(&tipc_nametbl_lock);
+	rcu_read_lock();
 	seq = nametbl_find_seq(type);
 	if (unlikely(!seq))
 		goto not_found;
@@ -590,7 +592,7 @@ u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
 no_match:
 	spin_unlock_bh(&seq->lock);
 not_found:
-	read_unlock_bh(&tipc_nametbl_lock);
+	rcu_read_unlock();
 	*destnode = node;
 	return ref;
 }
@@ -616,7 +618,7 @@ int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit,
 	struct name_info *info;
 	int res = 0;
 
-	read_lock_bh(&tipc_nametbl_lock);
+	rcu_read_lock();
 	seq = nametbl_find_seq(type);
 	if (!seq)
 		goto exit;
@@ -641,7 +643,7 @@ int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit,
 	}
 	spin_unlock_bh(&seq->lock);
 exit:
-	read_unlock_bh(&tipc_nametbl_lock);
+	rcu_read_unlock();
 	return res;
 }
 
@@ -654,11 +656,11 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
 	struct publication *publ;
 	struct sk_buff *buf = NULL;
 
-	write_lock_bh(&tipc_nametbl_lock);
+	spin_lock_bh(&tipc_nametbl_lock);
 	if (tipc_nametbl->local_publ_count >= TIPC_MAX_PUBLICATIONS) {
 		pr_warn("Publication failed, local publication limit reached (%u)\n",
 			TIPC_MAX_PUBLICATIONS);
-		write_unlock_bh(&tipc_nametbl_lock);
+		spin_unlock_bh(&tipc_nametbl_lock);
 		return NULL;
 	}
 
@@ -670,7 +672,7 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
 		/* Any pending external events? */
 		tipc_named_process_backlog();
 	}
-	write_unlock_bh(&tipc_nametbl_lock);
+	spin_unlock_bh(&tipc_nametbl_lock);
 
 	if (buf)
 		named_cluster_distribute(buf);
@@ -685,7 +687,7 @@ int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
 	struct publication *publ;
 	struct sk_buff *skb = NULL;
 
-	write_lock_bh(&tipc_nametbl_lock);
+	spin_lock_bh(&tipc_nametbl_lock);
 	publ = tipc_nametbl_remove_publ(type, lower, tipc_own_addr, ref, key);
 	if (likely(publ)) {
 		tipc_nametbl->local_publ_count--;
@@ -693,13 +695,13 @@ int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
 		/* Any pending external events? */
 		tipc_named_process_backlog();
 		list_del_init(&publ->pport_list);
-		kfree(publ);
+		kfree_rcu(publ, rcu);
 	} else {
 		pr_err("Unable to remove local publication\n"
 		       "(type=%u, lower=%u, ref=%u, key=%u)\n",
 		       type, lower, ref, key);
 	}
-	write_unlock_bh(&tipc_nametbl_lock);
+	spin_unlock_bh(&tipc_nametbl_lock);
 
 	if (skb) {
 		named_cluster_distribute(skb);
@@ -717,7 +719,7 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s)
 	int index = hash(type);
 	struct name_seq *seq;
 
-	write_lock_bh(&tipc_nametbl_lock);
+	spin_lock_bh(&tipc_nametbl_lock);
 	seq = nametbl_find_seq(type);
 	if (!seq)
 		seq = tipc_nameseq_create(type,
@@ -730,7 +732,7 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s)
 		pr_warn("Failed to create subscription for {%u,%u,%u}\n",
 			s->seq.type, s->seq.lower, s->seq.upper);
 	}
-	write_unlock_bh(&tipc_nametbl_lock);
+	spin_unlock_bh(&tipc_nametbl_lock);
 }
 
 /**
@@ -740,24 +742,23 @@ void tipc_nametbl_unsubscribe(struct tipc_subscription *s)
 {
 	struct name_seq *seq;
 
-	write_lock_bh(&tipc_nametbl_lock);
+	spin_lock_bh(&tipc_nametbl_lock);
 	seq = nametbl_find_seq(s->seq.type);
 	if (seq != NULL) {
 		spin_lock_bh(&seq->lock);
 		list_del_init(&s->nameseq_list);
 		if (!seq->first_free && list_empty(&seq->subscriptions)) {
-			hlist_del_init(&seq->ns_list);
-			spin_unlock_bh(&seq->lock);
+			hlist_del_init_rcu(&seq->ns_list);
 			kfree(seq->sseqs);
-			kfree(seq);
+			spin_unlock_bh(&seq->lock);
+			kfree_rcu(seq, rcu);
 		} else {
 			spin_unlock_bh(&seq->lock);
 		}
 	}
-	write_unlock_bh(&tipc_nametbl_lock);
+	spin_unlock_bh(&tipc_nametbl_lock);
 }
 
-
 /**
  * subseq_list - print specified sub-sequence contents into the given buffer
  */
@@ -880,7 +881,7 @@ static int nametbl_list(char *buf, int len, u32 depth_info,
 		upbound = ~0;
 		for (i = 0; i < TIPC_NAMETBL_SIZE; i++) {
 			seq_head = &tipc_nametbl->seq_hlist[i];
-			hlist_for_each_entry(seq, seq_head, ns_list) {
+			hlist_for_each_entry_rcu(seq, seq_head, ns_list) {
 				ret += nameseq_list(seq, buf + ret, len - ret,
 						   depth, seq->type,
 						   lowbound, upbound, i);
@@ -896,7 +897,7 @@ static int nametbl_list(char *buf, int len, u32 depth_info,
 		ret += nametbl_header(buf + ret, len - ret, depth);
 		i = hash(type);
 		seq_head = &tipc_nametbl->seq_hlist[i];
-		hlist_for_each_entry(seq, seq_head, ns_list) {
+		hlist_for_each_entry_rcu(seq, seq_head, ns_list) {
 			if (seq->type == type) {
 				ret += nameseq_list(seq, buf + ret, len - ret,
 						   depth, type,
@@ -928,11 +929,11 @@ struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space)
 	pb = TLV_DATA(rep_tlv);
 	pb_len = ULTRA_STRING_MAX_LEN;
 	argv = (struct tipc_name_table_query *)TLV_DATA(req_tlv_area);
-	read_lock_bh(&tipc_nametbl_lock);
+	rcu_read_lock();
 	str_len = nametbl_list(pb, pb_len, ntohl(argv->depth),
 			       ntohl(argv->type),
 			       ntohl(argv->lowbound), ntohl(argv->upbound));
-	read_unlock_bh(&tipc_nametbl_lock);
+	rcu_read_unlock();
 	str_len += 1;	/* for "\0" */
 	skb_put(buf, TLV_SPACE(str_len));
 	TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
@@ -974,13 +975,13 @@ static void tipc_purge_publications(struct name_seq *seq)
 	list_for_each_entry_safe(publ, safe, &info->zone_list, zone_list) {
 		tipc_nametbl_remove_publ(publ->type, publ->lower, publ->node,
 					 publ->ref, publ->key);
-		kfree(publ);
+		kfree_rcu(publ, rcu);
 	}
-	hlist_del_init(&seq->ns_list);
+	hlist_del_init_rcu(&seq->ns_list);
+	kfree(seq->sseqs);
 	spin_lock_bh(&seq->lock);
 
-	kfree(seq->sseqs);
-	kfree(seq);
+	kfree_rcu(seq, rcu);
 }
 
 void tipc_nametbl_stop(void)
@@ -988,22 +989,22 @@ void tipc_nametbl_stop(void)
 	u32 i;
 	struct name_seq *seq;
 	struct hlist_head *seq_head;
-	struct hlist_node *safe;
 
 	/* Verify name table is empty and purge any lingering
 	 * publications, then release the name table
 	 */
-	write_lock_bh(&tipc_nametbl_lock);
+	spin_lock_bh(&tipc_nametbl_lock);
 	for (i = 0; i < TIPC_NAMETBL_SIZE; i++) {
 		if (hlist_empty(&tipc_nametbl->seq_hlist[i]))
 			continue;
 		seq_head = &tipc_nametbl->seq_hlist[i];
-		hlist_for_each_entry_safe(seq, safe, seq_head, ns_list) {
+		hlist_for_each_entry_rcu(seq, seq_head, ns_list) {
 			tipc_purge_publications(seq);
 		}
 	}
-	write_unlock_bh(&tipc_nametbl_lock);
+	spin_unlock_bh(&tipc_nametbl_lock);
 
+	synchronize_net();
 	kfree(tipc_nametbl);
 
 }
@@ -1109,7 +1110,7 @@ static int __tipc_nl_seq_list(struct tipc_nl_msg *msg, u32 *last_type,
 			      u32 *last_lower, u32 *last_publ)
 {
 	struct hlist_head *seq_head;
-	struct name_seq *seq;
+	struct name_seq *seq = NULL;
 	int err;
 	int i;
 
@@ -1126,13 +1127,13 @@ static int __tipc_nl_seq_list(struct tipc_nl_msg *msg, u32 *last_type,
 			if (!seq)
 				return -EPIPE;
 		} else {
-			seq = hlist_entry_safe((seq_head)->first,
-					       struct name_seq, ns_list);
+			hlist_for_each_entry_rcu(seq, seq_head, ns_list)
+				break;
 			if (!seq)
 				continue;
 		}
 
-		hlist_for_each_entry_from(seq, ns_list) {
+		hlist_for_each_entry_from_rcu(seq, ns_list) {
 			spin_lock_bh(&seq->lock);
 			err = __tipc_nl_subseq_list(msg, seq, last_lower,
 						    last_publ);
@@ -1165,8 +1166,7 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
 	msg.portid = NETLINK_CB(cb->skb).portid;
 	msg.seq = cb->nlh->nlmsg_seq;
 
-	read_lock_bh(&tipc_nametbl_lock);
-
+	rcu_read_lock();
 	err = __tipc_nl_seq_list(&msg, &last_type, &last_lower, &last_publ);
 	if (!err) {
 		done = 1;
@@ -1179,8 +1179,7 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
 		 */
 		cb->prev_seq = 1;
 	}
-
-	read_unlock_bh(&tipc_nametbl_lock);
+	rcu_read_unlock();
 
 	cb->args[0] = last_type;
 	cb->args[1] = last_lower;
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index c1fd734..5f0dee9 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -62,6 +62,7 @@ struct tipc_port_list;
  * @node_list: adjacent matching name seq publications with >= node scope
  * @cluster_list: adjacent matching name seq publications with >= cluster scope
  * @zone_list: adjacent matching name seq publications with >= zone scope
+ * @rcu: RCU callback head used for deferred freeing
  *
  * Note that the node list, cluster list, and zone list are circular lists.
  */
@@ -79,6 +80,7 @@ struct publication {
 	struct list_head node_list;
 	struct list_head cluster_list;
 	struct list_head zone_list;
+	struct rcu_head rcu;
 };
 
 /**
@@ -93,7 +95,7 @@ struct name_table {
 	u32 local_publ_count;
 };
 
-extern rwlock_t tipc_nametbl_lock;
+extern spinlock_t tipc_nametbl_lock;
 extern struct name_table *tipc_nametbl;
 
 int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
-- 
1.7.9.5


------------------------------------------------------------------------------
Download BIRT iHub F-Type - The Free Enterprise-Grade BIRT Server
from Actuate! Instantly Supercharge Your Business Reports and Dashboards
with Interactivity, Sharing, Native Excel Exports, App Integration & more
Get technology previously reserved for billion-dollar corporations, FREE
http://pubads.g.doubleclick.net/gampad/clk?id=157005751&iu=/4140/ostg.clktrk

^ permalink raw reply related	[flat|nested] 10+ messages in thread

* Re: [PATCH net-next 0/8] tipc: convert name table read-write lock to RCU
  2014-12-02  7:00 [PATCH net-next 0/8] tipc: convert name table read-write lock to RCU Ying Xue
                   ` (7 preceding siblings ...)
  2014-12-02  7:00 ` [PATCH net-next 8/8] tipc: convert name table read-write lock to RCU Ying Xue
@ 2014-12-09  1:41 ` David Miller
  8 siblings, 0 replies; 10+ messages in thread
From: David Miller @ 2014-12-09  1:41 UTC (permalink / raw)
  To: ying.xue
  Cc: jon.maloy, Paul.Gortmaker, erik.hugne, richard.alpe, tero.aho,
	netdev, tipc-discussion

From: Ying Xue <ying.xue@windriver.com>
Date: Tue, 2 Dec 2014 15:00:22 +0800

> Now TIPC name table is statically allocated and is protected with a
> Read-Write lock. To enhance the performance of TIPC name table lookup,
> we are going to involve RCU lock to protect the name table. As a
> consequence, it becomes lockless to concurrently look up name table on
> read side. However, before the conversion can be successfully made,
> the following two things must be first done:
> 
> - change allocation way of name table from static to dynamic
> - fix several incorrect locking policy issues

Series applied, thanks.

^ permalink raw reply	[flat|nested] 10+ messages in thread

end of thread, other threads:[~2014-12-09  1:41 UTC | newest]

Thread overview: 10+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2014-12-02  7:00 [PATCH net-next 0/8] tipc: convert name table read-write lock to RCU Ying Xue
2014-12-02  7:00 ` [PATCH net-next 1/8] tipc: remove size variable from publ_list struct Ying Xue
2014-12-02  7:00 ` [PATCH net-next 2/8] tipc: make name table allocated dynamically Ying Xue
2014-12-02  7:00 ` [PATCH net-next 3/8] tipc: ensure all name sequences are released when name table is stopped Ying Xue
2014-12-02  7:00 ` [PATCH net-next 4/8] tipc: ensure all name sequences are properly protected with its lock Ying Xue
2014-12-02  7:00 ` [PATCH net-next 5/8] tipc: any name table member must be protected under name table lock Ying Xue
2014-12-02  7:00 ` [PATCH net-next 6/8] tipc: simplify relationship between name table lock and node lock Ying Xue
2014-12-02  7:00 ` [PATCH net-next 7/8] tipc: remove unnecessary INIT_LIST_HEAD Ying Xue
2014-12-02  7:00 ` [PATCH net-next 8/8] tipc: convert name table read-write lock to RCU Ying Xue
2014-12-09  1:41 ` [PATCH net-next 0/8] " David Miller

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).