public inbox for linux-kernel@vger.kernel.org
 help / color / mirror / Atom feed
* [PATCH] ipc/msg: Implement lockless pipelined wakeups
@ 2015-10-30 11:26 Sebastian Andrzej Siewior
  2015-10-30 22:30 ` George Spelvin
                   ` (2 more replies)
  0 siblings, 3 replies; 12+ messages in thread
From: Sebastian Andrzej Siewior @ 2015-10-30 11:26 UTC (permalink / raw)
  To: linux-kernel
  Cc: Sebastian Andrzej Siewior, Davidlohr Bueso, Manfred Spraul,
	Andrew Morton, George Spelvin, Thomas Gleixner, Peter Zijlstra

This patch moves the wakeup_process() invocation so it is not done under
the perm->lock by making use of a lockless wake_q. With this change, the
waiter is woken up once the message has been assigned and it does not
need to loop on SMP if the message points to NULL. In the signal case we
still need to check the pointer under the lock to verify the state.

This change should also avoid the introduction of preempt_disable() in
-RT which avoids a busy-loop which pools for the NULL -> !NULL
change if the waiter has a higher priority compared to the waker.

Cc: Davidlohr Bueso <dave@stgolabs.net>
Cc: Manfred Spraul <manfred@colorfullife.com>
Cc: Andrew Morton <akpm@linux-foundation.org>
Cc: George Spelvin <linux@horizon.com>
Cc: Thomas Gleixner <tglx@linutronix.de>
Cc: Peter Zijlstra <peterz@infradead.org>
Signed-off-by: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
---

This was just compile tested. It would be nice if somone with real
workload could test it. Otherwise I could hack something myself and check
if it still works.

 ipc/msg.c | 101 +++++++++++++++++---------------------------------------------
 1 file changed, 28 insertions(+), 73 deletions(-)

diff --git a/ipc/msg.c b/ipc/msg.c
index 1471db9a7e61..b8c5e7f2bebc 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -183,20 +183,14 @@ static void ss_wakeup(struct list_head *h, int kill)
 	}
 }
 
-static void expunge_all(struct msg_queue *msq, int res)
+static void expunge_all(struct msg_queue *msq, int res,
+			struct wake_q_head *wake_q)
 {
 	struct msg_receiver *msr, *t;
 
 	list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
-		msr->r_msg = NULL; /* initialize expunge ordering */
-		wake_up_process(msr->r_tsk);
-		/*
-		 * Ensure that the wakeup is visible before setting r_msg as
-		 * the receiving end depends on it: either spinning on a nil,
-		 * or dealing with -EAGAIN cases. See lockless receive part 1
-		 * and 2 in do_msgrcv().
-		 */
-		smp_wmb(); /* barrier (B) */
+
+		wake_q_add(wake_q, msr->r_tsk);
 		msr->r_msg = ERR_PTR(res);
 	}
 }
@@ -213,11 +207,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 {
 	struct msg_msg *msg, *t;
 	struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);
+	WAKE_Q(wake_q);
 
-	expunge_all(msq, -EIDRM);
+	expunge_all(msq, -EIDRM, &wake_q);
 	ss_wakeup(&msq->q_senders, 1);
 	msg_rmid(ns, msq);
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 	rcu_read_unlock();
 
 	list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) {
@@ -342,6 +338,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 	struct kern_ipc_perm *ipcp;
 	struct msqid64_ds uninitialized_var(msqid64);
 	struct msg_queue *msq;
+	WAKE_Q(wake_q);
 	int err;
 
 	if (cmd == IPC_SET) {
@@ -389,7 +386,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 		/* sleeping receivers might be excluded by
 		 * stricter permissions.
 		 */
-		expunge_all(msq, -EAGAIN);
+		expunge_all(msq, -EAGAIN, &wake_q);
 		/* sleeping senders might be able to send
 		 * due to a larger queue size.
 		 */
@@ -402,6 +399,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 out_up:
@@ -566,7 +564,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode)
 	return 0;
 }
 
-static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
+static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
+				 struct wake_q_head *wake_q)
 {
 	struct msg_receiver *msr, *t;
 
@@ -577,27 +576,13 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
 
 			list_del(&msr->r_list);
 			if (msr->r_maxsize < msg->m_ts) {
-				/* initialize pipelined send ordering */
-				msr->r_msg = NULL;
-				wake_up_process(msr->r_tsk);
-				/* barrier (B) see barrier comment below */
-				smp_wmb();
+				wake_q_add(wake_q, msr->r_tsk);
 				msr->r_msg = ERR_PTR(-E2BIG);
 			} else {
-				msr->r_msg = NULL;
 				msq->q_lrpid = task_pid_vnr(msr->r_tsk);
 				msq->q_rtime = get_seconds();
-				wake_up_process(msr->r_tsk);
-				/*
-				 * Ensure that the wakeup is visible before
-				 * setting r_msg, as the receiving can otherwise
-				 * exit - once r_msg is set, the receiver can
-				 * continue. See lockless receive part 1 and 2
-				 * in do_msgrcv(). Barrier (B).
-				 */
-				smp_wmb();
+				wake_q_add(wake_q, msr->r_tsk);
 				msr->r_msg = msg;
-
 				return 1;
 			}
 		}
@@ -613,6 +598,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	struct msg_msg *msg;
 	int err;
 	struct ipc_namespace *ns;
+	WAKE_Q(wake_q);
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -698,7 +684,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	msq->q_lspid = task_tgid_vnr(current);
 	msq->q_stime = get_seconds();
 
-	if (!pipelined_send(msq, msg)) {
+	if (!pipelined_send(msq, msg, &wake_q)) {
 		/* no one is waiting for this message, enqueue it */
 		list_add_tail(&msg->m_list, &msq->q_messages);
 		msq->q_cbytes += msgsz;
@@ -712,6 +698,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 	if (msg != NULL)
@@ -932,57 +919,25 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 		rcu_read_lock();
 
 		/* Lockless receive, part 2:
-		 * Wait until pipelined_send or expunge_all are outside of
-		 * wake_up_process(). There is a race with exit(), see
-		 * ipc/mqueue.c for the details. The correct serialization
-		 * ensures that a receiver cannot continue without the wakeup
-		 * being visibible _before_ setting r_msg:
+		 * The work in pipelined_send() and expunge_all():
+		 * - Set pointer to message
+		 * - Queue the receiver task for later wakeup
+		 * - Wake up the process after the lock is dropped.
 		 *
-		 * CPU 0                             CPU 1
-		 * <loop receiver>
-		 *   smp_rmb(); (A) <-- pair -.      <waker thread>
-		 *   <load ->r_msg>           |        msr->r_msg = NULL;
-		 *                            |        wake_up_process();
-		 * <continue>                 `------> smp_wmb(); (B)
-		 *                                     msr->r_msg = msg;
-		 *
-		 * Where (A) orders the message value read and where (B) orders
-		 * the write to the r_msg -- done in both pipelined_send and
-		 * expunge_all.
+		 * Should the process wake up before this wakeup (due to a
+		 * signal) it will either see the message and continue …
 		 */
-		for (;;) {
-			/*
-			 * Pairs with writer barrier in pipelined_send
-			 * or expunge_all.
-			 */
-			smp_rmb(); /* barrier (A) */
-			msg = (struct msg_msg *)msr_d.r_msg;
-			if (msg)
-				break;
 
-			/*
-			 * The cpu_relax() call is a compiler barrier
-			 * which forces everything in this loop to be
-			 * re-loaded.
-			 */
-			cpu_relax();
-		}
-
-		/* Lockless receive, part 3:
-		 * If there is a message or an error then accept it without
-		 * locking.
-		 */
+		msg = (struct msg_msg *)msr_d.r_msg;
 		if (msg != ERR_PTR(-EAGAIN))
 			goto out_unlock1;
 
-		/* Lockless receive, part 3:
-		 * Acquire the queue spinlock.
-		 */
+		 /*
+		  * … or see -EAGAIN, acquire the lock to check the message
+		  * again.
+		  */
 		ipc_lock_object(&msq->q_perm);
 
-		/* Lockless receive, part 4:
-		 * Repeat test after acquiring the spinlock.
-		 */
 		msg = (struct msg_msg *)msr_d.r_msg;
 		if (msg != ERR_PTR(-EAGAIN))
 			goto out_unlock0;
-- 
2.6.2


^ permalink raw reply related	[flat|nested] 12+ messages in thread

* Re: [PATCH] ipc/msg: Implement lockless pipelined wakeups
  2015-10-30 11:26 [PATCH] ipc/msg: Implement lockless pipelined wakeups Sebastian Andrzej Siewior
@ 2015-10-30 22:30 ` George Spelvin
  2015-11-03 14:01   ` Sebastian Andrzej Siewior
  2015-10-31 10:31 ` Thomas Gleixner
  2015-11-01  4:32 ` Manfred Spraul
  2 siblings, 1 reply; 12+ messages in thread
From: George Spelvin @ 2015-10-30 22:30 UTC (permalink / raw)
  To: bigeasy, linux-kernel; +Cc: akpm, dave, linux, manfred, peterz, tglx

I haven't got a test case, but that definitley looks like a great
cleanup!

One very minor kibitz: would you mind using "..." for ellipsis in the
comments rather than U+2026?

^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [PATCH] ipc/msg: Implement lockless pipelined wakeups
  2015-10-30 11:26 [PATCH] ipc/msg: Implement lockless pipelined wakeups Sebastian Andrzej Siewior
  2015-10-30 22:30 ` George Spelvin
@ 2015-10-31 10:31 ` Thomas Gleixner
  2015-10-31 13:02   ` George Spelvin
  2015-11-01  4:32 ` Manfred Spraul
  2 siblings, 1 reply; 12+ messages in thread
From: Thomas Gleixner @ 2015-10-31 10:31 UTC (permalink / raw)
  To: Sebastian Andrzej Siewior
  Cc: linux-kernel, Davidlohr Bueso, Manfred Spraul, Andrew Morton,
	George Spelvin, Peter Zijlstra

[-- Attachment #1: Type: TEXT/PLAIN, Size: 4455 bytes --]

On Fri, 30 Oct 2015, Sebastian Andrzej Siewior wrote:
> This was just compile tested. It would be nice if somone with real
> workload could test it. Otherwise I could hack something myself and check
> if it still works.

ltp and glibc should have tests at least at the functional level
 
> -static void expunge_all(struct msg_queue *msq, int res)
> +static void expunge_all(struct msg_queue *msq, int res,
> +			struct wake_q_head *wake_q)
>  {
>  	struct msg_receiver *msr, *t;
>  
>  	list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
> -		msr->r_msg = NULL; /* initialize expunge ordering */

Don't we need to keep that NULL init? I might be missing something.

> -		wake_up_process(msr->r_tsk);
> -		/*
> -		 * Ensure that the wakeup is visible before setting r_msg as
> -		 * the receiving end depends on it: either spinning on a nil,
> -		 * or dealing with -EAGAIN cases. See lockless receive part 1
> -		 * and 2 in do_msgrcv().
> -		 */
> -		smp_wmb(); /* barrier (B) */

Please keep the comment intact and fix it as it documents the mechanism
including the barriers. The barrier is now inside of wake_q_add().

> +
> +		wake_q_add(wake_q, msr->r_tsk);
>  		msr->r_msg = ERR_PTR(res);


> -static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
> +static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
> +				 struct wake_q_head *wake_q)
>  {
>  	struct msg_receiver *msr, *t;
>  
> @@ -577,27 +576,13 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
>  
>  			list_del(&msr->r_list);
>  			if (msr->r_maxsize < msg->m_ts) {
> -				/* initialize pipelined send ordering */
> -				msr->r_msg = NULL;

See above.

> -				wake_up_process(msr->r_tsk);
> -				/* barrier (B) see barrier comment below */
> -				smp_wmb();

Please do not remove barrier documentation. Update it.

> +				wake_q_add(wake_q, msr->r_tsk);
>  				msr->r_msg = ERR_PTR(-E2BIG);
>  			} else {
> -				msr->r_msg = NULL;
>  				msq->q_lrpid = task_pid_vnr(msr->r_tsk);
>  				msq->q_rtime = get_seconds();
> -				wake_up_process(msr->r_tsk);
> -				/*
> -				 * Ensure that the wakeup is visible before
> -				 * setting r_msg, as the receiving can otherwise
> -				 * exit - once r_msg is set, the receiver can
> -				 * continue. See lockless receive part 1 and 2
> -				 * in do_msgrcv(). Barrier (B).
> -				 */

Ditto.

> -				smp_wmb();
> +				wake_q_add(wake_q, msr->r_tsk);
>  				msr->r_msg = msg;
> -
>  				return 1;
>  			}
>  		}

> @@ -932,57 +919,25 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
>  		rcu_read_lock();
>  
>  		/* Lockless receive, part 2:
> -		 * Wait until pipelined_send or expunge_all are outside of
> -		 * wake_up_process(). There is a race with exit(), see
> -		 * ipc/mqueue.c for the details. The correct serialization
> -		 * ensures that a receiver cannot continue without the wakeup
> -		 * being visibible _before_ setting r_msg:
> +		 * The work in pipelined_send() and expunge_all():
> +		 * - Set pointer to message
> +		 * - Queue the receiver task for later wakeup
> +		 * - Wake up the process after the lock is dropped.
>  		 *
> -		 * CPU 0                             CPU 1
> -		 * <loop receiver>
> -		 *   smp_rmb(); (A) <-- pair -.      <waker thread>
> -		 *   <load ->r_msg>           |        msr->r_msg = NULL;
> -		 *                            |        wake_up_process();
> -		 * <continue>                 `------> smp_wmb(); (B)
> -		 *                                     msr->r_msg = msg;
> -		 *
> -		 * Where (A) orders the message value read and where (B) orders
> -		 * the write to the r_msg -- done in both pipelined_send and
> -		 * expunge_all.
> +		 * Should the process wake up before this wakeup (due to a
> +		 * signal) it will either see the message and continue …

Please update the barrier documentation so it matches the new code.

>  		 */
> -		for (;;) {
> -			/*
> -			 * Pairs with writer barrier in pipelined_send
> -			 * or expunge_all.
> -			 */
> -			smp_rmb(); /* barrier (A) */
> -			msg = (struct msg_msg *)msr_d.r_msg;
> -			if (msg)
> -				break;
>  
> -			/*
> -			 * The cpu_relax() call is a compiler barrier
> -			 * which forces everything in this loop to be
> -			 * re-loaded.
> -			 */
> -			cpu_relax();
> -		}

Are you sure that this is not longer required? If so then please
explain and update the documentation accordingly.

Thanks,

	tglx

^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [PATCH] ipc/msg: Implement lockless pipelined wakeups
  2015-10-31 10:31 ` Thomas Gleixner
@ 2015-10-31 13:02   ` George Spelvin
  2015-10-31 15:17     ` Thomas Gleixner
  2015-10-31 18:54     ` Davidlohr Bueso
  0 siblings, 2 replies; 12+ messages in thread
From: George Spelvin @ 2015-10-31 13:02 UTC (permalink / raw)
  To: bigeasy, tglx; +Cc: akpm, dave, linux-kernel, linux, manfred, peterz

> Don't we need to keep that NULL init? I might be missing something.

I wondered the same thing, but on reading it, the cleanup is that he's
gotten rid of the need for the entire thing.  Previously, there was a
mechanism for detecting "wakeup not quite finished" that used a NULL
value, but it's no longer needed.

The resultant busy-waiting on the part of the woken-up task was the
entire problem this patch aims to fix.  So it gets rid of a whole lot
of code and barriers.  And, as you noticed, the comments explaining them.

As the old code explained, the issue is that a task may exit as
soon as r_msg is set, so the wakeup procedure has to be:
- Ensure r_msg is set to NULL (special-case flag)
- Do the wake up
- Set r_msg to the final value

The woken-up task has to spin as long as r_msg is NULL.  Ick.

However, a wake_q keeps a reference to a task, so exiting is
not a danger.  As long as wake_q_add precedes setting r_msg,
all is well.

^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [PATCH] ipc/msg: Implement lockless pipelined wakeups
  2015-10-31 13:02   ` George Spelvin
@ 2015-10-31 15:17     ` Thomas Gleixner
  2015-10-31 18:54     ` Davidlohr Bueso
  1 sibling, 0 replies; 12+ messages in thread
From: Thomas Gleixner @ 2015-10-31 15:17 UTC (permalink / raw)
  To: George Spelvin; +Cc: bigeasy, akpm, dave, linux-kernel, manfred, peterz

On Sat, 31 Oct 2015, George Spelvin wrote:
> > Don't we need to keep that NULL init? I might be missing something.
> 
> I wondered the same thing, but on reading it, the cleanup is that he's
> gotten rid of the need for the entire thing.  Previously, there was a
> mechanism for detecting "wakeup not quite finished" that used a NULL
> value, but it's no longer needed.
> 
> The resultant busy-waiting on the part of the woken-up task was the
> entire problem this patch aims to fix.  So it gets rid of a whole lot
> of code and barriers.  And, as you noticed, the comments explaining them.
> 
> As the old code explained, the issue is that a task may exit as
> soon as r_msg is set, so the wakeup procedure has to be:
> - Ensure r_msg is set to NULL (special-case flag)
> - Do the wake up
> - Set r_msg to the final value
> 
> The woken-up task has to spin as long as r_msg is NULL.  Ick.
> 
> However, a wake_q keeps a reference to a task, so exiting is
> not a danger.  As long as wake_q_add precedes setting r_msg,
> all is well.

Right. I figured that after staring some more into it. Though it would
be nice if exactly that explanation is in the code.

Thanks,

	tglx


^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [PATCH] ipc/msg: Implement lockless pipelined wakeups
  2015-10-31 13:02   ` George Spelvin
  2015-10-31 15:17     ` Thomas Gleixner
@ 2015-10-31 18:54     ` Davidlohr Bueso
  2015-10-31 19:06       ` Davidlohr Bueso
  2015-11-03 14:11       ` Sebastian Andrzej Siewior
  1 sibling, 2 replies; 12+ messages in thread
From: Davidlohr Bueso @ 2015-10-31 18:54 UTC (permalink / raw)
  To: George Spelvin; +Cc: bigeasy, tglx, akpm, linux-kernel, manfred, peterz

On Sat, 31 Oct 2015, George Spelvin wrote:

>> Don't we need to keep that NULL init? I might be missing something.
>
>I wondered the same thing, but on reading it, the cleanup is that he's
>gotten rid of the need for the entire thing.  Previously, there was a
>mechanism for detecting "wakeup not quite finished" that used a NULL
>value, but it's no longer needed.
>
>The resultant busy-waiting on the part of the woken-up task was the
>entire problem this patch aims to fix.  So it gets rid of a whole lot
>of code and barriers.  And, as you noticed, the comments explaining them.
>
>As the old code explained, the issue is that a task may exit as
>soon as r_msg is set, so the wakeup procedure has to be:
>- Ensure r_msg is set to NULL (special-case flag)
>- Do the wake up
>- Set r_msg to the final value
>
>The woken-up task has to spin as long as r_msg is NULL.  Ick.

I agree this is a nice cleanup. It is similar to what we did
with posix mqueues, but this one gets rid of even more code afaict.

>However, a wake_q keeps a reference to a task, so exiting is
>not a danger.  As long as wake_q_add precedes setting r_msg,
>all is well.

Yes, and this confirms that we still rely on the implicit barrier
from the cmpxchg as tglx mentioned. As such, we also need to keep
the pairing when reading 'r_msg' in do_msgrcv(), instead of dropping
the comments.

Thanks,
Davidlohr

^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [PATCH] ipc/msg: Implement lockless pipelined wakeups
  2015-10-31 18:54     ` Davidlohr Bueso
@ 2015-10-31 19:06       ` Davidlohr Bueso
  2015-11-03 14:15         ` Sebastian Andrzej Siewior
  2015-11-03 14:11       ` Sebastian Andrzej Siewior
  1 sibling, 1 reply; 12+ messages in thread
From: Davidlohr Bueso @ 2015-10-31 19:06 UTC (permalink / raw)
  To: George Spelvin; +Cc: bigeasy, tglx, akpm, linux-kernel, manfred, peterz

On Sat, 31 Oct 2015, Bueso wrote:

>Yes, and this confirms that we still rely on the implicit barrier
>from the cmpxchg as tglx mentioned. As such, we also need to keep
>the pairing when reading 'r_msg' in do_msgrcv(), instead of dropping
>the comments.

Hmm having r_msg as volatile seems even less needed now, we should
drop it. I imagine it was there initially for the busy-wait on the variable
becoming non-nil, but we had cpu_relax _anyway_, so ...

^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [PATCH] ipc/msg: Implement lockless pipelined wakeups
  2015-10-30 11:26 [PATCH] ipc/msg: Implement lockless pipelined wakeups Sebastian Andrzej Siewior
  2015-10-30 22:30 ` George Spelvin
  2015-10-31 10:31 ` Thomas Gleixner
@ 2015-11-01  4:32 ` Manfred Spraul
  2015-11-03 14:51   ` Sebastian Andrzej Siewior
  2 siblings, 1 reply; 12+ messages in thread
From: Manfred Spraul @ 2015-11-01  4:32 UTC (permalink / raw)
  To: Sebastian Andrzej Siewior, linux-kernel
  Cc: Davidlohr Bueso, Andrew Morton, George Spelvin, Thomas Gleixner,
	Peter Zijlstra

[-- Attachment #1: Type: text/plain, Size: 901 bytes --]

Hi Sebastian,

On 10/30/2015 12:26 PM, Sebastian Andrzej Siewior wrote:
> This patch moves the wakeup_process() invocation so it is not done under
> the perm->lock by making use of a lockless wake_q. With this change, the
> waiter is woken up once the message has been assigned and it does not
> need to loop on SMP if the message points to NULL. In the signal case we
> still need to check the pointer under the lock to verify the state.
>
> This change should also avoid the introduction of preempt_disable() in
> -RT which avoids a busy-loop which pools for the NULL -> !NULL
> change if the waiter has a higher priority compared to the waker.
with regards to functional tests:
fakeroot is a heavy system V user (at least if it is configured for sysv).
"make -j" under fakeroot was a useful stresstest

with regards to benchmarks:
I've attached one of my files.

Otherwise: Nice!

--
     Manfred


[-- Attachment #2: pmsg-shared.cpp --]
[-- Type: text/x-c++src, Size: 5187 bytes --]

/*
 * pmsg.cpp, parallel sysv msg pingpong
 *
 * Copyright (C) 1999, 2001, 2005, 2008 by Manfred Spraul.
 *	All rights reserved except the rights granted by the GPL.
 *
 * Redistribution of this file is permitted under the terms of the GNU 
 * General Public License (GPL) version 2 or later.
 * $Header$
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <getopt.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <pthread.h>

//////////////////////////////////////////////////////////////////////////////

static enum {
	WAITING,
	RUNNING,
	STOPPED,
} volatile g_state = WAITING;

unsigned long long *g_results;
int g_svmsg_id;
pthread_t *g_threads;

struct taskinfo {
	int svmsg_id;
	int threadid;
	int cpuid;
	int sender;
	int offset;
};

#define DATASIZE	16

void* worker_thread(void *arg)
{
	struct taskinfo *ti = (struct taskinfo*)arg;
	unsigned long long rounds;
	int ret;
	struct {
		long mtype;
		unsigned char buffer[DATASIZE];
	} mbuf;

	{
		cpu_set_t cpus;
		CPU_ZERO(&cpus);
		CPU_SET(ti->cpuid, &cpus);

		ret = pthread_setaffinity_np(g_threads[ti->threadid], sizeof(cpus), &cpus);
		if (ret < 0) {
			printf("pthread_setaffinity_np failed for thread %d with errno %d.\n",
					ti->threadid, errno);
		}

		ret = pthread_getaffinity_np(g_threads[ti->threadid], sizeof(cpus), &cpus);
		if (ret < 0) {
			printf("pthread_getaffinity_np() failed for thread %d with errno %d.\n",
					ti->threadid, errno);
			fflush(stdout);
		} else {
			printf("thread %d: sysvmsg %8d, offset 0x%08x type %d bound to %04lxh\n",ti->threadid,
					ti->svmsg_id, ti->offset, ti->sender, cpus.__bits[0]);
		}
		fflush(stdout);
	}

	rounds = 0;
	while(g_state == WAITING) {
#ifdef __i386__
		__asm__ __volatile__("pause": : :"memory");
#endif
	}

	if (ti->sender) {
		mbuf.mtype = ti->offset+ti->sender+1;
		mbuf.buffer[0] = ti->offset & 0xff;
		mbuf.buffer[1] = (ti->offset >> 8) & 0xff;
		ret = msgsnd(ti->svmsg_id, &mbuf, DATASIZE, 0);
		if (ret != 0) {
			printf("Initial send failed, errno %d.\n", errno);
			exit(1);
		}
	}
	while(g_state == RUNNING) {
		int target = ti->offset+1+!ti->sender;

		ret = msgrcv(ti->svmsg_id, &mbuf, DATASIZE, target, 0);
		if (ret != DATASIZE) {
			if (errno == EIDRM)
				break;
			printf("Error on msgrcv, got %d, errno %d.\n", ret, errno);
			exit(1);
		}
		if ((mbuf.buffer[0] != (unsigned)(ti->offset & 0xff)) || 
				(mbuf.buffer[1] != (unsigned)((ti->offset >> 8) & 0xff))) {
			printf("Error - incorrect message received.\n");
			printf("cpu %d ti->offset 0x%08x ti->sender %d.\n",
					ti->cpuid, ti->offset, ti->sender);

			printf("got %02x%02x.\n",
					(unsigned char)mbuf.buffer[0],
					(unsigned char)mbuf.buffer[1]);
			exit(1);
		}
		mbuf.mtype = ti->offset+ti->sender+1;
		ret = msgsnd(ti->svmsg_id, &mbuf, DATASIZE, 0);
		if (ret != 0) {
			if (errno == EIDRM)
				break;
			printf("send failed, errno %d.\n", errno);
			exit(1);
		}
		rounds++;
	}
	/* store result */
	g_results[ti->threadid] = rounds;

	pthread_exit(0);
	return NULL;
}

void init_threads(int cpu, int cpus)
{
	int ret;
	struct taskinfo *ti1, *ti2;

	ti1 = new (struct taskinfo);
	ti2 = new (struct taskinfo);
	if (!ti1 || !ti2) {
		printf("Could not allocate task info\n");
		exit(1);
	}

	if (cpu == 0) {
		g_svmsg_id = msgget(IPC_PRIVATE,0777|IPC_CREAT);
		if(g_svmsg_id == -1) {
			printf(" message queue create failed.\n");
			exit(1);
		}
	}

	g_results[cpu] = 0;
	g_results[cpu+cpus] = 0;

	ti1->svmsg_id = g_svmsg_id;
	ti1->offset = 3*cpu+5;
	ti1->threadid = cpu;
	ti1->cpuid = cpu;
	ti1->sender = 1;
	ti2->svmsg_id = g_svmsg_id;
	ti2->offset = ti1->offset;
	ti2->threadid = cpu+cpus;
	ti2->cpuid = cpu;
	ti2->sender = 0;

	ret = pthread_create(&g_threads[ti1->threadid], NULL, worker_thread, ti1);
	if (ret) {
		printf(" pthread_create failed with error code %d\n", ret);
		exit(1);
	}
	ret = pthread_create(&g_threads[ti2->threadid], NULL, worker_thread, ti2);
	if (ret) {
		printf(" pthread_create failed with error code %d\n", ret);
		exit(1);
	}
}

//////////////////////////////////////////////////////////////////////////////

int main(int argc, char **argv)
{
	int queues, timeout;
	unsigned long long totals;
	int i;
	int res;

	printf("pmsg [nr queues] [timeout]\n");
	if (argc != 3) {
		printf(" Invalid parameters.\n");
		return 0;
	}
	queues = atoi(argv[1]);
	timeout = atoi(argv[2]);
	printf("Using %d queues/cpus (%d threads) for %d seconds.\n",
			queues, 2*queues, timeout);

	g_results = new unsigned long long[2*queues];
	g_threads = new pthread_t[2*queues];
	for (i=0;i<queues;i++) {
		init_threads(i, queues);
	}

	sleep(1);
	g_state = RUNNING;
	sleep(timeout);
	g_state = STOPPED;
	sleep(1);

	res = msgctl(g_svmsg_id,IPC_RMID,NULL);
	if (res < 0) {
		printf("msgctl(IPC_RMID) failed for %d, errno%d.\n",
			g_svmsg_id, errno);
	}
	for (i=0;i<2*queues;i++)
		pthread_join(g_threads[i], NULL);

	printf("Result matrix:\n");
	totals = 0;
	for (i=0;i<queues;i++) {
		printf("  Thread %3d: %8lld     %3d: %8lld\n",
				i, g_results[i], i+queues, g_results[i+queues]);
		totals += g_results[i] + g_results[i+queues];
	}
	printf("Total: %lld\n", totals);
}

^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [PATCH] ipc/msg: Implement lockless pipelined wakeups
  2015-10-30 22:30 ` George Spelvin
@ 2015-11-03 14:01   ` Sebastian Andrzej Siewior
  0 siblings, 0 replies; 12+ messages in thread
From: Sebastian Andrzej Siewior @ 2015-11-03 14:01 UTC (permalink / raw)
  To: George Spelvin, linux-kernel; +Cc: akpm, dave, manfred, peterz, tglx

On 10/30/2015 11:30 PM, George Spelvin wrote:
> One very minor kibitz: would you mind using "..." for ellipsis in the
> comments rather than U+2026?
mkay

Sebastian

^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [PATCH] ipc/msg: Implement lockless pipelined wakeups
  2015-10-31 18:54     ` Davidlohr Bueso
  2015-10-31 19:06       ` Davidlohr Bueso
@ 2015-11-03 14:11       ` Sebastian Andrzej Siewior
  1 sibling, 0 replies; 12+ messages in thread
From: Sebastian Andrzej Siewior @ 2015-11-03 14:11 UTC (permalink / raw)
  To: Davidlohr Bueso, George Spelvin; +Cc: tglx, akpm, linux-kernel, manfred, peterz

On 10/31/2015 07:54 PM, Davidlohr Bueso wrote:
>> However, a wake_q keeps a reference to a task, so exiting is
>> not a danger.  As long as wake_q_add precedes setting r_msg,
>> all is well.
> 
> Yes, and this confirms that we still rely on the implicit barrier
> from the cmpxchg as tglx mentioned. As such, we also need to keep
> the pairing when reading 'r_msg' in do_msgrcv(), instead of dropping
> the comments.

Okay, I took the memory-barrier comment from your mqueue patch and
adjusted.

> 
> Thanks,
> Davidlohr

Sebastian

^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [PATCH] ipc/msg: Implement lockless pipelined wakeups
  2015-10-31 19:06       ` Davidlohr Bueso
@ 2015-11-03 14:15         ` Sebastian Andrzej Siewior
  0 siblings, 0 replies; 12+ messages in thread
From: Sebastian Andrzej Siewior @ 2015-11-03 14:15 UTC (permalink / raw)
  To: Davidlohr Bueso, George Spelvin; +Cc: tglx, akpm, linux-kernel, manfred, peterz

On 10/31/2015 08:06 PM, Davidlohr Bueso wrote:
> On Sat, 31 Oct 2015, Bueso wrote:
> 
>> Yes, and this confirms that we still rely on the implicit barrier
>> from the cmpxchg as tglx mentioned. As such, we also need to keep
>> the pairing when reading 'r_msg' in do_msgrcv(), instead of dropping
>> the comments.
> 
> Hmm having r_msg as volatile seems even less needed now, we should
> drop it. I imagine it was there initially for the busy-wait on the variable
> becoming non-nil, but we had cpu_relax _anyway_, so ...

Didn't noticed that. So that volatile part and the casts are gone, too.

Sebastian

^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [PATCH] ipc/msg: Implement lockless pipelined wakeups
  2015-11-01  4:32 ` Manfred Spraul
@ 2015-11-03 14:51   ` Sebastian Andrzej Siewior
  0 siblings, 0 replies; 12+ messages in thread
From: Sebastian Andrzej Siewior @ 2015-11-03 14:51 UTC (permalink / raw)
  To: Manfred Spraul, linux-kernel
  Cc: Davidlohr Bueso, Andrew Morton, George Spelvin, Thomas Gleixner,
	Peter Zijlstra

On 11/01/2015 05:32 AM, Manfred Spraul wrote:
> Hi Sebastian,

Hi Manfred,

> with regards to functional tests:
> fakeroot is a heavy system V user (at least if it is configured for sysv).
> "make -j" under fakeroot was a useful stresstest

I don't see the msgsnd/msgrcv syscalls here. I do see others, sem
related for instance….

> with regards to benchmarks:
> I've attached one of my files.

Thank you. On 'AMD A10-7800 Radeon R7, 12 Compute Cores 4C+8G' I have
here:

test             |  before    |   after    | diff
-----------------|------------|-------------
pmsg-shared 8 60 | 52,386,363 | 56,455,763 | + ~7.77%
pmsg-shared 2 60 | 69,200,531 | 73,346,420 | + ~5.99%

so it improved a little :)

> Otherwise: Nice!
> 
> -- 
>     Manfred
> 

Sebastian

^ permalink raw reply	[flat|nested] 12+ messages in thread

end of thread, other threads:[~2015-11-03 14:52 UTC | newest]

Thread overview: 12+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2015-10-30 11:26 [PATCH] ipc/msg: Implement lockless pipelined wakeups Sebastian Andrzej Siewior
2015-10-30 22:30 ` George Spelvin
2015-11-03 14:01   ` Sebastian Andrzej Siewior
2015-10-31 10:31 ` Thomas Gleixner
2015-10-31 13:02   ` George Spelvin
2015-10-31 15:17     ` Thomas Gleixner
2015-10-31 18:54     ` Davidlohr Bueso
2015-10-31 19:06       ` Davidlohr Bueso
2015-11-03 14:15         ` Sebastian Andrzej Siewior
2015-11-03 14:11       ` Sebastian Andrzej Siewior
2015-11-01  4:32 ` Manfred Spraul
2015-11-03 14:51   ` Sebastian Andrzej Siewior

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox