DPDK-dev Archive on lore.kernel.org
* Re: Why IP_PIPELINE is faster than L2FWD
From: Royce Niu @ 2016-12-22 12:48 UTC (permalink / raw)
  To: Bruce Richardson; +Cc: Royce Niu, dev
In-Reply-To: <20161222111528.GA11104@bricha3-MOBL3.ger.corp.intel.com>

But L3FWD in IP_PIPELINE is also faster than stock L2FWD, even though it
also modifies the MAC address. How can this be explained?

What I really want to know is why IP_PIPELINE is so much faster, so that I
can learn from it and write our own program.

The documentation is not detailed enough, though. If possible, could you
tell me what the key to the speedup is? Thanks!

On Thu, Dec 22, 2016 at 7:15 PM, Bruce Richardson <
bruce.richardson@intel.com> wrote:

> On Thu, Dec 22, 2016 at 12:18:12AM +0800, Royce Niu wrote:
> > Hi all,
> >
> > I tested default L2FWD and IP_PIPELINE (pass-through). The throughput of
> > IP_PIPELINE is higher immensely.
> >
> > There are only two virtual NICs in KVM. The experiment is just moving
> > packet from vNIC0  to vNIC1. I think the function is so simple. Why L2FWD
> > is much slower?
> >
> > How can I improve L2FWD, to make L2FWD faster?
> >
> Is IP_PIPELINE in passthrough mode modifying the packets? L2FWD swaps
> the mac addresses on each packet as it processes them, which can slow it
> down. L2FWD is also more an example of how the APIs work than anything
> else. For fastest possible port-to-port forwarding, testpmd should give
> the highest performance.
>
> /Bruce
>



-- 
Regards,

Royce

^ permalink raw reply

* Re: [PATCH v2 00/25] Generic flow API (rte_flow)
From: Adrien Mazarguil @ 2016-12-22 12:48 UTC (permalink / raw)
  To: Simon Horman; +Cc: dev
In-Reply-To: <20161221161914.GA14515@penelope.horms.nl>

On Wed, Dec 21, 2016 at 05:19:16PM +0100, Simon Horman wrote:
> On Fri, Dec 16, 2016 at 05:24:57PM +0100, Adrien Mazarguil wrote:
> > As previously discussed in RFC v1 [1], RFC v2 [2], with changes
> > described in [3] (also pasted below), here is the first non-draft series
> > for this new API.
> > 
> > Its capabilities are so generic that its name had to be vague, it may be
> > called "Generic flow API", "Generic flow interface" (possibly shortened
> > as "GFI") to refer to the name of the new filter type, or "rte_flow" from
> > the prefix used for its public symbols. I personally favor the latter.
> > 
> > While it is currently meant to supersede existing filter types in order for
> > all PMDs to expose a common filtering/classification interface, it may
> > eventually evolve to cover the following ideas as well:
> > 
> > - Rx/Tx offloads configuration through automatic offloads for specific
> >   packets, e.g. performing checksum on TCP packets could be expressed with
> >   an egress rule with a TCP pattern and a kind of checksum action.
> > 
> > - RSS configuration (already defined actually). Could be global or per rule
> >   depending on hardware capabilities.
> > 
> > - Switching configuration for devices with many physical ports; rules doing
> >   both ingress and egress could even be used to completely bypass software
> >   if supported by hardware.

Hi Simon,

> Hi Adrien,
> 
> thanks for this valuable work.
> 
> I would like to ask some high level questions on the proposal.
> I apologise in advance if any of these questions are based on a
> misunderstanding on my part.
> 
> * I am wondering about provisions for actions to modify packet data or
>   metadata.  I do see support for marking packets. Is the implication of
>   this that the main focus is to provide a mechanism for classification
>   with the assumption that any actions - other than drop and variants of
>   output - would be performed elsewhere?

I'm not sure I understand what you mean by "elsewhere" here. Packet marking
as currently defined is a purely ingress action, i.e. HW matches some packet
and returns a user-defined tag in related meta-data that the PMD copies to
the appropriate mbuf structure field before returning it to the application.

There is provision for egress rules and I wrote down a few ideas describing
how they could be useful (as above), however they remain to be defined.

>   If so I would observe that this seems somewhat limiting in the case of
>   hardware that can perform a richer set of actions. And seems particularly
>   limiting on egress as there doesn't seem anywhere else that other actions
>   could be performed after classification is performed by this API.

A single flow rule may contain any number of distinct actions. For egress,
it means you could wrap matching packets in VLAN and VXLAN at once.

If you wanted to perform the same action twice on matching packets, you'd
have to provide two rules with defined priorities and use a non-terminating
action for the first one:

- Rule with priority 0: match UDP -> add VLAN 42, passthrough
- Rule with priority 1: match UDP -> add VLAN 64, terminating

This is how automatic QinQ would be defined for outgoing UDP packets.

> * I am curious to know what considerations have been given to supporting
>   tunnelling (encapsulation and decapsulation of e.g. VXLAN),
>   tagging (pushing and popping e.g. VLANs), and labels (pushing or popping
>   e.g. MPLS).
> 
>   Such features would seem useful for application of this work in a variety
>   of situations including overlay networks and VNFs.

This is also what I had in mind and we'd only have to define specific
ingress/egress actions for these. Currently rte_flow only implements a basic
set of existing features from the legacy filtering framework, but is meant
to be extended.

> * I am wondering if any thought has gone into supporting matching on the
>   n-th instance of a field that may appear more than once: e.g. VLAN tag.

Sure, please see the latest documentation [1] and testpmd examples [2].
Pattern items being stacked in the same order as protocol layers, matching
specific QinQ traffic and redirecting it to some queue could be expressed
with something like:

 testpmd> flow create 0 ingress pattern eth / vlan vid is 64 / vlan vid is 42 / end 
    actions queue 6 / end

Such a rule is translated as-is to rte_flow pattern items and action
structures.
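
As a rough illustration of how stacked pattern items can select the n-th
instance of a repeated field, here is a minimal, self-contained model in C.
It is only a sketch: the types and the pattern_matches() helper are
hypothetical and are not the rte_flow structures or API. It assumes a packet
already reduced to its list of VLAN IDs, outermost first, and treats the
pattern as a prefix match.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical item types; these are NOT the rte_flow definitions. */
enum item_type { ITEM_END, ITEM_ETH, ITEM_VLAN };

struct item {
	enum item_type type;
	uint16_t vid; /* only meaningful for ITEM_VLAN */
};

/* A parsed packet reduced to its stack of VLAN IDs, outermost first. */
struct pkt_layers {
	size_t n_vlan;
	uint16_t vid[4];
};

/*
 * Walk the pattern in protocol-layer order. The n-th VLAN item is compared
 * with the n-th VLAN tag of the packet. Returns 1 on match, 0 otherwise.
 * Extra tags beyond the last VLAN item are ignored (prefix match).
 */
static int
pattern_matches(const struct item *pattern, const struct pkt_layers *pkt)
{
	size_t depth = 0;

	assert(pattern != NULL && pkt != NULL);
	for (; pattern->type != ITEM_END; pattern++) {
		if (pattern->type != ITEM_VLAN)
			continue; /* "eth" accepts any Ethernet frame in this model */
		if (depth >= pkt->n_vlan || pkt->vid[depth] != pattern->vid)
			return 0;
		depth++;
	}
	return 1;
}
```

With a pattern of eth / vlan 64 / vlan 42 / end, a packet tagged 64 then 42
matches, while one tagged 42 then 64, or one carrying a single tag, does not.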

> With the above questions in mind I am curious to know what use-cases
> the proposal is targeted at.

Well, it should be easier to answer if you have a specific use-case in mind
you would like to support but that cannot be expressed with the API as
defined in [1], in which case please share it with the community.

[1] http://dpdk.org/ml/archives/dev/2016-December/052954.html
[2] http://dpdk.org/ml/archives/dev/2016-December/052975.html

-- 
Adrien Mazarguil
6WIND

^ permalink raw reply

* Re: [PATCH v2 1/5] lib: distributor performance enhancements
From: Jerin Jacob @ 2016-12-22 12:47 UTC (permalink / raw)
  To: David Hunt; +Cc: dev, bruce.richardson
In-Reply-To: <1482381428-148094-2-git-send-email-david.hunt@intel.com>

On Thu, Dec 22, 2016 at 04:37:04AM +0000, David Hunt wrote:
> Now sends bursts of up to 8 mbufs to each worker, and tracks
> the in-flight flow-ids (atomic scheduling)
> 
> New file with a new api, similar to the old API except with _burst
> at the end of the function names
> 
> Signed-off-by: David Hunt <david.hunt@intel.com>
> +
> +int
> +rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
> +		unsigned int worker_id, struct rte_mbuf **pkts,
> +		struct rte_mbuf **oldpkt, unsigned int return_count)
> +{
> +	unsigned int count;
> +	uint64_t retries = 0;
> +
> +	rte_distributor_request_pkt_burst(d, worker_id, oldpkt, return_count);
> +
> +	count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
> +	while (count == 0) {
> +		rte_pause();
> +		retries++;
> +		if (retries > 1000) {
> +			retries = 0;

This write to retries has no effect, as it comes just before the
return.

> +			return 0;
> +		}
> +		uint64_t t = __rdtsc()+100;

Use rte_ version of __rdtsc.

> +
> +		while (__rdtsc() < t)
> +			rte_pause();
> +
> +		count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
> +	}
> +	return count;
> +}
> +
> +int
> +rte_distributor_return_pkt_burst(struct rte_distributor_burst *d,
> +		unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
> +{
> +	struct rte_distributor_buffer_burst *buf = &d->bufs[worker_id];
> +	unsigned int i;
> +
> +	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
> +		/* Switch off the return bit first */
> +		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
> +
> +	for (i = num; i-- > 0; )
> +		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
> +			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
> +
> +	/* set the GET_BUF but even if we got no returns */
> +	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
> +
> +	return 0;
> +}
> +
> +#if RTE_MACHINE_CPUFLAG_SSE2
> +static inline void

Move SSE version of the code to separate file so that later other SIMD arch
specific version like NEON can be incorporated.

> +find_match_sse2(struct rte_distributor_burst *d,
> +			uint16_t *data_ptr,
> +			uint16_t *output_ptr)
> +{
> +	/* Setup */
> +	__m128i incoming_fids;
> +	__m128i inflight_fids;
> +	__m128i preflight_fids;
> +	__m128i wkr;
> +	__m128i mask1;
> +	__m128i mask2;
> +	__m128i output;
> +	struct rte_distributor_backlog *bl;
> +
> +	/*
> +	 * Function overview:
> +	 * 2. Loop through all worker ID's
> +	 *  2a. Load the current inflights for that worker into an xmm reg
> +	 *  2b. Load the current backlog for that worker into an xmm reg
> +	 *  2c. use cmpestrm to intersect flow_ids with backlog and inflights
> +	 *  2d. Add any matches to the output
> +	 * 3. Write the output xmm (matching worker ids).
> +	 */
> +
> +
> +	output = _mm_set1_epi16(0);
> +	incoming_fids = _mm_load_si128((__m128i *)data_ptr);
> +
> +	for (uint16_t i = 0; i < d->num_workers; i++) {
> +		bl = &d->backlog[i];
> +
> +		inflight_fids =
> +			_mm_load_si128((__m128i *)&(d->in_flight_tags[i]));
> +		preflight_fids =
> +			_mm_load_si128((__m128i *)(bl->tags));
> +
> +		/*
> +		 * Any incoming_fid that exists anywhere in inflight_fids will
> +		 * have 0xffff in same position of the mask as the incoming fid
> +		 * Example (shortened to bytes for brevity):
> +		 * incoming_fids   0x01 0x02 0x03 0x04 0x05 0x06 0x07 0x08
> +		 * inflight_fids   0x03 0x05 0x07 0x00 0x00 0x00 0x00 0x00
> +		 * mask            0x00 0x00 0xff 0x00 0xff 0x00 0xff 0x00
> +		 */
> +
> +		mask1 = _mm_cmpestrm(inflight_fids, 8, incoming_fids, 8,
> +			_SIDD_UWORD_OPS |
> +			_SIDD_CMP_EQUAL_ANY |
> +			_SIDD_UNIT_MASK);
> +		mask2 = _mm_cmpestrm(preflight_fids, 8, incoming_fids, 8,
> +			_SIDD_UWORD_OPS |
> +			_SIDD_CMP_EQUAL_ANY |
> +			_SIDD_UNIT_MASK);
> +
> +		mask1 = _mm_or_si128(mask1, mask2);
> +		/*
> +		 * Now mask contains 0xffff where there's a match.
> +		 * Next we need to store the worker_id in the relevant position
> +		 * in the output.
> +		 */
> +
> +		wkr = _mm_set1_epi16(i+1);
> +		mask1 = _mm_and_si128(mask1, wkr);
> +		output = _mm_or_si128(mask1, output);
> +	}
> +
> +/* process a set of packets to distribute them to workers */
> +int
> +rte_distributor_process_burst(struct rte_distributor_burst *d,
> +		struct rte_mbuf **mbufs, unsigned int num_mbufs)
> +{
> +	unsigned int next_idx = 0;
> +	static unsigned int wkr;
> +	struct rte_mbuf *next_mb = NULL;
> +	int64_t next_value = 0;
> +	uint16_t new_tag = 0;
> +	uint16_t flows[8] __rte_cache_aligned;

The constant 8 is also used further down in the function. Please replace it with a macro.

> +	//static int iter=0;

Please remove the test-code with // across the patch.

> +
> +	if (unlikely(num_mbufs == 0)) {
> +		/* Flush out all non-full cache-lines to workers. */
> +		for (unsigned int wid = 0 ; wid < d->num_workers; wid++) {
> +			if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF)) {
> +				release(d, wid);
> +				handle_returns(d, wid);
> +			}
> +		}
> +		return 0;
> +	}
> +
> +	while (next_idx < num_mbufs) {
> +		uint16_t matches[8];
> +		int pkts;
> +
> +		if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
> +			d->bufs[wkr].count = 0;
> +
> +		for (unsigned int i = 0; i < RTE_DIST_BURST_SIZE; i++) {
> +			if (mbufs[next_idx + i]) {
> +				/* flows have to be non-zero */
> +				flows[i] = mbufs[next_idx + i]->hash.usr | 1;
> +			} else
> +				flows[i] = 0;
> +		}
> +
> +		switch (d->dist_match_fn) {
> +#ifdef RTE_MACHINE_CPUFLAG_SSE2

Is this conditional compilation flag really required? i.e.
RTE_DIST_MATCH_SSE will not be enabled in the non-SSE case.

> +		case RTE_DIST_MATCH_SSE:
> +			find_match_sse2(d, &flows[0], &matches[0]);
> +			break;
> +#endif
> +		default:
> +			find_match_scalar(d, &flows[0], &matches[0]);
> +		}
> +
> +		/*
> +		 * Matches array now contain the intended worker ID (+1) of
> +		 * the incoming packets. Any zeroes need to be assigned
> +		 * workers.
> +		 */
> +
> +		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
> +			pkts = num_mbufs - next_idx;
> +		else
> +			pkts = RTE_DIST_BURST_SIZE;
> +
> +		for (int j = 0; j < pkts; j++) {
> +
> +			next_mb = mbufs[next_idx++];
> +			next_value = (((int64_t)(uintptr_t)next_mb) <<
> +					RTE_DISTRIB_FLAG_BITS);
> +			/*
> +			 * User is advocated to set tag vaue for each
> +			 * mbuf before calling rte_distributor_process.
> +			 * User defined tags are used to identify flows,
> +			 * or sessions.
> +			 */
> +			/* flows MUST be non-zero */
> +			new_tag = (uint16_t)(next_mb->hash.usr) | 1;
> +
> +			/*
> +			 * Using the next line will cause the find_match
> +			 * function to be optimised out, making this function
> +			 * do parallel (non-atomic) distribution
> +			 */
> +			//matches[j] = 0;

test code with //

^ permalink raw reply

* Re: [PATCH v2] ethdev: cleanup device ops struct whitespace
From: Thomas Monjalon @ 2016-12-22 12:46 UTC (permalink / raw)
  To: Ferruh Yigit; +Cc: dev
In-Reply-To: <20161222115330.7164-1-ferruh.yigit@intel.com>

2016-12-22 11:53, Ferruh Yigit:
> To make it easy to comment on the latest struct, it is copy-pasted here:
> [With some extra notes]
> 
> struct eth_dev_ops {
> 	eth_dev_configure_t        dev_configure; /**< Configure device. */
> 	eth_dev_start_t            dev_start;     /**< Start device. */
> 	eth_dev_stop_t             dev_stop;      /**< Stop device. */
> 	eth_dev_set_link_up_t      dev_set_link_up;   /**< Device link up. */
> 	eth_dev_set_link_down_t    dev_set_link_down; /**< Device link down. */
> 	eth_dev_close_t            dev_close;     /**< Close device. */
> 	eth_promiscuous_enable_t   promiscuous_enable; /**< Promiscuous ON. */
> 	eth_promiscuous_disable_t  promiscuous_disable;/**< Promiscuous OFF. */
> 	eth_allmulticast_enable_t  allmulticast_enable;/**< RX multicast ON. */
> 	eth_allmulticast_disable_t allmulticast_disable;/**< RX multicast OF. */
> 	eth_link_update_t          link_update;   /**< Get device link state. */
> 
> 	eth_stats_get_t            stats_get;     /**< Get generic device statistics. */
> 	eth_stats_reset_t          stats_reset;   /**< Reset generic device statistics. */
> 	eth_xstats_get_t           xstats_get;    /**< Get extended device statistics. */
> 	eth_xstats_reset_t         xstats_reset;  /**< Reset extended device statistics. */
> 	eth_xstats_get_names_t     xstats_get_names;
> 	/**< Get names of extended statistics. */
> 	eth_queue_stats_mapping_set_t queue_stats_mapping_set;
> 	/**< Configure per queue stat counter mapping. */
> 
> 	eth_dev_infos_get_t        dev_infos_get; /**< Get device info. */
> 	eth_dev_supported_ptypes_get_t dev_supported_ptypes_get;
> 	/**< Get packet types supported and identified by device. */
> 
> 	mtu_set_t                  mtu_set;       /**< Set MTU. */
> 
> 	vlan_filter_set_t          vlan_filter_set; /**< Filter VLAN Setup. */
> 	vlan_tpid_set_t            vlan_tpid_set; /**< Outer/Inner VLAN TPID Setup. */
> 	vlan_strip_queue_set_t     vlan_strip_queue_set; /**< VLAN Stripping on queue. */
> 	vlan_offload_set_t         vlan_offload_set; /**< Set VLAN Offload. */
> 	vlan_pvid_set_t            vlan_pvid_set; /**< Set port based TX VLAN insertion. */
> 
> 	eth_queue_start_t          rx_queue_start;/**< Start RX for a queue. */
> 	eth_queue_stop_t           rx_queue_stop; /**< Stop RX for a queue. */
> 	eth_queue_start_t          tx_queue_start;/**< Start TX for a queue. */
> 	eth_queue_stop_t           tx_queue_stop; /**< Stop TX for a queue. */
> 	eth_rx_queue_setup_t       rx_queue_setup;/**< Set up device RX queue. */
> 	eth_queue_release_t        rx_queue_release; /**< Release RX queue. */
> 	eth_rx_queue_count_t       rx_queue_count;/**< Get Rx queue count. */
> 	eth_rx_descriptor_done_t   rx_descriptor_done; /**< Check rxd DD bit. */
> 	eth_rx_enable_intr_t       rx_queue_intr_enable;  /**< Enable Rx queue interrupt. */
> 	eth_rx_disable_intr_t      rx_queue_intr_disable; /**< Disable Rx queue interrupt. */
> 	eth_tx_queue_setup_t       tx_queue_setup;/**< Set up device TX queue. */
> 	eth_queue_release_t        tx_queue_release; /**< Release TX queue. */
> 
> 	eth_dev_led_on_t           dev_led_on;    /**< Turn on LED. */	[Really need these comments?]
> 	eth_dev_led_off_t          dev_led_off;   /**< Turn off LED. */
> 
> 	flow_ctrl_get_t            flow_ctrl_get; /**< Get flow control. */
> 	flow_ctrl_set_t            flow_ctrl_set; /**< Setup flow control. */
> 	priority_flow_ctrl_set_t   priority_flow_ctrl_set; /**< Setup priority flow control. */
> 
> 	eth_mac_addr_remove_t      mac_addr_remove; /**< Remove MAC address. */
> 	eth_mac_addr_add_t         mac_addr_add;  /**< Add a MAC address. */
> 	eth_mac_addr_set_t         mac_addr_set;  /**< Set a MAC address. */
> 	eth_set_mc_addr_list_t     set_mc_addr_list; /**< set list of mcast addrs. */

Could we group the MAC functions with promiscuous and allmulticast?

> 	eth_uc_hash_table_set_t    uc_hash_table_set; /**< Set Unicast Table Array. */
> 	eth_uc_all_hash_table_set_t uc_all_hash_table_set; /**< Set Unicast hash bitmap. */
> 
> 	eth_mirror_rule_set_t	   mirror_rule_set; /**< Add a traffic mirror rule. */
> 	eth_mirror_rule_reset_t	   mirror_rule_reset; /**< reset a traffic mirror rule. */
> 
> 	[Following already removed from next-net]
> 	eth_set_vf_rx_mode_t       set_vf_rx_mode;/**< Set VF RX mode. */
> 	eth_set_vf_rx_t            set_vf_rx;     /**< enable/disable a VF receive. */
> 	eth_set_vf_tx_t            set_vf_tx;     /**< enable/disable a VF transmit. */
> 	eth_set_vf_vlan_filter_t   set_vf_vlan_filter; /**< Set VF VLAN filter. */
> 	eth_set_vf_rate_limit_t    set_vf_rate_limit; /**< Set VF rate limit. */
> 
> 	eth_udp_tunnel_port_add_t  udp_tunnel_port_add; /** Add UDP tunnel port. */
> 	eth_udp_tunnel_port_del_t  udp_tunnel_port_del; /** Del UDP tunnel port. */
> 
> 	eth_set_queue_rate_limit_t set_queue_rate_limit; /**< Set queue rate limit. */
> 
> 	rss_hash_update_t          rss_hash_update; /** Configure RSS hash protocols. */
> 	rss_hash_conf_get_t        rss_hash_conf_get; /** Get current RSS hash configuration. */
> 	reta_update_t              reta_update;   /** Update redirection table. */
> 	reta_query_t               reta_query;    /** Query redirection table. */
> 
> 	eth_get_reg_t              get_reg;           /**< Get registers. */
> 	eth_get_eeprom_length_t    get_eeprom_length; /**< Get eeprom length. */
> 	eth_get_eeprom_t           get_eeprom;        /**< Get eeprom data. */
> 	eth_set_eeprom_t           set_eeprom;        /**< Set eeprom. */
> 
> 	/* bypass control */
> 	bypass_init_t              bypass_init;
> 	bypass_state_set_t         bypass_state_set;
> 	bypass_state_show_t        bypass_state_show;
> 	bypass_event_set_t         bypass_event_set;
> 	bypass_event_show_t        bypass_event_show;
> 	bypass_wd_timeout_set_t    bypass_wd_timeout_set;
> 	bypass_wd_timeout_show_t   bypass_wd_timeout_show;
> 	bypass_ver_show_t          bypass_ver_show;
> 	bypass_wd_reset_t          bypass_wd_reset;
> 
> 	eth_filter_ctrl_t          filter_ctrl; /**< common filter control. */
> 
> 	eth_rxq_info_get_t         rxq_info_get; /**< retrieve RX queue information. */
> 	eth_txq_info_get_t         txq_info_get; /**< retrieve TX queue information. */

It can be grouped with dev_infos_get

> 	eth_get_dcb_info           get_dcb_info; /** Get DCB information. */
> 
> 	eth_timesync_enable_t      timesync_enable;
> 	/** Turn IEEE1588/802.1AS timestamping on. */
> 	eth_timesync_disable_t     timesync_disable;
> 	/** Turn IEEE1588/802.1AS timestamping off. */
> 	eth_timesync_read_rx_timestamp_t timesync_read_rx_timestamp;
> 	/** Read the IEEE1588/802.1AS RX timestamp. */
> 	eth_timesync_read_tx_timestamp_t timesync_read_tx_timestamp;
> 	/** Read the IEEE1588/802.1AS TX timestamp. */
> 	eth_timesync_adjust_time   timesync_adjust_time; /** Adjust the device clock. */
> 	eth_timesync_read_time     timesync_read_time; /** Get the device clock time. */
> 	eth_timesync_write_time    timesync_write_time; /** Set the device clock time. */
> 
> 	eth_l2_tunnel_eth_type_conf_t l2_tunnel_eth_type_conf;
> 	/** Config ether type of l2 tunnel. */
> 	eth_l2_tunnel_offload_set_t   l2_tunnel_offload_set;
> 	/** Enable/disable l2 tunnel offload functions. */

May it be grouped with other tunnel functions?

> };

^ permalink raw reply

* Re: [PATCH 23/28] net/ixgbe: use eal I/O device memory read/write API
From: Santosh Shukla @ 2016-12-22 12:36 UTC (permalink / raw)
  To: Jianbo Liu
  Cc: Jerin Jacob, dev, Ananyev, Konstantin, Thomas Monjalon,
	Bruce Richardson, Jan Viktorin, Helin Zhang
In-Reply-To: <20161216044017.GA29607@santosh-Latitude-E5530-non-vPro>

Hi Jianbo,

On Thu, Dec 15, 2016 at 08:40:19PM -0800, Santosh Shukla wrote:
> On Thu, Dec 15, 2016 at 04:37:12PM +0800, Jianbo Liu wrote:
> > On 14 December 2016 at 09:55, Jerin Jacob
> > <jerin.jacob@caviumnetworks.com> wrote:
> > > From: Santosh Shukla <santosh.shukla@caviumnetworks.com>
> > >
> > 
> > memory barrier operation is put inside IXGBE_PCI_REG_READ/WRITE in
> > your change, but I found rte_*mb is called before these macros in some
> > places.
> > Can you remove all these redundant calls? And please do the same
> > checking for other drivers.
> >
> 
> Ok.
> 
> Thinking of adding _relaxed_rd/wr style macro agnostic to arch for ixgbe case 
> in particular. Such that for those code incident:
> x86 case> first default barrier + relaxed call.
> arm case> first default barrier + relaxed call.
> 
> Does that make sense to you? If so then will take care in v2.
> 
> Santosh.

We spent time looking at driver code where a double barrier
may happen. Most of the cases are in the driver init path and
configuration/control path code, so keeping the double
barrier won't impact performance.

We plan to replace only the fast path code with _relaxed
style APIs. That way we won't impact each driver's
performance and we'll have a clean port.

Does that make sense? Thoughts?

> 
> > >  #define IXGBE_PCI_REG_ADDR(hw, reg) \
> > >         ((volatile uint32_t *)((char *)(hw)->hw_addr + (reg)))
> > > --
> > > 2.5.5
> > >

^ permalink raw reply

* Re: [PATCH v2 3/5] test: add distributor_perf autotest
From: Jerin Jacob @ 2016-12-22 12:19 UTC (permalink / raw)
  To: David Hunt; +Cc: dev, bruce.richardson
In-Reply-To: <1482381428-148094-4-git-send-email-david.hunt@intel.com>

On Thu, Dec 22, 2016 at 04:37:06AM +0000, David Hunt wrote:
> Signed-off-by: David Hunt <david.hunt@intel.com>
> ---
> + * it does nothing but return packets and count them.
> + */
> +static int
> +handle_work_burst(void *arg)
> +{
> +	//struct rte_mbuf *pkt = NULL;

Seems like there is a lot of test code with // in this file. Please remove it.

> +	struct rte_distributor_burst *d = arg;
> +	unsigned int count = 0;
> +	unsigned int num = 0;
> +	unsigned int id = __sync_fetch_and_add(&worker_idx, 1);

Use rte_atomic equivalent

^ permalink raw reply

* Re: [PATCH] net/sfc: advertise kmod dependencies in pmdinfo
From: Ferruh Yigit @ 2016-12-22 12:15 UTC (permalink / raw)
  To: Andrew Rybchenko, dev; +Cc: olivier.matz
In-Reply-To: <1482406360-6557-1-git-send-email-arybchenko@solarflare.com>

On 12/22/2016 11:32 AM, Andrew Rybchenko wrote:
> Fixes: 0880c40113ef ("drivers: advertise kmod dependencies in pmdinfo")
> 
> Signed-off-by: Andrew Rybchenko <arybchenko@solarflare.com>

Applied to dpdk-next-net/master, thanks.

^ permalink raw reply

* Re: [PATCH v3] drivers: advertise kmod dependencies in pmdinfo
From: Andrew Rybchenko @ 2016-12-22 12:08 UTC (permalink / raw)
  To: Ferruh Yigit, Neil Horman
  Cc: Thomas Monjalon, Olivier Matz, Adrien Mazarguil, dev, vido,
	fiona.trahe, stephen
In-Reply-To: <6abf05af-a834-d9b6-ea67-9e2d250539fe@intel.com>

On 12/22/2016 03:07 PM, Ferruh Yigit wrote:
> On 12/22/2016 11:35 AM, Andrew Rybchenko wrote:
>> On 12/22/2016 02:04 PM, Ferruh Yigit wrote:
>>> On 12/21/2016 11:40 AM, Andrew Rybchenko wrote:
>>>> On 12/21/2016 02:37 PM, Neil Horman wrote:
>>>>> On Wed, Dec 21, 2016 at 12:21:14PM +0300, Andrew Rybchenko wrote:
>>>>>> On 12/20/2016 08:26 PM, Thomas Monjalon wrote:
>>>>>>>>> Add a new macro RTE_PMD_REGISTER_KMOD_DEP() that allows a driver to
>>>>>>>>> declare the list of kernel modules required to run properly.
>>>>>>>>>
>>>>>>>>> Today, most PCI drivers require uio/vfio.
>>>>>>>>>
>>>>>>>>> Signed-off-by: Olivier Matz <olivier.matz@6wind.com>
>>>>>>>>> Acked-by: Fiona Trahe <fiona.trahe@intel.com>
>>>>>>>> Acked-by: Adrien Mazarguil <adrien.mazarguil@6wind.com>
>>>>>>> Applied in main tree, thanks
>>>>>> Is there any plan on how it will be done/solved for a new drivers in
>>>>>> dpdk-next-net?
>>>>>> Should I care about it for sfc?
>>>>>>
>>>>> Given that all pmdinfo information is opt-in (that is to say not obligatory),
>>>>> you can now wait until net-next does its next rebase, and as you continue your
>>>>> development of the sfc driver, you can add the use of this macro in at your
>>>>> leisure.  As more people do that, we will arrive at 100% coverage
>>>> I see. Will do. Thanks.
>>>>
>>> Hi Andrew,
>>>
>>> Patch rebased to next-net, would you mind doing the mentioned patch for it?
>> Hi Ferruh,
>>
>> done. I was in doubts which changeset to specify in fixes, but finally
>> chosen the latest from mine and Olivier's. Please, correct me, if it is
>> wrong.
> I think no fixes line required here, this patch is not fixing a defect,
> but adding a new support for a pmdinfo tool.
> I can remove fixes line while applying.

OK, I see. Thanks.

Andrew.

> Thanks,
> ferruh
>
>> Andrew.
>>

^ permalink raw reply

* Re: [PATCH v3] drivers: advertise kmod dependencies in pmdinfo
From: Ferruh Yigit @ 2016-12-22 12:07 UTC (permalink / raw)
  To: Andrew Rybchenko, Neil Horman
  Cc: Thomas Monjalon, Olivier Matz, Adrien Mazarguil, dev, vido,
	fiona.trahe, stephen
In-Reply-To: <c760a9b0-93fa-58c1-72a6-050eff5a52a4@solarflare.com>

On 12/22/2016 11:35 AM, Andrew Rybchenko wrote:
> On 12/22/2016 02:04 PM, Ferruh Yigit wrote:
>> On 12/21/2016 11:40 AM, Andrew Rybchenko wrote:
>>> On 12/21/2016 02:37 PM, Neil Horman wrote:
>>>> On Wed, Dec 21, 2016 at 12:21:14PM +0300, Andrew Rybchenko wrote:
>>>>> On 12/20/2016 08:26 PM, Thomas Monjalon wrote:
>>>>>>>> Add a new macro RTE_PMD_REGISTER_KMOD_DEP() that allows a driver to
>>>>>>>> declare the list of kernel modules required to run properly.
>>>>>>>>
>>>>>>>> Today, most PCI drivers require uio/vfio.
>>>>>>>>
>>>>>>>> Signed-off-by: Olivier Matz <olivier.matz@6wind.com>
>>>>>>>> Acked-by: Fiona Trahe <fiona.trahe@intel.com>
>>>>>>> Acked-by: Adrien Mazarguil <adrien.mazarguil@6wind.com>
>>>>>> Applied in main tree, thanks
>>>>> Is there any plan on how it will be done/solved for a new drivers in
>>>>> dpdk-next-net?
>>>>> Should I care about it for sfc?
>>>>>
>>>> Given that all pmdinfo information is opt-in (that is to say not obligatory),
>>>> you can now wait until net-next does its next rebase, and as you continue your
>>>> development of the sfc driver, you can add the use of this macro in at your
>>>> leisure.  As more people do that, we will arrive at 100% coverage
>>> I see. Will do. Thanks.
>>>
>> Hi Andrew,
>>
>> Patch rebased to next-net, would you mind doing the mentioned patch for it?
> 
> Hi Ferruh,
> 
> done. I was in doubts which changeset to specify in fixes, but finally 
> chosen the latest from mine and Olivier's. Please, correct me, if it is 
> wrong.

I think no Fixes line is required here; this patch is not fixing a defect,
but adding new support for the pmdinfo tool.
I can remove the Fixes line while applying.

Thanks,
ferruh

> 
> Andrew.
> 

^ permalink raw reply

* [PATCH] ethdev: remove invalid function from version map file
From: Ferruh Yigit @ 2016-12-22 12:00 UTC (permalink / raw)
  To: dev; +Cc: Thomas Monjalon, Ferruh Yigit

Fixes: 9d41beed24b0 ("lib: provide initial versioning")

Signed-off-by: Ferruh Yigit <ferruh.yigit@intel.com>
---
 lib/librte_ether/rte_ether_version.map | 1 -
 1 file changed, 1 deletion(-)

diff --git a/lib/librte_ether/rte_ether_version.map b/lib/librte_ether/rte_ether_version.map
index 209328c..b7ee80c 100644
--- a/lib/librte_ether/rte_ether_version.map
+++ b/lib/librte_ether/rte_ether_version.map
@@ -19,7 +19,6 @@ DPDK_2.2 {
 	rte_eth_dev_bypass_ver_show;
 	rte_eth_dev_bypass_wd_reset;
 	rte_eth_dev_bypass_wd_timeout_show;
-	rte_eth_dev_callback_process;
 	rte_eth_dev_callback_register;
 	rte_eth_dev_callback_unregister;
 	rte_eth_dev_close;
-- 
2.9.3

^ permalink raw reply related

* [PATCH v2] ethdev: cleanup device ops struct whitespace
From: Ferruh Yigit @ 2016-12-22 11:53 UTC (permalink / raw)
  To: dev; +Cc: Thomas Monjalon, Ferruh Yigit
In-Reply-To: <20161208135940.17233-1-ferruh.yigit@intel.com>

- Grouped related items using empty lines
- Aligned arguments to same column
- All item comments that don't fit on the same line are placed below the item
  itself
- Moved some comments to same line if overall line < 100 chars

Signed-off-by: Ferruh Yigit <ferruh.yigit@intel.com>

---

- ! This patch has the problem of trashing the git history for the struct,
  which is indeed a valid argument.
- Some re-ordering may also be required, which I hesitate to do
- Some item comments don't give extra information and can be removed

v2:
- extract mtu_set into new group
- move rss_hash_* to reta_* group
- move set_mc_addr_list to mac_addr_* group
- move set_vf_rate_limit to set_vf_* group
- move get_dcb_info out of timesync_* group

To make it easy to comment on the latest struct, it is copy-pasted here:
[With some extra notes]

struct eth_dev_ops {
	eth_dev_configure_t        dev_configure; /**< Configure device. */
	eth_dev_start_t            dev_start;     /**< Start device. */
	eth_dev_stop_t             dev_stop;      /**< Stop device. */
	eth_dev_set_link_up_t      dev_set_link_up;   /**< Device link up. */
	eth_dev_set_link_down_t    dev_set_link_down; /**< Device link down. */
	eth_dev_close_t            dev_close;     /**< Close device. */
	eth_promiscuous_enable_t   promiscuous_enable; /**< Promiscuous ON. */
	eth_promiscuous_disable_t  promiscuous_disable;/**< Promiscuous OFF. */
	eth_allmulticast_enable_t  allmulticast_enable;/**< RX multicast ON. */
	eth_allmulticast_disable_t allmulticast_disable;/**< RX multicast OF. */
	eth_link_update_t          link_update;   /**< Get device link state. */

	eth_stats_get_t            stats_get;     /**< Get generic device statistics. */
	eth_stats_reset_t          stats_reset;   /**< Reset generic device statistics. */
	eth_xstats_get_t           xstats_get;    /**< Get extended device statistics. */
	eth_xstats_reset_t         xstats_reset;  /**< Reset extended device statistics. */
	eth_xstats_get_names_t     xstats_get_names;
	/**< Get names of extended statistics. */
	eth_queue_stats_mapping_set_t queue_stats_mapping_set;
	/**< Configure per queue stat counter mapping. */

	eth_dev_infos_get_t        dev_infos_get; /**< Get device info. */
	eth_dev_supported_ptypes_get_t dev_supported_ptypes_get;
	/**< Get packet types supported and identified by device. */

	mtu_set_t                  mtu_set;       /**< Set MTU. */

	vlan_filter_set_t          vlan_filter_set; /**< Filter VLAN Setup. */
	vlan_tpid_set_t            vlan_tpid_set; /**< Outer/Inner VLAN TPID Setup. */
	vlan_strip_queue_set_t     vlan_strip_queue_set; /**< VLAN Stripping on queue. */
	vlan_offload_set_t         vlan_offload_set; /**< Set VLAN Offload. */
	vlan_pvid_set_t            vlan_pvid_set; /**< Set port based TX VLAN insertion. */

	eth_queue_start_t          rx_queue_start;/**< Start RX for a queue. */
	eth_queue_stop_t           rx_queue_stop; /**< Stop RX for a queue. */
	eth_queue_start_t          tx_queue_start;/**< Start TX for a queue. */
	eth_queue_stop_t           tx_queue_stop; /**< Stop TX for a queue. */
	eth_rx_queue_setup_t       rx_queue_setup;/**< Set up device RX queue. */
	eth_queue_release_t        rx_queue_release; /**< Release RX queue. */
	eth_rx_queue_count_t       rx_queue_count;/**< Get Rx queue count. */
	eth_rx_descriptor_done_t   rx_descriptor_done; /**< Check rxd DD bit. */
	eth_rx_enable_intr_t       rx_queue_intr_enable;  /**< Enable Rx queue interrupt. */
	eth_rx_disable_intr_t      rx_queue_intr_disable; /**< Disable Rx queue interrupt. */
	eth_tx_queue_setup_t       tx_queue_setup;/**< Set up device TX queue. */
	eth_queue_release_t        tx_queue_release; /**< Release TX queue. */

	eth_dev_led_on_t           dev_led_on;    /**< Turn on LED. */	[Really need these comments?]
	eth_dev_led_off_t          dev_led_off;   /**< Turn off LED. */

	flow_ctrl_get_t            flow_ctrl_get; /**< Get flow control. */
	flow_ctrl_set_t            flow_ctrl_set; /**< Setup flow control. */
	priority_flow_ctrl_set_t   priority_flow_ctrl_set; /**< Setup priority flow control. */

	eth_mac_addr_remove_t      mac_addr_remove; /**< Remove MAC address. */
	eth_mac_addr_add_t         mac_addr_add;  /**< Add a MAC address. */
	eth_mac_addr_set_t         mac_addr_set;  /**< Set a MAC address. */
	eth_set_mc_addr_list_t     set_mc_addr_list; /**< set list of mcast addrs. */

	eth_uc_hash_table_set_t    uc_hash_table_set; /**< Set Unicast Table Array. */
	eth_uc_all_hash_table_set_t uc_all_hash_table_set; /**< Set Unicast hash bitmap. */

	eth_mirror_rule_set_t	   mirror_rule_set; /**< Add a traffic mirror rule. */
	eth_mirror_rule_reset_t	   mirror_rule_reset; /**< reset a traffic mirror rule. */

	[Following already removed from next-net]
	eth_set_vf_rx_mode_t       set_vf_rx_mode;/**< Set VF RX mode. */
	eth_set_vf_rx_t            set_vf_rx;     /**< enable/disable a VF receive. */
	eth_set_vf_tx_t            set_vf_tx;     /**< enable/disable a VF transmit. */
	eth_set_vf_vlan_filter_t   set_vf_vlan_filter; /**< Set VF VLAN filter. */
	eth_set_vf_rate_limit_t    set_vf_rate_limit; /**< Set VF rate limit. */

	eth_udp_tunnel_port_add_t  udp_tunnel_port_add; /** Add UDP tunnel port. */
	eth_udp_tunnel_port_del_t  udp_tunnel_port_del; /** Del UDP tunnel port. */

	eth_set_queue_rate_limit_t set_queue_rate_limit; /**< Set queue rate limit. */

	rss_hash_update_t          rss_hash_update; /** Configure RSS hash protocols. */
	rss_hash_conf_get_t        rss_hash_conf_get; /** Get current RSS hash configuration. */
	reta_update_t              reta_update;   /** Update redirection table. */
	reta_query_t               reta_query;    /** Query redirection table. */

	eth_get_reg_t              get_reg;           /**< Get registers. */
	eth_get_eeprom_length_t    get_eeprom_length; /**< Get eeprom length. */
	eth_get_eeprom_t           get_eeprom;        /**< Get eeprom data. */
	eth_set_eeprom_t           set_eeprom;        /**< Set eeprom. */

	/* bypass control */
	bypass_init_t              bypass_init;
	bypass_state_set_t         bypass_state_set;
	bypass_state_show_t        bypass_state_show;
	bypass_event_set_t         bypass_event_set;
	bypass_event_show_t        bypass_event_show;
	bypass_wd_timeout_set_t    bypass_wd_timeout_set;
	bypass_wd_timeout_show_t   bypass_wd_timeout_show;
	bypass_ver_show_t          bypass_ver_show;
	bypass_wd_reset_t          bypass_wd_reset;

	eth_filter_ctrl_t          filter_ctrl; /**< common filter control. */

	eth_rxq_info_get_t         rxq_info_get; /**< retrieve RX queue information. */
	eth_txq_info_get_t         txq_info_get; /**< retrieve TX queue information. */

	eth_get_dcb_info           get_dcb_info; /** Get DCB information. */

	eth_timesync_enable_t      timesync_enable;
	/** Turn IEEE1588/802.1AS timestamping on. */
	eth_timesync_disable_t     timesync_disable;
	/** Turn IEEE1588/802.1AS timestamping off. */
	eth_timesync_read_rx_timestamp_t timesync_read_rx_timestamp;
	/** Read the IEEE1588/802.1AS RX timestamp. */
	eth_timesync_read_tx_timestamp_t timesync_read_tx_timestamp;
	/** Read the IEEE1588/802.1AS TX timestamp. */
	eth_timesync_adjust_time   timesync_adjust_time; /** Adjust the device clock. */
	eth_timesync_read_time     timesync_read_time; /** Get the device clock time. */
	eth_timesync_write_time    timesync_write_time; /** Set the device clock time. */

	eth_l2_tunnel_eth_type_conf_t l2_tunnel_eth_type_conf;
	/** Config ether type of l2 tunnel. */
	eth_l2_tunnel_offload_set_t   l2_tunnel_offload_set;
	/** Enable/disable l2 tunnel offload functions. */
};
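
For anyone less familiar with how such an ops table is consumed, here is a
minimal sketch. It assumes drivers fill the struct with designated initializers
and that callers read members by name, in which case regrouping fields leaves
call sites untouched. Every demo_* name is hypothetical and the table is cut
down to two callbacks; this is not the ethdev code itself.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical, cut-down ops table: two callbacks instead of the full list. */
typedef int (*demo_mtu_set_t)(void *dev, unsigned int mtu);
typedef int (*demo_led_on_t)(void *dev);

struct demo_dev_ops {
	demo_mtu_set_t mtu_set;    /* Set MTU. */
	demo_led_on_t  dev_led_on; /* Turn on LED. */
};

/* Dispatch helper: an unset callback means the driver lacks the feature. */
static int
demo_set_mtu(const struct demo_dev_ops *ops, void *dev, unsigned int mtu)
{
	assert(ops != NULL);
	if (ops->mtu_set == NULL)
		return -ENOTSUP;
	return ops->mtu_set(dev, mtu);
}

/* Example driver callback: records the requested MTU in the device blob. */
static int
demo_drv_mtu_set(void *dev, unsigned int mtu)
{
	*(unsigned int *)dev = mtu;
	return 0;
}
```

A driver would set only the members it implements, e.g.
`struct demo_dev_ops ops = { .mtu_set = demo_drv_mtu_set };`, and the helper
returns -ENOTSUP for anything left NULL.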
---
 lib/librte_ether/rte_ethdev.h | 171 +++++++++++++++++++++---------------------
 1 file changed, 85 insertions(+), 86 deletions(-)

diff --git a/lib/librte_ether/rte_ethdev.h b/lib/librte_ether/rte_ethdev.h
index 52119af..3c92fd8 100644
--- a/lib/librte_ether/rte_ethdev.h
+++ b/lib/librte_ether/rte_ethdev.h
@@ -1436,6 +1436,7 @@ struct eth_dev_ops {
 	eth_allmulticast_enable_t  allmulticast_enable;/**< RX multicast ON. */
 	eth_allmulticast_disable_t allmulticast_disable;/**< RX multicast OF. */
 	eth_link_update_t          link_update;   /**< Get device link state. */
+
 	eth_stats_get_t            stats_get;     /**< Get generic device statistics. */
 	eth_stats_reset_t          stats_reset;   /**< Reset generic device statistics. */
 	eth_xstats_get_t           xstats_get;    /**< Get extended device statistics. */
@@ -1444,109 +1445,107 @@ struct eth_dev_ops {
 	/**< Get names of extended statistics. */
 	eth_queue_stats_mapping_set_t queue_stats_mapping_set;
 	/**< Configure per queue stat counter mapping. */
+
 	eth_dev_infos_get_t        dev_infos_get; /**< Get device info. */
 	eth_dev_supported_ptypes_get_t dev_supported_ptypes_get;
-	/**< Get packet types supported and identified by device*/
-	mtu_set_t                  mtu_set; /**< Set MTU. */
-	vlan_filter_set_t          vlan_filter_set;  /**< Filter VLAN Setup. */
-	vlan_tpid_set_t            vlan_tpid_set;      /**< Outer/Inner VLAN TPID Setup. */
+	/**< Get packet types supported and identified by device. */
+
+	mtu_set_t                  mtu_set;       /**< Set MTU. */
+
+	vlan_filter_set_t          vlan_filter_set; /**< Filter VLAN Setup. */
+	vlan_tpid_set_t            vlan_tpid_set; /**< Outer/Inner VLAN TPID Setup. */
 	vlan_strip_queue_set_t     vlan_strip_queue_set; /**< VLAN Stripping on queue. */
 	vlan_offload_set_t         vlan_offload_set; /**< Set VLAN Offload. */
-	vlan_pvid_set_t            vlan_pvid_set; /**< Set port based TX VLAN insertion */
-	eth_queue_start_t          rx_queue_start;/**< Start RX for a queue.*/
-	eth_queue_stop_t           rx_queue_stop;/**< Stop RX for a queue.*/
-	eth_queue_start_t          tx_queue_start;/**< Start TX for a queue.*/
-	eth_queue_stop_t           tx_queue_stop;/**< Stop TX for a queue.*/
-	eth_rx_queue_setup_t       rx_queue_setup;/**< Set up device RX queue.*/
-	eth_queue_release_t        rx_queue_release;/**< Release RX queue.*/
-	eth_rx_queue_count_t       rx_queue_count; /**< Get Rx queue count. */
-	eth_rx_descriptor_done_t   rx_descriptor_done;  /**< Check rxd DD bit */
-	/**< Enable Rx queue interrupt. */
-	eth_rx_enable_intr_t       rx_queue_intr_enable;
-	/**< Disable Rx queue interrupt.*/
-	eth_rx_disable_intr_t      rx_queue_intr_disable;
-	eth_tx_queue_setup_t       tx_queue_setup;/**< Set up device TX queue.*/
-	eth_queue_release_t        tx_queue_release;/**< Release TX queue.*/
+	vlan_pvid_set_t            vlan_pvid_set; /**< Set port based TX VLAN insertion. */
+
+	eth_queue_start_t          rx_queue_start;/**< Start RX for a queue. */
+	eth_queue_stop_t           rx_queue_stop; /**< Stop RX for a queue. */
+	eth_queue_start_t          tx_queue_start;/**< Start TX for a queue. */
+	eth_queue_stop_t           tx_queue_stop; /**< Stop TX for a queue. */
+	eth_rx_queue_setup_t       rx_queue_setup;/**< Set up device RX queue. */
+	eth_queue_release_t        rx_queue_release; /**< Release RX queue. */
+	eth_rx_queue_count_t       rx_queue_count;/**< Get Rx queue count. */
+	eth_rx_descriptor_done_t   rx_descriptor_done; /**< Check rxd DD bit. */
+	eth_rx_enable_intr_t       rx_queue_intr_enable;  /**< Enable Rx queue interrupt. */
+	eth_rx_disable_intr_t      rx_queue_intr_disable; /**< Disable Rx queue interrupt. */
+	eth_tx_queue_setup_t       tx_queue_setup;/**< Set up device TX queue. */
+	eth_queue_release_t        tx_queue_release; /**< Release TX queue. */
+
 	eth_dev_led_on_t           dev_led_on;    /**< Turn on LED. */
 	eth_dev_led_off_t          dev_led_off;   /**< Turn off LED. */
+
 	flow_ctrl_get_t            flow_ctrl_get; /**< Get flow control. */
 	flow_ctrl_set_t            flow_ctrl_set; /**< Setup flow control. */
-	priority_flow_ctrl_set_t   priority_flow_ctrl_set; /**< Setup priority flow control.*/
-	eth_mac_addr_remove_t      mac_addr_remove; /**< Remove MAC address */
-	eth_mac_addr_add_t         mac_addr_add;  /**< Add a MAC address */
-	eth_mac_addr_set_t         mac_addr_set;  /**< Set a MAC address */
-	eth_uc_hash_table_set_t    uc_hash_table_set;  /**< Set Unicast Table Array */
-	eth_uc_all_hash_table_set_t uc_all_hash_table_set;  /**< Set Unicast hash bitmap */
-	eth_mirror_rule_set_t	   mirror_rule_set;  /**< Add a traffic mirror rule.*/
-	eth_mirror_rule_reset_t	   mirror_rule_reset;  /**< reset a traffic mirror rule.*/
-	eth_set_vf_rx_mode_t       set_vf_rx_mode;   /**< Set VF RX mode */
-	eth_set_vf_rx_t            set_vf_rx;  /**< enable/disable a VF receive */
-	eth_set_vf_tx_t            set_vf_tx;  /**< enable/disable a VF transmit */
-	eth_set_vf_vlan_filter_t   set_vf_vlan_filter;  /**< Set VF VLAN filter */
-	/** Add UDP tunnel port. */
-	eth_udp_tunnel_port_add_t udp_tunnel_port_add;
-	/** Del UDP tunnel port. */
-	eth_udp_tunnel_port_del_t udp_tunnel_port_del;
-	eth_set_queue_rate_limit_t set_queue_rate_limit;   /**< Set queue rate limit */
-	eth_set_vf_rate_limit_t    set_vf_rate_limit;   /**< Set VF rate limit */
-	/** Update redirection table. */
-	reta_update_t reta_update;
-	/** Query redirection table. */
-	reta_query_t reta_query;
-
-	eth_get_reg_t get_reg;
-	/**< Get registers */
-	eth_get_eeprom_length_t get_eeprom_length;
-	/**< Get eeprom length */
-	eth_get_eeprom_t get_eeprom;
-	/**< Get eeprom data */
-	eth_set_eeprom_t set_eeprom;
-	/**< Set eeprom */
-  /* bypass control */
+	priority_flow_ctrl_set_t   priority_flow_ctrl_set; /**< Setup priority flow control. */
+
+	eth_mac_addr_remove_t      mac_addr_remove; /**< Remove MAC address. */
+	eth_mac_addr_add_t         mac_addr_add;  /**< Add a MAC address. */
+	eth_mac_addr_set_t         mac_addr_set;  /**< Set a MAC address. */
+	eth_set_mc_addr_list_t     set_mc_addr_list; /**< set list of mcast addrs. */
+
+	eth_uc_hash_table_set_t    uc_hash_table_set; /**< Set Unicast Table Array. */
+	eth_uc_all_hash_table_set_t uc_all_hash_table_set; /**< Set Unicast hash bitmap. */
+
+	eth_mirror_rule_set_t	   mirror_rule_set; /**< Add a traffic mirror rule. */
+	eth_mirror_rule_reset_t	   mirror_rule_reset; /**< reset a traffic mirror rule. */
+
+	eth_set_vf_rx_mode_t       set_vf_rx_mode;/**< Set VF RX mode. */
+	eth_set_vf_rx_t            set_vf_rx;     /**< enable/disable a VF receive. */
+	eth_set_vf_tx_t            set_vf_tx;     /**< enable/disable a VF transmit. */
+	eth_set_vf_vlan_filter_t   set_vf_vlan_filter; /**< Set VF VLAN filter. */
+	eth_set_vf_rate_limit_t    set_vf_rate_limit; /**< Set VF rate limit. */
+
+	eth_udp_tunnel_port_add_t  udp_tunnel_port_add; /** Add UDP tunnel port. */
+	eth_udp_tunnel_port_del_t  udp_tunnel_port_del; /** Del UDP tunnel port. */
+
+	eth_set_queue_rate_limit_t set_queue_rate_limit; /**< Set queue rate limit. */
+
+	rss_hash_update_t          rss_hash_update; /** Configure RSS hash protocols. */
+	rss_hash_conf_get_t        rss_hash_conf_get; /** Get current RSS hash configuration. */
+	reta_update_t              reta_update;   /** Update redirection table. */
+	reta_query_t               reta_query;    /** Query redirection table. */
+
+	eth_get_reg_t              get_reg;           /**< Get registers. */
+	eth_get_eeprom_length_t    get_eeprom_length; /**< Get eeprom length. */
+	eth_get_eeprom_t           get_eeprom;        /**< Get eeprom data. */
+	eth_set_eeprom_t           set_eeprom;        /**< Set eeprom. */
+
+	/* bypass control */
 #ifdef RTE_NIC_BYPASS
-  bypass_init_t bypass_init;
-  bypass_state_set_t bypass_state_set;
-  bypass_state_show_t bypass_state_show;
-  bypass_event_set_t bypass_event_set;
-  bypass_event_show_t bypass_event_show;
-  bypass_wd_timeout_set_t bypass_wd_timeout_set;
-  bypass_wd_timeout_show_t bypass_wd_timeout_show;
-  bypass_ver_show_t bypass_ver_show;
-  bypass_wd_reset_t bypass_wd_reset;
+	bypass_init_t              bypass_init;
+	bypass_state_set_t         bypass_state_set;
+	bypass_state_show_t        bypass_state_show;
+	bypass_event_set_t         bypass_event_set;
+	bypass_event_show_t        bypass_event_show;
+	bypass_wd_timeout_set_t    bypass_wd_timeout_set;
+	bypass_wd_timeout_show_t   bypass_wd_timeout_show;
+	bypass_ver_show_t          bypass_ver_show;
+	bypass_wd_reset_t          bypass_wd_reset;
 #endif
 
-	/** Configure RSS hash protocols. */
-	rss_hash_update_t rss_hash_update;
-	/** Get current RSS hash configuration. */
-	rss_hash_conf_get_t rss_hash_conf_get;
-	eth_filter_ctrl_t              filter_ctrl;
-	/**< common filter control. */
-	eth_set_mc_addr_list_t set_mc_addr_list; /**< set list of mcast addrs */
-	eth_rxq_info_get_t rxq_info_get;
-	/**< retrieve RX queue information. */
-	eth_txq_info_get_t txq_info_get;
-	/**< retrieve TX queue information. */
+	eth_filter_ctrl_t          filter_ctrl; /**< common filter control. */
+
+	eth_rxq_info_get_t         rxq_info_get; /**< retrieve RX queue information. */
+	eth_txq_info_get_t         txq_info_get; /**< retrieve TX queue information. */
+
+	eth_get_dcb_info           get_dcb_info; /** Get DCB information. */
+
+	eth_timesync_enable_t      timesync_enable;
 	/** Turn IEEE1588/802.1AS timestamping on. */
-	eth_timesync_enable_t timesync_enable;
+	eth_timesync_disable_t     timesync_disable;
 	/** Turn IEEE1588/802.1AS timestamping off. */
-	eth_timesync_disable_t timesync_disable;
-	/** Read the IEEE1588/802.1AS RX timestamp. */
 	eth_timesync_read_rx_timestamp_t timesync_read_rx_timestamp;
-	/** Read the IEEE1588/802.1AS TX timestamp. */
+	/** Read the IEEE1588/802.1AS RX timestamp. */
 	eth_timesync_read_tx_timestamp_t timesync_read_tx_timestamp;
+	/** Read the IEEE1588/802.1AS TX timestamp. */
+	eth_timesync_adjust_time   timesync_adjust_time; /** Adjust the device clock. */
+	eth_timesync_read_time     timesync_read_time; /** Get the device clock time. */
+	eth_timesync_write_time    timesync_write_time; /** Set the device clock time. */
 
-	/** Get DCB information */
-	eth_get_dcb_info get_dcb_info;
-	/** Adjust the device clock.*/
-	eth_timesync_adjust_time timesync_adjust_time;
-	/** Get the device clock time. */
-	eth_timesync_read_time timesync_read_time;
-	/** Set the device clock time. */
-	eth_timesync_write_time timesync_write_time;
-	/** Config ether type of l2 tunnel */
 	eth_l2_tunnel_eth_type_conf_t l2_tunnel_eth_type_conf;
-	/** Enable/disable l2 tunnel offload functions */
-	eth_l2_tunnel_offload_set_t l2_tunnel_offload_set;
+	/** Config ether type of l2 tunnel. */
+	eth_l2_tunnel_offload_set_t   l2_tunnel_offload_set;
+	/** Enable/disable l2 tunnel offload functions. */
 };
 
 /**
-- 
2.9.3

* [PATCH v2 5/5] doc: distributor library changes for new burst api
From: David Hunt @ 2016-12-22  4:37 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1482381428-148094-1-git-send-email-david.hunt@intel.com>

Signed-off-by: David Hunt <david.hunt@intel.com>
---
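Note for reviewers: the text added below mentions bursts of up to 8 packets per
worker handoff. The sketch here models only that arithmetic, i.e. how many
handoffs a batch needs at one packet versus up to 8 packets per handoff. It does
not call the distributor library, and the demo_* names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_WORKER_BURST 8 /* per-handoff limit quoted in the doc text */

/* Number of handoffs needed to pass n_pkts packets to a single worker. */
static size_t
demo_handoffs(size_t n_pkts, size_t per_handoff)
{
	assert(per_handoff > 0);
	/* Round up so a partial final burst still counts as one handoff. */
	return (n_pkts + per_handoff - 1) / per_handoff;
}
```

With 64 packets, the single-packet model needs 64 handoffs and the burst model
needs 8; whether that translates into a proportional speed-up is a question
for the perf autotest, not for this arithmetic.
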
 doc/guides/prog_guide/packet_distrib_lib.rst | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/doc/guides/prog_guide/packet_distrib_lib.rst b/doc/guides/prog_guide/packet_distrib_lib.rst
index b5bdabb..dffd4ad 100644
--- a/doc/guides/prog_guide/packet_distrib_lib.rst
+++ b/doc/guides/prog_guide/packet_distrib_lib.rst
@@ -42,6 +42,10 @@ The model of operation is shown in the diagram below.
 
    Packet Distributor mode of operation
 
+There are two versions of the API in the distributor library: one which sends one packet at a time to workers,
+and another which sends bursts of up to 8 packets at a time to workers. The function names of the second API
+are identified by "_burst", and must not be intermixed with the single-packet API. The operations described below
+apply to both APIs; select which API you wish to use by including the relevant header file.
 
 Distributor Core Operation
 --------------------------
-- 
2.7.4

* [PATCH v2 3/5] test: add distributor_perf autotest
From: David Hunt @ 2016-12-22  4:37 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1482381428-148094-1-git-send-email-david.hunt@intel.com>

Signed-off-by: David Hunt <david.hunt@intel.com>
---
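Note on the timing arithmetic used in this test: the reported figures are
averages obtained by shifting the elapsed tick count right by ITER_POWER and
then dividing by BURST. A minimal standalone sketch of that calculation follows;
the demo_* helpers are hypothetical and take plain integers instead of reading
the TSC.

```c
#include <assert.h>
#include <stdint.h>

/* Average ticks per burst: dividing by 2^iter_power is a right shift. */
static uint64_t
demo_ticks_per_burst(uint64_t start, uint64_t end, unsigned int iter_power)
{
	assert(end >= start && iter_power < 64);
	return (end - start) >> iter_power;
}

/* Average ticks per packet: the per-burst figure divided by the burst size. */
static uint64_t
demo_ticks_per_packet(uint64_t start, uint64_t end,
		unsigned int iter_power, unsigned int burst)
{
	assert(burst > 0);
	return demo_ticks_per_burst(start, end, iter_power) / burst;
}

/* Packets the workers must report before the test stops polling. */
static uint64_t
demo_expected_packets(unsigned int iter_power, unsigned int burst)
{
	return (uint64_t)burst << iter_power;
}
```

Both divisions truncate, so per-packet figures below one tick are reported as
zero. With BURST 64 and ITER_POWER 21 the expected packet total is 2^27.
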
 app/test/test_distributor_perf.c | 133 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 127 insertions(+), 6 deletions(-)

diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
index 7947fe9..86285fd 100644
--- a/app/test/test_distributor_perf.c
+++ b/app/test/test_distributor_perf.c
@@ -40,9 +40,11 @@
 #include <rte_common.h>
 #include <rte_mbuf.h>
 #include <rte_distributor.h>
+#include <rte_distributor_burst.h>
 
-#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
-#define BURST 32
+#define ITER_POWER_CL 25 /* log 2 of how many iterations for Cache Line test */
+#define ITER_POWER 21 /* log 2 of how many iterations we do when timing. */
+#define BURST 64
 #define BIG_BATCH 1024
 
 /* static vars - zero initialized by default */
@@ -86,7 +88,7 @@ time_cache_line_switch(void)
 		rte_pause();
 
 	const uint64_t start_time = rte_rdtsc();
-	for (i = 0; i < (1 << ITER_POWER); i++) {
+	for (i = 0; i < (1 << ITER_POWER_CL); i++) {
 		while (*pdata)
 			rte_pause();
 		*pdata = 1;
@@ -98,10 +100,10 @@ time_cache_line_switch(void)
 	*pdata = 2;
 	rte_eal_wait_lcore(slaveid);
 	printf("==== Cache line switch test ===\n");
-	printf("Time for %u iterations = %"PRIu64" ticks\n", (1<<ITER_POWER),
+	printf("Time for %u iterations = %"PRIu64" ticks\n", (1<<ITER_POWER_CL),
 			end_time-start_time);
 	printf("Ticks per iteration = %"PRIu64"\n\n",
-			(end_time-start_time) >> ITER_POWER);
+			(end_time-start_time) >> ITER_POWER_CL);
 }
 
 /* returns the total count of the number of packets handled by the worker
@@ -144,6 +146,34 @@ handle_work(void *arg)
 	return 0;
 }
 
+/* this is the basic worker function for performance tests.
+ * it does nothing but return packets and count them.
+ */
+static int
+handle_work_burst(void *arg)
+{
+	//struct rte_mbuf *pkt = NULL;
+	struct rte_distributor_burst *d = arg;
+	unsigned int count = 0;
+	unsigned int num = 0;
+	unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+
+	for (int i = 0; i < 8; i++)
+		buf[i] = NULL;
+
+	num = rte_distributor_get_pkt_burst(d, id, buf, buf, num);
+	while (!quit) {
+		worker_stats[id].handled_packets += num;
+		count += num;
+		num = rte_distributor_get_pkt_burst(d, id, buf, buf, num);
+	}
+	worker_stats[id].handled_packets += num;
+	count += num;
+	rte_distributor_return_pkt_burst(d, id, buf, num);
+	return 0;
+}
+
 /* this basic performance test just repeatedly sends in 32 packets at a time
  * to the distributor and verifies at the end that we got them all in the worker
  * threads and finally how long per packet the processing took.
@@ -174,6 +204,8 @@ perf_test(struct rte_distributor *d, struct rte_mempool *p)
 		rte_distributor_process(d, NULL, 0);
 	} while (total_packet_count() < (BURST << ITER_POWER));
 
+	rte_distributor_clear_returns(d);
+
 	printf("=== Performance test of distributor ===\n");
 	printf("Time per burst:  %"PRIu64"\n", (end - start) >> ITER_POWER);
 	printf("Time per packet: %"PRIu64"\n\n",
@@ -190,6 +222,54 @@ perf_test(struct rte_distributor *d, struct rte_mempool *p)
 	return 0;
 }
 
+/* this basic performance test just repeatedly sends in BURST packets at a time
+ * to the distributor and verifies at the end that we got them all in the worker
+ * threads and finally how long per packet the processing took.
+ */
+static inline int
+perf_test_burst(struct rte_distributor_burst *d, struct rte_mempool *p)
+{
+	unsigned int i;
+	uint64_t start, end;
+	struct rte_mbuf *bufs[BURST];
+
+	clear_packet_count();
+	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+		printf("Error getting mbufs from pool\n");
+		return -1;
+	}
+	/* ensure we have different hash value for each pkt */
+	for (i = 0; i < BURST; i++)
+		bufs[i]->hash.usr = i;
+
+	start = rte_rdtsc();
+	for (i = 0; i < (1<<ITER_POWER); i++)
+		rte_distributor_process_burst(d, bufs, BURST);
+	end = rte_rdtsc();
+
+	do {
+		usleep(100);
+		rte_distributor_process_burst(d, NULL, 0);
+	} while (total_packet_count() < (BURST << ITER_POWER));
+
+	rte_distributor_clear_returns_burst(d);
+
+	printf("=== Performance test of burst distributor ===\n");
+	printf("Time per burst:  %"PRIu64"\n", (end - start) >> ITER_POWER);
+	printf("Time per packet: %"PRIu64"\n\n",
+			((end - start) >> ITER_POWER)/BURST);
+	rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+				worker_stats[i].handled_packets);
+	printf("Total packets: %u (%x)\n", total_packet_count(),
+			total_packet_count());
+	printf("=== Perf test done ===\n\n");
+
+	return 0;
+}
+
 /* Useful function which ensures that all worker functions terminate */
 static void
 quit_workers(struct rte_distributor *d, struct rte_mempool *p)
@@ -212,10 +292,34 @@ quit_workers(struct rte_distributor *d, struct rte_mempool *p)
 	worker_idx = 0;
 }
 
+/* Useful function which ensures that all worker functions terminate */
+static void
+quit_workers_burst(struct rte_distributor_burst *d, struct rte_mempool *p)
+{
+	const unsigned int num_workers = rte_lcore_count() - 1;
+	unsigned int i;
+	struct rte_mbuf *bufs[RTE_MAX_LCORE];
+
+	rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+
+	quit = 1;
+	for (i = 0; i < num_workers; i++)
+		bufs[i]->hash.usr = i << 1;
+	rte_distributor_process_burst(d, bufs, num_workers);
+
+	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
+	rte_distributor_process_burst(d, NULL, 0);
+	rte_eal_mp_wait_lcore();
+	quit = 0;
+	worker_idx = 0;
+}
+
 static int
 test_distributor_perf(void)
 {
 	static struct rte_distributor *d;
+	static struct rte_distributor_burst *db;
 	static struct rte_mempool *p;
 
 	if (rte_lcore_count() < 2) {
@@ -234,10 +338,22 @@ test_distributor_perf(void)
 			return -1;
 		}
 	} else {
-		rte_distributor_flush(d);
+		//rte_distributor_flush_burst(d);
 		rte_distributor_clear_returns(d);
 	}
 
+	if (db == NULL) {
+		db = rte_distributor_create_burst("Test_burst", rte_socket_id(),
+				rte_lcore_count() - 1);
+		if (db == NULL) {
+			printf("Error creating burst distributor\n");
+			return -1;
+		}
+	} else {
+		//rte_distributor_flush_burst(d);
+		rte_distributor_clear_returns_burst(db);
+	}
+
 	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
 			(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
 	if (p == NULL) {
@@ -254,6 +370,11 @@ test_distributor_perf(void)
 		return -1;
 	quit_workers(d, p);
 
+	rte_eal_mp_remote_launch(handle_work_burst, db, SKIP_MASTER);
+	if (perf_test_burst(db, p) < 0)
+		return -1;
+	quit_workers_burst(db, p);
+
 	return 0;
 }
 
-- 
2.7.4

* [PATCH v2 4/5] example: distributor app showing burst api
From: David Hunt @ 2016-12-22  4:37 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1482381428-148094-1-git-send-email-david.hunt@intel.com>

Signed-off-by: David Hunt <david.hunt@intel.com>
---
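Note on flush_one_port(): when the TX burst accepts fewer packets than were
buffered, the remainder is freed and counted as drops. The sketch below models
that accounting with a mock TX call. One deliberate difference, stated as an
assumption: the patch adds outbuf->count to tx_pkts, whereas this sketch counts
only the packets the mock device accepted. All demo_* names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

struct demo_tx_stats {
	uint64_t tx_pkts;      /* packets accepted by the mock device */
	uint64_t enqdrop_pkts; /* packets dropped because the queue was full */
};

/* Hypothetical stand-in for a TX burst call: accepts at most `room` packets. */
static unsigned int
demo_tx_burst(unsigned int room, unsigned int count)
{
	return count < room ? count : room;
}

/* Flush `count` buffered packets; mark each unsent slot in `freed`. */
static void
demo_flush(struct demo_tx_stats *st, unsigned int room,
		int *freed, unsigned int count)
{
	unsigned int nb_tx = demo_tx_burst(room, count);
	unsigned int i;

	assert(st != NULL && freed != NULL);
	st->tx_pkts += nb_tx;
	st->enqdrop_pkts += count - nb_tx;
	/* Everything from index nb_tx onward was not sent and must be freed. */
	for (i = nb_tx; i < count; i++)
		freed[i] = 1;
}
```

Freeing from index nb_tx upward mirrors the do/while loop in the patch; skipping
that step would leak the unsent mbufs.
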
 examples/distributor/main.c | 505 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 388 insertions(+), 117 deletions(-)
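
Note on the app_stats layout: each counter group is marked
__rte_cache_aligned, presumably so that cores updating different groups do not
write to the same cache line. A minimal standard-C sketch of the idea follows,
assuming a 64-byte cache line and using C11 _Alignas in place of the DPDK
macro; the demo_* names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_CACHE_LINE 64 /* assumed cache line size in bytes */

struct demo_stats {
	_Alignas(DEMO_CACHE_LINE) uint64_t rx_pkts; /* written by the RX core */
	_Alignas(DEMO_CACHE_LINE) uint64_t tx_pkts; /* written by the TX core */
};

/* Returns 1 when two byte offsets fall on the same cache line. */
static int
demo_same_line(size_t off_a, size_t off_b)
{
	return off_a / DEMO_CACHE_LINE == off_b / DEMO_CACHE_LINE;
}
```

Because each member is aligned to the line size, the two counters land on
separate lines and the struct grows to two full lines; that is the memory cost
paid for avoiding false sharing.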

diff --git a/examples/distributor/main.c b/examples/distributor/main.c
index e7641d2..451e253 100644
--- a/examples/distributor/main.c
+++ b/examples/distributor/main.c
@@ -1,8 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
- *   All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
  *   modification, are permitted provided that the following conditions
@@ -31,6 +30,8 @@
  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+#define BURST_API 1
+
 #include <stdint.h>
 #include <inttypes.h>
 #include <unistd.h>
@@ -43,39 +44,87 @@
 #include <rte_malloc.h>
 #include <rte_debug.h>
 #include <rte_prefetch.h>
+#if BURST_API
+#include <rte_distributor_burst.h>
+#else
 #include <rte_distributor.h>
+#endif
 
-#define RX_RING_SIZE 256
-#define TX_RING_SIZE 512
+#define RX_QUEUE_SIZE 512
+#define TX_QUEUE_SIZE 512
 #define NUM_MBUFS ((64*1024)-1)
-#define MBUF_CACHE_SIZE 250
+#define MBUF_CACHE_SIZE 128
+#if BURST_API
+#define BURST_SIZE 64
+#define SCHED_RX_RING_SZ 8192
+#define SCHED_TX_RING_SZ 65536
+#else
 #define BURST_SIZE 32
-#define RTE_RING_SZ 1024
+#define SCHED_RX_RING_SZ 1024
+#define SCHED_TX_RING_SZ 1024
+#endif
+#define BURST_SIZE_TX 32
 
 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
 
+#define ANSI_COLOR_RED     "\x1b[31m"
+#define ANSI_COLOR_RESET   "\x1b[0m"
+
 /* mask of enabled ports */
 static uint32_t enabled_port_mask;
 volatile uint8_t quit_signal;
 volatile uint8_t quit_signal_rx;
+volatile uint8_t quit_signal_dist;
+volatile uint8_t quit_signal_work;
 
 static volatile struct app_stats {
 	struct {
 		uint64_t rx_pkts;
 		uint64_t returned_pkts;
 		uint64_t enqueued_pkts;
+		uint64_t enqdrop_pkts;
 	} rx __rte_cache_aligned;
+	int pad1 __rte_cache_aligned;
+
+	struct {
+		uint64_t in_pkts;
+		uint64_t ret_pkts;
+		uint64_t sent_pkts;
+		uint64_t enqdrop_pkts;
+	} dist __rte_cache_aligned;
+	int pad2 __rte_cache_aligned;
 
 	struct {
 		uint64_t dequeue_pkts;
 		uint64_t tx_pkts;
+		uint64_t enqdrop_pkts;
 	} tx __rte_cache_aligned;
+	int pad3 __rte_cache_aligned;
+
+	uint64_t worker_pkts[64] __rte_cache_aligned;
+
+	int pad4 __rte_cache_aligned;
+
+	uint64_t worker_bursts[64][8] __rte_cache_aligned;
+
+	int pad5 __rte_cache_aligned;
+
+	uint64_t port_rx_pkts[64] __rte_cache_aligned;
+	uint64_t port_tx_pkts[64] __rte_cache_aligned;
 } app_stats;
 
+struct app_stats prev_app_stats;
+
 static const struct rte_eth_conf port_conf_default = {
 	.rxmode = {
 		.mq_mode = ETH_MQ_RX_RSS,
 		.max_rx_pkt_len = ETHER_MAX_LEN,
+		.split_hdr_size = 0,
+		.header_split   = 0, /**< Header Split disabled */
+		.hw_ip_checksum = 1, /**< IP checksum offload enabled */
+		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
+		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
+		.hw_strip_crc   = 0, /**< CRC stripping by hardware disabled */
 	},
 	.txmode = {
 		.mq_mode = ETH_MQ_TX_NONE,
@@ -93,6 +142,8 @@ struct output_buffer {
 	struct rte_mbuf *mbufs[BURST_SIZE];
 };
 
+static void print_stats(void);
+
 /*
  * Initialises a given port using global settings and with the rx buffers
  * coming from the mbuf_pool passed as parameter
@@ -101,9 +152,13 @@ static inline int
 port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 {
 	struct rte_eth_conf port_conf = port_conf_default;
-	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
-	int retval;
+	const uint16_t rxRings = 1;
+	uint16_t txRings = rte_lcore_count() - 1;
 	uint16_t q;
+	int retval;
+
+	if (txRings > RTE_MAX_ETHPORTS)
+		txRings = RTE_MAX_ETHPORTS;
 
 	if (port >= rte_eth_dev_count())
 		return -1;
@@ -113,7 +168,7 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 		return retval;
 
 	for (q = 0; q < rxRings; q++) {
-		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
+		retval = rte_eth_rx_queue_setup(port, q, RX_QUEUE_SIZE,
 						rte_eth_dev_socket_id(port),
 						NULL, mbuf_pool);
 		if (retval < 0)
@@ -121,7 +176,7 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 	}
 
 	for (q = 0; q < txRings; q++) {
-		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
+		retval = rte_eth_tx_queue_setup(port, q, TX_QUEUE_SIZE,
 						rte_eth_dev_socket_id(port),
 						NULL);
 		if (retval < 0)
@@ -134,7 +189,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 
 	struct rte_eth_link link;
 	rte_eth_link_get_nowait(port, &link);
-	if (!link.link_status) {
+	while (!link.link_status) {
+		printf("Waiting for Link up on port %"PRIu8"\n", port);
 		sleep(1);
 		rte_eth_link_get_nowait(port, &link);
 	}
@@ -160,41 +216,52 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 
 struct lcore_params {
 	unsigned worker_id;
-	struct rte_distributor *d;
-	struct rte_ring *r;
+	struct rte_distributor_burst *d;
+	struct rte_ring *rx_dist_ring;
+	struct rte_ring *dist_tx_ring;
 	struct rte_mempool *mem_pool;
 };
 
-static int
-quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+static inline void
+flush_one_port(struct output_buffer *outbuf, uint8_t outp)
 {
-	const unsigned num_workers = rte_lcore_count() - 2;
-	unsigned i;
-	struct rte_mbuf *bufs[num_workers];
+	unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
+			outbuf->mbufs, outbuf->count);
+	app_stats.tx.tx_pkts += outbuf->count;
 
-	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
-		printf("line %d: Error getting mbufs from pool\n", __LINE__);
-		return -1;
+	if (unlikely(nb_tx < outbuf->count)) {
+		app_stats.tx.enqdrop_pkts +=  outbuf->count - nb_tx;
+		do {
+			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
+		} while (++nb_tx < outbuf->count);
 	}
+	outbuf->count = 0;
+}
 
-	for (i = 0; i < num_workers; i++)
-		bufs[i]->hash.rss = i << 1;
+static inline void
+flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
+{
+	uint8_t outp;
 
-	rte_distributor_process(d, bufs, num_workers);
-	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+	for (outp = 0; outp < nb_ports; outp++) {
+		/* skip ports that are not enabled */
+		if ((enabled_port_mask & (1 << outp)) == 0)
+			continue;
 
-	return 0;
+		if (tx_buffers[outp].count == 0)
+			continue;
+
+		flush_one_port(&tx_buffers[outp], outp);
+	}
 }
 
 static int
 lcore_rx(struct lcore_params *p)
 {
-	struct rte_distributor *d = p->d;
-	struct rte_mempool *mem_pool = p->mem_pool;
-	struct rte_ring *r = p->r;
 	const uint8_t nb_ports = rte_eth_dev_count();
 	const int socket_id = rte_socket_id();
 	uint8_t port;
+	struct rte_mbuf *bufs[BURST_SIZE*2];
 
 	for (port = 0; port < nb_ports; port++) {
 		/* skip ports that are not enabled */
@@ -210,6 +277,7 @@ lcore_rx(struct lcore_params *p)
 
 	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
 	port = 0;
+
 	while (!quit_signal_rx) {
 
 		/* skip ports that are not enabled */
@@ -218,7 +286,7 @@ lcore_rx(struct lcore_params *p)
 				port = 0;
 			continue;
 		}
-		struct rte_mbuf *bufs[BURST_SIZE*2];
+
 		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
 				BURST_SIZE);
 		if (unlikely(nb_rx == 0)) {
@@ -228,19 +296,46 @@ lcore_rx(struct lcore_params *p)
 		}
 		app_stats.rx.rx_pkts += nb_rx;
 
-		rte_distributor_process(d, bufs, nb_rx);
-		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
-				bufs, BURST_SIZE*2);
+/*
+ * You can run the distributor on the rx core with this code. Returned
+ * packets are then sent straight to the tx core.
+ */
+#if 0
+
+#if BURST_API
+	rte_distributor_process_burst(d, bufs, nb_rx);
+	const uint16_t nb_ret = rte_distributor_returned_pkts_burst(d,
+			bufs, BURST_SIZE*2);
+#else
+	rte_distributor_process(d, bufs, nb_rx);
+	const uint16_t nb_ret = rte_distributor_returned_pkts(d,
+			bufs, BURST_SIZE*2);
+#endif
+
 		app_stats.rx.returned_pkts += nb_ret;
 		if (unlikely(nb_ret == 0)) {
 			if (++port == nb_ports)
 				port = 0;
 			continue;
 		}
-
-		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
+		struct rte_ring *tx_ring = p->dist_tx_ring;
+		uint16_t sent = rte_ring_enqueue_burst(tx_ring,
+				(void *)bufs, nb_ret);
+#else
+		uint16_t nb_ret = nb_rx;
+		/*
+		* Swap the following two lines if you want the rx traffic
+		* to go directly to tx, no distribution.
+		*/
+		struct rte_ring *out_ring = p->rx_dist_ring;
+		//struct rte_ring *out_ring = p->dist_tx_ring;
+
+		uint16_t sent = rte_ring_enqueue_burst(out_ring,
+				(void *)bufs, nb_ret);
+#endif
 		app_stats.rx.enqueued_pkts += sent;
 		if (unlikely(sent < nb_ret)) {
+			app_stats.rx.enqdrop_pkts += nb_ret - sent;
 			RTE_LOG_DP(DEBUG, DISTRAPP,
 				"%s:Packet loss due to full ring\n", __func__);
 			while (sent < nb_ret)
@@ -249,56 +344,88 @@ lcore_rx(struct lcore_params *p)
 		if (++port == nb_ports)
 			port = 0;
 	}
-	rte_distributor_process(d, NULL, 0);
-	/* flush distributor to bring to known state */
-	rte_distributor_flush(d);
 	/* set worker & tx threads quit flag */
+	printf("\nCore %u exiting rx task.\n", rte_lcore_id());
 	quit_signal = 1;
-	/*
-	 * worker threads may hang in get packet as
-	 * distributor process is not running, just make sure workers
-	 * get packets till quit_signal is actually been
-	 * received and they gracefully shutdown
-	 */
-	if (quit_workers(d, mem_pool) != 0)
-		return -1;
-	/* rx thread should quit at last */
 	return 0;
 }
 
-static inline void
-flush_one_port(struct output_buffer *outbuf, uint8_t outp)
-{
-	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
-			outbuf->count);
-	app_stats.tx.tx_pkts += nb_tx;
 
-	if (unlikely(nb_tx < outbuf->count)) {
-		RTE_LOG_DP(DEBUG, DISTRAPP,
-			"%s:Packet loss with tx_burst\n", __func__);
-		do {
-			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
-		} while (++nb_tx < outbuf->count);
-	}
-	outbuf->count = 0;
-}
 
-static inline void
-flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
+static int
+lcore_distributor(struct lcore_params *p)
 {
-	uint8_t outp;
-	for (outp = 0; outp < nb_ports; outp++) {
-		/* skip ports that are not enabled */
-		if ((enabled_port_mask & (1 << outp)) == 0)
-			continue;
-
-		if (tx_buffers[outp].count == 0)
-			continue;
-
-		flush_one_port(&tx_buffers[outp], outp);
+	struct rte_ring *in_r = p->rx_dist_ring;
+	struct rte_ring *out_r = p->dist_tx_ring;
+	struct rte_mbuf *bufs[BURST_SIZE * 4];
+	struct rte_distributor_burst *d = p->d;
+
+	printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
+	while (!quit_signal_dist) {
+		const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
+				(void *)bufs, BURST_SIZE*1);
+		if (nb_rx) {
+			app_stats.dist.in_pkts += nb_rx;
+/*
+ * This '#if' allows you to bypass the distributor. Incoming packets may be
+ * sent straight to the tx ring.
+ */
+#if 1
+
+#if BURST_API
+			/* Distribute the packets */
+			rte_distributor_process_burst(d, bufs, nb_rx);
+			/* Handle Returns */
+			const uint16_t nb_ret =
+				rte_distributor_returned_pkts_burst(d,
+					bufs, BURST_SIZE*2);
+#else
+			/* Distribute the packets */
+			rte_distributor_process(d, bufs, nb_rx);
+			/* Handle Returns */
+			const uint16_t nb_ret =
+				rte_distributor_returned_pkts(d,
+					bufs, BURST_SIZE*2);
+#endif
+
+#else
+			/* Bypass the distributor */
+			const unsigned int xor_val = (rte_eth_dev_count() > 1);
+			/* Touch the mbuf by xor'ing the port */
+			for (unsigned int i = 0; i < nb_rx; i++)
+				bufs[i]->port ^= xor_val;
+
+			const uint16_t nb_ret = nb_rx;
+#endif
+			if (unlikely(nb_ret == 0))
+				continue;
+			app_stats.dist.ret_pkts += nb_ret;
+
+			uint16_t sent = rte_ring_enqueue_burst(out_r,
+					(void *)bufs, nb_ret);
+			app_stats.dist.sent_pkts += sent;
+			if (unlikely(sent < nb_ret)) {
+				app_stats.dist.enqdrop_pkts += nb_ret - sent;
+				RTE_LOG(DEBUG, DISTRAPP,
+					"%s:Packet loss due to full out ring\n",
+					__func__);
+				while (sent < nb_ret)
+					rte_pktmbuf_free(bufs[sent++]);
+			}
+		}
 	}
+	printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
+	quit_signal_work = 1;
+
+#if BURST_API
+	/* Unblock any returns so workers can exit */
+	rte_distributor_clear_returns_burst(d);
+#endif
+	quit_signal_rx = 1;
+	return 0;
 }
 
+
 static int
 lcore_tx(struct rte_ring *in_r)
 {
@@ -327,9 +454,9 @@ lcore_tx(struct rte_ring *in_r)
 			if ((enabled_port_mask & (1 << port)) == 0)
 				continue;
 
-			struct rte_mbuf *bufs[BURST_SIZE];
+			struct rte_mbuf *bufs[BURST_SIZE_TX];
 			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
-					(void *)bufs, BURST_SIZE);
+					(void *)bufs, BURST_SIZE_TX);
 			app_stats.tx.dequeue_pkts += nb_rx;
 
 			/* if we get no traffic, flush anything we have */
@@ -358,11 +485,12 @@ lcore_tx(struct rte_ring *in_r)
 
 				outbuf = &tx_buffers[outp];
 				outbuf->mbufs[outbuf->count++] = bufs[i];
-				if (outbuf->count == BURST_SIZE)
+				if (outbuf->count == BURST_SIZE_TX)
 					flush_one_port(outbuf, outp);
 			}
 		}
 	}
+	printf("\nCore %u exiting tx task.\n", rte_lcore_id());
 	return 0;
 }
 
@@ -371,7 +499,7 @@ int_handler(int sig_num)
 {
 	printf("Exiting on signal %d\n", sig_num);
 	/* set quit flag for rx thread to exit */
-	quit_signal_rx = 1;
+	quit_signal_dist = 1;
 }
 
 static void
@@ -379,44 +507,138 @@ print_stats(void)
 {
 	struct rte_eth_stats eth_stats;
 	unsigned i;
-
-	printf("\nRX thread stats:\n");
-	printf(" - Received:    %"PRIu64"\n", app_stats.rx.rx_pkts);
-	printf(" - Processed:   %"PRIu64"\n", app_stats.rx.returned_pkts);
-	printf(" - Enqueued:    %"PRIu64"\n", app_stats.rx.enqueued_pkts);
-
-	printf("\nTX thread stats:\n");
-	printf(" - Dequeued:    %"PRIu64"\n", app_stats.tx.dequeue_pkts);
-	printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts);
+	const unsigned int num_workers = rte_lcore_count() - 4;
 
 	for (i = 0; i < rte_eth_dev_count(); i++) {
 		rte_eth_stats_get(i, &eth_stats);
-		printf("\nPort %u stats:\n", i);
-		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
-		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
-		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
-		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
-		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
+		app_stats.port_rx_pkts[i] = eth_stats.ipackets;
+		app_stats.port_tx_pkts[i] = eth_stats.opackets;
+	}
+
+	printf("\n\nRX Thread:\n");
+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		printf("Port %u Pktsin : %5.2f\n", i,
+				(app_stats.port_rx_pkts[i] -
+				prev_app_stats.port_rx_pkts[i])/1000000.0);
+		prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i];
+	}
+	printf(" - Received:    %5.2f\n",
+			(app_stats.rx.rx_pkts -
+			prev_app_stats.rx.rx_pkts)/1000000.0);
+	printf(" - Returned:    %5.2f\n",
+			(app_stats.rx.returned_pkts -
+			prev_app_stats.rx.returned_pkts)/1000000.0);
+	printf(" - Enqueued:    %5.2f\n",
+			(app_stats.rx.enqueued_pkts -
+			prev_app_stats.rx.enqueued_pkts)/1000000.0);
+	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
+			(app_stats.rx.enqdrop_pkts -
+			prev_app_stats.rx.enqdrop_pkts)/1000000.0,
+			ANSI_COLOR_RESET);
+
+	printf("Distributor thread:\n");
+	printf(" - In:          %5.2f\n",
+			(app_stats.dist.in_pkts -
+			prev_app_stats.dist.in_pkts)/1000000.0);
+	printf(" - Returned:    %5.2f\n",
+			(app_stats.dist.ret_pkts -
+			prev_app_stats.dist.ret_pkts)/1000000.0);
+	printf(" - Sent:        %5.2f\n",
+			(app_stats.dist.sent_pkts -
+			prev_app_stats.dist.sent_pkts)/1000000.0);
+	printf(" - Dropped      %s%5.2f%s\n", ANSI_COLOR_RED,
+			(app_stats.dist.enqdrop_pkts -
+			prev_app_stats.dist.enqdrop_pkts)/1000000.0,
+			ANSI_COLOR_RESET);
+
+	printf("TX thread:\n");
+	printf(" - Dequeued:    %5.2f\n",
+			(app_stats.tx.dequeue_pkts -
+			prev_app_stats.tx.dequeue_pkts)/1000000.0);
+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		printf("Port %u Pktsout: %5.2f\n",
+				i, (app_stats.port_tx_pkts[i] -
+				prev_app_stats.port_tx_pkts[i])/1000000.0);
+		prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i];
+	}
+	printf(" - Transmitted: %5.2f\n",
+			(app_stats.tx.tx_pkts -
+			prev_app_stats.tx.tx_pkts)/1000000.0);
+	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
+			(app_stats.tx.enqdrop_pkts -
+			prev_app_stats.tx.enqdrop_pkts)/1000000.0,
+			ANSI_COLOR_RESET);
+
+	prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts;
+	prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts;
+	prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts;
+	prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts;
+	prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts;
+	prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts;
+	prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts;
+	prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts;
+	prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts;
+	prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts;
+	prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts;
+
+	for (i = 0; i < num_workers; i++) {
+		printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i,
+				(app_stats.worker_pkts[i] -
+				prev_app_stats.worker_pkts[i])/1000000.0);
+		for (int j = 0; j < 8; j++)
+			printf("%ld ", app_stats.worker_bursts[i][j]);
+		printf("\n");
+		prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i];
 	}
 }
 
 static int
 lcore_worker(struct lcore_params *p)
 {
-	struct rte_distributor *d = p->d;
+	struct rte_distributor_burst *d = p->d;
 	const unsigned id = p->worker_id;
+	unsigned int num = 0;
+
 	/*
 	 * for single port, xor_val will be zero so we won't modify the output
 	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
 	 */
 	const unsigned xor_val = (rte_eth_dev_count() > 1);
-	struct rte_mbuf *buf = NULL;
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+
+	for (int i = 0; i < 8; i++)
+		buf[i] = NULL;
+
+	app_stats.worker_pkts[p->worker_id] = 1;
+
 
 	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
-	while (!quit_signal) {
-		buf = rte_distributor_get_pkt(d, id, buf);
-		buf->port ^= xor_val;
+	while (!quit_signal_work) {
+
+#if BURST_API
+		num = rte_distributor_get_pkt_burst(d, id, buf, buf, num);
+		/* Do a little bit of work for each packet */
+		for (unsigned int i = 0; i < num; i++) {
+			uint64_t t = __rdtsc()+100;
+
+			while (__rdtsc() < t)
+				rte_pause();
+			buf[i]->port ^= xor_val;
+		}
+#else
+		buf[0] = rte_distributor_get_pkt(d, id, buf[0]);
+		uint64_t t = __rdtsc() + 10;
+
+		while (__rdtsc() < t)
+			rte_pause();
+		buf[0]->port ^= xor_val;
+#endif
+
+		app_stats.worker_pkts[p->worker_id] += num;
+		if (num > 0)
+			app_stats.worker_bursts[p->worker_id][num-1]++;
 	}
+	printf("\nCore %u exiting worker task.\n", rte_lcore_id());
 	return 0;
 }
 
@@ -496,12 +718,14 @@ int
 main(int argc, char *argv[])
 {
 	struct rte_mempool *mbuf_pool;
-	struct rte_distributor *d;
-	struct rte_ring *output_ring;
+	struct rte_distributor_burst *d;
+	struct rte_ring *dist_tx_ring;
+	struct rte_ring *rx_dist_ring;
 	unsigned lcore_id, worker_id = 0;
 	unsigned nb_ports;
 	uint8_t portid;
 	uint8_t nb_ports_available;
+	uint64_t t, freq;
 
 	/* catch ctrl-c so we can print on exit */
 	signal(SIGINT, int_handler);
@@ -518,10 +742,12 @@ main(int argc, char *argv[])
 	if (ret < 0)
 		rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
 
-	if (rte_lcore_count() < 3)
+	if (rte_lcore_count() < 5)
 		rte_exit(EXIT_FAILURE, "Error, This application needs at "
-				"least 3 logical cores to run:\n"
-				"1 lcore for packet RX and distribution\n"
+				"least 5 logical cores to run:\n"
+				"1 lcore for stats (can be core 0)\n"
+				"1 lcore for packet RX\n"
+				"1 lcore for distribution\n"
 				"1 lcore for packet TX\n"
 				"and at least 1 lcore for worker threads\n");
 
@@ -560,41 +786,86 @@ main(int argc, char *argv[])
 				"All available ports are disabled. Please set portmask.\n");
 	}
 
+#if BURST_API
+	d = rte_distributor_create_burst("PKT_DIST", rte_socket_id(),
+			rte_lcore_count() - 4);
+#else
 	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
-			rte_lcore_count() - 2);
+			rte_lcore_count() - 4);
+#endif
 	if (d == NULL)
 		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
 
 	/*
-	 * scheduler ring is read only by the transmitter core, but written to
-	 * by multiple threads
+	 * scheduler ring is read by the transmitter core, and written to
+	 * by scheduler core
 	 */
-	output_ring = rte_ring_create("Output_ring", RTE_RING_SZ,
-			rte_socket_id(), RING_F_SC_DEQ);
-	if (output_ring == NULL)
+	dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
+			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
+	if (dist_tx_ring == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
+
+	rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
+			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
+	if (rx_dist_ring == NULL)
-		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
+		rte_exit(EXIT_FAILURE, "Cannot create input ring\n");
 
 	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
-		if (worker_id == rte_lcore_count() - 2)
+		if (worker_id == rte_lcore_count() - 3) {
+			printf("Starting distributor on lcore_id %d\n",
+					lcore_id);
+			/* distributor core */
+			struct lcore_params *p =
+					rte_malloc(NULL, sizeof(*p), 0);
+			if (!p)
+				rte_panic("malloc failure\n");
+			*p = (struct lcore_params){worker_id, d,
+					rx_dist_ring, dist_tx_ring, mbuf_pool};
+			rte_eal_remote_launch(
+					(lcore_function_t *)lcore_distributor,
+					p, lcore_id);
+		} else if (worker_id == rte_lcore_count() - 4) {
+			printf("Starting tx on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
+			/* tx core */
 			rte_eal_remote_launch((lcore_function_t *)lcore_tx,
-					output_ring, lcore_id);
-		else {
+					dist_tx_ring, lcore_id);
+		} else if (worker_id == rte_lcore_count() - 2) {
+			printf("Starting rx on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
+			/* rx core */
+			struct lcore_params *p =
+					rte_malloc(NULL, sizeof(*p), 0);
+			if (!p)
+				rte_panic("malloc failure\n");
+			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
+					dist_tx_ring, mbuf_pool};
+			rte_eal_remote_launch((lcore_function_t *)lcore_rx,
+					p, lcore_id);
+		} else {
+			printf("Starting worker on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
 			struct lcore_params *p =
 					rte_malloc(NULL, sizeof(*p), 0);
 			if (!p)
 				rte_panic("malloc failure\n");
-			*p = (struct lcore_params){worker_id, d, output_ring, mbuf_pool};
+			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
+					dist_tx_ring, mbuf_pool};
 
 			rte_eal_remote_launch((lcore_function_t *)lcore_worker,
 					p, lcore_id);
 		}
 		worker_id++;
 	}
-	/* call lcore_main on master core only */
-	struct lcore_params p = { 0, d, output_ring, mbuf_pool};
 
-	if (lcore_rx(&p) != 0)
-		return -1;
+	freq = rte_get_timer_hz();
+	t = __rdtsc() + freq;
+	while (!quit_signal_dist) {
+		if (t < __rdtsc()) {
+			print_stats();
+			t = __rdtsc() + freq;
+		}
+	}
 
 	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
 		if (rte_eal_wait_lcore(lcore_id) < 0)
-- 
2.7.4
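
For readers following the reworked print_stats() above: every figure it prints is the difference between a running counter and the snapshot taken at the previous report, divided by 1000000.0, after which the snapshot is rolled forward. Since main() reports about once per second, the figures read as millions of packets per second. A minimal standalone sketch of that pattern follows. The struct and function names are hypothetical and are not part of the patch, which keeps separate app_stats and prev_app_stats structures instead.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical counter pair; the patch uses app_stats / prev_app_stats. */
struct pkt_counter {
	uint64_t cur;	/* running total, updated by the datapath */
	uint64_t prev;	/* value of cur at the previous report */
};

/*
 * Return the packets counted since the last call, in millions,
 * and roll the snapshot forward so the next call starts from here.
 */
static double
counter_delta_mpkts(struct pkt_counter *c)
{
	assert(c != NULL);

	const uint64_t delta = c->cur - c->prev;

	c->prev = c->cur;
	return delta / 1000000.0;
}
```

Keeping the subtraction and the snapshot update in one helper would avoid the long list of prev_app_stats assignments, at the cost of changing the stats layout; that is a design suggestion only, not something the series does.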


* [PATCH v2 2/5] test: unit tests for new distributor burst api
From: David Hunt @ 2016-12-22  4:37 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1482381428-148094-1-git-send-email-david.hunt@intel.com>

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 app/test/test_distributor.c | 500 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 391 insertions(+), 109 deletions(-)
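
A note on the pattern the burst tests below rely on: after rte_distributor_process_burst() the test repeatedly flushes and collects returned packets until the expected number has come back, and the big-batch case additionally caps the number of retries at 100 so a lost packet cannot hang the test. The sketch below shows that bounded drain loop in isolation, with no DPDK dependency. All names in it (collect_fn, drain_until, toy_source, toy_collect) are made up for illustration and are assumptions, not library or patch API.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical callback: flush pending work, then report how many
 * items came back since the previous call.
 */
typedef unsigned int (*collect_fn)(void *ctx);

/*
 * Keep collecting until 'expected' items are back or 'max_retries'
 * attempts have been made. Returns the number actually collected,
 * so the caller can tell a complete drain from a timeout.
 */
static unsigned int
drain_until(collect_fn collect, void *ctx, unsigned int expected,
		unsigned int max_retries)
{
	unsigned int total = 0;
	unsigned int retries = 0;

	assert(collect != NULL);

	while (total < expected && retries < max_retries) {
		total += collect(ctx);
		retries++;
	}
	return total;
}

/* Toy source used only to exercise drain_until(). */
struct toy_source {
	unsigned int remaining;
};

/* Hands back at most 8 items per call, mirroring the 8-packet bursts. */
static unsigned int
toy_collect(void *ctx)
{
	struct toy_source *s = ctx;
	const unsigned int n = s->remaining < 8 ? s->remaining : 8;

	s->remaining -= n;
	return n;
}
```

With a source holding all 32 items the loop finishes after four calls; with one holding only 20 it gives up after max_retries and returns 20, which the caller can report as missing packets.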

diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
index 85cb8f3..7738f04 100644
--- a/app/test/test_distributor.c
+++ b/app/test/test_distributor.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2017 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -40,11 +40,24 @@
 #include <rte_mempool.h>
 #include <rte_mbuf.h>
 #include <rte_distributor.h>
+#include <rte_distributor_burst.h>
 
 #define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
 #define BURST 32
 #define BIG_BATCH 1024
 
+#define DIST_SINGLE 0
+#define DIST_BURST  1
+#define DIST_NUM_TYPES 2
+
+struct worker_params {
+	struct rte_distributor *d;
+	struct rte_distributor_burst *db;
+	int dist_type;
+};
+
+struct worker_params worker_params;
+
 /* statics - all zero-initialized by default */
 static volatile int quit;      /**< general quit variable for all threads */
 static volatile int zero_quit; /**< var for when we just want thr0 to quit*/
@@ -81,17 +94,35 @@ static int
 handle_work(void *arg)
 {
 	struct rte_mbuf *pkt = NULL;
-	struct rte_distributor *d = arg;
-	unsigned count = 0;
-	unsigned id = __sync_fetch_and_add(&worker_idx, 1);
-
-	pkt = rte_distributor_get_pkt(d, id, NULL);
-	while (!quit) {
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+	struct worker_params *wp = arg;
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
+	unsigned int count = 0, num = 0;
+	unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+
+	if (wp->dist_type == DIST_SINGLE) {
+		pkt = rte_distributor_get_pkt(d, id, NULL);
+		while (!quit) {
+			worker_stats[id].handled_packets++, count++;
+			pkt = rte_distributor_get_pkt(d, id, pkt);
+		}
 		worker_stats[id].handled_packets++, count++;
-		pkt = rte_distributor_get_pkt(d, id, pkt);
+		rte_distributor_return_pkt(d, id, pkt);
+	} else {
+		for (int i = 0; i < 8; i++)
+			buf[i] = NULL;
+		num = rte_distributor_get_pkt_burst(db, id, buf, buf, num);
+		while (!quit) {
+			worker_stats[id].handled_packets += num;
+			count += num;
+			num = rte_distributor_get_pkt_burst(db, id,
+					buf, buf, num);
+		}
+		worker_stats[id].handled_packets += num;
+		count += num;
+		rte_distributor_return_pkt_burst(db, id, buf, num);
 	}
-	worker_stats[id].handled_packets++, count++;
-	rte_distributor_return_pkt(d, id, pkt);
 	return 0;
 }
 
@@ -107,12 +138,21 @@ handle_work(void *arg)
  *   not necessarily in the same order (as different flows).
  */
 static int
-sanity_test(struct rte_distributor *d, struct rte_mempool *p)
+sanity_test(struct worker_params *wp, struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	struct rte_mbuf *bufs[BURST];
-	unsigned i;
+	struct rte_mbuf *returns[BURST*2];
+	unsigned int i;
+	unsigned int retries;
+	unsigned int count = 0;
+
+	if (wp->dist_type == DIST_SINGLE)
+		printf("=== Basic distributor sanity tests (single) ===\n");
+	else
+		printf("=== Basic distributor sanity tests (burst) ===\n");
 
-	printf("=== Basic distributor sanity tests ===\n");
 	clear_packet_count();
 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
@@ -124,8 +164,21 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	for (i = 0; i < BURST; i++)
 		bufs[i]->hash.usr = 0;
 
-	rte_distributor_process(d, bufs, BURST);
-	rte_distributor_flush(d);
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, bufs, BURST);
+		rte_distributor_flush(d);
+	} else {
+		rte_distributor_process_burst(db, bufs, BURST);
+		count = 0;
+		do {
+
+			rte_distributor_flush_burst(db);
+			count += rte_distributor_returned_pkts_burst(db,
+					returns, BURST*2);
+		} while (count < BURST);
+	}
+
+
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -146,8 +199,18 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 		for (i = 0; i < BURST; i++)
 			bufs[i]->hash.usr = (i & 1) << 8;
 
-		rte_distributor_process(d, bufs, BURST);
-		rte_distributor_flush(d);
+		if (wp->dist_type == DIST_SINGLE) {
+			rte_distributor_process(d, bufs, BURST);
+			rte_distributor_flush(d);
+		} else {
+			rte_distributor_process_burst(db, bufs, BURST);
+			count = 0;
+			do {
+				rte_distributor_flush_burst(db);
+				count += rte_distributor_returned_pkts_burst(db,
+						returns, BURST*2);
+			} while (count < BURST);
+		}
 		if (total_packet_count() != BURST) {
 			printf("Line %d: Error, not all packets flushed. "
 					"Expected %u, got %u\n",
@@ -155,24 +218,32 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 			return -1;
 		}
 
+
 		for (i = 0; i < rte_lcore_count() - 1; i++)
 			printf("Worker %u handled %u packets\n", i,
 					worker_stats[i].handled_packets);
 		printf("Sanity test with two hash values done\n");
-
-		if (worker_stats[0].handled_packets != 16 ||
-				worker_stats[1].handled_packets != 16)
-			return -1;
 	}
 
 	/* give a different hash value to each packet,
 	 * so load gets distributed */
 	clear_packet_count();
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.usr = i;
+		bufs[i]->hash.usr = i+1;
+
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, bufs, BURST);
+		rte_distributor_flush(d);
+	} else {
+		rte_distributor_process_burst(db, bufs, BURST);
+		count = 0;
+		do {
+			rte_distributor_flush_burst(db);
+			count += rte_distributor_returned_pkts_burst(db,
+					returns, BURST*2);
+		} while (count < BURST);
+	}
 
-	rte_distributor_process(d, bufs, BURST);
-	rte_distributor_flush(d);
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -194,8 +265,15 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	unsigned num_returned = 0;
 
 	/* flush out any remaining packets */
-	rte_distributor_flush(d);
-	rte_distributor_clear_returns(d);
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_flush(d);
+		rte_distributor_clear_returns(d);
+	} else {
+		rte_distributor_flush_burst(db);
+		rte_distributor_clear_returns_burst(db);
+	}
+
+
 	if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
 		return -1;
@@ -203,28 +281,59 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	for (i = 0; i < BIG_BATCH; i++)
 		many_bufs[i]->hash.usr = i << 2;
 
-	for (i = 0; i < BIG_BATCH/BURST; i++) {
-		rte_distributor_process(d, &many_bufs[i*BURST], BURST);
+	if (wp->dist_type == DIST_SINGLE) {
+		printf("===testing single big burst===\n");
+		for (i = 0; i < BIG_BATCH/BURST; i++) {
+			rte_distributor_process(d, &many_bufs[i*BURST], BURST);
+			num_returned += rte_distributor_returned_pkts(d,
+					&return_bufs[num_returned],
+					BIG_BATCH - num_returned);
+		}
+		rte_distributor_flush(d);
 		num_returned += rte_distributor_returned_pkts(d,
 				&return_bufs[num_returned],
 				BIG_BATCH - num_returned);
+	} else {
+		printf("===testing burst big burst===\n");
+		for (i = 0; i < BIG_BATCH/BURST; i++) {
+			rte_distributor_process_burst(db,
+					&many_bufs[i*BURST], BURST);
+			count = rte_distributor_returned_pkts_burst(db,
+					&return_bufs[num_returned],
+					BIG_BATCH - num_returned);
+			num_returned += count;
+		}
+		rte_distributor_flush_burst(db);
+		count = rte_distributor_returned_pkts_burst(db,
+				&return_bufs[num_returned],
+				BIG_BATCH - num_returned);
+		num_returned += count;
 	}
-	rte_distributor_flush(d);
-	num_returned += rte_distributor_returned_pkts(d,
-			&return_bufs[num_returned], BIG_BATCH - num_returned);
+	retries = 0;
+	do {
+		rte_distributor_flush_burst(db);
+		count = rte_distributor_returned_pkts_burst(db,
+				&return_bufs[num_returned],
+				BIG_BATCH - num_returned);
+		num_returned += count;
+		retries++;
+	} while ((num_returned < BIG_BATCH) && (retries < 100));
+
 
 	if (num_returned != BIG_BATCH) {
-		printf("line %d: Number returned is not the same as "
-				"number sent\n", __LINE__);
+		printf("line %d: Missing packets, expected %d, got %u\n",
+				__LINE__, BIG_BATCH, num_returned);
 		return -1;
 	}
+
 	/* big check -  make sure all packets made it back!! */
 	for (i = 0; i < BIG_BATCH; i++) {
 		unsigned j;
 		struct rte_mbuf *src = many_bufs[i];
-		for (j = 0; j < BIG_BATCH; j++)
+		for (j = 0; j < BIG_BATCH; j++) {
 			if (return_bufs[j] == src)
 				break;
+		}
 
 		if (j == BIG_BATCH) {
 			printf("Error: could not find source packet #%u\n", i);
@@ -234,7 +343,6 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	printf("Sanity test of returned packets done\n");
 
 	rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
-
 	printf("\n");
 	return 0;
 }
@@ -249,18 +357,40 @@ static int
 handle_work_with_free_mbufs(void *arg)
 {
 	struct rte_mbuf *pkt = NULL;
-	struct rte_distributor *d = arg;
-	unsigned count = 0;
-	unsigned id = __sync_fetch_and_add(&worker_idx, 1);
-
-	pkt = rte_distributor_get_pkt(d, id, NULL);
-	while (!quit) {
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+	struct worker_params *wp = arg;
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
+	unsigned int count = 0;
+	unsigned int i = 0;
+	unsigned int num = 0;
+	unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+
+	if (wp->dist_type == DIST_SINGLE) {
+		pkt = rte_distributor_get_pkt(d, id, NULL);
+		while (!quit) {
+			worker_stats[id].handled_packets++, count++;
+			rte_pktmbuf_free(pkt);
+			pkt = rte_distributor_get_pkt(d, id, pkt);
+		}
 		worker_stats[id].handled_packets++, count++;
-		rte_pktmbuf_free(pkt);
-		pkt = rte_distributor_get_pkt(d, id, pkt);
+		rte_distributor_return_pkt(d, id, pkt);
+	} else {
+		for (i = 0; i < 8; i++)
+			buf[i] = NULL;
+		num = rte_distributor_get_pkt_burst(db, id, buf, buf, num);
+		while (!quit) {
+			worker_stats[id].handled_packets += num;
+			count += num;
+			for (i = 0; i < num; i++)
+				rte_pktmbuf_free(buf[i]);
+			num = rte_distributor_get_pkt_burst(db,
+					id, buf, buf, num);
+		}
+		worker_stats[id].handled_packets += num;
+		count += num;
+		rte_distributor_return_pkt_burst(db, id, buf, num);
 	}
-	worker_stats[id].handled_packets++, count++;
-	rte_distributor_return_pkt(d, id, pkt);
 	return 0;
 }
 
@@ -270,26 +400,45 @@ handle_work_with_free_mbufs(void *arg)
  * library.
  */
 static int
-sanity_test_with_mbuf_alloc(struct rte_distributor *d, struct rte_mempool *p)
+sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	unsigned i;
 	struct rte_mbuf *bufs[BURST];
 
-	printf("=== Sanity test with mbuf alloc/free  ===\n");
+	if (wp->dist_type == DIST_SINGLE)
+		printf("=== Sanity test with mbuf alloc/free (single) ===\n");
+	else
+		printf("=== Sanity test with mbuf alloc/free (burst)  ===\n");
+
 	clear_packet_count();
 	for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
 		unsigned j;
-		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
-			rte_distributor_process(d, NULL, 0);
+		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0) {
+			if (wp->dist_type == DIST_SINGLE)
+				rte_distributor_process(d, NULL, 0);
+			else
+				rte_distributor_process_burst(db, NULL, 0);
+		}
 		for (j = 0; j < BURST; j++) {
 			bufs[j]->hash.usr = (i+j) << 1;
 			rte_mbuf_refcnt_set(bufs[j], 1);
 		}
 
-		rte_distributor_process(d, bufs, BURST);
+		if (wp->dist_type == DIST_SINGLE)
+			rte_distributor_process(d, bufs, BURST);
+		else
+			rte_distributor_process_burst(db, bufs, BURST);
 	}
 
-	rte_distributor_flush(d);
+	if (wp->dist_type == DIST_SINGLE)
+		rte_distributor_flush(d);
+	else
+		rte_distributor_flush_burst(db);
+
+	rte_delay_us(10000);
+
 	if (total_packet_count() < (1<<ITER_POWER)) {
 		printf("Line %u: Packet count is incorrect, %u, expected %u\n",
 				__LINE__, total_packet_count(),
@@ -305,20 +454,48 @@ static int
 handle_work_for_shutdown_test(void *arg)
 {
 	struct rte_mbuf *pkt = NULL;
-	struct rte_distributor *d = arg;
-	unsigned count = 0;
-	const unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+	struct worker_params *wp = arg;
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
+	unsigned int count = 0;
+	unsigned int num = 0;
+	unsigned int total = 0;
+	unsigned int i;
+	unsigned int returned = 0;
+	const unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+
+	if (wp->dist_type == DIST_SINGLE)
+		pkt = rte_distributor_get_pkt(d, id, NULL);
+	else
+		num = rte_distributor_get_pkt_burst(db, id, buf, buf, num);
 
-	pkt = rte_distributor_get_pkt(d, id, NULL);
 	/* wait for quit single globally, or for worker zero, wait
 	 * for zero_quit */
 	while (!quit && !(id == 0 && zero_quit)) {
-		worker_stats[id].handled_packets++, count++;
-		rte_pktmbuf_free(pkt);
-		pkt = rte_distributor_get_pkt(d, id, NULL);
+		if (wp->dist_type == DIST_SINGLE) {
+			worker_stats[id].handled_packets++, count++;
+			rte_pktmbuf_free(pkt);
+			pkt = rte_distributor_get_pkt(d, id, NULL);
+			num = 1;
+			total += num;
+		} else {
+			worker_stats[id].handled_packets += num;
+			count += num;
+			for (i = 0; i < num; i++)
+				rte_pktmbuf_free(buf[i]);
+			num = rte_distributor_get_pkt_burst(db,
+					id, buf, buf, num);
+			total += num;
+		}
+	}
+	worker_stats[id].handled_packets += num;
+	count += num;
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_return_pkt(d, id, pkt);
+	} else {
+		returned = rte_distributor_return_pkt_burst(db, id, buf, num);
 	}
-	worker_stats[id].handled_packets++, count++;
-	rte_distributor_return_pkt(d, id, pkt);
 
 	if (id == 0) {
 		/* for worker zero, allow it to restart to pick up last packet
@@ -326,13 +503,29 @@ handle_work_for_shutdown_test(void *arg)
 		 */
 		while (zero_quit)
 			usleep(100);
-		pkt = rte_distributor_get_pkt(d, id, NULL);
+		if (wp->dist_type == DIST_SINGLE) {
+			pkt = rte_distributor_get_pkt(d, id, NULL);
+		} else {
+			num = rte_distributor_get_pkt_burst(db,
+					id, buf, buf, num);
+		}
 		while (!quit) {
 			worker_stats[id].handled_packets++, count++;
 			rte_pktmbuf_free(pkt);
-			pkt = rte_distributor_get_pkt(d, id, NULL);
+			if (wp->dist_type == DIST_SINGLE) {
+				pkt = rte_distributor_get_pkt(d, id, NULL);
+			} else {
+				num = rte_distributor_get_pkt_burst(db,
+						id, buf, buf, num);
+			}
+		}
+		if (wp->dist_type == DIST_SINGLE) {
+			rte_distributor_return_pkt(d, id, pkt);
+		} else {
+			returned = rte_distributor_return_pkt_burst(db,
+					id, buf, num);
+			printf("Num returned = %d\n", returned);
 		}
-		rte_distributor_return_pkt(d, id, pkt);
 	}
 	return 0;
 }
@@ -344,26 +537,37 @@ handle_work_for_shutdown_test(void *arg)
  * library.
  */
 static int
-sanity_test_with_worker_shutdown(struct rte_distributor *d,
+sanity_test_with_worker_shutdown(struct worker_params *wp,
 		struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	struct rte_mbuf *bufs[BURST];
 	unsigned i;
 
 	printf("=== Sanity test of worker shutdown ===\n");
 
 	clear_packet_count();
+
 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
 		return -1;
 	}
 
-	/* now set all hash values in all buffers to zero, so all pkts go to the
-	 * one worker thread */
+	/*
+	 * Now set all hash values in all buffers to same value so all
+	 * pkts go to the one worker thread
+	 */
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.usr = 0;
+		bufs[i]->hash.usr = 1;
+
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, bufs, BURST);
+	} else {
+		rte_distributor_process_burst(db, bufs, BURST);
+		rte_distributor_flush_burst(db);
+	}
 
-	rte_distributor_process(d, bufs, BURST);
 	/* at this point, we will have processed some packets and have a full
 	 * backlog for the other ones at worker 0.
 	 */
@@ -374,14 +578,25 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
 		return -1;
 	}
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.usr = 0;
+		bufs[i]->hash.usr = 1;
 
 	/* get worker zero to quit */
 	zero_quit = 1;
-	rte_distributor_process(d, bufs, BURST);
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, bufs, BURST);
+		/* flush the distributor */
+		rte_distributor_flush(d);
+	} else {
+		rte_distributor_process_burst(db, bufs, BURST);
+		/* flush the distributor */
+		rte_distributor_flush_burst(db);
+	}
+	rte_delay_us(10000);
+
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+				worker_stats[i].handled_packets);
 
-	/* flush the distributor */
-	rte_distributor_flush(d);
 	if (total_packet_count() != BURST * 2) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -389,10 +604,6 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
 		return -1;
 	}
 
-	for (i = 0; i < rte_lcore_count() - 1; i++)
-		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
-
 	printf("Sanity test with worker shutdown passed\n\n");
 	return 0;
 }
@@ -401,13 +612,18 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
  * one worker shuts down..
  */
 static int
-test_flush_with_worker_shutdown(struct rte_distributor *d,
+test_flush_with_worker_shutdown(struct worker_params *wp,
 		struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	struct rte_mbuf *bufs[BURST];
 	unsigned i;
 
-	printf("=== Test flush fn with worker shutdown ===\n");
+	if (wp->dist_type == DIST_SINGLE)
+		printf("=== Test flush fn with worker shutdown (single) ===\n");
+	else
+		printf("=== Test flush fn with worker shutdown (burst) ===\n");
 
 	clear_packet_count();
 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
@@ -420,7 +636,11 @@ test_flush_with_worker_shutdown(struct rte_distributor *d,
 	for (i = 0; i < BURST; i++)
 		bufs[i]->hash.usr = 0;
 
-	rte_distributor_process(d, bufs, BURST);
+	if (wp->dist_type == DIST_SINGLE)
+		rte_distributor_process(d, bufs, BURST);
+	else
+		rte_distributor_process_burst(db, bufs, BURST);
+
 	/* at this point, we will have processed some packets and have a full
 	 * backlog for the other ones at worker 0.
 	 */
@@ -429,9 +649,18 @@ test_flush_with_worker_shutdown(struct rte_distributor *d,
 	zero_quit = 1;
 
 	/* flush the distributor */
-	rte_distributor_flush(d);
+	if (wp->dist_type == DIST_SINGLE)
+		rte_distributor_flush(d);
+	else
+		rte_distributor_flush_burst(db);
+
+	rte_delay_us(10000);
 
 	zero_quit = 0;
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+				worker_stats[i].handled_packets);
+
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -439,10 +668,6 @@ test_flush_with_worker_shutdown(struct rte_distributor *d,
 		return -1;
 	}
 
-	for (i = 0; i < rte_lcore_count() - 1; i++)
-		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
-
 	printf("Flush test with worker shutdown passed\n\n");
 	return 0;
 }
@@ -451,6 +676,7 @@ static
 int test_error_distributor_create_name(void)
 {
 	struct rte_distributor *d = NULL;
+	struct rte_distributor_burst *db = NULL;
 	char *name = NULL;
 
 	d = rte_distributor_create(name, rte_socket_id(),
@@ -460,6 +686,13 @@ int test_error_distributor_create_name(void)
 		return -1;
 	}
 
+	db = rte_distributor_create_burst(name, rte_socket_id(),
+			rte_lcore_count() - 1);
+	if (db != NULL || rte_errno != EINVAL) {
+		printf("ERROR: No error on create_burst() with NULL param\n");
+		return -1;
+	}
+
 	return 0;
 }
 
@@ -468,20 +701,32 @@ static
 int test_error_distributor_create_numworkers(void)
 {
 	struct rte_distributor *d = NULL;
+	struct rte_distributor_burst *db = NULL;
+
 	d = rte_distributor_create("test_numworkers", rte_socket_id(),
 			RTE_MAX_LCORE + 10);
 	if (d != NULL || rte_errno != EINVAL) {
 		printf("ERROR: No error on create() with num_workers > MAX\n");
 		return -1;
 	}
+
+	db = rte_distributor_create_burst("test_numworkers", rte_socket_id(),
+			RTE_MAX_LCORE + 10);
+	if (db != NULL || rte_errno != EINVAL) {
+		printf("ERROR: No error on create_burst() num_workers > MAX\n");
+		return -1;
+	}
+
 	return 0;
 }
 
 
 /* Useful function which ensures that all worker functions terminate */
 static void
-quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+quit_workers(struct worker_params *wp, struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	const unsigned num_workers = rte_lcore_count() - 1;
 	unsigned i;
 	struct rte_mbuf *bufs[RTE_MAX_LCORE];
@@ -491,12 +736,20 @@ quit_workers(struct rte_distributor *d, struct rte_mempool *p)
 	quit = 1;
 	for (i = 0; i < num_workers; i++)
 		bufs[i]->hash.usr = i << 1;
-	rte_distributor_process(d, bufs, num_workers);
+	if (wp->dist_type == DIST_SINGLE)
+		rte_distributor_process(d, bufs, num_workers);
+	else
+		rte_distributor_process_burst(db, bufs, num_workers);
 
 	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
 
-	rte_distributor_process(d, NULL, 0);
-	rte_distributor_flush(d);
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, NULL, 0);
+		rte_distributor_flush(d);
+	} else {
+		rte_distributor_process_burst(db, NULL, 0);
+		rte_distributor_flush_burst(db);
+	}
 	rte_eal_mp_wait_lcore();
 	quit = 0;
 	worker_idx = 0;
@@ -506,7 +759,9 @@ static int
 test_distributor(void)
 {
 	static struct rte_distributor *d;
+	static struct rte_distributor_burst *db;
 	static struct rte_mempool *p;
+	int i;
 
 	if (rte_lcore_count() < 2) {
 		printf("ERROR: not enough cores to test distributor\n");
@@ -525,6 +780,19 @@ test_distributor(void)
 		rte_distributor_clear_returns(d);
 	}
 
+	if (db == NULL) {
+		db = rte_distributor_create_burst("Test_dist_burst",
+				rte_socket_id(),
+				rte_lcore_count() - 1);
+		if (db == NULL) {
+			printf("Error creating burst distributor\n");
+			return -1;
+		}
+	} else {
+		rte_distributor_flush_burst(db);
+		rte_distributor_clear_returns_burst(db);
+	}
+
 	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
 			(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
 	if (p == NULL) {
@@ -536,31 +804,45 @@ test_distributor(void)
 		}
 	}
 
-	rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
-	if (sanity_test(d, p) < 0)
-		goto err;
-	quit_workers(d, p);
+	worker_params.d = d;
+	worker_params.db = db;
 
-	rte_eal_mp_remote_launch(handle_work_with_free_mbufs, d, SKIP_MASTER);
-	if (sanity_test_with_mbuf_alloc(d, p) < 0)
-		goto err;
-	quit_workers(d, p);
+	for (i = 0; i < DIST_NUM_TYPES; i++) {
 
-	if (rte_lcore_count() > 2) {
-		rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d,
-				SKIP_MASTER);
-		if (sanity_test_with_worker_shutdown(d, p) < 0)
-			goto err;
-		quit_workers(d, p);
+		worker_params.dist_type = i;
 
-		rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d,
-				SKIP_MASTER);
-		if (test_flush_with_worker_shutdown(d, p) < 0)
+		rte_eal_mp_remote_launch(handle_work,
+				&worker_params, SKIP_MASTER);
+		if (sanity_test(&worker_params, p) < 0)
 			goto err;
-		quit_workers(d, p);
+		quit_workers(&worker_params, p);
 
-	} else {
-		printf("Not enough cores to run tests for worker shutdown\n");
+		rte_eal_mp_remote_launch(handle_work_with_free_mbufs,
+				&worker_params, SKIP_MASTER);
+		if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0)
+			goto err;
+		quit_workers(&worker_params, p);
+
+		if (rte_lcore_count() > 2) {
+			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
+					&worker_params,
+					SKIP_MASTER);
+			if (sanity_test_with_worker_shutdown(&worker_params,
+					p) < 0)
+				goto err;
+			quit_workers(&worker_params, p);
+
+			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
+					&worker_params,
+					SKIP_MASTER);
+			if (test_flush_with_worker_shutdown(&worker_params,
+					p) < 0)
+				goto err;
+			quit_workers(&worker_params, p);
+
+		} else {
+			printf("Too few cores to run worker shutdown test\n");
+		}
 	}
 
 	if (test_error_distributor_create_numworkers() == -1 ||
@@ -572,7 +854,7 @@ test_distributor(void)
 	return 0;
 
 err:
-	quit_workers(d, p);
+	quit_workers(&worker_params, p);
 	return -1;
 }
 
-- 
2.7.4

^ permalink raw reply related

* [PATCH v2 1/5] lib: distributor performance enhancements
From: David Hunt @ 2016-12-22  4:37 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1482381428-148094-1-git-send-email-david.hunt@intel.com>

The distributor now sends bursts of up to 8 mbufs to each worker and
tracks the in-flight flow IDs to preserve atomic scheduling.
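
As a rough illustration of the flow tracking (a simplified sketch, not the
code in this patch): for each incoming tag, find the worker that already
holds that tag so the packet can be pinned to it. The names match_flows and
BURST_SIZE are made up for this example, and a tag of 0 is assumed to mark
an empty slot.

```c
#include <stdint.h>

#define BURST_SIZE 8	/* assumed burst width, mirroring the 8-mbuf bursts */

/*
 * Hypothetical helper, not part of the patch.
 * inflight[w][] holds the tags worker w is currently processing.
 * On return, out[i] is the 1-based ID of the worker that already holds
 * incoming[i], or 0 if no worker holds it.
 */
void
match_flows(uint16_t inflight[][BURST_SIZE], unsigned int num_workers,
		const uint16_t incoming[BURST_SIZE], uint16_t out[BURST_SIZE])
{
	unsigned int w, i, j;

	for (i = 0; i < BURST_SIZE; i++)
		out[i] = 0;

	for (w = 0; w < num_workers; w++)
		for (i = 0; i < BURST_SIZE; i++) {
			if (incoming[i] == 0)
				continue;	/* empty slot, nothing to pin */
			for (j = 0; j < BURST_SIZE; j++)
				if (inflight[w][j] == incoming[i]) {
					out[i] = (uint16_t)(w + 1);
					break;
				}
		}
}
```

A result of 0 means the flow is not pinned yet, so that packet may be given
to any worker.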

Add a new file with a new API, similar to the old API except that the
function names end in _burst.
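
Like the old API, the burst functions pass each mbuf pointer through a
64-bit slot whose bottom four bits carry handshake flags. A minimal sketch
of that packing follows. It assumes the top four bits of a pointer are
unused (true for 48-bit x86-64 user-space addresses) and that right shifts
of signed values are arithmetic, as on gcc and clang. The names pack_slot,
unpack_ptr and slot_flags are illustrative only, not library functions.

```c
#include <stdint.h>

#define FLAG_BITS	4	/* same value as RTE_DISTRIB_FLAG_BITS */
#define FLAGS_MASK	0x0F	/* same value as RTE_DISTRIB_FLAGS_MASK */
#define GET_BUF		1	/* same value as RTE_DISTRIB_GET_BUF */
#define RETURN_BUF	2	/* same value as RTE_DISTRIB_RETURN_BUF */

/* Shift the pointer up by four bits and store the flags below it. */
int64_t
pack_slot(void *pkt, int64_t flags)
{
	/* shift as unsigned to avoid overflowing a signed left shift */
	uint64_t shifted = (uint64_t)(uintptr_t)pkt << FLAG_BITS;

	return (int64_t)shifted | (flags & FLAGS_MASK);
}

/* Recover the pointer; the shift drops the flag bits. */
void *
unpack_ptr(int64_t slot)
{
	return (void *)(uintptr_t)(slot >> FLAG_BITS);
}

/* Extract only the flag bits. */
int64_t
slot_flags(int64_t slot)
{
	return slot & FLAGS_MASK;
}
```

Because the flags live below the pointer, either side can set or clear a
handshake bit on a slot without disturbing the pointer stored in it.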

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 lib/librte_distributor/Makefile                |   2 +
 lib/librte_distributor/rte_distributor.c       |  72 +--
 lib/librte_distributor/rte_distributor_burst.c | 642 +++++++++++++++++++++++++
 lib/librte_distributor/rte_distributor_burst.h | 255 ++++++++++
 lib/librte_distributor/rte_distributor_priv.h  | 190 ++++++++
 5 files changed, 1090 insertions(+), 71 deletions(-)
 create mode 100644 lib/librte_distributor/rte_distributor_burst.c
 create mode 100644 lib/librte_distributor/rte_distributor_burst.h
 create mode 100644 lib/librte_distributor/rte_distributor_priv.h

diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile
index 4c9af17..2acc54d 100644
--- a/lib/librte_distributor/Makefile
+++ b/lib/librte_distributor/Makefile
@@ -43,9 +43,11 @@ LIBABIVER := 1
 
 # all source are stored in SRCS-y
 SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
+SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += rte_distributor_burst.c
 
 # install this header file
 SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
+SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include += rte_distributor_burst.h
 
 # this lib needs eal
 DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_eal
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index f3f778c..c05f6e3 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -40,79 +40,9 @@
 #include <rte_errno.h>
 #include <rte_string_fns.h>
 #include <rte_eal_memconfig.h>
+#include "rte_distributor_priv.h"
 #include "rte_distributor.h"
 
-#define NO_FLAGS 0
-#define RTE_DISTRIB_PREFIX "DT_"
-
-/* we will use the bottom four bits of pointer for flags, shifting out
- * the top four bits to make room (since a 64-bit pointer actually only uses
- * 48 bits). An arithmetic-right-shift will then appropriately restore the
- * original pointer value with proper sign extension into the top bits. */
-#define RTE_DISTRIB_FLAG_BITS 4
-#define RTE_DISTRIB_FLAGS_MASK (0x0F)
-#define RTE_DISTRIB_NO_BUF 0       /**< empty flags: no buffer requested */
-#define RTE_DISTRIB_GET_BUF (1)    /**< worker requests a buffer, returns old */
-#define RTE_DISTRIB_RETURN_BUF (2) /**< worker returns a buffer, no request */
-
-#define RTE_DISTRIB_BACKLOG_SIZE 8
-#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
-
-#define RTE_DISTRIB_MAX_RETURNS 128
-#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
-
-/**
- * Maximum number of workers allowed.
- * Be aware of increasing the limit, becaus it is limited by how we track
- * in-flight tags. See @in_flight_bitmask and @rte_distributor_process
- */
-#define RTE_DISTRIB_MAX_WORKERS	64
-
-/**
- * Buffer structure used to pass the pointer data between cores. This is cache
- * line aligned, but to improve performance and prevent adjacent cache-line
- * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
- * the next cache line to worker 0, we pad this out to three cache lines.
- * Only 64-bits of the memory is actually used though.
- */
-union rte_distributor_buffer {
-	volatile int64_t bufptr64;
-	char pad[RTE_CACHE_LINE_SIZE*3];
-} __rte_cache_aligned;
-
-struct rte_distributor_backlog {
-	unsigned start;
-	unsigned count;
-	int64_t pkts[RTE_DISTRIB_BACKLOG_SIZE];
-};
-
-struct rte_distributor_returned_pkts {
-	unsigned start;
-	unsigned count;
-	struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS];
-};
-
-struct rte_distributor {
-	TAILQ_ENTRY(rte_distributor) next;    /**< Next in list. */
-
-	char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
-	unsigned num_workers;                 /**< Number of workers polling */
-
-	uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS];
-		/**< Tracks the tag being processed per core */
-	uint64_t in_flight_bitmask;
-		/**< on/off bits for in-flight tags.
-		 * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then
-		 * the bitmask has to expand.
-		 */
-
-	struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS];
-
-	union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS];
-
-	struct rte_distributor_returned_pkts returns;
-};
-
 TAILQ_HEAD(rte_distributor_list, rte_distributor);
 
 static struct rte_tailq_elem rte_distributor_tailq = {
diff --git a/lib/librte_distributor/rte_distributor_burst.c b/lib/librte_distributor/rte_distributor_burst.c
new file mode 100644
index 0000000..9d9ae2d
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor_burst.c
@@ -0,0 +1,642 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <sys/queue.h>
+#include <string.h>
+#include <rte_mbuf.h>
+#include <rte_memory.h>
+#include <rte_cycles.h>
+#include <rte_memzone.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_eal_memconfig.h>
+#include "rte_distributor_priv.h"
+#include "rte_distributor_burst.h"
+#include "smmintrin.h"
+
+TAILQ_HEAD(rte_dist_burst_list, rte_distributor_burst);
+
+static struct rte_tailq_elem rte_dist_burst_tailq = {
+	.name = "RTE_DIST_BURST",
+};
+EAL_REGISTER_TAILQ(rte_dist_burst_tailq)
+
+/**** APIs called by workers ****/
+
+/**** Burst Packet APIs called by workers ****/
+
+/* This function should really be called return_pkt_burst() */
+void
+rte_distributor_request_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **oldpkt,
+		unsigned int count)
+{
+	struct rte_distributor_buffer_burst *buf = &(d->bufs[worker_id]);
+	unsigned int i;
+
+	volatile int64_t *retptr64;
+
+
+	/* if we don't have any packets to return, return. */
+	if (count == 0)
+		return;
+
+	retptr64 = &(buf->retptr64[0]);
+	/* Spin while handshake bits are set (scheduler clears it) */
+	while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
+		rte_pause();
+		uint64_t t = __rdtsc()+100;
+
+		while (__rdtsc() < t)
+			rte_pause();
+	}
+
+	/*
+	 * OK, if we've got here, then the scheduler has just cleared the
+	 * handshake bits. Populate the retptrs with returning packets.
+	 */
+
+	for (i = count; i < RTE_DIST_BURST_SIZE; i++)
+		buf->retptr64[i] = 0;
+
+	/* Set Return bit for each packet returned */
+	for (i = count; i-- > 0; )
+		buf->retptr64[i] =
+			(((int64_t)(uintptr_t)(oldpkt[i])) <<
+			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+
+	/*
+	 * Finally, set the GET_BUF bit to signal to the distributor that
+	 * the cache line is ready for processing
+	 */
+	*retptr64 |= RTE_DISTRIB_GET_BUF;
+}
+
+int
+rte_distributor_poll_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **pkts)
+{
+	struct rte_distributor_buffer_burst *buf = &d->bufs[worker_id];
+	uint64_t ret;
+	int count = 0;
+
+	/* If bit is set, return */
+	if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
+		return 0;
+
+	/* since bufptr64 is signed, this should be an arithmetic shift */
+	for (unsigned int i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+		if (likely(buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)) {
+			ret = buf->bufptr64[i] >> RTE_DISTRIB_FLAG_BITS;
+			pkts[count++] = (struct rte_mbuf *)((uintptr_t)(ret));
+		}
+	}
+
+	/*
+	 * so now we've got the contents of the cacheline into an  array of
+	 * mbuf pointers, so toggle the bit so scheduler can start working
+	 * on the next cacheline while we're working.
+	 */
+	buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;
+
+
+	return count;
+}
+
+int
+rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **pkts,
+		struct rte_mbuf **oldpkt, unsigned int return_count)
+{
+	unsigned int count;
+	uint64_t retries = 0;
+
+	rte_distributor_request_pkt_burst(d, worker_id, oldpkt, return_count);
+
+	count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
+	while (count == 0) {
+		rte_pause();
+		retries++;
+		if (retries > 1000) {
+			retries = 0;
+			return 0;
+		}
+		uint64_t t = __rdtsc()+100;
+
+		while (__rdtsc() < t)
+			rte_pause();
+
+		count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
+	}
+	return count;
+}
+
+int
+rte_distributor_return_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
+{
+	struct rte_distributor_buffer_burst *buf = &d->bufs[worker_id];
+	unsigned int i;
+
+	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
+		/* Switch off the return bit first */
+		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+
+	for (i = num; i-- > 0; )
+		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
+			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+
+	/* set the GET_BUF bit even if we got no returns */
+	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
+
+	return 0;
+}
+
+/**** APIs called on distributor core ***/
+
+/* stores a packet returned from a worker inside the returns array */
+static inline void
+store_return(uintptr_t oldbuf, struct rte_distributor_burst *d,
+		unsigned int *ret_start, unsigned int *ret_count)
+{
+	if (!oldbuf)
+		return;
+	/* store returns in a circular buffer */
+	d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
+			= (void *)oldbuf;
+	*ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK);
+	*ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK);
+}
+
+#ifdef RTE_MACHINE_CPUFLAG_SSE2
+static inline void
+find_match_sse2(struct rte_distributor_burst *d,
+			uint16_t *data_ptr,
+			uint16_t *output_ptr)
+{
+	/* Setup */
+	__m128i incoming_fids;
+	__m128i inflight_fids;
+	__m128i preflight_fids;
+	__m128i wkr;
+	__m128i mask1;
+	__m128i mask2;
+	__m128i output;
+	struct rte_distributor_backlog *bl;
+
+	/*
+	 * Function overview:
+	 * 1. Loop through all worker IDs
+	 *  1a. Load the current inflights for that worker into an xmm reg
+	 *  1b. Load the current backlog for that worker into an xmm reg
+	 *  1c. Use cmpestrm to intersect flow_ids with backlog and inflights
+	 *  1d. Add any matches to the output
+	 * 2. Write the output xmm (matching worker ids).
+	 */
+
+
+	output = _mm_set1_epi16(0);
+	incoming_fids = _mm_load_si128((__m128i *)data_ptr);
+
+	for (uint16_t i = 0; i < d->num_workers; i++) {
+		bl = &d->backlog[i];
+
+		inflight_fids =
+			_mm_load_si128((__m128i *)&(d->in_flight_tags[i]));
+		preflight_fids =
+			_mm_load_si128((__m128i *)(bl->tags));
+
+		/*
+		 * Any incoming_fid that exists anywhere in inflight_fids will
+		 * have 0xffff in same position of the mask as the incoming fid
+		 * Example (shortened to bytes for brevity):
+		 * incoming_fids   0x01 0x02 0x03 0x04 0x05 0x06 0x07 0x08
+		 * inflight_fids   0x03 0x05 0x07 0x00 0x00 0x00 0x00 0x00
+		 * mask            0x00 0x00 0xff 0x00 0xff 0x00 0xff 0x00
+		 */
+
+		mask1 = _mm_cmpestrm(inflight_fids, 8, incoming_fids, 8,
+			_SIDD_UWORD_OPS |
+			_SIDD_CMP_EQUAL_ANY |
+			_SIDD_UNIT_MASK);
+		mask2 = _mm_cmpestrm(preflight_fids, 8, incoming_fids, 8,
+			_SIDD_UWORD_OPS |
+			_SIDD_CMP_EQUAL_ANY |
+			_SIDD_UNIT_MASK);
+
+		mask1 = _mm_or_si128(mask1, mask2);
+		/*
+		 * Now mask contains 0xffff where there's a match.
+		 * Next we need to store the worker_id in the relevant position
+		 * in the output.
+		 */
+
+		wkr = _mm_set1_epi16(i+1);
+		mask1 = _mm_and_si128(mask1, wkr);
+		output = _mm_or_si128(mask1, output);
+	}
+
+	/*
+	 * At this stage, the 128-bit output holds 8 16-bit values. Each
+	 * non-zero value is the ID (+1) of the worker to which the
+	 * corresponding flow is pinned.
+	 */
+	_mm_store_si128((__m128i *)output_ptr, output);
+}
+#endif
+
+static inline void
+find_match_scalar(struct rte_distributor_burst *d,
+			uint16_t *data_ptr,
+			uint16_t *output_ptr)
+{
+	struct rte_distributor_backlog *bl;
+	uint16_t i, j, w;
+
+	/*
+	 * Function overview:
+	 * 1. Loop through all worker IDs
+	 * 2. Compare the current inflights to the incoming tags
+	 * 3. Compare the current backlog to the incoming tags
+	 * 4. Add any matches to the output
+	 */
+
+	for (j = 0 ; j < 8; j++)
+		output_ptr[j] = 0;
+
+	for (i = 0; i < d->num_workers; i++) {
+		bl = &d->backlog[i];
+
+		for (j = 0; j < 8 ; j++)
+			for (w = 0; w < 8; w++)
+				if (d->in_flight_tags[i][w] == data_ptr[j]) {
+					output_ptr[j] = i+1;
+					break;
+				}
+		for (j = 0; j < 8; j++)
+			for (w = 0; w < 8; w++)
+				if (bl->tags[w] == data_ptr[j]) {
+					output_ptr[j] = i+1;
+					break;
+				}
+	}
+
+	/*
+	 * At this stage, the output holds 8 16-bit values. Each
+	 * non-zero value is the ID (+1) of the worker to which the
+	 * corresponding flow is pinned.
+	 */
+}
+
+
+
+static unsigned int
+handle_returns(struct rte_distributor_burst *d, unsigned int wkr)
+{
+	struct rte_distributor_buffer_burst *buf = &(d->bufs[wkr]);
+	uintptr_t oldbuf;
+	unsigned int ret_start = d->returns.start,
+			ret_count = d->returns.count;
+	unsigned int count = 0;
+	/*
+	 * wait for the GET_BUF bit to go high, otherwise we can't send
+	 * the packets to the worker
+	 */
+
+	if (buf->retptr64[0] & RTE_DISTRIB_GET_BUF) {
+		for (unsigned int i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
+				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
+					RTE_DISTRIB_FLAG_BITS));
+				/* store returns in a circular buffer */
+				store_return(oldbuf, d, &ret_start, &ret_count);
+				count++;
+				buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+			}
+		}
+		d->returns.start = ret_start;
+		d->returns.count = ret_count;
+		/* Clear for the worker to populate with more returns */
+		buf->retptr64[0] = 0;
+	}
+	return count;
+}
+
+static unsigned int
+release(struct rte_distributor_burst *d, unsigned int wkr)
+{
+	struct rte_distributor_buffer_burst *buf = &(d->bufs[wkr]);
+	unsigned int i;
+
+	if (d->backlog[wkr].count == 0)
+		return 0;
+
+	while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+		rte_pause();
+
+	handle_returns(d, wkr);
+
+	buf->count = 0;
+
+	for (i = 0; i < d->backlog[wkr].count; i++) {
+		d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
+				RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
+		d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
+	}
+	buf->count = i;
+	for ( ; i < RTE_DIST_BURST_SIZE ; i++) {
+		buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
+		d->in_flight_tags[wkr][i] = 0;
+	}
+
+	d->backlog[wkr].count = 0;
+
+	/* Clear the GET bit */
+	buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
+	return  buf->count;
+
+}
+
+
+/* process a set of packets to distribute them to workers */
+int
+rte_distributor_process_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int num_mbufs)
+{
+	unsigned int next_idx = 0;
+	static unsigned int wkr;
+	struct rte_mbuf *next_mb = NULL;
+	int64_t next_value = 0;
+	uint16_t new_tag = 0;
+	uint16_t flows[8] __rte_cache_aligned;
+	//static int iter=0;
+
+	if (unlikely(num_mbufs == 0)) {
+		/* Flush out all non-full cache-lines to workers. */
+		for (unsigned int wid = 0 ; wid < d->num_workers; wid++) {
+			if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF)) {
+				release(d, wid);
+				handle_returns(d, wid);
+			}
+		}
+		return 0;
+	}
+
+	while (next_idx < num_mbufs) {
+		uint16_t matches[8];
+		int pkts;
+
+		if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
+			d->bufs[wkr].count = 0;
+
+		for (unsigned int i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+			if (mbufs[next_idx + i]) {
+				/* flows have to be non-zero */
+				flows[i] = mbufs[next_idx + i]->hash.usr | 1;
+			} else
+				flows[i] = 0;
+		}
+
+		switch (d->dist_match_fn) {
+#ifdef RTE_MACHINE_CPUFLAG_SSE2
+		case RTE_DIST_MATCH_SSE:
+			find_match_sse2(d, &flows[0], &matches[0]);
+			break;
+#endif
+		default:
+			find_match_scalar(d, &flows[0], &matches[0]);
+		}
+
+		/*
+		 * The matches array now contains the intended worker ID (+1)
+		 * of each incoming packet. Any zeroes need to be assigned
+		 * workers.
+		 */
+
+		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
+			pkts = num_mbufs - next_idx;
+		else
+			pkts = RTE_DIST_BURST_SIZE;
+
+		for (int j = 0; j < pkts; j++) {
+
+			next_mb = mbufs[next_idx++];
+			next_value = (((int64_t)(uintptr_t)next_mb) <<
+					RTE_DISTRIB_FLAG_BITS);
+			/*
+			 * Users are advised to set a tag value for each
+			 * mbuf before calling rte_distributor_process_burst.
+			 * User-defined tags are used to identify flows
+			 * or sessions.
+			 */
+			/* flows MUST be non-zero */
+			new_tag = (uint16_t)(next_mb->hash.usr) | 1;
+
+			/*
+			 * Using the next line will cause the find_match
+			 * function to be optimised out, making this function
+			 * do parallel (non-atomic) distribution
+			 */
+			//matches[j] = 0;
+
+			if (matches[j]) {
+				struct rte_distributor_backlog *bl =
+						&d->backlog[matches[j]-1];
+				if (unlikely(bl->count ==
+						RTE_DIST_BURST_SIZE)) {
+					release(d, matches[j]-1);
+				}
+
+				/* Add to worker that already has flow */
+				unsigned int idx = bl->count++;
+
+				bl->tags[idx] = new_tag;
+				bl->pkts[idx] = next_value;
+
+			} else {
+				struct rte_distributor_backlog *bl =
+						&d->backlog[wkr];
+				if (unlikely(bl->count ==
+						RTE_DIST_BURST_SIZE)) {
+					release(d, wkr);
+				}
+
+				/* Add to current worker */
+				unsigned int idx = bl->count++;
+
+				bl->tags[idx] = new_tag;
+				bl->pkts[idx] = next_value;
+				/*
+				 * Now that we've just added an unpinned flow
+				 * to a worker, we need to ensure that all
+				 * other packets with that same flow will go
+				 * to the same worker in this burst.
+				 */
+				for (int w = j; w < pkts; w++)
+					if (flows[w] == new_tag)
+						matches[w] = wkr+1;
+			}
+		}
+		wkr++;
+		if (wkr >= d->num_workers)
+			wkr = 0;
+	}
+
+	/* Flush out all non-full cache-lines to workers. */
+	for (unsigned int wid = 0 ; wid < d->num_workers; wid++)
+		if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+			release(d, wid);
+
+	return num_mbufs;
+}
+
+/* return to the caller, packets returned from workers */
+int
+rte_distributor_returned_pkts_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int max_mbufs)
+{
+	struct rte_distributor_returned_pkts *returns = &d->returns;
+	unsigned int retval = (max_mbufs < returns->count) ?
+			max_mbufs : returns->count;
+	unsigned int i;
+
+	for (i = 0; i < retval; i++) {
+		unsigned int idx = (returns->start + i) &
+				RTE_DISTRIB_RETURNS_MASK;
+
+		mbufs[i] = returns->mbufs[idx];
+	}
+	returns->start += i;
+	returns->count -= i;
+
+	return retval;
+}
+
+/*
+ * Return the number of packets in-flight in a distributor, i.e. packets
+ * being worked on or queued up in a backlog.
+ */
+static inline unsigned int
+total_outstanding(const struct rte_distributor_burst *d)
+{
+	unsigned int wkr, total_outstanding = 0;
+
+	for (wkr = 0; wkr < d->num_workers; wkr++)
+		total_outstanding += d->backlog[wkr].count;
+
+	return total_outstanding;
+}
+
+/*
+ * Flush the distributor, so that there are no outstanding packets in flight or
+ * queued up.
+ */
+int
+rte_distributor_flush_burst(struct rte_distributor_burst *d)
+{
+	const unsigned int flushed = total_outstanding(d);
+	unsigned int wkr;
+
+	while (total_outstanding(d) > 0)
+		rte_distributor_process_burst(d, NULL, 0);
+
+	for (wkr = 0; wkr < d->num_workers; wkr++)
+		handle_returns(d, wkr);
+
+	return flushed;
+}
+
+/* clears the internal returns array in the distributor */
+void
+rte_distributor_clear_returns_burst(struct rte_distributor_burst *d)
+{
+	/* throw away returns, so workers can exit */
+	for (unsigned int wkr = 0; wkr < d->num_workers; wkr++)
+		d->bufs[wkr].retptr64[0] = 0;
+}
+
+/* creates a distributor instance */
+struct rte_distributor_burst *
+rte_distributor_create_burst(const char *name,
+		unsigned int socket_id,
+		unsigned int num_workers)
+{
+	struct rte_distributor_burst *d;
+	struct rte_dist_burst_list *dist_burst_list;
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	const struct rte_memzone *mz;
+
+	/* compilation-time checks */
+	RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
+
+	if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+
+	snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
+	mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
+	if (mz == NULL) {
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+
+	d = mz->addr;
+	snprintf(d->name, sizeof(d->name), "%s", name);
+	d->num_workers = num_workers;
+
+	/* default to scalar matching; use SSE where the CPU supports it */
+	d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
+#if defined(RTE_ARCH_X86)
+	if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_SSE2)) {
+		d->dist_match_fn = RTE_DIST_MATCH_SSE;
+	}
+#endif
+
+	/*
+	 * Set up the backlog tags so they're pointing at the second cache
+	 * line for performance during flow matching
+	 */
+	for (unsigned int i = 0 ; i < num_workers ; i++)
+		d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];
+
+	dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
+					  rte_dist_burst_list);
+
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+	TAILQ_INSERT_TAIL(dist_burst_list, d, next);
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	return d;
+}
diff --git a/lib/librte_distributor/rte_distributor_burst.h b/lib/librte_distributor/rte_distributor_burst.h
new file mode 100644
index 0000000..5096b13
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor_burst.h
@@ -0,0 +1,255 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_DIST_BURST_H_
+#define _RTE_DIST_BURST_H_
+
+/**
+ * @file
+ * RTE distributor
+ *
+ * The distributor is a component which is designed to pass packets
+ * in bursts of up to 8 to workers, with dynamic load balancing.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct rte_distributor_burst;
+struct rte_mbuf;
+
+/**
+ * Function to create a new distributor instance
+ *
+ * Reserves the memory needed for the distributor operation and
+ * initializes the distributor to work with the configured number of workers.
+ *
+ * @param name
+ *   The name to be given to the distributor instance.
+ * @param socket_id
+ *   The NUMA node on which the memory is to be allocated
+ * @param num_workers
+ *   The maximum number of workers that will request packets from this
+ *   distributor
+ * @return
+ *   The newly created distributor instance, or NULL with rte_errno set
+ */
+struct rte_distributor_burst *
+rte_distributor_create_burst(const char *name, unsigned int socket_id,
+		unsigned int num_workers);
+
+/*  *** APIS to be called on the distributor lcore ***  */
+/*
+ * The following APIs are the public APIs which are designed for use on a
+ * single lcore which acts as the distributor lcore for a given distributor
+ * instance. These functions cannot be called on multiple cores simultaneously
+ * without using locking to protect access to the internals of the distributor.
+ *
+ * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
+ * for the same distributor instance, otherwise deadlock will result.
+ */
+
+/**
+ * Process a set of packets by distributing them among workers that request
+ * packets. The distributor will ensure that no two packets that have the
+ * same flow id, or tag, in the mbuf will be processed on different cores at
+ * the same time.
+ *
+ * The user is advised to set the tag for each mbuf before calling this
+ * function. If the user doesn't set the tag, its value may vary depending on
+ * driver implementation and configuration.
+ *
+ * This is not multi-thread safe and should only be called on a single lcore.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param mbufs
+ *   The mbufs to be distributed
+ * @param num_mbufs
+ *   The number of mbufs in the mbufs array
+ * @return
+ *   The number of mbufs processed.
+ */
+int
+rte_distributor_process_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int num_mbufs);
+
+/**
+ * Get a set of mbufs that have been returned to the distributor by workers
+ *
+ * Call this only on the same lcore as rte_distributor_process_burst().
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param mbufs
+ *   The mbufs pointer array to be filled in
+ * @param max_mbufs
+ *   The size of the mbufs array
+ * @return
+ *   The number of mbufs returned in the mbufs array.
+ */
+int
+rte_distributor_returned_pkts_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int max_mbufs);
+
+/**
+ * Flush the distributor component, so that there are no in-flight or
+ * backlogged packets awaiting processing
+ *
+ * Call this only on the same lcore as rte_distributor_process_burst().
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @return
+ *   The number of queued/in-flight packets that were completed by this call.
+ */
+int
+rte_distributor_flush_burst(struct rte_distributor_burst *d);
+
+/**
+ * Clears the array of returned packets used as the source for the
+ * rte_distributor_returned_pkts_burst() API call.
+ *
+ * Call this only on the same lcore as rte_distributor_process_burst().
+ *
+ * @param d
+ *   The distributor instance to be used
+ */
+void
+rte_distributor_clear_returns_burst(struct rte_distributor_burst *d);
+
+/*  *** APIS to be called on the worker lcores ***  */
+/*
+ * The following APIs are the public APIs which are designed for use on
+ * multiple lcores which act as workers for a distributor. Each lcore should use
+ * a unique worker id when requesting packets.
+ *
+ * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
+ * for the same distributor instance, otherwise deadlock will result.
+ */
+
+/**
+ * API called by a worker to get new packets to process. Any previous packets
+ * given to the worker are assumed to have completed processing, and may be
+ * optionally returned to the distributor via the oldpkt parameter.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less than num_workers passed
+ *   at distributor creation time.
+ * @param pkts
+ *   The mbufs pointer array to be filled in (up to 8 packets)
+ * @param oldpkt
+ *   The previous packet, if any, being processed by the worker
+ * @param retcount
+ *   The number of packets being returned
+ *
+ * @return
+ *   The number of packets in the pkts array
+ */
+int
+rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
+	unsigned int worker_id, struct rte_mbuf **pkts,
+	struct rte_mbuf **oldpkt, unsigned int retcount);
+
+/**
+ * Worker API to return packets without requesting more, e.g. at shutdown.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less than num_workers
+ * @param oldpkt
+ *   The previous packets being processed by the worker
+ * @param num
+ *   The number of packets in the oldpkt array
+ */
+int
+rte_distributor_return_pkt_burst(struct rte_distributor_burst *d,
+	unsigned int worker_id, struct rte_mbuf **oldpkt, int num);
+
+/**
+ * API called by a worker to request a new packet to process.
+ * Any previous packet given to the worker is assumed to have completed
+ * processing, and may be optionally returned to the distributor via
+ * the oldpkt parameter.
+ * Unlike rte_distributor_get_pkt_burst(), this function does not wait for a
+ * new packet to be provided by the distributor.
+ *
+ * NOTE: after calling this function, rte_distributor_poll_pkt_burst() should
+ * be used to poll for the packet requested. The rte_distributor_get_pkt_burst()
+ * API should *not* be used to try and retrieve the new packet.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less than num_workers passed
+ *   at distributor creation time.
+ * @param oldpkt
+ *   The returning packets, if any, processed by the worker
+ * @param count
+ *   The number of returning packets
+ */
+void
+rte_distributor_request_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **oldpkt,
+		unsigned int count);
+
+/**
+ * API called by a worker to check for new packets that were previously
+ * requested by a call to rte_distributor_request_pkt_burst(). It does not
+ * wait for the new packets to be available, but returns zero if the request
+ * has not yet been fulfilled by the distributor.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less than num_workers passed
+ *   at distributor creation time.
+ * @param mbufs
+ *   The array of mbufs being given to the worker
+ *
+ * @return
+ *   The number of packets being given to the worker thread, zero if no
+ *   packet is yet available.
+ */
+int
+rte_distributor_poll_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **mbufs);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/librte_distributor/rte_distributor_priv.h b/lib/librte_distributor/rte_distributor_priv.h
new file mode 100644
index 0000000..1b1295a
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor_priv.h
@@ -0,0 +1,190 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_DIST_PRIV_H_
+#define _RTE_DIST_PRIV_H_
+
+/**
+ * @file
+ * RTE distributor
+ *
+ * The distributor is a component which is designed to pass packets
+ * one-at-a-time to workers, with dynamic load balancing.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define NO_FLAGS 0
+#define RTE_DISTRIB_PREFIX "DT_"
+
+/*
+ * We will use the bottom four bits of pointer for flags, shifting out
+ * the top four bits to make room (since a 64-bit pointer actually only uses
+ * 48 bits). An arithmetic-right-shift will then appropriately restore the
+ * original pointer value with proper sign extension into the top bits.
+ */
+#define RTE_DISTRIB_FLAG_BITS 4
+#define RTE_DISTRIB_FLAGS_MASK (0x0F)
+#define RTE_DISTRIB_NO_BUF 0       /**< empty flags: no buffer requested */
+#define RTE_DISTRIB_GET_BUF (1)    /**< worker requests a buffer, returns old */
+#define RTE_DISTRIB_RETURN_BUF (2) /**< worker returns a buffer, no request */
+#define RTE_DISTRIB_VALID_BUF (4)  /**< set if bufptr contains ptr */
+
+#define RTE_DISTRIB_BACKLOG_SIZE 8
+#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
+
+#define RTE_DISTRIB_MAX_RETURNS 128
+#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
+
+/**
+ * Maximum number of workers allowed.
+ * Be aware of increasing the limit, because it is limited by how we track
+ * in-flight tags. See @in_flight_bitmask and @rte_distributor_process
+ */
+#define RTE_DISTRIB_MAX_WORKERS 64
+
+#define RTE_DISTRIBUTOR_NAMESIZE 32 /**< Length of name for instance */
+
+/**
+ * Buffer structure used to pass the pointer data between cores. This is cache
+ * line aligned, but to improve performance and prevent adjacent cache-line
+ * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
+ * the next cache line to worker 0, we pad this out to three cache lines.
+ * Only 64-bits of the memory is actually used though.
+ */
+union rte_distributor_buffer {
+	volatile int64_t bufptr64;
+	char pad[RTE_CACHE_LINE_SIZE*3];
+} __rte_cache_aligned;
+
+/**
+ * Number of packets to deal with in bursts. Needs to be 8 so as to
+ * fit in one cache line.
+ */
+#define RTE_DIST_BURST_SIZE (sizeof(__m128i) / sizeof(uint16_t))
+
+/**
+ * Buffer structure used to pass the pointer data between cores. This is cache
+ * line aligned, but to improve performance and prevent adjacent cache-line
+ * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
+ * the next cache line to worker 0, we pad this out to two cache lines.
+ * We can pass up to 8 mbufs at a time in one cacheline.
+ * There is a separate cacheline for returns in the burst API.
+ */
+struct rte_distributor_buffer_burst {
+	volatile int64_t bufptr64[RTE_DIST_BURST_SIZE]
+			__rte_cache_aligned; /* <= outgoing to worker */
+
+	int64_t pad1 __rte_cache_aligned;    /* <= one cache line  */
+
+	volatile int64_t retptr64[RTE_DIST_BURST_SIZE]
+			__rte_cache_aligned; /* <= incoming from worker */
+
+	int64_t pad2 __rte_cache_aligned;    /* <= one cache line  */
+
+	int count __rte_cache_aligned;       /* <= number of current mbufs */
+};
+
+
+struct rte_distributor_backlog {
+	unsigned int start;
+	unsigned int count;
+	int64_t pkts[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
+	uint16_t *tags; /* will point to second cacheline of inflights */
+} __rte_cache_aligned;
+
+
+struct rte_distributor_returned_pkts {
+	unsigned int start;
+	unsigned int count;
+	struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS];
+};
+
+struct rte_distributor {
+	TAILQ_ENTRY(rte_distributor) next;    /**< Next in list. */
+
+	char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
+	unsigned int num_workers;             /**< Number of workers polling */
+
+	uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS];
+		/**< Tracks the tag being processed per core */
+	uint64_t in_flight_bitmask;
+		/**< on/off bits for in-flight tags.
+		  * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then
+		  * the bitmask has to expand.
+		  */
+
+	struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS];
+
+	union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS];
+
+	struct rte_distributor_returned_pkts returns;
+};
+
+/* All different signature compare functions */
+enum rte_distributor_match_function {
+	RTE_DIST_MATCH_SCALAR = 0,
+	RTE_DIST_MATCH_SSE,
+	RTE_DIST_MATCH_NUM
+};
+
+struct rte_distributor_burst {
+	TAILQ_ENTRY(rte_distributor_burst) next;    /**< Next in list. */
+
+	char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
+	unsigned int num_workers;             /**< Number of workers polling */
+
+	/**
+	  * The first cache line of each row in this array holds the tags in
+	  * flight on the worker core. The second cache line holds the backlog
+	  * tags that are going to go to the worker core.
+	  */
+	uint16_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS][RTE_DIST_BURST_SIZE*2]
+			__rte_cache_aligned;
+
+	struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS]
+			__rte_cache_aligned;
+
+	struct rte_distributor_buffer_burst bufs[RTE_DISTRIB_MAX_WORKERS];
+
+	struct rte_distributor_returned_pkts returns;
+
+	enum rte_distributor_match_function dist_match_fn;
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
-- 
2.7.4

* [PATCH v2 0/5] distributor library performance enhancements
From: David Hunt @ 2016-12-22  4:37 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson
In-Reply-To: <1480567821-70846-2-git-send-email-david.hunt@intel.com>

This patch aims to improve the throughput of the distributor library.

It adds a series of API calls similar to the original API, but with
"_burst" in the function names. Usage is similar (but not identical), in that
there are now bursts of mbufs sent to each worker at a time instead of a
single mbuf pointer. See the header file rte_distributor_burst.h for more
details on API usage.

It uses a similar handshake mechanism to the previous version of
the library, in that bits are used to indicate when packets are ready
to be sent to a worker and ready to be returned from a worker. One main
difference is that instead of sending one packet in a cache line, it makes
use of the 7 free spaces in the same cache line in order to send up to
8 packets at a time to/from a worker.
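
The slot layout described above can be sketched in plain C. This is only an illustration under stated assumptions: the constants mirror RTE_DISTRIB_FLAG_BITS, RTE_DISTRIB_FLAGS_MASK and RTE_DISTRIB_VALID_BUF from rte_distributor_priv.h, while slot_pack(), slot_pointer(), slot_flags() and burst_fill() are hypothetical helpers that do not appear in the patch. It also assumes pointers fit in 48 bits and that the compiler implements right shift of a signed value arithmetically.

```c
#include <stdint.h>

#define FLAG_BITS  4     /* mirrors RTE_DISTRIB_FLAG_BITS */
#define FLAGS_MASK 0x0F  /* mirrors RTE_DISTRIB_FLAGS_MASK */
#define VALID_BUF  4     /* mirrors RTE_DISTRIB_VALID_BUF */
#define BURST_SIZE 8     /* eight 64-bit slots fill one 64-byte cache line */

/* Hypothetical helper: keep a pointer and its flag bits in one 64-bit slot. */
static int64_t slot_pack(const void *pkt, unsigned int flags)
{
	uint64_t bits = (uint64_t)(uintptr_t)pkt << FLAG_BITS;

	return (int64_t)(bits | (flags & FLAGS_MASK));
}

/* The right shift drops the flag bits and restores the pointer value. */
static void *slot_pointer(int64_t slot)
{
	return (void *)(uintptr_t)(slot >> FLAG_BITS);
}

static unsigned int slot_flags(int64_t slot)
{
	return (unsigned int)(slot & FLAGS_MASK);
}

/* Fill one cache line of slots; unused slots are cleared. */
static unsigned int burst_fill(int64_t slots[BURST_SIZE], void *const pkts[],
		unsigned int n)
{
	unsigned int i;

	if (n > BURST_SIZE)
		n = BURST_SIZE;
	for (i = 0; i < n; i++)
		slots[i] = slot_pack(pkts[i], VALID_BUF);
	for (; i < BURST_SIZE; i++)
		slots[i] = 0;
	return n;
}
```

Because all eight slots share a cache line, a single line transfer between cores can carry a full burst rather than one packet.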

The flow matching algorithm has had significant re-work, and now keeps an
array of inflight flows and an array of backlog flows, and matches incoming
flows to the inflight/backlog flows of all workers so that flow pinning to
workers can be maintained.

The flow match algorithm has both scalar and vector versions, and a
function pointer is used to select the most appropriate function at run time,
depending on the presence of the SSE2 CPU flag. On non-x86 platforms, the
scalar match function is selected, which should still give a good boost
in performance over the non-burst API.
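
As a rough illustration of the scalar matching idea, the sketch below compares each incoming tag against the in-flight and backlog tags of every worker. It is not the code from this series: struct flow_table and match_scalar() are hypothetical names, and the "0 means empty slot" convention is an assumption based on the non-zero flow ID note.

```c
#include <stdint.h>

#define BURST_SIZE  8
#define MAX_WORKERS 64

/* Hypothetical, simplified view of the per-worker tag tables. */
struct flow_table {
	unsigned int num_workers;
	uint16_t inflight[MAX_WORKERS][BURST_SIZE]; /* tags being processed */
	uint16_t backlog[MAX_WORKERS][BURST_SIZE];  /* tags queued for a worker */
};

/*
 * For each incoming tag, store 1 + the index of the first worker that
 * already holds that flow, or 0 when no worker does.
 */
static void match_scalar(const struct flow_table *t, const uint16_t *tags,
		unsigned int n, uint16_t *owner)
{
	unsigned int i, w, s;

	for (i = 0; i < n; i++) {
		owner[i] = 0;
		if (tags[i] == 0)
			continue; /* assumed: tag 0 marks an empty slot */
		for (w = 0; w < t->num_workers && owner[i] == 0; w++) {
			for (s = 0; s < BURST_SIZE; s++) {
				if (t->inflight[w][s] == tags[i] ||
				    t->backlog[w][s] == tags[i]) {
					owner[i] = (uint16_t)(w + 1);
					break;
				}
			}
		}
	}
}
```

A packet whose owner entry is non-zero would go to that worker's backlog to keep the flow pinned; the others are free to go to any worker with spare capacity.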

v2 changes:
  * Created a common distributor_priv.h header file with common
    definitions and structures.
  * Added a scalar version so it can be built and used on machines without
    sse2 instruction set
  * Added unit autotests
  * Added perf autotest
  * Added doc updates

Notes:
   Apps using the burst API must now work in bursts, as up to 8 packets are
   given to a worker at a time
   For performance in matching, flow IDs are 15 bits (non-zero)
   Original API (and code) is kept for backward compatibility
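
The notes above say flow IDs are 15 bits and non-zero, but not how an application should derive them. One possibility, offered purely as an assumption, is to fold a 32-bit hash down to 15 bits and remap zero; flow_tag15() is a hypothetical helper and is not part of this series.

```c
#include <stdint.h>

/* Hypothetical helper: fold a 32-bit hash into a non-zero 15-bit flow tag. */
static uint16_t flow_tag15(uint32_t hash)
{
	uint16_t tag = (uint16_t)((hash ^ (hash >> 15) ^ (hash >> 30)) & 0x7FFF);

	return tag != 0 ? tag : 1; /* zero is not a valid flow ID, so remap it */
}
```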

Performance Gains
   2.2GHz Intel(R) Xeon(R) CPU E5-2699 v4 @ 2.20GHz
   2 x XL710 40GbE NICS to 2 x 40Gbps traffic generator channels 64b packets
   separate cores for rx, tx, distributor
    1 worker  - 4.8x
    4 workers - 2.9x
    8 workers - 1.8x
   12 workers - 2.1x
   16 workers - 1.8x

[PATCH v2 1/5] lib: distributor performance enhancements
[PATCH v2 2/5] test: unit tests for new distributor burst api
[PATCH v2 3/5] test: add distributor_perf autotest
[PATCH v2 4/5] example: distributor app showing burst api
[PATCH v2 5/5] doc: distributor library changes for new burst api

* Re: [PATCH v3] drivers: advertise kmod dependencies in pmdinfo
From: Andrew Rybchenko @ 2016-12-22 11:35 UTC (permalink / raw)
  To: Ferruh Yigit, Neil Horman
  Cc: Thomas Monjalon, Olivier Matz, Adrien Mazarguil, dev, vido,
	fiona.trahe, stephen
In-Reply-To: <c5ac148d-258e-2845-37b0-15621c6b8b41@intel.com>

On 12/22/2016 02:04 PM, Ferruh Yigit wrote:
> On 12/21/2016 11:40 AM, Andrew Rybchenko wrote:
>> On 12/21/2016 02:37 PM, Neil Horman wrote:
>>> On Wed, Dec 21, 2016 at 12:21:14PM +0300, Andrew Rybchenko wrote:
>>>> On 12/20/2016 08:26 PM, Thomas Monjalon wrote:
>>>>>>> Add a new macro RTE_PMD_REGISTER_KMOD_DEP() that allows a driver to
>>>>>>> declare the list of kernel modules required to run properly.
>>>>>>>
>>>>>>> Today, most PCI drivers require uio/vfio.
>>>>>>>
>>>>>>> Signed-off-by: Olivier Matz <olivier.matz@6wind.com>
>>>>>>> Acked-by: Fiona Trahe <fiona.trahe@intel.com>
>>>>>> Acked-by: Adrien Mazarguil <adrien.mazarguil@6wind.com>
>>>>> Applied in main tree, thanks
>>>> Is there any plan on how it will be done/solved for a new drivers in
>>>> dpdk-next-net?
>>>> Should I care about it for sfc?
>>>>
>>> Given that all pmdinfo information is opt-in (that is to say not obligatory),
>>> you can now wait until net-next does its next rebase, and as you continue your
>>> development of the sfc driver, you can add the use of this macro in at your
>>> leisure.  As more people do that, we will arrive at 100% coverage
>> I see. Will do. Thanks.
>>
> Hi Andrew,
>
> Patch rebased to next-net, would you mind doing the mentioned patch for it?

Hi Ferruh,

Done. I was in doubt about which changeset to specify in Fixes, but finally
chose the latest of mine and Olivier's. Please correct me if it is
wrong.

Andrew.

* [PATCH] net/sfc: advertise kmod dependencies in pmdinfo
From: Andrew Rybchenko @ 2016-12-22 11:32 UTC (permalink / raw)
  To: dev; +Cc: ferruh.yigit, olivier.matz

Fixes: 0880c40113ef ("drivers: advertise kmod dependencies in pmdinfo")

Signed-off-by: Andrew Rybchenko <arybchenko@solarflare.com>
---
 drivers/net/sfc/sfc_ethdev.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/drivers/net/sfc/sfc_ethdev.c b/drivers/net/sfc/sfc_ethdev.c
index dd5ca5c..725c971 100644
--- a/drivers/net/sfc/sfc_ethdev.c
+++ b/drivers/net/sfc/sfc_ethdev.c
@@ -1350,6 +1350,7 @@ static struct eth_driver sfc_efx_pmd = {
 
 RTE_PMD_REGISTER_PCI(net_sfc_efx, sfc_efx_pmd.pci_drv);
 RTE_PMD_REGISTER_PCI_TABLE(net_sfc_efx, pci_id_sfc_efx_map);
+RTE_PMD_REGISTER_KMOD_DEP(net_sfc_efx, "* igb_uio | uio_pci_generic | vfio");
 RTE_PMD_REGISTER_PARAM_STRING(net_sfc_efx,
 	SFC_KVARG_PERF_PROFILE "=" SFC_KVARG_VALUES_PERF_PROFILE " "
 	SFC_KVARG_MCDI_LOGGING "=" SFC_KVARG_VALUES_BOOL " "
-- 
2.5.5

* Re: [PATCH] ethdev: cleanup device ops struct whitespace
From: Thomas Monjalon @ 2016-12-22 11:18 UTC (permalink / raw)
  To: Ferruh Yigit; +Cc: dev
In-Reply-To: <20161208135940.17233-1-ferruh.yigit@intel.com>

2016-12-08 13:59, Ferruh Yigit:
> - Grouped related items using empty lines

I have a few comments below.

[...]
> +
>  	eth_dev_infos_get_t        dev_infos_get; /**< Get device info. */
>  	eth_dev_supported_ptypes_get_t dev_supported_ptypes_get;
> -	/**< Get packet types supported and identified by device*/
> -	mtu_set_t                  mtu_set; /**< Set MTU. */
> -	vlan_filter_set_t          vlan_filter_set;  /**< Filter VLAN Setup. */
> -	vlan_tpid_set_t            vlan_tpid_set;      /**< Outer/Inner VLAN TPID Setup. */
> +	/**< Get packet types supported and identified by device. */

We could add a blank line here.

> +	mtu_set_t                  mtu_set;       /**< Set MTU. */
> +
[...]
> +
> +	eth_udp_tunnel_port_add_t  udp_tunnel_port_add; /** Add UDP tunnel port. */
> +	eth_udp_tunnel_port_del_t  udp_tunnel_port_del; /** Del UDP tunnel port. */
> +
[...]
> +
> +	reta_update_t              reta_update;   /** Update redirection table. */
> +	reta_query_t               reta_query;    /** Query redirection table. */
> +
> +	eth_get_reg_t              get_reg;           /**< Get registers. */
> +	eth_get_eeprom_length_t    get_eeprom_length; /**< Get eeprom length. */
> +	eth_get_eeprom_t           get_eeprom;        /**< Get eeprom data. */
> +	eth_set_eeprom_t           set_eeprom;        /**< Set eeprom. */
> +
[...]
> +	rss_hash_update_t          rss_hash_update; /** Configure RSS hash protocols. */
> +	rss_hash_conf_get_t        rss_hash_conf_get; /** Get current RSS hash configuration. */

RSS should go above with RETA.

> +	eth_filter_ctrl_t          filter_ctrl; /**< common filter control. */
> +	eth_set_mc_addr_list_t     set_mc_addr_list; /**< set list of mcast addrs. */

Should go with other MAC functions.

[...]
> +	eth_timesync_enable_t      timesync_enable;
>  	/** Turn IEEE1588/802.1AS timestamping on. */
> -	eth_timesync_enable_t timesync_enable;
> +	eth_timesync_disable_t     timesync_disable;
>  	/** Turn IEEE1588/802.1AS timestamping off. */
> -	eth_timesync_disable_t timesync_disable;
> -	/** Read the IEEE1588/802.1AS RX timestamp. */
>  	eth_timesync_read_rx_timestamp_t timesync_read_rx_timestamp;
> -	/** Read the IEEE1588/802.1AS TX timestamp. */
> +	/** Read the IEEE1588/802.1AS RX timestamp. */
>  	eth_timesync_read_tx_timestamp_t timesync_read_tx_timestamp;
> +	/** Read the IEEE1588/802.1AS TX timestamp. */
> +
> +	eth_get_dcb_info           get_dcb_info; /** Get DCB information. */

DCB should not be in the middle of timesync functions.

> +	eth_timesync_adjust_time   timesync_adjust_time; /** Adjust the device clock. */
> +	eth_timesync_read_time     timesync_read_time; /** Get the device clock time. */
> +	eth_timesync_write_time    timesync_write_time; /** Set the device clock time. */

* Re: Why IP_PIPELINE is faster than L2FWD
From: Bruce Richardson @ 2016-12-22 11:15 UTC (permalink / raw)
  To: Royce Niu; +Cc: dev
In-Reply-To: <CAOwUCNs=c_5Ndb1+8zjJuhmA+i109XS5VjT00XvX-LpHP3GrWQ@mail.gmail.com>

On Thu, Dec 22, 2016 at 12:18:12AM +0800, Royce Niu wrote:
> Hi all,
> 
> I tested default L2FWD and IP_PIPELINE (pass-through). The throughput of
> IP_PIPELINE is higher immensely.
> 
> There are only two virtual NICs in KVM. The experiment is just moving
> packet from vNIC0  to vNIC1. I think the function is so simple. Why L2FWD
> is much slower?
> 
> How can I improve L2FWD, to make L2FWD faster?
> 
Is IP_PIPELINE in passthrough mode modifying the packets? L2FWD swaps
the mac addresses on each packet as it processes them, which can slow it
down. L2FWD is also more an example of how the APIs work than anything
else. For fastest possible port-to-port forwarding, testpmd should give
the highest performance.
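
To make the per-packet cost mentioned above concrete, here is a minimal sketch of a MAC address swap on a raw Ethernet header. It is not L2FWD's actual code: struct eth_hdr_sketch and mac_swap() are hypothetical names used only for illustration.

```c
#include <stdint.h>
#include <string.h>

#define MAC_LEN 6

/* Hypothetical stand-in for an Ethernet header at the start of a packet. */
struct eth_hdr_sketch {
	uint8_t dst[MAC_LEN];
	uint8_t src[MAC_LEN];
	uint16_t ether_type;
};

/* Exchange source and destination MAC addresses in place. */
static void mac_swap(struct eth_hdr_sketch *h)
{
	uint8_t tmp[MAC_LEN];

	memcpy(tmp, h->dst, MAC_LEN);
	memcpy(h->dst, h->src, MAC_LEN);
	memcpy(h->src, tmp, MAC_LEN);
}
```

Each call writes to the first cache line of the packet, which is work a pure pass-through path would not do.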

/Bruce

* Re: [PATCH v2 0/5] example/ethtool: add bus info and fw version get
From: Thomas Monjalon @ 2016-12-22 11:07 UTC (permalink / raw)
  To: Qiming Yang; +Cc: dev, Remy Horton
In-Reply-To: <b556bf81-e70a-9e5a-2db0-f52fe180ac51@intel.com>

2016-12-08 16:34, Remy Horton:
> 
> On 06/12/2016 15:16, Qiming Yang wrote:
> [..]
> > Qiming Yang (5):
> >   ethdev: add firmware version get
> >   net/e1000: add firmware version get
> >   net/ixgbe: add firmware version get
> >   net/i40e: add firmware version get
> >   ethtool: dispaly bus info and firmware version
> 
> s/dispaly/display
> 
> doc/guides/rel_notes/release_17_02.rst ought to be updated as well. Code 
> itself looks ok though..
> 
> Acked-by: Remy Horton <remy.horton@intel.com>

It must be a feature in the table (doc/guides/nics/features/).
The deprecation notice must be removed also.

I think it is OK to add a new dev_ops and a new API function for firmware
query. Generally speaking, it is a good thing to avoid putting all
information in the same structure (e.g. rte_eth_dev_info). However, there
is a balance to find. Could we plan to add more info to this new query?
Instead of
	rte_eth_dev_fwver_get(uint8_t port_id, char *fw_version, int fw_length)
could it fill a struct?
	rte_eth_dev_fw_info_get(uint8_t port_id, struct rte_eth_dev_fw_info *fw_info)

We already have
	rte_eth_dev_get_reg_info(uint8_t port_id, struct rte_dev_reg_info *info)
with
	uint32_t version; /**< Device version */

There are also these functions (a bit related):
	rte_eth_dev_get_eeprom_length(uint8_t port_id)
	rte_eth_dev_get_eeprom(uint8_t port_id, struct rte_dev_eeprom_info *info)
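
To illustrate the struct-based alternative asked about above, here is a sketch of how such a query could be shaped. Everything in it is hypothetical: the thread does not define any fields, so struct fw_info_sketch, its members and fw_info_get_sketch() are placeholders, and the version string is dummy data from a stub.

```c
#include <errno.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical layout: none of these fields are defined in the thread. */
struct fw_info_sketch {
	char version[32];     /* printable firmware version */
	uint32_t nvm_version; /* example of a field that could be added later */
};

/* Stub standing in for a driver callback; a real PMD would query the device. */
static int fw_info_get_sketch(uint8_t port_id, struct fw_info_sketch *info)
{
	if (info == NULL)
		return -EINVAL;
	snprintf(info->version, sizeof(info->version), "port%u-fw-1.0",
			(unsigned int)port_id);
	info->nvm_version = 0;
	return 0;
}
```

The trade-off is that new fields can be added to the struct without changing the function prototype, at the cost of an ABI change each time the struct grows.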

* Re: [PATCH v3] drivers: advertise kmod dependencies in pmdinfo
From: Ferruh Yigit @ 2016-12-22 11:04 UTC (permalink / raw)
  To: Andrew Rybchenko, Neil Horman
  Cc: Thomas Monjalon, Olivier Matz, Adrien Mazarguil, dev, vido,
	fiona.trahe, stephen
In-Reply-To: <5c8a65d6-79a8-fbbe-29d6-bee5c08be17b@solarflare.com>

On 12/21/2016 11:40 AM, Andrew Rybchenko wrote:
> On 12/21/2016 02:37 PM, Neil Horman wrote:
>> On Wed, Dec 21, 2016 at 12:21:14PM +0300, Andrew Rybchenko wrote:
>>> On 12/20/2016 08:26 PM, Thomas Monjalon wrote:
>>>>>> Add a new macro RTE_PMD_REGISTER_KMOD_DEP() that allows a driver to
>>>>>> declare the list of kernel modules required to run properly.
>>>>>>
>>>>>> Today, most PCI drivers require uio/vfio.
>>>>>>
>>>>>> Signed-off-by: Olivier Matz <olivier.matz@6wind.com>
>>>>>> Acked-by: Fiona Trahe <fiona.trahe@intel.com>
>>>>> Acked-by: Adrien Mazarguil <adrien.mazarguil@6wind.com>
>>>> Applied in main tree, thanks
>>> Is there any plan on how it will be done/solved for a new drivers in
>>> dpdk-next-net?
>>> Should I care about it for sfc?
>>>
>> Given that all pmdinfo information is opt-in (that is to say not obligatory),
>> you can now wait until net-next does its next rebase, and as you continue your
>> development of the sfc driver, you can add the use of this macro in at your
>> leisure.  As more people do that, we will arrive at 100% coverage
> 
> I see. Will do. Thanks.
> 

Hi Andrew,

Patch rebased to next-net, would you mind doing the mentioned patch for it?

Thanks,
ferruh

* Re: [PATCH v6 23/25] app/testpmd: handle i40e in VF VLAN filter command
From: Ferruh Yigit @ 2016-12-22 10:57 UTC (permalink / raw)
  To: Wenzhuo Lu, dev; +Cc: Bernard Iremonger
In-Reply-To: <1482302070-128496-24-git-send-email-wenzhuo.lu@intel.com>

On 12/21/2016 6:34 AM, Wenzhuo Lu wrote:
> modify set_vf_rx_vlan function to handle the i40e PMD.
> 
> Signed-off-by: Bernard Iremonger <bernard.iremonger@intel.com>
> ---

<...>

> +
> +	switch (ret) {
> +	case 0:
> +		break;
> +	case -EINVAL:
> +		printf("invalid vlan_id %d or vf_mask %lu\n",

To fix 32bit compilation:
printf("invalid vlan_id %d or vf_mask %"PRIu64"\n",
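
For reference, a self-contained sketch of the same format string using the <inttypes.h> macro; format_vlan_error() is a hypothetical helper and not testpmd code. PRIu64 expands to the correct conversion specifier for uint64_t on both 32-bit and 64-bit targets, whereas %lu only matches where unsigned long is 64 bits wide.

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical helper that formats the error message portably. */
static int format_vlan_error(char *buf, size_t len, int vlan_id,
		uint64_t vf_mask)
{
	return snprintf(buf, len,
			"invalid vlan_id %d or vf_mask %" PRIu64 "\n",
			vlan_id, vf_mask);
}
```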


<...>

* Re: [PATCH 15/18] net/ixgbe: parse flow director filter
From: Ferruh Yigit @ 2016-12-22 10:44 UTC (permalink / raw)
  To: Zhao1, Wei, dev@dpdk.org; +Cc: Lu, Wenzhuo, Adrien Mazarguil
In-Reply-To: <A2573D2ACFCADC41BB3BE09C6DE313CA0200DDDD@PGSMSX103.gar.corp.intel.com>

On 12/22/2016 9:19 AM, Zhao1, Wei wrote:
> Hi, Yigit
> 
>> -----Original Message-----
>> From: Yigit, Ferruh
>> Sent: Wednesday, December 21, 2016 1:01 AM
>> To: Zhao1, Wei <wei.zhao1@intel.com>; dev@dpdk.org
>> Cc: Lu, Wenzhuo <wenzhuo.lu@intel.com>
>> Subject: Re: [dpdk-dev] [PATCH 15/18] net/ixgbe: parse flow director filter
>>
>> On 12/2/2016 10:43 AM, Wei Zhao wrote:
>>> From: wei zhao1 <wei.zhao1@intel.com>
>>>
>>> check if the rule is a flow director rule, and get the flow director info.
>>>
>>> Signed-off-by: wei zhao1 <wei.zhao1@intel.com>
>>> Signed-off-by: Wenzhuo Lu <wenzhuo.lu@intel.com>
>>> ---
>>
>> <...>
>>
>>> +	PATTERN_SKIP_VOID(rule, struct ixgbe_fdir_rule,
>>> +			  RTE_FLOW_ERROR_TYPE_ITEM_NUM);
>>> +	if (item->type != RTE_FLOW_ITEM_TYPE_ETH &&
>>> +	    item->type != RTE_FLOW_ITEM_TYPE_IPV4 &&
>>> +	    item->type != RTE_FLOW_ITEM_TYPE_IPV6 &&
>>> +	    item->type != RTE_FLOW_ITEM_TYPE_UDP &&
>>> +	    item->type != RTE_FLOW_ITEM_TYPE_VXLAN &&
>>> +	    item->type != RTE_FLOW_ITEM_TYPE_NVGRE) {
>>
>> This gives build error [1], there are a few more same usage:
>>
>> .../drivers/net/ixgbe/ixgbe_ethdev.c:9238:17: error: comparison of constant
>> 241 with expression of type 'const enum rte_flow_item_type' is always true
>> [-Werror,-Wtautological-constant-out-of-range-compare]
>>             item->type != RTE_FLOW_ITEM_TYPE_NVGRE) {
>>
>>
>>
> 
> Ok, I will add two type definition RTE_FLOW_ITEM_TYPE_NVGRE and RTE_FLOW_ITEM_TYPE_E_TAG  into const enum rte_flow_item_type to eliminate this problem.
> Thank you.
> 

CC: Adrien Mazarguil <adrien.mazarguil@6wind.com>

Yes, that is the right thing to do. Since the rte_flow patchset is not merged
yet, perhaps Adrien may want to include this in the next version of his
patchset?
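
A minimal sketch of why adding the enumerators helps, using a hypothetical stand-in enum rather than the real rte_flow_item_type. The constant 241 in the clang message suggests the NVGRE value was defined outside the enum, so the compiler concluded the comparison could never be false; once the value is a member of the enum, the comparison is in range.

```c
/* Hypothetical stand-in for rte_flow_item_type. */
enum item_type {
	ITEM_TYPE_END,
	ITEM_TYPE_ETH,
	ITEM_TYPE_IPV4,
	ITEM_TYPE_IPV6,
	ITEM_TYPE_UDP,
	ITEM_TYPE_VXLAN,
	ITEM_TYPE_NVGRE /* now an enumerator, so comparisons are in range */
};

/* Return 1 if the item type may start a flow director pattern, else 0. */
static int fdir_item_supported(enum item_type type)
{
	switch (type) {
	case ITEM_TYPE_ETH:
	case ITEM_TYPE_IPV4:
	case ITEM_TYPE_IPV6:
	case ITEM_TYPE_UDP:
	case ITEM_TYPE_VXLAN:
	case ITEM_TYPE_NVGRE:
		return 1;
	default:
		return 0;
	}
}
```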

What do you think Adrien?

Thanks,
ferruh


This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox