From: Timothy McDaniel <timothy.mcdaniel@intel.com>
Cc: dev@dpdk.org, erik.g.carrillo@intel.com, gage.eads@intel.com,
harry.van.haaren@intel.com, jerinj@marvell.com,
thomas@monjalon.net
Subject: [dpdk-dev] [PATCH v2 1/1] event/dlb: optimize Dequeue Operations
Date: Tue, 13 Apr 2021 15:30:39 -0500 [thread overview]
Message-ID: <1618345839-3515-2-git-send-email-timothy.mcdaniel@intel.com> (raw)
In-Reply-To: <1618345839-3515-1-git-send-email-timothy.mcdaniel@intel.com>
Convert code to use x86 vector instructions, thereby significantly
improving dequeue performance.
Signed-off-by: Timothy McDaniel <timothy.mcdaniel@intel.com>
Signed-off-by: Harry Van Haaren <harry.van.haaren@intel.com>
---
drivers/event/dlb/dlb2.c | 445 ++++++++++++++++++++++++++++++----
drivers/event/dlb/dlb2_priv.h | 22 +-
2 files changed, 414 insertions(+), 53 deletions(-)
diff --git a/drivers/event/dlb/dlb2.c b/drivers/event/dlb/dlb2.c
index 818b1c367..c8a50cddf 100644
--- a/drivers/event/dlb/dlb2.c
+++ b/drivers/event/dlb/dlb2.c
@@ -375,6 +375,26 @@ set_default_depth_thresh(const char *key __rte_unused,
return 0;
}
+static int
+set_vector_opts_disab(const char *key __rte_unused,
+ const char *value,
+ void *opaque)
+{
+ bool *dlb2_vector_opts_disabled = opaque;
+
+ if (value == NULL || opaque == NULL) {
+ DLB2_LOG_ERR("NULL pointer\n");
+ return -EINVAL;
+ }
+
+ if ((*value == 'y') || (*value == 'Y'))
+ *dlb2_vector_opts_disabled = true;
+ else
+ *dlb2_vector_opts_disabled = false;
+
+ return 0;
+}
+
static int
set_qid_depth_thresh(const char *key __rte_unused,
const char *value,
@@ -1240,6 +1260,37 @@ dlb2_event_enqueue_forward_burst_delayed(void *event_port,
const struct rte_event events[],
uint16_t num);
+/* Generate the required bitmask for rotate-style expected QE gen bits.
+ * This requires a pattern of 1's and zeros, starting with expected as
+ * 1 bits, so when hardware writes 0's they're "new". This requires the
+ * ring size to be powers of 2 to wrap correctly.
+ */
+static void
+dlb2_hw_cq_bitmask_init(struct dlb2_port *qm_port, uint32_t cq_depth)
+{
+ uint64_t cq_build_mask = 0;
+ uint32_t i;
+
+ if (cq_depth > 64)
+ return; /* need to fall back to scalar code */
+
+ /*
+ * all 1's in first u64, all zeros in second is correct bit pattern to
+ * start. Special casing == 64 easier than adapting complex loop logic.
+ */
+ if (cq_depth == 64) {
+ qm_port->cq_rolling_mask = 0;
+ qm_port->cq_rolling_mask_2 = -1;
+ return;
+ }
+
+ for (i = 0; i < 64; i += (cq_depth * 2))
+ cq_build_mask |= ((1ULL << cq_depth) - 1) << (i + cq_depth);
+
+ qm_port->cq_rolling_mask = cq_build_mask;
+ qm_port->cq_rolling_mask_2 = cq_build_mask;
+}
+
static int
dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
struct dlb2_eventdev_port *ev_port,
@@ -1357,6 +1408,8 @@ dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
/* starting value of gen bit - it toggles at wrap time */
qm_port->gen_bit = 1;
+ dlb2_hw_cq_bitmask_init(qm_port, qm_port->cq_depth);
+
qm_port->int_armed = false;
/* Save off for later use in info and lookup APIs. */
@@ -1408,6 +1461,18 @@ dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
dequeue_depth,
qm_port->credits);
}
+
+ qm_port->use_scalar = false;
+
+#if (!defined RTE_ARCH_X86_64)
+ qm_port->use_scalar = true;
+#else
+ if ((qm_port->cq_depth > 64) ||
+ (!rte_is_power_of_2(qm_port->cq_depth)) ||
+ (dlb2->vector_opts_disabled == true))
+ qm_port->use_scalar = true;
+#endif
+
rte_spinlock_unlock(&handle->resource_lock);
return 0;
@@ -1553,6 +1618,7 @@ dlb2_hw_create_dir_port(struct dlb2_eventdev *dlb2,
qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
/* starting value of gen bit - it toggles at wrap time */
qm_port->gen_bit = 1;
+ dlb2_hw_cq_bitmask_init(qm_port, qm_port->cq_depth);
qm_port->int_armed = false;
@@ -1593,6 +1659,16 @@ dlb2_hw_create_dir_port(struct dlb2_eventdev *dlb2,
dequeue_depth,
credit_high_watermark);
}
+
+#if (!defined RTE_ARCH_X86_64)
+ qm_port->use_scalar = true;
+#else
+ if ((qm_port->cq_depth > 64) ||
+ (!rte_is_power_of_2(qm_port->cq_depth)) ||
+ (dlb2->vector_opts_disabled == true))
+ qm_port->use_scalar = true;
+#endif
+
rte_spinlock_unlock(&handle->resource_lock);
return 0;
@@ -2987,10 +3063,11 @@ dlb2_event_release(struct dlb2_eventdev *dlb2,
int j = 0;
/* Zero-out QEs */
- qm_port->qe4[0].cmd_byte = 0;
- qm_port->qe4[1].cmd_byte = 0;
- qm_port->qe4[2].cmd_byte = 0;
- qm_port->qe4[3].cmd_byte = 0;
+ _mm_storeu_si128((void *)&qm_port->qe4[0], _mm_setzero_si128());
+ _mm_storeu_si128((void *)&qm_port->qe4[1], _mm_setzero_si128());
+ _mm_storeu_si128((void *)&qm_port->qe4[2], _mm_setzero_si128());
+ _mm_storeu_si128((void *)&qm_port->qe4[3], _mm_setzero_si128());
+
for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
int16_t thresh = qm_port->token_pop_thresh;
@@ -3020,7 +3097,7 @@ dlb2_event_release(struct dlb2_eventdev *dlb2,
sw_credit_update:
/* each release returns one credit */
- if (!ev_port->outstanding_releases) {
+ if (unlikely(!ev_port->outstanding_releases)) {
DLB2_LOG_ERR("%s: Outstanding releases underflowed.\n",
__func__);
return;
@@ -3137,7 +3214,7 @@ dlb2_dequeue_wait(struct dlb2_eventdev *dlb2,
return 0;
}
-static inline int
+static __rte_noinline int
dlb2_process_dequeue_qes(struct dlb2_eventdev_port *ev_port,
struct dlb2_port *qm_port,
struct rte_event *events,
@@ -3406,8 +3483,7 @@ dlb2_recv_qe_sparse(struct dlb2_port *qm_port, struct dlb2_dequeue_qe *qe)
cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
- idx = qm_port->cq_idx;
-
+ idx = qm_port->cq_idx_unmasked & qm_port->cq_depth_mask;
/* Load the next 4 QEs */
addr[0] = (uintptr_t)&cq_addr[idx];
addr[1] = (uintptr_t)&cq_addr[(idx + 4) & qm_port->cq_depth_mask];
@@ -3452,6 +3528,272 @@ dlb2_recv_qe_sparse(struct dlb2_port *qm_port, struct dlb2_dequeue_qe *qe)
return __builtin_popcount(gen_bits);
}
+static inline void
+_process_deq_qes_vec_impl(struct dlb2_port *qm_port,
+ struct rte_event *events,
+ __m128i v_qe_3,
+ __m128i v_qe_2,
+ __m128i v_qe_1,
+ __m128i v_qe_0,
+ __m128i v_qe_meta,
+ __m128i v_qe_status,
+ uint32_t valid_events)
+{
+ /* Look up the event QIDs, using the hardware QIDs to index the
+ * port's QID mapping.
+ *
+ * Each v_qe_[0-4] is just a 16-byte load of the whole QE. It is
+ * passed along in registers as the QE data is required later.
+ *
+ * v_qe_meta is an u32 unpack of all 4x QEs. Aka, it contains one
+ * 32-bit slice of each QE, so makes up a full SSE register. This
+ * allows parallel processing of 4x QEs in a single register.
+ */
+
+ __m128i v_qid_done = {0};
+ int hw_qid0 = _mm_extract_epi8(v_qe_meta, 2);
+ int hw_qid1 = _mm_extract_epi8(v_qe_meta, 6);
+ int hw_qid2 = _mm_extract_epi8(v_qe_meta, 10);
+ int hw_qid3 = _mm_extract_epi8(v_qe_meta, 14);
+
+ int ev_qid0 = qm_port->qid_mappings[hw_qid0];
+ int ev_qid1 = qm_port->qid_mappings[hw_qid1];
+ int ev_qid2 = qm_port->qid_mappings[hw_qid2];
+ int ev_qid3 = qm_port->qid_mappings[hw_qid3];
+
+ v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid0, 2);
+ v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid1, 6);
+ v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid2, 10);
+ v_qid_done = _mm_insert_epi8(v_qid_done, ev_qid3, 14);
+
+ /* Schedule field remapping using byte shuffle
+ * - Full byte containing sched field handled here (op, rsvd are zero)
+ * - Note sanitizing the register requires two masking ANDs:
+ * 1) to strip prio/msg_type from byte for correct shuffle lookup
+ * 2) to strip any non-sched-field lanes from any results to OR later
+ * - Final byte result is >> 10 to another byte-lane inside the u32.
+ * This makes the final combination OR easier to make the rte_event.
+ */
+ __m128i v_sched_done;
+ __m128i v_sched_bits;
+ {
+ static const uint8_t sched_type_map[16] = {
+ [DLB2_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
+ [DLB2_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
+ [DLB2_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
+ [DLB2_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
+ };
+ static const uint8_t sched_and_mask[16] = {
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x00, 0x03,
+ 0x00, 0x00, 0x00, 0x03,
+ };
+ const __m128i v_sched_map = _mm_loadu_si128(
+ (const __m128i *)sched_type_map);
+ __m128i v_sched_mask = _mm_loadu_si128(
+ (const __m128i *)&sched_and_mask);
+ v_sched_bits = _mm_and_si128(v_qe_meta, v_sched_mask);
+ __m128i v_sched_remapped = _mm_shuffle_epi8(v_sched_map,
+ v_sched_bits);
+ __m128i v_preshift = _mm_and_si128(v_sched_remapped,
+ v_sched_mask);
+ v_sched_done = _mm_srli_epi32(v_preshift, 10);
+ }
+
+ /* Priority handling
+ * - QE provides 3 bits of priority
+ * - Shift << 3 to move to MSBs for byte-prio in rte_event
+ * - Mask bits to avoid pollution, leaving only 3 prio MSBs in reg
+ */
+ __m128i v_prio_done;
+ {
+ static const uint8_t prio_mask[16] = {
+ 0x00, 0x00, 0x00, 0x07 << 5,
+ 0x00, 0x00, 0x00, 0x07 << 5,
+ 0x00, 0x00, 0x00, 0x07 << 5,
+ 0x00, 0x00, 0x00, 0x07 << 5,
+ };
+ __m128i v_prio_mask = _mm_loadu_si128(
+ (const __m128i *)prio_mask);
+ __m128i v_prio_shifted = _mm_slli_epi32(v_qe_meta, 3);
+ v_prio_done = _mm_and_si128(v_prio_shifted, v_prio_mask);
+ }
+
+ /* Event Sub/Type handling:
+ * we want to keep the lower 12 bits of each QE. Shift up by 20 bits
+ * to get the sub/ev type data into rte_event location, clearing the
+ * lower 20 bits in the process.
+ */
+ __m128i v_types_done;
+ {
+ static const uint8_t event_mask[16] = {
+ 0x0f, 0x00, 0x00, 0x00,
+ 0x0f, 0x00, 0x00, 0x00,
+ 0x0f, 0x00, 0x00, 0x00,
+ 0x0f, 0x00, 0x00, 0x00,
+ };
+ static const uint8_t sub_event_mask[16] = {
+ 0xff, 0x00, 0x00, 0x00,
+ 0xff, 0x00, 0x00, 0x00,
+ 0xff, 0x00, 0x00, 0x00,
+ 0xff, 0x00, 0x00, 0x00,
+ };
+ static const uint8_t flow_mask[16] = {
+ 0xff, 0xff, 0x00, 0x00,
+ 0xff, 0xff, 0x00, 0x00,
+ 0xff, 0xff, 0x00, 0x00,
+ 0xff, 0xff, 0x00, 0x00,
+ };
+ __m128i v_event_mask = _mm_loadu_si128(
+ (const __m128i *)event_mask);
+ __m128i v_sub_event_mask = _mm_loadu_si128(
+ (const __m128i *)sub_event_mask);
+ __m128i v_flow_mask = _mm_loadu_si128(
+ (const __m128i *)flow_mask);
+ __m128i v_sub = _mm_srli_epi32(v_qe_meta, 8);
+ v_sub = _mm_and_si128(v_sub, v_sub_event_mask);
+ __m128i v_type = _mm_and_si128(v_qe_meta, v_event_mask);
+ v_type = _mm_slli_epi32(v_type, 8);
+ v_types_done = _mm_or_si128(v_type, v_sub);
+ v_types_done = _mm_slli_epi32(v_types_done, 20);
+ __m128i v_flow = _mm_and_si128(v_qe_status, v_flow_mask);
+ v_types_done = _mm_or_si128(v_types_done, v_flow);
+ }
+
+ /* Combine QID, Sched and Prio fields, then Shift >> 8 bits to align
+ * with the rte_event, allowing unpacks to move/blend with payload.
+ */
+ __m128i v_q_s_p_done;
+ {
+ __m128i v_qid_sched = _mm_or_si128(v_qid_done, v_sched_done);
+ __m128i v_q_s_prio = _mm_or_si128(v_qid_sched, v_prio_done);
+ v_q_s_p_done = _mm_srli_epi32(v_q_s_prio, 8);
+ }
+
+ __m128i v_unpk_ev_23, v_unpk_ev_01, v_ev_2, v_ev_3, v_ev_0, v_ev_1;
+
+ /* Unpack evs into u64 metadata, then indiv events */
+ v_unpk_ev_23 = _mm_unpackhi_epi32(v_types_done, v_q_s_p_done);
+ v_unpk_ev_01 = _mm_unpacklo_epi32(v_types_done, v_q_s_p_done);
+
+ switch (valid_events) {
+ case 4:
+ v_ev_3 = _mm_blend_epi16(v_unpk_ev_23, v_qe_3, 0x0F);
+ v_ev_3 = _mm_alignr_epi8(v_ev_3, v_ev_3, 8);
+ _mm_storeu_si128((__m128i *)&events[3], v_ev_3);
+ /* fallthrough */
+ case 3:
+ v_ev_2 = _mm_unpacklo_epi64(v_unpk_ev_23, v_qe_2);
+ _mm_storeu_si128((__m128i *)&events[2], v_ev_2);
+ /* fallthrough */
+ case 2:
+ v_ev_1 = _mm_blend_epi16(v_unpk_ev_01, v_qe_1, 0x0F);
+ v_ev_1 = _mm_alignr_epi8(v_ev_1, v_ev_1, 8);
+ _mm_storeu_si128((__m128i *)&events[1], v_ev_1);
+ /* fallthrough */
+ case 1:
+ v_ev_0 = _mm_unpacklo_epi64(v_unpk_ev_01, v_qe_0);
+ _mm_storeu_si128((__m128i *)&events[0], v_ev_0);
+ }
+}
+
+static __rte_always_inline int
+dlb2_recv_qe_sparse_vec(struct dlb2_port *qm_port, void *events,
+ uint32_t max_events)
+{
+ /* Using unmasked idx for perf, and masking manually */
+ uint16_t idx = qm_port->cq_idx_unmasked;
+ volatile struct dlb2_dequeue_qe *cq_addr;
+
+ cq_addr = dlb2_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
+
+ uintptr_t qe_ptr_3 = (uintptr_t)&cq_addr[(idx + 12) &
+ qm_port->cq_depth_mask];
+ uintptr_t qe_ptr_2 = (uintptr_t)&cq_addr[(idx + 8) &
+ qm_port->cq_depth_mask];
+ uintptr_t qe_ptr_1 = (uintptr_t)&cq_addr[(idx + 4) &
+ qm_port->cq_depth_mask];
+ uintptr_t qe_ptr_0 = (uintptr_t)&cq_addr[(idx + 0) &
+ qm_port->cq_depth_mask];
+
+ /* Load QEs from CQ: use compiler barriers to avoid load reordering */
+ __m128i v_qe_3 = _mm_loadu_si128((const __m128i *)qe_ptr_3);
+ rte_compiler_barrier();
+ __m128i v_qe_2 = _mm_loadu_si128((const __m128i *)qe_ptr_2);
+ rte_compiler_barrier();
+ __m128i v_qe_1 = _mm_loadu_si128((const __m128i *)qe_ptr_1);
+ rte_compiler_barrier();
+ __m128i v_qe_0 = _mm_loadu_si128((const __m128i *)qe_ptr_0);
+
+ /* Generate the pkt_shuffle mask;
+ * - Avoids load in otherwise load-heavy section of code
+ * - Moves bytes 3,7,11,15 (gen bit bytes) to LSB bytes in XMM
+ */
+ const uint32_t stat_shuf_bytes = (15 << 24) | (11 << 16) | (7 << 8) | 3;
+ __m128i v_zeros = _mm_setzero_si128();
+ __m128i v_ffff = _mm_cmpeq_epi8(v_zeros, v_zeros);
+ __m128i v_stat_shuf_mask = _mm_insert_epi32(v_ffff, stat_shuf_bytes, 0);
+
+ /* Extract u32 components required from the QE
+ * - QE[64 to 95 ] for metadata (qid, sched, prio, event type, ...)
+ * - QE[96 to 127] for status (cq gen bit, error)
+ *
+ * Note that stage 1 of the unpacking is re-used for both u32 extracts
+ */
+ __m128i v_qe_02 = _mm_unpackhi_epi32(v_qe_0, v_qe_2);
+ __m128i v_qe_13 = _mm_unpackhi_epi32(v_qe_1, v_qe_3);
+ __m128i v_qe_status = _mm_unpackhi_epi32(v_qe_02, v_qe_13);
+ __m128i v_qe_meta = _mm_unpacklo_epi32(v_qe_02, v_qe_13);
+
+ /* Status byte (gen_bit, error) handling:
+ * - Shuffle to lanes 0,1,2,3, clear all others
+ * - Shift right by 7 for gen bit to MSB, movemask to scalar
+ * - Shift right by 2 for error bit to MSB, movemask to scalar
+ */
+ __m128i v_qe_shuffled = _mm_shuffle_epi8(v_qe_status, v_stat_shuf_mask);
+ __m128i v_qes_shift_gen_bit = _mm_slli_epi32(v_qe_shuffled, 7);
+ int32_t qe_gen_bits = _mm_movemask_epi8(v_qes_shift_gen_bit) & 0xf;
+
+ /* Expected vs Reality of QE Gen bits
+ * - cq_rolling_mask provides expected bits
+ * - QE loads, unpacks/shuffle and movemask provides reality
+ * - XOR of the two gives bitmask of new packets
+ * - POPCNT to get the number of new events
+ */
+ uint64_t rolling = qm_port->cq_rolling_mask & 0xF;
+ uint64_t qe_xor_bits = (qe_gen_bits ^ rolling);
+ uint32_t count_new = __builtin_popcount(qe_xor_bits);
+ count_new = RTE_MIN(count_new, max_events);
+ if (!count_new)
+ return 0;
+
+ /* emulate a 128 bit rotate using 2x 64-bit numbers and bit-shifts */
+
+ uint64_t m_rshift = qm_port->cq_rolling_mask >> count_new;
+ uint64_t m_lshift = qm_port->cq_rolling_mask << (64 - count_new);
+ uint64_t m2_rshift = qm_port->cq_rolling_mask_2 >> count_new;
+ uint64_t m2_lshift = qm_port->cq_rolling_mask_2 << (64 - count_new);
+
+ /* shifted out of m2 into MSB of m */
+ qm_port->cq_rolling_mask = (m_rshift | m2_lshift);
+
+ /* shifted out of m "looped back" into MSB of m2 */
+ qm_port->cq_rolling_mask_2 = (m2_rshift | m_lshift);
+
+ /* Prefetch the next QEs - should run as IPC instead of cycles */
+ rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
+ rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
+ rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
+ rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
+
+ /* Convert QEs from XMM regs to events and store events directly */
+ _process_deq_qes_vec_impl(qm_port, events, v_qe_3, v_qe_2, v_qe_1,
+ v_qe_0, v_qe_meta, v_qe_status, count_new);
+
+ return count_new;
+}
+
static inline void
dlb2_inc_cq_idx(struct dlb2_port *qm_port, int cnt)
{
@@ -3469,25 +3811,15 @@ dlb2_hw_dequeue_sparse(struct dlb2_eventdev *dlb2,
uint16_t max_num,
uint64_t dequeue_timeout_ticks)
{
- uint64_t timeout;
uint64_t start_ticks = 0ULL;
struct dlb2_port *qm_port;
int num = 0;
+ bool use_scalar;
+ uint64_t timeout;
qm_port = &ev_port->qm_port;
+ use_scalar = qm_port->use_scalar;
- /* We have a special implementation for waiting. Wait can be:
- * 1) no waiting at all
- * 2) busy poll only
- * 3) wait for interrupt. If wakeup and poll time
- * has expired, then return to caller
- * 4) umonitor/umwait repeatedly up to poll time
- */
-
- /* If configured for per dequeue wait, then use wait value provided
- * to this API. Otherwise we must use the global
- * value from eventdev config time.
- */
if (!dlb2->global_dequeue_wait)
timeout = dequeue_timeout_ticks;
else
@@ -3495,35 +3827,41 @@ dlb2_hw_dequeue_sparse(struct dlb2_eventdev *dlb2,
start_ticks = rte_get_timer_cycles();
+ use_scalar = use_scalar || (max_num & 0x3);
+
while (num < max_num) {
struct dlb2_dequeue_qe qes[DLB2_NUM_QES_PER_CACHE_LINE];
int num_avail;
-
- /* Copy up to 4 QEs from the current cache line into qes */
- num_avail = dlb2_recv_qe_sparse(qm_port, qes);
-
- /* But don't process more than the user requested */
- num_avail = RTE_MIN(num_avail, max_num - num);
-
- dlb2_inc_cq_idx(qm_port, num_avail << 2);
-
- if (num_avail == DLB2_NUM_QES_PER_CACHE_LINE)
- num += dlb2_process_dequeue_four_qes(ev_port,
- qm_port,
- &events[num],
- &qes[0]);
- else if (num_avail)
- num += dlb2_process_dequeue_qes(ev_port,
- qm_port,
- &events[num],
- &qes[0],
- num_avail);
- else if ((timeout == 0) || (num > 0))
- /* Not waiting in any form, or 1+ events received? */
- break;
- else if (dlb2_dequeue_wait(dlb2, ev_port, qm_port,
- timeout, start_ticks))
- break;
+ if (use_scalar) {
+ num_avail = dlb2_recv_qe_sparse(qm_port, qes);
+ num_avail = RTE_MIN(num_avail, max_num - num);
+ dlb2_inc_cq_idx(qm_port, num_avail << 2);
+ if (num_avail == DLB2_NUM_QES_PER_CACHE_LINE)
+ num += dlb2_process_dequeue_four_qes(ev_port,
+ qm_port,
+ &events[num],
+ &qes[0]);
+ else if (num_avail)
+ num += dlb2_process_dequeue_qes(ev_port,
+ qm_port,
+ &events[num],
+ &qes[0],
+ num_avail);
+ } else { /* !use_scalar */
+ num_avail = dlb2_recv_qe_sparse_vec(qm_port,
+ &events[num],
+ max_num - num);
+ num += num_avail;
+ dlb2_inc_cq_idx(qm_port, num_avail << 2);
+ DLB2_INC_STAT(ev_port->stats.traffic.rx_ok, num_avail);
+ }
+ if (!num_avail) {
+ if (num > 0)
+ break;
+ else if (dlb2_dequeue_wait(dlb2, ev_port, qm_port,
+ timeout, start_ticks))
+ break;
+ }
}
qm_port->owed_tokens += num;
@@ -4083,6 +4421,7 @@ dlb2_primary_eventdev_probe(struct rte_eventdev *dev,
dlb2->poll_interval = dlb2_args->poll_interval;
dlb2->sw_credit_quanta = dlb2_args->sw_credit_quanta;
dlb2->default_depth_thresh = dlb2_args->default_depth_thresh;
+ dlb2->vector_opts_disabled = dlb2_args->vector_opts_disabled;
err = dlb2_iface_open(&dlb2->qm_instance, name);
if (err < 0) {
@@ -4186,6 +4525,7 @@ dlb2_parse_params(const char *params,
DLB2_POLL_INTERVAL_ARG,
DLB2_SW_CREDIT_QUANTA_ARG,
DLB2_DEPTH_THRESH_ARG,
+ DLB2_VECTOR_OPTS_DISAB_ARG,
NULL };
if (params != NULL && params[0] != '\0') {
@@ -4299,6 +4639,17 @@ dlb2_parse_params(const char *params,
return ret;
}
+ ret = rte_kvargs_process(kvlist,
+ DLB2_VECTOR_OPTS_DISAB_ARG,
+ set_vector_opts_disab,
+ &dlb2_args->vector_opts_disabled);
+ if (ret != 0) {
+ DLB2_LOG_ERR("%s: Error parsing vector opts disabled",
+ name);
+ rte_kvargs_free(kvlist);
+ return ret;
+ }
+
rte_kvargs_free(kvlist);
}
}
diff --git a/drivers/event/dlb/dlb2_priv.h b/drivers/event/dlb/dlb2_priv.h
index 3c540a264..8b38d04fb 100644
--- a/drivers/event/dlb/dlb2_priv.h
+++ b/drivers/event/dlb/dlb2_priv.h
@@ -39,6 +39,7 @@
#define DLB2_POLL_INTERVAL_ARG "poll_interval"
#define DLB2_SW_CREDIT_QUANTA_ARG "sw_credit_quanta"
#define DLB2_DEPTH_THRESH_ARG "default_depth_thresh"
+#define DLB2_VECTOR_OPTS_DISAB_ARG "vector_opts_disable"
/* Begin HW related defines and structs */
@@ -206,9 +207,9 @@ enum dlb2_enqueue_type {
/* hw-specific format - do not change */
struct dlb2_event_type {
- uint8_t major:4;
- uint8_t unused:4;
- uint8_t sub;
+ uint16_t major:4;
+ uint16_t unused:4;
+ uint16_t sub:8;
};
union dlb2_opaque_data {
@@ -352,6 +353,12 @@ struct dlb2_port {
uint16_t cq_idx_unmasked;
uint16_t cq_depth_mask;
uint16_t gen_bit_shift;
+ uint64_t cq_rolling_mask; /*
+ * rotate to always have right expected
+ * gen bits
+ */
+ uint64_t cq_rolling_mask_2;
+ void *cq_addr_cached; /* avoid multiple refs */
enum dlb2_port_state state;
enum dlb2_configuration_state config_state;
int num_mapped_qids;
@@ -361,6 +368,7 @@ struct dlb2_port {
struct dlb2_cq_pop_qe *consume_qe;
struct dlb2_eventdev *dlb2; /* back ptr */
struct dlb2_eventdev_port *ev_port; /* back ptr */
+ bool use_scalar; /* force usage of scalar code */
};
/* Per-process per-port mmio and memory pointers */
@@ -514,9 +522,9 @@ struct dlb2_queue {
uint32_t num_qid_inflights; /* User config */
uint32_t num_atm_inflights; /* User config */
enum dlb2_configuration_state config_state;
- int sched_type; /* LB queue only */
- uint32_t id;
- bool is_directed;
+ int sched_type; /* LB queue only */
+ uint8_t id;
+ bool is_directed;
};
struct dlb2_eventdev_queue {
@@ -559,6 +567,7 @@ struct dlb2_eventdev {
uint32_t new_event_limit;
int max_num_events_override;
int num_dir_credits_override;
+ bool vector_opts_disabled;
volatile enum dlb2_run_state run_state;
uint16_t num_dir_queues; /* total num of evdev dir queues requested */
union {
@@ -618,6 +627,7 @@ struct dlb2_devargs {
int poll_interval;
int sw_credit_quanta;
int default_depth_thresh;
+ bool vector_opts_disabled;
};
/* End Eventdev related defines and structs */
--
2.23.0
next prev parent reply other threads:[~2021-04-13 20:31 UTC|newest]
Thread overview: 9+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-03-17 17:02 [dpdk-dev] [PATCH] event/dlb2: Optimize Dequeue Operations Timothy McDaniel
2021-03-21 11:10 ` Jerin Jacob
2021-04-13 20:30 ` [dpdk-dev] [PATCH v2 0/1] Optimize DLB Dequeue Timothy McDaniel
2021-04-13 20:30 ` Timothy McDaniel [this message]
2021-04-29 7:20 ` Jerin Jacob
2021-04-29 13:45 ` McDaniel, Timothy
2021-05-01 19:07 ` [dpdk-dev] [PATCH v3 0/1] Optimize DLB2 Dequeue Operations McDaniel, Timothy
2021-05-01 19:07 ` [dpdk-dev] [PATCH v3 1/1] event/dlb2: optimize " McDaniel, Timothy
2021-05-04 8:29 ` Jerin Jacob
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1618345839-3515-2-git-send-email-timothy.mcdaniel@intel.com \
--to=timothy.mcdaniel@intel.com \
--cc=dev@dpdk.org \
--cc=erik.g.carrillo@intel.com \
--cc=gage.eads@intel.com \
--cc=harry.van.haaren@intel.com \
--cc=jerinj@marvell.com \
--cc=thomas@monjalon.net \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.