* [PATCH RFC 0/2] rcu skiplists v2
From: Chris Mason @ 2013-06-16 14:56 UTC
To: Mathieu Desnoyers
Cc: Linux FS Devel, David Woodhouse, dchinner@redhat.com,
bo.li.liu@oracle.com, rp@svcs.cs.pdx.edu, Paul E. McKenney,
Lai Jiangshan, Stephen Hemminger, Alan Stern
Hi everyone,
This is another iteration of my skiplists patch, with some bugs fixed
and a few missing parts of the API filled in.  The biggest change is in the
insertion code, where I now link from the bottom up once I find the
proper insertion point. This makes it possible to write custom
insertion routines that allow duplicate keys.
It also means insertion doesn't have to lock and track all the nodes
from the top as it searches. In the likely event that we're able to
insert into a free spot in an existing leaf, it only needs to take one
lock.
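As a rough sketch of what that looks like from the caller's side (the
struct, the GFP flags and the error handling here are mine, only the
skiplist_* calls come from patch 1/2; note that skiplist_preload()
returns with preemption disabled, so the caller re-enables it after the
insert):

#include <linux/skiplist.h>
#include <linux/slab.h>

struct my_extent {
	unsigned long flags;		/* caller's own data */
	struct sl_slot slot;		/* embedded index entry */
};

static int my_insert(struct sl_list *list, struct my_extent *ext,
		     unsigned long key, unsigned long size)
{
	int token;
	int ret;

	ext->slot.key = key;
	ext->slot.size = size;
	atomic_set(&ext->slot.refs, 0);	/* insert takes the first ref */

	token = skiplist_preload(list, GFP_NOFS);
	if (token < 0)
		return token;

	/* common case: one node lock, leaf splits are handled internally */
	ret = skiplist_insert(list, &ext->slot, token, NULL);
	preempt_enable();
	return ret;
}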
The IOMMU part of the patch is updated slightly, but still not using all
the new bits in the API. This is mostly because the IOMMU part is going
to change later on and I'm really only using it for testing now.
For benchmarking, the IOMMU numbers are slightly higher than last time,
but not more than 5% or so. This is because the bottleneck is still
skiplist_insert_hole(), which I haven't really changed in this round.
Most of my benchmarking now is with the skiplist_test module, which has
expanded to a few new cases.  For random operations, the skiplist is
slightly slower than rbtree when single threaded.
Random lookups on 10 million items: (inserts are similar)
rbtree random lookup time 4 s 327 ms
skiplist-rcu random lookup time 5 s 175 ms
The new API makes it easier to do sequential operations (a rough sketch
of the leaf walk follows the timings below).  Here is a walk through the
10 million item list:
skiplist-rcu check time 0 s 79 ms
rbtree check time 0 s 295 ms
And sequential insert:
skiplist-rcu fill time 1 s 599 ms
rbtree fill time 1 s 875 ms
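The walk itself boils down to a locked leaf-by-leaf scan.  Roughly (my
sketch, not the test module verbatim; per the comments in patch 1/2,
skiplist_first_leaf() and skiplist_next_leaf() hand the leaf back locked
with a reference held):

static unsigned long my_count_items(struct sl_list *list)
{
	struct sl_leaf *leaf;
	struct sl_leaf *next;
	unsigned long count = 0;

	leaf = skiplist_first_leaf(list);
	while (leaf) {
		/* nr and the key/ptr arrays are stable while we hold the lock */
		count += leaf->nr;
		next = skiplist_next_leaf(list, leaf);
		skiplist_unlock_leaf(leaf);
		skiplist_put_leaf(leaf);
		leaf = next;
	}
	return count;
}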
The benchmark does random lookup/deletion/insertion rounds. Most of the
operations done are either deletion or insertion, so I'm not skewing the
results with the easy rcu lookup operation.
Single threaded, rbtree consistently wins across all sizes of lists:
skiplist-rcu thread 0 time 0 s 287 ms
rbtree thread 0 time 0 s 215 ms
But once we go up to two threads, skiplists win:
skiplist-rcu thread 1 time 0 s 299 ms
rbtree thread 1 time 0 s 499 ms
At 8 threads, the skiplists don't get linear scaling, but it's really
pretty good. Since I'm doing random operations on a wide key space,
the locking skiplist variant is also fast:
skiplist-rcu thread 7 time 0 s 379 ms
skiplist-locking thread 7 time 0 s 425 ms
rbtree thread 7 time 3 s 711 ms
My test machine is 8 cores, so at 16 threads we're into HT:
skiplist-rcu thread 15 time 0 s 583 ms
skiplist-locking thread 15 time 0 s 559 ms
rbtree thread 15 time 7 s 423 ms
It's not all sunshine though.  If I shrink the keyspace down to 1000
keys or less, there is more contention on the node locks and we're tied
with (or slightly worse than) rbtree.  In that kind of workload it makes
sense to add a big fat lock around the skiplist, or just stick with
rbtrees.
The skiplists do have locking and rcu variants of lookup operations.
The locking ones are built on top of the rcu ones, and you can use them
both at the same time.
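To make the difference concrete, here's roughly how the two lookup
flavors get used (my sketch, reusing the my_extent struct from above;
per the comments in patch 1/2, the locking lookup takes a slot reference
for the caller, while the rcu lookup returns a bare pointer the caller
must verify):

static struct my_extent *my_lookup(struct sl_list *list,
				   unsigned long key, unsigned long size)
{
	/*
	 * skiplist_lookup() already grabbed a slot reference for us;
	 * the caller drops it when it is done with the object
	 */
	struct sl_slot *slot = skiplist_lookup(list, key, size);

	if (!slot)
		return NULL;
	return sl_slot_entry(slot, struct my_extent, slot);
}

static struct my_extent *my_lookup_rcu(struct sl_list *list,
				       unsigned long key, unsigned long size)
{
	struct my_extent *ext = NULL;
	struct sl_slot *slot;

	rcu_read_lock();
	slot = skiplist_lookup_rcu(list, key, size);
	/*
	 * per the comments in the patch: if the key doesn't match, an
	 * insert is juggling slots and the lookup should be repeated.
	 * rcu lookups take no references, so pin the object ourselves.
	 */
	if (slot && slot->key == key && atomic_inc_not_zero(&slot->refs))
		ext = sl_slot_entry(slot, struct my_extent, slot);
	rcu_read_unlock();
	return ext;
}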
Patches follow.
-chris
* [PATCH RFC 1/2] Skiplist: rcu range index
From: Chris Mason @ 2013-06-16 14:57 UTC
To: Chris Mason, Mathieu Desnoyers
Cc: Linux FS Devel, David Woodhouse, dchinner@redhat.com,
bo.li.liu@oracle.com, rp@svcs.cs.pdx.edu, Paul E. McKenney,
Lai Jiangshan, Stephen Hemminger, Alan Stern
Signed-off-by: Chris Mason <chris.mason@fusionio.com>
---
include/linux/skiplist.h | 235 ++++++
init/main.c | 5 +
lib/Kconfig | 14 +
lib/Makefile | 3 +
lib/skiplist.c | 2106 ++++++++++++++++++++++++++++++++++++++++++++++
lib/skiplist_test.c | 882 +++++++++++++++++++
6 files changed, 3245 insertions(+)
create mode 100644 include/linux/skiplist.h
create mode 100644 lib/skiplist.c
create mode 100644 lib/skiplist_test.c
diff --git a/include/linux/skiplist.h b/include/linux/skiplist.h
new file mode 100644
index 0000000..04cfe5e
--- /dev/null
+++ b/include/linux/skiplist.h
@@ -0,0 +1,235 @@
+/*
+ * (C) 2011 Liu Bo <liubo2009@cn.fujitsu.com>
+ * (C) 2013 Fusion-io
+ *
+ * This program is free software; you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software
+ * Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 59 Temple
+ * Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+#ifndef _SKIPLIST_H
+#define _SKIPLIST_H
+
+#include <linux/spinlock.h>
+
+/*
+ * This includes a basic skiplist implementation and builds a more
+ * cache friendly variant on top meant to index ranges of keys.
+ *
+ * our random level generation uses P = 0.5, so as a rough metric the
+ * list performs well for up to 2^MAXLEVEL nodes.  Since each leaf holds
+ * an array of keys, that works out to 2^MAXLEVEL * SKIP_KEYS_PER_NODE items.
+ */
+#define SKIP_MAXLEVEL 32 /* skiplist_get_new_level requires <= 32 */
+
+struct sl_node_ptr {
+ struct sl_node *prev;
+ struct sl_node *next;
+};
+
+/* possible values for sl_node->pending */
+#define SKIPLIST_LIVE 0 /* normal operations */
+#define SKIPLIST_PENDING_DEAD 1 /* active unlink in progress */
+#define SKIPLIST_PENDING_INSERT 2 /* active link in progress */
+
+/*
+ * sl_node must be last in the leaf data struct. Allocate enough
+ * ram for a given size using either the sl_ptrs_size or sl_node_size
+ * helpers.
+ */
+struct sl_node {
+ /* level tells us how big the ptrs array is. It cannot change */
+ unsigned long level;
+
+ /* pending is used to indicate a delete or link in progress */
+ unsigned long pending;
+
+ spinlock_t lock;
+
+ /*
+ * ptrs must be at the bottom because it is variably sized
+ * based on the level
+ */
+ struct sl_node_ptr ptrs[];
+};
+
+/*
+ * The head list structure. The head node has no data,
+ * but it does have the full array of pointers for all the levels.
+ */
+struct sl_list {
+ /*
+ * in the head pointer, we use head->prev to point to
+ * the highest item in the list. But, the last item does
+ * not point back to the head. The head->prev items
+ * are protected by the node lock on the last item
+ */
+ struct sl_node *head;
+ unsigned int level;
+};
+
+/*
+ * If you are indexing extents, embed sl_slots into your structure and use
+ * sl_slot_entry to pull out your real struct. The key and size must not
+ * change if you're using rcu.
+ */
+struct sl_slot {
+ /*
+ * when rcu is on, we use this key to verify the pointer we pull
+ * out of the array. It must not change once the object is
+ * inserted
+ */
+ unsigned long key;
+
+ /*
+ * the range searching functions follow pointers into this slot
+ * struct and use this size field to find out how big the
+ * range is.
+ */
+ unsigned long size;
+
+ /*
+ * a ref is taken when the slot is inserted, or
+ * during non-rcu lookup. It's
+ * the caller's job to drop the refs on deletion
+ * and rcu lookup.
+ */
+ atomic_t refs;
+};
+
+/*
+ * Larger values here make us faster when single threaded. Lower values
+ * increase cache misses but give more chances for concurrency.
+ */
+#define SKIP_KEYS_PER_NODE 32
+
+/*
+ * For indexing extents, this is a leaf in our skip list tree.
+ * Each leaf has a number of pointers and the max field
+ * is used to figure out the key space covered.
+ */
+struct sl_leaf {
+ /* number of valid keys/ptrs in this leaf */
+ int nr;
+
+ /* reference count for this leaf */
+ atomic_t refs;
+
+ /*
+ * max value of the range covered by this leaf. This
+ * includes the size field of the very last extent,
+ * so max = keys[last_index] + ptrs[last_index]->size
+ */
+ unsigned long max;
+
+ /*
+ * sorted, all the keys
+ */
+ unsigned long keys[SKIP_KEYS_PER_NODE];
+
+ /*
+ * data pointers corresponding to the keys
+ */
+ struct sl_slot *ptrs[SKIP_KEYS_PER_NODE];
+
+ /* for freeing our objects after the grace period */
+ struct rcu_head rcu_head;
+
+ /* this needs to be at the end. The size changes based on the level */
+ struct sl_node node;
+};
+
+/*
+ * for a given level, how much memory we need for an array of
+ * all the pointers
+ */
+static inline int sl_ptrs_size(int level)
+{
+ return sizeof(struct sl_node_ptr) * (level + 1);
+}
+
+/*
+ * for a given level, how much memory we need for the
+ * array of pointers and the sl_node struct
+ */
+static inline int sl_node_size(int level)
+{
+ return sizeof(struct sl_node) + sl_ptrs_size(level);
+}
+
+static inline int sl_leaf_size(int level)
+{
+ return sizeof(struct sl_leaf) + sl_ptrs_size(level);
+}
+
+#define sl_entry(ptr) container_of((ptr), struct sl_leaf, node)
+#define sl_slot_entry(ptr, type, member) container_of(ptr, type, member)
+
+static inline int sl_empty(const struct sl_node *head)
+{
+ return head->ptrs[0].next == NULL;
+}
+
+static inline int sl_node_dead(struct sl_node *node)
+{
+ return node->pending == SKIPLIST_PENDING_DEAD;
+}
+
+static inline int sl_node_inserting(struct sl_node *node)
+{
+ return node->pending == SKIPLIST_PENDING_INSERT;
+}
+
+int skiplist_preload(struct sl_list *list, gfp_t gfp_mask);
+int skiplist_get_new_level(struct sl_list *list, int max_level);
+int skiplist_insert(struct sl_list *list, struct sl_slot *slot,
+ int preload_token, struct sl_leaf **cache);
+int sl_init_list(struct sl_list *list, gfp_t mask);
+struct sl_slot *skiplist_lookup(struct sl_list *list, unsigned long key, unsigned long size);
+struct sl_slot *skiplist_lookup_rcu(struct sl_list *list, unsigned long key, unsigned long size);
+struct sl_slot *skiplist_delete(struct sl_list *list, unsigned long key, unsigned long size);
+int skiplist_insert_hole(struct sl_list *list, unsigned long hint,
+ unsigned long limit,
+ unsigned long size, unsigned long align,
+ struct sl_slot *slot,
+ gfp_t gfp_mask);
+void sl_lock_node(struct sl_node *n);
+void sl_unlock_node(struct sl_node *n);
+unsigned long sl_highest_key(struct sl_list *list);
+int skiplist_search_leaf(struct sl_leaf *leaf, unsigned long key,
+ unsigned long size, int *slot);
+struct sl_leaf *skiplist_lookup_leaf(struct sl_list *list,
+ unsigned long key,
+ unsigned long size);
+struct sl_leaf *skiplist_lookup_first_leaf(struct sl_list *list,
+ unsigned long key,
+ unsigned long size);
+struct sl_leaf *skiplist_lookup_leaf_rcu(struct sl_list *list,
+ unsigned long key,
+ unsigned long size);
+struct sl_leaf *skiplist_lookup_first_leaf_rcu(struct sl_list *list,
+ unsigned long key,
+ unsigned long size);
+void skiplist_unlock_leaf(struct sl_leaf *leaf);
+void skiplist_lock_leaf(struct sl_leaf *leaf);
+void skiplist_delete_leaf(struct sl_list *list, struct sl_leaf *leaf,
+ struct sl_leaf **cache_next);
+struct sl_leaf *skiplist_next_leaf(struct sl_list *list,
+ struct sl_leaf *leaf);
+struct sl_leaf *skiplist_next_leaf_rcu(struct sl_list *list,
+ struct sl_leaf *leaf);
+struct sl_leaf *skiplist_first_leaf(struct sl_list *list);
+struct sl_leaf *skiplist_first_leaf_rcu(struct sl_list *list);
+void skiplist_get_leaf(struct sl_leaf *leaf);
+int skiplist_get_leaf_not_zero(struct sl_leaf *leaf);
+void skiplist_put_leaf(struct sl_leaf *leaf);
+void skiplist_wait_pending_insert(struct sl_node *node);
+#endif /* _SKIPLIST_H */
diff --git a/init/main.c b/init/main.c
index 63534a1..ad9b631 100644
--- a/init/main.c
+++ b/init/main.c
@@ -90,6 +90,7 @@ extern void fork_init(unsigned long);
extern void mca_init(void);
extern void sbus_init(void);
extern void radix_tree_init(void);
+extern void skiplist_init(void);
#ifndef CONFIG_DEBUG_RODATA
static inline void mark_rodata_ro(void) { }
#endif
@@ -566,6 +567,10 @@ asmlinkage void __init start_kernel(void)
kmem_cache_init_late();
+#ifdef CONFIG_SKIPLIST
+ skiplist_init();
+#endif
+
/*
* HACK ALERT! This is early. We're enabling the console before
* we've done PCI setups etc, and console_init() must be aware of
diff --git a/lib/Kconfig b/lib/Kconfig
index fe01d41..94bb235 100644
--- a/lib/Kconfig
+++ b/lib/Kconfig
@@ -393,6 +393,20 @@ config SIGNATURE
Digital signature verification. Currently only RSA is supported.
Implementation is done using GnuPG MPI library
+config SKIPLIST
+ bool "Concurrent Skiplists"
+ default n
+ help
+ Concurrent skiplist indexing trees. Just say N
+
+config SKIPLIST_TEST
+ tristate "Skiplist testing module"
+ default n
+ select SKIPLIST
+ help
+ Testing module for the skiplist implementation
+
+#
#
# libfdt files, only selected if needed.
#
diff --git a/lib/Makefile b/lib/Makefile
index 6e2cc56..7c14a12 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -108,6 +108,9 @@ obj-$(CONFIG_NLATTR) += nlattr.o
obj-$(CONFIG_LRU_CACHE) += lru_cache.o
+obj-$(CONFIG_SKIPLIST) += skiplist.o
+obj-$(CONFIG_SKIPLIST_TEST) += skiplist_test.o
+
obj-$(CONFIG_DMA_API_DEBUG) += dma-debug.o
obj-$(CONFIG_GENERIC_CSUM) += checksum.o
diff --git a/lib/skiplist.c b/lib/skiplist.c
new file mode 100644
index 0000000..02cf04e
--- /dev/null
+++ b/lib/skiplist.c
@@ -0,0 +1,2106 @@
+/*
+ * (C) 2011 Liu Bo <liubo2009@cn.fujitsu.com>
+ * (C) 2013 Fusion-io
+ *
+ * This program is free software; you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software
+ * Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 59 Temple
+ * Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <linux/errno.h>
+#include <linux/init.h>
+#include <linux/kernel.h>
+#include <linux/export.h>
+#include <linux/percpu.h>
+#include <linux/slab.h>
+#include <linux/notifier.h>
+#include <linux/cpu.h>
+#include <linux/string.h>
+#include <linux/rcupdate.h>
+#include <linux/random.h>
+#include <linux/skiplist.h>
+#include <linux/lockdep.h>
+#include <linux/sched.h>
+
+static struct kmem_cache *slab_caches[SKIP_MAXLEVEL];
+
+/*
+ * before starting an insert, we preload based on the current
+ * height of the list. This holds the preload result
+ * in a per-cpu array
+ */
+struct skip_preload {
+ /*
+ * the preload is filled in based on the highest possible level
+ * of the list you're preloading. So we basically end up with
+ * one preloaded node for each max size.
+ */
+ struct sl_leaf *preload[SKIP_MAXLEVEL + 1];
+};
+
+static DEFINE_PER_CPU(struct skip_preload, skip_preloads) = { {NULL,}, };
+
+static void sl_init_node(struct sl_node *node, int level)
+{
+ int i;
+ spin_lock_init(&node->lock);
+
+ /*
+ * from a locking point of view, the skiplists are a dumb linked
+ * list where we take up to three locks in order from left to
+ * right. I haven't been able to teach lockdep how to do this
+ * yet
+ */
+ lockdep_set_novalidate_class(&node->lock);
+
+ for (i = 0; i <= level; i++) {
+ node->ptrs[i].prev = NULL;
+ node->ptrs[i].next = NULL;
+ }
+ node->level = level;
+ node->pending = SKIPLIST_LIVE;
+}
+
+static void sl_init_leaf(struct sl_leaf *leaf, int level)
+{
+ atomic_set(&leaf->refs, 1);
+ sl_init_node(&leaf->node, level);
+}
+
+/*
+ * the rcu based searches need to block reuse until a given search round
+ * is done. So, we use call_rcu for freeing the leaf structure.
+ */
+static void sl_free_rcu(struct rcu_head *head)
+{
+ struct sl_leaf *leaf = container_of(head, struct sl_leaf, rcu_head);
+ kmem_cache_free(slab_caches[leaf->node.level], leaf);
+}
+
+void skiplist_get_leaf(struct sl_leaf *leaf)
+{
+ atomic_inc(&leaf->refs);
+}
+EXPORT_SYMBOL(skiplist_get_leaf);
+
+int skiplist_get_leaf_not_zero(struct sl_leaf *leaf)
+{
+ return atomic_inc_not_zero(&leaf->refs);
+}
+EXPORT_SYMBOL(skiplist_get_leaf_not_zero);
+
+void skiplist_put_leaf(struct sl_leaf *leaf)
+{
+ BUG_ON(atomic_read(&leaf->refs) == 0);
+ if (atomic_dec_and_test(&leaf->refs))
+ call_rcu(&leaf->rcu_head, sl_free_rcu);
+}
+EXPORT_SYMBOL(skiplist_put_leaf);
+
+/*
+ * if a node is currently inserting, this spins until the
+ * insertion is complete. Calling this with another node
+ * locked usually leads to deadlocks because the linking
+ * needs to take neighboring locks.
+ */
+void skiplist_wait_pending_insert(struct sl_node *node)
+{
+ while (sl_node_inserting(node)) {
+ cpu_relax();
+ smp_rmb();
+ }
+}
+EXPORT_SYMBOL(skiplist_wait_pending_insert);
+
+/*
+ * helper function to pull out the next live leaf at a given level.
+ * no locks are required or taken
+ *
+ * Note that if this returns NULL, you may have to wait for a pending
+ * insertion on 'node' before you can trust the answer
+ */
+static inline struct sl_leaf *sl_next_leaf(struct sl_list *list,
+ struct sl_node *node, int l)
+{
+ do {
+ node = rcu_dereference(node->ptrs[l].next);
+ if (!node)
+ return NULL;
+ if (!sl_node_dead(node))
+ return sl_entry(node);
+ } while (1);
+}
+
+/*
+ * helper functions to wade through dead nodes pending deletion
+ * and return live ones. This does wait on pending insertions
+ * as it walks backwards.
+ */
+static struct sl_node *find_live_prev(struct sl_list *list,
+ struct sl_node *node, int level)
+{
+ struct sl_node *prev = NULL;
+ /* head->prev points to the max, this makes sure we don't loop */
+ if (node == list->head)
+ return NULL;
+
+ while (node) {
+ prev = rcu_dereference(node->ptrs[level].prev);
+ if (!prev) {
+ skiplist_wait_pending_insert(node);
+ prev = rcu_dereference(node->ptrs[level].prev);
+ }
+ node = prev;
+ /*
+ * the head is never dead, so we'll never walk past
+ * it down in this loop
+ */
+ if (!sl_node_dead(prev))
+ break;
+ }
+
+ return node;
+}
+
+/*
+ * walks forward to find a live next pointer. This does
+ * not wait on pending insertions because it would deadlock
+ * the callers
+ */
+static struct sl_node *find_live_next(struct sl_list *list,
+ struct sl_node *node, int level)
+{
+ while (1) {
+ node = rcu_dereference(node->ptrs[level].next);
+ if (!node)
+ return NULL;
+ if (!sl_node_dead(node))
+ return node;
+ }
+}
+
+/*
+ * return the highest value for a given leaf. This is cached
+ * in leaf->max so that we don't have to wander into
+ * the slot pointers. The max is equal to the key + size of the
+ * last slot.
+ */
+static unsigned long sl_max_key(struct sl_leaf *leaf)
+{
+ smp_rmb();
+ return leaf->max;
+}
+
+/*
+ * return the lowest key for a given leaf. This comes out
+ * of the node key array and not the slots
+ */
+static unsigned long sl_min_key(struct sl_leaf *leaf)
+{
+ smp_rmb();
+ return leaf->keys[0];
+}
+
+void sl_lock_node(struct sl_node *n)
+{
+ spin_lock(&n->lock);
+}
+
+void sl_unlock_node(struct sl_node *n)
+{
+ if (n)
+ spin_unlock(&n->lock);
+}
+
+void skiplist_lock_leaf(struct sl_leaf *leaf)
+{
+ sl_lock_node(&leaf->node);
+}
+EXPORT_SYMBOL(skiplist_lock_leaf);
+
+void skiplist_unlock_leaf(struct sl_leaf *leaf)
+{
+ if (leaf)
+ sl_unlock_node(&leaf->node);
+}
+EXPORT_SYMBOL(skiplist_unlock_leaf);
+
+void assert_locked_node(struct sl_node *n)
+{
+ assert_spin_locked(&n->lock);
+}
+
+/*
+ * helper function to link a single level during an insert.
+ * prev must be locked, and it is the node we are linking after.
+ *
+ * This will find a live next pointer, lock it, and link it
+ * with our new node
+ */
+static void sl_link_one_level(struct sl_list *list,
+ struct sl_node *prev,
+ struct sl_node *node, int level)
+{
+ struct sl_node *next;
+ struct sl_node *test;
+
+ assert_locked_node(prev);
+ BUG_ON(sl_node_dead(prev));
+
+again:
+ next = find_live_next(list, prev, level);
+ if (next) {
+ /*
+ * next may be pending insertion at this point
+ * but it was linked far enough up for prev to
+ * point to it. So we can safely just use it
+ */
+ sl_lock_node(next);
+ test = find_live_next(list, prev, level);
+ if (test != next || sl_node_dead(next)) {
+ sl_unlock_node(next);
+ goto again;
+ }
+ /*
+ * make sure our next and prev really point to each
+ * other now that we have next locked.
+ */
+ if (find_live_prev(list, next, level) != prev) {
+ sl_unlock_node(next);
+ goto again;
+ }
+ }
+
+ rcu_assign_pointer(node->ptrs[level].next, next);
+ rcu_assign_pointer(node->ptrs[level].prev, prev);
+ rcu_assign_pointer(prev->ptrs[level].next, node);
+
+ /*
+ * if next is null, we're the last node on this level.
+ * The head->prev pointer is used to cache this fact
+ */
+ if (next)
+ rcu_assign_pointer(next->ptrs[level].prev, node);
+ else
+ rcu_assign_pointer(list->head->ptrs[level].prev, node);
+
+ sl_unlock_node(next);
+}
+
+/*
+ * for everything above level 0 in a new leaf, we link from the bottom
+ * up. This walks the prev pointers in the new leaf to find higher
+ * leaves to link with. The basic idea is to go down one level
+ * and walk backwards until you find a leaf of the right level.
+ *
+ * It looks like much more work than walking down from the top, but
+ * most of the leaves are at the lower levels. Walking from the top
+ * is sure to waste time going past leaves we never use.
+ */
+static noinline struct sl_node *find_higher_prev(struct sl_list *list,
+ struct sl_leaf *ins, int level)
+{
+ struct sl_node *cur;
+ struct sl_node *prev;
+ struct sl_leaf *leaf;
+ int search_level = level - 1;
+
+ BUG_ON(level == 0);
+
+ /*
+ * step one, walk backward on the lower level until
+ * we find a leaf of the proper height.
+ */
+ cur = &ins->node;
+ while (1) {
+ prev = find_live_prev(list, cur, search_level);
+ skiplist_wait_pending_insert(prev);
+ if (prev->level >= level)
+ break;
+ cur = prev;
+ }
+
+ /*
+ * now we have a node at the right level, but while we
+ * walked someone might have inserted leaves between
+ * prev and the insertion point. Walk forward to make
+ * sure we have best leaf for linking.
+ */
+ while (1) {
+ leaf = sl_next_leaf(list, prev, level);
+ if (!leaf || sl_max_key(ins) <= sl_min_key(leaf))
+ return prev;
+
+ prev = &leaf->node;
+ skiplist_wait_pending_insert(prev);
+ }
+}
+
+/*
+ * this does all of the hard work to finish linking a pending leaf.
+ * 'level' tells us how high the caller has already linked, and the
+ * caller must have at least linked level 0.
+ *
+ * leaf must be unlocked, nothing is locked when we return.
+ */
+static noinline void sl_link_pending_leaf(struct sl_list *list,
+ struct sl_leaf *leaf, int level)
+{
+ struct sl_node *prev;
+ struct sl_node *search_prev;
+ struct sl_node *next;
+ struct sl_leaf *next_leaf;
+ struct sl_node *node = &leaf->node;
+ int last_level = node->level;
+
+ /* did the caller do enough already? */
+ if (level > node->level)
+ return;
+
+ while (level <= last_level) {
+ if (node->ptrs[level].prev)
+ prev = find_live_prev(list, node, level);
+ else
+ prev = find_higher_prev(list, leaf, level);
+
+ skiplist_wait_pending_insert(prev);
+ sl_lock_node(prev);
+
+ /* don't link with the dead */
+ if (sl_node_dead(prev)) {
+ sl_unlock_node(prev);
+ continue;
+ }
+
+ /* lock ourselves */
+ sl_lock_node(node);
+
+ /*
+ * if prev and next are already set for this level,
+ * we're done.
+ */
+ if (node->ptrs[level].prev) {
+ if (node->ptrs[level].next) {
+ level++;
+ goto unlock_node;
+ }
+ /*
+ * if our node already has a prev pointer
+ * make sure the prev we found does point to us
+ */
+ if (find_live_next(list, prev, level) != node)
+ goto unlock_node;
+ }
+again:
+ /*
+ * if our node has a next pointer, use it. Otherwise
+ * use the next from prev
+ */
+ if (node->ptrs[level].next)
+ next = find_live_next(list, node, level);
+ else
+ next = find_live_next(list, prev, level);
+
+ /*
+ * if we followed prev, someone might have raced in
+ * and linked this level of the node. Make sure we
+ * don't deadlock by trying to lock ourselves twice.
+ */
+ if (next == node)
+ goto again;
+
+ if (next) {
+ /*
+ * someone may have raced in and inserted a
+ * node between prev and node. Since we
+ * have node locked, we have to make sure not
+ * to break locking rules if next is actually
+ * before node in the list.
+ */
+ next_leaf = sl_entry(next);
+ if (sl_min_key(next_leaf) < sl_max_key(leaf))
+ goto unlock_node;
+
+ /*
+ * we can't check next for pending here.
+ * we'd just end up deadlocked on him
+ * waiting for us to clear pending.
+ */
+ sl_lock_node(next);
+
+ /*
+ * make sure we've still got the right spot to
+ * link things in
+ */
+ if (sl_node_dead(next))
+ goto unlock_all;
+
+ if (sl_min_key(next_leaf) < sl_max_key(leaf))
+ goto unlock_all;
+
+ /*
+ * finally make sure next really points back to
+ * either prev or our node
+ */
+ search_prev = find_live_prev(list, next, level);
+ if (search_prev != node && search_prev != prev)
+ goto unlock_all;
+ }
+ rcu_assign_pointer(node->ptrs[level].next, next);
+ rcu_assign_pointer(node->ptrs[level].prev, prev);
+ rcu_assign_pointer(prev->ptrs[level].next, node);
+
+ /*
+ * if next is null, we're the last node on this level.
+ * The head->prev pointer is used to cache this fact
+ */
+ if (next)
+ rcu_assign_pointer(next->ptrs[level].prev, node);
+ else
+ rcu_assign_pointer(list->head->ptrs[level].prev, node);
+
+ level++;
+unlock_all:
+ sl_unlock_node(next);
+unlock_node:
+ sl_unlock_node(node);
+ sl_unlock_node(prev);
+ }
+}
+
+/*
+ * when we split a leaf to do an insert, this links our new leaf with the
+ * one we split. We'll use any pointers we can from 'after'
+ */
+static void __sl_link_after_node(struct sl_list *list, struct sl_node *node,
+ struct sl_node *after, int level)
+{
+ int i;
+
+ /* first use all the pointers from 'after' */
+ for (i = 0; i <= after->level && i <= level; i++)
+ sl_link_one_level(list, after, node, i);
+}
+
+/*
+ * final stage of linking, this is where we actually clear pending
+ */
+static void __link_pending_insert(struct sl_list *list, struct sl_node *node, int start_level)
+{
+ sl_link_pending_leaf(list, sl_entry(node), start_level);
+ smp_wmb();
+ node->pending = 0;
+}
+
+static void sl_link_after_node(struct sl_list *list, struct sl_node *node,
+ struct sl_node *after, int level)
+{
+ __sl_link_after_node(list, node, after, level);
+
+ sl_unlock_node(node);
+ sl_unlock_node(after);
+
+ __link_pending_insert(list, node, after->level + 1);
+}
+
+/*
+ * returns the first leaf in the list, locked with an
+ * extra reference added
+ */
+struct sl_leaf *skiplist_first_leaf(struct sl_list *list)
+{
+ struct sl_leaf *leaf = NULL;
+ struct sl_node *p;
+
+ rcu_read_lock();
+ while (1) {
+ p = find_live_next(list, list->head, 0);
+ if (!p)
+ break;
+
+ sl_lock_node(p);
+ if (!sl_node_dead(p) &&
+ find_live_next(list, list->head, 0) == p) {
+ leaf = sl_entry(p);
+ skiplist_get_leaf(leaf);
+ goto out;
+ }
+ sl_unlock_node(p);
+ }
+out:
+ rcu_read_unlock();
+ return leaf;
+}
+EXPORT_SYMBOL(skiplist_first_leaf);
+
+/*
+ * returns the first leaf in the list. No locks are held and
+ * no references are added. Must be called under rcu_read_lock()
+ */
+struct sl_leaf *skiplist_first_leaf_rcu(struct sl_list *list)
+{
+ struct sl_node *p;
+
+ p = find_live_next(list, list->head, 0);
+ if (p)
+ return sl_entry(p);
+ return NULL;
+
+}
+EXPORT_SYMBOL(skiplist_first_leaf_rcu);
+
+/*
+ * sequential search for lockless rcu. The insert/deletion routines
+ * order their operations to make this rcu safe.
+ *
+ * If we find the key, we return zero and set 'slot' to the location.
+ *
+ * If we don't find anything, we return 1 and set 'slot' to the location
+ * where the insertion should take place
+ *
+ * case1:
+ * [ key ... size ]
+ * [found .. found size ]
+ *
+ * case2:
+ * [key ... size ]
+ * [found .. found size ]
+ *
+ * case3:
+ * [key ... size ]
+ * [ found .. found size ]
+ *
+ * case4:
+ * [key ...size ]
+ * [ found ... found size ]
+ *
+ * case5:
+ * [key ...size ]
+ * [ found ... found size ]
+ */
+int skiplist_search_leaf(struct sl_leaf *leaf, unsigned long key,
+ unsigned long size, int *slot)
+{
+ int i;
+ int cur;
+ int last;
+ unsigned long this_key;
+ struct sl_slot *found;
+
+again:
+ cur = 0;
+ last = leaf->nr;
+
+ /* find the first slot greater than our key */
+ for (i = 0; i < last; i++) {
+ smp_rmb();
+ this_key = leaf->keys[i];
+ if (this_key >= key + size)
+ break;
+ cur = i;
+ }
+ if (leaf->keys[cur] < key + size) {
+ /*
+ * if we're in the middle of an insert, pointer may
+ * be null. This little loop will wait for the insertion
+ * to finish.
+ */
+ while (1) {
+ found = rcu_dereference(leaf->ptrs[cur]);
+ if (found)
+ break;
+ cpu_relax();
+ }
+
+ /* insert is juggling our slots, try again */
+ if (found->key != leaf->keys[cur])
+ goto again;
+
+ /* case1, case2, case5 */
+ if (found->key < key + size && found->key + found->size > key) {
+ *slot = cur;
+ return 0;
+ }
+
+ /* case3, case4 */
+ if (found->key < key + size && found->key >= key) {
+ *slot = cur;
+ return 0;
+ }
+
+ *slot = cur + 1;
+ return 1;
+ }
+ *slot = cur;
+ return 1;
+}
+EXPORT_SYMBOL(skiplist_search_leaf);
+
+/*
+ * helper to put a cached leaf and zero out the
+ * pointer
+ */
+static void invalidate_cache(struct sl_leaf **cache)
+{
+ if (cache) {
+ skiplist_put_leaf(*cache);
+ *cache = NULL;
+ }
+}
+
+/*
+ * helper to grab a reference on a cached leaf.
+ */
+static void cache_leaf(struct sl_leaf *leaf, struct sl_leaf **cache)
+{
+ assert_locked_node(&leaf->node);
+ BUG_ON(sl_node_dead(&leaf->node));
+ if (cache && *cache != leaf) {
+ if (*cache)
+ skiplist_put_leaf(*cache);
+ skiplist_get_leaf(leaf);
+ *cache = leaf;
+ }
+}
+
+/*
+ * this does the dirty work of splitting and/or shifting a leaf
+ * to get a new slot inside. The leaf must be locked. slot
+ * tells us where into we should insert in the leaf and the cursor
+ * should have all the pointers we need to fully link any new nodes
+ * we have to create.
+ */
+static noinline int add_key_to_leaf(struct sl_list *list, struct sl_leaf *leaf,
+ struct sl_slot *slot_ptr, unsigned long key,
+ int slot, int preload_token, struct sl_leaf **cache)
+{
+ struct sl_leaf *split;
+ struct skip_preload *skp;
+ int level;
+ int finish_split_insert = 0;
+
+ /* no splitting required, just shift our way in */
+ if (leaf->nr < SKIP_KEYS_PER_NODE)
+ goto insert;
+
+ skp = &__get_cpu_var(skip_preloads);
+ split = skp->preload[preload_token];
+
+ /*
+ * we need to insert a new leaf, but we try not to insert a new leaf at
+ * the same height as our previous one, it's just a waste of high level
+ * searching. If the new node is the same level or lower than the
+ * existing one, try to use a level 0 leaf instead.
+ */
+ if (leaf->node.level > 0 && split->node.level <= leaf->node.level) {
+ if (skp->preload[0]) {
+ preload_token = 0;
+ split = skp->preload[0];
+ }
+ }
+ skp->preload[preload_token] = NULL;
+
+ level = split->node.level;
+
+ /*
+ * bump our list->level to whatever we've found. Nobody allocating
+ * a new node is going to set it higher than list->level + 1
+ */
+ if (level > list->level)
+ list->level = level;
+
+ /*
+ * this locking is really only required for the small window where
+ * we are linking the node and someone might be deleting one of the
+ * nodes we are linking with. The leaf passed in was already
+ * locked.
+ */
+ split->node.pending = SKIPLIST_PENDING_INSERT;
+ sl_lock_node(&split->node);
+
+ if (slot == leaf->nr) {
+ /*
+ * our new slot just goes at the front of the new leaf, don't
+ * bother shifting things in from the previous leaf.
+ */
+ slot = 0;
+ split->nr = 1;
+ split->max = key + slot_ptr->size;
+ split->keys[0] = key;
+ split->ptrs[0] = slot_ptr;
+ smp_wmb();
+ cache_leaf(split, cache);
+ sl_link_after_node(list, &split->node, &leaf->node, level);
+ return 0;
+ } else {
+ int nr = SKIP_KEYS_PER_NODE / 2;
+ int mid = SKIP_KEYS_PER_NODE - nr;
+ int src_i = mid;
+ int dst_i = 0;
+ int orig_nr = leaf->nr;
+
+ /* split the previous leaf in half and copy items over */
+ split->nr = nr;
+ split->max = leaf->max;
+
+ while (src_i < slot) {
+ split->keys[dst_i] = leaf->keys[src_i];
+ split->ptrs[dst_i++] = leaf->ptrs[src_i++];
+ }
+
+ if (slot >= mid) {
+ split->keys[dst_i] = key;
+ split->ptrs[dst_i++] = slot_ptr;
+ split->nr++;
+ }
+
+ while (src_i < orig_nr) {
+ split->keys[dst_i] = leaf->keys[src_i];
+ split->ptrs[dst_i++] = leaf->ptrs[src_i++];
+ }
+
+ /*
+ * this completes the initial link, but we still
+ * have the original node and split locked
+ */
+ __sl_link_after_node(list, &split->node, &leaf->node, level);
+
+ nr = SKIP_KEYS_PER_NODE - nr;
+
+ /*
+ * now that we have all the items copied and our new
+ * leaf inserted, update the nr in this leaf. Anyone
+ * searching in rculand will find the fully updated leaf
+ */
+ leaf->max = leaf->keys[nr - 1] + leaf->ptrs[nr - 1]->size;
+ smp_wmb();
+ leaf->nr = nr;
+
+ /*
+ * if the slot was in split item, we're done,
+ * otherwise we need to move down into the
+ * code below that shifts the items and
+ * inserts the new key
+ */
+ if (slot >= mid) {
+ cache_leaf(split, cache);
+ sl_unlock_node(&split->node);
+ sl_unlock_node(&leaf->node);
+
+ /* finish linking split all the way to the top */
+ __link_pending_insert(list, &split->node,
+ leaf->node.level + 1);
+ return 0;
+ }
+
+ /*
+ * unlock our split node and let the code below finish
+ * the key insertion. We'll finish inserting split
+ * after we unlock leaf
+ */
+ sl_unlock_node(&split->node);
+ finish_split_insert = 1;
+ }
+insert:
+ if (slot < leaf->nr) {
+ int i;
+
+ /*
+ * put something sane into the new last slot so rcu
+ * searchers won't get confused
+ */
+ leaf->keys[leaf->nr] = 0;
+ leaf->ptrs[leaf->nr] = NULL;
+ smp_wmb();
+
+ /* then bump the nr */
+ leaf->nr++;
+
+ /*
+ * now step through each pointer after our
+ * destination and bubble it forward. memcpy
+ * would be faster but rcu searchers will be
+ * able to validate pointers as they go with
+ * this method.
+ */
+ for (i = leaf->nr - 1; i > slot; i--) {
+ leaf->keys[i] = leaf->keys[i - 1];
+ leaf->ptrs[i] = leaf->ptrs[i - 1];
+ /*
+ * make sure the key/pointer pair is
+ * fully visible in the new home before
+ * we move forward
+ */
+ smp_wmb();
+ }
+
+ /* finally stuff in our key */
+ leaf->keys[slot] = key;
+ leaf->ptrs[slot] = slot_ptr;
+ smp_wmb();
+ } else {
+ /*
+ * just extending the leaf, toss
+ * our key in and update things
+ */
+ leaf->max = key + slot_ptr->size;
+ leaf->keys[slot] = key;
+ leaf->ptrs[slot] = slot_ptr;
+
+ smp_wmb();
+ leaf->nr++;
+ }
+ cache_leaf(leaf, cache);
+ sl_unlock_node(&leaf->node);
+ if (finish_split_insert)
+ __link_pending_insert(list, &split->node, leaf->node.level + 1);
+ return 0;
+}
+
+/*
+ * helper function for insert. This will either return an existing
+ * key or insert a new slot into the list. leaf must be locked.
+ */
+static noinline int find_or_add_key(struct sl_list *list, unsigned long key,
+ unsigned long size, struct sl_leaf *leaf,
+ struct sl_slot *slot_ptr,
+ int preload_token, struct sl_leaf **cache)
+{
+ int ret;
+ int slot;
+
+ if (key < leaf->max) {
+ ret = skiplist_search_leaf(leaf, key, size, &slot);
+ if (ret == 0) {
+ ret = -EEXIST;
+ cache_leaf(leaf, cache);
+ sl_unlock_node(&leaf->node);
+ goto out;
+ }
+ } else {
+ slot = leaf->nr;
+ }
+
+ atomic_inc(&slot_ptr->refs);
+
+ /* add_key_to_leaf unlocks the node */
+ add_key_to_leaf(list, leaf, slot_ptr, key, slot, preload_token, cache);
+ ret = 0;
+
+out:
+ return ret;
+}
+
+/*
+ * pull a new leaf out of the prealloc area, and insert the slot/key into it
+ */
+static struct sl_leaf *alloc_leaf(struct sl_slot *slot_ptr, unsigned long key,
+ int preload_token)
+{
+ struct sl_leaf *leaf;
+ struct skip_preload *skp;
+ int level;
+
+ skp = &__get_cpu_var(skip_preloads);
+ leaf = skp->preload[preload_token];
+ skp->preload[preload_token] = NULL;
+ level = leaf->node.level;
+
+ leaf->keys[0] = key;
+ leaf->ptrs[0] = slot_ptr;
+ leaf->nr = 1;
+ leaf->max = key + slot_ptr->size;
+ return leaf;
+}
+
+/*
+ * this returns with preempt disabled and the preallocation
+ * area setup for a new insert. To get there, it may or
+ * may not allocate a new leaf for the next insert.
+ *
+ * If allocations are done, this will also try to preallocate a level 0
+ * leaf, which allows us to optimize insertion by not placing two
+ * adjacent nodes together with the same level.
+ *
+ * This returns < 0 on errors. If everything works, it returns a preload
+ * token which you should use when fetching your preallocated items.
+ *
+ * The token allows us to preallocate based on the current
+ * highest level of the list. For a list of level N, we won't allocate
+ * higher than N + 1.
+ */
+int skiplist_preload(struct sl_list *list, gfp_t gfp_mask)
+{
+ struct skip_preload *skp;
+ struct sl_leaf *leaf;
+ struct sl_leaf *leaf0 = NULL;
+ int alloc_leaf0 = 0;
+ int level;
+ int max_level = min_t(int, list->level + 1, SKIP_MAXLEVEL - 1);
+ int token = max_level;
+
+ preempt_disable();
+ skp = &__get_cpu_var(skip_preloads);
+ if (max_level && !skp->preload[0])
+ alloc_leaf0 = 1;
+
+ if (skp->preload[max_level])
+ return token;
+
+ preempt_enable();
+ level = skiplist_get_new_level(list, max_level);
+ leaf = kmem_cache_alloc(slab_caches[level], gfp_mask);
+ if (leaf == NULL)
+ return -ENOMEM;
+
+ if (alloc_leaf0)
+ leaf0 = kmem_cache_alloc(slab_caches[0], gfp_mask);
+
+ preempt_disable();
+ skp = &__get_cpu_var(skip_preloads);
+
+ if (leaf0) {
+ sl_init_leaf(leaf0, 0);
+ if (skp->preload[0] == NULL) {
+ skp->preload[0] = leaf0;
+ } else {
+ skiplist_put_leaf(leaf0);
+ }
+ }
+
+ sl_init_leaf(leaf, level);
+ if (skp->preload[max_level]) {
+ skiplist_put_leaf(leaf);
+ return token;
+ }
+ skp->preload[max_level] = leaf;
+
+ return token;
+}
+EXPORT_SYMBOL(skiplist_preload);
+
+/*
+ * use the kernel prandom call to pick a new random level. This
+ * uses P = .50. If you bump the SKIP_MAXLEVEL past 32 levels,
+ * this function needs updating.
+ */
+int skiplist_get_new_level(struct sl_list *list, int max_level)
+{
+ int level = 0;
+ unsigned long randseed;
+
+ randseed = prandom_u32();
+
+ while (randseed && (randseed & 1)) {
+ randseed >>= 1;
+ level++;
+ if (level == max_level)
+ break;
+ }
+ return (level >= SKIP_MAXLEVEL ? SKIP_MAXLEVEL - 1: level);
+}
+EXPORT_SYMBOL(skiplist_get_new_level);
+
+/*
+ * just return the level of the leaf we're going to use
+ * for the next insert
+ */
+static int pending_insert_level(int preload_token)
+{
+ struct skip_preload *skp;
+ skp = &__get_cpu_var(skip_preloads);
+ return skp->preload[preload_token]->node.level;
+}
+
+/*
+ * after a lockless search, this makes sure a given key is still
+ * inside the min/max of a leaf. If not, you have to repeat the
+ * search and try again.
+ */
+static int verify_key_in_leaf(struct sl_leaf *leaf, unsigned long key,
+ unsigned long size)
+{
+ if (sl_node_dead(&leaf->node))
+ return 0;
+
+ if (key + size < sl_min_key(leaf) ||
+ key >= sl_max_key(leaf))
+ return 0;
+ return 1;
+}
+
+/*
+ * The insertion code tries to delay taking locks for as long as possible.
+ * Once we've found a good place to insert, we need to make sure the leaf
+ * we have picked is still a valid location.
+ *
+ * This checks the previous and next pointers to make sure everything is
+ * still correct for the insert. Pass in an unlocked leaf, and
+ * it will return 0 with the leaf locked if everything worked.
+ *
+ */
+static int verify_key_in_path(struct sl_list *list,
+ struct sl_node *node, unsigned long key,
+ unsigned long size)
+{
+ struct sl_leaf *prev = NULL;
+ struct sl_leaf *next;
+ struct sl_node *p;
+ struct sl_leaf *leaf = NULL;
+ struct sl_node *lock1;
+ struct sl_node *lock2;
+ struct sl_node *lock3;
+ int level = 0;
+ int ret = -EAGAIN;
+
+again:
+ lock1 = NULL;
+ lock2 = NULL;
+ lock3 = NULL;
+ if (node != list->head) {
+ p = rcu_dereference(node->ptrs[level].prev);
+ skiplist_wait_pending_insert(p);
+ sl_lock_node(p);
+ lock1 = p;
+
+ sl_lock_node(node);
+ lock2 = node;
+
+ if (sl_node_dead(node))
+ goto out;
+
+ if (p->ptrs[level].next != node) {
+ sl_unlock_node(lock1);
+ sl_unlock_node(lock2);
+ goto again;
+ }
+
+ if (sl_node_dead(p))
+ goto out;
+
+
+ if (p != list->head)
+ prev = sl_entry(p);
+
+ /*
+ * once we have the locks, make sure everyone
+ * still points to each other
+ */
+ if (node->ptrs[level].prev != p) {
+ sl_unlock_node(lock1);
+ sl_unlock_node(lock2);
+ goto again;
+ }
+
+ leaf = sl_entry(node);
+ } else {
+ sl_lock_node(node);
+ lock2 = node;
+ }
+
+ /*
+ * rule #1, the key must be greater than the max of the previous
+ * leaf
+ */
+ if (prev && key < sl_max_key(prev))
+ goto out;
+
+ /* we're done with prev, unlock it */
+ sl_unlock_node(lock1);
+ lock1 = NULL;
+
+ /* rule #2 is the key must be smaller than the min key
+ * in the next node. If the key is already smaller than
+ * the max key of this node, we know we're safe for rule #2
+ */
+ if (leaf && key < sl_max_key(leaf))
+ return 0;
+again_next:
+ p = node->ptrs[level].next;
+ if (p)
+ next = sl_entry(p);
+ else
+ next = NULL;
+
+ if (next) {
+ if (key >= sl_min_key(next))
+ goto out;
+
+ if (sl_node_inserting(&next->node)) {
+ sl_unlock_node(lock2);
+ skiplist_wait_pending_insert(&next->node);
+ goto again;
+ }
+ sl_lock_node(&next->node);
+ lock3 = &next->node;
+
+ if (sl_node_dead(&next->node)) {
+ sl_unlock_node(lock3);
+ sl_unlock_node(lock2);
+ goto again;
+ }
+
+ if (next->node.ptrs[level].prev != node) {
+ sl_unlock_node(lock3);
+ goto again_next;
+ }
+
+ /*
+ * rule #2 the key must be smaller than the min key
+ * in the next node
+ */
+ if (key >= sl_min_key(next))
+ goto out;
+
+ if (key + size > sl_min_key(next)) {
+ ret = -EEXIST;
+ goto out;
+ }
+ }
+ /*
+ * return with our leaf locked and sure that our leaf is the
+ * best place for this key
+ */
+ sl_unlock_node(lock1);
+ sl_unlock_node(lock3);
+ return 0;
+
+out:
+ sl_unlock_node(lock1);
+ sl_unlock_node(lock2);
+ sl_unlock_node(lock3);
+ return ret;
+}
+
+
+/*
+ * Before calling this you must have stocked the preload area by
+ * calling skiplist_preload, and you must have kept preemption
+ * off. preload_token comes from skiplist_preload, pass in
+ * exactly what preload gave you.
+ *
+ * 'slot' will be inserted into the skiplist, returning 0
+ * on success or < 0 on failure
+ *
+ * If 'cache' isn't NULL, we try to insert into the cached leaf first.
+ * When we return, a reference to the leaf where the insertion
+ * was done has been added to 'cache'.
+ *
+ * More details in the comments below.
+ */
+int skiplist_insert(struct sl_list *list, struct sl_slot *slot,
+ int preload_token, struct sl_leaf **cache)
+{
+ struct sl_node *p;
+ struct sl_node *ins_locked = NULL;
+ struct sl_leaf *leaf;
+ unsigned long key = slot->key;
+ unsigned long size = slot->size;
+ unsigned long min_key;
+ unsigned long max_key;
+ int level;
+ int ret;
+ int err;
+
+ rcu_read_lock();
+ ret = -EEXIST;
+
+ /* try our cache first */
+ if (cache && *cache) {
+ leaf = *cache;
+ skiplist_wait_pending_insert(&leaf->node);
+ err = verify_key_in_path(list, &leaf->node, key, size);
+ if (err == -EEXIST) {
+ ret = -EEXIST;
+ goto fail;
+ } else if (err == 0) {
+ ins_locked = &leaf->node;
+ level = 0;
+ goto find_or_add;
+ } else {
+ invalidate_cache(cache);
+ }
+ }
+
+again:
+ p = list->head;
+ level = list->level;
+
+ do {
+ while (1) {
+ leaf = sl_next_leaf(list, p, level);
+ if (!leaf) {
+ skiplist_wait_pending_insert(p);
+ /*
+ * if we're at level 0 and p points to
+ * the head, the list is just empty. If
+ * we're not at level 0 yet, keep walking
+ * down.
+ */
+ if (p == list->head || level != 0)
+ break;
+
+ err = verify_key_in_path(list, p, key, size);
+ if (err == -EEXIST) {
+ ret = -EEXIST;
+ goto fail;
+ } else if (err) {
+ goto again;
+ }
+
+ leaf = sl_next_leaf(list, p, level);
+ if (leaf) {
+ sl_unlock_node(p);
+ goto again;
+ }
+
+ /*
+ * p was the last leaf on the bottom level,
+ * We're here because 'key' was bigger than the
+ * max key in p. find_or_add will append into
+ * the last leaf.
+ */
+ ins_locked = p;
+ goto find_or_add;
+ }
+
+ max_key = sl_max_key(leaf);
+
+ /*
+ * strictly speaking this test is covered again below.
+ * But usually we have to walk forward through the
+ * pointers, so this is the most common condition. Try
+ * it first.
+ */
+ if (key >= max_key)
+ goto next;
+
+ min_key = sl_min_key(leaf);
+
+ if (key < min_key) {
+ /*
+ * our key is smaller than the smallest key in
+ * leaf. If we're not in level 0 yet, we don't
+ * want to cross over into the leaf
+ */
+ if (level != 0)
+ break;
+
+ p = &leaf->node;
+ skiplist_wait_pending_insert(p);
+
+ err = verify_key_in_path(list, p, key, size);
+ if (err == -EEXIST) {
+ ret = -EEXIST;
+ goto fail;
+ } else if (err) {
+ goto again;
+ }
+
+ if (key >= sl_min_key(leaf)) {
+ sl_unlock_node(p);
+ goto again;
+ }
+
+ /*
+ * we are in level 0, prepend our key
+ * into this leaf
+ */
+ ins_locked = p;
+ goto find_or_add;
+ }
+
+ if (key < sl_max_key(leaf)) {
+ /*
+ * our key is smaller than the max
+ * and bigger than the min, this is
+ * the one true leaf for our key no
+ * matter what level we're in
+ */
+ p = &leaf->node;
+ skiplist_wait_pending_insert(p);
+ sl_lock_node(p);
+
+ if (sl_node_dead(p) ||
+ key >= sl_max_key(leaf) ||
+ key < sl_min_key(leaf)) {
+ sl_unlock_node(p);
+ goto again;
+ }
+ ins_locked = p;
+ goto find_or_add;
+ }
+next:
+ p = &leaf->node;
+ }
+
+ level--;
+ } while (level >= 0);
+
+ /*
+ * we only get here if the list is completely empty. FIXME
+ * this can be folded into the find_or_add code below
+ */
+
+ sl_lock_node(list->head);
+ if (list->head->ptrs[0].next != NULL) {
+ sl_unlock_node(list->head);
+ goto again;
+ }
+ atomic_inc(&slot->refs);
+ leaf = alloc_leaf(slot, key, preload_token);
+ level = leaf->node.level;
+ leaf->node.pending = SKIPLIST_PENDING_INSERT;
+ sl_lock_node(&leaf->node);
+
+ if (level > list->level)
+ list->level++;
+
+ cache_leaf(leaf, cache);
+
+ /* unlocks the leaf and the list head */
+ sl_link_after_node(list, &leaf->node, list->head, level);
+ ret = 0;
+ rcu_read_unlock();
+
+ return ret;
+
+find_or_add:
+
+ leaf = sl_entry(ins_locked);
+
+ /* ins_locked is unlocked */
+ ret = find_or_add_key(list, key, size, leaf, slot,
+ preload_token, cache);
+fail:
+ rcu_read_unlock();
+ return ret;
+}
+EXPORT_SYMBOL(skiplist_insert);
+
+/*
+ * lookup has two stages. First we find the leaf that should have
+ * our key, and then we go through all the slots in that leaf and
+ * look for the key. This helper function is just the first stage
+ * and it must be called under rcu_read_lock(). You may be using the
+ * non-rcu final lookup variant, but this part must be rcu.
+ *
+ * We'll return NULL if we find nothing or the candidate leaf
+ * for you to search.
+ *
+ * last is set to one leaf before the leaf we checked. skiplist_insert_hole
+ * uses this to search for ranges.
+ */
+struct sl_leaf *__skiplist_lookup_leaf(struct sl_list *list,
+ struct sl_node **last,
+ unsigned long key,
+ unsigned long size)
+{
+ struct sl_node *p;
+ struct sl_leaf *leaf;
+ int level;
+ struct sl_leaf *leaf_ret = NULL;
+ unsigned long max_key = 0;
+ unsigned long min_key = 0;
+
+ level = list->level;
+ p = list->head;
+ do {
+ while (1) {
+ leaf = sl_next_leaf(list, p, level);
+ if (!leaf) {
+ if (sl_node_inserting(p)) {
+ skiplist_wait_pending_insert(p);
+ continue;
+ }
+ break;
+ }
+ max_key = sl_max_key(leaf);
+
+ if (key >= max_key)
+ goto next;
+
+ min_key = sl_min_key(leaf);
+
+ if (key < min_key)
+ break;
+
+ if (key < max_key) {
+ leaf_ret = leaf;
+ goto done;
+ }
+next:
+ p = &leaf->node;
+ }
+ level--;
+ } while (level >= 0);
+
+done:
+ if (last)
+ *last = p;
+ return leaf_ret;
+}
+
+/*
+ * return the first leaf containing this key/size. NULL
+ * is returned if we find nothing. Must be called under
+ * rcu_read_lock.
+ */
+struct sl_leaf *skiplist_lookup_leaf_rcu(struct sl_list *list,
+ unsigned long key,
+ unsigned long size)
+{
+ return __skiplist_lookup_leaf(list, NULL, key, size);
+}
+EXPORT_SYMBOL(skiplist_lookup_leaf_rcu);
+
+/*
+ * returns a locked leaf that contains key/size, or NULL
+ */
+struct sl_leaf *skiplist_lookup_leaf(struct sl_list *list,
+ unsigned long key,
+ unsigned long size)
+
+{
+ struct sl_leaf *leaf;
+ rcu_read_lock();
+again:
+ leaf = __skiplist_lookup_leaf(list, NULL, key, size);
+ if (leaf) {
+ sl_lock_node(&leaf->node);
+ if (!verify_key_in_leaf(leaf, key, size)) {
+ sl_unlock_node(&leaf->node);
+ goto again;
+ }
+ }
+ rcu_read_unlock();
+ return leaf;
+}
+EXPORT_SYMBOL(skiplist_lookup_leaf);
+
+/*
+ * returns a locked leaf that might contain key/size. An
+ * extra reference is taken on the leaf we return
+ */
+struct sl_leaf *skiplist_lookup_first_leaf(struct sl_list *list,
+ unsigned long key,
+ unsigned long size)
+
+{
+ struct sl_leaf *leaf;
+ struct sl_node *last = NULL;
+ rcu_read_lock();
+again:
+ leaf = __skiplist_lookup_leaf(list, &last, key, size);
+ if (leaf) {
+ smp_rmb();
+ skiplist_wait_pending_insert(&leaf->node);
+ sl_lock_node(&leaf->node);
+ if (!verify_key_in_leaf(leaf, key, size)) {
+ sl_unlock_node(&leaf->node);
+ goto again;
+ }
+ skiplist_get_leaf(leaf);
+ } else if (last && last != list->head) {
+ smp_rmb();
+ if (sl_node_dead(last))
+ goto again;
+
+ skiplist_wait_pending_insert(last);
+ sl_lock_node(last);
+
+ if (sl_node_dead(last)) {
+ sl_unlock_node(last);
+ goto again;
+ }
+ leaf = sl_entry(last);
+ skiplist_get_leaf(leaf);
+ }
+ rcu_read_unlock();
+ return leaf;
+}
+EXPORT_SYMBOL(skiplist_lookup_first_leaf);
+
+/*
+ * rcu_read_lock must be held. This returns a leaf that may
+ * contain key/size
+ */
+struct sl_leaf *skiplist_lookup_first_leaf_rcu(struct sl_list *list,
+ unsigned long key,
+ unsigned long size)
+
+{
+ struct sl_leaf *leaf;
+ struct sl_node *last;
+
+again:
+ last = NULL;
+ leaf = __skiplist_lookup_leaf(list, &last, key, size);
+ if (leaf) {
+ return leaf;
+ } else if (last && last != list->head) {
+ if (sl_node_dead(last))
+ goto again;
+ return sl_entry(last);
+ }
+ return NULL;
+}
+EXPORT_SYMBOL(skiplist_lookup_first_leaf_rcu);
+
+/*
+ * given a locked leaf, this returns the next leaf in the
+ * skiplist (locked)
+ */
+struct sl_leaf *skiplist_next_leaf(struct sl_list *list,
+ struct sl_leaf *leaf)
+{
+ struct sl_leaf *next_leaf = NULL;
+ struct sl_node *node;
+
+ rcu_read_lock();
+ while (1) {
+ node = find_live_next(list, &leaf->node, 0);
+ if (!node)
+ break;
+
+ next_leaf = sl_entry(node);
+
+ sl_lock_node(node);
+ if (!sl_node_dead(node) &&
+ find_live_prev(list, node, 0) == &leaf->node) {
+ skiplist_get_leaf(next_leaf);
+ break;
+ }
+
+ sl_unlock_node(node);
+ }
+ rcu_read_unlock();
+
+ return next_leaf;
+}
+EXPORT_SYMBOL(skiplist_next_leaf);
+
+struct sl_leaf *skiplist_next_leaf_rcu(struct sl_list *list,
+ struct sl_leaf *leaf)
+{
+ struct sl_node *next;
+
+ next = find_live_next(list, &leaf->node, 0);
+ if (next)
+ return sl_entry(next);
+ return NULL;
+}
+EXPORT_SYMBOL(skiplist_next_leaf_rcu);
+
+/*
+ * this lookup function expects RCU to protect the slots in the leaves
+ * as well as the skiplist indexing structures
+ *
+ * Note, you must call this with rcu_read_lock held, and you must verify
+ * the result yourself. If the key field of the returned slot doesn't
+ * match your key, repeat the lookup. Reference counting etc is also
+ * all your responsibility.
+ */
+struct sl_slot *skiplist_lookup_rcu(struct sl_list *list, unsigned long key,
+ unsigned long size)
+{
+ struct sl_leaf *leaf;
+ struct sl_slot *slot_ret = NULL;
+ int slot;
+ int ret;
+
+again:
+ leaf = __skiplist_lookup_leaf(list, NULL, key, size);
+ if (leaf) {
+ ret = skiplist_search_leaf(leaf, key, size, &slot);
+ if (ret == 0)
+ slot_ret = rcu_dereference(leaf->ptrs[slot]);
+ else if (!verify_key_in_leaf(leaf, key, size))
+ goto again;
+ }
+ return slot_ret;
+
+}
+EXPORT_SYMBOL(skiplist_lookup_rcu);
+
+/*
+ * this lookup function only uses RCU to protect the skiplist indexing
+ * structs. The actual slots are protected by full locks.
+ */
+struct sl_slot *skiplist_lookup(struct sl_list *list, unsigned long key,
+ unsigned long size)
+{
+ struct sl_leaf *leaf;
+ struct sl_slot *slot_ret = NULL;
+ struct sl_node *p;
+ int slot;
+ int ret;
+
+again:
+ rcu_read_lock();
+ leaf = __skiplist_lookup_leaf(list, &p, key, size);
+ if (leaf) {
+ sl_lock_node(&leaf->node);
+ if (!verify_key_in_leaf(leaf, key, size)) {
+ sl_unlock_node(&leaf->node);
+ rcu_read_unlock();
+ goto again;
+ }
+ ret = skiplist_search_leaf(leaf, key, size, &slot);
+ if (ret == 0) {
+ slot_ret = leaf->ptrs[slot];
+ if (atomic_inc_not_zero(&slot_ret->refs) == 0)
+ slot_ret = NULL;
+ }
+ sl_unlock_node(&leaf->node);
+ }
+ rcu_read_unlock();
+ return slot_ret;
+
+}
+EXPORT_SYMBOL(skiplist_lookup);
+
+/* helper for skiplist_insert_hole. the iommu requires alignment */
+static unsigned long align_start(unsigned long val, unsigned long align)
+{
+ return (val + align - 1) & ~(align - 1);
+}
+
+/*
+ * this is pretty ugly, but it is used to find a free spot in the
+ * tree for a new iommu allocation. We start from a given
+ * hint and try to find an aligned range of a given size.
+ *
+ * Send the slot pointer, and we'll update it with the location
+ * we found.
+ *
+ * This will return -EAGAIN if we found a good spot but someone
+ * raced in and allocated it before we could. This gives the
+ * caller the chance to update their hint.
+ *
+ * This will return -EEXIST if we couldn't find anything at all
+ *
+ * returns 0 if all went well, or some other negative error
+ * if things went badly.
+ */
+int skiplist_insert_hole(struct sl_list *list, unsigned long hint,
+ unsigned long limit,
+ unsigned long size, unsigned long align,
+ struct sl_slot *slot,
+ gfp_t gfp_mask)
+{
+ unsigned long last_end = 0;
+ struct sl_node *p;
+ struct sl_leaf *leaf;
+ int i;
+ int ret = -EEXIST;
+ int preload_token;
+ int pending_level;
+
+ preload_token = skiplist_preload(list, gfp_mask);
+ if (preload_token < 0) {
+ return preload_token;
+ }
+ pending_level = pending_insert_level(preload_token);
+
+ /* step one, lets find our hint */
+ rcu_read_lock();
+again:
+
+ last_end = max(last_end, hint);
+ last_end = align_start(last_end, align);
+ slot->key = align_start(hint, align);
+ slot->size = size;
+ leaf = __skiplist_lookup_leaf(list, &p, hint, 1);
+ if (!p)
+ p = list->head;
+
+ if (leaf && !verify_key_in_leaf(leaf, hint, size)) {
+ goto again;
+ }
+
+again_lock:
+ sl_lock_node(p);
+ if (sl_node_dead(p)) {
+ sl_unlock_node(p);
+ goto again;
+ }
+
+ if (p != list->head) {
+ leaf = sl_entry(p);
+ /*
+ * the leaf we found was past the hint,
+ * go back one
+ */
+ if (sl_max_key(leaf) > hint) {
+ struct sl_node *locked = p;
+ p = p->ptrs[0].prev;
+ sl_unlock_node(locked);
+ goto again_lock;
+ }
+ last_end = align_start(sl_max_key(sl_entry(p)), align);
+ }
+
+ /*
+ * now walk at level 0 and find a hole. We could use lockless walks
+ * if we wanted to bang more on the insertion code, but this
+ * instead holds the lock on each node as we inspect it
+ *
+ * This is a little sloppy, insert will return -EEXIST if we get it
+ * wrong.
+ */
+ while(1) {
+ leaf = sl_next_leaf(list, p, 0);
+ if (!leaf)
+ break;
+
+ /* p and leaf are locked */
+ sl_lock_node(&leaf->node);
+ if (last_end > sl_max_key(leaf))
+ goto next;
+
+ for (i = 0; i < leaf->nr; i++) {
+ if (last_end > leaf->keys[i])
+ continue;
+ if (leaf->keys[i] - last_end >= size) {
+
+ if (last_end + size > limit) {
+ sl_unlock_node(&leaf->node);
+ goto out_rcu;
+ }
+
+ sl_unlock_node(p);
+ slot->key = last_end;
+ slot->size = size;
+ goto try_insert;
+ }
+ last_end = leaf->keys[i] + leaf->ptrs[i]->size;
+ last_end = align_start(last_end, align);
+ if (last_end + size > limit) {
+ sl_unlock_node(&leaf->node);
+ goto out_rcu;
+ }
+ }
+next:
+ sl_unlock_node(p);
+ p = &leaf->node;
+ }
+
+ if (last_end + size <= limit) {
+ sl_unlock_node(p);
+ slot->key = last_end;
+ slot->size = size;
+ goto try_insert;
+ }
+
+out_rcu:
+ /* we've failed */
+ sl_unlock_node(p);
+ rcu_read_unlock();
+ preempt_enable();
+
+ return ret;
+
+try_insert:
+ /*
+ * if the pending_level is zero or there is room in the
+ * leaf, we're ready to insert. This is true most of the
+ * time, and we won't have to drop our lock and give others
+ * the chance to race in and steal our spot.
+ */
+ if (leaf && (pending_level == 0 || leaf->nr < SKIP_KEYS_PER_NODE) &&
+ !sl_node_dead(&leaf->node) && (slot->key >= sl_min_key(leaf) &&
+ slot->key + slot->size <= sl_max_key(leaf))) {
+ ret = find_or_add_key(list, slot->key, size, leaf, slot,
+ preload_token, NULL);
+ rcu_read_unlock();
+ goto out;
+ }
+ /*
+ * no such luck, drop our lock and do the insert the
+ * old fashioned way
+ */
+ if (leaf)
+ sl_unlock_node(&leaf->node);
+
+ rcu_read_unlock();
+ ret = skiplist_insert(list, slot, preload_token, NULL);
+
+out:
+ /*
+ * if we get -EEXIST here, it just means we lost the race.
+ * Return -EAGAIN to the caller so they can update the hint
+ */
+ if (ret == -EEXIST)
+ ret = -EAGAIN;
+
+ preempt_enable();
+ return ret;
+}
+EXPORT_SYMBOL(skiplist_insert_hole);
+
+/*
+ * we erase one level at a time, from top to bottom.
+ * The basic idea is to find a live prev and next pair,
+ * and make them point to each other.
+ *
+ * For a given level, this takes locks on prev, node, next
+ * makes sure they all point to each other and then
+ * removes node from the middle.
+ *
+ * The node must already be marked dead and it must already
+ * be empty.
+ */
+static void erase_one_level(struct sl_list *list,
+ struct sl_node *node, int level)
+{
+ struct sl_node *prev;
+ struct sl_node *next;
+ struct sl_node *test_prev;
+ struct sl_node *test_next;
+
+again:
+ prev = find_live_prev(list, node, level);
+ skiplist_wait_pending_insert(prev);
+ sl_lock_node(prev);
+
+ test_prev = find_live_prev(list, node, level);
+ if (test_prev != prev || sl_node_dead(prev)) {
+ sl_unlock_node(prev);
+ goto again;
+ }
+
+again_next:
+ next = find_live_next(list, prev, level);
+ if (next) {
+ if (sl_node_inserting(next)) {
+ sl_unlock_node(prev);
+ skiplist_wait_pending_insert(next);
+ goto again;
+ }
+ sl_lock_node(next);
+ test_next = find_live_next(list, prev, level);
+ if (test_next != next || sl_node_dead(next)) {
+ sl_unlock_node(next);
+ goto again_next;
+ }
+ test_prev = find_live_prev(list, next, level);
+ test_next = find_live_next(list, prev, level);
+ if (test_prev != prev || test_next != next) {
+ sl_unlock_node(prev);
+ sl_unlock_node(next);
+ goto again;
+ }
+ }
+ rcu_assign_pointer(prev->ptrs[level].next, next);
+ if (next)
+ rcu_assign_pointer(next->ptrs[level].prev, prev);
+ else if (prev != list->head)
+ rcu_assign_pointer(list->head->ptrs[level].prev, prev);
+ else
+ rcu_assign_pointer(list->head->ptrs[level].prev, NULL);
+
+ sl_unlock_node(prev);
+ sl_unlock_node(next);
+}
+
+/*
+ * for a leaf already marked dead, unlink all the levels. The leaf must
+ * not be locked
+ */
+void sl_erase(struct sl_list *list, struct sl_leaf *leaf)
+{
+ int i;
+ int level = leaf->node.level;
+
+ for (i = level; i >= 0; i--)
+ erase_one_level(list, &leaf->node, i);
+}
+
+/*
+ * helper for skiplist_delete, this pushes pointers
+ * around to remove a single slot
+ */
+static void delete_slot(struct sl_leaf *leaf, int slot)
+{
+ if (slot != leaf->nr - 1) {
+ int i;
+ for (i = slot; i <= leaf->nr - 1; i++) {
+ leaf->keys[i] = leaf->keys[i + 1];
+ leaf->ptrs[i] = leaf->ptrs[i + 1];
+ smp_wmb();
+ }
+ } else if (leaf->nr > 1) {
+ leaf->max = leaf->keys[leaf->nr - 2] +
+ leaf->ptrs[leaf->nr - 2]->size;
+ smp_wmb();
+ }
+ leaf->nr--;
+}
+
+/*
+ * find a given [key, size] in the skiplist and remove it.
+ * If we find anything, we return the slot pointer that
+ * was stored in the tree.
+ *
+ * deletion involves a mostly lockless lookup to
+ * find the right leaf. Then we take the lock and find the
+ * correct slot.
+ *
+ * The slot is removed from the leaf, and if the leaf
+ * is now empty, it is removed from the skiplist.
+ */
+struct sl_slot *skiplist_delete(struct sl_list *list,
+ unsigned long key,
+ unsigned long size)
+{
+ struct sl_slot *slot_ret = NULL;
+ struct sl_leaf *leaf;
+ int slot;
+ int ret;
+
+ rcu_read_lock();
+again:
+ leaf = __skiplist_lookup_leaf(list, NULL, key, size);
+ if (!leaf)
+ goto out;
+
+ skiplist_wait_pending_insert(&leaf->node);
+
+ sl_lock_node(&leaf->node);
+ if (!verify_key_in_leaf(leaf, key, size)) {
+ sl_unlock_node(&leaf->node);
+ goto again;
+ }
+
+ ret = skiplist_search_leaf(leaf, key, size, &slot);
+ if (ret == 0) {
+ slot_ret = leaf->ptrs[slot];
+ } else {
+ sl_unlock_node(&leaf->node);
+ goto out;
+ }
+
+ delete_slot(leaf, slot);
+ if (leaf->nr == 0) {
+ /*
+ * sl_erase has to mess with the prev pointers, so
+ * we need to unlock it here
+ */
+ leaf->node.pending = SKIPLIST_PENDING_DEAD;
+ sl_unlock_node(&leaf->node);
+ sl_erase(list, leaf);
+ skiplist_put_leaf(leaf);
+ } else {
+ sl_unlock_node(&leaf->node);
+ }
+out:
+ rcu_read_unlock();
+ return slot_ret;
+}
+EXPORT_SYMBOL(skiplist_delete);
+
+/*
+ * this deletes an entire leaf, the caller must have some
+ * other way to free all the slots that are linked in. The
+ * leaf must be locked.
+ */
+void skiplist_delete_leaf(struct sl_list *list, struct sl_leaf *leaf,
+ struct sl_leaf **cache_next)
+{
+ struct sl_node *next = NULL;
+
+ rcu_read_lock();
+ assert_locked_node(&leaf->node);
+ leaf->nr = 0;
+
+ BUG_ON(sl_node_inserting(&leaf->node));
+
+ leaf->node.pending = SKIPLIST_PENDING_DEAD;
+
+ if (cache_next) {
+ *cache_next = NULL;
+ next = find_live_next(list, &leaf->node, 0);
+ }
+
+ sl_unlock_node(&leaf->node);
+ sl_erase(list, leaf);
+ /* once for the skiplist */
+ skiplist_put_leaf(leaf);
+ /* once for the caller */
+ skiplist_put_leaf(leaf);
+
+ if (cache_next && next && !sl_node_dead(next)) {
+ int ret;
+
+ leaf = sl_entry(next);
+ ret = skiplist_get_leaf_not_zero(leaf);
+ if (ret)
+ *cache_next = leaf;
+ }
+ rcu_read_unlock();
+}
+EXPORT_SYMBOL(skiplist_delete_leaf);
+
+int sl_init_list(struct sl_list *list, gfp_t mask)
+{
+ list->head = kmalloc(sl_node_size(SKIP_MAXLEVEL), mask);
+ if (!list->head)
+ return -ENOMEM;
+ sl_init_node(list->head, SKIP_MAXLEVEL);
+ list->level = 0;
+ return 0;
+}
+EXPORT_SYMBOL(sl_init_list);
+
+
+static int skiplist_callback(struct notifier_block *nfb,
+ unsigned long action,
+ void *hcpu)
+{
+ int cpu = (long)hcpu;
+ struct skip_preload *skp;
+ struct sl_leaf *l;
+ int level;
+ int i;
+
+ /* Free per-cpu pool of preloaded nodes */
+ if (action == CPU_DEAD || action == CPU_DEAD_FROZEN) {
+ skp = &per_cpu(skip_preloads, cpu);
+ for (i = 0; i < SKIP_MAXLEVEL + 1; i++) {
+ l = skp->preload[i];
+ if (!l)
+ continue;
+ level = l->node.level;
+ kmem_cache_free(slab_caches[level], l);
+ skp->preload[i] = NULL;
+ }
+ }
+ return NOTIFY_OK;
+}
+
+void __init skiplist_init(void)
+{
+ char buffer[16];
+ int i;
+
+ hotcpu_notifier(skiplist_callback, 0);
+ for (i = 0; i <= SKIP_MAXLEVEL; i++) {
+ snprintf(buffer, 16, "skiplist-%d", i);
+ slab_caches[i] = kmem_cache_create(buffer,
+ sl_leaf_size(i), 0,
+ SLAB_RECLAIM_ACCOUNT | SLAB_MEM_SPREAD |
+ SLAB_DESTROY_BY_RCU,
+ NULL);
+ }
+}
+
diff --git a/lib/skiplist_test.c b/lib/skiplist_test.c
new file mode 100644
index 0000000..8f44829
--- /dev/null
+++ b/lib/skiplist_test.c
@@ -0,0 +1,882 @@
+/*
+ * (C) 2013 Fusion-io
+ *
+ * This program is free software; you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software
+ * Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 59 Temple
+ * Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/slab.h>
+#include <linux/module.h>
+#include <linux/sched.h>
+#include <linux/moduleparam.h>
+#include <linux/skiplist.h>
+#include <linux/kthread.h>
+#include <linux/rbtree.h>
+#include <linux/random.h>
+
+static int threads = 1;
+static int rounds = 100;
+static int items = 100;
+static int module_exiting;
+static struct completion startup = COMPLETION_INITIALIZER(startup);
+static DEFINE_MUTEX(fill_mutex);
+static int filled;
+
+static struct timespec *times;
+
+
+#define FILL_TIME_INDEX 0
+#define CHECK_TIME_INDEX 1
+#define DEL_TIME_INDEX 2
+#define RANDOM_INS_INDEX 3
+#define RANDOM_LOOKUP_INDEX 4
+#define FIRST_THREAD_INDEX 5
+
+#define SKIPLIST_RCU_BENCH 1
+#define SKIPLIST_BENCH 2
+#define RBTREE_BENCH 3
+
+static int benchmark = SKIPLIST_RCU_BENCH;
+
+module_param(threads, int, 0);
+module_param(rounds, int, 0);
+module_param(items, int, 0);
+module_param(benchmark, int, 0);
+
+MODULE_PARM_DESC(threads, "number of threads to run");
+MODULE_PARM_DESC(rounds, "how many random operations to run");
+MODULE_PARM_DESC(items, "number of items to fill the list with");
+MODULE_PARM_DESC(benchmark, "benchmark to run 1=skiplist-rcu 2=skiplist-locking 3=rbtree");
+MODULE_LICENSE("GPL");
+
+static atomic_t threads_running = ATOMIC_INIT(0);
+
+/*
+ * since the skiplist code is more concurrent, it is also more likely to
+ * have races into the same slot during our delete/insert bashing run.
+ * This counts the number of delete/insert pairs done so we can
+ * make sure the results are roughly accurate
+ */
+static atomic_t pops_done = ATOMIC_INIT(0);
+
+static struct kmem_cache *slot_cache;
+struct sl_list skiplist;
+
+spinlock_t rbtree_lock;
+struct rb_root rb_root = RB_ROOT;
+
+struct rbtree_item {
+ struct rb_node rb_node;
+ unsigned long key;
+ unsigned long size;
+};
+
+static int __insert_one_rbtree(struct rb_root *root, struct rbtree_item *ins)
+{
+ struct rb_node **p = &root->rb_node;
+ struct rb_node *parent = NULL;
+ struct rbtree_item *item;
+ unsigned long key = ins->key;
+ int ret = -EEXIST;
+
+ while (*p) {
+ parent = *p;
+ item = rb_entry(parent, struct rbtree_item, rb_node);
+
+ if (key < item->key)
+ p = &(*p)->rb_left;
+ else if (key >= item->key + item->size)
+ p = &(*p)->rb_right;
+ else
+ goto out;
+ }
+
+ rb_link_node(&ins->rb_node, parent, p);
+ rb_insert_color(&ins->rb_node, root);
+ ret = 0;
+out:
+
+ return ret;
+}
+
+static int insert_one_rbtree(struct rb_root *root, unsigned long key,
+ unsigned long size)
+{
+ int ret;
+ struct rbtree_item *ins;
+
+ ins = kmalloc(sizeof(*ins), GFP_KERNEL);
+ ins->key = key;
+ ins->size = size;
+
+ spin_lock(&rbtree_lock);
+ ret = __insert_one_rbtree(root, ins);
+ spin_unlock(&rbtree_lock);
+
+ if (ret) {
+ kfree(ins);
+ }
+ return ret;
+}
+
+static struct rbtree_item *__lookup_one_rbtree(struct rb_root *root, unsigned long key)
+{
+ struct rb_node *p = root->rb_node;
+ struct rbtree_item *item;
+ struct rbtree_item *ret;
+
+ while (p) {
+ item = rb_entry(p, struct rbtree_item, rb_node);
+
+ if (key < item->key)
+ p = p->rb_left;
+ else if (key >= item->key + item->size)
+ p = p->rb_right;
+ else {
+ ret = item;
+ goto out;
+ }
+ }
+
+ ret = NULL;
+out:
+ return ret;
+}
+
+static struct rbtree_item *__lookup_first_rbtree(struct rb_root *root, unsigned long key)
+{
+ struct rb_node *p = root->rb_node;
+ struct rbtree_item *item;
+ struct rbtree_item *ret;
+
+ while (p) {
+ item = rb_entry(p, struct rbtree_item, rb_node);
+
+ if (key < item->key)
+ p = p->rb_left;
+ else if (key >= item->key + item->size)
+ p = p->rb_right;
+ else {
+ ret = item;
+ goto out;
+ }
+ }
+
+ if (p)
+ ret = rb_entry(p, struct rbtree_item, rb_node);
+ else
+ ret = NULL;
+out:
+ return ret;
+}
+
+static int lookup_one_rbtree(struct rb_root *root, unsigned long key)
+{
+ struct rbtree_item *item;
+ int ret;
+
+ spin_lock(&rbtree_lock);
+ item = __lookup_one_rbtree(root, key);
+ if (item)
+ ret = 0;
+ else
+ ret = -ENOENT;
+ spin_unlock(&rbtree_lock);
+
+ return ret;
+}
+
+static int delete_many_rbtree(struct rb_root *root, unsigned long key)
+{
+ int ret = 0;
+ struct rbtree_item *item;
+ struct rbtree_item **victims;
+ struct rb_node *next;
+ int nr_victims = 128;
+ int found = 0;
+ int i;
+
+ nr_victims = min(nr_victims, items / 2);
+
+ victims = kzalloc(nr_victims * sizeof(victims[0]), GFP_KERNEL);
+ spin_lock(&rbtree_lock);
+ item = __lookup_first_rbtree(root, key);
+ if (!item) {
+ spin_unlock(&rbtree_lock);
+ goto out;
+ }
+
+ while (found < nr_victims) {
+ victims[found] = item;
+ next = rb_next(&item->rb_node);
+ rb_erase(&item->rb_node, root);
+ found++;
+ if (!next)
+ break;
+ item = rb_entry(next, struct rbtree_item, rb_node);
+ }
+ spin_unlock(&rbtree_lock);
+
+ for (i = 0; i < found; i++) {
+ item = victims[i];
+ spin_lock(&rbtree_lock);
+ ret = __insert_one_rbtree(root, item);
+ if (ret) {
+ printk(KERN_CRIT "delete_many unable to insert %lu\n", key);
+ kfree(item);
+ }
+ spin_unlock(&rbtree_lock);
+ }
+out:
+ atomic_add(found, &pops_done);
+ kfree(victims);
+ return ret;
+
+}
+
+static int delete_one_rbtree(struct rb_root *root, unsigned long key)
+{
+ int ret = -ENOENT;
+ struct rbtree_item *item;
+
+ spin_lock(&rbtree_lock);
+ item = __lookup_first_rbtree(root, key);
+ if (!item)
+ goto out;
+
+ rb_erase(&item->rb_node, root);
+
+ ret = __insert_one_rbtree(root, item);
+ if (ret) {
+ printk(KERN_CRIT "delete_one unable to insert %lu\n", key);
+ goto out;
+ }
+ ret = 0;
+out:
+ spin_unlock(&rbtree_lock);
+ atomic_inc(&pops_done);
+ return ret;
+
+}
+
+static int run_initial_fill_rbtree(void)
+{
+ unsigned long i;
+ unsigned long key;
+ int ret;
+ int inserted = 0;
+
+ for (i = 0; i < items; i++) {
+ key = i * 4096;
+ ret = insert_one_rbtree(&rb_root, key, 4096);
+ if (ret)
+ return ret;
+ inserted++;
+ }
+ printk("rbtree inserted %d items\n", inserted);
+ return 0;
+}
+
+static unsigned long tester_random(void)
+{
+ return (prandom_u32() % items) * 4096;
+}
+
+static void random_insert_rbtree(void)
+{
+ unsigned long i;
+ unsigned long key;
+ int ret;
+ int inserted = 0;
+
+ for (i = 0; i < items; i++) {
+ key = tester_random();
+ ret = insert_one_rbtree(&rb_root, key, 4096);
+ if (!ret)
+ inserted++;
+ }
+ printk("rbtree randomly inserted %d items\n", inserted);
+}
+
+static void random_lookup_rbtree(void)
+{
+ int i;
+ unsigned long key;
+ int ret;
+ int found = 0;
+
+ for (i = 0; i < items; i++) {
+ key = tester_random();
+ ret = lookup_one_rbtree(&rb_root, key);
+ if (!ret)
+ found++;
+ }
+ printk("rbtree randomly searched %d items\n", found);
+}
+
+static void check_post_work_rbtree(void)
+{
+ unsigned long key = 0;
+ int errors = 0;
+ struct rb_node *p = rb_first(&rb_root);
+ struct rbtree_item *item;
+
+ while (p) {
+ item = rb_entry(p, struct rbtree_item, rb_node);
+ if (item->key != key) {
+ if (!errors)
+ printk("rbtree failed to find key %lu\n", key);
+ errors++;
+ }
+ key += 4096;
+ p = rb_next(p);
+ }
+ printk(KERN_CRIT "rbtree check found %d errors\n", errors);
+}
+
+static void delete_all_items_rbtree(int check)
+{
+ unsigned long key = 0;
+ int errors = 0;
+ struct rb_node *p = rb_first(&rb_root);
+ struct rb_node *next;
+ struct rbtree_item *item;
+
+ while (p) {
+ item = rb_entry(p, struct rbtree_item, rb_node);
+ if (check && item->key != key) {
+ if (!errors)
+ printk("rbtree failed to find key %lu\n", key);
+ errors++;
+ }
+ key += 4096;
+ next = rb_next(p);
+ rb_erase(p, &rb_root);
+ kfree(item);
+ p = next;
+ }
+ printk(KERN_CRIT "rbtree deletion found %d errors\n", errors);
+}
+
+static void put_slot(struct sl_slot *slot)
+{
+ if (atomic_dec_and_test(&slot->refs))
+ kmem_cache_free(slot_cache, slot);
+
+}
+static int insert_one_skiplist(struct sl_list *skiplist, unsigned long key,
+ unsigned long size, struct sl_leaf **cache)
+{
+ int ret;
+ int preload_token;
+ struct sl_slot *slot;
+
+ slot = kmem_cache_alloc(slot_cache, GFP_KERNEL);
+ if (!slot)
+ return -ENOMEM;
+
+ slot->key = key;
+ slot->size = size;
+ atomic_set(&slot->refs, 0);
+
+ preload_token = skiplist_preload(skiplist, GFP_KERNEL);
+ if (preload_token < 0) {
+ ret = preload_token;
+ goto out;
+ }
+
+ ret = skiplist_insert(skiplist, slot, preload_token, cache);
+ preempt_enable();
+
+out:
+ if (ret)
+ kmem_cache_free(slot_cache, slot);
+
+ return ret;
+}
+
+static int run_initial_fill_skiplist(void)
+{
+ unsigned long i;
+ unsigned long key;
+ int ret = 0;
+ int inserted = 0;
+ struct sl_leaf *cache = NULL;
+
+ sl_init_list(&skiplist, GFP_KERNEL);
+
+ for (i = 0; i < items; i++) {
+ key = i * 4096;
+ ret = insert_one_skiplist(&skiplist, key, 4096, &cache);
+ if (ret)
+ break;
+ inserted++;
+ }
+ if (cache)
+ skiplist_put_leaf(cache);
+
+ printk("skiplist inserted %d items\n", inserted);
+ return ret;
+}
+
+static void check_post_work_skiplist(void)
+{
+ unsigned long i;
+ unsigned long key = 0;
+ struct sl_slot *slot;
+ struct sl_leaf *leaf;
+ struct sl_leaf *next;
+ int errors = 0;
+ unsigned long found = 0;
+
+ leaf = skiplist_first_leaf(&skiplist);
+ while (leaf) {
+ for (i = 0; i < leaf->nr; i++) {
+ slot = leaf->ptrs[i];
+ if (slot->key != key) {
+ if (errors == 0)
+ printk("key mismatch wanted %lu found %lu\n", key, slot->key);
+ errors++;
+ }
+ key += 4096;
+ }
+ found += leaf->nr;
+ next = skiplist_next_leaf(&skiplist, leaf);
+ skiplist_unlock_leaf(leaf);
+ skiplist_put_leaf(leaf);
+ leaf = next;
+ }
+ if (found != items)
+ printk("skiplist check only found %lu items\n", found);
+ printk(KERN_CRIT "skiplist check found %lu items with %d errors\n", found, errors);
+}
+
+static void delete_all_items_skiplist(int check)
+{
+ unsigned long i;
+ unsigned long key = 0;
+ struct sl_slot *slot;
+ struct sl_leaf *leaf;
+ int errors = 0;
+ unsigned long slots_empty = 0;
+ unsigned long total_leaves = 0;
+ unsigned long found = 0;
+
+ /*
+ * the benchmark is done at this point, so we can safely
+ * just free all the items. In the real world you should
+ * not free anything until the leaf is fully removed from
+ * the tree.
+ */
+ while (1) {
+ leaf = skiplist_first_leaf(&skiplist);
+ if (!leaf)
+ break;
+ for (i = 0; i < leaf->nr; i++) {
+ slot = leaf->ptrs[i];
+ if (check && slot->key != key) {
+ if (errors == 0)
+ printk("delete key mismatch wanted %lu found %lu\n", key, slot->key);
+ errors++;
+ }
+ key += 4096;
+ /* delete the one ref from the skiplist */
+ put_slot(slot);
+ }
+ found += leaf->nr;
+ slots_empty += SKIP_KEYS_PER_NODE - leaf->nr;
+ total_leaves++;
+ skiplist_delete_leaf(&skiplist, leaf, NULL);
+ }
+ printk(KERN_CRIT "skiplist delete found %lu items with %d errors %lu empty slots %lu leaves\n", found, errors, slots_empty, total_leaves);
+}
+
+static int lookup_one_skiplist(struct sl_list *skiplist, unsigned long key)
+{
+ int ret = 0;
+ struct sl_slot *slot = NULL;
+
+ if (benchmark == SKIPLIST_RCU_BENCH) {
+ slot = skiplist_lookup_rcu(skiplist, key, 4096);
+ } else if (benchmark == SKIPLIST_BENCH) {
+ slot = skiplist_lookup(skiplist, key, 4096);
+ if (slot)
+ put_slot(slot);
+ }
+ if (!slot)
+ ret = -ENOENT;
+ return ret;
+}
+
+static int delete_one_skiplist(struct sl_list *skiplist, unsigned long key)
+{
+ int ret = 0;
+ struct sl_slot *slot = NULL;
+ int preload_token;
+
+ slot = skiplist_delete(skiplist, key, 1);
+ if (!slot)
+ return -ENOENT;
+
+ preload_token = skiplist_preload(skiplist, GFP_KERNEL);
+ if (preload_token < 0) {
+ ret = preload_token;
+ goto out_no_preempt;
+ }
+
+ ret = skiplist_insert(skiplist, slot, preload_token, NULL);
+ if (ret) {
+ printk(KERN_CRIT "failed to insert key %lu ret %d\n", key, ret);
+ goto out;
+ }
+ put_slot(slot);
+ ret = 0;
+ atomic_inc(&pops_done);
+out:
+ preempt_enable();
+out_no_preempt:
+ return ret;
+}
+
+static int delete_many_skiplist(struct sl_list *skiplist, unsigned long key)
+{
+ int ret = 0;
+ int preload_token;
+ struct sl_slot **victims;
+ struct sl_leaf *leaf;
+ struct sl_leaf *next = NULL;
+ int nr_victims = 128;
+ int found = 0;
+ int i;
+ struct sl_leaf *cache = NULL;
+
+ nr_victims = min(nr_victims, items / 2);
+
+ victims = kzalloc(nr_victims * sizeof(victims[0]), GFP_KERNEL);
+ /*
+ * this is intentionally deleting adjacent items to empty
+ * skiplist leaves. The goal is to find races between
+ * leaf deletion and the rest of the code
+ */
+ leaf = skiplist_lookup_first_leaf(skiplist, key, 1);
+ while (leaf) {
+ memcpy(victims + found, leaf->ptrs, sizeof(victims[0]) * leaf->nr);
+ found += leaf->nr;
+
+ skiplist_delete_leaf(skiplist, leaf, &next);
+ leaf = next;
+ if (!leaf)
+ break;
+
+ if (leaf->nr + found > nr_victims) {
+ skiplist_put_leaf(leaf);
+ break;
+ }
+
+ skiplist_wait_pending_insert(&leaf->node);
+
+ skiplist_lock_leaf(leaf);
+ if (sl_node_dead(&leaf->node) ||
+ leaf->nr + found > nr_victims) {
+ skiplist_put_leaf(leaf);
+ skiplist_unlock_leaf(leaf);
+ break;
+ }
+ }
+
+ for (i = 0; i < found; i++) {
+ preload_token = skiplist_preload(skiplist, GFP_KERNEL);
+ if (preload_token < 0) {
+ ret = preload_token;
+ goto out;
+ }
+
+ ret = skiplist_insert(skiplist, victims[i], preload_token, &cache);
+ if (ret) {
+ printk(KERN_CRIT "failed to insert key %lu ret %d\n", key, ret);
+ preempt_enable();
+ goto out;
+ }
+ /* insert added an extra ref, take it away here */
+ put_slot(victims[i]);
+ ret = 0;
+ preempt_enable();
+ }
+out:
+ if (cache)
+ skiplist_put_leaf(cache);
+
+ atomic_add(found, &pops_done);
+ kfree(victims);
+ return ret;
+}
+
+static void random_lookup_skiplist(void)
+{
+ int i;
+ unsigned long key;
+ int ret;
+ int found = 0;
+
+ for (i = 0; i < items; i++) {
+ key = tester_random();
+ ret = lookup_one_skiplist(&skiplist, key);
+ if (!ret)
+ found++;
+ }
+ printk("skiplist randomly searched %d items\n", found);
+}
+
+static void random_insert_skiplist(void)
+{
+ int i;
+ unsigned long key;
+ int ret;
+ int inserted = 0;
+
+ for (i = 0; i < items; i++) {
+ key = tester_random();
+ ret = insert_one_skiplist(&skiplist, key, 4096, NULL);
+ if (!ret)
+ inserted++;
+ }
+ printk("skiplist randomly inserted %d items\n", inserted);
+}
+
+void tvsub(struct timespec * tdiff, struct timespec * t1, struct timespec * t0)
+{
+ tdiff->tv_sec = t1->tv_sec - t0->tv_sec;
+ tdiff->tv_nsec = t1->tv_nsec - t0->tv_nsec;
+ if (tdiff->tv_nsec < 0 && tdiff->tv_sec > 0) {
+ tdiff->tv_sec--;
+ tdiff->tv_nsec += 1000000000ULL;
+ }
+
+ /* time shouldn't go backwards!!! */
+ if (tdiff->tv_nsec < 0 || t1->tv_sec < t0->tv_sec) {
+ tdiff->tv_sec = 0;
+ tdiff->tv_nsec = 0;
+ }
+}
+
+static void pretty_time(struct timespec *ts, unsigned long long *seconds, unsigned long long *ms)
+{
+ unsigned long long m;
+
+ *seconds = ts->tv_sec;
+
+ m = ts->tv_nsec / 1000000ULL;
+ *ms = m;
+}
+
+static void runbench(int thread_index)
+{
+ int ret = 0;
+ unsigned long i;
+ unsigned long key;
+ struct timespec start;
+ struct timespec cur;
+ unsigned long long sec;
+ unsigned long long ms;
+ char *tag = "skiplist-rcu";
+
+ if (benchmark == SKIPLIST_BENCH)
+ tag = "skiplist-locking";
+ else if (benchmark == RBTREE_BENCH)
+ tag = "rbtree";
+
+ mutex_lock(&fill_mutex);
+
+ if (filled == 0) {
+ start = current_kernel_time();
+
+ printk(KERN_CRIT "Running %s benchmark\n", tag);
+
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH)
+ ret = run_initial_fill_skiplist();
+ else if (benchmark == RBTREE_BENCH)
+ ret = run_initial_fill_rbtree();
+
+ if (ret < 0) {
+ printk(KERN_CRIT "failed to setup initial tree ret %d\n", ret);
+ filled = ret;
+ } else {
+ filled = 1;
+ }
+ cur = current_kernel_time();
+ tvsub(times + FILL_TIME_INDEX, &cur, &start);
+ printk("initial fill done\n");
+ }
+
+ mutex_unlock(&fill_mutex);
+ if (filled < 0)
+ return;
+
+ start = current_kernel_time();
+
+ for (i = 0; i < rounds; i++) {
+ key = tester_random();
+
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH)
+ ret = lookup_one_skiplist(&skiplist, key);
+ else if (benchmark == RBTREE_BENCH)
+ ret = lookup_one_rbtree(&rb_root, key);
+
+ cond_resched();
+
+ key = tester_random();
+
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH)
+ ret = delete_many_skiplist(&skiplist, key);
+ else if (benchmark == RBTREE_BENCH)
+ ret = delete_many_rbtree(&rb_root, key);
+
+ cond_resched();
+
+ key = tester_random();
+
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH)
+ ret = delete_one_skiplist(&skiplist, key);
+ else if (benchmark == RBTREE_BENCH)
+ ret = delete_one_rbtree(&rb_root, key);
+
+ cond_resched();
+ }
+
+ cur = current_kernel_time();
+ tvsub(times + FIRST_THREAD_INDEX + thread_index, &cur, &start);
+
+ if (!atomic_dec_and_test(&threads_running)) {
+ return;
+ }
+
+ start = current_kernel_time();
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH)
+ check_post_work_skiplist();
+ else if (benchmark == RBTREE_BENCH)
+ check_post_work_rbtree();
+
+ cur = current_kernel_time();
+
+ tvsub(times + CHECK_TIME_INDEX, &cur, &start);
+
+ start = current_kernel_time();
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH)
+ delete_all_items_skiplist(1);
+ else if (benchmark == RBTREE_BENCH)
+ delete_all_items_rbtree(1);
+ cur = current_kernel_time();
+
+ tvsub(times + DEL_TIME_INDEX, &cur, &start);
+
+ start = current_kernel_time();
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH) {
+ random_insert_skiplist();
+ } else if (benchmark == RBTREE_BENCH) {
+ random_insert_rbtree();
+ }
+ cur = current_kernel_time();
+
+ tvsub(times + RANDOM_INS_INDEX, &cur, &start);
+
+ start = current_kernel_time();
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH) {
+ random_lookup_skiplist();
+ } else if (benchmark == RBTREE_BENCH) {
+ random_lookup_rbtree();
+ }
+ cur = current_kernel_time();
+
+ tvsub(times + RANDOM_LOOKUP_INDEX, &cur, &start);
+
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH) {
+ delete_all_items_skiplist(0);
+ } else if (benchmark == RBTREE_BENCH) {
+ delete_all_items_rbtree(0);
+ }
+
+ pretty_time(&times[FILL_TIME_INDEX], &sec, &ms);
+ printk("%s fill time %llu s %llu ms\n", tag, sec, ms);
+ pretty_time(&times[CHECK_TIME_INDEX], &sec, &ms);
+ printk("%s check time %llu s %llu ms\n", tag, sec, ms);
+ pretty_time(&times[DEL_TIME_INDEX], &sec, &ms);
+ printk("%s del time %llu s %llu ms \n", tag, sec, ms);
+ pretty_time(&times[RANDOM_INS_INDEX], &sec, &ms);
+ printk("%s random insert time %llu s %llu ms \n", tag, sec, ms);
+ pretty_time(&times[RANDOM_LOOKUP_INDEX], &sec, &ms);
+ printk("%s random lookup time %llu s %llu ms \n", tag, sec, ms);
+ for (i = 0; i < threads; i++) {
+ pretty_time(&times[FIRST_THREAD_INDEX + i], &sec, &ms);
+ printk("%s thread %lu time %llu s %llu ms\n", tag, i, sec, ms);
+ }
+
+ printk("worker thread pops done %d\n", atomic_read(&pops_done));
+
+ kfree(times);
+}
+
+static int skiptest_thread(void *index)
+{
+ unsigned long thread_index = (unsigned long)index;
+ complete(&startup);
+ runbench(thread_index);
+ complete(&startup);
+ return 0;
+}
+
+static int __init skiptest_init(void)
+{
+ unsigned long i;
+ init_completion(&startup);
+ slot_cache = kmem_cache_create("skiplist_slot", sizeof(struct sl_slot), 0,
+ SLAB_RECLAIM_ACCOUNT | SLAB_MEM_SPREAD |
+ SLAB_DESTROY_BY_RCU, NULL);
+
+ if (!slot_cache)
+ return -ENOMEM;
+
+ spin_lock_init(&rbtree_lock);
+
+ printk("skiptest benchmark module (%d threads) (%d items) (%d rounds)\n", threads, items, rounds);
+
+ times = kmalloc(sizeof(times[0]) * (threads + FIRST_THREAD_INDEX),
+ GFP_KERNEL);
+
+ atomic_set(&threads_running, threads);
+ for (i = 0; i < threads; i++) {
+ kthread_run(skiptest_thread, (void *)i, "skiptest_thread");
+ }
+ for (i = 0; i < threads; i++)
+ wait_for_completion(&startup);
+ return 0;
+}
+
+static void __exit skiptest_exit(void)
+{
+ int i;
+ module_exiting = 1;
+
+ for (i = 0; i < threads; i++) {
+ wait_for_completion(&startup);
+ }
+
+ synchronize_rcu();
+ kmem_cache_destroy(slot_cache);
+ printk("all skiptest threads done\n");
+ return;
+}
+
+module_init(skiptest_init);
+module_exit(skiptest_exit);
--
1.8.2
^ permalink raw reply related [flat|nested] 12+ messages in thread
* [PATCH RFC 2/2] Switch the IOMMU over to the skiplists
2013-06-16 14:56 [PATCH RFC 0/2] rcu skiplists v2 Chris Mason
2013-06-16 14:57 ` [PATCH RFC 1/2] Skiplist: rcu range index Chris Mason
@ 2013-06-16 14:58 ` Chris Mason
2013-06-26 23:02 ` [PATCH RFC 0/2] rcu skiplists v2 Mathieu Desnoyers
2 siblings, 0 replies; 12+ messages in thread
From: Chris Mason @ 2013-06-16 14:58 UTC (permalink / raw)
To: Chris Mason, Mathieu Desnoyers
Cc: Linux FS Devel, David Woodhouse, dchinner@redhat.com,
bo.li.liu@oracle.com, rp@svcs.cs.pdx.edu, Paul E. McKenney,
Lai Jiangshan, Stephen Hemminger, Alan Stern
This is for testing only and should never be used
Signed-off-by: Chris Mason <chris.mason@fusionio.com>
---
drivers/iommu/intel-iommu.c | 4 -
drivers/iommu/iova.c | 466 ++++++++++++++++++++------------------------
include/linux/iova.h | 8 +-
3 files changed, 210 insertions(+), 268 deletions(-)
diff --git a/drivers/iommu/intel-iommu.c b/drivers/iommu/intel-iommu.c
index 0099667..c7c971b 100644
--- a/drivers/iommu/intel-iommu.c
+++ b/drivers/iommu/intel-iommu.c
@@ -1403,7 +1403,6 @@ static void iommu_detach_domain(struct dmar_domain *domain,
}
static struct iova_domain reserved_iova_list;
-static struct lock_class_key reserved_rbtree_key;
static int dmar_init_reserved_ranges(void)
{
@@ -1413,9 +1412,6 @@ static int dmar_init_reserved_ranges(void)
init_iova_domain(&reserved_iova_list, DMA_32BIT_PFN);
- lockdep_set_class(&reserved_iova_list.iova_rbtree_lock,
- &reserved_rbtree_key);
-
/* IOAPIC ranges shouldn't be accessed by DMA */
iova = reserve_iova(&reserved_iova_list, IOVA_PFN(IOAPIC_RANGE_START),
IOVA_PFN(IOAPIC_RANGE_END));
diff --git a/drivers/iommu/iova.c b/drivers/iommu/iova.c
index 67da6cff..08ab0d8 100644
--- a/drivers/iommu/iova.c
+++ b/drivers/iommu/iova.c
@@ -18,182 +18,113 @@
*/
#include <linux/iova.h>
+#include <linux/random.h>
+#define IOVA_PENDING_INIT ((unsigned long) -1)
void
init_iova_domain(struct iova_domain *iovad, unsigned long pfn_32bit)
{
- spin_lock_init(&iovad->iova_rbtree_lock);
- iovad->rbroot = RB_ROOT;
- iovad->cached32_node = NULL;
+ sl_init_list(&iovad->skiplist, GFP_ATOMIC);
iovad->dma_32bit_pfn = pfn_32bit;
}
-static struct rb_node *
-__get_cached_rbnode(struct iova_domain *iovad, unsigned long *limit_pfn)
+static unsigned long
+__get_cached_addr(struct iova_domain *iovad, unsigned long limit_pfn)
{
- if ((*limit_pfn != iovad->dma_32bit_pfn) ||
- (iovad->cached32_node == NULL))
- return rb_last(&iovad->rbroot);
- else {
- struct rb_node *prev_node = rb_prev(iovad->cached32_node);
- struct iova *curr_iova =
- container_of(iovad->cached32_node, struct iova, node);
- *limit_pfn = curr_iova->pfn_lo - 1;
- return prev_node;
- }
-}
-
-static void
-__cached_rbnode_insert_update(struct iova_domain *iovad,
- unsigned long limit_pfn, struct iova *new)
-{
- if (limit_pfn != iovad->dma_32bit_pfn)
- return;
- iovad->cached32_node = &new->node;
-}
+ unsigned long guess;
-static void
-__cached_rbnode_delete_update(struct iova_domain *iovad, struct iova *free)
-{
- struct iova *cached_iova;
- struct rb_node *curr;
+ prandom_bytes(&guess, sizeof(guess));
- if (!iovad->cached32_node)
- return;
- curr = iovad->cached32_node;
- cached_iova = container_of(curr, struct iova, node);
-
- if (free->pfn_lo >= cached_iova->pfn_lo) {
- struct rb_node *node = rb_next(&free->node);
- struct iova *iova = container_of(node, struct iova, node);
-
- /* only cache if it's below 32bit pfn */
- if (node && iova->pfn_lo < iovad->dma_32bit_pfn)
- iovad->cached32_node = node;
- else
- iovad->cached32_node = NULL;
+ /* the skiplist code is fastest when we spread out the
+ * key range as much as possible. Instead of caching the
+ * last freed or allocated number, return random guesses
+ * in hopes of fanning out our locking attempts.
+ */
+ if (limit_pfn == iovad->dma_32bit_pfn) {
+ guess = guess % iovad->dma_32bit_pfn;
+ guess = max_t(unsigned long, guess, IOVA_START_PFN);
+ return guess;
+ } else {
+ guess = max_t(unsigned long, guess, IOVA_START_PFN);
+ guess = min_t(unsigned long, guess, (~0UL) >> 1);
+ return guess;
}
}
-/* Computes the padding size required, to make the
- * the start address naturally aligned on its size
- */
-static int
-iova_get_pad_size(int size, unsigned int limit_pfn)
-{
- unsigned int pad_size = 0;
- unsigned int order = ilog2(size);
-
- if (order)
- pad_size = (limit_pfn + 1) % (1 << order);
-
- return pad_size;
-}
-
static int __alloc_and_insert_iova_range(struct iova_domain *iovad,
unsigned long size, unsigned long limit_pfn,
struct iova *new, bool size_aligned)
{
- struct rb_node *prev, *curr = NULL;
unsigned long flags;
- unsigned long saved_pfn;
- unsigned int pad_size = 0;
-
- /* Walk the tree backwards */
- spin_lock_irqsave(&iovad->iova_rbtree_lock, flags);
- saved_pfn = limit_pfn;
- curr = __get_cached_rbnode(iovad, &limit_pfn);
- prev = curr;
- while (curr) {
- struct iova *curr_iova = container_of(curr, struct iova, node);
-
- if (limit_pfn < curr_iova->pfn_lo)
- goto move_left;
- else if (limit_pfn < curr_iova->pfn_hi)
- goto adjust_limit_pfn;
- else {
- if (size_aligned)
- pad_size = iova_get_pad_size(size, limit_pfn);
- if ((curr_iova->pfn_hi + size + pad_size) <= limit_pfn)
- break; /* found a free slot */
- }
-adjust_limit_pfn:
- limit_pfn = curr_iova->pfn_lo - 1;
-move_left:
- prev = curr;
- curr = rb_prev(curr);
- }
+ unsigned long align = 1;
+ unsigned long hint;
- if (!curr) {
- if (size_aligned)
- pad_size = iova_get_pad_size(size, limit_pfn);
- if ((IOVA_START_PFN + size + pad_size) > limit_pfn) {
- spin_unlock_irqrestore(&iovad->iova_rbtree_lock, flags);
- return -ENOMEM;
- }
- }
+ int ret;
- /* pfn_lo will point to size aligned address if size_aligned is set */
- new->pfn_lo = limit_pfn - (size + pad_size) + 1;
- new->pfn_hi = new->pfn_lo + size - 1;
-
- /* Insert the new_iova into domain rbtree by holding writer lock */
- /* Add new node and rebalance tree. */
- {
- struct rb_node **entry, *parent = NULL;
-
- /* If we have 'prev', it's a valid place to start the
- insertion. Otherwise, start from the root. */
- if (prev)
- entry = &prev;
- else
- entry = &iovad->rbroot.rb_node;
-
- /* Figure out where to put new node */
- while (*entry) {
- struct iova *this = container_of(*entry,
- struct iova, node);
- parent = *entry;
-
- if (new->pfn_lo < this->pfn_lo)
- entry = &((*entry)->rb_left);
- else if (new->pfn_lo > this->pfn_lo)
- entry = &((*entry)->rb_right);
- else
- BUG(); /* this should not happen */
- }
+ if (size_aligned)
+ align = size;
- /* Add new node and rebalance tree. */
- rb_link_node(&new->node, parent, entry);
- rb_insert_color(&new->node, &iovad->rbroot);
+ /*
+ * make sure that a lockless search into the tree
+ * understands we're still doing setup on this iova
+ */
+ new->pfn_lo = IOVA_PENDING_INIT;
+ new->pfn_hi = IOVA_PENDING_INIT;
+ smp_wmb();
+again:
+ local_irq_save(flags);
+ hint = __get_cached_addr(iovad, limit_pfn);
+ atomic_set(&new->slot.refs, 0);
+ ret = skiplist_insert_hole(&iovad->skiplist,
+ hint,
+ limit_pfn, size, align,
+ &new->slot, GFP_ATOMIC);
+ /*
+ * insert hole returns -EAGAIN when it found a good
+ * spot but someone raced in and stole it. If
+ * that happens pick a new hint and try again
+ */
+ if (ret == -EAGAIN) {
+ local_irq_restore(flags);
+ goto again;
}
- __cached_rbnode_insert_update(iovad, saved_pfn, new);
- spin_unlock_irqrestore(&iovad->iova_rbtree_lock, flags);
+ /* we're fully inserted, set our lo/hi */
+ new->pfn_lo = new->slot.key;
+ new->pfn_hi = new->slot.key + new->slot.size - 1;
+ smp_wmb();
+ local_irq_restore(flags);
+ if (ret)
+ return -ENOMEM;
return 0;
}
-static void
-iova_insert_rbtree(struct rb_root *root, struct iova *iova)
+static int
+iova_insert_skiplist(struct sl_list *skiplist, struct iova *iova)
{
- struct rb_node **new = &(root->rb_node), *parent = NULL;
- /* Figure out where to put new node */
- while (*new) {
- struct iova *this = container_of(*new, struct iova, node);
- parent = *new;
-
- if (iova->pfn_lo < this->pfn_lo)
- new = &((*new)->rb_left);
- else if (iova->pfn_lo > this->pfn_lo)
- new = &((*new)->rb_right);
- else
- BUG(); /* this should not happen */
+ int ret;
+ int preload_token;
+ unsigned long flags;
+
+ local_irq_save(flags);
+ preload_token = skiplist_preload(skiplist, GFP_ATOMIC);
+ if (preload_token < 0) {
+ ret = preload_token;
+ local_irq_restore(flags);
+ goto out;
}
- /* Add new node and rebalance tree. */
- rb_link_node(&iova->node, parent, new);
- rb_insert_color(&iova->node, root);
+
+ iova->slot.key = iova->pfn_lo;
+ iova->slot.size = iova->pfn_hi - iova->pfn_lo + 1;
+ atomic_set(&iova->slot.refs, 0);
+
+ ret = skiplist_insert(skiplist, &iova->slot, preload_token, NULL);
+ local_irq_restore(flags);
+ preempt_enable();
+out:
+ return ret;
}
/**
@@ -245,53 +176,24 @@ alloc_iova(struct iova_domain *iovad, unsigned long size,
*/
struct iova *find_iova(struct iova_domain *iovad, unsigned long pfn)
{
+ struct sl_slot *slot;
unsigned long flags;
- struct rb_node *node;
-
- /* Take the lock so that no other thread is manipulating the rbtree */
- spin_lock_irqsave(&iovad->iova_rbtree_lock, flags);
- node = iovad->rbroot.rb_node;
- while (node) {
- struct iova *iova = container_of(node, struct iova, node);
-
- /* If pfn falls within iova's range, return iova */
- if ((pfn >= iova->pfn_lo) && (pfn <= iova->pfn_hi)) {
- spin_unlock_irqrestore(&iovad->iova_rbtree_lock, flags);
- /* We are not holding the lock while this iova
- * is referenced by the caller as the same thread
- * which called this function also calls __free_iova()
- * and it is by design that only one thread can possibly
- * reference a particular iova and hence no conflict.
- */
- return iova;
- }
+ struct iova *iova;
- if (pfn < iova->pfn_lo)
- node = node->rb_left;
- else if (pfn > iova->pfn_lo)
- node = node->rb_right;
+ local_irq_save(flags);
+ slot = skiplist_lookup(&iovad->skiplist, pfn, 1);
+ if (!slot) {
+ local_irq_restore(flags);
+ return NULL;
}
-
- spin_unlock_irqrestore(&iovad->iova_rbtree_lock, flags);
- return NULL;
-}
-
-/**
- * __free_iova - frees the given iova
- * @iovad: iova domain in question.
- * @iova: iova in question.
- * Frees the given iova belonging to the giving domain
- */
-void
-__free_iova(struct iova_domain *iovad, struct iova *iova)
-{
- unsigned long flags;
-
- spin_lock_irqsave(&iovad->iova_rbtree_lock, flags);
- __cached_rbnode_delete_update(iovad, iova);
- rb_erase(&iova->node, &iovad->rbroot);
- spin_unlock_irqrestore(&iovad->iova_rbtree_lock, flags);
- free_iova_mem(iova);
+ iova = sl_slot_entry(slot, struct iova, slot);
+ while (iova->pfn_lo == IOVA_PENDING_INIT ||
+ iova->pfn_hi == IOVA_PENDING_INIT) {
+ cpu_relax();
+ smp_rmb();
+ }
+ local_irq_restore(flags);
+ return iova;
}
/**
@@ -304,12 +206,29 @@ __free_iova(struct iova_domain *iovad, struct iova *iova)
void
free_iova(struct iova_domain *iovad, unsigned long pfn)
{
- struct iova *iova = find_iova(iovad, pfn);
- if (iova)
- __free_iova(iovad, iova);
+ struct iova *iova;
+ struct sl_slot *slot;
+ unsigned long flags;
+
+ local_irq_save(flags);
+ slot = skiplist_delete(&iovad->skiplist, pfn, 1);
+ local_irq_restore(flags);
+
+ if (!slot)
+ return;
+ iova = sl_slot_entry(slot, struct iova, slot);
+ free_iova_mem(iova);
}
+void
+__free_iova(struct iova_domain *iovad, struct iova *iova)
+{
+ unsigned long pfn = iova->pfn_lo;
+ free_iova(iovad, pfn);
+}
+
+
/**
* put_iova_domain - destroys the iova domain
* @iovad: - iova domain in question.
@@ -317,29 +236,35 @@ free_iova(struct iova_domain *iovad, unsigned long pfn)
*/
void put_iova_domain(struct iova_domain *iovad)
{
- struct rb_node *node;
+ struct sl_list *skiplist = &iovad->skiplist;
+ struct sl_node *p;
+ struct sl_leaf *leaf;
unsigned long flags;
+ struct iova *iova;
+ struct sl_slot *slot;
+ int i;
- spin_lock_irqsave(&iovad->iova_rbtree_lock, flags);
- node = rb_first(&iovad->rbroot);
- while (node) {
- struct iova *iova = container_of(node, struct iova, node);
- rb_erase(node, &iovad->rbroot);
- free_iova_mem(iova);
- node = rb_first(&iovad->rbroot);
+ /*
+ * the skiplist code needs some helpers for iteration. For now
+ * roll our own
+ */
+ local_irq_save(flags);
+ sl_lock_node(skiplist->head);
+ p = skiplist->head->ptrs[0].next;
+ while (p) {
+ leaf = sl_entry(p);
+ for (i = 0; i < leaf->nr; i++) {
+ slot = leaf->ptrs[i];
+ iova = sl_slot_entry(slot, struct iova, slot);
+ free_iova_mem(iova);
+ }
+ p = leaf->node.ptrs[0].next;
+ skiplist_put_leaf(leaf);
}
- spin_unlock_irqrestore(&iovad->iova_rbtree_lock, flags);
-}
-
-static int
-__is_range_overlap(struct rb_node *node,
- unsigned long pfn_lo, unsigned long pfn_hi)
-{
- struct iova *iova = container_of(node, struct iova, node);
-
- if ((pfn_lo <= iova->pfn_hi) && (pfn_hi >= iova->pfn_lo))
- return 1;
- return 0;
+ /* FIXME call a helper here */
+ memset(skiplist->head->ptrs, 0, sl_node_size(SKIP_MAXLEVEL));
+ sl_unlock_node(skiplist->head);
+ local_irq_restore(flags);
}
static struct iova *
@@ -347,6 +272,7 @@ __insert_new_range(struct iova_domain *iovad,
unsigned long pfn_lo, unsigned long pfn_hi)
{
struct iova *iova;
+ int ret;
iova = alloc_iova_mem();
if (!iova)
@@ -354,18 +280,16 @@ __insert_new_range(struct iova_domain *iovad,
iova->pfn_hi = pfn_hi;
iova->pfn_lo = pfn_lo;
- iova_insert_rbtree(&iovad->rbroot, iova);
- return iova;
-}
+ ret = iova_insert_skiplist(&iovad->skiplist, iova);
-static void
-__adjust_overlap_range(struct iova *iova,
- unsigned long *pfn_lo, unsigned long *pfn_hi)
-{
- if (*pfn_lo < iova->pfn_lo)
- iova->pfn_lo = *pfn_lo;
- if (*pfn_hi > iova->pfn_hi)
- *pfn_lo = iova->pfn_hi + 1;
+ if (ret == -ENOMEM) {
+ free_iova_mem(iova);
+ return NULL;
+ } else {
+ BUG_ON(ret);
+ }
+
+ return iova;
}
/**
@@ -380,32 +304,40 @@ struct iova *
reserve_iova(struct iova_domain *iovad,
unsigned long pfn_lo, unsigned long pfn_hi)
{
- struct rb_node *node;
+ struct sl_slot *slot;
unsigned long flags;
- struct iova *iova;
- unsigned int overlap = 0;
-
- spin_lock_irqsave(&iovad->iova_rbtree_lock, flags);
- for (node = rb_first(&iovad->rbroot); node; node = rb_next(node)) {
- if (__is_range_overlap(node, pfn_lo, pfn_hi)) {
- iova = container_of(node, struct iova, node);
- __adjust_overlap_range(iova, &pfn_lo, &pfn_hi);
- if ((pfn_lo >= iova->pfn_lo) &&
- (pfn_hi <= iova->pfn_hi))
- goto finish;
- overlap = 1;
-
- } else if (overlap)
- break;
- }
-
- /* We are here either because this is the first reserver node
- * or need to insert remaining non overlap addr range
+ struct iova *iova = NULL;
+ struct iova *found = NULL;
+ unsigned long size = pfn_hi - pfn_lo + 1;
+ unsigned long min_pfn = pfn_lo;
+ unsigned long max_pfn = pfn_hi;
+
+ /*
+ * this is not locking safe. It only happens while there are no
+ * concurrent IO requests (I hope!)
*/
- iova = __insert_new_range(iovad, pfn_lo, pfn_hi);
-finish:
-
- spin_unlock_irqrestore(&iovad->iova_rbtree_lock, flags);
+ local_irq_save(flags);
+ while(1) {
+ /*
+ * really ugly, just delete anything overlapping and
+ * reinsert the new full range
+ */
+ slot = skiplist_delete(&iovad->skiplist, pfn_lo, size);
+ if (!slot)
+ break;
+
+ found = sl_slot_entry(slot, struct iova, slot);
+ while (found->pfn_lo == IOVA_PENDING_INIT ||
+ found->pfn_hi == IOVA_PENDING_INIT) {
+ cpu_relax();
+ smp_rmb();
+ }
+ min_pfn = min(found->pfn_lo, min_pfn);
+ max_pfn = max(found->pfn_hi, max_pfn);
+ free_iova_mem(found);
+ }
+ iova = __insert_new_range(iovad, min_pfn, max_pfn);
+ local_irq_restore(flags);
return iova;
}
@@ -419,17 +351,33 @@ finish:
void
copy_reserved_iova(struct iova_domain *from, struct iova_domain *to)
{
+ struct sl_node *p;
+ struct sl_leaf *leaf;
unsigned long flags;
- struct rb_node *node;
-
- spin_lock_irqsave(&from->iova_rbtree_lock, flags);
- for (node = rb_first(&from->rbroot); node; node = rb_next(node)) {
- struct iova *iova = container_of(node, struct iova, node);
- struct iova *new_iova;
- new_iova = reserve_iova(to, iova->pfn_lo, iova->pfn_hi);
- if (!new_iova)
- printk(KERN_ERR "Reserve iova range %lx@%lx failed\n",
- iova->pfn_lo, iova->pfn_lo);
+ struct iova *iova;
+ struct iova *new_iova;
+ struct sl_slot *slot;
+ int i;
+
+ /*
+ * this is not locking safe. It only happens while there are no
+ * concurrent IO requests (I hope!)
+ */
+ local_irq_save(flags);
+ sl_lock_node(from->skiplist.head);
+ p = from->skiplist.head->ptrs[0].next;
+ while (p) {
+ leaf = sl_entry(p);
+ for (i = 0; i < leaf->nr; i++) {
+ slot = leaf->ptrs[i];
+ iova = sl_slot_entry(slot, struct iova, slot);
+ new_iova = reserve_iova(to, iova->pfn_lo, iova->pfn_hi);
+ if (!new_iova)
+ printk(KERN_ERR "Reserve iova range %lx@%lx failed\n",
+ iova->pfn_lo, iova->pfn_lo);
+ }
+ p = leaf->node.ptrs[0].next;
}
- spin_unlock_irqrestore(&from->iova_rbtree_lock, flags);
+ sl_unlock_node(from->skiplist.head);
+ local_irq_restore(flags);
}
diff --git a/include/linux/iova.h b/include/linux/iova.h
index 76a0759..b8d0502 100644
--- a/include/linux/iova.h
+++ b/include/linux/iova.h
@@ -13,7 +13,7 @@
#include <linux/types.h>
#include <linux/kernel.h>
-#include <linux/rbtree.h>
+#include <linux/skiplist.h>
#include <linux/dma-mapping.h>
/* IO virtual address start page frame number */
@@ -21,16 +21,14 @@
/* iova structure */
struct iova {
- struct rb_node node;
+ struct sl_slot slot;
unsigned long pfn_hi; /* IOMMU dish out addr hi */
unsigned long pfn_lo; /* IOMMU dish out addr lo */
};
/* holds all the iova translations for a domain */
struct iova_domain {
- spinlock_t iova_rbtree_lock; /* Lock to protect update of rbtree */
- struct rb_root rbroot; /* iova domain rbtree root */
- struct rb_node *cached32_node; /* Save last alloced node */
+ struct sl_list skiplist; /* iova domain skiplist */
unsigned long dma_32bit_pfn;
};
--
1.8.2
^ permalink raw reply related [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 0/2] rcu skiplists v2
2013-06-16 14:56 [PATCH RFC 0/2] rcu skiplists v2 Chris Mason
2013-06-16 14:57 ` [PATCH RFC 1/2] Skiplist: rcu range index Chris Mason
2013-06-16 14:58 ` [PATCH RFC 2/2] Switch the IOMMU over to the skiplists Chris Mason
@ 2013-06-26 23:02 ` Mathieu Desnoyers
2013-06-26 23:51 ` Chris Mason
2 siblings, 1 reply; 12+ messages in thread
From: Mathieu Desnoyers @ 2013-06-26 23:02 UTC (permalink / raw)
To: Chris Mason
Cc: Linux FS Devel, David Woodhouse, dchinner@redhat.com,
bo.li.liu@oracle.com, rp@svcs.cs.pdx.edu, Paul E. McKenney,
Lai Jiangshan, Stephen Hemminger, Alan Stern
Hi Chris,
* Chris Mason (clmason@fusionio.com) wrote:
> Hi everyone,
>
> This is another iteration of my skiplists patch, with some bugs fixed
> and a few missing parts of the API done. The biggest change is in the
> insertion code, where I now link from the bottom up once I find the
> proper insertion point. This makes it possible to write custom
> insertion routines that allow duplicate keys.
>
> It also means insertion doesn't have to lock and track all the nodes
> from the top as it searches. In the likely event that we're able to
> insert into a free spot in an existing leaf, it only needs to take one
> lock.
nice, I did pretty much the same for RCU Judy arrays. I made insertion
and deletion take only the localized locks required, nesting from bottom
to top.
> The IOMMU part of the patch is updated slightly, but still not using all
> the new bits in the API. This is mostly because the IOMMU part is going
> to change later on and I'm really only using it for testing now.
>
> For benchmarking, the IOMMU numbers are slightly higher than last time,
> but not more than 5% or so. This is because the bottleneck is still
> skiplist_insert_hole(), which I haven't really changed in this round.
>
> Most of my benchmarking now is with the skiplist_test module, which has
> expanded to a few new cases. For random operations, my performance is
> slightly slower than rbtree when single threaded.
>
> Random lookups on 10 million items: (inserts are similar)
>
> rbtree random lookup time 4 s 327 ms
> skiplist-rcu random lookup time 5 s 175 ms
These benchmarks are only considering averages.
I'm worried that the skiplist approach has chances to create long chains
(probabilistically non-zero). Clearly, the lack of bound is something
the RT guys might frown upon. I'd be curious to see how long the worst
chains encountered can be on, say, 1000 machines stress-testing this for
a week.
FWIW, my benchmark of RCU Judy array with ranges in similar conditions:
- Using 32-bit key length
- I first populated 10M ranges of len = 1, sequentially.
- Then, I run a reader threads for 10s, which perform random lookups
(all successful) in keys from 0 to 10M.
The average lookup time is (on a per-core basis):
- single-thread: 522 ns / lookup ( 19122393 lookups in 10s)
- 2 threads: 553 ns / lookup ( ~36134994 lookups in 10s on 2 cores)
- 8 threads: 696 ns / lookup (~114854104 lookups in 10s on 8 cores)
We get an efficiency of 0.79 when going from 2 to 8 threads (efficiency
of 1 being linear scalability).
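(Computed as the throughput ratio divided by the core ratio:
(114854104 / 36134994) / (8 / 2) ~= 3.18 / 4 ~= 0.79.)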
Performed on this HW:
8-core
model name : Intel(R) Xeon(R) CPU E5405 @ 2.00GHz
What I gather from your skiplist-rcu numbers, it seems to take
517ns/lookup (avg), and for rbtree: 432ns/lookup. I'd be curious to
test all these alternatives in the same test bench.
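(The 517 ns / 432 ns figures are just the quoted totals divided by the
10M random lookups: 5.175 s / 10M ~= 517 ns, 4.327 s / 10M ~= 432 ns.)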
>
> The new API makes it easier to do sequential operations. Here is a walk
> through the 10 million item list:
>
> skiplist-rcu check time 0 s 79 ms
> rbtree check time 0 s 295 ms
>
> And sequential insert:
>
> kiplist-rcu fill time 1 s 599 ms
> rbtree fill time 1 s 875 ms
>
> The benchmark does random lookup/deletion/insertion rounds. Most of the
> operations done are either deletion or insertion, so I'm not skewing the
> results with the easy rcu lookup operation.
We could do helpers for sequential traversal of ranges, they are not
implemented in RCU Judy array yet though.
>
> Single threaded rbtree does consistently win across all sizes of lists:
>
> skiplist-rcu thread 0 time 0 s 287 ms
> rbtree thread 0 time 0 s 215 ms
Are the 287ms and 215ms numbers you get for skiplist-rcu and rbtree the
total for 10M add/delete, or is it per operation, or is it a different
number of operations altogether? Also, how many items are present in
the data structures on average?
> But once we go up to two threads, skiplists win:
>
> skiplist-rcu thread 1 time 0 s 299 ms
> rbtree thread 1 time 0 s 499 ms
>
> At 8 threads, the skiplists don't get linear scaling, but it's really
> pretty good. Since I'm doing random operations on a wide key space,
> the locking skiplist variant is also fast:
>
> skiplist-rcu thread 7 time 0 s 379 ms
> skiplist-locking thread 7 time 0 s 425 ms
> rbtree thread 7 time 3 s 711 ms
Hopefully my setup is not too different from yours. Here is my
benchmark of the RCU Judy Array insert operations. Please note that I
have not focused on optimizing the update side; I've made design choices
aimed towards making lookups as fast as possible, sometimes at the
expense of update overhead.
I used a keyspace of 400M, just so the number of inserts that end up
being a simple rcu lookup is insignificant. I'm still using a 32-bit
key length. I'm adding ranges of length 1.
Here is the average time taken per insertion (on a per-core basis):
- single-thread: 15000 ns/insert ( 645474 inserts in 10s)
- 2 threads: 17000 ns/insert (~1188232 inserts in 10s on 2 cores)
- 8 threads: 25000 ns/insert (~3161024 inserts in 10s on 8 cores)
We get an efficiency of 0.66 when going from 2 to 8 threads (efficiency
of 1 being linear scalability).
>
> My test machine is 8 cores, so at 16 threads we're into HT:
>
> skiplist-rcu thread 15 time 0 s 583 ms
> skiplist-locking thread 15 time 0 s 559 ms
> rbtree thread 15 time 7 s 423 ms
Skipping this test, since my test machine does not have hyperthreading.
>
> It's not all sunshine though. If I shrink the keyspace down to 1000
> keys or less, there is more contention on the node locks and we're tied
> (or slightly worse) than rbtree. In that kind of workload it makes
> sense to add a big fat lock around the skiplist, or just stick with
> rbtrees.
I'd expect a similar behavior with RCU judy arrays with respect to small
keyspace (a single lock around updates might perform better than locks
distributed within the data). However, even with a small keyspace (and
few nodes), I expect that RCU Judy Array and skiplist-rcu will win over
non-RCU rbtree for lookup speed when we have concurrent updates and
lookups from many cores.
Feedback is welcome,
Thanks!
Mathieu
>
> The skiplists do have locking and rcu variants of lookup operations.
> The locking ones are built on top of the rcu ones, and you can use them
> both at the same time.
>
> Patches follow.
>
> -chris
>
--
Mathieu Desnoyers
EfficiOS Inc.
http://www.efficios.com
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 0/2] rcu skiplists v2
2013-06-26 23:02 ` [PATCH RFC 0/2] rcu skiplists v2 Mathieu Desnoyers
@ 2013-06-26 23:51 ` Chris Mason
2013-06-27 2:29 ` Mathieu Desnoyers
0 siblings, 1 reply; 12+ messages in thread
From: Chris Mason @ 2013-06-26 23:51 UTC (permalink / raw)
To: Mathieu Desnoyers
Cc: Linux FS Devel, David Woodhouse, dchinner@redhat.com,
bo.li.liu@oracle.com, rp@svcs.cs.pdx.edu, Paul E. McKenney,
Lai Jiangshan, Stephen Hemminger, Alan Stern
Quoting Mathieu Desnoyers (2013-06-26 19:02:18)
> > The IOMMU part of the patch is updated slightly, but still not using all
> > the new bits in the API. This is mostly because the IOMMU part is going
> > to change later on and I'm really only using it for testing now.
> >
> > For benchmarking, the IOMMU numbers are slightly higher than last time,
> > but not more than 5% or so. This is because the bottleneck is still
> > skiplist_insert_hole(), which I haven't really changed in this round.
> >
> > Most of my benchmarking now is with the skiplist_test module, which has
> > expanded to a few new cases. For random operations, my performance is
> > slightly slower than rbtree when single threaded.
> >
> > Random lookups on 10 million items: (inserts are similar)
> >
> > rbtree random lookup time 4 s 327 ms
> > skiplist-rcu random lookup time 5 s 175 ms
>
> These benchmarks are only considering averages.
>
> I'm worried that the skiplist approach has chances to create long chains
> (probabilistically non-zero). Clearly, the lack of bound is something
> the RT guys might frown upon. I'd be curious to see how long the worse
> chains encountered can be on, say, 1000 machines stress-testing this for
> a week.
Correct, the probability part may run into problems for RT. I haven't
put in instrumentation for worst case. I think my retries (goto again)
for concurrent update are a bigger RT problem, but they can be cut down
significantly.
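Something like the sketch below is probably enough for a first worst-case
number (debug only, illustrative: it assumes a quiescent list and reuses
the sl_node fields from patch 1; the longest run of nodes that are only
linked at level 0 is a rough proxy for the worst chain a lookup walks):

static unsigned long skiplist_longest_bottom_run(struct sl_list *list)
{
	struct sl_node *p = list->head->ptrs[0].next;
	unsigned long run = 0;
	unsigned long longest = 0;

	while (p) {
		if (p->level > 0) {
			/* also linked at a higher level, the run ends here */
			longest = max(longest, run);
			run = 0;
		} else {
			run++;
		}
		p = p->ptrs[0].next;
	}
	return max(longest, run);
}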
>
> FWIW, my benchmark of RCU Judy array with ranges in similar conditions:
>
> - Using 32-bit key length
> - I first populated 10M ranges of len = 1, sequentially.
> - Then, I run a reader threads for 10s, which perform random lookups
> (all successful) in keys from 0 to 10M.
Similarly, I had 64-bit keys and the lookups were totally random (not all
successful). I doubt it matters too much for these numbers.
Also, my benchmarks were not just inserting keys but keys pointing to
things. So a lookup walked the tree and found an object and then
returned the object. radix can just return a key/value without
dereferencing the value, but that wasn't the case in my runs.
> > Single threaded rbtree does consistently win across all sizes of lists:
> >
> > skiplist-rcu thread 0 time 0 s 287 ms
> > rbtree thread 0 time 0 s 215 ms
>
> Are the 287ms and 215ms numbers you get for skiplist-rcu and rbtree the
> total for 10M add/delete, or is it per operation, or is it a different
> number of operations altogether ? Also, how many items are present in
> the data structures in average ?
The 287 and 215ms were total run time for the mixed bag of operations.
It had lookups, single delete then re-insert and bulk (128 keys) delete
then re-insert. There were always the same number of items present
(10M).
It looks like you have some strong numbers here, especially considering
that my test was all in kernel vs userland rcu.
-chris
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 0/2] rcu skiplists v2
2013-06-26 23:51 ` Chris Mason
@ 2013-06-27 2:29 ` Mathieu Desnoyers
2013-06-27 4:46 ` Chris Mason
2013-06-27 5:19 ` Dave Chinner
0 siblings, 2 replies; 12+ messages in thread
From: Mathieu Desnoyers @ 2013-06-27 2:29 UTC (permalink / raw)
To: Chris Mason
Cc: Linux FS Devel, David Woodhouse, dchinner@redhat.com,
bo.li.liu@oracle.com, rp@svcs.cs.pdx.edu, Paul E. McKenney,
Lai Jiangshan, Stephen Hemminger, Alan Stern
* Chris Mason (clmason@fusionio.com) wrote:
> Quoting Mathieu Desnoyers (2013-06-26 19:02:18)
> > > The IOMMU part of the patch is updated slightly, but still not using all
> > > the new bits in the API. This is mostly because the IOMMU part is going
> > > to change later on and I'm really only using it for testing now.
> > >
> > > For benchmarking, the IOMMU numbers are slightly higher than last time,
> > > but not more than 5% or so. This is because the bottleneck is still
> > > skiplist_insert_hole(), which I haven't really changed in this round.
> > >
> > > Most of my benchmarking now is with the skiplist_test module, which has
> > > expanded to a few new cases. For random operations, my performance is
> > > slightly slower than rbtree when single threaded.
> > >
> > > Random lookups on 10 million items: (inserts are similar)
> > >
> > > rbtree random lookup time 4 s 327 ms
> > > skiplist-rcu random lookup time 5 s 175 ms
> >
> > These benchmarks are only considering averages.
> >
> > I'm worried that the skiplist approach has chances to create long chains
> > (probabilistically non-zero). Clearly, the lack of bound is something
> > the RT guys might frown upon. I'd be curious to see how long the worst
> > chains encountered can be on, say, 1000 machines stress-testing this for
> > a week.
>
> Correct, the probability part may run into problems for RT. I haven't
> put in instrumentation for worst case. I think my retries (goto again)
> for concurrent update are a bigger RT problem, but they can be cut down
> significantly.
My RCU Judy array implementation uses a similar retry trick: e.g., for a
delete operation, I first do an RCU lookup of the node I need to delete,
and then take the mutexes needed to protect each internal node that
needs to be modified. Once each mutex is taken, I validate that the
nodes that were just locked did not change between lookup and mutex
acquisition. If any of them has been updated concurrently, every lock is
unlocked and we retry the entire operation. Indeed, this would be bad
for RT. One approach RT could take is to just grab a global mutex around
the insert/delete operations on the data structure. It would improve RT
behavior at the expense of update scalability.
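A rough sketch of that lookup -> lock -> validate -> retry pattern is
below; the judy_* helpers, node fields, and locking granularity are
made-up placeholders for illustration, not the actual URCU Judy API:

#include <linux/mutex.h>
#include <linux/rcupdate.h>

static int range_delete(struct judy_root *root, u64 key)
{
	struct judy_node *node;

	while (1) {
		rcu_read_lock();
		node = judy_lookup(root, key);	/* RCU lookup, no locks held */
		if (!node) {
			rcu_read_unlock();
			return -ENOENT;
		}
		mutex_lock(&node->lock);	/* lock the node to modify */
		if (node->removed) {
			/* concurrent update won the race: drop locks, retry */
			mutex_unlock(&node->lock);
			rcu_read_unlock();
			continue;
		}
		judy_unlink(root, node);	/* safe under the lock */
		node->removed = 1;
		mutex_unlock(&node->lock);
		rcu_read_unlock();
		call_rcu(&node->rcu, judy_free_rcu);
		return 0;
	}
}

The validation step is what keeps the RCU lookups cheap: readers never
take the locks, and writers pay for the occasional retry.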
The problem with long chains in a skip list is that they hurt RCU
lookups too, and I don't see any workaround for this, given that it's
built into the skip list design.
>
> >
> > FWIW, my benchmark of RCU Judy array with ranges in similar conditions:
> >
> > - Using 32-bit key length
> > - I first populated 10M ranges of len = 1, sequentially.
> > - Then, I ran reader threads for 10s, which performed random lookups
> > (all successful) in keys from 0 to 10M.
>
> Similar, I had 64 bit keys and the lookups were totally random (not all
> successful). I doubt it matters too much for these numbers.
I'd have to try with 64-bit keys, since it matters for RCU Judy. It
means a successful lookup will need to read twice as many cache lines as
for 32-bit keys. For my range implementation (on top of Judy), every
lookup ends up succeeding, because it either finds an "empty" range or a
populated range, so having match or non-match does not matter much for
range lookups.
>
> Also, my benchmarks were not just inserting keys but keys pointing to
> things. So a lookup walked the tree and found an object and then
> returned the object. radix can just return a key/value without
> dereferencing the value, but that wasn't the case in my runs.
In the specific test I ran, I'm looking up the "range" object, which is
the dereferenced "value" pointer in terms of Judy lookup. My Judy array
implementation represents items as a linked list of structures matching
a given key. This linked list is embedded within the structures,
similarly to the linux/list.h API. Then, if the lookup succeeds, I take
a mutex on the range, and check if it has been concurrently removed.
The only thing I'm not currently doing is to dereference the "private
data" pointer I have in the range. Eventually, I could even embed the
range structure within another structure to use inheritance to save a
pointer indirection. But I wanted to get the basic things to work first
before going too far into optimisation land.
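As a rough sketch of that embedding idea (type and field names here are
hypothetical, written kernel-style for consistency with the rest of the
thread, not the actual implementation), the range would live inside the
user's structure and a lookup would recover the outer object with a
container_of()-style cast, saving the extra pointer dereference:

#include <linux/kernel.h>
#include <linux/types.h>

struct my_extent {
	struct judy_range range;	/* embedded range, hypothetical type */
	u64 block;			/* user payload */
};

/* Recover the user object from the range returned by a lookup. */
static inline struct my_extent *to_extent(struct judy_range *r)
{
	return container_of(r, struct my_extent, range);
}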
>
> > > Single threaded rbtree does consistently win across all sizes of lists:
> > >
> > > skiplist-rcu thread 0 time 0 s 287 ms
> > > rbtree thread 0 time 0 s 215 ms
> >
> > Are the 287ms and 215ms numbers you get for skiplist-rcu and rbtree the
> > total for 10M add/delete, or is it per operation, or is it a different
> > number of operations altogether ? Also, how many items are present in
> > the data structures in average ?
>
> The 287 and 215ms were total run time for the mixed bag of operations.
> It had lookups, single delete then re-insert and bulk (128 keys) delete
> then re-insert. There were always the same number of items present
> (10M).
When I find time, I'll try to have a closer look at this sequence of
operations, and see if I can reproduce it.
>
> It looks like you have some strong numbers here, especially considering
> that my test was all in kernel vs userland rcu.
I'm glad to see that Userspace RCU seems to closely match the kernel RCU
performance (well, at least up to 8 cores) ;-) I'm pretty sure we'll
eventually run into the same scalability issues that led to the design
of the Linux kernel Tree RCU implementation, though.
Thanks,
Mathieu
--
Mathieu Desnoyers
EfficiOS Inc.
http://www.efficios.com
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 0/2] rcu skiplists v2
2013-06-27 2:29 ` Mathieu Desnoyers
@ 2013-06-27 4:46 ` Chris Mason
2013-06-27 11:42 ` Mathieu Desnoyers
2013-06-27 5:19 ` Dave Chinner
1 sibling, 1 reply; 12+ messages in thread
From: Chris Mason @ 2013-06-27 4:46 UTC (permalink / raw)
To: Mathieu Desnoyers
Cc: Linux FS Devel, David Woodhouse, dchinner@redhat.com,
bo.li.liu@oracle.com, rp@svcs.cs.pdx.edu, Paul E. McKenney,
Lai Jiangshan, Stephen Hemminger, Alan Stern
Quoting Mathieu Desnoyers (2013-06-26 22:29:36)
>
> >
> > >
> > > FWIW, my benchmark of RCU Judy array with ranges in similar conditions:
> > >
> > > - Using 32-bit key length
> > > - I first populated 10M ranges of len = 1, sequentially.
> > > - Then, I ran reader threads for 10s, which performed random lookups
> > > (all successful) in keys from 0 to 10M.
> >
> > Similar, I had 64 bit keys and the lookups were totally random (not all
> > successful). I doubt it matters too much for these numbers.
>
> I'd have to try with 64-bit keys, since it matters for RCU Judy. It
> means a successful lookup will need to read twice as many cache lines as
> for 32-bit keys. For my range implementation (on top of Judy), every
> lookup ends up succeeding, because it either finds an "empty" range or a
> populated range, so having match or non-match does not matter much for
> range lookups.
>
> >
> > Also, my benchmarks were not just inserting keys but keys pointing to
> > things. So a lookup walked the tree and found an object and then
> > returned the object. radix can just return a key/value without
> > dereferencing the value, but that wasn't the case in my runs.
>
> In the specific test I ran, I'm looking up the "range" object, which is
> the dereferenced "value" pointer in terms of Judy lookup. My Judy array
> implementation represents items as a linked list of structures matching
> a given key. This linked list is embedded within the structures,
> similarly to the linux/list.h API. Then, if the lookup succeeds, I take
> a mutex on the range, and check if it has been concurrently removed.
>
> The only thing I'm not currently doing is to dereference the "private
> data" pointer I have in the range. Eventually, I could even embed the
> range structure within another structure to use inheritance to save a
> pointer indirection. But I wanted to get the basic things to work first
> before going too far into optimisation land.
Hmmm, in my tests for both rbtree and skiplists, the private data
object also has the range length. So both are doing at least one
dereference when they check against the length. It sounds like this
corresponds to the Judy value pointer?
>
> >
> > > > Single threaded rbtree does consistently win across all sizes of lists:
> > > >
> > > > skiplist-rcu thread 0 time 0 s 287 ms
> > > > rbtree thread 0 time 0 s 215 ms
> > >
> > > Are the 287ms and 215ms numbers you get for skiplist-rcu and rbtree the
> > > total for 10M add/delete, or is it per operation, or is it a different
> > > number of operations altogether ? Also, how many items are present in
> > > the data structures in average ?
> >
> > The 287 and 215ms were total run time for the mixed bag of operations.
> > It had lookups, single delete then re-insert and bulk (128 keys) delete
> > then re-insert. There were always the same number of items present
> > (10M).
>
> When I find time, I'll try to have a closer look at this sequence of
> operations, and see if I can reproduce it.
It's fine to benchmark against pure lookup/insertion/deletion. Or said
a different way, my assortment of operations isn't really matching any
workload, so there's no reason to emulate it ;)
Batch lookup and deletion are probably important though.
-chris
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 0/2] rcu skiplists v2
2013-06-27 2:29 ` Mathieu Desnoyers
2013-06-27 4:46 ` Chris Mason
@ 2013-06-27 5:19 ` Dave Chinner
2013-06-27 10:55 ` [BULK] " Chris Mason
2013-06-27 12:12 ` Mathieu Desnoyers
1 sibling, 2 replies; 12+ messages in thread
From: Dave Chinner @ 2013-06-27 5:19 UTC (permalink / raw)
To: Mathieu Desnoyers
Cc: Chris Mason, Linux FS Devel, David Woodhouse, dchinner@redhat.com,
bo.li.liu@oracle.com, rp@svcs.cs.pdx.edu, Paul E. McKenney,
Lai Jiangshan, Stephen Hemminger, Alan Stern
On Wed, Jun 26, 2013 at 10:29:36PM -0400, Mathieu Desnoyers wrote:
> * Chris Mason (clmason@fusionio.com) wrote:
> > Quoting Mathieu Desnoyers (2013-06-26 19:02:18)
> > > FWIW, my benchmark of RCU Judy array with ranges in similar conditions:
> > >
> > > - Using 32-bit key length
> > > - I first populated 10M ranges of len = 1, sequentially.
> > > - Then, I ran reader threads for 10s, which performed random lookups
> > > (all successful) in keys from 0 to 10M.
> >
> > Similar, I had 64 bit keys and the lookups were totally random (not all
> > successful). I doubt it matters too much for these numbers.
>
> I'd have to try with 64-bit keys, since it matters for RCU Judy. It
> means a successful lookup will need to read twice as many cache lines as
> for 32-bit keys. For my range implementation (on top of Judy), every
> lookup ends up succeeding, because it either finds an "empty" range or a
> populated range, so having match or non-match does not matter much for
> range lookups.
Yeah, I only care about performance with 64 bit keys, sparse
keyspace population and random extent lengths. Random lookup
performance is more important than insert and delete, though I do
have cases where bulk sequential insert and removal are important,
too.
> > Also, my benchmarks were not just inserting keys but keys pointing to
> > things. So a lookup walked the tree and found an object and then
> > returned the object. radix can just return a key/value without
> > dereferencing the value, but that wasn't the case in my runs.
>
> In the specific test I ran, I'm looking up the "range" object, which is
> the dereferenced "value" pointer in terms of Judy lookup. My Judy array
> implementation represents items as a linked list of structures matching
> a given key. This linked list is embedded within the structures,
> similarly to the linux/list.h API. Then, if the lookup succeeds, I take
> a mutex on the range, and check if it has been concurrently removed.
Does that mean that each "extent" that is indexed has a list head
embedded in it? That blows the size of the index out when all I
might want to store in the tree is a 64 bit value for a block
mapping...
FWIW, when a bunch of scalability work was done on xfs_repair years
ago, judy arrays were benchmarked for storing extent lists that
tracked free/used space. We ended up using a btree, because while it
was slower than the original bitmap code, it was actually faster
than the highly optimised judy array library and at the scale we
needed there was no memory usage advantage to using a judy array,
either...
So I'm really starting to wonder if it'd be simpler for me just to
resurrect the old RCU friendly btree code Peter Z wrote years ago
(http://programming.kicks-ass.net/kernel-patches/vma_lookup/) and
customise it for the couple of uses I have in XFS....
Cheers,
Dave.
--
Dave Chinner
david@fromorbit.com
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [BULK] Re: [PATCH RFC 0/2] rcu skiplists v2
2013-06-27 5:19 ` Dave Chinner
@ 2013-06-27 10:55 ` Chris Mason
2013-06-27 12:12 ` Mathieu Desnoyers
1 sibling, 0 replies; 12+ messages in thread
From: Chris Mason @ 2013-06-27 10:55 UTC (permalink / raw)
To: Dave Chinner, Mathieu Desnoyers
Cc: Linux FS Devel, David Woodhouse, dchinner@redhat.com,
bo.li.liu@oracle.com, rp@svcs.cs.pdx.edu, Paul E. McKenney,
Lai Jiangshan, Stephen Hemminger, Alan Stern
Quoting Dave Chinner (2013-06-27 01:19:18)
> On Wed, Jun 26, 2013 at 10:29:36PM -0400, Mathieu Desnoyers wrote:
> > > Also, my benchmarks were not just inserting keys but keys pointing to
> > > things. So a lookup walked the tree and found an object and then
> > > returned the object. radix can just return a key/value without
> > > dereferencing the value, but that wasn't the case in my runs.
> >
> > In the specific test I ran, I'm looking up the "range" object, which is
> > the dereferenced "value" pointer in terms of Judy lookup. My Judy array
> > implementation represents items as a linked list of structures matching
> > a given key. This linked list is embedded within the structures,
> > similarly to the linux/list.h API. Then, if the lookup succeeds, I take
> > a mutex on the range, and check if it has been concurrently removed.
>
> Does that mean that each "extent" that is indexed has a list head
> embedded in it? That blows the size of the index out when all I
> might want to store in the tree is a 64 bit value for a block
> mapping...
For the skiplists, it might make sense to take the optimizations a
little further and put the start/len/value triplet directly in the leaf.
Right now I push the len/value part into the user object. For btrfs
this is always bigger than a single block mapping (it carries flags and
other metadata).
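Something along these lines, purely as a sketch of the idea (the struct
and field names are illustrative, not the current skiplist API):

/* Hypothetical leaf slot if the whole triplet lived in the skiplist leaf. */
struct sl_leaf_slot {
	unsigned long key;	/* start of the range */
	unsigned long size;	/* length of the range */
	void *value;		/* user object, only dereferenced on a hit */
};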
>
> FWIW, when a bunch of scalability work was done on xfs_repair years
> ago, judy arrays were benchmarked for storing extent lists that
> tracked free/used space. We ended up using a btree, because while it
> was slower than the original bitmap code, it was actually faster
> than the highly optimised judy array library and at the scale we
> needed there was no memory usage advantage to using a judy array,
> either...
>
> So I'm really starting to wonder if it'd be simpler for me just to
> resurrect the old RCU friendly btree code Peter Z wrote years ago
> (http://programming.kicks-ass.net/kernel-patches/vma_lookup/) and
> customise it for the couple of uses I have in XFS....
I did start with his rcu btree, but the problem for me was concurrent
updates.
For xfs, the skiplists need two things:
- i_size_read()-style use of u64 for keys instead of unsigned long (see
  the sketch below).
- A helper to allow duplicate keys.
Both are pretty easy, but I'm trying things out in btrfs first to make
sure I've worked out any problems.
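For the first item, a minimal sketch of what an i_size_read()-style
accessor could look like on 32-bit machines, using a seqcount so readers
never see a torn 64-bit key (names and layout are assumptions, not part
of the posted patch; on 64-bit a plain load would do):

#include <linux/seqlock.h>
#include <linux/types.h>

struct sl_key64 {
	seqcount_t seq;		/* guards the 64-bit value on 32-bit */
	u64 val;
};

static inline u64 sl_key_read(struct sl_key64 *k)
{
	unsigned int seq;
	u64 v;

	do {
		seq = read_seqcount_begin(&k->seq);
		v = k->val;
	} while (read_seqcount_retry(&k->seq, seq));
	return v;
}

/* Callers must serialize writers, e.g. under the leaf lock. */
static inline void sl_key_write(struct sl_key64 *k, u64 val)
{
	write_seqcount_begin(&k->seq);
	k->val = val;
	write_seqcount_end(&k->seq);
}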
-chris
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 0/2] rcu skiplists v2
2013-06-27 4:46 ` Chris Mason
@ 2013-06-27 11:42 ` Mathieu Desnoyers
0 siblings, 0 replies; 12+ messages in thread
From: Mathieu Desnoyers @ 2013-06-27 11:42 UTC (permalink / raw)
To: Chris Mason
Cc: Linux FS Devel, David Woodhouse, dchinner@redhat.com,
bo.li.liu@oracle.com, rp@svcs.cs.pdx.edu, Paul E. McKenney,
Lai Jiangshan, Stephen Hemminger, Alan Stern
* Chris Mason (clmason@fusionio.com) wrote:
> Quoting Mathieu Desnoyers (2013-06-26 22:29:36)
> >
> > >
> > > >
> > > > FWIW, my benchmark of RCU Judy array with ranges in similar conditions:
> > > >
> > > > - Using 32-bit key length
> > > > - I first populated 10M ranges of len = 1, sequentially.
> > > > - Then, I ran reader threads for 10s, which performed random lookups
> > > > (all successful) in keys from 0 to 10M.
> > >
> > > Similar, I had 64 bit keys and the lookups were totally random (not all
> > > successful). I doubt it matters too much for these numbers.
> >
> > I'd have to try with 64-bit keys, since it matters for RCU Judy. It
> > means a successful lookup will need to read twice as many cache lines as
> > for 32-bit keys. For my range implementation (on top of Judy), every
> > lookup ends up succeeding, because it either finds an "empty" range or a
> > populated range, so having match or non-match does not matter much for
> > range lookups.
> >
> > >
> > > Also, my benchmarks were not just inserting keys but keys pointing to
> > > things. So a lookup walked the tree and found an object and then
> > > returned the object. radix can just return a key/value without
> > > dereferencing the value, but that wasn't the case in my runs.
> >
> > In the specific test I ran, I'm looking up the "range" object, which is
> > the dereferenced "value" pointer in terms of Judy lookup. My Judy array
> > implementation represents items as a linked list of structures matching
> > a given key. This linked list is embedded within the structures,
> > similarly to the linux/list.h API. Then, if the lookup succeeds, I take
> > a mutex on the range, and check if it has been concurrently removed.
> >
> > The only thing I'm not currently doing is to dereference the "private
> > data" pointer I have in the range. Eventually, I could even embed the
> > range structure within another structure to use inheritance to save a
> > pointer indirection. But I wanted to get the basic things to work first
> > before going too far into optimisation land.
>
> Hmmm, in my tests for both rbtree and skiplists, the private data
> object also has the range length. So both are doing at least one
> dereference when they check against the length. It sounds like this
> corresponds to the Judy value pointer?
Yes, this is correct.
>
> >
> > >
> > > > > Single threaded rbtree does consistently win across all sizes of lists:
> > > > >
> > > > > skiplist-rcu thread 0 time 0 s 287 ms
> > > > > rbtree thread 0 time 0 s 215 ms
> > > >
> > > > Are the 287ms and 215ms numbers you get for skiplist-rcu and rbtree the
> > > > total for 10M add/delete, or is it per operation, or is it a different
> > > > number of operations altogether ? Also, how many items are present in
> > > > the data structures in average ?
> > >
> > > The 287 and 215ms were total run time for the mixed bag of operations.
> > > It had lookups, single delete then re-insert and bulk (128 keys) delete
> > > then re-insert. There were always the same number of items present
> > > (10M).
> >
> > When I find time, I'll try to have a closer look at this sequence of
> > operations, and see if I can reproduce it.
>
> It's fine to benchmark against pure lookup/insertion/deletion. Or said
> a different way, my assortment of operations isn't really matching any
> workload, so there's no reason to emulate it ;)
>
> Batch lookup and deletion are probably important though.
Noted, thanks!
Mathieu
>
> -chris
--
Mathieu Desnoyers
EfficiOS Inc.
http://www.efficios.com
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 0/2] rcu skiplists v2
2013-06-27 5:19 ` Dave Chinner
2013-06-27 10:55 ` [BULK] " Chris Mason
@ 2013-06-27 12:12 ` Mathieu Desnoyers
1 sibling, 0 replies; 12+ messages in thread
From: Mathieu Desnoyers @ 2013-06-27 12:12 UTC (permalink / raw)
To: Dave Chinner
Cc: Chris Mason, Linux FS Devel, David Woodhouse, dchinner@redhat.com,
bo.li.liu@oracle.com, rp@svcs.cs.pdx.edu, Paul E. McKenney,
Lai Jiangshan, Stephen Hemminger, Alan Stern
* Dave Chinner (david@fromorbit.com) wrote:
> On Wed, Jun 26, 2013 at 10:29:36PM -0400, Mathieu Desnoyers wrote:
> > * Chris Mason (clmason@fusionio.com) wrote:
> > > Quoting Mathieu Desnoyers (2013-06-26 19:02:18)
> > > > FWIW, my benchmark of RCU Judy array with ranges in similar conditions:
> > > >
> > > > - Using 32-bit key length
> > > > - I first populated 10M ranges of len = 1, sequentially.
> > > > - Then, I ran reader threads for 10s, which performed random lookups
> > > > (all successful) in keys from 0 to 10M.
> > >
> > > Similar, I had 64 bit keys and the lookups were totally random (not all
> > > successful). I doubt it matters too much for these numbers.
> >
> > I'd have to try with 64-bit keys, since it matters for RCU Judy. It
> > means a successful lookup will need to read twice as many cache lines as
> > for 32-bit keys. For my range implementation (on top of Judy), every
> > lookup ends up succeeding, because it either finds an "empty" range or a
> > populated range, so having match or non-match does not matter much for
> > range lookups.
>
> Yeah, I only care about performance with 64 bit keys, sparse
> keyspace population and random extent lengths. Random lookup
> performance is more important than insert and delete, though I do
> have cases where bulk sequential insert and removal are important,
> too.
One thing I noticed about 64-bit keys in Judy though: let's say we only
use part of the key space (e.g. the lower 32 bits). Even with a 64-bit
key lookup, we always end up touching the same cache lines for the
top-level nodes, and therefore the number of cache lines we need to
bring in from memory will be quite close to that of a Judy array with
32-bit keys. I'll have to confirm this with benchmarks though.
>
> > > Also, my benchmarks were not just inserting keys but keys pointing to
> > > things. So a lookup walked the tree and found an object and then
> > > returned the object. radix can just return a key/value without
> > > dereferencing the value, but that wasn't the case in my runs.
> >
> > In the specific test I ran, I'm looking up the "range" object, which is
> > the dereferenced "value" pointer in terms of Judy lookup. My Judy array
> > implementation represents items as a linked list of structures matching
> > a given key. This linked list is embedded within the structures,
> > similarly to the linux/list.h API. Then, if the lookup succeeds, I take
> > a mutex on the range, and check if it has been concurrently removed.
>
> Does that mean that each "extent" that is indexed has a list head
> embedded in it? That blows the size of the index out when all I
> might want to store in the tree is a 64 bit value for a block
> mapping...
My implementation currently has chaining of duplicates for genericity.
I'm keeping it simple (no special cases) on purpose until we find out if
the Judy approach is interesting at all. We could quite easily create a
variant of the RCU Judy array augmented with range support that uses
this as its node:
/* The 64-bit start of range value is implicit within the Judy Array */
struct {
	uint64_t end;		/* end of range */
	spinlock_t lock;	/* lock updates on range */
	struct rcu_head head;	/* if call_rcu() is required for reclaim */
	unsigned int flags:2;	/* range is free, allocated, or removed */
};
Depending on how much update scalability and RCU reclaim batching you
are willing to give up, the lock and rcu_head could be removed. This
ends up being a trade-off between update scalability and memory
footprint. So if you go all the way for low memory footprint, with a
single lock covering all updates and synchronize_rcu() to perform
reclaim, you end up with:
/* The 64-bit start of range value is implicit within the Judy Array */
struct {
	uint64_t end;		/* end of range */
	unsigned int flags:2;	/* range is free, allocated, or removed */
};
We could probably encode the flags into unused low-order bits of the
pointer to the range if needed.
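Encoding those two bits could look roughly like this (assuming the range
structures are at least 4-byte aligned; names are illustrative only):

#define RANGE_FLAG_MASK		0x3UL	/* low 2 bits: free/allocated/removed */

static inline void *range_encode(void *range, unsigned long flags)
{
	return (void *)((unsigned long)range | (flags & RANGE_FLAG_MASK));
}

static inline void *range_ptr(void *tagged)
{
	return (void *)((unsigned long)tagged & ~RANGE_FLAG_MASK);
}

static inline unsigned long range_flags(void *tagged)
{
	return (unsigned long)tagged & RANGE_FLAG_MASK;
}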
>
> FWIW, when a bunch of scalability work was done on xfs_repair years
> ago, judy arrays were benchmarked for storing extent lists that
> tracked free/used space. We ended up using a btree, because while it
> was slower than the original bitmap code, it was actually faster
> than the highly optimised judy array library and at the scale we
> needed there was no memory usage advantage to using a judy array,
> either...
>
> So I'm really starting to wonder if it'd be simpler for me just to
> resurrect the old RCU friendly btree code Peter Z wrote years ago
> (http://programming.kicks-ass.net/kernel-patches/vma_lookup/) and
> customise it for the couple of uses I have in XFS....
Balanced tree structures end up having contention near the root if you
want to perform concurrent updates on them. The advantage of RCU skip
lists and Judy arrays over trees is that no rebalancing is required, so
updates can be performed concurrently when touching different areas of
the key space.
Thanks,
Mathieu
>
> Cheers,
>
> Dave.
> --
> Dave Chinner
> david@fromorbit.com
--
Mathieu Desnoyers
EfficiOS Inc.
http://www.efficios.com
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 1/2] Skiplist: rcu range index
2013-06-16 14:57 ` [PATCH RFC 1/2] Skiplist: rcu range index Chris Mason
@ 2013-07-14 14:06 ` Dong Fang
0 siblings, 0 replies; 12+ messages in thread
From: Dong Fang @ 2013-07-14 14:06 UTC (permalink / raw)
To: Chris Mason
Cc: Chris Mason, Mathieu Desnoyers, Linux FS Devel, David Woodhouse,
dchinner@redhat.com, bo.li.liu@oracle.com, rp@svcs.cs.pdx.edu,
Paul E. McKenney, Lai Jiangshan, Stephen Hemminger, Alan Stern
On 06/16/2013 10:57 AM, Chris Mason wrote:
> Signed-off-by: Chris Mason <chris.mason@fusionio.com>
> ---
> include/linux/skiplist.h | 235 ++++++
> init/main.c | 5 +
> lib/Kconfig | 14 +
> lib/Makefile | 3 +
> lib/skiplist.c | 2106 ++++++++++++++++++++++++++++++++++++++++++++++
> lib/skiplist_test.c | 882 +++++++++++++++++++
> 6 files changed, 3245 insertions(+)
> create mode 100644 include/linux/skiplist.h
> create mode 100644 lib/skiplist.c
> create mode 100644 lib/skiplist_test.c
>
...
> +
> +static void pretty_time(struct timespec *ts, unsigned long long *seconds, unsigned long long *ms)
> +{
> + unsigned long long m;
> +
> + *seconds = ts->tv_sec;
> +
> + m = ts->tv_nsec / 1000000ULL;
Linux: CentOS 6.4 32-bit, kernel 3.10
GCC version: gcc (GCC) 4.4.7 20120313 (Red Hat 4.4.7-3)
Build error:
CC [M] lib/skiplist_test.o
Building modules, stage 2.
MODPOST 12 modules
ERROR: "__udivdi3" [lib/skiplist_test.ko] undefined!
> + *ms = m;
> +}
> +
...
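On 32-bit kernels an open-coded 64-bit division like this gets compiled
into a call to the libgcc helper __udivdi3, which the kernel does not
provide; the usual fix is div_u64() or do_div(). A possible change is
sketched below (untested, not an actual patch from this thread):

#include <linux/math64.h>
#include <linux/time.h>

static void pretty_time(struct timespec *ts, unsigned long long *seconds,
			unsigned long long *ms)
{
	*seconds = ts->tv_sec;
	/* div_u64() avoids the __udivdi3 helper on 32-bit builds. */
	*ms = div_u64(ts->tv_nsec, 1000000);
}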
^ permalink raw reply [flat|nested] 12+ messages in thread
End of thread. Thread overview (12+ messages):
2013-06-16 14:56 [PATCH RFC 0/2] rcu skiplists v2 Chris Mason
2013-06-16 14:57 ` [PATCH RFC 1/2] Skiplist: rcu range index Chris Mason
2013-07-14 14:06 ` Dong Fang
2013-06-16 14:58 ` [PATCH RFC 2/2] Switch the IOMMU over to the skiplists Chris Mason
2013-06-26 23:02 ` [PATCH RFC 0/2] rcu skiplists v2 Mathieu Desnoyers
2013-06-26 23:51 ` Chris Mason
2013-06-27 2:29 ` Mathieu Desnoyers
2013-06-27 4:46 ` Chris Mason
2013-06-27 11:42 ` Mathieu Desnoyers
2013-06-27 5:19 ` Dave Chinner
2013-06-27 10:55 ` [BULK] " Chris Mason
2013-06-27 12:12 ` Mathieu Desnoyers