From: Gerd Gerats <gerd.gerats.lkml@gmx.de>
To: tglx@linutronix.de, mingo@redhat.com, peterz@infradead.org,
dvhart@infradead.org
Cc: LKML <linux-kernel@vger.kernel.org>
Subject: [RFC] futex: hashbucket as list of futex instead of waiters
Date: Sun, 10 Sep 2017 23:41:58 +0200 [thread overview]
Message-ID: <20170910213324.GA24252@pluto.fritz.box> (raw)
When using futex as a condition variable, for example: to manage a
threadpool, there may be a lot of threads inside the futex_wait to sleep on
this futex. The futex_hash_bucket consists therefore of many struct futex_q
for the same futex.
On bad luck another futex, used as mutex, hashed into the same bucket.
Every futex_wake on this mutex, has to scan the whole chain of above waiter
to find the struct futex_q for this mutex. For non-unusual threadpool sizes
of more than 20, this should be a considerable effort.
I therefore suggest to include in the hash-bucketchain only one struct
futex_q per futex and to queue additional waiter in an extrachain at the
'top' futex_q entry. Thus different futex are isolated from each other, the
cost of a hash collision is reduced.
To show the idea, I added a sample patch. Here, the plist is exchanged for
a futex-specific implementation. kernel/pring.h is certainly not not the
right place.
Signed-off-by: Gerd Gerats <gerd.gerats@gmx.de>
---
kernel/futex.c | 159 +++++++++++++++++++++++++++++++++++++++------------------
kernel/pring.h | 110 +++++++++++++++++++++++++++++++++++++++
2 files changed, 219 insertions(+), 50 deletions(-)
create mode 100644 kernel/pring.h
diff --git a/kernel/futex.c b/kernel/futex.c
index 357348a6cf6b..5598c4b1ad96 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -72,6 +72,8 @@
#include "locking/rtmutex_common.h"
+#include "pring.h"
+
/*
* READ this before attempting to hack on futexes!
*
@@ -229,7 +231,7 @@ struct futex_pi_state {
* we can wake only the relevant ones (hashed queues may be shared).
*
* A futex_q has a woken state, just like tasks have TASK_RUNNING.
- * It is considered woken when plist_node_empty(&q->list) || q->lock_ptr == 0.
+ * It is considered woken when futexq_lists_empty(&q) || q->lock_ptr == 0.
* The order of wakeup is always to make the first condition true, then
* the second.
*
@@ -237,7 +239,8 @@ struct futex_pi_state {
* the rt_mutex code. See unqueue_me_pi().
*/
struct futex_q {
- struct plist_node list;
+ struct list_head list;
+ struct pring_node ring;
struct task_struct *task;
spinlock_t *lock_ptr;
@@ -262,7 +265,7 @@ static const struct futex_q futex_q_init = {
struct futex_hash_bucket {
atomic_t waiters;
spinlock_t lock;
- struct plist_head chain;
+ struct list_head chain;
} ____cacheline_aligned_in_smp;
/*
@@ -745,13 +748,78 @@ static struct futex_q *futex_top_waiter(struct futex_hash_bucket *hb,
{
struct futex_q *this;
- plist_for_each_entry(this, &hb->chain, list) {
+ list_for_each_entry(this, &hb->chain, list) {
if (match_futex(&this->key, key))
return this;
}
return NULL;
}
+/**
+ * futexq_init() - initialize futex_q chain fields
+ * @q The futex_q to initialize
+ * @prio: priority
+ */
+static void futexq_init(struct futex_q *q, int prio)
+{
+ INIT_LIST_HEAD(&q->list);
+ pring_init(&q->ring, prio);
+}
+
+/**
+ * futexq_add() - Enqueue the futex_q on the futex_hash_bucket
+ * @q: The futex_q to enqueue
+ * @hb: The destination hash bucket
+ */
+static void futexq_add(struct futex_q *q, struct futex_hash_bucket *hb)
+{
+ struct futex_q *top;
+
+ top = futex_top_waiter(hb, &q->key);
+ if (top) {
+ if (pring_add(&q->ring, &top->ring) == &q->ring)
+ list_replace_init(&top->list, &q->list);
+ } else {
+ list_add_tail(&q->list, &hb->chain);
+ }
+}
+
+/**
+ * futexq_del() - Remove the futex_q from its futex_hash_bucket
+ * @q: The futex_q to unqueue
+ * @hb: hash bucket to remove from
+ */
+static void futexq_del(struct futex_q *q, struct futex_hash_bucket *hb)
+{
+ struct futex_q *next;
+
+ if (pring_is_singular(&q->ring)) {
+ list_del_init(&q->list);
+ } else {
+ next = pring_next_entry(q, ring);
+ if (!list_empty(&q->list))
+ list_replace_init(&q->list, &next->list);
+ pring_del(&q->ring);
+ }
+}
+
+static inline bool futexq_lists_empty(struct futex_q *q)
+{
+ return list_empty(&q->list) && pring_is_singular(&q->ring);
+}
+
+static inline struct futex_q *futexq_next_or_null(struct futex_q *q)
+{
+ struct futex_q *next = pring_next_entry(q, ring);
+
+ return list_empty(&next->list) ? next : 0;
+}
+
+#define futex_for_each_match_safe(pos, n, hb, key) \
+ for (pos = futex_top_waiter(hb, key); \
+ pos && ({ n = futexq_next_or_null(pos); 1; }); \
+ pos = n)
+
static int cmpxchg_futex_value_locked(u32 *curval, u32 __user *uaddr,
u32 uval, u32 newval)
{
@@ -1352,11 +1420,11 @@ static void __unqueue_futex(struct futex_q *q)
struct futex_hash_bucket *hb;
if (WARN_ON_SMP(!q->lock_ptr || !spin_is_locked(q->lock_ptr))
- || WARN_ON(plist_node_empty(&q->list)))
+ || WARN_ON(futexq_lists_empty(q)))
return;
hb = container_of(q->lock_ptr, struct futex_hash_bucket, lock);
- plist_del(&q->list, &hb->chain);
+ futexq_del(q, hb);
hb_waiters_dec(hb);
}
@@ -1384,7 +1452,7 @@ static void mark_wake_futex(struct wake_q_head *wake_q, struct futex_q *q)
* is written, without taking any locks. This is possible in the event
* of a spurious wakeup, for example. A memory barrier is required here
* to prevent the following store to lock_ptr from getting ahead of the
- * plist_del in __unqueue_futex().
+ * futexq_del/pring_del in __unqueue_futex().
*/
smp_store_release(&q->lock_ptr, NULL);
}
@@ -1521,21 +1589,19 @@ futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
spin_lock(&hb->lock);
- plist_for_each_entry_safe(this, next, &hb->chain, list) {
- if (match_futex (&this->key, &key)) {
- if (this->pi_state || this->rt_waiter) {
- ret = -EINVAL;
- break;
- }
+ futex_for_each_match_safe(this, next, hb, &key) {
+ if (this->pi_state || this->rt_waiter) {
+ ret = -EINVAL;
+ break;
+ }
- /* Check if one of the bits is set in both bitsets */
- if (!(this->bitset & bitset))
- continue;
+ /* Check if one of the bits is set in both bitsets */
+ if (!(this->bitset & bitset))
+ continue;
- mark_wake_futex(&wake_q, this);
- if (++ret >= nr_wake)
- break;
- }
+ mark_wake_futex(&wake_q, this);
+ if (++ret >= nr_wake)
+ break;
}
spin_unlock(&hb->lock);
@@ -1604,30 +1670,26 @@ futex_wake_op(u32 __user *uaddr1, unsigned int flags, u32 __user *uaddr2,
goto retry;
}
- plist_for_each_entry_safe(this, next, &hb1->chain, list) {
- if (match_futex (&this->key, &key1)) {
- if (this->pi_state || this->rt_waiter) {
- ret = -EINVAL;
- goto out_unlock;
- }
- mark_wake_futex(&wake_q, this);
- if (++ret >= nr_wake)
- break;
+ futex_for_each_match_safe(this, next, hb1, &key1) {
+ if (this->pi_state || this->rt_waiter) {
+ ret = -EINVAL;
+ goto out_unlock;
}
+ mark_wake_futex(&wake_q, this);
+ if (++ret >= nr_wake)
+ break;
}
if (op_ret > 0) {
op_ret = 0;
- plist_for_each_entry_safe(this, next, &hb2->chain, list) {
- if (match_futex (&this->key, &key2)) {
- if (this->pi_state || this->rt_waiter) {
- ret = -EINVAL;
- goto out_unlock;
- }
- mark_wake_futex(&wake_q, this);
- if (++op_ret >= nr_wake2)
- break;
+ futex_for_each_match_safe(this, next, hb2, &key2) {
+ if (this->pi_state || this->rt_waiter) {
+ ret = -EINVAL;
+ goto out_unlock;
}
+ mark_wake_futex(&wake_q, this);
+ if (++op_ret >= nr_wake2)
+ break;
}
ret += op_ret;
}
@@ -1656,18 +1718,18 @@ void requeue_futex(struct futex_q *q, struct futex_hash_bucket *hb1,
{
/*
- * If key1 and key2 hash to the same bucket, no need to
+ * If key1 and key2 hash to the same bucket, we still need to
* requeue.
*/
+ futexq_del(q, hb1);
if (likely(&hb1->chain != &hb2->chain)) {
- plist_del(&q->list, &hb1->chain);
hb_waiters_dec(hb1);
hb_waiters_inc(hb2);
- plist_add(&q->list, &hb2->chain);
q->lock_ptr = &hb2->lock;
}
get_futex_key_refs(key2);
q->key = *key2;
+ futexq_add(q, hb2);
}
/**
@@ -1949,13 +2011,10 @@ static int futex_requeue(u32 __user *uaddr1, unsigned int flags,
}
}
- plist_for_each_entry_safe(this, next, &hb1->chain, list) {
+ futex_for_each_match_safe(this, next, hb1, &key1) {
if (task_count - nr_wake >= nr_requeue)
break;
- if (!match_futex(&this->key, &key1))
- continue;
-
/*
* FUTEX_WAIT_REQEUE_PI and FUTEX_CMP_REQUEUE_PI should always
* be paired with each other and no other futex ops.
@@ -2110,8 +2169,8 @@ static inline void __queue_me(struct futex_q *q, struct futex_hash_bucket *hb)
*/
prio = min(current->normal_prio, MAX_RT_PRIO);
- plist_node_init(&q->list, prio);
- plist_add(&q->list, &hb->chain);
+ futexq_init(q, prio);
+ futexq_add(q, hb);
q->task = current;
}
@@ -2396,7 +2455,7 @@ static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
* If we have been removed from the hash list, then another task
* has tried to wake us, and we can skip the call to schedule().
*/
- if (likely(!plist_node_empty(&q->list))) {
+ if (likely(!futexq_lists_empty(q))) {
/*
* If the timer has already expired, current will already be
* flagged for rescheduling. Only call schedule if there
@@ -2918,7 +2977,7 @@ int handle_early_requeue_pi_wakeup(struct futex_hash_bucket *hb,
* We were woken prior to requeue by a timeout or a signal.
* Unqueue the futex_q and determine which it was.
*/
- plist_del(&q->list, &hb->chain);
+ futexq_del(q, hb);
hb_waiters_dec(hb);
/* Handle spurious wakeups gracefully */
@@ -3495,7 +3554,7 @@ static int __init futex_init(void)
for (i = 0; i < futex_hashsize; i++) {
atomic_set(&futex_queues[i].waiters, 0);
- plist_head_init(&futex_queues[i].chain);
+ INIT_LIST_HEAD(&futex_queues[i].chain);
spin_lock_init(&futex_queues[i].lock);
}
diff --git a/kernel/pring.h b/kernel/pring.h
new file mode 100644
index 000000000000..5821ee4797e6
--- /dev/null
+++ b/kernel/pring.h
@@ -0,0 +1,110 @@
+#ifndef _LINUX_PRING_H_
+#define _LINUX_PRING_H_
+
+/*
+ * futex specific priority-sorted ring
+ *
+ * based on include/linux/plist.h
+ *
+ * Simple ASCII art explanation:
+ *
+ * fl:futx_list
+ * pl:prio_list
+ * nl:node_list
+ *
+ * +------+
+ * | v
+ * | |fl| HEAD
+ * | ^
+ * | |
+ * | v
+ * | +--------+
+ * | +->|pl|<-+
+ * | |10| (prio)
+ * | | |
+ * | +->|nl|<-+
+ * | +--------+
+ * | ^
+ * | |
+ * | v
+ * | +------------------------------------+
+ * | +->|pl|<->|pl|<--------------->|pl|<-+
+ * | |10| |21| |21| |21| |40| (prio)
+ * | | | | | | | | | | |
+ * | +->|nl|<->|nl|<->|nl|<->|nl|<->|nl|<-+
+ * | +------------------------------------+
+ * | ^
+ * +------+
+ */
+
+#include <linux/kernel.h>
+#include <linux/list.h>
+
+struct pring_node {
+ int prio;
+ struct list_head prio_list;
+ struct list_head node_list;
+};
+
+static void pring_init(struct pring_node *node, int prio)
+{
+ node->prio = prio;
+ INIT_LIST_HEAD(&node->prio_list);
+ INIT_LIST_HEAD(&node->node_list);
+}
+
+static inline bool pring_is_singular(struct pring_node *node)
+{
+ return list_empty(&node->node_list);
+}
+
+static void pring_del(struct pring_node *node)
+{
+ if (WARN_ON(list_empty(&node->node_list)))
+ return;
+ if (!list_empty(&node->prio_list)) {
+ struct pring_node *next;
+
+ next = list_next_entry(node, node_list);
+ /* add the next node into prio_list */
+ if (list_empty(&next->prio_list))
+ list_add(&next->prio_list, &node->prio_list);
+ list_del_init(&node->prio_list);
+ }
+ list_del_init(&node->node_list);
+}
+
+static struct pring_node *
+pring_add(struct pring_node *node, struct pring_node *top)
+{
+ struct pring_node *iter, *prev = NULL;
+ struct list_head *node_next = &top->node_list;
+
+ WARN_ON(!list_empty(&node->node_list));
+ WARN_ON(!list_empty(&node->prio_list));
+
+ iter = top;
+
+ do {
+ if (node->prio < iter->prio) {
+ node_next = &iter->node_list;
+ break;
+ }
+
+ prev = iter;
+ iter = list_entry(iter->prio_list.next,
+ struct pring_node, prio_list);
+ } while (iter != top);
+
+ if (!prev || prev->prio != node->prio)
+ list_add_tail(&node->prio_list, &iter->prio_list);
+ list_add_tail(&node->node_list, node_next);
+
+ return prev ? top : node;
+}
+
+#define pring_next_entry(pos, member) \
+ container_of(list_next_entry(&(pos)->member, node_list), \
+ typeof(*(pos)), member)
+
+#endif
--
2.14.1
next reply other threads:[~2017-09-10 21:42 UTC|newest]
Thread overview: 5+ messages / expand[flat|nested] mbox.gz Atom feed top
2017-09-10 21:41 Gerd Gerats [this message]
2017-09-28 10:58 ` [RFC] futex: hashbucket as list of futex instead of waiters Peter Zijlstra
2017-09-28 12:13 ` Thomas Gleixner
2017-09-28 22:39 ` Gerd Gerats
2017-09-29 8:36 ` Peter Zijlstra
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20170910213324.GA24252@pluto.fritz.box \
--to=gerd.gerats.lkml@gmx.de \
--cc=dvhart@infradead.org \
--cc=linux-kernel@vger.kernel.org \
--cc=mingo@redhat.com \
--cc=peterz@infradead.org \
--cc=tglx@linutronix.de \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.