All of lore.kernel.org
 help / color / mirror / Atom feed
From: Maxime Peim <maxime.peim@gmail.com>
To: dev@dpdk.org
Cc: stephen@networkplumber.org
Subject: [PATCH v2] test/flow: add support for async API
Date: Mon,  2 Mar 2026 00:29:30 +0100	[thread overview]
Message-ID: <20260301232931.444294-1-maxime.peim@gmail.com> (raw)
In-Reply-To: <20260225142310.51f75f4d@phoenix.local>

Add an async flow API mode to the test-flow-perf application so that
flow rule insertion can be measured with the template/async flow API.
The async API allows batching flow rule creation operations and
processing their completions in bulk, reducing per-rule overhead.

New command line options:
  --async: enable async flow API mode
  --async-queue-size=N: size of async queues (default: 1024; capped to the
    device maximum and rounded down to a power of two)
  --async-push-batch=N: flows to batch before push (default: 256)

Signed-off-by: Maxime Peim <maxime.peim@gmail.com>
---
v2:
  - Replace per-flow stack allocation with pre-allocated slot pool;
    flat buffers are initialized once at init time and the hot path
    only patches per-flow item/action values into a pre-set slot
  - Fix alloca misuse: use heap allocation for queue_attr_list, round
    queue_size to power of 2 for bitmask wrapping, add bounds checks
  - Fix race on file-scope flow variable, premature latency
    measurement, and integer division in rate calculation
  - Drop unrelated lgopts reformatting
  - Use malloc instead of rte_zmalloc for non-dataplane allocations
  - Various robustness and style fixes

 app/test-flow-perf/actions_gen.c | 281 +++++++++++-
 app/test-flow-perf/actions_gen.h |  31 ++
 app/test-flow-perf/async_flow.c  | 761 +++++++++++++++++++++++++++++++
 app/test-flow-perf/async_flow.h  |  54 +++
 app/test-flow-perf/items_gen.c   |  58 +++
 app/test-flow-perf/items_gen.h   |   6 +
 app/test-flow-perf/main.c        | 302 +++++++++++-
 app/test-flow-perf/meson.build   |   1 +
 8 files changed, 1454 insertions(+), 40 deletions(-)
 create mode 100644 app/test-flow-perf/async_flow.c
 create mode 100644 app/test-flow-perf/async_flow.h

diff --git a/app/test-flow-perf/actions_gen.c b/app/test-flow-perf/actions_gen.c
index 9d102e3af4..2b8edd50c8 100644
--- a/app/test-flow-perf/actions_gen.c
+++ b/app/test-flow-perf/actions_gen.c
@@ -36,27 +36,7 @@ struct additional_para {
 	bool unique_data;
 };
 
-/* Storage for struct rte_flow_action_raw_encap including external data. */
-struct action_raw_encap_data {
-	struct rte_flow_action_raw_encap conf;
-	uint8_t data[128];
-	uint8_t preserve[128];
-	uint16_t idx;
-};
-
-/* Storage for struct rte_flow_action_raw_decap including external data. */
-struct action_raw_decap_data {
-	struct rte_flow_action_raw_decap conf;
-	uint8_t data[128];
-	uint16_t idx;
-};
-
-/* Storage for struct rte_flow_action_rss including external data. */
-struct action_rss_data {
-	struct rte_flow_action_rss conf;
-	uint8_t key[40];
-	uint16_t queue[128];
-};
+/* Compound action data structs are now defined in actions_gen.h */
 
 static void
 add_mark(struct rte_flow_action *actions,
@@ -1165,3 +1145,262 @@ fill_actions(struct rte_flow_action *actions, uint64_t *flow_actions,
 	free(queues);
 	free(hairpin_queues);
 }
+
+static size_t /* bytes of per-slot conf storage needed for @type */
+action_conf_size(enum rte_flow_action_type type)
+{
+	switch (type) {
+	case RTE_FLOW_ACTION_TYPE_MARK:
+		return sizeof(struct rte_flow_action_mark);
+	case RTE_FLOW_ACTION_TYPE_QUEUE:
+		return sizeof(struct rte_flow_action_queue);
+	case RTE_FLOW_ACTION_TYPE_JUMP:
+		return sizeof(struct rte_flow_action_jump);
+	case RTE_FLOW_ACTION_TYPE_RSS:
+		return sizeof(struct action_rss_data);
+	case RTE_FLOW_ACTION_TYPE_SET_META:
+		return sizeof(struct rte_flow_action_set_meta);
+	case RTE_FLOW_ACTION_TYPE_SET_TAG:
+		return sizeof(struct rte_flow_action_set_tag);
+	case RTE_FLOW_ACTION_TYPE_PORT_ID:
+		return sizeof(struct rte_flow_action_port_id);
+	case RTE_FLOW_ACTION_TYPE_COUNT:
+		return sizeof(struct rte_flow_action_count);
+	case RTE_FLOW_ACTION_TYPE_SET_MAC_SRC:
+	case RTE_FLOW_ACTION_TYPE_SET_MAC_DST:
+		return sizeof(struct rte_flow_action_set_mac);
+	case RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC:
+	case RTE_FLOW_ACTION_TYPE_SET_IPV4_DST:
+		return sizeof(struct rte_flow_action_set_ipv4);
+	case RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC:
+	case RTE_FLOW_ACTION_TYPE_SET_IPV6_DST:
+		return sizeof(struct rte_flow_action_set_ipv6);
+	case RTE_FLOW_ACTION_TYPE_SET_TP_SRC:
+	case RTE_FLOW_ACTION_TYPE_SET_TP_DST:
+		return sizeof(struct rte_flow_action_set_tp);
+	case RTE_FLOW_ACTION_TYPE_INC_TCP_ACK:
+	case RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK:
+	case RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ:
+	case RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ:
+		return sizeof(rte_be32_t); /* conf is a bare rte_be32_t */
+	case RTE_FLOW_ACTION_TYPE_SET_TTL:
+		return sizeof(struct rte_flow_action_set_ttl);
+	case RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP:
+	case RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP:
+		return sizeof(struct rte_flow_action_set_dscp);
+	case RTE_FLOW_ACTION_TYPE_METER:
+		return sizeof(struct rte_flow_action_meter);
+	case RTE_FLOW_ACTION_TYPE_RAW_ENCAP:
+		return sizeof(struct action_raw_encap_data);
+	case RTE_FLOW_ACTION_TYPE_RAW_DECAP:
+		return sizeof(struct action_raw_decap_data);
+	case RTE_FLOW_ACTION_TYPE_VXLAN_ENCAP:
+		return sizeof(struct rte_flow_action_vxlan_encap) + /* + items[5] + hdrs */
+		       5 * sizeof(struct rte_flow_item) + sizeof(struct rte_flow_item_eth) +
+		       sizeof(struct rte_flow_item_ipv4) + sizeof(struct rte_flow_item_udp) +
+		       sizeof(struct rte_flow_item_vxlan);
+	case RTE_FLOW_ACTION_TYPE_MODIFY_FIELD:
+		return sizeof(struct rte_flow_action_modify_field);
+	/* Zero-conf types: no per-slot conf storage is reserved */
+	case RTE_FLOW_ACTION_TYPE_DROP:
+	case RTE_FLOW_ACTION_TYPE_FLAG:
+	case RTE_FLOW_ACTION_TYPE_DEC_TTL:
+	case RTE_FLOW_ACTION_TYPE_VXLAN_DECAP:
+		return 0;
+	default: /* unhandled types also get no conf storage (conf stays NULL) */
+		return 0;
+	}
+}
+
+void
+fill_actions_template(struct rte_flow_action *actions, struct rte_flow_action *masks,
+		      uint64_t *flow_actions, struct rte_flow_port_attr *port_attr,
+		      bool *need_wire_orig_table, size_t *conf_sizes, uint32_t *n_actions_out)
+{
+	uint8_t actions_counter = 0; /* entries emitted so far, END excluded */
+	uint8_t i, j;
+
+	*need_wire_orig_table = false;
+	memset(port_attr, 0, sizeof(*port_attr));
+
+	/* Static configurations for actions that need them in templates */
+	static struct rte_flow_action_mark mark_conf = {
+		.id = 1,
+	};
+	static struct rte_flow_action_queue queue_conf = {
+		.index = 0,
+	};
+	static struct rte_flow_action_port_id port_id_conf = {
+		.id = 0,
+	};
+	static struct rte_flow_action_jump jump_conf = {
+		.group = 1,
+	};
+	static struct rte_flow_action_modify_field set_meta_conf = {
+		.operation = RTE_FLOW_MODIFY_SET,
+		.dst = {.field = RTE_FLOW_FIELD_META},
+		.src =
+			{
+				.field = RTE_FLOW_FIELD_VALUE,
+				.value = {0, 0, 0, META_DATA},
+			},
+		.width = 32,
+	};
+
+	/* Static masks per action type; NOTE(review): MODIFY_FIELD uses a set_meta struct */
+	static struct rte_flow_action_mark mark_mask = {
+		.id = UINT32_MAX,
+	};
+	static struct rte_flow_action_queue queue_mask = {
+		.index = UINT16_MAX,
+	};
+	static struct rte_flow_action_jump jump_mask = {
+		.group = UINT32_MAX,
+	};
+	static struct rte_flow_action_rss rss_mask = {
+		.level = UINT32_MAX,
+		.types = UINT64_MAX,
+	};
+	static struct rte_flow_action_set_meta set_meta_mask = {
+		.data = UINT32_MAX,
+		.mask = UINT32_MAX,
+	};
+	static struct rte_flow_action_set_tag set_tag_mask = {
+		.data = UINT32_MAX,
+		.mask = UINT32_MAX,
+		.index = UINT8_MAX,
+	};
+	static struct rte_flow_action_port_id port_id_mask = {
+		.id = UINT32_MAX,
+	};
+	static struct rte_flow_action_count count_mask;
+	static struct rte_flow_action_set_mac set_mac_mask = {
+		.mac_addr = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff},
+	};
+	static struct rte_flow_action_set_ipv4 set_ipv4_mask = {
+		.ipv4_addr = UINT32_MAX,
+	};
+	static struct rte_flow_action_set_ipv6 set_ipv6_mask = {
+		.ipv6_addr.a = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+				0xff, 0xff, 0xff, 0xff, 0xff}};
+	static struct rte_flow_action_set_tp set_tp_mask = {
+		.port = UINT16_MAX,
+	};
+	static rte_be32_t tcp_seq_ack_mask = UINT32_MAX;
+	static struct rte_flow_action_set_ttl set_ttl_mask = {
+		.ttl_value = UINT8_MAX,
+	};
+	static struct rte_flow_action_set_dscp set_dscp_mask = {
+		.dscp = UINT8_MAX,
+	};
+	static struct rte_flow_action_meter meter_mask = {
+		.mtr_id = UINT32_MAX,
+	};
+
+	static const struct {
+		uint64_t flow_mask;
+		enum rte_flow_action_type type;
+		const void *action_conf;
+		const void *action_mask;
+		const bool need_wire_orig_table;
+	} template_actions[] = {
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_MARK), RTE_FLOW_ACTION_TYPE_MARK, &mark_conf,
+		 &mark_mask, true},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_COUNT), RTE_FLOW_ACTION_TYPE_COUNT, NULL,
+		 &count_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_MODIFY_FIELD),
+		 RTE_FLOW_ACTION_TYPE_MODIFY_FIELD, &set_meta_conf, &set_meta_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TAG), RTE_FLOW_ACTION_TYPE_SET_TAG, NULL,
+		 &set_tag_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_FLAG), RTE_FLOW_ACTION_TYPE_FLAG, NULL, NULL,
+		 false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_MAC_SRC),
+		 RTE_FLOW_ACTION_TYPE_SET_MAC_SRC, NULL, &set_mac_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_MAC_DST),
+		 RTE_FLOW_ACTION_TYPE_SET_MAC_DST, NULL, &set_mac_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC),
+		 RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC, NULL, &set_ipv4_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_DST),
+		 RTE_FLOW_ACTION_TYPE_SET_IPV4_DST, NULL, &set_ipv4_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC),
+		 RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC, NULL, &set_ipv6_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_DST),
+		 RTE_FLOW_ACTION_TYPE_SET_IPV6_DST, NULL, &set_ipv6_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TP_SRC), RTE_FLOW_ACTION_TYPE_SET_TP_SRC,
+		 NULL, &set_tp_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TP_DST), RTE_FLOW_ACTION_TYPE_SET_TP_DST,
+		 NULL, &set_tp_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_INC_TCP_ACK),
+		 RTE_FLOW_ACTION_TYPE_INC_TCP_ACK, NULL, &tcp_seq_ack_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK),
+		 RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK, NULL, &tcp_seq_ack_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ),
+		 RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ, NULL, &tcp_seq_ack_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ),
+		 RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ, NULL, &tcp_seq_ack_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_TTL), RTE_FLOW_ACTION_TYPE_SET_TTL, NULL,
+		 &set_ttl_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DEC_TTL), RTE_FLOW_ACTION_TYPE_DEC_TTL, NULL,
+		 NULL, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP),
+		 RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP, NULL, &set_dscp_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP),
+		 RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP, NULL, &set_dscp_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_QUEUE), RTE_FLOW_ACTION_TYPE_QUEUE,
+		 &queue_conf, &queue_mask, true},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_RSS), RTE_FLOW_ACTION_TYPE_RSS, NULL,
+		 &rss_mask, true},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_JUMP), RTE_FLOW_ACTION_TYPE_JUMP, &jump_conf,
+		 &jump_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_PORT_ID), RTE_FLOW_ACTION_TYPE_PORT_ID,
+		 &port_id_conf, &port_id_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_DROP), RTE_FLOW_ACTION_TYPE_DROP, NULL, NULL,
+		 false},
+		{HAIRPIN_QUEUE_ACTION, RTE_FLOW_ACTION_TYPE_QUEUE, &queue_conf, &queue_mask, false},
+		{HAIRPIN_RSS_ACTION, RTE_FLOW_ACTION_TYPE_RSS, NULL, &rss_mask, false},
+		{FLOW_ACTION_MASK(RTE_FLOW_ACTION_TYPE_METER), RTE_FLOW_ACTION_TYPE_METER, NULL,
+		 &meter_mask, false},
+	};
+
+	for (j = 0; j < MAX_ACTIONS_NUM; j++) {
+		if (flow_actions[j] == 0)
+			break;
+		for (i = 0; i < RTE_DIM(template_actions); i++) {
+			if ((flow_actions[j] & template_actions[i].flow_mask) == 0)
+				continue;
+
+			switch (template_actions[i].type) { /* count port-level objects */
+			case RTE_FLOW_ACTION_TYPE_COUNT:
+				port_attr->nb_counters++;
+				break;
+			case RTE_FLOW_ACTION_TYPE_AGE: /* AGE/CONNTRACK/QUOTA: not in table */
+				port_attr->nb_aging_objects++;
+				break;
+			case RTE_FLOW_ACTION_TYPE_METER:
+				port_attr->nb_meters++;
+				break;
+			case RTE_FLOW_ACTION_TYPE_CONNTRACK:
+				port_attr->nb_conn_tracks++;
+				break;
+			case RTE_FLOW_ACTION_TYPE_QUOTA:
+				port_attr->nb_quotas++; /* falls through to no-op default */
+			default:;
+			}
+
+			actions[actions_counter].type = template_actions[i].type;
+			actions[actions_counter].conf = template_actions[i].action_conf;
+			masks[actions_counter].type = template_actions[i].type;
+			masks[actions_counter].conf = template_actions[i].action_mask;
+			conf_sizes[actions_counter] = action_conf_size(template_actions[i].type);
+			*need_wire_orig_table |= template_actions[i].need_wire_orig_table;
+			actions_counter++;
+			break; /* only the first matching table entry is used */
+		}
+	}
+
+	actions[actions_counter].type = RTE_FLOW_ACTION_TYPE_END;
+	masks[actions_counter].type = RTE_FLOW_ACTION_TYPE_END;
+
+	/* n_actions_out includes the END entry */
+	*n_actions_out = actions_counter + 1;
+}
diff --git a/app/test-flow-perf/actions_gen.h b/app/test-flow-perf/actions_gen.h
index 9e13b164f9..3ac0ffed59 100644
--- a/app/test-flow-perf/actions_gen.h
+++ b/app/test-flow-perf/actions_gen.h
@@ -17,9 +17,40 @@
 #define RTE_VXLAN_GPE_UDP_PORT 250
 #define RTE_GENEVE_UDP_PORT 6081
 
+/* Compound action data structures (needed by async_flow.c for slot init) */
+
+/* Storage for struct rte_flow_action_raw_encap including external data. */
+struct action_raw_encap_data {
+	struct rte_flow_action_raw_encap conf;
+	uint8_t data[128]; /* conf.data points here */
+	uint8_t preserve[128];
+	uint16_t idx;
+};
+
+/* Storage for struct rte_flow_action_raw_decap including external data. */
+struct action_raw_decap_data {
+	struct rte_flow_action_raw_decap conf;
+	uint8_t data[128]; /* conf.data points here */
+	uint16_t idx;
+};
+
+/* Storage for struct rte_flow_action_rss including external data. */
+struct action_rss_data {
+	struct rte_flow_action_rss conf;
+	uint8_t key[40]; /* conf.key points here */
+	uint16_t queue[128]; /* conf.queue points here */
+};
+
 void fill_actions(struct rte_flow_action *actions, uint64_t *flow_actions,
 	uint32_t counter, uint16_t next_table, uint16_t hairpinq,
 	uint64_t encap_data, uint64_t decap_data, uint8_t core_idx,
 	bool unique_data, uint8_t rx_queues_count, uint16_t dst_port);
 
+/* Fill actions template for async flow API (types only, no values).
+ * Also fills conf_sizes[] and *n_actions_out (END included); neither may be NULL.
+ */
+void fill_actions_template(struct rte_flow_action *actions, struct rte_flow_action *masks,
+			   uint64_t *flow_actions, struct rte_flow_port_attr *port_attr,
+			   bool *need_wire_orig_table, size_t *conf_sizes, uint32_t *n_actions_out);
+
 #endif /* FLOW_PERF_ACTION_GEN */
diff --git a/app/test-flow-perf/async_flow.c b/app/test-flow-perf/async_flow.c
new file mode 100644
index 0000000000..ae5a922856
--- /dev/null
+++ b/app/test-flow-perf/async_flow.c
@@ -0,0 +1,761 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright 2026 Maxime Peim <maxime.peim@gmail.com>
+ *
+ * This file contains the async flow API implementation
+ * for the flow-perf application.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <rte_bitops.h>
+#include <rte_common.h>
+#include <rte_ethdev.h>
+#include <rte_flow.h>
+#include <rte_vxlan.h>
+
+#include "actions_gen.h"
+#include "async_flow.h"
+#include "flow_gen.h"
+#include "items_gen.h"
+
+/* Max iterations when draining pending async completions during cleanup */
+#define DRAIN_MAX_ITERATIONS 100
+
+/* Per-port async flow resources, indexed by port_id (< MAX_PORTS) */
+static struct async_flow_resources port_resources[MAX_PORTS];
+
+/*
+ * Initialize compound action types within a pre-allocated slot.
+ * Called once per slot during pool init to set up internal pointers
+ * for RSS, RAW_ENCAP, RAW_DECAP and VXLAN_ENCAP actions.
+ */
+static void
+init_slot_compound_actions(struct rte_flow_action *actions, uint32_t n_actions,
+			   const size_t *action_conf_sizes)
+{
+	uint32_t i;
+
+	for (i = 0; i < n_actions; i++) {
+		if (action_conf_sizes[i] == 0) /* no per-slot conf storage */
+			continue;
+
+		switch (actions[i].type) {
+		case RTE_FLOW_ACTION_TYPE_RSS: {
+			struct action_rss_data *rss =
+				(struct action_rss_data *)(uintptr_t)actions[i].conf;
+			rss->conf.func = RTE_ETH_HASH_FUNCTION_DEFAULT;
+			rss->conf.level = 0;
+			rss->conf.types = GET_RSS_HF();
+			rss->conf.key_len = sizeof(rss->key);
+			rss->conf.key = rss->key;
+			rss->conf.queue = rss->queue;
+			rss->key[0] = 1;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_RAW_ENCAP: {
+			struct action_raw_encap_data *encap =
+				(struct action_raw_encap_data *)(uintptr_t)actions[i].conf;
+			encap->conf.data = encap->data;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_RAW_DECAP: {
+			struct action_raw_decap_data *decap =
+				(struct action_raw_decap_data *)(uintptr_t)actions[i].conf;
+			decap->conf.data = decap->data;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_VXLAN_ENCAP: {
+			/*
+			 * Layout within the conf area:
+			 *   struct rte_flow_action_vxlan_encap
+			 *   struct rte_flow_item[5]
+			 *   struct rte_flow_item_eth
+			 *   struct rte_flow_item_ipv4
+			 *   struct rte_flow_item_udp
+			 *   struct rte_flow_item_vxlan
+			 */
+			uint8_t *base = (uint8_t *)(uintptr_t)actions[i].conf;
+			struct rte_flow_action_vxlan_encap *ve =
+				(struct rte_flow_action_vxlan_encap *)base;
+			struct rte_flow_item *items =
+				(struct rte_flow_item
+					 *)(base + sizeof(struct rte_flow_action_vxlan_encap));
+			uint8_t *data = (uint8_t *)(items + 5); /* headers follow items */
+
+			struct rte_flow_item_eth *item_eth = (struct rte_flow_item_eth *)data;
+			data += sizeof(struct rte_flow_item_eth);
+			struct rte_flow_item_ipv4 *item_ipv4 = (struct rte_flow_item_ipv4 *)data;
+			data += sizeof(struct rte_flow_item_ipv4);
+			struct rte_flow_item_udp *item_udp = (struct rte_flow_item_udp *)data;
+			data += sizeof(struct rte_flow_item_udp);
+			struct rte_flow_item_vxlan *item_vxlan = (struct rte_flow_item_vxlan *)data;
+
+			memset(item_eth, 0, sizeof(*item_eth));
+			memset(item_ipv4, 0, sizeof(*item_ipv4));
+			memset(item_udp, 0, sizeof(*item_udp));
+			memset(item_vxlan, 0, sizeof(*item_vxlan));
+
+			item_ipv4->hdr.src_addr = RTE_IPV4(127, 0, 0, 1);
+			item_ipv4->hdr.version_ihl = RTE_IPV4_VHL_DEF;
+			item_udp->hdr.dst_port = RTE_BE16(RTE_VXLAN_DEFAULT_PORT);
+			item_vxlan->hdr.vni[2] = 1; /* VNI 1 */
+
+			items[0].type = RTE_FLOW_ITEM_TYPE_ETH;
+			items[0].spec = item_eth;
+			items[0].mask = item_eth; /* spec doubles as mask */
+			items[1].type = RTE_FLOW_ITEM_TYPE_IPV4;
+			items[1].spec = item_ipv4;
+			items[1].mask = item_ipv4;
+			items[2].type = RTE_FLOW_ITEM_TYPE_UDP;
+			items[2].spec = item_udp;
+			items[2].mask = item_udp;
+			items[3].type = RTE_FLOW_ITEM_TYPE_VXLAN;
+			items[3].spec = item_vxlan;
+			items[3].mask = item_vxlan;
+			items[4].type = RTE_FLOW_ITEM_TYPE_END;
+
+			ve->definition = items;
+			break;
+		}
+		default:
+			break;
+		}
+	}
+}
+
+/*
+ * Allocate and pre-initialize all per-slot flat buffers.
+ * Returns 0 on success, -ENOMEM on allocation failure.
+ */
+static int
+init_slot_pool(struct async_flow_resources *res, uint32_t nb_queues, uint32_t queue_size,
+	       const struct rte_flow_item *pattern, uint32_t n_items, const size_t *item_spec_sizes,
+	       const struct rte_flow_action *template_actions, uint32_t n_actions,
+	       const size_t *action_conf_sizes)
+{
+	uint32_t items_array_bytes, actions_array_bytes;
+	uint32_t spec_data_bytes, conf_data_bytes, mask_data_bytes;
+	uint32_t slot_size, num_slots;
+	uint32_t s, i;
+	uint8_t *mptr;
+
+	/* Compute shared mask size */
+	mask_data_bytes = 0;
+	for (i = 0; i < n_items; i++)
+		mask_data_bytes += RTE_ALIGN_CEIL(item_spec_sizes[i], 8);
+
+	/* specs and masks have the same size */
+	spec_data_bytes = mask_data_bytes;
+
+	conf_data_bytes = 0;
+	for (i = 0; i < n_actions; i++)
+		conf_data_bytes += RTE_ALIGN_CEIL(action_conf_sizes[i], 8);
+
+	/* Per-slot array sizes: n_items/n_actions already include the END entry */
+	items_array_bytes = n_items * sizeof(struct rte_flow_item);
+	actions_array_bytes = n_actions * sizeof(struct rte_flow_action);
+
+	slot_size = RTE_ALIGN_CEIL(items_array_bytes + actions_array_bytes + spec_data_bytes +
+					   conf_data_bytes,
+				   RTE_CACHE_LINE_SIZE);
+
+	num_slots = queue_size * nb_queues;
+
+	/* Store layout info */
+	res->slot_size = slot_size;
+	res->slots_per_queue = queue_size;
+	res->nb_queues = nb_queues;
+	res->n_items = n_items;
+	res->n_actions = n_actions;
+
+	/* Allocate shared masks */
+	if (mask_data_bytes > 0) {
+		res->shared_masks = aligned_alloc(
+			RTE_CACHE_LINE_SIZE, RTE_ALIGN_CEIL(mask_data_bytes, RTE_CACHE_LINE_SIZE));
+		if (res->shared_masks == NULL) {
+			fprintf(stderr, "Failed to allocate shared masks (%u bytes)\n",
+				mask_data_bytes);
+			return -ENOMEM;
+		}
+		memset(res->shared_masks, 0, mask_data_bytes);
+
+		/* Copy mask data from template pattern */
+		mptr = res->shared_masks;
+		for (i = 0; i < n_items; i++) {
+			if (item_spec_sizes[i] > 0 && pattern[i].mask != NULL)
+				memcpy(mptr, pattern[i].mask, item_spec_sizes[i]);
+			mptr += RTE_ALIGN_CEIL(item_spec_sizes[i], 8);
+		}
+	}
+
+	/* Allocate per-slot pool */
+	/* slot_size is already cache-line aligned, so total is a multiple */
+	res->slot_pool = aligned_alloc(RTE_CACHE_LINE_SIZE, (size_t)num_slots * slot_size);
+	if (res->slot_pool == NULL) {
+		fprintf(stderr, "Failed to allocate slot pool (%u slots * %u bytes)\n", num_slots,
+			slot_size);
+		free(res->shared_masks);
+		res->shared_masks = NULL;
+		return -ENOMEM;
+	}
+	memset(res->slot_pool, 0, (size_t)num_slots * slot_size);
+
+	/* Pre-initialize every slot */
+	for (s = 0; s < num_slots; s++) {
+		uint8_t *slot = res->slot_pool + (size_t)s * slot_size;
+		struct rte_flow_item *items = (struct rte_flow_item *)slot;
+		struct rte_flow_action *actions =
+			(struct rte_flow_action *)(slot + items_array_bytes);
+		uint8_t *data = slot + items_array_bytes + actions_array_bytes;
+
+		/* Pre-set items: spec → per-slot data, mask → shared masks */
+		mptr = res->shared_masks;
+		for (i = 0; i < n_items; i++) {
+			items[i].type = pattern[i].type;
+			if (item_spec_sizes[i] > 0) {
+				items[i].spec = data;
+				items[i].mask = mptr;
+				data += RTE_ALIGN_CEIL(item_spec_sizes[i], 8);
+				mptr += RTE_ALIGN_CEIL(item_spec_sizes[i], 8);
+			}
+		}
+		/* END is items[n_items - 1], copied above; never write past the array */
+
+		/* Pre-set actions: conf → per-slot data */
+		for (i = 0; i < n_actions; i++) {
+			actions[i].type = template_actions[i].type;
+			if (action_conf_sizes[i] > 0) {
+				actions[i].conf = data;
+				data += RTE_ALIGN_CEIL(action_conf_sizes[i], 8);
+			}
+		}
+		/* END is actions[n_actions - 1], copied above; never write past the array */
+
+		/* Initialize compound action types (RSS, RAW_ENCAP, etc.) */
+		init_slot_compound_actions(actions, n_actions, action_conf_sizes);
+	}
+
+	/* Allocate and initialize per-queue slot tracking */
+	res->queues = aligned_alloc(
+		RTE_CACHE_LINE_SIZE,
+		RTE_ALIGN_CEIL(nb_queues * sizeof(struct async_flow_queue), RTE_CACHE_LINE_SIZE));
+	if (res->queues == NULL) {
+		fprintf(stderr, "Failed to allocate queue structs (%u queues)\n", nb_queues);
+		free(res->slot_pool);
+		res->slot_pool = NULL;
+		free(res->shared_masks);
+		res->shared_masks = NULL;
+		return -ENOMEM;
+	}
+	memset(res->queues, 0, nb_queues * sizeof(struct async_flow_queue));
+	for (s = 0; s < nb_queues; s++) {
+		res->queues[s].slots = res->slot_pool + (size_t)s * queue_size * slot_size;
+		res->queues[s].head = 0;
+	}
+
+	printf(":: Slot pool: %u slots * %u bytes = %u KB (shared masks: %u bytes)\n", num_slots,
+	       slot_size, (uint32_t)(((size_t)num_slots * slot_size) / 1024), mask_data_bytes);
+
+	return 0;
+}
+
+/*
+ * Hot-path: update per-flow item values through pre-set pointers.
+ * Only IPV4/IPV6 src_addr varies per flow (counter, big-endian, low 32 bits).
+ */
+static void
+update_item_values(struct rte_flow_item *items, uint32_t counter)
+{
+	uint8_t i;
+
+	for (i = 0; items[i].type != RTE_FLOW_ITEM_TYPE_END; i++) {
+		switch (items[i].type) {
+		case RTE_FLOW_ITEM_TYPE_IPV4:
+			((struct rte_flow_item_ipv4 *)(uintptr_t)items[i].spec)->hdr.src_addr =
+				RTE_BE32(counter);
+			break;
+		case RTE_FLOW_ITEM_TYPE_IPV6: {
+			struct rte_flow_item_ipv6 *spec =
+				(struct rte_flow_item_ipv6 *)(uintptr_t)items[i].spec;
+			uint8_t j;
+			for (j = 0; j < 4; j++)
+				spec->hdr.src_addr.a[15 - j] = counter >> (j * 8);
+			break;
+		}
+		default:
+			break;
+		}
+	}
+}
+
+/*
+ * Hot-path: patch per-flow action conf values in place (core_idx is unused).
+ */
+static void
+update_action_values(struct rte_flow_action *actions, uint32_t counter, uint16_t hairpinq,
+		     uint64_t encap_data, uint64_t decap_data, __rte_unused uint8_t core_idx,
+		     bool unique_data, uint8_t rx_queues_count, uint16_t dst_port)
+{
+	uint8_t i;
+
+	for (i = 0; actions[i].type != RTE_FLOW_ACTION_TYPE_END; i++) {
+		switch (actions[i].type) {
+		case RTE_FLOW_ACTION_TYPE_MARK:
+			((struct rte_flow_action_mark *)(uintptr_t)actions[i].conf)->id =
+				(counter % 255) + 1; /* 1..255 */
+			break;
+		case RTE_FLOW_ACTION_TYPE_QUEUE: /* non-hairpin path needs rx_queues_count != 0 */
+			((struct rte_flow_action_queue *)(uintptr_t)actions[i].conf)->index =
+				hairpinq ? (counter % hairpinq) + rx_queues_count :
+					   counter % rx_queues_count;
+			break;
+		case RTE_FLOW_ACTION_TYPE_METER:
+			((struct rte_flow_action_meter *)(uintptr_t)actions[i].conf)->mtr_id =
+				counter;
+			break;
+		case RTE_FLOW_ACTION_TYPE_RSS: {
+			struct action_rss_data *rss =
+				(struct action_rss_data *)(uintptr_t)actions[i].conf;
+			uint16_t q;
+			if (hairpinq) {
+				rss->conf.queue_num = hairpinq;
+				for (q = 0; q < hairpinq; q++)
+					rss->queue[q] = q + rx_queues_count;
+			} else {
+				rss->conf.queue_num = rx_queues_count;
+				for (q = 0; q < rx_queues_count; q++)
+					rss->queue[q] = q;
+			}
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_SET_MAC_SRC:
+		case RTE_FLOW_ACTION_TYPE_SET_MAC_DST: {
+			struct rte_flow_action_set_mac *mac =
+				(struct rte_flow_action_set_mac *)(uintptr_t)actions[i].conf;
+			uint32_t val = unique_data ? counter : 1;
+			uint8_t j;
+			for (j = 0; j < RTE_ETHER_ADDR_LEN; j++) {
+				mac->mac_addr[j] = val & 0xff;
+				val >>= 8;
+			}
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_SET_IPV4_SRC:
+		case RTE_FLOW_ACTION_TYPE_SET_IPV4_DST: {
+			uint32_t ip = unique_data ? counter : 1;
+			((struct rte_flow_action_set_ipv4 *)(uintptr_t)actions[i].conf)->ipv4_addr =
+				RTE_BE32(ip + 1);
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_SET_IPV6_SRC:
+		case RTE_FLOW_ACTION_TYPE_SET_IPV6_DST: {
+			struct rte_flow_action_set_ipv6 *v6 =
+				(struct rte_flow_action_set_ipv6 *)(uintptr_t)actions[i].conf;
+			uint32_t val = unique_data ? counter : 1;
+			uint8_t j;
+			for (j = 0; j < 16; j++) {
+				v6->ipv6_addr.a[j] = val & 0xff;
+				val >>= 8;
+			}
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_SET_TP_SRC: {
+			uint32_t tp = unique_data ? counter : 100;
+			tp = tp % 0xffff;
+			((struct rte_flow_action_set_tp *)(uintptr_t)actions[i].conf)->port =
+				RTE_BE16(tp & 0xffff);
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_SET_TP_DST: {
+			uint32_t tp = unique_data ? counter : 100;
+			if (tp > 0xffff)
+				tp >>= 16;
+			((struct rte_flow_action_set_tp *)(uintptr_t)actions[i].conf)->port =
+				RTE_BE16(tp & 0xffff);
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_INC_TCP_ACK:
+		case RTE_FLOW_ACTION_TYPE_DEC_TCP_ACK:
+		case RTE_FLOW_ACTION_TYPE_INC_TCP_SEQ:
+		case RTE_FLOW_ACTION_TYPE_DEC_TCP_SEQ: {
+			uint32_t val = unique_data ? counter : 1;
+			*(rte_be32_t *)(uintptr_t)actions[i].conf = RTE_BE32(val);
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_SET_TTL: {
+			uint32_t val = unique_data ? counter : 1;
+			((struct rte_flow_action_set_ttl *)(uintptr_t)actions[i].conf)->ttl_value =
+				val % 0xff;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_SET_IPV4_DSCP:
+		case RTE_FLOW_ACTION_TYPE_SET_IPV6_DSCP: {
+			uint32_t val = unique_data ? counter : 1;
+			((struct rte_flow_action_set_dscp *)(uintptr_t)actions[i].conf)->dscp =
+				val % 0xff;
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_PORT_ID:
+			((struct rte_flow_action_port_id *)(uintptr_t)actions[i].conf)->id =
+				dst_port;
+			break;
+		case RTE_FLOW_ACTION_TYPE_RAW_ENCAP: {
+			struct action_raw_encap_data *encap =
+				(struct action_raw_encap_data *)(uintptr_t)actions[i].conf;
+			uint8_t *header = encap->data;
+			struct rte_ether_hdr eth_hdr;
+			struct rte_ipv4_hdr ipv4_hdr;
+			struct rte_udp_hdr udp_hdr;
+
+			memset(&eth_hdr, 0, sizeof(eth_hdr));
+			if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH)) {
+				if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_VLAN))
+					eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_VLAN);
+				else if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV4))
+					eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_IPV4);
+				else if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV6))
+					eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_IPV6);
+				memcpy(header, &eth_hdr, sizeof(eth_hdr));
+				header += sizeof(eth_hdr);
+			}
+			if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV4)) {
+				uint32_t ip_dst = unique_data ? counter : 1;
+				memset(&ipv4_hdr, 0, sizeof(ipv4_hdr));
+				ipv4_hdr.src_addr = RTE_IPV4(127, 0, 0, 1);
+				ipv4_hdr.dst_addr = RTE_BE32(ip_dst);
+				ipv4_hdr.version_ihl = RTE_IPV4_VHL_DEF;
+				if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_UDP))
+					ipv4_hdr.next_proto_id = 17; /* UDP */
+				if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_GRE))
+					ipv4_hdr.next_proto_id = 47; /* GRE */
+				memcpy(header, &ipv4_hdr, sizeof(ipv4_hdr));
+				header += sizeof(ipv4_hdr);
+			}
+			if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_UDP)) {
+				memset(&udp_hdr, 0, sizeof(udp_hdr));
+				if (encap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_VXLAN))
+					udp_hdr.dst_port = RTE_BE16(RTE_VXLAN_DEFAULT_PORT);
+				memcpy(header, &udp_hdr, sizeof(udp_hdr));
+				header += sizeof(udp_hdr);
+			}
+			encap->conf.size = header - encap->data; /* only ETH/IPv4/UDP emitted */
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_RAW_DECAP: {
+			struct action_raw_decap_data *decap_d =
+				(struct action_raw_decap_data *)(uintptr_t)actions[i].conf;
+			uint8_t *header = decap_d->data;
+			struct rte_ether_hdr eth_hdr;
+
+			memset(&eth_hdr, 0, sizeof(eth_hdr));
+			if (decap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH)) {
+				if (decap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV4))
+					eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_IPV4);
+				else if (decap_data & FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_IPV6))
+					eth_hdr.ether_type = RTE_BE16(RTE_ETHER_TYPE_IPV6);
+				memcpy(header, &eth_hdr, sizeof(eth_hdr));
+				header += sizeof(eth_hdr);
+			}
+			decap_d->conf.size = header - decap_d->data; /* ETH header only */
+			break;
+		}
+		case RTE_FLOW_ACTION_TYPE_VXLAN_ENCAP: {
+			uint8_t *base = (uint8_t *)(uintptr_t)actions[i].conf;
+			struct rte_flow_item *vitems =
+				(struct rte_flow_item
+					 *)(base + sizeof(struct rte_flow_action_vxlan_encap));
+			uint32_t ip_dst = unique_data ? counter : 1;
+			/* vitems[1] is the IPV4 item laid out at slot init */
+			((struct rte_flow_item_ipv4 *)(uintptr_t)vitems[1].spec)->hdr.dst_addr =
+				RTE_BE32(ip_dst);
+			break;
+		}
+		default:
+			break;
+		}
+	}
+}
+
+/*
+ * Set up a port for the async (template-based) flow API.
+ *
+ * nb_queues and queue_size are clamped to the limits reported by
+ * rte_flow_info_get(), and queue_size is then rounded down to a power of 2
+ * because each queue's slot ring wraps with a bitmask. The function then calls
+ * rte_flow_configure(), creates one pattern template, one actions template and
+ * a template table sized for rules_count rules, and pre-builds the slot pool
+ * used by async_generate_flow().
+ *
+ * The per-rule object counts gathered by fill_actions_template() (counters,
+ * aging objects, meters, conntracks, quotas) are multiplied by rules_count.
+ *
+ * Returns 0 on success, non-zero on failure. On failure the templates and the
+ * table created here are destroyed again and res->initialized stays false.
+ */
+int
+async_flow_init_port(uint16_t port_id, uint32_t nb_queues, uint32_t queue_size,
+		     uint64_t *flow_items, uint64_t *flow_actions, uint64_t *flow_attrs,
+		     uint8_t flow_group, uint32_t rules_count)
+{
+	struct rte_flow_port_info port_info = {0};
+	struct rte_flow_queue_info queue_info = {0};
+	struct rte_flow_error error = {0};
+	struct rte_flow_port_attr port_attr = {0};
+	struct rte_flow_queue_attr queue_attr = {0};
+	const struct rte_flow_queue_attr **queue_attr_list;
+	struct rte_flow_pattern_template_attr pt_attr = {0};
+	struct rte_flow_actions_template_attr at_attr = {0};
+	struct rte_flow_template_table_attr table_attr = {0};
+	struct rte_flow_item pattern[MAX_ITEMS_NUM];
+	struct rte_flow_action actions[MAX_ACTIONS_NUM];
+	struct rte_flow_action action_masks[MAX_ACTIONS_NUM];
+	size_t item_spec_sizes[MAX_ITEMS_NUM];
+	size_t action_conf_sizes[MAX_ACTIONS_NUM];
+	uint32_t n_items, n_actions;
+	struct async_flow_resources *res;
+	bool need_wire_orig_table = false;
+	uint32_t i;
+	int ret;
+
+	if (port_id >= MAX_PORTS)
+		return -1;
+
+	res = &port_resources[port_id];
+	memset(res, 0, sizeof(*res));
+
+	if (nb_queues == 0) {
+		fprintf(stderr, "Port %u: at least one async flow queue is required\n", port_id);
+		return -EINVAL;
+	}
+
+	/* Query port flow info */
+	ret = rte_flow_info_get(port_id, &port_info, &queue_info, &error);
+	if (ret != 0) {
+		fprintf(stderr, "Port %u: rte_flow_info_get failed: %s\n", port_id,
+			error.message ? error.message : "(no message)");
+		return ret;
+	}
+
+	if (port_info.max_nb_queues == 0 || queue_info.max_size == 0) {
+		fprintf(stderr, "Port %u: rte_flow_info_get reports that no queues are supported\n",
+			port_id);
+		return -1;
+	}
+
+	/* Clamp the request to the device limits (both are known to be non-zero here) */
+	if (nb_queues > port_info.max_nb_queues)
+		nb_queues = port_info.max_nb_queues;
+	if (queue_size > queue_info.max_size)
+		queue_size = queue_info.max_size;
+
+	/* Slot ring uses bitmask wrapping, so queue_size must be power of 2 */
+	queue_size = rte_align32prevpow2(queue_size);
+	if (queue_size == 0) {
+		fprintf(stderr, "Port %u: queue_size is 0 after rounding\n", port_id);
+		return -EINVAL;
+	}
+
+	for (i = 0; i < MAX_ATTRS_NUM; i++) {
+		if (flow_attrs[i] == 0)
+			break;
+		if (flow_attrs[i] & INGRESS)
+			pt_attr.ingress = 1;
+		else if (flow_attrs[i] & EGRESS)
+			pt_attr.egress = 1;
+		else if (flow_attrs[i] & TRANSFER)
+			pt_attr.transfer = 1;
+	}
+	/* Enable relaxed matching for better performance */
+	pt_attr.relaxed_matching = 1;
+
+	memset(pattern, 0, sizeof(pattern));
+	memset(actions, 0, sizeof(actions));
+	memset(action_masks, 0, sizeof(action_masks));
+
+	/* Fill templates and gather per-item/action sizes */
+	fill_items_template(pattern, flow_items, 0, 0, item_spec_sizes, &n_items);
+
+	at_attr.ingress = pt_attr.ingress;
+	at_attr.egress = pt_attr.egress;
+	at_attr.transfer = pt_attr.transfer;
+
+	fill_actions_template(actions, action_masks, flow_actions, &port_attr,
+			      &need_wire_orig_table, action_conf_sizes, &n_actions);
+
+	/* fill_actions_template() counts the objects needed by one rule;
+	 * scale them to the whole table.
+	 */
+	port_attr.nb_counters *= rules_count;
+	port_attr.nb_aging_objects *= rules_count;
+	port_attr.nb_meters *= rules_count;
+	port_attr.nb_conn_tracks *= rules_count;
+	port_attr.nb_quotas *= rules_count;
+
+	table_attr.flow_attr.group = flow_group;
+	table_attr.flow_attr.priority = 0;
+	table_attr.flow_attr.ingress = pt_attr.ingress;
+	table_attr.flow_attr.egress = pt_attr.egress;
+	table_attr.flow_attr.transfer = pt_attr.transfer;
+	table_attr.nb_flows = rules_count;
+
+	if (pt_attr.transfer && need_wire_orig_table)
+		table_attr.specialize = RTE_FLOW_TABLE_SPECIALIZE_TRANSFER_WIRE_ORIG;
+
+	queue_attr_list = malloc(sizeof(*queue_attr_list) * nb_queues);
+	if (queue_attr_list == NULL) {
+		fprintf(stderr, "Port %u: failed to allocate queue_attr_list\n", port_id);
+		return -ENOMEM;
+	}
+
+	/* All queues share one attribute object */
+	queue_attr.size = queue_size;
+	for (i = 0; i < nb_queues; i++)
+		queue_attr_list[i] = &queue_attr;
+
+	ret = rte_flow_configure(port_id, &port_attr, nb_queues, queue_attr_list, &error);
+
+	free(queue_attr_list);
+
+	if (ret != 0) {
+		fprintf(stderr, "Port %u: rte_flow_configure failed (ret=%d, type=%d): %s\n",
+			port_id, ret, error.type, error.message ? error.message : "(no message)");
+		return ret;
+	}
+
+	/* Create pattern template */
+	res->pattern_template =
+		rte_flow_pattern_template_create(port_id, &pt_attr, pattern, &error);
+	if (res->pattern_template == NULL) {
+		fprintf(stderr, "Port %u: pattern template create failed: %s\n", port_id,
+			error.message ? error.message : "(no message)");
+		return -1;
+	}
+
+	/* Create actions template */
+	res->actions_template =
+		rte_flow_actions_template_create(port_id, &at_attr, actions, action_masks, &error);
+	if (res->actions_template == NULL) {
+		fprintf(stderr, "Port %u: actions template create failed: %s\n", port_id,
+			error.message ? error.message : "(no message)");
+		ret = -1;
+		goto err_pattern;
+	}
+
+	/* Create template table */
+	res->table = rte_flow_template_table_create(port_id, &table_attr, &res->pattern_template, 1,
+						    &res->actions_template, 1, &error);
+	if (res->table == NULL) {
+		fprintf(stderr, "Port %u: template table create failed: %s\n", port_id,
+			error.message ? error.message : "(no message)");
+		ret = -1;
+		goto err_actions;
+	}
+
+	/* Allocate and pre-initialize per-slot flat buffers */
+	ret = init_slot_pool(res, nb_queues, queue_size, pattern, n_items, item_spec_sizes, actions,
+			     n_actions, action_conf_sizes);
+	if (ret != 0) {
+		fprintf(stderr, "Port %u: slot pool init failed\n", port_id);
+		goto err_table;
+	}
+
+	res->table_capacity = rules_count;
+	res->initialized = true;
+
+	printf(":: Port %u: Async flow engine initialized (queues=%u, queue_size=%u)\n", port_id,
+	       nb_queues, queue_size);
+
+	return 0;
+
+	/* Unwind in reverse creation order; error messages were printed above */
+err_table:
+	rte_flow_template_table_destroy(port_id, res->table, &error);
+	res->table = NULL;
+err_actions:
+	rte_flow_actions_template_destroy(port_id, res->actions_template, &error);
+	res->actions_template = NULL;
+err_pattern:
+	rte_flow_pattern_template_destroy(port_id, res->pattern_template, &error);
+	res->pattern_template = NULL;
+	return ret;
+}
+
+/*
+ * Enqueue the creation of one flow rule on queue queue_id of port port_id.
+ *
+ * The queue's next pre-built slot (an item array followed by an action array)
+ * is taken from its ring. Only the per-flow values are patched in, and the
+ * rule is enqueued with rte_flow_async_create() against the port's template
+ * table, using pattern/actions template index 0. When postpone is set, the
+ * operation is not handed to hardware until rte_flow_push() is called.
+ *
+ * NOTE(review): the ring wraps after slots_per_queue calls. If the PMD may read
+ * pattern/actions until the operation completes, callers must keep at most
+ * slots_per_queue operations outstanding per queue. Confirm.
+ *
+ * Returns the new flow handle, or NULL with *error set.
+ */
+struct rte_flow *
+async_generate_flow(uint16_t port_id, uint32_t queue_id, uint32_t counter, uint16_t hairpinq,
+		    uint64_t encap_data, uint64_t decap_data, uint16_t dst_port, uint8_t core_idx,
+		    uint8_t rx_queues_count, bool unique_data, bool postpone,
+		    struct rte_flow_error *error)
+{
+	const struct rte_flow_op_attr op_attr = { .postpone = postpone };
+	struct async_flow_resources *port_res;
+	struct async_flow_queue *ring;
+	struct rte_flow_item *pattern;
+	struct rte_flow_action *acts;
+	const char *reject = NULL;
+	uint32_t slot_idx;
+	uint8_t *buf;
+
+	if (port_id >= MAX_PORTS)
+		reject = "Invalid port ID";
+	else if (!port_resources[port_id].initialized)
+		reject = "Async flow resources not initialized";
+	else if (queue_id >= port_resources[port_id].nb_queues)
+		reject = "Invalid queue ID";
+
+	if (reject != NULL) {
+		rte_flow_error_set(error, EINVAL, RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL, reject);
+		return NULL;
+	}
+
+	port_res = &port_resources[port_id];
+	ring = &port_res->queues[queue_id];
+
+	/* Claim the current slot and advance head (slots_per_queue is a power of 2) */
+	slot_idx = ring->head;
+	ring->head = (slot_idx + 1) & (port_res->slots_per_queue - 1);
+	buf = ring->slots + (size_t)slot_idx * port_res->slot_size;
+
+	/* Slot layout: n_items rte_flow_item entries, then the action array */
+	pattern = (struct rte_flow_item *)buf;
+	acts = (struct rte_flow_action *)(buf + port_res->n_items * sizeof(*pattern));
+
+	/* Patch only the values that differ from one flow to the next */
+	update_item_values(pattern, counter);
+	update_action_values(acts, counter, hairpinq, encap_data, decap_data, core_idx,
+			     unique_data, rx_queues_count, dst_port);
+
+	return rte_flow_async_create(port_id, queue_id, &op_attr, port_res->table, pattern, 0,
+				     acts, 0, NULL, error);
+}
+
+/*
+ * Release the async flow objects that async_flow_init_port() created for a port.
+ *
+ * Every flow queue of the port (not only queue 0) is first pushed and pulled
+ * until it reports no more completions, at most DRAIN_MAX_ITERATIONS rounds per
+ * queue. This is best effort; push/pull errors are ignored. The template
+ * table, the actions and pattern templates, and the slot buffers are then
+ * freed. Does nothing for an invalid or uninitialized port.
+ */
+void
+async_flow_cleanup_port(uint16_t port_id)
+{
+	struct async_flow_resources *res;
+	struct rte_flow_error error;
+	struct rte_flow_op_result results[64];
+	uint32_t queue_id;
+	int ret, i;
+
+	if (port_id >= MAX_PORTS)
+		return;
+
+	res = &port_resources[port_id];
+	if (!res->initialized)
+		return;
+
+	/* Flows may have been enqueued on any queue (the app uses one queue per core) */
+	for (queue_id = 0; queue_id < res->nb_queues; queue_id++) {
+		for (i = 0; i < DRAIN_MAX_ITERATIONS; i++) {
+			(void)rte_flow_push(port_id, queue_id, &error);
+			ret = rte_flow_pull(port_id, queue_id, results, RTE_DIM(results), &error);
+			if (ret <= 0)
+				break;
+		}
+	}
+
+	if (res->table != NULL) {
+		rte_flow_template_table_destroy(port_id, res->table, &error);
+		res->table = NULL;
+	}
+
+	if (res->actions_template != NULL) {
+		rte_flow_actions_template_destroy(port_id, res->actions_template, &error);
+		res->actions_template = NULL;
+	}
+
+	if (res->pattern_template != NULL) {
+		rte_flow_pattern_template_destroy(port_id, res->pattern_template, &error);
+		res->pattern_template = NULL;
+	}
+
+	free(res->queues);
+	res->queues = NULL;
+	free(res->slot_pool);
+	res->slot_pool = NULL;
+	free(res->shared_masks);
+	res->shared_masks = NULL;
+
+	res->initialized = false;
+}
diff --git a/app/test-flow-perf/async_flow.h b/app/test-flow-perf/async_flow.h
new file mode 100644
index 0000000000..8c12924bc6
--- /dev/null
+++ b/app/test-flow-perf/async_flow.h
@@ -0,0 +1,54 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright 2026 Maxime Peim <maxime.peim@gmail.com>
+ *
+ * This file contains the async flow API related definitions
+ * and function declarations.
+ */
+
+#ifndef FLOW_PERF_ASYNC_FLOW
+#define FLOW_PERF_ASYNC_FLOW
+
+#include <rte_flow.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+#include "config.h"
+
+/* Per-queue ring of pre-built flow slots (item + action arrays), used round-robin */
+struct async_flow_queue {
+	uint8_t *slots; /* pointer to this queue's region within slot_pool */
+	uint32_t head;	/* next slot index; wraps by mask (slots_per_queue is a power of 2) */
+};
+
+/* Per-port async flow state, set up by async_flow_init_port() */
+struct async_flow_resources {
+	struct rte_flow_pattern_template *pattern_template;
+	struct rte_flow_actions_template *actions_template;
+	struct rte_flow_template_table *table;
+	uint8_t *slot_pool;    /* flat buffer pool for all slots */
+	uint8_t *shared_masks; /* shared item mask data (one copy for all slots) */
+	struct async_flow_queue *queues;
+	uint32_t slot_size;	  /* bytes per slot (cache-line aligned) */
+	uint32_t slots_per_queue; /* = queue_size */
+	uint32_t nb_queues;
+	uint32_t n_items;   /* item count; NOTE(review): init passes a count that includes END */
+	uint32_t n_actions; /* action count; NOTE(review): confirm whether END is included */
+	uint32_t table_capacity;
+	bool initialized;
+};
+
+/* Configure async flow queues, templates, table and slot pool of a port; 0 on success */
+int async_flow_init_port(uint16_t port_id, uint32_t nb_queues, uint32_t queue_size,
+			 uint64_t *flow_items, uint64_t *flow_actions, uint64_t *flow_attrs,
+			 uint8_t flow_group, uint32_t rules_count);
+
+/* Enqueue a flow rule on queue_id using the queue's next pre-built slot; NULL on error */
+struct rte_flow *async_generate_flow(uint16_t port_id, uint32_t queue_id, uint32_t counter,
+				     uint16_t hairpinq, uint64_t encap_data, uint64_t decap_data,
+				     uint16_t dst_port, uint8_t core_idx, uint8_t rx_queues_count,
+				     bool unique_data, bool postpone, struct rte_flow_error *error);
+
+/* Drain pending completions, then free templates, table and slot buffers of a port */
+void async_flow_cleanup_port(uint16_t port_id);
+
+#endif /* FLOW_PERF_ASYNC_FLOW */
diff --git a/app/test-flow-perf/items_gen.c b/app/test-flow-perf/items_gen.c
index c740e1838f..58f1c16cf8 100644
--- a/app/test-flow-perf/items_gen.c
+++ b/app/test-flow-perf/items_gen.c
@@ -389,3 +389,61 @@ fill_items(struct rte_flow_item *items,
 
 	items[items_counter].type = RTE_FLOW_ITEM_TYPE_END;
 }
+
+/*
+ * Size in bytes of the spec structure of the item types listed below;
+ * 0 for any other item type.
+ */
+static size_t
+item_spec_size(enum rte_flow_item_type type)
+{
+	static const size_t spec_size[] = {
+		[RTE_FLOW_ITEM_TYPE_ETH] = sizeof(struct rte_flow_item_eth),
+		[RTE_FLOW_ITEM_TYPE_VLAN] = sizeof(struct rte_flow_item_vlan),
+		[RTE_FLOW_ITEM_TYPE_IPV4] = sizeof(struct rte_flow_item_ipv4),
+		[RTE_FLOW_ITEM_TYPE_IPV6] = sizeof(struct rte_flow_item_ipv6),
+		[RTE_FLOW_ITEM_TYPE_TCP] = sizeof(struct rte_flow_item_tcp),
+		[RTE_FLOW_ITEM_TYPE_UDP] = sizeof(struct rte_flow_item_udp),
+		[RTE_FLOW_ITEM_TYPE_VXLAN] = sizeof(struct rte_flow_item_vxlan),
+		[RTE_FLOW_ITEM_TYPE_VXLAN_GPE] = sizeof(struct rte_flow_item_vxlan_gpe),
+		[RTE_FLOW_ITEM_TYPE_GRE] = sizeof(struct rte_flow_item_gre),
+		[RTE_FLOW_ITEM_TYPE_GENEVE] = sizeof(struct rte_flow_item_geneve),
+		[RTE_FLOW_ITEM_TYPE_GTP] = sizeof(struct rte_flow_item_gtp),
+		[RTE_FLOW_ITEM_TYPE_META] = sizeof(struct rte_flow_item_meta),
+		[RTE_FLOW_ITEM_TYPE_TAG] = sizeof(struct rte_flow_item_tag),
+		[RTE_FLOW_ITEM_TYPE_ICMP] = sizeof(struct rte_flow_item_icmp),
+		[RTE_FLOW_ITEM_TYPE_ICMP6] = sizeof(struct rte_flow_item_icmp6),
+	};
+
+	/* Unlisted types inside the table are zero-filled; anything beyond it is unknown */
+	if ((unsigned int)type >= sizeof(spec_size) / sizeof(spec_size[0]))
+		return 0;
+	return spec_size[type];
+}
+
+/*
+ * Build the item array for an async pattern template.
+ *
+ * Calls fill_items() and then sets every item's spec to NULL, leaving the other
+ * fields as fill_items() set them. The per-flow values are patched into
+ * pre-built slots later.
+ *
+ * spec_sizes:  if non-NULL, receives one entry per non-END item: the size of
+ *              its spec structure (0 for types item_spec_size() does not know).
+ * n_items_out: if non-NULL, receives the number of items including END.
+ */
+void
+fill_items_template(struct rte_flow_item *items, uint64_t *flow_items, uint32_t outer_ip_src,
+		    uint8_t core_idx, size_t *spec_sizes, uint32_t *n_items_out)
+{
+	uint32_t count;
+
+	fill_items(items, flow_items, outer_ip_src, core_idx);
+
+	for (count = 0; items[count].type != RTE_FLOW_ITEM_TYPE_END; count++) {
+		if (spec_sizes != NULL)
+			spec_sizes[count] = item_spec_size(items[count].type);
+		/* Template matching uses the mask only; values come from each flow's slot */
+		items[count].spec = NULL;
+	}
+
+	/* take END into account */
+	if (n_items_out != NULL)
+		*n_items_out = count + 1;
+}
diff --git a/app/test-flow-perf/items_gen.h b/app/test-flow-perf/items_gen.h
index f4b0e9a981..0987f7be3c 100644
--- a/app/test-flow-perf/items_gen.h
+++ b/app/test-flow-perf/items_gen.h
@@ -15,4 +15,10 @@
 void fill_items(struct rte_flow_item *items, uint64_t *flow_items,
 	uint32_t outer_ip_src, uint8_t core_idx);
 
+/* Build async pattern-template items: fill_items() output with every item's spec set to NULL.
+ * spec_sizes receives one spec size per non-END item; n_items_out receives the item count
+ * including the END terminator.
+ */
+void fill_items_template(struct rte_flow_item *items, uint64_t *flow_items, uint32_t outer_ip_src,
+			 uint8_t core_idx, size_t *spec_sizes, uint32_t *n_items_out);
+
 #endif /* FLOW_PERF_ITEMS_GEN */
diff --git a/app/test-flow-perf/main.c b/app/test-flow-perf/main.c
index 6636d1517f..2c6def95c2 100644
--- a/app/test-flow-perf/main.c
+++ b/app/test-flow-perf/main.c
@@ -37,11 +37,15 @@
 #include <rte_mtr.h>
 #include <rte_os_shim.h>
 
-#include "config.h"
 #include "actions_gen.h"
+#include "async_flow.h"
+#include "config.h"
 #include "flow_gen.h"
+#include "rte_build_config.h"
 
 #define MAX_BATCHES_COUNT          100
+#define MAX_ASYNC_QUEUE_SIZE	     (1 << 14)
+#define MAX_PULL_RETRIES	     (1 << 20)
 #define DEFAULT_RULES_COUNT    4000000
 #define DEFAULT_RULES_BATCH     100000
 #define DEFAULT_GROUP                0
@@ -55,7 +59,6 @@
 #define HAIRPIN_TX_CONF_LOCKED_MEMORY (0x0100)
 #define HAIRPIN_TX_CONF_RTE_MEMORY    (0x0200)
 
-struct rte_flow *flow;
 static uint8_t flow_group;
 
 static uint64_t encap_data;
@@ -81,6 +84,9 @@ static bool enable_fwd;
 static bool unique_data;
 static bool policy_mtr;
 static bool packet_mode;
+static bool async_mode;
+static uint32_t async_queue_size = 1024;
+static uint32_t async_push_batch = 256;
 
 static uint8_t rx_queues_count;
 static uint8_t tx_queues_count;
@@ -598,6 +604,29 @@ usage(char *progname)
 		"Encapped data is fixed with pattern: ether,ipv4,udp,vxlan\n"
 		"With fixed values\n");
 	printf("  --vxlan-decap: add vxlan_decap action to flow actions\n");
+
+	printf("\nAsync flow API options:\n");
+	printf("  --async: enable async flow API mode\n");
+	printf("  --async-queue-size=N: size of each async queue,"
+	       " default is 1024\n");
+	printf("  --async-push-batch=N: flows to batch before push,"
+	       " default is 256\n");
+}
+
+/*
+ * Largest power of two that is <= x (x itself if it already is one);
+ * 0 for x == 0. Same algorithm as rte_align32prevpow2(), and correct over
+ * the whole uint32_t range, including values above 2^31.
+ */
+static inline uint32_t
+prev_power_of_two(uint32_t x)
+{
+	/* Smear the most significant set bit into every lower bit... */
+	x |= x >> 1;
+	x |= x >> 2;
+	x |= x >> 4;
+	x |= x >> 8;
+	x |= x >> 16;
+	/* ...then keep only that top bit */
+	return x - (x >> 1);
 }
 
 static void
@@ -734,6 +763,9 @@ args_parse(int argc, char **argv)
 		{ "policy-mtr",                 1, 0, 0 },
 		{ "meter-profile",              1, 0, 0 },
 		{ "packet-mode",                0, 0, 0 },
+		{ "async",                      0, 0, 0 },
+		{ "async-queue-size",           1, 0, 0 },
+		{ "async-push-batch",           1, 0, 0 },
 		{ 0, 0, 0, 0 },
 	};
 
@@ -913,8 +945,7 @@ args_parse(int argc, char **argv)
 					rte_exit(EXIT_FAILURE, "Invalid hairpin config mask\n");
 				hairpin_conf_mask = hp_conf;
 			}
-			if (strcmp(lgopts[opt_idx].name,
-					"port-id") == 0) {
+			if (strcmp(lgopts[opt_idx].name, "port-id") == 0) {
 				uint16_t port_idx = 0;
 
 				token = strtok(optarg, ",");
@@ -981,6 +1012,26 @@ args_parse(int argc, char **argv)
 			}
 			if (strcmp(lgopts[opt_idx].name, "packet-mode") == 0)
 				packet_mode = true;
+			if (strcmp(lgopts[opt_idx].name, "async") == 0)
+				async_mode = true;
+			if (strcmp(lgopts[opt_idx].name, "async-queue-size") == 0) {
+				n = atoi(optarg);
+				if (n >= MAX_ASYNC_QUEUE_SIZE)
+					async_queue_size = MAX_ASYNC_QUEUE_SIZE;
+				else if (n > 0)
+					async_queue_size = prev_power_of_two(n);
+				else
+					rte_exit(EXIT_FAILURE, "async-queue-size should be > 0\n");
+			}
+			if (strcmp(lgopts[opt_idx].name, "async-push-batch") == 0) {
+				n = atoi(optarg);
+				if (n >= MAX_ASYNC_QUEUE_SIZE >> 1)
+					async_push_batch = MAX_ASYNC_QUEUE_SIZE >> 1;
+				else if (n > 0)
+					async_push_batch = prev_power_of_two(n);
+				else
+					rte_exit(EXIT_FAILURE, "async-push-batch should be > 0\n");
+			}
 			break;
 		default:
 			usage(argv[0]);
@@ -1457,10 +1508,10 @@ query_flows(int port_id, uint8_t core_id, struct rte_flow **flows_list)
 	mc_pool.flows_record.query[port_id][core_id] = cpu_time_used;
 }
 
-static struct rte_flow **
-insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)
+static void
+insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id, struct rte_flow **flows_list)
 {
-	struct rte_flow **flows_list;
+	struct rte_flow *flow;
 	struct rte_flow_error error;
 	clock_t start_batch, end_batch;
 	double first_flow_latency;
@@ -1485,8 +1536,7 @@ insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)
 	global_items[0] = FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH);
 	global_actions[0] = FLOW_ITEM_MASK(RTE_FLOW_ACTION_TYPE_JUMP);
 
-	flows_list = rte_zmalloc("flows_list",
-		(sizeof(struct rte_flow *) * (rules_count_per_core + 1)), 0);
+	flows_list = malloc(sizeof(struct rte_flow *) * (rules_count_per_core + 1));
 	if (flows_list == NULL)
 		rte_exit(EXIT_FAILURE, "No Memory available!\n");
 
@@ -1524,6 +1574,11 @@ insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)
 			core_id, rx_queues_count,
 			unique_data, max_priority, &error);
 
+		if (!flow) {
+			print_flow_error(error);
+			rte_exit(EXIT_FAILURE, "Error in creating flow\n");
+		}
+
 		if (!counter) {
 			first_flow_latency = (double) (rte_get_timer_cycles() - start_batch);
 			first_flow_latency /= rte_get_timer_hz();
@@ -1537,11 +1592,6 @@ insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)
 		if (force_quit)
 			counter = end_counter;
 
-		if (!flow) {
-			print_flow_error(error);
-			rte_exit(EXIT_FAILURE, "Error in creating flow\n");
-		}
-
 		flows_list[flow_index++] = flow;
 
 		/*
@@ -1575,7 +1625,203 @@ insert_flows(int port_id, uint8_t core_id, uint16_t dst_port_id)
 		port_id, core_id, rules_count_per_core, cpu_time_used);
 
 	mc_pool.flows_record.insertion[port_id][core_id] = cpu_time_used;
-	return flows_list;
+}
+
+/* Number of push rounds done by each core during its current insertion run */
+static uint32_t push_counter[RTE_MAX_LCORE];
+
+/*
+ * Push the operations enqueued on queue_id and collect their completions.
+ *
+ * enqueued:        number of operations currently outstanding on the queue.
+ * empty:           when true, wait for all outstanding operations; otherwise
+ *                  wait for at most async_push_batch of them.
+ * check_op_status: fail if a completion reports a non-success status.
+ *
+ * The first non-draining call of a run (push_counter == 1) only pushes. One
+ * batch therefore stays in flight while the next one is being enqueued.
+ *
+ * Returns the number of completions pulled, or a negative value with *error
+ * set: push or pull failure, timeout after MAX_PULL_RETRIES empty polls, or a
+ * failed operation when check_op_status is set.
+ */
+static inline int
+push_pull_flows_async(int port_id, int queue_id, int core_id, uint32_t enqueued, bool empty,
+		      bool check_op_status, struct rte_flow_error *error)
+{
+	static struct rte_flow_op_result results[RTE_MAX_LCORE][MAX_ASYNC_QUEUE_SIZE];
+	struct rte_flow_op_result *core_results = results[core_id];
+	uint32_t to_pull = (empty || async_push_batch > enqueued) ? enqueued : async_push_batch;
+	uint32_t pulled_complete = 0;
+	uint32_t retries = 0;
+	uint32_t chunk;
+	int pulled, i;
+	int ret;
+
+	/* Hand everything enqueued so far to the hardware */
+	ret = rte_flow_push(port_id, queue_id, error);
+	if (ret)
+		return ret;
+	push_counter[core_id]++;
+
+	/* First round: nothing has had time to complete, keep this batch in flight */
+	if (!empty && push_counter[core_id] == 1)
+		return 0;
+
+	while (to_pull > 0) {
+		/* rte_flow_pull() takes a uint16_t count and must fit in one results row */
+		chunk = to_pull < MAX_ASYNC_QUEUE_SIZE ? to_pull : MAX_ASYNC_QUEUE_SIZE;
+		pulled = rte_flow_pull(port_id, queue_id, core_results, chunk, error);
+		if (pulled < 0)
+			return -1;
+		if (pulled == 0) {
+			if (++retries > MAX_PULL_RETRIES) {
+				rte_flow_error_set(error, ETIMEDOUT,
+						   RTE_FLOW_ERROR_TYPE_UNSPECIFIED, NULL,
+						   "Timeout waiting for async completions");
+				return -1;
+			}
+			rte_pause();
+			continue;
+		}
+		retries = 0;
+
+		to_pull -= pulled;
+		pulled_complete += pulled;
+		if (!check_op_status)
+			continue;
+
+		for (i = 0; i < pulled; i++) {
+			if (core_results[i].status != RTE_FLOW_OP_SUCCESS) {
+				rte_flow_error_set(error, EINVAL, RTE_FLOW_ERROR_TYPE_UNSPECIFIED,
+						   NULL, "Some flow rule insertion failed");
+				return -1;
+			}
+		}
+	}
+
+	return pulled_complete;
+}
+
+/*
+ * Insert this core's share of the rules on port_id through the async flow API.
+ *
+ * This core handles rules [core_id * n, (core_id + 1) * n), where
+ * n = rules_count / cores_count. They are enqueued with postpone set,
+ * async_push_batch at a time. After each group the queue is pushed and
+ * completions of an earlier group are pulled; whatever is still outstanding
+ * is drained at the end. When flow_group > 0, core 0 first creates a
+ * synchronous group-0 rule that jumps to flow_group.
+ *
+ * flows_list is allocated by the caller with n + 1 entries; the extra entry
+ * holds the jump rule.
+ *
+ * Timing: cpu_time_per_batch[] gets one entry per rules_batch rules. The total
+ * stored in mc_pool.flows_record.insertion also covers the rules after the last
+ * rules_batch boundary and the final drain, so it ends only once every
+ * enqueued rule has completed.
+ *
+ * Exits the application on any flow creation or push/pull error.
+ */
+static void
+insert_flows_async(int port_id, uint8_t core_id, uint16_t dst_port_id, struct rte_flow **flows_list)
+{
+	struct rte_flow *flow;
+	struct rte_flow_error error;
+	uint64_t start_batch, end_batch, start_first;
+	double first_flow_latency;
+	double cpu_time_used = 0;
+	double insertion_rate;
+	double cpu_time_per_batch[MAX_BATCHES_COUNT] = {0};
+	double delta;
+	uint32_t flow_index = 0;
+	uint32_t counter, batch_counter, start_counter = 0, end_counter;
+	int rules_batch_idx;
+	int rules_count_per_core;
+	uint32_t enqueued = 0;
+	uint32_t queue_id = core_id;
+	bool first_batch = true;
+	int pulled;
+
+	rules_count_per_core = rules_count / mc_pool.cores_count;
+
+	/* Up to two batches can be outstanding at once; keep them within the queue */
+	if (async_push_batch > async_queue_size >> 1)
+		async_push_batch = async_queue_size >> 1;
+
+	/* Set boundaries of rules for each core. */
+	if (core_id)
+		start_counter = core_id * rules_count_per_core;
+	end_counter = (core_id + 1) * rules_count_per_core;
+
+	push_counter[core_id] = 0;
+
+	if (flow_group > 0 && core_id == 0) {
+		/*
+		 * Create global rule to jump into flow_group,
+		 * this way the app will avoid the default rules.
+		 *
+		 * This rule will be created only once.
+		 *
+		 * Global rule:
+		 * group 0 eth / end actions jump group <flow_group>
+		 */
+
+		uint64_t global_items[MAX_ITEMS_NUM] = {0};
+		uint64_t global_actions[MAX_ACTIONS_NUM] = {0};
+		global_items[0] = FLOW_ITEM_MASK(RTE_FLOW_ITEM_TYPE_ETH);
+		global_actions[0] = FLOW_ITEM_MASK(RTE_FLOW_ACTION_TYPE_JUMP);
+		flow = generate_flow(port_id, 0, flow_attrs, global_items, global_actions,
+				     flow_group, 0, 0, 0, 0, dst_port_id, core_id, rx_queues_count,
+				     unique_data, max_priority, &error);
+
+		if (flow == NULL) {
+			print_flow_error(error);
+			rte_exit(EXIT_FAILURE, "Error in creating flow\n");
+		}
+		flows_list[flow_index++] = flow;
+	}
+
+	start_batch = rte_get_timer_cycles();
+	/* start_batch is reset at rules_batch boundaries; keep the run start separately */
+	start_first = start_batch;
+	for (counter = start_counter; counter < end_counter;) {
+		/* batch adding flow rules, this avoids unnecessary checks for push/pull */
+		for (batch_counter = 0; batch_counter < async_push_batch && counter < end_counter;
+		     batch_counter++, counter++) {
+			/* Create flow with postpone=true to batch operations */
+			flow = async_generate_flow(port_id, queue_id, counter, hairpin_queues_num,
+						   encap_data, decap_data, dst_port_id, core_id,
+						   rx_queues_count, unique_data, true, &error);
+
+			if (!flow) {
+				print_flow_error(error);
+				rte_exit(EXIT_FAILURE, "Error in creating async flow\n");
+			}
+
+			flows_list[flow_index++] = flow;
+			enqueued++;
+
+			/*
+			 * Save the insertion rate for rules batch.
+			 * Check if the insertion reached the rules
+			 * patch counter, then save the insertion rate
+			 * for this batch.
+			 */
+			if (!((counter + 1) % rules_batch)) {
+				end_batch = rte_get_timer_cycles();
+				delta = (double)(end_batch - start_batch);
+				rules_batch_idx = ((counter + 1) / rules_batch) - 1;
+				cpu_time_per_batch[rules_batch_idx] = delta / rte_get_timer_hz();
+				cpu_time_used += cpu_time_per_batch[rules_batch_idx];
+				start_batch = rte_get_timer_cycles();
+			}
+
+			/*
+			 * Stop on signal. The flow just enqueued is already recorded and
+			 * counted, so the final drain waits for it. break skips counter++,
+			 * so set counter to end_counter to also end the outer loop.
+			 */
+			if (force_quit) {
+				counter = end_counter;
+				break;
+			}
+		}
+
+		pulled = push_pull_flows_async(port_id, queue_id, core_id, enqueued, false, true,
+					       &error);
+		if (pulled < 0) {
+			print_flow_error(error);
+			rte_exit(EXIT_FAILURE, "Error push/pull async operations\n");
+		}
+
+		enqueued -= pulled;
+
+		if (first_batch) {
+			/*
+			 * The first push_pull_flows_async() round only pushes, so this
+			 * measures enqueue + push of the first batch, not its completion.
+			 */
+			first_flow_latency = (double)(rte_get_timer_cycles() - start_first);
+			first_flow_latency /= rte_get_timer_hz();
+			/* In millisecond */
+			first_flow_latency *= 1000;
+			printf(":: First Flow Batch Latency (Async) :: Port %d :: First batch (%u) "
+			       "installed in %f milliseconds\n",
+			       port_id, async_push_batch, first_flow_latency);
+			first_batch = false;
+		}
+	}
+
+	if (push_pull_flows_async(port_id, queue_id, core_id, enqueued, true, true, &error) < 0) {
+		print_flow_error(error);
+		rte_exit(EXIT_FAILURE, "Error final push/pull async operations\n");
+	}
+
+	/* Count the rules after the last rules_batch boundary and the final drain */
+	cpu_time_used += (double)(rte_get_timer_cycles() - start_batch) / rte_get_timer_hz();
+
+	/* Print insertion rates for all batches */
+	if (dump_iterations)
+		print_rules_batches(cpu_time_per_batch);
+
+	printf(":: Port %d :: Core %d boundaries (Async) :: start @[%u] - end @[%u]\n", port_id,
+	       core_id, start_counter, end_counter - 1);
+
+	/* Insertion rate for all rules in one core */
+	if (cpu_time_used > 0) {
+		insertion_rate = ((double)rules_count_per_core / cpu_time_used) / 1000;
+		printf(":: Port %d :: Core %d :: Async rules insertion rate -> %f K Rule/Sec\n",
+		       port_id, core_id, insertion_rate);
+	}
+	printf(":: Port %d :: Core %d :: The time for creating %d async rules is %f seconds\n",
+	       port_id, core_id, rules_count_per_core, cpu_time_used);
+
+	mc_pool.flows_record.insertion[port_id][core_id] = cpu_time_used;
 }
 
 static void
@@ -1585,12 +1831,18 @@ flows_handler(uint8_t core_id)
 	uint16_t port_idx = 0;
 	uint16_t nr_ports;
 	int port_id;
+	int rules_count_per_core;
 
 	nr_ports = rte_eth_dev_count_avail();
 
 	if (rules_batch > rules_count)
 		rules_batch = rules_count;
 
+	rules_count_per_core = rules_count / mc_pool.cores_count;
+	flows_list = malloc(sizeof(struct rte_flow *) * (rules_count_per_core + 1));
+	if (flows_list == NULL)
+		rte_exit(EXIT_FAILURE, "No Memory available!\n");
+
 	printf(":: Rules Count per port: %d\n\n", rules_count);
 
 	for (port_id = 0; port_id < nr_ports; port_id++) {
@@ -1602,10 +1854,10 @@ flows_handler(uint8_t core_id)
 		mc_pool.last_alloc[core_id] = (int64_t)dump_socket_mem(stdout);
 		if (has_meter())
 			meters_handler(port_id, core_id, METER_CREATE);
-		flows_list = insert_flows(port_id, core_id,
-						dst_ports[port_idx++]);
-		if (flows_list == NULL)
-			rte_exit(EXIT_FAILURE, "Error: Insertion Failed!\n");
+		if (async_mode)
+			insert_flows_async(port_id, core_id, dst_ports[port_idx++], flows_list);
+		else
+			insert_flows(port_id, core_id, dst_ports[port_idx++], flows_list);
 		mc_pool.current_alloc[core_id] = (int64_t)dump_socket_mem(stdout);
 
 		if (query_flag)
@@ -2212,6 +2464,16 @@ init_port(void)
 			}
 		}
 
+		/* Configure async flow engine before device start */
+		if (async_mode) {
+			ret = async_flow_init_port(port_id, mc_pool.cores_count, async_queue_size,
+						   flow_items, flow_actions, flow_attrs, flow_group,
+						   rules_count);
+			if (ret != 0)
+				rte_exit(EXIT_FAILURE, "Failed to init async flow on port %d\n",
+					 port_id);
+		}
+
 		ret = rte_eth_dev_start(port_id);
 		if (ret < 0)
 			rte_exit(EXIT_FAILURE,
@@ -2291,6 +2553,8 @@ main(int argc, char **argv)
 
 	RTE_ETH_FOREACH_DEV(port) {
 		rte_flow_flush(port, &error);
+		if (async_mode)
+			async_flow_cleanup_port(port);
 		if (rte_eth_dev_stop(port) != 0)
 			printf("Failed to stop device on port %u\n", port);
 		rte_eth_dev_close(port);
diff --git a/app/test-flow-perf/meson.build b/app/test-flow-perf/meson.build
index e101449e32..2f820a7597 100644
--- a/app/test-flow-perf/meson.build
+++ b/app/test-flow-perf/meson.build
@@ -3,6 +3,7 @@
 
 sources = files(
         'actions_gen.c',
+        'async_flow.c',
         'flow_gen.c',
         'items_gen.c',
         'main.c',
-- 
2.43.0


  reply	other threads:[~2026-03-02  8:05 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-02-24 10:56 [PATCH] test/flow: add support for async API Maxime Peim
2026-02-25 22:23 ` Stephen Hemminger
2026-03-01 23:29   ` Maxime Peim [this message]
2026-03-01 23:36     ` [PATCH v2] " Maxime Peim
2026-03-02  0:52     ` Stephen Hemminger
2026-03-02 10:57   ` [PATCH v3] " Maxime Peim
2026-03-02 14:35   ` [PATCH v4] " Maxime Peim
2026-03-09 12:52   ` [PATCH v5] " Maxime Peim

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260301232931.444294-1-maxime.peim@gmail.com \
    --to=maxime.peim@gmail.com \
    --cc=dev@dpdk.org \
    --cc=stephen@networkplumber.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.