* [PATCH 1/5] ring: split single thread vs multi-thread cases
2026-06-02 17:07 [PATCH 0/5] ring: convert to C11 atomics where practical Stephen Hemminger
@ 2026-06-02 17:07 ` Stephen Hemminger
2026-06-04 15:09 ` Konstantin Ananyev
2026-06-02 17:07 ` [PATCH 2/5] ring: use GCC builtin as alternative to rte_atomic32 Stephen Hemminger
` (4 subsequent siblings)
5 siblings, 1 reply; 17+ messages in thread
From: Stephen Hemminger @ 2026-06-02 17:07 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger, Konstantin Ananyev, Wathsala Vithanage
The move head function has an optimization for updating the head
when it is used on a single-threaded ring. The code is cleaner if the
two cases are split into separate functions.
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
lib/ring/rte_ring_c11_pvt.h | 100 +++++++++++++++++++++++++-------
lib/ring/rte_ring_elem_pvt.h | 16 +++--
lib/ring/rte_ring_generic_pvt.h | 77 ++++++++++++++++++++----
lib/ring/soring.c | 24 +++++---
4 files changed, 171 insertions(+), 46 deletions(-)
diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
index 07b6efc416..5afc14dec9 100644
--- a/lib/ring/rte_ring_c11_pvt.h
+++ b/lib/ring/rte_ring_c11_pvt.h
@@ -46,6 +46,7 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
/**
* @internal This is a helper function that moves the producer/consumer head
+ * optimized for single threaded case
*
* @param d
* A pointer to the headtail structure with head value to be moved
@@ -54,8 +55,6 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* function only reads tail value from it
* @param capacity
* Either ring capacity value (for producer), or zero (for consumer)
- * @param is_st
- * Indicates whether multi-thread safe path is needed or not
* @param n
* The number of elements we want to move head value on
* @param behavior
@@ -72,14 +71,77 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
*/
static __rte_always_inline unsigned int
-__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
const struct rte_ring_headtail *s, uint32_t capacity,
- unsigned int is_st, unsigned int n,
+ unsigned int n,
enum rte_ring_queue_behavior behavior,
uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
uint32_t stail;
- int success;
+
+ /* Single producer: only this thread writes d->head,
+ * so a relaxed load is sufficient.
+ */
+ *old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_relaxed);
+
+ /* Acquire pairs with the consumer's release-store of tail in __rte_ring_update_tail,
+ * ensuring the consumer's ring-element reads are complete before
+ * we observe the updated tail.
+ */
+ stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
+
+ /* Unsigned subtraction is modulo 2^32, so entries is always in
+ * [0, capacity] even if old_head > stail.
+ */
+ *entries = capacity + stail - *old_head;
+
+ /* check that we have enough room in ring */
+ if (unlikely(n > *entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+ if (n > 0) {
+ *new_head = *old_head + n;
+ rte_atomic_store_explicit(&d->head, *new_head, rte_memory_order_relaxed);
+ }
+
+ return n;
+}
+
+/**
+ * @internal This is a helper function that moves the producer/consumer head
+ * for use in multi-thread safe path
+ *
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * @param n
+ * The number of elements we want to move head value on
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
+ * @param old_head
+ * Returns head value as it was before the move
+ * @param new_head
+ * Returns the new head value
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
+ * @return
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
+ */
+static __rte_always_inline unsigned int
+__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
+{
+ uint32_t stail;
+ bool success;
unsigned int max = n;
/*
@@ -120,25 +182,21 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
return 0;
*new_head = *old_head + n;
- if (is_st) {
- d->head = *new_head;
- success = 1;
- } else
- /* on failure, *old_head is updated */
- /*
- * R1/A2.
- * R1: Establishes a synchronizing edge with A0 of a
- * different thread.
- * A2: Establishes a synchronizing edge with R1 of a
- * different thread to observe same value for stail
- * observed by that thread on CAS failure (to retry
- * with an updated *old_head).
- */
- success = rte_atomic_compare_exchange_strong_explicit(
+ /* on failure, *old_head is updated */
+ /*
+ * R1/A2.
+ * R1: Establishes a synchronizing edge with A0 of a
+ * different thread.
+ * A2: Establishes a synchronizing edge with R1 of a
+ * different thread to observe same value for stail
+ * observed by that thread on CAS failure (to retry
+ * with an updated *old_head).
+ */
+ success = rte_atomic_compare_exchange_strong_explicit(
&d->head, old_head, *new_head,
rte_memory_order_release,
rte_memory_order_acquire);
- } while (unlikely(success == 0));
+ } while (unlikely(!success));
return n;
}
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 6eafae121f..a0fdec9812 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -341,8 +341,12 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
uint32_t *old_head, uint32_t *new_head,
uint32_t *free_entries)
{
- return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity,
- is_sp, n, behavior, old_head, new_head, free_entries);
+ if (is_sp)
+ return __rte_ring_headtail_move_head_st(&r->prod, &r->cons, r->capacity,
+ n, behavior, old_head, new_head, free_entries);
+ else
+ return __rte_ring_headtail_move_head_mt(&r->prod, &r->cons, r->capacity,
+ n, behavior, old_head, new_head, free_entries);
}
/**
@@ -374,8 +378,12 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
uint32_t *old_head, uint32_t *new_head,
uint32_t *entries)
{
- return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0,
- is_sc, n, behavior, old_head, new_head, entries);
+ if (is_sc)
+ return __rte_ring_headtail_move_head_st(&r->cons, &r->prod, 0,
+ n, behavior, old_head, new_head, entries);
+ else
+ return __rte_ring_headtail_move_head_mt(&r->cons, &r->prod, 0,
+ n, behavior, old_head, new_head, entries);
}
/**
diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
index affd2d5ba7..c044b0824f 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_generic_pvt.h
@@ -42,6 +42,7 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
/**
* @internal This is a helper function that moves the producer/consumer head
+ * for use in multi-thread safe path
*
* @param d
* A pointer to the headtail structure with head value to be moved
@@ -50,8 +51,6 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* function only reads tail value from it
* @param capacity
* Either ring capacity value (for producer), or zero (for consumer)
- * @param is_st
- * Indicates whether multi-thread safe path is needed or not
* @param n
* The number of elements we want to move head value on
* @param behavior
@@ -68,10 +67,9 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
*/
static __rte_always_inline unsigned int
-__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
const struct rte_ring_headtail *s, uint32_t capacity,
- unsigned int is_st, unsigned int n,
- enum rte_ring_queue_behavior behavior,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
unsigned int max = n;
@@ -105,15 +103,70 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
return 0;
*new_head = *old_head + n;
- if (is_st) {
- d->head = *new_head;
- success = 1;
- } else
- success = rte_atomic32_cmpset(
- (uint32_t *)(uintptr_t)&d->head,
- *old_head, *new_head);
+ success = rte_atomic32_cmpset(
+ (uint32_t *)(uintptr_t)&d->head,
+ *old_head, *new_head);
} while (unlikely(success == 0));
return n;
}
+/**
+ * @internal This is a helper function that moves the producer/consumer head
+ * optimized for single threaded case
+ *
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * @param n
+ * The number of elements we want to move head value on
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
+ * @param old_head
+ * Returns head value as it was before the move
+ * @param new_head
+ * Returns the new head value
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
+ * @return
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
+ */
+static __rte_always_inline unsigned int
+__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
+{
+ *old_head = d->head;
+
+ /* add rmb barrier to avoid load/load reorder in weak
+ * memory model. It is noop on x86
+ */
+ rte_smp_rmb();
+
+ /*
+ * The subtraction is done between two unsigned 32bits value
+ * (the result is always modulo 32 bits even if we have
+ * *old_head > s->tail). So 'entries' is always between 0
+ * and capacity (which is < size).
+ */
+ *entries = (capacity + s->tail - *old_head);
+
+ /* check that we have enough room in ring */
+ if (unlikely(n > *entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+ if (likely(n > 0)) {
+ *new_head = *old_head + n;
+ d->head = *new_head;
+ }
+ return n;
+}
+
#endif /* _RTE_RING_GENERIC_PVT_H_ */
diff --git a/lib/ring/soring.c b/lib/ring/soring.c
index e9c75619fe..22f9c60e9c 100644
--- a/lib/ring/soring.c
+++ b/lib/ring/soring.c
@@ -135,9 +135,12 @@ __rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
switch (st) {
case RTE_RING_SYNC_ST:
+ n = __rte_ring_headtail_move_head_st(&r->prod.ht, &r->cons.ht,
+ r->capacity, num, behavior, head, next, free);
+ break;
case RTE_RING_SYNC_MT:
- n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
- r->capacity, st, num, behavior, head, next, free);
+ n = __rte_ring_headtail_move_head_mt(&r->prod.ht, &r->cons.ht,
+ r->capacity, num, behavior, head, next, free);
break;
case RTE_RING_SYNC_MT_RTS:
n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
@@ -168,9 +171,13 @@ __rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
switch (st) {
case RTE_RING_SYNC_ST:
+ n = __rte_ring_headtail_move_head_st(&r->cons.ht,
+ &r->stage[stage].ht, 0, num, behavior,
+ head, next, avail);
+ break;
case RTE_RING_SYNC_MT:
- n = __rte_ring_headtail_move_head(&r->cons.ht,
- &r->stage[stage].ht, 0, st, num, behavior,
+ n = __rte_ring_headtail_move_head_mt(&r->cons.ht,
+ &r->stage[stage].ht, 0, num, behavior,
head, next, avail);
break;
case RTE_RING_SYNC_MT_RTS:
@@ -309,9 +316,8 @@ soring_enqueue_start(struct rte_soring *r, uint32_t num,
switch (st) {
case RTE_RING_SYNC_ST:
- n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
- r->capacity, RTE_RING_SYNC_ST, num, behavior,
- &head, &next, &free);
+ n = __rte_ring_headtail_move_head_st(&r->prod.ht, &r->cons.ht,
+ r->capacity, num, behavior, &head, &next, &free);
break;
case RTE_RING_SYNC_MT_HTS:
n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht,
@@ -419,8 +425,8 @@ soring_dequeue_start(struct rte_soring *r, void *objs, void *meta,
switch (st) {
case RTE_RING_SYNC_ST:
- n = __rte_ring_headtail_move_head(&r->cons.ht, &r->stage[ns].ht,
- 0, RTE_RING_SYNC_ST, num, behavior, &head, &next,
+ n = __rte_ring_headtail_move_head_st(&r->cons.ht, &r->stage[ns].ht,
+ 0, num, behavior, &head, &next,
&avail);
break;
case RTE_RING_SYNC_MT_HTS:
--
2.53.0
^ permalink raw reply related [flat|nested] 17+ messages in thread* RE: [PATCH 1/5] ring: split single thread vs multi-thread cases
2026-06-02 17:07 ` [PATCH 1/5] ring: split single thread vs multi-thread cases Stephen Hemminger
@ 2026-06-04 15:09 ` Konstantin Ananyev
0 siblings, 0 replies; 17+ messages in thread
From: Konstantin Ananyev @ 2026-06-04 15:09 UTC (permalink / raw)
To: Stephen Hemminger, dev@dpdk.org; +Cc: Wathsala Vithanage
> The move head function has optimization for updating when
> being used on single threaded ring. Code is cleaner if the two
> cases are split into separate functions.
>
> Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
> ---
> lib/ring/rte_ring_c11_pvt.h | 100 +++++++++++++++++++++++++-------
> lib/ring/rte_ring_elem_pvt.h | 16 +++--
> lib/ring/rte_ring_generic_pvt.h | 77 ++++++++++++++++++++----
> lib/ring/soring.c | 24 +++++---
> 4 files changed, 171 insertions(+), 46 deletions(-)
>
> diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
> index 07b6efc416..5afc14dec9 100644
> --- a/lib/ring/rte_ring_c11_pvt.h
> +++ b/lib/ring/rte_ring_c11_pvt.h
> @@ -46,6 +46,7 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht,
> uint32_t old_val,
>
> /**
> * @internal This is a helper function that moves the producer/consumer head
> + * optimized for single threaded case
> *
> * @param d
> * A pointer to the headtail structure with head value to be moved
> @@ -54,8 +55,6 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht,
> uint32_t old_val,
> * function only reads tail value from it
> * @param capacity
> * Either ring capacity value (for producer), or zero (for consumer)
> - * @param is_st
> - * Indicates whether multi-thread safe path is needed or not
> * @param n
> * The number of elements we want to move head value on
> * @param behavior
> @@ -72,14 +71,77 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht,
> uint32_t old_val,
> * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
> */
> static __rte_always_inline unsigned int
> -__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
> +__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
> const struct rte_ring_headtail *s, uint32_t capacity,
> - unsigned int is_st, unsigned int n,
> + unsigned int n,
> enum rte_ring_queue_behavior behavior,
> uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
> {
> uint32_t stail;
> - int success;
> +
> + /* Single producer: only this thread writes d->head,
> + * so a relaxed load is sufficient.
> + */
> + *old_head = rte_atomic_load_explicit(&d->head,
> rte_memory_order_relaxed);
> +
> + /* Acquire pairs with the consumer's release-store of tail in
> __rte_ring_update_tail,
> + * ensuring the consumer's ring-element reads are complete before
> + * we observe the updated tail.
> + */
> + stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
> +
> + /* Unsigned subtraction is modulo 2^32, so entries is always in
> + * [0, capacity] even if old_head > stail.
> + */
> + *entries = capacity + stail - *old_head;
> +
> + /* check that we have enough room in ring */
> + if (unlikely(n > *entries))
> + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> +
> + if (n > 0) {
> + *new_head = *old_head + n;
> + rte_atomic_store_explicit(&d->head, *new_head,
> rte_memory_order_relaxed);
> + }
> +
> + return n;
> +}
> +
> +/**
> + * @internal This is a helper function that moves the producer/consumer head
> + * for use in multi-thread safe path
> + *
> + * @param d
> + * A pointer to the headtail structure with head value to be moved
> + * @param s
> + * A pointer to the counter-part headtail structure. Note that this
> + * function only reads tail value from it
> + * @param capacity
> + * Either ring capacity value (for producer), or zero (for consumer)
> + * @param n
> + * The number of elements we want to move head value on
> + * @param behavior
> + * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
> + * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
> + * @param old_head
> + * Returns head value as it was before the move
> + * @param new_head
> + * Returns the new head value
> + * @param entries
> + * Returns the number of ring entries available BEFORE head was moved
> + * @return
> + * Actual number of objects the head was moved on
> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
> + const struct rte_ring_headtail *s, uint32_t capacity,
> + unsigned int n,
> + enum rte_ring_queue_behavior behavior,
> + uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
> +{
> + uint32_t stail;
> + bool success;
> unsigned int max = n;
>
> /*
> @@ -120,25 +182,21 @@ __rte_ring_headtail_move_head(struct
> rte_ring_headtail *d,
> return 0;
>
> *new_head = *old_head + n;
> - if (is_st) {
> - d->head = *new_head;
> - success = 1;
> - } else
> - /* on failure, *old_head is updated */
> - /*
> - * R1/A2.
> - * R1: Establishes a synchronizing edge with A0 of a
> - * different thread.
> - * A2: Establishes a synchronizing edge with R1 of a
> - * different thread to observe same value for stail
> - * observed by that thread on CAS failure (to retry
> - * with an updated *old_head).
> - */
> - success =
> rte_atomic_compare_exchange_strong_explicit(
> + /* on failure, *old_head is updated */
> + /*
> + * R1/A2.
> + * R1: Establishes a synchronizing edge with A0 of a
> + * different thread.
> + * A2: Establishes a synchronizing edge with R1 of a
> + * different thread to observe same value for stail
> + * observed by that thread on CAS failure (to retry
> + * with an updated *old_head).
> + */
> + success = rte_atomic_compare_exchange_strong_explicit(
> &d->head, old_head, *new_head,
> rte_memory_order_release,
> rte_memory_order_acquire);
> - } while (unlikely(success == 0));
> + } while (unlikely(!success));
> return n;
> }
>
> diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
> index 6eafae121f..a0fdec9812 100644
> --- a/lib/ring/rte_ring_elem_pvt.h
> +++ b/lib/ring/rte_ring_elem_pvt.h
> @@ -341,8 +341,12 @@ __rte_ring_move_prod_head(struct rte_ring *r,
> unsigned int is_sp,
> uint32_t *old_head, uint32_t *new_head,
> uint32_t *free_entries)
> {
> - return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity,
> - is_sp, n, behavior, old_head, new_head, free_entries);
> + if (is_sp)
> + return __rte_ring_headtail_move_head_st(&r->prod, &r->cons,
> r->capacity,
> + n, behavior, old_head, new_head, free_entries);
> + else
> + return __rte_ring_headtail_move_head_mt(&r->prod, &r->cons,
> r->capacity,
> + n, behavior, old_head, new_head, free_entries);
> }
>
> /**
> @@ -374,8 +378,12 @@ __rte_ring_move_cons_head(struct rte_ring *r,
> unsigned int is_sc,
> uint32_t *old_head, uint32_t *new_head,
> uint32_t *entries)
> {
> - return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0,
> - is_sc, n, behavior, old_head, new_head, entries);
> + if (is_sc)
> + return __rte_ring_headtail_move_head_st(&r->cons, &r->prod,
> 0,
> + n, behavior, old_head, new_head, entries);
> + else
> + return __rte_ring_headtail_move_head_mt(&r->cons, &r->prod,
> 0,
> + n, behavior, old_head, new_head, entries);
> }
>
> /**
> diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
> index affd2d5ba7..c044b0824f 100644
> --- a/lib/ring/rte_ring_generic_pvt.h
> +++ b/lib/ring/rte_ring_generic_pvt.h
> @@ -42,6 +42,7 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht,
> uint32_t old_val,
>
> /**
> * @internal This is a helper function that moves the producer/consumer head
> + * for use in multi-thread safe path
> *
> * @param d
> * A pointer to the headtail structure with head value to be moved
> @@ -50,8 +51,6 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht,
> uint32_t old_val,
> * function only reads tail value from it
> * @param capacity
> * Either ring capacity value (for producer), or zero (for consumer)
> - * @param is_st
> - * Indicates whether multi-thread safe path is needed or not
> * @param n
> * The number of elements we want to move head value on
> * @param behavior
> @@ -68,10 +67,9 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht,
> uint32_t old_val,
> * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
> */
> static __rte_always_inline unsigned int
> -__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
> +__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
> const struct rte_ring_headtail *s, uint32_t capacity,
> - unsigned int is_st, unsigned int n,
> - enum rte_ring_queue_behavior behavior,
> + unsigned int n, enum rte_ring_queue_behavior behavior,
> uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
> {
> unsigned int max = n;
> @@ -105,15 +103,70 @@ __rte_ring_headtail_move_head(struct
> rte_ring_headtail *d,
> return 0;
>
> *new_head = *old_head + n;
> - if (is_st) {
> - d->head = *new_head;
> - success = 1;
> - } else
> - success = rte_atomic32_cmpset(
> - (uint32_t *)(uintptr_t)&d->head,
> - *old_head, *new_head);
> + success = rte_atomic32_cmpset(
> + (uint32_t *)(uintptr_t)&d->head,
> + *old_head, *new_head);
> } while (unlikely(success == 0));
> return n;
> }
>
> +/**
> + * @internal This is a helper function that moves the producer/consumer head
> + * optimized for single threaded case
> + *
> + * @param d
> + * A pointer to the headtail structure with head value to be moved
> + * @param s
> + * A pointer to the counter-part headtail structure. Note that this
> + * function only reads tail value from it
> + * @param capacity
> + * Either ring capacity value (for producer), or zero (for consumer)
> + * @param n
> + * The number of elements we want to move head value on
> + * @param behavior
> + * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
> + * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
> + * @param old_head
> + * Returns head value as it was before the move
> + * @param new_head
> + * Returns the new head value
> + * @param entries
> + * Returns the number of ring entries available BEFORE head was moved
> + * @return
> + * Actual number of objects the head was moved on
> + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
> + const struct rte_ring_headtail *s, uint32_t capacity,
> + unsigned int n,
> + enum rte_ring_queue_behavior behavior,
> + uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
> +{
> + *old_head = d->head;
> +
> + /* add rmb barrier to avoid load/load reorder in weak
> + * memory model. It is noop on x86
> + */
> + rte_smp_rmb();
> +
> + /*
> + * The subtraction is done between two unsigned 32bits value
> + * (the result is always modulo 32 bits even if we have
> + * *old_head > s->tail). So 'entries' is always between 0
> + * and capacity (which is < size).
> + */
> + *entries = (capacity + s->tail - *old_head);
> +
> + /* check that we have enough room in ring */
> + if (unlikely(n > *entries))
> + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
> +
> + if (likely(n > 0)) {
> + *new_head = *old_head + n;
> + d->head = *new_head;
> + }
> + return n;
> +}
> +
> #endif /* _RTE_RING_GENERIC_PVT_H_ */
> diff --git a/lib/ring/soring.c b/lib/ring/soring.c
> index e9c75619fe..22f9c60e9c 100644
> --- a/lib/ring/soring.c
> +++ b/lib/ring/soring.c
> @@ -135,9 +135,12 @@ __rte_soring_move_prod_head(struct rte_soring *r,
> uint32_t num,
>
> switch (st) {
> case RTE_RING_SYNC_ST:
> + n = __rte_ring_headtail_move_head_st(&r->prod.ht, &r-
> >cons.ht,
> + r->capacity, num, behavior, head, next, free);
> + break;
> case RTE_RING_SYNC_MT:
> - n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
> - r->capacity, st, num, behavior, head, next, free);
> + n = __rte_ring_headtail_move_head_mt(&r->prod.ht, &r-
> >cons.ht,
> + r->capacity, num, behavior, head, next, free);
> break;
> case RTE_RING_SYNC_MT_RTS:
> n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
> @@ -168,9 +171,13 @@ __rte_soring_move_cons_head(struct rte_soring *r,
> uint32_t stage, uint32_t num,
>
> switch (st) {
> case RTE_RING_SYNC_ST:
> + n = __rte_ring_headtail_move_head_st(&r->cons.ht,
> + &r->stage[stage].ht, 0, num, behavior,
> + head, next, avail);
> + break;
> case RTE_RING_SYNC_MT:
> - n = __rte_ring_headtail_move_head(&r->cons.ht,
> - &r->stage[stage].ht, 0, st, num, behavior,
> + n = __rte_ring_headtail_move_head_mt(&r->cons.ht,
> + &r->stage[stage].ht, 0, num, behavior,
> head, next, avail);
> break;
> case RTE_RING_SYNC_MT_RTS:
> @@ -309,9 +316,8 @@ soring_enqueue_start(struct rte_soring *r, uint32_t num,
>
> switch (st) {
> case RTE_RING_SYNC_ST:
> - n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
> - r->capacity, RTE_RING_SYNC_ST, num, behavior,
> - &head, &next, &free);
> + n = __rte_ring_headtail_move_head_st(&r->prod.ht, &r-
> >cons.ht,
> + r->capacity, num, behavior, &head, &next, &free);
> break;
> case RTE_RING_SYNC_MT_HTS:
> n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht,
> @@ -419,8 +425,8 @@ soring_dequeue_start(struct rte_soring *r, void *objs,
> void *meta,
>
> switch (st) {
> case RTE_RING_SYNC_ST:
> - n = __rte_ring_headtail_move_head(&r->cons.ht, &r-
> >stage[ns].ht,
> - 0, RTE_RING_SYNC_ST, num, behavior, &head, &next,
> + n = __rte_ring_headtail_move_head_st(&r->cons.ht, &r-
> >stage[ns].ht,
> + 0, num, behavior, &head, &next,
> &avail);
> break;
> case RTE_RING_SYNC_MT_HTS:
> --
Acked-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
Tested-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> 2.53.0
^ permalink raw reply [flat|nested] 17+ messages in thread
* [PATCH 2/5] ring: use GCC builtin as alternative to rte_atomic32
2026-06-02 17:07 [PATCH 0/5] ring: convert to C11 atomics where practical Stephen Hemminger
2026-06-02 17:07 ` [PATCH 1/5] ring: split single thread vs multi-thread cases Stephen Hemminger
@ 2026-06-02 17:07 ` Stephen Hemminger
2026-06-04 15:11 ` Konstantin Ananyev
2026-06-02 17:07 ` [PATCH 3/5] ring: use C11 for update_tail Stephen Hemminger
` (3 subsequent siblings)
5 siblings, 1 reply; 17+ messages in thread
From: Stephen Hemminger @ 2026-06-02 17:07 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger, Konstantin Ananyev, Wathsala Vithanage
This patch replaces the use of the deprecated rte_atomic32 code with
GCC builtin atomic operations.
Although it would be preferable to use the C11 version on all architectures,
there is a performance loss if we do it that way:
Measured on i9-13900H, two physical cores MP/MC bulk n=128, 10 runs:
with C11 builtin: 5.86 cycles/elem
with __sync builtin: 5.36 cycles/elem (-9.4%)
The C11 __atomic_compare_exchange_n builtin writes the actual value back
to its expected pointer on failure. On x86 this forces GCC
to emit extra instructions on the critical path between the CAS
and the success-test.
__sync_bool_compare_and_swap returns a plain bool with no pointer
writeback, allowing GCC to emit tighter code.
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
lib/ring/meson.build | 2 +-
lib/ring/rte_ring_c11_pvt.h | 3 +-
lib/ring/rte_ring_elem_pvt.h | 2 +-
..._ring_generic_pvt.h => rte_ring_gcc_pvt.h} | 37 +++++++++++--------
4 files changed, 24 insertions(+), 20 deletions(-)
rename lib/ring/{rte_ring_generic_pvt.h => rte_ring_gcc_pvt.h} (87%)
diff --git a/lib/ring/meson.build b/lib/ring/meson.build
index 21f2c12989..2ba160b178 100644
--- a/lib/ring/meson.build
+++ b/lib/ring/meson.build
@@ -9,7 +9,7 @@ indirect_headers += files (
'rte_ring_elem.h',
'rte_ring_elem_pvt.h',
'rte_ring_c11_pvt.h',
- 'rte_ring_generic_pvt.h',
+ 'rte_ring_gcc_pvt.h',
'rte_ring_hts.h',
'rte_ring_hts_elem_pvt.h',
'rte_ring_peek.h',
diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
index 5afc14dec9..8358b0f21f 100644
--- a/lib/ring/rte_ring_c11_pvt.h
+++ b/lib/ring/rte_ring_c11_pvt.h
@@ -43,7 +43,6 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
*/
rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
}
-
/**
* @internal This is a helper function that moves the producer/consumer head
* optimized for single threaded case
@@ -82,7 +81,7 @@ __rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
/* Single producer: only this thread writes d->head,
* so a relaxed load is sufficient.
*/
- *old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_relaxed);
+ *old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_acquire);
/* Acquire pairs with the consumer's release-store of tail in __rte_ring_update_tail,
* ensuring the consumer's ring-element reads are complete before
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index a0fdec9812..9a0170c4f0 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -309,7 +309,7 @@ __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
#ifdef RTE_USE_C11_MEM_MODEL
#include "rte_ring_c11_pvt.h"
#else
-#include "rte_ring_generic_pvt.h"
+#include "rte_ring_gcc_pvt.h"
#endif
/**
diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_gcc_pvt.h
similarity index 87%
rename from lib/ring/rte_ring_generic_pvt.h
rename to lib/ring/rte_ring_gcc_pvt.h
index c044b0824f..9033a15647 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_gcc_pvt.h
@@ -7,11 +7,11 @@
* Used as BSD-3 Licensed with permission from Kip Macy.
*/
-#ifndef _RTE_RING_GENERIC_PVT_H_
-#define _RTE_RING_GENERIC_PVT_H_
+#ifndef _RTE_RING_GCC_PVT_H_
+#define _RTE_RING_GCC_PVT_H_
/**
- * @file rte_ring_generic_pvt.h
+ * @file rte_ring_gcc_pvt.h
* It is not recommended to include this file directly,
* include <rte_ring.h> instead.
* Contains internal helper functions for MP/SP and MC/SC ring modes.
@@ -25,10 +25,8 @@ static __rte_always_inline void
__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
uint32_t new_val, uint32_t single, uint32_t enqueue)
{
- if (enqueue)
- rte_smp_wmb();
- else
- rte_smp_rmb();
+ RTE_SET_USED(enqueue);
+
/*
* If there are other enqueues/dequeues in progress that preceded us,
* we need to wait for them to complete
@@ -37,7 +35,12 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val,
rte_memory_order_relaxed);
- ht->tail = new_val;
+ /*
+ * R0: Establishes a synchronizing edge with load-acquire of tail at A1.
+ * Ensures that memory effects by this thread on ring elements array
+ * is observed by a different thread of the other type.
+ */
+ __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
}
/**
@@ -72,8 +75,8 @@ __rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
unsigned int n, enum rte_ring_queue_behavior behavior,
uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
+ bool success;
unsigned int max = n;
- int success;
do {
/* Reset n to the initial burst count */
@@ -81,10 +84,10 @@ __rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
*old_head = d->head;
- /* add rmb barrier to avoid load/load reorder in weak
+ /* add fence to avoid load/load reorder in weak
* memory model. It is noop on x86
*/
- rte_smp_rmb();
+ __atomic_thread_fence(__ATOMIC_ACQUIRE);
/*
* The subtraction is done between two unsigned 32bits value
@@ -92,7 +95,7 @@ __rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
* *old_head > s->tail). So 'entries' is always between 0
* and capacity (which is < size).
*/
- *entries = (capacity + s->tail - *old_head);
+ *entries = capacity + s->tail - *old_head;
/* check that we have enough room in ring */
if (unlikely(n > *entries))
@@ -100,13 +103,15 @@ __rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
0 : *entries;
if (n == 0)
- return 0;
+ break;
*new_head = *old_head + n;
- success = rte_atomic32_cmpset(
+
+ success = __sync_bool_compare_and_swap(
(uint32_t *)(uintptr_t)&d->head,
*old_head, *new_head);
- } while (unlikely(success == 0));
+ } while (unlikely(!success));
+
return n;
}
@@ -169,4 +174,4 @@ __rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
return n;
}
-#endif /* _RTE_RING_GENERIC_PVT_H_ */
+#endif /* _RTE_RING_GCC_PVT_H_ */
--
2.53.0
^ permalink raw reply related [flat|nested] 17+ messages in thread
* RE: [PATCH 2/5] ring: use GCC builtin as alternative to rte_atomic32
2026-06-02 17:07 ` [PATCH 2/5] ring: use GCC builtin as alternative to rte_atomic32 Stephen Hemminger
@ 2026-06-04 15:11 ` Konstantin Ananyev
2026-06-04 15:20 ` Stephen Hemminger
0 siblings, 1 reply; 17+ messages in thread
From: Konstantin Ananyev @ 2026-06-04 15:11 UTC (permalink / raw)
To: Stephen Hemminger, dev@dpdk.org; +Cc: Wathsala Vithanage
> This patch replaces use of the deprecated rte_atomic32 code with
> GCC builtin atomic operations.
>
> Although it would be preferable to use C11 version on all architectures,
> there is a performance loss if we do it that way:
>
> Measured on i9-13900H, two physical cores MP/MC bulk n=128, 10 runs:
> with C11 builtin: 5.86 cycles/elem
> with __sync builtin: 5.36 cycles/elem (-9.4%)
>
> The C11 __atomic_compare_exchange_n builtin writes the actual value back
> to its expected pointer on failure. On x86 this forces GCC
> to emit extra instructions on the critical path between the CAS
> and the success-test.
>
> __sync_bool_compare_and_swap returns a plain bool with no pointer
> writeback, allowing GCC to emit tighter code.
>
> Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
> ---
> lib/ring/meson.build | 2 +-
> lib/ring/rte_ring_c11_pvt.h | 3 +-
> lib/ring/rte_ring_elem_pvt.h | 2 +-
> ..._ring_generic_pvt.h => rte_ring_gcc_pvt.h} | 37 +++++++++++--------
> 4 files changed, 24 insertions(+), 20 deletions(-)
> rename lib/ring/{rte_ring_generic_pvt.h => rte_ring_gcc_pvt.h} (87%)
>
> diff --git a/lib/ring/meson.build b/lib/ring/meson.build
> index 21f2c12989..2ba160b178 100644
> --- a/lib/ring/meson.build
> +++ b/lib/ring/meson.build
> @@ -9,7 +9,7 @@ indirect_headers += files (
> 'rte_ring_elem.h',
> 'rte_ring_elem_pvt.h',
> 'rte_ring_c11_pvt.h',
> - 'rte_ring_generic_pvt.h',
> + 'rte_ring_gcc_pvt.h',
> 'rte_ring_hts.h',
> 'rte_ring_hts_elem_pvt.h',
> 'rte_ring_peek.h',
> diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
> index 5afc14dec9..8358b0f21f 100644
> --- a/lib/ring/rte_ring_c11_pvt.h
> +++ b/lib/ring/rte_ring_c11_pvt.h
> @@ -43,7 +43,6 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht,
> uint32_t old_val,
> */
> rte_atomic_store_explicit(&ht->tail, new_val,
> rte_memory_order_release);
> }
> -
> /**
> * @internal This is a helper function that moves the producer/consumer head
> * optimized for single threaded case
> @@ -82,7 +81,7 @@ __rte_ring_headtail_move_head_st(struct rte_ring_headtail
> *d,
> /* Single producer: only this thread writes d->head,
> * so a relaxed load is sufficient.
> */
> - *old_head = rte_atomic_load_explicit(&d->head,
> rte_memory_order_relaxed);
> + *old_head = rte_atomic_load_explicit(&d->head,
> rte_memory_order_acquire);
Not sure, why it had changed to 'acquire' here?
Looks like just patch splitting mistake, no?
>
> /* Acquire pairs with the consumer's release-store of tail in
> __rte_ring_update_tail,
> * ensuring the consumer's ring-element reads are complete before
^ permalink raw reply [flat|nested] 17+ messages in thread
* Re: [PATCH 2/5] ring: use GCC builtin as alternative to rte_atomic32
2026-06-04 15:11 ` Konstantin Ananyev
@ 2026-06-04 15:20 ` Stephen Hemminger
2026-06-04 15:43 ` Konstantin Ananyev
0 siblings, 1 reply; 17+ messages in thread
From: Stephen Hemminger @ 2026-06-04 15:20 UTC (permalink / raw)
To: Konstantin Ananyev; +Cc: dev@dpdk.org, Wathsala Vithanage
On Thu, 4 Jun 2026 15:11:25 +0000
Konstantin Ananyev <konstantin.ananyev@huawei.com> wrote:
> > /**
> > * @internal This is a helper function that moves the producer/consumer head
> > * optimized for single threaded case
> > @@ -82,7 +81,7 @@ __rte_ring_headtail_move_head_st(struct rte_ring_headtail
> > *d,
> > /* Single producer: only this thread writes d->head,
> > * so a relaxed load is sufficient.
> > */
> > - *old_head = rte_atomic_load_explicit(&d->head,
> > rte_memory_order_relaxed);
> > + *old_head = rte_atomic_load_explicit(&d->head,
> > rte_memory_order_acquire);
>
> Not sure, why it had changed to 'acquire' here?
> Looks like just patch splitting mistake, no?
I should have kept it as relaxed for the first load.
^ permalink raw reply [flat|nested] 17+ messages in thread
* RE: [PATCH 2/5] ring: use GCC builtin as alternative to rte_atomic32
2026-06-04 15:20 ` Stephen Hemminger
@ 2026-06-04 15:43 ` Konstantin Ananyev
0 siblings, 0 replies; 17+ messages in thread
From: Konstantin Ananyev @ 2026-06-04 15:43 UTC (permalink / raw)
To: Stephen Hemminger; +Cc: dev@dpdk.org, Wathsala Vithanage
> On Thu, 4 Jun 2026 15:11:25 +0000
> Konstantin Ananyev <konstantin.ananyev@huawei.com> wrote:
>
> > > /**
> > > * @internal This is a helper function that moves the producer/consumer
> head
> > > * optimized for single threaded case
> > > @@ -82,7 +81,7 @@ __rte_ring_headtail_move_head_st(struct
> rte_ring_headtail
> > > *d,
> > > /* Single producer: only this thread writes d->head,
> > > * so a relaxed load is sufficient.
> > > */
> > > - *old_head = rte_atomic_load_explicit(&d->head,
> > > rte_memory_order_relaxed);
> > > + *old_head = rte_atomic_load_explicit(&d->head,
> > > rte_memory_order_acquire);
> >
> > Not sure, why it had changed to 'acquire' here?
> > Looks like just patch splitting mistake, no?
>
> I should have kept it as relaxed for the first load.
Yes, I believe so.
In fact, I reverted it back to 'relaxed' in the final version (after applying all 5 patches)
and ran both stress_ring_autotest and stress_soring_autotest on an ARM box in our lab.
All passed.
Konstantin
^ permalink raw reply [flat|nested] 17+ messages in thread
* [PATCH 3/5] ring: use C11 for update_tail
2026-06-02 17:07 [PATCH 0/5] ring: convert to C11 atomics where practical Stephen Hemminger
2026-06-02 17:07 ` [PATCH 1/5] ring: split single thread vs multi-thread cases Stephen Hemminger
2026-06-02 17:07 ` [PATCH 2/5] ring: use GCC builtin as alternative to rte_atomic32 Stephen Hemminger
@ 2026-06-02 17:07 ` Stephen Hemminger
2026-06-04 15:39 ` Konstantin Ananyev
2026-06-02 17:07 ` [PATCH 4/5] ring: drop unused arg to update_tail Stephen Hemminger
` (2 subsequent siblings)
5 siblings, 1 reply; 17+ messages in thread
From: Stephen Hemminger @ 2026-06-02 17:07 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger, Konstantin Ananyev, Wathsala Vithanage
The GCC builtin atomic special case is not needed for updating tail.
The performance is the same with C11 memory model.
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
lib/ring/rte_ring_c11_pvt.h | 24 ------------------------
lib/ring/rte_ring_elem_pvt.h | 22 ++++++++++++++++++++++
lib/ring/rte_ring_gcc_pvt.h | 25 -------------------------
3 files changed, 22 insertions(+), 49 deletions(-)
diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
index 8358b0f21f..3258829696 100644
--- a/lib/ring/rte_ring_c11_pvt.h
+++ b/lib/ring/rte_ring_c11_pvt.h
@@ -19,30 +19,6 @@
* For more information please refer to <rte_ring.h>.
*/
-/**
- * @internal This function updates tail values.
- */
-static __rte_always_inline void
-__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
- uint32_t new_val, uint32_t single, uint32_t enqueue)
-{
- RTE_SET_USED(enqueue);
-
- /*
- * If there are other enqueues/dequeues in progress that preceded us,
- * we need to wait for them to complete
- */
- if (!single)
- rte_wait_until_equal_32((uint32_t *)(uintptr_t)&ht->tail, old_val,
- rte_memory_order_relaxed);
-
- /*
- * R0: Establishes a synchronizing edge with load-acquire of tail at A1.
- * Ensures that memory effects by this thread on ring elements array
- * is observed by a different thread of the other type.
- */
- rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
-}
/**
* @internal This is a helper function that moves the producer/consumer head
* optimized for single threaded case
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 9a0170c4f0..a7ff76931b 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -299,6 +299,28 @@ __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
cons_head & r->mask, esize, num);
}
+static __rte_always_inline void
+__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
+ uint32_t new_val, uint32_t single, uint32_t enqueue)
+{
+ RTE_SET_USED(enqueue);
+
+ /*
+ * If there are other enqueues/dequeues in progress that preceded us,
+ * we need to wait for them to complete
+ */
+ if (!single)
+ rte_wait_until_equal_32((uint32_t *)(uintptr_t)&ht->tail, old_val,
+ rte_memory_order_relaxed);
+
+ /*
+ * R0: Establishes a synchronizing edge with load-acquire of tail at A1.
+ * Ensures that memory effects by this thread on ring elements array
+ * is observed by a different thread of the other type.
+ */
+ rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
+}
+
/* Between load and load. there might be cpu reorder in weak model
* (powerpc/arm).
* There are 2 choices for the users
diff --git a/lib/ring/rte_ring_gcc_pvt.h b/lib/ring/rte_ring_gcc_pvt.h
index 9033a15647..6b14c1c822 100644
--- a/lib/ring/rte_ring_gcc_pvt.h
+++ b/lib/ring/rte_ring_gcc_pvt.h
@@ -18,31 +18,6 @@
* For more information please refer to <rte_ring.h>.
*/
-/**
- * @internal This function updates tail values.
- */
-static __rte_always_inline void
-__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
- uint32_t new_val, uint32_t single, uint32_t enqueue)
-{
- RTE_SET_USED(enqueue);
-
- /*
- * If there are other enqueues/dequeues in progress that preceded us,
- * we need to wait for them to complete
- */
- if (!single)
- rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val,
- rte_memory_order_relaxed);
-
- /*
- * R0: Establishes a synchronizing edge with load-acquire of tail at A1.
- * Ensures that memory effects by this thread on ring elements array
- * is observed by a different thread of the other type.
- */
- __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
-}
-
/**
* @internal This is a helper function that moves the producer/consumer head
* for use in multi-thread safe path
--
2.53.0
^ permalink raw reply related [flat|nested] 17+ messages in thread
* RE: [PATCH 3/5] ring: use C11 for update_tail
2026-06-02 17:07 ` [PATCH 3/5] ring: use C11 for update_tail Stephen Hemminger
@ 2026-06-04 15:39 ` Konstantin Ananyev
0 siblings, 0 replies; 17+ messages in thread
From: Konstantin Ananyev @ 2026-06-04 15:39 UTC (permalink / raw)
To: Stephen Hemminger, dev@dpdk.org; +Cc: Wathsala Vithanage
> The GCC builtin atomic special case is not needed for updating tail.
> The performance is the same with C11 memory model.
>
> Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
> ---
> lib/ring/rte_ring_c11_pvt.h | 24 ------------------------
> lib/ring/rte_ring_elem_pvt.h | 22 ++++++++++++++++++++++
> lib/ring/rte_ring_gcc_pvt.h | 25 -------------------------
> 3 files changed, 22 insertions(+), 49 deletions(-)
>
> --
Acked-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
Tested-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> 2.53.0
^ permalink raw reply [flat|nested] 17+ messages in thread
* [PATCH 4/5] ring: drop unused arg to update_tail
2026-06-02 17:07 [PATCH 0/5] ring: convert to C11 atomics where practical Stephen Hemminger
` (2 preceding siblings ...)
2026-06-02 17:07 ` [PATCH 3/5] ring: use C11 for update_tail Stephen Hemminger
@ 2026-06-02 17:07 ` Stephen Hemminger
2026-06-04 15:40 ` Konstantin Ananyev
2026-06-02 17:07 ` [PATCH 5/5] ring: use C11 for single thread move head Stephen Hemminger
2026-06-04 16:32 ` [PATCH v2] ring: convert to C11 atomics where practical Stephen Hemminger
5 siblings, 1 reply; 17+ messages in thread
From: Stephen Hemminger @ 2026-06-02 17:07 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger, Konstantin Ananyev, Wathsala Vithanage
The internal functions to update tail of ring no longer use
the enqueue flag argument.
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
lib/ring/rte_ring_elem_pvt.h | 8 +++-----
lib/ring/rte_ring_hts_elem_pvt.h | 8 +++-----
lib/ring/soring.c | 10 +++++-----
3 files changed, 11 insertions(+), 15 deletions(-)
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index a7ff76931b..74b5fef771 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -301,10 +301,8 @@ __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
static __rte_always_inline void
__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
- uint32_t new_val, uint32_t single, uint32_t enqueue)
+ uint32_t new_val, uint32_t single)
{
- RTE_SET_USED(enqueue);
-
/*
* If there are other enqueues/dequeues in progress that preceded us,
* we need to wait for them to complete
@@ -448,7 +446,7 @@ __rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
__rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);
- __rte_ring_update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
+ __rte_ring_update_tail(&r->prod, prod_head, prod_next, is_sp);
end:
if (free_space != NULL)
*free_space = free_entries - n;
@@ -495,7 +493,7 @@ __rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
__rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);
- __rte_ring_update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
+ __rte_ring_update_tail(&r->cons, cons_head, cons_next, is_sc);
end:
if (available != NULL)
diff --git a/lib/ring/rte_ring_hts_elem_pvt.h b/lib/ring/rte_ring_hts_elem_pvt.h
index a01089d15d..97ae240e2e 100644
--- a/lib/ring/rte_ring_hts_elem_pvt.h
+++ b/lib/ring/rte_ring_hts_elem_pvt.h
@@ -25,12 +25,10 @@
*/
static __rte_always_inline void
__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t old_tail,
- uint32_t num, uint32_t enqueue)
+ uint32_t num)
{
uint32_t tail;
- RTE_SET_USED(enqueue);
-
tail = old_tail + num;
/*
@@ -217,7 +215,7 @@ __rte_ring_do_hts_enqueue_elem(struct rte_ring *r, const void *obj_table,
if (n != 0) {
__rte_ring_enqueue_elems(r, head, obj_table, esize, n);
- __rte_ring_hts_update_tail(&r->hts_prod, head, n, 1);
+ __rte_ring_hts_update_tail(&r->hts_prod, head, n);
}
if (free_space != NULL)
@@ -258,7 +256,7 @@ __rte_ring_do_hts_dequeue_elem(struct rte_ring *r, void *obj_table,
if (n != 0) {
__rte_ring_dequeue_elems(r, head, obj_table, esize, n);
- __rte_ring_hts_update_tail(&r->hts_cons, head, n, 0);
+ __rte_ring_hts_update_tail(&r->hts_cons, head, n);
}
if (available != NULL)
diff --git a/lib/ring/soring.c b/lib/ring/soring.c
index 22f9c60e9c..45292c0f78 100644
--- a/lib/ring/soring.c
+++ b/lib/ring/soring.c
@@ -202,21 +202,21 @@ __rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
static __rte_always_inline void
__rte_soring_update_tail(struct __rte_ring_headtail *rht,
- enum rte_ring_sync_type st, uint32_t head, uint32_t next, uint32_t enq)
+ enum rte_ring_sync_type st, uint32_t head, uint32_t next)
{
uint32_t n;
switch (st) {
case RTE_RING_SYNC_ST:
case RTE_RING_SYNC_MT:
- __rte_ring_update_tail(&rht->ht, head, next, st, enq);
+ __rte_ring_update_tail(&rht->ht, head, next, st);
break;
case RTE_RING_SYNC_MT_RTS:
__rte_ring_rts_update_tail(&rht->rts);
break;
case RTE_RING_SYNC_MT_HTS:
n = next - head;
- __rte_ring_hts_update_tail(&rht->hts, head, n, enq);
+ __rte_ring_hts_update_tail(&rht->hts, head, n);
break;
default:
/* unsupported mode, shouldn't be here */
@@ -295,7 +295,7 @@ soring_enqueue(struct rte_soring *r, const void *objs,
&prod_head, &prod_next, &nb_free);
if (n != 0) {
__enqueue_elems(r, objs, meta, prod_head, n);
- __rte_soring_update_tail(&r->prod, st, prod_head, prod_next, 1);
+ __rte_soring_update_tail(&r->prod, st, prod_head, prod_next);
}
if (free_space != NULL)
@@ -401,7 +401,7 @@ soring_dequeue(struct rte_soring *r, void *objs, void *meta,
/* we have some elems to consume */
if (n != 0) {
__dequeue_elems(r, objs, meta, cons_head, n);
- __rte_soring_update_tail(&r->cons, st, cons_head, cons_next, 0);
+ __rte_soring_update_tail(&r->cons, st, cons_head, cons_next);
}
if (available != NULL)
--
2.53.0
^ permalink raw reply related [flat|nested] 17+ messages in thread
* RE: [PATCH 4/5] ring: drop unused arg to update_tail
2026-06-02 17:07 ` [PATCH 4/5] ring: drop unused arg to update_tail Stephen Hemminger
@ 2026-06-04 15:40 ` Konstantin Ananyev
0 siblings, 0 replies; 17+ messages in thread
From: Konstantin Ananyev @ 2026-06-04 15:40 UTC (permalink / raw)
To: Stephen Hemminger, dev@dpdk.org; +Cc: Wathsala Vithanage
> The internal functions to update tail of ring no longer use
> the enqueue flag argument.
>
> Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
> ---
> lib/ring/rte_ring_elem_pvt.h | 8 +++-----
> lib/ring/rte_ring_hts_elem_pvt.h | 8 +++-----
> lib/ring/soring.c | 10 +++++-----
> 3 files changed, 11 insertions(+), 15 deletions(-)
>
> --
Acked-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
Tested-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> 2.53.0
^ permalink raw reply [flat|nested] 17+ messages in thread
* [PATCH 5/5] ring: use C11 for single thread move head
2026-06-02 17:07 [PATCH 0/5] ring: convert to C11 atomics where practical Stephen Hemminger
` (3 preceding siblings ...)
2026-06-02 17:07 ` [PATCH 4/5] ring: drop unused arg to update_tail Stephen Hemminger
@ 2026-06-02 17:07 ` Stephen Hemminger
2026-06-04 15:41 ` Konstantin Ananyev
2026-06-04 16:32 ` [PATCH v2] ring: convert to C11 atomics where practical Stephen Hemminger
5 siblings, 1 reply; 17+ messages in thread
From: Stephen Hemminger @ 2026-06-02 17:07 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger, Konstantin Ananyev, Wathsala Vithanage
The function to move head for the single-threaded case can always
use the C11 code; there is no performance difference from GCC
intrinsics.
This reduces the exception code to just one function.
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
lib/ring/rte_ring_c11_pvt.h | 62 ------------------------------
lib/ring/rte_ring_elem_pvt.h | 74 +++++++++++++++++++++++++++++++++---
lib/ring/rte_ring_gcc_pvt.h | 59 ----------------------------
3 files changed, 68 insertions(+), 127 deletions(-)
diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
index 3258829696..0ba64379fa 100644
--- a/lib/ring/rte_ring_c11_pvt.h
+++ b/lib/ring/rte_ring_c11_pvt.h
@@ -19,68 +19,6 @@
* For more information please refer to <rte_ring.h>.
*/
-/**
- * @internal This is a helper function that moves the producer/consumer head
- * optimized for single threaded case
- *
- * @param d
- * A pointer to the headtail structure with head value to be moved
- * @param s
- * A pointer to the counter-part headtail structure. Note that this
- * function only reads tail value from it
- * @param capacity
- * Either ring capacity value (for producer), or zero (for consumer)
- * @param n
- * The number of elements we want to move head value on
- * @param behavior
- * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
- * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
- * @param old_head
- * Returns head value as it was before the move
- * @param new_head
- * Returns the new head value
- * @param entries
- * Returns the number of ring entries available BEFORE head was moved
- * @return
- * Actual number of objects the head was moved on
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
- */
-static __rte_always_inline unsigned int
-__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
- const struct rte_ring_headtail *s, uint32_t capacity,
- unsigned int n,
- enum rte_ring_queue_behavior behavior,
- uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
-{
- uint32_t stail;
-
- /* Single producer: only this thread writes d->head,
- * so a relaxed load is sufficient.
- */
- *old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_acquire);
-
- /* Acquire pairs with the consumer's release-store of tail in __rte_ring_update_tail,
- * ensuring the consumer's ring-element reads are complete before
- * we observe the updated tail.
- */
- stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
-
- /* Unsigned subtraction is modulo 2^32, so entries is always in
- * [0, capacity) even if old_head > stail.
- */
- *entries = capacity + stail - *old_head;
-
- /* check that we have enough room in ring */
- if (unlikely(n > *entries))
- n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
- if (n > 0) {
- *new_head = *old_head + n;
- rte_atomic_store_explicit(&d->head, *new_head, rte_memory_order_relaxed);
- }
-
- return n;
-}
/**
* @internal This is a helper function that moves the producer/consumer head
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 74b5fef771..cd77343f38 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -319,12 +319,74 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
}
-/* Between load and load. there might be cpu reorder in weak model
- * (powerpc/arm).
- * There are 2 choices for the users
- * 1.use rmb() memory barrier
- * 2.use one-direction load_acquire/store_release barrier
- * It depends on performance test results.
+/**
+ * @internal This is a helper function that moves the producer/consumer head
+ * optimized for single threaded case
+ *
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * @param n
+ * The number of elements we want to move head value on
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
+ * @param old_head
+ * Returns head value as it was before the move
+ * @param new_head
+ * Returns the new head value
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
+ * @return
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
+ */
+static __rte_always_inline unsigned int
+__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
+{
+ uint32_t stail;
+
+ /* Single producer: only this thread writes d->head,
+ * so a relaxed load is sufficient.
+ */
+ *old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_acquire);
+
+ /* Acquire pairs with the consumer's release-store of tail in __rte_ring_update_tail,
+ * ensuring the consumer's ring-element reads are complete before
+ * we observe the updated tail.
+ */
+ stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
+
+ /* Unsigned subtraction is modulo 2^32, so entries is always in
+ * [0, capacity) even if old_head > stail.
+ */
+ *entries = capacity + stail - *old_head;
+
+ /* check that we have enough room in ring */
+ if (unlikely(n > *entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+ if (n > 0) {
+ *new_head = *old_head + n;
+ rte_atomic_store_explicit(&d->head, *new_head, rte_memory_order_relaxed);
+ }
+
+ return n;
+}
+
+/*
+ * The function __rte_ring_headtail_move_head_mt has two versions
+ * based on what is most efficient on a given architecture.
+ *
+ * The C11 is preferred but on x86 GCC has 10% performance drop.
*/
#ifdef RTE_USE_C11_MEM_MODEL
#include "rte_ring_c11_pvt.h"
diff --git a/lib/ring/rte_ring_gcc_pvt.h b/lib/ring/rte_ring_gcc_pvt.h
index 6b14c1c822..ec26fe557a 100644
--- a/lib/ring/rte_ring_gcc_pvt.h
+++ b/lib/ring/rte_ring_gcc_pvt.h
@@ -90,63 +90,4 @@ __rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
return n;
}
-/**
- * @internal This is a helper function that moves the producer/consumer head
- * optimized for single threaded case
- *
- * @param d
- * A pointer to the headtail structure with head value to be moved
- * @param s
- * A pointer to the counter-part headtail structure. Note that this
- * function only reads tail value from it
- * @param capacity
- * Either ring capacity value (for producer), or zero (for consumer)
- * @param n
- * The number of elements we want to move head value on
- * @param behavior
- * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
- * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
- * @param old_head
- * Returns head value as it was before the move
- * @param new_head
- * Returns the new head value
- * @param entries
- * Returns the number of ring entries available BEFORE head was moved
- * @return
- * Actual number of objects the head was moved on
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
- */
-static __rte_always_inline unsigned int
-__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
- const struct rte_ring_headtail *s, uint32_t capacity,
- unsigned int n,
- enum rte_ring_queue_behavior behavior,
- uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
-{
- *old_head = d->head;
-
- /* add rmb barrier to avoid load/load reorder in weak
- * memory model. It is noop on x86
- */
- rte_smp_rmb();
-
- /*
- * The subtraction is done between two unsigned 32bits value
- * (the result is always modulo 32 bits even if we have
- * *old_head > s->tail). So 'entries' is always between 0
- * and capacity (which is < size).
- */
- *entries = (capacity + s->tail - *old_head);
-
- /* check that we have enough room in ring */
- if (unlikely(n > *entries))
- n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
- if (likely(n > 0)) {
- *new_head = *old_head + n;
- d->head = *new_head;
- }
- return n;
-}
-
#endif /* _RTE_RING_GCC_PVT_H_ */
--
2.53.0
^ permalink raw reply related [flat|nested] 17+ messages in thread
* RE: [PATCH 5/5] ring: use C11 for single thread move head
2026-06-02 17:07 ` [PATCH 5/5] ring: use C11 for single thread move head Stephen Hemminger
@ 2026-06-04 15:41 ` Konstantin Ananyev
0 siblings, 0 replies; 17+ messages in thread
From: Konstantin Ananyev @ 2026-06-04 15:41 UTC (permalink / raw)
To: Stephen Hemminger, dev@dpdk.org; +Cc: Wathsala Vithanage
> The function to move head for single threaded case can always
> use the C11 code, there is no performance difference from GCC
> intrinsics.
>
> This reduces the exception code to just one function.
>
> Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
> ---
> lib/ring/rte_ring_c11_pvt.h | 62 ------------------------------
> lib/ring/rte_ring_elem_pvt.h | 74 +++++++++++++++++++++++++++++++++---
> lib/ring/rte_ring_gcc_pvt.h | 59 ----------------------------
> 3 files changed, 68 insertions(+), 127 deletions(-)
>
Acked-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
Tested-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
> --
> 2.53.0
^ permalink raw reply [flat|nested] 17+ messages in thread
* [PATCH v2] ring: convert to C11 atomics where practical
2026-06-02 17:07 [PATCH 0/5] ring: convert to C11 atomics where practical Stephen Hemminger
` (4 preceding siblings ...)
2026-06-02 17:07 ` [PATCH 5/5] ring: use C11 for single thread move head Stephen Hemminger
@ 2026-06-04 16:32 ` Stephen Hemminger
2026-06-04 16:32 ` [PATCH v2 1/3] ring: split single thread vs multi-thread cases Stephen Hemminger
` (2 more replies)
5 siblings, 3 replies; 17+ messages in thread
From: Stephen Hemminger @ 2026-06-04 16:32 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger
This is split out from the atomic deprecation series. It converts lib/ring
off rte_atomic32 and onto the C11 memory model, except where the C11 version
has a noticeable performance drop on x86 with GCC.
The pre-existing C11 and GCC-builtin paths lived in separate headers with
substantial duplication. After this series, only the MP head CAS
(__rte_ring_headtail_move_head_mt) retains separate implementations;
everything else is shared. Patch 2 documents the reason for keeping the GCC
builtin on the MP head CAS.
The default RTE_USE_C11_MEM_MODEL selection per architecture is unchanged.
v2 - consolidate cleanup patches
- fix the memory order on first load in _st case.
it was going back/forth across the patches
Stephen Hemminger (3):
ring: split single thread vs multi-thread cases
ring: use GCC builtin as alternative to rte_atomic32
ring: cleanup the C11 code
lib/ring/meson.build | 2 +-
lib/ring/rte_ring_c11_pvt.h | 62 +++-------
lib/ring/rte_ring_elem_pvt.h | 116 ++++++++++++++++--
..._ring_generic_pvt.h => rte_ring_gcc_pvt.h} | 58 +++------
lib/ring/rte_ring_hts_elem_pvt.h | 8 +-
lib/ring/soring.c | 34 ++---
6 files changed, 159 insertions(+), 121 deletions(-)
rename lib/ring/{rte_ring_generic_pvt.h => rte_ring_gcc_pvt.h} (65%)
--
2.53.0
^ permalink raw reply [flat|nested] 17+ messages in thread
* [PATCH v2 1/3] ring: split single thread vs multi-thread cases
2026-06-04 16:32 ` [PATCH v2] ring: convert to C11 atomics where practical Stephen Hemminger
@ 2026-06-04 16:32 ` Stephen Hemminger
2026-06-04 16:32 ` [PATCH v2 2/3] ring: use GCC builtin as alternative to rte_atomic32 Stephen Hemminger
2026-06-04 16:32 ` [PATCH v2 3/3] ring: cleanup the C11 code Stephen Hemminger
2 siblings, 0 replies; 17+ messages in thread
From: Stephen Hemminger @ 2026-06-04 16:32 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger, Konstantin Ananyev, Wathsala Vithanage
The move head function has optimization for updating when
being used on single threaded ring. Code is cleaner if the two
cases are split into separate functions.
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
Acked-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
Tested-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
---
lib/ring/rte_ring_c11_pvt.h | 100 +++++++++++++++++++++++++-------
lib/ring/rte_ring_elem_pvt.h | 16 +++--
lib/ring/rte_ring_generic_pvt.h | 77 ++++++++++++++++++++----
lib/ring/soring.c | 24 +++++---
4 files changed, 171 insertions(+), 46 deletions(-)
diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
index 07b6efc416..5afc14dec9 100644
--- a/lib/ring/rte_ring_c11_pvt.h
+++ b/lib/ring/rte_ring_c11_pvt.h
@@ -46,6 +46,7 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
/**
* @internal This is a helper function that moves the producer/consumer head
+ * optimized for single threaded case
*
* @param d
* A pointer to the headtail structure with head value to be moved
@@ -54,8 +55,6 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* function only reads tail value from it
* @param capacity
* Either ring capacity value (for producer), or zero (for consumer)
- * @param is_st
- * Indicates whether multi-thread safe path is needed or not
* @param n
* The number of elements we want to move head value on
* @param behavior
@@ -72,14 +71,77 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
*/
static __rte_always_inline unsigned int
-__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
const struct rte_ring_headtail *s, uint32_t capacity,
- unsigned int is_st, unsigned int n,
+ unsigned int n,
enum rte_ring_queue_behavior behavior,
uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
uint32_t stail;
- int success;
+
+ /* Single producer: only this thread writes d->head,
+ * so a relaxed load is sufficient.
+ */
+ *old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_relaxed);
+
+ /* Acquire pairs with the consumer's release-store of tail in __rte_ring_update_tail,
+ * ensuring the consumer's ring-element reads are complete before
+ * we observe the updated tail.
+ */
+ stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
+
+ /* Unsigned subtraction is modulo 2^32, so entries is always in
+ * [0, capacity) even if old_head > stail.
+ */
+ *entries = capacity + stail - *old_head;
+
+ /* check that we have enough room in ring */
+ if (unlikely(n > *entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+ if (n > 0) {
+ *new_head = *old_head + n;
+ rte_atomic_store_explicit(&d->head, *new_head, rte_memory_order_relaxed);
+ }
+
+ return n;
+}
+
+/**
+ * @internal This is a helper function that moves the producer/consumer head
+ * for use in multi-thread safe path
+ *
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * @param n
+ * The number of elements we want to move head value on
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
+ * @param old_head
+ * Returns head value as it was before the move
+ * @param new_head
+ * Returns the new head value
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
+ * @return
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
+ */
+static __rte_always_inline unsigned int
+__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
+{
+ uint32_t stail;
+ bool success;
unsigned int max = n;
/*
@@ -120,25 +182,21 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
return 0;
*new_head = *old_head + n;
- if (is_st) {
- d->head = *new_head;
- success = 1;
- } else
- /* on failure, *old_head is updated */
- /*
- * R1/A2.
- * R1: Establishes a synchronizing edge with A0 of a
- * different thread.
- * A2: Establishes a synchronizing edge with R1 of a
- * different thread to observe same value for stail
- * observed by that thread on CAS failure (to retry
- * with an updated *old_head).
- */
- success = rte_atomic_compare_exchange_strong_explicit(
+ /* on failure, *old_head is updated */
+ /*
+ * R1/A2.
+ * R1: Establishes a synchronizing edge with A0 of a
+ * different thread.
+ * A2: Establishes a synchronizing edge with R1 of a
+ * different thread to observe same value for stail
+ * observed by that thread on CAS failure (to retry
+ * with an updated *old_head).
+ */
+ success = rte_atomic_compare_exchange_strong_explicit(
&d->head, old_head, *new_head,
rte_memory_order_release,
rte_memory_order_acquire);
- } while (unlikely(success == 0));
+ } while (unlikely(!success));
return n;
}
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 6eafae121f..a0fdec9812 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -341,8 +341,12 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
uint32_t *old_head, uint32_t *new_head,
uint32_t *free_entries)
{
- return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity,
- is_sp, n, behavior, old_head, new_head, free_entries);
+ if (is_sp)
+ return __rte_ring_headtail_move_head_st(&r->prod, &r->cons, r->capacity,
+ n, behavior, old_head, new_head, free_entries);
+ else
+ return __rte_ring_headtail_move_head_mt(&r->prod, &r->cons, r->capacity,
+ n, behavior, old_head, new_head, free_entries);
}
/**
@@ -374,8 +378,12 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
uint32_t *old_head, uint32_t *new_head,
uint32_t *entries)
{
- return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0,
- is_sc, n, behavior, old_head, new_head, entries);
+ if (is_sc)
+ return __rte_ring_headtail_move_head_st(&r->cons, &r->prod, 0,
+ n, behavior, old_head, new_head, entries);
+ else
+ return __rte_ring_headtail_move_head_mt(&r->cons, &r->prod, 0,
+ n, behavior, old_head, new_head, entries);
}
/**
diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
index affd2d5ba7..c044b0824f 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_generic_pvt.h
@@ -42,6 +42,7 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
/**
* @internal This is a helper function that moves the producer/consumer head
+ * for use in multi-thread safe path
*
* @param d
* A pointer to the headtail structure with head value to be moved
@@ -50,8 +51,6 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* function only reads tail value from it
* @param capacity
* Either ring capacity value (for producer), or zero (for consumer)
- * @param is_st
- * Indicates whether multi-thread safe path is needed or not
* @param n
* The number of elements we want to move head value on
* @param behavior
@@ -68,10 +67,9 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
*/
static __rte_always_inline unsigned int
-__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
const struct rte_ring_headtail *s, uint32_t capacity,
- unsigned int is_st, unsigned int n,
- enum rte_ring_queue_behavior behavior,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
unsigned int max = n;
@@ -105,15 +103,70 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
return 0;
*new_head = *old_head + n;
- if (is_st) {
- d->head = *new_head;
- success = 1;
- } else
- success = rte_atomic32_cmpset(
- (uint32_t *)(uintptr_t)&d->head,
- *old_head, *new_head);
+ success = rte_atomic32_cmpset(
+ (uint32_t *)(uintptr_t)&d->head,
+ *old_head, *new_head);
} while (unlikely(success == 0));
return n;
}
+/**
+ * @internal This is a helper function that moves the producer/consumer head
+ * optimized for single threaded case
+ *
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * @param n
+ * The number of elements we want to move head value on
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
+ * @param old_head
+ * Returns head value as it was before the move
+ * @param new_head
+ * Returns the new head value
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
+ * @return
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
+ */
+static __rte_always_inline unsigned int
+__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
+{
+ *old_head = d->head;
+
+ /* add rmb barrier to avoid load/load reorder in weak
+ * memory model. It is noop on x86
+ */
+ rte_smp_rmb();
+
+ /*
+ * The subtraction is done between two unsigned 32bits value
+ * (the result is always modulo 32 bits even if we have
+ * *old_head > s->tail). So 'entries' is always between 0
+ * and capacity (which is < size).
+ */
+ *entries = (capacity + s->tail - *old_head);
+
+ /* check that we have enough room in ring */
+ if (unlikely(n > *entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+ if (likely(n > 0)) {
+ *new_head = *old_head + n;
+ d->head = *new_head;
+ }
+ return n;
+}
+
#endif /* _RTE_RING_GENERIC_PVT_H_ */
diff --git a/lib/ring/soring.c b/lib/ring/soring.c
index e9c75619fe..22f9c60e9c 100644
--- a/lib/ring/soring.c
+++ b/lib/ring/soring.c
@@ -135,9 +135,12 @@ __rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
switch (st) {
case RTE_RING_SYNC_ST:
+ n = __rte_ring_headtail_move_head_st(&r->prod.ht, &r->cons.ht,
+ r->capacity, num, behavior, head, next, free);
+ break;
case RTE_RING_SYNC_MT:
- n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
- r->capacity, st, num, behavior, head, next, free);
+ n = __rte_ring_headtail_move_head_mt(&r->prod.ht, &r->cons.ht,
+ r->capacity, num, behavior, head, next, free);
break;
case RTE_RING_SYNC_MT_RTS:
n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
@@ -168,9 +171,13 @@ __rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
switch (st) {
case RTE_RING_SYNC_ST:
+ n = __rte_ring_headtail_move_head_st(&r->cons.ht,
+ &r->stage[stage].ht, 0, num, behavior,
+ head, next, avail);
+ break;
case RTE_RING_SYNC_MT:
- n = __rte_ring_headtail_move_head(&r->cons.ht,
- &r->stage[stage].ht, 0, st, num, behavior,
+ n = __rte_ring_headtail_move_head_mt(&r->cons.ht,
+ &r->stage[stage].ht, 0, num, behavior,
head, next, avail);
break;
case RTE_RING_SYNC_MT_RTS:
@@ -309,9 +316,8 @@ soring_enqueue_start(struct rte_soring *r, uint32_t num,
switch (st) {
case RTE_RING_SYNC_ST:
- n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
- r->capacity, RTE_RING_SYNC_ST, num, behavior,
- &head, &next, &free);
+ n = __rte_ring_headtail_move_head_st(&r->prod.ht, &r->cons.ht,
+ r->capacity, num, behavior, &head, &next, &free);
break;
case RTE_RING_SYNC_MT_HTS:
n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht,
@@ -419,8 +425,8 @@ soring_dequeue_start(struct rte_soring *r, void *objs, void *meta,
switch (st) {
case RTE_RING_SYNC_ST:
- n = __rte_ring_headtail_move_head(&r->cons.ht, &r->stage[ns].ht,
- 0, RTE_RING_SYNC_ST, num, behavior, &head, &next,
+ n = __rte_ring_headtail_move_head_st(&r->cons.ht, &r->stage[ns].ht,
+ 0, num, behavior, &head, &next,
&avail);
break;
case RTE_RING_SYNC_MT_HTS:
--
2.53.0
^ permalink raw reply related [flat|nested] 17+ messages in thread* [PATCH v2 2/3] ring: use GCC builtin as alternative to rte_atomic32
2026-06-04 16:32 ` [PATCH v2] ring: convert to C11 atomics where practical Stephen Hemminger
2026-06-04 16:32 ` [PATCH v2 1/3] ring: split single thread vs multi-thread cases Stephen Hemminger
@ 2026-06-04 16:32 ` Stephen Hemminger
2026-06-04 16:32 ` [PATCH v2 3/3] ring: cleanup the C11 code Stephen Hemminger
2 siblings, 0 replies; 17+ messages in thread
From: Stephen Hemminger @ 2026-06-04 16:32 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger, Konstantin Ananyev, Wathsala Vithanage
This patch replaces use of the deprecated rte_atomic32 compare-and-set,
and the rte_smp_*mb() barriers in the tail update and multi-thread head
move, with GCC builtin atomic operations.
Although it would be preferable to use the C11 version on all architectures,
there is a performance loss on x86 if we do it that way:
Measured on i9-13900H, two physical cores MP/MC bulk n=128, 10 runs:
with C11 builtin: 5.86 cycles/elem
with __sync builtin: 5.36 cycles/elem (-9.4%)
The C11 __atomic_compare_exchange_n builtin writes the actual value back
through its 'expected' pointer argument on failure. On x86 this forces GCC
to emit extra instructions on the critical path between the CAS
and the success test.
__sync_bool_compare_and_swap returns a plain bool with no pointer
writeback, allowing GCC to emit tighter code.
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
lib/ring/meson.build | 2 +-
lib/ring/rte_ring_elem_pvt.h | 2 +-
..._ring_generic_pvt.h => rte_ring_gcc_pvt.h} | 33 +++++++++++--------
3 files changed, 21 insertions(+), 16 deletions(-)
rename lib/ring/{rte_ring_generic_pvt.h => rte_ring_gcc_pvt.h} (88%)
diff --git a/lib/ring/meson.build b/lib/ring/meson.build
index 21f2c12989..2ba160b178 100644
--- a/lib/ring/meson.build
+++ b/lib/ring/meson.build
@@ -9,7 +9,7 @@ indirect_headers += files (
'rte_ring_elem.h',
'rte_ring_elem_pvt.h',
'rte_ring_c11_pvt.h',
- 'rte_ring_generic_pvt.h',
+ 'rte_ring_gcc_pvt.h',
'rte_ring_hts.h',
'rte_ring_hts_elem_pvt.h',
'rte_ring_peek.h',
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index a0fdec9812..9a0170c4f0 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -309,7 +309,7 @@ __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
#ifdef RTE_USE_C11_MEM_MODEL
#include "rte_ring_c11_pvt.h"
#else
-#include "rte_ring_generic_pvt.h"
+#include "rte_ring_gcc_pvt.h"
#endif
/**
diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_gcc_pvt.h
similarity index 88%
rename from lib/ring/rte_ring_generic_pvt.h
rename to lib/ring/rte_ring_gcc_pvt.h
index c044b0824f..68ab1355e8 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_gcc_pvt.h
@@ -7,11 +7,11 @@
* Used as BSD-3 Licensed with permission from Kip Macy.
*/
-#ifndef _RTE_RING_GENERIC_PVT_H_
-#define _RTE_RING_GENERIC_PVT_H_
+#ifndef _RTE_RING_GCC_PVT_H_
+#define _RTE_RING_GCC_PVT_H_
/**
- * @file rte_ring_generic_pvt.h
+ * @file rte_ring_gcc_pvt.h
* It is not recommended to include this file directly,
* include <rte_ring.h> instead.
* Contains internal helper functions for MP/SP and MC/SC ring modes.
@@ -25,10 +25,8 @@ static __rte_always_inline void
__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
uint32_t new_val, uint32_t single, uint32_t enqueue)
{
- if (enqueue)
- rte_smp_wmb();
- else
- rte_smp_rmb();
+ RTE_SET_USED(enqueue);
+
/*
* If there are other enqueues/dequeues in progress that preceded us,
* we need to wait for them to complete
@@ -37,7 +35,12 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val,
rte_memory_order_relaxed);
- ht->tail = new_val;
+ /*
+ * R0: Establishes a synchronizing edge with load-acquire of tail at A1.
+ * Ensures that memory effects by this thread on ring elements array
+ * is observed by a different thread of the other type.
+ */
+ __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
}
/**
@@ -73,7 +76,7 @@ __rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
unsigned int max = n;
- int success;
+ bool success;
do {
/* Reset n to the initial burst count */
@@ -81,10 +84,10 @@ __rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
*old_head = d->head;
- /* add rmb barrier to avoid load/load reorder in weak
+ /* add fence to avoid load/load reorder in weak
* memory model. It is noop on x86
*/
- rte_smp_rmb();
+ __atomic_thread_fence(__ATOMIC_ACQUIRE);
/*
* The subtraction is done between two unsigned 32bits value
@@ -103,10 +106,12 @@ __rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
return 0;
*new_head = *old_head + n;
- success = rte_atomic32_cmpset(
+
+ success = __sync_bool_compare_and_swap(
(uint32_t *)(uintptr_t)&d->head,
*old_head, *new_head);
- } while (unlikely(success == 0));
+ } while (unlikely(!success));
+
return n;
}
@@ -169,4 +174,4 @@ __rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
return n;
}
-#endif /* _RTE_RING_GENERIC_PVT_H_ */
+#endif /* _RTE_RING_GCC_PVT_H_ */
--
2.53.0
^ permalink raw reply related [flat|nested] 17+ messages in thread* [PATCH v2 3/3] ring: cleanup the C11 code
2026-06-04 16:32 ` [PATCH v2] ring: convert to C11 atomics where practical Stephen Hemminger
2026-06-04 16:32 ` [PATCH v2 1/3] ring: split single thread vs multi-thread cases Stephen Hemminger
2026-06-04 16:32 ` [PATCH v2 2/3] ring: use GCC builtin as alternative to rte_atomic32 Stephen Hemminger
@ 2026-06-04 16:32 ` Stephen Hemminger
2 siblings, 0 replies; 17+ messages in thread
From: Stephen Hemminger @ 2026-06-04 16:32 UTC (permalink / raw)
To: dev; +Cc: Stephen Hemminger, Konstantin Ananyev, Wathsala Vithanage
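Move the code that is common to both variants (the tail update and the
single-thread head move) into rte_ring_elem_pvt.h, so that only the
multi-thread head move remains split between the GCC and C11 includes.
The internal functions that update the ring tail no longer take
the unused enqueue flag argument.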
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
Acked-by: Konstantin Ananyev <konstantin.ananyev@huawei.com>
---
lib/ring/rte_ring_c11_pvt.h | 88 ----------------------------
lib/ring/rte_ring_elem_pvt.h | 98 +++++++++++++++++++++++++++++---
lib/ring/rte_ring_gcc_pvt.h | 84 ---------------------------
lib/ring/rte_ring_hts_elem_pvt.h | 8 +--
lib/ring/soring.c | 10 ++--
5 files changed, 98 insertions(+), 190 deletions(-)
diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
index 5afc14dec9..d232e5ac34 100644
--- a/lib/ring/rte_ring_c11_pvt.h
+++ b/lib/ring/rte_ring_c11_pvt.h
@@ -19,94 +19,6 @@
* For more information please refer to <rte_ring.h>.
*/
-/**
- * @internal This function updates tail values.
- */
-static __rte_always_inline void
-__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
- uint32_t new_val, uint32_t single, uint32_t enqueue)
-{
- RTE_SET_USED(enqueue);
-
- /*
- * If there are other enqueues/dequeues in progress that preceded us,
- * we need to wait for them to complete
- */
- if (!single)
- rte_wait_until_equal_32((uint32_t *)(uintptr_t)&ht->tail, old_val,
- rte_memory_order_relaxed);
-
- /*
- * R0: Establishes a synchronizing edge with load-acquire of tail at A1.
- * Ensures that memory effects by this thread on ring elements array
- * is observed by a different thread of the other type.
- */
- rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
-}
-
-/**
- * @internal This is a helper function that moves the producer/consumer head
- * optimized for single threaded case
- *
- * @param d
- * A pointer to the headtail structure with head value to be moved
- * @param s
- * A pointer to the counter-part headtail structure. Note that this
- * function only reads tail value from it
- * @param capacity
- * Either ring capacity value (for producer), or zero (for consumer)
- * @param n
- * The number of elements we want to move head value on
- * @param behavior
- * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
- * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
- * @param old_head
- * Returns head value as it was before the move
- * @param new_head
- * Returns the new head value
- * @param entries
- * Returns the number of ring entries available BEFORE head was moved
- * @return
- * Actual number of objects the head was moved on
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
- */
-static __rte_always_inline unsigned int
-__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
- const struct rte_ring_headtail *s, uint32_t capacity,
- unsigned int n,
- enum rte_ring_queue_behavior behavior,
- uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
-{
- uint32_t stail;
-
- /* Single producer: only this thread writes d->head,
- * so a relaxed load is sufficient.
- */
- *old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_relaxed);
-
- /* Acquire pairs with the consumer's release-store of tail in __rte_ring_update_tail,
- * ensuring the consumer's ring-element reads are complete before
- * we observe the updated tail.
- */
- stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
-
- /* Unsigned subtraction is modulo 2^32, so entries is always in
- * [0, capacity) even if old_head > stail.
- */
- *entries = capacity + stail - *old_head;
-
- /* check that we have enough room in ring */
- if (unlikely(n > *entries))
- n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
- if (n > 0) {
- *new_head = *old_head + n;
- rte_atomic_store_explicit(&d->head, *new_head, rte_memory_order_relaxed);
- }
-
- return n;
-}
-
/**
* @internal This is a helper function that moves the producer/consumer head
* for use in multi-thread safe path
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 9a0170c4f0..17ec450b8a 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -299,12 +299,94 @@ __rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
cons_head & r->mask, esize, num);
}
-/* Between load and load. there might be cpu reorder in weak model
- * (powerpc/arm).
- * There are 2 choices for the users
- * 1.use rmb() memory barrier
- * 2.use one-direction load_acquire/store_release barrier
- * It depends on performance test results.
+static __rte_always_inline void
+__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
+ uint32_t new_val, uint32_t single)
+{
+ /*
+ * If there are other enqueues/dequeues in progress that preceded us,
+ * we need to wait for them to complete
+ */
+ if (!single)
+ rte_wait_until_equal_32((uint32_t *)(uintptr_t)&ht->tail, old_val,
+ rte_memory_order_relaxed);
+
+ /*
+ * R0: Establishes a synchronizing edge with load-acquire of tail at A1.
+ * Ensures that memory effects by this thread on ring elements array
+ * is observed by a different thread of the other type.
+ */
+ rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
+}
+
+/**
+ * @internal This is a helper function that moves the producer/consumer head
+ * optimized for single threaded case
+ *
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * @param n
+ * The number of elements we want to move head value on
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
+ * @param old_head
+ * Returns head value as it was before the move
+ * @param new_head
+ * Returns the new head value
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
+ * @return
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
+ */
+static __rte_always_inline unsigned int
+__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
+{
+ uint32_t stail;
+
+ /* Single producer: only this thread writes d->head,
+ * so a relaxed load is sufficient.
+ */
+ *old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_relaxed);
+
+ /* Acquire pairs with the consumer's release-store of tail in __rte_ring_update_tail,
+ * ensuring the consumer's ring-element reads are complete before
+ * we observe the updated tail.
+ */
+ stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
+
+ /* Unsigned subtraction is modulo 2^32, so entries is always between
+ * 0 and the ring capacity (inclusive) even if old_head > stail.
+ */
+ *entries = capacity + stail - *old_head;
+
+ /* check that we have enough room in ring */
+ if (unlikely(n > *entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+ if (n > 0) {
+ *new_head = *old_head + n;
+ rte_atomic_store_explicit(&d->head, *new_head, rte_memory_order_relaxed);
+ }
+
+ return n;
+}
+
+/*
+ * The function __rte_ring_headtail_move_head_mt has two versions
+ * based on what is most efficient on a given architecture.
+ *
+ * The C11 version is preferred, but on x86 with GCC it has about a 10% performance drop.
*/
#ifdef RTE_USE_C11_MEM_MODEL
#include "rte_ring_c11_pvt.h"
@@ -426,7 +508,7 @@ __rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
__rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);
- __rte_ring_update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
+ __rte_ring_update_tail(&r->prod, prod_head, prod_next, is_sp);
end:
if (free_space != NULL)
*free_space = free_entries - n;
@@ -473,7 +555,7 @@ __rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
__rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);
- __rte_ring_update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
+ __rte_ring_update_tail(&r->cons, cons_head, cons_next, is_sc);
end:
if (available != NULL)
diff --git a/lib/ring/rte_ring_gcc_pvt.h b/lib/ring/rte_ring_gcc_pvt.h
index 68ab1355e8..70fb4c3fcb 100644
--- a/lib/ring/rte_ring_gcc_pvt.h
+++ b/lib/ring/rte_ring_gcc_pvt.h
@@ -18,31 +18,6 @@
* For more information please refer to <rte_ring.h>.
*/
-/**
- * @internal This function updates tail values.
- */
-static __rte_always_inline void
-__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
- uint32_t new_val, uint32_t single, uint32_t enqueue)
-{
- RTE_SET_USED(enqueue);
-
- /*
- * If there are other enqueues/dequeues in progress that preceded us,
- * we need to wait for them to complete
- */
- if (!single)
- rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val,
- rte_memory_order_relaxed);
-
- /*
- * R0: Establishes a synchronizing edge with load-acquire of tail at A1.
- * Ensures that memory effects by this thread on ring elements array
- * is observed by a different thread of the other type.
- */
- __atomic_store_n(&ht->tail, new_val, __ATOMIC_RELEASE);
-}
-
/**
* @internal This is a helper function that moves the producer/consumer head
* for use in multi-thread safe path
@@ -115,63 +90,4 @@ __rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
return n;
}
-/**
- * @internal This is a helper function that moves the producer/consumer head
- * optimized for single threaded case
- *
- * @param d
- * A pointer to the headtail structure with head value to be moved
- * @param s
- * A pointer to the counter-part headtail structure. Note that this
- * function only reads tail value from it
- * @param capacity
- * Either ring capacity value (for producer), or zero (for consumer)
- * @param n
- * The number of elements we want to move head value on
- * @param behavior
- * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
- * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
- * @param old_head
- * Returns head value as it was before the move
- * @param new_head
- * Returns the new head value
- * @param entries
- * Returns the number of ring entries available BEFORE head was moved
- * @return
- * Actual number of objects the head was moved on
- * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
- */
-static __rte_always_inline unsigned int
-__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
- const struct rte_ring_headtail *s, uint32_t capacity,
- unsigned int n,
- enum rte_ring_queue_behavior behavior,
- uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
-{
- *old_head = d->head;
-
- /* add rmb barrier to avoid load/load reorder in weak
- * memory model. It is noop on x86
- */
- rte_smp_rmb();
-
- /*
- * The subtraction is done between two unsigned 32bits value
- * (the result is always modulo 32 bits even if we have
- * *old_head > s->tail). So 'entries' is always between 0
- * and capacity (which is < size).
- */
- *entries = (capacity + s->tail - *old_head);
-
- /* check that we have enough room in ring */
- if (unlikely(n > *entries))
- n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
-
- if (likely(n > 0)) {
- *new_head = *old_head + n;
- d->head = *new_head;
- }
- return n;
-}
-
#endif /* _RTE_RING_GCC_PVT_H_ */
diff --git a/lib/ring/rte_ring_hts_elem_pvt.h b/lib/ring/rte_ring_hts_elem_pvt.h
index a01089d15d..97ae240e2e 100644
--- a/lib/ring/rte_ring_hts_elem_pvt.h
+++ b/lib/ring/rte_ring_hts_elem_pvt.h
@@ -25,12 +25,10 @@
*/
static __rte_always_inline void
__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t old_tail,
- uint32_t num, uint32_t enqueue)
+ uint32_t num)
{
uint32_t tail;
- RTE_SET_USED(enqueue);
-
tail = old_tail + num;
/*
@@ -217,7 +215,7 @@ __rte_ring_do_hts_enqueue_elem(struct rte_ring *r, const void *obj_table,
if (n != 0) {
__rte_ring_enqueue_elems(r, head, obj_table, esize, n);
- __rte_ring_hts_update_tail(&r->hts_prod, head, n, 1);
+ __rte_ring_hts_update_tail(&r->hts_prod, head, n);
}
if (free_space != NULL)
@@ -258,7 +256,7 @@ __rte_ring_do_hts_dequeue_elem(struct rte_ring *r, void *obj_table,
if (n != 0) {
__rte_ring_dequeue_elems(r, head, obj_table, esize, n);
- __rte_ring_hts_update_tail(&r->hts_cons, head, n, 0);
+ __rte_ring_hts_update_tail(&r->hts_cons, head, n);
}
if (available != NULL)
diff --git a/lib/ring/soring.c b/lib/ring/soring.c
index 22f9c60e9c..45292c0f78 100644
--- a/lib/ring/soring.c
+++ b/lib/ring/soring.c
@@ -202,21 +202,21 @@ __rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
static __rte_always_inline void
__rte_soring_update_tail(struct __rte_ring_headtail *rht,
- enum rte_ring_sync_type st, uint32_t head, uint32_t next, uint32_t enq)
+ enum rte_ring_sync_type st, uint32_t head, uint32_t next)
{
uint32_t n;
switch (st) {
case RTE_RING_SYNC_ST:
case RTE_RING_SYNC_MT:
- __rte_ring_update_tail(&rht->ht, head, next, st, enq);
+ __rte_ring_update_tail(&rht->ht, head, next, st);
break;
case RTE_RING_SYNC_MT_RTS:
__rte_ring_rts_update_tail(&rht->rts);
break;
case RTE_RING_SYNC_MT_HTS:
n = next - head;
- __rte_ring_hts_update_tail(&rht->hts, head, n, enq);
+ __rte_ring_hts_update_tail(&rht->hts, head, n);
break;
default:
/* unsupported mode, shouldn't be here */
@@ -295,7 +295,7 @@ soring_enqueue(struct rte_soring *r, const void *objs,
&prod_head, &prod_next, &nb_free);
if (n != 0) {
__enqueue_elems(r, objs, meta, prod_head, n);
- __rte_soring_update_tail(&r->prod, st, prod_head, prod_next, 1);
+ __rte_soring_update_tail(&r->prod, st, prod_head, prod_next);
}
if (free_space != NULL)
@@ -401,7 +401,7 @@ soring_dequeue(struct rte_soring *r, void *objs, void *meta,
/* we have some elems to consume */
if (n != 0) {
__dequeue_elems(r, objs, meta, cons_head, n);
- __rte_soring_update_tail(&r->cons, st, cons_head, cons_next, 0);
+ __rte_soring_update_tail(&r->cons, st, cons_head, cons_next);
}
if (available != NULL)
--
2.53.0
^ permalink raw reply related [flat|nested] 17+ messages in thread