From: Stephen Hemminger <stephen@networkplumber.org>
To: dev@dpdk.org
Cc: Stephen Hemminger <stephen@networkplumber.org>,
Thomas Monjalon <thomas@monjalon.net>,
Reshma Pattan <reshma.pattan@intel.com>,
Anatoly Burakov <anatoly.burakov@intel.com>
Subject: [RFC 2/4] capture: infrastructure wireshark packet capture
Date: Tue, 9 Jun 2026 14:02:03 -0700 [thread overview]
Message-ID: <20260609210540.768074-3-stephen@networkplumber.org> (raw)
In-Reply-To: <20260609210540.768074-1-stephen@networkplumber.org>
This provides a telemetry extension to provide packet capture.
It is intended to be used with a front end script to provide
external packet capture for wireshark.
Signed-off-by: Stephen Hemminger <stephen@networkplumber.org>
---
MAINTAINERS | 1 +
doc/guides/rel_notes/release_26_07.rst | 7 +
lib/capture/capture.c | 821 +++++++++++++++++++++++++
lib/capture/capture_impl.h | 56 ++
lib/capture/filter.c | 108 ++++
lib/capture/meson.build | 19 +
lib/meson.build | 1 +
7 files changed, 1013 insertions(+)
create mode 100644 lib/capture/capture.c
create mode 100644 lib/capture/capture_impl.h
create mode 100644 lib/capture/filter.c
create mode 100644 lib/capture/meson.build
diff --git a/MAINTAINERS b/MAINTAINERS
index 4a68a19b32..dd359d956e 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -1723,6 +1723,7 @@ F: doc/guides/sample_app_ug/qos_scheduler.rst
Packet capture
M: Reshma Pattan <reshma.pattan@intel.com>
M: Stephen Hemminger <stephen@networkplumber.org>
+F: lib/capture/
F: lib/pdump/
F: doc/guides/prog_guide/pdump_lib.rst
F: app/test/test_pdump.*
diff --git a/doc/guides/rel_notes/release_26_07.rst b/doc/guides/rel_notes/release_26_07.rst
index d7a2df88c1..309a6078bd 100644
--- a/doc/guides/rel_notes/release_26_07.rst
+++ b/doc/guides/rel_notes/release_26_07.rst
@@ -146,6 +146,13 @@ New Features
Add experimental telemetry callback ``rte_telemetry_register_cmd_fd_arg()``
to allow command to receive file descriptors passed by client.
+* **Added packet capture library.**
+
+ Added a new ``capture`` library which provides a mechanism via telemetry
+ interface for capturing packets to a file descriptor. This mechanism
+ is used by the new ``dpdk-wireshark-extcap.py`` script which provides
+ seamless integration with Wireshark.
+
Removed Items
-------------
diff --git a/lib/capture/capture.c b/lib/capture/capture.c
new file mode 100644
index 0000000000..a837c377fc
--- /dev/null
+++ b/lib/capture/capture.c
@@ -0,0 +1,821 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2026 Stephen Hemminger
+ */
+
+#include <ctype.h>
+#include <errno.h>
+#include <pthread.h>
+#include <poll.h>
+#include <signal.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <sys/queue.h>
+#include <sys/utsname.h>
+#include <net/if.h>
+#include <unistd.h>
+
+#include <rte_branch_prediction.h>
+#include <rte_common.h>
+#include <rte_debug.h>
+#include <rte_ethdev.h>
+#include <rte_log.h>
+#include <rte_malloc.h>
+#include <rte_memory.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_pcapng.h>
+#include <rte_pause.h>
+#include <rte_ring.h>
+#include <rte_spinlock.h>
+#include <rte_stdatomic.h>
+#include <rte_string_fns.h>
+#include <rte_telemetry.h>
+#include <rte_version.h>
+
+#include "capture_impl.h"
+
+#ifndef DLT_EN10MB
+#define DLT_EN10MB 1
+#endif
+
+RTE_LOG_REGISTER_DEFAULT(rte_capture_logtype, NOTICE);
+
+/*
+ * List of active captures.
+ *
+ * This is a control-plane only structure: it is created, walked and torn down
+ * from the telemetry handler thread and from the per-capture drain threads,
+ * never from the dataplane. A plain spinlock is therefore enough; the EAL
+ * shared tailq (rte_tailq) is not used because captures are not visible to
+ * secondary processes in this design.
+ */
+TAILQ_HEAD(capture_list, capture);
+static struct capture_list capture_list = TAILQ_HEAD_INITIALIZER(capture_list);
+static rte_spinlock_t capture_lock = RTE_SPINLOCK_INITIALIZER;
+
+#define DEFAULT_SNAPLEN 262144u /* from tcpdump et.al. */
+#define CAPTURE_BURST_SIZE 32u
+#define MBUF_POOL_CACHE_SIZE 32
+#define CAPTURE_RING_SIZE 256
+#define CAPTURE_POOL_SIZE 1024
+#define SLEEP_THRESHOLD 100
+#define SLEEP_US 100
+
+/* Parameter values: only used on stack inside parsing */
+struct capture_config {
+ uint16_t port_id;
+ uint32_t snaplen;
+ const char *filter_str;
+};
+
+/*
+ * Data used by callback
+ * This per-queue to avoid cache thrashing
+ */
+struct __rte_cache_aligned capture_rxtx_cb {
+ RTE_ATOMIC(uint32_t) use_count;
+ const struct rte_eth_rxtx_callback *cb;
+
+ struct capture_stats {
+ RTE_ATOMIC(uint64_t) accepted; /**< Number of packets accepted by filter. */
+ RTE_ATOMIC(uint64_t) filtered; /**< Number of packets rejected by filter. */
+ RTE_ATOMIC(uint64_t) nombuf; /**< Number of mbuf allocation failures. */
+ RTE_ATOMIC(uint64_t) ringfull; /**< Number of missed packets due to ring full. */
+ } stats;
+};
+
+/*
+ * Per-capture instance state.
+ */
+struct capture {
+ TAILQ_ENTRY(capture) next; /* links into capture_list */
+ unsigned int idx;
+ RTE_ATOMIC(bool) running;
+ int fd; /* file descriptor of FIFO */
+ struct rte_capture_filter *filter;
+ struct rte_ring *ring; /* ring from dataplane to capture thread */
+ struct rte_mempool *mp; /* mempool for capture mbufs */
+
+ uint32_t snaplen; /* amount of data to copy */
+ uint16_t port_id;
+ uint16_t tx_queues;
+ uint16_t rx_queues;
+
+ /* per-queue data sized to max(tx_queue, rx_queues) */
+ struct capture_cbs {
+ struct capture_rxtx_cb tx_cb;
+ struct capture_rxtx_cb rx_cb;
+ } cbs[];
+};
+
+/* Wait for callbacks to be idle before free */
+static void
+capture_cb_wait(struct capture_rxtx_cb *cbs)
+{
+ /* wait until use_count is even (not in use) */
+ RTE_WAIT_UNTIL_MASKED(&cbs->use_count, 1, ==, 0, rte_memory_order_acquire);
+}
+
+/* Hold a reference to callback while active */
+static inline __rte_hot void
+capture_cb_hold(struct capture_rxtx_cb *cbs)
+{
+ rte_atomic_fetch_add_explicit(&cbs->use_count, 1, rte_memory_order_acquire);
+}
+
+/* Drop reference to callback when done */
+static inline __rte_hot void
+capture_cb_release(struct capture_rxtx_cb *cbs)
+{
+ rte_atomic_fetch_sub_explicit(&cbs->use_count, 1, rte_memory_order_release);
+}
+
+/* Cleanup call backs */
+static void __rte_cold
+capture_cb_cleanup(struct capture *cap)
+{
+
+ for (unsigned int q = 0; q < cap->tx_queues; q++) {
+ struct capture_rxtx_cb *tx_cb = &cap->cbs[q].tx_cb;
+ if (tx_cb->cb) {
+ rte_eth_remove_tx_callback(cap->port_id, q, tx_cb->cb);
+ capture_cb_wait(tx_cb);
+ tx_cb->cb = NULL;
+ }
+ }
+
+ for (unsigned int q = 0; q < cap->rx_queues; q++) {
+ struct capture_rxtx_cb *rx_cb = &cap->cbs[q].rx_cb;
+ if (rx_cb->cb) {
+ rte_eth_remove_rx_callback(cap->port_id, q, rx_cb->cb);
+ capture_cb_wait(rx_cb);
+ rx_cb->cb = NULL;
+ }
+ }
+}
+
+/* Create a clone of mbuf to be placed into ring. */
+static inline __rte_hot void
+capture_copy_burst(uint16_t port_id, uint16_t queue_id,
+ enum rte_pcapng_direction direction,
+ struct rte_mbuf **pkts, unsigned int nb_pkts,
+ const struct capture *cap,
+ struct capture_stats *stats)
+{
+ unsigned int i, ring_enq, d_pkts = 0;
+ struct rte_mbuf *dup_bufs[CAPTURE_BURST_SIZE]; /* duplicated packets */
+ struct rte_ring *ring = cap->ring;
+ struct rte_mempool *mp = cap->mp;
+ uint32_t snaplen = cap->snaplen;
+ struct rte_mbuf *p;
+
+ RTE_ASSERT(nb_pkts <= CAPTURE_BURST_SIZE);
+
+ for (i = 0; i < nb_pkts; i++) {
+ /*
+ * This uses same BPF return value convention as socket filter and pcap_offline_filter.
+ * if program returns zero then packet doesn't match the filter (will be ignored).
+ */
+ if (cap->filter) {
+ uint64_t rc = __rte_capture_filter(cap->filter, pkts[i]);
+ if (rc == 0) {
+ rte_atomic_fetch_add_explicit(&stats->filtered, 1,
+ rte_memory_order_relaxed);
+ continue;
+ }
+ }
+
+ p = rte_pcapng_copy(port_id, queue_id, pkts[i], mp,
+ snaplen, direction, NULL);
+
+ if (unlikely(p == NULL))
+ rte_atomic_fetch_add_explicit(&stats->nombuf, 1,
+ rte_memory_order_relaxed);
+ else
+ dup_bufs[d_pkts++] = p;
+ }
+
+ if (d_pkts == 0)
+ return;
+
+ rte_atomic_fetch_add_explicit(&stats->accepted, d_pkts, rte_memory_order_relaxed);
+
+ ring_enq = rte_ring_enqueue_burst(ring, (void *)&dup_bufs[0], d_pkts, NULL);
+ if (unlikely(ring_enq < d_pkts)) {
+ unsigned int drops = d_pkts - ring_enq;
+
+ rte_atomic_fetch_add_explicit(&stats->ringfull, drops, rte_memory_order_relaxed);
+ rte_pktmbuf_free_bulk(&dup_bufs[ring_enq], drops);
+ }
+}
+
+/* Create a clone of mbuf to be placed into ring. */
+static __rte_hot inline void
+capture_copy(uint16_t port_id, uint16_t queue_id,
+ enum rte_pcapng_direction direction,
+ struct rte_mbuf **pkts, uint16_t nb_pkts,
+ const struct capture *cap,
+ struct capture_stats *stats)
+{
+ unsigned int offs = 0;
+
+ do {
+ unsigned int n = RTE_MIN(nb_pkts - offs, CAPTURE_BURST_SIZE);
+
+ capture_copy_burst(port_id, queue_id, direction, &pkts[offs], n, cap, stats);
+ offs += n;
+ } while (offs < nb_pkts);
+}
+
+static __rte_hot uint16_t
+capture_rx(uint16_t port, uint16_t queue,
+ struct rte_mbuf **pkts, uint16_t nb_pkts,
+ uint16_t max_pkts __rte_unused, void *user_params)
+{
+ struct capture *cap = user_params;
+ struct capture_rxtx_cb *cbs = &cap->cbs[queue].rx_cb;
+
+ capture_cb_hold(cbs);
+ capture_copy(port, queue, RTE_PCAPNG_DIRECTION_IN, pkts, nb_pkts, cap, &cbs->stats);
+ capture_cb_release(cbs);
+
+ return nb_pkts;
+}
+
+static __rte_hot uint16_t
+capture_tx(uint16_t port, uint16_t queue,
+ struct rte_mbuf **pkts, uint16_t nb_pkts, void *user_params)
+{
+ struct capture *capture = user_params;
+ struct capture_rxtx_cb *cbs = &capture->cbs[queue].tx_cb;
+
+ capture_cb_hold(cbs);
+ capture_copy(port, queue, RTE_PCAPNG_DIRECTION_OUT, pkts, nb_pkts, capture, &cbs->stats);
+ capture_cb_release(cbs);
+
+ return nb_pkts;
+}
+
+/*
+ * Break the comma separated parameter string into tokens
+ * and fill in the capture config structure.
+ *
+ * Does not use rte_kvargs because that would mangle [] etc in filter expression.
+ */
+static __rte_cold int
+parse_params(char *str, struct capture_config *cfg)
+{
+ uint32_t snaplen = DEFAULT_SNAPLEN;
+
+ char *args[4];
+ int nargs = rte_strsplit(str, strlen(str), args, RTE_DIM(args), ',');
+ /* Need at least the port id */
+ if (nargs < 1) {
+ CAPTURE_LOG(ERR, "missing parameters '%s'", str);
+ return -1;
+ }
+
+ /* Parse port id (required) */
+ char *endp;
+ errno = 0;
+ unsigned long port_id = strtoul(args[0], &endp, 10);
+ if (errno != 0 || port_id >= RTE_MAX_ETHPORTS) {
+ CAPTURE_LOG(ERR, "invalid port_id=%s", args[0]);
+ return -1;
+ }
+ if (*endp != '\0') {
+ CAPTURE_LOG(ERR, "garbage after port_id value");
+ return -1;
+ }
+
+ /* parse remainder as name=value parameters */
+ for (int i = 1; i < nargs; i++) {
+ char *key = args[i];
+
+ /* split at the = */
+ char *eq = strchr(args[i], '=');
+
+ /* all current options require argument after = */
+ if (eq == NULL || eq[1] == '\0') {
+ CAPTURE_LOG(ERR, "missing value for '%s'", key);
+ return -1;
+ }
+ *eq = '\0';
+ char *value = eq + 1;
+
+ if (strcmp(key, "filter") == 0) {
+ cfg->filter_str = value;
+ } else if (strcmp(key, "snaplen") == 0) {
+ errno = 0;
+ unsigned long len = strtoul(value, &endp, 10);
+ if (errno != 0 || *endp != '\0' || len >= UINT32_MAX) {
+ CAPTURE_LOG(ERR, "invalid snaplen '%lu'", len);
+ return -1;
+ }
+ snaplen = len;
+ } else {
+ CAPTURE_LOG(ERR, "unknown parameter '%s'", key);
+ return -1;
+ }
+ }
+
+ cfg->port_id = port_id;
+
+ /*
+ * Default is 256K from tcpdump legacy
+ * using snaplen=0 means everything.
+ */
+ cfg->snaplen = snaplen > 0 ? snaplen : UINT32_MAX;
+
+ return 0;
+}
+
+/*
+ * Open pcapng handle.
+ * Look up OS name and add DPDK version.
+ */
+static __rte_cold rte_pcapng_t *
+capture_pcapng_open(int fd, uint16_t port_id, const char *filter)
+{
+ rte_pcapng_t *pcapng = NULL;
+ char port_name[RTE_ETH_NAME_MAX_LEN];
+ char ifname[IFNAMSIZ];
+ char *ifdescr = NULL;
+ struct utsname uts;
+ char *osname = NULL;
+
+ /* OS name is optional, just keep going if not found */
+ if (uname(&uts) == 0 &&
+ asprintf(&osname, "%s %s", uts.sysname, uts.release) < 0)
+ osname = NULL;
+
+ /* add DPDK internal name */
+ if (rte_eth_dev_get_name_by_port(port_id, port_name) != 0) {
+ CAPTURE_LOG(NOTICE, "Could not find port name for %u", port_id);
+ goto close_fd;
+ }
+
+ /* match name convention used by dpdk-wireshark-extcap.py */
+ snprintf(ifname, sizeof(ifname), "dpdk:%u", port_id);
+ if (asprintf(&ifdescr, "DPDK %s (port %u)", port_name, port_id) < 0)
+ ifdescr = NULL;
+
+ pcapng = rte_pcapng_fdopen(fd, osname, NULL, rte_version(), NULL);
+ if (pcapng == NULL) {
+ CAPTURE_LOG(ERR, "Add section block failed");
+ goto close_fd;
+ }
+
+ if (rte_pcapng_add_interface(pcapng, port_id, DLT_EN10MB, ifname, ifdescr, filter) < 0) {
+ CAPTURE_LOG(ERR, "Add interface for port %u:%s failed", port_id, ifname);
+ rte_pcapng_close(pcapng); /* closes fd */
+ pcapng = NULL;
+ }
+ goto cleanup;
+
+close_fd:
+ close(fd);
+cleanup:
+ free(osname);
+ free(ifdescr);
+ return pcapng;
+}
+
+static __rte_cold void
+capture_link(struct capture *cap)
+{
+ rte_spinlock_lock(&capture_lock);
+ TAILQ_INSERT_TAIL(&capture_list, cap, next);
+ rte_spinlock_unlock(&capture_lock);
+}
+
+static __rte_cold void
+capture_unlink(struct capture *cap)
+{
+ rte_spinlock_lock(&capture_lock);
+ TAILQ_REMOVE(&capture_list, cap, next);
+ rte_spinlock_unlock(&capture_lock);
+}
+
+static __rte_cold void
+capture_free(struct capture *cap)
+{
+ if (cap == NULL)
+ return;
+
+ __rte_capture_filter_free(cap->filter);
+ rte_ring_free(cap->ring);
+ rte_mempool_free(cap->mp);
+ rte_free(cap);
+}
+
+/* Generate unique id for naming and telemetry */
+static unsigned int
+get_unique_id(void)
+{
+ static RTE_ATOMIC(unsigned int) capture_instance;
+
+ return rte_atomic_fetch_add_explicit(&capture_instance, 1, rte_memory_order_relaxed);
+}
+
+/*
+ * Convert configuration into running state
+ */
+static struct capture *
+capture_alloc(const struct capture_config *cfg, int fd,
+ const struct rte_eth_dev_info *dev_info,
+ int socket_id)
+{
+ struct capture *cap;
+ char ring_name[RTE_RING_NAMESIZE];
+ uint16_t mbuf_size;
+ uint16_t num_queues = RTE_MAX(dev_info->nb_tx_queues, dev_info->nb_rx_queues);
+ size_t cb_size = sizeof(*cap) + num_queues * sizeof(cap->cbs[0]);
+
+ cap = rte_zmalloc_socket("capture", cb_size, RTE_CACHE_LINE_SIZE, socket_id);
+ if (cap == NULL) {
+ CAPTURE_LOG(ERR, "Could not allocate capture struct");
+ goto err_close_fd;
+ }
+
+ cap->idx = get_unique_id();
+
+ snprintf(ring_name, sizeof(ring_name), "capture-%u", cap->idx);
+ cap->ring = rte_ring_create(ring_name, CAPTURE_RING_SIZE, socket_id, 0);
+ if (cap->ring == NULL) {
+ CAPTURE_LOG(ERR, "Could not create ring");
+ goto err_close_fd;
+ }
+
+ /*
+ * If snapshot length is smaller than one mbuf segment then pool
+ * element size can be reduced; otherwise can just use the default
+ * and rte_pktmbuf_copy handle multiple segments.
+ */
+ if (cfg->snaplen < RTE_MBUF_DEFAULT_BUF_SIZE)
+ mbuf_size = rte_pcapng_mbuf_size(cfg->snaplen);
+ else
+ mbuf_size = RTE_MBUF_DEFAULT_BUF_SIZE;
+
+ cap->mp = rte_pktmbuf_pool_create_by_ops(ring_name, CAPTURE_POOL_SIZE,
+ MBUF_POOL_CACHE_SIZE, 0, mbuf_size,
+ socket_id, "ring_mp_mc");
+ if (cap->mp == NULL) {
+ CAPTURE_LOG(ERR, "Could not create mempool");
+ goto err_close_fd;
+ }
+
+ if (cfg->filter_str) {
+ cap->filter = __rte_capture_filter_create(cfg->filter_str);
+ if (cap->filter == NULL) {
+ CAPTURE_LOG(ERR, "Could not compile filter: %s", cfg->filter_str);
+ goto err_close_fd;
+ }
+ }
+
+ cap->fd = fd;
+ cap->port_id = cfg->port_id;
+ rte_atomic_store_explicit(&cap->running, true, rte_memory_order_relaxed);
+ cap->snaplen = cfg->snaplen;
+ cap->tx_queues = dev_info->nb_tx_queues;
+ cap->rx_queues = dev_info->nb_rx_queues;
+
+ for (unsigned int q = 0; q < cap->tx_queues; q++) {
+ struct capture_rxtx_cb *tx_cb = &cap->cbs[q].tx_cb;
+ tx_cb->cb = rte_eth_add_tx_callback(cfg->port_id, q, capture_tx, cap);
+ if (tx_cb->cb == NULL)
+ CAPTURE_LOG(ERR, "Register tx callback for %u:%u failed",
+ cfg->port_id, q);
+ }
+
+ for (unsigned int q = 0; q < cap->rx_queues; q++) {
+ struct capture_rxtx_cb *rx_cb = &cap->cbs[q].rx_cb;
+ rx_cb->cb = rte_eth_add_rx_callback(cfg->port_id, q, capture_rx, cap);
+ if (rx_cb->cb == NULL)
+ CAPTURE_LOG(ERR, "Register rx callback for %u:%u failed",
+ cfg->port_id, q);
+ }
+
+ return cap;
+
+err_close_fd:
+ close(fd);
+ capture_free(cap);
+ return NULL;
+}
+
+/*
+ * The capture thread that moves packets from ring into the FIFO
+ */
+static void *
+capture_thread(void *arg)
+{
+ struct capture *cap = arg;
+ unsigned int empty_count = 0;
+
+ CAPTURE_LOG(INFO, "capture thread starting");
+
+ /* This thread wants to detect when FIFO gets closed */
+ sigset_t set;
+ sigemptyset(&set);
+ sigaddset(&set, SIGPIPE);
+ pthread_sigmask(SIG_BLOCK, &set, NULL);
+
+ rte_pcapng_t *pcapng = capture_pcapng_open(cap->fd, cap->port_id,
+ __rte_capture_filter_string(cap->filter));
+ if (pcapng == NULL)
+ goto error;
+
+ while (rte_atomic_load_explicit(&cap->running, rte_memory_order_relaxed)) {
+ unsigned int avail, n;
+ struct rte_mbuf *pkts[CAPTURE_BURST_SIZE];
+
+ n = rte_ring_sc_dequeue_burst(cap->ring, (void **) pkts, CAPTURE_BURST_SIZE, &avail);
+
+ /*
+ * If the ring is empty, apply simple heuristic to keep this
+ * thread from fully consuming the CPU.
+ */
+ if (n == 0) {
+ /* repeat a few times before waiting */
+ if (empty_count < SLEEP_THRESHOLD) {
+ ++empty_count;
+ } else {
+ struct pollfd pfd = { .fd = cap->fd };
+ struct timespec ts = { .tv_nsec = SLEEP_US * 1000 };
+
+ if (ppoll(&pfd, 1, &ts, NULL) > 0 &&
+ (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
+ CAPTURE_LOG(NOTICE, "fifo reader closed");
+ break; /* reader is gone */
+ }
+ }
+ continue;
+ }
+
+ /* If this drained the ring count it as first emptying */
+ empty_count = (avail == 0);
+
+ if (unlikely(rte_pcapng_write_packets(pcapng, pkts, n) < 0)) {
+ CAPTURE_LOG(NOTICE, "write to fifo failed: %s", strerror(errno));
+ break;
+ }
+ }
+
+ rte_atomic_store_explicit(&cap->running, false, rte_memory_order_relaxed);
+
+ /* Capture exiting */
+ CAPTURE_LOG(INFO, "capture thread stopping");
+ rte_pcapng_close(pcapng);
+
+error:
+
+ capture_cb_cleanup(cap);
+ capture_unlink(cap);
+ capture_free(cap);
+
+ return NULL;
+}
+
+/*
+ * Callback handler for telemetry library to start capture.
+ *
+ * Need to handle: <iface>,snaplen=<n>,filter=<str>
+ */
+static int
+capture_start_req(const char *cmd, const char *params, void *arg __rte_unused,
+ const int *fds, unsigned int n_fds, struct rte_tel_data *d)
+{
+ struct capture *cap = NULL;
+ struct capture_config cfg = { };
+ struct rte_eth_dev_info dev_info;
+
+ CAPTURE_LOG(DEBUG, "telemetry: %s %s", cmd, params);
+
+ if (rte_eal_process_type() != RTE_PROC_PRIMARY) {
+ CAPTURE_LOG(ERR, "capture can only be started from primary");
+ goto error;
+ }
+
+ if (params == NULL || !isdigit((unsigned char)*params))
+ goto error;
+
+ /* Note: params is const so need non-const copy for parsing */
+ if (parse_params(strdupa(params), &cfg) < 0)
+ goto error;
+
+ /* Need one fd for output */
+ if (n_fds != 1) {
+ if (n_fds == 0)
+ CAPTURE_LOG(ERR, "missing output fd");
+ else
+ CAPTURE_LOG(ERR, "too many fds");
+ goto error;
+ }
+
+ /* Lookup number of queues etc, also validates port_id */
+ if (rte_eth_dev_info_get(cfg.port_id, &dev_info) < 0) {
+ CAPTURE_LOG(ERR, "can not get info for port %u", cfg.port_id);
+ goto error;
+ }
+
+ int socket_id = rte_eth_dev_socket_id(cfg.port_id);
+ if (socket_id < 0) {
+ CAPTURE_LOG(NOTICE, "could not determine socket for port %u", cfg.port_id);
+ socket_id = SOCKET_ID_ANY;
+ }
+
+ cap = capture_alloc(&cfg, fds[0], &dev_info, socket_id);
+ if (cap == NULL)
+ return -1; /* fd already closed by capture_alloc */
+
+ /*
+ * Publish into the active list before starting the drain thread so the
+ * thread is guaranteed to find itself there when it removes itself on
+ * exit (it may exit immediately, e.g. if the FIFO reader is already
+ * gone). On thread-create failure we undo the insertion here.
+ */
+ unsigned int idx = cap->idx;
+ capture_link(cap);
+
+ /*
+ * Make a new thread to do the capture work
+ * Thread will inherit affinity from the telemetry handler that calls us
+ */
+ pthread_t thread_id;
+ if (pthread_create(&thread_id, NULL, capture_thread, cap) != 0) {
+ CAPTURE_LOG(ERR, "Capture thread start failed: %s", strerror(errno));
+
+ close(cap->fd);
+ capture_unlink(cap);
+ capture_cb_cleanup(cap);
+ capture_free(cap);
+ return -1;
+ }
+
+ /* Nothing will be waiting for this thread. */
+ pthread_detach(thread_id);
+
+ /* Return id back for later use. */
+ rte_tel_data_start_dict(d);
+ rte_tel_data_add_dict_uint(d, "id", idx);
+ rte_tel_data_add_dict_string(d, "status", "running");
+ return 0;
+
+error:
+ for (unsigned int i = 0; i < n_fds; i++)
+ close(fds[i]);
+ return -1;
+}
+
+
+
+/* Telemetry: stop active capture. */
+static int
+capture_stop_req(const char *cmd, const char *params, struct rte_tel_data *d)
+{
+
+ CAPTURE_LOG(DEBUG, "telemetry %s %s", cmd, params);
+
+ if (params == NULL || *params == '\0')
+ return -EINVAL;
+
+ errno = 0;
+ char *endp;
+ unsigned long idx = strtoul(params, &endp, 10);
+ if (errno != 0 || *endp != '\0')
+ return -EINVAL;
+
+ rte_spinlock_lock(&capture_lock);
+ struct capture *cap;
+ TAILQ_FOREACH(cap, &capture_list, next) {
+ if (cap->idx == idx)
+ break;
+ }
+ if (cap == NULL) {
+ CAPTURE_LOG(ERR, "Capture index %lu not found", idx);
+ rte_spinlock_unlock(&capture_lock);
+ return -ENOENT;
+ }
+ rte_atomic_store_explicit(&cap->running, false, rte_memory_order_relaxed);
+ rte_spinlock_unlock(&capture_lock);
+ rte_tel_data_start_dict(d);
+ rte_tel_data_add_dict_string(d, "status", "stopped");
+ return 0;
+}
+
+/* Telemetry: list the ids of all active captures. */
+static int
+capture_list_req(const char *cmd __rte_unused, const char *params __rte_unused,
+ struct rte_tel_data *d)
+{
+ struct capture *cap;
+
+ CAPTURE_LOG(DEBUG, "telemetry %s %s", cmd, params);
+ rte_tel_data_start_array(d, RTE_TEL_UINT_VAL);
+
+ rte_spinlock_lock(&capture_lock);
+ TAILQ_FOREACH(cap, &capture_list, next)
+ rte_tel_data_add_array_uint(d, cap->idx);
+ rte_spinlock_unlock(&capture_lock);
+
+ return 0;
+}
+
+/* Aggregate per-queue counters of a capture instance. */
+struct capture_total {
+ uint64_t accepted;
+ uint64_t filtered;
+ uint64_t nombuf;
+ uint64_t ringfull;
+};
+
+static void
+capture_sum_one(struct capture_total *t, const struct capture_stats *s)
+{
+ t->accepted += rte_atomic_load_explicit(&s->accepted, rte_memory_order_relaxed);
+ t->filtered += rte_atomic_load_explicit(&s->filtered, rte_memory_order_relaxed);
+ t->nombuf += rte_atomic_load_explicit(&s->nombuf, rte_memory_order_relaxed);
+ t->ringfull += rte_atomic_load_explicit(&s->ringfull, rte_memory_order_relaxed);
+}
+
+/* Sum the rx and tx counters across all queues. Caller holds capture_lock. */
+static void
+capture_sum_stats(const struct capture *cap, struct capture_total *t)
+{
+ *t = (struct capture_total){ };
+
+ for (unsigned int q = 0; q < cap->rx_queues; q++)
+ capture_sum_one(t, &cap->cbs[q].rx_cb.stats);
+ for (unsigned int q = 0; q < cap->tx_queues; q++)
+ capture_sum_one(t, &cap->cbs[q].tx_cb.stats);
+}
+
+/* Telemetry: report configuration and counters for one capture. */
+static int
+capture_stats_req(const char *cmd, const char *params,
+ struct rte_tel_data *d)
+{
+ struct capture *cap;
+ struct capture_total t;
+ char *endp;
+
+ CAPTURE_LOG(DEBUG, "telemetry %s %s", cmd, params);
+ if (params == NULL || *params == '\0')
+ return -EINVAL;
+
+ errno = 0;
+ unsigned long idx = strtoul(params, &endp, 10);
+ if (errno != 0 || *endp != '\0')
+ return -EINVAL;
+
+ /* Find the instance and snapshot what we need while holding the lock. */
+ rte_spinlock_lock(&capture_lock);
+ TAILQ_FOREACH(cap, &capture_list, next) {
+ if (cap->idx == idx)
+ break;
+ }
+ if (cap == NULL) {
+ CAPTURE_LOG(ERR, "Capture index %lu not found", idx);
+ rte_spinlock_unlock(&capture_lock);
+ return -ENOENT;
+ }
+
+ rte_tel_data_start_dict(d);
+ rte_tel_data_add_dict_uint(d, "port_id", cap->port_id);
+ if (cap->filter)
+ rte_tel_data_add_dict_string(d, "filter",
+ __rte_capture_filter_string(cap->filter));
+ rte_tel_data_add_dict_int(d, "running",
+ rte_atomic_load_explicit(&cap->running,
+ rte_memory_order_relaxed));
+ rte_tel_data_add_dict_uint(d, "snaplen", cap->snaplen);
+ rte_tel_data_add_dict_uint(d, "rx_queues", cap->rx_queues);
+ rte_tel_data_add_dict_uint(d, "tx_queues", cap->tx_queues);
+ capture_sum_stats(cap, &t);
+ rte_spinlock_unlock(&capture_lock);
+
+ rte_tel_data_add_dict_uint(d, "accepted", t.accepted);
+ rte_tel_data_add_dict_uint(d, "filtered", t.filtered);
+ rte_tel_data_add_dict_uint(d, "nombuf", t.nombuf);
+ rte_tel_data_add_dict_uint(d, "ringfull", t.ringfull);
+
+ return 0;
+}
+
+RTE_INIT(capture_telemetry)
+{
+ rte_telemetry_register_cmd("/ethdev/capture/list", capture_list_req,
+ "List ids of active captures. Takes no parameters.");
+ rte_telemetry_register_cmd("/ethdev/capture/stats", capture_stats_req,
+ "Report configuration and counters for a capture. Parameters: id");
+ rte_telemetry_register_cmd_fd_arg("/ethdev/capture/start", capture_start_req, NULL,
+ "Start capture."
+ "Parameters: port_id,snaplen=N(optional),filter=string(optional)");
+ rte_telemetry_register_cmd("/ethdev/capture/stop", capture_stop_req,
+ "Stop an active capture. Parameters: id");
+}
diff --git a/lib/capture/capture_impl.h b/lib/capture/capture_impl.h
new file mode 100644
index 0000000000..adee734b6c
--- /dev/null
+++ b/lib/capture/capture_impl.h
@@ -0,0 +1,56 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2026 Stephen Hemminger
+ */
+#ifndef CAPTURE_IMPL_H
+#define CAPTURE_IMPL_H
+
+#define RTE_LOGTYPE_CAPTURE rte_capture_logtype
+extern int rte_capture_logtype;
+#define CAPTURE_LOG(level, ...) \
+ RTE_LOG_LINE_PREFIX(level, CAPTURE, "%s(): ", __func__, __VA_ARGS__)
+
+struct rte_capture_filter;
+
+#ifdef RTE_HAS_LIBPCAP
+struct rte_capture_filter *__rte_capture_filter_create(const char *str);
+const char *__rte_capture_filter_string(struct rte_capture_filter *filter);
+void __rte_capture_filter_free(struct rte_capture_filter *filter);
+uint64_t __rte_capture_filter(const struct rte_capture_filter *filter, struct rte_mbuf *mb);
+
+#else /* !RTE_HAS_LIBPCAP */
+
+/* Stub version if pcap is not available */
+static inline struct rte_capture_filter *
+__rte_capture_filter_create(const char *str)
+{
+ RTE_SET_USED(str);
+ return NULL; /* not supported */
+}
+
+static inline const char *
+__rte_capture_filter_string(struct rte_capture_filter *filter)
+{
+ RTE_SET_USED(filter);
+ return NULL;
+}
+
+static inline void
+__rte_capture_filter_free(struct rte_capture_filter *filter)
+{
+ RTE_SET_USED(filter);
+}
+
+/*
+ * This will be zero if the packet doesn't match the filter and non-zero if
+ * the packet matches the filter.
+ */
+static inline uint64_t
+__rte_capture_filter(const struct rte_capture_filter *filter, struct rte_mbuf *mb)
+{
+ RTE_SET_USED(filter);
+ RTE_SET_USED(mb);
+ return 1;
+}
+
+#endif /* !RTE_HAS_LIBPCAP */
+#endif /* CAPTURE_IMPL_H */
diff --git a/lib/capture/filter.c b/lib/capture/filter.c
new file mode 100644
index 0000000000..ecb5e8a765
--- /dev/null
+++ b/lib/capture/filter.c
@@ -0,0 +1,108 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2026 Stephen Hemminger
+ */
+
+#include <stdint.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <pcap/pcap.h>
+
+#include <rte_bpf.h>
+#include <rte_errno.h>
+#include <rte_log.h>
+#include <rte_malloc.h>
+#include <rte_mbuf.h>
+
+#include "capture_impl.h"
+
+struct rte_capture_filter {
+ struct rte_bpf *bpf;
+ struct rte_bpf_jit jit;
+ char expr[]; /* original filter text */
+};
+
+/*
+ * Convert text string into an eBPF program
+ */
+struct rte_capture_filter *
+__rte_capture_filter_create(const char *filter)
+{
+ struct rte_capture_filter *flt = NULL;
+ struct rte_bpf_prm *prm = NULL;
+
+ /* libpcap needs a handle */
+ pcap_t *pcap = pcap_open_dead(DLT_EN10MB, UINT16_MAX);
+ if (!pcap) {
+ CAPTURE_LOG(ERR, "pcap: can not open handle");
+ return NULL;
+ }
+
+ flt = rte_zmalloc("capture_filter", sizeof(*flt) + strlen(filter) + 1, 0);
+ if (flt == NULL) {
+ CAPTURE_LOG(ERR, "capture filter alloc failed");
+ goto error;
+ }
+
+ /* convert string to cBPF program */
+ struct bpf_program bf;
+ if (pcap_compile(pcap, &bf, filter, 1, PCAP_NETMASK_UNKNOWN) != 0) {
+ CAPTURE_LOG(ERR, "pcap: can not compile filter: %s",
+ pcap_geterr(pcap));
+ goto error;
+ }
+ strcpy(flt->expr, filter);
+
+ /* convert cBPF to eBPF */
+ prm = rte_bpf_convert(&bf);
+ pcap_freecode(&bf); /* drop the cBPF program */
+
+ if (prm == NULL) {
+ CAPTURE_LOG(ERR, "BPF convert interface %s(%d)",
+ rte_strerror(rte_errno), rte_errno);
+ goto error;
+ }
+
+ flt->bpf = rte_bpf_load(prm);
+ if (flt->bpf == NULL) {
+ CAPTURE_LOG(ERR, "BPF load failed: %s(%d)",
+ rte_strerror(rte_errno), rte_errno);
+ goto error;
+ }
+
+ rte_bpf_get_jit(flt->bpf, &flt->jit);
+ if (flt->jit.func == NULL)
+ CAPTURE_LOG(NOTICE, "No JIT available for filter");
+
+ pcap_close(pcap);
+ rte_free(prm);
+ return flt;
+
+error:
+ pcap_close(pcap);
+ rte_free(prm);
+ rte_free(flt);
+ return NULL;
+}
+
+const char *__rte_capture_filter_string(struct rte_capture_filter *filter)
+{
+ return filter ? filter->expr : NULL;
+}
+
+void __rte_capture_filter_free(struct rte_capture_filter *filter)
+{
+ if (filter == NULL)
+ return;
+
+ rte_bpf_destroy(filter->bpf);
+ rte_free(filter);
+}
+
+uint64_t __rte_capture_filter(const struct rte_capture_filter *filter, struct rte_mbuf *mb)
+{
+ if (filter->jit.func)
+ return filter->jit.func(mb);
+ else
+ return rte_bpf_exec(filter->bpf, mb);
+}
diff --git a/lib/capture/meson.build b/lib/capture/meson.build
new file mode 100644
index 0000000000..4dbe0d1a78
--- /dev/null
+++ b/lib/capture/meson.build
@@ -0,0 +1,19 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2026 Stephen Hemminger
+
+if is_windows
+ build = false
+ reason = 'not supported on Windows'
+ subdir_done()
+endif
+
+sources = files('capture.c')
+
+deps += ['ethdev', 'pcapng', 'bpf']
+
+if dpdk_conf.has('RTE_HAS_LIBPCAP')
+ sources += files('filter.c')
+ ext_deps += pcap_dep
+else
+ warning('libpcap is missing, capture filtering will be disabled')
+endif
diff --git a/lib/meson.build b/lib/meson.build
index af5c160cb8..6d9992f61f 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -49,6 +49,7 @@ libraries = [
'lpm',
'member',
'pcapng',
+ 'capture', # depends on pcapng and bpf
'power',
'rawdev',
'regexdev',
--
2.53.0
next prev parent reply other threads:[~2026-06-09 21:05 UTC|newest]
Thread overview: 5+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-06-09 21:02 [RFC 0/4] alternative capture mechanism Stephen Hemminger
2026-06-09 21:02 ` [RFC 1/4] telemetry: allow commands to receive file descriptors Stephen Hemminger
2026-06-09 21:02 ` Stephen Hemminger [this message]
2026-06-09 21:02 ` [RFC 3/4] test: add test for capture hooks Stephen Hemminger
2026-06-09 21:02 ` [RFC 4/4] usertools/dpdk-wireshark-extcap.py: script for external capture Stephen Hemminger
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260609210540.768074-3-stephen@networkplumber.org \
--to=stephen@networkplumber.org \
--cc=anatoly.burakov@intel.com \
--cc=dev@dpdk.org \
--cc=reshma.pattan@intel.com \
--cc=thomas@monjalon.net \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox