From: Oleg Nesterov <oleg@redhat.com>
To: Peter Zijlstra <peterz@infradead.org>, Tejun Heo <tj@kernel.org>
Cc: paulmck@linux.vnet.ibm.com, mingo@redhat.com, der.herr@hofr.at,
dave@stgolabs.net, riel@redhat.com, viro@ZenIV.linux.org.uk,
torvalds@linux-foundation.org, linux-kernel@vger.kernel.org
Subject: [PATCH 6/5] stop_machine: kill stop_cpus_lock and lg_double_lock/unlock()
Date: Wed, 1 Jul 2015 21:23:41 +0200 [thread overview]
Message-ID: <20150701192341.GB1476@redhat.com> (raw)
In-Reply-To: <20150630012931.GA23904@redhat.com>
stop_two_cpus() and stop_cpus() use stop_cpus_lock to avoid a
deadlock: we need to ensure that the stopper functions can't be
queued "backwards" from one another.
Instead, we can change stop_two_cpus() to take both stopper->lock's
and queue both works "atomically"; we just need to check that the
->stop_work's on these two CPUs are either both free or both already
queued.
Note: this patch preserves the cpu_active() checks, but I think
we need to shift them into migrate_swap_stop(). However, we can't
do this without another cleanup: currently stopper->enabled does
not guarantee that work->fn() will actually be executed if we race
with cpu_down().
Signed-off-by: Oleg Nesterov <oleg@redhat.com>
---
include/linux/lglock.h | 5 --
kernel/locking/lglock.c | 22 ---------
kernel/stop_machine.c | 120 ++++++++++++++++++++++++++++-------------------
3 files changed, 71 insertions(+), 76 deletions(-)
diff --git a/include/linux/lglock.h b/include/linux/lglock.h
index c92ebd1..0081f00 100644
--- a/include/linux/lglock.h
+++ b/include/linux/lglock.h
@@ -52,15 +52,10 @@ struct lglock {
static struct lglock name = { .lock = &name ## _lock }
void lg_lock_init(struct lglock *lg, char *name);
-
void lg_local_lock(struct lglock *lg);
void lg_local_unlock(struct lglock *lg);
void lg_local_lock_cpu(struct lglock *lg, int cpu);
void lg_local_unlock_cpu(struct lglock *lg, int cpu);
-
-void lg_double_lock(struct lglock *lg, int cpu1, int cpu2);
-void lg_double_unlock(struct lglock *lg, int cpu1, int cpu2);
-
void lg_global_lock(struct lglock *lg);
void lg_global_unlock(struct lglock *lg);
diff --git a/kernel/locking/lglock.c b/kernel/locking/lglock.c
index 951cfcd..86ae2ae 100644
--- a/kernel/locking/lglock.c
+++ b/kernel/locking/lglock.c
@@ -60,28 +60,6 @@ void lg_local_unlock_cpu(struct lglock *lg, int cpu)
}
EXPORT_SYMBOL(lg_local_unlock_cpu);
-void lg_double_lock(struct lglock *lg, int cpu1, int cpu2)
-{
- BUG_ON(cpu1 == cpu2);
-
- /* lock in cpu order, just like lg_global_lock */
- if (cpu2 < cpu1)
- swap(cpu1, cpu2);
-
- preempt_disable();
- lock_acquire_shared(&lg->lock_dep_map, 0, 0, NULL, _RET_IP_);
- arch_spin_lock(per_cpu_ptr(lg->lock, cpu1));
- arch_spin_lock(per_cpu_ptr(lg->lock, cpu2));
-}
-
-void lg_double_unlock(struct lglock *lg, int cpu1, int cpu2)
-{
- lock_release(&lg->lock_dep_map, 1, _RET_IP_);
- arch_spin_unlock(per_cpu_ptr(lg->lock, cpu1));
- arch_spin_unlock(per_cpu_ptr(lg->lock, cpu2));
- preempt_enable();
-}
-
void lg_global_lock(struct lglock *lg)
{
int i;
diff --git a/kernel/stop_machine.c b/kernel/stop_machine.c
index 12484e5..20fb291 100644
--- a/kernel/stop_machine.c
+++ b/kernel/stop_machine.c
@@ -20,7 +20,6 @@
#include <linux/kallsyms.h>
#include <linux/smpboot.h>
#include <linux/atomic.h>
-#include <linux/lglock.h>
/*
* Structure to determine completion condition and record errors. May
@@ -47,14 +46,6 @@ struct cpu_stopper {
static DEFINE_PER_CPU(struct cpu_stopper, cpu_stopper);
static bool stop_machine_initialized = false;
-/*
- * Avoids a race between stop_two_cpus and global stop_cpus, where
- * the stoppers could get queued up in reverse order, leading to
- * system deadlock. Using an lglock means stop_two_cpus remains
- * relatively cheap.
- */
-DEFINE_STATIC_LGLOCK(stop_cpus_lock);
-
static void cpu_stop_init_done(struct cpu_stop_done *done, unsigned int nr_todo)
{
memset(done, 0, sizeof(*done));
@@ -73,21 +64,29 @@ static void cpu_stop_signal_done(struct cpu_stop_done *done, bool executed)
}
}
+static inline bool stop_work_pending(struct cpu_stopper *stopper)
+{
+ return !list_empty(&stopper->stop_work.list);
+}
+
+static void __cpu_stop_queue_work(struct cpu_stopper *stopper,
+ struct cpu_stop_work *work)
+{
+ list_add_tail(&work->list, &stopper->works);
+ wake_up_process(stopper->thread);
+}
+
/* queue @work to @stopper. if offline, @work is completed immediately */
static void cpu_stop_queue_work(unsigned int cpu, struct cpu_stop_work *work)
{
struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);
-
unsigned long flags;
spin_lock_irqsave(&stopper->lock, flags);
-
- if (stopper->enabled) {
- list_add_tail(&work->list, &stopper->works);
- wake_up_process(stopper->thread);
- } else
+ if (stopper->enabled)
+ __cpu_stop_queue_work(stopper, work);
+ else
cpu_stop_signal_done(work->done, false);
-
spin_unlock_irqrestore(&stopper->lock, flags);
}
@@ -213,6 +212,48 @@ static int multi_cpu_stop(void *data)
return err;
}
+static int cpu_stop_queue_two_works(int cpu1, struct cpu_stop_work *work1,
+ int cpu2, struct cpu_stop_work *work2)
+{
+ struct cpu_stopper *stopper1 = per_cpu_ptr(&cpu_stopper, cpu1);
+ struct cpu_stopper *stopper2 = per_cpu_ptr(&cpu_stopper, cpu2);
+ int err;
+retry:
+ spin_lock_irq(&stopper1->lock);
+ spin_lock_nested(&stopper2->lock, SINGLE_DEPTH_NESTING);
+ /*
+ * If we observe both CPUs active we know _cpu_down() cannot yet have
+ * queued its stop_machine works and therefore ours will get executed
+ * first. Or its not either one of our CPUs that's getting unplugged,
+ * in which case we don't care.
+ */
+ err = -ENOENT;
+ if (!cpu_active(cpu1) || !cpu_active(cpu2))
+ goto unlock;
+
+ WARN_ON(!stopper1->enabled || !stopper2->enabled);
+ /*
+ * Ensure that if we race with stop_cpus() the stoppers won't
+ * get queued up in reverse order, leading to system deadlock.
+ */
+ err = -EDEADLK;
+ if (stop_work_pending(stopper1) != stop_work_pending(stopper2))
+ goto unlock;
+
+ err = 0;
+ __cpu_stop_queue_work(stopper1, work1);
+ __cpu_stop_queue_work(stopper2, work2);
+unlock:
+ spin_unlock(&stopper2->lock);
+ spin_unlock_irq(&stopper1->lock);
+
+ if (unlikely(err == -EDEADLK)) {
+ cond_resched();
+ goto retry;
+ }
+ return err;
+}
+
/**
* stop_two_cpus - stops two cpus
* @cpu1: the cpu to stop
@@ -228,48 +269,28 @@ int stop_two_cpus(unsigned int cpu1, unsigned int cpu2, cpu_stop_fn_t fn, void *
{
struct cpu_stop_done done;
struct cpu_stop_work work1, work2;
- struct multi_stop_data msdata;
-
- preempt_disable();
- msdata = (struct multi_stop_data){
+ struct multi_stop_data msdata = {
.fn = fn,
.data = arg,
.num_threads = 2,
.active_cpus = cpumask_of(cpu1),
};
- work1 = work2 = (struct cpu_stop_work){
- .fn = multi_cpu_stop,
- .arg = &msdata,
- .done = &done
- };
-
- cpu_stop_init_done(&done, 2);
set_state(&msdata, MULTI_STOP_PREPARE);
+ cpu_stop_init_done(&done, 2);
- /*
- * If we observe both CPUs active we know _cpu_down() cannot yet have
- * queued its stop_machine works and therefore ours will get executed
- * first. Or its not either one of our CPUs that's getting unplugged,
- * in which case we don't care.
- *
- * This relies on the stopper workqueues to be FIFO.
- */
- if (!cpu_active(cpu1) || !cpu_active(cpu2)) {
- preempt_enable();
- return -ENOENT;
- }
-
- lg_double_lock(&stop_cpus_lock, cpu1, cpu2);
- cpu_stop_queue_work(cpu1, &work1);
- cpu_stop_queue_work(cpu2, &work2);
- lg_double_unlock(&stop_cpus_lock, cpu1, cpu2);
+ work1.fn = work2.fn = multi_cpu_stop;
+ work1.arg = work2.arg = &msdata;
+ work1.done = work2.done = &done;
- preempt_enable();
+ if (cpu1 > cpu2)
+ swap(cpu1, cpu2);
+ if (cpu_stop_queue_two_works(cpu1, &work1, cpu2, &work2))
+ return -ENOENT;
wait_for_completion(&done.completion);
-
- return done.executed ? done.ret : -ENOENT;
+ WARN_ON(!done.executed);
+ return done.ret;
}
/**
@@ -308,7 +329,7 @@ static void queue_stop_cpus_work(const struct cpumask *cpumask,
* preempted by a stopper which might wait for other stoppers
* to enter @fn which can lead to deadlock.
*/
- lg_global_lock(&stop_cpus_lock);
+ preempt_disable();
for_each_cpu(cpu, cpumask) {
work = &per_cpu(cpu_stopper.stop_work, cpu);
work->fn = fn;
@@ -316,7 +337,7 @@ static void queue_stop_cpus_work(const struct cpumask *cpumask,
work->done = done;
cpu_stop_queue_work(cpu, work);
}
- lg_global_unlock(&stop_cpus_lock);
+ preempt_enable();
}
static int __stop_cpus(const struct cpumask *cpumask,
@@ -505,6 +526,7 @@ static int __init cpu_stop_init(void)
spin_lock_init(&stopper->lock);
INIT_LIST_HEAD(&stopper->works);
+ INIT_LIST_HEAD(&stopper->stop_work.list);
}
BUG_ON(smpboot_register_percpu_thread(&cpu_stop_threads));
--
1.5.5.1
prev parent reply other threads:[~2015-07-01 19:25 UTC|newest]
Thread overview: 15+ messages / expand[flat|nested] mbox.gz Atom feed top
2015-06-30 1:29 [PATCH 0/5] stop_machine: cleanups and fix Oleg Nesterov
2015-06-30 1:29 ` [PATCH 1/5] stop_machine: move cpu_stopper_task and stop_cpus_work into struct cpu_stopper Oleg Nesterov
2015-08-03 17:08 ` [tip:sched/core] stop_machine: Move 'cpu_stopper_task' and ' stop_cpus_work' into 'struct cpu_stopper' tip-bot for Oleg Nesterov
2015-06-30 1:29 ` [PATCH 2/5] stop_machine: don't do for_each_cpu() twice in queue_stop_cpus_work() Oleg Nesterov
2015-08-03 17:09 ` [tip:sched/core] stop_machine: Don't " tip-bot for Oleg Nesterov
2015-06-30 1:29 ` [PATCH 3/5] stop_machine: unexport __stop_machine() Oleg Nesterov
2015-08-03 17:09 ` [tip:sched/core] stop_machine: Unexport __stop_machine() tip-bot for Oleg Nesterov
2015-06-30 1:29 ` [PATCH 4/5] stop_machine: use cpu_stop_fn_t where possible Oleg Nesterov
2015-06-30 14:28 ` Peter Zijlstra
2015-08-03 17:09 ` [tip:sched/core] stop_machine: Use 'cpu_stop_fn_t' " tip-bot for Oleg Nesterov
2015-06-30 1:29 ` [PATCH 5/5] stop_machine: cpu_stop_park() should remove cpu_stop_work's from list Oleg Nesterov
2015-08-03 17:10 ` [tip:sched/core] stop_machine: Remove cpu_stop_work' s from list in cpu_stop_park() tip-bot for Oleg Nesterov
2015-06-30 12:52 ` [PATCH 0/5] stop_machine: cleanups and fix Oleg Nesterov
2015-07-01 19:22 ` Oleg Nesterov
2015-07-01 19:23 ` Oleg Nesterov [this message]
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20150701192341.GB1476@redhat.com \
--to=oleg@redhat.com \
--cc=dave@stgolabs.net \
--cc=der.herr@hofr.at \
--cc=linux-kernel@vger.kernel.org \
--cc=mingo@redhat.com \
--cc=paulmck@linux.vnet.ibm.com \
--cc=peterz@infradead.org \
--cc=riel@redhat.com \
--cc=tj@kernel.org \
--cc=torvalds@linux-foundation.org \
--cc=viro@ZenIV.linux.org.uk \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.