From: Fam Zheng <famz@redhat.com>
To: Paolo Bonzini <pbonzini@redhat.com>
Cc: qemu-devel@nongnu.org, stefanha@redhat.com
Subject: Re: [Qemu-devel] [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux
Date: Thu, 12 Jan 2017 21:34:45 +0800
Message-ID: <20170112133445.GE26504@lemon>
In-Reply-To: <20170104132625.28059-5-pbonzini@redhat.com>

On Wed, 01/04 14:26, Paolo Bonzini wrote:
> diff --git a/include/qemu/futex.h b/include/qemu/futex.h
> new file mode 100644
> index 0000000..852d612
> --- /dev/null
> +++ b/include/qemu/futex.h
> @@ -0,0 +1,36 @@
> +/*
> + * Wrappers around Linux futex syscall
> + *
> + * Copyright Red Hat, Inc. 2015

Should this be 2015-2017?

> + *
> + * Author:
> + *  Paolo Bonzini <pbonzini@redhat.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + *
> + */
> +
> +#include <sys/syscall.h>
> +#include <linux/futex.h>
> +
> +#define qemu_futex(...)              syscall(__NR_futex, __VA_ARGS__)
> +
> +static inline void qemu_futex_wake(void *f, int n)
> +{
> +    qemu_futex(f, FUTEX_WAKE, n, NULL, NULL, 0);
> +}
> +
> +static inline void qemu_futex_wait(void *f, unsigned val)
> +{
> +    while (qemu_futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) {
> +        switch (errno) {
> +        case EWOULDBLOCK:
> +            return;
> +        case EINTR:
> +            break; /* get out of switch and retry */
> +        default:
> +            abort();
> +        }
> +    }
> +}
> diff --git a/util/lockcnt.c b/util/lockcnt.c
> index 78ed1e4..40cc02a 100644
> --- a/util/lockcnt.c
> +++ b/util/lockcnt.c
> @@ -9,7 +9,288 @@
>  #include "qemu/osdep.h"
>  #include "qemu/thread.h"
>  #include "qemu/atomic.h"
> +#include "trace.h"
>  
> +#ifdef CONFIG_LINUX
> +#include "qemu/futex.h"
> +
> +/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter.
> + * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok,
> + * this is not the most relaxing citation I could make...).  It is similar
> + * to mutex2 in the paper.
> + */
> +
> +#define QEMU_LOCKCNT_STATE_MASK    3
> +#define QEMU_LOCKCNT_STATE_FREE    0
> +#define QEMU_LOCKCNT_STATE_LOCKED  1

The macro names don't fully describe the semantics. I guess you want to keep
them short, but that makes the mutex implementation hard to follow without
reading the paper. How about adding a comment saying that "locked" means
"locked but _not waited_" and "waiting" means "_locked_ and waited"? It's up
to you, since this is trivial compared to the real complexity of this
patch. :)
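
Something along these lines, as a rough sketch. Only the macro names and
values come from the patch; the comment wording and the helper functions
(lockcnt_state, lockcnt_counter, lockcnt_decode_example) are hypothetical
and only there to show how a value splits into state and counter:

```c
#include <assert.h>

#define QEMU_LOCKCNT_STATE_MASK    3
#define QEMU_LOCKCNT_STATE_FREE    0  /* not locked */
#define QEMU_LOCKCNT_STATE_LOCKED  1  /* locked, but not waited on */
#define QEMU_LOCKCNT_STATE_WAITING 2  /* locked, and possibly waited on */
#define QEMU_LOCKCNT_COUNT_STEP    4
#define QEMU_LOCKCNT_COUNT_SHIFT   2

/* Hypothetical helper: bits 0-1 hold the lock state. */
static inline int lockcnt_state(int val)
{
    return val & QEMU_LOCKCNT_STATE_MASK;
}

/* Hypothetical helper: bits 2-31 hold the counter. */
static inline int lockcnt_counter(int val)
{
    return val >> QEMU_LOCKCNT_COUNT_SHIFT;
}

/* Decode one sample value: counter 3, lock held, waiters assumed. */
static inline void lockcnt_decode_example(void)
{
    int val = 3 * QEMU_LOCKCNT_COUNT_STEP + QEMU_LOCKCNT_STATE_WAITING;

    assert(lockcnt_state(val) == QEMU_LOCKCNT_STATE_WAITING);
    assert(lockcnt_counter(val) == 3);
}
```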

> +#define QEMU_LOCKCNT_STATE_WAITING 2
> +
> +#define QEMU_LOCKCNT_COUNT_STEP    4
> +#define QEMU_LOCKCNT_COUNT_SHIFT   2
> +
> +void qemu_lockcnt_init(QemuLockCnt *lockcnt)
> +{
> +    lockcnt->count = 0;
> +}
> +
> +void qemu_lockcnt_destroy(QemuLockCnt *lockcnt)
> +{
> +}
> +
> +/* *val is the current value of lockcnt->count.
> + *
> + * If the lock is free, try a cmpxchg from *val to new_if_free; return
> + * true and set *val to the old value found by the cmpxchg in
> + * lockcnt->count.
> + *
> + * If the lock is taken, wait for it to be released and return false
> + * *without trying again to take the lock*.  Again, set *val to the
> + * new value of lockcnt->count.
> + *
> + * new_if_free's bottom two bits must not be QEMU_LOCKCNT_STATE_LOCKED
> + * if calling this function a second time after it has returned
> + * false.

"and waited"? I think it is possible for this function to return false with
the lock actually being free, when ...

> + */
> +static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val,
> +                                         int new_if_free, bool *waited)
> +{
> +    /* Fast path for when the lock is free.  */
> +    if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) {
> +        int expected = *val;
> +
> +        trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free);
> +        *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free);
> +        if (*val == expected) {
> +            trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free);
> +            *val = new_if_free;
> +            return true;
> +        }
> +    }
> +
> +    /* The slow path moves from locked to waiting if necessary, then
> +     * does a futex wait.  Both steps can be repeated ad nauseam,
> +     * only getting out of the loop if we can have another shot at the
> +     * fast path.  Once we can, get out to compute the new destination
> +     * value for the fast path.
> +     */
> +    while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) {
> +        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) {
> +            int expected = *val;
> +            int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING;
> +
> +            trace_lockcnt_futex_wait_prepare(lockcnt, expected, new);

... the holder thread releases the lock at this point. In this case a second
call to this function in qemu_lockcnt_dec_and_lock does pass
QEMU_LOCKCNT_STATE_LOCKED in new_if_free, because 'waited' is left false there.
The code is okay, but the comment above is too strict.
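
To make the interleaving concrete, here is a single-threaded replay of it.
This is a sketch under assumptions, not the patch's code: cmpxchg() is a
hypothetical stand-in for atomic_cmpxchg() built on C11 atomics, the macro
names are shortened, the holder's release is modelled as simply clearing the
state bits, and the futex wait is replaced by a break because this
interleaving never reaches it.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

#define STATE_MASK    3
#define STATE_FREE    0
#define STATE_LOCKED  1
#define STATE_WAITING 2

/* Hypothetical stand-in for atomic_cmpxchg(): returns the old value. */
static int cmpxchg(atomic_int *p, int expected, int new_val)
{
    /* On failure, C11 stores the current value into 'expected'. */
    atomic_compare_exchange_strong(p, &expected, new_val);
    return expected;
}

/* Replay: the holder unlocks after the waiter has read LOCKED but before
 * the waiter's LOCKED -> WAITING cmpxchg.  On return, *val is the value the
 * waiter ends up with and *waited tells whether it would have slept.
 */
static void replay_release_before_cmpxchg(int *val, bool *waited)
{
    atomic_int count = STATE_LOCKED;   /* counter 0, locked, no waiters */

    *waited = false;
    *val = atomic_load(&count);        /* waiter observes LOCKED */

    /* Holder releases the lock right here. */
    atomic_store(&count, *val & ~STATE_MASK);

    /* Waiter's slow path, following the loop in the patch. */
    while ((*val & STATE_MASK) != STATE_FREE) {
        if ((*val & STATE_MASK) == STATE_LOCKED) {
            int expected = *val;
            int new_val = expected - STATE_LOCKED + STATE_WAITING;

            *val = cmpxchg(&count, expected, new_val);  /* fails here */
            if (*val == expected) {
                *val = new_val;
            }
            continue;
        }
        /* WAITING: the real code sets *waited and calls qemu_futex_wait(). */
        *waited = true;
        break;
    }
    /* The patch's function returns false at this point. */
}
```

After the replay the waiter sees a free lock and 'waited' is still false, so
the caller's next attempt passes QEMU_LOCKCNT_STATE_LOCKED again.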

> +            *val = atomic_cmpxchg(&lockcnt->count, expected, new);
> +            if (*val == expected) {
> +                *val = new;
> +            }
> +            continue;
> +        }
> +
> +        if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) {
> +            *waited = true;
> +            trace_lockcnt_futex_wait(lockcnt, *val);
> +            qemu_futex_wait(&lockcnt->count, *val);
> +            *val = atomic_read(&lockcnt->count);
> +            trace_lockcnt_futex_wait_resume(lockcnt, *val);
> +            continue;
> +        }
> +
> +        abort();
> +    }
> +    return false;
> +}
> +
> +void qemu_lockcnt_inc(QemuLockCnt *lockcnt)
> +{
> +    int val = atomic_read(&lockcnt->count);
> +    bool waited = false;
> +
> +    for (;;) {
> +        if (val >= QEMU_LOCKCNT_COUNT_STEP) {
> +            int expected = val;
> +            val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP);
> +            if (val == expected) {
> +                break;
> +            }
> +        } else {
> +            /* The fast path is (0, unlocked)->(1, unlocked).  */
> +            if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP,
> +                                             &waited)) {
> +                break;
> +            }
> +        }
> +    }
> +
> +    /* If we were woken by another thread, we should also wake one because
> +     * we are effectively releasing the lock that was given to us.  This is
> +     * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING
> +     * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and
> +     * wake someone.
> +     */
> +    if (waited) {
> +        lockcnt_wake(lockcnt);
> +    }
> +}
> +
> +/* Decrement a counter, and return locked if it is decremented to zero.
> + * If the function returns true, it is impossible for the counter to
> + * become nonzero until the next qemu_lockcnt_unlock.
> + */
> +bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt)
> +{
> +    int val = atomic_read(&lockcnt->count);
> +    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
> +    bool waited = false;
> +
> +    for (;;) {
> +        if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) {
> +            int expected = val;
> +            int new = val - QEMU_LOCKCNT_COUNT_STEP;
> +            val = atomic_cmpxchg(&lockcnt->count, val, new);
> +            if (val == expected) {
> +                break;
> +            }

If (val != expected && val >= 2 * QEMU_LOCKCNT_COUNT_STEP), should this
atomic_cmpxchg be retried before trying qemu_lockcnt_cmpxchg_or_wait?
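
As far as I can tell, if the cmpxchg fails and the fresh val is still
>= 2 * QEMU_LOCKCNT_COUNT_STEP with the lock free, the call below would try
a cmpxchg from val to locked_state, whose counter part is zero. A sketch of
the retry I have in mind follows. It covers only the "counter stays nonzero"
half; dec_if_above_one() and cmpxchg() are hypothetical names built on C11
atomics, not functions from the patch.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

#define COUNT_STEP 4

/* Hypothetical stand-in for atomic_cmpxchg(): returns the old value. */
static int cmpxchg(atomic_int *p, int expected, int new_val)
{
    /* On failure, C11 stores the current value into 'expected'. */
    atomic_compare_exchange_strong(p, &expected, new_val);
    return expected;
}

/* Decrement the counter only while it is at least 2, retrying on races.
 * Returns true if it decremented.  Returns false once the counter is 1 or
 * 0, which is where the caller would move on to the 1->0 path
 * (qemu_lockcnt_cmpxchg_or_wait in the patch).
 */
static bool dec_if_above_one(atomic_int *count)
{
    int val = atomic_load(count);

    while (val >= 2 * COUNT_STEP) {
        int expected = val;

        val = cmpxchg(count, expected, expected - COUNT_STEP);
        if (val == expected) {
            return true;
        }
        /* Lost a race: val holds the fresh value, so re-check it and retry
         * the plain decrement instead of falling through.
         */
    }
    return false;
}
```

Putting the 1->0 path in an else branch of the existing if would have the
same effect within the for (;;) loop.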

> +        }
> +
> +        /* If count is going 1->0, take the lock. The fast path is
> +         * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
> +         */
> +        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
> +            return true;
> +        }
> +
> +        if (waited) {
> +            /* At this point we do not know if there are more waiters.  Assume
> +             * there are.
> +             */
> +            locked_state = QEMU_LOCKCNT_STATE_WAITING;
> +        }
> +    }
> +
> +    /* If we were woken by another thread, but we're returning in unlocked
> +     * state, we should also wake a thread because we are effectively
> +     * releasing the lock that was given to us.  This is the case where
> +     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
> +     * bits, and qemu_lockcnt_unlock would find it and wake someone.
> +     */
> +    if (waited) {
> +        lockcnt_wake(lockcnt);
> +    }
> +    return false;
> +}
> +
> +/* If the counter is one, decrement it and return locked.  Otherwise do
> + * nothing.
> + *
> + * If the function returns true, it is impossible for the counter to
> + * become nonzero until the next qemu_lockcnt_unlock.
> + */
> +bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt)
> +{
> +    int val = atomic_read(&lockcnt->count);
> +    int locked_state = QEMU_LOCKCNT_STATE_LOCKED;
> +    bool waited = false;
> +
> +    while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) {
> +        /* If count is going 1->0, take the lock. The fast path is
> +         * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting).
> +         */
> +        if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) {
> +            return true;
> +        }
> +
> +        if (waited) {
> +            /* At this point we do not know if there are more waiters.  Assume
> +             * there are.
> +             */
> +            locked_state = QEMU_LOCKCNT_STATE_WAITING;
> +        }
> +    }
> +
> +    /* If we were woken by another thread, but we're returning in unlocked
> +     * state, we should also wake a thread because we are effectively
> +     * releasing the lock that was given to us.  This is the case where
> +     * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low
> +     * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone.
> +     */
> +    if (waited) {
> +        lockcnt_wake(lockcnt);
> +    }
> +    return false;
> +}
> +
> +void qemu_lockcnt_lock(QemuLockCnt *lockcnt)
> +{
> +    int val = atomic_read(&lockcnt->count);
> +    int step = QEMU_LOCKCNT_STATE_LOCKED;
> +    bool waited = false;
> +
> +    /* The third argument is only used if the low bits of val are 0
> +     * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired
> +     * state.
> +     */
> +    while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) {
> +        if (waited) {
> +            /* At this point we do not know if there are more waiters.  Assume
> +             * there are.
> +             */
> +            step = QEMU_LOCKCNT_STATE_WAITING;
> +        }
> +    }
> +}
> +
> +void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt)
> +{
> +    int expected, new, val;
> +
> +    val = atomic_read(&lockcnt->count);
> +    do {
> +        expected = val;
> +        new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK;
> +        trace_lockcnt_unlock_attempt(lockcnt, val, new);
> +        val = atomic_cmpxchg(&lockcnt->count, val, new);
> +    } while (val != expected);
> +
> +    trace_lockcnt_unlock_success(lockcnt, val, new);
> +    if (val & QEMU_LOCKCNT_STATE_WAITING) {
> +        lockcnt_wake(lockcnt);
> +    }
> +}

Fam
