public inbox for dev@dpdk.org
* [RFC PATCH dpdk 0/3] graph: deferred enqueue API for simplified node processing
@ 2026-02-05  9:26 Robin Jarry
  2026-02-05  9:26 ` [RFC PATCH dpdk 1/3] graph: optimize rte_node_enqueue_next to batch by edge Robin Jarry
                   ` (4 more replies)
  0 siblings, 5 replies; 8+ messages in thread
From: Robin Jarry @ 2026-02-05  9:26 UTC (permalink / raw)
  To: dev; +Cc: Jerin Jacob

This series introduces a deferred enqueue API for the graph library that
simplifies node development while maintaining performance.

The current node implementations use a manual speculation pattern where
each node pre-allocates destination buffer slots, tracks which packets
diverge from the speculated edge, and handles fixups at the end. This
results in complex boilerplate code with multiple local variables
(to_next, from, held, last_spec), memcpy calls, and stream get/put
operations repeated across every node.

The new rte_node_enqueue_deferred() API handles this automatically:
- Tracks runs of consecutive packets going to the same edge
- Flushes runs in bulk when the edge changes
- Uses rte_node_next_stream_move() (pointer swap) when all packets
  go to the same destination
- Preserves last_edge across invocations for cross-batch speculation

The deferred state is stored in the node's fast-path cache line 1,
alongside xstat_off, keeping frequently accessed data together.

Performance was measured with l3fwd forwarding between two ports of an
Intel E810-XXV 2x25G NIC (1 RX queue per port). Two graph worker threads
ran on hyper-threads of the same physical core on an Intel Xeon Silver
4316 CPU @ 2.30GHz.

Results:
- Baseline (manual speculation): 37.0 Mpps
- Deferred API:                  36.2 Mpps (-2.2%)

The slight overhead comes from per-packet edge comparisons. However,
this is offset by:
- 826 fewer lines of code across 13 node implementations
- Reduced instruction cache pressure from simpler code paths
- Elimination of per-node speculation boilerplate
- Easier development of new nodes

Robin Jarry (3):
  graph: optimize rte_node_enqueue_next to batch by edge
  graph: add deferred enqueue API for batch processing
  node: use deferred enqueue API in process functions

 app/graph/ip4_output_hook.c         |  35 +-------
 lib/graph/graph_populate.c          |   1 +
 lib/graph/rte_graph_worker_common.h |  90 ++++++++++++++++++-
 lib/node/interface_tx_feature.c     | 105 +++-------------------
 lib/node/ip4_local.c                |  36 +-------
 lib/node/ip4_lookup.c               |  37 +-------
 lib/node/ip4_lookup_fib.c           |  36 +-------
 lib/node/ip4_lookup_neon.h          | 100 ++-------------------
 lib/node/ip4_lookup_sse.h           | 100 ++-------------------
 lib/node/ip4_rewrite.c              | 120 +++----------------------
 lib/node/ip6_lookup.c               |  95 ++------------------
 lib/node/ip6_lookup_fib.c           |  34 +-------
 lib/node/ip6_rewrite.c              | 118 +++----------------------
 lib/node/pkt_cls.c                  | 130 +++-------------------------
 lib/node/udp4_input.c               |  42 +--------
 15 files changed, 170 insertions(+), 909 deletions(-)

-- 
2.52.0


^ permalink raw reply	[flat|nested] 8+ messages in thread

* [RFC PATCH dpdk 1/3] graph: optimize rte_node_enqueue_next to batch by edge
  2026-02-05  9:26 [RFC PATCH dpdk 0/3] graph: deferred enqueue API for simplified node processing Robin Jarry
@ 2026-02-05  9:26 ` Robin Jarry
  2026-03-10  5:46   ` [EXTERNAL] " Pavan Nikhilesh Bhagavatula
  2026-02-05  9:26 ` [RFC PATCH dpdk 2/3] graph: add deferred enqueue API for batch processing Robin Jarry
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 8+ messages in thread
From: Robin Jarry @ 2026-02-05  9:26 UTC (permalink / raw)
  To: dev, Jerin Jacob, Kiran Kumar K, Nithin Dabilpuram, Zhirun Yan

Replace the per-object rte_node_enqueue_x1() calls with batched
rte_node_enqueue() calls. The function now tracks runs of consecutive
objects going to the same edge and flushes them in bulk.

When all objects go to the same edge and come from the node's own
buffer (objs == node->objs), use rte_node_next_stream_move() which
swaps pointers instead of copying.

Signed-off-by: Robin Jarry <rjarry@redhat.com>
---
 lib/graph/rte_graph_worker_common.h | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/lib/graph/rte_graph_worker_common.h b/lib/graph/rte_graph_worker_common.h
index 4ab53a533e4c..7fda67c07169 100644
--- a/lib/graph/rte_graph_worker_common.h
+++ b/lib/graph/rte_graph_worker_common.h
@@ -432,10 +432,21 @@ static inline void
 rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
 		      rte_edge_t *nexts, void **objs, uint16_t nb_objs)
 {
+	rte_edge_t last = nexts[0];
+	uint16_t run_start = 0;
 	uint16_t i;
 
-	for (i = 0; i < nb_objs; i++)
-		rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
+	for (i = 1; i < nb_objs; i++) {
+		if (nexts[i] != last) {
+			rte_node_enqueue(graph, node, last, &objs[run_start], i - run_start);
+			run_start = i;
+			last = nexts[i];
+		}
+	}
+	if (run_start == 0 && objs == node->objs)
+		rte_node_next_stream_move(graph, node, last);
+	else
+		rte_node_enqueue(graph, node, last, &objs[run_start], nb_objs - run_start);
 }
 
 /**
-- 
2.52.0



* [RFC PATCH dpdk 2/3] graph: add deferred enqueue API for batch processing
  2026-02-05  9:26 [RFC PATCH dpdk 0/3] graph: deferred enqueue API for simplified node processing Robin Jarry
  2026-02-05  9:26 ` [RFC PATCH dpdk 1/3] graph: optimize rte_node_enqueue_next to batch by edge Robin Jarry
@ 2026-02-05  9:26 ` Robin Jarry
  2026-03-10  5:49   ` [EXTERNAL] " Pavan Nikhilesh Bhagavatula
  2026-02-05  9:26 ` [RFC PATCH dpdk 3/3] node: use deferred enqueue API in process functions Robin Jarry
                   ` (2 subsequent siblings)
  4 siblings, 1 reply; 8+ messages in thread
From: Robin Jarry @ 2026-02-05  9:26 UTC (permalink / raw)
  To: dev, Jerin Jacob, Kiran Kumar K, Nithin Dabilpuram, Zhirun Yan

Add rte_node_enqueue_deferred() which tracks runs of consecutive
objects going to the same edge and flushes them efficiently in bulk.
When all objects go to the same edge (the common case), the function
uses rte_node_next_stream_move() which swaps pointers instead of
copying data.

The deferred state (run_start and last_edge) is stored in the node's
fast-path cache line 1, keeping it close to other frequently accessed
node data. The last_edge is preserved across node invocations,
allowing speculation: if traffic continues to the same destination,
no action is needed until the edge changes.

The flush is performed automatically at the end of node processing
by __rte_node_process().

Signed-off-by: Robin Jarry <rjarry@redhat.com>
---
 lib/graph/graph_populate.c          |  1 +
 lib/graph/rte_graph_worker_common.h | 75 +++++++++++++++++++++++++++++
 2 files changed, 76 insertions(+)

diff --git a/lib/graph/graph_populate.c b/lib/graph/graph_populate.c
index 026daecb2122..fda46a7dd386 100644
--- a/lib/graph/graph_populate.c
+++ b/lib/graph/graph_populate.c
@@ -84,6 +84,7 @@ graph_nodes_populate(struct graph *_graph)
 		struct rte_node *node = RTE_PTR_ADD(graph, off);
 		memset(node, 0, sizeof(*node));
 		node->fence = RTE_GRAPH_FENCE;
+		node->deferred_last_edge = RTE_EDGE_ID_INVALID;
 		node->off = off;
 		if (graph_pcap_is_enable()) {
 			node->process = graph_pcap_dispatch;
diff --git a/lib/graph/rte_graph_worker_common.h b/lib/graph/rte_graph_worker_common.h
index 7fda67c07169..c6741d44877c 100644
--- a/lib/graph/rte_graph_worker_common.h
+++ b/lib/graph/rte_graph_worker_common.h
@@ -119,6 +119,8 @@ struct __rte_cache_aligned rte_node {
 	/** Fast path area cache line 1. */
 	alignas(RTE_CACHE_LINE_MIN_SIZE)
 	rte_graph_off_t xstat_off; /**< Offset to xstat counters. */
+	uint16_t deferred_run_start; /**< Used by rte_node_enqueue_deferred(). */
+	rte_edge_t deferred_last_edge; /**< Used by rte_node_enqueue_deferred(). */
 
 	/** Fast path area cache line 2. */
 	__extension__ struct __rte_cache_aligned {
@@ -184,6 +186,8 @@ void __rte_node_stream_alloc_size(struct rte_graph *graph,
 
 /* Fast path helper functions */
 
+static inline void __rte_node_enqueue_deferred_flush(struct rte_graph *, struct rte_node *);
+
 /**
  * @internal
  *
@@ -204,6 +208,8 @@ __rte_node_process(struct rte_graph *graph, struct rte_node *node)
 	RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
 	objs = node->objs;
 	rte_prefetch0(objs);
+	node->deferred_run_start = 0;
+	/* Keep deferred_last_edge from previous invocation for speculation */
 
 	if (rte_graph_has_stats_feature()) {
 		start = rte_rdtsc();
@@ -214,6 +220,10 @@ __rte_node_process(struct rte_graph *graph, struct rte_node *node)
 	} else {
 		node->process(graph, node, objs, node->idx);
 	}
+
+	if (node->deferred_last_edge != RTE_EDGE_ID_INVALID)
+		__rte_node_enqueue_deferred_flush(graph, node);
+
 	node->idx = 0;
 }
 
@@ -412,6 +422,8 @@ rte_node_enqueue_x4(struct rte_graph *graph, struct rte_node *node,
 	node->idx = idx;
 }
 
+static inline void rte_node_next_stream_move(struct rte_graph *, struct rte_node *, rte_edge_t);
+
 /**
  * Enqueue objs to multiple next nodes for further processing and
  * set the next nodes to pending state in the circular buffer.
@@ -547,6 +559,69 @@ rte_node_next_stream_move(struct rte_graph *graph, struct rte_node *src,
 	}
 }
 
+/**
+ * Enqueue objects to a next node in a cache-efficient deferred manner.
+ *
+ * This function tracks runs of objects going to the same edge. When the edge
+ * changes, the previous run is flushed using bulk enqueue. At the end of node
+ * processing, any remaining objects are flushed automatically. When all
+ * objects go to the same edge (the common case), rte_node_next_stream_move()
+ * is used which swaps pointers instead of copying.
+ *
+ * The function does not require consecutive idx values. It can be called with
+ * any stride (e.g., 0, 4, 8, ... to process batches of 4). All objects from
+ * the previous idx up to the current one are considered part of the current
+ * run until the edge changes.
+ *
+ * For homogeneous traffic, the destination node structure is touched once
+ * per batch instead of once per object, reducing cache line bouncing.
+ *
+ * @param graph
+ *   Graph pointer returned from rte_graph_lookup().
+ * @param node
+ *   Current node pointer.
+ * @param next
+ *   Next node edge index.
+ * @param idx
+ *   Index of the current object being processed in node->objs[].
+ *
+ * @see rte_node_next_stream_move().
+ */
+static inline void
+rte_node_enqueue_deferred(struct rte_graph *graph, struct rte_node *node,
+			  rte_edge_t next, uint16_t idx)
+{
+	if (next != node->deferred_last_edge) {
+		/* edge changed, flush previous run if not empty */
+		if (idx > node->deferred_run_start)
+			rte_node_enqueue(graph, node, node->deferred_last_edge,
+					 &node->objs[node->deferred_run_start],
+					 idx - node->deferred_run_start);
+		node->deferred_run_start = idx;
+		node->deferred_last_edge = next;
+	}
+}
+
+/**
+ * @internal
+ * Flush any pending deferred enqueue at end of node processing.
+ */
+static inline void
+__rte_node_enqueue_deferred_flush(struct rte_graph *graph, struct rte_node *node)
+{
+	const uint16_t run_start = node->deferred_run_start;
+	const uint16_t count = node->idx;
+
+	if (run_start == 0 && count != 0) {
+		/* All packets went to the same edge - use stream move (pointer swap) */
+		rte_node_next_stream_move(graph, node, node->deferred_last_edge);
+	} else if (run_start < count) {
+		/* flush final run */
+		rte_node_enqueue(graph, node, node->deferred_last_edge,
+				 &node->objs[run_start], count - run_start);
+	}
+}
+
 /**
  * Test the validity of model.
  *
-- 
2.52.0



* [RFC PATCH dpdk 3/3] node: use deferred enqueue API in process functions
  2026-02-05  9:26 [RFC PATCH dpdk 0/3] graph: deferred enqueue API for simplified node processing Robin Jarry
  2026-02-05  9:26 ` [RFC PATCH dpdk 1/3] graph: optimize rte_node_enqueue_next to batch by edge Robin Jarry
  2026-02-05  9:26 ` [RFC PATCH dpdk 2/3] graph: add deferred enqueue API for batch processing Robin Jarry
@ 2026-02-05  9:26 ` Robin Jarry
  2026-03-10  5:31 ` [EXTERNAL] [RFC PATCH dpdk 0/3] graph: deferred enqueue API for simplified node processing Pavan Nikhilesh Bhagavatula
  2026-03-31  3:10 ` Stephen Hemminger
  4 siblings, 0 replies; 8+ messages in thread
From: Robin Jarry @ 2026-02-05  9:26 UTC (permalink / raw)
  To: dev, Sunil Kumar Kori, Rakesh Kudurumalla, Nithin Dabilpuram,
	Pavan Nikhilesh, Wathsala Vithanage, Bruce Richardson,
	Konstantin Ananyev
  Cc: Jerin Jacob

Convert all node process functions to use rte_node_enqueue_deferred()
instead of the manual speculation pattern with rte_node_enqueue_x1().

The deferred enqueue API automatically batches consecutive packets going
to the same edge and flushes them efficiently. When all packets go to
the same destination (the common case), it uses rte_node_next_stream_move()
which swaps pointers rather than copying.

This significantly simplifies the node process functions by removing
the speculation tracking logic (to_next, from, held, last_spec variables,
memcpy calls, and stream get/put operations).

The deferred state is stored in the node fast-path cache line, keeping
it close to other frequently accessed node data. The last_edge value
is preserved across invocations, providing cross-batch speculation for
free.

Performance testing with l3fwd forwarding between two ports of an E810
NIC shows throughput within 2-3% of the baseline while significantly
reducing code complexity. The
slight overhead comes from per-packet edge comparisons, but this is
offset by reduced instruction cache pressure and simpler code paths.

Also remove unused speculation-related macros and fields from node
context structures:
- IP4_REWRITE_NODE_LAST_NEXT and next_index field
- IP6_REWRITE_NODE_LAST_NEXT and next_index field
- IF_TX_FEATURE_LAST_NEXT_INDEX and last_index field
- UDP4_INPUT_NODE_NEXT_INDEX and next_index field

Signed-off-by: Robin Jarry <rjarry@redhat.com>
---
 app/graph/ip4_output_hook.c     |  35 +--------
 lib/node/interface_tx_feature.c | 105 +++-----------------------
 lib/node/ip4_local.c            |  36 +--------
 lib/node/ip4_lookup.c           |  37 +--------
 lib/node/ip4_lookup_fib.c       |  36 +--------
 lib/node/ip4_lookup_neon.h      | 100 +++---------------------
 lib/node/ip4_lookup_sse.h       | 100 +++---------------------
 lib/node/ip4_rewrite.c          | 120 +++--------------------------
 lib/node/ip6_lookup.c           |  95 +++--------------------
 lib/node/ip6_lookup_fib.c       |  34 +--------
 lib/node/ip6_rewrite.c          | 118 +++--------------------------
 lib/node/pkt_cls.c              | 130 +++-----------------------------
 lib/node/udp4_input.c           |  42 +----------
 13 files changed, 81 insertions(+), 907 deletions(-)

diff --git a/app/graph/ip4_output_hook.c b/app/graph/ip4_output_hook.c
index 8757f294cb41..df048aacd22e 100644
--- a/app/graph/ip4_output_hook.c
+++ b/app/graph/ip4_output_hook.c
@@ -51,50 +51,19 @@ __app_graph_ip4_output_hook_node_process(struct rte_graph *graph, struct rte_nod
 	struct rte_graph_feature_arc *arc =
 		rte_graph_feature_arc_get(OUTPUT_HOOK_FEATURE_ARC(node->ctx));
 	struct rte_graph_feature_arc_mbuf_dynfields *mbfields = NULL;
-	uint16_t next = OUTPUT_HOOK_PKT_DROP;
-	void **to_next, **from;
-	uint16_t last_spec = 0;
-	rte_edge_t next_index;
 	struct rte_mbuf *mbuf;
-	uint16_t held = 0;
+	rte_edge_t next;
 	int i;
 
-	/* Speculative next */
-	next_index = OUTPUT_HOOK_LAST_NEXT_INDEX(node->ctx);
-
-	from = objs;
-	to_next = rte_node_next_stream_get(graph, node, next_index, nb_objs);
 	for (i = 0; i < nb_objs; i++) {
-
 		mbuf = (struct rte_mbuf *)objs[i];
 
 		/* Send mbuf to next enabled feature */
 		mbfields = rte_graph_feature_arc_mbuf_dynfields_get(mbuf, arc->mbuf_dyn_offset);
 		rte_graph_feature_data_next_feature_get(arc, &mbfields->feature_data, &next);
 
-		if (unlikely(next_index != next)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			rte_node_enqueue_x1(graph, node, next, from[0]);
-			from += 1;
-		} else {
-			last_spec += 1;
-		}
+		rte_node_enqueue_deferred(graph, node, next, i);
 	}
-	/* !!! Home run !!! */
-	if (likely(last_spec == nb_objs)) {
-		rte_node_next_stream_move(graph, node, next_index);
-		return nb_objs;
-	}
-	held += last_spec;
-	rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-	rte_node_next_stream_put(graph, node, next_index, held);
-	OUTPUT_HOOK_LAST_NEXT_INDEX(node->ctx) = next;
 
 	return nb_objs;
 }
diff --git a/lib/node/interface_tx_feature.c b/lib/node/interface_tx_feature.c
index c8809d5f913a..8b226cc1e27f 100644
--- a/lib/node/interface_tx_feature.c
+++ b/lib/node/interface_tx_feature.c
@@ -14,8 +14,6 @@
 #include "node_private.h"
 #include "interface_tx_feature_priv.h"
 
-#define IF_TX_FEATURE_LAST_NEXT_INDEX(ctx) \
-	(((struct if_tx_feature_node_ctx *)ctx)->last_index)
 /*
  * @internal array for mapping port to next node index
  */
@@ -23,10 +21,6 @@ struct if_tx_feature_node_main  {
 	uint16_t next_index[RTE_MAX_ETHPORTS];
 };
 
-struct if_tx_feature_node_ctx {
-	uint16_t last_index;
-};
-
 static struct if_tx_feature_node_main *if_tx_feature_nm;
 
 int
@@ -48,9 +42,7 @@ static int
 if_tx_feature_node_init(const struct rte_graph *graph, struct rte_node *node)
 {
 	RTE_SET_USED(graph);
-
-	/* pkt_drop */
-	IF_TX_FEATURE_LAST_NEXT_INDEX(node->ctx) = 0;
+	RTE_SET_USED(node);
 
 	return 0;
 }
@@ -59,21 +51,15 @@ static uint16_t
 if_tx_feature_node_process(struct rte_graph *graph, struct rte_node *node,
 			   void **objs, uint16_t nb_objs)
 {
-	uint16_t held = 0, next0 = 0, next1 = 0, next2 = 0, next3 = 0;
 	struct rte_mbuf *mbuf0, *mbuf1, *mbuf2, *mbuf3, **pkts;
-	uint16_t last_spec = 0, fix_spec = 0;
-	void **to_next, **from;
-	rte_edge_t next_index;
+	rte_edge_t next0, next1, next2, next3;
 	uint16_t n_left_from;
+	int i;
 
-	/* Speculative next */
-	next_index = IF_TX_FEATURE_LAST_NEXT_INDEX(node->ctx);
-
-	from = objs;
 	n_left_from = nb_objs;
 	pkts = (struct rte_mbuf **)objs;
 
-	to_next = rte_node_next_stream_get(graph, node, next_index, nb_objs);
+	i = 0;
 	while (n_left_from > 4) {
 		if (likely(n_left_from > 7)) {
 			/* Prefetch next mbuf */
@@ -95,57 +81,11 @@ if_tx_feature_node_process(struct rte_graph *graph, struct rte_node *node,
 		next2 = if_tx_feature_nm->next_index[mbuf2->port];
 		next3 = if_tx_feature_nm->next_index[mbuf3->port];
 
-		fix_spec = (next_index ^ next0) | (next_index ^ next1) |
-			(next_index ^ next2) | (next_index ^ next3);
-
-		if (unlikely(fix_spec)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from,
-				   last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			if (next0 == next_index) {
-				to_next[0] = from[0];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node,
-						    next0, from[0]);
-			}
-
-			if (next1 == next_index) {
-				to_next[0] = from[1];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node,
-						    next1, from[1]);
-			}
-
-			if (next2 == next_index) {
-				to_next[0] = from[2];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node,
-						    next2, from[2]);
-			}
-
-			if (next3 == next_index) {
-				to_next[0] = from[3];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node,
-						    next3, from[3]);
-			}
-			from += 4;
-		} else {
-			last_spec += 4;
-		}
+		rte_node_enqueue_deferred(graph, node, next0, i);
+		rte_node_enqueue_deferred(graph, node, next1, i + 1);
+		rte_node_enqueue_deferred(graph, node, next2, i + 2);
+		rte_node_enqueue_deferred(graph, node, next3, i + 3);
+		i += 4;
 	}
 
 	while (n_left_from > 0) {
@@ -155,34 +95,11 @@ if_tx_feature_node_process(struct rte_graph *graph, struct rte_node *node,
 		n_left_from -= 1;
 
 		next0 = if_tx_feature_nm->next_index[mbuf0->port];
-		if (unlikely(next0 != next_index)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from,
-				   last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
 
-			rte_node_enqueue_x1(graph, node,
-					    next0, from[0]);
-			from += 1;
-		} else {
-			last_spec += 1;
-		}
+		rte_node_enqueue_deferred(graph, node, next0, i);
+		i += 1;
 	}
 
-	/* !!! Home run !!! */
-	if (likely(last_spec == nb_objs)) {
-		rte_node_next_stream_move(graph, node, next_index);
-		return nb_objs;
-	}
-	held += last_spec;
-	rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-	rte_node_next_stream_put(graph, node, next_index, held);
-
-	IF_TX_FEATURE_LAST_NEXT_INDEX(node->ctx) = next0;
-
 	return nb_objs;
 }
 
diff --git a/lib/node/ip4_local.c b/lib/node/ip4_local.c
index 288f9399ff34..c15af6efbf3e 100644
--- a/lib/node/ip4_local.c
+++ b/lib/node/ip4_local.c
@@ -24,51 +24,21 @@ static uint16_t
 ip4_local_node_process_scalar(struct rte_graph *graph, struct rte_node *node,
 			      void **objs, uint16_t nb_objs)
 {
-	void **to_next, **from;
-	uint16_t last_spec = 0;
-	rte_edge_t next_index;
 	struct rte_mbuf *mbuf;
-	uint16_t held = 0;
+	rte_edge_t next;
 	uint32_t l4;
 	int i;
 
-	/* Speculative next */
-	next_index = RTE_NODE_IP4_LOCAL_NEXT_UDP4_INPUT;
-
-	from = objs;
-	to_next = rte_node_next_stream_get(graph, node, next_index, nb_objs);
 	for (i = 0; i < nb_objs; i++) {
-		uint16_t next;
-
 		mbuf = (struct rte_mbuf *)objs[i];
 		l4 = mbuf->packet_type & RTE_PTYPE_L4_MASK;
 
 		next = (l4 == RTE_PTYPE_L4_UDP)
-				? next_index
+				? RTE_NODE_IP4_LOCAL_NEXT_UDP4_INPUT
 				: RTE_NODE_IP4_LOCAL_NEXT_PKT_DROP;
 
-		if (unlikely(next_index != next)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			rte_node_enqueue_x1(graph, node, next, from[0]);
-			from += 1;
-		} else {
-			last_spec += 1;
-		}
+		rte_node_enqueue_deferred(graph, node, next, i);
 	}
-	/* !!! Home run !!! */
-	if (likely(last_spec == nb_objs)) {
-		rte_node_next_stream_move(graph, node, next_index);
-		return nb_objs;
-	}
-	held += last_spec;
-	rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-	rte_node_next_stream_put(graph, node, next_index, held);
 
 	return nb_objs;
 }
diff --git a/lib/node/ip4_lookup.c b/lib/node/ip4_lookup.c
index f6db3219f06b..196a7f3079eb 100644
--- a/lib/node/ip4_lookup.c
+++ b/lib/node/ip4_lookup.c
@@ -53,25 +53,16 @@ ip4_lookup_node_process_scalar(struct rte_graph *graph, struct rte_node *node,
 	struct rte_lpm *lpm = IP4_LOOKUP_NODE_LPM(node->ctx);
 	const int dyn = IP4_LOOKUP_NODE_PRIV1_OFF(node->ctx);
 	struct rte_ipv4_hdr *ipv4_hdr;
-	void **to_next, **from;
-	uint16_t last_spec = 0;
 	struct rte_mbuf *mbuf;
-	rte_edge_t next_index;
-	uint16_t held = 0;
 	uint32_t drop_nh;
 	int i, rc;
 
-	/* Speculative next */
-	next_index = RTE_NODE_IP4_LOOKUP_NEXT_REWRITE;
 	/* Drop node */
 	drop_nh = ((uint32_t)RTE_NODE_IP4_LOOKUP_NEXT_PKT_DROP) << 16;
-	from = objs;
 
-	/* Get stream for the speculated next node */
-	to_next = rte_node_next_stream_get(graph, node, next_index, nb_objs);
 	for (i = 0; i < nb_objs; i++) {
 		uint32_t next_hop;
-		uint16_t next;
+		rte_edge_t next;
 
 		mbuf = (struct rte_mbuf *)objs[i];
 
@@ -88,33 +79,11 @@ ip4_lookup_node_process_scalar(struct rte_graph *graph, struct rte_node *node,
 		NODE_INCREMENT_XSTAT_ID(node, 0, rc != 0, 1);
 
 		node_mbuf_priv1(mbuf, dyn)->nh = (uint16_t)next_hop;
-		next_hop = next_hop >> 16;
-		next = (uint16_t)next_hop;
+		next = (uint16_t)(next_hop >> 16);
 
-		if (unlikely(next_index != next)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			rte_node_enqueue_x1(graph, node, next, from[0]);
-			from += 1;
-		} else {
-			last_spec += 1;
-		}
+		rte_node_enqueue_deferred(graph, node, next, i);
 	}
 
-	/* !!! Home run !!! */
-	if (likely(last_spec == nb_objs)) {
-		rte_node_next_stream_move(graph, node, next_index);
-		return nb_objs;
-	}
-	held += last_spec;
-	rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-	rte_node_next_stream_put(graph, node, next_index, held);
-
 	return nb_objs;
 }
 
diff --git a/lib/node/ip4_lookup_fib.c b/lib/node/ip4_lookup_fib.c
index 0857d889fca6..9c66a021f7b9 100644
--- a/lib/node/ip4_lookup_fib.c
+++ b/lib/node/ip4_lookup_fib.c
@@ -51,24 +51,13 @@ ip4_lookup_fib_node_process(struct rte_graph *graph, struct rte_node *node, void
 	uint32_t ip[RTE_GRAPH_BURST_SIZE];
 	struct rte_ipv4_hdr *ipv4_hdr;
 	uint16_t lookup_err = 0;
-	void **to_next, **from;
-	uint16_t last_spec = 0;
-	rte_edge_t next_index;
 	uint16_t n_left_from;
-	uint16_t held = 0;
-	uint16_t next;
+	rte_edge_t next;
 	int i;
 
-	/* Speculative next */
-	next_index = RTE_NODE_IP4_LOOKUP_NEXT_REWRITE;
-
 	pkts = (struct rte_mbuf **)objs;
-	from = objs;
 	n_left_from = nb_objs;
 
-	/* Get stream for the speculated next node */
-	to_next = rte_node_next_stream_get(graph, node, next_index, nb_objs);
-
 	for (i = 0; i < 4 && i < n_left_from; i++)
 		rte_prefetch0(rte_pktmbuf_mtod_offset(pkts[i], void *,
 					sizeof(struct rte_ether_hdr)));
@@ -161,34 +150,13 @@ ip4_lookup_fib_node_process(struct rte_graph *graph, struct rte_node *node, void
 		node_mbuf_priv1(mbuf0, dyn)->nh = (uint16_t)next_hop[i];
 		next = (uint16_t)(next_hop[i] >> 16);
 
-		if (unlikely(next_index ^ next)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			rte_node_enqueue_x1(graph, node, next, from[0]);
-			from += 1;
-		} else {
-			last_spec += 1;
-		}
-
 		if (unlikely(next_hop[i] == FIB_DEFAULT_NH))
 			lookup_err += 1;
-	}
 
-	/* !!! Home run !!! */
-	if (likely(last_spec == nb_objs)) {
-		rte_node_next_stream_move(graph, node, next_index);
-		return nb_objs;
+		rte_node_enqueue_deferred(graph, node, next, i);
 	}
 
 	NODE_INCREMENT_XSTAT_ID(node, 0, lookup_err != 0, lookup_err);
-	held += last_spec;
-	rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-	rte_node_next_stream_put(graph, node, next_index, held);
 
 	return nb_objs;
 }
diff --git a/lib/node/ip4_lookup_neon.h b/lib/node/ip4_lookup_neon.h
index 1bc4ad655461..45831deb8c06 100644
--- a/lib/node/ip4_lookup_neon.h
+++ b/lib/node/ip4_lookup_neon.h
@@ -14,11 +14,7 @@ ip4_lookup_node_process_vec(struct rte_graph *graph, struct rte_node *node,
 	struct rte_lpm *lpm = IP4_LOOKUP_NODE_LPM(node->ctx);
 	const int dyn = IP4_LOOKUP_NODE_PRIV1_OFF(node->ctx);
 	struct rte_ipv4_hdr *ipv4_hdr;
-	void **to_next, **from;
-	uint16_t last_spec = 0;
-	rte_edge_t next_index;
 	uint16_t n_left_from;
-	uint16_t held = 0;
 	uint32_t drop_nh;
 	rte_xmm_t result;
 	rte_xmm_t priv01;
@@ -26,13 +22,10 @@ ip4_lookup_node_process_vec(struct rte_graph *graph, struct rte_node *node,
 	int32x4_t dip;
 	int rc, i;
 
-	/* Speculative next */
-	next_index = RTE_NODE_IP4_LOOKUP_NEXT_REWRITE;
 	/* Drop node */
 	drop_nh = ((uint32_t)RTE_NODE_IP4_LOOKUP_NEXT_PKT_DROP) << 16;
 
 	pkts = (struct rte_mbuf **)objs;
-	from = objs;
 	n_left_from = nb_objs;
 
 	for (i = OBJS_PER_CLINE; i < RTE_GRAPH_BURST_SIZE; i += OBJS_PER_CLINE)
@@ -43,8 +36,7 @@ ip4_lookup_node_process_vec(struct rte_graph *graph, struct rte_node *node,
 						sizeof(struct rte_ether_hdr)));
 
 	dip = vdupq_n_s32(0);
-	/* Get stream for the speculated next node */
-	to_next = rte_node_next_stream_get(graph, node, next_index, nb_objs);
+	i = 0;
 	while (n_left_from >= 4) {
 #if RTE_GRAPH_BURST_SIZE > 64
 		/* Prefetch next-next mbufs */
@@ -126,64 +118,11 @@ ip4_lookup_node_process_vec(struct rte_graph *graph, struct rte_node *node,
 		node_mbuf_priv1(mbuf2, dyn)->u = priv23.u64[0];
 		node_mbuf_priv1(mbuf3, dyn)->u = priv23.u64[1];
 
-		/* Enqueue four to next node */
-		rte_edge_t fix_spec = ((next_index == result.u16[1]) &&
-				       (result.u16[1] == result.u16[3]) &&
-				       (result.u16[3] == result.u16[5]) &&
-				       (result.u16[5] == result.u16[7]));
-
-		if (unlikely(fix_spec == 0)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			/* Next0 */
-			if (next_index == result.u16[1]) {
-				to_next[0] = from[0];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, result.u16[1],
-						    from[0]);
-			}
-
-			/* Next1 */
-			if (next_index == result.u16[3]) {
-				to_next[0] = from[1];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, result.u16[3],
-						    from[1]);
-			}
-
-			/* Next2 */
-			if (next_index == result.u16[5]) {
-				to_next[0] = from[2];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, result.u16[5],
-						    from[2]);
-			}
-
-			/* Next3 */
-			if (next_index == result.u16[7]) {
-				to_next[0] = from[3];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, result.u16[7],
-						    from[3]);
-			}
-
-			from += 4;
-		} else {
-			last_spec += 4;
-		}
+		rte_node_enqueue_deferred(graph, node, result.u16[1], i);
+		rte_node_enqueue_deferred(graph, node, result.u16[3], i + 1);
+		rte_node_enqueue_deferred(graph, node, result.u16[5], i + 2);
+		rte_node_enqueue_deferred(graph, node, result.u16[7], i + 3);
+		i += 4;
 	}
 
 	while (n_left_from > 0) {
@@ -208,33 +147,12 @@ ip4_lookup_node_process_vec(struct rte_graph *graph, struct rte_node *node,
 
 		NODE_INCREMENT_XSTAT_ID(node, 0, rc != 0, 1);
 		node_mbuf_priv1(mbuf0, dyn)->nh = (uint16_t)next_hop;
-		next_hop = next_hop >> 16;
-		next0 = (uint16_t)next_hop;
+		next0 = (uint16_t)(next_hop >> 16);
 
-		if (unlikely(next_index ^ next0)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			rte_node_enqueue_x1(graph, node, next0, from[0]);
-			from += 1;
-		} else {
-			last_spec += 1;
-		}
+		rte_node_enqueue_deferred(graph, node, next0, i);
+		i += 1;
 	}
 
-	/* !!! Home run !!! */
-	if (likely(last_spec == nb_objs)) {
-		rte_node_next_stream_move(graph, node, next_index);
-		return nb_objs;
-	}
-	held += last_spec;
-	rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-	rte_node_next_stream_put(graph, node, next_index, held);
-
 	return nb_objs;
 }
 
diff --git a/lib/node/ip4_lookup_sse.h b/lib/node/ip4_lookup_sse.h
index fb5f9c9b9901..de52439ae257 100644
--- a/lib/node/ip4_lookup_sse.h
+++ b/lib/node/ip4_lookup_sse.h
@@ -13,25 +13,19 @@ ip4_lookup_node_process_vec(struct rte_graph *graph, struct rte_node *node,
 	struct rte_mbuf *mbuf0, *mbuf1, *mbuf2, *mbuf3, **pkts;
 	struct rte_lpm *lpm = IP4_LOOKUP_NODE_LPM(node->ctx);
 	const int dyn = IP4_LOOKUP_NODE_PRIV1_OFF(node->ctx);
-	rte_edge_t next0, next1, next2, next3, next_index;
+	rte_edge_t next0, next1, next2, next3;
 	struct rte_ipv4_hdr *ipv4_hdr;
 	uint32_t ip0, ip1, ip2, ip3;
-	void **to_next, **from;
-	uint16_t last_spec = 0;
 	uint16_t n_left_from;
-	uint16_t held = 0;
 	uint32_t drop_nh;
 	rte_xmm_t dst;
 	__m128i dip; /* SSE register */
 	int rc, i;
 
-	/* Speculative next */
-	next_index = RTE_NODE_IP4_LOOKUP_NEXT_REWRITE;
 	/* Drop node */
 	drop_nh = ((uint32_t)RTE_NODE_IP4_LOOKUP_NEXT_PKT_DROP) << 16;
 
 	pkts = (struct rte_mbuf **)objs;
-	from = objs;
 	n_left_from = nb_objs;
 
 	if (n_left_from >= 4) {
@@ -40,8 +34,7 @@ ip4_lookup_node_process_vec(struct rte_graph *graph, struct rte_node *node,
 						sizeof(struct rte_ether_hdr)));
 	}
 
-	/* Get stream for the speculated next node */
-	to_next = rte_node_next_stream_get(graph, node, next_index, nb_objs);
+	i = 0;
 	while (n_left_from >= 4) {
 		/* Prefetch next-next mbufs */
 		if (likely(n_left_from > 11)) {
@@ -133,64 +126,11 @@ ip4_lookup_node_process_vec(struct rte_graph *graph, struct rte_node *node,
 		node_mbuf_priv1(mbuf3, dyn)->nh = dst.u32[3] & 0xFFFF;
 		next3 = (dst.u32[3] >> 16);
 
-		/* Enqueue four to next node */
-		rte_edge_t fix_spec =
-			(next_index ^ next0) | (next_index ^ next1) |
-			(next_index ^ next2) | (next_index ^ next3);
-
-		if (unlikely(fix_spec)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			/* Next0 */
-			if (next_index == next0) {
-				to_next[0] = from[0];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next0,
-						    from[0]);
-			}
-
-			/* Next1 */
-			if (next_index == next1) {
-				to_next[0] = from[1];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next1,
-						    from[1]);
-			}
-
-			/* Next2 */
-			if (next_index == next2) {
-				to_next[0] = from[2];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next2,
-						    from[2]);
-			}
-
-			/* Next3 */
-			if (next_index == next3) {
-				to_next[0] = from[3];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next3,
-						    from[3]);
-			}
-
-			from += 4;
-
-		} else {
-			last_spec += 4;
-		}
+		rte_node_enqueue_deferred(graph, node, next0, i);
+		rte_node_enqueue_deferred(graph, node, next1, i + 1);
+		rte_node_enqueue_deferred(graph, node, next2, i + 2);
+		rte_node_enqueue_deferred(graph, node, next3, i + 3);
+		i += 4;
 	}
 
 	while (n_left_from > 0) {
@@ -216,32 +156,10 @@ ip4_lookup_node_process_vec(struct rte_graph *graph, struct rte_node *node,
 		node_mbuf_priv1(mbuf0, dyn)->nh = next_hop & 0xFFFF;
 		next0 = (next_hop >> 16);
 
-		if (unlikely(next_index ^ next0)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			rte_node_enqueue_x1(graph, node, next0, from[0]);
-			from += 1;
-		} else {
-			last_spec += 1;
-		}
+		rte_node_enqueue_deferred(graph, node, next0, i);
+		i += 1;
 	}
 
-	/* !!! Home run !!! */
-	if (likely(last_spec == nb_objs)) {
-		rte_node_next_stream_move(graph, node, next_index);
-		return nb_objs;
-	}
-
-	held += last_spec;
-	/* Copy things successfully speculated till now */
-	rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-	rte_node_next_stream_put(graph, node, next_index, held);
-
 	return nb_objs;
 }
 
diff --git a/lib/node/ip4_rewrite.c b/lib/node/ip4_rewrite.c
index 37bc3a511fc7..2fa5ada3efba 100644
--- a/lib/node/ip4_rewrite.c
+++ b/lib/node/ip4_rewrite.c
@@ -26,8 +26,6 @@ struct ip4_rewrite_node_ctx {
 	int mbuf_priv1_off;
 	/* Dynamic offset to feature arc field */
 	int arc_dyn_off;
-	/* Cached next index */
-	uint16_t next_index;
 	/* tx interface of last mbuf */
 	uint16_t last_tx_if;
 	/* Cached feature arc handle */
@@ -37,9 +35,6 @@ struct ip4_rewrite_node_ctx {
 static struct ip4_rewrite_node_main *ip4_rewrite_nm;
 static int port_to_next_index_diff = -1;
 
-#define IP4_REWRITE_NODE_LAST_NEXT(ctx) \
-	(((struct ip4_rewrite_node_ctx *)ctx)->next_index)
-
 #define IP4_REWRITE_NODE_PRIV1_OFF(ctx) \
 	(((struct ip4_rewrite_node_ctx *)ctx)->mbuf_priv1_off)
 
@@ -190,22 +185,18 @@ __ip4_rewrite_node_process(struct rte_graph *graph, struct rte_node *node,
 	rte_graph_feature_data_t feature_data = RTE_GRAPH_FEATURE_DATA_INVALID;
 	struct rte_mbuf *mbuf0, *mbuf1, *mbuf2, *mbuf3, **pkts;
 	struct ip4_rewrite_nh_header *nh = ip4_rewrite_nm->nh;
-	uint16_t next0, next1, next2, next3, next_index;
 	struct rte_ipv4_hdr *ip0, *ip1, *ip2, *ip3;
-	uint16_t n_left_from, held = 0, last_spec = 0;
+	rte_edge_t next0, next1, next2, next3;
 	uint16_t last_tx_if, last_next_index;
 	void *d0, *d1, *d2, *d3;
-	void **to_next, **from;
+	uint16_t n_left_from;
 	rte_xmm_t priv01;
 	rte_xmm_t priv23;
 	int i;
 
-	/* Speculative next as last next */
-	next_index = IP4_REWRITE_NODE_LAST_NEXT(node->ctx);
 	rte_prefetch0(nh);
 
 	pkts = (struct rte_mbuf **)objs;
-	from = objs;
 	n_left_from = nb_objs;
 
 	for (i = 0; i < 4 && i < n_left_from; i++)
@@ -233,8 +224,7 @@ __ip4_rewrite_node_process(struct rte_graph *graph, struct rte_node *node,
 		last_next_index = UINT16_MAX;
 	}
 
-	/* Get stream for the speculated next node */
-	to_next = rte_node_next_stream_get(graph, node, next_index, nb_objs);
+	i = 0;
 	/* Update Ethernet header of pkts */
 	while (n_left_from >= 4) {
 		if (likely(n_left_from > 7)) {
@@ -319,76 +309,11 @@ __ip4_rewrite_node_process(struct rte_graph *graph, struct rte_node *node,
 						    &next0, &next1, &next2, &next3,
 						    &last_next_index, &feature_data, feat_dyn);
 
-		/* Enqueue four to next node */
-		rte_edge_t fix_spec =
-			((next_index == next0) && (next0 == next1) &&
-			 (next1 == next2) && (next2 == next3));
-
-		if (unlikely(fix_spec == 0)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			/* next0 */
-			if (next_index == next0) {
-				to_next[0] = from[0];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next0,
-						    from[0]);
-			}
-
-			/* next1 */
-			if (next_index == next1) {
-				to_next[0] = from[1];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next1,
-						    from[1]);
-			}
-
-			/* next2 */
-			if (next_index == next2) {
-				to_next[0] = from[2];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next2,
-						    from[2]);
-			}
-
-			/* next3 */
-			if (next_index == next3) {
-				to_next[0] = from[3];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next3,
-						    from[3]);
-			}
-
-			from += 4;
-
-			/* Change speculation if last two are same */
-			if ((next_index != next3) && (next2 == next3)) {
-				/* Put the current speculated node */
-				rte_node_next_stream_put(graph, node,
-							 next_index, held);
-				held = 0;
-
-				/* Get next speculated stream */
-				next_index = next3;
-				to_next = rte_node_next_stream_get(
-					graph, node, next_index, nb_objs);
-			}
-		} else {
-			last_spec += 4;
-		}
+		rte_node_enqueue_deferred(graph, node, next0, i);
+		rte_node_enqueue_deferred(graph, node, next1, i + 1);
+		rte_node_enqueue_deferred(graph, node, next2, i + 2);
+		rte_node_enqueue_deferred(graph, node, next3, i + 3);
+		i += 4;
 	}
 
 	while (n_left_from > 0) {
@@ -417,33 +342,10 @@ __ip4_rewrite_node_process(struct rte_graph *graph, struct rte_node *node,
 						    mbuf0, &next0, &last_next_index,
 						    &feature_data, feat_dyn);
 
-		if (unlikely(next_index ^ next0)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			rte_node_enqueue_x1(graph, node, next0, from[0]);
-			from += 1;
-		} else {
-			last_spec += 1;
-		}
+		rte_node_enqueue_deferred(graph, node, next0, i);
+		i += 1;
 	}
 
-	/* !!! Home run !!! */
-	if (likely(last_spec == nb_objs)) {
-		rte_node_next_stream_move(graph, node, next_index);
-		return nb_objs;
-	}
-
-	held += last_spec;
-	rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-	rte_node_next_stream_put(graph, node, next_index, held);
-	/* Save the last next used */
-	IP4_REWRITE_NODE_LAST_NEXT(node->ctx) = next_index;
-
 	if (check_enabled_features)
 		IP4_REWRITE_NODE_LAST_TX_IF(node->ctx) = last_tx_if;
 
@@ -515,8 +417,6 @@ ip4_rewrite_node_init(const struct rte_graph *graph, struct rte_node *node)
 			IP4_REWRITE_NODE_FEAT_OFF(node->ctx) =
 				rte_graph_feature_arc_get(feature_arc)->mbuf_dyn_offset;
 
-		/* By default, set cached next node to pkt_drop */
-		IP4_REWRITE_NODE_LAST_NEXT(node->ctx) = 0;
 		IP4_REWRITE_NODE_LAST_TX_IF(node->ctx) = 0;
 
 		init_once = true;
diff --git a/lib/node/ip6_lookup.c b/lib/node/ip6_lookup.c
index 83c0500c76ef..af286aef87a5 100644
--- a/lib/node/ip6_lookup.c
+++ b/lib/node/ip6_lookup.c
@@ -48,21 +48,14 @@ ip6_lookup_node_process_scalar(struct rte_graph *graph, struct rte_node *node,
 	struct rte_lpm6 *lpm6 = IP6_LOOKUP_NODE_LPM(node->ctx);
 	const int dyn = IP6_LOOKUP_NODE_PRIV1_OFF(node->ctx);
 	struct rte_ipv6_hdr *ipv6_hdr;
-	void **to_next, **from;
-	uint16_t last_spec = 0;
-	rte_edge_t next_index;
 	uint16_t n_left_from;
-	uint16_t held = 0;
 	uint32_t drop_nh;
 	int i, rc;
 
-	/* Speculative next */
-	next_index = RTE_NODE_IP6_LOOKUP_NEXT_REWRITE;
 	/* Drop node */
 	drop_nh = ((uint32_t)RTE_NODE_IP6_LOOKUP_NEXT_PKT_DROP) << 16;
 
 	pkts = (struct rte_mbuf **)objs;
-	from = objs;
 	n_left_from = nb_objs;
 
 	for (i = OBJS_PER_CLINE; i < RTE_GRAPH_BURST_SIZE; i += OBJS_PER_CLINE)
@@ -72,8 +65,7 @@ ip6_lookup_node_process_scalar(struct rte_graph *graph, struct rte_node *node,
 		rte_prefetch0(rte_pktmbuf_mtod_offset(pkts[i], void *,
 						sizeof(struct rte_ether_hdr)));
 
-	/* Get stream for the speculated next node */
-	to_next = rte_node_next_stream_get(graph, node, next_index, nb_objs);
+	i = 0;
 	while (n_left_from >= 4) {
 		struct rte_ipv6_addr ip_batch[4];
 		int32_t next_hop[4];
@@ -154,59 +146,11 @@ ip6_lookup_node_process_scalar(struct rte_graph *graph, struct rte_node *node,
 		node_mbuf_priv1(mbuf3, dyn)->nh = (uint16_t)next_hop[3];
 		next[3] = (uint16_t)(next_hop[3] >> 16);
 
-		rte_edge_t fix_spec = ((next_index == next[0]) &&
-					(next_index == next[1]) &&
-					(next_index == next[2]) &&
-					(next_index == next[3]));
-
-		if (unlikely(fix_spec == 0)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			/* Next0 */
-			if (next_index == next[0]) {
-				to_next[0] = from[0];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next[0], from[0]);
-			}
-
-			/* Next1 */
-			if (next_index == next[1]) {
-				to_next[0] = from[1];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next[1], from[1]);
-			}
-
-			/* Next2 */
-			if (next_index == next[2]) {
-				to_next[0] = from[2];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next[2], from[2]);
-			}
-
-			/* Next3 */
-			if (next_index == next[3]) {
-				to_next[0] = from[3];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next[3], from[3]);
-			}
-
-			from += 4;
-		} else {
-			last_spec += 4;
-		}
+		rte_node_enqueue_deferred(graph, node, next[0], i);
+		rte_node_enqueue_deferred(graph, node, next[1], i + 1);
+		rte_node_enqueue_deferred(graph, node, next[2], i + 2);
+		rte_node_enqueue_deferred(graph, node, next[3], i + 3);
+		i += 4;
 	}
 
 	while (n_left_from > 0) {
@@ -228,33 +172,12 @@ ip6_lookup_node_process_scalar(struct rte_graph *graph, struct rte_node *node,
 		next_hop = (rc == 0) ? next_hop : drop_nh;
 
 		node_mbuf_priv1(mbuf0, dyn)->nh = (uint16_t)next_hop;
-		next_hop = next_hop >> 16;
-		next0 = (uint16_t)next_hop;
+		next0 = (uint16_t)(next_hop >> 16);
 
-		if (unlikely(next_index ^ next0)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			rte_node_enqueue_x1(graph, node, next0, from[0]);
-			from += 1;
-		} else {
-			last_spec += 1;
-		}
+		rte_node_enqueue_deferred(graph, node, next0, i);
+		i += 1;
 	}
 
-	/* !!! Home run !!! */
-	if (likely(last_spec == nb_objs)) {
-		rte_node_next_stream_move(graph, node, next_index);
-		return nb_objs;
-	}
-	held += last_spec;
-	rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-	rte_node_next_stream_put(graph, node, next_index, held);
-
 	return nb_objs;
 }
 
diff --git a/lib/node/ip6_lookup_fib.c b/lib/node/ip6_lookup_fib.c
index 40c5c753dfac..f4e5cfc04e8f 100644
--- a/lib/node/ip6_lookup_fib.c
+++ b/lib/node/ip6_lookup_fib.c
@@ -51,24 +51,13 @@ ip6_lookup_fib_node_process(struct rte_graph *graph, struct rte_node *node, void
 	uint64_t next_hop[RTE_GRAPH_BURST_SIZE];
 	struct rte_ipv6_hdr *ipv6_hdr;
 	uint16_t lookup_err = 0;
-	void **to_next, **from;
-	uint16_t last_spec = 0;
-	rte_edge_t next_index;
 	uint16_t n_left_from;
-	uint16_t held = 0;
 	uint16_t next;
 	int i;
 
-	/* Speculative next */
-	next_index = RTE_NODE_IP6_LOOKUP_NEXT_REWRITE;
-
 	pkts = (struct rte_mbuf **)objs;
-	from = objs;
 	n_left_from = nb_objs;
 
-	/* Get stream for the speculated next node */
-	to_next = rte_node_next_stream_get(graph, node, next_index, nb_objs);
-
 	for (i = 0; i < 4 && i < n_left_from; i++)
 		rte_prefetch0(rte_pktmbuf_mtod_offset(pkts[i], void *,
 					sizeof(struct rte_ether_hdr)));
@@ -155,34 +144,13 @@ ip6_lookup_fib_node_process(struct rte_graph *graph, struct rte_node *node, void
 		node_mbuf_priv1(mbuf0, dyn)->nh = (uint16_t)next_hop[i];
 		next = (uint16_t)(next_hop[i] >> 16);
 
-		if (unlikely(next_index ^ next)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			rte_node_enqueue_x1(graph, node, next, from[0]);
-			from += 1;
-		} else {
-			last_spec += 1;
-		}
-
 		if (unlikely(next_hop[i] == FIB6_DEFAULT_NH))
 			lookup_err += 1;
-	}
 
-	/* !!! Home run !!! */
-	if (likely(last_spec == nb_objs)) {
-		rte_node_next_stream_move(graph, node, next_index);
-		return nb_objs;
+		rte_node_enqueue_deferred(graph, node, next, i);
 	}
 
 	NODE_INCREMENT_XSTAT_ID(node, 0, lookup_err != 0, lookup_err);
-	held += last_spec;
-	rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-	rte_node_next_stream_put(graph, node, next_index, held);
 
 	return nb_objs;
 }
diff --git a/lib/node/ip6_rewrite.c b/lib/node/ip6_rewrite.c
index d5488e7fa3f0..ab1368b8dac2 100644
--- a/lib/node/ip6_rewrite.c
+++ b/lib/node/ip6_rewrite.c
@@ -19,15 +19,10 @@
 struct ip6_rewrite_node_ctx {
 	/* Dynamic offset to mbuf priv1 */
 	int mbuf_priv1_off;
-	/* Cached next index */
-	uint16_t next_index;
 };
 
 static struct ip6_rewrite_node_main *ip6_rewrite_nm;
 
-#define IP6_REWRITE_NODE_LAST_NEXT(ctx) \
-	(((struct ip6_rewrite_node_ctx *)ctx)->next_index)
-
 #define IP6_REWRITE_NODE_PRIV1_OFF(ctx) \
 	(((struct ip6_rewrite_node_ctx *)ctx)->mbuf_priv1_off)
 
@@ -38,28 +33,23 @@ ip6_rewrite_node_process(struct rte_graph *graph, struct rte_node *node,
 	struct rte_mbuf *mbuf0, *mbuf1, *mbuf2, *mbuf3, **pkts;
 	struct ip6_rewrite_nh_header *nh = ip6_rewrite_nm->nh;
 	const int dyn = IP6_REWRITE_NODE_PRIV1_OFF(node->ctx);
-	uint16_t next0, next1, next2, next3, next_index;
-	uint16_t n_left_from, held = 0, last_spec = 0;
 	struct rte_ipv6_hdr *ip0, *ip1, *ip2, *ip3;
+	rte_edge_t next0, next1, next2, next3;
 	void *d0, *d1, *d2, *d3;
-	void **to_next, **from;
+	uint16_t n_left_from;
 	rte_xmm_t priv01;
 	rte_xmm_t priv23;
 	int i;
 
-	/* Speculative next as last next */
-	next_index = IP6_REWRITE_NODE_LAST_NEXT(node->ctx);
 	rte_prefetch0(nh);
 
 	pkts = (struct rte_mbuf **)objs;
-	from = objs;
 	n_left_from = nb_objs;
 
 	for (i = 0; i < 4 && i < n_left_from; i++)
 		rte_prefetch0(pkts[i]);
 
-	/* Get stream for the speculated next node */
-	to_next = rte_node_next_stream_get(graph, node, next_index, nb_objs);
+	i = 0;
 	/* Update Ethernet header of pkts */
 	while (n_left_from >= 4) {
 		if (likely(n_left_from > 7)) {
@@ -123,76 +113,11 @@ ip6_rewrite_node_process(struct rte_graph *graph, struct rte_node *node,
 					      sizeof(struct rte_ether_hdr));
 		ip3->hop_limits = priv23.u16[5] - 1;
 
-		/* Enqueue four packets to next node */
-		rte_edge_t fix_spec =
-			((next_index == next0) && (next0 == next1) &&
-			 (next1 == next2) && (next2 == next3));
-
-		if (unlikely(fix_spec == 0)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			/* next0 */
-			if (next_index == next0) {
-				to_next[0] = from[0];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next0,
-						    from[0]);
-			}
-
-			/* next1 */
-			if (next_index == next1) {
-				to_next[0] = from[1];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next1,
-						    from[1]);
-			}
-
-			/* next2 */
-			if (next_index == next2) {
-				to_next[0] = from[2];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next2,
-						    from[2]);
-			}
-
-			/* next3 */
-			if (next_index == next3) {
-				to_next[0] = from[3];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node, next3,
-						    from[3]);
-			}
-
-			from += 4;
-
-			/* Change speculation if last two are same */
-			if ((next_index != next3) && (next2 == next3)) {
-				/* Put the current speculated node */
-				rte_node_next_stream_put(graph, node,
-							 next_index, held);
-				held = 0;
-
-				/* Get next speculated stream */
-				next_index = next3;
-				to_next = rte_node_next_stream_get(
-					graph, node, next_index, nb_objs);
-			}
-		} else {
-			last_spec += 4;
-		}
+		rte_node_enqueue_deferred(graph, node, next0, i);
+		rte_node_enqueue_deferred(graph, node, next1, i + 1);
+		rte_node_enqueue_deferred(graph, node, next2, i + 2);
+		rte_node_enqueue_deferred(graph, node, next3, i + 3);
+		i += 4;
 	}
 
 	while (n_left_from > 0) {
@@ -210,33 +135,10 @@ ip6_rewrite_node_process(struct rte_graph *graph, struct rte_node *node,
 					      sizeof(struct rte_ether_hdr));
 		ip0->hop_limits = node_mbuf_priv1(mbuf0, dyn)->ttl - 1;
 
-		if (unlikely(next_index ^ next0)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			rte_node_enqueue_x1(graph, node, next0, from[0]);
-			from += 1;
-		} else {
-			last_spec += 1;
-		}
+		rte_node_enqueue_deferred(graph, node, next0, i);
+		i += 1;
 	}
 
-	/* !!! Home run !!! */
-	if (likely(last_spec == nb_objs)) {
-		rte_node_next_stream_move(graph, node, next_index);
-		return nb_objs;
-	}
-
-	held += last_spec;
-	rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-	rte_node_next_stream_put(graph, node, next_index, held);
-	/* Save the last next used */
-	IP6_REWRITE_NODE_LAST_NEXT(node->ctx) = next_index;
-
 	return nb_objs;
 }
 
diff --git a/lib/node/pkt_cls.c b/lib/node/pkt_cls.c
index ca323ea5732f..b05356596f45 100644
--- a/lib/node/pkt_cls.c
+++ b/lib/node/pkt_cls.c
@@ -47,15 +47,11 @@ pkt_cls_node_process(struct rte_graph *graph, struct rte_node *node,
 		     void **objs, uint16_t nb_objs)
 {
 	struct rte_mbuf *mbuf0, *mbuf1, *mbuf2, *mbuf3, **pkts;
-	uint8_t l0, l1, l2, l3, last_type;
-	uint16_t next_index, n_left_from;
-	uint16_t held = 0, last_spec = 0;
-	struct pkt_cls_node_ctx *ctx;
-	void **to_next, **from;
+	uint8_t l0, l1, l2, l3;
+	uint16_t n_left_from;
 	uint32_t i;
 
 	pkts = (struct rte_mbuf **)objs;
-	from = objs;
 	n_left_from = nb_objs;
 
 	for (i = OBJS_PER_CLINE; i < RTE_GRAPH_BURST_SIZE; i += OBJS_PER_CLINE)
@@ -66,13 +62,7 @@ pkt_cls_node_process(struct rte_graph *graph, struct rte_node *node,
 		rte_prefetch0(pkts[i]);
 #endif
 
-	ctx = (struct pkt_cls_node_ctx *)node->ctx;
-	last_type = ctx->l2l3_type;
-	next_index = p_nxt[last_type];
-
-	/* Get stream for the speculated next node */
-	to_next = rte_node_next_stream_get(graph, node,
-					   next_index, nb_objs);
+	i = 0;
 	while (n_left_from >= 4) {
 #if RTE_GRAPH_BURST_SIZE > 64
 		if (likely(n_left_from > 7)) {
@@ -99,86 +89,11 @@ pkt_cls_node_process(struct rte_graph *graph, struct rte_node *node,
 		l3 = mbuf3->packet_type &
 			(RTE_PTYPE_L2_MASK | RTE_PTYPE_L3_MASK);
 
-		/* Check if they are destined to same
-		 * next node based on l2l3 packet type.
-		 */
-		uint8_t fix_spec = (last_type ^ l0) | (last_type ^ l1) |
-			(last_type ^ l2) | (last_type ^ l3);
-
-		if (unlikely(fix_spec)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from,
-				   last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			/* l0 */
-			if (p_nxt[l0] == next_index) {
-				to_next[0] = from[0];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node,
-						    p_nxt[l0], from[0]);
-			}
-
-			/* l1 */
-			if (p_nxt[l1] == next_index) {
-				to_next[0] = from[1];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node,
-						    p_nxt[l1], from[1]);
-			}
-
-			/* l2 */
-			if (p_nxt[l2] == next_index) {
-				to_next[0] = from[2];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node,
-						    p_nxt[l2], from[2]);
-			}
-
-			/* l3 */
-			if (p_nxt[l3] == next_index) {
-				to_next[0] = from[3];
-				to_next++;
-				held++;
-			} else {
-				rte_node_enqueue_x1(graph, node,
-						    p_nxt[l3], from[3]);
-			}
-
-			/* Update speculated ptype */
-			if ((last_type != l3) && (l2 == l3) &&
-			    (next_index != p_nxt[l3])) {
-				/* Put the current stream for
-				 * speculated ltype.
-				 */
-				rte_node_next_stream_put(graph, node,
-							 next_index, held);
-
-				held = 0;
-
-				/* Get next stream for new ltype */
-				next_index = p_nxt[l3];
-				last_type = l3;
-				to_next = rte_node_next_stream_get(graph, node,
-								   next_index,
-								   nb_objs);
-			} else if (next_index == p_nxt[l3]) {
-				last_type = l3;
-			}
-
-			from += 4;
-		} else {
-			last_spec += 4;
-		}
+		rte_node_enqueue_deferred(graph, node, p_nxt[l0], i);
+		rte_node_enqueue_deferred(graph, node, p_nxt[l1], i + 1);
+		rte_node_enqueue_deferred(graph, node, p_nxt[l2], i + 2);
+		rte_node_enqueue_deferred(graph, node, p_nxt[l3], i + 3);
+		i += 4;
 	}
 
 	while (n_left_from > 0) {
@@ -189,36 +104,11 @@ pkt_cls_node_process(struct rte_graph *graph, struct rte_node *node,
 
 		l0 = mbuf0->packet_type &
 			(RTE_PTYPE_L2_MASK | RTE_PTYPE_L3_MASK);
-		if (unlikely((l0 != last_type) &&
-			     (p_nxt[l0] != next_index))) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from,
-				   last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
 
-			rte_node_enqueue_x1(graph, node,
-					    p_nxt[l0], from[0]);
-			from += 1;
-		} else {
-			last_spec += 1;
-		}
+		rte_node_enqueue_deferred(graph, node, p_nxt[l0], i);
+		i += 1;
 	}
 
-	/* !!! Home run !!! */
-	if (likely(last_spec == nb_objs)) {
-		rte_node_next_stream_move(graph, node, next_index);
-		return nb_objs;
-	}
-
-	held += last_spec;
-	/* Copy things successfully speculated till now */
-	rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-	rte_node_next_stream_put(graph, node, next_index, held);
-
-	ctx->l2l3_type = last_type;
 	return nb_objs;
 }
 
diff --git a/lib/node/udp4_input.c b/lib/node/udp4_input.c
index 5a74e28c8586..496e619e3e24 100644
--- a/lib/node/udp4_input.c
+++ b/lib/node/udp4_input.c
@@ -26,10 +26,6 @@
 #define UDP4_INPUT_NODE_HASH(ctx) \
 	(((struct udp4_input_node_ctx *)ctx)->hash)
 
-#define UDP4_INPUT_NODE_NEXT_INDEX(ctx) \
-	(((struct udp4_input_node_ctx *)ctx)->next_index)
-
-
 /* UDP4 input  global data struct */
 struct udp4_input_node_main {
 	struct rte_hash *hash_tbl[RTE_MAX_NUMA_NODES];
@@ -40,8 +36,6 @@ static struct udp4_input_node_main udp4_input_nm;
 struct udp4_input_node_ctx {
 	/* Socket's Hash table */
 	struct rte_hash *hash;
-	/* Cached next index */
-	uint16_t next_index;
 };
 
 struct flow_key {
@@ -155,21 +149,11 @@ udp4_input_node_process_scalar(struct rte_graph *graph, struct rte_node *node,
 			       void **objs, uint16_t nb_objs)
 {
 	struct rte_hash *hash_tbl_handle = UDP4_INPUT_NODE_HASH(node->ctx);
-	rte_edge_t next_index, udplookup_node;
 	struct rte_udp_hdr *pkt_udp_hdr;
-	uint16_t last_spec = 0;
-	void **to_next, **from;
+	rte_edge_t udplookup_node, next;
 	struct rte_mbuf *mbuf;
-	uint16_t held = 0;
-	uint16_t next = 0;
 	int i, rc;
 
-	/* Speculative next */
-	next_index = UDP4_INPUT_NODE_NEXT_INDEX(node->ctx);
-
-	from = objs;
-
-	to_next = rte_node_next_stream_get(graph, node, next_index, nb_objs);
 	for (i = 0; i < nb_objs; i++) {
 		struct flow_key key_port;
 
@@ -185,30 +169,8 @@ udp4_input_node_process_scalar(struct rte_graph *graph, struct rte_node *node,
 		next = (rc < 0) ? RTE_NODE_UDP4_INPUT_NEXT_PKT_DROP
 				    : udplookup_node;
 
-		if (unlikely(next_index != next)) {
-			/* Copy things successfully speculated till now */
-			rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-			from += last_spec;
-			to_next += last_spec;
-			held += last_spec;
-			last_spec = 0;
-
-			rte_node_enqueue_x1(graph, node, next, from[0]);
-			from += 1;
-		} else {
-			last_spec += 1;
-		}
+		rte_node_enqueue_deferred(graph, node, next, i);
 	}
-	/* !!! Home run !!! */
-	if (likely(last_spec == nb_objs)) {
-		rte_node_next_stream_move(graph, node, next_index);
-		return nb_objs;
-	}
-	held += last_spec;
-	rte_memcpy(to_next, from, last_spec * sizeof(from[0]));
-	rte_node_next_stream_put(graph, node, next_index, held);
-	/* Save the last next used */
-	UDP4_INPUT_NODE_NEXT_INDEX(node->ctx) = next;
 
 	return nb_objs;
 }
-- 
2.52.0


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* Re: [EXTERNAL] [RFC PATCH dpdk 0/3] graph: deferred enqueue API for simplified node processing
  2026-02-05  9:26 [RFC PATCH dpdk 0/3] graph: deferred enqueue API for simplified node processing Robin Jarry
                   ` (2 preceding siblings ...)
  2026-02-05  9:26 ` [RFC PATCH dpdk 3/3] node: use deferred enqueue API in process functions Robin Jarry
@ 2026-03-10  5:31 ` Pavan Nikhilesh Bhagavatula
  2026-03-31  3:10 ` Stephen Hemminger
  4 siblings, 0 replies; 8+ messages in thread
From: Pavan Nikhilesh Bhagavatula @ 2026-03-10  5:31 UTC (permalink / raw)
  To: Robin Jarry, dev@dpdk.org; +Cc: Jerin Jacob

>
>Performance was measured with l3fwd forwarding between two ports of an
>Intel E810-XXV 2x25G NIC (1 RX queue per port). Two graph worker threads
>ran on hyper threads of the same physical core on an Intel Xeon Silver
>4316 CPU @ 2.30GHz.
>
>Results:
>- Baseline (manual speculation): 37.0 Mpps
>- Deferred API:                  36.2 Mpps (-2.2%)
>

On the Octeon (Neoverse-N2) platform we see a slight performance increase of ~1.5%.

>The slight overhead comes from per-packet edge comparisons. However,
>this is offset by:
>- 826 fewer lines of code across 13 node implementations
>- Reduced instruction cache pressure from simpler code paths
>- Elimination of per-node speculation boilerplate
>- Easier development of new nodes





* Re: [EXTERNAL] [RFC PATCH dpdk 1/3] graph: optimize rte_node_enqueue_next to batch by edge
  2026-02-05  9:26 ` [RFC PATCH dpdk 1/3] graph: optimize rte_node_enqueue_next to batch by edge Robin Jarry
@ 2026-03-10  5:46   ` Pavan Nikhilesh Bhagavatula
  0 siblings, 0 replies; 8+ messages in thread
From: Pavan Nikhilesh Bhagavatula @ 2026-03-10  5:46 UTC (permalink / raw)
  To: Robin Jarry, dev@dpdk.org, Jerin Jacob, Kiran Kumar Kokkilagadda,
	Nithin Kumar Dabilpuram, Zhirun Yan

>Replace the per-object rte_node_enqueue_x1() calls with batched
>rte_node_enqueue() calls. The function now tracks runs of consecutive
>objects going to the same edge and flushes them in bulk.
>
>When all objects go to the same edge and come from the node's own
>buffer (objs == node->objs), use rte_node_next_stream_move() which
>swaps pointers instead of copying.
>
>Signed-off-by: Robin Jarry <rjarry@redhat.com>
>---
> lib/graph/rte_graph_worker_common.h | 15 +++++++++++++--
> 1 file changed, 13 insertions(+), 2 deletions(-)
>
>diff --git a/lib/graph/rte_graph_worker_common.h b/lib/graph/rte_graph_worker_common.h
>index 4ab53a533e4c..7fda67c07169 100644
>--- a/lib/graph/rte_graph_worker_common.h
>+++ b/lib/graph/rte_graph_worker_common.h
>@@ -432,10 +432,21 @@ static inline void
> rte_node_enqueue_next(struct rte_graph *graph, struct rte_node *node,
>                      rte_edge_t *nexts, void **objs, uint16_t nb_objs)
> {
>+       rte_edge_t last = nexts[0];
>+       uint16_t run_start = 0;
>        uint16_t i;
>
>-       for (i = 0; i < nb_objs; i++)
>-               rte_node_enqueue_x1(graph, node, nexts[i], objs[i]);
>+       for (i = 1; i < nb_objs; i++) {
>+               if (nexts[i] != last) {

We can probably use SIMD here for the comparison; 128 bits would process 8 nexts at a time.
But it might not be worth it if the nexts rarely repeat.
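
A rough standalone sketch of what such a 128-bit comparison could look like with SSE2 intrinsics (illustrative only, not part of the patch; a real DPDK implementation would presumably go through rte_vect.h and keep a scalar fallback):

```c
#include <assert.h>      /* for the usage checks below */
#include <emmintrin.h>   /* SSE2 */
#include <stdint.h>

/* Return nonzero when any of 8 consecutive 16-bit edge ids differs
 * from `last`, i.e. the speculated run would need a fix-up. */
static inline int
edges_differ_x8(const uint16_t *nexts, uint16_t last)
{
	__m128i v = _mm_loadu_si128((const __m128i *)nexts);
	__m128i eq = _mm_cmpeq_epi16(v, _mm_set1_epi16((int16_t)last));
	/* movemask is 0xFFFF only when all 8 lanes compared equal */
	return _mm_movemask_epi8(eq) != 0xFFFF;
}
```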

>+                       rte_node_enqueue(graph, node, last, &objs[run_start], i - run_start);
>+                       run_start = i;
>+                       last = nexts[i];
>+               }
>+       }
>+       if (run_start == 0 && objs == node->objs)
>+               rte_node_next_stream_move(graph, node, last);
>+       else
>+               rte_node_enqueue(graph, node, last, &objs[run_start], nb_objs - run_start);
> }
>
> /**
--
2.52.0




* Re: [EXTERNAL] [RFC PATCH dpdk 2/3] graph: add deferred enqueue API for batch processing
  2026-02-05  9:26 ` [RFC PATCH dpdk 2/3] graph: add deferred enqueue API for batch processing Robin Jarry
@ 2026-03-10  5:49   ` Pavan Nikhilesh Bhagavatula
  0 siblings, 0 replies; 8+ messages in thread
From: Pavan Nikhilesh Bhagavatula @ 2026-03-10  5:49 UTC (permalink / raw)
  To: Robin Jarry, dev@dpdk.org, Jerin Jacob, Kiran Kumar Kokkilagadda,
	Nithin Kumar Dabilpuram, Zhirun Yan

>Add rte_node_enqueue_deferred() which tracks runs of consecutive
>objects going to the same edge and flushes them efficiently in bulk.
>When all objects go to the same edge (the common case), the function
>uses rte_node_next_stream_move() which swaps pointers instead of
>copying data.
>
>The deferred state (run_start and last_edge) is stored in the node
>fast-path cache line 1, keeping it close to other frequently accessed
>node data. The last_edge is preserved across node invocations,
>allowing speculation: if traffic continues to the same destination,
>no action is needed until the edge changes.
>
>The flush is performed automatically at the end of node processing
>by __rte_node_process().
>
>Signed-off-by: Robin Jarry <rjarry@redhat.com>
>---
> lib/graph/graph_populate.c          |  1 +
> lib/graph/rte_graph_worker_common.h | 75 +++++++++++++++++++++++++++++
> 2 files changed, 76 insertions(+)
>

<snip>

>+/**
>+ * Enqueue objects to a next node in a cache-efficient deferred manner.
>+ *
>+ * This function tracks runs of objects going to the same edge. When the edge
>+ * changes, the previous run is flushed using bulk enqueue. At the end of node
>+ * processing, any remaining objects are flushed automatically. When all
>+ * objects go to the same edge (the common case), rte_node_next_stream_move()
>+ * is used which swaps pointers instead of copying.
>+ *
>+ * The function does not require consecutive idx values. It can be called with
>+ * any stride (e.g., 0, 4, 8, ... to process batches of 4). All objects from
>+ * the previous idx up to the current one are considered part of the current
>+ * run until the edge changes.
>+ *
>+ * For homogeneous traffic, the destination node structure is touched once
>+ * per batch instead of once per object, reducing cache line bouncing.
>+ *
>+ * @param graph
>+ *   Graph pointer returned from rte_graph_lookup().
>+ * @param node
>+ *   Current node pointer.
>+ * @param next
>+ *   Next node edge index.
>+ * @param idx
>+ *   Index of the current object being processed in node->objs[].
>+ *
>+ * @see rte_node_next_stream_move().
>+ */
>+static inline void
>+rte_node_enqueue_deferred(struct rte_graph *graph, struct rte_node *node,
>+                         rte_edge_t next, uint16_t idx)
>+{
>+       if (next != node->deferred_last_edge) {
>+               /* edge changed, flush previous run if not empty */
>+               if (idx > node->deferred_run_start)
>+                       rte_node_enqueue(graph, node, node->deferred_last_edge,
>+                                        &node->objs[node->deferred_run_start],
>+                                        idx - node->deferred_run_start);
>+               node->deferred_run_start = idx;
>+               node->deferred_last_edge = next;
>+       }
>+}
>+

Can we add a deferredx4 variant too? It need not have SIMD but would reduce LoC
further.

Thanks,
Pavan.

^ permalink raw reply	[flat|nested] 8+ messages in thread

* Re: [RFC PATCH dpdk 0/3] graph: deferred enqueue API for simplified node processing
  2026-02-05  9:26 [RFC PATCH dpdk 0/3] graph: deferred enqueue API for simplified node processing Robin Jarry
                   ` (3 preceding siblings ...)
  2026-03-10  5:31 ` [EXTERNAL] [RFC PATCH dpdk 0/3] graph: deferred enqueue API for simplified node processing Pavan Nikhilesh Bhagavatula
@ 2026-03-31  3:10 ` Stephen Hemminger
  4 siblings, 0 replies; 8+ messages in thread
From: Stephen Hemminger @ 2026-03-31  3:10 UTC (permalink / raw)
  To: Robin Jarry; +Cc: dev, Jerin Jacob

On Thu,  5 Feb 2026 10:26:32 +0100
Robin Jarry <rjarry@redhat.com> wrote:

> This series introduces a deferred enqueue API for the graph library that
> simplifies node development while maintaining performance.
> 
> The current node implementations use a manual speculation pattern where
> each node pre-allocates destination buffer slots, tracks which packets
> diverge from the speculated edge, and handles fixups at the end. This
> results in complex boilerplate code with multiple local variables
> (to_next, from, held, last_spec), memcpy calls, and stream get/put
> operations repeated across every node.
> 
> The new rte_node_enqueue_deferred() API handles this automatically:
> - Tracks runs of consecutive packets going to the same edge
> - Flushes runs in bulk when the edge changes
> - Uses rte_node_next_stream_move() (pointer swap) when all packets
>   go to the same destination
> - Preserves last_edge across invocations for cross-batch speculation
> 
> The deferred state is stored in the node's fast-path cache line 1,
> alongside xstat_off, keeping frequently accessed data together.
> 
> Performance was measured with l3fwd forwarding between two ports of an
> Intel E810-XXV 2x25G NIC (1 RX queue per port). Two graph worker threads
> ran on hyper threads of the same physical core on an Intel Xeon Silver
> 4316 CPU @ 2.30GHz.
> 
> Results:
> - Baseline (manual speculation): 37.0 Mpps
> - Deferred API:                  36.2 Mpps (-2.2%)
> 
> The slight overhead comes from per-packet edge comparisons. However,
> this is offset by:
> - 826 fewer lines of code across 13 node implementations
> - Reduced instruction cache pressure from simpler code paths
> - Elimination of per-node speculation boilerplate
> - Easier development of new nodes
> 
> Robin Jarry (3):
>   graph: optimize rte_node_enqueue_next to batch by edge
>   graph: add deferred enqueue API for batch processing
>   node: use deferred enqueue API in process functions
> 
>  app/graph/ip4_output_hook.c         |  35 +-------
>  lib/graph/graph_populate.c          |   1 +
>  lib/graph/rte_graph_worker_common.h |  90 ++++++++++++++++++-
>  lib/node/interface_tx_feature.c     | 105 +++-------------------
>  lib/node/ip4_local.c                |  36 +-------
>  lib/node/ip4_lookup.c               |  37 +-------
>  lib/node/ip4_lookup_fib.c           |  36 +-------
>  lib/node/ip4_lookup_neon.h          | 100 ++-------------------
>  lib/node/ip4_lookup_sse.h           | 100 ++-------------------
>  lib/node/ip4_rewrite.c              | 120 +++----------------------
>  lib/node/ip6_lookup.c               |  95 ++------------------
>  lib/node/ip6_lookup_fib.c           |  34 +-------
>  lib/node/ip6_rewrite.c              | 118 +++----------------------
>  lib/node/pkt_cls.c                  | 130 +++-------------------------
>  lib/node/udp4_input.c               |  42 +--------
>  15 files changed, 170 insertions(+), 909 deletions(-)
> 

AI review comments:

Review: [RFC PATCH dpdk 1/3] graph: optimize rte_node_enqueue_next to batch by edge
        [RFC PATCH dpdk 2/3] graph: add deferred enqueue API for batch processing
        [RFC PATCH dpdk 3/3] node: use deferred enqueue API in process functions

This is a well-motivated series. The deferred enqueue API is a clean
abstraction that replaces the repetitive speculation boilerplate in
every node process function. The code reduction in patch 3 speaks
for itself -- roughly 900 lines of hand-rolled speculation logic
replaced with single-line rte_node_enqueue_deferred() calls.

Patch 1/3 - rte_node_enqueue_next batching

Error: Out-of-bounds read when nb_objs is 0.

  rte_edge_t last = nexts[0];

reads nexts[0] unconditionally. If nb_objs == 0, this is an
out-of-bounds access. The old code handled this correctly because
the loop `for (i = 0; i < nb_objs; i++)` would simply not execute.
Add a guard:

  if (nb_objs == 0)
      return;

Patch 2/3 - deferred enqueue API

Error: Flush in __rte_node_process fires for nodes that do not use
the deferred API, corrupting their data.

__rte_node_process now unconditionally checks:

  if (node->deferred_last_edge != RTE_EDGE_ID_INVALID)
      __rte_node_enqueue_deferred_flush(graph, node);

deferred_last_edge is initialized to RTE_EDGE_ID_INVALID at graph
populate time, but it is preserved across invocations ("Keep
deferred_last_edge from previous invocation for speculation"). Once
a deferred-API node sets it to a valid value, it will never be
reset to RTE_EDGE_ID_INVALID.

This means any node that uses the old manual speculation pattern
(rte_node_next_stream_move / stream_get / stream_put directly)
will have the flush fire after its process function returns,
because deferred_last_edge may be stale from a prior incarnation
or from the node's first call. The flush would then attempt
rte_node_next_stream_move based on the stale deferred_last_edge
and the current node->idx (which may still be non-zero because
stream_move does not zero src->idx).

Even though patch 3 converts all in-tree nodes, this is a public
API change in __rte_node_process that affects all node
implementations including third-party and out-of-tree nodes that
are compiled against the DPDK headers. Any node process function
that does not use rte_node_enqueue_deferred() will be silently
broken.

The fix is to ensure the flush only fires when the deferred API
was actually used during this invocation. For example, reset
deferred_last_edge to RTE_EDGE_ID_INVALID at the start of
__rte_node_process (instead of preserving it), or add a flag
that is set when rte_node_enqueue_deferred() is first called:

  /* Option A: reset each invocation (loses cross-batch
   * speculation but is safe) */
  node->deferred_last_edge = RTE_EDGE_ID_INVALID;

  /* Option B: add a "deferred_active" flag set by
   * rte_node_enqueue_deferred, cleared by flush */

Note that option A sacrifices the cross-batch speculation benefit
described in the commit message. Option B preserves speculation
but adds a branch and a byte to the cache line. The right choice
depends on how much the cross-batch speculation matters in
practice -- given the ~2.2% overhead already measured, the
safety of option A may be preferable.

Warning: deferred_run_start and deferred_last_edge add 4 bytes to
cache line 1 of struct rte_node.

The commit says "stored in the node fast-path cache line 1,
keeping it close to other frequently accessed node data." Since
this is an ABI change to a public struct, it should be noted in
the release notes. Verify that the addition does not push other
hot fields out of the cache line or cross a cache line boundary.
The current cache line 1 only had xstat_off before this change,
so there is room, but this should be explicitly confirmed.

Warning: __rte_node_enqueue_deferred_flush uses node->idx after
the process function has potentially manipulated it.

The flush reads node->idx as "count" to determine how many
objects to flush. This relies on the invariant that the process
function did NOT modify node->idx. With the deferred API this
is true (the process function just calls
rte_node_enqueue_deferred which doesn't touch node->idx), but
this implicit contract is fragile and undocumented. If any
process function uses a mix of deferred and non-deferred enqueue
APIs, node->idx may not reflect the original object count. Add a
comment or assertion documenting this invariant.

Warning: the new API should be marked __rte_experimental.

rte_node_enqueue_deferred() is a new public inline function in an
installed header. Per DPDK policy, new APIs must be marked
__rte_experimental.

Patch 3/3 - node conversions

Warning: pkt_cls_node_process loses its cross-invocation l2l3_type
speculation state.

The old code saved the last packet type in ctx->l2l3_type and
used it as the speculative next index on the following invocation.
The new code removes the pkt_cls_node_ctx struct entirely and
relies on deferred_last_edge for speculation. This works because
deferred_last_edge maps to the next node edge, which is what the
old code was ultimately speculating on. However, the old code
speculated on the *packet type* (l2l3_type) which was then
mapped to the edge -- this two-level speculation could be more
precise if different packet types map to the same edge. In
practice, this is likely fine since the deferred API speculates
on the edge directly, which is what matters for performance.
This is noted for completeness.

Info: The conversions are mechanical and consistent across all
13 files. Each follows the same pattern: remove from/to_next/
held/last_spec/next_index variables, remove stream_get/put/
memcpy/enqueue_x1 logic, replace with
rte_node_enqueue_deferred(graph, node, next, i). The manual
`i` index tracking is correct in all cases -- the x4 loops
increment i by 4, the x1 tail loops by 1, matching the pkts
pointer advance and the n_left_from decrement.

Reviewed-by: Stephen Hemminger <stephen@networkplumber.org>

^ permalink raw reply	[flat|nested] 8+ messages in thread

end of thread, other threads:[~2026-03-31  3:10 UTC | newest]

Thread overview: 8+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2026-02-05  9:26 [RFC PATCH dpdk 0/3] graph: deferred enqueue API for simplified node processing Robin Jarry
2026-02-05  9:26 ` [RFC PATCH dpdk 1/3] graph: optimize rte_node_enqueue_next to batch by edge Robin Jarry
2026-03-10  5:46   ` [EXTERNAL] " Pavan Nikhilesh Bhagavatula
2026-02-05  9:26 ` [RFC PATCH dpdk 2/3] graph: add deferred enqueue API for batch processing Robin Jarry
2026-03-10  5:49   ` [EXTERNAL] " Pavan Nikhilesh Bhagavatula
2026-02-05  9:26 ` [RFC PATCH dpdk 3/3] node: use deferred enqueue API in process functions Robin Jarry
2026-03-10  5:31 ` [EXTERNAL] [RFC PATCH dpdk 0/3] graph: deferred enqueue API for simplified node processing Pavan Nikhilesh Bhagavatula
2026-03-31  3:10 ` Stephen Hemminger
