All of lore.kernel.org
 help / color / mirror / Atom feed
From: Jerin Jacob <jerin.jacob@caviumnetworks.com>
To: Jia He <hejianet@gmail.com>
Cc: dev@dpdk.org, olivier.matz@6wind.com,
	konstantin.ananyev@intel.com, bruce.richardson@intel.com,
	jianbo.liu@arm.com, hemant.agrawal@nxp.com,
	jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
	jia.he@hxt-semitech.com
Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when doing
Date: Tue, 7 Nov 2017 15:27:29 +0530	[thread overview]
Message-ID: <20171107095727.GA23010@jerin> (raw)
In-Reply-To: <c2ce8774-a1b6-edf6-444e-ee0981df7497@gmail.com>

-----Original Message-----
> Date: Tue, 7 Nov 2017 16:34:30 +0800
> From: Jia He <hejianet@gmail.com>
> To: Jerin Jacob <jerin.jacob@caviumnetworks.com>
> Cc: dev@dpdk.org, olivier.matz@6wind.com, konstantin.ananyev@intel.com,
>  bruce.richardson@intel.com, jianbo.liu@arm.com, hemant.agrawal@nxp.com,
>  jie2.liu@hxt-semitech.com, bing.zhao@hxt-semitech.com,
>  jia.he@hxt-semitech.com
> Subject: Re: [PATCH v2] ring: guarantee ordering of cons/prod loading when
>  doing
> User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101
>  Thunderbird/52.4.0
> 
> 
> 
> On 11/7/2017 12:36 PM, Jerin Jacob Wrote:
> > -----Original Message-----
> > 
> > On option could be to change the prototype of update_tail() and make
> > compiler accommodate it for zero cost for arm64(Which I think, it it the
> > case. But you can check the generated instructions)
> > If not, move, __rte_ring_do_dequeue() and __rte_ring_do_enqueue() instead of
> > __rte_ring_move_prod_head/__rte_ring_move_cons_head/update_tail()
> > 
> > 
> > ➜ [master][dpdk.org] $ git diff
> > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> > index 5e9b3b7b4..b32648825 100644
> > --- a/lib/librte_ring/rte_ring.h
> > +++ b/lib/librte_ring/rte_ring.h
> > @@ -358,8 +358,12 @@ void rte_ring_dump(FILE *f, const struct rte_ring
> > *r);
> >   static __rte_always_inline void
> >   update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
> > new_val,
> > -               uint32_t single)
> > +               uint32_t single, const uint32_t enqueue)
> >   {
> > +       if (enqueue)
> > +               rte_smp_wmb();
> > +       else
> > +               rte_smp_rmb();
> >          /*
> >           * If there are other enqueues/dequeues in progress that
> >           * preceded us,
> >           * we need to wait for them to complete
> > @@ -470,9 +474,8 @@ __rte_ring_do_enqueue(struct rte_ring *r, void *
> > const *obj_table,
> >                  goto end;
> >          ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
> > -       rte_smp_wmb();
> > -       update_tail(&r->prod, prod_head, prod_next, is_sp);
> > +       update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
> >   end:
> >          if (free_space != NULL)
> >                  *free_space = free_entries - n;
> > @@ -575,9 +578,8 @@ __rte_ring_do_dequeue(struct rte_ring *r, void
> > **obj_table,
> >                  goto end;
> >          DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
> > -       rte_smp_rmb();
> > -       update_tail(&r->cons, cons_head, cons_next, is_sc);
> > +       update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
> >   end:
> >          if (available != NULL)
> > 
> > 
> > 
> Hi Jerin, yes I knew this suggestion in update_tail.
> But what I mean is the rte_smp_rmb() in __rte_ring_move_cons_head and
> __rte_ring_move_pros_head:
> [option 1]
> +        *old_head = r->cons.head;
> +        rte_smp_rmb();
> +        const uint32_t prod_tail = r->prod.tail;
> 
> [option 2]
> +        *old_head = __atomic_load_n(&r->cons.head,
> +                    __ATOMIC_ACQUIRE);
> +        *old_head = r->cons.head;
> 
> ie.I wonder what is the suitable new config name to distinguish the above 2
> options?

Why?
If you fix the generic version with rte_smp_rmb() then we just need only
one config to differentiate between c11 vs generic. See comments below,

> Thanks for the patience :-)
> 
> see my drafted patch below, the marcro "PREFER":
> + */
> +
> +#ifndef _RTE_RING_C11_MEM_H_
> +#define _RTE_RING_C11_MEM_H_
> +
> +static __rte_always_inline void
> +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
> new_val,
> +        uint32_t single, uint32_t enqueue)
> +{
> +    /* Don't need wmb/rmb when we prefer to use load_acquire/
> +     * store_release barrier */
> +#ifndef PREFER
> +    if (enqueue)
> +        rte_smp_wmb();
> +    else
> +        rte_smp_rmb();
> +#endif

You can remove PREFER and let the "generic" version has this. For x86,
rte_smp_?mb() it will be NOOP. So no issue.

> +
> +    /*
> +     * If there are other enqueues/dequeues in progress that preceded us,
> +     * we need to wait for them to complete
> +     */
> +    if (!single)
> +        while (unlikely(ht->tail != old_val))
> +            rte_pause();
> +
> +#ifdef PREFER
> +    __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);

for c11 mem model version, it needs only __atomic_store_n version.

> +#else
> +    ht->tail = new_val;
> +#endif
> +}
> +
> +/**
> + * @internal This function updates the producer head for enqueue
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sp
> + *   Indicates whether multi-producer path is needed or not
> + * @param n
> + *   The number of elements we will want to enqueue, i.e. how far should
> the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where enqueue
> starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where enqueue finishes
> + * @param free_entries
> + *   Returns the amount of free space in the ring BEFORE head was moved
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> +        unsigned int n, enum rte_ring_queue_behavior behavior,
> +        uint32_t *old_head, uint32_t *new_head,
> +        uint32_t *free_entries)
> +{
> +    const uint32_t capacity = r->capacity;
> +    unsigned int max = n;
> +    int success;
> +
> +    do {
> +        /* Reset n to the initial burst count */
> +        n = max;
> +
> +#ifdef PREFER
> +        *old_head = __atomic_load_n(&r->prod.head,
> +                    __ATOMIC_ACQUIRE);
> +#else
> +        *old_head = r->prod.head;
> +        /* prevent reorder of load/load */
> +        rte_smp_rmb();
> +#endif

Same as above comment.

> +        const uint32_t cons_tail = r->cons.tail;
> +        /*
> +         *  The subtraction is done between two unsigned 32bits value
> +         * (the result is always modulo 32 bits even if we have
> +         * *old_head > cons_tail). So 'free_entries' is always between 0
> +         * and capacity (which is < size).
> +         */
> +        *free_entries = (capacity + cons_tail - *old_head);
> +
> +        /* check that we have enough room in ring */
> +        if (unlikely(n > *free_entries))

> +static __rte_always_inline unsigned int
> +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
> +         unsigned int n, enum rte_ring_queue_behavior behavior,
> +         int is_sp, unsigned int *free_space)
> +{


Duplicate function, No need to replicate on both versions.


> +static __rte_always_inline unsigned int
> +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> +        unsigned int n, enum rte_ring_queue_behavior behavior,
> +        uint32_t *old_head, uint32_t *new_head,
> +        uint32_t *entries)
> +{
> +    unsigned int max = n;
> +    int success;
> +
> +    /* move cons.head atomically */
> +    do {
> +        /* Restore n as it may change every loop */
> +        n = max;
> +#ifdef PREFER
> +        *old_head = __atomic_load_n(&r->cons.head,
> +                    __ATOMIC_ACQUIRE);
> +#else
> +        *old_head = r->cons.head;
> +        /*  prevent reorder of load/load */
> +        rte_smp_rmb();
> +#endif

Same as above comment

> +
> +        const uint32_t prod_tail = r->prod.tail;
> +        /* The subtraction is done between two unsigned 32bits value
> +         * (the result is always modulo 32 bits even if we have
> +         * cons_head > prod_tail). So 'entries' is always between 0
> +         * and size(ring)-1. */
> +        *entries = (prod_tail - *old_head);
> +
> +        /* Set the actual entries for dequeue */
> +        if (n > *entries)
> +            n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> +
> +        if (unlikely(n == 0))
> +            return 0;
> +
> +        *new_head = *old_head + n;
> +        if (is_sc)
> +            r->cons.head = *new_head, success = 1;
> +        else
> +#ifdef PREFER
> +            success = arch_rte_atomic32_cmpset(&r->cons.head,
> +                            old_head, *new_head,
> +                            0, __ATOMIC_ACQUIRE,
> +                            __ATOMIC_RELAXED);
> +#else
> +            success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> +                    *new_head);
> +#endif

Same as above comment

> +    } while (unlikely(success == 0));
> +    return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has
> finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
> +         unsigned int n, enum rte_ring_queue_behavior behavior,
> +         int is_sc, unsigned int *available)
> +{

Duplicate function, No need to replicate on both versions.


> +    uint32_t cons_head, cons_next;
> +    uint32_t entries;
> +
> +    n = __rte_ring_move_cons_head(r, is_sc, n, behavior,
> +            &cons_head, &cons_next, &entries);
> +    if (n == 0)
> +        goto end;
> +
> +    DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
> +
> +    update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
> +
> +end:
> +    if (available != NULL)
> +        *available = entries - n;
> +    return n;
> +}
> +
> +#endif /* _RTE_RING_C11_MEM_H_ */
> +
> diff --git a/lib/librte_ring/rte_ring_generic.h
> b/lib/librte_ring/rte_ring_generic.h
> new file mode 100644
> index 0000000..0ce6d57
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_generic.h
> @@ -0,0 +1,268 @@
> +/*-
> + *   BSD LICENSE
> + *
> + *   Copyright(c) 2017 hxt-semitech. All rights reserved.
> + *
> + *   Redistribution and use in source and binary forms, with or without
> + *   modification, are permitted provided that the following conditions
> + *   are met:
> + *
> + *     * Redistributions of source code must retain the above copyright
> + *       notice, this list of conditions and the following disclaimer.
> + *     * Redistributions in binary form must reproduce the above copyright
> + *       notice, this list of conditions and the following disclaimer in
> + *       the documentation and/or other materials provided with the
> + *       distribution.
> + *     * Neither the name of hxt-semitech nor the names of its
> + *       contributors may be used to endorse or promote products derived
> + *       from this software without specific prior written permission.
> + *
> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> + */
> +
> +#ifndef _RTE_RING_GENERIC_H_
> +#define _RTE_RING_GENERIC_H_
> +
> +static __rte_always_inline void
> +update_tail(struct rte_ring_headtail *ht, uint32_t old_val, uint32_t
> new_val,
> +        uint32_t single, uint32_t enqueue)
> +{
> +    if (enqueue)
> +        rte_smp_wmb();
> +    else
> +        rte_smp_rmb();
> +    /*
> +     * If there are other enqueues/dequeues in progress that preceded us,
> +     * we need to wait for them to complete
> +     */
> +    if (!single)
> +        while (unlikely(ht->tail != old_val))
> +            rte_pause();
> +
> +    ht->tail = new_val;
> +}
> +
> +/**
> + * @internal This function updates the producer head for enqueue
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sp
> + *   Indicates whether multi-producer path is needed or not
> + * @param n
> + *   The number of elements we will want to enqueue, i.e. how far should
> the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where enqueue
> starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where enqueue finishes
> + * @param free_entries
> + *   Returns the amount of free space in the ring BEFORE head was moved
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_move_prod_head(struct rte_ring *r, int is_sp,
> +        unsigned int n, enum rte_ring_queue_behavior behavior,
> +        uint32_t *old_head, uint32_t *new_head,
> +        uint32_t *free_entries)
> +{
> +    const uint32_t capacity = r->capacity;
> +    unsigned int max = n;
> +    int success;
> +
> +    do {
> +        /* Reset n to the initial burst count */
> +        n = max;
> +
> +        *old_head = r->prod.head;

adding rte_smp_rmb() no harm here as it is NOOP for x86 and it is
semantically correct too.


> +        const uint32_t cons_tail = r->cons.tail;
> +        /*
> +         *  The subtraction is done between two unsigned 32bits value
> +         * (the result is always modulo 32 bits even if we have
> +         * *old_head > cons_tail). So 'free_entries' is always between 0
> +         * and capacity (which is < size).
> +         */
> +        *free_entries = (capacity + cons_tail - *old_head);
> +
> +        /* check that we have enough room in ring */
> +        if (unlikely(n > *free_entries))
> +            n = (behavior == RTE_RING_QUEUE_FIXED) ?
> +                    0 : *free_entries;
> +
> +        if (n == 0)
> +            return 0;
> +
> +        *new_head = *old_head + n;
> +        if (is_sp)
> +            r->prod.head = *new_head, success = 1;
> +        else
> +            success = rte_atomic32_cmpset(&r->prod.head,
> +                    *old_head, *new_head);
> +    } while (unlikely(success == 0));
> +    return n;
> +}
> +
> +/**
> + * @internal Enqueue several objects on the ring
> + *
> +  * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer head update
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
> +         unsigned int n, enum rte_ring_queue_behavior behavior,
> +         int is_sp, unsigned int *free_space)
> +{

Duplicate function, No need to replicate on both versions.

> +
> +/**
> + * @internal This function updates the consumer head for dequeue
> + *
> + * @param r
> + *   A pointer to the ring structure
> + * @param is_sc
> + *   Indicates whether multi-consumer path is needed or not
> + * @param n
> + *   The number of elements we will want to enqueue, i.e. how far should
> the
> + *   head be moved
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param old_head
> + *   Returns head value as it was before the move, i.e. where dequeue
> starts
> + * @param new_head
> + *   Returns the current/new head value i.e. where dequeue finishes
> + * @param entries
> + *   Returns the number of entries in the ring BEFORE head was moved
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
> +        unsigned int n, enum rte_ring_queue_behavior behavior,
> +        uint32_t *old_head, uint32_t *new_head,
> +        uint32_t *entries)
> +{
> +    unsigned int max = n;
> +    int success;
> +
> +    /* move cons.head atomically */
> +    do {
> +        /* Restore n as it may change every loop */
> +        n = max;
> +
> +        *old_head = r->cons.head;
> +        const uint32_t prod_tail = r->prod.tail;

Same as above comment.

> +        /* The subtraction is done between two unsigned 32bits value
> +         * (the result is always modulo 32 bits even if we have
> +         * cons_head > prod_tail). So 'entries' is always between 0
> +         * and size(ring)-1. */
> +        *entries = (prod_tail - *old_head);
> +
> +        /* Set the actual entries for dequeue */
> +        if (n > *entries)
> +            n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> +
> +        if (unlikely(n == 0))
> +            return 0;
> +
> +        *new_head = *old_head + n;
> +        if (is_sc)
> +            r->cons.head = *new_head, success = 1;
> +        else
> +            success = rte_atomic32_cmpset(&r->cons.head, *old_head,
> +                    *new_head);
> +    } while (unlikely(success == 0));
> +    return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has
> finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
> +         unsigned int n, enum rte_ring_queue_behavior behavior,
> +         int is_sc, unsigned int *available)
> +{
Duplicate function, No need to replicate on both versions.
> 

  reply	other threads:[~2017-11-07  9:58 UTC|newest]

Thread overview: 13+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2017-11-02  8:43 [PATCH v2] ring: guarantee ordering of cons/prod loading when doing Jia He
2017-11-02 13:26 ` Ananyev, Konstantin
2017-11-02 15:42   ` Jia He
2017-11-02 16:16     ` Ananyev, Konstantin
2017-11-02 17:00       ` Jerin Jacob
2017-11-02 17:23 ` Jerin Jacob
2017-11-03  1:46   ` Jia He
2017-11-03 12:56     ` Jerin Jacob
2017-11-06  7:25       ` Jia He
2017-11-07  4:36         ` Jerin Jacob
2017-11-07  8:34           ` Jia He
2017-11-07  9:57             ` Jerin Jacob [this message]
2017-11-08  2:31               ` Jia He

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20171107095727.GA23010@jerin \
    --to=jerin.jacob@caviumnetworks.com \
    --cc=bing.zhao@hxt-semitech.com \
    --cc=bruce.richardson@intel.com \
    --cc=dev@dpdk.org \
    --cc=hejianet@gmail.com \
    --cc=hemant.agrawal@nxp.com \
    --cc=jia.he@hxt-semitech.com \
    --cc=jianbo.liu@arm.com \
    --cc=jie2.liu@hxt-semitech.com \
    --cc=konstantin.ananyev@intel.com \
    --cc=olivier.matz@6wind.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.