From: Mattias Rönnblom
To: Konstantin Ananyev, dev@dpdk.org
Cc: honnappa.nagarahalli@arm.com, jerinj@marvell.com, hemant.agrawal@nxp.com, bruce.richardson@intel.com, drc@linux.vnet.ibm.com, ruifeng.wang@arm.com, mb@smartsharesystems.com, Konstantin Ananyev
Subject: Re: [RFC 3/6] ring/soring: introduce Staged Ordered Ring
Date: Mon, 26 Aug 2024 21:04:51 +0200
Message-ID: <6af430b0-3bec-46f5-ac53-f69f5ba1de1e@lysator.liu.se>
References: <20240815085339.1434-1-konstantin.v.ananyev@yandex.ru> <20240815085339.1434-4-konstantin.v.ananyev@yandex.ru>
In-Reply-To: <20240815085339.1434-4-konstantin.v.ananyev@yandex.ru>
On 2024-08-15 10:53, Konstantin Ananyev wrote:
> From: Konstantin Ananyev
>
> Staged-Ordered-Ring (SORING) provides a SW abstraction for 'ordered' queues
> with multiple processing 'stages'.
> It is based on the conventional DPDK rte_ring, re-uses many of its concepts,
> and even a substantial part of its code.
> It can be viewed as an 'extension' of rte_ring functionality.
> The main SORING properties:
> - circular ring buffer with fixed-size objects
> - producer, consumer, plus multiple processing stages in the middle
> - allows splitting object processing into multiple stages
> - objects remain in the same ring while moving from one stage to the other;
>   initial order is preserved, no extra copying needed
> - preserves the ingress order of objects within the queue across multiple
>   stages, i.e. at the same stage multiple threads can process objects from
>   the ring in any order, but for the next stage objects will always appear
>   in the original order
> - each stage (and producer/consumer) can be served by one or more threads
> - number of stages, size, and number of objects in the ring are
>   configurable at ring initialization time
>
> The data-path API provides four main operations:
> - enqueue/dequeue work in the same manner as for the conventional rte_ring;
>   all rte_ring synchronization types are supported
> - acquire/release - for each stage there is an acquire (start) and
>   release (finish) operation.
>   After some objects are 'acquired', the given thread can safely assume
>   that it has exclusive possession of these objects till 'release' is
>   invoked for them.
> Note that right now the user has to release exactly the same number of
> objects that were acquired before.
> After 'release', objects can be 'acquired' by the next stage and/or
> dequeued by the consumer (in the case of the last stage).
>
> Expected use-case: applications that use a pipeline model
> (probably with multiple stages) for packet processing, where preserving
> incoming packet order is important, e.g. IPsec processing.
>

How does SORING relate to Eventdev? Would it be feasible to reshape this into a SW event device?

> Signed-off-by: Konstantin Ananyev
> ---
>  lib/ring/meson.build  |   4 +-
>  lib/ring/rte_soring.c | 144 ++++++++++++++
>  lib/ring/rte_soring.h | 270 ++++++++++++++++++++++++++
>  lib/ring/soring.c     | 431 ++++++++++++++++++++++++++++++++++++++++++
>  lib/ring/soring.h     | 124 ++++++++++++
>  lib/ring/version.map  |  13 ++
>  6 files changed, 984 insertions(+), 2 deletions(-)
>  create mode 100644 lib/ring/rte_soring.c
>  create mode 100644 lib/ring/rte_soring.h
>  create mode 100644 lib/ring/soring.c
>  create mode 100644 lib/ring/soring.h
>
> diff --git a/lib/ring/meson.build b/lib/ring/meson.build
> index 7fca958ed7..21f2c12989 100644
> --- a/lib/ring/meson.build
> +++ b/lib/ring/meson.build
> @@ -1,8 +1,8 @@
>  # SPDX-License-Identifier: BSD-3-Clause
>  # Copyright(c) 2017 Intel Corporation
>
> -sources = files('rte_ring.c')
> -headers = files('rte_ring.h')
> +sources = files('rte_ring.c', 'rte_soring.c', 'soring.c')
> +headers = files('rte_ring.h', 'rte_soring.h')
>  # most sub-headers are not for direct inclusion
>  indirect_headers += files (
>          'rte_ring_core.h',
> diff --git a/lib/ring/rte_soring.c b/lib/ring/rte_soring.c
> new file mode 100644
> index 0000000000..17b1b73a42
> --- /dev/null
> +++ b/lib/ring/rte_soring.c
> @@ -0,0 +1,144 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +#include "soring.h"
> +#include
> +
> +RTE_LOG_REGISTER_DEFAULT(soring_logtype, INFO);
> +#define RTE_LOGTYPE_SORING soring_logtype
> +#define SORING_LOG(level, ...) \
> +	RTE_LOG_LINE(level, SORING, "" __VA_ARGS__)
> +
> +static uint32_t
> +soring_calc_elem_num(uint32_t count)
> +{
> +	return rte_align32pow2(count + 1);
> +}
> +
> +static int
> +soring_check_param(uint32_t esize, uint32_t stsize, uint32_t count,
> +	uint32_t stages)
> +{
> +	if (stages == 0) {
> +		SORING_LOG(ERR, "invalid number of stages: %u", stages);
> +		return -EINVAL;
> +	}
> +
> +	/* Check if element size is a multiple of 4B */
> +	if (esize == 0 || esize % 4 != 0) {
> +		SORING_LOG(ERR, "invalid element size: %u", esize);
> +		return -EINVAL;
> +	}
> +
> +	/* Check if ret-code size is a multiple of 4B */
> +	if (stsize % 4 != 0) {
> +		SORING_LOG(ERR, "invalid retcode size: %u", stsize);
> +		return -EINVAL;
> +	}
> +
> +	/* count must be a power of 2 */
> +	if (rte_is_power_of_2(count) == 0 ||
> +			(count > RTE_SORING_ELEM_MAX + 1)) {
> +		SORING_LOG(ERR, "invalid number of elements: %u", count);
> +		return -EINVAL;
> +	}
> +
> +	return 0;
> +}
> +
> +/*
> + * Calculate size offsets for SORING internal data layout.
> + */
> +static size_t
> +soring_get_szofs(uint32_t esize, uint32_t stsize, uint32_t count,
> +	uint32_t stages, size_t *elst_ofs, size_t *state_ofs,
> +	size_t *stage_ofs)
> +{
> +	size_t sz;
> +	const struct rte_soring * const r = NULL;
> +
> +	sz = sizeof(r[0]) + (size_t)count * esize;
> +	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> +	if (elst_ofs != NULL)
> +		*elst_ofs = sz;
> +
> +	sz = sz + (size_t)count * stsize;
> +	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> +	if (state_ofs != NULL)
> +		*state_ofs = sz;
> +
> +	sz += sizeof(r->state[0]) * count;
> +	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> +	if (stage_ofs != NULL)
> +		*stage_ofs = sz;
> +
> +	sz += sizeof(r->stage[0]) * stages;
> +	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
> +
> +	return sz;
> +}
> +
> +
> +ssize_t
> +rte_soring_get_memsize(const struct rte_soring_param *prm)
> +{
> +	int32_t rc;
> +	uint32_t count;
> +
> +	count = soring_calc_elem_num(prm->elems);
> +	rc = soring_check_param(prm->esize, prm->stsize, count, prm->stages);
> +	if (rc != 0)
> +		return rc;
> +
> +	return soring_get_szofs(prm->esize, prm->stsize, count, prm->stages,
> +		NULL, NULL, NULL);
> +}
> +
> +int
> +rte_soring_init(struct rte_soring *r, const struct rte_soring_param *prm)
> +{
> +	int32_t rc;
> +	uint32_t n;
> +	size_t elst_ofs, stage_ofs, state_ofs;
> +
> +	if (r == NULL || prm == NULL)
> +		return -EINVAL;
> +
> +	n = soring_calc_elem_num(prm->elems);
> +	rc = soring_check_param(prm->esize, prm->stsize, n, prm->stages);
> +	if (rc != 0)
> +		return rc;
> +
> +	soring_get_szofs(prm->esize, prm->stsize, n, prm->stages, &elst_ofs,
> +		&state_ofs, &stage_ofs);
> +
> +	memset(r, 0, sizeof(*r));
> +	rc = strlcpy(r->name, prm->name, sizeof(r->name));
> +	if (rc < 0 || rc >= (int)sizeof(r->name))
> +		return -ENAMETOOLONG;
> +
> +	r->size = n;
> +	r->mask = r->size - 1;
> +	r->capacity = prm->elems;
> +	r->esize = prm->esize;
> +	r->stsize = prm->stsize;
> +
> +	r->prod.ht.sync_type = prm->prod_synt;
> +	r->cons.ht.sync_type = prm->cons_synt;
> +
> +	r->state = (union soring_state *)((uintptr_t)r + state_ofs);
> +	memset(r->state, 0, sizeof(r->state[0]) * r->size);
> +
> +	r->stage = (struct soring_stage *)((uintptr_t)r + stage_ofs);
> +	r->nb_stage = prm->stages;
> +	memset(r->stage, 0, r->nb_stage * sizeof(r->stage[0]));
> +
> +	if (r->stsize != 0)
> +		r->elemst = (void *)((uintptr_t)r + elst_ofs);
> +
> +	return 0;
> +}
> diff --git a/lib/ring/rte_soring.h b/lib/ring/rte_soring.h
> new file mode 100644
> index 0000000000..fb0e75b39a
> --- /dev/null
> +++ b/lib/ring/rte_soring.h
> @@ -0,0 +1,270 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +#ifndef _RTE_SORING_H_
> +#define _RTE_SORING_H_
> +
> +/**
> + * @file
> + * This file contains the definition of the RTE soring (Staged Ordered Ring)
> + * public API.
> + * Brief description:
> + * enqueue/dequeue work the same as for a conventional rte_ring:
> + * any rte_ring sync types can be used, etc.
> + * Plus there could be multiple 'stages'.
> + * For each stage there is an acquire (start) and release (finish) operation.
> + * After some elems are 'acquired', the user can safely assume exclusive
> + * possession of these elems till 'release' for them is done.
> + * Note that right now the user has to release exactly the same number of
> + * elems as were acquired before.
> + * After 'release', elems can be 'acquired' by the next stage and/or dequeued
> + * (in the case of the last stage).
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include
> +
> +/* upper 2 bits are used for status */
> +#define RTE_SORING_ST_BIT	30
> +
> +/* max possible number of elements in the soring */
> +#define RTE_SORING_ELEM_MAX	(RTE_BIT32(RTE_SORING_ST_BIT) - 1)
> +
> +struct rte_soring_param {
> +	/** expected name of the ring */
> +	const char *name;
> +	/** number of elements in the ring */
> +	uint32_t elems;
> +	/** size of elements in the ring, must be a multiple of 4 */
> +	uint32_t esize;
> +	/**
> +	 * size of retcode for each elem, must be a multiple of 4.
> +	 * This parameter defines the size of a supplementary and optional
> +	 * array of ret-codes associated with each object in the soring.
> +	 * While the element size is configurable (see @esize parameter above),
> +	 * so the user can make it big enough to hold both the object and its
> +	 * ret-code together, for performance reasons it might be preferable
> +	 * to access them as separate arrays.
> +	 * Common usage scenario when such separation helps:
> +	 * enqueue() - writes to objects array
> +	 * acquire() - reads from objects array
> +	 * release() - writes to ret-code array
> +	 * dequeue() - reads both objects and ret-code arrays
> +	 */
> +	uint32_t stsize;
> +	/** number of stages in the ring */
> +	uint32_t stages;
> +	/** sync type for producer */
> +	enum rte_ring_sync_type prod_synt;
> +	/** sync type for consumer */
> +	enum rte_ring_sync_type cons_synt;
> +};
> +
> +struct rte_soring;
> +
> +/**
> + * Calculate the memory size needed for a soring
> + *
> + * This function returns the number of bytes needed for a ring, given
> + * the expected parameters for it. This value is the sum of the size of
> + * the internal metadata and the size of the memory needed by the
> + * actual ring elements and their ret-codes. The value is aligned to a
> + * cache line size.
> + *
> + * @param prm
> + *   Pointer to the structure that contains soring creation parameters.
> + * @return
> + *   - The memory size needed for the soring on success.
> + *   - -EINVAL if provided parameter values are invalid.
> + */
> +__rte_experimental
> +ssize_t
> +rte_soring_get_memsize(const struct rte_soring_param *prm);
> +
> +/**
> + * Initialize a soring structure.
> + *
> + * Initialize a soring structure in the memory pointed to by "r".
> + * The size of the memory area must be large enough to store the soring
> + * internal structures plus the objects and ret-code tables.
> + * It is strongly advised to use rte_soring_get_memsize() to get the
> + * appropriate size.
> + *
> + * @param r
> + *   Pointer to the soring structure.
> + * @param prm
> + *   Pointer to the structure that contains soring creation parameters.
> + * @return
> + *   - 0 on success, or a negative error code.
> + */
> +__rte_experimental
> +int
> +rte_soring_init(struct rte_soring *r, const struct rte_soring_param *prm);
> +
> +/**
> + * Return the total number of filled entries in a ring.
> + *
> + * @param r
> + *   A pointer to the soring structure.
> + * @return
> + *   The number of entries in the ring.
> + */
> +__rte_experimental
> +unsigned int
> +rte_soring_count(const struct rte_soring *r);
> +
> +/**
> + * Enqueue several objects on the ring.
> + *
> + * @param r
> + *   A pointer to the soring structure.
> + * @param obj_table
> + *   A pointer to an array of objects to enqueue.
> + *   The size of objects to enqueue must be the same value as the @esize
> + *   parameter used while creating the ring. Otherwise the results are
> + *   undefined.
> + * @param objst
> + *   A pointer to an array of status values for each object to enqueue.
> + *   Note that if the user is not using object status values, then this
> + *   parameter can be NULL.
> + *   The size of elements in this array must be the same value as the
> + *   @stsize parameter used while creating the ring. If the user created
> + *   the soring with @stsize equal to zero, then @objst should be NULL.
> + *   Otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add to the ring from the obj_table.
> + * @param behavior
> + *   Behavior type, one of:
> + *   - RTE_RING_QUEUE_FIXED: enqueue either exactly @n objects or none.
> + *   - RTE_RING_QUEUE_VARIABLE: enqueue up to @n objects.
> + * @param free_space
> + *   If non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   - Actual number of objects enqueued.
> + */
> +__rte_experimental
> +uint32_t
> +rte_soring_enqueue(struct rte_soring *r, const void *obj_table,
> +	const void *objst, uint32_t n, enum rte_ring_queue_behavior behavior,
> +	uint32_t *free_space);
> +
> +/**
> + * Dequeue several objects from the ring.
> + *
> + * @param r
> + *   A pointer to the soring structure.
> + * @param obj_table
> + *   A pointer to an array of objects to dequeue.
> + *   The size of objects to dequeue must be the same value as the @esize
> + *   parameter used while creating the ring. Otherwise the results are
> + *   undefined.
> + * @param objst
> + *   A pointer to an array of status values for each object to dequeue.
> + *   Note that if the user is not using object status values, then this
> + *   parameter can be NULL.
> + *   The size of elements in this array must be the same value as the
> + *   @stsize parameter used while creating the ring. If the user created
> + *   the soring with @stsize equal to zero, then @objst should be NULL.
> + *   Otherwise the results are undefined.
> + * @param num
> + *   The number of objects to dequeue from the ring into the obj_table.
> + * @param behavior
> + *   Behavior type, one of:
> + *   - RTE_RING_QUEUE_FIXED: dequeue either exactly @num objects or none.
> + *   - RTE_RING_QUEUE_VARIABLE: dequeue up to @num objects.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   - Actual number of objects dequeued.
> + */
> +__rte_experimental
> +uint32_t
> +rte_soring_dequeue(struct rte_soring *r, void *obj_table, void *objst,
> +	uint32_t num, enum rte_ring_queue_behavior behavior,
> +	uint32_t *available);
> +
> +/**
> + * Acquire several objects from the ring for the given stage.
> + *
> + * @param r
> + *   A pointer to the soring structure.
> + * @param objs
> + *   A pointer to an array of objects to acquire.
> + *   The size of objects must be the same value as the @esize parameter
> + *   used while creating the ring. Otherwise the results are undefined.
> + * @param objst
> + *   A pointer to an array of status values for each acquired object.
> + *   Note that if the user is not using object status values, then this
> + *   parameter can be NULL.
> + *   The size of elements in this array must be the same value as the
> + *   @stsize parameter used while creating the ring. If the user created
> + *   the soring with @stsize equal to zero, then @objst should be NULL.
> + *   Otherwise the results are undefined.
> + * @param stage
> + *   Stage to acquire objects for.
> + * @param num
> + *   The number of objects to acquire.
> + * @param behavior
> + *   Behavior type, one of:
> + *   - RTE_RING_QUEUE_FIXED: acquire either exactly @num objects or none.
> + *   - RTE_RING_QUEUE_VARIABLE: acquire up to @num objects.
> + * @param ftoken
> + *   Pointer to the opaque 'token' value used by the release() op.
> + *   The user has to store this value somewhere and later provide it to
> + *   release().
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries for the
> + *   given stage after the acquire has finished.
> + * @return
> + *   - Actual number of objects acquired.
> + */
> +__rte_experimental
> +uint32_t
> +rte_soring_acquire(struct rte_soring *r, void *objs, void *objst,
> +	uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
> +	uint32_t *ftoken, uint32_t *available);
> +
> +/**
> + * Release several objects for the given stage back to the ring.
> + * Note that this means these objects become available for the next stage
> + * or for dequeue.
> + *
> + * @param r
> + *   A pointer to the soring structure.
> + * @param objs
> + *   A pointer to an array of objects to release.
> + *   Note that unless the user needs to overwrite ring objects this
> + *   parameter can be NULL.
> + *   The size of objects must be the same value as the @esize parameter
> + *   used while creating the ring. Otherwise the results are undefined.
> + * @param objst
> + *   A pointer to an array of status values for each object to release.
> + *   Note that if the user is not using object status values, then this
> + *   parameter can be NULL.
> + *   The size of elements in this array must be the same value as the
> + *   @stsize parameter used while creating the ring. If the user created
> + *   the soring with @stsize equal to zero, then @objst should be NULL.
> + *   Otherwise the results are undefined.
> + * @param stage
> + *   Current stage.
> + * @param n
> + *   The number of objects to release.
> + *   Has to be the same value as returned by the acquire() op.
> + * @param ftoken
> + *   Opaque 'token' value obtained from the acquire() op.
> + * @return
> + *   - None.
> + */
> +__rte_experimental
> +void
> +rte_soring_release(struct rte_soring *r, const void *objs,
> +	const void *objst, uint32_t stage, uint32_t n, uint32_t ftoken);
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_SORING_H_ */
> diff --git a/lib/ring/soring.c b/lib/ring/soring.c
> new file mode 100644
> index 0000000000..929bde9697
> --- /dev/null
> +++ b/lib/ring/soring.c
> @@ -0,0 +1,431 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +/**
> + * @file
> + * This file contains the implementation of the SORING 'datapath' functions.
> + * Brief description:
> + * ==================
> + * enqueue/dequeue work the same as for a conventional rte_ring:
> + * any rte_ring sync types can be used, etc.
> + * Plus there could be multiple 'stages'.
> + * For each stage there is an acquire (start) and release (finish) operation.
> + * After some elems are 'acquired', the user can safely assume exclusive
> + * possession of these elems till 'release' for them is done.
> + * Note that right now the user has to release exactly the same number of
> + * elems as were acquired before.
> + * After 'release', elems can be 'acquired' by the next stage and/or dequeued
> + * (in the case of the last stage).
> + * Internal structure:
> + * ===================
> + * In addition to the 'normal' ring of elems, it also has a ring of states of
> + * the same size. Each state[] corresponds to exactly one elem[].
> + * state[] will be used by acquire/release/dequeue functions to store internal
> + * information and should not be accessed by the user directly.
> + * How it works:
> + * =============
> + * 'acquire()' just moves the stage's head (same as rte_ring move_head does),
> + * plus it saves in state[stage.cur_head] information about how many elems
> + * were acquired, the current head position, and a special flag value to
> + * indicate that the elems are acquired (RTE_SORING_ST_START).
> + * Note that 'acquire()' returns to the user a special 'ftoken' that the user
> + * has to provide for 'release()' (in fact it is just the position of the
> + * current head).
> + * 'release()' extracts the old head value from the provided ftoken and checks
> + * that the corresponding 'state[]' contains the expected values (mostly for
> + * sanity purposes).
> + * Then it marks this state[] with the 'RTE_SORING_ST_FINISH' flag to indicate
> + * that the given subset of objects was released.
> + * After that, it checks whether the old head value equals the current tail
> + * value. If yes, then it performs the 'finalize()' operation, otherwise
> + * 'release()' just returns (without spinning on the stage tail value).
> + * As the updated state[] is shared by all threads, some other thread can do
> + * 'finalize()' for the given stage.
> + * That allows 'release()' to avoid excessive waits on the tail value.
> + * The main purpose of the 'finalize()' operation is to walk through
> + * 'state[]' from the current stage tail up to its head, check state[], and
> + * move the stage tail through elements that are already in the
> + * RTE_SORING_ST_FINISH state.
> + * Along with that, the corresponding state[] values are reset to zero.
> + * Note that 'finalize()' for a given stage can be done from multiple places:
> + * from 'release()' for that stage, from 'acquire()' for the next stage, or
> + * even from the consumer's 'dequeue()' - in case the given stage is the
> + * last one.
> + * So 'finalize()' has to be MT-safe, and inside it we have to
> + * guarantee that only one thread will update state[] and the stage's tail
> + * values.
> + */
> +
> +#include "soring.h"
> +
> +/*
> + * Inline functions (fastpath) start here.
> + */
> +static __rte_always_inline uint32_t
> +__rte_soring_stage_finalize(struct soring_stage_headtail *sht,
> +	union soring_state *rstate, uint32_t rmask, uint32_t maxn)
> +{
> +	int32_t rc;
> +	uint32_t head, i, idx, k, n, tail;
> +	union soring_stage_tail nt, ot;
> +	union soring_state st;
> +
> +	/* try to grab exclusive right to update tail value */
> +	ot.raw = rte_atomic_load_explicit(&sht->tail.raw,
> +		rte_memory_order_acquire);
> +
> +	/* other thread already finalizing it for us */
> +	if (ot.sync != 0)
> +		return 0;
> +
> +	nt.pos = ot.pos;
> +	nt.sync = 1;
> +	rc = rte_atomic_compare_exchange_strong_explicit(&sht->tail.raw,
> +		&ot.raw, nt.raw, rte_memory_order_acquire,
> +		rte_memory_order_relaxed);
> +
> +	/* other thread won the race */
> +	if (rc == 0)
> +		return 0;
> +
> +	/*
> +	 * start with current tail and walk through states that are
> +	 * already finished.
> + */ > + > + head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed); > + n = RTE_MIN(head - ot.pos, maxn); > + for (i = 0, tail = ot.pos; i < n; i += k, tail += k) { > + > + idx = tail & rmask; > + st.raw = rte_atomic_load_explicit(&rstate[idx].raw, > + rte_memory_order_relaxed); > + if ((st.stnum & RTE_SORING_ST_MASK) != RTE_SORING_ST_FINISH || > + st.ftoken != tail) > + break; > + > + k = st.stnum & ~RTE_SORING_ST_MASK; > + rte_atomic_store_explicit(&rstate[idx].raw, 0, > + rte_memory_order_relaxed); > + } > + > + > + /* release exclusive right to update along with new tail value */ > + ot.pos = tail; > + rte_atomic_store_explicit(&sht->tail.raw, ot.raw, > + rte_memory_order_release); > + > + return i; > +} > + > +static __rte_always_inline uint32_t > +__rte_soring_move_prod_head(struct rte_soring *r, uint32_t num, > + enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st, > + uint32_t *head, uint32_t *next, uint32_t *free) > +{ > + uint32_t n; > + > + switch (st) { > + case RTE_RING_SYNC_ST: > + case RTE_RING_SYNC_MT: > + n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht, > + r->capacity, st, num, behavior, head, next, free); > + break; > + case RTE_RING_SYNC_MT_RTS: > + n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht, > + r->capacity, num, behavior, head, free); > + *next = *head + n; > + break; > + case RTE_RING_SYNC_MT_HTS: > + n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht, > + r->capacity, num, behavior, head, free); > + *next = *head + n; > + break; > + default: > + /* unsupported mode, shouldn't be here */ > + RTE_ASSERT(0); > + *free = 0; > + n = 0; > + } > + > + return n; > +} > + > +static __rte_always_inline uint32_t > +__rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num, > + enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st, > + uint32_t *head, uint32_t *next, uint32_t *avail) > +{ > + uint32_t n; > + > + switch (st) { > + case RTE_RING_SYNC_ST: > + case 
RTE_RING_SYNC_MT: > + n = __rte_ring_headtail_move_head(&r->cons.ht, > + &r->stage[stage].ht, 0, st, num, behavior, > + head, next, avail); > + break; > + case RTE_RING_SYNC_MT_RTS: > + n = __rte_ring_rts_move_head(&r->cons.rts, &r->stage[stage].ht, > + 0, num, behavior, head, avail); > + *next = *head + n; > + break; > + case RTE_RING_SYNC_MT_HTS: > + n = __rte_ring_hts_move_head(&r->cons.hts, &r->stage[stage].ht, > + 0, num, behavior, head, avail); > + *next = *head + n; > + break; > + default: > + /* unsupported mode, shouldn't be here */ > + RTE_ASSERT(0); > + *avail = 0; > + n = 0; > + } > + > + return n; > +} > + > +static __rte_always_inline void > +__rte_soring_update_tail(struct __rte_ring_headtail *rht, > + enum rte_ring_sync_type st, uint32_t head, uint32_t next, uint32_t enq) > +{ > + uint32_t n; > + > + switch (st) { > + case RTE_RING_SYNC_ST: > + case RTE_RING_SYNC_MT: > + __rte_ring_update_tail(&rht->ht, head, next, st, enq); > + break; > + case RTE_RING_SYNC_MT_RTS: > + __rte_ring_rts_update_tail(&rht->rts); > + break; > + case RTE_RING_SYNC_MT_HTS: > + n = next - head; > + __rte_ring_hts_update_tail(&rht->hts, head, n, enq); > + break; > + default: > + /* unsupported mode, shouldn't be here */ > + RTE_ASSERT(0); > + } > +} > + > +static __rte_always_inline uint32_t > +__rte_soring_stage_move_head(struct soring_stage_headtail *d, > + const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num, > + enum rte_ring_queue_behavior behavior, > + uint32_t *old_head, uint32_t *new_head, uint32_t *avail) > +{ > + uint32_t n, tail; > + > + *old_head = rte_atomic_load_explicit(&d->head, > + rte_memory_order_acquire); > + > + do { > + n = num; > + tail = rte_atomic_load_explicit(&s->tail, > + rte_memory_order_relaxed); > + *avail = capacity + tail - *old_head; > + if (n > *avail) > + n = (behavior == RTE_RING_QUEUE_FIXED) ? 
0 : *avail; > + if (n == 0) > + return 0; > + *new_head = *old_head + n; > + } while (rte_atomic_compare_exchange_strong_explicit(&d->head, > + old_head, *new_head, rte_memory_order_acquire, > + rte_memory_order_acquire) == 0); > + > + return n; > +} > + > +/* > + * Public functions (data-path) start here. > + */ > + > +unsigned int > +rte_soring_count(const struct rte_soring *r) > +{ > + uint32_t prod_tail = r->prod.ht.tail; > + uint32_t cons_tail = r->cons.ht.tail; > + uint32_t count = (prod_tail - cons_tail) & r->mask; > + return (count > r->capacity) ? r->capacity : count; > +} > + > +uint32_t > +rte_soring_enqueue(struct rte_soring *r, const void *obj_table, > + const void *objst, uint32_t n, enum rte_ring_queue_behavior behavior, > + uint32_t *free_space) > +{ > + enum rte_ring_sync_type st; > + uint32_t nb_free, prod_head, prod_next; > + > + RTE_ASSERT(r != NULL && r->nb_stage > 0); > + RTE_ASSERT(objst == NULL || r->elemst != NULL); > + > + st = r->prod.ht.sync_type; > + > + n = __rte_soring_move_prod_head(r, n, behavior, st, > + &prod_head, &prod_next, &nb_free); > + if (n != 0) { > + __rte_ring_do_enqueue_elems(&r[1], obj_table, r->size, > + prod_head & r->mask, r->esize, n); > + if (objst != NULL) > + __rte_ring_do_enqueue_elems(r->elemst, objst, r->size, > + prod_head & r->mask, r->stsize, n); > + __rte_soring_update_tail(&r->prod, st, prod_head, prod_next, 1); > + } > + > + if (free_space != NULL) > + *free_space = nb_free - n; > + return n; > +} > + > +uint32_t > +rte_soring_dequeue(struct rte_soring *r, void *obj_table, void *objst, > + uint32_t num, enum rte_ring_queue_behavior behavior, > + uint32_t *available) > +{ > + enum rte_ring_sync_type st; > + uint32_t entries, cons_head, cons_next, n, ns, reqn; > + > + RTE_ASSERT(r != NULL && r->nb_stage > 0); > + RTE_ASSERT(objst == NULL || r->elemst != NULL); > + > + ns = r->nb_stage - 1; > + st = r->cons.ht.sync_type; > + > + /* try to grab exactly @num elems first */ > + n = 
__rte_soring_move_cons_head(r, ns, num, RTE_RING_QUEUE_FIXED, st, > + &cons_head, &cons_next, &entries); > + if (n == 0) { > + /* try to finalize some elems from previous stage */ > + n = __rte_soring_stage_finalize(&r->stage[ns].sht, > + r->state, r->mask, 2 * num); > + entries += n; > + > + /* repeat attempt to grab elems */ > + reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0; > + if (entries >= reqn) > + n = __rte_soring_move_cons_head(r, ns, num, behavior, > + st, &cons_head, &cons_next, &entries); > + else > + n = 0; > + } > + > + /* we have some elems to consume */ > + if (n != 0) { > + __rte_ring_do_dequeue_elems(obj_table, &r[1], r->size, > + cons_head & r->mask, r->esize, n); > + if (objst != NULL) > + __rte_ring_do_dequeue_elems(objst, r->elemst, r->size, > + cons_head & r->mask, r->stsize, n); > + __rte_soring_update_tail(&r->cons, st, cons_head, cons_next, 0); > + } > + > + if (available != NULL) > + *available = entries - n; > + return n; > +} > + > +uint32_t > +rte_soring_acquire(struct rte_soring *r, void *objs, void *objst, > + uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior, > + uint32_t *ftoken, uint32_t *available) > +{ > + uint32_t avail, head, idx, n, next, reqn; > + union soring_state st; > + struct soring_stage *pstg; > + struct soring_stage_headtail *cons; > + > + RTE_ASSERT(r != NULL && stage < r->nb_stage); > + RTE_ASSERT(objst == NULL || r->elemst != NULL); > + > + cons = &r->stage[stage].sht; > + > + if (stage == 0) > + n = __rte_soring_stage_move_head(cons, &r->prod.ht, 0, num, > + behavior, &head, &next, &avail); > + else { > + pstg = r->stage + stage - 1; > + > + /* try to grab exactly @num elems */ > + n = __rte_soring_stage_move_head(cons, &pstg->ht, 0, num, > + RTE_RING_QUEUE_FIXED, &head, &next, &avail); > + if (n == 0) { > + /* try to finalize some elems from previous stage */ > + n = __rte_soring_stage_finalize(&pstg->sht, r->state, > + r->mask, 2 * num); > + avail += n; > + > + /* repeat attempt to grab 
elems */ > + reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0; > + if (avail >= reqn) > + n = __rte_soring_stage_move_head(cons, > + &pstg->ht, 0, num, behavior, &head, > + &next, &avail); > + else > + n = 0; > + } > + } > + > + if (n != 0) { > + > + idx = head & r->mask; > + > + /* copy elems that are ready for given stage */ > + __rte_ring_do_dequeue_elems(objs, &r[1], r->size, idx, > + r->esize, n); > + if (objst != NULL) > + __rte_ring_do_dequeue_elems(objst, r->elemst, > + r->size, idx, r->stsize, n); > + > + /* update status ring */ > + st.ftoken = head; > + st.stnum = (RTE_SORING_ST_START | n); > + > + rte_atomic_store_explicit(&r->state[idx].raw, st.raw, > + rte_memory_order_relaxed); > + *ftoken = head; > + } > + > + if (available != NULL) > + *available = avail - n; > + return n; > +} > + > +void > +rte_soring_release(struct rte_soring *r, const void *objs, > + const void *objst, uint32_t stage, uint32_t n, uint32_t ftoken) > +{ > + uint32_t idx, tail; > + struct soring_stage *stg; > + union soring_state st; > + > + RTE_ASSERT(r != NULL && stage < r->nb_stage); > + RTE_ASSERT(objst == NULL || r->elemst != NULL); > + > + stg = r->stage + stage; > + tail = rte_atomic_load_explicit(&stg->sht.tail.pos, > + rte_memory_order_relaxed); > + > + idx = ftoken & r->mask; > + st.raw = rte_atomic_load_explicit(&r->state[idx].raw, > + rte_memory_order_acquire); > + > + /* check state ring contents */ > + RTE_VERIFY(st.stnum == (RTE_SORING_ST_START | n) && > + st.ftoken == ftoken); > + > + /* update contents of the ring, if necessary */ > + if (objs != NULL) > + __rte_ring_do_enqueue_elems(&r[1], objs, r->size, idx, > + r->esize, n); > + if (objst != NULL) > + __rte_ring_do_enqueue_elems(r->elemst, objst, r->size, idx, > + r->stsize, n); > + > + /* set state to FINISH, make sure it is not reordered */ > + st.stnum = RTE_SORING_ST_FINISH | n; > + rte_atomic_store_explicit(&r->state[idx].raw, st.raw, > + rte_memory_order_release); > + > + if (tail == ftoken) > + 
> +		__rte_soring_stage_finalize(&stg->sht, r->state, r->mask,
> +			r->capacity);
> +}
> diff --git a/lib/ring/soring.h b/lib/ring/soring.h
> new file mode 100644
> index 0000000000..3a3f6efa76
> --- /dev/null
> +++ b/lib/ring/soring.h
> @@ -0,0 +1,124 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2024 Huawei Technologies Co., Ltd
> + */
> +
> +#ifndef _SORING_H_
> +#define _SORING_H_
> +
> +/**
> + * @file
> + * This file contains internal structures of RTE soring: Staged Ordered Ring.
> + * Sort of extension of conventional RTE ring.
> + * Internal structure:
> + * In addition to the 'normal' ring of elems, it also has a ring of states of
> + * the same size. Each state[] corresponds to exactly one elem[].
> + * state[] will be used by acquire/release/dequeue functions to store internal
> + * information and should not be accessed by the user directly.
> + * For actual implementation details, please refer to soring.c
> + */
> +
> +#include 
> +
> +union soring_state {
> +	alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
> +	struct {
> +		RTE_ATOMIC(uint32_t) ftoken;
> +		RTE_ATOMIC(uint32_t) stnum;
> +	};
> +};
> +
> +/* upper 2 bits are used for status */
> +#define RTE_SORING_ST_START	RTE_SHIFT_VAL32(1, RTE_SORING_ST_BIT)
> +#define RTE_SORING_ST_FINISH	RTE_SHIFT_VAL32(2, RTE_SORING_ST_BIT)
> +
> +#define RTE_SORING_ST_MASK	\
> +	RTE_GENMASK32(sizeof(uint32_t) * CHAR_BIT - 1, RTE_SORING_ST_BIT)
> +
> +/**
> + * SORING re-uses rte_ring internal structures and implementation
> + * for enqueue/dequeue operations.
> + */
> +struct __rte_ring_headtail {
> +
> +	union __rte_cache_aligned {
> +		struct rte_ring_headtail ht;
> +		struct rte_ring_hts_headtail hts;
> +		struct rte_ring_rts_headtail rts;
> +	};
> +
> +	RTE_CACHE_GUARD;
> +};
> +
> +union soring_stage_tail {
> +	/** raw 8B value to read/write *sync* and *pos* as one atomic op */
> +	alignas(sizeof(uint64_t)) RTE_ATOMIC(uint64_t) raw;
> +	struct {
> +		RTE_ATOMIC(uint32_t) sync;
> +		RTE_ATOMIC(uint32_t) pos;
> +	};
> +};
> +
> +struct soring_stage_headtail {
> +	volatile union soring_stage_tail tail;
> +	enum rte_ring_sync_type unused;	/**< unused */
> +	volatile RTE_ATOMIC(uint32_t) head;
> +};
> +
> +/**
> + * SORING stage head_tail structure is 'compatible' with rte_ring ones.
> + * rte_ring internal functions for moving producer/consumer head
> + * can work transparently with stage head_tail and vice versa
> + * (no modifications required).
> + */
> +struct soring_stage {
> +
> +	union __rte_cache_aligned {
> +		struct rte_ring_headtail ht;
> +		struct soring_stage_headtail sht;
> +	};
> +
> +	RTE_CACHE_GUARD;
> +};
> +
> +/**
> + * RTE soring internal structure.
> + * As with rte_ring, the actual elements array is supposed to be located
> + * directly after the rte_soring structure.
> + */
> +struct __rte_cache_aligned rte_soring {
> +	uint32_t size;      /**< Size of ring. */
> +	uint32_t mask;      /**< Mask (size-1) of ring. */
> +	uint32_t capacity;  /**< Usable size of ring */
> +	uint32_t esize;
> +	/**< size of elements in the ring, must be a multiple of 4 */
> +	uint32_t stsize;
> +	/**< size of status value for each elem, must be a multiple of 4 */
> +
> +	/** Ring stages */
> +	struct soring_stage *stage;
> +	uint32_t nb_stage;
> +
> +	/** Ring of states (one per element) */
> +	union soring_state *state;
> +
> +	/** Pointer to the buffer where status values for each element
> +	 * are stored. This is supplementary and optional information that
> +	 * the user can attach to each element of the ring.
> +	 * While it is possible to incorporate this information inside
> +	 * a user-defined element, in many cases it is preferable to maintain
> +	 * it as a separate array (mainly for performance reasons).
> +	 */
> +	void *elemst;
> +
> +	RTE_CACHE_GUARD;
> +
> +	/** Ring producer status. */
> +	struct __rte_ring_headtail prod;
> +
> +	/** Ring consumer status. */
> +	struct __rte_ring_headtail cons;
> +
> +	char name[RTE_RING_NAMESIZE];  /**< Name of the ring. */
> +};
> +
> +#endif /* _SORING_H_ */
> diff --git a/lib/ring/version.map b/lib/ring/version.map
> index 8da094a69a..a1f95a500f 100644
> --- a/lib/ring/version.map
> +++ b/lib/ring/version.map
> @@ -14,3 +14,16 @@ DPDK_25 {
> 
>  local: *;
> };
> +
> +EXPERIMENTAL {
> +	global:
> +
> +	# added in 24.11
> +	rte_soring_acquire;
> +	rte_soring_count;
> +	rte_soring_dequeue;
> +	rte_soring_enqueue;
> +	rte_soring_get_memsize;
> +	rte_soring_init;
> +	rte_soring_release;
> +};