[1/1] rosebush: Add new data structure

Message ID 20240222203726.1101861-2-willy@infradead.org
State New
Headers
Series Rosebush, a new hash table |

Commit Message

Matthew Wilcox Feb. 22, 2024, 8:37 p.m. UTC
  Rosebush is a resizing hash table.  See
Docuemntation/core-api/rosebush.rst for details.

Signed-off-by: Matthew Wilcox (Oracle) <willy@infradead.org>
---
 Documentation/core-api/index.rst    |   1 +
 Documentation/core-api/rosebush.rst | 135 ++++++
 MAINTAINERS                         |   8 +
 include/linux/rosebush.h            |  41 ++
 lib/Kconfig.debug                   |   3 +
 lib/Makefile                        |   3 +-
 lib/rosebush.c                      | 707 ++++++++++++++++++++++++++++
 lib/test_rosebush.c                 | 135 ++++++
 8 files changed, 1032 insertions(+), 1 deletion(-)
 create mode 100644 Documentation/core-api/rosebush.rst
 create mode 100644 include/linux/rosebush.h
 create mode 100644 lib/rosebush.c
 create mode 100644 lib/test_rosebush.c
  

Comments

Al Viro Feb. 25, 2024, 6:38 a.m. UTC | #1
On Thu, Feb 22, 2024 at 08:37:24PM +0000, Matthew Wilcox (Oracle) wrote:
> Rosebush is a resizing hash table.  See
> Docuemntation/core-api/rosebush.rst for details.

Interesting...  A few obvious questions wrt dcache:
	* how do the locks that stuff nest wrt ->d_lock?  Inside?
	* same for rename_lock (on rename dentries change names/parents/hash
values).
	* we would be really forced to maintain DCACHE_UNHASHED in ->d_flags;
it's not impossible to do, but will take care (protection of ->d_flags has
several dark corners).
	* the cost of d_drop() goes up.  Might or might not get painful.
	* if the bucket locks nest inside ->d_lock, we'd better look out
for the latter getting held for longer.  Might get unpleasant.
  

Patch

diff --git a/Documentation/core-api/index.rst b/Documentation/core-api/index.rst
index 7a3a08d81f11..380dfb30b073 100644
--- a/Documentation/core-api/index.rst
+++ b/Documentation/core-api/index.rst
@@ -36,6 +36,7 @@  Library functionality that is used throughout the kernel.
    kobject
    kref
    assoc_array
+   rosebush
    xarray
    maple_tree
    idr
diff --git a/Documentation/core-api/rosebush.rst b/Documentation/core-api/rosebush.rst
new file mode 100644
index 000000000000..ed17b2572f06
--- /dev/null
+++ b/Documentation/core-api/rosebush.rst
@@ -0,0 +1,135 @@ 
+.. SPDX-License-Identifier: GPL-2.0+
+
+========
+Rosebush
+========
+
+:Author: Matthew Wilcox
+
+Overview
+========
+
+Rosebush is a hashtable, different from the rhashtable.  It is scalable
+(one spinlock per bucket), resizing in two dimensions (number and size
+of buckets), and concurrent (can be iterated under the RCU read lock).
+It is designed to minimise dependent cache misses, which can stall a
+modern CPU for thousands of instructions.
+
+Objects stored in a rosebush do not have an embedded linked list.
+They can therefore be placed into the same rosebush multiple times and
+be placed in multiple rosebushes.  It is also possible to store pointers
+which have special meaning like ERR_PTR().  It is not possible to store
+a NULL pointer in a rosebush, as this is the return value that indicates
+the iteration has finished.
+
+The user of the rosebush is responsible for calculating their own hash.
+A high quality hash is desirable to keep the scalable properties of
+the rosebush, but a hash with poor distribution in the lower bits will
+not lead to a catastrophic breakdown.  It may lead to excessive memory
+consumption and a lot of CPU time spent during lookup.
+
+Rosebush is not yet IRQ or BH safe.  It can be iterated in interrupt
+context, but not modified.
+
+RCU Iteration
+-------------
+
+There is no rosebush_lookup() function.  This is because the hash value
+may not be unique.  Instead, the user should iterate the rosebush,
+which will return pointers which probably have matching hash values.
+It is the user's responsibility to determine if the returned pointer is
+one they are interested in.
+
+Rosebush iteration guarantees to return all pointers which have a
+matching hash, were in the rosebush before the iteration started and
+remain in the rosebush after iteration ends.  It may return additional
+pointers, including pointers which do not have a matching hash value,
+but it guarantees not to skip any pointers, and it guarantees to only
+return pointers which have (at some point) been stored in the rosebush.
+
+If the rosebush is modified while the iteration is in progress, newly
+added entries may or may not be returned and removed entries may or may
+not be returned.  Causality is not honoured; e.g. if Entry A is known
+to be inserted before Entry B, it is possible for an iteration to return
+Entry B and not Entry A.
+
+Functions and structures
+========================
+
+.. kernel-doc:: include/linux/rosebush.h
+.. kernel-doc:: lib/rosebush.c
+
+Internals
+=========
+
+The rosebush is organised into a top level table which contains pointers
+to buckets.  Each bucket contains a spinlock (for modifications to the
+bucket), the number of entries in the bucket and a contention counter.
+
+The top level table is a power of two in size.  The bottom M bits of
+the hash are used to index into this table, which contains a bucket.
+The bucket contains W hash values followed by W pointers.  We also track
+a contention count, which lets us know if this spinlock is overloaded
+and we should split the bucket to improve scalability.
+
+If the bucket is full, we can increase the size of the bucket.  Currently
+we double the size of the bucket because the slab allocator has slabs for
+powers of two.  We could use any size bucket, or even create slab caches
+for our own buckets (eg a 584 byte bucket would give us 48 entries and
+we'd get 7 buckets per page).  The larger the bucket is, the longer it
+will take to walk the bucket, for modifications and lookup, so there's
+a reasonable limit on the size of an individual bucket.
+
+When we decide that the table needs to be resized, we allocate a new
+table, and duplicate the current contents of the table into each half
+of the new table.  At this point, all buckets in the table are shared.
+A bucket may be shared between multiple table entries.  For simplicity,
+we require that all buckets are shared between a power-of-two number
+of slots.  For example, a table with 8 entries might have entries that
+point to buckets 0, 1, 0, 1, 0, 2, 0, 3.  If we were to then split bucket
+0, we would have to replace either the first or last pair of pointers
+with pointers to bucket 4.  This is akin to the buddy page allocator.
+
+We need to decide when to unshare a bucket.  The most eager option would
+be to do it at table resize, but this seems like a high penalty
+for the unlucky caller who causes a table resize.  Slightly more lazy is
+when we're about to insert into a bucket and discover that it is shared.
+Lazier again would be when we discover that a bucket has no free entries
+in it.  Laziest of all would be to only unshare a bucket when it has been
+grown to the maximum size possible.  All these options have consequences
+for lookup, insertion & deletion time.
+
+In all but the first case, we also need to decide how far to unshare
+the bucket.  For example, if a bucket is currently shared between eight
+slots, do we allocate two new buckets (shared between four slots each),
+three new buckets (one for four slots, two for two slots) or four new
+buckets (one for four slots, one for two slots, and two unshared buckets)?
+Unless the hash is of poor quality, or one particular bucket is highly
+contended, we are unlikely to encounter this situation.  We also need
+to decide how large a bucket to allocate when unsharing a bucket; we
+can count the number of entries that will be in it and choose a bucket
+at least x% larger.  Or we can assume that the new bucket will grow to
+at least the current size.
+
+IRQ / BH safety
+---------------
+
+If we decide to make the rosebush modifiable in IRQ context, we need
+to take the locks in an irq-safe way; we need to figure out how to
+allocate the top level table without vmalloc(), and we need to manage
+without kvfree_rcu_mightsleep().  These all have solutions, but those
+solutions have a cost that isn't worth paying until we have users.
+
+Some of those problems go away if we limit our support to removal in IRQ
+context and only allow insertions in process context (as we do not need
+to reallocate the table or bucket when removing an item).
+
+Small rosebushes
+----------------
+
+As an optimisation, if the rosebush has no entries, the buckets pointer
+is NULL.  If the rosebush has only a few entries, there are only two
+buckets (allocated as a single allocation) and the table pointer points
+directly to the first one instead of pointing to a table.  In this case,
+both buckets are very small and the rosebush is likely to allocate a
+table and separate buckets very early on.
diff --git a/MAINTAINERS b/MAINTAINERS
index 722b894f305e..d8296f0aebf2 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -19008,6 +19008,14 @@  F:	include/net/rose.h
 F:	include/uapi/linux/rose.h
 F:	net/rose/
 
+ROSEBUSH DATA STRUCTURE
+M:	Matthew Wilcox <willy@infradead.org>
+L:	maple-tree@lists.infradead.org
+S:	Supported
+F:	Documentation/core-api/rosebush.rst
+F:	include/linux/rosebush.h
+F:	lib/*rosebush.c
+
 ROTATION DRIVER FOR ALLWINNER A83T
 M:	Jernej Skrabec <jernej.skrabec@gmail.com>
 L:	linux-media@vger.kernel.org
diff --git a/include/linux/rosebush.h b/include/linux/rosebush.h
new file mode 100644
index 000000000000..8f96f6f9743e
--- /dev/null
+++ b/include/linux/rosebush.h
@@ -0,0 +1,41 @@ 
+// SPDX-License-Identifier: GPL-2.0+
+/* See lib/rosebush.c */
+
+#include <linux/spinlock.h>
+
+/*
+ * Embed this struct in your struct, don't allocate it separately.
+ * None of this is for customers to use; treat it as opaque.
+ * In particular, taking the rbh_resize_lock will prevent only rbh_table
+ * from being reallocated; buckets can still be grown and split without
+ * the lock.  But you will get incomprehensible lockdep warnings!
+ */
+struct rbh {
+	spinlock_t	rbh_resize_lock;
+	unsigned long	rbh_table;	/* A tagged pointer */
+};
+
+#define DEFINE_ROSEBUSH(name)	struct rbh name = \
+	{ .rbh_resize_lock = __SPIN_LOCK_UNLOCKED(name.lock), }
+
+int rbh_insert(struct rbh *rbh, u32 hash, void *p);
+int rbh_reserve(struct rbh *rbh, u32 hash);
+int rbh_use(struct rbh *rbh, u32 hash, void *p);
+int rbh_remove(struct rbh *rbh, u32 hash, void *p);
+
+struct rbh_iter {
+	struct rbh *rbh;
+	struct rbh_bucket *bucket;
+	u32 hash;
+	unsigned int index;
+};
+
+#define RBH_ITER(name, _rbh, _hash)					\
+	struct rbh_iter name = { .rbh = _rbh, .hash = _hash, }
+
+void *rbh_next(struct rbh_iter *rbhi);
+
+void rbh_iter_remove(struct rbh_iter *rbhi, void *p);
+void rbh_iter_lock(struct rbh_iter *rbhi);
+void rbh_iter_unlock(struct rbh_iter *rbhi);
+
diff --git a/lib/Kconfig.debug b/lib/Kconfig.debug
index 975a07f9f1cc..d78f5effa310 100644
--- a/lib/Kconfig.debug
+++ b/lib/Kconfig.debug
@@ -2396,6 +2396,9 @@  config TEST_RHASHTABLE
 
 	  If unsure, say N.
 
+config TEST_ROSEBUSH
+	tristate "Test the Rosebush data structure"
+
 config TEST_IDA
 	tristate "Perform selftest on IDA functions"
 
diff --git a/lib/Makefile b/lib/Makefile
index 6b09731d8e61..61592aa5374d 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -28,7 +28,7 @@  CFLAGS_string.o += -fno-stack-protector
 endif
 
 lib-y := ctype.o string.o vsprintf.o cmdline.o \
-	 rbtree.o radix-tree.o timerqueue.o xarray.o \
+	 rosebush.o rbtree.o radix-tree.o timerqueue.o xarray.o \
 	 maple_tree.o idr.o extable.o irq_regs.o argv_split.o \
 	 flex_proportions.o ratelimit.o \
 	 is_single_threaded.o plist.o decompress.o kobject_uevent.o \
@@ -75,6 +75,7 @@  obj-$(CONFIG_TEST_LIST_SORT) += test_list_sort.o
 obj-$(CONFIG_TEST_MIN_HEAP) += test_min_heap.o
 obj-$(CONFIG_TEST_LKM) += test_module.o
 obj-$(CONFIG_TEST_VMALLOC) += test_vmalloc.o
+obj-$(CONFIG_TEST_ROSEBUSH) += test_rosebush.o
 obj-$(CONFIG_TEST_RHASHTABLE) += test_rhashtable.o
 obj-$(CONFIG_TEST_SORT) += test_sort.o
 obj-$(CONFIG_TEST_USER_COPY) += test_user_copy.o
diff --git a/lib/rosebush.c b/lib/rosebush.c
new file mode 100644
index 000000000000..405ec864ef35
--- /dev/null
+++ b/lib/rosebush.c
@@ -0,0 +1,707 @@ 
+// SPDX-License-Identifier: GPL-2.0+
+/*
+ * Rosebush, a resizing bucket hash table
+ * Copyright (c) 2024 Oracle Corporation
+ * Author: Matthew Wilcox <willy@infradead.org>
+ */
+
+#include <linux/rosebush.h>
+#include <linux/rcupdate.h>
+#include <linux/slab.h>
+#include <linux/vmalloc.h>
+
+/*
+ * The lock is held whenever we are modifying the contents of the bucket.
+ * The contention counter tracks whether we need to split the bucket due
+ * to contention on the spinlock.
+ * Entries tells us how many hashes (and therefore how many slots) we have.
+ * The bucket also contains the slots, but C doesn't let you declare
+ * two unsized arrays next to each other.
+ */
+struct rbh_bucket {
+	spinlock_t	rbh_lock;
+	u16		rbh_contention;
+	u16		rbh_entries;
+	u32		rbh_hashes[] __counted_by(rbh_entries);
+};
+
+struct rbh_table {
+	DECLARE_FLEX_ARRAY(struct rbh_bucket __rcu *, buckets);
+};
+
+#ifdef CONFIG_64BIT
+#define INITIAL_SLOTS	10
+/* 10 (128), 20 (248), 42 (512), 84 (1016), 170 (2048), 340 (4088) */
+#define MAX_SLOTS	340
+#else
+#define INITIAL_SLOTS	15
+/* 15 (128), 31 (256), 63 (512), 127 (1024), 255 (2048, 511 (4096) */
+#define MAX_SLOTS	511
+#endif
+
+static inline unsigned int next_bucket_size(unsigned int nr)
+{
+#ifdef CONFIG_64BIT
+	if (nr % 4 == 0)
+		return nr * 2 + 2;
+	return nr * 2;
+#else
+	return nr * 2 + 1;
+#endif
+}
+
+#define ENTRY_SIZE	(sizeof(int) + sizeof(void *))
+#define BUCKET_SIZE(nr)	(sizeof(struct rbh_bucket) + ENTRY_SIZE * (nr))
+#define INITIAL_SIZE	BUCKET_SIZE(INITIAL_SLOTS)
+
+struct rbh_initial_bucket {
+	struct rbh_bucket b;
+	struct {
+		u32		rbh_hashes[INITIAL_SLOTS];
+		void __rcu *	rbh_slots[INITIAL_SLOTS];
+	} i;
+};
+
+struct rbh_initial_table {
+	struct rbh_initial_bucket buckets[2];
+};
+
+static void __rcu **bucket_slots(const struct rbh_bucket *bucket,
+		unsigned int size)
+{
+	unsigned long p = (unsigned long)&bucket->rbh_hashes[size];
+	return (void __rcu **)((p | (sizeof(void *) - 1)) + 1);
+}
+
+/*
+ * As far as lockdep is concerned, all buckets in the same rosebush use
+ * the same lock.  We use the classes to distinguish the rbh resize lock
+ * from the bucket locks.  The only time we take two bucket locks is
+ * when we already hold the resize lock, so there is no need to define
+ * an order between bucket locks.
+ */
+#ifdef CONFIG_DEBUG_SPINLOCK
+#define bucket_lock_init(rbh, bucket)					\
+	__raw_spin_lock_init(spinlock_check(&(bucket)->rbh_lock),	\
+		"rbh", (rbh)->rbh_resize_lock.dep_map.key, LD_WAIT_SPIN)
+#else
+#define bucket_lock_init(rbh, bucket)					\
+	spin_lock_init(&(bucket)->rbh_lock)
+#endif
+
+#define rbh_resize_lock(rbh)	spin_lock_nested(&(rbh)->rbh_resize_lock, 2)
+#define rbh_resize_unlock(rbh)	spin_unlock(&(rbh)->rbh_resize_lock)
+
+enum split_state {
+	SPLIT_MAYBE,	/* Willing to try to split */
+	SPLIT_SHOULD,	/* Try to split */
+	SPLIT_DONE,	/* Already tried to split a bucket */
+};
+
+struct insert_state {
+	enum split_state split;
+	u32 mask;
+};
+
+/*
+ * A very complicated way of spelling &rbh->bucket[hash].
+ *
+ * The first complication is that we encode the number of buckets
+ * in the pointer so that we can get both in an atomic load.
+ *
+ * The second complication is that small hash tables don't have a top
+ * level table; instead the buckets pointer points to a pair of buckets
+ * of size 128 bytes.
+ *
+ * The third complication is that we reuse the first two slots of the
+ * table for RCU freeing, so we have to check that didn't happen before
+ * we return the pointer.  That does not absolve us of rechecking after
+ * we get the lock in the caller; we have to check here that we have a
+ * pointer to a bucket, and we have to check after we get the lock that
+ * this bucket is still the current bucket.
+ */
+static struct rbh_bucket *get_bucket(struct rbh *rbh, u32 hash,
+		struct insert_state *state)
+{
+	unsigned long tagged;
+	struct rbh_table *table;
+	struct rbh_initial_table *initial_table;
+	u32 mask;
+	unsigned int bid;
+
+	/* rcu_dereference(), but not a pointer */
+	tagged = READ_ONCE(rbh->rbh_table);
+	if (!tagged)
+		return NULL;
+
+	/* The lowest bits indicates how many buckets the table holds */
+	table = (struct rbh_table *)(tagged & (tagged + 1));
+	mask = tagged - (unsigned long)table;
+	bid = hash & mask;
+	if (mask != 1) {
+		if (state) {
+			if (state->split == SPLIT_MAYBE) {
+				u32 buddy = bid ^ ((mask + 1) / 2);
+				if (table->buckets[bid] ==
+				    table->buckets[buddy])
+					state->split = SPLIT_SHOULD;
+				else
+					state->split = SPLIT_DONE;
+			}
+			state->mask = mask;
+		}
+		return rcu_dereference(table->buckets[bid]);
+	}
+	initial_table = (struct rbh_initial_table *)table;
+	if (state) {
+		state->split = SPLIT_DONE;
+		state->mask = mask;
+	}
+	return &initial_table->buckets[bid].b;
+}
+
+static struct rbh_bucket *lock_bucket(struct rbh *rbh, u32 hash)
+	__cond_acquires(&bucket->rbh_lock)
+{
+	struct rbh_bucket *bucket, *new_bucket;
+
+	bucket = get_bucket(rbh, hash, NULL);
+	if (!bucket)
+		return bucket;
+again:
+	spin_lock(&bucket->rbh_lock);
+	new_bucket = get_bucket(rbh, hash, NULL);
+	if (bucket == new_bucket)
+		return bucket;
+	spin_unlock(&bucket->rbh_lock);
+	bucket = new_bucket;
+	goto again;
+}
+
+static bool rbh_first(struct rbh *rbh, u32 hash)
+{
+	struct rbh_initial_table *table;
+	int i;
+
+printk("%s: table size %zd\n", __func__, sizeof(*table));
+	table = kmalloc(sizeof(*table), GFP_KERNEL);
+	if (!table)
+		return false;
+
+	rbh_resize_lock(rbh);
+	if (rbh->rbh_table) {
+		rbh_resize_unlock(rbh);
+		/* Somebody else resized it for us */
+		kfree(table);
+		return true;
+	}
+
+	bucket_lock_init(rbh, &table->buckets[0].b);
+	table->buckets[0].b.rbh_entries = INITIAL_SLOTS;
+	table->buckets[0].b.rbh_contention = 0;
+	bucket_lock_init(rbh, &table->buckets[1].b);
+	table->buckets[1].b.rbh_entries = INITIAL_SLOTS;
+	table->buckets[1].b.rbh_contention = 0;
+	for (i = 0; i < INITIAL_SLOTS; i++) {
+		table->buckets[0].b.rbh_hashes[i] = ~0;
+		table->buckets[1].b.rbh_hashes[i] = 0;
+	}
+	/* rcu_assign_pointer() but not a pointer */
+	smp_store_release(&rbh->rbh_table, (unsigned long)table | 1);
+	rbh_resize_unlock(rbh);
+
+printk("%s: new table = %px\n", __func__, table);
+	return true;
+}
+
+static void copy_initial_buckets(const struct rbh *rbh,
+		struct rbh_table *table, struct rbh_initial_table *init_table)
+	__acquires(&init_table->buckets[0].b.rbh_lock)
+	__acquires(&init_table->buckets[1].b.rbh_lock)
+{
+	struct rbh_bucket *bucket;
+
+	bucket = (void __force *)table->buckets[0];
+	spin_lock(&init_table->buckets[0].b.rbh_lock);
+	memcpy(bucket, &init_table->buckets[0], sizeof(init_table->buckets[0]));
+	bucket_lock_init(rbh, bucket);
+
+	bucket = (void __force *)table->buckets[1];
+	spin_lock_nested(&init_table->buckets[1].b.rbh_lock, 1);
+	memcpy(bucket, &init_table->buckets[1], sizeof(init_table->buckets[1]));
+	bucket_lock_init(rbh, bucket);
+}
+
+/*
+ * When we grow the table, we duplicate the bucket pointers so this
+ * thread doesn't pay the entire cost of growing the table.
+ */
+static int rbh_grow_table(struct rbh *rbh, u32 hash)
+{
+	struct rbh_table *table, *old_table;
+	struct rbh_initial_table *init_table;
+	unsigned long tagged;
+	u32 mask, buddy;
+	size_t size;
+
+	/* If the bucket is shared, we don't need to grow the table */
+	tagged = READ_ONCE(rbh->rbh_table);
+	table = (struct rbh_table *)(tagged & (tagged + 1));
+	mask = tagged - (unsigned long)table;
+	buddy = (hash ^ ((mask + 1) / 2)) & mask;
+	if (mask > 1 && table->buckets[hash & mask] == table->buckets[buddy])
+		return 0;
+
+	old_table = table;
+	size = (mask + 1) * 2 * sizeof(void *);
+	if (size > 4 * PAGE_SIZE)
+		/* XXX: NUMA_NO_NODE doesn't necessarily interleave */
+		table = __vmalloc_node(size, size, GFP_KERNEL, NUMA_NO_NODE,
+				&table);
+	else
+		table = kvmalloc(size, GFP_KERNEL);
+	if (!table) {
+		/* Maybe somebody resized it for us */
+		if (READ_ONCE(rbh->rbh_table) != tagged)
+			return 0;
+		return -ENOMEM;
+	}
+
+	if (mask == 1) {
+		/* Don't need to bother with RCU until we publish the table */
+		table->buckets[0] = (void __rcu *)kmalloc(INITIAL_SIZE, GFP_KERNEL);
+		if (!table->buckets[0])
+			goto free_all;
+		table->buckets[1] = (void __rcu *)kmalloc(INITIAL_SIZE, GFP_KERNEL);
+		if (!table->buckets[1])
+			goto free_all;
+	}
+
+	rbh_resize_lock(rbh);
+	if (rbh->rbh_table != tagged) {
+		rbh_resize_unlock(rbh);
+		/* Somebody else resized it for us */
+		kvfree(table);
+		return 0;
+	}
+
+printk("%s: replacing old_table %px with table %px mask %d\n", __func__, old_table, table, mask);
+	if (mask == 1) {
+		init_table = (void *)old_table;
+		copy_initial_buckets(rbh, table, init_table);
+	} else {
+		memcpy(&table->buckets, &old_table->buckets,
+				(mask + 1) * sizeof(void *));
+	}
+	memcpy(&table->buckets[mask + 1], &table->buckets[0],
+			(mask + 1) * sizeof(void *));
+
+	tagged = ((unsigned long)table) | (mask << 1) | 1;
+	/* rcu_assign_pointer() but not a pointer */
+	smp_store_release(&rbh->rbh_table, tagged);
+	rbh_resize_unlock(rbh);
+	if (mask == 1) {
+		spin_unlock(&init_table->buckets[0].b.rbh_lock);
+		spin_unlock(&init_table->buckets[1].b.rbh_lock);
+	}
+	kvfree_rcu_mightsleep(old_table);
+
+	return 0;
+free_all:
+	kfree((void __force *)table->buckets[0]);
+	kvfree(table);
+	return false;
+}
+
+static void bucket_copy(const struct rbh *rbh, struct rbh_bucket *bucket,
+		const struct rbh_bucket *old_bucket, unsigned int nr,
+		u32 hash, u32 mask)
+{
+	unsigned int old_nr = old_bucket->rbh_entries;
+	void __rcu **old_slots = bucket_slots(old_bucket, old_nr);
+	void __rcu **slots = bucket_slots(bucket, nr);
+	unsigned int i, j = 0;
+
+	bucket->rbh_entries = nr;
+	bucket_lock_init(rbh, bucket);
+	for (i = 0; i < old_nr; i++) {
+		if ((old_bucket->rbh_hashes[i] & mask) != (hash & mask))
+			continue;
+		bucket->rbh_hashes[j] = old_bucket->rbh_hashes[i];
+		slots[j++] = old_slots[i];
+	}
+printk("%s: bucket:%px(%d) copied %d/%d entries from %px hash:%x mask:%x\n", __func__, bucket, nr, j, old_nr, old_bucket, hash, mask);
+
+	while (j < nr)
+		bucket->rbh_hashes[j++] = ~hash;
+}
+
+#define rbh_dereference_protected(p, rbh)				\
+	rcu_dereference_protected(p, lockdep_is_held(&(rbh)->rbh_resize_lock))
+
+/*
+ */
+static void rbh_split_bucket(struct rbh *rbh,
+		struct rbh_bucket *old_bucket, unsigned int nr, u32 hash)
+{
+	struct rbh_table *table;
+	unsigned long tagged;
+	u32 mask, bit;
+	struct rbh_bucket *bucket = kmalloc(BUCKET_SIZE(nr), GFP_KERNEL);
+
+printk("%s: adding bucket %px for hash %d\n", __func__, bucket, hash);
+	if (!bucket)
+		return;
+
+	rbh_resize_lock(rbh);
+	tagged = rbh->rbh_table;
+	table = (struct rbh_table *)(tagged & (tagged + 1));
+	mask = tagged - (unsigned long)table;
+	hash &= mask;
+	if (rbh_dereference_protected(table->buckets[hash], rbh) != old_bucket)
+		goto free;
+
+	/* Figure out how many buckets we need to fill */
+	bit = (mask + 1) / 2;
+	mask = 0;
+	while (bit > 1) {
+printk("hash:%d buddy:%d\n", hash, hash ^ bit);
+		if (rbh_dereference_protected(table->buckets[hash ^ bit], rbh)
+				!= old_bucket)
+			break;
+		mask |= bit;
+		bit /= 2;
+	}
+	if (!mask)
+		goto free;
+
+	spin_lock(&old_bucket->rbh_lock);
+	bucket->rbh_contention = 0;
+	bucket_copy(rbh, bucket, old_bucket, nr, hash, mask | 1);
+
+	mask = (mask & (mask - 1)) / 2;
+printk("hash:%d mask:%d bit:%d\n", hash, mask, bit);
+	hash &= ~mask;
+	for (;;) {
+printk("assigning bucket %px to index %d\n", bucket, hash);
+		rcu_assign_pointer(table->buckets[hash], bucket);
+		if (hash == (hash | mask))
+			break;
+		hash += bit;
+	}
+	spin_unlock(&old_bucket->rbh_lock);
+	bucket = NULL;
+free:
+	rbh_resize_unlock(rbh);
+printk("%s: freeing bucket %px\n", __func__, bucket);
+	kfree(bucket);
+}
+
+/*
+ * thorns are leftovers from when this bucket was shared.  If they are
+ * sufficiently numerous, we'll just allocate another bucket of this size.
+ * Otherwise we'll allocate a larger bucket.
+ */
+static int rbh_expand_bucket(struct rbh *rbh,
+		struct rbh_bucket *old_bucket, unsigned int nr,
+		unsigned int thorns, u32 hash, u32 old_mask)
+{
+	unsigned long tagged;
+	struct rbh_table *table;
+	struct rbh_bucket *bucket;
+	u32 mask;
+
+	/* Can't expand an initial bucket, must create a table */
+	if (old_mask == 1)
+		return rbh_grow_table(rbh, hash);
+	if (thorns < nr / 4)
+		nr = next_bucket_size(nr);
+	if (nr > MAX_SLOTS)
+		return -E2BIG;
+
+	bucket = kmalloc(BUCKET_SIZE(nr), GFP_KERNEL);
+	if (!bucket)
+		return -ENOMEM;
+
+printk("%s: adding bucket %px for hash %d\n", __func__, bucket, hash);
+	rbh_resize_lock(rbh);
+	tagged = READ_ONCE(rbh->rbh_table);
+	table = (struct rbh_table *)(tagged & (tagged + 1));
+	mask = tagged - (unsigned long)table;
+
+	/* If the table expanded while we slept just try again */
+	if (old_mask != mask)
+		goto free;
+	hash &= mask;
+	if (rbh_dereference_protected(table->buckets[hash], rbh) != old_bucket)
+		goto free;
+
+	spin_lock(&old_bucket->rbh_lock);
+	bucket->rbh_contention = old_bucket->rbh_contention;
+	bucket_copy(rbh, bucket, old_bucket, nr, hash, mask);
+	rcu_assign_pointer(table->buckets[hash], bucket);
+
+	spin_unlock(&old_bucket->rbh_lock);
+	bucket = NULL;
+free:
+	rbh_resize_unlock(rbh);
+printk("%s: freeing bucket %px\n", __func__, bucket);
+	kfree(bucket);
+	if (bucket)
+		kvfree_rcu_mightsleep(old_bucket);
+
+	return 0;
+}
+
+static int __rbh_insert(struct rbh *rbh, u32 hash, void *p)
+{
+	struct rbh_bucket *bucket, *new_bucket;
+	unsigned int i, nr, thorns = 0;
+	int err;
+	struct insert_state state = { .split = SPLIT_MAYBE, };
+
+restart:
+	rcu_read_lock();
+	bucket = get_bucket(rbh, hash, &state);
+	if (unlikely(!bucket)) {
+		rcu_read_unlock();
+		if (!rbh_first(rbh, hash))
+			return -ENOMEM;
+		goto restart;
+	}
+
+again:
+	if (spin_trylock(&bucket->rbh_lock)) {
+		if (bucket->rbh_contention)
+			bucket->rbh_contention--;
+	} else {
+		spin_lock(&bucket->rbh_lock);
+		/* Numbers chosen ad-hoc */
+		bucket->rbh_contention += 10;
+		if (unlikely(bucket->rbh_contention > 5000)) {
+			spin_unlock(&bucket->rbh_lock);
+			rcu_read_unlock();
+			/* OK if this fails; it's only contention */
+			rbh_grow_table(rbh, hash);
+
+			rcu_read_lock();
+			bucket = get_bucket(rbh, hash, &state);
+			spin_lock(&bucket->rbh_lock);
+		}
+	}
+
+	new_bucket = get_bucket(rbh, hash, &state);
+	if (bucket != new_bucket) {
+		spin_unlock(&bucket->rbh_lock);
+		bucket = new_bucket;
+		goto again;
+	}
+
+printk("%s: bucket:%px hash %d\n", __func__, bucket, hash);
+	nr = bucket->rbh_entries;
+	/* If the bucket is shared, split it before inserting a new element */
+	if (state.split == SPLIT_SHOULD) {
+		spin_unlock(&bucket->rbh_lock);
+		rcu_read_unlock();
+		rbh_split_bucket(rbh, bucket, nr, hash);
+		state.split = SPLIT_DONE;
+		goto restart;
+	}
+
+	/* Deleted elements differ in their bottom bit */
+	for (i = 0; i < nr; i++) {
+		void __rcu **slots;
+		u32 bhash = bucket->rbh_hashes[i];
+
+		if ((bhash & state.mask) != (hash & state.mask))
+			thorns++;
+		if ((bhash & 1) == (hash & 1))
+			continue;
+printk("%s: hash:%x bhash:%x index %d\n", __func__, hash, bhash, i);
+		slots = bucket_slots(bucket, nr);
+		rcu_assign_pointer(slots[i], p);
+		/* This array is read under RCU */
+		WRITE_ONCE(bucket->rbh_hashes[i], hash);
+
+		spin_unlock(&bucket->rbh_lock);
+		rcu_read_unlock();
+		return 0;
+	}
+
+	/* No space in this bucket */
+	spin_unlock(&bucket->rbh_lock);
+	rcu_read_unlock();
+
+	err = rbh_expand_bucket(rbh, bucket, nr, thorns, hash, state.mask);
+	if (err == -ENOMEM)
+		return -ENOMEM;
+	if (err == -E2BIG) {
+		if (rbh_grow_table(rbh, hash) < 0)
+			return -ENOMEM;
+	}
+	state.split = SPLIT_MAYBE;
+	goto restart;
+}
+
+/**
+ * rbh_insert - Add a pointer to a rosebush.
+ * @rbh: The rosebush.
+ * @hash: The hash value for this pointer.
+ * @p: The pointer to add.
+ *
+ * Return: 0 on success, -ENOMEM if memory allocation fails,
+ * -EINVAL if @p is NULL.
+ */
+int rbh_insert(struct rbh *rbh, u32 hash, void *p)
+{
+	if (p == NULL)
+		return -EINVAL;
+	return __rbh_insert(rbh, hash, p);
+}
+EXPORT_SYMBOL(rbh_insert);
+
+/**
+ * rbh_remove - Remove a pointer from a rosebush.
+ * @rbh: The rosebush.
+ * @hash: The hash value for this pointer.
+ * @p: The pointer to remove.
+ *
+ * Return: 0 on success, -ENOENT if this pointer could not be found.
+ */
+int rbh_remove(struct rbh *rbh, u32 hash, void *p)
+{
+	struct rbh_bucket *bucket;
+	void __rcu **slots;
+	unsigned int i, nr;
+	int err = -ENOENT;
+
+	rcu_read_lock();
+	bucket = lock_bucket(rbh, hash);
+	if (!bucket)
+		goto rcu_unlock;
+
+	nr = bucket->rbh_entries;
+	slots = bucket_slots(bucket, nr);
+	for (i = 0; i < nr; i++) {
+		if (bucket->rbh_hashes[i] != hash)
+			continue;
+		if (rcu_dereference_protected(slots[i],
+				lockdep_is_held(&bucket->rbh_lock)) != p)
+			continue;
+		bucket->rbh_hashes[i] = ~hash;
+		/* Do not modify the slot */
+		err = 0;
+		break;
+	}
+
+	spin_unlock(&bucket->rbh_lock);
+rcu_unlock:
+	rcu_read_unlock();
+	return err;
+}
+EXPORT_SYMBOL(rbh_remove);
+
+/**
+ * rbh_reserve - Reserve a slot in a rosebush for later use.
+ * @rbh: The rosebush.
+ * @hash: The hash value that will be used.
+ *
+ * Some callers need to take another lock before inserting an object
+ * into the rosebush.  This function reserves space for them to do that.
+ * A subsequent call to rbh_use() will not allocate memory.  If you find
+ * that you do not need the reserved space any more, call rbh_remove(),
+ * passing NULL as the pointer.
+ *
+ * Return: 0 on success, -ENOMEM on failure.
+ */
+int rbh_reserve(struct rbh *rbh, u32 hash)
+{
+	return __rbh_insert(rbh, hash, NULL);
+}
+EXPORT_SYMBOL(rbh_reserve);
+
+/**
+ * rbh_use - Use a reserved slot in a rosebush.
+ * @rbh: The rosebush.
+ * @hash: The hash value for this pointer.
+ * @p: The pointer to add.
+ *
+ * Return: 0 on success, -EINVAL if @p is NULL,
+ * -ENOENT if no reserved slot could be found.
+ */
+int rbh_use(struct rbh *rbh, u32 hash, void *p)
+{
+	struct rbh_bucket *bucket;
+	void __rcu **slots;
+	unsigned int i, nr;
+	int err = -ENOENT;
+
+	rcu_read_lock();
+	bucket = lock_bucket(rbh, hash);
+	if (!bucket)
+		goto rcu_unlock;
+
+	nr = bucket->rbh_entries;
+	slots = bucket_slots(bucket, nr);
+	for (i = 0; i < nr; i++) {
+		if (bucket->rbh_hashes[i] != hash)
+			continue;
+		if (slots[i] != NULL)
+			continue;
+		rcu_assign_pointer(slots[i], p);
+		err = 0;
+		break;
+	}
+
+	spin_unlock(&bucket->rbh_lock);
+rcu_unlock:
+	rcu_read_unlock();
+	return err;
+}
+EXPORT_SYMBOL(rbh_use);
+
+/**
+ * rbh_next - Find the next entry matching this hash
+ * @rbhi: The rosebush iterator.
+ *
+ * Return: NULL if there are no more matching hash values, otherwise
+ * the next pointer.
+ */
+void *rbh_next(struct rbh_iter *rbhi)
+{
+	struct rbh_bucket *bucket = rbhi->bucket;
+	unsigned int nr;
+	void __rcu **slots;
+	void *p;
+
+	if (!bucket) {
+		bucket = get_bucket(rbhi->rbh, rbhi->hash, NULL);
+		if (!bucket)
+			return NULL;
+		rbhi->bucket = bucket;
+		rbhi->index = UINT_MAX;
+	}
+
+	nr = bucket->rbh_entries;
+	slots = bucket_slots(bucket, nr);
+
+	while (++rbhi->index < nr) {
+		if (READ_ONCE(bucket->rbh_hashes[rbhi->index]) != rbhi->hash)
+			continue;
+		p = rcu_dereference(slots[rbhi->index]);
+		if (p)
+			return p;
+	}
+
+	return NULL;
+}
+EXPORT_SYMBOL(rbh_next);
+
+/*
+ * TODO:
+ * * convert the dcache
+ * * 2 byte hashes in the bucket.  Once the table has 2^17 buckets, we can
+ *   use 10 bytes per entry instead of 12
+ * * 1 byte hashes in the bucket.  Once the table has 2^25 buckets, we can
+ *   use 9 bytes per entry instead of 10?
+ */
diff --git a/lib/test_rosebush.c b/lib/test_rosebush.c
new file mode 100644
index 000000000000..344479cb8a94
--- /dev/null
+++ b/lib/test_rosebush.c
@@ -0,0 +1,135 @@ 
+#include <linux/rosebush.h>
+#include <kunit/test.h>
+
+static void iter_rbh(struct kunit *test, struct rbh *rbh, u32 hash, void *p)
+{
+	RBH_ITER(iter, rbh, hash);
+	void *q;
+
+	rcu_read_lock();
+	q = rbh_next(&iter);
+	rcu_read_unlock();
+	KUNIT_EXPECT_PTR_EQ_MSG(test, p, q,
+		"rbh_next hash:%u returned %px, expected %px", hash, q, p);
+}
+
+static void check_empty_rbh(struct kunit *test, struct rbh *rbh)
+{
+	iter_rbh(test, rbh, 0, NULL);
+	iter_rbh(test, rbh, 1, NULL);
+	iter_rbh(test, rbh, 17, NULL);
+	iter_rbh(test, rbh, 42, NULL);
+}
+
+static void insert(struct kunit *test, struct rbh *rbh, u32 hash)
+{
+	void *p = (void *)((hash << 1) | 1UL);
+	int err;
+
+	err = rbh_insert(rbh, hash, p);
+	KUNIT_EXPECT_EQ(test, err, 0);
+
+	iter_rbh(test, rbh, hash, p);
+}
+
+static void reserve(struct kunit *test, struct rbh *rbh, u32 hash)
+{
+	int err;
+
+	err = rbh_reserve(rbh, hash);
+	KUNIT_EXPECT_EQ(test, err, 0);
+
+	iter_rbh(test, rbh, hash, NULL);
+}
+
+static void use(struct kunit *test, struct rbh *rbh, u32 hash)
+{
+	void *p = (void *)((hash << 1) | 1UL);
+	int err;
+
+	err = rbh_use(rbh, hash, p);
+	KUNIT_EXPECT_EQ(test, err, 0);
+
+	iter_rbh(test, rbh, hash, p);
+}
+
+static void remove(struct kunit *test, struct rbh *rbh, u32 hash)
+{
+	void *p = (void *)((hash << 1) | 1UL);
+	int err;
+
+	err = rbh_remove(rbh, hash, p);
+	KUNIT_EXPECT_EQ(test, err, 0);
+
+	iter_rbh(test, rbh, hash, NULL);
+}
+
+static DEFINE_ROSEBUSH(rosebush);
+
+/*
+ * Conduct a number of tests on a rosebush that has never been used.
+ * They should all return NULL or an errno.  We're looking for crashes
+ * here.
+ */
+static void empty(struct kunit *test)
+{
+	int err;
+
+	check_empty_rbh(test, &rosebush);
+	err = rbh_remove(&rosebush, 0, test);
+	KUNIT_EXPECT_EQ(test, err, -ENOENT);
+	err = rbh_use(&rosebush, 0, test);
+	KUNIT_EXPECT_EQ(test, err, -ENOENT);
+	KUNIT_EXPECT_EQ(test, rosebush.rbh_table, 0);
+}
+
+static void first(struct kunit *test)
+{
+	int err;
+
+	insert(test, &rosebush, 5);
+	check_empty_rbh(test, &rosebush);
+	remove(test, &rosebush, 5);
+	check_empty_rbh(test, &rosebush);
+
+	err = rbh_remove(&rosebush, 5, NULL);
+	KUNIT_EXPECT_EQ(test, err, -ENOENT);
+	reserve(test, &rosebush, 5);
+	err = rbh_remove(&rosebush, 5, test);
+	KUNIT_EXPECT_EQ(test, err, -ENOENT);
+	err = rbh_remove(&rosebush, 5, NULL);
+	KUNIT_EXPECT_EQ(test, err, 0);
+	err = rbh_remove(&rosebush, 5, NULL);
+	KUNIT_EXPECT_EQ(test, err, -ENOENT);
+
+	reserve(test, &rosebush, 5);
+	use(test, &rosebush, 5);
+	err = rbh_remove(&rosebush, 5, NULL);
+	KUNIT_EXPECT_EQ(test, err, -ENOENT);
+	remove(test, &rosebush, 5);
+}
+
+static void grow(struct kunit *test)
+{
+	int i;
+
+	for (i = 3; i < 3333; i += 2)
+		insert(test, &rosebush, i);
+}
+
+static struct kunit_case rosebush_cases[] __refdata = {
+	KUNIT_CASE(empty),
+	KUNIT_CASE(first),
+	KUNIT_CASE(grow),
+	{}
+};
+
+static struct kunit_suite rosebush_suite = {
+	.name = "rosebush",
+	.test_cases = rosebush_cases,
+};
+
+kunit_test_suite(rosebush_suite);
+
+MODULE_AUTHOR("Matthew Wilcox (Oracle) <willy@infradead.org>");
+MODULE_LICENSE("GPL");