* [PATCH, RFC] RCU: New scalable, multi-GP and preemptible RCU implementation
@ 2010-08-30 9:31 Lai Jiangshan
2010-08-30 23:57 ` Paul E. McKenney
0 siblings, 1 reply; 8+ messages in thread
From: Lai Jiangshan @ 2010-08-30 9:31 UTC (permalink / raw)
To: Paul E. McKenney, Linus Torvalds, Ingo Molnar, Andrew Morton,
David Miller, Manfred Spraul, Mathieu Desnoyers, Steven Rostedt,
Thomas Gleixner, Peter Zijlstra, LKML, dipankar
0) contents
1) overview
2) scalable
3) multi-GP
4) integrated expedited synchronize_rcu
5) preemptible
6) top-down implies
7) speech
8) core rcuring
1) overview
This is a new RCU implementation: RCURING. It uses a ring atomic counters
to control the completion of GPs and a ring callbacks batches to process
the callbacks.
In 2008, I think out the idea, and wrote the first 300lines code of rcuring.c,
but I'm too lazy to finish it
2) scalable
There is only 2 locks in every RCU domain(rcu_bh, rcu_sched, rcu_preempt),
one of them is for misc things, it is a infrequency lock, the other is
for advancing complete, it is frequency lock. But somebody else would
very probably do the same work for me if somebody else is holding this lock,
so we alway use spin_trylock() for this lock, so it is scalable.
We don't need any lock for reporting quiescent state, we just need 2
atomic operations for every RCU domain.
3) multi-GP
All old rcu implementations have only one waiting GP.
time -----ta-tb--------------------tc----------------td------------->
GP -----|------GP1---------------|--------GP2------|------------->
cpu#x ........|---------------------------------------|...............>
|->insert a callback |->handle callback
the callback have to wait a partial GP1, and a full GP2, it may wait
near 2 GPs in worse case because it have to wait the rcu read sites witch
start at (tb, tc] which are needless to wait. In average, a callback
have to wait (1 + 1/(1+1))= 1.5GPs
if we can have three simultaneous waiting GPs:
GPs .....|------GP1---------------|..........................
...........|--------GP2-----------|....................
....................|---------GP3---------------|........
cpu#x ........|-------------------------|..........................>
|->insert a callback |->handle callback
The callback is handled more quickly, In average, a callback have to wait
(1 + 1/(3+1))=1.25GPs
if we can have X simultaneous waiting GPs, a callback have to wait
(1 + 1/(X+1)) GPs in average. The default X in this patch is 15
(including the reserved GPs).
4) integrated expedited synchronize_rcu
expedited synchronize_rcu is implemented in rcuring.c, not in other files.
The whole of it is less than 50 lines of code. we just reserve several
GPs for expedited synchronize_rcu, when a expedited callback is inserted,
we start a new GP immediately(we have reserved GPs, this step will
very probably success), and then force this new GP to be completed quickly.
(thanks to multi-GP)
5) preemptible
new simple preemptible code, rcu_read_lock/unlock() is simple and is inline function.
add new kernel-offset.c to avoid recursively header files inclusion.
6) top-down implies
irq and local_irq_disable() implies preempt_disable()/rcu_read_lock_sched(),
irq and local_irq_disable() implies rcu_read_lock(). (*)
preempt_disable()/rcu_read_lock_sched() implies rcu_read_lock(). (*)
*: In preemptible rcutree/rcutiny, it is false.
7) speech
I will give a speech about RCU and this RCU implementation
in Japan linuxcon 2010. Welcome to Japan Linuxcon 2010.
8) core rcuring
(Comments and document is still been writing)
all read_read_lock/unlock_domain() is nested in a coresponding
rcuring_lock/unlock(), and the algorithm handles a rcuring_lock/unlock()
ciritcal region as a full rcu read site ciritcal region.
read site:
rcuring_lock()
lock a complete, and ensure this locked_complete - done_complete(1) > 0
curr_complete(1) - locked_complete >= 0
mb(1)
rcu_derefrence(ptr)
rcuring_unlock()
mb(2)
release its locked_complete.
update site:
rcu_assign_pointer(ptr, new)
(handle callback:call_rcu()+rcu_do_batch())
(call_rcu():)
mb(3)
get the curr_complete(2)
set callback's complete as this curr_complete(2).
(we don't have any field to record the callback's complete, we just
emulate it by rdp->done_complete and the position of this callback
in rdp->batch.)
time pass... until done_complete(2) - callback's complete > 0
(rcu_do_batch():)
mb(4)
call the callback: (free old ptr, etc.)
Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com>
---
Kbuild | 75 ++-
include/linux/rcupdate.h | 6
include/linux/rcuring.h | 195 +++++++++
include/linux/sched.h | 6
init/Kconfig | 32 +
kernel/Makefile | 1
kernel/kernel-offsets.c | 20
kernel/rcuring.c | 1002 +++++++++++++++++++++++++++++++++++++++++++++++
kernel/sched.c | 2
kernel/time/tick-sched.c | 4
10 files changed, 1324 insertions(+), 19 deletions(-)
diff --git a/Kbuild b/Kbuild
index e3737ad..bce37cb 100644
--- a/Kbuild
+++ b/Kbuild
@@ -3,7 +3,19 @@
# This file takes care of the following:
# 1) Generate bounds.h
# 2) Generate asm-offsets.h (may need bounds.h)
-# 3) Check for missing system calls
+# 3) Generate kernel-offsets.h
+# 4) Check for missing system calls
+
+# Default sed regexp - multiline due to syntax constraints
+define sed-y
+ "/^->/{s:->#\(.*\):/* \1 */:; \
+ s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \
+ s:->::; p;}"
+endef
+
+quiet_cmd_offsets_cc_s_c = CC $(quiet_modtag) $@
+cmd_offsets_cc_s_c = $(CC) -D__GENARATING_OFFSETS__ \
+ $(c_flags) -fverbose-asm -S -o $@ $<
#####
# 1) Generate bounds.h
@@ -43,22 +55,14 @@ $(obj)/$(bounds-file): kernel/bounds.s Kbuild
# 2) Generate asm-offsets.h
#
-offsets-file := include/generated/asm-offsets.h
+asm-offsets-file := include/generated/asm-offsets.h
-always += $(offsets-file)
-targets += $(offsets-file)
+always += $(asm-offsets-file)
+targets += $(asm-offsets-file)
targets += arch/$(SRCARCH)/kernel/asm-offsets.s
-
-# Default sed regexp - multiline due to syntax constraints
-define sed-y
- "/^->/{s:->#\(.*\):/* \1 */:; \
- s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \
- s:->::; p;}"
-endef
-
-quiet_cmd_offsets = GEN $@
-define cmd_offsets
+quiet_cmd_asm_offsets = GEN $@
+define cmd_asm_offsets
(set -e; \
echo "#ifndef __ASM_OFFSETS_H__"; \
echo "#define __ASM_OFFSETS_H__"; \
@@ -78,13 +82,48 @@ endef
arch/$(SRCARCH)/kernel/asm-offsets.s: arch/$(SRCARCH)/kernel/asm-offsets.c \
$(obj)/$(bounds-file) FORCE
$(Q)mkdir -p $(dir $@)
- $(call if_changed_dep,cc_s_c)
+ $(call if_changed_dep,offsets_cc_s_c)
+
+$(obj)/$(asm-offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild
+ $(call cmd,asm_offsets)
+
+#####
+# 3) Generate kernel-offsets.h
+#
+
+kernel-offsets-file := include/generated/kernel-offsets.h
+
+always += $(kernel-offsets-file)
+targets += $(kernel-offsets-file)
+targets += kernel/kernel-offsets.s
+
+quiet_cmd_kernel_offsets = GEN $@
+define cmd_kernel_offsets
+ (set -e; \
+ echo "#ifndef __LINUX_KERNEL_OFFSETS_H__"; \
+ echo "#define __LINUX_KERNEL_OFFSETS_H__"; \
+ echo "/*"; \
+ echo " * DO NOT MODIFY."; \
+ echo " *"; \
+ echo " * This file was generated by Kbuild"; \
+ echo " *"; \
+ echo " */"; \
+ echo ""; \
+ sed -ne $(sed-y) $<; \
+ echo ""; \
+ echo "#endif" ) > $@
+endef
+
+# We use internal kbuild rules to avoid the "is up to date" message from make
+kernel/kernel-offsets.s: kernel/kernel-offsets.c $(obj)/$(bounds-file) \
+ $(obj)/$(asm-offsets-file) FORCE
+ $(call if_changed_dep,offsets_cc_s_c)
-$(obj)/$(offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild
- $(call cmd,offsets)
+$(obj)/$(kernel-offsets-file): kernel/kernel-offsets.s Kbuild
+ $(call cmd,kernel_offsets)
#####
-# 3) Check for missing system calls
+# 4) Check for missing system calls
#
quiet_cmd_syscalls = CALL $<
diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 89414d6..42cd6bc 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -124,6 +124,7 @@ extern void rcu_bh_qs(int cpu);
extern void rcu_check_callbacks(int cpu, int user);
struct notifier_block;
+#ifndef CONFIG_RCURING
#ifdef CONFIG_NO_HZ
extern void rcu_enter_nohz(void);
@@ -140,11 +141,14 @@ static inline void rcu_exit_nohz(void)
}
#endif /* #else #ifdef CONFIG_NO_HZ */
+#endif
#if defined(CONFIG_TREE_RCU) || defined(CONFIG_TREE_PREEMPT_RCU)
#include <linux/rcutree.h>
#elif defined(CONFIG_TINY_RCU) || defined(CONFIG_TINY_PREEMPT_RCU)
#include <linux/rcutiny.h>
+#elif defined(CONFIG_RCURING)
+#include <linux/rcuring.h>
#else
#error "Unknown RCU implementation specified to kernel configuration"
#endif
@@ -686,6 +690,7 @@ struct rcu_synchronize {
extern void wakeme_after_rcu(struct rcu_head *head);
+#ifndef CONFIG_RCURING
#ifdef CONFIG_PREEMPT_RCU
/**
@@ -710,6 +715,7 @@ extern void call_rcu(struct rcu_head *head,
#define call_rcu call_rcu_sched
#endif /* #else #ifdef CONFIG_PREEMPT_RCU */
+#endif
/**
* call_rcu_bh() - Queue an RCU for invocation after a quicker grace period.
diff --git a/include/linux/rcuring.h b/include/linux/rcuring.h
new file mode 100644
index 0000000..343b932
--- /dev/null
+++ b/include/linux/rcuring.h
@@ -0,0 +1,195 @@
+/*
+ * Read-Copy Update mechanism for mutual exclusion
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright IBM Corporation, 2008
+ * Copyright Fujitsu 2008-2010 Lai Jiangshan
+ *
+ * Authors: Dipankar Sarma <dipankar@in.ibm.com>
+ * Manfred Spraul <manfred@colorfullife.com>
+ * Paul E. McKenney <paulmck@linux.vnet.ibm.com> Hierarchical version
+ * Lai Jiangshan <laijs@cn.fujitsu.com> RCURING version
+ */
+
+#ifndef __LINUX_RCURING_H
+#define __LINUX_RCURING_H
+
+#define __rcu_read_lock_bh() local_bh_disable()
+#define __rcu_read_unlock_bh() local_bh_enable()
+#define __rcu_read_lock_sched() preempt_disable()
+#define __rcu_read_unlock_sched() preempt_enable()
+
+#ifdef CONFIG_RCURING_BH
+extern void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *));
+extern void synchronize_rcu_bh(void);
+extern void synchronize_rcu_bh_expedited(void);
+extern void rcu_bh_qs(int cpu);
+extern long rcu_batches_completed_bh(void);
+extern void rcu_barrier_bh(void);
+extern void rcu_bh_force_quiescent_state(void);
+#else /* CONFIG_RCURING_BH */
+#define call_rcu_bh call_rcu_sched
+#define synchronize_rcu_bh synchronize_rcu_sched
+#define synchronize_rcu_bh_expedited synchronize_rcu_sched_expedited
+#define rcu_bh_qs(cpu) do { (void)(cpu); } while (0)
+#define rcu_batches_completed_bh rcu_batches_completed_sched
+#define rcu_barrier_bh rcu_barrier_sched
+#define rcu_bh_force_quiescent_state rcu_sched_force_quiescent_state
+#endif /* CONFIG_RCURING_BH */
+
+extern
+void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *));
+extern void synchronize_rcu_sched(void);
+#define synchronize_sched synchronize_rcu_sched
+extern void synchronize_rcu_sched_expedited(void);
+#define synchronize_sched_expedited synchronize_rcu_sched_expedited
+extern void rcu_note_context_switch(int cpu);
+extern long rcu_batches_completed_sched(void);
+extern void rcu_barrier_sched(void);
+extern void rcu_sched_force_quiescent_state(void);
+
+/*
+ * The @flags is valid only when RCURING_PREEMPT_SAVED is set.
+ *
+ * When preemptible rcu read site is preempted, we save RCURING_PREEMPT_SAVED,
+ * RCURING_PREEMPT_QS and this read site's locked_complete(only the last
+ * RCURING_COUNTERS_SHIFT bits) in the @flags.
+ *
+ * When the outmost rcu read site is closed, we will clear
+ * RCURING_PREEMPT_QS bit if it's saved, and let rcu system knows
+ * this task has passed a quiescent state and release the old read site's
+ * locked_complete.
+ */
+#define RCURING_PREEMPT_SAVED (1 << 31)
+#define RCURING_PREEMPT_QS (1 << 30)
+#define RCURING_PREEMPT_FLAGS (RCURING_PREEMPT_SAVED | RCURING_PREEMPT_QS)
+
+struct rcu_task_preempt {
+ unsigned int nesting;
+ unsigned int flags;
+};
+
+static inline void rcu_read_lock_preempt(struct rcu_task_preempt *rcu_task)
+{
+ rcu_task->nesting++;
+ barrier();
+}
+
+static inline void rcu_read_unlock_preempt(struct rcu_task_preempt *rcu_task)
+{
+ int nesting = rcu_task->nesting;
+
+ if (nesting == 1) {
+ unsigned int flags;
+
+ barrier();
+ flags = ACCESS_ONCE(rcu_task->flags);
+
+ if ((flags & RCURING_PREEMPT_FLAGS) == RCURING_PREEMPT_FLAGS) {
+ flags &= ~RCURING_PREEMPT_QS;
+ ACCESS_ONCE(rcu_task->flags) = flags;
+ }
+ }
+
+ barrier();
+ rcu_task->nesting = nesting - 1;
+}
+
+#ifdef CONFIG_RCURING_PREEMPT
+
+#ifdef __GENARATING_OFFSETS__
+#define task_rcu_preempt(p) ((struct rcu_task_preempt *)NULL)
+#else
+#include <generated/kernel-offsets.h>
+#define task_rcu_preempt(p) \
+ ((struct rcu_task_preempt *)(((void *)p) + TASK_RCU_PREEMPT))
+#endif
+
+#define current_rcu_preempt() task_rcu_preempt(current)
+#define __rcu_read_lock() rcu_read_lock_preempt(current_rcu_preempt())
+#define __rcu_read_unlock() rcu_read_unlock_preempt(current_rcu_preempt())
+extern
+void call_rcu_preempt(struct rcu_head *head, void (*func)(struct rcu_head *));
+#define call_rcu call_rcu_preempt
+extern void synchronize_rcu_preempt(void);
+#define synchronize_rcu synchronize_rcu_preempt
+extern void synchronize_rcu_preempt_expedited(void);
+#define synchronize_rcu_expedited synchronize_rcu_preempt_expedited
+extern long rcu_batches_completed_preempt(void);
+#define rcu_batches_completed rcu_batches_completed_preempt
+extern void rcu_barrier_preempt(void);
+#define rcu_barrier rcu_barrier_preempt
+extern void rcu_preempt_force_quiescent_state(void);
+#define rcu_force_quiescent_state rcu_preempt_force_quiescent_state
+
+struct task_struct;
+static inline void rcu_copy_process(struct task_struct *p)
+{
+ struct rcu_task_preempt *rcu_task = task_rcu_preempt(p);
+
+ rcu_task->nesting = 0;
+ rcu_task->flags = 0;
+}
+
+static inline void exit_rcu(void)
+{
+ if (current_rcu_preempt()->nesting) {
+ WARN_ON(1);
+ current_rcu_preempt()->nesting = 0;
+ }
+}
+
+#else /*CONFIG_RCURING_PREEMPT */
+
+#define __rcu_read_lock() __rcu_read_lock_sched();
+#define __rcu_read_unlock() __rcu_read_unlock_sched();
+#define call_rcu call_rcu_sched
+#define synchronize_rcu synchronize_rcu_sched
+#define synchronize_rcu_expedited synchronize_rcu_sched_expedited
+#define rcu_batches_completed rcu_batches_completed_sched
+#define rcu_barrier rcu_barrier_sched
+#define rcu_force_quiescent_state rcu_sched_force_quiescent_state
+
+static inline void rcu_copy_process(struct task_struct *p) {}
+static inline void exit_rcu(void) {}
+#endif /* CONFIG_RCURING_PREEMPT */
+
+#ifdef CONFIG_NO_HZ
+extern void rcu_kernel_enter(void);
+extern void rcu_kernel_exit(void);
+
+static inline void rcu_nmi_enter(void) { rcu_kernel_enter(); }
+static inline void rcu_nmi_exit(void) { rcu_kernel_exit(); }
+static inline void rcu_irq_enter(void) { rcu_kernel_enter(); }
+static inline void rcu_irq_exit(void) { rcu_kernel_exit(); }
+extern void rcu_enter_nohz(void);
+extern void rcu_exit_nohz(void);
+#else
+static inline void rcu_nmi_enter(void) {}
+static inline void rcu_nmi_exit(void) {}
+static inline void rcu_irq_enter(void) {}
+static inline void rcu_irq_exit(void) {}
+static inline void rcu_enter_nohz(void) {}
+static inline void rcu_exit_nohz(void) {}
+#endif
+
+extern int rcu_needs_cpu(int cpu);
+extern void rcu_check_callbacks(int cpu, int user);
+
+extern void rcu_scheduler_starting(void);
+extern int rcu_scheduler_active __read_mostly;
+
+#endif /* __LINUX_RCURING_H */
diff --git a/include/linux/sched.h b/include/linux/sched.h
index e18473f..15b332d 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -1211,6 +1211,10 @@ struct task_struct {
struct rcu_node *rcu_blocked_node;
#endif /* #ifdef CONFIG_TREE_PREEMPT_RCU */
+#ifdef CONFIG_RCURING_PREEMPT
+ struct rcu_task_preempt rcu_task_preempt;
+#endif
+
#if defined(CONFIG_SCHEDSTATS) || defined(CONFIG_TASK_DELAY_ACCT)
struct sched_info sched_info;
#endif
@@ -1757,7 +1761,7 @@ static inline void rcu_copy_process(struct task_struct *p)
INIT_LIST_HEAD(&p->rcu_node_entry);
}
-#else
+#elif !defined(CONFIG_RCURING)
static inline void rcu_copy_process(struct task_struct *p)
{
diff --git a/init/Kconfig b/init/Kconfig
index a619a1a..48e58d9 100644
--- a/init/Kconfig
+++ b/init/Kconfig
@@ -374,6 +374,9 @@ config TINY_PREEMPT_RCU
for real-time UP systems. This option greatly reduces the
memory footprint of RCU.
+config RCURING
+ bool "Multi-GP ring based RCU"
+
endchoice
config PREEMPT_RCU
@@ -450,6 +453,35 @@ config TREE_RCU_TRACE
TREE_PREEMPT_RCU implementations, permitting Makefile to
trivially select kernel/rcutree_trace.c.
+config RCURING_BH
+ bool "RCU for bh"
+ default y
+ depends on RCURING && !PREEMPT_RT
+ help
+ If Y, use a independent rcu domain for rcu_bh.
+ If N, rcu_bh will map to rcu_sched(except rcu_read_lock_bh,
+ rcu_read_unlock_bh).
+
+config RCURING_BH_PREEMPT
+ bool
+ depends on PREEMPT_RT
+ help
+ rcu_bh will map to rcu_preempt.
+
+config RCURING_PREEMPT
+ bool "RCURING based reemptible RCU"
+ default n
+ depends on RCURING && PREEMPT
+ help
+ If Y, normal rcu functionality will map to rcu_preempt.
+ If N, normal rcu functionality will map to rcu_sched.
+
+config RCURING_COUNTERS_SHIFT
+ int "RCURING's counter shift"
+ range 1 10
+ depends on RCURING
+ default 4
+
endmenu # "RCU Subsystem"
config IKCONFIG
diff --git a/kernel/Makefile b/kernel/Makefile
index 92b7420..66eab8c 100644
--- a/kernel/Makefile
+++ b/kernel/Makefile
@@ -86,6 +86,7 @@ obj-$(CONFIG_TREE_PREEMPT_RCU) += rcutree.o
obj-$(CONFIG_TREE_RCU_TRACE) += rcutree_trace.o
obj-$(CONFIG_TINY_RCU) += rcutiny.o
obj-$(CONFIG_TINY_PREEMPT_RCU) += rcutiny.o
+obj-$(CONFIG_RCURING) += rcuring.o
obj-$(CONFIG_RELAY) += relay.o
obj-$(CONFIG_SYSCTL) += utsname_sysctl.o
obj-$(CONFIG_TASK_DELAY_ACCT) += delayacct.o
diff --git a/kernel/kernel-offsets.c b/kernel/kernel-offsets.c
new file mode 100644
index 0000000..0d30817
--- /dev/null
+++ b/kernel/kernel-offsets.c
@@ -0,0 +1,20 @@
+/*
+ * Generate definitions needed by assembly language modules.
+ *
+ * Copyright (C) 2008-2010 Lai Jiangshan
+ */
+
+#define __GENERATING_KERNEL_OFFSETS__
+#include <linux/rcupdate.h>
+#include <linux/sched.h>
+#include <linux/kbuild.h>
+
+void foo(void);
+
+void foo(void)
+{
+#ifdef CONFIG_RCURING_PREEMPT
+ OFFSET(TASK_RCU_PREEMPT, task_struct, rcu_task_preempt);
+#endif
+}
+
diff --git a/kernel/rcuring.c b/kernel/rcuring.c
new file mode 100644
index 0000000..e7cedb5
--- /dev/null
+++ b/kernel/rcuring.c
@@ -0,0 +1,1002 @@
+/*
+ * Read-Copy Update mechanism for mutual exclusion
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright IBM Corporation, 2008
+ * Copyright Fujitsu 2008-2010 Lai Jiangshan
+ *
+ * Authors: Dipankar Sarma <dipankar@in.ibm.com>
+ * Manfred Spraul <manfred@colorfullife.com>
+ * Paul E. McKenney <paulmck@linux.vnet.ibm.com> Hierarchical version
+ * Lai Jiangshan <laijs@cn.fujitsu.com> RCURING version
+ */
+
+#include <linux/rcupdate.h>
+#include <linux/percpu.h>
+#include <asm/atomic.h>
+#include <linux/spinlock.h>
+#include <linux/module.h>
+#include <linux/interrupt.h>
+#include <linux/cpu.h>
+
+#define RCURING_IDX_SHIFT CONFIG_RCURING_COUNTERS_SHIFT
+#define RCURING_IDX_MAX (1L << RCURING_IDX_SHIFT)
+#define RCURING_IDX_MASK (RCURING_IDX_MAX - 1)
+#define RCURING_IDX(complete) ((complete) & RCURING_IDX_MASK)
+
+struct rcuring {
+ atomic_t ctr[RCURING_IDX_MAX];
+ long curr_complete;
+ long done_complete;
+
+ raw_spinlock_t lock;
+};
+
+/* It is long history that we use -300 as an initial complete */
+#define RCURING_INIT_COMPLETE -300L
+#define RCURING_INIT(name) \
+{ \
+ {[RCURING_IDX(RCURING_INIT_COMPLETE)] = ATOMIC_INIT(1),}, \
+ RCURING_INIT_COMPLETE, \
+ RCURING_INIT_COMPLETE - 1, \
+ __RAW_SPIN_LOCK_UNLOCKED(name.lock), \
+}
+
+static inline
+int rcuring_ctr_read(struct rcuring *rr, unsigned long complete)
+{
+ int res;
+
+ /* atomic_read() does not ensure to imply barrier(). */
+ barrier();
+ res = atomic_read(rr->ctr + RCURING_IDX(complete));
+ barrier();
+
+ return res;
+}
+
+void __rcuring_advance_done_complete(struct rcuring *rr)
+{
+ long wait_complete = rr->done_complete + 1;
+
+ if (!rcuring_ctr_read(rr, wait_complete)) {
+ do {
+ wait_complete++;
+ } while (!rcuring_ctr_read(rr, wait_complete));
+
+ ACCESS_ONCE(rr->done_complete) = wait_complete - 1;
+ }
+}
+
+static inline
+int rcuring_could_advance_done_complete(struct rcuring *rr)
+{
+ return !rcuring_ctr_read(rr, rr->done_complete + 1);
+}
+
+static inline
+void rcuring_advance_done_complete(struct rcuring *rr)
+{
+ if (rcuring_could_advance_done_complete(rr)) {
+ if (__raw_spin_trylock(&rr->lock)) {
+ __rcuring_advance_done_complete(rr);
+ __raw_spin_unlock(&rr->lock);
+ }
+ }
+}
+
+void __rcuring_advance_curr_complete(struct rcuring *rr, long next_complete)
+{
+ long curr_complete = rr->curr_complete;
+
+ if (curr_complete + 1 == next_complete) {
+ if (!rcuring_ctr_read(rr, next_complete)) {
+ ACCESS_ONCE(rr->curr_complete) = next_complete;
+ smp_mb__before_atomic_inc();
+ atomic_inc(rr->ctr + RCURING_IDX(next_complete));
+ smp_mb__after_atomic_inc();
+ atomic_dec(rr->ctr + RCURING_IDX(curr_complete));
+ }
+ }
+}
+
+static inline
+int rcuring_could_advance_complete(struct rcuring *rr, long next_complete)
+{
+ if (rcuring_could_advance_done_complete(rr))
+ return 1;
+
+ if (rr->curr_complete + 1 == next_complete) {
+ if (!rcuring_ctr_read(rr, next_complete))
+ return 1;
+ }
+
+ return 0;
+}
+
+static inline
+void rcuring_advance_complete(struct rcuring *rr, long next_complete)
+{
+ if (rcuring_could_advance_complete(rr, next_complete)) {
+ if (__raw_spin_trylock(&rr->lock)) {
+ __rcuring_advance_done_complete(rr);
+ __rcuring_advance_curr_complete(rr, next_complete);
+ __raw_spin_unlock(&rr->lock);
+ }
+ }
+}
+
+static inline
+void rcuring_advance_complete_force(struct rcuring *rr, long next_complete)
+{
+ if (rcuring_could_advance_complete(rr, next_complete)) {
+ __raw_spin_lock(&rr->lock);
+ __rcuring_advance_done_complete(rr);
+ __rcuring_advance_curr_complete(rr, next_complete);
+ __raw_spin_unlock(&rr->lock);
+ }
+}
+
+static inline long __locked_complete_adjust(int locked_idx, long complete)
+{
+ long locked_complete;
+
+ locked_complete = (complete & ~RCURING_IDX_MASK) | (long)locked_idx;
+ if (locked_complete - complete <= 0)
+ return locked_complete;
+ else
+ return locked_complete - RCURING_IDX_MAX;
+}
+
+static long rcuring_lock(struct rcuring *rr)
+{
+ long locked_complete;
+ int idx;
+
+ for (;;) {
+ idx = RCURING_IDX(ACCESS_ONCE(rr->curr_complete));
+ if (likely(atomic_inc_not_zero(rr->ctr + idx)))
+ break;
+ }
+ smp_mb__after_atomic_inc();
+
+ locked_complete = ACCESS_ONCE(rr->curr_complete);
+ if (likely(RCURING_IDX(locked_complete) == idx))
+ return locked_complete;
+ else
+ return __locked_complete_adjust(idx, locked_complete);
+}
+
+static void rcuring_unlock(struct rcuring *rr, long locked_complete)
+{
+ smp_mb__before_atomic_dec();
+
+ atomic_dec(rr->ctr + RCURING_IDX(locked_complete));
+}
+
+static inline
+void rcuring_dup_lock(struct rcuring *rr, long locked_complete)
+{
+ atomic_inc(rr->ctr + RCURING_IDX(locked_complete));
+}
+
+static inline
+long rcuring_advance_lock(struct rcuring *rr, long locked_complete)
+{
+ long new_locked_complete = locked_complete;
+
+ if (locked_complete != rr->curr_complete) {
+ /*
+ * Lock the new complete at first, and then release
+ * the old one. It prevents errors when NMI/SMI occurs
+ * after rcuring_unlock() - we still lock it.
+ */
+ new_locked_complete = rcuring_lock(rr);
+ rcuring_unlock(rr, locked_complete);
+ }
+
+ return new_locked_complete;
+}
+
+static inline long rcuring_get_done_complete(struct rcuring *rr)
+{
+ return ACCESS_ONCE(rr->done_complete);
+}
+
+static inline long rcuring_get_curr_complete(struct rcuring *rr)
+{
+ return ACCESS_ONCE(rr->curr_complete);
+}
+
+struct rcu_batch {
+ struct rcu_head *list, **tail;
+};
+
+static void rcu_batch_merge(struct rcu_batch *to, struct rcu_batch *from)
+{
+ if (from->list != NULL) {
+ *to->tail = from->list;
+ to->tail = from->tail;
+
+ from->list = NULL;
+ from->tail = &from->list;
+ }
+}
+
+static void rcu_batch_add(struct rcu_batch *batch, struct rcu_head *new)
+{
+ *batch->tail = new;
+ batch->tail = &new->next;
+}
+
+struct rcu_data {
+ long locked_complete;
+
+ /*
+ * callbacks are in batches (done_complete, curr_complete]
+ * curr_complete - done_complete >= 0
+ * curr_complete - done_complete <= RCURING_IDX_MAX
+ * domain's curr_complete - curr_complete >= 0
+ * domain's done_complete - done_complete >= 0
+ *
+ * curr_complete == done_complete just means @batch is empty.
+ * we have at least one callback in batch[RCURING_IDX(done_complete)]
+ * if curr_complete != done_complete
+ */
+ long curr_complete;
+ long done_complete;
+
+ struct rcu_batch batch[RCURING_IDX_MAX];
+ struct rcu_batch done_batch;
+
+ unsigned int qlen;
+ unsigned long jiffies;
+ unsigned long stall_jiffies;
+};
+
+struct rcu_domain {
+ struct rcuring rcuring;
+ const char *domain_name;
+ struct rcu_data __percpu *rcu_data;
+
+ spinlock_t lock; /* this lock is for fqs or other misc things */
+ long fqs_complete;
+ void (*force_quiescent_state)(struct rcu_domain *domain,
+ long fqs_complete);
+};
+
+#define EXPEDITED_GP_RESERVED_RECOMMEND (RCURING_IDX_SHIFT - 1)
+
+static inline long gp_reserved(struct rcu_domain *domain)
+{
+ return EXPEDITED_GP_RESERVED_RECOMMEND;
+}
+
+static inline void __init rcu_data_init(struct rcu_data *rdp)
+{
+ int idx;
+
+ for (idx = 0; idx < RCURING_IDX_MAX; idx++)
+ rdp->batch[idx].tail = &rdp->batch[idx].list;
+
+ rdp->done_batch.tail = &rdp->done_batch.list;
+}
+
+static void do_advance_callbacks(struct rcu_data *rdp, long done_complete)
+{
+ int idx;
+
+ while (rdp->done_complete != done_complete) {
+ rdp->done_complete++;
+ idx = RCURING_IDX(rdp->done_complete);
+ rcu_batch_merge(&rdp->done_batch, rdp->batch + idx);
+ }
+}
+
+/* Is value in the range (@left_open, @right_close] */
+static inline bool in_range(long left_open, long right_close, long value)
+{
+ return left_open - value < 0 && value - right_close <= 0;
+}
+
+static void advance_callbacks(struct rcu_data *rdp, long done_complete,
+ long curr_complete)
+{
+ if (in_range(done_complete, curr_complete, rdp->curr_complete)) {
+ do_advance_callbacks(rdp, done_complete);
+ } else {
+ do_advance_callbacks(rdp, rdp->curr_complete);
+ rdp->curr_complete = done_complete;
+ rdp->done_complete = done_complete;
+ }
+}
+
+static void __force_quiescent_state(struct rcu_domain *domain,
+ long fqs_complete)
+{
+ int cpu;
+ long fqs_complete_old;
+ long curr_complete = rcuring_get_curr_complete(&domain->rcuring);
+ long done_complete = rcuring_get_done_complete(&domain->rcuring);
+
+ spin_lock(&domain->lock);
+
+ fqs_complete_old = domain->fqs_complete;
+ if (!in_range(done_complete, curr_complete, fqs_complete_old))
+ fqs_complete_old = done_complete;
+
+ if (fqs_complete_old - fqs_complete >= 0) {
+ spin_unlock(&domain->lock);
+ return;
+ }
+
+ domain->fqs_complete = fqs_complete;
+ spin_unlock(&domain->lock);
+
+ for_each_online_cpu(cpu) {
+ struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu);
+ long locked_complete = ACCESS_ONCE(rdp->locked_complete);
+
+ if (!in_range(fqs_complete_old, fqs_complete, locked_complete))
+ continue;
+
+ if (cpu == smp_processor_id())
+ set_tsk_need_resched(current);
+ else
+ smp_send_reschedule(cpu);
+ }
+}
+
+static void force_quiescent_state(struct rcu_domain *domain, long fqs_complete)
+{
+ domain->force_quiescent_state(domain, fqs_complete);
+}
+
+static
+long prepare_for_new_callback(struct rcu_domain *domain, struct rcu_data *rdp)
+{
+ long curr_complete, done_complete;
+ struct rcuring *rr = &domain->rcuring;
+
+ smp_mb();
+ curr_complete = rcuring_get_curr_complete(rr);
+ done_complete = rcuring_get_done_complete(rr);
+
+ advance_callbacks(rdp, done_complete, curr_complete);
+ rdp->curr_complete = curr_complete;
+
+ if (!rdp->qlen) {
+ rdp->jiffies = jiffies;
+ rdp->stall_jiffies = jiffies;
+ }
+
+ return curr_complete;
+}
+
+static long __call_rcu(struct rcu_domain *domain, struct rcu_head *head,
+ void (*func)(struct rcu_head *))
+{
+ struct rcu_data *rdp;
+ long curr_complete;
+
+ head->next = NULL;
+ head->func = func;
+
+ rdp = this_cpu_ptr(domain->rcu_data);
+ curr_complete = prepare_for_new_callback(domain, rdp);
+ rcu_batch_add(rdp->batch + RCURING_IDX(curr_complete), head);
+ rdp->qlen++;
+
+ return curr_complete;
+}
+
+static void print_rcuring_stall(struct rcu_domain *domain)
+{
+ struct rcuring *rr = &domain->rcuring;
+ unsigned long curr_complete = rcuring_get_curr_complete(rr);
+ unsigned long done_complete = rcuring_get_done_complete(rr);
+ int cpu;
+
+ printk(KERN_ERR "RCU is stall, cpu=%d, domain_name=%s\n", smp_processor_id(),
+ domain->domain_name);
+
+ printk(KERN_ERR "domain's complete(done/curr)=%ld/%ld\n",
+ curr_complete, done_complete);
+ printk(KERN_ERR "curr_ctr=%d, wait_ctr=%d\n",
+ rcuring_ctr_read(rr, curr_complete),
+ rcuring_ctr_read(rr, done_complete + 1));
+
+ for_each_online_cpu(cpu) {
+ struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu);
+ printk(KERN_ERR "cpu=%d, qlen=%d, complete(locked/dome/curr/)="
+ "%ld/%ld/%ld\n", cpu, rdp->qlen,
+ rdp->locked_complete, rdp->done_complete,
+ rdp->curr_complete);
+ }
+}
+
+static void __rcu_check_callbacks(struct rcu_domain *domain,
+ struct rcu_data *rdp, int in_rcu_softirq)
+{
+ long curr_complete, done_complete;
+ struct rcuring *rr = &domain->rcuring;
+
+ if (!rdp->qlen)
+ return;
+
+ rcuring_advance_done_complete(rr);
+
+ curr_complete = rcuring_get_curr_complete(rr);
+ done_complete = ACCESS_ONCE(rr->done_complete);
+ advance_callbacks(rdp, done_complete, curr_complete);
+
+ if (rdp->curr_complete == curr_complete
+ && rdp->batch[RCURING_IDX(rdp->curr_complete)].list) {
+ long max_gp_allow = RCURING_IDX_MAX - gp_reserved(domain);
+
+ if (curr_complete - done_complete <= max_gp_allow)
+ rcuring_advance_complete(rr, curr_complete + 1);
+ }
+
+ if (rdp->done_batch.list) {
+ if (!in_rcu_softirq)
+ raise_softirq(RCU_SOFTIRQ);
+ } else {
+ if (jiffies - rdp->jiffies > 10) {
+ force_quiescent_state(domain, rdp->curr_complete);
+ rdp->jiffies = jiffies;
+ }
+ if (jiffies - rdp->stall_jiffies > 2 * HZ) {
+ print_rcuring_stall(domain);
+ rdp->stall_jiffies = jiffies;
+ }
+ }
+}
+
+static void rcu_do_batch(struct rcu_domain *domain, struct rcu_data *rdp)
+{
+ int count = 0;
+ struct rcu_head *list, *next;
+
+ list = rdp->done_batch.list;
+
+ if (list) {
+ rdp->done_batch.list = NULL;
+ rdp->done_batch.tail = &rdp->done_batch.list;
+
+ local_irq_enable();
+
+ smp_mb();
+
+ while (list) {
+ next = list->next;
+ prefetch(next);
+ list->func(list);
+ count++;
+ list = next;
+ }
+ local_irq_disable();
+
+ rdp->qlen -= count;
+ rdp->jiffies = jiffies;
+ }
+}
+
+static int __rcu_needs_cpu(struct rcu_data *rdp)
+{
+ return !!rdp->qlen;
+}
+
+static inline
+void __rcu_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp)
+{
+ rdp->locked_complete = rcuring_advance_lock(&domain->rcuring,
+ rdp->locked_complete);
+}
+
+static inline void rcu_preempt_save_complete(struct rcu_task_preempt *rcu_task,
+ long locked_complete)
+{
+ unsigned int new_flags;
+
+ new_flags = RCURING_PREEMPT_FLAGS | RCURING_IDX(locked_complete);
+ ACCESS_ONCE(rcu_task->flags) = new_flags;
+}
+
+/*
+ * Every cpu hold a lock_complete. But for preempt rcu, any task may also
+ * hold a lock_complete. This function increases lock_complete for the current
+ * cpu and check(dup_lock_and_save or release or advance) the lock_complete of
+ * the current task.
+ */
+static inline
+void __rcu_preempt_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp,
+ struct rcu_task_preempt *rcu_task)
+{
+ long locked_complete = rdp->locked_complete;
+ unsigned int flags = ACCESS_ONCE(rcu_task->flags);
+
+ if (!(flags & RCURING_PREEMPT_SAVED)) {
+ BUG_ON(flags & RCURING_PREEMPT_QS);
+ if (rcu_task->nesting) {
+ /* dup_lock_and_save current task's lock_complete */
+ rcuring_dup_lock(&domain->rcuring, locked_complete);
+ rcu_preempt_save_complete(rcu_task, locked_complete);
+ }
+ } else if (!(rcu_task->nesting)) {
+ /* release current task's lock_complete */
+ rcuring_unlock(&domain->rcuring, (long)flags);
+ ACCESS_ONCE(rcu_task->flags) = 0;
+ } else if (!(flags & RCURING_PREEMPT_QS)) {
+ /* advance_and_save current task's lock_complete */
+ if (RCURING_IDX(locked_complete) != RCURING_IDX((long)flags)) {
+ rcuring_unlock(&domain->rcuring, (long)flags);
+ rcuring_dup_lock(&domain->rcuring, locked_complete);
+ }
+ rcu_preempt_save_complete(rcu_task, locked_complete);
+ }
+
+ /* increases lock_complete for the current cpu */
+ rdp->locked_complete = rcuring_advance_lock(&domain->rcuring,
+ locked_complete);
+}
+
+static void __synchronize_rcu(struct rcu_domain *domain, int expedited)
+{
+ struct rcu_synchronize rcu;
+ long complete;
+
+ init_completion(&rcu.completion);
+
+ local_irq_disable();
+ complete = __call_rcu(domain, &rcu.head, wakeme_after_rcu);
+
+ if (expedited) {
+ /*
+ * Fore a new gp to be started immediately(can use reserved gp).
+ * But if all gp are used, it will fail to start new gp,
+ * and we have to wait until some new gps available.
+ */
+ rcuring_advance_complete_force(&domain->rcuring,
+ complete + 1);
+
+ /* Fore qs and expedite the new gp */
+ force_quiescent_state(domain, complete);
+ }
+ local_irq_enable();
+
+ wait_for_completion(&rcu.completion);
+}
+
+static inline long __rcu_batches_completed(struct rcu_domain *domain)
+{
+ return rcuring_get_done_complete(&domain->rcuring);
+}
+
+#ifdef CONFIG_RCURING_BH
+#define gen_code_for_bh(gen_code, args...) gen_code(bh, ##args)
+#else
+#define gen_code_for_bh(gen_code, args...)
+#endif
+#define gen_code_for_sched(gen_code, args...) gen_code(sched, ##args)
+#ifdef CONFIG_RCURING_PREEMPT
+#define gen_code_for_preempt(gen_code, args...) gen_code(preempt, ##args)
+#else
+#define gen_code_for_preempt(gen_code, args...)
+#endif
+
+#define GEN_CODES(gen_code, args...) \
+ gen_code_for_bh(gen_code, ##args) \
+ gen_code_for_sched(gen_code, ##args) \
+ gen_code_for_preempt(gen_code, ##args) \
+
+#define gen_basic_code(domain) \
+ \
+static void force_quiescent_state_##domain(struct rcu_domain *domain, \
+ long fqs_complete); \
+ \
+static DEFINE_PER_CPU(struct rcu_data, rcu_data_##domain); \
+ \
+struct rcu_domain rcu_domain_##domain = \
+{ \
+ .rcuring = RCURING_INIT(rcu_domain_##domain.rcuring), \
+ .domain_name = #domain, \
+ .rcu_data = &rcu_data_##domain, \
+ .lock = __SPIN_LOCK_UNLOCKED(rcu_domain_##domain.lock), \
+ .force_quiescent_state = force_quiescent_state_##domain, \
+}; \
+ \
+void call_rcu_##domain(struct rcu_head *head, \
+ void (*func)(struct rcu_head *)) \
+{ \
+ unsigned long flags; \
+ \
+ local_irq_save(flags); \
+ __call_rcu(&rcu_domain_##domain, head, func); \
+ local_irq_restore(flags); \
+} \
+EXPORT_SYMBOL_GPL(call_rcu_##domain); \
+ \
+void synchronize_rcu_##domain(void) \
+{ \
+ __synchronize_rcu(&rcu_domain_##domain, 0); \
+} \
+EXPORT_SYMBOL_GPL(synchronize_rcu_##domain); \
+ \
+void synchronize_rcu_##domain##_expedited(void) \
+{ \
+ __synchronize_rcu(&rcu_domain_##domain, 1); \
+} \
+EXPORT_SYMBOL_GPL(synchronize_rcu_##domain##_expedited); \
+ \
+long rcu_batches_completed_##domain(void) \
+{ \
+ return __rcu_batches_completed(&rcu_domain_##domain); \
+} \
+EXPORT_SYMBOL_GPL(rcu_batches_completed_##domain); \
+ \
+void rcu_##domain##_force_quiescent_state(void) \
+{ \
+ force_quiescent_state(&rcu_domain_##domain, 0); \
+} \
+EXPORT_SYMBOL_GPL(rcu_##domain##_force_quiescent_state);
+
+GEN_CODES(gen_basic_code)
+
+static DEFINE_PER_CPU(int, kernel_count);
+
+#define LOCK_COMPLETE(domain) \
+ long complete_##domain = rcuring_lock(&rcu_domain_##domain.rcuring);
+#define SAVE_COMPLETE(domain) \
+ per_cpu(rcu_data_##domain, cpu).locked_complete = complete_##domain;
+#define LOAD_COMPLETE(domain) \
+ int complete_##domain = per_cpu(rcu_data_##domain, cpu).locked_complete;
+#define RELEASE_COMPLETE(domain) \
+ rcuring_unlock(&rcu_domain_##domain.rcuring, complete_##domain);
+
+static void __rcu_kernel_enter_outmost(int cpu)
+{
+ GEN_CODES(LOCK_COMPLETE)
+
+ barrier();
+ per_cpu(kernel_count, cpu) = 1;
+ barrier();
+
+ GEN_CODES(SAVE_COMPLETE)
+}
+
+static void __rcu_kernel_exit_outmost(int cpu)
+{
+ GEN_CODES(LOAD_COMPLETE)
+
+ barrier();
+ per_cpu(kernel_count, cpu) = 0;
+ barrier();
+
+ GEN_CODES(RELEASE_COMPLETE)
+}
+
+#ifdef CONFIG_RCURING_BH
+static void force_quiescent_state_bh(struct rcu_domain *domain,
+ long fqs_complete)
+{
+ __force_quiescent_state(domain, fqs_complete);
+}
+
+static inline void rcu_bh_qsctr_inc_irqoff(int cpu)
+{
+ __rcu_qsctr_inc(&rcu_domain_bh, &per_cpu(rcu_data_bh, cpu));
+}
+
+void rcu_bh_qs(int cpu)
+{
+ unsigned long flags;
+
+ local_irq_save(flags);
+ rcu_bh_qsctr_inc_irqoff(cpu);
+ local_irq_restore(flags);
+}
+
+#else /* CONFIG_RCURING_BH */
+static inline void rcu_bh_qsctr_inc_irqoff(int cpu) { (void)(cpu); }
+#endif /* CONFIG_RCURING_BH */
+
+static void force_quiescent_state_sched(struct rcu_domain *domain,
+ long fqs_complete)
+{
+ __force_quiescent_state(domain, fqs_complete);
+}
+
+static inline void rcu_sched_qsctr_inc_irqoff(int cpu)
+{
+ __rcu_qsctr_inc(&rcu_domain_sched, &per_cpu(rcu_data_sched, cpu));
+}
+
+#ifdef CONFIG_RCURING_PREEMPT
+static void force_quiescent_state_preempt(struct rcu_domain *domain,
+ long fqs_complete)
+{
+ /* To Be Implemented */
+}
+
+static inline void rcu_preempt_qsctr_inc_irqoff(int cpu)
+{
+ __rcu_preempt_qsctr_inc(&rcu_domain_preempt,
+ &per_cpu(rcu_data_preempt, cpu),
+ ¤t->rcu_task_preempt);
+}
+
+#else /* CONFIG_RCURING_PREEMPT */
+static inline void rcu_preempt_qsctr_inc_irqoff(int cpu)
+{
+ (void)(cpu);
+ (void)(__rcu_preempt_qsctr_inc);
+}
+#endif /* CONFIG_RCURING_PREEMPT */
+
+#define gen_needs_cpu(domain, cpu) \
+ if (__rcu_needs_cpu(&per_cpu(rcu_data_##domain, cpu))) \
+ return 1;
+int rcu_needs_cpu(int cpu)
+{
+ GEN_CODES(gen_needs_cpu, cpu)
+ return 0;
+}
+
+#define call_func(domain, func, args...) \
+ func(&rcu_domain_##domain, &__get_cpu_var(rcu_data_##domain), ##args);
+
+/*
+ * Note a context switch. This is a quiescent state for RCU-sched,
+ * and requires special handling for preemptible RCU.
+ */
+void rcu_note_context_switch(int cpu)
+{
+ unsigned long flags;
+
+#ifdef CONFIG_HOTPLUG_CPU
+ /*
+ * The stoper thread need to schedule() to the idle thread
+ * after CPU_DYING.
+ */
+ if (unlikely(!per_cpu(kernel_count, cpu))) {
+ BUG_ON(cpu_online(cpu));
+ return;
+ }
+#endif
+
+ local_irq_save(flags);
+ rcu_bh_qsctr_inc_irqoff(cpu);
+ rcu_sched_qsctr_inc_irqoff(cpu);
+ rcu_preempt_qsctr_inc_irqoff(cpu);
+ local_irq_restore(flags);
+}
+
+static int rcu_cpu_idle(int cpu)
+{
+ return idle_cpu(cpu) && rcu_scheduler_active &&
+ !in_softirq() && hardirq_count() <= (1 << HARDIRQ_SHIFT);
+}
+
+void rcu_check_callbacks(int cpu, int user)
+{
+ unsigned long flags;
+
+ local_irq_save(flags);
+
+ if (user || rcu_cpu_idle(cpu)) {
+ rcu_bh_qsctr_inc_irqoff(cpu);
+ rcu_sched_qsctr_inc_irqoff(cpu);
+ rcu_preempt_qsctr_inc_irqoff(cpu);
+ } else if (!in_softirq())
+ rcu_bh_qsctr_inc_irqoff(cpu);
+
+ GEN_CODES(call_func, __rcu_check_callbacks, 0)
+ local_irq_restore(flags);
+}
+
+static void rcu_process_callbacks(struct softirq_action *unused)
+{
+ local_irq_disable();
+ GEN_CODES(call_func, __rcu_check_callbacks, 1)
+ GEN_CODES(call_func, rcu_do_batch)
+ local_irq_enable();
+}
+
+static void __cpuinit __rcu_offline_cpu(struct rcu_domain *domain,
+ struct rcu_data *off_rdp, struct rcu_data *rdp)
+{
+ if (off_rdp->qlen > 0) {
+ unsigned long flags;
+ long curr_complete;
+
+ /* move all callbacks in @off_rdp to @off_rdp->done_batch */
+ do_advance_callbacks(off_rdp, off_rdp->curr_complete);
+
+ /* move all callbacks in @off_rdp to this cpu(@rdp) */
+ local_irq_save(flags);
+ curr_complete = prepare_for_new_callback(domain, rdp);
+ rcu_batch_merge(rdp->batch + RCURING_IDX(curr_complete),
+ &off_rdp->done_batch);
+ rdp->qlen += off_rdp->qlen;
+ local_irq_restore(flags);
+
+ off_rdp->qlen = 0;
+ }
+}
+
+#define gen_offline(domain, cpu, recieve_cpu) \
+ __rcu_offline_cpu(&rcu_domain_##domain, \
+ &per_cpu(rcu_data_##domain, cpu), \
+ &per_cpu(rcu_data_##domain, recieve_cpu));
+
+static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
+ unsigned long action, void *hcpu)
+{
+ int cpu = (long)hcpu;
+ int recieve_cpu;
+
+ (void)(recieve_cpu);
+ (void)(__rcu_offline_cpu);
+ (void)(__rcu_kernel_exit_outmost);
+
+ switch (action) {
+#ifdef CONFIG_HOTPLUG_CPU
+ case CPU_DYING:
+ case CPU_DYING_FROZEN:
+ /*
+ * the machine is stopped now, we can access to
+ * any other cpu data.
+ * */
+ recieve_cpu = cpumask_any_but(cpu_online_mask, cpu);
+ GEN_CODES(gen_offline, cpu, recieve_cpu)
+ __rcu_kernel_exit_outmost(cpu);
+ break;
+#endif
+ case CPU_STARTING:
+ case CPU_STARTING_FROZEN:
+ __rcu_kernel_enter_outmost(cpu);
+ default:
+ break;
+ }
+ return NOTIFY_OK;
+}
+
+#define rcu_init_domain_data(domain, cpu) \
+ for_each_possible_cpu(cpu) \
+ rcu_data_init(&per_cpu(rcu_data_##domain, cpu));
+
+void __init rcu_init(void)
+{
+ int cpu;
+
+ GEN_CODES(rcu_init_domain_data, cpu)
+
+ __rcu_kernel_enter_outmost(raw_smp_processor_id());
+ open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
+ cpu_notifier(rcu_cpu_notify, 0);
+}
+
+int rcu_scheduler_active __read_mostly;
+EXPORT_SYMBOL_GPL(rcu_scheduler_active);
+
+/*
+ * This function is invoked towards the end of the scheduler's initialization
+ * process. Before this is called, the idle task might contain
+ * RCU read-side critical sections (during which time, this idle
+ * task is booting the system). After this function is called, the
+ * idle tasks are prohibited from containing RCU read-side critical
+ * sections. This function also enables RCU lockdep checking.
+ */
+void rcu_scheduler_starting(void)
+{
+ rcu_scheduler_active = 1;
+}
+
+#ifdef CONFIG_NO_HZ
+
+void rcu_kernel_enter(void)
+{
+ unsigned long flags;
+
+ raw_local_irq_save(flags);
+ if (__get_cpu_var(kernel_count) == 0)
+ __rcu_kernel_enter_outmost(raw_smp_processor_id());
+ else
+ __get_cpu_var(kernel_count)++;
+ raw_local_irq_restore(flags);
+}
+
+void rcu_kernel_exit(void)
+{
+ unsigned long flags;
+
+ raw_local_irq_save(flags);
+ if (__get_cpu_var(kernel_count) == 1)
+ __rcu_kernel_exit_outmost(raw_smp_processor_id());
+ else
+ __get_cpu_var(kernel_count)--;
+ raw_local_irq_restore(flags);
+}
+
+void rcu_enter_nohz(void)
+{
+ unsigned long flags;
+
+ raw_local_irq_save(flags);
+ __rcu_kernel_exit_outmost(raw_smp_processor_id());
+ raw_local_irq_restore(flags);
+}
+
+void rcu_exit_nohz(void)
+{
+ unsigned long flags;
+
+ raw_local_irq_save(flags);
+ __rcu_kernel_enter_outmost(raw_smp_processor_id());
+ raw_local_irq_restore(flags);
+}
+#endif /* CONFIG_NO_HZ */
+
+static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head);
+static struct completion rcu_barrier_completion;
+static struct kref rcu_barrier_wait_ref;
+static DEFINE_MUTEX(rcu_barrier_mutex);
+
+static void rcu_barrier_release_ref(struct kref *notused)
+{
+ complete(&rcu_barrier_completion);
+}
+
+static void rcu_barrier_callback(struct rcu_head *notused)
+{
+ kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref);
+}
+
+static void rcu_barrier_func(void *data)
+{
+ struct rcu_domain *rcu_domain = data;
+
+ kref_get(&rcu_barrier_wait_ref);
+ __call_rcu(rcu_domain, &__get_cpu_var(rcu_barrier_head),
+ rcu_barrier_callback);
+}
+
+static void __rcu_barrier(struct rcu_domain *rcu_domain)
+{
+ mutex_lock(&rcu_barrier_mutex);
+
+ init_completion(&rcu_barrier_completion);
+ kref_init(&rcu_barrier_wait_ref);
+
+ /* queue barrier rcu_heads for every cpu */
+ on_each_cpu(rcu_barrier_func, rcu_domain, 1);
+
+ kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref);
+ wait_for_completion(&rcu_barrier_completion);
+
+ mutex_unlock(&rcu_barrier_mutex);
+}
+
+#define gen_rcu_barrier(domain) \
+void rcu_barrier_##domain(void) \
+{ \
+ __rcu_barrier(&rcu_domain_##domain); \
+} \
+EXPORT_SYMBOL_GPL(rcu_barrier_##domain);
+
+GEN_CODES(gen_rcu_barrier)
+
diff --git a/kernel/sched.c b/kernel/sched.c
index 09b574e..967f25a 100644
--- a/kernel/sched.c
+++ b/kernel/sched.c
@@ -9113,6 +9113,7 @@ struct cgroup_subsys cpuacct_subsys = {
};
#endif /* CONFIG_CGROUP_CPUACCT */
+#ifndef CONFIG_RCURING
#ifndef CONFIG_SMP
void synchronize_sched_expedited(void)
@@ -9182,3 +9183,4 @@ void synchronize_sched_expedited(void)
EXPORT_SYMBOL_GPL(synchronize_sched_expedited);
#endif /* #else #ifndef CONFIG_SMP */
+#endif /* CONFIG_RCURING */
diff --git a/kernel/time/tick-sched.c b/kernel/time/tick-sched.c
index 3e216e0..0eba561 100644
--- a/kernel/time/tick-sched.c
+++ b/kernel/time/tick-sched.c
@@ -269,6 +269,10 @@ void tick_nohz_stop_sched_tick(int inidle)
cpu = smp_processor_id();
ts = &per_cpu(tick_cpu_sched, cpu);
+ /* Don't enter nohz when cpu is offlining, it is going to die */
+ if (cpu_is_offline(cpu))
+ goto end;
+
/*
* Call to tick_nohz_start_idle stops the last_update_time from being
* updated. Thus, it must not be called in the event we are called from
^ permalink raw reply related [flat|nested] 8+ messages in thread* Re: [PATCH, RFC] RCU: New scalable, multi-GP and preemptible RCU implementation 2010-08-30 9:31 [PATCH, RFC] RCU: New scalable, multi-GP and preemptible RCU implementation Lai Jiangshan @ 2010-08-30 23:57 ` Paul E. McKenney 2010-08-31 3:03 ` Lai Jiangshan 0 siblings, 1 reply; 8+ messages in thread From: Paul E. McKenney @ 2010-08-30 23:57 UTC (permalink / raw) To: Lai Jiangshan Cc: Linus Torvalds, Ingo Molnar, Andrew Morton, David Miller, Manfred Spraul, Mathieu Desnoyers, Steven Rostedt, Thomas Gleixner, Peter Zijlstra, LKML, dipankar On Mon, Aug 30, 2010 at 05:31:44PM +0800, Lai Jiangshan wrote: > > 0) contents > 1) overview > 2) scalable > 3) multi-GP > 4) integrated expedited synchronize_rcu > 5) preemptible > 6) top-down implies > 7) speech > 8) core rcuring Interesting! I can't claim to have dug through every detail, but figured I should ask a few questions first. Thanx, Paul Of course, the first question is whether it passes rcutorture, and if so on what hardware configuration. At first glance, it looks to live somewhere between Manfred Spraul's counter-based hierarchical RCU and classic RCU. There are also some resemblances to K42's "generations" RCU-like mechanism. > 1) overview > This is a new RCU implementation: RCURING. It uses a ring atomic counters > to control the completion of GPs and a ring callbacks batches to process > the callbacks. The ring is a set of grace periods, correct? > In 2008, I think out the idea, and wrote the first 300lines code of rcuring.c, > but I'm too lazy to finish it > > 2) scalable > There is only 2 locks in every RCU domain(rcu_bh, rcu_sched, rcu_preempt), > one of them is for misc things, it is a infrequency lock, the other is > for advancing complete, it is frequency lock. But somebody else would > very probably do the same work for me if somebody else is holding this lock, > so we alway use spin_trylock() for this lock, so it is scalable. > > We don't need any lock for reporting quiescent state, we just need 2 > atomic operations for every RCU domain. If you have a single GP in flight, all CPUs need to do an atomic op on the same element of the RCU ring, correct? My first thought was that each CPU was writing the current grace-period number to its own variable, but that would not require an atomic operation. Of course, the downside of this latter approach is that some CPU would need to scan all CPUs' counters. > 3) multi-GP > All old rcu implementations have only one waiting GP. > > time -----ta-tb--------------------tc----------------td-------------> > GP -----|------GP1---------------|--------GP2------|-------------> > cpu#x ........|---------------------------------------|...............> > |->insert a callback |->handle callback > > the callback have to wait a partial GP1, and a full GP2, it may wait > near 2 GPs in worse case because it have to wait the rcu read sites witch > start at (tb, tc] which are needless to wait. In average, a callback > have to wait (1 + 1/(1+1))= 1.5GPs > > if we can have three simultaneous waiting GPs: > > GPs .....|------GP1---------------|.......................... > ...........|--------GP2-----------|.................... > ....................|---------GP3---------------|........ > cpu#x ........|-------------------------|..........................> > |->insert a callback |->handle callback > The callback is handled more quickly, In average, a callback have to wait > (1 + 1/(3+1))=1.25GPs > > if we can have X simultaneous waiting GPs, a callback have to wait > (1 + 1/(X+1)) GPs in average. The default X in this patch is 15 > (including the reserved GPs). Assuming that the start times of the GPs are evenly spaced, correct? > 4) integrated expedited synchronize_rcu > expedited synchronize_rcu is implemented in rcuring.c, not in other files. > The whole of it is less than 50 lines of code. we just reserve several > GPs for expedited synchronize_rcu, when a expedited callback is inserted, > we start a new GP immediately(we have reserved GPs, this step will > very probably success), and then force this new GP to be completed quickly. > (thanks to multi-GP) I would be more convinced had you implemented force_quiescent_state() for preemptible RCU. ;-) > 5) preemptible > new simple preemptible code, rcu_read_lock/unlock() is simple and is inline function. > add new kernel-offset.c to avoid recursively header files inclusion. > > 6) top-down implies > irq and local_irq_disable() implies preempt_disable()/rcu_read_lock_sched(), > irq and local_irq_disable() implies rcu_read_lock(). (*) > preempt_disable()/rcu_read_lock_sched() implies rcu_read_lock(). (*) > *: In preemptible rcutree/rcutiny, it is false. > > 7) speech > I will give a speech about RCU and this RCU implementation > in Japan linuxcon 2010. Welcome to Japan Linuxcon 2010. Congratulations! > 8) core rcuring > (Comments and document is still been writing) > > all read_read_lock/unlock_domain() is nested in a coresponding > rcuring_lock/unlock(), and the algorithm handles a rcuring_lock/unlock() > ciritcal region as a full rcu read site ciritcal region. > > read site: > rcuring_lock() > lock a complete, and ensure this locked_complete - done_complete(1) > 0 > curr_complete(1) - locked_complete >= 0 > mb(1) This is really a compiler barrier? > > rcu_derefrence(ptr) > > rcuring_unlock() > mb(2) Ditto? > release its locked_complete. > > update site: > rcu_assign_pointer(ptr, new) > > (handle callback:call_rcu()+rcu_do_batch()) > (call_rcu():) > mb(3) > get the curr_complete(2) > set callback's complete as this curr_complete(2). > (we don't have any field to record the callback's complete, we just > emulate it by rdp->done_complete and the position of this callback > in rdp->batch.) > > time pass... until done_complete(2) - callback's complete > 0 I have to ask about counter overflow on 32-bit systems. > (rcu_do_batch():) > mb(4) > call the callback: (free old ptr, etc.) > > Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com> > --- > Kbuild | 75 ++- > include/linux/rcupdate.h | 6 > include/linux/rcuring.h | 195 +++++++++ > include/linux/sched.h | 6 > init/Kconfig | 32 + > kernel/Makefile | 1 > kernel/kernel-offsets.c | 20 > kernel/rcuring.c | 1002 +++++++++++++++++++++++++++++++++++++++++++++++ > kernel/sched.c | 2 > kernel/time/tick-sched.c | 4 > 10 files changed, 1324 insertions(+), 19 deletions(-) > diff --git a/Kbuild b/Kbuild > index e3737ad..bce37cb 100644 > --- a/Kbuild > +++ b/Kbuild > @@ -3,7 +3,19 @@ > # This file takes care of the following: > # 1) Generate bounds.h > # 2) Generate asm-offsets.h (may need bounds.h) > -# 3) Check for missing system calls > +# 3) Generate kernel-offsets.h > +# 4) Check for missing system calls > + > +# Default sed regexp - multiline due to syntax constraints > +define sed-y > + "/^->/{s:->#\(.*\):/* \1 */:; \ > + s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \ > + s:->::; p;}" > +endef > + > +quiet_cmd_offsets_cc_s_c = CC $(quiet_modtag) $@ > +cmd_offsets_cc_s_c = $(CC) -D__GENARATING_OFFSETS__ \ s/GENARATING/GENERATING/ just to be pedantic. ;-) > + $(c_flags) -fverbose-asm -S -o $@ $< Pulling out the offsets might have the advantage of allowing both rcu_read_lock() and rcu_read_unlock() to be inlined, but without pulling in include/linux/sched.h everywhere. I had been thinking in terms of moving the state from the task struct to the taskinfo struct, but this approach might be worth considering. > ##### > # 1) Generate bounds.h > @@ -43,22 +55,14 @@ $(obj)/$(bounds-file): kernel/bounds.s Kbuild > # 2) Generate asm-offsets.h > # > > -offsets-file := include/generated/asm-offsets.h > +asm-offsets-file := include/generated/asm-offsets.h > > -always += $(offsets-file) > -targets += $(offsets-file) > +always += $(asm-offsets-file) > +targets += $(asm-offsets-file) > targets += arch/$(SRCARCH)/kernel/asm-offsets.s > > - > -# Default sed regexp - multiline due to syntax constraints > -define sed-y > - "/^->/{s:->#\(.*\):/* \1 */:; \ > - s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \ > - s:->::; p;}" > -endef > - > -quiet_cmd_offsets = GEN $@ > -define cmd_offsets > +quiet_cmd_asm_offsets = GEN $@ > +define cmd_asm_offsets > (set -e; \ > echo "#ifndef __ASM_OFFSETS_H__"; \ > echo "#define __ASM_OFFSETS_H__"; \ > @@ -78,13 +82,48 @@ endef > arch/$(SRCARCH)/kernel/asm-offsets.s: arch/$(SRCARCH)/kernel/asm-offsets.c \ > $(obj)/$(bounds-file) FORCE > $(Q)mkdir -p $(dir $@) > - $(call if_changed_dep,cc_s_c) > + $(call if_changed_dep,offsets_cc_s_c) > + > +$(obj)/$(asm-offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild > + $(call cmd,asm_offsets) > + > +##### > +# 3) Generate kernel-offsets.h > +# > + > +kernel-offsets-file := include/generated/kernel-offsets.h > + > +always += $(kernel-offsets-file) > +targets += $(kernel-offsets-file) > +targets += kernel/kernel-offsets.s > + > +quiet_cmd_kernel_offsets = GEN $@ > +define cmd_kernel_offsets > + (set -e; \ > + echo "#ifndef __LINUX_KERNEL_OFFSETS_H__"; \ > + echo "#define __LINUX_KERNEL_OFFSETS_H__"; \ > + echo "/*"; \ > + echo " * DO NOT MODIFY."; \ > + echo " *"; \ > + echo " * This file was generated by Kbuild"; \ > + echo " *"; \ > + echo " */"; \ > + echo ""; \ > + sed -ne $(sed-y) $<; \ > + echo ""; \ > + echo "#endif" ) > $@ > +endef > + > +# We use internal kbuild rules to avoid the "is up to date" message from make > +kernel/kernel-offsets.s: kernel/kernel-offsets.c $(obj)/$(bounds-file) \ > + $(obj)/$(asm-offsets-file) FORCE > + $(call if_changed_dep,offsets_cc_s_c) > > -$(obj)/$(offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild > - $(call cmd,offsets) > +$(obj)/$(kernel-offsets-file): kernel/kernel-offsets.s Kbuild > + $(call cmd,kernel_offsets) > > ##### > -# 3) Check for missing system calls > +# 4) Check for missing system calls > # > > quiet_cmd_syscalls = CALL $< > diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h > index 89414d6..42cd6bc 100644 > --- a/include/linux/rcupdate.h > +++ b/include/linux/rcupdate.h > @@ -124,6 +124,7 @@ extern void rcu_bh_qs(int cpu); > extern void rcu_check_callbacks(int cpu, int user); > struct notifier_block; > > +#ifndef CONFIG_RCURING > #ifdef CONFIG_NO_HZ > > extern void rcu_enter_nohz(void); > @@ -140,11 +141,14 @@ static inline void rcu_exit_nohz(void) > } > > #endif /* #else #ifdef CONFIG_NO_HZ */ > +#endif > > #if defined(CONFIG_TREE_RCU) || defined(CONFIG_TREE_PREEMPT_RCU) > #include <linux/rcutree.h> > #elif defined(CONFIG_TINY_RCU) || defined(CONFIG_TINY_PREEMPT_RCU) > #include <linux/rcutiny.h> > +#elif defined(CONFIG_RCURING) > +#include <linux/rcuring.h> > #else > #error "Unknown RCU implementation specified to kernel configuration" > #endif > @@ -686,6 +690,7 @@ struct rcu_synchronize { > > extern void wakeme_after_rcu(struct rcu_head *head); > > +#ifndef CONFIG_RCURING > #ifdef CONFIG_PREEMPT_RCU > > /** > @@ -710,6 +715,7 @@ extern void call_rcu(struct rcu_head *head, > #define call_rcu call_rcu_sched > > #endif /* #else #ifdef CONFIG_PREEMPT_RCU */ > +#endif > > /** > * call_rcu_bh() - Queue an RCU for invocation after a quicker grace period. > diff --git a/include/linux/rcuring.h b/include/linux/rcuring.h > new file mode 100644 > index 0000000..343b932 > --- /dev/null > +++ b/include/linux/rcuring.h > @@ -0,0 +1,195 @@ > +/* > + * Read-Copy Update mechanism for mutual exclusion > + * > + * This program is free software; you can redistribute it and/or modify > + * it under the terms of the GNU General Public License as published by > + * the Free Software Foundation; either version 2 of the License, or > + * (at your option) any later version. > + * > + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + * GNU General Public License for more details. > + * > + * You should have received a copy of the GNU General Public License > + * along with this program; if not, write to the Free Software > + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. > + * > + * Copyright IBM Corporation, 2008 > + * Copyright Fujitsu 2008-2010 Lai Jiangshan > + * > + * Authors: Dipankar Sarma <dipankar@in.ibm.com> > + * Manfred Spraul <manfred@colorfullife.com> > + * Paul E. McKenney <paulmck@linux.vnet.ibm.com> Hierarchical version > + * Lai Jiangshan <laijs@cn.fujitsu.com> RCURING version > + */ > + > +#ifndef __LINUX_RCURING_H > +#define __LINUX_RCURING_H > + > +#define __rcu_read_lock_bh() local_bh_disable() > +#define __rcu_read_unlock_bh() local_bh_enable() > +#define __rcu_read_lock_sched() preempt_disable() > +#define __rcu_read_unlock_sched() preempt_enable() > + > +#ifdef CONFIG_RCURING_BH > +extern void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *)); > +extern void synchronize_rcu_bh(void); > +extern void synchronize_rcu_bh_expedited(void); > +extern void rcu_bh_qs(int cpu); > +extern long rcu_batches_completed_bh(void); > +extern void rcu_barrier_bh(void); > +extern void rcu_bh_force_quiescent_state(void); > +#else /* CONFIG_RCURING_BH */ > +#define call_rcu_bh call_rcu_sched > +#define synchronize_rcu_bh synchronize_rcu_sched > +#define synchronize_rcu_bh_expedited synchronize_rcu_sched_expedited > +#define rcu_bh_qs(cpu) do { (void)(cpu); } while (0) > +#define rcu_batches_completed_bh rcu_batches_completed_sched > +#define rcu_barrier_bh rcu_barrier_sched > +#define rcu_bh_force_quiescent_state rcu_sched_force_quiescent_state > +#endif /* CONFIG_RCURING_BH */ > + > +extern > +void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *)); > +extern void synchronize_rcu_sched(void); > +#define synchronize_sched synchronize_rcu_sched > +extern void synchronize_rcu_sched_expedited(void); > +#define synchronize_sched_expedited synchronize_rcu_sched_expedited > +extern void rcu_note_context_switch(int cpu); > +extern long rcu_batches_completed_sched(void); > +extern void rcu_barrier_sched(void); > +extern void rcu_sched_force_quiescent_state(void); > + > +/* > + * The @flags is valid only when RCURING_PREEMPT_SAVED is set. > + * > + * When preemptible rcu read site is preempted, we save RCURING_PREEMPT_SAVED, > + * RCURING_PREEMPT_QS and this read site's locked_complete(only the last > + * RCURING_COUNTERS_SHIFT bits) in the @flags. > + * > + * When the outmost rcu read site is closed, we will clear > + * RCURING_PREEMPT_QS bit if it's saved, and let rcu system knows > + * this task has passed a quiescent state and release the old read site's > + * locked_complete. > + */ > +#define RCURING_PREEMPT_SAVED (1 << 31) > +#define RCURING_PREEMPT_QS (1 << 30) > +#define RCURING_PREEMPT_FLAGS (RCURING_PREEMPT_SAVED | RCURING_PREEMPT_QS) > + > +struct rcu_task_preempt { > + unsigned int nesting; > + unsigned int flags; > +}; > + > +static inline void rcu_read_lock_preempt(struct rcu_task_preempt *rcu_task) > +{ > + rcu_task->nesting++; > + barrier(); > +} > + > +static inline void rcu_read_unlock_preempt(struct rcu_task_preempt *rcu_task) > +{ > + int nesting = rcu_task->nesting; > + > + if (nesting == 1) { > + unsigned int flags; > + > + barrier(); > + flags = ACCESS_ONCE(rcu_task->flags); > + > + if ((flags & RCURING_PREEMPT_FLAGS) == RCURING_PREEMPT_FLAGS) { > + flags &= ~RCURING_PREEMPT_QS; > + ACCESS_ONCE(rcu_task->flags) = flags; > + } > + } > + > + barrier(); > + rcu_task->nesting = nesting - 1; > +} > + > +#ifdef CONFIG_RCURING_PREEMPT > + > +#ifdef __GENARATING_OFFSETS__ > +#define task_rcu_preempt(p) ((struct rcu_task_preempt *)NULL) The above looks a bit strange. The idea is that if you were generating offsets for the fields within rcu_task_preempt, you would select the fields? Something like the following? #define example_offset(p) &(((struct rcu_task_preempt *)NULL)->field_a) > +#else > +#include <generated/kernel-offsets.h> > +#define task_rcu_preempt(p) \ > + ((struct rcu_task_preempt *)(((void *)p) + TASK_RCU_PREEMPT)) > +#endif > + > +#define current_rcu_preempt() task_rcu_preempt(current) > +#define __rcu_read_lock() rcu_read_lock_preempt(current_rcu_preempt()) > +#define __rcu_read_unlock() rcu_read_unlock_preempt(current_rcu_preempt()) > +extern > +void call_rcu_preempt(struct rcu_head *head, void (*func)(struct rcu_head *)); > +#define call_rcu call_rcu_preempt > +extern void synchronize_rcu_preempt(void); > +#define synchronize_rcu synchronize_rcu_preempt > +extern void synchronize_rcu_preempt_expedited(void); > +#define synchronize_rcu_expedited synchronize_rcu_preempt_expedited > +extern long rcu_batches_completed_preempt(void); > +#define rcu_batches_completed rcu_batches_completed_preempt > +extern void rcu_barrier_preempt(void); > +#define rcu_barrier rcu_barrier_preempt > +extern void rcu_preempt_force_quiescent_state(void); > +#define rcu_force_quiescent_state rcu_preempt_force_quiescent_state > + > +struct task_struct; > +static inline void rcu_copy_process(struct task_struct *p) > +{ > + struct rcu_task_preempt *rcu_task = task_rcu_preempt(p); > + > + rcu_task->nesting = 0; > + rcu_task->flags = 0; > +} > + > +static inline void exit_rcu(void) > +{ > + if (current_rcu_preempt()->nesting) { > + WARN_ON(1); > + current_rcu_preempt()->nesting = 0; If the RCU core was waiting on this task, how does it learn that this task is no longer blocking the current grace period(s)? > + } > +} > + > +#else /*CONFIG_RCURING_PREEMPT */ > + > +#define __rcu_read_lock() __rcu_read_lock_sched(); > +#define __rcu_read_unlock() __rcu_read_unlock_sched(); > +#define call_rcu call_rcu_sched > +#define synchronize_rcu synchronize_rcu_sched > +#define synchronize_rcu_expedited synchronize_rcu_sched_expedited > +#define rcu_batches_completed rcu_batches_completed_sched > +#define rcu_barrier rcu_barrier_sched > +#define rcu_force_quiescent_state rcu_sched_force_quiescent_state > + > +static inline void rcu_copy_process(struct task_struct *p) {} > +static inline void exit_rcu(void) {} > +#endif /* CONFIG_RCURING_PREEMPT */ > + > +#ifdef CONFIG_NO_HZ > +extern void rcu_kernel_enter(void); > +extern void rcu_kernel_exit(void); > + > +static inline void rcu_nmi_enter(void) { rcu_kernel_enter(); } > +static inline void rcu_nmi_exit(void) { rcu_kernel_exit(); } > +static inline void rcu_irq_enter(void) { rcu_kernel_enter(); } > +static inline void rcu_irq_exit(void) { rcu_kernel_exit(); } > +extern void rcu_enter_nohz(void); > +extern void rcu_exit_nohz(void); > +#else > +static inline void rcu_nmi_enter(void) {} > +static inline void rcu_nmi_exit(void) {} > +static inline void rcu_irq_enter(void) {} > +static inline void rcu_irq_exit(void) {} > +static inline void rcu_enter_nohz(void) {} > +static inline void rcu_exit_nohz(void) {} > +#endif > + > +extern int rcu_needs_cpu(int cpu); > +extern void rcu_check_callbacks(int cpu, int user); > + > +extern void rcu_scheduler_starting(void); > +extern int rcu_scheduler_active __read_mostly; > + > +#endif /* __LINUX_RCURING_H */ > diff --git a/include/linux/sched.h b/include/linux/sched.h > index e18473f..15b332d 100644 > --- a/include/linux/sched.h > +++ b/include/linux/sched.h > @@ -1211,6 +1211,10 @@ struct task_struct { > struct rcu_node *rcu_blocked_node; > #endif /* #ifdef CONFIG_TREE_PREEMPT_RCU */ > > +#ifdef CONFIG_RCURING_PREEMPT > + struct rcu_task_preempt rcu_task_preempt; > +#endif > + > #if defined(CONFIG_SCHEDSTATS) || defined(CONFIG_TASK_DELAY_ACCT) > struct sched_info sched_info; > #endif > @@ -1757,7 +1761,7 @@ static inline void rcu_copy_process(struct task_struct *p) > INIT_LIST_HEAD(&p->rcu_node_entry); > } > > -#else > +#elif !defined(CONFIG_RCURING) > > static inline void rcu_copy_process(struct task_struct *p) > { > diff --git a/init/Kconfig b/init/Kconfig > index a619a1a..48e58d9 100644 > --- a/init/Kconfig > +++ b/init/Kconfig > @@ -374,6 +374,9 @@ config TINY_PREEMPT_RCU > for real-time UP systems. This option greatly reduces the > memory footprint of RCU. > > +config RCURING > + bool "Multi-GP ring based RCU" > + > endchoice > > config PREEMPT_RCU > @@ -450,6 +453,35 @@ config TREE_RCU_TRACE > TREE_PREEMPT_RCU implementations, permitting Makefile to > trivially select kernel/rcutree_trace.c. > > +config RCURING_BH > + bool "RCU for bh" > + default y > + depends on RCURING && !PREEMPT_RT > + help > + If Y, use a independent rcu domain for rcu_bh. > + If N, rcu_bh will map to rcu_sched(except rcu_read_lock_bh, > + rcu_read_unlock_bh). If I understand correctly, "N" defeats the purpose of bh, which is to have faster grace periods -- not having to wait for a context switch. > + > +config RCURING_BH_PREEMPT > + bool > + depends on PREEMPT_RT > + help > + rcu_bh will map to rcu_preempt. If I understand what you are doing, this config option will break networking. > + > +config RCURING_PREEMPT > + bool "RCURING based reemptible RCU" > + default n > + depends on RCURING && PREEMPT > + help > + If Y, normal rcu functionality will map to rcu_preempt. > + If N, normal rcu functionality will map to rcu_sched. > + > +config RCURING_COUNTERS_SHIFT > + int "RCURING's counter shift" > + range 1 10 > + depends on RCURING > + default 4 > + > endmenu # "RCU Subsystem" > > config IKCONFIG > diff --git a/kernel/Makefile b/kernel/Makefile > index 92b7420..66eab8c 100644 > --- a/kernel/Makefile > +++ b/kernel/Makefile > @@ -86,6 +86,7 @@ obj-$(CONFIG_TREE_PREEMPT_RCU) += rcutree.o > obj-$(CONFIG_TREE_RCU_TRACE) += rcutree_trace.o > obj-$(CONFIG_TINY_RCU) += rcutiny.o > obj-$(CONFIG_TINY_PREEMPT_RCU) += rcutiny.o > +obj-$(CONFIG_RCURING) += rcuring.o > obj-$(CONFIG_RELAY) += relay.o > obj-$(CONFIG_SYSCTL) += utsname_sysctl.o > obj-$(CONFIG_TASK_DELAY_ACCT) += delayacct.o > diff --git a/kernel/kernel-offsets.c b/kernel/kernel-offsets.c > new file mode 100644 > index 0000000..0d30817 > --- /dev/null > +++ b/kernel/kernel-offsets.c > @@ -0,0 +1,20 @@ > +/* > + * Generate definitions needed by assembly language modules. > + * > + * Copyright (C) 2008-2010 Lai Jiangshan > + */ > + > +#define __GENERATING_KERNEL_OFFSETS__ > +#include <linux/rcupdate.h> > +#include <linux/sched.h> > +#include <linux/kbuild.h> > + > +void foo(void); > + > +void foo(void) > +{ > +#ifdef CONFIG_RCURING_PREEMPT > + OFFSET(TASK_RCU_PREEMPT, task_struct, rcu_task_preempt); > +#endif > +} > + > diff --git a/kernel/rcuring.c b/kernel/rcuring.c > new file mode 100644 > index 0000000..e7cedb5 > --- /dev/null > +++ b/kernel/rcuring.c > @@ -0,0 +1,1002 @@ > +/* > + * Read-Copy Update mechanism for mutual exclusion > + * > + * This program is free software; you can redistribute it and/or modify > + * it under the terms of the GNU General Public License as published by > + * the Free Software Foundation; either version 2 of the License, or > + * (at your option) any later version. > + * > + * This program is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + * GNU General Public License for more details. > + * > + * You should have received a copy of the GNU General Public License > + * along with this program; if not, write to the Free Software > + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. > + * > + * Copyright IBM Corporation, 2008 > + * Copyright Fujitsu 2008-2010 Lai Jiangshan > + * > + * Authors: Dipankar Sarma <dipankar@in.ibm.com> > + * Manfred Spraul <manfred@colorfullife.com> > + * Paul E. McKenney <paulmck@linux.vnet.ibm.com> Hierarchical version > + * Lai Jiangshan <laijs@cn.fujitsu.com> RCURING version > + */ > + > +#include <linux/rcupdate.h> > +#include <linux/percpu.h> > +#include <asm/atomic.h> > +#include <linux/spinlock.h> > +#include <linux/module.h> > +#include <linux/interrupt.h> > +#include <linux/cpu.h> > + > +#define RCURING_IDX_SHIFT CONFIG_RCURING_COUNTERS_SHIFT > +#define RCURING_IDX_MAX (1L << RCURING_IDX_SHIFT) > +#define RCURING_IDX_MASK (RCURING_IDX_MAX - 1) > +#define RCURING_IDX(complete) ((complete) & RCURING_IDX_MASK) > + > +struct rcuring { > + atomic_t ctr[RCURING_IDX_MAX]; > + long curr_complete; > + long done_complete; > + > + raw_spinlock_t lock; > +}; > + > +/* It is long history that we use -300 as an initial complete */ > +#define RCURING_INIT_COMPLETE -300L > +#define RCURING_INIT(name) \ > +{ \ > + {[RCURING_IDX(RCURING_INIT_COMPLETE)] = ATOMIC_INIT(1),}, \ > + RCURING_INIT_COMPLETE, \ > + RCURING_INIT_COMPLETE - 1, \ > + __RAW_SPIN_LOCK_UNLOCKED(name.lock), \ Please use the gcc-style initializers here. > +} > + > +static inline > +int rcuring_ctr_read(struct rcuring *rr, unsigned long complete) > +{ > + int res; > + > + /* atomic_read() does not ensure to imply barrier(). */ > + barrier(); > + res = atomic_read(rr->ctr + RCURING_IDX(complete)); > + barrier(); > + > + return res; > +} > + > +void __rcuring_advance_done_complete(struct rcuring *rr) > +{ > + long wait_complete = rr->done_complete + 1; > + > + if (!rcuring_ctr_read(rr, wait_complete)) { > + do { > + wait_complete++; > + } while (!rcuring_ctr_read(rr, wait_complete)); > + > + ACCESS_ONCE(rr->done_complete) = wait_complete - 1; > + } > +} For a given grace period, all quiescent states hit the same counter. Or am I missing something? > + > +static inline > +int rcuring_could_advance_done_complete(struct rcuring *rr) > +{ > + return !rcuring_ctr_read(rr, rr->done_complete + 1); > +} > + > +static inline > +void rcuring_advance_done_complete(struct rcuring *rr) > +{ > + if (rcuring_could_advance_done_complete(rr)) { > + if (__raw_spin_trylock(&rr->lock)) { > + __rcuring_advance_done_complete(rr); > + __raw_spin_unlock(&rr->lock); > + } > + } > +} > + > +void __rcuring_advance_curr_complete(struct rcuring *rr, long next_complete) > +{ > + long curr_complete = rr->curr_complete; > + > + if (curr_complete + 1 == next_complete) { > + if (!rcuring_ctr_read(rr, next_complete)) { > + ACCESS_ONCE(rr->curr_complete) = next_complete; > + smp_mb__before_atomic_inc(); > + atomic_inc(rr->ctr + RCURING_IDX(next_complete)); > + smp_mb__after_atomic_inc(); > + atomic_dec(rr->ctr + RCURING_IDX(curr_complete)); > + } > + } > +} > + > +static inline > +int rcuring_could_advance_complete(struct rcuring *rr, long next_complete) > +{ > + if (rcuring_could_advance_done_complete(rr)) > + return 1; > + > + if (rr->curr_complete + 1 == next_complete) { > + if (!rcuring_ctr_read(rr, next_complete)) > + return 1; > + } > + > + return 0; > +} > + > +static inline > +void rcuring_advance_complete(struct rcuring *rr, long next_complete) > +{ > + if (rcuring_could_advance_complete(rr, next_complete)) { > + if (__raw_spin_trylock(&rr->lock)) { > + __rcuring_advance_done_complete(rr); > + __rcuring_advance_curr_complete(rr, next_complete); > + __raw_spin_unlock(&rr->lock); > + } > + } > +} > + > +static inline > +void rcuring_advance_complete_force(struct rcuring *rr, long next_complete) > +{ > + if (rcuring_could_advance_complete(rr, next_complete)) { > + __raw_spin_lock(&rr->lock); > + __rcuring_advance_done_complete(rr); > + __rcuring_advance_curr_complete(rr, next_complete); > + __raw_spin_unlock(&rr->lock); > + } > +} > + > +static inline long __locked_complete_adjust(int locked_idx, long complete) > +{ > + long locked_complete; > + > + locked_complete = (complete & ~RCURING_IDX_MASK) | (long)locked_idx; > + if (locked_complete - complete <= 0) > + return locked_complete; > + else > + return locked_complete - RCURING_IDX_MAX; > +} > + > +static long rcuring_lock(struct rcuring *rr) > +{ > + long locked_complete; > + int idx; > + > + for (;;) { > + idx = RCURING_IDX(ACCESS_ONCE(rr->curr_complete)); > + if (likely(atomic_inc_not_zero(rr->ctr + idx))) > + break; > + } > + smp_mb__after_atomic_inc(); > + > + locked_complete = ACCESS_ONCE(rr->curr_complete); > + if (likely(RCURING_IDX(locked_complete) == idx)) > + return locked_complete; > + else > + return __locked_complete_adjust(idx, locked_complete); > +} > + > +static void rcuring_unlock(struct rcuring *rr, long locked_complete) > +{ > + smp_mb__before_atomic_dec(); > + > + atomic_dec(rr->ctr + RCURING_IDX(locked_complete)); > +} > + > +static inline > +void rcuring_dup_lock(struct rcuring *rr, long locked_complete) > +{ > + atomic_inc(rr->ctr + RCURING_IDX(locked_complete)); > +} > + > +static inline > +long rcuring_advance_lock(struct rcuring *rr, long locked_complete) > +{ > + long new_locked_complete = locked_complete; > + > + if (locked_complete != rr->curr_complete) { > + /* > + * Lock the new complete at first, and then release > + * the old one. It prevents errors when NMI/SMI occurs > + * after rcuring_unlock() - we still lock it. > + */ > + new_locked_complete = rcuring_lock(rr); > + rcuring_unlock(rr, locked_complete); > + } > + > + return new_locked_complete; > +} > + > +static inline long rcuring_get_done_complete(struct rcuring *rr) > +{ > + return ACCESS_ONCE(rr->done_complete); > +} > + > +static inline long rcuring_get_curr_complete(struct rcuring *rr) > +{ > + return ACCESS_ONCE(rr->curr_complete); > +} > + > +struct rcu_batch { > + struct rcu_head *list, **tail; > +}; > + > +static void rcu_batch_merge(struct rcu_batch *to, struct rcu_batch *from) > +{ > + if (from->list != NULL) { > + *to->tail = from->list; > + to->tail = from->tail; > + > + from->list = NULL; > + from->tail = &from->list; > + } > +} > + > +static void rcu_batch_add(struct rcu_batch *batch, struct rcu_head *new) > +{ > + *batch->tail = new; > + batch->tail = &new->next; > +} > + > +struct rcu_data { > + long locked_complete; > + > + /* > + * callbacks are in batches (done_complete, curr_complete] > + * curr_complete - done_complete >= 0 > + * curr_complete - done_complete <= RCURING_IDX_MAX > + * domain's curr_complete - curr_complete >= 0 > + * domain's done_complete - done_complete >= 0 > + * > + * curr_complete == done_complete just means @batch is empty. > + * we have at least one callback in batch[RCURING_IDX(done_complete)] > + * if curr_complete != done_complete > + */ > + long curr_complete; > + long done_complete; > + > + struct rcu_batch batch[RCURING_IDX_MAX]; > + struct rcu_batch done_batch; > + > + unsigned int qlen; > + unsigned long jiffies; > + unsigned long stall_jiffies; > +}; > + > +struct rcu_domain { > + struct rcuring rcuring; > + const char *domain_name; > + struct rcu_data __percpu *rcu_data; > + > + spinlock_t lock; /* this lock is for fqs or other misc things */ > + long fqs_complete; > + void (*force_quiescent_state)(struct rcu_domain *domain, > + long fqs_complete); > +}; > + > +#define EXPEDITED_GP_RESERVED_RECOMMEND (RCURING_IDX_SHIFT - 1) > + > +static inline long gp_reserved(struct rcu_domain *domain) > +{ > + return EXPEDITED_GP_RESERVED_RECOMMEND; > +} > + > +static inline void __init rcu_data_init(struct rcu_data *rdp) > +{ > + int idx; > + > + for (idx = 0; idx < RCURING_IDX_MAX; idx++) > + rdp->batch[idx].tail = &rdp->batch[idx].list; > + > + rdp->done_batch.tail = &rdp->done_batch.list; > +} > + > +static void do_advance_callbacks(struct rcu_data *rdp, long done_complete) > +{ > + int idx; > + > + while (rdp->done_complete != done_complete) { > + rdp->done_complete++; > + idx = RCURING_IDX(rdp->done_complete); > + rcu_batch_merge(&rdp->done_batch, rdp->batch + idx); > + } > +} > + > +/* Is value in the range (@left_open, @right_close] */ > +static inline bool in_range(long left_open, long right_close, long value) > +{ > + return left_open - value < 0 && value - right_close <= 0; > +} > + > +static void advance_callbacks(struct rcu_data *rdp, long done_complete, > + long curr_complete) > +{ > + if (in_range(done_complete, curr_complete, rdp->curr_complete)) { > + do_advance_callbacks(rdp, done_complete); > + } else { > + do_advance_callbacks(rdp, rdp->curr_complete); > + rdp->curr_complete = done_complete; > + rdp->done_complete = done_complete; > + } > +} > + > +static void __force_quiescent_state(struct rcu_domain *domain, > + long fqs_complete) > +{ > + int cpu; > + long fqs_complete_old; > + long curr_complete = rcuring_get_curr_complete(&domain->rcuring); > + long done_complete = rcuring_get_done_complete(&domain->rcuring); > + > + spin_lock(&domain->lock); > + > + fqs_complete_old = domain->fqs_complete; > + if (!in_range(done_complete, curr_complete, fqs_complete_old)) > + fqs_complete_old = done_complete; > + > + if (fqs_complete_old - fqs_complete >= 0) { > + spin_unlock(&domain->lock); > + return; > + } > + > + domain->fqs_complete = fqs_complete; > + spin_unlock(&domain->lock); > + > + for_each_online_cpu(cpu) { > + struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu); > + long locked_complete = ACCESS_ONCE(rdp->locked_complete); > + > + if (!in_range(fqs_complete_old, fqs_complete, locked_complete)) > + continue; Is preemption disabled here? > + > + if (cpu == smp_processor_id()) > + set_tsk_need_resched(current); > + else > + smp_send_reschedule(cpu); > + } > +} > + > +static void force_quiescent_state(struct rcu_domain *domain, long fqs_complete) > +{ > + domain->force_quiescent_state(domain, fqs_complete); > +} > + > +static > +long prepare_for_new_callback(struct rcu_domain *domain, struct rcu_data *rdp) > +{ > + long curr_complete, done_complete; > + struct rcuring *rr = &domain->rcuring; > + > + smp_mb(); > + curr_complete = rcuring_get_curr_complete(rr); > + done_complete = rcuring_get_done_complete(rr); > + > + advance_callbacks(rdp, done_complete, curr_complete); > + rdp->curr_complete = curr_complete; > + > + if (!rdp->qlen) { > + rdp->jiffies = jiffies; > + rdp->stall_jiffies = jiffies; > + } > + > + return curr_complete; > +} > + > +static long __call_rcu(struct rcu_domain *domain, struct rcu_head *head, > + void (*func)(struct rcu_head *)) > +{ > + struct rcu_data *rdp; > + long curr_complete; > + > + head->next = NULL; > + head->func = func; > + > + rdp = this_cpu_ptr(domain->rcu_data); > + curr_complete = prepare_for_new_callback(domain, rdp); > + rcu_batch_add(rdp->batch + RCURING_IDX(curr_complete), head); > + rdp->qlen++; > + > + return curr_complete; > +} > + > +static void print_rcuring_stall(struct rcu_domain *domain) > +{ > + struct rcuring *rr = &domain->rcuring; > + unsigned long curr_complete = rcuring_get_curr_complete(rr); > + unsigned long done_complete = rcuring_get_done_complete(rr); > + int cpu; > + > + printk(KERN_ERR "RCU is stall, cpu=%d, domain_name=%s\n", smp_processor_id(), > + domain->domain_name); > + > + printk(KERN_ERR "domain's complete(done/curr)=%ld/%ld\n", > + curr_complete, done_complete); > + printk(KERN_ERR "curr_ctr=%d, wait_ctr=%d\n", > + rcuring_ctr_read(rr, curr_complete), > + rcuring_ctr_read(rr, done_complete + 1)); > + > + for_each_online_cpu(cpu) { > + struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu); > + printk(KERN_ERR "cpu=%d, qlen=%d, complete(locked/dome/curr/)=" > + "%ld/%ld/%ld\n", cpu, rdp->qlen, > + rdp->locked_complete, rdp->done_complete, > + rdp->curr_complete); > + } > +} > + > +static void __rcu_check_callbacks(struct rcu_domain *domain, > + struct rcu_data *rdp, int in_rcu_softirq) > +{ > + long curr_complete, done_complete; > + struct rcuring *rr = &domain->rcuring; > + > + if (!rdp->qlen) > + return; > + > + rcuring_advance_done_complete(rr); > + > + curr_complete = rcuring_get_curr_complete(rr); > + done_complete = ACCESS_ONCE(rr->done_complete); > + advance_callbacks(rdp, done_complete, curr_complete); > + > + if (rdp->curr_complete == curr_complete > + && rdp->batch[RCURING_IDX(rdp->curr_complete)].list) { > + long max_gp_allow = RCURING_IDX_MAX - gp_reserved(domain); > + > + if (curr_complete - done_complete <= max_gp_allow) > + rcuring_advance_complete(rr, curr_complete + 1); > + } > + > + if (rdp->done_batch.list) { > + if (!in_rcu_softirq) > + raise_softirq(RCU_SOFTIRQ); > + } else { > + if (jiffies - rdp->jiffies > 10) { > + force_quiescent_state(domain, rdp->curr_complete); > + rdp->jiffies = jiffies; > + } > + if (jiffies - rdp->stall_jiffies > 2 * HZ) { > + print_rcuring_stall(domain); > + rdp->stall_jiffies = jiffies; > + } > + } > +} > + > +static void rcu_do_batch(struct rcu_domain *domain, struct rcu_data *rdp) > +{ > + int count = 0; > + struct rcu_head *list, *next; > + > + list = rdp->done_batch.list; > + > + if (list) { > + rdp->done_batch.list = NULL; > + rdp->done_batch.tail = &rdp->done_batch.list; > + > + local_irq_enable(); > + > + smp_mb(); > + > + while (list) { > + next = list->next; > + prefetch(next); > + list->func(list); > + count++; > + list = next; > + } > + local_irq_disable(); > + > + rdp->qlen -= count; > + rdp->jiffies = jiffies; > + } > +} > + > +static int __rcu_needs_cpu(struct rcu_data *rdp) > +{ > + return !!rdp->qlen; > +} > + > +static inline > +void __rcu_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp) > +{ > + rdp->locked_complete = rcuring_advance_lock(&domain->rcuring, > + rdp->locked_complete); > +} > + > +static inline void rcu_preempt_save_complete(struct rcu_task_preempt *rcu_task, > + long locked_complete) > +{ > + unsigned int new_flags; > + > + new_flags = RCURING_PREEMPT_FLAGS | RCURING_IDX(locked_complete); > + ACCESS_ONCE(rcu_task->flags) = new_flags; > +} > + > +/* > + * Every cpu hold a lock_complete. But for preempt rcu, any task may also > + * hold a lock_complete. This function increases lock_complete for the current > + * cpu and check(dup_lock_and_save or release or advance) the lock_complete of > + * the current task. > + */ > +static inline > +void __rcu_preempt_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp, > + struct rcu_task_preempt *rcu_task) > +{ > + long locked_complete = rdp->locked_complete; > + unsigned int flags = ACCESS_ONCE(rcu_task->flags); > + > + if (!(flags & RCURING_PREEMPT_SAVED)) { > + BUG_ON(flags & RCURING_PREEMPT_QS); > + if (rcu_task->nesting) { > + /* dup_lock_and_save current task's lock_complete */ > + rcuring_dup_lock(&domain->rcuring, locked_complete); > + rcu_preempt_save_complete(rcu_task, locked_complete); > + } > + } else if (!(rcu_task->nesting)) { > + /* release current task's lock_complete */ > + rcuring_unlock(&domain->rcuring, (long)flags); > + ACCESS_ONCE(rcu_task->flags) = 0; > + } else if (!(flags & RCURING_PREEMPT_QS)) { > + /* advance_and_save current task's lock_complete */ > + if (RCURING_IDX(locked_complete) != RCURING_IDX((long)flags)) { > + rcuring_unlock(&domain->rcuring, (long)flags); > + rcuring_dup_lock(&domain->rcuring, locked_complete); > + } > + rcu_preempt_save_complete(rcu_task, locked_complete); > + } > + > + /* increases lock_complete for the current cpu */ > + rdp->locked_complete = rcuring_advance_lock(&domain->rcuring, > + locked_complete); > +} When we need to boost the priority of preempted readers, how do we tell who they are? > + > +static void __synchronize_rcu(struct rcu_domain *domain, int expedited) > +{ > + struct rcu_synchronize rcu; > + long complete; > + > + init_completion(&rcu.completion); > + > + local_irq_disable(); > + complete = __call_rcu(domain, &rcu.head, wakeme_after_rcu); > + > + if (expedited) { > + /* > + * Fore a new gp to be started immediately(can use reserved gp). > + * But if all gp are used, it will fail to start new gp, > + * and we have to wait until some new gps available. > + */ > + rcuring_advance_complete_force(&domain->rcuring, > + complete + 1); It looks to me like rcuring_advance_complete_force() can just fail silently. How does this work? > + > + /* Fore qs and expedite the new gp */ > + force_quiescent_state(domain, complete); > + } > + local_irq_enable(); > + > + wait_for_completion(&rcu.completion); > +} > + > +static inline long __rcu_batches_completed(struct rcu_domain *domain) > +{ > + return rcuring_get_done_complete(&domain->rcuring); > +} > + > +#ifdef CONFIG_RCURING_BH > +#define gen_code_for_bh(gen_code, args...) gen_code(bh, ##args) Where is gen_code() defined? > +#else > +#define gen_code_for_bh(gen_code, args...) > +#endif > +#define gen_code_for_sched(gen_code, args...) gen_code(sched, ##args) > +#ifdef CONFIG_RCURING_PREEMPT > +#define gen_code_for_preempt(gen_code, args...) gen_code(preempt, ##args) > +#else > +#define gen_code_for_preempt(gen_code, args...) > +#endif > + > +#define GEN_CODES(gen_code, args...) \ > + gen_code_for_bh(gen_code, ##args) \ > + gen_code_for_sched(gen_code, ##args) \ > + gen_code_for_preempt(gen_code, ##args) \ > + > +#define gen_basic_code(domain) \ > + \ > +static void force_quiescent_state_##domain(struct rcu_domain *domain, \ > + long fqs_complete); \ > + \ > +static DEFINE_PER_CPU(struct rcu_data, rcu_data_##domain); \ > + \ > +struct rcu_domain rcu_domain_##domain = \ > +{ \ > + .rcuring = RCURING_INIT(rcu_domain_##domain.rcuring), \ > + .domain_name = #domain, \ > + .rcu_data = &rcu_data_##domain, \ > + .lock = __SPIN_LOCK_UNLOCKED(rcu_domain_##domain.lock), \ > + .force_quiescent_state = force_quiescent_state_##domain, \ > +}; \ > + \ > +void call_rcu_##domain(struct rcu_head *head, \ > + void (*func)(struct rcu_head *)) \ > +{ \ > + unsigned long flags; \ > + \ > + local_irq_save(flags); \ > + __call_rcu(&rcu_domain_##domain, head, func); \ > + local_irq_restore(flags); \ > +} \ > +EXPORT_SYMBOL_GPL(call_rcu_##domain); \ > + \ > +void synchronize_rcu_##domain(void) \ > +{ \ > + __synchronize_rcu(&rcu_domain_##domain, 0); \ > +} \ > +EXPORT_SYMBOL_GPL(synchronize_rcu_##domain); \ > + \ > +void synchronize_rcu_##domain##_expedited(void) \ > +{ \ > + __synchronize_rcu(&rcu_domain_##domain, 1); \ > +} \ > +EXPORT_SYMBOL_GPL(synchronize_rcu_##domain##_expedited); \ > + \ > +long rcu_batches_completed_##domain(void) \ > +{ \ > + return __rcu_batches_completed(&rcu_domain_##domain); \ > +} \ > +EXPORT_SYMBOL_GPL(rcu_batches_completed_##domain); \ > + \ > +void rcu_##domain##_force_quiescent_state(void) \ > +{ \ > + force_quiescent_state(&rcu_domain_##domain, 0); \ > +} \ > +EXPORT_SYMBOL_GPL(rcu_##domain##_force_quiescent_state); I used to use a single macro for the synchronize_rcu*() APIs, but was talked into separate functions. Maybe some of the more ornate recent macros have shifted people away from duplicate code in functions, so might be time to try again. ;-) > +GEN_CODES(gen_basic_code) > + > +static DEFINE_PER_CPU(int, kernel_count); > + > +#define LOCK_COMPLETE(domain) \ > + long complete_##domain = rcuring_lock(&rcu_domain_##domain.rcuring); > +#define SAVE_COMPLETE(domain) \ > + per_cpu(rcu_data_##domain, cpu).locked_complete = complete_##domain; > +#define LOAD_COMPLETE(domain) \ > + int complete_##domain = per_cpu(rcu_data_##domain, cpu).locked_complete; > +#define RELEASE_COMPLETE(domain) \ > + rcuring_unlock(&rcu_domain_##domain.rcuring, complete_##domain); > + > +static void __rcu_kernel_enter_outmost(int cpu) > +{ > + GEN_CODES(LOCK_COMPLETE) > + > + barrier(); > + per_cpu(kernel_count, cpu) = 1; > + barrier(); > + > + GEN_CODES(SAVE_COMPLETE) > +} > + > +static void __rcu_kernel_exit_outmost(int cpu) > +{ > + GEN_CODES(LOAD_COMPLETE) > + > + barrier(); > + per_cpu(kernel_count, cpu) = 0; > + barrier(); > + > + GEN_CODES(RELEASE_COMPLETE) > +} > + > +#ifdef CONFIG_RCURING_BH > +static void force_quiescent_state_bh(struct rcu_domain *domain, > + long fqs_complete) > +{ > + __force_quiescent_state(domain, fqs_complete); > +} > + > +static inline void rcu_bh_qsctr_inc_irqoff(int cpu) > +{ > + __rcu_qsctr_inc(&rcu_domain_bh, &per_cpu(rcu_data_bh, cpu)); > +} > + > +void rcu_bh_qs(int cpu) > +{ > + unsigned long flags; > + > + local_irq_save(flags); > + rcu_bh_qsctr_inc_irqoff(cpu); > + local_irq_restore(flags); > +} > + > +#else /* CONFIG_RCURING_BH */ > +static inline void rcu_bh_qsctr_inc_irqoff(int cpu) { (void)(cpu); } > +#endif /* CONFIG_RCURING_BH */ > + > +static void force_quiescent_state_sched(struct rcu_domain *domain, > + long fqs_complete) > +{ > + __force_quiescent_state(domain, fqs_complete); > +} > + > +static inline void rcu_sched_qsctr_inc_irqoff(int cpu) > +{ > + __rcu_qsctr_inc(&rcu_domain_sched, &per_cpu(rcu_data_sched, cpu)); > +} > + > +#ifdef CONFIG_RCURING_PREEMPT > +static void force_quiescent_state_preempt(struct rcu_domain *domain, > + long fqs_complete) > +{ > + /* To Be Implemented */ > +} > + > +static inline void rcu_preempt_qsctr_inc_irqoff(int cpu) > +{ > + __rcu_preempt_qsctr_inc(&rcu_domain_preempt, > + &per_cpu(rcu_data_preempt, cpu), > + ¤t->rcu_task_preempt); > +} > + > +#else /* CONFIG_RCURING_PREEMPT */ > +static inline void rcu_preempt_qsctr_inc_irqoff(int cpu) > +{ > + (void)(cpu); > + (void)(__rcu_preempt_qsctr_inc); > +} > +#endif /* CONFIG_RCURING_PREEMPT */ > + > +#define gen_needs_cpu(domain, cpu) \ > + if (__rcu_needs_cpu(&per_cpu(rcu_data_##domain, cpu))) \ > + return 1; > +int rcu_needs_cpu(int cpu) > +{ > + GEN_CODES(gen_needs_cpu, cpu) > + return 0; > +} > + > +#define call_func(domain, func, args...) \ > + func(&rcu_domain_##domain, &__get_cpu_var(rcu_data_##domain), ##args); > + > +/* > + * Note a context switch. This is a quiescent state for RCU-sched, > + * and requires special handling for preemptible RCU. > + */ > +void rcu_note_context_switch(int cpu) > +{ > + unsigned long flags; > + > +#ifdef CONFIG_HOTPLUG_CPU > + /* > + * The stoper thread need to schedule() to the idle thread > + * after CPU_DYING. > + */ > + if (unlikely(!per_cpu(kernel_count, cpu))) { > + BUG_ON(cpu_online(cpu)); > + return; > + } > +#endif > + > + local_irq_save(flags); > + rcu_bh_qsctr_inc_irqoff(cpu); > + rcu_sched_qsctr_inc_irqoff(cpu); > + rcu_preempt_qsctr_inc_irqoff(cpu); > + local_irq_restore(flags); > +} > + > +static int rcu_cpu_idle(int cpu) > +{ > + return idle_cpu(cpu) && rcu_scheduler_active && > + !in_softirq() && hardirq_count() <= (1 << HARDIRQ_SHIFT); > +} > + > +void rcu_check_callbacks(int cpu, int user) > +{ > + unsigned long flags; > + > + local_irq_save(flags); > + > + if (user || rcu_cpu_idle(cpu)) { > + rcu_bh_qsctr_inc_irqoff(cpu); > + rcu_sched_qsctr_inc_irqoff(cpu); > + rcu_preempt_qsctr_inc_irqoff(cpu); > + } else if (!in_softirq()) > + rcu_bh_qsctr_inc_irqoff(cpu); > + > + GEN_CODES(call_func, __rcu_check_callbacks, 0) > + local_irq_restore(flags); > +} > + > +static void rcu_process_callbacks(struct softirq_action *unused) > +{ > + local_irq_disable(); > + GEN_CODES(call_func, __rcu_check_callbacks, 1) > + GEN_CODES(call_func, rcu_do_batch) > + local_irq_enable(); > +} > + > +static void __cpuinit __rcu_offline_cpu(struct rcu_domain *domain, > + struct rcu_data *off_rdp, struct rcu_data *rdp) > +{ > + if (off_rdp->qlen > 0) { > + unsigned long flags; > + long curr_complete; > + > + /* move all callbacks in @off_rdp to @off_rdp->done_batch */ > + do_advance_callbacks(off_rdp, off_rdp->curr_complete); > + > + /* move all callbacks in @off_rdp to this cpu(@rdp) */ > + local_irq_save(flags); > + curr_complete = prepare_for_new_callback(domain, rdp); > + rcu_batch_merge(rdp->batch + RCURING_IDX(curr_complete), > + &off_rdp->done_batch); > + rdp->qlen += off_rdp->qlen; > + local_irq_restore(flags); > + > + off_rdp->qlen = 0; > + } > +} > + > +#define gen_offline(domain, cpu, recieve_cpu) \ > + __rcu_offline_cpu(&rcu_domain_##domain, \ > + &per_cpu(rcu_data_##domain, cpu), \ > + &per_cpu(rcu_data_##domain, recieve_cpu)); > + > +static int __cpuinit rcu_cpu_notify(struct notifier_block *self, > + unsigned long action, void *hcpu) > +{ > + int cpu = (long)hcpu; > + int recieve_cpu; > + > + (void)(recieve_cpu); > + (void)(__rcu_offline_cpu); > + (void)(__rcu_kernel_exit_outmost); > + > + switch (action) { > +#ifdef CONFIG_HOTPLUG_CPU > + case CPU_DYING: > + case CPU_DYING_FROZEN: > + /* > + * the machine is stopped now, we can access to > + * any other cpu data. > + * */ > + recieve_cpu = cpumask_any_but(cpu_online_mask, cpu); > + GEN_CODES(gen_offline, cpu, recieve_cpu) > + __rcu_kernel_exit_outmost(cpu); > + break; > +#endif > + case CPU_STARTING: > + case CPU_STARTING_FROZEN: > + __rcu_kernel_enter_outmost(cpu); > + default: > + break; > + } > + return NOTIFY_OK; > +} > + > +#define rcu_init_domain_data(domain, cpu) \ > + for_each_possible_cpu(cpu) \ > + rcu_data_init(&per_cpu(rcu_data_##domain, cpu)); > + > +void __init rcu_init(void) > +{ > + int cpu; > + > + GEN_CODES(rcu_init_domain_data, cpu) This one actually consumes more lines than would writing out the calls explicitly. ;-) > + > + __rcu_kernel_enter_outmost(raw_smp_processor_id()); > + open_softirq(RCU_SOFTIRQ, rcu_process_callbacks); > + cpu_notifier(rcu_cpu_notify, 0); > +} > + > +int rcu_scheduler_active __read_mostly; > +EXPORT_SYMBOL_GPL(rcu_scheduler_active); > + > +/* > + * This function is invoked towards the end of the scheduler's initialization > + * process. Before this is called, the idle task might contain > + * RCU read-side critical sections (during which time, this idle > + * task is booting the system). After this function is called, the > + * idle tasks are prohibited from containing RCU read-side critical > + * sections. This function also enables RCU lockdep checking. > + */ > +void rcu_scheduler_starting(void) > +{ > + rcu_scheduler_active = 1; > +} > + > +#ifdef CONFIG_NO_HZ > + > +void rcu_kernel_enter(void) > +{ > + unsigned long flags; > + > + raw_local_irq_save(flags); > + if (__get_cpu_var(kernel_count) == 0) > + __rcu_kernel_enter_outmost(raw_smp_processor_id()); > + else > + __get_cpu_var(kernel_count)++; > + raw_local_irq_restore(flags); > +} > + > +void rcu_kernel_exit(void) > +{ > + unsigned long flags; > + > + raw_local_irq_save(flags); > + if (__get_cpu_var(kernel_count) == 1) > + __rcu_kernel_exit_outmost(raw_smp_processor_id()); > + else > + __get_cpu_var(kernel_count)--; > + raw_local_irq_restore(flags); > +} > + > +void rcu_enter_nohz(void) > +{ > + unsigned long flags; > + > + raw_local_irq_save(flags); > + __rcu_kernel_exit_outmost(raw_smp_processor_id()); > + raw_local_irq_restore(flags); > +} > + > +void rcu_exit_nohz(void) > +{ > + unsigned long flags; > + > + raw_local_irq_save(flags); > + __rcu_kernel_enter_outmost(raw_smp_processor_id()); > + raw_local_irq_restore(flags); > +} > +#endif /* CONFIG_NO_HZ */ > + > +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head); > +static struct completion rcu_barrier_completion; > +static struct kref rcu_barrier_wait_ref; > +static DEFINE_MUTEX(rcu_barrier_mutex); > + > +static void rcu_barrier_release_ref(struct kref *notused) > +{ > + complete(&rcu_barrier_completion); > +} > + > +static void rcu_barrier_callback(struct rcu_head *notused) > +{ > + kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref); > +} > + > +static void rcu_barrier_func(void *data) > +{ > + struct rcu_domain *rcu_domain = data; > + > + kref_get(&rcu_barrier_wait_ref); > + __call_rcu(rcu_domain, &__get_cpu_var(rcu_barrier_head), > + rcu_barrier_callback); > +} > + > +static void __rcu_barrier(struct rcu_domain *rcu_domain) > +{ > + mutex_lock(&rcu_barrier_mutex); > + > + init_completion(&rcu_barrier_completion); > + kref_init(&rcu_barrier_wait_ref); > + > + /* queue barrier rcu_heads for every cpu */ > + on_each_cpu(rcu_barrier_func, rcu_domain, 1); > + > + kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref); > + wait_for_completion(&rcu_barrier_completion); > + > + mutex_unlock(&rcu_barrier_mutex); > +} > + > +#define gen_rcu_barrier(domain) \ > +void rcu_barrier_##domain(void) \ > +{ \ > + __rcu_barrier(&rcu_domain_##domain); \ > +} \ > +EXPORT_SYMBOL_GPL(rcu_barrier_##domain); > + > +GEN_CODES(gen_rcu_barrier) > + > diff --git a/kernel/sched.c b/kernel/sched.c > index 09b574e..967f25a 100644 > --- a/kernel/sched.c > +++ b/kernel/sched.c > @@ -9113,6 +9113,7 @@ struct cgroup_subsys cpuacct_subsys = { > }; > #endif /* CONFIG_CGROUP_CPUACCT */ > > +#ifndef CONFIG_RCURING > #ifndef CONFIG_SMP > > void synchronize_sched_expedited(void) > @@ -9182,3 +9183,4 @@ void synchronize_sched_expedited(void) > EXPORT_SYMBOL_GPL(synchronize_sched_expedited); > > #endif /* #else #ifndef CONFIG_SMP */ > +#endif /* CONFIG_RCURING */ > diff --git a/kernel/time/tick-sched.c b/kernel/time/tick-sched.c > index 3e216e0..0eba561 100644 > --- a/kernel/time/tick-sched.c > +++ b/kernel/time/tick-sched.c > @@ -269,6 +269,10 @@ void tick_nohz_stop_sched_tick(int inidle) > cpu = smp_processor_id(); > ts = &per_cpu(tick_cpu_sched, cpu); > > + /* Don't enter nohz when cpu is offlining, it is going to die */ > + if (cpu_is_offline(cpu)) > + goto end; > + > /* > * Call to tick_nohz_start_idle stops the last_update_time from being > * updated. Thus, it must not be called in the event we are called from > ^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [PATCH, RFC] RCU: New scalable, multi-GP and preemptible RCU implementation 2010-08-30 23:57 ` Paul E. McKenney @ 2010-08-31 3:03 ` Lai Jiangshan 2010-09-08 19:53 ` Paul E. McKenney 0 siblings, 1 reply; 8+ messages in thread From: Lai Jiangshan @ 2010-08-31 3:03 UTC (permalink / raw) To: paulmck Cc: Linus Torvalds, Ingo Molnar, Andrew Morton, David Miller, Manfred Spraul, Mathieu Desnoyers, Steven Rostedt, Thomas Gleixner, Peter Zijlstra, LKML, dipankar On 08/31/2010 07:57 AM, Paul E. McKenney wrote: > On Mon, Aug 30, 2010 at 05:31:44PM +0800, Lai Jiangshan wrote: >> >> 0) contents >> 1) overview >> 2) scalable >> 3) multi-GP >> 4) integrated expedited synchronize_rcu >> 5) preemptible >> 6) top-down implies >> 7) speech >> 8) core rcuring > > Interesting! I can't claim to have dug through every detail, but figured > I should ask a few questions first. > > Thanx, Paul > > Of course, the first question is whether it passes rcutorture, and if > so on what hardware configuration. Only in x86(32bit and 64bit) system. Did you patch it in power and test it? It' need more test for different archs, but I have no other arch. What is "hardware configuration"? > > At first glance, it looks to live somewhere between Manfred Spraul's > counter-based hierarchical RCU and classic RCU. There are also some > resemblances to K42's "generations" RCU-like mechanism. > >> 1) overview >> This is a new RCU implementation: RCURING. It uses a ring atomic counters >> to control the completion of GPs and a ring callbacks batches to process >> the callbacks. > > The ring is a set of grace periods, correct? correct. A GP is also controled by a set ring counters in (domain's done_complete, this GP's complete], this GP is only completed until all these conters become zero. I pull up a question of your here: > For a given grace period, all quiescent states hit the same counter. > Or am I missing something? No. any rcu read site has a locked_complete, if a cpu hold a locked_complete, the counter of this locked_complete will not become zero. Any quiescent states will release its previous rcu read site's locked_complete, so different QS may hit different counter. QS alway locks the newest GP(not started to waiting, domain's curr_complete) for next rcu read site and get the new locked_complete. What I said seems still indistinct, more questions are wellcome, questions also help for documents. Thank you very much. > >> In 2008, I think out the idea, and wrote the first 300lines code of rcuring.c, >> but I'm too lazy to finish it >> >> 2) scalable >> There is only 2 locks in every RCU domain(rcu_bh, rcu_sched, rcu_preempt), >> one of them is for misc things, it is a infrequency lock, the other is >> for advancing complete, it is frequency lock. But somebody else would >> very probably do the same work for me if somebody else is holding this lock, >> so we alway use spin_trylock() for this lock, so it is scalable. >> >> We don't need any lock for reporting quiescent state, we just need 2 >> atomic operations for every RCU domain. > > If you have a single GP in flight, all CPUs need to do an atomic op on the > same element of the RCU ring, correct? Yes, but it seldom have only a single GP in flight. We start GPs when need. > My first thought was that each CPU > was writing the current grace-period number to its own variable, but that > would not require an atomic operation. Of course, the downside of this > latter approach is that some CPU would need to scan all CPUs' counters. > >> 3) multi-GP >> All old rcu implementations have only one waiting GP. >> >> time -----ta-tb--------------------tc----------------td-------------> >> GP -----|------GP1---------------|--------GP2------|-------------> >> cpu#x ........|---------------------------------------|...............> >> |->insert a callback |->handle callback >> >> the callback have to wait a partial GP1, and a full GP2, it may wait >> near 2 GPs in worse case because it have to wait the rcu read sites witch >> start at (tb, tc] which are needless to wait. In average, a callback >> have to wait (1 + 1/(1+1))= 1.5GPs >> >> if we can have three simultaneous waiting GPs: >> >> GPs .....|------GP1---------------|.......................... >> ...........|--------GP2-----------|.................... >> ....................|---------GP3---------------|........ >> cpu#x ........|-------------------------|..........................> >> |->insert a callback |->handle callback >> The callback is handled more quickly, In average, a callback have to wait >> (1 + 1/(3+1))=1.25GPs >> >> if we can have X simultaneous waiting GPs, a callback have to wait >> (1 + 1/(X+1)) GPs in average. The default X in this patch is 15 >> (including the reserved GPs). > > Assuming that the start times of the GPs are evenly spaced, correct? No, We start GPs when need. If a cpu has queued some callbacks, and the callbacks's complete is equals to rcu domain's curr_complete, this cpu will start a new GPs for these callbacks. > >> 4) integrated expedited synchronize_rcu >> expedited synchronize_rcu is implemented in rcuring.c, not in other files. >> The whole of it is less than 50 lines of code. we just reserve several >> GPs for expedited synchronize_rcu, when a expedited callback is inserted, >> we start a new GP immediately(we have reserved GPs, this step will >> very probably success), and then force this new GP to be completed quickly. >> (thanks to multi-GP) > > I would be more convinced had you implemented force_quiescent_state() > for preemptible RCU. ;-) > >> 5) preemptible >> new simple preemptible code, rcu_read_lock/unlock() is simple and is inline function. >> add new kernel-offset.c to avoid recursively header files inclusion. >> >> 6) top-down implies >> irq and local_irq_disable() implies preempt_disable()/rcu_read_lock_sched(), >> irq and local_irq_disable() implies rcu_read_lock(). (*) >> preempt_disable()/rcu_read_lock_sched() implies rcu_read_lock(). (*) >> *: In preemptible rcutree/rcutiny, it is false. >> >> 7) speech >> I will give a speech about RCU and this RCU implementation >> in Japan linuxcon 2010. Welcome to Japan Linuxcon 2010. > > Congratulations! > >> 8) core rcuring >> (Comments and document is still been writing) >> >> all read_read_lock/unlock_domain() is nested in a coresponding >> rcuring_lock/unlock(), and the algorithm handles a rcuring_lock/unlock() >> ciritcal region as a full rcu read site ciritcal region. >> >> read site: >> rcuring_lock() >> lock a complete, and ensure this locked_complete - done_complete(1) > 0 >> curr_complete(1) - locked_complete >= 0 >> mb(1) > > This is really a compiler barrier? a really memory barrier. these memory barriers are not in rcu_read_lock(), there in the code where pass QS. > >> >> rcu_derefrence(ptr) >> >> rcuring_unlock() >> mb(2) > > Ditto? > >> release its locked_complete. >> >> update site: >> rcu_assign_pointer(ptr, new) >> >> (handle callback:call_rcu()+rcu_do_batch()) >> (call_rcu():) >> mb(3) >> get the curr_complete(2) >> set callback's complete as this curr_complete(2). >> (we don't have any field to record the callback's complete, we just >> emulate it by rdp->done_complete and the position of this callback >> in rdp->batch.) >> >> time pass... until done_complete(2) - callback's complete > 0 > > I have to ask about counter overflow on 32-bit systems. The "complete" is "long" not "unsigned long" if callback's complete - rdp->curr_complete > 0, we also queue this callback to done_batch. > >> (rcu_do_batch():) >> mb(4) >> call the callback: (free old ptr, etc.) >> >> Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com> >> --- >> Kbuild | 75 ++- >> include/linux/rcupdate.h | 6 >> include/linux/rcuring.h | 195 +++++++++ >> include/linux/sched.h | 6 >> init/Kconfig | 32 + >> kernel/Makefile | 1 >> kernel/kernel-offsets.c | 20 >> kernel/rcuring.c | 1002 +++++++++++++++++++++++++++++++++++++++++++++++ >> kernel/sched.c | 2 >> kernel/time/tick-sched.c | 4 >> 10 files changed, 1324 insertions(+), 19 deletions(-) >> diff --git a/Kbuild b/Kbuild >> index e3737ad..bce37cb 100644 >> --- a/Kbuild >> +++ b/Kbuild >> @@ -3,7 +3,19 @@ >> # This file takes care of the following: >> # 1) Generate bounds.h >> # 2) Generate asm-offsets.h (may need bounds.h) >> -# 3) Check for missing system calls >> +# 3) Generate kernel-offsets.h >> +# 4) Check for missing system calls >> + >> +# Default sed regexp - multiline due to syntax constraints >> +define sed-y >> + "/^->/{s:->#\(.*\):/* \1 */:; \ >> + s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \ >> + s:->::; p;}" >> +endef >> + >> +quiet_cmd_offsets_cc_s_c = CC $(quiet_modtag) $@ >> +cmd_offsets_cc_s_c = $(CC) -D__GENARATING_OFFSETS__ \ > > s/GENARATING/GENERATING/ just to be pedantic. ;-) > >> + $(c_flags) -fverbose-asm -S -o $@ $< > > Pulling out the offsets might have the advantage of allowing both > rcu_read_lock() and rcu_read_unlock() to be inlined, but without pulling > in include/linux/sched.h everywhere. I had been thinking in terms of > moving the state from the task struct to the taskinfo struct, but > this approach might be worth considering. > >> ##### >> # 1) Generate bounds.h >> @@ -43,22 +55,14 @@ $(obj)/$(bounds-file): kernel/bounds.s Kbuild >> # 2) Generate asm-offsets.h >> # >> >> -offsets-file := include/generated/asm-offsets.h >> +asm-offsets-file := include/generated/asm-offsets.h >> >> -always += $(offsets-file) >> -targets += $(offsets-file) >> +always += $(asm-offsets-file) >> +targets += $(asm-offsets-file) >> targets += arch/$(SRCARCH)/kernel/asm-offsets.s >> >> - >> -# Default sed regexp - multiline due to syntax constraints >> -define sed-y >> - "/^->/{s:->#\(.*\):/* \1 */:; \ >> - s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \ >> - s:->::; p;}" >> -endef >> - >> -quiet_cmd_offsets = GEN $@ >> -define cmd_offsets >> +quiet_cmd_asm_offsets = GEN $@ >> +define cmd_asm_offsets >> (set -e; \ >> echo "#ifndef __ASM_OFFSETS_H__"; \ >> echo "#define __ASM_OFFSETS_H__"; \ >> @@ -78,13 +82,48 @@ endef >> arch/$(SRCARCH)/kernel/asm-offsets.s: arch/$(SRCARCH)/kernel/asm-offsets.c \ >> $(obj)/$(bounds-file) FORCE >> $(Q)mkdir -p $(dir $@) >> - $(call if_changed_dep,cc_s_c) >> + $(call if_changed_dep,offsets_cc_s_c) >> + >> +$(obj)/$(asm-offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild >> + $(call cmd,asm_offsets) >> + >> +##### >> +# 3) Generate kernel-offsets.h >> +# >> + >> +kernel-offsets-file := include/generated/kernel-offsets.h >> + >> +always += $(kernel-offsets-file) >> +targets += $(kernel-offsets-file) >> +targets += kernel/kernel-offsets.s >> + >> +quiet_cmd_kernel_offsets = GEN $@ >> +define cmd_kernel_offsets >> + (set -e; \ >> + echo "#ifndef __LINUX_KERNEL_OFFSETS_H__"; \ >> + echo "#define __LINUX_KERNEL_OFFSETS_H__"; \ >> + echo "/*"; \ >> + echo " * DO NOT MODIFY."; \ >> + echo " *"; \ >> + echo " * This file was generated by Kbuild"; \ >> + echo " *"; \ >> + echo " */"; \ >> + echo ""; \ >> + sed -ne $(sed-y) $<; \ >> + echo ""; \ >> + echo "#endif" ) > $@ >> +endef >> + >> +# We use internal kbuild rules to avoid the "is up to date" message from make >> +kernel/kernel-offsets.s: kernel/kernel-offsets.c $(obj)/$(bounds-file) \ >> + $(obj)/$(asm-offsets-file) FORCE >> + $(call if_changed_dep,offsets_cc_s_c) >> >> -$(obj)/$(offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild >> - $(call cmd,offsets) >> +$(obj)/$(kernel-offsets-file): kernel/kernel-offsets.s Kbuild >> + $(call cmd,kernel_offsets) >> >> ##### >> -# 3) Check for missing system calls >> +# 4) Check for missing system calls >> # >> >> quiet_cmd_syscalls = CALL $< >> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h >> index 89414d6..42cd6bc 100644 >> --- a/include/linux/rcupdate.h >> +++ b/include/linux/rcupdate.h >> @@ -124,6 +124,7 @@ extern void rcu_bh_qs(int cpu); >> extern void rcu_check_callbacks(int cpu, int user); >> struct notifier_block; >> >> +#ifndef CONFIG_RCURING >> #ifdef CONFIG_NO_HZ >> >> extern void rcu_enter_nohz(void); >> @@ -140,11 +141,14 @@ static inline void rcu_exit_nohz(void) >> } >> >> #endif /* #else #ifdef CONFIG_NO_HZ */ >> +#endif >> >> #if defined(CONFIG_TREE_RCU) || defined(CONFIG_TREE_PREEMPT_RCU) >> #include <linux/rcutree.h> >> #elif defined(CONFIG_TINY_RCU) || defined(CONFIG_TINY_PREEMPT_RCU) >> #include <linux/rcutiny.h> >> +#elif defined(CONFIG_RCURING) >> +#include <linux/rcuring.h> >> #else >> #error "Unknown RCU implementation specified to kernel configuration" >> #endif >> @@ -686,6 +690,7 @@ struct rcu_synchronize { >> >> extern void wakeme_after_rcu(struct rcu_head *head); >> >> +#ifndef CONFIG_RCURING >> #ifdef CONFIG_PREEMPT_RCU >> >> /** >> @@ -710,6 +715,7 @@ extern void call_rcu(struct rcu_head *head, >> #define call_rcu call_rcu_sched >> >> #endif /* #else #ifdef CONFIG_PREEMPT_RCU */ >> +#endif >> >> /** >> * call_rcu_bh() - Queue an RCU for invocation after a quicker grace period. >> diff --git a/include/linux/rcuring.h b/include/linux/rcuring.h >> new file mode 100644 >> index 0000000..343b932 >> --- /dev/null >> +++ b/include/linux/rcuring.h >> @@ -0,0 +1,195 @@ >> +/* >> + * Read-Copy Update mechanism for mutual exclusion >> + * >> + * This program is free software; you can redistribute it and/or modify >> + * it under the terms of the GNU General Public License as published by >> + * the Free Software Foundation; either version 2 of the License, or >> + * (at your option) any later version. >> + * >> + * This program is distributed in the hope that it will be useful, >> + * but WITHOUT ANY WARRANTY; without even the implied warranty of >> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the >> + * GNU General Public License for more details. >> + * >> + * You should have received a copy of the GNU General Public License >> + * along with this program; if not, write to the Free Software >> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. >> + * >> + * Copyright IBM Corporation, 2008 >> + * Copyright Fujitsu 2008-2010 Lai Jiangshan >> + * >> + * Authors: Dipankar Sarma <dipankar@in.ibm.com> >> + * Manfred Spraul <manfred@colorfullife.com> >> + * Paul E. McKenney <paulmck@linux.vnet.ibm.com> Hierarchical version >> + * Lai Jiangshan <laijs@cn.fujitsu.com> RCURING version >> + */ >> + >> +#ifndef __LINUX_RCURING_H >> +#define __LINUX_RCURING_H >> + >> +#define __rcu_read_lock_bh() local_bh_disable() >> +#define __rcu_read_unlock_bh() local_bh_enable() >> +#define __rcu_read_lock_sched() preempt_disable() >> +#define __rcu_read_unlock_sched() preempt_enable() >> + >> +#ifdef CONFIG_RCURING_BH >> +extern void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *)); >> +extern void synchronize_rcu_bh(void); >> +extern void synchronize_rcu_bh_expedited(void); >> +extern void rcu_bh_qs(int cpu); >> +extern long rcu_batches_completed_bh(void); >> +extern void rcu_barrier_bh(void); >> +extern void rcu_bh_force_quiescent_state(void); >> +#else /* CONFIG_RCURING_BH */ >> +#define call_rcu_bh call_rcu_sched >> +#define synchronize_rcu_bh synchronize_rcu_sched >> +#define synchronize_rcu_bh_expedited synchronize_rcu_sched_expedited >> +#define rcu_bh_qs(cpu) do { (void)(cpu); } while (0) >> +#define rcu_batches_completed_bh rcu_batches_completed_sched >> +#define rcu_barrier_bh rcu_barrier_sched >> +#define rcu_bh_force_quiescent_state rcu_sched_force_quiescent_state >> +#endif /* CONFIG_RCURING_BH */ >> + >> +extern >> +void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *)); >> +extern void synchronize_rcu_sched(void); >> +#define synchronize_sched synchronize_rcu_sched >> +extern void synchronize_rcu_sched_expedited(void); >> +#define synchronize_sched_expedited synchronize_rcu_sched_expedited >> +extern void rcu_note_context_switch(int cpu); >> +extern long rcu_batches_completed_sched(void); >> +extern void rcu_barrier_sched(void); >> +extern void rcu_sched_force_quiescent_state(void); >> + >> +/* >> + * The @flags is valid only when RCURING_PREEMPT_SAVED is set. >> + * >> + * When preemptible rcu read site is preempted, we save RCURING_PREEMPT_SAVED, >> + * RCURING_PREEMPT_QS and this read site's locked_complete(only the last >> + * RCURING_COUNTERS_SHIFT bits) in the @flags. >> + * >> + * When the outmost rcu read site is closed, we will clear >> + * RCURING_PREEMPT_QS bit if it's saved, and let rcu system knows >> + * this task has passed a quiescent state and release the old read site's >> + * locked_complete. >> + */ >> +#define RCURING_PREEMPT_SAVED (1 << 31) >> +#define RCURING_PREEMPT_QS (1 << 30) >> +#define RCURING_PREEMPT_FLAGS (RCURING_PREEMPT_SAVED | RCURING_PREEMPT_QS) >> + >> +struct rcu_task_preempt { >> + unsigned int nesting; >> + unsigned int flags; >> +}; >> + >> +static inline void rcu_read_lock_preempt(struct rcu_task_preempt *rcu_task) >> +{ >> + rcu_task->nesting++; >> + barrier(); >> +} >> + >> +static inline void rcu_read_unlock_preempt(struct rcu_task_preempt *rcu_task) >> +{ >> + int nesting = rcu_task->nesting; >> + >> + if (nesting == 1) { >> + unsigned int flags; >> + >> + barrier(); >> + flags = ACCESS_ONCE(rcu_task->flags); >> + >> + if ((flags & RCURING_PREEMPT_FLAGS) == RCURING_PREEMPT_FLAGS) { >> + flags &= ~RCURING_PREEMPT_QS; >> + ACCESS_ONCE(rcu_task->flags) = flags; >> + } >> + } >> + >> + barrier(); >> + rcu_task->nesting = nesting - 1; >> +} >> + >> +#ifdef CONFIG_RCURING_PREEMPT >> + >> +#ifdef __GENARATING_OFFSETS__ >> +#define task_rcu_preempt(p) ((struct rcu_task_preempt *)NULL) > > The above looks a bit strange. The idea is that if you were generating > offsets for the fields within rcu_task_preempt, you would select the > fields? Something like the following? when __GENARATING_OFFSETS__, no one really use task_rcu_preempt(), it just for avoiding the compiling warnings. > > #define example_offset(p) &(((struct rcu_task_preempt *)NULL)->field_a) > >> +#else >> +#include <generated/kernel-offsets.h> >> +#define task_rcu_preempt(p) \ >> + ((struct rcu_task_preempt *)(((void *)p) + TASK_RCU_PREEMPT)) >> +#endif >> + >> +#define current_rcu_preempt() task_rcu_preempt(current) >> +#define __rcu_read_lock() rcu_read_lock_preempt(current_rcu_preempt()) >> +#define __rcu_read_unlock() rcu_read_unlock_preempt(current_rcu_preempt()) >> +extern >> +void call_rcu_preempt(struct rcu_head *head, void (*func)(struct rcu_head *)); >> +#define call_rcu call_rcu_preempt >> +extern void synchronize_rcu_preempt(void); >> +#define synchronize_rcu synchronize_rcu_preempt >> +extern void synchronize_rcu_preempt_expedited(void); >> +#define synchronize_rcu_expedited synchronize_rcu_preempt_expedited >> +extern long rcu_batches_completed_preempt(void); >> +#define rcu_batches_completed rcu_batches_completed_preempt >> +extern void rcu_barrier_preempt(void); >> +#define rcu_barrier rcu_barrier_preempt >> +extern void rcu_preempt_force_quiescent_state(void); >> +#define rcu_force_quiescent_state rcu_preempt_force_quiescent_state >> + >> +struct task_struct; >> +static inline void rcu_copy_process(struct task_struct *p) >> +{ >> + struct rcu_task_preempt *rcu_task = task_rcu_preempt(p); >> + >> + rcu_task->nesting = 0; >> + rcu_task->flags = 0; >> +} >> + >> +static inline void exit_rcu(void) >> +{ >> + if (current_rcu_preempt()->nesting) { >> + WARN_ON(1); >> + current_rcu_preempt()->nesting = 0; > > If the RCU core was waiting on this task, how does it learn that this > task is no longer blocking the current grace period(s)? A schedule() will be called after exit_rcu(), and RCU core collect infos when task do schedule(). > >> + } >> +} >> + >> +#else /*CONFIG_RCURING_PREEMPT */ >> + >> +#define __rcu_read_lock() __rcu_read_lock_sched(); >> +#define __rcu_read_unlock() __rcu_read_unlock_sched(); >> +#define call_rcu call_rcu_sched >> +#define synchronize_rcu synchronize_rcu_sched >> +#define synchronize_rcu_expedited synchronize_rcu_sched_expedited >> +#define rcu_batches_completed rcu_batches_completed_sched >> +#define rcu_barrier rcu_barrier_sched >> +#define rcu_force_quiescent_state rcu_sched_force_quiescent_state >> + >> +static inline void rcu_copy_process(struct task_struct *p) {} >> +static inline void exit_rcu(void) {} >> +#endif /* CONFIG_RCURING_PREEMPT */ >> + >> +#ifdef CONFIG_NO_HZ >> +extern void rcu_kernel_enter(void); >> +extern void rcu_kernel_exit(void); >> + >> +static inline void rcu_nmi_enter(void) { rcu_kernel_enter(); } >> +static inline void rcu_nmi_exit(void) { rcu_kernel_exit(); } >> +static inline void rcu_irq_enter(void) { rcu_kernel_enter(); } >> +static inline void rcu_irq_exit(void) { rcu_kernel_exit(); } >> +extern void rcu_enter_nohz(void); >> +extern void rcu_exit_nohz(void); >> +#else >> +static inline void rcu_nmi_enter(void) {} >> +static inline void rcu_nmi_exit(void) {} >> +static inline void rcu_irq_enter(void) {} >> +static inline void rcu_irq_exit(void) {} >> +static inline void rcu_enter_nohz(void) {} >> +static inline void rcu_exit_nohz(void) {} >> +#endif >> + >> +extern int rcu_needs_cpu(int cpu); >> +extern void rcu_check_callbacks(int cpu, int user); >> + >> +extern void rcu_scheduler_starting(void); >> +extern int rcu_scheduler_active __read_mostly; >> + >> +#endif /* __LINUX_RCURING_H */ >> diff --git a/include/linux/sched.h b/include/linux/sched.h >> index e18473f..15b332d 100644 >> --- a/include/linux/sched.h >> +++ b/include/linux/sched.h >> @@ -1211,6 +1211,10 @@ struct task_struct { >> struct rcu_node *rcu_blocked_node; >> #endif /* #ifdef CONFIG_TREE_PREEMPT_RCU */ >> >> +#ifdef CONFIG_RCURING_PREEMPT >> + struct rcu_task_preempt rcu_task_preempt; >> +#endif >> + >> #if defined(CONFIG_SCHEDSTATS) || defined(CONFIG_TASK_DELAY_ACCT) >> struct sched_info sched_info; >> #endif >> @@ -1757,7 +1761,7 @@ static inline void rcu_copy_process(struct task_struct *p) >> INIT_LIST_HEAD(&p->rcu_node_entry); >> } >> >> -#else >> +#elif !defined(CONFIG_RCURING) >> >> static inline void rcu_copy_process(struct task_struct *p) >> { >> diff --git a/init/Kconfig b/init/Kconfig >> index a619a1a..48e58d9 100644 >> --- a/init/Kconfig >> +++ b/init/Kconfig >> @@ -374,6 +374,9 @@ config TINY_PREEMPT_RCU >> for real-time UP systems. This option greatly reduces the >> memory footprint of RCU. >> >> +config RCURING >> + bool "Multi-GP ring based RCU" >> + >> endchoice >> >> config PREEMPT_RCU >> @@ -450,6 +453,35 @@ config TREE_RCU_TRACE >> TREE_PREEMPT_RCU implementations, permitting Makefile to >> trivially select kernel/rcutree_trace.c. >> >> +config RCURING_BH >> + bool "RCU for bh" >> + default y >> + depends on RCURING && !PREEMPT_RT >> + help >> + If Y, use a independent rcu domain for rcu_bh. >> + If N, rcu_bh will map to rcu_sched(except rcu_read_lock_bh, >> + rcu_read_unlock_bh). > > If I understand correctly, "N" defeats the purpose of bh, which is to > have faster grace periods -- not having to wait for a context switch. I remembered, BH is for defeating DOS, so somebody will not need rcu domain for BH if he don't care this kind of DOS. > >> + >> +config RCURING_BH_PREEMPT >> + bool >> + depends on PREEMPT_RT >> + help >> + rcu_bh will map to rcu_preempt. > > If I understand what you are doing, this config option will break > networking. In PREEMPT_RT, BH will be preemptible. but RCURING_BH_PREEMPT is not used here, will be removed. > >> + >> +config RCURING_PREEMPT >> + bool "RCURING based reemptible RCU" >> + default n >> + depends on RCURING && PREEMPT >> + help >> + If Y, normal rcu functionality will map to rcu_preempt. >> + If N, normal rcu functionality will map to rcu_sched. >> + >> +config RCURING_COUNTERS_SHIFT >> + int "RCURING's counter shift" >> + range 1 10 >> + depends on RCURING >> + default 4 >> + >> endmenu # "RCU Subsystem" >> >> config IKCONFIG >> diff --git a/kernel/Makefile b/kernel/Makefile >> index 92b7420..66eab8c 100644 >> --- a/kernel/Makefile >> +++ b/kernel/Makefile >> @@ -86,6 +86,7 @@ obj-$(CONFIG_TREE_PREEMPT_RCU) += rcutree.o >> obj-$(CONFIG_TREE_RCU_TRACE) += rcutree_trace.o >> obj-$(CONFIG_TINY_RCU) += rcutiny.o >> obj-$(CONFIG_TINY_PREEMPT_RCU) += rcutiny.o >> +obj-$(CONFIG_RCURING) += rcuring.o >> obj-$(CONFIG_RELAY) += relay.o >> obj-$(CONFIG_SYSCTL) += utsname_sysctl.o >> obj-$(CONFIG_TASK_DELAY_ACCT) += delayacct.o >> diff --git a/kernel/kernel-offsets.c b/kernel/kernel-offsets.c >> new file mode 100644 >> index 0000000..0d30817 >> --- /dev/null >> +++ b/kernel/kernel-offsets.c >> @@ -0,0 +1,20 @@ >> +/* >> + * Generate definitions needed by assembly language modules. >> + * >> + * Copyright (C) 2008-2010 Lai Jiangshan >> + */ >> + >> +#define __GENERATING_KERNEL_OFFSETS__ >> +#include <linux/rcupdate.h> >> +#include <linux/sched.h> >> +#include <linux/kbuild.h> >> + >> +void foo(void); >> + >> +void foo(void) >> +{ >> +#ifdef CONFIG_RCURING_PREEMPT >> + OFFSET(TASK_RCU_PREEMPT, task_struct, rcu_task_preempt); >> +#endif >> +} >> + >> diff --git a/kernel/rcuring.c b/kernel/rcuring.c >> new file mode 100644 >> index 0000000..e7cedb5 >> --- /dev/null >> +++ b/kernel/rcuring.c >> @@ -0,0 +1,1002 @@ >> +/* >> + * Read-Copy Update mechanism for mutual exclusion >> + * >> + * This program is free software; you can redistribute it and/or modify >> + * it under the terms of the GNU General Public License as published by >> + * the Free Software Foundation; either version 2 of the License, or >> + * (at your option) any later version. >> + * >> + * This program is distributed in the hope that it will be useful, >> + * but WITHOUT ANY WARRANTY; without even the implied warranty of >> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the >> + * GNU General Public License for more details. >> + * >> + * You should have received a copy of the GNU General Public License >> + * along with this program; if not, write to the Free Software >> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. >> + * >> + * Copyright IBM Corporation, 2008 >> + * Copyright Fujitsu 2008-2010 Lai Jiangshan >> + * >> + * Authors: Dipankar Sarma <dipankar@in.ibm.com> >> + * Manfred Spraul <manfred@colorfullife.com> >> + * Paul E. McKenney <paulmck@linux.vnet.ibm.com> Hierarchical version >> + * Lai Jiangshan <laijs@cn.fujitsu.com> RCURING version >> + */ >> + >> +#include <linux/rcupdate.h> >> +#include <linux/percpu.h> >> +#include <asm/atomic.h> >> +#include <linux/spinlock.h> >> +#include <linux/module.h> >> +#include <linux/interrupt.h> >> +#include <linux/cpu.h> >> + >> +#define RCURING_IDX_SHIFT CONFIG_RCURING_COUNTERS_SHIFT >> +#define RCURING_IDX_MAX (1L << RCURING_IDX_SHIFT) >> +#define RCURING_IDX_MASK (RCURING_IDX_MAX - 1) >> +#define RCURING_IDX(complete) ((complete) & RCURING_IDX_MASK) >> + >> +struct rcuring { >> + atomic_t ctr[RCURING_IDX_MAX]; >> + long curr_complete; >> + long done_complete; >> + >> + raw_spinlock_t lock; >> +}; >> + >> +/* It is long history that we use -300 as an initial complete */ >> +#define RCURING_INIT_COMPLETE -300L >> +#define RCURING_INIT(name) \ >> +{ \ >> + {[RCURING_IDX(RCURING_INIT_COMPLETE)] = ATOMIC_INIT(1),}, \ >> + RCURING_INIT_COMPLETE, \ >> + RCURING_INIT_COMPLETE - 1, \ >> + __RAW_SPIN_LOCK_UNLOCKED(name.lock), \ > > Please use the gcc-style initializers here. I will be use gcc-style initializers in next version. > >> +} >> + >> +static inline >> +int rcuring_ctr_read(struct rcuring *rr, unsigned long complete) >> +{ >> + int res; >> + >> + /* atomic_read() does not ensure to imply barrier(). */ >> + barrier(); >> + res = atomic_read(rr->ctr + RCURING_IDX(complete)); >> + barrier(); >> + >> + return res; >> +} >> + >> +void __rcuring_advance_done_complete(struct rcuring *rr) >> +{ >> + long wait_complete = rr->done_complete + 1; >> + >> + if (!rcuring_ctr_read(rr, wait_complete)) { >> + do { >> + wait_complete++; >> + } while (!rcuring_ctr_read(rr, wait_complete)); >> + >> + ACCESS_ONCE(rr->done_complete) = wait_complete - 1; >> + } >> +} > > For a given grace period, all quiescent states hit the same counter. > Or am I missing something? No. see the top of this email. > >> + >> +static inline >> +int rcuring_could_advance_done_complete(struct rcuring *rr) >> +{ >> + return !rcuring_ctr_read(rr, rr->done_complete + 1); >> +} >> + >> +static inline >> +void rcuring_advance_done_complete(struct rcuring *rr) >> +{ >> + if (rcuring_could_advance_done_complete(rr)) { >> + if (__raw_spin_trylock(&rr->lock)) { >> + __rcuring_advance_done_complete(rr); >> + __raw_spin_unlock(&rr->lock); >> + } >> + } >> +} >> + >> +void __rcuring_advance_curr_complete(struct rcuring *rr, long next_complete) >> +{ >> + long curr_complete = rr->curr_complete; >> + >> + if (curr_complete + 1 == next_complete) { >> + if (!rcuring_ctr_read(rr, next_complete)) { >> + ACCESS_ONCE(rr->curr_complete) = next_complete; >> + smp_mb__before_atomic_inc(); >> + atomic_inc(rr->ctr + RCURING_IDX(next_complete)); >> + smp_mb__after_atomic_inc(); >> + atomic_dec(rr->ctr + RCURING_IDX(curr_complete)); >> + } >> + } >> +} >> + >> +static inline >> +int rcuring_could_advance_complete(struct rcuring *rr, long next_complete) >> +{ >> + if (rcuring_could_advance_done_complete(rr)) >> + return 1; >> + >> + if (rr->curr_complete + 1 == next_complete) { >> + if (!rcuring_ctr_read(rr, next_complete)) >> + return 1; >> + } >> + >> + return 0; >> +} >> + >> +static inline >> +void rcuring_advance_complete(struct rcuring *rr, long next_complete) >> +{ >> + if (rcuring_could_advance_complete(rr, next_complete)) { >> + if (__raw_spin_trylock(&rr->lock)) { >> + __rcuring_advance_done_complete(rr); >> + __rcuring_advance_curr_complete(rr, next_complete); >> + __raw_spin_unlock(&rr->lock); >> + } >> + } >> +} >> + >> +static inline >> +void rcuring_advance_complete_force(struct rcuring *rr, long next_complete) >> +{ >> + if (rcuring_could_advance_complete(rr, next_complete)) { >> + __raw_spin_lock(&rr->lock); >> + __rcuring_advance_done_complete(rr); >> + __rcuring_advance_curr_complete(rr, next_complete); >> + __raw_spin_unlock(&rr->lock); >> + } >> +} >> + >> +static inline long __locked_complete_adjust(int locked_idx, long complete) >> +{ >> + long locked_complete; >> + >> + locked_complete = (complete & ~RCURING_IDX_MASK) | (long)locked_idx; >> + if (locked_complete - complete <= 0) >> + return locked_complete; >> + else >> + return locked_complete - RCURING_IDX_MAX; >> +} >> + >> +static long rcuring_lock(struct rcuring *rr) >> +{ >> + long locked_complete; >> + int idx; >> + >> + for (;;) { >> + idx = RCURING_IDX(ACCESS_ONCE(rr->curr_complete)); >> + if (likely(atomic_inc_not_zero(rr->ctr + idx))) >> + break; >> + } >> + smp_mb__after_atomic_inc(); >> + >> + locked_complete = ACCESS_ONCE(rr->curr_complete); >> + if (likely(RCURING_IDX(locked_complete) == idx)) >> + return locked_complete; >> + else >> + return __locked_complete_adjust(idx, locked_complete); >> +} >> + >> +static void rcuring_unlock(struct rcuring *rr, long locked_complete) >> +{ >> + smp_mb__before_atomic_dec(); >> + >> + atomic_dec(rr->ctr + RCURING_IDX(locked_complete)); >> +} >> + >> +static inline >> +void rcuring_dup_lock(struct rcuring *rr, long locked_complete) >> +{ >> + atomic_inc(rr->ctr + RCURING_IDX(locked_complete)); >> +} >> + >> +static inline >> +long rcuring_advance_lock(struct rcuring *rr, long locked_complete) >> +{ >> + long new_locked_complete = locked_complete; >> + >> + if (locked_complete != rr->curr_complete) { >> + /* >> + * Lock the new complete at first, and then release >> + * the old one. It prevents errors when NMI/SMI occurs >> + * after rcuring_unlock() - we still lock it. >> + */ >> + new_locked_complete = rcuring_lock(rr); >> + rcuring_unlock(rr, locked_complete); >> + } >> + >> + return new_locked_complete; >> +} >> + >> +static inline long rcuring_get_done_complete(struct rcuring *rr) >> +{ >> + return ACCESS_ONCE(rr->done_complete); >> +} >> + >> +static inline long rcuring_get_curr_complete(struct rcuring *rr) >> +{ >> + return ACCESS_ONCE(rr->curr_complete); >> +} >> + >> +struct rcu_batch { >> + struct rcu_head *list, **tail; >> +}; >> + >> +static void rcu_batch_merge(struct rcu_batch *to, struct rcu_batch *from) >> +{ >> + if (from->list != NULL) { >> + *to->tail = from->list; >> + to->tail = from->tail; >> + >> + from->list = NULL; >> + from->tail = &from->list; >> + } >> +} >> + >> +static void rcu_batch_add(struct rcu_batch *batch, struct rcu_head *new) >> +{ >> + *batch->tail = new; >> + batch->tail = &new->next; >> +} >> + >> +struct rcu_data { >> + long locked_complete; >> + >> + /* >> + * callbacks are in batches (done_complete, curr_complete] >> + * curr_complete - done_complete >= 0 >> + * curr_complete - done_complete <= RCURING_IDX_MAX >> + * domain's curr_complete - curr_complete >= 0 >> + * domain's done_complete - done_complete >= 0 >> + * >> + * curr_complete == done_complete just means @batch is empty. >> + * we have at least one callback in batch[RCURING_IDX(done_complete)] >> + * if curr_complete != done_complete >> + */ >> + long curr_complete; >> + long done_complete; >> + >> + struct rcu_batch batch[RCURING_IDX_MAX]; >> + struct rcu_batch done_batch; >> + >> + unsigned int qlen; >> + unsigned long jiffies; >> + unsigned long stall_jiffies; >> +}; >> + >> +struct rcu_domain { >> + struct rcuring rcuring; >> + const char *domain_name; >> + struct rcu_data __percpu *rcu_data; >> + >> + spinlock_t lock; /* this lock is for fqs or other misc things */ >> + long fqs_complete; >> + void (*force_quiescent_state)(struct rcu_domain *domain, >> + long fqs_complete); >> +}; >> + >> +#define EXPEDITED_GP_RESERVED_RECOMMEND (RCURING_IDX_SHIFT - 1) >> + >> +static inline long gp_reserved(struct rcu_domain *domain) >> +{ >> + return EXPEDITED_GP_RESERVED_RECOMMEND; >> +} >> + >> +static inline void __init rcu_data_init(struct rcu_data *rdp) >> +{ >> + int idx; >> + >> + for (idx = 0; idx < RCURING_IDX_MAX; idx++) >> + rdp->batch[idx].tail = &rdp->batch[idx].list; >> + >> + rdp->done_batch.tail = &rdp->done_batch.list; >> +} >> + >> +static void do_advance_callbacks(struct rcu_data *rdp, long done_complete) >> +{ >> + int idx; >> + >> + while (rdp->done_complete != done_complete) { >> + rdp->done_complete++; >> + idx = RCURING_IDX(rdp->done_complete); >> + rcu_batch_merge(&rdp->done_batch, rdp->batch + idx); >> + } >> +} >> + >> +/* Is value in the range (@left_open, @right_close] */ >> +static inline bool in_range(long left_open, long right_close, long value) >> +{ >> + return left_open - value < 0 && value - right_close <= 0; >> +} >> + >> +static void advance_callbacks(struct rcu_data *rdp, long done_complete, >> + long curr_complete) >> +{ >> + if (in_range(done_complete, curr_complete, rdp->curr_complete)) { >> + do_advance_callbacks(rdp, done_complete); >> + } else { >> + do_advance_callbacks(rdp, rdp->curr_complete); >> + rdp->curr_complete = done_complete; >> + rdp->done_complete = done_complete; >> + } >> +} >> + >> +static void __force_quiescent_state(struct rcu_domain *domain, >> + long fqs_complete) >> +{ >> + int cpu; >> + long fqs_complete_old; >> + long curr_complete = rcuring_get_curr_complete(&domain->rcuring); >> + long done_complete = rcuring_get_done_complete(&domain->rcuring); >> + >> + spin_lock(&domain->lock); >> + >> + fqs_complete_old = domain->fqs_complete; >> + if (!in_range(done_complete, curr_complete, fqs_complete_old)) >> + fqs_complete_old = done_complete; >> + >> + if (fqs_complete_old - fqs_complete >= 0) { >> + spin_unlock(&domain->lock); >> + return; >> + } >> + >> + domain->fqs_complete = fqs_complete; >> + spin_unlock(&domain->lock); >> + >> + for_each_online_cpu(cpu) { >> + struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu); >> + long locked_complete = ACCESS_ONCE(rdp->locked_complete); >> + >> + if (!in_range(fqs_complete_old, fqs_complete, locked_complete)) >> + continue; > > Is preemption disabled here? Yes, it called with local irq disabled. > >> + >> + if (cpu == smp_processor_id()) >> + set_tsk_need_resched(current); >> + else >> + smp_send_reschedule(cpu); >> + } >> +} >> + >> +static void force_quiescent_state(struct rcu_domain *domain, long fqs_complete) >> +{ >> + domain->force_quiescent_state(domain, fqs_complete); >> +} >> + >> +static >> +long prepare_for_new_callback(struct rcu_domain *domain, struct rcu_data *rdp) >> +{ >> + long curr_complete, done_complete; >> + struct rcuring *rr = &domain->rcuring; >> + >> + smp_mb(); >> + curr_complete = rcuring_get_curr_complete(rr); >> + done_complete = rcuring_get_done_complete(rr); >> + >> + advance_callbacks(rdp, done_complete, curr_complete); >> + rdp->curr_complete = curr_complete; >> + >> + if (!rdp->qlen) { >> + rdp->jiffies = jiffies; >> + rdp->stall_jiffies = jiffies; >> + } >> + >> + return curr_complete; >> +} >> + >> +static long __call_rcu(struct rcu_domain *domain, struct rcu_head *head, >> + void (*func)(struct rcu_head *)) >> +{ >> + struct rcu_data *rdp; >> + long curr_complete; >> + >> + head->next = NULL; >> + head->func = func; >> + >> + rdp = this_cpu_ptr(domain->rcu_data); >> + curr_complete = prepare_for_new_callback(domain, rdp); >> + rcu_batch_add(rdp->batch + RCURING_IDX(curr_complete), head); >> + rdp->qlen++; >> + >> + return curr_complete; >> +} >> + >> +static void print_rcuring_stall(struct rcu_domain *domain) >> +{ >> + struct rcuring *rr = &domain->rcuring; >> + unsigned long curr_complete = rcuring_get_curr_complete(rr); >> + unsigned long done_complete = rcuring_get_done_complete(rr); >> + int cpu; >> + >> + printk(KERN_ERR "RCU is stall, cpu=%d, domain_name=%s\n", smp_processor_id(), >> + domain->domain_name); >> + >> + printk(KERN_ERR "domain's complete(done/curr)=%ld/%ld\n", >> + curr_complete, done_complete); >> + printk(KERN_ERR "curr_ctr=%d, wait_ctr=%d\n", >> + rcuring_ctr_read(rr, curr_complete), >> + rcuring_ctr_read(rr, done_complete + 1)); >> + >> + for_each_online_cpu(cpu) { >> + struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu); >> + printk(KERN_ERR "cpu=%d, qlen=%d, complete(locked/dome/curr/)=" >> + "%ld/%ld/%ld\n", cpu, rdp->qlen, >> + rdp->locked_complete, rdp->done_complete, >> + rdp->curr_complete); >> + } >> +} >> + >> +static void __rcu_check_callbacks(struct rcu_domain *domain, >> + struct rcu_data *rdp, int in_rcu_softirq) >> +{ >> + long curr_complete, done_complete; >> + struct rcuring *rr = &domain->rcuring; >> + >> + if (!rdp->qlen) >> + return; >> + >> + rcuring_advance_done_complete(rr); >> + >> + curr_complete = rcuring_get_curr_complete(rr); >> + done_complete = ACCESS_ONCE(rr->done_complete); >> + advance_callbacks(rdp, done_complete, curr_complete); >> + >> + if (rdp->curr_complete == curr_complete >> + && rdp->batch[RCURING_IDX(rdp->curr_complete)].list) { >> + long max_gp_allow = RCURING_IDX_MAX - gp_reserved(domain); >> + >> + if (curr_complete - done_complete <= max_gp_allow) >> + rcuring_advance_complete(rr, curr_complete + 1); >> + } >> + >> + if (rdp->done_batch.list) { >> + if (!in_rcu_softirq) >> + raise_softirq(RCU_SOFTIRQ); >> + } else { >> + if (jiffies - rdp->jiffies > 10) { >> + force_quiescent_state(domain, rdp->curr_complete); >> + rdp->jiffies = jiffies; >> + } >> + if (jiffies - rdp->stall_jiffies > 2 * HZ) { >> + print_rcuring_stall(domain); >> + rdp->stall_jiffies = jiffies; >> + } >> + } >> +} >> + >> +static void rcu_do_batch(struct rcu_domain *domain, struct rcu_data *rdp) >> +{ >> + int count = 0; >> + struct rcu_head *list, *next; >> + >> + list = rdp->done_batch.list; >> + >> + if (list) { >> + rdp->done_batch.list = NULL; >> + rdp->done_batch.tail = &rdp->done_batch.list; >> + >> + local_irq_enable(); >> + >> + smp_mb(); >> + >> + while (list) { >> + next = list->next; >> + prefetch(next); >> + list->func(list); >> + count++; >> + list = next; >> + } >> + local_irq_disable(); >> + >> + rdp->qlen -= count; >> + rdp->jiffies = jiffies; >> + } >> +} >> + >> +static int __rcu_needs_cpu(struct rcu_data *rdp) >> +{ >> + return !!rdp->qlen; >> +} >> + >> +static inline >> +void __rcu_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp) >> +{ >> + rdp->locked_complete = rcuring_advance_lock(&domain->rcuring, >> + rdp->locked_complete); >> +} >> + >> +static inline void rcu_preempt_save_complete(struct rcu_task_preempt *rcu_task, >> + long locked_complete) >> +{ >> + unsigned int new_flags; >> + >> + new_flags = RCURING_PREEMPT_FLAGS | RCURING_IDX(locked_complete); >> + ACCESS_ONCE(rcu_task->flags) = new_flags; >> +} >> + >> +/* >> + * Every cpu hold a lock_complete. But for preempt rcu, any task may also >> + * hold a lock_complete. This function increases lock_complete for the current >> + * cpu and check(dup_lock_and_save or release or advance) the lock_complete of >> + * the current task. >> + */ >> +static inline >> +void __rcu_preempt_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp, >> + struct rcu_task_preempt *rcu_task) >> +{ >> + long locked_complete = rdp->locked_complete; >> + unsigned int flags = ACCESS_ONCE(rcu_task->flags); >> + >> + if (!(flags & RCURING_PREEMPT_SAVED)) { >> + BUG_ON(flags & RCURING_PREEMPT_QS); >> + if (rcu_task->nesting) { >> + /* dup_lock_and_save current task's lock_complete */ >> + rcuring_dup_lock(&domain->rcuring, locked_complete); >> + rcu_preempt_save_complete(rcu_task, locked_complete); >> + } >> + } else if (!(rcu_task->nesting)) { >> + /* release current task's lock_complete */ >> + rcuring_unlock(&domain->rcuring, (long)flags); >> + ACCESS_ONCE(rcu_task->flags) = 0; >> + } else if (!(flags & RCURING_PREEMPT_QS)) { >> + /* advance_and_save current task's lock_complete */ >> + if (RCURING_IDX(locked_complete) != RCURING_IDX((long)flags)) { >> + rcuring_unlock(&domain->rcuring, (long)flags); >> + rcuring_dup_lock(&domain->rcuring, locked_complete); >> + } >> + rcu_preempt_save_complete(rcu_task, locked_complete); >> + } >> + >> + /* increases lock_complete for the current cpu */ >> + rdp->locked_complete = rcuring_advance_lock(&domain->rcuring, >> + locked_complete); >> +} > > When we need to boost the priority of preempted readers, how do we tell > who they are? And struct list_head in struct rcu_task_preempt, and a ring struct list_head in rdp, and use percpu locks or scheduler's runqueues's locks protect all these struct list_head. > >> + >> +static void __synchronize_rcu(struct rcu_domain *domain, int expedited) >> +{ >> + struct rcu_synchronize rcu; >> + long complete; >> + >> + init_completion(&rcu.completion); >> + >> + local_irq_disable(); >> + complete = __call_rcu(domain, &rcu.head, wakeme_after_rcu); >> + >> + if (expedited) { >> + /* >> + * Fore a new gp to be started immediately(can use reserved gp). >> + * But if all gp are used, it will fail to start new gp, >> + * and we have to wait until some new gps available. >> + */ >> + rcuring_advance_complete_force(&domain->rcuring, >> + complete + 1); > > It looks to me like rcuring_advance_complete_force() can just fail > silently. How does this work? If it fails, We have no GPs lefts, synchronize_rcu_expedited() will becomes synchronize_rcu(). the next fqs will expedite it only a little. It's OK, we have 3 reserved GPs by default, It means we allows 3 synchronize_rcu_expedited()s concurrent at least and it's enough. > >> + >> + /* Fore qs and expedite the new gp */ >> + force_quiescent_state(domain, complete); >> + } >> + local_irq_enable(); >> + >> + wait_for_completion(&rcu.completion); >> +} >> + >> +static inline long __rcu_batches_completed(struct rcu_domain *domain) >> +{ >> + return rcuring_get_done_complete(&domain->rcuring); >> +} >> + >> +#ifdef CONFIG_RCURING_BH >> +#define gen_code_for_bh(gen_code, args...) gen_code(bh, ##args) > > Where is gen_code() defined? > >> +#else >> +#define gen_code_for_bh(gen_code, args...) >> +#endif >> +#define gen_code_for_sched(gen_code, args...) gen_code(sched, ##args) >> +#ifdef CONFIG_RCURING_PREEMPT >> +#define gen_code_for_preempt(gen_code, args...) gen_code(preempt, ##args) >> +#else >> +#define gen_code_for_preempt(gen_code, args...) >> +#endif >> + >> +#define GEN_CODES(gen_code, args...) \ >> + gen_code_for_bh(gen_code, ##args) \ >> + gen_code_for_sched(gen_code, ##args) \ >> + gen_code_for_preempt(gen_code, ##args) \ >> + >> +#define gen_basic_code(domain) \ >> + \ >> +static void force_quiescent_state_##domain(struct rcu_domain *domain, \ >> + long fqs_complete); \ >> + \ >> +static DEFINE_PER_CPU(struct rcu_data, rcu_data_##domain); \ >> + \ >> +struct rcu_domain rcu_domain_##domain = \ >> +{ \ >> + .rcuring = RCURING_INIT(rcu_domain_##domain.rcuring), \ >> + .domain_name = #domain, \ >> + .rcu_data = &rcu_data_##domain, \ >> + .lock = __SPIN_LOCK_UNLOCKED(rcu_domain_##domain.lock), \ >> + .force_quiescent_state = force_quiescent_state_##domain, \ >> +}; \ >> + \ >> +void call_rcu_##domain(struct rcu_head *head, \ >> + void (*func)(struct rcu_head *)) \ >> +{ \ >> + unsigned long flags; \ >> + \ >> + local_irq_save(flags); \ >> + __call_rcu(&rcu_domain_##domain, head, func); \ >> + local_irq_restore(flags); \ >> +} \ >> +EXPORT_SYMBOL_GPL(call_rcu_##domain); \ >> + \ >> +void synchronize_rcu_##domain(void) \ >> +{ \ >> + __synchronize_rcu(&rcu_domain_##domain, 0); \ >> +} \ >> +EXPORT_SYMBOL_GPL(synchronize_rcu_##domain); \ >> + \ >> +void synchronize_rcu_##domain##_expedited(void) \ >> +{ \ >> + __synchronize_rcu(&rcu_domain_##domain, 1); \ >> +} \ >> +EXPORT_SYMBOL_GPL(synchronize_rcu_##domain##_expedited); \ >> + \ >> +long rcu_batches_completed_##domain(void) \ >> +{ \ >> + return __rcu_batches_completed(&rcu_domain_##domain); \ >> +} \ >> +EXPORT_SYMBOL_GPL(rcu_batches_completed_##domain); \ >> + \ >> +void rcu_##domain##_force_quiescent_state(void) \ >> +{ \ >> + force_quiescent_state(&rcu_domain_##domain, 0); \ >> +} \ >> +EXPORT_SYMBOL_GPL(rcu_##domain##_force_quiescent_state); > > I used to use a single macro for the synchronize_rcu*() APIs, but was > talked into separate functions. Maybe some of the more ornate recent > macros have shifted people away from duplicate code in functions, so > might be time to try again. ;-) Too much dumplicated code, I have to do this :-) > >> +GEN_CODES(gen_basic_code) >> + >> +static DEFINE_PER_CPU(int, kernel_count); >> + >> +#define LOCK_COMPLETE(domain) \ >> + long complete_##domain = rcuring_lock(&rcu_domain_##domain.rcuring); >> +#define SAVE_COMPLETE(domain) \ >> + per_cpu(rcu_data_##domain, cpu).locked_complete = complete_##domain; >> +#define LOAD_COMPLETE(domain) \ >> + int complete_##domain = per_cpu(rcu_data_##domain, cpu).locked_complete; >> +#define RELEASE_COMPLETE(domain) \ >> + rcuring_unlock(&rcu_domain_##domain.rcuring, complete_##domain); >> + >> +static void __rcu_kernel_enter_outmost(int cpu) >> +{ >> + GEN_CODES(LOCK_COMPLETE) >> + >> + barrier(); >> + per_cpu(kernel_count, cpu) = 1; >> + barrier(); >> + >> + GEN_CODES(SAVE_COMPLETE) >> +} >> + >> +static void __rcu_kernel_exit_outmost(int cpu) >> +{ >> + GEN_CODES(LOAD_COMPLETE) >> + >> + barrier(); >> + per_cpu(kernel_count, cpu) = 0; >> + barrier(); >> + >> + GEN_CODES(RELEASE_COMPLETE) >> +} >> + >> +#ifdef CONFIG_RCURING_BH >> +static void force_quiescent_state_bh(struct rcu_domain *domain, >> + long fqs_complete) >> +{ >> + __force_quiescent_state(domain, fqs_complete); >> +} >> + >> +static inline void rcu_bh_qsctr_inc_irqoff(int cpu) >> +{ >> + __rcu_qsctr_inc(&rcu_domain_bh, &per_cpu(rcu_data_bh, cpu)); >> +} >> + >> +void rcu_bh_qs(int cpu) >> +{ >> + unsigned long flags; >> + >> + local_irq_save(flags); >> + rcu_bh_qsctr_inc_irqoff(cpu); >> + local_irq_restore(flags); >> +} >> + >> +#else /* CONFIG_RCURING_BH */ >> +static inline void rcu_bh_qsctr_inc_irqoff(int cpu) { (void)(cpu); } >> +#endif /* CONFIG_RCURING_BH */ >> + >> +static void force_quiescent_state_sched(struct rcu_domain *domain, >> + long fqs_complete) >> +{ >> + __force_quiescent_state(domain, fqs_complete); >> +} >> + >> +static inline void rcu_sched_qsctr_inc_irqoff(int cpu) >> +{ >> + __rcu_qsctr_inc(&rcu_domain_sched, &per_cpu(rcu_data_sched, cpu)); >> +} >> + >> +#ifdef CONFIG_RCURING_PREEMPT >> +static void force_quiescent_state_preempt(struct rcu_domain *domain, >> + long fqs_complete) >> +{ >> + /* To Be Implemented */ >> +} >> + >> +static inline void rcu_preempt_qsctr_inc_irqoff(int cpu) >> +{ >> + __rcu_preempt_qsctr_inc(&rcu_domain_preempt, >> + &per_cpu(rcu_data_preempt, cpu), >> + ¤t->rcu_task_preempt); >> +} >> + >> +#else /* CONFIG_RCURING_PREEMPT */ >> +static inline void rcu_preempt_qsctr_inc_irqoff(int cpu) >> +{ >> + (void)(cpu); >> + (void)(__rcu_preempt_qsctr_inc); >> +} >> +#endif /* CONFIG_RCURING_PREEMPT */ >> + >> +#define gen_needs_cpu(domain, cpu) \ >> + if (__rcu_needs_cpu(&per_cpu(rcu_data_##domain, cpu))) \ >> + return 1; >> +int rcu_needs_cpu(int cpu) >> +{ >> + GEN_CODES(gen_needs_cpu, cpu) >> + return 0; >> +} >> + >> +#define call_func(domain, func, args...) \ >> + func(&rcu_domain_##domain, &__get_cpu_var(rcu_data_##domain), ##args); >> + >> +/* >> + * Note a context switch. This is a quiescent state for RCU-sched, >> + * and requires special handling for preemptible RCU. >> + */ >> +void rcu_note_context_switch(int cpu) >> +{ >> + unsigned long flags; >> + >> +#ifdef CONFIG_HOTPLUG_CPU >> + /* >> + * The stoper thread need to schedule() to the idle thread >> + * after CPU_DYING. >> + */ >> + if (unlikely(!per_cpu(kernel_count, cpu))) { >> + BUG_ON(cpu_online(cpu)); >> + return; >> + } >> +#endif >> + >> + local_irq_save(flags); >> + rcu_bh_qsctr_inc_irqoff(cpu); >> + rcu_sched_qsctr_inc_irqoff(cpu); >> + rcu_preempt_qsctr_inc_irqoff(cpu); >> + local_irq_restore(flags); >> +} >> + >> +static int rcu_cpu_idle(int cpu) >> +{ >> + return idle_cpu(cpu) && rcu_scheduler_active && >> + !in_softirq() && hardirq_count() <= (1 << HARDIRQ_SHIFT); >> +} >> + >> +void rcu_check_callbacks(int cpu, int user) >> +{ >> + unsigned long flags; >> + >> + local_irq_save(flags); >> + >> + if (user || rcu_cpu_idle(cpu)) { >> + rcu_bh_qsctr_inc_irqoff(cpu); >> + rcu_sched_qsctr_inc_irqoff(cpu); >> + rcu_preempt_qsctr_inc_irqoff(cpu); >> + } else if (!in_softirq()) >> + rcu_bh_qsctr_inc_irqoff(cpu); >> + >> + GEN_CODES(call_func, __rcu_check_callbacks, 0) >> + local_irq_restore(flags); >> +} >> + >> +static void rcu_process_callbacks(struct softirq_action *unused) >> +{ >> + local_irq_disable(); >> + GEN_CODES(call_func, __rcu_check_callbacks, 1) >> + GEN_CODES(call_func, rcu_do_batch) >> + local_irq_enable(); >> +} >> + >> +static void __cpuinit __rcu_offline_cpu(struct rcu_domain *domain, >> + struct rcu_data *off_rdp, struct rcu_data *rdp) >> +{ >> + if (off_rdp->qlen > 0) { >> + unsigned long flags; >> + long curr_complete; >> + >> + /* move all callbacks in @off_rdp to @off_rdp->done_batch */ >> + do_advance_callbacks(off_rdp, off_rdp->curr_complete); >> + >> + /* move all callbacks in @off_rdp to this cpu(@rdp) */ >> + local_irq_save(flags); >> + curr_complete = prepare_for_new_callback(domain, rdp); >> + rcu_batch_merge(rdp->batch + RCURING_IDX(curr_complete), >> + &off_rdp->done_batch); >> + rdp->qlen += off_rdp->qlen; >> + local_irq_restore(flags); >> + >> + off_rdp->qlen = 0; >> + } >> +} >> + >> +#define gen_offline(domain, cpu, recieve_cpu) \ >> + __rcu_offline_cpu(&rcu_domain_##domain, \ >> + &per_cpu(rcu_data_##domain, cpu), \ >> + &per_cpu(rcu_data_##domain, recieve_cpu)); >> + >> +static int __cpuinit rcu_cpu_notify(struct notifier_block *self, >> + unsigned long action, void *hcpu) >> +{ >> + int cpu = (long)hcpu; >> + int recieve_cpu; >> + >> + (void)(recieve_cpu); >> + (void)(__rcu_offline_cpu); >> + (void)(__rcu_kernel_exit_outmost); >> + >> + switch (action) { >> +#ifdef CONFIG_HOTPLUG_CPU >> + case CPU_DYING: >> + case CPU_DYING_FROZEN: >> + /* >> + * the machine is stopped now, we can access to >> + * any other cpu data. >> + * */ >> + recieve_cpu = cpumask_any_but(cpu_online_mask, cpu); >> + GEN_CODES(gen_offline, cpu, recieve_cpu) >> + __rcu_kernel_exit_outmost(cpu); >> + break; >> +#endif >> + case CPU_STARTING: >> + case CPU_STARTING_FROZEN: >> + __rcu_kernel_enter_outmost(cpu); >> + default: >> + break; >> + } >> + return NOTIFY_OK; >> +} >> + >> +#define rcu_init_domain_data(domain, cpu) \ >> + for_each_possible_cpu(cpu) \ >> + rcu_data_init(&per_cpu(rcu_data_##domain, cpu)); >> + >> +void __init rcu_init(void) >> +{ >> + int cpu; >> + >> + GEN_CODES(rcu_init_domain_data, cpu) > > This one actually consumes more lines than would writing out the > calls explicitly. ;-) like following? more lines and more "#ifdef" GEN_CODES reduces dumplicated code and reduces "#ifdef"s. #ifdef CONFIG_RCURING_BH for_each_possible_cpu(cpu) rcu_data_init(&per_cpu(rcu_data_bh, cpu)); #endif for_each_possible_cpu(cpu) rcu_data_init(&per_cpu(rcu_data_sched, cpu)); #ifdef CONFIG_RCURING_PREEMPT for_each_possible_cpu(cpu) rcu_data_init(&per_cpu(rcu_data_preempt, cpu)); #endif > >> + >> + __rcu_kernel_enter_outmost(raw_smp_processor_id()); >> + open_softirq(RCU_SOFTIRQ, rcu_process_callbacks); >> + cpu_notifier(rcu_cpu_notify, 0); >> +} >> + >> +int rcu_scheduler_active __read_mostly; >> +EXPORT_SYMBOL_GPL(rcu_scheduler_active); >> + >> +/* >> + * This function is invoked towards the end of the scheduler's initialization >> + * process. Before this is called, the idle task might contain >> + * RCU read-side critical sections (during which time, this idle >> + * task is booting the system). After this function is called, the >> + * idle tasks are prohibited from containing RCU read-side critical >> + * sections. This function also enables RCU lockdep checking. >> + */ >> +void rcu_scheduler_starting(void) >> +{ >> + rcu_scheduler_active = 1; >> +} >> + >> +#ifdef CONFIG_NO_HZ >> + >> +void rcu_kernel_enter(void) >> +{ >> + unsigned long flags; >> + >> + raw_local_irq_save(flags); >> + if (__get_cpu_var(kernel_count) == 0) >> + __rcu_kernel_enter_outmost(raw_smp_processor_id()); >> + else >> + __get_cpu_var(kernel_count)++; >> + raw_local_irq_restore(flags); >> +} >> + >> +void rcu_kernel_exit(void) >> +{ >> + unsigned long flags; >> + >> + raw_local_irq_save(flags); >> + if (__get_cpu_var(kernel_count) == 1) >> + __rcu_kernel_exit_outmost(raw_smp_processor_id()); >> + else >> + __get_cpu_var(kernel_count)--; >> + raw_local_irq_restore(flags); >> +} >> + >> +void rcu_enter_nohz(void) >> +{ >> + unsigned long flags; >> + >> + raw_local_irq_save(flags); >> + __rcu_kernel_exit_outmost(raw_smp_processor_id()); >> + raw_local_irq_restore(flags); >> +} >> + >> +void rcu_exit_nohz(void) >> +{ >> + unsigned long flags; >> + >> + raw_local_irq_save(flags); >> + __rcu_kernel_enter_outmost(raw_smp_processor_id()); >> + raw_local_irq_restore(flags); >> +} >> +#endif /* CONFIG_NO_HZ */ >> + >> +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head); >> +static struct completion rcu_barrier_completion; >> +static struct kref rcu_barrier_wait_ref; >> +static DEFINE_MUTEX(rcu_barrier_mutex); >> + >> +static void rcu_barrier_release_ref(struct kref *notused) >> +{ >> + complete(&rcu_barrier_completion); >> +} >> + >> +static void rcu_barrier_callback(struct rcu_head *notused) >> +{ >> + kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref); >> +} >> + >> +static void rcu_barrier_func(void *data) >> +{ >> + struct rcu_domain *rcu_domain = data; >> + >> + kref_get(&rcu_barrier_wait_ref); >> + __call_rcu(rcu_domain, &__get_cpu_var(rcu_barrier_head), >> + rcu_barrier_callback); >> +} >> + >> +static void __rcu_barrier(struct rcu_domain *rcu_domain) >> +{ >> + mutex_lock(&rcu_barrier_mutex); >> + >> + init_completion(&rcu_barrier_completion); >> + kref_init(&rcu_barrier_wait_ref); >> + >> + /* queue barrier rcu_heads for every cpu */ >> + on_each_cpu(rcu_barrier_func, rcu_domain, 1); >> + >> + kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref); >> + wait_for_completion(&rcu_barrier_completion); >> + >> + mutex_unlock(&rcu_barrier_mutex); >> +} >> + >> +#define gen_rcu_barrier(domain) \ >> +void rcu_barrier_##domain(void) \ >> +{ \ >> + __rcu_barrier(&rcu_domain_##domain); \ >> +} \ >> +EXPORT_SYMBOL_GPL(rcu_barrier_##domain); >> + >> +GEN_CODES(gen_rcu_barrier) >> + >> diff --git a/kernel/sched.c b/kernel/sched.c >> index 09b574e..967f25a 100644 >> --- a/kernel/sched.c >> +++ b/kernel/sched.c >> @@ -9113,6 +9113,7 @@ struct cgroup_subsys cpuacct_subsys = { >> }; >> #endif /* CONFIG_CGROUP_CPUACCT */ >> >> +#ifndef CONFIG_RCURING >> #ifndef CONFIG_SMP >> >> void synchronize_sched_expedited(void) >> @@ -9182,3 +9183,4 @@ void synchronize_sched_expedited(void) >> EXPORT_SYMBOL_GPL(synchronize_sched_expedited); >> >> #endif /* #else #ifndef CONFIG_SMP */ >> +#endif /* CONFIG_RCURING */ >> diff --git a/kernel/time/tick-sched.c b/kernel/time/tick-sched.c >> index 3e216e0..0eba561 100644 >> --- a/kernel/time/tick-sched.c >> +++ b/kernel/time/tick-sched.c >> @@ -269,6 +269,10 @@ void tick_nohz_stop_sched_tick(int inidle) >> cpu = smp_processor_id(); >> ts = &per_cpu(tick_cpu_sched, cpu); >> >> + /* Don't enter nohz when cpu is offlining, it is going to die */ >> + if (cpu_is_offline(cpu)) >> + goto end; >> + >> /* >> * Call to tick_nohz_start_idle stops the last_update_time from being >> * updated. Thus, it must not be called in the event we are called from >> > > ^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [PATCH, RFC] RCU: New scalable, multi-GP and preemptible RCU implementation 2010-08-31 3:03 ` Lai Jiangshan @ 2010-09-08 19:53 ` Paul E. McKenney 2010-09-13 12:27 ` Lai Jiangshan 0 siblings, 1 reply; 8+ messages in thread From: Paul E. McKenney @ 2010-09-08 19:53 UTC (permalink / raw) To: Lai Jiangshan Cc: Linus Torvalds, Ingo Molnar, Andrew Morton, David Miller, Manfred Spraul, Mathieu Desnoyers, Steven Rostedt, Thomas Gleixner, Peter Zijlstra, LKML, dipankar On Tue, Aug 31, 2010 at 11:03:11AM +0800, Lai Jiangshan wrote: > On 08/31/2010 07:57 AM, Paul E. McKenney wrote: > > On Mon, Aug 30, 2010 at 05:31:44PM +0800, Lai Jiangshan wrote: > >> > >> 0) contents > >> 1) overview > >> 2) scalable > >> 3) multi-GP > >> 4) integrated expedited synchronize_rcu > >> 5) preemptible > >> 6) top-down implies > >> 7) speech > >> 8) core rcuring > > > > Interesting! I can't claim to have dug through every detail, but figured > > I should ask a few questions first. > > > > Thanx, Paul > > > > Of course, the first question is whether it passes rcutorture, and if > > so on what hardware configuration. > > Only in x86(32bit and 64bit) system. > Did you patch it in power and test it? It' need more test for different archs, > but I have no other arch. > > What is "hardware configuration"? Mainly the number of CPUs. The socket/core/thread counts could also be helpful. > > At first glance, it looks to live somewhere between Manfred Spraul's > > counter-based hierarchical RCU and classic RCU. There are also some > > resemblances to K42's "generations" RCU-like mechanism. > > > >> 1) overview > >> This is a new RCU implementation: RCURING. It uses a ring atomic counters > >> to control the completion of GPs and a ring callbacks batches to process > >> the callbacks. > > > > The ring is a set of grace periods, correct? > > correct. A GP is also controled by a set ring counters in > (domain's done_complete, this GP's complete], this GP is only completed > until all these conters become zero. Understood. > I pull up a question of your here: > > For a given grace period, all quiescent states hit the same counter. > > Or am I missing something? > > No. > any rcu read site has a locked_complete, if a cpu hold a locked_complete, > the counter of this locked_complete will not become zero. > > Any quiescent states will release its previous rcu read site's locked_complete, > so different QS may hit different counter. > QS alway locks the newest GP(not started to waiting, domain's curr_complete) > for next rcu read site and get the new locked_complete. > > What I said seems still indistinct, more questions are wellcome, questions > also help for documents. Thank you very much. OK, here is my assessment thus far. First my understanding of how this all works: o Maintain a circular array of counters. Maintain a parallel circular array of callback lists. o Each non-dyntick-idle CPU holds a reference to one element of the array. If Ring RCU is idle, it holds a reference to the element that will serve the next grace period. Similarly, if a given CPU has passed through a quiescent state for all existing grace periods, it will hold a reference to the element that will serve for the next grace period. o Upon encountering a quiescent state, we acquire a reference to the element that will serve for the next grace period, and then release the reference to the element that we were holding. The rcuring_lock() function acquires a reference to the element that will serve for the next grace period. Not yet clear what the atomic_inc_not_zero() is for. If the reference count is zero, what makes it non-zero? The rcuring_dup_lock() function acquires another reference to the element that the CPU already holds a reference to. This is used in preemptible RCU when a task is preempted in an RCU read-side critical section. The rcuring_unlock() function releases a reference. The rcuring_advance_lock() acquires a new reference, then releases the old one. This is used when a CPU passes through a quiescent state. o When a CPU enters dyntick-idle mode, it releases its reference. When it exits dyntick-idle mode (including irq and NMI), it acquires a referent to the element that will serve for the next grace period. o The __rcu_check_callbacks() function checks to see if the oldest grace period is now reference-free, and, if so, rcuring_advance_done_complete() retires the old grace periods. The advance_callbacks() then moves any affected callbacks to be invoked. o If RCU callbacks are not invoked quickly enough (10 jiffies), force_quiescent_state() sends IPIs to CPUs needing help reaching a quiescent state. So there were some things that struck me as being really good with your approach, most of which I intend to pull into the current RCU implementations: o Offsets into task_struct, allowing inlining of rcu_read_lock() and the fastpath of rcu_read_unlock(). o Faster grace periods, at least when the system is busy. o Placement of rcu_copy_process() and exit_rcu() into include/linux/ringrcu.h. Not clear that this maps nicely to tree/tiny implementations, though. o Defining __rcu_barrier() in terms of __call_rcu() instead of the various call_rcu*() variants. Ditto for __synchronize_rcu(). There are some things that I am unsure of, perhaps because I am still misunderstanting something: o The per-CPU reference-counting scheme can gain much of the benefit that TREE_RCU gains by handling large numbers of updates with a single grace period. However, CPUs that pass through quiescent states quickly, for example, by frequently interrupting out of dyntick-idle state, can incur lots of cache misses cycling through lots of RING RCU array elements. o Aggressive code shrinkage through large cpp macros. You seem to share my ambivalence given that you use it in only some of the situations where it might be applied. ;-) The savings of lines of code does vary -- many definitions can be placed under a single #ifdef, after all. o Use of kref rather than a simple counter. Saves a line or two, adds a function. o __rcu_check_callbacks() won't force a grace period if there are no callbacks waiting to be invoked. On the one hand, one can argue that forcing a grace period is useless in that case, but on the other hand, there is usually more than one CPU. This is probably a consequence of the force_quiescent_state() time being based on callback residence time rather than on grace-period duration -- only CPUs with callbacks can possibly measure a meaningful residence time. o NMI handling? Note that raw_local_irq_save() does not block NMIs, so need to analyze recursion into rcu_kernel_enter() and rcu_kernel_exit(). Looks like it might be OK, but... Ironically enough, courtesy of the atomic instructions and cache misses that are so scary in these functions. ;-) o Eliminating rcu_pending() might be a good thing, though perhaps simplifying it (making it less exact) might be the right thing to do in TREE_RCU, particularly given that priority boosting turns RCU_SOFTIRQ into a kthread. Here are some things that could be problematic about Ring RCU. Some of them are fixable, others might or might not be: o Potentially high memory contention on a given ring: worst case is quite bad on large SMP systems. On the other hand, best case is really good, given that each CPU could get its own rcuring counter. Average case is likely to be modestly bad. The default value of 4 for RCURING_COUNTERS_SHIFT maps to 16 array elements, which could easily show worst case behavior from time to time on large systems. o Real-time latency could be problematic due to: o Scans across CPUs and across grace-period ring elements. o Worst-case memory contention. o High interrupt rates out of dyntick-idle state. o Potentially large memory consumption due to RCURING_IDX_MAX arrays in rcu_data and rcuring structures. The default of 16 elements should be OK. o Use of atomic instructions in dyntick path. (Not just one, but potentially a long string of them -- and implied cache misses. Understand that the expected case is a single atomic in rcuring_lock() -- but worst case is at least somewhat important.) Benefit is that you don't need to scan the CPUs to check for dyntick-idle CPUs -- force_quiescent_state() is then limited to resched IPIs. Unless of course the dyntick-idle CPUs remain in dyntick-idle state throughout, in which case they get IPIed. This situation limits dyntick-idle state to 10 jiffies, which would result in the energy-efficiency guys screaming bloody murder. The battery-powered embedded guys would not bother screaming, but could instead be expected to come after us with pitchforks. o Energy efficiency: see above paragraph. The remainder are shortcomings that should be easy to fix: o force_quiescent_state() for preemptible variant not implemented. (Waiting on RCU priority boosting.) o Constants in __rcu_check_callbacks() really need to be macroized. "10"? "2 * HZ"? o rcu_do_batch() needs to self-limit in multi-CPU configurations. Otherwise, latency is a problem. TINY_RCU gets away without this (for the time being, anyway), but TREE_RCU and CLASSIC_RCU before it need the blimit functionality. Otherwise the Linux Audio guys will scream. o Too much stuff directly called from scheduling-clock-interrupt context! Full scan of CPUs in __force_quiescent_state(), for example. Need to hand off to RCU_SOFTIRQ or a kthread. o print_rcuring_stall() does not dump stacks. So, what am I still missing about your approach? Thanx, Paul > >> In 2008, I think out the idea, and wrote the first 300lines code of rcuring.c, > >> but I'm too lazy to finish it > >> > >> 2) scalable > >> There is only 2 locks in every RCU domain(rcu_bh, rcu_sched, rcu_preempt), > >> one of them is for misc things, it is a infrequency lock, the other is > >> for advancing complete, it is frequency lock. But somebody else would > >> very probably do the same work for me if somebody else is holding this lock, > >> so we alway use spin_trylock() for this lock, so it is scalable. > >> > >> We don't need any lock for reporting quiescent state, we just need 2 > >> atomic operations for every RCU domain. > > > > If you have a single GP in flight, all CPUs need to do an atomic op on the > > same element of the RCU ring, correct? > > Yes, but it seldom have only a single GP in flight. We start GPs when need. > > > My first thought was that each CPU > > was writing the current grace-period number to its own variable, but that > > would not require an atomic operation. Of course, the downside of this > > latter approach is that some CPU would need to scan all CPUs' counters. > > > >> 3) multi-GP > >> All old rcu implementations have only one waiting GP. > >> > >> time -----ta-tb--------------------tc----------------td-------------> > >> GP -----|------GP1---------------|--------GP2------|-------------> > >> cpu#x ........|---------------------------------------|...............> > >> |->insert a callback |->handle callback > >> > >> the callback have to wait a partial GP1, and a full GP2, it may wait > >> near 2 GPs in worse case because it have to wait the rcu read sites witch > >> start at (tb, tc] which are needless to wait. In average, a callback > >> have to wait (1 + 1/(1+1))= 1.5GPs > >> > >> if we can have three simultaneous waiting GPs: > >> > >> GPs .....|------GP1---------------|.......................... > >> ...........|--------GP2-----------|.................... > >> ....................|---------GP3---------------|........ > >> cpu#x ........|-------------------------|..........................> > >> |->insert a callback |->handle callback > >> The callback is handled more quickly, In average, a callback have to wait > >> (1 + 1/(3+1))=1.25GPs > >> > >> if we can have X simultaneous waiting GPs, a callback have to wait > >> (1 + 1/(X+1)) GPs in average. The default X in this patch is 15 > >> (including the reserved GPs). > > > > Assuming that the start times of the GPs are evenly spaced, correct? > > No, We start GPs when need. > If a cpu has queued some callbacks, and the callbacks's > complete is equals to rcu domain's curr_complete, this cpu will > start a new GPs for these callbacks. > > > > >> 4) integrated expedited synchronize_rcu > >> expedited synchronize_rcu is implemented in rcuring.c, not in other files. > >> The whole of it is less than 50 lines of code. we just reserve several > >> GPs for expedited synchronize_rcu, when a expedited callback is inserted, > >> we start a new GP immediately(we have reserved GPs, this step will > >> very probably success), and then force this new GP to be completed quickly. > >> (thanks to multi-GP) > > > > I would be more convinced had you implemented force_quiescent_state() > > for preemptible RCU. ;-) > > > >> 5) preemptible > >> new simple preemptible code, rcu_read_lock/unlock() is simple and is inline function. > >> add new kernel-offset.c to avoid recursively header files inclusion. > >> > >> 6) top-down implies > >> irq and local_irq_disable() implies preempt_disable()/rcu_read_lock_sched(), > >> irq and local_irq_disable() implies rcu_read_lock(). (*) > >> preempt_disable()/rcu_read_lock_sched() implies rcu_read_lock(). (*) > >> *: In preemptible rcutree/rcutiny, it is false. > >> > >> 7) speech > >> I will give a speech about RCU and this RCU implementation > >> in Japan linuxcon 2010. Welcome to Japan Linuxcon 2010. > > > > Congratulations! > > > >> 8) core rcuring > >> (Comments and document is still been writing) > >> > >> all read_read_lock/unlock_domain() is nested in a coresponding > >> rcuring_lock/unlock(), and the algorithm handles a rcuring_lock/unlock() > >> ciritcal region as a full rcu read site ciritcal region. > >> > >> read site: > >> rcuring_lock() > >> lock a complete, and ensure this locked_complete - done_complete(1) > 0 > >> curr_complete(1) - locked_complete >= 0 > >> mb(1) > > > > This is really a compiler barrier? > > a really memory barrier. > these memory barriers are not in rcu_read_lock(), there in the code where > pass QS. > > > > >> > >> rcu_derefrence(ptr) > >> > >> rcuring_unlock() > >> mb(2) > > > > Ditto? > > > >> release its locked_complete. > >> > >> update site: > >> rcu_assign_pointer(ptr, new) > >> > >> (handle callback:call_rcu()+rcu_do_batch()) > >> (call_rcu():) > >> mb(3) > >> get the curr_complete(2) > >> set callback's complete as this curr_complete(2). > >> (we don't have any field to record the callback's complete, we just > >> emulate it by rdp->done_complete and the position of this callback > >> in rdp->batch.) > >> > >> time pass... until done_complete(2) - callback's complete > 0 > > > > I have to ask about counter overflow on 32-bit systems. > > The "complete" is "long" not "unsigned long" > > if callback's complete - rdp->curr_complete > 0, we also queue > this callback to done_batch. > > > > >> (rcu_do_batch():) > >> mb(4) > >> call the callback: (free old ptr, etc.) > >> > >> Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com> > >> --- > >> Kbuild | 75 ++- > >> include/linux/rcupdate.h | 6 > >> include/linux/rcuring.h | 195 +++++++++ > >> include/linux/sched.h | 6 > >> init/Kconfig | 32 + > >> kernel/Makefile | 1 > >> kernel/kernel-offsets.c | 20 > >> kernel/rcuring.c | 1002 +++++++++++++++++++++++++++++++++++++++++++++++ > >> kernel/sched.c | 2 > >> kernel/time/tick-sched.c | 4 > >> 10 files changed, 1324 insertions(+), 19 deletions(-) > >> diff --git a/Kbuild b/Kbuild > >> index e3737ad..bce37cb 100644 > >> --- a/Kbuild > >> +++ b/Kbuild > >> @@ -3,7 +3,19 @@ > >> # This file takes care of the following: > >> # 1) Generate bounds.h > >> # 2) Generate asm-offsets.h (may need bounds.h) > >> -# 3) Check for missing system calls > >> +# 3) Generate kernel-offsets.h > >> +# 4) Check for missing system calls > >> + > >> +# Default sed regexp - multiline due to syntax constraints > >> +define sed-y > >> + "/^->/{s:->#\(.*\):/* \1 */:; \ > >> + s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \ > >> + s:->::; p;}" > >> +endef > >> + > >> +quiet_cmd_offsets_cc_s_c = CC $(quiet_modtag) $@ > >> +cmd_offsets_cc_s_c = $(CC) -D__GENARATING_OFFSETS__ \ > > > > s/GENARATING/GENERATING/ just to be pedantic. ;-) > > > >> + $(c_flags) -fverbose-asm -S -o $@ $< > > > > Pulling out the offsets might have the advantage of allowing both > > rcu_read_lock() and rcu_read_unlock() to be inlined, but without pulling > > in include/linux/sched.h everywhere. I had been thinking in terms of > > moving the state from the task struct to the taskinfo struct, but > > this approach might be worth considering. > > > >> ##### > >> # 1) Generate bounds.h > >> @@ -43,22 +55,14 @@ $(obj)/$(bounds-file): kernel/bounds.s Kbuild > >> # 2) Generate asm-offsets.h > >> # > >> > >> -offsets-file := include/generated/asm-offsets.h > >> +asm-offsets-file := include/generated/asm-offsets.h > >> > >> -always += $(offsets-file) > >> -targets += $(offsets-file) > >> +always += $(asm-offsets-file) > >> +targets += $(asm-offsets-file) > >> targets += arch/$(SRCARCH)/kernel/asm-offsets.s > >> > >> - > >> -# Default sed regexp - multiline due to syntax constraints > >> -define sed-y > >> - "/^->/{s:->#\(.*\):/* \1 */:; \ > >> - s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \ > >> - s:->::; p;}" > >> -endef > >> - > >> -quiet_cmd_offsets = GEN $@ > >> -define cmd_offsets > >> +quiet_cmd_asm_offsets = GEN $@ > >> +define cmd_asm_offsets > >> (set -e; \ > >> echo "#ifndef __ASM_OFFSETS_H__"; \ > >> echo "#define __ASM_OFFSETS_H__"; \ > >> @@ -78,13 +82,48 @@ endef > >> arch/$(SRCARCH)/kernel/asm-offsets.s: arch/$(SRCARCH)/kernel/asm-offsets.c \ > >> $(obj)/$(bounds-file) FORCE > >> $(Q)mkdir -p $(dir $@) > >> - $(call if_changed_dep,cc_s_c) > >> + $(call if_changed_dep,offsets_cc_s_c) > >> + > >> +$(obj)/$(asm-offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild > >> + $(call cmd,asm_offsets) > >> + > >> +##### > >> +# 3) Generate kernel-offsets.h > >> +# > >> + > >> +kernel-offsets-file := include/generated/kernel-offsets.h > >> + > >> +always += $(kernel-offsets-file) > >> +targets += $(kernel-offsets-file) > >> +targets += kernel/kernel-offsets.s > >> + > >> +quiet_cmd_kernel_offsets = GEN $@ > >> +define cmd_kernel_offsets > >> + (set -e; \ > >> + echo "#ifndef __LINUX_KERNEL_OFFSETS_H__"; \ > >> + echo "#define __LINUX_KERNEL_OFFSETS_H__"; \ > >> + echo "/*"; \ > >> + echo " * DO NOT MODIFY."; \ > >> + echo " *"; \ > >> + echo " * This file was generated by Kbuild"; \ > >> + echo " *"; \ > >> + echo " */"; \ > >> + echo ""; \ > >> + sed -ne $(sed-y) $<; \ > >> + echo ""; \ > >> + echo "#endif" ) > $@ > >> +endef > >> + > >> +# We use internal kbuild rules to avoid the "is up to date" message from make > >> +kernel/kernel-offsets.s: kernel/kernel-offsets.c $(obj)/$(bounds-file) \ > >> + $(obj)/$(asm-offsets-file) FORCE > >> + $(call if_changed_dep,offsets_cc_s_c) > >> > >> -$(obj)/$(offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild > >> - $(call cmd,offsets) > >> +$(obj)/$(kernel-offsets-file): kernel/kernel-offsets.s Kbuild > >> + $(call cmd,kernel_offsets) > >> > >> ##### > >> -# 3) Check for missing system calls > >> +# 4) Check for missing system calls > >> # > >> > >> quiet_cmd_syscalls = CALL $< > >> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h > >> index 89414d6..42cd6bc 100644 > >> --- a/include/linux/rcupdate.h > >> +++ b/include/linux/rcupdate.h > >> @@ -124,6 +124,7 @@ extern void rcu_bh_qs(int cpu); > >> extern void rcu_check_callbacks(int cpu, int user); > >> struct notifier_block; > >> > >> +#ifndef CONFIG_RCURING > >> #ifdef CONFIG_NO_HZ > >> > >> extern void rcu_enter_nohz(void); > >> @@ -140,11 +141,14 @@ static inline void rcu_exit_nohz(void) > >> } > >> > >> #endif /* #else #ifdef CONFIG_NO_HZ */ > >> +#endif > >> > >> #if defined(CONFIG_TREE_RCU) || defined(CONFIG_TREE_PREEMPT_RCU) > >> #include <linux/rcutree.h> > >> #elif defined(CONFIG_TINY_RCU) || defined(CONFIG_TINY_PREEMPT_RCU) > >> #include <linux/rcutiny.h> > >> +#elif defined(CONFIG_RCURING) > >> +#include <linux/rcuring.h> > >> #else > >> #error "Unknown RCU implementation specified to kernel configuration" > >> #endif > >> @@ -686,6 +690,7 @@ struct rcu_synchronize { > >> > >> extern void wakeme_after_rcu(struct rcu_head *head); > >> > >> +#ifndef CONFIG_RCURING > >> #ifdef CONFIG_PREEMPT_RCU > >> > >> /** > >> @@ -710,6 +715,7 @@ extern void call_rcu(struct rcu_head *head, > >> #define call_rcu call_rcu_sched > >> > >> #endif /* #else #ifdef CONFIG_PREEMPT_RCU */ > >> +#endif > >> > >> /** > >> * call_rcu_bh() - Queue an RCU for invocation after a quicker grace period. > >> diff --git a/include/linux/rcuring.h b/include/linux/rcuring.h > >> new file mode 100644 > >> index 0000000..343b932 > >> --- /dev/null > >> +++ b/include/linux/rcuring.h > >> @@ -0,0 +1,195 @@ > >> +/* > >> + * Read-Copy Update mechanism for mutual exclusion > >> + * > >> + * This program is free software; you can redistribute it and/or modify > >> + * it under the terms of the GNU General Public License as published by > >> + * the Free Software Foundation; either version 2 of the License, or > >> + * (at your option) any later version. > >> + * > >> + * This program is distributed in the hope that it will be useful, > >> + * but WITHOUT ANY WARRANTY; without even the implied warranty of > >> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > >> + * GNU General Public License for more details. > >> + * > >> + * You should have received a copy of the GNU General Public License > >> + * along with this program; if not, write to the Free Software > >> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. > >> + * > >> + * Copyright IBM Corporation, 2008 > >> + * Copyright Fujitsu 2008-2010 Lai Jiangshan > >> + * > >> + * Authors: Dipankar Sarma <dipankar@in.ibm.com> > >> + * Manfred Spraul <manfred@colorfullife.com> > >> + * Paul E. McKenney <paulmck@linux.vnet.ibm.com> Hierarchical version > >> + * Lai Jiangshan <laijs@cn.fujitsu.com> RCURING version > >> + */ > >> + > >> +#ifndef __LINUX_RCURING_H > >> +#define __LINUX_RCURING_H > >> + > >> +#define __rcu_read_lock_bh() local_bh_disable() > >> +#define __rcu_read_unlock_bh() local_bh_enable() > >> +#define __rcu_read_lock_sched() preempt_disable() > >> +#define __rcu_read_unlock_sched() preempt_enable() > >> + > >> +#ifdef CONFIG_RCURING_BH > >> +extern void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *)); > >> +extern void synchronize_rcu_bh(void); > >> +extern void synchronize_rcu_bh_expedited(void); > >> +extern void rcu_bh_qs(int cpu); > >> +extern long rcu_batches_completed_bh(void); > >> +extern void rcu_barrier_bh(void); > >> +extern void rcu_bh_force_quiescent_state(void); > >> +#else /* CONFIG_RCURING_BH */ > >> +#define call_rcu_bh call_rcu_sched > >> +#define synchronize_rcu_bh synchronize_rcu_sched > >> +#define synchronize_rcu_bh_expedited synchronize_rcu_sched_expedited > >> +#define rcu_bh_qs(cpu) do { (void)(cpu); } while (0) > >> +#define rcu_batches_completed_bh rcu_batches_completed_sched > >> +#define rcu_barrier_bh rcu_barrier_sched > >> +#define rcu_bh_force_quiescent_state rcu_sched_force_quiescent_state > >> +#endif /* CONFIG_RCURING_BH */ > >> + > >> +extern > >> +void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *)); > >> +extern void synchronize_rcu_sched(void); > >> +#define synchronize_sched synchronize_rcu_sched > >> +extern void synchronize_rcu_sched_expedited(void); > >> +#define synchronize_sched_expedited synchronize_rcu_sched_expedited > >> +extern void rcu_note_context_switch(int cpu); > >> +extern long rcu_batches_completed_sched(void); > >> +extern void rcu_barrier_sched(void); > >> +extern void rcu_sched_force_quiescent_state(void); > >> + > >> +/* > >> + * The @flags is valid only when RCURING_PREEMPT_SAVED is set. > >> + * > >> + * When preemptible rcu read site is preempted, we save RCURING_PREEMPT_SAVED, > >> + * RCURING_PREEMPT_QS and this read site's locked_complete(only the last > >> + * RCURING_COUNTERS_SHIFT bits) in the @flags. > >> + * > >> + * When the outmost rcu read site is closed, we will clear > >> + * RCURING_PREEMPT_QS bit if it's saved, and let rcu system knows > >> + * this task has passed a quiescent state and release the old read site's > >> + * locked_complete. > >> + */ > >> +#define RCURING_PREEMPT_SAVED (1 << 31) > >> +#define RCURING_PREEMPT_QS (1 << 30) > >> +#define RCURING_PREEMPT_FLAGS (RCURING_PREEMPT_SAVED | RCURING_PREEMPT_QS) > >> + > >> +struct rcu_task_preempt { > >> + unsigned int nesting; > >> + unsigned int flags; > >> +}; > >> + > >> +static inline void rcu_read_lock_preempt(struct rcu_task_preempt *rcu_task) > >> +{ > >> + rcu_task->nesting++; > >> + barrier(); > >> +} > >> + > >> +static inline void rcu_read_unlock_preempt(struct rcu_task_preempt *rcu_task) > >> +{ > >> + int nesting = rcu_task->nesting; > >> + > >> + if (nesting == 1) { > >> + unsigned int flags; > >> + > >> + barrier(); > >> + flags = ACCESS_ONCE(rcu_task->flags); > >> + > >> + if ((flags & RCURING_PREEMPT_FLAGS) == RCURING_PREEMPT_FLAGS) { > >> + flags &= ~RCURING_PREEMPT_QS; > >> + ACCESS_ONCE(rcu_task->flags) = flags; > >> + } > >> + } > >> + > >> + barrier(); > >> + rcu_task->nesting = nesting - 1; > >> +} > >> + > >> +#ifdef CONFIG_RCURING_PREEMPT > >> + > >> +#ifdef __GENARATING_OFFSETS__ > >> +#define task_rcu_preempt(p) ((struct rcu_task_preempt *)NULL) > > > > The above looks a bit strange. The idea is that if you were generating > > offsets for the fields within rcu_task_preempt, you would select the > > fields? Something like the following? > > when __GENARATING_OFFSETS__, no one really use task_rcu_preempt(), > it just for avoiding the compiling warnings. > > > > > #define example_offset(p) &(((struct rcu_task_preempt *)NULL)->field_a) > > > >> +#else > >> +#include <generated/kernel-offsets.h> > >> +#define task_rcu_preempt(p) \ > >> + ((struct rcu_task_preempt *)(((void *)p) + TASK_RCU_PREEMPT)) > >> +#endif > >> + > >> +#define current_rcu_preempt() task_rcu_preempt(current) > >> +#define __rcu_read_lock() rcu_read_lock_preempt(current_rcu_preempt()) > >> +#define __rcu_read_unlock() rcu_read_unlock_preempt(current_rcu_preempt()) > >> +extern > >> +void call_rcu_preempt(struct rcu_head *head, void (*func)(struct rcu_head *)); > >> +#define call_rcu call_rcu_preempt > >> +extern void synchronize_rcu_preempt(void); > >> +#define synchronize_rcu synchronize_rcu_preempt > >> +extern void synchronize_rcu_preempt_expedited(void); > >> +#define synchronize_rcu_expedited synchronize_rcu_preempt_expedited > >> +extern long rcu_batches_completed_preempt(void); > >> +#define rcu_batches_completed rcu_batches_completed_preempt > >> +extern void rcu_barrier_preempt(void); > >> +#define rcu_barrier rcu_barrier_preempt > >> +extern void rcu_preempt_force_quiescent_state(void); > >> +#define rcu_force_quiescent_state rcu_preempt_force_quiescent_state > >> + > >> +struct task_struct; > >> +static inline void rcu_copy_process(struct task_struct *p) > >> +{ > >> + struct rcu_task_preempt *rcu_task = task_rcu_preempt(p); > >> + > >> + rcu_task->nesting = 0; > >> + rcu_task->flags = 0; > >> +} > >> + > >> +static inline void exit_rcu(void) > >> +{ > >> + if (current_rcu_preempt()->nesting) { > >> + WARN_ON(1); > >> + current_rcu_preempt()->nesting = 0; > > > > If the RCU core was waiting on this task, how does it learn that this > > task is no longer blocking the current grace period(s)? > > A schedule() will be called after exit_rcu(), and RCU core collect infos > when task do schedule(). > > > > >> + } > >> +} > >> + > >> +#else /*CONFIG_RCURING_PREEMPT */ > >> + > >> +#define __rcu_read_lock() __rcu_read_lock_sched(); > >> +#define __rcu_read_unlock() __rcu_read_unlock_sched(); > >> +#define call_rcu call_rcu_sched > >> +#define synchronize_rcu synchronize_rcu_sched > >> +#define synchronize_rcu_expedited synchronize_rcu_sched_expedited > >> +#define rcu_batches_completed rcu_batches_completed_sched > >> +#define rcu_barrier rcu_barrier_sched > >> +#define rcu_force_quiescent_state rcu_sched_force_quiescent_state > >> + > >> +static inline void rcu_copy_process(struct task_struct *p) {} > >> +static inline void exit_rcu(void) {} > >> +#endif /* CONFIG_RCURING_PREEMPT */ > >> + > >> +#ifdef CONFIG_NO_HZ > >> +extern void rcu_kernel_enter(void); > >> +extern void rcu_kernel_exit(void); > >> + > >> +static inline void rcu_nmi_enter(void) { rcu_kernel_enter(); } > >> +static inline void rcu_nmi_exit(void) { rcu_kernel_exit(); } > >> +static inline void rcu_irq_enter(void) { rcu_kernel_enter(); } > >> +static inline void rcu_irq_exit(void) { rcu_kernel_exit(); } > >> +extern void rcu_enter_nohz(void); > >> +extern void rcu_exit_nohz(void); > >> +#else > >> +static inline void rcu_nmi_enter(void) {} > >> +static inline void rcu_nmi_exit(void) {} > >> +static inline void rcu_irq_enter(void) {} > >> +static inline void rcu_irq_exit(void) {} > >> +static inline void rcu_enter_nohz(void) {} > >> +static inline void rcu_exit_nohz(void) {} > >> +#endif > >> + > >> +extern int rcu_needs_cpu(int cpu); > >> +extern void rcu_check_callbacks(int cpu, int user); > >> + > >> +extern void rcu_scheduler_starting(void); > >> +extern int rcu_scheduler_active __read_mostly; > >> + > >> +#endif /* __LINUX_RCURING_H */ > >> diff --git a/include/linux/sched.h b/include/linux/sched.h > >> index e18473f..15b332d 100644 > >> --- a/include/linux/sched.h > >> +++ b/include/linux/sched.h > >> @@ -1211,6 +1211,10 @@ struct task_struct { > >> struct rcu_node *rcu_blocked_node; > >> #endif /* #ifdef CONFIG_TREE_PREEMPT_RCU */ > >> > >> +#ifdef CONFIG_RCURING_PREEMPT > >> + struct rcu_task_preempt rcu_task_preempt; > >> +#endif > >> + > >> #if defined(CONFIG_SCHEDSTATS) || defined(CONFIG_TASK_DELAY_ACCT) > >> struct sched_info sched_info; > >> #endif > >> @@ -1757,7 +1761,7 @@ static inline void rcu_copy_process(struct task_struct *p) > >> INIT_LIST_HEAD(&p->rcu_node_entry); > >> } > >> > >> -#else > >> +#elif !defined(CONFIG_RCURING) > >> > >> static inline void rcu_copy_process(struct task_struct *p) > >> { > >> diff --git a/init/Kconfig b/init/Kconfig > >> index a619a1a..48e58d9 100644 > >> --- a/init/Kconfig > >> +++ b/init/Kconfig > >> @@ -374,6 +374,9 @@ config TINY_PREEMPT_RCU > >> for real-time UP systems. This option greatly reduces the > >> memory footprint of RCU. > >> > >> +config RCURING > >> + bool "Multi-GP ring based RCU" > >> + > >> endchoice > >> > >> config PREEMPT_RCU > >> @@ -450,6 +453,35 @@ config TREE_RCU_TRACE > >> TREE_PREEMPT_RCU implementations, permitting Makefile to > >> trivially select kernel/rcutree_trace.c. > >> > >> +config RCURING_BH > >> + bool "RCU for bh" > >> + default y > >> + depends on RCURING && !PREEMPT_RT > >> + help > >> + If Y, use a independent rcu domain for rcu_bh. > >> + If N, rcu_bh will map to rcu_sched(except rcu_read_lock_bh, > >> + rcu_read_unlock_bh). > > > > If I understand correctly, "N" defeats the purpose of bh, which is to > > have faster grace periods -- not having to wait for a context switch. > > I remembered, BH is for defeating DOS, so somebody will not need > rcu domain for BH if he don't care this kind of DOS. > > > > >> + > >> +config RCURING_BH_PREEMPT > >> + bool > >> + depends on PREEMPT_RT > >> + help > >> + rcu_bh will map to rcu_preempt. > > > > If I understand what you are doing, this config option will break > > networking. > > In PREEMPT_RT, BH will be preemptible. but RCURING_BH_PREEMPT > is not used here, will be removed. > > > > >> + > >> +config RCURING_PREEMPT > >> + bool "RCURING based reemptible RCU" > >> + default n > >> + depends on RCURING && PREEMPT > >> + help > >> + If Y, normal rcu functionality will map to rcu_preempt. > >> + If N, normal rcu functionality will map to rcu_sched. > >> + > >> +config RCURING_COUNTERS_SHIFT > >> + int "RCURING's counter shift" > >> + range 1 10 > >> + depends on RCURING > >> + default 4 > >> + > >> endmenu # "RCU Subsystem" > >> > >> config IKCONFIG > >> diff --git a/kernel/Makefile b/kernel/Makefile > >> index 92b7420..66eab8c 100644 > >> --- a/kernel/Makefile > >> +++ b/kernel/Makefile > >> @@ -86,6 +86,7 @@ obj-$(CONFIG_TREE_PREEMPT_RCU) += rcutree.o > >> obj-$(CONFIG_TREE_RCU_TRACE) += rcutree_trace.o > >> obj-$(CONFIG_TINY_RCU) += rcutiny.o > >> obj-$(CONFIG_TINY_PREEMPT_RCU) += rcutiny.o > >> +obj-$(CONFIG_RCURING) += rcuring.o > >> obj-$(CONFIG_RELAY) += relay.o > >> obj-$(CONFIG_SYSCTL) += utsname_sysctl.o > >> obj-$(CONFIG_TASK_DELAY_ACCT) += delayacct.o > >> diff --git a/kernel/kernel-offsets.c b/kernel/kernel-offsets.c > >> new file mode 100644 > >> index 0000000..0d30817 > >> --- /dev/null > >> +++ b/kernel/kernel-offsets.c > >> @@ -0,0 +1,20 @@ > >> +/* > >> + * Generate definitions needed by assembly language modules. > >> + * > >> + * Copyright (C) 2008-2010 Lai Jiangshan > >> + */ > >> + > >> +#define __GENERATING_KERNEL_OFFSETS__ > >> +#include <linux/rcupdate.h> > >> +#include <linux/sched.h> > >> +#include <linux/kbuild.h> > >> + > >> +void foo(void); > >> + > >> +void foo(void) > >> +{ > >> +#ifdef CONFIG_RCURING_PREEMPT > >> + OFFSET(TASK_RCU_PREEMPT, task_struct, rcu_task_preempt); > >> +#endif > >> +} > >> + > >> diff --git a/kernel/rcuring.c b/kernel/rcuring.c > >> new file mode 100644 > >> index 0000000..e7cedb5 > >> --- /dev/null > >> +++ b/kernel/rcuring.c > >> @@ -0,0 +1,1002 @@ > >> +/* > >> + * Read-Copy Update mechanism for mutual exclusion > >> + * > >> + * This program is free software; you can redistribute it and/or modify > >> + * it under the terms of the GNU General Public License as published by > >> + * the Free Software Foundation; either version 2 of the License, or > >> + * (at your option) any later version. > >> + * > >> + * This program is distributed in the hope that it will be useful, > >> + * but WITHOUT ANY WARRANTY; without even the implied warranty of > >> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > >> + * GNU General Public License for more details. > >> + * > >> + * You should have received a copy of the GNU General Public License > >> + * along with this program; if not, write to the Free Software > >> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. > >> + * > >> + * Copyright IBM Corporation, 2008 > >> + * Copyright Fujitsu 2008-2010 Lai Jiangshan > >> + * > >> + * Authors: Dipankar Sarma <dipankar@in.ibm.com> > >> + * Manfred Spraul <manfred@colorfullife.com> > >> + * Paul E. McKenney <paulmck@linux.vnet.ibm.com> Hierarchical version > >> + * Lai Jiangshan <laijs@cn.fujitsu.com> RCURING version > >> + */ > >> + > >> +#include <linux/rcupdate.h> > >> +#include <linux/percpu.h> > >> +#include <asm/atomic.h> > >> +#include <linux/spinlock.h> > >> +#include <linux/module.h> > >> +#include <linux/interrupt.h> > >> +#include <linux/cpu.h> > >> + > >> +#define RCURING_IDX_SHIFT CONFIG_RCURING_COUNTERS_SHIFT > >> +#define RCURING_IDX_MAX (1L << RCURING_IDX_SHIFT) > >> +#define RCURING_IDX_MASK (RCURING_IDX_MAX - 1) > >> +#define RCURING_IDX(complete) ((complete) & RCURING_IDX_MASK) > >> + > >> +struct rcuring { > >> + atomic_t ctr[RCURING_IDX_MAX]; > >> + long curr_complete; > >> + long done_complete; > >> + > >> + raw_spinlock_t lock; > >> +}; > >> + > >> +/* It is long history that we use -300 as an initial complete */ > >> +#define RCURING_INIT_COMPLETE -300L > >> +#define RCURING_INIT(name) \ > >> +{ \ > >> + {[RCURING_IDX(RCURING_INIT_COMPLETE)] = ATOMIC_INIT(1),}, \ > >> + RCURING_INIT_COMPLETE, \ > >> + RCURING_INIT_COMPLETE - 1, \ > >> + __RAW_SPIN_LOCK_UNLOCKED(name.lock), \ > > > > Please use the gcc-style initializers here. > > I will be use gcc-style initializers in next version. > > > > >> +} > >> + > >> +static inline > >> +int rcuring_ctr_read(struct rcuring *rr, unsigned long complete) > >> +{ > >> + int res; > >> + > >> + /* atomic_read() does not ensure to imply barrier(). */ > >> + barrier(); > >> + res = atomic_read(rr->ctr + RCURING_IDX(complete)); > >> + barrier(); > >> + > >> + return res; > >> +} > >> + > >> +void __rcuring_advance_done_complete(struct rcuring *rr) > >> +{ > >> + long wait_complete = rr->done_complete + 1; > >> + > >> + if (!rcuring_ctr_read(rr, wait_complete)) { > >> + do { > >> + wait_complete++; > >> + } while (!rcuring_ctr_read(rr, wait_complete)); > >> + > >> + ACCESS_ONCE(rr->done_complete) = wait_complete - 1; > >> + } > >> +} > > > > For a given grace period, all quiescent states hit the same counter. > > Or am I missing something? > > No. see the top of this email. > > > > >> + > >> +static inline > >> +int rcuring_could_advance_done_complete(struct rcuring *rr) > >> +{ > >> + return !rcuring_ctr_read(rr, rr->done_complete + 1); > >> +} > >> + > >> +static inline > >> +void rcuring_advance_done_complete(struct rcuring *rr) > >> +{ > >> + if (rcuring_could_advance_done_complete(rr)) { > >> + if (__raw_spin_trylock(&rr->lock)) { > >> + __rcuring_advance_done_complete(rr); > >> + __raw_spin_unlock(&rr->lock); > >> + } > >> + } > >> +} > >> + > >> +void __rcuring_advance_curr_complete(struct rcuring *rr, long next_complete) > >> +{ > >> + long curr_complete = rr->curr_complete; > >> + > >> + if (curr_complete + 1 == next_complete) { > >> + if (!rcuring_ctr_read(rr, next_complete)) { > >> + ACCESS_ONCE(rr->curr_complete) = next_complete; > >> + smp_mb__before_atomic_inc(); > >> + atomic_inc(rr->ctr + RCURING_IDX(next_complete)); > >> + smp_mb__after_atomic_inc(); > >> + atomic_dec(rr->ctr + RCURING_IDX(curr_complete)); > >> + } > >> + } > >> +} > >> + > >> +static inline > >> +int rcuring_could_advance_complete(struct rcuring *rr, long next_complete) > >> +{ > >> + if (rcuring_could_advance_done_complete(rr)) > >> + return 1; > >> + > >> + if (rr->curr_complete + 1 == next_complete) { > >> + if (!rcuring_ctr_read(rr, next_complete)) > >> + return 1; > >> + } > >> + > >> + return 0; > >> +} > >> + > >> +static inline > >> +void rcuring_advance_complete(struct rcuring *rr, long next_complete) > >> +{ > >> + if (rcuring_could_advance_complete(rr, next_complete)) { > >> + if (__raw_spin_trylock(&rr->lock)) { > >> + __rcuring_advance_done_complete(rr); > >> + __rcuring_advance_curr_complete(rr, next_complete); > >> + __raw_spin_unlock(&rr->lock); > >> + } > >> + } > >> +} > >> + > >> +static inline > >> +void rcuring_advance_complete_force(struct rcuring *rr, long next_complete) > >> +{ > >> + if (rcuring_could_advance_complete(rr, next_complete)) { > >> + __raw_spin_lock(&rr->lock); > >> + __rcuring_advance_done_complete(rr); > >> + __rcuring_advance_curr_complete(rr, next_complete); > >> + __raw_spin_unlock(&rr->lock); > >> + } > >> +} > >> + > >> +static inline long __locked_complete_adjust(int locked_idx, long complete) > >> +{ > >> + long locked_complete; > >> + > >> + locked_complete = (complete & ~RCURING_IDX_MASK) | (long)locked_idx; > >> + if (locked_complete - complete <= 0) > >> + return locked_complete; > >> + else > >> + return locked_complete - RCURING_IDX_MAX; > >> +} > >> + > >> +static long rcuring_lock(struct rcuring *rr) > >> +{ > >> + long locked_complete; > >> + int idx; > >> + > >> + for (;;) { > >> + idx = RCURING_IDX(ACCESS_ONCE(rr->curr_complete)); > >> + if (likely(atomic_inc_not_zero(rr->ctr + idx))) > >> + break; > >> + } > >> + smp_mb__after_atomic_inc(); > >> + > >> + locked_complete = ACCESS_ONCE(rr->curr_complete); > >> + if (likely(RCURING_IDX(locked_complete) == idx)) > >> + return locked_complete; > >> + else > >> + return __locked_complete_adjust(idx, locked_complete); > >> +} > >> + > >> +static void rcuring_unlock(struct rcuring *rr, long locked_complete) > >> +{ > >> + smp_mb__before_atomic_dec(); > >> + > >> + atomic_dec(rr->ctr + RCURING_IDX(locked_complete)); > >> +} > >> + > >> +static inline > >> +void rcuring_dup_lock(struct rcuring *rr, long locked_complete) > >> +{ > >> + atomic_inc(rr->ctr + RCURING_IDX(locked_complete)); > >> +} > >> + > >> +static inline > >> +long rcuring_advance_lock(struct rcuring *rr, long locked_complete) > >> +{ > >> + long new_locked_complete = locked_complete; > >> + > >> + if (locked_complete != rr->curr_complete) { > >> + /* > >> + * Lock the new complete at first, and then release > >> + * the old one. It prevents errors when NMI/SMI occurs > >> + * after rcuring_unlock() - we still lock it. > >> + */ > >> + new_locked_complete = rcuring_lock(rr); > >> + rcuring_unlock(rr, locked_complete); > >> + } > >> + > >> + return new_locked_complete; > >> +} > >> + > >> +static inline long rcuring_get_done_complete(struct rcuring *rr) > >> +{ > >> + return ACCESS_ONCE(rr->done_complete); > >> +} > >> + > >> +static inline long rcuring_get_curr_complete(struct rcuring *rr) > >> +{ > >> + return ACCESS_ONCE(rr->curr_complete); > >> +} > >> + > >> +struct rcu_batch { > >> + struct rcu_head *list, **tail; > >> +}; > >> + > >> +static void rcu_batch_merge(struct rcu_batch *to, struct rcu_batch *from) > >> +{ > >> + if (from->list != NULL) { > >> + *to->tail = from->list; > >> + to->tail = from->tail; > >> + > >> + from->list = NULL; > >> + from->tail = &from->list; > >> + } > >> +} > >> + > >> +static void rcu_batch_add(struct rcu_batch *batch, struct rcu_head *new) > >> +{ > >> + *batch->tail = new; > >> + batch->tail = &new->next; > >> +} > >> + > >> +struct rcu_data { > >> + long locked_complete; > >> + > >> + /* > >> + * callbacks are in batches (done_complete, curr_complete] > >> + * curr_complete - done_complete >= 0 > >> + * curr_complete - done_complete <= RCURING_IDX_MAX > >> + * domain's curr_complete - curr_complete >= 0 > >> + * domain's done_complete - done_complete >= 0 > >> + * > >> + * curr_complete == done_complete just means @batch is empty. > >> + * we have at least one callback in batch[RCURING_IDX(done_complete)] > >> + * if curr_complete != done_complete > >> + */ > >> + long curr_complete; > >> + long done_complete; > >> + > >> + struct rcu_batch batch[RCURING_IDX_MAX]; > >> + struct rcu_batch done_batch; > >> + > >> + unsigned int qlen; > >> + unsigned long jiffies; > >> + unsigned long stall_jiffies; > >> +}; > >> + > >> +struct rcu_domain { > >> + struct rcuring rcuring; > >> + const char *domain_name; > >> + struct rcu_data __percpu *rcu_data; > >> + > >> + spinlock_t lock; /* this lock is for fqs or other misc things */ > >> + long fqs_complete; > >> + void (*force_quiescent_state)(struct rcu_domain *domain, > >> + long fqs_complete); > >> +}; > >> + > >> +#define EXPEDITED_GP_RESERVED_RECOMMEND (RCURING_IDX_SHIFT - 1) > >> + > >> +static inline long gp_reserved(struct rcu_domain *domain) > >> +{ > >> + return EXPEDITED_GP_RESERVED_RECOMMEND; > >> +} > >> + > >> +static inline void __init rcu_data_init(struct rcu_data *rdp) > >> +{ > >> + int idx; > >> + > >> + for (idx = 0; idx < RCURING_IDX_MAX; idx++) > >> + rdp->batch[idx].tail = &rdp->batch[idx].list; > >> + > >> + rdp->done_batch.tail = &rdp->done_batch.list; > >> +} > >> + > >> +static void do_advance_callbacks(struct rcu_data *rdp, long done_complete) > >> +{ > >> + int idx; > >> + > >> + while (rdp->done_complete != done_complete) { > >> + rdp->done_complete++; > >> + idx = RCURING_IDX(rdp->done_complete); > >> + rcu_batch_merge(&rdp->done_batch, rdp->batch + idx); > >> + } > >> +} > >> + > >> +/* Is value in the range (@left_open, @right_close] */ > >> +static inline bool in_range(long left_open, long right_close, long value) > >> +{ > >> + return left_open - value < 0 && value - right_close <= 0; > >> +} > >> + > >> +static void advance_callbacks(struct rcu_data *rdp, long done_complete, > >> + long curr_complete) > >> +{ > >> + if (in_range(done_complete, curr_complete, rdp->curr_complete)) { > >> + do_advance_callbacks(rdp, done_complete); > >> + } else { > >> + do_advance_callbacks(rdp, rdp->curr_complete); > >> + rdp->curr_complete = done_complete; > >> + rdp->done_complete = done_complete; > >> + } > >> +} > >> + > >> +static void __force_quiescent_state(struct rcu_domain *domain, > >> + long fqs_complete) > >> +{ > >> + int cpu; > >> + long fqs_complete_old; > >> + long curr_complete = rcuring_get_curr_complete(&domain->rcuring); > >> + long done_complete = rcuring_get_done_complete(&domain->rcuring); > >> + > >> + spin_lock(&domain->lock); > >> + > >> + fqs_complete_old = domain->fqs_complete; > >> + if (!in_range(done_complete, curr_complete, fqs_complete_old)) > >> + fqs_complete_old = done_complete; > >> + > >> + if (fqs_complete_old - fqs_complete >= 0) { > >> + spin_unlock(&domain->lock); > >> + return; > >> + } > >> + > >> + domain->fqs_complete = fqs_complete; > >> + spin_unlock(&domain->lock); > >> + > >> + for_each_online_cpu(cpu) { > >> + struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu); > >> + long locked_complete = ACCESS_ONCE(rdp->locked_complete); > >> + > >> + if (!in_range(fqs_complete_old, fqs_complete, locked_complete)) > >> + continue; > > > > Is preemption disabled here? > > Yes, it called with local irq disabled. > > > > >> + > >> + if (cpu == smp_processor_id()) > >> + set_tsk_need_resched(current); > >> + else > >> + smp_send_reschedule(cpu); > >> + } > >> +} > >> + > >> +static void force_quiescent_state(struct rcu_domain *domain, long fqs_complete) > >> +{ > >> + domain->force_quiescent_state(domain, fqs_complete); > >> +} > >> + > >> +static > >> +long prepare_for_new_callback(struct rcu_domain *domain, struct rcu_data *rdp) > >> +{ > >> + long curr_complete, done_complete; > >> + struct rcuring *rr = &domain->rcuring; > >> + > >> + smp_mb(); > >> + curr_complete = rcuring_get_curr_complete(rr); > >> + done_complete = rcuring_get_done_complete(rr); > >> + > >> + advance_callbacks(rdp, done_complete, curr_complete); > >> + rdp->curr_complete = curr_complete; > >> + > >> + if (!rdp->qlen) { > >> + rdp->jiffies = jiffies; > >> + rdp->stall_jiffies = jiffies; > >> + } > >> + > >> + return curr_complete; > >> +} > >> + > >> +static long __call_rcu(struct rcu_domain *domain, struct rcu_head *head, > >> + void (*func)(struct rcu_head *)) > >> +{ > >> + struct rcu_data *rdp; > >> + long curr_complete; > >> + > >> + head->next = NULL; > >> + head->func = func; > >> + > >> + rdp = this_cpu_ptr(domain->rcu_data); > >> + curr_complete = prepare_for_new_callback(domain, rdp); > >> + rcu_batch_add(rdp->batch + RCURING_IDX(curr_complete), head); > >> + rdp->qlen++; > >> + > >> + return curr_complete; > >> +} > >> + > >> +static void print_rcuring_stall(struct rcu_domain *domain) > >> +{ > >> + struct rcuring *rr = &domain->rcuring; > >> + unsigned long curr_complete = rcuring_get_curr_complete(rr); > >> + unsigned long done_complete = rcuring_get_done_complete(rr); > >> + int cpu; > >> + > >> + printk(KERN_ERR "RCU is stall, cpu=%d, domain_name=%s\n", smp_processor_id(), > >> + domain->domain_name); > >> + > >> + printk(KERN_ERR "domain's complete(done/curr)=%ld/%ld\n", > >> + curr_complete, done_complete); > >> + printk(KERN_ERR "curr_ctr=%d, wait_ctr=%d\n", > >> + rcuring_ctr_read(rr, curr_complete), > >> + rcuring_ctr_read(rr, done_complete + 1)); > >> + > >> + for_each_online_cpu(cpu) { > >> + struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu); > >> + printk(KERN_ERR "cpu=%d, qlen=%d, complete(locked/dome/curr/)=" > >> + "%ld/%ld/%ld\n", cpu, rdp->qlen, > >> + rdp->locked_complete, rdp->done_complete, > >> + rdp->curr_complete); > >> + } > >> +} > >> + > >> +static void __rcu_check_callbacks(struct rcu_domain *domain, > >> + struct rcu_data *rdp, int in_rcu_softirq) > >> +{ > >> + long curr_complete, done_complete; > >> + struct rcuring *rr = &domain->rcuring; > >> + > >> + if (!rdp->qlen) > >> + return; > >> + > >> + rcuring_advance_done_complete(rr); > >> + > >> + curr_complete = rcuring_get_curr_complete(rr); > >> + done_complete = ACCESS_ONCE(rr->done_complete); > >> + advance_callbacks(rdp, done_complete, curr_complete); > >> + > >> + if (rdp->curr_complete == curr_complete > >> + && rdp->batch[RCURING_IDX(rdp->curr_complete)].list) { > >> + long max_gp_allow = RCURING_IDX_MAX - gp_reserved(domain); > >> + > >> + if (curr_complete - done_complete <= max_gp_allow) > >> + rcuring_advance_complete(rr, curr_complete + 1); > >> + } > >> + > >> + if (rdp->done_batch.list) { > >> + if (!in_rcu_softirq) > >> + raise_softirq(RCU_SOFTIRQ); > >> + } else { > >> + if (jiffies - rdp->jiffies > 10) { > >> + force_quiescent_state(domain, rdp->curr_complete); > >> + rdp->jiffies = jiffies; > >> + } > >> + if (jiffies - rdp->stall_jiffies > 2 * HZ) { > >> + print_rcuring_stall(domain); > >> + rdp->stall_jiffies = jiffies; > >> + } > >> + } > >> +} > >> + > >> +static void rcu_do_batch(struct rcu_domain *domain, struct rcu_data *rdp) > >> +{ > >> + int count = 0; > >> + struct rcu_head *list, *next; > >> + > >> + list = rdp->done_batch.list; > >> + > >> + if (list) { > >> + rdp->done_batch.list = NULL; > >> + rdp->done_batch.tail = &rdp->done_batch.list; > >> + > >> + local_irq_enable(); > >> + > >> + smp_mb(); > >> + > >> + while (list) { > >> + next = list->next; > >> + prefetch(next); > >> + list->func(list); > >> + count++; > >> + list = next; > >> + } > >> + local_irq_disable(); > >> + > >> + rdp->qlen -= count; > >> + rdp->jiffies = jiffies; > >> + } > >> +} > >> + > >> +static int __rcu_needs_cpu(struct rcu_data *rdp) > >> +{ > >> + return !!rdp->qlen; > >> +} > >> + > >> +static inline > >> +void __rcu_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp) > >> +{ > >> + rdp->locked_complete = rcuring_advance_lock(&domain->rcuring, > >> + rdp->locked_complete); > >> +} > >> + > >> +static inline void rcu_preempt_save_complete(struct rcu_task_preempt *rcu_task, > >> + long locked_complete) > >> +{ > >> + unsigned int new_flags; > >> + > >> + new_flags = RCURING_PREEMPT_FLAGS | RCURING_IDX(locked_complete); > >> + ACCESS_ONCE(rcu_task->flags) = new_flags; > >> +} > >> + > >> +/* > >> + * Every cpu hold a lock_complete. But for preempt rcu, any task may also > >> + * hold a lock_complete. This function increases lock_complete for the current > >> + * cpu and check(dup_lock_and_save or release or advance) the lock_complete of > >> + * the current task. > >> + */ > >> +static inline > >> +void __rcu_preempt_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp, > >> + struct rcu_task_preempt *rcu_task) > >> +{ > >> + long locked_complete = rdp->locked_complete; > >> + unsigned int flags = ACCESS_ONCE(rcu_task->flags); > >> + > >> + if (!(flags & RCURING_PREEMPT_SAVED)) { > >> + BUG_ON(flags & RCURING_PREEMPT_QS); > >> + if (rcu_task->nesting) { > >> + /* dup_lock_and_save current task's lock_complete */ > >> + rcuring_dup_lock(&domain->rcuring, locked_complete); > >> + rcu_preempt_save_complete(rcu_task, locked_complete); > >> + } > >> + } else if (!(rcu_task->nesting)) { > >> + /* release current task's lock_complete */ > >> + rcuring_unlock(&domain->rcuring, (long)flags); > >> + ACCESS_ONCE(rcu_task->flags) = 0; > >> + } else if (!(flags & RCURING_PREEMPT_QS)) { > >> + /* advance_and_save current task's lock_complete */ > >> + if (RCURING_IDX(locked_complete) != RCURING_IDX((long)flags)) { > >> + rcuring_unlock(&domain->rcuring, (long)flags); > >> + rcuring_dup_lock(&domain->rcuring, locked_complete); > >> + } > >> + rcu_preempt_save_complete(rcu_task, locked_complete); > >> + } > >> + > >> + /* increases lock_complete for the current cpu */ > >> + rdp->locked_complete = rcuring_advance_lock(&domain->rcuring, > >> + locked_complete); > >> +} > > > > When we need to boost the priority of preempted readers, how do we tell > > who they are? > > And struct list_head in struct rcu_task_preempt, and a ring struct list_head > in rdp, and use percpu locks or scheduler's runqueues's locks > protect all these struct list_head. > > > > >> + > >> +static void __synchronize_rcu(struct rcu_domain *domain, int expedited) > >> +{ > >> + struct rcu_synchronize rcu; > >> + long complete; > >> + > >> + init_completion(&rcu.completion); > >> + > >> + local_irq_disable(); > >> + complete = __call_rcu(domain, &rcu.head, wakeme_after_rcu); > >> + > >> + if (expedited) { > >> + /* > >> + * Fore a new gp to be started immediately(can use reserved gp). > >> + * But if all gp are used, it will fail to start new gp, > >> + * and we have to wait until some new gps available. > >> + */ > >> + rcuring_advance_complete_force(&domain->rcuring, > >> + complete + 1); > > > > It looks to me like rcuring_advance_complete_force() can just fail > > silently. How does this work? > > If it fails, We have no GPs lefts, synchronize_rcu_expedited() > will becomes synchronize_rcu(). the next fqs will expedite it only a little. > > It's OK, we have 3 reserved GPs by default, It means we allows 3 > synchronize_rcu_expedited()s concurrent at least and it's enough. > > > > >> + > >> + /* Fore qs and expedite the new gp */ > >> + force_quiescent_state(domain, complete); > >> + } > >> + local_irq_enable(); > >> + > >> + wait_for_completion(&rcu.completion); > >> +} > >> + > >> +static inline long __rcu_batches_completed(struct rcu_domain *domain) > >> +{ > >> + return rcuring_get_done_complete(&domain->rcuring); > >> +} > >> + > >> +#ifdef CONFIG_RCURING_BH > >> +#define gen_code_for_bh(gen_code, args...) gen_code(bh, ##args) > > > > Where is gen_code() defined? > > > >> +#else > >> +#define gen_code_for_bh(gen_code, args...) > >> +#endif > >> +#define gen_code_for_sched(gen_code, args...) gen_code(sched, ##args) > >> +#ifdef CONFIG_RCURING_PREEMPT > >> +#define gen_code_for_preempt(gen_code, args...) gen_code(preempt, ##args) > >> +#else > >> +#define gen_code_for_preempt(gen_code, args...) > >> +#endif > >> + > >> +#define GEN_CODES(gen_code, args...) \ > >> + gen_code_for_bh(gen_code, ##args) \ > >> + gen_code_for_sched(gen_code, ##args) \ > >> + gen_code_for_preempt(gen_code, ##args) \ > >> + > >> +#define gen_basic_code(domain) \ > >> + \ > >> +static void force_quiescent_state_##domain(struct rcu_domain *domain, \ > >> + long fqs_complete); \ > >> + \ > >> +static DEFINE_PER_CPU(struct rcu_data, rcu_data_##domain); \ > >> + \ > >> +struct rcu_domain rcu_domain_##domain = \ > >> +{ \ > >> + .rcuring = RCURING_INIT(rcu_domain_##domain.rcuring), \ > >> + .domain_name = #domain, \ > >> + .rcu_data = &rcu_data_##domain, \ > >> + .lock = __SPIN_LOCK_UNLOCKED(rcu_domain_##domain.lock), \ > >> + .force_quiescent_state = force_quiescent_state_##domain, \ > >> +}; \ > >> + \ > >> +void call_rcu_##domain(struct rcu_head *head, \ > >> + void (*func)(struct rcu_head *)) \ > >> +{ \ > >> + unsigned long flags; \ > >> + \ > >> + local_irq_save(flags); \ > >> + __call_rcu(&rcu_domain_##domain, head, func); \ > >> + local_irq_restore(flags); \ > >> +} \ > >> +EXPORT_SYMBOL_GPL(call_rcu_##domain); \ > >> + \ > >> +void synchronize_rcu_##domain(void) \ > >> +{ \ > >> + __synchronize_rcu(&rcu_domain_##domain, 0); \ > >> +} \ > >> +EXPORT_SYMBOL_GPL(synchronize_rcu_##domain); \ > >> + \ > >> +void synchronize_rcu_##domain##_expedited(void) \ > >> +{ \ > >> + __synchronize_rcu(&rcu_domain_##domain, 1); \ > >> +} \ > >> +EXPORT_SYMBOL_GPL(synchronize_rcu_##domain##_expedited); \ > >> + \ > >> +long rcu_batches_completed_##domain(void) \ > >> +{ \ > >> + return __rcu_batches_completed(&rcu_domain_##domain); \ > >> +} \ > >> +EXPORT_SYMBOL_GPL(rcu_batches_completed_##domain); \ > >> + \ > >> +void rcu_##domain##_force_quiescent_state(void) \ > >> +{ \ > >> + force_quiescent_state(&rcu_domain_##domain, 0); \ > >> +} \ > >> +EXPORT_SYMBOL_GPL(rcu_##domain##_force_quiescent_state); > > > > I used to use a single macro for the synchronize_rcu*() APIs, but was > > talked into separate functions. Maybe some of the more ornate recent > > macros have shifted people away from duplicate code in functions, so > > might be time to try again. ;-) > > Too much dumplicated code, I have to do this :-) > > > > >> +GEN_CODES(gen_basic_code) > >> + > >> +static DEFINE_PER_CPU(int, kernel_count); > >> + > >> +#define LOCK_COMPLETE(domain) \ > >> + long complete_##domain = rcuring_lock(&rcu_domain_##domain.rcuring); > >> +#define SAVE_COMPLETE(domain) \ > >> + per_cpu(rcu_data_##domain, cpu).locked_complete = complete_##domain; > >> +#define LOAD_COMPLETE(domain) \ > >> + int complete_##domain = per_cpu(rcu_data_##domain, cpu).locked_complete; > >> +#define RELEASE_COMPLETE(domain) \ > >> + rcuring_unlock(&rcu_domain_##domain.rcuring, complete_##domain); > >> + > >> +static void __rcu_kernel_enter_outmost(int cpu) > >> +{ > >> + GEN_CODES(LOCK_COMPLETE) > >> + > >> + barrier(); > >> + per_cpu(kernel_count, cpu) = 1; > >> + barrier(); > >> + > >> + GEN_CODES(SAVE_COMPLETE) > >> +} > >> + > >> +static void __rcu_kernel_exit_outmost(int cpu) > >> +{ > >> + GEN_CODES(LOAD_COMPLETE) > >> + > >> + barrier(); > >> + per_cpu(kernel_count, cpu) = 0; > >> + barrier(); > >> + > >> + GEN_CODES(RELEASE_COMPLETE) > >> +} > >> + > >> +#ifdef CONFIG_RCURING_BH > >> +static void force_quiescent_state_bh(struct rcu_domain *domain, > >> + long fqs_complete) > >> +{ > >> + __force_quiescent_state(domain, fqs_complete); > >> +} > >> + > >> +static inline void rcu_bh_qsctr_inc_irqoff(int cpu) > >> +{ > >> + __rcu_qsctr_inc(&rcu_domain_bh, &per_cpu(rcu_data_bh, cpu)); > >> +} > >> + > >> +void rcu_bh_qs(int cpu) > >> +{ > >> + unsigned long flags; > >> + > >> + local_irq_save(flags); > >> + rcu_bh_qsctr_inc_irqoff(cpu); > >> + local_irq_restore(flags); > >> +} > >> + > >> +#else /* CONFIG_RCURING_BH */ > >> +static inline void rcu_bh_qsctr_inc_irqoff(int cpu) { (void)(cpu); } > >> +#endif /* CONFIG_RCURING_BH */ > >> + > >> +static void force_quiescent_state_sched(struct rcu_domain *domain, > >> + long fqs_complete) > >> +{ > >> + __force_quiescent_state(domain, fqs_complete); > >> +} > >> + > >> +static inline void rcu_sched_qsctr_inc_irqoff(int cpu) > >> +{ > >> + __rcu_qsctr_inc(&rcu_domain_sched, &per_cpu(rcu_data_sched, cpu)); > >> +} > >> + > >> +#ifdef CONFIG_RCURING_PREEMPT > >> +static void force_quiescent_state_preempt(struct rcu_domain *domain, > >> + long fqs_complete) > >> +{ > >> + /* To Be Implemented */ > >> +} > >> + > >> +static inline void rcu_preempt_qsctr_inc_irqoff(int cpu) > >> +{ > >> + __rcu_preempt_qsctr_inc(&rcu_domain_preempt, > >> + &per_cpu(rcu_data_preempt, cpu), > >> + ¤t->rcu_task_preempt); > >> +} > >> + > >> +#else /* CONFIG_RCURING_PREEMPT */ > >> +static inline void rcu_preempt_qsctr_inc_irqoff(int cpu) > >> +{ > >> + (void)(cpu); > >> + (void)(__rcu_preempt_qsctr_inc); > >> +} > >> +#endif /* CONFIG_RCURING_PREEMPT */ > >> + > >> +#define gen_needs_cpu(domain, cpu) \ > >> + if (__rcu_needs_cpu(&per_cpu(rcu_data_##domain, cpu))) \ > >> + return 1; > >> +int rcu_needs_cpu(int cpu) > >> +{ > >> + GEN_CODES(gen_needs_cpu, cpu) > >> + return 0; > >> +} > >> + > >> +#define call_func(domain, func, args...) \ > >> + func(&rcu_domain_##domain, &__get_cpu_var(rcu_data_##domain), ##args); > >> + > >> +/* > >> + * Note a context switch. This is a quiescent state for RCU-sched, > >> + * and requires special handling for preemptible RCU. > >> + */ > >> +void rcu_note_context_switch(int cpu) > >> +{ > >> + unsigned long flags; > >> + > >> +#ifdef CONFIG_HOTPLUG_CPU > >> + /* > >> + * The stoper thread need to schedule() to the idle thread > >> + * after CPU_DYING. > >> + */ > >> + if (unlikely(!per_cpu(kernel_count, cpu))) { > >> + BUG_ON(cpu_online(cpu)); > >> + return; > >> + } > >> +#endif > >> + > >> + local_irq_save(flags); > >> + rcu_bh_qsctr_inc_irqoff(cpu); > >> + rcu_sched_qsctr_inc_irqoff(cpu); > >> + rcu_preempt_qsctr_inc_irqoff(cpu); > >> + local_irq_restore(flags); > >> +} > >> + > >> +static int rcu_cpu_idle(int cpu) > >> +{ > >> + return idle_cpu(cpu) && rcu_scheduler_active && > >> + !in_softirq() && hardirq_count() <= (1 << HARDIRQ_SHIFT); > >> +} > >> + > >> +void rcu_check_callbacks(int cpu, int user) > >> +{ > >> + unsigned long flags; > >> + > >> + local_irq_save(flags); > >> + > >> + if (user || rcu_cpu_idle(cpu)) { > >> + rcu_bh_qsctr_inc_irqoff(cpu); > >> + rcu_sched_qsctr_inc_irqoff(cpu); > >> + rcu_preempt_qsctr_inc_irqoff(cpu); > >> + } else if (!in_softirq()) > >> + rcu_bh_qsctr_inc_irqoff(cpu); > >> + > >> + GEN_CODES(call_func, __rcu_check_callbacks, 0) > >> + local_irq_restore(flags); > >> +} > >> + > >> +static void rcu_process_callbacks(struct softirq_action *unused) > >> +{ > >> + local_irq_disable(); > >> + GEN_CODES(call_func, __rcu_check_callbacks, 1) > >> + GEN_CODES(call_func, rcu_do_batch) > >> + local_irq_enable(); > >> +} > >> + > >> +static void __cpuinit __rcu_offline_cpu(struct rcu_domain *domain, > >> + struct rcu_data *off_rdp, struct rcu_data *rdp) > >> +{ > >> + if (off_rdp->qlen > 0) { > >> + unsigned long flags; > >> + long curr_complete; > >> + > >> + /* move all callbacks in @off_rdp to @off_rdp->done_batch */ > >> + do_advance_callbacks(off_rdp, off_rdp->curr_complete); > >> + > >> + /* move all callbacks in @off_rdp to this cpu(@rdp) */ > >> + local_irq_save(flags); > >> + curr_complete = prepare_for_new_callback(domain, rdp); > >> + rcu_batch_merge(rdp->batch + RCURING_IDX(curr_complete), > >> + &off_rdp->done_batch); > >> + rdp->qlen += off_rdp->qlen; > >> + local_irq_restore(flags); > >> + > >> + off_rdp->qlen = 0; > >> + } > >> +} > >> + > >> +#define gen_offline(domain, cpu, recieve_cpu) \ > >> + __rcu_offline_cpu(&rcu_domain_##domain, \ > >> + &per_cpu(rcu_data_##domain, cpu), \ > >> + &per_cpu(rcu_data_##domain, recieve_cpu)); > >> + > >> +static int __cpuinit rcu_cpu_notify(struct notifier_block *self, > >> + unsigned long action, void *hcpu) > >> +{ > >> + int cpu = (long)hcpu; > >> + int recieve_cpu; > >> + > >> + (void)(recieve_cpu); > >> + (void)(__rcu_offline_cpu); > >> + (void)(__rcu_kernel_exit_outmost); > >> + > >> + switch (action) { > >> +#ifdef CONFIG_HOTPLUG_CPU > >> + case CPU_DYING: > >> + case CPU_DYING_FROZEN: > >> + /* > >> + * the machine is stopped now, we can access to > >> + * any other cpu data. > >> + * */ > >> + recieve_cpu = cpumask_any_but(cpu_online_mask, cpu); > >> + GEN_CODES(gen_offline, cpu, recieve_cpu) > >> + __rcu_kernel_exit_outmost(cpu); > >> + break; > >> +#endif > >> + case CPU_STARTING: > >> + case CPU_STARTING_FROZEN: > >> + __rcu_kernel_enter_outmost(cpu); > >> + default: > >> + break; > >> + } > >> + return NOTIFY_OK; > >> +} > >> + > >> +#define rcu_init_domain_data(domain, cpu) \ > >> + for_each_possible_cpu(cpu) \ > >> + rcu_data_init(&per_cpu(rcu_data_##domain, cpu)); > >> + > >> +void __init rcu_init(void) > >> +{ > >> + int cpu; > >> + > >> + GEN_CODES(rcu_init_domain_data, cpu) > > > > This one actually consumes more lines than would writing out the > > calls explicitly. ;-) > > like following? more lines and more "#ifdef" > GEN_CODES reduces dumplicated code and reduces "#ifdef"s. > > #ifdef CONFIG_RCURING_BH > for_each_possible_cpu(cpu) > rcu_data_init(&per_cpu(rcu_data_bh, cpu)); > #endif > for_each_possible_cpu(cpu) > rcu_data_init(&per_cpu(rcu_data_sched, cpu)); > #ifdef CONFIG_RCURING_PREEMPT > for_each_possible_cpu(cpu) > rcu_data_init(&per_cpu(rcu_data_preempt, cpu)); > #endif > > > > >> + > >> + __rcu_kernel_enter_outmost(raw_smp_processor_id()); > >> + open_softirq(RCU_SOFTIRQ, rcu_process_callbacks); > >> + cpu_notifier(rcu_cpu_notify, 0); > >> +} > >> + > >> +int rcu_scheduler_active __read_mostly; > >> +EXPORT_SYMBOL_GPL(rcu_scheduler_active); > >> + > >> +/* > >> + * This function is invoked towards the end of the scheduler's initialization > >> + * process. Before this is called, the idle task might contain > >> + * RCU read-side critical sections (during which time, this idle > >> + * task is booting the system). After this function is called, the > >> + * idle tasks are prohibited from containing RCU read-side critical > >> + * sections. This function also enables RCU lockdep checking. > >> + */ > >> +void rcu_scheduler_starting(void) > >> +{ > >> + rcu_scheduler_active = 1; > >> +} > >> + > >> +#ifdef CONFIG_NO_HZ > >> + > >> +void rcu_kernel_enter(void) > >> +{ > >> + unsigned long flags; > >> + > >> + raw_local_irq_save(flags); > >> + if (__get_cpu_var(kernel_count) == 0) > >> + __rcu_kernel_enter_outmost(raw_smp_processor_id()); > >> + else > >> + __get_cpu_var(kernel_count)++; > >> + raw_local_irq_restore(flags); > >> +} > >> + > >> +void rcu_kernel_exit(void) > >> +{ > >> + unsigned long flags; > >> + > >> + raw_local_irq_save(flags); > >> + if (__get_cpu_var(kernel_count) == 1) > >> + __rcu_kernel_exit_outmost(raw_smp_processor_id()); > >> + else > >> + __get_cpu_var(kernel_count)--; > >> + raw_local_irq_restore(flags); > >> +} > >> + > >> +void rcu_enter_nohz(void) > >> +{ > >> + unsigned long flags; > >> + > >> + raw_local_irq_save(flags); > >> + __rcu_kernel_exit_outmost(raw_smp_processor_id()); > >> + raw_local_irq_restore(flags); > >> +} > >> + > >> +void rcu_exit_nohz(void) > >> +{ > >> + unsigned long flags; > >> + > >> + raw_local_irq_save(flags); > >> + __rcu_kernel_enter_outmost(raw_smp_processor_id()); > >> + raw_local_irq_restore(flags); > >> +} > >> +#endif /* CONFIG_NO_HZ */ > >> + > >> +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head); > >> +static struct completion rcu_barrier_completion; > >> +static struct kref rcu_barrier_wait_ref; > >> +static DEFINE_MUTEX(rcu_barrier_mutex); > >> + > >> +static void rcu_barrier_release_ref(struct kref *notused) > >> +{ > >> + complete(&rcu_barrier_completion); > >> +} > >> + > >> +static void rcu_barrier_callback(struct rcu_head *notused) > >> +{ > >> + kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref); > >> +} > >> + > >> +static void rcu_barrier_func(void *data) > >> +{ > >> + struct rcu_domain *rcu_domain = data; > >> + > >> + kref_get(&rcu_barrier_wait_ref); > >> + __call_rcu(rcu_domain, &__get_cpu_var(rcu_barrier_head), > >> + rcu_barrier_callback); > >> +} > >> + > >> +static void __rcu_barrier(struct rcu_domain *rcu_domain) > >> +{ > >> + mutex_lock(&rcu_barrier_mutex); > >> + > >> + init_completion(&rcu_barrier_completion); > >> + kref_init(&rcu_barrier_wait_ref); > >> + > >> + /* queue barrier rcu_heads for every cpu */ > >> + on_each_cpu(rcu_barrier_func, rcu_domain, 1); > >> + > >> + kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref); > >> + wait_for_completion(&rcu_barrier_completion); > >> + > >> + mutex_unlock(&rcu_barrier_mutex); > >> +} > >> + > >> +#define gen_rcu_barrier(domain) \ > >> +void rcu_barrier_##domain(void) \ > >> +{ \ > >> + __rcu_barrier(&rcu_domain_##domain); \ > >> +} \ > >> +EXPORT_SYMBOL_GPL(rcu_barrier_##domain); > >> + > >> +GEN_CODES(gen_rcu_barrier) > >> + > >> diff --git a/kernel/sched.c b/kernel/sched.c > >> index 09b574e..967f25a 100644 > >> --- a/kernel/sched.c > >> +++ b/kernel/sched.c > >> @@ -9113,6 +9113,7 @@ struct cgroup_subsys cpuacct_subsys = { > >> }; > >> #endif /* CONFIG_CGROUP_CPUACCT */ > >> > >> +#ifndef CONFIG_RCURING > >> #ifndef CONFIG_SMP > >> > >> void synchronize_sched_expedited(void) > >> @@ -9182,3 +9183,4 @@ void synchronize_sched_expedited(void) > >> EXPORT_SYMBOL_GPL(synchronize_sched_expedited); > >> > >> #endif /* #else #ifndef CONFIG_SMP */ > >> +#endif /* CONFIG_RCURING */ > >> diff --git a/kernel/time/tick-sched.c b/kernel/time/tick-sched.c > >> index 3e216e0..0eba561 100644 > >> --- a/kernel/time/tick-sched.c > >> +++ b/kernel/time/tick-sched.c > >> @@ -269,6 +269,10 @@ void tick_nohz_stop_sched_tick(int inidle) > >> cpu = smp_processor_id(); > >> ts = &per_cpu(tick_cpu_sched, cpu); > >> > >> + /* Don't enter nohz when cpu is offlining, it is going to die */ > >> + if (cpu_is_offline(cpu)) > >> + goto end; > >> + > >> /* > >> * Call to tick_nohz_start_idle stops the last_update_time from being > >> * updated. Thus, it must not be called in the event we are called from > >> > > > > > ^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [PATCH, RFC] RCU: New scalable, multi-GP and preemptible RCU implementation 2010-09-08 19:53 ` Paul E. McKenney @ 2010-09-13 12:27 ` Lai Jiangshan 2010-09-13 23:17 ` Paul E. McKenney 0 siblings, 1 reply; 8+ messages in thread From: Lai Jiangshan @ 2010-09-13 12:27 UTC (permalink / raw) To: paulmck Cc: Linus Torvalds, Ingo Molnar, Andrew Morton, David Miller, Manfred Spraul, Mathieu Desnoyers, Steven Rostedt, Thomas Gleixner, Peter Zijlstra, LKML, dipankar On 09/09/2010 03:53 AM, Paul E. McKenney wrote: > On Tue, Aug 31, 2010 at 11:03:11AM +0800, Lai Jiangshan wrote: >> On 08/31/2010 07:57 AM, Paul E. McKenney wrote: >>> On Mon, Aug 30, 2010 at 05:31:44PM +0800, Lai Jiangshan wrote: >>>> >>>> 0) contents >>>> 1) overview >>>> 2) scalable >>>> 3) multi-GP >>>> 4) integrated expedited synchronize_rcu >>>> 5) preemptible >>>> 6) top-down implies >>>> 7) speech >>>> 8) core rcuring >>> >>> Interesting! I can't claim to have dug through every detail, but figured >>> I should ask a few questions first. >>> >>> Thanx, Paul >>> >>> Of course, the first question is whether it passes rcutorture, and if >>> so on what hardware configuration. >> >> Only in x86(32bit and 64bit) system. >> Did you patch it in power and test it? It' need more test for different archs, >> but I have no other arch. >> >> What is "hardware configuration"? > > Mainly the number of CPUs. The socket/core/thread counts could also > be helpful. > >>> At first glance, it looks to live somewhere between Manfred Spraul's >>> counter-based hierarchical RCU and classic RCU. There are also some >>> resemblances to K42's "generations" RCU-like mechanism. >>> >>>> 1) overview >>>> This is a new RCU implementation: RCURING. It uses a ring atomic counters >>>> to control the completion of GPs and a ring callbacks batches to process >>>> the callbacks. >>> >>> The ring is a set of grace periods, correct? >> >> correct. A GP is also controled by a set ring counters in >> (domain's done_complete, this GP's complete], this GP is only completed >> until all these conters become zero. > > Understood. > >> I pull up a question of your here: >>> For a given grace period, all quiescent states hit the same counter. >>> Or am I missing something? >> >> No. >> any rcu read site has a locked_complete, if a cpu hold a locked_complete, >> the counter of this locked_complete will not become zero. >> >> Any quiescent states will release its previous rcu read site's locked_complete, >> so different QS may hit different counter. >> QS alway locks the newest GP(not started to waiting, domain's curr_complete) >> for next rcu read site and get the new locked_complete. >> >> What I said seems still indistinct, more questions are wellcome, questions >> also help for documents. Thank you very much. > > OK, here is my assessment thus far. First my understanding of how this > all works: > > o Maintain a circular array of counters. Maintain a parallel > circular array of callback lists. > > o Each non-dyntick-idle CPU holds a reference to one element of the > array. If Ring RCU is idle, it holds a reference to the element > that will serve the next grace period. Similarly, if a given > CPU has passed through a quiescent state for all existing grace > periods, it will hold a reference to the element that will serve > for the next grace period. > > o Upon encountering a quiescent state, we acquire a reference to > the element that will serve for the next grace period, and > then release the reference to the element that we were holding. > > The rcuring_lock() function acquires a reference to the element > that will serve for the next grace period. Not yet clear what > the atomic_inc_not_zero() is for. If the reference count is zero, > what makes it non-zero? rcuring_lock() may be called when current CPU did not hold any reference of rcuring, so the rcuring may completes many GPs while rcuring_lock() is requiring reference. but rcuring_lock() should not require the finish complete GP's reference. so I use atomic_inc_not_zero(): If the reference count is zero, it maybe finish complete GP's reference, we should try again. In rcuring_advance_lock(), we have no such problem, we can use rcuring_dup_lock() actually. > > The rcuring_dup_lock() function acquires another reference to > the element that the CPU already holds a reference to. This > is used in preemptible RCU when a task is preempted in an > RCU read-side critical section. > > The rcuring_unlock() function releases a reference. > > The rcuring_advance_lock() acquires a new reference, then > releases the old one. This is used when a CPU passes through > a quiescent state. > > o When a CPU enters dyntick-idle mode, it releases its reference. > When it exits dyntick-idle mode (including irq and NMI), it > acquires a referent to the element that will serve for the next > grace period. Yes, the rcuring will totally ignore this cpu, if it did not hold any referent. > > o The __rcu_check_callbacks() function checks to see if the > oldest grace period is now reference-free, and, if so, > rcuring_advance_done_complete() retires the old grace periods. > The advance_callbacks() then moves any affected callbacks to > be invoked. > > o If RCU callbacks are not invoked quickly enough (10 jiffies), > force_quiescent_state() sends IPIs to CPUs needing help > reaching a quiescent state. > > > So there were some things that struck me as being really good with > your approach, most of which I intend to pull into the current RCU > implementations: I will do this, also. > > o Offsets into task_struct, allowing inlining of rcu_read_lock() > and the fastpath of rcu_read_unlock(). > > o Faster grace periods, at least when the system is busy. > > o Placement of rcu_copy_process() and exit_rcu() into > include/linux/ringrcu.h. Not clear that this maps nicely > to tree/tiny implementations, though. > > o Defining __rcu_barrier() in terms of __call_rcu() instead of the > various call_rcu*() variants. Ditto for __synchronize_rcu(). > > > There are some things that I am unsure of, perhaps because I am still > misunderstanting something: > > o The per-CPU reference-counting scheme can gain much of the > benefit that TREE_RCU gains by handling large numbers of updates > with a single grace period. However, CPUs that pass through > quiescent states quickly, for example, by frequently interrupting > out of dyntick-idle state, can incur lots of cache misses cycling > through lots of RING RCU array elements. It is true. I think frequently interrupting is seldom happened when dyntick-idle state. And this can be fixed easily. > > o Aggressive code shrinkage through large cpp macros. You seem to > share my ambivalence given that you use it in only some of the > situations where it might be applied. ;-) > > The savings of lines of code does vary -- many definitions can > be placed under a single #ifdef, after all. I tried my best to abstract rcu, and to retry the common codes, I think we can use it for rcutree. > > o Use of kref rather than a simple counter. Saves a line or two, > adds a function. I want the save the comments, kref is always inited as 1. code itself is comments. > > o __rcu_check_callbacks() won't force a grace period if there > are no callbacks waiting to be invoked. On the one hand, one > can argue that forcing a grace period is useless in that case, > but on the other hand, there is usually more than one CPU. > This is probably a consequence of the force_quiescent_state() > time being based on callback residence time rather than on > grace-period duration -- only CPUs with callbacks can possibly > measure a meaningful residence time. I think it's useless to FQS when there is no callbacks in current cpu. FQS is not scalable because is require global lock. > > o NMI handling? Note that raw_local_irq_save() does not > block NMIs, so need to analyze recursion into rcu_kernel_enter() > and rcu_kernel_exit(). Looks like it might be OK, but... > Ironically enough, courtesy of the atomic instructions and > cache misses that are so scary in these functions. ;-) It's safe when NMI, I added barrier()s. +static void __rcu_kernel_enter_outmost(int cpu) +{ + GEN_CODES(LOCK_COMPLETE) + + barrier(); + per_cpu(kernel_count, cpu) = 1; + barrier(); + + GEN_CODES(SAVE_COMPLETE) +} > > o Eliminating rcu_pending() might be a good thing, though perhaps > simplifying it (making it less exact) might be the right thing > to do in TREE_RCU, particularly given that priority boosting > turns RCU_SOFTIRQ into a kthread. > > > Here are some things that could be problematic about Ring RCU. Some > of them are fixable, others might or might not be: > > o Potentially high memory contention on a given ring: worst case > is quite bad on large SMP systems. On the other hand, best > case is really good, given that each CPU could get its own > rcuring counter. Average case is likely to be modestly bad. > The default value of 4 for RCURING_COUNTERS_SHIFT maps to 16 > array elements, which could easily show worst case behavior from > time to time on large systems. It may be true, only a global rcuring for a rcu_domain. by it just do 6 atomic operations for a QS without any locks, will it reduce the pain of memory contention? It is hard to fix this if it becomes a problem, but I believe it's very economical only 6 atomic operations for a QS. (maybe the only way to fix: use hierarchy for RCURING) > > o Real-time latency could be problematic due to: > > o Scans across CPUs and across grace-period ring elements. Scans across CPUs: only when FQS, but RCURING very very seldom do a FQS. and we can fix it. call FQS in kthread or... etc. across grace-period ring elements: we use small RCURING_IDX_MAX. > > o Worst-case memory contention. > > o High interrupt rates out of dyntick-idle state. > > o Potentially large memory consumption due to RCURING_IDX_MAX > arrays in rcu_data and rcuring structures. The default of > 16 elements should be OK. > > o Use of atomic instructions in dyntick path. (Not just one, but > potentially a long string of them -- and implied cache misses. > Understand that the expected case is a single atomic in > rcuring_lock() -- but worst case is at least somewhat important.) > > Benefit is that you don't need to scan the CPUs to check for > dyntick-idle CPUs -- force_quiescent_state() is then limited to > resched IPIs. Unless of course the dyntick-idle CPUs remain in > dyntick-idle state throughout, in which case they get IPIed. This > situation limits dyntick-idle state to 10 jiffies, which would > result in the energy-efficiency guys screaming bloody murder. > The battery-powered embedded guys would not bother screaming, > but could instead be expected to come after us with pitchforks. I hate to call force_quiescent_state(), it is not scalable in RCURING/RCUTREE In RCURING/RCUTREE, A GP is about 3 jiffies in average, so I use 10 jiffies for FQS interval, thus force_quiescent_state() is called rarely. in force_quiescent_state(), I forgot to test the dest cpu's kernel_count, It will be fixed. RCURING is Energy efficiency. > > o Energy efficiency: see above paragraph. Good catch. > > > The remainder are shortcomings that should be easy to fix: > > o force_quiescent_state() for preemptible variant not implemented. > (Waiting on RCU priority boosting.) > > o Constants in __rcu_check_callbacks() really need to be > macroized. "10"? "2 * HZ"? > > o rcu_do_batch() needs to self-limit in multi-CPU configurations. > Otherwise, latency is a problem. TINY_RCU gets away without > this (for the time being, anyway), but TREE_RCU and CLASSIC_RCU > before it need the blimit functionality. Otherwise the > Linux Audio guys will scream. > > o Too much stuff directly called from scheduling-clock-interrupt > context! Full scan of CPUs in __force_quiescent_state(), for > example. Need to hand off to RCU_SOFTIRQ or a kthread. > > o print_rcuring_stall() does not dump stacks. > > So, what am I still missing about your approach? > Thank you very much. I think the potentially high memory contention cold be a hard problem, but will you plan to merge it. Lai. ^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [PATCH, RFC] RCU: New scalable, multi-GP and preemptible RCU implementation 2010-09-13 12:27 ` Lai Jiangshan @ 2010-09-13 23:17 ` Paul E. McKenney 2010-09-15 9:43 ` Lai Jiangshan 0 siblings, 1 reply; 8+ messages in thread From: Paul E. McKenney @ 2010-09-13 23:17 UTC (permalink / raw) To: Lai Jiangshan Cc: Linus Torvalds, Ingo Molnar, Andrew Morton, David Miller, Manfred Spraul, Mathieu Desnoyers, Steven Rostedt, Thomas Gleixner, Peter Zijlstra, LKML, dipankar On Mon, Sep 13, 2010 at 08:27:45PM +0800, Lai Jiangshan wrote: > On 09/09/2010 03:53 AM, Paul E. McKenney wrote: > > On Tue, Aug 31, 2010 at 11:03:11AM +0800, Lai Jiangshan wrote: > >> On 08/31/2010 07:57 AM, Paul E. McKenney wrote: > >>> On Mon, Aug 30, 2010 at 05:31:44PM +0800, Lai Jiangshan wrote: > >>>> > >>>> 0) contents > >>>> 1) overview > >>>> 2) scalable > >>>> 3) multi-GP > >>>> 4) integrated expedited synchronize_rcu > >>>> 5) preemptible > >>>> 6) top-down implies > >>>> 7) speech > >>>> 8) core rcuring > >>> > >>> Interesting! I can't claim to have dug through every detail, but figured > >>> I should ask a few questions first. > >>> > >>> Thanx, Paul > >>> > >>> Of course, the first question is whether it passes rcutorture, and if > >>> so on what hardware configuration. > >> > >> Only in x86(32bit and 64bit) system. > >> Did you patch it in power and test it? It' need more test for different archs, > >> but I have no other arch. > >> > >> What is "hardware configuration"? > > > > Mainly the number of CPUs. The socket/core/thread counts could also > > be helpful. Any info? ;-) > >>> At first glance, it looks to live somewhere between Manfred Spraul's > >>> counter-based hierarchical RCU and classic RCU. There are also some > >>> resemblances to K42's "generations" RCU-like mechanism. > >>> > >>>> 1) overview > >>>> This is a new RCU implementation: RCURING. It uses a ring atomic counters > >>>> to control the completion of GPs and a ring callbacks batches to process > >>>> the callbacks. > >>> > >>> The ring is a set of grace periods, correct? > >> > >> correct. A GP is also controled by a set ring counters in > >> (domain's done_complete, this GP's complete], this GP is only completed > >> until all these conters become zero. > > > > Understood. > > > >> I pull up a question of your here: > >>> For a given grace period, all quiescent states hit the same counter. > >>> Or am I missing something? > >> > >> No. > >> any rcu read site has a locked_complete, if a cpu hold a locked_complete, > >> the counter of this locked_complete will not become zero. > >> > >> Any quiescent states will release its previous rcu read site's locked_complete, > >> so different QS may hit different counter. > >> QS alway locks the newest GP(not started to waiting, domain's curr_complete) > >> for next rcu read site and get the new locked_complete. > >> > >> What I said seems still indistinct, more questions are wellcome, questions > >> also help for documents. Thank you very much. > > > > OK, here is my assessment thus far. First my understanding of how this > > all works: > > > > o Maintain a circular array of counters. Maintain a parallel > > circular array of callback lists. > > > > o Each non-dyntick-idle CPU holds a reference to one element of the > > array. If Ring RCU is idle, it holds a reference to the element > > that will serve the next grace period. Similarly, if a given > > CPU has passed through a quiescent state for all existing grace > > periods, it will hold a reference to the element that will serve > > for the next grace period. > > > > o Upon encountering a quiescent state, we acquire a reference to > > the element that will serve for the next grace period, and > > then release the reference to the element that we were holding. > > > > The rcuring_lock() function acquires a reference to the element > > that will serve for the next grace period. Not yet clear what > > the atomic_inc_not_zero() is for. If the reference count is zero, > > what makes it non-zero? > > rcuring_lock() may be called when current CPU did not hold any reference > of rcuring, so the rcuring may completes many GPs while rcuring_lock() > is requiring reference. but rcuring_lock() should not require the finish > complete GP's reference. so I use atomic_inc_not_zero(): If the reference count > is zero, it maybe finish complete GP's reference, we should try again. > > In rcuring_advance_lock(), we have no such problem, we can use rcuring_dup_lock() > actually. Got it, thank you! > > The rcuring_dup_lock() function acquires another reference to > > the element that the CPU already holds a reference to. This > > is used in preemptible RCU when a task is preempted in an > > RCU read-side critical section. > > > > The rcuring_unlock() function releases a reference. > > > > The rcuring_advance_lock() acquires a new reference, then > > releases the old one. This is used when a CPU passes through > > a quiescent state. > > > > o When a CPU enters dyntick-idle mode, it releases its reference. > > When it exits dyntick-idle mode (including irq and NMI), it > > acquires a referent to the element that will serve for the next > > grace period. > > Yes, the rcuring will totally ignore this cpu, if it did not hold > any referent. Good! > > o The __rcu_check_callbacks() function checks to see if the > > oldest grace period is now reference-free, and, if so, > > rcuring_advance_done_complete() retires the old grace periods. > > The advance_callbacks() then moves any affected callbacks to > > be invoked. > > > > o If RCU callbacks are not invoked quickly enough (10 jiffies), > > force_quiescent_state() sends IPIs to CPUs needing help > > reaching a quiescent state. > > > > > > So there were some things that struck me as being really good with > > your approach, most of which I intend to pull into the current RCU > > implementations: > > I will do this, also. Very good!!! The first and last (offsets and __rcu_barrier()/__synchronize_rcu()) seem like the most important in the short term. > > o Offsets into task_struct, allowing inlining of rcu_read_lock() > > and the fastpath of rcu_read_unlock(). > > > > o Faster grace periods, at least when the system is busy. > > > > o Placement of rcu_copy_process() and exit_rcu() into > > include/linux/ringrcu.h. Not clear that this maps nicely > > to tree/tiny implementations, though. > > > > o Defining __rcu_barrier() in terms of __call_rcu() instead of the > > various call_rcu*() variants. Ditto for __synchronize_rcu(). > > > > > > There are some things that I am unsure of, perhaps because I am still > > misunderstanting something: > > > > o The per-CPU reference-counting scheme can gain much of the > > benefit that TREE_RCU gains by handling large numbers of updates > > with a single grace period. However, CPUs that pass through > > quiescent states quickly, for example, by frequently interrupting > > out of dyntick-idle state, can incur lots of cache misses cycling > > through lots of RING RCU array elements. > > It is true. I think frequently interrupting is seldom happened when > dyntick-idle state. And this can be fixed easily. That is the intention for dyntick-idle state, but fast transitions can happen for some workloads. > > o Aggressive code shrinkage through large cpp macros. You seem to > > share my ambivalence given that you use it in only some of the > > situations where it might be applied. ;-) > > > > The savings of lines of code does vary -- many definitions can > > be placed under a single #ifdef, after all. > > I tried my best to abstract rcu, and to retry the common codes, > I think we can use it for rcutree. We should of course decide on a case-by-case basis. > > o Use of kref rather than a simple counter. Saves a line or two, > > adds a function. > > I want the save the comments, kref is always inited as 1. > code itself is comments. Fair point, so probably worth doing. > > o __rcu_check_callbacks() won't force a grace period if there > > are no callbacks waiting to be invoked. On the one hand, one > > can argue that forcing a grace period is useless in that case, > > but on the other hand, there is usually more than one CPU. > > This is probably a consequence of the force_quiescent_state() > > time being based on callback residence time rather than on > > grace-period duration -- only CPUs with callbacks can possibly > > measure a meaningful residence time. > > I think it's useless to FQS when there is no callbacks in current cpu. > FQS is not scalable because is require global lock. On systems with large numbers of CPUs that need real-time response, it will likely be necessary to make fqs be incremental, in which case it is good to have CPUs that have no callbacks help out. (I have gotten one request for incremental fqs, sent then the patch, and didn't hear anything since.) > > o NMI handling? Note that raw_local_irq_save() does not > > block NMIs, so need to analyze recursion into rcu_kernel_enter() > > and rcu_kernel_exit(). Looks like it might be OK, but... > > Ironically enough, courtesy of the atomic instructions and > > cache misses that are so scary in these functions. ;-) > > It's safe when NMI, I added barrier()s. > > +static void __rcu_kernel_enter_outmost(int cpu) > +{ > + GEN_CODES(LOCK_COMPLETE) > + > + barrier(); > + per_cpu(kernel_count, cpu) = 1; > + barrier(); > + > + GEN_CODES(SAVE_COMPLETE) > +} It does look plausible, but will need careful checking. > > o Eliminating rcu_pending() might be a good thing, though perhaps > > simplifying it (making it less exact) might be the right thing > > to do in TREE_RCU, particularly given that priority boosting > > turns RCU_SOFTIRQ into a kthread. > > > > > > Here are some things that could be problematic about Ring RCU. Some > > of them are fixable, others might or might not be: > > > > o Potentially high memory contention on a given ring: worst case > > is quite bad on large SMP systems. On the other hand, best > > case is really good, given that each CPU could get its own > > rcuring counter. Average case is likely to be modestly bad. > > The default value of 4 for RCURING_COUNTERS_SHIFT maps to 16 > > array elements, which could easily show worst case behavior from > > time to time on large systems. > > It may be true, only a global rcuring for a rcu_domain. > by it just do 6 atomic operations for a QS without any locks, will it reduce > the pain of memory contention? It does greatly reduce the best-case and (I think) average level of memory contention, but I am still concerned about the case where all CPUs have references to the next-to-be-used ring element, and then a grace period starts. All the CPUs will then need to do rcuring_unlock() on the same element. On a really large system, this could be a problem. > It is hard to fix this if it becomes a problem, but I believe > it's very economical only 6 atomic operations for a QS. > > (maybe the only way to fix: use hierarchy for RCURING) That might be one possibility. The big thing will be determining what RCURING is really good at compared to existing implementations in the kernel. > > o Real-time latency could be problematic due to: > > > > o Scans across CPUs and across grace-period ring elements. > > Scans across CPUs: only when FQS, but RCURING very very seldom do a FQS. > and we can fix it. call FQS in kthread or... etc. Or incrementally scan, one way or another. > across grace-period ring elements: we use small RCURING_IDX_MAX. True, though if you had a large number of CPUs, you would want to merge the GPs in that case -- otherwise, you have very long grace-period latencies. (Maybe you already do merge GPs, but if so, I missed it.) > > o Worst-case memory contention. > > > > o High interrupt rates out of dyntick-idle state. > > > > o Potentially large memory consumption due to RCURING_IDX_MAX > > arrays in rcu_data and rcuring structures. The default of > > 16 elements should be OK. > > > > o Use of atomic instructions in dyntick path. (Not just one, but > > potentially a long string of them -- and implied cache misses. > > Understand that the expected case is a single atomic in > > rcuring_lock() -- but worst case is at least somewhat important.) > > > > Benefit is that you don't need to scan the CPUs to check for > > dyntick-idle CPUs -- force_quiescent_state() is then limited to > > resched IPIs. Unless of course the dyntick-idle CPUs remain in > > dyntick-idle state throughout, in which case they get IPIed. This > > situation limits dyntick-idle state to 10 jiffies, which would > > result in the energy-efficiency guys screaming bloody murder. > > The battery-powered embedded guys would not bother screaming, > > but could instead be expected to come after us with pitchforks. > > I hate to call force_quiescent_state(), it is not scalable in RCURING/RCUTREE > > In RCURING/RCUTREE, A GP is about 3 jiffies in average, > so I use 10 jiffies for FQS interval, thus force_quiescent_state() > is called rarely. > > in force_quiescent_state(), I forgot to test the dest cpu's kernel_count, > It will be fixed. RCURING is Energy efficiency. Good! > > o Energy efficiency: see above paragraph. > > Good catch. > > > > > > > The remainder are shortcomings that should be easy to fix: > > > > o force_quiescent_state() for preemptible variant not implemented. > > (Waiting on RCU priority boosting.) > > > > o Constants in __rcu_check_callbacks() really need to be > > macroized. "10"? "2 * HZ"? > > > > o rcu_do_batch() needs to self-limit in multi-CPU configurations. > > Otherwise, latency is a problem. TINY_RCU gets away without > > this (for the time being, anyway), but TREE_RCU and CLASSIC_RCU > > before it need the blimit functionality. Otherwise the > > Linux Audio guys will scream. > > > > o Too much stuff directly called from scheduling-clock-interrupt > > context! Full scan of CPUs in __force_quiescent_state(), for > > example. Need to hand off to RCU_SOFTIRQ or a kthread. > > > > o print_rcuring_stall() does not dump stacks. > > > > So, what am I still missing about your approach? > > > > Thank you very much. I think the potentially high memory contention > cold be a hard problem, but will you plan to merge it. I do not know the answer to that yet. Clearly, it would need to handle the various corner cases we have discussed, and possibly others that we have not yet uncovered. It would also need to do something that the current RCU implementations can not be made to handle well. Otherwise, my approach is to merge implementations (e.g., SRCU into TREE_PREEMPT_RCU and TINY_PREEMPT_RCU) rather than adding new ones. THanx, Paul ^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [PATCH, RFC] RCU: New scalable, multi-GP and preemptible RCU implementation 2010-09-13 23:17 ` Paul E. McKenney @ 2010-09-15 9:43 ` Lai Jiangshan 2010-09-16 6:36 ` Paul E. McKenney 0 siblings, 1 reply; 8+ messages in thread From: Lai Jiangshan @ 2010-09-15 9:43 UTC (permalink / raw) To: paulmck Cc: Linus Torvalds, Ingo Molnar, Andrew Morton, David Miller, Manfred Spraul, Mathieu Desnoyers, Steven Rostedt, Thomas Gleixner, Peter Zijlstra, LKML, dipankar On 09/14/2010 07:17 AM, Paul E. McKenney wrote: >>>> >>>> Only in x86(32bit and 64bit) system. >>>> Did you patch it in power and test it? It' need more test for different archs, >>>> but I have no other arch. >>>> >>>> What is "hardware configuration"? >>> >>> Mainly the number of CPUs. The socket/core/thread counts could also >>> be helpful. > > Any info? ;-) > Ouch, answer this first: hardware1: x86, 1cpu X 2cores hardware2: x86_64, 2cpu X 2cores 48 hours rcutorture for each machine, (NO_HZ, cpu_plugin concurrently ... ) ~120 minutes rcutorture for each different CONFIG. this work is my own work, not my company's, I can't access to big machines. Lai. ^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [PATCH, RFC] RCU: New scalable, multi-GP and preemptible RCU implementation 2010-09-15 9:43 ` Lai Jiangshan @ 2010-09-16 6:36 ` Paul E. McKenney 0 siblings, 0 replies; 8+ messages in thread From: Paul E. McKenney @ 2010-09-16 6:36 UTC (permalink / raw) To: Lai Jiangshan Cc: Linus Torvalds, Ingo Molnar, Andrew Morton, David Miller, Manfred Spraul, Mathieu Desnoyers, Steven Rostedt, Thomas Gleixner, Peter Zijlstra, LKML, dipankar On Wed, Sep 15, 2010 at 05:43:12PM +0800, Lai Jiangshan wrote: > On 09/14/2010 07:17 AM, Paul E. McKenney wrote: > >>>> > >>>> Only in x86(32bit and 64bit) system. > >>>> Did you patch it in power and test it? It' need more test for different archs, > >>>> but I have no other arch. > >>>> > >>>> What is "hardware configuration"? > >>> > >>> Mainly the number of CPUs. The socket/core/thread counts could also > >>> be helpful. > > > > Any info? ;-) > > > > Ouch, answer this first: > hardware1: x86, 1cpu X 2cores > hardware2: x86_64, 2cpu X 2cores > 48 hours rcutorture for each machine, (NO_HZ, cpu_plugin concurrently ... ) > ~120 minutes rcutorture for each different CONFIG. > > this work is my own work, not my company's, I can't access to > big machines. Thank you for the info! I will do some testing some time in the next few weeks. Initial smoke tests look promising. Thanx, Paul ^ permalink raw reply [flat|nested] 8+ messages in thread
end of thread, other threads:[~2010-09-16 6:36 UTC | newest] Thread overview: 8+ messages (download: mbox.gz follow: Atom feed -- links below jump to the message on this page -- 2010-08-30 9:31 [PATCH, RFC] RCU: New scalable, multi-GP and preemptible RCU implementation Lai Jiangshan 2010-08-30 23:57 ` Paul E. McKenney 2010-08-31 3:03 ` Lai Jiangshan 2010-09-08 19:53 ` Paul E. McKenney 2010-09-13 12:27 ` Lai Jiangshan 2010-09-13 23:17 ` Paul E. McKenney 2010-09-15 9:43 ` Lai Jiangshan 2010-09-16 6:36 ` Paul E. McKenney
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox