public inbox for linux-kernel@vger.kernel.org
* Re: semaphore: lockless fastpath using atomic_{inc,dec}_return
@ 2008-07-09 15:39 Bruno Santos
  2008-07-09 15:50 ` Arjan van de Ven
  0 siblings, 1 reply; 8+ messages in thread
From: Bruno Santos @ 2008-07-09 15:39 UTC (permalink / raw)
  To: Arjan van de Ven; +Cc: linux-kernel, Christoph Lameter

Hi,

 >hi,
 >
 >not to ruin the party but... how is this lockless? An atomic variable
 >is every bit a "lock" as a spinlock is... and very much equally
 >expensive as well for most cases ;-(

Perhaps not the best choice of words; I should have omitted the word
lockless. But it seems my understanding of lockless differs from yours.
And indeed, it is as expensive as a spinlock, but comparatively it is
only one operation that, if successful, doesn't have to lock and then
unlock (that's why I called it lockless ...).
The mutex takes the same approach, although it uses its own flavour of
atomic ops. What I'm really interested in is whether this brings any
benefit in terms of performance.
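
To make the "only one operation" point concrete, here is a minimal userspace sketch of the two fastpaths. It assumes C11 <stdatomic.h> in place of the kernel's atomic_t, and the names sketch_sem, sketch_down_fast and sketch_up_fast are hypothetical, not part of the patch. The slowpath (spinlock plus wait list) is deliberately not modelled; a false return only marks where it would be entered.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in for the semaphore count; not the kernel's atomic_t. */
struct sketch_sem {
	atomic_int count;
};

/*
 * Fastpath of 'down': a single atomic decrement.
 * atomic_fetch_sub() returns the old value, so old - 1 is the new count,
 * which is what the kernel's atomic_dec_return() would hand back.
 * Returns true if the semaphore was acquired, false if the caller
 * would have to enter the slowpath.
 */
static bool sketch_down_fast(struct sketch_sem *sem)
{
	int new_count = atomic_fetch_sub(&sem->count, 1) - 1;

	return new_count >= 0;
}

/*
 * Fastpath of 'up': a single atomic increment.
 * Returns true if no waiter can exist (new count > 0), false if the
 * caller would have to enter the slowpath to look for a waiter.
 */
static bool sketch_up_fast(struct sketch_sem *sem)
{
	int new_count = atomic_fetch_add(&sem->count, 1) + 1;

	return new_count > 0;
}

/* Single-threaded walk-through of the count transitions. */
static void sketch_fastpath_demo(void)
{
	struct sketch_sem sem;

	atomic_init(&sem.count, 1);
	assert(sketch_down_fast(&sem));		/* 1 -> 0: acquired */
	assert(!sketch_down_fast(&sem));	/* 0 -> -1: would sleep */
	assert(!sketch_up_fast(&sem));		/* -1 -> 0: a waiter may exist */
	assert(sketch_up_fast(&sem));		/* 0 -> 1: nobody to wake */
}
```

In the uncontended case each call is one atomic read-modify-write, whereas the spinlock version needs a lock, a plain update and an unlock. Whether that is measurably faster is exactly the open question above.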


 > And is this safe? It seems that we can always be rescheduled after the
 > atomic operation and interrupts can occur too. You need to tell us why
 > this is safe in all cases.

The slowpaths take care of that:
In the 'down' slowpath, after acquiring the spinlock, the semaphore may
have been released in the meantime ("we can always be rescheduled after
the atomic operation and interrupts can occur too"), so we test again
with an atomic_dec_return, as in the fastpath. If that fails, we proceed
to the wait list and loop waiting until someone wakes us; if we are
interrupted or time out, we just leave the wait list.
In the 'up' slowpath, after acquiring the spinlock, we wake up one
waiter. However, the list may be empty, either because we acquired the
lock before a possible waiter did or because the waiter(s) abandoned the
wait list; in that case we raise the count to a value that is >= 1
(taking into account another 'up').
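
As an illustration of the empty-wait-list case, here is a minimal userspace sketch of the compare-and-exchange loop, together with the similar loop used by the trylock. It assumes C11 <stdatomic.h> instead of the kernel's atomic_cmpxchg(), leaves out the spinlock and the wait list entirely, and uses the hypothetical names sketch_up_no_waiter and sketch_down_trylock.

```c
#include <assert.h>
#include <stdatomic.h>

/*
 * 'up' slowpath when the wait list is empty: the count may be zero or
 * negative here, so it is raised to 1; if a concurrent 'up' already
 * made it positive, it is simply incremented.
 * Returns the value that was stored.
 */
static int sketch_up_no_waiter(atomic_int *count)
{
	int old = atomic_load(count);
	int new_count;

	do {
		new_count = (old > 0) ? old + 1 : 1;
		/* On failure, 'old' is refreshed with the current value. */
	} while (!atomic_compare_exchange_weak(count, &old, new_count));

	return new_count;
}

/*
 * Trylock: decrement only while the count is positive, so the count is
 * never driven negative by a caller that is not going to sleep.
 * Returns 0 on success and 1 on failure, as down_trylock() does.
 */
static int sketch_down_trylock(atomic_int *count)
{
	int old = atomic_load(count);

	while (old > 0) {
		if (atomic_compare_exchange_weak(count, &old, old - 1))
			return 0;
	}

	return 1;
}
```

The loop form matters because a plain increment from a negative count would leave the semaphore unavailable even though no one is waiting on it.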


* Re: semaphore: lockless fastpath using atomic_{inc,dec}_return
@ 2008-07-09 13:13 Bruno Santos
  2008-07-09 14:16 ` Arjan van de Ven
  0 siblings, 1 reply; 8+ messages in thread
From: Bruno Santos @ 2008-07-09 13:13 UTC (permalink / raw)
  To: linux-kernel; +Cc: bsantos

[-- Attachment #1: Type: text/plain, Size: 7671 bytes --]

So far, the machine I'm testing this on (a Core2 Duo) has been up for almost 24 hours.

It seems the patch got screwed by the mailer, so I'm posting it again.


 From 343d08a5d172d103e49c77e5580a45f02fab2b5e Mon Sep 17 00:00:00 2001
From: Bruno Santos <bsantos@av.it.pt>
Date: Tue, 8 Jul 2008 23:40:53 +0100
Subject: [PATCH] semaphore lockless fastpath


Signed-off-by: Bruno Santos <bsantos@av.it.pt>
---
 include/linux/semaphore.h |    4 +-
 kernel/semaphore.c        |  131 +++++++++++++++++++++++++--------------------
 2 files changed, 75 insertions(+), 60 deletions(-)

diff --git a/include/linux/semaphore.h b/include/linux/semaphore.h
index 9cae64b..d7850f0 100644
--- a/include/linux/semaphore.h
+++ b/include/linux/semaphore.h
@@ -14,15 +14,15 @@
 
 /* Please don't access any members of this structure directly */
 struct semaphore {
+    atomic_t        count;
     spinlock_t        lock;
-    unsigned int        count;
     struct list_head    wait_list;
 };
 
 #define __SEMAPHORE_INITIALIZER(name, n)                \
 {                                    \
+    .count        = ATOMIC_INIT(n),                \
     .lock        = __SPIN_LOCK_UNLOCKED((name).lock),        \
-    .count        = n,                        \
     .wait_list    = LIST_HEAD_INIT((name).wait_list),        \
 }
 
diff --git a/kernel/semaphore.c b/kernel/semaphore.c
index 5c2942e..2815980 100644
--- a/kernel/semaphore.c
+++ b/kernel/semaphore.c
@@ -8,6 +8,8 @@
  * A counting semaphore may be acquired 'n' times before sleeping.
  * See mutex.c for single-acquisition sleeping locks which enforce
  * rules which allow code to be debugged more easily.
+ *
+ * Lockless fastpath by Bruno Santos <bsantos@av.it.pt>
  */
 
 /*
@@ -22,7 +24,7 @@
  * too.
  *
  * The ->count variable represents how many more tasks can acquire this
- * semaphore.  If it's zero, there may be tasks waiting on the wait_list.
+ * semaphore.  If it's negative, there may be tasks waiting on the wait_list.
  */
 
 #include <linux/compiler.h>
@@ -51,14 +53,10 @@ static noinline void __up(struct semaphore *sem);
  */
 void down(struct semaphore *sem)
 {
-    unsigned long flags;
+    if (likely(atomic_dec_return(&sem->count) >= 0))
+        return;
 
-    spin_lock_irqsave(&sem->lock, flags);
-    if (likely(sem->count > 0))
-        sem->count--;
-    else
-        __down(sem);
-    spin_unlock_irqrestore(&sem->lock, flags);
+    __down(sem);
 }
 EXPORT_SYMBOL(down);
 
@@ -73,17 +71,10 @@ EXPORT_SYMBOL(down);
  */
 int down_interruptible(struct semaphore *sem)
 {
-    unsigned long flags;
-    int result = 0;
+    if (likely(atomic_dec_return(&sem->count) >= 0))
+        return 0;
 
-    spin_lock_irqsave(&sem->lock, flags);
-    if (likely(sem->count > 0))
-        sem->count--;
-    else
-        result = __down_interruptible(sem);
-    spin_unlock_irqrestore(&sem->lock, flags);
-
-    return result;
+    return __down_interruptible(sem);
 }
 EXPORT_SYMBOL(down_interruptible);
 
@@ -99,17 +90,10 @@ EXPORT_SYMBOL(down_interruptible);
  */
 int down_killable(struct semaphore *sem)
 {
-    unsigned long flags;
-    int result = 0;
+    if (likely(atomic_dec_return(&sem->count) >= 0))
+        return 0;
 
-    spin_lock_irqsave(&sem->lock, flags);
-    if (likely(sem->count > 0))
-        sem->count--;
-    else
-        result = __down_killable(sem);
-    spin_unlock_irqrestore(&sem->lock, flags);
-
-    return result;
+    return __down_killable(sem);
 }
 EXPORT_SYMBOL(down_killable);
 
@@ -128,16 +112,21 @@ EXPORT_SYMBOL(down_killable);
  */
 int down_trylock(struct semaphore *sem)
 {
-    unsigned long flags;
-    int count;
+    int old, cmp;
 
-    spin_lock_irqsave(&sem->lock, flags);
-    count = sem->count - 1;
-    if (likely(count >= 0))
-        sem->count = count;
-    spin_unlock_irqrestore(&sem->lock, flags);
+    /*
+     * The down_trylock fastpath is not very optimal compared to the
+     * down and up fastpath's, but it should be used less frequently.
+     */
+    old = atomic_read(&sem->count);
+    while (old > 0) {
+        cmp = old;
+        old = atomic_cmpxchg(&sem->count, cmp, old - 1);
+        if (old == cmp)
+            return 0;
+    }
 
-    return (count < 0);
+    return 1;
 }
 EXPORT_SYMBOL(down_trylock);
 
@@ -153,17 +142,10 @@ EXPORT_SYMBOL(down_trylock);
  */
 int down_timeout(struct semaphore *sem, long jiffies)
 {
-    unsigned long flags;
-    int result = 0;
-
-    spin_lock_irqsave(&sem->lock, flags);
-    if (likely(sem->count > 0))
-        sem->count--;
-    else
-        result = __down_timeout(sem, jiffies);
-    spin_unlock_irqrestore(&sem->lock, flags);
+    if (likely(atomic_dec_return(&sem->count) >= 0))
+        return 0;
 
-    return result;
+    return __down_timeout(sem, jiffies);
 }
 EXPORT_SYMBOL(down_timeout);
 
@@ -176,14 +158,10 @@ EXPORT_SYMBOL(down_timeout);
  */
 void up(struct semaphore *sem)
 {
-    unsigned long flags;
+    if (likely(atomic_inc_return(&sem->count) > 0))
+        return;
 
-    spin_lock_irqsave(&sem->lock, flags);
-    if (likely(list_empty(&sem->wait_list)))
-        sem->count++;
-    else
-        __up(sem);
-    spin_unlock_irqrestore(&sem->lock, flags);
+    __up(sem);
 }
 EXPORT_SYMBOL(up);
 
@@ -205,6 +183,15 @@ static inline int __sched __down_common(struct semaphore *sem, long state,
 {
     struct task_struct *task = current;
     struct semaphore_waiter waiter;
+    unsigned long flags;
+
+    spin_lock_irqsave(&sem->lock, flags);
+    /*
+     * Someone may have incremented the count and failed to wake
+     * us before we acquired the spinlock.
+     */
+    if (atomic_dec_return(&sem->count) >= 0)
+        goto done;
 
     list_add_tail(&waiter.list, &sem->wait_list);
     waiter.task = task;
@@ -222,16 +209,22 @@ static inline int __sched __down_common(struct semaphore *sem, long state,
         timeout = schedule_timeout(timeout);
         spin_lock_irq(&sem->lock);
         if (waiter.up)
-            return 0;
+            goto done;
     }
 
  timed_out:
     list_del(&waiter.list);
+    spin_unlock_irqrestore(&sem->lock, flags);
     return -ETIME;
 
  interrupted:
     list_del(&waiter.list);
+    spin_unlock_irqrestore(&sem->lock, flags);
     return -EINTR;
+
+ done:
+    spin_unlock_irqrestore(&sem->lock, flags);
+    return 0;
 }
 
 static noinline void __sched __down(struct semaphore *sem)
@@ -256,9 +249,31 @@ static noinline int __sched __down_timeout(struct semaphore *sem, long jiffies)
 
 static noinline void __sched __up(struct semaphore *sem)
 {
-    struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
+    struct semaphore_waiter *waiter;
+    unsigned long flags;
+    int old, cmp;
+
+    spin_lock_irqsave(&sem->lock, flags);
+    /*
+     * If the wait list is empty, it means either we acquired the spinlock
+     * faster than a possible waiter or the possible waiter abandoned the
+     * wait because it got interrupted or timed out. In such case we have
+     * to increment the count to a value that is greater or equal than 1.
+     */
+    if (list_empty(&sem->wait_list)) {
+        old = atomic_read(&sem->count);
+        do {
+            cmp = old;
+            old = (old > 0) ? old + 1 : 1;
+            old = atomic_cmpxchg(&sem->count, cmp, old);
+        } while (cmp != old);
+
+    } else {
+        waiter = list_first_entry(&sem->wait_list,
                         struct semaphore_waiter, list);
-    list_del(&waiter->list);
-    waiter->up = 1;
-    wake_up_process(waiter->task);
+        list_del(&waiter->list);
+        waiter->up = 1;
+        wake_up_process(waiter->task);
+    }
+    spin_unlock_irqrestore(&sem->lock, flags);
 }
-- 
1.5.2.5


[-- Attachment #2: semaphore-lockless-fastpath.patch --]
[-- Type: text/plain, Size: 6831 bytes --]

From 343d08a5d172d103e49c77e5580a45f02fab2b5e Mon Sep 17 00:00:00 2001
From: Bruno Santos <bsantos@av.it.pt>
Date: Tue, 8 Jul 2008 23:40:53 +0100
Subject: [PATCH] semaphore lockless fastpath


Signed-off-by: Bruno Santos <bsantos@av.it.pt>
---
 include/linux/semaphore.h |    4 +-
 kernel/semaphore.c        |  131 +++++++++++++++++++++++++--------------------
 2 files changed, 75 insertions(+), 60 deletions(-)

diff --git a/include/linux/semaphore.h b/include/linux/semaphore.h
index 9cae64b..d7850f0 100644
--- a/include/linux/semaphore.h
+++ b/include/linux/semaphore.h
@@ -14,15 +14,15 @@
 
 /* Please don't access any members of this structure directly */
 struct semaphore {
+	atomic_t		count;
 	spinlock_t		lock;
-	unsigned int		count;
 	struct list_head	wait_list;
 };
 
 #define __SEMAPHORE_INITIALIZER(name, n)				\
 {									\
+	.count		= ATOMIC_INIT(n),				\
 	.lock		= __SPIN_LOCK_UNLOCKED((name).lock),		\
-	.count		= n,						\
 	.wait_list	= LIST_HEAD_INIT((name).wait_list),		\
 }
 
diff --git a/kernel/semaphore.c b/kernel/semaphore.c
index 5c2942e..2815980 100644
--- a/kernel/semaphore.c
+++ b/kernel/semaphore.c
@@ -8,6 +8,8 @@
  * A counting semaphore may be acquired 'n' times before sleeping.
  * See mutex.c for single-acquisition sleeping locks which enforce
  * rules which allow code to be debugged more easily.
+ *
+ * Lockless fastpath by Bruno Santos <bsantos@av.it.pt>
  */
 
 /*
@@ -22,7 +24,7 @@
  * too.
  *
  * The ->count variable represents how many more tasks can acquire this
- * semaphore.  If it's zero, there may be tasks waiting on the wait_list.
+ * semaphore.  If it's negative, there may be tasks waiting on the wait_list.
  */
 
 #include <linux/compiler.h>
@@ -51,14 +53,10 @@ static noinline void __up(struct semaphore *sem);
  */
 void down(struct semaphore *sem)
 {
-	unsigned long flags;
+	if (likely(atomic_dec_return(&sem->count) >= 0))
+		return;
 
-	spin_lock_irqsave(&sem->lock, flags);
-	if (likely(sem->count > 0))
-		sem->count--;
-	else
-		__down(sem);
-	spin_unlock_irqrestore(&sem->lock, flags);
+	__down(sem);
 }
 EXPORT_SYMBOL(down);
 
@@ -73,17 +71,10 @@ EXPORT_SYMBOL(down);
  */
 int down_interruptible(struct semaphore *sem)
 {
-	unsigned long flags;
-	int result = 0;
+	if (likely(atomic_dec_return(&sem->count) >= 0))
+		return 0;
 
-	spin_lock_irqsave(&sem->lock, flags);
-	if (likely(sem->count > 0))
-		sem->count--;
-	else
-		result = __down_interruptible(sem);
-	spin_unlock_irqrestore(&sem->lock, flags);
-
-	return result;
+	return __down_interruptible(sem);
 }
 EXPORT_SYMBOL(down_interruptible);
 
@@ -99,17 +90,10 @@ EXPORT_SYMBOL(down_interruptible);
  */
 int down_killable(struct semaphore *sem)
 {
-	unsigned long flags;
-	int result = 0;
+	if (likely(atomic_dec_return(&sem->count) >= 0))
+		return 0;
 
-	spin_lock_irqsave(&sem->lock, flags);
-	if (likely(sem->count > 0))
-		sem->count--;
-	else
-		result = __down_killable(sem);
-	spin_unlock_irqrestore(&sem->lock, flags);
-
-	return result;
+	return __down_killable(sem);
 }
 EXPORT_SYMBOL(down_killable);
 
@@ -128,16 +112,21 @@ EXPORT_SYMBOL(down_killable);
  */
 int down_trylock(struct semaphore *sem)
 {
-	unsigned long flags;
-	int count;
+	int old, cmp;
 
-	spin_lock_irqsave(&sem->lock, flags);
-	count = sem->count - 1;
-	if (likely(count >= 0))
-		sem->count = count;
-	spin_unlock_irqrestore(&sem->lock, flags);
+	/*
+	 * The down_trylock fastpath is not very optimal compared to the
+	 * down and up fastpath's, but it should be used less frequently.
+	 */
+	old = atomic_read(&sem->count);
+	while (old > 0) {
+		cmp = old;
+		old = atomic_cmpxchg(&sem->count, cmp, old - 1);
+		if (old == cmp)
+			return 0;
+	}
 
-	return (count < 0);
+	return 1;
 }
 EXPORT_SYMBOL(down_trylock);
 
@@ -153,17 +142,10 @@ EXPORT_SYMBOL(down_trylock);
  */
 int down_timeout(struct semaphore *sem, long jiffies)
 {
-	unsigned long flags;
-	int result = 0;
-
-	spin_lock_irqsave(&sem->lock, flags);
-	if (likely(sem->count > 0))
-		sem->count--;
-	else
-		result = __down_timeout(sem, jiffies);
-	spin_unlock_irqrestore(&sem->lock, flags);
+	if (likely(atomic_dec_return(&sem->count) >= 0))
+		return 0;
 
-	return result;
+	return __down_timeout(sem, jiffies);
 }
 EXPORT_SYMBOL(down_timeout);
 
@@ -176,14 +158,10 @@ EXPORT_SYMBOL(down_timeout);
  */
 void up(struct semaphore *sem)
 {
-	unsigned long flags;
+	if (likely(atomic_inc_return(&sem->count) > 0))
+		return;
 
-	spin_lock_irqsave(&sem->lock, flags);
-	if (likely(list_empty(&sem->wait_list)))
-		sem->count++;
-	else
-		__up(sem);
-	spin_unlock_irqrestore(&sem->lock, flags);
+	__up(sem);
 }
 EXPORT_SYMBOL(up);
 
@@ -205,6 +183,15 @@ static inline int __sched __down_common(struct semaphore *sem, long state,
 {
 	struct task_struct *task = current;
 	struct semaphore_waiter waiter;
+	unsigned long flags;
+
+	spin_lock_irqsave(&sem->lock, flags);
+	/*
+	 * Someone may have incremented the count and failed to wake
+	 * us before we acquired the spinlock.
+	 */
+	if (atomic_dec_return(&sem->count) >= 0)
+		goto done;
 
 	list_add_tail(&waiter.list, &sem->wait_list);
 	waiter.task = task;
@@ -222,16 +209,22 @@ static inline int __sched __down_common(struct semaphore *sem, long state,
 		timeout = schedule_timeout(timeout);
 		spin_lock_irq(&sem->lock);
 		if (waiter.up)
-			return 0;
+			goto done;
 	}
 
  timed_out:
 	list_del(&waiter.list);
+	spin_unlock_irqrestore(&sem->lock, flags);
 	return -ETIME;
 
  interrupted:
 	list_del(&waiter.list);
+	spin_unlock_irqrestore(&sem->lock, flags);
 	return -EINTR;
+
+ done:
+	spin_unlock_irqrestore(&sem->lock, flags);
+	return 0;
 }
 
 static noinline void __sched __down(struct semaphore *sem)
@@ -256,9 +249,31 @@ static noinline int __sched __down_timeout(struct semaphore *sem, long jiffies)
 
 static noinline void __sched __up(struct semaphore *sem)
 {
-	struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
+	struct semaphore_waiter *waiter;
+	unsigned long flags;
+	int old, cmp;
+
+	spin_lock_irqsave(&sem->lock, flags);
+	/*
+	 * If the wait list is empty, it means either we acquired the spinlock
+	 * faster than a possible waiter or the possible waiter abandoned the
+	 * wait because it got interrupted or timed out. In such case we have
+	 * to increment the count to a value that is greater or equal than 1.
+	 */
+	if (list_empty(&sem->wait_list)) {
+		old = atomic_read(&sem->count);
+		do {
+			cmp = old;
+			old = (old > 0) ? old + 1 : 1;
+			old = atomic_cmpxchg(&sem->count, cmp, old);
+		} while (cmp != old);
+
+	} else {
+		waiter = list_first_entry(&sem->wait_list,
 						struct semaphore_waiter, list);
-	list_del(&waiter->list);
-	waiter->up = 1;
-	wake_up_process(waiter->task);
+		list_del(&waiter->list);
+		waiter->up = 1;
+		wake_up_process(waiter->task);
+	}
+	spin_unlock_irqrestore(&sem->lock, flags);
 }
-- 
1.5.2.5



end of thread, other threads:[~2008-07-09 20:05 UTC | newest]

Thread overview: 8+ messages
2008-07-09 15:39 semaphore: lockless fastpath using atomic_{inc,dec}_return Bruno Santos
2008-07-09 15:50 ` Arjan van de Ven
2008-07-09 16:56   ` Bruno Santos
2008-07-09 19:27     ` Bruno Santos
2008-07-09 20:05     ` Arjan van de Ven
  -- strict thread matches above, loose matches on Subject: below --
2008-07-09 13:13 Bruno Santos
2008-07-09 14:16 ` Arjan van de Ven
2008-07-09 14:24   ` Christoph Lameter
