public inbox for linux-kernel@vger.kernel.org
 help / color / mirror / Atom feed
From: Bruno Santos <bsantos@av.it.pt>
To: linux-kernel@vger.kernel.org
Cc: bsantos@av.it.pt
Subject: Re: semaphore: lockless fastpath using atomic_{inc,dec}_return
Date: Wed, 09 Jul 2008 14:13:29 +0100	[thread overview]
Message-ID: <4874B979.4020608@av.it.pt> (raw)

[-- Attachment #1: Type: text/plain, Size: 7671 bytes --]

So far, the machine I'm testing this on (Core2 Duo) has been up for almost 24H.

It seems the patch got screwed by the mailer, so I'm posting it again.


 From 343d08a5d172d103e49c77e5580a45f02fab2b5e Mon Sep 17 00:00:00 2001
From: Bruno Santos <bsantos@av.it.pt>
Date: Tue, 8 Jul 2008 23:40:53 +0100
Subject: [PATCH] semaphore lockless fastpath


Signed-off-by: Bruno Santos <bsantos@av.it.pt>
---
 include/linux/semaphore.h |    4 +-
 kernel/semaphore.c        |  131 +++++++++++++++++++++++++--------------------
 2 files changed, 75 insertions(+), 60 deletions(-)

diff --git a/include/linux/semaphore.h b/include/linux/semaphore.h
index 9cae64b..d7850f0 100644
--- a/include/linux/semaphore.h
+++ b/include/linux/semaphore.h
@@ -14,15 +14,15 @@
 
 /* Please don't access any members of this structure directly */
 struct semaphore {
+    atomic_t        count;
     spinlock_t        lock;
-    unsigned int        count;
     struct list_head    wait_list;
 };
 
 #define __SEMAPHORE_INITIALIZER(name, n)                \
 {                                    \
+    .count        = ATOMIC_INIT(n),                \
     .lock        = __SPIN_LOCK_UNLOCKED((name).lock),        \
-    .count        = n,                        \
     .wait_list    = LIST_HEAD_INIT((name).wait_list),        \
 }
 
diff --git a/kernel/semaphore.c b/kernel/semaphore.c
index 5c2942e..2815980 100644
--- a/kernel/semaphore.c
+++ b/kernel/semaphore.c
@@ -8,6 +8,8 @@
  * A counting semaphore may be acquired 'n' times before sleeping.
  * See mutex.c for single-acquisition sleeping locks which enforce
  * rules which allow code to be debugged more easily.
+ *
+ * Lockless fastpath by Bruno Santos <bsantos@av.it.pt>
  */
 
 /*
@@ -22,7 +24,7 @@
  * too.
  *
  * The ->count variable represents how many more tasks can acquire this
- * semaphore.  If it's zero, there may be tasks waiting on the wait_list.
+ * semaphore.  If it's negative, there may be tasks waiting on the wait_list.
  */
 
 #include <linux/compiler.h>
@@ -51,14 +53,10 @@ static noinline void __up(struct semaphore *sem);
  */
 void down(struct semaphore *sem)
 {
-    unsigned long flags;
+    if (likely(atomic_dec_return(&sem->count) >= 0))
+        return;
 
-    spin_lock_irqsave(&sem->lock, flags);
-    if (likely(sem->count > 0))
-        sem->count--;
-    else
-        __down(sem);
-    spin_unlock_irqrestore(&sem->lock, flags);
+    __down(sem);
 }
 EXPORT_SYMBOL(down);
 
@@ -73,17 +71,10 @@ EXPORT_SYMBOL(down);
  */
 int down_interruptible(struct semaphore *sem)
 {
-    unsigned long flags;
-    int result = 0;
+    if (likely(atomic_dec_return(&sem->count) >= 0))
+        return 0;
 
-    spin_lock_irqsave(&sem->lock, flags);
-    if (likely(sem->count > 0))
-        sem->count--;
-    else
-        result = __down_interruptible(sem);
-    spin_unlock_irqrestore(&sem->lock, flags);
-
-    return result;
+    return __down_interruptible(sem);
 }
 EXPORT_SYMBOL(down_interruptible);
 
@@ -99,17 +90,10 @@ EXPORT_SYMBOL(down_interruptible);
  */
 int down_killable(struct semaphore *sem)
 {
-    unsigned long flags;
-    int result = 0;
+    if (likely(atomic_dec_return(&sem->count) >= 0))
+        return 0;
 
-    spin_lock_irqsave(&sem->lock, flags);
-    if (likely(sem->count > 0))
-        sem->count--;
-    else
-        result = __down_killable(sem);
-    spin_unlock_irqrestore(&sem->lock, flags);
-
-    return result;
+    return __down_killable(sem);
 }
 EXPORT_SYMBOL(down_killable);
 
@@ -128,16 +112,21 @@ EXPORT_SYMBOL(down_killable);
  */
 int down_trylock(struct semaphore *sem)
 {
-    unsigned long flags;
-    int count;
+    int old, cmp;
 
-    spin_lock_irqsave(&sem->lock, flags);
-    count = sem->count - 1;
-    if (likely(count >= 0))
-        sem->count = count;
-    spin_unlock_irqrestore(&sem->lock, flags);
+    /*
+     * The down_trylock fastpath is not very optimal compared to the
+     * down and up fastpaths, but it should be used less frequently.
+     */
+    old = atomic_read(&sem->count);
+    while (old > 0) {
+        cmp = old;
+        old = atomic_cmpxchg(&sem->count, cmp, old - 1);
+        if (old == cmp)
+            return 0;
+    }
 
-    return (count < 0);
+    return 1;
 }
 EXPORT_SYMBOL(down_trylock);
 
@@ -153,17 +142,10 @@ EXPORT_SYMBOL(down_trylock);
  */
 int down_timeout(struct semaphore *sem, long jiffies)
 {
-    unsigned long flags;
-    int result = 0;
-
-    spin_lock_irqsave(&sem->lock, flags);
-    if (likely(sem->count > 0))
-        sem->count--;
-    else
-        result = __down_timeout(sem, jiffies);
-    spin_unlock_irqrestore(&sem->lock, flags);
+    if (likely(atomic_dec_return(&sem->count) >= 0))
+        return 0;
 
-    return result;
+    return __down_timeout(sem, jiffies);
 }
 EXPORT_SYMBOL(down_timeout);
 
@@ -176,14 +158,10 @@ EXPORT_SYMBOL(down_timeout);
  */
 void up(struct semaphore *sem)
 {
-    unsigned long flags;
+    if (likely(atomic_inc_return(&sem->count) > 0))
+        return;
 
-    spin_lock_irqsave(&sem->lock, flags);
-    if (likely(list_empty(&sem->wait_list)))
-        sem->count++;
-    else
-        __up(sem);
-    spin_unlock_irqrestore(&sem->lock, flags);
+    __up(sem);
 }
 EXPORT_SYMBOL(up);
 
@@ -205,6 +183,15 @@ static inline int __sched __down_common(struct semaphore *sem, long state,
 {
     struct task_struct *task = current;
     struct semaphore_waiter waiter;
+    unsigned long flags;
+
+    spin_lock_irqsave(&sem->lock, flags);
+    /*
+     * Someone may have incremented the count and failed to wake
+     * us before we acquired the spinlock.
+     */
+    if (atomic_dec_return(&sem->count) >= 0)
+        goto done;
 
     list_add_tail(&waiter.list, &sem->wait_list);
     waiter.task = task;
@@ -222,16 +209,22 @@ static inline int __sched __down_common(struct semaphore *sem, long state,
         timeout = schedule_timeout(timeout);
         spin_lock_irq(&sem->lock);
         if (waiter.up)
-            return 0;
+            goto done;
     }
 
  timed_out:
     list_del(&waiter.list);
+    spin_unlock_irqrestore(&sem->lock, flags);
     return -ETIME;
 
  interrupted:
     list_del(&waiter.list);
+    spin_unlock_irqrestore(&sem->lock, flags);
     return -EINTR;
+
+ done:
+    spin_unlock_irqrestore(&sem->lock, flags);
+    return 0;
 }
 
 static noinline void __sched __down(struct semaphore *sem)
@@ -256,9 +249,31 @@ static noinline int __sched __down_timeout(struct semaphore *sem, long jiffies)
 
 static noinline void __sched __up(struct semaphore *sem)
 {
-    struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
+    struct semaphore_waiter *waiter;
+    unsigned long flags;
+    int old, cmp;
+
+    spin_lock_irqsave(&sem->lock, flags);
+    /*
+     * If the wait list is empty, it means either we acquired the spinlock
+     * faster than a possible waiter or the possible waiter abandoned the
+     * wait because it got interrupted or timed out. In such case we have
+     * to increment the count to a value that is greater than or equal to 1.
+     */
+    if (list_empty(&sem->wait_list)) {
+        old = atomic_read(&sem->count);
+        do {
+            cmp = old;
+            old = (old > 0) ? old + 1 : 1;
+            old = atomic_cmpxchg(&sem->count, cmp, old);
+        } while (cmp != old);
+
+    } else {
+        waiter = list_first_entry(&sem->wait_list,
                         struct semaphore_waiter, list);
-    list_del(&waiter->list);
-    waiter->up = 1;
-    wake_up_process(waiter->task);
+        list_del(&waiter->list);
+        waiter->up = 1;
+        wake_up_process(waiter->task);
+    }
+    spin_unlock_irqrestore(&sem->lock, flags);
 }
-- 
1.5.2.5


[-- Attachment #2: semaphore-lockless-fastpath.patch --]
[-- Type: text/plain, Size: 6831 bytes --]

From 343d08a5d172d103e49c77e5580a45f02fab2b5e Mon Sep 17 00:00:00 2001
From: Bruno Santos <bsantos@av.it.pt>
Date: Tue, 8 Jul 2008 23:40:53 +0100
Subject: [PATCH] semaphore lockless fastpath


Signed-off-by: Bruno Santos <bsantos@av.it.pt>
---
 include/linux/semaphore.h |    4 +-
 kernel/semaphore.c        |  131 +++++++++++++++++++++++++--------------------
 2 files changed, 75 insertions(+), 60 deletions(-)

diff --git a/include/linux/semaphore.h b/include/linux/semaphore.h
index 9cae64b..d7850f0 100644
--- a/include/linux/semaphore.h
+++ b/include/linux/semaphore.h
@@ -14,15 +14,15 @@
 
 /* Please don't access any members of this structure directly */
 struct semaphore {
+	atomic_t		count;
 	spinlock_t		lock;
-	unsigned int		count;
 	struct list_head	wait_list;
 };
 
 #define __SEMAPHORE_INITIALIZER(name, n)				\
 {									\
+	.count		= ATOMIC_INIT(n),				\
 	.lock		= __SPIN_LOCK_UNLOCKED((name).lock),		\
-	.count		= n,						\
 	.wait_list	= LIST_HEAD_INIT((name).wait_list),		\
 }
 
diff --git a/kernel/semaphore.c b/kernel/semaphore.c
index 5c2942e..2815980 100644
--- a/kernel/semaphore.c
+++ b/kernel/semaphore.c
@@ -8,6 +8,8 @@
  * A counting semaphore may be acquired 'n' times before sleeping.
  * See mutex.c for single-acquisition sleeping locks which enforce
  * rules which allow code to be debugged more easily.
+ *
+ * Lockless fastpath by Bruno Santos <bsantos@av.it.pt>
  */
 
 /*
@@ -22,7 +24,7 @@
  * too.
  *
  * The ->count variable represents how many more tasks can acquire this
- * semaphore.  If it's zero, there may be tasks waiting on the wait_list.
+ * semaphore.  If it's negative, there may be tasks waiting on the wait_list.
  */
 
 #include <linux/compiler.h>
@@ -51,14 +53,10 @@ static noinline void __up(struct semaphore *sem);
  */
 void down(struct semaphore *sem)
 {
-	unsigned long flags;
+	if (likely(atomic_dec_return(&sem->count) >= 0))
+		return;
 
-	spin_lock_irqsave(&sem->lock, flags);
-	if (likely(sem->count > 0))
-		sem->count--;
-	else
-		__down(sem);
-	spin_unlock_irqrestore(&sem->lock, flags);
+	__down(sem);
 }
 EXPORT_SYMBOL(down);
 
@@ -73,17 +71,10 @@ EXPORT_SYMBOL(down);
  */
 int down_interruptible(struct semaphore *sem)
 {
-	unsigned long flags;
-	int result = 0;
+	if (likely(atomic_dec_return(&sem->count) >= 0))
+		return 0;
 
-	spin_lock_irqsave(&sem->lock, flags);
-	if (likely(sem->count > 0))
-		sem->count--;
-	else
-		result = __down_interruptible(sem);
-	spin_unlock_irqrestore(&sem->lock, flags);
-
-	return result;
+	return __down_interruptible(sem);
 }
 EXPORT_SYMBOL(down_interruptible);
 
@@ -99,17 +90,10 @@ EXPORT_SYMBOL(down_interruptible);
  */
 int down_killable(struct semaphore *sem)
 {
-	unsigned long flags;
-	int result = 0;
+	if (likely(atomic_dec_return(&sem->count) >= 0))
+		return 0;
 
-	spin_lock_irqsave(&sem->lock, flags);
-	if (likely(sem->count > 0))
-		sem->count--;
-	else
-		result = __down_killable(sem);
-	spin_unlock_irqrestore(&sem->lock, flags);
-
-	return result;
+	return __down_killable(sem);
 }
 EXPORT_SYMBOL(down_killable);
 
@@ -128,16 +112,21 @@ EXPORT_SYMBOL(down_killable);
  */
 int down_trylock(struct semaphore *sem)
 {
-	unsigned long flags;
-	int count;
+	int old, cmp;
 
-	spin_lock_irqsave(&sem->lock, flags);
-	count = sem->count - 1;
-	if (likely(count >= 0))
-		sem->count = count;
-	spin_unlock_irqrestore(&sem->lock, flags);
+	/*
+	 * The down_trylock fastpath is not very optimal compared to the
+	 * down and up fastpaths, but it should be used less frequently.
+	 */
+	old = atomic_read(&sem->count);
+	while (old > 0) {
+		cmp = old;
+		old = atomic_cmpxchg(&sem->count, cmp, old - 1);
+		if (old == cmp)
+			return 0;
+	}
 
-	return (count < 0);
+	return 1;
 }
 EXPORT_SYMBOL(down_trylock);
 
@@ -153,17 +142,10 @@ EXPORT_SYMBOL(down_trylock);
  */
 int down_timeout(struct semaphore *sem, long jiffies)
 {
-	unsigned long flags;
-	int result = 0;
-
-	spin_lock_irqsave(&sem->lock, flags);
-	if (likely(sem->count > 0))
-		sem->count--;
-	else
-		result = __down_timeout(sem, jiffies);
-	spin_unlock_irqrestore(&sem->lock, flags);
+	if (likely(atomic_dec_return(&sem->count) >= 0))
+		return 0;
 
-	return result;
+	return __down_timeout(sem, jiffies);
 }
 EXPORT_SYMBOL(down_timeout);
 
@@ -176,14 +158,10 @@ EXPORT_SYMBOL(down_timeout);
  */
 void up(struct semaphore *sem)
 {
-	unsigned long flags;
+	if (likely(atomic_inc_return(&sem->count) > 0))
+		return;
 
-	spin_lock_irqsave(&sem->lock, flags);
-	if (likely(list_empty(&sem->wait_list)))
-		sem->count++;
-	else
-		__up(sem);
-	spin_unlock_irqrestore(&sem->lock, flags);
+	__up(sem);
 }
 EXPORT_SYMBOL(up);
 
@@ -205,6 +183,15 @@ static inline int __sched __down_common(struct semaphore *sem, long state,
 {
 	struct task_struct *task = current;
 	struct semaphore_waiter waiter;
+	unsigned long flags;
+
+	spin_lock_irqsave(&sem->lock, flags);
+	/*
+	 * Someone may have incremented the count and failed to wake
+	 * us before we acquired the spinlock.
+	 */
+	if (atomic_dec_return(&sem->count) >= 0)
+		goto done;
 
 	list_add_tail(&waiter.list, &sem->wait_list);
 	waiter.task = task;
@@ -222,16 +209,22 @@ static inline int __sched __down_common(struct semaphore *sem, long state,
 		timeout = schedule_timeout(timeout);
 		spin_lock_irq(&sem->lock);
 		if (waiter.up)
-			return 0;
+			goto done;
 	}
 
  timed_out:
 	list_del(&waiter.list);
+	spin_unlock_irqrestore(&sem->lock, flags);
 	return -ETIME;
 
  interrupted:
 	list_del(&waiter.list);
+	spin_unlock_irqrestore(&sem->lock, flags);
 	return -EINTR;
+
+ done:
+	spin_unlock_irqrestore(&sem->lock, flags);
+	return 0;
 }
 
 static noinline void __sched __down(struct semaphore *sem)
@@ -256,9 +249,31 @@ static noinline int __sched __down_timeout(struct semaphore *sem, long jiffies)
 
 static noinline void __sched __up(struct semaphore *sem)
 {
-	struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
+	struct semaphore_waiter *waiter;
+	unsigned long flags;
+	int old, cmp;
+
+	spin_lock_irqsave(&sem->lock, flags);
+	/*
+	 * If the wait list is empty, it means either we acquired the spinlock
+	 * faster than a possible waiter or the possible waiter abandoned the
+	 * wait because it got interrupted or timed out. In such case we have
+	 * to increment the count to a value that is greater than or equal to 1.
+	 */
+	if (list_empty(&sem->wait_list)) {
+		old = atomic_read(&sem->count);
+		do {
+			cmp = old;
+			old = (old > 0) ? old + 1 : 1;
+			old = atomic_cmpxchg(&sem->count, cmp, old);
+		} while (cmp != old);
+
+	} else {
+		waiter = list_first_entry(&sem->wait_list,
 						struct semaphore_waiter, list);
-	list_del(&waiter->list);
-	waiter->up = 1;
-	wake_up_process(waiter->task);
+		list_del(&waiter->list);
+		waiter->up = 1;
+		wake_up_process(waiter->task);
+	}
+	spin_unlock_irqrestore(&sem->lock, flags);
 }
-- 
1.5.2.5


             reply	other threads:[~2008-07-09 14:14 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2008-07-09 13:13 Bruno Santos [this message]
2008-07-09 14:16 ` semaphore: lockless fastpath using atomic_{inc,dec}_return Arjan van de Ven
2008-07-09 14:24   ` Christoph Lameter
  -- strict thread matches above, loose matches on Subject: below --
2008-07-09 15:39 Bruno Santos
2008-07-09 15:50 ` Arjan van de Ven
2008-07-09 16:56   ` Bruno Santos
2008-07-09 19:27     ` Bruno Santos
2008-07-09 20:05     ` Arjan van de Ven

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=4874B979.4020608@av.it.pt \
    --to=bsantos@av.it.pt \
    --cc=linux-kernel@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox