* [PATCH RFC 0/2] skiplists for range indexes
@ 2013-05-03 2:02 Chris Mason
2013-05-03 2:06 ` [PATCH RFC 1/2] core skiplist code Chris Mason
` (2 more replies)
0 siblings, 3 replies; 12+ messages in thread
From: Chris Mason @ 2013-05-03 2:02 UTC (permalink / raw)
To: Linux FS Devel, David Woodhouse, dchinner@redhat.com, bo.li.liu
Hi everyone,
I've been working on skiplists for indexing off/on for a little while
now. For btrfs, I really need a fast way to manage extents that doesn't
bog down in lock contention for reads or writes. Dave Chinner has
mentioned this as well, so I've cc'd him.
Liu Bo started this skiplist code, but at the time it didn't make
a huge difference. I've reworked things a bit and used the IOMMU
to test things. I ran some benchmarks on a single socket
server with two SSD cards to get a baseline of how much it is helping.
I tried to keep the basic structure of the IOMMU code, and there are probably
many better ways to fix the IOMMU contention. Really I'm just trying to
compare with rbtrees, not fix the IOMMU.
Basic numbers (all aio/dio):
Streaming writes
IOMMU off 2,575MB/s
skiplist 1,715MB/s
rbtree 1,659MB/s
Not a huge improvement, but the CPU time was lower.
16 threads, iodepth 10, 20MB random reads
IOMMU off 2,861MB/s (mostly idle)
skiplist 2,484MB/s (100% system time)
rbtree 99MB/s (100% system time)
16 threads, iodepth 10, 20MB random writes
IOMMU off 2,548MB/s
skiplist 1,649MB/s
rbtree 33MB/s
I ran this a bunch of times to make sure I got the rbtree numbers right,
lowering the thread count did improve the rbtree performance, but the
best I could do was around 300MB/s. For skiplists, all of the CPU time
is being spent in skiplist_insert_hole, which has a lot of room for
improvement.
>From here the code needs a lot of testing, and I need to fill out the API
to make it a little easier to use. But, I wanted to send this around
for comments and to see how other might want to use things. More
details on all internals are below.
It starts with a basic skiplist implementation where:
* skiplist nodes are dynamically sized. There is a slab cache for each
node height, so every node is actively using all of the pointers allocated
for it.
* Each node has a back pointer as well as a forward pointer
* Each skiplist node stores an array of pointers to items. This is a huge
speed improvement because the array of keys is much more cache friendly.
Since the array of item pointers is something of an extension
to the basic skiplist, I've separated them out into two structs.
First sl_node (just the indexing pointers) and then sl_leaf,
which has an sl_node and the array of item pointers.
There's no cache benefit to the leaves if I'm just storing
an array of pointers though, so it also has an array
of keys:
unsigned long keys[SKIP_KEYS_PER_NODE];
struct sl_slot *ptrs[SKIP_KEYS_PER_NODE];
The keys array is used for the first search pass, and based
on that result I narrow things down and only have to follow
one or two pointers from the corresponding ptrs array.
The sl_slot is very simple:
struct sl_slot {
unsigned long key;
unsigned long size;
}
The idea is to embed the sl_slot in your struct, giving us something like
this:
sl_leaf
________________
| node ptr N |
| .... |
| node ptr 0 |
|_______________|
| | keys | |
|___|________|__|
| . | ptrs |. |
|___|________|__|
/ \
/ \
------------- ------------
| sl_slot 0 | | sl_slot N |
| | | |
| data | | data |
------------- -------------
your
struct
This basic setup gives us similar performance to rbtrees, but uses less memory.
The performance varies (a lot really) with the size of the keys array.
Locking is a mixture of RCU and per-node locking. For searches,
it can be pure RCU, or it can use the per-node lock to synchronize
the final search though the last leaf. But, everything from the
skiplist head to that final node is RCU either way.
Insert locking is slightly different. Before the insert is started,
a new node is preallocated. The height of this node is used to
pick the level where we start locking nodes during the insert. Much
of the time we're able to get through huge portions of the list
without any locks.
For deletes, it does an RCU search for the leaf, and hold the per-node lock
while we remove a specific slot. If the leaf is empty it is marked as dead and
then unlinked from the skiplist level by level.
All the locking and tries does slow things down a bit, but the benefits
for concurrent access make a big difference.
^ permalink raw reply [flat|nested] 12+ messages in thread
* [PATCH RFC 1/2] core skiplist code
2013-05-03 2:02 [PATCH RFC 0/2] skiplists for range indexes Chris Mason
@ 2013-05-03 2:06 ` Chris Mason
2013-05-03 2:10 ` [PATCH RFC 2/2] skiplists for the IOMMU Chris Mason
2013-05-03 9:19 ` [PATCH RFC 0/2] skiplists for range indexes Jan Kara
2 siblings, 0 replies; 12+ messages in thread
From: Chris Mason @ 2013-05-03 2:06 UTC (permalink / raw)
To: Linux FS Devel, David Woodhouse, dchinner@redhat.com, bo.li.liu
diff --git a/include/linux/skiplist.h b/include/linux/skiplist.h
new file mode 100644
index 0000000..9b695fe
--- /dev/null
+++ b/include/linux/skiplist.h
@@ -0,0 +1,179 @@
+/*
+ * (C) 2011 Liu Bo <liubo2009@cn.fujitsu.com>
+ * (C) 2013 Fusion-io
+ *
+ * This program is free software; you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software
+ * Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 59 Temple
+ * Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+#ifndef _SKIPLIST_H
+#define _SKIPLIST_H
+
+#include <linux/spinlock.h>
+#include <linux/stacktrace.h>
+
+/*
+ * This includes a basic skiplist implementation and builds a more
+ * cache friendly variant on top meant to index ranges of keys.
+ *
+ * our random generation function has P at 0.5, so a rough metric
+ * of good performance happens for lists up to 2^MAXLEVEL in size.
+ * Since we have an array of keys, you can do 2^MAXLEVEL * SKIP_KEYS_PER_NODE
+ */
+#define SKIP_MAXLEVEL 32 /* skiplist_get_new_level requires <= 32 */
+
+struct sl_node_ptr {
+ struct sl_node *prev;
+ struct sl_node *next;
+};
+
+/*
+ * sl_node must be last in the leaf data struct. Allocate enough
+ * ram for a given size using either the sl_ptrs_size or sl_node_size
+ * helpers.
+ */
+struct sl_node {
+ int level;
+ int dead;
+ spinlock_t lock;
+ struct sl_node_ptr ptrs[];
+};
+
+/*
+ * The head list structure. The head node has no data,
+ * but it does have the full array of pointers for all the levels.
+ */
+struct sl_list {
+ /* in the head pointer, we use head->prev to point to
+ * the highest item in the list. But, the last item does
+ * not point back to the head. The head->prev items
+ * are protected the by node lock on the last item
+ */
+ struct sl_node *head;
+ spinlock_t lock;
+ unsigned int level;
+};
+
+/*
+ * If you are indexing extents, embed sl_slots into your structure and use
+ * sl_slot_entry to pull out your real struct. The key and size must not
+ * change if you're using rcu.
+ */
+struct sl_slot {
+ /*
+ * when rcu is on, we use this key to verify the pointer we pull
+ * out of the array. It must not change once the object is
+ * inserted
+ */
+ unsigned long key;
+
+ /*
+ * the range searching functions follow pointers into this slot
+ * struct and use this size field to find out how big the
+ * range is.
+ */
+ unsigned long size;
+
+ /*
+ * there is no reference count here, that's the job of whatever
+ * struct you embed this into. Good luck.
+ */
+};
+
+/*
+ * Larger values here make us faster when single threaded. Lower values
+ * increase cache misses but give more chances for concurrency.
+ */
+#define SKIP_KEYS_PER_NODE 32
+
+/*
+ * For indexing extents, this is a leaf in our skip list tree.
+ * Each leaf has a number of pointers and the max field
+ * is used to figure out the key space covered.
+ */
+struct sl_leaf {
+ /* number of valid keys/ptrs in this leaf */
+ int nr;
+
+ /*
+ * max value of the range covered by this leaf. This
+ * includes the size field of the very last extent,
+ * so max = keys[last_index] + ptrs[last_index]->size
+ */
+ unsigned long max;
+
+ /*
+ * sorted, all the keys
+ */
+ unsigned long keys[SKIP_KEYS_PER_NODE];
+
+ /*
+ * data pointers corresponding to the keys
+ */
+ struct sl_slot *ptrs[SKIP_KEYS_PER_NODE];
+
+ /* for freeing our objects after the grace period */
+ struct rcu_head rcu_head;
+
+ /* this needs to be at the end. The size changes based on the level */
+ struct sl_node node;
+};
+
+/*
+ * for a given level, how much memory we need for an array of
+ * all the pointers
+ */
+static inline int sl_ptrs_size(int level)
+{
+ return sizeof(struct sl_node_ptr) * (level + 1);
+}
+
+/*
+ * for a given level, how much memory we need for the
+ * array of pointers and the sl_node struct
+ */
+static inline int sl_node_size(int level)
+{
+ return sizeof(struct sl_node) + sl_ptrs_size(level);
+}
+
+static inline int sl_leaf_size(int level)
+{
+ return sizeof(struct sl_leaf) + sl_ptrs_size(level);
+}
+
+#define sl_entry(ptr) container_of((ptr), struct sl_leaf, node)
+#define sl_slot_entry(ptr, type, member) container_of(ptr, type, member)
+
+static inline int sl_empty(const struct sl_node *head)
+{
+ return head->ptrs[0].next == NULL;
+}
+
+int skiplist_preload(struct sl_list *list, gfp_t gfp_mask);
+int skiplist_get_new_level(struct sl_list *list, int max_level);
+int skiplist_insert(struct sl_list *list, struct sl_slot *slot,
+ int preload_token);
+int sl_init_list(struct sl_list *list, gfp_t mask);
+struct sl_slot *skiplist_lookup(struct sl_list *list, unsigned long key, unsigned long size);
+struct sl_slot *skiplist_lookup_rcu(struct sl_list *list, unsigned long key, unsigned long size);
+struct sl_slot *skiplist_delete(struct sl_list *list, unsigned long key, unsigned long size);
+int skiplist_insert_hole(struct sl_list *list, unsigned long hint,
+ unsigned long limit,
+ unsigned long size, unsigned long align,
+ struct sl_slot *slot,
+ gfp_t gfp_mask);
+void sl_lock_node(struct sl_node *n);
+void sl_unlock_node(struct sl_node *n);
+void sl_free_leaf(struct sl_leaf *leaf);
+unsigned long sl_highest_key(struct sl_list *list);
+#endif /* _SKIPLIST_H */
diff --git a/init/main.c b/init/main.c
index 63534a1..fccbaa4 100644
--- a/init/main.c
+++ b/init/main.c
@@ -90,6 +90,7 @@ extern void fork_init(unsigned long);
extern void mca_init(void);
extern void sbus_init(void);
extern void radix_tree_init(void);
+extern void skiplist_init(void);
#ifndef CONFIG_DEBUG_RODATA
static inline void mark_rodata_ro(void) { }
#endif
@@ -548,6 +549,7 @@ asmlinkage void __init start_kernel(void)
perf_event_init();
rcu_init();
radix_tree_init();
+
/* init some links before init_ISA_irqs() */
early_irq_init();
init_IRQ();
@@ -566,6 +568,10 @@ asmlinkage void __init start_kernel(void)
kmem_cache_init_late();
+#ifdef CONFIG_SKIPLIST
+ skiplist_init();
+#endif
+
/*
* HACK ALERT! This is early. We're enabling the console before
* we've done PCI setups etc, and console_init() must be aware of
diff --git a/lib/Kconfig b/lib/Kconfig
index 3958dc4..c571287 100644
--- a/lib/Kconfig
+++ b/lib/Kconfig
@@ -393,6 +393,20 @@ config SIGNATURE
Digital signature verification. Currently only RSA is supported.
Implementation is done using GnuPG MPI library
+config SKIPLIST
+ bool "Concurrent Skiplists"
+ default n
+ help
+ Concurrent skiplist indexing trees. Just say N
+
+config SKIPLIST_TEST
+ tristate "Skiplist testing module"
+ default n
+ select SKIPLIST
+ help
+ Testing module for the skiplist implementation
+
+#
#
# libfdt files, only selected if needed.
#
diff --git a/lib/Makefile b/lib/Makefile
index d7946ff..5c63f08 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -108,6 +108,9 @@ obj-$(CONFIG_NLATTR) += nlattr.o
obj-$(CONFIG_LRU_CACHE) += lru_cache.o
+obj-$(CONFIG_SKIPLIST) += skiplist.o
+obj-$(CONFIG_SKIPLIST_TEST) += skiplist_test.o
+
obj-$(CONFIG_DMA_API_DEBUG) += dma-debug.o
obj-$(CONFIG_GENERIC_CSUM) += checksum.o
diff --git a/lib/skiplist.c b/lib/skiplist.c
new file mode 100644
index 0000000..7692792
--- /dev/null
+++ b/lib/skiplist.c
@@ -0,0 +1,1877 @@
+/*
+ * (C) 2011 Liu Bo <liubo2009@cn.fujitsu.com>
+ * (C) 2013 Fusion-io
+ *
+ * This program is free software; you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software
+ * Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 59 Temple
+ * Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <linux/errno.h>
+#include <linux/init.h>
+#include <linux/kernel.h>
+#include <linux/export.h>
+#include <linux/radix-tree.h>
+#include <linux/percpu.h>
+#include <linux/slab.h>
+#include <linux/notifier.h>
+#include <linux/cpu.h>
+#include <linux/string.h>
+#include <linux/bitops.h>
+#include <linux/rcupdate.h>
+#include <linux/random.h>
+#include <linux/skiplist.h>
+#include <linux/lockdep.h>
+#include <linux/stacktrace.h>
+#include <linux/sched.h>
+
+
+static struct kmem_cache *slab_caches[SKIP_MAXLEVEL];
+
+/*
+ * we have preemption off anyway for insertion, make the cursor
+ * per-cpu so we don't need to allocate one
+ */
+struct skip_preload {
+ /* one of these per CPU for tracking insertion */
+ struct sl_node *cursor[SKIP_MAXLEVEL + 1];
+
+ /*
+ * the preload is filled in based on the highest possible level
+ * of the list you're preloading. So we basically end up with
+ * one preloaded node for each max size.
+ */
+ struct sl_leaf *preload[SKIP_MAXLEVEL + 1];
+};
+
+static DEFINE_PER_CPU(struct skip_preload, skip_preloads) = { {NULL,}, };
+
+static void sl_init_node(struct sl_node *node, int level)
+{
+ spin_lock_init(&node->lock);
+
+ node->ptrs[level].prev = NULL;
+ node->level = level;
+ node->dead = 0;
+}
+
+/*
+ * the rcu based searches need to block reuse until a given search round
+ * is done. So, we use call_rcu for freeing the leaf structure.
+ */
+static void sl_free_rcu(struct rcu_head *head)
+{
+ struct sl_leaf *leaf = container_of(head, struct sl_leaf, rcu_head);
+ kmem_cache_free(slab_caches[leaf->node.level], leaf);
+}
+
+void sl_free_leaf(struct sl_leaf *leaf)
+{
+ call_rcu(&leaf->rcu_head, sl_free_rcu);
+}
+
+/*
+ * helper functions to wade through dead nodes pending deletion
+ * and return live ones.
+ */
+static struct sl_node *find_live_prev(struct sl_list *list,
+ struct sl_node *node, int level)
+{
+ /* head->prev points to the max, this makes sure we don't loop */
+ if (node == list->head)
+ return NULL;
+
+ while (node) {
+ node = rcu_dereference(node->ptrs[level].prev);
+ /*
+ * the head is never dead, so we'll never walk past
+ * it down in this loop
+ */
+ if (!node->dead)
+ break;
+ }
+
+ return node;
+}
+
+static struct sl_node *find_live_next(struct sl_list *list,
+ struct sl_node *node, int level)
+{
+ while (node) {
+ node = rcu_dereference(node->ptrs[level].next);
+ if (!node || !node->dead)
+ break;
+ }
+ return node;
+}
+
+/*
+ * having trouble teaching lockdep about the skiplist
+ * locking. The problem is that we're allowed to
+ * hold multiple locks on the same level as long as we
+ * go from left to right.
+ */
+void sl_lock_node(struct sl_node *n)
+{
+ spin_lock(&n->lock);
+}
+
+void sl_unlock_node(struct sl_node *n)
+{
+ if (n)
+ spin_unlock(&n->lock);
+}
+
+/*
+ * the cursors are used by the insertion code to remember the leaves we passed
+ * on the way down to our insertion point. Any new nodes are linking in
+ * after the nodes in our cursor.
+ *
+ * Nodes may appear in the cursor more than once, but if so they are
+ * always consecutive. We don't have A, B, C, B, D, only
+ * A, B, B, B, C, D. When locking and unlocking things, we
+ * have to make sure we leave any node inside the cursor properly locked.
+ *
+ * Right now, everything in the cursor must be locked.
+ */
+int found_in_cursor(struct sl_node **cursor, int max_level, struct sl_node *p)
+{
+ int i;
+
+ for (i = 0; i <= max_level; i++) {
+ if (cursor[i] == p)
+ return 1;
+ }
+ return 0;
+}
+
+/*
+ * add p into cursor at a specific level. If p is replacing another
+ * pointer, that pointer is unlocked, unless it is also at a
+ * higher level in the cursor.
+ *
+ * p is locked unless it was already in the cursor.
+ */
+static void add_to_cursor(struct sl_node **cursor, int level,
+ struct sl_node *p)
+{
+ struct sl_node *old;
+ struct sl_node *higher;
+
+ old = cursor[level];
+ cursor[level] = p;
+
+ if (old == p)
+ return;
+
+ if (level == SKIP_MAXLEVEL) {
+ sl_lock_node(p);
+ sl_unlock_node(old);
+ return;
+ }
+ higher = cursor[level + 1];
+
+ if (higher != p)
+ sl_lock_node(p);
+ if (higher != old)
+ sl_unlock_node(old);
+}
+
+/*
+ * same as add_to_cursor, but p must already be locked.
+ */
+static void add_locked_to_cursor(struct sl_node **cursor, int level,
+ struct sl_node *p)
+{
+ struct sl_node *old;
+
+ old = cursor[level];
+ cursor[level] = p;
+
+ if (old == p)
+ return;
+
+ if (level == SKIP_MAXLEVEL) {
+ sl_unlock_node(old);
+ return;
+ }
+
+ if (cursor[level + 1] != old)
+ sl_unlock_node(old);
+}
+
+/*
+ * unlock any nodes in the cursor below max_level
+ */
+static void free_cursor_locks(struct sl_node **cursor, int max_level)
+{
+ struct sl_node *p;
+ int i;
+
+ for (i = max_level; i >= 0; i--) {
+ p = cursor[i];
+ cursor[i] = NULL;
+ if (i == 0 || cursor[i - 1] != p)
+ sl_unlock_node(p);
+ }
+}
+
+/*
+ * helper function to link a single level during an insert.
+ * prev must be locked, and it is the node we are linking after.
+ *
+ * This will find a live next pointer, lock it, and link it
+ * with our new node
+ */
+static void sl_link_one_level(struct sl_list *list,
+ struct sl_node *prev,
+ struct sl_node *node, int level)
+{
+ struct sl_node *next;
+ struct sl_node *test;
+
+ assert_spin_locked(&prev->lock);
+ BUG_ON(prev->dead);
+
+again:
+ next = find_live_next(list, prev, level);
+ if (next) {
+ sl_lock_node(next);
+ test = find_live_next(list, prev, level);
+ if (test != next || next->dead) {
+ sl_unlock_node(next);
+ goto again;
+ }
+ /*
+ * make sure the our next and prev really point to each
+ * other now that we have next locked.
+ */
+ if (find_live_prev(list, next, level) != prev) {
+ sl_unlock_node(next);
+ goto again;
+ }
+ }
+
+ rcu_assign_pointer(node->ptrs[level].next, next);
+ rcu_assign_pointer(node->ptrs[level].prev, prev);
+ rcu_assign_pointer(prev->ptrs[level].next, node);
+
+ /*
+ * if next is null, we're the last node on this level.
+ * The head->prev pointer is used to cache this fact
+ */
+ if (next)
+ rcu_assign_pointer(next->ptrs[level].prev, node);
+ else
+ rcu_assign_pointer(list->head->ptrs[level].prev, node);
+
+ sl_unlock_node(next);
+}
+
+/*
+ * link a node at a given level. The cursor needs pointers to all the
+ * nodes that are just behind us in the list.
+ *
+ * Link from the bottom up so that our searches from the top down don't
+ * find bogus pointers
+ */
+static void sl_link_node(struct sl_list *list, struct sl_node *node,
+ struct sl_node **cursor, int level)
+{
+ int i;
+
+ for (i = 0; i <= level; i++)
+ sl_link_one_level(list, cursor[i], node, i);
+}
+
+/*
+ * just like sl_link_node, but use 'after' for starting point of the link.
+ * Any pointers not provided from 'after' come from the cursor.
+ * 'after' must be locked.
+ */
+static void sl_link_after_node(struct sl_list *list, struct sl_node *node,
+ struct sl_node *after,
+ struct sl_node **cursor, int level)
+{
+ int i;
+
+ /* first use all the pointers from 'after' */
+ for (i = 0; i <= after->level && i <= level; i++)
+ sl_link_one_level(list, after, node, i);
+
+ /* then use the cursor for anything left */
+ for (; i <= level; i++)
+ sl_link_one_level(list, cursor[i], node, i);
+
+}
+
+/*
+ * helper function to pull out the next live leaf at a given level.
+ * It is not locked
+ */
+struct sl_leaf *sl_next_leaf(struct sl_list *list, struct sl_node *p, int l)
+{
+ struct sl_node *next;
+ if (!p)
+ return NULL;
+
+ next = find_live_next(list, p, l);
+ if (next)
+ return sl_entry(next);
+ return NULL;
+}
+
+/*
+ * return the highest value for a given leaf. This is cached
+ * in leaf->max so that we don't have to wander into
+ * the slot pointers. The max is equal to the key + size of the
+ * last slot.
+ */
+static unsigned long sl_max_key(struct sl_leaf *leaf)
+{
+ smp_rmb();
+ return leaf->max;
+}
+
+/*
+ * return the lowest key for a given leaf. This comes out
+ * of the node key array and not the slots
+ */
+static unsigned long sl_min_key(struct sl_leaf *leaf)
+{
+ smp_rmb();
+ return leaf->keys[0];
+}
+
+struct sl_leaf *sl_first_leaf(struct sl_list *list)
+{
+ struct sl_leaf *leaf;
+ struct sl_node *p;
+
+ p = list->head->ptrs[0].next;
+ if (!p)
+ return NULL;
+ leaf = sl_entry(p);
+
+ return leaf;
+}
+
+struct sl_leaf *sl_last_leaf(struct sl_list *list)
+{
+ struct sl_leaf *leaf;
+ struct sl_node *p;
+
+ p = list->head->ptrs[0].prev;
+ if (!p)
+ return NULL;
+ leaf = sl_entry(p);
+
+ return leaf;
+}
+
+/*
+ * search inside the key array of a given leaf. The leaf must be
+ * locked because we're using a binary search. This returns
+ * zero if we found a slot with the key in it, and sets
+ * 'slot' to the number of the slot pointer.
+ *
+ * 1 is returned if the key was not found, and we set slot to
+ * the location where the insert needs to be performed.
+ *
+ */
+int leaf_slot_locked(struct sl_leaf *leaf, unsigned long key,
+ unsigned long size, int *slot)
+{
+ int low = 0;
+ int high = leaf->nr - 1;
+ int mid;
+ unsigned long k1;
+ struct sl_slot *found;
+
+ /*
+ * case1:
+ * [ key ... size ]
+ * [found .. found size ]
+ *
+ * case2:
+ * [key ... size ]
+ * [found .. found size ]
+ *
+ * case3:
+ * [key ... size ]
+ * [ found .. found size ]
+ *
+ * case4:
+ * [key ...size ]
+ * [ found ... found size ]
+ *
+ * case5:
+ * [key ...size ]
+ * [ found ... found size ]
+ */
+
+ while (low <= high) {
+ mid = low + (high - low) / 2;
+ k1 = leaf->keys[mid];
+ if (k1 < key) {
+ low = mid + 1;
+ } else if (k1 >= key + size) {
+ high = mid - 1;
+ } else {
+ *slot = mid;
+ return 0;
+ }
+ }
+
+ /*
+ * nothing found, at this point we're in the slot this key would
+ * normally be inserted at. Check the previous slot to see if
+ * it is inside the range there
+ */
+ if (low > 0) {
+ k1 = leaf->keys[low - 1];
+ found = leaf->ptrs[low - 1];
+
+ /* case1, case2, case5 */
+ if (k1 < key + size && k1 + found->size > key) {
+ *slot = low - 1;
+ return 0;
+ }
+
+ /* case3, case4 */
+ if (k1 < key + size && k1 >= key) {
+ *slot = low - 1;
+ return 0;
+ }
+ }
+ *slot = low;
+ return 1;
+}
+
+/*
+ * sequential search for lockless rcu. The insert/deletion routines
+ * try to order their operations to make this safe. See leaf_slot_locked
+ * for a list of extent range cases we're trying to cover.
+ */
+static int leaf_slot(struct sl_leaf *leaf, unsigned long key,
+ unsigned long size, int *slot)
+{
+ int i;
+ int cur;
+ int last;
+ unsigned long this_key;
+ struct sl_slot *found;
+
+again:
+ cur = 0;
+ last = leaf->nr;
+
+ /* find the first slot greater than our key */
+ for (i = 0; i < last; i++) {
+ smp_rmb();
+ this_key = leaf->keys[i];
+ if (this_key >= key + size)
+ break;
+ cur = i;
+ }
+ if (leaf->keys[cur] < key + size) {
+ /*
+ * if we're in the middle of an insert, pointer may
+ * be null. This little loop will wait for the insertion
+ * to finish.
+ */
+ while (1) {
+ found = rcu_dereference(leaf->ptrs[cur]);
+ if (found)
+ break;
+ cpu_relax();
+ }
+
+ /* insert is juggling our slots, try again */
+ if (found->key != leaf->keys[cur])
+ goto again;
+
+ /* case1, case2, case5 */
+ if (found->key < key + size && found->key + found->size > key) {
+ *slot = cur;
+ return 0;
+ }
+
+ /* case3, case4 */
+ if (found->key < key + size && found->key >= key) {
+ *slot = cur;
+ return 0;
+ }
+
+ *slot = cur + 1;
+ return 1;
+ }
+ *slot = cur;
+ return 1;
+}
+
+/*
+ * this does the dirty work of splitting and/or shifting a leaf
+ * to get a new slot inside. The leaf must be locked. slot
+ * tells us where into we should insert in the leaf and the cursor
+ * should have all the pointers we need to fully link any new nodes
+ * we have to create. leaf and everything in the cursor must be locked.
+ */
+static noinline int add_key_to_leaf(struct sl_list *list, struct sl_leaf *leaf,
+ struct sl_slot *slot_ptr, unsigned long key,
+ int slot, struct sl_node **cursor, int preload_token)
+{
+ struct sl_leaf *split;
+ struct skip_preload *skp;
+ int level;
+
+ /* no splitting required, just shift our way in */
+ if (leaf->nr < SKIP_KEYS_PER_NODE)
+ goto insert;
+
+ skp = &__get_cpu_var(skip_preloads);
+ split = skp->preload[preload_token];
+
+ /*
+ * we need to insert a new leaf, but we try not to insert a new leaf at
+ * the same height as our previous one, it's just a waste of high level
+ * searching. If the new node is the same level or lower than the
+ * existing one, try to use a level 0 leaf instead.
+ *
+ * The preallocation code tries to keep a level 0 leaf preallocated,
+ * lets see if we can grab one.
+ */
+ if (leaf->node.level > 0 && split->node.level <= leaf->node.level) {
+ if (skp->preload[0]) {
+ preload_token = 0;
+ split = skp->preload[0];
+ }
+ }
+ skp->preload[preload_token] = NULL;
+
+ level = split->node.level;
+
+ /*
+ * bump our list->level to whatever we've found. Nobody allocating
+ * a new node is going to set it higher than list->level + 1
+ */
+ if (level > list->level)
+ list->level = level;
+
+ /*
+ * this locking is really only required for the small window where
+ * we are linking the node and someone might be deleting one of the
+ * nodes we are linking with. The leaf passed in was already
+ * locked.
+ */
+ sl_lock_node(&split->node);
+
+ if (slot == leaf->nr) {
+ /*
+ * our new slot just goes at the front of the new leaf, don't
+ * bother shifting things in from the previous leaf.
+ */
+ slot = 0;
+ split->nr = 1;
+ split->max = key + slot_ptr->size;
+ split->keys[0] = key;
+ split->ptrs[0] = slot_ptr;
+ smp_wmb();
+ sl_link_after_node(list, &split->node, &leaf->node,
+ cursor, level);
+ sl_unlock_node(&split->node);
+ return 0;
+ } else {
+ int nr = SKIP_KEYS_PER_NODE / 2;
+ int mid = SKIP_KEYS_PER_NODE - nr;
+ int src_i = mid;
+ int dst_i = 0;
+ int orig_nr = leaf->nr;
+
+ /* split the previous leaf in half and copy items over */
+ split->nr = nr;
+ split->max = leaf->max;
+
+ while (src_i < slot) {
+ split->keys[dst_i] = leaf->keys[src_i];
+ split->ptrs[dst_i++] = leaf->ptrs[src_i++];
+ }
+
+ if (slot >= mid) {
+ split->keys[dst_i] = key;
+ split->ptrs[dst_i++] = slot_ptr;
+ split->nr++;
+ }
+
+ while (src_i < orig_nr) {
+ split->keys[dst_i] = leaf->keys[src_i];
+ split->ptrs[dst_i++] = leaf->ptrs[src_i++];
+ }
+
+ sl_link_after_node(list, &split->node, &leaf->node,
+ cursor, level);
+ nr = SKIP_KEYS_PER_NODE - nr;
+
+ /*
+ * now what we have all the items copied and our new
+ * leaf inserted, update the nr in this leaf. Anyone
+ * searching in rculand will find the fully updated
+ */
+ leaf->max = leaf->keys[nr - 1] + leaf->ptrs[nr - 1]->size;
+ smp_wmb();
+ leaf->nr = nr;
+ sl_unlock_node(&split->node);
+
+ /*
+ * if the slot was in split item, we're done,
+ * otherwise we need to move down into the
+ * code below that shifts the items and
+ * inserts the new key
+ */
+ if (slot >= mid)
+ return 0;
+ }
+insert:
+ if (slot < leaf->nr) {
+ int i;
+
+ /*
+ * put something sane into the new last slot so rcu
+ * searchers won't get confused
+ */
+ leaf->keys[leaf->nr] = 0;
+ leaf->ptrs[leaf->nr] = NULL;
+ smp_wmb();
+
+ /* then bump the nr */
+ leaf->nr++;
+
+ /*
+ * now step through each pointer after our
+ * destination and bubble it forward. memcpy
+ * would be faster but rcu searchers will be
+ * able to validate pointers as they go with
+ * this method.
+ */
+ for (i = leaf->nr - 1; i > slot; i--) {
+ leaf->keys[i] = leaf->keys[i - 1];
+ leaf->ptrs[i] = leaf->ptrs[i - 1];
+ /*
+ * make sure the key/pointer pair is
+ * fully visible in the new home before
+ * we move forward
+ */
+ smp_wmb();
+ }
+
+ /* finally stuff in our key */
+ leaf->keys[slot] = key;
+ leaf->ptrs[slot] = slot_ptr;
+ smp_wmb();
+ } else {
+ /*
+ * just extending the leaf, toss
+ * our key in and update things
+ */
+ leaf->max = key + slot_ptr->size;
+ leaf->keys[slot] = key;
+ leaf->ptrs[slot] = slot_ptr;
+
+ smp_wmb();
+ leaf->nr++;
+ }
+ return 0;
+}
+
+/*
+ * when we're extending a leaf with a new key, make sure the
+ * range of the [key,size] doesn't extend into the next
+ * leaf.
+ */
+static int check_overlap(struct sl_list *list, struct sl_leaf *leaf,
+ unsigned long key, unsigned long size)
+{
+ struct sl_node *p;
+ struct sl_leaf *next;
+ int ret = 0;
+
+ p = leaf->node.ptrs[0].next;
+ if (!p)
+ return 0;
+
+ sl_lock_node(p);
+ next = sl_entry(p);
+ if (key + size > sl_min_key(next))
+ ret = 1;
+ sl_unlock_node(p);
+
+ return ret;
+}
+
+/*
+ * helper function for insert. This will either return an existing
+ * key or insert a new slot into the list. leaf must be locked,
+ * and everything in the cursor must be locked.
+ */
+static noinline int find_or_add_key(struct sl_list *list, unsigned long key,
+ unsigned long size, struct sl_leaf *leaf,
+ struct sl_slot *slot_ptr,
+ struct sl_node **cursor, int preload_token)
+{
+ int ret;
+ int slot;
+
+ if (check_overlap(list, leaf, key, size)) {
+ ret = -EEXIST;
+ goto out;
+ }
+ if (key < leaf->max) {
+ ret = leaf_slot_locked(leaf, key, size, &slot);
+ if (ret == 0) {
+ ret = -EEXIST;
+ goto out;
+ }
+ } else {
+ slot = leaf->nr;
+ }
+
+ add_key_to_leaf(list, leaf, slot_ptr, key, slot, cursor, preload_token);
+ ret = 0;
+
+out:
+ return ret;
+}
+
+/*
+ * pull a new leaf out of the prealloc area, and insert the slot/key into it
+ */
+static struct sl_leaf *alloc_leaf(struct sl_slot *slot_ptr, unsigned long key,
+ int preload_token)
+{
+ struct sl_leaf *leaf;
+ struct skip_preload *skp;
+ int level;
+
+ skp = &__get_cpu_var(skip_preloads);
+ leaf = skp->preload[preload_token];
+ skp->preload[preload_token] = NULL;
+ level = leaf->node.level;
+
+ leaf->keys[0] = key;
+ leaf->ptrs[0] = slot_ptr;
+ leaf->nr = 1;
+ leaf->max = key + slot_ptr->size;
+ return leaf;
+}
+
+/*
+ * helper to grab the cursor from the prealloc area
+ * you must already have preempt off. The whole
+ * cursor is zero'd out, so don't call this if you're
+ * currently using the cursor.
+ */
+static struct sl_node **get_cursor(void)
+{
+ struct skip_preload *skp;
+ skp = &__get_cpu_var(skip_preloads);
+ memset(skp->cursor, 0, sizeof(skp->cursor[0]) * (SKIP_MAXLEVEL + 1));
+ return skp->cursor;
+}
+
+/*
+ * this returns with preempt disabled and the preallocation
+ * area setup for a new insert. To get there, it may or
+ * may not allocate a new leaf for the next insert.
+ *
+ * If allocations are done, this will also try to preallocate a level 0
+ * leaf, which allows us to optimize insertion by not placing two
+ * adjacent nodes together with the same level.
+ *
+ * This returns < 0 on errors. If everything works, it returns a preload
+ * token which you should use when fetching your preallocated items.
+ *
+ * The token allows us to preallocate based on the current
+ * highest level of the list. For a list of level N, we won't allocate
+ * higher than N + 1.
+ */
+int skiplist_preload(struct sl_list *list, gfp_t gfp_mask)
+{
+ struct skip_preload *skp;
+ struct sl_leaf *leaf;
+ struct sl_leaf *leaf0 = NULL;
+ int alloc_leaf0 = 1;
+ int level;
+ int max_level = min_t(int, list->level + 1, SKIP_MAXLEVEL - 1);
+ int token = max_level;
+
+ preempt_disable();
+ skp = &__get_cpu_var(skip_preloads);
+ if (max_level && !skp->preload[0])
+ alloc_leaf0 = 1;
+
+ if (skp->preload[max_level])
+ return token;
+
+ preempt_enable();
+ level = skiplist_get_new_level(list, max_level);
+ leaf = kmem_cache_alloc(slab_caches[level], gfp_mask);
+ if (leaf == NULL)
+ return -ENOMEM;
+
+ if (alloc_leaf0)
+ leaf0 = kmem_cache_alloc(slab_caches[0], gfp_mask);
+
+ preempt_disable();
+ skp = &__get_cpu_var(skip_preloads);
+
+ if (leaf0) {
+ if (skp->preload[0] == NULL) {
+ sl_init_node(&leaf0->node, 0);
+ skp->preload[0] = leaf0;
+ } else {
+ kmem_cache_free(slab_caches[0], leaf0);
+ }
+ }
+
+ if (skp->preload[max_level]) {
+ kmem_cache_free(slab_caches[level], leaf);
+ return token;
+ }
+ sl_init_node(&leaf->node, level);
+ skp->preload[max_level] = leaf;
+
+ return token;
+}
+EXPORT_SYMBOL(skiplist_preload);
+
+/*
+ * use the kernel prandom call to pick a new random level. This
+ * uses P = .50. If you bump the SKIP_MAXLEVEL past 32 bits,
+ * this function needs updating.
+ */
+int skiplist_get_new_level(struct sl_list *list, int max_level)
+{
+ int level = 0;
+ unsigned long randseed;
+
+ randseed = prandom_u32();
+
+ while (randseed && (randseed & 1)) {
+ randseed >>= 1;
+ level++;
+ if (level == max_level)
+ break;
+ }
+ return (level >= SKIP_MAXLEVEL ? SKIP_MAXLEVEL - 1: level);
+}
+EXPORT_SYMBOL(skiplist_get_new_level);
+
+/*
+ * just return the level of the leaf we're going to use
+ * for the next insert
+ */
+static int pending_insert_level(int preload_token)
+{
+ struct skip_preload *skp;
+ skp = &__get_cpu_var(skip_preloads);
+ return skp->preload[preload_token]->node.level;
+}
+
+/*
+ * after a lockless search, this makes sure a given key is still
+ * inside the min/max of a leaf. If not, you have to repeat the
+ * search and try again.
+ */
+static int verify_key_in_leaf(struct sl_leaf *leaf, unsigned long key,
+ unsigned long size)
+{
+ if (leaf->node.dead)
+ return 0;
+
+ if (key + size < sl_min_key(leaf) ||
+ key >= sl_max_key(leaf))
+ return 0;
+ return 1;
+}
+
+/* The insertion code tries to delay taking locks for as long as possible.
+ * Once we've found a good place to insert, we need to make sure the leaf
+ * we have picked is still a valid location.
+ *
+ * This checks the previous and next pointers to make sure everything is
+ * still correct for the insert. You should send an unlocked leaf, and
+ * it will return 1 with the leaf locked if everything worked.
+ *
+ * We return 0 if the insert cannot proceed, and the leaf is returned unlocked.
+ */
+static int verify_key_in_path(struct sl_list *list,
+ struct sl_node *node, unsigned long key,
+ unsigned long size, int level,
+ struct sl_node **cursor,
+ struct sl_node **locked)
+{
+ struct sl_leaf *prev = NULL;
+ struct sl_leaf *next;
+ struct sl_node *p;
+ struct sl_leaf *leaf = NULL;
+ struct sl_node *lock1;
+ struct sl_node *lock2;
+ struct sl_node *lock3;
+
+ BUG_ON(*locked);
+
+again:
+ lock1 = NULL;
+ lock2 = NULL;
+ lock3 = NULL;
+ if (node != list->head) {
+ p = node->ptrs[level].prev;
+ if (!found_in_cursor(cursor, SKIP_MAXLEVEL, p)) {
+ lock1 = p;
+ sl_lock_node(p);
+ }
+ sl_lock_node(node);
+ lock2 = node;
+
+ if (p->dead || node->dead)
+ goto out;
+
+ if (p != list->head)
+ prev = sl_entry(p);
+
+ /*
+ * once we have the locks, make sure everyone
+ * still points to each other
+ */
+ if (node->ptrs[level].prev != p ||
+ p->ptrs[level].next != node) {
+ sl_unlock_node(lock1);
+ sl_unlock_node(lock2);
+ goto again;
+ }
+
+ leaf = sl_entry(node);
+ } else {
+ sl_lock_node(node);
+ lock2 = node;
+ }
+
+ /*
+ * rule #1, the key must be greater than the max of the previous
+ * leaf
+ */
+ if (prev && key < sl_max_key(prev))
+ goto out;
+
+ /* we're done with prev, unlock it */
+ sl_unlock_node(lock1);
+ lock1 = NULL;
+
+ p = node->ptrs[level].next;
+ if (p)
+ next = sl_entry(p);
+ else
+ next = NULL;
+
+ if (next) {
+ sl_lock_node(&next->node);
+ lock3 = &next->node;
+ /*
+ * rule #2 the key must be smaller than the min key
+ * in the next node
+ */
+ if (node->ptrs[level].next != &next->node ||
+ next->node.ptrs[level].prev != node ||
+ next->node.dead || key >= sl_min_key(next)) {
+ /* next may be in the middle of a delete
+ * here. If so, we can't just goto
+ * again because the delete is waiting
+ * for our lock on node.
+ * FIXME, we can try harder to avoid
+ * the goto out here
+ */
+ goto out;
+ }
+ }
+ /*
+ * return with our leaf locked and sure that our leaf is the
+ * best place for this key
+ */
+ *locked = node;
+ sl_unlock_node(lock1);
+ sl_unlock_node(lock3);
+ return 1;
+
+out:
+ sl_unlock_node(lock1);
+ sl_unlock_node(lock2);
+ sl_unlock_node(lock3);
+ return 0;
+}
+
+
+/*
+ * Before calling this you must have stocked the preload area by
+ * calling skiplist_preload, and you must have kept preemption
+ * off. preload_token comes from skiplist_preload, pass in
+ * exactly what preload gave you.
+ *
+ * More details in the comments below.
+ */
+int skiplist_insert(struct sl_list *list, struct sl_slot *slot,
+ int preload_token)
+{
+ struct sl_node **cursor;
+ struct sl_node *p;
+ struct sl_node *ins_locked = NULL;
+ struct sl_leaf *leaf;
+ unsigned long key = slot->key;
+ unsigned long size = slot->size;
+ unsigned long min_key;
+ unsigned long max_key;
+ int level;
+ int pending_level = pending_insert_level(preload_token);
+ int ret;
+
+ rcu_read_lock();
+ ret = -EEXIST;
+ cursor = get_cursor();
+
+ /*
+ * notes on pending_level, locking and general flow:
+ *
+ * pending_level is the level of the node we might insert
+ * if we can't find free space in the tree. It's important
+ * because it tells us where our cursor is going to start
+ * recording nodes, and also the first node we have to lock
+ * to keep other inserts from messing with the nodes in our cursor.
+ *
+ * The most common answer is pending_level == 0, which is great
+ * because that means we won't have to take a lock until the
+ * very last level.
+ *
+ * if we're really lucky, we doing a level 0 insert for a key past
+ * our current max. We can just jump down to the insertion
+ * code
+ */
+ leaf = sl_last_leaf(list);
+ if (leaf && sl_min_key(leaf) <= key &&
+ (pending_level == 0 || leaf->nr < SKIP_KEYS_PER_NODE)) {
+ p = &leaf->node;
+
+ /* lock and recheck */
+ sl_lock_node(p);
+
+ if (!p->dead && sl_min_key(leaf) <= key &&
+ leaf == sl_last_leaf(list) &&
+ leaf->node.ptrs[0].next == NULL &&
+ (pending_level == 0 || leaf->nr < SKIP_KEYS_PER_NODE)) {
+ ins_locked = p;
+ level = 0;
+ goto find_or_add;
+ } else {
+ sl_unlock_node(p);
+ }
+ }
+
+again:
+ /*
+ * the goto again code below will bump pending level
+ * so we start locking higher and higher in the tree as
+ * contention increases. Make sure to limit
+ * it to SKIP_MAXLEVEL
+ */
+ pending_level = min(pending_level, SKIP_MAXLEVEL);
+ p = list->head;
+ level = list->level;
+
+ /*
+ * once we hit pending_level, we have to start filling
+ * in the cursor and locking nodes
+ */
+ if (level <= pending_level) {
+ if (level != pending_level)
+ add_to_cursor(cursor, pending_level, p);
+ add_to_cursor(cursor, level, p);
+ }
+
+ /*
+ * skip over any NULL levels in the list
+ */
+ while (p->ptrs[level].next == NULL && level > 0) {
+ level--;
+ if (level <= pending_level) {
+ add_to_cursor(cursor, level, p);
+ }
+ }
+
+ do {
+ while (1) {
+ leaf = sl_next_leaf(list, p, level);
+ if (!leaf) {
+ /*
+ * if we're at level 0 and p points to
+ * the head, the list is just empty. If
+ * we're not at level 0 yet, keep walking
+ * down.
+ */
+ if (p == list->head || level != 0)
+ break;
+
+ /*
+ * p was the last leaf on the bottom level,
+ * We're here because 'key' was bigger than the
+ * max key in p. find_or_add will append into
+ * the last leaf.
+ */
+ goto find_or_add;
+ }
+
+ /*
+ * once we get down to the pending level, we have to
+ * start locking. Lock the node and verify it really
+ * is exactly the node we expected to find. It may
+ * get used in the cursor.
+ */
+ if (level <= pending_level) {
+ sl_lock_node(&leaf->node);
+ if (leaf->node.dead ||
+ find_live_next(list, p, level) != &leaf->node ||
+ find_live_prev(list, &leaf->node, level) != p) {
+ sl_unlock_node(&leaf->node);
+ if (!found_in_cursor(cursor,
+ pending_level, p))
+ sl_unlock_node(p);
+ free_cursor_locks(cursor, pending_level);
+ goto again;
+ }
+ }
+ min_key = sl_min_key(leaf);
+ max_key = sl_max_key(leaf);
+
+ /*
+ * strictly speaking this test is covered again below.
+ * But usually we have to walk forward through the
+ * pointers, so this is the most common condition. Try
+ * it first.
+ */
+ if (key >= max_key)
+ goto next;
+
+ if (key < min_key) {
+ /*
+ * when we aren't locking, we have to make sure
+ * new nodes haven't appeared between p and us.
+ */
+ if (level > pending_level &&
+ (find_live_prev(list, &leaf->node, level) != p ||
+ min_key != sl_min_key(leaf))) {
+ goto again;
+ }
+
+ /*
+ * our key is smaller than the smallest key in
+ * leaf. If we're not in level 0 yet, we don't
+ * want to cross over into the leaf
+ */
+ if (level != 0) {
+ if (level <= pending_level)
+ sl_unlock_node(&leaf->node);
+ break;
+ }
+
+ /*
+ * we are in level 0, just stuff our slot into
+ * the front of this leaf. We could also stuff
+ * our slot into p. FIXME, we should pick the
+ * leaf with the smallest number of items.
+ */
+ if (level <= pending_level &&
+ !found_in_cursor(cursor, pending_level, p)) {
+ sl_unlock_node(p);
+ }
+
+ p = &leaf->node;
+ goto find_or_add;
+ }
+
+ if (key < sl_max_key(leaf)) {
+ /*
+ * our key is >= the min and < the max. This
+ * leaf is the one true home for our key. The
+ * level doesn't matter, we could walk the
+ * whole tree and this would still be the best
+ * location.
+ *
+ * So, stop now and do the insert. Our cursor
+ * might not be fully formed down to level0,
+ * but that's ok because every pointer from
+ * our current level down to zero is going to
+ * be this one leaf. find_or_add deals with
+ * all of that.
+ *
+ * If we haven't already locked this leaf,
+ * do so now and make sure it still is
+ * the right location for our key.
+ */
+ if (level > pending_level) {
+ sl_lock_node(&leaf->node);
+ if (key < sl_min_key(leaf) ||
+ key >= sl_max_key(leaf)) {
+ sl_unlock_node(&leaf->node);
+ pending_level = level;
+ goto again;
+ }
+ /*
+ * remember that we've locked this
+ * leaf for the goto find_or_add
+ */
+ ins_locked = &leaf->node;
+ }
+
+ /* unless p is in our cursor, we're done with it */
+ if (level <= pending_level &&
+ !found_in_cursor(cursor, pending_level, p)) {
+ sl_unlock_node(p);
+ }
+
+ p = &leaf->node;
+ goto find_or_add;
+ }
+next:
+ /* walk our lock forward */
+ if (level <= pending_level &&
+ !found_in_cursor(cursor, pending_level, p)) {
+ sl_unlock_node(p);
+ }
+ p = &leaf->node;
+ }
+
+ /*
+ * the while loop is done with this level. Put
+ * p into our cursor if we've started locking/
+ */
+ if (level <= pending_level)
+ add_locked_to_cursor(cursor, level, p);
+
+ level--;
+
+ /*
+ * pending_level is the line that tells us where we
+ * need to start locking. Each node
+ * we record in the cursor needs to be exactly right,
+ * so we verify the first node in the cursor here.
+ * At this point p isn't in the cursor yet but it
+ * will be (or a downstream pointer at the
+ * same level).
+ */
+ if (level == pending_level) {
+ struct sl_node *locked = NULL;
+ if (!verify_key_in_path(list, p,
+ key, size, level + 1,
+ cursor, &locked)) {
+ pending_level++;
+ goto again;
+ }
+ cursor[level] = locked;
+ }
+ } while (level >= 0);
+
+ /* we only get here if the list is completely empty. FIXME
+ * this can be folded into the find_or_add code below
+ */
+ if (!cursor[0]) {
+ add_to_cursor(cursor, 0, list->head);
+ if (list->head->ptrs[0].next != NULL) {
+ free_cursor_locks(cursor, pending_level);
+ goto again;
+ }
+
+ }
+ leaf = alloc_leaf(slot, key, preload_token);
+ level = leaf->node.level;
+
+ if (level > list->level) {
+ list->level++;
+ cursor[list->level] = list->head;
+ }
+
+ sl_link_node(list, &leaf->node, cursor, level);
+ ret = 0;
+ free_cursor_locks(cursor, list->level);
+ rcu_read_unlock();
+
+ return ret;
+
+find_or_add:
+
+ leaf = sl_entry(p);
+ if (!ins_locked) {
+ if (level > pending_level) {
+ /* our cursor is empty, lock this one node */
+ if (!verify_key_in_path(list, p, key, size, 0,
+ cursor, &ins_locked)) {
+ free_cursor_locks(cursor, pending_level);
+ pending_level++;
+ goto again;
+ }
+ } else if (!found_in_cursor(cursor, pending_level, p)) {
+ /* we have a good cursor, but we're linking after
+ * p. Make sure it gets unlocked below
+ */
+ ins_locked = p;
+ }
+ }
+
+ ret = find_or_add_key(list, key, size, leaf, slot, cursor, preload_token);
+ free_cursor_locks(cursor, pending_level);
+ sl_unlock_node(ins_locked);
+ rcu_read_unlock();
+ return ret;
+}
+EXPORT_SYMBOL(skiplist_insert);
+
+/*
+ * lookup has two stages. First we find the leaf that should have
+ * our key, and then we go through all the slots in that leaf and
+ * look for the key. This helper function is just the first stage
+ * and it must be called under rcu_read_lock(). You may be using the
+ * non-rcu final lookup variant, but this part must be rcu.
+ *
+ * We'll return NULL if we find nothing or the candidate leaf
+ * for you to search.
+ */
+static struct sl_leaf *skiplist_lookup_leaf(struct sl_list *list,
+ struct sl_node **last,
+ unsigned long key,
+ unsigned long size)
+{
+ struct sl_node *p;
+ struct sl_leaf *leaf;
+ int level;
+ struct sl_leaf *leaf_ret = NULL;
+ unsigned long max_key = 0;
+ unsigned long min_key = 0;
+
+again:
+ level = list->level;
+ p = list->head;
+ do {
+ while ((leaf = sl_next_leaf(list, p, level))) {
+ max_key = sl_max_key(leaf);
+ min_key = sl_min_key(leaf);
+
+ if (key >= max_key)
+ goto next;
+
+ if (key < min_key) {
+ smp_rmb();
+
+ /*
+ * we're about to stop walking. Make sure
+ * no new pointers have been inserted between
+ * p and our leaf
+ */
+ if (find_live_prev(list, &leaf->node, level) != p ||
+ sl_min_key(leaf) != min_key ||
+ p->dead ||
+ leaf->node.dead) {
+ goto again;
+ }
+ break;
+ }
+
+ if (key < max_key) {
+ leaf_ret = leaf;
+ goto done;
+ }
+next:
+ p = &leaf->node;
+ }
+ level--;
+ } while (level >= 0);
+
+done:
+ if (last)
+ *last = p;
+ return leaf_ret;
+}
+
+/*
+ * this lookup function expects RCU to protect the slots in the leaves
+ * as well as the skiplist indexing structures
+ *
+ * Note, you must call this with rcu_read_lock held, and you must verify
+ * the result yourself. If the key field of the returned slot doesn't
+ * match your key, repeat the lookup. Reference counting etc is also
+ * all your responsibility.
+ */
+struct sl_slot *skiplist_lookup_rcu(struct sl_list *list, unsigned long key,
+ unsigned long size)
+{
+ struct sl_leaf *leaf;
+ struct sl_slot *slot_ret = NULL;
+ int slot;
+ int ret;
+
+again:
+ leaf = skiplist_lookup_leaf(list, NULL, key, size);
+ if (leaf) {
+ ret = leaf_slot(leaf, key, size, &slot);
+ if (ret == 0)
+ slot_ret = rcu_dereference(leaf->ptrs[slot]);
+ else if (!verify_key_in_leaf(leaf, key, size))
+ goto again;
+ }
+ return slot_ret;
+
+}
+EXPORT_SYMBOL(skiplist_lookup_rcu);
+
+/*
+ * this lookup function only uses RCU to protect the skiplist indexing
+ * structs. The actual slots are protected by full locks.
+ */
+struct sl_slot *skiplist_lookup(struct sl_list *list, unsigned long key,
+ unsigned long size)
+{
+ struct sl_leaf *leaf;
+ struct sl_slot *slot_ret = NULL;
+ struct sl_node *p;
+ int slot;
+ int ret;
+
+again:
+ rcu_read_lock();
+ leaf = skiplist_lookup_leaf(list, &p, key, size);
+ if (leaf) {
+ sl_lock_node(&leaf->node);
+ if (!verify_key_in_leaf(leaf, key, size)) {
+ sl_unlock_node(&leaf->node);
+ rcu_read_unlock();
+ goto again;
+ }
+ ret = leaf_slot_locked(leaf, key, size, &slot);
+ if (ret == 0)
+ slot_ret = leaf->ptrs[slot];
+ sl_unlock_node(&leaf->node);
+ }
+ rcu_read_unlock();
+ return slot_ret;
+
+}
+EXPORT_SYMBOL(skiplist_lookup);
+
+/* helper for skiplist_insert_hole. the iommu requires alignment */
+static unsigned long align_start(unsigned long val, unsigned long align)
+{
+ return (val + align - 1) & ~(align - 1);
+}
+
+/*
+ * this is pretty ugly, but it is used to find a free spot in the
+ * tree for a new iommu allocation. We start from a given
+ * hint and try to find an aligned range of a given size.
+ *
+ * Send the slot pointer, and we'll update it with the location
+ * we found.
+ *
+ * This will return -EAGAIN if we found a good spot but someone
+ * raced in and allocated it before we could. This gives the
+ * caller the chance to update their hint.
+ *
+ * This will return -EEXIST if we couldn't find anything at all
+ *
+ * returns 0 if all went well, or some other negative error
+ * if things went badly.
+ */
+int skiplist_insert_hole(struct sl_list *list, unsigned long hint,
+ unsigned long limit,
+ unsigned long size, unsigned long align,
+ struct sl_slot *slot,
+ gfp_t gfp_mask)
+{
+ unsigned long last_end = 0;
+ struct sl_node *p;
+ struct sl_leaf *leaf;
+ int i;
+ int ret = -EEXIST;
+ int preload_token;
+ int pending_level;
+
+ preload_token = skiplist_preload(list, gfp_mask);
+ if (preload_token < 0) {
+ return preload_token;
+ }
+ pending_level = pending_insert_level(preload_token);
+
+ /* step one, lets find our hint */
+ rcu_read_lock();
+again:
+
+ last_end = max(last_end, hint);
+ last_end = align_start(last_end, align);
+ slot->key = align_start(hint, align);
+ slot->size = size;
+ leaf = skiplist_lookup_leaf(list, &p, hint, 1);
+ if (!p)
+ p = list->head;
+
+ if (leaf && !verify_key_in_leaf(leaf, hint, size)) {
+ goto again;
+ }
+
+again_lock:
+ sl_lock_node(p);
+ if (p->dead) {
+ sl_unlock_node(p);
+ goto again;
+ }
+
+ if (p != list->head) {
+ leaf = sl_entry(p);
+ /*
+ * the leaf we found was past the hint,
+ * go back one
+ */
+ if (sl_max_key(leaf) > hint) {
+ struct sl_node *locked = p;
+ p = p->ptrs[0].prev;
+ sl_unlock_node(locked);
+ goto again_lock;
+ }
+ last_end = align_start(sl_max_key(sl_entry(p)), align);
+ }
+
+ /*
+ * now walk at level 0 and find a hole. We could use lockless walks
+ * if we wanted to bang more on the insertion code, but this
+ * instead holds the lock on each node as we inspect it
+ *
+ * This is a little sloppy, insert will return -eexist if we get it
+ * wrong.
+ */
+ while(1) {
+ leaf = sl_next_leaf(list, p, 0);
+ if (!leaf)
+ break;
+
+ /* p and leaf are locked */
+ sl_lock_node(&leaf->node);
+ if (last_end > sl_max_key(leaf))
+ goto next;
+
+ for (i = 0; i < leaf->nr; i++) {
+ if (last_end > leaf->keys[i])
+ continue;
+ if (leaf->keys[i] - last_end >= size) {
+
+ if (last_end + size > limit) {
+ sl_unlock_node(&leaf->node);
+ goto out_rcu;
+ }
+
+ sl_unlock_node(p);
+ slot->key = last_end;
+ slot->size = size;
+ goto try_insert;
+ }
+ last_end = leaf->keys[i] + leaf->ptrs[i]->size;
+ last_end = align_start(last_end, align);
+ if (last_end + size > limit) {
+ sl_unlock_node(&leaf->node);
+ goto out_rcu;
+ }
+ }
+next:
+ sl_unlock_node(p);
+ p = &leaf->node;
+ }
+
+ if (last_end + size <= limit) {
+ sl_unlock_node(p);
+ slot->key = last_end;
+ slot->size = size;
+ goto try_insert;
+ }
+
+out_rcu:
+ /* we've failed */
+ sl_unlock_node(p);
+ rcu_read_unlock();
+ preempt_enable();
+
+ return ret;
+
+try_insert:
+ /*
+ * if the pending_level is zero or there is room in the
+ * leaf, we're ready to insert. This is true most of the
+ * time, and we won't have to drop our lock and give others
+ * the chance to race in and steal our spot.
+ */
+ if (leaf && (pending_level == 0 || leaf->nr < SKIP_KEYS_PER_NODE) &&
+ !leaf->node.dead && (slot->key >= sl_min_key(leaf) &&
+ slot->key + slot->size <= sl_max_key(leaf))) {
+ /* pass null for a cursor, it won't get used */
+ ret = find_or_add_key(list, slot->key, size, leaf, slot,
+ NULL, preload_token);
+ sl_unlock_node(&leaf->node);
+ rcu_read_unlock();
+ goto out;
+ }
+ /*
+ * no such luck, drop our lock and try the insert the
+ * old fashioned way
+ */
+ if (leaf)
+ sl_unlock_node(&leaf->node);
+
+ rcu_read_unlock();
+ ret = skiplist_insert(list, slot, preload_token);
+
+out:
+ /*
+ * if we get an EEXIST here, it just means we lost the race.
+ * return eagain to the caller so they can update the hint
+ */
+ if (ret == -EEXIST)
+ ret = -EAGAIN;
+
+ preempt_enable();
+ return ret;
+}
+EXPORT_SYMBOL(skiplist_insert_hole);
+
+/*
+ * we erase one level at a time, from top to bottom.
+ * The basic idea is to find a live prev and next pair,
+ * and make them point to each other.
+ *
+ * For a given level, this takes locks on prev, node, next
+ * makes sure they all point to each other and then
+ * removes node from the middle.
+ *
+ * The node must already be marked dead and it must already
+ * be empty.
+ */
+static void erase_one_level(struct sl_list *list,
+ struct sl_node *node, int level)
+{
+ struct sl_node *prev;
+ struct sl_node *next;
+ struct sl_node *test_prev;
+ struct sl_node *test_next;
+
+again:
+ prev = find_live_prev(list, node, level);
+ sl_lock_node(prev);
+ sl_lock_node(node);
+
+ test_prev = find_live_prev(list, node, level);
+ if (test_prev != prev || prev->dead) {
+ sl_unlock_node(prev);
+ sl_unlock_node(node);
+ goto again;
+ }
+
+again_next:
+ next = find_live_next(list, prev, level);
+ if (next) {
+ sl_lock_node(next);
+ test_next = find_live_next(list, prev, level);
+ if (test_next != next || next->dead) {
+ sl_unlock_node(next);
+ goto again_next;
+ }
+ test_prev = find_live_prev(list, next, level);
+ test_next = find_live_next(list, prev, level);
+ if (test_prev != prev || test_next != next) {
+ sl_unlock_node(prev);
+ sl_unlock_node(node);
+ sl_unlock_node(next);
+ goto again;
+ }
+ } else {
+ test_next = find_live_next(list, prev, level);
+ if (test_next) {
+ sl_unlock_node(prev);
+ sl_unlock_node(node);
+ goto again;
+ }
+ }
+ rcu_assign_pointer(prev->ptrs[level].next, next);
+ if (next)
+ rcu_assign_pointer(next->ptrs[level].prev, prev);
+ else if (prev != list->head)
+ rcu_assign_pointer(list->head->ptrs[level].prev, prev);
+ else
+ rcu_assign_pointer(list->head->ptrs[level].prev, NULL);
+
+ sl_unlock_node(prev);
+ sl_unlock_node(node);
+ sl_unlock_node(next);
+}
+
+void sl_erase(struct sl_list *list, struct sl_leaf *leaf)
+{
+ int i;
+ int level = leaf->node.level;
+
+ for (i = level; i >= 0; i--)
+ erase_one_level(list, &leaf->node, i);
+}
+
+/*
+ * helper for skiplist_delete, this pushes pointers
+ * around to remove a single slot
+ */
+static void delete_slot(struct sl_leaf *leaf, int slot)
+{
+ if (slot != leaf->nr - 1) {
+ int i;
+ for (i = slot; i <= leaf->nr - 1; i++) {
+ leaf->keys[i] = leaf->keys[i + 1];
+ leaf->ptrs[i] = leaf->ptrs[i + 1];
+ smp_wmb();
+ }
+ } else if (leaf->nr > 1) {
+ leaf->max = leaf->keys[leaf->nr - 2] +
+ leaf->ptrs[leaf->nr - 2]->size;
+ smp_wmb();
+ }
+ leaf->nr--;
+}
+
+/*
+ * find a given [key, size] in the skiplist and remove it.
+ * If we find anything, we return the slot pointer that
+ * was stored in the tree.
+ *
+ * deletion involves a mostly lockless lookup to
+ * find the right leaf. Then we take the lock and find the
+ * correct slot.
+ *
+ * The slot is removed from the leaf, and if the leaf
+ * is now empty, it is removed from the skiplist.
+ *
+ * FIXME -- merge mostly empty leaves.
+ */
+struct sl_slot *skiplist_delete(struct sl_list *list,
+ unsigned long key,
+ unsigned long size)
+{
+ struct sl_slot *slot_ret = NULL;
+ struct sl_leaf *leaf;
+ int slot;
+ int ret;
+
+ rcu_read_lock();
+again:
+ leaf = skiplist_lookup_leaf(list, NULL, key, size);
+ if (!leaf)
+ goto out;
+
+ sl_lock_node(&leaf->node);
+ if (!verify_key_in_leaf(leaf, key, size)) {
+ sl_unlock_node(&leaf->node);
+ goto again;
+ }
+
+ ret = leaf_slot_locked(leaf, key, size, &slot);
+ if (ret == 0) {
+ slot_ret = leaf->ptrs[slot];
+ } else {
+ sl_unlock_node(&leaf->node);
+ goto out;
+ }
+
+ delete_slot(leaf, slot);
+ if (leaf->nr == 0) {
+ /*
+ * sl_erase has to mess wit the prev pointers, so
+ * we need to unlock it here
+ */
+ leaf->node.dead = 1;
+ sl_unlock_node(&leaf->node);
+ sl_erase(list, leaf);
+ sl_free_leaf(leaf);
+ } else {
+ sl_unlock_node(&leaf->node);
+ }
+out:
+ rcu_read_unlock();
+ return slot_ret;
+}
+EXPORT_SYMBOL(skiplist_delete);
+
+int sl_init_list(struct sl_list *list, gfp_t mask)
+{
+ int i;
+
+ list->head = kmalloc(sl_node_size(SKIP_MAXLEVEL), mask);
+ if (!list->head)
+ return -ENOMEM;
+ sl_init_node(list->head, SKIP_MAXLEVEL);
+ list->level = 0;
+ spin_lock_init(&list->lock);
+
+ for (i = 0; i < SKIP_MAXLEVEL; i++) {
+ list->head->ptrs[i].next = NULL;
+ list->head->ptrs[i].prev = NULL;
+ }
+ return 0;
+}
+EXPORT_SYMBOL(sl_init_list);
+
+
+static int skiplist_callback(struct notifier_block *nfb,
+ unsigned long action,
+ void *hcpu)
+{
+ int cpu = (long)hcpu;
+ struct skip_preload *skp;
+ struct sl_leaf *l;
+ int level;
+ int i;
+
+ /* Free per-cpu pool of preloaded nodes */
+ if (action == CPU_DEAD || action == CPU_DEAD_FROZEN) {
+ skp = &per_cpu(skip_preloads, cpu);
+ for (i = 0; i < SKIP_MAXLEVEL + 1; i++) {
+ l = skp->preload[i];
+ if (!l)
+ continue;
+ level = l->node.level;
+ kmem_cache_free(slab_caches[level], l);
+ skp->preload[i] = NULL;
+ }
+ }
+ return NOTIFY_OK;
+}
+
+void __init skiplist_init(void)
+{
+ char buffer[16];
+ int i;
+
+ hotcpu_notifier(skiplist_callback, 0);
+ for (i = 0; i < SKIP_MAXLEVEL; i++) {
+ snprintf(buffer, 16, "skiplist-%d", i);
+ slab_caches[i] = kmem_cache_create(buffer,
+ sl_leaf_size(i), 0,
+ SLAB_RECLAIM_ACCOUNT | SLAB_MEM_SPREAD |
+ SLAB_DESTROY_BY_RCU,
+ NULL);
+ }
+}
+
diff --git a/lib/skiplist_test.c b/lib/skiplist_test.c
new file mode 100644
index 0000000..414b853
--- /dev/null
+++ b/lib/skiplist_test.c
@@ -0,0 +1,688 @@
+/*
+ * (C) 2013 Fusion-io
+ *
+ * This program is free software; you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software
+ * Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 59 Temple
+ * Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/slab.h>
+#include <linux/module.h>
+#include <linux/sched.h>
+#include <linux/moduleparam.h>
+#include <linux/skiplist.h>
+#include <linux/kthread.h>
+#include <linux/rbtree.h>
+#include <linux/random.h>
+
+static int threads = 1;
+static int rounds = 100;
+static int items = 100;
+static int module_exiting;
+static struct completion startup = COMPLETION_INITIALIZER(startup);
+static DEFINE_MUTEX(fill_mutex);
+static int filled;
+
+static struct timespec *times;
+
+
+#define FILL_TIME_INDEX 0
+#define CHECK_TIME_INDEX 1
+#define DEL_TIME_INDEX 2
+#define FIRST_THREAD_INDEX 3
+
+#define SKIPLIST_RCU_BENCH 1
+#define SKIPLIST_BENCH 2
+#define RBTREE_BENCH 3
+
+static int benchmark = SKIPLIST_RCU_BENCH;
+
+module_param(threads, int, 0);
+module_param(rounds, int, 0);
+module_param(items, int, 0);
+module_param(benchmark, int, 0);
+
+MODULE_PARM_DESC(threads, "number of threads to run");
+MODULE_PARM_DESC(rounds, "how many random operations to run");
+MODULE_PARM_DESC(items, "number of items to fill the list with");
+MODULE_PARM_DESC(benchmark, "benchmark to run 1=skiplist-rcu 2=skiplist-locking 3=rbtree");
+MODULE_LICENSE("GPL");
+
+static atomic_t threads_running = ATOMIC_INIT(0);
+
+/*
+ * since the skiplist code is more concurrent, it is also more likely to
+ * have races into the same slot during our delete/insert bashing run.
+ * This makes counts the number of delete/insert pairs done so we can
+ * make sure the results are roughly accurate
+ */
+static atomic_t pops_done = ATOMIC_INIT(0);
+
+static struct kmem_cache *slot_cache;
+struct sl_list skiplist;
+
+spinlock_t rbtree_lock;
+struct rb_root rb_root = RB_ROOT;
+
+struct rbtree_item {
+ struct rb_node rb_node;
+ unsigned long key;
+ unsigned long size;
+};
+
+static int __insert_one_rbtree(struct rb_root *root, struct rbtree_item *ins)
+{
+ struct rb_node **p = &root->rb_node;
+ struct rb_node *parent = NULL;
+ struct rbtree_item *item;
+ unsigned long key = ins->key;
+ int ret = -EEXIST;
+
+ while (*p) {
+ parent = *p;
+ item = rb_entry(parent, struct rbtree_item, rb_node);
+
+ if (key < item->key)
+ p = &(*p)->rb_left;
+ else if (key >= item->key + item->size)
+ p = &(*p)->rb_right;
+ else
+ goto out;
+ }
+
+ rb_link_node(&ins->rb_node, parent, p);
+ rb_insert_color(&ins->rb_node, root);
+ ret = 0;
+out:
+
+ return ret;
+}
+
+static int insert_one_rbtree(struct rb_root *root, unsigned long key,
+ unsigned long size)
+{
+ int ret;
+ struct rbtree_item *ins;
+
+ ins = kmalloc(sizeof(*ins), GFP_KERNEL);
+ ins->key = key;
+ ins->size = size;
+
+ spin_lock(&rbtree_lock);
+ ret = __insert_one_rbtree(root, ins);
+ spin_unlock(&rbtree_lock);
+
+ if (ret) {
+ printk(KERN_CRIT "err %d inserting rbtree key %lu\n", ret, key);
+ kfree(ins);
+ }
+ return ret;
+}
+
+static struct rbtree_item *__lookup_one_rbtree(struct rb_root *root, unsigned long key)
+{
+ struct rb_node *p = root->rb_node;
+ struct rbtree_item *item;
+ struct rbtree_item *ret;
+
+ while (p) {
+ item = rb_entry(p, struct rbtree_item, rb_node);
+
+ if (key < item->key)
+ p = p->rb_left;
+ else if (key >= item->key + item->size)
+ p = p->rb_right;
+ else {
+ ret = item;
+ goto out;
+ }
+ }
+
+ ret = NULL;
+out:
+ return ret;
+}
+
+static int lookup_one_rbtree(struct rb_root *root, unsigned long key)
+{
+ struct rbtree_item *item;
+ int ret;
+
+ spin_lock(&rbtree_lock);
+ item = __lookup_one_rbtree(root, key);
+ if (item)
+ ret = 0;
+ else
+ ret = -ENOENT;
+ spin_unlock(&rbtree_lock);
+
+ return ret;
+}
+
+static int pop_one_rbtree(struct rb_root *root, unsigned long key)
+{
+ int ret = 0;
+ struct rbtree_item *item;
+ struct rbtree_item **victims;
+ int nr_victims = 128;
+ int found = 0;
+ int loops = 0;
+ int i;
+
+ nr_victims = min(nr_victims, items / 2);
+
+ victims = kzalloc(nr_victims * sizeof(victims[0]), GFP_KERNEL);
+ /*
+ * this is intentionally deleting adjacent items to empty
+ * skiplist leaves. The goal is to find races between
+ * leaf deletion and the rest of the code
+ */
+ while (found < nr_victims && loops < 256) {
+ loops++;
+
+ spin_lock(&rbtree_lock);
+ item = __lookup_one_rbtree(root, key + loops * 4096);
+ if (item) {
+ victims[found] = item;
+ atomic_inc(&pops_done);
+ rb_erase(&item->rb_node, root);
+ found++;
+ }
+ spin_unlock(&rbtree_lock);
+ cond_resched();
+ }
+
+ for (i = 0; i < found; i++) {
+ item = victims[i];
+ spin_lock(&rbtree_lock);
+ ret = __insert_one_rbtree(root, item);
+ if (ret) {
+ printk(KERN_CRIT "pop_one unable to insert %lu\n", key);
+ kfree(item);
+ }
+ spin_unlock(&rbtree_lock);
+ cond_resched();
+ }
+ kfree(victims);
+ return ret;
+
+}
+
+static int run_initial_fill_rbtree(void)
+{
+ unsigned long i;
+ unsigned long key;
+ int ret;
+ int inserted = 0;
+
+ sl_init_list(&skiplist, GFP_KERNEL);
+
+ for (i = 0; i < items; i++) {
+ key = i * 4096;
+ ret = insert_one_rbtree(&rb_root, key, 4096);
+ if (ret)
+ return ret;
+ inserted++;
+ }
+ printk("rbtree inserted %d items\n", inserted);
+ return 0;
+}
+
+static void check_post_work_rbtree(void)
+{
+ unsigned long i;
+ unsigned long key;
+ int errors = 0;
+ int ret;
+
+ for (i = 0; i < items; i++) {
+ key = i * 4096;
+ ret = lookup_one_rbtree(&rb_root, key);
+ if (ret) {
+ printk("rbtree failed to find key %lu\n", key);
+ errors++;
+ }
+ cond_resched();
+ }
+ printk(KERN_CRIT "rbtree check found %d errors\n", errors);
+}
+
+static void delete_all_items_rbtree(void)
+{
+ unsigned long i;
+ unsigned long key;
+ int mid = items / 2;
+ int bounce;
+ struct rbtree_item *item;
+
+ for (i = 0; i < mid; i++) {
+ bounce = 0;
+ key = i * 4096;
+again:
+ spin_lock(&rbtree_lock);
+ item = __lookup_one_rbtree(&rb_root, key);
+ if (!item)
+ printk(KERN_CRIT "delete_all unable to find %lu\n", key);
+ rb_erase(&item->rb_node, &rb_root);
+ spin_unlock(&rbtree_lock);
+ kfree(item);
+
+ if (!bounce) {
+ key = (items - 1 - i) * 4096;
+ bounce = 1;
+ goto again;
+ }
+ }
+}
+
+static int insert_one_skiplist(struct sl_list *skiplist, unsigned long key,
+ unsigned long size)
+{
+ int ret;
+ int preload_token;
+ struct sl_slot *slot;
+
+ slot = kmem_cache_alloc(slot_cache, GFP_KERNEL);
+ if (!slot)
+ return -ENOMEM;
+
+ slot->key = key;
+ slot->size = size;
+
+ preload_token = skiplist_preload(skiplist, GFP_KERNEL);
+ if (preload_token < 0) {
+ ret = preload_token;
+ goto out;
+ }
+
+ ret = skiplist_insert(skiplist, slot, preload_token);
+ preempt_enable();
+
+out:
+ if (ret)
+ kmem_cache_free(slot_cache, slot);
+
+ return ret;
+}
+
+static unsigned long tester_random(void)
+{
+ return prandom_u32();
+}
+
+static int run_initial_fill_skiplist(void)
+{
+ unsigned long i;
+ unsigned long key;
+ int ret;
+ int inserted = 0;
+
+ sl_init_list(&skiplist, GFP_KERNEL);
+
+ for (i = 0; i < items; i++) {
+ key = i * 4096;
+ ret = insert_one_skiplist(&skiplist, key, 4096);
+ if (ret)
+ return ret;
+ inserted++;
+ }
+ printk("skiplist inserted %d items\n", inserted);
+ return 0;
+}
+
+static void check_post_work_skiplist(void)
+{
+ unsigned long i;
+ unsigned long key;
+ struct sl_slot *slot;
+ int errors = 0;
+
+ for (i = 0; i < items; i++) {
+ key = i * 4096;
+ if (benchmark == SKIPLIST_RCU_BENCH) {
+ rcu_read_lock();
+again:
+ slot = skiplist_lookup_rcu(&skiplist, key + 64, 512);
+ if (slot && slot->key != key) {
+ goto again;
+ }
+ rcu_read_unlock();
+ } else {
+ slot = skiplist_lookup(&skiplist, key + 64, 512);
+ }
+
+ if (!slot) {
+ printk("failed to find key %lu\n", key);
+ errors++;
+ } else if (slot->key != key) {
+ errors++;
+ printk("key mismatch wanted %lu found %lu\n", key, slot->key);
+ }
+ cond_resched();
+ }
+ printk(KERN_CRIT "skiplist check found %d errors\n", errors);
+}
+
+static void verify_post_work_skiplist(void)
+{
+ unsigned long i;
+ unsigned long key = 0;
+ struct sl_slot *slot;
+ struct sl_node *node = skiplist.head->ptrs[0].next;
+ struct sl_leaf *leaf;
+ int found = 0;
+
+ while (node) {
+ leaf = sl_entry(node);
+ for (i = 0; i < leaf->nr; i++) {
+ slot = leaf->ptrs[i];
+ if (slot->key != key) {
+ printk(KERN_CRIT "found bad key %lu wanted %lu\n", slot->key, key);
+ }
+ key += slot->size;
+ }
+ found += leaf->nr;
+ node = node->ptrs[0].next;
+ }
+ if (found != items) {
+ printk(KERN_CRIT "skiplist check found only %d items instead of %d\n", found, items);
+ } else {
+ printk(KERN_CRIT "skiplist verify passed\n");
+ }
+}
+
+static void delete_all_items_skiplist(void)
+{
+ unsigned long i;
+ unsigned long key;
+ struct sl_slot *slot;
+ int errors = 0;
+ int mid = items / 2;
+ int bounce;
+
+ for (i = 0; i < mid; i++) {
+ bounce = 0;
+ key = i * 4096;
+again:
+ slot = skiplist_delete(&skiplist, key + 512, 1);
+ if (!slot) {
+ printk("missing key %lu\n", key);
+ } else if (slot->key != key) {
+ errors++;
+ printk("key mismatch wanted %lu found %lu\n", key, slot->key);
+ }
+ kfree(slot);
+ if (!bounce) {
+ key = (items - 1 - i) * 4096;
+ bounce = 1;
+ goto again;
+ }
+ }
+ printk(KERN_CRIT "skiplist deletion done\n");
+}
+
+static int lookup_one_skiplist(struct sl_list *skiplist, unsigned long key)
+{
+ int ret = 0;
+ struct sl_slot *slot;
+
+ if (benchmark == SKIPLIST_RCU_BENCH)
+ slot = skiplist_lookup_rcu(skiplist, key, 4096);
+ else if (benchmark == SKIPLIST_BENCH)
+ slot = skiplist_lookup(skiplist, key, 4096);
+ if (!slot)
+ ret = -ENOENT;
+ return ret;
+}
+
+static int pop_one_skiplist(struct sl_list *skiplist, unsigned long key)
+{
+ int ret = 0;
+ int preload_token;
+ struct sl_slot *slot;
+ struct sl_slot **victims;
+ int nr_victims = 128;
+ int found = 0;
+ int loops = 0;
+ int i;
+
+ nr_victims = min(nr_victims, items / 2);
+
+ victims = kzalloc(nr_victims * sizeof(victims[0]), GFP_KERNEL);
+ /*
+ * this is intentionally deleting adjacent items to empty
+ * skiplist leaves. The goal is to find races between
+ * leaf deletion and the rest of the code
+ */
+ while (found < nr_victims && loops < 256) {
+ loops++;
+ slot = skiplist_delete(skiplist, key + loops * 4096, 1024);
+ if (!slot)
+ continue;
+
+ victims[found] = slot;
+ atomic_inc(&pops_done);
+ found++;
+ cond_resched();
+ }
+ for (i = 0; i < found; i++) {
+ preload_token = skiplist_preload(skiplist, GFP_KERNEL);
+ if (preload_token < 0) {
+ ret = preload_token;
+ goto out;
+ }
+
+ ret = skiplist_insert(skiplist, victims[i], preload_token);
+ if (ret) {
+ printk(KERN_CRIT "failed to insert key %lu ret %d\n", key, ret);
+ preempt_enable();
+ goto out;
+ }
+ ret = 0;
+ preempt_enable();
+ cond_resched();
+ }
+
+out:
+ kfree(victims);
+ return ret;
+}
+
+void tvsub(struct timespec * tdiff, struct timespec * t1, struct timespec * t0)
+{
+ tdiff->tv_sec = t1->tv_sec - t0->tv_sec;
+ tdiff->tv_nsec = t1->tv_nsec - t0->tv_nsec;
+ if (tdiff->tv_nsec < 0 && tdiff->tv_sec > 0) {
+ tdiff->tv_sec--;
+ tdiff->tv_nsec += 1000000000ULL;
+ }
+
+ /* time shouldn't go backwards!!! */
+ if (tdiff->tv_nsec < 0 || t1->tv_sec < t0->tv_sec) {
+ tdiff->tv_sec = 0;
+ tdiff->tv_nsec = 0;
+ }
+}
+
+static void pretty_time(struct timespec *ts, unsigned long long *seconds, unsigned long long *ms)
+{
+ unsigned long long m;
+
+ *seconds = ts->tv_sec;
+
+ m = ts->tv_nsec / 1000000ULL;
+ *ms = m;
+}
+
+static void runbench(int thread_index)
+{
+ int ret = 0;
+ unsigned long i;
+ unsigned long op;
+ unsigned long key;
+ struct timespec start;
+ struct timespec cur;
+ unsigned long long sec;
+ unsigned long long ms;
+ char *tag = "skiplist-rcu";
+
+ if (benchmark == SKIPLIST_BENCH)
+ tag = "skiplist-locking";
+ else if (benchmark == RBTREE_BENCH)
+ tag = "rbtree";
+
+ mutex_lock(&fill_mutex);
+
+ if (filled == 0) {
+ start = current_kernel_time();
+
+ printk(KERN_CRIT "Running %s benchmark\n", tag);
+
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH)
+ ret = run_initial_fill_skiplist();
+ else if (benchmark == RBTREE_BENCH)
+ ret = run_initial_fill_rbtree();
+
+ if (ret < 0) {
+ printk(KERN_CRIT "failed to setup initial tree ret %d\n", ret);
+ filled = ret;
+ } else {
+ filled = 1;
+ }
+ cur = current_kernel_time();
+ tvsub(times + FILL_TIME_INDEX, &cur, &start);
+ }
+
+ mutex_unlock(&fill_mutex);
+ if (filled < 0)
+ return;
+
+ start = current_kernel_time();
+
+ for (i = 0; i < rounds; i++) {
+ op = tester_random();
+ key = op % items;
+ key *= 4096;
+ if (op % 2 == 0) {
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH)
+ ret = lookup_one_skiplist(&skiplist, key);
+ else if (benchmark == RBTREE_BENCH)
+ ret = lookup_one_rbtree(&rb_root, key);
+ }
+ if (op % 3 == 0) {
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH)
+ ret = pop_one_skiplist(&skiplist, key);
+ else if (benchmark == RBTREE_BENCH)
+ ret = pop_one_rbtree(&rb_root, key);
+ }
+ cond_resched();
+ }
+
+ cur = current_kernel_time();
+ tvsub(times + FIRST_THREAD_INDEX + thread_index, &cur, &start);
+
+ if (!atomic_dec_and_test(&threads_running)) {
+ return;
+ }
+
+ start = current_kernel_time();
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH)
+ check_post_work_skiplist();
+ else if (benchmark == RBTREE_BENCH)
+ check_post_work_rbtree();
+
+ cur = current_kernel_time();
+
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH)
+ verify_post_work_skiplist();
+
+ tvsub(times + CHECK_TIME_INDEX, &cur, &start);
+
+ start = current_kernel_time();
+ if (benchmark == SKIPLIST_RCU_BENCH || benchmark == SKIPLIST_BENCH)
+ delete_all_items_skiplist();
+ else if (benchmark == RBTREE_BENCH)
+ delete_all_items_rbtree();
+ cur = current_kernel_time();
+
+ tvsub(times + DEL_TIME_INDEX, &cur, &start);
+
+ pretty_time(×[FILL_TIME_INDEX], &sec, &ms);
+ printk("%s fill time %llu s %llu ms\n", tag, sec, ms);
+ pretty_time(×[CHECK_TIME_INDEX], &sec, &ms);
+ printk("%s check time %llu s %llu ms\n", tag, sec, ms);
+ pretty_time(×[DEL_TIME_INDEX], &sec, &ms);
+ printk("%s del time %llu s %llu ms \n", tag, sec, ms);
+ for (i = 0; i < threads; i++) {
+ pretty_time(×[FIRST_THREAD_INDEX + i], &sec, &ms);
+ printk("%s thread %lu time %llu s %llu ms\n", tag, i, sec, ms);
+ }
+
+ printk("worker thread pops done %d\n", atomic_read(&pops_done));
+
+ kfree(times);
+}
+
+static int skiptest_thread(void *index)
+{
+ unsigned long thread_index = (unsigned long)index;
+ complete(&startup);
+ runbench(thread_index);
+ complete(&startup);
+ return 0;
+}
+
+static int __init skiptest_init(void)
+{
+ unsigned long i;
+ init_completion(&startup);
+ slot_cache = kmem_cache_create("skiplist_slot", sizeof(struct sl_slot), 0,
+ SLAB_RECLAIM_ACCOUNT | SLAB_MEM_SPREAD |
+ SLAB_DESTROY_BY_RCU, NULL);
+
+ if (!slot_cache)
+ return -ENOMEM;
+
+ spin_lock_init(&rbtree_lock);
+
+ printk("skiptest benchmark module (%d threads) (%d items) (%d rounds)\n", threads, items, rounds);
+
+ times = kmalloc(sizeof(times[0]) * (threads + 3), GFP_KERNEL);
+
+ atomic_set(&threads_running, threads);
+ for (i = 0; i < threads; i++) {
+ kthread_run(skiptest_thread, (void *)i, "skiptest_thread");
+ }
+ for (i = 0; i < threads; i++)
+ wait_for_completion(&startup);
+ return 0;
+}
+
+static void __exit skiptest_exit(void)
+{
+ int i;
+ module_exiting = 1;
+
+ for (i = 0; i < threads; i++) {
+ wait_for_completion(&startup);
+ }
+
+ synchronize_rcu();
+ kmem_cache_destroy(slot_cache);
+ printk("all skiptest threads done\n");
+ return;
+}
+
+module_init(skiptest_init);
+module_exit(skiptest_exit);
--
1.8.2
^ permalink raw reply related [flat|nested] 12+ messages in thread
* [PATCH RFC 2/2] skiplists for the IOMMU
2013-05-03 2:02 [PATCH RFC 0/2] skiplists for range indexes Chris Mason
2013-05-03 2:06 ` [PATCH RFC 1/2] core skiplist code Chris Mason
@ 2013-05-03 2:10 ` Chris Mason
2013-05-03 9:19 ` [PATCH RFC 0/2] skiplists for range indexes Jan Kara
2 siblings, 0 replies; 12+ messages in thread
From: Chris Mason @ 2013-05-03 2:10 UTC (permalink / raw)
To: Linux FS Devel, David Woodhouse, dchinner@redhat.com, bo.li.liu
This is only for experimental use, and it is only lightly tested.
diff --git a/drivers/iommu/intel-iommu.c b/drivers/iommu/intel-iommu.c
index 0099667..32d2920 100644
--- a/drivers/iommu/intel-iommu.c
+++ b/drivers/iommu/intel-iommu.c
@@ -1403,7 +1403,6 @@ static void iommu_detach_domain(struct dmar_domain *domain,
}
static struct iova_domain reserved_iova_list;
-static struct lock_class_key reserved_rbtree_key;
static int dmar_init_reserved_ranges(void)
{
@@ -1413,9 +1412,6 @@ static int dmar_init_reserved_ranges(void)
init_iova_domain(&reserved_iova_list, DMA_32BIT_PFN);
- lockdep_set_class(&reserved_iova_list.iova_rbtree_lock,
- &reserved_rbtree_key);
-
/* IOAPIC ranges shouldn't be accessed by DMA */
iova = reserve_iova(&reserved_iova_list, IOVA_PFN(IOAPIC_RANGE_START),
IOVA_PFN(IOAPIC_RANGE_END));
diff --git a/drivers/iommu/iova.c b/drivers/iommu/iova.c
index 67da6cff..efd5518 100644
--- a/drivers/iommu/iova.c
+++ b/drivers/iommu/iova.c
@@ -18,182 +18,113 @@
*/
#include <linux/iova.h>
+#include <linux/random.h>
+#define IOVA_PENDING_INIT ((unsigned long) -1)
void
init_iova_domain(struct iova_domain *iovad, unsigned long pfn_32bit)
{
- spin_lock_init(&iovad->iova_rbtree_lock);
- iovad->rbroot = RB_ROOT;
- iovad->cached32_node = NULL;
+ spin_lock_init(&iovad->del_skiplist_lock);
+ spin_lock_init(&iovad->skiplist_lock);
+ sl_init_list(&iovad->skiplist, GFP_ATOMIC);
iovad->dma_32bit_pfn = pfn_32bit;
}
-static struct rb_node *
-__get_cached_rbnode(struct iova_domain *iovad, unsigned long *limit_pfn)
+static unsigned long
+__get_cached_addr(struct iova_domain *iovad, unsigned long limit_pfn)
{
- if ((*limit_pfn != iovad->dma_32bit_pfn) ||
- (iovad->cached32_node == NULL))
- return rb_last(&iovad->rbroot);
- else {
- struct rb_node *prev_node = rb_prev(iovad->cached32_node);
- struct iova *curr_iova =
- container_of(iovad->cached32_node, struct iova, node);
- *limit_pfn = curr_iova->pfn_lo - 1;
- return prev_node;
- }
-}
-
-static void
-__cached_rbnode_insert_update(struct iova_domain *iovad,
- unsigned long limit_pfn, struct iova *new)
-{
- if (limit_pfn != iovad->dma_32bit_pfn)
- return;
- iovad->cached32_node = &new->node;
-}
+ unsigned long guess;
-static void
-__cached_rbnode_delete_update(struct iova_domain *iovad, struct iova *free)
-{
- struct iova *cached_iova;
- struct rb_node *curr;
+ prandom_bytes(&guess, sizeof(guess));
- if (!iovad->cached32_node)
- return;
- curr = iovad->cached32_node;
- cached_iova = container_of(curr, struct iova, node);
-
- if (free->pfn_lo >= cached_iova->pfn_lo) {
- struct rb_node *node = rb_next(&free->node);
- struct iova *iova = container_of(node, struct iova, node);
-
- /* only cache if it's below 32bit pfn */
- if (node && iova->pfn_lo < iovad->dma_32bit_pfn)
- iovad->cached32_node = node;
- else
- iovad->cached32_node = NULL;
+ /* the skiplist code is fastest when we spread out the
+ * key range as much as possible. Instead of caching the
+ * last freed or allocated number, return random guesses
+ * in hopes of fanning out our locking attempts.
+ */
+ if (limit_pfn == iovad->dma_32bit_pfn) {
+ guess = guess % iovad->dma_32bit_pfn;
+ guess = max_t(unsigned long, guess, IOVA_START_PFN);
+ return guess;
+ } else {
+ guess = max_t(unsigned long, guess, IOVA_START_PFN);
+ guess = min_t(unsigned long, guess, (~0UL) >> 1);
+ return guess;
}
}
-/* Computes the padding size required, to make the
- * the start address naturally aligned on its size
- */
-static int
-iova_get_pad_size(int size, unsigned int limit_pfn)
-{
- unsigned int pad_size = 0;
- unsigned int order = ilog2(size);
-
- if (order)
- pad_size = (limit_pfn + 1) % (1 << order);
-
- return pad_size;
-}
-
static int __alloc_and_insert_iova_range(struct iova_domain *iovad,
unsigned long size, unsigned long limit_pfn,
struct iova *new, bool size_aligned)
{
- struct rb_node *prev, *curr = NULL;
unsigned long flags;
- unsigned long saved_pfn;
- unsigned int pad_size = 0;
-
- /* Walk the tree backwards */
- spin_lock_irqsave(&iovad->iova_rbtree_lock, flags);
- saved_pfn = limit_pfn;
- curr = __get_cached_rbnode(iovad, &limit_pfn);
- prev = curr;
- while (curr) {
- struct iova *curr_iova = container_of(curr, struct iova, node);
-
- if (limit_pfn < curr_iova->pfn_lo)
- goto move_left;
- else if (limit_pfn < curr_iova->pfn_hi)
- goto adjust_limit_pfn;
- else {
- if (size_aligned)
- pad_size = iova_get_pad_size(size, limit_pfn);
- if ((curr_iova->pfn_hi + size + pad_size) <= limit_pfn)
- break; /* found a free slot */
- }
-adjust_limit_pfn:
- limit_pfn = curr_iova->pfn_lo - 1;
-move_left:
- prev = curr;
- curr = rb_prev(curr);
- }
+ unsigned long align = 1;
+ unsigned long hint;
- if (!curr) {
- if (size_aligned)
- pad_size = iova_get_pad_size(size, limit_pfn);
- if ((IOVA_START_PFN + size + pad_size) > limit_pfn) {
- spin_unlock_irqrestore(&iovad->iova_rbtree_lock, flags);
- return -ENOMEM;
- }
- }
+ int ret;
- /* pfn_lo will point to size aligned address if size_aligned is set */
- new->pfn_lo = limit_pfn - (size + pad_size) + 1;
- new->pfn_hi = new->pfn_lo + size - 1;
-
- /* Insert the new_iova into domain rbtree by holding writer lock */
- /* Add new node and rebalance tree. */
- {
- struct rb_node **entry, *parent = NULL;
-
- /* If we have 'prev', it's a valid place to start the
- insertion. Otherwise, start from the root. */
- if (prev)
- entry = &prev;
- else
- entry = &iovad->rbroot.rb_node;
-
- /* Figure out where to put new node */
- while (*entry) {
- struct iova *this = container_of(*entry,
- struct iova, node);
- parent = *entry;
-
- if (new->pfn_lo < this->pfn_lo)
- entry = &((*entry)->rb_left);
- else if (new->pfn_lo > this->pfn_lo)
- entry = &((*entry)->rb_right);
- else
- BUG(); /* this should not happen */
- }
+ if (size_aligned)
+ align = size;
- /* Add new node and rebalance tree. */
- rb_link_node(&new->node, parent, entry);
- rb_insert_color(&new->node, &iovad->rbroot);
+ /*
+ * make sure that a lockless search into the tree
+ * understands we're still doing setup on this iova
+ */
+ new->pfn_lo = IOVA_PENDING_INIT;
+ new->pfn_hi = IOVA_PENDING_INIT;
+ smp_wmb();
+again:
+ local_irq_save(flags);
+ hint = __get_cached_addr(iovad, limit_pfn);
+ ret = skiplist_insert_hole(&iovad->skiplist,
+ hint,
+ limit_pfn, size, align,
+ &new->slot, GFP_ATOMIC);
+ /*
+ * insert hole returns -eagain when it found a good
+ * spot but someone raced in and stole it. If
+ * that happens pick a new hint and try again
+ */
+ if (ret == -EAGAIN) {
+ local_irq_restore(flags);
+ goto again;
}
- __cached_rbnode_insert_update(iovad, saved_pfn, new);
- spin_unlock_irqrestore(&iovad->iova_rbtree_lock, flags);
+ /* we're fully inserted, set our lo/hi */
+ new->pfn_lo = new->slot.key;
+ new->pfn_hi = new->slot.key + new->slot.size - 1;
+ smp_wmb();
+ local_irq_restore(flags);
+ if (ret)
+ return -ENOMEM;
return 0;
}
-static void
-iova_insert_rbtree(struct rb_root *root, struct iova *iova)
+static int
+iova_insert_skiplist(struct sl_list *skiplist, struct iova *iova)
{
- struct rb_node **new = &(root->rb_node), *parent = NULL;
- /* Figure out where to put new node */
- while (*new) {
- struct iova *this = container_of(*new, struct iova, node);
- parent = *new;
-
- if (iova->pfn_lo < this->pfn_lo)
- new = &((*new)->rb_left);
- else if (iova->pfn_lo > this->pfn_lo)
- new = &((*new)->rb_right);
- else
- BUG(); /* this should not happen */
+ int ret;
+ int preload_token;
+ unsigned long flags;
+
+ local_irq_save(flags);
+ preload_token = skiplist_preload(skiplist, GFP_ATOMIC);
+ if (preload_token < 0) {
+ ret = preload_token;
+ local_irq_restore(flags);
+ goto out;
}
- /* Add new node and rebalance tree. */
- rb_link_node(&iova->node, parent, new);
- rb_insert_color(&iova->node, root);
+
+ iova->slot.key = iova->pfn_lo;
+ iova->slot.size = iova->pfn_hi - iova->pfn_lo + 1;
+
+ ret = skiplist_insert(skiplist, &iova->slot, preload_token);
+ local_irq_restore(flags);
+ preempt_enable();
+out:
+ return ret;
}
/**
@@ -245,53 +176,24 @@ alloc_iova(struct iova_domain *iovad, unsigned long size,
*/
struct iova *find_iova(struct iova_domain *iovad, unsigned long pfn)
{
+ struct sl_slot *slot;
unsigned long flags;
- struct rb_node *node;
-
- /* Take the lock so that no other thread is manipulating the rbtree */
- spin_lock_irqsave(&iovad->iova_rbtree_lock, flags);
- node = iovad->rbroot.rb_node;
- while (node) {
- struct iova *iova = container_of(node, struct iova, node);
-
- /* If pfn falls within iova's range, return iova */
- if ((pfn >= iova->pfn_lo) && (pfn <= iova->pfn_hi)) {
- spin_unlock_irqrestore(&iovad->iova_rbtree_lock, flags);
- /* We are not holding the lock while this iova
- * is referenced by the caller as the same thread
- * which called this function also calls __free_iova()
- * and it is by design that only one thread can possibly
- * reference a particular iova and hence no conflict.
- */
- return iova;
- }
+ struct iova *iova;
- if (pfn < iova->pfn_lo)
- node = node->rb_left;
- else if (pfn > iova->pfn_lo)
- node = node->rb_right;
+ local_irq_save(flags);
+ slot = skiplist_lookup(&iovad->skiplist, pfn, 1);
+ if (!slot) {
+ local_irq_restore(flags);
+ return NULL;
}
-
- spin_unlock_irqrestore(&iovad->iova_rbtree_lock, flags);
- return NULL;
-}
-
-/**
- * __free_iova - frees the given iova
- * @iovad: iova domain in question.
- * @iova: iova in question.
- * Frees the given iova belonging to the giving domain
- */
-void
-__free_iova(struct iova_domain *iovad, struct iova *iova)
-{
- unsigned long flags;
-
- spin_lock_irqsave(&iovad->iova_rbtree_lock, flags);
- __cached_rbnode_delete_update(iovad, iova);
- rb_erase(&iova->node, &iovad->rbroot);
- spin_unlock_irqrestore(&iovad->iova_rbtree_lock, flags);
- free_iova_mem(iova);
+ iova = sl_slot_entry(slot, struct iova, slot);
+ while (iova->pfn_lo == IOVA_PENDING_INIT ||
+ iova->pfn_hi == IOVA_PENDING_INIT) {
+ cpu_relax();
+ smp_rmb();
+ }
+ local_irq_restore(flags);
+ return iova;
}
/**
@@ -304,12 +206,29 @@ __free_iova(struct iova_domain *iovad, struct iova *iova)
void
free_iova(struct iova_domain *iovad, unsigned long pfn)
{
- struct iova *iova = find_iova(iovad, pfn);
- if (iova)
- __free_iova(iovad, iova);
+ struct iova *iova;
+ struct sl_slot *slot;
+ unsigned long flags;
+
+ local_irq_save(flags);
+ slot = skiplist_delete(&iovad->skiplist, pfn, 1);
+ local_irq_restore(flags);
+
+ if (!slot)
+ return;
+ iova = sl_slot_entry(slot, struct iova, slot);
+ free_iova_mem(iova);
}
+void
+__free_iova(struct iova_domain *iovad, struct iova *iova)
+{
+ unsigned long pfn = iova->pfn_lo;
+ free_iova(iovad, pfn);
+}
+
+
/**
* put_iova_domain - destroys the iova doamin
* @iovad: - iova domain in question.
@@ -317,29 +236,37 @@ free_iova(struct iova_domain *iovad, unsigned long pfn)
*/
void put_iova_domain(struct iova_domain *iovad)
{
- struct rb_node *node;
+ struct sl_list *skiplist = &iovad->skiplist;
+ struct sl_node *p;
+ struct sl_leaf *leaf;
unsigned long flags;
+ struct iova *iova;
+ struct sl_slot *slot;
+ int i;
- spin_lock_irqsave(&iovad->iova_rbtree_lock, flags);
- node = rb_first(&iovad->rbroot);
- while (node) {
- struct iova *iova = container_of(node, struct iova, node);
- rb_erase(node, &iovad->rbroot);
- free_iova_mem(iova);
- node = rb_first(&iovad->rbroot);
+ /*
+ * the skiplist code needs some helpers for iteration. For now
+ * roll our own
+ */
+ local_irq_save(flags);
+ spin_lock(&iovad->skiplist_lock);
+ sl_lock_node(skiplist->head);
+ p = skiplist->head->ptrs[0].next;
+ while (p) {
+ leaf = sl_entry(p);
+ for (i = 0; i < leaf->nr; i++) {
+ slot = leaf->ptrs[i];
+ iova = sl_slot_entry(slot, struct iova, slot);
+ free_iova_mem(iova);
+ }
+ p = leaf->node.ptrs[0].next;
+ sl_free_leaf(leaf);
}
- spin_unlock_irqrestore(&iovad->iova_rbtree_lock, flags);
-}
-
-static int
-__is_range_overlap(struct rb_node *node,
- unsigned long pfn_lo, unsigned long pfn_hi)
-{
- struct iova *iova = container_of(node, struct iova, node);
-
- if ((pfn_lo <= iova->pfn_hi) && (pfn_hi >= iova->pfn_lo))
- return 1;
- return 0;
+ /* FIXME call a helper here */
+ memset(skiplist->head->ptrs, 0, sl_node_size(SKIP_MAXLEVEL));
+ sl_unlock_node(skiplist->head);
+ spin_unlock(&iovad->skiplist_lock);
+ local_irq_restore(flags);
}
static struct iova *
@@ -347,6 +274,7 @@ __insert_new_range(struct iova_domain *iovad,
unsigned long pfn_lo, unsigned long pfn_hi)
{
struct iova *iova;
+ int ret;
iova = alloc_iova_mem();
if (!iova)
@@ -354,18 +282,16 @@ __insert_new_range(struct iova_domain *iovad,
iova->pfn_hi = pfn_hi;
iova->pfn_lo = pfn_lo;
- iova_insert_rbtree(&iovad->rbroot, iova);
- return iova;
-}
+ ret = iova_insert_skiplist(&iovad->skiplist, iova);
-static void
-__adjust_overlap_range(struct iova *iova,
- unsigned long *pfn_lo, unsigned long *pfn_hi)
-{
- if (*pfn_lo < iova->pfn_lo)
- iova->pfn_lo = *pfn_lo;
- if (*pfn_hi > iova->pfn_hi)
- *pfn_lo = iova->pfn_hi + 1;
+ if (ret == -ENOMEM) {
+ free_iova_mem(iova);
+ return NULL;
+ } else {
+ BUG_ON(ret);
+ }
+
+ return iova;
}
/**
@@ -380,32 +306,42 @@ struct iova *
reserve_iova(struct iova_domain *iovad,
unsigned long pfn_lo, unsigned long pfn_hi)
{
- struct rb_node *node;
+ struct sl_slot *slot;
unsigned long flags;
- struct iova *iova;
- unsigned int overlap = 0;
-
- spin_lock_irqsave(&iovad->iova_rbtree_lock, flags);
- for (node = rb_first(&iovad->rbroot); node; node = rb_next(node)) {
- if (__is_range_overlap(node, pfn_lo, pfn_hi)) {
- iova = container_of(node, struct iova, node);
- __adjust_overlap_range(iova, &pfn_lo, &pfn_hi);
- if ((pfn_lo >= iova->pfn_lo) &&
- (pfn_hi <= iova->pfn_hi))
- goto finish;
- overlap = 1;
-
- } else if (overlap)
- break;
- }
-
- /* We are here either because this is the first reserver node
- * or need to insert remaining non overlap addr range
+ struct iova *iova = NULL;
+ struct iova *found = NULL;
+ unsigned long size = pfn_hi - pfn_lo + 1;
+ unsigned long min_pfn = pfn_lo;
+ unsigned long max_pfn = pfn_hi;
+
+ /*
+ * this is not locking safe. It only happens while there are no
+ * concurrent IO requrests (I hope!)
*/
- iova = __insert_new_range(iovad, pfn_lo, pfn_hi);
-finish:
-
- spin_unlock_irqrestore(&iovad->iova_rbtree_lock, flags);
+ local_irq_save(flags);
+ spin_lock(&iovad->skiplist_lock);
+ while(1) {
+ /*
+ * really ugly, just delete anything overlapping and
+ * reinsert the new full range
+ */
+ slot = skiplist_delete(&iovad->skiplist, pfn_lo, size);
+ if (!slot)
+ break;
+
+ found = sl_slot_entry(slot, struct iova, slot);
+ while (found->pfn_lo == IOVA_PENDING_INIT ||
+ found->pfn_hi == IOVA_PENDING_INIT) {
+ cpu_relax();
+ smp_rmb();
+ }
+ min_pfn = min(found->pfn_lo, min_pfn);
+ max_pfn = max(found->pfn_hi, max_pfn);
+ free_iova_mem(found);
+ }
+ iova = __insert_new_range(iovad, min_pfn, max_pfn);
+ spin_unlock(&iovad->skiplist_lock);
+ local_irq_restore(flags);
return iova;
}
@@ -419,17 +355,33 @@ finish:
void
copy_reserved_iova(struct iova_domain *from, struct iova_domain *to)
{
+ struct sl_node *p;
+ struct sl_leaf *leaf;
unsigned long flags;
- struct rb_node *node;
-
- spin_lock_irqsave(&from->iova_rbtree_lock, flags);
- for (node = rb_first(&from->rbroot); node; node = rb_next(node)) {
- struct iova *iova = container_of(node, struct iova, node);
- struct iova *new_iova;
- new_iova = reserve_iova(to, iova->pfn_lo, iova->pfn_hi);
- if (!new_iova)
- printk(KERN_ERR "Reserve iova range %lx@%lx failed\n",
- iova->pfn_lo, iova->pfn_lo);
+ struct iova *iova;
+ struct iova *new_iova;
+ struct sl_slot *slot;
+ int i;
+
+ /*
+ * this is not locking safe. It only happens while there are no
+ * concurrent IO requrests (I hope!)
+ */
+ local_irq_save(flags);
+ sl_lock_node(from->skiplist.head);
+ p = from->skiplist.head->ptrs[0].next;
+ while (p) {
+ leaf = sl_entry(p);
+ for (i = 0; i < leaf->nr; i++) {
+ slot = leaf->ptrs[i];
+ iova = sl_slot_entry(slot, struct iova, slot);
+ new_iova = reserve_iova(to, iova->pfn_lo, iova->pfn_hi);
+ if (!new_iova)
+ printk(KERN_ERR "Reserve iova range %lx@%lx failed\n",
+ iova->pfn_lo, iova->pfn_lo);
+ }
+ p = leaf->node.ptrs[0].next;
}
- spin_unlock_irqrestore(&from->iova_rbtree_lock, flags);
+ sl_unlock_node(from->skiplist.head);
+ local_irq_restore(flags);
}
diff --git a/include/linux/iova.h b/include/linux/iova.h
index 76a0759..b514c18 100644
--- a/include/linux/iova.h
+++ b/include/linux/iova.h
@@ -13,7 +13,7 @@
#include <linux/types.h>
#include <linux/kernel.h>
-#include <linux/rbtree.h>
+#include <linux/skiplist.h>
#include <linux/dma-mapping.h>
/* IO virtual address start page frame number */
@@ -21,16 +21,16 @@
/* iova structure */
struct iova {
- struct rb_node node;
+ struct sl_slot slot;
unsigned long pfn_hi; /* IOMMU dish out addr hi */
unsigned long pfn_lo; /* IOMMU dish out addr lo */
};
/* holds all the iova translations for a domain */
struct iova_domain {
- spinlock_t iova_rbtree_lock; /* Lock to protect update of rbtree */
- struct rb_root rbroot; /* iova domain rbtree root */
- struct rb_node *cached32_node; /* Save last alloced node */
+ spinlock_t skiplist_lock;
+ spinlock_t del_skiplist_lock;
+ struct sl_list skiplist; /* iova domain skiplist */
unsigned long dma_32bit_pfn;
};
--
1.8.2
^ permalink raw reply related [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 0/2] skiplists for range indexes
2013-05-03 2:02 [PATCH RFC 0/2] skiplists for range indexes Chris Mason
2013-05-03 2:06 ` [PATCH RFC 1/2] core skiplist code Chris Mason
2013-05-03 2:10 ` [PATCH RFC 2/2] skiplists for the IOMMU Chris Mason
@ 2013-05-03 9:19 ` Jan Kara
2013-05-03 10:45 ` Chris Mason
2 siblings, 1 reply; 12+ messages in thread
From: Jan Kara @ 2013-05-03 9:19 UTC (permalink / raw)
To: Chris Mason
Cc: Linux FS Devel, David Woodhouse, dchinner@redhat.com, bo.li.liu
Hi,
On Thu 02-05-13 22:02:11, Chris Mason wrote:
> I've been working on skiplists for indexing off/on for a little while
> now. For btrfs, I really need a fast way to manage extents that doesn't
> bog down in lock contention for reads or writes. Dave Chinner has
> mentioned this as well, so I've cc'd him.
But I guess you still need writer-writer exclusion, don't you? Or are you
able to do some partial locking of the structure to allow parallel
modifications?
> Liu Bo started this skiplist code, but at the time it didn't make
> a huge difference. I've reworked things a bit and used the IOMMU
> to test things. I ran some benchmarks on a single socket
> server with two SSD cards to get a baseline of how much it is helping.
>
> I tried to keep the basic structure of the IOMMU code, and there are probably
> many better ways to fix the IOMMU contention. Really I'm just trying to
> compare with rbtrees, not fix the IOMMU.
There also exist some research papers on RCU friendly RB-trees (or other
trees). Maybe they would be interesting to try? The basic trick is to use a
copy-on-write when you need to do a rotation. This is slightly impractical
because it requires memory allocation (or alternatively doubles memory
footprint of an rb node) but you need to do a similar stuff in skip lists
anyway. Just an idea...
Honza
> Basic numbers (all aio/dio):
>
> Streaming writes
> IOMMU off 2,575MB/s
> skiplist 1,715MB/s
> rbtree 1,659MB/s
>
> Not a huge improvement, but the CPU time was lower.
>
> 16 threads, iodepth 10, 20MB random reads
> IOMMU off 2,861MB/s (mostly idle)
> skiplist 2,484MB/s (100% system time)
> rbtree 99MB/s (100% system time)
>
> 16 threads, iodepth 10, 20MB random writes
> IOMMU off 2,548MB/s
> skiplist 1,649MB/s
> rbtree 33MB/s
>
> I ran this a bunch of times to make sure I got the rbtree numbers right,
> lowering the thread count did improve the rbtree performance, but the
> best I could do was around 300MB/s. For skiplists, all of the CPU time
> is being spent in skiplist_insert_hole, which has a lot of room for
> improvement.
>
> From here the code needs a lot of testing, and I need to fill out the API
> to make it a little easier to use. But, I wanted to send this around
> for comments and to see how other might want to use things. More
> details on all internals are below.
>
> It starts with a basic skiplist implementation where:
>
> * skiplist nodes are dynamically sized. There is a slab cache for each
> node height, so every node is actively using all of the pointers allocated
> for it.
>
> * Each node has a back pointer as well as a forward pointer
>
> * Each skiplist node stores an array of pointers to items. This is a huge
> speed improvement because the array of keys is much more cache friendly.
>
> Since the array of item pointers is something of an extension
> to the basic skiplist, I've separated them out into two structs.
> First sl_node (just the indexing pointers) and then sl_leaf,
> which has an sl_node and the array of item pointers.
>
> There's no cache benefit to the leaves if I'm just storing
> an array of pointers though, so it also has an array
> of keys:
>
> unsigned long keys[SKIP_KEYS_PER_NODE];
> struct sl_slot *ptrs[SKIP_KEYS_PER_NODE];
>
> The keys array is used for the first search pass, and based
> on that result I narrow things down and only have to follow
> one or two pointers from the corresponding ptrs array.
>
> The sl_slot is very simple:
>
> struct sl_slot {
> unsigned long key;
> unsigned long size;
> }
>
> The idea is to embed the sl_slot in your struct, giving us something like
> this:
>
> sl_leaf
> ________________
> | node ptr N |
> | .... |
> | node ptr 0 |
> |_______________|
> | | keys | |
> |___|________|__|
> | . | ptrs |. |
> |___|________|__|
> / \
> / \
> ------------- ------------
> | sl_slot 0 | | sl_slot N |
> | | | |
> | data | | data |
> ------------- -------------
> your
> struct
>
> This basic setup gives us similar performance to rbtrees, but uses less memory.
> The performance varies (a lot really) with the size of the keys array.
>
> Locking is a mixture of RCU and per-node locking. For searches,
> it can be pure RCU, or it can use the per-node lock to synchronize
> the final search though the last leaf. But, everything from the
> skiplist head to that final node is RCU either way.
>
> Insert locking is slightly different. Before the insert is started,
> a new node is preallocated. The height of this node is used to
> pick the level where we start locking nodes during the insert. Much
> of the time we're able to get through huge portions of the list
> without any locks.
>
> For deletes, it does an RCU search for the leaf, and hold the per-node lock
> while we remove a specific slot. If the leaf is empty it is marked as dead and
> then unlinked from the skiplist level by level.
>
> All the locking and tries does slow things down a bit, but the benefits
> for concurrent access make a big difference.
>
> --
> To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at http://vger.kernel.org/majordomo-info.html
--
Jan Kara <jack@suse.cz>
SUSE Labs, CR
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 0/2] skiplists for range indexes
2013-05-03 9:19 ` [PATCH RFC 0/2] skiplists for range indexes Jan Kara
@ 2013-05-03 10:45 ` Chris Mason
2013-05-04 3:25 ` Dave Chinner
0 siblings, 1 reply; 12+ messages in thread
From: Chris Mason @ 2013-05-03 10:45 UTC (permalink / raw)
To: Jan Kara
Cc: Linux FS Devel, David Woodhouse, dchinner@redhat.com,
bo.li.liu@oracle.com
Quoting Jan Kara (2013-05-03 05:19:24)
> Hi,
>
> On Thu 02-05-13 22:02:11, Chris Mason wrote:
> > I've been working on skiplists for indexing off/on for a little while
> > now. For btrfs, I really need a fast way to manage extents that doesn't
> > bog down in lock contention for reads or writes. Dave Chinner has
> > mentioned this as well, so I've cc'd him.
> But I guess you still need writer-writer exclusion, don't you? Or are you
> able to do some partial locking of the structure to allow parallel
> modifications?
Hi Jan,
Yes, insert/delete still lock, but they only take locks for the levels
required for the operation. So if we're inserting at level 0, we'll
walk all the way down to level 0 before taking any locks.
If an insert happens at level 5, it'll start locking at level 5 and
build a list of nodes we'll have to update in order to do the insert.
Everything in the list is locked until the insert is done.
There's room for optimization there, since chances are good the
insertion point will have some free room and I won't need all those
locks. The real goal is a little less fine grained, just because all
the locking does slow things down when there's no contention.
For the iommu code, the biggest difference between skiplists and rbtrees
is the rbtrees are trying to remember a spot they are likely to find
free ranges to hand out. The skiplists pick a random starting point and
hope the locking spread saves us.
>
> > Liu Bo started this skiplist code, but at the time it didn't make
> > a huge difference. I've reworked things a bit and used the IOMMU
> > to test things. I ran some benchmarks on a single socket
> > server with two SSD cards to get a baseline of how much it is helping.
> >
> > I tried to keep the basic structure of the IOMMU code, and there are probably
> > many better ways to fix the IOMMU contention. Really I'm just trying to
> > compare with rbtrees, not fix the IOMMU.
> There also exist some research papers on RCU friendly RB-trees (or other
> trees). Maybe they would be interesting to try? The basic trick is to use a
> copy-on-write when you need to do a rotation. This is slightly impractical
> because it requires memory allocation (or alternatively doubles memory
> footprint of an rb node) but you need to do a similar stuff in skip lists
> anyway. Just an idea...
Definitely interested if you know of alternatives. Most of the ones I
saw were RCU for reads but not very fine grained for writes. For btrfs
at least, the indexes will be getting a lot of updates and I need higher
concurrency on writes.
-chris
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 0/2] skiplists for range indexes
2013-05-03 10:45 ` Chris Mason
@ 2013-05-04 3:25 ` Dave Chinner
2013-05-04 11:11 ` Chris Mason
0 siblings, 1 reply; 12+ messages in thread
From: Dave Chinner @ 2013-05-04 3:25 UTC (permalink / raw)
To: Chris Mason
Cc: Jan Kara, Linux FS Devel, David Woodhouse, dchinner@redhat.com,
bo.li.liu@oracle.com
On Fri, May 03, 2013 at 06:45:25AM -0400, Chris Mason wrote:
> Quoting Jan Kara (2013-05-03 05:19:24)
> > Hi,
> >
> > On Thu 02-05-13 22:02:11, Chris Mason wrote:
> > > I've been working on skiplists for indexing off/on for a little while
> > > now. For btrfs, I really need a fast way to manage extents that doesn't
> > > bog down in lock contention for reads or writes. Dave Chinner has
> > > mentioned this as well, so I've cc'd him.
> > But I guess you still need writer-writer exclusion, don't you? Or are you
> > able to do some partial locking of the structure to allow parallel
> > modifications?
>
> Hi Jan,
>
> Yes, insert/delete still lock, but they only take locks for the levels
> required for the operation. So if we're inserting at level 0, we'll
> walk all the way down to level 0 before taking any locks.
>
> If an insert happens at level 5, it'll start locking at level 5 and
> build a list of nodes we'll have to update in order to do the insert.
> Everything in the list is locked until the insert is done.
>
> There's room for optimization there, since chances are good the
> insertion point will have some free room and I won't need all those
> locks. The real goal is a little less fine grained, just because all
> the locking does slow things down when there's no contention.
>
> For the iommu code, the biggest difference between skiplists and rbtrees
> is the rbtrees are trying to remember a spot they are likely to find
> free ranges to hand out. The skiplists pick a random starting point and
> hope the locking spread saves us.
>
> >
> > > Liu Bo started this skiplist code, but at the time it didn't make
> > > a huge difference. I've reworked things a bit and used the IOMMU
> > > to test things. I ran some benchmarks on a single socket
> > > server with two SSD cards to get a baseline of how much it is helping.
> > >
> > > I tried to keep the basic structure of the IOMMU code, and there are probably
> > > many better ways to fix the IOMMU contention. Really I'm just trying to
> > > compare with rbtrees, not fix the IOMMU.
> > There also exist some research papers on RCU friendly RB-trees (or other
> > trees). Maybe they would be interesting to try? The basic trick is to use a
> > copy-on-write when you need to do a rotation. This is slightly impractical
> > because it requires memory allocation (or alternatively doubles memory
> > footprint of an rb node) but you need to do a similar stuff in skip lists
> > anyway. Just an idea...
>
> Definitely interested if you know of alternatives. Most of the ones I
> saw were RCU for reads but not very fine grained for writes. For btrfs
> at least, the indexes will be getting a lot of updates and I need higher
> concurrency on writes.
I've got two cases I care about. The first is the buffer cache
indexes which have a 1000:1 read:modify ratio and I'd really like the
lookups to be lockless. The other case is the extent tree, where we
do lots of inserts when the extent tree is first read, and after
than it's typically 2 lookups for every insert/remove. Having one
tree that works for both would be handy...
Cheers,
Dave.
--
Dave Chinner
david@fromorbit.com
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 0/2] skiplists for range indexes
2013-05-04 3:25 ` Dave Chinner
@ 2013-05-04 11:11 ` Chris Mason
2013-05-05 7:33 ` Dave Chinner
0 siblings, 1 reply; 12+ messages in thread
From: Chris Mason @ 2013-05-04 11:11 UTC (permalink / raw)
To: Dave Chinner
Cc: Jan Kara, Linux FS Devel, David Woodhouse, dchinner@redhat.com,
bo.li.liu@oracle.com
Quoting Dave Chinner (2013-05-03 23:25:36)
>
> I've got two cases I care about. The first is the buffer cache
> indexes which have a 1000:1 read:modify ratio and I'd really like the
> lookups to be lockless. The other case is the extent tree, where we
> do lots of inserts when the extent tree is first read, and after
> than it's typically 2 lookups for every insert/remove. Having one
> tree that works for both would be handy...
Ok, we're in a similar boat then. I'll finish off some of the API and
test the pure RCU side harder.
For the extent tree, are you doing a lot of merging once things are in
the tree? I'm not planning on doing pure-rcu for items that get merged
quiet yet.
Also, I'm using unsigned longs right now. My guess is we'll both want
u64s, which means I have to do an i_size_read/write trick in a few
spots.
-chris
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 0/2] skiplists for range indexes
2013-05-04 11:11 ` Chris Mason
@ 2013-05-05 7:33 ` Dave Chinner
2013-05-05 14:38 ` Chris Mason
0 siblings, 1 reply; 12+ messages in thread
From: Dave Chinner @ 2013-05-05 7:33 UTC (permalink / raw)
To: Chris Mason
Cc: Dave Chinner, Jan Kara, Linux FS Devel, David Woodhouse,
bo.li.liu@oracle.com
On Sat, May 04, 2013 at 07:11:51AM -0400, Chris Mason wrote:
> Quoting Dave Chinner (2013-05-03 23:25:36)
> >
> > I've got two cases I care about. The first is the buffer cache
> > indexes which have a 1000:1 read:modify ratio and I'd really like the
> > lookups to be lockless. The other case is the extent tree, where we
> > do lots of inserts when the extent tree is first read, and after
> > than it's typically 2 lookups for every insert/remove. Having one
> > tree that works for both would be handy...
>
> Ok, we're in a similar boat then. I'll finish off some of the API and
> test the pure RCU side harder.
>
> For the extent tree, are you doing a lot of merging once things are in
> the tree? I'm not planning on doing pure-rcu for items that get merged
> quiet yet.
Yes, we merge extents where ever possible. Almost all contiguous
allocations and unwritten extent conversions merge extents in some
manner...
> Also, I'm using unsigned longs right now. My guess is we'll both want
> u64s, which means I have to do an i_size_read/write trick in a few
> spots.
Yup, definitely needs to be u64 for XFS...
Cheers,
Dave.
--
Dave Chinner
dchinner@redhat.com
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 0/2] skiplists for range indexes
2013-05-05 7:33 ` Dave Chinner
@ 2013-05-05 14:38 ` Chris Mason
2013-05-05 22:44 ` Dave Chinner
0 siblings, 1 reply; 12+ messages in thread
From: Chris Mason @ 2013-05-05 14:38 UTC (permalink / raw)
To: Dave Chinner
Cc: Dave Chinner, Jan Kara, Linux FS Devel, David Woodhouse,
bo.li.liu@oracle.com
Quoting Dave Chinner (2013-05-05 03:33:57)
> On Sat, May 04, 2013 at 07:11:51AM -0400, Chris Mason wrote:
> > Quoting Dave Chinner (2013-05-03 23:25:36)
> > >
> > > I've got two cases I care about. The first is the buffer cache
> > > indexes which have a 1000:1 read:modify ratio and I'd really like the
> > > lookups to be lockless. The other case is the extent tree, where we
> > > do lots of inserts when the extent tree is first read, and after
> > > than it's typically 2 lookups for every insert/remove. Having one
> > > tree that works for both would be handy...
> >
> > Ok, we're in a similar boat then. I'll finish off some of the API and
> > test the pure RCU side harder.
> >
> > For the extent tree, are you doing a lot of merging once things are in
> > the tree? I'm not planning on doing pure-rcu for items that get merged
> > quiet yet.
>
> Yes, we merge extents where ever possible. Almost all contiguous
> allocations and unwritten extent conversions merge extents in some
> manner...
Ok, I'll make sure those helpers are generic. The helpers need to
search down to the leaf with the items we care about, take the
lock and then start merging things together. For btrfs, the decision to
merge is pretty complex, so it'll end up driven by the FS code.
The skiplist doesn't do the copy part of rcu. It carefully orders the
updates instead, but the merging should still be possible because I'm
making sure the keys in the leaf and the slot structure match before
trusting what I read.
>
> > Also, I'm using unsigned longs right now. My guess is we'll both want
> > u64s, which means I have to do an i_size_read/write trick in a few
> > spots.
>
> Yup, definitely needs to be u64 for XFS...
Fair enough. The i_size_read equiv will slow down searches some on
32 bit. I think the hit is worth it though, much better than two trees.
Is your buffer cache radix now or rbtree? It's worth mentioning that
radix is still 2x-3x faster than rbtree if you aren't doing range
searches.
I'm not sure if that carries from the microbenchmark into something we
can notice, but I definitely wouldn't use skiplists for something
like the page cache unless we have a good case for a major range based
overhaul.
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH RFC 0/2] skiplists for range indexes
2013-05-05 14:38 ` Chris Mason
@ 2013-05-05 22:44 ` Dave Chinner
2013-05-06 11:28 ` [BULK] " Chris Mason
0 siblings, 1 reply; 12+ messages in thread
From: Dave Chinner @ 2013-05-05 22:44 UTC (permalink / raw)
To: Chris Mason
Cc: Dave Chinner, Jan Kara, Linux FS Devel, David Woodhouse,
bo.li.liu@oracle.com
On Sun, May 05, 2013 at 10:38:12AM -0400, Chris Mason wrote:
> Quoting Dave Chinner (2013-05-05 03:33:57)
> > On Sat, May 04, 2013 at 07:11:51AM -0400, Chris Mason wrote:
> > > Quoting Dave Chinner (2013-05-03 23:25:36)
> > > >
> > > > I've got two cases I care about. The first is the buffer cache
> > > > indexes which have a 1000:1 read:modify ratio and I'd really like the
> > > > lookups to be lockless. The other case is the extent tree, where we
> > > > do lots of inserts when the extent tree is first read, and after
> > > > than it's typically 2 lookups for every insert/remove. Having one
> > > > tree that works for both would be handy...
> > >
> > > Ok, we're in a similar boat then. I'll finish off some of the API and
> > > test the pure RCU side harder.
> > >
> > > For the extent tree, are you doing a lot of merging once things are in
> > > the tree? I'm not planning on doing pure-rcu for items that get merged
> > > quiet yet.
> >
> > Yes, we merge extents where ever possible. Almost all contiguous
> > allocations and unwritten extent conversions merge extents in some
> > manner...
>
> Ok, I'll make sure those helpers are generic. The helpers need to
> search down to the leaf with the items we care about, take the
> lock and then start merging things together. For btrfs, the decision to
> merge is pretty complex, so it'll end up driven by the FS code.
>
> The skiplist doesn't do the copy part of rcu. It carefully orders the
> updates instead, but the merging should still be possible because I'm
> making sure the keys in the leaf and the slot structure match before
> trusting what I read.
>
> >
> > > Also, I'm using unsigned longs right now. My guess is we'll both want
> > > u64s, which means I have to do an i_size_read/write trick in a few
> > > spots.
> >
> > Yup, definitely needs to be u64 for XFS...
>
> Fair enough. The i_size_read equiv will slow down searches some on
> 32 bit. I think the hit is worth it though, much better than two trees.
>
> Is your buffer cache radix now or rbtree? It's worth mentioning that
> radix is still 2x-3x faster than rbtree if you aren't doing range
> searches.
It's an rbtree per allocation group. The code is doing an exact
extent match and there is potential for multiple buffers at the same
offset (key) into the tree so we can't use a radix tree at all. See
_xfs_buf_find() for the rbtree search code...
Also, the metadata buffers are sparsely indexed, so a radix tree
gobbles memory pretty badly, too...
Cheers,
Dave.
--
Dave Chinner
david@fromorbit.com
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [BULK] Re: [PATCH RFC 0/2] skiplists for range indexes
2013-05-05 22:44 ` Dave Chinner
@ 2013-05-06 11:28 ` Chris Mason
2013-05-07 2:12 ` Dave Chinner
0 siblings, 1 reply; 12+ messages in thread
From: Chris Mason @ 2013-05-06 11:28 UTC (permalink / raw)
To: Dave Chinner
Cc: Dave Chinner, Jan Kara, Linux FS Devel, David Woodhouse,
bo.li.liu@oracle.com
Quoting Dave Chinner (2013-05-05 18:44:16)
> On Sun, May 05, 2013 at 10:38:12AM -0400, Chris Mason wrote:
>
> It's an rbtree per allocation group. The code is doing an exact
> extent match and there is potential for multiple buffers at the same
> offset (key) into the tree so we can't use a radix tree at all. See
> _xfs_buf_find() for the rbtree search code...
The exact match part won't work with my current code. But a small
change will let you pick your own insertion point and allow the
duplicates. I'll have to break up deletion a little as well, but it is
already setup for this.
After the merge window, I've got linuxcon japan. Once both are done
I'll pick this up again and see what I can do.
-chris
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [BULK] Re: [PATCH RFC 0/2] skiplists for range indexes
2013-05-06 11:28 ` [BULK] " Chris Mason
@ 2013-05-07 2:12 ` Dave Chinner
0 siblings, 0 replies; 12+ messages in thread
From: Dave Chinner @ 2013-05-07 2:12 UTC (permalink / raw)
To: Chris Mason
Cc: Dave Chinner, Jan Kara, Linux FS Devel, David Woodhouse,
bo.li.liu@oracle.com
On Mon, May 06, 2013 at 07:28:19AM -0400, Chris Mason wrote:
> Quoting Dave Chinner (2013-05-05 18:44:16)
> > On Sun, May 05, 2013 at 10:38:12AM -0400, Chris Mason wrote:
> >
> > It's an rbtree per allocation group. The code is doing an exact
> > extent match and there is potential for multiple buffers at the same
> > offset (key) into the tree so we can't use a radix tree at all. See
> > _xfs_buf_find() for the rbtree search code...
>
> The exact match part won't work with my current code. But a small
> change will let you pick your own insertion point and allow the
> duplicates. I'll have to break up deletion a little as well, but it is
> already setup for this.
>
> After the merge window, I've got linuxcon japan. Once both are done
> I'll pick this up again and see what I can do.
No worries - I'm not in any hurry to replace this code right now.
Reducing the number of unnecessary/repeated buffer lookups is
probably a bigger win for XFS right now...
Cheers,
Dave.
--
Dave Chinner
david@fromorbit.com
^ permalink raw reply [flat|nested] 12+ messages in thread
end of thread, other threads:[~2013-05-07 2:12 UTC | newest]
Thread overview: 12+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2013-05-03 2:02 [PATCH RFC 0/2] skiplists for range indexes Chris Mason
2013-05-03 2:06 ` [PATCH RFC 1/2] core skiplist code Chris Mason
2013-05-03 2:10 ` [PATCH RFC 2/2] skiplists for the IOMMU Chris Mason
2013-05-03 9:19 ` [PATCH RFC 0/2] skiplists for range indexes Jan Kara
2013-05-03 10:45 ` Chris Mason
2013-05-04 3:25 ` Dave Chinner
2013-05-04 11:11 ` Chris Mason
2013-05-05 7:33 ` Dave Chinner
2013-05-05 14:38 ` Chris Mason
2013-05-05 22:44 ` Dave Chinner
2013-05-06 11:28 ` [BULK] " Chris Mason
2013-05-07 2:12 ` Dave Chinner
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).