public inbox for linux-kernel@vger.kernel.org
From: Peter Zijlstra <a.p.zijlstra@chello.nl>
To: Ingo Molnar <mingo@elte.hu>, Thomas Gleixner <tglx@linutronix.de>
Cc: linux-kernel@vger.kernel.org,
	Steven Rostedt <rostedt@goodmis.org>,
	Darren Hart <dvhart@linux.intel.com>,
	Manfred Spraul <manfred@colorfullife.com>,
	David Miller <davem@davemloft.net>,
	Eric Dumazet <eric.dumazet@gmail.com>,
	Mike Galbraith <efault@gmx.de>,
	Peter Zijlstra <a.p.zijlstra@chello.nl>
Subject: [RFC][PATCH 3/3] ipc/sem: Rework wakeup scheme
Date: Wed, 14 Sep 2011 15:30:37 +0200
Message-ID: <20110914133750.916911903@chello.nl>
In-Reply-To: 20110914133034.687048806@chello.nl

[-- Attachment #1: sem-wakeup-list.patch --]
[-- Type: text/plain, Size: 13604 bytes --]

The current, convoluted wakeup scheme exists to avoid two races:

 - if queue.status is set after wake_up_process, then the woken up idle
   thread could race forward and try (and fail) to acquire sma->lock
   before update_queue had a chance to set queue.status
 - if queue.status is written before wake_up_process and if the
   blocked process is woken up by a signal between writing
   queue.status and the wake_up_process, then the woken up
   process could return from semtimedop and die by calling
   sys_exit before wake_up_process is called. Then wake_up_process
   will oops, because the task structure is already invalid.
   (yes, this happened on s390 with sysv msg).

However, we can avoid the latter by holding a reference on the task
struct across the q->status write. That lets us set q->status before
the wakeup, which also closes the former race, and issue the wakeups
through the new wake_list infrastructure.

This removes the home-brew busy-wait and the requirement to keep
preemption disabled.
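
For illustration only, the ordering described above can be sketched in
userspace C. This is not the kernel code: struct task, struct sem_waiter,
struct wake_list and all helper names below are hypothetical stand-ins,
and the sketch assumes that the wake list takes its own reference on the
task, which the put_task_struct() right after wake_list_add() in the
patch suggests. It also assumes each task is queued at most once.

```c
#include <assert.h>
#include <errno.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical stand-in for task_struct: a refcount plus a "woken" flag. */
struct task {
	atomic_int refs;
	atomic_int woken;
	struct task *wake_next;		/* link used while on a wake list */
};

/* Hypothetical stand-in for sem_queue; status is -EINTR before blocking. */
struct sem_waiter {
	struct task *sleeper;
	atomic_int status;
};

struct wake_list {
	struct task *first;
};

static void task_get(struct task *t) { atomic_fetch_add(&t->refs, 1); }
static void task_put(struct task *t) { atomic_fetch_sub(&t->refs, 1); }

/* Queue a task for a deferred wakeup; the list holds its own reference. */
static void wake_list_push(struct wake_list *wl, struct task *t)
{
	task_get(t);
	t->wake_next = wl->first;
	wl->first = t;
}

/* Would run with the semaphore lock held. */
static void prepare_wakeup(struct wake_list *wl, struct sem_waiter *q, int error)
{
	struct task *t = q->sleeper;	/* read before q can go away */

	task_get(t);			/* pin the task across the status write */
	atomic_store(&q->status, error);	/* q must not be touched after this */
	wake_list_push(wl, t);
	task_put(t);			/* the list's reference keeps t alive */
}

/* Would run after dropping the lock; returns the number of wakeups issued. */
static int do_wakeups(struct wake_list *wl)
{
	int n = 0;

	while (wl->first) {
		struct task *t = wl->first;

		wl->first = t->wake_next;
		t->wake_next = NULL;
		atomic_store(&t->woken, 1);	/* stands in for the real wakeup */
		task_put(t);
		n++;
	}
	return n;
}
```

Because the status is final by the time the waiter can observe it, the
waiter in this sketch never sees an intermediate value and has nothing
to spin on; the reference held by the list is what keeps the task valid
until do_wakeups() runs.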

Cc: Thomas Gleixner <tglx@linutronix.de>
Cc: Manfred Spraul <manfred@colorfullife.com>
Signed-off-by: Peter Zijlstra <a.p.zijlstra@chello.nl>
---
 ipc/sem.c |  155 ++++++++++++++++++--------------------------------------------
 1 file changed, 45 insertions(+), 110 deletions(-)
Index: linux-2.6/ipc/sem.c
===================================================================
--- linux-2.6.orig/ipc/sem.c
+++ linux-2.6/ipc/sem.c
@@ -60,9 +60,6 @@
  *   anything - not even acquiring a lock or dropping a refcount.
  * - A woken up task may not even touch the semaphore array anymore, it may
  *   have been destroyed already by a semctl(RMID).
- * - The synchronizations between wake-ups due to a timeout/signal and a
- *   wake-up due to a completed semaphore operation is achieved by using an
- *   intermediate state (IN_WAKEUP).
  * - UNDO values are stored in an array (one per process and per
  *   semaphore array, lazily allocated). For backwards compatibility, multiple
  *   modes for the UNDO variables are supported (per process, per thread)
@@ -199,33 +196,18 @@ static inline void sem_rmid(struct ipc_n
  * - queue.status is initialized to -EINTR before blocking.
  * - wakeup is performed by
  *	* unlinking the queue entry from sma->sem_pending
- *	* setting queue.status to IN_WAKEUP
+ *	* adding the q->sleeper to the wake_list
+ *	* setting queue.status to error
  *	  This is the notification for the blocked thread that a
  *	  result value is imminent.
- *	* call wake_up_process
- *	* set queue.status to the final value.
+ *
  * - the previously blocked thread checks queue.status:
- *   	* if it's IN_WAKEUP, then it must wait until the value changes
  *   	* if it's not -EINTR, then the operation was completed by
  *   	  update_queue. semtimedop can return queue.status without
  *   	  performing any operation on the sem array.
  *   	* otherwise it must acquire the spinlock and check what's up.
  *
- * The two-stage algorithm is necessary to protect against the following
- * races:
- * - if queue.status is set after wake_up_process, then the woken up idle
- *   thread could race forward and try (and fail) to acquire sma->lock
- *   before update_queue had a chance to set queue.status
- * - if queue.status is written before wake_up_process and if the
- *   blocked process is woken up by a signal between writing
- *   queue.status and the wake_up_process, then the woken up
- *   process could return from semtimedop and die by calling
- *   sys_exit before wake_up_process is called. Then wake_up_process
- *   will oops, because the task structure is already invalid.
- *   (yes, this happened on s390 with sysv msg).
- *
  */
-#define IN_WAKEUP	1
 
 /**
  * newary - Create a new semaphore set
@@ -406,51 +388,39 @@ static int try_atomic_semop (struct sem_
 	return result;
 }
 
-/** wake_up_sem_queue_prepare(q, error): Prepare wake-up
+/** wake_up_sem_queue_prepare(wake_list, q, error): Prepare wake-up
+ * @wake_list: list on which to queue the task to be woken
  * @q: queue entry that must be signaled
  * @error: Error value for the signal
  *
  * Prepare the wake-up of the queue entry q.
  */
-static void wake_up_sem_queue_prepare(struct list_head *pt,
+static void wake_up_sem_queue_prepare(struct wake_list_head *wake_list,
 				struct sem_queue *q, int error)
 {
-	if (list_empty(pt)) {
-		/*
-		 * Hold preempt off so that we don't get preempted and have the
-		 * wakee busy-wait until we're scheduled back on.
-		 */
-		preempt_disable();
-	}
-	q->status = IN_WAKEUP;
-	q->pid = error;
+	struct task_struct *p = ACCESS_ONCE(q->sleeper);
 
-	list_add_tail(&q->simple_list, pt);
+	get_task_struct(p);
+	q->status = error;
+	/*
+	 * implies a full barrier
+	 */
+	wake_list_add(wake_list, p);
+	put_task_struct(p);
 }
 
 /**
- * wake_up_sem_queue_do(pt) - do the actual wake-up
- * @pt: list of tasks to be woken up
+ * wake_up_sem_queue_do(wake_list) - do the actual wake-up
+ * @wake_list: list of tasks to be woken up
  *
  * Do the actual wake-up.
  * The function is called without any locks held, thus the semaphore array
  * could be destroyed already and the tasks can disappear as soon as the
  * status is set to the actual return code.
  */
-static void wake_up_sem_queue_do(struct list_head *pt)
+static void wake_up_sem_queue_do(struct wake_list_head *wake_list)
 {
-	struct sem_queue *q, *t;
-	int did_something;
-
-	did_something = !list_empty(pt);
-	list_for_each_entry_safe(q, t, pt, simple_list) {
-		wake_up_process(q->sleeper);
-		/* q can disappear immediately after writing q->status. */
-		smp_wmb();
-		q->status = q->pid;
-	}
-	if (did_something)
-		preempt_enable();
+	wake_up_list(wake_list, TASK_ALL);
 }
 
 static void unlink_queue(struct sem_array *sma, struct sem_queue *q)
@@ -527,19 +497,20 @@ static int check_restart(struct sem_arra
 
 
 /**
- * update_queue(sma, semnum): Look for tasks that can be completed.
+ * update_queue(sma, semnum, wake_list): Look for tasks that can be completed.
  * @sma: semaphore array.
  * @semnum: semaphore that was modified.
- * @pt: list head for the tasks that must be woken up.
+ * @wake_list: list for the tasks that must be woken up.
  *
  * update_queue must be called after a semaphore in a semaphore array
  * was modified. If multiple semaphore were modified, then @semnum
  * must be set to -1.
- * The tasks that must be woken up are added to @pt. The return code
+ * The tasks that must be woken up are added to @wake_list. The return code
  * is stored in q->pid.
  * The function return 1 if at least one semop was completed successfully.
  */
-static int update_queue(struct sem_array *sma, int semnum, struct list_head *pt)
+static int
+update_queue(struct sem_array *sma, int semnum, struct wake_list_head *wake_list)
 {
 	struct sem_queue *q;
 	struct list_head *walk;
@@ -597,7 +568,7 @@ static int update_queue(struct sem_array
 			restart = check_restart(sma, q);
 		}
 
-		wake_up_sem_queue_prepare(pt, q, error);
+		wake_up_sem_queue_prepare(wake_list, q, error);
 		if (restart)
 			goto again;
 	}
@@ -605,26 +576,26 @@ static int update_queue(struct sem_array
 }
 
 /**
- * do_smart_update(sma, sops, nsops, otime, pt) - optimized update_queue
+ * do_smart_update(sma, sops, nsops, otime, wake_list) - optimized update_queue
  * @sma: semaphore array
  * @sops: operations that were performed
  * @nsops: number of operations
  * @otime: force setting otime
- * @pt: list head of the tasks that must be woken up.
+ * @wake_list: list of the tasks that must be woken up.
  *
  * do_smart_update() does the required called to update_queue, based on the
  * actual changes that were performed on the semaphore array.
  * Note that the function does not do the actual wake-up: the caller is
- * responsible for calling wake_up_sem_queue_do(@pt).
+ * responsible for calling wake_up_sem_queue_do(@wake_list).
  * It is safe to perform this call after dropping all locks.
  */
 static void do_smart_update(struct sem_array *sma, struct sembuf *sops, int nsops,
-			int otime, struct list_head *pt)
+			int otime, struct wake_list_head *wake_list)
 {
 	int i;
 
 	if (sma->complex_count || sops == NULL) {
-		if (update_queue(sma, -1, pt))
+		if (update_queue(sma, -1, wake_list))
 			otime = 1;
 		goto done;
 	}
@@ -633,7 +604,7 @@ static void do_smart_update(struct sem_a
 		if (sops[i].sem_op > 0 ||
 			(sops[i].sem_op < 0 &&
 				sma->sem_base[sops[i].sem_num].semval == 0))
-			if (update_queue(sma, sops[i].sem_num, pt))
+			if (update_queue(sma, sops[i].sem_num, wake_list))
 				otime = 1;
 	}
 done:
@@ -698,7 +669,7 @@ static void freeary(struct ipc_namespace
 	struct sem_undo *un, *tu;
 	struct sem_queue *q, *tq;
 	struct sem_array *sma = container_of(ipcp, struct sem_array, sem_perm);
-	struct list_head tasks;
+	WAKE_LIST(wake_list);
 
 	/* Free the existing undo structures for this semaphore set.  */
 	assert_spin_locked(&sma->sem_perm.lock);
@@ -712,17 +683,16 @@ static void freeary(struct ipc_namespace
 	}
 
 	/* Wake up all pending processes and let them fail with EIDRM. */
-	INIT_LIST_HEAD(&tasks);
 	list_for_each_entry_safe(q, tq, &sma->sem_pending, list) {
 		unlink_queue(sma, q);
-		wake_up_sem_queue_prepare(&tasks, q, -EIDRM);
+		wake_up_sem_queue_prepare(&wake_list, q, -EIDRM);
 	}
 
 	/* Remove the semaphore set from the IDR */
 	sem_rmid(ns, sma);
 	sem_unlock(sma);
 
-	wake_up_sem_queue_do(&tasks);
+	wake_up_sem_queue_do(&wake_list);
 	ns->used_sems -= sma->sem_nsems;
 	security_sem_free(sma);
 	ipc_rcu_putref(sma);
@@ -846,13 +816,12 @@ static int semctl_main(struct ipc_namesp
 	ushort fast_sem_io[SEMMSL_FAST];
 	ushort* sem_io = fast_sem_io;
 	int nsems;
-	struct list_head tasks;
+	WAKE_LIST(wake_list);
 
 	sma = sem_lock_check(ns, semid);
 	if (IS_ERR(sma))
 		return PTR_ERR(sma);
 
-	INIT_LIST_HEAD(&tasks);
 	nsems = sma->sem_nsems;
 
 	err = -EACCES;
@@ -941,7 +910,7 @@ static int semctl_main(struct ipc_namesp
 		}
 		sma->sem_ctime = get_seconds();
 		/* maybe some queued-up processes were waiting for this */
-		do_smart_update(sma, NULL, 0, 0, &tasks);
+		do_smart_update(sma, NULL, 0, 0, &wake_list);
 		err = 0;
 		goto out_unlock;
 	}
@@ -983,14 +952,14 @@ static int semctl_main(struct ipc_namesp
 		curr->sempid = task_tgid_vnr(current);
 		sma->sem_ctime = get_seconds();
 		/* maybe some queued-up processes were waiting for this */
-		do_smart_update(sma, NULL, 0, 0, &tasks);
+		do_smart_update(sma, NULL, 0, 0, &wake_list);
 		err = 0;
 		goto out_unlock;
 	}
 	}
 out_unlock:
 	sem_unlock(sma);
-	wake_up_sem_queue_do(&tasks);
+	wake_up_sem_queue_do(&wake_list);
 
 out_free:
 	if(sem_io != fast_sem_io)
@@ -1255,32 +1224,6 @@ static struct sem_undo *find_alloc_undo(
 }
 
 
-/**
- * get_queue_result - Retrieve the result code from sem_queue
- * @q: Pointer to queue structure
- *
- * Retrieve the return code from the pending queue. If IN_WAKEUP is found in
- * q->status, then we must loop until the value is replaced with the final
- * value: This may happen if a task is woken up by an unrelated event (e.g.
- * signal) and in parallel the task is woken up by another task because it got
- * the requested semaphores.
- *
- * The function can be called with or without holding the semaphore spinlock.
- */
-static int get_queue_result(struct sem_queue *q)
-{
-	int error;
-
-	error = q->status;
-	while (unlikely(error == IN_WAKEUP)) {
-		cpu_relax();
-		error = q->status;
-	}
-
-	return error;
-}
-
-
 SYSCALL_DEFINE4(semtimedop, int, semid, struct sembuf __user *, tsops,
 		unsigned, nsops, const struct timespec __user *, timeout)
 {
@@ -1293,7 +1236,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, 
 	struct sem_queue queue;
 	unsigned long jiffies_left = 0;
 	struct ipc_namespace *ns;
-	struct list_head tasks;
+	WAKE_LIST(wake_list);
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -1342,8 +1285,6 @@ SYSCALL_DEFINE4(semtimedop, int, semid, 
 	} else
 		un = NULL;
 
-	INIT_LIST_HEAD(&tasks);
-
 	sma = sem_lock_check(ns, semid);
 	if (IS_ERR(sma)) {
 		if (un)
@@ -1392,7 +1333,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, 
 	error = try_atomic_semop (sma, sops, nsops, un, task_tgid_vnr(current));
 	if (error <= 0) {
 		if (alter && error == 0)
-			do_smart_update(sma, sops, nsops, 1, &tasks);
+			do_smart_update(sma, sops, nsops, 1, &wake_list);
 
 		goto out_unlock_free;
 	}
@@ -1434,8 +1375,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, 
 	else
 		schedule();
 
-	error = get_queue_result(&queue);
-
+	error = queue.status;
 	if (error != -EINTR) {
 		/* fast path: update_queue already obtained all requested
 		 * resources.
@@ -1450,11 +1390,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, 
 	}
 
 	sma = sem_lock(ns, semid);
-
-	/*
-	 * Wait until it's guaranteed that no wakeup_sem_queue_do() is ongoing.
-	 */
-	error = get_queue_result(&queue);
+	error = queue.status;
 
 	/*
 	 * Array removed? If yes, leave without sem_unlock().
@@ -1484,7 +1420,7 @@ SYSCALL_DEFINE4(semtimedop, int, semid, 
 out_unlock_free:
 	sem_unlock(sma);
 
-	wake_up_sem_queue_do(&tasks);
+	wake_up_sem_queue_do(&wake_list);
 out_free:
 	if(sops != fast_sops)
 		kfree(sops);
@@ -1545,7 +1481,7 @@ void exit_sem(struct task_struct *tsk)
 	for (;;) {
 		struct sem_array *sma;
 		struct sem_undo *un;
-		struct list_head tasks;
+		WAKE_LIST(wake_list);
 		int semid;
 		int i;
 
@@ -1610,10 +1546,9 @@ void exit_sem(struct task_struct *tsk)
 			}
 		}
 		/* maybe some queued-up processes were waiting for this */
-		INIT_LIST_HEAD(&tasks);
-		do_smart_update(sma, NULL, 0, 1, &tasks);
+		do_smart_update(sma, NULL, 0, 1, &wake_list);
 		sem_unlock(sma);
-		wake_up_sem_queue_do(&tasks);
+		wake_up_sem_queue_do(&wake_list);
 
 		kfree_rcu(un, rcu);
 	}


