Linux RCU subsystem development
* [PATCH 0/9] rcu_pending
@ 2024-08-19 16:59 Kent Overstreet
  2024-08-19 16:59 ` [PATCH 1/9] lib/generic-radix-tree.c: genradix_ptr_inlined() Kent Overstreet
                   ` (9 more replies)
  0 siblings, 10 replies; 24+ messages in thread
From: Kent Overstreet @ 2024-08-19 16:59 UTC (permalink / raw)
  To: rcu; +Cc: Kent Overstreet, paulmck, linux-kernel

New data structure for tracking objects waiting on an RCU grace period.
Supports regular RCU and SRCU, and possibly other RCU flavors in the
future. Uses radix trees for tracking pending objects, falling back to
linked lists on allocation failure.

This gets us a more general replacement for SLAB_TYPESAFE_BY_RCU, a
cleaner and slightly faster backend for kvfree_call_rcu(), and, in the
future, a faster backend for call_rcu() as well.
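
A rough sketch of the intended usage, with illustrative names (the
actual interface is added in the rcu_pending patch below):

#include <linux/rcu_pending.h>
#include <linux/slab.h>

struct foo {
	struct rcu_head	rcu;
	/* ... */
};

static struct rcu_pending foo_pending;

static void foo_process(struct rcu_pending *pending, struct rcu_head *rcu)
{
	kfree(container_of(rcu, struct foo, rcu));
}

static int foo_init(void)
{
	/* NULL srcu_struct means regular RCU; pass one to use SRCU: */
	return rcu_pending_init(&foo_pending, NULL, foo_process);
}

static void foo_free(struct foo *f)
{
	/* instead of call_rcu(&f->rcu, ...): */
	rcu_pending_enqueue(&foo_pending, &f->rcu);
}

static void foo_shutdown(void)
{
	/* waits for outstanding grace periods, processes everything left: */
	rcu_pending_exit(&foo_pending);
}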

There are still some small todo items, mentioned in the relevant patches.

Paul - I'm considering putting this into 6.11 for bcachefs (not the
patch that switches kvfree_rcu, of course), as I need it rather
pressingly. Thoughts? I can put it in fs/bcachefs/ if you hate it :)

Kent Overstreet (9):
  lib/generic-radix-tree.c: genradix_ptr_inlined()
  lib/generic-radix-tree.c: add preallocation
  darray: lift from bcachefs
  vmalloc: is_vmalloc_addr_inlined()
  rcu: delete lockdep_assert_irqs_enabled() assert in
    start_poll_synchronize_rcu_common()
  rcu: rcu_pending
  bcachefs: Rip out freelists from btree key cache
  bcachefs: key cache can now allocate from pending
  rcu: Switch kvfree_rcu() to new rcu_pending

 MAINTAINERS                             |   7 +
 fs/bcachefs/Makefile                    |   1 -
 fs/bcachefs/btree_key_cache.c           | 406 +++----------
 fs/bcachefs/btree_key_cache_types.h     |  18 +-
 fs/bcachefs/btree_node_scan_types.h     |   2 +-
 fs/bcachefs/btree_types.h               |   5 +-
 fs/bcachefs/btree_update.c              |   2 +
 fs/bcachefs/btree_write_buffer_types.h  |   2 +-
 fs/bcachefs/disk_accounting_types.h     |   2 +-
 fs/bcachefs/fsck.c                      |   2 +-
 fs/bcachefs/journal_io.h                |   2 +-
 fs/bcachefs/journal_sb.c                |   2 +-
 fs/bcachefs/sb-downgrade.c              |   3 +-
 fs/bcachefs/sb-errors_types.h           |   2 +-
 fs/bcachefs/sb-members.h                |   2 +-
 fs/bcachefs/subvolume.h                 |   1 -
 fs/bcachefs/subvolume_types.h           |   2 +-
 fs/bcachefs/thread_with_file_types.h    |   2 +-
 fs/bcachefs/util.h                      |  29 +-
 {fs/bcachefs => include/linux}/darray.h |  59 +-
 include/linux/darray_types.h            |  22 +
 include/linux/generic-radix-tree.h      | 106 +++-
 include/linux/mm.h                      |   7 +
 include/linux/rcu_pending.h             |  27 +
 init/main.c                             |   2 +
 kernel/rcu/Makefile                     |   2 +-
 kernel/rcu/pending.c                    | 623 ++++++++++++++++++++
 kernel/rcu/tree.c                       | 747 ------------------------
 kernel/rcu/update.c                     |   1 -
 lib/Makefile                            |   2 +-
 {fs/bcachefs => lib}/darray.c           |  12 +-
 lib/generic-radix-tree.c                |  80 +--
 mm/vmalloc.c                            |   4 +-
 33 files changed, 962 insertions(+), 1224 deletions(-)
 rename {fs/bcachefs => include/linux}/darray.h (66%)
 create mode 100644 include/linux/darray_types.h
 create mode 100644 include/linux/rcu_pending.h
 create mode 100644 kernel/rcu/pending.c
 rename {fs/bcachefs => lib}/darray.c (57%)

-- 
2.45.2


^ permalink raw reply	[flat|nested] 24+ messages in thread

* [PATCH 1/9] lib/generic-radix-tree.c: genradix_ptr_inlined()
  2024-08-19 16:59 [PATCH 0/9] rcu_pending Kent Overstreet
@ 2024-08-19 16:59 ` Kent Overstreet
  2024-08-19 16:59 ` [PATCH 2/9] lib/generic-radix-tree.c: add preallocation Kent Overstreet
                   ` (8 subsequent siblings)
  9 siblings, 0 replies; 24+ messages in thread
From: Kent Overstreet @ 2024-08-19 16:59 UTC (permalink / raw)
  To: rcu; +Cc: Kent Overstreet, paulmck, linux-kernel

Provide an inlined fast path for genradix lookups.
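
For illustration, a hypothetical caller - the lookup walk now happens
entirely inline, while the allocating path stays out of line:

#include <linux/generic-radix-tree.h>
#include <linux/slab.h>

struct foo {
	u64	a;
};

static GENRADIX(struct foo) foos;

static struct foo *foo_lookup(size_t idx)
{
	/* full radix tree walk, inlined; NULL if @idx isn't allocated: */
	return genradix_ptr_inlined(&foos, idx);
}

static struct foo *foo_get(size_t idx)
{
	/* allocating lookup, still a function call: */
	return genradix_ptr_alloc(&foos, idx, GFP_KERNEL);
}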

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
---
 include/linux/generic-radix-tree.h | 75 ++++++++++++++++++++++++++++++
 lib/generic-radix-tree.c           | 64 +------------------------
 2 files changed, 76 insertions(+), 63 deletions(-)

diff --git a/include/linux/generic-radix-tree.h b/include/linux/generic-radix-tree.h
index f3512fddf3d7..8a3e1e886d1c 100644
--- a/include/linux/generic-radix-tree.h
+++ b/include/linux/generic-radix-tree.h
@@ -48,6 +48,49 @@ struct genradix_root;
 #define GENRADIX_NODE_SHIFT	9
 #define GENRADIX_NODE_SIZE	(1U << GENRADIX_NODE_SHIFT)
 
+#define GENRADIX_ARY		(GENRADIX_NODE_SIZE / sizeof(struct genradix_node *))
+#define GENRADIX_ARY_SHIFT	ilog2(GENRADIX_ARY)
+
+/* depth that's needed for a genradix that can address up to ULONG_MAX: */
+#define GENRADIX_MAX_DEPTH	\
+	DIV_ROUND_UP(BITS_PER_LONG - GENRADIX_NODE_SHIFT, GENRADIX_ARY_SHIFT)
+
+#define GENRADIX_DEPTH_MASK				\
+	((unsigned long) (roundup_pow_of_two(GENRADIX_MAX_DEPTH + 1) - 1))
+
+static inline int genradix_depth_shift(unsigned depth)
+{
+	return GENRADIX_NODE_SHIFT + GENRADIX_ARY_SHIFT * depth;
+}
+
+/*
+ * Returns size (of data, in bytes) that a tree of a given depth holds:
+ */
+static inline size_t genradix_depth_size(unsigned depth)
+{
+	return 1UL << genradix_depth_shift(depth);
+}
+
+static inline unsigned genradix_root_to_depth(struct genradix_root *r)
+{
+	return (unsigned long) r & GENRADIX_DEPTH_MASK;
+}
+
+static inline struct genradix_node *genradix_root_to_node(struct genradix_root *r)
+{
+	return (void *) ((unsigned long) r & ~GENRADIX_DEPTH_MASK);
+}
+
+struct genradix_node {
+	union {
+		/* Interior node: */
+		struct genradix_node	*children[GENRADIX_ARY];
+
+		/* Leaf: */
+		u8			data[GENRADIX_NODE_SIZE];
+	};
+};
+
 struct __genradix {
 	struct genradix_root		*root;
 };
@@ -128,6 +171,30 @@ static inline size_t __idx_to_offset(size_t idx, size_t obj_size)
 #define __genradix_idx_to_offset(_radix, _idx)			\
 	__idx_to_offset(_idx, __genradix_obj_size(_radix))
 
+static inline void *__genradix_ptr_inlined(struct __genradix *radix, size_t offset)
+{
+	struct genradix_root *r = READ_ONCE(radix->root);
+	struct genradix_node *n = genradix_root_to_node(r);
+	unsigned level		= genradix_root_to_depth(r);
+	unsigned shift		= genradix_depth_shift(level);
+
+	if (unlikely(ilog2(offset) >= genradix_depth_shift(level)))
+		return NULL;
+
+	while (n && shift > GENRADIX_NODE_SHIFT) {
+		shift -= GENRADIX_ARY_SHIFT;
+		n = n->children[offset >> shift];
+		offset &= (1UL << shift) - 1;
+	}
+
+	return n ? &n->data[offset] : NULL;
+}
+
+#define genradix_ptr_inlined(_radix, _idx)			\
+	(__genradix_cast(_radix)				\
+	 __genradix_ptr_inlined(&(_radix)->tree,		\
+			__genradix_idx_to_offset(_radix, _idx)))
+
 void *__genradix_ptr(struct __genradix *, size_t);
 
 /**
@@ -144,6 +211,14 @@ void *__genradix_ptr(struct __genradix *, size_t);
 
 void *__genradix_ptr_alloc(struct __genradix *, size_t, gfp_t);
 
+#define genradix_ptr_alloc_inlined(_radix, _idx, _gfp)			\
+	(__genradix_cast(_radix)					\
+	 (__genradix_ptr_inlined(&(_radix)->tree,			\
+			__genradix_idx_to_offset(_radix, _idx)) ?:	\
+	  __genradix_ptr_alloc(&(_radix)->tree,				\
+			__genradix_idx_to_offset(_radix, _idx),		\
+			_gfp)))
+
 /**
  * genradix_ptr_alloc - get a pointer to a genradix entry, allocating it
  *			if necessary
diff --git a/lib/generic-radix-tree.c b/lib/generic-radix-tree.c
index fa692c86f069..4efae0663049 100644
--- a/lib/generic-radix-tree.c
+++ b/lib/generic-radix-tree.c
@@ -5,75 +5,13 @@
 #include <linux/gfp.h>
 #include <linux/kmemleak.h>
 
-#define GENRADIX_ARY		(GENRADIX_NODE_SIZE / sizeof(struct genradix_node *))
-#define GENRADIX_ARY_SHIFT	ilog2(GENRADIX_ARY)
-
-struct genradix_node {
-	union {
-		/* Interior node: */
-		struct genradix_node	*children[GENRADIX_ARY];
-
-		/* Leaf: */
-		u8			data[GENRADIX_NODE_SIZE];
-	};
-};
-
-static inline int genradix_depth_shift(unsigned depth)
-{
-	return GENRADIX_NODE_SHIFT + GENRADIX_ARY_SHIFT * depth;
-}
-
-/*
- * Returns size (of data, in bytes) that a tree of a given depth holds:
- */
-static inline size_t genradix_depth_size(unsigned depth)
-{
-	return 1UL << genradix_depth_shift(depth);
-}
-
-/* depth that's needed for a genradix that can address up to ULONG_MAX: */
-#define GENRADIX_MAX_DEPTH	\
-	DIV_ROUND_UP(BITS_PER_LONG - GENRADIX_NODE_SHIFT, GENRADIX_ARY_SHIFT)
-
-#define GENRADIX_DEPTH_MASK				\
-	((unsigned long) (roundup_pow_of_two(GENRADIX_MAX_DEPTH + 1) - 1))
-
-static inline unsigned genradix_root_to_depth(struct genradix_root *r)
-{
-	return (unsigned long) r & GENRADIX_DEPTH_MASK;
-}
-
-static inline struct genradix_node *genradix_root_to_node(struct genradix_root *r)
-{
-	return (void *) ((unsigned long) r & ~GENRADIX_DEPTH_MASK);
-}
-
 /*
  * Returns pointer to the specified byte @offset within @radix, or NULL if not
  * allocated
  */
 void *__genradix_ptr(struct __genradix *radix, size_t offset)
 {
-	struct genradix_root *r = READ_ONCE(radix->root);
-	struct genradix_node *n = genradix_root_to_node(r);
-	unsigned level		= genradix_root_to_depth(r);
-
-	if (ilog2(offset) >= genradix_depth_shift(level))
-		return NULL;
-
-	while (1) {
-		if (!n)
-			return NULL;
-		if (!level)
-			break;
-
-		level--;
-
-		n = n->children[offset >> genradix_depth_shift(level)];
-		offset &= genradix_depth_size(level) - 1;
-	}
-
-	return &n->data[offset];
+	return __genradix_ptr_inlined(radix, offset);
 }
 EXPORT_SYMBOL(__genradix_ptr);
 
-- 
2.45.2


^ permalink raw reply related	[flat|nested] 24+ messages in thread

* [PATCH 2/9] lib/generic-radix-tree.c: add preallocation
  2024-08-19 16:59 [PATCH 0/9] rcu_pending Kent Overstreet
  2024-08-19 16:59 ` [PATCH 1/9] lib/generic-radix-tree.c: genradix_ptr_inlined() Kent Overstreet
@ 2024-08-19 16:59 ` Kent Overstreet
  2024-08-19 16:59 ` [PATCH 3/9] darray: lift from bcachefs Kent Overstreet
                   ` (7 subsequent siblings)
  9 siblings, 0 replies; 24+ messages in thread
From: Kent Overstreet @ 2024-08-19 16:59 UTC (permalink / raw)
  To: rcu; +Cc: Kent Overstreet, paulmck, linux-kernel
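
Add the ability to pass a preallocated node to __genradix_ptr_alloc(),
so that callers can allocate with GFP_KERNEL before taking a lock and
then insert atomically under it. Rough usage sketch (illustrative
names):

#include <linux/generic-radix-tree.h>
#include <linux/spinlock.h>

struct val_cache {
	spinlock_t	lock;
	GENRADIX(u64)	vals;
};

static u64 *val_get_atomic(struct val_cache *c, size_t idx)
{
	/* preallocate where GFP_KERNEL is still allowed: */
	struct genradix_node *new_node = genradix_alloc_node(GFP_KERNEL);
	u64 *v;

	spin_lock(&c->lock);
	v = genradix_ptr_alloc_preallocated_inlined(&c->vals, idx, &new_node,
						    GFP_ATOMIC|__GFP_NOWARN);
	spin_unlock(&c->lock);

	/* if the preallocated node ended up not being needed, free it: */
	if (new_node)
		genradix_free_node(new_node);
	return v;
}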

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
---
 include/linux/generic-radix-tree.h | 39 +++++++++++++++++++++++++-----
 lib/generic-radix-tree.c           | 16 ++++--------
 2 files changed, 38 insertions(+), 17 deletions(-)

diff --git a/include/linux/generic-radix-tree.h b/include/linux/generic-radix-tree.h
index 8a3e1e886d1c..340bba5c9735 100644
--- a/include/linux/generic-radix-tree.h
+++ b/include/linux/generic-radix-tree.h
@@ -41,6 +41,7 @@
 #include <linux/limits.h>
 #include <linux/log2.h>
 #include <linux/math.h>
+#include <linux/slab.h>
 #include <linux/types.h>
 
 struct genradix_root;
@@ -81,6 +82,11 @@ static inline struct genradix_node *genradix_root_to_node(struct genradix_root *
 	return (void *) ((unsigned long) r & ~GENRADIX_DEPTH_MASK);
 }
 
+struct __genradix {
+	struct genradix_root		*root;
+	rwlock_t			free_lock;
+};
+
 struct genradix_node {
 	union {
 		/* Interior node: */
@@ -91,9 +97,15 @@ struct genradix_node {
 	};
 };
 
-struct __genradix {
-	struct genradix_root		*root;
-};
+static inline struct genradix_node *genradix_alloc_node(gfp_t gfp_mask)
+{
+	return kzalloc(GENRADIX_NODE_SIZE, gfp_mask);
+}
+
+static inline void genradix_free_node(struct genradix_node *node)
+{
+	kfree(node);
+}
 
 /*
  * NOTE: currently, sizeof(_type) must not be larger than GENRADIX_NODE_SIZE:
@@ -209,7 +221,8 @@ void *__genradix_ptr(struct __genradix *, size_t);
 	 __genradix_ptr(&(_radix)->tree,			\
 			__genradix_idx_to_offset(_radix, _idx)))
 
-void *__genradix_ptr_alloc(struct __genradix *, size_t, gfp_t);
+void *__genradix_ptr_alloc(struct __genradix *, size_t,
+			   struct genradix_node **, gfp_t);
 
 #define genradix_ptr_alloc_inlined(_radix, _idx, _gfp)			\
 	(__genradix_cast(_radix)					\
@@ -217,7 +230,15 @@ void *__genradix_ptr_alloc(struct __genradix *, size_t, gfp_t);
 			__genradix_idx_to_offset(_radix, _idx)) ?:	\
 	  __genradix_ptr_alloc(&(_radix)->tree,				\
 			__genradix_idx_to_offset(_radix, _idx),		\
-			_gfp)))
+			NULL, _gfp)))
+
+#define genradix_ptr_alloc_preallocated_inlined(_radix, _idx, _new_node, _gfp)\
+	(__genradix_cast(_radix)					\
+	 (__genradix_ptr_inlined(&(_radix)->tree,			\
+			__genradix_idx_to_offset(_radix, _idx)) ?:	\
+	  __genradix_ptr_alloc(&(_radix)->tree,				\
+			__genradix_idx_to_offset(_radix, _idx),		\
+			_new_node, _gfp)))
 
 /**
  * genradix_ptr_alloc - get a pointer to a genradix entry, allocating it
@@ -232,7 +253,13 @@ void *__genradix_ptr_alloc(struct __genradix *, size_t, gfp_t);
 	(__genradix_cast(_radix)				\
 	 __genradix_ptr_alloc(&(_radix)->tree,			\
 			__genradix_idx_to_offset(_radix, _idx),	\
-			_gfp))
+			NULL, _gfp))
+
+#define genradix_ptr_alloc_preallocated(_radix, _idx, _new_node, _gfp)\
+	(__genradix_cast(_radix)				\
+	 __genradix_ptr_alloc(&(_radix)->tree,			\
+			__genradix_idx_to_offset(_radix, _idx),	\
+			_new_node, _gfp))
 
 struct genradix_iter {
 	size_t			offset;
diff --git a/lib/generic-radix-tree.c b/lib/generic-radix-tree.c
index 4efae0663049..79e067b51488 100644
--- a/lib/generic-radix-tree.c
+++ b/lib/generic-radix-tree.c
@@ -15,27 +15,21 @@ void *__genradix_ptr(struct __genradix *radix, size_t offset)
 }
 EXPORT_SYMBOL(__genradix_ptr);
 
-static inline struct genradix_node *genradix_alloc_node(gfp_t gfp_mask)
-{
-	return kzalloc(GENRADIX_NODE_SIZE, gfp_mask);
-}
-
-static inline void genradix_free_node(struct genradix_node *node)
-{
-	kfree(node);
-}
-
 /*
  * Returns pointer to the specified byte @offset within @radix, allocating it if
  * necessary - newly allocated slots are always zeroed out:
  */
 void *__genradix_ptr_alloc(struct __genradix *radix, size_t offset,
+			   struct genradix_node **preallocated,
 			   gfp_t gfp_mask)
 {
 	struct genradix_root *v = READ_ONCE(radix->root);
 	struct genradix_node *n, *new_node = NULL;
 	unsigned level;
 
+	if (preallocated)
+		swap(new_node, *preallocated);
+
 	/* Increase tree depth if necessary: */
 	while (1) {
 		struct genradix_root *r = v, *new_root;
@@ -219,7 +213,7 @@ int __genradix_prealloc(struct __genradix *radix, size_t size,
 	size_t offset;
 
 	for (offset = 0; offset < size; offset += GENRADIX_NODE_SIZE)
-		if (!__genradix_ptr_alloc(radix, offset, gfp_mask))
+		if (!__genradix_ptr_alloc(radix, offset, NULL, gfp_mask))
 			return -ENOMEM;
 
 	return 0;
-- 
2.45.2


^ permalink raw reply related	[flat|nested] 24+ messages in thread

* [PATCH 3/9] darray: lift from bcachefs
  2024-08-19 16:59 [PATCH 0/9] rcu_pending Kent Overstreet
  2024-08-19 16:59 ` [PATCH 1/9] lib/generic-radix-tree.c: genradix_ptr_inlined() Kent Overstreet
  2024-08-19 16:59 ` [PATCH 2/9] lib/generic-radix-tree.c: add preallocation Kent Overstreet
@ 2024-08-19 16:59 ` Kent Overstreet
  2024-08-21  1:00   ` Jeff Johnson
  2024-08-19 16:59 ` [PATCH 4/9] vmalloc: is_vmalloc_addr_inlined() Kent Overstreet
                   ` (6 subsequent siblings)
  9 siblings, 1 reply; 24+ messages in thread
From: Kent Overstreet @ 2024-08-19 16:59 UTC (permalink / raw)
  To: rcu; +Cc: Kent Overstreet, paulmck, linux-kernel

Dynamic arrays, inspired by CCAN's darray - basically C++ STL vectors.

Used by thread_with_stdio, which is also being lifted from bcachefs for
xfs.
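
Quick sketch of the API for anyone who hasn't seen it (illustrative
only):

#include <linux/darray.h>

typedef DARRAY(u32) u32_darray;

static u32 darray_example(void)
{
	u32_darray ids = {};
	u32 sum = 0;

	for (u32 v = 0; v < 10; v++)
		if (darray_push(&ids, v))	/* grows the array; -ENOMEM on failure */
			break;

	darray_for_each(ids, i)			/* i is a u32 * */
		sum += *i;

	darray_exit(&ids);			/* frees the backing buffer */
	return sum;
}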

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
---
 MAINTAINERS                             |  7 +++
 fs/bcachefs/Makefile                    |  1 -
 fs/bcachefs/btree_node_scan_types.h     |  2 +-
 fs/bcachefs/btree_types.h               |  2 +-
 fs/bcachefs/btree_update.c              |  2 +
 fs/bcachefs/btree_write_buffer_types.h  |  2 +-
 fs/bcachefs/disk_accounting_types.h     |  2 +-
 fs/bcachefs/fsck.c                      |  2 +-
 fs/bcachefs/journal_io.h                |  2 +-
 fs/bcachefs/journal_sb.c                |  2 +-
 fs/bcachefs/sb-downgrade.c              |  3 +-
 fs/bcachefs/sb-errors_types.h           |  2 +-
 fs/bcachefs/sb-members.h                |  2 +-
 fs/bcachefs/subvolume.h                 |  1 -
 fs/bcachefs/subvolume_types.h           |  2 +-
 fs/bcachefs/thread_with_file_types.h    |  2 +-
 fs/bcachefs/util.h                      | 29 +-----------
 {fs/bcachefs => include/linux}/darray.h | 59 ++++++++++++++++---------
 include/linux/darray_types.h            | 22 +++++++++
 lib/Makefile                            |  2 +-
 {fs/bcachefs => lib}/darray.c           | 12 ++++-
 21 files changed, 96 insertions(+), 64 deletions(-)
 rename {fs/bcachefs => include/linux}/darray.h (66%)
 create mode 100644 include/linux/darray_types.h
 rename {fs/bcachefs => lib}/darray.c (57%)

diff --git a/MAINTAINERS b/MAINTAINERS
index 42decde38320..08a66234380e 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -6142,6 +6142,13 @@ F:	net/ax25/ax25_out.c
 F:	net/ax25/ax25_timer.c
 F:	net/ax25/sysctl_net_ax25.c
 
+DARRAY
+M:	Kent Overstreet <kent.overstreet@linux.dev>
+L:	linux-bcachefs@vger.kernel.org
+S:	Maintained
+F:	include/linux/darray.h
+F:	include/linux/darray_types.h
+
 DATA ACCESS MONITOR
 M:	SeongJae Park <sj@kernel.org>
 L:	damon@lists.linux.dev
diff --git a/fs/bcachefs/Makefile b/fs/bcachefs/Makefile
index 0ab533a2b03b..3d477690b71e 100644
--- a/fs/bcachefs/Makefile
+++ b/fs/bcachefs/Makefile
@@ -28,7 +28,6 @@ bcachefs-y		:=	\
 	checksum.o		\
 	clock.o			\
 	compress.o		\
-	darray.o		\
 	data_update.o		\
 	debug.o			\
 	dirent.o		\
diff --git a/fs/bcachefs/btree_node_scan_types.h b/fs/bcachefs/btree_node_scan_types.h
index b6c36c45d0be..069573d48134 100644
--- a/fs/bcachefs/btree_node_scan_types.h
+++ b/fs/bcachefs/btree_node_scan_types.h
@@ -2,7 +2,7 @@
 #ifndef _BCACHEFS_BTREE_NODE_SCAN_TYPES_H
 #define _BCACHEFS_BTREE_NODE_SCAN_TYPES_H
 
-#include "darray.h"
+#include <linux/darray.h>
 
 struct found_btree_node {
 	bool			range_updated:1;
diff --git a/fs/bcachefs/btree_types.h b/fs/bcachefs/btree_types.h
index b256b2a20a4f..5f760906e366 100644
--- a/fs/bcachefs/btree_types.h
+++ b/fs/bcachefs/btree_types.h
@@ -2,13 +2,13 @@
 #ifndef _BCACHEFS_BTREE_TYPES_H
 #define _BCACHEFS_BTREE_TYPES_H
 
+#include <linux/darray_types.h>
 #include <linux/list.h>
 #include <linux/rhashtable.h>
 
 #include "bbpos_types.h"
 #include "btree_key_cache_types.h"
 #include "buckets_types.h"
-#include "darray.h"
 #include "errcode.h"
 #include "journal_types.h"
 #include "replicas_types.h"
diff --git a/fs/bcachefs/btree_update.c b/fs/bcachefs/btree_update.c
index 514df618548e..2289c26b5208 100644
--- a/fs/bcachefs/btree_update.c
+++ b/fs/bcachefs/btree_update.c
@@ -14,6 +14,8 @@
 #include "snapshot.h"
 #include "trace.h"
 
+#include <linux/darray.h>
+
 static inline int btree_insert_entry_cmp(const struct btree_insert_entry *l,
 					 const struct btree_insert_entry *r)
 {
diff --git a/fs/bcachefs/btree_write_buffer_types.h b/fs/bcachefs/btree_write_buffer_types.h
index e9e76e20f43b..d39d163c6ea9 100644
--- a/fs/bcachefs/btree_write_buffer_types.h
+++ b/fs/bcachefs/btree_write_buffer_types.h
@@ -2,7 +2,7 @@
 #ifndef _BCACHEFS_BTREE_WRITE_BUFFER_TYPES_H
 #define _BCACHEFS_BTREE_WRITE_BUFFER_TYPES_H
 
-#include "darray.h"
+#include <linux/darray_types.h>
 #include "journal_types.h"
 
 #define BTREE_WRITE_BUFERED_VAL_U64s_MAX	4
diff --git a/fs/bcachefs/disk_accounting_types.h b/fs/bcachefs/disk_accounting_types.h
index 1687a45177a7..42de8c284d1f 100644
--- a/fs/bcachefs/disk_accounting_types.h
+++ b/fs/bcachefs/disk_accounting_types.h
@@ -2,7 +2,7 @@
 #ifndef _BCACHEFS_DISK_ACCOUNTING_TYPES_H
 #define _BCACHEFS_DISK_ACCOUNTING_TYPES_H
 
-#include "darray.h"
+#include <linux/darray.h>
 
 struct accounting_mem_entry {
 	struct bpos				pos;
diff --git a/fs/bcachefs/fsck.c b/fs/bcachefs/fsck.c
index 9138944c5ae6..2d37316ca721 100644
--- a/fs/bcachefs/fsck.c
+++ b/fs/bcachefs/fsck.c
@@ -5,7 +5,6 @@
 #include "btree_cache.h"
 #include "btree_update.h"
 #include "buckets.h"
-#include "darray.h"
 #include "dirent.h"
 #include "error.h"
 #include "fs-common.h"
@@ -18,6 +17,7 @@
 #include "xattr.h"
 
 #include <linux/bsearch.h>
+#include <linux/darray.h>
 #include <linux/dcache.h> /* struct qstr */
 
 /*
diff --git a/fs/bcachefs/journal_io.h b/fs/bcachefs/journal_io.h
index 2ca9cde30ea8..2b8f458cf13c 100644
--- a/fs/bcachefs/journal_io.h
+++ b/fs/bcachefs/journal_io.h
@@ -2,7 +2,7 @@
 #ifndef _BCACHEFS_JOURNAL_IO_H
 #define _BCACHEFS_JOURNAL_IO_H
 
-#include "darray.h"
+#include <linux/darray_types.h>
 
 void bch2_journal_pos_from_member_info_set(struct bch_fs *);
 void bch2_journal_pos_from_member_info_resume(struct bch_fs *);
diff --git a/fs/bcachefs/journal_sb.c b/fs/bcachefs/journal_sb.c
index db80e506e3ab..9db57f6f1035 100644
--- a/fs/bcachefs/journal_sb.c
+++ b/fs/bcachefs/journal_sb.c
@@ -2,8 +2,8 @@
 
 #include "bcachefs.h"
 #include "journal_sb.h"
-#include "darray.h"
 
+#include <linux/darray.h>
 #include <linux/sort.h>
 
 /* BCH_SB_FIELD_journal: */
diff --git a/fs/bcachefs/sb-downgrade.c b/fs/bcachefs/sb-downgrade.c
index 650a1f77ca40..b42f8f42b71a 100644
--- a/fs/bcachefs/sb-downgrade.c
+++ b/fs/bcachefs/sb-downgrade.c
@@ -6,12 +6,13 @@
  */
 
 #include "bcachefs.h"
-#include "darray.h"
 #include "recovery_passes.h"
 #include "sb-downgrade.h"
 #include "sb-errors.h"
 #include "super-io.h"
 
+#include <linux/darray.h>
+
 #define RECOVERY_PASS_ALL_FSCK		BIT_ULL(63)
 
 /*
diff --git a/fs/bcachefs/sb-errors_types.h b/fs/bcachefs/sb-errors_types.h
index 40325239c3b0..3b28871d23ed 100644
--- a/fs/bcachefs/sb-errors_types.h
+++ b/fs/bcachefs/sb-errors_types.h
@@ -2,7 +2,7 @@
 #ifndef _BCACHEFS_SB_ERRORS_TYPES_H
 #define _BCACHEFS_SB_ERRORS_TYPES_H
 
-#include "darray.h"
+#include <linux/darray_types.h>
 
 struct bch_sb_error_entry_cpu {
 	u64			id:16,
diff --git a/fs/bcachefs/sb-members.h b/fs/bcachefs/sb-members.h
index f307f2857603..2c2f883e8d83 100644
--- a/fs/bcachefs/sb-members.h
+++ b/fs/bcachefs/sb-members.h
@@ -2,8 +2,8 @@
 #ifndef _BCACHEFS_SB_MEMBERS_H
 #define _BCACHEFS_SB_MEMBERS_H
 
-#include "darray.h"
 #include "bkey_types.h"
+#include <linux/darray.h>
 
 extern char * const bch2_member_error_strs[];
 
diff --git a/fs/bcachefs/subvolume.h b/fs/bcachefs/subvolume.h
index e62f876541fe..52daf4303605 100644
--- a/fs/bcachefs/subvolume.h
+++ b/fs/bcachefs/subvolume.h
@@ -2,7 +2,6 @@
 #ifndef _BCACHEFS_SUBVOLUME_H
 #define _BCACHEFS_SUBVOLUME_H
 
-#include "darray.h"
 #include "subvolume_types.h"
 
 enum bch_validate_flags;
diff --git a/fs/bcachefs/subvolume_types.h b/fs/bcachefs/subvolume_types.h
index f2ec4277c2a5..a0c0d5909e9b 100644
--- a/fs/bcachefs/subvolume_types.h
+++ b/fs/bcachefs/subvolume_types.h
@@ -2,7 +2,7 @@
 #ifndef _BCACHEFS_SUBVOLUME_TYPES_H
 #define _BCACHEFS_SUBVOLUME_TYPES_H
 
-#include "darray.h"
+#include <linux/darray_types.h>
 
 typedef DARRAY(u32) snapshot_id_list;
 
diff --git a/fs/bcachefs/thread_with_file_types.h b/fs/bcachefs/thread_with_file_types.h
index f4d484d44f63..254b8493ec4b 100644
--- a/fs/bcachefs/thread_with_file_types.h
+++ b/fs/bcachefs/thread_with_file_types.h
@@ -2,7 +2,7 @@
 #ifndef _BCACHEFS_THREAD_WITH_FILE_TYPES_H
 #define _BCACHEFS_THREAD_WITH_FILE_TYPES_H
 
-#include "darray.h"
+#include <linux/darray_types.h>
 
 struct stdio_buf {
 	spinlock_t		lock;
diff --git a/fs/bcachefs/util.h b/fs/bcachefs/util.h
index fb02c1c36004..742d7d9f365f 100644
--- a/fs/bcachefs/util.h
+++ b/fs/bcachefs/util.h
@@ -5,23 +5,22 @@
 #include <linux/bio.h>
 #include <linux/blkdev.h>
 #include <linux/closure.h>
+#include <linux/darray.h>
 #include <linux/errno.h>
 #include <linux/freezer.h>
 #include <linux/kernel.h>
 #include <linux/min_heap.h>
-#include <linux/sched/clock.h>
 #include <linux/llist.h>
 #include <linux/log2.h>
 #include <linux/percpu.h>
 #include <linux/preempt.h>
 #include <linux/ratelimit.h>
+#include <linux/sched/clock.h>
 #include <linux/slab.h>
 #include <linux/vmalloc.h>
 #include <linux/workqueue.h>
 
 #include "mean_and_variance.h"
-
-#include "darray.h"
 #include "time_stats.h"
 
 struct closure;
@@ -513,30 +512,6 @@ static inline void memset_u64s_tail(void *s, int c, unsigned bytes)
 	memset(s + bytes, c, rem);
 }
 
-/* just the memmove, doesn't update @_nr */
-#define __array_insert_item(_array, _nr, _pos)				\
-	memmove(&(_array)[(_pos) + 1],					\
-		&(_array)[(_pos)],					\
-		sizeof((_array)[0]) * ((_nr) - (_pos)))
-
-#define array_insert_item(_array, _nr, _pos, _new_item)			\
-do {									\
-	__array_insert_item(_array, _nr, _pos);				\
-	(_nr)++;							\
-	(_array)[(_pos)] = (_new_item);					\
-} while (0)
-
-#define array_remove_items(_array, _nr, _pos, _nr_to_remove)		\
-do {									\
-	(_nr) -= (_nr_to_remove);					\
-	memmove(&(_array)[(_pos)],					\
-		&(_array)[(_pos) + (_nr_to_remove)],			\
-		sizeof((_array)[0]) * ((_nr) - (_pos)));		\
-} while (0)
-
-#define array_remove_item(_array, _nr, _pos)				\
-	array_remove_items(_array, _nr, _pos, 1)
-
 static inline void __move_gap(void *array, size_t element_size,
 			      size_t nr, size_t size,
 			      size_t old_gap, size_t new_gap)
diff --git a/fs/bcachefs/darray.h b/include/linux/darray.h
similarity index 66%
rename from fs/bcachefs/darray.h
rename to include/linux/darray.h
index 4b340d13caac..ff167eb795f2 100644
--- a/fs/bcachefs/darray.h
+++ b/include/linux/darray.h
@@ -1,34 +1,26 @@
 /* SPDX-License-Identifier: GPL-2.0 */
-#ifndef _BCACHEFS_DARRAY_H
-#define _BCACHEFS_DARRAY_H
+/*
+ * (C) 2022-2024 Kent Overstreet <kent.overstreet@linux.dev>
+ */
+#ifndef _LINUX_DARRAY_H
+#define _LINUX_DARRAY_H
 
 /*
- * Dynamic arrays:
+ * Dynamic arrays
  *
  * Inspired by CCAN's darray
  */
 
+#include <linux/darray_types.h>
 #include <linux/slab.h>
 
-#define DARRAY_PREALLOCATED(_type, _nr)					\
-struct {								\
-	size_t nr, size;						\
-	_type *data;							\
-	_type preallocated[_nr];					\
-}
-
-#define DARRAY(_type) DARRAY_PREALLOCATED(_type, 0)
-
-typedef DARRAY(char)	darray_char;
-typedef DARRAY(char *) darray_str;
-
-int __bch2_darray_resize(darray_char *, size_t, size_t, gfp_t);
+int __darray_resize_slowpath(darray_char *, size_t, size_t, gfp_t);
 
 static inline int __darray_resize(darray_char *d, size_t element_size,
 				  size_t new_size, gfp_t gfp)
 {
 	return unlikely(new_size > d->size)
-		? __bch2_darray_resize(d, element_size, new_size, gfp)
+		? __darray_resize_slowpath(d, element_size, new_size, gfp)
 		: 0;
 }
 
@@ -69,6 +61,28 @@ static inline int __darray_make_room(darray_char *d, size_t t_size, size_t more,
 #define darray_first(_d)	((_d).data[0])
 #define darray_last(_d)		((_d).data[(_d).nr - 1])
 
+/* Insert/remove items into the middle of a darray: */
+
+#define array_insert_item(_array, _nr, _pos, _new_item)			\
+do {									\
+	memmove(&(_array)[(_pos) + 1],					\
+		&(_array)[(_pos)],					\
+		sizeof((_array)[0]) * ((_nr) - (_pos)));		\
+	(_nr)++;							\
+	(_array)[(_pos)] = (_new_item);					\
+} while (0)
+
+#define array_remove_items(_array, _nr, _pos, _nr_to_remove)		\
+do {									\
+	(_nr) -= (_nr_to_remove);					\
+	memmove(&(_array)[(_pos)],					\
+		&(_array)[(_pos) + (_nr_to_remove)],			\
+		sizeof((_array)[0]) * ((_nr) - (_pos)));		\
+} while (0)
+
+#define array_remove_item(_array, _nr, _pos)				\
+	array_remove_items(_array, _nr, _pos, 1)
+
 #define darray_insert_item(_d, pos, _item)				\
 ({									\
 	size_t _pos = (pos);						\
@@ -79,10 +93,15 @@ static inline int __darray_make_room(darray_char *d, size_t t_size, size_t more,
 	_ret;								\
 })
 
+#define darray_remove_items(_d, _pos, _nr_to_remove)			\
+	array_remove_items((_d)->data, (_d)->nr, (_pos) - (_d)->data, _nr_to_remove)
+
 #define darray_remove_item(_d, _pos)					\
-	array_remove_item((_d)->data, (_d)->nr, (_pos) - (_d)->data)
+	darray_remove_items(_d, _pos, 1)
+
+/* Iteration: */
 
-#define __darray_for_each(_d, _i)						\
+#define __darray_for_each(_d, _i)					\
 	for ((_i) = (_d).data; _i < (_d).data + (_d).nr; _i++)
 
 #define darray_for_each(_d, _i)						\
@@ -106,4 +125,4 @@ do {									\
 	darray_init(_d);						\
 } while (0)
 
-#endif /* _BCACHEFS_DARRAY_H */
+#endif /* _LINUX_DARRAY_H */
diff --git a/include/linux/darray_types.h b/include/linux/darray_types.h
new file mode 100644
index 000000000000..a400a0c3600d
--- /dev/null
+++ b/include/linux/darray_types.h
@@ -0,0 +1,22 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+/*
+ * (C) 2022-2024 Kent Overstreet <kent.overstreet@linux.dev>
+ */
+#ifndef _LINUX_DARRAY_TYPES_H
+#define _LINUX_DARRAY_TYPES_H
+
+#include <linux/types.h>
+
+#define DARRAY_PREALLOCATED(_type, _nr)					\
+struct {								\
+	size_t nr, size;						\
+	_type *data;							\
+	_type preallocated[_nr];					\
+}
+
+#define DARRAY(_type) DARRAY_PREALLOCATED(_type, 0)
+
+typedef DARRAY(char)	darray_char;
+typedef DARRAY(char *)	darray_str;
+
+#endif /* _LINUX_DARRAY_TYPES_H */
diff --git a/lib/Makefile b/lib/Makefile
index 322bb127b4dc..87f4310c6076 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -48,7 +48,7 @@ obj-y += bcd.o sort.o parser.o debug_locks.o random32.o \
 	 bsearch.o find_bit.o llist.o lwq.o memweight.o kfifo.o \
 	 percpu-refcount.o rhashtable.o base64.o \
 	 once.o refcount.o rcuref.o usercopy.o errseq.o bucket_locks.o \
-	 generic-radix-tree.o bitmap-str.o
+	 generic-radix-tree.o bitmap-str.o darray.o
 obj-$(CONFIG_STRING_KUNIT_TEST) += string_kunit.o
 obj-y += string_helpers.o
 obj-$(CONFIG_STRING_HELPERS_KUNIT_TEST) += string_helpers_kunit.o
diff --git a/fs/bcachefs/darray.c b/lib/darray.c
similarity index 57%
rename from fs/bcachefs/darray.c
rename to lib/darray.c
index b7d223f85873..b6868db7f956 100644
--- a/fs/bcachefs/darray.c
+++ b/lib/darray.c
@@ -1,10 +1,14 @@
 // SPDX-License-Identifier: GPL-2.0
+/*
+ * (C) 2022-2024 Kent Overstreet <kent.overstreet@linux.dev>
+ */
 
+#include <linux/darray.h>
 #include <linux/log2.h>
+#include <linux/module.h>
 #include <linux/slab.h>
-#include "darray.h"
 
-int __bch2_darray_resize(darray_char *d, size_t element_size, size_t new_size, gfp_t gfp)
+int __darray_resize_slowpath(darray_char *d, size_t element_size, size_t new_size, gfp_t gfp)
 {
 	if (new_size > d->size) {
 		new_size = roundup_pow_of_two(new_size);
@@ -23,3 +27,7 @@ int __bch2_darray_resize(darray_char *d, size_t element_size, size_t new_size, g
 
 	return 0;
 }
+EXPORT_SYMBOL_GPL(__darray_resize_slowpath);
+
+MODULE_AUTHOR("Kent Overstreet");
+MODULE_LICENSE("GPL");
-- 
2.45.2


^ permalink raw reply related	[flat|nested] 24+ messages in thread

* [PATCH 4/9] vmalloc: is_vmalloc_addr_inlined()
  2024-08-19 16:59 [PATCH 0/9] rcu_pending Kent Overstreet
                   ` (2 preceding siblings ...)
  2024-08-19 16:59 ` [PATCH 3/9] darray: lift from bcachefs Kent Overstreet
@ 2024-08-19 16:59 ` Kent Overstreet
  2024-08-19 16:59 ` [PATCH 5/9] rcu: delete lockdep_assert_irqs_enabled() assert in start_poll_synchronize_rcu_common() Kent Overstreet
                   ` (5 subsequent siblings)
  9 siblings, 0 replies; 24+ messages in thread
From: Kent Overstreet @ 2024-08-19 16:59 UTC (permalink / raw)
  To: rcu; +Cc: Kent Overstreet, paulmck, linux-kernel

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
---
 include/linux/mm.h | 7 +++++++
 mm/vmalloc.c       | 4 +---
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/include/linux/mm.h b/include/linux/mm.h
index c4b238a20b76..e23516d197d7 100644
--- a/include/linux/mm.h
+++ b/include/linux/mm.h
@@ -1193,6 +1193,13 @@ unsigned long vmalloc_to_pfn(const void *addr);
  * is no special casing required.
  */
 #ifdef CONFIG_MMU
+static inline bool is_vmalloc_addr_inlined(const void *x)
+{
+	unsigned long addr = (unsigned long)kasan_reset_tag(x);
+
+	return addr >= VMALLOC_START && addr < VMALLOC_END;
+}
+
 extern bool is_vmalloc_addr(const void *x);
 extern int is_vmalloc_or_module_addr(const void *x);
 #else
diff --git a/mm/vmalloc.c b/mm/vmalloc.c
index 6b783baf12a1..2bba32d54fb6 100644
--- a/mm/vmalloc.c
+++ b/mm/vmalloc.c
@@ -78,9 +78,7 @@ static const bool vmap_allow_huge = false;
 
 bool is_vmalloc_addr(const void *x)
 {
-	unsigned long addr = (unsigned long)kasan_reset_tag(x);
-
-	return addr >= VMALLOC_START && addr < VMALLOC_END;
+	return is_vmalloc_addr_inlined(x);
 }
 EXPORT_SYMBOL(is_vmalloc_addr);
 
-- 
2.45.2


^ permalink raw reply related	[flat|nested] 24+ messages in thread

* [PATCH 5/9] rcu: delete lockdep_assert_irqs_enabled() assert in start_poll_synchronize_rcu_common()
  2024-08-19 16:59 [PATCH 0/9] rcu_pending Kent Overstreet
                   ` (3 preceding siblings ...)
  2024-08-19 16:59 ` [PATCH 4/9] vmalloc: is_vmalloc_addr_inlined() Kent Overstreet
@ 2024-08-19 16:59 ` Kent Overstreet
  2024-08-19 21:11   ` Paul E. McKenney
  2024-08-19 16:59 ` [PATCH 6/9] rcu: rcu_pending Kent Overstreet
                   ` (4 subsequent siblings)
  9 siblings, 1 reply; 24+ messages in thread
From: Kent Overstreet @ 2024-08-19 16:59 UTC (permalink / raw)
  To: rcu; +Cc: Kent Overstreet, paulmck, linux-kernel

This assertion appears to have been entirely unnecessary.

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
---
 kernel/rcu/tree.c | 1 -
 1 file changed, 1 deletion(-)

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index e641cc681901..52f9f0bf1b8e 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -4119,7 +4119,6 @@ static void start_poll_synchronize_rcu_common(void)
 	struct rcu_data *rdp;
 	struct rcu_node *rnp;
 
-	lockdep_assert_irqs_enabled();
 	local_irq_save(flags);
 	rdp = this_cpu_ptr(&rcu_data);
 	rnp = rdp->mynode;
-- 
2.45.2


^ permalink raw reply related	[flat|nested] 24+ messages in thread

* [PATCH 6/9] rcu: rcu_pending
  2024-08-19 16:59 [PATCH 0/9] rcu_pending Kent Overstreet
                   ` (4 preceding siblings ...)
  2024-08-19 16:59 ` [PATCH 5/9] rcu: delete lockdep_assert_irqs_enabled() assert in start_poll_synchronize_rcu_common() Kent Overstreet
@ 2024-08-19 16:59 ` Kent Overstreet
  2024-08-19 22:58   ` Paul E. McKenney
  2024-08-19 16:59 ` [PATCH 7/9] bcachefs: Rip out freelists from btree key cache Kent Overstreet
                   ` (3 subsequent siblings)
  9 siblings, 1 reply; 24+ messages in thread
From: Kent Overstreet @ 2024-08-19 16:59 UTC (permalink / raw)
  To: rcu; +Cc: Kent Overstreet, paulmck, linux-kernel

Generic data structure for explicitly tracking pending RCU items,
allowing items to be dequeued (i.e. allocated out of the set of items
pending freeing). Works with conventional RCU and SRCU, and possibly
other RCU flavors in the future, meaning this can serve as a more
generic replacement for SLAB_TYPESAFE_BY_RCU.
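
Sketch of the SLAB_TYPESAFE_BY_RCU-style use (illustrative names; the
actual interface is below). Freed objects are enqueued, and the
allocation path may pull one back out instead of allocating fresh -
with the usual type-stable-memory caveat that readers may still hold
references, so a reused object must be revalidated/reinitialized by
the caller:

#include <linux/rcu_pending.h>
#include <linux/slab.h>

struct foo {
	struct rcu_head	rcu;
	/* ... */
};

static struct rcu_pending foo_pending;	/* process fn kfree()s the object */

static void foo_free(struct foo *f)
{
	rcu_pending_enqueue(&foo_pending, &f->rcu);
}

static struct foo *foo_alloc(gfp_t gfp)
{
	/* try to reuse an object still waiting on its grace period: */
	struct rcu_head *rcu = rcu_pending_dequeue(&foo_pending);

	return rcu
		? container_of(rcu, struct foo, rcu)
		: kzalloc(sizeof(struct foo), gfp);
}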

Pending items are tracked in radix trees; if memory allocation fails, we
fall back to linked lists.

An rcu_pending is initialized with a callback, which is invoked once
pending items' grace periods have expired. Two types of callback
processing are handled specially:

- RCU_PENDING_KVFREE_FN

  New backend for kvfree_rcu(). Slightly faster, and eliminates the
  synchronize_rcu() slowpath in kvfree_rcu_mightsleep() - instead, an
  rcu_head is allocated if we don't have one and can't use the radix
  tree

  TODO:
  - add a shrinker (as in the existing kvfree_rcu implementation) so that
    memory reclaim can free expired objects if callback processing isn't
    keeping up, and to expedite a grace period if we're under memory
    pressure and too much memory is stranded by RCU

  - add a counter for amount of memory pending

- RCU_PENDING_CALL_RCU_FN

  Accelerated backend for call_rcu() - pending callbacks are tracked in
  a radix tree to eliminate linked list overhead.

These serve as replacement backends for kvfree_rcu() and call_rcu();
the generic interface may also be of interest to other users (e.g.
current SLAB_TYPESAFE_BY_RCU users).

Note:

Internally, we're using a single rearming call_rcu() callback for
notifications from the core RCU subsystem when objects are ready to be
processed.

Ideally we would be getting a callback every time a grace period
completes for which we have objects, but that would require multiple
rcu_heads in flight, and since the number of gp sequence numbers with
uncompleted callbacks is not bounded, we can't do that yet.

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
---
 include/linux/rcu_pending.h |  25 ++
 kernel/rcu/Makefile         |   2 +-
 kernel/rcu/pending.c        | 603 ++++++++++++++++++++++++++++++++++++
 3 files changed, 629 insertions(+), 1 deletion(-)
 create mode 100644 include/linux/rcu_pending.h
 create mode 100644 kernel/rcu/pending.c

diff --git a/include/linux/rcu_pending.h b/include/linux/rcu_pending.h
new file mode 100644
index 000000000000..a875c640da8d
--- /dev/null
+++ b/include/linux/rcu_pending.h
@@ -0,0 +1,25 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _LINUX_RCU_PENDING_H
+#define _LINUX_RCU_PENDING_H
+
+struct rcu_pending;
+typedef void (*rcu_pending_process_fn)(struct rcu_pending *, struct rcu_head *);
+
+struct rcu_pending_pcpu;
+
+struct rcu_pending {
+	struct rcu_pending_pcpu __percpu *p;
+	struct srcu_struct		*srcu;
+	rcu_pending_process_fn		process;
+};
+
+void rcu_pending_enqueue(struct rcu_pending *pending, struct rcu_head *obj);
+struct rcu_head *rcu_pending_dequeue(struct rcu_pending *pending);
+struct rcu_head *rcu_pending_dequeue_from_all(struct rcu_pending *pending);
+
+void rcu_pending_exit(struct rcu_pending *pending);
+int rcu_pending_init(struct rcu_pending *pending,
+		     struct srcu_struct *srcu,
+		     rcu_pending_process_fn process);
+
+#endif /* _LINUX_RCU_PENDING_H */
diff --git a/kernel/rcu/Makefile b/kernel/rcu/Makefile
index 0cfb009a99b9..2582f0324a11 100644
--- a/kernel/rcu/Makefile
+++ b/kernel/rcu/Makefile
@@ -7,7 +7,7 @@ ifeq ($(CONFIG_KCSAN),y)
 KBUILD_CFLAGS += -g -fno-omit-frame-pointer
 endif
 
-obj-y += update.o sync.o
+obj-y += update.o sync.o pending.o
 obj-$(CONFIG_TREE_SRCU) += srcutree.o
 obj-$(CONFIG_TINY_SRCU) += srcutiny.o
 obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o
diff --git a/kernel/rcu/pending.c b/kernel/rcu/pending.c
new file mode 100644
index 000000000000..c0e2351ba198
--- /dev/null
+++ b/kernel/rcu/pending.c
@@ -0,0 +1,603 @@
+// SPDX-License-Identifier: GPL-2.0
+#define pr_fmt(fmt) "%s() " fmt "\n", __func__
+
+#include <linux/darray.h>
+#include <linux/generic-radix-tree.h>
+#include <linux/mm.h>
+#include <linux/percpu.h>
+#include <linux/rcu_pending.h>
+#include <linux/slab.h>
+#include <linux/srcu.h>
+#include <linux/vmalloc.h>
+
+#include "rcu.h"
+
+#define static_array_for_each(_a, _i)			\
+	for (typeof(&(_a)[0]) _i = _a;			\
+	     _i < (_a) + ARRAY_SIZE(_a);		\
+	     _i++)
+
+enum rcu_pending_special {
+	RCU_PENDING_KVFREE	= 1,
+	RCU_PENDING_CALL_RCU	= 2,
+};
+
+#define RCU_PENDING_KVFREE_FN		((rcu_pending_process_fn) (ulong) RCU_PENDING_KVFREE)
+#define RCU_PENDING_CALL_RCU_FN		((rcu_pending_process_fn) (ulong) RCU_PENDING_CALL_RCU)
+
+static inline unsigned long __get_state_synchronize_rcu(struct srcu_struct *ssp)
+{
+	return ssp
+		? get_state_synchronize_srcu(ssp)
+		: get_state_synchronize_rcu();
+}
+
+static inline unsigned long __start_poll_synchronize_rcu(struct srcu_struct *ssp)
+{
+	return ssp
+		? start_poll_synchronize_srcu(ssp)
+		: start_poll_synchronize_rcu();
+}
+
+static inline bool __poll_state_synchronize_rcu(struct srcu_struct *ssp, unsigned long cookie)
+{
+	return ssp
+		? poll_state_synchronize_srcu(ssp, cookie)
+		: poll_state_synchronize_rcu(cookie);
+}
+
+static inline void __rcu_barrier(struct srcu_struct *ssp)
+{
+	return ssp
+		? srcu_barrier(ssp)
+		: rcu_barrier();
+}
+
+static inline void __call_rcu(struct srcu_struct *ssp, struct rcu_head *rhp,
+			      rcu_callback_t func)
+{
+	if (ssp)
+		call_srcu(ssp, rhp, func);
+	else
+		call_rcu(rhp, func);
+}
+
+struct rcu_pending_seq {
+	/*
+	 * We're using a radix tree like a vector - we're just pushing elements
+	 * onto the end; we're using a radix tree instead of an actual vector to
+	 * avoid reallocation overhead
+	 */
+	GENRADIX(struct rcu_head *)	objs;
+	size_t				nr;
+	struct rcu_head			**cursor;
+	unsigned long			seq;
+};
+
+struct rcu_pending_list {
+	struct rcu_head			*head;
+	struct rcu_head			*tail;
+	unsigned long			seq;
+};
+
+struct rcu_pending_pcpu {
+	struct rcu_pending		*parent;
+	spinlock_t			lock;
+	int				cpu;
+
+	/*
+	 * We can't bound the number of unprocessed gp sequence numbers, and we
+	 * can't efficiently merge radix trees for expired grace periods, so we
+	 * need darray/vector:
+	 */
+	DARRAY_PREALLOCATED(struct rcu_pending_seq, 4) objs;
+
+	/* Third entry is for expired objects: */
+	struct rcu_pending_list		lists[NUM_ACTIVE_RCU_POLL_OLDSTATE + 1];
+
+	struct rcu_head			cb;
+	bool				cb_armed;
+	struct work_struct		work;
+};
+
+static bool __rcu_pending_has_pending(struct rcu_pending_pcpu *p)
+{
+	if (p->objs.nr)
+		return true;
+
+	static_array_for_each(p->lists, i)
+		if (i->head)
+			return true;
+
+	return false;
+}
+
+static void rcu_pending_list_merge(struct rcu_pending_list *l1,
+				   struct rcu_pending_list *l2)
+{
+	if (!l1->head)
+		l1->head = l2->head;
+	else
+		l1->tail->next = l2->head;
+	l1->tail = l2->tail;
+
+	l2->head = l2->tail = NULL;
+}
+
+static void rcu_pending_list_add(struct rcu_pending_list *l,
+				 struct rcu_head *n)
+{
+	if (!l->head)
+		l->head = n;
+	else
+		l->tail->next = n;
+	l->tail = n;
+	n->next = NULL;
+}
+
+static void merge_expired_lists(struct rcu_pending_pcpu *p)
+{
+	struct rcu_pending_list *expired = &p->lists[NUM_ACTIVE_RCU_POLL_OLDSTATE];
+
+	for (struct rcu_pending_list *i = p->lists; i < expired; i++)
+		if (i->head && __poll_state_synchronize_rcu(p->parent->srcu, i->seq))
+			rcu_pending_list_merge(expired, i);
+}
+
+static noinline void __process_finished_items(struct rcu_pending *pending,
+					      struct rcu_pending_pcpu *p,
+					      unsigned long flags)
+{
+	struct rcu_pending_list *expired = &p->lists[NUM_ACTIVE_RCU_POLL_OLDSTATE];
+	struct rcu_pending_seq objs = {};
+	struct rcu_head *list = NULL;
+
+	if (p->objs.nr &&
+	    __poll_state_synchronize_rcu(pending->srcu, p->objs.data[0].seq)) {
+		objs = p->objs.data[0];
+		darray_remove_item(&p->objs, p->objs.data);
+	}
+
+	merge_expired_lists(p);
+
+	list = expired->head;
+	expired->head = expired->tail = NULL;
+
+	spin_unlock_irqrestore(&p->lock, flags);
+
+	switch ((ulong) pending->process) {
+	case RCU_PENDING_KVFREE:
+		for (size_t i = 0; i < objs.nr; ) {
+			size_t nr_this_node = min(GENRADIX_NODE_SIZE / sizeof(void *), objs.nr - i);
+
+			kfree_bulk(nr_this_node, (void **) genradix_ptr(&objs.objs, i));
+			i += nr_this_node;
+		}
+		genradix_free(&objs.objs);
+
+		while (list) {
+			struct rcu_head *obj = list;
+			list = obj->next;
+
+			/*
+			 * low bit of pointer indicates whether rcu_head needs
+			 * to be freed - kvfree_rcu_mightsleep()
+			 */
+			BUILD_BUG_ON(ARCH_SLAB_MINALIGN == 0);
+
+			void *ptr = (void *)(((unsigned long) obj->func) & ~1UL);
+			kvfree(ptr);
+
+			bool free_head = ((unsigned long) obj->func) & 1UL;
+			if (free_head)
+				kfree(obj);
+		}
+
+		break;
+
+	case RCU_PENDING_CALL_RCU:
+		for (size_t i = 0; i < objs.nr; i++) {
+			struct rcu_head *obj = *genradix_ptr(&objs.objs, i);
+			obj->func(obj);
+		}
+		genradix_free(&objs.objs);
+
+		while (list) {
+			struct rcu_head *obj = list;
+			list = obj->next;
+			obj->func(obj);
+		}
+		break;
+
+	default:
+		for (size_t i = 0; i < objs.nr; i++)
+			pending->process(pending, *genradix_ptr(&objs.objs, i));
+		genradix_free(&objs.objs);
+
+		while (list) {
+			struct rcu_head *obj = list;
+			list = obj->next;
+			pending->process(pending, obj);
+		}
+		break;
+	}
+}
+
+static bool process_finished_items(struct rcu_pending *pending,
+				   struct rcu_pending_pcpu *p,
+				   unsigned long flags)
+{
+	/*
+	 * XXX: we should grab the gp seq once and avoid multiple function
+	 * calls, this is called from __rcu_pending_enqueue() fastpath in
+	 * may_sleep==true mode
+	 */
+	if ((p->objs.nr && __poll_state_synchronize_rcu(pending->srcu, p->objs.data[0].seq)) ||
+	    (p->lists[0].head && __poll_state_synchronize_rcu(pending->srcu, p->lists[0].seq)) ||
+	    (p->lists[1].head && __poll_state_synchronize_rcu(pending->srcu, p->lists[1].seq)) ||
+	    p->lists[2].head) {
+		__process_finished_items(pending, p, flags);
+		return true;
+	}
+
+	return false;
+}
+
+static void rcu_pending_work(struct work_struct *work)
+{
+	struct rcu_pending_pcpu *p =
+		container_of(work, struct rcu_pending_pcpu, work);
+	struct rcu_pending *pending = p->parent;
+	unsigned long flags;
+
+	do {
+		spin_lock_irqsave(&p->lock, flags);
+	} while (process_finished_items(pending, p, flags));
+
+	spin_unlock_irqrestore(&p->lock, flags);
+}
+
+static void rcu_pending_rcu_cb(struct rcu_head *rcu)
+{
+	struct rcu_pending_pcpu *p = container_of(rcu, struct rcu_pending_pcpu, cb);
+
+	schedule_work_on(p->cpu, &p->work);
+
+	unsigned long flags;
+	spin_lock_irqsave(&p->lock, flags);
+	if (__rcu_pending_has_pending(p))
+		__call_rcu(p->parent->srcu, &p->cb, rcu_pending_rcu_cb);
+	else
+		p->cb_armed = false;
+	spin_unlock_irqrestore(&p->lock, flags);
+}
+
+static __always_inline struct rcu_pending_seq *
+get_object_radix(struct rcu_pending_pcpu *p, unsigned long seq)
+{
+	darray_for_each_reverse(p->objs, objs)
+		if (objs->seq == seq)
+			return objs;
+
+	if (darray_push_gfp(&p->objs, ((struct rcu_pending_seq) { .seq = seq }), GFP_ATOMIC))
+		return NULL;
+
+	return &darray_last(p->objs);
+}
+
+static noinline bool
+rcu_pending_enqueue_list(struct rcu_pending_pcpu *p, unsigned long seq,
+			 struct rcu_head *head, void *ptr,
+			 unsigned long *flags)
+{
+	if (ptr) {
+		if (!head) {
+			/*
+			 * kvfree_rcu_mightsleep(): we weren't passed an
+			 * rcu_head, but we need one: use the low bit of the
+			 * pointer to free to flag that the head needs to be
+			 * freed as well:
+			 */
+			ptr = (void *)(((unsigned long) ptr)|1UL);
+			head = kmalloc(sizeof(*head), __GFP_NOWARN);
+			if (!head) {
+				spin_unlock_irqrestore(&p->lock, *flags);
+				head = kmalloc(sizeof(*head), GFP_KERNEL|__GFP_NOFAIL);
+				/*
+				 * dropped lock, did GFP_KERNEL allocation,
+				 * check for gp expiration
+				 */
+				if (unlikely(__poll_state_synchronize_rcu(p->parent->srcu, seq))) {
+					kvfree(--ptr);
+					kfree(head);
+					spin_lock_irqsave(&p->lock, *flags);
+					return false;
+				}
+			}
+		}
+
+		head->func = ptr;
+	}
+again:
+	for (struct rcu_pending_list *i = p->lists;
+	     i < p->lists + NUM_ACTIVE_RCU_POLL_OLDSTATE; i++) {
+		if (i->seq == seq) {
+			rcu_pending_list_add(i, head);
+			return false;
+		}
+	}
+
+	for (struct rcu_pending_list *i = p->lists;
+	     i < p->lists + NUM_ACTIVE_RCU_POLL_OLDSTATE; i++) {
+		if (!i->head) {
+			i->seq = seq;
+			rcu_pending_list_add(i, head);
+			return true;
+		}
+	}
+
+	merge_expired_lists(p);
+	goto again;
+}
+
+/*
+ * __rcu_pending_enqueue: enqueue a pending RCU item, to be processed (via
+ * pending->process) once a grace period elapses.
+ *
+ * Attempt to enqueue items onto a radix tree; if memory allocation fails, fall
+ * back to a linked list.
+ *
+ * - If @ptr is NULL, we're enqueuing an item for a generic @pending with a
+ *   process callback
+ *
+ * - If @ptr and @head are both not NULL, we're kvfree_rcu()
+ *
+ * - If @ptr is not NULL and @head is, we're kvfree_rcu_mightsleep()
+ *
+ * - If @may_sleep is true, will do GFP_KERNEL memory allocations and process
+ *   expired items.
+ */
+static __always_inline void
+__rcu_pending_enqueue(struct rcu_pending *pending, struct rcu_head *head,
+		      void *ptr, bool may_sleep)
+{
+
+	struct rcu_pending_pcpu *p;
+	struct rcu_pending_seq *objs;
+	struct genradix_node *new_node = NULL;
+	unsigned long seq, flags;
+	bool start_gp = false;
+
+	BUG_ON((ptr != NULL) != (pending->process == RCU_PENDING_KVFREE_FN));
+
+	local_irq_save(flags);
+	p = this_cpu_ptr(pending->p);
+	spin_lock(&p->lock);
+	seq = __get_state_synchronize_rcu(pending->srcu);
+restart:
+	if (may_sleep &&
+	    unlikely(process_finished_items(pending, p, flags)))
+		goto check_expired;
+
+	/*
+	 * In kvfree_rcu() mode, the radix tree is only for slab pointers so
+	 * that we can do kfree_bulk() - vmalloc pointers always use the linked
+	 * list:
+	 */
+	if (ptr && unlikely(is_vmalloc_addr_inlined(ptr)))
+		goto list_add;
+
+	objs = get_object_radix(p, seq);
+	if (unlikely(!objs))
+		goto list_add;
+
+	if (unlikely(!objs->cursor)) {
+		/*
+		 * New radix tree nodes must be added under @p->lock because the
+		 * tree root is in a darray that can be resized (typically,
+		 * genradix supports concurrent unlocked allocation of new
+		 * nodes) - hence preallocation and the retry loop:
+		 */
+		objs->cursor = genradix_ptr_alloc_preallocated_inlined(&objs->objs,
+						objs->nr, &new_node, GFP_ATOMIC|__GFP_NOWARN);
+		if (unlikely(!objs->cursor)) {
+			if (may_sleep) {
+				spin_unlock_irqrestore(&p->lock, flags);
+
+				gfp_t gfp = GFP_KERNEL;
+				if (!head)
+					gfp |= __GFP_NOFAIL;
+
+				new_node = genradix_alloc_node(gfp);
+				if (!new_node)
+					may_sleep = false;
+				goto check_expired;
+			}
+list_add:
+			start_gp = rcu_pending_enqueue_list(p, seq, head, ptr, &flags);
+			goto start_gp;
+		}
+	}
+
+	*objs->cursor++ = ptr ?: head;
+	/* zero cursor if we hit the end of a radix tree node: */
+	if (!(((ulong) objs->cursor) & (GENRADIX_NODE_SIZE - 1)))
+		objs->cursor = NULL;
+	start_gp = !objs->nr;
+	objs->nr++;
+start_gp:
+	if (unlikely(start_gp)) {
+		/*
+		 * We only have one callback (ideally, we would have one for
+		 * every outstanding grace period) - so if our callback is
+		 * already in flight, we may still have to start a grace period
+		 * (since we used get_state() above, not start_poll())
+		 */
+		if (!p->cb_armed) {
+			p->cb_armed = true;
+			__call_rcu(pending->srcu, &p->cb, rcu_pending_rcu_cb);
+		} else {
+			__start_poll_synchronize_rcu(pending->srcu);
+		}
+	}
+	spin_unlock_irqrestore(&p->lock, flags);
+free_node:
+	if (new_node)
+		genradix_free_node(new_node);
+	return;
+check_expired:
+	if (unlikely(__poll_state_synchronize_rcu(pending->srcu, seq))) {
+		switch ((ulong) pending->process) {
+		case RCU_PENDING_KVFREE:
+			kvfree(ptr);
+			break;
+		case RCU_PENDING_CALL_RCU:
+			head->func(head);
+			break;
+		default:
+			pending->process(pending, head);
+			break;
+		}
+		goto free_node;
+	}
+
+	local_irq_save(flags);
+	p = this_cpu_ptr(pending->p);
+	spin_lock(&p->lock);
+	goto restart;
+}
+
+void rcu_pending_enqueue(struct rcu_pending *pending, struct rcu_head *obj)
+{
+	__rcu_pending_enqueue(pending, obj, NULL, true);
+}
+
+static struct rcu_head *rcu_pending_pcpu_dequeue(struct rcu_pending_pcpu *p)
+{
+	struct rcu_head *ret = NULL;
+
+	spin_lock_irq(&p->lock);
+	darray_for_each(p->objs, objs)
+		if (objs->nr) {
+			ret = *genradix_ptr(&objs->objs, --objs->nr);
+			objs->cursor = NULL;
+			if (!objs->nr)
+				genradix_free(&objs->objs);
+			goto out;
+		}
+
+	static_array_for_each(p->lists, i)
+		if (i->head) {
+			ret = i->head;
+			i->head = ret->next;
+			if (!i->head)
+				i->tail = NULL;
+			goto out;
+		}
+out:
+	spin_unlock_irq(&p->lock);
+
+	return ret;
+}
+
+struct rcu_head *rcu_pending_dequeue(struct rcu_pending *pending)
+{
+	return rcu_pending_pcpu_dequeue(raw_cpu_ptr(pending->p));
+}
+
+struct rcu_head *rcu_pending_dequeue_from_all(struct rcu_pending *pending)
+{
+	struct rcu_head *ret = rcu_pending_dequeue(pending);
+
+	if (ret)
+		return ret;
+
+	int cpu;
+	for_each_possible_cpu(cpu) {
+		ret = rcu_pending_pcpu_dequeue(per_cpu_ptr(pending->p, cpu));
+		if (ret)
+			break;
+	}
+	return ret;
+}
+
+static bool rcu_pending_has_pending_or_armed(struct rcu_pending *pending)
+{
+	int cpu;
+	for_each_possible_cpu(cpu) {
+		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
+		spin_lock_irq(&p->lock);
+		if (__rcu_pending_has_pending(p) || p->cb_armed) {
+			spin_unlock_irq(&p->lock);
+			return true;
+		}
+		spin_unlock_irq(&p->lock);
+	}
+
+	return false;
+}
+
+void rcu_pending_exit(struct rcu_pending *pending)
+{
+	int cpu;
+
+	if (!pending->p)
+		return;
+
+	while (rcu_pending_has_pending_or_armed(pending)) {
+		__rcu_barrier(pending->srcu);
+
+		for_each_possible_cpu(cpu) {
+			struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
+			flush_work(&p->work);
+		}
+	}
+
+	for_each_possible_cpu(cpu) {
+		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
+		flush_work(&p->work);
+	}
+
+	for_each_possible_cpu(cpu) {
+		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
+
+		static_array_for_each(p->lists, i)
+			WARN_ON(i->head);
+		WARN_ON(p->objs.nr);
+		darray_exit(&p->objs);
+	}
+	free_percpu(pending->p);
+}
+
+/**
+ * rcu_pending_init: - initialize a rcu_pending
+ *
+ * @pending:	Object to init
+ * @srcu:	May optionally be used with an srcu_struct; if NULL, uses normal
+ *		RCU flavor
+ * @process:	Callback function invoked on objects once their RCU barriers
+ *		have completed; if NULL, kvfree() is used.
+ */
+int rcu_pending_init(struct rcu_pending *pending,
+		     struct srcu_struct *srcu,
+		     rcu_pending_process_fn process)
+{
+	pending->p = alloc_percpu(struct rcu_pending_pcpu);
+	if (!pending->p)
+		return -ENOMEM;
+
+	int cpu;
+	for_each_possible_cpu(cpu) {
+		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
+		p->parent	= pending;
+		p->cpu		= cpu;
+		spin_lock_init(&p->lock);
+		darray_init(&p->objs);
+		INIT_WORK(&p->work, rcu_pending_work);
+	}
+
+	pending->srcu = srcu;
+	pending->process = process;
+
+	return 0;
+}
-- 
2.45.2


^ permalink raw reply related	[flat|nested] 24+ messages in thread

* [PATCH 7/9] bcachefs: Rip out freelists from btree key cache
  2024-08-19 16:59 [PATCH 0/9] rcu_pending Kent Overstreet
                   ` (5 preceding siblings ...)
  2024-08-19 16:59 ` [PATCH 6/9] rcu: rcu_pending Kent Overstreet
@ 2024-08-19 16:59 ` Kent Overstreet
  2024-08-19 16:59 ` [PATCH 8/9] bcachefs: key cache can now allocate from pending Kent Overstreet
                   ` (2 subsequent siblings)
  9 siblings, 0 replies; 24+ messages in thread
From: Kent Overstreet @ 2024-08-19 16:59 UTC (permalink / raw)
  To: rcu; +Cc: Kent Overstreet, paulmck, linux-kernel

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
---
 fs/bcachefs/btree_key_cache.c       | 368 +++++-----------------------
 fs/bcachefs/btree_key_cache_types.h |  14 --
 fs/bcachefs/btree_types.h           |   4 +-
 3 files changed, 57 insertions(+), 329 deletions(-)

diff --git a/fs/bcachefs/btree_key_cache.c b/fs/bcachefs/btree_key_cache.c
index 027e0a7d4765..b304a18eaea4 100644
--- a/fs/bcachefs/btree_key_cache.c
+++ b/fs/bcachefs/btree_key_cache.c
@@ -80,130 +80,39 @@ static bool bkey_cached_lock_for_evict(struct bkey_cached *ck)
 	return true;
 }
 
-static void bkey_cached_evict(struct btree_key_cache *c,
+static bool bkey_cached_evict(struct btree_key_cache *c,
 			      struct bkey_cached *ck)
 {
-	BUG_ON(rhashtable_remove_fast(&c->table, &ck->hash,
-				      bch2_btree_key_cache_params));
-	memset(&ck->key, ~0, sizeof(ck->key));
-
-	atomic_long_dec(&c->nr_keys);
-}
-
-static void bkey_cached_free(struct btree_key_cache *bc,
-			     struct bkey_cached *ck)
-{
-	struct bch_fs *c = container_of(bc, struct bch_fs, btree_key_cache);
-
-	BUG_ON(test_bit(BKEY_CACHED_DIRTY, &ck->flags));
-
-	ck->btree_trans_barrier_seq =
-		start_poll_synchronize_srcu(&c->btree_trans_barrier);
-
-	if (ck->c.lock.readers) {
-		list_move_tail(&ck->list, &bc->freed_pcpu);
-		bc->nr_freed_pcpu++;
-	} else {
-		list_move_tail(&ck->list, &bc->freed_nonpcpu);
-		bc->nr_freed_nonpcpu++;
+	bool ret = !rhashtable_remove_fast(&c->table, &ck->hash,
+				      bch2_btree_key_cache_params);
+	if (ret) {
+		memset(&ck->key, ~0, sizeof(ck->key));
+		atomic_long_dec(&c->nr_keys);
 	}
-	atomic_long_inc(&bc->nr_freed);
-
-	kfree(ck->k);
-	ck->k		= NULL;
-	ck->u64s	= 0;
 
-	six_unlock_write(&ck->c.lock);
-	six_unlock_intent(&ck->c.lock);
+	return ret;
 }
 
-#ifdef __KERNEL__
-static void __bkey_cached_move_to_freelist_ordered(struct btree_key_cache *bc,
-						   struct bkey_cached *ck)
+static void __bkey_cached_free(struct rcu_head *rcu)
 {
-	struct bkey_cached *pos;
-
-	bc->nr_freed_nonpcpu++;
+	struct bkey_cached *ck = container_of(rcu, struct bkey_cached, rcu);
 
-	list_for_each_entry_reverse(pos, &bc->freed_nonpcpu, list) {
-		if (ULONG_CMP_GE(ck->btree_trans_barrier_seq,
-				 pos->btree_trans_barrier_seq)) {
-			list_move(&ck->list, &pos->list);
-			return;
-		}
-	}
-
-	list_move(&ck->list, &bc->freed_nonpcpu);
+	kmem_cache_free(bch2_key_cache, ck);
 }
-#endif
-
-static void bkey_cached_move_to_freelist(struct btree_key_cache *bc,
-					 struct bkey_cached *ck)
-{
-	BUG_ON(test_bit(BKEY_CACHED_DIRTY, &ck->flags));
-
-	if (!ck->c.lock.readers) {
-#ifdef __KERNEL__
-		struct btree_key_cache_freelist *f;
-		bool freed = false;
-
-		preempt_disable();
-		f = this_cpu_ptr(bc->pcpu_freed);
 
-		if (f->nr < ARRAY_SIZE(f->objs)) {
-			f->objs[f->nr++] = ck;
-			freed = true;
-		}
-		preempt_enable();
-
-		if (!freed) {
-			mutex_lock(&bc->lock);
-			preempt_disable();
-			f = this_cpu_ptr(bc->pcpu_freed);
-
-			while (f->nr > ARRAY_SIZE(f->objs) / 2) {
-				struct bkey_cached *ck2 = f->objs[--f->nr];
-
-				__bkey_cached_move_to_freelist_ordered(bc, ck2);
-			}
-			preempt_enable();
-
-			__bkey_cached_move_to_freelist_ordered(bc, ck);
-			mutex_unlock(&bc->lock);
-		}
-#else
-		mutex_lock(&bc->lock);
-		list_move_tail(&ck->list, &bc->freed_nonpcpu);
-		bc->nr_freed_nonpcpu++;
-		mutex_unlock(&bc->lock);
-#endif
-	} else {
-		mutex_lock(&bc->lock);
-		list_move_tail(&ck->list, &bc->freed_pcpu);
-		bc->nr_freed_pcpu++;
-		mutex_unlock(&bc->lock);
-	}
-}
-
-static void bkey_cached_free_fast(struct btree_key_cache *bc,
-				  struct bkey_cached *ck)
+static void bkey_cached_free(struct btree_key_cache *bc,
+			     struct bkey_cached *ck)
 {
 	struct bch_fs *c = container_of(bc, struct bch_fs, btree_key_cache);
 
-	ck->btree_trans_barrier_seq =
-		start_poll_synchronize_srcu(&c->btree_trans_barrier);
-
-	list_del_init(&ck->list);
-	atomic_long_inc(&bc->nr_freed);
-
 	kfree(ck->k);
 	ck->k		= NULL;
 	ck->u64s	= 0;
 
-	bkey_cached_move_to_freelist(bc, ck);
-
 	six_unlock_write(&ck->c.lock);
 	six_unlock_intent(&ck->c.lock);
+
+	call_srcu(&c->btree_trans_barrier, &ck->rcu, __bkey_cached_free);
 }
 
 static struct bkey_cached *__bkey_cached_alloc(unsigned key_u64s, gfp_t gfp)
@@ -223,78 +132,10 @@ static struct bkey_cached *__bkey_cached_alloc(unsigned key_u64s, gfp_t gfp)
 static struct bkey_cached *
 bkey_cached_alloc(struct btree_trans *trans, struct btree_path *path, unsigned key_u64s)
 {
-	struct bch_fs *c = trans->c;
-	struct btree_key_cache *bc = &c->btree_key_cache;
-	struct bkey_cached *ck = NULL;
 	bool pcpu_readers = btree_uses_pcpu_readers(path->btree_id);
 	int ret;
 
-	if (!pcpu_readers) {
-#ifdef __KERNEL__
-		struct btree_key_cache_freelist *f;
-
-		preempt_disable();
-		f = this_cpu_ptr(bc->pcpu_freed);
-		if (f->nr)
-			ck = f->objs[--f->nr];
-		preempt_enable();
-
-		if (!ck) {
-			mutex_lock(&bc->lock);
-			preempt_disable();
-			f = this_cpu_ptr(bc->pcpu_freed);
-
-			while (!list_empty(&bc->freed_nonpcpu) &&
-			       f->nr < ARRAY_SIZE(f->objs) / 2) {
-				ck = list_last_entry(&bc->freed_nonpcpu, struct bkey_cached, list);
-				list_del_init(&ck->list);
-				bc->nr_freed_nonpcpu--;
-				f->objs[f->nr++] = ck;
-			}
-
-			ck = f->nr ? f->objs[--f->nr] : NULL;
-			preempt_enable();
-			mutex_unlock(&bc->lock);
-		}
-#else
-		mutex_lock(&bc->lock);
-		if (!list_empty(&bc->freed_nonpcpu)) {
-			ck = list_last_entry(&bc->freed_nonpcpu, struct bkey_cached, list);
-			list_del_init(&ck->list);
-			bc->nr_freed_nonpcpu--;
-		}
-		mutex_unlock(&bc->lock);
-#endif
-	} else {
-		mutex_lock(&bc->lock);
-		if (!list_empty(&bc->freed_pcpu)) {
-			ck = list_last_entry(&bc->freed_pcpu, struct bkey_cached, list);
-			list_del_init(&ck->list);
-			bc->nr_freed_pcpu--;
-		}
-		mutex_unlock(&bc->lock);
-	}
-
-	if (ck) {
-		ret = btree_node_lock_nopath(trans, &ck->c, SIX_LOCK_intent, _THIS_IP_);
-		if (unlikely(ret)) {
-			bkey_cached_move_to_freelist(bc, ck);
-			return ERR_PTR(ret);
-		}
-
-		btree_path_cached_set(trans, path, ck, BTREE_NODE_INTENT_LOCKED);
-
-		ret = bch2_btree_node_lock_write(trans, path, &ck->c);
-		if (unlikely(ret)) {
-			btree_node_unlock(trans, path, 0);
-			bkey_cached_move_to_freelist(bc, ck);
-			return ERR_PTR(ret);
-		}
-
-		return ck;
-	}
-
-	ck = allocate_dropping_locks(trans, ret,
+	struct bkey_cached *ck = allocate_dropping_locks(trans, ret,
 				     __bkey_cached_alloc(key_u64s, _gfp));
 	if (ret) {
 		if (ck)
@@ -306,7 +147,6 @@ bkey_cached_alloc(struct btree_trans *trans, struct btree_path *path, unsigned k
 	if (!ck)
 		return NULL;
 
-	INIT_LIST_HEAD(&ck->list);
 	bch2_btree_lock_init(&ck->c, pcpu_readers ? SIX_LOCK_INIT_PCPU : 0);
 
 	ck->c.cached = true;
@@ -323,21 +163,21 @@ bkey_cached_reuse(struct btree_key_cache *c)
 	struct bkey_cached *ck;
 	unsigned i;
 
-	mutex_lock(&c->lock);
 	rcu_read_lock();
 	tbl = rht_dereference_rcu(c->table.tbl, &c->table);
 	for (i = 0; i < tbl->size; i++)
 		rht_for_each_entry_rcu(ck, pos, tbl, i, hash) {
 			if (!test_bit(BKEY_CACHED_DIRTY, &ck->flags) &&
 			    bkey_cached_lock_for_evict(ck)) {
-				bkey_cached_evict(c, ck);
-				goto out;
+				if (bkey_cached_evict(c, ck))
+					goto out;
+				six_unlock_write(&ck->c.lock);
+				six_unlock_intent(&ck->c.lock);
 			}
 		}
 	ck = NULL;
 out:
 	rcu_read_unlock();
-	mutex_unlock(&c->lock);
 	return ck;
 }
 
@@ -416,7 +256,7 @@ static int btree_key_cache_create(struct btree_trans *trans, struct btree_path *
 	path->uptodate = BTREE_ITER_UPTODATE;
 	return 0;
 err:
-	bkey_cached_free_fast(bc, ck);
+	bkey_cached_free(bc, ck);
 	mark_btree_node_locked_noreset(path, 0, BTREE_NODE_UNLOCKED);
 
 	return ret;
@@ -612,8 +452,12 @@ static int btree_key_cache_flush_pos(struct btree_trans *trans,
 		}
 
 		mark_btree_node_locked_noreset(path, 0, BTREE_NODE_UNLOCKED);
-		bkey_cached_evict(&c->btree_key_cache, ck);
-		bkey_cached_free_fast(&c->btree_key_cache, ck);
+		if (bkey_cached_evict(&c->btree_key_cache, ck)) {
+			bkey_cached_free(&c->btree_key_cache, ck);
+		} else {
+			six_unlock_write(&ck->c.lock);
+			six_unlock_intent(&ck->c.lock);
+		}
 	}
 out:
 	bch2_trans_iter_exit(trans, &b_iter);
@@ -723,7 +567,7 @@ void bch2_btree_key_cache_drop(struct btree_trans *trans,
 	}
 
 	bkey_cached_evict(bc, ck);
-	bkey_cached_free_fast(bc, ck);
+	bkey_cached_free(bc, ck);
 
 	mark_btree_node_locked(trans, path, 0, BTREE_NODE_UNLOCKED);
 	btree_path_set_dirty(path, BTREE_ITER_NEED_TRAVERSE);
@@ -735,60 +579,27 @@ static unsigned long bch2_btree_key_cache_scan(struct shrinker *shrink,
 	struct bch_fs *c = shrink->private_data;
 	struct btree_key_cache *bc = &c->btree_key_cache;
 	struct bucket_table *tbl;
-	struct bkey_cached *ck, *t;
+	struct bkey_cached *ck;
 	size_t scanned = 0, freed = 0, nr = sc->nr_to_scan;
-	unsigned start, flags;
+	unsigned iter, start;
 	int srcu_idx;
 
-	mutex_lock(&bc->lock);
-	bc->requested_to_free += sc->nr_to_scan;
-
 	srcu_idx = srcu_read_lock(&c->btree_trans_barrier);
-	flags = memalloc_nofs_save();
-
-	/*
-	 * Newest freed entries are at the end of the list - once we hit one
-	 * that's too new to be freed, we can bail out:
-	 */
-	list_for_each_entry_safe(ck, t, &bc->freed_nonpcpu, list) {
-		if (!poll_state_synchronize_srcu(&c->btree_trans_barrier,
-						 ck->btree_trans_barrier_seq))
-			break;
-
-		list_del(&ck->list);
-		six_lock_exit(&ck->c.lock);
-		kmem_cache_free(bch2_key_cache, ck);
-		atomic_long_dec(&bc->nr_freed);
-		bc->nr_freed_nonpcpu--;
-		bc->freed++;
-	}
-
-	list_for_each_entry_safe(ck, t, &bc->freed_pcpu, list) {
-		if (!poll_state_synchronize_srcu(&c->btree_trans_barrier,
-						 ck->btree_trans_barrier_seq))
-			break;
-
-		list_del(&ck->list);
-		six_lock_exit(&ck->c.lock);
-		kmem_cache_free(bch2_key_cache, ck);
-		atomic_long_dec(&bc->nr_freed);
-		bc->nr_freed_pcpu--;
-		bc->freed++;
-	}
-
 	rcu_read_lock();
+
 	tbl = rht_dereference_rcu(bc->table.tbl, &bc->table);
-	if (bc->shrink_iter >= tbl->size)
-		bc->shrink_iter = 0;
-	start = bc->shrink_iter;
+	iter = bc->shrink_iter;
+	if (iter >= tbl->size)
+		iter = 0;
+	start = iter;
 
 	do {
 		struct rhash_head *pos, *next;
 
-		pos = rht_ptr_rcu(rht_bucket(tbl, bc->shrink_iter));
+		pos = rht_ptr_rcu(rht_bucket(tbl, iter));
 
 		while (!rht_is_a_nulls(pos)) {
-			next = rht_dereference_bucket_rcu(pos->next, tbl, bc->shrink_iter);
+			next = rht_dereference_bucket_rcu(pos->next, tbl, iter);
 			ck = container_of(pos, struct bkey_cached, hash);
 
 			if (test_bit(BKEY_CACHED_DIRTY, &ck->flags)) {
@@ -798,29 +609,31 @@ static unsigned long bch2_btree_key_cache_scan(struct shrinker *shrink,
 				bc->skipped_accessed++;
 			} else if (!bkey_cached_lock_for_evict(ck)) {
 				bc->skipped_lock_fail++;
-			} else {
-				bkey_cached_evict(bc, ck);
+			} else if (bkey_cached_evict(bc, ck)) {
 				bkey_cached_free(bc, ck);
-				bc->moved_to_freelist++;
+				bc->freed++;
 				freed++;
+			} else {
+				six_unlock_write(&ck->c.lock);
+				six_unlock_intent(&ck->c.lock);
 			}
 
 			scanned++;
 			if (scanned >= nr)
-				break;
+				goto out;
 
 			pos = next;
 		}
 
-		bc->shrink_iter++;
-		if (bc->shrink_iter >= tbl->size)
-			bc->shrink_iter = 0;
-	} while (scanned < nr && bc->shrink_iter != start);
+		iter++;
+		if (iter >= tbl->size)
+			iter = 0;
+	} while (scanned < nr && iter != start);
+out:
+	bc->shrink_iter = iter;
 
 	rcu_read_unlock();
-	memalloc_nofs_restore(flags);
 	srcu_read_unlock(&c->btree_trans_barrier, srcu_idx);
-	mutex_unlock(&bc->lock);
 
 	return freed;
 }
@@ -848,18 +661,13 @@ void bch2_fs_btree_key_cache_exit(struct btree_key_cache *bc)
 {
 	struct bch_fs *c = container_of(bc, struct bch_fs, btree_key_cache);
 	struct bucket_table *tbl;
-	struct bkey_cached *ck, *n;
+	struct bkey_cached *ck;
 	struct rhash_head *pos;
 	LIST_HEAD(items);
 	unsigned i;
-#ifdef __KERNEL__
-	int cpu;
-#endif
 
 	shrinker_free(bc->shrink);
 
-	mutex_lock(&bc->lock);
-
 	/*
 	 * The loop is needed to guard against racing with rehash:
 	 */
@@ -868,44 +676,15 @@ void bch2_fs_btree_key_cache_exit(struct btree_key_cache *bc)
 		tbl = rht_dereference_rcu(bc->table.tbl, &bc->table);
 		if (tbl)
 			for (i = 0; i < tbl->size; i++)
-				rht_for_each_entry_rcu(ck, pos, tbl, i, hash) {
-					bkey_cached_evict(bc, ck);
-					list_add(&ck->list, &items);
+				while (pos = rht_ptr_rcu(rht_bucket(tbl, i)),
+							 !rht_is_a_nulls(pos)) {
+					ck = container_of(pos, struct bkey_cached, hash);
+					BUG_ON(!bkey_cached_evict(bc, ck));
+					kmem_cache_free(bch2_key_cache, ck);
 				}
 		rcu_read_unlock();
 	}
 
-#ifdef __KERNEL__
-	if (bc->pcpu_freed) {
-		for_each_possible_cpu(cpu) {
-			struct btree_key_cache_freelist *f =
-				per_cpu_ptr(bc->pcpu_freed, cpu);
-
-			for (i = 0; i < f->nr; i++) {
-				ck = f->objs[i];
-				list_add(&ck->list, &items);
-			}
-		}
-	}
-#endif
-
-	BUG_ON(list_count_nodes(&bc->freed_pcpu) != bc->nr_freed_pcpu);
-	BUG_ON(list_count_nodes(&bc->freed_nonpcpu) != bc->nr_freed_nonpcpu);
-
-	list_splice(&bc->freed_pcpu,	&items);
-	list_splice(&bc->freed_nonpcpu,	&items);
-
-	mutex_unlock(&bc->lock);
-
-	list_for_each_entry_safe(ck, n, &items, list) {
-		cond_resched();
-
-		list_del(&ck->list);
-		kfree(ck->k);
-		six_lock_exit(&ck->c.lock);
-		kmem_cache_free(bch2_key_cache, ck);
-	}
-
 	if (atomic_long_read(&bc->nr_dirty) &&
 	    !bch2_journal_error(&c->journal) &&
 	    test_bit(BCH_FS_was_rw, &c->flags))
@@ -918,15 +697,10 @@ void bch2_fs_btree_key_cache_exit(struct btree_key_cache *bc)
 
 	if (bc->table_init_done)
 		rhashtable_destroy(&bc->table);
-
-	free_percpu(bc->pcpu_freed);
 }
 
 void bch2_fs_btree_key_cache_init_early(struct btree_key_cache *c)
 {
-	mutex_init(&c->lock);
-	INIT_LIST_HEAD(&c->freed_pcpu);
-	INIT_LIST_HEAD(&c->freed_nonpcpu);
 }
 
 static void bch2_btree_key_cache_shrinker_to_text(struct seq_buf *s, struct shrinker *shrink)
@@ -946,12 +720,6 @@ int bch2_fs_btree_key_cache_init(struct btree_key_cache *bc)
 	struct bch_fs *c = container_of(bc, struct bch_fs, btree_key_cache);
 	struct shrinker *shrink;
 
-#ifdef __KERNEL__
-	bc->pcpu_freed = alloc_percpu(struct btree_key_cache_freelist);
-	if (!bc->pcpu_freed)
-		return -BCH_ERR_ENOMEM_fs_btree_cache_init;
-#endif
-
 	if (rhashtable_init(&bc->table, &bch2_btree_key_cache_params))
 		return -BCH_ERR_ENOMEM_fs_btree_cache_init;
 
@@ -973,45 +741,19 @@ int bch2_fs_btree_key_cache_init(struct btree_key_cache *bc)
 
 void bch2_btree_key_cache_to_text(struct printbuf *out, struct btree_key_cache *bc)
 {
-	struct bch_fs *c = container_of(bc, struct bch_fs, btree_key_cache);
-
 	printbuf_tabstop_push(out, 24);
 	printbuf_tabstop_push(out, 12);
 
-	unsigned flags = memalloc_nofs_save();
-	mutex_lock(&bc->lock);
 	prt_printf(out, "keys:\t%lu\r\n",		atomic_long_read(&bc->nr_keys));
 	prt_printf(out, "dirty:\t%lu\r\n",		atomic_long_read(&bc->nr_dirty));
-	prt_printf(out, "freelist:\t%lu\r\n",		atomic_long_read(&bc->nr_freed));
-	prt_printf(out, "nonpcpu freelist:\t%zu\r\n",	bc->nr_freed_nonpcpu);
-	prt_printf(out, "pcpu freelist:\t%zu\r\n",	bc->nr_freed_pcpu);
+	prt_printf(out, "table size:\t%u\r\n",		bc->table.tbl->size);
 
 	prt_printf(out, "\nshrinker:\n");
 	prt_printf(out, "requested_to_free:\t%lu\r\n",	bc->requested_to_free);
 	prt_printf(out, "freed:\t%lu\r\n",		bc->freed);
-	prt_printf(out, "moved_to_freelist:\t%lu\r\n",	bc->moved_to_freelist);
 	prt_printf(out, "skipped_dirty:\t%lu\r\n",	bc->skipped_dirty);
 	prt_printf(out, "skipped_accessed:\t%lu\r\n",	bc->skipped_accessed);
 	prt_printf(out, "skipped_lock_fail:\t%lu\r\n",	bc->skipped_lock_fail);
-
-	prt_printf(out, "srcu seq:\t%lu\r\n",		get_state_synchronize_srcu(&c->btree_trans_barrier));
-
-	struct bkey_cached *ck;
-	unsigned iter = 0;
-	list_for_each_entry(ck, &bc->freed_nonpcpu, list) {
-		prt_printf(out, "freed_nonpcpu:\t%lu\r\n", ck->btree_trans_barrier_seq);
-		if (++iter > 10)
-			break;
-	}
-
-	iter = 0;
-	list_for_each_entry(ck, &bc->freed_pcpu, list) {
-		prt_printf(out, "freed_pcpu:\t%lu\r\n", ck->btree_trans_barrier_seq);
-		if (++iter > 10)
-			break;
-	}
-	mutex_unlock(&bc->lock);
-	memalloc_flags_restore(flags);
 }
 
 void bch2_btree_key_cache_exit(void)
diff --git a/fs/bcachefs/btree_key_cache_types.h b/fs/bcachefs/btree_key_cache_types.h
index 237e8bb3ac40..e026c65f54e1 100644
--- a/fs/bcachefs/btree_key_cache_types.h
+++ b/fs/bcachefs/btree_key_cache_types.h
@@ -2,33 +2,19 @@
 #ifndef _BCACHEFS_BTREE_KEY_CACHE_TYPES_H
 #define _BCACHEFS_BTREE_KEY_CACHE_TYPES_H
 
-struct btree_key_cache_freelist {
-	struct bkey_cached	*objs[16];
-	unsigned		nr;
-};
-
 struct btree_key_cache {
-	struct mutex		lock;
 	struct rhashtable	table;
 	bool			table_init_done;
 
-	struct list_head	freed_pcpu;
-	size_t			nr_freed_pcpu;
-	struct list_head	freed_nonpcpu;
-	size_t			nr_freed_nonpcpu;
-
 	struct shrinker		*shrink;
 	unsigned		shrink_iter;
-	struct btree_key_cache_freelist __percpu *pcpu_freed;
 
-	atomic_long_t		nr_freed;
 	atomic_long_t		nr_keys;
 	atomic_long_t		nr_dirty;
 
 	/* shrinker stats */
 	unsigned long		requested_to_free;
 	unsigned long		freed;
-	unsigned long		moved_to_freelist;
 	unsigned long		skipped_dirty;
 	unsigned long		skipped_accessed;
 	unsigned long		skipped_lock_fail;
diff --git a/fs/bcachefs/btree_types.h b/fs/bcachefs/btree_types.h
index 5f760906e366..e8190f4e45ac 100644
--- a/fs/bcachefs/btree_types.h
+++ b/fs/bcachefs/btree_types.h
@@ -386,17 +386,17 @@ struct bkey_cached {
 	struct btree_bkey_cached_common c;
 
 	unsigned long		flags;
-	unsigned long		btree_trans_barrier_seq;
 	u16			u64s;
 	struct bkey_cached_key	key;
 
 	struct rhash_head	hash;
-	struct list_head	list;
 
 	struct journal_entry_pin journal;
 	u64			seq;
 
 	struct bkey_i		*k;
+
+	struct rcu_head		rcu;
 };
 
 static inline struct bpos btree_node_pos(struct btree_bkey_cached_common *b)
-- 
2.45.2


^ permalink raw reply related	[flat|nested] 24+ messages in thread

* [PATCH 8/9] bcachefs: key cache can now allocate from pending
  2024-08-19 16:59 [PATCH 0/9] rcu_pending Kent Overstreet
                   ` (6 preceding siblings ...)
  2024-08-19 16:59 ` [PATCH 7/9] bcachefs: Rip out freelists from btree key cache Kent Overstreet
@ 2024-08-19 16:59 ` Kent Overstreet
  2024-08-19 16:59 ` [PATCH 9/9] rcu: Switch kvfree_rcu() to new rcu_pending Kent Overstreet
  2024-08-19 23:07 ` [PATCH 0/9] rcu_pending Paul E. McKenney
  9 siblings, 0 replies; 24+ messages in thread
From: Kent Overstreet @ 2024-08-19 16:59 UTC (permalink / raw)
  To: rcu; +Cc: Kent Overstreet, paulmck, linux-kernel

btree_trans objects can hold the btree_trans_barrier srcu read lock for
an extended amount of time (they shouldn't, but it's difficult to
guarantee).

The SRCU barrier blocks memory reclaim, so to avoid too many stranded
key cache items, this uses the new rcu_pending structure to allocate from
pending items - like we did before, but now without a global lock on the
key cache.
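
For readers who haven't looked at patch 6/9 yet, a condensed sketch of the
pattern this patch adopts - all names are taken from the diff below, with
error handling and locking omitted:

	/* one rcu_pending per lock class, driven by the btree_trans_barrier SRCU: */
	rcu_pending_init(&bc->pending[0], &c->btree_trans_barrier, __bkey_cached_free);
	rcu_pending_init(&bc->pending[1], &c->btree_trans_barrier, __bkey_cached_free);

	/* free path: enqueue behind the current grace period, no global lock: */
	rcu_pending_enqueue(&bc->pending[pcpu_readers], &ck->rcu);

	/* alloc path: first try to reuse an object whose grace period has expired: */
	ck = container_of_or_null(rcu_pending_dequeue(&bc->pending[pcpu_readers]),
				  struct bkey_cached, rcu);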

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
---
 fs/bcachefs/btree_key_cache.c       | 58 ++++++++++++++++++++++-------
 fs/bcachefs/btree_key_cache_types.h |  6 +++
 fs/bcachefs/btree_types.h           |  1 -
 3 files changed, 50 insertions(+), 15 deletions(-)

diff --git a/fs/bcachefs/btree_key_cache.c b/fs/bcachefs/btree_key_cache.c
index b304a18eaea4..d69b513e57f1 100644
--- a/fs/bcachefs/btree_key_cache.c
+++ b/fs/bcachefs/btree_key_cache.c
@@ -93,18 +93,18 @@ static bool bkey_cached_evict(struct btree_key_cache *c,
 	return ret;
 }
 
-static void __bkey_cached_free(struct rcu_head *rcu)
+static void __bkey_cached_free(struct rcu_pending *pending, struct rcu_head *rcu)
 {
+	struct bch_fs *c = container_of(pending->srcu, struct bch_fs, btree_trans_barrier);
 	struct bkey_cached *ck = container_of(rcu, struct bkey_cached, rcu);
 
+	this_cpu_dec(*c->btree_key_cache.nr_pending);
 	kmem_cache_free(bch2_key_cache, ck);
 }
 
 static void bkey_cached_free(struct btree_key_cache *bc,
 			     struct bkey_cached *ck)
 {
-	struct bch_fs *c = container_of(bc, struct bch_fs, btree_key_cache);
-
 	kfree(ck->k);
 	ck->k		= NULL;
 	ck->u64s	= 0;
@@ -112,7 +112,9 @@ static void bkey_cached_free(struct btree_key_cache *bc,
 	six_unlock_write(&ck->c.lock);
 	six_unlock_intent(&ck->c.lock);
 
-	call_srcu(&c->btree_trans_barrier, &ck->rcu, __bkey_cached_free);
+	bool pcpu_readers = ck->c.lock.readers != NULL;
+	rcu_pending_enqueue(&bc->pending[pcpu_readers], &ck->rcu);
+	this_cpu_inc(*bc->nr_pending);
 }
 
 static struct bkey_cached *__bkey_cached_alloc(unsigned key_u64s, gfp_t gfp)
@@ -132,10 +134,18 @@ static struct bkey_cached *__bkey_cached_alloc(unsigned key_u64s, gfp_t gfp)
 static struct bkey_cached *
 bkey_cached_alloc(struct btree_trans *trans, struct btree_path *path, unsigned key_u64s)
 {
+	struct bch_fs *c = trans->c;
+	struct btree_key_cache *bc = &c->btree_key_cache;
 	bool pcpu_readers = btree_uses_pcpu_readers(path->btree_id);
 	int ret;
 
-	struct bkey_cached *ck = allocate_dropping_locks(trans, ret,
+	struct bkey_cached *ck = container_of_or_null(
+				rcu_pending_dequeue(&bc->pending[pcpu_readers]),
+				struct bkey_cached, rcu);
+	if (ck)
+		goto lock;
+
+	ck = allocate_dropping_locks(trans, ret,
 				     __bkey_cached_alloc(key_u64s, _gfp));
 	if (ret) {
 		if (ck)
@@ -144,14 +154,19 @@ bkey_cached_alloc(struct btree_trans *trans, struct btree_path *path, unsigned k
 		return ERR_PTR(ret);
 	}
 
-	if (!ck)
-		return NULL;
-
-	bch2_btree_lock_init(&ck->c, pcpu_readers ? SIX_LOCK_INIT_PCPU : 0);
+	if (ck) {
+		bch2_btree_lock_init(&ck->c, pcpu_readers ? SIX_LOCK_INIT_PCPU : 0);
+		ck->c.cached = true;
+		goto lock;
+	}
 
-	ck->c.cached = true;
-	BUG_ON(!six_trylock_intent(&ck->c.lock));
-	BUG_ON(!six_trylock_write(&ck->c.lock));
+	ck = container_of_or_null(rcu_pending_dequeue_from_all(&bc->pending[pcpu_readers]),
+				  struct bkey_cached, rcu);
+	if (!ck)
+		return NULL;
+lock:
+	six_lock_intent(&ck->c.lock, NULL, NULL);
+	six_lock_write(&ck->c.lock, NULL, NULL);
 	return ck;
 }
 
@@ -697,6 +712,11 @@ void bch2_fs_btree_key_cache_exit(struct btree_key_cache *bc)
 
 	if (bc->table_init_done)
 		rhashtable_destroy(&bc->table);
+
+	free_percpu(bc->nr_pending);
+
+	rcu_pending_exit(&bc->pending[0]);
+	rcu_pending_exit(&bc->pending[1]);
 }
 
 void bch2_fs_btree_key_cache_init_early(struct btree_key_cache *c)
@@ -720,6 +740,14 @@ int bch2_fs_btree_key_cache_init(struct btree_key_cache *bc)
 	struct bch_fs *c = container_of(bc, struct bch_fs, btree_key_cache);
 	struct shrinker *shrink;
 
+	if (rcu_pending_init(&bc->pending[0], &c->btree_trans_barrier, __bkey_cached_free) ||
+	    rcu_pending_init(&bc->pending[1], &c->btree_trans_barrier, __bkey_cached_free))
+		return -BCH_ERR_ENOMEM_fs_btree_cache_init;
+
+	bc->nr_pending = alloc_percpu(size_t);
+	if (!bc->nr_pending)
+		return -BCH_ERR_ENOMEM_fs_btree_cache_init;
+
 	if (rhashtable_init(&bc->table, &bch2_btree_key_cache_params))
 		return -BCH_ERR_ENOMEM_fs_btree_cache_init;
 
@@ -747,13 +775,15 @@ void bch2_btree_key_cache_to_text(struct printbuf *out, struct btree_key_cache *
 	prt_printf(out, "keys:\t%lu\r\n",		atomic_long_read(&bc->nr_keys));
 	prt_printf(out, "dirty:\t%lu\r\n",		atomic_long_read(&bc->nr_dirty));
 	prt_printf(out, "table size:\t%u\r\n",		bc->table.tbl->size);
-
-	prt_printf(out, "\nshrinker:\n");
+	prt_newline(out);
+	prt_printf(out, "shrinker:\n");
 	prt_printf(out, "requested_to_free:\t%lu\r\n",	bc->requested_to_free);
 	prt_printf(out, "freed:\t%lu\r\n",		bc->freed);
 	prt_printf(out, "skipped_dirty:\t%lu\r\n",	bc->skipped_dirty);
 	prt_printf(out, "skipped_accessed:\t%lu\r\n",	bc->skipped_accessed);
 	prt_printf(out, "skipped_lock_fail:\t%lu\r\n",	bc->skipped_lock_fail);
+	prt_newline(out);
+	prt_printf(out, "pending:\t%lu\r\n",		per_cpu_sum(bc->nr_pending));
 }
 
 void bch2_btree_key_cache_exit(void)
diff --git a/fs/bcachefs/btree_key_cache_types.h b/fs/bcachefs/btree_key_cache_types.h
index e026c65f54e1..739f97022d81 100644
--- a/fs/bcachefs/btree_key_cache_types.h
+++ b/fs/bcachefs/btree_key_cache_types.h
@@ -2,6 +2,8 @@
 #ifndef _BCACHEFS_BTREE_KEY_CACHE_TYPES_H
 #define _BCACHEFS_BTREE_KEY_CACHE_TYPES_H
 
+#include <linux/rcu_pending.h>
+
 struct btree_key_cache {
 	struct rhashtable	table;
 	bool			table_init_done;
@@ -9,6 +11,10 @@ struct btree_key_cache {
 	struct shrinker		*shrink;
 	unsigned		shrink_iter;
 
+	/* 0: non pcpu reader locks, 1: pcpu reader locks */
+	struct rcu_pending	pending[2];
+	size_t __percpu		*nr_pending;
+
 	atomic_long_t		nr_keys;
 	atomic_long_t		nr_dirty;
 
diff --git a/fs/bcachefs/btree_types.h b/fs/bcachefs/btree_types.h
index e8190f4e45ac..ee4bcac44beb 100644
--- a/fs/bcachefs/btree_types.h
+++ b/fs/bcachefs/btree_types.h
@@ -395,7 +395,6 @@ struct bkey_cached {
 	u64			seq;
 
 	struct bkey_i		*k;
-
 	struct rcu_head		rcu;
 };
 
-- 
2.45.2


^ permalink raw reply related	[flat|nested] 24+ messages in thread

* [PATCH 9/9] rcu: Switch kvfree_rcu() to new rcu_pending
  2024-08-19 16:59 [PATCH 0/9] rcu_pending Kent Overstreet
                   ` (7 preceding siblings ...)
  2024-08-19 16:59 ` [PATCH 8/9] bcachefs: key cache can now allocate from pending Kent Overstreet
@ 2024-08-19 16:59 ` Kent Overstreet
  2024-08-19 22:18   ` Paul E. McKenney
  2024-08-19 23:07 ` [PATCH 0/9] rcu_pending Paul E. McKenney
  9 siblings, 1 reply; 24+ messages in thread
From: Kent Overstreet @ 2024-08-19 16:59 UTC (permalink / raw)
  To: rcu; +Cc: Kent Overstreet, paulmck, linux-kernel

This nets us a slight performance increase, and converts to common
code.

Todo - re-add the shrinker, so that memory reclaim can free expired
objects and expedite a grace period when necessary.
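
For context (not part of this patch): the two caller-side forms that reach
kvfree_call_rcu() below. The struct and helpers here are purely illustrative;
only the kvfree_rcu()/kvfree_rcu_mightsleep() macros are existing API:

	struct foo {
		struct rcu_head	rcu;
		int		payload;
	};

	static void free_foo(struct foo *f)
	{
		/* two-argument form: passes &f->rcu as @head, never sleeps */
		kvfree_rcu(f, rcu);
	}

	static void free_blob(void *p)
	{
		/* single-argument form: @head is NULL, so the enqueue below runs
		 * with the "head == NULL" flag set; sleepable context only */
		kvfree_rcu_mightsleep(p);
	}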

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
---
 include/linux/rcu_pending.h |   2 +
 init/main.c                 |   2 +
 kernel/rcu/pending.c        |  20 +
 kernel/rcu/tree.c           | 746 ------------------------------------
 kernel/rcu/update.c         |   1 -
 5 files changed, 24 insertions(+), 747 deletions(-)

diff --git a/include/linux/rcu_pending.h b/include/linux/rcu_pending.h
index a875c640da8d..5ef6392ce180 100644
--- a/include/linux/rcu_pending.h
+++ b/include/linux/rcu_pending.h
@@ -22,4 +22,6 @@ int rcu_pending_init(struct rcu_pending *pending,
 		     struct srcu_struct *srcu,
 		     rcu_pending_process_fn process);
 
+void __init kvfree_rcu_pending_init(void);
+
 #endif /* _LINUX_RCU_PENDING_H */
diff --git a/init/main.c b/init/main.c
index 206acdde51f5..a3f0fd6ec3da 100644
--- a/init/main.c
+++ b/init/main.c
@@ -43,6 +43,7 @@
 #include <linux/profile.h>
 #include <linux/kfence.h>
 #include <linux/rcupdate.h>
+#include <linux/rcu_pending.h>
 #include <linux/srcu.h>
 #include <linux/moduleparam.h>
 #include <linux/kallsyms.h>
@@ -993,6 +994,7 @@ void start_kernel(void)
 	workqueue_init_early();
 
 	rcu_init();
+	kvfree_rcu_pending_init();
 
 	/* Trace events are available after this */
 	trace_init();
diff --git a/kernel/rcu/pending.c b/kernel/rcu/pending.c
index c0e2351ba198..9c57f373d494 100644
--- a/kernel/rcu/pending.c
+++ b/kernel/rcu/pending.c
@@ -601,3 +601,23 @@ int rcu_pending_init(struct rcu_pending *pending,
 
 	return 0;
 }
+
+#ifndef CONFIG_TINY_RCU
+/* kvfree_rcu */
+
+static struct rcu_pending kvfree_rcu_pending;
+
+void kvfree_call_rcu(struct rcu_head *head, void *ptr)
+{
+	BUG_ON(!ptr);
+
+	__rcu_pending_enqueue(&kvfree_rcu_pending, head, ptr, head == NULL);
+}
+EXPORT_SYMBOL_GPL(kvfree_call_rcu);
+
+void __init kvfree_rcu_pending_init(void)
+{
+	if (rcu_pending_init(&kvfree_rcu_pending, NULL, RCU_PENDING_KVFREE_FN))
+		panic("%s failed\n", __func__);
+}
+#endif
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 52f9f0bf1b8e..3bb6c81e2d30 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3216,702 +3216,6 @@ EXPORT_SYMBOL_GPL(call_rcu);
 #define KFREE_N_BATCHES 2
 #define FREE_N_CHANNELS 2
 
-/**
- * struct kvfree_rcu_bulk_data - single block to store kvfree_rcu() pointers
- * @list: List node. All blocks are linked between each other
- * @gp_snap: Snapshot of RCU state for objects placed to this bulk
- * @nr_records: Number of active pointers in the array
- * @records: Array of the kvfree_rcu() pointers
- */
-struct kvfree_rcu_bulk_data {
-	struct list_head list;
-	struct rcu_gp_oldstate gp_snap;
-	unsigned long nr_records;
-	void *records[];
-};
-
-/*
- * This macro defines how many entries the "records" array
- * will contain. It is based on the fact that the size of
- * kvfree_rcu_bulk_data structure becomes exactly one page.
- */
-#define KVFREE_BULK_MAX_ENTR \
-	((PAGE_SIZE - sizeof(struct kvfree_rcu_bulk_data)) / sizeof(void *))
-
-/**
- * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
- * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
- * @head_free: List of kfree_rcu() objects waiting for a grace period
- * @head_free_gp_snap: Grace-period snapshot to check for attempted premature frees.
- * @bulk_head_free: Bulk-List of kvfree_rcu() objects waiting for a grace period
- * @krcp: Pointer to @kfree_rcu_cpu structure
- */
-
-struct kfree_rcu_cpu_work {
-	struct rcu_work rcu_work;
-	struct rcu_head *head_free;
-	struct rcu_gp_oldstate head_free_gp_snap;
-	struct list_head bulk_head_free[FREE_N_CHANNELS];
-	struct kfree_rcu_cpu *krcp;
-};
-
-/**
- * struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period
- * @head: List of kfree_rcu() objects not yet waiting for a grace period
- * @head_gp_snap: Snapshot of RCU state for objects placed to "@head"
- * @bulk_head: Bulk-List of kvfree_rcu() objects not yet waiting for a grace period
- * @krw_arr: Array of batches of kfree_rcu() objects waiting for a grace period
- * @lock: Synchronize access to this structure
- * @monitor_work: Promote @head to @head_free after KFREE_DRAIN_JIFFIES
- * @initialized: The @rcu_work fields have been initialized
- * @head_count: Number of objects in rcu_head singular list
- * @bulk_count: Number of objects in bulk-list
- * @bkvcache:
- *	A simple cache list that contains objects for reuse purpose.
- *	In order to save some per-cpu space the list is singular.
- *	Even though it is lockless an access has to be protected by the
- *	per-cpu lock.
- * @page_cache_work: A work to refill the cache when it is empty
- * @backoff_page_cache_fill: Delay cache refills
- * @work_in_progress: Indicates that page_cache_work is running
- * @hrtimer: A hrtimer for scheduling a page_cache_work
- * @nr_bkv_objs: number of allocated objects at @bkvcache.
- *
- * This is a per-CPU structure.  The reason that it is not included in
- * the rcu_data structure is to permit this code to be extracted from
- * the RCU files.  Such extraction could allow further optimization of
- * the interactions with the slab allocators.
- */
-struct kfree_rcu_cpu {
-	// Objects queued on a linked list
-	// through their rcu_head structures.
-	struct rcu_head *head;
-	unsigned long head_gp_snap;
-	atomic_t head_count;
-
-	// Objects queued on a bulk-list.
-	struct list_head bulk_head[FREE_N_CHANNELS];
-	atomic_t bulk_count[FREE_N_CHANNELS];
-
-	struct kfree_rcu_cpu_work krw_arr[KFREE_N_BATCHES];
-	raw_spinlock_t lock;
-	struct delayed_work monitor_work;
-	bool initialized;
-
-	struct delayed_work page_cache_work;
-	atomic_t backoff_page_cache_fill;
-	atomic_t work_in_progress;
-	struct hrtimer hrtimer;
-
-	struct llist_head bkvcache;
-	int nr_bkv_objs;
-};
-
-static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc) = {
-	.lock = __RAW_SPIN_LOCK_UNLOCKED(krc.lock),
-};
-
-static __always_inline void
-debug_rcu_bhead_unqueue(struct kvfree_rcu_bulk_data *bhead)
-{
-#ifdef CONFIG_DEBUG_OBJECTS_RCU_HEAD
-	int i;
-
-	for (i = 0; i < bhead->nr_records; i++)
-		debug_rcu_head_unqueue((struct rcu_head *)(bhead->records[i]));
-#endif
-}
-
-static inline struct kfree_rcu_cpu *
-krc_this_cpu_lock(unsigned long *flags)
-{
-	struct kfree_rcu_cpu *krcp;
-
-	local_irq_save(*flags);	// For safely calling this_cpu_ptr().
-	krcp = this_cpu_ptr(&krc);
-	raw_spin_lock(&krcp->lock);
-
-	return krcp;
-}
-
-static inline void
-krc_this_cpu_unlock(struct kfree_rcu_cpu *krcp, unsigned long flags)
-{
-	raw_spin_unlock_irqrestore(&krcp->lock, flags);
-}
-
-static inline struct kvfree_rcu_bulk_data *
-get_cached_bnode(struct kfree_rcu_cpu *krcp)
-{
-	if (!krcp->nr_bkv_objs)
-		return NULL;
-
-	WRITE_ONCE(krcp->nr_bkv_objs, krcp->nr_bkv_objs - 1);
-	return (struct kvfree_rcu_bulk_data *)
-		llist_del_first(&krcp->bkvcache);
-}
-
-static inline bool
-put_cached_bnode(struct kfree_rcu_cpu *krcp,
-	struct kvfree_rcu_bulk_data *bnode)
-{
-	// Check the limit.
-	if (krcp->nr_bkv_objs >= rcu_min_cached_objs)
-		return false;
-
-	llist_add((struct llist_node *) bnode, &krcp->bkvcache);
-	WRITE_ONCE(krcp->nr_bkv_objs, krcp->nr_bkv_objs + 1);
-	return true;
-}
-
-static int
-drain_page_cache(struct kfree_rcu_cpu *krcp)
-{
-	unsigned long flags;
-	struct llist_node *page_list, *pos, *n;
-	int freed = 0;
-
-	if (!rcu_min_cached_objs)
-		return 0;
-
-	raw_spin_lock_irqsave(&krcp->lock, flags);
-	page_list = llist_del_all(&krcp->bkvcache);
-	WRITE_ONCE(krcp->nr_bkv_objs, 0);
-	raw_spin_unlock_irqrestore(&krcp->lock, flags);
-
-	llist_for_each_safe(pos, n, page_list) {
-		free_page((unsigned long)pos);
-		freed++;
-	}
-
-	return freed;
-}
-
-static void
-kvfree_rcu_bulk(struct kfree_rcu_cpu *krcp,
-	struct kvfree_rcu_bulk_data *bnode, int idx)
-{
-	unsigned long flags;
-	int i;
-
-	if (!WARN_ON_ONCE(!poll_state_synchronize_rcu_full(&bnode->gp_snap))) {
-		debug_rcu_bhead_unqueue(bnode);
-		rcu_lock_acquire(&rcu_callback_map);
-		if (idx == 0) { // kmalloc() / kfree().
-			trace_rcu_invoke_kfree_bulk_callback(
-				rcu_state.name, bnode->nr_records,
-				bnode->records);
-
-			kfree_bulk(bnode->nr_records, bnode->records);
-		} else { // vmalloc() / vfree().
-			for (i = 0; i < bnode->nr_records; i++) {
-				trace_rcu_invoke_kvfree_callback(
-					rcu_state.name, bnode->records[i], 0);
-
-				vfree(bnode->records[i]);
-			}
-		}
-		rcu_lock_release(&rcu_callback_map);
-	}
-
-	raw_spin_lock_irqsave(&krcp->lock, flags);
-	if (put_cached_bnode(krcp, bnode))
-		bnode = NULL;
-	raw_spin_unlock_irqrestore(&krcp->lock, flags);
-
-	if (bnode)
-		free_page((unsigned long) bnode);
-
-	cond_resched_tasks_rcu_qs();
-}
-
-static void
-kvfree_rcu_list(struct rcu_head *head)
-{
-	struct rcu_head *next;
-
-	for (; head; head = next) {
-		void *ptr = (void *) head->func;
-		unsigned long offset = (void *) head - ptr;
-
-		next = head->next;
-		debug_rcu_head_unqueue((struct rcu_head *)ptr);
-		rcu_lock_acquire(&rcu_callback_map);
-		trace_rcu_invoke_kvfree_callback(rcu_state.name, head, offset);
-
-		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset)))
-			kvfree(ptr);
-
-		rcu_lock_release(&rcu_callback_map);
-		cond_resched_tasks_rcu_qs();
-	}
-}
-
-/*
- * This function is invoked in workqueue context after a grace period.
- * It frees all the objects queued on ->bulk_head_free or ->head_free.
- */
-static void kfree_rcu_work(struct work_struct *work)
-{
-	unsigned long flags;
-	struct kvfree_rcu_bulk_data *bnode, *n;
-	struct list_head bulk_head[FREE_N_CHANNELS];
-	struct rcu_head *head;
-	struct kfree_rcu_cpu *krcp;
-	struct kfree_rcu_cpu_work *krwp;
-	struct rcu_gp_oldstate head_gp_snap;
-	int i;
-
-	krwp = container_of(to_rcu_work(work),
-		struct kfree_rcu_cpu_work, rcu_work);
-	krcp = krwp->krcp;
-
-	raw_spin_lock_irqsave(&krcp->lock, flags);
-	// Channels 1 and 2.
-	for (i = 0; i < FREE_N_CHANNELS; i++)
-		list_replace_init(&krwp->bulk_head_free[i], &bulk_head[i]);
-
-	// Channel 3.
-	head = krwp->head_free;
-	krwp->head_free = NULL;
-	head_gp_snap = krwp->head_free_gp_snap;
-	raw_spin_unlock_irqrestore(&krcp->lock, flags);
-
-	// Handle the first two channels.
-	for (i = 0; i < FREE_N_CHANNELS; i++) {
-		// Start from the tail page, so a GP is likely passed for it.
-		list_for_each_entry_safe(bnode, n, &bulk_head[i], list)
-			kvfree_rcu_bulk(krcp, bnode, i);
-	}
-
-	/*
-	 * This is used when the "bulk" path can not be used for the
-	 * double-argument of kvfree_rcu().  This happens when the
-	 * page-cache is empty, which means that objects are instead
-	 * queued on a linked list through their rcu_head structures.
-	 * This list is named "Channel 3".
-	 */
-	if (head && !WARN_ON_ONCE(!poll_state_synchronize_rcu_full(&head_gp_snap)))
-		kvfree_rcu_list(head);
-}
-
-static bool
-need_offload_krc(struct kfree_rcu_cpu *krcp)
-{
-	int i;
-
-	for (i = 0; i < FREE_N_CHANNELS; i++)
-		if (!list_empty(&krcp->bulk_head[i]))
-			return true;
-
-	return !!READ_ONCE(krcp->head);
-}
-
-static bool
-need_wait_for_krwp_work(struct kfree_rcu_cpu_work *krwp)
-{
-	int i;
-
-	for (i = 0; i < FREE_N_CHANNELS; i++)
-		if (!list_empty(&krwp->bulk_head_free[i]))
-			return true;
-
-	return !!krwp->head_free;
-}
-
-static int krc_count(struct kfree_rcu_cpu *krcp)
-{
-	int sum = atomic_read(&krcp->head_count);
-	int i;
-
-	for (i = 0; i < FREE_N_CHANNELS; i++)
-		sum += atomic_read(&krcp->bulk_count[i]);
-
-	return sum;
-}
-
-static void
-schedule_delayed_monitor_work(struct kfree_rcu_cpu *krcp)
-{
-	long delay, delay_left;
-
-	delay = krc_count(krcp) >= KVFREE_BULK_MAX_ENTR ? 1:KFREE_DRAIN_JIFFIES;
-	if (delayed_work_pending(&krcp->monitor_work)) {
-		delay_left = krcp->monitor_work.timer.expires - jiffies;
-		if (delay < delay_left)
-			mod_delayed_work(system_wq, &krcp->monitor_work, delay);
-		return;
-	}
-	queue_delayed_work(system_wq, &krcp->monitor_work, delay);
-}
-
-static void
-kvfree_rcu_drain_ready(struct kfree_rcu_cpu *krcp)
-{
-	struct list_head bulk_ready[FREE_N_CHANNELS];
-	struct kvfree_rcu_bulk_data *bnode, *n;
-	struct rcu_head *head_ready = NULL;
-	unsigned long flags;
-	int i;
-
-	raw_spin_lock_irqsave(&krcp->lock, flags);
-	for (i = 0; i < FREE_N_CHANNELS; i++) {
-		INIT_LIST_HEAD(&bulk_ready[i]);
-
-		list_for_each_entry_safe_reverse(bnode, n, &krcp->bulk_head[i], list) {
-			if (!poll_state_synchronize_rcu_full(&bnode->gp_snap))
-				break;
-
-			atomic_sub(bnode->nr_records, &krcp->bulk_count[i]);
-			list_move(&bnode->list, &bulk_ready[i]);
-		}
-	}
-
-	if (krcp->head && poll_state_synchronize_rcu(krcp->head_gp_snap)) {
-		head_ready = krcp->head;
-		atomic_set(&krcp->head_count, 0);
-		WRITE_ONCE(krcp->head, NULL);
-	}
-	raw_spin_unlock_irqrestore(&krcp->lock, flags);
-
-	for (i = 0; i < FREE_N_CHANNELS; i++) {
-		list_for_each_entry_safe(bnode, n, &bulk_ready[i], list)
-			kvfree_rcu_bulk(krcp, bnode, i);
-	}
-
-	if (head_ready)
-		kvfree_rcu_list(head_ready);
-}
-
-/*
- * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
- */
-static void kfree_rcu_monitor(struct work_struct *work)
-{
-	struct kfree_rcu_cpu *krcp = container_of(work,
-		struct kfree_rcu_cpu, monitor_work.work);
-	unsigned long flags;
-	int i, j;
-
-	// Drain ready for reclaim.
-	kvfree_rcu_drain_ready(krcp);
-
-	raw_spin_lock_irqsave(&krcp->lock, flags);
-
-	// Attempt to start a new batch.
-	for (i = 0; i < KFREE_N_BATCHES; i++) {
-		struct kfree_rcu_cpu_work *krwp = &(krcp->krw_arr[i]);
-
-		// Try to detach bulk_head or head and attach it, only when
-		// all channels are free.  Any channel is not free means at krwp
-		// there is on-going rcu work to handle krwp's free business.
-		if (need_wait_for_krwp_work(krwp))
-			continue;
-
-		// kvfree_rcu_drain_ready() might handle this krcp, if so give up.
-		if (need_offload_krc(krcp)) {
-			// Channel 1 corresponds to the SLAB-pointer bulk path.
-			// Channel 2 corresponds to vmalloc-pointer bulk path.
-			for (j = 0; j < FREE_N_CHANNELS; j++) {
-				if (list_empty(&krwp->bulk_head_free[j])) {
-					atomic_set(&krcp->bulk_count[j], 0);
-					list_replace_init(&krcp->bulk_head[j],
-						&krwp->bulk_head_free[j]);
-				}
-			}
-
-			// Channel 3 corresponds to both SLAB and vmalloc
-			// objects queued on the linked list.
-			if (!krwp->head_free) {
-				krwp->head_free = krcp->head;
-				get_state_synchronize_rcu_full(&krwp->head_free_gp_snap);
-				atomic_set(&krcp->head_count, 0);
-				WRITE_ONCE(krcp->head, NULL);
-			}
-
-			// One work is per one batch, so there are three
-			// "free channels", the batch can handle. It can
-			// be that the work is in the pending state when
-			// channels have been detached following by each
-			// other.
-			queue_rcu_work(system_wq, &krwp->rcu_work);
-		}
-	}
-
-	raw_spin_unlock_irqrestore(&krcp->lock, flags);
-
-	// If there is nothing to detach, it means that our job is
-	// successfully done here. In case of having at least one
-	// of the channels that is still busy we should rearm the
-	// work to repeat an attempt. Because previous batches are
-	// still in progress.
-	if (need_offload_krc(krcp))
-		schedule_delayed_monitor_work(krcp);
-}
-
-static enum hrtimer_restart
-schedule_page_work_fn(struct hrtimer *t)
-{
-	struct kfree_rcu_cpu *krcp =
-		container_of(t, struct kfree_rcu_cpu, hrtimer);
-
-	queue_delayed_work(system_highpri_wq, &krcp->page_cache_work, 0);
-	return HRTIMER_NORESTART;
-}
-
-static void fill_page_cache_func(struct work_struct *work)
-{
-	struct kvfree_rcu_bulk_data *bnode;
-	struct kfree_rcu_cpu *krcp =
-		container_of(work, struct kfree_rcu_cpu,
-			page_cache_work.work);
-	unsigned long flags;
-	int nr_pages;
-	bool pushed;
-	int i;
-
-	nr_pages = atomic_read(&krcp->backoff_page_cache_fill) ?
-		1 : rcu_min_cached_objs;
-
-	for (i = READ_ONCE(krcp->nr_bkv_objs); i < nr_pages; i++) {
-		bnode = (struct kvfree_rcu_bulk_data *)
-			__get_free_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOMEMALLOC | __GFP_NOWARN);
-
-		if (!bnode)
-			break;
-
-		raw_spin_lock_irqsave(&krcp->lock, flags);
-		pushed = put_cached_bnode(krcp, bnode);
-		raw_spin_unlock_irqrestore(&krcp->lock, flags);
-
-		if (!pushed) {
-			free_page((unsigned long) bnode);
-			break;
-		}
-	}
-
-	atomic_set(&krcp->work_in_progress, 0);
-	atomic_set(&krcp->backoff_page_cache_fill, 0);
-}
-
-static void
-run_page_cache_worker(struct kfree_rcu_cpu *krcp)
-{
-	// If cache disabled, bail out.
-	if (!rcu_min_cached_objs)
-		return;
-
-	if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING &&
-			!atomic_xchg(&krcp->work_in_progress, 1)) {
-		if (atomic_read(&krcp->backoff_page_cache_fill)) {
-			queue_delayed_work(system_wq,
-				&krcp->page_cache_work,
-					msecs_to_jiffies(rcu_delay_page_cache_fill_msec));
-		} else {
-			hrtimer_init(&krcp->hrtimer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
-			krcp->hrtimer.function = schedule_page_work_fn;
-			hrtimer_start(&krcp->hrtimer, 0, HRTIMER_MODE_REL);
-		}
-	}
-}
-
-// Record ptr in a page managed by krcp, with the pre-krc_this_cpu_lock()
-// state specified by flags.  If can_alloc is true, the caller must
-// be schedulable and not be holding any locks or mutexes that might be
-// acquired by the memory allocator or anything that it might invoke.
-// Returns true if ptr was successfully recorded, else the caller must
-// use a fallback.
-static inline bool
-add_ptr_to_bulk_krc_lock(struct kfree_rcu_cpu **krcp,
-	unsigned long *flags, void *ptr, bool can_alloc)
-{
-	struct kvfree_rcu_bulk_data *bnode;
-	int idx;
-
-	*krcp = krc_this_cpu_lock(flags);
-	if (unlikely(!(*krcp)->initialized))
-		return false;
-
-	idx = !!is_vmalloc_addr(ptr);
-	bnode = list_first_entry_or_null(&(*krcp)->bulk_head[idx],
-		struct kvfree_rcu_bulk_data, list);
-
-	/* Check if a new block is required. */
-	if (!bnode || bnode->nr_records == KVFREE_BULK_MAX_ENTR) {
-		bnode = get_cached_bnode(*krcp);
-		if (!bnode && can_alloc) {
-			krc_this_cpu_unlock(*krcp, *flags);
-
-			// __GFP_NORETRY - allows a light-weight direct reclaim
-			// what is OK from minimizing of fallback hitting point of
-			// view. Apart of that it forbids any OOM invoking what is
-			// also beneficial since we are about to release memory soon.
-			//
-			// __GFP_NOMEMALLOC - prevents from consuming of all the
-			// memory reserves. Please note we have a fallback path.
-			//
-			// __GFP_NOWARN - it is supposed that an allocation can
-			// be failed under low memory or high memory pressure
-			// scenarios.
-			bnode = (struct kvfree_rcu_bulk_data *)
-				__get_free_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOMEMALLOC | __GFP_NOWARN);
-			raw_spin_lock_irqsave(&(*krcp)->lock, *flags);
-		}
-
-		if (!bnode)
-			return false;
-
-		// Initialize the new block and attach it.
-		bnode->nr_records = 0;
-		list_add(&bnode->list, &(*krcp)->bulk_head[idx]);
-	}
-
-	// Finally insert and update the GP for this page.
-	bnode->records[bnode->nr_records++] = ptr;
-	get_state_synchronize_rcu_full(&bnode->gp_snap);
-	atomic_inc(&(*krcp)->bulk_count[idx]);
-
-	return true;
-}
-
-/*
- * Queue a request for lazy invocation of the appropriate free routine
- * after a grace period.  Please note that three paths are maintained,
- * two for the common case using arrays of pointers and a third one that
- * is used only when the main paths cannot be used, for example, due to
- * memory pressure.
- *
- * Each kvfree_call_rcu() request is added to a batch. The batch will be drained
- * every KFREE_DRAIN_JIFFIES number of jiffies. All the objects in the batch will
- * be free'd in workqueue context. This allows us to: batch requests together to
- * reduce the number of grace periods during heavy kfree_rcu()/kvfree_rcu() load.
- */
-void kvfree_call_rcu(struct rcu_head *head, void *ptr)
-{
-	unsigned long flags;
-	struct kfree_rcu_cpu *krcp;
-	bool success;
-
-	/*
-	 * Please note there is a limitation for the head-less
-	 * variant, that is why there is a clear rule for such
-	 * objects: it can be used from might_sleep() context
-	 * only. For other places please embed an rcu_head to
-	 * your data.
-	 */
-	if (!head)
-		might_sleep();
-
-	// Queue the object but don't yet schedule the batch.
-	if (debug_rcu_head_queue(ptr)) {
-		// Probable double kfree_rcu(), just leak.
-		WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
-			  __func__, head);
-
-		// Mark as success and leave.
-		return;
-	}
-
-	kasan_record_aux_stack_noalloc(ptr);
-	success = add_ptr_to_bulk_krc_lock(&krcp, &flags, ptr, !head);
-	if (!success) {
-		run_page_cache_worker(krcp);
-
-		if (head == NULL)
-			// Inline if kvfree_rcu(one_arg) call.
-			goto unlock_return;
-
-		head->func = ptr;
-		head->next = krcp->head;
-		WRITE_ONCE(krcp->head, head);
-		atomic_inc(&krcp->head_count);
-
-		// Take a snapshot for this krcp.
-		krcp->head_gp_snap = get_state_synchronize_rcu();
-		success = true;
-	}
-
-	/*
-	 * The kvfree_rcu() caller considers the pointer freed at this point
-	 * and likely removes any references to it. Since the actual slab
-	 * freeing (and kmemleak_free()) is deferred, tell kmemleak to ignore
-	 * this object (no scanning or false positives reporting).
-	 */
-	kmemleak_ignore(ptr);
-
-	// Set timer to drain after KFREE_DRAIN_JIFFIES.
-	if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING)
-		schedule_delayed_monitor_work(krcp);
-
-unlock_return:
-	krc_this_cpu_unlock(krcp, flags);
-
-	/*
-	 * Inline kvfree() after synchronize_rcu(). We can do
-	 * it from might_sleep() context only, so the current
-	 * CPU can pass the QS state.
-	 */
-	if (!success) {
-		debug_rcu_head_unqueue((struct rcu_head *) ptr);
-		synchronize_rcu();
-		kvfree(ptr);
-	}
-}
-EXPORT_SYMBOL_GPL(kvfree_call_rcu);
-
-static unsigned long
-kfree_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc)
-{
-	int cpu;
-	unsigned long count = 0;
-
-	/* Snapshot count of all CPUs */
-	for_each_possible_cpu(cpu) {
-		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
-
-		count += krc_count(krcp);
-		count += READ_ONCE(krcp->nr_bkv_objs);
-		atomic_set(&krcp->backoff_page_cache_fill, 1);
-	}
-
-	return count == 0 ? SHRINK_EMPTY : count;
-}
-
-static unsigned long
-kfree_rcu_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
-{
-	int cpu, freed = 0;
-
-	for_each_possible_cpu(cpu) {
-		int count;
-		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
-
-		count = krc_count(krcp);
-		count += drain_page_cache(krcp);
-		kfree_rcu_monitor(&krcp->monitor_work.work);
-
-		sc->nr_to_scan -= count;
-		freed += count;
-
-		if (sc->nr_to_scan <= 0)
-			break;
-	}
-
-	return freed == 0 ? SHRINK_STOP : freed;
-}
-
-void __init kfree_rcu_scheduler_running(void)
-{
-	int cpu;
-
-	for_each_possible_cpu(cpu) {
-		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
-
-		if (need_offload_krc(krcp))
-			schedule_delayed_monitor_work(krcp);
-	}
-}
-
 /*
  * During early boot, any blocking grace-period wait automatically
  * implies a grace period.
@@ -5567,62 +4871,12 @@ static void __init rcu_dump_rcu_node_tree(void)
 
 struct workqueue_struct *rcu_gp_wq;
 
-static void __init kfree_rcu_batch_init(void)
-{
-	int cpu;
-	int i, j;
-	struct shrinker *kfree_rcu_shrinker;
-
-	/* Clamp it to [0:100] seconds interval. */
-	if (rcu_delay_page_cache_fill_msec < 0 ||
-		rcu_delay_page_cache_fill_msec > 100 * MSEC_PER_SEC) {
-
-		rcu_delay_page_cache_fill_msec =
-			clamp(rcu_delay_page_cache_fill_msec, 0,
-				(int) (100 * MSEC_PER_SEC));
-
-		pr_info("Adjusting rcutree.rcu_delay_page_cache_fill_msec to %d ms.\n",
-			rcu_delay_page_cache_fill_msec);
-	}
-
-	for_each_possible_cpu(cpu) {
-		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
-
-		for (i = 0; i < KFREE_N_BATCHES; i++) {
-			INIT_RCU_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
-			krcp->krw_arr[i].krcp = krcp;
-
-			for (j = 0; j < FREE_N_CHANNELS; j++)
-				INIT_LIST_HEAD(&krcp->krw_arr[i].bulk_head_free[j]);
-		}
-
-		for (i = 0; i < FREE_N_CHANNELS; i++)
-			INIT_LIST_HEAD(&krcp->bulk_head[i]);
-
-		INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
-		INIT_DELAYED_WORK(&krcp->page_cache_work, fill_page_cache_func);
-		krcp->initialized = true;
-	}
-
-	kfree_rcu_shrinker = shrinker_alloc(0, "rcu-kfree");
-	if (!kfree_rcu_shrinker) {
-		pr_err("Failed to allocate kfree_rcu() shrinker!\n");
-		return;
-	}
-
-	kfree_rcu_shrinker->count_objects = kfree_rcu_shrink_count;
-	kfree_rcu_shrinker->scan_objects = kfree_rcu_shrink_scan;
-
-	shrinker_register(kfree_rcu_shrinker);
-}
-
 void __init rcu_init(void)
 {
 	int cpu = smp_processor_id();
 
 	rcu_early_boot_tests();
 
-	kfree_rcu_batch_init();
 	rcu_bootup_announce();
 	sanitize_kthread_prio();
 	rcu_init_geometry();
diff --git a/kernel/rcu/update.c b/kernel/rcu/update.c
index f8436969e0c8..82ca9f490d7c 100644
--- a/kernel/rcu/update.c
+++ b/kernel/rcu/update.c
@@ -273,7 +273,6 @@ static int __init rcu_set_runtime_mode(void)
 {
 	rcu_test_sync_prims();
 	rcu_scheduler_active = RCU_SCHEDULER_RUNNING;
-	kfree_rcu_scheduler_running();
 	rcu_test_sync_prims();
 	return 0;
 }
-- 
2.45.2


^ permalink raw reply related	[flat|nested] 24+ messages in thread

* Re: [PATCH 5/9] rcu: delete lockdep_assert_irqs_enabled() assert in start_poll_synchronize_rcu_common()
  2024-08-19 16:59 ` [PATCH 5/9] rcu: delete lockdep_assert_irqs_enabled() assert in start_poll_synchronize_rcu_common() Kent Overstreet
@ 2024-08-19 21:11   ` Paul E. McKenney
  0 siblings, 0 replies; 24+ messages in thread
From: Paul E. McKenney @ 2024-08-19 21:11 UTC (permalink / raw)
  To: Kent Overstreet; +Cc: rcu, linux-kernel

On Mon, Aug 19, 2024 at 12:59:31PM -0400, Kent Overstreet wrote:
> this assertion appears to have been entirely unnecessary
> 
> Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>

Please also adjust the comments.  Or just take this from the -rcu tree's
"dev" branch, which is shown at the end of this email:

e0a917e08b9b ("rcu: Permit start_poll_synchronize_rcu*() with interrupts disabled")

> ---
>  kernel/rcu/tree.c | 1 -
>  1 file changed, 1 deletion(-)
> 
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index e641cc681901..52f9f0bf1b8e 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -4119,7 +4119,6 @@ static void start_poll_synchronize_rcu_common(void)
>  	struct rcu_data *rdp;
>  	struct rcu_node *rnp;
>  
> -	lockdep_assert_irqs_enabled();
>  	local_irq_save(flags);
>  	rdp = this_cpu_ptr(&rcu_data);
>  	rnp = rdp->mynode;
> -- 
> 2.45.2
> 

------------------------------------------------------------------------

commit e0a917e08b9b39eee5870738250d0b8b10272dbe
Author: Paul E. McKenney <paulmck@kernel.org>
Date:   Fri Aug 16 14:22:48 2024 -0700

    rcu: Permit start_poll_synchronize_rcu*() with interrupts disabled
    
    The header comment for both start_poll_synchronize_rcu() and
    start_poll_synchronize_rcu_full() state that interrupts must be enabled
    when calling these two functions, and there is a lockdep assertion in
    start_poll_synchronize_rcu_common() enforcing this restriction.  However,
    there is no need for this restrictions, as can be seen in call_rcu(),
    which does wakeups when interrupts are disabled.
    
    This commit therefore removes the lockdep assertion and the comments.
    
    Reported-by: Kent Overstreet <kent.overstreet@linux.dev>
    Signed-off-by: Paul E. McKenney <paulmck@kernel.org>

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 36b207918158c..47c753ae9bbcd 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -4103,7 +4103,6 @@ static void start_poll_synchronize_rcu_common(void)
 	struct rcu_data *rdp;
 	struct rcu_node *rnp;
 
-	lockdep_assert_irqs_enabled();
 	local_irq_save(flags);
 	rdp = this_cpu_ptr(&rcu_data);
 	rnp = rdp->mynode;
@@ -4128,9 +4127,6 @@ static void start_poll_synchronize_rcu_common(void)
  * grace period has elapsed in the meantime.  If the needed grace period
  * is not already slated to start, notifies RCU core of the need for that
  * grace period.
- *
- * Interrupts must be enabled for the case where it is necessary to awaken
- * the grace-period kthread.
  */
 unsigned long start_poll_synchronize_rcu(void)
 {
@@ -4151,9 +4147,6 @@ EXPORT_SYMBOL_GPL(start_poll_synchronize_rcu);
  * grace period (whether normal or expedited) has elapsed in the meantime.
  * If the needed grace period is not already slated to start, notifies
  * RCU core of the need for that grace period.
- *
- * Interrupts must be enabled for the case where it is necessary to awaken
- * the grace-period kthread.
  */
 void start_poll_synchronize_rcu_full(struct rcu_gp_oldstate *rgosp)
 {
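
For illustration, a minimal sketch of the usage this commit now permits;
struct my_obj and its spinlock are assumptions for the example, while the
RCU calls are the real polled grace-period API:

	static void my_obj_defer_free(struct my_obj *obj)
	{
		unsigned long flags;

		spin_lock_irqsave(&obj->lock, flags);		/* interrupts disabled */
		obj->gp_seq = start_poll_synchronize_rcu();	/* no longer trips lockdep */
		spin_unlock_irqrestore(&obj->lock, flags);
	}

	static bool my_obj_try_free(struct my_obj *obj)
	{
		if (!poll_state_synchronize_rcu(obj->gp_seq))
			return false;		/* grace period still in progress */
		kfree(obj);
		return true;
	}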

^ permalink raw reply related	[flat|nested] 24+ messages in thread

* Re: [PATCH 9/9] rcu: Switch kvfree_rcu() to new rcu_pending
  2024-08-19 16:59 ` [PATCH 9/9] rcu: Switch kvfree_rcu() to new rcu_pending Kent Overstreet
@ 2024-08-19 22:18   ` Paul E. McKenney
  2024-08-19 23:05     ` Kent Overstreet
  0 siblings, 1 reply; 24+ messages in thread
From: Paul E. McKenney @ 2024-08-19 22:18 UTC (permalink / raw)
  To: Kent Overstreet; +Cc: rcu, linux-kernel, linux-arch

On Mon, Aug 19, 2024 at 12:59:35PM -0400, Kent Overstreet wrote:
> This nets us a slight performance increase, and converts to common
> code.

How did you measure the performance, and what was the actual amount
of the increase?

Either way, the performance increase needs to be verified by the people
for whom the current code works well.  As noted a couple months ago,
I am having a hard time imagining a tree beating a linked list of pages
of pointers in terms of cache locality.  I added linux-arch on CC in
case I am out of date on how computer systems work.

In the meantime, I need you to hold off on this one.

							Thanx, Paul

> Todo - re-add the shrinker, so that memory reclaim can free expired
> objects and expedite a grace period when necessary.
> 
> Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
> ---
>  include/linux/rcu_pending.h |   2 +
>  init/main.c                 |   2 +
>  kernel/rcu/pending.c        |  20 +
>  kernel/rcu/tree.c           | 746 ------------------------------------
>  kernel/rcu/update.c         |   1 -
>  5 files changed, 24 insertions(+), 747 deletions(-)
> 
> diff --git a/include/linux/rcu_pending.h b/include/linux/rcu_pending.h
> index a875c640da8d..5ef6392ce180 100644
> --- a/include/linux/rcu_pending.h
> +++ b/include/linux/rcu_pending.h
> @@ -22,4 +22,6 @@ int rcu_pending_init(struct rcu_pending *pending,
>  		     struct srcu_struct *srcu,
>  		     rcu_pending_process_fn process);
>  
> +void __init kvfree_rcu_pending_init(void);
> +
>  #endif /* _LINUX_RCU_PENDING_H */
> diff --git a/init/main.c b/init/main.c
> index 206acdde51f5..a3f0fd6ec3da 100644
> --- a/init/main.c
> +++ b/init/main.c
> @@ -43,6 +43,7 @@
>  #include <linux/profile.h>
>  #include <linux/kfence.h>
>  #include <linux/rcupdate.h>
> +#include <linux/rcu_pending.h>
>  #include <linux/srcu.h>
>  #include <linux/moduleparam.h>
>  #include <linux/kallsyms.h>
> @@ -993,6 +994,7 @@ void start_kernel(void)
>  	workqueue_init_early();
>  
>  	rcu_init();
> +	kvfree_rcu_pending_init();
>  
>  	/* Trace events are available after this */
>  	trace_init();
> diff --git a/kernel/rcu/pending.c b/kernel/rcu/pending.c
> index c0e2351ba198..9c57f373d494 100644
> --- a/kernel/rcu/pending.c
> +++ b/kernel/rcu/pending.c
> @@ -601,3 +601,23 @@ int rcu_pending_init(struct rcu_pending *pending,
>  
>  	return 0;
>  }
> +
> +#ifndef CONFIG_TINY_RCU
> +/* kvfree_rcu */
> +
> +static struct rcu_pending kvfree_rcu_pending;
> +
> +void kvfree_call_rcu(struct rcu_head *head, void *ptr)
> +{
> +	BUG_ON(!ptr);
> +
> +	__rcu_pending_enqueue(&kvfree_rcu_pending, head, ptr, head == NULL);
> +}
> +EXPORT_SYMBOL_GPL(kvfree_call_rcu);
> +
> +void __init kvfree_rcu_pending_init(void)
> +{
> +	if (rcu_pending_init(&kvfree_rcu_pending, NULL, RCU_PENDING_KVFREE_FN))
> +		panic("%s failed\n", __func__);
> +}
> +#endif
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index 52f9f0bf1b8e..3bb6c81e2d30 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -3216,702 +3216,6 @@ EXPORT_SYMBOL_GPL(call_rcu);
>  #define KFREE_N_BATCHES 2
>  #define FREE_N_CHANNELS 2
>  
> -/**
> - * struct kvfree_rcu_bulk_data - single block to store kvfree_rcu() pointers
> - * @list: List node. All blocks are linked between each other
> - * @gp_snap: Snapshot of RCU state for objects placed to this bulk
> - * @nr_records: Number of active pointers in the array
> - * @records: Array of the kvfree_rcu() pointers
> - */
> -struct kvfree_rcu_bulk_data {
> -	struct list_head list;
> -	struct rcu_gp_oldstate gp_snap;
> -	unsigned long nr_records;
> -	void *records[];
> -};
> -
> -/*
> - * This macro defines how many entries the "records" array
> - * will contain. It is based on the fact that the size of
> - * kvfree_rcu_bulk_data structure becomes exactly one page.
> - */
> -#define KVFREE_BULK_MAX_ENTR \
> -	((PAGE_SIZE - sizeof(struct kvfree_rcu_bulk_data)) / sizeof(void *))
> -
> -/**
> - * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
> - * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
> - * @head_free: List of kfree_rcu() objects waiting for a grace period
> - * @head_free_gp_snap: Grace-period snapshot to check for attempted premature frees.
> - * @bulk_head_free: Bulk-List of kvfree_rcu() objects waiting for a grace period
> - * @krcp: Pointer to @kfree_rcu_cpu structure
> - */
> -
> -struct kfree_rcu_cpu_work {
> -	struct rcu_work rcu_work;
> -	struct rcu_head *head_free;
> -	struct rcu_gp_oldstate head_free_gp_snap;
> -	struct list_head bulk_head_free[FREE_N_CHANNELS];
> -	struct kfree_rcu_cpu *krcp;
> -};
> -
> -/**
> - * struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period
> - * @head: List of kfree_rcu() objects not yet waiting for a grace period
> - * @head_gp_snap: Snapshot of RCU state for objects placed to "@head"
> - * @bulk_head: Bulk-List of kvfree_rcu() objects not yet waiting for a grace period
> - * @krw_arr: Array of batches of kfree_rcu() objects waiting for a grace period
> - * @lock: Synchronize access to this structure
> - * @monitor_work: Promote @head to @head_free after KFREE_DRAIN_JIFFIES
> - * @initialized: The @rcu_work fields have been initialized
> - * @head_count: Number of objects in rcu_head singular list
> - * @bulk_count: Number of objects in bulk-list
> - * @bkvcache:
> - *	A simple cache list that contains objects for reuse purpose.
> - *	In order to save some per-cpu space the list is singular.
> - *	Even though it is lockless an access has to be protected by the
> - *	per-cpu lock.
> - * @page_cache_work: A work to refill the cache when it is empty
> - * @backoff_page_cache_fill: Delay cache refills
> - * @work_in_progress: Indicates that page_cache_work is running
> - * @hrtimer: A hrtimer for scheduling a page_cache_work
> - * @nr_bkv_objs: number of allocated objects at @bkvcache.
> - *
> - * This is a per-CPU structure.  The reason that it is not included in
> - * the rcu_data structure is to permit this code to be extracted from
> - * the RCU files.  Such extraction could allow further optimization of
> - * the interactions with the slab allocators.
> - */
> -struct kfree_rcu_cpu {
> -	// Objects queued on a linked list
> -	// through their rcu_head structures.
> -	struct rcu_head *head;
> -	unsigned long head_gp_snap;
> -	atomic_t head_count;
> -
> -	// Objects queued on a bulk-list.
> -	struct list_head bulk_head[FREE_N_CHANNELS];
> -	atomic_t bulk_count[FREE_N_CHANNELS];
> -
> -	struct kfree_rcu_cpu_work krw_arr[KFREE_N_BATCHES];
> -	raw_spinlock_t lock;
> -	struct delayed_work monitor_work;
> -	bool initialized;
> -
> -	struct delayed_work page_cache_work;
> -	atomic_t backoff_page_cache_fill;
> -	atomic_t work_in_progress;
> -	struct hrtimer hrtimer;
> -
> -	struct llist_head bkvcache;
> -	int nr_bkv_objs;
> -};
> -
> -static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc) = {
> -	.lock = __RAW_SPIN_LOCK_UNLOCKED(krc.lock),
> -};
> -
> -static __always_inline void
> -debug_rcu_bhead_unqueue(struct kvfree_rcu_bulk_data *bhead)
> -{
> -#ifdef CONFIG_DEBUG_OBJECTS_RCU_HEAD
> -	int i;
> -
> -	for (i = 0; i < bhead->nr_records; i++)
> -		debug_rcu_head_unqueue((struct rcu_head *)(bhead->records[i]));
> -#endif
> -}
> -
> -static inline struct kfree_rcu_cpu *
> -krc_this_cpu_lock(unsigned long *flags)
> -{
> -	struct kfree_rcu_cpu *krcp;
> -
> -	local_irq_save(*flags);	// For safely calling this_cpu_ptr().
> -	krcp = this_cpu_ptr(&krc);
> -	raw_spin_lock(&krcp->lock);
> -
> -	return krcp;
> -}
> -
> -static inline void
> -krc_this_cpu_unlock(struct kfree_rcu_cpu *krcp, unsigned long flags)
> -{
> -	raw_spin_unlock_irqrestore(&krcp->lock, flags);
> -}
> -
> -static inline struct kvfree_rcu_bulk_data *
> -get_cached_bnode(struct kfree_rcu_cpu *krcp)
> -{
> -	if (!krcp->nr_bkv_objs)
> -		return NULL;
> -
> -	WRITE_ONCE(krcp->nr_bkv_objs, krcp->nr_bkv_objs - 1);
> -	return (struct kvfree_rcu_bulk_data *)
> -		llist_del_first(&krcp->bkvcache);
> -}
> -
> -static inline bool
> -put_cached_bnode(struct kfree_rcu_cpu *krcp,
> -	struct kvfree_rcu_bulk_data *bnode)
> -{
> -	// Check the limit.
> -	if (krcp->nr_bkv_objs >= rcu_min_cached_objs)
> -		return false;
> -
> -	llist_add((struct llist_node *) bnode, &krcp->bkvcache);
> -	WRITE_ONCE(krcp->nr_bkv_objs, krcp->nr_bkv_objs + 1);
> -	return true;
> -}
> -
> -static int
> -drain_page_cache(struct kfree_rcu_cpu *krcp)
> -{
> -	unsigned long flags;
> -	struct llist_node *page_list, *pos, *n;
> -	int freed = 0;
> -
> -	if (!rcu_min_cached_objs)
> -		return 0;
> -
> -	raw_spin_lock_irqsave(&krcp->lock, flags);
> -	page_list = llist_del_all(&krcp->bkvcache);
> -	WRITE_ONCE(krcp->nr_bkv_objs, 0);
> -	raw_spin_unlock_irqrestore(&krcp->lock, flags);
> -
> -	llist_for_each_safe(pos, n, page_list) {
> -		free_page((unsigned long)pos);
> -		freed++;
> -	}
> -
> -	return freed;
> -}
> -
> -static void
> -kvfree_rcu_bulk(struct kfree_rcu_cpu *krcp,
> -	struct kvfree_rcu_bulk_data *bnode, int idx)
> -{
> -	unsigned long flags;
> -	int i;
> -
> -	if (!WARN_ON_ONCE(!poll_state_synchronize_rcu_full(&bnode->gp_snap))) {
> -		debug_rcu_bhead_unqueue(bnode);
> -		rcu_lock_acquire(&rcu_callback_map);
> -		if (idx == 0) { // kmalloc() / kfree().
> -			trace_rcu_invoke_kfree_bulk_callback(
> -				rcu_state.name, bnode->nr_records,
> -				bnode->records);
> -
> -			kfree_bulk(bnode->nr_records, bnode->records);
> -		} else { // vmalloc() / vfree().
> -			for (i = 0; i < bnode->nr_records; i++) {
> -				trace_rcu_invoke_kvfree_callback(
> -					rcu_state.name, bnode->records[i], 0);
> -
> -				vfree(bnode->records[i]);
> -			}
> -		}
> -		rcu_lock_release(&rcu_callback_map);
> -	}
> -
> -	raw_spin_lock_irqsave(&krcp->lock, flags);
> -	if (put_cached_bnode(krcp, bnode))
> -		bnode = NULL;
> -	raw_spin_unlock_irqrestore(&krcp->lock, flags);
> -
> -	if (bnode)
> -		free_page((unsigned long) bnode);
> -
> -	cond_resched_tasks_rcu_qs();
> -}
> -
> -static void
> -kvfree_rcu_list(struct rcu_head *head)
> -{
> -	struct rcu_head *next;
> -
> -	for (; head; head = next) {
> -		void *ptr = (void *) head->func;
> -		unsigned long offset = (void *) head - ptr;
> -
> -		next = head->next;
> -		debug_rcu_head_unqueue((struct rcu_head *)ptr);
> -		rcu_lock_acquire(&rcu_callback_map);
> -		trace_rcu_invoke_kvfree_callback(rcu_state.name, head, offset);
> -
> -		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset)))
> -			kvfree(ptr);
> -
> -		rcu_lock_release(&rcu_callback_map);
> -		cond_resched_tasks_rcu_qs();
> -	}
> -}
> -
> -/*
> - * This function is invoked in workqueue context after a grace period.
> - * It frees all the objects queued on ->bulk_head_free or ->head_free.
> - */
> -static void kfree_rcu_work(struct work_struct *work)
> -{
> -	unsigned long flags;
> -	struct kvfree_rcu_bulk_data *bnode, *n;
> -	struct list_head bulk_head[FREE_N_CHANNELS];
> -	struct rcu_head *head;
> -	struct kfree_rcu_cpu *krcp;
> -	struct kfree_rcu_cpu_work *krwp;
> -	struct rcu_gp_oldstate head_gp_snap;
> -	int i;
> -
> -	krwp = container_of(to_rcu_work(work),
> -		struct kfree_rcu_cpu_work, rcu_work);
> -	krcp = krwp->krcp;
> -
> -	raw_spin_lock_irqsave(&krcp->lock, flags);
> -	// Channels 1 and 2.
> -	for (i = 0; i < FREE_N_CHANNELS; i++)
> -		list_replace_init(&krwp->bulk_head_free[i], &bulk_head[i]);
> -
> -	// Channel 3.
> -	head = krwp->head_free;
> -	krwp->head_free = NULL;
> -	head_gp_snap = krwp->head_free_gp_snap;
> -	raw_spin_unlock_irqrestore(&krcp->lock, flags);
> -
> -	// Handle the first two channels.
> -	for (i = 0; i < FREE_N_CHANNELS; i++) {
> -		// Start from the tail page, so a GP is likely passed for it.
> -		list_for_each_entry_safe(bnode, n, &bulk_head[i], list)
> -			kvfree_rcu_bulk(krcp, bnode, i);
> -	}
> -
> -	/*
> -	 * This is used when the "bulk" path can not be used for the
> -	 * double-argument of kvfree_rcu().  This happens when the
> -	 * page-cache is empty, which means that objects are instead
> -	 * queued on a linked list through their rcu_head structures.
> -	 * This list is named "Channel 3".
> -	 */
> -	if (head && !WARN_ON_ONCE(!poll_state_synchronize_rcu_full(&head_gp_snap)))
> -		kvfree_rcu_list(head);
> -}
> -
> -static bool
> -need_offload_krc(struct kfree_rcu_cpu *krcp)
> -{
> -	int i;
> -
> -	for (i = 0; i < FREE_N_CHANNELS; i++)
> -		if (!list_empty(&krcp->bulk_head[i]))
> -			return true;
> -
> -	return !!READ_ONCE(krcp->head);
> -}
> -
> -static bool
> -need_wait_for_krwp_work(struct kfree_rcu_cpu_work *krwp)
> -{
> -	int i;
> -
> -	for (i = 0; i < FREE_N_CHANNELS; i++)
> -		if (!list_empty(&krwp->bulk_head_free[i]))
> -			return true;
> -
> -	return !!krwp->head_free;
> -}
> -
> -static int krc_count(struct kfree_rcu_cpu *krcp)
> -{
> -	int sum = atomic_read(&krcp->head_count);
> -	int i;
> -
> -	for (i = 0; i < FREE_N_CHANNELS; i++)
> -		sum += atomic_read(&krcp->bulk_count[i]);
> -
> -	return sum;
> -}
> -
> -static void
> -schedule_delayed_monitor_work(struct kfree_rcu_cpu *krcp)
> -{
> -	long delay, delay_left;
> -
> -	delay = krc_count(krcp) >= KVFREE_BULK_MAX_ENTR ? 1:KFREE_DRAIN_JIFFIES;
> -	if (delayed_work_pending(&krcp->monitor_work)) {
> -		delay_left = krcp->monitor_work.timer.expires - jiffies;
> -		if (delay < delay_left)
> -			mod_delayed_work(system_wq, &krcp->monitor_work, delay);
> -		return;
> -	}
> -	queue_delayed_work(system_wq, &krcp->monitor_work, delay);
> -}
> -
> -static void
> -kvfree_rcu_drain_ready(struct kfree_rcu_cpu *krcp)
> -{
> -	struct list_head bulk_ready[FREE_N_CHANNELS];
> -	struct kvfree_rcu_bulk_data *bnode, *n;
> -	struct rcu_head *head_ready = NULL;
> -	unsigned long flags;
> -	int i;
> -
> -	raw_spin_lock_irqsave(&krcp->lock, flags);
> -	for (i = 0; i < FREE_N_CHANNELS; i++) {
> -		INIT_LIST_HEAD(&bulk_ready[i]);
> -
> -		list_for_each_entry_safe_reverse(bnode, n, &krcp->bulk_head[i], list) {
> -			if (!poll_state_synchronize_rcu_full(&bnode->gp_snap))
> -				break;
> -
> -			atomic_sub(bnode->nr_records, &krcp->bulk_count[i]);
> -			list_move(&bnode->list, &bulk_ready[i]);
> -		}
> -	}
> -
> -	if (krcp->head && poll_state_synchronize_rcu(krcp->head_gp_snap)) {
> -		head_ready = krcp->head;
> -		atomic_set(&krcp->head_count, 0);
> -		WRITE_ONCE(krcp->head, NULL);
> -	}
> -	raw_spin_unlock_irqrestore(&krcp->lock, flags);
> -
> -	for (i = 0; i < FREE_N_CHANNELS; i++) {
> -		list_for_each_entry_safe(bnode, n, &bulk_ready[i], list)
> -			kvfree_rcu_bulk(krcp, bnode, i);
> -	}
> -
> -	if (head_ready)
> -		kvfree_rcu_list(head_ready);
> -}
> -
> -/*
> - * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
> - */
> -static void kfree_rcu_monitor(struct work_struct *work)
> -{
> -	struct kfree_rcu_cpu *krcp = container_of(work,
> -		struct kfree_rcu_cpu, monitor_work.work);
> -	unsigned long flags;
> -	int i, j;
> -
> -	// Drain ready for reclaim.
> -	kvfree_rcu_drain_ready(krcp);
> -
> -	raw_spin_lock_irqsave(&krcp->lock, flags);
> -
> -	// Attempt to start a new batch.
> -	for (i = 0; i < KFREE_N_BATCHES; i++) {
> -		struct kfree_rcu_cpu_work *krwp = &(krcp->krw_arr[i]);
> -
> -		// Try to detach bulk_head or head and attach it, only when
> -		// all channels are free.  Any channel is not free means at krwp
> -		// there is on-going rcu work to handle krwp's free business.
> -		if (need_wait_for_krwp_work(krwp))
> -			continue;
> -
> -		// kvfree_rcu_drain_ready() might handle this krcp, if so give up.
> -		if (need_offload_krc(krcp)) {
> -			// Channel 1 corresponds to the SLAB-pointer bulk path.
> -			// Channel 2 corresponds to vmalloc-pointer bulk path.
> -			for (j = 0; j < FREE_N_CHANNELS; j++) {
> -				if (list_empty(&krwp->bulk_head_free[j])) {
> -					atomic_set(&krcp->bulk_count[j], 0);
> -					list_replace_init(&krcp->bulk_head[j],
> -						&krwp->bulk_head_free[j]);
> -				}
> -			}
> -
> -			// Channel 3 corresponds to both SLAB and vmalloc
> -			// objects queued on the linked list.
> -			if (!krwp->head_free) {
> -				krwp->head_free = krcp->head;
> -				get_state_synchronize_rcu_full(&krwp->head_free_gp_snap);
> -				atomic_set(&krcp->head_count, 0);
> -				WRITE_ONCE(krcp->head, NULL);
> -			}
> -
> -			// One work is per one batch, so there are three
> -			// "free channels", the batch can handle. It can
> -			// be that the work is in the pending state when
> -			// channels have been detached following by each
> -			// other.
> -			queue_rcu_work(system_wq, &krwp->rcu_work);
> -		}
> -	}
> -
> -	raw_spin_unlock_irqrestore(&krcp->lock, flags);
> -
> -	// If there is nothing to detach, it means that our job is
> -	// successfully done here. In case of having at least one
> -	// of the channels that is still busy we should rearm the
> -	// work to repeat an attempt. Because previous batches are
> -	// still in progress.
> -	if (need_offload_krc(krcp))
> -		schedule_delayed_monitor_work(krcp);
> -}
> -
> -static enum hrtimer_restart
> -schedule_page_work_fn(struct hrtimer *t)
> -{
> -	struct kfree_rcu_cpu *krcp =
> -		container_of(t, struct kfree_rcu_cpu, hrtimer);
> -
> -	queue_delayed_work(system_highpri_wq, &krcp->page_cache_work, 0);
> -	return HRTIMER_NORESTART;
> -}
> -
> -static void fill_page_cache_func(struct work_struct *work)
> -{
> -	struct kvfree_rcu_bulk_data *bnode;
> -	struct kfree_rcu_cpu *krcp =
> -		container_of(work, struct kfree_rcu_cpu,
> -			page_cache_work.work);
> -	unsigned long flags;
> -	int nr_pages;
> -	bool pushed;
> -	int i;
> -
> -	nr_pages = atomic_read(&krcp->backoff_page_cache_fill) ?
> -		1 : rcu_min_cached_objs;
> -
> -	for (i = READ_ONCE(krcp->nr_bkv_objs); i < nr_pages; i++) {
> -		bnode = (struct kvfree_rcu_bulk_data *)
> -			__get_free_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOMEMALLOC | __GFP_NOWARN);
> -
> -		if (!bnode)
> -			break;
> -
> -		raw_spin_lock_irqsave(&krcp->lock, flags);
> -		pushed = put_cached_bnode(krcp, bnode);
> -		raw_spin_unlock_irqrestore(&krcp->lock, flags);
> -
> -		if (!pushed) {
> -			free_page((unsigned long) bnode);
> -			break;
> -		}
> -	}
> -
> -	atomic_set(&krcp->work_in_progress, 0);
> -	atomic_set(&krcp->backoff_page_cache_fill, 0);
> -}
> -
> -static void
> -run_page_cache_worker(struct kfree_rcu_cpu *krcp)
> -{
> -	// If cache disabled, bail out.
> -	if (!rcu_min_cached_objs)
> -		return;
> -
> -	if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING &&
> -			!atomic_xchg(&krcp->work_in_progress, 1)) {
> -		if (atomic_read(&krcp->backoff_page_cache_fill)) {
> -			queue_delayed_work(system_wq,
> -				&krcp->page_cache_work,
> -					msecs_to_jiffies(rcu_delay_page_cache_fill_msec));
> -		} else {
> -			hrtimer_init(&krcp->hrtimer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
> -			krcp->hrtimer.function = schedule_page_work_fn;
> -			hrtimer_start(&krcp->hrtimer, 0, HRTIMER_MODE_REL);
> -		}
> -	}
> -}
> -
> -// Record ptr in a page managed by krcp, with the pre-krc_this_cpu_lock()
> -// state specified by flags.  If can_alloc is true, the caller must
> -// be schedulable and not be holding any locks or mutexes that might be
> -// acquired by the memory allocator or anything that it might invoke.
> -// Returns true if ptr was successfully recorded, else the caller must
> -// use a fallback.
> -static inline bool
> -add_ptr_to_bulk_krc_lock(struct kfree_rcu_cpu **krcp,
> -	unsigned long *flags, void *ptr, bool can_alloc)
> -{
> -	struct kvfree_rcu_bulk_data *bnode;
> -	int idx;
> -
> -	*krcp = krc_this_cpu_lock(flags);
> -	if (unlikely(!(*krcp)->initialized))
> -		return false;
> -
> -	idx = !!is_vmalloc_addr(ptr);
> -	bnode = list_first_entry_or_null(&(*krcp)->bulk_head[idx],
> -		struct kvfree_rcu_bulk_data, list);
> -
> -	/* Check if a new block is required. */
> -	if (!bnode || bnode->nr_records == KVFREE_BULK_MAX_ENTR) {
> -		bnode = get_cached_bnode(*krcp);
> -		if (!bnode && can_alloc) {
> -			krc_this_cpu_unlock(*krcp, *flags);
> -
> -			// __GFP_NORETRY - allows a light-weight direct reclaim
> -			// what is OK from minimizing of fallback hitting point of
> -			// view. Apart of that it forbids any OOM invoking what is
> -			// also beneficial since we are about to release memory soon.
> -			//
> -			// __GFP_NOMEMALLOC - prevents from consuming of all the
> -			// memory reserves. Please note we have a fallback path.
> -			//
> -			// __GFP_NOWARN - it is supposed that an allocation can
> -			// be failed under low memory or high memory pressure
> -			// scenarios.
> -			bnode = (struct kvfree_rcu_bulk_data *)
> -				__get_free_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOMEMALLOC | __GFP_NOWARN);
> -			raw_spin_lock_irqsave(&(*krcp)->lock, *flags);
> -		}
> -
> -		if (!bnode)
> -			return false;
> -
> -		// Initialize the new block and attach it.
> -		bnode->nr_records = 0;
> -		list_add(&bnode->list, &(*krcp)->bulk_head[idx]);
> -	}
> -
> -	// Finally insert and update the GP for this page.
> -	bnode->records[bnode->nr_records++] = ptr;
> -	get_state_synchronize_rcu_full(&bnode->gp_snap);
> -	atomic_inc(&(*krcp)->bulk_count[idx]);
> -
> -	return true;
> -}
> -
> -/*
> - * Queue a request for lazy invocation of the appropriate free routine
> - * after a grace period.  Please note that three paths are maintained,
> - * two for the common case using arrays of pointers and a third one that
> - * is used only when the main paths cannot be used, for example, due to
> - * memory pressure.
> - *
> - * Each kvfree_call_rcu() request is added to a batch. The batch will be drained
> - * every KFREE_DRAIN_JIFFIES number of jiffies. All the objects in the batch will
> - * be free'd in workqueue context. This allows us to: batch requests together to
> - * reduce the number of grace periods during heavy kfree_rcu()/kvfree_rcu() load.
> - */
> -void kvfree_call_rcu(struct rcu_head *head, void *ptr)
> -{
> -	unsigned long flags;
> -	struct kfree_rcu_cpu *krcp;
> -	bool success;
> -
> -	/*
> -	 * Please note there is a limitation for the head-less
> -	 * variant, that is why there is a clear rule for such
> -	 * objects: it can be used from might_sleep() context
> -	 * only. For other places please embed an rcu_head to
> -	 * your data.
> -	 */
> -	if (!head)
> -		might_sleep();
> -
> -	// Queue the object but don't yet schedule the batch.
> -	if (debug_rcu_head_queue(ptr)) {
> -		// Probable double kfree_rcu(), just leak.
> -		WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
> -			  __func__, head);
> -
> -		// Mark as success and leave.
> -		return;
> -	}
> -
> -	kasan_record_aux_stack_noalloc(ptr);
> -	success = add_ptr_to_bulk_krc_lock(&krcp, &flags, ptr, !head);
> -	if (!success) {
> -		run_page_cache_worker(krcp);
> -
> -		if (head == NULL)
> -			// Inline if kvfree_rcu(one_arg) call.
> -			goto unlock_return;
> -
> -		head->func = ptr;
> -		head->next = krcp->head;
> -		WRITE_ONCE(krcp->head, head);
> -		atomic_inc(&krcp->head_count);
> -
> -		// Take a snapshot for this krcp.
> -		krcp->head_gp_snap = get_state_synchronize_rcu();
> -		success = true;
> -	}
> -
> -	/*
> -	 * The kvfree_rcu() caller considers the pointer freed at this point
> -	 * and likely removes any references to it. Since the actual slab
> -	 * freeing (and kmemleak_free()) is deferred, tell kmemleak to ignore
> -	 * this object (no scanning or false positives reporting).
> -	 */
> -	kmemleak_ignore(ptr);
> -
> -	// Set timer to drain after KFREE_DRAIN_JIFFIES.
> -	if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING)
> -		schedule_delayed_monitor_work(krcp);
> -
> -unlock_return:
> -	krc_this_cpu_unlock(krcp, flags);
> -
> -	/*
> -	 * Inline kvfree() after synchronize_rcu(). We can do
> -	 * it from might_sleep() context only, so the current
> -	 * CPU can pass the QS state.
> -	 */
> -	if (!success) {
> -		debug_rcu_head_unqueue((struct rcu_head *) ptr);
> -		synchronize_rcu();
> -		kvfree(ptr);
> -	}
> -}
> -EXPORT_SYMBOL_GPL(kvfree_call_rcu);
> -
> -static unsigned long
> -kfree_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc)
> -{
> -	int cpu;
> -	unsigned long count = 0;
> -
> -	/* Snapshot count of all CPUs */
> -	for_each_possible_cpu(cpu) {
> -		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
> -
> -		count += krc_count(krcp);
> -		count += READ_ONCE(krcp->nr_bkv_objs);
> -		atomic_set(&krcp->backoff_page_cache_fill, 1);
> -	}
> -
> -	return count == 0 ? SHRINK_EMPTY : count;
> -}
> -
> -static unsigned long
> -kfree_rcu_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
> -{
> -	int cpu, freed = 0;
> -
> -	for_each_possible_cpu(cpu) {
> -		int count;
> -		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
> -
> -		count = krc_count(krcp);
> -		count += drain_page_cache(krcp);
> -		kfree_rcu_monitor(&krcp->monitor_work.work);
> -
> -		sc->nr_to_scan -= count;
> -		freed += count;
> -
> -		if (sc->nr_to_scan <= 0)
> -			break;
> -	}
> -
> -	return freed == 0 ? SHRINK_STOP : freed;
> -}
> -
> -void __init kfree_rcu_scheduler_running(void)
> -{
> -	int cpu;
> -
> -	for_each_possible_cpu(cpu) {
> -		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
> -
> -		if (need_offload_krc(krcp))
> -			schedule_delayed_monitor_work(krcp);
> -	}
> -}
> -
>  /*
>   * During early boot, any blocking grace-period wait automatically
>   * implies a grace period.
> @@ -5567,62 +4871,12 @@ static void __init rcu_dump_rcu_node_tree(void)
>  
>  struct workqueue_struct *rcu_gp_wq;
>  
> -static void __init kfree_rcu_batch_init(void)
> -{
> -	int cpu;
> -	int i, j;
> -	struct shrinker *kfree_rcu_shrinker;
> -
> -	/* Clamp it to [0:100] seconds interval. */
> -	if (rcu_delay_page_cache_fill_msec < 0 ||
> -		rcu_delay_page_cache_fill_msec > 100 * MSEC_PER_SEC) {
> -
> -		rcu_delay_page_cache_fill_msec =
> -			clamp(rcu_delay_page_cache_fill_msec, 0,
> -				(int) (100 * MSEC_PER_SEC));
> -
> -		pr_info("Adjusting rcutree.rcu_delay_page_cache_fill_msec to %d ms.\n",
> -			rcu_delay_page_cache_fill_msec);
> -	}
> -
> -	for_each_possible_cpu(cpu) {
> -		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
> -
> -		for (i = 0; i < KFREE_N_BATCHES; i++) {
> -			INIT_RCU_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
> -			krcp->krw_arr[i].krcp = krcp;
> -
> -			for (j = 0; j < FREE_N_CHANNELS; j++)
> -				INIT_LIST_HEAD(&krcp->krw_arr[i].bulk_head_free[j]);
> -		}
> -
> -		for (i = 0; i < FREE_N_CHANNELS; i++)
> -			INIT_LIST_HEAD(&krcp->bulk_head[i]);
> -
> -		INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
> -		INIT_DELAYED_WORK(&krcp->page_cache_work, fill_page_cache_func);
> -		krcp->initialized = true;
> -	}
> -
> -	kfree_rcu_shrinker = shrinker_alloc(0, "rcu-kfree");
> -	if (!kfree_rcu_shrinker) {
> -		pr_err("Failed to allocate kfree_rcu() shrinker!\n");
> -		return;
> -	}
> -
> -	kfree_rcu_shrinker->count_objects = kfree_rcu_shrink_count;
> -	kfree_rcu_shrinker->scan_objects = kfree_rcu_shrink_scan;
> -
> -	shrinker_register(kfree_rcu_shrinker);
> -}
> -
>  void __init rcu_init(void)
>  {
>  	int cpu = smp_processor_id();
>  
>  	rcu_early_boot_tests();
>  
> -	kfree_rcu_batch_init();
>  	rcu_bootup_announce();
>  	sanitize_kthread_prio();
>  	rcu_init_geometry();
> diff --git a/kernel/rcu/update.c b/kernel/rcu/update.c
> index f8436969e0c8..82ca9f490d7c 100644
> --- a/kernel/rcu/update.c
> +++ b/kernel/rcu/update.c
> @@ -273,7 +273,6 @@ static int __init rcu_set_runtime_mode(void)
>  {
>  	rcu_test_sync_prims();
>  	rcu_scheduler_active = RCU_SCHEDULER_RUNNING;
> -	kfree_rcu_scheduler_running();
>  	rcu_test_sync_prims();
>  	return 0;
>  }
> -- 
> 2.45.2
> 

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH 6/9] rcu: rcu_pending
  2024-08-19 16:59 ` [PATCH 6/9] rcu: rcu_pending Kent Overstreet
@ 2024-08-19 22:58   ` Paul E. McKenney
  2024-08-19 23:59     ` Kent Overstreet
  0 siblings, 1 reply; 24+ messages in thread
From: Paul E. McKenney @ 2024-08-19 22:58 UTC (permalink / raw)
  To: Kent Overstreet
  Cc: rcu, linux-kernel, vbabka, roman.gushchin, 42.hyeyoo, akpm, cl,
	mhocko, urezki, neeraj.upadhyay

I still remain quite skeptical of this one, but it has improved since
June's version.

Responses inline.

							Thanx, Paul

In the meantime, thank you for avoiding reaching into RCU's and SRCU's
innards.  This makes it reasonable to have you put this file into your
code, at least initially.  That way, you get what you want *now* and us
RCU guys are not committing to maintain it before it is ready for us to
do so.

On Mon, Aug 19, 2024 at 12:59:32PM -0400, Kent Overstreet wrote:
> Generic data structure for explicitly tracking pending RCU items,
> allowing items to be dequeued (i.e. allocate from items pending
> freeing). Works with conventional RCU and SRCU, and possibly other RCU
> flavors in the future, meaning this can serve as a more generic
> replacement for SLAB_TYPESAFE_BY_RCU.

I added a few of the slab guys on CC for their thoughts on what would be
required to make slabs typesafe by other forms of RCU.

> Pending items are tracked in radix trees; if memory allocation fails, we
> fall back to linked lists.

I still have strong misgivings about radix trees and cache locality.
Poor cache locality on the other side of the grace period can result
in OOMs during callback-flooding events, hence the move of kfree_rcu()
and friends to pages of pointers.  And, as you note below, radix trees
don't merge nicely.

> A rcu_pending is initialized with a callback, which is invoked when
> pending items's grace periods have expired. Two types of callback
> processing are handled specially:
> 
> - RCU_PENDING_KVFREE_FN
> 
>   New backend for kvfree_rcu(). Slightly faster, and eliminates the
>   synchronize_rcu() slowpath in kvfree_rcu_mightsleep() - instead, an
>   rcu_head is allocated if we don't have one and can't use the radix
>   tree

The advantage of synchronize_rcu() is that it can proceed without
memory allocation.  If you block on allocation, even of a 16-byte
rcu_head structure, you can go into OOM-induced deadlock.

It might well make sense to do an rcu_head allocation with GFP flags
that try reasonably hard, but still allow failure before falling all
the way back to synchronize_rcu().  (And for all I know, this might have
been tested and found wanting, but Uladzislau Rezki (CCed) would know.)
But a hard wait on that allocation is asking for trouble.

>   TODO:
>   - add a shrinker (as in the existing kvfree_rcu implementation) so that
>     memory reclaim can free expired objects if callback processing isn't
>     keeping up, and to expedite a grace period if we're under memory
>     pressure and too much memory is stranded by RCU

There is work underway to make the current callback lists take advantage
of expedited grace periods, transparent to the caller.  This allows
the shrinker (or whatever) to speed up everything by simply invoking
synchronize_rcu_expedited().  This speedup includes callback processing
because it avoids "bubbles" in the callback processing that can occur
when the next grace period has not yet completed, but all callbacks from
earlier grace periods have been invoked.
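In other words, a shrinker here would conceptually need nothing more
than this (sketch; my_reclaim_expired() is a made-up stand-in for
whatever frees the now-expired objects):

	static unsigned long my_shrink_scan(struct shrinker *s,
					    struct shrink_control *sc)
	{
		/* Force outstanding grace periods to complete quickly... */
		synchronize_rcu_expedited();

		/* ...then everything queued before this call is freeable. */
		return my_reclaim_expired();
	}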

>   - add a counter for amount of memory pending

As noted earlier, for things like call_rcu() and synchronize_rcu(),
this needs help from the callers of those functions.

> - RCU_PENDING_CALL_RCU_FN
> 
>   Accelerated backend for call_rcu() - pending callbacks are tracked in
>   a radix tree to eliminate linked list overhead.

But to add radix-tree overhead.  And to eliminate the ability to do O(1)
list merges.  Is this really a win?

> to serve as replacement backends for kvfree_rcu() and call_rcu(); these
> may be of interest to other uses (e.g. SLAB_TYPESAFE_BY_RCU users).
> 
> Note:
> 
> Internally, we're using a single rearming call_rcu() callback for
> notifications from the core RCU subsystem when objects are ready to be
> processed.
> 
> Ideally we would be getting a callback every time a grace period
> completes for which we have objects, but that would require multiple
> rcu_heads in flight, and since the number of gp sequence numbers with
> uncompleted callbacks is not bounded, we can't do that yet.

Doesn't the call from __rcu_pending_enqueue() to process_finished_items()
serve this purpose?  After all, the case that causes trouble is the one
where lots of things are being very frequently queued.

> Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
> ---
>  include/linux/rcu_pending.h |  25 ++
>  kernel/rcu/Makefile         |   2 +-
>  kernel/rcu/pending.c        | 603 ++++++++++++++++++++++++++++++++++++
>  3 files changed, 629 insertions(+), 1 deletion(-)
>  create mode 100644 include/linux/rcu_pending.h
>  create mode 100644 kernel/rcu/pending.c
> 
> diff --git a/include/linux/rcu_pending.h b/include/linux/rcu_pending.h
> new file mode 100644
> index 000000000000..a875c640da8d
> --- /dev/null
> +++ b/include/linux/rcu_pending.h
> @@ -0,0 +1,25 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +#ifndef _LINUX_RCU_PENDING_H
> +#define _LINUX_RCU_PENDING_H
> +
> +struct rcu_pending;
> +typedef void (*rcu_pending_process_fn)(struct rcu_pending *, struct rcu_head *);
> +
> +struct rcu_pending_pcpu;
> +
> +struct rcu_pending {
> +	struct rcu_pending_pcpu __percpu *p;
> +	struct srcu_struct		*srcu;
> +	rcu_pending_process_fn		process;
> +};
> +
> +void rcu_pending_enqueue(struct rcu_pending *pending, struct rcu_head *obj);
> +struct rcu_head *rcu_pending_dequeue(struct rcu_pending *pending);
> +struct rcu_head *rcu_pending_dequeue_from_all(struct rcu_pending *pending);
> +
> +void rcu_pending_exit(struct rcu_pending *pending);
> +int rcu_pending_init(struct rcu_pending *pending,
> +		     struct srcu_struct *srcu,
> +		     rcu_pending_process_fn process);
> +
> +#endif /* _LINUX_RCU_PENDING_H */
> diff --git a/kernel/rcu/Makefile b/kernel/rcu/Makefile
> index 0cfb009a99b9..2582f0324a11 100644
> --- a/kernel/rcu/Makefile
> +++ b/kernel/rcu/Makefile
> @@ -7,7 +7,7 @@ ifeq ($(CONFIG_KCSAN),y)
>  KBUILD_CFLAGS += -g -fno-omit-frame-pointer
>  endif
>  
> -obj-y += update.o sync.o
> +obj-y += update.o sync.o pending.o
>  obj-$(CONFIG_TREE_SRCU) += srcutree.o
>  obj-$(CONFIG_TINY_SRCU) += srcutiny.o
>  obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o
> diff --git a/kernel/rcu/pending.c b/kernel/rcu/pending.c
> new file mode 100644
> index 000000000000..c0e2351ba198
> --- /dev/null
> +++ b/kernel/rcu/pending.c
> @@ -0,0 +1,603 @@
> +// SPDX-License-Identifier: GPL-2.0
> +#define pr_fmt(fmt) "%s() " fmt "\n", __func__
> +
> +#include <linux/darray.h>
> +#include <linux/generic-radix-tree.h>
> +#include <linux/mm.h>
> +#include <linux/percpu.h>
> +#include <linux/rcu_pending.h>
> +#include <linux/slab.h>
> +#include <linux/srcu.h>
> +#include <linux/vmalloc.h>
> +
> +#include "rcu.h"
> +
> +#define static_array_for_each(_a, _i)			\
> +	for (typeof(&(_a)[0]) _i = _a;			\
> +	     _i < (_a) + ARRAY_SIZE(_a);		\
> +	     _i++)
> +
> +enum rcu_pending_special {
> +	RCU_PENDING_KVFREE	= 1,
> +	RCU_PENDING_CALL_RCU	= 2,
> +};
> +
> +#define RCU_PENDING_KVFREE_FN		((rcu_pending_process_fn) (ulong) RCU_PENDING_KVFREE)
> +#define RCU_PENDING_CALL_RCU_FN		((rcu_pending_process_fn) (ulong) RCU_PENDING_CALL_RCU)
> +
> +static inline unsigned long __get_state_synchronize_rcu(struct srcu_struct *ssp)
> +{
> +	return ssp
> +		? get_state_synchronize_srcu(ssp)
> +		: get_state_synchronize_rcu();
> +}
> +
> +static inline unsigned long __start_poll_synchronize_rcu(struct srcu_struct *ssp)
> +{
> +	return ssp
> +		? start_poll_synchronize_srcu(ssp)
> +		: start_poll_synchronize_rcu();
> +}
> +
> +static inline bool __poll_state_synchronize_rcu(struct srcu_struct *ssp, unsigned long cookie)
> +{
> +	return ssp
> +		? poll_state_synchronize_srcu(ssp, cookie)
> +		: poll_state_synchronize_rcu(cookie);
> +}
> +
> +static inline void __rcu_barrier(struct srcu_struct *ssp)
> +{
> +	return ssp
> +		? srcu_barrier(ssp)
> +		: rcu_barrier();
> +}
> +
> +static inline void __call_rcu(struct srcu_struct *ssp, struct rcu_head *rhp,
> +			      rcu_callback_t func)
> +{
> +	if (ssp)
> +		call_srcu(ssp, rhp, func);
> +	else
> +		call_rcu(rhp, func);
> +}

I do sympathize with the desire to provide RCU-variant-oblivious API
members, and maybe SRCU and RCU will be all that is required.  You might
need call_rcu_hurry()?
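If so, the wrapper above could grow a hurry variant in the same style
(sketch only; there is no SRCU hurry variant, so the SRCU case just
falls through to call_srcu()):

	static inline void __call_rcu_hurry(struct srcu_struct *ssp,
					    struct rcu_head *rhp,
					    rcu_callback_t func)
	{
		if (ssp)
			call_srcu(ssp, rhp, func);
		else
			call_rcu_hurry(rhp, func);	/* bypass lazy batching */
	}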

> +struct rcu_pending_seq {
> +	/*
> +	 * We're using a radix tree like a vector - we're just pushing elements
> +	 * onto the end; we're using a radix tree instead of an actual vector to
> +	 * avoid reallocation overhead
> +	 */
> +	GENRADIX(struct rcu_head *)	objs;

OK, I will bite...  Why not a doubly-linked list of pages of pointers?

Are you encountering situations where page-sized allocations fail,
but smaller allocations proceed nicely?  But that should be rare, and
also should be handled by your fallback to a linked list of rcu_head
structures, right?

> +	size_t				nr;
> +	struct rcu_head			**cursor;
> +	unsigned long			seq;
> +};
> +
> +struct rcu_pending_list {
> +	struct rcu_head			*head;
> +	struct rcu_head			*tail;
> +	unsigned long			seq;
> +};
> +
> +struct rcu_pending_pcpu {
> +	struct rcu_pending		*parent;
> +	spinlock_t			lock;
> +	int				cpu;
> +
> +	/*
> +	 * We can't bound the number of unprocessed gp sequence numbers, and we
> +	 * can't efficiently merge radix trees for expired grace periods, so we
> +	 * need darray/vector:
> +	 */
> +	DARRAY_PREALLOCATED(struct rcu_pending_seq, 4) objs;

This is a big strength of the current linked list of pages: Merging is
trivial.  Then there need only be one list for everything for which a
grace period has completed.

I ran out of time at this point.  I do have additional comments further
down due to searching for things, but this is where I will continue my
review before the end of this week.

> +	/* Third entry is for expired objects: */
> +	struct rcu_pending_list		lists[NUM_ACTIVE_RCU_POLL_OLDSTATE + 1];
> +
> +	struct rcu_head			cb;
> +	bool				cb_armed;
> +	struct work_struct		work;
> +};
> +
> +static bool __rcu_pending_has_pending(struct rcu_pending_pcpu *p)
> +{
> +	if (p->objs.nr)
> +		return true;
> +
> +	static_array_for_each(p->lists, i)
> +		if (i->head)
> +			return true;
> +
> +	return false;
> +}
> +
> +static void rcu_pending_list_merge(struct rcu_pending_list *l1,
> +				   struct rcu_pending_list *l2)
> +{
> +	if (!l1->head)
> +		l1->head = l2->head;
> +	else
> +		l1->tail->next = l2->head;
> +	l1->tail = l2->tail;
> +
> +	l2->head = l2->tail = NULL;
> +}

The rcu_segcblist structure handles this sort of thing straightforwardly.

> +static void rcu_pending_list_add(struct rcu_pending_list *l,
> +				 struct rcu_head *n)
> +{
> +	if (!l->head)
> +		l->head = n;
> +	else
> +		l->tail->next = n;
> +	l->tail = n;
> +	n->next = NULL;
> +}
> +
> +static void merge_expired_lists(struct rcu_pending_pcpu *p)
> +{
> +	struct rcu_pending_list *expired = &p->lists[NUM_ACTIVE_RCU_POLL_OLDSTATE];
> +
> +	for (struct rcu_pending_list *i = p->lists; i < expired; i++)
> +		if (i->head && __poll_state_synchronize_rcu(p->parent->srcu, i->seq))
> +			rcu_pending_list_merge(expired, i);
> +}
> +
> +static noinline void __process_finished_items(struct rcu_pending *pending,
> +					      struct rcu_pending_pcpu *p,
> +					      unsigned long flags)
> +{
> +	struct rcu_pending_list *expired = &p->lists[NUM_ACTIVE_RCU_POLL_OLDSTATE];
> +	struct rcu_pending_seq objs = {};
> +	struct rcu_head *list = NULL;
> +
> +	if (p->objs.nr &&
> +	    __poll_state_synchronize_rcu(pending->srcu, p->objs.data[0].seq)) {
> +		objs = p->objs.data[0];
> +		darray_remove_item(&p->objs, p->objs.data);
> +	}
> +
> +	merge_expired_lists(p);
> +
> +	list = expired->head;
> +	expired->head = expired->tail = NULL;
> +
> +	spin_unlock_irqrestore(&p->lock, flags);
> +
> +	switch ((ulong) pending->process) {
> +	case RCU_PENDING_KVFREE:
> +		for (size_t i = 0; i < objs.nr; ) {
> +			size_t nr_this_node = min(GENRADIX_NODE_SIZE / sizeof(void *), objs.nr - i);
> +
> +			kfree_bulk(nr_this_node, (void **) genradix_ptr(&objs.objs, i));
> +			i += nr_this_node;
> +		}
> +		genradix_free(&objs.objs);
> +
> +		while (list) {
> +			struct rcu_head *obj = list;
> +			list = obj->next;
> +
> +			/*
> +			 * low bit of pointer indicates whether rcu_head needs
> +			 * to be freed - kvfree_rcu_mightsleep()
> +			 */
> +			BUILD_BUG_ON(ARCH_SLAB_MINALIGN == 0);
> +
> +			void *ptr = (void *)(((unsigned long) obj->func) & ~1UL);
> +			kvfree(ptr);
> +
> +			bool free_head = ((unsigned long) obj->func) & 1UL;
> +			if (free_head)
> +				kfree(obj);
> +		}
> +
> +		break;
> +
> +	case RCU_PENDING_CALL_RCU:
> +		for (size_t i = 0; i < objs.nr; i++) {
> +			struct rcu_head *obj = *genradix_ptr(&objs.objs, i);
> +			obj->func(obj);
> +		}
> +		genradix_free(&objs.objs);
> +
> +		while (list) {
> +			struct rcu_head *obj = list;
> +			list = obj->next;
> +			obj->func(obj);
> +		}
> +		break;
> +
> +	default:
> +		for (size_t i = 0; i < objs.nr; i++)
> +			pending->process(pending, *genradix_ptr(&objs.objs, i));
> +		genradix_free(&objs.objs);
> +
> +		while (list) {
> +			struct rcu_head *obj = list;
> +			list = obj->next;
> +			pending->process(pending, obj);
> +		}
> +		break;
> +	}
> +}
> +
> +static bool process_finished_items(struct rcu_pending *pending,
> +				   struct rcu_pending_pcpu *p,
> +				   unsigned long flags)
> +{
> +	/*
> +	 * XXX: we should grab the gp seq once and avoid multiple function
> +	 * calls, this is called from __rcu_pending_enqueue() fastpath in
> +	 * may_sleep==true mode
> +	 */
> +	if ((p->objs.nr && __poll_state_synchronize_rcu(pending->srcu, p->objs.data[0].seq)) ||
> +	    (p->lists[0].head && __poll_state_synchronize_rcu(pending->srcu, p->lists[0].seq)) ||
> +	    (p->lists[1].head && __poll_state_synchronize_rcu(pending->srcu, p->lists[1].seq)) ||

One approach would be to create a variant of poll_state_synchronize_rcu()
that takes multiple cookies, and returns the index of the first one that
has expired.  Would that help?
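No such helper exists today, so purely as a strawman (a naive version;
an RCU-internal implementation could sample the grace-period sequence
once rather than per cookie):

	/* Hypothetical: index of the first expired cookie, or -1 if none. */
	static int poll_state_synchronize_rcu_multi(const unsigned long *cookies,
						    int n)
	{
		int i;

		for (i = 0; i < n; i++)
			if (poll_state_synchronize_rcu(cookies[i]))
				return i;
		return -1;
	}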

> +	    p->lists[2].head) {
> +		__process_finished_items(pending, p, flags);
> +		return true;
> +	}
> +
> +	return false;
> +}
> +
> +static void rcu_pending_work(struct work_struct *work)
> +{
> +	struct rcu_pending_pcpu *p =
> +		container_of(work, struct rcu_pending_pcpu, work);
> +	struct rcu_pending *pending = p->parent;
> +	unsigned long flags;
> +
> +	do {
> +		spin_lock_irqsave(&p->lock, flags);
> +	} while (process_finished_items(pending, p, flags));
> +
> +	spin_unlock_irqrestore(&p->lock, flags);
> +}
> +
> +static void rcu_pending_rcu_cb(struct rcu_head *rcu)
> +{
> +	struct rcu_pending_pcpu *p = container_of(rcu, struct rcu_pending_pcpu, cb);
> +
> +	schedule_work_on(p->cpu, &p->work);
> +
> +	unsigned long flags;
> +	spin_lock_irqsave(&p->lock, flags);
> +	if (__rcu_pending_has_pending(p))
> +		__call_rcu(p->parent->srcu, &p->cb, rcu_pending_rcu_cb);
> +	else
> +		p->cb_armed = false;
> +	spin_unlock_irqrestore(&p->lock, flags);
> +}
> +
> +static __always_inline struct rcu_pending_seq *
> +get_object_radix(struct rcu_pending_pcpu *p, unsigned long seq)
> +{
> +	darray_for_each_reverse(p->objs, objs)
> +		if (objs->seq == seq)
> +			return objs;
> +
> +	if (darray_push_gfp(&p->objs, ((struct rcu_pending_seq) { .seq = seq }), GFP_ATOMIC))
> +		return NULL;
> +
> +	return &darray_last(p->objs);
> +}
> +
> +static noinline bool
> +rcu_pending_enqueue_list(struct rcu_pending_pcpu *p, unsigned long seq,
> +			 struct rcu_head *head, void *ptr,
> +			 unsigned long *flags)
> +{
> +	if (ptr) {
> +		if (!head) {
> +			/*
> +			 * kvfree_rcu_mightsleep(): we weren't passed an
> +			 * rcu_head, but we need one: use the low bit of the
> +			 * pointer to free to flag that the head needs to be
> +			 * freed as well:
> +			 */
> +			ptr = (void *)(((unsigned long) ptr)|1UL);
> +			head = kmalloc(sizeof(*head), __GFP_NOWARN);
> +			if (!head) {
> +				spin_unlock_irqrestore(&p->lock, *flags);
> +				head = kmalloc(sizeof(*head), GFP_KERNEL|__GFP_NOFAIL);
> +				/*
> +				 * dropped lock, did GFP_KERNEL allocation,
> +				 * check for gp expiration
> +				 */
> +				if (unlikely(__poll_state_synchronize_rcu(p->parent->srcu, seq))) {
> +					kvfree(--ptr);
> +					kfree(head);
> +					spin_lock_irqsave(&p->lock, *flags);
> +					return false;
> +				}
> +			}
> +		}
> +
> +		head->func = ptr;
> +	}
> +again:
> +	for (struct rcu_pending_list *i = p->lists;
> +	     i < p->lists + NUM_ACTIVE_RCU_POLL_OLDSTATE; i++) {
> +		if (i->seq == seq) {
> +			rcu_pending_list_add(i, head);
> +			return false;
> +		}
> +	}
> +
> +	for (struct rcu_pending_list *i = p->lists;
> +	     i < p->lists + NUM_ACTIVE_RCU_POLL_OLDSTATE; i++) {
> +		if (!i->head) {
> +			i->seq = seq;
> +			rcu_pending_list_add(i, head);
> +			return true;
> +		}
> +	}
> +
> +	merge_expired_lists(p);
> +	goto again;
> +}
> +
> +/*
> + * __rcu_pending_enqueue: enqueue a pending RCU item, to be processed (via
> + * pending->process) once a grace period elapses.
> + *
> + * Attempt to enqueue items onto a radix tree; if memory allocation fails, fall
> + * back to a linked list.
> + *
> + * - If @ptr is NULL, we're enqueuing an item for a generic @pending with a
> + *   process callback
> + *
> + * - If @ptr and @head are both not NULL, we're kvfree_rcu()
> + *
> + * - If @ptr is not NULL and @head is, we're kvfree_rcu_mightsleep()
> + *
> + * - If @may_sleep is true, will do GFP_KERNEL memory allocations and process
> + *   expired items.
> + */
> +static __always_inline void
> +__rcu_pending_enqueue(struct rcu_pending *pending, struct rcu_head *head,
> +		      void *ptr, bool may_sleep)
> +{
> +
> +	struct rcu_pending_pcpu *p;
> +	struct rcu_pending_seq *objs;
> +	struct genradix_node *new_node = NULL;
> +	unsigned long seq, flags;
> +	bool start_gp = false;
> +
> +	BUG_ON((ptr != NULL) != (pending->process == RCU_PENDING_KVFREE_FN));
> +
> +	local_irq_save(flags);
> +	p = this_cpu_ptr(pending->p);
> +	spin_lock(&p->lock);
> +	seq = __get_state_synchronize_rcu(pending->srcu);
> +restart:
> +	if (may_sleep &&
> +	    unlikely(process_finished_items(pending, p, flags)))
> +		goto check_expired;
> +
> +	/*
> +	 * In kvfree_rcu() mode, the radix tree is only for slab pointers so
> +	 * that we can do kfree_bulk() - vmalloc pointers always use the linked
> +	 * list:
> +	 */
> +	if (ptr && unlikely(is_vmalloc_addr_inlined(ptr)))
> +		goto list_add;
> +
> +	objs = get_object_radix(p, seq);
> +	if (unlikely(!objs))
> +		goto list_add;
> +
> +	if (unlikely(!objs->cursor)) {
> +		/*
> +		 * New radix tree nodes must be added under @p->lock because the
> +		 * tree root is in a darray that can be resized (typically,
> +		 * genradix supports concurrent unlocked allocation of new
> +		 * nodes) - hence preallocation and the retry loop:
> +		 */
> +		objs->cursor = genradix_ptr_alloc_preallocated_inlined(&objs->objs,
> +						objs->nr, &new_node, GFP_ATOMIC|__GFP_NOWARN);
> +		if (unlikely(!objs->cursor)) {
> +			if (may_sleep) {
> +				spin_unlock_irqrestore(&p->lock, flags);
> +
> +				gfp_t gfp = GFP_KERNEL;
> +				if (!head)
> +					gfp |= __GFP_NOFAIL;
> +
> +				new_node = genradix_alloc_node(gfp);
> +				if (!new_node)
> +					may_sleep = false;
> +				goto check_expired;
> +			}
> +list_add:
> +			start_gp = rcu_pending_enqueue_list(p, seq, head, ptr, &flags);
> +			goto start_gp;
> +		}
> +	}
> +
> +	*objs->cursor++ = ptr ?: head;
> +	/* zero cursor if we hit the end of a radix tree node: */
> +	if (!(((ulong) objs->cursor) & (GENRADIX_NODE_SIZE - 1)))
> +		objs->cursor = NULL;
> +	start_gp = !objs->nr;
> +	objs->nr++;
> +start_gp:
> +	if (unlikely(start_gp)) {
> +		/*
> +		 * We only have one callback (ideally, we would have one for
> +		 * every outstanding grace period) - so if our callback is
> +		 * already in flight, we may still have to start a grace period
> +		 * (since we used get_state() above, not start_poll())
> +		 */
> +		if (!p->cb_armed) {
> +			p->cb_armed = true;
> +			__call_rcu(pending->srcu, &p->cb, rcu_pending_rcu_cb);
> +		} else {
> +			__start_poll_synchronize_rcu(pending->srcu);
> +		}
> +	}
> +	spin_unlock_irqrestore(&p->lock, flags);
> +free_node:
> +	if (new_node)
> +		genradix_free_node(new_node);
> +	return;
> +check_expired:
> +	if (unlikely(__poll_state_synchronize_rcu(pending->srcu, seq))) {
> +		switch ((ulong) pending->process) {
> +		case RCU_PENDING_KVFREE:
> +			kvfree(ptr);
> +			break;
> +		case RCU_PENDING_CALL_RCU:
> +			head->func(head);
> +			break;
> +		default:
> +			pending->process(pending, head);
> +			break;
> +		}
> +		goto free_node;
> +	}
> +
> +	local_irq_save(flags);
> +	p = this_cpu_ptr(pending->p);
> +	spin_lock(&p->lock);
> +	goto restart;
> +}
> +
> +void rcu_pending_enqueue(struct rcu_pending *pending, struct rcu_head *obj)
> +{
> +	__rcu_pending_enqueue(pending, obj, NULL, true);
> +}
> +
> +static struct rcu_head *rcu_pending_pcpu_dequeue(struct rcu_pending_pcpu *p)
> +{
> +	struct rcu_head *ret = NULL;
> +
> +	spin_lock_irq(&p->lock);
> +	darray_for_each(p->objs, objs)
> +		if (objs->nr) {
> +			ret = *genradix_ptr(&objs->objs, --objs->nr);
> +			objs->cursor = NULL;
> +			if (!objs->nr)
> +				genradix_free(&objs->objs);
> +			goto out;
> +		}
> +
> +	static_array_for_each(p->lists, i)
> +		if (i->head) {
> +			ret = i->head;
> +			i->head = ret->next;
> +			if (!i->head)
> +				i->tail = NULL;
> +			goto out;
> +		}
> +out:
> +	spin_unlock_irq(&p->lock);
> +
> +	return ret;
> +}
> +
> +struct rcu_head *rcu_pending_dequeue(struct rcu_pending *pending)
> +{
> +	return rcu_pending_pcpu_dequeue(raw_cpu_ptr(pending->p));
> +}
> +
> +struct rcu_head *rcu_pending_dequeue_from_all(struct rcu_pending *pending)
> +{
> +	struct rcu_head *ret = rcu_pending_dequeue(pending);
> +
> +	if (ret)
> +		return ret;
> +
> +	int cpu;
> +	for_each_possible_cpu(cpu) {
> +		ret = rcu_pending_pcpu_dequeue(per_cpu_ptr(pending->p, cpu));
> +		if (ret)
> +			break;
> +	}
> +	return ret;
> +}
> +
> +static bool rcu_pending_has_pending_or_armed(struct rcu_pending *pending)
> +{
> +	int cpu;
> +	for_each_possible_cpu(cpu) {
> +		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
> +		spin_lock_irq(&p->lock);
> +		if (__rcu_pending_has_pending(p) || p->cb_armed) {
> +			spin_unlock_irq(&p->lock);
> +			return true;
> +		}
> +		spin_unlock_irq(&p->lock);
> +	}
> +
> +	return false;
> +}
> +
> +void rcu_pending_exit(struct rcu_pending *pending)
> +{
> +	int cpu;
> +
> +	if (!pending->p)
> +		return;
> +
> +	while (rcu_pending_has_pending_or_armed(pending)) {
> +		__rcu_barrier(pending->srcu);
> +
> +		for_each_possible_cpu(cpu) {
> +			struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
> +			flush_work(&p->work);
> +		}
> +	}
> +
> +	for_each_possible_cpu(cpu) {
> +		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
> +		flush_work(&p->work);
> +	}
> +
> +	for_each_possible_cpu(cpu) {
> +		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
> +
> +		static_array_for_each(p->lists, i)
> +			WARN_ON(i->head);
> +		WARN_ON(p->objs.nr);
> +		darray_exit(&p->objs);
> +	}
> +	free_percpu(pending->p);
> +}
> +
> +/**
> + * rcu_pending_init: - initialize a rcu_pending
> + *
> + * @pending:	Object to init
> + * @srcu:	May optionally be used with an srcu_struct; if NULL, uses normal
> + *		RCU flavor
> + * @process:	Callback function invoked on objects once their RCU barriers
> + *		have completed; if NULL, kvfree() is used.
> + */
> +int rcu_pending_init(struct rcu_pending *pending,
> +		     struct srcu_struct *srcu,
> +		     rcu_pending_process_fn process)
> +{
> +	pending->p = alloc_percpu(struct rcu_pending_pcpu);
> +	if (!pending->p)
> +		return -ENOMEM;
> +
> +	int cpu;
> +	for_each_possible_cpu(cpu) {
> +		struct rcu_pending_pcpu *p = per_cpu_ptr(pending->p, cpu);
> +		p->parent	= pending;
> +		p->cpu		= cpu;
> +		spin_lock_init(&p->lock);
> +		darray_init(&p->objs);
> +		INIT_WORK(&p->work, rcu_pending_work);
> +	}
> +
> +	pending->srcu = srcu;
> +	pending->process = process;
> +
> +	return 0;
> +}
> -- 
> 2.45.2
> 

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH 9/9] rcu: Switch kvfree_rcu() to new rcu_pending
  2024-08-19 22:18   ` Paul E. McKenney
@ 2024-08-19 23:05     ` Kent Overstreet
  0 siblings, 0 replies; 24+ messages in thread
From: Kent Overstreet @ 2024-08-19 23:05 UTC (permalink / raw)
  To: Paul E. McKenney; +Cc: rcu, linux-kernel, linux-arch

On Mon, Aug 19, 2024 at 03:18:02PM GMT, Paul E. McKenney wrote:
> On Mon, Aug 19, 2024 at 12:59:35PM -0400, Kent Overstreet wrote:
> > This nets us a slight performance increase, and converts to common
> > code.
> 
> How did you measure the performance, and what was the actual amount
> of the increase?
> 
> Either way, the performance increase needs to be verified by the people
> for whom the current code works well.  As noted a couple months ago,
> I am having a hard time imagining a tree beating a linked list of pages
> of pointers in terms of cache locality.  I added linux-arch on CC in
> case I am out of date on how computer systems work.

The point isn't performance; I was just noting what I saw, and the small
performance increase just came from careful optimization, nothing
special. The old and new versions are close enough to not matter.

The point of the patchset is to consolidate kvfree_rcu() and
SLAB_TYPESAFE_BY_RCU into something more general, and dare I say it,
something a bit saner.

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH 0/9] rcu_pending
  2024-08-19 16:59 [PATCH 0/9] rcu_pending Kent Overstreet
                   ` (8 preceding siblings ...)
  2024-08-19 16:59 ` [PATCH 9/9] rcu: Switch kvfree_rcu() to new rcu_pending Kent Overstreet
@ 2024-08-19 23:07 ` Paul E. McKenney
  2024-08-19 23:29   ` Kent Overstreet
  9 siblings, 1 reply; 24+ messages in thread
From: Paul E. McKenney @ 2024-08-19 23:07 UTC (permalink / raw)
  To: Kent Overstreet; +Cc: rcu, linux-kernel

On Mon, Aug 19, 2024 at 12:59:26PM -0400, Kent Overstreet wrote:
> New data structure for tracking objects waiting on an RCU grace period.
> Supports regular RCU and SRCU, and possibly other RCU flavors in the
> future. Uses radix trees for tracking pending objects, falling back to
> linked lists on allocation failure.
> 
> This gets us a more general replacement for SLAB_TYPESAFE_BY_RCU, and a
> cleaner and slightly faster backend for kvfree_call_rcu(), and
> in the future a faster backend for call_rcu() as well.
> 
> There's still some small todo items, mentioned in the relevant patches.
> 
> Paul - I'm considering putting this into 6.11 for bcachefs (not the
> patch that switches kvfree_rcu, of course), as I need it rather
> pressingly. Thoughts? I can put it in fs/bcachefs/ if you hate it :)

I am having a hard time imagining it being ready for inclusion in
kernel/rcu by the upcoming (v6.12) merge window, so if you need it then,
you will need to pull it into fs/bcachefs.

Don't get me wrong, it has improved since the June version.  And we
can always move it later, if/when appropriate.

And yes, the kvfree_rcu() changes would need some serious performance
evaluation by the people who benefitted from the changes that make
kvfree_rcu() what it is today.  So let's please defer that one.

Having the equivalent of SLAB_TYPESAFE_BY_SRCU could be attractive,
but I suspect that there are smaller changes that get us that with the
existing slab allocator.

							Thanx, Paul

> Kent Overstreet (9):
>   lib/generic-radix-tree.c: genradix_ptr_inlined()
>   lib/generic-radix-tree.c: add preallocation
>   darray: lift from bcachefs
>   vmalloc: is_vmalloc_addr_inlined()
>   rcu: delete lockdep_assert_irqs_enabled() assert in
>     start_poll_synchronize_rcu_common()
>   rcu: rcu_pending
>   bcachefs: Rip out freelists from btree key cache
>   bcachefs: key cache can now allocate from pending
>   rcu: Switch kvfree_rcu() to new rcu_pending
> 
>  MAINTAINERS                             |   7 +
>  fs/bcachefs/Makefile                    |   1 -
>  fs/bcachefs/btree_key_cache.c           | 406 +++----------
>  fs/bcachefs/btree_key_cache_types.h     |  18 +-
>  fs/bcachefs/btree_node_scan_types.h     |   2 +-
>  fs/bcachefs/btree_types.h               |   5 +-
>  fs/bcachefs/btree_update.c              |   2 +
>  fs/bcachefs/btree_write_buffer_types.h  |   2 +-
>  fs/bcachefs/disk_accounting_types.h     |   2 +-
>  fs/bcachefs/fsck.c                      |   2 +-
>  fs/bcachefs/journal_io.h                |   2 +-
>  fs/bcachefs/journal_sb.c                |   2 +-
>  fs/bcachefs/sb-downgrade.c              |   3 +-
>  fs/bcachefs/sb-errors_types.h           |   2 +-
>  fs/bcachefs/sb-members.h                |   2 +-
>  fs/bcachefs/subvolume.h                 |   1 -
>  fs/bcachefs/subvolume_types.h           |   2 +-
>  fs/bcachefs/thread_with_file_types.h    |   2 +-
>  fs/bcachefs/util.h                      |  29 +-
>  {fs/bcachefs => include/linux}/darray.h |  59 +-
>  include/linux/darray_types.h            |  22 +
>  include/linux/generic-radix-tree.h      | 106 +++-
>  include/linux/mm.h                      |   7 +
>  include/linux/rcu_pending.h             |  27 +
>  init/main.c                             |   2 +
>  kernel/rcu/Makefile                     |   2 +-
>  kernel/rcu/pending.c                    | 623 ++++++++++++++++++++
>  kernel/rcu/tree.c                       | 747 ------------------------
>  kernel/rcu/update.c                     |   1 -
>  lib/Makefile                            |   2 +-
>  {fs/bcachefs => lib}/darray.c           |  12 +-
>  lib/generic-radix-tree.c                |  80 +--
>  mm/vmalloc.c                            |   4 +-
>  33 files changed, 962 insertions(+), 1224 deletions(-)
>  rename {fs/bcachefs => include/linux}/darray.h (66%)
>  create mode 100644 include/linux/darray_types.h
>  create mode 100644 include/linux/rcu_pending.h
>  create mode 100644 kernel/rcu/pending.c
>  rename {fs/bcachefs => lib}/darray.c (57%)
> 
> -- 
> 2.45.2
> 

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH 0/9] rcu_pending
  2024-08-19 23:07 ` [PATCH 0/9] rcu_pending Paul E. McKenney
@ 2024-08-19 23:29   ` Kent Overstreet
  0 siblings, 0 replies; 24+ messages in thread
From: Kent Overstreet @ 2024-08-19 23:29 UTC (permalink / raw)
  To: Paul E. McKenney; +Cc: rcu, linux-kernel

On Mon, Aug 19, 2024 at 04:07:16PM GMT, Paul E. McKenney wrote:
> On Mon, Aug 19, 2024 at 12:59:26PM -0400, Kent Overstreet wrote:
> > New data structure for tracking objects waiting on an RCU grace period.
> > Supports regular RCU and SRCU, and possibly other RCU flavors in the
> > future. Uses radix trees for tracking pending objects, falling back to
> > linked lists on allocation failure.
> > 
> > This gets us a more general replacement for SLAB_TYPESAFE_BY_RCU, and a
> > cleaner and slightly faster backend for kvfree_call_rcu(), and
> > in the future a faster backend for call_rcu() as well.
> > 
> > There's still some small todo items, mentioned in the relevant patches.
> > 
> > Paul - I'm considering putting this into 6.11 for bcachefs (not the
> > patch that switches kvfree_rcu, of course), as I need it rather
> > pressingly. Thoughts? I can put it in fs/bcachefs/ if you hate it :)
> 
> I am having a hard time imagining it being ready for inclusion in
> kernel/rcu by the upcoming (v6.12) merge window, so if you need it then,
> you will need to pull it into fs/bcachefs.
> 
> Don't get me wrong, it has improved since the June version.  And we
> can always move it later, if/when appropriate.
> 
> And yes, the kvfree_rcu() changes would need some serious performance
> evaluation by the people who benefitted from the changes that make
> kvfree_rcu() what it is today.  So let's please defer that one.
> 
> Having the equivalent of SLAB_TYPESAFE_BY_SRCU could be attractive,
> but I suspect that there are smaller changes that get us that with the
> existing slab allocator.

I seriously doubt that - where are you going to stash the srcu_struct
pointer?

This is small and self contained, which is a _major_ win vs. dumping
more complex algorithmic stuff into slub.
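For reference, the whole consumer-facing surface is just a few calls - a
usage sketch based only on the rcu_pending.h interface quoted earlier in
the thread (my_obj, my_srcu and friends are made-up stand-ins for a real
user):

	#include <linux/rcu_pending.h>
	#include <linux/slab.h>
	#include <linux/srcu.h>

	struct my_obj {
		struct rcu_head	rcu;
		/* ... payload ... */
	};

	DEFINE_STATIC_SRCU(my_srcu);
	static struct rcu_pending my_pending;

	static void my_process(struct rcu_pending *pending, struct rcu_head *obj)
	{
		kfree(container_of(obj, struct my_obj, rcu));
	}

	static int my_init(void)
	{
		return rcu_pending_init(&my_pending, &my_srcu, my_process);
	}

	static void my_free(struct my_obj *obj)
	{
		/* freed for real once an SRCU grace period has elapsed */
		rcu_pending_enqueue(&my_pending, &obj->rcu);
	}

	static struct my_obj *my_alloc(void)
	{
		/* reuse an object that was enqueued for freeing, if any */
		struct rcu_head *r = rcu_pending_dequeue(&my_pending);

		return r ? container_of(r, struct my_obj, rcu)
			 : kzalloc(sizeof(struct my_obj), GFP_KERNEL);
	}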

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH 6/9] rcu: rcu_pending
  2024-08-19 22:58   ` Paul E. McKenney
@ 2024-08-19 23:59     ` Kent Overstreet
  2024-08-26 14:44       ` Paul E. McKenney
  0 siblings, 1 reply; 24+ messages in thread
From: Kent Overstreet @ 2024-08-19 23:59 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: rcu, linux-kernel, vbabka, roman.gushchin, 42.hyeyoo, akpm, cl,
	mhocko, urezki, neeraj.upadhyay

On Mon, Aug 19, 2024 at 03:58:26PM GMT, Paul E. McKenney wrote:
> I still remain quite skeptical of this one, but it has improved since
> June's version.
> 
> Responses inline.
> 
> 							Thanx, Paul
> 
> In the meantime, thank you for avoiding reaching into RCU's and SRCU's
> innards.  This makes it reasonable to have you put this file into your
> code, at least initially.  That way, you get what you want *now* and us
> RCU guys are not committing to maintain it before it is ready for us to
> do so.

The RCU interfaces do force a lot of function calls for things that
should be inlined though, and the gratuitous interface fragmentation
between RCU and SRCU is... annoying.

> I still have strong misgivings about radix trees and cache locality.
> Poor cache locality on the other side of the grace period can result
> in OOMs during callback-flooding events, hence the move of kfree_rcu()
> and friends to pages of pointers.  And, as you note below, radix trees
> don't merge nicely.

Cache locality where?

On the enqueue side, which is the fastpath, this uses a cursor - we're
not walking the radix tree every time.

On the processing side, where we're kfree_bulk()ing entire radix tree
nodes, the radix tree is going to have better cache locality than a list
of pages.

> The advantage of synchronize_rcu() is that it can proceed without
> memory allocation.  If you block on allocation, even of a 16-byte
> rcu_head structure, you can go into OOM-induced deadlock.
> 
> It might well make sense to do an rcu_head allocation with GFP flags
> that try reasonably hard, but still allow failure before falling all
> the way back to synchronize_rcu().  (And for all I know, this might have
> been tested and found wanting, but Uladzislau Rezki (CCed) would know.)
> But a hard wait on that allocation is asking for trouble.

That's reasonable - as long as we're trying the 16 byte allocation
before doing a synchronize_rcu().

> There is work underway to make the current callback lists take advantage
> of expedited grace periods, transparent to the caller.  This allows
> the shrinker (or whatever) to speed up everything by simply invoking
> synchronize_rcu_expedited().  This speedup includes callback processing
> because it avoids "bubbles" in the callback processing that can occur
> when the next grace period has not yet completed, but all callbacks from
> earlier grace periods have been invoked.

Glad to hear it - that was first on my list when adding a shrinker to this
code.

> > - RCU_PENDING_CALL_RCU_FN
> > 
> >   Accelerated backend for call_rcu() - pending callbacks are tracked in
> >   a radix tree to eliminate linked list overhead.
> 
> But to add radix-tree overhead.  And to eliminate the ability to do O(1)
> list merges.  Is this really a win?

As mentioned, there's a cursor so we're not adding radix-tree overhead
to the fast path, and there's no need to merge lists for expired objects
- that's all handled fine.

But yes, using it for call_rcu() would need more performance
justification. I haven't seen workloads where call_rcu() performance
particularly matters, so it's not something I'm pushing for - I included
that because it's minimal code and other people might know of workloads
where we do want it.

> > Ideally we would be getting a callback every time a grace period
> > completes for which we have objects, but that would require multiple
> > rcu_heads in flight, and since the number of gp sequence numbers with
> > uncompleted callbacks is not bounded, we can't do that yet.
> 
> Doesn't the call from __rcu_pending_enqueue() to process_finished_items()
> serve this purpose?  After all, the case that causes trouble is the one
> where lots of things are being very frequently queued.

No, because that's unpredictable, and we don't process pending items in
enqueue() unless we know we can sleep (i.e., we don't do it if the
caller is latency sensitive).

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH 3/9] darray: lift from bcachefs
  2024-08-19 16:59 ` [PATCH 3/9] darray: lift from bcachefs Kent Overstreet
@ 2024-08-21  1:00   ` Jeff Johnson
  0 siblings, 0 replies; 24+ messages in thread
From: Jeff Johnson @ 2024-08-21  1:00 UTC (permalink / raw)
  To: Kent Overstreet, rcu; +Cc: paulmck, linux-kernel

On 8/19/24 09:59, Kent Overstreet wrote:
> dynamic arrays - inspired from CCAN darrays, basically c++ stl vectors.
> 
> Used by thread_with_stdio, which is also being lifted from bcachefs for
> xfs.
> 
> Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
> ---
>  MAINTAINERS                             |  7 +++
>  fs/bcachefs/Makefile                    |  1 -
>  fs/bcachefs/btree_node_scan_types.h     |  2 +-
>  fs/bcachefs/btree_types.h               |  2 +-
>  fs/bcachefs/btree_update.c              |  2 +
>  fs/bcachefs/btree_write_buffer_types.h  |  2 +-
>  fs/bcachefs/disk_accounting_types.h     |  2 +-
>  fs/bcachefs/fsck.c                      |  2 +-
>  fs/bcachefs/journal_io.h                |  2 +-
>  fs/bcachefs/journal_sb.c                |  2 +-
>  fs/bcachefs/sb-downgrade.c              |  3 +-
>  fs/bcachefs/sb-errors_types.h           |  2 +-
>  fs/bcachefs/sb-members.h                |  2 +-
>  fs/bcachefs/subvolume.h                 |  1 -
>  fs/bcachefs/subvolume_types.h           |  2 +-
>  fs/bcachefs/thread_with_file_types.h    |  2 +-
>  fs/bcachefs/util.h                      | 29 +-----------
>  {fs/bcachefs => include/linux}/darray.h | 59 ++++++++++++++++---------
>  include/linux/darray_types.h            | 22 +++++++++
>  lib/Makefile                            |  2 +-
>  {fs/bcachefs => lib}/darray.c           | 12 ++++-
>  21 files changed, 96 insertions(+), 64 deletions(-)
>  rename {fs/bcachefs => include/linux}/darray.h (66%)
>  create mode 100644 include/linux/darray_types.h
>  rename {fs/bcachefs => lib}/darray.c (57%)

...

> diff --git a/fs/bcachefs/darray.c b/lib/darray.c
> similarity index 57%
> rename from fs/bcachefs/darray.c
> rename to lib/darray.c
> index b7d223f85873..b6868db7f956 100644
> --- a/fs/bcachefs/darray.c
> +++ b/lib/darray.c
> @@ -1,10 +1,14 @@
>  // SPDX-License-Identifier: GPL-2.0
> +/*
> + * (C) 2022-2024 Kent Overstreet <kent.overstreet@linux.dev>
> + */
>  
> +#include <linux/darray.h>
>  #include <linux/log2.h>
> +#include <linux/module.h>
>  #include <linux/slab.h>
> -#include "darray.h"
>  
> -int __bch2_darray_resize(darray_char *d, size_t element_size, size_t new_size, gfp_t gfp)
> +int __darray_resize_slowpath(darray_char *d, size_t element_size, size_t new_size, gfp_t gfp)
>  {
>  	if (new_size > d->size) {
>  		new_size = roundup_pow_of_two(new_size);
> @@ -23,3 +27,7 @@ int __bch2_darray_resize(darray_char *d, size_t element_size, size_t new_size, g
>  
>  	return 0;
>  }
> +EXPORT_SYMBOL_GPL(__darray_resize_slowpath);
> +
> +MODULE_AUTHOR("Kent Overstreet");
> +MODULE_LICENSE("GPL");

Since commit 1fffe7a34c89 ("script: modpost: emit a warning when the
description is missing"), a module without a MODULE_DESCRIPTION() will
result in a warning when built with make W=1. Recently, multiple
developers, including myself, have been eradicating these warnings
treewide, and to prevent new instances from being introduced I scan lore
for patches which contain a MODULE_LICENSE but do not contain a
MODULE_DESCRIPTION(). This patch was flagged.

Normally I just ask that a MODULE_DESCRIPTION() be added. But in this
case I don't see how this can ever be built as a module since it is part
of an obj-y rule, and hence, unless I'm missing something, will always
be built-in. So in this case I would suggest removing the MODULE_AUTHOR
and MODULE_LICENSE.

However, if you do anticipate this could be built as a module in the
future, please just add the MODULE_DESCRIPTION().
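
If it does stay modular, something along these lines would be enough (the
exact wording of the description is of course up to you):

        MODULE_DESCRIPTION("Dynamic resizable arrays (darray)");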

/jeff



^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH 6/9] rcu: rcu_pending
  2024-08-19 23:59     ` Kent Overstreet
@ 2024-08-26 14:44       ` Paul E. McKenney
  2024-08-26 15:17         ` Kent Overstreet
  0 siblings, 1 reply; 24+ messages in thread
From: Paul E. McKenney @ 2024-08-26 14:44 UTC (permalink / raw)
  To: Kent Overstreet
  Cc: rcu, linux-kernel, vbabka, roman.gushchin, 42.hyeyoo, akpm, cl,
	mhocko, urezki, neeraj.upadhyay

On Mon, Aug 19, 2024 at 07:59:34PM -0400, Kent Overstreet wrote:
> On Mon, Aug 19, 2024 at 03:58:26PM GMT, Paul E. McKenney wrote:
> > I still remain quite skeptical of this one, but it has improved since
> > June's version.
> > 
> > Responses inline.
> > 
> > 							Thanx, Paul
> > 
> > In the meantime, thank you for avoiding reaching into RCU's and SRCU's
> > innards.  This makes it reasonable to have you put this file into your
> > code, at least initially.  That way, you get what you want *now* and us
> > RCU guys are not committing to maintain it before it is ready for us to
> > do so.
> 
> The RCU interfaces do force a lot of function calls for things that
> should be inlined though, and the gratuitious interface fragmentation
> between RCU and SRCU is... annoying.

I have gotten requests for a variant of SRCU that dispenses with the
read-side memory barriers, which would allow you to dispense with RCU.
Maybe an srcu_read_lock_lite() and srcu_read_unlock_lite(), similar to
the _nmisafe() variants.  There is always a price, and in this case the
price would be that srcu_read_lock_lite() and srcu_read_unlock_lite()
be restricted to code regions where RCU is watching.  But from what I
know, fs code must already abide by that restriction.

Would that help?

> > I still have strong misgivings about radix trees and cache locality.
> > Poor cache locality on the other side of the grace period can result
> > in OOMs during callback-flooding events, hence the move of kfree_rcu()
> > and friends to pages of pointers.  And, as you note below, radix trees
> > don't merge nicely.
> 
> Cache locality where?
> 
> On the enqueue side, which is the fastpath, this uses a cursor - we're
> not walking the radix tree every time.
> 
> On the processing side, where we're kfree_bulk()ing entire radix tree
> nodes, the radix tree is going to have better cache locality than a list
> of pages.

Interesting.  What testing or analysis did you do in the course of
arriving at this conclusion?  In the meantime I will assume that the
analysis involves your radix-tree nodes being more than one page in size.

> > The advantage of synchronize_rcu() is that it can proceed without
> > memory allocation.  If you block on allocation, even of a 16-byte
> > rcu_head structure, you can go into OOM-induced deadlock.
> > 
> > It might well make sense to do an rcu_head allocation with GFP flags
> > that try reasonably hard, but still allow failure before falling all
> > the way back to synchronize_rcu().  (And for all I know, this might have
> > been tested and found wanting, but Uladzislau Rezki (CCed) would know.)
> > But a hard wait on that allocation is asking for trouble.
> 
> That's reasonable - as long as we're trying the 16 byte allocation
> before doing a synchronize_rcu().

It might or might not be reasonable, depending on what is going on in the
rest of the system.  The current kfree_rcu() code can run the system out
of full pages, but it won't impede other code allocating smaller blocks
of memory.  We could easily change it to allocate individual rcu_head
structures, but doing so would not necessarily be a win in OOM conditions,
again, depending on what else is going on.

We are very likely to add this, but also very likely to have a "chicken
switch" to allow the system administrator to control it.

But I freely concede that if your radix tree is using multi-page nodes,
then your code might well have greater need to allocate individual
rcu_head structures than does the current kfree_rcu() implementation.

> > There is work underway to make the current callback lists take advantage
> > of expedited grace periods, transparent to the caller.  This allows
> > the shrinker (or whatever) to speed up everything by simply invoking
> > synchronize_rcu_expedited().  This speedup includes callback processing
> > because it avoids "bubbles" in the callback processing that can occur
> > when the next grace period has not yet completed, but all callbacks from
> > earlier grace periods have been invoked.
> 
> Glad to here, that was first on my list when adding a shrinker to this
> code.

Glad you approve.

This has been on my list for quite some time, and we now have things in
place to make it easy.  Well, easier, anyway.

> > > - RCU_PENDING_CALL_RCU_FN
> > > 
> > >   Accelerated backend for call_rcu() - pending callbacks are tracked in
> > >   a radix tree to eliminate linked list overhead.
> > 
> > But to add radix-tree overhead.  And to eliminate the ability to do O(1)
> > list merges.  Is this really a win?
> 
> As mentioned, there's a cursor so we're not adding radix-tree overhead
> to the fast path, and there's no need to merge lists for expired objects
> - that's all handled fine.

I took a quick look at __rcu_pending_enqueue() and my eyes are still
bleeding.  Concatenating linked list of pages is way simpler, way faster,
and way more robust.

> But yes, using it for call_rcu() would need more performance
> justification. I haven't seen workloads where call_rcu() performance
> particularly matters, so it's not something I'm pushing for - I included
> that because it's minimal code and other people might know of workloads
> where we do want it.

Plus, unlike kfree_rcu() post-grace-period handling, call_rcu() callback
functions usually access the memory block passed to them, which means
that they are incurring that per-element cache miss in any case.

> > > Ideally we would be getting a callback every time a grace period
> > > completes for which we have objects, but that would require multiple
> > > rcu_heads in flight, and since the number of gp sequence numbers with
> > > uncompleted callbacks is not bounded, we can't do that yet.
> > 
> > Doesn't the call from __rcu_pending_enqueue() to process_finished_items()
> > serve this purpose?  After all, the case that causes trouble is the one
> > where lots of things are being very frequently queued.
> 
> No, because that's unpredictable, and we don't process pending items in
> enqueue() unless we know we can sleep (i.e., we don't do it if the
> caller is latency sensitive).

It is possible to do this in an extremely lightweight fashion in the
common case.

							Thanx, Paul

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH 6/9] rcu: rcu_pending
  2024-08-26 14:44       ` Paul E. McKenney
@ 2024-08-26 15:17         ` Kent Overstreet
  2024-08-26 16:01           ` Paul E. McKenney
  0 siblings, 1 reply; 24+ messages in thread
From: Kent Overstreet @ 2024-08-26 15:17 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: rcu, linux-kernel, vbabka, roman.gushchin, 42.hyeyoo, akpm, cl,
	mhocko, urezki, neeraj.upadhyay

On Mon, Aug 26, 2024 at 07:44:12AM GMT, Paul E. McKenney wrote:
> I have gotten requests for a variant of SRCU that dispenses with the
> read-side memory barriers, which would allow you to dispense with RCU.
> Maybe an srcu_read_lock_lite() and srcu_read_unlock_lite(), similar to
> the _nmisafe() variants.  There is always a price, and in this case the
> price would be that srcu_read_lock_lite() and srcu_read_unlock_lite()
> be restricted to code regions where RCU is watching.  But from what I
> know, fs code must already abide by that restriction.
> 
> Would that help?

I don't think that helps here, but that is something I'd be interested
in.

What I would like for this is:
 - a single #define for RCU_NR_OLDSTATES for both RCU and SRCU

 - a way of getting the current RCU gp sequence number without a
   function call, since we hit that on every single enqueue(). One of my
   development versions added a function to RCU and SRCU for getting a
   pointer to the internal sequence number, which we'd call at init
   time; this lets us skip the function call and a branch (rough sketch below).
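
Something like this - the names are invented for illustration, this is
not an existing RCU/SRCU interface:

        /* hypothetical; NULL ssp would mean plain RCU: */
        unsigned long *get_gp_seq_ptr(struct srcu_struct *ssp);

        /* at rcu_pending init time: */
        pending->gp_seq_ptr = get_gp_seq_ptr(pending->srcu);

        /* so the enqueue fastpath is just a load - no call, no branch: */
        seq = READ_ONCE(*pending->gp_seq_ptr);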

Another thing that might make the code a bit easier to reason about is
if rcu_read_lock() also worked as an srcu_read_lock() - is that something
the SRCU variant you were talking about earlier would provide?

In my current version of the code, we call __get_state_synchronize_rcu()
after we've disabled interrupts; we know the rcu gp seq isn't going to
tick while we're in the critical path. But this doesn't apply if it's
for SRCU, and I don't want to add if (src) srcu_read_lock() branches to
it.

Not at all essential, the races that result from this are harmless, but
if we e.g. decide it's worthwhile to only kick off a gp if it hasn't
ticked (i.e. only kick rcu if we're still on seq of the object being
enqueued) it'd be nice.

> > On the enqueue side, which is the fastpath, this uses a cursor - we're
> > not walking the radix tree every time.
> > 
> > On the processing side, where we're kfree_bulk()ing entire radix tree
> > nodes, the radix tree is going to have better cache locality than a list
> > of pages.
> 
> Interesting.  What testing or analysis did you do in the course of
> arriving at this conclusion?  In the meantime I will assume that the
> analysis involves your radix-tree nodes being more than one page in size.

No, the radix tree nodes are 512 bytes; that's been sufficient in my
testing.

(also, please don't refer to PAGE_SIZE outside of mm/ code without a
_good_ reason; that's something we've been trying to clean up.)

I'll try to post some performance numbers when I have some time.

> It might or might not be reasonable, depending on what is going on in the
> rest of the system.  The current kfree_rcu() code can run the system out
> of full pages, but it won't impede other code allocating smaller blocks
> of memory.  We could easily change it to allocate individual rcu_head
> structures, but doing so would not necessarily be a win in OOM conditions,
> again, depending on what else is going on.

As long as the thread calling kvfree_rcu_mightsleep() can block without
blocking memory reclaim, it'll be safe. We might want to tweak the GFP
flags to tell the allocator "don't try too hard, bail out so we can
check if the gp has expired".
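
i.e. something along these lines (hand-wavy sketch, not the actual
rcu_pending code; "seq" is the cookie from an earlier
get_state_synchronize_rcu(), and the exact flag combination is up for
discussion):

        struct rcu_head *head = kmalloc(sizeof(*head), GFP_NOWAIT|__GFP_NOWARN);

        if (!head) {
                if (poll_state_synchronize_rcu(seq)) {
                        kvfree(ptr);            /* gp already expired, free immediately */
                        return;
                }
                synchronize_rcu();              /* last resort */
                kvfree(ptr);
                return;
        }
        /* otherwise enqueue as usual, to be freed after the gp */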

> I took a quick look at __rcu_pending_enqueue() and my eyes are still
> bleeding.  Concatenating linked list of pages is way simpler, way faster,
> and way more robust.

Funny, I had the same thoughts trying to read your code... :)

But, most of __rcu_pending_enqueue() is slowpaths; the fastpath is just

	objs = get_object_radix(p, seq); /* lookup seq in p->objs */

	*objs->cursor++ = ptr ?: head;
	/* zero cursor if we hit the end of a radix tree node: */
	if (!(((ulong) objs->cursor) & (GENRADIX_NODE_SIZE - 1)))
		objs->cursor = NULL;
	start_gp = !objs->nr;
	objs->nr++;

So I think the performance concerns are moot, and for robustness -
memory allocation failure always turns into "use the linked lists of
objects", which works similarly to the old code.

> > But yes, using it for call_rcu() would need more performance
> > justification. I haven't seen workloads where call_rcu() performance
> > particularly matters, so it's not something I'm pushing for - I included
> > that because it's minimal code and other people might know of workloads
> > where we do want it.
> 
> Plus, unlike kfree_rcu() post-grace-period handling, call_rcu() callback
> functions usually access the memory block passed to them, which means
> that they are incurring that per-element cache miss in any case.

True. But this would allow us to prefetch those objects (several
iterations in advance).

> > > > Ideally we would be getting a callback every time a grace period
> > > > completes for which we have objects, but that would require multiple
> > > > rcu_heads in flight, and since the number of gp sequence numbers with
> > > > uncompleted callbacks is not bounded, we can't do that yet.
> > > 
> > > Doesn't the call from __rcu_pending_enqueue() to process_finished_items()
> > > serve this purpose?  After all, the case that causes trouble is the one
> > > where lots of things are being very frequently queued.
> > 
> > No, because that's unpredictable, and we don't process pending items in
> > enqueue() unless we know we can sleep (i.e., we don't do it if the
> > caller is latency sensitive).
> 
> It is possible to do this in an extremely lightweight fashion in the
> common case.

Just processing a few items? hmm, would we want to though, when
otherwise we'd be calling kfree_bulk()? I think icache locality means
we'd generally prefer not to.

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH 6/9] rcu: rcu_pending
  2024-08-26 15:17         ` Kent Overstreet
@ 2024-08-26 16:01           ` Paul E. McKenney
  2024-08-26 17:09             ` Kent Overstreet
  0 siblings, 1 reply; 24+ messages in thread
From: Paul E. McKenney @ 2024-08-26 16:01 UTC (permalink / raw)
  To: Kent Overstreet
  Cc: rcu, linux-kernel, vbabka, roman.gushchin, 42.hyeyoo, akpm, cl,
	mhocko, urezki, neeraj.upadhyay

On Mon, Aug 26, 2024 at 11:17:29AM -0400, Kent Overstreet wrote:
> On Mon, Aug 26, 2024 at 07:44:12AM GMT, Paul E. McKenney wrote:
> > I have gotten requests for a variant of SRCU that dispenses with the
> > read-side memory barriers, which would allow you to dispense with RCU.
> > Maybe an srcu_read_lock_lite() and srcu_read_unlock_lite(), similar to
> > the _nmisafe() variants.  There is always a price, and in this case the
> > price would be that srcu_read_lock_lite() and srcu_read_unlock_lite()
> > be restricted to code regions where RCU is watching.  But from what I
> > know, fs code must already abide by that restriction.
> > 
> > Would that help?
> 
> I don't think that helps here, but that is something I'd be interested
> in.
> 
> What I would like for this is:
>  - a single #define for RCU_NR_OLDSTATES for both RCU and SRCU
> 
>  - a way of getting the current RCU gp sequence number without a
>    function call, since we hit that on every single enqueue(). One of my
>    development versions added a function to RCU and SRCU for getting a
>    pointer to the internal sequence number, which we'd call at init
>    time; this lets us skip the function call and a branch. 
> 
> Another thing that might make the code a bit easier to reason about is
> if rcu_read_lock() also worked as an srcu_read_lock() - is something the
> SRCU variant you're talking about previously would provide?

Yes, srcu_read_lock_lite() would return a value that you later pass
to the corresponding srcu_read_unlock_lite(), just as srcu_read_lock()
and srcu_read_unlock() do now.
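
So the reader side would look just like today's SRCU readers, only with
the _lite names (which, to be clear, do not exist yet; "my_srcu" stands
in for whatever srcu_struct you are using):

        int idx;

        idx = srcu_read_lock_lite(&my_srcu);    /* no smp_mb(), so RCU must be watching */
        /* ... read-side critical section ... */
        srcu_read_unlock_lite(&my_srcu, idx);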

And they would have the same grace-period algorithm, and thus the same
value for NUM_ACTIVE_SRCU_POLL_OLDSTATE, as requested above.

But get_state_synchronize_srcu() is still a function call.  But the
offer of a function that checks multiple grace-period-state cookies in
one call still stands.

> In my current version of the code, we call __get_state_synchronize_rcu()
> after we've disabled interrupts; we know the rcu gp seq isn't going to
> tick while we're in the critical path. But this doesn't apply if it's
> for SRCU, and I don't want to add if (src) srcu_read_lock() branches to
> it.

Actually, disabling interrupts does *not* prevent RCU's grace-period 
sequence number from changing.  For example, the following really can
happen:

o	RCU notices that the current grace period can end.

o	A CPU disables interrupts here.

o	A call to get_state_synchronize_rcu() returns a cookie
	corresponding to the end of the grace period following the
	current one.

o	RCU ends the current grace period, therefore updating the
	grace-period sequence number.

o	RCU starts a new grace period, therefore updating the
	grace-period sequence number once again.

o	RCU cannot complete this new grace period until that CPU
	re-enables interrupts, but it has already updated its grace-period
	sequence number not once, but twice.

So if your code knows that RCU's grace-period sequence number cannot
change while a given CPU has interrupts disabled, that code has a bug.
A low-probability bug, perhaps, but if your code is running on enough
systems, it will make its presence known.
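
If your code needs to reason about a specific grace period, the safe
pattern is to capture a cookie and poll it, never to assume that the
sequence number is holding still.  A minimal sketch using only the
existing polled-grace-period API (free_expired_objects() is a
placeholder for whatever your processing step is):

        unsigned long cookie;

        cookie = start_poll_synchronize_rcu();  /* snapshot, and ensure a gp is in flight */

        /* ... arbitrarily later, in any context ... */

        if (poll_state_synchronize_rcu(cookie)) {
                /* a full grace period has elapsed since the snapshot */
                free_expired_objects();
        }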

> Not at all essential, the races that result from this are harmless, but
> if we e.g. decide it's worthwhile to only kick off a gp if it hasn't
> ticked (i.e. only kick rcu if we're still on seq of the object being
> enqueued) it'd be nice.

Why not call get_state_synchronize_rcu(), and ask for a new grace period
if the value returned is different than that from the previous call?

> > > On the enqueue side, which is the fastpath, this uses a cursor - we're
> > > not walking the radix tree every time.
> > > 
> > > On the processing side, where we're kfree_bulk()ing entire radix tree
> > > nodes, the radix tree is going to have better cache locality than a list
> > > of pages.
> > 
> > Interesting.  What testing or analysis did you do in the course of
> > arriving at this conclusion?  In the meantime I will assume that the
> > analysis involves your radix-tree nodes being more than one page in size.
> 
> No, the radix tree nodes are 512 bytes; that's been sufficient in my
> testing.
> 
> (also, please don't refer to PAGE_SIZE outside of mm/ code without a
> _good_ reason; that's something we've been trying to clean up.)

Moving kfree_rcu() into mm/ will fix that problem.  Plus we did discuss
this with the mm folks back in the day, especially about the GFP flags,
so apparently there was a sufficiently good reason.  Maybe still is.

> I'll try to post some performance numbers when I have some time.

I look forward to seeing what you come up with.

> > It might or might not be reasonable, depending on what is going on in the
> > rest of the system.  The current kfree_rcu() code can run the system out
> > of full pages, but it won't impede other code allocating smaller blocks
> > of memory.  We could easily change it to allocate individual rcu_head
> > structures, but doing so would not necessarily be a win in OOM conditions,
> > again, depending on what else is going on.
> 
> As long as the thread calling kvfree_rcu_mightsleep() can block without
> blocking memory reclaim it'll be safe. We might want to tweak the GFP
> flags so to tell the allocator "don't try too hard, bail out so we can
> check if the gp has expired".

There can be quite a difference between "safe" and "does well in actual
workloads".

> > I took a quick look at __rcu_pending_enqueue() and my eyes are still
> > bleeding.  Concatenating linked list of pages is way simpler, way faster,
> > and way more robust.
> 
> Funny, I had the same thoughts trying to read your code... :)

Amazing how much easier it is to generate new code than to understand
existing code, isn't it?  ;-)

> But, most of __rcu_pending_enqueue() is slowpaths; the fastpath is just
> 
> 	objs = get_object_radix(p, seq); /* lookup seq in p->objs */
> 
>         *objs->cursor++ = ptr ?: head;                                                                                                       
>         /* zero cursor if we hit the end of a radix tree node: */                                                                            
>         if (!(((ulong) objs->cursor) & (GENRADIX_NODE_SIZE - 1)))                                                                            
>                 objs->cursor = NULL;                                                                                                         
>         start_gp = !objs->nr;                                                                                                                
>         objs->nr++;
> 
> So I think the performance concerns are moot, and for robustness -
> memory allocation failure always turns into "use the linked lists of
> objects", which works similarly to the old code.

Bugs live on slowpaths as well as on fastpaths.  In fact, they tend to
accumulate there.

> > > But yes, using it for call_rcu() would need more performance
> > > justification. I haven't seen workloads where call_rcu() performance
> > > particularly matters, so it's not something I'm pushing for - I included
> > > that because it's minimal code and other people might know of workloads
> > > where we do want it.
> > 
> > Plus, unlike kfree_rcu() post-grace-period handling, call_rcu() callback
> > functions usually access the memory block passed to them, which means
> > that they are incurring that per-element cache miss in any case.
> 
> True. But this would allow us to prefetch those objects (several
> iterations in advance).

I need to see a CPU on which this actually makes a significant difference
before adding this sort of complexity.

> > > > > Ideally we would be getting a callback every time a grace period
> > > > > completes for which we have objects, but that would require multiple
> > > > > rcu_heads in flight, and since the number of gp sequence numbers with
> > > > > uncompleted callbacks is not bounded, we can't do that yet.
> > > > 
> > > > Doesn't the call from __rcu_pending_enqueue() to process_finished_items()
> > > > serve this purpose?  After all, the case that causes trouble is the one
> > > > where lots of things are being very frequently queued.
> > > 
> > > No, because that's unpredictable, and we don't process pending items in
> > > enqueue() unless we know we can sleep (i.e., we don't do it if the
> > > caller is latency sensitive).
> > 
> > It is possible to do this in an extremely lightweight fashion in the
> > common case.
> 
> Just processing a few items? hmm, would we want to though, when
> otherwise we'd be calling kfree_bulk()? I think icache locality means
> we'd generally prefer not to.

You might not want to yet, but you eventually would want this.

							Thanx, Paul

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH 6/9] rcu: rcu_pending
  2024-08-26 16:01           ` Paul E. McKenney
@ 2024-08-26 17:09             ` Kent Overstreet
  2024-08-30 19:01               ` Paul E. McKenney
  0 siblings, 1 reply; 24+ messages in thread
From: Kent Overstreet @ 2024-08-26 17:09 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: rcu, linux-kernel, vbabka, roman.gushchin, 42.hyeyoo, akpm, cl,
	mhocko, urezki, neeraj.upadhyay

On Mon, Aug 26, 2024 at 09:01:54AM GMT, Paul E. McKenney wrote:
> But get_state_synchronize_srcu() is still a function call.  But the
> offer of a function that checks multiple grace-period-state cookies in
> one call still stands.

It shouldn't be though, it's just reading a sequence number - I'd much
prefer if it could be at least a static inline.

Which you won't want to do, because it would mean exposing RCU private
data structures; hence my approach of exposing an RCU-only api for
getting a pointer to the sequence number.

> 
> > In my current version of the code, we call __get_state_synchronize_rcu()
> > after we've disabled interrupts; we know the rcu gp seq isn't going to
> > tick while we're in the critical path. But this doesn't apply if it's
> > for SRCU, and I don't want to add if (src) srcu_read_lock() branches to
> > it.
> 
> Actually, disabling interrupts does *not* prevent RCU's grace-period 
> sequence number from changing.  For example, the following really can
> happen:
> 
> o	RCU notices that the current grace period can end.
> 
> o	A CPU disables interrupts here.
> 
> o	A call to get_state_synchronize_rcu() returns a cookie
> 	corresponding to the end of the grace period following the
> 	current one.
> 
> o	RCU ends the current grace period, therefore updating the
> 	grace-period sequence number.
> 
> o	RCU starts a new grace period, therefore updating the
> 	grace-period sequence number once again.
> 
> o	RCU cannot complete this new grace period until that CPU
> 	re-enables interrupts, but it has already updated its grace-period
> 	sequence number not once, but twice.
> 
> So if your code knows that RCU's grace-period sequence number cannot
> change while a given CPU has interrupts disabled, that code has a bug.
> A low-probability bug, perhaps, but if your code is running on enough
> systems, it will make its presence known.

Ok, good to know

> 
> > Not at all essential, the races that result from this are harmless, but
> > if we e.g. decide it's worthwhile to only kick off a gp if it hasn't
> > ticked (i.e. only kick rcu if we're still on seq of the object being
> > enqueued) it'd be nice.
> 
> Why not call get_state_synchronize_rcu(), and ask for a new grace period
> if the value returned is different than that from the previous call?

We don't want to falsely think the object expires later than it actually
does, and have more accumulated work than we need to.

> > Funny, I had the same thoughts trying to read your code... :)
> 
> Amazing how much easier it is to generate new code than to understand
> existing code, isn't it?  ;-)

I would have much preferred if your existing code worked with SRCU. You
think I'm doing this for fun?

> > > Plus, unlike kfree_rcu() post-grace-period handling, call_rcu() callback
> > > functions usually access the memory block passed to them, which means
> > > that they are incurring that per-element cache miss in any case.
> > 
> > True. But this would allow us to prefetch those objects (several
> > iterations in advance).
> 
> I need to see a CPU on which this actually make a significant difference
> before adding this sort of complexity.

We would of course want benchmarks to show that this was worthwhile
before switching call_rcu(), since absent a performance improvement we'd
want to stick with the approach that doesn't allocate memory.

> > Just processing a few items? hmm, would we want to though, when
> > otherwise we'd be calling kfree_bulk()? I think icache locality means
> > we'd generally prefer not to.
> 
> You might not want to yet, but you eventually would want this.

Because?

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH 6/9] rcu: rcu_pending
  2024-08-26 17:09             ` Kent Overstreet
@ 2024-08-30 19:01               ` Paul E. McKenney
  2024-09-01  0:03                 ` Kent Overstreet
  0 siblings, 1 reply; 24+ messages in thread
From: Paul E. McKenney @ 2024-08-30 19:01 UTC (permalink / raw)
  To: Kent Overstreet
  Cc: rcu, linux-kernel, vbabka, roman.gushchin, 42.hyeyoo, akpm, cl,
	mhocko, urezki, neeraj.upadhyay

On Mon, Aug 26, 2024 at 01:09:15PM -0400, Kent Overstreet wrote:
> On Mon, Aug 26, 2024 at 09:01:54AM GMT, Paul E. McKenney wrote:
> > But get_state_synchronize_srcu() is still a function call.  But the
> > offer of a function that checks multiple grace-period-state cookies in
> > one call still stands.
> 
> It shouldn't be though, it's just reading a sequence number - I'd much
> prefer if it could be at least a static inline.
> 
> Which you won't want to do, because it would mean exposing RCU private
> data structures; hence my approach of exposing an RCU-only api for
> getting a pointer to the sequence number.

True enough.

In addition, I have not yet been able to come up with any safe use of
this sequence number that does not require memory ordering.  And -that-
I most emphatically won't want to export to the caller.

Hence my offer of a function taking multiple grace-period-state cookies
in one go, allowing the memory-ordering overhead to be amortized over
the set of cookies.

> > > In my current version of the code, we call __get_state_synchronize_rcu()
> > > after we've disabled interrupts; we know the rcu gp seq isn't going to
> > > tick while we're in the critical path. But this doesn't apply if it's
> > > for SRCU, and I don't want to add if (src) srcu_read_lock() branches to
> > > it.
> > 
> > Actually, disabling interrupts does *not* prevent RCU's grace-period 
> > sequence number from changing.  For example, the following really can
> > happen:
> > 
> > o	RCU notices that the current grace period can end.
> > 
> > o	A CPU disables interrupts here.
> > 
> > o	A call to get_state_synchronize_rcu() returns a cookie
> > 	corresponding to the end of the grace period following the
> > 	current one.
> > 
> > o	RCU ends the current grace period, therefore updating the
> > 	grace-period sequence number.
> > 
> > o	RCU starts a new grace period, therefore updating the
> > 	grace-period sequence number once again.
> > 
> > o	RCU cannot complete this new grace period until that CPU
> > 	re-enables interrupts, but it has already updated its grace-period
> > 	sequence number not once, but twice.
> > 
> > So if your code knows that RCU's grace-period sequence number cannot
> > change while a given CPU has interrupts disabled, that code has a bug.
> > A low-probability bug, perhaps, but if your code is running on enough
> > systems, it will make its presence known.
> 
> Ok, good to know
> 
> > > Not at all essential, the races that result from this are harmless, but
> > > if we e.g. decide it's worthwhile to only kick off a gp if it hasn't
> > > ticked (i.e. only kick rcu if we're still on seq of the object being
> > > enqueued) it'd be nice.
> > 
> > Why not call get_state_synchronize_rcu(), and ask for a new grace period
> > if the value returned is different than that from the previous call?
> 
> We don't want to falsely think the object expires later than it actually
> does, and have more accumulated work than we need to.

Yes, that can happen, but grace periods are long and that race window
will normally be very short.  It will not matter in actual practice.

> > > Funny, I had the same thoughts trying to read your code... :)
> > 
> > Amazing how much easier it is to generate new code than to understand
> > existing code, isn't it?  ;-)
> 
> I would have much preferred if your existing code worked with SRCU. You
> think I'm doing this for fun?

To be fair, the "for fun" reason seemed to me to be one of the more
positive explanations for your providing an unsolicited rip-and-replace
patch for kfree_rcu() without prior consultation. ;-)

But, to your point, why haven't we already implemented kfree_srcu()?

Because in v6.10, there are six use cases for it, and they appear to have
infrequent updates.  Thus, kfree_srcu() is not yet worth its weight.

> > > > Plus, unlike kfree_rcu() post-grace-period handling, call_rcu() callback
> > > > functions usually access the memory block passed to them, which means
> > > > that they are incurring that per-element cache miss in any case.
> > > 
> > > True. But this would allow us to prefetch those objects (several
> > > iterations in advance).
> > 
> > I need to see a CPU on which this actually make a significant difference
> > before adding this sort of complexity.
> 
> We would of course want benchmarks to show that this was worthwhile
> before switching call_rcu(), since absent a performance improvement we'd
> want to stick with the approach that doesn't allocate memory.

Agreed.

Furthermore, any approach that does allocate memory needs a non-allocating
fallback.  Out-of-memory deadlocks are not fun.

> > > Just processing a few items? hmm, would we want to though, when
> > > otherwise we'd be calling kfree_bulk()? I think icache locality means
> > > we'd generally prefer not to.
> > 
> > You might not want to yet, but you eventually would want this.
> 
> Because?

TL;DR:  Lessons learned from bitter experience.

For more detail, please read on.

Because certain inconvenient laws of physics (and I am looking at *you*,
finite speed of light and atomic nature of matter!) mean that global
agreement across a computer system is expensive.  This is especially
a problem if synchronous global agreement is required, which is one
reason why reader-writer locking is so slow, one way or another.
(You can either make readers slow or you can make updaters *really* slow.)

RCU does not completely avoid the cost of global agreement.  After all,
at the end of a grace period, there must be global agreement that all
pre-existing readers have completed.  But the computation of that global
agreement can be (and is) spread over time.  This spreading has two
good effects: First, it permits CPUs and tasks to use lighter-weight
operations for high-frequency operations such as rcu_read_lock(), and
second, the grace-period requests that arrived during one grace-period
computation can all be satisfied by the next grace-period computation,
in turn permitting the overhead of that next computation to be amortized
over all those requests.  Which is one reason why grace periods are not
unconditionally expedited.

But it is all high-amortization fun and games only until there is risk
of exhausting memory.  At that point, RCU takes steps to expedite
the current grace period, albeit in a light-weight fashion compared
to synchronize_rcu_expedited().  It has proven unwise to wait for mm
to complain to RCU (for example, via a shrinker), so RCU does this
lightweight expediting using heuristics based on the number and rate of
callbacks on each CPU.  And yes, RCU also uses shrinkers, just not as
the first line of defense.

So why don't we have such heuristics in SRCU?

Because SRCU has not yet been subjected to workloads that flood SRCU with
callbacks, so it has not yet proven necessary.  Obviously, should SRCU
start getting flooded, SRCU will start taking whatever forms of evasive
action are indicated by the situation at hand, quite likely including
expediting the current SRCU grace period.

Until that time, we keep it simple.  Or simpler, anyway.

							Thanx, Paul

^ permalink raw reply	[flat|nested] 24+ messages in thread

* Re: [PATCH 6/9] rcu: rcu_pending
  2024-08-30 19:01               ` Paul E. McKenney
@ 2024-09-01  0:03                 ` Kent Overstreet
  0 siblings, 0 replies; 24+ messages in thread
From: Kent Overstreet @ 2024-09-01  0:03 UTC (permalink / raw)
  To: Paul E. McKenney
  Cc: rcu, linux-kernel, vbabka, roman.gushchin, 42.hyeyoo, akpm, cl,
	mhocko, urezki, neeraj.upadhyay

On Fri, Aug 30, 2024 at 12:01:26PM GMT, Paul E. McKenney wrote:
> On Mon, Aug 26, 2024 at 01:09:15PM -0400, Kent Overstreet wrote:
> > On Mon, Aug 26, 2024 at 09:01:54AM GMT, Paul E. McKenney wrote:
> > > But get_state_synchronize_srcu() is still a function call.  But the
> > > offer of a function that checks multiple grace-period-state cookies in
> > > one call still stands.
> > 
> > It shouldn't be though, it's just reading a sequence number - I'd much
> > prefer if it could be at least a static inline.
> > 
> > Which you won't want to do, because it would mean exposing RCU private
> > data structures; hence my approach of exposing an RCU-only api for
> > getting a pointer to the sequence number.
> 
> True enough.
> 
> In addition, I have not yet been able to come up with any safe use of
> this sequence number that does not require memory ordering.  And -that-
> I most emphatically won't want to export to the caller.
> 
> Hence my offer of a function taking multiple grace-period-state cookies
> in one go, allowing the memory-ordering overhead to be amortized over
> the set of cookies.

I'd have to have an idea of what that would look like to comment.

> > We don't want to falsely think the object expires later than it actually
> > does, and have more accumulated work than we need to.
> 
> Yes, that can happen, but grace periods are long and that race window
> will normally be very short.  It will not matter in actual practice.

True, it's not much of a practical consideration.

Part of the reason I bring this up is I'm coming to this with experience
with the bcachefs journal flushing API, which feels quite similar in a
lot of ways - we have items associated with sequence numbers, and we may
later need to flush and wait for a given sequence number to be
persistent.

That code just feels easier to reason about when we're being explicit
about which sequence number we're talking about whenever possible.
Granted it's not as necessary with RCU - but who knows, maybe someday?
Just wanted to share the thought.

> > I would have much preferred if your existing code worked with SRCU. You
> > think I'm doing this for fun?
> 
> To be fair, the "for fun" reason seemed to me to be one of the more
> positive explanations for your providing an unsolicited rip-and-replace
> patch for kfree_rcu() without prior consultation. ;-)

Well, partly yes; one of the things that I derive the most satisfaction
from is clean modular solutions to interesting problems with nice
simple APIs, where I get to slim down the original chunk of code and cut
out something grotty :)

There came a point, when working on the btree key cache lock contention
issues, where I realized I was onto something nice, and thought - "this
might be worth doing right".

And I still feel that way. Slimming down rcu/tree.c seems like a nice
win, and if we could also slim down the slab code by pulling out
SLAB_TYPESAFE_BY_RCU, that'd be another one.

I really like codebases that are nicely organized, where individual
components aren't too big and have a clear purpose, where I can still
find my way around them years later :)

> But, to your point, why haven't we already implemented kfree_srcu()?
> 
> Because in v6.10, there are six use cases for it, and they appear to have
> infrequent updates.  Thus, kfree_srcu() is not yet not worth its weight.

Only if you leave out bcachefs.

> > We would of course want benchmarks to show that this was worthwhile
> > before switching call_rcu(), since absent a performance improvement we'd
> > want to stick with the approach that doesn't allocate memory.
> 
> Agreed.
> 
> Furthermore, any approach that does allocate memory needs a non-allocating
> fallback.  Out-of-memory deadlocks are not fun.

Try telling that to the filesystem people who are currently obsessed
with __GFP_NOFAIL...

But the fallbacks are already there - if you didn't get that far through
reading the code, the -ENOMEM fallback works pretty much exactly like
how call_rcu() works currently, with three lists of pending objects (one
for each outstanding grace period, and a list for expired objects).
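
Roughly this shape (simplified, and the field names are approximate -
see the patch for the real thing):

        struct rcu_pending_list {
                struct rcu_head         *head;
                struct rcu_head         *tail;
                unsigned long           seq;    /* gp cookie these objects wait on */
        };

        /*
         * One list per outstanding grace period, plus one for objects
         * whose grace period has already expired; the -ENOMEM path just
         * appends to these instead of touching the radix tree:
         */
        struct rcu_pending_list lists[NUM_ACTIVE_RCU_POLL_OLDSTATE + 1];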

> > > > Just processing a few items? hmm, would we want to though, when
> > > > otherwise we'd be calling kfree_bulk()? I think icache locality means
> > > > we'd generally prefer not to.
> > > 
> > > You might not want to yet, but you eventually would want this.
> > 
> > Because?
> 
> TL;DR:  Lessons learned from bitter experience.
> 
> For more detail, please read on.
> 
> Because certain inconvenient laws of physics (and I am looking at *you*,
> finite speed of light and atomic nature of matter!) mean that global
> agreement across a computer system is expensive.  This is especially
> a problem if synchronous global agreement is required, which is one
> reason why reader-writer locking is so slow, one way or another.
> (You can either make readers slow or you can make updaters *really* slow.)
> 
> RCU does not completely avoid the cost of global agreement.  After all,
> at the end of a grace period, there must be global agreement that all
> pre-existing readers have completed.  But the computation of that global
> agreement can be (and is) spread over time.  This spreading has two
> good effects: First, it permits CPUs and tasks to use lighter-weight
> operations for high-frequency operations such as rcu_read_lock(), and
> second, the grace-period requests that arrived during one grace-period
> computation can all be satisfied by the next grace-period computation,
> in turn permitting the overhead of that next computation to be amortized
> over all those requests.  Which is one reason why grace periods are not
> unconditionally expedited.

*nod* There's a fun algorithm for ticking the global RCU sequence
number, is there not? I assume it's where tree-RCU gets its name, but I
wasn't able to find it documented or in the code.

My version goes like: construct a binary tree, where every CPU is a leaf
node. When a CPU hits a quiescent point, it compares its node's sequence
number to the root; if they're the same, increment its node.

On increment, check sibling node; if they're now identical, tick parent
node; recurse up to the root.

Do I have that right?
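
To be concrete, here's a toy version of what I'm imagining - not
claiming this is what kernel/rcu/tree.c actually does, and it ignores
all the locking/memory ordering the real thing needs:

        #include <stdio.h>

        struct qs_node {
                unsigned long   seq;
                struct qs_node  *parent;
                struct qs_node  *sibling;
        };

        static void node_tick(struct qs_node *n)
        {
                n->seq++;
                if (!n->parent)
                        return;                 /* root ticked: grace period advances */
                if (n->sibling->seq == n->seq)  /* propagate once our sibling catches up */
                        node_tick(n->parent);
        }

        /* a CPU (leaf node) passes through a quiescent state: */
        static void cpu_quiescent(struct qs_node *leaf, struct qs_node *root)
        {
                if (leaf->seq == root->seq)     /* only report once per grace period */
                        node_tick(leaf);
        }

        int main(void)
        {
                struct qs_node root = { 0 };
                struct qs_node cpu0 = { .parent = &root };
                struct qs_node cpu1 = { .parent = &root };

                cpu0.sibling = &cpu1;
                cpu1.sibling = &cpu0;

                cpu_quiescent(&cpu0, &root);
                printf("after cpu0: gp seq %lu\n", root.seq);   /* still 0 */
                cpu_quiescent(&cpu1, &root);
                printf("after cpu1: gp seq %lu\n", root.seq);   /* 1: both CPUs reported */
                return 0;
        }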

> But it is all high-amortization fun and games only until there is risk
> of exhausting memory.  At that point, RCU takes steps to expedite
> the current grace period, albeit in a light-weight fashion compared
> to synchronize_rcu_expedited().  It has proven unwise to wait for mm
> to complain to RCU (for example, via a shrinker), so RCU does this
> lightweight expediting using heuristics based on the number and rate of
> callbacks on each CPU.  And yes, RCU also uses shrinkers, just not as
> the first line of defense.

So that's interesting. I think it would be really valuable to get those
heuristics better documented (and perhaps better factored out; if we got
them in rcu_pending we might make it easier for mm people to find and
understand that code, vs. having to dig through rcu/tree.c).

Speaking from painful personal experience, memory reclaim issues are one
of the most frustrating things to debug - they never crop up in a
controlled environment, it's always users hitting them at the worst time
and I always have very little to go on, and there's _so many_ things
that can go wrong.

Hence my current work to improve shrinker debugging and add it to the
show_mem.c oom report, and my recent ask for actual byte counters on
the amount of memory stranded by RCU - it would be lovely if we could have
that included in the show_mem.c report (perhaps when heuristics tell us
they're likely to be relevant), along with anything else we might want
when debugging (are we waiting on an unusually long grace period?).

^ permalink raw reply	[flat|nested] 24+ messages in thread

end of thread, other threads:[~2024-09-01  0:03 UTC | newest]

Thread overview: 24+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2024-08-19 16:59 [PATCH 0/9] rcu_pending Kent Overstreet
2024-08-19 16:59 ` [PATCH 1/9] lib/generic-radix-tree.c: genradix_ptr_inlined() Kent Overstreet
2024-08-19 16:59 ` [PATCH 2/9] lib/generic-radix-tree.c: add preallocation Kent Overstreet
2024-08-19 16:59 ` [PATCH 3/9] darray: lift from bcachefs Kent Overstreet
2024-08-21  1:00   ` Jeff Johnson
2024-08-19 16:59 ` [PATCH 4/9] vmalloc: is_vmalloc_addr_inlined() Kent Overstreet
2024-08-19 16:59 ` [PATCH 5/9] rcu: delete lockdep_assert_irqs_enabled() assert in start_poll_synchronize_rcu_common() Kent Overstreet
2024-08-19 21:11   ` Paul E. McKenney
2024-08-19 16:59 ` [PATCH 6/9] rcu: rcu_pending Kent Overstreet
2024-08-19 22:58   ` Paul E. McKenney
2024-08-19 23:59     ` Kent Overstreet
2024-08-26 14:44       ` Paul E. McKenney
2024-08-26 15:17         ` Kent Overstreet
2024-08-26 16:01           ` Paul E. McKenney
2024-08-26 17:09             ` Kent Overstreet
2024-08-30 19:01               ` Paul E. McKenney
2024-09-01  0:03                 ` Kent Overstreet
2024-08-19 16:59 ` [PATCH 7/9] bcachefs: Rip out freelists from btree key cache Kent Overstreet
2024-08-19 16:59 ` [PATCH 8/9] bcachefs: key cache can now allocate from pending Kent Overstreet
2024-08-19 16:59 ` [PATCH 9/9] rcu: Switch kvfree_rcu() to new rcu_pending Kent Overstreet
2024-08-19 22:18   ` Paul E. McKenney
2024-08-19 23:05     ` Kent Overstreet
2024-08-19 23:07 ` [PATCH 0/9] rcu_pending Paul E. McKenney
2024-08-19 23:29   ` Kent Overstreet

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox