From: Stephen Hemminger <stephen@networkplumber.org>
To: dev@dpdk.org
Cc: Stephen Hemminger <stephen@networkplumber.org>,
Konstantin Ananyev <konstantin.ananyev@huawei.com>,
Wathsala Vithanage <wathsala.vithanage@arm.com>
Subject: [PATCH 1/5] ring: split single thread vs multi-thread cases
Date: Tue, 2 Jun 2026 10:07:27 -0700
Message-ID: <20260602171552.686349-2-stephen@networkplumber.org>
In-Reply-To: <20260602171552.686349-1-stephen@networkplumber.org>
The move head function has an optimization for updating the head
when used on a single-threaded ring. The code is cleaner if the two
cases are split into separate functions.
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
lib/ring/rte_ring_c11_pvt.h | 100 +++++++++++++++++++++++++-------
lib/ring/rte_ring_elem_pvt.h | 16 +++--
lib/ring/rte_ring_generic_pvt.h | 77 ++++++++++++++++++++----
lib/ring/soring.c | 24 +++++---
4 files changed, 171 insertions(+), 46 deletions(-)
diff --git a/lib/ring/rte_ring_c11_pvt.h b/lib/ring/rte_ring_c11_pvt.h
index 07b6efc416..5afc14dec9 100644
--- a/lib/ring/rte_ring_c11_pvt.h
+++ b/lib/ring/rte_ring_c11_pvt.h
@@ -46,6 +46,7 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
/**
* @internal This is a helper function that moves the producer/consumer head
+ * optimized for single threaded case
*
* @param d
* A pointer to the headtail structure with head value to be moved
@@ -54,8 +55,6 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* function only reads tail value from it
* @param capacity
* Either ring capacity value (for producer), or zero (for consumer)
- * @param is_st
- * Indicates whether multi-thread safe path is needed or not
* @param n
* The number of elements we want to move head value on
* @param behavior
@@ -72,14 +71,77 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
*/
static __rte_always_inline unsigned int
-__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
const struct rte_ring_headtail *s, uint32_t capacity,
- unsigned int is_st, unsigned int n,
+ unsigned int n,
enum rte_ring_queue_behavior behavior,
uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
uint32_t stail;
- int success;
+
+ /* Single-threaded mode: only this thread writes d->head,
+ * so a relaxed load is sufficient.
+ */
+ *old_head = rte_atomic_load_explicit(&d->head, rte_memory_order_relaxed);
+
+ /* Acquire pairs with the counterpart's release-store of tail in __rte_ring_update_tail,
+ * ensuring the counterpart's ring-element accesses are complete before
+ * we observe the updated tail.
+ */
+ stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
+
+ /* Unsigned subtraction is modulo 2^32, so entries is always in
+ * [0, ring capacity] even if old_head > stail.
+ */
+ *entries = capacity + stail - *old_head;
+
+ /* check that we have enough room in ring */
+ if (unlikely(n > *entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+ if (n > 0) {
+ *new_head = *old_head + n;
+ rte_atomic_store_explicit(&d->head, *new_head, rte_memory_order_relaxed);
+ }
+
+ return n;
+}
+
+/**
+ * @internal This is a helper function that moves the producer/consumer head
+ * for use in multi-thread safe path
+ *
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * @param n
+ * The number of elements we want to move head value on
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
+ * @param old_head
+ * Returns head value as it was before the move
+ * @param new_head
+ * Returns the new head value
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
+ * @return
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
+ */
+static __rte_always_inline unsigned int
+__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
+{
+ uint32_t stail;
+ bool success;
unsigned int max = n;
/*
@@ -120,25 +182,21 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
return 0;
*new_head = *old_head + n;
- if (is_st) {
- d->head = *new_head;
- success = 1;
- } else
- /* on failure, *old_head is updated */
- /*
- * R1/A2.
- * R1: Establishes a synchronizing edge with A0 of a
- * different thread.
- * A2: Establishes a synchronizing edge with R1 of a
- * different thread to observe same value for stail
- * observed by that thread on CAS failure (to retry
- * with an updated *old_head).
- */
- success = rte_atomic_compare_exchange_strong_explicit(
+ /* on failure, *old_head is updated */
+ /*
+ * R1/A2.
+ * R1: Establishes a synchronizing edge with A0 of a
+ * different thread.
+ * A2: Establishes a synchronizing edge with R1 of a
+ * different thread to observe same value for stail
+ * observed by that thread on CAS failure (to retry
+ * with an updated *old_head).
+ */
+ success = rte_atomic_compare_exchange_strong_explicit(
&d->head, old_head, *new_head,
rte_memory_order_release,
rte_memory_order_acquire);
- } while (unlikely(success == 0));
+ } while (unlikely(!success));
return n;
}
diff --git a/lib/ring/rte_ring_elem_pvt.h b/lib/ring/rte_ring_elem_pvt.h
index 6eafae121f..a0fdec9812 100644
--- a/lib/ring/rte_ring_elem_pvt.h
+++ b/lib/ring/rte_ring_elem_pvt.h
@@ -341,8 +341,12 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
uint32_t *old_head, uint32_t *new_head,
uint32_t *free_entries)
{
- return __rte_ring_headtail_move_head(&r->prod, &r->cons, r->capacity,
- is_sp, n, behavior, old_head, new_head, free_entries);
+ if (is_sp)
+ return __rte_ring_headtail_move_head_st(&r->prod, &r->cons, r->capacity,
+ n, behavior, old_head, new_head, free_entries);
+ else
+ return __rte_ring_headtail_move_head_mt(&r->prod, &r->cons, r->capacity,
+ n, behavior, old_head, new_head, free_entries);
}
/**
@@ -374,8 +378,12 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
uint32_t *old_head, uint32_t *new_head,
uint32_t *entries)
{
- return __rte_ring_headtail_move_head(&r->cons, &r->prod, 0,
- is_sc, n, behavior, old_head, new_head, entries);
+ if (is_sc)
+ return __rte_ring_headtail_move_head_st(&r->cons, &r->prod, 0,
+ n, behavior, old_head, new_head, entries);
+ else
+ return __rte_ring_headtail_move_head_mt(&r->cons, &r->prod, 0,
+ n, behavior, old_head, new_head, entries);
}
/**
diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
index affd2d5ba7..c044b0824f 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_generic_pvt.h
@@ -42,6 +42,7 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
/**
* @internal This is a helper function that moves the producer/consumer head
+ * for use in multi-thread safe path
*
* @param d
* A pointer to the headtail structure with head value to be moved
@@ -50,8 +51,6 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* function only reads tail value from it
* @param capacity
* Either ring capacity value (for producer), or zero (for consumer)
- * @param is_st
- * Indicates whether multi-thread safe path is needed or not
* @param n
* The number of elements we want to move head value on
* @param behavior
@@ -68,10 +67,9 @@ __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
* If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
*/
static __rte_always_inline unsigned int
-__rte_ring_headtail_move_head(struct rte_ring_headtail *d,
+__rte_ring_headtail_move_head_mt(struct rte_ring_headtail *d,
const struct rte_ring_headtail *s, uint32_t capacity,
- unsigned int is_st, unsigned int n,
- enum rte_ring_queue_behavior behavior,
+ unsigned int n, enum rte_ring_queue_behavior behavior,
uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
unsigned int max = n;
@@ -105,15 +103,70 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
return 0;
*new_head = *old_head + n;
- if (is_st) {
- d->head = *new_head;
- success = 1;
- } else
- success = rte_atomic32_cmpset(
- (uint32_t *)(uintptr_t)&d->head,
- *old_head, *new_head);
+ success = rte_atomic32_cmpset(
+ (uint32_t *)(uintptr_t)&d->head,
+ *old_head, *new_head);
} while (unlikely(success == 0));
return n;
}
+/**
+ * @internal This is a helper function that moves the producer/consumer head
+ * optimized for single threaded case
+ *
+ * @param d
+ * A pointer to the headtail structure with head value to be moved
+ * @param s
+ * A pointer to the counter-part headtail structure. Note that this
+ * function only reads tail value from it
+ * @param capacity
+ * Either ring capacity value (for producer), or zero (for consumer)
+ * @param n
+ * The number of elements we want to move head value on
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Move on a fixed number of items
+ * RTE_RING_QUEUE_VARIABLE: Move on as many items as possible
+ * @param old_head
+ * Returns head value as it was before the move
+ * @param new_head
+ * Returns the new head value
+ * @param entries
+ * Returns the number of ring entries available BEFORE head was moved
+ * @return
+ * Actual number of objects the head was moved on
+ * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only
+ */
+static __rte_always_inline unsigned int
+__rte_ring_headtail_move_head_st(struct rte_ring_headtail *d,
+ const struct rte_ring_headtail *s, uint32_t capacity,
+ unsigned int n,
+ enum rte_ring_queue_behavior behavior,
+ uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
+{
+ *old_head = d->head;
+
+ /* add rmb barrier to avoid load/load reorder in weak
+ * memory model. It is noop on x86
+ */
+ rte_smp_rmb();
+
+ /*
+ * The subtraction is done between two unsigned 32-bit values
+ * (the result is always modulo 2^32 even if we have
+ * *old_head > s->tail). So 'entries' is always between 0
+ * and capacity (which is < size).
+ */
+ *entries = (capacity + s->tail - *old_head);
+
+ /* check that we have enough room in ring */
+ if (unlikely(n > *entries))
+ n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+ if (likely(n > 0)) {
+ *new_head = *old_head + n;
+ d->head = *new_head;
+ }
+ return n;
+}
+
#endif /* _RTE_RING_GENERIC_PVT_H_ */
diff --git a/lib/ring/soring.c b/lib/ring/soring.c
index e9c75619fe..22f9c60e9c 100644
--- a/lib/ring/soring.c
+++ b/lib/ring/soring.c
@@ -135,9 +135,12 @@ __rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
switch (st) {
case RTE_RING_SYNC_ST:
+ n = __rte_ring_headtail_move_head_st(&r->prod.ht, &r->cons.ht,
+ r->capacity, num, behavior, head, next, free);
+ break;
case RTE_RING_SYNC_MT:
- n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
- r->capacity, st, num, behavior, head, next, free);
+ n = __rte_ring_headtail_move_head_mt(&r->prod.ht, &r->cons.ht,
+ r->capacity, num, behavior, head, next, free);
break;
case RTE_RING_SYNC_MT_RTS:
n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
@@ -168,9 +171,13 @@ __rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
switch (st) {
case RTE_RING_SYNC_ST:
+ n = __rte_ring_headtail_move_head_st(&r->cons.ht,
+ &r->stage[stage].ht, 0, num, behavior,
+ head, next, avail);
+ break;
case RTE_RING_SYNC_MT:
- n = __rte_ring_headtail_move_head(&r->cons.ht,
- &r->stage[stage].ht, 0, st, num, behavior,
+ n = __rte_ring_headtail_move_head_mt(&r->cons.ht,
+ &r->stage[stage].ht, 0, num, behavior,
head, next, avail);
break;
case RTE_RING_SYNC_MT_RTS:
@@ -309,9 +316,8 @@ soring_enqueue_start(struct rte_soring *r, uint32_t num,
switch (st) {
case RTE_RING_SYNC_ST:
- n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
- r->capacity, RTE_RING_SYNC_ST, num, behavior,
- &head, &next, &free);
+ n = __rte_ring_headtail_move_head_st(&r->prod.ht, &r->cons.ht,
+ r->capacity, num, behavior, &head, &next, &free);
break;
case RTE_RING_SYNC_MT_HTS:
n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht,
@@ -419,8 +425,8 @@ soring_dequeue_start(struct rte_soring *r, void *objs, void *meta,
switch (st) {
case RTE_RING_SYNC_ST:
- n = __rte_ring_headtail_move_head(&r->cons.ht, &r->stage[ns].ht,
- 0, RTE_RING_SYNC_ST, num, behavior, &head, &next,
+ n = __rte_ring_headtail_move_head_st(&r->cons.ht, &r->stage[ns].ht,
+ 0, num, behavior, &head, &next,
&avail);
break;
case RTE_RING_SYNC_MT_HTS:
--
2.53.0
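
For readers who want to try the single-threaded head move outside of DPDK,
here is a minimal standalone sketch using plain C11 <stdatomic.h>. The names
sketch_headtail, sketch_behavior and sketch_move_head_st are hypothetical
stand-ins for illustration only; they are not DPDK definitions. The sketch
mirrors the load/compute/store sequence of the _st variant in the patch and
is not a replacement for it.

```c
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical stand-in for struct rte_ring_headtail (not the DPDK layout). */
struct sketch_headtail {
	_Atomic uint32_t head;
	_Atomic uint32_t tail;
};

/* Hypothetical stand-in for enum rte_ring_queue_behavior. */
enum sketch_behavior { SKETCH_FIXED, SKETCH_VARIABLE };

/*
 * Move d->head forward by up to n slots, assuming the calling thread is
 * the only writer of d->head. Returns the number of slots claimed.
 */
static unsigned int
sketch_move_head_st(struct sketch_headtail *d, struct sketch_headtail *s,
		uint32_t capacity, unsigned int n, enum sketch_behavior behavior,
		uint32_t *old_head, uint32_t *new_head, uint32_t *entries)
{
	uint32_t stail;

	/* No other thread writes d->head, so a relaxed load is enough. */
	*old_head = atomic_load_explicit(&d->head, memory_order_relaxed);

	/* Acquire pairs with the other side's release-store of its tail. */
	stail = atomic_load_explicit(&s->tail, memory_order_acquire);

	/* Unsigned arithmetic wraps modulo 2^32, so this is safe across
	 * index wraparound.
	 */
	*entries = capacity + stail - *old_head;

	/* Not enough room: fail (FIXED) or take what is there (VARIABLE). */
	if (n > *entries)
		n = (behavior == SKETCH_FIXED) ? 0 : *entries;

	if (n > 0) {
		*new_head = *old_head + n;
		atomic_store_explicit(&d->head, *new_head, memory_order_relaxed);
	}

	return n;
}
```

Because no other thread can change d->head between the load and the store,
there is nothing to retry and no compare-and-swap loop is needed; that is the
whole difference from the _mt variant.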