From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1753412AbbFZCR1 (ORCPT ); Thu, 25 Jun 2015 22:17:27 -0400 Received: from mx1.redhat.com ([209.132.183.28]:39625 "EHLO mx1.redhat.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1752373AbbFZCQp (ORCPT ); Thu, 25 Jun 2015 22:16:45 -0400 Date: Fri, 26 Jun 2015 04:15:26 +0200 From: Oleg Nesterov To: Peter Zijlstra , paulmck@linux.vnet.ibm.com, tj@kernel.org, mingo@redhat.com, der.herr@hofr.at, dave@stgolabs.net, riel@redhat.com, viro@ZenIV.linux.org.uk, torvalds@linux-foundation.org Cc: linux-kernel@vger.kernel.org Subject: [RFC PATCH 6/6] stop_machine: optimize stop_work_alloc() Message-ID: <20150626021526.GA5717@redhat.com> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <20150626021455.GA5675@redhat.com> User-Agent: Mutt/1.5.18 (2008-05-17) Sender: linux-kernel-owner@vger.kernel.org List-ID: X-Mailing-List: linux-kernel@vger.kernel.org wait_event()/wake_up_all() in stop_work_alloc/stop_work_free logic is very suboptimal because of non-exclusive wakeups. So we add the wait_queue_func_t alloc_wake() helper which wakes the waiter up only a) if it actually waits for a stop_work in the "freed" cpumask, and b) only after we already set ->stop_owner = waiter. So if 2 stop_machine()'s race with each other, the loser will likely call schedule() only once and we will have a single wakeup. TODO: we can optimize (and simplify!) this code more. We can remove stop_work_alloc_one() and fold it into stop_work_alloc(), so that prepare_to_wait() will be the outer loop. Lets do this later. TODO: the init_waitqueue_func_entry() code in stop_work_alloc_one() is really annoying, we need the trivial new __init_wait(wait, func) helper. Signed-off-by: Oleg Nesterov --- kernel/stop_machine.c | 47 +++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 43 insertions(+), 4 deletions(-) diff --git a/kernel/stop_machine.c b/kernel/stop_machine.c index 572abc9..bbfc670 100644 --- a/kernel/stop_machine.c +++ b/kernel/stop_machine.c @@ -63,21 +63,60 @@ static void stop_work_free(const struct cpumask *cpumask) for_each_cpu(cpu, cpumask) stop_work_free_one(cpu); - wake_up_all(&stop_work_wq); + __wake_up(&stop_work_wq, TASK_ALL, 0, (void *)cpumask); +} + +struct alloc_wait { + wait_queue_t wait; + int cpu; +}; + +static int alloc_wake(wait_queue_t *wait, unsigned int mode, int sync, void *key) +{ + struct alloc_wait *aw = container_of(wait, struct alloc_wait, wait); + struct cpu_stopper *stopper = &per_cpu(cpu_stopper, aw->cpu); + const struct cpumask *cpumask = key; + + if (!cpumask_test_cpu(aw->cpu, cpumask)) + return 0; + if (cmpxchg(&stopper->stop_owner, NULL, aw->wait.private) != NULL) + return 0; + + return autoremove_wake_function(wait, mode, sync, key); } static struct cpu_stop_work *stop_work_alloc_one(int cpu, bool wait) { struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu); + struct task_struct *me = current; + struct alloc_wait aw; - if (cmpxchg(&stopper->stop_owner, NULL, current) == NULL) + if (cmpxchg(&stopper->stop_owner, NULL, me) == NULL) goto done; if (!wait) return NULL; - __wait_event(stop_work_wq, - cmpxchg(&stopper->stop_owner, NULL, current) == NULL); + /* TODO: add __init_wait(wait, func) helper! */ + INIT_LIST_HEAD(&aw.wait.task_list); + init_waitqueue_func_entry(&aw.wait, alloc_wake); + aw.wait.private = me; + aw.cpu = cpu; + for (;;) { + prepare_to_wait(&stop_work_wq, &aw.wait, TASK_UNINTERRUPTIBLE); + /* + * This can "falsely" fail if we race with alloc_wake() and + * stopper->stop_owner is already me, in this case schedule() + * won't block and the check below will notice this change. + */ + if (cmpxchg(&stopper->stop_owner, NULL, me) == NULL) + break; + + schedule(); + if (likely(stopper->stop_owner == me)) + break; + } + finish_wait(&stop_work_wq, &aw.wait); done: return &stopper->stop_work; } -- 1.5.5.1