All of lore.kernel.org
 help / color / mirror / Atom feed
From: Yipeng Wang <yipeng1.wang@intel.com>
To: pablo.de.lara.guarch@intel.com
Cc: dev@dpdk.org, yipeng1.wang@intel.com, bruce.richardson@intel.com,
	honnappa.nagarahalli@arm.com, vguvva@caviumnetworks.com,
	brijesh.s.singh@gmail.com
Subject: [PATCH v4 5/8] hash: add read and write concurrency support
Date: Mon,  9 Jul 2018 03:45:00 -0700	[thread overview]
Message-ID: <1531133103-437316-6-git-send-email-yipeng1.wang@intel.com> (raw)
In-Reply-To: <1531133103-437316-1-git-send-email-yipeng1.wang@intel.com>

The existing implementation of librte_hash does not support read-write
concurrency. This commit implements read-write safety using rte_rwlock,
or the TM version of rte_rwlock if hardware transactional memory is
available.

Both multi-writer and read-write concurrency are now protected by
rte_rwlock. The x86-specific header file is removed since the
x86-specific RTM functions are no longer called directly by rte_hash.

Signed-off-by: Yipeng Wang <yipeng1.wang@intel.com>
Acked-by: Pablo de Lara <pablo.de.lara.guarch@intel.com>
---
 lib/librte_hash/meson.build           |   1 -
 lib/librte_hash/rte_cuckoo_hash.c     | 520 ++++++++++++++++++++++------------
 lib/librte_hash/rte_cuckoo_hash.h     |  18 +-
 lib/librte_hash/rte_cuckoo_hash_x86.h | 167 -----------
 lib/librte_hash/rte_hash.h            |   3 +
 5 files changed, 348 insertions(+), 361 deletions(-)
 delete mode 100644 lib/librte_hash/rte_cuckoo_hash_x86.h

diff --git a/lib/librte_hash/meson.build b/lib/librte_hash/meson.build
index e139e1d..efc06ed 100644
--- a/lib/librte_hash/meson.build
+++ b/lib/librte_hash/meson.build
@@ -6,7 +6,6 @@ headers = files('rte_cmp_arm64.h',
 	'rte_cmp_x86.h',
 	'rte_crc_arm64.h',
 	'rte_cuckoo_hash.h',
-	'rte_cuckoo_hash_x86.h',
 	'rte_fbk_hash.h',
 	'rte_hash_crc.h',
 	'rte_hash.h',
diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index b812f33..35631cc 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -31,9 +31,6 @@
 #include "rte_hash.h"
 #include "rte_cuckoo_hash.h"
 
-#if defined(RTE_ARCH_X86)
-#include "rte_cuckoo_hash_x86.h"
-#endif
 
 TAILQ_HEAD(rte_hash_list, rte_tailq_entry);
 
@@ -93,8 +90,10 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	void *buckets = NULL;
 	char ring_name[RTE_RING_NAMESIZE];
 	unsigned num_key_slots;
-	unsigned hw_trans_mem_support = 0;
 	unsigned i;
+	unsigned int hw_trans_mem_support = 0, multi_writer_support = 0;
+	unsigned int readwrite_concur_support = 0;
+
 	rte_hash_function default_hash_func = (rte_hash_function)rte_jhash;
 
 	hash_list = RTE_TAILQ_CAST(rte_hash_tailq.head, rte_hash_list);
@@ -118,8 +117,16 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT)
 		hw_trans_mem_support = 1;
 
+	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD)
+		multi_writer_support = 1;
+
+	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY) {
+		readwrite_concur_support = 1;
+		multi_writer_support = 1;
+	}
+
 	/* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
-	if (hw_trans_mem_support)
+	if (multi_writer_support)
 		/*
 		 * Increase number of slots by total number of indices
 		 * that can be stored in the lcore caches
@@ -233,7 +240,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	h->cmp_jump_table_idx = KEY_OTHER_BYTES;
 #endif
 
-	if (hw_trans_mem_support) {
+	if (multi_writer_support) {
 		h->local_free_slots = rte_zmalloc_socket(NULL,
 				sizeof(struct lcore_cache) * RTE_MAX_LCORE,
 				RTE_CACHE_LINE_SIZE, params->socket_id);
@@ -261,6 +268,8 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	h->key_store = k;
 	h->free_slots = r;
 	h->hw_trans_mem_support = hw_trans_mem_support;
+	h->multi_writer_support = multi_writer_support;
+	h->readwrite_concur_support = readwrite_concur_support;
 
 #if defined(RTE_ARCH_X86)
 	if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_AVX2))
@@ -271,24 +280,17 @@ rte_hash_create(const struct rte_hash_parameters *params)
 #endif
 		h->sig_cmp_fn = RTE_HASH_COMPARE_SCALAR;
 
-	/* Turn on multi-writer only with explicit flat from user and TM
+	/* Turn on multi-writer only with explicit flag from user and TM
 	 * support.
 	 */
-	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD) {
-		if (h->hw_trans_mem_support) {
-			h->add_key = ADD_KEY_MULTIWRITER_TM;
-		} else {
-			h->add_key = ADD_KEY_MULTIWRITER;
-			h->multiwriter_lock = rte_malloc(NULL,
-							sizeof(rte_spinlock_t),
-							RTE_CACHE_LINE_SIZE);
-			if (h->multiwriter_lock == NULL)
-				goto err_unlock;
-
-			rte_spinlock_init(h->multiwriter_lock);
-		}
-	} else
-		h->add_key = ADD_KEY_SINGLEWRITER;
+	if (h->multi_writer_support) {
+		h->readwrite_lock = rte_malloc(NULL, sizeof(rte_rwlock_t),
+						RTE_CACHE_LINE_SIZE);
+		if (h->readwrite_lock == NULL)
+			goto err_unlock;
+
+		rte_rwlock_init(h->readwrite_lock);
+	}
 
 	/* Populate free slots ring. Entry zero is reserved for key misses. */
 	for (i = 1; i < num_key_slots; i++)
@@ -338,11 +340,10 @@ rte_hash_free(struct rte_hash *h)
 
 	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
 
-	if (h->hw_trans_mem_support)
+	if (h->multi_writer_support) {
 		rte_free(h->local_free_slots);
-
-	if (h->add_key == ADD_KEY_MULTIWRITER)
-		rte_free(h->multiwriter_lock);
+		rte_free(h->readwrite_lock);
+	}
 	rte_ring_free(h->free_slots);
 	rte_free(h->key_store);
 	rte_free(h->buckets);
@@ -369,6 +370,44 @@ rte_hash_secondary_hash(const hash_sig_t primary_hash)
 	return primary_hash ^ ((tag + 1) * alt_bits_xor);
 }
 
+/* Read write locks implemented using rte_rwlock */
+static inline void
+__hash_rw_writer_lock(const struct rte_hash *h)
+{
+	if (h->multi_writer_support && h->hw_trans_mem_support)
+		rte_rwlock_write_lock_tm(h->readwrite_lock);
+	else if (h->multi_writer_support)
+		rte_rwlock_write_lock(h->readwrite_lock);
+}
+
+
+static inline void
+__hash_rw_reader_lock(const struct rte_hash *h)
+{
+	if (h->readwrite_concur_support && h->hw_trans_mem_support)
+		rte_rwlock_read_lock_tm(h->readwrite_lock);
+	else if (h->readwrite_concur_support)
+		rte_rwlock_read_lock(h->readwrite_lock);
+}
+
+static inline void
+__hash_rw_writer_unlock(const struct rte_hash *h)
+{
+	if (h->multi_writer_support && h->hw_trans_mem_support)
+		rte_rwlock_write_unlock_tm(h->readwrite_lock);
+	else if (h->multi_writer_support)
+		rte_rwlock_write_unlock(h->readwrite_lock);
+}
+
+static inline void
+__hash_rw_reader_unlock(const struct rte_hash *h)
+{
+	if (h->readwrite_concur_support && h->hw_trans_mem_support)
+		rte_rwlock_read_unlock_tm(h->readwrite_lock);
+	else if (h->readwrite_concur_support)
+		rte_rwlock_read_unlock(h->readwrite_lock);
+}
+
 void
 rte_hash_reset(struct rte_hash *h)
 {
@@ -378,6 +417,7 @@ rte_hash_reset(struct rte_hash *h)
 	if (h == NULL)
 		return;
 
+	__hash_rw_writer_lock(h);
 	memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
 	memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
 
@@ -386,7 +426,7 @@ rte_hash_reset(struct rte_hash *h)
 		rte_pause();
 
 	/* Repopulate the free slots ring. Entry zero is reserved for key misses */
-	if (h->hw_trans_mem_support)
+	if (h->multi_writer_support)
 		tot_ring_cnt = h->entries + (RTE_MAX_LCORE - 1) *
 					(LCORE_CACHE_SIZE - 1);
 	else
@@ -395,77 +435,12 @@ rte_hash_reset(struct rte_hash *h)
 	for (i = 1; i < tot_ring_cnt + 1; i++)
 		rte_ring_sp_enqueue(h->free_slots, (void *)((uintptr_t) i));
 
-	if (h->hw_trans_mem_support) {
+	if (h->multi_writer_support) {
 		/* Reset local caches per lcore */
 		for (i = 0; i < RTE_MAX_LCORE; i++)
 			h->local_free_slots[i].len = 0;
 	}
-}
-
-/* Search for an entry that can be pushed to its alternative location */
-static inline int
-make_space_bucket(const struct rte_hash *h, struct rte_hash_bucket *bkt,
-		unsigned int *nr_pushes)
-{
-	unsigned i, j;
-	int ret;
-	uint32_t next_bucket_idx;
-	struct rte_hash_bucket *next_bkt[RTE_HASH_BUCKET_ENTRIES];
-
-	/*
-	 * Push existing item (search for bucket with space in
-	 * alternative locations) to its alternative location
-	 */
-	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-		/* Search for space in alternative locations */
-		next_bucket_idx = bkt->sig_alt[i] & h->bucket_bitmask;
-		next_bkt[i] = &h->buckets[next_bucket_idx];
-		for (j = 0; j < RTE_HASH_BUCKET_ENTRIES; j++) {
-			if (next_bkt[i]->key_idx[j] == EMPTY_SLOT)
-				break;
-		}
-
-		if (j != RTE_HASH_BUCKET_ENTRIES)
-			break;
-	}
-
-	/* Alternative location has spare room (end of recursive function) */
-	if (i != RTE_HASH_BUCKET_ENTRIES) {
-		next_bkt[i]->sig_alt[j] = bkt->sig_current[i];
-		next_bkt[i]->sig_current[j] = bkt->sig_alt[i];
-		next_bkt[i]->key_idx[j] = bkt->key_idx[i];
-		return i;
-	}
-
-	/* Pick entry that has not been pushed yet */
-	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++)
-		if (bkt->flag[i] == 0)
-			break;
-
-	/* All entries have been pushed, so entry cannot be added */
-	if (i == RTE_HASH_BUCKET_ENTRIES || ++(*nr_pushes) > RTE_HASH_MAX_PUSHES)
-		return -ENOSPC;
-
-	/* Set flag to indicate that this entry is going to be pushed */
-	bkt->flag[i] = 1;
-
-	/* Need room in alternative bucket to insert the pushed entry */
-	ret = make_space_bucket(h, next_bkt[i], nr_pushes);
-	/*
-	 * After recursive function.
-	 * Clear flags and insert the pushed entry
-	 * in its alternative location if successful,
-	 * or return error
-	 */
-	bkt->flag[i] = 0;
-	if (ret >= 0) {
-		next_bkt[i]->sig_alt[ret] = bkt->sig_current[i];
-		next_bkt[i]->sig_current[ret] = bkt->sig_alt[i];
-		next_bkt[i]->key_idx[ret] = bkt->key_idx[i];
-		return i;
-	} else
-		return ret;
-
+	__hash_rw_writer_unlock(h);
 }
 
 /*
@@ -478,7 +453,7 @@ enqueue_slot_back(const struct rte_hash *h,
 		struct lcore_cache *cached_free_slots,
 		void *slot_id)
 {
-	if (h->hw_trans_mem_support) {
+	if (h->multi_writer_support) {
 		cached_free_slots->objs[cached_free_slots->len] = slot_id;
 		cached_free_slots->len++;
 	} else
@@ -512,13 +487,207 @@ search_and_update(const struct rte_hash *h, void *data, const void *key,
 	return -1;
 }
 
+/* Only tries to insert at one bucket (@prim_bkt) without trying to push
+ * buckets around.
+ * return 1 if matching existing key, return 0 if succeeds, return -1 for no
+ * empty entry.
+ */
+static inline int32_t
+rte_hash_cuckoo_insert_mw(const struct rte_hash *h,
+		struct rte_hash_bucket *prim_bkt,
+		struct rte_hash_bucket *sec_bkt,
+		const struct rte_hash_key *key, void *data,
+		hash_sig_t sig, hash_sig_t alt_hash, uint32_t new_idx,
+		int32_t *ret_val)
+{
+	unsigned int i;
+	struct rte_hash_bucket *cur_bkt = prim_bkt;
+	int32_t ret;
+
+	__hash_rw_writer_lock(h);
+	/* Check if key was inserted after last check but before this
+	 * protected region in case of inserting duplicated keys.
+	 */
+	ret = search_and_update(h, data, key, cur_bkt, sig, alt_hash);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		*ret_val = ret;
+		return 1;
+	}
+	ret = search_and_update(h, data, key, sec_bkt, alt_hash, sig);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		*ret_val = ret;
+		return 1;
+	}
+
+	/* Insert new entry if there is room in the primary
+	 * bucket.
+	 */
+	for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
+		/* Check if slot is available */
+		if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) {
+			prim_bkt->sig_current[i] = sig;
+			prim_bkt->sig_alt[i] = alt_hash;
+			prim_bkt->key_idx[i] = new_idx;
+			break;
+		}
+	}
+	__hash_rw_writer_unlock(h);
+
+	if (i != RTE_HASH_BUCKET_ENTRIES)
+		return 0;
+
+	/* no empty entry */
+	return -1;
+}
+
+/* Shift buckets along provided cuckoo_path (@leaf and @leaf_slot) and fill
+ * the path head with new entry (sig, alt_hash, new_idx)
+ * return 1 if matched key found, return -1 if cuckoo path invalided and fail,
+ * return 0 if succeeds.
+ */
+static inline int
+rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
+			struct rte_hash_bucket *bkt,
+			struct rte_hash_bucket *alt_bkt,
+			const struct rte_hash_key *key, void *data,
+			struct queue_node *leaf, uint32_t leaf_slot,
+			hash_sig_t sig, hash_sig_t alt_hash, uint32_t new_idx,
+			int32_t *ret_val)
+{
+	uint32_t prev_alt_bkt_idx;
+	struct rte_hash_bucket *cur_bkt = bkt;
+	struct queue_node *prev_node, *curr_node = leaf;
+	struct rte_hash_bucket *prev_bkt, *curr_bkt = leaf->bkt;
+	uint32_t prev_slot, curr_slot = leaf_slot;
+	int32_t ret;
+
+	__hash_rw_writer_lock(h);
+
+	/* In case empty slot was gone before entering protected region */
+	if (curr_bkt->key_idx[curr_slot] != EMPTY_SLOT) {
+		__hash_rw_writer_unlock(h);
+		return -1;
+	}
+
+	/* Check if key was inserted after last check but before this
+	 * protected region.
+	 */
+	ret = search_and_update(h, data, key, cur_bkt, sig, alt_hash);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		*ret_val = ret;
+		return 1;
+	}
+
+	ret = search_and_update(h, data, key, alt_bkt, alt_hash, sig);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		*ret_val = ret;
+		return 1;
+	}
+
+	while (likely(curr_node->prev != NULL)) {
+		prev_node = curr_node->prev;
+		prev_bkt = prev_node->bkt;
+		prev_slot = curr_node->prev_slot;
+
+		prev_alt_bkt_idx =
+			prev_bkt->sig_alt[prev_slot] & h->bucket_bitmask;
+
+		if (unlikely(&h->buckets[prev_alt_bkt_idx]
+				!= curr_bkt)) {
+			/* revert it to empty, otherwise duplicated keys */
+			curr_bkt->key_idx[curr_slot] = EMPTY_SLOT;
+			__hash_rw_writer_unlock(h);
+			return -1;
+		}
+
+		/* Need to swap current/alt sig to allow later
+		 * Cuckoo insert to move elements back to its
+		 * primary bucket if available
+		 */
+		curr_bkt->sig_alt[curr_slot] =
+			 prev_bkt->sig_current[prev_slot];
+		curr_bkt->sig_current[curr_slot] =
+			prev_bkt->sig_alt[prev_slot];
+		curr_bkt->key_idx[curr_slot] =
+			prev_bkt->key_idx[prev_slot];
+
+		curr_slot = prev_slot;
+		curr_node = prev_node;
+		curr_bkt = curr_node->bkt;
+	}
+
+	curr_bkt->sig_current[curr_slot] = sig;
+	curr_bkt->sig_alt[curr_slot] = alt_hash;
+	curr_bkt->key_idx[curr_slot] = new_idx;
+
+	__hash_rw_writer_unlock(h);
+
+	return 0;
+
+}
+
+/*
+ * Make space for new key, using bfs Cuckoo Search and Multi-Writer safe
+ * Cuckoo
+ */
+static inline int
+rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
+			struct rte_hash_bucket *bkt,
+			struct rte_hash_bucket *sec_bkt,
+			const struct rte_hash_key *key, void *data,
+			hash_sig_t sig, hash_sig_t alt_hash,
+			uint32_t new_idx, int32_t *ret_val)
+{
+	unsigned int i;
+	struct queue_node queue[RTE_HASH_BFS_QUEUE_MAX_LEN];
+	struct queue_node *tail, *head;
+	struct rte_hash_bucket *curr_bkt, *alt_bkt;
+
+	tail = queue;
+	head = queue + 1;
+	tail->bkt = bkt;
+	tail->prev = NULL;
+	tail->prev_slot = -1;
+
+	/* Cuckoo bfs Search */
+	while (likely(tail != head && head <
+					queue + RTE_HASH_BFS_QUEUE_MAX_LEN -
+					RTE_HASH_BUCKET_ENTRIES)) {
+		curr_bkt = tail->bkt;
+		for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
+			if (curr_bkt->key_idx[i] == EMPTY_SLOT) {
+				int32_t ret = rte_hash_cuckoo_move_insert_mw(h,
+						bkt, sec_bkt, key, data,
+						tail, i, sig, alt_hash,
+						new_idx, ret_val);
+				if (likely(ret != -1))
+					return ret;
+			}
+
+			/* Enqueue new node and keep prev node info */
+			alt_bkt = &(h->buckets[curr_bkt->sig_alt[i]
+						    & h->bucket_bitmask]);
+			head->bkt = alt_bkt;
+			head->prev = tail;
+			head->prev_slot = i;
+			head++;
+		}
+		tail++;
+	}
+
+	return -ENOSPC;
+}
+
 static inline int32_t
 __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 						hash_sig_t sig, void *data)
 {
 	hash_sig_t alt_hash;
 	uint32_t prim_bucket_idx, sec_bucket_idx;
-	unsigned i;
 	struct rte_hash_bucket *prim_bkt, *sec_bkt;
 	struct rte_hash_key *new_k, *keys = h->key_store;
 	void *slot_id = NULL;
@@ -527,10 +696,7 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	unsigned n_slots;
 	unsigned lcore_id;
 	struct lcore_cache *cached_free_slots = NULL;
-	unsigned int nr_pushes = 0;
-
-	if (h->add_key == ADD_KEY_MULTIWRITER)
-		rte_spinlock_lock(h->multiwriter_lock);
+	int32_t ret_val;
 
 	prim_bucket_idx = sig & h->bucket_bitmask;
 	prim_bkt = &h->buckets[prim_bucket_idx];
@@ -541,8 +707,24 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 	sec_bkt = &h->buckets[sec_bucket_idx];
 	rte_prefetch0(sec_bkt);
 
-	/* Get a new slot for storing the new key */
-	if (h->hw_trans_mem_support) {
+	/* Check if key is already inserted in primary location */
+	__hash_rw_writer_lock(h);
+	ret = search_and_update(h, data, key, prim_bkt, sig, alt_hash);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		return ret;
+	}
+
+	/* Check if key is already inserted in secondary location */
+	ret = search_and_update(h, data, key, sec_bkt, alt_hash, sig);
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
+		return ret;
+	}
+	__hash_rw_writer_unlock(h);
+
+	/* Did not find a match, so get a new slot for storing the new key */
+	if (h->multi_writer_support) {
 		lcore_id = rte_lcore_id();
 		cached_free_slots = &h->local_free_slots[lcore_id];
 		/* Try to get a free slot from the local cache */
@@ -552,8 +734,7 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 					cached_free_slots->objs,
 					LCORE_CACHE_SIZE, NULL);
 			if (n_slots == 0) {
-				ret = -ENOSPC;
-				goto failure;
+				return -ENOSPC;
 			}
 
 			cached_free_slots->len += n_slots;
@@ -564,92 +745,50 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
 		slot_id = cached_free_slots->objs[cached_free_slots->len];
 	} else {
 		if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0) {
-			ret = -ENOSPC;
-			goto failure;
+			return -ENOSPC;
 		}
 	}
 
 	new_k = RTE_PTR_ADD(keys, (uintptr_t)slot_id * h->key_entry_size);
-	rte_prefetch0(new_k);
 	new_idx = (uint32_t)((uintptr_t) slot_id);
-
-	/* Check if key is already inserted in primary location */
-	ret = search_and_update(h, data, key, prim_bkt, sig, alt_hash);
-	if (ret != -1)
-		goto failure;
-
-	/* Check if key is already inserted in secondary location */
-	ret = search_and_update(h, data, key, sec_bkt, alt_hash, sig);
-	if (ret != -1)
-		goto failure;
-
 	/* Copy key */
 	rte_memcpy(new_k->key, key, h->key_len);
 	new_k->pdata = data;
 
-#if defined(RTE_ARCH_X86) /* currently only x86 support HTM */
-	if (h->add_key == ADD_KEY_MULTIWRITER_TM) {
-		ret = rte_hash_cuckoo_insert_mw_tm(prim_bkt,
-				sig, alt_hash, new_idx);
-		if (ret >= 0)
-			return new_idx - 1;
 
-		/* Primary bucket full, need to make space for new entry */
-		ret = rte_hash_cuckoo_make_space_mw_tm(h, prim_bkt, sig,
-							alt_hash, new_idx);
+	/* Find an empty slot and insert */
+	ret = rte_hash_cuckoo_insert_mw(h, prim_bkt, sec_bkt, key, data,
+					sig, alt_hash, new_idx, &ret_val);
+	if (ret == 0)
+		return new_idx - 1;
+	else if (ret == 1) {
+		enqueue_slot_back(h, cached_free_slots, slot_id);
+		return ret_val;
+	}
 
-		if (ret >= 0)
-			return new_idx - 1;
+	/* Primary bucket full, need to make space for new entry */
+	ret = rte_hash_cuckoo_make_space_mw(h, prim_bkt, sec_bkt, key, data,
+					sig, alt_hash, new_idx, &ret_val);
+	if (ret == 0)
+		return new_idx - 1;
+	else if (ret == 1) {
+		enqueue_slot_back(h, cached_free_slots, slot_id);
+		return ret_val;
+	}
 
-		/* Also search secondary bucket to get better occupancy */
-		ret = rte_hash_cuckoo_make_space_mw_tm(h, sec_bkt, sig,
-							alt_hash, new_idx);
+	/* Also search secondary bucket to get better occupancy */
+	ret = rte_hash_cuckoo_make_space_mw(h, sec_bkt, prim_bkt, key, data,
+					alt_hash, sig, new_idx, &ret_val);
 
-		if (ret >= 0)
-			return new_idx - 1;
+	if (ret == 0)
+		return new_idx - 1;
+	else if (ret == 1) {
+		enqueue_slot_back(h, cached_free_slots, slot_id);
+		return ret_val;
 	} else {
-#endif
-		for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-			/* Check if slot is available */
-			if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) {
-				prim_bkt->sig_current[i] = sig;
-				prim_bkt->sig_alt[i] = alt_hash;
-				prim_bkt->key_idx[i] = new_idx;
-				break;
-			}
-		}
-
-		if (i != RTE_HASH_BUCKET_ENTRIES) {
-			if (h->add_key == ADD_KEY_MULTIWRITER)
-				rte_spinlock_unlock(h->multiwriter_lock);
-			return new_idx - 1;
-		}
-
-		/* Primary bucket full, need to make space for new entry
-		 * After recursive function.
-		 * Insert the new entry in the position of the pushed entry
-		 * if successful or return error and
-		 * store the new slot back in the ring
-		 */
-		ret = make_space_bucket(h, prim_bkt, &nr_pushes);
-		if (ret >= 0) {
-			prim_bkt->sig_current[ret] = sig;
-			prim_bkt->sig_alt[ret] = alt_hash;
-			prim_bkt->key_idx[ret] = new_idx;
-			if (h->add_key == ADD_KEY_MULTIWRITER)
-				rte_spinlock_unlock(h->multiwriter_lock);
-			return new_idx - 1;
-		}
-#if defined(RTE_ARCH_X86)
+		enqueue_slot_back(h, cached_free_slots, slot_id);
+		return ret;
 	}
-#endif
-	/* Error in addition, store new slot back in the ring and return error */
-	enqueue_slot_back(h, cached_free_slots, (void *)((uintptr_t) new_idx));
-
-failure:
-	if (h->add_key == ADD_KEY_MULTIWRITER)
-		rte_spinlock_unlock(h->multiwriter_lock);
-	return ret;
 }
 
 int32_t
@@ -734,12 +873,14 @@ __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
 	bucket_idx = sig & h->bucket_bitmask;
 	bkt = &h->buckets[bucket_idx];
 
+	__hash_rw_reader_lock(h);
 
 	/* Check if key is in primary location */
 	ret = search_one_bucket(h, key, sig, data, bkt);
-	if (ret != -1)
+	if (ret != -1) {
+		__hash_rw_reader_unlock(h);
 		return ret;
-
+	}
 	/* Calculate secondary hash */
 	alt_hash = rte_hash_secondary_hash(sig);
 	bucket_idx = alt_hash & h->bucket_bitmask;
@@ -747,9 +888,11 @@ __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
 
 	/* Check if key is in secondary location */
 	ret = search_one_bucket(h, key, alt_hash, data, bkt);
-	if (ret != -1)
+	if (ret != -1) {
+		__hash_rw_reader_unlock(h);
 		return ret;
-
+	}
+	__hash_rw_reader_unlock(h);
 	return -ENOENT;
 }
 
@@ -791,7 +934,7 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
 
 	bkt->sig_current[i] = NULL_SIGNATURE;
 	bkt->sig_alt[i] = NULL_SIGNATURE;
-	if (h->hw_trans_mem_support) {
+	if (h->multi_writer_support) {
 		lcore_id = rte_lcore_id();
 		cached_free_slots = &h->local_free_slots[lcore_id];
 		/* Cache full, need to free it. */
@@ -855,10 +998,13 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 	bucket_idx = sig & h->bucket_bitmask;
 	bkt = &h->buckets[bucket_idx];
 
+	__hash_rw_writer_lock(h);
 	/* look for key in primary bucket */
 	ret = search_and_remove(h, key, bkt, sig);
-	if (ret != -1)
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
 		return ret;
+	}
 
 	/* Calculate secondary hash */
 	alt_hash = rte_hash_secondary_hash(sig);
@@ -867,9 +1013,12 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 
 	/* look for key in secondary bucket */
 	ret = search_and_remove(h, key, bkt, alt_hash);
-	if (ret != -1)
+	if (ret != -1) {
+		__hash_rw_writer_unlock(h);
 		return ret;
+	}
 
+	__hash_rw_writer_unlock(h);
 	return -ENOENT;
 }
 
@@ -1011,6 +1160,7 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 		rte_prefetch0(secondary_bkt[i]);
 	}
 
+	__hash_rw_reader_lock(h);
 	/* Compare signatures and prefetch key slot of first hit */
 	for (i = 0; i < num_keys; i++) {
 		compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
@@ -1093,6 +1243,8 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 		continue;
 	}
 
+	__hash_rw_reader_unlock(h);
+
 	if (hit_mask != NULL)
 		*hit_mask = hits;
 }
@@ -1151,7 +1303,7 @@ rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32
 		bucket_idx = *next / RTE_HASH_BUCKET_ENTRIES;
 		idx = *next % RTE_HASH_BUCKET_ENTRIES;
 	}
-
+	__hash_rw_reader_lock(h);
 	/* Get position of entry in key table */
 	position = h->buckets[bucket_idx].key_idx[idx];
 	next_key = (struct rte_hash_key *) ((char *)h->key_store +
@@ -1160,6 +1312,8 @@ rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32
 	*key = next_key->key;
 	*data = next_key->pdata;
 
+	__hash_rw_reader_unlock(h);
+
 	/* Increment iterator */
 	(*next)++;
 
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index 7a54e55..db4d1a0 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -88,11 +88,6 @@ const rte_hash_cmp_eq_t cmp_jump_table[NUM_KEY_CMP_CASES] = {
 
 #endif
 
-enum add_key_case {
-	ADD_KEY_SINGLEWRITER = 0,
-	ADD_KEY_MULTIWRITER,
-	ADD_KEY_MULTIWRITER_TM,
-};
 
 /** Number of items per bucket. */
 #define RTE_HASH_BUCKET_ENTRIES		8
@@ -155,18 +150,20 @@ struct rte_hash {
 
 	struct rte_ring *free_slots;
 	/**< Ring that stores all indexes of the free slots in the key table */
-	uint8_t hw_trans_mem_support;
-	/**< Hardware transactional memory support */
+
 	struct lcore_cache *local_free_slots;
 	/**< Local cache per lcore, storing some indexes of the free slots */
-	enum add_key_case add_key; /**< Multi-writer hash add behavior */
-
-	rte_spinlock_t *multiwriter_lock; /**< Multi-writer spinlock for w/o TM */
 
 	/* Fields used in lookup */
 
 	uint32_t key_len __rte_cache_aligned;
 	/**< Length of hash key. */
+	uint8_t hw_trans_mem_support;
+	/**< If hardware transactional memory is used. */
+	uint8_t multi_writer_support;
+	/**< If multi-writer support is enabled. */
+	uint8_t readwrite_concur_support;
+	/**< If read-write concurrency support is enabled */
 	rte_hash_function hash_func;    /**< Function used to calculate hash. */
 	uint32_t hash_func_init_val;    /**< Init value used by hash_func. */
 	rte_hash_cmp_eq_t rte_hash_custom_cmp_eq;
@@ -184,6 +181,7 @@ struct rte_hash {
 	/**< Table with buckets storing all the	hash values and key indexes
 	 * to the key table.
 	 */
+	rte_rwlock_t *readwrite_lock; /**< Read-write lock thread-safety. */
 } __rte_cache_aligned;
 
 struct queue_node {
diff --git a/lib/librte_hash/rte_cuckoo_hash_x86.h b/lib/librte_hash/rte_cuckoo_hash_x86.h
deleted file mode 100644
index 981d7bd..0000000
--- a/lib/librte_hash/rte_cuckoo_hash_x86.h
+++ /dev/null
@@ -1,167 +0,0 @@
-/* SPDX-License-Identifier: BSD-3-Clause
- * Copyright(c) 2016 Intel Corporation
- */
-
-/* rte_cuckoo_hash_x86.h
- * This file holds all x86 specific Cuckoo Hash functions
- */
-
-/* Only tries to insert at one bucket (@prim_bkt) without trying to push
- * buckets around
- */
-static inline unsigned
-rte_hash_cuckoo_insert_mw_tm(struct rte_hash_bucket *prim_bkt,
-		hash_sig_t sig, hash_sig_t alt_hash, uint32_t new_idx)
-{
-	unsigned i, status;
-	unsigned try = 0;
-
-	while (try < RTE_HASH_TSX_MAX_RETRY) {
-		status = rte_xbegin();
-		if (likely(status == RTE_XBEGIN_STARTED)) {
-			/* Insert new entry if there is room in the primary
-			* bucket.
-			*/
-			for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-				/* Check if slot is available */
-				if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) {
-					prim_bkt->sig_current[i] = sig;
-					prim_bkt->sig_alt[i] = alt_hash;
-					prim_bkt->key_idx[i] = new_idx;
-					break;
-				}
-			}
-			rte_xend();
-
-			if (i != RTE_HASH_BUCKET_ENTRIES)
-				return 0;
-
-			break; /* break off try loop if transaction commits */
-		} else {
-			/* If we abort we give up this cuckoo path. */
-			try++;
-			rte_pause();
-		}
-	}
-
-	return -1;
-}
-
-/* Shift buckets along provided cuckoo_path (@leaf and @leaf_slot) and fill
- * the path head with new entry (sig, alt_hash, new_idx)
- */
-static inline int
-rte_hash_cuckoo_move_insert_mw_tm(const struct rte_hash *h,
-			struct queue_node *leaf, uint32_t leaf_slot,
-			hash_sig_t sig, hash_sig_t alt_hash, uint32_t new_idx)
-{
-	unsigned try = 0;
-	unsigned status;
-	uint32_t prev_alt_bkt_idx;
-
-	struct queue_node *prev_node, *curr_node = leaf;
-	struct rte_hash_bucket *prev_bkt, *curr_bkt = leaf->bkt;
-	uint32_t prev_slot, curr_slot = leaf_slot;
-
-	while (try < RTE_HASH_TSX_MAX_RETRY) {
-		status = rte_xbegin();
-		if (likely(status == RTE_XBEGIN_STARTED)) {
-			/* In case empty slot was gone before entering TSX */
-			if (curr_bkt->key_idx[curr_slot] != EMPTY_SLOT)
-				rte_xabort(RTE_XABORT_CUCKOO_PATH_INVALIDED);
-			while (likely(curr_node->prev != NULL)) {
-				prev_node = curr_node->prev;
-				prev_bkt = prev_node->bkt;
-				prev_slot = curr_node->prev_slot;
-
-				prev_alt_bkt_idx
-					= prev_bkt->sig_alt[prev_slot]
-					    & h->bucket_bitmask;
-
-				if (unlikely(&h->buckets[prev_alt_bkt_idx]
-					     != curr_bkt)) {
-					rte_xabort(RTE_XABORT_CUCKOO_PATH_INVALIDED);
-				}
-
-				/* Need to swap current/alt sig to allow later
-				 * Cuckoo insert to move elements back to its
-				 * primary bucket if available
-				 */
-				curr_bkt->sig_alt[curr_slot] =
-				    prev_bkt->sig_current[prev_slot];
-				curr_bkt->sig_current[curr_slot] =
-				    prev_bkt->sig_alt[prev_slot];
-				curr_bkt->key_idx[curr_slot]
-				    = prev_bkt->key_idx[prev_slot];
-
-				curr_slot = prev_slot;
-				curr_node = prev_node;
-				curr_bkt = curr_node->bkt;
-			}
-
-			curr_bkt->sig_current[curr_slot] = sig;
-			curr_bkt->sig_alt[curr_slot] = alt_hash;
-			curr_bkt->key_idx[curr_slot] = new_idx;
-
-			rte_xend();
-
-			return 0;
-		}
-
-		/* If we abort we give up this cuckoo path, since most likely it's
-		 * no longer valid as TSX detected data conflict
-		 */
-		try++;
-		rte_pause();
-	}
-
-	return -1;
-}
-
-/*
- * Make space for new key, using bfs Cuckoo Search and Multi-Writer safe
- * Cuckoo
- */
-static inline int
-rte_hash_cuckoo_make_space_mw_tm(const struct rte_hash *h,
-			struct rte_hash_bucket *bkt,
-			hash_sig_t sig, hash_sig_t alt_hash,
-			uint32_t new_idx)
-{
-	unsigned i;
-	struct queue_node queue[RTE_HASH_BFS_QUEUE_MAX_LEN];
-	struct queue_node *tail, *head;
-	struct rte_hash_bucket *curr_bkt, *alt_bkt;
-
-	tail = queue;
-	head = queue + 1;
-	tail->bkt = bkt;
-	tail->prev = NULL;
-	tail->prev_slot = -1;
-
-	/* Cuckoo bfs Search */
-	while (likely(tail != head && head <
-					queue + RTE_HASH_BFS_QUEUE_MAX_LEN -
-					RTE_HASH_BUCKET_ENTRIES)) {
-		curr_bkt = tail->bkt;
-		for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-			if (curr_bkt->key_idx[i] == EMPTY_SLOT) {
-				if (likely(rte_hash_cuckoo_move_insert_mw_tm(h,
-						tail, i, sig,
-						alt_hash, new_idx) == 0))
-					return 0;
-			}
-
-			/* Enqueue new node and keep prev node info */
-			alt_bkt = &(h->buckets[curr_bkt->sig_alt[i]
-						    & h->bucket_bitmask]);
-			head->bkt = alt_bkt;
-			head->prev = tail;
-			head->prev_slot = i;
-			head++;
-		}
-		tail++;
-	}
-
-	return -ENOSPC;
-}
diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h
index f71ca9f..ecb49e4 100644
--- a/lib/librte_hash/rte_hash.h
+++ b/lib/librte_hash/rte_hash.h
@@ -34,6 +34,9 @@ extern "C" {
 /** Default behavior of insertion, single writer/multi writer */
 #define RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD 0x02
 
+/** Flag to support reader writer concurrency */
+#define RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY 0x04
+
 /** Signature of key that is stored internally. */
 typedef uint32_t hash_sig_t;
 
-- 
2.7.4

  parent reply	other threads:[~2018-07-09 17:52 UTC|newest]

Thread overview: 65+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-06-08 10:51 [PATCH v1 0/3] Add read-write concurrency to rte_hash library Yipeng Wang
2018-06-08 10:51 ` [PATCH v1 1/3] hash: add read and write concurrency support Yipeng Wang
2018-06-26 14:59   ` De Lara Guarch, Pablo
2018-06-08 10:51 ` [PATCH v1 2/3] test: add test case for read write concurrency Yipeng Wang
2018-06-26 15:48   ` De Lara Guarch, Pablo
2018-06-08 10:51 ` [PATCH v1 3/3] hash: add new API function to query the key count Yipeng Wang
2018-06-26 16:11   ` De Lara Guarch, Pablo
2018-06-29 12:24 ` [PATCH v2 0/6] Add read-write concurrency to rte_hash library Yipeng Wang
2018-06-29 12:24   ` [PATCH v2 1/6] hash: make duplicated code into functions Yipeng Wang
2018-07-06 10:04     ` De Lara Guarch, Pablo
2018-06-29 12:24   ` [PATCH v2 2/6] hash: add read and write concurrency support Yipeng Wang
2018-07-06 17:11     ` De Lara Guarch, Pablo
2018-06-29 12:24   ` [PATCH v2 3/6] test: add tests in hash table perf test Yipeng Wang
2018-07-06 17:17     ` De Lara Guarch, Pablo
2018-06-29 12:24   ` [PATCH v2 4/6] test: add test case for read write concurrency Yipeng Wang
2018-07-06 17:31     ` De Lara Guarch, Pablo
2018-06-29 12:24   ` [PATCH v2 5/6] hash: fix to have more accurate key slot size Yipeng Wang
2018-07-06 17:32     ` De Lara Guarch, Pablo
2018-06-29 12:24   ` [PATCH v2 6/6] hash: add new API function to query the key count Yipeng Wang
2018-07-06 17:36     ` De Lara Guarch, Pablo
2018-07-06 19:46 ` [PATCH v3 0/8] Add read-write concurrency to rte_hash library Yipeng Wang
2018-07-06 19:46   ` [PATCH v3 1/8] hash: fix multiwriter lock memory allocation Yipeng Wang
2018-07-09 11:26     ` De Lara Guarch, Pablo
2018-07-06 19:46   ` [PATCH v3 2/8] hash: fix a multi-writer bug Yipeng Wang
2018-07-09 14:16     ` De Lara Guarch, Pablo
2018-07-06 19:46   ` [PATCH v3 3/8] hash: fix to have more accurate key slot size Yipeng Wang
2018-07-09 14:20     ` De Lara Guarch, Pablo
2018-07-06 19:46   ` [PATCH v3 4/8] hash: make duplicated code into functions Yipeng Wang
2018-07-09 14:25     ` De Lara Guarch, Pablo
2018-07-06 19:46   ` [PATCH v3 5/8] hash: add read and write concurrency support Yipeng Wang
2018-07-09 14:28     ` De Lara Guarch, Pablo
2018-07-06 19:46   ` [PATCH v3 6/8] test: add tests in hash table perf test Yipeng Wang
2018-07-09 15:33     ` De Lara Guarch, Pablo
2018-07-06 19:46   ` [PATCH v3 7/8] test: add test case for read write concurrency Yipeng Wang
2018-07-09 16:24     ` De Lara Guarch, Pablo
2018-07-06 19:46   ` [PATCH v3 8/8] hash: add new API function to query the key count Yipeng Wang
2018-07-09 16:22     ` De Lara Guarch, Pablo
2018-07-09 10:44 ` [PATCH v4 0/8] Add read-write concurrency to rte_hash library Yipeng Wang
2018-07-09 10:44   ` [PATCH v4 1/8] hash: fix multiwriter lock memory allocation Yipeng Wang
2018-07-09 10:44   ` [PATCH v4 2/8] hash: fix a multi-writer race condition Yipeng Wang
2018-07-09 10:44   ` [PATCH v4 3/8] hash: fix key slot size accuracy Yipeng Wang
2018-07-09 10:44   ` [PATCH v4 4/8] hash: make duplicated code into functions Yipeng Wang
2018-07-09 10:45   ` Yipeng Wang [this message]
2018-07-09 10:45   ` [PATCH v4 6/8] test: add tests in hash table perf test Yipeng Wang
2018-07-09 10:45   ` [PATCH v4 7/8] test: add test case for read write concurrency Yipeng Wang
2018-07-09 10:45   ` [PATCH v4 8/8] hash: add new API function to query the key count Yipeng Wang
2018-07-10 18:00   ` [PATCH v4 0/8] Add read-write concurrency to rte_hash library Honnappa Nagarahalli
2018-07-12  1:31     ` Wang, Yipeng1
2018-07-12  2:36       ` Honnappa Nagarahalli
2018-07-13  1:47         ` Wang, Yipeng1
2018-07-10 16:59 ` [PATCH v5 " Yipeng Wang
2018-07-10 16:59   ` [PATCH v5 1/8] hash: fix multiwriter lock memory allocation Yipeng Wang
2018-07-10 16:59   ` [PATCH v5 2/8] hash: fix a multi-writer race condition Yipeng Wang
2018-07-10 16:59   ` [PATCH v5 3/8] hash: fix key slot size accuracy Yipeng Wang
2018-07-10 16:59   ` [PATCH v5 4/8] hash: make duplicated code into functions Yipeng Wang
2018-07-10 16:59   ` [PATCH v5 5/8] hash: add read and write concurrency support Yipeng Wang
2018-07-11 20:49     ` Stephen Hemminger
2018-07-12  1:22       ` Wang, Yipeng1
2018-07-12 20:30         ` Thomas Monjalon
2018-07-13  1:55           ` Wang, Yipeng1
2018-08-17 12:51             ` ASM
2018-07-10 16:59   ` [PATCH v5 6/8] test: add tests in hash table perf test Yipeng Wang
2018-07-10 17:00   ` [PATCH v5 7/8] test: add test case for read write concurrency Yipeng Wang
2018-07-10 17:00   ` [PATCH v5 8/8] hash: add new API function to query the key count Yipeng Wang
2018-07-12 21:03   ` [PATCH v5 0/8] Add read-write concurrency to rte_hash library Thomas Monjalon

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1531133103-437316-6-git-send-email-yipeng1.wang@intel.com \
    --to=yipeng1.wang@intel.com \
    --cc=brijesh.s.singh@gmail.com \
    --cc=bruce.richardson@intel.com \
    --cc=dev@dpdk.org \
    --cc=honnappa.nagarahalli@arm.com \
    --cc=pablo.de.lara.guarch@intel.com \
    --cc=vguvva@caviumnetworks.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.