DPDK-dev Archive on lore.kernel.org
* Re: [PATCH 13/13] i40e: improve message grepability
From: Ferruh Yigit @ 2017-01-09 14:11 UTC (permalink / raw)
  To: Michał Mirosław, dev
In-Reply-To: <39ceb2bf1e5aa61e3957a8d8f9e5b2df28d6d2ad.1481590851.git.mirq-linux@rere.qmqm.pl>

On 12/13/2016 1:08 AM, Michał Mirosław wrote:
> Signed-off-by: Michał Mirosław <michal.miroslaw@atendesoftware.pl>
> ---

<...>

>  	if (ret)
> -		PMD_INIT_LOG(ERR, "Failed to add filter to drop flow control "
> -				  " frames from VSIs.");
> +		PMD_INIT_LOG(ERR, "Failed to add filter to drop flow control frames from VSIs.");

According to the latest discussion, message integrity has priority over the
line-length limit.

But can you please break the lines while keeping whole msg, like:
> +		PMD_INIT_LOG(ERR,
			"Failed to add filter to drop flow control frames from VSIs.");

<...>

>  	if (ret != I40E_SUCCESS) {
> -		PMD_DRV_LOG(ERR, "Fail to debug read from "
> -			    "I40E_GL_SWT_L2TAGCTRL[%d]", reg_id);
> +		PMD_DRV_LOG(ERR, "Fail to debug read from I40E_GL_SWT_L2TAGCTRL[%d]", reg_id);

And can you wrap arguments into next line:
+		PMD_DRV_LOG(ERR,
			"Fail to debug read from I40E_GL_SWT_L2TAGCTRL[%d]",
			reg_id);

<...>


* Re: [PATCH v5 3/8] ethdev: reserve capability flags for PMD-specific API
From: Adrien Mazarguil @ 2017-01-09 14:41 UTC (permalink / raw)
  To: Ananyev, Konstantin
  Cc: Bie, Tiwei, dev@dpdk.org, Lu, Wenzhuo, Mcnamara, John,
	olivier.matz@6wind.com, thomas.monjalon@6wind.com, Zhang, Helin,
	Dai, Wei, Wang, Xiao W
In-Reply-To: <2601191342CEEE43887BDE71AB9772583F10241C@irsmsx105.ger.corp.intel.com>

Hi Konstantin,

On Sun, Jan 08, 2017 at 12:39:55PM +0000, Ananyev, Konstantin wrote:
> 
> Hi Adrien,
> 
> > 
> > Hi Konstantin,
> > 
> > On Thu, Jan 05, 2017 at 11:32:38AM +0000, Ananyev, Konstantin wrote:
> > > Hi Adrien,
[...]
> > > > PMD-specific symbols have nothing to do in the global namespace in my
> > > > opinion, they are not versioned and may evolve without notice. Neither
> > > > applications nor the bonding PMD can rely on them. That's the trade-off.
> > >
> > > Not sure I do understand your reasoning.
> > > For me MACSEC offload is just one of many HW offloads that we support
> > > and should be treated in exactly the same way.
> > > Applications should be able to use it in a transparent and reliable way,
> > > not only under some limited conditions.
> > > Otherwise what is the point to introduce it at all?
> > 
> > Well my first reply to this thread was asking why isn't the whole API global
> > from the start then?
> 
> That's good question, and my preference would always be to have the
> API to configure this feature as generic one.
> I guess the main reason why it is not right now is that we haven't reached an
> agreement on what this API should look like:
> http://dpdk.org/ml/archives/dev/2016-September/047810.html
> But I'll leave it to the author to provide the real reason here. 
> 
> > Given there are valid reasons for it not to and no plan to make it so in the
> > near future, applications must be aware that they are including
> > rte_pmd_ixgbe.h to use it. That in itself is a limiting condition, right?
> 
> Yes, it is definitely a limiting factor.
> Though even if API to configure device to use macsec would be PMD specific right now,
> The API to query that capability and the API to use it at datapath (mbuf.ol_flags) still
> can be (and I think should be) device independent and transparent to use.  

With RESERVED flags, what is global and transparent is the fact the mbuf or
device features in question are PMD-specific and some extra knowledge about
the underlying port is necessary to handle them. I think that could be
useful to applications so that they can take the necessary precautions.

> > > Yes, right now it is supported only by ixgbe PMD, but why that should be the
> > > reason to treat it as second-class citizen?
> > > Let say PKT_TX_TUNNEL_* offloads also are supported only by one PMD right now.
> > 
> > You are right about PKT_TX_TUNNEL_*, however these flags exist on their own
> > and are not tied to any API function calls, unlike in this series where
> > PKT_TX_MACSEC can only be used if the DEV_TX_OFFLOAD_MACSEC_INSERT
> > capability is present 
> 
> I don't think PKT_TX_TUNNEL_* 'exists on its own'.
> To use it well behaving app have to:
> 1) Query that device does provide that capability: DEV_TX_OFFLOAD_*_TNL_TSO
> 2) configure PMD( & device) to use that capability
> 3) use that offload at run-time TX code (mb->ol_flags |= ...; mb->tx_offload = ...)
> 
> For PKT_TX_TUNNEL_*  2) is pretty simple - user just need to make sure
> that full-featured TX function will be selected:
> txconf.txq_flags = 0; ...;  rte_eth_tx_queue_setup(..., &txconf);

Pretty much like any other offloads. Any PMD could implement them as well
without exposing additional callbacks.

> For TX_MACSEC, as I understand 2) will be more complicated and
> right now is PMD specific, but anyway the main pattern remains the same.
> So at least 1) and 3) could be kept device neutral.

Yes, however this discussion is precisely because I think it could be a
problem that this API "right now is PMD specific", particularly because it
will remain that way.

> >and the whole thing was configured through
> > rte_pmd_ixgbe_macsec_*() calls after including rte_pmd_ixgbe.h.
> > 
> > To be clear it is not about MACsec per se (as a standardized protocol, I
> > think related definitions for offloads have their place), but it has to do
> > with the fact that the rest of the API is PMD-specific and there is a
> > dependency between them.
> > 
> > > > Therefore until APIs are made global, the safe compromise is to define
> > > > neutral, reserved symbols that any PMD can use to implement their own
> > > > temporary APIs for testing purposes. These can be renamed later without
> > > > changing their value as long as a single PMD uses them.
> > >
> > > Ok, so what we'll gain by introducing PKT_TX_RESERVED instead of PKT_TX_MACSEC?
> > > As I said in my previous mail the redefinition for the same ol_flag bit (and dev capabilities)
> > > by different PMD might create a lot of confusion in future.
> > > Does the potential saving of 1 bit really worth it?
> > 
> > That is one benefit, but my point is mainly to keep applications aware that
> > they are using an API defined by a single PMD, which may be temporary and
> > whose symbols are not versioned.
> 
> As applications have to use PMD specific functions to configure it they definitely are aware.

Therefore they also know if they are only running on top of ixgbe adapters
or a mix with something else. By choosing the PMD-specific path, they know
what they are getting into.

> > Consider this:
> > 
> > rte_mbuf.h:
> > 
> >  #define PKT_TX_RESERVED_0 (1ULL << 42)
> > 
> > rte_pmd_ixgbe.h:
> > 
> >  #define PKT_TX_MACSEC PKT_TX_RESERVED_0
> > 
> > That way, applications have to get the PKT_TX_MACSEC definition where the
> > rest of the API is also defined.
> > 
> > Other PMDs may reuse PKT_TX_RESERVED_0 and other reserved flags to implement
> > their own experimental APIs.
> 
> That's the main thing I am opposed to.
> I think that by allowing PMD to redefine meaning of
> mbuf.ol_flags and dev_info.(rx| tx)_offload_capa 
> we are just asking for trouble.
> 
> Let say tomorrow, i40e will redefine DEV_TX_OFFLOAD_RESERVED_0 and PKT_TX_RESERVED_0
> to implement new specific TX offload (PKT_TX_FEATURE_X).
> Now let say we have an application that works over both ixgbe and i40e
> and would like to use both TX_MACSEC and TX_FEATURE_X offloads whenever they are available.
> As I can see, with the approach you proposed the only way for the application to make it
> is to support 2 different TX code paths (or at least some parts of it).
> To me that way looks inconvenient to the users and source of future troubles.

Remember applications still have to clear PKT_TX_MACSEC and other flags on
ports that do not implement them, just like when the related offload is not
configured even if supported. Whichever approach is taken, applications have
to be careful and need a few extra checks in their TX path. There is no
extra cost involved compared to existing offloads.
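
To illustrate the kind of check I mean, here is a minimal sketch. Every
name and value in it is hypothetical (a stand-in reserved flag at bit 42, a
made-up capability bit and a made-up per-port structure); it is not existing
DPDK API, only the shape of the test an application would run before handing
a packet to a given port.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical values, for illustration only. */
#define PKT_TX_RESERVED_0         (1ULL << 42)
#define DEV_TX_OFFLOAD_RESERVED_0 (1u << 30)

/* Hypothetical per-port state kept by the application. */
struct port_caps {
	uint32_t tx_offload_capa; /* capabilities reported by the port */
	int offload_configured;   /* set once the PMD-specific setup succeeded */
};

/* Clear the PMD-specific TX flag unless this port supports it and it is configured. */
static uint64_t
tx_sanitize_flags(uint64_t ol_flags, const struct port_caps *p)
{
	assert(p);
	if (!(p->tx_offload_capa & DEV_TX_OFFLOAD_RESERVED_0) ||
	    !p->offload_configured)
		ol_flags &= ~PKT_TX_RESERVED_0;
	return ol_flags;
}
```

The same two conditions apply whether the flag is called PKT_TX_MACSEC or
PKT_TX_RESERVED_0, which is why I do not see an extra cost here.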

> Same for RX:  somewhere at upper layer user got a packet with PKT_RX_RESERVED_0 set.
> What does it really mean if there are different NIC types in the system?

For RX, I agree things are more complicated, applications would have to know
what a given flag means from a particular port. We could define several
RESERVED_X flags that do not overlap on a case basis, so at least
applications know they are dealing with PMD-specific flags.

> > Applications and the bonding PMD can easily be made aware that such reserved
> > flags cannot be shared between ports unless they know what the underlying
> > PMD is, which is already a requirement to use this API in the first place
> > (for instance, calling rte_pmd_ixgbe_macsec_*() functions with another
> > vendor's port_id may crash the application).
> 
> I am talking about that code:
> rte_eth_bond_create(const char *name, uint8_t mode, uint8_t socket_id)
> {
> ...
> /* Take the first dev's offload capabilities */
> internals->rx_offload_capa = dev_info.rx_offload_capa;
> internals->tx_offload_capa = dev_info.tx_offload_capa;
> ...
> internals->rx_offload_capa &= dev_info.rx_offload_capa;
> internals->tx_offload_capa &= dev_info.tx_offload_capa;
> 
> Obviously with what you are suggesting it is not valid any more.
> Bonded device need to support a MASK of all device reserved offloads to exclude
> them from common subset. 
> Any user app(/lib) that does similar thing would also have to be changed.

Why can't they clear RESERVED flags as well? We just have to document the
expected behavior. Even if the bonding PMD does not, applications that do
not use those flags (since they are reserved) should not be impacted.

Because the bonding PMD won't forward PMD-specific API calls, I think it's
safer to clear them anyway.
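
As a rough sketch of what "clearing them" could look like when computing the
common capability set of several slaves (the mask below is purely
hypothetical, I am assuming the top four capability bits would be the
reserved ones; the helper name is made up as well):

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical: assume the top four capability bits are PMD-specific. */
#define DEV_OFFLOAD_RESERVED_MASK (0xFu << 28)

/*
 * Intersect the capabilities of n ports, then drop every reserved bit,
 * since its meaning may differ from one PMD to another.
 */
static uint32_t
common_offload_capa(const uint32_t *capa, unsigned int n)
{
	uint32_t common;
	unsigned int i;

	assert(capa);
	if (n == 0)
		return 0;
	common = capa[0];
	for (i = 1; i < n; i++)
		common &= capa[i];
	return common & ~DEV_OFFLOAD_RESERVED_MASK;
}
```

With something like this, a reserved bit never reaches the bonded device's
capabilities, even when all slaves happen to set it.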

By the way, how do you handle the case where the bonding PMD exposes the
MACSEC feature and as a result the application tries to configure MACSEC
support on the bonding port_id? How is the resulting crash documented in a
generic fashion?

> > So the idea if/when the API is made global is to rename PKT_TX_RESERVED_0 to
> > PKT_TX_MACSEC and keep its original value.
> > 
> > If other PMDs also implemented PKT_TX_RESERVED_0 in the meantime, it is
> > redefined using a different value. If there is no room left to do so, these
> > PMDs are out of luck I guess, and their specific API is disabled/removed
> > until something gets re-designed.
> > 
> > How about this?
> 
> I still think that we shouldn't allow PMDs to redefine mbuf.olflags and
> dev_info.(rx|tx)_offload_capa. 
> See above for my reasons.

I'm still not comfortable with partially global/specific APIs such as this,
because we assume applications are fully aware of the underlying port in
order to use them, while at the same time the related flags are part of the
global namespace, I find it confusing.

If application developers have no problem with that (so far they haven't
commented on this topic, which means they likely agree to go with anything),
I have no reason left to go against the majority.

-- 
Adrien Mazarguil
6WIND


* Re: Port stats zero when using MLX5 DPDK driver
From: george.dit @ 2017-01-09 14:44 UTC (permalink / raw)
  To: Shahaf Shuler; +Cc: dev@dpdk.org
In-Reply-To: <AM4PR05MB1505297E0C5668DD7A1B2DA6C3640@AM4PR05MB1505.eurprd05.prod.outlook.com>

Hi Shahaf,

Thanks for the clarification!
Is there a plan for this functionality to be provided (soon)? (or any
technical limitation that stands in this way)

Thanks and best regards,
Georgios

On Mon, Jan 9, 2017 at 2:32 PM, Shahaf Shuler <shahafs@mellanox.com> wrote:

> Hi Georgios,
>
> Reading the primary process counters from a secondary process is not
> supported by the Mellanox PMD.
>
>
> --Shahaf
>
>
> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of george.dit@gmail.com
> Sent: Sunday, January 8, 2017 3:38 PM
> To: dev@dpdk.org
> Subject: [dpdk-dev] Port stats zero when using MLX5 DPDK driver
>
> Hi,
>
> I have a simple setup with a machine that contains a dual port 10 GbE
> Intel 82599ES NIC and another dual port 100 GbE Mellanox ConnectX-4 NIC.
> The Intel ports are 0 and 1, while the Mellanox ones are 2 and 3.
>
> I properly compiled DPDK 16.11 and test-pmd works just fine for all 4
> ports.
> Then, I ran a simple primary application that forwards packets from 0 <-->
> 1 and 2 <--> 3 and started dpdk-procinfo -- --stats (or --xstats) as a
> secondary monitoring process, while sending some traffic to all 4 ports.
> The problem I see is that the statistics reported by the Mellanox NICs are
> always zero (Intel ports report just fine).
>
> What is the reason behind this behavior? Is there a bug in the driver
> (maybe recently fixed by DPDK 17.02 rc?) or is it simply a lack of this
> functionality?
>
> Thanks and best regards,
>
> --
>    Georgios Katsikas
>    Ph.D. Student and Research Assistant
>    Network Systems Lab (NSL)
>
>
>
>        *E-Mail:*  george.dit@gmail.com <george.katsikas@imdea.org>
>    *Web Site:*  http://www.di.uoa.gr/~katsikas/ <
> http://people.networks.imdea.org/~george_katsikas/index.html>
>



-- 
   Georgios Katsikas
   Ph.D. Student and Research Assistant
   Network Systems Lab (NSL)



       *E-Mail:*  george.dit@gmail.com <george.katsikas@imdea.org>
   *Web Site:*  http://www.di.uoa.gr/~katsikas/
<http://people.networks.imdea.org/~george_katsikas/index.html>


* [PATCH] crypto/openssl: fix that remove unneeded check
From: Piotr Azarewicz @ 2017-01-09 14:45 UTC (permalink / raw)
  To: pablo.de.lara.guarch, dev; +Cc: stable

EVP_CIPHER_CTX_set_padding() function always returns 1, so the check is
unneeded.

Fixes: d61f70b4c918 ("crypto/libcrypto: add driver for OpenSSL library")

Signed-off-by: Piotr Azarewicz <piotrx.t.azarewicz@intel.com>
---
 drivers/crypto/openssl/rte_openssl_pmd.c |    3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/drivers/crypto/openssl/rte_openssl_pmd.c b/drivers/crypto/openssl/rte_openssl_pmd.c
index 832ea1d..312154a 100644
--- a/drivers/crypto/openssl/rte_openssl_pmd.c
+++ b/drivers/crypto/openssl/rte_openssl_pmd.c
@@ -522,8 +522,7 @@
 	if (EVP_DecryptInit_ex(ctx, algo, NULL, key, iv) <= 0)
 		goto process_cipher_decrypt_err;
 
-	if (EVP_CIPHER_CTX_set_padding(ctx, 0) <= 0)
-		goto process_cipher_decrypt_err;
+	EVP_CIPHER_CTX_set_padding(ctx, 0);
 
 	if (EVP_DecryptUpdate(ctx, dst, &dstlen, src, srclen) <= 0)
 		goto process_cipher_decrypt_err;
-- 
1.7.9.5


* [PATCH v4 0/6] distributor library performance enhancements
From: David Hunt @ 2017-01-09  7:50 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson
In-Reply-To: <1482381428-148094-2-git-send-email-david.hunt@intel.com>

This patch aims to improve the throughput of the distributor library.

It uses a similar handshake mechanism to the previous version of
the library, in that bits are used to indicate when packets are ready
to be sent to a worker and ready to be returned from a worker. One main
difference is that instead of sending one packet in a cache line, it makes
use of the 7 free spaces in the same cache line in order to send up to
8 packets at a time to/from a worker.
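
As a minimal sketch of the slot encoding behind that handshake (the flag
values and the 4-bit shift mirror RTE_DISTRIB_FLAG_BITS, RTE_DISTRIB_GET_BUF
and RTE_DISTRIB_RETURN_BUF from the existing library; the helper names are
made up for illustration and are not part of the patch):

```c
#include <assert.h>
#include <stdint.h>

/* Values mirror the existing distributor defines; helper names are illustrative. */
#define FLAG_BITS  4
#define FLAGS_MASK 0x0F
#define GET_BUF    1
#define RETURN_BUF 2

/* Pack a pointer and its handshake flags into one 64-bit slot. */
static inline int64_t
slot_pack(const void *ptr, unsigned int flags)
{
	assert((flags & ~FLAGS_MASK) == 0);
	return (int64_t)((uint64_t)(uintptr_t)ptr << FLAG_BITS) |
			(int64_t)flags;
}

/* Recover the pointer; relies on >> of a signed value being arithmetic. */
static inline void *
slot_ptr(int64_t slot)
{
	return (void *)(uintptr_t)(slot >> FLAG_BITS);
}

/* Extract the handshake flags from the bottom bits. */
static inline unsigned int
slot_flags(int64_t slot)
{
	return (unsigned int)(slot & FLAGS_MASK);
}
```

Eight such 64-bit slots fill one cache line, assuming a 64-byte line, which
is where the "up to 8 packets at a time" figure comes from.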

The flow matching algorithm has had significant re-work, and now keeps an
array of inflight flows and an array of backlog flows, and matches incoming
flows to the inflight/backlog flows of all workers so that flow pinning to
workers can be maintained.
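
A simplified, stand-alone sketch of that matching step follows. The
structure and function names are hypothetical and much reduced compared to
the real library state; a tag of 0 is assumed to mean "empty slot", and the
output holds worker index + 1 for a pinned flow, 0 otherwise.

```c
#include <assert.h>
#include <stdint.h>

#define BURST_SIZE 8 /* tags handled per burst */

/* Hypothetical per-worker state: tag 0 means "empty slot". */
struct worker_tags {
	uint16_t inflight[BURST_SIZE];
	uint16_t backlog[BURST_SIZE];
};

/*
 * For each incoming tag, store (worker index + 1) in out[] if some worker
 * already holds that tag in its in-flight or backlog array, else 0.
 */
static void
match_flows(const struct worker_tags *w, unsigned int num_workers,
		const uint16_t *tags, uint16_t *out)
{
	unsigned int i, j, k;

	assert(w && tags && out);
	for (j = 0; j < BURST_SIZE; j++)
		out[j] = 0;

	for (i = 0; i < num_workers; i++)
		for (j = 0; j < BURST_SIZE; j++) {
			if (tags[j] == 0)
				continue; /* no packet in this position */
			for (k = 0; k < BURST_SIZE; k++)
				if (w[i].inflight[k] == tags[j] ||
				    w[i].backlog[k] == tags[j]) {
					out[j] = (uint16_t)(i + 1);
					break;
				}
		}
}
```

Packets whose output entry stays 0 are free to go to any worker; the others
must follow the worker that already owns their flow.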

The flow match algorithm has both scalar and vector versions, and a
function pointer is used to select the most appropriate function at run time,
depending on the presence of the SSE2 CPU flag. On non-x86 platforms, the
scalar match function is selected, which should still give a good boost
in performance over the non-burst API.
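
The run-time selection can be sketched roughly as below. This is not the
code from the series: it uses the GCC/Clang builtin __builtin_cpu_supports()
rather than the DPDK CPU flag API, and a much smaller "is this tag present"
helper instead of the full match function. All function names are
illustrative.

```c
#include <assert.h>
#include <stdint.h>

#if defined(__SSE2__) && defined(__GNUC__)
#include <emmintrin.h>
#define HAVE_SSE2_BUILD 1
#else
#define HAVE_SSE2_BUILD 0
#endif

#define BURST_SIZE 8

/* Returns non-zero if tag t appears in the 8-entry array. */
typedef int (*has_tag_fn)(const uint16_t tags[BURST_SIZE], uint16_t t);

static int
has_tag_scalar(const uint16_t tags[BURST_SIZE], uint16_t t)
{
	unsigned int i;

	assert(tags);
	for (i = 0; i < BURST_SIZE; i++)
		if (tags[i] == t)
			return 1;
	return 0;
}

#if HAVE_SSE2_BUILD
static int
has_tag_sse2(const uint16_t tags[BURST_SIZE], uint16_t t)
{
	/* compare all eight 16-bit lanes against t in one instruction */
	__m128i v = _mm_loadu_si128((const __m128i *)tags);
	__m128i m = _mm_cmpeq_epi16(v, _mm_set1_epi16((short)t));

	return _mm_movemask_epi8(m) != 0;
}
#endif

/* Pick the implementation once, at run time. */
static has_tag_fn
select_has_tag(void)
{
#if HAVE_SSE2_BUILD
	__builtin_cpu_init();
	if (__builtin_cpu_supports("sse2"))
		return has_tag_sse2;
#endif
	return has_tag_scalar;
}
```

The caller stores the returned pointer once at creation time and calls
through it in the fast path, so the CPU check is never repeated per burst.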

v2 changes:
  * Created a common distributor_priv.h header file with common
    definitions and structures.
  * Added a scalar version so it can be built and used on machines without
    sse2 instruction set
  * Added unit autotests
  * Added perf autotest

v3 changes:
  * Addressed mailing list review comments
  * Test code removal
  * Split out SSE match into separate file to facilitate NEON addition
  * Cleaned up conditional compilation flags for SSE2
  * Addressed c99 style compilation errors
  * rebased on latest head (Jan 2 2017, Happy New Year to all)

v4 changes:
   * fixed issue building shared libraries

Notes:
   Apps must now work in bursts, as up to 8 packets are given to a worker at a time
   For performance in matching, Flow IDs are 15 bits
   Original API (and code) is kept for backward compatibility

Performance Gains
   2.2GHz Intel(R) Xeon(R) CPU E5-2699 v4 @ 2.20GHz
   2 x XL710 40GbE NICS to 2 x 40Gbps traffic generator channels 64b packets
   separate cores for rx, tx, distributor
    1 worker  - 4.8x
    4 workers - 2.9x
    8 workers - 1.8x
   12 workers - 2.1x
   16 workers - 1.8x

[PATCH v4 1/6] lib: distributor performance enhancements
[PATCH v4 2/6] lib: add distributor vector flow matching
[PATCH v4 3/6] test: unit tests for new distributor burst api
[PATCH v4 4/6] test: add distributor_perf autotest
[PATCH v4 5/6] example: distributor app showing burst api
[PATCH v4 6/6] doc: distributor library changes for new burst api


* [PATCH v4 1/6] lib: distributor performance enhancements
From: David Hunt @ 2017-01-09  7:50 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1483948248-91364-1-git-send-email-david.hunt@intel.com>

Now sends bursts of up to 8 mbufs to each worker, and tracks
the in-flight flow-ids (atomic scheduling)

New file with a new api, similar to the old API except with _burst
at the end of the function names

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 lib/librte_distributor/Makefile                    |   2 +
 lib/librte_distributor/rte_distributor.c           |  72 +--
 lib/librte_distributor/rte_distributor_burst.c     | 558 +++++++++++++++++++++
 lib/librte_distributor/rte_distributor_burst.h     | 255 ++++++++++
 lib/librte_distributor/rte_distributor_priv.h      | 189 +++++++
 lib/librte_distributor/rte_distributor_version.map |   9 +
 6 files changed, 1014 insertions(+), 71 deletions(-)
 create mode 100644 lib/librte_distributor/rte_distributor_burst.c
 create mode 100644 lib/librte_distributor/rte_distributor_burst.h
 create mode 100644 lib/librte_distributor/rte_distributor_priv.h

diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile
index 4c9af17..2acc54d 100644
--- a/lib/librte_distributor/Makefile
+++ b/lib/librte_distributor/Makefile
@@ -43,9 +43,11 @@ LIBABIVER := 1
 
 # all source are stored in SRCS-y
 SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
+SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += rte_distributor_burst.c
 
 # install this header file
 SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
+SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include += rte_distributor_burst.h
 
 # this lib needs eal
 DEPDIRS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += lib/librte_eal
diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c
index f3f778c..c05f6e3 100644
--- a/lib/librte_distributor/rte_distributor.c
+++ b/lib/librte_distributor/rte_distributor.c
@@ -40,79 +40,9 @@
 #include <rte_errno.h>
 #include <rte_string_fns.h>
 #include <rte_eal_memconfig.h>
+#include "rte_distributor_priv.h"
 #include "rte_distributor.h"
 
-#define NO_FLAGS 0
-#define RTE_DISTRIB_PREFIX "DT_"
-
-/* we will use the bottom four bits of pointer for flags, shifting out
- * the top four bits to make room (since a 64-bit pointer actually only uses
- * 48 bits). An arithmetic-right-shift will then appropriately restore the
- * original pointer value with proper sign extension into the top bits. */
-#define RTE_DISTRIB_FLAG_BITS 4
-#define RTE_DISTRIB_FLAGS_MASK (0x0F)
-#define RTE_DISTRIB_NO_BUF 0       /**< empty flags: no buffer requested */
-#define RTE_DISTRIB_GET_BUF (1)    /**< worker requests a buffer, returns old */
-#define RTE_DISTRIB_RETURN_BUF (2) /**< worker returns a buffer, no request */
-
-#define RTE_DISTRIB_BACKLOG_SIZE 8
-#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
-
-#define RTE_DISTRIB_MAX_RETURNS 128
-#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
-
-/**
- * Maximum number of workers allowed.
- * Be aware of increasing the limit, becaus it is limited by how we track
- * in-flight tags. See @in_flight_bitmask and @rte_distributor_process
- */
-#define RTE_DISTRIB_MAX_WORKERS	64
-
-/**
- * Buffer structure used to pass the pointer data between cores. This is cache
- * line aligned, but to improve performance and prevent adjacent cache-line
- * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
- * the next cache line to worker 0, we pad this out to three cache lines.
- * Only 64-bits of the memory is actually used though.
- */
-union rte_distributor_buffer {
-	volatile int64_t bufptr64;
-	char pad[RTE_CACHE_LINE_SIZE*3];
-} __rte_cache_aligned;
-
-struct rte_distributor_backlog {
-	unsigned start;
-	unsigned count;
-	int64_t pkts[RTE_DISTRIB_BACKLOG_SIZE];
-};
-
-struct rte_distributor_returned_pkts {
-	unsigned start;
-	unsigned count;
-	struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS];
-};
-
-struct rte_distributor {
-	TAILQ_ENTRY(rte_distributor) next;    /**< Next in list. */
-
-	char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
-	unsigned num_workers;                 /**< Number of workers polling */
-
-	uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS];
-		/**< Tracks the tag being processed per core */
-	uint64_t in_flight_bitmask;
-		/**< on/off bits for in-flight tags.
-		 * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then
-		 * the bitmask has to expand.
-		 */
-
-	struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS];
-
-	union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS];
-
-	struct rte_distributor_returned_pkts returns;
-};
-
 TAILQ_HEAD(rte_distributor_list, rte_distributor);
 
 static struct rte_tailq_elem rte_distributor_tailq = {
diff --git a/lib/librte_distributor/rte_distributor_burst.c b/lib/librte_distributor/rte_distributor_burst.c
new file mode 100644
index 0000000..ae7cf9d
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor_burst.c
@@ -0,0 +1,558 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <sys/queue.h>
+#include <string.h>
+#include <rte_mbuf.h>
+#include <rte_memory.h>
+#include <rte_cycles.h>
+#include <rte_memzone.h>
+#include <rte_errno.h>
+#include <rte_string_fns.h>
+#include <rte_eal_memconfig.h>
+#include "rte_distributor_priv.h"
+#include "rte_distributor_burst.h"
+
+TAILQ_HEAD(rte_dist_burst_list, rte_distributor_burst);
+
+static struct rte_tailq_elem rte_dist_burst_tailq = {
+	.name = "RTE_DIST_BURST",
+};
+EAL_REGISTER_TAILQ(rte_dist_burst_tailq)
+
+/**** APIs called by workers ****/
+
+/**** Burst Packet APIs called by workers ****/
+
+/* This function should really be called return_pkt_burst() */
+void
+rte_distributor_request_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **oldpkt,
+		unsigned int count)
+{
+	struct rte_distributor_buffer_burst *buf = &(d->bufs[worker_id]);
+	unsigned int i;
+
+	volatile int64_t *retptr64;
+
+
+	/* if we don't have any packets to return, return. */
+	if (count == 0)
+		return;
+
+	retptr64 = &(buf->retptr64[0]);
+	/* Spin while handshake bits are set (scheduler clears it) */
+	while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
+		rte_pause();
+		uint64_t t = rte_rdtsc()+100;
+
+		while (rte_rdtsc() < t)
+			rte_pause();
+	}
+
+	/*
+	 * OK, if we've got here, then the scheduler has just cleared the
+	 * handshake bits. Populate the retptrs with returning packets.
+	 */
+
+	for (i = count; i < RTE_DIST_BURST_SIZE; i++)
+		buf->retptr64[i] = 0;
+
+	/* Set Return bit for each packet returned */
+	for (i = count; i-- > 0; )
+		buf->retptr64[i] =
+			(((int64_t)(uintptr_t)(oldpkt[i])) <<
+			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+
+	/*
+	 * Finally, set the GET_BUF  to signal to distributor that cache
+	 * line is ready for processing
+	 */
+	*retptr64 |= RTE_DISTRIB_GET_BUF;
+}
+
+int
+rte_distributor_poll_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **pkts)
+{
+	struct rte_distributor_buffer_burst *buf = &d->bufs[worker_id];
+	uint64_t ret;
+	int count = 0;
+	unsigned int i;
+
+	/* If bit is set, return */
+	if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
+		return 0;
+
+	/* since bufptr64 is signed, this should be an arithmetic shift */
+	for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+		if (likely(buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)) {
+			ret = buf->bufptr64[i] >> RTE_DISTRIB_FLAG_BITS;
+			pkts[count++] = (struct rte_mbuf *)((uintptr_t)(ret));
+		}
+	}
+
+	/*
+	 * so now we've got the contents of the cacheline into an  array of
+	 * mbuf pointers, so toggle the bit so scheduler can start working
+	 * on the next cacheline while we're working.
+	 */
+	buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;
+
+
+	return count;
+}
+
+int
+rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **pkts,
+		struct rte_mbuf **oldpkt, unsigned int return_count)
+{
+	unsigned int count;
+	uint64_t retries = 0;
+
+	rte_distributor_request_pkt_burst(d, worker_id, oldpkt, return_count);
+
+	count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
+	while (count == 0) {
+		rte_pause();
+		retries++;
+		if (retries > 1000)
+			return 0;
+
+		uint64_t t = rte_rdtsc()+100;
+
+		while (rte_rdtsc() < t)
+			rte_pause();
+
+		count = rte_distributor_poll_pkt_burst(d, worker_id, pkts);
+	}
+	return count;
+}
+
+int
+rte_distributor_return_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
+{
+	struct rte_distributor_buffer_burst *buf = &d->bufs[worker_id];
+	unsigned int i;
+
+	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
+		/* Switch off the return bit first */
+		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+
+	for (i = num; i-- > 0; )
+		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
+			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+
+	/* set the GET_BUF bit even if we got no returns */
+	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;
+
+	return 0;
+}
+
+/**** APIs called on distributor core ***/
+
+/* stores a packet returned from a worker inside the returns array */
+static inline void
+store_return(uintptr_t oldbuf, struct rte_distributor_burst *d,
+		unsigned int *ret_start, unsigned int *ret_count)
+{
+	if (!oldbuf)
+		return;
+	/* store returns in a circular buffer */
+	d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
+			= (void *)oldbuf;
+	*ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK);
+	*ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK);
+}
+
+static inline void
+find_match_scalar(struct rte_distributor_burst *d,
+			uint16_t *data_ptr,
+			uint16_t *output_ptr)
+{
+	struct rte_distributor_backlog *bl;
+	uint16_t i, j, w;
+
+	/*
+	 * Function overview:
+	 * 1. Loop through all worker ID's
+	 * 2. Compare the current inflights to the incoming tags
+	 * 3. Compare the current backlog to the incoming tags
+	 * 4. Add any matches to the output
+	 */
+
+	for (j = 0 ; j < RTE_DIST_BURST_SIZE; j++)
+		output_ptr[j] = 0;
+
+	for (i = 0; i < d->num_workers; i++) {
+		bl = &d->backlog[i];
+
+		for (j = 0; j < RTE_DIST_BURST_SIZE ; j++)
+			for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
+				if (d->in_flight_tags[i][w] == data_ptr[j]) {
+					output_ptr[j] = i+1;
+					break;
+				}
+		for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
+			for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
+				if (bl->tags[w] == data_ptr[j]) {
+					output_ptr[j] = i+1;
+					break;
+				}
+	}
+
+	/*
+	 * At this stage, the output contains 8 16-bit values, with
+	 * each non-zero value containing the worker ID on which the
+	 * corresponding flow is pinned to.
+	 */
+}
+
+
+
+static unsigned int
+handle_returns(struct rte_distributor_burst *d, unsigned int wkr)
+{
+	struct rte_distributor_buffer_burst *buf = &(d->bufs[wkr]);
+	uintptr_t oldbuf;
+	unsigned int ret_start = d->returns.start,
+			ret_count = d->returns.count;
+	unsigned int count = 0;
+	unsigned int i;
+	/*
+	 * wait for the GET_BUF bit to go high, otherwise we can't send
+	 * the packets to the worker
+	 */
+
+	if (buf->retptr64[0] & RTE_DISTRIB_GET_BUF) {
+		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
+				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
+					RTE_DISTRIB_FLAG_BITS));
+				/* store returns in a circular buffer */
+				store_return(oldbuf, d, &ret_start, &ret_count);
+				count++;
+				buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+			}
+		}
+		d->returns.start = ret_start;
+		d->returns.count = ret_count;
+		/* Clear for the worker to populate with more returns */
+		buf->retptr64[0] = 0;
+	}
+	return count;
+}
+
+static unsigned int
+release(struct rte_distributor_burst *d, unsigned int wkr)
+{
+	struct rte_distributor_buffer_burst *buf = &(d->bufs[wkr]);
+	unsigned int i;
+
+	if (d->backlog[wkr].count == 0)
+		return 0;
+
+	while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+		rte_pause();
+
+	handle_returns(d, wkr);
+
+	buf->count = 0;
+
+	for (i = 0; i < d->backlog[wkr].count; i++) {
+		d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
+				RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
+		d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
+	}
+	buf->count = i;
+	for ( ; i < RTE_DIST_BURST_SIZE ; i++) {
+		buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
+		d->in_flight_tags[wkr][i] = 0;
+	}
+
+	d->backlog[wkr].count = 0;
+
+	/* Clear the GET bit */
+	buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
+	return  buf->count;
+
+}
+
+
+/* process a set of packets to distribute them to workers */
+int
+rte_distributor_process_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int num_mbufs)
+{
+	unsigned int next_idx = 0;
+	static unsigned int wkr;
+	struct rte_mbuf *next_mb = NULL;
+	int64_t next_value = 0;
+	uint16_t new_tag = 0;
+	uint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
+	unsigned int i, wid;
+	int j, w;
+
+	if (unlikely(num_mbufs == 0)) {
+		/* Flush out all non-full cache-lines to workers. */
+		for (wid = 0 ; wid < d->num_workers; wid++) {
+			if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF)) {
+				release(d, wid);
+				handle_returns(d, wid);
+			}
+		}
+		return 0;
+	}
+
+	while (next_idx < num_mbufs) {
+		uint16_t matches[RTE_DIST_BURST_SIZE];
+		int pkts;
+
+		if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
+			d->bufs[wkr].count = 0;
+
+		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+			if (next_idx + i < num_mbufs && mbufs[next_idx + i]) {
+				/* flows have to be non-zero */
+				flows[i] = mbufs[next_idx + i]->hash.usr | 1;
+			} else
+				flows[i] = 0;
+		}
+
+		switch (d->dist_match_fn) {
+		default:
+			find_match_scalar(d, &flows[0], &matches[0]);
+		}
+
+		/*
+		 * Matches array now contains the intended worker ID (+1) of
+		 * the incoming packets. Any zeroes need to be assigned
+		 * workers.
+		 */
+
+		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
+			pkts = num_mbufs - next_idx;
+		else
+			pkts = RTE_DIST_BURST_SIZE;
+
+		for (j = 0; j < pkts; j++) {
+
+			next_mb = mbufs[next_idx++];
+			next_value = (((int64_t)(uintptr_t)next_mb) <<
+					RTE_DISTRIB_FLAG_BITS);
+			/*
+			 * The user is advised to set a tag value for each
+			 * mbuf before calling rte_distributor_process_burst.
+			 * User defined tags are used to identify flows,
+			 * or sessions.
+			 */
+			/* flows MUST be non-zero */
+			new_tag = (uint16_t)(next_mb->hash.usr) | 1;
+
+			/*
+			 * Uncommenting the next line will cause the find_match
+			 * function to be optimised out, making this function
+			 * do parallel (non-atomic) distribution
+			 */
+			/* matches[j] = 0; */
+
+			if (matches[j]) {
+				struct rte_distributor_backlog *bl =
+						&d->backlog[matches[j]-1];
+				if (unlikely(bl->count ==
+						RTE_DIST_BURST_SIZE)) {
+					release(d, matches[j]-1);
+				}
+
+				/* Add to worker that already has flow */
+				unsigned int idx = bl->count++;
+
+				bl->tags[idx] = new_tag;
+				bl->pkts[idx] = next_value;
+
+			} else {
+				struct rte_distributor_backlog *bl =
+						&d->backlog[wkr];
+				if (unlikely(bl->count ==
+						RTE_DIST_BURST_SIZE)) {
+					release(d, wkr);
+				}
+
+				/* Add to current worker */
+				unsigned int idx = bl->count++;
+
+				bl->tags[idx] = new_tag;
+				bl->pkts[idx] = next_value;
+				/*
+				 * Now that we've just added an unpinned flow
+				 * to a worker, we need to ensure that all
+				 * other packets with that same flow will go
+				 * to the same worker in this burst.
+				 */
+				for (w = j; w < pkts; w++)
+					if (flows[w] == new_tag)
+						matches[w] = wkr+1;
+			}
+		}
+		wkr++;
+		if (wkr >= d->num_workers)
+			wkr = 0;
+	}
+
+	/* Flush out all non-full cache-lines to workers. */
+	for (wid = 0 ; wid < d->num_workers; wid++)
+		if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF))
+			release(d, wid);
+
+	return num_mbufs;
+}
+
+/* return to the caller, packets returned from workers */
+int
+rte_distributor_returned_pkts_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int max_mbufs)
+{
+	struct rte_distributor_returned_pkts *returns = &d->returns;
+	unsigned int retval = (max_mbufs < returns->count) ?
+			max_mbufs : returns->count;
+	unsigned int i;
+
+	for (i = 0; i < retval; i++) {
+		unsigned int idx = (returns->start + i) &
+				RTE_DISTRIB_RETURNS_MASK;
+
+		mbufs[i] = returns->mbufs[idx];
+	}
+	returns->start += i;
+	returns->count -= i;
+
+	return retval;
+}
+
+/*
+ * Return the number of packets in-flight in a distributor, i.e. packets
+ * being worked on or queued up in a backlog.
+ */
+static inline unsigned int
+total_outstanding(const struct rte_distributor_burst *d)
+{
+	unsigned int wkr, total_outstanding = 0;
+
+	for (wkr = 0; wkr < d->num_workers; wkr++)
+		total_outstanding += d->backlog[wkr].count;
+
+	return total_outstanding;
+}
+
+/*
+ * Flush the distributor, so that there are no outstanding packets in flight or
+ * queued up.
+ */
+int
+rte_distributor_flush_burst(struct rte_distributor_burst *d)
+{
+	const unsigned int flushed = total_outstanding(d);
+	unsigned int wkr;
+
+	while (total_outstanding(d) > 0)
+		rte_distributor_process_burst(d, NULL, 0);
+
+	for (wkr = 0; wkr < d->num_workers; wkr++)
+		handle_returns(d, wkr);
+
+	return flushed;
+}
+
+/* clears the internal returns array in the distributor */
+void
+rte_distributor_clear_returns_burst(struct rte_distributor_burst *d)
+{
+	unsigned int wkr;
+
+	/* throw away returns, so workers can exit */
+	for (wkr = 0; wkr < d->num_workers; wkr++)
+		d->bufs[wkr].retptr64[0] = 0;
+}
+
+/* creates a distributor instance */
+struct rte_distributor_burst *
+rte_distributor_create_burst(const char *name,
+		unsigned int socket_id,
+		unsigned int num_workers)
+{
+	struct rte_distributor_burst *d;
+	struct rte_dist_burst_list *dist_burst_list;
+	char mz_name[RTE_MEMZONE_NAMESIZE];
+	const struct rte_memzone *mz;
+	unsigned int i;
+
+	/* compilation-time checks */
+	RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
+	RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
+
+	if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
+		rte_errno = EINVAL;
+		return NULL;
+	}
+
+	snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
+	mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
+	if (mz == NULL) {
+		rte_errno = ENOMEM;
+		return NULL;
+	}
+
+	d = mz->addr;
+	snprintf(d->name, sizeof(d->name), "%s", name);
+	d->num_workers = num_workers;
+
+	d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
+
+	/*
+	 * Set up the backlog tags so they're pointing at the second cache
+	 * line for performance during flow matching
+	 */
+	for (i = 0 ; i < num_workers ; i++)
+		d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];
+
+	dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
+					  rte_dist_burst_list);
+
+	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
+	TAILQ_INSERT_TAIL(dist_burst_list, d, next);
+	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+
+	return d;
+}
diff --git a/lib/librte_distributor/rte_distributor_burst.h b/lib/librte_distributor/rte_distributor_burst.h
new file mode 100644
index 0000000..5096b13
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor_burst.h
@@ -0,0 +1,255 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_DIST_BURST_H_
+#define _RTE_DIST_BURST_H_
+
+/**
+ * @file
+ * RTE distributor
+ *
+ * The distributor is a component which is designed to pass packets
+ * in bursts to workers, with dynamic load balancing.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct rte_distributor_burst;
+struct rte_mbuf;
+
+/**
+ * Function to create a new distributor instance
+ *
+ * Reserves the memory needed for the distributor operation and
+ * initializes the distributor to work with the configured number of workers.
+ *
+ * @param name
+ *   The name to be given to the distributor instance.
+ * @param socket_id
+ *   The NUMA node on which the memory is to be allocated
+ * @param num_workers
+ *   The maximum number of workers that will request packets from this
+ *   distributor
+ * @return
+ *   The newly created distributor instance
+ */
+struct rte_distributor_burst *
+rte_distributor_create_burst(const char *name, unsigned int socket_id,
+		unsigned int num_workers);
+
+/*  *** APIS to be called on the distributor lcore ***  */
+/*
+ * The following APIs are the public APIs which are designed for use on a
+ * single lcore which acts as the distributor lcore for a given distributor
+ * instance. These functions cannot be called on multiple cores simultaneously
+ * without using locking to protect access to the internals of the distributor.
+ *
+ * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
+ * for the same distributor instance, otherwise deadlock will result.
+ */
+
+/**
+ * Process a set of packets by distributing them among workers that request
+ * packets. The distributor will ensure that no two packets that have the
+ * same flow id, or tag, in the mbuf will be processed on different cores at
+ * the same time.
+ *
+ * The user is advised to set a tag for each mbuf before calling this function.
+ * If the user doesn't set the tag, its value depends on the driver
+ * implementation and configuration.
+ *
+ * This is not multi-thread safe and should only be called on a single lcore.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param mbufs
+ *   The mbufs to be distributed
+ * @param num_mbufs
+ *   The number of mbufs in the mbufs array
+ * @return
+ *   The number of mbufs processed.
+ */
+int
+rte_distributor_process_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int num_mbufs);
+
+/**
+ * Get a set of mbufs that have been returned to the distributor by workers
+ *
+ * Only call this on the same lcore as rte_distributor_process_burst()
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param mbufs
+ *   The mbufs pointer array to be filled in
+ * @param max_mbufs
+ *   The size of the mbufs array
+ * @return
+ *   The number of mbufs returned in the mbufs array.
+ */
+int
+rte_distributor_returned_pkts_burst(struct rte_distributor_burst *d,
+		struct rte_mbuf **mbufs, unsigned int max_mbufs);
+
+/**
+ * Flush the distributor component, so that there are no in-flight or
+ * backlogged packets awaiting processing
+ *
+ * Only call this on the same lcore as rte_distributor_process_burst()
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @return
+ *   The number of queued/in-flight packets that were completed by this call.
+ */
+int
+rte_distributor_flush_burst(struct rte_distributor_burst *d);
+
+/**
+ * Clears the array of returned packets used as the source for the
+ * rte_distributor_returned_pkts_burst() API call.
+ *
+ * Only call this on the same lcore as rte_distributor_process_burst()
+ *
+ * @param d
+ *   The distributor instance to be used
+ */
+void
+rte_distributor_clear_returns_burst(struct rte_distributor_burst *d);
+
+/*  *** APIS to be called on the worker lcores ***  */
+/*
+ * The following APIs are the public APIs which are designed for use on
+ * multiple lcores which act as workers for a distributor. Each lcore should use
+ * a unique worker id when requesting packets.
+ *
+ * NOTE: a given lcore cannot act as both a distributor lcore and a worker lcore
+ * for the same distributor instance, otherwise deadlock will result.
+ */
+
+/**
+ * API called by a worker to get new packets to process. Any previous packets
+ * given to the worker are assumed to have completed processing, and may be
+ * optionally returned to the distributor via the oldpkt parameter.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less than num_workers passed
+ *   at distributor creation time.
+ * @param pkts
+ *   The mbufs pointer array to be filled in (up to 8 packets)
+ * @param oldpkt
+ *   The previous packet, if any, being processed by the worker
+ * @param retcount
+ *   The number of packets being returned
+ *
+ * @return
+ *   The number of packets in the pkts array
+ */
+int
+rte_distributor_get_pkt_burst(struct rte_distributor_burst *d,
+	unsigned int worker_id, struct rte_mbuf **pkts,
+	struct rte_mbuf **oldpkt, unsigned int retcount);
+
+/**
+ * API called by a worker to return a completed packet without requesting a
+ * new packet, for example, because a worker thread is shutting down
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less than num_workers passed
+ *   at distributor creation time.
+ * @param oldpkt
+ *   The array of num previous packets being returned by the worker
+ */
+int
+rte_distributor_return_pkt_burst(struct rte_distributor_burst *d,
+	unsigned int worker_id, struct rte_mbuf **oldpkt, int num);
+
+/**
+ * API called by a worker to request a new packet to process.
+ * Any previous packet given to the worker is assumed to have completed
+ * processing, and may be optionally returned to the distributor via
+ * the oldpkt parameter.
+ * Unlike rte_distributor_get_pkt_burst(), this function does not wait for a
+ * new packet to be provided by the distributor.
+ *
+ * NOTE: after calling this function, rte_distributor_poll_pkt_burst() should
+ * be used to poll for the packet requested. The rte_distributor_get_pkt_burst()
+ * API should *not* be used to try and retrieve the new packet.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less than num_workers passed
+ *   at distributor creation time.
+ * @param oldpkt
+ *   The returning packets, if any, processed by the worker
+ * @param count
+ *   The number of returning packets
+ */
+void
+rte_distributor_request_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **oldpkt,
+		unsigned int count);
+
+/**
+ * API called by a worker to check for new packets that were previously
+ * requested by a call to rte_distributor_request_pkt_burst(). It does not
+ * wait for the new packets to be available, but returns zero if the request
+ * has not yet been fulfilled by the distributor.
+ *
+ * @param d
+ *   The distributor instance to be used
+ * @param worker_id
+ *   The worker instance number to use - must be less than num_workers passed
+ *   at distributor creation time.
+ * @param mbufs
+ *   The array of mbufs being given to the worker
+ *
+ * @return
+ *   The number of packets being given to the worker thread, zero if no
+ *   packet is yet available.
+ */
+int
+rte_distributor_poll_pkt_burst(struct rte_distributor_burst *d,
+		unsigned int worker_id, struct rte_mbuf **mbufs);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/librte_distributor/rte_distributor_priv.h b/lib/librte_distributor/rte_distributor_priv.h
new file mode 100644
index 0000000..833855f
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor_priv.h
@@ -0,0 +1,189 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _RTE_DIST_PRIV_H_
+#define _RTE_DIST_PRIV_H_
+
+/**
+ * @file
+ * RTE distributor
+ *
+ * The distributor is a component which is designed to pass packets
+ * one-at-a-time to workers, with dynamic load balancing.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define NO_FLAGS 0
+#define RTE_DISTRIB_PREFIX "DT_"
+
+/*
+ * We will use the bottom four bits of pointer for flags, shifting out
+ * the top four bits to make room (since a 64-bit pointer actually only uses
+ * 48 bits). An arithmetic-right-shift will then appropriately restore the
+ * original pointer value with proper sign extension into the top bits.
+ */
+#define RTE_DISTRIB_FLAG_BITS 4
+#define RTE_DISTRIB_FLAGS_MASK (0x0F)
+#define RTE_DISTRIB_NO_BUF 0       /**< empty flags: no buffer requested */
+#define RTE_DISTRIB_GET_BUF (1)    /**< worker requests a buffer, returns old */
+#define RTE_DISTRIB_RETURN_BUF (2) /**< worker returns a buffer, no request */
+#define RTE_DISTRIB_VALID_BUF (4)  /**< set if bufptr contains ptr */
+
+#define RTE_DISTRIB_BACKLOG_SIZE 8
+#define RTE_DISTRIB_BACKLOG_MASK (RTE_DISTRIB_BACKLOG_SIZE - 1)
+
+#define RTE_DISTRIB_MAX_RETURNS 128
+#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
+
+/**
+ * Maximum number of workers allowed.
+ * Be careful when increasing the limit, because it is limited by how we track
+ * in-flight tags. See @in_flight_bitmask and @rte_distributor_process
+ */
+#define RTE_DISTRIB_MAX_WORKERS 64
+
+#define RTE_DISTRIBUTOR_NAMESIZE 32 /**< Length of name for instance */
+
+/**
+ * Buffer structure used to pass the pointer data between cores. This is cache
+ * line aligned, but to improve performance and prevent adjacent cache-line
+ * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
+ * the next cache line to worker 0, we pad this out to three cache lines.
+ * Only 64-bits of the memory is actually used though.
+ */
+union rte_distributor_buffer {
+	volatile int64_t bufptr64;
+	char pad[RTE_CACHE_LINE_SIZE*3];
+} __rte_cache_aligned;
+
+/**
+ * Number of packets to deal with in bursts. Needs to be 8 so as to
+ * fit in one cache line.
+ */
+#define RTE_DIST_BURST_SIZE (sizeof(__m128i) / sizeof(uint16_t))
+
+/**
+ * Buffer structure used to pass the pointer data between cores. This is cache
+ * line aligned, but to improve performance and prevent adjacent cache-line
+ * prefetches of buffers for other workers, e.g. when worker 1's buffer is on
+ * the next cache line to worker 0, we pad this out to two cache lines.
+ * We can pass up to 8 mbufs at a time in one cacheline.
+ * There is a separate cacheline for returns in the burst API.
+ */
+struct rte_distributor_buffer_burst {
+	volatile int64_t bufptr64[RTE_DIST_BURST_SIZE]
+			__rte_cache_aligned; /* <= outgoing to worker */
+
+	int64_t pad1 __rte_cache_aligned;    /* <= one cache line  */
+
+	volatile int64_t retptr64[RTE_DIST_BURST_SIZE]
+			__rte_cache_aligned; /* <= incoming from worker */
+
+	int64_t pad2 __rte_cache_aligned;    /* <= one cache line  */
+
+	int count __rte_cache_aligned;       /* <= number of current mbufs */
+};
+
+
+struct rte_distributor_backlog {
+	unsigned int start;
+	unsigned int count;
+	int64_t pkts[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
+	uint16_t *tags; /* will point to second cacheline of inflights */
+} __rte_cache_aligned;
+
+
+struct rte_distributor_returned_pkts {
+	unsigned int start;
+	unsigned int count;
+	struct rte_mbuf *mbufs[RTE_DISTRIB_MAX_RETURNS];
+};
+
+struct rte_distributor {
+	TAILQ_ENTRY(rte_distributor) next;    /**< Next in list. */
+
+	char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
+	unsigned int num_workers;             /**< Number of workers polling */
+
+	uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS];
+		/**< Tracks the tag being processed per core */
+	uint64_t in_flight_bitmask;
+		/**< on/off bits for in-flight tags.
+		 * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then
+		 * the bitmask has to expand.
+		 */
+
+	struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS];
+
+	union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS];
+
+	struct rte_distributor_returned_pkts returns;
+};
+
+/* All different signature compare functions */
+enum rte_distributor_match_function {
+	RTE_DIST_MATCH_SCALAR = 0,
+	RTE_DIST_MATCH_NUM
+};
+
+struct rte_distributor_burst {
+	TAILQ_ENTRY(rte_distributor_burst) next;    /**< Next in list. */
+
+	char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of instance. */
+	unsigned int num_workers;             /**< Number of workers polling */
+
+	/**
+	 * The first cache line in this array holds the tags in flight
+	 * on the worker core. The second cache line holds the backlog
+	 * that is going to go to the worker core.
+	 */
+	uint16_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS][RTE_DIST_BURST_SIZE*2]
+			__rte_cache_aligned;
+
+	struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS]
+			__rte_cache_aligned;
+
+	struct rte_distributor_buffer_burst bufs[RTE_DISTRIB_MAX_WORKERS];
+
+	struct rte_distributor_returned_pkts returns;
+
+	enum rte_distributor_match_function dist_match_fn;
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/lib/librte_distributor/rte_distributor_version.map b/lib/librte_distributor/rte_distributor_version.map
index 73fdc43..39795a1 100644
--- a/lib/librte_distributor/rte_distributor_version.map
+++ b/lib/librte_distributor/rte_distributor_version.map
@@ -2,14 +2,23 @@ DPDK_2.0 {
 	global:
 
 	rte_distributor_clear_returns;
+	rte_distributor_clear_returns_burst;
 	rte_distributor_create;
+	rte_distributor_create_burst;
 	rte_distributor_flush;
+	rte_distributor_flush_burst;
 	rte_distributor_get_pkt;
+	rte_distributor_get_pkt_burst;
 	rte_distributor_poll_pkt;
+	rte_distributor_poll_pkt_burst;
 	rte_distributor_process;
+	rte_distributor_process_burst;
 	rte_distributor_request_pkt;
+	rte_distributor_request_pkt_burst;
 	rte_distributor_return_pkt;
+	rte_distributor_return_pkt_burst;
 	rte_distributor_returned_pkts;
+	rte_distributor_returned_pkts_burst;
 
 	local: *;
 };
-- 
2.7.4
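
For readers following the flag handling in rte_distributor_priv.h, here is
a minimal standalone sketch of the pointer/flag packing that its comment
describes. It is not part of the patch. The helper names (pack_ptr,
unpack_ptr, unpack_flags) are hypothetical, the constants only mirror the
RTE_DISTRIB_* values, and it assumes a 64-bit target where user-space
pointers fit in 48 bits and where right-shifting a negative int64_t is an
arithmetic shift (as with gcc and clang).

```c
#include <assert.h>
#include <stdint.h>

#define FLAG_BITS  4    /* mirrors RTE_DISTRIB_FLAG_BITS */
#define FLAGS_MASK 0x0F /* mirrors RTE_DISTRIB_FLAGS_MASK */
#define GET_BUF    1    /* mirrors RTE_DISTRIB_GET_BUF */
#define VALID_BUF  4    /* mirrors RTE_DISTRIB_VALID_BUF */

/* Hypothetical helper: shift the pointer up, put the flags in the low bits. */
static int64_t
pack_ptr(const void *p, int64_t flags)
{
	/* Shift as unsigned so that no signed overflow can occur. */
	uint64_t shifted = (uint64_t)(uintptr_t)p << FLAG_BITS;

	return (int64_t)shifted | (flags & FLAGS_MASK);
}

/* Hypothetical helper: the right shift drops the flags and restores the
 * original pointer value. */
static void *
unpack_ptr(int64_t v)
{
	return (void *)(uintptr_t)(v >> FLAG_BITS);
}

/* Hypothetical helper: extract only the flag bits. */
static int
unpack_flags(int64_t v)
{
	return (int)(v & FLAGS_MASK);
}
```

A caller would store pack_ptr(mbuf, GET_BUF | VALID_BUF) into a bufptr64
slot and recover the mbuf on the other core with unpack_ptr().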


* [PATCH v4 2/6] lib: add distributor vector flow matching
From: David Hunt @ 2017-01-09  7:50 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1483948248-91364-1-git-send-email-david.hunt@intel.com>

Signed-off-by: David Hunt <david.hunt@intel.com>
---
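
As a reading aid (not part of the patch), the sketch below is a plain-C
reference for the matching semantics that the function overview comment in
find_match_vec() describes: for every incoming flow ID, report the ID (+1)
of the worker that already has that flow in flight or in its backlog, or 0
if none does. The struct worker_tags layout and the match_reference() name
are hypothetical simplifications, and unlike the SSE code this sketch skips
zero (padding) flow IDs explicitly.

```c
#include <assert.h>
#include <stdint.h>

#define BURST 8 /* mirrors RTE_DIST_BURST_SIZE */

/* Hypothetical, simplified stand-in for the per-worker tag state. */
struct worker_tags {
	uint16_t inflight[BURST]; /* tags currently being processed */
	uint16_t backlog[BURST];  /* tags queued for this worker */
};

static void
match_reference(const struct worker_tags *w, unsigned int num_workers,
		const uint16_t flows[BURST], uint16_t matches[BURST])
{
	unsigned int i, j, k;

	for (j = 0; j < BURST; j++)
		matches[j] = 0;

	for (i = 0; i < num_workers; i++) {
		for (j = 0; j < BURST; j++) {
			if (flows[j] == 0)
				continue; /* padding slot, nothing to match */
			for (k = 0; k < BURST; k++) {
				if (flows[j] == w[i].inflight[k] ||
				    flows[j] == w[i].backlog[k]) {
					/* OR in worker ID + 1, as the
					 * vector code does */
					matches[j] |= (uint16_t)(i + 1);
					break;
				}
			}
		}
	}
}
```

In normal operation a flow is pinned to at most one worker, so the OR
leaves a single worker ID (+1) in each matching slot.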
 lib/librte_distributor/Makefile                    |   4 +
 lib/librte_distributor/rte_distributor_burst.c     |  11 +-
 lib/librte_distributor/rte_distributor_match_sse.c | 113 +++++++++++++++++++++
 lib/librte_distributor/rte_distributor_priv.h      |   6 ++
 4 files changed, 133 insertions(+), 1 deletion(-)
 create mode 100644 lib/librte_distributor/rte_distributor_match_sse.c

diff --git a/lib/librte_distributor/Makefile b/lib/librte_distributor/Makefile
index 2acc54d..a725aaf 100644
--- a/lib/librte_distributor/Makefile
+++ b/lib/librte_distributor/Makefile
@@ -44,6 +44,10 @@ LIBABIVER := 1
 # all source are stored in SRCS-y
 SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) := rte_distributor.c
 SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += rte_distributor_burst.c
+ifeq ($(CONFIG_RTE_ARCH_X86),y)
+SRCS-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR) += rte_distributor_match_sse.c
+endif
+
 
 # install this header file
 SYMLINK-$(CONFIG_RTE_LIBRTE_DISTRIBUTOR)-include := rte_distributor.h
diff --git a/lib/librte_distributor/rte_distributor_burst.c b/lib/librte_distributor/rte_distributor_burst.c
index ae7cf9d..35044c4 100644
--- a/lib/librte_distributor/rte_distributor_burst.c
+++ b/lib/librte_distributor/rte_distributor_burst.c
@@ -352,6 +352,9 @@ rte_distributor_process_burst(struct rte_distributor_burst *d,
 		}
 
 		switch (d->dist_match_fn) {
+		case RTE_DIST_MATCH_VECTOR:
+			find_match_vec(d, &flows[0], &matches[0]);
+			break;
 		default:
 			find_match_scalar(d, &flows[0], &matches[0]);
 		}
@@ -538,7 +541,13 @@ rte_distributor_create_burst(const char *name,
 	snprintf(d->name, sizeof(d->name), "%s", name);
 	d->num_workers = num_workers;
 
-	d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
+#if defined(RTE_ARCH_X86)
+	if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_SSE2))
+		d->dist_match_fn = RTE_DIST_MATCH_VECTOR;
+	else
+#endif
+		/* scalar fallback: non-x86 build, or SSE2 unavailable */
+		d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
 
 	/*
 	 * Set up the backlog tags so they're pointing at the second cache
diff --git a/lib/librte_distributor/rte_distributor_match_sse.c b/lib/librte_distributor/rte_distributor_match_sse.c
new file mode 100644
index 0000000..78641f5
--- /dev/null
+++ b/lib/librte_distributor/rte_distributor_match_sse.c
@@ -0,0 +1,113 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <rte_mbuf.h>
+#include "rte_distributor_priv.h"
+#include "rte_distributor_burst.h"
+#include "smmintrin.h"
+
+
+void
+find_match_vec(struct rte_distributor_burst *d,
+			uint16_t *data_ptr,
+			uint16_t *output_ptr)
+{
+	/* Setup */
+	__m128i incoming_fids;
+	__m128i inflight_fids;
+	__m128i preflight_fids;
+	__m128i wkr;
+	__m128i mask1;
+	__m128i mask2;
+	__m128i output;
+	struct rte_distributor_backlog *bl;
+	uint16_t i;
+
+	/*
+	 * Function overview:
+	 * 1. Loop through all worker IDs
+	 *  1a. Load the current inflights for that worker into an xmm reg
+	 *  1b. Load the current backlog for that worker into an xmm reg
+	 *  1c. Use cmpestrm to intersect flow_ids with backlog and inflights
+	 *  1d. Add any matches to the output
+	 * 2. Write the output xmm (matching worker ids).
+	 */
+
+
+	output = _mm_set1_epi16(0);
+	incoming_fids = _mm_load_si128((__m128i *)data_ptr);
+
+	for (i = 0; i < d->num_workers; i++) {
+		bl = &d->backlog[i];
+
+		inflight_fids =
+			_mm_load_si128((__m128i *)&(d->in_flight_tags[i]));
+		preflight_fids =
+			_mm_load_si128((__m128i *)(bl->tags));
+
+		/*
+		 * Any incoming_fid that exists anywhere in inflight_fids will
+		 * have 0xffff in same position of the mask as the incoming fid
+		 * Example (shortened to bytes for brevity):
+		 * incoming_fids   0x01 0x02 0x03 0x04 0x05 0x06 0x07 0x08
+		 * inflight_fids   0x03 0x05 0x07 0x00 0x00 0x00 0x00 0x00
+		 * mask            0x00 0x00 0xff 0x00 0xff 0x00 0xff 0x00
+		 */
+
+		mask1 = _mm_cmpestrm(inflight_fids, 8, incoming_fids, 8,
+			_SIDD_UWORD_OPS |
+			_SIDD_CMP_EQUAL_ANY |
+			_SIDD_UNIT_MASK);
+		mask2 = _mm_cmpestrm(preflight_fids, 8, incoming_fids, 8,
+			_SIDD_UWORD_OPS |
+			_SIDD_CMP_EQUAL_ANY |
+			_SIDD_UNIT_MASK);
+
+		mask1 = _mm_or_si128(mask1, mask2);
+		/*
+		 * Now mask contains 0xffff where there's a match.
+		 * Next we need to store the worker_id in the relevant position
+		 * in the output.
+		 */
+
+		wkr = _mm_set1_epi16(i+1);
+		mask1 = _mm_and_si128(mask1, wkr);
+		output = _mm_or_si128(mask1, output);
+	}
+
+	/*
+	 * At this stage, the 128-bit output contains 8 16-bit values, with
+	 * each non-zero value containing the worker ID (+1) to which the
+	 * corresponding flow is pinned.
+	 */
+	_mm_store_si128((__m128i *)output_ptr, output);
+}
diff --git a/lib/librte_distributor/rte_distributor_priv.h b/lib/librte_distributor/rte_distributor_priv.h
index 833855f..cc2c478 100644
--- a/lib/librte_distributor/rte_distributor_priv.h
+++ b/lib/librte_distributor/rte_distributor_priv.h
@@ -155,6 +155,7 @@ struct rte_distributor {
 /* All different signature compare functions */
 enum rte_distributor_match_function {
 	RTE_DIST_MATCH_SCALAR = 0,
+	RTE_DIST_MATCH_VECTOR,
 	RTE_DIST_MATCH_NUM
 };
 
@@ -182,6 +183,11 @@ struct rte_distributor_burst {
 	enum rte_distributor_match_function dist_match_fn;
 };
 
+void
+find_match_vec(struct rte_distributor_burst *d,
+			uint16_t *data_ptr,
+			uint16_t *output_ptr);
+
 #ifdef __cplusplus
 }
 #endif
-- 
2.7.4


* [PATCH v4 3/6] test: unit tests for new distributor burst api
From: David Hunt @ 2017-01-09  7:50 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1483948248-91364-1-git-send-email-david.hunt@intel.com>

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 app/test/test_distributor.c | 501 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 392 insertions(+), 109 deletions(-)

diff --git a/app/test/test_distributor.c b/app/test/test_distributor.c
index 85cb8f3..3871f86 100644
--- a/app/test/test_distributor.c
+++ b/app/test/test_distributor.c
@@ -1,7 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2010-2017 Intel Corporation. All rights reserved.
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -40,11 +40,24 @@
 #include <rte_mempool.h>
 #include <rte_mbuf.h>
 #include <rte_distributor.h>
+#include <rte_distributor_burst.h>
 
 #define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
 #define BURST 32
 #define BIG_BATCH 1024
 
+#define DIST_SINGLE 0
+#define DIST_BURST  1
+#define DIST_NUM_TYPES 2
+
+struct worker_params {
+	struct rte_distributor *d;
+	struct rte_distributor_burst *db;
+	int dist_type;
+};
+
+struct worker_params worker_params;
+
 /* statics - all zero-initialized by default */
 static volatile int quit;      /**< general quit variable for all threads */
 static volatile int zero_quit; /**< var for when we just want thr0 to quit*/
@@ -81,17 +94,36 @@ static int
 handle_work(void *arg)
 {
 	struct rte_mbuf *pkt = NULL;
-	struct rte_distributor *d = arg;
-	unsigned count = 0;
-	unsigned id = __sync_fetch_and_add(&worker_idx, 1);
-
-	pkt = rte_distributor_get_pkt(d, id, NULL);
-	while (!quit) {
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+	struct worker_params *wp = arg;
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
+	unsigned int count = 0, num = 0;
+	unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+	int i;
+
+	if (wp->dist_type == DIST_SINGLE) {
+		pkt = rte_distributor_get_pkt(d, id, NULL);
+		while (!quit) {
+			worker_stats[id].handled_packets++, count++;
+			pkt = rte_distributor_get_pkt(d, id, pkt);
+		}
 		worker_stats[id].handled_packets++, count++;
-		pkt = rte_distributor_get_pkt(d, id, pkt);
+		rte_distributor_return_pkt(d, id, pkt);
+	} else {
+		for (i = 0; i < 8; i++)
+			buf[i] = NULL;
+		num = rte_distributor_get_pkt_burst(db, id, buf, buf, num);
+		while (!quit) {
+			worker_stats[id].handled_packets += num;
+			count += num;
+			num = rte_distributor_get_pkt_burst(db, id,
+					buf, buf, num);
+		}
+		worker_stats[id].handled_packets += num;
+		count += num;
+		rte_distributor_return_pkt_burst(db, id, buf, num);
 	}
-	worker_stats[id].handled_packets++, count++;
-	rte_distributor_return_pkt(d, id, pkt);
 	return 0;
 }
 
@@ -107,12 +139,21 @@ handle_work(void *arg)
  *   not necessarily in the same order (as different flows).
  */
 static int
-sanity_test(struct rte_distributor *d, struct rte_mempool *p)
+sanity_test(struct worker_params *wp, struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	struct rte_mbuf *bufs[BURST];
-	unsigned i;
+	struct rte_mbuf *returns[BURST*2];
+	unsigned int i;
+	unsigned int retries;
+	unsigned int count = 0;
+
+	if (wp->dist_type == DIST_SINGLE)
+		printf("=== Basic distributor sanity tests (single) ===\n");
+	else
+		printf("=== Basic distributor sanity tests (burst) ===\n");
 
-	printf("=== Basic distributor sanity tests ===\n");
 	clear_packet_count();
 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
@@ -124,8 +165,21 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	for (i = 0; i < BURST; i++)
 		bufs[i]->hash.usr = 0;
 
-	rte_distributor_process(d, bufs, BURST);
-	rte_distributor_flush(d);
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, bufs, BURST);
+		rte_distributor_flush(d);
+	} else {
+		rte_distributor_process_burst(db, bufs, BURST);
+		count = 0;
+		do {
+
+			rte_distributor_flush_burst(db);
+			count += rte_distributor_returned_pkts_burst(db,
+					returns, BURST*2);
+		} while (count < BURST);
+	}
+
+
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -146,8 +200,18 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 		for (i = 0; i < BURST; i++)
 			bufs[i]->hash.usr = (i & 1) << 8;
 
-		rte_distributor_process(d, bufs, BURST);
-		rte_distributor_flush(d);
+		if (wp->dist_type == DIST_SINGLE) {
+			rte_distributor_process(d, bufs, BURST);
+			rte_distributor_flush(d);
+		} else {
+			rte_distributor_process_burst(db, bufs, BURST);
+			count = 0;
+			do {
+				rte_distributor_flush_burst(db);
+				count += rte_distributor_returned_pkts_burst(db,
+						returns, BURST*2);
+			} while (count < BURST);
+		}
 		if (total_packet_count() != BURST) {
 			printf("Line %d: Error, not all packets flushed. "
 					"Expected %u, got %u\n",
@@ -155,24 +219,32 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 			return -1;
 		}
 
+
 		for (i = 0; i < rte_lcore_count() - 1; i++)
 			printf("Worker %u handled %u packets\n", i,
 					worker_stats[i].handled_packets);
 		printf("Sanity test with two hash values done\n");
-
-		if (worker_stats[0].handled_packets != 16 ||
-				worker_stats[1].handled_packets != 16)
-			return -1;
 	}
 
 	/* give a different hash value to each packet,
 	 * so load gets distributed */
 	clear_packet_count();
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.usr = i;
+		bufs[i]->hash.usr = i+1;
+
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, bufs, BURST);
+		rte_distributor_flush(d);
+	} else {
+		rte_distributor_process_burst(db, bufs, BURST);
+		count = 0;
+		do {
+			rte_distributor_flush_burst(db);
+			count += rte_distributor_returned_pkts_burst(db,
+					returns, BURST*2);
+		} while (count < BURST);
+	}
 
-	rte_distributor_process(d, bufs, BURST);
-	rte_distributor_flush(d);
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -194,8 +266,15 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	unsigned num_returned = 0;
 
 	/* flush out any remaining packets */
-	rte_distributor_flush(d);
-	rte_distributor_clear_returns(d);
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_flush(d);
+		rte_distributor_clear_returns(d);
+	} else {
+		rte_distributor_flush_burst(db);
+		rte_distributor_clear_returns_burst(db);
+	}
+
+
 	if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
 		return -1;
@@ -203,28 +282,59 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	for (i = 0; i < BIG_BATCH; i++)
 		many_bufs[i]->hash.usr = i << 2;
 
-	for (i = 0; i < BIG_BATCH/BURST; i++) {
-		rte_distributor_process(d, &many_bufs[i*BURST], BURST);
+	if (wp->dist_type == DIST_SINGLE) {
+		printf("===testing single big burst===\n");
+		for (i = 0; i < BIG_BATCH/BURST; i++) {
+			rte_distributor_process(d, &many_bufs[i*BURST], BURST);
+			num_returned += rte_distributor_returned_pkts(d,
+					&return_bufs[num_returned],
+					BIG_BATCH - num_returned);
+		}
+		rte_distributor_flush(d);
 		num_returned += rte_distributor_returned_pkts(d,
 				&return_bufs[num_returned],
 				BIG_BATCH - num_returned);
+	} else {
+		printf("===testing burst big burst===\n");
+		for (i = 0; i < BIG_BATCH/BURST; i++) {
+			rte_distributor_process_burst(db,
+					&many_bufs[i*BURST], BURST);
+			count = rte_distributor_returned_pkts_burst(db,
+					&return_bufs[num_returned],
+					BIG_BATCH - num_returned);
+			num_returned += count;
+		}
+		rte_distributor_flush_burst(db);
+		count = rte_distributor_returned_pkts_burst(db,
+				&return_bufs[num_returned],
+				BIG_BATCH - num_returned);
+		num_returned += count;
 	}
-	rte_distributor_flush(d);
-	num_returned += rte_distributor_returned_pkts(d,
-			&return_bufs[num_returned], BIG_BATCH - num_returned);
+	retries = 0;
+	do {
+		rte_distributor_flush_burst(db);
+		count = rte_distributor_returned_pkts_burst(db,
+				&return_bufs[num_returned],
+				BIG_BATCH - num_returned);
+		num_returned += count;
+		retries++;
+	} while ((num_returned < BIG_BATCH) && (retries < 100));
+
 
 	if (num_returned != BIG_BATCH) {
-		printf("line %d: Number returned is not the same as "
-				"number sent\n", __LINE__);
+		printf("line %d: Missing packets, expected %d, got %u\n",
+				__LINE__, BIG_BATCH, num_returned);
 		return -1;
 	}
+
 	/* big check -  make sure all packets made it back!! */
 	for (i = 0; i < BIG_BATCH; i++) {
 		unsigned j;
 		struct rte_mbuf *src = many_bufs[i];
-		for (j = 0; j < BIG_BATCH; j++)
+		for (j = 0; j < BIG_BATCH; j++) {
 			if (return_bufs[j] == src)
 				break;
+		}
 
 		if (j == BIG_BATCH) {
 			printf("Error: could not find source packet #%u\n", i);
@@ -234,7 +344,6 @@ sanity_test(struct rte_distributor *d, struct rte_mempool *p)
 	printf("Sanity test of returned packets done\n");
 
 	rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
-
 	printf("\n");
 	return 0;
 }
@@ -249,18 +358,40 @@ static int
 handle_work_with_free_mbufs(void *arg)
 {
 	struct rte_mbuf *pkt = NULL;
-	struct rte_distributor *d = arg;
-	unsigned count = 0;
-	unsigned id = __sync_fetch_and_add(&worker_idx, 1);
-
-	pkt = rte_distributor_get_pkt(d, id, NULL);
-	while (!quit) {
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+	struct worker_params *wp = arg;
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
+	unsigned int count = 0;
+	unsigned int i;
+	unsigned int num = 0;
+	unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+
+	if (wp->dist_type == DIST_SINGLE) {
+		pkt = rte_distributor_get_pkt(d, id, NULL);
+		while (!quit) {
+			worker_stats[id].handled_packets++, count++;
+			rte_pktmbuf_free(pkt);
+			pkt = rte_distributor_get_pkt(d, id, pkt);
+		}
 		worker_stats[id].handled_packets++, count++;
-		rte_pktmbuf_free(pkt);
-		pkt = rte_distributor_get_pkt(d, id, pkt);
+		rte_distributor_return_pkt(d, id, pkt);
+	} else {
+		for (i = 0; i < 8; i++)
+			buf[i] = NULL;
+		num = rte_distributor_get_pkt_burst(db, id, buf, buf, num);
+		while (!quit) {
+			worker_stats[id].handled_packets += num;
+			count += num;
+			for (i = 0; i < num; i++)
+				rte_pktmbuf_free(buf[i]);
+			num = rte_distributor_get_pkt_burst(db,
+					id, buf, buf, num);
+		}
+		worker_stats[id].handled_packets += num;
+		count += num;
+		rte_distributor_return_pkt_burst(db, id, buf, num);
 	}
-	worker_stats[id].handled_packets++, count++;
-	rte_distributor_return_pkt(d, id, pkt);
 	return 0;
 }
 
@@ -270,26 +401,45 @@ handle_work_with_free_mbufs(void *arg)
  * library.
  */
 static int
-sanity_test_with_mbuf_alloc(struct rte_distributor *d, struct rte_mempool *p)
+sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	unsigned i;
 	struct rte_mbuf *bufs[BURST];
 
-	printf("=== Sanity test with mbuf alloc/free  ===\n");
+	if (wp->dist_type == DIST_SINGLE)
+		printf("=== Sanity test with mbuf alloc/free (single) ===\n");
+	else
+		printf("=== Sanity test with mbuf alloc/free (burst) ===\n");
+
 	clear_packet_count();
 	for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
 		unsigned j;
-		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
-			rte_distributor_process(d, NULL, 0);
+		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0) {
+			if (wp->dist_type == DIST_SINGLE)
+				rte_distributor_process(d, NULL, 0);
+			else
+				rte_distributor_process_burst(db, NULL, 0);
+		}
 		for (j = 0; j < BURST; j++) {
 			bufs[j]->hash.usr = (i+j) << 1;
 			rte_mbuf_refcnt_set(bufs[j], 1);
 		}
 
-		rte_distributor_process(d, bufs, BURST);
+		if (wp->dist_type == DIST_SINGLE)
+			rte_distributor_process(d, bufs, BURST);
+		else
+			rte_distributor_process_burst(db, bufs, BURST);
 	}
 
-	rte_distributor_flush(d);
+	if (wp->dist_type == DIST_SINGLE)
+		rte_distributor_flush(d);
+	else
+		rte_distributor_flush_burst(db);
+
+	rte_delay_us(10000);
+
 	if (total_packet_count() < (1<<ITER_POWER)) {
 		printf("Line %u: Packet count is incorrect, %u, expected %u\n",
 				__LINE__, total_packet_count(),
@@ -305,20 +455,48 @@ static int
 handle_work_for_shutdown_test(void *arg)
 {
 	struct rte_mbuf *pkt = NULL;
-	struct rte_distributor *d = arg;
-	unsigned count = 0;
-	const unsigned id = __sync_fetch_and_add(&worker_idx, 1);
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+	struct worker_params *wp = arg;
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
+	unsigned int count = 0;
+	unsigned int num = 0;
+	unsigned int total = 0;
+	unsigned int i;
+	unsigned int returned = 0;
+	const unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+
+	if (wp->dist_type == DIST_SINGLE)
+		pkt = rte_distributor_get_pkt(d, id, NULL);
+	else
+		num = rte_distributor_get_pkt_burst(db, id, buf, buf, num);
 
-	pkt = rte_distributor_get_pkt(d, id, NULL);
 	/* wait for quit single globally, or for worker zero, wait
 	 * for zero_quit */
 	while (!quit && !(id == 0 && zero_quit)) {
-		worker_stats[id].handled_packets++, count++;
-		rte_pktmbuf_free(pkt);
-		pkt = rte_distributor_get_pkt(d, id, NULL);
+		if (wp->dist_type == DIST_SINGLE) {
+			worker_stats[id].handled_packets++, count++;
+			rte_pktmbuf_free(pkt);
+			pkt = rte_distributor_get_pkt(d, id, NULL);
+			num = 1;
+			total += num;
+		} else {
+			worker_stats[id].handled_packets += num;
+			count += num;
+			for (i = 0; i < num; i++)
+				rte_pktmbuf_free(buf[i]);
+			num = rte_distributor_get_pkt_burst(db,
+					id, buf, buf, num);
+			total += num;
+		}
+	}
+	worker_stats[id].handled_packets += num;
+	count += num;
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_return_pkt(d, id, pkt);
+	} else {
+		returned = rte_distributor_return_pkt_burst(db, id, buf, num);
 	}
-	worker_stats[id].handled_packets++, count++;
-	rte_distributor_return_pkt(d, id, pkt);
 
 	if (id == 0) {
 		/* for worker zero, allow it to restart to pick up last packet
@@ -326,13 +504,29 @@ handle_work_for_shutdown_test(void *arg)
 		 */
 		while (zero_quit)
 			usleep(100);
-		pkt = rte_distributor_get_pkt(d, id, NULL);
+		if (wp->dist_type == DIST_SINGLE) {
+			pkt = rte_distributor_get_pkt(d, id, NULL);
+		} else {
+			num = rte_distributor_get_pkt_burst(db,
+					id, buf, buf, num);
+		}
 		while (!quit) {
 			worker_stats[id].handled_packets++, count++;
 			rte_pktmbuf_free(pkt);
-			pkt = rte_distributor_get_pkt(d, id, NULL);
+			if (wp->dist_type == DIST_SINGLE) {
+				pkt = rte_distributor_get_pkt(d, id, NULL);
+			} else {
+				num = rte_distributor_get_pkt_burst(db,
+						id, buf, buf, num);
+			}
+		}
+		if (wp->dist_type == DIST_SINGLE) {
+			rte_distributor_return_pkt(d, id, pkt);
+		} else {
+			returned = rte_distributor_return_pkt_burst(db,
+					id, buf, num);
+			printf("Num returned = %u\n", returned);
 		}
-		rte_distributor_return_pkt(d, id, pkt);
 	}
 	return 0;
 }
@@ -344,26 +538,37 @@ handle_work_for_shutdown_test(void *arg)
  * library.
  */
 static int
-sanity_test_with_worker_shutdown(struct rte_distributor *d,
+sanity_test_with_worker_shutdown(struct worker_params *wp,
 		struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	struct rte_mbuf *bufs[BURST];
 	unsigned i;
 
 	printf("=== Sanity test of worker shutdown ===\n");
 
 	clear_packet_count();
+
 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
 		return -1;
 	}
 
-	/* now set all hash values in all buffers to zero, so all pkts go to the
-	 * one worker thread */
+	/*
+	 * Now set all hash values in all buffers to the same value so all
+	 * pkts go to the one worker thread
+	 */
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.usr = 0;
+		bufs[i]->hash.usr = 1;
+
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, bufs, BURST);
+	} else {
+		rte_distributor_process_burst(db, bufs, BURST);
+		rte_distributor_flush_burst(db);
+	}
 
-	rte_distributor_process(d, bufs, BURST);
 	/* at this point, we will have processed some packets and have a full
 	 * backlog for the other ones at worker 0.
 	 */
@@ -374,14 +579,25 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
 		return -1;
 	}
 	for (i = 0; i < BURST; i++)
-		bufs[i]->hash.usr = 0;
+		bufs[i]->hash.usr = 1;
 
 	/* get worker zero to quit */
 	zero_quit = 1;
-	rte_distributor_process(d, bufs, BURST);
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, bufs, BURST);
+		/* flush the distributor */
+		rte_distributor_flush(d);
+	} else {
+		rte_distributor_process_burst(db, bufs, BURST);
+		/* flush the distributor */
+		rte_distributor_flush_burst(db);
+	}
+	rte_delay_us(10000);
+
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+				worker_stats[i].handled_packets);
 
-	/* flush the distributor */
-	rte_distributor_flush(d);
 	if (total_packet_count() != BURST * 2) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -389,10 +605,6 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
 		return -1;
 	}
 
-	for (i = 0; i < rte_lcore_count() - 1; i++)
-		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
-
 	printf("Sanity test with worker shutdown passed\n\n");
 	return 0;
 }
@@ -401,13 +613,18 @@ sanity_test_with_worker_shutdown(struct rte_distributor *d,
  * one worker shuts down..
  */
 static int
-test_flush_with_worker_shutdown(struct rte_distributor *d,
+test_flush_with_worker_shutdown(struct worker_params *wp,
 		struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	struct rte_mbuf *bufs[BURST];
 	unsigned i;
 
-	printf("=== Test flush fn with worker shutdown ===\n");
+	if (wp->dist_type == DIST_SINGLE)
+		printf("=== Test flush fn with worker shutdown (single) ===\n");
+	else
+		printf("=== Test flush fn with worker shutdown (burst) ===\n");
 
 	clear_packet_count();
 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
@@ -420,7 +637,11 @@ test_flush_with_worker_shutdown(struct rte_distributor *d,
 	for (i = 0; i < BURST; i++)
 		bufs[i]->hash.usr = 0;
 
-	rte_distributor_process(d, bufs, BURST);
+	if (wp->dist_type == DIST_SINGLE)
+		rte_distributor_process(d, bufs, BURST);
+	else
+		rte_distributor_process_burst(db, bufs, BURST);
+
 	/* at this point, we will have processed some packets and have a full
 	 * backlog for the other ones at worker 0.
 	 */
@@ -429,9 +650,18 @@ test_flush_with_worker_shutdown(struct rte_distributor *d,
 	zero_quit = 1;
 
 	/* flush the distributor */
-	rte_distributor_flush(d);
+	if (wp->dist_type == DIST_SINGLE)
+		rte_distributor_flush(d);
+	else
+		rte_distributor_flush_burst(db);
+
+	rte_delay_us(10000);
 
 	zero_quit = 0;
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+				worker_stats[i].handled_packets);
+
 	if (total_packet_count() != BURST) {
 		printf("Line %d: Error, not all packets flushed. "
 				"Expected %u, got %u\n",
@@ -439,10 +669,6 @@ test_flush_with_worker_shutdown(struct rte_distributor *d,
 		return -1;
 	}
 
-	for (i = 0; i < rte_lcore_count() - 1; i++)
-		printf("Worker %u handled %u packets\n", i,
-				worker_stats[i].handled_packets);
-
 	printf("Flush test with worker shutdown passed\n\n");
 	return 0;
 }
@@ -451,6 +677,7 @@ static
 int test_error_distributor_create_name(void)
 {
 	struct rte_distributor *d = NULL;
+	struct rte_distributor_burst *db = NULL;
 	char *name = NULL;
 
 	d = rte_distributor_create(name, rte_socket_id(),
@@ -460,6 +687,13 @@ int test_error_distributor_create_name(void)
 		return -1;
 	}
 
+	db = rte_distributor_create_burst(name, rte_socket_id(),
+			rte_lcore_count() - 1);
+	if (db != NULL || rte_errno != EINVAL) {
+		printf("ERROR: No error on create_burst() with NULL param\n");
+		return -1;
+	}
+
 	return 0;
 }
 
@@ -468,20 +702,32 @@ static
 int test_error_distributor_create_numworkers(void)
 {
 	struct rte_distributor *d = NULL;
+	struct rte_distributor_burst *db = NULL;
+
 	d = rte_distributor_create("test_numworkers", rte_socket_id(),
 			RTE_MAX_LCORE + 10);
 	if (d != NULL || rte_errno != EINVAL) {
 		printf("ERROR: No error on create() with num_workers > MAX\n");
 		return -1;
 	}
+
+	db = rte_distributor_create_burst("test_numworkers", rte_socket_id(),
+			RTE_MAX_LCORE + 10);
+	if (db != NULL || rte_errno != EINVAL) {
+		printf("ERROR: No error on create_burst() num_workers > MAX\n");
+		return -1;
+	}
+
 	return 0;
 }
 
 
 /* Useful function which ensures that all worker functions terminate */
 static void
-quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+quit_workers(struct worker_params *wp, struct rte_mempool *p)
 {
+	struct rte_distributor *d = wp->d;
+	struct rte_distributor_burst *db = wp->db;
 	const unsigned num_workers = rte_lcore_count() - 1;
 	unsigned i;
 	struct rte_mbuf *bufs[RTE_MAX_LCORE];
@@ -491,12 +737,20 @@ quit_workers(struct rte_distributor *d, struct rte_mempool *p)
 	quit = 1;
 	for (i = 0; i < num_workers; i++)
 		bufs[i]->hash.usr = i << 1;
-	rte_distributor_process(d, bufs, num_workers);
+	if (wp->dist_type == DIST_SINGLE)
+		rte_distributor_process(d, bufs, num_workers);
+	else
+		rte_distributor_process_burst(db, bufs, num_workers);
 
 	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
 
-	rte_distributor_process(d, NULL, 0);
-	rte_distributor_flush(d);
+	if (wp->dist_type == DIST_SINGLE) {
+		rte_distributor_process(d, NULL, 0);
+		rte_distributor_flush(d);
+	} else {
+		rte_distributor_process_burst(db, NULL, 0);
+		rte_distributor_flush_burst(db);
+	}
 	rte_eal_mp_wait_lcore();
 	quit = 0;
 	worker_idx = 0;
@@ -506,7 +760,9 @@ static int
 test_distributor(void)
 {
 	static struct rte_distributor *d;
+	static struct rte_distributor_burst *db;
 	static struct rte_mempool *p;
+	int i;
 
 	if (rte_lcore_count() < 2) {
 		printf("ERROR: not enough cores to test distributor\n");
@@ -525,6 +781,19 @@ test_distributor(void)
 		rte_distributor_clear_returns(d);
 	}
 
+	if (db == NULL) {
+		db = rte_distributor_create_burst("Test_dist_burst",
+				rte_socket_id(),
+				rte_lcore_count() - 1);
+		if (db == NULL) {
+			printf("Error creating burst distributor\n");
+			return -1;
+		}
+	} else {
+		rte_distributor_flush_burst(db);
+		rte_distributor_clear_returns_burst(db);
+	}
+
 	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
 			(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
 	if (p == NULL) {
@@ -536,31 +805,45 @@ test_distributor(void)
 		}
 	}
 
-	rte_eal_mp_remote_launch(handle_work, d, SKIP_MASTER);
-	if (sanity_test(d, p) < 0)
-		goto err;
-	quit_workers(d, p);
+	worker_params.d = d;
+	worker_params.db = db;
 
-	rte_eal_mp_remote_launch(handle_work_with_free_mbufs, d, SKIP_MASTER);
-	if (sanity_test_with_mbuf_alloc(d, p) < 0)
-		goto err;
-	quit_workers(d, p);
+	for (i = 0; i < DIST_NUM_TYPES; i++) {
 
-	if (rte_lcore_count() > 2) {
-		rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d,
-				SKIP_MASTER);
-		if (sanity_test_with_worker_shutdown(d, p) < 0)
-			goto err;
-		quit_workers(d, p);
+		worker_params.dist_type = i;
 
-		rte_eal_mp_remote_launch(handle_work_for_shutdown_test, d,
-				SKIP_MASTER);
-		if (test_flush_with_worker_shutdown(d, p) < 0)
+		rte_eal_mp_remote_launch(handle_work,
+				&worker_params, SKIP_MASTER);
+		if (sanity_test(&worker_params, p) < 0)
 			goto err;
-		quit_workers(d, p);
+		quit_workers(&worker_params, p);
 
-	} else {
-		printf("Not enough cores to run tests for worker shutdown\n");
+		rte_eal_mp_remote_launch(handle_work_with_free_mbufs,
+				&worker_params, SKIP_MASTER);
+		if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0)
+			goto err;
+		quit_workers(&worker_params, p);
+
+		if (rte_lcore_count() > 2) {
+			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
+					&worker_params,
+					SKIP_MASTER);
+			if (sanity_test_with_worker_shutdown(&worker_params,
+					p) < 0)
+				goto err;
+			quit_workers(&worker_params, p);
+
+			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
+					&worker_params,
+					SKIP_MASTER);
+			if (test_flush_with_worker_shutdown(&worker_params,
+					p) < 0)
+				goto err;
+			quit_workers(&worker_params, p);
+
+		} else {
+			printf("Too few cores to run worker shutdown test\n");
+		}
 	}
 
 	if (test_error_distributor_create_numworkers() == -1 ||
@@ -572,7 +855,7 @@ test_distributor(void)
 	return 0;
 
 err:
-	quit_workers(d, p);
+	quit_workers(&worker_params, p);
 	return -1;
 }
 
-- 
2.7.4

* [PATCH v4 4/6] test: add distributor_perf autotest
From: David Hunt @ 2017-01-09  7:50 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1483948248-91364-1-git-send-email-david.hunt@intel.com>

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 app/test/test_distributor_perf.c | 148 ++++++++++++++++++++++++++++++++++++---
 1 file changed, 137 insertions(+), 11 deletions(-)

diff --git a/app/test/test_distributor_perf.c b/app/test/test_distributor_perf.c
index 7947fe9..b273bf9 100644
--- a/app/test/test_distributor_perf.c
+++ b/app/test/test_distributor_perf.c
@@ -40,9 +40,11 @@
 #include <rte_common.h>
 #include <rte_mbuf.h>
 #include <rte_distributor.h>
+#include <rte_distributor_burst.h>
 
-#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
-#define BURST 32
+#define ITER_POWER_CL 25 /* log 2 of how many iterations for cache line test */
+#define ITER_POWER 21 /* log 2 of how many iterations we do when timing. */
+#define BURST 64
 #define BIG_BATCH 1024
 
 /* static vars - zero initialized by default */
@@ -54,7 +56,8 @@ struct worker_stats {
 } __rte_cache_aligned;
 struct worker_stats worker_stats[RTE_MAX_LCORE];
 
-/* worker thread used for testing the time to do a round-trip of a cache
+/*
+ * worker thread used for testing the time to do a round-trip of a cache
  * line between two cores and back again
  */
 static void
@@ -69,7 +72,8 @@ flip_bit(volatile uint64_t *arg)
 	}
 }
 
-/* test case to time the number of cycles to round-trip a cache line between
+/*
+ * test case to time the number of cycles to round-trip a cache line between
  * two cores and back again.
  */
 static void
@@ -86,7 +90,7 @@ time_cache_line_switch(void)
 		rte_pause();
 
 	const uint64_t start_time = rte_rdtsc();
-	for (i = 0; i < (1 << ITER_POWER); i++) {
+	for (i = 0; i < (1 << ITER_POWER_CL); i++) {
 		while (*pdata)
 			rte_pause();
 		*pdata = 1;
@@ -98,13 +102,14 @@ time_cache_line_switch(void)
 	*pdata = 2;
 	rte_eal_wait_lcore(slaveid);
 	printf("==== Cache line switch test ===\n");
-	printf("Time for %u iterations = %"PRIu64" ticks\n", (1<<ITER_POWER),
+	printf("Time for %u iterations = %"PRIu64" ticks\n", (1<<ITER_POWER_CL),
 			end_time-start_time);
 	printf("Ticks per iteration = %"PRIu64"\n\n",
-			(end_time-start_time) >> ITER_POWER);
+			(end_time-start_time) >> ITER_POWER_CL);
 }
 
-/* returns the total count of the number of packets handled by the worker
+/*
+ * returns the total count of the number of packets handled by the worker
  * functions given below.
  */
 static unsigned
@@ -123,7 +128,8 @@ clear_packet_count(void)
 	memset(&worker_stats, 0, sizeof(worker_stats));
 }
 
-/* this is the basic worker function for performance tests.
+/*
+ * this is the basic worker function for performance tests.
  * it does nothing but return packets and count them.
  */
 static int
@@ -144,7 +150,37 @@ handle_work(void *arg)
 	return 0;
 }
 
-/* this basic performance test just repeatedly sends in 32 packets at a time
+/*
+ * this is the basic worker function for performance tests.
+ * it does nothing but return packets and count them.
+ */
+static int
+handle_work_burst(void *arg)
+{
+	struct rte_distributor_burst *d = arg;
+	unsigned int count = 0;
+	unsigned int num = 0;
+	int i;
+	unsigned int id = __sync_fetch_and_add(&worker_idx, 1);
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+
+	for (i = 0; i < 8; i++)
+		buf[i] = NULL;
+
+	num = rte_distributor_get_pkt_burst(d, id, buf, buf, num);
+	while (!quit) {
+		worker_stats[id].handled_packets += num;
+		count += num;
+		num = rte_distributor_get_pkt_burst(d, id, buf, buf, num);
+	}
+	worker_stats[id].handled_packets += num;
+	count += num;
+	rte_distributor_return_pkt_burst(d, id, buf, num);
+	return 0;
+}
+
+/*
+ * this basic performance test just repeatedly sends in BURST packets at a time
  * to the distributor and verifies at the end that we got them all in the worker
  * threads and finally how long per packet the processing took.
  */
@@ -174,6 +210,8 @@ perf_test(struct rte_distributor *d, struct rte_mempool *p)
 		rte_distributor_process(d, NULL, 0);
 	} while (total_packet_count() < (BURST << ITER_POWER));
 
+	rte_distributor_clear_returns(d);
+
 	printf("=== Performance test of distributor ===\n");
 	printf("Time per burst:  %"PRIu64"\n", (end - start) >> ITER_POWER);
 	printf("Time per packet: %"PRIu64"\n\n",
@@ -190,6 +228,55 @@ perf_test(struct rte_distributor *d, struct rte_mempool *p)
 	return 0;
 }
 
+/*
+ * this basic performance test just repeatedly sends in BURST packets at a time
+ * to the distributor and verifies at the end that we got them all in the worker
+ * threads and finally how long per packet the processing took.
+ */
+static inline int
+perf_test_burst(struct rte_distributor_burst *d, struct rte_mempool *p)
+{
+	unsigned int i;
+	uint64_t start, end;
+	struct rte_mbuf *bufs[BURST];
+
+	clear_packet_count();
+	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
+		printf("Error getting mbufs from pool\n");
+		return -1;
+	}
+	/* ensure we have different hash value for each pkt */
+	for (i = 0; i < BURST; i++)
+		bufs[i]->hash.usr = i;
+
+	start = rte_rdtsc();
+	for (i = 0; i < (1<<ITER_POWER); i++)
+		rte_distributor_process_burst(d, bufs, BURST);
+	end = rte_rdtsc();
+
+	do {
+		usleep(100);
+		rte_distributor_process_burst(d, NULL, 0);
+	} while (total_packet_count() < (BURST << ITER_POWER));
+
+	rte_distributor_clear_returns_burst(d);
+
+	printf("=== Performance test of burst distributor ===\n");
+	printf("Time per burst:  %"PRIu64"\n", (end - start) >> ITER_POWER);
+	printf("Time per packet: %"PRIu64"\n\n",
+			((end - start) >> ITER_POWER)/BURST);
+	rte_mempool_put_bulk(p, (void *)bufs, BURST);
+
+	for (i = 0; i < rte_lcore_count() - 1; i++)
+		printf("Worker %u handled %u packets\n", i,
+				worker_stats[i].handled_packets);
+	printf("Total packets: %u (%x)\n", total_packet_count(),
+			total_packet_count());
+	printf("=== Perf test done ===\n\n");
+
+	return 0;
+}
+
 /* Useful function which ensures that all worker functions terminate */
 static void
 quit_workers(struct rte_distributor *d, struct rte_mempool *p)
@@ -212,10 +299,34 @@ quit_workers(struct rte_distributor *d, struct rte_mempool *p)
 	worker_idx = 0;
 }
 
+/* Useful function which ensures that all worker functions terminate */
+static void
+quit_workers_burst(struct rte_distributor_burst *d, struct rte_mempool *p)
+{
+	const unsigned int num_workers = rte_lcore_count() - 1;
+	unsigned int i;
+	struct rte_mbuf *bufs[RTE_MAX_LCORE];
+
+	rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+
+	quit = 1;
+	for (i = 0; i < num_workers; i++)
+		bufs[i]->hash.usr = i << 1;
+	rte_distributor_process_burst(d, bufs, num_workers);
+
+	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
+	rte_distributor_process_burst(d, NULL, 0);
+	rte_eal_mp_wait_lcore();
+	quit = 0;
+	worker_idx = 0;
+}
+
 static int
 test_distributor_perf(void)
 {
 	static struct rte_distributor *d;
+	static struct rte_distributor_burst *db;
 	static struct rte_mempool *p;
 
 	if (rte_lcore_count() < 2) {
@@ -234,10 +345,20 @@ test_distributor_perf(void)
 			return -1;
 		}
 	} else {
-		rte_distributor_flush(d);
 		rte_distributor_clear_returns(d);
 	}
 
+	if (db == NULL) {
+		db = rte_distributor_create_burst("Test_burst", rte_socket_id(),
+				rte_lcore_count() - 1);
+		if (db == NULL) {
+			printf("Error creating burst distributor\n");
+			return -1;
+		}
+	} else {
+		rte_distributor_clear_returns_burst(db);
+	}
+
 	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
 			(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
 	if (p == NULL) {
@@ -254,6 +375,11 @@ test_distributor_perf(void)
 		return -1;
 	quit_workers(d, p);
 
+	rte_eal_mp_remote_launch(handle_work_burst, db, SKIP_MASTER);
+	if (perf_test_burst(db, p) < 0)
+		return -1;
+	quit_workers_burst(db, p);
+
 	return 0;
 }
 
-- 
2.7.4

* [PATCH v4 5/6] example: distributor app showing burst api
From: David Hunt @ 2017-01-09  7:50 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1483948248-91364-1-git-send-email-david.hunt@intel.com>

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 examples/distributor/main.c | 508 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 390 insertions(+), 118 deletions(-)

diff --git a/examples/distributor/main.c b/examples/distributor/main.c
index e7641d2..eebfb74 100644
--- a/examples/distributor/main.c
+++ b/examples/distributor/main.c
@@ -1,8 +1,7 @@
 /*-
  *   BSD LICENSE
  *
- *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
- *   All rights reserved.
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
  *   modification, are permitted provided that the following conditions
@@ -31,6 +30,8 @@
  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+#define BURST_API 1
+
 #include <stdint.h>
 #include <inttypes.h>
 #include <unistd.h>
@@ -43,39 +44,87 @@
 #include <rte_malloc.h>
 #include <rte_debug.h>
 #include <rte_prefetch.h>
+#if BURST_API
+#include <rte_distributor_burst.h>
+#else
 #include <rte_distributor.h>
+#endif
 
-#define RX_RING_SIZE 256
-#define TX_RING_SIZE 512
+#define RX_QUEUE_SIZE 512
+#define TX_QUEUE_SIZE 512
 #define NUM_MBUFS ((64*1024)-1)
-#define MBUF_CACHE_SIZE 250
+#define MBUF_CACHE_SIZE 128
+#if BURST_API
+#define BURST_SIZE 64
+#define SCHED_RX_RING_SZ 8192
+#define SCHED_TX_RING_SZ 65536
+#else
 #define BURST_SIZE 32
-#define RTE_RING_SZ 1024
+#define SCHED_RX_RING_SZ 1024
+#define SCHED_TX_RING_SZ 1024
+#endif
+#define BURST_SIZE_TX 32
 
 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
 
+#define ANSI_COLOR_RED     "\x1b[31m"
+#define ANSI_COLOR_RESET   "\x1b[0m"
+
 /* mask of enabled ports */
 static uint32_t enabled_port_mask;
 volatile uint8_t quit_signal;
 volatile uint8_t quit_signal_rx;
+volatile uint8_t quit_signal_dist;
+volatile uint8_t quit_signal_work;
 
 static volatile struct app_stats {
 	struct {
 		uint64_t rx_pkts;
 		uint64_t returned_pkts;
 		uint64_t enqueued_pkts;
+		uint64_t enqdrop_pkts;
 	} rx __rte_cache_aligned;
+	int pad1 __rte_cache_aligned;
+
+	struct {
+		uint64_t in_pkts;
+		uint64_t ret_pkts;
+		uint64_t sent_pkts;
+		uint64_t enqdrop_pkts;
+	} dist __rte_cache_aligned;
+	int pad2 __rte_cache_aligned;
 
 	struct {
 		uint64_t dequeue_pkts;
 		uint64_t tx_pkts;
+		uint64_t enqdrop_pkts;
 	} tx __rte_cache_aligned;
+	int pad3 __rte_cache_aligned;
+
+	uint64_t worker_pkts[64] __rte_cache_aligned;
+
+	int pad4 __rte_cache_aligned;
+
+	uint64_t worker_bursts[64][8] __rte_cache_aligned;
+
+	int pad5 __rte_cache_aligned;
+
+	uint64_t port_rx_pkts[64] __rte_cache_aligned;
+	uint64_t port_tx_pkts[64] __rte_cache_aligned;
 } app_stats;
 
+struct app_stats prev_app_stats;
+
 static const struct rte_eth_conf port_conf_default = {
 	.rxmode = {
 		.mq_mode = ETH_MQ_RX_RSS,
 		.max_rx_pkt_len = ETHER_MAX_LEN,
+		.split_hdr_size = 0,
+		.header_split   = 0, /**< Header Split disabled */
+		.hw_ip_checksum = 1, /**< IP checksum offload enabled */
+		.hw_vlan_filter = 0, /**< VLAN filtering disabled */
+		.jumbo_frame    = 0, /**< Jumbo Frame Support disabled */
+		.hw_strip_crc   = 0, /**< CRC stripping by hardware disabled */
 	},
 	.txmode = {
 		.mq_mode = ETH_MQ_TX_NONE,
@@ -93,6 +142,8 @@ struct output_buffer {
 	struct rte_mbuf *mbufs[BURST_SIZE];
 };
 
+static void print_stats(void);
+
 /*
  * Initialises a given port using global settings and with the rx buffers
  * coming from the mbuf_pool passed as parameter
@@ -101,9 +152,13 @@ static inline int
 port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 {
 	struct rte_eth_conf port_conf = port_conf_default;
-	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
-	int retval;
+	const uint16_t rxRings = 1;
+	uint16_t txRings = rte_lcore_count() - 1;
 	uint16_t q;
+	int retval;
+
+	if (txRings > RTE_MAX_ETHPORTS)
+		txRings = RTE_MAX_ETHPORTS;
 
 	if (port >= rte_eth_dev_count())
 		return -1;
@@ -113,7 +168,7 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 		return retval;
 
 	for (q = 0; q < rxRings; q++) {
-		retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
+		retval = rte_eth_rx_queue_setup(port, q, RX_QUEUE_SIZE,
 						rte_eth_dev_socket_id(port),
 						NULL, mbuf_pool);
 		if (retval < 0)
@@ -121,7 +176,7 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 	}
 
 	for (q = 0; q < txRings; q++) {
-		retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
+		retval = rte_eth_tx_queue_setup(port, q, TX_QUEUE_SIZE,
 						rte_eth_dev_socket_id(port),
 						NULL);
 		if (retval < 0)
@@ -134,7 +189,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 
 	struct rte_eth_link link;
 	rte_eth_link_get_nowait(port, &link);
-	if (!link.link_status) {
+	while (!link.link_status) {
+		printf("Waiting for Link up on port %"PRIu8"\n", port);
 		sleep(1);
 		rte_eth_link_get_nowait(port, &link);
 	}
@@ -160,41 +216,52 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
 
 struct lcore_params {
 	unsigned worker_id;
-	struct rte_distributor *d;
-	struct rte_ring *r;
+	struct rte_distributor_burst *d;
+	struct rte_ring *rx_dist_ring;
+	struct rte_ring *dist_tx_ring;
 	struct rte_mempool *mem_pool;
 };
 
-static int
-quit_workers(struct rte_distributor *d, struct rte_mempool *p)
+static inline void
+flush_one_port(struct output_buffer *outbuf, uint8_t outp)
 {
-	const unsigned num_workers = rte_lcore_count() - 2;
-	unsigned i;
-	struct rte_mbuf *bufs[num_workers];
+	unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
+			outbuf->mbufs, outbuf->count);
+	app_stats.tx.tx_pkts += outbuf->count;
 
-	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
-		printf("line %d: Error getting mbufs from pool\n", __LINE__);
-		return -1;
+	if (unlikely(nb_tx < outbuf->count)) {
+		app_stats.tx.enqdrop_pkts +=  outbuf->count - nb_tx;
+		do {
+			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
+		} while (++nb_tx < outbuf->count);
 	}
+	outbuf->count = 0;
+}
+
+static inline void
+flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
+{
+	uint8_t outp;
 
-	for (i = 0; i < num_workers; i++)
-		bufs[i]->hash.rss = i << 1;
+	for (outp = 0; outp < nb_ports; outp++) {
+		/* skip ports that are not enabled */
+		if ((enabled_port_mask & (1 << outp)) == 0)
+			continue;
 
-	rte_distributor_process(d, bufs, num_workers);
-	rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+		if (tx_buffers[outp].count == 0)
+			continue;
 
-	return 0;
+		flush_one_port(&tx_buffers[outp], outp);
+	}
 }
 
 static int
 lcore_rx(struct lcore_params *p)
 {
-	struct rte_distributor *d = p->d;
-	struct rte_mempool *mem_pool = p->mem_pool;
-	struct rte_ring *r = p->r;
 	const uint8_t nb_ports = rte_eth_dev_count();
 	const int socket_id = rte_socket_id();
 	uint8_t port;
+	struct rte_mbuf *bufs[BURST_SIZE*2];
 
 	for (port = 0; port < nb_ports; port++) {
 		/* skip ports that are not enabled */
@@ -210,6 +277,7 @@ lcore_rx(struct lcore_params *p)
 
 	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
 	port = 0;
+
 	while (!quit_signal_rx) {
 
 		/* skip ports that are not enabled */
@@ -218,7 +286,7 @@ lcore_rx(struct lcore_params *p)
 				port = 0;
 			continue;
 		}
-		struct rte_mbuf *bufs[BURST_SIZE*2];
+
 		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
 				BURST_SIZE);
 		if (unlikely(nb_rx == 0)) {
@@ -228,19 +296,46 @@ lcore_rx(struct lcore_params *p)
 		}
 		app_stats.rx.rx_pkts += nb_rx;
 
-		rte_distributor_process(d, bufs, nb_rx);
-		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
-				bufs, BURST_SIZE*2);
+/*
+ * You can run the distributor on the rx core with this code. Returned
+ * packets are then sent straight to the tx core.
+ */
+#if 0
+
+#if BURST_API
+	rte_distributor_process_burst(d, bufs, nb_rx);
+	const uint16_t nb_ret = rte_distributor_returned_pkts_burst(d,
+			bufs, BURST_SIZE*2);
+#else
+	rte_distributor_process(d, bufs, nb_rx);
+	const uint16_t nb_ret = rte_distributor_returned_pkts(d,
+			bufs, BURST_SIZE*2);
+#endif
+
 		app_stats.rx.returned_pkts += nb_ret;
 		if (unlikely(nb_ret == 0)) {
 			if (++port == nb_ports)
 				port = 0;
 			continue;
 		}
-
-		uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
+		struct rte_ring *tx_ring = p->dist_tx_ring;
+		uint16_t sent = rte_ring_enqueue_burst(tx_ring,
+				(void *)bufs, nb_ret);
+#else
+		uint16_t nb_ret = nb_rx;
+		/*
+		* Swap the following two lines if you want the rx traffic
+		* to go directly to tx, no distribution.
+		*/
+		struct rte_ring *out_ring = p->rx_dist_ring;
+		//struct rte_ring *out_ring = p->dist_tx_ring;
+
+		uint16_t sent = rte_ring_enqueue_burst(out_ring,
+				(void *)bufs, nb_ret);
+#endif
 		app_stats.rx.enqueued_pkts += sent;
 		if (unlikely(sent < nb_ret)) {
+			app_stats.rx.enqdrop_pkts +=  nb_ret - sent;
 			RTE_LOG_DP(DEBUG, DISTRAPP,
 				"%s:Packet loss due to full ring\n", __func__);
 			while (sent < nb_ret)
@@ -249,56 +344,88 @@ lcore_rx(struct lcore_params *p)
 		if (++port == nb_ports)
 			port = 0;
 	}
-	rte_distributor_process(d, NULL, 0);
-	/* flush distributor to bring to known state */
-	rte_distributor_flush(d);
 	/* set worker & tx threads quit flag */
+	printf("\nCore %u exiting rx task.\n", rte_lcore_id());
 	quit_signal = 1;
-	/*
-	 * worker threads may hang in get packet as
-	 * distributor process is not running, just make sure workers
-	 * get packets till quit_signal is actually been
-	 * received and they gracefully shutdown
-	 */
-	if (quit_workers(d, mem_pool) != 0)
-		return -1;
-	/* rx thread should quit at last */
 	return 0;
 }
 
-static inline void
-flush_one_port(struct output_buffer *outbuf, uint8_t outp)
-{
-	unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
-			outbuf->count);
-	app_stats.tx.tx_pkts += nb_tx;
 
-	if (unlikely(nb_tx < outbuf->count)) {
-		RTE_LOG_DP(DEBUG, DISTRAPP,
-			"%s:Packet loss with tx_burst\n", __func__);
-		do {
-			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
-		} while (++nb_tx < outbuf->count);
-	}
-	outbuf->count = 0;
-}
 
-static inline void
-flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
+static int
+lcore_distributor(struct lcore_params *p)
 {
-	uint8_t outp;
-	for (outp = 0; outp < nb_ports; outp++) {
-		/* skip ports that are not enabled */
-		if ((enabled_port_mask & (1 << outp)) == 0)
-			continue;
-
-		if (tx_buffers[outp].count == 0)
-			continue;
-
-		flush_one_port(&tx_buffers[outp], outp);
+	struct rte_ring *in_r = p->rx_dist_ring;
+	struct rte_ring *out_r = p->dist_tx_ring;
+	struct rte_mbuf *bufs[BURST_SIZE * 4];
+	struct rte_distributor_burst *d = p->d;
+
+	printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
+	while (!quit_signal_dist) {
+		const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
+				(void *)bufs, BURST_SIZE*1);
+		if (nb_rx) {
+			app_stats.dist.in_pkts += nb_rx;
+/*
+ * This '#if' allows you to bypass the distributor. Incoming packets may be
+ * sent straight to the tx ring.
+ */
+#if 1
+
+#if BURST_API
+			/* Distribute the packets */
+			rte_distributor_process_burst(d, bufs, nb_rx);
+			/* Handle Returns */
+			const uint16_t nb_ret =
+				rte_distributor_returned_pkts_burst(d,
+					bufs, BURST_SIZE*2);
+#else
+			/* Distribute the packets */
+			rte_distributor_process(d, bufs, nb_rx);
+			/* Handle Returns */
+			const uint16_t nb_ret =
+				rte_distributor_returned_pkts(d,
+					bufs, BURST_SIZE*2);
+#endif
+
+#else
+			/* Bypass the distributor */
+			const unsigned int xor_val = (rte_eth_dev_count() > 1);
+			/* Touch the mbuf by xor'ing the port */
+			for (unsigned int i = 0; i < nb_rx; i++)
+				bufs[i]->port ^= xor_val;
+
+			const uint16_t nb_ret = nb_rx;
+#endif
+			if (unlikely(nb_ret == 0))
+				continue;
+			app_stats.dist.ret_pkts += nb_ret;
+
+			uint16_t sent = rte_ring_enqueue_burst(out_r,
+					(void *)bufs, nb_ret);
+			app_stats.dist.sent_pkts += sent;
+			if (unlikely(sent < nb_ret)) {
+				app_stats.dist.enqdrop_pkts += nb_ret - sent;
+				RTE_LOG(DEBUG, DISTRAPP,
+					"%s:Packet loss due to full out ring\n",
+					__func__);
+				while (sent < nb_ret)
+					rte_pktmbuf_free(bufs[sent++]);
+			}
+		}
 	}
+	printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
+	quit_signal_work = 1;
+
+#if BURST_API
+	/* Unblock any returns so workers can exit */
+	rte_distributor_clear_returns_burst(d);
+#endif
+	quit_signal_rx = 1;
+	return 0;
 }
 
+
 static int
 lcore_tx(struct rte_ring *in_r)
 {
@@ -327,9 +454,9 @@ lcore_tx(struct rte_ring *in_r)
 			if ((enabled_port_mask & (1 << port)) == 0)
 				continue;
 
-			struct rte_mbuf *bufs[BURST_SIZE];
+			struct rte_mbuf *bufs[BURST_SIZE_TX];
 			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
-					(void *)bufs, BURST_SIZE);
+					(void *)bufs, BURST_SIZE_TX);
 			app_stats.tx.dequeue_pkts += nb_rx;
 
 			/* if we get no traffic, flush anything we have */
@@ -358,11 +485,12 @@ lcore_tx(struct rte_ring *in_r)
 
 				outbuf = &tx_buffers[outp];
 				outbuf->mbufs[outbuf->count++] = bufs[i];
-				if (outbuf->count == BURST_SIZE)
+				if (outbuf->count == BURST_SIZE_TX)
 					flush_one_port(outbuf, outp);
 			}
 		}
 	}
+	printf("\nCore %u exiting tx task.\n", rte_lcore_id());
 	return 0;
 }
 
@@ -371,52 +499,147 @@ int_handler(int sig_num)
 {
 	printf("Exiting on signal %d\n", sig_num);
 	/* set quit flag for rx thread to exit */
-	quit_signal_rx = 1;
+	quit_signal_dist = 1;
 }
 
 static void
 print_stats(void)
 {
 	struct rte_eth_stats eth_stats;
-	unsigned i;
-
-	printf("\nRX thread stats:\n");
-	printf(" - Received:    %"PRIu64"\n", app_stats.rx.rx_pkts);
-	printf(" - Processed:   %"PRIu64"\n", app_stats.rx.returned_pkts);
-	printf(" - Enqueued:    %"PRIu64"\n", app_stats.rx.enqueued_pkts);
-
-	printf("\nTX thread stats:\n");
-	printf(" - Dequeued:    %"PRIu64"\n", app_stats.tx.dequeue_pkts);
-	printf(" - Transmitted: %"PRIu64"\n", app_stats.tx.tx_pkts);
+	unsigned int i, j;
+	const unsigned int num_workers = rte_lcore_count() - 4;
 
 	for (i = 0; i < rte_eth_dev_count(); i++) {
 		rte_eth_stats_get(i, &eth_stats);
-		printf("\nPort %u stats:\n", i);
-		printf(" - Pkts in:   %"PRIu64"\n", eth_stats.ipackets);
-		printf(" - Pkts out:  %"PRIu64"\n", eth_stats.opackets);
-		printf(" - In Errs:   %"PRIu64"\n", eth_stats.ierrors);
-		printf(" - Out Errs:  %"PRIu64"\n", eth_stats.oerrors);
-		printf(" - Mbuf Errs: %"PRIu64"\n", eth_stats.rx_nombuf);
+		app_stats.port_rx_pkts[i] = eth_stats.ipackets;
+		app_stats.port_tx_pkts[i] = eth_stats.opackets;
+	}
+
+	printf("\n\nRX Thread:\n");
+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		printf("Port %u Pktsin : %5.2f\n", i,
+				(app_stats.port_rx_pkts[i] -
+				prev_app_stats.port_rx_pkts[i])/1000000.0);
+		prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i];
+	}
+	printf(" - Received:    %5.2f\n",
+			(app_stats.rx.rx_pkts -
+			prev_app_stats.rx.rx_pkts)/1000000.0);
+	printf(" - Returned:    %5.2f\n",
+			(app_stats.rx.returned_pkts -
+			prev_app_stats.rx.returned_pkts)/1000000.0);
+	printf(" - Enqueued:    %5.2f\n",
+			(app_stats.rx.enqueued_pkts -
+			prev_app_stats.rx.enqueued_pkts)/1000000.0);
+	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
+			(app_stats.rx.enqdrop_pkts -
+			prev_app_stats.rx.enqdrop_pkts)/1000000.0,
+			ANSI_COLOR_RESET);
+
+	printf("Distributor thread:\n");
+	printf(" - In:          %5.2f\n",
+			(app_stats.dist.in_pkts -
+			prev_app_stats.dist.in_pkts)/1000000.0);
+	printf(" - Returned:    %5.2f\n",
+			(app_stats.dist.ret_pkts -
+			prev_app_stats.dist.ret_pkts)/1000000.0);
+	printf(" - Sent:        %5.2f\n",
+			(app_stats.dist.sent_pkts -
+			prev_app_stats.dist.sent_pkts)/1000000.0);
+	printf(" - Dropped      %s%5.2f%s\n", ANSI_COLOR_RED,
+			(app_stats.dist.enqdrop_pkts -
+			prev_app_stats.dist.enqdrop_pkts)/1000000.0,
+			ANSI_COLOR_RESET);
+
+	printf("TX thread:\n");
+	printf(" - Dequeued:    %5.2f\n",
+			(app_stats.tx.dequeue_pkts -
+			prev_app_stats.tx.dequeue_pkts)/1000000.0);
+	for (i = 0; i < rte_eth_dev_count(); i++) {
+		printf("Port %u Pktsout: %5.2f\n",
+				i, (app_stats.port_tx_pkts[i] -
+				prev_app_stats.port_tx_pkts[i])/1000000.0);
+		prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i];
+	}
+	printf(" - Transmitted: %5.2f\n",
+			(app_stats.tx.tx_pkts -
+			prev_app_stats.tx.tx_pkts)/1000000.0);
+	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
+			(app_stats.tx.enqdrop_pkts -
+			prev_app_stats.tx.enqdrop_pkts)/1000000.0,
+			ANSI_COLOR_RESET);
+
+	prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts;
+	prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts;
+	prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts;
+	prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts;
+	prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts;
+	prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts;
+	prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts;
+	prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts;
+	prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts;
+	prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts;
+	prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts;
+
+	for (i = 0; i < num_workers; i++) {
+		printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i,
+				(app_stats.worker_pkts[i] -
+				prev_app_stats.worker_pkts[i])/1000000.0);
+		for (j = 0; j < 8; j++)
+			printf("%"PRIu64" ", app_stats.worker_bursts[i][j]);
+		printf("\n");
+		prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i];
 	}
 }
 
 static int
 lcore_worker(struct lcore_params *p)
 {
-	struct rte_distributor *d = p->d;
+	struct rte_distributor_burst *d = p->d;
 	const unsigned id = p->worker_id;
+	unsigned int num = 0;
+	unsigned int i;
+
 	/*
 	 * for single port, xor_val will be zero so we won't modify the output
 	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
 	 */
 	const unsigned xor_val = (rte_eth_dev_count() > 1);
-	struct rte_mbuf *buf = NULL;
+	struct rte_mbuf *buf[8] __rte_cache_aligned;
+
+	for (i = 0; i < 8; i++)
+		buf[i] = NULL;
+
+	app_stats.worker_pkts[p->worker_id] = 1;
+
 
 	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
-	while (!quit_signal) {
-		buf = rte_distributor_get_pkt(d, id, buf);
-		buf->port ^= xor_val;
+	while (!quit_signal_work) {
+
+#if BURST_API
+		num = rte_distributor_get_pkt_burst(d, id, buf, buf, num);
+		/* Do a little bit of work for each packet */
+		for (i = 0; i < num; i++) {
+			uint64_t t = __rdtsc()+100;
+
+			while (__rdtsc() < t)
+				rte_pause();
+			buf[i]->port ^= xor_val;
+		}
+#else
+		buf[0] = rte_distributor_get_pkt(d, id, buf[0]);
+		uint64_t t = __rdtsc() + 10;
+
+		while (__rdtsc() < t)
+			rte_pause();
+		buf[0]->port ^= xor_val;
+#endif
+
+		app_stats.worker_pkts[p->worker_id] += num;
+		if (num > 0)
+			app_stats.worker_bursts[p->worker_id][num-1]++;
 	}
+	printf("\nCore %u exiting worker task.\n", rte_lcore_id());
 	return 0;
 }
 
@@ -496,12 +719,14 @@ int
 main(int argc, char *argv[])
 {
 	struct rte_mempool *mbuf_pool;
-	struct rte_distributor *d;
-	struct rte_ring *output_ring;
+	struct rte_distributor_burst *d;
+	struct rte_ring *dist_tx_ring;
+	struct rte_ring *rx_dist_ring;
 	unsigned lcore_id, worker_id = 0;
 	unsigned nb_ports;
 	uint8_t portid;
 	uint8_t nb_ports_available;
+	uint64_t t, freq;
 
 	/* catch ctrl-c so we can print on exit */
 	signal(SIGINT, int_handler);
@@ -518,10 +743,12 @@ main(int argc, char *argv[])
 	if (ret < 0)
 		rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
 
-	if (rte_lcore_count() < 3)
+	if (rte_lcore_count() < 5)
 		rte_exit(EXIT_FAILURE, "Error, This application needs at "
-				"least 3 logical cores to run:\n"
-				"1 lcore for packet RX and distribution\n"
+				"least 5 logical cores to run:\n"
+				"1 lcore for stats (can be core 0)\n"
+				"1 lcore for packet RX\n"
+				"1 lcore for distribution\n"
 				"1 lcore for packet TX\n"
 				"and at least 1 lcore for worker threads\n");
 
@@ -560,41 +787,86 @@ main(int argc, char *argv[])
 				"All available ports are disabled. Please set portmask.\n");
 	}
 
+#if BURST_API
+	d = rte_distributor_create_burst("PKT_DIST", rte_socket_id(),
+			rte_lcore_count() - 4);
+#else
 	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
-			rte_lcore_count() - 2);
+			rte_lcore_count() - 4);
+#endif
 	if (d == NULL)
 		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
 
 	/*
-	 * scheduler ring is read only by the transmitter core, but written to
-	 * by multiple threads
+	 * scheduler ring is read by the transmitter core, and written to
+	 * by scheduler core
 	 */
-	output_ring = rte_ring_create("Output_ring", RTE_RING_SZ,
-			rte_socket_id(), RING_F_SC_DEQ);
-	if (output_ring == NULL)
+	dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
+			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
+	if (dist_tx_ring == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
+
+	rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
+			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
+	if (rx_dist_ring == NULL)
 		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
 
 	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
-		if (worker_id == rte_lcore_count() - 2)
+		if (worker_id == rte_lcore_count() - 3) {
+			printf("Starting distributor on lcore_id %d\n",
+					lcore_id);
+			/* distributor core */
+			struct lcore_params *p =
+					rte_malloc(NULL, sizeof(*p), 0);
+			if (!p)
+				rte_panic("malloc failure\n");
+			*p = (struct lcore_params){worker_id, d,
+					rx_dist_ring, dist_tx_ring, mbuf_pool};
+			rte_eal_remote_launch(
+					(lcore_function_t *)lcore_distributor,
+					p, lcore_id);
+		} else if (worker_id == rte_lcore_count() - 4) {
+			printf("Starting tx  on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
+			/* tx core */
 			rte_eal_remote_launch((lcore_function_t *)lcore_tx,
-					output_ring, lcore_id);
-		else {
+					dist_tx_ring, lcore_id);
+		} else if (worker_id == rte_lcore_count() - 2) {
+			printf("Starting rx on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
+			/* rx core */
+			struct lcore_params *p =
+					rte_malloc(NULL, sizeof(*p), 0);
+			if (!p)
+				rte_panic("malloc failure\n");
+			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
+					dist_tx_ring, mbuf_pool};
+			rte_eal_remote_launch((lcore_function_t *)lcore_rx,
+					p, lcore_id);
+		} else {
+			printf("Starting worker on worker_id %d, lcore_id %d\n",
+					worker_id, lcore_id);
 			struct lcore_params *p =
 					rte_malloc(NULL, sizeof(*p), 0);
 			if (!p)
 				rte_panic("malloc failure\n");
-			*p = (struct lcore_params){worker_id, d, output_ring, mbuf_pool};
+			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
+					dist_tx_ring, mbuf_pool};
 
 			rte_eal_remote_launch((lcore_function_t *)lcore_worker,
 					p, lcore_id);
 		}
 		worker_id++;
 	}
-	/* call lcore_main on master core only */
-	struct lcore_params p = { 0, d, output_ring, mbuf_pool};
 
-	if (lcore_rx(&p) != 0)
-		return -1;
+	freq = rte_get_timer_hz();
+	t = __rdtsc() + freq;
+	while (!quit_signal_dist) {
+		if (t < __rdtsc()) {
+			print_stats();
+			t = __rdtsc() + freq;
+		}
+	}
 
 	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
 		if (rte_eal_wait_lcore(lcore_id) < 0)
-- 
2.7.4

^ permalink raw reply related

* [PATCH v4 6/6] doc: distributor library changes for new burst api
From: David Hunt @ 2017-01-09  7:50 UTC (permalink / raw)
  To: dev; +Cc: bruce.richardson, David Hunt
In-Reply-To: <1483948248-91364-1-git-send-email-david.hunt@intel.com>

Signed-off-by: David Hunt <david.hunt@intel.com>
---
 doc/guides/prog_guide/packet_distrib_lib.rst | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/doc/guides/prog_guide/packet_distrib_lib.rst b/doc/guides/prog_guide/packet_distrib_lib.rst
index b5bdabb..dffd4ad 100644
--- a/doc/guides/prog_guide/packet_distrib_lib.rst
+++ b/doc/guides/prog_guide/packet_distrib_lib.rst
@@ -42,6 +42,10 @@ The model of operation is shown in the diagram below.
 
    Packet Distributor mode of operation
 
+There are two versions of the API in the distributor library: one which sends one packet at a time to workers,
+and another which sends bursts of up to 8 packets at a time to workers. The function names of the second API
+are identified by the "_burst" suffix, and must not be intermixed with the single-packet API. The operations described below
+apply to both APIs; select which API you wish to use by including the relevant header file.
 
 Distributor Core Operation
 --------------------------
-- 
2.7.4

^ permalink raw reply related

* Re: [PATCH v2 3/3] app/crypto-perf: introduce new performance test application
From: De Lara Guarch, Pablo @ 2017-01-09 14:51 UTC (permalink / raw)
  To: Mrozowicz, SlawomirX, dev@dpdk.org
  Cc: Mrozowicz, SlawomirX, Doherty, Declan, Azarewicz, PiotrX T,
	Kerlin, Marcin, Kobylinski, MichalX
In-Reply-To: <1483635001-15473-4-git-send-email-slawomirx.mrozowicz@intel.com>



> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Slawomir
> Mrozowicz
> Sent: Thursday, January 05, 2017 4:50 PM
> To: dev@dpdk.org
> Cc: Mrozowicz, SlawomirX; Doherty, Declan; Azarewicz, PiotrX T; Kerlin,
> Marcin; Kobylinski, MichalX
> Subject: [dpdk-dev] [PATCH v2 3/3] app/crypto-perf: introduce new
> performance test application
> 
> This patchset introduces a new application which allows measuring
> performance parameters of PMDs available in crypto tree. The goal of
> this application is to replace existing performance tests in app/test.
> Parameters available are: throughput (--ptest throughput) and latency
> (--ptest latency). User can use multiple cores to run tests on but only
> one type of crypto PMD can be measured during single application
> execution. Cipher parameters, type of device, type of operation and
> chain mode have to be specified in the command line as application
> parameters. These parameters are checked using device capabilities
> structure.
> Couple of new library functions in librte_cryptodev are introduced for
> application use.
> To build the application a CONFIG_RTE_APP_CRYPTO_PERF flag has to be
> set
> (it is set by default).
> Example of usage: -c 0xc0 --vdev crypto_aesni_mb_pmd -w 0000:00:00.0 --
> --ptest throughput --devtype crypto_aesni_mb --optype cipher-then-auth
> --cipher-algo aes-cbc --cipher-op encrypt --cipher-key-sz 16 --auth-algo
> sha1-hmac --auth-op generate --auth-key-sz 64 --auth-digest-sz 12
> --total-ops 10000000 --burst-sz 32 --buffer-sz 64
> 
> Signed-off-by: Declan Doherty <declan.doherty@intel.com>
> Signed-off-by: Slawomir Mrozowicz <slawomirx.mrozowicz@intel.com>
> Signed-off-by: Piotr Azarewicz <piotrx.t.azarewicz@intel.com>
> Signed-off-by: Marcin Kerlin <marcinx.kerlin@intel.com>
> Signed-off-by: Michal Kobylinski <michalx.kobylinski@intel.com>
> ---
>  MAINTAINERS                                 |   4 +
>  app/Makefile                                |   1 +
>  app/crypto-perf/Makefile                    |  51 ++
>  app/crypto-perf/cperf.h                     |  58 ++
>  app/crypto-perf/cperf_ops.c                 | 474 +++++++++++++++
>  app/crypto-perf/cperf_ops.h                 |  66 +++
>  app/crypto-perf/cperf_options.h             | 104 ++++
>  app/crypto-perf/cperf_options_parsing.c     | 875
> ++++++++++++++++++++++++++++
>  app/crypto-perf/cperf_test_latency.c        | 685
> ++++++++++++++++++++++
>  app/crypto-perf/cperf_test_latency.h        |  57 ++
>  app/crypto-perf/cperf_test_throughput.c     | 651
> +++++++++++++++++++++
>  app/crypto-perf/cperf_test_throughput.h     |  58 ++
>  app/crypto-perf/cperf_test_vector_parsing.c | 500 ++++++++++++++++
>  app/crypto-perf/cperf_test_vector_parsing.h |  73 +++
>  app/crypto-perf/cperf_test_vectors.c        | 476 +++++++++++++++
>  app/crypto-perf/cperf_test_vectors.h        |  98 ++++
>  app/crypto-perf/cperf_verify_parser.c       | 314 ++++++++++
>  app/crypto-perf/data/aes_cbc_128_sha.data   | 503 ++++++++++++++++
>  app/crypto-perf/data/aes_cbc_192_sha.data   | 504 ++++++++++++++++
>  app/crypto-perf/data/aes_cbc_256_sha.data   | 504 ++++++++++++++++
>  app/crypto-perf/main.c                      | 411 +++++++++++++
>  config/common_base                          |   6 +
>  doc/guides/rel_notes/release_17_02.rst      |   5 +
>  doc/guides/tools/cryptoperf.rst             | 397 +++++++++++++
>  doc/guides/tools/index.rst                  |   1 +
>  25 files changed, 6876 insertions(+)

...

> diff --git a/app/crypto-perf/cperf_options_parsing.c b/app/crypto-
> perf/cperf_options_parsing.c

...

> +int
> +cperf_options_check(struct cperf_options *options)
> +{
> +	if (options->segments_nb > options->buffer_sz) {
> +		RTE_LOG(ERR, USER1,
> +				"Segments number greater than buffer
> size.\n");
> +		return -EINVAL;
> +	}

...

> +	} else if (options->op_type == CPERF_AEAD) {
> +		if (!(options->cipher_op ==
> RTE_CRYPTO_CIPHER_OP_ENCRYPT &&
> +				options->auth_op ==
> +				RTE_CRYPTO_AUTH_OP_GENERATE) ||

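This logic is incorrect. The two pairings (encrypt + generate, decrypt +
verify) are mutually exclusive, so at least one of the negated terms is
always true and every combination is rejected. This OR should be an AND.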

> +				!(options->cipher_op ==
> +				RTE_CRYPTO_CIPHER_OP_DECRYPT &&
> +				options->auth_op ==
> +				RTE_CRYPTO_AUTH_OP_VERIFY)) {
> +			RTE_LOG(ERR, USER1, "Use together options:
> encrypt and"
> +					" generate or decrypt and verify.\n");
> +			return -EINVAL;
> +		}
> +	}
> +
> +	return 0;
> +}
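
To illustrate the comment above on the AEAD check, here is a minimal
standalone sketch. The enum and function names are hypothetical stand-ins
for the RTE_CRYPTO_* values and for cperf_options_check(); they are not
taken from the patch. It only compares the posted condition with the
AND form, assuming the intent is to accept exactly the two pairings
named in the error message.

```c
#include <errno.h>

/* Hypothetical stand-ins for the RTE_CRYPTO_* operation enums. */
enum cipher_op { CIPHER_OP_ENCRYPT, CIPHER_OP_DECRYPT };
enum auth_op { AUTH_OP_VERIFY, AUTH_OP_GENERATE };

/*
 * Condition as posted: the two pairings cannot both hold at once, so
 * at least one negated term is true and -EINVAL is always returned.
 */
int
aead_check_as_posted(enum cipher_op c, enum auth_op a)
{
	if (!(c == CIPHER_OP_ENCRYPT && a == AUTH_OP_GENERATE) ||
			!(c == CIPHER_OP_DECRYPT && a == AUTH_OP_VERIFY))
		return -EINVAL;
	return 0;
}

/*
 * Condition with AND: reject only when neither valid pairing matches.
 */
int
aead_check_fixed(enum cipher_op c, enum auth_op a)
{
	if (!(c == CIPHER_OP_ENCRYPT && a == AUTH_OP_GENERATE) &&
			!(c == CIPHER_OP_DECRYPT && a == AUTH_OP_VERIFY))
		return -EINVAL;
	return 0;
}
```

With the AND form, encrypt + generate and decrypt + verify pass, and the
two mixed combinations are still rejected.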

^ permalink raw reply

* Cannot use MLX4 with igb_uio driver
From: Royce Niu @ 2017-01-09 14:53 UTC (permalink / raw)
  To: dev

Dear all,

I cannot use my Mellanox 3 Pro after binding it to the igb_uio driver.

The following output always appears when I run my DPDK application:

EAL: Detected 32 lcore(s)
EAL: Probing VFIO support...
PMD: bnxt_rte_pmd_init() called for (null)
EAL: PCI device 0000:02:00.0 on NUMA socket 0
EAL:   probe driver: 8086:1521 rte_igb_pmd
EAL: PCI device 0000:02:00.1 on NUMA socket 0
EAL:   probe driver: 8086:1521 rte_igb_pmd
EAL: PCI device 0000:02:00.2 on NUMA socket 0
EAL:   probe driver: 8086:1521 rte_igb_pmd
EAL: PCI device 0000:02:00.3 on NUMA socket 0
EAL:   probe driver: 8086:1521 rte_igb_pmd
EAL: PCI device 0000:81:00.0 on NUMA socket 1
EAL:   probe driver: 15b3:1007 librte_pmd_mlx4
PMD: librte_pmd_mlx4: cannot access device, is mlx4_ib loaded?
EAL: Error - exiting with code: 1
  Cause: Cannot create mbuf pool

---------------
I have set CONFIG_RTE_LIBRTE_MLX4_PMD=y in .config and installed
MLNX_OFED_LINUX-3.4-2.0.0.0.

Thanks.
-- 
Regards,

Royce

^ permalink raw reply

* Re: Cannot use MLX4 with igb_uio driver
From: Adrien Mazarguil @ 2017-01-09 15:13 UTC (permalink / raw)
  To: Royce Niu; +Cc: dev
In-Reply-To: <CAOwUCNvmQOv3ZUwOCq+kevPwyAxOAH+V=ucsxMKFkApusHWnYw@mail.gmail.com>

Hi Royce,

On Mon, Jan 09, 2017 at 10:53:37PM +0800, Royce Niu wrote:
> Dear all,
> 
> I cannot use my Mellanox 3 Pro after binding it to the igb_uio driver.
> 
> The following output always appears when I run my DPDK application:
> 
> EAL: Detected 32 lcore(s)
> EAL: Probing VFIO support...
> PMD: bnxt_rte_pmd_init() called for (null)
> EAL: PCI device 0000:02:00.0 on NUMA socket 0
> EAL:   probe driver: 8086:1521 rte_igb_pmd
> EAL: PCI device 0000:02:00.1 on NUMA socket 0
> EAL:   probe driver: 8086:1521 rte_igb_pmd
> EAL: PCI device 0000:02:00.2 on NUMA socket 0
> EAL:   probe driver: 8086:1521 rte_igb_pmd
> EAL: PCI device 0000:02:00.3 on NUMA socket 0
> EAL:   probe driver: 8086:1521 rte_igb_pmd
> EAL: PCI device 0000:81:00.0 on NUMA socket 1
> EAL:   probe driver: 15b3:1007 librte_pmd_mlx4
> PMD: librte_pmd_mlx4: cannot access device, is mlx4_ib loaded?
> EAL: Error - exiting with code: 1
>   Cause: Cannot create mbuf pool
> 
> ---------------
> I have set CONFIG_RTE_LIBRTE_MLX4_PMD=y in .config and installed
> MLNX_OFED_LINUX-3.4-2.0.0.0.

The mlx4 PMD does not operate through igb_uio (see the mlx4 documentation [1]).
PCI devices must remain bound to their original kernel module (mlx4_core), but
you additionally have to load mlx4_ib, mlx4_en and ib_uverbs [2].

[1] http://dpdk.org/doc/guides/nics/mlx4.html
[2] http://dpdk.org/doc/guides/nics/mlx4.html#prerequisites

-- 
Adrien Mazarguil
6WIND

^ permalink raw reply

* Re: [PATCH v5 11/12] drivers: update PMDs to use rte_driver probe and remove
From: Ferruh Yigit @ 2017-01-09 15:19 UTC (permalink / raw)
  To: Shreyansh Jain, david.marchand; +Cc: dev, thomas.monjalon
In-Reply-To: <1482758645-23057-12-git-send-email-shreyansh.jain@nxp.com>

On 12/26/2016 1:24 PM, Shreyansh Jain wrote:
> These callbacks now act as first layer of PCI interfaces from the Bus.
> Bus probe would enter the PMDs through the rte_driver->probe/remove
> callbacks, falling to rte_xxx_driver->probe/remove (Currently, all the
> drivers are rte_pci_driver).
> 
> This patch also changes QAT which is the only crypto PMD based on PCI.
> All others would be changed in a separate patch focused on VDEV.
> 
> Signed-off-by: Shreyansh Jain <shreyansh.jain@nxp.com>
> ---
>  drivers/crypto/qat/rte_qat_cryptodev.c  | 4 ++++
>  drivers/net/bnx2x/bnx2x_ethdev.c        | 8 ++++++++
>  drivers/net/bnxt/bnxt_ethdev.c          | 4 ++++
>  drivers/net/cxgbe/cxgbe_ethdev.c        | 4 ++++
>  drivers/net/e1000/em_ethdev.c           | 4 ++++
>  drivers/net/e1000/igb_ethdev.c          | 8 ++++++++
>  drivers/net/ena/ena_ethdev.c            | 4 ++++
>  drivers/net/enic/enic_ethdev.c          | 4 ++++
>  drivers/net/fm10k/fm10k_ethdev.c        | 4 ++++
>  drivers/net/i40e/i40e_ethdev.c          | 4 ++++
>  drivers/net/i40e/i40e_ethdev_vf.c       | 4 ++++
>  drivers/net/ixgbe/ixgbe_ethdev.c        | 8 ++++++++
>  drivers/net/mlx4/mlx4.c                 | 3 ++-
>  drivers/net/mlx5/mlx5.c                 | 3 ++-
>  drivers/net/nfp/nfp_net.c               | 4 ++++
>  drivers/net/qede/qede_ethdev.c          | 8 ++++++++
>  drivers/net/szedata2/rte_eth_szedata2.c | 4 ++++
>  drivers/net/thunderx/nicvf_ethdev.c     | 4 ++++
>  drivers/net/virtio/virtio_ethdev.c      | 2 ++
>  drivers/net/vmxnet3/vmxnet3_ethdev.c    | 4 ++++
>  20 files changed, 90 insertions(+), 2 deletions(-)
> 
> diff --git a/drivers/crypto/qat/rte_qat_cryptodev.c b/drivers/crypto/qat/rte_qat_cryptodev.c
> index 1e7ee61..bc1a9c6 100644
> --- a/drivers/crypto/qat/rte_qat_cryptodev.c
> +++ b/drivers/crypto/qat/rte_qat_cryptodev.c
> @@ -120,6 +120,10 @@ crypto_qat_dev_init(__attribute__((unused)) struct rte_cryptodev_driver *crypto_
>  
>  static struct rte_cryptodev_driver rte_qat_pmd = {
>  	.pci_drv = {
> +		.driver = {
> +			.probe = rte_eal_pci_probe,
> +			.remove = rte_eal_pci_remove,
> +		},

Since this part is common to all PCI drivers, why not make it part of the
RTE_PMD_REGISTER_PCI macro?

<...>

^ permalink raw reply

* Re: [PATCH v5 01/12] eal/bus: introduce bus abstraction
From: Ferruh Yigit @ 2017-01-09 15:22 UTC (permalink / raw)
  To: Shreyansh Jain, david.marchand; +Cc: dev, thomas.monjalon
In-Reply-To: <1482758645-23057-2-git-send-email-shreyansh.jain@nxp.com>

On 12/26/2016 1:23 PM, Shreyansh Jain wrote:

<...>

> +
> +DPDK_17.02 {
> +	global:
> +
> +	rte_bus_list;
> +	rte_eal_bus_add_device;
> +	rte_eal_bus_add_driver;
> +	rte_eal_bus_get;
> +	rte_eal_bus_dump;
> +	rte_eal_bus_register;

> +	rte_eal_bus_insert_device;

This function is added in patch 3/12, so it would be better to add it to
the .map file in that patch.

<...>

^ permalink raw reply

* Re: [PATCH] crypto test: add integrity check for mbuf data
From: Kusztal, ArkadiuszX @ 2017-01-09 15:22 UTC (permalink / raw)
  To: Trahe, Fiona, dev@dpdk.org
  Cc: Griffin, John, Jain, Deepak K, De Lara Guarch, Pablo,
	Trahe, Fiona
In-Reply-To: <1482425467-27415-1-git-send-email-fiona.trahe@intel.com>



> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Fiona Trahe
> Sent: Thursday, December 22, 2016 4:51 PM
> To: dev@dpdk.org
> Cc: Griffin, John <john.griffin@intel.com>; Jain, Deepak K
> <deepak.k.jain@intel.com>; De Lara Guarch, Pablo
> <pablo.de.lara.guarch@intel.com>; Trahe, Fiona <fiona.trahe@intel.com>
> Subject: [dpdk-dev] [PATCH] crypto test: add integrity check for mbuf data
> 
> In block cipher test cases, add checks that the source and destination mbufs
> are not modified except where expected.
> 
> Signed-off-by: Fiona Trahe <fiona.trahe@intel.com>
> ---
>  app/test/test_cryptodev_blockcipher.c | 139
> ++++++++++++++++++++++++++++++++--
>  1 file changed, 134 insertions(+), 5 deletions(-)
> 
> --
> 2.5.0

Acked-by: Arek Kusztal <arkadiuszx.kusztal@intel.com>

^ permalink raw reply

* Re: Cannot use MLX4 with igb_uio driver
From: Royce Niu @ 2017-01-09 15:23 UTC (permalink / raw)
  To: Adrien Mazarguil; +Cc: Royce Niu, dev
In-Reply-To: <20170109151353.GZ12822@6wind.com>

Hi, Adrien,

Actually, I tested using the original kernel module without binding. It
works.

However, I only get 6 Mpps for 64B packets in pkt-gen, which is very slow
for a 40Gbps NIC.

Is that right?


On Mon, Jan 9, 2017 at 11:13 PM, Adrien Mazarguil <
adrien.mazarguil@6wind.com> wrote:

> Hi Royce,
>
> On Mon, Jan 09, 2017 at 10:53:37PM +0800, Royce Niu wrote:
> > Dear all,
> >
> > I cannot use my Mellanox 3 Pro after binding it to the igb_uio driver.
> >
> > My DPDK application always shows the following:
> >
> > EAL: Detected 32 lcore(s)
> > EAL: Probing VFIO support...
> > PMD: bnxt_rte_pmd_init() called for (null)
> > EAL: PCI device 0000:02:00.0 on NUMA socket 0
> > EAL:   probe driver: 8086:1521 rte_igb_pmd
> > EAL: PCI device 0000:02:00.1 on NUMA socket 0
> > EAL:   probe driver: 8086:1521 rte_igb_pmd
> > EAL: PCI device 0000:02:00.2 on NUMA socket 0
> > EAL:   probe driver: 8086:1521 rte_igb_pmd
> > EAL: PCI device 0000:02:00.3 on NUMA socket 0
> > EAL:   probe driver: 8086:1521 rte_igb_pmd
> > EAL: PCI device 0000:81:00.0 on NUMA socket 1
> > EAL:   probe driver: 15b3:1007 librte_pmd_mlx4
> > PMD: librte_pmd_mlx4: cannot access device, is mlx4_ib loaded?
> > EAL: Error - exiting with code: 1
> >   Cause: Cannot create mbuf pool
> >
> > ---------------
> > I have added CONFIG_RTE_LIBRTE_MLX4_PMD=y to .config and
> > installed MLNX_OFED_LINUX-3.4-2.0.0.0.
>
> The mlx4 PMD does not operate through igb_uio (see mlx4 documentation [1]),
> PCI devices must remain bound to their original kernel module (mlx4_core),
> however you have to additionally load mlx4_ib, mlx4_en and ib_uverbs [2].
>
> [1] http://dpdk.org/doc/guides/nics/mlx4.html
> [2] http://dpdk.org/doc/guides/nics/mlx4.html#prerequisites
>
> --
> Adrien Mazarguil
> 6WIND
>



-- 
Regards,

Royce

^ permalink raw reply

* Re: [PATCH v2 3/3] app/crypto-perf: introduce new performance test application
From: Thomas Monjalon @ 2017-01-09 15:24 UTC (permalink / raw)
  To: Slawomir Mrozowicz, Declan Doherty
  Cc: dev, Piotr Azarewicz, Marcin Kerlin, Michal Kobylinski
In-Reply-To: <1483635001-15473-4-git-send-email-slawomirx.mrozowicz@intel.com>

2017-01-05 17:50, Slawomir Mrozowicz:
> --- a/app/Makefile
> +++ b/app/Makefile
> @@ -38,5 +38,6 @@ DIRS-$(CONFIG_RTE_TEST_PMD) += test-pmd
>  DIRS-$(CONFIG_RTE_LIBRTE_CMDLINE) += cmdline_test
>  DIRS-$(CONFIG_RTE_EXEC_ENV_LINUXAPP) += proc_info
>  DIRS-$(CONFIG_RTE_LIBRTE_PDUMP) += pdump
> +DIRS-$(CONFIG_RTE_APP_CRYPTO_PERF) += crypto-perf

Could we rename the directory to test-crypto or test-crypto-perf
for consistency?

> --- /dev/null
> +++ b/app/crypto-perf/Makefile
[...]
> +APP = dpdk-crypto-perf

I think we should have "test" in the name of the app.
dpdk-test-crypto?
dpdk-test-crypto-perf?

^ permalink raw reply

* Re: [PATCH v5 2/6] net/mlx5: support basic flow items and actions
From: Adrien Mazarguil @ 2017-01-09 15:29 UTC (permalink / raw)
  To: Ferruh Yigit; +Cc: Nelio Laranjeiro, dev
In-Reply-To: <37389e42-030a-a36f-57bc-a7ef15a0ee66@intel.com>

Hi Ferruh,

On Fri, Jan 06, 2017 at 01:52:53PM +0000, Ferruh Yigit wrote:
> On 1/4/2017 6:42 PM, Adrien Mazarguil wrote:
> > Hi Ferruh,
> > 
> > On Wed, Jan 04, 2017 at 05:49:46PM +0000, Ferruh Yigit wrote:
> >> Hi Nelio,
> >>
> >> A quick question.
> > 
> > I'll reply since it's related to the API.
> > 
> >> On 12/29/2016 3:15 PM, Nelio Laranjeiro wrote:
> >>> Introduce initial software for rte_flow rules.
> >>>
> >>> VLAN, VXLAN are still not supported.
> >>>
> >>> Signed-off-by: Nelio Laranjeiro <nelio.laranjeiro@6wind.com>
> >>> Acked-by: Adrien Mazarguil <adrien.mazarguil@6wind.com>
> >>
> >> <...>
> >>
> >>> +static int
> >>> +priv_flow_validate(struct priv *priv,
> >>> +		   const struct rte_flow_attr *attr,
> >>> +		   const struct rte_flow_item items[],
> >>> +		   const struct rte_flow_action actions[],
> >>> +		   struct rte_flow_error *error,
> >>> +		   struct mlx5_flow *flow)
> >>> +{
> >>> +	const struct mlx5_flow_items *cur_item = mlx5_flow_items;
> >>
> >> <...>
> >>
> >>> +	for (; items->type != RTE_FLOW_ITEM_TYPE_END; ++items) {
> >> <...>
> >>> +	}
> >>> +	for (; actions->type != RTE_FLOW_ACTION_TYPE_END; ++actions) {
> >> <...>
> >>> +	}
> >>
> >> Is it guaranteed somewhere that items or actions are terminated with
> >> TYPE_END?
> > 
> > Yes, since it's now the only way to terminate items/actions lists [1][2].
> > There used to be a "max" value in the original draft but it seemed redundant
> > and proved annoying to use, and was therefore dropped.
> > 
> > END items/actions behave like a NUL terminator for C strings. They are
> > likewise defined with value 0 for convenience.
> 
> At least it is a good idea to set END values to 0, but if the user does
> not set it, this will most probably crash the app.
> 
> Although this kind of error will most probably be detected easily in the
> development phase, it would still be nice to return an error instead of
> crashing when the user provides wrong input.

Unfortunately I cannot think of an easy way to do that, even for debugging
purposes. This would be like checking for unterminated strings or linked
lists without a NULL ending pointer. That's the trade-off of any unbounded
data structure.

Note that PMDs will likely return errors as they iterate on garbage
item/action types. Crashes will also almost always occur when attempting to
dereference the related spec/last/mask/conf pointers.
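
On the application side, where the capacity of the array holding the list is
known, a bounded scan can catch a missing terminator before the list reaches
a PMD. A minimal sketch, using hypothetical stand-in types (struct item,
ITEM_TYPE_*) rather than the real rte_flow definitions:

```c
#include <assert.h>
#include <stddef.h>

/* Stand-ins for illustration only; END is 0, as in rte_flow. */
enum item_type {
	ITEM_TYPE_END = 0,
	ITEM_TYPE_ETH,
	ITEM_TYPE_IPV4,
};

struct item {
	enum item_type type;
	const void *spec;
};

/*
 * Look for the END entry within the first n elements of the array.
 * Return its index, or -1 if the list is not terminated within n.
 */
static int find_end(const struct item *items, size_t n)
{
	size_t i;

	assert(items != NULL);
	for (i = 0; i < n; i++) {
		if (items[i].type == ITEM_TYPE_END)
			return (int)i;
	}
	return -1;
}
```

This only works because the caller passes the array capacity. A PMD that
receives a bare pointer has no such bound, which is the limitation described
above.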

> >> And these fields are direct inputs from the user.
> >> Is there a way to verify that user-provided values are terminated with
> >> TYPE_END?
> > 
> > No, applications must check for its presence (they normally add it
> > themselves) before feeding these lists to PMDs. I think that's safe enough.
> > 
> > Note the testpmd flow command does not allow entering a flow rule without
> > "end" tokens in both lists, there is no way around this restriction.
> > 
> > [1] http://dpdk.org/doc/guides/prog_guide/rte_flow.html#matching-pattern
> > [2] http://dpdk.org/doc/guides/prog_guide/rte_flow.html#actions

-- 
Adrien Mazarguil
6WIND

^ permalink raw reply

* Re: [PATCH v2 07/11] crypto/dpaa2_sec: Add DPAA2_SEC PMD into build system
From: Thomas Monjalon @ 2017-01-09 15:33 UTC (permalink / raw)
  To: Akhil Goyal, hemant.agrawal
  Cc: dev, declan.doherty, pablo.de.lara.guarch, john.mcnamara, nhorman
In-Reply-To: <20161222201700.20020-8-akhil.goyal@nxp.com>

2016-12-23 01:46, Akhil Goyal:
> +ifeq ($(CONFIG_RTE_LIBRTE_DPAA2_COMMON),y)
> +_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DPAA2_SEC)   += -lrte_pmd_dpaa2_sec
> +_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DPAA2_SEC)   += -lrte_pmd_dpaa2_qbman
> +_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DPAA2_SEC)   += -lrte_pmd_dpaa2_dpio
> +_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DPAA2_SEC)   += -lrte_pmd_dpaa2_pool
> +_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DPAA2_SEC)   += -lrte_pmd_fslmcbus
> +endif

There are so many libs!
We do not even have one commit per library in this patchset.
Splitting the patches would make it possible to introduce them one by one,
with an explanation of the design and the role of each library.

^ permalink raw reply

* Re: Cannot use MLX4 with igb_uio driver
From: Adrien Mazarguil @ 2017-01-09 16:13 UTC (permalink / raw)
  To: Royce Niu; +Cc: dev
In-Reply-To: <CAOwUCNviASWFcbtDqaUovkN9HUtk-p8zijAFcFm9AyCYAzVh_g@mail.gmail.com>

On Mon, Jan 09, 2017 at 11:23:56PM +0800, Royce Niu wrote:
> Hi, Adrien,
> 
> Actually, I tested using the original kernel module without binding. It
> works.
> 
> However, I only get 6 Mpps for 64B packets in pkt-gen, which is very slow
> for a 40Gbps NIC.
> 
> Is that right?

That's difficult to say without knowing your specific setup or application.
However, 6 Mpps seems abnormally slow, assuming testpmd is performing basic
I/O forwarding using a single thread and two ports.
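
For reference, the theoretical packet rate of an Ethernet link follows from
the frame size plus 20 bytes of per-frame wire overhead (7B preamble, 1B
start-of-frame delimiter, 12B inter-frame gap). A small sketch of the
arithmetic; line_rate_pps() is a hypothetical helper, not a DPDK function:

```c
#include <assert.h>
#include <stdint.h>

/* Preamble (7) + start-of-frame delimiter (1) + inter-frame gap (12). */
#define ETH_WIRE_OVERHEAD 20u

/*
 * Maximum number of frames per second a link can carry, given the link
 * speed in bits per second and the frame size in bytes (CRC included).
 */
static uint64_t line_rate_pps(uint64_t link_bps, uint32_t frame_bytes)
{
	assert(frame_bytes > 0);
	return link_bps / (((uint64_t)frame_bytes + ETH_WIRE_OVERHEAD) * 8);
}
```

With 64B frames on a 40 Gbps link, line_rate_pps(40000000000ULL, 64) comes
out at roughly 59.5 Mpps, so 6 Mpps is about a tenth of what the wire itself
allows. Other limits (PCIe, CPU, generator) are not covered by this figure.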

> On Mon, Jan 9, 2017 at 11:13 PM, Adrien Mazarguil <
> adrien.mazarguil@6wind.com> wrote:
> 
> > Hi Royce,
> >
> > On Mon, Jan 09, 2017 at 10:53:37PM +0800, Royce Niu wrote:
> > > Dear all,
> > >
> > > I cannot use my Mellanox 3 Pro after binding it to the igb_uio driver.
> > >
> > > My DPDK application always shows the following:
> > >
> > > EAL: Detected 32 lcore(s)
> > > EAL: Probing VFIO support...
> > > PMD: bnxt_rte_pmd_init() called for (null)
> > > EAL: PCI device 0000:02:00.0 on NUMA socket 0
> > > EAL:   probe driver: 8086:1521 rte_igb_pmd
> > > EAL: PCI device 0000:02:00.1 on NUMA socket 0
> > > EAL:   probe driver: 8086:1521 rte_igb_pmd
> > > EAL: PCI device 0000:02:00.2 on NUMA socket 0
> > > EAL:   probe driver: 8086:1521 rte_igb_pmd
> > > EAL: PCI device 0000:02:00.3 on NUMA socket 0
> > > EAL:   probe driver: 8086:1521 rte_igb_pmd
> > > EAL: PCI device 0000:81:00.0 on NUMA socket 1
> > > EAL:   probe driver: 15b3:1007 librte_pmd_mlx4
> > > PMD: librte_pmd_mlx4: cannot access device, is mlx4_ib loaded?
> > > EAL: Error - exiting with code: 1
> > >   Cause: Cannot create mbuf pool
> > >
> > > ---------------
> > > I have added CONFIG_RTE_LIBRTE_MLX4_PMD=y to .config and
> > > installed MLNX_OFED_LINUX-3.4-2.0.0.0.
> >
> > The mlx4 PMD does not operate through igb_uio (see mlx4 documentation [1]),
> > PCI devices must remain bound to their original kernel module (mlx4_core),
> > however you have to additionally load mlx4_ib, mlx4_en and ib_uverbs [2].
> >
> > [1] http://dpdk.org/doc/guides/nics/mlx4.html
> > [2] http://dpdk.org/doc/guides/nics/mlx4.html#prerequisites
> >
> > --
> > Adrien Mazarguil
> > 6WIND
> >
> 
> 
> 
> -- 
> Regards,
> 
> Royce

-- 
Adrien Mazarguil
6WIND

^ permalink raw reply

* Re: [PATCH v5 11/12] drivers: update PMDs to use rte_driver probe and remove
From: Ferruh Yigit @ 2017-01-09 16:18 UTC (permalink / raw)
  To: Shreyansh Jain, david.marchand; +Cc: dev, thomas.monjalon
In-Reply-To: <8484fc57-baa7-0f84-bc05-49d7f4ca79ad@intel.com>

On 1/9/2017 3:19 PM, Ferruh Yigit wrote:
> On 12/26/2016 1:24 PM, Shreyansh Jain wrote:
>> These callbacks now act as first layer of PCI interfaces from the Bus.
>> Bus probe would enter the PMDs through the rte_driver->probe/remove
>> callbacks, falling to rte_xxx_driver->probe/remove (Currently, all the
>> drivers are rte_pci_driver).
>>
>> This patch also changes QAT which is the only crypto PMD based on PCI.
>> All others would be changed in a separate patch focused on VDEV.
>>
>> Signed-off-by: Shreyansh Jain <shreyansh.jain@nxp.com>
>> ---
<...>
>>
>> diff --git a/drivers/crypto/qat/rte_qat_cryptodev.c b/drivers/crypto/qat/rte_qat_cryptodev.c
>> index 1e7ee61..bc1a9c6 100644
>> --- a/drivers/crypto/qat/rte_qat_cryptodev.c
>> +++ b/drivers/crypto/qat/rte_qat_cryptodev.c
>> @@ -120,6 +120,10 @@ crypto_qat_dev_init(__attribute__((unused)) struct rte_cryptodev_driver *crypto_
>>  
>>  static struct rte_cryptodev_driver rte_qat_pmd = {
>>  	.pci_drv = {
>> +		.driver = {
>> +			.probe = rte_eal_pci_probe,
>> +			.remove = rte_eal_pci_remove,
>> +		},
> 
> Since this part is common to all PCI drivers, why not make it part of the
> RTE_PMD_REGISTER_PCI macro?
> 

I have seen your comment [1], which looks like a better idea:

providing a rte_bus->probe(),

with rte_eal_bus_probe() calling rte_bus->probe();

for PCI devices, rte_bus->probe = rte_eal_pci_probe.
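
To illustrate the dispatch described above, here is a rough sketch. The
struct and function names (struct bus, bus_probe_all, pci_probe) are
simplified stand-ins chosen for this example, not the actual EAL
definitions from the patchset:

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for a bus object carrying its own probe callback. */
struct bus {
	const char *name;
	int (*probe)(void);
};

/* Counts invocations so the dispatch can be observed. */
static int pci_probe_calls;

/* Stand-in for the PCI-specific probe routine. */
static int pci_probe(void)
{
	pci_probe_calls++;
	return 0;
}

/*
 * Generic entry point: walk all registered buses and call each one's
 * probe callback. Stop and return the error code on the first failure.
 */
static int bus_probe_all(const struct bus *buses, size_t n)
{
	size_t i;

	assert(buses != NULL);
	for (i = 0; i < n; i++) {
		int ret;

		if (buses[i].probe == NULL)
			continue; /* this bus has nothing to probe */
		ret = buses[i].probe();
		if (ret != 0)
			return ret;
	}
	return 0;
}
```

With this layout, individual PCI drivers would not need to set .probe and
.remove themselves, since the bus owns the callback.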


[1]
http://dpdk.org/ml/archives/dev/2017-January/054125.html

Thanks,
ferruh

^ permalink raw reply

* [PATCH v2 1/2] net/i40e: fix segmentation fault in close
From: Bernard Iremonger @ 2017-01-09 16:29 UTC (permalink / raw)
  To: dev, wenzhuo.lu, jingjing.wu, helin.zhang; +Cc: Bernard Iremonger, stable
In-Reply-To: <1483979366-16914-1-git-send-email-bernard.iremonger@intel.com>

Change the order in which the VSIs are released.
Release the VMDq VSIs first, then release the main VSI.

Fixes: 4861cde46116 ("i40e: new poll mode driver")

CC: stable@dpdk.org

Signed-off-by: Bernard Iremonger <bernard.iremonger@intel.com>
---
 drivers/net/i40e/i40e_ethdev.c | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/drivers/net/i40e/i40e_ethdev.c b/drivers/net/i40e/i40e_ethdev.c
index 46def56..3c233e3 100644
--- a/drivers/net/i40e/i40e_ethdev.c
+++ b/drivers/net/i40e/i40e_ethdev.c
@@ -2066,18 +2066,17 @@ i40e_dev_close(struct rte_eth_dev *dev)
 	/* shutdown and destroy the HMC */
 	i40e_shutdown_lan_hmc(hw);
 
-	/* release all the existing VSIs and VEBs */
-	i40e_fdir_teardown(pf);
-	i40e_vsi_release(pf->main_vsi);
-
 	for (i = 0; i < pf->nb_cfg_vmdq_vsi; i++) {
 		i40e_vsi_release(pf->vmdq[i].vsi);
 		pf->vmdq[i].vsi = NULL;
 	}
-
 	rte_free(pf->vmdq);
 	pf->vmdq = NULL;
 
+	/* release all the existing VSIs and VEBs */
+	i40e_fdir_teardown(pf);
+	i40e_vsi_release(pf->main_vsi);
+
 	/* shutdown the adminq */
 	i40e_aq_queue_shutdown(hw, true);
 	i40e_shutdown_adminq(hw);
@@ -4335,6 +4334,9 @@ i40e_vsi_release(struct i40e_vsi *vsi)
 	if (!vsi)
 		return I40E_SUCCESS;
 
+	if (!vsi->adapter)
+		return I40E_ERR_BAD_PTR;
+
 	user_param = vsi->user_param;
 
 	pf = I40E_VSI_TO_PF(vsi);
-- 
2.10.1

^ permalink raw reply related

* [PATCH v2 0/2] net/i40e: fix segmentation fault
From: Bernard Iremonger @ 2017-01-09 16:29 UTC (permalink / raw)
  To: dev, wenzhuo.lu, jingjing.wu, helin.zhang; +Cc: Bernard Iremonger

Changes in v2:
These two patches were previously part of the following patchset:
[PATCH v7 00/27] Support VFD on i40e

They are being submitted separately as they are not needed for VFD.
The net/i40e patch has been revised.
The testpmd patch is needed to set up VMDq in order to test the fix.

Bernard Iremonger (2):
  net/i40e: fix segmentation fault in close
  app/testpmd: add command to configure VMDq

 app/test-pmd/cmdline.c                      |  60 +++++++++++++
 app/test-pmd/testpmd.c                      | 126 ++++++++++++++++++++++++++++
 app/test-pmd/testpmd.h                      |   1 +
 doc/guides/testpmd_app_ug/testpmd_funcs.rst |   7 ++
 drivers/net/i40e/i40e_ethdev.c              |  12 +--
 5 files changed, 201 insertions(+), 5 deletions(-)

-- 
2.10.1

^ permalink raw reply

